summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorMatthew Jordan <mjordan@digium.com>2014-07-07 02:15:00 +0000
committerMatthew Jordan <mjordan@digium.com>2014-07-07 02:15:00 +0000
commitd4b436d0ea766d51d7743a874df441d40fdcd226 (patch)
tree955ebd4c8cf179d6dcdcdbfa6a32ac654f3600a9
parentedcaa54019a14cdfd2d5e8453b15a52819cecb36 (diff)
ARI/res_stasis: Subscribe to both Local channel halves when originating to app
This patch fixes two bugs: 1. When originating a channel into a Stasis application, we already create a subscription for the channel that is going into our Stasis app. Unfortunately, when you create a Local channel and pass it off to a Stasis app, you really aren't creating just one channel: you're creating two. This patch snags the second half of the Local channel pair (assuming it is a Local channel pair, but luckily core_local is kind about such assumptions) and subscribes to it as well. 2. Subscriptions are a bit sticky right now. If a subscription is made, the 'interest' count gets bumped on the Stasis subscription - but unless something explicitly unsubscribes the channel, said subscription sticks around. This is not much of a problem is a user is creating the subscription - if they made it, they must want it. However, when we are creating implicit subscriptions, we need to make sure something clears them out. This patch takes a pessimistic approach: it watches the cache updates coming from Stasis and, if we notice that the cache just cleared out an object, we delete our subscription object. This keeps our ao2 container of Stasis forwards in an application from growing out of hand; it also is a bit more forgiving for end users who may not realize they were supposed to unsubscribe from that channel that just hung up. Review: https://reviewboard.asterisk.org/r/3710/ #ASTERISK-23939 #close ........ Merged revisions 418089 from http://svn.asterisk.org/svn/asterisk/branches/12 git-svn-id: https://origsvn.digium.com/svn/asterisk/trunk@418090 65c4cc65-6c06-0410-ace0-fbb531ad65f3
-rw-r--r--include/asterisk/stasis_app.h15
-rw-r--r--res/ari/resource_channels.c22
-rw-r--r--res/res_stasis.c23
-rw-r--r--res/stasis/app.c46
4 files changed, 82 insertions, 24 deletions
diff --git a/include/asterisk/stasis_app.h b/include/asterisk/stasis_app.h
index 334155a5b..a7b204034 100644
--- a/include/asterisk/stasis_app.h
+++ b/include/asterisk/stasis_app.h
@@ -297,6 +297,21 @@ enum stasis_app_subscribe_res stasis_app_unsubscribe(const char *app_name,
const char **event_source_uris, int event_sources_count,
struct ast_json **json);
+/*!
+ * \brief Directly subscribe an application to a channel
+ *
+ * \param app_name Name of the application to subscribe.
+ * \param chan The channel to subscribe to
+ *
+ * \return \ref stasis_app_subscribe_res return code.
+ *
+ * \note This method can be used when you already hold a channel and its
+ * lock. This bypasses the channel lookup that would normally be
+ * performed by \ref stasis_app_subscribe.
+ */
+enum stasis_app_subscribe_res stasis_app_subscribe_channel(const char *app_name,
+ struct ast_channel *chan);
+
/*! @} */
/*! @{ */
diff --git a/res/ari/resource_channels.c b/res/ari/resource_channels.c
index 6cc00ce41..393609298 100644
--- a/res/ari/resource_channels.c
+++ b/res/ari/resource_channels.c
@@ -42,6 +42,7 @@ ASTERISK_FILE_VERSION(__FILE__, "$Revision$")
#include "asterisk/stasis_app_snoop.h"
#include "asterisk/stasis_channels.h"
#include "asterisk/causes.h"
+#include "asterisk/core_local.h"
#include "resource_channels.h"
#include <limits.h>
@@ -775,6 +776,7 @@ static void ari_channels_handle_originate_with_id(const char *args_endpoint,
struct ast_format tmp_fmt;
char *stuff;
struct ast_channel *chan;
+ struct ast_channel *local_peer;
RAII_VAR(struct ast_channel_snapshot *, snapshot, NULL, ao2_cleanup);
struct ast_assigned_ids assignedids = {
.uniqueid = args_channel_id,
@@ -859,20 +861,24 @@ static void ari_channels_handle_originate_with_id(const char *args_endpoint,
return;
}
- snapshot = ast_channel_snapshot_get_latest(ast_channel_uniqueid(chan));
- ast_channel_unlock(chan);
+ /* See if this is a Local channel and if so, get the peer */
+ local_peer = ast_local_get_peer(chan);
if (!ast_strlen_zero(args_app)) {
- /* channel: + channel ID + null terminator */
- char uri[9 + strlen(ast_channel_uniqueid(chan))];
- const char *uris[1] = { uri, };
-
- sprintf(uri, "channel:%s", ast_channel_uniqueid(chan));
- stasis_app_subscribe(args_app, uris, 1, NULL);
+ stasis_app_subscribe_channel(args_app, chan);
+ if (local_peer) {
+ stasis_app_subscribe_channel(args_app, local_peer);
+ }
}
+ snapshot = ast_channel_snapshot_get_latest(ast_channel_uniqueid(chan));
+ ast_channel_unlock(chan);
+
ast_ari_response_ok(response, ast_channel_snapshot_to_json(snapshot, NULL));
ast_channel_unref(chan);
+ if (local_peer) {
+ ast_channel_unref(local_peer);
+ }
}
void ast_ari_channels_originate_with_id(struct ast_variable *headers,
diff --git a/res/res_stasis.c b/res/res_stasis.c
index 0184d209c..ff7424503 100644
--- a/res/res_stasis.c
+++ b/res/res_stasis.c
@@ -1225,6 +1225,29 @@ static enum stasis_app_subscribe_res app_handle_subscriptions(
return STASIS_ASR_OK;
}
+enum stasis_app_subscribe_res stasis_app_subscribe_channel(const char *app_name,
+ struct ast_channel *chan)
+{
+ RAII_VAR(struct stasis_app *, app, find_app_by_name(app_name), ao2_cleanup);
+ int res;
+
+ if (!app) {
+ return STASIS_ASR_APP_NOT_FOUND;
+ }
+
+ ast_debug(3, "%s: Subscribing to %s\n", app_name, ast_channel_uniqueid(chan));
+
+ res = app_subscribe_channel(app, chan);
+ if (res != 0) {
+ ast_log(LOG_ERROR, "Error subscribing app '%s' to channel '%s'\n",
+ app_name, ast_channel_uniqueid(chan));
+ return STASIS_ASR_INTERNAL_ERROR;
+ }
+
+ return STASIS_ASR_OK;
+}
+
+
/*!
* \internal
* \brief Subscribe an app to an event source.
diff --git a/res/stasis/app.c b/res/stasis/app.c
index 4dcb635ef..41f6ccf65 100644
--- a/res/stasis/app.c
+++ b/res/stasis/app.c
@@ -36,6 +36,8 @@ ASTERISK_FILE_VERSION(__FILE__, "$Revision$")
#include "asterisk/stasis_endpoints.h"
#include "asterisk/stasis_message_router.h"
+static int unsubscribe(struct stasis_app *app, const char *kind, const char *id, int terminate);
+
struct stasis_app {
/*! Aggregation topic for this application. */
struct stasis_topic *topic;
@@ -449,7 +451,7 @@ static struct ast_json *channel_callerid(
static channel_snapshot_monitor channel_monitors[] = {
channel_state,
channel_dialplan,
- channel_callerid
+ channel_callerid,
};
static void sub_channel_update_handler(void *data,
@@ -486,6 +488,10 @@ static void sub_channel_update_handler(void *data,
app_send(app, msg);
}
}
+
+ if (!new_snapshot && old_snapshot) {
+ unsubscribe(app, "channel", old_snapshot->uniqueid, 1);
+ }
}
static struct ast_json *simple_endpoint_event(
@@ -513,6 +519,7 @@ static void sub_endpoint_update_handler(void *data,
struct stasis_app *app = data;
struct stasis_cache_update *update;
struct ast_endpoint_snapshot *new_snapshot;
+ struct ast_endpoint_snapshot *old_snapshot;
const struct timeval *tv;
ast_assert(stasis_message_type(message) == stasis_cache_update_type());
@@ -522,17 +529,22 @@ static void sub_endpoint_update_handler(void *data,
ast_assert(update->type == ast_endpoint_snapshot_type());
new_snapshot = stasis_message_data(update->new_snapshot);
- tv = update->new_snapshot ?
- stasis_message_timestamp(update->new_snapshot) :
- stasis_message_timestamp(message);
+ old_snapshot = stasis_message_data(update->old_snapshot);
- json = simple_endpoint_event("EndpointStateChange", new_snapshot, tv);
+ if (new_snapshot) {
+ tv = stasis_message_timestamp(update->new_snapshot);
- if (!json) {
- return;
+ json = simple_endpoint_event("EndpointStateChange", new_snapshot, tv);
+ if (!json) {
+ return;
+ }
+
+ app_send(app, json);
}
- app_send(app, json);
+ if (!new_snapshot && old_snapshot) {
+ unsubscribe(app, "endpoint", old_snapshot->id, 1);
+ }
}
static struct ast_json *simple_bridge_event(
@@ -580,11 +592,13 @@ static void sub_bridge_update_handler(void *data,
json = simple_bridge_event("BridgeCreated", new_snapshot, tv);
}
- if (!json) {
- return;
+ if (json) {
+ app_send(app, json);
}
- app_send(app, json);
+ if (!new_snapshot && old_snapshot) {
+ unsubscribe(app, "bridge", old_snapshot->uniqueid, 1);
+ }
}
@@ -982,7 +996,7 @@ static int subscribe_channel(struct stasis_app *app, void *obj)
return app_subscribe_channel(app, obj);
}
-static int unsubscribe(struct stasis_app *app, const char *kind, const char *id)
+static int unsubscribe(struct stasis_app *app, const char *kind, const char *id, int terminate)
{
RAII_VAR(struct app_forwards *, forwards, NULL, ao2_cleanup);
SCOPED_AO2LOCK(lock, app->forwards);
@@ -997,7 +1011,7 @@ static int unsubscribe(struct stasis_app *app, const char *kind, const char *id)
forwards->interested--;
ast_debug(3, "%s '%s': is %d interested in %s\n", kind, id, forwards->interested, app->name);
- if (forwards->interested == 0) {
+ if (forwards->interested == 0 || terminate) {
/* No one is interested any more; unsubscribe */
ast_debug(3, "%s '%s' unsubscribed from %s\n", kind, id, app->name);
forwards_unsubscribe(forwards);
@@ -1024,7 +1038,7 @@ int app_unsubscribe_channel_id(struct stasis_app *app, const char *channel_id)
return -1;
}
- return unsubscribe(app, "channel", channel_id);
+ return unsubscribe(app, "channel", channel_id, 0);
}
int app_is_subscribed_channel_id(struct stasis_app *app, const char *channel_id)
@@ -1093,7 +1107,7 @@ int app_unsubscribe_bridge_id(struct stasis_app *app, const char *bridge_id)
return -1;
}
- return unsubscribe(app, "bridge", bridge_id);
+ return unsubscribe(app, "bridge", bridge_id, 0);
}
int app_is_subscribed_bridge_id(struct stasis_app *app, const char *bridge_id)
@@ -1153,7 +1167,7 @@ int app_unsubscribe_endpoint_id(struct stasis_app *app, const char *endpoint_id)
return -1;
}
- return unsubscribe(app, "endpoint", endpoint_id);
+ return unsubscribe(app, "endpoint", endpoint_id, 0);
}
int app_is_subscribed_endpoint_id(struct stasis_app *app, const char *endpoint_id)