diff options
Diffstat (limited to 'res')
-rw-r--r-- | res/ari/resource_events.c | 24 | ||||
-rw-r--r-- | res/ari/resource_events.h | 2 | ||||
-rw-r--r-- | res/res_ari_events.c | 6 | ||||
-rw-r--r-- | res/res_stasis.c | 41 | ||||
-rw-r--r-- | res/stasis/app.c | 335 | ||||
-rw-r--r-- | res/stasis/app.h | 15 | ||||
-rw-r--r-- | res/stasis/messaging.c | 44 |
7 files changed, 325 insertions, 142 deletions
diff --git a/res/ari/resource_events.c b/res/ari/resource_events.c index 09bcafc2d..71d54b494 100644 --- a/res/ari/resource_events.c +++ b/res/ari/resource_events.c @@ -148,9 +148,11 @@ static void app_handler(void *data, const char *app_name, * \brief Register for all of the apps given. * \param session Session info struct. * \param app_name Name of application to register. + * \param register_handler Pointer to the application registration handler */ static int session_register_app(struct event_session *session, - const char *app_name) + const char *app_name, + int (* register_handler)(const char *, stasis_app_cb handler, void *data)) { SCOPED_AO2LOCK(lock, session); @@ -167,7 +169,7 @@ static int session_register_app(struct event_session *session, return -1; } - stasis_app_register(app_name, app_handler, session); + register_handler(app_name, app_handler, session); return 0; } @@ -178,6 +180,7 @@ int ast_ari_websocket_events_event_websocket_attempted(struct ast_tcptls_session { int res = 0; size_t i, j; + int (* register_handler)(const char *, stasis_app_cb handler, void *data); ast_debug(3, "/events WebSocket attempted\n"); @@ -186,13 +189,19 @@ int ast_ari_websocket_events_event_websocket_attempted(struct ast_tcptls_session return -1; } + if (args->subscribe_all) { + register_handler = &stasis_app_register_all; + } else { + register_handler = &stasis_app_register; + } + for (i = 0; i < args->app_count; ++i) { if (ast_strlen_zero(args->app[i])) { res = -1; break; } - res |= stasis_app_register(args->app[i], app_handler, NULL); + res |= register_handler(args->app[i], app_handler, NULL); } if (res) { @@ -213,6 +222,7 @@ void ast_ari_websocket_events_event_websocket_established(struct ast_ari_websock struct ast_json *msg; int res; size_t i; + int (* register_handler)(const char *, stasis_app_cb handler, void *data); ast_debug(3, "/events WebSocket connection\n"); @@ -222,12 +232,18 @@ void ast_ari_websocket_events_event_websocket_established(struct ast_ari_websock return; } + if (args->subscribe_all) { + register_handler = &stasis_app_register_all; + } else { + register_handler = &stasis_app_register; + } + res = 0; for (i = 0; i < args->app_count; ++i) { if (ast_strlen_zero(args->app[i])) { continue; } - res |= session_register_app(session, args->app[i]); + res |= session_register_app(session, args->app[i], register_handler); } if (ao2_container_count(session->websocket_apps) == 0) { diff --git a/res/ari/resource_events.h b/res/ari/resource_events.h index 2b631819b..c48269958 100644 --- a/res/ari/resource_events.h +++ b/res/ari/resource_events.h @@ -47,6 +47,8 @@ struct ast_ari_events_event_websocket_args { size_t app_count; /*! Parsing context for app. */ char *app_parse; + /*! Subscribe to all Asterisk events. If provided, the applications listed will be subscribed to all events, effectively disabling the application specific subscriptions. Default is 'false'. */ + int subscribe_all; }; /*! diff --git a/res/res_ari_events.c b/res/res_ari_events.c index 426538511..65bd38d5b 100644 --- a/res/res_ari_events.c +++ b/res/res_ari_events.c @@ -110,6 +110,9 @@ static int ast_ari_events_event_websocket_ws_attempted_cb(struct ast_tcptls_sess args.app[j] = (vals[j]); } } else + if (strcmp(i->name, "subscribeAll") == 0) { + args.subscribe_all = ast_true(i->value); + } else {} } @@ -208,6 +211,9 @@ static void ast_ari_events_event_websocket_ws_established_cb(struct ast_websocke args.app[j] = (vals[j]); } } else + if (strcmp(i->name, "subscribeAll") == 0) { + args.subscribe_all = ast_true(i->value); + } else {} } diff --git a/res/res_stasis.c b/res/res_stasis.c index fc34fa36f..25866d9bb 100644 --- a/res/res_stasis.c +++ b/res/res_stasis.c @@ -109,6 +109,11 @@ struct ao2_container *app_bridges_moh; struct ao2_container *app_bridges_playback; +/*! + * \internal \brief List of registered event sources. + */ +AST_RWLIST_HEAD_STATIC(event_sources, stasis_app_event_source); + static struct ast_json *stasis_end_to_json(struct stasis_message *message, const struct stasis_message_sanitizer *sanitize) { @@ -1469,7 +1474,7 @@ struct ao2_container *stasis_app_get_all(void) return ao2_bump(apps); } -int stasis_app_register(const char *app_name, stasis_app_cb handler, void *data) +static int __stasis_app_register(const char *app_name, stasis_app_cb handler, void *data, int all_events) { RAII_VAR(struct stasis_app *, app, NULL, ao2_cleanup); @@ -1482,8 +1487,20 @@ int stasis_app_register(const char *app_name, stasis_app_cb handler, void *data) if (app) { app_update(app, handler, data); } else { - app = app_create(app_name, handler, data); + app = app_create(app_name, handler, data, all_events ? STASIS_APP_SUBSCRIBE_ALL : STASIS_APP_SUBSCRIBE_MANUAL); if (app) { + if (all_events) { + struct stasis_app_event_source *source; + SCOPED_LOCK(lock, &event_sources, AST_RWLIST_RDLOCK, AST_RWLIST_UNLOCK); + + AST_LIST_TRAVERSE(&event_sources, source, next) { + if (!source->subscribe) { + continue; + } + + source->subscribe(app, NULL); + } + } ao2_link_flags(apps_registry, app, OBJ_NOLOCK); } else { ao2_unlock(apps_registry); @@ -1499,6 +1516,16 @@ int stasis_app_register(const char *app_name, stasis_app_cb handler, void *data) return 0; } +int stasis_app_register(const char *app_name, stasis_app_cb handler, void *data) +{ + return __stasis_app_register(app_name, handler, data, 0); +} + +int stasis_app_register_all(const char *app_name, stasis_app_cb handler, void *data) +{ + return __stasis_app_register(app_name, handler, data, 1); +} + void stasis_app_unregister(const char *app_name) { RAII_VAR(struct stasis_app *, app, NULL, ao2_cleanup); @@ -1526,11 +1553,6 @@ void stasis_app_unregister(const char *app_name) cleanup(); } -/*! - * \internal \brief List of registered event sources. - */ -AST_RWLIST_HEAD_STATIC(event_sources, stasis_app_event_source); - void stasis_app_register_event_source(struct stasis_app_event_source *obj) { SCOPED_LOCK(lock, &event_sources, AST_RWLIST_WRLOCK, AST_RWLIST_UNLOCK); @@ -1727,8 +1749,8 @@ static enum stasis_app_subscribe_res app_subscribe( ast_debug(3, "%s: Checking %s\n", app_name, uri); - if (!event_source->find || - (!(obj = event_source->find(app, uri + strlen(event_source->scheme))))) { + if (!ast_strlen_zero(uri + strlen(event_source->scheme)) && + (!event_source->find || (!(obj = event_source->find(app, uri + strlen(event_source->scheme)))))) { ast_log(LOG_WARNING, "Event source not found: %s\n", uri); return STASIS_ASR_EVENT_SOURCE_NOT_FOUND; } @@ -2062,6 +2084,7 @@ static int load_module(void) } AST_MODULE_INFO(ASTERISK_GPL_KEY, AST_MODFLAG_GLOBAL_SYMBOLS, "Stasis application support", + .load_pri = AST_MODPRI_APP_DEPEND, .support_level = AST_MODULE_SUPPORT_CORE, .load = load_module, .unload = unload_module, diff --git a/res/stasis/app.c b/res/stasis/app.c index caa27abfc..353918241 100644 --- a/res/stasis/app.c +++ b/res/stasis/app.c @@ -38,6 +38,10 @@ ASTERISK_FILE_VERSION(__FILE__, "$Revision$") #include "asterisk/stasis_endpoints.h" #include "asterisk/stasis_message_router.h" +#define BRIDGE_ALL "__AST_BRIDGE_ALL_TOPIC" +#define CHANNEL_ALL "__AST_CHANNEL_ALL_TOPIC" +#define ENDPOINT_ALL "__AST_ENDPOINT_ALL_TOPIC" + static int unsubscribe(struct stasis_app *app, const char *kind, const char *id, int terminate); struct stasis_app { @@ -47,12 +51,16 @@ struct stasis_app { struct stasis_message_router *router; /*! Router for handling messages to the bridge all \a topic. */ struct stasis_message_router *bridge_router; + /*! Optional router for handling endpoint messages in 'all' subscriptions */ + struct stasis_message_router *endpoint_router; /*! Container of the channel forwards to this app's topic. */ struct ao2_container *forwards; /*! Callback function for this application. */ stasis_app_cb handler; /*! Opaque data to hand to callback function. */ void *data; + /*! Subscription model for the application */ + enum stasis_app_subscription_model subscription_model; /*! Name of the Stasis application */ char name[]; }; @@ -121,34 +129,33 @@ static struct app_forwards *forwards_create(struct stasis_app *app, static struct app_forwards *forwards_create_channel(struct stasis_app *app, struct ast_channel *chan) { - RAII_VAR(struct app_forwards *, forwards, NULL, ao2_cleanup); + struct app_forwards *forwards; - if (!app || !chan) { + if (!app) { return NULL; } - forwards = forwards_create(app, ast_channel_uniqueid(chan)); + forwards = forwards_create(app, chan ? ast_channel_uniqueid(chan) : CHANNEL_ALL); if (!forwards) { return NULL; } forwards->forward_type = FORWARD_CHANNEL; - forwards->topic_forward = stasis_forward_all(ast_channel_topic(chan), - app->topic); - if (!forwards->topic_forward) { - return NULL; + if (chan) { + forwards->topic_forward = stasis_forward_all(ast_channel_topic(chan), + app->topic); } - forwards->topic_cached_forward = stasis_forward_all( - ast_channel_topic_cached(chan), app->topic); - if (!forwards->topic_cached_forward) { + chan ? ast_channel_topic_cached(chan) : ast_channel_topic_all_cached(), + app->topic); + + if ((!forwards->topic_forward && chan) || !forwards->topic_cached_forward) { /* Half-subscribed is a bad thing */ - stasis_forward_cancel(forwards->topic_forward); - forwards->topic_forward = NULL; + forwards_unsubscribe(forwards); + ao2_ref(forwards, -1); return NULL; } - ao2_ref(forwards, +1); return forwards; } @@ -156,69 +163,101 @@ static struct app_forwards *forwards_create_channel(struct stasis_app *app, static struct app_forwards *forwards_create_bridge(struct stasis_app *app, struct ast_bridge *bridge) { - RAII_VAR(struct app_forwards *, forwards, NULL, ao2_cleanup); + struct app_forwards *forwards; - if (!app || !bridge) { + if (!app) { return NULL; } - forwards = forwards_create(app, bridge->uniqueid); + forwards = forwards_create(app, bridge ? bridge->uniqueid : BRIDGE_ALL); if (!forwards) { return NULL; } forwards->forward_type = FORWARD_BRIDGE; - forwards->topic_forward = stasis_forward_all(ast_bridge_topic(bridge), - app->topic); - if (!forwards->topic_forward) { - return NULL; + if (bridge) { + forwards->topic_forward = stasis_forward_all(ast_bridge_topic(bridge), + app->topic); } - forwards->topic_cached_forward = stasis_forward_all( - ast_bridge_topic_cached(bridge), app->topic); - if (!forwards->topic_cached_forward) { + bridge ? ast_bridge_topic_cached(bridge) : ast_bridge_topic_all_cached(), + app->topic); + + if ((!forwards->topic_forward && bridge) || !forwards->topic_cached_forward) { /* Half-subscribed is a bad thing */ - stasis_forward_cancel(forwards->topic_forward); - forwards->topic_forward = NULL; + forwards_unsubscribe(forwards); + ao2_ref(forwards, -1); return NULL; } - ao2_ref(forwards, +1); return forwards; } +static void endpoint_state_cb(void *data, struct stasis_subscription *sub, + struct stasis_message *message) +{ + struct stasis_app *app = data; + + stasis_publish(app->topic, message); +} + /*! Forward a endpoint's topics to an app */ static struct app_forwards *forwards_create_endpoint(struct stasis_app *app, struct ast_endpoint *endpoint) { - RAII_VAR(struct app_forwards *, forwards, NULL, ao2_cleanup); - if (!app || !endpoint) { + struct app_forwards *forwards; + int ret = 0; + + if (!app) { return NULL; } - forwards = forwards_create(app, ast_endpoint_get_id(endpoint)); + forwards = forwards_create(app, endpoint ? ast_endpoint_get_id(endpoint) : ENDPOINT_ALL); if (!forwards) { return NULL; } forwards->forward_type = FORWARD_ENDPOINT; - forwards->topic_forward = stasis_forward_all(ast_endpoint_topic(endpoint), - app->topic); - if (!forwards->topic_forward) { - return NULL; - } + if (endpoint) { + forwards->topic_forward = stasis_forward_all(ast_endpoint_topic(endpoint), + app->topic); + forwards->topic_cached_forward = stasis_forward_all( + ast_endpoint_topic_cached(endpoint), app->topic); + + if (!forwards->topic_forward || !forwards->topic_cached_forward) { + /* Half-subscribed is a bad thing */ + forwards_unsubscribe(forwards); + ao2_ref(forwards, -1); + return NULL; + } + } else { + /* Since endpoint subscriptions also subscribe to channels, in the case + * of all endpoint subscriptions, we only want messages for the endpoints. + * As such, we route those particular messages and then re-publish them + * on the app's topic. + */ + ast_assert(app->endpoint_router == NULL); + app->endpoint_router = stasis_message_router_create(ast_endpoint_topic_all_cached()); + if (!app->endpoint_router) { + forwards_unsubscribe(forwards); + ao2_ref(forwards, -1); + return NULL; + } - forwards->topic_cached_forward = stasis_forward_all( - ast_endpoint_topic_cached(endpoint), app->topic); - if (!forwards->topic_cached_forward) { - /* Half-subscribed is a bad thing */ - stasis_forward_cancel(forwards->topic_forward); - forwards->topic_forward = NULL; - return NULL; + ret |= stasis_message_router_add(app->endpoint_router, + ast_endpoint_state_type(), endpoint_state_cb, app); + ret |= stasis_message_router_add(app->endpoint_router, + ast_endpoint_contact_state_type(), endpoint_state_cb, app); + + if (ret) { + ao2_ref(app->endpoint_router, -1); + app->endpoint_router = NULL; + ao2_ref(forwards, -1); + return NULL; + } } - ao2_ref(forwards, +1); return forwards; } @@ -260,6 +299,7 @@ static void app_dtor(void *obj) ast_assert(app->router == NULL); ast_assert(app->bridge_router == NULL); + ast_assert(app->endpoint_router == NULL); ao2_cleanup(app->topic); app->topic = NULL; @@ -793,7 +833,7 @@ static void bridge_default_handler(void *data, struct stasis_subscription *sub, } } -struct stasis_app *app_create(const char *name, stasis_app_cb handler, void *data) +struct stasis_app *app_create(const char *name, stasis_app_cb handler, void *data, enum stasis_app_subscription_model subscription_model) { RAII_VAR(struct stasis_app *, app, NULL, ao2_cleanup); size_t size; @@ -806,10 +846,10 @@ struct stasis_app *app_create(const char *name, stasis_app_cb handler, void *dat size = sizeof(*app) + strlen(name) + 1; app = ao2_alloc_options(size, app_dtor, AO2_ALLOC_OPT_LOCK_MUTEX); - if (!app) { return NULL; } + app->subscription_model = subscription_model; app->forwards = ao2_container_alloc_rbtree(AO2_ALLOC_OPT_LOCK_MUTEX, AO2_CONTAINER_ALLOC_OPT_DUPS_OBJ_REJECT, @@ -877,7 +917,8 @@ struct stasis_app *app_create(const char *name, stasis_app_cb handler, void *dat return app; } -struct stasis_topic *ast_app_get_topic(struct stasis_app *app) { +struct stasis_topic *ast_app_get_topic(struct stasis_app *app) +{ return app->topic; } @@ -930,6 +971,8 @@ void app_shutdown(struct stasis_app *app) app->router = NULL; stasis_message_router_unsubscribe(app->bridge_router); app->bridge_router = NULL; + stasis_message_router_unsubscribe(app->endpoint_router); + app->endpoint_router = NULL; } int app_is_active(struct stasis_app *app) @@ -1029,34 +1072,47 @@ struct ast_json *app_to_json(const struct stasis_app *app) int app_subscribe_channel(struct stasis_app *app, struct ast_channel *chan) { + struct app_forwards *forwards; + SCOPED_AO2LOCK(lock, app->forwards); int res; - if (!app || !chan) { + if (!app) { return -1; - } else { - RAII_VAR(struct app_forwards *, forwards, NULL, ao2_cleanup); - SCOPED_AO2LOCK(lock, app->forwards); + } - forwards = ao2_find(app->forwards, ast_channel_uniqueid(chan), - OBJ_SEARCH_KEY | OBJ_NOLOCK); - if (!forwards) { - /* Forwards not found, create one */ - forwards = forwards_create_channel(app, chan); - if (!forwards) { - return -1; - } + /* If subscribed to all, don't subscribe again */ + forwards = ao2_find(app->forwards, CHANNEL_ALL, OBJ_SEARCH_KEY | OBJ_NOLOCK); + if (forwards) { + ao2_ref(forwards, -1); + return 0; + } - res = ao2_link_flags(app->forwards, forwards, - OBJ_NOLOCK); - if (!res) { - return -1; - } + forwards = ao2_find(app->forwards, + chan ? ast_channel_uniqueid(chan) : CHANNEL_ALL, + OBJ_SEARCH_KEY | OBJ_NOLOCK); + if (!forwards) { + /* Forwards not found, create one */ + forwards = forwards_create_channel(app, chan); + if (!forwards) { + return -1; } - ++forwards->interested; - ast_debug(3, "Channel '%s' is %d interested in %s\n", ast_channel_uniqueid(chan), forwards->interested, app->name); - return 0; + res = ao2_link_flags(app->forwards, forwards, + OBJ_NOLOCK); + if (!res) { + ao2_ref(forwards, -1); + return -1; + } } + + ++forwards->interested; + ast_debug(3, "Channel '%s' is %d interested in %s\n", + chan ? ast_channel_uniqueid(chan) : "ALL", + forwards->interested, + app->name); + + ao2_ref(forwards, -1); + return 0; } static int subscribe_channel(struct stasis_app *app, void *obj) @@ -1069,6 +1125,19 @@ static int unsubscribe(struct stasis_app *app, const char *kind, const char *id, RAII_VAR(struct app_forwards *, forwards, NULL, ao2_cleanup); SCOPED_AO2LOCK(lock, app->forwards); + if (!id) { + if (!strcmp(kind, "bridge")) { + id = BRIDGE_ALL; + } else if (!strcmp(kind, "channel")) { + id = CHANNEL_ALL; + } else if (!strcmp(kind, "endpoint")) { + id = ENDPOINT_ALL; + } else { + ast_log(LOG_WARNING, "Unknown subscription kind '%s'\n", kind); + return -1; + } + } + forwards = ao2_find(app->forwards, id, OBJ_SEARCH_KEY | OBJ_NOLOCK); if (!forwards) { ast_debug(3, "App '%s' not subscribed to %s '%s'\n", app->name, kind, id); @@ -1095,16 +1164,16 @@ static int unsubscribe(struct stasis_app *app, const char *kind, const char *id, int app_unsubscribe_channel(struct stasis_app *app, struct ast_channel *chan) { - if (!app || !chan) { + if (!app) { return -1; } - return app_unsubscribe_channel_id(app, ast_channel_uniqueid(chan)); + return app_unsubscribe_channel_id(app, chan ? ast_channel_uniqueid(chan) : CHANNEL_ALL); } int app_unsubscribe_channel_id(struct stasis_app *app, const char *channel_id) { - if (!app || !channel_id) { + if (!app) { return -1; } @@ -1114,6 +1183,10 @@ int app_unsubscribe_channel_id(struct stasis_app *app, const char *channel_id) int app_is_subscribed_channel_id(struct stasis_app *app, const char *channel_id) { RAII_VAR(struct app_forwards *, forwards, NULL, ao2_cleanup); + + if (ast_strlen_zero(channel_id)) { + channel_id = CHANNEL_ALL; + } forwards = ao2_find(app->forwards, channel_id, OBJ_SEARCH_KEY); return forwards != NULL; } @@ -1133,28 +1206,39 @@ struct stasis_app_event_source channel_event_source = { int app_subscribe_bridge(struct stasis_app *app, struct ast_bridge *bridge) { - if (!app || !bridge) { + struct app_forwards *forwards; + SCOPED_AO2LOCK(lock, app->forwards); + + if (!app) { return -1; - } else { - RAII_VAR(struct app_forwards *, forwards, NULL, ao2_cleanup); - SCOPED_AO2LOCK(lock, app->forwards); + } - forwards = ao2_find(app->forwards, bridge->uniqueid, - OBJ_SEARCH_KEY | OBJ_NOLOCK); + /* If subscribed to all, don't subscribe again */ + forwards = ao2_find(app->forwards, BRIDGE_ALL, OBJ_SEARCH_KEY | OBJ_NOLOCK); + if (forwards) { + ao2_ref(forwards, -1); + return 0; + } + forwards = ao2_find(app->forwards, bridge ? bridge->uniqueid : BRIDGE_ALL, + OBJ_SEARCH_KEY | OBJ_NOLOCK); + if (!forwards) { + /* Forwards not found, create one */ + forwards = forwards_create_bridge(app, bridge); if (!forwards) { - /* Forwards not found, create one */ - forwards = forwards_create_bridge(app, bridge); - if (!forwards) { - return -1; - } - ao2_link_flags(app->forwards, forwards, OBJ_NOLOCK); + return -1; } - - ++forwards->interested; - ast_debug(3, "Bridge '%s' is %d interested in %s\n", bridge->uniqueid, forwards->interested, app->name); - return 0; + ao2_link_flags(app->forwards, forwards, OBJ_NOLOCK); } + + ++forwards->interested; + ast_debug(3, "Bridge '%s' is %d interested in %s\n", + bridge ? bridge->uniqueid : "ALL", + forwards->interested, + app->name); + + ao2_ref(forwards, -1); + return 0; } static int subscribe_bridge(struct stasis_app *app, void *obj) @@ -1164,16 +1248,16 @@ static int subscribe_bridge(struct stasis_app *app, void *obj) int app_unsubscribe_bridge(struct stasis_app *app, struct ast_bridge *bridge) { - if (!app || !bridge) { + if (!app) { return -1; } - return app_unsubscribe_bridge_id(app, bridge->uniqueid); + return app_unsubscribe_bridge_id(app, bridge ? bridge->uniqueid : BRIDGE_ALL); } int app_unsubscribe_bridge_id(struct stasis_app *app, const char *bridge_id) { - if (!app || !bridge_id) { + if (!app) { return -1; } @@ -1182,9 +1266,26 @@ int app_unsubscribe_bridge_id(struct stasis_app *app, const char *bridge_id) int app_is_subscribed_bridge_id(struct stasis_app *app, const char *bridge_id) { - RAII_VAR(struct app_forwards *, forwards, NULL, ao2_cleanup); - forwards = ao2_find(app->forwards, bridge_id, OBJ_SEARCH_KEY); - return forwards != NULL; + struct app_forwards *forwards; + SCOPED_AO2LOCK(lock, app->forwards); + + forwards = ao2_find(app->forwards, BRIDGE_ALL, OBJ_SEARCH_KEY | OBJ_NOLOCK); + if (forwards) { + ao2_ref(forwards, -1); + return 1; + } + + if (ast_strlen_zero(bridge_id)) { + bridge_id = BRIDGE_ALL; + } + + forwards = ao2_find(app->forwards, bridge_id, OBJ_SEARCH_KEY | OBJ_NOLOCK); + if (forwards) { + ao2_ref(forwards, -1); + return 1; + } + + return 0; } static void *bridge_find(const struct stasis_app *app, const char *id) @@ -1202,31 +1303,43 @@ struct stasis_app_event_source bridge_event_source = { int app_subscribe_endpoint(struct stasis_app *app, struct ast_endpoint *endpoint) { - if (!app || !endpoint) { + struct app_forwards *forwards; + SCOPED_AO2LOCK(lock, app->forwards); + + if (!app) { return -1; - } else { - RAII_VAR(struct app_forwards *, forwards, NULL, ao2_cleanup); - SCOPED_AO2LOCK(lock, app->forwards); + } - forwards = ao2_find(app->forwards, ast_endpoint_get_id(endpoint), - OBJ_SEARCH_KEY | OBJ_NOLOCK); + /* If subscribed to all, don't subscribe again */ + forwards = ao2_find(app->forwards, ENDPOINT_ALL, OBJ_SEARCH_KEY | OBJ_NOLOCK); + if (forwards) { + ao2_ref(forwards, -1); + return 0; + } + forwards = ao2_find(app->forwards, + endpoint ? ast_endpoint_get_id(endpoint) : ENDPOINT_ALL, + OBJ_SEARCH_KEY | OBJ_NOLOCK); + if (!forwards) { + /* Forwards not found, create one */ + forwards = forwards_create_endpoint(app, endpoint); if (!forwards) { - /* Forwards not found, create one */ - forwards = forwards_create_endpoint(app, endpoint); - if (!forwards) { - return -1; - } - ao2_link_flags(app->forwards, forwards, OBJ_NOLOCK); - - /* Subscribe for messages */ - messaging_app_subscribe_endpoint(app->name, endpoint, &message_received_handler, app); + return -1; } + ao2_link_flags(app->forwards, forwards, OBJ_NOLOCK); - ++forwards->interested; - ast_debug(3, "Endpoint '%s' is %d interested in %s\n", ast_endpoint_get_id(endpoint), forwards->interested, app->name); - return 0; + /* Subscribe for messages */ + messaging_app_subscribe_endpoint(app->name, endpoint, &message_received_handler, app); } + + ++forwards->interested; + ast_debug(3, "Endpoint '%s' is %d interested in %s\n", + endpoint ? ast_endpoint_get_id(endpoint) : "ALL", + forwards->interested, + app->name); + + ao2_ref(forwards, -1); + return 0; } static int subscribe_endpoint(struct stasis_app *app, void *obj) @@ -1236,7 +1349,7 @@ static int subscribe_endpoint(struct stasis_app *app, void *obj) int app_unsubscribe_endpoint_id(struct stasis_app *app, const char *endpoint_id) { - if (!app || !endpoint_id) { + if (!app) { return -1; } @@ -1246,6 +1359,10 @@ int app_unsubscribe_endpoint_id(struct stasis_app *app, const char *endpoint_id) int app_is_subscribed_endpoint_id(struct stasis_app *app, const char *endpoint_id) { RAII_VAR(struct app_forwards *, forwards, NULL, ao2_cleanup); + + if (ast_strlen_zero(endpoint_id)) { + endpoint_id = ENDPOINT_ALL; + } forwards = ao2_find(app->forwards, endpoint_id, OBJ_SEARCH_KEY); return forwards != NULL; } diff --git a/res/stasis/app.h b/res/stasis/app.h index 59574f584..2c8db1ccd 100644 --- a/res/stasis/app.h +++ b/res/stasis/app.h @@ -36,6 +36,19 @@ */ struct stasis_app; +enum stasis_app_subscription_model { + /* + * \brief An application must manually subscribe to each + * resource that it cares about. This is the default approach. + */ + STASIS_APP_SUBSCRIBE_MANUAL, + /* + * \brief An application is automatically subscribed to all + * resources in Asterisk, even if it does not control them. + */ + STASIS_APP_SUBSCRIBE_ALL +}; + /*! * \brief Create a res_stasis application. * @@ -45,7 +58,7 @@ struct stasis_app; * \return New \c res_stasis application. * \return \c NULL on error. */ -struct stasis_app *app_create(const char *name, stasis_app_cb handler, void *data); +struct stasis_app *app_create(const char *name, stasis_app_cb handler, void *data, enum stasis_app_subscription_model subscription_model); /*! * \brief Tears down an application. diff --git a/res/stasis/messaging.c b/res/stasis/messaging.c index fd7cf9f7b..229a3a646 100644 --- a/res/stasis/messaging.c +++ b/res/stasis/messaging.c @@ -38,6 +38,11 @@ ASTERISK_FILE_VERSION(__FILE__, "$Revision$") #include "messaging.h" /*! + * \brief Subscription to all technologies + */ +#define TECH_WILDCARD "__AST_ALL_TECH" + +/*! * \brief Number of buckets for the \ref endpoint_subscriptions container */ #define ENDPOINTS_NUM_BUCKETS 127 @@ -219,10 +224,14 @@ static int has_destination_cb(const struct ast_msg *msg) for (i = 0; i < AST_VECTOR_SIZE(&tech_subscriptions); i++) { sub = AST_VECTOR_GET(&tech_subscriptions, i); - if (sub && (!strncasecmp(sub->token, buf, strlen(sub->token)) - || !strncasecmp(sub->token, buf, strlen(sub->token)))) { + if (!sub) { + continue; + } + + if (!strcmp(sub->token, TECH_WILDCARD) + || !strncasecmp(sub->token, buf, strlen(sub->token)) + || !strncasecmp(sub->token, buf, strlen(sub->token))) { ast_rwlock_unlock(&tech_subscriptions_lock); - sub = NULL; /* No ref bump! */ goto match; } @@ -231,6 +240,7 @@ static int has_destination_cb(const struct ast_msg *msg) sub = ao2_find(endpoint_subscriptions, buf, OBJ_SEARCH_KEY); if (sub) { + ao2_ref(sub, -1); goto match; } @@ -238,7 +248,6 @@ static int has_destination_cb(const struct ast_msg *msg) return 0; match: - ao2_cleanup(sub); return 1; } @@ -301,7 +310,8 @@ static int handle_msg_cb(struct ast_msg *msg) continue; } - if (!strncasecmp(sub->token, buf, strlen(sub->token))) { + if (!strcmp(sub->token, TECH_WILDCARD) + || !strncasecmp(sub->token, buf, strlen(sub->token))) { ast_rwlock_unlock(&tech_subscriptions_lock); ao2_bump(sub); endpoint_name = buf; @@ -374,7 +384,7 @@ static struct message_subscription *get_subscription(struct ast_endpoint *endpoi { struct message_subscription *sub = NULL; - if (!ast_strlen_zero(ast_endpoint_get_resource(endpoint))) { + if (endpoint && !ast_strlen_zero(ast_endpoint_get_resource(endpoint))) { sub = ao2_find(endpoint_subscriptions, endpoint, OBJ_SEARCH_KEY); } else { int i; @@ -383,7 +393,7 @@ static struct message_subscription *get_subscription(struct ast_endpoint *endpoi for (i = 0; i < AST_VECTOR_SIZE(&tech_subscriptions); i++) { sub = AST_VECTOR_GET(&tech_subscriptions, i); - if (sub && !strcmp(sub->token, ast_endpoint_get_tech(endpoint))) { + if (sub && !strcmp(sub->token, endpoint ? ast_endpoint_get_tech(endpoint) : TECH_WILDCARD)) { ao2_bump(sub); break; } @@ -400,10 +410,6 @@ void messaging_app_unsubscribe_endpoint(const char *app_name, const char *endpoi RAII_VAR(struct ast_endpoint *, endpoint, NULL, ao2_cleanup); endpoint = ast_endpoint_find_by_id(endpoint_id); - if (!endpoint) { - return; - } - sub = get_subscription(endpoint); if (!sub) { return; @@ -417,11 +423,11 @@ void messaging_app_unsubscribe_endpoint(const char *app_name, const char *endpoi AST_VECTOR_REMOVE_CMP_UNORDERED(&sub->applications, app_name, application_tuple_cmp, ao2_cleanup); if (AST_VECTOR_SIZE(&sub->applications) == 0) { - if (!ast_strlen_zero(ast_endpoint_get_resource(endpoint))) { + if (endpoint && !ast_strlen_zero(ast_endpoint_get_resource(endpoint))) { ao2_unlink(endpoint_subscriptions, sub); } else { ast_rwlock_wrlock(&tech_subscriptions_lock); - AST_VECTOR_REMOVE_CMP_UNORDERED(&tech_subscriptions, ast_endpoint_get_id(endpoint), + AST_VECTOR_REMOVE_CMP_UNORDERED(&tech_subscriptions, endpoint ? ast_endpoint_get_id(endpoint) : TECH_WILDCARD, messaging_subscription_cmp, AST_VECTOR_ELEM_CLEANUP_NOOP); ast_rwlock_unlock(&tech_subscriptions_lock); } @@ -429,9 +435,9 @@ void messaging_app_unsubscribe_endpoint(const char *app_name, const char *endpoi ao2_unlock(sub); ao2_ref(sub, -1); - ast_debug(3, "App '%s' unsubscribed to messages from endpoint '%s'\n", app_name, ast_endpoint_get_id(endpoint)); + ast_debug(3, "App '%s' unsubscribed to messages from endpoint '%s'\n", app_name, endpoint ? ast_endpoint_get_id(endpoint) : "-- ALL --"); ast_test_suite_event_notify("StasisMessagingSubscription", "SubState: Unsubscribed\r\nAppName: %s\r\nToken: %s\r\n", - app_name, ast_endpoint_get_id(endpoint)); + app_name, endpoint ? ast_endpoint_get_id(endpoint) : "ALL"); } static struct message_subscription *get_or_create_subscription(struct ast_endpoint *endpoint) @@ -442,12 +448,12 @@ static struct message_subscription *get_or_create_subscription(struct ast_endpoi return sub; } - sub = message_subscription_alloc(ast_endpoint_get_id(endpoint)); + sub = message_subscription_alloc(endpoint ? ast_endpoint_get_id(endpoint) : TECH_WILDCARD); if (!sub) { return NULL; } - if (!ast_strlen_zero(ast_endpoint_get_resource(endpoint))) { + if (endpoint && !ast_strlen_zero(ast_endpoint_get_resource(endpoint))) { ao2_link(endpoint_subscriptions, sub); } else { ast_rwlock_wrlock(&tech_subscriptions_lock); @@ -482,9 +488,9 @@ int messaging_app_subscribe_endpoint(const char *app_name, struct ast_endpoint * AST_VECTOR_APPEND(&sub->applications, tuple); ao2_unlock(sub); - ast_debug(3, "App '%s' subscribed to messages from endpoint '%s'\n", app_name, ast_endpoint_get_id(endpoint)); + ast_debug(3, "App '%s' subscribed to messages from endpoint '%s'\n", app_name, endpoint ? ast_endpoint_get_id(endpoint) : "-- ALL --"); ast_test_suite_event_notify("StasisMessagingSubscription", "SubState: Subscribed\r\nAppName: %s\r\nToken: %s\r\n", - app_name, ast_endpoint_get_id(endpoint)); + app_name, endpoint ? ast_endpoint_get_id(endpoint) : "ALL"); return 0; } |