From 9a301b9f20d2ac20b89d547ad0be2aa92e7d226b Mon Sep 17 00:00:00 2001 From: Russell Bryant Date: Mon, 16 Jun 2008 13:08:13 +0000 Subject: Merge res_timing_pthread. This is a timing interface for Asterisk that does not require DAHDI. It's called "pthread" because it uses a pthread API call in the timing thread for sleeping and ensuring we wake up at an appropriate time. I wasn't sure what else to call it. :) The timing API requires a file descriptor that can be polled on. So, when you open a timer, this module creates a pipe and returns the read end of the pipe. There is a background thread that wakes up every 10ms and checks to see if any of the currently open timers need a 'tick' and writes to the appropriate pipe. git-svn-id: https://origsvn.digium.com/svn/asterisk/trunk@122928 65c4cc65-6c06-0410-ace0-fbb531ad65f3 --- res/res_timing_pthread.c | 480 +++++++++++++++++++++++++++++++++++++++++++++++ 1 file changed, 480 insertions(+) create mode 100644 res/res_timing_pthread.c diff --git a/res/res_timing_pthread.c b/res/res_timing_pthread.c new file mode 100644 index 000000000..6b17e9b85 --- /dev/null +++ b/res/res_timing_pthread.c @@ -0,0 +1,480 @@ +/* + * Asterisk -- An open source telephony toolkit. + * + * Copyright (C) 2008, Digium, Inc. + * + * Russell Bryant + * + * See http://www.asterisk.org for more information about + * the Asterisk project. Please do not directly contact + * any of the maintainers of this project for assistance; + * the project provides a web site, mailing lists and IRC + * channels for your use. + * + * This program is free software, distributed under the terms of + * the GNU General Public License Version 2. See the LICENSE file + * at the top of the source tree. + */ + +/*! + * \file + * \author Russell Bryant + * + * \brief pthread timing interface + */ + +#include "asterisk.h" + +ASTERISK_FILE_VERSION(__FILE__, "$Revision$"); + +#include +#include + +#include "asterisk/module.h" +#include "asterisk/timing.h" +#include "asterisk/utils.h" +#include "asterisk/astobj2.h" +#include "asterisk/time.h" +#include "asterisk/lock.h" + +static void *timing_funcs_handle; + +static int pthread_timer_open(void); +static void pthread_timer_close(int handle); +static int pthread_timer_set_rate(int handle, unsigned int rate); +static void pthread_timer_ack(int handle, unsigned int quantity); +static int pthread_timer_enable_continuous(int handle); +static int pthread_timer_disable_continuous(int handle); +static enum ast_timing_event pthread_timer_get_event(int handle); + +static struct ast_timing_functions pthread_timing_functions = { + .timer_open = pthread_timer_open, + .timer_close = pthread_timer_close, + .timer_set_rate = pthread_timer_set_rate, + .timer_ack = pthread_timer_ack, + .timer_enable_continuous = pthread_timer_enable_continuous, + .timer_disable_continuous = pthread_timer_disable_continuous, + .timer_get_event = pthread_timer_get_event, +}; + +/* 1 tick / 20 ms */ +#define TIMING_INTERVAL 20 +#define MAX_RATE 50 + +static struct ao2_container *pthread_timers; +#define PTHREAD_TIMER_BUCKETS 563 + +enum { + PIPE_READ = 0, + PIPE_WRITE = 1 +}; + +enum pthread_timer_state { + TIMER_STATE_IDLE, + TIMER_STATE_TICKING, + TIMER_STATE_CONTINUOUS, +}; + +struct pthread_timer { + int pipe[2]; + enum pthread_timer_state state; + unsigned int rate; + /*! Interval in ms for current rate */ + unsigned int interval; + unsigned int tick_count; + struct timeval start; +}; + +static void pthread_timer_destructor(void *obj); +static struct pthread_timer *find_timer(int handle, int unlink); +static void write_byte(int wr_fd); +static void read_pipe(int rd_fd, unsigned int num, int clear); + +/*! + * \brief Data for the timing thread + */ +static struct { + pthread_t thread; + ast_mutex_t lock; + ast_cond_t cond; + unsigned int stop:1; +} timing_thread; + +static int pthread_timer_open(void) +{ + struct pthread_timer *timer; + + if (!(timer = ao2_alloc(sizeof(*timer), pthread_timer_destructor))) { + errno = ENOMEM; + return -1; + } + + timer->pipe[PIPE_READ] = timer->pipe[PIPE_WRITE] = -1; + timer->state = TIMER_STATE_IDLE; + + if (pipe(timer->pipe)) { + ao2_ref(timer, -1); + return -1; + } + + ao2_lock(pthread_timers); + if (!ao2_container_count(pthread_timers)) { + ast_mutex_lock(&timing_thread.lock); + ast_cond_signal(&timing_thread.cond); + ast_mutex_unlock(&timing_thread.lock); + } + ao2_link(pthread_timers, timer); + ao2_unlock(pthread_timers); + + return timer->pipe[PIPE_READ]; +} + +static void pthread_timer_close(int handle) +{ + struct pthread_timer *timer; + + if (!(timer = find_timer(handle, 1))) { + return; + } + + ao2_ref(timer, -1); +} + +static int pthread_timer_set_rate(int handle, unsigned int rate) +{ + struct pthread_timer *timer; + + if (!(timer = find_timer(handle, 0))) { + errno = EINVAL; + return -1; + } + + if (rate > 0 && rate < MAX_RATE) { + ast_log(LOG_ERROR, "res_timing_pthread only supports timers at a max rate of %d / sec\n", + MAX_RATE); + errno = EINVAL; + return -1; + } + + ao2_lock(timer); + timer->rate = rate; + timer->state = rate ? TIMER_STATE_TICKING : TIMER_STATE_IDLE; + timer->interval = rate ? roundf(1000.0 / ((float) rate)) : 0; + timer->start = rate ? ast_tvnow() : ast_tv(0, 0); + timer->tick_count = 0; + ao2_unlock(timer); + + ao2_ref(timer, -1); + + return 0; +} + +static void pthread_timer_ack(int handle, unsigned int quantity) +{ + struct pthread_timer *timer; + + ast_assert(quantity > 0); + + if (!(timer = find_timer(handle, 0))) { + return; + } + + if (timer->state == TIMER_STATE_CONTINUOUS) { + /* Leave the pipe alone, please! */ + return; + } + + read_pipe(timer->pipe[PIPE_READ], quantity, 0); + + ao2_ref(timer, -1); +} + +static int pthread_timer_enable_continuous(int handle) +{ + struct pthread_timer *timer; + + if (!(timer = find_timer(handle, 0))) { + errno = EINVAL; + return -1; + } + + ao2_lock(timer); + timer->state = TIMER_STATE_CONTINUOUS; + write_byte(timer->pipe[PIPE_WRITE]); + ao2_unlock(timer); + + ao2_ref(timer, -1); + + return 0; +} + +static int pthread_timer_disable_continuous(int handle) +{ + struct pthread_timer *timer; + + if (!(timer = find_timer(handle, 0))) { + errno = EINVAL; + return -1; + } + + ao2_lock(timer); + timer->state = timer->rate ? TIMER_STATE_TICKING : TIMER_STATE_IDLE; + read_pipe(timer->pipe[PIPE_READ], 0, 1); + ao2_unlock(timer); + + ao2_ref(timer, -1); + + return 0; +} + +static enum ast_timing_event pthread_timer_get_event(int handle) +{ + struct pthread_timer *timer; + enum ast_timing_event res = AST_TIMING_EVENT_EXPIRED; + + if (!(timer = find_timer(handle, 0))) { + return res; + } + + if (timer->state == TIMER_STATE_CONTINUOUS) { + res = AST_TIMING_EVENT_CONTINUOUS; + } + + ao2_ref(timer, -1); + + return res; +} + +static struct pthread_timer *find_timer(int handle, int unlink) +{ + struct pthread_timer *timer; + struct pthread_timer tmp_timer; + int flags = OBJ_POINTER; + + tmp_timer.pipe[PIPE_READ] = handle; + + if (unlink) { + flags |= OBJ_UNLINK; + } + + if (!(timer = ao2_find(pthread_timers, &tmp_timer, flags))) { + ast_assert(timer != NULL); + return NULL; + } + + return timer; +} + +static void pthread_timer_destructor(void *obj) +{ + struct pthread_timer *timer = obj; + + if (timer->pipe[PIPE_READ] > -1) { + close(timer->pipe[PIPE_READ]); + timer->pipe[PIPE_READ] = -1; + } + + if (timer->pipe[PIPE_WRITE] > -1) { + close(timer->pipe[PIPE_WRITE]); + timer->pipe[PIPE_WRITE] = -1; + } +} + +/*! + * \note only PIPE_READ is guaranteed valid + */ +static int pthread_timer_hash(const void *obj, const int flags) +{ + const struct pthread_timer *timer = obj; + + return timer->pipe[PIPE_READ]; +} + +/*! + * \note only PIPE_READ is guaranteed valid + */ +static int pthread_timer_cmp(void *obj, void *arg, int flags) +{ + struct pthread_timer *timer1 = obj, *timer2 = arg; + + return (timer1->pipe[PIPE_READ] == timer2->pipe[PIPE_READ]) ? CMP_MATCH : 0; +} + +/*! + * \retval 0 no timer tick needed + * \retval non-zero write to the timing pipe needed + */ +static int check_timer(struct pthread_timer *timer) +{ + struct timeval now; + + if (timer->state == TIMER_STATE_IDLE || timer->state == TIMER_STATE_CONTINUOUS) { + return 0; + } + + now = ast_tvnow(); + + if (timer->tick_count < (ast_tvdiff_ms(now, timer->start) / timer->interval)) { + timer->tick_count++; + if (!timer->tick_count) { + timer->start = now; + } + return 1; + } + + return 0; +} + +static void read_pipe(int rd_fd, unsigned int quantity, int clear) +{ + + ast_assert(quantity || clear); + + if (!quantity && clear) { + quantity = 1; + } + + do { + unsigned char buf[1024]; + ssize_t res; + fd_set rfds; + struct timeval tv = { + .tv_sec = 0, + }; + + /* Make sure there is data to read */ + FD_ZERO(&rfds); + FD_SET(rd_fd, &rfds); + + if (select(rd_fd + 1, &rfds, NULL, NULL, &tv) != 1) { + break; + } + + res = read(rd_fd, buf, + (quantity < sizeof(buf)) ? quantity : sizeof(buf)); + + if (res == -1) { + if (errno == EAGAIN) { + continue; + } + ast_log(LOG_ERROR, "read failed on timing pipe: %s\n", strerror(errno)); + break; + } + + if (clear) { + continue; + } + + quantity -= res; + } while (quantity); +} + +static void write_byte(int wr_fd) +{ + do { + ssize_t res; + unsigned char x = 42; + + res = write(wr_fd, &x, 1); + + if (res == -1) { + if (errno == EAGAIN) { + continue; + } + ast_log(LOG_ERROR, "Error writing to timing pipe: %s\n", strerror(errno)); + } + } while (0); +} + +static int run_timer(void *obj, void *arg, int flags) +{ + struct pthread_timer *timer = obj; + + if (timer->state == TIMER_STATE_IDLE) { + return 0; + } + + ao2_lock(timer); + + if (check_timer(timer)) { + write_byte(timer->pipe[PIPE_WRITE]); + } + + ao2_unlock(timer); + + return 0; +} + +static void *do_timing(void *arg) +{ + struct timeval next_wakeup = ast_tvnow(); + + while (!timing_thread.stop) { + struct timespec ts = { 0, }; + + ao2_callback(pthread_timers, 0, run_timer, NULL); + + next_wakeup = ast_tvadd(next_wakeup, ast_tv(0, 10000)); + + ts.tv_sec = next_wakeup.tv_sec; + ts.tv_nsec = next_wakeup.tv_usec * 1000; + + ast_mutex_lock(&timing_thread.lock); + if (!timing_thread.stop) { + if (ao2_container_count(pthread_timers)) { + ast_cond_timedwait(&timing_thread.cond, &timing_thread.lock, &ts); + } else { + ast_cond_wait(&timing_thread.cond, &timing_thread.lock); + } + } + ast_mutex_unlock(&timing_thread.lock); + } + + return NULL; +} + +static int init_timing_thread(void) +{ + ast_mutex_init(&timing_thread.lock); + ast_cond_init(&timing_thread.cond, NULL); + + if (ast_pthread_create_background(&timing_thread.thread, NULL, do_timing, NULL)) { + ast_log(LOG_ERROR, "Unable to start timing thread.\n"); + return -1; + } + + return 0; +} + +static int load_module(void) +{ + if (!(pthread_timers = ao2_container_alloc(PTHREAD_TIMER_BUCKETS, + pthread_timer_hash, pthread_timer_cmp))) { + return AST_MODULE_LOAD_DECLINE; + } + + if (init_timing_thread()) { + ao2_ref(pthread_timers, -1); + pthread_timers = NULL; + return AST_MODULE_LOAD_DECLINE; + } + + return (timing_funcs_handle = ast_install_timing_functions(&pthread_timing_functions)) ? + AST_MODULE_LOAD_SUCCESS : AST_MODULE_LOAD_DECLINE; +} + +static int unload_module(void) +{ +#if 0 + /* XXX code to stop the timing thread ... */ + + ast_uninstall_timing_functions(timing_funcs_handle); + ao2_ref(pthread_timers, -1); +#endif + + /* This module can not currently be unloaded. No use count handling is being done. */ + + return -1; +} + +AST_MODULE_INFO_STANDARD(ASTERISK_GPL_KEY, "pthread Timing Interface"); -- cgit v1.2.3