diff options
Diffstat (limited to 'res/stasis/control.c')
-rw-r--r-- | res/stasis/control.c | 239 |
1 files changed, 226 insertions, 13 deletions
diff --git a/res/stasis/control.c b/res/stasis/control.c index dcc029701..df279165f 100644 --- a/res/stasis/control.c +++ b/res/stasis/control.c @@ -33,62 +33,99 @@ ASTERISK_FILE_VERSION(__FILE__, "$Revision$") #include "control.h" #include "asterisk/dial.h" #include "asterisk/bridge.h" +#include "asterisk/bridge_after.h" #include "asterisk/bridge_basic.h" #include "asterisk/frame.h" #include "asterisk/pbx.h" #include "asterisk/musiconhold.h" struct stasis_app_control { + ast_cond_t wait_cond; /*! Queue of commands to dispatch on the channel */ struct ao2_container *command_queue; /*! - * When set, /c app_stasis should exit and continue in the dialplan. - */ - int is_done:1; - /*! * The associated channel. * Be very careful with the threading associated w/ manipulating * the channel. */ struct ast_channel *channel; + /*! + * When a channel is in a bridge, the bridge that it is in. + */ + struct ast_bridge *bridge; + /*! + * Holding place for channel's PBX while imparted to a bridge. + */ + struct ast_pbx *pbx; + /*! + * When set, /c app_stasis should exit and continue in the dialplan. + */ + int is_done:1; }; +static void control_dtor(void *obj) +{ + struct stasis_app_control *control = obj; + + ao2_cleanup(control->command_queue); + ast_cond_destroy(&control->wait_cond); +} + struct stasis_app_control *control_create(struct ast_channel *channel) { - struct stasis_app_control *control; + RAII_VAR(struct stasis_app_control *, control, NULL, ao2_cleanup); + int res; - control = ao2_alloc(sizeof(*control), NULL); + control = ao2_alloc(sizeof(*control), control_dtor); if (!control) { return NULL; } + res = ast_cond_init(&control->wait_cond, NULL); + if (res != 0) { + ast_log(LOG_ERROR, "Error initializing ast_cond_t: %s\n", + strerror(errno)); + return NULL; + } + control->command_queue = ao2_container_alloc_list( AO2_ALLOC_OPT_LOCK_MUTEX, 0, NULL, NULL); if (!control->command_queue) { - ao2_cleanup(control); return NULL; } control->channel = channel; + ao2_ref(control, +1); return control; } +static void *noop_cb(struct stasis_app_control *control, + struct ast_channel *chan, void *data) +{ + return NULL; +} + + static struct stasis_app_command *exec_command( struct stasis_app_control *control, stasis_app_command_cb command_fn, void *data) { RAII_VAR(struct stasis_app_command *, command, NULL, ao2_cleanup); + command_fn = command_fn ? : noop_cb; + command = command_create(command_fn, data); if (!command) { return NULL; } - /* command_queue is a thread safe list; no lock needed */ - ao2_link(control->command_queue, command); + ao2_lock(control->command_queue); + ao2_link_flags(control->command_queue, command, OBJ_NOLOCK); + ast_cond_signal(&control->wait_cond); + ao2_unlock(control->command_queue); ao2_ref(command, +1); return command; @@ -195,6 +232,14 @@ static void *app_control_continue(struct stasis_app_control *control, { RAII_VAR(struct stasis_app_control_continue_data *, continue_data, data, ast_free); + ast_assert(control->channel != NULL); + + /* If we're in a Stasis bridge, depart it before going back to the + * dialplan */ + if (stasis_app_get_bridge(control)) { + ast_bridge_depart(control->channel); + } + /* Called from stasis_app_exec thread; no lock needed */ ast_explicit_goto(control->channel, continue_data->context, continue_data->extension, continue_data->priority); @@ -422,6 +467,161 @@ int stasis_app_send_command_async(struct stasis_app_control *control, return 0; } +struct ast_bridge *stasis_app_get_bridge(struct stasis_app_control *control) +{ + if (!control) { + return NULL; + } else { + SCOPED_AO2LOCK(lock, control); + return control->bridge; + } +} + + +static void bridge_after_cb(struct ast_channel *chan, void *data) +{ + struct stasis_app_control *control = data; + SCOPED_AO2LOCK(lock, control); + + ast_debug(3, "%s, %s: Channel leaving bridge\n", + ast_channel_uniqueid(chan), control->bridge->uniqueid); + + ast_assert(chan == control->channel); + + /* Restore the channel's PBX */ + ast_channel_pbx_set(control->channel, control->pbx); + control->pbx = NULL; + + /* No longer in the bridge */ + control->bridge = NULL; + + /* Wakeup the command_queue loop */ + exec_command(control, NULL, NULL); +} + +static void bridge_after_cb_failed(enum ast_bridge_after_cb_reason reason, + void *data) +{ + struct stasis_app_control *control = data; + + bridge_after_cb(control->channel, data); + + ast_debug(3, " reason: %s\n", + ast_bridge_after_cb_reason_string(reason)); +} + +static void *app_control_add_channel_to_bridge( + struct stasis_app_control *control, + struct ast_channel *chan, void *data) +{ + struct ast_bridge *bridge = data; + int res; + + if (!control || !bridge) { + return NULL; + } + + ast_debug(3, "%s: Adding to bridge %s\n", + stasis_app_control_get_channel_id(control), + bridge->uniqueid); + + ast_assert(chan != NULL); + + /* Depart whatever Stasis bridge we're currently in. */ + if (stasis_app_get_bridge(control)) { + /* Note that it looks like there's a race condition here, since + * we don't have control locked. But this happens from the + * control callback thread, so there won't be any other + * concurrent attempts to bridge. + */ + ast_bridge_depart(chan); + } + + + res = ast_bridge_set_after_callback(chan, bridge_after_cb, + bridge_after_cb_failed, control); + if (res != 0) { + ast_log(LOG_ERROR, "Error setting after-bridge callback\n"); + return NULL; + } + + { + /* pbx and bridge are modified by the bridging impart thread. + * It shouldn't happen concurrently, but we still need to lock + * for the memory fence. + */ + SCOPED_AO2LOCK(lock, control); + + /* Save off the channel's PBX */ + ast_assert(control->pbx == NULL); + if (!control->pbx) { + control->pbx = ast_channel_pbx(chan); + ast_channel_pbx_set(chan, NULL); + } + + res = ast_bridge_impart(bridge, + chan, + NULL, /* swap channel */ + NULL, /* features */ + 0); /* independent - false allows us to ast_bridge_depart() */ + + if (res != 0) { + ast_log(LOG_ERROR, "Error adding channel to bridge\n"); + ast_channel_pbx_set(chan, control->pbx); + control->pbx = NULL; + return NULL; + } + + ast_assert(stasis_app_get_bridge(control) == NULL); + control->bridge = bridge; + } + return NULL; +} + +void stasis_app_control_add_channel_to_bridge( + struct stasis_app_control *control, struct ast_bridge *bridge) +{ + ast_debug(3, "%s: Sending channel add_to_bridge command\n", + stasis_app_control_get_channel_id(control)); + stasis_app_send_command_async(control, + app_control_add_channel_to_bridge, bridge); +} + +static void *app_control_remove_channel_from_bridge( + struct stasis_app_control *control, + struct ast_channel *chan, void *data) +{ + struct ast_bridge *bridge = data; + + if (!control) { + return NULL; + } + + /* We should only depart from our own bridge */ + ast_debug(3, "%s: Departing bridge %s\n", + stasis_app_control_get_channel_id(control), + bridge->uniqueid); + + if (bridge != stasis_app_get_bridge(control)) { + ast_log(LOG_WARNING, "%s: Not in bridge %s; not removing\n", + stasis_app_control_get_channel_id(control), + bridge->uniqueid); + return NULL; + } + + ast_bridge_depart(chan); + return NULL; +} + +void stasis_app_control_remove_channel_from_bridge( + struct stasis_app_control *control, struct ast_bridge *bridge) +{ + ast_debug(3, "%s: Sending channel remove_from_bridge command\n", + stasis_app_control_get_channel_id(control)); + stasis_app_send_command_async(control, + app_control_remove_channel_from_bridge, bridge); +} + const char *stasis_app_control_get_channel_id( const struct stasis_app_control *control) { @@ -464,9 +664,22 @@ int control_dispatch_all(struct stasis_app_control *control, return count; } -/* Must be defined here since it must operate on the channel outside of the queue */ -int stasis_app_control_remove_channel_from_bridge( - struct stasis_app_control *control, struct ast_bridge *bridge) +void control_wait(struct stasis_app_control *control) { - return ast_bridge_remove(bridge, control->channel); + if (!control) { + return; + } + + ast_assert(control->command_queue != NULL); + + ao2_lock(control->command_queue); + while (ao2_container_count(control->command_queue) == 0) { + int res = ast_cond_wait(&control->wait_cond, + ao2_object_get_lockaddr(control->command_queue)); + if (res < 0) { + ast_log(LOG_ERROR, "Error waiting on command queue\n"); + break; + } + } + ao2_unlock(control->command_queue); } |