diff options
-rw-r--r-- | include/asterisk/stasis_app.h | 15 | ||||
-rw-r--r-- | res/ari/resource_channels.c | 22 | ||||
-rw-r--r-- | res/res_stasis.c | 23 | ||||
-rw-r--r-- | res/stasis/app.c | 46 |
4 files changed, 82 insertions, 24 deletions
diff --git a/include/asterisk/stasis_app.h b/include/asterisk/stasis_app.h index 334155a5b..a7b204034 100644 --- a/include/asterisk/stasis_app.h +++ b/include/asterisk/stasis_app.h @@ -297,6 +297,21 @@ enum stasis_app_subscribe_res stasis_app_unsubscribe(const char *app_name, const char **event_source_uris, int event_sources_count, struct ast_json **json); +/*! + * \brief Directly subscribe an application to a channel + * + * \param app_name Name of the application to subscribe. + * \param chan The channel to subscribe to + * + * \return \ref stasis_app_subscribe_res return code. + * + * \note This method can be used when you already hold a channel and its + * lock. This bypasses the channel lookup that would normally be + * performed by \ref stasis_app_subscribe. + */ +enum stasis_app_subscribe_res stasis_app_subscribe_channel(const char *app_name, + struct ast_channel *chan); + /*! @} */ /*! @{ */ diff --git a/res/ari/resource_channels.c b/res/ari/resource_channels.c index 6cc00ce41..393609298 100644 --- a/res/ari/resource_channels.c +++ b/res/ari/resource_channels.c @@ -42,6 +42,7 @@ ASTERISK_FILE_VERSION(__FILE__, "$Revision$") #include "asterisk/stasis_app_snoop.h" #include "asterisk/stasis_channels.h" #include "asterisk/causes.h" +#include "asterisk/core_local.h" #include "resource_channels.h" #include <limits.h> @@ -775,6 +776,7 @@ static void ari_channels_handle_originate_with_id(const char *args_endpoint, struct ast_format tmp_fmt; char *stuff; struct ast_channel *chan; + struct ast_channel *local_peer; RAII_VAR(struct ast_channel_snapshot *, snapshot, NULL, ao2_cleanup); struct ast_assigned_ids assignedids = { .uniqueid = args_channel_id, @@ -859,20 +861,24 @@ static void ari_channels_handle_originate_with_id(const char *args_endpoint, return; } - snapshot = ast_channel_snapshot_get_latest(ast_channel_uniqueid(chan)); - ast_channel_unlock(chan); + /* See if this is a Local channel and if so, get the peer */ + local_peer = ast_local_get_peer(chan); if (!ast_strlen_zero(args_app)) { - /* channel: + channel ID + null terminator */ - char uri[9 + strlen(ast_channel_uniqueid(chan))]; - const char *uris[1] = { uri, }; - - sprintf(uri, "channel:%s", ast_channel_uniqueid(chan)); - stasis_app_subscribe(args_app, uris, 1, NULL); + stasis_app_subscribe_channel(args_app, chan); + if (local_peer) { + stasis_app_subscribe_channel(args_app, local_peer); + } } + snapshot = ast_channel_snapshot_get_latest(ast_channel_uniqueid(chan)); + ast_channel_unlock(chan); + ast_ari_response_ok(response, ast_channel_snapshot_to_json(snapshot, NULL)); ast_channel_unref(chan); + if (local_peer) { + ast_channel_unref(local_peer); + } } void ast_ari_channels_originate_with_id(struct ast_variable *headers, diff --git a/res/res_stasis.c b/res/res_stasis.c index 0184d209c..ff7424503 100644 --- a/res/res_stasis.c +++ b/res/res_stasis.c @@ -1225,6 +1225,29 @@ static enum stasis_app_subscribe_res app_handle_subscriptions( return STASIS_ASR_OK; } +enum stasis_app_subscribe_res stasis_app_subscribe_channel(const char *app_name, + struct ast_channel *chan) +{ + RAII_VAR(struct stasis_app *, app, find_app_by_name(app_name), ao2_cleanup); + int res; + + if (!app) { + return STASIS_ASR_APP_NOT_FOUND; + } + + ast_debug(3, "%s: Subscribing to %s\n", app_name, ast_channel_uniqueid(chan)); + + res = app_subscribe_channel(app, chan); + if (res != 0) { + ast_log(LOG_ERROR, "Error subscribing app '%s' to channel '%s'\n", + app_name, ast_channel_uniqueid(chan)); + return STASIS_ASR_INTERNAL_ERROR; + } + + return STASIS_ASR_OK; +} + + /*! * \internal * \brief Subscribe an app to an event source. diff --git a/res/stasis/app.c b/res/stasis/app.c index 4dcb635ef..41f6ccf65 100644 --- a/res/stasis/app.c +++ b/res/stasis/app.c @@ -36,6 +36,8 @@ ASTERISK_FILE_VERSION(__FILE__, "$Revision$") #include "asterisk/stasis_endpoints.h" #include "asterisk/stasis_message_router.h" +static int unsubscribe(struct stasis_app *app, const char *kind, const char *id, int terminate); + struct stasis_app { /*! Aggregation topic for this application. */ struct stasis_topic *topic; @@ -449,7 +451,7 @@ static struct ast_json *channel_callerid( static channel_snapshot_monitor channel_monitors[] = { channel_state, channel_dialplan, - channel_callerid + channel_callerid, }; static void sub_channel_update_handler(void *data, @@ -486,6 +488,10 @@ static void sub_channel_update_handler(void *data, app_send(app, msg); } } + + if (!new_snapshot && old_snapshot) { + unsubscribe(app, "channel", old_snapshot->uniqueid, 1); + } } static struct ast_json *simple_endpoint_event( @@ -513,6 +519,7 @@ static void sub_endpoint_update_handler(void *data, struct stasis_app *app = data; struct stasis_cache_update *update; struct ast_endpoint_snapshot *new_snapshot; + struct ast_endpoint_snapshot *old_snapshot; const struct timeval *tv; ast_assert(stasis_message_type(message) == stasis_cache_update_type()); @@ -522,17 +529,22 @@ static void sub_endpoint_update_handler(void *data, ast_assert(update->type == ast_endpoint_snapshot_type()); new_snapshot = stasis_message_data(update->new_snapshot); - tv = update->new_snapshot ? - stasis_message_timestamp(update->new_snapshot) : - stasis_message_timestamp(message); + old_snapshot = stasis_message_data(update->old_snapshot); - json = simple_endpoint_event("EndpointStateChange", new_snapshot, tv); + if (new_snapshot) { + tv = stasis_message_timestamp(update->new_snapshot); - if (!json) { - return; + json = simple_endpoint_event("EndpointStateChange", new_snapshot, tv); + if (!json) { + return; + } + + app_send(app, json); } - app_send(app, json); + if (!new_snapshot && old_snapshot) { + unsubscribe(app, "endpoint", old_snapshot->id, 1); + } } static struct ast_json *simple_bridge_event( @@ -580,11 +592,13 @@ static void sub_bridge_update_handler(void *data, json = simple_bridge_event("BridgeCreated", new_snapshot, tv); } - if (!json) { - return; + if (json) { + app_send(app, json); } - app_send(app, json); + if (!new_snapshot && old_snapshot) { + unsubscribe(app, "bridge", old_snapshot->uniqueid, 1); + } } @@ -982,7 +996,7 @@ static int subscribe_channel(struct stasis_app *app, void *obj) return app_subscribe_channel(app, obj); } -static int unsubscribe(struct stasis_app *app, const char *kind, const char *id) +static int unsubscribe(struct stasis_app *app, const char *kind, const char *id, int terminate) { RAII_VAR(struct app_forwards *, forwards, NULL, ao2_cleanup); SCOPED_AO2LOCK(lock, app->forwards); @@ -997,7 +1011,7 @@ static int unsubscribe(struct stasis_app *app, const char *kind, const char *id) forwards->interested--; ast_debug(3, "%s '%s': is %d interested in %s\n", kind, id, forwards->interested, app->name); - if (forwards->interested == 0) { + if (forwards->interested == 0 || terminate) { /* No one is interested any more; unsubscribe */ ast_debug(3, "%s '%s' unsubscribed from %s\n", kind, id, app->name); forwards_unsubscribe(forwards); @@ -1024,7 +1038,7 @@ int app_unsubscribe_channel_id(struct stasis_app *app, const char *channel_id) return -1; } - return unsubscribe(app, "channel", channel_id); + return unsubscribe(app, "channel", channel_id, 0); } int app_is_subscribed_channel_id(struct stasis_app *app, const char *channel_id) @@ -1093,7 +1107,7 @@ int app_unsubscribe_bridge_id(struct stasis_app *app, const char *bridge_id) return -1; } - return unsubscribe(app, "bridge", bridge_id); + return unsubscribe(app, "bridge", bridge_id, 0); } int app_is_subscribed_bridge_id(struct stasis_app *app, const char *bridge_id) @@ -1153,7 +1167,7 @@ int app_unsubscribe_endpoint_id(struct stasis_app *app, const char *endpoint_id) return -1; } - return unsubscribe(app, "endpoint", endpoint_id); + return unsubscribe(app, "endpoint", endpoint_id, 0); } int app_is_subscribed_endpoint_id(struct stasis_app *app, const char *endpoint_id) |