summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--apps/app_queue.c149
-rw-r--r--configs/samples/sip.conf.sample1
-rw-r--r--funcs/func_odbc.c314
-rw-r--r--include/asterisk/bridge_channel_internal.h38
-rw-r--r--main/bridge.c211
-rw-r--r--main/bridge_channel.c26
-rw-r--r--res/res_agi.c40
-rw-r--r--res/stasis/control.c12
-rw-r--r--res/stasis/control.h18
-rw-r--r--res/stasis/stasis_bridge.c54
10 files changed, 627 insertions, 236 deletions
diff --git a/apps/app_queue.c b/apps/app_queue.c
index 40aed2fe5..927c4967e 100644
--- a/apps/app_queue.c
+++ b/apps/app_queue.c
@@ -1554,7 +1554,6 @@ struct member {
struct call_queue *lastqueue; /*!< Last queue we received a call */
unsigned int dead:1; /*!< Used to detect members deleted in realtime */
unsigned int delme:1; /*!< Flag to delete entry on reload */
- unsigned int call_pending:1; /*!< TRUE if the Q is attempting to place a call to the member. */
char rt_uniqueid[80]; /*!< Unique id of realtime member entry */
unsigned int ringinuse:1; /*!< Flag to ring queue members even if their status is 'inuse' */
};
@@ -2289,6 +2288,70 @@ static int get_member_status(struct call_queue *q, int max_penalty, int min_pena
return -1;
}
+/*
+ * A "pool" of member objects that calls are currently pending on. If an
+ * agent is a member of multiple queues it's possible for that agent to be
+ * called by each of the queues at the same time. This happens because device
+ * state is slow to notify the queue app of one of it's member's being rung.
+ * This "pool" allows us to track which members are currently being rung while
+ * we wait on the device state change.
+ */
+static struct ao2_container *pending_members;
+#define MAX_CALL_ATTEMPT_BUCKETS 353
+
+static int pending_members_hash(const void *obj, const int flags)
+{
+ const struct member *object;
+ const char *key;
+
+ switch (flags & OBJ_SEARCH_MASK) {
+ case OBJ_SEARCH_KEY:
+ key = obj;
+ break;
+ case OBJ_SEARCH_OBJECT:
+ object = obj;
+ key = object->interface;
+ break;
+ default:
+ ast_assert(0);
+ return 0;
+ }
+ return ast_str_case_hash(key);
+}
+
+static int pending_members_cmp(void *obj, void *arg, int flags)
+{
+ const struct member *object_left = obj;
+ const struct member *object_right = arg;
+ const char *right_key = arg;
+ int cmp;
+
+ switch (flags & OBJ_SEARCH_MASK) {
+ case OBJ_SEARCH_OBJECT:
+ right_key = object_right->interface;
+ /* Fall through */
+ case OBJ_SEARCH_KEY:
+ cmp = strcasecmp(object_left->interface, right_key);
+ break;
+ case OBJ_SEARCH_PARTIAL_KEY:
+ /* Not supported by container. */
+ ast_assert(0);
+ return 0;
+ default:
+ cmp = 0;
+ break;
+ }
+ if (cmp) {
+ return 0;
+ }
+ return CMP_MATCH;
+}
+
+static void pending_members_remove(struct member *mem)
+{
+ ao2_find(pending_members, mem, OBJ_POINTER | OBJ_NODATA | OBJ_UNLINK);
+}
+
/*! \brief set a member's status based on device state of that member's state_interface.
*
* Lock interface list find sc, iterate through each queues queue_member list for member to
@@ -2298,6 +2361,9 @@ static void update_status(struct call_queue *q, struct member *m, const int stat
{
m->status = status;
+ /* Whatever the status is clear the member from the pending members pool */
+ pending_members_remove(m);
+
queue_publish_member_blob(queue_member_status_type(), queue_member_blob_create(q, m));
}
@@ -3157,6 +3223,7 @@ static void member_add_to_queue(struct call_queue *queue, struct member *mem)
*/
static void member_remove_from_queue(struct call_queue *queue, struct member *mem)
{
+ pending_members_remove(mem);
ao2_lock(queue->members);
ast_devstate_changed(QUEUE_UNKNOWN_PAUSED_DEVSTATE, AST_DEVSTATE_CACHABLE, "Queue:%s_pause_%s", queue->name, mem->interface);
queue_member_follower_removal(queue, mem);
@@ -4135,41 +4202,6 @@ static int member_status_available(int status)
/*!
* \internal
- * \brief Clear the member call pending flag.
- *
- * \param mem Queue member.
- *
- * \return Nothing
- */
-static void member_call_pending_clear(struct member *mem)
-{
- ao2_lock(mem);
- mem->call_pending = 0;
- ao2_unlock(mem);
-}
-
-/*!
- * \internal
- * \brief Set the member call pending flag.
- *
- * \param mem Queue member.
- *
- * \retval non-zero if call pending flag was already set.
- */
-static int member_call_pending_set(struct member *mem)
-{
- int old_pending;
-
- ao2_lock(mem);
- old_pending = mem->call_pending;
- mem->call_pending = 1;
- ao2_unlock(mem);
-
- return old_pending;
-}
-
-/*!
- * \internal
* \brief Determine if can ring a queue entry.
*
* \param qe Queue entry to check.
@@ -4210,13 +4242,32 @@ static int can_ring_entry(struct queue_ent *qe, struct callattempt *call)
}
if (!call->member->ringinuse) {
- if (member_call_pending_set(call->member)) {
- ast_debug(1, "%s has another call pending, can't receive call\n",
- call->interface);
+ struct member *mem;
+
+ ao2_lock(pending_members);
+
+ mem = ao2_find(pending_members, call->member,
+ OBJ_SEARCH_OBJECT | OBJ_NOLOCK);
+ if (mem) {
+ /*
+ * If found that means this member is currently being attempted
+ * from another calling thread, so stop trying from this thread
+ */
+ ast_debug(1, "%s has another call trying, can't receive call\n",
+ call->interface);
+ ao2_ref(mem, -1);
+ ao2_unlock(pending_members);
return 0;
}
/*
+ * If not found add it to the container so another queue
+ * won't attempt to call this member at the same time.
+ */
+ ao2_link(pending_members, call->member);
+ ao2_unlock(pending_members);
+
+ /*
* The queue member is available. Get current status to be sure
* because the device state and extension state callbacks may
* not have updated the status yet.
@@ -4224,7 +4275,7 @@ static int can_ring_entry(struct queue_ent *qe, struct callattempt *call)
if (!member_status_available(get_queue_member_status(call->member))) {
ast_debug(1, "%s actually not available, can't receive call\n",
call->interface);
- member_call_pending_clear(call->member);
+ pending_members_remove(call->member);
return 0;
}
}
@@ -4261,7 +4312,6 @@ static int ring_entry(struct queue_ent *qe, struct callattempt *tmp, int *busies
++*busies;
return 0;
}
- ast_assert(tmp->member->ringinuse || tmp->member->call_pending);
ast_copy_string(tech, tmp->interface, sizeof(tech));
if ((location = strchr(tech, '/'))) {
@@ -4278,7 +4328,7 @@ static int ring_entry(struct queue_ent *qe, struct callattempt *tmp, int *busies
qe->linpos++;
ao2_unlock(qe->parent);
- member_call_pending_clear(tmp->member);
+ pending_members_remove(tmp->member);
publish_dial_end_event(qe->chan, tmp, NULL, "BUSY");
tmp->stillgoing = 0;
@@ -4349,7 +4399,7 @@ static int ring_entry(struct queue_ent *qe, struct callattempt *tmp, int *busies
/* Again, keep going even if there's an error */
ast_verb(3, "Couldn't call %s\n", tmp->interface);
do_hang(tmp);
- member_call_pending_clear(tmp->member);
+ pending_members_remove(tmp->member);
++*busies;
return 0;
}
@@ -4369,7 +4419,6 @@ static int ring_entry(struct queue_ent *qe, struct callattempt *tmp, int *busies
ast_verb(3, "Called %s\n", tmp->interface);
- member_call_pending_clear(tmp->member);
return 1;
}
@@ -9599,7 +9648,7 @@ static int manager_queues_summary(struct mansession *s, const struct message *m)
ao2_lock(q);
/* List queue properties */
- if (ast_strlen_zero(queuefilter) || !strcmp(q->name, queuefilter)) {
+ if (ast_strlen_zero(queuefilter) || !strcasecmp(q->name, queuefilter)) {
/* Reset the necessary local variables if no queuefilter is set*/
qmemcount = 0;
qmemavail = 0;
@@ -9677,7 +9726,7 @@ static int manager_queues_status(struct mansession *s, const struct message *m)
ao2_lock(q);
/* List queue properties */
- if (ast_strlen_zero(queuefilter) || !strcmp(q->name, queuefilter)) {
+ if (ast_strlen_zero(queuefilter) || !strcasecmp(q->name, queuefilter)) {
sl = ((q->callscompleted > 0) ? 100 * ((float)q->callscompletedinsl / (float)q->callscompleted) : 0);
astman_append(s, "Event: QueueParams\r\n"
"Queue: %s\r\n"
@@ -10934,6 +10983,7 @@ static int unload_module(void)
ast_extension_state_del(0, extension_state_cb);
ast_unload_realtime("queue_members");
+ ao2_cleanup(pending_members);
ao2_cleanup(queues);
queues = NULL;
return 0;
@@ -10962,6 +11012,13 @@ static int load_module(void)
return AST_MODULE_LOAD_DECLINE;
}
+ pending_members = ao2_container_alloc(
+ MAX_CALL_ATTEMPT_BUCKETS, pending_members_hash, pending_members_cmp);
+ if (!pending_members) {
+ unload_module();
+ return AST_MODULE_LOAD_DECLINE;
+ }
+
use_weight = 0;
if (reload_handler(0, &mask, NULL)) {
diff --git a/configs/samples/sip.conf.sample b/configs/samples/sip.conf.sample
index d89a2a157..8f28e2680 100644
--- a/configs/samples/sip.conf.sample
+++ b/configs/samples/sip.conf.sample
@@ -1509,7 +1509,6 @@ srvlookup=yes ; Enable DNS SRV lookups on outbound calls
;allow=ulaw
;allow=alaw
;mailbox=1234@default,1233@default ; Subscribe to status of multiple mailboxes
-;registertrying=yes ; Send a 100 Trying when the device registers.
;[snom]
;type=friend ; Friends place calls and receive calls
diff --git a/funcs/func_odbc.c b/funcs/func_odbc.c
index 23930ed4d..ca15d703f 100644
--- a/funcs/func_odbc.c
+++ b/funcs/func_odbc.c
@@ -137,6 +137,163 @@ struct odbc_datastore {
char names[0];
};
+/* \brief Data source name
+ *
+ * This holds data that pertains to a DSN
+ */
+struct dsn {
+ /*! A connection to the database */
+ struct odbc_obj *connection;
+ /*! The name of the DSN as defined in res_odbc.conf */
+ char name[0];
+};
+
+#define DSN_BUCKETS 37
+
+struct ao2_container *dsns;
+
+static int dsn_hash(const void *obj, const int flags)
+{
+ const struct dsn *object;
+ const char *key;
+
+ switch (flags & OBJ_SEARCH_MASK) {
+ case OBJ_SEARCH_KEY:
+ key = obj;
+ break;
+ case OBJ_SEARCH_OBJECT:
+ object = obj;
+ key = object->name;
+ break;
+ default:
+ ast_assert(0);
+ return 0;
+ }
+ return ast_str_hash(key);
+}
+
+static int dsn_cmp(void *obj, void *arg, int flags)
+{
+ const struct dsn *object_left = obj;
+ const struct dsn *object_right = arg;
+ const char *right_key = arg;
+ int cmp;
+
+ switch (flags & OBJ_SEARCH_MASK) {
+ case OBJ_SEARCH_OBJECT:
+ right_key = object_right->name;
+ /* Fall through */
+ case OBJ_SEARCH_KEY:
+ cmp = strcmp(object_left->name, right_key);
+ break;
+ case OBJ_SEARCH_PARTIAL_KEY:
+ cmp = strncmp(object_left->name, right_key, strlen(right_key));
+ break;
+ default:
+ cmp = 0;
+ break;
+ }
+
+ if (cmp) {
+ return 0;
+ }
+
+ return CMP_MATCH;
+}
+
+static void dsn_destructor(void *obj)
+{
+ struct dsn *dsn = obj;
+
+ if (dsn->connection) {
+ ast_odbc_release_obj(dsn->connection);
+ }
+}
+
+/*!
+ * \brief Create a DSN and connect to the database
+ *
+ * \param name The name of the DSN as found in res_odbc.conf
+ * \retval NULL Fail
+ * \retval non-NULL The newly-created structure
+ */
+static struct dsn *create_dsn(const char *name)
+{
+ struct dsn *dsn;
+
+ dsn = ao2_alloc(sizeof(*dsn) + strlen(name) + 1, dsn_destructor);
+ if (!dsn) {
+ return NULL;
+ }
+
+ /* Safe */
+ strcpy(dsn->name, name);
+
+ dsn->connection = ast_odbc_request_obj(name, 0);
+ if (!dsn->connection) {
+ ao2_ref(dsn, -1);
+ return NULL;
+ }
+
+ if (!ao2_link_flags(dsns, dsn, OBJ_NOLOCK)) {
+ ao2_ref(dsn, -1);
+ return NULL;
+ }
+
+ return dsn;
+}
+
+/*!
+ * \brief Retrieve a DSN, or create it if it does not exist.
+ *
+ * The created DSN is returned locked. This should be inconsequential
+ * to callers in most cases.
+ *
+ * When finished with the returned structure, the caller must call
+ * \ref release_dsn
+ *
+ * \param name Name of the DSN as found in res_odbc.conf
+ * \retval NULL Unable to retrieve or create the DSN
+ * \retval non-NULL The retrieved/created locked DSN
+ */
+static struct dsn *get_dsn(const char *name)
+{
+ struct dsn *dsn;
+
+ ao2_lock(dsns);
+ dsn = ao2_find(dsns, name, OBJ_SEARCH_KEY | OBJ_NOLOCK);
+ if (!dsn) {
+ dsn = create_dsn(name);
+ }
+ ao2_unlock(dsns);
+
+ if (!dsn) {
+ return NULL;
+ }
+
+ ao2_lock(dsn->connection);
+
+ return dsn;
+}
+
+/*!
+ * \brief Unlock and unreference a DSN
+ *
+ * \param dsn The dsn to unlock and unreference
+ * \return NULL
+ */
+static void *release_dsn(struct dsn *dsn)
+{
+ if (!dsn) {
+ return NULL;
+ }
+
+ ao2_unlock(dsn->connection);
+ ao2_ref(dsn, -1);
+
+ return NULL;
+}
+
static AST_RWLIST_HEAD_STATIC(queries, acf_odbc_query);
static int resultcount = 0;
@@ -214,7 +371,7 @@ static int acf_odbc_write(struct ast_channel *chan, const char *cmd, char *s, co
struct odbc_obj *obj = NULL;
struct acf_odbc_query *query;
char *t, varname[15];
- int i, dsn, bogus_chan = 0;
+ int i, dsn_num, bogus_chan = 0;
int transactional = 0;
AST_DECLARE_APP_ARGS(values,
AST_APP_ARG(field)[100];
@@ -227,6 +384,7 @@ static int acf_odbc_write(struct ast_channel *chan, const char *cmd, char *s, co
struct ast_str *buf = ast_str_thread_get(&sql_buf, 16);
struct ast_str *insertbuf = ast_str_thread_get(&sql2_buf, 16);
const char *status = "FAILURE";
+ struct dsn *dsn = NULL;
if (!buf || !insertbuf) {
return -1;
@@ -324,17 +482,21 @@ static int acf_odbc_write(struct ast_channel *chan, const char *cmd, char *s, co
* to multiple DSNs. We MUST have a single handle all the way through the
* transaction, or else we CANNOT enforce atomicity.
*/
- for (dsn = 0; dsn < 5; dsn++) {
- if (!ast_strlen_zero(query->writehandle[dsn])) {
+ for (dsn_num = 0; dsn_num < 5; dsn_num++) {
+ if (!ast_strlen_zero(query->writehandle[dsn_num])) {
if (transactional) {
/* This can only happen second time through or greater. */
ast_log(LOG_WARNING, "Transactions do not work well with multiple DSNs for 'writehandle'\n");
}
- if ((obj = ast_odbc_retrieve_transaction_obj(chan, query->writehandle[dsn]))) {
+ if ((obj = ast_odbc_retrieve_transaction_obj(chan, query->writehandle[dsn_num]))) {
transactional = 1;
} else {
- obj = ast_odbc_request_obj(query->writehandle[dsn], 0);
+ dsn = get_dsn(query->writehandle[dsn_num]);
+ if (!dsn) {
+ continue;
+ }
+ obj = dsn->connection;
transactional = 0;
}
@@ -342,10 +504,7 @@ static int acf_odbc_write(struct ast_channel *chan, const char *cmd, char *s, co
break;
}
- if (obj && !transactional) {
- ast_odbc_release_obj(obj);
- obj = NULL;
- }
+ dsn = release_dsn(dsn);
}
}
@@ -358,25 +517,25 @@ static int acf_odbc_write(struct ast_channel *chan, const char *cmd, char *s, co
status = "SUCCESS";
} else if (query->sql_insert) {
- if (obj && !transactional) {
- ast_odbc_release_obj(obj);
- obj = NULL;
- }
+ dsn = release_dsn(dsn);
- for (transactional = 0, dsn = 0; dsn < 5; dsn++) {
- if (!ast_strlen_zero(query->writehandle[dsn])) {
+ for (transactional = 0, dsn_num = 0; dsn_num < 5; dsn_num++) {
+ if (!ast_strlen_zero(query->writehandle[dsn_num])) {
if (transactional) {
/* This can only happen second time through or greater. */
ast_log(LOG_WARNING, "Transactions do not work well with multiple DSNs for 'writehandle'\n");
} else if (obj) {
- ast_odbc_release_obj(obj);
- obj = NULL;
+ dsn = release_dsn(dsn);
}
- if ((obj = ast_odbc_retrieve_transaction_obj(chan, query->writehandle[dsn]))) {
+ if ((obj = ast_odbc_retrieve_transaction_obj(chan, query->writehandle[dsn_num]))) {
transactional = 1;
} else {
- obj = ast_odbc_request_obj(query->writehandle[dsn], 0);
+ dsn = get_dsn(query->writehandle[dsn_num]);
+ if (!dsn) {
+ continue;
+ }
+ obj = dsn->connection;
transactional = 0;
}
if (obj) {
@@ -406,10 +565,7 @@ static int acf_odbc_write(struct ast_channel *chan, const char *cmd, char *s, co
pbx_builtin_setvar_helper(chan, "ODBCSTATUS", status);
}
- if (obj && !transactional) {
- ast_odbc_release_obj(obj);
- obj = NULL;
- }
+ dsn = release_dsn(dsn);
if (!bogus_chan) {
ast_autoservice_stop(chan);
@@ -420,11 +576,10 @@ static int acf_odbc_write(struct ast_channel *chan, const char *cmd, char *s, co
static int acf_odbc_read(struct ast_channel *chan, const char *cmd, char *s, char *buf, size_t len)
{
- struct odbc_obj *obj = NULL;
struct acf_odbc_query *query;
char varname[15], rowcount[12] = "-1";
struct ast_str *colnames = ast_str_thread_get(&colnames_buf, 16);
- int res, x, y, buflen = 0, escapecommas, rowlimit = 1, multirow = 0, dsn, bogus_chan = 0;
+ int res, x, y, buflen = 0, escapecommas, rowlimit = 1, multirow = 0, dsn_num, bogus_chan = 0;
AST_DECLARE_APP_ARGS(args,
AST_APP_ARG(field)[100];
);
@@ -436,6 +591,7 @@ static int acf_odbc_read(struct ast_channel *chan, const char *cmd, char *s, cha
struct odbc_datastore_row *row = NULL;
struct ast_str *sql = ast_str_thread_get(&sql_buf, 16);
const char *status = "FAILURE";
+ struct dsn *dsn = NULL;
if (!sql || !colnames) {
if (chan) {
@@ -523,28 +679,23 @@ static int acf_odbc_read(struct ast_channel *chan, const char *cmd, char *s, cha
}
AST_RWLIST_UNLOCK(&queries);
- for (dsn = 0; dsn < 5; dsn++) {
- if (!ast_strlen_zero(query->readhandle[dsn])) {
- obj = ast_odbc_request_obj(query->readhandle[dsn], 0);
- if (obj) {
- stmt = ast_odbc_direct_execute(obj, generic_execute, ast_str_buffer(sql));
+ for (dsn_num = 0; dsn_num < 5; dsn_num++) {
+ if (!ast_strlen_zero(query->readhandle[dsn_num])) {
+ dsn = get_dsn(query->readhandle[dsn_num]);
+ if (!dsn) {
+ continue;
}
+ stmt = ast_odbc_direct_execute(dsn->connection, generic_execute, ast_str_buffer(sql));
}
if (stmt) {
break;
}
- if (obj) {
- ast_odbc_release_obj(obj);
- obj = NULL;
- }
+ dsn = release_dsn(dsn);
}
if (!stmt) {
ast_log(LOG_ERROR, "Unable to execute query [%s]\n", ast_str_buffer(sql));
- if (obj) {
- ast_odbc_release_obj(obj);
- obj = NULL;
- }
+ dsn = release_dsn(dsn);
if (!bogus_chan) {
pbx_builtin_setvar_helper(chan, "ODBCROWS", rowcount);
ast_autoservice_stop(chan);
@@ -558,8 +709,7 @@ static int acf_odbc_read(struct ast_channel *chan, const char *cmd, char *s, cha
ast_log(LOG_WARNING, "SQL Column Count error!\n[%s]\n\n", ast_str_buffer(sql));
SQLCloseCursor(stmt);
SQLFreeHandle (SQL_HANDLE_STMT, stmt);
- ast_odbc_release_obj(obj);
- obj = NULL;
+ dsn = release_dsn(dsn);
if (!bogus_chan) {
pbx_builtin_setvar_helper(chan, "ODBCROWS", rowcount);
ast_autoservice_stop(chan);
@@ -583,8 +733,7 @@ static int acf_odbc_read(struct ast_channel *chan, const char *cmd, char *s, cha
}
SQLCloseCursor(stmt);
SQLFreeHandle(SQL_HANDLE_STMT, stmt);
- ast_odbc_release_obj(obj);
- obj = NULL;
+ dsn = release_dsn(dsn);
if (!bogus_chan) {
pbx_builtin_setvar_helper(chan, "ODBCROWS", rowcount);
pbx_builtin_setvar_helper(chan, "ODBCSTATUS", status);
@@ -607,8 +756,7 @@ static int acf_odbc_read(struct ast_channel *chan, const char *cmd, char *s, cha
odbc_datastore_free(resultset);
SQLCloseCursor(stmt);
SQLFreeHandle(SQL_HANDLE_STMT, stmt);
- ast_odbc_release_obj(obj);
- obj = NULL;
+ dsn = release_dsn(dsn);
if (!bogus_chan) {
pbx_builtin_setvar_helper(chan, "ODBCSTATUS", "MEMERROR");
ast_autoservice_stop(chan);
@@ -640,8 +788,7 @@ static int acf_odbc_read(struct ast_channel *chan, const char *cmd, char *s, cha
odbc_datastore_free(resultset);
SQLCloseCursor(stmt);
SQLFreeHandle(SQL_HANDLE_STMT, stmt);
- ast_odbc_release_obj(obj);
- obj = NULL;
+ dsn = release_dsn(dsn);
if (!bogus_chan) {
pbx_builtin_setvar_helper(chan, "ODBCROWS", rowcount);
pbx_builtin_setvar_helper(chan, "ODBCSTATUS", "MEMERROR");
@@ -750,8 +897,7 @@ end_acf_read:
odbc_datastore_free(resultset);
SQLCloseCursor(stmt);
SQLFreeHandle(SQL_HANDLE_STMT, stmt);
- ast_odbc_release_obj(obj);
- obj = NULL;
+ dsn = release_dsn(dsn);
pbx_builtin_setvar_helper(chan, "ODBCSTATUS", "MEMERROR");
ast_autoservice_stop(chan);
return -1;
@@ -764,8 +910,7 @@ end_acf_read:
}
SQLCloseCursor(stmt);
SQLFreeHandle(SQL_HANDLE_STMT, stmt);
- ast_odbc_release_obj(obj);
- obj = NULL;
+ dsn = release_dsn(dsn);
if (resultset && !multirow) {
/* Fetch the first resultset */
if (!acf_fetch(chan, "", buf, buf, len)) {
@@ -1192,8 +1337,8 @@ static char *cli_odbc_read(struct ast_cli_entry *e, int cmd, struct ast_cli_args
if (a->argc == 5 && !strcmp(a->argv[4], "exec")) {
/* Execute the query */
- struct odbc_obj *obj = NULL;
- int dsn, executed = 0;
+ struct dsn *dsn = NULL;
+ int dsn_num, executed = 0;
SQLHSTMT stmt;
int rows = 0, res, x;
SQLSMALLINT colcount = 0, collength;
@@ -1207,19 +1352,18 @@ static char *cli_odbc_read(struct ast_cli_entry *e, int cmd, struct ast_cli_args
return CLI_SUCCESS;
}
- for (dsn = 0; dsn < 5; dsn++) {
- if (ast_strlen_zero(query->readhandle[dsn])) {
+ for (dsn_num = 0; dsn_num < 5; dsn_num++) {
+ if (ast_strlen_zero(query->readhandle[dsn_num])) {
continue;
}
- ast_debug(1, "Found handle %s\n", query->readhandle[dsn]);
- if (!(obj = ast_odbc_request_obj(query->readhandle[dsn], 0))) {
+ dsn = get_dsn(query->readhandle[dsn_num]);
+ if (!dsn) {
continue;
}
+ ast_debug(1, "Found handle %s\n", query->readhandle[dsn_num]);
- ast_debug(1, "Got obj\n");
- if (!(stmt = ast_odbc_direct_execute(obj, generic_execute, ast_str_buffer(sql)))) {
- ast_odbc_release_obj(obj);
- obj = NULL;
+ if (!(stmt = ast_odbc_direct_execute(dsn->connection, generic_execute, ast_str_buffer(sql)))) {
+ dsn = release_dsn(dsn);
continue;
}
@@ -1230,8 +1374,7 @@ static char *cli_odbc_read(struct ast_cli_entry *e, int cmd, struct ast_cli_args
ast_cli(a->fd, "SQL Column Count error!\n[%s]\n\n", ast_str_buffer(sql));
SQLCloseCursor(stmt);
SQLFreeHandle (SQL_HANDLE_STMT, stmt);
- ast_odbc_release_obj(obj);
- obj = NULL;
+ dsn = release_dsn(dsn);
AST_RWLIST_UNLOCK(&queries);
return CLI_SUCCESS;
}
@@ -1240,10 +1383,9 @@ static char *cli_odbc_read(struct ast_cli_entry *e, int cmd, struct ast_cli_args
if ((res != SQL_SUCCESS) && (res != SQL_SUCCESS_WITH_INFO)) {
SQLCloseCursor(stmt);
SQLFreeHandle(SQL_HANDLE_STMT, stmt);
- ast_odbc_release_obj(obj);
- obj = NULL;
+ dsn = release_dsn(dsn);
if (res == SQL_NO_DATA) {
- ast_cli(a->fd, "Returned %d rows. Query executed on handle %d:%s [%s]\n", rows, dsn, query->readhandle[dsn], ast_str_buffer(sql));
+ ast_cli(a->fd, "Returned %d rows. Query executed on handle %d:%s [%s]\n", rows, dsn_num, query->readhandle[dsn_num], ast_str_buffer(sql));
break;
} else {
ast_cli(a->fd, "Error %d in FETCH [%s]\n", res, ast_str_buffer(sql));
@@ -1270,8 +1412,7 @@ static char *cli_odbc_read(struct ast_cli_entry *e, int cmd, struct ast_cli_args
ast_cli(a->fd, "SQL Get Data error %d!\n[%s]\n\n", res, ast_str_buffer(sql));
SQLCloseCursor(stmt);
SQLFreeHandle(SQL_HANDLE_STMT, stmt);
- ast_odbc_release_obj(obj);
- obj = NULL;
+ dsn = release_dsn(dsn);
AST_RWLIST_UNLOCK(&queries);
return CLI_SUCCESS;
}
@@ -1289,15 +1430,11 @@ static char *cli_odbc_read(struct ast_cli_entry *e, int cmd, struct ast_cli_args
}
SQLCloseCursor(stmt);
SQLFreeHandle(SQL_HANDLE_STMT, stmt);
- ast_odbc_release_obj(obj);
- obj = NULL;
- ast_cli(a->fd, "Returned %d row%s. Query executed on handle %d [%s]\n", rows, rows == 1 ? "" : "s", dsn, query->readhandle[dsn]);
+ dsn = release_dsn(dsn);
+ ast_cli(a->fd, "Returned %d row%s. Query executed on handle %d [%s]\n", rows, rows == 1 ? "" : "s", dsn_num, query->readhandle[dsn_num]);
break;
}
- if (obj) {
- ast_odbc_release_obj(obj);
- obj = NULL;
- }
+ dsn = release_dsn(dsn);
if (!executed) {
ast_cli(a->fd, "Failed to execute query. [%s]\n", ast_str_buffer(sql));
@@ -1420,30 +1557,29 @@ static char *cli_odbc_write(struct ast_cli_entry *e, int cmd, struct ast_cli_arg
if (a->argc == 6 && !strcmp(a->argv[5], "exec")) {
/* Execute the query */
- struct odbc_obj *obj = NULL;
- int dsn, executed = 0;
+ struct dsn *dsn;
+ int dsn_num, executed = 0;
SQLHSTMT stmt;
SQLLEN rows = -1;
- for (dsn = 0; dsn < 5; dsn++) {
- if (ast_strlen_zero(query->writehandle[dsn])) {
+ for (dsn_num = 0; dsn_num < 5; dsn_num++) {
+ if (ast_strlen_zero(query->writehandle[dsn_num])) {
continue;
}
- if (!(obj = ast_odbc_request_obj(query->writehandle[dsn], 0))) {
+ dsn = get_dsn(query->writehandle[dsn_num]);
+ if (!dsn) {
continue;
}
- if (!(stmt = ast_odbc_direct_execute(obj, generic_execute, ast_str_buffer(sql)))) {
- ast_odbc_release_obj(obj);
- obj = NULL;
+ if (!(stmt = ast_odbc_direct_execute(dsn->connection, generic_execute, ast_str_buffer(sql)))) {
+ dsn = release_dsn(dsn);
continue;
}
SQLRowCount(stmt, &rows);
SQLCloseCursor(stmt);
SQLFreeHandle(SQL_HANDLE_STMT, stmt);
- ast_odbc_release_obj(obj);
- obj = NULL;
- ast_cli(a->fd, "Affected %d rows. Query executed on handle %d [%s]\n", (int)rows, dsn, query->writehandle[dsn]);
+ dsn = release_dsn(dsn);
+ ast_cli(a->fd, "Affected %d rows. Query executed on handle %d [%s]\n", (int)rows, dsn_num, query->writehandle[dsn_num]);
executed = 1;
break;
}
@@ -1470,6 +1606,11 @@ static int load_module(void)
char *catg;
struct ast_flags config_flags = { 0 };
+ dsns = ao2_container_alloc(DSN_BUCKETS, dsn_hash, dsn_cmp);
+ if (!dsns) {
+ return AST_MODULE_LOAD_DECLINE;
+ }
+
res |= ast_custom_function_register(&fetch_function);
res |= ast_register_application_xml(app_odbcfinish, exec_odbcfinish);
AST_RWLIST_WRLOCK(&queries);
@@ -1478,6 +1619,7 @@ static int load_module(void)
if (!cfg || cfg == CONFIG_STATUS_FILEINVALID) {
ast_log(LOG_NOTICE, "Unable to load config for func_odbc: %s\n", config);
AST_RWLIST_UNLOCK(&queries);
+ ao2_ref(dsns, -1);
return AST_MODULE_LOAD_DECLINE;
}
@@ -1531,6 +1673,8 @@ static int unload_module(void)
AST_RWLIST_WRLOCK(&queries);
AST_RWLIST_UNLOCK(&queries);
+
+ ao2_ref(dsns, -1);
return res;
}
diff --git a/include/asterisk/bridge_channel_internal.h b/include/asterisk/bridge_channel_internal.h
index 7f7d5a88b..fb8e781e8 100644
--- a/include/asterisk/bridge_channel_internal.h
+++ b/include/asterisk/bridge_channel_internal.h
@@ -151,47 +151,20 @@ int bridge_channel_internal_push_full(struct ast_bridge_channel *bridge_channel,
void bridge_channel_internal_pull(struct ast_bridge_channel *bridge_channel);
/*!
- * \brief Internal bridge channel wait condition and associated result.
- */
-struct bridge_channel_internal_cond {
- /*! Lock for the data structure */
- ast_mutex_t lock;
- /*! Wait condition */
- ast_cond_t cond;
- /*! Wait until done */
- int done;
- /*! The bridge channel */
- struct ast_bridge_channel *bridge_channel;
-};
-
-/*!
- * \internal
- * \brief Wait for the expected signal.
- * \since 13.5.0
- *
- * \param cond the wait object
- *
- * \return Nothing
- */
-void bridge_channel_internal_wait(struct bridge_channel_internal_cond *cond);
-
-/*!
- * \internal
- * \brief Signal the condition wait.
- * \since 13.5.0
+ * \brief Signal imparting threads to wake up.
+ * \since 13.9.0
*
- * \param cond the wait object
+ * \param chan Channel imparted that we need to signal.
*
* \return Nothing
*/
-void bridge_channel_internal_signal(struct bridge_channel_internal_cond *cond);
+void bridge_channel_impart_signal(struct ast_channel *chan);
/*!
* \internal
* \brief Join the bridge_channel to the bridge (blocking)
*
* \param bridge_channel The Channel in the bridge
- * \param cond data used for signaling
*
* \note The bridge_channel->swap holds a channel reference for the swap
* channel going into the bridging system. The ref ensures that the swap
@@ -206,8 +179,7 @@ void bridge_channel_internal_signal(struct bridge_channel_internal_cond *cond);
* \retval 0 bridge channel successfully joined the bridge
* \retval -1 bridge channel failed to join the bridge
*/
-int bridge_channel_internal_join(struct ast_bridge_channel *bridge_channel,
- struct bridge_channel_internal_cond *cond);
+int bridge_channel_internal_join(struct ast_bridge_channel *bridge_channel);
/*!
* \internal
diff --git a/main/bridge.c b/main/bridge.c
index a56555bc9..c28d49c02 100644
--- a/main/bridge.c
+++ b/main/bridge.c
@@ -1483,6 +1483,150 @@ void ast_bridge_notify_masquerade(struct ast_channel *chan)
ao2_ref(bridge_channel, -1);
}
+/*!
+ * \brief Internal bridge impart wait condition and associated conditional.
+ */
+struct bridge_channel_impart_cond {
+ AST_LIST_ENTRY(bridge_channel_impart_cond) node;
+ /*! Lock for the data structure */
+ ast_mutex_t lock;
+ /*! Wait condition */
+ ast_cond_t cond;
+ /*! Wait until done */
+ int done;
+};
+
+AST_LIST_HEAD_NOLOCK(bridge_channel_impart_ds_head, bridge_channel_impart_cond);
+
+/*!
+ * \internal
+ * \brief Signal imparting threads to wake up.
+ * \since 13.9.0
+ *
+ * \param ds_head List of imparting threads to wake up.
+ *
+ * \return Nothing
+ */
+static void bridge_channel_impart_ds_head_signal(struct bridge_channel_impart_ds_head *ds_head)
+{
+ if (ds_head) {
+ struct bridge_channel_impart_cond *cond;
+
+ while ((cond = AST_LIST_REMOVE_HEAD(ds_head, node))) {
+ ast_mutex_lock(&cond->lock);
+ cond->done = 1;
+ ast_cond_signal(&cond->cond);
+ ast_mutex_unlock(&cond->lock);
+ }
+ }
+}
+
+static void bridge_channel_impart_ds_head_dtor(void *doomed)
+{
+ bridge_channel_impart_ds_head_signal(doomed);
+ ast_free(doomed);
+}
+
+/*!
+ * \internal
+ * \brief Fixup the bridge impart datastore.
+ * \since 13.9.0
+ *
+ * \param data Bridge impart datastore data to fixup from old_chan.
+ * \param old_chan The datastore is moving from this channel.
+ * \param new_chan The datastore is moving to this channel.
+ *
+ * \return Nothing
+ */
+static void bridge_channel_impart_ds_head_fixup(void *data, struct ast_channel *old_chan, struct ast_channel *new_chan)
+{
+ /*
+ * Signal any waiting impart threads. The masquerade is going to kill
+ * old_chan and we don't need to be waiting on new_chan.
+ */
+ bridge_channel_impart_ds_head_signal(data);
+}
+
+static const struct ast_datastore_info bridge_channel_impart_ds_info = {
+ .type = "bridge-impart-ds",
+ .destroy = bridge_channel_impart_ds_head_dtor,
+ .chan_fixup = bridge_channel_impart_ds_head_fixup,
+};
+
+/*!
+ * \internal
+ * \brief Add impart wait datastore conditional to channel.
+ * \since 13.9.0
+ *
+ * \param chan Channel to add the impart wait conditional.
+ * \param cond Imparting conditional to add.
+ *
+ * \retval 0 on success.
+ * \retval -1 on error.
+ */
+static int bridge_channel_impart_add(struct ast_channel *chan, struct bridge_channel_impart_cond *cond)
+{
+ struct ast_datastore *datastore;
+ struct bridge_channel_impart_ds_head *ds_head;
+
+ ast_channel_lock(chan);
+
+ datastore = ast_channel_datastore_find(chan, &bridge_channel_impart_ds_info, NULL);
+ if (!datastore) {
+ datastore = ast_datastore_alloc(&bridge_channel_impart_ds_info, NULL);
+ if (!datastore) {
+ ast_channel_unlock(chan);
+ return -1;
+ }
+ ds_head = ast_calloc(1, sizeof(*ds_head));
+ if (!ds_head) {
+ ast_channel_unlock(chan);
+ ast_datastore_free(datastore);
+ return -1;
+ }
+ datastore->data = ds_head;
+ ast_channel_datastore_add(chan, datastore);
+ } else {
+ ds_head = datastore->data;
+ ast_assert(ds_head != NULL);
+ }
+
+ AST_LIST_INSERT_TAIL(ds_head, cond, node);
+
+ ast_channel_unlock(chan);
+ return 0;
+}
+
+void bridge_channel_impart_signal(struct ast_channel *chan)
+{
+ struct ast_datastore *datastore;
+
+ ast_channel_lock(chan);
+ datastore = ast_channel_datastore_find(chan, &bridge_channel_impart_ds_info, NULL);
+ if (datastore) {
+ bridge_channel_impart_ds_head_signal(datastore->data);
+ }
+ ast_channel_unlock(chan);
+}
+
+/*!
+ * \internal
+ * \brief Block imparting channel thread until signaled.
+ * \since 13.9.0
+ *
+ * \param cond Imparting conditional to wait for.
+ *
+ * \return Nothing
+ */
+static void bridge_channel_impart_wait(struct bridge_channel_impart_cond *cond)
+{
+ ast_mutex_lock(&cond->lock);
+ while (!cond->done) {
+ ast_cond_wait(&cond->cond, &cond->lock);
+ }
+ ast_mutex_unlock(&cond->lock);
+}
+
/*
* XXX ASTERISK-21271 make ast_bridge_join() require features to be allocated just like ast_bridge_impart() and not expect the struct back.
*
@@ -1551,7 +1695,7 @@ int ast_bridge_join(struct ast_bridge *bridge,
}
if (!res) {
- res = bridge_channel_internal_join(bridge_channel, NULL);
+ res = bridge_channel_internal_join(bridge_channel);
}
/* Cleanup all the data in the bridge channel after it leaves the bridge. */
@@ -1568,6 +1712,7 @@ int ast_bridge_join(struct ast_bridge *bridge,
join_exit:;
ast_bridge_run_after_callback(chan);
+ bridge_channel_impart_signal(chan);
if (!(ast_channel_softhangup_internal_flag(chan) & AST_SOFTHANGUP_ASYNCGOTO)
&& !ast_bridge_setup_after_goto(chan)) {
/* Claim the after bridge goto is an async goto destination. */
@@ -1581,14 +1726,13 @@ join_exit:;
/*! \brief Thread responsible for imparted bridged channels to be departed */
static void *bridge_channel_depart_thread(void *data)
{
- struct bridge_channel_internal_cond *cond = data;
- struct ast_bridge_channel *bridge_channel = cond->bridge_channel;
+ struct ast_bridge_channel *bridge_channel = data;
if (bridge_channel->callid) {
ast_callid_threadassoc_add(bridge_channel->callid);
}
- bridge_channel_internal_join(bridge_channel, cond);
+ bridge_channel_internal_join(bridge_channel);
/*
* cleanup
@@ -1601,6 +1745,8 @@ static void *bridge_channel_depart_thread(void *data)
bridge_channel->features = NULL;
ast_bridge_discard_after_callback(bridge_channel->chan, AST_BRIDGE_AFTER_CB_REASON_DEPART);
+ /* If join failed there will be impart threads waiting. */
+ bridge_channel_impart_signal(bridge_channel->chan);
ast_bridge_discard_after_goto(bridge_channel->chan);
return NULL;
@@ -1609,15 +1755,14 @@ static void *bridge_channel_depart_thread(void *data)
/*! \brief Thread responsible for independent imparted bridged channels */
static void *bridge_channel_ind_thread(void *data)
{
- struct bridge_channel_internal_cond *cond = data;
- struct ast_bridge_channel *bridge_channel = cond->bridge_channel;
+ struct ast_bridge_channel *bridge_channel = data;
struct ast_channel *chan;
if (bridge_channel->callid) {
ast_callid_threadassoc_add(bridge_channel->callid);
}
- bridge_channel_internal_join(bridge_channel, cond);
+ bridge_channel_internal_join(bridge_channel);
chan = bridge_channel->chan;
/* cleanup */
@@ -1634,15 +1779,18 @@ static void *bridge_channel_ind_thread(void *data)
ao2_ref(bridge_channel, -1);
ast_bridge_run_after_callback(chan);
+ /* If join failed there will be impart threads waiting. */
+ bridge_channel_impart_signal(chan);
ast_bridge_run_after_goto(chan);
return NULL;
}
-int ast_bridge_impart(struct ast_bridge *bridge,
+static int bridge_impart_internal(struct ast_bridge *bridge,
struct ast_channel *chan,
struct ast_channel *swap,
struct ast_bridge_features *features,
- enum ast_bridge_impart_flags flags)
+ enum ast_bridge_impart_flags flags,
+ struct bridge_channel_impart_cond *cond)
{
int res = 0;
struct ast_bridge_channel *bridge_channel;
@@ -1701,27 +1849,20 @@ int ast_bridge_impart(struct ast_bridge *bridge,
/* Actually create the thread that will handle the channel */
if (!res) {
- struct bridge_channel_internal_cond cond = {
- .done = 0,
- .bridge_channel = bridge_channel
- };
- ast_mutex_init(&cond.lock);
- ast_cond_init(&cond.cond, NULL);
-
+ res = bridge_channel_impart_add(chan, cond);
+ }
+ if (!res) {
if ((flags & AST_BRIDGE_IMPART_CHAN_MASK) == AST_BRIDGE_IMPART_CHAN_INDEPENDENT) {
res = ast_pthread_create_detached(&bridge_channel->thread, NULL,
- bridge_channel_ind_thread, &cond);
+ bridge_channel_ind_thread, bridge_channel);
} else {
res = ast_pthread_create(&bridge_channel->thread, NULL,
- bridge_channel_depart_thread, &cond);
+ bridge_channel_depart_thread, bridge_channel);
}
if (!res) {
- bridge_channel_internal_wait(&cond);
+ bridge_channel_impart_wait(cond);
}
-
- ast_cond_destroy(&cond.cond);
- ast_mutex_destroy(&cond.lock);
}
if (res) {
@@ -1742,6 +1883,32 @@ int ast_bridge_impart(struct ast_bridge *bridge,
return 0;
}
+int ast_bridge_impart(struct ast_bridge *bridge,
+ struct ast_channel *chan,
+ struct ast_channel *swap,
+ struct ast_bridge_features *features,
+ enum ast_bridge_impart_flags flags)
+{
+ struct bridge_channel_impart_cond cond = {
+ .done = 0,
+ };
+ int res;
+
+ ast_mutex_init(&cond.lock);
+ ast_cond_init(&cond.cond, NULL);
+
+ res = bridge_impart_internal(bridge, chan, swap, features, flags, &cond);
+ if (res) {
+ /* Impart failed. Signal any other waiting impart threads */
+ bridge_channel_impart_signal(chan);
+ }
+
+ ast_cond_destroy(&cond.cond);
+ ast_mutex_destroy(&cond.lock);
+
+ return res;
+}
+
int ast_bridge_depart(struct ast_channel *chan)
{
struct ast_bridge_channel *bridge_channel;
diff --git a/main/bridge_channel.c b/main/bridge_channel.c
index 66f26eefe..db4ecfe57 100644
--- a/main/bridge_channel.c
+++ b/main/bridge_channel.c
@@ -2637,27 +2637,7 @@ static void bridge_channel_event_join_leave(struct ast_bridge_channel *bridge_ch
ao2_iterator_destroy(&iter);
}
-void bridge_channel_internal_wait(struct bridge_channel_internal_cond *cond)
-{
- ast_mutex_lock(&cond->lock);
- while (!cond->done) {
- ast_cond_wait(&cond->cond, &cond->lock);
- }
- ast_mutex_unlock(&cond->lock);
-}
-
-void bridge_channel_internal_signal(struct bridge_channel_internal_cond *cond)
-{
- if (cond) {
- ast_mutex_lock(&cond->lock);
- cond->done = 1;
- ast_cond_signal(&cond->cond);
- ast_mutex_unlock(&cond->lock);
- }
-}
-
-int bridge_channel_internal_join(struct ast_bridge_channel *bridge_channel,
- struct bridge_channel_internal_cond *cond)
+int bridge_channel_internal_join(struct ast_bridge_channel *bridge_channel)
{
int res = 0;
struct ast_bridge_features *channel_features;
@@ -2687,7 +2667,6 @@ int bridge_channel_internal_join(struct ast_bridge_channel *bridge_channel,
bridge_channel->bridge->uniqueid,
bridge_channel,
ast_channel_name(bridge_channel->chan));
- bridge_channel_internal_signal(cond);
return -1;
}
ast_channel_internal_bridge_set(bridge_channel->chan, bridge_channel->bridge);
@@ -2722,8 +2701,6 @@ int bridge_channel_internal_join(struct ast_bridge_channel *bridge_channel,
}
bridge_reconfigured(bridge_channel->bridge, !bridge_channel->inhibit_colp);
- bridge_channel_internal_signal(cond);
-
if (bridge_channel->state == BRIDGE_CHANNEL_STATE_WAIT) {
/*
* Indicate a source change since this channel is entering the
@@ -2735,6 +2712,7 @@ int bridge_channel_internal_join(struct ast_bridge_channel *bridge_channel,
ast_indicate(bridge_channel->chan, AST_CONTROL_SRCCHANGE);
}
+ bridge_channel_impart_signal(bridge_channel->chan);
ast_bridge_unlock(bridge_channel->bridge);
/* Must release any swap ref after unlocking the bridge. */
diff --git a/res/res_agi.c b/res/res_agi.c
index f6ce74960..e8249e202 100644
--- a/res/res_agi.c
+++ b/res/res_agi.c
@@ -3736,6 +3736,24 @@ static enum agi_result agi_handle_command(struct ast_channel *chan, AGI *agi, ch
return AGI_RESULT_SUCCESS;
}
+
+AST_LIST_HEAD_NOLOCK(deferred_frames, ast_frame);
+
+static void queue_deferred_frames(struct deferred_frames *deferred_frames,
+ struct ast_channel *chan)
+{
+ struct ast_frame *f;
+
+ if (!AST_LIST_EMPTY(deferred_frames)) {
+ ast_channel_lock(chan);
+ while ((f = AST_LIST_REMOVE_HEAD(deferred_frames, frame_list))) {
+ ast_queue_frame_head(chan, f);
+ ast_frfree(f);
+ }
+ ast_channel_unlock(chan);
+ }
+}
+
static enum agi_result run_agi(struct ast_channel *chan, char *request, AGI *agi, int pid, int *status, int dead, int argc, char *argv[])
{
struct ast_channel *c;
@@ -3754,6 +3772,9 @@ static enum agi_result run_agi(struct ast_channel *chan, char *request, AGI *agi
const char *sighup_str;
const char *exit_on_hangup_str;
int exit_on_hangup;
+ struct deferred_frames deferred_frames;
+
+ AST_LIST_HEAD_INIT_NOLOCK(&deferred_frames);
ast_channel_lock(chan);
sighup_str = pbx_builtin_getvar_helper(chan, "AGISIGHUP");
@@ -3815,8 +3836,20 @@ static enum agi_result run_agi(struct ast_channel *chan, char *request, AGI *agi
/* Write, ignoring errors */
if (write(agi->audio, f->data.ptr, f->datalen) < 0) {
}
+ ast_frfree(f);
+ } else if (ast_is_deferrable_frame(f)) {
+ struct ast_frame *dup_f;
+
+ if ((dup_f = ast_frisolate(f))) {
+ AST_LIST_INSERT_HEAD(&deferred_frames, dup_f, frame_list);
+ }
+
+ if (dup_f != f) {
+ ast_frfree(f);
+ }
+ } else {
+ ast_frfree(f);
}
- ast_frfree(f);
}
} else if (outfd > -1) {
size_t len = sizeof(buf);
@@ -3864,6 +3897,8 @@ static enum agi_result run_agi(struct ast_channel *chan, char *request, AGI *agi
buf[buflen - 1] = '\0';
}
+ queue_deferred_frames(&deferred_frames, chan);
+
if (agidebug)
ast_verbose("<%s>AGI Rx << %s\n", ast_channel_name(chan), buf);
cmd_status = agi_handle_command(chan, agi, buf, dead);
@@ -3885,6 +3920,9 @@ static enum agi_result run_agi(struct ast_channel *chan, char *request, AGI *agi
}
}
}
+
+ queue_deferred_frames(&deferred_frames, chan);
+
if (agi->speech) {
ast_speech_destroy(agi->speech);
}
diff --git a/res/stasis/control.c b/res/stasis/control.c
index 3c5b75041..aa6866aee 100644
--- a/res/stasis/control.c
+++ b/res/stasis/control.c
@@ -903,11 +903,8 @@ static void bridge_after_cb_failed(enum ast_bridge_after_cb_reason reason,
ast_bridge_after_cb_reason_string(reason));
}
-int control_add_channel_to_bridge(
- struct stasis_app_control *control,
- struct ast_channel *chan, void *data)
+int control_swap_channel_in_bridge(struct stasis_app_control *control, struct ast_bridge *bridge, struct ast_channel *chan, struct ast_channel *swap)
{
- struct ast_bridge *bridge = data;
int res;
if (!control || !bridge) {
@@ -960,7 +957,7 @@ int control_add_channel_to_bridge(
res = ast_bridge_impart(bridge,
chan,
- NULL, /* swap channel */
+ swap,
NULL, /* features */
AST_BRIDGE_IMPART_CHAN_DEPARTABLE);
if (res != 0) {
@@ -976,6 +973,11 @@ int control_add_channel_to_bridge(
return 0;
}
+int control_add_channel_to_bridge(struct stasis_app_control *control, struct ast_channel *chan, void *data)
+{
+ return control_swap_channel_in_bridge(control, data, chan, NULL);
+}
+
int stasis_app_control_add_channel_to_bridge(
struct stasis_app_control *control, struct ast_bridge *bridge)
{
diff --git a/res/stasis/control.h b/res/stasis/control.h
index 1d37a494a..868a8091b 100644
--- a/res/stasis/control.h
+++ b/res/stasis/control.h
@@ -111,12 +111,20 @@ struct stasis_app *control_app(struct stasis_app_control *control);
* \brief Command callback for adding a channel to a bridge
*
* \param control The control for chan
- * \param channel The channel on which commands should be executed
- * \param bridge Data to be passed to the callback
+ * \param chan The channel on which commands should be executed
+ * \param data Bridge to be passed to the callback
+ */
+int control_add_channel_to_bridge(struct stasis_app_control *control, struct ast_channel *chan, void *data);
+
+/*!
+ * \brief Command for swapping a channel in a bridge
+ *
+ * \param control The control for chan
+ * \param chan The channel on which commands should be executed
+ * \param bridge Bridge to be passed to the callback
+ * \param swap Channel to swap with when joining the bridge
*/
-int control_add_channel_to_bridge(
- struct stasis_app_control *control,
- struct ast_channel *chan, void *obj);
+int control_swap_channel_in_bridge(struct stasis_app_control *control, struct ast_bridge *bridge, struct ast_channel *chan, struct ast_channel *swap);
/*!
* \brief Stop playing silence to a channel right now.
diff --git a/res/stasis/stasis_bridge.c b/res/stasis/stasis_bridge.c
index bfd287226..aa21ec29c 100644
--- a/res/stasis/stasis_bridge.c
+++ b/res/stasis/stasis_bridge.c
@@ -76,24 +76,54 @@ static void bridge_stasis_run_cb(struct ast_channel *chan, void *data)
pbx_exec(chan, app_stasis, app_name);
}
-static int add_channel_to_bridge(
+struct defer_bridge_add_obj {
+ /*! Bridge to join (has ref) */
+ struct ast_bridge *bridge;
+ /*!
+ * \brief Channel to swap with in the bridge. (has ref)
+ *
+ * \note NULL if not swapping with a channel.
+ */
+ struct ast_channel *swap;
+};
+
+static void defer_bridge_add_dtor(void *obj)
+{
+ struct defer_bridge_add_obj *defer = obj;
+
+ ao2_cleanup(defer->bridge);
+ ast_channel_cleanup(defer->swap);
+}
+
+static int defer_bridge_add(
struct stasis_app_control *control,
struct ast_channel *chan, void *obj)
{
- struct ast_bridge *bridge = obj;
- int res;
+ struct defer_bridge_add_obj *defer = obj;
- res = control_add_channel_to_bridge(control,
- chan, bridge);
- return res;
+ return control_swap_channel_in_bridge(control, defer->bridge, chan, defer->swap);
}
static void bridge_stasis_queue_join_action(struct ast_bridge *self,
- struct ast_bridge_channel *bridge_channel)
+ struct ast_bridge_channel *bridge_channel, struct ast_bridge_channel *swap)
{
+ struct defer_bridge_add_obj *defer;
+
+ defer = ao2_alloc_options(sizeof(*defer), defer_bridge_add_dtor,
+ AO2_ALLOC_OPT_LOCK_NOLOCK);
+ if (!defer) {
+ return;
+ }
+ ao2_ref(self, +1);
+ defer->bridge = self;
+ if (swap) {
+ ast_channel_ref(swap->chan);
+ defer->swap = swap->chan;
+ }
+
ast_channel_lock(bridge_channel->chan);
- command_prestart_queue_command(bridge_channel->chan, add_channel_to_bridge,
- ao2_bump(self), __ao2_cleanup);
+ command_prestart_queue_command(bridge_channel->chan, defer_bridge_add,
+ defer, __ao2_cleanup);
ast_channel_unlock(bridge_channel->chan);
}
@@ -179,11 +209,7 @@ static int bridge_stasis_push(struct ast_bridge *self, struct ast_bridge_channel
return -1;
}
- bridge_stasis_queue_join_action(self, bridge_channel);
- if (swap) {
- /* nudge the swap channel out of the bridge */
- ast_bridge_channel_leave_bridge(swap, BRIDGE_CHANNEL_STATE_END_NO_DISSOLVE, 0);
- }
+ bridge_stasis_queue_join_action(self, bridge_channel, swap);
/* Return -1 so the push fails and the after-bridge callback gets called
* This keeps the bridging framework from putting the channel into the bridge