From 66cbe070298b9dd71737ebbaa869260a8445358a Mon Sep 17 00:00:00 2001 From: Mark Spencer Date: Fri, 21 Jan 2005 04:14:26 +0000 Subject: Merge mjohnston's pause/unpause (bug #3252) git-svn-id: https://origsvn.digium.com/svn/asterisk/trunk@4861 65c4cc65-6c06-0410-ace0-fbb531ad65f3 --- apps/app_queue.c | 269 ++++++++++++++++++++++++++++++++++++++++++++++++++----- 1 file changed, 247 insertions(+), 22 deletions(-) diff --git a/apps/app_queue.c b/apps/app_queue.c index 27a8149c7..8ab4351ad 100755 --- a/apps/app_queue.c +++ b/apps/app_queue.c @@ -148,6 +148,30 @@ static char *app_rqm_descrip = "Example: RemoveQueueMember(techsupport|SIP/3000)\n" ""; +static char *app_pqm = "PauseQueueMember" ; +static char *app_pqm_synopsis = "Pauses a queue member" ; +static char *app_pqm_descrip = +" PauseQueueMember([queuename]|interface):\n" +"Pauses (blocks calls for) a queue member.\n" +"The given interface will be paused in the given queue. This prevents\n" +"any calls from being sent from the queue to the interface until it is\n" +"unpaused with UnpauseQueueMember or the manager interface. If no\n" +"queuename is given, the interface is paused in every queue it is a\n" +"member of. If the interface is not in the named queue, or if no queue\n" +"is given and the interface is not in any queue, it will jump to\n" +" priority n+101, if it exists. Returns -1 if the interface is not\n" +"found and no extension to jump to exists, 0 otherwise.\n" +"Example: PauseQueueMember(|SIP/3000)\n"; + +static char *app_upqm = "UnpauseQueueMember" ; +static char *app_upqm_synopsis = "Unpauses a queue member" ; +static char *app_upqm_descrip = +" UnpauseQueueMember([queuename]|interface):\n" +"Unpauses (resumes calls to) a queue member.\n" +"This is the counterpart to PauseQueueMember and operates exactly the\n" +"same way, except it unpauses instead of pausing the given interface.\n" +"Example: UnpauseQueueMember(|SIP/3000)\n"; + /* Persistent Members astdb family */ static const char *pm_family = "/Queue/PersistentMembers"; /* The maximum lengh of each persistent member queue database entry */ @@ -214,6 +238,7 @@ struct member { int calls; /* Number of calls serviced by this member */ int dynamic; /* Are we dynamically added? */ int status; /* Status of queue member */ + int paused; /* Are we paused (not accepting calls)? */ time_t lastcall; /* When last successful call was hungup */ struct member *next; /* Next member */ }; @@ -367,9 +392,10 @@ static void *changethread(void *data) "Penalty: %d\r\n" "CallsTaken: %d\r\n" "LastCall: %ld\r\n" - "Status: %d\r\n", + "Status: %d\r\n" + "Paused: %d\r\n", q->name, cur->interface, cur->dynamic ? "dynamic" : "static", - cur->penalty, cur->calls, cur->lastcall, cur->status); + cur->penalty, cur->calls, cur->lastcall, cur->status, cur->paused); } } cur = cur->next; @@ -693,9 +719,10 @@ static int update_status(struct ast_call_queue *q, struct member *member, int st "Penalty: %d\r\n" "CallsTaken: %d\r\n" "LastCall: %ld\r\n" - "Status: %d\r\n", + "Status: %d\r\n" + "Paused: %d\r\n", q->name, cur->interface, cur->dynamic ? "dynamic" : "static", - cur->penalty, cur->calls, cur->lastcall, cur->status); + cur->penalty, cur->calls, cur->lastcall, cur->status, cur->paused); break; } cur = cur->next; @@ -789,6 +816,15 @@ static int ring_entry(struct queue_ent *qe, struct localuser *tmp, int *busies) return 0; } + if (tmp->member->paused) { + if (option_debug) + ast_log(LOG_DEBUG, "%s paused, can't receive call\n", tmp->interface); + if (qe->chan->cdr) + ast_cdr_busy(qe->chan->cdr); + tmp->stillgoing = 0; + return 0; + } + strncpy(tech, tmp->interface, sizeof(tech) - 1); if ((location = strchr(tech, '/'))) *location++ = '\0'; @@ -1616,7 +1652,7 @@ static struct member * interface_exists(struct ast_call_queue *q, char *interfac } -static struct member *create_queue_node(char *interface, int penalty) +static struct member *create_queue_node(char *interface, int penalty, int paused) { struct member *cur; @@ -1627,6 +1663,7 @@ static struct member *create_queue_node(char *interface, int penalty) if (cur) { memset(cur, 0, sizeof(struct member)); cur->penalty = penalty; + cur->paused = paused; strncpy(cur->interface, interface, sizeof(cur->interface) - 1); if (!strchr(cur->interface, '/')) ast_log(LOG_WARNING, "No location at interface '%s'\n", interface); @@ -1638,7 +1675,7 @@ static struct member *create_queue_node(char *interface, int penalty) /* Dump all members in a specific queue to the databse * - * / = ;;... + * / = ;;;... * */ static void dump_queue_members(struct ast_call_queue *pm_queue) @@ -1655,7 +1692,7 @@ static void dump_queue_members(struct ast_call_queue *pm_queue) while (cur_member) { if (cur_member->dynamic) { value_len = strlen(value); - res = snprintf(value+value_len, sizeof(value)-value_len, "%s;%d;", cur_member->interface, cur_member->penalty); + res = snprintf(value+value_len, sizeof(value)-value_len, "%s;%d;%d;", cur_member->interface, cur_member->penalty, cur_member->paused); if (res != strlen(value + value_len)) { ast_log(LOG_WARNING, "Could not create persistent member string, out of space\n"); break; @@ -1721,7 +1758,7 @@ static int remove_from_queue(char *queuename, char *interface) return res; } -static int add_to_queue(char *queuename, char *interface, int penalty) +static int add_to_queue(char *queuename, char *interface, int penalty, int paused) { struct ast_call_queue *q; struct member *new_member; @@ -1732,7 +1769,7 @@ static int add_to_queue(char *queuename, char *interface, int penalty) ast_mutex_lock(&q->lock); if (!strcmp(q->name, queuename)) { if (interface_exists(q, interface) == NULL) { - new_member = create_queue_node(interface, penalty); + new_member = create_queue_node(interface, penalty, paused); if (new_member != NULL) { new_member->dynamic = 1; @@ -1745,9 +1782,10 @@ static int add_to_queue(char *queuename, char *interface, int penalty) "Penalty: %d\r\n" "CallsTaken: %d\r\n" "LastCall: %ld\r\n" - "Status: %d\r\n", + "Status: %d\r\n" + "Paused: %d\r\n", q->name, new_member->interface, new_member->dynamic ? "dynamic" : "static", - new_member->penalty, new_member->calls, new_member->lastcall, new_member->status); + new_member->penalty, new_member->calls, new_member->lastcall, new_member->status, new_member->paused); if (queue_persistent_members) dump_queue_members(q); @@ -1768,6 +1806,49 @@ static int add_to_queue(char *queuename, char *interface, int penalty) return res; } +static int set_member_paused(char *queuename, char *interface, int paused) +{ + int found = 0; + struct ast_call_queue *q; + struct member *mem; + + /* Special event for when all queues are paused - individual events still generated */ + + if (ast_strlen_zero(queuename)) + ast_queue_log("NONE", "NONE", interface, (paused ? "PAUSEALL" : "UNPAUSEALL"), "%s", ""); + + ast_mutex_lock(&qlock); + for (q = queues ; q ; q = q->next) { + ast_mutex_lock(&q->lock); + if (ast_strlen_zero(queuename) || !strcmp(q->name, queuename)) { + if ((mem = interface_exists(q, interface))) { + found++; + if (mem->paused == paused) + ast_log(LOG_DEBUG, "%spausing already-%spaused queue member %s:%s\n", (paused ? "" : "un"), (paused ? "" : "un"), q->name, interface); + mem->paused = paused; + + if (queue_persistent_members) + dump_queue_members(q); + + ast_queue_log(q->name, "NONE", interface, (paused ? "PAUSE" : "UNPAUSE"), "%s", ""); + + manager_event(EVENT_FLAG_AGENT, "QueueMemberPaused", + "Queue: %s\r\n" + "Location: %s\r\n" + "Paused: %d\r\n", + q->name, mem->interface, paused); + } + } + ast_mutex_unlock(&q->lock); + } + ast_mutex_unlock(&qlock); + + if (found) + return RESULT_SUCCESS; + else + return RESULT_FAILURE; +} + /* Add members saved in the queue members DB file saves * created by dump_queue_members(), back into the queues */ static void reload_queue_members(void) @@ -1777,6 +1858,8 @@ static void reload_queue_members(void) char *pm_interface; char *pm_penalty_tok; int pm_penalty = 0; + char *pm_paused_tok; + int pm_paused = 0; struct ast_db_entry *pm_db_tree = NULL; int pm_family_len = 0; struct ast_call_queue *cur_queue = NULL; @@ -1813,12 +1896,13 @@ static void reload_queue_members(void) ast_mutex_unlock(&cur_queue->lock); if (!ast_db_get(pm_family, pm_queue_name, queue_data, PM_MAX_LEN)) { - /* Parse each ;; from the value of the - * queuename key and add it to the respective queue */ cur_pm_ptr = queue_data; while ((pm_interface = strsep(&cur_pm_ptr, ";"))) { + /* On the last iteration, pm_interface is a pointer to an empty string. Don't report a spurious error. */ + if (pm_interface[0] == 0) + break; if (!(pm_penalty_tok = strsep(&cur_pm_ptr, ";"))) { - ast_log(LOG_WARNING, "Error parsing corrupted Queue DB string for '%s'\n", pm_queue_name); + ast_log(LOG_WARNING, "Error parsing corrupted Queue DB string for '%s' (penalty)\n", pm_queue_name); break; } pm_penalty = strtol(pm_penalty_tok, NULL, 10); @@ -1826,11 +1910,26 @@ static void reload_queue_members(void) ast_log(LOG_WARNING, "Error converting penalty: %s: Out of range.\n", pm_penalty_tok); break; } + + /* If ptr[1] is ';', the string is 1 char long and can't be an interface */ + + if (cur_pm_ptr[1] == ';') { + if (!(pm_paused_tok = strsep(&cur_pm_ptr, ";"))) { + ast_log(LOG_WARNING, "Error parsing corrupted Queue DB string for '%s' (paused)\n", pm_queue_name); + break; + } + pm_paused = strtol(pm_paused_tok, NULL, 10); + if ((errno == ERANGE) || (pm_paused < 0 || pm_paused > 1)) { + ast_log(LOG_WARNING, "Error converting paused: %s: Expected 0 or 1.\n", pm_paused_tok); + break; + } + } else if (option_debug) + ast_verbose(VERBOSE_PREFIX_3 "Found old-format queue member %s:%s\n", pm_queue_name, pm_interface); if (option_debug) - ast_log(LOG_DEBUG, "Reload Members: Queue: %s Member: %s Penalty: %d\n", pm_queue_name, pm_interface, pm_penalty); + ast_log(LOG_DEBUG, "Reload Members: Queue: %s Member: %s Penalty: %d Paused: %d\n", pm_queue_name, pm_interface, pm_penalty, pm_paused); - if (add_to_queue(pm_queue_name, pm_interface, pm_penalty) == RES_OUTOFMEMORY) { + if (add_to_queue(pm_queue_name, pm_interface, pm_penalty, pm_paused) == RES_OUTOFMEMORY) { ast_log(LOG_ERROR, "Out of Memory when loading queue member from astdb\n"); break; } @@ -1848,6 +1947,90 @@ static void reload_queue_members(void) } } +static int pqm_exec(struct ast_channel *chan, void *data) +{ + struct localuser *u; + char *queuename, *interface; + + if (!data) { + ast_log(LOG_WARNING, "PauseQueueMember requires an argument ([queuename]|interface])\n"); + return -1; + } + + queuename = ast_strdupa((char *)data); + if (!queuename) { + ast_log(LOG_ERROR, "Out of memory\n"); + return -1; + } + + interface = strchr(queuename, '|'); + if (!interface) { + ast_log(LOG_WARNING, "Missing interface argument to PauseQueueMember ([queuename]|interface])\n"); + return -1; + } + + LOCAL_USER_ADD(u); + + *interface = '\0'; + interface++; + + if (set_member_paused(queuename, interface, 1)) { + ast_log(LOG_WARNING, "Attempt to pause interface %s, not found\n", interface); + if (ast_exists_extension(chan, chan->context, chan->exten, chan->priority + 101, chan->cid.cid_num)) { + chan->priority += 100; + LOCAL_USER_REMOVE(u); + return 0; + } + return -1; + } + + LOCAL_USER_REMOVE(u); + + return 0; +} + +static int upqm_exec(struct ast_channel *chan, void *data) +{ + struct localuser *u; + char *queuename, *interface; + + if (!data) { + ast_log(LOG_WARNING, "UnpauseQueueMember requires an argument ([queuename]|interface])\n"); + return -1; + } + + queuename = ast_strdupa((char *)data); + if (!queuename) { + ast_log(LOG_ERROR, "Out of memory\n"); + return -1; + } + + interface = strchr(queuename, '|'); + if (!interface) { + ast_log(LOG_WARNING, "Missing interface argument to PauseQueueMember ([queuename]|interface])\n"); + return -1; + } + + LOCAL_USER_ADD(u); + + *interface = '\0'; + interface++; + + if (set_member_paused(queuename, interface, 0)) { + ast_log(LOG_WARNING, "Attempt to unpause interface %s, not found\n", interface); + if (ast_exists_extension(chan, chan->context, chan->exten, chan->priority + 101, chan->cid.cid_num)) { + chan->priority += 100; + LOCAL_USER_REMOVE(u); + return 0; + } + return -1; + } + + LOCAL_USER_REMOVE(u); + + return 0; +} + static int rqm_exec(struct ast_channel *chan, void *data) { int res=-1; @@ -1962,7 +2145,7 @@ static int aqm_exec(struct ast_channel *chan, void *data) } } - switch (add_to_queue(queuename, interface, penalty)) { + switch (add_to_queue(queuename, interface, penalty, 0)) { case RES_OKAY: ast_log(LOG_NOTICE, "Added interface '%s' to queue '%s'\n", interface, queuename); res = 0; @@ -2502,6 +2685,8 @@ static int __queues_show(int fd, int argc, char **argv, int queue_show) max[0] = '\0'; if (mem->dynamic) strncat(max, " (dynamic)", sizeof(max) - strlen(max) - 1); + if (mem->paused) + strncat(max, " (paused)", sizeof(max) - strlen(max) - 1); if (mem->status) snprintf(max + strlen(max), sizeof(max) - strlen(max), " (%s)", status2str(mem->status, tmpbuf, sizeof(tmpbuf))); if (mem->calls) { @@ -2615,10 +2800,11 @@ static int manager_queues_status( struct mansession *s, struct message *m ) "CallsTaken: %d\r\n" "LastCall: %ld\r\n" "Status: %d\r\n" + "Paused: %d\r\n" "%s" "\r\n", q->name, mem->interface, mem->dynamic ? "dynamic" : "static", - mem->penalty, mem->calls, mem->lastcall, mem->status, idText); + mem->penalty, mem->calls, mem->lastcall, mem->status, mem->paused, idText); /* List Queue Entries */ @@ -2646,12 +2832,13 @@ static int manager_queues_status( struct mansession *s, struct message *m ) static int manager_add_queue_member(struct mansession *s, struct message *m) { - char *queuename, *interface, *penalty_s; - int penalty = 0; + char *queuename, *interface, *penalty_s, *paused_s; + int paused, penalty = 0; queuename = astman_get_header(m, "Queue"); interface = astman_get_header(m, "Interface"); penalty_s = astman_get_header(m, "Penalty"); + paused_s = astman_get_header(m, "Paused"); if (ast_strlen_zero(queuename)) { astman_send_error(s, m, "'Queue' not specified."); @@ -2669,7 +2856,12 @@ static int manager_add_queue_member(struct mansession *s, struct message *m) penalty = 0; } - switch (add_to_queue(queuename, interface, penalty)) { + if (ast_strlen_zero(paused_s)) + paused = 0; + else + paused = abs(ast_true(paused_s)); + + switch (add_to_queue(queuename, interface, penalty, paused)) { case RES_OKAY: astman_send_ack(s, m, "Added interface to queue"); break; @@ -2715,6 +2907,33 @@ static int manager_remove_queue_member(struct mansession *s, struct message *m) return 0; } +static int manager_pause_queue_member(struct mansession *s, struct message *m) +{ + char *queuename, *interface, *paused_s; + int paused; + + interface = astman_get_header(m, "Interface"); + paused_s = astman_get_header(m, "Paused"); + queuename = astman_get_header(m, "Queue"); /* Optional - if not supplied, pause the given Interface in all queues */ + + if (ast_strlen_zero(interface) || ast_strlen_zero(paused_s)) { + astman_send_error(s, m, "Need 'Interface' and 'Paused' parameters."); + return 0; + } + + paused = abs(ast_true(paused_s)); + + if (set_member_paused(queuename, interface, paused)) + astman_send_error(s, m, "Interface not found"); + else + if (paused) + astman_send_ack(s, m, "Interface paused successfully"); + else + astman_send_ack(s, m, "Interface unpaused successfully"); + + return 0; +} + static int handle_add_queue_member(int fd, int argc, char *argv[]) { char *queuename, *interface; @@ -2744,7 +2963,7 @@ static int handle_add_queue_member(int fd, int argc, char *argv[]) penalty = 0; } - switch (add_to_queue(queuename, interface, penalty)) { + switch (add_to_queue(queuename, interface, penalty, 0)) { case RES_OKAY: ast_cli(fd, "Added interface '%s' to queue '%s'\n", interface, queuename); return RESULT_SUCCESS; @@ -2909,9 +3128,12 @@ int unload_module(void) ast_manager_unregister("QueueStatus"); ast_manager_unregister("QueueAdd"); ast_manager_unregister("QueueRemove"); + ast_manager_unregister("QueuePause"); ast_devstate_del(statechange_queue, NULL); ast_unregister_application(app_aqm); ast_unregister_application(app_rqm); + ast_unregister_application(app_pqm); + ast_unregister_application(app_upqm); return ast_unregister_application(app); } @@ -2929,8 +3151,11 @@ int load_module(void) ast_manager_register( "QueueStatus", 0, manager_queues_status, "Queue Status" ); ast_manager_register( "QueueAdd", EVENT_FLAG_AGENT, manager_add_queue_member, "Add interface to queue." ); ast_manager_register( "QueueRemove", EVENT_FLAG_AGENT, manager_remove_queue_member, "Remove interface from queue." ); + ast_manager_register( "QueuePause", EVENT_FLAG_AGENT, manager_pause_queue_member, "Makes a queue member temporarily unavailable" ); ast_register_application(app_aqm, aqm_exec, app_aqm_synopsis, app_aqm_descrip) ; ast_register_application(app_rqm, rqm_exec, app_rqm_synopsis, app_rqm_descrip) ; + ast_register_application(app_pqm, pqm_exec, app_pqm_synopsis, app_pqm_descrip) ; + ast_register_application(app_upqm, upqm_exec, app_upqm_synopsis, app_upqm_descrip) ; } reload_queues(); -- cgit v1.2.3