summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--include/asterisk/stasis_app.h10
-rw-r--r--res/ari/resource_bridges.c5
-rw-r--r--res/res_stasis.c7
-rw-r--r--res/res_stasis_playback.c2
-rw-r--r--res/res_stasis_recording.c2
-rw-r--r--res/stasis/control.c36
-rw-r--r--res/stasis/control.h10
7 files changed, 62 insertions, 10 deletions
diff --git a/include/asterisk/stasis_app.h b/include/asterisk/stasis_app.h
index f2b07e0bf..90ef82ebf 100644
--- a/include/asterisk/stasis_app.h
+++ b/include/asterisk/stasis_app.h
@@ -440,6 +440,16 @@ int stasis_app_control_is_done(
struct stasis_app_control *control);
/*!
+ * \brief Flush the control command queue.
+ * \since 13.9.0
+ *
+ * \param control Control object to flush command queue.
+ *
+ * \return Nothing
+ */
+void stasis_app_control_flush_queue(struct stasis_app_control *control);
+
+/*!
* \brief Returns the uniqueid of the channel associated with this control
*
* \param control Control object.
diff --git a/res/ari/resource_bridges.c b/res/ari/resource_bridges.c
index ad37053f7..7861f4a45 100644
--- a/res/ari/resource_bridges.c
+++ b/res/ari/resource_bridges.c
@@ -297,10 +297,11 @@ static void *bridge_channel_control_thread(void *data)
thread_data = NULL;
stasis_app_control_execute_until_exhausted(bridge_channel, control);
+ stasis_app_control_flush_queue(control);
- ast_hangup(bridge_channel);
- ao2_cleanup(control);
stasis_forward_cancel(forward);
+ ao2_cleanup(control);
+ ast_hangup(bridge_channel);
return NULL;
}
diff --git a/res/res_stasis.c b/res/res_stasis.c
index cecc6ef2d..4bb967dbf 100644
--- a/res/res_stasis.c
+++ b/res/res_stasis.c
@@ -1179,6 +1179,11 @@ int stasis_app_control_is_done(struct stasis_app_control *control)
return control_is_done(control);
}
+void stasis_app_control_flush_queue(struct stasis_app_control *control)
+{
+ control_flush_queue(control);
+}
+
struct ast_datastore_info set_end_published_info = {
.type = "stasis_end_published",
};
@@ -1371,6 +1376,8 @@ int stasis_app_exec(struct ast_channel *chan, const char *app_name, int argc,
remove_stasis_end_published(chan);
}
+ control_flush_queue(control);
+
/* Stop any lingering silence generator */
control_silence_stop_now(control);
diff --git a/res/res_stasis_playback.c b/res/res_stasis_playback.c
index 779dd7755..7fdb5d4ad 100644
--- a/res/res_stasis_playback.c
+++ b/res/res_stasis_playback.c
@@ -118,6 +118,7 @@ static void playback_dtor(void *obj)
{
struct stasis_app_playback *playback = obj;
+ ao2_cleanup(playback->control);
ast_string_field_free_memory(playback);
}
@@ -143,6 +144,7 @@ static struct stasis_app_playback *playback_create(
ast_string_field_set(playback, id, uuid);
}
+ ao2_ref(control, +1);
playback->control = control;
ao2_ref(playback, +1);
diff --git a/res/res_stasis_recording.c b/res/res_stasis_recording.c
index 392d92c8e..f907b7a0d 100644
--- a/res/res_stasis_recording.c
+++ b/res/res_stasis_recording.c
@@ -358,6 +358,7 @@ static void recording_dtor(void *obj)
struct stasis_app_recording *recording = obj;
ast_free(recording->absolute_name);
+ ao2_cleanup(recording->control);
ao2_cleanup(recording->options);
}
@@ -413,6 +414,7 @@ struct stasis_app_recording *stasis_app_control_record(
ao2_ref(options, +1);
recording->options = options;
+ ao2_ref(control, +1);
recording->control = control;
recording->state = STASIS_APP_RECORDING_STATE_QUEUED;
diff --git a/res/stasis/control.c b/res/stasis/control.c
index 00385a09f..97b0b8809 100644
--- a/res/stasis/control.c
+++ b/res/stasis/control.c
@@ -251,6 +251,11 @@ static struct stasis_app_command *exec_command_on_condition(
}
ao2_lock(control->command_queue);
+ if (control->is_done) {
+ ao2_unlock(control->command_queue);
+ ao2_ref(command, -1);
+ return NULL;
+ }
if (can_exec_fn && (retval = can_exec_fn(control))) {
ao2_unlock(control->command_queue);
command_complete(command, retval);
@@ -402,7 +407,10 @@ int control_is_done(struct stasis_app_control *control)
void control_mark_done(struct stasis_app_control *control)
{
+ /* Locking necessary to sync with other threads adding commands to the queue. */
+ ao2_lock(control->command_queue);
control->is_done = 1;
+ ao2_unlock(control->command_queue);
}
struct stasis_app_control_continue_data {
@@ -427,7 +435,7 @@ static int app_control_continue(struct stasis_app_control *control,
/* Called from stasis_app_exec thread; no lock needed */
ast_explicit_goto(control->channel, continue_data->context, continue_data->extension, continue_data->priority);
- control->is_done = 1;
+ control_mark_done(control);
return 0;
}
@@ -1115,24 +1123,36 @@ int stasis_app_control_queue_control(struct stasis_app_control *control,
return ast_queue_control(control->channel, frame_type);
}
+void control_flush_queue(struct stasis_app_control *control)
+{
+ struct ao2_iterator iter;
+ struct stasis_app_command *command;
+
+ iter = ao2_iterator_init(control->command_queue, AO2_ITERATOR_UNLINK);
+ while ((command = ao2_iterator_next(&iter))) {
+ command_complete(command, -1);
+ ao2_ref(command, -1);
+ }
+ ao2_iterator_destroy(&iter);
+}
+
int control_dispatch_all(struct stasis_app_control *control,
struct ast_channel *chan)
{
int count = 0;
- struct ao2_iterator i;
- void *obj;
+ struct ao2_iterator iter;
+ struct stasis_app_command *command;
ast_assert(control->channel == chan);
- i = ao2_iterator_init(control->command_queue, AO2_ITERATOR_UNLINK);
-
- while ((obj = ao2_iterator_next(&i))) {
- RAII_VAR(struct stasis_app_command *, command, obj, ao2_cleanup);
+ iter = ao2_iterator_init(control->command_queue, AO2_ITERATOR_UNLINK);
+ while ((command = ao2_iterator_next(&iter))) {
command_invoke(command, control, chan);
+ ao2_ref(command, -1);
++count;
}
+ ao2_iterator_destroy(&iter);
- ao2_iterator_destroy(&i);
return count;
}
diff --git a/res/stasis/control.h b/res/stasis/control.h
index d053a35f7..1d37a494a 100644
--- a/res/stasis/control.h
+++ b/res/stasis/control.h
@@ -41,6 +41,16 @@
struct stasis_app_control *control_create(struct ast_channel *channel, struct stasis_app *app);
/*!
+ * \brief Flush the control command queue.
+ * \since 13.9.0
+ *
+ * \param control Control object to flush command queue.
+ *
+ * \return Nothing
+ */
+void control_flush_queue(struct stasis_app_control *control);
+
+/*!
* \brief Dispatch all commands enqueued to this control.
*
* \param control Control object to dispatch.