diff options
Diffstat (limited to 'main')
-rw-r--r-- | main/ccss.c | 19 | ||||
-rw-r--r-- | main/cdr.c | 4 | ||||
-rw-r--r-- | main/channel.c | 9 | ||||
-rw-r--r-- | main/dnsmgr.c | 4 | ||||
-rw-r--r-- | main/rtp_engine.c | 2 | ||||
-rw-r--r-- | main/sched.c | 234 | ||||
-rw-r--r-- | main/udptl.c | 4 |
7 files changed, 127 insertions, 149 deletions
diff --git a/main/ccss.c b/main/ccss.c index 4dcacd360..0b703b4cb 100644 --- a/main/ccss.c +++ b/main/ccss.c @@ -68,9 +68,9 @@ ASTERISK_FILE_VERSION(__FILE__, "$Revision$") */ /*! - * The sched_thread ID used for all generic CC timeouts + * The ast_sched_context used for all generic CC timeouts */ -static struct ast_sched_thread *cc_sched_thread; +static struct ast_sched_context *cc_sched_context; /*! * Counter used to create core IDs for CC calls. Each new * core ID is created by atomically adding 1 to the core_id_counter @@ -1235,7 +1235,7 @@ static int cc_generic_monitor_request_cc(struct ast_cc_monitor *monitor, int *av when = service == AST_CC_CCBS ? ast_get_ccbs_available_timer(monitor->interface->config_params) : ast_get_ccnr_available_timer(monitor->interface->config_params); - *available_timer_id = ast_sched_thread_add(cc_sched_thread, when * 1000, + *available_timer_id = ast_sched_add(cc_sched_context, when * 1000, ast_cc_available_timer_expire, cc_ref(monitor, "Give the scheduler a monitor reference")); if (*available_timer_id == -1) { cc_unref(monitor, "Failed to schedule available timer. (monitor)"); @@ -1333,7 +1333,7 @@ static int cc_generic_monitor_cancel_available_timer(struct ast_cc_monitor *moni ast_log_dynamic_level(cc_logger_level, "Core %d: Canceling generic monitor available timer for monitor %s\n", monitor->core_id, monitor->interface->device_name); - if (!ast_sched_thread_del(cc_sched_thread, *sched_id)) { + if (!ast_sched_del(cc_sched_context, *sched_id)) { cc_unref(monitor, "Remove scheduler's reference to the monitor"); } *sched_id = -1; @@ -2377,13 +2377,13 @@ static int cc_generic_agent_start_offer_timer(struct ast_cc_agent *agent) int sched_id; struct cc_generic_agent_pvt *generic_pvt = agent->private_data; - ast_assert(cc_sched_thread != NULL); + ast_assert(cc_sched_context != NULL); ast_assert(agent->cc_params != NULL); when = ast_get_cc_offer_timer(agent->cc_params) * 1000; ast_log_dynamic_level(cc_logger_level, "Core %d: About to schedule offer timer expiration for %d ms\n", agent->core_id, when); - if ((sched_id = ast_sched_thread_add(cc_sched_thread, when, offer_timer_expire, cc_ref(agent, "Give scheduler an agent ref"))) == -1) { + if ((sched_id = ast_sched_add(cc_sched_context, when, offer_timer_expire, cc_ref(agent, "Give scheduler an agent ref"))) == -1) { return -1; } generic_pvt->offer_timer_id = sched_id; @@ -2395,7 +2395,7 @@ static int cc_generic_agent_stop_offer_timer(struct ast_cc_agent *agent) struct cc_generic_agent_pvt *generic_pvt = agent->private_data; if (generic_pvt->offer_timer_id != -1) { - if (!ast_sched_thread_del(cc_sched_thread, generic_pvt->offer_timer_id)) { + if (!ast_sched_del(cc_sched_context, generic_pvt->offer_timer_id)) { cc_unref(agent, "Remove scheduler's reference to the agent"); } generic_pvt->offer_timer_id = -1; @@ -4192,7 +4192,10 @@ int ast_cc_init(void) if (!(cc_core_taskprocessor = ast_taskprocessor_get("CCSS core", TPS_REF_DEFAULT))) { return -1; } - if (!(cc_sched_thread = ast_sched_thread_create())) { + if (!(cc_sched_context = ast_sched_context_create())) { + return -1; + } + if (ast_sched_start_thread(cc_sched_context)) { return -1; } res = ast_register_application2(ccreq_app, ccreq_exec, NULL, NULL, NULL); diff --git a/main/cdr.c b/main/cdr.c index f2ade2192..1677b1ba3 100644 --- a/main/cdr.c +++ b/main/cdr.c @@ -80,7 +80,7 @@ static int cdr_sequence = 0; static int cdr_seq_inc(struct ast_cdr *cdr); -static struct sched_context *sched; +static struct ast_sched_context *sched; static int cdr_sched = -1; static pthread_t cdr_thread = AST_PTHREADT_NULL; @@ -1616,7 +1616,7 @@ int ast_cdr_engine_init(void) { int res; - sched = sched_context_create(); + sched = ast_sched_context_create(); if (!sched) { ast_log(LOG_ERROR, "Unable to create schedule context.\n"); return -1; diff --git a/main/channel.c b/main/channel.c index 0e5b72308..723d60ef6 100644 --- a/main/channel.c +++ b/main/channel.c @@ -1103,7 +1103,7 @@ __ast_channel_alloc_ap(int needqueue, int state, const char *cid_num, const char tmp->epfd = epoll_create(25); #endif - if (!(tmp->sched = sched_context_create())) { + if (!(tmp->sched = ast_sched_context_create())) { ast_log(LOG_WARNING, "Channel allocation failed: Unable to create schedule context\n"); return ast_channel_unref(tmp); } @@ -2339,8 +2339,9 @@ static void ast_channel_destructor(void *obj) ast_free(chan->tech_pvt); } - if (chan->sched) - sched_context_destroy(chan->sched); + if (chan->sched) { + ast_sched_context_destroy(chan->sched); + } if (chan->name) { char *dashptr; @@ -2710,7 +2711,7 @@ int ast_hangup(struct ast_channel *chan) chan->vstream = NULL; } if (chan->sched) { - sched_context_destroy(chan->sched); + ast_sched_context_destroy(chan->sched); chan->sched = NULL; } diff --git a/main/dnsmgr.c b/main/dnsmgr.c index 7fb5f8803..23e1ab6ef 100644 --- a/main/dnsmgr.c +++ b/main/dnsmgr.c @@ -45,7 +45,7 @@ ASTERISK_FILE_VERSION(__FILE__, "$Revision$") #include "asterisk/manager.h" #include "asterisk/acl.h" -static struct sched_context *sched; +static struct ast_sched_context *sched; static int refresh_sched = -1; static pthread_t refresh_thread = AST_PTHREADT_NULL; @@ -351,7 +351,7 @@ static struct ast_cli_entry cli_status = AST_CLI_DEFINE(handle_cli_status, "Disp int dnsmgr_init(void) { - if (!(sched = sched_context_create())) { + if (!(sched = ast_sched_context_create())) { ast_log(LOG_ERROR, "Unable to create schedule context.\n"); return -1; } diff --git a/main/rtp_engine.c b/main/rtp_engine.c index 0fecff8cf..4a4dbf595 100644 --- a/main/rtp_engine.c +++ b/main/rtp_engine.c @@ -300,7 +300,7 @@ int ast_rtp_instance_destroy(struct ast_rtp_instance *instance) } struct ast_rtp_instance *ast_rtp_instance_new(const char *engine_name, - struct sched_context *sched, const struct ast_sockaddr *sa, + struct ast_sched_context *sched, const struct ast_sockaddr *sa, void *data) { struct ast_sockaddr address = {{0,}}; diff --git a/main/sched.c b/main/sched.c index 8a3602a39..bab8cfbe6 100644 --- a/main/sched.c +++ b/main/sched.c @@ -1,9 +1,10 @@ /* * Asterisk -- An open source telephony toolkit. * - * Copyright (C) 1999 - 2008, Digium, Inc. + * Copyright (C) 1999 - 2010, Digium, Inc. * * Mark Spencer <markster@digium.com> + * Russell Bryant <russell@digium.com> * * See http://www.asterisk.org for more information about * the Asterisk project. Please do not directly contact @@ -48,6 +49,16 @@ ASTERISK_FILE_VERSION(__FILE__, "$Revision$") #include "asterisk/heap.h" #include "asterisk/threadstorage.h" +/*! + * \brief Max num of schedule structs + * + * \note The max number of schedule structs to keep around + * for use. Undefine to disable schedule structure + * caching. (Only disable this on very low memory + * machines) + */ +#define SCHED_MAX_CACHE 128 + AST_THREADSTORAGE(last_del_id); struct sched { @@ -61,13 +72,20 @@ struct sched { ssize_t __heap_index; }; -struct sched_context { +struct sched_thread { + pthread_t thread; + ast_cond_t cond; + unsigned int stop:1; +}; + +struct ast_sched_context { ast_mutex_t lock; unsigned int eventcnt; /*!< Number of events processed */ unsigned int schedcnt; /*!< Number of outstanding schedule events */ unsigned int highwater; /*!< highest count so far */ struct ast_hashtab *schedq_ht; /*!< hash table for fast searching */ struct ast_heap *sched_heap; + struct sched_thread *sched_thread; #ifdef SCHED_MAX_CACHE AST_LIST_HEAD_NOLOCK(, sched) schedc; /*!< Cache of unused schedule structures and how many */ @@ -75,151 +93,97 @@ struct sched_context { #endif }; -struct ast_sched_thread { - pthread_t thread; - ast_mutex_t lock; - ast_cond_t cond; - struct sched_context *context; - unsigned int stop:1; -}; - static void *sched_run(void *data) { - struct ast_sched_thread *st = data; + struct ast_sched_context *con = data; - while (!st->stop) { + while (!con->sched_thread->stop) { int ms; struct timespec ts = { - .tv_sec = 0, + .tv_sec = 0, }; - ast_mutex_lock(&st->lock); + ast_mutex_lock(&con->lock); - if (st->stop) { - ast_mutex_unlock(&st->lock); + if (con->sched_thread->stop) { + ast_mutex_unlock(&con->lock); return NULL; } - ms = ast_sched_wait(st->context); + ms = ast_sched_wait(con); if (ms == -1) { - ast_cond_wait(&st->cond, &st->lock); - } else { + ast_cond_wait(&con->sched_thread->cond, &con->lock); + } else { struct timeval tv; tv = ast_tvadd(ast_tvnow(), ast_samp2tv(ms, 1000)); ts.tv_sec = tv.tv_sec; ts.tv_nsec = tv.tv_usec * 1000; - ast_cond_timedwait(&st->cond, &st->lock, &ts); + ast_cond_timedwait(&con->sched_thread->cond, &con->lock, &ts); } - ast_mutex_unlock(&st->lock); + ast_mutex_unlock(&con->lock); - if (st->stop) { + if (con->sched_thread->stop) { return NULL; } - ast_sched_runq(st->context); + ast_sched_runq(con); } return NULL; } -void ast_sched_thread_poke(struct ast_sched_thread *st) -{ - ast_mutex_lock(&st->lock); - ast_cond_signal(&st->cond); - ast_mutex_unlock(&st->lock); -} - -struct sched_context *ast_sched_thread_get_context(struct ast_sched_thread *st) +static void sched_thread_destroy(struct ast_sched_context *con) { - return st->context; -} - -struct ast_sched_thread *ast_sched_thread_destroy(struct ast_sched_thread *st) -{ - if (st->thread != AST_PTHREADT_NULL) { - ast_mutex_lock(&st->lock); - st->stop = 1; - ast_cond_signal(&st->cond); - ast_mutex_unlock(&st->lock); - pthread_join(st->thread, NULL); - st->thread = AST_PTHREADT_NULL; + if (!con->sched_thread) { + return; } - ast_mutex_destroy(&st->lock); - ast_cond_destroy(&st->cond); - - if (st->context) { - sched_context_destroy(st->context); - st->context = NULL; + if (con->sched_thread->thread != AST_PTHREADT_NULL) { + ast_mutex_lock(&con->lock); + con->sched_thread->stop = 1; + ast_cond_signal(&con->sched_thread->cond); + ast_mutex_unlock(&con->lock); + pthread_join(con->sched_thread->thread, NULL); + con->sched_thread->thread = AST_PTHREADT_NULL; } - ast_free(st); + ast_cond_destroy(&con->sched_thread->cond); - return NULL; + ast_free(con->sched_thread); + + con->sched_thread = NULL; } -struct ast_sched_thread *ast_sched_thread_create(void) +int ast_sched_start_thread(struct ast_sched_context *con) { - struct ast_sched_thread *st; + struct sched_thread *st; + + if (con->sched_thread) { + ast_log(LOG_ERROR, "Thread already started on this scheduler context\n"); + return -1; + } if (!(st = ast_calloc(1, sizeof(*st)))) { - return NULL; + return -1; } - ast_mutex_init(&st->lock); ast_cond_init(&st->cond, NULL); st->thread = AST_PTHREADT_NULL; - if (!(st->context = sched_context_create())) { - ast_log(LOG_ERROR, "Failed to create scheduler\n"); - ast_sched_thread_destroy(st); - return NULL; - } - - if (ast_pthread_create_background(&st->thread, NULL, sched_run, st)) { - ast_log(LOG_ERROR, "Failed to create scheduler thread\n"); - ast_sched_thread_destroy(st); - return NULL; - } + con->sched_thread = st; - return st; -} - -int ast_sched_thread_add_variable(struct ast_sched_thread *st, int when, ast_sched_cb cb, - const void *data, int variable) -{ - int res; - - ast_mutex_lock(&st->lock); - res = ast_sched_add_variable(st->context, when, cb, data, variable); - if (res != -1) { - ast_cond_signal(&st->cond); - } - ast_mutex_unlock(&st->lock); - - return res; -} - -int ast_sched_thread_add(struct ast_sched_thread *st, int when, ast_sched_cb cb, - const void *data) -{ - int res; - - ast_mutex_lock(&st->lock); - res = ast_sched_add(st->context, when, cb, data); - if (res != -1) { - ast_cond_signal(&st->cond); + if (ast_pthread_create_background(&st->thread, NULL, sched_run, con)) { + ast_log(LOG_ERROR, "Failed to create scheduler thread\n"); + sched_thread_destroy(con); + return -1; } - ast_mutex_unlock(&st->lock); - return res; + return 0; } -/* hash routines for sched */ - static int sched_cmp(const void *a, const void *b) { const struct sched *as = a; @@ -239,12 +203,13 @@ static int sched_time_cmp(void *a, void *b) return ast_tvcmp(((struct sched *) b)->when, ((struct sched *) a)->when); } -struct sched_context *sched_context_create(void) +struct ast_sched_context *ast_sched_context_create(void) { - struct sched_context *tmp; + struct ast_sched_context *tmp; - if (!(tmp = ast_calloc(1, sizeof(*tmp)))) + if (!(tmp = ast_calloc(1, sizeof(*tmp)))) { return NULL; + } ast_mutex_init(&tmp->lock); tmp->eventcnt = 1; @@ -253,23 +218,26 @@ struct sched_context *sched_context_create(void) if (!(tmp->sched_heap = ast_heap_create(8, sched_time_cmp, offsetof(struct sched, __heap_index)))) { - sched_context_destroy(tmp); + ast_sched_context_destroy(tmp); return NULL; } return tmp; } -void sched_context_destroy(struct sched_context *con) +void ast_sched_context_destroy(struct ast_sched_context *con) { struct sched *s; + sched_thread_destroy(con); + con->sched_thread = NULL; + ast_mutex_lock(&con->lock); #ifdef SCHED_MAX_CACHE - /* Eliminate the cache */ - while ((s = AST_LIST_REMOVE_HEAD(&con->schedc, list))) + while ((s = AST_LIST_REMOVE_HEAD(&con->schedc, list))) { ast_free(s); + } #endif if (con->sched_heap) { @@ -282,14 +250,14 @@ void sched_context_destroy(struct sched_context *con) ast_hashtab_destroy(con->schedq_ht, NULL); con->schedq_ht = NULL; - - /* And the context */ + ast_mutex_unlock(&con->lock); ast_mutex_destroy(&con->lock); + ast_free(con); } -static struct sched *sched_alloc(struct sched_context *con) +static struct sched *sched_alloc(struct ast_sched_context *con) { struct sched *tmp; @@ -307,7 +275,7 @@ static struct sched *sched_alloc(struct sched_context *con) return tmp; } -static void sched_release(struct sched_context *con, struct sched *tmp) +static void sched_release(struct ast_sched_context *con, struct sched *tmp) { /* * Add to the cache, or just free() if we @@ -327,7 +295,7 @@ static void sched_release(struct sched_context *con, struct sched *tmp) * Return the number of milliseconds * until the next scheduled event */ -int ast_sched_wait(struct sched_context *con) +int ast_sched_wait(struct ast_sched_context *con) { int ms; struct sched *s; @@ -354,7 +322,7 @@ int ast_sched_wait(struct sched_context *con) * queue, such that the soonest event is * first in the list. */ -static void schedule(struct sched_context *con, struct sched *s) +static void schedule(struct ast_sched_context *con, struct sched *s) { ast_heap_push(con->sched_heap, s); @@ -387,7 +355,7 @@ static int sched_settime(struct timeval *t, int when) return 0; } -int ast_sched_replace_variable(int old_id, struct sched_context *con, int when, ast_sched_cb callback, const void *data, int variable) +int ast_sched_replace_variable(int old_id, struct ast_sched_context *con, int when, ast_sched_cb callback, const void *data, int variable) { /* 0 means the schedule item is new; do not delete */ if (old_id > 0) { @@ -399,7 +367,7 @@ int ast_sched_replace_variable(int old_id, struct sched_context *con, int when, /*! \brief * Schedule callback(data) to happen when ms into the future */ -int ast_sched_add_variable(struct sched_context *con, int when, ast_sched_cb callback, const void *data, int variable) +int ast_sched_add_variable(struct ast_sched_context *con, int when, ast_sched_cb callback, const void *data, int variable) { struct sched *tmp; int res = -1; @@ -426,12 +394,15 @@ int ast_sched_add_variable(struct sched_context *con, int when, ast_sched_cb cal if (option_debug) ast_sched_dump(con); #endif + if (con->sched_thread) { + ast_cond_signal(&con->sched_thread->cond); + } ast_mutex_unlock(&con->lock); return res; } -int ast_sched_replace(int old_id, struct sched_context *con, int when, ast_sched_cb callback, const void *data) +int ast_sched_replace(int old_id, struct ast_sched_context *con, int when, ast_sched_cb callback, const void *data) { if (old_id > -1) { AST_SCHED_DEL(con, old_id); @@ -439,12 +410,12 @@ int ast_sched_replace(int old_id, struct sched_context *con, int when, ast_sched return ast_sched_add(con, when, callback, data); } -int ast_sched_add(struct sched_context *con, int when, ast_sched_cb callback, const void *data) +int ast_sched_add(struct ast_sched_context *con, int when, ast_sched_cb callback, const void *data) { return ast_sched_add_variable(con, when, callback, data, 0); } -const void *ast_sched_find_data(struct sched_context *con, int id) +const void *ast_sched_find_data(struct ast_sched_context *con, int id) { struct sched tmp,*res; tmp.id = id; @@ -461,9 +432,9 @@ const void *ast_sched_find_data(struct sched_context *con, int id) * id. */ #ifndef AST_DEVMODE -int ast_sched_del(struct sched_context *con, int id) +int ast_sched_del(struct ast_sched_context *con, int id) #else -int _ast_sched_del(struct sched_context *con, int id, const char *file, int line, const char *function) +int _ast_sched_del(struct ast_sched_context *con, int id, const char *file, int line, const char *function) #endif { struct sched *s, tmp = { @@ -498,6 +469,9 @@ int _ast_sched_del(struct sched_context *con, int id, const char *file, int line if (option_debug) ast_sched_dump(con); #endif + if (con->sched_thread) { + ast_cond_signal(&con->sched_thread->cond); + } ast_mutex_unlock(&con->lock); if (!s && *last_id != id) { @@ -520,7 +494,7 @@ int _ast_sched_del(struct sched_context *con, int id, const char *file, int line return 0; } -void ast_sched_report(struct sched_context *con, struct ast_str **buf, struct ast_cb_names *cbnames) +void ast_sched_report(struct ast_sched_context *con, struct ast_str **buf, struct ast_cb_names *cbnames) { int i, x; struct sched *cur; @@ -558,7 +532,7 @@ void ast_sched_report(struct sched_context *con, struct ast_str **buf, struct as } /*! \brief Dump the contents of the scheduler to LOG_DEBUG */ -void ast_sched_dump(struct sched_context *con) +void ast_sched_dump(struct ast_sched_context *con) { struct sched *q; struct timeval when = ast_tvnow(); @@ -593,7 +567,7 @@ void ast_sched_dump(struct sched_context *con) /*! \brief * Launch all events which need to be run at this time. */ -int ast_sched_runq(struct sched_context *con) +int ast_sched_runq(struct ast_sched_context *con) { struct sched *current; struct timeval when; @@ -601,7 +575,7 @@ int ast_sched_runq(struct sched_context *con) int res; DEBUG(ast_debug(1, "ast_sched_runq()\n")); - + ast_mutex_lock(&con->lock); when = ast_tvadd(ast_tvnow(), ast_tv(0, 1000)); @@ -614,7 +588,7 @@ int ast_sched_runq(struct sched_context *con) if (ast_tvcmp(current->when, when) != -1) { break; } - + current = ast_heap_pop(con->sched_heap); if (!ast_hashtab_remove_this_object(con->schedq_ht, current)) { @@ -631,13 +605,13 @@ int ast_sched_runq(struct sched_context *con) * the schedule queue. If that's what it wants to do, it * should return 0. */ - + ast_mutex_unlock(&con->lock); res = current->callback(current->data); ast_mutex_lock(&con->lock); - + if (res) { - /* + /* * If they return non-zero, we should schedule them to be * run again. */ @@ -653,27 +627,27 @@ int ast_sched_runq(struct sched_context *con) } ast_mutex_unlock(&con->lock); - + return numevents; } -long ast_sched_when(struct sched_context *con,int id) +long ast_sched_when(struct ast_sched_context *con,int id) { struct sched *s, tmp; long secs = -1; DEBUG(ast_debug(1, "ast_sched_when()\n")); ast_mutex_lock(&con->lock); - + /* these next 2 lines replace a lookup loop */ tmp.id = id; s = ast_hashtab_lookup(con->schedq_ht, &tmp); - + if (s) { struct timeval now = ast_tvnow(); secs = s->when.tv_sec - now.tv_sec; } ast_mutex_unlock(&con->lock); - + return secs; } diff --git a/main/udptl.c b/main/udptl.c index 100044919..6fbf3784c 100644 --- a/main/udptl.c +++ b/main/udptl.c @@ -124,7 +124,7 @@ struct ast_udptl { struct ast_sockaddr us; struct ast_sockaddr them; int *ioid; - struct sched_context *sched; + struct ast_sched_context *sched; struct io_context *io; void *data; char *tag; @@ -913,7 +913,7 @@ unsigned int ast_udptl_get_far_max_ifp(struct ast_udptl *udptl) return udptl->far_max_ifp; } -struct ast_udptl *ast_udptl_new_with_bindaddr(struct sched_context *sched, struct io_context *io, int callbackmode, struct ast_sockaddr *addr) +struct ast_udptl *ast_udptl_new_with_bindaddr(struct ast_sched_context *sched, struct io_context *io, int callbackmode, struct ast_sockaddr *addr) { struct ast_udptl *udptl; int x; |