summaryrefslogtreecommitdiff
path: root/main/sched.c
diff options
context:
space:
mode:
authorRussell Bryant <russell@russellbryant.com>2010-12-20 17:15:54 +0000
committerRussell Bryant <russell@russellbryant.com>2010-12-20 17:15:54 +0000
commitcc0b7e7df540e5833c79c9b12ef880f03a74b922 (patch)
treed1049b4c6e02a7236e3a64c09f10633b99decc64 /main/sched.c
parentcf655aa1c9acabad60dcd9febd952de136b9838d (diff)
Some scheduler API cleanup and improvements.
Previously, I had added the ast_sched_thread stuff that was a generic scheduler thread implementation. However, if you used it, it required using different functions for modifying scheduler contents. This patch reworks how this is done and just allows you to optionally start a thread on the original scheduler context structure that has always been there. This makes it trivial to switch to the generic scheduler thread implementation without having to touch any of the other code that adds or removes scheduler entries. In passing, I made some naming tweaks to add ast_ prefixes where they were not there before. Review: https://reviewboard.asterisk.org/r/1007/ git-svn-id: https://origsvn.digium.com/svn/asterisk/trunk@299091 65c4cc65-6c06-0410-ace0-fbb531ad65f3
Diffstat (limited to 'main/sched.c')
-rw-r--r--main/sched.c234
1 files changed, 104 insertions, 130 deletions
diff --git a/main/sched.c b/main/sched.c
index 8a3602a39..bab8cfbe6 100644
--- a/main/sched.c
+++ b/main/sched.c
@@ -1,9 +1,10 @@
/*
* Asterisk -- An open source telephony toolkit.
*
- * Copyright (C) 1999 - 2008, Digium, Inc.
+ * Copyright (C) 1999 - 2010, Digium, Inc.
*
* Mark Spencer <markster@digium.com>
+ * Russell Bryant <russell@digium.com>
*
* See http://www.asterisk.org for more information about
* the Asterisk project. Please do not directly contact
@@ -48,6 +49,16 @@ ASTERISK_FILE_VERSION(__FILE__, "$Revision$")
#include "asterisk/heap.h"
#include "asterisk/threadstorage.h"
+/*!
+ * \brief Max num of schedule structs
+ *
+ * \note The max number of schedule structs to keep around
+ * for use. Undefine to disable schedule structure
+ * caching. (Only disable this on very low memory
+ * machines)
+ */
+#define SCHED_MAX_CACHE 128
+
AST_THREADSTORAGE(last_del_id);
struct sched {
@@ -61,13 +72,20 @@ struct sched {
ssize_t __heap_index;
};
-struct sched_context {
+struct sched_thread {
+ pthread_t thread;
+ ast_cond_t cond;
+ unsigned int stop:1;
+};
+
+struct ast_sched_context {
ast_mutex_t lock;
unsigned int eventcnt; /*!< Number of events processed */
unsigned int schedcnt; /*!< Number of outstanding schedule events */
unsigned int highwater; /*!< highest count so far */
struct ast_hashtab *schedq_ht; /*!< hash table for fast searching */
struct ast_heap *sched_heap;
+ struct sched_thread *sched_thread;
#ifdef SCHED_MAX_CACHE
AST_LIST_HEAD_NOLOCK(, sched) schedc; /*!< Cache of unused schedule structures and how many */
@@ -75,151 +93,97 @@ struct sched_context {
#endif
};
-struct ast_sched_thread {
- pthread_t thread;
- ast_mutex_t lock;
- ast_cond_t cond;
- struct sched_context *context;
- unsigned int stop:1;
-};
-
static void *sched_run(void *data)
{
- struct ast_sched_thread *st = data;
+ struct ast_sched_context *con = data;
- while (!st->stop) {
+ while (!con->sched_thread->stop) {
int ms;
struct timespec ts = {
- .tv_sec = 0,
+ .tv_sec = 0,
};
- ast_mutex_lock(&st->lock);
+ ast_mutex_lock(&con->lock);
- if (st->stop) {
- ast_mutex_unlock(&st->lock);
+ if (con->sched_thread->stop) {
+ ast_mutex_unlock(&con->lock);
return NULL;
}
- ms = ast_sched_wait(st->context);
+ ms = ast_sched_wait(con);
if (ms == -1) {
- ast_cond_wait(&st->cond, &st->lock);
- } else {
+ ast_cond_wait(&con->sched_thread->cond, &con->lock);
+ } else {
struct timeval tv;
tv = ast_tvadd(ast_tvnow(), ast_samp2tv(ms, 1000));
ts.tv_sec = tv.tv_sec;
ts.tv_nsec = tv.tv_usec * 1000;
- ast_cond_timedwait(&st->cond, &st->lock, &ts);
+ ast_cond_timedwait(&con->sched_thread->cond, &con->lock, &ts);
}
- ast_mutex_unlock(&st->lock);
+ ast_mutex_unlock(&con->lock);
- if (st->stop) {
+ if (con->sched_thread->stop) {
return NULL;
}
- ast_sched_runq(st->context);
+ ast_sched_runq(con);
}
return NULL;
}
-void ast_sched_thread_poke(struct ast_sched_thread *st)
-{
- ast_mutex_lock(&st->lock);
- ast_cond_signal(&st->cond);
- ast_mutex_unlock(&st->lock);
-}
-
-struct sched_context *ast_sched_thread_get_context(struct ast_sched_thread *st)
+static void sched_thread_destroy(struct ast_sched_context *con)
{
- return st->context;
-}
-
-struct ast_sched_thread *ast_sched_thread_destroy(struct ast_sched_thread *st)
-{
- if (st->thread != AST_PTHREADT_NULL) {
- ast_mutex_lock(&st->lock);
- st->stop = 1;
- ast_cond_signal(&st->cond);
- ast_mutex_unlock(&st->lock);
- pthread_join(st->thread, NULL);
- st->thread = AST_PTHREADT_NULL;
+ if (!con->sched_thread) {
+ return;
}
- ast_mutex_destroy(&st->lock);
- ast_cond_destroy(&st->cond);
-
- if (st->context) {
- sched_context_destroy(st->context);
- st->context = NULL;
+ if (con->sched_thread->thread != AST_PTHREADT_NULL) {
+ ast_mutex_lock(&con->lock);
+ con->sched_thread->stop = 1;
+ ast_cond_signal(&con->sched_thread->cond);
+ ast_mutex_unlock(&con->lock);
+ pthread_join(con->sched_thread->thread, NULL);
+ con->sched_thread->thread = AST_PTHREADT_NULL;
}
- ast_free(st);
+ ast_cond_destroy(&con->sched_thread->cond);
- return NULL;
+ ast_free(con->sched_thread);
+
+ con->sched_thread = NULL;
}
-struct ast_sched_thread *ast_sched_thread_create(void)
+int ast_sched_start_thread(struct ast_sched_context *con)
{
- struct ast_sched_thread *st;
+ struct sched_thread *st;
+
+ if (con->sched_thread) {
+ ast_log(LOG_ERROR, "Thread already started on this scheduler context\n");
+ return -1;
+ }
if (!(st = ast_calloc(1, sizeof(*st)))) {
- return NULL;
+ return -1;
}
- ast_mutex_init(&st->lock);
ast_cond_init(&st->cond, NULL);
st->thread = AST_PTHREADT_NULL;
- if (!(st->context = sched_context_create())) {
- ast_log(LOG_ERROR, "Failed to create scheduler\n");
- ast_sched_thread_destroy(st);
- return NULL;
- }
-
- if (ast_pthread_create_background(&st->thread, NULL, sched_run, st)) {
- ast_log(LOG_ERROR, "Failed to create scheduler thread\n");
- ast_sched_thread_destroy(st);
- return NULL;
- }
+ con->sched_thread = st;
- return st;
-}
-
-int ast_sched_thread_add_variable(struct ast_sched_thread *st, int when, ast_sched_cb cb,
- const void *data, int variable)
-{
- int res;
-
- ast_mutex_lock(&st->lock);
- res = ast_sched_add_variable(st->context, when, cb, data, variable);
- if (res != -1) {
- ast_cond_signal(&st->cond);
- }
- ast_mutex_unlock(&st->lock);
-
- return res;
-}
-
-int ast_sched_thread_add(struct ast_sched_thread *st, int when, ast_sched_cb cb,
- const void *data)
-{
- int res;
-
- ast_mutex_lock(&st->lock);
- res = ast_sched_add(st->context, when, cb, data);
- if (res != -1) {
- ast_cond_signal(&st->cond);
+ if (ast_pthread_create_background(&st->thread, NULL, sched_run, con)) {
+ ast_log(LOG_ERROR, "Failed to create scheduler thread\n");
+ sched_thread_destroy(con);
+ return -1;
}
- ast_mutex_unlock(&st->lock);
- return res;
+ return 0;
}
-/* hash routines for sched */
-
static int sched_cmp(const void *a, const void *b)
{
const struct sched *as = a;
@@ -239,12 +203,13 @@ static int sched_time_cmp(void *a, void *b)
return ast_tvcmp(((struct sched *) b)->when, ((struct sched *) a)->when);
}
-struct sched_context *sched_context_create(void)
+struct ast_sched_context *ast_sched_context_create(void)
{
- struct sched_context *tmp;
+ struct ast_sched_context *tmp;
- if (!(tmp = ast_calloc(1, sizeof(*tmp))))
+ if (!(tmp = ast_calloc(1, sizeof(*tmp)))) {
return NULL;
+ }
ast_mutex_init(&tmp->lock);
tmp->eventcnt = 1;
@@ -253,23 +218,26 @@ struct sched_context *sched_context_create(void)
if (!(tmp->sched_heap = ast_heap_create(8, sched_time_cmp,
offsetof(struct sched, __heap_index)))) {
- sched_context_destroy(tmp);
+ ast_sched_context_destroy(tmp);
return NULL;
}
return tmp;
}
-void sched_context_destroy(struct sched_context *con)
+void ast_sched_context_destroy(struct ast_sched_context *con)
{
struct sched *s;
+ sched_thread_destroy(con);
+ con->sched_thread = NULL;
+
ast_mutex_lock(&con->lock);
#ifdef SCHED_MAX_CACHE
- /* Eliminate the cache */
- while ((s = AST_LIST_REMOVE_HEAD(&con->schedc, list)))
+ while ((s = AST_LIST_REMOVE_HEAD(&con->schedc, list))) {
ast_free(s);
+ }
#endif
if (con->sched_heap) {
@@ -282,14 +250,14 @@ void sched_context_destroy(struct sched_context *con)
ast_hashtab_destroy(con->schedq_ht, NULL);
con->schedq_ht = NULL;
-
- /* And the context */
+
ast_mutex_unlock(&con->lock);
ast_mutex_destroy(&con->lock);
+
ast_free(con);
}
-static struct sched *sched_alloc(struct sched_context *con)
+static struct sched *sched_alloc(struct ast_sched_context *con)
{
struct sched *tmp;
@@ -307,7 +275,7 @@ static struct sched *sched_alloc(struct sched_context *con)
return tmp;
}
-static void sched_release(struct sched_context *con, struct sched *tmp)
+static void sched_release(struct ast_sched_context *con, struct sched *tmp)
{
/*
* Add to the cache, or just free() if we
@@ -327,7 +295,7 @@ static void sched_release(struct sched_context *con, struct sched *tmp)
* Return the number of milliseconds
* until the next scheduled event
*/
-int ast_sched_wait(struct sched_context *con)
+int ast_sched_wait(struct ast_sched_context *con)
{
int ms;
struct sched *s;
@@ -354,7 +322,7 @@ int ast_sched_wait(struct sched_context *con)
* queue, such that the soonest event is
* first in the list.
*/
-static void schedule(struct sched_context *con, struct sched *s)
+static void schedule(struct ast_sched_context *con, struct sched *s)
{
ast_heap_push(con->sched_heap, s);
@@ -387,7 +355,7 @@ static int sched_settime(struct timeval *t, int when)
return 0;
}
-int ast_sched_replace_variable(int old_id, struct sched_context *con, int when, ast_sched_cb callback, const void *data, int variable)
+int ast_sched_replace_variable(int old_id, struct ast_sched_context *con, int when, ast_sched_cb callback, const void *data, int variable)
{
/* 0 means the schedule item is new; do not delete */
if (old_id > 0) {
@@ -399,7 +367,7 @@ int ast_sched_replace_variable(int old_id, struct sched_context *con, int when,
/*! \brief
* Schedule callback(data) to happen when ms into the future
*/
-int ast_sched_add_variable(struct sched_context *con, int when, ast_sched_cb callback, const void *data, int variable)
+int ast_sched_add_variable(struct ast_sched_context *con, int when, ast_sched_cb callback, const void *data, int variable)
{
struct sched *tmp;
int res = -1;
@@ -426,12 +394,15 @@ int ast_sched_add_variable(struct sched_context *con, int when, ast_sched_cb cal
if (option_debug)
ast_sched_dump(con);
#endif
+ if (con->sched_thread) {
+ ast_cond_signal(&con->sched_thread->cond);
+ }
ast_mutex_unlock(&con->lock);
return res;
}
-int ast_sched_replace(int old_id, struct sched_context *con, int when, ast_sched_cb callback, const void *data)
+int ast_sched_replace(int old_id, struct ast_sched_context *con, int when, ast_sched_cb callback, const void *data)
{
if (old_id > -1) {
AST_SCHED_DEL(con, old_id);
@@ -439,12 +410,12 @@ int ast_sched_replace(int old_id, struct sched_context *con, int when, ast_sched
return ast_sched_add(con, when, callback, data);
}
-int ast_sched_add(struct sched_context *con, int when, ast_sched_cb callback, const void *data)
+int ast_sched_add(struct ast_sched_context *con, int when, ast_sched_cb callback, const void *data)
{
return ast_sched_add_variable(con, when, callback, data, 0);
}
-const void *ast_sched_find_data(struct sched_context *con, int id)
+const void *ast_sched_find_data(struct ast_sched_context *con, int id)
{
struct sched tmp,*res;
tmp.id = id;
@@ -461,9 +432,9 @@ const void *ast_sched_find_data(struct sched_context *con, int id)
* id.
*/
#ifndef AST_DEVMODE
-int ast_sched_del(struct sched_context *con, int id)
+int ast_sched_del(struct ast_sched_context *con, int id)
#else
-int _ast_sched_del(struct sched_context *con, int id, const char *file, int line, const char *function)
+int _ast_sched_del(struct ast_sched_context *con, int id, const char *file, int line, const char *function)
#endif
{
struct sched *s, tmp = {
@@ -498,6 +469,9 @@ int _ast_sched_del(struct sched_context *con, int id, const char *file, int line
if (option_debug)
ast_sched_dump(con);
#endif
+ if (con->sched_thread) {
+ ast_cond_signal(&con->sched_thread->cond);
+ }
ast_mutex_unlock(&con->lock);
if (!s && *last_id != id) {
@@ -520,7 +494,7 @@ int _ast_sched_del(struct sched_context *con, int id, const char *file, int line
return 0;
}
-void ast_sched_report(struct sched_context *con, struct ast_str **buf, struct ast_cb_names *cbnames)
+void ast_sched_report(struct ast_sched_context *con, struct ast_str **buf, struct ast_cb_names *cbnames)
{
int i, x;
struct sched *cur;
@@ -558,7 +532,7 @@ void ast_sched_report(struct sched_context *con, struct ast_str **buf, struct as
}
/*! \brief Dump the contents of the scheduler to LOG_DEBUG */
-void ast_sched_dump(struct sched_context *con)
+void ast_sched_dump(struct ast_sched_context *con)
{
struct sched *q;
struct timeval when = ast_tvnow();
@@ -593,7 +567,7 @@ void ast_sched_dump(struct sched_context *con)
/*! \brief
* Launch all events which need to be run at this time.
*/
-int ast_sched_runq(struct sched_context *con)
+int ast_sched_runq(struct ast_sched_context *con)
{
struct sched *current;
struct timeval when;
@@ -601,7 +575,7 @@ int ast_sched_runq(struct sched_context *con)
int res;
DEBUG(ast_debug(1, "ast_sched_runq()\n"));
-
+
ast_mutex_lock(&con->lock);
when = ast_tvadd(ast_tvnow(), ast_tv(0, 1000));
@@ -614,7 +588,7 @@ int ast_sched_runq(struct sched_context *con)
if (ast_tvcmp(current->when, when) != -1) {
break;
}
-
+
current = ast_heap_pop(con->sched_heap);
if (!ast_hashtab_remove_this_object(con->schedq_ht, current)) {
@@ -631,13 +605,13 @@ int ast_sched_runq(struct sched_context *con)
* the schedule queue. If that's what it wants to do, it
* should return 0.
*/
-
+
ast_mutex_unlock(&con->lock);
res = current->callback(current->data);
ast_mutex_lock(&con->lock);
-
+
if (res) {
- /*
+ /*
* If they return non-zero, we should schedule them to be
* run again.
*/
@@ -653,27 +627,27 @@ int ast_sched_runq(struct sched_context *con)
}
ast_mutex_unlock(&con->lock);
-
+
return numevents;
}
-long ast_sched_when(struct sched_context *con,int id)
+long ast_sched_when(struct ast_sched_context *con,int id)
{
struct sched *s, tmp;
long secs = -1;
DEBUG(ast_debug(1, "ast_sched_when()\n"));
ast_mutex_lock(&con->lock);
-
+
/* these next 2 lines replace a lookup loop */
tmp.id = id;
s = ast_hashtab_lookup(con->schedq_ht, &tmp);
-
+
if (s) {
struct timeval now = ast_tvnow();
secs = s->when.tv_sec - now.tv_sec;
}
ast_mutex_unlock(&con->lock);
-
+
return secs;
}