summaryrefslogtreecommitdiff
path: root/include
diff options
context:
space:
mode:
authorGeorge Joseph <george.joseph@fairview5.com>2016-03-17 11:28:26 -0600
committerGeorge Joseph <george.joseph@fairview5.com>2016-04-14 13:07:40 -0600
commit9740277713fe308a3f6014a07180a52c3de08b58 (patch)
treee2d443554ec598292461803c4a15214f4a470f4d /include
parent13cb5ea73fad7cc0ae9140ad8070d362f3aa6527 (diff)
res_pjsip: Add serialized scheduler (res_pjsip/pjsip_scheduler.c)
There are several places that do scheduled tasks or periodic housecleaning, each with its own implementation: * res_pjsip_keepalive has a thread that sends keepalives. * pjsip_distributor has a thread that cleans up expired unidentified requests. * res_pjsip_registrar_expire has a thread that cleans up expired contacts. * res_pjsip_pubsub uses ast_sched directly and then calls ast_sip_push_task. * res_pjsip_sdp_rtp also uses ast_sched to send keepalives. There are also places where we should be doing scheduled work but aren't. A good example are the places we have sorcery observers to start registration or qualify. These don't work when changes are made to a backend database without a pjsip reload. We need to check periodically. As a first step to solving these issues, a new ast_sip_sched facility has been created. ast_sip_sched wraps ast_sched but only uses ast_sched as a scheduled queue. When a task is ready to run, ast_sip_task_pusk is called for it. This ensures that the task is executed in a PJLIB registered thread and doesn't hold up the ast_sched thread so it can immediately continue processing the queue. The serializer used by ast_sip_sched is one of your choosing or a random one from the res_pjsip pool if you don't choose one. Another feature is the ability to automatically clean up the task_data when the task expires (if ever). If it's an ao2 object, it will be dereferenced, if it's a malloc'd object it will be freed. This is selectable when the task is scheduled. Even if you choose to not auto dereference an ao2 task data object, the scheduler itself maintains a reference to it while the task is under it's control. This prevents the data from disappearing out from under the task. There are two scheduling models. AST_SIP_SCHED_TASK_PERIODIC specifies that the invocations of the task occur at the specific interval. That is, every "interval" milliseconds, regardless of how long the task takes. If the task takes longer than the interval, it will be scheduled at the next available multiple of interval. For exmaple: If the task has an interval of 60 secs and the task takes 70 secs (it better not), the next invocation will happen at 120 seconds. AST_SIP_SCHED_TASK_DELAY specifies that the next invocation of the task should start "interval" milliseconds after the current invocation has finished. Also, the same ast_sched facility for fixed or variable intervals exists. The task's return code in conjunction with the AST_SIP_SCHED_TASK_FIXED or AST_SIP_SCHED_TASK_VARIABLE flags controls the next invocation start time. One res_pjsip.h housekeeping change was made. The pjsip header files were added to the top. There have been a few cases lately where I've needed res_pjsip.h just for ast_sip calls and had compiles fail spectacularly because I didn't add the pjsip header files to my source even though I never referenced any pjsip calls. Finally, a few new convenience APIs were added to astobj2 to make things a little easier in the scheduler. ao2_ref_and_lock() calls ao2_ref() and ao2_lock() in one go. ao2_unlock_and_unref() does the reverse. A few macros were also copied from res_phoneprov because I got tired of having to duplicate the same hash, sort and compare functions over and over again. The AO2_STRING_FIELD_(HASH|SORT|CMP)_FN macros will insert functions suitable for aor_container_alloc into your source. This facility can be used immediately for the situations where we already have a thread that wakes up periodically or do some scheduled work. For the registration and qualify issues, additional sorcery and schema changes would need to be made so that we can easily detect changed objects on a periodic basis without having to pull the entire database back to check. I'm thinking of a last-updated timestamp on the rows but more on this later. Change-Id: I7af6ad2b2d896ea68e478aa1ae201d6dd016ba1c
Diffstat (limited to 'include')
-rw-r--r--include/asterisk/astobj2.h134
-rw-r--r--include/asterisk/res_pjsip.h235
2 files changed, 367 insertions, 2 deletions
diff --git a/include/asterisk/astobj2.h b/include/asterisk/astobj2.h
index 692cc7cb4..4bd44db76 100644
--- a/include/asterisk/astobj2.h
+++ b/include/asterisk/astobj2.h
@@ -19,6 +19,7 @@
#include "asterisk/compat.h"
#include "asterisk/lock.h"
+#include "asterisk/inline_api.h"
/*! \file
* \ref AstObj2
@@ -638,6 +639,46 @@ int __ao2_trylock(void *a, enum ao2_lock_req lock_how, const char *file, const c
void *ao2_object_get_lockaddr(void *obj);
+/*!
+ * \brief Increment reference count on an object and lock it
+ * \since 13.9.0
+ *
+ * \param[in] obj A pointer to the ao2 object
+ * \retval 0 The object is not an ao2 object or wasn't locked successfully
+ * \retval 1 The object's reference count was incremented and was locked
+ */
+AST_INLINE_API(
+int ao2_ref_and_lock(void *obj),
+{
+ ao2_ref(obj, +1);
+ if (ao2_lock(obj)) {
+ ao2_ref(obj, -1);
+ return 0;
+ }
+ return 1;
+}
+)
+
+/*!
+ * \brief Unlock an object and decrement its reference count
+ * \since 13.9.0
+ *
+ * \param[in] obj A pointer to the ao2 object
+ * \retval 0 The object is not an ao2 object or wasn't unlocked successfully
+ * \retval 1 The object was unlocked and it's reference count was decremented
+ */
+AST_INLINE_API(
+int ao2_unlock_and_unref(void *obj),
+{
+ if (ao2_unlock(obj)) {
+ return 0;
+ }
+ ao2_ref(obj, -1);
+
+ return 1;
+}
+)
+
/*! Global ao2 object holder structure. */
struct ao2_global_obj {
/*! Access lock to the held ao2 object. */
@@ -1985,4 +2026,97 @@ void ao2_iterator_cleanup(struct ao2_iterator *iter);
*/
int ao2_iterator_count(struct ao2_iterator *iter);
+/*!
+ * \brief Creates a hash function for a structure string field.
+ * \param stype The structure type
+ * \param field The string field in the structure to hash
+ *
+ * AO2_STRING_FIELD_HASH_CB(mystruct, myfield) will produce a function
+ * named mystruct_hash_fn which hashes mystruct->myfield.
+ */
+#define AO2_STRING_FIELD_HASH_FN(stype, field) \
+static int stype ## _hash_fn(const void *obj, const int flags) \
+{ \
+ const struct stype *object = obj; \
+ const char *key; \
+ switch (flags & OBJ_SEARCH_MASK) { \
+ case OBJ_SEARCH_KEY: \
+ key = obj; \
+ break; \
+ case OBJ_SEARCH_OBJECT: \
+ key = object->field; \
+ break; \
+ default: \
+ ast_assert(0); \
+ return 0; \
+ } \
+ return ast_str_hash(key); \
+}
+
+/*!
+ * \brief Creates a compare function for a structure string field.
+ * \param stype The structure type
+ * \param field The string field in the structure to compare
+ *
+ * AO2_STRING_FIELD_CMP_FN(mystruct, myfield) will produce a function
+ * named mystruct_cmp_fn which compares mystruct->myfield.
+ */
+#define AO2_STRING_FIELD_CMP_FN(stype, field) \
+static int stype ## _cmp_fn(void *obj, void *arg, int flags) \
+{ \
+ const struct stype *object_left = obj, *object_right = arg; \
+ const char *right_key = arg; \
+ int cmp; \
+ switch (flags & OBJ_SEARCH_MASK) { \
+ case OBJ_SEARCH_OBJECT: \
+ right_key = object_right->field; \
+ case OBJ_SEARCH_KEY: \
+ cmp = strcmp(object_left->field, right_key); \
+ break; \
+ case OBJ_SEARCH_PARTIAL_KEY: \
+ cmp = strncmp(object_left->field, right_key, strlen(right_key)); \
+ break; \
+ default: \
+ cmp = 0; \
+ break; \
+ } \
+ if (cmp) { \
+ return 0; \
+ } \
+ return CMP_MATCH; \
+}
+
+/*!
+ * \brief Creates a sort function for a structure string field.
+ * \param stype The structure type
+ * \param field The string field in the structure to compare
+ *
+ * AO2_STRING_FIELD_SORT_FN(mystruct, myfield) will produce a function
+ * named mystruct_sort_fn which compares mystruct->myfield.
+ */
+#define AO2_STRING_FIELD_SORT_FN(stype, field) \
+static int stype ## _sort_fn(const void *obj, const void *arg, int flags) \
+{ \
+ const struct stype *object_left = obj; \
+ const struct stype *object_right = arg; \
+ const char *right_key = arg; \
+ int cmp; \
+\
+ switch (flags & OBJ_SEARCH_MASK) { \
+ case OBJ_SEARCH_OBJECT: \
+ right_key = object_right->field; \
+ /* Fall through */ \
+ case OBJ_SEARCH_KEY: \
+ cmp = strcmp(object_left->field, right_key); \
+ break; \
+ case OBJ_SEARCH_PARTIAL_KEY: \
+ cmp = strncmp(object_left->field, right_key, strlen(right_key)); \
+ break; \
+ default: \
+ cmp = 0; \
+ break; \
+ } \
+ return cmp; \
+}
+
#endif /* _ASTERISK_ASTOBJ2_H */
diff --git a/include/asterisk/res_pjsip.h b/include/asterisk/res_pjsip.h
index 3a9d61e4c..eef6a54ff 100644
--- a/include/asterisk/res_pjsip.h
+++ b/include/asterisk/res_pjsip.h
@@ -19,6 +19,13 @@
#ifndef _RES_PJSIP_H
#define _RES_PJSIP_H
+#include <pjsip.h>
+/* Needed for SUBSCRIBE, NOTIFY, and PUBLISH method definitions */
+#include <pjsip_simple.h>
+#include <pjsip/sip_transaction.h>
+#include <pj/timer.h>
+#include <pjlib.h>
+
#include "asterisk/stringfields.h"
/* Needed for struct ast_sockaddr */
#include "asterisk/netsock2.h"
@@ -1174,8 +1181,9 @@ struct ast_sip_auth *ast_sip_get_artificial_auth(void);
*/
struct ast_sip_endpoint *ast_sip_get_artificial_endpoint(void);
-/*!
- * \page Threading model for SIP
+/*! \defgroup pjsip_threading PJSIP Threading Model
+ * @{
+ * \page PJSIP PJSIP Threading Model
*
* There are three major types of threads that SIP will have to deal with:
* \li Asterisk threads
@@ -1224,6 +1232,19 @@ struct ast_sip_endpoint *ast_sip_get_artificial_endpoint(void);
* previous tasks pushed with the same serializer have completed. For more information
* on serializers and the benefits they provide, see \ref ast_threadpool_serializer
*
+ * \par Scheduler
+ *
+ * Some situations require that a task run periodically or at a future time. Normally
+ * the ast_sched functionality would be used but ast_sched only uses 1 thread for all
+ * tasks and that thread isn't registered with PJLIB and therefore can't do any PJSIP
+ * related work.
+ *
+ * ast_sip_sched uses ast_sched only as a scheduled queue. When a task is ready to run,
+ * it's pushed to a Serializer to be invoked asynchronously by a Servant. This ensures
+ * that the task is executed in a PJLIB registered thread and allows the ast_sched thread
+ * to immediately continue processing the queue. The Serializer used by ast_sip_sched
+ * is one of your choosing or a random one from the res_pjsip pool if you don't choose one.
+ *
* \note
*
* Do not make assumptions about individual threads based on a corresponding serializer.
@@ -1232,6 +1253,8 @@ struct ast_sip_endpoint *ast_sip_get_artificial_endpoint(void);
* tasks, even though they are all guaranteed to be executed in sequence.
*/
+typedef int (*ast_sip_task)(void *user_data);
+
/*!
* \brief Create a new serializer for SIP tasks
*
@@ -1369,6 +1392,214 @@ int ast_sip_push_task_synchronous(struct ast_taskprocessor *serializer, int (*si
int ast_sip_thread_is_servant(void);
/*!
+ * \brief Task flags for the res_pjsip scheduler
+ *
+ * The default is AST_SIP_SCHED_TASK_FIXED
+ * | AST_SIP_SCHED_TASK_DATA_NOT_AO2
+ * | AST_SIP_SCHED_TASK_DATA_NO_CLEANUP
+ * | AST_SIP_SCHED_TASK_PERIODIC
+ */
+enum ast_sip_scheduler_task_flags {
+ /*!
+ * The defaults
+ */
+ AST_SIP_SCHED_TASK_DEFAULTS = (0 << 0),
+
+ /*!
+ * Run at a fixed interval.
+ * Stop scheduling if the callback returns 0.
+ * Any other value is ignored.
+ */
+ AST_SIP_SCHED_TASK_FIXED = (0 << 0),
+ /*!
+ * Run at a variable interval.
+ * Stop scheduling if the callback returns 0.
+ * Any other return value is used as the new interval.
+ */
+ AST_SIP_SCHED_TASK_VARIABLE = (1 << 0),
+
+ /*!
+ * The task data is not an AO2 object.
+ */
+ AST_SIP_SCHED_TASK_DATA_NOT_AO2 = (0 << 1),
+ /*!
+ * The task data is an AO2 object.
+ * A reference count will be held by the scheduler until
+ * after the task has run for the final time (if ever).
+ */
+ AST_SIP_SCHED_TASK_DATA_AO2 = (1 << 1),
+
+ /*!
+ * Don't take any cleanup action on the data
+ */
+ AST_SIP_SCHED_TASK_DATA_NO_CLEANUP = (0 << 3),
+ /*!
+ * If AST_SIP_SCHED_TASK_DATA_AO2 is set, decrement the reference count
+ * otherwise call ast_free on it.
+ */
+ AST_SIP_SCHED_TASK_DATA_FREE = ( 1 << 3 ),
+
+ /*! \brief AST_SIP_SCHED_TASK_PERIODIC
+ * The task is scheduled at multiples of interval
+ * \see Interval
+ */
+ AST_SIP_SCHED_TASK_PERIODIC = (0 << 4),
+ /*! \brief AST_SIP_SCHED_TASK_DELAY
+ * The next invocation of the task is at last finish + interval
+ * \see Interval
+ */
+ AST_SIP_SCHED_TASK_DELAY = (1 << 4),
+};
+
+/*!
+ * \brief Scheduler task data structure
+ */
+struct ast_sip_sched_task;
+
+/*!
+ * \brief Schedule a task to run in the res_pjsip thread pool
+ * \since 13.9.0
+ *
+ * \param serializer The serializer to use. If NULL, don't use a serializer (see note below)
+ * \param interval The invocation interval in milliseconds (see note below)
+ * \param sip_task The task to invoke
+ * \param name An optional name to associate with the task
+ * \param task_data Optional data to pass to the task
+ * \param flags One of enum ast_sip_scheduler_task_type
+ *
+ * \returns Pointer to \ref ast_sip_sched_task ao2 object which must be dereferenced when done.
+ *
+ * \paragraph Serialization
+ *
+ * Specifying a serializer guarantees serialized execution but NOT specifying a serializer
+ * may still result in tasks being effectively serialized if the thread pool is busy.
+ * The point of the serializer BTW is not to prevent parallel executions of the SAME task.
+ * That happens automatically (see below). It's to prevent the task from running at the same
+ * time as other work using the same serializer, whether or not it's being run by the scheduler.
+ *
+ * \paragraph Interval
+ *
+ * The interval is used to calculate the next time the task should run. There are two models.
+ *
+ * \ref AST_SIP_SCHED_TASK_PERIODIC specifies that the invocations of the task occur at the
+ * specific interval. That is, every \ref "interval" milliseconds, regardless of how long the task
+ * takes. If the task takes longer than \ref interval, it will be scheduled at the next available
+ * multiple of \ref interval. For exmaple: If the task has an interval of 60 seconds and the task
+ * takes 70 seconds, the next invocation will happen at 120 seconds.
+ *
+ * \ref AST_SIP_SCHED_TASK_DELAY specifies that the next invocation of the task should start
+ * at \ref interval milliseconds after the current invocation has finished.
+ *
+ */
+struct ast_sip_sched_task *ast_sip_schedule_task(struct ast_taskprocessor *serializer,
+ int interval, ast_sip_task sip_task, char *name, void *task_data,
+ enum ast_sip_scheduler_task_flags flags);
+
+/*!
+ * \brief Cancels the next invocation of a task
+ * \since 13.9.0
+ *
+ * \param schtd The task structure pointer
+ * \retval 0 Success
+ * \retval -1 Failure
+ * \note Only cancels future invocations not the currently running invocation.
+ */
+int ast_sip_sched_task_cancel(struct ast_sip_sched_task *schtd);
+
+/*!
+ * \brief Cancels the next invocation of a task by name
+ * \since 13.9.0
+ *
+ * \param name The task name
+ * \retval 0 Success
+ * \retval -1 Failure
+ * \note Only cancels future invocations not the currently running invocation.
+ */
+int ast_sip_sched_task_cancel_by_name(const char *name);
+
+/*!
+ * \brief Gets the last start and end times of the task
+ * \since 13.9.0
+ *
+ * \param schtd The task structure pointer
+ * \param[out] when_queued Pointer to a timeval structure to contain the time when queued
+ * \param[out] last_start Pointer to a timeval structure to contain the time when last started
+ * \param[out] last_end Pointer to a timeval structure to contain the time when last ended
+ * \retval 0 Success
+ * \retval -1 Failure
+ * \note Any of the pointers can be NULL if you don't need them.
+ */
+int ast_sip_sched_task_get_times(struct ast_sip_sched_task *schtd,
+ struct timeval *when_queued, struct timeval *last_start, struct timeval *last_end);
+
+/*!
+ * \brief Gets the last start and end times of the task by name
+ * \since 13.9.0
+ *
+ * \param name The task name
+ * \param[out] when_queued Pointer to a timeval structure to contain the time when queued
+ * \param[out] last_start Pointer to a timeval structure to contain the time when last started
+ * \param[out] last_end Pointer to a timeval structure to contain the time when last ended
+ * \retval 0 Success
+ * \retval -1 Failure
+ * \note Any of the pointers can be NULL if you don't need them.
+ */
+int ast_sip_sched_task_get_times_by_name(const char *name,
+ struct timeval *when_queued, struct timeval *last_start, struct timeval *last_end);
+
+/*!
+ * \brief Gets the number of milliseconds until the next invocation
+ * \since 13.9.0
+ *
+ * \param schtd The task structure pointer
+ * \return The number of milliseconds until the next invocation or -1 if the task isn't scheduled
+ */
+int ast_sip_sched_task_get_next_run(struct ast_sip_sched_task *schtd);
+
+/*!
+ * \brief Gets the number of milliseconds until the next invocation
+ * \since 13.9.0
+ *
+ * \param name The task name
+ * \return The number of milliseconds until the next invocation or -1 if the task isn't scheduled
+ */
+int ast_sip_sched_task_get_next_run_by_name(const char *name);
+
+/*!
+ * \brief Checks if the task is currently running
+ * \since 13.9.0
+ *
+ * \param schtd The task structure pointer
+ * \retval 0 not running
+ * \retval 1 running
+ */
+int ast_sip_sched_is_task_running(struct ast_sip_sched_task *schtd);
+
+/*!
+ * \brief Checks if the task is currently running
+ * \since 13.9.0
+ *
+ * \param name The task name
+ * \retval 0 not running or not found
+ * \retval 1 running
+ */
+int ast_sip_sched_is_task_running_by_name(const char *name);
+
+/*!
+ * \brief Gets the task name
+ * \since 13.9.0
+ *
+ * \param schtd The task structure pointer
+ * \retval 0 success
+ * \retval 1 failure
+ */
+int ast_sip_sched_task_get_name(struct ast_sip_sched_task *schtd, char *name, size_t maxlen);
+
+/*!
+ * @}
+ */
+
+/*!
* \brief SIP body description
*
* This contains a type and subtype that will be added as