From 5013659972a4678ca7f6f64512ee9d5fc7172882 Mon Sep 17 00:00:00 2001 From: Kinsey Moore Date: Thu, 13 Nov 2014 15:44:28 +0000 Subject: Stasis: Fix StasisEnd message ordering This change corrects message ordering in cases where a channel-related message can be received after a Stasis/ARI application has received the StasisEnd message. The StasisEnd message was being passed to applications directly without waiting for the channel topic to empty. As a result of this fix, other bugs were also identified and fixed: * StasisStart messages were also being sent directly to apps and are now routed through the stasis message bus properly * Masquerade monitor datastores were being removed at the incorrect time in some cases and were causing StasisEnd messages to not be sent * General refactoring where necessary for the above * Unsubscription on StasisEnd timing changes to prevent additional messages from following the StasisEnd when they shouldn't A channel sanitization function pointer was added to reduce processing and AO2 lookups. Review: https://reviewboard.asterisk.org/r/4163/ ASTERISK-24501 #close Reported by: Matt Jordan ........ Merged revisions 427788 from http://svn.asterisk.org/svn/asterisk/branches/12 git-svn-id: https://origsvn.digium.com/svn/asterisk/branches/13@427789 65c4cc65-6c06-0410-ace0-fbb531ad65f3 --- res/res_stasis.c | 254 ++++++++++++++++++++++----------------------- res/stasis/app.c | 4 + res/stasis/app.h | 23 ++++ res/stasis/stasis_bridge.c | 7 +- 4 files changed, 155 insertions(+), 133 deletions(-) (limited to 'res') diff --git a/res/res_stasis.c b/res/res_stasis.c index 99a3aed93..18d5963be 100644 --- a/res/res_stasis.c +++ b/res/res_stasis.c @@ -109,30 +109,68 @@ struct ao2_container *app_bridges_moh; struct ao2_container *app_bridges_playback; -static struct ast_json *stasis_end_json_payload(struct ast_channel_snapshot *snapshot, +static struct ast_json *stasis_end_to_json(struct stasis_message *message, const struct stasis_message_sanitizer *sanitize) { + struct ast_channel_blob *payload = stasis_message_data(message); + + if (sanitize && sanitize->channel_snapshot && + sanitize->channel_snapshot(payload->snapshot)) { + return NULL; + } + return ast_json_pack("{s: s, s: o, s: o}", "type", "StasisEnd", "timestamp", ast_json_timeval(ast_tvnow(), NULL), - "channel", ast_channel_snapshot_to_json(snapshot, sanitize)); + "channel", ast_channel_snapshot_to_json(payload->snapshot, sanitize)); } -static struct ast_json *stasis_end_to_json(struct stasis_message *message, +STASIS_MESSAGE_TYPE_DEFN(app_end_message_type, + .to_json = stasis_end_to_json); + +struct start_message_blob { + struct ast_channel_snapshot *channel; /*!< Channel that is entering Stasis() */ + struct ast_channel_snapshot *replace_channel; /*!< Channel that is being replaced (optional) */ + struct ast_json *blob; /*!< JSON blob containing timestamp and args */ +}; + +static struct ast_json *stasis_start_to_json(struct stasis_message *message, const struct stasis_message_sanitizer *sanitize) { - struct ast_channel_blob *payload = stasis_message_data(message); + struct start_message_blob *payload = stasis_message_data(message); + struct ast_json *msg; if (sanitize && sanitize->channel_snapshot && - sanitize->channel_snapshot(payload->snapshot)) { + sanitize->channel_snapshot(payload->channel)) { return NULL; } - return stasis_end_json_payload(payload->snapshot, sanitize); + msg = ast_json_pack("{s: s, s: O, s: O, s: o}", + "type", "StasisStart", + "timestamp", ast_json_object_get(payload->blob, "timestamp"), + "args", ast_json_object_get(payload->blob, "args"), + "channel", ast_channel_snapshot_to_json(payload->channel, NULL)); + if (!msg) { + ast_log(LOG_ERROR, "Failed to pack JSON for StasisStart message\n"); + return NULL; + } + + if (payload->replace_channel) { + int res = ast_json_object_set(msg, "replace_channel", + ast_channel_snapshot_to_json(payload->replace_channel, NULL)); + + if (res) { + ast_json_unref(msg); + ast_log(LOG_ERROR, "Failed to append JSON for StasisStart message\n"); + return NULL; + } + } + + return msg; } -STASIS_MESSAGE_TYPE_DEFN(ast_stasis_end_message_type, - .to_json = stasis_end_to_json); +STASIS_MESSAGE_TYPE_DEFN_LOCAL(start_message_type, + .to_json = stasis_start_to_json); const char *stasis_app_name(const struct stasis_app *app) { @@ -862,51 +900,64 @@ char *app_get_replace_channel_app(struct ast_channel *chan) return replace_channel_app; } -static int send_start_msg_snapshots(struct stasis_app *app, +static void start_message_blob_dtor(void *obj) +{ + struct start_message_blob *payload = obj; + + ao2_cleanup(payload->channel); + ao2_cleanup(payload->replace_channel); + ast_json_unref(payload->blob); +} + +static int send_start_msg_snapshots(struct ast_channel *chan, struct stasis_app *app, int argc, char *argv[], struct ast_channel_snapshot *snapshot, struct ast_channel_snapshot *replace_channel_snapshot) { - RAII_VAR(struct ast_json *, msg, NULL, ast_json_unref); + RAII_VAR(struct ast_json *, json_blob, NULL, ast_json_unref); struct ast_json *json_args; - struct stasis_message_sanitizer *sanitize = stasis_app_get_sanitizer(); + RAII_VAR(struct start_message_blob *, payload, NULL, ao2_cleanup); + struct stasis_message *msg; int i; - if (sanitize && sanitize->channel_snapshot - && sanitize->channel_snapshot(snapshot)) { - return 0; - } - - msg = ast_json_pack("{s: s, s: o, s: [], s: o}", - "type", "StasisStart", - "timestamp", ast_json_timeval(ast_tvnow(), NULL), - "args", - "channel", ast_channel_snapshot_to_json(snapshot, NULL)); - if (!msg) { + payload = ao2_alloc(sizeof(*payload), start_message_blob_dtor); + if (!payload) { + ast_log(LOG_ERROR, "Error packing JSON for StasisStart message\n"); return -1; } - if (replace_channel_snapshot) { - int res = ast_json_object_set(msg, "replace_channel", - ast_channel_snapshot_to_json(replace_channel_snapshot, NULL)); + payload->channel = ao2_bump(snapshot); + payload->replace_channel = ao2_bump(replace_channel_snapshot); - if (res) { - return -1; - } + json_blob = ast_json_pack("{s: o, s: []}", + "timestamp", ast_json_timeval(ast_tvnow(), NULL), + "args"); + if (!json_blob) { + ast_log(LOG_ERROR, "Error packing JSON for StasisStart message\n"); + return -1; } /* Append arguments to args array */ - json_args = ast_json_object_get(msg, "args"); + json_args = ast_json_object_get(json_blob, "args"); ast_assert(json_args != NULL); for (i = 0; i < argc; ++i) { int r = ast_json_array_append(json_args, ast_json_string_create(argv[i])); if (r != 0) { - ast_log(LOG_ERROR, "Error appending start message\n"); + ast_log(LOG_ERROR, "Error appending to StasisStart message\n"); return -1; } } - app_send(app, msg); + payload->blob = ast_json_ref(json_blob); + + msg = stasis_message_create(start_message_type(), payload); + if (!msg) { + ast_log(LOG_ERROR, "Error sending StasisStart message\n"); + return -1; + } + + stasis_publish(ast_channel_topic(chan), msg); + ao2_ref(msg, -1); return 0; } @@ -928,31 +979,36 @@ static int send_start_msg(struct stasis_app *app, struct ast_channel *chan, if (!snapshot) { return -1; } - return send_start_msg_snapshots(app, argc, argv, snapshot, replace_channel_snapshot); + return send_start_msg_snapshots(chan, app, argc, argv, snapshot, replace_channel_snapshot); } -static int send_end_msg_snapshot(struct stasis_app *app, struct ast_channel_snapshot *snapshot) +static void remove_masquerade_store(struct ast_channel *chan); + +int app_send_end_msg(struct stasis_app *app, struct ast_channel *chan) { struct stasis_message_sanitizer *sanitize = stasis_app_get_sanitizer(); - struct ast_json *msg; + struct ast_json *blob; - if (sanitize && sanitize->channel_snapshot - && sanitize->channel_snapshot(snapshot)) { + if (sanitize && sanitize->channel + && sanitize->channel(chan)) { return 0; } - msg = stasis_end_json_payload(snapshot, sanitize); - if (!msg) { + blob = ast_json_pack("{s: s}", "app", app_name(app)); + if (!blob) { + ast_log(LOG_ERROR, "Error packing JSON for StasisEnd message\n"); return -1; } - app_send(app, msg); - ast_json_unref(msg); + stasis_app_channel_set_stasis_end_published(chan); + remove_masquerade_store(chan); + ast_channel_publish_blob(chan, app_end_message_type(), blob); + + ast_json_unref(blob); + return 0; } -static void remove_masquerade_store(struct ast_channel *chan); - static int masq_match_cb(void *obj, void *data, int flags) { struct stasis_app_control *control = obj; @@ -968,32 +1024,22 @@ static int masq_match_cb(void *obj, void *data, int flags) static void channel_stolen_cb(void *data, struct ast_channel *old_chan, struct ast_channel *new_chan) { - struct ast_channel_snapshot *snapshot; struct stasis_app_control *control; - /* grab a snapshot */ - snapshot = ast_channel_snapshot_get_latest(ast_channel_uniqueid(new_chan)); - if (!snapshot) { - ast_log(LOG_ERROR, "Could not get snapshot for masqueraded channel\n"); - return; - } - /* find control */ control = ao2_callback(app_controls, 0, masq_match_cb, old_chan); if (!control) { ast_log(LOG_ERROR, "Could not find control for masqueraded channel\n"); - ao2_cleanup(snapshot); return; } /* send the StasisEnd message to the app */ - send_end_msg_snapshot(control_app(control), snapshot); + app_send_end_msg(control_app(control), new_chan); /* remove the datastore */ remove_masquerade_store(old_chan); ao2_cleanup(control); - ao2_cleanup(snapshot); } static void channel_replaced_cb(void *data, struct ast_channel *old_chan, struct ast_channel *new_chan) @@ -1032,10 +1078,10 @@ static void channel_replaced_cb(void *data, struct ast_channel *old_chan, struct /* send the StasisStart with replace_channel to the app */ - send_start_msg_snapshots(control_app(control), 0, NULL, new_snapshot, + send_start_msg_snapshots(new_chan, control_app(control), 0, NULL, new_snapshot, old_snapshot); /* send the StasisEnd message to the app */ - send_end_msg_snapshot(control_app(control), old_snapshot); + app_send_end_msg(control_app(control), old_chan); /* fixup channel topic forwards */ if (app_replace_channel_forwards(control_app(control), old_snapshot->uniqueid, new_chan)) { @@ -1090,33 +1136,6 @@ static void remove_masquerade_store(struct ast_channel *chan) ast_datastore_free(datastore); } -static int send_end_msg(struct stasis_app *app, struct ast_channel *chan) -{ - struct ast_channel_snapshot *snapshot; - int res = 0; - - ast_assert(chan != NULL); - - /* A masquerade has occurred and this message will be wrong so it - * has already been sent elsewhere. */ - if (!has_masquerade_store(chan)) { - return 0; - } - - /* Set channel info */ - snapshot = ast_channel_snapshot_get_latest(ast_channel_uniqueid(chan)); - if (!snapshot) { - return -1; - } - - if (send_end_msg_snapshot(app, snapshot)) { - res = -1; - } - - ao2_cleanup(snapshot); - return res; -} - void stasis_app_control_execute_until_exhausted(struct ast_channel *chan, struct stasis_app_control *control) { while (!control_is_done(control)) { @@ -1232,18 +1251,18 @@ int stasis_app_exec(struct ast_channel *chan, const char *app_name, int argc, return -1; } - res = send_start_msg(app, chan, argc, argv); + res = app_subscribe_channel(app, chan); if (res != 0) { - ast_log(LOG_ERROR, - "Error sending start message to '%s'\n", app_name); + ast_log(LOG_ERROR, "Error subscribing app '%s' to channel '%s'\n", + app_name, ast_channel_name(chan)); remove_masquerade_store(chan); return -1; } - res = app_subscribe_channel(app, chan); + res = send_start_msg(app, chan, argc, argv); if (res != 0) { - ast_log(LOG_ERROR, "Error subscribing app '%s' to channel '%s'\n", - app_name, ast_channel_name(chan)); + ast_log(LOG_ERROR, + "Error sending start message to '%s'\n", app_name); remove_masquerade_store(chan); return -1; } @@ -1327,9 +1346,9 @@ int stasis_app_exec(struct ast_channel *chan, const char *app_name, int argc, /* Only publish a stasis_end event if it hasn't already been published */ if (!stasis_app_channel_is_stasis_end_published(chan)) { - app_unsubscribe_channel(app, chan); - res = send_end_msg(app, chan); - remove_masquerade_store(chan); + /* A masquerade has occurred and this message will be wrong so it + * has already been sent elsewhere. */ + res = has_masquerade_store(chan) && app_send_end_msg(app, chan); if (res != 0) { ast_log(LOG_ERROR, "Error sending end message to %s\n", app_name); @@ -1853,15 +1872,8 @@ void stasis_app_unref(void) ast_module_unref(ast_module_info->self); } -/*! - * \brief Subscription to StasisEnd events - */ -struct stasis_subscription *stasis_end_sub; - static int unload_module(void) { - stasis_end_sub = stasis_unsubscribe(stasis_end_sub); - stasis_app_unregister_event_sources(); messaging_cleanup(); @@ -1882,7 +1894,8 @@ static int unload_module(void) ao2_cleanup(app_bridges_playback); app_bridges_playback = NULL; - STASIS_MESSAGE_TYPE_CLEANUP(ast_stasis_end_message_type); + STASIS_MESSAGE_TYPE_CLEANUP(app_end_message_type); + STASIS_MESSAGE_TYPE_CLEANUP(start_message_type); return 0; } @@ -1896,6 +1909,15 @@ static int channel_snapshot_sanitizer(const struct ast_channel_snapshot *snapsho return 1; } +/* \brief Sanitization callback for channels */ +static int channel_sanitizer(const struct ast_channel *chan) +{ + if (!chan || !(ast_channel_tech(chan)->properties & AST_CHAN_TP_INTERNAL)) { + return 0; + } + return 1; +} + /* \brief Sanitization callback for channel unique IDs */ static int channel_id_sanitizer(const char *id) { @@ -1908,6 +1930,7 @@ static int channel_id_sanitizer(const char *id) struct stasis_message_sanitizer app_sanitizer = { .channel_id = channel_id_sanitizer, .channel_snapshot = channel_snapshot_sanitizer, + .channel = channel_sanitizer, }; struct stasis_message_sanitizer *stasis_app_get_sanitizer(void) @@ -1915,21 +1938,7 @@ struct stasis_message_sanitizer *stasis_app_get_sanitizer(void) return &app_sanitizer; } -static void remove_masquerade_store_by_name(const char *channel_name) -{ - struct ast_channel *chan; - - chan = ast_channel_get_by_name(channel_name); - if (!chan) { - return; - } - - remove_masquerade_store(chan); - ast_channel_unref(chan); -} - -static void check_for_stasis_end(void *data, struct stasis_subscription *sub, - struct stasis_message *message) +void app_end_message_handler(struct stasis_message *message) { struct ast_channel_blob *payload; struct ast_channel_snapshot *snapshot; @@ -1938,10 +1947,6 @@ static void check_for_stasis_end(void *data, struct stasis_subscription *sub, size_t alloc_size; const char *channels[1]; - if (stasis_message_type(message) != ast_stasis_end_message_type()) { - return; - } - payload = stasis_message_data(message); snapshot = payload->snapshot; app_name = ast_json_string_get(ast_json_object_get(payload->blob, "app")); @@ -1953,8 +1958,6 @@ static void check_for_stasis_end(void *data, struct stasis_subscription *sub, channels[0] = channel_uri; stasis_app_unsubscribe(app_name, channels, ARRAY_LEN(channels), NULL); - - remove_masquerade_store_by_name(snapshot->name); } static const struct ast_datastore_info stasis_internal_channel_info = { @@ -2027,7 +2030,10 @@ int stasis_app_channel_is_internal(struct ast_channel *chan) static int load_module(void) { - if (STASIS_MESSAGE_TYPE_INIT(ast_stasis_end_message_type) != 0) { + if (STASIS_MESSAGE_TYPE_INIT(start_message_type) != 0) { + return AST_MODULE_LOAD_DECLINE; + } + if (STASIS_MESSAGE_TYPE_INIT(app_end_message_type) != 0) { return AST_MODULE_LOAD_DECLINE; } apps_registry = ao2_container_alloc(APPS_NUM_BUCKETS, app_hash, app_compare); @@ -2053,12 +2059,6 @@ static int load_module(void) stasis_app_register_event_sources(); - stasis_end_sub = stasis_subscribe(ast_channel_topic_all(), check_for_stasis_end, NULL); - if (!stasis_end_sub) { - unload_module(); - return AST_MODULE_LOAD_DECLINE; - } - return AST_MODULE_LOAD_SUCCESS; } diff --git a/res/stasis/app.c b/res/stasis/app.c index 9440cf1bd..cda1c045d 100644 --- a/res/stasis/app.c +++ b/res/stasis/app.c @@ -302,6 +302,10 @@ static void sub_default_handler(void *data, struct stasis_subscription *sub, call_forwarded_handler(app, message); } + if (stasis_message_type(message) == app_end_message_type()) { + app_end_message_handler(message); + } + /* By default, send any message that has a JSON representation */ json = stasis_message_to_json(message, stasis_app_get_sanitizer()); if (!json) { diff --git a/res/stasis/app.h b/res/stasis/app.h index 1ab6097a7..63143e026 100644 --- a/res/stasis/app.h +++ b/res/stasis/app.h @@ -270,4 +270,27 @@ char *app_get_replace_channel_app(struct ast_channel *chan); */ int app_replace_channel_forwards(struct stasis_app *app, const char *old_id, struct ast_channel *new_chan); +/*! + * \brief Send StasisEnd message to the listening app + * + * \param app The app that owns the channel + * \param chan The channel for which the message is being sent + * + * \retval zero on success + * \return non-zero on failure + */ +int app_send_end_msg(struct stasis_app *app, struct ast_channel *chan); + +/*! + * \brief Handle cleanup related to StasisEnd messages + * + * \param message The message for which to clean up + */ +void app_end_message_handler(struct stasis_message *message); + +/*! + * \brief Accessor for the StasisEnd message type + */ +struct stasis_message_type *app_end_message_type(void); + #endif /* _ASTERISK_RES_STASIS_APP_H */ diff --git a/res/stasis/stasis_bridge.c b/res/stasis/stasis_bridge.c index 7229a87d5..fd984ab86 100644 --- a/res/stasis/stasis_bridge.c +++ b/res/stasis/stasis_bridge.c @@ -164,7 +164,6 @@ static int bridge_stasis_moving(struct ast_bridge_channel *bridge_channel, void { if (src->v_table == &bridge_stasis_v_table && dst->v_table != &bridge_stasis_v_table) { - RAII_VAR(struct ast_json *, blob, NULL, ast_json_unref); RAII_VAR(struct stasis_app_control *, control, NULL, ao2_cleanup); struct ast_channel *chan; @@ -176,11 +175,7 @@ static int bridge_stasis_moving(struct ast_bridge_channel *bridge_channel, void return -1; } - blob = ast_json_pack("{s: s}", "app", app_name(control_app(control))); - - stasis_app_channel_set_stasis_end_published(chan); - - ast_channel_publish_blob(chan, ast_stasis_end_message_type(), blob); + app_send_end_msg(control_app(control), chan); } return -1; -- cgit v1.2.3