diff options
author | Russell Bryant <russell@russellbryant.com> | 2012-02-05 10:58:37 +0000 |
---|---|---|
committer | Russell Bryant <russell@russellbryant.com> | 2012-02-05 10:58:37 +0000 |
commit | 055a19e1286fbb7a3559c9928fad4e7d08af5329 (patch) | |
tree | bf9e55be47bfefe82e02466a8e2091f4f8de8bd2 /res | |
parent | a898eb4d078750c46cefc2f3fba6b34207eec160 (diff) |
Replace res_ais with a new module, res_corosync.
This patch removes res_ais and introduces a new module, res_corosync.
The OpenAIS project is deprecated and is now just a wrapper around
Corosync. This module provides the same functionality using the same
core infrastructure, but without the use of the deprecated components.
Technically res_ais could have been used with an AIS implementation other
than OpenAIS, but that is the only one I know of that was ever used.
Review: https://reviewboard.asterisk.org/r/1700/
git-svn-id: https://origsvn.digium.com/svn/asterisk/trunk@354046 65c4cc65-6c06-0410-ace0-fbb531ad65f3
Diffstat (limited to 'res')
-rw-r--r-- | res/ais/ais.h | 48 | ||||
-rw-r--r-- | res/ais/clm.c | 168 | ||||
-rw-r--r-- | res/ais/evt.c | 583 | ||||
-rw-r--r-- | res/res_corosync.c | 574 |
4 files changed, 574 insertions, 799 deletions
diff --git a/res/ais/ais.h b/res/ais/ais.h deleted file mode 100644 index 6aaeadf15..000000000 --- a/res/ais/ais.h +++ /dev/null @@ -1,48 +0,0 @@ -/* - * Asterisk -- An open source telephony toolkit. - * - * Copyright (C) 2007, Digium, Inc. - * - * Russell Bryant <russell@digium.com> - * - * See http://www.asterisk.org for more information about - * the Asterisk project. Please do not directly contact - * any of the maintainers of this project for assistance; - * the project provides a web site, mailing lists and IRC - * channels for your use. - * - * This program is free software, distributed under the terms of - * the GNU General Public License Version 2. See the LICENSE file - * at the top of the source tree. - */ - -/*! - * \file - * \author Russell Bryant <russell@digium.com> - * - * \brief Usage of the SAForum AIS (Application Interface Specification) - * - * \arg http://www.openais.org/ - */ - -#ifndef RES_AIS_AIS_H -#define RES_AIS_AIS_H - -#include <saAis.h> -#include <saClm.h> -#include <saEvt.h> - -extern SaVersionT ais_version; - -extern SaClmHandleT clm_handle; -extern SaEvtHandleT evt_handle; - -int ast_ais_clm_load_module(void); -int ast_ais_clm_unload_module(void); - -int ast_ais_evt_load_module(void); -int ast_ais_evt_unload_module(void); - -const char *ais_err2str(SaAisErrorT error); - -#endif /* RES_AIS_AIS_H */ diff --git a/res/ais/clm.c b/res/ais/clm.c deleted file mode 100644 index d290ee2cd..000000000 --- a/res/ais/clm.c +++ /dev/null @@ -1,168 +0,0 @@ -/* - * Asterisk -- An open source telephony toolkit. - * - * Copyright (C) 2007, Digium, Inc. - * - * Russell Bryant <russell@digium.com> - * - * See http://www.asterisk.org for more information about - * the Asterisk project. Please do not directly contact - * any of the maintainers of this project for assistance; - * the project provides a web site, mailing lists and IRC - * channels for your use. - * - * This program is free software, distributed under the terms of - * the GNU General Public License Version 2. See the LICENSE file - * at the top of the source tree. - */ - -/*! - * \file - * \author Russell Bryant <russell@digium.com> - * - * \brief Usage of the SAForum AIS (Application Interface Specification) - * - * \arg http://www.openais.org/ - * - * This file contains the code specific to the use of the CLM - * (Cluster Membership) Service. - */ - -#include "asterisk.h" - -ASTERISK_FILE_VERSION(__FILE__, "$Revision$"); - -#include <stdlib.h> -#include <stdio.h> -#include <string.h> -#include <unistd.h> -#include <errno.h> - -#include "ais.h" - -#include "asterisk/module.h" -#include "asterisk/utils.h" -#include "asterisk/cli.h" -#include "asterisk/logger.h" - -SaClmHandleT clm_handle; -static SaAisErrorT clm_init_res; - -static void clm_node_get_cb(SaInvocationT invocation, - const SaClmClusterNodeT *cluster_node, SaAisErrorT error); -static void clm_track_cb(const SaClmClusterNotificationBufferT *notif_buffer, - SaUint32T num_members, SaAisErrorT error); - -static const SaClmCallbacksT clm_callbacks = { - .saClmClusterNodeGetCallback = clm_node_get_cb, - .saClmClusterTrackCallback = clm_track_cb, -}; - -static void clm_node_get_cb(SaInvocationT invocation, - const SaClmClusterNodeT *cluster_node, SaAisErrorT error) -{ - -} - -static void clm_track_cb(const SaClmClusterNotificationBufferT *notif_buffer, - SaUint32T num_members, SaAisErrorT error) -{ - -} - -static char *ais_clm_show_members(struct ast_cli_entry *e, int cmd, struct ast_cli_args *a) -{ - int i; - SaClmClusterNotificationBufferT buf; - SaClmClusterNotificationT notif[64]; - SaAisErrorT ais_res; - - switch (cmd) { - case CLI_INIT: - e->command = "ais clm show members"; - e->usage = - "Usage: ais clm show members\n" - " List members of the cluster using the CLM (Cluster Membership) service.\n"; - return NULL; - - case CLI_GENERATE: - return NULL; /* no completion */ - } - - if (a->argc != e->args) - return CLI_SHOWUSAGE; - - buf.notification = notif; - buf.numberOfItems = ARRAY_LEN(notif); - - ais_res = saClmClusterTrack(clm_handle, SA_TRACK_CURRENT, &buf); - if (ais_res != SA_AIS_OK) { - ast_cli(a->fd, "Error retrieving current cluster members.\n"); - return CLI_FAILURE; - } - - ast_cli(a->fd, "\n" - "=============================================================\n" - "=== Cluster Members =========================================\n" - "=============================================================\n" - "===\n"); - - for (i = 0; i < buf.numberOfItems; i++) { - SaClmClusterNodeT *node = &buf.notification[i].clusterNode; - - ast_cli(a->fd, "=== ---------------------------------------------------------\n" - "=== Node Name: %s\n" - "=== ==> ID: 0x%x\n" - "=== ==> Address: %s\n" - "=== ==> Member: %s\n", - (char *) node->nodeName.value, (int) node->nodeId, - (char *) node->nodeAddress.value, - node->member ? "Yes" : "No"); - - ast_cli(a->fd, "=== ---------------------------------------------------------\n" - "===\n"); - } - - ast_cli(a->fd, "=============================================================\n" - "\n"); - - return CLI_SUCCESS; -} - -static struct ast_cli_entry ais_cli[] = { - AST_CLI_DEFINE(ais_clm_show_members, "List current members of the cluster"), -}; - -int ast_ais_clm_load_module(void) -{ - clm_init_res = saClmInitialize(&clm_handle, &clm_callbacks, &ais_version); - if (clm_init_res != SA_AIS_OK) { - ast_log(LOG_ERROR, "Could not initialize cluster membership service: %s\n", - ais_err2str(clm_init_res)); - return -1; - } - - ast_cli_register_multiple(ais_cli, ARRAY_LEN(ais_cli)); - - return 0; -} - -int ast_ais_clm_unload_module(void) -{ - SaAisErrorT ais_res; - - if (clm_init_res != SA_AIS_OK) { - return 0; - } - - ast_cli_unregister_multiple(ais_cli, ARRAY_LEN(ais_cli)); - - ais_res = saClmFinalize(clm_handle); - if (ais_res != SA_AIS_OK) { - ast_log(LOG_ERROR, "Problem stopping cluster membership service: %s\n", - ais_err2str(ais_res)); - return -1; - } - - return 0; -} diff --git a/res/ais/evt.c b/res/ais/evt.c deleted file mode 100644 index 8d11c6473..000000000 --- a/res/ais/evt.c +++ /dev/null @@ -1,583 +0,0 @@ -/* - * Asterisk -- An open source telephony toolkit. - * - * Copyright (C) 2007, Digium, Inc. - * - * Russell Bryant <russell@digium.com> - * - * See http://www.asterisk.org for more information about - * the Asterisk project. Please do not directly contact - * any of the maintainers of this project for assistance; - * the project provides a web site, mailing lists and IRC - * channels for your use. - * - * This program is free software, distributed under the terms of - * the GNU General Public License Version 2. See the LICENSE file - * at the top of the source tree. - */ - -/*! - * \file - * \author Russell Bryant <russell@digium.com> - * - * \brief Usage of the SAForum AIS (Application Interface Specification) - * - * \arg http://www.openais.org/ - * - * This file contains the code specific to the use of the EVT - * (Event) Service. - */ - -#include "asterisk.h" - -ASTERISK_FILE_VERSION(__FILE__, "$Revision$"); - -#include <stdlib.h> -#include <stdio.h> -#include <string.h> -#include <unistd.h> -#include <errno.h> - -#include "ais.h" - -#include "asterisk/module.h" -#include "asterisk/utils.h" -#include "asterisk/cli.h" -#include "asterisk/logger.h" -#include "asterisk/event.h" -#include "asterisk/config.h" -#include "asterisk/linkedlists.h" -#include "asterisk/devicestate.h" - -#ifndef AST_MODULE -/* XXX HACK */ -#define AST_MODULE "res_ais" -#endif - -SaEvtHandleT evt_handle; -static SaAisErrorT evt_init_res; - -void evt_channel_open_cb(SaInvocationT invocation, SaEvtChannelHandleT channel_handle, - SaAisErrorT error); -void evt_event_deliver_cb(SaEvtSubscriptionIdT subscription_id, - const SaEvtEventHandleT event_handle, const SaSizeT event_datalen); - -static const SaEvtCallbacksT evt_callbacks = { - .saEvtChannelOpenCallback = evt_channel_open_cb, - .saEvtEventDeliverCallback = evt_event_deliver_cb, -}; - -static const struct { - const char *str; - enum ast_event_type type; -} supported_event_types[] = { - { "mwi", AST_EVENT_MWI }, - { "device_state", AST_EVENT_DEVICE_STATE_CHANGE }, -}; - -/*! Used to provide unique id's to egress subscriptions */ -static int unique_id; - -struct subscribe_event { - AST_LIST_ENTRY(subscribe_event) entry; - /*! This is a unique identifier to identify this subscription in the event - * channel through the different API calls, subscribe, unsubscribe, and - * the event deliver callback. */ - SaEvtSubscriptionIdT id; - enum ast_event_type type; -}; - -struct publish_event { - AST_LIST_ENTRY(publish_event) entry; - /*! We subscribe to events internally so that we can publish them - * on this event channel. */ - struct ast_event_sub *sub; - enum ast_event_type type; -}; - -struct event_channel { - AST_RWLIST_ENTRY(event_channel) entry; - AST_LIST_HEAD_NOLOCK(, subscribe_event) subscribe_events; - AST_LIST_HEAD_NOLOCK(, publish_event) publish_events; - SaEvtChannelHandleT handle; - char name[1]; -}; - -static AST_RWLIST_HEAD_STATIC(event_channels, event_channel); - -void evt_channel_open_cb(SaInvocationT invocation, SaEvtChannelHandleT channel_handle, - SaAisErrorT error) -{ - -} - -static void queue_event(struct ast_event *ast_event) -{ - ast_event_queue_and_cache(ast_event); -} - -void evt_event_deliver_cb(SaEvtSubscriptionIdT sub_id, - const SaEvtEventHandleT event_handle, const SaSizeT event_datalen) -{ - /* It is important to note that this works because we *know* that this - * function will only be called by a single thread, the dispatch_thread. - * If this module gets changed such that this is no longer the case, this - * should get changed to a thread-local buffer, instead. */ - static unsigned char buf[4096]; - struct ast_event *event_dup, *event = (void *) buf; - SaAisErrorT ais_res; - SaSizeT len = sizeof(buf); - - if (event_datalen > len) { - ast_log(LOG_ERROR, "Event received with size %u, which is too big\n" - "for the allocated size %u. Change the code to increase the size.\n", - (unsigned int) event_datalen, (unsigned int) len); - return; - } - - if (event_datalen < ast_event_minimum_length()) { - ast_debug(1, "Ignoring event that's too small. %u < %u\n", - (unsigned int) event_datalen, - (unsigned int) ast_event_minimum_length()); - return; - } - - ais_res = saEvtEventDataGet(event_handle, event, &len); - if (ais_res != SA_AIS_OK) { - ast_log(LOG_ERROR, "Error retrieving event payload: %s\n", - ais_err2str(ais_res)); - return; - } - - if (!ast_eid_cmp(&ast_eid_default, ast_event_get_ie_raw(event, AST_EVENT_IE_EID))) { - /* Don't feed events back in that originated locally. */ - return; - } - - if (!(event_dup = ast_malloc(len))) - return; - - memcpy(event_dup, event, len); - - queue_event(event_dup); -} - -static const char *type_to_filter_str(enum ast_event_type type) -{ - const char *filter_str = NULL; - int i; - - for (i = 0; i < ARRAY_LEN(supported_event_types); i++) { - if (supported_event_types[i].type == type) { - filter_str = supported_event_types[i].str; - break; - } - } - - return filter_str; -} - -static void ast_event_cb(const struct ast_event *ast_event, void *data) -{ - SaEvtEventHandleT event_handle; - SaAisErrorT ais_res; - struct event_channel *event_channel = data; - SaClmClusterNodeT local_node; - SaEvtEventPatternArrayT pattern_array; - SaEvtEventPatternT pattern; - SaSizeT len; - const char *filter_str; - SaEvtEventIdT event_id; - - ast_debug(1, "Got an event to forward\n"); - - if (ast_eid_cmp(&ast_eid_default, ast_event_get_ie_raw(ast_event, AST_EVENT_IE_EID))) { - /* If the event didn't originate from this server, don't send it back out. */ - ast_debug(1, "Returning here\n"); - return; - } - - ais_res = saEvtEventAllocate(event_channel->handle, &event_handle); - if (ais_res != SA_AIS_OK) { - ast_log(LOG_ERROR, "Error allocating event: %s\n", ais_err2str(ais_res)); - ast_debug(1, "Returning here\n"); - return; - } - - ais_res = saClmClusterNodeGet(clm_handle, SA_CLM_LOCAL_NODE_ID, - SA_TIME_ONE_SECOND, &local_node); - if (ais_res != SA_AIS_OK) { - ast_log(LOG_ERROR, "Error getting local node name: %s\n", ais_err2str(ais_res)); - goto return_event_free; - } - - filter_str = type_to_filter_str(ast_event_get_type(ast_event)); - len = strlen(filter_str) + 1; - pattern.pattern = (SaUint8T *) filter_str; - pattern.patternSize = len; - pattern.allocatedSize = len; - - pattern_array.allocatedNumber = 1; - pattern_array.patternsNumber = 1; - pattern_array.patterns = &pattern; - - /*! - * /todo Make retention time configurable - * /todo Make event priorities configurable - */ - ais_res = saEvtEventAttributesSet(event_handle, &pattern_array, - SA_EVT_LOWEST_PRIORITY, SA_TIME_ONE_MINUTE, &local_node.nodeName); - if (ais_res != SA_AIS_OK) { - ast_log(LOG_ERROR, "Error setting event attributes: %s\n", ais_err2str(ais_res)); - goto return_event_free; - } - - ais_res = saEvtEventPublish(event_handle, - ast_event, ast_event_get_size(ast_event), &event_id); - if (ais_res != SA_AIS_OK) { - ast_log(LOG_ERROR, "Error publishing event: %s\n", ais_err2str(ais_res)); - goto return_event_free; - } - -return_event_free: - ais_res = saEvtEventFree(event_handle); - if (ais_res != SA_AIS_OK) { - ast_log(LOG_ERROR, "Error freeing allocated event: %s\n", ais_err2str(ais_res)); - } - ast_debug(1, "Returning here (event_free)\n"); -} - -static char *ais_evt_show_event_channels(struct ast_cli_entry *e, int cmd, struct ast_cli_args *a) -{ - struct event_channel *event_channel; - - switch (cmd) { - case CLI_INIT: - e->command = "ais evt show event channels"; - e->usage = - "Usage: ais evt show event channels\n" - " List configured event channels for the (EVT) Eventing service.\n"; - return NULL; - - case CLI_GENERATE: - return NULL; /* no completion */ - } - - if (a->argc != e->args) - return CLI_SHOWUSAGE; - - ast_cli(a->fd, "\n" - "=============================================================\n" - "=== Event Channels ==========================================\n" - "=============================================================\n" - "===\n"); - - AST_RWLIST_RDLOCK(&event_channels); - AST_RWLIST_TRAVERSE(&event_channels, event_channel, entry) { - struct publish_event *publish_event; - struct subscribe_event *subscribe_event; - - ast_cli(a->fd, "=== ---------------------------------------------------------\n" - "=== Event Channel Name: %s\n", event_channel->name); - - AST_LIST_TRAVERSE(&event_channel->publish_events, publish_event, entry) { - ast_cli(a->fd, "=== ==> Publishing Event Type: %s\n", - type_to_filter_str(publish_event->type)); - } - - AST_LIST_TRAVERSE(&event_channel->subscribe_events, subscribe_event, entry) { - ast_cli(a->fd, "=== ==> Subscribing to Event Type: %s\n", - type_to_filter_str(subscribe_event->type)); - } - - ast_cli(a->fd, "=== ---------------------------------------------------------\n" - "===\n"); - } - AST_RWLIST_UNLOCK(&event_channels); - - ast_cli(a->fd, "=============================================================\n" - "\n"); - - return CLI_SUCCESS; -} - -static struct ast_cli_entry ais_cli[] = { - AST_CLI_DEFINE(ais_evt_show_event_channels, "Show configured event channels"), -}; - -static void add_publish_event(struct event_channel *event_channel, const char *event_type) -{ - int i; - enum ast_event_type type = -1; - struct publish_event *publish_event; - - for (i = 0; i < ARRAY_LEN(supported_event_types); i++) { - if (!strcasecmp(event_type, supported_event_types[i].str)) { - type = supported_event_types[i].type; - break; - } - } - - if (type == -1) { - ast_log(LOG_WARNING, "publish_event option given with invalid value '%s'\n", event_type); - return; - } - - if (type == AST_EVENT_DEVICE_STATE_CHANGE && ast_enable_distributed_devstate()) { - return; - } - - if (!(publish_event = ast_calloc(1, sizeof(*publish_event)))) { - return; - } - - publish_event->type = type; - ast_debug(1, "Subscribing to event type %d\n", type); - publish_event->sub = ast_event_subscribe(type, ast_event_cb, "AIS", event_channel, - AST_EVENT_IE_END); - ast_event_dump_cache(publish_event->sub); - - AST_LIST_INSERT_TAIL(&event_channel->publish_events, publish_event, entry); -} - -static SaAisErrorT set_egress_subscription(struct event_channel *event_channel, - struct subscribe_event *subscribe_event) -{ - SaAisErrorT ais_res; - SaEvtEventFilterArrayT filter_array; - SaEvtEventFilterT filter; - const char *filter_str = NULL; - SaSizeT len; - - /* We know it's going to be valid. It was checked earlier. */ - filter_str = type_to_filter_str(subscribe_event->type); - - filter.filterType = SA_EVT_EXACT_FILTER; - len = strlen(filter_str) + 1; - filter.filter.allocatedSize = len; - filter.filter.patternSize = len; - filter.filter.pattern = (SaUint8T *) filter_str; - - filter_array.filtersNumber = 1; - filter_array.filters = &filter; - - ais_res = saEvtEventSubscribe(event_channel->handle, &filter_array, - subscribe_event->id); - - return ais_res; -} - -static void add_subscribe_event(struct event_channel *event_channel, const char *event_type) -{ - int i; - enum ast_event_type type = -1; - struct subscribe_event *subscribe_event; - SaAisErrorT ais_res; - - for (i = 0; i < ARRAY_LEN(supported_event_types); i++) { - if (!strcasecmp(event_type, supported_event_types[i].str)) { - type = supported_event_types[i].type; - break; - } - } - - if (type == -1) { - ast_log(LOG_WARNING, "subscribe_event option given with invalid value '%s'\n", event_type); - return; - } - - if (type == AST_EVENT_DEVICE_STATE_CHANGE && ast_enable_distributed_devstate()) { - return; - } - - if (!(subscribe_event = ast_calloc(1, sizeof(*subscribe_event)))) { - return; - } - - subscribe_event->type = type; - subscribe_event->id = ast_atomic_fetchadd_int(&unique_id, +1); - - ais_res = set_egress_subscription(event_channel, subscribe_event); - if (ais_res != SA_AIS_OK) { - ast_log(LOG_ERROR, "Error setting up egress subscription: %s\n", - ais_err2str(ais_res)); - free(subscribe_event); - return; - } - - AST_LIST_INSERT_TAIL(&event_channel->subscribe_events, subscribe_event, entry); -} - -static void build_event_channel(struct ast_config *cfg, const char *cat) -{ - struct ast_variable *var; - struct event_channel *event_channel; - SaAisErrorT ais_res; - SaNameT sa_name = { 0, }; - - AST_RWLIST_WRLOCK(&event_channels); - AST_RWLIST_TRAVERSE(&event_channels, event_channel, entry) { - if (!strcasecmp(event_channel->name, cat)) - break; - } - AST_RWLIST_UNLOCK(&event_channels); - if (event_channel) { - ast_log(LOG_WARNING, "Event channel '%s' was specified twice in " - "configuration. Second instance ignored.\n", cat); - return; - } - - if (!(event_channel = ast_calloc(1, sizeof(*event_channel) + strlen(cat)))) - return; - - strcpy(event_channel->name, cat); - ast_copy_string((char *) sa_name.value, cat, sizeof(sa_name.value)); - sa_name.length = strlen((char *) sa_name.value); - ais_res = saEvtChannelOpen(evt_handle, &sa_name, - SA_EVT_CHANNEL_PUBLISHER | SA_EVT_CHANNEL_SUBSCRIBER | SA_EVT_CHANNEL_CREATE, - SA_TIME_MAX, &event_channel->handle); - if (ais_res != SA_AIS_OK) { - ast_log(LOG_ERROR, "Error opening event channel: %s\n", ais_err2str(ais_res)); - free(event_channel); - return; - } - - for (var = ast_variable_browse(cfg, cat); var; var = var->next) { - if (!strcasecmp(var->name, "type")) { - continue; - } else if (!strcasecmp(var->name, "publish_event")) { - add_publish_event(event_channel, var->value); - } else if (!strcasecmp(var->name, "subscribe_event")) { - add_subscribe_event(event_channel, var->value); - } else { - ast_log(LOG_WARNING, "Event channel '%s' contains invalid option '%s'\n", - event_channel->name, var->name); - } - } - - AST_RWLIST_WRLOCK(&event_channels); - AST_RWLIST_INSERT_TAIL(&event_channels, event_channel, entry); - AST_RWLIST_UNLOCK(&event_channels); -} - -static void load_config(void) -{ - static const char filename[] = "ais.conf"; - struct ast_config *cfg; - const char *cat = NULL; - struct ast_flags config_flags = { 0 }; - - if (!(cfg = ast_config_load(filename, config_flags)) || cfg == CONFIG_STATUS_FILEINVALID) - return; - - while ((cat = ast_category_browse(cfg, cat))) { - const char *type; - - if (!strcasecmp(cat, "general")) - continue; - - if (!(type = ast_variable_retrieve(cfg, cat, "type"))) { - ast_log(LOG_WARNING, "Invalid entry in %s defined with no type!\n", - filename); - continue; - } - - if (!strcasecmp(type, "event_channel")) { - build_event_channel(cfg, cat); - } else { - ast_log(LOG_WARNING, "Entry in %s defined with invalid type '%s'\n", - filename, type); - } - } - - ast_config_destroy(cfg); -} - -static void publish_event_destroy(struct publish_event *publish_event) -{ - ast_event_unsubscribe(publish_event->sub); - - free(publish_event); -} - -static void subscribe_event_destroy(const struct event_channel *event_channel, - struct subscribe_event *subscribe_event) -{ - SaAisErrorT ais_res; - - /* saEvtChannelClose() will actually do this automatically, but it just - * feels cleaner to go ahead and do it manually ... */ - ais_res = saEvtEventUnsubscribe(event_channel->handle, subscribe_event->id); - if (ais_res != SA_AIS_OK) { - ast_log(LOG_ERROR, "Error unsubscribing: %s\n", ais_err2str(ais_res)); - } - - free(subscribe_event); -} - -static void event_channel_destroy(struct event_channel *event_channel) -{ - struct publish_event *publish_event; - struct subscribe_event *subscribe_event; - SaAisErrorT ais_res; - - while ((publish_event = AST_LIST_REMOVE_HEAD(&event_channel->publish_events, entry))) - publish_event_destroy(publish_event); - while ((subscribe_event = AST_LIST_REMOVE_HEAD(&event_channel->subscribe_events, entry))) - subscribe_event_destroy(event_channel, subscribe_event); - - ais_res = saEvtChannelClose(event_channel->handle); - if (ais_res != SA_AIS_OK) { - ast_log(LOG_ERROR, "Error closing event channel '%s': %s\n", - event_channel->name, ais_err2str(ais_res)); - } - - free(event_channel); -} - -static void destroy_event_channels(void) -{ - struct event_channel *event_channel; - - AST_RWLIST_WRLOCK(&event_channels); - while ((event_channel = AST_RWLIST_REMOVE_HEAD(&event_channels, entry))) { - event_channel_destroy(event_channel); - } - AST_RWLIST_UNLOCK(&event_channels); -} - -int ast_ais_evt_load_module(void) -{ - evt_init_res = saEvtInitialize(&evt_handle, &evt_callbacks, &ais_version); - if (evt_init_res != SA_AIS_OK) { - ast_log(LOG_ERROR, "Could not initialize eventing service: %s\n", - ais_err2str(evt_init_res)); - return -1; - } - - load_config(); - - ast_cli_register_multiple(ais_cli, ARRAY_LEN(ais_cli)); - - return 0; -} - -int ast_ais_evt_unload_module(void) -{ - SaAisErrorT ais_res; - - if (evt_init_res != SA_AIS_OK) { - return 0; - } - - destroy_event_channels(); - - ais_res = saEvtFinalize(evt_handle); - if (ais_res != SA_AIS_OK) { - ast_log(LOG_ERROR, "Problem stopping eventing service: %s\n", - ais_err2str(ais_res)); - return -1; - } - - return 0; -} diff --git a/res/res_corosync.c b/res/res_corosync.c new file mode 100644 index 000000000..9ce44103d --- /dev/null +++ b/res/res_corosync.c @@ -0,0 +1,574 @@ +/* + * Asterisk -- An open source telephony toolkit. + * + * Copyright (C) 2007, Digium, Inc. + * Copyright (C) 2012, Russell Bryant + * + * Russell Bryant <russell@russellbryant.net> + * + * See http://www.asterisk.org for more information about + * the Asterisk project. Please do not directly contact + * any of the maintainers of this project for assistance; + * the project provides a web site, mailing lists and IRC + * channels for your use. + * + * This program is free software, distributed under the terms of + * the GNU General Public License Version 2. See the LICENSE file + * at the top of the source tree. + */ + +/*! + * \file + * \author Russell Bryant <russell@russellbryant.net> + * + * This module is based on and replaces the previous res_ais module. + */ + +/*** MODULEINFO + <depend>corosync</depend> + <support_level>extended</support_level> + ***/ + +#include "asterisk.h" + +ASTERISK_FILE_VERSION(__FILE__, "$Revision$"); + +#include <corosync/cpg.h> +#include <corosync/cfg.h> + +#include "asterisk/module.h" +#include "asterisk/logger.h" +#include "asterisk/poll-compat.h" +#include "asterisk/config.h" +#include "asterisk/event.h" +#include "asterisk/cli.h" +#include "asterisk/devicestate.h" + +AST_RWLOCK_DEFINE_STATIC(event_types_lock); + +static struct { + const char *name; + struct ast_event_sub *sub; + unsigned char publish; + unsigned char subscribe; +} event_types[] = { + [AST_EVENT_MWI] = { .name = "mwi", }, + [AST_EVENT_DEVICE_STATE_CHANGE] = { .name = "device_state", }, +}; + +static struct { + pthread_t id; + int alert_pipe[2]; + unsigned int stop:1; +} dispatch_thread = { + .id = AST_PTHREADT_NULL, + .alert_pipe = { -1, -1 }, +}; + +static cpg_handle_t cpg_handle; +static corosync_cfg_handle_t cfg_handle; + +static void cfg_state_track_cb( + corosync_cfg_state_notification_buffer_t *notification_buffer, + cs_error_t error); + +static void cfg_shutdown_cb(corosync_cfg_handle_t cfg_handle, + corosync_cfg_shutdown_flags_t flags); + +static corosync_cfg_callbacks_t cfg_callbacks = { + .corosync_cfg_state_track_callback = cfg_state_track_cb, + .corosync_cfg_shutdown_callback = cfg_shutdown_cb, +}; + +static void cpg_deliver_cb(cpg_handle_t handle, const struct cpg_name *group_name, + uint32_t nodeid, uint32_t pid, void *msg, size_t msg_len); + +static void cpg_confchg_cb(cpg_handle_t handle, const struct cpg_name *group_name, + const struct cpg_address *member_list, size_t member_list_entries, + const struct cpg_address *left_list, size_t left_list_entries, + const struct cpg_address *joined_list, size_t joined_list_entries); + +static cpg_callbacks_t cpg_callbacks = { + .cpg_deliver_fn = cpg_deliver_cb, + .cpg_confchg_fn = cpg_confchg_cb, +}; + +static void ast_event_cb(const struct ast_event *event, void *data); + +static void cfg_state_track_cb( + corosync_cfg_state_notification_buffer_t *notification_buffer, + cs_error_t error) +{ +} + +static void cfg_shutdown_cb(corosync_cfg_handle_t cfg_handle, + corosync_cfg_shutdown_flags_t flags) +{ +} + +static void cpg_deliver_cb(cpg_handle_t handle, const struct cpg_name *group_name, + uint32_t nodeid, uint32_t pid, void *msg, size_t msg_len) +{ + struct ast_event *event; + + if (msg_len < ast_event_minimum_length()) { + ast_debug(1, "Ignoring event that's too small. %u < %u\n", + (unsigned int) msg_len, + (unsigned int) ast_event_minimum_length()); + return; + } + + if (!ast_eid_cmp(&ast_eid_default, ast_event_get_ie_raw(msg, AST_EVENT_IE_EID))) { + /* Don't feed events back in that originated locally. */ + return; + } + + ast_rwlock_rdlock(&event_types_lock); + if (!event_types[ast_event_get_type(msg)].subscribe) { + /* We are not configured to subscribe to these events. */ + ast_rwlock_unlock(&event_types_lock); + return; + } + ast_rwlock_unlock(&event_types_lock); + + if (!(event = ast_malloc(msg_len))) { + return; + } + + memcpy(event, msg, msg_len); + + ast_event_queue_and_cache(event); +} + +static void cpg_confchg_cb(cpg_handle_t handle, const struct cpg_name *group_name, + const struct cpg_address *member_list, size_t member_list_entries, + const struct cpg_address *left_list, size_t left_list_entries, + const struct cpg_address *joined_list, size_t joined_list_entries) +{ + unsigned int i; + + /* If any new nodes have joined, dump our cache of events we are publishing + * that originated from this server. */ + + if (!joined_list_entries) { + return; + } + + for (i = 0; i < ARRAY_LEN(event_types); i++) { + struct ast_event_sub *event_sub; + + ast_rwlock_rdlock(&event_types_lock); + if (!event_types[i].publish) { + ast_rwlock_unlock(&event_types_lock); + continue; + } + ast_rwlock_unlock(&event_types_lock); + + event_sub = ast_event_subscribe_new(i, ast_event_cb, NULL); + ast_event_sub_append_ie_raw(event_sub, AST_EVENT_IE_EID, + &ast_eid_default, sizeof(ast_eid_default)); + ast_event_dump_cache(event_sub); + ast_event_sub_destroy(event_sub); + } +} + +static void *dispatch_thread_handler(void *data) +{ + cs_error_t cs_err; + struct pollfd pfd[3] = { + { .events = POLLIN, }, + { .events = POLLIN, }, + { .events = POLLIN, }, + }; + + if ((cs_err = cpg_fd_get(cpg_handle, &pfd[0].fd)) != CS_OK) { + ast_log(LOG_ERROR, "Failed to get CPG fd. This module is now broken.\n"); + return NULL; + } + + if ((cs_err = corosync_cfg_fd_get(cfg_handle, &pfd[1].fd)) != CS_OK) { + ast_log(LOG_ERROR, "Failed to get CFG fd. This module is now broken.\n"); + return NULL; + } + + pfd[2].fd = dispatch_thread.alert_pipe[0]; + + while (!dispatch_thread.stop) { + int res; + + pfd[0].revents = 0; + pfd[1].revents = 0; + pfd[2].revents = 0; + + res = ast_poll(pfd, ARRAY_LEN(pfd), -1); + if (res == -1 && errno != EINTR && errno != EAGAIN) { + ast_log(LOG_ERROR, "poll() error: %s (%d)\n", strerror(errno), errno); + continue; + } + + if (pfd[0].revents & POLLIN) { + if ((cs_err = cpg_dispatch(cpg_handle, CS_DISPATCH_ALL)) != CS_OK) { + ast_log(LOG_WARNING, "Failed CPG dispatch: %d\n", cs_err); + } + } + + if (pfd[1].revents & POLLIN) { + if ((cs_err = corosync_cfg_dispatch(cfg_handle, CS_DISPATCH_ALL)) != CS_OK) { + ast_log(LOG_WARNING, "Failed CFG dispatch: %d\n", cs_err); + } + } + } + + return NULL; +} + +static void ast_event_cb(const struct ast_event *event, void *data) +{ + cs_error_t cs_err; + struct iovec iov = { + .iov_base = (void *) event, + .iov_len = ast_event_get_size(event), + }; + + if (ast_eid_cmp(&ast_eid_default, + ast_event_get_ie_raw(event, AST_EVENT_IE_EID))) { + /* If the event didn't originate from this server, don't send it back out. */ + return; + } + + /* The ast_event subscription will only exist if we are configured to publish + * these events, so just send away. */ + + if ((cs_err = cpg_mcast_joined(cpg_handle, CPG_TYPE_FIFO, &iov, 1)) != CS_OK) { + ast_log(LOG_WARNING, "CPG mcast failed (%d)\n", cs_err); + } +} + +static char *corosync_show_members(struct ast_cli_entry *e, int cmd, struct ast_cli_args *a) +{ + cs_error_t cs_err; + struct cpg_name name; + struct cpg_address member_list[CPG_MEMBERS_MAX] = { { 0, }, }; + int num_members = CPG_MEMBERS_MAX; + unsigned int i; + + switch (cmd) { + case CLI_INIT: + e->command = "corosync show members"; + e->usage = + "Usage: corosync show members\n" + " Show corosync cluster members\n"; + return NULL; + + case CLI_GENERATE: + return NULL; /* no completion */ + } + + if (a->argc != e->args) { + return CLI_SHOWUSAGE; + } + + ast_copy_string(name.value, "asterisk", sizeof(name.value)); + name.length = strlen(name.value); + + cs_err = cpg_membership_get(cpg_handle, &name, member_list, &num_members); + + if (cs_err != CS_OK) { + ast_cli(a->fd, "Failed to get membership list\n"); + return CLI_FAILURE; + } + + ast_cli(a->fd, "\n" + "=============================================================\n" + "=== Cluster members =========================================\n" + "=============================================================\n" + "===\n" + "=== Number of members: %d\n" + "===\n", num_members); + + for (i = 0; i < num_members; i++) { + corosync_cfg_node_address_t addrs[8]; + int num_addrs = 0; + unsigned int j; + + cs_err = corosync_cfg_get_node_addrs(cfg_handle, member_list[i].nodeid, + ARRAY_LEN(addrs), &num_addrs, addrs); + if (cs_err != CS_OK) { + ast_log(LOG_WARNING, "Failed to get node addresses\n"); + continue; + } + + ast_cli(a->fd, "=== Node %d\n", i + 1); + + for (j = 0; j < num_addrs; j++) { + struct sockaddr *sa = (struct sockaddr *) addrs[j].address; + size_t sa_len = (size_t) addrs[j].address_length; + char buf[128]; + + getnameinfo(sa, sa_len, buf, sizeof(buf), NULL, 0, NI_NUMERICHOST); + + ast_cli(a->fd, "=== --> Address %d: %s\n", j + 1, buf); + } + } + + ast_cli(a->fd, "===\n" + "=============================================================\n" + "\n"); + + return CLI_SUCCESS; +} + +static char *corosync_show_config(struct ast_cli_entry *e, int cmd, struct ast_cli_args *a) +{ + unsigned int i; + + switch (cmd) { + case CLI_INIT: + e->command = "corosync show config"; + e->usage = + "Usage: corosync show config\n" + " Show configuration loaded from res_corosync.conf\n"; + return NULL; + + case CLI_GENERATE: + return NULL; /* no completion */ + } + + if (a->argc != e->args) { + return CLI_SHOWUSAGE; + } + + ast_cli(a->fd, "\n" + "=============================================================\n" + "=== res_corosync config =====================================\n" + "=============================================================\n" + "===\n"); + + ast_rwlock_rdlock(&event_types_lock); + for (i = 0; i < ARRAY_LEN(event_types); i++) { + if (event_types[i].publish) { + ast_cli(a->fd, "=== ==> Publishing Event Type: %s\n", + event_types[i].name); + } + if (event_types[i].subscribe) { + ast_cli(a->fd, "=== ==> Subscribing to Event Type: %s\n", + event_types[i].name); + } + } + ast_rwlock_unlock(&event_types_lock); + + ast_cli(a->fd, "===\n" + "=============================================================\n" + "\n"); + + return CLI_SUCCESS; +} + +static struct ast_cli_entry corosync_cli[] = { + AST_CLI_DEFINE(corosync_show_config, "Show configuration"), + AST_CLI_DEFINE(corosync_show_members, "Show cluster members"), +}; + +enum { + PUBLISH, + SUBSCRIBE, +}; + +static int set_event(const char *event_type, int pubsub) +{ + unsigned int i; + + for (i = 0; i < ARRAY_LEN(event_types); i++) { + if (!event_types[i].name || strcasecmp(event_type, event_types[i].name)) { + continue; + } + + switch (pubsub) { + case PUBLISH: + event_types[i].publish = 1; + break; + case SUBSCRIBE: + event_types[i].subscribe = 1; + break; + } + + break; + } + + return (i == ARRAY_LEN(event_types)) ? -1 : 0; +} + +static int load_general_config(struct ast_config *cfg) +{ + struct ast_variable *v; + int res = 0; + unsigned int i; + + ast_rwlock_wrlock(&event_types_lock); + + for (i = 0; i < ARRAY_LEN(event_types); i++) { + event_types[i].publish = 0; + event_types[i].subscribe = 0; + } + + for (v = ast_variable_browse(cfg, "general"); v && !res; v = v->next) { + if (!strcasecmp(v->name, "publish_event")) { + res = set_event(v->value, PUBLISH); + } else if (!strcasecmp(v->name, "subscribe_event")) { + res = set_event(v->value, SUBSCRIBE); + } else { + ast_log(LOG_WARNING, "Unknown option '%s'\n", v->name); + } + } + + for (i = 0; i < ARRAY_LEN(event_types); i++) { + if (event_types[i].publish && !event_types[i].sub) { + event_types[i].sub = ast_event_subscribe(i, + ast_event_cb, "Corosync", NULL, + AST_EVENT_IE_END); + } else if (!event_types[i].publish && event_types[i].sub) { + event_types[i].sub = ast_event_unsubscribe(event_types[i].sub); + } + } + + ast_rwlock_unlock(&event_types_lock); + + return res; +} + +static int load_config(unsigned int reload) +{ + static const char filename[] = "res_corosync.conf"; + struct ast_config *cfg; + const char *cat = NULL; + struct ast_flags config_flags = { 0 }; + int res = 0; + + cfg = ast_config_load(filename, config_flags); + + if (cfg == CONFIG_STATUS_FILEMISSING || cfg == CONFIG_STATUS_FILEINVALID) { + return -1; + } + + while ((cat = ast_category_browse(cfg, cat))) { + if (!strcasecmp(cat, "general")) { + res = load_general_config(cfg); + } else { + ast_log(LOG_WARNING, "Unknown configuration section '%s'\n", cat); + } + } + + ast_config_destroy(cfg); + + return res; +} + +static void cleanup_module(void) +{ + cs_error_t cs_err; + unsigned int i; + + for (i = 0; i < ARRAY_LEN(event_types); i++) { + if (event_types[i].sub) { + event_types[i].sub = ast_event_unsubscribe(event_types[i].sub); + } + event_types[i].publish = 0; + event_types[i].subscribe = 0; + } + + if (dispatch_thread.id != AST_PTHREADT_NULL) { + char meepmeep = 'x'; + dispatch_thread.stop = 1; + if (ast_carefulwrite(dispatch_thread.alert_pipe[1], &meepmeep, 1, + 5000) == -1) { + ast_log(LOG_ERROR, "Failed to write to pipe: %s (%d)\n", + strerror(errno), errno); + } + pthread_join(dispatch_thread.id, NULL); + } + + if (dispatch_thread.alert_pipe[0] != -1) { + close(dispatch_thread.alert_pipe[0]); + dispatch_thread.alert_pipe[0] = -1; + } + + if (dispatch_thread.alert_pipe[1] != -1) { + close(dispatch_thread.alert_pipe[1]); + dispatch_thread.alert_pipe[1] = -1; + } + + if (cpg_handle && (cs_err = cpg_finalize(cpg_handle) != CS_OK)) { + ast_log(LOG_ERROR, "Failed to finalize cpg (%d)\n", (int) cs_err); + } + cpg_handle = 0; + + if (cfg_handle && (cs_err = corosync_cfg_finalize(cfg_handle) != CS_OK)) { + ast_log(LOG_ERROR, "Failed to finalize cfg (%d)\n", (int) cs_err); + } + cfg_handle = 0; +} + +static int load_module(void) +{ + cs_error_t cs_err; + enum ast_module_load_result res = AST_MODULE_LOAD_FAILURE; + struct cpg_name name; + + if ((cs_err = corosync_cfg_initialize(&cfg_handle, &cfg_callbacks) != CS_OK)) { + ast_log(LOG_ERROR, "Failed to initialize cfg (%d)\n", (int) cs_err); + return AST_MODULE_LOAD_DECLINE; + } + + if ((cs_err = cpg_initialize(&cpg_handle, &cpg_callbacks) != CS_OK)) { + ast_log(LOG_ERROR, "Failed to initialize cpg (%d)\n", (int) cs_err); + goto failed; + } + + ast_copy_string(name.value, "asterisk", sizeof(name.value)); + name.length = strlen(name.value); + + if ((cs_err = cpg_join(cpg_handle, &name)) != CS_OK) { + ast_log(LOG_ERROR, "Failed to join (%d)\n", (int) cs_err); + goto failed; + } + + if (pipe(dispatch_thread.alert_pipe) == -1) { + ast_log(LOG_ERROR, "Failed to create alert pipe: %s (%d)\n", + strerror(errno), errno); + goto failed; + } + + if (ast_pthread_create_background(&dispatch_thread.id, NULL, + dispatch_thread_handler, NULL)) { + ast_log(LOG_ERROR, "Error starting CPG dispatch thread.\n"); + goto failed; + } + + if (load_config(0)) { + /* simply not configured is not a fatal error */ + res = AST_MODULE_LOAD_DECLINE; + goto failed; + } + + ast_cli_register_multiple(corosync_cli, ARRAY_LEN(corosync_cli)); + + ast_enable_distributed_devstate(); + + return AST_MODULE_LOAD_SUCCESS; + +failed: + cleanup_module(); + + return res; +} + +static int unload_module(void) +{ + ast_cli_unregister_multiple(corosync_cli, ARRAY_LEN(corosync_cli)); + + cleanup_module(); + + return 0; +} + +AST_MODULE_INFO_STANDARD(ASTERISK_GPL_KEY, "Corosync"); |