summaryrefslogtreecommitdiff
path: root/res/stasis/control.c
diff options
context:
space:
mode:
Diffstat (limited to 'res/stasis/control.c')
-rw-r--r--res/stasis/control.c239
1 files changed, 226 insertions, 13 deletions
diff --git a/res/stasis/control.c b/res/stasis/control.c
index dcc029701..df279165f 100644
--- a/res/stasis/control.c
+++ b/res/stasis/control.c
@@ -33,62 +33,99 @@ ASTERISK_FILE_VERSION(__FILE__, "$Revision$")
#include "control.h"
#include "asterisk/dial.h"
#include "asterisk/bridge.h"
+#include "asterisk/bridge_after.h"
#include "asterisk/bridge_basic.h"
#include "asterisk/frame.h"
#include "asterisk/pbx.h"
#include "asterisk/musiconhold.h"
struct stasis_app_control {
+ ast_cond_t wait_cond;
/*! Queue of commands to dispatch on the channel */
struct ao2_container *command_queue;
/*!
- * When set, /c app_stasis should exit and continue in the dialplan.
- */
- int is_done:1;
- /*!
* The associated channel.
* Be very careful with the threading associated w/ manipulating
* the channel.
*/
struct ast_channel *channel;
+ /*!
+ * When a channel is in a bridge, the bridge that it is in.
+ */
+ struct ast_bridge *bridge;
+ /*!
+ * Holding place for channel's PBX while imparted to a bridge.
+ */
+ struct ast_pbx *pbx;
+ /*!
+ * When set, /c app_stasis should exit and continue in the dialplan.
+ */
+ int is_done:1;
};
+static void control_dtor(void *obj)
+{
+ struct stasis_app_control *control = obj;
+
+ ao2_cleanup(control->command_queue);
+ ast_cond_destroy(&control->wait_cond);
+}
+
struct stasis_app_control *control_create(struct ast_channel *channel)
{
- struct stasis_app_control *control;
+ RAII_VAR(struct stasis_app_control *, control, NULL, ao2_cleanup);
+ int res;
- control = ao2_alloc(sizeof(*control), NULL);
+ control = ao2_alloc(sizeof(*control), control_dtor);
if (!control) {
return NULL;
}
+ res = ast_cond_init(&control->wait_cond, NULL);
+ if (res != 0) {
+ ast_log(LOG_ERROR, "Error initializing ast_cond_t: %s\n",
+ strerror(errno));
+ return NULL;
+ }
+
control->command_queue = ao2_container_alloc_list(
AO2_ALLOC_OPT_LOCK_MUTEX, 0, NULL, NULL);
if (!control->command_queue) {
- ao2_cleanup(control);
return NULL;
}
control->channel = channel;
+ ao2_ref(control, +1);
return control;
}
+static void *noop_cb(struct stasis_app_control *control,
+ struct ast_channel *chan, void *data)
+{
+ return NULL;
+}
+
+
static struct stasis_app_command *exec_command(
struct stasis_app_control *control, stasis_app_command_cb command_fn,
void *data)
{
RAII_VAR(struct stasis_app_command *, command, NULL, ao2_cleanup);
+ command_fn = command_fn ? : noop_cb;
+
command = command_create(command_fn, data);
if (!command) {
return NULL;
}
- /* command_queue is a thread safe list; no lock needed */
- ao2_link(control->command_queue, command);
+ ao2_lock(control->command_queue);
+ ao2_link_flags(control->command_queue, command, OBJ_NOLOCK);
+ ast_cond_signal(&control->wait_cond);
+ ao2_unlock(control->command_queue);
ao2_ref(command, +1);
return command;
@@ -195,6 +232,14 @@ static void *app_control_continue(struct stasis_app_control *control,
{
RAII_VAR(struct stasis_app_control_continue_data *, continue_data, data, ast_free);
+ ast_assert(control->channel != NULL);
+
+ /* If we're in a Stasis bridge, depart it before going back to the
+ * dialplan */
+ if (stasis_app_get_bridge(control)) {
+ ast_bridge_depart(control->channel);
+ }
+
/* Called from stasis_app_exec thread; no lock needed */
ast_explicit_goto(control->channel, continue_data->context, continue_data->extension, continue_data->priority);
@@ -422,6 +467,161 @@ int stasis_app_send_command_async(struct stasis_app_control *control,
return 0;
}
+struct ast_bridge *stasis_app_get_bridge(struct stasis_app_control *control)
+{
+ if (!control) {
+ return NULL;
+ } else {
+ SCOPED_AO2LOCK(lock, control);
+ return control->bridge;
+ }
+}
+
+
+static void bridge_after_cb(struct ast_channel *chan, void *data)
+{
+ struct stasis_app_control *control = data;
+ SCOPED_AO2LOCK(lock, control);
+
+ ast_debug(3, "%s, %s: Channel leaving bridge\n",
+ ast_channel_uniqueid(chan), control->bridge->uniqueid);
+
+ ast_assert(chan == control->channel);
+
+ /* Restore the channel's PBX */
+ ast_channel_pbx_set(control->channel, control->pbx);
+ control->pbx = NULL;
+
+ /* No longer in the bridge */
+ control->bridge = NULL;
+
+ /* Wakeup the command_queue loop */
+ exec_command(control, NULL, NULL);
+}
+
+static void bridge_after_cb_failed(enum ast_bridge_after_cb_reason reason,
+ void *data)
+{
+ struct stasis_app_control *control = data;
+
+ bridge_after_cb(control->channel, data);
+
+ ast_debug(3, " reason: %s\n",
+ ast_bridge_after_cb_reason_string(reason));
+}
+
+static void *app_control_add_channel_to_bridge(
+ struct stasis_app_control *control,
+ struct ast_channel *chan, void *data)
+{
+ struct ast_bridge *bridge = data;
+ int res;
+
+ if (!control || !bridge) {
+ return NULL;
+ }
+
+ ast_debug(3, "%s: Adding to bridge %s\n",
+ stasis_app_control_get_channel_id(control),
+ bridge->uniqueid);
+
+ ast_assert(chan != NULL);
+
+ /* Depart whatever Stasis bridge we're currently in. */
+ if (stasis_app_get_bridge(control)) {
+ /* Note that it looks like there's a race condition here, since
+ * we don't have control locked. But this happens from the
+ * control callback thread, so there won't be any other
+ * concurrent attempts to bridge.
+ */
+ ast_bridge_depart(chan);
+ }
+
+
+ res = ast_bridge_set_after_callback(chan, bridge_after_cb,
+ bridge_after_cb_failed, control);
+ if (res != 0) {
+ ast_log(LOG_ERROR, "Error setting after-bridge callback\n");
+ return NULL;
+ }
+
+ {
+ /* pbx and bridge are modified by the bridging impart thread.
+ * It shouldn't happen concurrently, but we still need to lock
+ * for the memory fence.
+ */
+ SCOPED_AO2LOCK(lock, control);
+
+ /* Save off the channel's PBX */
+ ast_assert(control->pbx == NULL);
+ if (!control->pbx) {
+ control->pbx = ast_channel_pbx(chan);
+ ast_channel_pbx_set(chan, NULL);
+ }
+
+ res = ast_bridge_impart(bridge,
+ chan,
+ NULL, /* swap channel */
+ NULL, /* features */
+ 0); /* independent - false allows us to ast_bridge_depart() */
+
+ if (res != 0) {
+ ast_log(LOG_ERROR, "Error adding channel to bridge\n");
+ ast_channel_pbx_set(chan, control->pbx);
+ control->pbx = NULL;
+ return NULL;
+ }
+
+ ast_assert(stasis_app_get_bridge(control) == NULL);
+ control->bridge = bridge;
+ }
+ return NULL;
+}
+
+void stasis_app_control_add_channel_to_bridge(
+ struct stasis_app_control *control, struct ast_bridge *bridge)
+{
+ ast_debug(3, "%s: Sending channel add_to_bridge command\n",
+ stasis_app_control_get_channel_id(control));
+ stasis_app_send_command_async(control,
+ app_control_add_channel_to_bridge, bridge);
+}
+
+static void *app_control_remove_channel_from_bridge(
+ struct stasis_app_control *control,
+ struct ast_channel *chan, void *data)
+{
+ struct ast_bridge *bridge = data;
+
+ if (!control) {
+ return NULL;
+ }
+
+ /* We should only depart from our own bridge */
+ ast_debug(3, "%s: Departing bridge %s\n",
+ stasis_app_control_get_channel_id(control),
+ bridge->uniqueid);
+
+ if (bridge != stasis_app_get_bridge(control)) {
+ ast_log(LOG_WARNING, "%s: Not in bridge %s; not removing\n",
+ stasis_app_control_get_channel_id(control),
+ bridge->uniqueid);
+ return NULL;
+ }
+
+ ast_bridge_depart(chan);
+ return NULL;
+}
+
+void stasis_app_control_remove_channel_from_bridge(
+ struct stasis_app_control *control, struct ast_bridge *bridge)
+{
+ ast_debug(3, "%s: Sending channel remove_from_bridge command\n",
+ stasis_app_control_get_channel_id(control));
+ stasis_app_send_command_async(control,
+ app_control_remove_channel_from_bridge, bridge);
+}
+
const char *stasis_app_control_get_channel_id(
const struct stasis_app_control *control)
{
@@ -464,9 +664,22 @@ int control_dispatch_all(struct stasis_app_control *control,
return count;
}
-/* Must be defined here since it must operate on the channel outside of the queue */
-int stasis_app_control_remove_channel_from_bridge(
- struct stasis_app_control *control, struct ast_bridge *bridge)
+void control_wait(struct stasis_app_control *control)
{
- return ast_bridge_remove(bridge, control->channel);
+ if (!control) {
+ return;
+ }
+
+ ast_assert(control->command_queue != NULL);
+
+ ao2_lock(control->command_queue);
+ while (ao2_container_count(control->command_queue) == 0) {
+ int res = ast_cond_wait(&control->wait_cond,
+ ao2_object_get_lockaddr(control->command_queue));
+ if (res < 0) {
+ ast_log(LOG_ERROR, "Error waiting on command queue\n");
+ break;
+ }
+ }
+ ao2_unlock(control->command_queue);
}