diff options
Diffstat (limited to 'res/stasis/app.c')
-rw-r--r-- | res/stasis/app.c | 335 |
1 files changed, 226 insertions, 109 deletions
diff --git a/res/stasis/app.c b/res/stasis/app.c index caa27abfc..353918241 100644 --- a/res/stasis/app.c +++ b/res/stasis/app.c @@ -38,6 +38,10 @@ ASTERISK_FILE_VERSION(__FILE__, "$Revision$") #include "asterisk/stasis_endpoints.h" #include "asterisk/stasis_message_router.h" +#define BRIDGE_ALL "__AST_BRIDGE_ALL_TOPIC" +#define CHANNEL_ALL "__AST_CHANNEL_ALL_TOPIC" +#define ENDPOINT_ALL "__AST_ENDPOINT_ALL_TOPIC" + static int unsubscribe(struct stasis_app *app, const char *kind, const char *id, int terminate); struct stasis_app { @@ -47,12 +51,16 @@ struct stasis_app { struct stasis_message_router *router; /*! Router for handling messages to the bridge all \a topic. */ struct stasis_message_router *bridge_router; + /*! Optional router for handling endpoint messages in 'all' subscriptions */ + struct stasis_message_router *endpoint_router; /*! Container of the channel forwards to this app's topic. */ struct ao2_container *forwards; /*! Callback function for this application. */ stasis_app_cb handler; /*! Opaque data to hand to callback function. */ void *data; + /*! Subscription model for the application */ + enum stasis_app_subscription_model subscription_model; /*! Name of the Stasis application */ char name[]; }; @@ -121,34 +129,33 @@ static struct app_forwards *forwards_create(struct stasis_app *app, static struct app_forwards *forwards_create_channel(struct stasis_app *app, struct ast_channel *chan) { - RAII_VAR(struct app_forwards *, forwards, NULL, ao2_cleanup); + struct app_forwards *forwards; - if (!app || !chan) { + if (!app) { return NULL; } - forwards = forwards_create(app, ast_channel_uniqueid(chan)); + forwards = forwards_create(app, chan ? ast_channel_uniqueid(chan) : CHANNEL_ALL); if (!forwards) { return NULL; } forwards->forward_type = FORWARD_CHANNEL; - forwards->topic_forward = stasis_forward_all(ast_channel_topic(chan), - app->topic); - if (!forwards->topic_forward) { - return NULL; + if (chan) { + forwards->topic_forward = stasis_forward_all(ast_channel_topic(chan), + app->topic); } - forwards->topic_cached_forward = stasis_forward_all( - ast_channel_topic_cached(chan), app->topic); - if (!forwards->topic_cached_forward) { + chan ? ast_channel_topic_cached(chan) : ast_channel_topic_all_cached(), + app->topic); + + if ((!forwards->topic_forward && chan) || !forwards->topic_cached_forward) { /* Half-subscribed is a bad thing */ - stasis_forward_cancel(forwards->topic_forward); - forwards->topic_forward = NULL; + forwards_unsubscribe(forwards); + ao2_ref(forwards, -1); return NULL; } - ao2_ref(forwards, +1); return forwards; } @@ -156,69 +163,101 @@ static struct app_forwards *forwards_create_channel(struct stasis_app *app, static struct app_forwards *forwards_create_bridge(struct stasis_app *app, struct ast_bridge *bridge) { - RAII_VAR(struct app_forwards *, forwards, NULL, ao2_cleanup); + struct app_forwards *forwards; - if (!app || !bridge) { + if (!app) { return NULL; } - forwards = forwards_create(app, bridge->uniqueid); + forwards = forwards_create(app, bridge ? bridge->uniqueid : BRIDGE_ALL); if (!forwards) { return NULL; } forwards->forward_type = FORWARD_BRIDGE; - forwards->topic_forward = stasis_forward_all(ast_bridge_topic(bridge), - app->topic); - if (!forwards->topic_forward) { - return NULL; + if (bridge) { + forwards->topic_forward = stasis_forward_all(ast_bridge_topic(bridge), + app->topic); } - forwards->topic_cached_forward = stasis_forward_all( - ast_bridge_topic_cached(bridge), app->topic); - if (!forwards->topic_cached_forward) { + bridge ? ast_bridge_topic_cached(bridge) : ast_bridge_topic_all_cached(), + app->topic); + + if ((!forwards->topic_forward && bridge) || !forwards->topic_cached_forward) { /* Half-subscribed is a bad thing */ - stasis_forward_cancel(forwards->topic_forward); - forwards->topic_forward = NULL; + forwards_unsubscribe(forwards); + ao2_ref(forwards, -1); return NULL; } - ao2_ref(forwards, +1); return forwards; } +static void endpoint_state_cb(void *data, struct stasis_subscription *sub, + struct stasis_message *message) +{ + struct stasis_app *app = data; + + stasis_publish(app->topic, message); +} + /*! Forward a endpoint's topics to an app */ static struct app_forwards *forwards_create_endpoint(struct stasis_app *app, struct ast_endpoint *endpoint) { - RAII_VAR(struct app_forwards *, forwards, NULL, ao2_cleanup); - if (!app || !endpoint) { + struct app_forwards *forwards; + int ret = 0; + + if (!app) { return NULL; } - forwards = forwards_create(app, ast_endpoint_get_id(endpoint)); + forwards = forwards_create(app, endpoint ? ast_endpoint_get_id(endpoint) : ENDPOINT_ALL); if (!forwards) { return NULL; } forwards->forward_type = FORWARD_ENDPOINT; - forwards->topic_forward = stasis_forward_all(ast_endpoint_topic(endpoint), - app->topic); - if (!forwards->topic_forward) { - return NULL; - } + if (endpoint) { + forwards->topic_forward = stasis_forward_all(ast_endpoint_topic(endpoint), + app->topic); + forwards->topic_cached_forward = stasis_forward_all( + ast_endpoint_topic_cached(endpoint), app->topic); + + if (!forwards->topic_forward || !forwards->topic_cached_forward) { + /* Half-subscribed is a bad thing */ + forwards_unsubscribe(forwards); + ao2_ref(forwards, -1); + return NULL; + } + } else { + /* Since endpoint subscriptions also subscribe to channels, in the case + * of all endpoint subscriptions, we only want messages for the endpoints. + * As such, we route those particular messages and then re-publish them + * on the app's topic. + */ + ast_assert(app->endpoint_router == NULL); + app->endpoint_router = stasis_message_router_create(ast_endpoint_topic_all_cached()); + if (!app->endpoint_router) { + forwards_unsubscribe(forwards); + ao2_ref(forwards, -1); + return NULL; + } - forwards->topic_cached_forward = stasis_forward_all( - ast_endpoint_topic_cached(endpoint), app->topic); - if (!forwards->topic_cached_forward) { - /* Half-subscribed is a bad thing */ - stasis_forward_cancel(forwards->topic_forward); - forwards->topic_forward = NULL; - return NULL; + ret |= stasis_message_router_add(app->endpoint_router, + ast_endpoint_state_type(), endpoint_state_cb, app); + ret |= stasis_message_router_add(app->endpoint_router, + ast_endpoint_contact_state_type(), endpoint_state_cb, app); + + if (ret) { + ao2_ref(app->endpoint_router, -1); + app->endpoint_router = NULL; + ao2_ref(forwards, -1); + return NULL; + } } - ao2_ref(forwards, +1); return forwards; } @@ -260,6 +299,7 @@ static void app_dtor(void *obj) ast_assert(app->router == NULL); ast_assert(app->bridge_router == NULL); + ast_assert(app->endpoint_router == NULL); ao2_cleanup(app->topic); app->topic = NULL; @@ -793,7 +833,7 @@ static void bridge_default_handler(void *data, struct stasis_subscription *sub, } } -struct stasis_app *app_create(const char *name, stasis_app_cb handler, void *data) +struct stasis_app *app_create(const char *name, stasis_app_cb handler, void *data, enum stasis_app_subscription_model subscription_model) { RAII_VAR(struct stasis_app *, app, NULL, ao2_cleanup); size_t size; @@ -806,10 +846,10 @@ struct stasis_app *app_create(const char *name, stasis_app_cb handler, void *dat size = sizeof(*app) + strlen(name) + 1; app = ao2_alloc_options(size, app_dtor, AO2_ALLOC_OPT_LOCK_MUTEX); - if (!app) { return NULL; } + app->subscription_model = subscription_model; app->forwards = ao2_container_alloc_rbtree(AO2_ALLOC_OPT_LOCK_MUTEX, AO2_CONTAINER_ALLOC_OPT_DUPS_OBJ_REJECT, @@ -877,7 +917,8 @@ struct stasis_app *app_create(const char *name, stasis_app_cb handler, void *dat return app; } -struct stasis_topic *ast_app_get_topic(struct stasis_app *app) { +struct stasis_topic *ast_app_get_topic(struct stasis_app *app) +{ return app->topic; } @@ -930,6 +971,8 @@ void app_shutdown(struct stasis_app *app) app->router = NULL; stasis_message_router_unsubscribe(app->bridge_router); app->bridge_router = NULL; + stasis_message_router_unsubscribe(app->endpoint_router); + app->endpoint_router = NULL; } int app_is_active(struct stasis_app *app) @@ -1029,34 +1072,47 @@ struct ast_json *app_to_json(const struct stasis_app *app) int app_subscribe_channel(struct stasis_app *app, struct ast_channel *chan) { + struct app_forwards *forwards; + SCOPED_AO2LOCK(lock, app->forwards); int res; - if (!app || !chan) { + if (!app) { return -1; - } else { - RAII_VAR(struct app_forwards *, forwards, NULL, ao2_cleanup); - SCOPED_AO2LOCK(lock, app->forwards); + } - forwards = ao2_find(app->forwards, ast_channel_uniqueid(chan), - OBJ_SEARCH_KEY | OBJ_NOLOCK); - if (!forwards) { - /* Forwards not found, create one */ - forwards = forwards_create_channel(app, chan); - if (!forwards) { - return -1; - } + /* If subscribed to all, don't subscribe again */ + forwards = ao2_find(app->forwards, CHANNEL_ALL, OBJ_SEARCH_KEY | OBJ_NOLOCK); + if (forwards) { + ao2_ref(forwards, -1); + return 0; + } - res = ao2_link_flags(app->forwards, forwards, - OBJ_NOLOCK); - if (!res) { - return -1; - } + forwards = ao2_find(app->forwards, + chan ? ast_channel_uniqueid(chan) : CHANNEL_ALL, + OBJ_SEARCH_KEY | OBJ_NOLOCK); + if (!forwards) { + /* Forwards not found, create one */ + forwards = forwards_create_channel(app, chan); + if (!forwards) { + return -1; } - ++forwards->interested; - ast_debug(3, "Channel '%s' is %d interested in %s\n", ast_channel_uniqueid(chan), forwards->interested, app->name); - return 0; + res = ao2_link_flags(app->forwards, forwards, + OBJ_NOLOCK); + if (!res) { + ao2_ref(forwards, -1); + return -1; + } } + + ++forwards->interested; + ast_debug(3, "Channel '%s' is %d interested in %s\n", + chan ? ast_channel_uniqueid(chan) : "ALL", + forwards->interested, + app->name); + + ao2_ref(forwards, -1); + return 0; } static int subscribe_channel(struct stasis_app *app, void *obj) @@ -1069,6 +1125,19 @@ static int unsubscribe(struct stasis_app *app, const char *kind, const char *id, RAII_VAR(struct app_forwards *, forwards, NULL, ao2_cleanup); SCOPED_AO2LOCK(lock, app->forwards); + if (!id) { + if (!strcmp(kind, "bridge")) { + id = BRIDGE_ALL; + } else if (!strcmp(kind, "channel")) { + id = CHANNEL_ALL; + } else if (!strcmp(kind, "endpoint")) { + id = ENDPOINT_ALL; + } else { + ast_log(LOG_WARNING, "Unknown subscription kind '%s'\n", kind); + return -1; + } + } + forwards = ao2_find(app->forwards, id, OBJ_SEARCH_KEY | OBJ_NOLOCK); if (!forwards) { ast_debug(3, "App '%s' not subscribed to %s '%s'\n", app->name, kind, id); @@ -1095,16 +1164,16 @@ static int unsubscribe(struct stasis_app *app, const char *kind, const char *id, int app_unsubscribe_channel(struct stasis_app *app, struct ast_channel *chan) { - if (!app || !chan) { + if (!app) { return -1; } - return app_unsubscribe_channel_id(app, ast_channel_uniqueid(chan)); + return app_unsubscribe_channel_id(app, chan ? ast_channel_uniqueid(chan) : CHANNEL_ALL); } int app_unsubscribe_channel_id(struct stasis_app *app, const char *channel_id) { - if (!app || !channel_id) { + if (!app) { return -1; } @@ -1114,6 +1183,10 @@ int app_unsubscribe_channel_id(struct stasis_app *app, const char *channel_id) int app_is_subscribed_channel_id(struct stasis_app *app, const char *channel_id) { RAII_VAR(struct app_forwards *, forwards, NULL, ao2_cleanup); + + if (ast_strlen_zero(channel_id)) { + channel_id = CHANNEL_ALL; + } forwards = ao2_find(app->forwards, channel_id, OBJ_SEARCH_KEY); return forwards != NULL; } @@ -1133,28 +1206,39 @@ struct stasis_app_event_source channel_event_source = { int app_subscribe_bridge(struct stasis_app *app, struct ast_bridge *bridge) { - if (!app || !bridge) { + struct app_forwards *forwards; + SCOPED_AO2LOCK(lock, app->forwards); + + if (!app) { return -1; - } else { - RAII_VAR(struct app_forwards *, forwards, NULL, ao2_cleanup); - SCOPED_AO2LOCK(lock, app->forwards); + } - forwards = ao2_find(app->forwards, bridge->uniqueid, - OBJ_SEARCH_KEY | OBJ_NOLOCK); + /* If subscribed to all, don't subscribe again */ + forwards = ao2_find(app->forwards, BRIDGE_ALL, OBJ_SEARCH_KEY | OBJ_NOLOCK); + if (forwards) { + ao2_ref(forwards, -1); + return 0; + } + forwards = ao2_find(app->forwards, bridge ? bridge->uniqueid : BRIDGE_ALL, + OBJ_SEARCH_KEY | OBJ_NOLOCK); + if (!forwards) { + /* Forwards not found, create one */ + forwards = forwards_create_bridge(app, bridge); if (!forwards) { - /* Forwards not found, create one */ - forwards = forwards_create_bridge(app, bridge); - if (!forwards) { - return -1; - } - ao2_link_flags(app->forwards, forwards, OBJ_NOLOCK); + return -1; } - - ++forwards->interested; - ast_debug(3, "Bridge '%s' is %d interested in %s\n", bridge->uniqueid, forwards->interested, app->name); - return 0; + ao2_link_flags(app->forwards, forwards, OBJ_NOLOCK); } + + ++forwards->interested; + ast_debug(3, "Bridge '%s' is %d interested in %s\n", + bridge ? bridge->uniqueid : "ALL", + forwards->interested, + app->name); + + ao2_ref(forwards, -1); + return 0; } static int subscribe_bridge(struct stasis_app *app, void *obj) @@ -1164,16 +1248,16 @@ static int subscribe_bridge(struct stasis_app *app, void *obj) int app_unsubscribe_bridge(struct stasis_app *app, struct ast_bridge *bridge) { - if (!app || !bridge) { + if (!app) { return -1; } - return app_unsubscribe_bridge_id(app, bridge->uniqueid); + return app_unsubscribe_bridge_id(app, bridge ? bridge->uniqueid : BRIDGE_ALL); } int app_unsubscribe_bridge_id(struct stasis_app *app, const char *bridge_id) { - if (!app || !bridge_id) { + if (!app) { return -1; } @@ -1182,9 +1266,26 @@ int app_unsubscribe_bridge_id(struct stasis_app *app, const char *bridge_id) int app_is_subscribed_bridge_id(struct stasis_app *app, const char *bridge_id) { - RAII_VAR(struct app_forwards *, forwards, NULL, ao2_cleanup); - forwards = ao2_find(app->forwards, bridge_id, OBJ_SEARCH_KEY); - return forwards != NULL; + struct app_forwards *forwards; + SCOPED_AO2LOCK(lock, app->forwards); + + forwards = ao2_find(app->forwards, BRIDGE_ALL, OBJ_SEARCH_KEY | OBJ_NOLOCK); + if (forwards) { + ao2_ref(forwards, -1); + return 1; + } + + if (ast_strlen_zero(bridge_id)) { + bridge_id = BRIDGE_ALL; + } + + forwards = ao2_find(app->forwards, bridge_id, OBJ_SEARCH_KEY | OBJ_NOLOCK); + if (forwards) { + ao2_ref(forwards, -1); + return 1; + } + + return 0; } static void *bridge_find(const struct stasis_app *app, const char *id) @@ -1202,31 +1303,43 @@ struct stasis_app_event_source bridge_event_source = { int app_subscribe_endpoint(struct stasis_app *app, struct ast_endpoint *endpoint) { - if (!app || !endpoint) { + struct app_forwards *forwards; + SCOPED_AO2LOCK(lock, app->forwards); + + if (!app) { return -1; - } else { - RAII_VAR(struct app_forwards *, forwards, NULL, ao2_cleanup); - SCOPED_AO2LOCK(lock, app->forwards); + } - forwards = ao2_find(app->forwards, ast_endpoint_get_id(endpoint), - OBJ_SEARCH_KEY | OBJ_NOLOCK); + /* If subscribed to all, don't subscribe again */ + forwards = ao2_find(app->forwards, ENDPOINT_ALL, OBJ_SEARCH_KEY | OBJ_NOLOCK); + if (forwards) { + ao2_ref(forwards, -1); + return 0; + } + forwards = ao2_find(app->forwards, + endpoint ? ast_endpoint_get_id(endpoint) : ENDPOINT_ALL, + OBJ_SEARCH_KEY | OBJ_NOLOCK); + if (!forwards) { + /* Forwards not found, create one */ + forwards = forwards_create_endpoint(app, endpoint); if (!forwards) { - /* Forwards not found, create one */ - forwards = forwards_create_endpoint(app, endpoint); - if (!forwards) { - return -1; - } - ao2_link_flags(app->forwards, forwards, OBJ_NOLOCK); - - /* Subscribe for messages */ - messaging_app_subscribe_endpoint(app->name, endpoint, &message_received_handler, app); + return -1; } + ao2_link_flags(app->forwards, forwards, OBJ_NOLOCK); - ++forwards->interested; - ast_debug(3, "Endpoint '%s' is %d interested in %s\n", ast_endpoint_get_id(endpoint), forwards->interested, app->name); - return 0; + /* Subscribe for messages */ + messaging_app_subscribe_endpoint(app->name, endpoint, &message_received_handler, app); } + + ++forwards->interested; + ast_debug(3, "Endpoint '%s' is %d interested in %s\n", + endpoint ? ast_endpoint_get_id(endpoint) : "ALL", + forwards->interested, + app->name); + + ao2_ref(forwards, -1); + return 0; } static int subscribe_endpoint(struct stasis_app *app, void *obj) @@ -1236,7 +1349,7 @@ static int subscribe_endpoint(struct stasis_app *app, void *obj) int app_unsubscribe_endpoint_id(struct stasis_app *app, const char *endpoint_id) { - if (!app || !endpoint_id) { + if (!app) { return -1; } @@ -1246,6 +1359,10 @@ int app_unsubscribe_endpoint_id(struct stasis_app *app, const char *endpoint_id) int app_is_subscribed_endpoint_id(struct stasis_app *app, const char *endpoint_id) { RAII_VAR(struct app_forwards *, forwards, NULL, ao2_cleanup); + + if (ast_strlen_zero(endpoint_id)) { + endpoint_id = ENDPOINT_ALL; + } forwards = ao2_find(app->forwards, endpoint_id, OBJ_SEARCH_KEY); return forwards != NULL; } |