summaryrefslogtreecommitdiff
path: root/res/stasis
diff options
context:
space:
mode:
authorDavid M. Lee <dlee@digium.com>2013-08-27 19:19:36 +0000
committerDavid M. Lee <dlee@digium.com>2013-08-27 19:19:36 +0000
commit451993f4f57331fae84a9c04b31fd5663ccfc593 (patch)
treeb9baca6011939f78f161e64e1f8382a6acaa2c82 /res/stasis
parent3540c7ac6eca33609c294baa1fbcf56722536f6a (diff)
ARI: WebSocket event cleanup
Stasis events (which get distributed over the ARI WebSocket) are created by subscribing to the channel_all_cached and bridge_all_cached topics, filtering out events for channels/bridges currently subscribed to. There are two issues with that. First was a race condition, where messages in-flight to the master subscribe-to-all-things topic would get sent out, even though the events happened before the channel was put into Stasis. Secondly, as the number of channels and bridges grow in the system, the work spent filtering messages becomes excessive. Since r395954, individual channels and bridges have caching topics, and can be subscribed to individually. This patch takes advantage, so that channels and bridges are subscribed to on demand, instead of filtering the global topics. The one case where filtering is still required is handling BridgeMerge messages, which are published directly to the bridge_all topic. Other than the change to how subscriptions work, this patch mostly just moves code around. Most of the work generating JSON objects from messages was moved to .to_json handlers on the message types. The callback functions handling app subscriptions were moved from res_stasis (b/c they were global to the model) to stasis/app.c (b/c they are local to the app now). (closes issue ASTERISK-21969) Reported by: Matt Jordan Review: https://reviewboard.asterisk.org/r/2754/ ........ Merged revisions 397816 from http://svn.asterisk.org/svn/asterisk/branches/12 git-svn-id: https://origsvn.digium.com/svn/asterisk/trunk@397820 65c4cc65-6c06-0410-ace0-fbb531ad65f3
Diffstat (limited to 'res/stasis')
-rw-r--r--res/stasis/app.c639
-rw-r--r--res/stasis/app.h89
2 files changed, 584 insertions, 144 deletions
diff --git a/res/stasis/app.c b/res/stasis/app.c
index 6f80ed64a..8abe0c19c 100644
--- a/res/stasis/app.c
+++ b/res/stasis/app.c
@@ -29,132 +29,519 @@ ASTERISK_FILE_VERSION(__FILE__, "$Revision$")
#include "app.h"
+#include "asterisk/callerid.h"
#include "asterisk/stasis_app.h"
+#include "asterisk/stasis_bridges.h"
#include "asterisk/stasis_channels.h"
-
-/*!
- * \brief Number of buckets for the channels container for app instances. Remember
- * to keep it a prime number!
- */
-#define APP_CHANNELS_BUCKETS 7
-
-/*!
- * \brief Number of buckets for the bridges container for app instances. Remember
- * to keep it a prime number!
- */
-#define APP_BRIDGES_BUCKETS 7
+#include "asterisk/stasis_message_router.h"
struct app {
+ /*! Aggregation topic for this application. */
+ struct stasis_topic *topic;
+ /*! Router for handling messages forwarded to \a topic. */
+ struct stasis_message_router *router;
+ /*! Subscription to watch for bridge merge messages */
+ struct stasis_subscription *bridge_merge_sub;
+ /*! Container of the channel forwards to this app's topic. */
+ struct ao2_container *forwards;
/*! Callback function for this application. */
stasis_app_cb handler;
/*! Opaque data to hand to callback function. */
void *data;
- /*! List of channel identifiers this app instance is interested in */
- struct ao2_container *channels;
- /*! List of bridge identifiers this app instance owns */
- struct ao2_container *bridges;
/*! Name of the Stasis application */
char name[];
};
+/*! Subscription info for a particular channel/bridge. */
+struct app_forwards {
+ /*! Count of number of times this channel/bridge has been subscribed */
+ int interested;
+
+ /*! Forward for the regular topic */
+ struct stasis_subscription *topic_forward;
+ /*! Forward for the caching topic */
+ struct stasis_subscription *topic_cached_forward;
+
+ /*! Unique id of the object being forwarded */
+ char id[];
+};
+
+static void forwards_dtor(void *obj)
+{
+ struct app_forwards *forwards = obj;
+
+ ast_assert(forwards->topic_forward == NULL);
+ ast_assert(forwards->topic_cached_forward == NULL);
+}
+
+static void forwards_unsubscribe(struct app_forwards *forwards)
+{
+ stasis_unsubscribe(forwards->topic_forward);
+ forwards->topic_forward = NULL;
+ stasis_unsubscribe(forwards->topic_cached_forward);
+ forwards->topic_cached_forward = NULL;
+}
+
+static struct app_forwards *forwards_create(struct app *app,
+ const char *id)
+{
+ RAII_VAR(struct app_forwards *, forwards, NULL, ao2_cleanup);
+
+ if (!app || ast_strlen_zero(id)) {
+ return NULL;
+ }
+
+ forwards = ao2_alloc(sizeof(*forwards) + strlen(id) + 1, forwards_dtor);
+ if (!forwards) {
+ return NULL;
+ }
+
+ strcpy(forwards->id, id);
+
+ ao2_ref(forwards, +1);
+ return forwards;
+}
+
+/*! Forward a channel's topics to an app */
+static struct app_forwards *forwards_create_channel(struct app *app,
+ struct ast_channel *chan)
+{
+ RAII_VAR(struct app_forwards *, forwards, NULL, ao2_cleanup);
+
+ if (!app || !chan) {
+ return NULL;
+ }
+
+ forwards = forwards_create(app, ast_channel_uniqueid(chan));
+ if (!forwards) {
+ return NULL;
+ }
+
+ forwards->topic_forward = stasis_forward_all(ast_channel_topic(chan),
+ app->topic);
+ if (!forwards->topic_forward) {
+ return NULL;
+ }
+
+ forwards->topic_cached_forward = stasis_forward_all(
+ ast_channel_topic_cached(chan), app->topic);
+ if (!forwards->topic_cached_forward) {
+ /* Half-subscribed is a bad thing */
+ stasis_unsubscribe(forwards->topic_forward);
+ forwards->topic_forward = NULL;
+ return NULL;
+ }
+
+ ao2_ref(forwards, +1);
+ return forwards;
+}
+
+/*! Forward a bridge's topics to an app */
+static struct app_forwards *forwards_create_bridge(struct app *app,
+ struct ast_bridge *bridge)
+{
+ RAII_VAR(struct app_forwards *, forwards, NULL, ao2_cleanup);
+
+ if (!app || !bridge) {
+ return NULL;
+ }
+
+ forwards = forwards_create(app, bridge->uniqueid);
+ if (!forwards) {
+ return NULL;
+ }
+
+ forwards->topic_forward = stasis_forward_all(ast_bridge_topic(bridge),
+ app->topic);
+ if (!forwards->topic_forward) {
+ return NULL;
+ }
+
+ forwards->topic_cached_forward = stasis_forward_all(
+ ast_bridge_topic_cached(bridge), app->topic);
+ if (!forwards->topic_cached_forward) {
+ /* Half-subscribed is a bad thing */
+ stasis_unsubscribe(forwards->topic_forward);
+ forwards->topic_forward = NULL;
+ return NULL;
+ }
+
+ ao2_ref(forwards, +1);
+ return forwards;
+}
+
+static int forwards_sort(const void *obj_left, const void *obj_right, int flags)
+{
+ const struct app_forwards *object_left = obj_left;
+ const struct app_forwards *object_right = obj_right;
+ const char *right_key = obj_right;
+ int cmp;
+
+ switch (flags & (OBJ_POINTER | OBJ_KEY | OBJ_PARTIAL_KEY)) {
+ case OBJ_POINTER:
+ right_key = object_right->id;
+ /* Fall through */
+ case OBJ_KEY:
+ cmp = strcmp(object_left->id, right_key);
+ break;
+ case OBJ_PARTIAL_KEY:
+ /*
+ * We could also use a partial key struct containing a length
+ * so strlen() does not get called for every comparison instead.
+ */
+ cmp = strncmp(object_left->id, right_key, strlen(right_key));
+ break;
+ default:
+ /* Sort can only work on something with a full or partial key. */
+ ast_assert(0);
+ cmp = 0;
+ break;
+ }
+ return cmp;
+}
+
static void app_dtor(void *obj)
{
struct app *app = obj;
+ ast_verb(1, "Destroying Stasis app %s\n", app->name);
+
+ ast_assert(app->router == NULL);
+ ast_assert(app->bridge_merge_sub == NULL);
+
+ ao2_cleanup(app->topic);
+ app->topic = NULL;
+ ao2_cleanup(app->forwards);
+ app->forwards = NULL;
ao2_cleanup(app->data);
app->data = NULL;
- ao2_cleanup(app->channels);
- app->channels = NULL;
- ao2_cleanup(app->bridges);
- app->bridges = NULL;
}
-struct app *app_create(const char *name, stasis_app_cb handler, void *data)
+static void sub_default_handler(void *data, struct stasis_subscription *sub,
+ struct stasis_topic *topic, struct stasis_message *message)
{
- RAII_VAR(struct app *, app, NULL, ao2_cleanup);
- size_t size;
+ struct app *app = data;
+ RAII_VAR(struct ast_json *, json, NULL, ast_json_unref);
- ast_assert(name != NULL);
- ast_assert(handler != NULL);
+ if (stasis_subscription_final_message(sub, message)) {
+ ao2_cleanup(app);
+ }
- ast_verb(1, "Creating Stasis app '%s'\n", name);
+ /* By default, send any message that has a JSON representation */
+ json = stasis_message_to_json(message);
+ if (!json) {
+ return;
+ }
- size = sizeof(*app) + strlen(name) + 1;
- app = ao2_alloc_options(size, app_dtor, AO2_ALLOC_OPT_LOCK_MUTEX);
+ app_send(app, json);
+}
- if (!app) {
- return NULL;
+/*! \brief Typedef for callbacks that get called on channel snapshot updates */
+typedef struct ast_json *(*channel_snapshot_monitor)(
+ struct ast_channel_snapshot *old_snapshot,
+ struct ast_channel_snapshot *new_snapshot,
+ const struct timeval *tv);
+
+static struct ast_json *simple_channel_event(
+ const char *type,
+ struct ast_channel_snapshot *snapshot,
+ const struct timeval *tv)
+{
+ return ast_json_pack("{s: s, s: o, s: o}",
+ "type", type,
+ "timestamp", ast_json_timeval(*tv, NULL),
+ "channel", ast_channel_snapshot_to_json(snapshot));
+}
+
+static struct ast_json *channel_created_event(
+ struct ast_channel_snapshot *snapshot,
+ const struct timeval *tv)
+{
+ return simple_channel_event("ChannelCreated", snapshot, tv);
+}
+
+static struct ast_json *channel_destroyed_event(
+ struct ast_channel_snapshot *snapshot,
+ const struct timeval *tv)
+{
+ return ast_json_pack("{s: s, s: o, s: i, s: s, s: o}",
+ "type", "ChannelDestroyed",
+ "timestamp", ast_json_timeval(*tv, NULL),
+ "cause", snapshot->hangupcause,
+ "cause_txt", ast_cause2str(snapshot->hangupcause),
+ "channel", ast_channel_snapshot_to_json(snapshot));
+}
+
+static struct ast_json *channel_state_change_event(
+ struct ast_channel_snapshot *snapshot,
+ const struct timeval *tv)
+{
+ return simple_channel_event("ChannelStateChange", snapshot, tv);
+}
+
+/*! \brief Handle channel state changes */
+static struct ast_json *channel_state(
+ struct ast_channel_snapshot *old_snapshot,
+ struct ast_channel_snapshot *new_snapshot,
+ const struct timeval *tv)
+{
+ struct ast_channel_snapshot *snapshot = new_snapshot ?
+ new_snapshot : old_snapshot;
+
+ if (!old_snapshot) {
+ return channel_created_event(snapshot, tv);
+ } else if (!new_snapshot) {
+ return channel_destroyed_event(snapshot, tv);
+ } else if (old_snapshot->state != new_snapshot->state) {
+ return channel_state_change_event(snapshot, tv);
}
- strncpy(app->name, name, size - sizeof(*app));
- app->handler = handler;
- ao2_ref(data, +1);
- app->data = data;
+ return NULL;
+}
- app->channels = ast_str_container_alloc(APP_CHANNELS_BUCKETS);
- if (!app->channels) {
+static struct ast_json *channel_dialplan(
+ struct ast_channel_snapshot *old_snapshot,
+ struct ast_channel_snapshot *new_snapshot,
+ const struct timeval *tv)
+{
+ RAII_VAR(struct ast_json *, json, NULL, ast_json_unref);
+
+ /* No Newexten event on cache clear or first event */
+ if (!old_snapshot || !new_snapshot) {
return NULL;
}
- app->bridges = ast_str_container_alloc(APP_BRIDGES_BUCKETS);
- if (!app->bridges) {
+ /* Empty application is not valid for a Newexten event */
+ if (ast_strlen_zero(new_snapshot->appl)) {
return NULL;
}
- ao2_ref(app, +1);
- return app;
+ if (ast_channel_snapshot_cep_equal(old_snapshot, new_snapshot)) {
+ return NULL;
+ }
+
+ return ast_json_pack("{s: s, s: o, s: s, s: s, s: o}",
+ "type", "ChannelDialplan",
+ "timestamp", ast_json_timeval(*tv, NULL),
+ "dialplan_app", new_snapshot->appl,
+ "dialplan_app_data", new_snapshot->data,
+ "channel", ast_channel_snapshot_to_json(new_snapshot));
}
-int app_add_channel(struct app *app, const struct ast_channel *chan)
+static struct ast_json *channel_callerid(
+ struct ast_channel_snapshot *old_snapshot,
+ struct ast_channel_snapshot *new_snapshot,
+ const struct timeval *tv)
{
- SCOPED_AO2LOCK(lock, app);
- const char *uniqueid;
+ RAII_VAR(struct ast_json *, json, NULL, ast_json_unref);
- ast_assert(app != NULL);
- ast_assert(chan != NULL);
+ /* No NewCallerid event on cache clear or first event */
+ if (!old_snapshot || !new_snapshot) {
+ return NULL;
+ }
- /* Don't accept new channels in an inactive application */
- if (!app->handler) {
- return -1;
+ if (ast_channel_snapshot_caller_id_equal(old_snapshot, new_snapshot)) {
+ return NULL;
}
- uniqueid = ast_channel_uniqueid(chan);
- return ast_str_container_add(app->channels, uniqueid) ? -1 : 0;
+ return ast_json_pack("{s: s, s: o, s: i, s: s, s: o}",
+ "type", "ChannelCallerId",
+ "timestamp", ast_json_timeval(*tv, NULL),
+ "caller_presentation", new_snapshot->caller_pres,
+ "caller_presentation_txt", ast_describe_caller_presentation(
+ new_snapshot->caller_pres),
+ "channel", ast_channel_snapshot_to_json(new_snapshot));
}
-void app_remove_channel(struct app* app, const struct ast_channel *chan)
+static channel_snapshot_monitor channel_monitors[] = {
+ channel_state,
+ channel_dialplan,
+ channel_callerid
+};
+
+static void sub_channel_update_handler(void *data,
+ struct stasis_subscription *sub,
+ struct stasis_topic *topic,
+ struct stasis_message *message)
{
- SCOPED_AO2LOCK(lock, app);
+ struct app *app = data;
+ struct stasis_cache_update *update;
+ struct ast_channel_snapshot *new_snapshot;
+ struct ast_channel_snapshot *old_snapshot;
+ const struct timeval *tv;
+ int i;
+
+ ast_assert(stasis_message_type(message) == stasis_cache_update_type());
+
+ update = stasis_message_data(message);
+
+ ast_assert(update->type == ast_channel_snapshot_type());
+
+ new_snapshot = stasis_message_data(update->new_snapshot);
+ old_snapshot = stasis_message_data(update->old_snapshot);
+
+ /* Pull timestamp from the new snapshot, or from the update message
+ * when there isn't one. */
+ tv = update->new_snapshot ?
+ stasis_message_timestamp(update->new_snapshot) :
+ stasis_message_timestamp(message);
- ast_assert(app != NULL);
- ast_assert(chan != NULL);
+ for (i = 0; i < ARRAY_LEN(channel_monitors); ++i) {
+ RAII_VAR(struct ast_json *, msg, NULL, ast_json_unref);
- ao2_find(app->channels, ast_channel_uniqueid(chan), OBJ_KEY | OBJ_NODATA | OBJ_UNLINK);
+ msg = channel_monitors[i](old_snapshot, new_snapshot, tv);
+ if (msg) {
+ app_send(app, msg);
+ }
+ }
}
-int app_add_bridge(struct app *app, const char *uniqueid)
+static struct ast_json *simple_bridge_event(
+ const char *type,
+ struct ast_bridge_snapshot *snapshot,
+ const struct timeval *tv)
{
- SCOPED_AO2LOCK(lock, app);
+ return ast_json_pack("{s: s, s: o, s: o}",
+ "type", type,
+ "timestamp", ast_json_timeval(*tv, NULL),
+ "bridge", ast_bridge_snapshot_to_json(snapshot));
+}
- ast_assert(app != NULL);
- ast_assert(uniqueid != NULL);
+static void sub_bridge_update_handler(void *data,
+ struct stasis_subscription *sub,
+ struct stasis_topic *topic,
+ struct stasis_message *message)
+{
+ RAII_VAR(struct ast_json *, json, NULL, ast_json_unref);
+ struct app *app = data;
+ struct stasis_cache_update *update;
+ struct ast_bridge_snapshot *new_snapshot;
+ struct ast_bridge_snapshot *old_snapshot;
+ const struct timeval *tv;
- /* Don't accept new bridges in an inactive application */
- if (!app->handler) {
- return -1;
+ ast_assert(stasis_message_type(message) == stasis_cache_update_type());
+
+ update = stasis_message_data(message);
+
+ ast_assert(update->type == ast_bridge_snapshot_type());
+
+ new_snapshot = stasis_message_data(update->new_snapshot);
+ old_snapshot = stasis_message_data(update->old_snapshot);
+ tv = update->new_snapshot ?
+ stasis_message_timestamp(update->new_snapshot) :
+ stasis_message_timestamp(message);
+
+ if (!new_snapshot) {
+ json = simple_bridge_event("BridgeDestroyed", old_snapshot, tv);
+ } else if (!old_snapshot) {
+ json = simple_bridge_event("BridgeCreated", old_snapshot, tv);
+ }
+
+ if (!json) {
+ return;
+ }
+
+ app_send(app, json);
+}
+
+static void bridge_merge_handler(void *data, struct stasis_subscription *sub,
+ struct stasis_topic *topic, struct stasis_message *message)
+{
+ struct app *app = data;
+ struct ast_bridge_merge_message *merge;
+ RAII_VAR(struct app_forwards *, forwards, NULL, ao2_cleanup);
+
+ if (stasis_subscription_final_message(sub, message)) {
+ ao2_cleanup(app);
}
- return ast_str_container_add(app->bridges, uniqueid) ? -1 : 0;
+ if (stasis_message_type(message) != ast_bridge_merge_message_type()) {
+ return;
+ }
+
+ merge = stasis_message_data(message);
+
+ /* Find out if we're subscribed to either bridge */
+ forwards = ao2_find(app->forwards, merge->from->uniqueid,
+ OBJ_SEARCH_KEY);
+ if (!forwards) {
+ forwards = ao2_find(app->forwards, merge->to->uniqueid,
+ OBJ_SEARCH_KEY);
+ }
+
+ if (!forwards) {
+ return;
+ }
+
+ /* Forward the message to the app */
+ stasis_forward_message(app->topic, topic, message);
}
-void app_remove_bridge(struct app* app, const char *uniqueid)
+struct app *app_create(const char *name, stasis_app_cb handler, void *data)
{
- SCOPED_AO2LOCK(lock, app);
+ RAII_VAR(struct app *, app, NULL, ao2_cleanup);
+ size_t size;
+ int res = 0;
+
+ ast_assert(name != NULL);
+ ast_assert(handler != NULL);
+
+ ast_verb(1, "Creating Stasis app '%s'\n", name);
+
+ size = sizeof(*app) + strlen(name) + 1;
+ app = ao2_alloc_options(size, app_dtor, AO2_ALLOC_OPT_LOCK_MUTEX);
+
+ if (!app) {
+ return NULL;
+ }
+
+ app->forwards = ao2_container_alloc_rbtree(AO2_ALLOC_OPT_LOCK_MUTEX,
+ AO2_CONTAINER_ALLOC_OPT_DUPS_OBJ_REJECT,
+ forwards_sort, NULL);
+ if (!app->forwards) {
+ return NULL;
+ }
+
+ app->topic = stasis_topic_create(name);
+ if (!app->topic) {
+ return NULL;
+ }
+
+ app->bridge_merge_sub = stasis_subscribe(ast_bridge_topic_all(),
+ bridge_merge_handler, app);
+ if (!app->bridge_merge_sub) {
+ return NULL;
+ }
+ /* Subscription holds a reference */
+ ao2_ref(app, +1);
+
+ app->router = stasis_message_router_create(app->topic);
+ if (!app->router) {
+ return NULL;
+ }
+
+ res |= stasis_message_router_add_cache_update(app->router,
+ ast_bridge_snapshot_type(), sub_bridge_update_handler, app);
+
+ res |= stasis_message_router_add_cache_update(app->router,
+ ast_channel_snapshot_type(), sub_channel_update_handler, app);
+
+ res |= stasis_message_router_set_default(app->router,
+ sub_default_handler, app);
- ast_assert(app != NULL);
- ast_assert(uniqueid != NULL);
+ if (res != 0) {
+ return NULL;
+ }
+ /* Router holds a reference */
+ ao2_ref(app, +1);
+
+ strncpy(app->name, name, size - sizeof(*app));
+ app->handler = handler;
+ ao2_ref(data, +1);
+ app->data = data;
- ao2_find(app->bridges, uniqueid, OBJ_KEY | OBJ_NODATA | OBJ_UNLINK | OBJ_MULTIPLE);
+ ao2_ref(app, +1);
+ return app;
}
/*!
@@ -196,6 +583,18 @@ void app_deactivate(struct app *app)
app->data = NULL;
}
+void app_shutdown(struct app *app)
+{
+ SCOPED_AO2LOCK(lock, app);
+
+ ast_assert(app_is_finished(app));
+
+ stasis_message_router_unsubscribe(app->router);
+ app->router = NULL;
+ stasis_unsubscribe(app->bridge_merge_sub);
+ app->bridge_merge_sub = NULL;
+}
+
int app_is_active(struct app *app)
{
SCOPED_AO2LOCK(lock, app);
@@ -206,8 +605,7 @@ int app_is_finished(struct app *app)
{
SCOPED_AO2LOCK(lock, app);
- return app->handler == NULL &&
- ao2_container_count(app->channels) == 0;
+ return app->handler == NULL && ao2_container_count(app->forwards) == 0;
}
void app_update(struct app *app, stasis_app_cb handler, void *data)
@@ -229,7 +627,6 @@ void app_update(struct app *app, stasis_app_cb handler, void *data)
ast_verb(1, "Activating Stasis app '%s'\n", app->name);
}
-
app->handler = handler;
ao2_cleanup(app->data);
if (data) {
@@ -243,16 +640,100 @@ const char *app_name(const struct app *app)
return app->name;
}
-int app_is_watching_channel(struct app *app, const char *uniqueid)
+int app_subscribe_channel(struct app *app, struct ast_channel *chan)
{
- RAII_VAR(char *, found, NULL, ao2_cleanup);
- found = ao2_find(app->channels, uniqueid, OBJ_KEY);
- return found != NULL;
+ int res;
+
+ if (!app || !chan) {
+ return -1;
+ } else {
+ RAII_VAR(struct app_forwards *, forwards, NULL, ao2_cleanup);
+ SCOPED_AO2LOCK(lock, app->forwards);
+
+ forwards = ao2_find(app->forwards, ast_channel_uniqueid(chan),
+ OBJ_SEARCH_KEY | OBJ_NOLOCK);
+ if (!forwards) {
+ /* Forwards not found, create one */
+ forwards = forwards_create_channel(app, chan);
+ if (!forwards) {
+ return -1;
+ }
+
+ res = ao2_link_flags(app->forwards, forwards,
+ OBJ_NOLOCK);
+ if (!res) {
+ return -1;
+ }
+ }
+
+ ++forwards->interested;
+ return 0;
+ }
+}
+
+static int unsubscribe(struct app *app, const char *kind, const char *id)
+{
+ RAII_VAR(struct app_forwards *, forwards, NULL, ao2_cleanup);
+ SCOPED_AO2LOCK(lock, app->forwards);
+
+ forwards = ao2_find(app->forwards, id, OBJ_SEARCH_KEY | OBJ_NOLOCK);
+ if (!forwards) {
+ ast_log(LOG_ERROR,
+ "App '%s' not subscribed to %s '%s'",
+ app->name, kind, id);
+ return -1;
+ }
+
+ if (--forwards->interested == 0) {
+ /* No one is interested any more; unsubscribe */
+ forwards_unsubscribe(forwards);
+ ao2_find(app->forwards, forwards,
+ OBJ_POINTER | OBJ_NOLOCK | OBJ_UNLINK |
+ OBJ_NODATA);
+ }
+
+ return 0;
}
-int app_is_watching_bridge(struct app *app, const char *uniqueid)
+int app_unsubscribe_channel(struct app *app, struct ast_channel *chan)
{
- RAII_VAR(char *, found, NULL, ao2_cleanup);
- found = ao2_find(app->bridges, uniqueid, OBJ_KEY);
- return found != NULL;
+ if (!app || !chan) {
+ return -1;
+ }
+
+ return unsubscribe(app, "channel", ast_channel_uniqueid(chan));
+}
+
+int app_subscribe_bridge(struct app *app, struct ast_bridge *bridge)
+{
+ if (!app || !bridge) {
+ return -1;
+ } else {
+ RAII_VAR(struct app_forwards *, forwards, NULL, ao2_cleanup);
+ SCOPED_AO2LOCK(lock, app->forwards);
+
+ forwards = ao2_find(app->forwards, bridge->uniqueid,
+ OBJ_SEARCH_KEY | OBJ_NOLOCK);
+
+ if (!forwards) {
+ /* Forwards not found, create one */
+ forwards = forwards_create_bridge(app, bridge);
+ if (!forwards) {
+ return -1;
+ }
+ ao2_link_flags(app->forwards, forwards, OBJ_NOLOCK);
+ }
+
+ ++forwards->interested;
+ return 0;
+ }
+}
+
+int app_unsubscribe_bridge(struct app *app, struct ast_bridge *bridge)
+{
+ if (!app || !bridge) {
+ return -1;
+ }
+
+ return unsubscribe(app, "bridge", bridge->uniqueid);
}
diff --git a/res/stasis/app.h b/res/stasis/app.h
index 0cf92217f..5f9f1d7e7 100644
--- a/res/stasis/app.h
+++ b/res/stasis/app.h
@@ -48,6 +48,15 @@ struct app;
struct app *app_create(const char *name, stasis_app_cb handler, void *data);
/*!
+ * \brief Tears down an application.
+ *
+ * It should be finished before calling this.
+ *
+ * \param app Application to unsubscribe.
+ */
+void app_shutdown(struct app *app);
+
+/*!
* \brief Deactivates an application.
*
* Any channels currently in the application remain active (since the app might
@@ -96,17 +105,6 @@ void app_update(struct app *app, stasis_app_cb handler, void *data);
const char *app_name(const struct app *app);
/*!
- * \brief Subscribe an application to a topic.
- *
- * \param app Application.
- * \param topic Topic to subscribe to.
- * \return New subscription.
- * \return \c NULL on error.
- */
-struct stasis_subscription *app_subscribe(struct app *app,
- struct stasis_topic *topic);
-
-/*!
* \brief Send a message to an application.
*
* \param app Application.
@@ -114,83 +112,44 @@ struct stasis_subscription *app_subscribe(struct app *app,
*/
void app_send(struct app *app, struct ast_json *message);
-/*!
- * \brief Send the start message to an application.
- *
- * \param app Application.
- * \param chan The channel entering the application.
- * \param argc The number of arguments for the application.
- * \param argv The arguments for the application.
- * \return 0 on success.
- * \return Non-zero on error.
- */
-int app_send_start_msg(struct app *app, struct ast_channel *chan, int argc,
- char *argv[]);
+struct app_forwards;
/*!
- * \brief Send the end message to an application.
+ * \brief Subscribes an application to a channel.
*
* \param app Application.
- * \param chan The channel leaving the application.
+ * \param chan Channel to subscribe to.
* \return 0 on success.
* \return Non-zero on error.
*/
-int app_send_end_msg(struct app *app, struct ast_channel *chan);
+int app_subscribe_channel(struct app *app, struct ast_channel *chan);
/*!
- * \brief Checks if an application is watching a given channel.
+ * \brief Cancel the subscription an app has for a channel.
*
- * \param app Application.
- * \param uniqueid Uniqueid of the channel to check about.
- * \return True (non-zero) if \a app is watching channel with given \a uniqueid
- * \return False (zero) if \a app isn't.
+ * \param app Subscribing application.
+ * \param forwards Returned object from app_subscribe_channel().
*/
-int app_is_watching_channel(struct app *app, const char *uniqueid);
+int app_unsubscribe_channel(struct app *app, struct ast_channel *chan);
/*!
- * \brief Add a channel to an application's watch list.
+ * \brief Add a bridge subscription to an existing channel subscription.
*
* \param app Application.
- * \param chan Channel to watch.
+ * \param bridge Bridge to subscribe to.
* \return 0 on success.
* \return Non-zero on error.
*/
-int app_add_channel(struct app *app, const struct ast_channel *chan);
+int app_subscribe_bridge(struct app *app, struct ast_bridge *bridge);
/*!
- * \brief Remove a channel from an application's watch list.
+ * \brief Cancel the bridge subscription for an application.
*
- * \param app Application.
- * \param chan Channel to watch.
- */
-void app_remove_channel(struct app *app, const struct ast_channel *chan);
-
-/*!
- * \brief Add a bridge to an application's watch list by uniqueid.
- *
- * \param app Application.
- * \param bridge Bridge to watch.
+ * \param forwards Return from app_subscribe_channel().
+ * \param bridge Bridge to subscribe to.
* \return 0 on success.
* \return Non-zero on error.
*/
-int app_add_bridge(struct app *app, const char *uniqueid);
-
-/*!
- * \brief Remove a bridge from an application's watch list by uniqueid.
- *
- * \param app Application.
- * \param bridge Bridge to remove.
- */
-void app_remove_bridge(struct app* app, const char *uniqueid);
-
-/*!
- * \brief Checks if an application is watching a given bridge.
- *
- * \param app Application.
- * \param uniqueid Uniqueid of the bridge to check.
- * \return True (non-zero) if \a app is watching bridge with given \a uniqueid
- * \return False (zero) if \a app isn't.
- */
-int app_is_watching_bridge(struct app *app, const char *uniqueid);
+int app_unsubscribe_bridge(struct app *app, struct ast_bridge *bridge);
#endif /* _ASTERISK_RES_STASIS_APP_H */