summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorRichard Mudgett <rmudgett@digium.com>2018-03-21 19:43:21 -0500
committerRichard Mudgett <rmudgett@digium.com>2018-04-12 17:34:16 -0500
commitc2f85e881de51578ec2eab2978126d59ba6c8cc0 (patch)
tree1f7b0115fd3b0b3fd6fa514a58bf2f5c10dbe8ae
parent3c5d76863bc3178d8e2ac5ea64386f8027046b75 (diff)
pjsip_scheduler.c: Fix some corner cases.
* Fix the periodic interval wander because it may take significant time between the sched thread queueing the task in the serializer and the serializer actually executing the task. The time it takes to actually execute the task was already taken into account. * Pass a schtd ref to the serializer when we queue a scheduled task on the serializer. We don't want it going away on us while it is in the serializer queue. * Skip the scheduled task if the task was canceled between queueing the task to the serializer and the serializer actually executing the task. * Reorder struct ast_sip_sched_task to avoid unnecessary padding. Removed task_id and added next_periodic. * Hold a ref to the passed in serializer so the serializer cannot go away on the scheduled task. ASTERISK_26806 Change-Id: I6c8046b75f6953792c8c30e55b836a4291143f24
-rw-r--r--include/asterisk/res_pjsip.h6
-rw-r--r--res/res_pjsip/pjsip_scheduler.c159
2 files changed, 120 insertions, 45 deletions
diff --git a/include/asterisk/res_pjsip.h b/include/asterisk/res_pjsip.h
index d3849ad34..b01d6f5d0 100644
--- a/include/asterisk/res_pjsip.h
+++ b/include/asterisk/res_pjsip.h
@@ -1407,7 +1407,7 @@ struct ast_sip_endpoint *ast_sip_get_artificial_endpoint(void);
* the next item on the SIP socket(s) can be serviced. On incoming messages,
* Asterisk automatically will push the request to a servant thread. When your
* module callback is called, processing will already be in a servant. However,
- * for other PSJIP events, such as transaction state changes due to timer
+ * for other PJSIP events, such as transaction state changes due to timer
* expirations, your module will be called into from a PJSIP thread. If you
* are called into from a PJSIP thread, then you should push whatever processing
* is needed to a servant as soon as possible. You can discern if you are currently
@@ -1588,13 +1588,13 @@ enum ast_sip_scheduler_task_flags {
/*!
* Run at a fixed interval.
- * Stop scheduling if the callback returns 0.
+ * Stop scheduling if the callback returns <= 0.
* Any other value is ignored.
*/
AST_SIP_SCHED_TASK_FIXED = (0 << 0),
/*!
* Run at a variable interval.
- * Stop scheduling if the callback returns 0.
+ * Stop scheduling if the callback returns <= 0.
* Any other return value is used as the new interval.
*/
AST_SIP_SCHED_TASK_VARIABLE = (1 << 0),
diff --git a/res/res_pjsip/pjsip_scheduler.c b/res/res_pjsip/pjsip_scheduler.c
index 5b86a791b..4210664dc 100644
--- a/res/res_pjsip/pjsip_scheduler.c
+++ b/res/res_pjsip/pjsip_scheduler.c
@@ -28,6 +28,7 @@
#include "asterisk/res_pjsip.h"
#include "include/res_pjsip_private.h"
#include "asterisk/res_pjsip_cli.h"
+#include "asterisk/taskprocessor.h"
#define TASK_BUCKETS 53
@@ -36,30 +37,30 @@ static struct ao2_container *tasks;
static int task_count;
struct ast_sip_sched_task {
- /*! ast_sip_sched task id */
- uint32_t task_id;
- /*! ast_sched scheudler id */
- int current_scheduler_id;
- /*! task is currently running */
- int is_running;
- /*! task */
- ast_sip_task task;
+ /*! The serializer to be used (if any) (Holds a ref) */
+ struct ast_taskprocessor *serializer;
/*! task data */
void *task_data;
- /*! reschedule interval in milliseconds */
- int interval;
- /*! the time the task was queued */
+ /*! task function */
+ ast_sip_task task;
+ /*! the time the task was originally scheduled/queued */
struct timeval when_queued;
/*! the last time the task was started */
struct timeval last_start;
/*! the last time the task was ended */
struct timeval last_end;
+ /*! When the periodic task is next expected to run */
+ struct timeval next_periodic;
+ /*! reschedule interval in milliseconds */
+ int interval;
+ /*! ast_sched scheudler id */
+ int current_scheduler_id;
+ /*! task is currently running */
+ int is_running;
/*! times run */
int run_count;
/*! the task reschedule, cleanup and policy flags */
enum ast_sip_scheduler_task_flags flags;
- /*! the serializer to be used (if any) */
- struct ast_taskprocessor *serializer;
/*! A name to be associated with the task */
char name[0];
};
@@ -76,14 +77,19 @@ static int push_to_serializer(const void *data);
*/
static int run_task(void *data)
{
- RAII_VAR(struct ast_sip_sched_task *, schtd, ao2_bump(data), ao2_cleanup);
+ RAII_VAR(struct ast_sip_sched_task *, schtd, data, ao2_cleanup);
int res;
int delay;
+ if (!schtd->interval) {
+ /* Task was cancelled while waiting to be executed by the serializer */
+ return -1;
+ }
+
ao2_lock(schtd);
schtd->last_start = ast_tvnow();
schtd->is_running = 1;
- schtd->run_count++;
+ ++schtd->run_count;
ao2_unlock(schtd);
res = schtd->task(schtd->task_data);
@@ -93,10 +99,10 @@ static int run_task(void *data)
schtd->last_end = ast_tvnow();
/*
- * Don't restart if the task returned 0 or if the interval
+ * Don't restart if the task returned <= 0 or if the interval
* was set to 0 while the task was running
*/
- if (!res || !schtd->interval) {
+ if (res <= 0 || !schtd->interval) {
schtd->interval = 0;
ao2_unlock(schtd);
ao2_unlink(tasks, schtd);
@@ -110,13 +116,22 @@ static int run_task(void *data)
if (schtd->flags & AST_SIP_SCHED_TASK_DELAY) {
delay = schtd->interval;
} else {
- delay = schtd->interval - (ast_tvdiff_ms(schtd->last_end, schtd->last_start) % schtd->interval);
+ int64_t diff;
+
+ /* Determine next periodic interval we need to expire. */
+ do {
+ schtd->next_periodic = ast_tvadd(schtd->next_periodic,
+ ast_samp2tv(schtd->interval, 1000));
+ diff = ast_tvdiff_ms(schtd->next_periodic, schtd->last_end);
+ } while (diff <= 0);
+ delay = diff;
}
schtd->current_scheduler_id = ast_sched_add(scheduler_context, delay, push_to_serializer, schtd);
if (schtd->current_scheduler_id < 0) {
schtd->interval = 0;
ao2_unlock(schtd);
+ ast_log(LOG_ERROR, "Sched %p: Failed to reschedule task %s\n", schtd, schtd->name);
ao2_unlink(tasks, schtd);
return -1;
}
@@ -133,9 +148,29 @@ static int run_task(void *data)
static int push_to_serializer(const void *data)
{
struct ast_sip_sched_task *schtd = (struct ast_sip_sched_task *)data;
+ int sched_id;
+ ao2_lock(schtd);
+ sched_id = schtd->current_scheduler_id;
+ schtd->current_scheduler_id = -1;
+ ao2_unlock(schtd);
+ if (sched_id < 0) {
+ /* Task was cancelled while waiting on the lock */
+ return 0;
+ }
+
+ ao2_t_ref(schtd, +1, "Give ref to run_task()");
if (ast_sip_push_task(schtd->serializer, run_task, schtd)) {
- ao2_ref(schtd, -1);
+ /*
+ * Oh my. Have to cancel the scheduled item because we
+ * unexpectedly cannot run it anymore.
+ */
+ ao2_unlink(tasks, schtd);
+ ao2_lock(schtd);
+ schtd->interval = 0;
+ ao2_unlock(schtd);
+
+ ao2_t_ref(schtd, -1, "Failed so release run_task() ref");
}
return 0;
@@ -144,20 +179,22 @@ static int push_to_serializer(const void *data)
int ast_sip_sched_task_cancel(struct ast_sip_sched_task *schtd)
{
int res;
+ int sched_id;
- if (!ao2_ref_and_lock(schtd)) {
- return -1;
- }
+ /*
+ * Prevent any tasks in the serializer queue from
+ * running and restarting the scheduled item on us
+ * first.
+ */
+ ao2_lock(schtd);
+ schtd->interval = 0;
- if (schtd->current_scheduler_id < 0 || schtd->interval <= 0) {
- ao2_unlock_and_unref(schtd);
- return 0;
- }
+ sched_id = schtd->current_scheduler_id;
+ schtd->current_scheduler_id = -1;
+ ao2_unlock(schtd);
+ res = ast_sched_del(scheduler_context, sched_id);
- schtd->interval = 0;
- ao2_unlock_and_unref(schtd);
ao2_unlink(tasks, schtd);
- res = ast_sched_del(scheduler_context, schtd->current_scheduler_id);
return res;
}
@@ -306,7 +343,7 @@ int ast_sip_sched_is_task_running_by_name(const char *name)
return is_running;
}
-static void schtd_destructor(void *data)
+static void schtd_dtor(void *data)
{
struct ast_sip_sched_task *schtd = data;
@@ -316,6 +353,7 @@ static void schtd_destructor(void *data)
} else if (schtd->task_data && (schtd->flags & AST_SIP_SCHED_TASK_DATA_FREE)) {
ast_free(schtd->task_data);
}
+ ast_taskprocessor_unreference(schtd->serializer);
}
struct ast_sip_sched_task *ast_sip_schedule_task(struct ast_taskprocessor *serializer,
@@ -326,38 +364,60 @@ struct ast_sip_sched_task *ast_sip_schedule_task(struct ast_taskprocessor *seria
struct ast_sip_sched_task *schtd;
int res;
- if (interval < 0) {
+ if (interval <= 0) {
return NULL;
}
- schtd = ao2_alloc((sizeof(*schtd) + (!ast_strlen_zero(name) ? strlen(name) : ID_LEN) + 1), schtd_destructor);
+ schtd = ao2_alloc((sizeof(*schtd) + (!ast_strlen_zero(name) ? strlen(name) : ID_LEN) + 1),
+ schtd_dtor);
if (!schtd) {
return NULL;
}
- schtd->task_id = ast_atomic_fetchadd_int(&task_count, 1);
- schtd->serializer = serializer;
+ schtd->serializer = ao2_bump(serializer);
+ schtd->task_data = task_data;
schtd->task = sip_task;
+ schtd->interval = interval;
+ schtd->flags = flags;
if (!ast_strlen_zero(name)) {
strcpy(schtd->name, name); /* Safe */
} else {
- sprintf(schtd->name, "task_%08x", schtd->task_id);
+ uint32_t task_id;
+
+ task_id = ast_atomic_fetchadd_int(&task_count, 1);
+ sprintf(schtd->name, "task_%08x", task_id);
}
- schtd->task_data = task_data;
- schtd->flags = flags;
- schtd->interval = interval;
schtd->when_queued = ast_tvnow();
+ if (!(schtd->flags & AST_SIP_SCHED_TASK_DELAY)) {
+ schtd->next_periodic = ast_tvadd(schtd->when_queued,
+ ast_samp2tv(schtd->interval, 1000));
+ }
if (flags & AST_SIP_SCHED_TASK_DATA_AO2) {
ao2_ref(task_data, +1);
}
+
+ /*
+ * We must put it in the 'tasks' container before scheduling
+ * the task because we don't want the push_to_serializer()
+ * sched task to "remove" it on failure before we even put
+ * it in. If this happens then nothing would remove it from
+ * the 'tasks' container.
+ */
+ ao2_link(tasks, schtd);
+
+ /*
+ * Lock so we are guaranteed to get the sched id set before
+ * the push_to_serializer() sched task can clear it.
+ */
+ ao2_lock(schtd);
res = ast_sched_add(scheduler_context, interval, push_to_serializer, schtd);
+ schtd->current_scheduler_id = res;
+ ao2_unlock(schtd);
if (res < 0) {
+ ao2_unlink(tasks, schtd);
ao2_ref(schtd, -1);
return NULL;
- } else {
- schtd->current_scheduler_id = res;
- ao2_link(tasks, schtd);
}
return schtd;
@@ -457,7 +517,8 @@ static struct ast_cli_entry cli_commands[] = {
int ast_sip_initialize_scheduler(void)
{
- if (!(scheduler_context = ast_sched_context_create())) {
+ scheduler_context = ast_sched_context_create();
+ if (!scheduler_context) {
ast_log(LOG_ERROR, "Failed to create scheduler. Aborting load\n");
return -1;
}
@@ -487,7 +548,21 @@ int ast_sip_destroy_scheduler(void)
ast_cli_unregister_multiple(cli_commands, ARRAY_LEN(cli_commands));
if (scheduler_context) {
+ if (tasks) {
+ struct ao2_iterator iter;
+ struct ast_sip_sched_task *schtd;
+
+ /* Cancel all scheduled tasks */
+ iter = ao2_iterator_init(tasks, 0);
+ while ((schtd = ao2_iterator_next(&iter))) {
+ ast_sip_sched_task_cancel(schtd);
+ ao2_ref(schtd, -1);
+ }
+ ao2_iterator_destroy(&iter);
+ }
+
ast_sched_context_destroy(scheduler_context);
+ scheduler_context = NULL;
}
ao2_cleanup(tasks);