diff options
-rw-r--r-- | apps/app_queue.c | 149 | ||||
-rw-r--r-- | configs/samples/sip.conf.sample | 1 | ||||
-rw-r--r-- | funcs/func_odbc.c | 314 | ||||
-rw-r--r-- | include/asterisk/bridge_channel_internal.h | 38 | ||||
-rw-r--r-- | main/bridge.c | 211 | ||||
-rw-r--r-- | main/bridge_channel.c | 26 | ||||
-rw-r--r-- | res/res_agi.c | 40 | ||||
-rw-r--r-- | res/stasis/control.c | 12 | ||||
-rw-r--r-- | res/stasis/control.h | 18 | ||||
-rw-r--r-- | res/stasis/stasis_bridge.c | 54 |
10 files changed, 627 insertions, 236 deletions
diff --git a/apps/app_queue.c b/apps/app_queue.c index 40aed2fe5..927c4967e 100644 --- a/apps/app_queue.c +++ b/apps/app_queue.c @@ -1554,7 +1554,6 @@ struct member { struct call_queue *lastqueue; /*!< Last queue we received a call */ unsigned int dead:1; /*!< Used to detect members deleted in realtime */ unsigned int delme:1; /*!< Flag to delete entry on reload */ - unsigned int call_pending:1; /*!< TRUE if the Q is attempting to place a call to the member. */ char rt_uniqueid[80]; /*!< Unique id of realtime member entry */ unsigned int ringinuse:1; /*!< Flag to ring queue members even if their status is 'inuse' */ }; @@ -2289,6 +2288,70 @@ static int get_member_status(struct call_queue *q, int max_penalty, int min_pena return -1; } +/* + * A "pool" of member objects that calls are currently pending on. If an + * agent is a member of multiple queues it's possible for that agent to be + * called by each of the queues at the same time. This happens because device + * state is slow to notify the queue app of one of it's member's being rung. + * This "pool" allows us to track which members are currently being rung while + * we wait on the device state change. + */ +static struct ao2_container *pending_members; +#define MAX_CALL_ATTEMPT_BUCKETS 353 + +static int pending_members_hash(const void *obj, const int flags) +{ + const struct member *object; + const char *key; + + switch (flags & OBJ_SEARCH_MASK) { + case OBJ_SEARCH_KEY: + key = obj; + break; + case OBJ_SEARCH_OBJECT: + object = obj; + key = object->interface; + break; + default: + ast_assert(0); + return 0; + } + return ast_str_case_hash(key); +} + +static int pending_members_cmp(void *obj, void *arg, int flags) +{ + const struct member *object_left = obj; + const struct member *object_right = arg; + const char *right_key = arg; + int cmp; + + switch (flags & OBJ_SEARCH_MASK) { + case OBJ_SEARCH_OBJECT: + right_key = object_right->interface; + /* Fall through */ + case OBJ_SEARCH_KEY: + cmp = strcasecmp(object_left->interface, right_key); + break; + case OBJ_SEARCH_PARTIAL_KEY: + /* Not supported by container. */ + ast_assert(0); + return 0; + default: + cmp = 0; + break; + } + if (cmp) { + return 0; + } + return CMP_MATCH; +} + +static void pending_members_remove(struct member *mem) +{ + ao2_find(pending_members, mem, OBJ_POINTER | OBJ_NODATA | OBJ_UNLINK); +} + /*! \brief set a member's status based on device state of that member's state_interface. * * Lock interface list find sc, iterate through each queues queue_member list for member to @@ -2298,6 +2361,9 @@ static void update_status(struct call_queue *q, struct member *m, const int stat { m->status = status; + /* Whatever the status is clear the member from the pending members pool */ + pending_members_remove(m); + queue_publish_member_blob(queue_member_status_type(), queue_member_blob_create(q, m)); } @@ -3157,6 +3223,7 @@ static void member_add_to_queue(struct call_queue *queue, struct member *mem) */ static void member_remove_from_queue(struct call_queue *queue, struct member *mem) { + pending_members_remove(mem); ao2_lock(queue->members); ast_devstate_changed(QUEUE_UNKNOWN_PAUSED_DEVSTATE, AST_DEVSTATE_CACHABLE, "Queue:%s_pause_%s", queue->name, mem->interface); queue_member_follower_removal(queue, mem); @@ -4135,41 +4202,6 @@ static int member_status_available(int status) /*! * \internal - * \brief Clear the member call pending flag. - * - * \param mem Queue member. - * - * \return Nothing - */ -static void member_call_pending_clear(struct member *mem) -{ - ao2_lock(mem); - mem->call_pending = 0; - ao2_unlock(mem); -} - -/*! - * \internal - * \brief Set the member call pending flag. - * - * \param mem Queue member. - * - * \retval non-zero if call pending flag was already set. - */ -static int member_call_pending_set(struct member *mem) -{ - int old_pending; - - ao2_lock(mem); - old_pending = mem->call_pending; - mem->call_pending = 1; - ao2_unlock(mem); - - return old_pending; -} - -/*! - * \internal * \brief Determine if can ring a queue entry. * * \param qe Queue entry to check. @@ -4210,13 +4242,32 @@ static int can_ring_entry(struct queue_ent *qe, struct callattempt *call) } if (!call->member->ringinuse) { - if (member_call_pending_set(call->member)) { - ast_debug(1, "%s has another call pending, can't receive call\n", - call->interface); + struct member *mem; + + ao2_lock(pending_members); + + mem = ao2_find(pending_members, call->member, + OBJ_SEARCH_OBJECT | OBJ_NOLOCK); + if (mem) { + /* + * If found that means this member is currently being attempted + * from another calling thread, so stop trying from this thread + */ + ast_debug(1, "%s has another call trying, can't receive call\n", + call->interface); + ao2_ref(mem, -1); + ao2_unlock(pending_members); return 0; } /* + * If not found add it to the container so another queue + * won't attempt to call this member at the same time. + */ + ao2_link(pending_members, call->member); + ao2_unlock(pending_members); + + /* * The queue member is available. Get current status to be sure * because the device state and extension state callbacks may * not have updated the status yet. @@ -4224,7 +4275,7 @@ static int can_ring_entry(struct queue_ent *qe, struct callattempt *call) if (!member_status_available(get_queue_member_status(call->member))) { ast_debug(1, "%s actually not available, can't receive call\n", call->interface); - member_call_pending_clear(call->member); + pending_members_remove(call->member); return 0; } } @@ -4261,7 +4312,6 @@ static int ring_entry(struct queue_ent *qe, struct callattempt *tmp, int *busies ++*busies; return 0; } - ast_assert(tmp->member->ringinuse || tmp->member->call_pending); ast_copy_string(tech, tmp->interface, sizeof(tech)); if ((location = strchr(tech, '/'))) { @@ -4278,7 +4328,7 @@ static int ring_entry(struct queue_ent *qe, struct callattempt *tmp, int *busies qe->linpos++; ao2_unlock(qe->parent); - member_call_pending_clear(tmp->member); + pending_members_remove(tmp->member); publish_dial_end_event(qe->chan, tmp, NULL, "BUSY"); tmp->stillgoing = 0; @@ -4349,7 +4399,7 @@ static int ring_entry(struct queue_ent *qe, struct callattempt *tmp, int *busies /* Again, keep going even if there's an error */ ast_verb(3, "Couldn't call %s\n", tmp->interface); do_hang(tmp); - member_call_pending_clear(tmp->member); + pending_members_remove(tmp->member); ++*busies; return 0; } @@ -4369,7 +4419,6 @@ static int ring_entry(struct queue_ent *qe, struct callattempt *tmp, int *busies ast_verb(3, "Called %s\n", tmp->interface); - member_call_pending_clear(tmp->member); return 1; } @@ -9599,7 +9648,7 @@ static int manager_queues_summary(struct mansession *s, const struct message *m) ao2_lock(q); /* List queue properties */ - if (ast_strlen_zero(queuefilter) || !strcmp(q->name, queuefilter)) { + if (ast_strlen_zero(queuefilter) || !strcasecmp(q->name, queuefilter)) { /* Reset the necessary local variables if no queuefilter is set*/ qmemcount = 0; qmemavail = 0; @@ -9677,7 +9726,7 @@ static int manager_queues_status(struct mansession *s, const struct message *m) ao2_lock(q); /* List queue properties */ - if (ast_strlen_zero(queuefilter) || !strcmp(q->name, queuefilter)) { + if (ast_strlen_zero(queuefilter) || !strcasecmp(q->name, queuefilter)) { sl = ((q->callscompleted > 0) ? 100 * ((float)q->callscompletedinsl / (float)q->callscompleted) : 0); astman_append(s, "Event: QueueParams\r\n" "Queue: %s\r\n" @@ -10934,6 +10983,7 @@ static int unload_module(void) ast_extension_state_del(0, extension_state_cb); ast_unload_realtime("queue_members"); + ao2_cleanup(pending_members); ao2_cleanup(queues); queues = NULL; return 0; @@ -10962,6 +11012,13 @@ static int load_module(void) return AST_MODULE_LOAD_DECLINE; } + pending_members = ao2_container_alloc( + MAX_CALL_ATTEMPT_BUCKETS, pending_members_hash, pending_members_cmp); + if (!pending_members) { + unload_module(); + return AST_MODULE_LOAD_DECLINE; + } + use_weight = 0; if (reload_handler(0, &mask, NULL)) { diff --git a/configs/samples/sip.conf.sample b/configs/samples/sip.conf.sample index d89a2a157..8f28e2680 100644 --- a/configs/samples/sip.conf.sample +++ b/configs/samples/sip.conf.sample @@ -1509,7 +1509,6 @@ srvlookup=yes ; Enable DNS SRV lookups on outbound calls ;allow=ulaw ;allow=alaw ;mailbox=1234@default,1233@default ; Subscribe to status of multiple mailboxes -;registertrying=yes ; Send a 100 Trying when the device registers. ;[snom] ;type=friend ; Friends place calls and receive calls diff --git a/funcs/func_odbc.c b/funcs/func_odbc.c index 23930ed4d..ca15d703f 100644 --- a/funcs/func_odbc.c +++ b/funcs/func_odbc.c @@ -137,6 +137,163 @@ struct odbc_datastore { char names[0]; }; +/* \brief Data source name + * + * This holds data that pertains to a DSN + */ +struct dsn { + /*! A connection to the database */ + struct odbc_obj *connection; + /*! The name of the DSN as defined in res_odbc.conf */ + char name[0]; +}; + +#define DSN_BUCKETS 37 + +struct ao2_container *dsns; + +static int dsn_hash(const void *obj, const int flags) +{ + const struct dsn *object; + const char *key; + + switch (flags & OBJ_SEARCH_MASK) { + case OBJ_SEARCH_KEY: + key = obj; + break; + case OBJ_SEARCH_OBJECT: + object = obj; + key = object->name; + break; + default: + ast_assert(0); + return 0; + } + return ast_str_hash(key); +} + +static int dsn_cmp(void *obj, void *arg, int flags) +{ + const struct dsn *object_left = obj; + const struct dsn *object_right = arg; + const char *right_key = arg; + int cmp; + + switch (flags & OBJ_SEARCH_MASK) { + case OBJ_SEARCH_OBJECT: + right_key = object_right->name; + /* Fall through */ + case OBJ_SEARCH_KEY: + cmp = strcmp(object_left->name, right_key); + break; + case OBJ_SEARCH_PARTIAL_KEY: + cmp = strncmp(object_left->name, right_key, strlen(right_key)); + break; + default: + cmp = 0; + break; + } + + if (cmp) { + return 0; + } + + return CMP_MATCH; +} + +static void dsn_destructor(void *obj) +{ + struct dsn *dsn = obj; + + if (dsn->connection) { + ast_odbc_release_obj(dsn->connection); + } +} + +/*! + * \brief Create a DSN and connect to the database + * + * \param name The name of the DSN as found in res_odbc.conf + * \retval NULL Fail + * \retval non-NULL The newly-created structure + */ +static struct dsn *create_dsn(const char *name) +{ + struct dsn *dsn; + + dsn = ao2_alloc(sizeof(*dsn) + strlen(name) + 1, dsn_destructor); + if (!dsn) { + return NULL; + } + + /* Safe */ + strcpy(dsn->name, name); + + dsn->connection = ast_odbc_request_obj(name, 0); + if (!dsn->connection) { + ao2_ref(dsn, -1); + return NULL; + } + + if (!ao2_link_flags(dsns, dsn, OBJ_NOLOCK)) { + ao2_ref(dsn, -1); + return NULL; + } + + return dsn; +} + +/*! + * \brief Retrieve a DSN, or create it if it does not exist. + * + * The created DSN is returned locked. This should be inconsequential + * to callers in most cases. + * + * When finished with the returned structure, the caller must call + * \ref release_dsn + * + * \param name Name of the DSN as found in res_odbc.conf + * \retval NULL Unable to retrieve or create the DSN + * \retval non-NULL The retrieved/created locked DSN + */ +static struct dsn *get_dsn(const char *name) +{ + struct dsn *dsn; + + ao2_lock(dsns); + dsn = ao2_find(dsns, name, OBJ_SEARCH_KEY | OBJ_NOLOCK); + if (!dsn) { + dsn = create_dsn(name); + } + ao2_unlock(dsns); + + if (!dsn) { + return NULL; + } + + ao2_lock(dsn->connection); + + return dsn; +} + +/*! + * \brief Unlock and unreference a DSN + * + * \param dsn The dsn to unlock and unreference + * \return NULL + */ +static void *release_dsn(struct dsn *dsn) +{ + if (!dsn) { + return NULL; + } + + ao2_unlock(dsn->connection); + ao2_ref(dsn, -1); + + return NULL; +} + static AST_RWLIST_HEAD_STATIC(queries, acf_odbc_query); static int resultcount = 0; @@ -214,7 +371,7 @@ static int acf_odbc_write(struct ast_channel *chan, const char *cmd, char *s, co struct odbc_obj *obj = NULL; struct acf_odbc_query *query; char *t, varname[15]; - int i, dsn, bogus_chan = 0; + int i, dsn_num, bogus_chan = 0; int transactional = 0; AST_DECLARE_APP_ARGS(values, AST_APP_ARG(field)[100]; @@ -227,6 +384,7 @@ static int acf_odbc_write(struct ast_channel *chan, const char *cmd, char *s, co struct ast_str *buf = ast_str_thread_get(&sql_buf, 16); struct ast_str *insertbuf = ast_str_thread_get(&sql2_buf, 16); const char *status = "FAILURE"; + struct dsn *dsn = NULL; if (!buf || !insertbuf) { return -1; @@ -324,17 +482,21 @@ static int acf_odbc_write(struct ast_channel *chan, const char *cmd, char *s, co * to multiple DSNs. We MUST have a single handle all the way through the * transaction, or else we CANNOT enforce atomicity. */ - for (dsn = 0; dsn < 5; dsn++) { - if (!ast_strlen_zero(query->writehandle[dsn])) { + for (dsn_num = 0; dsn_num < 5; dsn_num++) { + if (!ast_strlen_zero(query->writehandle[dsn_num])) { if (transactional) { /* This can only happen second time through or greater. */ ast_log(LOG_WARNING, "Transactions do not work well with multiple DSNs for 'writehandle'\n"); } - if ((obj = ast_odbc_retrieve_transaction_obj(chan, query->writehandle[dsn]))) { + if ((obj = ast_odbc_retrieve_transaction_obj(chan, query->writehandle[dsn_num]))) { transactional = 1; } else { - obj = ast_odbc_request_obj(query->writehandle[dsn], 0); + dsn = get_dsn(query->writehandle[dsn_num]); + if (!dsn) { + continue; + } + obj = dsn->connection; transactional = 0; } @@ -342,10 +504,7 @@ static int acf_odbc_write(struct ast_channel *chan, const char *cmd, char *s, co break; } - if (obj && !transactional) { - ast_odbc_release_obj(obj); - obj = NULL; - } + dsn = release_dsn(dsn); } } @@ -358,25 +517,25 @@ static int acf_odbc_write(struct ast_channel *chan, const char *cmd, char *s, co status = "SUCCESS"; } else if (query->sql_insert) { - if (obj && !transactional) { - ast_odbc_release_obj(obj); - obj = NULL; - } + dsn = release_dsn(dsn); - for (transactional = 0, dsn = 0; dsn < 5; dsn++) { - if (!ast_strlen_zero(query->writehandle[dsn])) { + for (transactional = 0, dsn_num = 0; dsn_num < 5; dsn_num++) { + if (!ast_strlen_zero(query->writehandle[dsn_num])) { if (transactional) { /* This can only happen second time through or greater. */ ast_log(LOG_WARNING, "Transactions do not work well with multiple DSNs for 'writehandle'\n"); } else if (obj) { - ast_odbc_release_obj(obj); - obj = NULL; + dsn = release_dsn(dsn); } - if ((obj = ast_odbc_retrieve_transaction_obj(chan, query->writehandle[dsn]))) { + if ((obj = ast_odbc_retrieve_transaction_obj(chan, query->writehandle[dsn_num]))) { transactional = 1; } else { - obj = ast_odbc_request_obj(query->writehandle[dsn], 0); + dsn = get_dsn(query->writehandle[dsn_num]); + if (!dsn) { + continue; + } + obj = dsn->connection; transactional = 0; } if (obj) { @@ -406,10 +565,7 @@ static int acf_odbc_write(struct ast_channel *chan, const char *cmd, char *s, co pbx_builtin_setvar_helper(chan, "ODBCSTATUS", status); } - if (obj && !transactional) { - ast_odbc_release_obj(obj); - obj = NULL; - } + dsn = release_dsn(dsn); if (!bogus_chan) { ast_autoservice_stop(chan); @@ -420,11 +576,10 @@ static int acf_odbc_write(struct ast_channel *chan, const char *cmd, char *s, co static int acf_odbc_read(struct ast_channel *chan, const char *cmd, char *s, char *buf, size_t len) { - struct odbc_obj *obj = NULL; struct acf_odbc_query *query; char varname[15], rowcount[12] = "-1"; struct ast_str *colnames = ast_str_thread_get(&colnames_buf, 16); - int res, x, y, buflen = 0, escapecommas, rowlimit = 1, multirow = 0, dsn, bogus_chan = 0; + int res, x, y, buflen = 0, escapecommas, rowlimit = 1, multirow = 0, dsn_num, bogus_chan = 0; AST_DECLARE_APP_ARGS(args, AST_APP_ARG(field)[100]; ); @@ -436,6 +591,7 @@ static int acf_odbc_read(struct ast_channel *chan, const char *cmd, char *s, cha struct odbc_datastore_row *row = NULL; struct ast_str *sql = ast_str_thread_get(&sql_buf, 16); const char *status = "FAILURE"; + struct dsn *dsn = NULL; if (!sql || !colnames) { if (chan) { @@ -523,28 +679,23 @@ static int acf_odbc_read(struct ast_channel *chan, const char *cmd, char *s, cha } AST_RWLIST_UNLOCK(&queries); - for (dsn = 0; dsn < 5; dsn++) { - if (!ast_strlen_zero(query->readhandle[dsn])) { - obj = ast_odbc_request_obj(query->readhandle[dsn], 0); - if (obj) { - stmt = ast_odbc_direct_execute(obj, generic_execute, ast_str_buffer(sql)); + for (dsn_num = 0; dsn_num < 5; dsn_num++) { + if (!ast_strlen_zero(query->readhandle[dsn_num])) { + dsn = get_dsn(query->readhandle[dsn_num]); + if (!dsn) { + continue; } + stmt = ast_odbc_direct_execute(dsn->connection, generic_execute, ast_str_buffer(sql)); } if (stmt) { break; } - if (obj) { - ast_odbc_release_obj(obj); - obj = NULL; - } + dsn = release_dsn(dsn); } if (!stmt) { ast_log(LOG_ERROR, "Unable to execute query [%s]\n", ast_str_buffer(sql)); - if (obj) { - ast_odbc_release_obj(obj); - obj = NULL; - } + dsn = release_dsn(dsn); if (!bogus_chan) { pbx_builtin_setvar_helper(chan, "ODBCROWS", rowcount); ast_autoservice_stop(chan); @@ -558,8 +709,7 @@ static int acf_odbc_read(struct ast_channel *chan, const char *cmd, char *s, cha ast_log(LOG_WARNING, "SQL Column Count error!\n[%s]\n\n", ast_str_buffer(sql)); SQLCloseCursor(stmt); SQLFreeHandle (SQL_HANDLE_STMT, stmt); - ast_odbc_release_obj(obj); - obj = NULL; + dsn = release_dsn(dsn); if (!bogus_chan) { pbx_builtin_setvar_helper(chan, "ODBCROWS", rowcount); ast_autoservice_stop(chan); @@ -583,8 +733,7 @@ static int acf_odbc_read(struct ast_channel *chan, const char *cmd, char *s, cha } SQLCloseCursor(stmt); SQLFreeHandle(SQL_HANDLE_STMT, stmt); - ast_odbc_release_obj(obj); - obj = NULL; + dsn = release_dsn(dsn); if (!bogus_chan) { pbx_builtin_setvar_helper(chan, "ODBCROWS", rowcount); pbx_builtin_setvar_helper(chan, "ODBCSTATUS", status); @@ -607,8 +756,7 @@ static int acf_odbc_read(struct ast_channel *chan, const char *cmd, char *s, cha odbc_datastore_free(resultset); SQLCloseCursor(stmt); SQLFreeHandle(SQL_HANDLE_STMT, stmt); - ast_odbc_release_obj(obj); - obj = NULL; + dsn = release_dsn(dsn); if (!bogus_chan) { pbx_builtin_setvar_helper(chan, "ODBCSTATUS", "MEMERROR"); ast_autoservice_stop(chan); @@ -640,8 +788,7 @@ static int acf_odbc_read(struct ast_channel *chan, const char *cmd, char *s, cha odbc_datastore_free(resultset); SQLCloseCursor(stmt); SQLFreeHandle(SQL_HANDLE_STMT, stmt); - ast_odbc_release_obj(obj); - obj = NULL; + dsn = release_dsn(dsn); if (!bogus_chan) { pbx_builtin_setvar_helper(chan, "ODBCROWS", rowcount); pbx_builtin_setvar_helper(chan, "ODBCSTATUS", "MEMERROR"); @@ -750,8 +897,7 @@ end_acf_read: odbc_datastore_free(resultset); SQLCloseCursor(stmt); SQLFreeHandle(SQL_HANDLE_STMT, stmt); - ast_odbc_release_obj(obj); - obj = NULL; + dsn = release_dsn(dsn); pbx_builtin_setvar_helper(chan, "ODBCSTATUS", "MEMERROR"); ast_autoservice_stop(chan); return -1; @@ -764,8 +910,7 @@ end_acf_read: } SQLCloseCursor(stmt); SQLFreeHandle(SQL_HANDLE_STMT, stmt); - ast_odbc_release_obj(obj); - obj = NULL; + dsn = release_dsn(dsn); if (resultset && !multirow) { /* Fetch the first resultset */ if (!acf_fetch(chan, "", buf, buf, len)) { @@ -1192,8 +1337,8 @@ static char *cli_odbc_read(struct ast_cli_entry *e, int cmd, struct ast_cli_args if (a->argc == 5 && !strcmp(a->argv[4], "exec")) { /* Execute the query */ - struct odbc_obj *obj = NULL; - int dsn, executed = 0; + struct dsn *dsn = NULL; + int dsn_num, executed = 0; SQLHSTMT stmt; int rows = 0, res, x; SQLSMALLINT colcount = 0, collength; @@ -1207,19 +1352,18 @@ static char *cli_odbc_read(struct ast_cli_entry *e, int cmd, struct ast_cli_args return CLI_SUCCESS; } - for (dsn = 0; dsn < 5; dsn++) { - if (ast_strlen_zero(query->readhandle[dsn])) { + for (dsn_num = 0; dsn_num < 5; dsn_num++) { + if (ast_strlen_zero(query->readhandle[dsn_num])) { continue; } - ast_debug(1, "Found handle %s\n", query->readhandle[dsn]); - if (!(obj = ast_odbc_request_obj(query->readhandle[dsn], 0))) { + dsn = get_dsn(query->readhandle[dsn_num]); + if (!dsn) { continue; } + ast_debug(1, "Found handle %s\n", query->readhandle[dsn_num]); - ast_debug(1, "Got obj\n"); - if (!(stmt = ast_odbc_direct_execute(obj, generic_execute, ast_str_buffer(sql)))) { - ast_odbc_release_obj(obj); - obj = NULL; + if (!(stmt = ast_odbc_direct_execute(dsn->connection, generic_execute, ast_str_buffer(sql)))) { + dsn = release_dsn(dsn); continue; } @@ -1230,8 +1374,7 @@ static char *cli_odbc_read(struct ast_cli_entry *e, int cmd, struct ast_cli_args ast_cli(a->fd, "SQL Column Count error!\n[%s]\n\n", ast_str_buffer(sql)); SQLCloseCursor(stmt); SQLFreeHandle (SQL_HANDLE_STMT, stmt); - ast_odbc_release_obj(obj); - obj = NULL; + dsn = release_dsn(dsn); AST_RWLIST_UNLOCK(&queries); return CLI_SUCCESS; } @@ -1240,10 +1383,9 @@ static char *cli_odbc_read(struct ast_cli_entry *e, int cmd, struct ast_cli_args if ((res != SQL_SUCCESS) && (res != SQL_SUCCESS_WITH_INFO)) { SQLCloseCursor(stmt); SQLFreeHandle(SQL_HANDLE_STMT, stmt); - ast_odbc_release_obj(obj); - obj = NULL; + dsn = release_dsn(dsn); if (res == SQL_NO_DATA) { - ast_cli(a->fd, "Returned %d rows. Query executed on handle %d:%s [%s]\n", rows, dsn, query->readhandle[dsn], ast_str_buffer(sql)); + ast_cli(a->fd, "Returned %d rows. Query executed on handle %d:%s [%s]\n", rows, dsn_num, query->readhandle[dsn_num], ast_str_buffer(sql)); break; } else { ast_cli(a->fd, "Error %d in FETCH [%s]\n", res, ast_str_buffer(sql)); @@ -1270,8 +1412,7 @@ static char *cli_odbc_read(struct ast_cli_entry *e, int cmd, struct ast_cli_args ast_cli(a->fd, "SQL Get Data error %d!\n[%s]\n\n", res, ast_str_buffer(sql)); SQLCloseCursor(stmt); SQLFreeHandle(SQL_HANDLE_STMT, stmt); - ast_odbc_release_obj(obj); - obj = NULL; + dsn = release_dsn(dsn); AST_RWLIST_UNLOCK(&queries); return CLI_SUCCESS; } @@ -1289,15 +1430,11 @@ static char *cli_odbc_read(struct ast_cli_entry *e, int cmd, struct ast_cli_args } SQLCloseCursor(stmt); SQLFreeHandle(SQL_HANDLE_STMT, stmt); - ast_odbc_release_obj(obj); - obj = NULL; - ast_cli(a->fd, "Returned %d row%s. Query executed on handle %d [%s]\n", rows, rows == 1 ? "" : "s", dsn, query->readhandle[dsn]); + dsn = release_dsn(dsn); + ast_cli(a->fd, "Returned %d row%s. Query executed on handle %d [%s]\n", rows, rows == 1 ? "" : "s", dsn_num, query->readhandle[dsn_num]); break; } - if (obj) { - ast_odbc_release_obj(obj); - obj = NULL; - } + dsn = release_dsn(dsn); if (!executed) { ast_cli(a->fd, "Failed to execute query. [%s]\n", ast_str_buffer(sql)); @@ -1420,30 +1557,29 @@ static char *cli_odbc_write(struct ast_cli_entry *e, int cmd, struct ast_cli_arg if (a->argc == 6 && !strcmp(a->argv[5], "exec")) { /* Execute the query */ - struct odbc_obj *obj = NULL; - int dsn, executed = 0; + struct dsn *dsn; + int dsn_num, executed = 0; SQLHSTMT stmt; SQLLEN rows = -1; - for (dsn = 0; dsn < 5; dsn++) { - if (ast_strlen_zero(query->writehandle[dsn])) { + for (dsn_num = 0; dsn_num < 5; dsn_num++) { + if (ast_strlen_zero(query->writehandle[dsn_num])) { continue; } - if (!(obj = ast_odbc_request_obj(query->writehandle[dsn], 0))) { + dsn = get_dsn(query->writehandle[dsn_num]); + if (!dsn) { continue; } - if (!(stmt = ast_odbc_direct_execute(obj, generic_execute, ast_str_buffer(sql)))) { - ast_odbc_release_obj(obj); - obj = NULL; + if (!(stmt = ast_odbc_direct_execute(dsn->connection, generic_execute, ast_str_buffer(sql)))) { + dsn = release_dsn(dsn); continue; } SQLRowCount(stmt, &rows); SQLCloseCursor(stmt); SQLFreeHandle(SQL_HANDLE_STMT, stmt); - ast_odbc_release_obj(obj); - obj = NULL; - ast_cli(a->fd, "Affected %d rows. Query executed on handle %d [%s]\n", (int)rows, dsn, query->writehandle[dsn]); + dsn = release_dsn(dsn); + ast_cli(a->fd, "Affected %d rows. Query executed on handle %d [%s]\n", (int)rows, dsn_num, query->writehandle[dsn_num]); executed = 1; break; } @@ -1470,6 +1606,11 @@ static int load_module(void) char *catg; struct ast_flags config_flags = { 0 }; + dsns = ao2_container_alloc(DSN_BUCKETS, dsn_hash, dsn_cmp); + if (!dsns) { + return AST_MODULE_LOAD_DECLINE; + } + res |= ast_custom_function_register(&fetch_function); res |= ast_register_application_xml(app_odbcfinish, exec_odbcfinish); AST_RWLIST_WRLOCK(&queries); @@ -1478,6 +1619,7 @@ static int load_module(void) if (!cfg || cfg == CONFIG_STATUS_FILEINVALID) { ast_log(LOG_NOTICE, "Unable to load config for func_odbc: %s\n", config); AST_RWLIST_UNLOCK(&queries); + ao2_ref(dsns, -1); return AST_MODULE_LOAD_DECLINE; } @@ -1531,6 +1673,8 @@ static int unload_module(void) AST_RWLIST_WRLOCK(&queries); AST_RWLIST_UNLOCK(&queries); + + ao2_ref(dsns, -1); return res; } diff --git a/include/asterisk/bridge_channel_internal.h b/include/asterisk/bridge_channel_internal.h index 7f7d5a88b..fb8e781e8 100644 --- a/include/asterisk/bridge_channel_internal.h +++ b/include/asterisk/bridge_channel_internal.h @@ -151,47 +151,20 @@ int bridge_channel_internal_push_full(struct ast_bridge_channel *bridge_channel, void bridge_channel_internal_pull(struct ast_bridge_channel *bridge_channel); /*! - * \brief Internal bridge channel wait condition and associated result. - */ -struct bridge_channel_internal_cond { - /*! Lock for the data structure */ - ast_mutex_t lock; - /*! Wait condition */ - ast_cond_t cond; - /*! Wait until done */ - int done; - /*! The bridge channel */ - struct ast_bridge_channel *bridge_channel; -}; - -/*! - * \internal - * \brief Wait for the expected signal. - * \since 13.5.0 - * - * \param cond the wait object - * - * \return Nothing - */ -void bridge_channel_internal_wait(struct bridge_channel_internal_cond *cond); - -/*! - * \internal - * \brief Signal the condition wait. - * \since 13.5.0 + * \brief Signal imparting threads to wake up. + * \since 13.9.0 * - * \param cond the wait object + * \param chan Channel imparted that we need to signal. * * \return Nothing */ -void bridge_channel_internal_signal(struct bridge_channel_internal_cond *cond); +void bridge_channel_impart_signal(struct ast_channel *chan); /*! * \internal * \brief Join the bridge_channel to the bridge (blocking) * * \param bridge_channel The Channel in the bridge - * \param cond data used for signaling * * \note The bridge_channel->swap holds a channel reference for the swap * channel going into the bridging system. The ref ensures that the swap @@ -206,8 +179,7 @@ void bridge_channel_internal_signal(struct bridge_channel_internal_cond *cond); * \retval 0 bridge channel successfully joined the bridge * \retval -1 bridge channel failed to join the bridge */ -int bridge_channel_internal_join(struct ast_bridge_channel *bridge_channel, - struct bridge_channel_internal_cond *cond); +int bridge_channel_internal_join(struct ast_bridge_channel *bridge_channel); /*! * \internal diff --git a/main/bridge.c b/main/bridge.c index a56555bc9..c28d49c02 100644 --- a/main/bridge.c +++ b/main/bridge.c @@ -1483,6 +1483,150 @@ void ast_bridge_notify_masquerade(struct ast_channel *chan) ao2_ref(bridge_channel, -1); } +/*! + * \brief Internal bridge impart wait condition and associated conditional. + */ +struct bridge_channel_impart_cond { + AST_LIST_ENTRY(bridge_channel_impart_cond) node; + /*! Lock for the data structure */ + ast_mutex_t lock; + /*! Wait condition */ + ast_cond_t cond; + /*! Wait until done */ + int done; +}; + +AST_LIST_HEAD_NOLOCK(bridge_channel_impart_ds_head, bridge_channel_impart_cond); + +/*! + * \internal + * \brief Signal imparting threads to wake up. + * \since 13.9.0 + * + * \param ds_head List of imparting threads to wake up. + * + * \return Nothing + */ +static void bridge_channel_impart_ds_head_signal(struct bridge_channel_impart_ds_head *ds_head) +{ + if (ds_head) { + struct bridge_channel_impart_cond *cond; + + while ((cond = AST_LIST_REMOVE_HEAD(ds_head, node))) { + ast_mutex_lock(&cond->lock); + cond->done = 1; + ast_cond_signal(&cond->cond); + ast_mutex_unlock(&cond->lock); + } + } +} + +static void bridge_channel_impart_ds_head_dtor(void *doomed) +{ + bridge_channel_impart_ds_head_signal(doomed); + ast_free(doomed); +} + +/*! + * \internal + * \brief Fixup the bridge impart datastore. + * \since 13.9.0 + * + * \param data Bridge impart datastore data to fixup from old_chan. + * \param old_chan The datastore is moving from this channel. + * \param new_chan The datastore is moving to this channel. + * + * \return Nothing + */ +static void bridge_channel_impart_ds_head_fixup(void *data, struct ast_channel *old_chan, struct ast_channel *new_chan) +{ + /* + * Signal any waiting impart threads. The masquerade is going to kill + * old_chan and we don't need to be waiting on new_chan. + */ + bridge_channel_impart_ds_head_signal(data); +} + +static const struct ast_datastore_info bridge_channel_impart_ds_info = { + .type = "bridge-impart-ds", + .destroy = bridge_channel_impart_ds_head_dtor, + .chan_fixup = bridge_channel_impart_ds_head_fixup, +}; + +/*! + * \internal + * \brief Add impart wait datastore conditional to channel. + * \since 13.9.0 + * + * \param chan Channel to add the impart wait conditional. + * \param cond Imparting conditional to add. + * + * \retval 0 on success. + * \retval -1 on error. + */ +static int bridge_channel_impart_add(struct ast_channel *chan, struct bridge_channel_impart_cond *cond) +{ + struct ast_datastore *datastore; + struct bridge_channel_impart_ds_head *ds_head; + + ast_channel_lock(chan); + + datastore = ast_channel_datastore_find(chan, &bridge_channel_impart_ds_info, NULL); + if (!datastore) { + datastore = ast_datastore_alloc(&bridge_channel_impart_ds_info, NULL); + if (!datastore) { + ast_channel_unlock(chan); + return -1; + } + ds_head = ast_calloc(1, sizeof(*ds_head)); + if (!ds_head) { + ast_channel_unlock(chan); + ast_datastore_free(datastore); + return -1; + } + datastore->data = ds_head; + ast_channel_datastore_add(chan, datastore); + } else { + ds_head = datastore->data; + ast_assert(ds_head != NULL); + } + + AST_LIST_INSERT_TAIL(ds_head, cond, node); + + ast_channel_unlock(chan); + return 0; +} + +void bridge_channel_impart_signal(struct ast_channel *chan) +{ + struct ast_datastore *datastore; + + ast_channel_lock(chan); + datastore = ast_channel_datastore_find(chan, &bridge_channel_impart_ds_info, NULL); + if (datastore) { + bridge_channel_impart_ds_head_signal(datastore->data); + } + ast_channel_unlock(chan); +} + +/*! + * \internal + * \brief Block imparting channel thread until signaled. + * \since 13.9.0 + * + * \param cond Imparting conditional to wait for. + * + * \return Nothing + */ +static void bridge_channel_impart_wait(struct bridge_channel_impart_cond *cond) +{ + ast_mutex_lock(&cond->lock); + while (!cond->done) { + ast_cond_wait(&cond->cond, &cond->lock); + } + ast_mutex_unlock(&cond->lock); +} + /* * XXX ASTERISK-21271 make ast_bridge_join() require features to be allocated just like ast_bridge_impart() and not expect the struct back. * @@ -1551,7 +1695,7 @@ int ast_bridge_join(struct ast_bridge *bridge, } if (!res) { - res = bridge_channel_internal_join(bridge_channel, NULL); + res = bridge_channel_internal_join(bridge_channel); } /* Cleanup all the data in the bridge channel after it leaves the bridge. */ @@ -1568,6 +1712,7 @@ int ast_bridge_join(struct ast_bridge *bridge, join_exit:; ast_bridge_run_after_callback(chan); + bridge_channel_impart_signal(chan); if (!(ast_channel_softhangup_internal_flag(chan) & AST_SOFTHANGUP_ASYNCGOTO) && !ast_bridge_setup_after_goto(chan)) { /* Claim the after bridge goto is an async goto destination. */ @@ -1581,14 +1726,13 @@ join_exit:; /*! \brief Thread responsible for imparted bridged channels to be departed */ static void *bridge_channel_depart_thread(void *data) { - struct bridge_channel_internal_cond *cond = data; - struct ast_bridge_channel *bridge_channel = cond->bridge_channel; + struct ast_bridge_channel *bridge_channel = data; if (bridge_channel->callid) { ast_callid_threadassoc_add(bridge_channel->callid); } - bridge_channel_internal_join(bridge_channel, cond); + bridge_channel_internal_join(bridge_channel); /* * cleanup @@ -1601,6 +1745,8 @@ static void *bridge_channel_depart_thread(void *data) bridge_channel->features = NULL; ast_bridge_discard_after_callback(bridge_channel->chan, AST_BRIDGE_AFTER_CB_REASON_DEPART); + /* If join failed there will be impart threads waiting. */ + bridge_channel_impart_signal(bridge_channel->chan); ast_bridge_discard_after_goto(bridge_channel->chan); return NULL; @@ -1609,15 +1755,14 @@ static void *bridge_channel_depart_thread(void *data) /*! \brief Thread responsible for independent imparted bridged channels */ static void *bridge_channel_ind_thread(void *data) { - struct bridge_channel_internal_cond *cond = data; - struct ast_bridge_channel *bridge_channel = cond->bridge_channel; + struct ast_bridge_channel *bridge_channel = data; struct ast_channel *chan; if (bridge_channel->callid) { ast_callid_threadassoc_add(bridge_channel->callid); } - bridge_channel_internal_join(bridge_channel, cond); + bridge_channel_internal_join(bridge_channel); chan = bridge_channel->chan; /* cleanup */ @@ -1634,15 +1779,18 @@ static void *bridge_channel_ind_thread(void *data) ao2_ref(bridge_channel, -1); ast_bridge_run_after_callback(chan); + /* If join failed there will be impart threads waiting. */ + bridge_channel_impart_signal(chan); ast_bridge_run_after_goto(chan); return NULL; } -int ast_bridge_impart(struct ast_bridge *bridge, +static int bridge_impart_internal(struct ast_bridge *bridge, struct ast_channel *chan, struct ast_channel *swap, struct ast_bridge_features *features, - enum ast_bridge_impart_flags flags) + enum ast_bridge_impart_flags flags, + struct bridge_channel_impart_cond *cond) { int res = 0; struct ast_bridge_channel *bridge_channel; @@ -1701,27 +1849,20 @@ int ast_bridge_impart(struct ast_bridge *bridge, /* Actually create the thread that will handle the channel */ if (!res) { - struct bridge_channel_internal_cond cond = { - .done = 0, - .bridge_channel = bridge_channel - }; - ast_mutex_init(&cond.lock); - ast_cond_init(&cond.cond, NULL); - + res = bridge_channel_impart_add(chan, cond); + } + if (!res) { if ((flags & AST_BRIDGE_IMPART_CHAN_MASK) == AST_BRIDGE_IMPART_CHAN_INDEPENDENT) { res = ast_pthread_create_detached(&bridge_channel->thread, NULL, - bridge_channel_ind_thread, &cond); + bridge_channel_ind_thread, bridge_channel); } else { res = ast_pthread_create(&bridge_channel->thread, NULL, - bridge_channel_depart_thread, &cond); + bridge_channel_depart_thread, bridge_channel); } if (!res) { - bridge_channel_internal_wait(&cond); + bridge_channel_impart_wait(cond); } - - ast_cond_destroy(&cond.cond); - ast_mutex_destroy(&cond.lock); } if (res) { @@ -1742,6 +1883,32 @@ int ast_bridge_impart(struct ast_bridge *bridge, return 0; } +int ast_bridge_impart(struct ast_bridge *bridge, + struct ast_channel *chan, + struct ast_channel *swap, + struct ast_bridge_features *features, + enum ast_bridge_impart_flags flags) +{ + struct bridge_channel_impart_cond cond = { + .done = 0, + }; + int res; + + ast_mutex_init(&cond.lock); + ast_cond_init(&cond.cond, NULL); + + res = bridge_impart_internal(bridge, chan, swap, features, flags, &cond); + if (res) { + /* Impart failed. Signal any other waiting impart threads */ + bridge_channel_impart_signal(chan); + } + + ast_cond_destroy(&cond.cond); + ast_mutex_destroy(&cond.lock); + + return res; +} + int ast_bridge_depart(struct ast_channel *chan) { struct ast_bridge_channel *bridge_channel; diff --git a/main/bridge_channel.c b/main/bridge_channel.c index 66f26eefe..db4ecfe57 100644 --- a/main/bridge_channel.c +++ b/main/bridge_channel.c @@ -2637,27 +2637,7 @@ static void bridge_channel_event_join_leave(struct ast_bridge_channel *bridge_ch ao2_iterator_destroy(&iter); } -void bridge_channel_internal_wait(struct bridge_channel_internal_cond *cond) -{ - ast_mutex_lock(&cond->lock); - while (!cond->done) { - ast_cond_wait(&cond->cond, &cond->lock); - } - ast_mutex_unlock(&cond->lock); -} - -void bridge_channel_internal_signal(struct bridge_channel_internal_cond *cond) -{ - if (cond) { - ast_mutex_lock(&cond->lock); - cond->done = 1; - ast_cond_signal(&cond->cond); - ast_mutex_unlock(&cond->lock); - } -} - -int bridge_channel_internal_join(struct ast_bridge_channel *bridge_channel, - struct bridge_channel_internal_cond *cond) +int bridge_channel_internal_join(struct ast_bridge_channel *bridge_channel) { int res = 0; struct ast_bridge_features *channel_features; @@ -2687,7 +2667,6 @@ int bridge_channel_internal_join(struct ast_bridge_channel *bridge_channel, bridge_channel->bridge->uniqueid, bridge_channel, ast_channel_name(bridge_channel->chan)); - bridge_channel_internal_signal(cond); return -1; } ast_channel_internal_bridge_set(bridge_channel->chan, bridge_channel->bridge); @@ -2722,8 +2701,6 @@ int bridge_channel_internal_join(struct ast_bridge_channel *bridge_channel, } bridge_reconfigured(bridge_channel->bridge, !bridge_channel->inhibit_colp); - bridge_channel_internal_signal(cond); - if (bridge_channel->state == BRIDGE_CHANNEL_STATE_WAIT) { /* * Indicate a source change since this channel is entering the @@ -2735,6 +2712,7 @@ int bridge_channel_internal_join(struct ast_bridge_channel *bridge_channel, ast_indicate(bridge_channel->chan, AST_CONTROL_SRCCHANGE); } + bridge_channel_impart_signal(bridge_channel->chan); ast_bridge_unlock(bridge_channel->bridge); /* Must release any swap ref after unlocking the bridge. */ diff --git a/res/res_agi.c b/res/res_agi.c index f6ce74960..e8249e202 100644 --- a/res/res_agi.c +++ b/res/res_agi.c @@ -3736,6 +3736,24 @@ static enum agi_result agi_handle_command(struct ast_channel *chan, AGI *agi, ch return AGI_RESULT_SUCCESS; } + +AST_LIST_HEAD_NOLOCK(deferred_frames, ast_frame); + +static void queue_deferred_frames(struct deferred_frames *deferred_frames, + struct ast_channel *chan) +{ + struct ast_frame *f; + + if (!AST_LIST_EMPTY(deferred_frames)) { + ast_channel_lock(chan); + while ((f = AST_LIST_REMOVE_HEAD(deferred_frames, frame_list))) { + ast_queue_frame_head(chan, f); + ast_frfree(f); + } + ast_channel_unlock(chan); + } +} + static enum agi_result run_agi(struct ast_channel *chan, char *request, AGI *agi, int pid, int *status, int dead, int argc, char *argv[]) { struct ast_channel *c; @@ -3754,6 +3772,9 @@ static enum agi_result run_agi(struct ast_channel *chan, char *request, AGI *agi const char *sighup_str; const char *exit_on_hangup_str; int exit_on_hangup; + struct deferred_frames deferred_frames; + + AST_LIST_HEAD_INIT_NOLOCK(&deferred_frames); ast_channel_lock(chan); sighup_str = pbx_builtin_getvar_helper(chan, "AGISIGHUP"); @@ -3815,8 +3836,20 @@ static enum agi_result run_agi(struct ast_channel *chan, char *request, AGI *agi /* Write, ignoring errors */ if (write(agi->audio, f->data.ptr, f->datalen) < 0) { } + ast_frfree(f); + } else if (ast_is_deferrable_frame(f)) { + struct ast_frame *dup_f; + + if ((dup_f = ast_frisolate(f))) { + AST_LIST_INSERT_HEAD(&deferred_frames, dup_f, frame_list); + } + + if (dup_f != f) { + ast_frfree(f); + } + } else { + ast_frfree(f); } - ast_frfree(f); } } else if (outfd > -1) { size_t len = sizeof(buf); @@ -3864,6 +3897,8 @@ static enum agi_result run_agi(struct ast_channel *chan, char *request, AGI *agi buf[buflen - 1] = '\0'; } + queue_deferred_frames(&deferred_frames, chan); + if (agidebug) ast_verbose("<%s>AGI Rx << %s\n", ast_channel_name(chan), buf); cmd_status = agi_handle_command(chan, agi, buf, dead); @@ -3885,6 +3920,9 @@ static enum agi_result run_agi(struct ast_channel *chan, char *request, AGI *agi } } } + + queue_deferred_frames(&deferred_frames, chan); + if (agi->speech) { ast_speech_destroy(agi->speech); } diff --git a/res/stasis/control.c b/res/stasis/control.c index 3c5b75041..aa6866aee 100644 --- a/res/stasis/control.c +++ b/res/stasis/control.c @@ -903,11 +903,8 @@ static void bridge_after_cb_failed(enum ast_bridge_after_cb_reason reason, ast_bridge_after_cb_reason_string(reason)); } -int control_add_channel_to_bridge( - struct stasis_app_control *control, - struct ast_channel *chan, void *data) +int control_swap_channel_in_bridge(struct stasis_app_control *control, struct ast_bridge *bridge, struct ast_channel *chan, struct ast_channel *swap) { - struct ast_bridge *bridge = data; int res; if (!control || !bridge) { @@ -960,7 +957,7 @@ int control_add_channel_to_bridge( res = ast_bridge_impart(bridge, chan, - NULL, /* swap channel */ + swap, NULL, /* features */ AST_BRIDGE_IMPART_CHAN_DEPARTABLE); if (res != 0) { @@ -976,6 +973,11 @@ int control_add_channel_to_bridge( return 0; } +int control_add_channel_to_bridge(struct stasis_app_control *control, struct ast_channel *chan, void *data) +{ + return control_swap_channel_in_bridge(control, data, chan, NULL); +} + int stasis_app_control_add_channel_to_bridge( struct stasis_app_control *control, struct ast_bridge *bridge) { diff --git a/res/stasis/control.h b/res/stasis/control.h index 1d37a494a..868a8091b 100644 --- a/res/stasis/control.h +++ b/res/stasis/control.h @@ -111,12 +111,20 @@ struct stasis_app *control_app(struct stasis_app_control *control); * \brief Command callback for adding a channel to a bridge * * \param control The control for chan - * \param channel The channel on which commands should be executed - * \param bridge Data to be passed to the callback + * \param chan The channel on which commands should be executed + * \param data Bridge to be passed to the callback + */ +int control_add_channel_to_bridge(struct stasis_app_control *control, struct ast_channel *chan, void *data); + +/*! + * \brief Command for swapping a channel in a bridge + * + * \param control The control for chan + * \param chan The channel on which commands should be executed + * \param bridge Bridge to be passed to the callback + * \param swap Channel to swap with when joining the bridge */ -int control_add_channel_to_bridge( - struct stasis_app_control *control, - struct ast_channel *chan, void *obj); +int control_swap_channel_in_bridge(struct stasis_app_control *control, struct ast_bridge *bridge, struct ast_channel *chan, struct ast_channel *swap); /*! * \brief Stop playing silence to a channel right now. diff --git a/res/stasis/stasis_bridge.c b/res/stasis/stasis_bridge.c index bfd287226..aa21ec29c 100644 --- a/res/stasis/stasis_bridge.c +++ b/res/stasis/stasis_bridge.c @@ -76,24 +76,54 @@ static void bridge_stasis_run_cb(struct ast_channel *chan, void *data) pbx_exec(chan, app_stasis, app_name); } -static int add_channel_to_bridge( +struct defer_bridge_add_obj { + /*! Bridge to join (has ref) */ + struct ast_bridge *bridge; + /*! + * \brief Channel to swap with in the bridge. (has ref) + * + * \note NULL if not swapping with a channel. + */ + struct ast_channel *swap; +}; + +static void defer_bridge_add_dtor(void *obj) +{ + struct defer_bridge_add_obj *defer = obj; + + ao2_cleanup(defer->bridge); + ast_channel_cleanup(defer->swap); +} + +static int defer_bridge_add( struct stasis_app_control *control, struct ast_channel *chan, void *obj) { - struct ast_bridge *bridge = obj; - int res; + struct defer_bridge_add_obj *defer = obj; - res = control_add_channel_to_bridge(control, - chan, bridge); - return res; + return control_swap_channel_in_bridge(control, defer->bridge, chan, defer->swap); } static void bridge_stasis_queue_join_action(struct ast_bridge *self, - struct ast_bridge_channel *bridge_channel) + struct ast_bridge_channel *bridge_channel, struct ast_bridge_channel *swap) { + struct defer_bridge_add_obj *defer; + + defer = ao2_alloc_options(sizeof(*defer), defer_bridge_add_dtor, + AO2_ALLOC_OPT_LOCK_NOLOCK); + if (!defer) { + return; + } + ao2_ref(self, +1); + defer->bridge = self; + if (swap) { + ast_channel_ref(swap->chan); + defer->swap = swap->chan; + } + ast_channel_lock(bridge_channel->chan); - command_prestart_queue_command(bridge_channel->chan, add_channel_to_bridge, - ao2_bump(self), __ao2_cleanup); + command_prestart_queue_command(bridge_channel->chan, defer_bridge_add, + defer, __ao2_cleanup); ast_channel_unlock(bridge_channel->chan); } @@ -179,11 +209,7 @@ static int bridge_stasis_push(struct ast_bridge *self, struct ast_bridge_channel return -1; } - bridge_stasis_queue_join_action(self, bridge_channel); - if (swap) { - /* nudge the swap channel out of the bridge */ - ast_bridge_channel_leave_bridge(swap, BRIDGE_CHANNEL_STATE_END_NO_DISSOLVE, 0); - } + bridge_stasis_queue_join_action(self, bridge_channel, swap); /* Return -1 so the push fails and the after-bridge callback gets called * This keeps the bridging framework from putting the channel into the bridge |