summaryrefslogtreecommitdiff
path: root/res/res_corosync.c
diff options
context:
space:
mode:
authorMatthew Jordan <mjordan@digium.com>2014-05-22 12:01:37 +0000
committerMatthew Jordan <mjordan@digium.com>2014-05-22 12:01:37 +0000
commit9cee08f50240b7da7ddd5efc04a8ef0d31292e7a (patch)
tree0180f85de28dce14f04ed17a979152bf8d362d9d /res/res_corosync.c
parent3bac303dc95afbe60069247eda469d9b48cfd701 (diff)
res_corosync: Update module to work with Stasis (and compile)
This patch fixes res_corosync such that it works with Asterisk 12. This restores the functionality that was present in previous versions of Asterisk, and ensures compatibility with those versions by restoring the binary message format needed to pass information from/to them. The following changes were made in the core to support this: * The event system has been partially restored. All event definition and event types in this patch were pulled from Asterisk 11. Previously, we had hoped that this information would live in res_corosync; however, the approach in this patch seems to be better for a few reasons: (1) Theoretically, ast_events can be used by any module as a binary representation of a Stasis message. Given the structure of an ast_event object, that information has to live in the core to be used universally. For example, defining the payload of a device state ast_event in res_corosync could result in an incompatible device state representation in another module. (2) Much of this representation already lived in the core, and was not easily extensible. (3) The code already existed. :-) * Stasis message types now have a message formatter that converts their payload to an ast_event object. * Stasis message forwarders now handle forwarding to themselves. Previously this would result in an infinite recursive call. Now, this simply creates a new forwarding object with no forwards set up (as it is the thing it is forwarding to). This is advantageous for res_corosync, as returning NULL would also imply an unrecoverable error. Returning a subscription in this case allows for easier handling of message types that are published directly to an aggregate topic that has forwarders. Review: https://reviewboard.asterisk.org/r/3486/ ASTERISK-22912 #close ASTERISK-22372 #close ........ Merged revisions 414330 from http://svn.asterisk.org/svn/asterisk/branches/12 git-svn-id: https://origsvn.digium.com/svn/asterisk/trunk@414331 65c4cc65-6c06-0410-ace0-fbb531ad65f3
Diffstat (limited to 'res/res_corosync.c')
-rw-r--r--res/res_corosync.c390
1 files changed, 323 insertions, 67 deletions
diff --git a/res/res_corosync.c b/res/res_corosync.c
index ce4165498..6c4a3d1e7 100644
--- a/res/res_corosync.c
+++ b/res/res_corosync.c
@@ -44,20 +44,125 @@ ASTERISK_FILE_VERSION(__FILE__, "$Revision$");
#include "asterisk/event.h"
#include "asterisk/cli.h"
#include "asterisk/devicestate.h"
+#include "asterisk/app.h"
+#include "asterisk/stasis.h"
+#include "asterisk/stasis_message_router.h"
AST_RWLOCK_DEFINE_STATIC(event_types_lock);
+static void publish_mwi_to_stasis(struct ast_event *event);
+static void publish_device_state_to_stasis(struct ast_event *event);
+
+/*! \brief The internal topic used for message forwarding and pings */
+static struct stasis_topic *corosync_aggregate_topic;
+
+/*! \brief Our \ref stasis message router */
+static struct stasis_message_router *stasis_router;
+
+/*! \brief Internal accessor for our topic */
+static struct stasis_topic *corosync_topic(void)
+{
+ return corosync_aggregate_topic;
+}
+
+/*! \brief A payload wrapper around a corosync ping event */
+struct corosync_ping_payload {
+ /*! The corosync ping event being passed over \ref stasis */
+ struct ast_event *event;
+};
+
+/*! \brief Destructor for the \ref corosync_ping_payload wrapper object */
+static void corosync_ping_payload_dtor(void *obj)
+{
+ struct corosync_ping_payload *payload = obj;
+
+ ast_free(payload->event);
+}
+
+/*! \brief Convert a Corosync PING to a \ref ast_event */
+static struct ast_event *corosync_ping_to_event(struct stasis_message *message)
+{
+ struct corosync_ping_payload *payload;
+ struct ast_event *event;
+ struct ast_eid *event_eid;
+
+ if (!message) {
+ return NULL;
+ }
+
+ payload = stasis_message_data(message);
+
+ if (!payload->event) {
+ return NULL;
+ }
+
+ event_eid = (struct ast_eid *)ast_event_get_ie_raw(payload->event, AST_EVENT_IE_EID);
+
+ event = ast_event_new(AST_EVENT_PING,
+ AST_EVENT_IE_EID, AST_EVENT_IE_PLTYPE_RAW, event_eid, sizeof(*event_eid),
+ AST_EVENT_IE_END);
+
+ return event;
+}
+
+STASIS_MESSAGE_TYPE_DEFN_LOCAL(corosync_ping_message_type,
+ .to_event = corosync_ping_to_event, );
+
+/*! \brief Publish a Corosync ping to \ref stasis */
+static void publish_corosync_ping_to_stasis(struct ast_event *event)
+{
+ struct corosync_ping_payload *payload;
+ struct stasis_message *message;
+
+ ast_assert(ast_event_get_type(event) == AST_EVENT_PING);
+ ast_assert(event != NULL);
+
+ payload = ao2_t_alloc(sizeof(*payload), corosync_ping_payload_dtor, "Create ping payload");
+ if (!payload) {
+ return;
+ }
+ payload->event = event;
+
+ message = stasis_message_create(corosync_ping_message_type(), payload);
+ if (!message) {
+ ao2_t_ref(payload, -1, "Destroy payload on off nominal");
+ return;
+ }
+
+ stasis_publish(corosync_topic(), message);
+
+ ao2_t_ref(payload, -1, "Hand ref to stasis");
+ ao2_t_ref(message, -1, "Hand ref to stasis");
+}
+
static struct {
const char *name;
- struct ast_event_sub *sub;
+ struct stasis_forward *sub;
unsigned char publish;
unsigned char publish_default;
unsigned char subscribe;
unsigned char subscribe_default;
+ struct stasis_topic *(* topic_fn)(void);
+ struct stasis_cache *(* cache_fn)(void);
+ struct stasis_message_type *(* message_type_fn)(void);
+ void (* publish_to_stasis)(struct ast_event *);
} event_types[] = {
- [AST_EVENT_MWI] = { .name = "mwi", },
- [AST_EVENT_DEVICE_STATE_CHANGE] = { .name = "device_state", },
- [AST_EVENT_PING] = { .name = "ping", .publish_default = 1, .subscribe_default = 1 },
+ [AST_EVENT_MWI] = { .name = "mwi",
+ .topic_fn = ast_mwi_topic_all,
+ .cache_fn = ast_mwi_state_cache,
+ .message_type_fn = ast_mwi_state_type,
+ .publish_to_stasis = publish_mwi_to_stasis, },
+ [AST_EVENT_DEVICE_STATE_CHANGE] = { .name = "device_state",
+ .topic_fn = ast_device_state_topic_all,
+ .cache_fn = ast_device_state_cache,
+ .message_type_fn = ast_device_state_message_type,
+ .publish_to_stasis = publish_device_state_to_stasis, },
+ [AST_EVENT_PING] = { .name = "ping",
+ .publish_default = 1,
+ .subscribe_default = 1,
+ .topic_fn = corosync_topic,
+ .message_type_fn = corosync_ping_message_type,
+ .publish_to_stasis = publish_corosync_ping_to_stasis, },
};
static struct {
@@ -88,6 +193,71 @@ static corosync_cfg_callbacks_t cfg_callbacks = {
.corosync_cfg_shutdown_callback = cfg_shutdown_cb,
};
+/*! \brief Publish a received MWI \ref ast_event to \ref stasis */
+static void publish_mwi_to_stasis(struct ast_event *event)
+{
+ const char *mailbox;
+ const char *context;
+ unsigned int new_msgs;
+ unsigned int old_msgs;
+ struct ast_eid *event_eid;
+
+ ast_assert(ast_event_get_type(event) == AST_EVENT_MWI);
+
+ mailbox = ast_event_get_ie_str(event, AST_EVENT_IE_MAILBOX);
+ context = ast_event_get_ie_str(event, AST_EVENT_IE_CONTEXT);
+ new_msgs = ast_event_get_ie_uint(event, AST_EVENT_IE_NEWMSGS);
+ old_msgs = ast_event_get_ie_uint(event, AST_EVENT_IE_OLDMSGS);
+ event_eid = (struct ast_eid *)ast_event_get_ie_raw(event, AST_EVENT_IE_EID);
+
+ if (ast_strlen_zero(mailbox) || ast_strlen_zero(context)) {
+ return;
+ }
+
+ if (new_msgs > INT_MAX) {
+ new_msgs = INT_MAX;
+ }
+
+ if (old_msgs > INT_MAX) {
+ old_msgs = INT_MAX;
+ }
+
+ if (ast_publish_mwi_state_full(mailbox, context, (int)new_msgs,
+ (int)old_msgs, NULL, event_eid)) {
+ char eid[16];
+ ast_eid_to_str(eid, sizeof(eid), event_eid);
+ ast_log(LOG_WARNING, "Failed to publish MWI message for %s@%s from %s\n",
+ mailbox, context, eid);
+ }
+}
+
+/*! \brief Publish a received device state \ref ast_event to \ref stasis */
+static void publish_device_state_to_stasis(struct ast_event *event)
+{
+ const char *device;
+ enum ast_device_state state;
+ unsigned int cachable;
+ struct ast_eid *event_eid;
+
+ ast_assert(ast_event_get_type(event) == AST_EVENT_DEVICE_STATE_CHANGE);
+
+ device = ast_event_get_ie_str(event, AST_EVENT_IE_DEVICE);
+ state = ast_event_get_ie_uint(event, AST_EVENT_IE_STATE);
+ cachable = ast_event_get_ie_uint(event, AST_EVENT_IE_CACHABLE);
+ event_eid = (struct ast_eid *)ast_event_get_ie_raw(event, AST_EVENT_IE_EID);
+
+ if (ast_strlen_zero(device)) {
+ return;
+ }
+
+ if (ast_publish_device_state_full(device, state, cachable, event_eid)) {
+ char eid[16];
+ ast_eid_to_str(eid, sizeof(eid), event_eid);
+ ast_log(LOG_WARNING, "Failed to publish device state message for %s from %s\n",
+ device, eid);
+ }
+}
+
static void cpg_deliver_cb(cpg_handle_t handle, const struct cpg_name *group_name,
uint32_t nodeid, uint32_t pid, void *msg, size_t msg_len);
@@ -101,8 +271,6 @@ static cpg_callbacks_t cpg_callbacks = {
.cpg_confchg_fn = cpg_confchg_cb,
};
-static void ast_event_cb(const struct ast_event *event, void *data);
-
#ifdef HAVE_COROSYNC_CFG_STATE_TRACK
static void cfg_state_track_cb(
corosync_cfg_state_notification_buffer_t *notification_buffer,
@@ -120,6 +288,8 @@ static void cpg_deliver_cb(cpg_handle_t handle, const struct cpg_name *group_nam
uint32_t nodeid, uint32_t pid, void *msg, size_t msg_len)
{
struct ast_event *event;
+ void (*publish_handler)(struct ast_event *) = NULL;
+ enum ast_event_type event_type;
if (msg_len < ast_event_minimum_length()) {
ast_debug(1, "Ignoring event that's too small. %u < %u\n",
@@ -133,9 +303,17 @@ static void cpg_deliver_cb(cpg_handle_t handle, const struct cpg_name *group_nam
return;
}
+ event_type = ast_event_get_type(msg);
+ if (event_type > AST_EVENT_TOTAL) {
+ /* Egads, we don't support this */
+ return;
+ }
+
ast_rwlock_rdlock(&event_types_lock);
- if (!event_types[ast_event_get_type(msg)].subscribe) {
- /* We are not configured to subscribe to these events. */
+ publish_handler = event_types[event_type].publish_to_stasis;
+ if (!event_types[event_type].subscribe || !publish_handler) {
+ /* We are not configured to subscribe to these events or
+ we have no way to publish it internally. */
ast_rwlock_unlock(&event_types_lock);
return;
}
@@ -147,20 +325,80 @@ static void cpg_deliver_cb(cpg_handle_t handle, const struct cpg_name *group_nam
memcpy(event, msg, msg_len);
+ if (event_type == AST_EVENT_PING) {
+ const struct ast_eid *eid;
+ char buf[128] = "";
+
+ eid = ast_event_get_ie_raw(event, AST_EVENT_IE_EID);
+ ast_eid_to_str(buf, sizeof(buf), (struct ast_eid *) eid);
+ ast_log(LOG_NOTICE, "Got event PING from server with EID: '%s'\n", buf);
+ }
+ ast_debug(5, "Publishing event %s (%d) to stasis\n",
+ ast_event_get_type_name(event), event_type);
+ publish_handler(event);
+}
+
+static void publish_to_corosync(struct stasis_message *message)
+{
+ cs_error_t cs_err;
+ struct iovec iov;
+ struct ast_event *event;
+
+ event = stasis_message_to_event(message);
+ if (!event) {
+ return;
+ }
+
+ if (ast_eid_cmp(&ast_eid_default, ast_event_get_ie_raw(event, AST_EVENT_IE_EID))) {
+ /* If the event didn't originate from this server, don't send it back out. */
+ ast_event_destroy(event);
+ return;
+ }
+
if (ast_event_get_type(event) == AST_EVENT_PING) {
const struct ast_eid *eid;
char buf[128] = "";
eid = ast_event_get_ie_raw(event, AST_EVENT_IE_EID);
ast_eid_to_str(buf, sizeof(buf), (struct ast_eid *) eid);
- ast_log(LOG_NOTICE, "(cpg_deliver_cb) Got event PING from server with EID: '%s'\n", buf);
+ ast_log(LOG_NOTICE, "Sending event PING from this server with EID: '%s'\n", buf);
+ }
+
+ iov.iov_base = (void *)event;
+ iov.iov_len = ast_event_get_size(event);
+
+ ast_debug(5, "Publishing event %s (%d) to corosync\n",
+ ast_event_get_type_name(event), ast_event_get_type(event));
- ast_event_queue(event);
- } else {
- ast_event_queue_and_cache(event);
+ /* The stasis subscription will only exist if we are configured to publish
+ * these events, so just send away. */
+ if ((cs_err = cpg_mcast_joined(cpg_handle, CPG_TYPE_FIFO, &iov, 1)) != CS_OK) {
+ ast_log(LOG_WARNING, "CPG mcast failed (%d)\n", cs_err);
}
}
+static void stasis_message_cb(void *data, struct stasis_subscription *sub, struct stasis_message *message)
+{
+ if (!message) {
+ return;
+ }
+
+ publish_to_corosync(message);
+}
+
+static int dump_cache_cb(void *obj, void *arg, int flags)
+{
+ struct stasis_message *message = obj;
+
+ if (!message) {
+ return 0;
+ }
+
+ publish_to_corosync(message);
+
+ return 0;
+}
+
static void cpg_confchg_cb(cpg_handle_t handle, const struct cpg_name *group_name,
const struct cpg_address *member_list, size_t member_list_entries,
const struct cpg_address *left_list, size_t left_list_entries,
@@ -176,20 +414,27 @@ static void cpg_confchg_cb(cpg_handle_t handle, const struct cpg_name *group_nam
}
for (i = 0; i < ARRAY_LEN(event_types); i++) {
- struct ast_event_sub *event_sub;
+ struct ao2_container *messages;
ast_rwlock_rdlock(&event_types_lock);
if (!event_types[i].publish) {
ast_rwlock_unlock(&event_types_lock);
continue;
}
+
+ if (!event_types[i].cache_fn || !event_types[i].message_type_fn) {
+ ast_rwlock_unlock(&event_types_lock);
+ continue;
+ }
+
+ messages = stasis_cache_dump_by_eid(event_types[i].cache_fn(),
+ event_types[i].message_type_fn(),
+ &ast_eid_default);
ast_rwlock_unlock(&event_types_lock);
- event_sub = ast_event_subscribe_new(i, ast_event_cb, NULL);
- ast_event_sub_append_ie_raw(event_sub, AST_EVENT_IE_EID,
- &ast_eid_default, sizeof(ast_eid_default));
- ast_event_dump_cache(event_sub);
- ast_event_sub_destroy(event_sub);
+ ao2_callback(messages, OBJ_NODATA, dump_cache_cb, NULL);
+
+ ao2_t_ref(messages, -1, "Dispose of dumped cache");
}
}
@@ -231,13 +476,13 @@ static void *dispatch_thread_handler(void *data)
if (pfd[0].revents & POLLIN) {
if ((cs_err = cpg_dispatch(cpg_handle, CS_DISPATCH_ALL)) != CS_OK) {
- ast_log(LOG_WARNING, "Failed CPG dispatch: %u\n", cs_err);
+ ast_log(LOG_WARNING, "Failed CPG dispatch: %d\n", cs_err);
}
}
if (pfd[1].revents & POLLIN) {
if ((cs_err = corosync_cfg_dispatch(cfg_handle, CS_DISPATCH_ALL)) != CS_OK) {
- ast_log(LOG_WARNING, "Failed CFG dispatch: %u\n", cs_err);
+ ast_log(LOG_WARNING, "Failed CFG dispatch: %d\n", cs_err);
}
}
@@ -287,37 +532,6 @@ static void *dispatch_thread_handler(void *data)
return NULL;
}
-static void ast_event_cb(const struct ast_event *event, void *data)
-{
- cs_error_t cs_err;
- struct iovec iov = {
- .iov_base = (void *) event,
- .iov_len = ast_event_get_size(event),
- };
-
- if (ast_event_get_type(event) == AST_EVENT_PING) {
- const struct ast_eid *eid;
- char buf[128] = "";
-
- eid = ast_event_get_ie_raw(event, AST_EVENT_IE_EID);
- ast_eid_to_str(buf, sizeof(buf), (struct ast_eid *) eid);
- ast_log(LOG_NOTICE, "(ast_event_cb) Got event PING from server with EID: '%s'\n", buf);
- }
-
- if (ast_eid_cmp(&ast_eid_default,
- ast_event_get_ie_raw(event, AST_EVENT_IE_EID))) {
- /* If the event didn't originate from this server, don't send it back out. */
- return;
- }
-
- /* The ast_event subscription will only exist if we are configured to publish
- * these events, so just send away. */
-
- if ((cs_err = cpg_mcast_joined(cpg_handle, CPG_TYPE_FIFO, &iov, 1)) != CS_OK) {
- ast_log(LOG_WARNING, "CPG mcast failed (%u)\n", cs_err);
- }
-}
-
static char *corosync_show_members(struct ast_cli_entry *e, int cmd, struct ast_cli_args *a)
{
cs_error_t cs_err;
@@ -368,7 +582,7 @@ static char *corosync_show_members(struct ast_cli_entry *e, int cmd, struct ast_
continue;
}
- ast_cli(a->fd, "=== Node %u\n", i);
+ ast_cli(a->fd, "=== Node %d\n", i);
ast_cli(a->fd, "=== --> Group: %s\n", cpg_desc.group.value);
for (j = 0; j < num_addrs; j++) {
@@ -378,7 +592,7 @@ static char *corosync_show_members(struct ast_cli_entry *e, int cmd, struct ast_
getnameinfo(sa, sa_len, buf, sizeof(buf), NULL, 0, NI_NUMERICHOST);
- ast_cli(a->fd, "=== --> Address %u: %s\n", j + 1, buf);
+ ast_cli(a->fd, "=== --> Address %d: %s\n", j + 1, buf);
}
}
@@ -421,7 +635,9 @@ static char *corosync_ping(struct ast_cli_entry *e, int cmd, struct ast_cli_args
return CLI_FAILURE;
}
- ast_event_queue(event);
+ ast_rwlock_rdlock(&event_types_lock);
+ event_types[AST_EVENT_PING].publish_to_stasis(event);
+ ast_rwlock_unlock(&event_types_lock);
return CLI_SUCCESS;
}
@@ -532,11 +748,16 @@ static int load_general_config(struct ast_config *cfg)
for (i = 0; i < ARRAY_LEN(event_types); i++) {
if (event_types[i].publish && !event_types[i].sub) {
- event_types[i].sub = ast_event_subscribe(i,
- ast_event_cb, "Corosync", NULL,
- AST_EVENT_IE_END);
+ event_types[i].sub = stasis_forward_all(event_types[i].topic_fn(),
+ corosync_topic());
+ stasis_message_router_add(stasis_router,
+ event_types[i].message_type_fn(),
+ stasis_message_cb,
+ NULL);
} else if (!event_types[i].publish && event_types[i].sub) {
- event_types[i].sub = ast_event_unsubscribe(event_types[i].sub);
+ event_types[i].sub = stasis_forward_cancel(event_types[i].sub);
+ stasis_message_router_remove(stasis_router,
+ event_types[i].message_type_fn());
}
}
@@ -577,14 +798,32 @@ static void cleanup_module(void)
cs_error_t cs_err;
unsigned int i;
- for (i = 0; i < ARRAY_LEN(event_types); i++) {
- if (event_types[i].sub) {
- event_types[i].sub = ast_event_unsubscribe(event_types[i].sub);
+ if (stasis_router) {
+
+ /* Unsubscribe all topic forwards and cancel all message routes */
+ ast_rwlock_wrlock(&event_types_lock);
+ for (i = 0; i < ARRAY_LEN(event_types); i++) {
+ if (event_types[i].sub) {
+ event_types[i].sub = stasis_forward_cancel(event_types[i].sub);
+ stasis_message_router_remove(stasis_router,
+ event_types[i].message_type_fn());
+ }
+ event_types[i].publish = 0;
+ event_types[i].subscribe = 0;
}
- event_types[i].publish = 0;
- event_types[i].subscribe = 0;
+ ast_rwlock_unlock(&event_types_lock);
+
+ stasis_message_router_unsubscribe_and_join(stasis_router);
+ stasis_router = NULL;
+ }
+
+ if (corosync_aggregate_topic) {
+ ao2_t_ref(corosync_aggregate_topic, -1, "Dispose of topic on cleanup");
+ corosync_aggregate_topic = NULL;
}
+ STASIS_MESSAGE_TYPE_CLEANUP(corosync_ping_message_type);
+
if (dispatch_thread.id != AST_PTHREADT_NULL) {
char meepmeep = 'x';
dispatch_thread.stop = 1;
@@ -623,13 +862,30 @@ static int load_module(void)
enum ast_module_load_result res = AST_MODULE_LOAD_FAILURE;
struct cpg_name name;
+ corosync_aggregate_topic = stasis_topic_create("corosync_aggregate_topic");
+ if (!corosync_aggregate_topic) {
+ ast_log(AST_LOG_ERROR, "Failed to create stasis topic for corosync\n");
+ goto failed;
+ }
+
+ stasis_router = stasis_message_router_create(corosync_aggregate_topic);
+ if (!stasis_router) {
+ ast_log(AST_LOG_ERROR, "Failed to create message router for corosync topic\n");
+ goto failed;
+ }
+
+ if (STASIS_MESSAGE_TYPE_INIT(corosync_ping_message_type) != 0) {
+ ast_log(AST_LOG_ERROR, "Failed to initialize corosync ping message type\n");
+ goto failed;
+ }
+
if ((cs_err = corosync_cfg_initialize(&cfg_handle, &cfg_callbacks)) != CS_OK) {
- ast_log(LOG_ERROR, "Failed to initialize cfg (%d)\n", (int) cs_err);
- return AST_MODULE_LOAD_DECLINE;
+ ast_log(LOG_ERROR, "Failed to initialize cfg: (%d)\n", (int) cs_err);
+ goto failed;
}
if ((cs_err = cpg_initialize(&cpg_handle, &cpg_callbacks)) != CS_OK) {
- ast_log(LOG_ERROR, "Failed to initialize cpg (%d)\n", (int) cs_err);
+ ast_log(LOG_ERROR, "Failed to initialize cpg: (%d)\n", (int) cs_err);
goto failed;
}
@@ -637,7 +893,7 @@ static int load_module(void)
name.length = strlen(name.value);
if ((cs_err = cpg_join(cpg_handle, &name)) != CS_OK) {
- ast_log(LOG_ERROR, "Failed to join (%d)\n", (int) cs_err);
+ ast_log(LOG_ERROR, "Failed to join: (%d)\n", (int) cs_err);
goto failed;
}