summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--CHANGES15
-rw-r--r--include/asterisk/stasis_app.h15
-rw-r--r--res/ari/resource_events.c24
-rw-r--r--res/ari/resource_events.h2
-rw-r--r--res/res_ari_events.c6
-rw-r--r--res/res_stasis.c41
-rw-r--r--res/stasis/app.c335
-rw-r--r--res/stasis/app.h15
-rw-r--r--res/stasis/messaging.c44
-rw-r--r--rest-api/api-docs/events.json8
10 files changed, 363 insertions, 142 deletions
diff --git a/CHANGES b/CHANGES
index c24919517..0c467a803 100644
--- a/CHANGES
+++ b/CHANGES
@@ -19,6 +19,21 @@ Dialplan Functions
return the SIP Call-ID associated with the INVITE request that established
the PJSIP channel.
+ARI
+------------------
+ * Added the ability to subscribe to all ARI events in Asterisk, regardless
+ of whether the application 'controls' the resource. This is useful for
+ scenarios where an ARI application merely wants to observe the system,
+ as opposed to control it. There are two ways to accomplish this:
+ (1) Via the WebSocket connection URI. A new query paramter, 'subscribeAll',
+ has been added that, when present and True, will subscribe all
+ specified applications to all ARI event sources in Asterisk.
+ (2) Via the applications resource. An ARI client can, at any time, subscribe
+ to all resources in an event source merely by not providing an explicit
+ resource. For example, subscribing to an event source of 'channels:'
+ as opposed to 'channels:12345' will subscribe the application to all
+ channels.
+
------------------------------------------------------------------------------
--- Functionality changes from Asterisk 13.4.0 to Asterisk 13.5.0 ------------
------------------------------------------------------------------------------
diff --git a/include/asterisk/stasis_app.h b/include/asterisk/stasis_app.h
index 567670b69..f2b07e0bf 100644
--- a/include/asterisk/stasis_app.h
+++ b/include/asterisk/stasis_app.h
@@ -92,6 +92,21 @@ struct ao2_container *stasis_app_get_all(void);
int stasis_app_register(const char *app_name, stasis_app_cb handler, void *data);
/*!
+ * \brief Register a new Stasis application that receives all Asterisk events.
+ *
+ * If an application is already registered with the given name, the old
+ * application is sent a 'replaced' message and unregistered.
+ *
+ * \param app_name Name of this application.
+ * \param handler Callback for application messages.
+ * \param data Data blob to pass to the callback. Must be AO2 managed.
+ *
+ * \return 0 for success
+ * \return -1 for error.
+ */
+int stasis_app_register_all(const char *app_name, stasis_app_cb handler, void *data);
+
+/*!
* \brief Unregister a Stasis application.
* \param app_name Name of the application to unregister.
*/
diff --git a/res/ari/resource_events.c b/res/ari/resource_events.c
index 09bcafc2d..71d54b494 100644
--- a/res/ari/resource_events.c
+++ b/res/ari/resource_events.c
@@ -148,9 +148,11 @@ static void app_handler(void *data, const char *app_name,
* \brief Register for all of the apps given.
* \param session Session info struct.
* \param app_name Name of application to register.
+ * \param register_handler Pointer to the application registration handler
*/
static int session_register_app(struct event_session *session,
- const char *app_name)
+ const char *app_name,
+ int (* register_handler)(const char *, stasis_app_cb handler, void *data))
{
SCOPED_AO2LOCK(lock, session);
@@ -167,7 +169,7 @@ static int session_register_app(struct event_session *session,
return -1;
}
- stasis_app_register(app_name, app_handler, session);
+ register_handler(app_name, app_handler, session);
return 0;
}
@@ -178,6 +180,7 @@ int ast_ari_websocket_events_event_websocket_attempted(struct ast_tcptls_session
{
int res = 0;
size_t i, j;
+ int (* register_handler)(const char *, stasis_app_cb handler, void *data);
ast_debug(3, "/events WebSocket attempted\n");
@@ -186,13 +189,19 @@ int ast_ari_websocket_events_event_websocket_attempted(struct ast_tcptls_session
return -1;
}
+ if (args->subscribe_all) {
+ register_handler = &stasis_app_register_all;
+ } else {
+ register_handler = &stasis_app_register;
+ }
+
for (i = 0; i < args->app_count; ++i) {
if (ast_strlen_zero(args->app[i])) {
res = -1;
break;
}
- res |= stasis_app_register(args->app[i], app_handler, NULL);
+ res |= register_handler(args->app[i], app_handler, NULL);
}
if (res) {
@@ -213,6 +222,7 @@ void ast_ari_websocket_events_event_websocket_established(struct ast_ari_websock
struct ast_json *msg;
int res;
size_t i;
+ int (* register_handler)(const char *, stasis_app_cb handler, void *data);
ast_debug(3, "/events WebSocket connection\n");
@@ -222,12 +232,18 @@ void ast_ari_websocket_events_event_websocket_established(struct ast_ari_websock
return;
}
+ if (args->subscribe_all) {
+ register_handler = &stasis_app_register_all;
+ } else {
+ register_handler = &stasis_app_register;
+ }
+
res = 0;
for (i = 0; i < args->app_count; ++i) {
if (ast_strlen_zero(args->app[i])) {
continue;
}
- res |= session_register_app(session, args->app[i]);
+ res |= session_register_app(session, args->app[i], register_handler);
}
if (ao2_container_count(session->websocket_apps) == 0) {
diff --git a/res/ari/resource_events.h b/res/ari/resource_events.h
index 2b631819b..c48269958 100644
--- a/res/ari/resource_events.h
+++ b/res/ari/resource_events.h
@@ -47,6 +47,8 @@ struct ast_ari_events_event_websocket_args {
size_t app_count;
/*! Parsing context for app. */
char *app_parse;
+ /*! Subscribe to all Asterisk events. If provided, the applications listed will be subscribed to all events, effectively disabling the application specific subscriptions. Default is 'false'. */
+ int subscribe_all;
};
/*!
diff --git a/res/res_ari_events.c b/res/res_ari_events.c
index 426538511..65bd38d5b 100644
--- a/res/res_ari_events.c
+++ b/res/res_ari_events.c
@@ -110,6 +110,9 @@ static int ast_ari_events_event_websocket_ws_attempted_cb(struct ast_tcptls_sess
args.app[j] = (vals[j]);
}
} else
+ if (strcmp(i->name, "subscribeAll") == 0) {
+ args.subscribe_all = ast_true(i->value);
+ } else
{}
}
@@ -208,6 +211,9 @@ static void ast_ari_events_event_websocket_ws_established_cb(struct ast_websocke
args.app[j] = (vals[j]);
}
} else
+ if (strcmp(i->name, "subscribeAll") == 0) {
+ args.subscribe_all = ast_true(i->value);
+ } else
{}
}
diff --git a/res/res_stasis.c b/res/res_stasis.c
index fc34fa36f..25866d9bb 100644
--- a/res/res_stasis.c
+++ b/res/res_stasis.c
@@ -109,6 +109,11 @@ struct ao2_container *app_bridges_moh;
struct ao2_container *app_bridges_playback;
+/*!
+ * \internal \brief List of registered event sources.
+ */
+AST_RWLIST_HEAD_STATIC(event_sources, stasis_app_event_source);
+
static struct ast_json *stasis_end_to_json(struct stasis_message *message,
const struct stasis_message_sanitizer *sanitize)
{
@@ -1469,7 +1474,7 @@ struct ao2_container *stasis_app_get_all(void)
return ao2_bump(apps);
}
-int stasis_app_register(const char *app_name, stasis_app_cb handler, void *data)
+static int __stasis_app_register(const char *app_name, stasis_app_cb handler, void *data, int all_events)
{
RAII_VAR(struct stasis_app *, app, NULL, ao2_cleanup);
@@ -1482,8 +1487,20 @@ int stasis_app_register(const char *app_name, stasis_app_cb handler, void *data)
if (app) {
app_update(app, handler, data);
} else {
- app = app_create(app_name, handler, data);
+ app = app_create(app_name, handler, data, all_events ? STASIS_APP_SUBSCRIBE_ALL : STASIS_APP_SUBSCRIBE_MANUAL);
if (app) {
+ if (all_events) {
+ struct stasis_app_event_source *source;
+ SCOPED_LOCK(lock, &event_sources, AST_RWLIST_RDLOCK, AST_RWLIST_UNLOCK);
+
+ AST_LIST_TRAVERSE(&event_sources, source, next) {
+ if (!source->subscribe) {
+ continue;
+ }
+
+ source->subscribe(app, NULL);
+ }
+ }
ao2_link_flags(apps_registry, app, OBJ_NOLOCK);
} else {
ao2_unlock(apps_registry);
@@ -1499,6 +1516,16 @@ int stasis_app_register(const char *app_name, stasis_app_cb handler, void *data)
return 0;
}
+int stasis_app_register(const char *app_name, stasis_app_cb handler, void *data)
+{
+ return __stasis_app_register(app_name, handler, data, 0);
+}
+
+int stasis_app_register_all(const char *app_name, stasis_app_cb handler, void *data)
+{
+ return __stasis_app_register(app_name, handler, data, 1);
+}
+
void stasis_app_unregister(const char *app_name)
{
RAII_VAR(struct stasis_app *, app, NULL, ao2_cleanup);
@@ -1526,11 +1553,6 @@ void stasis_app_unregister(const char *app_name)
cleanup();
}
-/*!
- * \internal \brief List of registered event sources.
- */
-AST_RWLIST_HEAD_STATIC(event_sources, stasis_app_event_source);
-
void stasis_app_register_event_source(struct stasis_app_event_source *obj)
{
SCOPED_LOCK(lock, &event_sources, AST_RWLIST_WRLOCK, AST_RWLIST_UNLOCK);
@@ -1727,8 +1749,8 @@ static enum stasis_app_subscribe_res app_subscribe(
ast_debug(3, "%s: Checking %s\n", app_name, uri);
- if (!event_source->find ||
- (!(obj = event_source->find(app, uri + strlen(event_source->scheme))))) {
+ if (!ast_strlen_zero(uri + strlen(event_source->scheme)) &&
+ (!event_source->find || (!(obj = event_source->find(app, uri + strlen(event_source->scheme)))))) {
ast_log(LOG_WARNING, "Event source not found: %s\n", uri);
return STASIS_ASR_EVENT_SOURCE_NOT_FOUND;
}
@@ -2062,6 +2084,7 @@ static int load_module(void)
}
AST_MODULE_INFO(ASTERISK_GPL_KEY, AST_MODFLAG_GLOBAL_SYMBOLS, "Stasis application support",
+ .load_pri = AST_MODPRI_APP_DEPEND,
.support_level = AST_MODULE_SUPPORT_CORE,
.load = load_module,
.unload = unload_module,
diff --git a/res/stasis/app.c b/res/stasis/app.c
index caa27abfc..353918241 100644
--- a/res/stasis/app.c
+++ b/res/stasis/app.c
@@ -38,6 +38,10 @@ ASTERISK_FILE_VERSION(__FILE__, "$Revision$")
#include "asterisk/stasis_endpoints.h"
#include "asterisk/stasis_message_router.h"
+#define BRIDGE_ALL "__AST_BRIDGE_ALL_TOPIC"
+#define CHANNEL_ALL "__AST_CHANNEL_ALL_TOPIC"
+#define ENDPOINT_ALL "__AST_ENDPOINT_ALL_TOPIC"
+
static int unsubscribe(struct stasis_app *app, const char *kind, const char *id, int terminate);
struct stasis_app {
@@ -47,12 +51,16 @@ struct stasis_app {
struct stasis_message_router *router;
/*! Router for handling messages to the bridge all \a topic. */
struct stasis_message_router *bridge_router;
+ /*! Optional router for handling endpoint messages in 'all' subscriptions */
+ struct stasis_message_router *endpoint_router;
/*! Container of the channel forwards to this app's topic. */
struct ao2_container *forwards;
/*! Callback function for this application. */
stasis_app_cb handler;
/*! Opaque data to hand to callback function. */
void *data;
+ /*! Subscription model for the application */
+ enum stasis_app_subscription_model subscription_model;
/*! Name of the Stasis application */
char name[];
};
@@ -121,34 +129,33 @@ static struct app_forwards *forwards_create(struct stasis_app *app,
static struct app_forwards *forwards_create_channel(struct stasis_app *app,
struct ast_channel *chan)
{
- RAII_VAR(struct app_forwards *, forwards, NULL, ao2_cleanup);
+ struct app_forwards *forwards;
- if (!app || !chan) {
+ if (!app) {
return NULL;
}
- forwards = forwards_create(app, ast_channel_uniqueid(chan));
+ forwards = forwards_create(app, chan ? ast_channel_uniqueid(chan) : CHANNEL_ALL);
if (!forwards) {
return NULL;
}
forwards->forward_type = FORWARD_CHANNEL;
- forwards->topic_forward = stasis_forward_all(ast_channel_topic(chan),
- app->topic);
- if (!forwards->topic_forward) {
- return NULL;
+ if (chan) {
+ forwards->topic_forward = stasis_forward_all(ast_channel_topic(chan),
+ app->topic);
}
-
forwards->topic_cached_forward = stasis_forward_all(
- ast_channel_topic_cached(chan), app->topic);
- if (!forwards->topic_cached_forward) {
+ chan ? ast_channel_topic_cached(chan) : ast_channel_topic_all_cached(),
+ app->topic);
+
+ if ((!forwards->topic_forward && chan) || !forwards->topic_cached_forward) {
/* Half-subscribed is a bad thing */
- stasis_forward_cancel(forwards->topic_forward);
- forwards->topic_forward = NULL;
+ forwards_unsubscribe(forwards);
+ ao2_ref(forwards, -1);
return NULL;
}
- ao2_ref(forwards, +1);
return forwards;
}
@@ -156,69 +163,101 @@ static struct app_forwards *forwards_create_channel(struct stasis_app *app,
static struct app_forwards *forwards_create_bridge(struct stasis_app *app,
struct ast_bridge *bridge)
{
- RAII_VAR(struct app_forwards *, forwards, NULL, ao2_cleanup);
+ struct app_forwards *forwards;
- if (!app || !bridge) {
+ if (!app) {
return NULL;
}
- forwards = forwards_create(app, bridge->uniqueid);
+ forwards = forwards_create(app, bridge ? bridge->uniqueid : BRIDGE_ALL);
if (!forwards) {
return NULL;
}
forwards->forward_type = FORWARD_BRIDGE;
- forwards->topic_forward = stasis_forward_all(ast_bridge_topic(bridge),
- app->topic);
- if (!forwards->topic_forward) {
- return NULL;
+ if (bridge) {
+ forwards->topic_forward = stasis_forward_all(ast_bridge_topic(bridge),
+ app->topic);
}
-
forwards->topic_cached_forward = stasis_forward_all(
- ast_bridge_topic_cached(bridge), app->topic);
- if (!forwards->topic_cached_forward) {
+ bridge ? ast_bridge_topic_cached(bridge) : ast_bridge_topic_all_cached(),
+ app->topic);
+
+ if ((!forwards->topic_forward && bridge) || !forwards->topic_cached_forward) {
/* Half-subscribed is a bad thing */
- stasis_forward_cancel(forwards->topic_forward);
- forwards->topic_forward = NULL;
+ forwards_unsubscribe(forwards);
+ ao2_ref(forwards, -1);
return NULL;
}
- ao2_ref(forwards, +1);
return forwards;
}
+static void endpoint_state_cb(void *data, struct stasis_subscription *sub,
+ struct stasis_message *message)
+{
+ struct stasis_app *app = data;
+
+ stasis_publish(app->topic, message);
+}
+
/*! Forward a endpoint's topics to an app */
static struct app_forwards *forwards_create_endpoint(struct stasis_app *app,
struct ast_endpoint *endpoint)
{
- RAII_VAR(struct app_forwards *, forwards, NULL, ao2_cleanup);
- if (!app || !endpoint) {
+ struct app_forwards *forwards;
+ int ret = 0;
+
+ if (!app) {
return NULL;
}
- forwards = forwards_create(app, ast_endpoint_get_id(endpoint));
+ forwards = forwards_create(app, endpoint ? ast_endpoint_get_id(endpoint) : ENDPOINT_ALL);
if (!forwards) {
return NULL;
}
forwards->forward_type = FORWARD_ENDPOINT;
- forwards->topic_forward = stasis_forward_all(ast_endpoint_topic(endpoint),
- app->topic);
- if (!forwards->topic_forward) {
- return NULL;
- }
+ if (endpoint) {
+ forwards->topic_forward = stasis_forward_all(ast_endpoint_topic(endpoint),
+ app->topic);
+ forwards->topic_cached_forward = stasis_forward_all(
+ ast_endpoint_topic_cached(endpoint), app->topic);
+
+ if (!forwards->topic_forward || !forwards->topic_cached_forward) {
+ /* Half-subscribed is a bad thing */
+ forwards_unsubscribe(forwards);
+ ao2_ref(forwards, -1);
+ return NULL;
+ }
+ } else {
+ /* Since endpoint subscriptions also subscribe to channels, in the case
+ * of all endpoint subscriptions, we only want messages for the endpoints.
+ * As such, we route those particular messages and then re-publish them
+ * on the app's topic.
+ */
+ ast_assert(app->endpoint_router == NULL);
+ app->endpoint_router = stasis_message_router_create(ast_endpoint_topic_all_cached());
+ if (!app->endpoint_router) {
+ forwards_unsubscribe(forwards);
+ ao2_ref(forwards, -1);
+ return NULL;
+ }
- forwards->topic_cached_forward = stasis_forward_all(
- ast_endpoint_topic_cached(endpoint), app->topic);
- if (!forwards->topic_cached_forward) {
- /* Half-subscribed is a bad thing */
- stasis_forward_cancel(forwards->topic_forward);
- forwards->topic_forward = NULL;
- return NULL;
+ ret |= stasis_message_router_add(app->endpoint_router,
+ ast_endpoint_state_type(), endpoint_state_cb, app);
+ ret |= stasis_message_router_add(app->endpoint_router,
+ ast_endpoint_contact_state_type(), endpoint_state_cb, app);
+
+ if (ret) {
+ ao2_ref(app->endpoint_router, -1);
+ app->endpoint_router = NULL;
+ ao2_ref(forwards, -1);
+ return NULL;
+ }
}
- ao2_ref(forwards, +1);
return forwards;
}
@@ -260,6 +299,7 @@ static void app_dtor(void *obj)
ast_assert(app->router == NULL);
ast_assert(app->bridge_router == NULL);
+ ast_assert(app->endpoint_router == NULL);
ao2_cleanup(app->topic);
app->topic = NULL;
@@ -793,7 +833,7 @@ static void bridge_default_handler(void *data, struct stasis_subscription *sub,
}
}
-struct stasis_app *app_create(const char *name, stasis_app_cb handler, void *data)
+struct stasis_app *app_create(const char *name, stasis_app_cb handler, void *data, enum stasis_app_subscription_model subscription_model)
{
RAII_VAR(struct stasis_app *, app, NULL, ao2_cleanup);
size_t size;
@@ -806,10 +846,10 @@ struct stasis_app *app_create(const char *name, stasis_app_cb handler, void *dat
size = sizeof(*app) + strlen(name) + 1;
app = ao2_alloc_options(size, app_dtor, AO2_ALLOC_OPT_LOCK_MUTEX);
-
if (!app) {
return NULL;
}
+ app->subscription_model = subscription_model;
app->forwards = ao2_container_alloc_rbtree(AO2_ALLOC_OPT_LOCK_MUTEX,
AO2_CONTAINER_ALLOC_OPT_DUPS_OBJ_REJECT,
@@ -877,7 +917,8 @@ struct stasis_app *app_create(const char *name, stasis_app_cb handler, void *dat
return app;
}
-struct stasis_topic *ast_app_get_topic(struct stasis_app *app) {
+struct stasis_topic *ast_app_get_topic(struct stasis_app *app)
+{
return app->topic;
}
@@ -930,6 +971,8 @@ void app_shutdown(struct stasis_app *app)
app->router = NULL;
stasis_message_router_unsubscribe(app->bridge_router);
app->bridge_router = NULL;
+ stasis_message_router_unsubscribe(app->endpoint_router);
+ app->endpoint_router = NULL;
}
int app_is_active(struct stasis_app *app)
@@ -1029,34 +1072,47 @@ struct ast_json *app_to_json(const struct stasis_app *app)
int app_subscribe_channel(struct stasis_app *app, struct ast_channel *chan)
{
+ struct app_forwards *forwards;
+ SCOPED_AO2LOCK(lock, app->forwards);
int res;
- if (!app || !chan) {
+ if (!app) {
return -1;
- } else {
- RAII_VAR(struct app_forwards *, forwards, NULL, ao2_cleanup);
- SCOPED_AO2LOCK(lock, app->forwards);
+ }
- forwards = ao2_find(app->forwards, ast_channel_uniqueid(chan),
- OBJ_SEARCH_KEY | OBJ_NOLOCK);
- if (!forwards) {
- /* Forwards not found, create one */
- forwards = forwards_create_channel(app, chan);
- if (!forwards) {
- return -1;
- }
+ /* If subscribed to all, don't subscribe again */
+ forwards = ao2_find(app->forwards, CHANNEL_ALL, OBJ_SEARCH_KEY | OBJ_NOLOCK);
+ if (forwards) {
+ ao2_ref(forwards, -1);
+ return 0;
+ }
- res = ao2_link_flags(app->forwards, forwards,
- OBJ_NOLOCK);
- if (!res) {
- return -1;
- }
+ forwards = ao2_find(app->forwards,
+ chan ? ast_channel_uniqueid(chan) : CHANNEL_ALL,
+ OBJ_SEARCH_KEY | OBJ_NOLOCK);
+ if (!forwards) {
+ /* Forwards not found, create one */
+ forwards = forwards_create_channel(app, chan);
+ if (!forwards) {
+ return -1;
}
- ++forwards->interested;
- ast_debug(3, "Channel '%s' is %d interested in %s\n", ast_channel_uniqueid(chan), forwards->interested, app->name);
- return 0;
+ res = ao2_link_flags(app->forwards, forwards,
+ OBJ_NOLOCK);
+ if (!res) {
+ ao2_ref(forwards, -1);
+ return -1;
+ }
}
+
+ ++forwards->interested;
+ ast_debug(3, "Channel '%s' is %d interested in %s\n",
+ chan ? ast_channel_uniqueid(chan) : "ALL",
+ forwards->interested,
+ app->name);
+
+ ao2_ref(forwards, -1);
+ return 0;
}
static int subscribe_channel(struct stasis_app *app, void *obj)
@@ -1069,6 +1125,19 @@ static int unsubscribe(struct stasis_app *app, const char *kind, const char *id,
RAII_VAR(struct app_forwards *, forwards, NULL, ao2_cleanup);
SCOPED_AO2LOCK(lock, app->forwards);
+ if (!id) {
+ if (!strcmp(kind, "bridge")) {
+ id = BRIDGE_ALL;
+ } else if (!strcmp(kind, "channel")) {
+ id = CHANNEL_ALL;
+ } else if (!strcmp(kind, "endpoint")) {
+ id = ENDPOINT_ALL;
+ } else {
+ ast_log(LOG_WARNING, "Unknown subscription kind '%s'\n", kind);
+ return -1;
+ }
+ }
+
forwards = ao2_find(app->forwards, id, OBJ_SEARCH_KEY | OBJ_NOLOCK);
if (!forwards) {
ast_debug(3, "App '%s' not subscribed to %s '%s'\n", app->name, kind, id);
@@ -1095,16 +1164,16 @@ static int unsubscribe(struct stasis_app *app, const char *kind, const char *id,
int app_unsubscribe_channel(struct stasis_app *app, struct ast_channel *chan)
{
- if (!app || !chan) {
+ if (!app) {
return -1;
}
- return app_unsubscribe_channel_id(app, ast_channel_uniqueid(chan));
+ return app_unsubscribe_channel_id(app, chan ? ast_channel_uniqueid(chan) : CHANNEL_ALL);
}
int app_unsubscribe_channel_id(struct stasis_app *app, const char *channel_id)
{
- if (!app || !channel_id) {
+ if (!app) {
return -1;
}
@@ -1114,6 +1183,10 @@ int app_unsubscribe_channel_id(struct stasis_app *app, const char *channel_id)
int app_is_subscribed_channel_id(struct stasis_app *app, const char *channel_id)
{
RAII_VAR(struct app_forwards *, forwards, NULL, ao2_cleanup);
+
+ if (ast_strlen_zero(channel_id)) {
+ channel_id = CHANNEL_ALL;
+ }
forwards = ao2_find(app->forwards, channel_id, OBJ_SEARCH_KEY);
return forwards != NULL;
}
@@ -1133,28 +1206,39 @@ struct stasis_app_event_source channel_event_source = {
int app_subscribe_bridge(struct stasis_app *app, struct ast_bridge *bridge)
{
- if (!app || !bridge) {
+ struct app_forwards *forwards;
+ SCOPED_AO2LOCK(lock, app->forwards);
+
+ if (!app) {
return -1;
- } else {
- RAII_VAR(struct app_forwards *, forwards, NULL, ao2_cleanup);
- SCOPED_AO2LOCK(lock, app->forwards);
+ }
- forwards = ao2_find(app->forwards, bridge->uniqueid,
- OBJ_SEARCH_KEY | OBJ_NOLOCK);
+ /* If subscribed to all, don't subscribe again */
+ forwards = ao2_find(app->forwards, BRIDGE_ALL, OBJ_SEARCH_KEY | OBJ_NOLOCK);
+ if (forwards) {
+ ao2_ref(forwards, -1);
+ return 0;
+ }
+ forwards = ao2_find(app->forwards, bridge ? bridge->uniqueid : BRIDGE_ALL,
+ OBJ_SEARCH_KEY | OBJ_NOLOCK);
+ if (!forwards) {
+ /* Forwards not found, create one */
+ forwards = forwards_create_bridge(app, bridge);
if (!forwards) {
- /* Forwards not found, create one */
- forwards = forwards_create_bridge(app, bridge);
- if (!forwards) {
- return -1;
- }
- ao2_link_flags(app->forwards, forwards, OBJ_NOLOCK);
+ return -1;
}
-
- ++forwards->interested;
- ast_debug(3, "Bridge '%s' is %d interested in %s\n", bridge->uniqueid, forwards->interested, app->name);
- return 0;
+ ao2_link_flags(app->forwards, forwards, OBJ_NOLOCK);
}
+
+ ++forwards->interested;
+ ast_debug(3, "Bridge '%s' is %d interested in %s\n",
+ bridge ? bridge->uniqueid : "ALL",
+ forwards->interested,
+ app->name);
+
+ ao2_ref(forwards, -1);
+ return 0;
}
static int subscribe_bridge(struct stasis_app *app, void *obj)
@@ -1164,16 +1248,16 @@ static int subscribe_bridge(struct stasis_app *app, void *obj)
int app_unsubscribe_bridge(struct stasis_app *app, struct ast_bridge *bridge)
{
- if (!app || !bridge) {
+ if (!app) {
return -1;
}
- return app_unsubscribe_bridge_id(app, bridge->uniqueid);
+ return app_unsubscribe_bridge_id(app, bridge ? bridge->uniqueid : BRIDGE_ALL);
}
int app_unsubscribe_bridge_id(struct stasis_app *app, const char *bridge_id)
{
- if (!app || !bridge_id) {
+ if (!app) {
return -1;
}
@@ -1182,9 +1266,26 @@ int app_unsubscribe_bridge_id(struct stasis_app *app, const char *bridge_id)
int app_is_subscribed_bridge_id(struct stasis_app *app, const char *bridge_id)
{
- RAII_VAR(struct app_forwards *, forwards, NULL, ao2_cleanup);
- forwards = ao2_find(app->forwards, bridge_id, OBJ_SEARCH_KEY);
- return forwards != NULL;
+ struct app_forwards *forwards;
+ SCOPED_AO2LOCK(lock, app->forwards);
+
+ forwards = ao2_find(app->forwards, BRIDGE_ALL, OBJ_SEARCH_KEY | OBJ_NOLOCK);
+ if (forwards) {
+ ao2_ref(forwards, -1);
+ return 1;
+ }
+
+ if (ast_strlen_zero(bridge_id)) {
+ bridge_id = BRIDGE_ALL;
+ }
+
+ forwards = ao2_find(app->forwards, bridge_id, OBJ_SEARCH_KEY | OBJ_NOLOCK);
+ if (forwards) {
+ ao2_ref(forwards, -1);
+ return 1;
+ }
+
+ return 0;
}
static void *bridge_find(const struct stasis_app *app, const char *id)
@@ -1202,31 +1303,43 @@ struct stasis_app_event_source bridge_event_source = {
int app_subscribe_endpoint(struct stasis_app *app, struct ast_endpoint *endpoint)
{
- if (!app || !endpoint) {
+ struct app_forwards *forwards;
+ SCOPED_AO2LOCK(lock, app->forwards);
+
+ if (!app) {
return -1;
- } else {
- RAII_VAR(struct app_forwards *, forwards, NULL, ao2_cleanup);
- SCOPED_AO2LOCK(lock, app->forwards);
+ }
- forwards = ao2_find(app->forwards, ast_endpoint_get_id(endpoint),
- OBJ_SEARCH_KEY | OBJ_NOLOCK);
+ /* If subscribed to all, don't subscribe again */
+ forwards = ao2_find(app->forwards, ENDPOINT_ALL, OBJ_SEARCH_KEY | OBJ_NOLOCK);
+ if (forwards) {
+ ao2_ref(forwards, -1);
+ return 0;
+ }
+ forwards = ao2_find(app->forwards,
+ endpoint ? ast_endpoint_get_id(endpoint) : ENDPOINT_ALL,
+ OBJ_SEARCH_KEY | OBJ_NOLOCK);
+ if (!forwards) {
+ /* Forwards not found, create one */
+ forwards = forwards_create_endpoint(app, endpoint);
if (!forwards) {
- /* Forwards not found, create one */
- forwards = forwards_create_endpoint(app, endpoint);
- if (!forwards) {
- return -1;
- }
- ao2_link_flags(app->forwards, forwards, OBJ_NOLOCK);
-
- /* Subscribe for messages */
- messaging_app_subscribe_endpoint(app->name, endpoint, &message_received_handler, app);
+ return -1;
}
+ ao2_link_flags(app->forwards, forwards, OBJ_NOLOCK);
- ++forwards->interested;
- ast_debug(3, "Endpoint '%s' is %d interested in %s\n", ast_endpoint_get_id(endpoint), forwards->interested, app->name);
- return 0;
+ /* Subscribe for messages */
+ messaging_app_subscribe_endpoint(app->name, endpoint, &message_received_handler, app);
}
+
+ ++forwards->interested;
+ ast_debug(3, "Endpoint '%s' is %d interested in %s\n",
+ endpoint ? ast_endpoint_get_id(endpoint) : "ALL",
+ forwards->interested,
+ app->name);
+
+ ao2_ref(forwards, -1);
+ return 0;
}
static int subscribe_endpoint(struct stasis_app *app, void *obj)
@@ -1236,7 +1349,7 @@ static int subscribe_endpoint(struct stasis_app *app, void *obj)
int app_unsubscribe_endpoint_id(struct stasis_app *app, const char *endpoint_id)
{
- if (!app || !endpoint_id) {
+ if (!app) {
return -1;
}
@@ -1246,6 +1359,10 @@ int app_unsubscribe_endpoint_id(struct stasis_app *app, const char *endpoint_id)
int app_is_subscribed_endpoint_id(struct stasis_app *app, const char *endpoint_id)
{
RAII_VAR(struct app_forwards *, forwards, NULL, ao2_cleanup);
+
+ if (ast_strlen_zero(endpoint_id)) {
+ endpoint_id = ENDPOINT_ALL;
+ }
forwards = ao2_find(app->forwards, endpoint_id, OBJ_SEARCH_KEY);
return forwards != NULL;
}
diff --git a/res/stasis/app.h b/res/stasis/app.h
index 59574f584..2c8db1ccd 100644
--- a/res/stasis/app.h
+++ b/res/stasis/app.h
@@ -36,6 +36,19 @@
*/
struct stasis_app;
+enum stasis_app_subscription_model {
+ /*
+ * \brief An application must manually subscribe to each
+ * resource that it cares about. This is the default approach.
+ */
+ STASIS_APP_SUBSCRIBE_MANUAL,
+ /*
+ * \brief An application is automatically subscribed to all
+ * resources in Asterisk, even if it does not control them.
+ */
+ STASIS_APP_SUBSCRIBE_ALL
+};
+
/*!
* \brief Create a res_stasis application.
*
@@ -45,7 +58,7 @@ struct stasis_app;
* \return New \c res_stasis application.
* \return \c NULL on error.
*/
-struct stasis_app *app_create(const char *name, stasis_app_cb handler, void *data);
+struct stasis_app *app_create(const char *name, stasis_app_cb handler, void *data, enum stasis_app_subscription_model subscription_model);
/*!
* \brief Tears down an application.
diff --git a/res/stasis/messaging.c b/res/stasis/messaging.c
index fd7cf9f7b..229a3a646 100644
--- a/res/stasis/messaging.c
+++ b/res/stasis/messaging.c
@@ -38,6 +38,11 @@ ASTERISK_FILE_VERSION(__FILE__, "$Revision$")
#include "messaging.h"
/*!
+ * \brief Subscription to all technologies
+ */
+#define TECH_WILDCARD "__AST_ALL_TECH"
+
+/*!
* \brief Number of buckets for the \ref endpoint_subscriptions container
*/
#define ENDPOINTS_NUM_BUCKETS 127
@@ -219,10 +224,14 @@ static int has_destination_cb(const struct ast_msg *msg)
for (i = 0; i < AST_VECTOR_SIZE(&tech_subscriptions); i++) {
sub = AST_VECTOR_GET(&tech_subscriptions, i);
- if (sub && (!strncasecmp(sub->token, buf, strlen(sub->token))
- || !strncasecmp(sub->token, buf, strlen(sub->token)))) {
+ if (!sub) {
+ continue;
+ }
+
+ if (!strcmp(sub->token, TECH_WILDCARD)
+ || !strncasecmp(sub->token, buf, strlen(sub->token))
+ || !strncasecmp(sub->token, buf, strlen(sub->token))) {
ast_rwlock_unlock(&tech_subscriptions_lock);
- sub = NULL; /* No ref bump! */
goto match;
}
@@ -231,6 +240,7 @@ static int has_destination_cb(const struct ast_msg *msg)
sub = ao2_find(endpoint_subscriptions, buf, OBJ_SEARCH_KEY);
if (sub) {
+ ao2_ref(sub, -1);
goto match;
}
@@ -238,7 +248,6 @@ static int has_destination_cb(const struct ast_msg *msg)
return 0;
match:
- ao2_cleanup(sub);
return 1;
}
@@ -301,7 +310,8 @@ static int handle_msg_cb(struct ast_msg *msg)
continue;
}
- if (!strncasecmp(sub->token, buf, strlen(sub->token))) {
+ if (!strcmp(sub->token, TECH_WILDCARD)
+ || !strncasecmp(sub->token, buf, strlen(sub->token))) {
ast_rwlock_unlock(&tech_subscriptions_lock);
ao2_bump(sub);
endpoint_name = buf;
@@ -374,7 +384,7 @@ static struct message_subscription *get_subscription(struct ast_endpoint *endpoi
{
struct message_subscription *sub = NULL;
- if (!ast_strlen_zero(ast_endpoint_get_resource(endpoint))) {
+ if (endpoint && !ast_strlen_zero(ast_endpoint_get_resource(endpoint))) {
sub = ao2_find(endpoint_subscriptions, endpoint, OBJ_SEARCH_KEY);
} else {
int i;
@@ -383,7 +393,7 @@ static struct message_subscription *get_subscription(struct ast_endpoint *endpoi
for (i = 0; i < AST_VECTOR_SIZE(&tech_subscriptions); i++) {
sub = AST_VECTOR_GET(&tech_subscriptions, i);
- if (sub && !strcmp(sub->token, ast_endpoint_get_tech(endpoint))) {
+ if (sub && !strcmp(sub->token, endpoint ? ast_endpoint_get_tech(endpoint) : TECH_WILDCARD)) {
ao2_bump(sub);
break;
}
@@ -400,10 +410,6 @@ void messaging_app_unsubscribe_endpoint(const char *app_name, const char *endpoi
RAII_VAR(struct ast_endpoint *, endpoint, NULL, ao2_cleanup);
endpoint = ast_endpoint_find_by_id(endpoint_id);
- if (!endpoint) {
- return;
- }
-
sub = get_subscription(endpoint);
if (!sub) {
return;
@@ -417,11 +423,11 @@ void messaging_app_unsubscribe_endpoint(const char *app_name, const char *endpoi
AST_VECTOR_REMOVE_CMP_UNORDERED(&sub->applications, app_name, application_tuple_cmp, ao2_cleanup);
if (AST_VECTOR_SIZE(&sub->applications) == 0) {
- if (!ast_strlen_zero(ast_endpoint_get_resource(endpoint))) {
+ if (endpoint && !ast_strlen_zero(ast_endpoint_get_resource(endpoint))) {
ao2_unlink(endpoint_subscriptions, sub);
} else {
ast_rwlock_wrlock(&tech_subscriptions_lock);
- AST_VECTOR_REMOVE_CMP_UNORDERED(&tech_subscriptions, ast_endpoint_get_id(endpoint),
+ AST_VECTOR_REMOVE_CMP_UNORDERED(&tech_subscriptions, endpoint ? ast_endpoint_get_id(endpoint) : TECH_WILDCARD,
messaging_subscription_cmp, AST_VECTOR_ELEM_CLEANUP_NOOP);
ast_rwlock_unlock(&tech_subscriptions_lock);
}
@@ -429,9 +435,9 @@ void messaging_app_unsubscribe_endpoint(const char *app_name, const char *endpoi
ao2_unlock(sub);
ao2_ref(sub, -1);
- ast_debug(3, "App '%s' unsubscribed to messages from endpoint '%s'\n", app_name, ast_endpoint_get_id(endpoint));
+ ast_debug(3, "App '%s' unsubscribed to messages from endpoint '%s'\n", app_name, endpoint ? ast_endpoint_get_id(endpoint) : "-- ALL --");
ast_test_suite_event_notify("StasisMessagingSubscription", "SubState: Unsubscribed\r\nAppName: %s\r\nToken: %s\r\n",
- app_name, ast_endpoint_get_id(endpoint));
+ app_name, endpoint ? ast_endpoint_get_id(endpoint) : "ALL");
}
static struct message_subscription *get_or_create_subscription(struct ast_endpoint *endpoint)
@@ -442,12 +448,12 @@ static struct message_subscription *get_or_create_subscription(struct ast_endpoi
return sub;
}
- sub = message_subscription_alloc(ast_endpoint_get_id(endpoint));
+ sub = message_subscription_alloc(endpoint ? ast_endpoint_get_id(endpoint) : TECH_WILDCARD);
if (!sub) {
return NULL;
}
- if (!ast_strlen_zero(ast_endpoint_get_resource(endpoint))) {
+ if (endpoint && !ast_strlen_zero(ast_endpoint_get_resource(endpoint))) {
ao2_link(endpoint_subscriptions, sub);
} else {
ast_rwlock_wrlock(&tech_subscriptions_lock);
@@ -482,9 +488,9 @@ int messaging_app_subscribe_endpoint(const char *app_name, struct ast_endpoint *
AST_VECTOR_APPEND(&sub->applications, tuple);
ao2_unlock(sub);
- ast_debug(3, "App '%s' subscribed to messages from endpoint '%s'\n", app_name, ast_endpoint_get_id(endpoint));
+ ast_debug(3, "App '%s' subscribed to messages from endpoint '%s'\n", app_name, endpoint ? ast_endpoint_get_id(endpoint) : "-- ALL --");
ast_test_suite_event_notify("StasisMessagingSubscription", "SubState: Subscribed\r\nAppName: %s\r\nToken: %s\r\n",
- app_name, ast_endpoint_get_id(endpoint));
+ app_name, endpoint ? ast_endpoint_get_id(endpoint) : "ALL");
return 0;
}
diff --git a/rest-api/api-docs/events.json b/rest-api/api-docs/events.json
index 8d74900f2..6276fc224 100644
--- a/rest-api/api-docs/events.json
+++ b/rest-api/api-docs/events.json
@@ -26,6 +26,14 @@
"required": true,
"allowMultiple": true,
"dataType": "string"
+ },
+ {
+ "name": "subscribeAll",
+ "description": "Subscribe to all Asterisk events. If provided, the applications listed will be subscribed to all events, effectively disabling the application specific subscriptions. Default is 'false'.",
+ "paramType": "query",
+ "required": false,
+ "allowMultiple": false,
+ "dataType": "boolean"
}
]
}