diff options
-rw-r--r-- | include/asterisk/stasis_app.h | 10 | ||||
-rw-r--r-- | res/ari/resource_bridges.c | 5 | ||||
-rw-r--r-- | res/res_stasis.c | 7 | ||||
-rw-r--r-- | res/res_stasis_playback.c | 2 | ||||
-rw-r--r-- | res/res_stasis_recording.c | 2 | ||||
-rw-r--r-- | res/stasis/control.c | 36 | ||||
-rw-r--r-- | res/stasis/control.h | 10 |
7 files changed, 62 insertions, 10 deletions
diff --git a/include/asterisk/stasis_app.h b/include/asterisk/stasis_app.h index f2b07e0bf..90ef82ebf 100644 --- a/include/asterisk/stasis_app.h +++ b/include/asterisk/stasis_app.h @@ -440,6 +440,16 @@ int stasis_app_control_is_done( struct stasis_app_control *control); /*! + * \brief Flush the control command queue. + * \since 13.9.0 + * + * \param control Control object to flush command queue. + * + * \return Nothing + */ +void stasis_app_control_flush_queue(struct stasis_app_control *control); + +/*! * \brief Returns the uniqueid of the channel associated with this control * * \param control Control object. diff --git a/res/ari/resource_bridges.c b/res/ari/resource_bridges.c index ad37053f7..7861f4a45 100644 --- a/res/ari/resource_bridges.c +++ b/res/ari/resource_bridges.c @@ -297,10 +297,11 @@ static void *bridge_channel_control_thread(void *data) thread_data = NULL; stasis_app_control_execute_until_exhausted(bridge_channel, control); + stasis_app_control_flush_queue(control); - ast_hangup(bridge_channel); - ao2_cleanup(control); stasis_forward_cancel(forward); + ao2_cleanup(control); + ast_hangup(bridge_channel); return NULL; } diff --git a/res/res_stasis.c b/res/res_stasis.c index cecc6ef2d..4bb967dbf 100644 --- a/res/res_stasis.c +++ b/res/res_stasis.c @@ -1179,6 +1179,11 @@ int stasis_app_control_is_done(struct stasis_app_control *control) return control_is_done(control); } +void stasis_app_control_flush_queue(struct stasis_app_control *control) +{ + control_flush_queue(control); +} + struct ast_datastore_info set_end_published_info = { .type = "stasis_end_published", }; @@ -1371,6 +1376,8 @@ int stasis_app_exec(struct ast_channel *chan, const char *app_name, int argc, remove_stasis_end_published(chan); } + control_flush_queue(control); + /* Stop any lingering silence generator */ control_silence_stop_now(control); diff --git a/res/res_stasis_playback.c b/res/res_stasis_playback.c index 779dd7755..7fdb5d4ad 100644 --- a/res/res_stasis_playback.c +++ b/res/res_stasis_playback.c @@ -118,6 +118,7 @@ static void playback_dtor(void *obj) { struct stasis_app_playback *playback = obj; + ao2_cleanup(playback->control); ast_string_field_free_memory(playback); } @@ -143,6 +144,7 @@ static struct stasis_app_playback *playback_create( ast_string_field_set(playback, id, uuid); } + ao2_ref(control, +1); playback->control = control; ao2_ref(playback, +1); diff --git a/res/res_stasis_recording.c b/res/res_stasis_recording.c index 392d92c8e..f907b7a0d 100644 --- a/res/res_stasis_recording.c +++ b/res/res_stasis_recording.c @@ -358,6 +358,7 @@ static void recording_dtor(void *obj) struct stasis_app_recording *recording = obj; ast_free(recording->absolute_name); + ao2_cleanup(recording->control); ao2_cleanup(recording->options); } @@ -413,6 +414,7 @@ struct stasis_app_recording *stasis_app_control_record( ao2_ref(options, +1); recording->options = options; + ao2_ref(control, +1); recording->control = control; recording->state = STASIS_APP_RECORDING_STATE_QUEUED; diff --git a/res/stasis/control.c b/res/stasis/control.c index 00385a09f..97b0b8809 100644 --- a/res/stasis/control.c +++ b/res/stasis/control.c @@ -251,6 +251,11 @@ static struct stasis_app_command *exec_command_on_condition( } ao2_lock(control->command_queue); + if (control->is_done) { + ao2_unlock(control->command_queue); + ao2_ref(command, -1); + return NULL; + } if (can_exec_fn && (retval = can_exec_fn(control))) { ao2_unlock(control->command_queue); command_complete(command, retval); @@ -402,7 +407,10 @@ int control_is_done(struct stasis_app_control *control) void control_mark_done(struct stasis_app_control *control) { + /* Locking necessary to sync with other threads adding commands to the queue. */ + ao2_lock(control->command_queue); control->is_done = 1; + ao2_unlock(control->command_queue); } struct stasis_app_control_continue_data { @@ -427,7 +435,7 @@ static int app_control_continue(struct stasis_app_control *control, /* Called from stasis_app_exec thread; no lock needed */ ast_explicit_goto(control->channel, continue_data->context, continue_data->extension, continue_data->priority); - control->is_done = 1; + control_mark_done(control); return 0; } @@ -1115,24 +1123,36 @@ int stasis_app_control_queue_control(struct stasis_app_control *control, return ast_queue_control(control->channel, frame_type); } +void control_flush_queue(struct stasis_app_control *control) +{ + struct ao2_iterator iter; + struct stasis_app_command *command; + + iter = ao2_iterator_init(control->command_queue, AO2_ITERATOR_UNLINK); + while ((command = ao2_iterator_next(&iter))) { + command_complete(command, -1); + ao2_ref(command, -1); + } + ao2_iterator_destroy(&iter); +} + int control_dispatch_all(struct stasis_app_control *control, struct ast_channel *chan) { int count = 0; - struct ao2_iterator i; - void *obj; + struct ao2_iterator iter; + struct stasis_app_command *command; ast_assert(control->channel == chan); - i = ao2_iterator_init(control->command_queue, AO2_ITERATOR_UNLINK); - - while ((obj = ao2_iterator_next(&i))) { - RAII_VAR(struct stasis_app_command *, command, obj, ao2_cleanup); + iter = ao2_iterator_init(control->command_queue, AO2_ITERATOR_UNLINK); + while ((command = ao2_iterator_next(&iter))) { command_invoke(command, control, chan); + ao2_ref(command, -1); ++count; } + ao2_iterator_destroy(&iter); - ao2_iterator_destroy(&i); return count; } diff --git a/res/stasis/control.h b/res/stasis/control.h index d053a35f7..1d37a494a 100644 --- a/res/stasis/control.h +++ b/res/stasis/control.h @@ -41,6 +41,16 @@ struct stasis_app_control *control_create(struct ast_channel *channel, struct stasis_app *app); /*! + * \brief Flush the control command queue. + * \since 13.9.0 + * + * \param control Control object to flush command queue. + * + * \return Nothing + */ +void control_flush_queue(struct stasis_app_control *control); + +/*! * \brief Dispatch all commands enqueued to this control. * * \param control Control object to dispatch. |