diff options
Diffstat (limited to 'res/res_stasis.c')
-rw-r--r-- | res/res_stasis.c | 170 |
1 files changed, 164 insertions, 6 deletions
diff --git a/res/res_stasis.c b/res/res_stasis.c index 3527adaa5..045362546 100644 --- a/res/res_stasis.c +++ b/res/res_stasis.c @@ -40,6 +40,9 @@ ASTERISK_FILE_VERSION(__FILE__, "$Revision$") #include "asterisk/stasis_channels.h" #include "asterisk/strings.h" +/*! Time to wait for a frame in the application */ +#define MAX_WAIT_MS 200 + /*! * \brief Number of buckets for the Stasis application hash table. Remember to * keep it a prime number! @@ -147,7 +150,67 @@ static void app_send(struct app *app, struct ast_json *message) app->handler(app->data, app->name, message); } +typedef void* (*stasis_app_command_cb)(struct stasis_app_control *control, + struct ast_channel *chan, + void *data); + +struct stasis_app_command { + ast_mutex_t lock; + ast_cond_t condition; + stasis_app_command_cb callback; + void *data; + void *retval; + int is_done:1; +}; + +static void command_dtor(void *obj) +{ + struct stasis_app_command *command = obj; + ast_mutex_destroy(&command->lock); + ast_cond_destroy(&command->condition); +} + +static struct stasis_app_command *command_create(stasis_app_command_cb callback, + void *data) +{ + RAII_VAR(struct stasis_app_command *, command, NULL, ao2_cleanup); + + command = ao2_alloc(sizeof(*command), command_dtor); + if (!command) { + return NULL; + } + + ast_mutex_init(&command->lock); + ast_cond_init(&command->condition, 0); + command->callback = callback; + command->data = data; + + ao2_ref(command, +1); + return command; +} + +static void command_complete(struct stasis_app_command *command, void *retval) +{ + SCOPED_MUTEX(lock, &command->lock); + + command->is_done = 1; + command->retval = retval; + ast_cond_signal(&command->condition); +} + +static void *wait_for_command(struct stasis_app_command *command) +{ + SCOPED_MUTEX(lock, &command->lock); + while (!command->is_done) { + ast_cond_wait(&command->condition, &command->lock); + } + + return command->retval; +} + struct stasis_app_control { + /*! Queue of commands to dispatch on the channel */ + struct ao2_container *command_queue; /*! * When set, /c app_stasis should exit and continue in the dialplan. */ @@ -167,11 +230,24 @@ static struct stasis_app_control *control_create(const char *uniqueid) return NULL; } + control->command_queue = ao2_container_alloc_list(0, 0, NULL, NULL); + strncpy(control->channel_id, uniqueid, size - sizeof(*control)); return control; } +static void *exec_command(struct stasis_app_control *control, + struct stasis_app_command *command) +{ + ao2_lock(control); + ao2_ref(command, +1); + ao2_link(control->command_queue, command); + ao2_unlock(control); + + return wait_for_command(command); +} + /*! AO2 hash function for \ref stasis_app_control */ static int control_hash(const void *obj, const int flags) { @@ -199,13 +275,20 @@ static int control_compare(void *lhs, void *rhs, int flags) struct stasis_app_control *stasis_app_control_find_by_channel( const struct ast_channel *chan) { - RAII_VAR(struct ao2_container *, controls, NULL, ao2_cleanup); if (chan == NULL) { return NULL; } + return stasis_app_control_find_by_channel_id( + ast_channel_uniqueid(chan)); +} + +struct stasis_app_control *stasis_app_control_find_by_channel_id( + const char *channel_id) +{ + RAII_VAR(struct ao2_container *, controls, NULL, ao2_cleanup); controls = app_controls(); - return ao2_find(controls, ast_channel_uniqueid(chan), OBJ_KEY); + return ao2_find(controls, channel_id, OBJ_KEY); } /*! @@ -233,6 +316,33 @@ void stasis_app_control_continue(struct stasis_app_control *control) control->continue_to_dialplan = 1; } +static int OK = 0; +static int FAIL = -1; + +static void *__app_control_answer(struct stasis_app_control *control, + struct ast_channel *chan, void *data) +{ + ast_debug(3, "%s: Answering", control->channel_id); + return __ast_answer(chan, 0, 1) == 0 ? &OK : &FAIL; +} + +int stasis_app_control_answer(struct stasis_app_control *control) +{ + RAII_VAR(struct stasis_app_command *, command, NULL, ao2_cleanup); + int *retval; + + ast_debug(3, "%s: Sending answer command\n", control->channel_id); + + command = command_create(__app_control_answer, NULL); + retval = exec_command(control, command); + + if (*retval != 0) { + ast_log(LOG_WARNING, "Failed to answer channel"); + } + + return *retval; +} + static struct ast_json *app_event_create( const char *event_name, const struct ast_channel_snapshot *snapshot, @@ -410,6 +520,26 @@ static void control_unlink(struct stasis_app_control *control) ao2_cleanup(control); } +static void dispatch_commands(struct stasis_app_control *control, + struct ast_channel *chan) +{ + struct ao2_iterator i; + void *obj; + + SCOPED_AO2LOCK(lock, control); + + i = ao2_iterator_init(control->command_queue, AO2_ITERATOR_UNLINK); + + while ((obj = ao2_iterator_next(&i))) { + RAII_VAR(struct stasis_app_command *, command, obj, ao2_cleanup); + void *retval = command->callback(control, chan, command->data); + command_complete(command, retval); + } + + ao2_iterator_destroy(&i); +} + + /*! /brief Stasis dialplan application callback */ int stasis_app_exec(struct ast_channel *chan, const char *app_name, int argc, char *argv[]) @@ -458,8 +588,38 @@ int stasis_app_exec(struct ast_channel *chan, const char *app_name, int argc, return res; } - while (!hungup && !control_continue_test_and_reset(control) && ast_waitfor(chan, -1) > -1) { - RAII_VAR(struct ast_frame *, f, ast_read(chan), ast_frame_dtor); + while (1) { + RAII_VAR(struct ast_frame *, f, NULL, ast_frame_dtor); + int r; + + if (hungup) { + ast_debug(3, "%s: Hangup\n", + ast_channel_uniqueid(chan)); + break; + } + + if (control_continue_test_and_reset(control)) { + ast_debug(3, "%s: Continue\n", + ast_channel_uniqueid(chan)); + break; + } + + r = ast_waitfor(chan, MAX_WAIT_MS); + + if (r < 0) { + ast_debug(3, "%s: Poll error\n", + ast_channel_uniqueid(chan)); + break; + } + + dispatch_commands(control, chan); + + if (r == 0) { + /* Timeout */ + continue; + } + + f = ast_read(chan); if (!f) { ast_debug(3, "%s: No more frames. Must be done, I guess.\n", ast_channel_uniqueid(chan)); break; @@ -468,8 +628,6 @@ int stasis_app_exec(struct ast_channel *chan, const char *app_name, int argc, switch (f->frametype) { case AST_FRAME_CONTROL: if (f->subclass.integer == AST_CONTROL_HANGUP) { - ast_debug(3, "%s: Received hangup\n", - ast_channel_uniqueid(chan)); hungup = 1; } break; |