summaryrefslogtreecommitdiff
path: root/res/res_stasis.c
diff options
context:
space:
mode:
Diffstat (limited to 'res/res_stasis.c')
-rw-r--r--res/res_stasis.c478
1 files changed, 451 insertions, 27 deletions
diff --git a/res/res_stasis.c b/res/res_stasis.c
index 7b5d16f1a..3480c9e23 100644
--- a/res/res_stasis.c
+++ b/res/res_stasis.c
@@ -109,6 +109,31 @@ struct ao2_container *app_bridges_moh;
struct ao2_container *app_bridges_playback;
+static struct ast_json *stasis_end_json_payload(struct ast_channel_snapshot *snapshot,
+ const struct stasis_message_sanitizer *sanitize)
+{
+ return ast_json_pack("{s: s, s: o, s: o}",
+ "type", "StasisEnd",
+ "timestamp", ast_json_timeval(ast_tvnow(), NULL),
+ "channel", ast_channel_snapshot_to_json(snapshot, sanitize));
+}
+
+static struct ast_json *stasis_end_to_json(struct stasis_message *message,
+ const struct stasis_message_sanitizer *sanitize)
+{
+ struct ast_channel_blob *payload = stasis_message_data(message);
+
+ if (sanitize && sanitize->channel_snapshot &&
+ sanitize->channel_snapshot(payload->snapshot)) {
+ return NULL;
+ }
+
+ return stasis_end_json_payload(payload->snapshot, sanitize);
+}
+
+STASIS_MESSAGE_TYPE_DEFN(ast_stasis_end_message_type,
+ .to_json = stasis_end_to_json);
+
const char *stasis_app_name(const struct stasis_app *app)
{
return app_name(app);
@@ -726,26 +751,121 @@ void stasis_app_bridge_destroy(const char *bridge_id)
ast_bridge_destroy(bridge, 0);
}
-static int send_start_msg(struct stasis_app *app, struct ast_channel *chan,
- int argc, char *argv[])
+struct replace_channel_store {
+ struct ast_channel_snapshot *snapshot;
+ char *app;
+};
+
+static void replace_channel_destroy(void *obj)
{
- RAII_VAR(struct ast_json *, msg, NULL, ast_json_unref);
- RAII_VAR(struct ast_channel_snapshot *, snapshot, NULL, ao2_cleanup);
+ struct replace_channel_store *replace = obj;
- struct ast_json *json_args;
- int i;
- struct stasis_message_sanitizer *sanitize = stasis_app_get_sanitizer();
+ ao2_cleanup(replace->snapshot);
+ ast_free(replace->app);
+ ast_free(replace);
+}
- ast_assert(chan != NULL);
+static const struct ast_datastore_info replace_channel_store_info = {
+ .type = "replace-channel-store",
+ .destroy = replace_channel_destroy,
+};
- /* Set channel info */
- ast_channel_lock(chan);
- snapshot = ast_channel_snapshot_create(chan);
- ast_channel_unlock(chan);
- if (!snapshot) {
+static struct replace_channel_store *get_replace_channel_store(struct ast_channel *chan, int no_create)
+{
+ struct ast_datastore *datastore;
+
+ SCOPED_CHANNELLOCK(lock, chan);
+ datastore = ast_channel_datastore_find(chan, &replace_channel_store_info, NULL);
+ if (!datastore) {
+ if (no_create) {
+ return NULL;
+ }
+
+ datastore = ast_datastore_alloc(&replace_channel_store_info, NULL);
+ if (!datastore) {
+ return NULL;
+ }
+ ast_channel_datastore_add(chan, datastore);
+ }
+
+ if (!datastore->data) {
+ datastore->data = ast_calloc(1, sizeof(struct replace_channel_store));
+ }
+ return datastore->data;
+}
+
+int app_set_replace_channel_snapshot(struct ast_channel *chan, struct ast_channel_snapshot *replace_snapshot)
+{
+ struct replace_channel_store *replace = get_replace_channel_store(chan, 0);
+
+ if (!replace) {
return -1;
}
+ ao2_replace(replace->snapshot, replace_snapshot);
+ return 0;
+}
+
+int app_set_replace_channel_app(struct ast_channel *chan, const char *replace_app)
+{
+ struct replace_channel_store *replace = get_replace_channel_store(chan, 0);
+
+ if (!replace) {
+ return -1;
+ }
+
+ ast_free(replace->app);
+ replace->app = NULL;
+
+ if (replace_app) {
+ replace->app = ast_strdup(replace_app);
+ if (!replace->app) {
+ return -1;
+ }
+ }
+
+ return 0;
+}
+
+static struct ast_channel_snapshot *get_replace_channel_snapshot(struct ast_channel *chan)
+{
+ struct replace_channel_store *replace = get_replace_channel_store(chan, 1);
+ struct ast_channel_snapshot *replace_channel_snapshot;
+
+ if (!replace) {
+ return NULL;
+ }
+
+ replace_channel_snapshot = replace->snapshot;
+ replace->snapshot = NULL;
+
+ return replace_channel_snapshot;
+}
+
+char *app_get_replace_channel_app(struct ast_channel *chan)
+{
+ struct replace_channel_store *replace = get_replace_channel_store(chan, 1);
+ char *replace_channel_app;
+
+ if (!replace) {
+ return NULL;
+ }
+
+ replace_channel_app = replace->app;
+ replace->app = NULL;
+
+ return replace_channel_app;
+}
+
+static int send_start_msg_snapshots(struct stasis_app *app,
+ int argc, char *argv[], struct ast_channel_snapshot *snapshot,
+ struct ast_channel_snapshot *replace_channel_snapshot)
+{
+ RAII_VAR(struct ast_json *, msg, NULL, ast_json_unref);
+ struct ast_json *json_args;
+ struct stasis_message_sanitizer *sanitize = stasis_app_get_sanitizer();
+ int i;
+
if (sanitize && sanitize->channel_snapshot
&& sanitize->channel_snapshot(snapshot)) {
return 0;
@@ -760,6 +880,15 @@ static int send_start_msg(struct stasis_app *app, struct ast_channel *chan,
return -1;
}
+ if (replace_channel_snapshot) {
+ int res = ast_json_object_set(msg, "replace_channel",
+ ast_channel_snapshot_to_json(replace_channel_snapshot, NULL));
+
+ if (res) {
+ return -1;
+ }
+ }
+
/* Append arguments to args array */
json_args = ast_json_object_get(msg, "args");
ast_assert(json_args != NULL);
@@ -776,39 +905,213 @@ static int send_start_msg(struct stasis_app *app, struct ast_channel *chan,
return 0;
}
-static int send_end_msg(struct stasis_app *app, struct ast_channel *chan)
+static int send_start_msg(struct stasis_app *app, struct ast_channel *chan,
+ int argc, char *argv[])
{
- RAII_VAR(struct ast_json *, msg, NULL, ast_json_unref);
RAII_VAR(struct ast_channel_snapshot *, snapshot, NULL, ao2_cleanup);
- struct stasis_message_sanitizer *sanitize = stasis_app_get_sanitizer();
+ RAII_VAR(struct ast_channel_snapshot *, replace_channel_snapshot,
+ NULL, ao2_cleanup);
ast_assert(chan != NULL);
+ replace_channel_snapshot = get_replace_channel_snapshot(chan);
+
/* Set channel info */
ast_channel_lock(chan);
snapshot = ast_channel_snapshot_create(chan);
ast_channel_unlock(chan);
- if (snapshot == NULL) {
+ if (!snapshot) {
return -1;
}
+ return send_start_msg_snapshots(app, argc, argv, snapshot, replace_channel_snapshot);
+}
+
+static int send_end_msg_snapshot(struct stasis_app *app, struct ast_channel_snapshot *snapshot)
+{
+ struct stasis_message_sanitizer *sanitize = stasis_app_get_sanitizer();
+ struct ast_json *msg;
if (sanitize && sanitize->channel_snapshot
&& sanitize->channel_snapshot(snapshot)) {
return 0;
}
- msg = ast_json_pack("{s: s, s: o, s: o}",
- "type", "StasisEnd",
- "timestamp", ast_json_timeval(ast_tvnow(), NULL),
- "channel", ast_channel_snapshot_to_json(snapshot, NULL));
+ msg = stasis_end_json_payload(snapshot, sanitize);
if (!msg) {
return -1;
}
app_send(app, msg);
+ ast_json_unref(msg);
+ return 0;
+}
+
+static void remove_masquerade_store(struct ast_channel *chan);
+
+static int masq_match_cb(void *obj, void *data, int flags)
+{
+ struct stasis_app_control *control = obj;
+ struct ast_channel *chan = data;
+
+ if (!strcmp(ast_channel_uniqueid(chan),
+ stasis_app_control_get_channel_id(control))) {
+ return CMP_MATCH;
+ }
+
+ return 0;
+}
+
+static void channel_stolen_cb(void *data, struct ast_channel *old_chan, struct ast_channel *new_chan)
+{
+ struct ast_channel_snapshot *snapshot;
+ struct stasis_app_control *control;
+
+ /* grab a snapshot */
+ snapshot = ast_channel_snapshot_get_latest(ast_channel_uniqueid(new_chan));
+ if (!snapshot) {
+ ast_log(LOG_ERROR, "Could not get snapshot for masqueraded channel\n");
+ return;
+ }
+
+ /* find control */
+ control = ao2_callback(app_controls, 0, masq_match_cb, old_chan);
+ if (!control) {
+ ast_log(LOG_ERROR, "Could not find control for masqueraded channel\n");
+ ao2_cleanup(snapshot);
+ return;
+ }
+
+ /* send the StasisEnd message to the app */
+ send_end_msg_snapshot(control_app(control), snapshot);
+
+ /* remove the datastore */
+ remove_masquerade_store(old_chan);
+
+ ao2_cleanup(control);
+ ao2_cleanup(snapshot);
+}
+
+static void channel_replaced_cb(void *data, struct ast_channel *old_chan, struct ast_channel *new_chan)
+{
+ RAII_VAR(struct ast_channel_snapshot *, new_snapshot, NULL, ao2_cleanup);
+ RAII_VAR(struct ast_channel_snapshot *, old_snapshot, NULL, ao2_cleanup);
+ struct stasis_app_control *control;
+
+ /* At this point, new_chan is the channel pointer that is in Stasis() and
+ * has the unknown channel's name in it while old_chan is the channel pointer
+ * that is not in Stasis(), but has the guts of the channel that Stasis() knows
+ * about */
+
+ /* grab a snapshot for the channel that is jumping into Stasis() */
+ new_snapshot = ast_channel_snapshot_get_latest(ast_channel_uniqueid(new_chan));
+ if (!new_snapshot) {
+ ast_log(LOG_ERROR, "Could not get snapshot for masquerading channel\n");
+ return;
+ }
+
+ /* grab a snapshot for the channel that has been kicked out of Stasis() */
+ old_snapshot = ast_channel_snapshot_get_latest(ast_channel_uniqueid(old_chan));
+ if (!old_snapshot) {
+ ast_log(LOG_ERROR, "Could not get snapshot for masqueraded channel\n");
+ return;
+ }
+
+ /* find, unlink, and relink control since the channel has a new name and
+ * its hash has likely changed */
+ control = ao2_callback(app_controls, OBJ_UNLINK, masq_match_cb, new_chan);
+ if (!control) {
+ ast_log(LOG_ERROR, "Could not find control for masquerading channel\n");
+ return;
+ }
+ ao2_link(app_controls, control);
+
+
+ /* send the StasisStart with replace_channel to the app */
+ send_start_msg_snapshots(control_app(control), 0, NULL, new_snapshot,
+ old_snapshot);
+ /* send the StasisEnd message to the app */
+ send_end_msg_snapshot(control_app(control), old_snapshot);
+
+ /* fixup channel topic forwards */
+ if (app_replace_channel_forwards(control_app(control), old_snapshot->uniqueid, new_chan)) {
+ ast_log(LOG_ERROR, "Failed to fixup channel topic forwards for %s(%s) owned by %s\n",
+ old_snapshot->name, old_snapshot->uniqueid, app_name(control_app(control)));
+ }
+ ao2_cleanup(control);
+}
+
+static const struct ast_datastore_info masquerade_store_info = {
+ .type = "stasis-masqerade",
+ .chan_fixup = channel_stolen_cb,
+ .chan_breakdown = channel_replaced_cb,
+};
+
+static int has_masquerade_store(struct ast_channel *chan)
+{
+ SCOPED_CHANNELLOCK(lock, chan);
+ return !!ast_channel_datastore_find(chan, &masquerade_store_info, NULL);
+}
+
+static int add_masquerade_store(struct ast_channel *chan)
+{
+ struct ast_datastore *datastore;
+
+ SCOPED_CHANNELLOCK(lock, chan);
+ if (ast_channel_datastore_find(chan, &masquerade_store_info, NULL)) {
+ return 0;
+ }
+
+ datastore = ast_datastore_alloc(&masquerade_store_info, NULL);
+ if (!datastore) {
+ return -1;
+ }
+
+ ast_channel_datastore_add(chan, datastore);
+
return 0;
}
+static void remove_masquerade_store(struct ast_channel *chan)
+{
+ struct ast_datastore *datastore;
+
+ SCOPED_CHANNELLOCK(lock, chan);
+ datastore = ast_channel_datastore_find(chan, &masquerade_store_info, NULL);
+ if (!datastore) {
+ return;
+ }
+
+ ast_channel_datastore_remove(chan, datastore);
+ ast_datastore_free(datastore);
+}
+
+static int send_end_msg(struct stasis_app *app, struct ast_channel *chan)
+{
+ struct ast_channel_snapshot *snapshot;
+ int res = 0;
+
+ ast_assert(chan != NULL);
+
+ /* A masquerade has occurred and this message will be wrong so it
+ * has already been sent elsewhere. */
+ if (!has_masquerade_store(chan)) {
+ return 0;
+ }
+
+ /* Set channel info */
+ snapshot = ast_channel_snapshot_get_latest(ast_channel_uniqueid(chan));
+ if (!snapshot) {
+ return -1;
+ }
+
+ if (send_end_msg_snapshot(app, snapshot)) {
+ res = -1;
+ }
+
+ ao2_cleanup(snapshot);
+ return res;
+}
+
void stasis_app_control_execute_until_exhausted(struct ast_channel *chan, struct stasis_app_control *control)
{
while (!control_is_done(control)) {
@@ -837,6 +1140,46 @@ int stasis_app_control_is_done(struct stasis_app_control *control)
return control_is_done(control);
}
+struct ast_datastore_info set_end_published_info = {
+ .type = "stasis_end_published",
+};
+
+void stasis_app_channel_set_stasis_end_published(struct ast_channel *chan)
+{
+ struct ast_datastore *datastore;
+
+ datastore = ast_datastore_alloc(&set_end_published_info, NULL);
+
+ ast_channel_lock(chan);
+ ast_channel_datastore_add(chan, datastore);
+ ast_channel_unlock(chan);
+}
+
+int stasis_app_channel_is_stasis_end_published(struct ast_channel *chan)
+{
+ struct ast_datastore *datastore;
+
+ ast_channel_lock(chan);
+ datastore = ast_channel_datastore_find(chan, &set_end_published_info, NULL);
+ ast_channel_unlock(chan);
+
+ return datastore ? 1 : 0;
+}
+
+static void remove_stasis_end_published(struct ast_channel *chan)
+{
+ struct ast_datastore *datastore;
+
+ ast_channel_lock(chan);
+ datastore = ast_channel_datastore_find(chan, &set_end_published_info, NULL);
+ ast_channel_unlock(chan);
+
+ if (datastore) {
+ ast_channel_datastore_remove(chan, datastore);
+ ast_datastore_free(datastore);
+ }
+}
+
/*! /brief Stasis dialplan application callback */
int stasis_app_exec(struct ast_channel *chan, const char *app_name, int argc,
char *argv[])
@@ -850,6 +1193,11 @@ int stasis_app_exec(struct ast_channel *chan, const char *app_name, int argc,
ast_assert(chan != NULL);
+ /* Just in case there's a lingering indication that the channel has had a stasis
+ * end published on it, remove that now.
+ */
+ remove_stasis_end_published(chan);
+
app = ao2_find(apps_registry, app_name, OBJ_SEARCH_KEY);
if (!app) {
ast_log(LOG_ERROR,
@@ -869,10 +1217,16 @@ int stasis_app_exec(struct ast_channel *chan, const char *app_name, int argc,
}
ao2_link(app_controls, control);
+ if (add_masquerade_store(chan)) {
+ ast_log(LOG_ERROR, "Failed to attach masquerade detector\n");
+ return -1;
+ }
+
res = send_start_msg(app, chan, argc, argv);
if (res != 0) {
ast_log(LOG_ERROR,
"Error sending start message to '%s'\n", app_name);
+ remove_masquerade_store(chan);
return -1;
}
@@ -880,9 +1234,13 @@ int stasis_app_exec(struct ast_channel *chan, const char *app_name, int argc,
if (res != 0) {
ast_log(LOG_ERROR, "Error subscribing app '%s' to channel '%s'\n",
app_name, ast_channel_name(chan));
+ remove_masquerade_store(chan);
return -1;
}
+ /* Pull queued prestart commands and execute */
+ control_prestart_dispatch_all(control, chan);
+
while (!control_is_done(control)) {
RAII_VAR(struct ast_frame *, f, NULL, ast_frame_dtor);
int r;
@@ -948,14 +1306,20 @@ int stasis_app_exec(struct ast_channel *chan, const char *app_name, int argc,
}
app_unsubscribe_bridge(app, stasis_app_get_bridge(control));
- app_unsubscribe_channel(app, chan);
ao2_cleanup(bridge);
- res = send_end_msg(app, chan);
- if (res != 0) {
- ast_log(LOG_ERROR,
- "Error sending end message to %s\n", app_name);
- return res;
+ /* Only publish a stasis_end event if it hasn't already been published */
+ if (!stasis_app_channel_is_stasis_end_published(chan)) {
+ app_unsubscribe_channel(app, chan);
+ res = send_end_msg(app, chan);
+ remove_masquerade_store(chan);
+ if (res != 0) {
+ ast_log(LOG_ERROR,
+ "Error sending end message to %s\n", app_name);
+ return res;
+ }
+ } else {
+ remove_stasis_end_published(chan);
}
/* There's an off chance that app is ready for cleanup. Go ahead
@@ -1434,8 +1798,15 @@ void stasis_app_unref(void)
ast_module_unref(ast_module_info->self);
}
+/*!
+ * \brief Subscription to StasisEnd events
+ */
+struct stasis_subscription *stasis_end_sub;
+
static int unload_module(void)
{
+ stasis_end_sub = stasis_unsubscribe(stasis_end_sub);
+
stasis_app_unregister_event_sources();
messaging_cleanup();
@@ -1455,6 +1826,8 @@ static int unload_module(void)
ao2_cleanup(app_bridges_playback);
app_bridges_playback = NULL;
+ STASIS_MESSAGE_TYPE_CLEANUP(ast_stasis_end_message_type);
+
return 0;
}
@@ -1486,8 +1859,53 @@ struct stasis_message_sanitizer *stasis_app_get_sanitizer(void)
return &app_sanitizer;
}
+static void remove_masquerade_store_by_name(const char *channel_name)
+{
+ struct ast_channel *chan;
+
+ chan = ast_channel_get_by_name(channel_name);
+ if (!chan) {
+ return;
+ }
+
+ remove_masquerade_store(chan);
+ ast_channel_unref(chan);
+}
+
+static void check_for_stasis_end(void *data, struct stasis_subscription *sub,
+ struct stasis_message *message)
+{
+ struct ast_channel_blob *payload;
+ struct ast_channel_snapshot *snapshot;
+ const char *app_name;
+ char *channel_uri;
+ size_t alloc_size;
+ const char *channels[1];
+
+ if (stasis_message_type(message) != ast_stasis_end_message_type()) {
+ return;
+ }
+
+ payload = stasis_message_data(message);
+ snapshot = payload->snapshot;
+ app_name = ast_json_string_get(ast_json_object_get(payload->blob, "app"));
+
+ /* +8 is for the length of "channel:" */
+ alloc_size = AST_MAX_UNIQUEID + 8;
+ channel_uri = ast_alloca(alloc_size);
+ snprintf(channel_uri, alloc_size, "channel:%s", snapshot->uniqueid);
+
+ channels[0] = channel_uri;
+ stasis_app_unsubscribe(app_name, channels, ARRAY_LEN(channels), NULL);
+
+ remove_masquerade_store_by_name(snapshot->name);
+}
+
static int load_module(void)
{
+ if (STASIS_MESSAGE_TYPE_INIT(ast_stasis_end_message_type) != 0) {
+ return AST_MODULE_LOAD_DECLINE;
+ }
apps_registry = ao2_container_alloc(APPS_NUM_BUCKETS, app_hash, app_compare);
app_controls = ao2_container_alloc(CONTROLS_NUM_BUCKETS, control_hash, control_compare);
app_bridges = ao2_container_alloc(BRIDGES_NUM_BUCKETS, bridges_hash, bridges_compare);
@@ -1511,6 +1929,12 @@ static int load_module(void)
stasis_app_register_event_sources();
+ stasis_end_sub = stasis_subscribe(ast_channel_topic_all(), check_for_stasis_end, NULL);
+ if (!stasis_end_sub) {
+ unload_module();
+ return AST_MODULE_LOAD_DECLINE;
+ }
+
return AST_MODULE_LOAD_SUCCESS;
}