diff options
Diffstat (limited to 'res')
-rw-r--r-- | res/ari/ari_model_validators.c | 45 | ||||
-rw-r--r-- | res/ari/ari_model_validators.h | 5 | ||||
-rw-r--r-- | res/res_stasis.c | 478 | ||||
-rw-r--r-- | res/stasis/app.c | 61 | ||||
-rw-r--r-- | res/stasis/app.h | 44 | ||||
-rw-r--r-- | res/stasis/command.c | 58 | ||||
-rw-r--r-- | res/stasis/command.h | 27 | ||||
-rw-r--r-- | res/stasis/control.c | 43 | ||||
-rw-r--r-- | res/stasis/control.h | 31 | ||||
-rw-r--r-- | res/stasis/stasis_bridge.c | 111 |
10 files changed, 864 insertions, 39 deletions
diff --git a/res/ari/ari_model_validators.c b/res/ari/ari_model_validators.c index 10fd3bd83..be1a244df 100644 --- a/res/ari/ari_model_validators.c +++ b/res/ari/ari_model_validators.c @@ -1845,6 +1845,15 @@ int ast_ari_validate_bridge_attended_transfer(struct ast_json *json) res = 0; } } else + if (strcmp("replace_channel", ast_json_object_iter_key(iter)) == 0) { + int prop_is_valid; + prop_is_valid = ast_ari_validate_channel( + ast_json_object_iter_value(iter)); + if (!prop_is_valid) { + ast_log(LOG_ERROR, "ARI BridgeAttendedTransfer field replace_channel failed validation\n"); + res = 0; + } + } else if (strcmp("result", ast_json_object_iter_key(iter)) == 0) { int prop_is_valid; has_result = 1; @@ -1855,6 +1864,24 @@ int ast_ari_validate_bridge_attended_transfer(struct ast_json *json) res = 0; } } else + if (strcmp("transfer_target", ast_json_object_iter_key(iter)) == 0) { + int prop_is_valid; + prop_is_valid = ast_ari_validate_channel( + ast_json_object_iter_value(iter)); + if (!prop_is_valid) { + ast_log(LOG_ERROR, "ARI BridgeAttendedTransfer field transfer_target failed validation\n"); + res = 0; + } + } else + if (strcmp("transferee", ast_json_object_iter_key(iter)) == 0) { + int prop_is_valid; + prop_is_valid = ast_ari_validate_channel( + ast_json_object_iter_value(iter)); + if (!prop_is_valid) { + ast_log(LOG_ERROR, "ARI BridgeAttendedTransfer field transferee failed validation\n"); + res = 0; + } + } else if (strcmp("transferer_first_leg", ast_json_object_iter_key(iter)) == 0) { int prop_is_valid; has_transferer_first_leg = 1; @@ -2045,6 +2072,15 @@ int ast_ari_validate_bridge_blind_transfer(struct ast_json *json) res = 0; } } else + if (strcmp("transferee", ast_json_object_iter_key(iter)) == 0) { + int prop_is_valid; + prop_is_valid = ast_ari_validate_channel( + ast_json_object_iter_value(iter)); + if (!prop_is_valid) { + ast_log(LOG_ERROR, "ARI BridgeBlindTransfer field transferee failed validation\n"); + res = 0; + } + } else { ast_log(LOG_ERROR, "ARI BridgeBlindTransfer has undocumented field %s\n", @@ -4828,6 +4864,15 @@ int ast_ari_validate_stasis_start(struct ast_json *json) res = 0; } } else + if (strcmp("replace_channel", ast_json_object_iter_key(iter)) == 0) { + int prop_is_valid; + prop_is_valid = ast_ari_validate_channel( + ast_json_object_iter_value(iter)); + if (!prop_is_valid) { + ast_log(LOG_ERROR, "ARI StasisStart field replace_channel failed validation\n"); + res = 0; + } + } else { ast_log(LOG_ERROR, "ARI StasisStart has undocumented field %s\n", diff --git a/res/ari/ari_model_validators.h b/res/ari/ari_model_validators.h index beace67b2..64dd1b071 100644 --- a/res/ari/ari_model_validators.h +++ b/res/ari/ari_model_validators.h @@ -1287,7 +1287,10 @@ ari_validator ast_ari_validate_application_fn(void); * - destination_threeway_channel: Channel * - destination_type: string (required) * - is_external: boolean (required) + * - replace_channel: Channel * - result: string (required) + * - transfer_target: Channel + * - transferee: Channel * - transferer_first_leg: Channel (required) * - transferer_first_leg_bridge: Bridge * - transferer_second_leg: Channel (required) @@ -1302,6 +1305,7 @@ ari_validator ast_ari_validate_application_fn(void); * - exten: string (required) * - is_external: boolean (required) * - result: string (required) + * - transferee: Channel * BridgeCreated * - type: string (required) * - application: string (required) @@ -1467,6 +1471,7 @@ ari_validator ast_ari_validate_application_fn(void); * - timestamp: Date * - args: List[string] (required) * - channel: Channel (required) + * - replace_channel: Channel * TextMessageReceived * - type: string (required) * - application: string (required) diff --git a/res/res_stasis.c b/res/res_stasis.c index 7b5d16f1a..3480c9e23 100644 --- a/res/res_stasis.c +++ b/res/res_stasis.c @@ -109,6 +109,31 @@ struct ao2_container *app_bridges_moh; struct ao2_container *app_bridges_playback; +static struct ast_json *stasis_end_json_payload(struct ast_channel_snapshot *snapshot, + const struct stasis_message_sanitizer *sanitize) +{ + return ast_json_pack("{s: s, s: o, s: o}", + "type", "StasisEnd", + "timestamp", ast_json_timeval(ast_tvnow(), NULL), + "channel", ast_channel_snapshot_to_json(snapshot, sanitize)); +} + +static struct ast_json *stasis_end_to_json(struct stasis_message *message, + const struct stasis_message_sanitizer *sanitize) +{ + struct ast_channel_blob *payload = stasis_message_data(message); + + if (sanitize && sanitize->channel_snapshot && + sanitize->channel_snapshot(payload->snapshot)) { + return NULL; + } + + return stasis_end_json_payload(payload->snapshot, sanitize); +} + +STASIS_MESSAGE_TYPE_DEFN(ast_stasis_end_message_type, + .to_json = stasis_end_to_json); + const char *stasis_app_name(const struct stasis_app *app) { return app_name(app); @@ -726,26 +751,121 @@ void stasis_app_bridge_destroy(const char *bridge_id) ast_bridge_destroy(bridge, 0); } -static int send_start_msg(struct stasis_app *app, struct ast_channel *chan, - int argc, char *argv[]) +struct replace_channel_store { + struct ast_channel_snapshot *snapshot; + char *app; +}; + +static void replace_channel_destroy(void *obj) { - RAII_VAR(struct ast_json *, msg, NULL, ast_json_unref); - RAII_VAR(struct ast_channel_snapshot *, snapshot, NULL, ao2_cleanup); + struct replace_channel_store *replace = obj; - struct ast_json *json_args; - int i; - struct stasis_message_sanitizer *sanitize = stasis_app_get_sanitizer(); + ao2_cleanup(replace->snapshot); + ast_free(replace->app); + ast_free(replace); +} - ast_assert(chan != NULL); +static const struct ast_datastore_info replace_channel_store_info = { + .type = "replace-channel-store", + .destroy = replace_channel_destroy, +}; - /* Set channel info */ - ast_channel_lock(chan); - snapshot = ast_channel_snapshot_create(chan); - ast_channel_unlock(chan); - if (!snapshot) { +static struct replace_channel_store *get_replace_channel_store(struct ast_channel *chan, int no_create) +{ + struct ast_datastore *datastore; + + SCOPED_CHANNELLOCK(lock, chan); + datastore = ast_channel_datastore_find(chan, &replace_channel_store_info, NULL); + if (!datastore) { + if (no_create) { + return NULL; + } + + datastore = ast_datastore_alloc(&replace_channel_store_info, NULL); + if (!datastore) { + return NULL; + } + ast_channel_datastore_add(chan, datastore); + } + + if (!datastore->data) { + datastore->data = ast_calloc(1, sizeof(struct replace_channel_store)); + } + return datastore->data; +} + +int app_set_replace_channel_snapshot(struct ast_channel *chan, struct ast_channel_snapshot *replace_snapshot) +{ + struct replace_channel_store *replace = get_replace_channel_store(chan, 0); + + if (!replace) { return -1; } + ao2_replace(replace->snapshot, replace_snapshot); + return 0; +} + +int app_set_replace_channel_app(struct ast_channel *chan, const char *replace_app) +{ + struct replace_channel_store *replace = get_replace_channel_store(chan, 0); + + if (!replace) { + return -1; + } + + ast_free(replace->app); + replace->app = NULL; + + if (replace_app) { + replace->app = ast_strdup(replace_app); + if (!replace->app) { + return -1; + } + } + + return 0; +} + +static struct ast_channel_snapshot *get_replace_channel_snapshot(struct ast_channel *chan) +{ + struct replace_channel_store *replace = get_replace_channel_store(chan, 1); + struct ast_channel_snapshot *replace_channel_snapshot; + + if (!replace) { + return NULL; + } + + replace_channel_snapshot = replace->snapshot; + replace->snapshot = NULL; + + return replace_channel_snapshot; +} + +char *app_get_replace_channel_app(struct ast_channel *chan) +{ + struct replace_channel_store *replace = get_replace_channel_store(chan, 1); + char *replace_channel_app; + + if (!replace) { + return NULL; + } + + replace_channel_app = replace->app; + replace->app = NULL; + + return replace_channel_app; +} + +static int send_start_msg_snapshots(struct stasis_app *app, + int argc, char *argv[], struct ast_channel_snapshot *snapshot, + struct ast_channel_snapshot *replace_channel_snapshot) +{ + RAII_VAR(struct ast_json *, msg, NULL, ast_json_unref); + struct ast_json *json_args; + struct stasis_message_sanitizer *sanitize = stasis_app_get_sanitizer(); + int i; + if (sanitize && sanitize->channel_snapshot && sanitize->channel_snapshot(snapshot)) { return 0; @@ -760,6 +880,15 @@ static int send_start_msg(struct stasis_app *app, struct ast_channel *chan, return -1; } + if (replace_channel_snapshot) { + int res = ast_json_object_set(msg, "replace_channel", + ast_channel_snapshot_to_json(replace_channel_snapshot, NULL)); + + if (res) { + return -1; + } + } + /* Append arguments to args array */ json_args = ast_json_object_get(msg, "args"); ast_assert(json_args != NULL); @@ -776,39 +905,213 @@ static int send_start_msg(struct stasis_app *app, struct ast_channel *chan, return 0; } -static int send_end_msg(struct stasis_app *app, struct ast_channel *chan) +static int send_start_msg(struct stasis_app *app, struct ast_channel *chan, + int argc, char *argv[]) { - RAII_VAR(struct ast_json *, msg, NULL, ast_json_unref); RAII_VAR(struct ast_channel_snapshot *, snapshot, NULL, ao2_cleanup); - struct stasis_message_sanitizer *sanitize = stasis_app_get_sanitizer(); + RAII_VAR(struct ast_channel_snapshot *, replace_channel_snapshot, + NULL, ao2_cleanup); ast_assert(chan != NULL); + replace_channel_snapshot = get_replace_channel_snapshot(chan); + /* Set channel info */ ast_channel_lock(chan); snapshot = ast_channel_snapshot_create(chan); ast_channel_unlock(chan); - if (snapshot == NULL) { + if (!snapshot) { return -1; } + return send_start_msg_snapshots(app, argc, argv, snapshot, replace_channel_snapshot); +} + +static int send_end_msg_snapshot(struct stasis_app *app, struct ast_channel_snapshot *snapshot) +{ + struct stasis_message_sanitizer *sanitize = stasis_app_get_sanitizer(); + struct ast_json *msg; if (sanitize && sanitize->channel_snapshot && sanitize->channel_snapshot(snapshot)) { return 0; } - msg = ast_json_pack("{s: s, s: o, s: o}", - "type", "StasisEnd", - "timestamp", ast_json_timeval(ast_tvnow(), NULL), - "channel", ast_channel_snapshot_to_json(snapshot, NULL)); + msg = stasis_end_json_payload(snapshot, sanitize); if (!msg) { return -1; } app_send(app, msg); + ast_json_unref(msg); + return 0; +} + +static void remove_masquerade_store(struct ast_channel *chan); + +static int masq_match_cb(void *obj, void *data, int flags) +{ + struct stasis_app_control *control = obj; + struct ast_channel *chan = data; + + if (!strcmp(ast_channel_uniqueid(chan), + stasis_app_control_get_channel_id(control))) { + return CMP_MATCH; + } + + return 0; +} + +static void channel_stolen_cb(void *data, struct ast_channel *old_chan, struct ast_channel *new_chan) +{ + struct ast_channel_snapshot *snapshot; + struct stasis_app_control *control; + + /* grab a snapshot */ + snapshot = ast_channel_snapshot_get_latest(ast_channel_uniqueid(new_chan)); + if (!snapshot) { + ast_log(LOG_ERROR, "Could not get snapshot for masqueraded channel\n"); + return; + } + + /* find control */ + control = ao2_callback(app_controls, 0, masq_match_cb, old_chan); + if (!control) { + ast_log(LOG_ERROR, "Could not find control for masqueraded channel\n"); + ao2_cleanup(snapshot); + return; + } + + /* send the StasisEnd message to the app */ + send_end_msg_snapshot(control_app(control), snapshot); + + /* remove the datastore */ + remove_masquerade_store(old_chan); + + ao2_cleanup(control); + ao2_cleanup(snapshot); +} + +static void channel_replaced_cb(void *data, struct ast_channel *old_chan, struct ast_channel *new_chan) +{ + RAII_VAR(struct ast_channel_snapshot *, new_snapshot, NULL, ao2_cleanup); + RAII_VAR(struct ast_channel_snapshot *, old_snapshot, NULL, ao2_cleanup); + struct stasis_app_control *control; + + /* At this point, new_chan is the channel pointer that is in Stasis() and + * has the unknown channel's name in it while old_chan is the channel pointer + * that is not in Stasis(), but has the guts of the channel that Stasis() knows + * about */ + + /* grab a snapshot for the channel that is jumping into Stasis() */ + new_snapshot = ast_channel_snapshot_get_latest(ast_channel_uniqueid(new_chan)); + if (!new_snapshot) { + ast_log(LOG_ERROR, "Could not get snapshot for masquerading channel\n"); + return; + } + + /* grab a snapshot for the channel that has been kicked out of Stasis() */ + old_snapshot = ast_channel_snapshot_get_latest(ast_channel_uniqueid(old_chan)); + if (!old_snapshot) { + ast_log(LOG_ERROR, "Could not get snapshot for masqueraded channel\n"); + return; + } + + /* find, unlink, and relink control since the channel has a new name and + * its hash has likely changed */ + control = ao2_callback(app_controls, OBJ_UNLINK, masq_match_cb, new_chan); + if (!control) { + ast_log(LOG_ERROR, "Could not find control for masquerading channel\n"); + return; + } + ao2_link(app_controls, control); + + + /* send the StasisStart with replace_channel to the app */ + send_start_msg_snapshots(control_app(control), 0, NULL, new_snapshot, + old_snapshot); + /* send the StasisEnd message to the app */ + send_end_msg_snapshot(control_app(control), old_snapshot); + + /* fixup channel topic forwards */ + if (app_replace_channel_forwards(control_app(control), old_snapshot->uniqueid, new_chan)) { + ast_log(LOG_ERROR, "Failed to fixup channel topic forwards for %s(%s) owned by %s\n", + old_snapshot->name, old_snapshot->uniqueid, app_name(control_app(control))); + } + ao2_cleanup(control); +} + +static const struct ast_datastore_info masquerade_store_info = { + .type = "stasis-masqerade", + .chan_fixup = channel_stolen_cb, + .chan_breakdown = channel_replaced_cb, +}; + +static int has_masquerade_store(struct ast_channel *chan) +{ + SCOPED_CHANNELLOCK(lock, chan); + return !!ast_channel_datastore_find(chan, &masquerade_store_info, NULL); +} + +static int add_masquerade_store(struct ast_channel *chan) +{ + struct ast_datastore *datastore; + + SCOPED_CHANNELLOCK(lock, chan); + if (ast_channel_datastore_find(chan, &masquerade_store_info, NULL)) { + return 0; + } + + datastore = ast_datastore_alloc(&masquerade_store_info, NULL); + if (!datastore) { + return -1; + } + + ast_channel_datastore_add(chan, datastore); + return 0; } +static void remove_masquerade_store(struct ast_channel *chan) +{ + struct ast_datastore *datastore; + + SCOPED_CHANNELLOCK(lock, chan); + datastore = ast_channel_datastore_find(chan, &masquerade_store_info, NULL); + if (!datastore) { + return; + } + + ast_channel_datastore_remove(chan, datastore); + ast_datastore_free(datastore); +} + +static int send_end_msg(struct stasis_app *app, struct ast_channel *chan) +{ + struct ast_channel_snapshot *snapshot; + int res = 0; + + ast_assert(chan != NULL); + + /* A masquerade has occurred and this message will be wrong so it + * has already been sent elsewhere. */ + if (!has_masquerade_store(chan)) { + return 0; + } + + /* Set channel info */ + snapshot = ast_channel_snapshot_get_latest(ast_channel_uniqueid(chan)); + if (!snapshot) { + return -1; + } + + if (send_end_msg_snapshot(app, snapshot)) { + res = -1; + } + + ao2_cleanup(snapshot); + return res; +} + void stasis_app_control_execute_until_exhausted(struct ast_channel *chan, struct stasis_app_control *control) { while (!control_is_done(control)) { @@ -837,6 +1140,46 @@ int stasis_app_control_is_done(struct stasis_app_control *control) return control_is_done(control); } +struct ast_datastore_info set_end_published_info = { + .type = "stasis_end_published", +}; + +void stasis_app_channel_set_stasis_end_published(struct ast_channel *chan) +{ + struct ast_datastore *datastore; + + datastore = ast_datastore_alloc(&set_end_published_info, NULL); + + ast_channel_lock(chan); + ast_channel_datastore_add(chan, datastore); + ast_channel_unlock(chan); +} + +int stasis_app_channel_is_stasis_end_published(struct ast_channel *chan) +{ + struct ast_datastore *datastore; + + ast_channel_lock(chan); + datastore = ast_channel_datastore_find(chan, &set_end_published_info, NULL); + ast_channel_unlock(chan); + + return datastore ? 1 : 0; +} + +static void remove_stasis_end_published(struct ast_channel *chan) +{ + struct ast_datastore *datastore; + + ast_channel_lock(chan); + datastore = ast_channel_datastore_find(chan, &set_end_published_info, NULL); + ast_channel_unlock(chan); + + if (datastore) { + ast_channel_datastore_remove(chan, datastore); + ast_datastore_free(datastore); + } +} + /*! /brief Stasis dialplan application callback */ int stasis_app_exec(struct ast_channel *chan, const char *app_name, int argc, char *argv[]) @@ -850,6 +1193,11 @@ int stasis_app_exec(struct ast_channel *chan, const char *app_name, int argc, ast_assert(chan != NULL); + /* Just in case there's a lingering indication that the channel has had a stasis + * end published on it, remove that now. + */ + remove_stasis_end_published(chan); + app = ao2_find(apps_registry, app_name, OBJ_SEARCH_KEY); if (!app) { ast_log(LOG_ERROR, @@ -869,10 +1217,16 @@ int stasis_app_exec(struct ast_channel *chan, const char *app_name, int argc, } ao2_link(app_controls, control); + if (add_masquerade_store(chan)) { + ast_log(LOG_ERROR, "Failed to attach masquerade detector\n"); + return -1; + } + res = send_start_msg(app, chan, argc, argv); if (res != 0) { ast_log(LOG_ERROR, "Error sending start message to '%s'\n", app_name); + remove_masquerade_store(chan); return -1; } @@ -880,9 +1234,13 @@ int stasis_app_exec(struct ast_channel *chan, const char *app_name, int argc, if (res != 0) { ast_log(LOG_ERROR, "Error subscribing app '%s' to channel '%s'\n", app_name, ast_channel_name(chan)); + remove_masquerade_store(chan); return -1; } + /* Pull queued prestart commands and execute */ + control_prestart_dispatch_all(control, chan); + while (!control_is_done(control)) { RAII_VAR(struct ast_frame *, f, NULL, ast_frame_dtor); int r; @@ -948,14 +1306,20 @@ int stasis_app_exec(struct ast_channel *chan, const char *app_name, int argc, } app_unsubscribe_bridge(app, stasis_app_get_bridge(control)); - app_unsubscribe_channel(app, chan); ao2_cleanup(bridge); - res = send_end_msg(app, chan); - if (res != 0) { - ast_log(LOG_ERROR, - "Error sending end message to %s\n", app_name); - return res; + /* Only publish a stasis_end event if it hasn't already been published */ + if (!stasis_app_channel_is_stasis_end_published(chan)) { + app_unsubscribe_channel(app, chan); + res = send_end_msg(app, chan); + remove_masquerade_store(chan); + if (res != 0) { + ast_log(LOG_ERROR, + "Error sending end message to %s\n", app_name); + return res; + } + } else { + remove_stasis_end_published(chan); } /* There's an off chance that app is ready for cleanup. Go ahead @@ -1434,8 +1798,15 @@ void stasis_app_unref(void) ast_module_unref(ast_module_info->self); } +/*! + * \brief Subscription to StasisEnd events + */ +struct stasis_subscription *stasis_end_sub; + static int unload_module(void) { + stasis_end_sub = stasis_unsubscribe(stasis_end_sub); + stasis_app_unregister_event_sources(); messaging_cleanup(); @@ -1455,6 +1826,8 @@ static int unload_module(void) ao2_cleanup(app_bridges_playback); app_bridges_playback = NULL; + STASIS_MESSAGE_TYPE_CLEANUP(ast_stasis_end_message_type); + return 0; } @@ -1486,8 +1859,53 @@ struct stasis_message_sanitizer *stasis_app_get_sanitizer(void) return &app_sanitizer; } +static void remove_masquerade_store_by_name(const char *channel_name) +{ + struct ast_channel *chan; + + chan = ast_channel_get_by_name(channel_name); + if (!chan) { + return; + } + + remove_masquerade_store(chan); + ast_channel_unref(chan); +} + +static void check_for_stasis_end(void *data, struct stasis_subscription *sub, + struct stasis_message *message) +{ + struct ast_channel_blob *payload; + struct ast_channel_snapshot *snapshot; + const char *app_name; + char *channel_uri; + size_t alloc_size; + const char *channels[1]; + + if (stasis_message_type(message) != ast_stasis_end_message_type()) { + return; + } + + payload = stasis_message_data(message); + snapshot = payload->snapshot; + app_name = ast_json_string_get(ast_json_object_get(payload->blob, "app")); + + /* +8 is for the length of "channel:" */ + alloc_size = AST_MAX_UNIQUEID + 8; + channel_uri = ast_alloca(alloc_size); + snprintf(channel_uri, alloc_size, "channel:%s", snapshot->uniqueid); + + channels[0] = channel_uri; + stasis_app_unsubscribe(app_name, channels, ARRAY_LEN(channels), NULL); + + remove_masquerade_store_by_name(snapshot->name); +} + static int load_module(void) { + if (STASIS_MESSAGE_TYPE_INIT(ast_stasis_end_message_type) != 0) { + return AST_MODULE_LOAD_DECLINE; + } apps_registry = ao2_container_alloc(APPS_NUM_BUCKETS, app_hash, app_compare); app_controls = ao2_container_alloc(CONTROLS_NUM_BUCKETS, control_hash, control_compare); app_bridges = ao2_container_alloc(BRIDGES_NUM_BUCKETS, bridges_hash, bridges_compare); @@ -1511,6 +1929,12 @@ static int load_module(void) stasis_app_register_event_sources(); + stasis_end_sub = stasis_subscribe(ast_channel_topic_all(), check_for_stasis_end, NULL); + if (!stasis_end_sub) { + unload_module(); + return AST_MODULE_LOAD_DECLINE; + } + return AST_MODULE_LOAD_SUCCESS; } diff --git a/res/stasis/app.c b/res/stasis/app.c index 7e7911b9c..745969615 100644 --- a/res/stasis/app.c +++ b/res/stasis/app.c @@ -28,6 +28,7 @@ ASTERISK_FILE_VERSION(__FILE__, "$Revision$") #include "app.h" +#include "control.h" #include "messaging.h" #include "asterisk/callerid.h" @@ -699,14 +700,32 @@ static void bridge_blind_transfer_handler(void *data, struct stasis_subscription struct stasis_message *message) { struct stasis_app *app = data; - struct ast_bridge_blob *blob = stasis_message_data(message); + struct ast_blind_transfer_message *transfer_msg = stasis_message_data(message); + struct ast_bridge_snapshot *bridge = transfer_msg->to_transferee.bridge_snapshot; - if (bridge_app_subscribed(app, blob->channel->uniqueid) || - (blob->bridge && bridge_app_subscribed_involved(app, blob->bridge))) { + if (bridge_app_subscribed(app, transfer_msg->to_transferee.channel_snapshot->uniqueid) || + (bridge && bridge_app_subscribed_involved(app, bridge))) { stasis_publish(app->topic, message); } } +static void set_replacement_channel(struct ast_channel_snapshot *to_be_replaced, + struct ast_channel_snapshot *replacing) +{ + struct stasis_app_control *control = stasis_app_control_find_by_channel_id( + to_be_replaced->uniqueid); + struct ast_channel *chan = ast_channel_get_by_name(replacing->uniqueid); + + if (control && chan) { + ast_channel_lock(chan); + app_set_replace_channel_app(chan, app_name(control_app(control))); + app_set_replace_channel_snapshot(chan, to_be_replaced); + ast_channel_unlock(chan); + } + ast_channel_cleanup(chan); + ao2_cleanup(control); +} + static void bridge_attended_transfer_handler(void *data, struct stasis_subscription *sub, struct stasis_message *message) { @@ -751,6 +770,18 @@ static void bridge_attended_transfer_handler(void *data, struct stasis_subscript if (subscribed) { stasis_publish(app->topic, message); } + + if (transfer_msg->replace_channel) { + set_replacement_channel(transfer_msg->to_transferee.channel_snapshot, + transfer_msg->replace_channel); + } + + if (transfer_msg->dest_type == AST_ATTENDED_TRANSFER_DEST_LINK) { + set_replacement_channel(transfer_msg->to_transferee.channel_snapshot, + transfer_msg->dest.links[0]); + set_replacement_channel(transfer_msg->to_transfer_target.channel_snapshot, + transfer_msg->dest.links[1]); + } } static void bridge_default_handler(void *data, struct stasis_subscription *sub, @@ -1091,6 +1122,30 @@ int app_is_subscribed_channel_id(struct stasis_app *app, const char *channel_id) return forwards != NULL; } +int app_replace_channel_forwards(struct stasis_app *app, const char *old_id, struct ast_channel *new_chan) +{ + RAII_VAR(struct app_forwards *, old_forwards, NULL, ao2_cleanup); + struct app_forwards *new_forwards; + + old_forwards = ao2_find(app->forwards, old_id, OBJ_SEARCH_KEY | OBJ_UNLINK); + if (!old_forwards) { + return -1; + } + + new_forwards = forwards_create_channel(app, new_chan); + if (!new_forwards) { + return -1; + } + + new_forwards->interested = old_forwards->interested; + ao2_link_flags(app->forwards, new_forwards, 0); + ao2_cleanup(new_forwards); + + /* Clean up old forwards */ + forwards_unsubscribe(old_forwards); + return 0; +} + static void *channel_find(const struct stasis_app *app, const char *id) { return ast_channel_get_by_name(id); diff --git a/res/stasis/app.h b/res/stasis/app.h index 419ec54a8..1ab6097a7 100644 --- a/res/stasis/app.h +++ b/res/stasis/app.h @@ -226,4 +226,48 @@ int app_unsubscribe_endpoint_id(struct stasis_app *app, const char *endpoint_id) */ int app_is_subscribed_endpoint_id(struct stasis_app *app, const char *endpoint_id); +/*! + * \brief Set the snapshot of the channel that this channel will replace + * + * \param channel The channel on which this will be set + * \param replace_snapshot The snapshot of the channel that is being replaced + * + * \retval zero success + * \retval non-zero failure + */ +int app_set_replace_channel_snapshot(struct ast_channel *chan, struct ast_channel_snapshot *replace_snapshot); + +/*! + * \brief Set the app that the replacement channel will be controlled by + * + * \param channel The channel on which this will be set + * \param replace_app The app that will be controlling this channel + * + * \retval zero success + * \retval non-zero failure + */ +int app_set_replace_channel_app(struct ast_channel *chan, const char *replace_app); + +/*! + * \brief Get the app that the replacement channel will be controlled by + * + * \param channel The channel on which this will be set + * + * \retval NULL on error + * \return the name of the controlling app (must be ast_free()d) + */ +char *app_get_replace_channel_app(struct ast_channel *chan); + +/*! + * \brief Replace channel topic forwards for the old channel with forwards for the new channel + * + * \param app The app that owns the channel + * \param old_id The unique ID of the channel to be replaced + * \param new_chan The channel that is replacing the old one + * + * \retval zero on success + * \return non-zero on failure + */ +int app_replace_channel_forwards(struct stasis_app *app, const char *old_id, struct ast_channel *new_chan); + #endif /* _ASTERISK_RES_STASIS_APP_H */ diff --git a/res/stasis/command.c b/res/stasis/command.c index a9e53af12..867de180a 100644 --- a/res/stasis/command.c +++ b/res/stasis/command.c @@ -93,3 +93,61 @@ void command_invoke(struct stasis_app_command *command, command_complete(command, retval); } +static void command_queue_prestart_destroy(void *obj) +{ + /* Clean up the container */ + ao2_cleanup(obj); +} + +static const struct ast_datastore_info command_queue_prestart = { + .type = "stasis-command-prestart-queue", + .destroy = command_queue_prestart_destroy, +}; + +int command_prestart_queue_command(struct ast_channel *chan, + stasis_app_command_cb command_fn, void *data) +{ + struct ast_datastore *datastore; + struct ao2_container *command_queue; + RAII_VAR(struct stasis_app_command *, command, + command_create(command_fn, data), ao2_cleanup); + + if (!command) { + return -1; + } + + datastore = ast_channel_datastore_find(chan, &command_queue_prestart, NULL); + if (datastore) { + command_queue = datastore->data; + ao2_link(command_queue, command); + return 0; + } + + command_queue = ao2_container_alloc_list( + AO2_ALLOC_OPT_LOCK_MUTEX, 0, NULL, NULL); + if (!command_queue) { + return -1; + } + + datastore = ast_datastore_alloc(&command_queue_prestart, NULL); + if (!datastore) { + ao2_cleanup(command_queue); + return -1; + } + ast_channel_datastore_add(chan, datastore); + + datastore->data = command_queue; + ao2_link(command_queue, command); + + return 0; +} + +struct ao2_container *command_prestart_get_container(struct ast_channel *chan) +{ + struct ast_datastore *datastore = ast_channel_datastore_find(chan, &command_queue_prestart, NULL); + if (!datastore) { + return NULL; + } + + return ao2_bump(datastore->data); +} diff --git a/res/stasis/command.h b/res/stasis/command.h index a99d40d0a..7f12ab36f 100644 --- a/res/stasis/command.h +++ b/res/stasis/command.h @@ -41,4 +41,31 @@ void command_invoke(struct stasis_app_command *command, int command_join(struct stasis_app_command *command); +/*! + * \brief Queue a Stasis() prestart command for a channel + * + * \pre chan must be locked + * + * \param chan The channel on which to queue the prestart command + * \param command_fn The callback to call for the command + * \param data The data to pass to the command callback + * + * \retval zero on success + * \retval non-zero on failure + */ +int command_prestart_queue_command(struct ast_channel *chan, + stasis_app_command_cb command_fn, void *data); + +/*! + * \brief Get the Stasis() prestart commands for a channel + * + * \pre chan must be locked + * + * \param chan The channel from which to get prestart commands + * + * \return The command prestart container for chan (must be ao2_cleanup()'d) + */ +struct ao2_container *command_prestart_get_container(struct ast_channel *chan); + + #endif /* _ASTERISK_RES_STASIS_CONTROL_H */ diff --git a/res/stasis/control.c b/res/stasis/control.c index 8802e8128..0a9669d3b 100644 --- a/res/stasis/control.c +++ b/res/stasis/control.c @@ -276,10 +276,6 @@ struct stasis_app_control_dial_data { int timeout; }; -static int app_control_add_channel_to_bridge( - struct stasis_app_control *control, - struct ast_channel *chan, void *data); - static int app_control_dial(struct stasis_app_control *control, struct ast_channel *chan, void *data) { @@ -322,7 +318,7 @@ static int app_control_dial(struct stasis_app_control *control, AST_BRIDGE_IMPART_CHAN_INDEPENDENT)) { ast_hangup(new_chan); } else { - app_control_add_channel_to_bridge(control, chan, bridge); + control_add_channel_to_bridge(control, chan, bridge); } return 0; @@ -855,7 +851,7 @@ static void bridge_after_cb_failed(enum ast_bridge_after_cb_reason reason, ast_bridge_after_cb_reason_string(reason)); } -static int app_control_add_channel_to_bridge( +int control_add_channel_to_bridge( struct stasis_app_control *control, struct ast_channel *chan, void *data) { @@ -935,7 +931,7 @@ int stasis_app_control_add_channel_to_bridge( stasis_app_control_get_channel_id(control)); return app_send_command_on_condition( - control, app_control_add_channel_to_bridge, bridge, + control, control_add_channel_to_bridge, bridge, app_control_can_add_channel_to_bridge); } @@ -1036,3 +1032,36 @@ void control_wait(struct stasis_app_control *control) } ao2_unlock(control->command_queue); } + +int control_prestart_dispatch_all(struct stasis_app_control *control, + struct ast_channel *chan) +{ + struct ao2_container *command_queue; + int count = 0; + struct ao2_iterator iter; + struct stasis_app_command *command; + + ast_channel_lock(chan); + command_queue = command_prestart_get_container(chan); + ast_channel_unlock(chan); + if (!command_queue) { + return 0; + } + + iter = ao2_iterator_init(command_queue, AO2_ITERATOR_UNLINK); + + while ((command = ao2_iterator_next(&iter))) { + command_invoke(command, control, chan); + ao2_cleanup(command); + ++count; + } + + ao2_iterator_destroy(&iter); + ao2_cleanup(command_queue); + return count; +} + +struct stasis_app *control_app(struct stasis_app_control *control) +{ + return control->app; +} diff --git a/res/stasis/control.h b/res/stasis/control.h index 0febd8438..a139f82e4 100644 --- a/res/stasis/control.h +++ b/res/stasis/control.h @@ -77,5 +77,36 @@ int control_is_done(struct stasis_app_control *control); void control_mark_done(struct stasis_app_control *control); +/*! + * \brief Dispatch all queued prestart commands + * + * \param control The control for chan + * \param channel The channel on which commands should be executed + * + * \return The number of commands executed + */ +int control_prestart_dispatch_all(struct stasis_app_control *control, + struct ast_channel *chan); + +/*! + * \brief Returns the pointer (non-reffed) to the app associated with this control + * + * \param control Control to query. + * + * \returns A pointer to the associated stasis_app + */ +struct stasis_app *control_app(struct stasis_app_control *control); + +/*! + * \brief Command callback for adding a channel to a bridge + * + * \param control The control for chan + * \param channel The channel on which commands should be executed + * \param bridge Data to be passed to the callback + */ +int control_add_channel_to_bridge( + struct stasis_app_control *control, + struct ast_channel *chan, void *obj); + #endif /* _ASTERISK_RES_STASIS_CONTROL_H */ diff --git a/res/stasis/stasis_bridge.c b/res/stasis/stasis_bridge.c index c3a266a11..be7836d35 100644 --- a/res/stasis/stasis_bridge.c +++ b/res/stasis/stasis_bridge.c @@ -32,11 +32,73 @@ ASTERISK_FILE_VERSION(__FILE__, "$Revision$") #include "asterisk/bridge.h" +#include "asterisk/bridge_after.h" #include "asterisk/bridge_internal.h" +#include "asterisk/bridge_features.h" +#include "asterisk/stasis_app.h" +#include "asterisk/stasis_channels.h" #include "stasis_bridge.h" +#include "control.h" +#include "command.h" +#include "app.h" +#include "asterisk/stasis_app.h" +#include "asterisk/pbx.h" /* ------------------------------------------------------------------- */ +static struct ast_bridge_methods bridge_stasis_v_table; + +static void bridge_stasis_run_cb(struct ast_channel *chan, void *data) +{ + RAII_VAR(char *, app_name, NULL, ast_free); + struct ast_app *app_stasis; + + /* Take ownership of the swap_app memory from the datastore */ + app_name = app_get_replace_channel_app(chan); + if (!app_name) { + ast_log(LOG_ERROR, "Failed to get app name for %s (%p)\n", ast_channel_name(chan), chan); + return; + } + + /* find Stasis() */ + app_stasis = pbx_findapp("Stasis"); + if (!app_stasis) { + ast_log(LOG_WARNING, "Could not find application (Stasis)\n"); + return; + } + + if (ast_check_hangup_locked(chan)) { + /* channel hungup, don't run Stasis() */ + return; + } + + /* run Stasis() */ + pbx_exec(chan, app_stasis, app_name); +} + +static int add_channel_to_bridge( + struct stasis_app_control *control, + struct ast_channel *chan, void *obj) +{ + struct ast_bridge *bridge = obj; + int res; + + res = control_add_channel_to_bridge(control, + chan, bridge); + ao2_cleanup(bridge); + return res; +} + +static void bridge_stasis_queue_join_action(struct ast_bridge *self, + struct ast_bridge_channel *bridge_channel) +{ + ast_channel_lock(bridge_channel->chan); + if (command_prestart_queue_command(bridge_channel->chan, add_channel_to_bridge, ao2_bump(self))) { + ao2_cleanup(self); + } + ast_channel_unlock(bridge_channel->chan); +} + /*! * \internal * \brief Push this channel into the Stasis bridge. @@ -53,6 +115,24 @@ ASTERISK_FILE_VERSION(__FILE__, "$Revision$") */ static int bridge_stasis_push(struct ast_bridge *self, struct ast_bridge_channel *bridge_channel, struct ast_bridge_channel *swap) { + struct stasis_app_control *control = stasis_app_control_find_by_channel(bridge_channel->chan); + + if (!control) { + /* channel not in Stasis(), get it there */ + /* Attach after-bridge callback and pass ownership of swap_app to it */ + if (ast_bridge_set_after_callback(bridge_channel->chan, + bridge_stasis_run_cb, NULL, NULL)) { + ast_log(LOG_ERROR, "Failed to set after bridge callback\n"); + return -1; + } + + bridge_stasis_queue_join_action(self, bridge_channel); + + /* Return -1 so the push fails and the after-bridge callback gets called */ + return -1; + } + + ao2_cleanup(control); if (self->allowed_capabilities & STASIS_BRIDGE_MIXING_CAPABILITIES) { ast_bridge_channel_update_linkedids(bridge_channel, swap); if (ast_test_flag(&self->feature_flags, AST_BRIDGE_FLAG_SMART)) { @@ -63,6 +143,33 @@ static int bridge_stasis_push(struct ast_bridge *self, struct ast_bridge_channel return ast_bridge_base_v_table.push(self, bridge_channel, swap); } +static int bridge_stasis_moving(struct ast_bridge_channel *bridge_channel, void *hook_pvt, + struct ast_bridge *src, struct ast_bridge *dst) +{ + if (src->v_table == &bridge_stasis_v_table && + dst->v_table != &bridge_stasis_v_table) { + RAII_VAR(struct ast_json *, blob, NULL, ast_json_unref); + RAII_VAR(struct stasis_app_control *, control, NULL, ao2_cleanup); + struct ast_channel *chan; + + chan = bridge_channel->chan; + ast_assert(chan != NULL); + + control = stasis_app_control_find_by_channel(chan); + if (!control) { + return -1; + } + + blob = ast_json_pack("{s: s}", "app", app_name(control_app(control))); + + stasis_app_channel_set_stasis_end_published(chan); + + ast_channel_publish_blob(chan, ast_stasis_end_message_type(), blob); + } + + return -1; +} + /*! * \internal * \brief Pull this channel from the Stasis bridge. @@ -82,11 +189,11 @@ static void bridge_stasis_pull(struct ast_bridge *self, struct ast_bridge_channe ast_bridge_channel_update_accountcodes(NULL, bridge_channel); } + ast_bridge_move_hook(bridge_channel->features, bridge_stasis_moving, NULL, NULL, 0); + ast_bridge_base_v_table.pull(self, bridge_channel); } -static struct ast_bridge_methods bridge_stasis_v_table; - struct ast_bridge *bridge_stasis_new(uint32_t capabilities, unsigned int flags, const char *name, const char *id) { void *bridge; |