summaryrefslogtreecommitdiff
path: root/res/stasis/app.c
diff options
context:
space:
mode:
authorMatthew Jordan <mjordan@digium.com>2013-10-04 16:01:48 +0000
committerMatthew Jordan <mjordan@digium.com>2013-10-04 16:01:48 +0000
commit8d7873b836999b09caad87abec27579f1f065b84 (patch)
treecdf683ab18142553b42102de6c5ca52fc71b22a2 /res/stasis/app.c
parentb52c972b172087d27178c0e60127d486d4e500f8 (diff)
ARI: Add subscription support
This patch adds an /applications API to ARI, allowing explicit management of Stasis applications. * GET /applications - list current applications * GET /applications/{applicationName} - get details of a specific application * POST /applications/{applicationName}/subscription - explicitly subscribe to a channel, bridge or endpoint * DELETE /applications/{applicationName}/subscription - explicitly unsubscribe from a channel, bridge or endpoint Subscriptions work by a reference counting mechanism: if you subscript to an event source X number of times, you must unsubscribe X number of times to stop receiveing events for that event source. Review: https://reviewboard.asterisk.org/r/2862 (issue ASTERISK-22451) Reported by: Matt Jordan ........ Merged revisions 400522 from http://svn.asterisk.org/svn/asterisk/branches/12 git-svn-id: https://origsvn.digium.com/svn/asterisk/trunk@400523 65c4cc65-6c06-0410-ace0-fbb531ad65f3
Diffstat (limited to 'res/stasis/app.c')
-rw-r--r--res/stasis/app.c205
1 files changed, 201 insertions, 4 deletions
diff --git a/res/stasis/app.c b/res/stasis/app.c
index bc1268fb7..aac9760b3 100644
--- a/res/stasis/app.c
+++ b/res/stasis/app.c
@@ -33,6 +33,7 @@ ASTERISK_FILE_VERSION(__FILE__, "$Revision$")
#include "asterisk/stasis_app.h"
#include "asterisk/stasis_bridges.h"
#include "asterisk/stasis_channels.h"
+#include "asterisk/stasis_endpoints.h"
#include "asterisk/stasis_message_router.h"
struct app {
@@ -52,6 +53,12 @@ struct app {
char name[];
};
+enum forward_type {
+ FORWARD_CHANNEL,
+ FORWARD_BRIDGE,
+ FORWARD_ENDPOINT,
+};
+
/*! Subscription info for a particular channel/bridge. */
struct app_forwards {
/*! Count of number of times this channel/bridge has been subscribed */
@@ -62,6 +69,8 @@ struct app_forwards {
/*! Forward for the caching topic */
struct stasis_forward *topic_cached_forward;
+ /* Type of object being forwarded */
+ enum forward_type forward_type;
/*! Unique id of the object being forwarded */
char id[];
};
@@ -119,6 +128,7 @@ static struct app_forwards *forwards_create_channel(struct app *app,
return NULL;
}
+ forwards->forward_type = FORWARD_CHANNEL;
forwards->topic_forward = stasis_forward_all(ast_channel_topic(chan),
app->topic);
if (!forwards->topic_forward) {
@@ -153,6 +163,7 @@ static struct app_forwards *forwards_create_bridge(struct app *app,
return NULL;
}
+ forwards->forward_type = FORWARD_BRIDGE;
forwards->topic_forward = stasis_forward_all(ast_bridge_topic(bridge),
app->topic);
if (!forwards->topic_forward) {
@@ -172,6 +183,41 @@ static struct app_forwards *forwards_create_bridge(struct app *app,
return forwards;
}
+/*! Forward a endpoint's topics to an app */
+static struct app_forwards *forwards_create_endpoint(struct app *app,
+ struct ast_endpoint *endpoint)
+{
+ RAII_VAR(struct app_forwards *, forwards, NULL, ao2_cleanup);
+
+ if (!app || !endpoint) {
+ return NULL;
+ }
+
+ forwards = forwards_create(app, ast_endpoint_get_id(endpoint));
+ if (!forwards) {
+ return NULL;
+ }
+
+ forwards->forward_type = FORWARD_ENDPOINT;
+ forwards->topic_forward = stasis_forward_all(ast_endpoint_topic(endpoint),
+ app->topic);
+ if (!forwards->topic_forward) {
+ return NULL;
+ }
+
+ forwards->topic_cached_forward = stasis_forward_all(
+ ast_endpoint_topic_cached(endpoint), app->topic);
+ if (!forwards->topic_cached_forward) {
+ /* Half-subscribed is a bad thing */
+ stasis_forward_cancel(forwards->topic_forward);
+ forwards->topic_forward = NULL;
+ return NULL;
+ }
+
+ ao2_ref(forwards, +1);
+ return forwards;
+}
+
static int forwards_sort(const void *obj_left, const void *obj_right, int flags)
{
const struct app_forwards *object_left = obj_left;
@@ -397,6 +443,47 @@ static void sub_channel_update_handler(void *data,
}
}
+static struct ast_json *simple_endpoint_event(
+ const char *type,
+ struct ast_endpoint_snapshot *snapshot,
+ const struct timeval *tv)
+{
+ return ast_json_pack("{s: s, s: o, s: o}",
+ "type", type,
+ "timestamp", ast_json_timeval(*tv, NULL),
+ "endpoint", ast_endpoint_snapshot_to_json(snapshot));
+}
+
+static void sub_endpoint_update_handler(void *data,
+ struct stasis_subscription *sub,
+ struct stasis_message *message)
+{
+ RAII_VAR(struct ast_json *, json, NULL, ast_json_unref);
+ struct app *app = data;
+ struct stasis_cache_update *update;
+ struct ast_endpoint_snapshot *new_snapshot;
+ const struct timeval *tv;
+
+ ast_assert(stasis_message_type(message) == stasis_cache_update_type());
+
+ update = stasis_message_data(message);
+
+ ast_assert(update->type == ast_endpoint_snapshot_type());
+
+ new_snapshot = stasis_message_data(update->new_snapshot);
+ tv = update->new_snapshot ?
+ stasis_message_timestamp(update->new_snapshot) :
+ stasis_message_timestamp(message);
+
+ json = simple_endpoint_event("EndpointStateChange", new_snapshot, tv);
+
+ if (!json) {
+ return;
+ }
+
+ app_send(app, json);
+}
+
static struct ast_json *simple_bridge_event(
const char *type,
struct ast_bridge_snapshot *snapshot,
@@ -526,6 +613,9 @@ struct app *app_create(const char *name, stasis_app_cb handler, void *data)
res |= stasis_message_router_add_cache_update(app->router,
ast_channel_snapshot_type(), sub_channel_update_handler, app);
+ res |= stasis_message_router_add_cache_update(app->router,
+ ast_endpoint_snapshot_type(), sub_endpoint_update_handler, app);
+
res |= stasis_message_router_set_default(app->router,
sub_default_handler, app);
@@ -640,6 +730,56 @@ const char *app_name(const struct app *app)
return app->name;
}
+struct ast_json *app_to_json(const struct app *app)
+{
+ RAII_VAR(struct ast_json *, json, NULL, ast_json_unref);
+ struct ast_json *channels;
+ struct ast_json *bridges;
+ struct ast_json *endpoints;
+ struct ao2_iterator i;
+ void *obj;
+
+ json = ast_json_pack("{s: s, s: [], s: [], s: []}",
+ "name", app->name,
+ "channel_ids", "bridge_ids", "endpoint_ids");
+ channels = ast_json_object_get(json, "channel_ids");
+ bridges = ast_json_object_get(json, "bridge_ids");
+ endpoints = ast_json_object_get(json, "endpoint_ids");
+
+ i = ao2_iterator_init(app->forwards, 0);
+ while ((obj = ao2_iterator_next(&i))) {
+ RAII_VAR(struct app_forwards *, forwards, obj, ao2_cleanup);
+ RAII_VAR(struct ast_json *, id, NULL, ast_json_unref);
+ int append_res = -1;
+
+ id = ast_json_string_create(forwards->id);
+
+ switch (forwards->forward_type) {
+ case FORWARD_CHANNEL:
+ append_res = ast_json_array_append(channels,
+ ast_json_ref(id));
+ break;
+ case FORWARD_BRIDGE:
+ append_res = ast_json_array_append(bridges,
+ ast_json_ref(id));
+ break;
+ case FORWARD_ENDPOINT:
+ append_res = ast_json_array_append(endpoints,
+ ast_json_ref(id));
+ break;
+ }
+
+ if (append_res != 0) {
+ ast_log(LOG_ERROR, "Error building response\n");
+ ao2_iterator_destroy(&i);
+ return NULL;
+ }
+ }
+ ao2_iterator_destroy(&i);
+
+ return ast_json_ref(json);
+}
+
int app_subscribe_channel(struct app *app, struct ast_channel *chan)
{
int res;
@@ -678,8 +818,8 @@ static int unsubscribe(struct app *app, const char *kind, const char *id)
forwards = ao2_find(app->forwards, id, OBJ_SEARCH_KEY | OBJ_NOLOCK);
if (!forwards) {
- ast_log(LOG_ERROR,
- "App '%s' not subscribed to %s '%s'",
+ ast_log(LOG_WARNING,
+ "App '%s' not subscribed to %s '%s'\n",
app->name, kind, id);
return -1;
}
@@ -701,7 +841,23 @@ int app_unsubscribe_channel(struct app *app, struct ast_channel *chan)
return -1;
}
- return unsubscribe(app, "channel", ast_channel_uniqueid(chan));
+ return app_unsubscribe_channel_id(app, ast_channel_uniqueid(chan));
+}
+
+int app_unsubscribe_channel_id(struct app *app, const char *channel_id)
+{
+ if (!app || !channel_id) {
+ return -1;
+ }
+
+ return unsubscribe(app, "channel", channel_id);
+}
+
+int app_is_subscribed_channel_id(struct app *app, const char *channel_id)
+{
+ RAII_VAR(struct app_forwards *, forwards, NULL, ao2_cleanup);
+ forwards = ao2_find(app->forwards, channel_id, OBJ_SEARCH_KEY);
+ return forwards != NULL;
}
int app_subscribe_bridge(struct app *app, struct ast_bridge *bridge)
@@ -735,5 +891,46 @@ int app_unsubscribe_bridge(struct app *app, struct ast_bridge *bridge)
return -1;
}
- return unsubscribe(app, "bridge", bridge->uniqueid);
+ return app_unsubscribe_bridge_id(app, bridge->uniqueid);
+}
+
+int app_unsubscribe_bridge_id(struct app *app, const char *bridge_id)
+{
+ if (!app || !bridge_id) {
+ return -1;
+ }
+
+ return unsubscribe(app, "bridge", bridge_id);
+}
+
+int app_is_subscribed_bridge_id(struct app *app, const char *bridge_id)
+{
+ RAII_VAR(struct app_forwards *, forwards, NULL, ao2_cleanup);
+ forwards = ao2_find(app->forwards, bridge_id, OBJ_SEARCH_KEY);
+ return forwards != NULL;
+}
+
+int app_subscribe_endpoint(struct app *app, struct ast_endpoint *endpoint)
+{
+ if (!app || !endpoint) {
+ return -1;
+ } else {
+ RAII_VAR(struct app_forwards *, forwards, NULL, ao2_cleanup);
+ SCOPED_AO2LOCK(lock, app->forwards);
+
+ forwards = ao2_find(app->forwards, ast_endpoint_get_id(endpoint),
+ OBJ_SEARCH_KEY | OBJ_NOLOCK);
+
+ if (!forwards) {
+ /* Forwards not found, create one */
+ forwards = forwards_create_endpoint(app, endpoint);
+ if (!forwards) {
+ return -1;
+ }
+ ao2_link_flags(app->forwards, forwards, OBJ_NOLOCK);
+ }
+
+ ++forwards->interested;
+ return 0;
+ }
}