summaryrefslogtreecommitdiff
path: root/apps
diff options
context:
space:
mode:
authorMark Michelson <mmichelson@digium.com>2013-08-22 18:52:41 +0000
committerMark Michelson <mmichelson@digium.com>2013-08-22 18:52:41 +0000
commit00baddb9065b528c6d7d503e754a915da1ac10d1 (patch)
treeefc9f254012e996542b6abec7afdac7a7964d990 /apps
parent8049bf94f70edd405b59b4a2ed5aa8119fd9d62b (diff)
Massively clean up app_queue.
This essentially makes app_queue usable again. From reviewboard: * Reporting of transfers and call completion is done by creating stasis subscriptions and listening for specific events in order to determine when the call is finished (either via a transfer or hangup). * Dial end messages have been added where they were previously missing. * Queue stats are properly being updated again once calls have finished. * AgentComplete stasis messages and AMI events are now occurring again. * Mixmonitor starting has been factored into its own function and uses the Mixmonitor API now instead of using ast_pbx_run() In addition to the changes in app_queue, there are several supplementary changes as well: * Queue logging now differentiates between attended and blind transfers. A note about this is in the CHANGES file. * Local channel optimization events now report more information. This includes which of the two local channels involved is the destination of the optimization, the channel that is replacing the destination local channel, and an identifier so that begin and end events can be matched to each other. The end events are now sent whether the optimization was successful or not and includes an indicator of whether the optimization was successful. * Changes were made to features and bridging_basic so that additional flags may be set on a bridge. This is necessary because the queue requires that its bridge only allows move-swap local channel optimizations into the bridge. (closes issue ASTERISK-21517) Reported by Matt Jordan (closes issue ASTERISK-21943) Reported by Matt Jordan Review: https://reviewboard.asterisk.org/r/2694 git-svn-id: https://origsvn.digium.com/svn/asterisk/trunk@397451 65c4cc65-6c06-0410-ace0-fbb531ad65f3
Diffstat (limited to 'apps')
-rw-r--r--apps/app_queue.c1061
1 files changed, 785 insertions, 276 deletions
diff --git a/apps/app_queue.c b/apps/app_queue.c
index 30b5ed670..bd7958767 100644
--- a/apps/app_queue.c
+++ b/apps/app_queue.c
@@ -108,6 +108,11 @@ ASTERISK_FILE_VERSION(__FILE__, "$Revision$")
#include "asterisk/stasis_channels.h"
#include "asterisk/stasis_message_router.h"
#include "asterisk/bridge_after.h"
+#include "asterisk/stasis_bridges.h"
+#include "asterisk/core_local.h"
+#include "asterisk/mixmonitor.h"
+#include "asterisk/core_unreal.h"
+#include "asterisk/bridge_basic.h"
/* Define, to debug reference counts on queues, without debugging reference counts on queue members */
/* #define REF_DEBUG_ONLY_QUEUES */
@@ -1584,10 +1589,6 @@ static void update_realtime_members(struct call_queue *q);
static struct member *interface_exists(struct call_queue *q, const char *interface);
static int set_member_paused(const char *queuename, const char *interface, const char *reason, int paused);
-#if 0 // BUGBUG
-static void queue_transfer_fixup(void *data, struct ast_channel *old_chan, struct ast_channel *new_chan);
-#endif // BUGBUG
-
static struct member *find_member_by_queuename_and_interface(const char *queuename, const char *interface);
/*! \brief sets the QUEUESTATUS channel variable */
static void set_queue_result(struct ast_channel *chan, enum queue_result res)
@@ -1734,7 +1735,9 @@ static inline struct call_queue *_queue_ref(struct call_queue *q, const char *ta
static inline struct call_queue *_queue_unref(struct call_queue *q, const char *tag, const char *file, int line, const char *filename)
{
- __ao2_ref_debug(q, -1, tag, file, line, filename);
+ if (q) {
+ __ao2_ref_debug(q, -1, tag, file, line, filename);
+ }
return NULL;
}
@@ -1753,7 +1756,9 @@ static inline struct call_queue *queue_ref(struct call_queue *q)
static inline struct call_queue *queue_unref(struct call_queue *q)
{
- ao2_ref(q, -1);
+ if (q) {
+ ao2_ref(q, -1);
+ }
return NULL;
}
#endif
@@ -1906,25 +1911,19 @@ static void queue_member_manager_event(void *data,
"%s", ast_str_buffer(event_string));
}
-static void queue_publish_multi_channel_blob(struct ast_channel *caller, struct ast_channel *agent, struct stasis_message_type *type, struct ast_json *blob)
+static void queue_publish_multi_channel_snapshot_blob(struct stasis_topic *topic,
+ struct ast_channel_snapshot *caller_snapshot,
+ struct ast_channel_snapshot *agent_snapshot,
+ struct stasis_message_type *type, struct ast_json *blob)
{
RAII_VAR(struct ast_multi_channel_blob *, payload, NULL, ao2_cleanup);
RAII_VAR(struct stasis_message *, msg, NULL, ao2_cleanup);
- struct ast_channel_snapshot *caller_snapshot;
- struct ast_channel_snapshot *agent_snapshot;
payload = ast_multi_channel_blob_create(blob);
if (!payload) {
return;
}
- caller_snapshot = ast_channel_snapshot_create(caller);
- agent_snapshot = ast_channel_snapshot_create(agent);
-
- if (!caller_snapshot || !agent_snapshot) {
- return;
- }
-
ast_multi_channel_blob_add_channel(payload, "caller", caller_snapshot);
ast_multi_channel_blob_add_channel(payload, "agent", agent_snapshot);
@@ -1933,7 +1932,24 @@ static void queue_publish_multi_channel_blob(struct ast_channel *caller, struct
return;
}
- stasis_publish(ast_channel_topic(caller), msg);
+ stasis_publish(topic, msg);
+}
+
+static void queue_publish_multi_channel_blob(struct ast_channel *caller, struct ast_channel *agent,
+ struct stasis_message_type *type, struct ast_json *blob)
+{
+ RAII_VAR(struct ast_channel_snapshot *, caller_snapshot, NULL, ao2_cleanup);
+ RAII_VAR(struct ast_channel_snapshot *, agent_snapshot, NULL, ao2_cleanup);
+
+ caller_snapshot = ast_channel_snapshot_create(caller);
+ agent_snapshot = ast_channel_snapshot_create(agent);
+
+ if (!caller_snapshot || !agent_snapshot) {
+ return;
+ }
+
+ queue_publish_multi_channel_snapshot_blob(ast_channel_topic(caller), caller_snapshot,
+ agent_snapshot, type, blob);
}
static void queue_publish_member_blob(struct stasis_message_type *type, struct ast_json *blob)
@@ -3927,7 +3943,7 @@ static int ring_entry(struct queue_ent *qe, struct callattempt *tmp, int *busies
member_call_pending_clear(tmp->member);
- /* BUGBUG: Raise a BUSY dial end message here */
+ publish_dial_end_event(qe->chan, tmp, NULL, "BUSY");
tmp->stillgoing = 0;
++*busies;
return 0;
@@ -4350,14 +4366,8 @@ static struct callattempt *wait_for_answer(struct queue_ent *qe, struct callatte
if (pos == 1 /* not found */) {
if (numlines == (numbusies + numnochan)) {
ast_debug(1, "Everyone is busy at this time\n");
- /* BUGBUG: We shouldn't have to set anything here, as each
- * individual dial attempt should have set that CDR to busy
- */
} else {
ast_debug(3, "No one is answering queue '%s' (%d numlines / %d busies / %d failed channels)\n", queue, numlines, numbusies, numnochan);
- /* BUGBUG: We shouldn't have to set anything here, as each
- * individual dial attempt should have set that CDR to busy
- */
}
*to = 0;
return NULL;
@@ -4983,7 +4993,6 @@ static int wait_our_turn(struct queue_ent *qe, int ringing, enum queue_result *r
return res;
}
-#if 0 // BUGBUG
/*!
* \brief update the queue status
* \retval Always 0
@@ -5028,7 +5037,6 @@ static int update_queue(struct call_queue *q, struct member *member, int callcom
ao2_unlock(q);
return 0;
}
-#endif // BUGBUG
/*! \brief Calculate the metric of each member in the outgoing callattempts
*
@@ -5117,15 +5125,17 @@ enum agent_complete_reason {
TRANSFER
};
-#if 0 // BUGBUG
/*! \brief Send out AMI message with member call completion status information */
-static void send_agent_complete(const struct queue_ent *qe, const char *queuename,
- const struct ast_channel *peer, const struct member *member, time_t callstart,
- char *vars, size_t vars_len, enum agent_complete_reason rsn)
+static void send_agent_complete(const char *queuename, struct ast_channel_snapshot *caller,
+ struct ast_channel_snapshot *peer, const struct member *member, time_t holdstart,
+ time_t callstart, enum agent_complete_reason rsn)
{
const char *reason = NULL; /* silence dumb compilers */
RAII_VAR(struct ast_json *, blob, NULL, ast_json_unref);
+ ast_assert(peer != NULL);
+ ast_assert(caller != NULL);
+
switch (rsn) {
case CALLER:
reason = "caller";
@@ -5142,121 +5152,656 @@ static void send_agent_complete(const struct queue_ent *qe, const char *queuenam
"Queue", queuename,
"Interface", member->interface,
"MemberName", member->membername,
- "HoldTime", (long)(callstart - qe->start)
- "TalkTime", (long)(time(NULL) - callstart)
+ "HoldTime", (long)(callstart - holdstart),
+ "TalkTime", (long)(time(NULL) - callstart),
"Reason", reason);
- queue_publish_multi_channel_blob(qe->chan, peer, queue_agent_complete_type(), blob);
+
+ queue_publish_multi_channel_snapshot_blob(ast_queue_topic(queuename), caller, peer,
+ queue_agent_complete_type(), blob);
}
-#endif // BUGBUG
-#if 0 // BUGBUG
-struct queue_transfer_ds {
- struct queue_ent *qe;
+static void queue_agent_cb(void *userdata, struct stasis_subscription *sub,
+ struct stasis_topic *topic, struct stasis_message *msg)
+{
+ struct ast_channel_blob *agent_blob;
+
+ agent_blob = stasis_message_data(msg);
+
+ if (ast_channel_agent_login_type() == stasis_message_type(msg)) {
+ ast_queue_log("NONE", agent_blob->snapshot->uniqueid,
+ ast_json_string_get(ast_json_object_get(agent_blob->blob, "agent")),
+ "AGENTLOGIN", "%s", agent_blob->snapshot->name);
+ } else if (ast_channel_agent_logoff_type() == stasis_message_type(msg)) {
+ ast_queue_log("NONE", agent_blob->snapshot->uniqueid,
+ ast_json_string_get(ast_json_object_get(agent_blob->blob, "agent")),
+ "AGENTLOGOFF", "%s|%ld", agent_blob->snapshot->name,
+ (long) ast_json_integer_get(ast_json_object_get(agent_blob->blob, "logintime")));
+ }
+}
+
+/*!
+ * \brief Structure representing relevant data during a local channel optimization
+ *
+ * The reason we care about local channel optimizations is that we want to be able
+ * to accurately report when the caller and queue member have stopped talking to
+ * each other. A local channel optimization can cause it to appear that the conversation
+ * has stopped immediately after it has begun. By tracking that the relevant channels
+ * to monitor have changed due to a local channel optimization, we can give accurate
+ * reports.
+ *
+ * Local channel optimizations for queues are restricted from their normal operation.
+ * Bridges created by queues can only be the destination of local channel optimizations,
+ * not the source. In addition, move-swap local channel optimizations are the only
+ * permitted types of local channel optimization.
+ *
+ * This data is populated when we are told that a local channel optimization begin
+ * is occurring. When we get told the optimization has ended successfully, we then
+ * apply the data here into the queue_stasis_data.
+ */
+struct local_optimization {
+ /*! The uniqueid of the channel that will be taking the place of the caller or member */
+ const char *source_chan_uniqueid;
+ /*! Indication of whether we think there is a local channel optimization in progress */
+ int in_progress;
+ /*! The identifier for this local channel optimization */
+ unsigned int id;
+};
+
+/*!
+ * \brief User data for stasis subscriptions used for queue calls.
+ *
+ * app_queue subscribes to channel and bridge events for all bridged calls.
+ * app_queue cares about the following events:
+ *
+ * \li bridge enter: To determine the unique ID of the bridge created for the call.
+ * \li blind transfer: To send an appropriate agent complete event.
+ * \li attended transfer: To send an appropriate agent complete event.
+ * \li local optimization: To update caller and member unique IDs for the call.
+ * \li hangup: To send an appropriate agent complete event.
+ *
+ * The stasis subscriptions last until we determine that the caller and the member
+ * are no longer bridged with each other.
+ */
+struct queue_stasis_data {
+ AST_DECLARE_STRING_FIELDS(
+ /*! The unique ID of the caller's channel. */
+ AST_STRING_FIELD(caller_uniqueid);
+ /*! The unique ID of the queue member's channel */
+ AST_STRING_FIELD(member_uniqueid);
+ /*! The unique ID of the bridge created by the queue */
+ AST_STRING_FIELD(bridge_uniqueid);
+ );
+ /*! The relevant queue */
+ struct call_queue *queue;
+ /*! The queue member that has answered the call */
struct member *member;
+ /*! The time at which the caller entered the queue. Start of the caller's hold time */
+ time_t holdstart;
+ /*! The time at which the member answered the call. */
time_t starttime;
+ /*! The original position of the caller when he entered the queue */
+ int caller_pos;
+ /*! Indication if the call was answered within the configured service level of the queue */
int callcompletedinsl;
+ /*! Indicates if the stasis subscriptions are shutting down */
+ int dying;
+ /*! The stasis message router for bridge events */
+ struct stasis_message_router *bridge_router;
+ /*! The stasis message router for channel events */
+ struct stasis_message_router *channel_router;
+ /*! Local channel optimization details for the caller */
+ struct local_optimization caller_optimize;
+ /*! Local channel optimization details for the member */
+ struct local_optimization member_optimize;
};
-#endif // BUGBUG
-#if 0 // BUGBUG
-static void queue_transfer_destroy(void *data)
+/*!
+ * \internal
+ * \brief Free memory for a queue_stasis_data
+ */
+static void queue_stasis_data_destructor(void *obj)
{
- struct queue_transfer_ds *qtds = data;
- ast_free(qtds);
+ struct queue_stasis_data *queue_data = obj;
+
+ /* This can only happen if refcounts for this object have got severely messed up */
+ ast_assert(queue_data->bridge_router == NULL);
+ ast_assert(queue_data->channel_router == NULL);
+
+ ao2_cleanup(queue_data->member);
+ queue_unref(queue_data->queue);
+ ast_string_field_free_memory(queue_data);
}
-#endif // BUGBUG
-#if 0 // BUGBUG
-/*! \brief a datastore used to help correctly log attended transfers of queue callers
+/*!
+ * \internal
+ * \brief End all stasis subscriptions on a queue_stasis_data
*/
-static const struct ast_datastore_info queue_transfer_info = {
- .type = "queue_transfer",
- .chan_fixup = queue_transfer_fixup,
- .destroy = queue_transfer_destroy,
-};
-#endif // BUGBUG
+static void remove_stasis_subscriptions(struct queue_stasis_data *queue_data)
+{
+ SCOPED_AO2LOCK(lock, queue_data);
+
+ queue_data->dying = 1;
+ stasis_message_router_unsubscribe(queue_data->bridge_router);
+ queue_data->bridge_router = NULL;
+ stasis_message_router_unsubscribe(queue_data->channel_router);
+ queue_data->channel_router = NULL;
+}
+
+/*!
+ * \internal
+ * \brief Allocate a queue_stasis_data and initialize its data.
+ */
+static struct queue_stasis_data *queue_stasis_data_alloc(struct queue_ent *qe,
+ struct ast_channel *peer, struct member *mem, time_t holdstart,
+ time_t starttime, int callcompletedinsl)
+{
+ struct queue_stasis_data *queue_data;
+
+ queue_data = ao2_alloc(sizeof(*queue_data), queue_stasis_data_destructor);
+ if (!queue_data) {
+ return NULL;
+ }
+
+ if (ast_string_field_init(queue_data, 64)) {
+ ao2_cleanup(queue_data);
+ return NULL;
+ }
-#if 0 // BUGBUG
-/*! \brief Log an attended transfer when a queue caller channel is masqueraded
+ ast_string_field_set(queue_data, caller_uniqueid, ast_channel_uniqueid(qe->chan));
+ ast_string_field_set(queue_data, member_uniqueid, ast_channel_uniqueid(peer));
+ queue_data->queue = queue_ref(qe->parent);
+ queue_data->starttime = starttime;
+ queue_data->holdstart = holdstart;
+ queue_data->callcompletedinsl = callcompletedinsl;
+ queue_data->caller_pos = qe->opos;
+ ao2_ref(mem, +1);
+ queue_data->member = mem;
+ return queue_data;
+}
+
+/*!
+ * \internal
+ * \brief Log an attended transfer in the queue log.
*
- * When a caller is masqueraded, we want to log a transfer. Fixup time is the closest we can come to when
- * the actual transfer occurs. This happens during the masquerade after datastores are moved from old_chan
- * to new_chan. This is why new_chan is referenced for exten, context, and datastore information.
+ * Attended transfer queue log messages vary based on the method by which the
+ * attended transfer was completed.
*
- * At the end of this, we want to remove the datastore so that this fixup function is not called on any
- * future masquerades of the caller during the current call.
+ * \param queue_data Data pertaining to the particular call in the queue.
+ * \param caller The channel snapshot for the caller channel in the queue.
+ * \param atxfer_msg The stasis attended transfer message data.
*/
-static void queue_transfer_fixup(void *data, struct ast_channel *old_chan, struct ast_channel *new_chan)
+static void log_attended_transfer(struct queue_stasis_data *queue_data, struct ast_channel_snapshot *caller,
+ struct ast_attended_transfer_message *atxfer_msg)
{
- struct queue_transfer_ds *qtds = data;
- struct queue_ent *qe = qtds->qe;
- struct member *member = qtds->member;
- time_t callstart = qtds->starttime;
- int callcompletedinsl = qtds->callcompletedinsl;
- struct ast_datastore *datastore;
+ RAII_VAR(struct ast_str *, transfer_str, ast_str_create(32), ast_free);
+
+ if (!transfer_str) {
+ ast_log(LOG_WARNING, "Unable to log attended transfer to queue log\n");
+ return;
+ }
+
+ switch (atxfer_msg->dest_type) {
+ case AST_ATTENDED_TRANSFER_DEST_BRIDGE_MERGE:
+ ast_str_set(&transfer_str, 0, "BRIDGE|%s", atxfer_msg->dest.bridge);
+ break;
+ case AST_ATTENDED_TRANSFER_DEST_APP:
+ ast_str_set(&transfer_str, 0, "APP|%s", atxfer_msg->dest.app);
+ break;
+ case AST_ATTENDED_TRANSFER_DEST_LINK:
+ ast_str_set(&transfer_str, 0, "LINK|%s|%s", atxfer_msg->dest.links[0]->name,
+ atxfer_msg->dest.links[1]->name);
+ break;
+ case AST_ATTENDED_TRANSFER_DEST_THREEWAY:
+ case AST_ATTENDED_TRANSFER_DEST_FAIL:
+ /* Threeways are headed off and should not be logged here */
+ ast_assert(0);
+ return;
+ }
+
+ ast_queue_log(queue_data->queue->name, caller->uniqueid, queue_data->member->membername, "ATTENDEDTRANSFER", "%s|%ld|%ld|%d",
+ ast_str_buffer(transfer_str),
+ (long) queue_data->starttime - queue_data->holdstart,
+ (long) time(NULL) - queue_data->starttime, queue_data->caller_pos);
+}
- ast_queue_log(qe->parent->name, ast_channel_uniqueid(qe->chan), member->membername, "TRANSFER", "%s|%s|%ld|%ld|%d",
- ast_channel_exten(new_chan), ast_channel_context(new_chan), (long) (callstart - qe->start),
- (long) (time(NULL) - callstart), qe->opos);
+/*!
+ * \internal
+ * \brief Handle a stasis bridge enter event.
+ *
+ * We track this particular event in order to learn what bridge
+ * was created for the queue call.
+ *
+ * \param userdata Data pertaining to the particular call in the queue.
+ * \param sub The stasis subscription on which the message occurred.
+ * \param topic The topic for this event.
+ * \param msg The stasis message for the bridge enter event
+ */
+static void handle_bridge_enter(void *userdata, struct stasis_subscription *sub,
+ struct stasis_topic *topic, struct stasis_message *msg)
+{
+ struct queue_stasis_data *queue_data = userdata;
+ struct ast_bridge_blob *enter_blob = stasis_message_data(msg);
+
+ if (queue_data->dying) {
+ return;
+ }
+
+ if (!ast_strlen_zero(queue_data->bridge_uniqueid)) {
+ return;
+ }
- update_queue(qe->parent, member, callcompletedinsl, (time(NULL) - callstart));
+ if (!strcmp(enter_blob->channel->uniqueid, queue_data->caller_uniqueid)) {
+ ast_string_field_set(queue_data, bridge_uniqueid,
+ enter_blob->bridge->uniqueid);
+ ast_debug(3, "Detected entry of caller channel %s into bridge %s\n",
+ enter_blob->channel->name, queue_data->bridge_uniqueid);
+ }
+}
- /* No need to lock the channels because they are already locked in ast_do_masquerade */
- if ((datastore = ast_channel_datastore_find(old_chan, &queue_transfer_info, NULL))) {
- ast_channel_datastore_remove(old_chan, datastore);
+/*!
+ * \brief Handle a blind transfer event
+ *
+ * This event is important in order to be able to log the end of the
+ * call to the queue log and to stasis.
+ *
+ * \param userdata Data pertaining to the particular call in the queue.
+ * \param sub The stasis subscription on which the message occurred.
+ * \param topic The topic for this event.
+ * \param msg The stasis message for the blind transfer event
+ */
+static void handle_blind_transfer(void *userdata, struct stasis_subscription *sub,
+ struct stasis_topic *topic, struct stasis_message *msg)
+{
+ struct queue_stasis_data *queue_data = userdata;
+ struct ast_bridge_blob *blind_blob = stasis_message_data(msg);
+ struct ast_json *result_blob;
+ struct ast_json *exten_blob;
+ struct ast_json *context_blob;
+ const char *exten;
+ const char *context;
+ RAII_VAR(struct ast_channel_snapshot *, caller_snapshot, NULL, ao2_cleanup);
+ RAII_VAR(struct ast_channel_snapshot *, member_snapshot, NULL, ao2_cleanup);
+
+ if (queue_data->dying) {
+ return;
+ }
+
+ result_blob = ast_json_object_get(blind_blob->blob, "result");
+ if (!result_blob) {
+ return;
+ }
+
+ if (ast_json_integer_get(result_blob) == AST_BRIDGE_TRANSFER_FAIL) {
+ return;
+ }
+
+ ao2_lock(queue_data);
+
+ if (ast_strlen_zero(queue_data->bridge_uniqueid) ||
+ strcmp(queue_data->bridge_uniqueid, blind_blob->bridge->uniqueid)) {
+ ao2_unlock(queue_data);
+ return;
+ }
+
+ caller_snapshot = ast_channel_snapshot_get_latest(queue_data->caller_uniqueid);
+ member_snapshot = ast_channel_snapshot_get_latest(queue_data->member_uniqueid);
+
+ ao2_unlock(queue_data);
+
+ exten_blob = ast_json_object_get(blind_blob->blob, "exten");
+ exten = exten_blob ? ast_json_string_get(exten_blob) : "<unknown>";
+ context_blob = ast_json_object_get(blind_blob->blob, "context");
+ context = context_blob ? ast_json_string_get(context_blob) : "<unknown>";
+
+ ast_debug(3, "Detected blind transfer in queue %s\n", queue_data->queue->name);
+ ast_queue_log(queue_data->queue->name, caller_snapshot->uniqueid, queue_data->member->membername,
+ "BLINDTRANSFER", "%s|%s|%ld|%ld|%d",
+ exten, context,
+ (long) queue_data->starttime - queue_data->holdstart,
+ (long) time(NULL) - queue_data->starttime, queue_data->caller_pos);
+
+ send_agent_complete(queue_data->queue->name, caller_snapshot, member_snapshot, queue_data->member,
+ queue_data->holdstart, queue_data->starttime, TRANSFER);
+ update_queue(queue_data->queue, queue_data->member, queue_data->callcompletedinsl,
+ time(NULL) - queue_data->starttime);
+ remove_stasis_subscriptions(queue_data);
+}
+
+/*!
+ * \brief Handle an attended transfer event
+ *
+ * This event is important in order to be able to log the end of the
+ * call to the queue log and to stasis.
+ *
+ * \param userdata Data pertaining to the particular call in the queue.
+ * \param sub The stasis subscription on which the message occurred.
+ * \param topic The topic for this event.
+ * \param msg The stasis message for the attended transfer event.
+ */
+static void handle_attended_transfer(void *userdata, struct stasis_subscription *sub,
+ struct stasis_topic *topic, struct stasis_message *msg)
+{
+ struct queue_stasis_data *queue_data = userdata;
+ struct ast_attended_transfer_message *atxfer_msg = stasis_message_data(msg);
+ RAII_VAR(struct ast_channel_snapshot *, caller_snapshot, NULL, ao2_cleanup);
+ RAII_VAR(struct ast_channel_snapshot *, member_snapshot, NULL, ao2_cleanup);
+
+ if (queue_data->dying) {
+ return;
+ }
+
+ if (atxfer_msg->result == AST_BRIDGE_TRANSFER_FAIL ||
+ atxfer_msg->dest_type == AST_ATTENDED_TRANSFER_DEST_THREEWAY) {
+ return;
+ }
+
+ ao2_lock(queue_data);
+
+ if (ast_strlen_zero(queue_data->bridge_uniqueid)) {
+ ao2_unlock(queue_data);
+ return;
+ }
+
+ if ((!atxfer_msg->to_transferee.bridge_snapshot || strcmp(queue_data->bridge_uniqueid,
+ atxfer_msg->to_transferee.bridge_snapshot->uniqueid)) &&
+ (!atxfer_msg->to_transfer_target.bridge_snapshot || strcmp(queue_data->bridge_uniqueid,
+ atxfer_msg->to_transfer_target.bridge_snapshot->uniqueid))) {
+ ao2_unlock(queue_data);
+ return;
+ }
+
+ caller_snapshot = ast_channel_snapshot_get_latest(queue_data->caller_uniqueid);
+ member_snapshot = ast_channel_snapshot_get_latest(queue_data->member_uniqueid);
+
+ ao2_unlock(queue_data);
+
+ ast_debug(3, "Detected attended transfer in queue %s\n", queue_data->queue->name);
+
+ log_attended_transfer(queue_data, caller_snapshot, atxfer_msg);
+
+ send_agent_complete(queue_data->queue->name, caller_snapshot, member_snapshot, queue_data->member,
+ queue_data->holdstart, queue_data->starttime, TRANSFER);
+ update_queue(queue_data->queue, queue_data->member, queue_data->callcompletedinsl,
+ time(NULL) - queue_data->starttime);
+ remove_stasis_subscriptions(queue_data);
+}
+
+/*!
+ * \internal
+ * \brief Callback for all stasis bridge events
+ *
+ * Based on the event and what bridge it is on, the task is farmed out to relevant
+ * subroutines for further processing.
+ */
+static void queue_bridge_cb(void *userdata, struct stasis_subscription *sub,
+ struct stasis_topic *topic, struct stasis_message *msg)
+{
+ if (stasis_subscription_final_message(sub, msg)) {
+ ao2_cleanup(userdata);
+ }
+}
+
+/*!
+ * \internal
+ * \brief Handler for the beginning of a local channel optimization
+ *
+ * This method gathers data relevant to the local channel optimization and stores
+ * it to be used once the local optimization completes.
+ *
+ * \param userdata Data pertaining to the particular call in the queue.
+ * \param sub The stasis subscription on which the message occurred.
+ * \param topic The topic for this event.
+ * \param msg The stasis message for the local optimization begin event
+ */
+static void handle_local_optimization_begin(void *userdata, struct stasis_subscription *sub,
+ struct stasis_topic *topic, struct stasis_message *msg)
+{
+ struct queue_stasis_data *queue_data = userdata;
+ struct ast_multi_channel_blob *optimization_blob = stasis_message_data(msg);
+ struct ast_channel_snapshot *local_one = ast_multi_channel_blob_get_channel(optimization_blob, "1");
+ struct ast_channel_snapshot *local_two = ast_multi_channel_blob_get_channel(optimization_blob, "2");
+ struct ast_channel_snapshot *source = ast_multi_channel_blob_get_channel(optimization_blob, "source");
+ struct local_optimization *optimization;
+ unsigned int id;
+ SCOPED_AO2LOCK(lock, queue_data);
+
+ if (queue_data->dying) {
+ return;
+ }
+
+ if (!strcmp(local_one->uniqueid, queue_data->member_uniqueid)) {
+ optimization = &queue_data->member_optimize;
+ } else if (!strcmp(local_two->uniqueid, queue_data->caller_uniqueid)) {
+ optimization = &queue_data->caller_optimize;
} else {
- ast_log(LOG_WARNING, "Can't find the queue_transfer datastore.\n");
+ return;
+ }
+
+ /* We only allow move-swap optimizations, so there had BETTER be a source */
+ ast_assert(source != NULL);
+
+ optimization->source_chan_uniqueid = ast_strdup(source->uniqueid);
+ if (!optimization->source_chan_uniqueid) {
+ ast_log(LOG_ERROR, "Unable to track local channel optimization for channel %s. Expect further errors\n", local_one->name);
+ return;
}
+ id = ast_json_integer_get(ast_json_object_get(ast_multi_channel_blob_get_json(optimization_blob), "id"));
+
+ optimization->id = id;
+ optimization->in_progress = 1;
}
-#endif // BUGBUG
-#if 0 // BUGBUG
-/*! \brief mechanism to tell if a queue caller was atxferred by a queue member.
+/*!
+ * \internal
+ * \brief Handler for the end of a local channel optimization
*
- * When a caller is atxferred, then the queue_transfer_info datastore
- * is removed from the channel. If it's still there after the bridge is
- * broken, then the caller was not atxferred.
+ * This method takes the data gathered during the local channel optimization begin
+ * event and applies it to the queue stasis data appropriately. This generally involves
+ * updating the caller or member unique ID with the channel that is taking the place of
+ * the previous caller or member.
*
- * \note Only call this with chan locked
+ * \param userdata Data pertaining to the particular call in the queue.
+ * \param sub The stasis subscription on which the message occurred.
+ * \param topic The topic for this event.
+ * \param msg The stasis message for the local optimization end event
*/
-static int attended_transfer_occurred(struct ast_channel *chan)
+static void handle_local_optimization_end(void *userdata, struct stasis_subscription *sub,
+ struct stasis_topic *topic, struct stasis_message *msg)
+{
+ struct queue_stasis_data *queue_data = userdata;
+ struct ast_multi_channel_blob *optimization_blob = stasis_message_data(msg);
+ struct ast_channel_snapshot *local_one = ast_multi_channel_blob_get_channel(optimization_blob, "1");
+ struct ast_channel_snapshot *local_two = ast_multi_channel_blob_get_channel(optimization_blob, "2");
+ struct local_optimization *optimization;
+ int is_caller;
+ unsigned int id;
+ SCOPED_AO2LOCK(lock, queue_data);
+
+ if (queue_data->dying) {
+ return;
+ }
+
+ if (!strcmp(local_one->uniqueid, queue_data->member_uniqueid)) {
+ optimization = &queue_data->member_optimize;
+ is_caller = 0;
+ } else if (!strcmp(local_two->uniqueid, queue_data->caller_uniqueid)) {
+ optimization = &queue_data->caller_optimize;
+ is_caller = 1;
+ } else {
+ return;
+ }
+
+ id = ast_json_integer_get(ast_json_object_get(ast_multi_channel_blob_get_json(optimization_blob), "id"));
+
+ if (!optimization->in_progress) {
+ ast_log(LOG_WARNING, "Told of a local optimization end when we had no previous begin\n");
+ return;
+ }
+
+ if (id != optimization->id) {
+ ast_log(LOG_WARNING, "Local optimization end event ID does not match begin (%u != %u)\n",
+ id, optimization->id);
+ return;
+ }
+
+ if (is_caller) {
+ ast_debug(3, "Local optimization: Changing queue caller uniqueid from %s to %s\n",
+ queue_data->caller_uniqueid, optimization->source_chan_uniqueid);
+ ast_string_field_set(queue_data, caller_uniqueid, optimization->source_chan_uniqueid);
+ } else {
+ ast_debug(3, "Local optimization: Changing queue member uniqueid from %s to %s\n",
+ queue_data->member_uniqueid, optimization->source_chan_uniqueid);
+ ast_string_field_set(queue_data, member_uniqueid, optimization->source_chan_uniqueid);
+ }
+
+ optimization->in_progress = 0;
+}
+
+/*!
+ * \internal
+ * \brief Handler for hangup stasis event
+ *
+ * This is how we determine that the caller or member has hung up and the call
+ * has ended. An appropriate queue log and stasis message are raised in this
+ * callback.
+ *
+ * \param userdata Data pertaining to the particular call in the queue.
+ * \param sub The stasis subscription on which the message occurred.
+ * \param topic The topic for this event.
+ * \param msg The stasis message for the hangup event.
+ */
+static void handle_hangup(void *userdata, struct stasis_subscription *sub,
+ struct stasis_topic *topic, struct stasis_message *msg)
{
- return ast_channel_datastore_find(chan, &queue_transfer_info, NULL) ? 0 : 1;
+ struct queue_stasis_data *queue_data = userdata;
+ struct ast_channel_blob *channel_blob = stasis_message_data(msg);
+ RAII_VAR(struct ast_channel_snapshot *, caller_snapshot, NULL, ao2_cleanup);
+ RAII_VAR(struct ast_channel_snapshot *, member_snapshot, NULL, ao2_cleanup);
+ RAII_VAR(struct ast_channel *, chan, NULL, ao2_cleanup);
+ enum agent_complete_reason reason;
+
+ if (queue_data->dying) {
+ return;
+ }
+
+ ao2_lock(queue_data);
+
+ if (!strcmp(channel_blob->snapshot->uniqueid, queue_data->caller_uniqueid)) {
+ reason = CALLER;
+ } else if (!strcmp(channel_blob->snapshot->uniqueid, queue_data->member_uniqueid)) {
+ reason = AGENT;
+ } else {
+ ao2_unlock(queue_data);
+ return;
+ }
+
+ chan = ast_channel_get_by_name(channel_blob->snapshot->name);
+ if (chan && ast_channel_has_role(chan, AST_TRANSFERER_ROLE_NAME)) {
+ /* Channel that is hanging up is doing it as part of a transfer.
+ * We'll get a transfer event later
+ */
+ ao2_unlock(queue_data);
+ return;
+ }
+
+ caller_snapshot = ast_channel_snapshot_get_latest(queue_data->caller_uniqueid);
+ member_snapshot = ast_channel_snapshot_get_latest(queue_data->member_uniqueid);
+
+ ao2_unlock(queue_data);
+
+ ast_debug(3, "Detected hangup of queue %s channel %s\n", reason == CALLER ? "caller" : "member",
+ channel_blob->snapshot->name);
+
+ ast_queue_log(queue_data->queue->name, caller_snapshot->uniqueid, queue_data->member->membername,
+ reason == CALLER ? "COMPLETECALLER" : "COMPLETEAGENT", "%ld|%ld|%d",
+ (long) (queue_data->starttime - queue_data->holdstart),
+ (long) (time(NULL) - queue_data->starttime), queue_data->caller_pos);
+
+ send_agent_complete(queue_data->queue->name, caller_snapshot, member_snapshot, queue_data->member,
+ queue_data->holdstart, queue_data->starttime, reason);
+ update_queue(queue_data->queue, queue_data->member, queue_data->callcompletedinsl,
+ time(NULL) - queue_data->starttime);
+ remove_stasis_subscriptions(queue_data);
}
-#endif // BUGBUG
-#if 0 // BUGBUG
-/*! \brief create a datastore for storing relevant info to log attended transfers in the queue_log
+/*!
+ * \internal
+ * \brief Callback for all stasis channel events
+ *
+ * Based on the event and the channels involved, the work is farmed out into
+ * subroutines for further processing.
*/
-static struct ast_datastore *setup_transfer_datastore(struct queue_ent *qe, struct member *member, time_t starttime, int callcompletedinsl)
+static void queue_channel_cb(void *userdata, struct stasis_subscription *sub,
+ struct stasis_topic *topic, struct stasis_message *msg)
{
- struct ast_datastore *ds;
- struct queue_transfer_ds *qtds = ast_calloc(1, sizeof(*qtds));
+ if (stasis_subscription_final_message(sub, msg)) {
+ ao2_cleanup(userdata);
+ }
+}
- if (!qtds) {
- ast_log(LOG_WARNING, "Memory allocation error!\n");
- return NULL;
+/*!
+ * \internal
+ * \brief Create stasis subscriptions for a particular call in the queue.
+ *
+ * These subscriptions are created once the call has been answered. The subscriptions
+ * are put in place so that call progress may be tracked. Once the call can be determined
+ * to have ended, then messages are logged to the queue log and stasis events are emitted.
+ *
+ * \param qe The queue entry representing the caller
+ * \param peer The channel that has answered the call
+ * \param mem The queue member that answered the call
+ * \param holdstart The time at which the caller entered the queue
+ * \param starttime The time at which the call was answered
+ * \param callcompletedinsl Indicates if the call was answered within the configured service level of the queue.
+ * \retval 0 Success
+ * \retval non-zero Failure
+ */
+static int setup_stasis_subs(struct queue_ent *qe, struct ast_channel *peer, struct member *mem,
+ time_t holdstart, time_t starttime, int callcompletedinsl)
+{
+ struct queue_stasis_data *queue_data = queue_stasis_data_alloc(qe, peer, mem, holdstart, starttime, callcompletedinsl);
+
+ if (!queue_data) {
+ return -1;
}
- ast_channel_lock(qe->chan);
- if (!(ds = ast_datastore_alloc(&queue_transfer_info, NULL))) {
- ast_channel_unlock(qe->chan);
- ast_free(qtds);
- ast_log(LOG_WARNING, "Unable to create transfer datastore. queue_log will not show attended transfer\n");
- return NULL;
+ queue_data->bridge_router = stasis_message_router_create(ast_bridge_topic_all());
+ if (!queue_data->bridge_router) {
+ ao2_ref(queue_data, -1);
+ return -1;
}
- qtds->qe = qe;
- /* This member is refcounted in try_calling, so no need to add it here, too */
- qtds->member = member;
- qtds->starttime = starttime;
- qtds->callcompletedinsl = callcompletedinsl;
- ds->data = qtds;
- ast_channel_datastore_add(qe->chan, ds);
- ast_channel_unlock(qe->chan);
- return ds;
+ stasis_message_router_add(queue_data->bridge_router, ast_channel_entered_bridge_type(),
+ handle_bridge_enter, queue_data);
+ stasis_message_router_add(queue_data->bridge_router, ast_blind_transfer_type(),
+ handle_blind_transfer, queue_data);
+ stasis_message_router_add(queue_data->bridge_router, ast_attended_transfer_type(),
+ handle_attended_transfer, queue_data);
+ stasis_message_router_set_default(queue_data->bridge_router,
+ queue_bridge_cb, queue_data);
+
+ queue_data->channel_router = stasis_message_router_create(ast_channel_topic_all());
+ if (!queue_data->channel_router) {
+ /* Unsubscribing from the bridge router will remove the only ref of queue_data,
+ * thus beginning the destruction process
+ */
+ stasis_message_router_unsubscribe(queue_data->bridge_router);
+ queue_data->bridge_router = NULL;
+ return -1;
+ }
+
+ ao2_ref(queue_data, +1);
+ stasis_message_router_add(queue_data->channel_router, ast_local_optimization_begin_type(),
+ handle_local_optimization_begin, queue_data);
+ stasis_message_router_add(queue_data->channel_router, ast_local_optimization_end_type(),
+ handle_local_optimization_end, queue_data);
+ stasis_message_router_add(queue_data->channel_router, ast_channel_hangup_request_type(),
+ handle_hangup, queue_data);
+ stasis_message_router_set_default(queue_data->channel_router,
+ queue_channel_cb, queue_data);
+
+ return 0;
}
-#endif // BUGBUG
struct queue_end_bridge {
struct call_queue *q;
@@ -5312,6 +5857,82 @@ static void setup_peer_after_bridge_goto(struct ast_channel *chan, struct ast_ch
}
}
+static void escape_and_substitute(struct ast_channel *chan, const char *input,
+ char *output, size_t size)
+{
+ const char *m = input;
+ char escaped[size];
+ char *p;
+
+ for (p = escaped; p < escaped + size - 1; p++, m++) {
+ switch (*m) {
+ case '^':
+ if (*(m + 1) == '{') {
+ *p = '$';
+ }
+ break;
+ case ',':
+ *p++ = '\\';
+ /* Fall through */
+ default:
+ *p = *m;
+ }
+ if (*m == '\0')
+ break;
+ }
+
+ if (p == escaped + size) {
+ escaped[size - 1] = '\0';
+ }
+
+ pbx_substitute_variables_helper(chan, escaped, output, size - 1);
+}
+
+static void setup_mixmonitor(struct queue_ent *qe, const char *filename)
+{
+ char escaped_filename[256];
+ char file_with_ext[256];
+ char mixmonargs[1512];
+ char escaped_monitor_exec[1024];
+ const char *monitor_options;
+ const char *monitor_exec;
+
+ if (filename) {
+ escape_and_substitute(qe->chan, filename, escaped_filename, sizeof(escaped_filename));
+ } else {
+ ast_copy_string(escaped_filename, ast_channel_uniqueid(qe->chan), sizeof(escaped_filename));
+ }
+
+ ast_channel_lock(qe->chan);
+ if ((monitor_exec = pbx_builtin_getvar_helper(qe->chan, "MONITOR_EXEC"))) {
+ monitor_exec = ast_strdupa(monitor_exec);
+ }
+ if ((monitor_options = pbx_builtin_getvar_helper(qe->chan, "MONITOR_OPTIONS"))) {
+ monitor_options = ast_strdupa(monitor_options);
+ } else {
+ monitor_options = "";
+ }
+ ast_channel_unlock(qe->chan);
+
+ if (monitor_exec) {
+ escape_and_substitute(qe->chan, monitor_exec, escaped_monitor_exec, sizeof(escaped_monitor_exec));
+ }
+
+ snprintf(file_with_ext, sizeof(file_with_ext), "%s.%s", escaped_filename, qe->parent->monfmt);
+
+ if (!ast_strlen_zero(escaped_monitor_exec)) {
+ snprintf(mixmonargs, sizeof(mixmonargs), "b%s,%s", monitor_options, escaped_monitor_exec);
+ } else {
+ snprintf(mixmonargs, sizeof(mixmonargs), "b%s", monitor_options);
+ }
+
+ ast_debug(1, "Arguments being passed to MixMonitor: %s,%s\n", file_with_ext, mixmonargs);
+
+ if (ast_start_mixmonitor(qe->chan, file_with_ext, mixmonargs)) {
+ ast_log(LOG_WARNING, "Unable to start mixmonitor. Is the MixMonitor app loaded?\n");
+ }
+}
+
/*!
* \internal
* \brief A large function which calls members, updates statistics, and bridges the caller and a member
@@ -5361,9 +5982,7 @@ static int try_calling(struct queue_ent *qe, struct ast_flags opts, char **opt_a
int x=0;
char *announce = NULL;
char digit = 0;
-#if 0 // BUGBUG
time_t callstart;
-#endif // BUGBUG
time_t now = time(NULL);
struct ast_bridge_config bridge_config;
char nondataquality = 1;
@@ -5371,23 +5990,13 @@ static int try_calling(struct queue_ent *qe, struct ast_flags opts, char **opt_a
char *macroexec = NULL;
char *gosubexec = NULL;
const char *monitorfilename;
- const char *monitor_exec;
- const char *monitor_options;
- char tmpid[256], tmpid2[256];
- char meid[1024], meid2[1024];
- char mixmonargs[1512];
- struct ast_app *mixmonapp = NULL;
- char *p;
+ char tmpid[256];
+ char meid[1024];
int forwardsallowed = 1;
int block_connected_line = 0;
-#if 0 // BUGBUG
int callcompletedinsl;
-#endif // BUGBUG
struct ao2_iterator memi;
struct ast_datastore *datastore;
-#if 0 // BUGBUG
- struct ast_datastore *transfer_ds;
-#endif // BUGBUG
struct queue_end_bridge *queue_end_bridge = NULL;
ast_channel_lock(qe->chan);
@@ -5654,9 +6263,7 @@ static int try_calling(struct queue_ent *qe, struct ast_flags opts, char **opt_a
time(&now);
recalc_holdtime(qe, (now - qe->start));
ao2_lock(qe->parent);
-#if 0 // BUGBUG
callcompletedinsl = ((now - qe->start) <= qe->parent->servicelevel);
-#endif // BUGBUG
ao2_unlock(qe->parent);
member = lpeer->member;
/* Increment the refcount for this member, since we're going to be using it for awhile in here. */
@@ -5810,96 +6417,7 @@ static int try_calling(struct queue_ent *qe, struct ast_flags opts, char **opt_a
ast_monitor_setjoinfiles(which, 1);
}
} else {
- mixmonapp = pbx_findapp("MixMonitor");
-
- if (mixmonapp) {
- ast_debug(1, "Starting MixMonitor as requested.\n");
- if (!monitorfilename) {
- if (qe->chan) {
- ast_copy_string(tmpid, ast_channel_uniqueid(qe->chan), sizeof(tmpid));
- } else {
- snprintf(tmpid, sizeof(tmpid), "chan-%lx", ast_random());
- }
- } else {
- const char *m = monitorfilename;
- for (p = tmpid2; p < tmpid2 + sizeof(tmpid2) - 1; p++, m++) {
- switch (*m) {
- case '^':
- if (*(m + 1) == '{')
- *p = '$';
- break;
- case ',':
- *p++ = '\\';
- /* Fall through */
- default:
- *p = *m;
- }
- if (*m == '\0')
- break;
- }
- if (p == tmpid2 + sizeof(tmpid2))
- tmpid2[sizeof(tmpid2) - 1] = '\0';
-
- pbx_substitute_variables_helper(qe->chan, tmpid2, tmpid, sizeof(tmpid) - 1);
- }
-
- ast_channel_lock(qe->chan);
- if ((monitor_exec = pbx_builtin_getvar_helper(qe->chan, "MONITOR_EXEC"))) {
- monitor_exec = ast_strdupa(monitor_exec);
- }
- if ((monitor_options = pbx_builtin_getvar_helper(qe->chan, "MONITOR_OPTIONS"))) {
- monitor_options = ast_strdupa(monitor_options);
- } else {
- monitor_options = "";
- }
- ast_channel_unlock(qe->chan);
-
- if (monitor_exec) {
- const char *m = monitor_exec;
- for (p = meid2; p < meid2 + sizeof(meid2) - 1; p++, m++) {
- switch (*m) {
- case '^':
- if (*(m + 1) == '{')
- *p = '$';
- break;
- case ',':
- *p++ = '\\';
- /* Fall through */
- default:
- *p = *m;
- }
- if (*m == '\0') {
- break;
- }
- }
- if (p == meid2 + sizeof(meid2)) {
- meid2[sizeof(meid2) - 1] = '\0';
- }
-
- pbx_substitute_variables_helper(qe->chan, meid2, meid, sizeof(meid) - 1);
- }
-
- snprintf(tmpid2, sizeof(tmpid2), "%s.%s", tmpid, qe->parent->monfmt);
-
- if (!ast_strlen_zero(monitor_exec)) {
- snprintf(mixmonargs, sizeof(mixmonargs), "%s,b%s,%s", tmpid2, monitor_options, monitor_exec);
- } else {
- snprintf(mixmonargs, sizeof(mixmonargs), "%s,b%s", tmpid2, monitor_options);
- }
-
- ast_debug(1, "Arguments being passed to MixMonitor: %s\n", mixmonargs);
- /* BUGBUG
- * This needs to be done differently. We need to start a MixMonitor on
- * the actual queue bridge itself, not drop some channel out and pull it
- * back. Once the media channel work is done, start a media channel on
- * the bridge.
- *
- * Alternatively, don't use pbx_exec to put an audio hook on a channel.
- */
- pbx_exec(qe->chan, mixmonapp, mixmonargs);
- } else {
- ast_log(LOG_WARNING, "Asked to run MixMonitor on this call, but cannot find the MixMonitor app!\n");
- }
+ setup_mixmonitor(qe, monitorfilename);
}
}
/* Drop out of the queue at this point, to prepare for next caller */
@@ -6008,53 +6526,10 @@ static int try_calling(struct queue_ent *qe, struct ast_flags opts, char **opt_a
queue_t_ref(qe->parent, "For bridge_config reference");
}
-#if 0 // BUGBUG
time(&callstart);
- transfer_ds = setup_transfer_datastore(qe, member, callstart, callcompletedinsl);
-#endif // BUGBUG
- bridge = ast_bridge_call(qe->chan, peer, &bridge_config);
-
-/* BUGBUG need to do this queue logging a different way because we cannot reference peer anymore. Likely needs to be made a subscriber of stasis transfer events. */
-#if 0 // BUGBUG
- /* If the queue member did an attended transfer, then the TRANSFER already was logged in the queue_log
- * when the masquerade occurred. These other "ending" queue_log messages are unnecessary, except for
- * the AgentComplete manager event
- */
- ast_channel_lock(qe->chan);
- if (!attended_transfer_occurred(qe->chan)) {
- struct ast_datastore *tds;
-
- /* detect a blind transfer */
- if (!(ast_channel_softhangup_internal_flag(qe->chan) | ast_channel_softhangup_internal_flag(peer)) && (strcasecmp(oldcontext, ast_channel_context(qe->chan)) || strcasecmp(oldexten, ast_channel_exten(qe->chan)))) {
- ast_queue_log(queuename, ast_channel_uniqueid(qe->chan), member->membername, "TRANSFER", "%s|%s|%ld|%ld|%d",
- ast_channel_exten(qe->chan), ast_channel_context(qe->chan), (long) (callstart - qe->start),
- (long) (time(NULL) - callstart), qe->opos);
- send_agent_complete(qe, queuename, peer, member, callstart, vars, sizeof(vars), TRANSFER);
- } else if (ast_check_hangup(qe->chan) && !ast_check_hangup(peer)) {
- ast_queue_log(queuename, ast_channel_uniqueid(qe->chan), member->membername, "COMPLETECALLER", "%ld|%ld|%d",
- (long) (callstart - qe->start), (long) (time(NULL) - callstart), qe->opos);
- send_agent_complete(qe, queuename, peer, member, callstart, vars, sizeof(vars), CALLER);
- } else {
- ast_queue_log(queuename, ast_channel_uniqueid(qe->chan), member->membername, "COMPLETEAGENT", "%ld|%ld|%d",
- (long) (callstart - qe->start), (long) (time(NULL) - callstart), qe->opos);
- send_agent_complete(qe, queuename, peer, member, callstart, vars, sizeof(vars), AGENT);
- }
- if ((tds = ast_channel_datastore_find(qe->chan, &queue_transfer_info, NULL))) {
- ast_channel_datastore_remove(qe->chan, tds);
- }
- ast_channel_unlock(qe->chan);
- update_queue(qe->parent, member, callcompletedinsl, (time(NULL) - callstart));
- } else {
- ast_channel_unlock(qe->chan);
-
- /* We already logged the TRANSFER on the queue_log, but we still need to send the AgentComplete event */
- send_agent_complete(qe, queuename, peer, member, callstart, vars, sizeof(vars), TRANSFER);
- }
-
- if (transfer_ds) {
- ast_datastore_free(transfer_ds);
- }
-#endif // BUGBUG
+ setup_stasis_subs(qe, peer, member, qe->start, callstart, callcompletedinsl);
+ bridge = ast_bridge_call_with_flags(qe->chan, peer, &bridge_config,
+ AST_BRIDGE_FLAG_MERGE_INHIBIT_FROM | AST_BRIDGE_FLAG_MERGE_INHIBIT_TO | AST_BRIDGE_FLAG_SWAP_INHIBIT_FROM);
res = bridge ? bridge : 1;
ao2_ref(member, -1);
@@ -9835,6 +10310,9 @@ static const struct ast_data_entry queue_data_providers[] = {
AST_DATA_ENTRY("asterisk/application/queue/list", &queues_data_provider),
};
+static struct stasis_message_router *agent_router;
+static struct stasis_subscription *topic_forwarder;
+
static int unload_module(void)
{
int res;
@@ -9860,6 +10338,8 @@ static int unload_module(void)
stasis_message_router_remove(message_router, queue_agent_dump_type());
stasis_message_router_remove(message_router, queue_agent_ringnoanswer_type());
}
+ stasis_message_router_unsubscribe_and_join(agent_router);
+ topic_forwarder = stasis_unsubscribe(topic_forwarder);
STASIS_MESSAGE_TYPE_CLEANUP(queue_caller_join_type);
STASIS_MESSAGE_TYPE_CLEANUP(queue_caller_leave_type);
@@ -9937,7 +10417,9 @@ static int load_module(void)
int res;
struct ast_flags mask = {AST_FLAGS_ALL, };
struct ast_config *member_config;
- struct stasis_message_router *message_router;
+ struct stasis_message_router *manager_router;
+ struct stasis_topic *queue_topic;
+ struct stasis_topic *manager_topic;
queues = ao2_container_alloc(MAX_QUEUE_BUCKETS, queue_hash_cb, queue_cmp_cb);
@@ -10008,8 +10490,25 @@ static int load_module(void)
res = -1;
}
- message_router = ast_manager_get_message_router();
- if (!message_router) {
+ manager_topic = ast_manager_get_topic();
+ if (!manager_topic) {
+ return AST_MODULE_LOAD_DECLINE;
+ }
+ manager_router = ast_manager_get_message_router();
+ if (!manager_router) {
+ return AST_MODULE_LOAD_DECLINE;
+ }
+ queue_topic = ast_queue_topic_all();
+ if (!queue_topic) {
+ return AST_MODULE_LOAD_DECLINE;
+ }
+ topic_forwarder = stasis_forward_all(queue_topic, manager_topic);
+ if (!topic_forwarder) {
+ return AST_MODULE_LOAD_DECLINE;
+ }
+
+ agent_router = stasis_message_router_create(ast_channel_topic_all());
+ if (!agent_router) {
return AST_MODULE_LOAD_DECLINE;
}
@@ -10030,76 +10529,86 @@ static int load_module(void)
STASIS_MESSAGE_TYPE_INIT(queue_agent_dump_type);
STASIS_MESSAGE_TYPE_INIT(queue_agent_ringnoanswer_type);
- stasis_message_router_add(message_router,
+ stasis_message_router_add(manager_router,
queue_caller_join_type(),
queue_channel_manager_event,
"QueueCallerJoin");
- stasis_message_router_add(message_router,
+ stasis_message_router_add(manager_router,
queue_caller_leave_type(),
queue_channel_manager_event,
"QueueCallerLeave");
- stasis_message_router_add(message_router,
+ stasis_message_router_add(manager_router,
queue_caller_abandon_type(),
queue_channel_manager_event,
"QueueCallerAbandon");
- stasis_message_router_add(message_router,
+ stasis_message_router_add(manager_router,
queue_member_status_type(),
queue_member_manager_event,
"QueueMemberStatus");
- stasis_message_router_add(message_router,
+ stasis_message_router_add(manager_router,
queue_member_added_type(),
queue_member_manager_event,
"QueueMemberAdded");
- stasis_message_router_add(message_router,
+ stasis_message_router_add(manager_router,
queue_member_removed_type(),
queue_member_manager_event,
"QueueMemberRemoved");
- stasis_message_router_add(message_router,
+ stasis_message_router_add(manager_router,
queue_member_pause_type(),
queue_member_manager_event,
"QueueMemberPause");
- stasis_message_router_add(message_router,
+ stasis_message_router_add(manager_router,
queue_member_penalty_type(),
queue_member_manager_event,
"QueueMemberPenalty");
- stasis_message_router_add(message_router,
+ stasis_message_router_add(manager_router,
queue_member_ringinuse_type(),
queue_member_manager_event,
"QueueMemberRinginuse");
- stasis_message_router_add(message_router,
+ stasis_message_router_add(manager_router,
queue_agent_called_type(),
queue_multi_channel_manager_event,
"AgentCalled");
- stasis_message_router_add(message_router,
+ stasis_message_router_add(manager_router,
queue_agent_connect_type(),
queue_multi_channel_manager_event,
"AgentConnect");
- stasis_message_router_add(message_router,
+ stasis_message_router_add(manager_router,
queue_agent_complete_type(),
queue_multi_channel_manager_event,
"AgentComplete");
- stasis_message_router_add(message_router,
+ stasis_message_router_add(manager_router,
queue_agent_dump_type(),
queue_multi_channel_manager_event,
"AgentDump");
- stasis_message_router_add(message_router,
+ stasis_message_router_add(manager_router,
queue_agent_ringnoanswer_type(),
queue_multi_channel_manager_event,
"AgentRingNoAnswer");
+ stasis_message_router_add(agent_router,
+ ast_channel_agent_login_type(),
+ queue_agent_cb,
+ NULL);
+
+ stasis_message_router_add(agent_router,
+ ast_channel_agent_logoff_type(),
+ queue_agent_cb,
+ NULL);
+
ast_extension_state_add(NULL, NULL, extension_state_cb, NULL);
return res ? AST_MODULE_LOAD_DECLINE : 0;