summaryrefslogtreecommitdiff
path: root/res/res_pjsip
diff options
context:
space:
mode:
authorGeorge Joseph <george.joseph@fairview5.com>2016-03-17 11:28:26 -0600
committerGeorge Joseph <george.joseph@fairview5.com>2016-04-14 13:07:40 -0600
commit9740277713fe308a3f6014a07180a52c3de08b58 (patch)
treee2d443554ec598292461803c4a15214f4a470f4d /res/res_pjsip
parent13cb5ea73fad7cc0ae9140ad8070d362f3aa6527 (diff)
res_pjsip: Add serialized scheduler (res_pjsip/pjsip_scheduler.c)
There are several places that do scheduled tasks or periodic housecleaning, each with its own implementation: * res_pjsip_keepalive has a thread that sends keepalives. * pjsip_distributor has a thread that cleans up expired unidentified requests. * res_pjsip_registrar_expire has a thread that cleans up expired contacts. * res_pjsip_pubsub uses ast_sched directly and then calls ast_sip_push_task. * res_pjsip_sdp_rtp also uses ast_sched to send keepalives. There are also places where we should be doing scheduled work but aren't. A good example are the places we have sorcery observers to start registration or qualify. These don't work when changes are made to a backend database without a pjsip reload. We need to check periodically. As a first step to solving these issues, a new ast_sip_sched facility has been created. ast_sip_sched wraps ast_sched but only uses ast_sched as a scheduled queue. When a task is ready to run, ast_sip_task_pusk is called for it. This ensures that the task is executed in a PJLIB registered thread and doesn't hold up the ast_sched thread so it can immediately continue processing the queue. The serializer used by ast_sip_sched is one of your choosing or a random one from the res_pjsip pool if you don't choose one. Another feature is the ability to automatically clean up the task_data when the task expires (if ever). If it's an ao2 object, it will be dereferenced, if it's a malloc'd object it will be freed. This is selectable when the task is scheduled. Even if you choose to not auto dereference an ao2 task data object, the scheduler itself maintains a reference to it while the task is under it's control. This prevents the data from disappearing out from under the task. There are two scheduling models. AST_SIP_SCHED_TASK_PERIODIC specifies that the invocations of the task occur at the specific interval. That is, every "interval" milliseconds, regardless of how long the task takes. If the task takes longer than the interval, it will be scheduled at the next available multiple of interval. For exmaple: If the task has an interval of 60 secs and the task takes 70 secs (it better not), the next invocation will happen at 120 seconds. AST_SIP_SCHED_TASK_DELAY specifies that the next invocation of the task should start "interval" milliseconds after the current invocation has finished. Also, the same ast_sched facility for fixed or variable intervals exists. The task's return code in conjunction with the AST_SIP_SCHED_TASK_FIXED or AST_SIP_SCHED_TASK_VARIABLE flags controls the next invocation start time. One res_pjsip.h housekeeping change was made. The pjsip header files were added to the top. There have been a few cases lately where I've needed res_pjsip.h just for ast_sip calls and had compiles fail spectacularly because I didn't add the pjsip header files to my source even though I never referenced any pjsip calls. Finally, a few new convenience APIs were added to astobj2 to make things a little easier in the scheduler. ao2_ref_and_lock() calls ao2_ref() and ao2_lock() in one go. ao2_unlock_and_unref() does the reverse. A few macros were also copied from res_phoneprov because I got tired of having to duplicate the same hash, sort and compare functions over and over again. The AO2_STRING_FIELD_(HASH|SORT|CMP)_FN macros will insert functions suitable for aor_container_alloc into your source. This facility can be used immediately for the situations where we already have a thread that wakes up periodically or do some scheduled work. For the registration and qualify issues, additional sorcery and schema changes would need to be made so that we can easily detect changed objects on a periodic basis without having to pull the entire database back to check. I'm thinking of a last-updated timestamp on the rows but more on this later. Change-Id: I7af6ad2b2d896ea68e478aa1ae201d6dd016ba1c
Diffstat (limited to 'res/res_pjsip')
-rw-r--r--res/res_pjsip/include/res_pjsip_private.h19
-rw-r--r--res/res_pjsip/pjsip_scheduler.c495
2 files changed, 514 insertions, 0 deletions
diff --git a/res/res_pjsip/include/res_pjsip_private.h b/res/res_pjsip/include/res_pjsip_private.h
index 72a4387f1..04cd85408 100644
--- a/res/res_pjsip/include/res_pjsip_private.h
+++ b/res/res_pjsip/include/res_pjsip_private.h
@@ -325,4 +325,23 @@ struct ast_sip_contact_status *ast_res_pjsip_find_or_create_contact_status(const
*/
int ast_sip_validate_uri_length(const char *uri);
+/*!
+ * \brief Initialize scheduler
+ * \since 13.9.0
+ *
+ * \retval -1 failure
+ * \retval 0 success
+ */
+int ast_sip_initialize_scheduler(void);
+
+/*!
+ * \internal
+ * \brief Destroy scheduler
+ * \since 13.9.0
+ *
+ * \retval -1 failure
+ * \retval 0 success
+ */
+int ast_sip_destroy_scheduler(void);
+
#endif /* RES_PJSIP_PRIVATE_H_ */
diff --git a/res/res_pjsip/pjsip_scheduler.c b/res/res_pjsip/pjsip_scheduler.c
new file mode 100644
index 000000000..a5d406cb5
--- /dev/null
+++ b/res/res_pjsip/pjsip_scheduler.c
@@ -0,0 +1,495 @@
+/*
+ * Asterisk -- An open source telephony toolkit.
+ *
+ * Copyright (C) 2016, Fairview 5 Engineering, LLC
+ *
+ * George Joseph <george.joseph@fairview5.com>
+ *
+ * See http://www.asterisk.org for more information about
+ * the Asterisk project. Please do not directly contact
+ * any of the maintainers of this project for assistance;
+ * the project provides a web site, mailing lists and IRC
+ * channels for your use.
+ *
+ * This program is free software, distributed under the terms of
+ * the GNU General Public License Version 2. See the LICENSE file
+ * at the top of the source tree.
+ */
+
+/*! \file
+ *
+ * \brief res_pjsip Scheduler
+ *
+ * \author George Joseph <george.joseph@fairview5.com>
+ */
+
+#include "asterisk.h"
+
+ASTERISK_REGISTER_FILE()
+
+#include "asterisk/res_pjsip.h"
+#include "include/res_pjsip_private.h"
+#include "asterisk/res_pjsip_cli.h"
+
+#define TASK_BUCKETS 53
+
+static struct ast_sched_context *scheduler_context;
+static struct ao2_container *tasks;
+static int task_count;
+
+struct ast_sip_sched_task {
+ /*! ast_sip_sched task id */
+ uint32_t task_id;
+ /*! ast_sched scheudler id */
+ int current_scheduler_id;
+ /*! task is currently running */
+ int is_running;
+ /*! task */
+ ast_sip_task task;
+ /*! task data */
+ void *task_data;
+ /*! reschedule interval in milliseconds */
+ int interval;
+ /*! the time the task was queued */
+ struct timeval when_queued;
+ /*! the last time the task was started */
+ struct timeval last_start;
+ /*! the last time the task was ended */
+ struct timeval last_end;
+ /*! times run */
+ int run_count;
+ /*! the task reschedule, cleanup and policy flags */
+ enum ast_sip_scheduler_task_flags flags;
+ /*! the serializer to be used (if any) */
+ struct ast_taskprocessor *serializer;
+ /* A name to be associated with the task */
+ char name[0];
+};
+
+AO2_STRING_FIELD_HASH_FN(ast_sip_sched_task, name);
+AO2_STRING_FIELD_CMP_FN(ast_sip_sched_task, name);
+AO2_STRING_FIELD_SORT_FN(ast_sip_sched_task, name);
+
+static int push_to_serializer(const void *data);
+
+/*
+ * This function is run in the context of the serializer.
+ * It runs the task with a simple call and reschedules based on the result.
+ */
+static int run_task(void *data)
+{
+ RAII_VAR(struct ast_sip_sched_task *, schtd, ao2_bump(data), ao2_cleanup);
+ int res;
+ int delay;
+
+ ao2_lock(schtd);
+ schtd->last_start = ast_tvnow();
+ schtd->is_running = 1;
+ schtd->run_count++;
+ ao2_unlock(schtd);
+
+ res = schtd->task(schtd->task_data);
+
+ ao2_lock(schtd);
+ schtd->is_running = 0;
+ schtd->last_end = ast_tvnow();
+
+ /*
+ * Don't restart if the task returned 0 or if the interval
+ * was set to 0 while the task was running
+ */
+ if (!res || !schtd->interval) {
+ schtd->interval = 0;
+ ao2_unlock(schtd);
+ ao2_unlink(tasks, schtd);
+ return -1;
+ }
+
+ if (schtd->flags & AST_SIP_SCHED_TASK_VARIABLE) {
+ schtd->interval = res;
+ }
+
+ if (schtd->flags & AST_SIP_SCHED_TASK_DELAY) {
+ delay = schtd->interval;
+ } else {
+ delay = schtd->interval - (ast_tvdiff_ms(schtd->last_end, schtd->last_start) % schtd->interval);
+ }
+
+ schtd->current_scheduler_id = ast_sched_add(scheduler_context, delay, push_to_serializer, (const void *)schtd);
+ if (schtd->current_scheduler_id < 0) {
+ schtd->interval = 0;
+ ao2_unlock(schtd);
+ ao2_unlink(tasks, schtd);
+ return -1;
+ }
+
+ ao2_unlock(schtd);
+
+ return 0;
+}
+
+/*
+ * This function is run by the scheduler thread. Its only job is to push the task
+ * to the serialize and return. It returns 0 so it's not rescheduled.
+ */
+static int push_to_serializer(const void *data)
+{
+ struct ast_sip_sched_task *schtd = (struct ast_sip_sched_task *)data;
+
+ if (ast_sip_push_task(schtd->serializer, run_task, schtd)) {
+ ao2_ref(schtd, -1);
+ }
+
+ return 0;
+}
+
+int ast_sip_sched_task_cancel(struct ast_sip_sched_task *schtd)
+{
+ int res;
+
+ if (!ao2_ref_and_lock(schtd)) {
+ return -1;
+ }
+
+ if (schtd->current_scheduler_id < 0 || schtd->interval <= 0) {
+ ao2_unlock_and_unref(schtd);
+ return 0;
+ }
+
+ schtd->interval = 0;
+ ao2_unlock_and_unref(schtd);
+ ao2_unlink(tasks, schtd);
+ res = ast_sched_del(scheduler_context, schtd->current_scheduler_id);
+
+ return res;
+}
+
+int ast_sip_sched_task_cancel_by_name(const char *name)
+{
+ RAII_VAR(struct ast_sip_sched_task *, schtd, NULL, ao2_cleanup);
+
+ if (ast_strlen_zero(name)) {
+ return -1;
+ }
+
+ schtd = ao2_find(tasks, name, OBJ_SEARCH_KEY | OBJ_NOLOCK);
+ if (!schtd) {
+ return -1;
+ }
+
+ return ast_sip_sched_task_cancel(schtd);
+}
+
+
+int ast_sip_sched_task_get_times(struct ast_sip_sched_task *schtd,
+ struct timeval *queued, struct timeval *last_start, struct timeval *last_end)
+{
+ if (!ao2_ref_and_lock(schtd)) {
+ return -1;
+ }
+
+ if (queued) {
+ memcpy(queued, &schtd->when_queued, sizeof(struct timeval));
+ }
+ if (last_start) {
+ memcpy(last_start, &schtd->last_start, sizeof(struct timeval));
+ }
+ if (last_end) {
+ memcpy(last_end, &schtd->last_end, sizeof(struct timeval));
+ }
+
+ ao2_unlock_and_unref(schtd);
+
+ return 0;
+}
+
+int ast_sip_sched_task_get_times_by_name(const char *name,
+ struct timeval *queued, struct timeval *last_start, struct timeval *last_end)
+{
+ RAII_VAR(struct ast_sip_sched_task *, schtd, NULL, ao2_cleanup);
+
+ if (ast_strlen_zero(name)) {
+ return -1;
+ }
+
+ schtd = ao2_find(tasks, name, OBJ_SEARCH_KEY | OBJ_NOLOCK);
+ if (!schtd) {
+ return -1;
+ }
+
+ return ast_sip_sched_task_get_times(schtd, queued, last_start, last_end);
+}
+
+int ast_sip_sched_task_get_name(struct ast_sip_sched_task *schtd, char *name, size_t maxlen)
+{
+ if (maxlen <= 0) {
+ return -1;
+ }
+
+ if (!ao2_ref_and_lock(schtd)) {
+ return -1;
+ }
+
+ ast_copy_string(name, schtd->name, maxlen);
+
+ ao2_unlock_and_unref(schtd);
+
+ return 0;
+}
+
+int ast_sip_sched_task_get_next_run(struct ast_sip_sched_task *schtd)
+{
+ int delay;
+ struct timeval since_when;
+ struct timeval now;
+
+ if (!ao2_ref_and_lock(schtd)) {
+ return -1;
+ }
+
+ if (schtd->interval) {
+ delay = schtd->interval;
+ now = ast_tvnow();
+
+ if (schtd->flags & AST_SIP_SCHED_TASK_DELAY) {
+ since_when = schtd->is_running ? now : schtd->last_end;
+ } else {
+ since_when = schtd->last_start.tv_sec ? schtd->last_start : schtd->when_queued;
+ }
+
+ delay -= ast_tvdiff_ms(now, since_when);
+
+ delay = delay < 0 ? 0 : delay;
+ } else {
+ delay = -1;
+ }
+
+ ao2_unlock_and_unref(schtd);
+
+ return delay;
+}
+
+int ast_sip_sched_task_get_next_run_by_name(const char *name)
+{
+ RAII_VAR(struct ast_sip_sched_task *, schtd, NULL, ao2_cleanup);
+
+ if (ast_strlen_zero(name)) {
+ return -1;
+ }
+
+ schtd = ao2_find(tasks, name, OBJ_SEARCH_KEY | OBJ_NOLOCK);
+ if (!schtd) {
+ return -1;
+ }
+
+ return ast_sip_sched_task_get_next_run(schtd);
+}
+
+int ast_sip_sched_is_task_running(struct ast_sip_sched_task *schtd)
+{
+ if (!schtd) {
+ return 0;
+ }
+
+ return schtd->is_running;
+}
+
+int ast_sip_sched_is_task_running_by_name(const char *name)
+{
+ RAII_VAR(struct ast_sip_sched_task *, schtd, NULL, ao2_cleanup);
+
+ if (ast_strlen_zero(name)) {
+ return 0;
+ }
+
+ schtd = ao2_find(tasks, name, OBJ_SEARCH_KEY | OBJ_NOLOCK);
+ if (!schtd) {
+ return 0;
+ }
+
+ return schtd->is_running;
+}
+
+static void schtd_destructor(void *data)
+{
+ struct ast_sip_sched_task *schtd = data;
+
+ if (schtd->flags & AST_SIP_SCHED_TASK_DATA_AO2) {
+ /* release our own ref, then release the callers if asked to do so */
+ ao2_ref(schtd->task_data, (schtd->flags & AST_SIP_SCHED_TASK_DATA_FREE) ? -2 : -1);
+ } else if (schtd->task_data && (schtd->flags & AST_SIP_SCHED_TASK_DATA_FREE)) {
+ ast_free(schtd->task_data);
+ }
+}
+
+struct ast_sip_sched_task *ast_sip_schedule_task(struct ast_taskprocessor *serializer,
+ int interval, ast_sip_task sip_task, char *name, void *task_data, enum ast_sip_scheduler_task_flags flags)
+{
+#define ID_LEN 13 /* task_deadbeef */
+ struct ast_sip_sched_task *schtd;
+ int res;
+
+ if (interval < 0) {
+ return NULL;
+ }
+
+ schtd = ao2_alloc((sizeof(*schtd) + (!ast_strlen_zero(name) ? strlen(name) : ID_LEN) + 1), schtd_destructor);
+ if (!schtd) {
+ return NULL;
+ }
+
+ schtd->task_id = ast_atomic_fetchadd_int(&task_count, 1);
+ schtd->serializer = serializer;
+ schtd->task = sip_task;
+ if (!ast_strlen_zero(name)) {
+ strcpy(schtd->name, name); /* Safe */
+ } else {
+ sprintf(schtd->name, "task_%08x", schtd->task_id);
+ }
+ schtd->task_data = task_data;
+ schtd->flags = flags;
+ schtd->interval = interval;
+ schtd->when_queued = ast_tvnow();
+
+ if (flags & AST_SIP_SCHED_TASK_DATA_AO2) {
+ ao2_ref(task_data, +1);
+ }
+ res = ast_sched_add(scheduler_context, interval, push_to_serializer, (const void *)schtd);
+ if (res < 0) {
+ ao2_ref(schtd, -1);
+ return NULL;
+ } else {
+ schtd->current_scheduler_id = res;
+ ao2_link(tasks, schtd);
+ }
+
+ return schtd;
+#undef ID_LEN
+}
+
+static char *cli_show_tasks(struct ast_cli_entry *e, int cmd, struct ast_cli_args *a)
+{
+ struct ao2_iterator i;
+ struct ast_sip_sched_task *schtd;
+ const char *log_format = ast_logger_get_dateformat();
+ struct ast_tm tm;
+ char queued[32];
+ char last_start[32];
+ char last_end[32];
+ int datelen;
+ struct timeval now = ast_tvnow();
+ const char *separator = "======================================";
+
+ switch (cmd) {
+ case CLI_INIT:
+ e->command = "pjsip show scheduled_tasks";
+ e->usage = "Usage: pjsip show scheduled_tasks\n"
+ " Show all scheduled tasks\n";
+ return NULL;
+ case CLI_GENERATE:
+ return NULL;
+ }
+
+ if (a->argc != 3) {
+ return CLI_SHOWUSAGE;
+ }
+
+ ast_localtime(&now, &tm, NULL);
+ datelen = ast_strftime(queued, sizeof(queued), log_format, &tm);
+
+ ast_cli(a->fd, "PJSIP Scheduled Tasks:\n\n");
+
+ ast_cli(a->fd, " %1$-24s %2$-8s %3$-9s %4$-7s %6$-*5$s %7$-*5$s %8$-*5$s\n",
+ "Task Name", "Interval", "Times Run", "State",
+ datelen, "Queued", "Last Started", "Last Ended");
+
+ ast_cli(a->fd, " %1$-24.24s %2$-8.8s %3$-9.9s %4$-7.7s %6$-*5$.*5$s %7$-*5$.*5$s %8$-*5$.*5$s\n",
+ separator, separator, separator, separator,
+ datelen, separator, separator, separator);
+
+
+ ao2_ref(tasks, +1);
+ ao2_rdlock(tasks);
+ i = ao2_iterator_init(tasks, 0);
+ while ((schtd = ao2_iterator_next(&i))) {
+
+ ast_localtime(&schtd->when_queued, &tm, NULL);
+ ast_strftime(queued, sizeof(queued), log_format, &tm);
+
+ if (ast_tvzero(schtd->last_start)) {
+ strcpy(last_start, "not yet started");
+ } else {
+ ast_localtime(&schtd->last_start, &tm, NULL);
+ ast_strftime(last_start, sizeof(last_start), log_format, &tm);
+ }
+
+ if (ast_tvzero(schtd->last_end)) {
+ if (ast_tvzero(schtd->last_start)) {
+ strcpy(last_end, "not yet started");
+ } else {
+ strcpy(last_end, "running");
+ }
+ } else {
+ ast_localtime(&schtd->last_end, &tm, NULL);
+ ast_strftime(last_end, sizeof(last_end), log_format, &tm);
+ }
+
+ ast_cli(a->fd, " %1$-24.24s %2$-8.3f %3$-9d %4$-7s %6$-*5$s %7$-*5$s %8$-*5$s\n",
+ schtd->name,
+ schtd->interval / 1000.0,
+ schtd->run_count,
+ schtd->is_running ? "running" : "waiting",
+ datelen, queued, last_start, last_end);
+ ao2_cleanup(schtd);
+ }
+ ao2_iterator_destroy(&i);
+ ao2_unlock(tasks);
+ ao2_ref(tasks, -1);
+ ast_cli(a->fd, "\n");
+
+ return CLI_SUCCESS;
+}
+
+static struct ast_cli_entry cli_commands[] = {
+ AST_CLI_DEFINE(cli_show_tasks, "Show all scheduled tasks"),
+};
+
+int ast_sip_initialize_scheduler(void)
+{
+ if (!(scheduler_context = ast_sched_context_create())) {
+ ast_log(LOG_ERROR, "Failed to create scheduler. Aborting load\n");
+ return -1;
+ }
+
+ if (ast_sched_start_thread(scheduler_context)) {
+ ast_log(LOG_ERROR, "Failed to start scheduler. Aborting load\n");
+ ast_sched_context_destroy(scheduler_context);
+ return -1;
+ }
+
+ tasks = ao2_container_alloc_hash(AO2_ALLOC_OPT_LOCK_RWLOCK, AO2_CONTAINER_ALLOC_OPT_DUPS_REJECT,
+ TASK_BUCKETS, ast_sip_sched_task_hash_fn, ast_sip_sched_task_sort_fn, ast_sip_sched_task_cmp_fn);
+ if (!tasks) {
+ ast_log(LOG_ERROR, "Failed to allocate task container. Aborting load\n");
+ ast_sched_context_destroy(scheduler_context);
+ return -1;
+ }
+
+ ast_cli_register_multiple(cli_commands, ARRAY_LEN(cli_commands));
+
+ return 0;
+}
+
+int ast_sip_destroy_scheduler(void)
+{
+ ast_cli_unregister_multiple(cli_commands, ARRAY_LEN(cli_commands));
+
+ if (scheduler_context) {
+ ast_sched_context_destroy(scheduler_context);
+ }
+
+ ao2_cleanup(tasks);
+ tasks = NULL;
+
+ return 0;
+}