summaryrefslogtreecommitdiff
path: root/res/res_corosync.c
diff options
context:
space:
mode:
author: Matt Jordan <mjordan@digium.com> 2015-10-19 18:55:33 -0500
committer: Matt Jordan <mjordan@digium.com> 2016-07-13 09:11:37 -0500
commit: f12311ee696e4a623a4160aa0c2a83712088b5d3 (patch)
tree: d0a73289b9ae41a2eecc6ad3eef0caee799c9e4d /res/res_corosync.c
parent: 73d8cb587d6515b707c9cafe334e18fbd7c24905 (diff)
res/res_corosync: Raise a Stasis message on node join/leave events
When res_corosync detects that a node leaves or joins, it currently is informed of this via Corosync callbacks. However, there are a few limitations with the information presented: (1) While we have information that Corosync is aware of - such as the Corosync nodeid - that information is really only useful inside of Corosync or res_corosync. There's no way to translate a Corosync nodeid to some other internally useful unique identifier for the Asterisk instance that just joined or left the cluster. (2) While res_corosync is notified of the instance joining or leaving the cluster, it has no mechanism to inform the Asterisk core or other modules of this event. This limits the usefulness of res_corosync as a heartbeat mechanism for other modules. This patch addresses both issues. First, it adds the notion of a cluster discovery message both within the Stasis message bus, as well as the binary event messages that res_corosync uses to transmit data back and forth within the cluster. When Asterisk joins the cluster, it sends a discovery message to the other nodes in the cluster, which correlates the Corosync nodeid along with the Asterisk EID. res_corosync now maintains a hash of Corosync nodeids to Asterisk EIDs, such that it can map changes in cluster state with the Asterisk instance that has that nodeid. Likewise, when an Asterisk instance receives a discovery message from a node in the cluster, it now sends its own discovery message back to the originating node with the local Asterisk EID. This lets Asterisk instances within the cluster build a complete picture of the other Asterisk instances within the cluster. Second, it publishes the discovery messages onto the Stasis message bus. Said messages are published whenever a node joins or leaves the cluster. Interested modules can subscribe for the ast_cluster_discovery_type() message under the ast_system_topic() and be notified when changes in cluster state occur. Change-Id: I9015f418d6ae7f47e4994e04e18948df4d49b465
Diffstat (limited to 'res/res_corosync.c')
-rw-r--r-- res/res_corosync.c 273
1 files changed, 258 insertions, 15 deletions
diff --git a/res/res_corosync.c b/res/res_corosync.c
index 6642acd2c..675bb3307 100644
--- a/res/res_corosync.c
+++ b/res/res_corosync.c
@@ -47,11 +47,16 @@ ASTERISK_REGISTER_FILE();
#include "asterisk/app.h"
#include "asterisk/stasis.h"
#include "asterisk/stasis_message_router.h"
+#include "asterisk/stasis_system.h"
AST_RWLOCK_DEFINE_STATIC(event_types_lock);
static void publish_mwi_to_stasis(struct ast_event *event);
static void publish_device_state_to_stasis(struct ast_event *event);
+static void publish_cluster_discovery_to_stasis(struct ast_event *event);
+
+/*! \brief All the nodes that we're aware of */
+static struct ao2_container *nodes;
/*! \brief The internal topic used for message forwarding and pings */
static struct stasis_topic *corosync_aggregate_topic;
@@ -65,6 +70,78 @@ static struct stasis_topic *corosync_topic(void)
return corosync_aggregate_topic;
}
+struct corosync_node {
+ /*! The corosync ID; the nodes container hashes and compares entries on this field */
+ int id;
+ /*! The Asterisk EID, copied from the discovery event's EID information element */
+ struct ast_eid eid;
+ /*! The IP address of the node, parsed from the discovery event's local address element */
+ struct ast_sockaddr addr;
+};
+
+/*!
+ * \brief Allocate a node record from a received cluster discovery event
+ *
+ * \param event The discovery event. Its EID, node ID and local address
+ *        information elements are copied into the new node.
+ *
+ * \return A new ao2 object (allocated without its own lock) whose reference
+ *         the caller owns, or NULL if the allocation failed or the event
+ *         carries no EID or no local address.
+ */
+static struct corosync_node *corosync_node_alloc(struct ast_event *event)
+{
+	struct corosync_node *node;
+	const struct ast_eid *eid;
+	const char *addr;
+
+	/*
+	 * The event was built by another cluster member, so do not assume it
+	 * is complete: an absent element is reported as NULL, which must not
+	 * reach memcpy() or the address parser.
+	 */
+	eid = ast_event_get_ie_raw(event, AST_EVENT_IE_EID);
+	addr = ast_event_get_ie_str(event, AST_EVENT_IE_LOCAL_ADDR);
+	if (!eid || !addr) {
+		ast_log(LOG_WARNING, "Ignoring cluster discovery event without an EID or local address\n");
+		return NULL;
+	}
+
+	node = ao2_alloc_options(sizeof(*node), NULL, AO2_ALLOC_OPT_LOCK_NOLOCK);
+	if (!node) {
+		return NULL;
+	}
+
+	memcpy(&node->eid, eid, sizeof(node->eid));
+	node->id = ast_event_get_ie_uint(event, AST_EVENT_IE_NODE_ID);
+	ast_sockaddr_parse(&node->addr, addr, PARSE_PORT_IGNORE);
+
+	return node;
+}
+
+/*!
+ * \brief Hash callback for the nodes container
+ *
+ * \param obj Either a corosync_node (OBJ_SEARCH_OBJECT) or a pointer to an
+ *        int node ID (OBJ_SEARCH_KEY)
+ * \param flags Search flags selecting how \a obj is interpreted
+ *
+ * \return The node ID, used directly as the hash value; 0 for any other
+ *         search type (which also trips an assertion)
+ */
+static int corosync_node_hash_fn(const void *obj, const int flags)
+{
+	switch (flags & OBJ_SEARCH_MASK) {
+	case OBJ_SEARCH_OBJECT:
+		return ((const struct corosync_node *)obj)->id;
+	case OBJ_SEARCH_KEY:
+		return *(const int *)obj;
+	default:
+		break;
+	}
+
+	/* Only a full object or a full key can be hashed. */
+	ast_assert(0);
+	return 0;
+}
+
+/*!
+ * \brief Comparison callback for the nodes container
+ *
+ * \param obj The container entry being examined (a corosync_node)
+ * \param arg What to compare against: a pointer to an int node ID for
+ *        OBJ_SEARCH_KEY, otherwise read as a corosync_node
+ * \param flags Search flags selecting how \a arg is interpreted
+ *
+ * \return CMP_MATCH when the node IDs are equal, 0 otherwise. An
+ *         unrecognised search type asserts and is reported as a match.
+ */
+static int corosync_node_cmp_fn(void *obj, void *arg, int flags)
+{
+	const struct corosync_node *candidate = obj;
+	int matched;
+
+	switch (flags & OBJ_SEARCH_MASK) {
+	case OBJ_SEARCH_KEY:
+		/* arg points at a bare node ID */
+		matched = (candidate->id == *(const int *)arg);
+		break;
+	case OBJ_SEARCH_OBJECT:
+	case OBJ_SEARCH_PARTIAL_KEY:
+		/* arg is treated as another node; compare the IDs */
+		matched = (candidate->id == ((const struct corosync_node *)arg)->id);
+		break;
+	default:
+		/* No usable search type was supplied. */
+		ast_assert(0);
+		matched = 1;
+		break;
+	}
+
+	if (matched) {
+		return CMP_MATCH;
+	}
+	return 0;
+}
+
+
/*! \brief A payload wrapper around a corosync ping event */
struct corosync_ping_payload {
/*! The corosync ping event being passed over \ref stasis */
@@ -167,6 +244,12 @@ static struct {
.topic_fn = corosync_topic,
.message_type_fn = corosync_ping_message_type,
.publish_to_stasis = publish_corosync_ping_to_stasis, },
+ [AST_EVENT_CLUSTER_DISCOVERY] = { .name = "cluster_discovery",
+ .publish_default = 1,
+ .subscribe_default = 1,
+ .topic_fn = ast_system_topic,
+ .message_type_fn = ast_cluster_discovery_type,
+ .publish_to_stasis = publish_cluster_discovery_to_stasis, },
};
static struct {
@@ -197,6 +280,97 @@ static corosync_cfg_callbacks_t cfg_callbacks = {
.corosync_cfg_shutdown_callback = cfg_shutdown_cb,
};
+/*!
+ * \brief Publish cluster discovery to \ref stasis
+ *
+ * \param node The node whose cluster membership changed
+ * \param joined Non-zero if the node joined, zero if it left
+ *
+ * Logs the change, then publishes a JSON payload with the keys "address",
+ * "node_id", "eid" and "joined" as an ast_cluster_discovery_type() message
+ * on ast_system_topic(). If any object cannot be created, nothing is
+ * published.
+ */
+static void publish_cluster_discovery_to_stasis_full(struct corosync_node *node, int joined)
+{
+	char eid_str[18];
+	const char *addr_str;
+	struct ast_json *blob;
+	struct ast_json_payload *wrapper;
+	struct stasis_message *msg = NULL;
+
+	ast_eid_to_str(eid_str, sizeof(eid_str), &node->eid);
+	addr_str = ast_sockaddr_stringify_addr(&node->addr);
+
+	ast_log(AST_LOG_NOTICE, "Node %u (%s) at %s %s the cluster\n",
+		node->id, eid_str, addr_str, joined ? "joined" : "left");
+
+	blob = ast_json_pack("{s: s, s: i, s: s, s: i}",
+		"address", addr_str,
+		"node_id", node->id,
+		"eid", eid_str,
+		"joined", joined);
+	if (!blob) {
+		return;
+	}
+
+	wrapper = ast_json_payload_create(blob);
+	if (wrapper) {
+		msg = stasis_message_create(ast_cluster_discovery_type(), wrapper);
+	}
+	if (msg) {
+		stasis_publish(ast_system_topic(), msg);
+	}
+
+	/*
+	 * Release our references in one place; ao2_cleanup() is used so the
+	 * paths where the payload or message was never created fall through.
+	 */
+	ast_json_unref(blob);
+	ao2_cleanup(wrapper);
+	ao2_cleanup(msg);
+}
+
+static void send_cluster_notify(void);
+
+/*!
+ * \brief Publish a received cluster discovery \ref ast_event to \ref stasis
+ *
+ * \param event An AST_EVENT_CLUSTER_DISCOVERY event received from the cluster
+ *
+ * Events carrying the local EID, events with no EID at all, and events for
+ * a node ID already in the nodes container are ignored. Otherwise the node
+ * is recorded, a "joined" message is published, and our own discovery
+ * message is sent so the newcomer learns about us too.
+ */
+static void publish_cluster_discovery_to_stasis(struct ast_event *event)
+{
+	struct corosync_node *node;
+	const struct ast_eid *event_eid;
+	int id;
+
+	ast_assert(ast_event_get_type(event) == AST_EVENT_CLUSTER_DISCOVERY);
+
+	event_eid = ast_event_get_ie_raw(event, AST_EVENT_IE_EID);
+	if (!event_eid) {
+		/* Without an EID the sender cannot be identified; drop the event. */
+		return;
+	}
+	if (!ast_eid_cmp(&ast_eid_default, event_eid)) {
+		/* Don't feed events back in that originated locally. */
+		return;
+	}
+
+	id = ast_event_get_ie_uint(event, AST_EVENT_IE_NODE_ID);
+
+	/* Hold the container lock across find + link so the check-then-insert is atomic. */
+	ao2_lock(nodes);
+	node = ao2_find(nodes, &id, OBJ_SEARCH_KEY | OBJ_NOLOCK);
+	if (node) {
+		/* We already know about this node */
+		ao2_unlock(nodes);
+		ao2_ref(node, -1);
+		return;
+	}
+
+	node = corosync_node_alloc(event);
+	if (!node) {
+		ao2_unlock(nodes);
+		return;
+	}
+	ao2_link_flags(nodes, node, OBJ_NOLOCK);
+	ao2_unlock(nodes);
+
+	publish_cluster_discovery_to_stasis_full(node, 1);
+
+	ao2_ref(node, -1);
+
+	/*
+	 * When we get news that someone else has joined, we need to let them
+	 * know we exist as well.
+	 */
+	send_cluster_notify();
+}
+
/*! \brief Publish a received MWI \ref ast_event to \ref stasis */
static void publish_mwi_to_stasis(struct ast_event *event)
{
@@ -228,7 +402,7 @@ static void publish_mwi_to_stasis(struct ast_event *event)
if (ast_publish_mwi_state_full(mailbox, context, (int)new_msgs,
(int)old_msgs, NULL, event_eid)) {
- char eid[16];
+ char eid[18];
ast_eid_to_str(eid, sizeof(eid), event_eid);
ast_log(LOG_WARNING, "Failed to publish MWI message for %s@%s from %s\n",
mailbox, context, eid);
@@ -255,7 +429,7 @@ static void publish_device_state_to_stasis(struct ast_event *event)
}
if (ast_publish_device_state_full(device, state, cachable, event_eid)) {
- char eid[16];
+ char eid[18];
ast_eid_to_str(eid, sizeof(eid), event_eid);
ast_log(LOG_WARNING, "Failed to publish device state message for %s from %s\n",
device, eid);
@@ -342,10 +516,27 @@ static void cpg_deliver_cb(cpg_handle_t handle, const struct cpg_name *group_nam
publish_handler(event);
}
-static void publish_to_corosync(struct stasis_message *message)
+/*!
+ * \brief Multicast a single \ref ast_event to the cluster
+ *
+ * \param event The event to send. Its raw bytes, ast_event_get_size() long,
+ *        are handed to cpg_mcast_joined() as one iovec with CPG_TYPE_FIFO.
+ *        NOTE(review): event is used without a NULL check here; callers
+ *        presumably must pass a valid event - confirm.
+ *
+ * A failed multicast is only logged; nothing is reported to the caller.
+ */
+static void publish_event_to_corosync(struct ast_event *event)
{
cs_error_t cs_err;
struct iovec iov;
+
+ iov.iov_base = (void *)event;
+ iov.iov_len = ast_event_get_size(event);
+
+ ast_debug(5, "Publishing event %s (%u) to corosync\n",
+ ast_event_get_type_name(event), ast_event_get_type(event));
+
+ /* The stasis subscription will only exist if we are configured to publish
+ * these events, so just send away. */
+ if ((cs_err = cpg_mcast_joined(cpg_handle, CPG_TYPE_FIFO, &iov, 1)) != CS_OK) {
+ ast_log(LOG_WARNING, "CPG mcast failed (%u) for event %s (%u)\n",
+ cs_err, ast_event_get_type_name(event), ast_event_get_type(event));
+ }
+}
+
+static void publish_to_corosync(struct stasis_message *message)
+{
struct ast_event *event;
event = stasis_message_to_event(message);
@@ -368,17 +559,7 @@ static void publish_to_corosync(struct stasis_message *message)
ast_log(LOG_NOTICE, "Sending event PING from this server with EID: '%s'\n", buf);
}
- iov.iov_base = (void *)event;
- iov.iov_len = ast_event_get_size(event);
-
- ast_debug(5, "Publishing event %s (%u) to corosync\n",
- ast_event_get_type_name(event), ast_event_get_type(event));
-
- /* The stasis subscription will only exist if we are configured to publish
- * these events, so just send away. */
- if ((cs_err = cpg_mcast_joined(cpg_handle, CPG_TYPE_FIFO, &iov, 1)) != CS_OK) {
- ast_log(LOG_WARNING, "CPG mcast failed (%u)\n", cs_err);
- }
+ publish_event_to_corosync(event);
}
static void stasis_message_cb(void *data, struct stasis_subscription *sub, struct stasis_message *message)
@@ -410,9 +591,22 @@ static void cpg_confchg_cb(cpg_handle_t handle, const struct cpg_name *group_nam
{
unsigned int i;
+
+ for (i = 0; i < left_list_entries; i++) {
+ const struct cpg_address *cpg_node = &left_list[i];
+ struct corosync_node* node;
+
+ node = ao2_find(nodes, &cpg_node->nodeid, OBJ_UNLINK | OBJ_SEARCH_KEY);
+ if (!node) {
+ continue;
+ }
+
+ publish_cluster_discovery_to_stasis_full(node, 0);
+ ao2_ref(node, -1);
+ }
+
/* If any new nodes have joined, dump our cache of events we are publishing
* that originated from this server. */
-
if (!joined_list_entries) {
return;
}
@@ -442,6 +636,45 @@ static void cpg_confchg_cb(cpg_handle_t handle, const struct cpg_name *group_nam
}
}
+/*!
+ * \brief Informs the cluster of our EID and our IP addresses
+ *
+ * Looks up the local Corosync node ID and its first address, converts the
+ * address to numeric text, and multicasts an AST_EVENT_CLUSTER_DISCOVERY
+ * event carrying the node ID and that address. Any failure along the way
+ * is logged and the notification is skipped.
+ *
+ * NOTE(review): no EID element is added explicitly here; presumably
+ * ast_event_new() attaches the local EID - confirm.
+ */
+static void send_cluster_notify(void)
+{
+	struct ast_event *event;
+	unsigned int node_id;
+	cs_error_t cs_err;
+	corosync_cfg_node_address_t corosync_addr;
+	int num_addrs = 0;
+	struct sockaddr *sa;
+	size_t sa_len;
+	char buf[128];
+	int res;
+
+	if ((cs_err = corosync_cfg_local_get(cfg_handle, &node_id)) != CS_OK) {
+		ast_log(LOG_WARNING, "Failed to extract Corosync node ID for this node. Not informing cluster of existance.\n");
+		return;
+	}
+
+	/* Only the first address is requested (max of 1). */
+	if (((cs_err = corosync_cfg_get_node_addrs(cfg_handle, node_id, 1, &num_addrs, &corosync_addr)) != CS_OK) || (num_addrs < 1)) {
+		ast_log(LOG_WARNING, "Failed to get local Corosync address. Not informing cluster of existance.\n");
+		return;
+	}
+
+	sa = (struct sockaddr *)corosync_addr.address;
+	sa_len = (size_t)corosync_addr.address_length;
+	if ((res = getnameinfo(sa, sa_len, buf, sizeof(buf), NULL, 0, NI_NUMERICHOST))) {
+		ast_log(LOG_WARNING, "Failed to determine name of local Corosync address: %s (%d). Not informing cluster of existance.\n",
+			gai_strerror(res), res);
+		return;
+	}
+
+	event = ast_event_new(AST_EVENT_CLUSTER_DISCOVERY,
+		AST_EVENT_IE_NODE_ID, AST_EVENT_IE_PLTYPE_UINT, node_id,
+		AST_EVENT_IE_LOCAL_ADDR, AST_EVENT_IE_PLTYPE_STR, buf,
+		AST_EVENT_IE_END);
+	if (!event) {
+		/* publish_event_to_corosync() reads the event unconditionally. */
+		ast_log(LOG_WARNING, "Failed to allocate cluster discovery event. Not informing cluster.\n");
+		return;
+	}
+
+	publish_event_to_corosync(event);
+	ast_free(event);
+}
+
static void *dispatch_thread_handler(void *data)
{
cs_error_t cs_err;
@@ -463,6 +696,7 @@ static void *dispatch_thread_handler(void *data)
pfd[2].fd = dispatch_thread.alert_pipe[0];
+ send_cluster_notify();
while (!dispatch_thread.stop) {
int res;
@@ -530,6 +764,7 @@ static void *dispatch_thread_handler(void *data)
}
ast_log(LOG_NOTICE, "Corosync recovery complete.\n");
+ send_cluster_notify();
}
}
@@ -858,6 +1093,9 @@ static void cleanup_module(void)
ast_log(LOG_ERROR, "Failed to finalize cfg (%d)\n", (int) cs_err);
}
cfg_handle = 0;
+
+ ao2_cleanup(nodes);
+ nodes = NULL;
}
static int load_module(void)
@@ -865,6 +1103,11 @@ static int load_module(void)
cs_error_t cs_err;
struct cpg_name name;
+ nodes = ao2_container_alloc(23, corosync_node_hash_fn, corosync_node_cmp_fn);
+ if (!nodes) {
+ goto failed;
+ }
+
corosync_aggregate_topic = stasis_topic_create("corosync_aggregate_topic");
if (!corosync_aggregate_topic) {
ast_log(AST_LOG_ERROR, "Failed to create stasis topic for corosync\n");