summaryrefslogtreecommitdiff
path: root/res/stasis/app.c
diff options
context:
space:
mode:
Diffstat (limited to 'res/stasis/app.c')
-rw-r--r--res/stasis/app.c335
1 files changed, 226 insertions, 109 deletions
diff --git a/res/stasis/app.c b/res/stasis/app.c
index caa27abfc..353918241 100644
--- a/res/stasis/app.c
+++ b/res/stasis/app.c
@@ -38,6 +38,10 @@ ASTERISK_FILE_VERSION(__FILE__, "$Revision$")
#include "asterisk/stasis_endpoints.h"
#include "asterisk/stasis_message_router.h"
+#define BRIDGE_ALL "__AST_BRIDGE_ALL_TOPIC"
+#define CHANNEL_ALL "__AST_CHANNEL_ALL_TOPIC"
+#define ENDPOINT_ALL "__AST_ENDPOINT_ALL_TOPIC"
+
static int unsubscribe(struct stasis_app *app, const char *kind, const char *id, int terminate);
struct stasis_app {
@@ -47,12 +51,16 @@ struct stasis_app {
struct stasis_message_router *router;
/*! Router for handling messages to the bridge all \a topic. */
struct stasis_message_router *bridge_router;
+ /*! Optional router for handling endpoint messages in 'all' subscriptions */
+ struct stasis_message_router *endpoint_router;
/*! Container of the channel forwards to this app's topic. */
struct ao2_container *forwards;
/*! Callback function for this application. */
stasis_app_cb handler;
/*! Opaque data to hand to callback function. */
void *data;
+ /*! Subscription model for the application */
+ enum stasis_app_subscription_model subscription_model;
/*! Name of the Stasis application */
char name[];
};
@@ -121,34 +129,33 @@ static struct app_forwards *forwards_create(struct stasis_app *app,
static struct app_forwards *forwards_create_channel(struct stasis_app *app,
struct ast_channel *chan)
{
- RAII_VAR(struct app_forwards *, forwards, NULL, ao2_cleanup);
+ struct app_forwards *forwards;
- if (!app || !chan) {
+ if (!app) {
return NULL;
}
- forwards = forwards_create(app, ast_channel_uniqueid(chan));
+ forwards = forwards_create(app, chan ? ast_channel_uniqueid(chan) : CHANNEL_ALL);
if (!forwards) {
return NULL;
}
forwards->forward_type = FORWARD_CHANNEL;
- forwards->topic_forward = stasis_forward_all(ast_channel_topic(chan),
- app->topic);
- if (!forwards->topic_forward) {
- return NULL;
+ if (chan) {
+ forwards->topic_forward = stasis_forward_all(ast_channel_topic(chan),
+ app->topic);
}
-
forwards->topic_cached_forward = stasis_forward_all(
- ast_channel_topic_cached(chan), app->topic);
- if (!forwards->topic_cached_forward) {
+ chan ? ast_channel_topic_cached(chan) : ast_channel_topic_all_cached(),
+ app->topic);
+
+ if ((!forwards->topic_forward && chan) || !forwards->topic_cached_forward) {
/* Half-subscribed is a bad thing */
- stasis_forward_cancel(forwards->topic_forward);
- forwards->topic_forward = NULL;
+ forwards_unsubscribe(forwards);
+ ao2_ref(forwards, -1);
return NULL;
}
- ao2_ref(forwards, +1);
return forwards;
}
@@ -156,69 +163,101 @@ static struct app_forwards *forwards_create_channel(struct stasis_app *app,
static struct app_forwards *forwards_create_bridge(struct stasis_app *app,
struct ast_bridge *bridge)
{
- RAII_VAR(struct app_forwards *, forwards, NULL, ao2_cleanup);
+ struct app_forwards *forwards;
- if (!app || !bridge) {
+ if (!app) {
return NULL;
}
- forwards = forwards_create(app, bridge->uniqueid);
+ forwards = forwards_create(app, bridge ? bridge->uniqueid : BRIDGE_ALL);
if (!forwards) {
return NULL;
}
forwards->forward_type = FORWARD_BRIDGE;
- forwards->topic_forward = stasis_forward_all(ast_bridge_topic(bridge),
- app->topic);
- if (!forwards->topic_forward) {
- return NULL;
+ if (bridge) {
+ forwards->topic_forward = stasis_forward_all(ast_bridge_topic(bridge),
+ app->topic);
}
-
forwards->topic_cached_forward = stasis_forward_all(
- ast_bridge_topic_cached(bridge), app->topic);
- if (!forwards->topic_cached_forward) {
+ bridge ? ast_bridge_topic_cached(bridge) : ast_bridge_topic_all_cached(),
+ app->topic);
+
+ if ((!forwards->topic_forward && bridge) || !forwards->topic_cached_forward) {
/* Half-subscribed is a bad thing */
- stasis_forward_cancel(forwards->topic_forward);
- forwards->topic_forward = NULL;
+ forwards_unsubscribe(forwards);
+ ao2_ref(forwards, -1);
return NULL;
}
- ao2_ref(forwards, +1);
return forwards;
}
+static void endpoint_state_cb(void *data, struct stasis_subscription *sub,
+ struct stasis_message *message)
+{
+ struct stasis_app *app = data;
+
+ stasis_publish(app->topic, message);
+}
+
/*! Forward a endpoint's topics to an app */
static struct app_forwards *forwards_create_endpoint(struct stasis_app *app,
struct ast_endpoint *endpoint)
{
- RAII_VAR(struct app_forwards *, forwards, NULL, ao2_cleanup);
- if (!app || !endpoint) {
+ struct app_forwards *forwards;
+ int ret = 0;
+
+ if (!app) {
return NULL;
}
- forwards = forwards_create(app, ast_endpoint_get_id(endpoint));
+ forwards = forwards_create(app, endpoint ? ast_endpoint_get_id(endpoint) : ENDPOINT_ALL);
if (!forwards) {
return NULL;
}
forwards->forward_type = FORWARD_ENDPOINT;
- forwards->topic_forward = stasis_forward_all(ast_endpoint_topic(endpoint),
- app->topic);
- if (!forwards->topic_forward) {
- return NULL;
- }
+ if (endpoint) {
+ forwards->topic_forward = stasis_forward_all(ast_endpoint_topic(endpoint),
+ app->topic);
+ forwards->topic_cached_forward = stasis_forward_all(
+ ast_endpoint_topic_cached(endpoint), app->topic);
+
+ if (!forwards->topic_forward || !forwards->topic_cached_forward) {
+ /* Half-subscribed is a bad thing */
+ forwards_unsubscribe(forwards);
+ ao2_ref(forwards, -1);
+ return NULL;
+ }
+ } else {
+ /* Since endpoint subscriptions also subscribe to channels, in the case
+ * of all endpoint subscriptions, we only want messages for the endpoints.
+ * As such, we route those particular messages and then re-publish them
+ * on the app's topic.
+ */
+ ast_assert(app->endpoint_router == NULL);
+ app->endpoint_router = stasis_message_router_create(ast_endpoint_topic_all_cached());
+ if (!app->endpoint_router) {
+ forwards_unsubscribe(forwards);
+ ao2_ref(forwards, -1);
+ return NULL;
+ }
- forwards->topic_cached_forward = stasis_forward_all(
- ast_endpoint_topic_cached(endpoint), app->topic);
- if (!forwards->topic_cached_forward) {
- /* Half-subscribed is a bad thing */
- stasis_forward_cancel(forwards->topic_forward);
- forwards->topic_forward = NULL;
- return NULL;
+ ret |= stasis_message_router_add(app->endpoint_router,
+ ast_endpoint_state_type(), endpoint_state_cb, app);
+ ret |= stasis_message_router_add(app->endpoint_router,
+ ast_endpoint_contact_state_type(), endpoint_state_cb, app);
+
+ if (ret) {
+ ao2_ref(app->endpoint_router, -1);
+ app->endpoint_router = NULL;
+ ao2_ref(forwards, -1);
+ return NULL;
+ }
}
- ao2_ref(forwards, +1);
return forwards;
}
@@ -260,6 +299,7 @@ static void app_dtor(void *obj)
ast_assert(app->router == NULL);
ast_assert(app->bridge_router == NULL);
+ ast_assert(app->endpoint_router == NULL);
ao2_cleanup(app->topic);
app->topic = NULL;
@@ -793,7 +833,7 @@ static void bridge_default_handler(void *data, struct stasis_subscription *sub,
}
}
-struct stasis_app *app_create(const char *name, stasis_app_cb handler, void *data)
+struct stasis_app *app_create(const char *name, stasis_app_cb handler, void *data, enum stasis_app_subscription_model subscription_model)
{
RAII_VAR(struct stasis_app *, app, NULL, ao2_cleanup);
size_t size;
@@ -806,10 +846,10 @@ struct stasis_app *app_create(const char *name, stasis_app_cb handler, void *dat
size = sizeof(*app) + strlen(name) + 1;
app = ao2_alloc_options(size, app_dtor, AO2_ALLOC_OPT_LOCK_MUTEX);
-
if (!app) {
return NULL;
}
+ app->subscription_model = subscription_model;
app->forwards = ao2_container_alloc_rbtree(AO2_ALLOC_OPT_LOCK_MUTEX,
AO2_CONTAINER_ALLOC_OPT_DUPS_OBJ_REJECT,
@@ -877,7 +917,8 @@ struct stasis_app *app_create(const char *name, stasis_app_cb handler, void *dat
return app;
}
-struct stasis_topic *ast_app_get_topic(struct stasis_app *app) {
+struct stasis_topic *ast_app_get_topic(struct stasis_app *app)
+{
return app->topic;
}
@@ -930,6 +971,8 @@ void app_shutdown(struct stasis_app *app)
app->router = NULL;
stasis_message_router_unsubscribe(app->bridge_router);
app->bridge_router = NULL;
+ stasis_message_router_unsubscribe(app->endpoint_router);
+ app->endpoint_router = NULL;
}
int app_is_active(struct stasis_app *app)
@@ -1029,34 +1072,47 @@ struct ast_json *app_to_json(const struct stasis_app *app)
int app_subscribe_channel(struct stasis_app *app, struct ast_channel *chan)
{
+ struct app_forwards *forwards;
+ SCOPED_AO2LOCK(lock, app->forwards);
int res;
- if (!app || !chan) {
+ if (!app) {
return -1;
- } else {
- RAII_VAR(struct app_forwards *, forwards, NULL, ao2_cleanup);
- SCOPED_AO2LOCK(lock, app->forwards);
+ }
- forwards = ao2_find(app->forwards, ast_channel_uniqueid(chan),
- OBJ_SEARCH_KEY | OBJ_NOLOCK);
- if (!forwards) {
- /* Forwards not found, create one */
- forwards = forwards_create_channel(app, chan);
- if (!forwards) {
- return -1;
- }
+ /* If subscribed to all, don't subscribe again */
+ forwards = ao2_find(app->forwards, CHANNEL_ALL, OBJ_SEARCH_KEY | OBJ_NOLOCK);
+ if (forwards) {
+ ao2_ref(forwards, -1);
+ return 0;
+ }
- res = ao2_link_flags(app->forwards, forwards,
- OBJ_NOLOCK);
- if (!res) {
- return -1;
- }
+ forwards = ao2_find(app->forwards,
+ chan ? ast_channel_uniqueid(chan) : CHANNEL_ALL,
+ OBJ_SEARCH_KEY | OBJ_NOLOCK);
+ if (!forwards) {
+ /* Forwards not found, create one */
+ forwards = forwards_create_channel(app, chan);
+ if (!forwards) {
+ return -1;
}
- ++forwards->interested;
- ast_debug(3, "Channel '%s' is %d interested in %s\n", ast_channel_uniqueid(chan), forwards->interested, app->name);
- return 0;
+ res = ao2_link_flags(app->forwards, forwards,
+ OBJ_NOLOCK);
+ if (!res) {
+ ao2_ref(forwards, -1);
+ return -1;
+ }
}
+
+ ++forwards->interested;
+ ast_debug(3, "Channel '%s' is %d interested in %s\n",
+ chan ? ast_channel_uniqueid(chan) : "ALL",
+ forwards->interested,
+ app->name);
+
+ ao2_ref(forwards, -1);
+ return 0;
}
static int subscribe_channel(struct stasis_app *app, void *obj)
@@ -1069,6 +1125,19 @@ static int unsubscribe(struct stasis_app *app, const char *kind, const char *id,
RAII_VAR(struct app_forwards *, forwards, NULL, ao2_cleanup);
SCOPED_AO2LOCK(lock, app->forwards);
+ if (!id) {
+ if (!strcmp(kind, "bridge")) {
+ id = BRIDGE_ALL;
+ } else if (!strcmp(kind, "channel")) {
+ id = CHANNEL_ALL;
+ } else if (!strcmp(kind, "endpoint")) {
+ id = ENDPOINT_ALL;
+ } else {
+ ast_log(LOG_WARNING, "Unknown subscription kind '%s'\n", kind);
+ return -1;
+ }
+ }
+
forwards = ao2_find(app->forwards, id, OBJ_SEARCH_KEY | OBJ_NOLOCK);
if (!forwards) {
ast_debug(3, "App '%s' not subscribed to %s '%s'\n", app->name, kind, id);
@@ -1095,16 +1164,16 @@ static int unsubscribe(struct stasis_app *app, const char *kind, const char *id,
int app_unsubscribe_channel(struct stasis_app *app, struct ast_channel *chan)
{
- if (!app || !chan) {
+ if (!app) {
return -1;
}
- return app_unsubscribe_channel_id(app, ast_channel_uniqueid(chan));
+ return app_unsubscribe_channel_id(app, chan ? ast_channel_uniqueid(chan) : CHANNEL_ALL);
}
int app_unsubscribe_channel_id(struct stasis_app *app, const char *channel_id)
{
- if (!app || !channel_id) {
+ if (!app) {
return -1;
}
@@ -1114,6 +1183,10 @@ int app_unsubscribe_channel_id(struct stasis_app *app, const char *channel_id)
int app_is_subscribed_channel_id(struct stasis_app *app, const char *channel_id)
{
RAII_VAR(struct app_forwards *, forwards, NULL, ao2_cleanup);
+
+ if (ast_strlen_zero(channel_id)) {
+ channel_id = CHANNEL_ALL;
+ }
forwards = ao2_find(app->forwards, channel_id, OBJ_SEARCH_KEY);
return forwards != NULL;
}
@@ -1133,28 +1206,39 @@ struct stasis_app_event_source channel_event_source = {
int app_subscribe_bridge(struct stasis_app *app, struct ast_bridge *bridge)
{
- if (!app || !bridge) {
+ struct app_forwards *forwards;
+ SCOPED_AO2LOCK(lock, app->forwards);
+
+ if (!app) {
return -1;
- } else {
- RAII_VAR(struct app_forwards *, forwards, NULL, ao2_cleanup);
- SCOPED_AO2LOCK(lock, app->forwards);
+ }
- forwards = ao2_find(app->forwards, bridge->uniqueid,
- OBJ_SEARCH_KEY | OBJ_NOLOCK);
+ /* If subscribed to all, don't subscribe again */
+ forwards = ao2_find(app->forwards, BRIDGE_ALL, OBJ_SEARCH_KEY | OBJ_NOLOCK);
+ if (forwards) {
+ ao2_ref(forwards, -1);
+ return 0;
+ }
+ forwards = ao2_find(app->forwards, bridge ? bridge->uniqueid : BRIDGE_ALL,
+ OBJ_SEARCH_KEY | OBJ_NOLOCK);
+ if (!forwards) {
+ /* Forwards not found, create one */
+ forwards = forwards_create_bridge(app, bridge);
if (!forwards) {
- /* Forwards not found, create one */
- forwards = forwards_create_bridge(app, bridge);
- if (!forwards) {
- return -1;
- }
- ao2_link_flags(app->forwards, forwards, OBJ_NOLOCK);
+ return -1;
}
-
- ++forwards->interested;
- ast_debug(3, "Bridge '%s' is %d interested in %s\n", bridge->uniqueid, forwards->interested, app->name);
- return 0;
+ ao2_link_flags(app->forwards, forwards, OBJ_NOLOCK);
}
+
+ ++forwards->interested;
+ ast_debug(3, "Bridge '%s' is %d interested in %s\n",
+ bridge ? bridge->uniqueid : "ALL",
+ forwards->interested,
+ app->name);
+
+ ao2_ref(forwards, -1);
+ return 0;
}
static int subscribe_bridge(struct stasis_app *app, void *obj)
@@ -1164,16 +1248,16 @@ static int subscribe_bridge(struct stasis_app *app, void *obj)
int app_unsubscribe_bridge(struct stasis_app *app, struct ast_bridge *bridge)
{
- if (!app || !bridge) {
+ if (!app) {
return -1;
}
- return app_unsubscribe_bridge_id(app, bridge->uniqueid);
+ return app_unsubscribe_bridge_id(app, bridge ? bridge->uniqueid : BRIDGE_ALL);
}
int app_unsubscribe_bridge_id(struct stasis_app *app, const char *bridge_id)
{
- if (!app || !bridge_id) {
+ if (!app) {
return -1;
}
@@ -1182,9 +1266,26 @@ int app_unsubscribe_bridge_id(struct stasis_app *app, const char *bridge_id)
int app_is_subscribed_bridge_id(struct stasis_app *app, const char *bridge_id)
{
- RAII_VAR(struct app_forwards *, forwards, NULL, ao2_cleanup);
- forwards = ao2_find(app->forwards, bridge_id, OBJ_SEARCH_KEY);
- return forwards != NULL;
+ struct app_forwards *forwards;
+ SCOPED_AO2LOCK(lock, app->forwards);
+
+ forwards = ao2_find(app->forwards, BRIDGE_ALL, OBJ_SEARCH_KEY | OBJ_NOLOCK);
+ if (forwards) {
+ ao2_ref(forwards, -1);
+ return 1;
+ }
+
+ if (ast_strlen_zero(bridge_id)) {
+ bridge_id = BRIDGE_ALL;
+ }
+
+ forwards = ao2_find(app->forwards, bridge_id, OBJ_SEARCH_KEY | OBJ_NOLOCK);
+ if (forwards) {
+ ao2_ref(forwards, -1);
+ return 1;
+ }
+
+ return 0;
}
static void *bridge_find(const struct stasis_app *app, const char *id)
@@ -1202,31 +1303,43 @@ struct stasis_app_event_source bridge_event_source = {
int app_subscribe_endpoint(struct stasis_app *app, struct ast_endpoint *endpoint)
{
- if (!app || !endpoint) {
+ struct app_forwards *forwards;
+ SCOPED_AO2LOCK(lock, app->forwards);
+
+ if (!app) {
return -1;
- } else {
- RAII_VAR(struct app_forwards *, forwards, NULL, ao2_cleanup);
- SCOPED_AO2LOCK(lock, app->forwards);
+ }
- forwards = ao2_find(app->forwards, ast_endpoint_get_id(endpoint),
- OBJ_SEARCH_KEY | OBJ_NOLOCK);
+ /* If subscribed to all, don't subscribe again */
+ forwards = ao2_find(app->forwards, ENDPOINT_ALL, OBJ_SEARCH_KEY | OBJ_NOLOCK);
+ if (forwards) {
+ ao2_ref(forwards, -1);
+ return 0;
+ }
+ forwards = ao2_find(app->forwards,
+ endpoint ? ast_endpoint_get_id(endpoint) : ENDPOINT_ALL,
+ OBJ_SEARCH_KEY | OBJ_NOLOCK);
+ if (!forwards) {
+ /* Forwards not found, create one */
+ forwards = forwards_create_endpoint(app, endpoint);
if (!forwards) {
- /* Forwards not found, create one */
- forwards = forwards_create_endpoint(app, endpoint);
- if (!forwards) {
- return -1;
- }
- ao2_link_flags(app->forwards, forwards, OBJ_NOLOCK);
-
- /* Subscribe for messages */
- messaging_app_subscribe_endpoint(app->name, endpoint, &message_received_handler, app);
+ return -1;
}
+ ao2_link_flags(app->forwards, forwards, OBJ_NOLOCK);
- ++forwards->interested;
- ast_debug(3, "Endpoint '%s' is %d interested in %s\n", ast_endpoint_get_id(endpoint), forwards->interested, app->name);
- return 0;
+ /* Subscribe for messages */
+ messaging_app_subscribe_endpoint(app->name, endpoint, &message_received_handler, app);
}
+
+ ++forwards->interested;
+ ast_debug(3, "Endpoint '%s' is %d interested in %s\n",
+ endpoint ? ast_endpoint_get_id(endpoint) : "ALL",
+ forwards->interested,
+ app->name);
+
+ ao2_ref(forwards, -1);
+ return 0;
}
static int subscribe_endpoint(struct stasis_app *app, void *obj)
@@ -1236,7 +1349,7 @@ static int subscribe_endpoint(struct stasis_app *app, void *obj)
int app_unsubscribe_endpoint_id(struct stasis_app *app, const char *endpoint_id)
{
- if (!app || !endpoint_id) {
+ if (!app) {
return -1;
}
@@ -1246,6 +1359,10 @@ int app_unsubscribe_endpoint_id(struct stasis_app *app, const char *endpoint_id)
int app_is_subscribed_endpoint_id(struct stasis_app *app, const char *endpoint_id)
{
RAII_VAR(struct app_forwards *, forwards, NULL, ao2_cleanup);
+
+ if (ast_strlen_zero(endpoint_id)) {
+ endpoint_id = ENDPOINT_ALL;
+ }
forwards = ao2_find(app->forwards, endpoint_id, OBJ_SEARCH_KEY);
return forwards != NULL;
}