summaryrefslogtreecommitdiff
path: root/res/stasis/app.c
diff options
context:
space:
mode:
authorMatthew Jordan <mjordan@digium.com>2014-07-07 02:15:00 +0000
committerMatthew Jordan <mjordan@digium.com>2014-07-07 02:15:00 +0000
commitd4b436d0ea766d51d7743a874df441d40fdcd226 (patch)
tree955ebd4c8cf179d6dcdcdbfa6a32ac654f3600a9 /res/stasis/app.c
parentedcaa54019a14cdfd2d5e8453b15a52819cecb36 (diff)
ARI/res_stasis: Subscribe to both Local channel halves when originating to app
This patch fixes two bugs: 1. When originating a channel into a Stasis application, we already create a subscription for the channel that is going into our Stasis app. Unfortunately, when you create a Local channel and pass it off to a Stasis app, you really aren't creating just one channel: you're creating two. This patch snags the second half of the Local channel pair (assuming it is a Local channel pair, but luckily core_local is kind about such assumptions) and subscribes to it as well. 2. Subscriptions are a bit sticky right now. If a subscription is made, the 'interest' count gets bumped on the Stasis subscription - but unless something explicitly unsubscribes the channel, said subscription sticks around. This is not much of a problem is a user is creating the subscription - if they made it, they must want it. However, when we are creating implicit subscriptions, we need to make sure something clears them out. This patch takes a pessimistic approach: it watches the cache updates coming from Stasis and, if we notice that the cache just cleared out an object, we delete our subscription object. This keeps our ao2 container of Stasis forwards in an application from growing out of hand; it also is a bit more forgiving for end users who may not realize they were supposed to unsubscribe from that channel that just hung up. Review: https://reviewboard.asterisk.org/r/3710/ #ASTERISK-23939 #close ........ Merged revisions 418089 from http://svn.asterisk.org/svn/asterisk/branches/12 git-svn-id: https://origsvn.digium.com/svn/asterisk/trunk@418090 65c4cc65-6c06-0410-ace0-fbb531ad65f3
Diffstat (limited to 'res/stasis/app.c')
-rw-r--r--res/stasis/app.c46
1 files changed, 30 insertions, 16 deletions
diff --git a/res/stasis/app.c b/res/stasis/app.c
index 4dcb635ef..41f6ccf65 100644
--- a/res/stasis/app.c
+++ b/res/stasis/app.c
@@ -36,6 +36,8 @@ ASTERISK_FILE_VERSION(__FILE__, "$Revision$")
#include "asterisk/stasis_endpoints.h"
#include "asterisk/stasis_message_router.h"
+static int unsubscribe(struct stasis_app *app, const char *kind, const char *id, int terminate);
+
struct stasis_app {
/*! Aggregation topic for this application. */
struct stasis_topic *topic;
@@ -449,7 +451,7 @@ static struct ast_json *channel_callerid(
static channel_snapshot_monitor channel_monitors[] = {
channel_state,
channel_dialplan,
- channel_callerid
+ channel_callerid,
};
static void sub_channel_update_handler(void *data,
@@ -486,6 +488,10 @@ static void sub_channel_update_handler(void *data,
app_send(app, msg);
}
}
+
+ if (!new_snapshot && old_snapshot) {
+ unsubscribe(app, "channel", old_snapshot->uniqueid, 1);
+ }
}
static struct ast_json *simple_endpoint_event(
@@ -513,6 +519,7 @@ static void sub_endpoint_update_handler(void *data,
struct stasis_app *app = data;
struct stasis_cache_update *update;
struct ast_endpoint_snapshot *new_snapshot;
+ struct ast_endpoint_snapshot *old_snapshot;
const struct timeval *tv;
ast_assert(stasis_message_type(message) == stasis_cache_update_type());
@@ -522,17 +529,22 @@ static void sub_endpoint_update_handler(void *data,
ast_assert(update->type == ast_endpoint_snapshot_type());
new_snapshot = stasis_message_data(update->new_snapshot);
- tv = update->new_snapshot ?
- stasis_message_timestamp(update->new_snapshot) :
- stasis_message_timestamp(message);
+ old_snapshot = stasis_message_data(update->old_snapshot);
- json = simple_endpoint_event("EndpointStateChange", new_snapshot, tv);
+ if (new_snapshot) {
+ tv = stasis_message_timestamp(update->new_snapshot);
- if (!json) {
- return;
+ json = simple_endpoint_event("EndpointStateChange", new_snapshot, tv);
+ if (!json) {
+ return;
+ }
+
+ app_send(app, json);
}
- app_send(app, json);
+ if (!new_snapshot && old_snapshot) {
+ unsubscribe(app, "endpoint", old_snapshot->id, 1);
+ }
}
static struct ast_json *simple_bridge_event(
@@ -580,11 +592,13 @@ static void sub_bridge_update_handler(void *data,
json = simple_bridge_event("BridgeCreated", new_snapshot, tv);
}
- if (!json) {
- return;
+ if (json) {
+ app_send(app, json);
}
- app_send(app, json);
+ if (!new_snapshot && old_snapshot) {
+ unsubscribe(app, "bridge", old_snapshot->uniqueid, 1);
+ }
}
@@ -982,7 +996,7 @@ static int subscribe_channel(struct stasis_app *app, void *obj)
return app_subscribe_channel(app, obj);
}
-static int unsubscribe(struct stasis_app *app, const char *kind, const char *id)
+static int unsubscribe(struct stasis_app *app, const char *kind, const char *id, int terminate)
{
RAII_VAR(struct app_forwards *, forwards, NULL, ao2_cleanup);
SCOPED_AO2LOCK(lock, app->forwards);
@@ -997,7 +1011,7 @@ static int unsubscribe(struct stasis_app *app, const char *kind, const char *id)
forwards->interested--;
ast_debug(3, "%s '%s': is %d interested in %s\n", kind, id, forwards->interested, app->name);
- if (forwards->interested == 0) {
+ if (forwards->interested == 0 || terminate) {
/* No one is interested any more; unsubscribe */
ast_debug(3, "%s '%s' unsubscribed from %s\n", kind, id, app->name);
forwards_unsubscribe(forwards);
@@ -1024,7 +1038,7 @@ int app_unsubscribe_channel_id(struct stasis_app *app, const char *channel_id)
return -1;
}
- return unsubscribe(app, "channel", channel_id);
+ return unsubscribe(app, "channel", channel_id, 0);
}
int app_is_subscribed_channel_id(struct stasis_app *app, const char *channel_id)
@@ -1093,7 +1107,7 @@ int app_unsubscribe_bridge_id(struct stasis_app *app, const char *bridge_id)
return -1;
}
- return unsubscribe(app, "bridge", bridge_id);
+ return unsubscribe(app, "bridge", bridge_id, 0);
}
int app_is_subscribed_bridge_id(struct stasis_app *app, const char *bridge_id)
@@ -1153,7 +1167,7 @@ int app_unsubscribe_endpoint_id(struct stasis_app *app, const char *endpoint_id)
return -1;
}
- return unsubscribe(app, "endpoint", endpoint_id);
+ return unsubscribe(app, "endpoint", endpoint_id, 0);
}
int app_is_subscribed_endpoint_id(struct stasis_app *app, const char *endpoint_id)