summaryrefslogtreecommitdiff
path: root/tests
diff options
context:
space:
mode:
authorGeorge Joseph <george.joseph@fairview5.com>2016-03-17 11:28:26 -0600
committerGeorge Joseph <george.joseph@fairview5.com>2016-04-14 13:07:40 -0600
commit9740277713fe308a3f6014a07180a52c3de08b58 (patch)
treee2d443554ec598292461803c4a15214f4a470f4d /tests
parent13cb5ea73fad7cc0ae9140ad8070d362f3aa6527 (diff)
res_pjsip: Add serialized scheduler (res_pjsip/pjsip_scheduler.c)
There are several places that do scheduled tasks or periodic housecleaning, each with its own implementation: * res_pjsip_keepalive has a thread that sends keepalives. * pjsip_distributor has a thread that cleans up expired unidentified requests. * res_pjsip_registrar_expire has a thread that cleans up expired contacts. * res_pjsip_pubsub uses ast_sched directly and then calls ast_sip_push_task. * res_pjsip_sdp_rtp also uses ast_sched to send keepalives. There are also places where we should be doing scheduled work but aren't. A good example are the places we have sorcery observers to start registration or qualify. These don't work when changes are made to a backend database without a pjsip reload. We need to check periodically. As a first step to solving these issues, a new ast_sip_sched facility has been created. ast_sip_sched wraps ast_sched but only uses ast_sched as a scheduled queue. When a task is ready to run, ast_sip_task_pusk is called for it. This ensures that the task is executed in a PJLIB registered thread and doesn't hold up the ast_sched thread so it can immediately continue processing the queue. The serializer used by ast_sip_sched is one of your choosing or a random one from the res_pjsip pool if you don't choose one. Another feature is the ability to automatically clean up the task_data when the task expires (if ever). If it's an ao2 object, it will be dereferenced, if it's a malloc'd object it will be freed. This is selectable when the task is scheduled. Even if you choose to not auto dereference an ao2 task data object, the scheduler itself maintains a reference to it while the task is under it's control. This prevents the data from disappearing out from under the task. There are two scheduling models. AST_SIP_SCHED_TASK_PERIODIC specifies that the invocations of the task occur at the specific interval. That is, every "interval" milliseconds, regardless of how long the task takes. If the task takes longer than the interval, it will be scheduled at the next available multiple of interval. For exmaple: If the task has an interval of 60 secs and the task takes 70 secs (it better not), the next invocation will happen at 120 seconds. AST_SIP_SCHED_TASK_DELAY specifies that the next invocation of the task should start "interval" milliseconds after the current invocation has finished. Also, the same ast_sched facility for fixed or variable intervals exists. The task's return code in conjunction with the AST_SIP_SCHED_TASK_FIXED or AST_SIP_SCHED_TASK_VARIABLE flags controls the next invocation start time. One res_pjsip.h housekeeping change was made. The pjsip header files were added to the top. There have been a few cases lately where I've needed res_pjsip.h just for ast_sip calls and had compiles fail spectacularly because I didn't add the pjsip header files to my source even though I never referenced any pjsip calls. Finally, a few new convenience APIs were added to astobj2 to make things a little easier in the scheduler. ao2_ref_and_lock() calls ao2_ref() and ao2_lock() in one go. ao2_unlock_and_unref() does the reverse. A few macros were also copied from res_phoneprov because I got tired of having to duplicate the same hash, sort and compare functions over and over again. The AO2_STRING_FIELD_(HASH|SORT|CMP)_FN macros will insert functions suitable for aor_container_alloc into your source. This facility can be used immediately for the situations where we already have a thread that wakes up periodically or do some scheduled work. For the registration and qualify issues, additional sorcery and schema changes would need to be made so that we can easily detect changed objects on a periodic basis without having to pull the entire database back to check. I'm thinking of a last-updated timestamp on the rows but more on this later. Change-Id: I7af6ad2b2d896ea68e478aa1ae201d6dd016ba1c
Diffstat (limited to 'tests')
-rw-r--r--tests/test_res_pjsip_scheduler.c400
1 files changed, 400 insertions, 0 deletions
diff --git a/tests/test_res_pjsip_scheduler.c b/tests/test_res_pjsip_scheduler.c
new file mode 100644
index 000000000..f9a1633ac
--- /dev/null
+++ b/tests/test_res_pjsip_scheduler.c
@@ -0,0 +1,400 @@
+/*
+ * Asterisk -- An open source telephony toolkit.
+ *
+ * Copyright (C) 2016, Fairview 5 Engineering, LLC
+ *
+ * George Joseph <george.joseph@fairview5.com>
+ *
+ * See http://www.asterisk.org for more information about
+ * the Asterisk project. Please do not directly contact
+ * any of the maintainers of this project for assistance;
+ * the project provides a web site, mailing lists and IRC
+ * channels for your use.
+ *
+ * This program is free software, distributed under the terms of
+ * the GNU General Public License Version 2. See the LICENSE file
+ * at the top of the source tree.
+ */
+
+/*!
+ * \file
+ * \brief res_pjsip scheduler tests
+ *
+ * \author George Joseph <george.joseph@fairview5.com>
+ *
+ */
+
+/*** MODULEINFO
+ <depend>TEST_FRAMEWORK</depend>
+ <depend>res_pjsip</depend>
+ <support_level>core</support_level>
+ ***/
+
+#include "asterisk.h"
+
+ASTERISK_REGISTER_FILE()
+
+#include <pjsip.h>
+#include "asterisk/test.h"
+#include "asterisk/module.h"
+#include "asterisk/taskprocessor.h"
+#include "asterisk/res_pjsip.h"
+#include "asterisk/utils.h"
+
+#define CATEGORY "/res/res_pjsip/scheduler/"
+
+struct test_data {
+ ast_mutex_t lock;
+ ast_cond_t cond;
+ pthread_t tid;
+ struct timeval test_start;
+ struct timeval task_start;
+ struct timeval task_end;
+ int is_servant;
+ int interval;
+ int sleep;
+ int done;
+ struct ast_test *test;
+};
+
+#define S2U(x) (long int)(x * 1000 * 1000)
+#define M2U(x) (long int)(x * 1000)
+
+static int task_1(void *data)
+{
+ struct test_data *test = data;
+
+ test->done = 0;
+ test->task_start = ast_tvnow();
+ test->tid = pthread_self();
+ test->is_servant = ast_sip_thread_is_servant();
+ usleep(M2U(test->sleep));
+ test->task_end = ast_tvnow();
+
+ ast_mutex_lock(&test->lock);
+ test->done = 1;
+ ast_mutex_unlock(&test->lock);
+ ast_cond_signal(&test->cond);
+
+ return test->interval;
+}
+
+
+static void data_cleanup(void *data)
+{
+ struct test_data *test_data = data;
+ ast_mutex_destroy(&test_data->lock);
+ ast_cond_destroy(&test_data->cond);
+}
+
+#define waitfor(x) \
+{ \
+ ast_mutex_lock(&(x)->lock); \
+ while (!(x)->done) { \
+ ast_cond_wait(&(x)->cond, &(x)->lock); \
+ } \
+ (x)->done = 0; \
+ ast_mutex_unlock(&(x)->lock); \
+}
+
+static int scheduler(struct ast_test *test, int serialized)
+{
+ RAII_VAR(struct ast_taskprocessor *, tp1, NULL, ast_taskprocessor_unreference);
+ RAII_VAR(struct test_data *, test_data1, ao2_alloc(sizeof(*test_data1), data_cleanup), ao2_cleanup);
+ RAII_VAR(struct test_data *, test_data2, ao2_alloc(sizeof(*test_data2), data_cleanup), ao2_cleanup);
+ RAII_VAR(struct ast_sip_sched_task *, task1, NULL, ao2_cleanup);
+ RAII_VAR(struct ast_sip_sched_task *, task2, NULL, ao2_cleanup);
+ int duration;
+ int delay;
+ struct timeval task1_start;
+
+ ast_test_validate(test, test_data1 != NULL);
+ ast_test_validate(test, test_data2 != NULL);
+
+ test_data1->test = test;
+ test_data1->test_start = ast_tvnow();
+ test_data1->interval = 2000;
+ test_data1->sleep = 1000;
+ ast_mutex_init(&test_data1->lock);
+ ast_cond_init(&test_data1->cond, NULL);
+
+ test_data2->test = test;
+ test_data2->test_start = ast_tvnow();
+ test_data2->interval = 2000;
+ test_data2->sleep = 1000;
+ ast_mutex_init(&test_data2->lock);
+ ast_cond_init(&test_data2->cond, NULL);
+
+ if (serialized) {
+ ast_test_status_update(test, "This test will take about %3.1f seconds\n",
+ (test_data1->interval + test_data1->sleep + (MAX(test_data1->interval - test_data2->interval, 0)) + test_data2->sleep) / 1000.0);
+ tp1 = ast_sip_create_serializer();
+ ast_test_validate(test, (tp1 != NULL));
+ } else {
+ ast_test_status_update(test, "This test will take about %3.1f seconds\n",
+ ((MAX(test_data1->interval, test_data2->interval) + MAX(test_data1->sleep, test_data2->sleep)) / 1000.0));
+ }
+
+ task1 = ast_sip_schedule_task(tp1, test_data1->interval, task_1, NULL, test_data1, AST_SIP_SCHED_TASK_FIXED);
+ ast_test_validate(test, task1 != NULL);
+
+ task2 = ast_sip_schedule_task(tp1, test_data2->interval, task_1, NULL, test_data2, AST_SIP_SCHED_TASK_FIXED);
+ ast_test_validate(test, task2 != NULL);
+
+ waitfor(test_data1);
+ ast_sip_sched_task_cancel(task1);
+ ast_test_validate(test, test_data1->is_servant);
+
+ duration = ast_tvdiff_ms(test_data1->task_end, test_data1->test_start);
+ ast_test_validate(test, (duration > ((test_data1->interval + test_data1->sleep) * 0.9))
+ && (duration < ((test_data1->interval + test_data1->sleep) * 1.1)));
+
+ ast_sip_sched_task_get_times(task1, NULL, &task1_start, NULL);
+ delay = ast_tvdiff_ms(task1_start, test_data1->test_start);
+ ast_test_validate(test, (delay > (test_data1->interval * 0.9)
+ && (delay < (test_data1->interval * 1.1))));
+
+ waitfor(test_data2);
+ ast_sip_sched_task_cancel(task2);
+ ast_test_validate(test, test_data2->is_servant);
+
+ if (serialized) {
+ ast_test_validate(test, test_data1->tid == test_data2->tid);
+ ast_test_validate(test, ast_tvdiff_ms(test_data2->task_start, test_data1->task_end) >= 0);
+ } else {
+ ast_test_validate(test, test_data1->tid != test_data2->tid);
+ }
+
+ return AST_TEST_PASS;
+}
+
+AST_TEST_DEFINE(serialized_scheduler)
+{
+
+ switch (cmd) {
+ case TEST_INIT:
+ info->name = __func__;
+ info->category = CATEGORY;
+ info->summary = "Test res_pjsip serialized scheduler";
+ info->description = "Test res_pjsip serialized scheduler";
+ return AST_TEST_NOT_RUN;
+ case TEST_EXECUTE:
+ break;
+ }
+
+ return scheduler(test, 1);
+}
+
+AST_TEST_DEFINE(unserialized_scheduler)
+{
+
+ switch (cmd) {
+ case TEST_INIT:
+ info->name = __func__;
+ info->category = CATEGORY;
+ info->summary = "Test res_pjsip unserialized scheduler";
+ info->description = "Test res_pjsip unserialized scheduler";
+ return AST_TEST_NOT_RUN;
+ case TEST_EXECUTE:
+ break;
+ }
+
+ return scheduler(test, 0);
+}
+
+static int run_count;
+static int destruct_count;
+
+static int dummy_task(void *data)
+{
+ int *sleep = data;
+
+ usleep(M2U(*sleep));
+ run_count++;
+
+ return 0;
+}
+
+static void test_destructor(void *data)
+{
+ destruct_count++;
+}
+
+AST_TEST_DEFINE(scheduler_cleanup)
+{
+ RAII_VAR(int *, sleep, NULL, ao2_cleanup);
+ RAII_VAR(struct ast_sip_sched_task *, task, NULL, ao2_cleanup);
+ int interval;
+ int when;
+
+ switch (cmd) {
+ case TEST_INIT:
+ info->name = __func__;
+ info->category = CATEGORY;
+ info->summary = "Test res_pjsip scheduler cleanup";
+ info->description = "Test res_pjsip scheduler cleanup";
+ return AST_TEST_NOT_RUN;
+ case TEST_EXECUTE:
+ break;
+ }
+
+ destruct_count = 0;
+ interval = 1000;
+
+ sleep = ao2_alloc(sizeof(*sleep), test_destructor);
+ ast_test_validate(test, sleep != NULL);
+ *sleep = 500;
+
+ ast_test_status_update(test, "This test will take about %3.1f seconds\n",
+ ((interval * 1.1) + *sleep) / 1000.0);
+
+ task = ast_sip_schedule_task(NULL, interval, dummy_task, "dummy", sleep,
+ AST_SIP_SCHED_TASK_DATA_AO2 | AST_SIP_SCHED_TASK_DATA_FREE);
+ ast_test_validate(test, task != NULL);
+ usleep(M2U(interval * 0.5));
+ when = ast_sip_sched_task_get_next_run(task);
+ ast_test_validate(test, (when > (interval * 0.4) && when < (interval * 0.6)));
+ usleep(M2U(interval * 0.6));
+ ast_test_validate(test, ast_sip_sched_is_task_running(task));
+
+ usleep(M2U(*sleep));
+
+ ast_test_validate(test, (ast_sip_sched_is_task_running(task) == 0));
+ when = ast_sip_sched_task_get_next_run(task);
+ ast_test_validate(test, (when < 0), res, error);
+ ast_test_validate(test, (ao2_ref(task, 0) == 1));
+ ao2_ref(task, -1);
+ task = NULL;
+ ast_test_validate(test, (destruct_count == 1));
+ sleep = NULL;
+
+ return AST_TEST_PASS;
+}
+
+AST_TEST_DEFINE(scheduler_cancel)
+{
+ RAII_VAR(int *, sleep, NULL, ao2_cleanup);
+ RAII_VAR(struct ast_sip_sched_task *, task, NULL, ao2_cleanup);
+ int interval;
+ int when;
+
+ switch (cmd) {
+ case TEST_INIT:
+ info->name = __func__;
+ info->category = CATEGORY;
+ info->summary = "Test res_pjsip scheduler cancel task";
+ info->description = "Test res_pjsip scheduler cancel task";
+ return AST_TEST_NOT_RUN;
+ case TEST_EXECUTE:
+ break;
+ }
+
+ destruct_count = 0;
+ interval = 1000;
+
+ sleep = ao2_alloc(sizeof(*sleep), test_destructor);
+ ast_test_validate(test, sleep != NULL);
+ *sleep = 500;
+
+ ast_test_status_update(test, "This test will take about %3.1f seconds\n",
+ (interval + *sleep) / 1000.0);
+
+ task = ast_sip_schedule_task(NULL, interval, dummy_task, "dummy", sleep, AST_SIP_SCHED_TASK_DATA_NO_CLEANUP);
+ ast_test_validate(test, task != NULL);
+
+ usleep(M2U(interval * 0.5));
+ when = ast_sip_sched_task_get_next_run_by_name("dummy");
+ ast_test_validate(test, (when > (interval * 0.4) && when < (interval * 0.6)));
+ ast_test_validate(test, !ast_sip_sched_is_task_running_by_name("dummy"));
+ ast_test_validate(test, ao2_ref(task, 0) == 2);
+
+ ast_sip_sched_task_cancel_by_name("dummy");
+
+ when = ast_sip_sched_task_get_next_run(task);
+ ast_test_validate(test, when < 0);
+
+ usleep(M2U(interval));
+ ast_test_validate(test, run_count == 0);
+ ast_test_validate(test, destruct_count == 0);
+ ast_test_validate(test, ao2_ref(task, 0) == 1);
+
+ return AST_TEST_PASS;
+}
+
+AST_TEST_DEFINE(scheduler_policy)
+{
+ RAII_VAR(struct test_data *, test_data1, ao2_alloc(sizeof(*test_data1), data_cleanup), ao2_cleanup);
+ RAII_VAR(struct ast_sip_sched_task *, task, NULL, ao2_cleanup);
+ int when;
+
+ switch (cmd) {
+ case TEST_INIT:
+ info->name = __func__;
+ info->category = CATEGORY;
+ info->summary = "Test res_pjsip scheduler cancel task";
+ info->description = "Test res_pjsip scheduler cancel task";
+ return AST_TEST_NOT_RUN;
+ case TEST_EXECUTE:
+ break;
+ }
+
+ ast_test_validate(test, test_data1 != NULL);
+
+ destruct_count = 0;
+ run_count = 0;
+ test_data1->test = test;
+ test_data1->test_start = ast_tvnow();
+ test_data1->interval = 1000;
+ test_data1->sleep = 500;
+ ast_mutex_init(&test_data1->lock);
+ ast_cond_init(&test_data1->cond, NULL);
+
+ ast_test_status_update(test, "This test will take about %3.1f seconds\n",
+ ((test_data1->interval * 3) + test_data1->sleep) / 1000.0);
+
+ task = ast_sip_schedule_task(NULL, test_data1->interval, task_1, "test_1", test_data1,
+ AST_SIP_SCHED_TASK_DATA_NO_CLEANUP | AST_SIP_SCHED_TASK_PERIODIC);
+ ast_test_validate(test, task != NULL);
+
+ waitfor(test_data1);
+ when = ast_tvdiff_ms(test_data1->task_start, test_data1->test_start);
+ ast_test_validate(test, when > test_data1->interval * 0.9 && when < test_data1->interval * 1.1);
+
+ waitfor(test_data1);
+ when = ast_tvdiff_ms(test_data1->task_start, test_data1->test_start);
+ ast_test_validate(test, when > test_data1->interval * 2 * 0.9 && when < test_data1->interval * 2 * 1.1);
+
+ waitfor(test_data1);
+ when = ast_tvdiff_ms(test_data1->task_start, test_data1->test_start);
+ ast_test_validate(test, when > test_data1->interval * 3 * 0.9 && when < test_data1->interval * 3 * 1.1);
+
+ ast_sip_sched_task_cancel(task);
+ ao2_ref(task, -1);
+ task = NULL;
+
+ return AST_TEST_PASS;
+}
+
+static int load_module(void)
+{
+ CHECK_PJSIP_MODULE_LOADED();
+
+ AST_TEST_REGISTER(serialized_scheduler);
+ AST_TEST_REGISTER(unserialized_scheduler);
+ AST_TEST_REGISTER(scheduler_cleanup);
+ AST_TEST_REGISTER(scheduler_cancel);
+ AST_TEST_REGISTER(scheduler_policy);
+ return AST_MODULE_LOAD_SUCCESS;
+}
+
+static int unload_module(void)
+{
+ AST_TEST_UNREGISTER(scheduler_cancel);
+ AST_TEST_UNREGISTER(scheduler_cleanup);
+ AST_TEST_UNREGISTER(unserialized_scheduler);
+ AST_TEST_UNREGISTER(serialized_scheduler);
+ AST_TEST_UNREGISTER(scheduler_policy);
+ return 0;
+}
+
+AST_MODULE_INFO_STANDARD(ASTERISK_GPL_KEY, "res_pjsip scheduler test module");