From 4bb556a8478741dc81361327d3a50a8809be09f6 Mon Sep 17 00:00:00 2001 From: Kinsey Moore Date: Mon, 8 Dec 2014 15:45:46 +0000 Subject: Stasis: Fix StasisStart/End order and missing events This corrects several bugs that currently exist in the stasis application code. * After a masquerade, the resulting channels have channel topics that do not match their uniqueids ** Masquerades now swap channel topics appropriately * StasisStart and StasisEnd messages are leaked to observer applications due to being published on channel topics ** StasisStart and StasisEnd publishing is now properly restricted to controlling apps via app topics * Race conditions exist where StasisStart and StasisEnd messages due to a masquerade may be received out of order due to being published on different topics ** These messages are now published directly on the app topic so this is now a non-issue * StasisEnds are sometimes missing when sent due to masquerades and bridge swaps into and out of Stasis() ** This was due to StasisEnd processing adjusting message-sent flags after Stasis() had already exited and Stasis() had been re-entered ** This was corrected by adjusting these flags prior to sending the message while the initial Stasis() application was still shutting down Review: https://reviewboard.asterisk.org/r/4213/ ASTERISK-24537 #close Reported by: Matt DiMeo ........ Merged revisions 429061 from http://svn.asterisk.org/svn/asterisk/branches/12 ........ Merged revisions 429062 from http://svn.asterisk.org/svn/asterisk/branches/13 git-svn-id: https://origsvn.digium.com/svn/asterisk/trunk@429063 65c4cc65-6c06-0410-ace0-fbb531ad65f3 --- include/asterisk/channel.h | 11 ++++++++ main/channel.c | 3 +++ main/channel_internal_api.c | 9 +++++++ res/res_stasis.c | 66 ++++++++++++++++----------------------------- res/stasis/app.c | 28 ------------------- res/stasis/app.h | 24 ----------------- res/stasis/stasis_bridge.c | 1 + 7 files changed, 47 insertions(+), 95 deletions(-) diff --git a/include/asterisk/channel.h b/include/asterisk/channel.h index 96b34ad4a..45e94ce29 100644 --- a/include/asterisk/channel.h +++ b/include/asterisk/channel.h @@ -2464,6 +2464,17 @@ void ast_channel_internal_copy_linkedid(struct ast_channel *dest, struct ast_cha */ void ast_channel_internal_swap_uniqueid_and_linkedid(struct ast_channel *a, struct ast_channel *b); +/*! + * \brief Swap topics beteween two channels + * \param a First channel + * \param b Second channel + * \return void + * + * \note + * This is used in masquerade to exchange topics for message routing + */ +void ast_channel_internal_swap_topics(struct ast_channel *a, struct ast_channel *b); + /*! * \brief Set uniqueid and linkedid string value only (not time) * \param chan The channel to set the uniqueid to diff --git a/main/channel.c b/main/channel.c index 922fd931c..6bd253174 100644 --- a/main/channel.c +++ b/main/channel.c @@ -6519,6 +6519,9 @@ static void channel_do_masquerade(struct ast_channel *original, struct ast_chann */ ast_channel_internal_swap_uniqueid_and_linkedid(clonechan, original); + /* Make sure the Stasis topic on the channel is updated appropriately */ + ast_channel_internal_swap_topics(clonechan, original); + /* Swap channel names. This uses ast_channel_name_set directly, so we * don't get any spurious rename events. */ diff --git a/main/channel_internal_api.c b/main/channel_internal_api.c index c62812ea7..624bdd1cb 100644 --- a/main/channel_internal_api.c +++ b/main/channel_internal_api.c @@ -1534,6 +1534,15 @@ void ast_channel_internal_swap_uniqueid_and_linkedid(struct ast_channel *a, stru b->linkedid = temp; } +void ast_channel_internal_swap_topics(struct ast_channel *a, struct ast_channel *b) +{ + struct stasis_cp_single *temp; + + temp = a->topics; + a->topics = b->topics; + b->topics = temp; +} + void ast_channel_internal_set_fake_ids(struct ast_channel *chan, const char *uniqueid, const char *linkedid) { ast_copy_string(chan->uniqueid.unique_id, uniqueid, sizeof(chan->uniqueid.unique_id)); diff --git a/res/res_stasis.c b/res/res_stasis.c index 18d5963be..ad081d412 100644 --- a/res/res_stasis.c +++ b/res/res_stasis.c @@ -125,7 +125,7 @@ static struct ast_json *stasis_end_to_json(struct stasis_message *message, "channel", ast_channel_snapshot_to_json(payload->snapshot, sanitize)); } -STASIS_MESSAGE_TYPE_DEFN(app_end_message_type, +STASIS_MESSAGE_TYPE_DEFN_LOCAL(end_message_type, .to_json = stasis_end_to_json); struct start_message_blob { @@ -919,6 +919,12 @@ static int send_start_msg_snapshots(struct ast_channel *chan, struct stasis_app struct stasis_message *msg; int i; + if (app_subscribe_channel(app, chan)) { + ast_log(LOG_ERROR, "Error subscribing app '%s' to channel '%s'\n", + app_name(app), ast_channel_name(chan)); + return -1; + } + payload = ao2_alloc(sizeof(*payload), start_message_blob_dtor); if (!payload) { ast_log(LOG_ERROR, "Error packing JSON for StasisStart message\n"); @@ -928,7 +934,8 @@ static int send_start_msg_snapshots(struct ast_channel *chan, struct stasis_app payload->channel = ao2_bump(snapshot); payload->replace_channel = ao2_bump(replace_channel_snapshot); - json_blob = ast_json_pack("{s: o, s: []}", + json_blob = ast_json_pack("{s: s, s: o, s: []}", + "app", app_name(app), "timestamp", ast_json_timeval(ast_tvnow(), NULL), "args"); if (!json_blob) { @@ -956,7 +963,10 @@ static int send_start_msg_snapshots(struct ast_channel *chan, struct stasis_app return -1; } - stasis_publish(ast_channel_topic(chan), msg); + if (replace_channel_snapshot) { + app_unsubscribe_channel_id(app, replace_channel_snapshot->uniqueid); + } + stasis_publish(ast_app_get_topic(app), msg); ao2_ref(msg, -1); return 0; } @@ -988,6 +998,7 @@ int app_send_end_msg(struct stasis_app *app, struct ast_channel *chan) { struct stasis_message_sanitizer *sanitize = stasis_app_get_sanitizer(); struct ast_json *blob; + struct stasis_message *msg; if (sanitize && sanitize->channel && sanitize->channel(chan)) { @@ -1000,10 +1011,13 @@ int app_send_end_msg(struct stasis_app *app, struct ast_channel *chan) return -1; } - stasis_app_channel_set_stasis_end_published(chan); remove_masquerade_store(chan); - ast_channel_publish_blob(chan, app_end_message_type(), blob); - + app_unsubscribe_channel(app, chan); + msg = ast_channel_blob_create(chan, end_message_type(), blob); + if (msg) { + stasis_publish(ast_app_get_topic(app), msg); + } + ao2_cleanup(msg); ast_json_unref(blob); return 0; @@ -1034,6 +1048,7 @@ static void channel_stolen_cb(void *data, struct ast_channel *old_chan, struct a } /* send the StasisEnd message to the app */ + stasis_app_channel_set_stasis_end_published(new_chan); app_send_end_msg(control_app(control), new_chan); /* remove the datastore */ @@ -1083,11 +1098,6 @@ static void channel_replaced_cb(void *data, struct ast_channel *old_chan, struct /* send the StasisEnd message to the app */ app_send_end_msg(control_app(control), old_chan); - /* fixup channel topic forwards */ - if (app_replace_channel_forwards(control_app(control), old_snapshot->uniqueid, new_chan)) { - ast_log(LOG_ERROR, "Failed to fixup channel topic forwards for %s(%s) owned by %s\n", - old_snapshot->name, old_snapshot->uniqueid, app_name(control_app(control))); - } ao2_cleanup(control); } @@ -1251,14 +1261,6 @@ int stasis_app_exec(struct ast_channel *chan, const char *app_name, int argc, return -1; } - res = app_subscribe_channel(app, chan); - if (res != 0) { - ast_log(LOG_ERROR, "Error subscribing app '%s' to channel '%s'\n", - app_name, ast_channel_name(chan)); - remove_masquerade_store(chan); - return -1; - } - res = send_start_msg(app, chan, argc, argv); if (res != 0) { ast_log(LOG_ERROR, @@ -1894,7 +1896,7 @@ static int unload_module(void) ao2_cleanup(app_bridges_playback); app_bridges_playback = NULL; - STASIS_MESSAGE_TYPE_CLEANUP(app_end_message_type); + STASIS_MESSAGE_TYPE_CLEANUP(end_message_type); STASIS_MESSAGE_TYPE_CLEANUP(start_message_type); return 0; @@ -1938,28 +1940,6 @@ struct stasis_message_sanitizer *stasis_app_get_sanitizer(void) return &app_sanitizer; } -void app_end_message_handler(struct stasis_message *message) -{ - struct ast_channel_blob *payload; - struct ast_channel_snapshot *snapshot; - const char *app_name; - char *channel_uri; - size_t alloc_size; - const char *channels[1]; - - payload = stasis_message_data(message); - snapshot = payload->snapshot; - app_name = ast_json_string_get(ast_json_object_get(payload->blob, "app")); - - /* +8 is for the length of "channel:" */ - alloc_size = AST_MAX_UNIQUEID + 8; - channel_uri = ast_alloca(alloc_size); - snprintf(channel_uri, alloc_size, "channel:%s", snapshot->uniqueid); - - channels[0] = channel_uri; - stasis_app_unsubscribe(app_name, channels, ARRAY_LEN(channels), NULL); -} - static const struct ast_datastore_info stasis_internal_channel_info = { .type = "stasis-internal-channel", }; @@ -2033,7 +2013,7 @@ static int load_module(void) if (STASIS_MESSAGE_TYPE_INIT(start_message_type) != 0) { return AST_MODULE_LOAD_DECLINE; } - if (STASIS_MESSAGE_TYPE_INIT(app_end_message_type) != 0) { + if (STASIS_MESSAGE_TYPE_INIT(end_message_type) != 0) { return AST_MODULE_LOAD_DECLINE; } apps_registry = ao2_container_alloc(APPS_NUM_BUCKETS, app_hash, app_compare); diff --git a/res/stasis/app.c b/res/stasis/app.c index 07e273fa7..725414561 100644 --- a/res/stasis/app.c +++ b/res/stasis/app.c @@ -302,10 +302,6 @@ static void sub_default_handler(void *data, struct stasis_subscription *sub, call_forwarded_handler(app, message); } - if (stasis_message_type(message) == app_end_message_type()) { - app_end_message_handler(message); - } - /* By default, send any message that has a JSON representation */ json = stasis_message_to_json(message, stasis_app_get_sanitizer()); if (!json) { @@ -1128,30 +1124,6 @@ int app_is_subscribed_channel_id(struct stasis_app *app, const char *channel_id) return forwards != NULL; } -int app_replace_channel_forwards(struct stasis_app *app, const char *old_id, struct ast_channel *new_chan) -{ - RAII_VAR(struct app_forwards *, old_forwards, NULL, ao2_cleanup); - struct app_forwards *new_forwards; - - old_forwards = ao2_find(app->forwards, old_id, OBJ_SEARCH_KEY | OBJ_UNLINK); - if (!old_forwards) { - return -1; - } - - new_forwards = forwards_create_channel(app, new_chan); - if (!new_forwards) { - return -1; - } - - new_forwards->interested = old_forwards->interested; - ao2_link_flags(app->forwards, new_forwards, 0); - ao2_cleanup(new_forwards); - - /* Clean up old forwards */ - forwards_unsubscribe(old_forwards); - return 0; -} - static void *channel_find(const struct stasis_app *app, const char *id) { return ast_channel_get_by_name(id); diff --git a/res/stasis/app.h b/res/stasis/app.h index 63143e026..59574f584 100644 --- a/res/stasis/app.h +++ b/res/stasis/app.h @@ -258,18 +258,6 @@ int app_set_replace_channel_app(struct ast_channel *chan, const char *replace_ap */ char *app_get_replace_channel_app(struct ast_channel *chan); -/*! - * \brief Replace channel topic forwards for the old channel with forwards for the new channel - * - * \param app The app that owns the channel - * \param old_id The unique ID of the channel to be replaced - * \param new_chan The channel that is replacing the old one - * - * \retval zero on success - * \return non-zero on failure - */ -int app_replace_channel_forwards(struct stasis_app *app, const char *old_id, struct ast_channel *new_chan); - /*! * \brief Send StasisEnd message to the listening app * @@ -281,16 +269,4 @@ int app_replace_channel_forwards(struct stasis_app *app, const char *old_id, str */ int app_send_end_msg(struct stasis_app *app, struct ast_channel *chan); -/*! - * \brief Handle cleanup related to StasisEnd messages - * - * \param message The message for which to clean up - */ -void app_end_message_handler(struct stasis_message *message); - -/*! - * \brief Accessor for the StasisEnd message type - */ -struct stasis_message_type *app_end_message_type(void); - #endif /* _ASTERISK_RES_STASIS_APP_H */ diff --git a/res/stasis/stasis_bridge.c b/res/stasis/stasis_bridge.c index fd984ab86..646b3062a 100644 --- a/res/stasis/stasis_bridge.c +++ b/res/stasis/stasis_bridge.c @@ -175,6 +175,7 @@ static int bridge_stasis_moving(struct ast_bridge_channel *bridge_channel, void return -1; } + stasis_app_channel_set_stasis_end_published(chan); app_send_end_msg(control_app(control), chan); } -- cgit v1.2.3