diff options
Diffstat (limited to 'res/res_corosync.c')
-rw-r--r-- | res/res_corosync.c | 273 |
1 files changed, 258 insertions, 15 deletions
diff --git a/res/res_corosync.c b/res/res_corosync.c index 6642acd2c..675bb3307 100644 --- a/res/res_corosync.c +++ b/res/res_corosync.c @@ -47,11 +47,16 @@ ASTERISK_REGISTER_FILE(); #include "asterisk/app.h" #include "asterisk/stasis.h" #include "asterisk/stasis_message_router.h" +#include "asterisk/stasis_system.h" AST_RWLOCK_DEFINE_STATIC(event_types_lock); static void publish_mwi_to_stasis(struct ast_event *event); static void publish_device_state_to_stasis(struct ast_event *event); +static void publish_cluster_discovery_to_stasis(struct ast_event *event); + +/*! \brief All the nodes that we're aware of */ +static struct ao2_container *nodes; /*! \brief The internal topic used for message forwarding and pings */ static struct stasis_topic *corosync_aggregate_topic; @@ -65,6 +70,78 @@ static struct stasis_topic *corosync_topic(void) return corosync_aggregate_topic; } +struct corosync_node { + /*! The corosync ID */ + int id; + /*! The Asterisk EID */ + struct ast_eid eid; + /*! The IP address of the node */ + struct ast_sockaddr addr; +}; + +static struct corosync_node *corosync_node_alloc(struct ast_event *event) +{ + struct corosync_node *node; + + node = ao2_alloc_options(sizeof(*node), NULL, AO2_ALLOC_OPT_LOCK_NOLOCK); + if (!node) { + return NULL; + } + + memcpy(&node->eid, (struct ast_eid *)ast_event_get_ie_raw(event, AST_EVENT_IE_EID), sizeof(node->eid)); + node->id = ast_event_get_ie_uint(event, AST_EVENT_IE_NODE_ID); + ast_sockaddr_parse(&node->addr, ast_event_get_ie_str(event, AST_EVENT_IE_LOCAL_ADDR), PARSE_PORT_IGNORE); + + return node; +} + +static int corosync_node_hash_fn(const void *obj, const int flags) +{ + const struct corosync_node *node; + const int *id; + + switch (flags & OBJ_SEARCH_MASK) { + case OBJ_SEARCH_KEY: + id = obj; + break; + case OBJ_SEARCH_OBJECT: + node = obj; + id = &node->id; + break; + default: + ast_assert(0); + return 0; + } + return *id; +} + +static int corosync_node_cmp_fn(void *obj, void *arg, int flags) +{ + struct corosync_node *left = obj; + struct corosync_node *right = arg; + const int *id = arg; + int cmp; + + switch (flags & OBJ_SEARCH_MASK) { + case OBJ_SEARCH_OBJECT: + id = &right->id; + /* Fall through */ + case OBJ_SEARCH_KEY: + cmp = (left->id == *id); + break; + case OBJ_SEARCH_PARTIAL_KEY: + cmp = (left->id == right->id); + break; + default: + /* Sort can only work on something with a full or partial key. */ + ast_assert(0); + cmp = 1; + break; + } + return cmp ? CMP_MATCH : 0; +} + + /*! \brief A payload wrapper around a corosync ping event */ struct corosync_ping_payload { /*! The corosync ping event being passed over \ref stasis */ @@ -167,6 +244,12 @@ static struct { .topic_fn = corosync_topic, .message_type_fn = corosync_ping_message_type, .publish_to_stasis = publish_corosync_ping_to_stasis, }, + [AST_EVENT_CLUSTER_DISCOVERY] = { .name = "cluster_discovery", + .publish_default = 1, + .subscribe_default = 1, + .topic_fn = ast_system_topic, + .message_type_fn = ast_cluster_discovery_type, + .publish_to_stasis = publish_cluster_discovery_to_stasis, }, }; static struct { @@ -197,6 +280,97 @@ static corosync_cfg_callbacks_t cfg_callbacks = { .corosync_cfg_shutdown_callback = cfg_shutdown_cb, }; +/*! \brief Publish cluster discovery to \ref stasis */ +static void publish_cluster_discovery_to_stasis_full(struct corosync_node *node, int joined) +{ + struct ast_json *json; + struct ast_json_payload *payload; + struct stasis_message *message; + char eid[18]; + const char *addr; + + ast_eid_to_str(eid, sizeof(eid), &node->eid); + addr = ast_sockaddr_stringify_addr(&node->addr); + + ast_log(AST_LOG_NOTICE, "Node %u (%s) at %s %s the cluster\n", + node->id, + eid, + addr, + joined ? "joined" : "left"); + + json = ast_json_pack("{s: s, s: i, s: s, s: i}", + "address", addr, + "node_id", node->id, + "eid", eid, + "joined", joined); + if (!json) { + return; + } + + payload = ast_json_payload_create(json); + if (!payload) { + ast_json_unref(json); + return; + } + + message = stasis_message_create(ast_cluster_discovery_type(), payload); + if (!message) { + ast_json_unref(json); + ao2_ref(payload, -1); + return; + } + + stasis_publish(ast_system_topic(), message); + ast_json_unref(json); + ao2_ref(payload, -1); + ao2_ref(message, -1); +} + +static void send_cluster_notify(void); + +/*! \brief Publish a received cluster discovery \ref ast_event to \ref stasis */ +static void publish_cluster_discovery_to_stasis(struct ast_event *event) +{ + struct corosync_node *node; + int id = ast_event_get_ie_uint(event, AST_EVENT_IE_NODE_ID); + struct ast_eid *event_eid; + + ast_assert(ast_event_get_type(event) == AST_EVENT_CLUSTER_DISCOVERY); + + event_eid = (struct ast_eid *)ast_event_get_ie_raw(event, AST_EVENT_IE_EID); + if (!ast_eid_cmp(&ast_eid_default, event_eid)) { + /* Don't feed events back in that originated locally. */ + return; + } + + ao2_lock(nodes); + node = ao2_find(nodes, &id, OBJ_SEARCH_KEY | OBJ_NOLOCK); + if (node) { + /* We already know about this node */ + ao2_unlock(nodes); + ao2_ref(node, -1); + return; + } + + node = corosync_node_alloc(event); + if (!node) { + ao2_unlock(nodes); + return; + } + ao2_link_flags(nodes, node, OBJ_NOLOCK); + ao2_unlock(nodes); + + publish_cluster_discovery_to_stasis_full(node, 1); + + ao2_ref(node, -1); + + /* + * When we get news that someone else has joined, we need to let them + * know we exist as well. + */ + send_cluster_notify(); +} + /*! \brief Publish a received MWI \ref ast_event to \ref stasis */ static void publish_mwi_to_stasis(struct ast_event *event) { @@ -228,7 +402,7 @@ static void publish_mwi_to_stasis(struct ast_event *event) if (ast_publish_mwi_state_full(mailbox, context, (int)new_msgs, (int)old_msgs, NULL, event_eid)) { - char eid[16]; + char eid[18]; ast_eid_to_str(eid, sizeof(eid), event_eid); ast_log(LOG_WARNING, "Failed to publish MWI message for %s@%s from %s\n", mailbox, context, eid); @@ -255,7 +429,7 @@ static void publish_device_state_to_stasis(struct ast_event *event) } if (ast_publish_device_state_full(device, state, cachable, event_eid)) { - char eid[16]; + char eid[18]; ast_eid_to_str(eid, sizeof(eid), event_eid); ast_log(LOG_WARNING, "Failed to publish device state message for %s from %s\n", device, eid); @@ -342,10 +516,27 @@ static void cpg_deliver_cb(cpg_handle_t handle, const struct cpg_name *group_nam publish_handler(event); } -static void publish_to_corosync(struct stasis_message *message) +static void publish_event_to_corosync(struct ast_event *event) { cs_error_t cs_err; struct iovec iov; + + iov.iov_base = (void *)event; + iov.iov_len = ast_event_get_size(event); + + ast_debug(5, "Publishing event %s (%u) to corosync\n", + ast_event_get_type_name(event), ast_event_get_type(event)); + + /* The stasis subscription will only exist if we are configured to publish + * these events, so just send away. */ + if ((cs_err = cpg_mcast_joined(cpg_handle, CPG_TYPE_FIFO, &iov, 1)) != CS_OK) { + ast_log(LOG_WARNING, "CPG mcast failed (%u) for event %s (%u)\n", + cs_err, ast_event_get_type_name(event), ast_event_get_type(event)); + } +} + +static void publish_to_corosync(struct stasis_message *message) +{ struct ast_event *event; event = stasis_message_to_event(message); @@ -368,17 +559,7 @@ static void publish_to_corosync(struct stasis_message *message) ast_log(LOG_NOTICE, "Sending event PING from this server with EID: '%s'\n", buf); } - iov.iov_base = (void *)event; - iov.iov_len = ast_event_get_size(event); - - ast_debug(5, "Publishing event %s (%u) to corosync\n", - ast_event_get_type_name(event), ast_event_get_type(event)); - - /* The stasis subscription will only exist if we are configured to publish - * these events, so just send away. */ - if ((cs_err = cpg_mcast_joined(cpg_handle, CPG_TYPE_FIFO, &iov, 1)) != CS_OK) { - ast_log(LOG_WARNING, "CPG mcast failed (%u)\n", cs_err); - } + publish_event_to_corosync(event); } static void stasis_message_cb(void *data, struct stasis_subscription *sub, struct stasis_message *message) @@ -410,9 +591,22 @@ static void cpg_confchg_cb(cpg_handle_t handle, const struct cpg_name *group_nam { unsigned int i; + + for (i = 0; i < left_list_entries; i++) { + const struct cpg_address *cpg_node = &left_list[i]; + struct corosync_node* node; + + node = ao2_find(nodes, &cpg_node->nodeid, OBJ_UNLINK | OBJ_SEARCH_KEY); + if (!node) { + continue; + } + + publish_cluster_discovery_to_stasis_full(node, 0); + ao2_ref(node, -1); + } + /* If any new nodes have joined, dump our cache of events we are publishing * that originated from this server. */ - if (!joined_list_entries) { return; } @@ -442,6 +636,45 @@ static void cpg_confchg_cb(cpg_handle_t handle, const struct cpg_name *group_nam } } +/*! \brief Informs the cluster of our EID and our IP addresses */ +static void send_cluster_notify(void) +{ + struct ast_event *event; + unsigned int node_id; + cs_error_t cs_err; + corosync_cfg_node_address_t corosync_addr; + int num_addrs = 0; + struct sockaddr *sa; + size_t sa_len; + char buf[128]; + int res; + + if ((cs_err = corosync_cfg_local_get(cfg_handle, &node_id)) != CS_OK) { + ast_log(LOG_WARNING, "Failed to extract Corosync node ID for this node. Not informing cluster of existance.\n"); + return; + } + + if (((cs_err = corosync_cfg_get_node_addrs(cfg_handle, node_id, 1, &num_addrs, &corosync_addr)) != CS_OK) || (num_addrs < 1)) { + ast_log(LOG_WARNING, "Failed to get local Corosync address. Not informing cluster of existance.\n"); + return; + } + + sa = (struct sockaddr *)corosync_addr.address; + sa_len = (size_t)corosync_addr.address_length; + if ((res = getnameinfo(sa, sa_len, buf, sizeof(buf), NULL, 0, NI_NUMERICHOST))) { + ast_log(LOG_WARNING, "Failed to determine name of local Corosync address: %s (%d). Not informing cluster of existance.\n", + gai_strerror(res), res); + return; + } + + event = ast_event_new(AST_EVENT_CLUSTER_DISCOVERY, + AST_EVENT_IE_NODE_ID, AST_EVENT_IE_PLTYPE_UINT, node_id, + AST_EVENT_IE_LOCAL_ADDR, AST_EVENT_IE_PLTYPE_STR, buf, + AST_EVENT_IE_END); + publish_event_to_corosync(event); + ast_free(event); +} + static void *dispatch_thread_handler(void *data) { cs_error_t cs_err; @@ -463,6 +696,7 @@ static void *dispatch_thread_handler(void *data) pfd[2].fd = dispatch_thread.alert_pipe[0]; + send_cluster_notify(); while (!dispatch_thread.stop) { int res; @@ -530,6 +764,7 @@ static void *dispatch_thread_handler(void *data) } ast_log(LOG_NOTICE, "Corosync recovery complete.\n"); + send_cluster_notify(); } } @@ -858,6 +1093,9 @@ static void cleanup_module(void) ast_log(LOG_ERROR, "Failed to finalize cfg (%d)\n", (int) cs_err); } cfg_handle = 0; + + ao2_cleanup(nodes); + nodes = NULL; } static int load_module(void) @@ -865,6 +1103,11 @@ static int load_module(void) cs_error_t cs_err; struct cpg_name name; + nodes = ao2_container_alloc(23, corosync_node_hash_fn, corosync_node_cmp_fn); + if (!nodes) { + goto failed; + } + corosync_aggregate_topic = stasis_topic_create("corosync_aggregate_topic"); if (!corosync_aggregate_topic) { ast_log(AST_LOG_ERROR, "Failed to create stasis topic for corosync\n"); |