summaryrefslogtreecommitdiff
path: root/res/res_pjsip/pjsip_scheduler.c
diff options
context:
space:
mode:
Diffstat (limited to 'res/res_pjsip/pjsip_scheduler.c')
-rw-r--r--res/res_pjsip/pjsip_scheduler.c311
1 files changed, 214 insertions, 97 deletions
diff --git a/res/res_pjsip/pjsip_scheduler.c b/res/res_pjsip/pjsip_scheduler.c
index e4459da66..bbf666fd7 100644
--- a/res/res_pjsip/pjsip_scheduler.c
+++ b/res/res_pjsip/pjsip_scheduler.c
@@ -28,6 +28,7 @@
#include "asterisk/res_pjsip.h"
#include "include/res_pjsip_private.h"
#include "asterisk/res_pjsip_cli.h"
+#include "asterisk/taskprocessor.h"
#define TASK_BUCKETS 53
@@ -36,31 +37,31 @@ static struct ao2_container *tasks;
static int task_count;
struct ast_sip_sched_task {
- /*! ast_sip_sched task id */
- uint32_t task_id;
- /*! ast_sched scheudler id */
- int current_scheduler_id;
- /*! task is currently running */
- int is_running;
- /*! task */
- ast_sip_task task;
+ /*! The serializer to be used (if any) (Holds a ref) */
+ struct ast_taskprocessor *serializer;
/*! task data */
void *task_data;
- /*! reschedule interval in milliseconds */
- int interval;
- /*! the time the task was queued */
+ /*! task function */
+ ast_sip_task task;
+ /*! the time the task was originally scheduled/queued */
struct timeval when_queued;
/*! the last time the task was started */
struct timeval last_start;
/*! the last time the task was ended */
struct timeval last_end;
+ /*! When the periodic task is next expected to run */
+ struct timeval next_periodic;
+ /*! reschedule interval in milliseconds */
+ int interval;
+ /*! ast_sched scheudler id */
+ int current_scheduler_id;
+ /*! task is currently running */
+ int is_running;
/*! times run */
int run_count;
/*! the task reschedule, cleanup and policy flags */
enum ast_sip_scheduler_task_flags flags;
- /*! the serializer to be used (if any) */
- struct ast_taskprocessor *serializer;
- /* A name to be associated with the task */
+ /*! A name to be associated with the task */
char name[0];
};
@@ -76,14 +77,22 @@ static int push_to_serializer(const void *data);
*/
static int run_task(void *data)
{
- RAII_VAR(struct ast_sip_sched_task *, schtd, ao2_bump(data), ao2_cleanup);
+ RAII_VAR(struct ast_sip_sched_task *, schtd, data, ao2_cleanup);
int res;
int delay;
+ if (!schtd->interval) {
+ /* Task was cancelled while waiting to be executed by the serializer */
+ return -1;
+ }
+
+ if (schtd->flags & AST_SIP_SCHED_TASK_TRACK) {
+ ast_log(LOG_DEBUG, "Sched %p: Running %s\n", schtd, schtd->name);
+ }
ao2_lock(schtd);
schtd->last_start = ast_tvnow();
schtd->is_running = 1;
- schtd->run_count++;
+ ++schtd->run_count;
ao2_unlock(schtd);
res = schtd->task(schtd->task_data);
@@ -93,10 +102,10 @@ static int run_task(void *data)
schtd->last_end = ast_tvnow();
/*
- * Don't restart if the task returned 0 or if the interval
+ * Don't restart if the task returned <= 0 or if the interval
* was set to 0 while the task was running
*/
- if (!res || !schtd->interval) {
+ if (res <= 0 || !schtd->interval) {
schtd->interval = 0;
ao2_unlock(schtd);
ao2_unlink(tasks, schtd);
@@ -110,18 +119,31 @@ static int run_task(void *data)
if (schtd->flags & AST_SIP_SCHED_TASK_DELAY) {
delay = schtd->interval;
} else {
- delay = schtd->interval - (ast_tvdiff_ms(schtd->last_end, schtd->last_start) % schtd->interval);
+ int64_t diff;
+
+ /* Determine next periodic interval we need to expire. */
+ do {
+ schtd->next_periodic = ast_tvadd(schtd->next_periodic,
+ ast_samp2tv(schtd->interval, 1000));
+ diff = ast_tvdiff_ms(schtd->next_periodic, schtd->last_end);
+ } while (diff <= 0);
+ delay = diff;
}
- schtd->current_scheduler_id = ast_sched_add(scheduler_context, delay, push_to_serializer, (const void *)schtd);
+ schtd->current_scheduler_id = ast_sched_add(scheduler_context, delay, push_to_serializer, schtd);
if (schtd->current_scheduler_id < 0) {
schtd->interval = 0;
ao2_unlock(schtd);
+ ast_log(LOG_ERROR, "Sched %p: Failed to reschedule task %s\n", schtd, schtd->name);
ao2_unlink(tasks, schtd);
return -1;
}
ao2_unlock(schtd);
+ if (schtd->flags & AST_SIP_SCHED_TASK_TRACK) {
+ ast_log(LOG_DEBUG, "Sched %p: Rescheduled %s for %d ms\n", schtd, schtd->name,
+ delay);
+ }
return 0;
}
@@ -133,9 +155,32 @@ static int run_task(void *data)
static int push_to_serializer(const void *data)
{
struct ast_sip_sched_task *schtd = (struct ast_sip_sched_task *)data;
+ int sched_id;
+ ao2_lock(schtd);
+ sched_id = schtd->current_scheduler_id;
+ schtd->current_scheduler_id = -1;
+ ao2_unlock(schtd);
+ if (sched_id < 0) {
+ /* Task was cancelled while waiting on the lock */
+ return 0;
+ }
+
+ if (schtd->flags & AST_SIP_SCHED_TASK_TRACK) {
+ ast_log(LOG_DEBUG, "Sched %p: Ready to run %s\n", schtd, schtd->name);
+ }
+ ao2_t_ref(schtd, +1, "Give ref to run_task()");
if (ast_sip_push_task(schtd->serializer, run_task, schtd)) {
- ao2_ref(schtd, -1);
+ /*
+ * Oh my. Have to cancel the scheduled item because we
+ * unexpectedly cannot run it anymore.
+ */
+ ao2_unlink(tasks, schtd);
+ ao2_lock(schtd);
+ schtd->interval = 0;
+ ao2_unlock(schtd);
+
+ ao2_t_ref(schtd, -1, "Failed so release run_task() ref");
}
return 0;
@@ -144,48 +189,54 @@ static int push_to_serializer(const void *data)
int ast_sip_sched_task_cancel(struct ast_sip_sched_task *schtd)
{
int res;
+ int sched_id;
- if (!ao2_ref_and_lock(schtd)) {
- return -1;
- }
-
- if (schtd->current_scheduler_id < 0 || schtd->interval <= 0) {
- ao2_unlock_and_unref(schtd);
- return 0;
+ if (schtd->flags & AST_SIP_SCHED_TASK_TRACK) {
+ ast_log(LOG_DEBUG, "Sched %p: Canceling %s\n", schtd, schtd->name);
}
+ /*
+ * Prevent any tasks in the serializer queue from
+ * running and restarting the scheduled item on us
+ * first.
+ */
+ ao2_lock(schtd);
schtd->interval = 0;
- ao2_unlock_and_unref(schtd);
+
+ sched_id = schtd->current_scheduler_id;
+ schtd->current_scheduler_id = -1;
+ ao2_unlock(schtd);
+ res = ast_sched_del(scheduler_context, sched_id);
+
ao2_unlink(tasks, schtd);
- res = ast_sched_del(scheduler_context, schtd->current_scheduler_id);
return res;
}
int ast_sip_sched_task_cancel_by_name(const char *name)
{
- RAII_VAR(struct ast_sip_sched_task *, schtd, NULL, ao2_cleanup);
+ int res;
+ struct ast_sip_sched_task *schtd;
if (ast_strlen_zero(name)) {
return -1;
}
- schtd = ao2_find(tasks, name, OBJ_SEARCH_KEY | OBJ_NOLOCK);
+ schtd = ao2_find(tasks, name, OBJ_SEARCH_KEY);
if (!schtd) {
return -1;
}
- return ast_sip_sched_task_cancel(schtd);
+ res = ast_sip_sched_task_cancel(schtd);
+ ao2_ref(schtd, -1);
+ return res;
}
int ast_sip_sched_task_get_times(struct ast_sip_sched_task *schtd,
struct timeval *queued, struct timeval *last_start, struct timeval *last_end)
{
- if (!ao2_ref_and_lock(schtd)) {
- return -1;
- }
-
+ ao2_lock(schtd);
if (queued) {
memcpy(queued, &schtd->when_queued, sizeof(struct timeval));
}
@@ -195,8 +246,7 @@ int ast_sip_sched_task_get_times(struct ast_sip_sched_task *schtd,
if (last_end) {
memcpy(last_end, &schtd->last_end, sizeof(struct timeval));
}
-
- ao2_unlock_and_unref(schtd);
+ ao2_unlock(schtd);
return 0;
}
@@ -204,18 +254,21 @@ int ast_sip_sched_task_get_times(struct ast_sip_sched_task *schtd,
int ast_sip_sched_task_get_times_by_name(const char *name,
struct timeval *queued, struct timeval *last_start, struct timeval *last_end)
{
- RAII_VAR(struct ast_sip_sched_task *, schtd, NULL, ao2_cleanup);
+ int res;
+ struct ast_sip_sched_task *schtd;
if (ast_strlen_zero(name)) {
return -1;
}
- schtd = ao2_find(tasks, name, OBJ_SEARCH_KEY | OBJ_NOLOCK);
+ schtd = ao2_find(tasks, name, OBJ_SEARCH_KEY);
if (!schtd) {
return -1;
}
- return ast_sip_sched_task_get_times(schtd, queued, last_start, last_end);
+ res = ast_sip_sched_task_get_times(schtd, queued, last_start, last_end);
+ ao2_ref(schtd, -1);
+ return res;
}
int ast_sip_sched_task_get_name(struct ast_sip_sched_task *schtd, char *name, size_t maxlen)
@@ -224,13 +277,9 @@ int ast_sip_sched_task_get_name(struct ast_sip_sched_task *schtd, char *name, si
return -1;
}
- if (!ao2_ref_and_lock(schtd)) {
- return -1;
- }
-
+ ao2_lock(schtd);
ast_copy_string(name, schtd->name, maxlen);
-
- ao2_unlock_and_unref(schtd);
+ ao2_unlock(schtd);
return 0;
}
@@ -241,9 +290,7 @@ int ast_sip_sched_task_get_next_run(struct ast_sip_sched_task *schtd)
struct timeval since_when;
struct timeval now;
- if (!ao2_ref_and_lock(schtd)) {
- return -1;
- }
+ ao2_lock(schtd);
if (schtd->interval) {
delay = schtd->interval;
@@ -262,103 +309,136 @@ int ast_sip_sched_task_get_next_run(struct ast_sip_sched_task *schtd)
delay = -1;
}
- ao2_unlock_and_unref(schtd);
+ ao2_unlock(schtd);
return delay;
}
int ast_sip_sched_task_get_next_run_by_name(const char *name)
{
- RAII_VAR(struct ast_sip_sched_task *, schtd, NULL, ao2_cleanup);
+ int next_run;
+ struct ast_sip_sched_task *schtd;
if (ast_strlen_zero(name)) {
return -1;
}
- schtd = ao2_find(tasks, name, OBJ_SEARCH_KEY | OBJ_NOLOCK);
+ schtd = ao2_find(tasks, name, OBJ_SEARCH_KEY);
if (!schtd) {
return -1;
}
- return ast_sip_sched_task_get_next_run(schtd);
+ next_run = ast_sip_sched_task_get_next_run(schtd);
+ ao2_ref(schtd, -1);
+ return next_run;
}
int ast_sip_sched_is_task_running(struct ast_sip_sched_task *schtd)
{
- if (!schtd) {
- return 0;
- }
-
- return schtd->is_running;
+ return schtd ? schtd->is_running : 0;
}
int ast_sip_sched_is_task_running_by_name(const char *name)
{
- RAII_VAR(struct ast_sip_sched_task *, schtd, NULL, ao2_cleanup);
+ int is_running;
+ struct ast_sip_sched_task *schtd;
if (ast_strlen_zero(name)) {
return 0;
}
- schtd = ao2_find(tasks, name, OBJ_SEARCH_KEY | OBJ_NOLOCK);
+ schtd = ao2_find(tasks, name, OBJ_SEARCH_KEY);
if (!schtd) {
return 0;
}
- return schtd->is_running;
+ is_running = schtd->is_running;
+ ao2_ref(schtd, -1);
+ return is_running;
}
-static void schtd_destructor(void *data)
+static void schtd_dtor(void *data)
{
struct ast_sip_sched_task *schtd = data;
+ if (schtd->flags & AST_SIP_SCHED_TASK_TRACK) {
+ ast_log(LOG_DEBUG, "Sched %p: Destructor %s\n", schtd, schtd->name);
+ }
if (schtd->flags & AST_SIP_SCHED_TASK_DATA_AO2) {
/* release our own ref, then release the callers if asked to do so */
ao2_ref(schtd->task_data, (schtd->flags & AST_SIP_SCHED_TASK_DATA_FREE) ? -2 : -1);
} else if (schtd->task_data && (schtd->flags & AST_SIP_SCHED_TASK_DATA_FREE)) {
ast_free(schtd->task_data);
}
+ ast_taskprocessor_unreference(schtd->serializer);
}
struct ast_sip_sched_task *ast_sip_schedule_task(struct ast_taskprocessor *serializer,
- int interval, ast_sip_task sip_task, char *name, void *task_data, enum ast_sip_scheduler_task_flags flags)
+ int interval, ast_sip_task sip_task, const char *name, void *task_data,
+ enum ast_sip_scheduler_task_flags flags)
{
#define ID_LEN 13 /* task_deadbeef */
struct ast_sip_sched_task *schtd;
int res;
- if (interval < 0) {
+ if (interval <= 0) {
return NULL;
}
- schtd = ao2_alloc((sizeof(*schtd) + (!ast_strlen_zero(name) ? strlen(name) : ID_LEN) + 1), schtd_destructor);
+ schtd = ao2_alloc((sizeof(*schtd) + (!ast_strlen_zero(name) ? strlen(name) : ID_LEN) + 1),
+ schtd_dtor);
if (!schtd) {
return NULL;
}
- schtd->task_id = ast_atomic_fetchadd_int(&task_count, 1);
- schtd->serializer = serializer;
+ schtd->serializer = ao2_bump(serializer);
+ schtd->task_data = task_data;
schtd->task = sip_task;
+ schtd->interval = interval;
+ schtd->flags = flags;
if (!ast_strlen_zero(name)) {
strcpy(schtd->name, name); /* Safe */
} else {
- sprintf(schtd->name, "task_%08x", schtd->task_id);
+ uint32_t task_id;
+
+ task_id = ast_atomic_fetchadd_int(&task_count, 1);
+ sprintf(schtd->name, "task_%08x", task_id);
+ }
+ if (schtd->flags & AST_SIP_SCHED_TASK_TRACK) {
+ ast_log(LOG_DEBUG, "Sched %p: Scheduling %s for %d ms\n", schtd, schtd->name,
+ interval);
}
- schtd->task_data = task_data;
- schtd->flags = flags;
- schtd->interval = interval;
schtd->when_queued = ast_tvnow();
+ if (!(schtd->flags & AST_SIP_SCHED_TASK_DELAY)) {
+ schtd->next_periodic = ast_tvadd(schtd->when_queued,
+ ast_samp2tv(schtd->interval, 1000));
+ }
if (flags & AST_SIP_SCHED_TASK_DATA_AO2) {
ao2_ref(task_data, +1);
}
- res = ast_sched_add(scheduler_context, interval, push_to_serializer, (const void *)schtd);
+
+ /*
+ * We must put it in the 'tasks' container before scheduling
+ * the task because we don't want the push_to_serializer()
+ * sched task to "remove" it on failure before we even put
+ * it in. If this happens then nothing would remove it from
+ * the 'tasks' container.
+ */
+ ao2_link(tasks, schtd);
+
+ /*
+ * Lock so we are guaranteed to get the sched id set before
+ * the push_to_serializer() sched task can clear it.
+ */
+ ao2_lock(schtd);
+ res = ast_sched_add(scheduler_context, interval, push_to_serializer, schtd);
+ schtd->current_scheduler_id = res;
+ ao2_unlock(schtd);
if (res < 0) {
+ ao2_unlink(tasks, schtd);
ao2_ref(schtd, -1);
return NULL;
- } else {
- schtd->current_scheduler_id = res;
- ao2_link(tasks, schtd);
}
return schtd;
@@ -367,16 +447,17 @@ struct ast_sip_sched_task *ast_sip_schedule_task(struct ast_taskprocessor *seria
static char *cli_show_tasks(struct ast_cli_entry *e, int cmd, struct ast_cli_args *a)
{
- struct ao2_iterator i;
+ struct ao2_iterator iter;
+ struct ao2_container *sorted_tasks;
struct ast_sip_sched_task *schtd;
- const char *log_format = ast_logger_get_dateformat();
+ const char *log_format;
struct ast_tm tm;
char queued[32];
char last_start[32];
char next_start[32];
int datelen;
- struct timeval now = ast_tvnow();
- const char *separator = "======================================";
+ struct timeval now;
+ static const char separator[] = "=============================================";
switch (cmd) {
case CLI_INIT:
@@ -392,26 +473,47 @@ static char *cli_show_tasks(struct ast_cli_entry *e, int cmd, struct ast_cli_arg
return CLI_SHOWUSAGE;
}
+ /* Get a sorted snapshot of the scheduled tasks */
+ sorted_tasks = ao2_container_alloc_rbtree(AO2_ALLOC_OPT_LOCK_NOLOCK, 0,
+ ast_sip_sched_task_sort_fn, NULL);
+ if (!sorted_tasks) {
+ return CLI_SUCCESS;
+ }
+ if (ao2_container_dup(sorted_tasks, tasks, 0)) {
+ ao2_ref(sorted_tasks, -1);
+ return CLI_SUCCESS;
+ }
+
+ now = ast_tvnow();
+ log_format = ast_logger_get_dateformat();
+
ast_localtime(&now, &tm, NULL);
datelen = ast_strftime(queued, sizeof(queued), log_format, &tm);
ast_cli(a->fd, "PJSIP Scheduled Tasks:\n\n");
- ast_cli(a->fd, " %1$-24s %2$-9s %3$-9s %4$-5s %6$-*5$s %7$-*5$s %8$-*5$s %9$7s\n",
+ ast_cli(a->fd, "%1$-45s %2$-9s %3$-9s %4$-5s %6$-*5$s %7$-*5$s %8$-*5$s %9$7s\n",
"Task Name", "Interval", "Times Run", "State",
datelen, "Queued", "Last Started", "Next Start", "( secs)");
- ast_cli(a->fd, " %1$-24.24s %2$-9.9s %3$-9.9s %4$-5.5s %6$-*5$.*5$s %7$-*5$.*5$s %9$-*8$.*8$s\n",
+ ast_cli(a->fd, "%1$-45.45s %2$-9.9s %3$-9.9s %4$-5.5s %6$-*5$.*5$s %7$-*5$.*5$s %9$-*8$.*8$s\n",
separator, separator, separator, separator,
datelen, separator, separator, datelen + 8, separator);
+ iter = ao2_iterator_init(sorted_tasks, AO2_ITERATOR_UNLINK);
+ for (; (schtd = ao2_iterator_next(&iter)); ao2_ref(schtd, -1)) {
+ int next_run_sec;
+ struct timeval next;
+
+ ao2_lock(schtd);
- ao2_ref(tasks, +1);
- ao2_rdlock(tasks);
- i = ao2_iterator_init(tasks, 0);
- while ((schtd = ao2_iterator_next(&i))) {
- int next_run_sec = ast_sip_sched_task_get_next_run(schtd) / 1000;
- struct timeval next = ast_tvadd(now, (struct timeval) {next_run_sec, 0});
+ next_run_sec = ast_sip_sched_task_get_next_run(schtd) / 1000;
+ if (next_run_sec < 0) {
+ /* Scheduled task is now canceled */
+ ao2_unlock(schtd);
+ continue;
+ }
+ next = ast_tvadd(now, ast_tv(next_run_sec, 0));
ast_localtime(&schtd->when_queued, &tm, NULL);
ast_strftime(queued, sizeof(queued), log_format, &tm);
@@ -426,7 +528,7 @@ static char *cli_show_tasks(struct ast_cli_entry *e, int cmd, struct ast_cli_arg
ast_localtime(&next, &tm, NULL);
ast_strftime(next_start, sizeof(next_start), log_format, &tm);
- ast_cli(a->fd, " %1$-24.24s %2$9.3f %3$9d %4$-5s %6$-*5$s %7$-*5$s %8$-*5$s (%9$5d)\n",
+ ast_cli(a->fd, "%1$-46.46s%2$9.3f %3$9d %4$-5s %6$-*5$s %7$-*5$s %8$-*5$s (%9$5d)\n",
schtd->name,
schtd->interval / 1000.0,
schtd->run_count,
@@ -434,11 +536,10 @@ static char *cli_show_tasks(struct ast_cli_entry *e, int cmd, struct ast_cli_arg
datelen, queued, last_start,
next_start,
next_run_sec);
- ao2_cleanup(schtd);
+ ao2_unlock(schtd);
}
- ao2_iterator_destroy(&i);
- ao2_unlock(tasks);
- ao2_ref(tasks, -1);
+ ao2_iterator_destroy(&iter);
+ ao2_ref(sorted_tasks, -1);
ast_cli(a->fd, "\n");
return CLI_SUCCESS;
@@ -450,7 +551,8 @@ static struct ast_cli_entry cli_commands[] = {
int ast_sip_initialize_scheduler(void)
{
- if (!(scheduler_context = ast_sched_context_create())) {
+ scheduler_context = ast_sched_context_create();
+ if (!scheduler_context) {
ast_log(LOG_ERROR, "Failed to create scheduler. Aborting load\n");
return -1;
}
@@ -461,8 +563,9 @@ int ast_sip_initialize_scheduler(void)
return -1;
}
- tasks = ao2_container_alloc_hash(AO2_ALLOC_OPT_LOCK_RWLOCK, AO2_CONTAINER_ALLOC_OPT_DUPS_REJECT,
- TASK_BUCKETS, ast_sip_sched_task_hash_fn, ast_sip_sched_task_sort_fn, ast_sip_sched_task_cmp_fn);
+ tasks = ao2_container_alloc_hash(AO2_ALLOC_OPT_LOCK_RWLOCK,
+ AO2_CONTAINER_ALLOC_OPT_DUPS_REJECT, TASK_BUCKETS, ast_sip_sched_task_hash_fn,
+ ast_sip_sched_task_sort_fn, ast_sip_sched_task_cmp_fn);
if (!tasks) {
ast_log(LOG_ERROR, "Failed to allocate task container. Aborting load\n");
ast_sched_context_destroy(scheduler_context);
@@ -479,7 +582,21 @@ int ast_sip_destroy_scheduler(void)
ast_cli_unregister_multiple(cli_commands, ARRAY_LEN(cli_commands));
if (scheduler_context) {
+ if (tasks) {
+ struct ao2_iterator iter;
+ struct ast_sip_sched_task *schtd;
+
+ /* Cancel all scheduled tasks */
+ iter = ao2_iterator_init(tasks, 0);
+ while ((schtd = ao2_iterator_next(&iter))) {
+ ast_sip_sched_task_cancel(schtd);
+ ao2_ref(schtd, -1);
+ }
+ ao2_iterator_destroy(&iter);
+ }
+
ast_sched_context_destroy(scheduler_context);
+ scheduler_context = NULL;
}
ao2_cleanup(tasks);