diff options
Diffstat (limited to 'res/res_corosync.c')
-rw-r--r-- | res/res_corosync.c | 390 |
1 files changed, 323 insertions, 67 deletions
diff --git a/res/res_corosync.c b/res/res_corosync.c index ce4165498..6c4a3d1e7 100644 --- a/res/res_corosync.c +++ b/res/res_corosync.c @@ -44,20 +44,125 @@ ASTERISK_FILE_VERSION(__FILE__, "$Revision$"); #include "asterisk/event.h" #include "asterisk/cli.h" #include "asterisk/devicestate.h" +#include "asterisk/app.h" +#include "asterisk/stasis.h" +#include "asterisk/stasis_message_router.h" AST_RWLOCK_DEFINE_STATIC(event_types_lock); +static void publish_mwi_to_stasis(struct ast_event *event); +static void publish_device_state_to_stasis(struct ast_event *event); + +/*! \brief The internal topic used for message forwarding and pings */ +static struct stasis_topic *corosync_aggregate_topic; + +/*! \brief Our \ref stasis message router */ +static struct stasis_message_router *stasis_router; + +/*! \brief Internal accessor for our topic */ +static struct stasis_topic *corosync_topic(void) +{ + return corosync_aggregate_topic; +} + +/*! \brief A payload wrapper around a corosync ping event */ +struct corosync_ping_payload { + /*! The corosync ping event being passed over \ref stasis */ + struct ast_event *event; +}; + +/*! \brief Destructor for the \ref corosync_ping_payload wrapper object */ +static void corosync_ping_payload_dtor(void *obj) +{ + struct corosync_ping_payload *payload = obj; + + ast_free(payload->event); +} + +/*! \brief Convert a Corosync PING to a \ref ast_event */ +static struct ast_event *corosync_ping_to_event(struct stasis_message *message) +{ + struct corosync_ping_payload *payload; + struct ast_event *event; + struct ast_eid *event_eid; + + if (!message) { + return NULL; + } + + payload = stasis_message_data(message); + + if (!payload->event) { + return NULL; + } + + event_eid = (struct ast_eid *)ast_event_get_ie_raw(payload->event, AST_EVENT_IE_EID); + + event = ast_event_new(AST_EVENT_PING, + AST_EVENT_IE_EID, AST_EVENT_IE_PLTYPE_RAW, event_eid, sizeof(*event_eid), + AST_EVENT_IE_END); + + return event; +} + +STASIS_MESSAGE_TYPE_DEFN_LOCAL(corosync_ping_message_type, + .to_event = corosync_ping_to_event, ); + +/*! \brief Publish a Corosync ping to \ref stasis */ +static void publish_corosync_ping_to_stasis(struct ast_event *event) +{ + struct corosync_ping_payload *payload; + struct stasis_message *message; + + ast_assert(ast_event_get_type(event) == AST_EVENT_PING); + ast_assert(event != NULL); + + payload = ao2_t_alloc(sizeof(*payload), corosync_ping_payload_dtor, "Create ping payload"); + if (!payload) { + return; + } + payload->event = event; + + message = stasis_message_create(corosync_ping_message_type(), payload); + if (!message) { + ao2_t_ref(payload, -1, "Destroy payload on off nominal"); + return; + } + + stasis_publish(corosync_topic(), message); + + ao2_t_ref(payload, -1, "Hand ref to stasis"); + ao2_t_ref(message, -1, "Hand ref to stasis"); +} + static struct { const char *name; - struct ast_event_sub *sub; + struct stasis_forward *sub; unsigned char publish; unsigned char publish_default; unsigned char subscribe; unsigned char subscribe_default; + struct stasis_topic *(* topic_fn)(void); + struct stasis_cache *(* cache_fn)(void); + struct stasis_message_type *(* message_type_fn)(void); + void (* publish_to_stasis)(struct ast_event *); } event_types[] = { - [AST_EVENT_MWI] = { .name = "mwi", }, - [AST_EVENT_DEVICE_STATE_CHANGE] = { .name = "device_state", }, - [AST_EVENT_PING] = { .name = "ping", .publish_default = 1, .subscribe_default = 1 }, + [AST_EVENT_MWI] = { .name = "mwi", + .topic_fn = ast_mwi_topic_all, + .cache_fn = ast_mwi_state_cache, + .message_type_fn = ast_mwi_state_type, + .publish_to_stasis = publish_mwi_to_stasis, }, + [AST_EVENT_DEVICE_STATE_CHANGE] = { .name = "device_state", + .topic_fn = ast_device_state_topic_all, + .cache_fn = ast_device_state_cache, + .message_type_fn = ast_device_state_message_type, + .publish_to_stasis = publish_device_state_to_stasis, }, + [AST_EVENT_PING] = { .name = "ping", + .publish_default = 1, + .subscribe_default = 1, + .topic_fn = corosync_topic, + .message_type_fn = corosync_ping_message_type, + .publish_to_stasis = publish_corosync_ping_to_stasis, }, }; static struct { @@ -88,6 +193,71 @@ static corosync_cfg_callbacks_t cfg_callbacks = { .corosync_cfg_shutdown_callback = cfg_shutdown_cb, }; +/*! \brief Publish a received MWI \ref ast_event to \ref stasis */ +static void publish_mwi_to_stasis(struct ast_event *event) +{ + const char *mailbox; + const char *context; + unsigned int new_msgs; + unsigned int old_msgs; + struct ast_eid *event_eid; + + ast_assert(ast_event_get_type(event) == AST_EVENT_MWI); + + mailbox = ast_event_get_ie_str(event, AST_EVENT_IE_MAILBOX); + context = ast_event_get_ie_str(event, AST_EVENT_IE_CONTEXT); + new_msgs = ast_event_get_ie_uint(event, AST_EVENT_IE_NEWMSGS); + old_msgs = ast_event_get_ie_uint(event, AST_EVENT_IE_OLDMSGS); + event_eid = (struct ast_eid *)ast_event_get_ie_raw(event, AST_EVENT_IE_EID); + + if (ast_strlen_zero(mailbox) || ast_strlen_zero(context)) { + return; + } + + if (new_msgs > INT_MAX) { + new_msgs = INT_MAX; + } + + if (old_msgs > INT_MAX) { + old_msgs = INT_MAX; + } + + if (ast_publish_mwi_state_full(mailbox, context, (int)new_msgs, + (int)old_msgs, NULL, event_eid)) { + char eid[16]; + ast_eid_to_str(eid, sizeof(eid), event_eid); + ast_log(LOG_WARNING, "Failed to publish MWI message for %s@%s from %s\n", + mailbox, context, eid); + } +} + +/*! \brief Publish a received device state \ref ast_event to \ref stasis */ +static void publish_device_state_to_stasis(struct ast_event *event) +{ + const char *device; + enum ast_device_state state; + unsigned int cachable; + struct ast_eid *event_eid; + + ast_assert(ast_event_get_type(event) == AST_EVENT_DEVICE_STATE_CHANGE); + + device = ast_event_get_ie_str(event, AST_EVENT_IE_DEVICE); + state = ast_event_get_ie_uint(event, AST_EVENT_IE_STATE); + cachable = ast_event_get_ie_uint(event, AST_EVENT_IE_CACHABLE); + event_eid = (struct ast_eid *)ast_event_get_ie_raw(event, AST_EVENT_IE_EID); + + if (ast_strlen_zero(device)) { + return; + } + + if (ast_publish_device_state_full(device, state, cachable, event_eid)) { + char eid[16]; + ast_eid_to_str(eid, sizeof(eid), event_eid); + ast_log(LOG_WARNING, "Failed to publish device state message for %s from %s\n", + device, eid); + } +} + static void cpg_deliver_cb(cpg_handle_t handle, const struct cpg_name *group_name, uint32_t nodeid, uint32_t pid, void *msg, size_t msg_len); @@ -101,8 +271,6 @@ static cpg_callbacks_t cpg_callbacks = { .cpg_confchg_fn = cpg_confchg_cb, }; -static void ast_event_cb(const struct ast_event *event, void *data); - #ifdef HAVE_COROSYNC_CFG_STATE_TRACK static void cfg_state_track_cb( corosync_cfg_state_notification_buffer_t *notification_buffer, @@ -120,6 +288,8 @@ static void cpg_deliver_cb(cpg_handle_t handle, const struct cpg_name *group_nam uint32_t nodeid, uint32_t pid, void *msg, size_t msg_len) { struct ast_event *event; + void (*publish_handler)(struct ast_event *) = NULL; + enum ast_event_type event_type; if (msg_len < ast_event_minimum_length()) { ast_debug(1, "Ignoring event that's too small. %u < %u\n", @@ -133,9 +303,17 @@ static void cpg_deliver_cb(cpg_handle_t handle, const struct cpg_name *group_nam return; } + event_type = ast_event_get_type(msg); + if (event_type > AST_EVENT_TOTAL) { + /* Egads, we don't support this */ + return; + } + ast_rwlock_rdlock(&event_types_lock); - if (!event_types[ast_event_get_type(msg)].subscribe) { - /* We are not configured to subscribe to these events. */ + publish_handler = event_types[event_type].publish_to_stasis; + if (!event_types[event_type].subscribe || !publish_handler) { + /* We are not configured to subscribe to these events or + we have no way to publish it internally. */ ast_rwlock_unlock(&event_types_lock); return; } @@ -147,20 +325,80 @@ static void cpg_deliver_cb(cpg_handle_t handle, const struct cpg_name *group_nam memcpy(event, msg, msg_len); + if (event_type == AST_EVENT_PING) { + const struct ast_eid *eid; + char buf[128] = ""; + + eid = ast_event_get_ie_raw(event, AST_EVENT_IE_EID); + ast_eid_to_str(buf, sizeof(buf), (struct ast_eid *) eid); + ast_log(LOG_NOTICE, "Got event PING from server with EID: '%s'\n", buf); + } + ast_debug(5, "Publishing event %s (%d) to stasis\n", + ast_event_get_type_name(event), event_type); + publish_handler(event); +} + +static void publish_to_corosync(struct stasis_message *message) +{ + cs_error_t cs_err; + struct iovec iov; + struct ast_event *event; + + event = stasis_message_to_event(message); + if (!event) { + return; + } + + if (ast_eid_cmp(&ast_eid_default, ast_event_get_ie_raw(event, AST_EVENT_IE_EID))) { + /* If the event didn't originate from this server, don't send it back out. */ + ast_event_destroy(event); + return; + } + if (ast_event_get_type(event) == AST_EVENT_PING) { const struct ast_eid *eid; char buf[128] = ""; eid = ast_event_get_ie_raw(event, AST_EVENT_IE_EID); ast_eid_to_str(buf, sizeof(buf), (struct ast_eid *) eid); - ast_log(LOG_NOTICE, "(cpg_deliver_cb) Got event PING from server with EID: '%s'\n", buf); + ast_log(LOG_NOTICE, "Sending event PING from this server with EID: '%s'\n", buf); + } + + iov.iov_base = (void *)event; + iov.iov_len = ast_event_get_size(event); + + ast_debug(5, "Publishing event %s (%d) to corosync\n", + ast_event_get_type_name(event), ast_event_get_type(event)); - ast_event_queue(event); - } else { - ast_event_queue_and_cache(event); + /* The stasis subscription will only exist if we are configured to publish + * these events, so just send away. */ + if ((cs_err = cpg_mcast_joined(cpg_handle, CPG_TYPE_FIFO, &iov, 1)) != CS_OK) { + ast_log(LOG_WARNING, "CPG mcast failed (%d)\n", cs_err); } } +static void stasis_message_cb(void *data, struct stasis_subscription *sub, struct stasis_message *message) +{ + if (!message) { + return; + } + + publish_to_corosync(message); +} + +static int dump_cache_cb(void *obj, void *arg, int flags) +{ + struct stasis_message *message = obj; + + if (!message) { + return 0; + } + + publish_to_corosync(message); + + return 0; +} + static void cpg_confchg_cb(cpg_handle_t handle, const struct cpg_name *group_name, const struct cpg_address *member_list, size_t member_list_entries, const struct cpg_address *left_list, size_t left_list_entries, @@ -176,20 +414,27 @@ static void cpg_confchg_cb(cpg_handle_t handle, const struct cpg_name *group_nam } for (i = 0; i < ARRAY_LEN(event_types); i++) { - struct ast_event_sub *event_sub; + struct ao2_container *messages; ast_rwlock_rdlock(&event_types_lock); if (!event_types[i].publish) { ast_rwlock_unlock(&event_types_lock); continue; } + + if (!event_types[i].cache_fn || !event_types[i].message_type_fn) { + ast_rwlock_unlock(&event_types_lock); + continue; + } + + messages = stasis_cache_dump_by_eid(event_types[i].cache_fn(), + event_types[i].message_type_fn(), + &ast_eid_default); ast_rwlock_unlock(&event_types_lock); - event_sub = ast_event_subscribe_new(i, ast_event_cb, NULL); - ast_event_sub_append_ie_raw(event_sub, AST_EVENT_IE_EID, - &ast_eid_default, sizeof(ast_eid_default)); - ast_event_dump_cache(event_sub); - ast_event_sub_destroy(event_sub); + ao2_callback(messages, OBJ_NODATA, dump_cache_cb, NULL); + + ao2_t_ref(messages, -1, "Dispose of dumped cache"); } } @@ -231,13 +476,13 @@ static void *dispatch_thread_handler(void *data) if (pfd[0].revents & POLLIN) { if ((cs_err = cpg_dispatch(cpg_handle, CS_DISPATCH_ALL)) != CS_OK) { - ast_log(LOG_WARNING, "Failed CPG dispatch: %u\n", cs_err); + ast_log(LOG_WARNING, "Failed CPG dispatch: %d\n", cs_err); } } if (pfd[1].revents & POLLIN) { if ((cs_err = corosync_cfg_dispatch(cfg_handle, CS_DISPATCH_ALL)) != CS_OK) { - ast_log(LOG_WARNING, "Failed CFG dispatch: %u\n", cs_err); + ast_log(LOG_WARNING, "Failed CFG dispatch: %d\n", cs_err); } } @@ -287,37 +532,6 @@ static void *dispatch_thread_handler(void *data) return NULL; } -static void ast_event_cb(const struct ast_event *event, void *data) -{ - cs_error_t cs_err; - struct iovec iov = { - .iov_base = (void *) event, - .iov_len = ast_event_get_size(event), - }; - - if (ast_event_get_type(event) == AST_EVENT_PING) { - const struct ast_eid *eid; - char buf[128] = ""; - - eid = ast_event_get_ie_raw(event, AST_EVENT_IE_EID); - ast_eid_to_str(buf, sizeof(buf), (struct ast_eid *) eid); - ast_log(LOG_NOTICE, "(ast_event_cb) Got event PING from server with EID: '%s'\n", buf); - } - - if (ast_eid_cmp(&ast_eid_default, - ast_event_get_ie_raw(event, AST_EVENT_IE_EID))) { - /* If the event didn't originate from this server, don't send it back out. */ - return; - } - - /* The ast_event subscription will only exist if we are configured to publish - * these events, so just send away. */ - - if ((cs_err = cpg_mcast_joined(cpg_handle, CPG_TYPE_FIFO, &iov, 1)) != CS_OK) { - ast_log(LOG_WARNING, "CPG mcast failed (%u)\n", cs_err); - } -} - static char *corosync_show_members(struct ast_cli_entry *e, int cmd, struct ast_cli_args *a) { cs_error_t cs_err; @@ -368,7 +582,7 @@ static char *corosync_show_members(struct ast_cli_entry *e, int cmd, struct ast_ continue; } - ast_cli(a->fd, "=== Node %u\n", i); + ast_cli(a->fd, "=== Node %d\n", i); ast_cli(a->fd, "=== --> Group: %s\n", cpg_desc.group.value); for (j = 0; j < num_addrs; j++) { @@ -378,7 +592,7 @@ static char *corosync_show_members(struct ast_cli_entry *e, int cmd, struct ast_ getnameinfo(sa, sa_len, buf, sizeof(buf), NULL, 0, NI_NUMERICHOST); - ast_cli(a->fd, "=== --> Address %u: %s\n", j + 1, buf); + ast_cli(a->fd, "=== --> Address %d: %s\n", j + 1, buf); } } @@ -421,7 +635,9 @@ static char *corosync_ping(struct ast_cli_entry *e, int cmd, struct ast_cli_args return CLI_FAILURE; } - ast_event_queue(event); + ast_rwlock_rdlock(&event_types_lock); + event_types[AST_EVENT_PING].publish_to_stasis(event); + ast_rwlock_unlock(&event_types_lock); return CLI_SUCCESS; } @@ -532,11 +748,16 @@ static int load_general_config(struct ast_config *cfg) for (i = 0; i < ARRAY_LEN(event_types); i++) { if (event_types[i].publish && !event_types[i].sub) { - event_types[i].sub = ast_event_subscribe(i, - ast_event_cb, "Corosync", NULL, - AST_EVENT_IE_END); + event_types[i].sub = stasis_forward_all(event_types[i].topic_fn(), + corosync_topic()); + stasis_message_router_add(stasis_router, + event_types[i].message_type_fn(), + stasis_message_cb, + NULL); } else if (!event_types[i].publish && event_types[i].sub) { - event_types[i].sub = ast_event_unsubscribe(event_types[i].sub); + event_types[i].sub = stasis_forward_cancel(event_types[i].sub); + stasis_message_router_remove(stasis_router, + event_types[i].message_type_fn()); } } @@ -577,14 +798,32 @@ static void cleanup_module(void) cs_error_t cs_err; unsigned int i; - for (i = 0; i < ARRAY_LEN(event_types); i++) { - if (event_types[i].sub) { - event_types[i].sub = ast_event_unsubscribe(event_types[i].sub); + if (stasis_router) { + + /* Unsubscribe all topic forwards and cancel all message routes */ + ast_rwlock_wrlock(&event_types_lock); + for (i = 0; i < ARRAY_LEN(event_types); i++) { + if (event_types[i].sub) { + event_types[i].sub = stasis_forward_cancel(event_types[i].sub); + stasis_message_router_remove(stasis_router, + event_types[i].message_type_fn()); + } + event_types[i].publish = 0; + event_types[i].subscribe = 0; } - event_types[i].publish = 0; - event_types[i].subscribe = 0; + ast_rwlock_unlock(&event_types_lock); + + stasis_message_router_unsubscribe_and_join(stasis_router); + stasis_router = NULL; + } + + if (corosync_aggregate_topic) { + ao2_t_ref(corosync_aggregate_topic, -1, "Dispose of topic on cleanup"); + corosync_aggregate_topic = NULL; } + STASIS_MESSAGE_TYPE_CLEANUP(corosync_ping_message_type); + if (dispatch_thread.id != AST_PTHREADT_NULL) { char meepmeep = 'x'; dispatch_thread.stop = 1; @@ -623,13 +862,30 @@ static int load_module(void) enum ast_module_load_result res = AST_MODULE_LOAD_FAILURE; struct cpg_name name; + corosync_aggregate_topic = stasis_topic_create("corosync_aggregate_topic"); + if (!corosync_aggregate_topic) { + ast_log(AST_LOG_ERROR, "Failed to create stasis topic for corosync\n"); + goto failed; + } + + stasis_router = stasis_message_router_create(corosync_aggregate_topic); + if (!stasis_router) { + ast_log(AST_LOG_ERROR, "Failed to create message router for corosync topic\n"); + goto failed; + } + + if (STASIS_MESSAGE_TYPE_INIT(corosync_ping_message_type) != 0) { + ast_log(AST_LOG_ERROR, "Failed to initialize corosync ping message type\n"); + goto failed; + } + if ((cs_err = corosync_cfg_initialize(&cfg_handle, &cfg_callbacks)) != CS_OK) { - ast_log(LOG_ERROR, "Failed to initialize cfg (%d)\n", (int) cs_err); - return AST_MODULE_LOAD_DECLINE; + ast_log(LOG_ERROR, "Failed to initialize cfg: (%d)\n", (int) cs_err); + goto failed; } if ((cs_err = cpg_initialize(&cpg_handle, &cpg_callbacks)) != CS_OK) { - ast_log(LOG_ERROR, "Failed to initialize cpg (%d)\n", (int) cs_err); + ast_log(LOG_ERROR, "Failed to initialize cpg: (%d)\n", (int) cs_err); goto failed; } @@ -637,7 +893,7 @@ static int load_module(void) name.length = strlen(name.value); if ((cs_err = cpg_join(cpg_handle, &name)) != CS_OK) { - ast_log(LOG_ERROR, "Failed to join (%d)\n", (int) cs_err); + ast_log(LOG_ERROR, "Failed to join: (%d)\n", (int) cs_err); goto failed; } |