diff options
Diffstat (limited to 'res/res_pjsip/pjsip_scheduler.c')
-rw-r--r-- | res/res_pjsip/pjsip_scheduler.c | 311 |
1 files changed, 214 insertions, 97 deletions
diff --git a/res/res_pjsip/pjsip_scheduler.c b/res/res_pjsip/pjsip_scheduler.c index e4459da66..bbf666fd7 100644 --- a/res/res_pjsip/pjsip_scheduler.c +++ b/res/res_pjsip/pjsip_scheduler.c @@ -28,6 +28,7 @@ #include "asterisk/res_pjsip.h" #include "include/res_pjsip_private.h" #include "asterisk/res_pjsip_cli.h" +#include "asterisk/taskprocessor.h" #define TASK_BUCKETS 53 @@ -36,31 +37,31 @@ static struct ao2_container *tasks; static int task_count; struct ast_sip_sched_task { - /*! ast_sip_sched task id */ - uint32_t task_id; - /*! ast_sched scheudler id */ - int current_scheduler_id; - /*! task is currently running */ - int is_running; - /*! task */ - ast_sip_task task; + /*! The serializer to be used (if any) (Holds a ref) */ + struct ast_taskprocessor *serializer; /*! task data */ void *task_data; - /*! reschedule interval in milliseconds */ - int interval; - /*! the time the task was queued */ + /*! task function */ + ast_sip_task task; + /*! the time the task was originally scheduled/queued */ struct timeval when_queued; /*! the last time the task was started */ struct timeval last_start; /*! the last time the task was ended */ struct timeval last_end; + /*! When the periodic task is next expected to run */ + struct timeval next_periodic; + /*! reschedule interval in milliseconds */ + int interval; + /*! ast_sched scheudler id */ + int current_scheduler_id; + /*! task is currently running */ + int is_running; /*! times run */ int run_count; /*! the task reschedule, cleanup and policy flags */ enum ast_sip_scheduler_task_flags flags; - /*! the serializer to be used (if any) */ - struct ast_taskprocessor *serializer; - /* A name to be associated with the task */ + /*! A name to be associated with the task */ char name[0]; }; @@ -76,14 +77,22 @@ static int push_to_serializer(const void *data); */ static int run_task(void *data) { - RAII_VAR(struct ast_sip_sched_task *, schtd, ao2_bump(data), ao2_cleanup); + RAII_VAR(struct ast_sip_sched_task *, schtd, data, ao2_cleanup); int res; int delay; + if (!schtd->interval) { + /* Task was cancelled while waiting to be executed by the serializer */ + return -1; + } + + if (schtd->flags & AST_SIP_SCHED_TASK_TRACK) { + ast_log(LOG_DEBUG, "Sched %p: Running %s\n", schtd, schtd->name); + } ao2_lock(schtd); schtd->last_start = ast_tvnow(); schtd->is_running = 1; - schtd->run_count++; + ++schtd->run_count; ao2_unlock(schtd); res = schtd->task(schtd->task_data); @@ -93,10 +102,10 @@ static int run_task(void *data) schtd->last_end = ast_tvnow(); /* - * Don't restart if the task returned 0 or if the interval + * Don't restart if the task returned <= 0 or if the interval * was set to 0 while the task was running */ - if (!res || !schtd->interval) { + if (res <= 0 || !schtd->interval) { schtd->interval = 0; ao2_unlock(schtd); ao2_unlink(tasks, schtd); @@ -110,18 +119,31 @@ static int run_task(void *data) if (schtd->flags & AST_SIP_SCHED_TASK_DELAY) { delay = schtd->interval; } else { - delay = schtd->interval - (ast_tvdiff_ms(schtd->last_end, schtd->last_start) % schtd->interval); + int64_t diff; + + /* Determine next periodic interval we need to expire. */ + do { + schtd->next_periodic = ast_tvadd(schtd->next_periodic, + ast_samp2tv(schtd->interval, 1000)); + diff = ast_tvdiff_ms(schtd->next_periodic, schtd->last_end); + } while (diff <= 0); + delay = diff; } - schtd->current_scheduler_id = ast_sched_add(scheduler_context, delay, push_to_serializer, (const void *)schtd); + schtd->current_scheduler_id = ast_sched_add(scheduler_context, delay, push_to_serializer, schtd); if (schtd->current_scheduler_id < 0) { schtd->interval = 0; ao2_unlock(schtd); + ast_log(LOG_ERROR, "Sched %p: Failed to reschedule task %s\n", schtd, schtd->name); ao2_unlink(tasks, schtd); return -1; } ao2_unlock(schtd); + if (schtd->flags & AST_SIP_SCHED_TASK_TRACK) { + ast_log(LOG_DEBUG, "Sched %p: Rescheduled %s for %d ms\n", schtd, schtd->name, + delay); + } return 0; } @@ -133,9 +155,32 @@ static int run_task(void *data) static int push_to_serializer(const void *data) { struct ast_sip_sched_task *schtd = (struct ast_sip_sched_task *)data; + int sched_id; + ao2_lock(schtd); + sched_id = schtd->current_scheduler_id; + schtd->current_scheduler_id = -1; + ao2_unlock(schtd); + if (sched_id < 0) { + /* Task was cancelled while waiting on the lock */ + return 0; + } + + if (schtd->flags & AST_SIP_SCHED_TASK_TRACK) { + ast_log(LOG_DEBUG, "Sched %p: Ready to run %s\n", schtd, schtd->name); + } + ao2_t_ref(schtd, +1, "Give ref to run_task()"); if (ast_sip_push_task(schtd->serializer, run_task, schtd)) { - ao2_ref(schtd, -1); + /* + * Oh my. Have to cancel the scheduled item because we + * unexpectedly cannot run it anymore. + */ + ao2_unlink(tasks, schtd); + ao2_lock(schtd); + schtd->interval = 0; + ao2_unlock(schtd); + + ao2_t_ref(schtd, -1, "Failed so release run_task() ref"); } return 0; @@ -144,48 +189,54 @@ static int push_to_serializer(const void *data) int ast_sip_sched_task_cancel(struct ast_sip_sched_task *schtd) { int res; + int sched_id; - if (!ao2_ref_and_lock(schtd)) { - return -1; - } - - if (schtd->current_scheduler_id < 0 || schtd->interval <= 0) { - ao2_unlock_and_unref(schtd); - return 0; + if (schtd->flags & AST_SIP_SCHED_TASK_TRACK) { + ast_log(LOG_DEBUG, "Sched %p: Canceling %s\n", schtd, schtd->name); } + /* + * Prevent any tasks in the serializer queue from + * running and restarting the scheduled item on us + * first. + */ + ao2_lock(schtd); schtd->interval = 0; - ao2_unlock_and_unref(schtd); + + sched_id = schtd->current_scheduler_id; + schtd->current_scheduler_id = -1; + ao2_unlock(schtd); + res = ast_sched_del(scheduler_context, sched_id); + ao2_unlink(tasks, schtd); - res = ast_sched_del(scheduler_context, schtd->current_scheduler_id); return res; } int ast_sip_sched_task_cancel_by_name(const char *name) { - RAII_VAR(struct ast_sip_sched_task *, schtd, NULL, ao2_cleanup); + int res; + struct ast_sip_sched_task *schtd; if (ast_strlen_zero(name)) { return -1; } - schtd = ao2_find(tasks, name, OBJ_SEARCH_KEY | OBJ_NOLOCK); + schtd = ao2_find(tasks, name, OBJ_SEARCH_KEY); if (!schtd) { return -1; } - return ast_sip_sched_task_cancel(schtd); + res = ast_sip_sched_task_cancel(schtd); + ao2_ref(schtd, -1); + return res; } int ast_sip_sched_task_get_times(struct ast_sip_sched_task *schtd, struct timeval *queued, struct timeval *last_start, struct timeval *last_end) { - if (!ao2_ref_and_lock(schtd)) { - return -1; - } - + ao2_lock(schtd); if (queued) { memcpy(queued, &schtd->when_queued, sizeof(struct timeval)); } @@ -195,8 +246,7 @@ int ast_sip_sched_task_get_times(struct ast_sip_sched_task *schtd, if (last_end) { memcpy(last_end, &schtd->last_end, sizeof(struct timeval)); } - - ao2_unlock_and_unref(schtd); + ao2_unlock(schtd); return 0; } @@ -204,18 +254,21 @@ int ast_sip_sched_task_get_times(struct ast_sip_sched_task *schtd, int ast_sip_sched_task_get_times_by_name(const char *name, struct timeval *queued, struct timeval *last_start, struct timeval *last_end) { - RAII_VAR(struct ast_sip_sched_task *, schtd, NULL, ao2_cleanup); + int res; + struct ast_sip_sched_task *schtd; if (ast_strlen_zero(name)) { return -1; } - schtd = ao2_find(tasks, name, OBJ_SEARCH_KEY | OBJ_NOLOCK); + schtd = ao2_find(tasks, name, OBJ_SEARCH_KEY); if (!schtd) { return -1; } - return ast_sip_sched_task_get_times(schtd, queued, last_start, last_end); + res = ast_sip_sched_task_get_times(schtd, queued, last_start, last_end); + ao2_ref(schtd, -1); + return res; } int ast_sip_sched_task_get_name(struct ast_sip_sched_task *schtd, char *name, size_t maxlen) @@ -224,13 +277,9 @@ int ast_sip_sched_task_get_name(struct ast_sip_sched_task *schtd, char *name, si return -1; } - if (!ao2_ref_and_lock(schtd)) { - return -1; - } - + ao2_lock(schtd); ast_copy_string(name, schtd->name, maxlen); - - ao2_unlock_and_unref(schtd); + ao2_unlock(schtd); return 0; } @@ -241,9 +290,7 @@ int ast_sip_sched_task_get_next_run(struct ast_sip_sched_task *schtd) struct timeval since_when; struct timeval now; - if (!ao2_ref_and_lock(schtd)) { - return -1; - } + ao2_lock(schtd); if (schtd->interval) { delay = schtd->interval; @@ -262,103 +309,136 @@ int ast_sip_sched_task_get_next_run(struct ast_sip_sched_task *schtd) delay = -1; } - ao2_unlock_and_unref(schtd); + ao2_unlock(schtd); return delay; } int ast_sip_sched_task_get_next_run_by_name(const char *name) { - RAII_VAR(struct ast_sip_sched_task *, schtd, NULL, ao2_cleanup); + int next_run; + struct ast_sip_sched_task *schtd; if (ast_strlen_zero(name)) { return -1; } - schtd = ao2_find(tasks, name, OBJ_SEARCH_KEY | OBJ_NOLOCK); + schtd = ao2_find(tasks, name, OBJ_SEARCH_KEY); if (!schtd) { return -1; } - return ast_sip_sched_task_get_next_run(schtd); + next_run = ast_sip_sched_task_get_next_run(schtd); + ao2_ref(schtd, -1); + return next_run; } int ast_sip_sched_is_task_running(struct ast_sip_sched_task *schtd) { - if (!schtd) { - return 0; - } - - return schtd->is_running; + return schtd ? schtd->is_running : 0; } int ast_sip_sched_is_task_running_by_name(const char *name) { - RAII_VAR(struct ast_sip_sched_task *, schtd, NULL, ao2_cleanup); + int is_running; + struct ast_sip_sched_task *schtd; if (ast_strlen_zero(name)) { return 0; } - schtd = ao2_find(tasks, name, OBJ_SEARCH_KEY | OBJ_NOLOCK); + schtd = ao2_find(tasks, name, OBJ_SEARCH_KEY); if (!schtd) { return 0; } - return schtd->is_running; + is_running = schtd->is_running; + ao2_ref(schtd, -1); + return is_running; } -static void schtd_destructor(void *data) +static void schtd_dtor(void *data) { struct ast_sip_sched_task *schtd = data; + if (schtd->flags & AST_SIP_SCHED_TASK_TRACK) { + ast_log(LOG_DEBUG, "Sched %p: Destructor %s\n", schtd, schtd->name); + } if (schtd->flags & AST_SIP_SCHED_TASK_DATA_AO2) { /* release our own ref, then release the callers if asked to do so */ ao2_ref(schtd->task_data, (schtd->flags & AST_SIP_SCHED_TASK_DATA_FREE) ? -2 : -1); } else if (schtd->task_data && (schtd->flags & AST_SIP_SCHED_TASK_DATA_FREE)) { ast_free(schtd->task_data); } + ast_taskprocessor_unreference(schtd->serializer); } struct ast_sip_sched_task *ast_sip_schedule_task(struct ast_taskprocessor *serializer, - int interval, ast_sip_task sip_task, char *name, void *task_data, enum ast_sip_scheduler_task_flags flags) + int interval, ast_sip_task sip_task, const char *name, void *task_data, + enum ast_sip_scheduler_task_flags flags) { #define ID_LEN 13 /* task_deadbeef */ struct ast_sip_sched_task *schtd; int res; - if (interval < 0) { + if (interval <= 0) { return NULL; } - schtd = ao2_alloc((sizeof(*schtd) + (!ast_strlen_zero(name) ? strlen(name) : ID_LEN) + 1), schtd_destructor); + schtd = ao2_alloc((sizeof(*schtd) + (!ast_strlen_zero(name) ? strlen(name) : ID_LEN) + 1), + schtd_dtor); if (!schtd) { return NULL; } - schtd->task_id = ast_atomic_fetchadd_int(&task_count, 1); - schtd->serializer = serializer; + schtd->serializer = ao2_bump(serializer); + schtd->task_data = task_data; schtd->task = sip_task; + schtd->interval = interval; + schtd->flags = flags; if (!ast_strlen_zero(name)) { strcpy(schtd->name, name); /* Safe */ } else { - sprintf(schtd->name, "task_%08x", schtd->task_id); + uint32_t task_id; + + task_id = ast_atomic_fetchadd_int(&task_count, 1); + sprintf(schtd->name, "task_%08x", task_id); + } + if (schtd->flags & AST_SIP_SCHED_TASK_TRACK) { + ast_log(LOG_DEBUG, "Sched %p: Scheduling %s for %d ms\n", schtd, schtd->name, + interval); } - schtd->task_data = task_data; - schtd->flags = flags; - schtd->interval = interval; schtd->when_queued = ast_tvnow(); + if (!(schtd->flags & AST_SIP_SCHED_TASK_DELAY)) { + schtd->next_periodic = ast_tvadd(schtd->when_queued, + ast_samp2tv(schtd->interval, 1000)); + } if (flags & AST_SIP_SCHED_TASK_DATA_AO2) { ao2_ref(task_data, +1); } - res = ast_sched_add(scheduler_context, interval, push_to_serializer, (const void *)schtd); + + /* + * We must put it in the 'tasks' container before scheduling + * the task because we don't want the push_to_serializer() + * sched task to "remove" it on failure before we even put + * it in. If this happens then nothing would remove it from + * the 'tasks' container. + */ + ao2_link(tasks, schtd); + + /* + * Lock so we are guaranteed to get the sched id set before + * the push_to_serializer() sched task can clear it. + */ + ao2_lock(schtd); + res = ast_sched_add(scheduler_context, interval, push_to_serializer, schtd); + schtd->current_scheduler_id = res; + ao2_unlock(schtd); if (res < 0) { + ao2_unlink(tasks, schtd); ao2_ref(schtd, -1); return NULL; - } else { - schtd->current_scheduler_id = res; - ao2_link(tasks, schtd); } return schtd; @@ -367,16 +447,17 @@ struct ast_sip_sched_task *ast_sip_schedule_task(struct ast_taskprocessor *seria static char *cli_show_tasks(struct ast_cli_entry *e, int cmd, struct ast_cli_args *a) { - struct ao2_iterator i; + struct ao2_iterator iter; + struct ao2_container *sorted_tasks; struct ast_sip_sched_task *schtd; - const char *log_format = ast_logger_get_dateformat(); + const char *log_format; struct ast_tm tm; char queued[32]; char last_start[32]; char next_start[32]; int datelen; - struct timeval now = ast_tvnow(); - const char *separator = "======================================"; + struct timeval now; + static const char separator[] = "============================================="; switch (cmd) { case CLI_INIT: @@ -392,26 +473,47 @@ static char *cli_show_tasks(struct ast_cli_entry *e, int cmd, struct ast_cli_arg return CLI_SHOWUSAGE; } + /* Get a sorted snapshot of the scheduled tasks */ + sorted_tasks = ao2_container_alloc_rbtree(AO2_ALLOC_OPT_LOCK_NOLOCK, 0, + ast_sip_sched_task_sort_fn, NULL); + if (!sorted_tasks) { + return CLI_SUCCESS; + } + if (ao2_container_dup(sorted_tasks, tasks, 0)) { + ao2_ref(sorted_tasks, -1); + return CLI_SUCCESS; + } + + now = ast_tvnow(); + log_format = ast_logger_get_dateformat(); + ast_localtime(&now, &tm, NULL); datelen = ast_strftime(queued, sizeof(queued), log_format, &tm); ast_cli(a->fd, "PJSIP Scheduled Tasks:\n\n"); - ast_cli(a->fd, " %1$-24s %2$-9s %3$-9s %4$-5s %6$-*5$s %7$-*5$s %8$-*5$s %9$7s\n", + ast_cli(a->fd, "%1$-45s %2$-9s %3$-9s %4$-5s %6$-*5$s %7$-*5$s %8$-*5$s %9$7s\n", "Task Name", "Interval", "Times Run", "State", datelen, "Queued", "Last Started", "Next Start", "( secs)"); - ast_cli(a->fd, " %1$-24.24s %2$-9.9s %3$-9.9s %4$-5.5s %6$-*5$.*5$s %7$-*5$.*5$s %9$-*8$.*8$s\n", + ast_cli(a->fd, "%1$-45.45s %2$-9.9s %3$-9.9s %4$-5.5s %6$-*5$.*5$s %7$-*5$.*5$s %9$-*8$.*8$s\n", separator, separator, separator, separator, datelen, separator, separator, datelen + 8, separator); + iter = ao2_iterator_init(sorted_tasks, AO2_ITERATOR_UNLINK); + for (; (schtd = ao2_iterator_next(&iter)); ao2_ref(schtd, -1)) { + int next_run_sec; + struct timeval next; + + ao2_lock(schtd); - ao2_ref(tasks, +1); - ao2_rdlock(tasks); - i = ao2_iterator_init(tasks, 0); - while ((schtd = ao2_iterator_next(&i))) { - int next_run_sec = ast_sip_sched_task_get_next_run(schtd) / 1000; - struct timeval next = ast_tvadd(now, (struct timeval) {next_run_sec, 0}); + next_run_sec = ast_sip_sched_task_get_next_run(schtd) / 1000; + if (next_run_sec < 0) { + /* Scheduled task is now canceled */ + ao2_unlock(schtd); + continue; + } + next = ast_tvadd(now, ast_tv(next_run_sec, 0)); ast_localtime(&schtd->when_queued, &tm, NULL); ast_strftime(queued, sizeof(queued), log_format, &tm); @@ -426,7 +528,7 @@ static char *cli_show_tasks(struct ast_cli_entry *e, int cmd, struct ast_cli_arg ast_localtime(&next, &tm, NULL); ast_strftime(next_start, sizeof(next_start), log_format, &tm); - ast_cli(a->fd, " %1$-24.24s %2$9.3f %3$9d %4$-5s %6$-*5$s %7$-*5$s %8$-*5$s (%9$5d)\n", + ast_cli(a->fd, "%1$-46.46s%2$9.3f %3$9d %4$-5s %6$-*5$s %7$-*5$s %8$-*5$s (%9$5d)\n", schtd->name, schtd->interval / 1000.0, schtd->run_count, @@ -434,11 +536,10 @@ static char *cli_show_tasks(struct ast_cli_entry *e, int cmd, struct ast_cli_arg datelen, queued, last_start, next_start, next_run_sec); - ao2_cleanup(schtd); + ao2_unlock(schtd); } - ao2_iterator_destroy(&i); - ao2_unlock(tasks); - ao2_ref(tasks, -1); + ao2_iterator_destroy(&iter); + ao2_ref(sorted_tasks, -1); ast_cli(a->fd, "\n"); return CLI_SUCCESS; @@ -450,7 +551,8 @@ static struct ast_cli_entry cli_commands[] = { int ast_sip_initialize_scheduler(void) { - if (!(scheduler_context = ast_sched_context_create())) { + scheduler_context = ast_sched_context_create(); + if (!scheduler_context) { ast_log(LOG_ERROR, "Failed to create scheduler. Aborting load\n"); return -1; } @@ -461,8 +563,9 @@ int ast_sip_initialize_scheduler(void) return -1; } - tasks = ao2_container_alloc_hash(AO2_ALLOC_OPT_LOCK_RWLOCK, AO2_CONTAINER_ALLOC_OPT_DUPS_REJECT, - TASK_BUCKETS, ast_sip_sched_task_hash_fn, ast_sip_sched_task_sort_fn, ast_sip_sched_task_cmp_fn); + tasks = ao2_container_alloc_hash(AO2_ALLOC_OPT_LOCK_RWLOCK, + AO2_CONTAINER_ALLOC_OPT_DUPS_REJECT, TASK_BUCKETS, ast_sip_sched_task_hash_fn, + ast_sip_sched_task_sort_fn, ast_sip_sched_task_cmp_fn); if (!tasks) { ast_log(LOG_ERROR, "Failed to allocate task container. Aborting load\n"); ast_sched_context_destroy(scheduler_context); @@ -479,7 +582,21 @@ int ast_sip_destroy_scheduler(void) ast_cli_unregister_multiple(cli_commands, ARRAY_LEN(cli_commands)); if (scheduler_context) { + if (tasks) { + struct ao2_iterator iter; + struct ast_sip_sched_task *schtd; + + /* Cancel all scheduled tasks */ + iter = ao2_iterator_init(tasks, 0); + while ((schtd = ao2_iterator_next(&iter))) { + ast_sip_sched_task_cancel(schtd); + ao2_ref(schtd, -1); + } + ao2_iterator_destroy(&iter); + } + ast_sched_context_destroy(scheduler_context); + scheduler_context = NULL; } ao2_cleanup(tasks); |