summaryrefslogtreecommitdiff
path: root/res/res_corosync.c
diff options
context:
space:
mode:
Diffstat (limited to 'res/res_corosync.c')
-rw-r--r--res/res_corosync.c390
1 files changed, 323 insertions, 67 deletions
diff --git a/res/res_corosync.c b/res/res_corosync.c
index ce4165498..6c4a3d1e7 100644
--- a/res/res_corosync.c
+++ b/res/res_corosync.c
@@ -44,20 +44,125 @@ ASTERISK_FILE_VERSION(__FILE__, "$Revision$");
#include "asterisk/event.h"
#include "asterisk/cli.h"
#include "asterisk/devicestate.h"
+#include "asterisk/app.h"
+#include "asterisk/stasis.h"
+#include "asterisk/stasis_message_router.h"
AST_RWLOCK_DEFINE_STATIC(event_types_lock);
+static void publish_mwi_to_stasis(struct ast_event *event);
+static void publish_device_state_to_stasis(struct ast_event *event);
+
+/*! \brief The internal topic used for message forwarding and pings */
+static struct stasis_topic *corosync_aggregate_topic;
+
+/*! \brief Our \ref stasis message router */
+static struct stasis_message_router *stasis_router;
+
+/*! \brief Internal accessor for our topic */
+static struct stasis_topic *corosync_topic(void)
+{
+ return corosync_aggregate_topic;
+}
+
+/*! \brief A payload wrapper around a corosync ping event */
+struct corosync_ping_payload {
+ /*! The corosync ping event being passed over \ref stasis */
+ struct ast_event *event;
+};
+
+/*! \brief Destructor for the \ref corosync_ping_payload wrapper object */
+static void corosync_ping_payload_dtor(void *obj)
+{
+ struct corosync_ping_payload *payload = obj;
+
+ ast_free(payload->event);
+}
+
+/*! \brief Convert a Corosync PING to a \ref ast_event */
+static struct ast_event *corosync_ping_to_event(struct stasis_message *message)
+{
+ struct corosync_ping_payload *payload;
+ struct ast_event *event;
+ struct ast_eid *event_eid;
+
+ if (!message) {
+ return NULL;
+ }
+
+ payload = stasis_message_data(message);
+
+ if (!payload->event) {
+ return NULL;
+ }
+
+ event_eid = (struct ast_eid *)ast_event_get_ie_raw(payload->event, AST_EVENT_IE_EID);
+
+ event = ast_event_new(AST_EVENT_PING,
+ AST_EVENT_IE_EID, AST_EVENT_IE_PLTYPE_RAW, event_eid, sizeof(*event_eid),
+ AST_EVENT_IE_END);
+
+ return event;
+}
+
+STASIS_MESSAGE_TYPE_DEFN_LOCAL(corosync_ping_message_type,
+ .to_event = corosync_ping_to_event, );
+
+/*! \brief Publish a Corosync ping to \ref stasis */
+static void publish_corosync_ping_to_stasis(struct ast_event *event)
+{
+ struct corosync_ping_payload *payload;
+ struct stasis_message *message;
+
+ ast_assert(ast_event_get_type(event) == AST_EVENT_PING);
+ ast_assert(event != NULL);
+
+ payload = ao2_t_alloc(sizeof(*payload), corosync_ping_payload_dtor, "Create ping payload");
+ if (!payload) {
+ return;
+ }
+ payload->event = event;
+
+ message = stasis_message_create(corosync_ping_message_type(), payload);
+ if (!message) {
+ ao2_t_ref(payload, -1, "Destroy payload on off nominal");
+ return;
+ }
+
+ stasis_publish(corosync_topic(), message);
+
+ ao2_t_ref(payload, -1, "Hand ref to stasis");
+ ao2_t_ref(message, -1, "Hand ref to stasis");
+}
+
static struct {
const char *name;
- struct ast_event_sub *sub;
+ struct stasis_forward *sub;
unsigned char publish;
unsigned char publish_default;
unsigned char subscribe;
unsigned char subscribe_default;
+ struct stasis_topic *(* topic_fn)(void);
+ struct stasis_cache *(* cache_fn)(void);
+ struct stasis_message_type *(* message_type_fn)(void);
+ void (* publish_to_stasis)(struct ast_event *);
} event_types[] = {
- [AST_EVENT_MWI] = { .name = "mwi", },
- [AST_EVENT_DEVICE_STATE_CHANGE] = { .name = "device_state", },
- [AST_EVENT_PING] = { .name = "ping", .publish_default = 1, .subscribe_default = 1 },
+ [AST_EVENT_MWI] = { .name = "mwi",
+ .topic_fn = ast_mwi_topic_all,
+ .cache_fn = ast_mwi_state_cache,
+ .message_type_fn = ast_mwi_state_type,
+ .publish_to_stasis = publish_mwi_to_stasis, },
+ [AST_EVENT_DEVICE_STATE_CHANGE] = { .name = "device_state",
+ .topic_fn = ast_device_state_topic_all,
+ .cache_fn = ast_device_state_cache,
+ .message_type_fn = ast_device_state_message_type,
+ .publish_to_stasis = publish_device_state_to_stasis, },
+ [AST_EVENT_PING] = { .name = "ping",
+ .publish_default = 1,
+ .subscribe_default = 1,
+ .topic_fn = corosync_topic,
+ .message_type_fn = corosync_ping_message_type,
+ .publish_to_stasis = publish_corosync_ping_to_stasis, },
};
static struct {
@@ -88,6 +193,71 @@ static corosync_cfg_callbacks_t cfg_callbacks = {
.corosync_cfg_shutdown_callback = cfg_shutdown_cb,
};
+/*! \brief Publish a received MWI \ref ast_event to \ref stasis */
+static void publish_mwi_to_stasis(struct ast_event *event)
+{
+ const char *mailbox;
+ const char *context;
+ unsigned int new_msgs;
+ unsigned int old_msgs;
+ struct ast_eid *event_eid;
+
+ ast_assert(ast_event_get_type(event) == AST_EVENT_MWI);
+
+ mailbox = ast_event_get_ie_str(event, AST_EVENT_IE_MAILBOX);
+ context = ast_event_get_ie_str(event, AST_EVENT_IE_CONTEXT);
+ new_msgs = ast_event_get_ie_uint(event, AST_EVENT_IE_NEWMSGS);
+ old_msgs = ast_event_get_ie_uint(event, AST_EVENT_IE_OLDMSGS);
+ event_eid = (struct ast_eid *)ast_event_get_ie_raw(event, AST_EVENT_IE_EID);
+
+ if (ast_strlen_zero(mailbox) || ast_strlen_zero(context)) {
+ return;
+ }
+
+ if (new_msgs > INT_MAX) {
+ new_msgs = INT_MAX;
+ }
+
+ if (old_msgs > INT_MAX) {
+ old_msgs = INT_MAX;
+ }
+
+ if (ast_publish_mwi_state_full(mailbox, context, (int)new_msgs,
+ (int)old_msgs, NULL, event_eid)) {
+ char eid[16];
+ ast_eid_to_str(eid, sizeof(eid), event_eid);
+ ast_log(LOG_WARNING, "Failed to publish MWI message for %s@%s from %s\n",
+ mailbox, context, eid);
+ }
+}
+
+/*! \brief Publish a received device state \ref ast_event to \ref stasis */
+static void publish_device_state_to_stasis(struct ast_event *event)
+{
+ const char *device;
+ enum ast_device_state state;
+ unsigned int cachable;
+ struct ast_eid *event_eid;
+
+ ast_assert(ast_event_get_type(event) == AST_EVENT_DEVICE_STATE_CHANGE);
+
+ device = ast_event_get_ie_str(event, AST_EVENT_IE_DEVICE);
+ state = ast_event_get_ie_uint(event, AST_EVENT_IE_STATE);
+ cachable = ast_event_get_ie_uint(event, AST_EVENT_IE_CACHABLE);
+ event_eid = (struct ast_eid *)ast_event_get_ie_raw(event, AST_EVENT_IE_EID);
+
+ if (ast_strlen_zero(device)) {
+ return;
+ }
+
+ if (ast_publish_device_state_full(device, state, cachable, event_eid)) {
+ char eid[16];
+ ast_eid_to_str(eid, sizeof(eid), event_eid);
+ ast_log(LOG_WARNING, "Failed to publish device state message for %s from %s\n",
+ device, eid);
+ }
+}
+
static void cpg_deliver_cb(cpg_handle_t handle, const struct cpg_name *group_name,
uint32_t nodeid, uint32_t pid, void *msg, size_t msg_len);
@@ -101,8 +271,6 @@ static cpg_callbacks_t cpg_callbacks = {
.cpg_confchg_fn = cpg_confchg_cb,
};
-static void ast_event_cb(const struct ast_event *event, void *data);
-
#ifdef HAVE_COROSYNC_CFG_STATE_TRACK
static void cfg_state_track_cb(
corosync_cfg_state_notification_buffer_t *notification_buffer,
@@ -120,6 +288,8 @@ static void cpg_deliver_cb(cpg_handle_t handle, const struct cpg_name *group_nam
uint32_t nodeid, uint32_t pid, void *msg, size_t msg_len)
{
struct ast_event *event;
+ void (*publish_handler)(struct ast_event *) = NULL;
+ enum ast_event_type event_type;
if (msg_len < ast_event_minimum_length()) {
ast_debug(1, "Ignoring event that's too small. %u < %u\n",
@@ -133,9 +303,17 @@ static void cpg_deliver_cb(cpg_handle_t handle, const struct cpg_name *group_nam
return;
}
+ event_type = ast_event_get_type(msg);
+ if (event_type > AST_EVENT_TOTAL) {
+ /* Egads, we don't support this */
+ return;
+ }
+
ast_rwlock_rdlock(&event_types_lock);
- if (!event_types[ast_event_get_type(msg)].subscribe) {
- /* We are not configured to subscribe to these events. */
+ publish_handler = event_types[event_type].publish_to_stasis;
+ if (!event_types[event_type].subscribe || !publish_handler) {
+ /* We are not configured to subscribe to these events or
+ we have no way to publish it internally. */
ast_rwlock_unlock(&event_types_lock);
return;
}
@@ -147,20 +325,80 @@ static void cpg_deliver_cb(cpg_handle_t handle, const struct cpg_name *group_nam
memcpy(event, msg, msg_len);
+ if (event_type == AST_EVENT_PING) {
+ const struct ast_eid *eid;
+ char buf[128] = "";
+
+ eid = ast_event_get_ie_raw(event, AST_EVENT_IE_EID);
+ ast_eid_to_str(buf, sizeof(buf), (struct ast_eid *) eid);
+ ast_log(LOG_NOTICE, "Got event PING from server with EID: '%s'\n", buf);
+ }
+ ast_debug(5, "Publishing event %s (%d) to stasis\n",
+ ast_event_get_type_name(event), event_type);
+ publish_handler(event);
+}
+
+static void publish_to_corosync(struct stasis_message *message)
+{
+ cs_error_t cs_err;
+ struct iovec iov;
+ struct ast_event *event;
+
+ event = stasis_message_to_event(message);
+ if (!event) {
+ return;
+ }
+
+ if (ast_eid_cmp(&ast_eid_default, ast_event_get_ie_raw(event, AST_EVENT_IE_EID))) {
+ /* If the event didn't originate from this server, don't send it back out. */
+ ast_event_destroy(event);
+ return;
+ }
+
if (ast_event_get_type(event) == AST_EVENT_PING) {
const struct ast_eid *eid;
char buf[128] = "";
eid = ast_event_get_ie_raw(event, AST_EVENT_IE_EID);
ast_eid_to_str(buf, sizeof(buf), (struct ast_eid *) eid);
- ast_log(LOG_NOTICE, "(cpg_deliver_cb) Got event PING from server with EID: '%s'\n", buf);
+ ast_log(LOG_NOTICE, "Sending event PING from this server with EID: '%s'\n", buf);
+ }
+
+ iov.iov_base = (void *)event;
+ iov.iov_len = ast_event_get_size(event);
+
+ ast_debug(5, "Publishing event %s (%d) to corosync\n",
+ ast_event_get_type_name(event), ast_event_get_type(event));
- ast_event_queue(event);
- } else {
- ast_event_queue_and_cache(event);
+ /* The stasis subscription will only exist if we are configured to publish
+ * these events, so just send away. */
+ if ((cs_err = cpg_mcast_joined(cpg_handle, CPG_TYPE_FIFO, &iov, 1)) != CS_OK) {
+ ast_log(LOG_WARNING, "CPG mcast failed (%d)\n", cs_err);
}
}
+static void stasis_message_cb(void *data, struct stasis_subscription *sub, struct stasis_message *message)
+{
+ if (!message) {
+ return;
+ }
+
+ publish_to_corosync(message);
+}
+
+static int dump_cache_cb(void *obj, void *arg, int flags)
+{
+ struct stasis_message *message = obj;
+
+ if (!message) {
+ return 0;
+ }
+
+ publish_to_corosync(message);
+
+ return 0;
+}
+
static void cpg_confchg_cb(cpg_handle_t handle, const struct cpg_name *group_name,
const struct cpg_address *member_list, size_t member_list_entries,
const struct cpg_address *left_list, size_t left_list_entries,
@@ -176,20 +414,27 @@ static void cpg_confchg_cb(cpg_handle_t handle, const struct cpg_name *group_nam
}
for (i = 0; i < ARRAY_LEN(event_types); i++) {
- struct ast_event_sub *event_sub;
+ struct ao2_container *messages;
ast_rwlock_rdlock(&event_types_lock);
if (!event_types[i].publish) {
ast_rwlock_unlock(&event_types_lock);
continue;
}
+
+ if (!event_types[i].cache_fn || !event_types[i].message_type_fn) {
+ ast_rwlock_unlock(&event_types_lock);
+ continue;
+ }
+
+ messages = stasis_cache_dump_by_eid(event_types[i].cache_fn(),
+ event_types[i].message_type_fn(),
+ &ast_eid_default);
ast_rwlock_unlock(&event_types_lock);
- event_sub = ast_event_subscribe_new(i, ast_event_cb, NULL);
- ast_event_sub_append_ie_raw(event_sub, AST_EVENT_IE_EID,
- &ast_eid_default, sizeof(ast_eid_default));
- ast_event_dump_cache(event_sub);
- ast_event_sub_destroy(event_sub);
+ ao2_callback(messages, OBJ_NODATA, dump_cache_cb, NULL);
+
+ ao2_t_ref(messages, -1, "Dispose of dumped cache");
}
}
@@ -231,13 +476,13 @@ static void *dispatch_thread_handler(void *data)
if (pfd[0].revents & POLLIN) {
if ((cs_err = cpg_dispatch(cpg_handle, CS_DISPATCH_ALL)) != CS_OK) {
- ast_log(LOG_WARNING, "Failed CPG dispatch: %u\n", cs_err);
+ ast_log(LOG_WARNING, "Failed CPG dispatch: %d\n", cs_err);
}
}
if (pfd[1].revents & POLLIN) {
if ((cs_err = corosync_cfg_dispatch(cfg_handle, CS_DISPATCH_ALL)) != CS_OK) {
- ast_log(LOG_WARNING, "Failed CFG dispatch: %u\n", cs_err);
+ ast_log(LOG_WARNING, "Failed CFG dispatch: %d\n", cs_err);
}
}
@@ -287,37 +532,6 @@ static void *dispatch_thread_handler(void *data)
return NULL;
}
-static void ast_event_cb(const struct ast_event *event, void *data)
-{
- cs_error_t cs_err;
- struct iovec iov = {
- .iov_base = (void *) event,
- .iov_len = ast_event_get_size(event),
- };
-
- if (ast_event_get_type(event) == AST_EVENT_PING) {
- const struct ast_eid *eid;
- char buf[128] = "";
-
- eid = ast_event_get_ie_raw(event, AST_EVENT_IE_EID);
- ast_eid_to_str(buf, sizeof(buf), (struct ast_eid *) eid);
- ast_log(LOG_NOTICE, "(ast_event_cb) Got event PING from server with EID: '%s'\n", buf);
- }
-
- if (ast_eid_cmp(&ast_eid_default,
- ast_event_get_ie_raw(event, AST_EVENT_IE_EID))) {
- /* If the event didn't originate from this server, don't send it back out. */
- return;
- }
-
- /* The ast_event subscription will only exist if we are configured to publish
- * these events, so just send away. */
-
- if ((cs_err = cpg_mcast_joined(cpg_handle, CPG_TYPE_FIFO, &iov, 1)) != CS_OK) {
- ast_log(LOG_WARNING, "CPG mcast failed (%u)\n", cs_err);
- }
-}
-
static char *corosync_show_members(struct ast_cli_entry *e, int cmd, struct ast_cli_args *a)
{
cs_error_t cs_err;
@@ -368,7 +582,7 @@ static char *corosync_show_members(struct ast_cli_entry *e, int cmd, struct ast_
continue;
}
- ast_cli(a->fd, "=== Node %u\n", i);
+ ast_cli(a->fd, "=== Node %d\n", i);
ast_cli(a->fd, "=== --> Group: %s\n", cpg_desc.group.value);
for (j = 0; j < num_addrs; j++) {
@@ -378,7 +592,7 @@ static char *corosync_show_members(struct ast_cli_entry *e, int cmd, struct ast_
getnameinfo(sa, sa_len, buf, sizeof(buf), NULL, 0, NI_NUMERICHOST);
- ast_cli(a->fd, "=== --> Address %u: %s\n", j + 1, buf);
+ ast_cli(a->fd, "=== --> Address %d: %s\n", j + 1, buf);
}
}
@@ -421,7 +635,9 @@ static char *corosync_ping(struct ast_cli_entry *e, int cmd, struct ast_cli_args
return CLI_FAILURE;
}
- ast_event_queue(event);
+ ast_rwlock_rdlock(&event_types_lock);
+ event_types[AST_EVENT_PING].publish_to_stasis(event);
+ ast_rwlock_unlock(&event_types_lock);
return CLI_SUCCESS;
}
@@ -532,11 +748,16 @@ static int load_general_config(struct ast_config *cfg)
for (i = 0; i < ARRAY_LEN(event_types); i++) {
if (event_types[i].publish && !event_types[i].sub) {
- event_types[i].sub = ast_event_subscribe(i,
- ast_event_cb, "Corosync", NULL,
- AST_EVENT_IE_END);
+ event_types[i].sub = stasis_forward_all(event_types[i].topic_fn(),
+ corosync_topic());
+ stasis_message_router_add(stasis_router,
+ event_types[i].message_type_fn(),
+ stasis_message_cb,
+ NULL);
} else if (!event_types[i].publish && event_types[i].sub) {
- event_types[i].sub = ast_event_unsubscribe(event_types[i].sub);
+ event_types[i].sub = stasis_forward_cancel(event_types[i].sub);
+ stasis_message_router_remove(stasis_router,
+ event_types[i].message_type_fn());
}
}
@@ -577,14 +798,32 @@ static void cleanup_module(void)
cs_error_t cs_err;
unsigned int i;
- for (i = 0; i < ARRAY_LEN(event_types); i++) {
- if (event_types[i].sub) {
- event_types[i].sub = ast_event_unsubscribe(event_types[i].sub);
+ if (stasis_router) {
+
+ /* Unsubscribe all topic forwards and cancel all message routes */
+ ast_rwlock_wrlock(&event_types_lock);
+ for (i = 0; i < ARRAY_LEN(event_types); i++) {
+ if (event_types[i].sub) {
+ event_types[i].sub = stasis_forward_cancel(event_types[i].sub);
+ stasis_message_router_remove(stasis_router,
+ event_types[i].message_type_fn());
+ }
+ event_types[i].publish = 0;
+ event_types[i].subscribe = 0;
}
- event_types[i].publish = 0;
- event_types[i].subscribe = 0;
+ ast_rwlock_unlock(&event_types_lock);
+
+ stasis_message_router_unsubscribe_and_join(stasis_router);
+ stasis_router = NULL;
+ }
+
+ if (corosync_aggregate_topic) {
+ ao2_t_ref(corosync_aggregate_topic, -1, "Dispose of topic on cleanup");
+ corosync_aggregate_topic = NULL;
}
+ STASIS_MESSAGE_TYPE_CLEANUP(corosync_ping_message_type);
+
if (dispatch_thread.id != AST_PTHREADT_NULL) {
char meepmeep = 'x';
dispatch_thread.stop = 1;
@@ -623,13 +862,30 @@ static int load_module(void)
enum ast_module_load_result res = AST_MODULE_LOAD_FAILURE;
struct cpg_name name;
+ corosync_aggregate_topic = stasis_topic_create("corosync_aggregate_topic");
+ if (!corosync_aggregate_topic) {
+ ast_log(AST_LOG_ERROR, "Failed to create stasis topic for corosync\n");
+ goto failed;
+ }
+
+ stasis_router = stasis_message_router_create(corosync_aggregate_topic);
+ if (!stasis_router) {
+ ast_log(AST_LOG_ERROR, "Failed to create message router for corosync topic\n");
+ goto failed;
+ }
+
+ if (STASIS_MESSAGE_TYPE_INIT(corosync_ping_message_type) != 0) {
+ ast_log(AST_LOG_ERROR, "Failed to initialize corosync ping message type\n");
+ goto failed;
+ }
+
if ((cs_err = corosync_cfg_initialize(&cfg_handle, &cfg_callbacks)) != CS_OK) {
- ast_log(LOG_ERROR, "Failed to initialize cfg (%d)\n", (int) cs_err);
- return AST_MODULE_LOAD_DECLINE;
+ ast_log(LOG_ERROR, "Failed to initialize cfg: (%d)\n", (int) cs_err);
+ goto failed;
}
if ((cs_err = cpg_initialize(&cpg_handle, &cpg_callbacks)) != CS_OK) {
- ast_log(LOG_ERROR, "Failed to initialize cpg (%d)\n", (int) cs_err);
+ ast_log(LOG_ERROR, "Failed to initialize cpg: (%d)\n", (int) cs_err);
goto failed;
}
@@ -637,7 +893,7 @@ static int load_module(void)
name.length = strlen(name.value);
if ((cs_err = cpg_join(cpg_handle, &name)) != CS_OK) {
- ast_log(LOG_ERROR, "Failed to join (%d)\n", (int) cs_err);
+ ast_log(LOG_ERROR, "Failed to join: (%d)\n", (int) cs_err);
goto failed;
}