diff options
Diffstat (limited to 'main')
-rw-r--r-- | main/manager.c | 4 | ||||
-rw-r--r-- | main/manager_endpoints.c | 104 | ||||
-rw-r--r-- | main/stasis_endpoints.c | 131 |
3 files changed, 239 insertions, 0 deletions
diff --git a/main/manager.c b/main/manager.c index d4960d6de..a1b0c7fea 100644 --- a/main/manager.c +++ b/main/manager.c @@ -7771,6 +7771,10 @@ static int __init_manager(int reload, int by_external_config) if (manager_bridging_init()) { return -1; } + if (manager_endpoints_init()) { + ast_log(AST_LOG_ERROR, "Failed to initialize manager endpoints handling\n"); + return -1; + } } if (!registered) { diff --git a/main/manager_endpoints.c b/main/manager_endpoints.c new file mode 100644 index 000000000..1a36424af --- /dev/null +++ b/main/manager_endpoints.c @@ -0,0 +1,104 @@ +/* + * Asterisk -- An open source telephony toolkit. + * + * Copyright (C) 2013, Digium, Inc. + * + * Joshua Colp <jcolp@digium.com> + * David M. Lee, II <dlee@digium.com> + * + * See http://www.asterisk.org for more information about + * the Asterisk project. Please do not directly contact + * any of the maintainers of this project for assistance; + * the project provides a web site, mailing lists and IRC + * channels for your use. + * + * This program is free software, distributed under the terms of + * the GNU General Public License Version 2. See the LICENSE file + * at the top of the source tree. + */ + +/*! \file + * + * \brief The Asterisk Management Interface - AMI (endpoint handling) + * + * \author Joshua Colp <jcolp@digium.com> + * \author David M. Lee, II <dlee@digium.com> + * + */ + +#include "asterisk.h" + +ASTERISK_FILE_VERSION(__FILE__, "$Revision$") + +#include "asterisk/callerid.h" +#include "asterisk/channel.h" +#include "asterisk/manager.h" +#include "asterisk/stasis_message_router.h" +#include "asterisk/pbx.h" +#include "asterisk/stasis_endpoints.h" + +static struct stasis_message_router *endpoint_router; + +/*! \brief The \ref stasis subscription returned by the forwarding of the endpoint topic + * to the manager topic + */ +static struct stasis_subscription *topic_forwarder; + +static void manager_endpoints_shutdown(void) +{ + stasis_message_router_unsubscribe_and_join(endpoint_router); + endpoint_router = NULL; + + stasis_unsubscribe(topic_forwarder); + topic_forwarder = NULL; +} + +int manager_endpoints_init(void) +{ + struct stasis_topic *manager_topic; + struct stasis_topic *endpoint_topic; + struct stasis_message_router *message_router; + int ret = 0; + + if (endpoint_router) { + /* Already initialized */ + return 0; + } + + ast_register_atexit(manager_endpoints_shutdown); + + manager_topic = ast_manager_get_topic(); + if (!manager_topic) { + return -1; + } + message_router = ast_manager_get_message_router(); + if (!message_router) { + return -1; + } + endpoint_topic = stasis_caching_get_topic(ast_endpoint_topic_all_cached()); + if (!endpoint_topic) { + return -1; + } + + topic_forwarder = stasis_forward_all(endpoint_topic, manager_topic); + if (!topic_forwarder) { + return -1; + } + + endpoint_router = stasis_message_router_create(endpoint_topic); + + if (!endpoint_router) { + return -1; + } + + /* If somehow we failed to add any routes, just shut down the whole + * thing and fail it. + */ + if (ret) { + manager_endpoints_shutdown(); + return -1; + } + + return 0; +} + diff --git a/main/stasis_endpoints.c b/main/stasis_endpoints.c index d5347cbcb..90d968567 100644 --- a/main/stasis_endpoints.c +++ b/main/stasis_endpoints.c @@ -35,12 +35,139 @@ ASTERISK_FILE_VERSION(__FILE__, "$Revision$") #include "asterisk/stasis.h" #include "asterisk/stasis_endpoints.h" +/*** DOCUMENTATION + <managerEvent language="en_US" name="PeerStatus"> + <managerEventInstance class="EVENT_FLAG_SYSTEM"> + <synopsis>Raised when the state of a peer changes.</synopsis> + <syntax> + <parameter name="ChannelType"> + <para>The channel technology of the peer.</para> + </parameter> + <parameter name="Peer"> + <para>The name of the peer (including channel technology).</para> + </parameter> + <parameter name="PeerStatus"> + <para>New status of the peer.</para> + <enumlist> + <enum name="Unknown"/> + <enum name="Registered"/> + <enum name="Unregistered"/> + <enum name="Rejected"/> + <enum name="Lagged"/> + </enumlist> + </parameter> + <parameter name="Cause"> + <para>The reason the status has changed.</para> + </parameter> + <parameter name="Address"> + <para>New address of the peer.</para> + </parameter> + <parameter name="Port"> + <para>New port for the peer.</para> + </parameter> + <parameter name="Time"> + <para>Time it takes to reach the peer and receive a response.</para> + </parameter> + </syntax> + </managerEventInstance> + </managerEvent> +***/ + +static struct ast_manager_event_blob *peerstatus_to_ami(struct stasis_message *msg); + STASIS_MESSAGE_TYPE_DEFN(ast_endpoint_snapshot_type); +STASIS_MESSAGE_TYPE_DEFN(ast_endpoint_state_type, + .to_ami = peerstatus_to_ami, +); static struct stasis_topic *endpoint_topic_all; static struct stasis_caching_topic *endpoint_topic_all_cached; +static struct ast_manager_event_blob *peerstatus_to_ami(struct stasis_message *msg) +{ + struct ast_endpoint_blob *obj = stasis_message_data(msg); + RAII_VAR(struct ast_str *, peerstatus_event_string, ast_str_create(64), ast_free); + const char *value; + + /* peer_status is the only *required* thing */ + if (!(value = ast_json_string_get(ast_json_object_get(obj->blob, "peer_status")))) { + return NULL; + } + ast_str_append(&peerstatus_event_string, 0, "PeerStatus: %s\r\n", value); + + if ((value = ast_json_string_get(ast_json_object_get(obj->blob, "cause")))) { + ast_str_append(&peerstatus_event_string, 0, "Cause: %s\r\n", value); + } + if ((value = ast_json_string_get(ast_json_object_get(obj->blob, "address")))) { + ast_str_append(&peerstatus_event_string, 0, "Address: %s\r\n", value); + } + if ((value = ast_json_string_get(ast_json_object_get(obj->blob, "port")))) { + ast_str_append(&peerstatus_event_string, 0, "Port: %s\r\n", value); + } + if ((value = ast_json_string_get(ast_json_object_get(obj->blob, "time")))) { + ast_str_append(&peerstatus_event_string, 0, "Time: %s\r\n", value); + } + + return ast_manager_event_blob_create(EVENT_FLAG_SYSTEM, "PeerStatus", + "ChannelType: %s\r\n" + "Peer: %s/%s\r\n" + "%s", + obj->snapshot->tech, + obj->snapshot->tech, + obj->snapshot->resource, + ast_str_buffer(peerstatus_event_string)); +} + +static void endpoint_blob_dtor(void *obj) +{ + struct ast_endpoint_blob *event = obj; + ao2_cleanup(event->snapshot); + ast_json_unref(event->blob); +} + +struct stasis_message *ast_endpoint_blob_create(struct ast_endpoint *endpoint, + struct stasis_message_type *type, struct ast_json *blob) +{ + RAII_VAR(struct ast_endpoint_blob *, obj, NULL, ao2_cleanup); + RAII_VAR(struct stasis_message *, msg, NULL, ao2_cleanup); + + if (!blob) { + blob = ast_json_null(); + } + + if (!(obj = ao2_alloc(sizeof(*obj), endpoint_blob_dtor))) { + return NULL; + } + + if (endpoint) { + if (!(obj->snapshot = ast_endpoint_snapshot_create(endpoint))) { + return NULL; + } + } + + obj->blob = ast_json_ref(blob); + + if (!(msg = stasis_message_create(type, obj))) { + return NULL; + } + + ao2_ref(msg, +1); + return msg; +} + +void ast_endpoint_blob_publish(struct ast_endpoint *endpoint, struct stasis_message_type *type, + struct ast_json *blob) +{ + RAII_VAR(struct stasis_message *, message, NULL, ao2_cleanup); + if (blob) { + message = ast_endpoint_blob_create(endpoint, type, blob); + } + if (message) { + stasis_publish(ast_endpoint_topic(endpoint), message); + } +} + struct stasis_topic *ast_endpoint_topic_all(void) { return endpoint_topic_all; @@ -175,5 +302,9 @@ int ast_endpoint_stasis_init(void) return -1; } + if (STASIS_MESSAGE_TYPE_INIT(ast_endpoint_state_type) != 0) { + return -1; + } + return 0; } |