diff options
author | BJ Weschke <bweschke@btwtech.com> | 2006-05-25 21:47:03 +0000 |
---|---|---|
committer | BJ Weschke <bweschke@btwtech.com> | 2006-05-25 21:47:03 +0000 |
commit | 295c340164dce3624c2cdc2c52678f9283e2a86c (patch) | |
tree | 1a4d97ea5e554d7e0e31bd280a3da690265db12a /apps | |
parent | 8ecf6a7bb168b6e86f7a24121aa4d9fea3c022a6 (diff) |
A new way to try and deal with deadlocks that occur in app_queue at present. Using this approach, we only manipulate the main queue mutexes when we get a dev state change on a device that is actually a member of a queue. Further optimizations are still possible (eg - store and manage pointers to the status integer of the member record that this interface/device has a one-to-one relationship with and then go directly to those pointers to make status modifications rather than the recursive looping that goes on now) BUT first things first. :)
git-svn-id: https://origsvn.digium.com/svn/asterisk/trunk@30430 65c4cc65-6c06-0410-ace0-fbb531ad65f3
Diffstat (limited to 'apps')
-rw-r--r-- | apps/app_queue.c | 176 |
1 files changed, 145 insertions, 31 deletions
diff --git a/apps/app_queue.c b/apps/app_queue.c index d5db8053e..5187800e9 100644 --- a/apps/app_queue.c +++ b/apps/app_queue.c @@ -318,6 +318,13 @@ struct member { struct member *next; /*!< Next member */ }; +struct ast_member_interfaces { + char interface[80]; + AST_LIST_ENTRY(ast_member_interfaces) list; /*!< Next call queue */ +}; + +static AST_LIST_HEAD_STATIC(interfaces, ast_member_interfaces); + /* values used in multi-bit flags in ast_call_queue */ #define QUEUE_EMPTY_NORMAL 1 #define QUEUE_EMPTY_STRICT 2 @@ -482,6 +489,7 @@ static void *changethread(void *data) struct ast_call_queue *q; struct statechange *sc = data; struct member *cur; + struct ast_member_interfaces *curint; char *loc; char *technology; @@ -494,36 +502,50 @@ static void *changethread(void *data) free(sc); return NULL; } - if (option_debug) - ast_log(LOG_DEBUG, "Device '%s/%s' changed to state '%d' (%s)\n", technology, loc, sc->state, devstate2str(sc->state)); - AST_LIST_LOCK(&queues); - AST_LIST_TRAVERSE(&queues, q, list) { - ast_mutex_lock(&q->lock); - cur = q->members; - while(cur) { - if (!strcasecmp(sc->dev, cur->interface)) { - if (cur->status != sc->state) { - cur->status = sc->state; - if (!q->maskmemberstatus) { - manager_event(EVENT_FLAG_AGENT, "QueueMemberStatus", - "Queue: %s\r\n" - "Location: %s\r\n" - "Membership: %s\r\n" - "Penalty: %d\r\n" - "CallsTaken: %d\r\n" - "LastCall: %d\r\n" - "Status: %d\r\n" - "Paused: %d\r\n", - q->name, cur->interface, cur->dynamic ? "dynamic" : "static", - cur->penalty, cur->calls, (int)cur->lastcall, cur->status, cur->paused); + + AST_LIST_LOCK(&interfaces); + AST_LIST_TRAVERSE(&interfaces, curint, list) { + if (!strcasecmp(curint->interface, sc->dev)) + break; + } + AST_LIST_UNLOCK(&interfaces); + + if (curint) { + + if (option_debug) + ast_log(LOG_DEBUG, "Device '%s/%s' changed to state '%d' (%s)\n", technology, loc, sc->state, devstate2str(sc->state)); + AST_LIST_LOCK(&queues); + AST_LIST_TRAVERSE(&queues, q, list) { + ast_mutex_lock(&q->lock); + cur = q->members; + while(cur) { + if (!strcasecmp(sc->dev, cur->interface)) { + if (cur->status != sc->state) { + cur->status = sc->state; + if (!q->maskmemberstatus) { + manager_event(EVENT_FLAG_AGENT, "QueueMemberStatus", + "Queue: %s\r\n" + "Location: %s\r\n" + "Membership: %s\r\n" + "Penalty: %d\r\n" + "CallsTaken: %d\r\n" + "LastCall: %d\r\n" + "Status: %d\r\n" + "Paused: %d\r\n", + q->name, cur->interface, cur->dynamic ? "dynamic" : "static", + cur->penalty, cur->calls, (int)cur->lastcall, cur->status, cur->paused); + } } } + cur = cur->next; } - cur = cur->next; + ast_mutex_unlock(&q->lock); } - ast_mutex_unlock(&q->lock); + AST_LIST_UNLOCK(&queues); + } else { + if (option_debug) + ast_log(LOG_DEBUG, "Device '%s/%s' changed to state '%d' (%s) but we don't care because they're not a member of any queue.\n", technology, loc, sc->state, devstate2str(sc->state)); } - AST_LIST_UNLOCK(&queues); free(sc); return NULL; } @@ -622,6 +644,87 @@ static void clear_queue(struct ast_call_queue *q) q->wrapuptime = 0; } +static int add_to_interfaces(char *interface) +{ + struct ast_member_interfaces *curint, *newint; + + AST_LIST_LOCK(&interfaces); + AST_LIST_TRAVERSE(&interfaces, curint, list) { + if (!strcasecmp(curint->interface, interface)) + break; + } + + if (!curint) { + if (option_debug) + ast_log(LOG_DEBUG, "Adding %s to the list of interfaces that make up all of our queue members.\n", interface); + + if ((newint = ast_calloc(1, sizeof(*newint)))) { + ast_copy_string(newint->interface, interface, sizeof(newint->interface)); + AST_LIST_INSERT_HEAD(&interfaces, newint, list); + } + } + AST_LIST_UNLOCK(&interfaces); + + return 0; +} + +static int interface_exists_global(char *interface) +{ + struct ast_call_queue *q; + struct member *mem; + int ret = 0; + + AST_LIST_LOCK(&queues); + AST_LIST_TRAVERSE(&queues, q, list) { + ast_mutex_lock(&q->lock); + for (mem = q->members; mem; mem = mem->next) + if (!strcasecmp(interface, mem->interface)) { + ast_mutex_unlock(&q->lock); + ret = 1; + break; + } + ast_mutex_unlock(&q->lock); + } + AST_LIST_UNLOCK(&queues); + + return ret; +} + + +static int remove_from_interfaces(char *interface) +{ + struct ast_member_interfaces *curint; + + AST_LIST_LOCK(&interfaces); + AST_LIST_TRAVERSE_SAFE_BEGIN(&interfaces, curint, list) { + if (!strcasecmp(curint->interface, interface) && !interface_exists_global(interface)) { + if (option_debug) + ast_log(LOG_DEBUG, "Removing %s from the list of interfaces that make up all of our queue members.\n", interface); + AST_LIST_REMOVE_CURRENT(&interfaces, list); + free(curint); + } + } + AST_LIST_TRAVERSE_SAFE_END; + AST_LIST_UNLOCK(&interfaces); + + return 0; +} + +static void clear_and_free_interfaces(void) +{ + struct ast_member_interfaces *curint; + + AST_LIST_LOCK(&interfaces); + AST_LIST_TRAVERSE_SAFE_BEGIN(&interfaces, curint, list) { + AST_LIST_REMOVE_CURRENT(&interfaces, list); + free(curint); + } + AST_LIST_TRAVERSE_SAFE_END; + AST_LIST_UNLOCK(&interfaces); + + return; +} + /*! \brief Configure a queue parameter. \par For error reporting, line number is passed for .conf static configuration. @@ -802,6 +905,7 @@ static void rt_handle_member_record(struct ast_call_queue *q, char *interface, c m = create_queue_member(interface, penalty, 0); if (m) { m->dead = 0; + add_to_interfaces(interface); if (prev_m) { prev_m->next = m; } else { @@ -826,6 +930,7 @@ static void free_members(struct ast_call_queue *q, int all) prev->next = next; else q->members = next; + remove_from_interfaces(curm->interface); free(curm); } else prev = curm; @@ -948,6 +1053,7 @@ static struct ast_call_queue *find_queue_by_name_rt(const char *queuename, struc } else { q->members = next_m; } + remove_from_interfaces(m->interface); free(m); } else { prev_m = m; @@ -1065,9 +1171,8 @@ static int join_queue(char *queuename, struct queue_ent *qe, enum queue_result * S_OR(qe->chan->cid.cid_num, "unknown"), /* XXX somewhere else it is <unknown> */ S_OR(qe->chan->cid.cid_name, "unknown"), q->name, qe->pos, q->count, qe->chan->uniqueid ); -#if 0 -ast_log(LOG_NOTICE, "Queue '%s' Join, Channel '%s', Position '%d'\n", q->name, qe->chan->name, qe->pos ); -#endif + if (option_debug) + ast_log(LOG_DEBUG, "Queue '%s' Join, Channel '%s', Position '%d'\n", q->name, qe->chan->name, qe->pos ); } ast_mutex_unlock(&q->lock); AST_LIST_UNLOCK(&queues); @@ -2596,10 +2701,14 @@ static int remove_from_queue(char *queuename, char *interface) } ast_mutex_unlock(&q->lock); } + if (res == RES_OKAY) { + remove_from_interfaces(interface); + } AST_LIST_UNLOCK(&queues); return res; } + static int add_to_queue(char *queuename, char *interface, int penalty, int paused, int dump) { struct ast_call_queue *q; @@ -2615,6 +2724,9 @@ static int add_to_queue(char *queuename, char *interface, int penalty, int pause if (q) { ast_mutex_lock(&q->lock); if (interface_exists(q, interface) == NULL) { + + add_to_interfaces(interface); + new_member = create_queue_member(interface, penalty, paused); if (new_member != NULL) { @@ -3183,7 +3295,6 @@ check_turns: /* Try calling all queue members for 'timeout' seconds */ res = try_calling(&qe, args.options, args.announceoverride, args.url, &go_on, args.agi); - if (res) { if (res < 0) { if (!qe.handled) { @@ -3552,6 +3663,8 @@ static void reload_queues(void) } free(cur); } else { + /* Add them to the master int list if necessary */ + add_to_interfaces(interface); newm->next = q->members; q->members = newm; } @@ -3575,6 +3688,7 @@ static void reload_queues(void) q->members = cur->next; newm = cur; } + remove_from_interfaces(cur->interface); } } @@ -4128,6 +4242,7 @@ static int unload_module(void *mod) { int res; + clear_and_free_interfaces(); res = ast_cli_unregister(&cli_show_queue); res |= ast_cli_unregister(&cli_show_queues); res |= ast_cli_unregister(&cli_add_queue_member); @@ -4137,7 +4252,6 @@ static int unload_module(void *mod) res |= ast_manager_unregister("QueueAdd"); res |= ast_manager_unregister("QueueRemove"); res |= ast_manager_unregister("QueuePause"); - ast_devstate_del(statechange_queue, NULL); res |= ast_unregister_application(app_aqm); res |= ast_unregister_application(app_rqm); res |= ast_unregister_application(app_pqm); @@ -4162,7 +4276,6 @@ static int load_module(void *mod) res |= ast_cli_register(&cli_show_queues); res |= ast_cli_register(&cli_add_queue_member); res |= ast_cli_register(&cli_remove_queue_member); - res |= ast_devstate_add(statechange_queue, NULL); res |= ast_manager_register( "Queues", 0, manager_queues_show, "Queues" ); res |= ast_manager_register( "QueueStatus", 0, manager_queues_status, "Queue Status" ); res |= ast_manager_register( "QueueAdd", EVENT_FLAG_AGENT, manager_add_queue_member, "Add interface to queue." ); @@ -4176,6 +4289,7 @@ static int load_module(void *mod) res |= ast_custom_function_register(&queuemembercount_function); res |= ast_custom_function_register(&queuememberlist_function); res |= ast_custom_function_register(&queuewaitingcount_function); + res |= ast_devstate_add(statechange_queue, NULL); if (!res) { reload_queues(); |