summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorMatt Jordan <mjordan@digium.com>2015-09-04 12:25:07 -0500
committerMatt Jordan <mjordan@digium.com>2015-09-22 13:27:14 -0500
commit4c9f613309d66ae6a8e5454cd53276459bcd2674 (patch)
tree2934381535e27f1ca732865d3805a78af7dc06d0
parentec514ad64dbc0014525008977c8c74c2856c9d3a (diff)
ARI: Add the ability to subscribe to all events
This patch adds the ability to subscribe to all events. There are two possible ways to accomplish this: (1) On initial WebSocket connection. This patch adds a new query parameter, 'subscribeAll'. If present and True, Asterisk will subscribe the applications to all ARI events. (2) Via the applications resource. When subscribing in this manner, an ARI client should merely specify a blank resource name, i.e., 'channels:' instead of 'channels:12354'. This will subscribe the application to all resources of the 'channels' type. ASTERISK-24870 #close Change-Id: I4a943b4db24442cf28bc64b24bfd541249790ad6
-rw-r--r--CHANGES15
-rw-r--r--include/asterisk/stasis_app.h15
-rw-r--r--res/ari/resource_events.c24
-rw-r--r--res/ari/resource_events.h2
-rw-r--r--res/res_ari_events.c6
-rw-r--r--res/res_stasis.c41
-rw-r--r--res/stasis/app.c335
-rw-r--r--res/stasis/app.h15
-rw-r--r--res/stasis/messaging.c44
-rw-r--r--rest-api/api-docs/events.json8
10 files changed, 363 insertions, 142 deletions
diff --git a/CHANGES b/CHANGES
index c24919517..0c467a803 100644
--- a/CHANGES
+++ b/CHANGES
@@ -19,6 +19,21 @@ Dialplan Functions
return the SIP Call-ID associated with the INVITE request that established
the PJSIP channel.
+ARI
+------------------
+ * Added the ability to subscribe to all ARI events in Asterisk, regardless
+ of whether the application 'controls' the resource. This is useful for
+ scenarios where an ARI application merely wants to observe the system,
+ as opposed to control it. There are two ways to accomplish this:
+ (1) Via the WebSocket connection URI. A new query paramter, 'subscribeAll',
+ has been added that, when present and True, will subscribe all
+ specified applications to all ARI event sources in Asterisk.
+ (2) Via the applications resource. An ARI client can, at any time, subscribe
+ to all resources in an event source merely by not providing an explicit
+ resource. For example, subscribing to an event source of 'channels:'
+ as opposed to 'channels:12345' will subscribe the application to all
+ channels.
+
------------------------------------------------------------------------------
--- Functionality changes from Asterisk 13.4.0 to Asterisk 13.5.0 ------------
------------------------------------------------------------------------------
diff --git a/include/asterisk/stasis_app.h b/include/asterisk/stasis_app.h
index 567670b69..f2b07e0bf 100644
--- a/include/asterisk/stasis_app.h
+++ b/include/asterisk/stasis_app.h
@@ -92,6 +92,21 @@ struct ao2_container *stasis_app_get_all(void);
int stasis_app_register(const char *app_name, stasis_app_cb handler, void *data);
/*!
+ * \brief Register a new Stasis application that receives all Asterisk events.
+ *
+ * If an application is already registered with the given name, the old
+ * application is sent a 'replaced' message and unregistered.
+ *
+ * \param app_name Name of this application.
+ * \param handler Callback for application messages.
+ * \param data Data blob to pass to the callback. Must be AO2 managed.
+ *
+ * \return 0 for success
+ * \return -1 for error.
+ */
+int stasis_app_register_all(const char *app_name, stasis_app_cb handler, void *data);
+
+/*!
* \brief Unregister a Stasis application.
* \param app_name Name of the application to unregister.
*/
diff --git a/res/ari/resource_events.c b/res/ari/resource_events.c
index 09bcafc2d..71d54b494 100644
--- a/res/ari/resource_events.c
+++ b/res/ari/resource_events.c
@@ -148,9 +148,11 @@ static void app_handler(void *data, const char *app_name,
* \brief Register for all of the apps given.
* \param session Session info struct.
* \param app_name Name of application to register.
+ * \param register_handler Pointer to the application registration handler
*/
static int session_register_app(struct event_session *session,
- const char *app_name)
+ const char *app_name,
+ int (* register_handler)(const char *, stasis_app_cb handler, void *data))
{
SCOPED_AO2LOCK(lock, session);
@@ -167,7 +169,7 @@ static int session_register_app(struct event_session *session,
return -1;
}
- stasis_app_register(app_name, app_handler, session);
+ register_handler(app_name, app_handler, session);
return 0;
}
@@ -178,6 +180,7 @@ int ast_ari_websocket_events_event_websocket_attempted(struct ast_tcptls_session
{
int res = 0;
size_t i, j;
+ int (* register_handler)(const char *, stasis_app_cb handler, void *data);
ast_debug(3, "/events WebSocket attempted\n");
@@ -186,13 +189,19 @@ int ast_ari_websocket_events_event_websocket_attempted(struct ast_tcptls_session
return -1;
}
+ if (args->subscribe_all) {
+ register_handler = &stasis_app_register_all;
+ } else {
+ register_handler = &stasis_app_register;
+ }
+
for (i = 0; i < args->app_count; ++i) {
if (ast_strlen_zero(args->app[i])) {
res = -1;
break;
}
- res |= stasis_app_register(args->app[i], app_handler, NULL);
+ res |= register_handler(args->app[i], app_handler, NULL);
}
if (res) {
@@ -213,6 +222,7 @@ void ast_ari_websocket_events_event_websocket_established(struct ast_ari_websock
struct ast_json *msg;
int res;
size_t i;
+ int (* register_handler)(const char *, stasis_app_cb handler, void *data);
ast_debug(3, "/events WebSocket connection\n");
@@ -222,12 +232,18 @@ void ast_ari_websocket_events_event_websocket_established(struct ast_ari_websock
return;
}
+ if (args->subscribe_all) {
+ register_handler = &stasis_app_register_all;
+ } else {
+ register_handler = &stasis_app_register;
+ }
+
res = 0;
for (i = 0; i < args->app_count; ++i) {
if (ast_strlen_zero(args->app[i])) {
continue;
}
- res |= session_register_app(session, args->app[i]);
+ res |= session_register_app(session, args->app[i], register_handler);
}
if (ao2_container_count(session->websocket_apps) == 0) {
diff --git a/res/ari/resource_events.h b/res/ari/resource_events.h
index 2b631819b..c48269958 100644
--- a/res/ari/resource_events.h
+++ b/res/ari/resource_events.h
@@ -47,6 +47,8 @@ struct ast_ari_events_event_websocket_args {
size_t app_count;
/*! Parsing context for app. */
char *app_parse;
+ /*! Subscribe to all Asterisk events. If provided, the applications listed will be subscribed to all events, effectively disabling the application specific subscriptions. Default is 'false'. */
+ int subscribe_all;
};
/*!
diff --git a/res/res_ari_events.c b/res/res_ari_events.c
index 426538511..65bd38d5b 100644
--- a/res/res_ari_events.c
+++ b/res/res_ari_events.c
@@ -110,6 +110,9 @@ static int ast_ari_events_event_websocket_ws_attempted_cb(struct ast_tcptls_sess
args.app[j] = (vals[j]);
}
} else
+ if (strcmp(i->name, "subscribeAll") == 0) {
+ args.subscribe_all = ast_true(i->value);
+ } else
{}
}
@@ -208,6 +211,9 @@ static void ast_ari_events_event_websocket_ws_established_cb(struct ast_websocke
args.app[j] = (vals[j]);
}
} else
+ if (strcmp(i->name, "subscribeAll") == 0) {
+ args.subscribe_all = ast_true(i->value);
+ } else
{}
}
diff --git a/res/res_stasis.c b/res/res_stasis.c
index fc34fa36f..25866d9bb 100644
--- a/res/res_stasis.c
+++ b/res/res_stasis.c
@@ -109,6 +109,11 @@ struct ao2_container *app_bridges_moh;
struct ao2_container *app_bridges_playback;
+/*!
+ * \internal \brief List of registered event sources.
+ */
+AST_RWLIST_HEAD_STATIC(event_sources, stasis_app_event_source);
+
static struct ast_json *stasis_end_to_json(struct stasis_message *message,
const struct stasis_message_sanitizer *sanitize)
{
@@ -1469,7 +1474,7 @@ struct ao2_container *stasis_app_get_all(void)
return ao2_bump(apps);
}
-int stasis_app_register(const char *app_name, stasis_app_cb handler, void *data)
+static int __stasis_app_register(const char *app_name, stasis_app_cb handler, void *data, int all_events)
{
RAII_VAR(struct stasis_app *, app, NULL, ao2_cleanup);
@@ -1482,8 +1487,20 @@ int stasis_app_register(const char *app_name, stasis_app_cb handler, void *data)
if (app) {
app_update(app, handler, data);
} else {
- app = app_create(app_name, handler, data);
+ app = app_create(app_name, handler, data, all_events ? STASIS_APP_SUBSCRIBE_ALL : STASIS_APP_SUBSCRIBE_MANUAL);
if (app) {
+ if (all_events) {
+ struct stasis_app_event_source *source;
+ SCOPED_LOCK(lock, &event_sources, AST_RWLIST_RDLOCK, AST_RWLIST_UNLOCK);
+
+ AST_LIST_TRAVERSE(&event_sources, source, next) {
+ if (!source->subscribe) {
+ continue;
+ }
+
+ source->subscribe(app, NULL);
+ }
+ }
ao2_link_flags(apps_registry, app, OBJ_NOLOCK);
} else {
ao2_unlock(apps_registry);
@@ -1499,6 +1516,16 @@ int stasis_app_register(const char *app_name, stasis_app_cb handler, void *data)
return 0;
}
+int stasis_app_register(const char *app_name, stasis_app_cb handler, void *data)
+{
+ return __stasis_app_register(app_name, handler, data, 0);
+}
+
+int stasis_app_register_all(const char *app_name, stasis_app_cb handler, void *data)
+{
+ return __stasis_app_register(app_name, handler, data, 1);
+}
+
void stasis_app_unregister(const char *app_name)
{
RAII_VAR(struct stasis_app *, app, NULL, ao2_cleanup);
@@ -1526,11 +1553,6 @@ void stasis_app_unregister(const char *app_name)
cleanup();
}
-/*!
- * \internal \brief List of registered event sources.
- */
-AST_RWLIST_HEAD_STATIC(event_sources, stasis_app_event_source);
-
void stasis_app_register_event_source(struct stasis_app_event_source *obj)
{
SCOPED_LOCK(lock, &event_sources, AST_RWLIST_WRLOCK, AST_RWLIST_UNLOCK);
@@ -1727,8 +1749,8 @@ static enum stasis_app_subscribe_res app_subscribe(
ast_debug(3, "%s: Checking %s\n", app_name, uri);
- if (!event_source->find ||
- (!(obj = event_source->find(app, uri + strlen(event_source->scheme))))) {
+ if (!ast_strlen_zero(uri + strlen(event_source->scheme)) &&
+ (!event_source->find || (!(obj = event_source->find(app, uri + strlen(event_source->scheme)))))) {
ast_log(LOG_WARNING, "Event source not found: %s\n", uri);
return STASIS_ASR_EVENT_SOURCE_NOT_FOUND;
}
@@ -2062,6 +2084,7 @@ static int load_module(void)
}
AST_MODULE_INFO(ASTERISK_GPL_KEY, AST_MODFLAG_GLOBAL_SYMBOLS, "Stasis application support",
+ .load_pri = AST_MODPRI_APP_DEPEND,
.support_level = AST_MODULE_SUPPORT_CORE,
.load = load_module,
.unload = unload_module,
diff --git a/res/stasis/app.c b/res/stasis/app.c
index caa27abfc..353918241 100644
--- a/res/stasis/app.c
+++ b/res/stasis/app.c
@@ -38,6 +38,10 @@ ASTERISK_FILE_VERSION(__FILE__, "$Revision$")
#include "asterisk/stasis_endpoints.h"
#include "asterisk/stasis_message_router.h"
+#define BRIDGE_ALL "__AST_BRIDGE_ALL_TOPIC"
+#define CHANNEL_ALL "__AST_CHANNEL_ALL_TOPIC"
+#define ENDPOINT_ALL "__AST_ENDPOINT_ALL_TOPIC"
+
static int unsubscribe(struct stasis_app *app, const char *kind, const char *id, int terminate);
struct stasis_app {
@@ -47,12 +51,16 @@ struct stasis_app {
struct stasis_message_router *router;
/*! Router for handling messages to the bridge all \a topic. */
struct stasis_message_router *bridge_router;
+ /*! Optional router for handling endpoint messages in 'all' subscriptions */
+ struct stasis_message_router *endpoint_router;
/*! Container of the channel forwards to this app's topic. */
struct ao2_container *forwards;
/*! Callback function for this application. */
stasis_app_cb handler;
/*! Opaque data to hand to callback function. */
void *data;
+ /*! Subscription model for the application */
+ enum stasis_app_subscription_model subscription_model;
/*! Name of the Stasis application */
char name[];
};
@@ -121,34 +129,33 @@ static struct app_forwards *forwards_create(struct stasis_app *app,
static struct app_forwards *forwards_create_channel(struct stasis_app *app,
struct ast_channel *chan)
{
- RAII_VAR(struct app_forwards *, forwards, NULL, ao2_cleanup);
+ struct app_forwards *forwards;
- if (!app || !chan) {
+ if (!app) {
return NULL;
}
- forwards = forwards_create(app, ast_channel_uniqueid(chan));
+ forwards = forwards_create(app, chan ? ast_channel_uniqueid(chan) : CHANNEL_ALL);
if (!forwards) {
return NULL;
}
forwards->forward_type = FORWARD_CHANNEL;
- forwards->topic_forward = stasis_forward_all(ast_channel_topic(chan),
- app->topic);
- if (!forwards->topic_forward) {
- return NULL;
+ if (chan) {
+ forwards->topic_forward = stasis_forward_all(ast_channel_topic(chan),
+ app->topic);
}
-
forwards->topic_cached_forward = stasis_forward_all(
- ast_channel_topic_cached(chan), app->topic);
- if (!forwards->topic_cached_forward) {
+ chan ? ast_channel_topic_cached(chan) : ast_channel_topic_all_cached(),
+ app->topic);
+
+ if ((!forwards->topic_forward && chan) || !forwards->topic_cached_forward) {
/* Half-subscribed is a bad thing */
- stasis_forward_cancel(forwards->topic_forward);
- forwards->topic_forward = NULL;
+ forwards_unsubscribe(forwards);
+ ao2_ref(forwards, -1);
return NULL;
}
- ao2_ref(forwards, +1);
return forwards;
}
@@ -156,69 +163,101 @@ static struct app_forwards *forwards_create_channel(struct stasis_app *app,
static struct app_forwards *forwards_create_bridge(struct stasis_app *app,
struct ast_bridge *bridge)
{
- RAII_VAR(struct app_forwards *, forwards, NULL, ao2_cleanup);
+ struct app_forwards *forwards;
- if (!app || !bridge) {
+ if (!app) {
return NULL;
}
- forwards = forwards_create(app, bridge->uniqueid);
+ forwards = forwards_create(app, bridge ? bridge->uniqueid : BRIDGE_ALL);
if (!forwards) {
return NULL;
}
forwards->forward_type = FORWARD_BRIDGE;
- forwards->topic_forward = stasis_forward_all(ast_bridge_topic(bridge),
- app->topic);
- if (!forwards->topic_forward) {
- return NULL;
+ if (bridge) {
+ forwards->topic_forward = stasis_forward_all(ast_bridge_topic(bridge),
+ app->topic);
}
-
forwards->topic_cached_forward = stasis_forward_all(
- ast_bridge_topic_cached(bridge), app->topic);
- if (!forwards->topic_cached_forward) {
+ bridge ? ast_bridge_topic_cached(bridge) : ast_bridge_topic_all_cached(),
+ app->topic);
+
+ if ((!forwards->topic_forward && bridge) || !forwards->topic_cached_forward) {
/* Half-subscribed is a bad thing */
- stasis_forward_cancel(forwards->topic_forward);
- forwards->topic_forward = NULL;
+ forwards_unsubscribe(forwards);
+ ao2_ref(forwards, -1);
return NULL;
}
- ao2_ref(forwards, +1);
return forwards;
}
+static void endpoint_state_cb(void *data, struct stasis_subscription *sub,
+ struct stasis_message *message)
+{
+ struct stasis_app *app = data;
+
+ stasis_publish(app->topic, message);
+}
+
/*! Forward a endpoint's topics to an app */
static struct app_forwards *forwards_create_endpoint(struct stasis_app *app,
struct ast_endpoint *endpoint)
{
- RAII_VAR(struct app_forwards *, forwards, NULL, ao2_cleanup);
- if (!app || !endpoint) {
+ struct app_forwards *forwards;
+ int ret = 0;
+
+ if (!app) {
return NULL;
}
- forwards = forwards_create(app, ast_endpoint_get_id(endpoint));
+ forwards = forwards_create(app, endpoint ? ast_endpoint_get_id(endpoint) : ENDPOINT_ALL);
if (!forwards) {
return NULL;
}
forwards->forward_type = FORWARD_ENDPOINT;
- forwards->topic_forward = stasis_forward_all(ast_endpoint_topic(endpoint),
- app->topic);
- if (!forwards->topic_forward) {
- return NULL;
- }
+ if (endpoint) {
+ forwards->topic_forward = stasis_forward_all(ast_endpoint_topic(endpoint),
+ app->topic);
+ forwards->topic_cached_forward = stasis_forward_all(
+ ast_endpoint_topic_cached(endpoint), app->topic);
+
+ if (!forwards->topic_forward || !forwards->topic_cached_forward) {
+ /* Half-subscribed is a bad thing */
+ forwards_unsubscribe(forwards);
+ ao2_ref(forwards, -1);
+ return NULL;
+ }
+ } else {
+ /* Since endpoint subscriptions also subscribe to channels, in the case
+ * of all endpoint subscriptions, we only want messages for the endpoints.
+ * As such, we route those particular messages and then re-publish them
+ * on the app's topic.
+ */
+ ast_assert(app->endpoint_router == NULL);
+ app->endpoint_router = stasis_message_router_create(ast_endpoint_topic_all_cached());
+ if (!app->endpoint_router) {
+ forwards_unsubscribe(forwards);
+ ao2_ref(forwards, -1);
+ return NULL;
+ }
- forwards->topic_cached_forward = stasis_forward_all(
- ast_endpoint_topic_cached(endpoint), app->topic);
- if (!forwards->topic_cached_forward) {
- /* Half-subscribed is a bad thing */
- stasis_forward_cancel(forwards->topic_forward);
- forwards->topic_forward = NULL;
- return NULL;
+ ret |= stasis_message_router_add(app->endpoint_router,
+ ast_endpoint_state_type(), endpoint_state_cb, app);
+ ret |= stasis_message_router_add(app->endpoint_router,
+ ast_endpoint_contact_state_type(), endpoint_state_cb, app);
+
+ if (ret) {
+ ao2_ref(app->endpoint_router, -1);
+ app->endpoint_router = NULL;
+ ao2_ref(forwards, -1);
+ return NULL;
+ }
}
- ao2_ref(forwards, +1);
return forwards;
}
@@ -260,6 +299,7 @@ static void app_dtor(void *obj)
ast_assert(app->router == NULL);
ast_assert(app->bridge_router == NULL);
+ ast_assert(app->endpoint_router == NULL);
ao2_cleanup(app->topic);
app->topic = NULL;
@@ -793,7 +833,7 @@ static void bridge_default_handler(void *data, struct stasis_subscription *sub,
}
}
-struct stasis_app *app_create(const char *name, stasis_app_cb handler, void *data)
+struct stasis_app *app_create(const char *name, stasis_app_cb handler, void *data, enum stasis_app_subscription_model subscription_model)
{
RAII_VAR(struct stasis_app *, app, NULL, ao2_cleanup);
size_t size;
@@ -806,10 +846,10 @@ struct stasis_app *app_create(const char *name, stasis_app_cb handler, void *dat
size = sizeof(*app) + strlen(name) + 1;
app = ao2_alloc_options(size, app_dtor, AO2_ALLOC_OPT_LOCK_MUTEX);
-
if (!app) {
return NULL;
}
+ app->subscription_model = subscription_model;
app->forwards = ao2_container_alloc_rbtree(AO2_ALLOC_OPT_LOCK_MUTEX,
AO2_CONTAINER_ALLOC_OPT_DUPS_OBJ_REJECT,
@@ -877,7 +917,8 @@ struct stasis_app *app_create(const char *name, stasis_app_cb handler, void *dat
return app;
}
-struct stasis_topic *ast_app_get_topic(struct stasis_app *app) {
+struct stasis_topic *ast_app_get_topic(struct stasis_app *app)
+{
return app->topic;
}
@@ -930,6 +971,8 @@ void app_shutdown(struct stasis_app *app)
app->router = NULL;
stasis_message_router_unsubscribe(app->bridge_router);
app->bridge_router = NULL;
+ stasis_message_router_unsubscribe(app->endpoint_router);
+ app->endpoint_router = NULL;
}
int app_is_active(struct stasis_app *app)
@@ -1029,34 +1072,47 @@ struct ast_json *app_to_json(const struct stasis_app *app)
int app_subscribe_channel(struct stasis_app *app, struct ast_channel *chan)
{
+ struct app_forwards *forwards;
+ SCOPED_AO2LOCK(lock, app->forwards);
int res;
- if (!app || !chan) {
+ if (!app) {
return -1;
- } else {
- RAII_VAR(struct app_forwards *, forwards, NULL, ao2_cleanup);
- SCOPED_AO2LOCK(lock, app->forwards);
+ }
- forwards = ao2_find(app->forwards, ast_channel_uniqueid(chan),
- OBJ_SEARCH_KEY | OBJ_NOLOCK);
- if (!forwards) {
- /* Forwards not found, create one */
- forwards = forwards_create_channel(app, chan);
- if (!forwards) {
- return -1;
- }
+ /* If subscribed to all, don't subscribe again */
+ forwards = ao2_find(app->forwards, CHANNEL_ALL, OBJ_SEARCH_KEY | OBJ_NOLOCK);
+ if (forwards) {
+ ao2_ref(forwards, -1);
+ return 0;
+ }
- res = ao2_link_flags(app->forwards, forwards,
- OBJ_NOLOCK);
- if (!res) {
- return -1;
- }
+ forwards = ao2_find(app->forwards,
+ chan ? ast_channel_uniqueid(chan) : CHANNEL_ALL,
+ OBJ_SEARCH_KEY | OBJ_NOLOCK);
+ if (!forwards) {
+ /* Forwards not found, create one */
+ forwards = forwards_create_channel(app, chan);
+ if (!forwards) {
+ return -1;
}
- ++forwards->interested;
- ast_debug(3, "Channel '%s' is %d interested in %s\n", ast_channel_uniqueid(chan), forwards->interested, app->name);
- return 0;
+ res = ao2_link_flags(app->forwards, forwards,
+ OBJ_NOLOCK);
+ if (!res) {
+ ao2_ref(forwards, -1);
+ return -1;
+ }
}
+
+ ++forwards->interested;
+ ast_debug(3, "Channel '%s' is %d interested in %s\n",
+ chan ? ast_channel_uniqueid(chan) : "ALL",
+ forwards->interested,
+ app->name);
+
+ ao2_ref(forwards, -1);
+ return 0;
}
static int subscribe_channel(struct stasis_app *app, void *obj)
@@ -1069,6 +1125,19 @@ static int unsubscribe(struct stasis_app *app, const char *kind, const char *id,
RAII_VAR(struct app_forwards *, forwards, NULL, ao2_cleanup);
SCOPED_AO2LOCK(lock, app->forwards);
+ if (!id) {
+ if (!strcmp(kind, "bridge")) {
+ id = BRIDGE_ALL;
+ } else if (!strcmp(kind, "channel")) {
+ id = CHANNEL_ALL;
+ } else if (!strcmp(kind, "endpoint")) {
+ id = ENDPOINT_ALL;
+ } else {
+ ast_log(LOG_WARNING, "Unknown subscription kind '%s'\n", kind);
+ return -1;
+ }
+ }
+
forwards = ao2_find(app->forwards, id, OBJ_SEARCH_KEY | OBJ_NOLOCK);
if (!forwards) {
ast_debug(3, "App '%s' not subscribed to %s '%s'\n", app->name, kind, id);
@@ -1095,16 +1164,16 @@ static int unsubscribe(struct stasis_app *app, const char *kind, const char *id,
int app_unsubscribe_channel(struct stasis_app *app, struct ast_channel *chan)
{
- if (!app || !chan) {
+ if (!app) {
return -1;
}
- return app_unsubscribe_channel_id(app, ast_channel_uniqueid(chan));
+ return app_unsubscribe_channel_id(app, chan ? ast_channel_uniqueid(chan) : CHANNEL_ALL);
}
int app_unsubscribe_channel_id(struct stasis_app *app, const char *channel_id)
{
- if (!app || !channel_id) {
+ if (!app) {
return -1;
}
@@ -1114,6 +1183,10 @@ int app_unsubscribe_channel_id(struct stasis_app *app, const char *channel_id)
int app_is_subscribed_channel_id(struct stasis_app *app, const char *channel_id)
{
RAII_VAR(struct app_forwards *, forwards, NULL, ao2_cleanup);
+
+ if (ast_strlen_zero(channel_id)) {
+ channel_id = CHANNEL_ALL;
+ }
forwards = ao2_find(app->forwards, channel_id, OBJ_SEARCH_KEY);
return forwards != NULL;
}
@@ -1133,28 +1206,39 @@ struct stasis_app_event_source channel_event_source = {
int app_subscribe_bridge(struct stasis_app *app, struct ast_bridge *bridge)
{
- if (!app || !bridge) {
+ struct app_forwards *forwards;
+ SCOPED_AO2LOCK(lock, app->forwards);
+
+ if (!app) {
return -1;
- } else {
- RAII_VAR(struct app_forwards *, forwards, NULL, ao2_cleanup);
- SCOPED_AO2LOCK(lock, app->forwards);
+ }
- forwards = ao2_find(app->forwards, bridge->uniqueid,
- OBJ_SEARCH_KEY | OBJ_NOLOCK);
+ /* If subscribed to all, don't subscribe again */
+ forwards = ao2_find(app->forwards, BRIDGE_ALL, OBJ_SEARCH_KEY | OBJ_NOLOCK);
+ if (forwards) {
+ ao2_ref(forwards, -1);
+ return 0;
+ }
+ forwards = ao2_find(app->forwards, bridge ? bridge->uniqueid : BRIDGE_ALL,
+ OBJ_SEARCH_KEY | OBJ_NOLOCK);
+ if (!forwards) {
+ /* Forwards not found, create one */
+ forwards = forwards_create_bridge(app, bridge);
if (!forwards) {
- /* Forwards not found, create one */
- forwards = forwards_create_bridge(app, bridge);
- if (!forwards) {
- return -1;
- }
- ao2_link_flags(app->forwards, forwards, OBJ_NOLOCK);
+ return -1;
}
-
- ++forwards->interested;
- ast_debug(3, "Bridge '%s' is %d interested in %s\n", bridge->uniqueid, forwards->interested, app->name);
- return 0;
+ ao2_link_flags(app->forwards, forwards, OBJ_NOLOCK);
}
+
+ ++forwards->interested;
+ ast_debug(3, "Bridge '%s' is %d interested in %s\n",
+ bridge ? bridge->uniqueid : "ALL",
+ forwards->interested,
+ app->name);
+
+ ao2_ref(forwards, -1);
+ return 0;
}
static int subscribe_bridge(struct stasis_app *app, void *obj)
@@ -1164,16 +1248,16 @@ static int subscribe_bridge(struct stasis_app *app, void *obj)
int app_unsubscribe_bridge(struct stasis_app *app, struct ast_bridge *bridge)
{
- if (!app || !bridge) {
+ if (!app) {
return -1;
}
- return app_unsubscribe_bridge_id(app, bridge->uniqueid);
+ return app_unsubscribe_bridge_id(app, bridge ? bridge->uniqueid : BRIDGE_ALL);
}
int app_unsubscribe_bridge_id(struct stasis_app *app, const char *bridge_id)
{
- if (!app || !bridge_id) {
+ if (!app) {
return -1;
}
@@ -1182,9 +1266,26 @@ int app_unsubscribe_bridge_id(struct stasis_app *app, const char *bridge_id)
int app_is_subscribed_bridge_id(struct stasis_app *app, const char *bridge_id)
{
- RAII_VAR(struct app_forwards *, forwards, NULL, ao2_cleanup);
- forwards = ao2_find(app->forwards, bridge_id, OBJ_SEARCH_KEY);
- return forwards != NULL;
+ struct app_forwards *forwards;
+ SCOPED_AO2LOCK(lock, app->forwards);
+
+ forwards = ao2_find(app->forwards, BRIDGE_ALL, OBJ_SEARCH_KEY | OBJ_NOLOCK);
+ if (forwards) {
+ ao2_ref(forwards, -1);
+ return 1;
+ }
+
+ if (ast_strlen_zero(bridge_id)) {
+ bridge_id = BRIDGE_ALL;
+ }
+
+ forwards = ao2_find(app->forwards, bridge_id, OBJ_SEARCH_KEY | OBJ_NOLOCK);
+ if (forwards) {
+ ao2_ref(forwards, -1);
+ return 1;
+ }
+
+ return 0;
}
static void *bridge_find(const struct stasis_app *app, const char *id)
@@ -1202,31 +1303,43 @@ struct stasis_app_event_source bridge_event_source = {
int app_subscribe_endpoint(struct stasis_app *app, struct ast_endpoint *endpoint)
{
- if (!app || !endpoint) {
+ struct app_forwards *forwards;
+ SCOPED_AO2LOCK(lock, app->forwards);
+
+ if (!app) {
return -1;
- } else {
- RAII_VAR(struct app_forwards *, forwards, NULL, ao2_cleanup);
- SCOPED_AO2LOCK(lock, app->forwards);
+ }
- forwards = ao2_find(app->forwards, ast_endpoint_get_id(endpoint),
- OBJ_SEARCH_KEY | OBJ_NOLOCK);
+ /* If subscribed to all, don't subscribe again */
+ forwards = ao2_find(app->forwards, ENDPOINT_ALL, OBJ_SEARCH_KEY | OBJ_NOLOCK);
+ if (forwards) {
+ ao2_ref(forwards, -1);
+ return 0;
+ }
+ forwards = ao2_find(app->forwards,
+ endpoint ? ast_endpoint_get_id(endpoint) : ENDPOINT_ALL,
+ OBJ_SEARCH_KEY | OBJ_NOLOCK);
+ if (!forwards) {
+ /* Forwards not found, create one */
+ forwards = forwards_create_endpoint(app, endpoint);
if (!forwards) {
- /* Forwards not found, create one */
- forwards = forwards_create_endpoint(app, endpoint);
- if (!forwards) {
- return -1;
- }
- ao2_link_flags(app->forwards, forwards, OBJ_NOLOCK);
-
- /* Subscribe for messages */
- messaging_app_subscribe_endpoint(app->name, endpoint, &message_received_handler, app);
+ return -1;
}
+ ao2_link_flags(app->forwards, forwards, OBJ_NOLOCK);
- ++forwards->interested;
- ast_debug(3, "Endpoint '%s' is %d interested in %s\n", ast_endpoint_get_id(endpoint), forwards->interested, app->name);
- return 0;
+ /* Subscribe for messages */
+ messaging_app_subscribe_endpoint(app->name, endpoint, &message_received_handler, app);
}
+
+ ++forwards->interested;
+ ast_debug(3, "Endpoint '%s' is %d interested in %s\n",
+ endpoint ? ast_endpoint_get_id(endpoint) : "ALL",
+ forwards->interested,
+ app->name);
+
+ ao2_ref(forwards, -1);
+ return 0;
}
static int subscribe_endpoint(struct stasis_app *app, void *obj)
@@ -1236,7 +1349,7 @@ static int subscribe_endpoint(struct stasis_app *app, void *obj)
int app_unsubscribe_endpoint_id(struct stasis_app *app, const char *endpoint_id)
{
- if (!app || !endpoint_id) {
+ if (!app) {
return -1;
}
@@ -1246,6 +1359,10 @@ int app_unsubscribe_endpoint_id(struct stasis_app *app, const char *endpoint_id)
int app_is_subscribed_endpoint_id(struct stasis_app *app, const char *endpoint_id)
{
RAII_VAR(struct app_forwards *, forwards, NULL, ao2_cleanup);
+
+ if (ast_strlen_zero(endpoint_id)) {
+ endpoint_id = ENDPOINT_ALL;
+ }
forwards = ao2_find(app->forwards, endpoint_id, OBJ_SEARCH_KEY);
return forwards != NULL;
}
diff --git a/res/stasis/app.h b/res/stasis/app.h
index 59574f584..2c8db1ccd 100644
--- a/res/stasis/app.h
+++ b/res/stasis/app.h
@@ -36,6 +36,19 @@
*/
struct stasis_app;
+enum stasis_app_subscription_model {
+ /*
+ * \brief An application must manually subscribe to each
+ * resource that it cares about. This is the default approach.
+ */
+ STASIS_APP_SUBSCRIBE_MANUAL,
+ /*
+ * \brief An application is automatically subscribed to all
+ * resources in Asterisk, even if it does not control them.
+ */
+ STASIS_APP_SUBSCRIBE_ALL
+};
+
/*!
* \brief Create a res_stasis application.
*
@@ -45,7 +58,7 @@ struct stasis_app;
* \return New \c res_stasis application.
* \return \c NULL on error.
*/
-struct stasis_app *app_create(const char *name, stasis_app_cb handler, void *data);
+struct stasis_app *app_create(const char *name, stasis_app_cb handler, void *data, enum stasis_app_subscription_model subscription_model);
/*!
* \brief Tears down an application.
diff --git a/res/stasis/messaging.c b/res/stasis/messaging.c
index fd7cf9f7b..229a3a646 100644
--- a/res/stasis/messaging.c
+++ b/res/stasis/messaging.c
@@ -38,6 +38,11 @@ ASTERISK_FILE_VERSION(__FILE__, "$Revision$")
#include "messaging.h"
/*!
+ * \brief Subscription to all technologies
+ */
+#define TECH_WILDCARD "__AST_ALL_TECH"
+
+/*!
* \brief Number of buckets for the \ref endpoint_subscriptions container
*/
#define ENDPOINTS_NUM_BUCKETS 127
@@ -219,10 +224,14 @@ static int has_destination_cb(const struct ast_msg *msg)
for (i = 0; i < AST_VECTOR_SIZE(&tech_subscriptions); i++) {
sub = AST_VECTOR_GET(&tech_subscriptions, i);
- if (sub && (!strncasecmp(sub->token, buf, strlen(sub->token))
- || !strncasecmp(sub->token, buf, strlen(sub->token)))) {
+ if (!sub) {
+ continue;
+ }
+
+ if (!strcmp(sub->token, TECH_WILDCARD)
+ || !strncasecmp(sub->token, buf, strlen(sub->token))
+ || !strncasecmp(sub->token, buf, strlen(sub->token))) {
ast_rwlock_unlock(&tech_subscriptions_lock);
- sub = NULL; /* No ref bump! */
goto match;
}
@@ -231,6 +240,7 @@ static int has_destination_cb(const struct ast_msg *msg)
sub = ao2_find(endpoint_subscriptions, buf, OBJ_SEARCH_KEY);
if (sub) {
+ ao2_ref(sub, -1);
goto match;
}
@@ -238,7 +248,6 @@ static int has_destination_cb(const struct ast_msg *msg)
return 0;
match:
- ao2_cleanup(sub);
return 1;
}
@@ -301,7 +310,8 @@ static int handle_msg_cb(struct ast_msg *msg)
continue;
}
- if (!strncasecmp(sub->token, buf, strlen(sub->token))) {
+ if (!strcmp(sub->token, TECH_WILDCARD)
+ || !strncasecmp(sub->token, buf, strlen(sub->token))) {
ast_rwlock_unlock(&tech_subscriptions_lock);
ao2_bump(sub);
endpoint_name = buf;
@@ -374,7 +384,7 @@ static struct message_subscription *get_subscription(struct ast_endpoint *endpoi
{
struct message_subscription *sub = NULL;
- if (!ast_strlen_zero(ast_endpoint_get_resource(endpoint))) {
+ if (endpoint && !ast_strlen_zero(ast_endpoint_get_resource(endpoint))) {
sub = ao2_find(endpoint_subscriptions, endpoint, OBJ_SEARCH_KEY);
} else {
int i;
@@ -383,7 +393,7 @@ static struct message_subscription *get_subscription(struct ast_endpoint *endpoi
for (i = 0; i < AST_VECTOR_SIZE(&tech_subscriptions); i++) {
sub = AST_VECTOR_GET(&tech_subscriptions, i);
- if (sub && !strcmp(sub->token, ast_endpoint_get_tech(endpoint))) {
+ if (sub && !strcmp(sub->token, endpoint ? ast_endpoint_get_tech(endpoint) : TECH_WILDCARD)) {
ao2_bump(sub);
break;
}
@@ -400,10 +410,6 @@ void messaging_app_unsubscribe_endpoint(const char *app_name, const char *endpoi
RAII_VAR(struct ast_endpoint *, endpoint, NULL, ao2_cleanup);
endpoint = ast_endpoint_find_by_id(endpoint_id);
- if (!endpoint) {
- return;
- }
-
sub = get_subscription(endpoint);
if (!sub) {
return;
@@ -417,11 +423,11 @@ void messaging_app_unsubscribe_endpoint(const char *app_name, const char *endpoi
AST_VECTOR_REMOVE_CMP_UNORDERED(&sub->applications, app_name, application_tuple_cmp, ao2_cleanup);
if (AST_VECTOR_SIZE(&sub->applications) == 0) {
- if (!ast_strlen_zero(ast_endpoint_get_resource(endpoint))) {
+ if (endpoint && !ast_strlen_zero(ast_endpoint_get_resource(endpoint))) {
ao2_unlink(endpoint_subscriptions, sub);
} else {
ast_rwlock_wrlock(&tech_subscriptions_lock);
- AST_VECTOR_REMOVE_CMP_UNORDERED(&tech_subscriptions, ast_endpoint_get_id(endpoint),
+ AST_VECTOR_REMOVE_CMP_UNORDERED(&tech_subscriptions, endpoint ? ast_endpoint_get_id(endpoint) : TECH_WILDCARD,
messaging_subscription_cmp, AST_VECTOR_ELEM_CLEANUP_NOOP);
ast_rwlock_unlock(&tech_subscriptions_lock);
}
@@ -429,9 +435,9 @@ void messaging_app_unsubscribe_endpoint(const char *app_name, const char *endpoi
ao2_unlock(sub);
ao2_ref(sub, -1);
- ast_debug(3, "App '%s' unsubscribed to messages from endpoint '%s'\n", app_name, ast_endpoint_get_id(endpoint));
+ ast_debug(3, "App '%s' unsubscribed to messages from endpoint '%s'\n", app_name, endpoint ? ast_endpoint_get_id(endpoint) : "-- ALL --");
ast_test_suite_event_notify("StasisMessagingSubscription", "SubState: Unsubscribed\r\nAppName: %s\r\nToken: %s\r\n",
- app_name, ast_endpoint_get_id(endpoint));
+ app_name, endpoint ? ast_endpoint_get_id(endpoint) : "ALL");
}
static struct message_subscription *get_or_create_subscription(struct ast_endpoint *endpoint)
@@ -442,12 +448,12 @@ static struct message_subscription *get_or_create_subscription(struct ast_endpoi
return sub;
}
- sub = message_subscription_alloc(ast_endpoint_get_id(endpoint));
+ sub = message_subscription_alloc(endpoint ? ast_endpoint_get_id(endpoint) : TECH_WILDCARD);
if (!sub) {
return NULL;
}
- if (!ast_strlen_zero(ast_endpoint_get_resource(endpoint))) {
+ if (endpoint && !ast_strlen_zero(ast_endpoint_get_resource(endpoint))) {
ao2_link(endpoint_subscriptions, sub);
} else {
ast_rwlock_wrlock(&tech_subscriptions_lock);
@@ -482,9 +488,9 @@ int messaging_app_subscribe_endpoint(const char *app_name, struct ast_endpoint *
AST_VECTOR_APPEND(&sub->applications, tuple);
ao2_unlock(sub);
- ast_debug(3, "App '%s' subscribed to messages from endpoint '%s'\n", app_name, ast_endpoint_get_id(endpoint));
+ ast_debug(3, "App '%s' subscribed to messages from endpoint '%s'\n", app_name, endpoint ? ast_endpoint_get_id(endpoint) : "-- ALL --");
ast_test_suite_event_notify("StasisMessagingSubscription", "SubState: Subscribed\r\nAppName: %s\r\nToken: %s\r\n",
- app_name, ast_endpoint_get_id(endpoint));
+ app_name, endpoint ? ast_endpoint_get_id(endpoint) : "ALL");
return 0;
}
diff --git a/rest-api/api-docs/events.json b/rest-api/api-docs/events.json
index 8d74900f2..6276fc224 100644
--- a/rest-api/api-docs/events.json
+++ b/rest-api/api-docs/events.json
@@ -26,6 +26,14 @@
"required": true,
"allowMultiple": true,
"dataType": "string"
+ },
+ {
+ "name": "subscribeAll",
+ "description": "Subscribe to all Asterisk events. If provided, the applications listed will be subscribed to all events, effectively disabling the application specific subscriptions. Default is 'false'.",
+ "paramType": "query",
+ "required": false,
+ "allowMultiple": false,
+ "dataType": "boolean"
}
]
}