summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--include/asterisk/stasis_app.h15
-rw-r--r--res/ari/resource_channels.c22
-rw-r--r--res/res_stasis.c23
-rw-r--r--res/stasis/app.c46
4 files changed, 82 insertions, 24 deletions
diff --git a/include/asterisk/stasis_app.h b/include/asterisk/stasis_app.h
index 334155a5b..a7b204034 100644
--- a/include/asterisk/stasis_app.h
+++ b/include/asterisk/stasis_app.h
@@ -297,6 +297,21 @@ enum stasis_app_subscribe_res stasis_app_unsubscribe(const char *app_name,
const char **event_source_uris, int event_sources_count,
struct ast_json **json);
+/*!
+ * \brief Directly subscribe an application to a channel
+ *
+ * \param app_name Name of the application to subscribe.
+ * \param chan The channel to subscribe to
+ *
+ * \return \ref stasis_app_subscribe_res return code.
+ *
+ * \note This method can be used when you already hold a channel and its
+ * lock. This bypasses the channel lookup that would normally be
+ * performed by \ref stasis_app_subscribe.
+ */
+enum stasis_app_subscribe_res stasis_app_subscribe_channel(const char *app_name,
+ struct ast_channel *chan);
+
/*! @} */
/*! @{ */
diff --git a/res/ari/resource_channels.c b/res/ari/resource_channels.c
index 6cc00ce41..393609298 100644
--- a/res/ari/resource_channels.c
+++ b/res/ari/resource_channels.c
@@ -42,6 +42,7 @@ ASTERISK_FILE_VERSION(__FILE__, "$Revision$")
#include "asterisk/stasis_app_snoop.h"
#include "asterisk/stasis_channels.h"
#include "asterisk/causes.h"
+#include "asterisk/core_local.h"
#include "resource_channels.h"
#include <limits.h>
@@ -775,6 +776,7 @@ static void ari_channels_handle_originate_with_id(const char *args_endpoint,
struct ast_format tmp_fmt;
char *stuff;
struct ast_channel *chan;
+ struct ast_channel *local_peer;
RAII_VAR(struct ast_channel_snapshot *, snapshot, NULL, ao2_cleanup);
struct ast_assigned_ids assignedids = {
.uniqueid = args_channel_id,
@@ -859,20 +861,24 @@ static void ari_channels_handle_originate_with_id(const char *args_endpoint,
return;
}
- snapshot = ast_channel_snapshot_get_latest(ast_channel_uniqueid(chan));
- ast_channel_unlock(chan);
+ /* See if this is a Local channel and if so, get the peer */
+ local_peer = ast_local_get_peer(chan);
if (!ast_strlen_zero(args_app)) {
- /* channel: + channel ID + null terminator */
- char uri[9 + strlen(ast_channel_uniqueid(chan))];
- const char *uris[1] = { uri, };
-
- sprintf(uri, "channel:%s", ast_channel_uniqueid(chan));
- stasis_app_subscribe(args_app, uris, 1, NULL);
+ stasis_app_subscribe_channel(args_app, chan);
+ if (local_peer) {
+ stasis_app_subscribe_channel(args_app, local_peer);
+ }
}
+ snapshot = ast_channel_snapshot_get_latest(ast_channel_uniqueid(chan));
+ ast_channel_unlock(chan);
+
ast_ari_response_ok(response, ast_channel_snapshot_to_json(snapshot, NULL));
ast_channel_unref(chan);
+ if (local_peer) {
+ ast_channel_unref(local_peer);
+ }
}
void ast_ari_channels_originate_with_id(struct ast_variable *headers,
diff --git a/res/res_stasis.c b/res/res_stasis.c
index 0184d209c..ff7424503 100644
--- a/res/res_stasis.c
+++ b/res/res_stasis.c
@@ -1225,6 +1225,29 @@ static enum stasis_app_subscribe_res app_handle_subscriptions(
return STASIS_ASR_OK;
}
+enum stasis_app_subscribe_res stasis_app_subscribe_channel(const char *app_name,
+ struct ast_channel *chan)
+{
+ RAII_VAR(struct stasis_app *, app, find_app_by_name(app_name), ao2_cleanup);
+ int res;
+
+ if (!app) {
+ return STASIS_ASR_APP_NOT_FOUND;
+ }
+
+ ast_debug(3, "%s: Subscribing to %s\n", app_name, ast_channel_uniqueid(chan));
+
+ res = app_subscribe_channel(app, chan);
+ if (res != 0) {
+ ast_log(LOG_ERROR, "Error subscribing app '%s' to channel '%s'\n",
+ app_name, ast_channel_uniqueid(chan));
+ return STASIS_ASR_INTERNAL_ERROR;
+ }
+
+ return STASIS_ASR_OK;
+}
+
+
/*!
* \internal
* \brief Subscribe an app to an event source.
diff --git a/res/stasis/app.c b/res/stasis/app.c
index 4dcb635ef..41f6ccf65 100644
--- a/res/stasis/app.c
+++ b/res/stasis/app.c
@@ -36,6 +36,8 @@ ASTERISK_FILE_VERSION(__FILE__, "$Revision$")
#include "asterisk/stasis_endpoints.h"
#include "asterisk/stasis_message_router.h"
+static int unsubscribe(struct stasis_app *app, const char *kind, const char *id, int terminate);
+
struct stasis_app {
/*! Aggregation topic for this application. */
struct stasis_topic *topic;
@@ -449,7 +451,7 @@ static struct ast_json *channel_callerid(
static channel_snapshot_monitor channel_monitors[] = {
channel_state,
channel_dialplan,
- channel_callerid
+ channel_callerid,
};
static void sub_channel_update_handler(void *data,
@@ -486,6 +488,10 @@ static void sub_channel_update_handler(void *data,
app_send(app, msg);
}
}
+
+ if (!new_snapshot && old_snapshot) {
+ unsubscribe(app, "channel", old_snapshot->uniqueid, 1);
+ }
}
static struct ast_json *simple_endpoint_event(
@@ -513,6 +519,7 @@ static void sub_endpoint_update_handler(void *data,
struct stasis_app *app = data;
struct stasis_cache_update *update;
struct ast_endpoint_snapshot *new_snapshot;
+ struct ast_endpoint_snapshot *old_snapshot;
const struct timeval *tv;
ast_assert(stasis_message_type(message) == stasis_cache_update_type());
@@ -522,17 +529,22 @@ static void sub_endpoint_update_handler(void *data,
ast_assert(update->type == ast_endpoint_snapshot_type());
new_snapshot = stasis_message_data(update->new_snapshot);
- tv = update->new_snapshot ?
- stasis_message_timestamp(update->new_snapshot) :
- stasis_message_timestamp(message);
+ old_snapshot = stasis_message_data(update->old_snapshot);
- json = simple_endpoint_event("EndpointStateChange", new_snapshot, tv);
+ if (new_snapshot) {
+ tv = stasis_message_timestamp(update->new_snapshot);
- if (!json) {
- return;
+ json = simple_endpoint_event("EndpointStateChange", new_snapshot, tv);
+ if (!json) {
+ return;
+ }
+
+ app_send(app, json);
}
- app_send(app, json);
+ if (!new_snapshot && old_snapshot) {
+ unsubscribe(app, "endpoint", old_snapshot->id, 1);
+ }
}
static struct ast_json *simple_bridge_event(
@@ -580,11 +592,13 @@ static void sub_bridge_update_handler(void *data,
json = simple_bridge_event("BridgeCreated", new_snapshot, tv);
}
- if (!json) {
- return;
+ if (json) {
+ app_send(app, json);
}
- app_send(app, json);
+ if (!new_snapshot && old_snapshot) {
+ unsubscribe(app, "bridge", old_snapshot->uniqueid, 1);
+ }
}
@@ -982,7 +996,7 @@ static int subscribe_channel(struct stasis_app *app, void *obj)
return app_subscribe_channel(app, obj);
}
-static int unsubscribe(struct stasis_app *app, const char *kind, const char *id)
+static int unsubscribe(struct stasis_app *app, const char *kind, const char *id, int terminate)
{
RAII_VAR(struct app_forwards *, forwards, NULL, ao2_cleanup);
SCOPED_AO2LOCK(lock, app->forwards);
@@ -997,7 +1011,7 @@ static int unsubscribe(struct stasis_app *app, const char *kind, const char *id)
forwards->interested--;
ast_debug(3, "%s '%s': is %d interested in %s\n", kind, id, forwards->interested, app->name);
- if (forwards->interested == 0) {
+ if (forwards->interested == 0 || terminate) {
/* No one is interested any more; unsubscribe */
ast_debug(3, "%s '%s' unsubscribed from %s\n", kind, id, app->name);
forwards_unsubscribe(forwards);
@@ -1024,7 +1038,7 @@ int app_unsubscribe_channel_id(struct stasis_app *app, const char *channel_id)
return -1;
}
- return unsubscribe(app, "channel", channel_id);
+ return unsubscribe(app, "channel", channel_id, 0);
}
int app_is_subscribed_channel_id(struct stasis_app *app, const char *channel_id)
@@ -1093,7 +1107,7 @@ int app_unsubscribe_bridge_id(struct stasis_app *app, const char *bridge_id)
return -1;
}
- return unsubscribe(app, "bridge", bridge_id);
+ return unsubscribe(app, "bridge", bridge_id, 0);
}
int app_is_subscribed_bridge_id(struct stasis_app *app, const char *bridge_id)
@@ -1153,7 +1167,7 @@ int app_unsubscribe_endpoint_id(struct stasis_app *app, const char *endpoint_id)
return -1;
}
- return unsubscribe(app, "endpoint", endpoint_id);
+ return unsubscribe(app, "endpoint", endpoint_id, 0);
}
int app_is_subscribed_endpoint_id(struct stasis_app *app, const char *endpoint_id)