From 8fdea12f33c5a5abb0c5bd45479bbcaceba1b952 Mon Sep 17 00:00:00 2001 From: "Dwayne M. Hubbard" Date: Sat, 3 May 2008 03:57:42 +0000 Subject: app_queue uses a taskprocessor for device state changes git-svn-id: https://origsvn.digium.com/svn/asterisk/trunk@115270 65c4cc65-6c06-0410-ace0-fbb531ad65f3 --- apps/app_queue.c | 116 +++++++++++++------------------------------------------ 1 file changed, 27 insertions(+), 89 deletions(-) (limited to 'apps/app_queue.c') diff --git a/apps/app_queue.c b/apps/app_queue.c index 63f84b24d..c4c205fda 100644 --- a/apps/app_queue.c +++ b/apps/app_queue.c @@ -92,6 +92,7 @@ ASTERISK_FILE_VERSION(__FILE__, "$Revision$") #include "asterisk/astobj2.h" #include "asterisk/strings.h" #include "asterisk/global_datastores.h" +#include "asterisk/taskprocessor.h" /*! * \par Please read before modifying this file. @@ -131,6 +132,8 @@ static const struct strategy { { QUEUE_STRATEGY_WRANDOM, "wrandom"}, }; +static struct ast_taskprocessor *devicestate_tps; + #define DEFAULT_RETRY 5 #define DEFAULT_TIMEOUT 15 #define RECHECK 1 /*!< Recheck every second to see we we're at the top yet */ @@ -739,18 +742,20 @@ static int update_status(const char *interface, const int status) } /*! \brief set a member's status based on device state of that member's interface*/ -static void *handle_statechange(struct statechange *sc) +static int handle_statechange(void *datap) { struct member_interface *curint; char *loc; char *technology; + struct statechange *sc = datap; technology = ast_strdupa(sc->dev); loc = strchr(technology, '/'); if (loc) { *loc++ = '\0'; } else { - return NULL; + ast_free(sc); + return 0; } AST_LIST_LOCK(&interfaces); @@ -770,84 +775,14 @@ static void *handle_statechange(struct statechange *sc) if (!curint) { if (option_debug > 2) ast_log(LOG_DEBUG, "Device '%s/%s' changed to state '%d' (%s) but we don't care because they're not a member of any queue.\n", technology, loc, sc->state, devstate2str(sc->state)); - return NULL; + return 0; } if (option_debug) ast_log(LOG_DEBUG, "Device '%s/%s' changed to state '%d' (%s)\n", technology, loc, sc->state, devstate2str(sc->state)); update_status(sc->dev, sc->state); - - return NULL; -} - -/*! \brief Data used by the device state thread */ -static struct { - /*! Set to 1 to stop the thread */ - unsigned int stop:1; - /*! The device state monitoring thread */ - pthread_t thread; - /*! Lock for the state change queue */ - ast_mutex_t lock; - /*! Condition for the state change queue */ - ast_cond_t cond; - /*! Queue of state changes */ - AST_LIST_HEAD_NOLOCK(, statechange) state_change_q; -} device_state = { - .thread = AST_PTHREADT_NULL, -}; - -/*! \brief Consumer of the statechange queue */ -static void *device_state_thread(void *data) -{ - struct statechange *sc = NULL; - - while (!device_state.stop) { - ast_mutex_lock(&device_state.lock); - if (!(sc = AST_LIST_REMOVE_HEAD(&device_state.state_change_q, entry))) { - ast_cond_wait(&device_state.cond, &device_state.lock); - sc = AST_LIST_REMOVE_HEAD(&device_state.state_change_q, entry); - } - ast_mutex_unlock(&device_state.lock); - - /* Check to see if we were woken up to see the request to stop */ - if (device_state.stop) - break; - - if (!sc) - continue; - - handle_statechange(sc); - - ast_free(sc); - sc = NULL; - } - - if (sc) - ast_free(sc); - - while ((sc = AST_LIST_REMOVE_HEAD(&device_state.state_change_q, entry))) - ast_free(sc); - - return NULL; -} - -/*! \brief Producer of the statechange queue */ -static int statechange_queue(const char *dev, enum ast_device_state state) -{ - struct statechange *sc; - - if (!(sc = ast_calloc(1, sizeof(*sc) + strlen(dev) + 1))) - return 0; - - sc->state = state; - strcpy(sc->dev, dev); - - ast_mutex_lock(&device_state.lock); - AST_LIST_INSERT_TAIL(&device_state.state_change_q, sc, entry); - ast_cond_signal(&device_state.cond); - ast_mutex_unlock(&device_state.lock); - + ast_free(sc); return 0; } @@ -855,6 +790,8 @@ static void device_state_cb(const struct ast_event *event, void *unused) { enum ast_device_state state; const char *device; + struct statechange *sc; + size_t datapsize; state = ast_event_get_ie_uint(event, AST_EVENT_IE_STATE); device = ast_event_get_ie_str(event, AST_EVENT_IE_DEVICE); @@ -863,8 +800,16 @@ static void device_state_cb(const struct ast_event *event, void *unused) ast_log(LOG_ERROR, "Received invalid event that had no device IE\n"); return; } - - statechange_queue(device, state); + datapsize = sizeof(*sc) + strlen(device) + 1; + if (!(sc = ast_calloc(1, datapsize))) { + ast_log(LOG_ERROR, "failed to calloc a state change struct\n"); + return; + } + sc->state = state; + strcpy(sc->dev, device); + if (ast_taskprocessor_push(devicestate_tps, handle_statechange, sc) < 0) { + ast_free(sc); + } } /*! \brief allocate space for new queue member and set fields based on parameters passed */ @@ -6249,14 +6194,6 @@ static int unload_module(void) struct ao2_iterator q_iter; struct call_queue *q = NULL; - if (device_state.thread != AST_PTHREADT_NULL) { - device_state.stop = 1; - ast_mutex_lock(&device_state.lock); - ast_cond_signal(&device_state.cond); - ast_mutex_unlock(&device_state.lock); - pthread_join(device_state.thread, NULL); - } - ast_cli_unregister_multiple(cli_queue, sizeof(cli_queue) / sizeof(struct ast_cli_entry)); res = ast_manager_unregister("QueueStatus"); res |= ast_manager_unregister("Queues"); @@ -6296,7 +6233,7 @@ static int unload_module(void) queue_unref(q); } ao2_ref(queues, -1); - + devicestate_tps = ast_taskprocessor_unreference(devicestate_tps); return res; } @@ -6319,10 +6256,6 @@ static int load_module(void) if (queue_persistent_members) reload_queue_members(); - ast_mutex_init(&device_state.lock); - ast_cond_init(&device_state.cond, NULL); - ast_pthread_create(&device_state.thread, NULL, device_state_thread, NULL); - ast_cli_register_multiple(cli_queue, sizeof(cli_queue) / sizeof(struct ast_cli_entry)); res = ast_register_application(app, queue_exec, synopsis, descrip); res |= ast_register_application(app_aqm, aqm_exec, app_aqm_synopsis, app_aqm_descrip); @@ -6345,6 +6278,11 @@ static int load_module(void) res |= ast_custom_function_register(&queuememberlist_function); res |= ast_custom_function_register(&queuewaitingcount_function); res |= ast_custom_function_register(&queuememberpenalty_function); + + if (!(devicestate_tps = ast_taskprocessor_get("app_queue", 0))) { + ast_log(LOG_WARNING, "devicestate taskprocessor reference failed - devicestate notifications will not occur\n"); + } + if (!(device_state_sub = ast_event_subscribe(AST_EVENT_DEVICE_STATE, device_state_cb, NULL, AST_EVENT_IE_END))) res = -1; -- cgit v1.2.3