diff options
Diffstat (limited to 'res/stasis/messaging.c')
-rw-r--r-- | res/stasis/messaging.c | 531 |
1 files changed, 531 insertions, 0 deletions
diff --git a/res/stasis/messaging.c b/res/stasis/messaging.c new file mode 100644 index 000000000..47730851f --- /dev/null +++ b/res/stasis/messaging.c @@ -0,0 +1,531 @@ +/* + * Asterisk -- An open source telephony toolkit. + * + * Copyright (C) 2014, Digium, Inc. + * + * Matt Jordan <mjordan@digium.com> + * + * See http://www.asterisk.org for more information about + * the Asterisk project. Please do not directly contact + * any of the maintainers of this project for assistance; + * the project provides a web site, mailing lists and IRC + * channels for your use. + * + * This program is free software, distributed under the terms of + * the GNU General Public License Version 2. See the LICENSE file + * at the top of the source tree. + */ + +/*! + * \file + * + * \brief Stasis out-of-call text message support + * + * \author Matt Jordan <mjordan@digium.com> + */ + +#include "asterisk.h" + +ASTERISK_FILE_VERSION(__FILE__, "$Revision$") + +#include "asterisk/message.h" +#include "asterisk/endpoints.h" +#include "asterisk/astobj2.h" +#include "asterisk/vector.h" +#include "asterisk/lock.h" +#include "asterisk/utils.h" +#include "asterisk/test.h" +#include "messaging.h" + +/*! + * \brief Number of buckets for the \ref endpoint_subscriptions container + */ +#define ENDPOINTS_NUM_BUCKETS 127 + +/*! \brief Storage object for an application */ +struct application_tuple { + /*! ao2 ref counted private object to pass to the callback */ + void *pvt; + /*! The callback to call when this application has a message */ + message_received_cb callback; + /*! The name (key) of the application */ + char app_name[]; +}; + +/*! \brief A subscription to some endpoint or technology */ +struct message_subscription { + /*! The applications that have subscribed to this endpoint or tech */ + AST_VECTOR(, struct application_tuple *) applications; + /*! The name of this endpoint or tech */ + char token[]; +}; + +/*! \brief The subscriptions to endpoints */ +static struct ao2_container *endpoint_subscriptions; + +/*! + * \brief The subscriptions to technologies + * + * \note These are stored separately from standard endpoints, given how + * relatively few of them there are. + */ +static AST_VECTOR(,struct message_subscription *) tech_subscriptions; + +/*! \brief RWLock for \c tech_subscriptions */ +static ast_rwlock_t tech_subscriptions_lock; + +/*! \internal \brief Destructor for \c application_tuple */ +static void application_tuple_dtor(void *obj) +{ + struct application_tuple *tuple = obj; + + ao2_cleanup(tuple->pvt); +} + +/*! \internal \brief Constructor for \c application_tuple */ +static struct application_tuple *application_tuple_alloc(const char *app_name, message_received_cb callback, void *pvt) +{ + struct application_tuple *tuple; + size_t size = sizeof(*tuple) + strlen(app_name) + 1; + + ast_assert(callback != NULL); + + tuple = ao2_t_alloc(size, application_tuple_dtor, AO2_ALLOC_OPT_LOCK_NOLOCK); + if (!tuple) { + return NULL; + } + + strcpy(tuple->app_name, app_name); /* Safe */ + tuple->pvt = ao2_bump(pvt); + tuple->callback = callback; + + return tuple; +} + +/*! \internal \brief Destructor for \ref message_subscription */ +static void message_subscription_dtor(void *obj) +{ + struct message_subscription *sub = obj; + int i; + + for (i = 0; i < AST_VECTOR_SIZE(&sub->applications); i++) { + struct application_tuple *tuple = AST_VECTOR_GET(&sub->applications, i); + + ao2_cleanup(tuple); + } + AST_VECTOR_FREE(&sub->applications); +} + +/*! \internal \brief Constructor for \ref message_subscription */ +static struct message_subscription *message_subscription_alloc(const char *token) +{ + struct message_subscription *sub; + size_t size = sizeof(*sub) + strlen(token) + 1; + + sub = ao2_t_alloc(size, message_subscription_dtor, AO2_ALLOC_OPT_LOCK_RWLOCK); + if (!sub) { + return NULL; + } + strcpy(sub->token, token); /* Safe */ + + return sub; +} + +/*! AO2 hash function for \ref message_subscription */ +static int message_subscription_hash_cb(const void *obj, const int flags) +{ + const struct message_subscription *sub; + const char *key; + + switch (flags & OBJ_SEARCH_MASK) { + case OBJ_SEARCH_KEY: + key = obj; + break; + case OBJ_SEARCH_OBJECT: + sub = obj; + key = sub->token; + break; + default: + /* Hash can only work on something with a full key. */ + ast_assert(0); + return 0; + } + return ast_str_hash(key); +} + +/*! AO2 comparison function for \ref message_subscription */ +static int message_subscription_compare_cb(void *obj, void *arg, int flags) +{ + const struct message_subscription *object_left = obj; + const struct message_subscription *object_right = arg; + const char *right_key = arg; + int cmp; + + switch (flags & OBJ_SEARCH_MASK) { + case OBJ_SEARCH_OBJECT: + right_key = object_right->token; + /* Fall through */ + case OBJ_SEARCH_KEY: + cmp = strcmp(object_left->token, right_key); + break; + case OBJ_SEARCH_PARTIAL_KEY: + /* + * We could also use a partial key struct containing a length + * so strlen() does not get called for every comparison instead. + */ + cmp = strncmp(object_left->token, right_key, strlen(right_key)); + break; + default: + /* + * What arg points to is specific to this traversal callback + * and has no special meaning to astobj2. + */ + cmp = 0; + break; + } + if (cmp) { + return 0; + } + /* + * At this point the traversal callback is identical to a sorted + * container. + */ + return CMP_MATCH; +} + +/*! \internal \brief Convert a \c ast_msg To/From URI to a Stasis endpoint name */ +static void msg_to_endpoint(const struct ast_msg *msg, char *buf, size_t len) +{ + const char *endpoint = ast_msg_get_endpoint(msg); + + snprintf(buf, len, "%s%s%s", ast_msg_get_tech(msg), + ast_strlen_zero(endpoint) ? "" : "/", + S_OR(endpoint, "")); +} + +/*! \internal + * \brief Callback from the \c message API that determines if we can handle + * this message + */ +static int has_destination_cb(const struct ast_msg *msg) +{ + struct message_subscription *sub; + int i; + char buf[256]; + + msg_to_endpoint(msg, buf, sizeof(buf)); + + ast_rwlock_rdlock(&tech_subscriptions_lock); + for (i = 0; i < AST_VECTOR_SIZE(&tech_subscriptions); i++) { + sub = AST_VECTOR_GET(&tech_subscriptions, i); + + if (sub && (!strncasecmp(sub->token, buf, strlen(sub->token)) + || !strncasecmp(sub->token, buf, strlen(sub->token)))) { + ast_rwlock_unlock(&tech_subscriptions_lock); + sub = NULL; /* No ref bump! */ + goto match; + } + + } + ast_rwlock_unlock(&tech_subscriptions_lock); + + sub = ao2_find(endpoint_subscriptions, buf, OBJ_SEARCH_KEY); + if (sub) { + goto match; + } + + ast_debug(1, "No subscription found for %s\n", buf); + return 0; + +match: + ao2_cleanup(sub); + return 1; +} + +static struct ast_json *msg_to_json(struct ast_msg *msg) +{ + struct ast_json *json_obj; + struct ast_json *json_vars; + struct ast_msg_var_iterator *it_vars; + const char *name; + const char *value; + + it_vars = ast_msg_var_iterator_init(msg); + if (!it_vars) { + return NULL; + } + + json_vars = ast_json_array_create(); + if (!json_vars) { + return NULL; + } + + while (ast_msg_var_iterator_next(msg, it_vars, &name, &value)) { + struct ast_json *json_tuple; + + json_tuple = ast_json_pack("{s: s}", name, value); + if (!json_tuple) { + ast_json_free(json_vars); + return NULL; + } + + ast_json_array_append(json_vars, json_tuple); + ast_msg_var_unref_current(it_vars); + } + ast_msg_var_iterator_destroy(it_vars); + + json_obj = ast_json_pack("{s: s, s: s, s: s, s: o}", + "from", ast_msg_get_from(msg), + "to", ast_msg_get_to(msg), + "body", ast_msg_get_body(msg), + "variables", json_vars); + + return json_obj; +} + +static int handle_msg_cb(struct ast_msg *msg) +{ + struct message_subscription *sub; + int i; + char buf[256]; + const char *endpoint_name; + struct ast_json *json_msg; + + msg_to_endpoint(msg, buf, sizeof(buf)); + + ast_rwlock_rdlock(&tech_subscriptions_lock); + for (i = 0; i < AST_VECTOR_SIZE(&tech_subscriptions); i++) { + sub = AST_VECTOR_GET(&tech_subscriptions, i); + + if (!sub) { + continue; + } + + if (!strncasecmp(sub->token, buf, strlen(sub->token))) { + ast_rwlock_unlock(&tech_subscriptions_lock); + ao2_bump(sub); + endpoint_name = buf; + goto match; + } + } + ast_rwlock_unlock(&tech_subscriptions_lock); + + sub = ao2_find(endpoint_subscriptions, buf, OBJ_SEARCH_KEY); + if (sub) { + endpoint_name = buf; + goto match; + } + + return -1; + +match: + ast_debug(3, "Dispatching message for %s\n", endpoint_name); + + json_msg = msg_to_json(msg); + if (!json_msg) { + ao2_ref(sub, -1); + return -1; + } + + for (i = 0; i < AST_VECTOR_SIZE(&sub->applications); i++) { + struct application_tuple *tuple = AST_VECTOR_GET(&sub->applications, i); + + tuple->callback(endpoint_name, json_msg, tuple->pvt); + } + + ast_json_unref(json_msg); + ao2_ref(sub, -1); + return 0; +} + +struct ast_msg_handler ari_msg_handler = { + .name = "ari", + .handle_msg = handle_msg_cb, + .has_destination = has_destination_cb, +}; + +static int messaging_subscription_cmp(struct message_subscription *sub, const char *key) +{ + return !strcmp(sub->token, key) ? 1 : 0; +} + +static int application_tuple_cmp(struct application_tuple *item, const char *key) +{ + return !strcmp(item->app_name, key) ? 1 : 0; +} + +static int is_app_subscribed(struct message_subscription *sub, const char *app_name) +{ + int i; + + for (i = 0; i < AST_VECTOR_SIZE(&sub->applications); i++) { + struct application_tuple *tuple; + + tuple = AST_VECTOR_GET(&sub->applications, i); + if (tuple && !strcmp(tuple->app_name, app_name)) { + return 1; + } + } + + return 0; +} + +static struct message_subscription *get_subscription(struct ast_endpoint *endpoint) +{ + struct message_subscription *sub = NULL; + + if (!ast_strlen_zero(ast_endpoint_get_resource(endpoint))) { + sub = ao2_find(endpoint_subscriptions, endpoint, OBJ_SEARCH_KEY); + } else { + int i; + + ast_rwlock_rdlock(&tech_subscriptions_lock); + for (i = 0; i < AST_VECTOR_SIZE(&tech_subscriptions); i++) { + sub = AST_VECTOR_GET(&tech_subscriptions, i); + + if (sub && !strcmp(sub->token, ast_endpoint_get_tech(endpoint))) { + ao2_bump(sub); + break; + } + } + ast_rwlock_unlock(&tech_subscriptions_lock); + } + + return sub; +} + +void messaging_app_unsubscribe_endpoint(const char *app_name, const char *endpoint_id) +{ + RAII_VAR(struct message_subscription *, sub, NULL, ao2_cleanup); + RAII_VAR(struct ast_endpoint *, endpoint, NULL, ao2_cleanup); + + endpoint = ast_endpoint_find_by_id(endpoint_id); + if (!endpoint) { + return; + } + + sub = get_subscription(endpoint); + if (!sub) { + return; + } + + ao2_lock(sub); + if (!is_app_subscribed(sub, app_name)) { + ao2_unlock(sub); + return; + } + + AST_VECTOR_REMOVE_CMP_UNORDERED(&sub->applications, app_name, application_tuple_cmp, ao2_cleanup); + if (AST_VECTOR_SIZE(&sub->applications) == 0) { + if (!ast_strlen_zero(ast_endpoint_get_resource(endpoint))) { + ao2_unlink(endpoint_subscriptions, sub); + } else { + ast_rwlock_wrlock(&tech_subscriptions_lock); + AST_VECTOR_REMOVE_CMP_UNORDERED(&tech_subscriptions, ast_endpoint_get_id(endpoint), + messaging_subscription_cmp, AST_VECTOR_ELEM_CLEANUP_NOOP); + ast_rwlock_unlock(&tech_subscriptions_lock); + } + } + ao2_unlock(sub); + ao2_ref(sub, -1); + + ast_debug(3, "App '%s' unsubscribed to messages from endpoint '%s'\n", app_name, ast_endpoint_get_id(endpoint)); + ast_test_suite_event_notify("StasisMessagingSubscription", "SubState: Unsubscribed\r\nAppName: %s\r\nToken: %s\r\n", + app_name, ast_endpoint_get_id(endpoint)); +} + +static struct message_subscription *get_or_create_subscription(struct ast_endpoint *endpoint) +{ + struct message_subscription *sub = get_subscription(endpoint); + + if (sub) { + return sub; + } + + sub = message_subscription_alloc(ast_endpoint_get_id(endpoint)); + if (!sub) { + return NULL; + } + + if (!ast_strlen_zero(ast_endpoint_get_resource(endpoint))) { + ao2_link(endpoint_subscriptions, sub); + } else { + ast_rwlock_wrlock(&tech_subscriptions_lock); + AST_VECTOR_APPEND(&tech_subscriptions, ao2_bump(sub)); + ast_rwlock_unlock(&tech_subscriptions_lock); + } + + return sub; +} + +int messaging_app_subscribe_endpoint(const char *app_name, struct ast_endpoint *endpoint, message_received_cb callback, void *pvt) +{ + RAII_VAR(struct message_subscription *, sub, NULL, ao2_cleanup); + struct application_tuple *tuple; + + sub = get_or_create_subscription(endpoint); + if (!sub) { + return -1; + } + + ao2_lock(sub); + if (is_app_subscribed(sub, app_name)) { + ao2_unlock(sub); + return 0; + } + + tuple = application_tuple_alloc(app_name, callback, pvt); + if (!tuple) { + ao2_unlock(sub); + return -1; + } + AST_VECTOR_APPEND(&sub->applications, tuple); + ao2_unlock(sub); + + ast_debug(3, "App '%s' subscribed to messages from endpoint '%s'\n", app_name, ast_endpoint_get_id(endpoint)); + ast_test_suite_event_notify("StasisMessagingSubscription", "SubState: Subscribed\r\nAppName: %s\r\nToken: %s\r\n", + app_name, ast_endpoint_get_id(endpoint)); + + return 0; +} + + +int messaging_cleanup(void) +{ + ast_msg_handler_unregister(&ari_msg_handler); + ao2_ref(endpoint_subscriptions, -1); + AST_VECTOR_FREE(&tech_subscriptions); + ast_rwlock_destroy(&tech_subscriptions_lock);\ + + return 0; +} + +int messaging_init(void) +{ + endpoint_subscriptions = ao2_t_container_alloc_hash(AO2_ALLOC_OPT_LOCK_RWLOCK, 0, + ENDPOINTS_NUM_BUCKETS, message_subscription_hash_cb, NULL, + message_subscription_compare_cb, "Endpoint messaging subscription container creation"); + if (!endpoint_subscriptions) { + return -1; + } + + if (AST_VECTOR_INIT(&tech_subscriptions, 4)) { + ao2_ref(endpoint_subscriptions, -1); + return -1; + } + + if (ast_rwlock_init(&tech_subscriptions_lock)) { + ao2_ref(endpoint_subscriptions, -1); + AST_VECTOR_FREE(&tech_subscriptions); + return -1; + } + + if (ast_msg_handler_register(&ari_msg_handler)) { + ao2_ref(endpoint_subscriptions, -1); + AST_VECTOR_FREE(&tech_subscriptions); + ast_rwlock_destroy(&tech_subscriptions_lock); + return -1; + } + + return 0; +} |