diff options
38 files changed, 2341 insertions, 388 deletions
diff --git a/apps/app_queue.c b/apps/app_queue.c index 939a0e2ad..dbd83938d 100644 --- a/apps/app_queue.c +++ b/apps/app_queue.c @@ -1510,7 +1510,6 @@ struct member { struct call_queue *lastqueue; /*!< Last queue we received a call */ unsigned int dead:1; /*!< Used to detect members deleted in realtime */ unsigned int delme:1; /*!< Flag to delete entry on reload */ - unsigned int call_pending:1; /*!< TRUE if the Q is attempting to place a call to the member. */ char rt_uniqueid[80]; /*!< Unique id of realtime member entry */ unsigned int ringinuse:1; /*!< Flag to ring queue members even if their status is 'inuse' */ }; @@ -2267,6 +2266,70 @@ static int get_member_status(struct call_queue *q, int max_penalty, int min_pena return -1; } +/* + * A "pool" of member objects that calls are currently pending on. If an + * agent is a member of multiple queues it's possible for that agent to be + * called by each of the queues at the same time. This happens because device + * state is slow to notify the queue app of one of it's member's being rung. + * This "pool" allows us to track which members are currently being rung while + * we wait on the device state change. + */ +static struct ao2_container *pending_members; +#define MAX_CALL_ATTEMPT_BUCKETS 353 + +static int pending_members_hash(const void *obj, const int flags) +{ + const struct member *object; + const char *key; + + switch (flags & OBJ_SEARCH_MASK) { + case OBJ_SEARCH_KEY: + key = obj; + break; + case OBJ_SEARCH_OBJECT: + object = obj; + key = object->interface; + break; + default: + ast_assert(0); + return 0; + } + return ast_str_case_hash(key); +} + +static int pending_members_cmp(void *obj, void *arg, int flags) +{ + const struct member *object_left = obj; + const struct member *object_right = arg; + const char *right_key = arg; + int cmp; + + switch (flags & OBJ_SEARCH_MASK) { + case OBJ_SEARCH_OBJECT: + right_key = object_right->interface; + /* Fall through */ + case OBJ_SEARCH_KEY: + cmp = strcasecmp(object_left->interface, right_key); + break; + case OBJ_SEARCH_PARTIAL_KEY: + /* Not supported by container. */ + ast_assert(0); + return 0; + default: + cmp = 0; + break; + } + if (cmp) { + return 0; + } + return CMP_MATCH; +} + +static void pending_members_remove(struct member *mem) +{ + ao2_find(pending_members, mem, OBJ_POINTER | OBJ_NODATA | OBJ_UNLINK); +} + /*! \brief set a member's status based on device state of that member's state_interface. * * Lock interface list find sc, iterate through each queues queue_member list for member to @@ -2276,6 +2339,9 @@ static void update_status(struct call_queue *q, struct member *m, const int stat { m->status = status; + /* Whatever the status is clear the member from the pending members pool */ + pending_members_remove(m); + queue_publish_member_blob(queue_member_status_type(), queue_member_blob_create(q, m)); } @@ -3132,6 +3198,7 @@ static void member_add_to_queue(struct call_queue *queue, struct member *mem) */ static void member_remove_from_queue(struct call_queue *queue, struct member *mem) { + pending_members_remove(mem); ao2_lock(queue->members); ast_devstate_changed(QUEUE_UNKNOWN_PAUSED_DEVSTATE, AST_DEVSTATE_CACHABLE, "Queue:%s_pause_%s", queue->name, mem->interface); queue_member_follower_removal(queue, mem); @@ -4110,41 +4177,6 @@ static int member_status_available(int status) /*! * \internal - * \brief Clear the member call pending flag. - * - * \param mem Queue member. - * - * \return Nothing - */ -static void member_call_pending_clear(struct member *mem) -{ - ao2_lock(mem); - mem->call_pending = 0; - ao2_unlock(mem); -} - -/*! - * \internal - * \brief Set the member call pending flag. - * - * \param mem Queue member. - * - * \retval non-zero if call pending flag was already set. - */ -static int member_call_pending_set(struct member *mem) -{ - int old_pending; - - ao2_lock(mem); - old_pending = mem->call_pending; - mem->call_pending = 1; - ao2_unlock(mem); - - return old_pending; -} - -/*! - * \internal * \brief Determine if can ring a queue entry. * * \param qe Queue entry to check. @@ -4164,7 +4196,7 @@ static int can_ring_entry(struct queue_ent *qe, struct callattempt *call) return 0; } - if (call->member->in_call && call->lastqueue->wrapuptime) { + if (call->member->in_call && call->lastqueue && call->lastqueue->wrapuptime) { ast_debug(1, "%s is in call, so not available (wrapuptime %d)\n", call->interface, call->lastqueue->wrapuptime); return 0; @@ -4185,13 +4217,32 @@ static int can_ring_entry(struct queue_ent *qe, struct callattempt *call) } if (!call->member->ringinuse) { - if (member_call_pending_set(call->member)) { - ast_debug(1, "%s has another call pending, can't receive call\n", - call->interface); + struct member *mem; + + ao2_lock(pending_members); + + mem = ao2_find(pending_members, call->member, + OBJ_SEARCH_OBJECT | OBJ_NOLOCK); + if (mem) { + /* + * If found that means this member is currently being attempted + * from another calling thread, so stop trying from this thread + */ + ast_debug(1, "%s has another call trying, can't receive call\n", + call->interface); + ao2_ref(mem, -1); + ao2_unlock(pending_members); return 0; } /* + * If not found add it to the container so another queue + * won't attempt to call this member at the same time. + */ + ao2_link(pending_members, call->member); + ao2_unlock(pending_members); + + /* * The queue member is available. Get current status to be sure * because the device state and extension state callbacks may * not have updated the status yet. @@ -4199,7 +4250,7 @@ static int can_ring_entry(struct queue_ent *qe, struct callattempt *call) if (!member_status_available(get_queue_member_status(call->member))) { ast_debug(1, "%s actually not available, can't receive call\n", call->interface); - member_call_pending_clear(call->member); + pending_members_remove(call->member); return 0; } } @@ -4236,7 +4287,6 @@ static int ring_entry(struct queue_ent *qe, struct callattempt *tmp, int *busies ++*busies; return 0; } - ast_assert(tmp->member->ringinuse || tmp->member->call_pending); ast_copy_string(tech, tmp->interface, sizeof(tech)); if ((location = strchr(tech, '/'))) { @@ -4253,7 +4303,7 @@ static int ring_entry(struct queue_ent *qe, struct callattempt *tmp, int *busies qe->linpos++; ao2_unlock(qe->parent); - member_call_pending_clear(tmp->member); + pending_members_remove(tmp->member); publish_dial_end_event(qe->chan, tmp, NULL, "BUSY"); tmp->stillgoing = 0; @@ -4324,7 +4374,7 @@ static int ring_entry(struct queue_ent *qe, struct callattempt *tmp, int *busies /* Again, keep going even if there's an error */ ast_verb(3, "Couldn't call %s\n", tmp->interface); do_hang(tmp); - member_call_pending_clear(tmp->member); + pending_members_remove(tmp->member); ++*busies; return 0; } @@ -4344,7 +4394,6 @@ static int ring_entry(struct queue_ent *qe, struct callattempt *tmp, int *busies ast_verb(3, "Called %s\n", tmp->interface); - member_call_pending_clear(tmp->member); return 1; } @@ -9472,7 +9521,7 @@ static int manager_queues_summary(struct mansession *s, const struct message *m) ao2_lock(q); /* List queue properties */ - if (ast_strlen_zero(queuefilter) || !strcmp(q->name, queuefilter)) { + if (ast_strlen_zero(queuefilter) || !strcasecmp(q->name, queuefilter)) { /* Reset the necessary local variables if no queuefilter is set*/ qmemcount = 0; qmemavail = 0; @@ -9550,7 +9599,7 @@ static int manager_queues_status(struct mansession *s, const struct message *m) ao2_lock(q); /* List queue properties */ - if (ast_strlen_zero(queuefilter) || !strcmp(q->name, queuefilter)) { + if (ast_strlen_zero(queuefilter) || !strcasecmp(q->name, queuefilter)) { sl = ((q->callscompleted > 0) ? 100 * ((float)q->callscompletedinsl / (float)q->callscompleted) : 0); astman_append(s, "Event: QueueParams\r\n" "Queue: %s\r\n" @@ -10806,6 +10855,8 @@ static int unload_module(void) ast_unload_realtime("queue_members"); ao2_cleanup(queues); + ao2_cleanup(pending_members); + queues = NULL; return 0; } @@ -10833,6 +10884,13 @@ static int load_module(void) return AST_MODULE_LOAD_DECLINE; } + pending_members = ao2_container_alloc( + MAX_CALL_ATTEMPT_BUCKETS, pending_members_hash, pending_members_cmp); + if (!pending_members) { + unload_module(); + return AST_MODULE_LOAD_DECLINE; + } + use_weight = 0; if (reload_handler(0, &mask, NULL)) { diff --git a/apps/app_talkdetect.c b/apps/app_talkdetect.c index a021252de..f7086fdd9 100644 --- a/apps/app_talkdetect.c +++ b/apps/app_talkdetect.c @@ -26,7 +26,7 @@ */ /*** MODULEINFO - <support_level>extended</support_level> + <support_level>core</support_level> ***/ #include "asterisk.h" diff --git a/apps/app_voicemail.c b/apps/app_voicemail.c index 798f844fa..a0b668d8d 100644 --- a/apps/app_voicemail.c +++ b/apps/app_voicemail.c @@ -622,12 +622,12 @@ static AST_LIST_HEAD_STATIC(vmstates, vmstate); #define OPERATOR_EXIT 300 enum vm_box { - NEW_FOLDER, - OLD_FOLDER, - WORK_FOLDER, - FAMILY_FOLDER, - FRIENDS_FOLDER, - GREETINGS_FOLDER + NEW_FOLDER = 0, + OLD_FOLDER = 1, + WORK_FOLDER = 2, + FAMILY_FOLDER = 3, + FRIENDS_FOLDER = 4, + GREETINGS_FOLDER = -1 }; enum vm_option_flags { diff --git a/apps/confbridge/conf_chan_announce.c b/apps/confbridge/conf_chan_announce.c index 6596a8537..ee4660687 100644 --- a/apps/confbridge/conf_chan_announce.c +++ b/apps/confbridge/conf_chan_announce.c @@ -199,7 +199,6 @@ int conf_announce_channel_push(struct ast_channel *ast) /* Impart the output channel into the bridge */ if (ast_bridge_impart(p->bridge, chan, NULL, features, AST_BRIDGE_IMPART_CHAN_DEPARTABLE)) { - ast_bridge_features_destroy(features); ast_channel_unref(chan); return -1; } diff --git a/bridges/bridge_softmix.c b/bridges/bridge_softmix.c index e3df18fe5..fe058e4e6 100644 --- a/bridges/bridge_softmix.c +++ b/bridges/bridge_softmix.c @@ -359,6 +359,9 @@ static void set_softmix_bridge_data(int rate, int interval, struct ast_bridge_ch struct ast_format *slin_format; int setup_fail; + /* The callers have already ensured that sc is never NULL. */ + ast_assert(sc != NULL); + slin_format = ast_format_cache_get_slin_by_rate(rate); ast_mutex_lock(&sc->lock); @@ -714,7 +717,7 @@ static int softmix_bridge_write(struct ast_bridge *bridge, struct ast_bridge_cha { int res = 0; - if (!bridge->tech_pvt || (bridge_channel && !bridge_channel->tech_pvt)) { + if (!bridge->tech_pvt || !bridge_channel || !bridge_channel->tech_pvt) { /* "Accept" the frame and discard it. */ return 0; } @@ -984,6 +987,11 @@ static int softmix_mixing_loop(struct ast_bridge *bridge) AST_LIST_TRAVERSE(&bridge->channels, bridge_channel, entry) { struct softmix_channel *sc = bridge_channel->tech_pvt; + if (!sc) { + /* This channel failed to join successfully. */ + continue; + } + /* Update the sample rate to match the bridge's native sample rate if necessary. */ if (update_all_rates) { set_softmix_bridge_data(softmix_data->internal_rate, softmix_data->internal_mixing_interval, bridge_channel, 1); @@ -1019,7 +1027,8 @@ static int softmix_mixing_loop(struct ast_bridge *bridge) AST_LIST_TRAVERSE(&bridge->channels, bridge_channel, entry) { struct softmix_channel *sc = bridge_channel->tech_pvt; - if (bridge_channel->suspended) { + if (!sc || bridge_channel->suspended) { + /* This channel failed to join successfully or is suspended. */ continue; } diff --git a/channels/chan_sip.c b/channels/chan_sip.c index 91fb0b546..cbbda4e73 100644 --- a/channels/chan_sip.c +++ b/channels/chan_sip.c @@ -35170,17 +35170,19 @@ static int load_module(void) /* And start the monitor for the first time */ restart_monitor(); - ast_realtime_require_field(ast_check_realtime("sipregs") ? "sipregs" : "sippeers", - "name", RQ_CHAR, 10, - "ipaddr", RQ_CHAR, INET6_ADDRSTRLEN - 1, - "port", RQ_UINTEGER2, 5, - "regseconds", RQ_INTEGER4, 11, - "defaultuser", RQ_CHAR, 10, - "fullcontact", RQ_CHAR, 35, - "regserver", RQ_CHAR, 20, - "useragent", RQ_CHAR, 20, - "lastms", RQ_INTEGER4, 11, - SENTINEL); + if (sip_cfg.peer_rtupdate) { + ast_realtime_require_field(ast_check_realtime("sipregs") ? "sipregs" : "sippeers", + "name", RQ_CHAR, 10, + "ipaddr", RQ_CHAR, INET6_ADDRSTRLEN - 1, + "port", RQ_UINTEGER2, 5, + "regseconds", RQ_INTEGER4, 11, + "defaultuser", RQ_CHAR, 10, + "fullcontact", RQ_CHAR, 35, + "regserver", RQ_CHAR, 20, + "useragent", RQ_CHAR, 20, + "lastms", RQ_INTEGER4, 11, + SENTINEL); + } sip_register_tests(); @@ -35199,7 +35201,7 @@ static int unload_module(void) struct sip_pvt *p; struct sip_threadinfo *th; struct ao2_iterator i; - int wait_count; + struct timeval start; ast_sip_api_provider_unregister(); @@ -35349,11 +35351,11 @@ static int unload_module(void) * joinable. They can die on their own and remove themselves * from the container thus resulting in a huge memory leak. */ - wait_count = 1000; - while (ao2_container_count(threadt) && --wait_count) { + start = ast_tvnow(); + while (ao2_container_count(threadt) && (ast_tvdiff_sec(ast_tvnow(), start) < 5)) { sched_yield(); } - if (!wait_count) { + if (ao2_container_count(threadt)) { ast_debug(2, "TCP/TLS thread container did not become empty :(\n"); } diff --git a/configs/samples/sip.conf.sample b/configs/samples/sip.conf.sample index a24ab30a6..5c3238e2a 100644 --- a/configs/samples/sip.conf.sample +++ b/configs/samples/sip.conf.sample @@ -1479,7 +1479,6 @@ srvlookup=yes ; Enable DNS SRV lookups on outbound calls ;allow=ulaw ;allow=alaw ;mailbox=1234@default,1233@default ; Subscribe to status of multiple mailboxes -;registertrying=yes ; Send a 100 Trying when the device registers. ;[snom] ;type=friend ; Friends place calls and receive calls diff --git a/funcs/func_odbc.c b/funcs/func_odbc.c index 0af3fd1c8..d17debd4c 100644 --- a/funcs/func_odbc.c +++ b/funcs/func_odbc.c @@ -137,6 +137,163 @@ struct odbc_datastore { char names[0]; }; +/* \brief Data source name + * + * This holds data that pertains to a DSN + */ +struct dsn { + /*! A connection to the database */ + struct odbc_obj *connection; + /*! The name of the DSN as defined in res_odbc.conf */ + char name[0]; +}; + +#define DSN_BUCKETS 37 + +struct ao2_container *dsns; + +static int dsn_hash(const void *obj, const int flags) +{ + const struct dsn *object; + const char *key; + + switch (flags & OBJ_SEARCH_MASK) { + case OBJ_SEARCH_KEY: + key = obj; + break; + case OBJ_SEARCH_OBJECT: + object = obj; + key = object->name; + break; + default: + ast_assert(0); + return 0; + } + return ast_str_hash(key); +} + +static int dsn_cmp(void *obj, void *arg, int flags) +{ + const struct dsn *object_left = obj; + const struct dsn *object_right = arg; + const char *right_key = arg; + int cmp; + + switch (flags & OBJ_SEARCH_MASK) { + case OBJ_SEARCH_OBJECT: + right_key = object_right->name; + /* Fall through */ + case OBJ_SEARCH_KEY: + cmp = strcmp(object_left->name, right_key); + break; + case OBJ_SEARCH_PARTIAL_KEY: + cmp = strncmp(object_left->name, right_key, strlen(right_key)); + break; + default: + cmp = 0; + break; + } + + if (cmp) { + return 0; + } + + return CMP_MATCH; +} + +static void dsn_destructor(void *obj) +{ + struct dsn *dsn = obj; + + if (dsn->connection) { + ast_odbc_release_obj(dsn->connection); + } +} + +/*! + * \brief Create a DSN and connect to the database + * + * \param name The name of the DSN as found in res_odbc.conf + * \retval NULL Fail + * \retval non-NULL The newly-created structure + */ +static struct dsn *create_dsn(const char *name) +{ + struct dsn *dsn; + + dsn = ao2_alloc(sizeof(*dsn) + strlen(name) + 1, dsn_destructor); + if (!dsn) { + return NULL; + } + + /* Safe */ + strcpy(dsn->name, name); + + dsn->connection = ast_odbc_request_obj(name, 0); + if (!dsn->connection) { + ao2_ref(dsn, -1); + return NULL; + } + + if (!ao2_link_flags(dsns, dsn, OBJ_NOLOCK)) { + ao2_ref(dsn, -1); + return NULL; + } + + return dsn; +} + +/*! + * \brief Retrieve a DSN, or create it if it does not exist. + * + * The created DSN is returned locked. This should be inconsequential + * to callers in most cases. + * + * When finished with the returned structure, the caller must call + * \ref release_dsn + * + * \param name Name of the DSN as found in res_odbc.conf + * \retval NULL Unable to retrieve or create the DSN + * \retval non-NULL The retrieved/created locked DSN + */ +static struct dsn *get_dsn(const char *name) +{ + struct dsn *dsn; + + ao2_lock(dsns); + dsn = ao2_find(dsns, name, OBJ_SEARCH_KEY | OBJ_NOLOCK); + if (!dsn) { + dsn = create_dsn(name); + } + ao2_unlock(dsns); + + if (!dsn) { + return NULL; + } + + ao2_lock(dsn->connection); + + return dsn; +} + +/*! + * \brief Unlock and unreference a DSN + * + * \param dsn The dsn to unlock and unreference + * \return NULL + */ +static void *release_dsn(struct dsn *dsn) +{ + if (!dsn) { + return NULL; + } + + ao2_unlock(dsn->connection); + ao2_ref(dsn, -1); + + return NULL; +} + static AST_RWLIST_HEAD_STATIC(queries, acf_odbc_query); static int resultcount = 0; @@ -214,7 +371,7 @@ static int acf_odbc_write(struct ast_channel *chan, const char *cmd, char *s, co struct odbc_obj *obj = NULL; struct acf_odbc_query *query; char *t, varname[15]; - int i, dsn, bogus_chan = 0; + int i, dsn_num, bogus_chan = 0; int transactional = 0; AST_DECLARE_APP_ARGS(values, AST_APP_ARG(field)[100]; @@ -227,6 +384,7 @@ static int acf_odbc_write(struct ast_channel *chan, const char *cmd, char *s, co struct ast_str *buf = ast_str_thread_get(&sql_buf, 16); struct ast_str *insertbuf = ast_str_thread_get(&sql2_buf, 16); const char *status = "FAILURE"; + struct dsn *dsn = NULL; if (!buf || !insertbuf) { return -1; @@ -324,17 +482,21 @@ static int acf_odbc_write(struct ast_channel *chan, const char *cmd, char *s, co * to multiple DSNs. We MUST have a single handle all the way through the * transaction, or else we CANNOT enforce atomicity. */ - for (dsn = 0; dsn < 5; dsn++) { - if (!ast_strlen_zero(query->writehandle[dsn])) { + for (dsn_num = 0; dsn_num < 5; dsn_num++) { + if (!ast_strlen_zero(query->writehandle[dsn_num])) { if (transactional) { /* This can only happen second time through or greater. */ ast_log(LOG_WARNING, "Transactions do not work well with multiple DSNs for 'writehandle'\n"); } - if ((obj = ast_odbc_retrieve_transaction_obj(chan, query->writehandle[dsn]))) { + if ((obj = ast_odbc_retrieve_transaction_obj(chan, query->writehandle[dsn_num]))) { transactional = 1; } else { - obj = ast_odbc_request_obj(query->writehandle[dsn], 0); + dsn = get_dsn(query->writehandle[dsn_num]); + if (!dsn) { + continue; + } + obj = dsn->connection; transactional = 0; } @@ -342,10 +504,7 @@ static int acf_odbc_write(struct ast_channel *chan, const char *cmd, char *s, co break; } - if (obj && !transactional) { - ast_odbc_release_obj(obj); - obj = NULL; - } + dsn = release_dsn(dsn); } } @@ -358,25 +517,25 @@ static int acf_odbc_write(struct ast_channel *chan, const char *cmd, char *s, co status = "SUCCESS"; } else if (query->sql_insert) { - if (obj && !transactional) { - ast_odbc_release_obj(obj); - obj = NULL; - } + dsn = release_dsn(dsn); - for (transactional = 0, dsn = 0; dsn < 5; dsn++) { - if (!ast_strlen_zero(query->writehandle[dsn])) { + for (transactional = 0, dsn_num = 0; dsn_num < 5; dsn_num++) { + if (!ast_strlen_zero(query->writehandle[dsn_num])) { if (transactional) { /* This can only happen second time through or greater. */ ast_log(LOG_WARNING, "Transactions do not work well with multiple DSNs for 'writehandle'\n"); } else if (obj) { - ast_odbc_release_obj(obj); - obj = NULL; + dsn = release_dsn(dsn); } - if ((obj = ast_odbc_retrieve_transaction_obj(chan, query->writehandle[dsn]))) { + if ((obj = ast_odbc_retrieve_transaction_obj(chan, query->writehandle[dsn_num]))) { transactional = 1; } else { - obj = ast_odbc_request_obj(query->writehandle[dsn], 0); + dsn = get_dsn(query->writehandle[dsn_num]); + if (!dsn) { + continue; + } + obj = dsn->connection; transactional = 0; } if (obj) { @@ -406,10 +565,7 @@ static int acf_odbc_write(struct ast_channel *chan, const char *cmd, char *s, co pbx_builtin_setvar_helper(chan, "ODBCSTATUS", status); } - if (obj && !transactional) { - ast_odbc_release_obj(obj); - obj = NULL; - } + dsn = release_dsn(dsn); if (!bogus_chan) { ast_autoservice_stop(chan); @@ -420,11 +576,10 @@ static int acf_odbc_write(struct ast_channel *chan, const char *cmd, char *s, co static int acf_odbc_read(struct ast_channel *chan, const char *cmd, char *s, char *buf, size_t len) { - struct odbc_obj *obj = NULL; struct acf_odbc_query *query; char varname[15], rowcount[12] = "-1"; struct ast_str *colnames = ast_str_thread_get(&colnames_buf, 16); - int res, x, y, buflen = 0, escapecommas, rowlimit = 1, multirow = 0, dsn, bogus_chan = 0; + int res, x, y, buflen = 0, escapecommas, rowlimit = 1, multirow = 0, dsn_num, bogus_chan = 0; AST_DECLARE_APP_ARGS(args, AST_APP_ARG(field)[100]; ); @@ -436,6 +591,7 @@ static int acf_odbc_read(struct ast_channel *chan, const char *cmd, char *s, cha struct odbc_datastore_row *row = NULL; struct ast_str *sql = ast_str_thread_get(&sql_buf, 16); const char *status = "FAILURE"; + struct dsn *dsn = NULL; if (!sql || !colnames) { if (chan) { @@ -523,28 +679,23 @@ static int acf_odbc_read(struct ast_channel *chan, const char *cmd, char *s, cha } AST_RWLIST_UNLOCK(&queries); - for (dsn = 0; dsn < 5; dsn++) { - if (!ast_strlen_zero(query->readhandle[dsn])) { - obj = ast_odbc_request_obj(query->readhandle[dsn], 0); - if (obj) { - stmt = ast_odbc_direct_execute(obj, generic_execute, ast_str_buffer(sql)); + for (dsn_num = 0; dsn_num < 5; dsn_num++) { + if (!ast_strlen_zero(query->readhandle[dsn_num])) { + dsn = get_dsn(query->readhandle[dsn_num]); + if (!dsn) { + continue; } + stmt = ast_odbc_direct_execute(dsn->connection, generic_execute, ast_str_buffer(sql)); } if (stmt) { break; } - if (obj) { - ast_odbc_release_obj(obj); - obj = NULL; - } + dsn = release_dsn(dsn); } if (!stmt) { ast_log(LOG_ERROR, "Unable to execute query [%s]\n", ast_str_buffer(sql)); - if (obj) { - ast_odbc_release_obj(obj); - obj = NULL; - } + dsn = release_dsn(dsn); if (!bogus_chan) { pbx_builtin_setvar_helper(chan, "ODBCROWS", rowcount); ast_autoservice_stop(chan); @@ -558,8 +709,7 @@ static int acf_odbc_read(struct ast_channel *chan, const char *cmd, char *s, cha ast_log(LOG_WARNING, "SQL Column Count error!\n[%s]\n\n", ast_str_buffer(sql)); SQLCloseCursor(stmt); SQLFreeHandle (SQL_HANDLE_STMT, stmt); - ast_odbc_release_obj(obj); - obj = NULL; + dsn = release_dsn(dsn); if (!bogus_chan) { pbx_builtin_setvar_helper(chan, "ODBCROWS", rowcount); ast_autoservice_stop(chan); @@ -583,8 +733,7 @@ static int acf_odbc_read(struct ast_channel *chan, const char *cmd, char *s, cha } SQLCloseCursor(stmt); SQLFreeHandle(SQL_HANDLE_STMT, stmt); - ast_odbc_release_obj(obj); - obj = NULL; + dsn = release_dsn(dsn); if (!bogus_chan) { pbx_builtin_setvar_helper(chan, "ODBCROWS", rowcount); pbx_builtin_setvar_helper(chan, "ODBCSTATUS", status); @@ -607,8 +756,7 @@ static int acf_odbc_read(struct ast_channel *chan, const char *cmd, char *s, cha odbc_datastore_free(resultset); SQLCloseCursor(stmt); SQLFreeHandle(SQL_HANDLE_STMT, stmt); - ast_odbc_release_obj(obj); - obj = NULL; + dsn = release_dsn(dsn); if (!bogus_chan) { pbx_builtin_setvar_helper(chan, "ODBCSTATUS", "MEMERROR"); ast_autoservice_stop(chan); @@ -640,8 +788,7 @@ static int acf_odbc_read(struct ast_channel *chan, const char *cmd, char *s, cha odbc_datastore_free(resultset); SQLCloseCursor(stmt); SQLFreeHandle(SQL_HANDLE_STMT, stmt); - ast_odbc_release_obj(obj); - obj = NULL; + dsn = release_dsn(dsn); if (!bogus_chan) { pbx_builtin_setvar_helper(chan, "ODBCROWS", rowcount); pbx_builtin_setvar_helper(chan, "ODBCSTATUS", "MEMERROR"); @@ -750,8 +897,7 @@ end_acf_read: odbc_datastore_free(resultset); SQLCloseCursor(stmt); SQLFreeHandle(SQL_HANDLE_STMT, stmt); - ast_odbc_release_obj(obj); - obj = NULL; + dsn = release_dsn(dsn); pbx_builtin_setvar_helper(chan, "ODBCSTATUS", "MEMERROR"); ast_autoservice_stop(chan); return -1; @@ -764,8 +910,7 @@ end_acf_read: } SQLCloseCursor(stmt); SQLFreeHandle(SQL_HANDLE_STMT, stmt); - ast_odbc_release_obj(obj); - obj = NULL; + dsn = release_dsn(dsn); if (resultset && !multirow) { /* Fetch the first resultset */ if (!acf_fetch(chan, "", buf, buf, len)) { @@ -1192,8 +1337,8 @@ static char *cli_odbc_read(struct ast_cli_entry *e, int cmd, struct ast_cli_args if (a->argc == 5 && !strcmp(a->argv[4], "exec")) { /* Execute the query */ - struct odbc_obj *obj = NULL; - int dsn, executed = 0; + struct dsn *dsn = NULL; + int dsn_num, executed = 0; SQLHSTMT stmt; int rows = 0, res, x; SQLSMALLINT colcount = 0, collength; @@ -1207,19 +1352,18 @@ static char *cli_odbc_read(struct ast_cli_entry *e, int cmd, struct ast_cli_args return CLI_SUCCESS; } - for (dsn = 0; dsn < 5; dsn++) { - if (ast_strlen_zero(query->readhandle[dsn])) { + for (dsn_num = 0; dsn_num < 5; dsn_num++) { + if (ast_strlen_zero(query->readhandle[dsn_num])) { continue; } - ast_debug(1, "Found handle %s\n", query->readhandle[dsn]); - if (!(obj = ast_odbc_request_obj(query->readhandle[dsn], 0))) { + dsn = get_dsn(query->readhandle[dsn_num]); + if (!dsn) { continue; } + ast_debug(1, "Found handle %s\n", query->readhandle[dsn_num]); - ast_debug(1, "Got obj\n"); - if (!(stmt = ast_odbc_direct_execute(obj, generic_execute, ast_str_buffer(sql)))) { - ast_odbc_release_obj(obj); - obj = NULL; + if (!(stmt = ast_odbc_direct_execute(dsn->connection, generic_execute, ast_str_buffer(sql)))) { + dsn = release_dsn(dsn); continue; } @@ -1230,8 +1374,7 @@ static char *cli_odbc_read(struct ast_cli_entry *e, int cmd, struct ast_cli_args ast_cli(a->fd, "SQL Column Count error!\n[%s]\n\n", ast_str_buffer(sql)); SQLCloseCursor(stmt); SQLFreeHandle (SQL_HANDLE_STMT, stmt); - ast_odbc_release_obj(obj); - obj = NULL; + dsn = release_dsn(dsn); AST_RWLIST_UNLOCK(&queries); return CLI_SUCCESS; } @@ -1240,10 +1383,9 @@ static char *cli_odbc_read(struct ast_cli_entry *e, int cmd, struct ast_cli_args if ((res != SQL_SUCCESS) && (res != SQL_SUCCESS_WITH_INFO)) { SQLCloseCursor(stmt); SQLFreeHandle(SQL_HANDLE_STMT, stmt); - ast_odbc_release_obj(obj); - obj = NULL; + dsn = release_dsn(dsn); if (res == SQL_NO_DATA) { - ast_cli(a->fd, "Returned %d rows. Query executed on handle %d:%s [%s]\n", rows, dsn, query->readhandle[dsn], ast_str_buffer(sql)); + ast_cli(a->fd, "Returned %d rows. Query executed on handle %d:%s [%s]\n", rows, dsn_num, query->readhandle[dsn_num], ast_str_buffer(sql)); break; } else { ast_cli(a->fd, "Error %d in FETCH [%s]\n", res, ast_str_buffer(sql)); @@ -1270,8 +1412,7 @@ static char *cli_odbc_read(struct ast_cli_entry *e, int cmd, struct ast_cli_args ast_cli(a->fd, "SQL Get Data error %d!\n[%s]\n\n", res, ast_str_buffer(sql)); SQLCloseCursor(stmt); SQLFreeHandle(SQL_HANDLE_STMT, stmt); - ast_odbc_release_obj(obj); - obj = NULL; + dsn = release_dsn(dsn); AST_RWLIST_UNLOCK(&queries); return CLI_SUCCESS; } @@ -1289,15 +1430,11 @@ static char *cli_odbc_read(struct ast_cli_entry *e, int cmd, struct ast_cli_args } SQLCloseCursor(stmt); SQLFreeHandle(SQL_HANDLE_STMT, stmt); - ast_odbc_release_obj(obj); - obj = NULL; - ast_cli(a->fd, "Returned %d row%s. Query executed on handle %d [%s]\n", rows, rows == 1 ? "" : "s", dsn, query->readhandle[dsn]); + dsn = release_dsn(dsn); + ast_cli(a->fd, "Returned %d row%s. Query executed on handle %d [%s]\n", rows, rows == 1 ? "" : "s", dsn_num, query->readhandle[dsn_num]); break; } - if (obj) { - ast_odbc_release_obj(obj); - obj = NULL; - } + dsn = release_dsn(dsn); if (!executed) { ast_cli(a->fd, "Failed to execute query. [%s]\n", ast_str_buffer(sql)); @@ -1420,30 +1557,29 @@ static char *cli_odbc_write(struct ast_cli_entry *e, int cmd, struct ast_cli_arg if (a->argc == 6 && !strcmp(a->argv[5], "exec")) { /* Execute the query */ - struct odbc_obj *obj = NULL; - int dsn, executed = 0; + struct dsn *dsn; + int dsn_num, executed = 0; SQLHSTMT stmt; SQLLEN rows = -1; - for (dsn = 0; dsn < 5; dsn++) { - if (ast_strlen_zero(query->writehandle[dsn])) { + for (dsn_num = 0; dsn_num < 5; dsn_num++) { + if (ast_strlen_zero(query->writehandle[dsn_num])) { continue; } - if (!(obj = ast_odbc_request_obj(query->writehandle[dsn], 0))) { + dsn = get_dsn(query->writehandle[dsn_num]); + if (!dsn) { continue; } - if (!(stmt = ast_odbc_direct_execute(obj, generic_execute, ast_str_buffer(sql)))) { - ast_odbc_release_obj(obj); - obj = NULL; + if (!(stmt = ast_odbc_direct_execute(dsn->connection, generic_execute, ast_str_buffer(sql)))) { + dsn = release_dsn(dsn); continue; } SQLRowCount(stmt, &rows); SQLCloseCursor(stmt); SQLFreeHandle(SQL_HANDLE_STMT, stmt); - ast_odbc_release_obj(obj); - obj = NULL; - ast_cli(a->fd, "Affected %d rows. Query executed on handle %d [%s]\n", (int)rows, dsn, query->writehandle[dsn]); + dsn = release_dsn(dsn); + ast_cli(a->fd, "Affected %d rows. Query executed on handle %d [%s]\n", (int)rows, dsn_num, query->writehandle[dsn_num]); executed = 1; break; } @@ -1470,6 +1606,11 @@ static int load_module(void) char *catg; struct ast_flags config_flags = { 0 }; + dsns = ao2_container_alloc(DSN_BUCKETS, dsn_hash, dsn_cmp); + if (!dsns) { + return AST_MODULE_LOAD_DECLINE; + } + res |= ast_custom_function_register(&fetch_function); res |= ast_register_application_xml(app_odbcfinish, exec_odbcfinish); AST_RWLIST_WRLOCK(&queries); @@ -1478,6 +1619,7 @@ static int load_module(void) if (!cfg || cfg == CONFIG_STATUS_FILEINVALID) { ast_log(LOG_NOTICE, "Unable to load config for func_odbc: %s\n", config); AST_RWLIST_UNLOCK(&queries); + ao2_ref(dsns, -1); return AST_MODULE_LOAD_DECLINE; } @@ -1531,6 +1673,8 @@ static int unload_module(void) AST_RWLIST_WRLOCK(&queries); AST_RWLIST_UNLOCK(&queries); + + ao2_ref(dsns, -1); return res; } diff --git a/include/asterisk/astobj2.h b/include/asterisk/astobj2.h index 692cc7cb4..4bd44db76 100644 --- a/include/asterisk/astobj2.h +++ b/include/asterisk/astobj2.h @@ -19,6 +19,7 @@ #include "asterisk/compat.h" #include "asterisk/lock.h" +#include "asterisk/inline_api.h" /*! \file * \ref AstObj2 @@ -638,6 +639,46 @@ int __ao2_trylock(void *a, enum ao2_lock_req lock_how, const char *file, const c void *ao2_object_get_lockaddr(void *obj); +/*! + * \brief Increment reference count on an object and lock it + * \since 13.9.0 + * + * \param[in] obj A pointer to the ao2 object + * \retval 0 The object is not an ao2 object or wasn't locked successfully + * \retval 1 The object's reference count was incremented and was locked + */ +AST_INLINE_API( +int ao2_ref_and_lock(void *obj), +{ + ao2_ref(obj, +1); + if (ao2_lock(obj)) { + ao2_ref(obj, -1); + return 0; + } + return 1; +} +) + +/*! + * \brief Unlock an object and decrement its reference count + * \since 13.9.0 + * + * \param[in] obj A pointer to the ao2 object + * \retval 0 The object is not an ao2 object or wasn't unlocked successfully + * \retval 1 The object was unlocked and it's reference count was decremented + */ +AST_INLINE_API( +int ao2_unlock_and_unref(void *obj), +{ + if (ao2_unlock(obj)) { + return 0; + } + ao2_ref(obj, -1); + + return 1; +} +) + /*! Global ao2 object holder structure. */ struct ao2_global_obj { /*! Access lock to the held ao2 object. */ @@ -1985,4 +2026,97 @@ void ao2_iterator_cleanup(struct ao2_iterator *iter); */ int ao2_iterator_count(struct ao2_iterator *iter); +/*! + * \brief Creates a hash function for a structure string field. + * \param stype The structure type + * \param field The string field in the structure to hash + * + * AO2_STRING_FIELD_HASH_CB(mystruct, myfield) will produce a function + * named mystruct_hash_fn which hashes mystruct->myfield. + */ +#define AO2_STRING_FIELD_HASH_FN(stype, field) \ +static int stype ## _hash_fn(const void *obj, const int flags) \ +{ \ + const struct stype *object = obj; \ + const char *key; \ + switch (flags & OBJ_SEARCH_MASK) { \ + case OBJ_SEARCH_KEY: \ + key = obj; \ + break; \ + case OBJ_SEARCH_OBJECT: \ + key = object->field; \ + break; \ + default: \ + ast_assert(0); \ + return 0; \ + } \ + return ast_str_hash(key); \ +} + +/*! + * \brief Creates a compare function for a structure string field. + * \param stype The structure type + * \param field The string field in the structure to compare + * + * AO2_STRING_FIELD_CMP_FN(mystruct, myfield) will produce a function + * named mystruct_cmp_fn which compares mystruct->myfield. + */ +#define AO2_STRING_FIELD_CMP_FN(stype, field) \ +static int stype ## _cmp_fn(void *obj, void *arg, int flags) \ +{ \ + const struct stype *object_left = obj, *object_right = arg; \ + const char *right_key = arg; \ + int cmp; \ + switch (flags & OBJ_SEARCH_MASK) { \ + case OBJ_SEARCH_OBJECT: \ + right_key = object_right->field; \ + case OBJ_SEARCH_KEY: \ + cmp = strcmp(object_left->field, right_key); \ + break; \ + case OBJ_SEARCH_PARTIAL_KEY: \ + cmp = strncmp(object_left->field, right_key, strlen(right_key)); \ + break; \ + default: \ + cmp = 0; \ + break; \ + } \ + if (cmp) { \ + return 0; \ + } \ + return CMP_MATCH; \ +} + +/*! + * \brief Creates a sort function for a structure string field. + * \param stype The structure type + * \param field The string field in the structure to compare + * + * AO2_STRING_FIELD_SORT_FN(mystruct, myfield) will produce a function + * named mystruct_sort_fn which compares mystruct->myfield. + */ +#define AO2_STRING_FIELD_SORT_FN(stype, field) \ +static int stype ## _sort_fn(const void *obj, const void *arg, int flags) \ +{ \ + const struct stype *object_left = obj; \ + const struct stype *object_right = arg; \ + const char *right_key = arg; \ + int cmp; \ +\ + switch (flags & OBJ_SEARCH_MASK) { \ + case OBJ_SEARCH_OBJECT: \ + right_key = object_right->field; \ + /* Fall through */ \ + case OBJ_SEARCH_KEY: \ + cmp = strcmp(object_left->field, right_key); \ + break; \ + case OBJ_SEARCH_PARTIAL_KEY: \ + cmp = strncmp(object_left->field, right_key, strlen(right_key)); \ + break; \ + default: \ + cmp = 0; \ + break; \ + } \ + return cmp; \ +} + #endif /* _ASTERISK_ASTOBJ2_H */ diff --git a/include/asterisk/bridge_channel_internal.h b/include/asterisk/bridge_channel_internal.h index 7f7d5a88b..fb8e781e8 100644 --- a/include/asterisk/bridge_channel_internal.h +++ b/include/asterisk/bridge_channel_internal.h @@ -151,47 +151,20 @@ int bridge_channel_internal_push_full(struct ast_bridge_channel *bridge_channel, void bridge_channel_internal_pull(struct ast_bridge_channel *bridge_channel); /*! - * \brief Internal bridge channel wait condition and associated result. - */ -struct bridge_channel_internal_cond { - /*! Lock for the data structure */ - ast_mutex_t lock; - /*! Wait condition */ - ast_cond_t cond; - /*! Wait until done */ - int done; - /*! The bridge channel */ - struct ast_bridge_channel *bridge_channel; -}; - -/*! - * \internal - * \brief Wait for the expected signal. - * \since 13.5.0 - * - * \param cond the wait object - * - * \return Nothing - */ -void bridge_channel_internal_wait(struct bridge_channel_internal_cond *cond); - -/*! - * \internal - * \brief Signal the condition wait. - * \since 13.5.0 + * \brief Signal imparting threads to wake up. + * \since 13.9.0 * - * \param cond the wait object + * \param chan Channel imparted that we need to signal. * * \return Nothing */ -void bridge_channel_internal_signal(struct bridge_channel_internal_cond *cond); +void bridge_channel_impart_signal(struct ast_channel *chan); /*! * \internal * \brief Join the bridge_channel to the bridge (blocking) * * \param bridge_channel The Channel in the bridge - * \param cond data used for signaling * * \note The bridge_channel->swap holds a channel reference for the swap * channel going into the bridging system. The ref ensures that the swap @@ -206,8 +179,7 @@ void bridge_channel_internal_signal(struct bridge_channel_internal_cond *cond); * \retval 0 bridge channel successfully joined the bridge * \retval -1 bridge channel failed to join the bridge */ -int bridge_channel_internal_join(struct ast_bridge_channel *bridge_channel, - struct bridge_channel_internal_cond *cond); +int bridge_channel_internal_join(struct ast_bridge_channel *bridge_channel); /*! * \internal diff --git a/include/asterisk/bridge_technology.h b/include/asterisk/bridge_technology.h index 7de573a23..7f5d746f8 100644 --- a/include/asterisk/bridge_technology.h +++ b/include/asterisk/bridge_technology.h @@ -107,6 +107,9 @@ struct ast_bridge_technology { * \retval -1 on failure * * \note On entry, bridge is already locked. + * + * \note The bridge technology must tollerate a failed to join channel + * until it can be kicked from the bridge. */ int (*join)(struct ast_bridge *bridge, struct ast_bridge_channel *bridge_channel); /*! diff --git a/include/asterisk/features.h b/include/asterisk/features.h index b63124c2f..a4aed5d18 100644 --- a/include/asterisk/features.h +++ b/include/asterisk/features.h @@ -51,6 +51,7 @@ int ast_bridge_call(struct ast_channel *chan, struct ast_channel *peer, struct a /*! * \brief Bridge a call, and add additional flags to the bridge * + * \details * This does the same thing as \ref ast_bridge_call, except that once the bridge * is created, the provided flags are set on the bridge. The provided flags are * added to the bridge's flags; they will not clear any flags already set. @@ -70,6 +71,7 @@ int ast_bridge_call_with_flags(struct ast_channel *chan, struct ast_channel *pee * \brief Add an arbitrary channel to a bridge * \since 12.0.0 * + * \details * The channel that is being added to the bridge can be in any state: unbridged, * bridged, answered, unanswered, etc. The channel will be added asynchronously, * meaning that when this function returns once the channel has been added to @@ -87,11 +89,16 @@ int ast_bridge_call_with_flags(struct ast_channel *chan, struct ast_channel *pee * \param features Features for this channel in the bridge * \param play_tone Indicates if a tone should be played to the channel * \param xfersound Sound that should be used to indicate transfer with play_tone + * + * \note The features parameter must be NULL or obtained by + * ast_bridge_features_new(). You must not dereference features + * after calling even if the call fails. + * * \retval 0 Success * \retval -1 Failure */ int ast_bridge_add_channel(struct ast_bridge *bridge, struct ast_channel *chan, - struct ast_bridge_features *features, int play_tone, const char *xfersound); + struct ast_bridge_features *features, int play_tone, const char *xfersound); diff --git a/include/asterisk/res_pjsip.h b/include/asterisk/res_pjsip.h index 125473fa5..696074159 100644 --- a/include/asterisk/res_pjsip.h +++ b/include/asterisk/res_pjsip.h @@ -19,6 +19,13 @@ #ifndef _RES_PJSIP_H #define _RES_PJSIP_H +#include <pjsip.h> +/* Needed for SUBSCRIBE, NOTIFY, and PUBLISH method definitions */ +#include <pjsip_simple.h> +#include <pjsip/sip_transaction.h> +#include <pj/timer.h> +#include <pjlib.h> + #include "asterisk/stringfields.h" /* Needed for struct ast_sockaddr */ #include "asterisk/netsock2.h" @@ -1174,8 +1181,9 @@ struct ast_sip_auth *ast_sip_get_artificial_auth(void); */ struct ast_sip_endpoint *ast_sip_get_artificial_endpoint(void); -/*! - * \page Threading model for SIP +/*! \defgroup pjsip_threading PJSIP Threading Model + * @{ + * \page PJSIP PJSIP Threading Model * * There are three major types of threads that SIP will have to deal with: * \li Asterisk threads @@ -1224,6 +1232,19 @@ struct ast_sip_endpoint *ast_sip_get_artificial_endpoint(void); * previous tasks pushed with the same serializer have completed. For more information * on serializers and the benefits they provide, see \ref ast_threadpool_serializer * + * \par Scheduler + * + * Some situations require that a task run periodically or at a future time. Normally + * the ast_sched functionality would be used but ast_sched only uses 1 thread for all + * tasks and that thread isn't registered with PJLIB and therefore can't do any PJSIP + * related work. + * + * ast_sip_sched uses ast_sched only as a scheduled queue. When a task is ready to run, + * it's pushed to a Serializer to be invoked asynchronously by a Servant. This ensures + * that the task is executed in a PJLIB registered thread and allows the ast_sched thread + * to immediately continue processing the queue. The Serializer used by ast_sip_sched + * is one of your choosing or a random one from the res_pjsip pool if you don't choose one. + * * \note * * Do not make assumptions about individual threads based on a corresponding serializer. @@ -1232,6 +1253,8 @@ struct ast_sip_endpoint *ast_sip_get_artificial_endpoint(void); * tasks, even though they are all guaranteed to be executed in sequence. */ +typedef int (*ast_sip_task)(void *user_data); + /*! * \brief Create a new serializer for SIP tasks * @@ -1369,6 +1392,214 @@ int ast_sip_push_task_synchronous(struct ast_taskprocessor *serializer, int (*si int ast_sip_thread_is_servant(void); /*! + * \brief Task flags for the res_pjsip scheduler + * + * The default is AST_SIP_SCHED_TASK_FIXED + * | AST_SIP_SCHED_TASK_DATA_NOT_AO2 + * | AST_SIP_SCHED_TASK_DATA_NO_CLEANUP + * | AST_SIP_SCHED_TASK_PERIODIC + */ +enum ast_sip_scheduler_task_flags { + /*! + * The defaults + */ + AST_SIP_SCHED_TASK_DEFAULTS = (0 << 0), + + /*! + * Run at a fixed interval. + * Stop scheduling if the callback returns 0. + * Any other value is ignored. + */ + AST_SIP_SCHED_TASK_FIXED = (0 << 0), + /*! + * Run at a variable interval. + * Stop scheduling if the callback returns 0. + * Any other return value is used as the new interval. + */ + AST_SIP_SCHED_TASK_VARIABLE = (1 << 0), + + /*! + * The task data is not an AO2 object. + */ + AST_SIP_SCHED_TASK_DATA_NOT_AO2 = (0 << 1), + /*! + * The task data is an AO2 object. + * A reference count will be held by the scheduler until + * after the task has run for the final time (if ever). + */ + AST_SIP_SCHED_TASK_DATA_AO2 = (1 << 1), + + /*! + * Don't take any cleanup action on the data + */ + AST_SIP_SCHED_TASK_DATA_NO_CLEANUP = (0 << 3), + /*! + * If AST_SIP_SCHED_TASK_DATA_AO2 is set, decrement the reference count + * otherwise call ast_free on it. + */ + AST_SIP_SCHED_TASK_DATA_FREE = ( 1 << 3 ), + + /*! \brief AST_SIP_SCHED_TASK_PERIODIC + * The task is scheduled at multiples of interval + * \see Interval + */ + AST_SIP_SCHED_TASK_PERIODIC = (0 << 4), + /*! \brief AST_SIP_SCHED_TASK_DELAY + * The next invocation of the task is at last finish + interval + * \see Interval + */ + AST_SIP_SCHED_TASK_DELAY = (1 << 4), +}; + +/*! + * \brief Scheduler task data structure + */ +struct ast_sip_sched_task; + +/*! + * \brief Schedule a task to run in the res_pjsip thread pool + * \since 13.9.0 + * + * \param serializer The serializer to use. If NULL, don't use a serializer (see note below) + * \param interval The invocation interval in milliseconds (see note below) + * \param sip_task The task to invoke + * \param name An optional name to associate with the task + * \param task_data Optional data to pass to the task + * \param flags One of enum ast_sip_scheduler_task_type + * + * \returns Pointer to \ref ast_sip_sched_task ao2 object which must be dereferenced when done. + * + * \paragraph Serialization + * + * Specifying a serializer guarantees serialized execution but NOT specifying a serializer + * may still result in tasks being effectively serialized if the thread pool is busy. + * The point of the serializer BTW is not to prevent parallel executions of the SAME task. + * That happens automatically (see below). It's to prevent the task from running at the same + * time as other work using the same serializer, whether or not it's being run by the scheduler. + * + * \paragraph Interval + * + * The interval is used to calculate the next time the task should run. There are two models. + * + * \ref AST_SIP_SCHED_TASK_PERIODIC specifies that the invocations of the task occur at the + * specific interval. That is, every \ref "interval" milliseconds, regardless of how long the task + * takes. If the task takes longer than \ref interval, it will be scheduled at the next available + * multiple of \ref interval. For exmaple: If the task has an interval of 60 seconds and the task + * takes 70 seconds, the next invocation will happen at 120 seconds. + * + * \ref AST_SIP_SCHED_TASK_DELAY specifies that the next invocation of the task should start + * at \ref interval milliseconds after the current invocation has finished. + * + */ +struct ast_sip_sched_task *ast_sip_schedule_task(struct ast_taskprocessor *serializer, + int interval, ast_sip_task sip_task, char *name, void *task_data, + enum ast_sip_scheduler_task_flags flags); + +/*! + * \brief Cancels the next invocation of a task + * \since 13.9.0 + * + * \param schtd The task structure pointer + * \retval 0 Success + * \retval -1 Failure + * \note Only cancels future invocations not the currently running invocation. + */ +int ast_sip_sched_task_cancel(struct ast_sip_sched_task *schtd); + +/*! + * \brief Cancels the next invocation of a task by name + * \since 13.9.0 + * + * \param name The task name + * \retval 0 Success + * \retval -1 Failure + * \note Only cancels future invocations not the currently running invocation. + */ +int ast_sip_sched_task_cancel_by_name(const char *name); + +/*! + * \brief Gets the last start and end times of the task + * \since 13.9.0 + * + * \param schtd The task structure pointer + * \param[out] when_queued Pointer to a timeval structure to contain the time when queued + * \param[out] last_start Pointer to a timeval structure to contain the time when last started + * \param[out] last_end Pointer to a timeval structure to contain the time when last ended + * \retval 0 Success + * \retval -1 Failure + * \note Any of the pointers can be NULL if you don't need them. + */ +int ast_sip_sched_task_get_times(struct ast_sip_sched_task *schtd, + struct timeval *when_queued, struct timeval *last_start, struct timeval *last_end); + +/*! + * \brief Gets the last start and end times of the task by name + * \since 13.9.0 + * + * \param name The task name + * \param[out] when_queued Pointer to a timeval structure to contain the time when queued + * \param[out] last_start Pointer to a timeval structure to contain the time when last started + * \param[out] last_end Pointer to a timeval structure to contain the time when last ended + * \retval 0 Success + * \retval -1 Failure + * \note Any of the pointers can be NULL if you don't need them. + */ +int ast_sip_sched_task_get_times_by_name(const char *name, + struct timeval *when_queued, struct timeval *last_start, struct timeval *last_end); + +/*! + * \brief Gets the number of milliseconds until the next invocation + * \since 13.9.0 + * + * \param schtd The task structure pointer + * \return The number of milliseconds until the next invocation or -1 if the task isn't scheduled + */ +int ast_sip_sched_task_get_next_run(struct ast_sip_sched_task *schtd); + +/*! + * \brief Gets the number of milliseconds until the next invocation + * \since 13.9.0 + * + * \param name The task name + * \return The number of milliseconds until the next invocation or -1 if the task isn't scheduled + */ +int ast_sip_sched_task_get_next_run_by_name(const char *name); + +/*! + * \brief Checks if the task is currently running + * \since 13.9.0 + * + * \param schtd The task structure pointer + * \retval 0 not running + * \retval 1 running + */ +int ast_sip_sched_is_task_running(struct ast_sip_sched_task *schtd); + +/*! + * \brief Checks if the task is currently running + * \since 13.9.0 + * + * \param name The task name + * \retval 0 not running or not found + * \retval 1 running + */ +int ast_sip_sched_is_task_running_by_name(const char *name); + +/*! + * \brief Gets the task name + * \since 13.9.0 + * + * \param schtd The task structure pointer + * \retval 0 success + * \retval 1 failure + */ +int ast_sip_sched_task_get_name(struct ast_sip_sched_task *schtd, char *name, size_t maxlen); + +/*! + * @} + */ + +/*! * \brief SIP body description * * This contains a type and subtype that will be added as diff --git a/include/asterisk/stasis.h b/include/asterisk/stasis.h index 16b30ccb3..4fc295bc4 100644 --- a/include/asterisk/stasis.h +++ b/include/asterisk/stasis.h @@ -416,14 +416,14 @@ const struct timeval *stasis_message_timestamp(const struct stasis_message *msg) * May return \c NULL, to indicate no representation. The returned object should * be ast_json_unref()'ed. * - * \param message Message to convert to JSON string. + * \param msg Message to convert to JSON string. * \param sanitize Snapshot sanitization callback. * * \return Newly allocated string with JSON message. * \return \c NULL on error. * \return \c NULL if JSON format is not supported. */ -struct ast_json *stasis_message_to_json(struct stasis_message *message, struct stasis_message_sanitizer *sanitize); +struct ast_json *stasis_message_to_json(struct stasis_message *msg, struct stasis_message_sanitizer *sanitize); /*! * \brief Build the AMI representation of the message. @@ -431,12 +431,21 @@ struct ast_json *stasis_message_to_json(struct stasis_message *message, struct s * May return \c NULL, to indicate no representation. The returned object should * be ao2_cleanup()'ed. * - * \param message Message to convert to AMI. + * \param msg Message to convert to AMI. * \return \c NULL on error. * \return \c NULL if AMI format is not supported. */ -struct ast_manager_event_blob *stasis_message_to_ami( - struct stasis_message *message); +struct ast_manager_event_blob *stasis_message_to_ami(struct stasis_message *msg); + +/*! + * \brief Determine if the given message can be converted to AMI. + * + * \param msg Message to see if can be converted to AMI. + * + * \retval 0 Cannot be converted + * \retval non-zero Can be converted + */ +int stasis_message_can_be_ami(struct stasis_message *msg); /*! * \brief Build the \ref AstGenericEvents representation of the message. @@ -444,12 +453,11 @@ struct ast_manager_event_blob *stasis_message_to_ami( * May return \c NULL, to indicate no representation. The returned object should * be disposed of via \ref ast_event_destroy. * - * \param message Message to convert to AMI. + * \param msg Message to convert to AMI. * \return \c NULL on error. * \return \c NULL if AMI format is not supported. */ -struct ast_event *stasis_message_to_event( - struct stasis_message *message); +struct ast_event *stasis_message_to_event(struct stasis_message *msg); /*! @} */ diff --git a/main/bridge.c b/main/bridge.c index fd83cfb7b..ee5ad735b 100644 --- a/main/bridge.c +++ b/main/bridge.c @@ -420,10 +420,12 @@ static void bridge_channel_complete_join(struct ast_bridge *bridge, struct ast_b bridge->technology->name); if (bridge->technology->join && bridge->technology->join(bridge, bridge_channel)) { - ast_debug(1, "Bridge %s: %p(%s) failed to join %s technology\n", + /* We cannot leave the channel partially in the bridge so we must kick it out */ + ast_debug(1, "Bridge %s: %p(%s) failed to join %s technology (Kicking it out)\n", bridge->uniqueid, bridge_channel, ast_channel_name(bridge_channel->chan), bridge->technology->name); bridge_channel->just_joined = 1; + ast_bridge_channel_leave_bridge(bridge_channel, BRIDGE_CHANNEL_STATE_END, 0); return; } @@ -1483,6 +1485,150 @@ void ast_bridge_notify_masquerade(struct ast_channel *chan) ao2_ref(bridge_channel, -1); } +/*! + * \brief Internal bridge impart wait condition and associated conditional. + */ +struct bridge_channel_impart_cond { + AST_LIST_ENTRY(bridge_channel_impart_cond) node; + /*! Lock for the data structure */ + ast_mutex_t lock; + /*! Wait condition */ + ast_cond_t cond; + /*! Wait until done */ + int done; +}; + +AST_LIST_HEAD_NOLOCK(bridge_channel_impart_ds_head, bridge_channel_impart_cond); + +/*! + * \internal + * \brief Signal imparting threads to wake up. + * \since 13.9.0 + * + * \param ds_head List of imparting threads to wake up. + * + * \return Nothing + */ +static void bridge_channel_impart_ds_head_signal(struct bridge_channel_impart_ds_head *ds_head) +{ + if (ds_head) { + struct bridge_channel_impart_cond *cond; + + while ((cond = AST_LIST_REMOVE_HEAD(ds_head, node))) { + ast_mutex_lock(&cond->lock); + cond->done = 1; + ast_cond_signal(&cond->cond); + ast_mutex_unlock(&cond->lock); + } + } +} + +static void bridge_channel_impart_ds_head_dtor(void *doomed) +{ + bridge_channel_impart_ds_head_signal(doomed); + ast_free(doomed); +} + +/*! + * \internal + * \brief Fixup the bridge impart datastore. + * \since 13.9.0 + * + * \param data Bridge impart datastore data to fixup from old_chan. + * \param old_chan The datastore is moving from this channel. + * \param new_chan The datastore is moving to this channel. + * + * \return Nothing + */ +static void bridge_channel_impart_ds_head_fixup(void *data, struct ast_channel *old_chan, struct ast_channel *new_chan) +{ + /* + * Signal any waiting impart threads. The masquerade is going to kill + * old_chan and we don't need to be waiting on new_chan. + */ + bridge_channel_impart_ds_head_signal(data); +} + +static const struct ast_datastore_info bridge_channel_impart_ds_info = { + .type = "bridge-impart-ds", + .destroy = bridge_channel_impart_ds_head_dtor, + .chan_fixup = bridge_channel_impart_ds_head_fixup, +}; + +/*! + * \internal + * \brief Add impart wait datastore conditional to channel. + * \since 13.9.0 + * + * \param chan Channel to add the impart wait conditional. + * \param cond Imparting conditional to add. + * + * \retval 0 on success. + * \retval -1 on error. + */ +static int bridge_channel_impart_add(struct ast_channel *chan, struct bridge_channel_impart_cond *cond) +{ + struct ast_datastore *datastore; + struct bridge_channel_impart_ds_head *ds_head; + + ast_channel_lock(chan); + + datastore = ast_channel_datastore_find(chan, &bridge_channel_impart_ds_info, NULL); + if (!datastore) { + datastore = ast_datastore_alloc(&bridge_channel_impart_ds_info, NULL); + if (!datastore) { + ast_channel_unlock(chan); + return -1; + } + ds_head = ast_calloc(1, sizeof(*ds_head)); + if (!ds_head) { + ast_channel_unlock(chan); + ast_datastore_free(datastore); + return -1; + } + datastore->data = ds_head; + ast_channel_datastore_add(chan, datastore); + } else { + ds_head = datastore->data; + ast_assert(ds_head != NULL); + } + + AST_LIST_INSERT_TAIL(ds_head, cond, node); + + ast_channel_unlock(chan); + return 0; +} + +void bridge_channel_impart_signal(struct ast_channel *chan) +{ + struct ast_datastore *datastore; + + ast_channel_lock(chan); + datastore = ast_channel_datastore_find(chan, &bridge_channel_impart_ds_info, NULL); + if (datastore) { + bridge_channel_impart_ds_head_signal(datastore->data); + } + ast_channel_unlock(chan); +} + +/*! + * \internal + * \brief Block imparting channel thread until signaled. + * \since 13.9.0 + * + * \param cond Imparting conditional to wait for. + * + * \return Nothing + */ +static void bridge_channel_impart_wait(struct bridge_channel_impart_cond *cond) +{ + ast_mutex_lock(&cond->lock); + while (!cond->done) { + ast_cond_wait(&cond->cond, &cond->lock); + } + ast_mutex_unlock(&cond->lock); +} + /* * XXX ASTERISK-21271 make ast_bridge_join() require features to be allocated just like ast_bridge_impart() and not expect the struct back. * @@ -1551,7 +1697,7 @@ int ast_bridge_join(struct ast_bridge *bridge, } if (!res) { - res = bridge_channel_internal_join(bridge_channel, NULL); + res = bridge_channel_internal_join(bridge_channel); } /* Cleanup all the data in the bridge channel after it leaves the bridge. */ @@ -1568,6 +1714,7 @@ int ast_bridge_join(struct ast_bridge *bridge, join_exit:; ast_bridge_run_after_callback(chan); + bridge_channel_impart_signal(chan); if (!(ast_channel_softhangup_internal_flag(chan) & AST_SOFTHANGUP_ASYNCGOTO) && !ast_bridge_setup_after_goto(chan)) { /* Claim the after bridge goto is an async goto destination. */ @@ -1581,14 +1728,13 @@ join_exit:; /*! \brief Thread responsible for imparted bridged channels to be departed */ static void *bridge_channel_depart_thread(void *data) { - struct bridge_channel_internal_cond *cond = data; - struct ast_bridge_channel *bridge_channel = cond->bridge_channel; + struct ast_bridge_channel *bridge_channel = data; if (bridge_channel->callid) { ast_callid_threadassoc_add(bridge_channel->callid); } - bridge_channel_internal_join(bridge_channel, cond); + bridge_channel_internal_join(bridge_channel); /* * cleanup @@ -1601,6 +1747,8 @@ static void *bridge_channel_depart_thread(void *data) bridge_channel->features = NULL; ast_bridge_discard_after_callback(bridge_channel->chan, AST_BRIDGE_AFTER_CB_REASON_DEPART); + /* If join failed there will be impart threads waiting. */ + bridge_channel_impart_signal(bridge_channel->chan); ast_bridge_discard_after_goto(bridge_channel->chan); return NULL; @@ -1609,15 +1757,14 @@ static void *bridge_channel_depart_thread(void *data) /*! \brief Thread responsible for independent imparted bridged channels */ static void *bridge_channel_ind_thread(void *data) { - struct bridge_channel_internal_cond *cond = data; - struct ast_bridge_channel *bridge_channel = cond->bridge_channel; + struct ast_bridge_channel *bridge_channel = data; struct ast_channel *chan; if (bridge_channel->callid) { ast_callid_threadassoc_add(bridge_channel->callid); } - bridge_channel_internal_join(bridge_channel, cond); + bridge_channel_internal_join(bridge_channel); chan = bridge_channel->chan; /* cleanup */ @@ -1634,15 +1781,18 @@ static void *bridge_channel_ind_thread(void *data) ao2_ref(bridge_channel, -1); ast_bridge_run_after_callback(chan); + /* If join failed there will be impart threads waiting. */ + bridge_channel_impart_signal(chan); ast_bridge_run_after_goto(chan); return NULL; } -int ast_bridge_impart(struct ast_bridge *bridge, +static int bridge_impart_internal(struct ast_bridge *bridge, struct ast_channel *chan, struct ast_channel *swap, struct ast_bridge_features *features, - enum ast_bridge_impart_flags flags) + enum ast_bridge_impart_flags flags, + struct bridge_channel_impart_cond *cond) { int res = 0; struct ast_bridge_channel *bridge_channel; @@ -1701,27 +1851,20 @@ int ast_bridge_impart(struct ast_bridge *bridge, /* Actually create the thread that will handle the channel */ if (!res) { - struct bridge_channel_internal_cond cond = { - .done = 0, - .bridge_channel = bridge_channel - }; - ast_mutex_init(&cond.lock); - ast_cond_init(&cond.cond, NULL); - + res = bridge_channel_impart_add(chan, cond); + } + if (!res) { if ((flags & AST_BRIDGE_IMPART_CHAN_MASK) == AST_BRIDGE_IMPART_CHAN_INDEPENDENT) { res = ast_pthread_create_detached(&bridge_channel->thread, NULL, - bridge_channel_ind_thread, &cond); + bridge_channel_ind_thread, bridge_channel); } else { res = ast_pthread_create(&bridge_channel->thread, NULL, - bridge_channel_depart_thread, &cond); + bridge_channel_depart_thread, bridge_channel); } if (!res) { - bridge_channel_internal_wait(&cond); + bridge_channel_impart_wait(cond); } - - ast_cond_destroy(&cond.cond); - ast_mutex_destroy(&cond.lock); } if (res) { @@ -1742,6 +1885,32 @@ int ast_bridge_impart(struct ast_bridge *bridge, return 0; } +int ast_bridge_impart(struct ast_bridge *bridge, + struct ast_channel *chan, + struct ast_channel *swap, + struct ast_bridge_features *features, + enum ast_bridge_impart_flags flags) +{ + struct bridge_channel_impart_cond cond = { + .done = 0, + }; + int res; + + ast_mutex_init(&cond.lock); + ast_cond_init(&cond.cond, NULL); + + res = bridge_impart_internal(bridge, chan, swap, features, flags, &cond); + if (res) { + /* Impart failed. Signal any other waiting impart threads */ + bridge_channel_impart_signal(chan); + } + + ast_cond_destroy(&cond.cond); + ast_mutex_destroy(&cond.lock); + + return res; +} + int ast_bridge_depart(struct ast_channel *chan) { struct ast_bridge_channel *bridge_channel; @@ -2318,6 +2487,9 @@ int ast_bridge_add_channel(struct ast_bridge *bridge, struct ast_channel *chan, if (chan_bridge) { struct ast_bridge_channel *bridge_channel; + /* The channel is in a bridge so it is not getting any new features. */ + ast_bridge_features_destroy(features); + ast_bridge_lock_both(bridge, chan_bridge); bridge_channel = bridge_find_channel(chan_bridge, chan); @@ -2340,9 +2512,6 @@ int ast_bridge_add_channel(struct ast_bridge *bridge, struct ast_channel *chan, bridge_dissolve_check_stolen(chan_bridge, bridge_channel); ast_bridge_unlock(chan_bridge); ast_bridge_unlock(bridge); - - /* The channel was in a bridge so it is not getting any new features. */ - ast_bridge_features_destroy(features); } else { /* Slightly less easy case. We need to yank channel A from * where he currently is and impart him into our bridge. @@ -2350,6 +2519,7 @@ int ast_bridge_add_channel(struct ast_bridge *bridge, struct ast_channel *chan, yanked_chan = ast_channel_yank(chan); if (!yanked_chan) { ast_log(LOG_WARNING, "Could not gain control of channel %s\n", ast_channel_name(chan)); + ast_bridge_features_destroy(features); return -1; } if (ast_channel_state(yanked_chan) != AST_STATE_UP) { diff --git a/main/bridge_channel.c b/main/bridge_channel.c index c9262a84a..4baae3cc5 100644 --- a/main/bridge_channel.c +++ b/main/bridge_channel.c @@ -2117,13 +2117,14 @@ int bridge_channel_internal_push_full(struct ast_bridge_channel *bridge_channel, if (bridge->dissolved || bridge_channel->state != BRIDGE_CHANNEL_STATE_WAIT || (swap && swap->state != BRIDGE_CHANNEL_STATE_WAIT) - || bridge->v_table->push(bridge, bridge_channel, swap) - || ast_bridge_channel_establish_roles(bridge_channel)) { + || bridge->v_table->push(bridge, bridge_channel, swap)) { ast_debug(1, "Bridge %s: pushing %p(%s) into bridge failed\n", bridge->uniqueid, bridge_channel, ast_channel_name(bridge_channel->chan)); return -1; } + ast_bridge_channel_establish_roles(bridge_channel); + if (swap) { int dissolve = ast_test_flag(&bridge->feature_flags, AST_BRIDGE_FLAG_DISSOLVE_EMPTY); @@ -2636,27 +2637,7 @@ static void bridge_channel_event_join_leave(struct ast_bridge_channel *bridge_ch ao2_iterator_destroy(&iter); } -void bridge_channel_internal_wait(struct bridge_channel_internal_cond *cond) -{ - ast_mutex_lock(&cond->lock); - while (!cond->done) { - ast_cond_wait(&cond->cond, &cond->lock); - } - ast_mutex_unlock(&cond->lock); -} - -void bridge_channel_internal_signal(struct bridge_channel_internal_cond *cond) -{ - if (cond) { - ast_mutex_lock(&cond->lock); - cond->done = 1; - ast_cond_signal(&cond->cond); - ast_mutex_unlock(&cond->lock); - } -} - -int bridge_channel_internal_join(struct ast_bridge_channel *bridge_channel, - struct bridge_channel_internal_cond *cond) +int bridge_channel_internal_join(struct ast_bridge_channel *bridge_channel) { int res = 0; struct ast_bridge_features *channel_features; @@ -2686,7 +2667,6 @@ int bridge_channel_internal_join(struct ast_bridge_channel *bridge_channel, bridge_channel->bridge->uniqueid, bridge_channel, ast_channel_name(bridge_channel->chan)); - bridge_channel_internal_signal(cond); return -1; } ast_channel_internal_bridge_set(bridge_channel->chan, bridge_channel->bridge); @@ -2721,8 +2701,6 @@ int bridge_channel_internal_join(struct ast_bridge_channel *bridge_channel, } bridge_reconfigured(bridge_channel->bridge, !bridge_channel->inhibit_colp); - bridge_channel_internal_signal(cond); - if (bridge_channel->state == BRIDGE_CHANNEL_STATE_WAIT) { /* * Indicate a source change since this channel is entering the @@ -2734,6 +2712,7 @@ int bridge_channel_internal_join(struct ast_bridge_channel *bridge_channel, ast_indicate(bridge_channel->chan, AST_CONTROL_SRCCHANGE); } + bridge_channel_impart_signal(bridge_channel->chan); ast_bridge_unlock(bridge_channel->bridge); /* Must release any swap ref after unlocking the bridge. */ diff --git a/main/core_unreal.c b/main/core_unreal.c index e9b7a8d66..f2404dfca 100644 --- a/main/core_unreal.c +++ b/main/core_unreal.c @@ -808,7 +808,6 @@ int ast_unreal_channel_push_to_bridge(struct ast_channel *ast, struct ast_bridge /* Impart the semi2 channel into the bridge */ if (ast_bridge_impart(bridge, chan, NULL, features, AST_BRIDGE_IMPART_CHAN_INDEPENDENT)) { - ast_bridge_features_destroy(features); ast_channel_unref(chan); return -1; } diff --git a/main/features.c b/main/features.c index 1810b1556..b96cbd68c 100644 --- a/main/features.c +++ b/main/features.c @@ -1104,7 +1104,6 @@ static int bridge_exec(struct ast_channel *chan, const char *data) xfer_cfg ? xfer_cfg->xfersound : NULL); ao2_cleanup(xfer_cfg); if (bridge_add_failed) { - ast_bridge_features_destroy(peer_features); ast_bridge_features_cleanup(&chan_features); ast_bridge_destroy(bridge, 0); goto done; diff --git a/main/format_cap.c b/main/format_cap.c index 17ae18cd4..bf3bd1c4b 100644 --- a/main/format_cap.c +++ b/main/format_cap.c @@ -376,7 +376,7 @@ int ast_format_cap_update_by_allow_disallow(struct ast_format_cap *cap, const ch } - while ((this = strsep(&parse, ",|"))) { + while ((this = ast_strip(strsep(&parse, ",|")))) { int framems = 0; struct ast_format *format = NULL; diff --git a/main/lock.c b/main/lock.c index dd90d7bd9..03f1cd974 100644 --- a/main/lock.c +++ b/main/lock.c @@ -286,17 +286,19 @@ int __ast_pthread_mutex_lock(const char *filename, int lineno, const char *func, if (wait_time > reported_wait && (wait_time % 5) == 0) { __ast_mutex_logger("%s line %d (%s): Deadlock? waited %d sec for mutex '%s'?\n", filename, lineno, func, (int) wait_time, mutex_name); - ast_reentrancy_lock(lt); + if (lt) { + ast_reentrancy_lock(lt); #ifdef HAVE_BKTR - __dump_backtrace(<->backtrace[lt->reentrancy], canlog); + __dump_backtrace(<->backtrace[lt->reentrancy], canlog); #endif - __ast_mutex_logger("%s line %d (%s): '%s' was locked here.\n", - lt->file[ROFFSET], lt->lineno[ROFFSET], - lt->func[ROFFSET], mutex_name); + __ast_mutex_logger("%s line %d (%s): '%s' was locked here.\n", + lt->file[ROFFSET], lt->lineno[ROFFSET], + lt->func[ROFFSET], mutex_name); #ifdef HAVE_BKTR - __dump_backtrace(<->backtrace[ROFFSET], canlog); + __dump_backtrace(<->backtrace[ROFFSET], canlog); #endif - ast_reentrancy_unlock(lt); + ast_reentrancy_unlock(lt); + } reported_wait = wait_time; } usleep(200); diff --git a/main/manager.c b/main/manager.c index 7c2155015..ba261e8e9 100644 --- a/main/manager.c +++ b/main/manager.c @@ -1541,6 +1541,17 @@ static AST_RWLIST_HEAD_STATIC(manager_hooks, manager_custom_hook); /*! \brief A container of event documentation nodes */ static AO2_GLOBAL_OBJ_STATIC(event_docs); +static int __attribute__((format(printf, 9, 0))) __manager_event_sessions( + struct ao2_container *sessions, + int category, + const char *event, + int chancount, + struct ast_channel **chans, + const char *file, + int line, + const char *func, + const char *fmt, + ...); static enum add_filter_result manager_add_filter(const char *filter_pattern, struct ao2_container *whitefilters, struct ao2_container *blackfilters); static int match_filter(struct mansession *s, char *eventdata); @@ -1679,37 +1690,75 @@ struct ast_str *ast_manager_str_from_json_object(struct ast_json *blob, key_excl return res; } +#define manager_event_sessions(sessions, category, event, contents , ...) \ + __manager_event_sessions(sessions, category, event, 0, NULL, __FILE__, __LINE__, __PRETTY_FUNCTION__, contents , ## __VA_ARGS__) + +#define any_manager_listeners(sessions) \ + ((sessions && ao2_container_count(sessions)) || !AST_RWLIST_EMPTY(&manager_hooks)) + static void manager_default_msg_cb(void *data, struct stasis_subscription *sub, struct stasis_message *message) { - RAII_VAR(struct ast_manager_event_blob *, ev, NULL, ao2_cleanup); + struct ao2_container *sessions; + struct ast_manager_event_blob *ev; - ev = stasis_message_to_ami(message); + if (!stasis_message_can_be_ami(message)) { + /* Not an AMI message; disregard */ + return; + } + + sessions = ao2_global_obj_ref(mgr_sessions); + if (!any_manager_listeners(sessions)) { + /* Nobody is listening */ + ao2_cleanup(sessions); + return; + } - if (ev == NULL) { - /* Not and AMI message; disregard */ + ev = stasis_message_to_ami(message); + if (!ev) { + /* Conversion failure */ + ao2_cleanup(sessions); return; } - manager_event(ev->event_flags, ev->manager_event, "%s", - ev->extra_fields); + manager_event_sessions(sessions, ev->event_flags, ev->manager_event, + "%s", ev->extra_fields); + ao2_ref(ev, -1); + ao2_cleanup(sessions); } static void manager_generic_msg_cb(void *data, struct stasis_subscription *sub, struct stasis_message *message) { - struct ast_json_payload *payload = stasis_message_data(message); - int class_type = ast_json_integer_get(ast_json_object_get(payload->json, "class_type")); - const char *type = ast_json_string_get(ast_json_object_get(payload->json, "type")); - struct ast_json *event = ast_json_object_get(payload->json, "event"); - RAII_VAR(struct ast_str *, event_buffer, NULL, ast_free); + struct ast_json_payload *payload; + int class_type; + const char *type; + struct ast_json *event; + struct ast_str *event_buffer; + struct ao2_container *sessions; + + sessions = ao2_global_obj_ref(mgr_sessions); + if (!any_manager_listeners(sessions)) { + /* Nobody is listening */ + ao2_cleanup(sessions); + return; + } + + payload = stasis_message_data(message); + class_type = ast_json_integer_get(ast_json_object_get(payload->json, "class_type")); + type = ast_json_string_get(ast_json_object_get(payload->json, "type")); + event = ast_json_object_get(payload->json, "event"); event_buffer = ast_manager_str_from_json_object(event, NULL); if (!event_buffer) { ast_log(AST_LOG_WARNING, "Error while creating payload for event %s\n", type); + ao2_cleanup(sessions); return; } - manager_event(class_type, type, "%s", ast_str_buffer(event_buffer)); + manager_event_sessions(sessions, class_type, type, + "%s", ast_str_buffer(event_buffer)); + ast_free(event_buffer); + ao2_cleanup(sessions); } void ast_manager_publish_event(const char *type, int class_type, struct ast_json *obj) @@ -4698,7 +4747,7 @@ static int action_blind_transfer(struct mansession *s, const struct message *m) const char *name = astman_get_header(m, "Channel"); const char *exten = astman_get_header(m, "Exten"); const char *context = astman_get_header(m, "Context"); - RAII_VAR(struct ast_channel *, chan, NULL, ao2_cleanup); + struct ast_channel *chan; if (ast_strlen_zero(name)) { astman_send_error(s, m, "No channel specified"); @@ -4735,6 +4784,7 @@ static int action_blind_transfer(struct mansession *s, const struct message *m) break; } + ast_channel_unref(chan); return 0; } @@ -5907,7 +5957,7 @@ static int action_coreshowchannels(struct mansession *s, const struct message *m const char *actionid = astman_get_header(m, "ActionID"); char idText[256]; int numchans = 0; - RAII_VAR(struct ao2_container *, channels, NULL, ao2_cleanup); + struct ao2_container *channels; struct ao2_iterator it_chans; struct stasis_message *msg; @@ -5917,7 +5967,8 @@ static int action_coreshowchannels(struct mansession *s, const struct message *m idText[0] = '\0'; } - if (!(channels = stasis_cache_dump(ast_channel_cache_by_name(), ast_channel_snapshot_type()))) { + channels = stasis_cache_dump(ast_channel_cache_by_name(), ast_channel_snapshot_type()); + if (!channels) { astman_send_error(s, m, "Could not get cached channels"); return 0; } @@ -5969,6 +6020,7 @@ static int action_coreshowchannels(struct mansession *s, const struct message *m astman_send_list_complete_start(s, m, "CoreShowChannelsComplete", numchans); astman_send_list_complete_end(s); + ao2_ref(channels, -1); return 0; } @@ -6597,11 +6649,10 @@ static int append_event(const char *str, int category) static void append_channel_vars(struct ast_str **pbuf, struct ast_channel *chan) { - RAII_VAR(struct varshead *, vars, NULL, ao2_cleanup); + struct varshead *vars; struct ast_var_t *var; vars = ast_channel_get_manager_vars(chan); - if (!vars) { return; } @@ -6609,62 +6660,67 @@ static void append_channel_vars(struct ast_str **pbuf, struct ast_channel *chan) AST_LIST_TRAVERSE(vars, var, entries) { ast_str_append(pbuf, 0, "ChanVariable(%s): %s=%s\r\n", ast_channel_name(chan), var->name, var->value); } + ao2_ref(vars, -1); } /* XXX see if can be moved inside the function */ AST_THREADSTORAGE(manager_event_buf); #define MANAGER_EVENT_BUF_INITSIZE 256 -int __ast_manager_event_multichan(int category, const char *event, int chancount, - struct ast_channel **chans, const char *file, int line, const char *func, - const char *fmt, ...) +static int __attribute__((format(printf, 9, 0))) __manager_event_sessions_va( + struct ao2_container *sessions, + int category, + const char *event, + int chancount, + struct ast_channel **chans, + const char *file, + int line, + const char *func, + const char *fmt, + va_list ap) { - RAII_VAR(struct ao2_container *, sessions, ao2_global_obj_ref(mgr_sessions), ao2_cleanup); - struct mansession_session *session; - struct manager_custom_hook *hook; struct ast_str *auth = ast_str_alloca(MAX_AUTH_PERM_STRING); const char *cat_str; - va_list ap; struct timeval now; struct ast_str *buf; int i; - if (!(sessions && ao2_container_count(sessions)) && AST_RWLIST_EMPTY(&manager_hooks)) { - return 0; - } - - if (!(buf = ast_str_thread_get(&manager_event_buf, MANAGER_EVENT_BUF_INITSIZE))) { + buf = ast_str_thread_get(&manager_event_buf, MANAGER_EVENT_BUF_INITSIZE); + if (!buf) { return -1; } cat_str = authority_to_str(category, &auth); ast_str_set(&buf, 0, - "Event: %s\r\nPrivilege: %s\r\n", - event, cat_str); + "Event: %s\r\n" + "Privilege: %s\r\n", + event, cat_str); if (timestampevents) { now = ast_tvnow(); ast_str_append(&buf, 0, - "Timestamp: %ld.%06lu\r\n", - (long)now.tv_sec, (unsigned long) now.tv_usec); + "Timestamp: %ld.%06lu\r\n", + (long)now.tv_sec, (unsigned long) now.tv_usec); } if (manager_debug) { static int seq; + ast_str_append(&buf, 0, - "SequenceNumber: %d\r\n", - ast_atomic_fetchadd_int(&seq, 1)); + "SequenceNumber: %d\r\n", + ast_atomic_fetchadd_int(&seq, 1)); ast_str_append(&buf, 0, - "File: %s\r\nLine: %d\r\nFunc: %s\r\n", file, line, func); + "File: %s\r\n" + "Line: %d\r\n" + "Func: %s\r\n", + file, line, func); } if (!ast_strlen_zero(ast_config_AST_SYSTEM_NAME)) { ast_str_append(&buf, 0, - "SystemName: %s\r\n", - ast_config_AST_SYSTEM_NAME); + "SystemName: %s\r\n", + ast_config_AST_SYSTEM_NAME); } - va_start(ap, fmt); ast_str_append_va(&buf, 0, fmt, ap); - va_end(ap); for (i = 0; i < chancount; i++) { append_channel_vars(&buf, chans[i]); } @@ -6675,9 +6731,11 @@ int __ast_manager_event_multichan(int category, const char *event, int chancount /* Wake up any sleeping sessions */ if (sessions) { - struct ao2_iterator i; - i = ao2_iterator_init(sessions, 0); - while ((session = ao2_iterator_next(&i))) { + struct ao2_iterator iter; + struct mansession_session *session; + + iter = ao2_iterator_init(sessions, 0); + while ((session = ao2_iterator_next(&iter))) { ao2_lock(session); if (session->waiting_thread != AST_PTHREADT_NULL) { pthread_kill(session->waiting_thread, SIGURG); @@ -6692,10 +6750,12 @@ int __ast_manager_event_multichan(int category, const char *event, int chancount ao2_unlock(session); unref_mansession(session); } - ao2_iterator_destroy(&i); + ao2_iterator_destroy(&iter); } if (category != EVENT_FLAG_SHUTDOWN && !AST_RWLIST_EMPTY(&manager_hooks)) { + struct manager_custom_hook *hook; + AST_RWLIST_RDLOCK(&manager_hooks); AST_RWLIST_TRAVERSE(&manager_hooks, hook, list) { hook->helper(category, event, ast_str_buffer(buf)); @@ -6706,6 +6766,50 @@ int __ast_manager_event_multichan(int category, const char *event, int chancount return 0; } +static int __attribute__((format(printf, 9, 0))) __manager_event_sessions( + struct ao2_container *sessions, + int category, + const char *event, + int chancount, + struct ast_channel **chans, + const char *file, + int line, + const char *func, + const char *fmt, + ...) +{ + va_list ap; + int res; + + va_start(ap, fmt); + res = __manager_event_sessions_va(sessions, category, event, chancount, chans, + file, line, func, fmt, ap); + va_end(ap); + return res; +} + +int __ast_manager_event_multichan(int category, const char *event, int chancount, + struct ast_channel **chans, const char *file, int line, const char *func, + const char *fmt, ...) +{ + struct ao2_container *sessions = ao2_global_obj_ref(mgr_sessions); + va_list ap; + int res; + + if (!any_manager_listeners(sessions)) { + /* Nobody is listening */ + ao2_cleanup(sessions); + return 0; + } + + va_start(ap, fmt); + res = __manager_event_sessions_va(sessions, category, event, chancount, chans, + file, line, func, fmt, ap); + va_end(ap); + ao2_cleanup(sessions); + return res; +} + /*! \brief * support functions to register/unregister AMI action handlers, */ @@ -9184,6 +9288,7 @@ int ast_str_append_event_header(struct ast_str **fields_string, static void manager_event_blob_dtor(void *obj) { struct ast_manager_event_blob *ev = obj; + ast_string_field_free_memory(ev); } @@ -9195,18 +9300,19 @@ ast_manager_event_blob_create( const char *extra_fields_fmt, ...) { - RAII_VAR(struct ast_manager_event_blob *, ev, NULL, ao2_cleanup); + struct ast_manager_event_blob *ev; va_list argp; ast_assert(extra_fields_fmt != NULL); ast_assert(manager_event != NULL); - ev = ao2_alloc(sizeof(*ev), manager_event_blob_dtor); + ev = ao2_alloc_options(sizeof(*ev), manager_event_blob_dtor, AO2_ALLOC_OPT_LOCK_NOLOCK); if (!ev) { return NULL; } if (ast_string_field_init(ev, 20)) { + ao2_ref(ev, -1); return NULL; } @@ -9214,10 +9320,8 @@ ast_manager_event_blob_create( ev->event_flags = event_flags; va_start(argp, extra_fields_fmt); - ast_string_field_ptr_build_va(ev, &ev->extra_fields, extra_fields_fmt, - argp); + ast_string_field_ptr_build_va(ev, &ev->extra_fields, extra_fields_fmt, argp); va_end(argp); - ao2_ref(ev, +1); return ev; } diff --git a/main/manager_channels.c b/main/manager_channels.c index adef639e8..ef71c65b1 100644 --- a/main/manager_channels.c +++ b/main/manager_channels.c @@ -697,28 +697,33 @@ static void channel_hangup_request_cb(void *data, struct stasis_message *message) { struct ast_channel_blob *obj = stasis_message_data(message); - RAII_VAR(struct ast_str *, extra, NULL, ast_free); - RAII_VAR(struct ast_str *, channel_event_string, NULL, ast_free); + struct ast_str *extra; + struct ast_str *channel_event_string; struct ast_json *cause; int is_soft; char *manager_event = "HangupRequest"; + if (!obj->snapshot) { + /* No snapshot? Likely an earlier allocation failure creating it. */ + return; + } + extra = ast_str_create(20); if (!extra) { return; } channel_event_string = ast_manager_build_channel_state_string(obj->snapshot); - if (!channel_event_string) { + ast_free(extra); return; } cause = ast_json_object_get(obj->blob, "cause"); if (cause) { ast_str_append(&extra, 0, - "Cause: %jd\r\n", - ast_json_integer_get(cause)); + "Cause: %jd\r\n", + ast_json_integer_get(cause)); } is_soft = ast_json_is_true(ast_json_object_get(obj->blob, "soft")); @@ -727,9 +732,12 @@ static void channel_hangup_request_cb(void *data, } manager_event(EVENT_FLAG_CALL, manager_event, - "%s%s", - ast_str_buffer(channel_event_string), - ast_str_buffer(extra)); + "%s%s", + ast_str_buffer(channel_event_string), + ast_str_buffer(extra)); + + ast_free(channel_event_string); + ast_free(extra); } static void channel_chanspy_stop_cb(void *data, struct stasis_subscription *sub, diff --git a/main/stasis_message.c b/main/stasis_message.c index c797cdfa0..99721ef3c 100644 --- a/main/stasis_message.c +++ b/main/stasis_message.c @@ -170,17 +170,17 @@ const struct timeval *stasis_message_timestamp(const struct stasis_message *msg) return &msg->timestamp; } -#define INVOKE_VIRTUAL(fn, ...) \ - ({ \ - if (msg == NULL) { \ - return NULL; \ - } \ - ast_assert(msg->type != NULL); \ +#define INVOKE_VIRTUAL(fn, ...) \ + ({ \ + if (!msg) { \ + return NULL; \ + } \ + ast_assert(msg->type != NULL); \ ast_assert(msg->type->vtable != NULL); \ - if (msg->type->vtable->fn == NULL) { \ - return NULL; \ - } \ - msg->type->vtable->fn(__VA_ARGS__); \ + if (!msg->type->vtable->fn) { \ + return NULL; \ + } \ + msg->type->vtable->fn(__VA_ARGS__); \ }) struct ast_manager_event_blob *stasis_message_to_ami(struct stasis_message *msg) @@ -199,3 +199,18 @@ struct ast_event *stasis_message_to_event(struct stasis_message *msg) { return INVOKE_VIRTUAL(to_event, msg); } + +#define HAS_VIRTUAL(fn, msg) \ + ({ \ + if (!msg) { \ + return 0; \ + } \ + ast_assert(msg->type != NULL); \ + ast_assert(msg->type->vtable != NULL); \ + !!msg->type->vtable->fn; \ + }) + +int stasis_message_can_be_ami(struct stasis_message *msg) +{ + return HAS_VIRTUAL(to_ami, msg); +} diff --git a/res/res_agi.c b/res/res_agi.c index ff3358062..e3839dd6d 100644 --- a/res/res_agi.c +++ b/res/res_agi.c @@ -3736,6 +3736,24 @@ static enum agi_result agi_handle_command(struct ast_channel *chan, AGI *agi, ch return AGI_RESULT_SUCCESS; } + +AST_LIST_HEAD_NOLOCK(deferred_frames, ast_frame); + +static void queue_deferred_frames(struct deferred_frames *deferred_frames, + struct ast_channel *chan) +{ + struct ast_frame *f; + + if (!AST_LIST_EMPTY(deferred_frames)) { + ast_channel_lock(chan); + while ((f = AST_LIST_REMOVE_HEAD(deferred_frames, frame_list))) { + ast_queue_frame_head(chan, f); + ast_frfree(f); + } + ast_channel_unlock(chan); + } +} + static enum agi_result run_agi(struct ast_channel *chan, char *request, AGI *agi, int pid, int *status, int dead, int argc, char *argv[]) { struct ast_channel *c; @@ -3754,6 +3772,9 @@ static enum agi_result run_agi(struct ast_channel *chan, char *request, AGI *agi const char *sighup_str; const char *exit_on_hangup_str; int exit_on_hangup; + struct deferred_frames deferred_frames; + + AST_LIST_HEAD_INIT_NOLOCK(&deferred_frames); ast_channel_lock(chan); sighup_str = pbx_builtin_getvar_helper(chan, "AGISIGHUP"); @@ -3815,8 +3836,20 @@ static enum agi_result run_agi(struct ast_channel *chan, char *request, AGI *agi /* Write, ignoring errors */ if (write(agi->audio, f->data.ptr, f->datalen) < 0) { } + ast_frfree(f); + } else if (ast_is_deferrable_frame(f)) { + struct ast_frame *dup_f; + + if ((dup_f = ast_frisolate(f))) { + AST_LIST_INSERT_HEAD(&deferred_frames, dup_f, frame_list); + } + + if (dup_f != f) { + ast_frfree(f); + } + } else { + ast_frfree(f); } - ast_frfree(f); } } else if (outfd > -1) { size_t len = sizeof(buf); @@ -3864,6 +3897,8 @@ static enum agi_result run_agi(struct ast_channel *chan, char *request, AGI *agi buf[buflen - 1] = '\0'; } + queue_deferred_frames(&deferred_frames, chan); + if (agidebug) ast_verbose("<%s>AGI Rx << %s\n", ast_channel_name(chan), buf); cmd_status = agi_handle_command(chan, agi, buf, dead); @@ -3885,6 +3920,9 @@ static enum agi_result run_agi(struct ast_channel *chan, char *request, AGI *agi } } } + + queue_deferred_frames(&deferred_frames, chan); + if (agi->speech) { ast_speech_destroy(agi->speech); } diff --git a/res/res_pjsip.c b/res/res_pjsip.c index 7f6175115..fc9fbe4dc 100644 --- a/res/res_pjsip.c +++ b/res/res_pjsip.c @@ -3608,11 +3608,7 @@ int ast_sip_push_task(struct ast_taskprocessor *serializer, int (*sip_task)(void serializer = serializer_pool[pos]; } - if (serializer) { - return ast_taskprocessor_push(serializer, sip_task, task_data); - } else { - return ast_threadpool_push(sip_threadpool, sip_task, task_data); - } + return ast_taskprocessor_push(serializer, sip_task, task_data); } struct sync_task_data { @@ -4131,6 +4127,11 @@ static int load_module(void) goto error; } + if (ast_sip_initialize_scheduler()) { + ast_log(LOG_ERROR, "Failed to start scheduler. Aborting load\n"); + goto error; + } + /* Now load all the pjproject infrastructure. */ if (load_pjsip()) { goto error; @@ -4171,8 +4172,10 @@ static int load_module(void) return AST_MODULE_LOAD_SUCCESS; error: - /* These functions all check for NULLs and are safe to call at any time */ unload_pjsip(NULL); + + /* These functions all check for NULLs and are safe to call at any time */ + ast_sip_destroy_scheduler(); serializer_pool_shutdown(); ast_threadpool_shutdown(sip_threadpool); @@ -4203,7 +4206,7 @@ static int unload_module(void) * so we have to push the work to the threadpool to handle */ ast_sip_push_task_synchronous(NULL, unload_pjsip, NULL); - + ast_sip_destroy_scheduler(); serializer_pool_shutdown(); ast_threadpool_shutdown(sip_threadpool); diff --git a/res/res_pjsip/include/res_pjsip_private.h b/res/res_pjsip/include/res_pjsip_private.h index 72a4387f1..04cd85408 100644 --- a/res/res_pjsip/include/res_pjsip_private.h +++ b/res/res_pjsip/include/res_pjsip_private.h @@ -325,4 +325,23 @@ struct ast_sip_contact_status *ast_res_pjsip_find_or_create_contact_status(const */ int ast_sip_validate_uri_length(const char *uri); +/*! + * \brief Initialize scheduler + * \since 13.9.0 + * + * \retval -1 failure + * \retval 0 success + */ +int ast_sip_initialize_scheduler(void); + +/*! + * \internal + * \brief Destroy scheduler + * \since 13.9.0 + * + * \retval -1 failure + * \retval 0 success + */ +int ast_sip_destroy_scheduler(void); + #endif /* RES_PJSIP_PRIVATE_H_ */ diff --git a/res/res_pjsip/location.c b/res/res_pjsip/location.c index 19fa803f0..2a7dbb54f 100644 --- a/res/res_pjsip/location.c +++ b/res/res_pjsip/location.c @@ -417,38 +417,64 @@ static int permanent_uri_sort_fn(const void *obj_left, const void *obj_right, in int ast_sip_validate_uri_length(const char *contact_uri) { - pjsip_uri *uri; - pjsip_sip_uri *sip_uri; - pj_pool_t *pool; int max_length = pj_max_hostname - 1; + char *contact = ast_strdupa(contact_uri); + char *host; + char *at; + int theres_a_port = 0; if (strlen(contact_uri) > pjsip_max_url_size - 1) { return -1; } - if (!(pool = pjsip_endpt_create_pool(ast_sip_get_pjsip_endpoint(), "uri validation", 512, 512))) { - ast_log(LOG_ERROR, "Unable to allocate pool for uri validation\n"); + contact = ast_strip_quoted(contact, "<", ">"); + + if (!strncasecmp(contact, "sip:", 4)) { + host = contact + 4; + } else if (!strncasecmp(contact, "sips:", 5)) { + host = contact + 5; + } else { + /* Not a SIP URI */ return -1; } - if (!(uri = pjsip_parse_uri(pool, (char *)contact_uri, strlen(contact_uri), 0)) || - (!PJSIP_URI_SCHEME_IS_SIP(uri) && !PJSIP_URI_SCHEME_IS_SIPS(uri))) { - pjsip_endpt_release_pool(ast_sip_get_pjsip_endpoint(), pool); - return -1; + at = strchr(contact, '@'); + if (at) { + /* sip[s]:user@host */ + host = at + 1; + } + + if (host[0] == '[') { + /* Host is an IPv6 address. Just get up to the matching bracket */ + char *close_bracket; + + close_bracket = strchr(host, ']'); + if (!close_bracket) { + return -1; + } + close_bracket++; + if (*close_bracket == ':') { + theres_a_port = 1; + } + *close_bracket = '\0'; + } else { + /* uri parameters could contain ';' so trim them off first */ + host = strsep(&host, ";?"); + /* Host is FQDN or IPv4 address. Need to find closing delimiter */ + if (strchr(host, ':')) { + theres_a_port = 1; + host = strsep(&host, ":"); + } } - sip_uri = pjsip_uri_get_uri(uri); - if (sip_uri->port == 0) { + if (!theres_a_port) { max_length -= strlen("_sips.tcp."); } - if (sip_uri->host.slen > max_length) { - pjsip_endpt_release_pool(ast_sip_get_pjsip_endpoint(), pool); + if (strlen(host) > max_length) { return -1; } - pjsip_endpt_release_pool(ast_sip_get_pjsip_endpoint(), pool); - return 0; } diff --git a/res/res_pjsip/pjsip_scheduler.c b/res/res_pjsip/pjsip_scheduler.c new file mode 100644 index 000000000..a5d406cb5 --- /dev/null +++ b/res/res_pjsip/pjsip_scheduler.c @@ -0,0 +1,495 @@ +/* + * Asterisk -- An open source telephony toolkit. + * + * Copyright (C) 2016, Fairview 5 Engineering, LLC + * + * George Joseph <george.joseph@fairview5.com> + * + * See http://www.asterisk.org for more information about + * the Asterisk project. Please do not directly contact + * any of the maintainers of this project for assistance; + * the project provides a web site, mailing lists and IRC + * channels for your use. + * + * This program is free software, distributed under the terms of + * the GNU General Public License Version 2. See the LICENSE file + * at the top of the source tree. + */ + +/*! \file + * + * \brief res_pjsip Scheduler + * + * \author George Joseph <george.joseph@fairview5.com> + */ + +#include "asterisk.h" + +ASTERISK_REGISTER_FILE() + +#include "asterisk/res_pjsip.h" +#include "include/res_pjsip_private.h" +#include "asterisk/res_pjsip_cli.h" + +#define TASK_BUCKETS 53 + +static struct ast_sched_context *scheduler_context; +static struct ao2_container *tasks; +static int task_count; + +struct ast_sip_sched_task { + /*! ast_sip_sched task id */ + uint32_t task_id; + /*! ast_sched scheudler id */ + int current_scheduler_id; + /*! task is currently running */ + int is_running; + /*! task */ + ast_sip_task task; + /*! task data */ + void *task_data; + /*! reschedule interval in milliseconds */ + int interval; + /*! the time the task was queued */ + struct timeval when_queued; + /*! the last time the task was started */ + struct timeval last_start; + /*! the last time the task was ended */ + struct timeval last_end; + /*! times run */ + int run_count; + /*! the task reschedule, cleanup and policy flags */ + enum ast_sip_scheduler_task_flags flags; + /*! the serializer to be used (if any) */ + struct ast_taskprocessor *serializer; + /* A name to be associated with the task */ + char name[0]; +}; + +AO2_STRING_FIELD_HASH_FN(ast_sip_sched_task, name); +AO2_STRING_FIELD_CMP_FN(ast_sip_sched_task, name); +AO2_STRING_FIELD_SORT_FN(ast_sip_sched_task, name); + +static int push_to_serializer(const void *data); + +/* + * This function is run in the context of the serializer. + * It runs the task with a simple call and reschedules based on the result. + */ +static int run_task(void *data) +{ + RAII_VAR(struct ast_sip_sched_task *, schtd, ao2_bump(data), ao2_cleanup); + int res; + int delay; + + ao2_lock(schtd); + schtd->last_start = ast_tvnow(); + schtd->is_running = 1; + schtd->run_count++; + ao2_unlock(schtd); + + res = schtd->task(schtd->task_data); + + ao2_lock(schtd); + schtd->is_running = 0; + schtd->last_end = ast_tvnow(); + + /* + * Don't restart if the task returned 0 or if the interval + * was set to 0 while the task was running + */ + if (!res || !schtd->interval) { + schtd->interval = 0; + ao2_unlock(schtd); + ao2_unlink(tasks, schtd); + return -1; + } + + if (schtd->flags & AST_SIP_SCHED_TASK_VARIABLE) { + schtd->interval = res; + } + + if (schtd->flags & AST_SIP_SCHED_TASK_DELAY) { + delay = schtd->interval; + } else { + delay = schtd->interval - (ast_tvdiff_ms(schtd->last_end, schtd->last_start) % schtd->interval); + } + + schtd->current_scheduler_id = ast_sched_add(scheduler_context, delay, push_to_serializer, (const void *)schtd); + if (schtd->current_scheduler_id < 0) { + schtd->interval = 0; + ao2_unlock(schtd); + ao2_unlink(tasks, schtd); + return -1; + } + + ao2_unlock(schtd); + + return 0; +} + +/* + * This function is run by the scheduler thread. Its only job is to push the task + * to the serialize and return. It returns 0 so it's not rescheduled. + */ +static int push_to_serializer(const void *data) +{ + struct ast_sip_sched_task *schtd = (struct ast_sip_sched_task *)data; + + if (ast_sip_push_task(schtd->serializer, run_task, schtd)) { + ao2_ref(schtd, -1); + } + + return 0; +} + +int ast_sip_sched_task_cancel(struct ast_sip_sched_task *schtd) +{ + int res; + + if (!ao2_ref_and_lock(schtd)) { + return -1; + } + + if (schtd->current_scheduler_id < 0 || schtd->interval <= 0) { + ao2_unlock_and_unref(schtd); + return 0; + } + + schtd->interval = 0; + ao2_unlock_and_unref(schtd); + ao2_unlink(tasks, schtd); + res = ast_sched_del(scheduler_context, schtd->current_scheduler_id); + + return res; +} + +int ast_sip_sched_task_cancel_by_name(const char *name) +{ + RAII_VAR(struct ast_sip_sched_task *, schtd, NULL, ao2_cleanup); + + if (ast_strlen_zero(name)) { + return -1; + } + + schtd = ao2_find(tasks, name, OBJ_SEARCH_KEY | OBJ_NOLOCK); + if (!schtd) { + return -1; + } + + return ast_sip_sched_task_cancel(schtd); +} + + +int ast_sip_sched_task_get_times(struct ast_sip_sched_task *schtd, + struct timeval *queued, struct timeval *last_start, struct timeval *last_end) +{ + if (!ao2_ref_and_lock(schtd)) { + return -1; + } + + if (queued) { + memcpy(queued, &schtd->when_queued, sizeof(struct timeval)); + } + if (last_start) { + memcpy(last_start, &schtd->last_start, sizeof(struct timeval)); + } + if (last_end) { + memcpy(last_end, &schtd->last_end, sizeof(struct timeval)); + } + + ao2_unlock_and_unref(schtd); + + return 0; +} + +int ast_sip_sched_task_get_times_by_name(const char *name, + struct timeval *queued, struct timeval *last_start, struct timeval *last_end) +{ + RAII_VAR(struct ast_sip_sched_task *, schtd, NULL, ao2_cleanup); + + if (ast_strlen_zero(name)) { + return -1; + } + + schtd = ao2_find(tasks, name, OBJ_SEARCH_KEY | OBJ_NOLOCK); + if (!schtd) { + return -1; + } + + return ast_sip_sched_task_get_times(schtd, queued, last_start, last_end); +} + +int ast_sip_sched_task_get_name(struct ast_sip_sched_task *schtd, char *name, size_t maxlen) +{ + if (maxlen <= 0) { + return -1; + } + + if (!ao2_ref_and_lock(schtd)) { + return -1; + } + + ast_copy_string(name, schtd->name, maxlen); + + ao2_unlock_and_unref(schtd); + + return 0; +} + +int ast_sip_sched_task_get_next_run(struct ast_sip_sched_task *schtd) +{ + int delay; + struct timeval since_when; + struct timeval now; + + if (!ao2_ref_and_lock(schtd)) { + return -1; + } + + if (schtd->interval) { + delay = schtd->interval; + now = ast_tvnow(); + + if (schtd->flags & AST_SIP_SCHED_TASK_DELAY) { + since_when = schtd->is_running ? now : schtd->last_end; + } else { + since_when = schtd->last_start.tv_sec ? schtd->last_start : schtd->when_queued; + } + + delay -= ast_tvdiff_ms(now, since_when); + + delay = delay < 0 ? 0 : delay; + } else { + delay = -1; + } + + ao2_unlock_and_unref(schtd); + + return delay; +} + +int ast_sip_sched_task_get_next_run_by_name(const char *name) +{ + RAII_VAR(struct ast_sip_sched_task *, schtd, NULL, ao2_cleanup); + + if (ast_strlen_zero(name)) { + return -1; + } + + schtd = ao2_find(tasks, name, OBJ_SEARCH_KEY | OBJ_NOLOCK); + if (!schtd) { + return -1; + } + + return ast_sip_sched_task_get_next_run(schtd); +} + +int ast_sip_sched_is_task_running(struct ast_sip_sched_task *schtd) +{ + if (!schtd) { + return 0; + } + + return schtd->is_running; +} + +int ast_sip_sched_is_task_running_by_name(const char *name) +{ + RAII_VAR(struct ast_sip_sched_task *, schtd, NULL, ao2_cleanup); + + if (ast_strlen_zero(name)) { + return 0; + } + + schtd = ao2_find(tasks, name, OBJ_SEARCH_KEY | OBJ_NOLOCK); + if (!schtd) { + return 0; + } + + return schtd->is_running; +} + +static void schtd_destructor(void *data) +{ + struct ast_sip_sched_task *schtd = data; + + if (schtd->flags & AST_SIP_SCHED_TASK_DATA_AO2) { + /* release our own ref, then release the callers if asked to do so */ + ao2_ref(schtd->task_data, (schtd->flags & AST_SIP_SCHED_TASK_DATA_FREE) ? -2 : -1); + } else if (schtd->task_data && (schtd->flags & AST_SIP_SCHED_TASK_DATA_FREE)) { + ast_free(schtd->task_data); + } +} + +struct ast_sip_sched_task *ast_sip_schedule_task(struct ast_taskprocessor *serializer, + int interval, ast_sip_task sip_task, char *name, void *task_data, enum ast_sip_scheduler_task_flags flags) +{ +#define ID_LEN 13 /* task_deadbeef */ + struct ast_sip_sched_task *schtd; + int res; + + if (interval < 0) { + return NULL; + } + + schtd = ao2_alloc((sizeof(*schtd) + (!ast_strlen_zero(name) ? strlen(name) : ID_LEN) + 1), schtd_destructor); + if (!schtd) { + return NULL; + } + + schtd->task_id = ast_atomic_fetchadd_int(&task_count, 1); + schtd->serializer = serializer; + schtd->task = sip_task; + if (!ast_strlen_zero(name)) { + strcpy(schtd->name, name); /* Safe */ + } else { + sprintf(schtd->name, "task_%08x", schtd->task_id); + } + schtd->task_data = task_data; + schtd->flags = flags; + schtd->interval = interval; + schtd->when_queued = ast_tvnow(); + + if (flags & AST_SIP_SCHED_TASK_DATA_AO2) { + ao2_ref(task_data, +1); + } + res = ast_sched_add(scheduler_context, interval, push_to_serializer, (const void *)schtd); + if (res < 0) { + ao2_ref(schtd, -1); + return NULL; + } else { + schtd->current_scheduler_id = res; + ao2_link(tasks, schtd); + } + + return schtd; +#undef ID_LEN +} + +static char *cli_show_tasks(struct ast_cli_entry *e, int cmd, struct ast_cli_args *a) +{ + struct ao2_iterator i; + struct ast_sip_sched_task *schtd; + const char *log_format = ast_logger_get_dateformat(); + struct ast_tm tm; + char queued[32]; + char last_start[32]; + char last_end[32]; + int datelen; + struct timeval now = ast_tvnow(); + const char *separator = "======================================"; + + switch (cmd) { + case CLI_INIT: + e->command = "pjsip show scheduled_tasks"; + e->usage = "Usage: pjsip show scheduled_tasks\n" + " Show all scheduled tasks\n"; + return NULL; + case CLI_GENERATE: + return NULL; + } + + if (a->argc != 3) { + return CLI_SHOWUSAGE; + } + + ast_localtime(&now, &tm, NULL); + datelen = ast_strftime(queued, sizeof(queued), log_format, &tm); + + ast_cli(a->fd, "PJSIP Scheduled Tasks:\n\n"); + + ast_cli(a->fd, " %1$-24s %2$-8s %3$-9s %4$-7s %6$-*5$s %7$-*5$s %8$-*5$s\n", + "Task Name", "Interval", "Times Run", "State", + datelen, "Queued", "Last Started", "Last Ended"); + + ast_cli(a->fd, " %1$-24.24s %2$-8.8s %3$-9.9s %4$-7.7s %6$-*5$.*5$s %7$-*5$.*5$s %8$-*5$.*5$s\n", + separator, separator, separator, separator, + datelen, separator, separator, separator); + + + ao2_ref(tasks, +1); + ao2_rdlock(tasks); + i = ao2_iterator_init(tasks, 0); + while ((schtd = ao2_iterator_next(&i))) { + + ast_localtime(&schtd->when_queued, &tm, NULL); + ast_strftime(queued, sizeof(queued), log_format, &tm); + + if (ast_tvzero(schtd->last_start)) { + strcpy(last_start, "not yet started"); + } else { + ast_localtime(&schtd->last_start, &tm, NULL); + ast_strftime(last_start, sizeof(last_start), log_format, &tm); + } + + if (ast_tvzero(schtd->last_end)) { + if (ast_tvzero(schtd->last_start)) { + strcpy(last_end, "not yet started"); + } else { + strcpy(last_end, "running"); + } + } else { + ast_localtime(&schtd->last_end, &tm, NULL); + ast_strftime(last_end, sizeof(last_end), log_format, &tm); + } + + ast_cli(a->fd, " %1$-24.24s %2$-8.3f %3$-9d %4$-7s %6$-*5$s %7$-*5$s %8$-*5$s\n", + schtd->name, + schtd->interval / 1000.0, + schtd->run_count, + schtd->is_running ? "running" : "waiting", + datelen, queued, last_start, last_end); + ao2_cleanup(schtd); + } + ao2_iterator_destroy(&i); + ao2_unlock(tasks); + ao2_ref(tasks, -1); + ast_cli(a->fd, "\n"); + + return CLI_SUCCESS; +} + +static struct ast_cli_entry cli_commands[] = { + AST_CLI_DEFINE(cli_show_tasks, "Show all scheduled tasks"), +}; + +int ast_sip_initialize_scheduler(void) +{ + if (!(scheduler_context = ast_sched_context_create())) { + ast_log(LOG_ERROR, "Failed to create scheduler. Aborting load\n"); + return -1; + } + + if (ast_sched_start_thread(scheduler_context)) { + ast_log(LOG_ERROR, "Failed to start scheduler. Aborting load\n"); + ast_sched_context_destroy(scheduler_context); + return -1; + } + + tasks = ao2_container_alloc_hash(AO2_ALLOC_OPT_LOCK_RWLOCK, AO2_CONTAINER_ALLOC_OPT_DUPS_REJECT, + TASK_BUCKETS, ast_sip_sched_task_hash_fn, ast_sip_sched_task_sort_fn, ast_sip_sched_task_cmp_fn); + if (!tasks) { + ast_log(LOG_ERROR, "Failed to allocate task container. Aborting load\n"); + ast_sched_context_destroy(scheduler_context); + return -1; + } + + ast_cli_register_multiple(cli_commands, ARRAY_LEN(cli_commands)); + + return 0; +} + +int ast_sip_destroy_scheduler(void) +{ + ast_cli_unregister_multiple(cli_commands, ARRAY_LEN(cli_commands)); + + if (scheduler_context) { + ast_sched_context_destroy(scheduler_context); + } + + ao2_cleanup(tasks); + tasks = NULL; + + return 0; +} diff --git a/res/res_pjsip_caller_id.c b/res/res_pjsip_caller_id.c index 9af2a8a64..efa1b89a8 100644 --- a/res/res_pjsip_caller_id.c +++ b/res/res_pjsip_caller_id.c @@ -424,6 +424,12 @@ static pjsip_fromto_hdr *create_new_id_hdr(const pj_str_t *hdr_name, pjsip_fromt ast_escape_quoted(id->name.str, name_buf, name_buf_len); pj_strdup2(tdata->pool, &id_name_addr->display, name_buf); + } else { + /* + * We need to clear the remnants of the clone or it'll be left set. + * pj_strdup2 is safe to call with a NULL src and it resets both slen and ptr. + */ + pj_strdup2(tdata->pool, &id_name_addr->display, NULL); } pj_strdup2(tdata->pool, &id_uri->user, id->number.str); diff --git a/res/res_pjsip_registrar.c b/res/res_pjsip_registrar.c index 776700490..851a5f77d 100644 --- a/res/res_pjsip_registrar.c +++ b/res/res_pjsip_registrar.c @@ -576,7 +576,6 @@ static int rx_task_core(struct rx_task_data *task_data, struct ao2_container *co ao2_cleanup(contact_update); } else { /* We want to report the user agent that was actually in the removed contact */ - user_agent = ast_strdupa(contact->user_agent); ast_sip_location_delete_contact(contact); ast_verb(3, "Removed contact '%s' from AOR '%s' due to request\n", contact_uri, aor_name); ast_test_suite_event_notify("AOR_CONTACT_REMOVED", @@ -585,7 +584,7 @@ static int rx_task_core(struct rx_task_data *task_data, struct ao2_container *co "UserAgent: %s", contact_uri, aor_name, - user_agent); + contact->user_agent); } } diff --git a/res/res_pjsip_transport_management.c b/res/res_pjsip_transport_management.c index eb0240438..afd94eb1f 100644 --- a/res/res_pjsip_transport_management.c +++ b/res/res_pjsip_transport_management.c @@ -24,6 +24,8 @@ #include "asterisk.h" +#include <signal.h> + #include <pjsip.h> #include <pjsip_ua.h> @@ -93,7 +95,7 @@ static void *keepalive_transport_thread(void *data) /* Once loaded this module just keeps on going as it is unsafe to stop and change the underlying * callback for the transport manager. */ - while (1) { + while (keepalive_interval) { sleep(keepalive_interval); ao2_callback(transports, OBJ_NODATA, keepalive_transport_cb, NULL); } @@ -347,7 +349,19 @@ static int load_module(void) static int unload_module(void) { - /* This will never get called */ + pjsip_tpmgr *tpmgr = pjsip_endpt_get_tpmgr(ast_sip_get_pjsip_endpoint()); + + if (keepalive_interval) { + keepalive_interval = 0; + pthread_kill(keepalive_thread, SIGURG); + pthread_join(keepalive_thread, NULL); + } + + ast_sched_context_destroy(sched); + ao2_ref(transports, -1); + + ast_sip_unregister_service(&idle_monitor_module); + pjsip_tpmgr_set_state_cb(tpmgr, tpmgr_state_callback); return 0; } diff --git a/res/stasis/control.c b/res/stasis/control.c index 97b0b8809..b2b076b73 100644 --- a/res/stasis/control.c +++ b/res/stasis/control.c @@ -323,7 +323,7 @@ static int app_control_dial(struct stasis_app_control *control, AST_BRIDGE_IMPART_CHAN_INDEPENDENT)) { ast_hangup(new_chan); } else { - control_add_channel_to_bridge(control, chan, bridge); + control_swap_channel_in_bridge(control, bridge, chan, NULL); } return 0; @@ -982,11 +982,8 @@ static void bridge_after_cb_failed(enum ast_bridge_after_cb_reason reason, ast_bridge_after_cb_reason_string(reason)); } -int control_add_channel_to_bridge( - struct stasis_app_control *control, - struct ast_channel *chan, void *data) +int control_swap_channel_in_bridge(struct stasis_app_control *control, struct ast_bridge *bridge, struct ast_channel *chan, struct ast_channel *swap) { - struct ast_bridge *bridge = data; int res; if (!control || !bridge) { @@ -1039,7 +1036,7 @@ int control_add_channel_to_bridge( res = ast_bridge_impart(bridge, chan, - NULL, /* swap channel */ + swap, NULL, /* features */ AST_BRIDGE_IMPART_CHAN_DEPARTABLE); if (res != 0) { @@ -1055,6 +1052,11 @@ int control_add_channel_to_bridge( return 0; } +int control_add_channel_to_bridge(struct stasis_app_control *control, struct ast_channel *chan, void *data) +{ + return control_swap_channel_in_bridge(control, data, chan, NULL); +} + int stasis_app_control_add_channel_to_bridge( struct stasis_app_control *control, struct ast_bridge *bridge) { diff --git a/res/stasis/control.h b/res/stasis/control.h index 1d37a494a..868a8091b 100644 --- a/res/stasis/control.h +++ b/res/stasis/control.h @@ -111,12 +111,20 @@ struct stasis_app *control_app(struct stasis_app_control *control); * \brief Command callback for adding a channel to a bridge * * \param control The control for chan - * \param channel The channel on which commands should be executed - * \param bridge Data to be passed to the callback + * \param chan The channel on which commands should be executed + * \param data Bridge to be passed to the callback + */ +int control_add_channel_to_bridge(struct stasis_app_control *control, struct ast_channel *chan, void *data); + +/*! + * \brief Command for swapping a channel in a bridge + * + * \param control The control for chan + * \param chan The channel on which commands should be executed + * \param bridge Bridge to be passed to the callback + * \param swap Channel to swap with when joining the bridge */ -int control_add_channel_to_bridge( - struct stasis_app_control *control, - struct ast_channel *chan, void *obj); +int control_swap_channel_in_bridge(struct stasis_app_control *control, struct ast_bridge *bridge, struct ast_channel *chan, struct ast_channel *swap); /*! * \brief Stop playing silence to a channel right now. diff --git a/res/stasis/stasis_bridge.c b/res/stasis/stasis_bridge.c index e41088134..9ffc2d7be 100644 --- a/res/stasis/stasis_bridge.c +++ b/res/stasis/stasis_bridge.c @@ -76,24 +76,54 @@ static void bridge_stasis_run_cb(struct ast_channel *chan, void *data) pbx_exec(chan, app_stasis, app_name); } -static int add_channel_to_bridge( +struct defer_bridge_add_obj { + /*! Bridge to join (has ref) */ + struct ast_bridge *bridge; + /*! + * \brief Channel to swap with in the bridge. (has ref) + * + * \note NULL if not swapping with a channel. + */ + struct ast_channel *swap; +}; + +static void defer_bridge_add_dtor(void *obj) +{ + struct defer_bridge_add_obj *defer = obj; + + ao2_cleanup(defer->bridge); + ast_channel_cleanup(defer->swap); +} + +static int defer_bridge_add( struct stasis_app_control *control, struct ast_channel *chan, void *obj) { - struct ast_bridge *bridge = obj; - int res; + struct defer_bridge_add_obj *defer = obj; - res = control_add_channel_to_bridge(control, - chan, bridge); - return res; + return control_swap_channel_in_bridge(control, defer->bridge, chan, defer->swap); } static void bridge_stasis_queue_join_action(struct ast_bridge *self, - struct ast_bridge_channel *bridge_channel) + struct ast_bridge_channel *bridge_channel, struct ast_bridge_channel *swap) { + struct defer_bridge_add_obj *defer; + + defer = ao2_alloc_options(sizeof(*defer), defer_bridge_add_dtor, + AO2_ALLOC_OPT_LOCK_NOLOCK); + if (!defer) { + return; + } + ao2_ref(self, +1); + defer->bridge = self; + if (swap) { + ast_channel_ref(swap->chan); + defer->swap = swap->chan; + } + ast_channel_lock(bridge_channel->chan); - command_prestart_queue_command(bridge_channel->chan, add_channel_to_bridge, - ao2_bump(self), __ao2_cleanup); + command_prestart_queue_command(bridge_channel->chan, defer_bridge_add, + defer, __ao2_cleanup); ast_channel_unlock(bridge_channel->chan); } @@ -167,18 +197,19 @@ static int bridge_stasis_push(struct ast_bridge *self, struct ast_bridge_channel if (!control && !stasis_app_channel_is_internal(bridge_channel->chan)) { /* channel not in Stasis(), get it there */ + ast_debug(1, "Bridge %s: pushing non-stasis %p(%s) setup to come back in under stasis\n", + self->uniqueid, bridge_channel, ast_channel_name(bridge_channel->chan)); + /* Attach after-bridge callback and pass ownership of swap_app to it */ if (ast_bridge_set_after_callback(bridge_channel->chan, bridge_stasis_run_cb, NULL, NULL)) { - ast_log(LOG_ERROR, "Failed to set after bridge callback\n"); + ast_log(LOG_ERROR, + "Failed to set after bridge callback for bridge %s non-stasis push of %s\n", + self->uniqueid, ast_channel_name(bridge_channel->chan)); return -1; } - bridge_stasis_queue_join_action(self, bridge_channel); - if (swap) { - /* nudge the swap channel out of the bridge */ - ast_bridge_channel_leave_bridge(swap, BRIDGE_CHANNEL_STATE_END_NO_DISSOLVE, 0); - } + bridge_stasis_queue_join_action(self, bridge_channel, swap); /* Return -1 so the push fails and the after-bridge callback gets called * This keeps the bridging framework from putting the channel into the bridge diff --git a/tests/test_channel_feature_hooks.c b/tests/test_channel_feature_hooks.c index fbc9786cc..c5d3b9b86 100644 --- a/tests/test_channel_feature_hooks.c +++ b/tests/test_channel_feature_hooks.c @@ -40,6 +40,7 @@ ASTERISK_FILE_VERSION(__FILE__, "$Revision$") #include "asterisk/bridge.h" #include "asterisk/bridge_basic.h" #include "asterisk/features.h" +#include "asterisk/format_cache.h" #define TEST_CATEGORY "/channels/features/" @@ -47,6 +48,8 @@ ASTERISK_FILE_VERSION(__FILE__, "$Revision$") #define TEST_BACKEND_NAME "Features Test Logging" +#define TEST_CHANNEL_FORMAT ast_format_slin + /*! \brief A channel technology used for the unit tests */ static struct ast_channel_tech test_features_chan_tech = { .type = CHANNEL_TECH_NAME, @@ -94,6 +97,11 @@ static void wait_for_unbridged(struct ast_channel *channel) #define START_CHANNEL(channel, name, number) do { \ channel = ast_channel_alloc(0, AST_STATE_UP, number, name, number, number, \ "default", NULL, NULL, 0, CHANNEL_TECH_NAME "/" name); \ + ast_channel_nativeformats_set(channel, test_features_chan_tech.capabilities); \ + ast_channel_set_rawwriteformat(channel, TEST_CHANNEL_FORMAT); \ + ast_channel_set_rawreadformat(channel, TEST_CHANNEL_FORMAT); \ + ast_channel_set_writeformat(channel, TEST_CHANNEL_FORMAT); \ + ast_channel_set_readformat(channel, TEST_CHANNEL_FORMAT); \ ast_channel_unlock(channel); \ } while (0) @@ -329,12 +337,19 @@ static int unload_module(void) AST_TEST_UNREGISTER(test_features_channel_interval); ast_channel_unregister(&test_features_chan_tech); + ao2_cleanup(test_features_chan_tech.capabilities); + test_features_chan_tech.capabilities = NULL; return 0; } static int load_module(void) { + test_features_chan_tech.capabilities = ast_format_cap_alloc(AST_FORMAT_CAP_FLAG_DEFAULT); + if (!test_features_chan_tech.capabilities) { + return AST_MODULE_LOAD_FAILURE; + } + ast_format_cap_append(test_features_chan_tech.capabilities, TEST_CHANNEL_FORMAT, 0); ast_channel_register(&test_features_chan_tech); AST_TEST_REGISTER(test_features_channel_dtmf); diff --git a/tests/test_message.c b/tests/test_message.c index f7ee02730..f73901ea6 100644 --- a/tests/test_message.c +++ b/tests/test_message.c @@ -232,8 +232,8 @@ static int user_event_hook_cb(int category, const char *event, char *body) static int handler_wait_for_message(struct ast_test *test) { int error = 0; - struct timeval wait_now = ast_tvnow(); - struct timespec wait_time = { .tv_sec = wait_now.tv_sec + 1, .tv_nsec = wait_now.tv_usec * 1000 }; + struct timeval wait = ast_tvadd(ast_tvnow(), ast_tv(5 /* seconds */, 0)); + struct timespec wait_time = { .tv_sec = wait.tv_sec, .tv_nsec = wait.tv_usec * 1000 }; ast_mutex_lock(&handler_lock); while (!handler_received_message) { @@ -253,8 +253,8 @@ static int handler_wait_for_message(struct ast_test *test) static int user_event_wait_for_events(struct ast_test *test, int expected_events) { int error; - struct timeval wait_now = ast_tvnow(); - struct timespec wait_time = { .tv_sec = wait_now.tv_sec + 1, .tv_nsec = wait_now.tv_usec * 1000 }; + struct timeval wait = ast_tvadd(ast_tvnow(), ast_tv(5 /* seconds */, 0)); + struct timespec wait_time = { .tv_sec = wait.tv_sec, .tv_nsec = wait.tv_usec * 1000 }; expected_user_events = expected_events; diff --git a/tests/test_res_pjsip_scheduler.c b/tests/test_res_pjsip_scheduler.c new file mode 100644 index 000000000..f9a1633ac --- /dev/null +++ b/tests/test_res_pjsip_scheduler.c @@ -0,0 +1,400 @@ +/* + * Asterisk -- An open source telephony toolkit. + * + * Copyright (C) 2016, Fairview 5 Engineering, LLC + * + * George Joseph <george.joseph@fairview5.com> + * + * See http://www.asterisk.org for more information about + * the Asterisk project. Please do not directly contact + * any of the maintainers of this project for assistance; + * the project provides a web site, mailing lists and IRC + * channels for your use. + * + * This program is free software, distributed under the terms of + * the GNU General Public License Version 2. See the LICENSE file + * at the top of the source tree. + */ + +/*! + * \file + * \brief res_pjsip scheduler tests + * + * \author George Joseph <george.joseph@fairview5.com> + * + */ + +/*** MODULEINFO + <depend>TEST_FRAMEWORK</depend> + <depend>res_pjsip</depend> + <support_level>core</support_level> + ***/ + +#include "asterisk.h" + +ASTERISK_REGISTER_FILE() + +#include <pjsip.h> +#include "asterisk/test.h" +#include "asterisk/module.h" +#include "asterisk/taskprocessor.h" +#include "asterisk/res_pjsip.h" +#include "asterisk/utils.h" + +#define CATEGORY "/res/res_pjsip/scheduler/" + +struct test_data { + ast_mutex_t lock; + ast_cond_t cond; + pthread_t tid; + struct timeval test_start; + struct timeval task_start; + struct timeval task_end; + int is_servant; + int interval; + int sleep; + int done; + struct ast_test *test; +}; + +#define S2U(x) (long int)(x * 1000 * 1000) +#define M2U(x) (long int)(x * 1000) + +static int task_1(void *data) +{ + struct test_data *test = data; + + test->done = 0; + test->task_start = ast_tvnow(); + test->tid = pthread_self(); + test->is_servant = ast_sip_thread_is_servant(); + usleep(M2U(test->sleep)); + test->task_end = ast_tvnow(); + + ast_mutex_lock(&test->lock); + test->done = 1; + ast_mutex_unlock(&test->lock); + ast_cond_signal(&test->cond); + + return test->interval; +} + + +static void data_cleanup(void *data) +{ + struct test_data *test_data = data; + ast_mutex_destroy(&test_data->lock); + ast_cond_destroy(&test_data->cond); +} + +#define waitfor(x) \ +{ \ + ast_mutex_lock(&(x)->lock); \ + while (!(x)->done) { \ + ast_cond_wait(&(x)->cond, &(x)->lock); \ + } \ + (x)->done = 0; \ + ast_mutex_unlock(&(x)->lock); \ +} + +static int scheduler(struct ast_test *test, int serialized) +{ + RAII_VAR(struct ast_taskprocessor *, tp1, NULL, ast_taskprocessor_unreference); + RAII_VAR(struct test_data *, test_data1, ao2_alloc(sizeof(*test_data1), data_cleanup), ao2_cleanup); + RAII_VAR(struct test_data *, test_data2, ao2_alloc(sizeof(*test_data2), data_cleanup), ao2_cleanup); + RAII_VAR(struct ast_sip_sched_task *, task1, NULL, ao2_cleanup); + RAII_VAR(struct ast_sip_sched_task *, task2, NULL, ao2_cleanup); + int duration; + int delay; + struct timeval task1_start; + + ast_test_validate(test, test_data1 != NULL); + ast_test_validate(test, test_data2 != NULL); + + test_data1->test = test; + test_data1->test_start = ast_tvnow(); + test_data1->interval = 2000; + test_data1->sleep = 1000; + ast_mutex_init(&test_data1->lock); + ast_cond_init(&test_data1->cond, NULL); + + test_data2->test = test; + test_data2->test_start = ast_tvnow(); + test_data2->interval = 2000; + test_data2->sleep = 1000; + ast_mutex_init(&test_data2->lock); + ast_cond_init(&test_data2->cond, NULL); + + if (serialized) { + ast_test_status_update(test, "This test will take about %3.1f seconds\n", + (test_data1->interval + test_data1->sleep + (MAX(test_data1->interval - test_data2->interval, 0)) + test_data2->sleep) / 1000.0); + tp1 = ast_sip_create_serializer(); + ast_test_validate(test, (tp1 != NULL)); + } else { + ast_test_status_update(test, "This test will take about %3.1f seconds\n", + ((MAX(test_data1->interval, test_data2->interval) + MAX(test_data1->sleep, test_data2->sleep)) / 1000.0)); + } + + task1 = ast_sip_schedule_task(tp1, test_data1->interval, task_1, NULL, test_data1, AST_SIP_SCHED_TASK_FIXED); + ast_test_validate(test, task1 != NULL); + + task2 = ast_sip_schedule_task(tp1, test_data2->interval, task_1, NULL, test_data2, AST_SIP_SCHED_TASK_FIXED); + ast_test_validate(test, task2 != NULL); + + waitfor(test_data1); + ast_sip_sched_task_cancel(task1); + ast_test_validate(test, test_data1->is_servant); + + duration = ast_tvdiff_ms(test_data1->task_end, test_data1->test_start); + ast_test_validate(test, (duration > ((test_data1->interval + test_data1->sleep) * 0.9)) + && (duration < ((test_data1->interval + test_data1->sleep) * 1.1))); + + ast_sip_sched_task_get_times(task1, NULL, &task1_start, NULL); + delay = ast_tvdiff_ms(task1_start, test_data1->test_start); + ast_test_validate(test, (delay > (test_data1->interval * 0.9) + && (delay < (test_data1->interval * 1.1)))); + + waitfor(test_data2); + ast_sip_sched_task_cancel(task2); + ast_test_validate(test, test_data2->is_servant); + + if (serialized) { + ast_test_validate(test, test_data1->tid == test_data2->tid); + ast_test_validate(test, ast_tvdiff_ms(test_data2->task_start, test_data1->task_end) >= 0); + } else { + ast_test_validate(test, test_data1->tid != test_data2->tid); + } + + return AST_TEST_PASS; +} + +AST_TEST_DEFINE(serialized_scheduler) +{ + + switch (cmd) { + case TEST_INIT: + info->name = __func__; + info->category = CATEGORY; + info->summary = "Test res_pjsip serialized scheduler"; + info->description = "Test res_pjsip serialized scheduler"; + return AST_TEST_NOT_RUN; + case TEST_EXECUTE: + break; + } + + return scheduler(test, 1); +} + +AST_TEST_DEFINE(unserialized_scheduler) +{ + + switch (cmd) { + case TEST_INIT: + info->name = __func__; + info->category = CATEGORY; + info->summary = "Test res_pjsip unserialized scheduler"; + info->description = "Test res_pjsip unserialized scheduler"; + return AST_TEST_NOT_RUN; + case TEST_EXECUTE: + break; + } + + return scheduler(test, 0); +} + +static int run_count; +static int destruct_count; + +static int dummy_task(void *data) +{ + int *sleep = data; + + usleep(M2U(*sleep)); + run_count++; + + return 0; +} + +static void test_destructor(void *data) +{ + destruct_count++; +} + +AST_TEST_DEFINE(scheduler_cleanup) +{ + RAII_VAR(int *, sleep, NULL, ao2_cleanup); + RAII_VAR(struct ast_sip_sched_task *, task, NULL, ao2_cleanup); + int interval; + int when; + + switch (cmd) { + case TEST_INIT: + info->name = __func__; + info->category = CATEGORY; + info->summary = "Test res_pjsip scheduler cleanup"; + info->description = "Test res_pjsip scheduler cleanup"; + return AST_TEST_NOT_RUN; + case TEST_EXECUTE: + break; + } + + destruct_count = 0; + interval = 1000; + + sleep = ao2_alloc(sizeof(*sleep), test_destructor); + ast_test_validate(test, sleep != NULL); + *sleep = 500; + + ast_test_status_update(test, "This test will take about %3.1f seconds\n", + ((interval * 1.1) + *sleep) / 1000.0); + + task = ast_sip_schedule_task(NULL, interval, dummy_task, "dummy", sleep, + AST_SIP_SCHED_TASK_DATA_AO2 | AST_SIP_SCHED_TASK_DATA_FREE); + ast_test_validate(test, task != NULL); + usleep(M2U(interval * 0.5)); + when = ast_sip_sched_task_get_next_run(task); + ast_test_validate(test, (when > (interval * 0.4) && when < (interval * 0.6))); + usleep(M2U(interval * 0.6)); + ast_test_validate(test, ast_sip_sched_is_task_running(task)); + + usleep(M2U(*sleep)); + + ast_test_validate(test, (ast_sip_sched_is_task_running(task) == 0)); + when = ast_sip_sched_task_get_next_run(task); + ast_test_validate(test, (when < 0), res, error); + ast_test_validate(test, (ao2_ref(task, 0) == 1)); + ao2_ref(task, -1); + task = NULL; + ast_test_validate(test, (destruct_count == 1)); + sleep = NULL; + + return AST_TEST_PASS; +} + +AST_TEST_DEFINE(scheduler_cancel) +{ + RAII_VAR(int *, sleep, NULL, ao2_cleanup); + RAII_VAR(struct ast_sip_sched_task *, task, NULL, ao2_cleanup); + int interval; + int when; + + switch (cmd) { + case TEST_INIT: + info->name = __func__; + info->category = CATEGORY; + info->summary = "Test res_pjsip scheduler cancel task"; + info->description = "Test res_pjsip scheduler cancel task"; + return AST_TEST_NOT_RUN; + case TEST_EXECUTE: + break; + } + + destruct_count = 0; + interval = 1000; + + sleep = ao2_alloc(sizeof(*sleep), test_destructor); + ast_test_validate(test, sleep != NULL); + *sleep = 500; + + ast_test_status_update(test, "This test will take about %3.1f seconds\n", + (interval + *sleep) / 1000.0); + + task = ast_sip_schedule_task(NULL, interval, dummy_task, "dummy", sleep, AST_SIP_SCHED_TASK_DATA_NO_CLEANUP); + ast_test_validate(test, task != NULL); + + usleep(M2U(interval * 0.5)); + when = ast_sip_sched_task_get_next_run_by_name("dummy"); + ast_test_validate(test, (when > (interval * 0.4) && when < (interval * 0.6))); + ast_test_validate(test, !ast_sip_sched_is_task_running_by_name("dummy")); + ast_test_validate(test, ao2_ref(task, 0) == 2); + + ast_sip_sched_task_cancel_by_name("dummy"); + + when = ast_sip_sched_task_get_next_run(task); + ast_test_validate(test, when < 0); + + usleep(M2U(interval)); + ast_test_validate(test, run_count == 0); + ast_test_validate(test, destruct_count == 0); + ast_test_validate(test, ao2_ref(task, 0) == 1); + + return AST_TEST_PASS; +} + +AST_TEST_DEFINE(scheduler_policy) +{ + RAII_VAR(struct test_data *, test_data1, ao2_alloc(sizeof(*test_data1), data_cleanup), ao2_cleanup); + RAII_VAR(struct ast_sip_sched_task *, task, NULL, ao2_cleanup); + int when; + + switch (cmd) { + case TEST_INIT: + info->name = __func__; + info->category = CATEGORY; + info->summary = "Test res_pjsip scheduler cancel task"; + info->description = "Test res_pjsip scheduler cancel task"; + return AST_TEST_NOT_RUN; + case TEST_EXECUTE: + break; + } + + ast_test_validate(test, test_data1 != NULL); + + destruct_count = 0; + run_count = 0; + test_data1->test = test; + test_data1->test_start = ast_tvnow(); + test_data1->interval = 1000; + test_data1->sleep = 500; + ast_mutex_init(&test_data1->lock); + ast_cond_init(&test_data1->cond, NULL); + + ast_test_status_update(test, "This test will take about %3.1f seconds\n", + ((test_data1->interval * 3) + test_data1->sleep) / 1000.0); + + task = ast_sip_schedule_task(NULL, test_data1->interval, task_1, "test_1", test_data1, + AST_SIP_SCHED_TASK_DATA_NO_CLEANUP | AST_SIP_SCHED_TASK_PERIODIC); + ast_test_validate(test, task != NULL); + + waitfor(test_data1); + when = ast_tvdiff_ms(test_data1->task_start, test_data1->test_start); + ast_test_validate(test, when > test_data1->interval * 0.9 && when < test_data1->interval * 1.1); + + waitfor(test_data1); + when = ast_tvdiff_ms(test_data1->task_start, test_data1->test_start); + ast_test_validate(test, when > test_data1->interval * 2 * 0.9 && when < test_data1->interval * 2 * 1.1); + + waitfor(test_data1); + when = ast_tvdiff_ms(test_data1->task_start, test_data1->test_start); + ast_test_validate(test, when > test_data1->interval * 3 * 0.9 && when < test_data1->interval * 3 * 1.1); + + ast_sip_sched_task_cancel(task); + ao2_ref(task, -1); + task = NULL; + + return AST_TEST_PASS; +} + +static int load_module(void) +{ + CHECK_PJSIP_MODULE_LOADED(); + + AST_TEST_REGISTER(serialized_scheduler); + AST_TEST_REGISTER(unserialized_scheduler); + AST_TEST_REGISTER(scheduler_cleanup); + AST_TEST_REGISTER(scheduler_cancel); + AST_TEST_REGISTER(scheduler_policy); + return AST_MODULE_LOAD_SUCCESS; +} + +static int unload_module(void) +{ + AST_TEST_UNREGISTER(scheduler_cancel); + AST_TEST_UNREGISTER(scheduler_cleanup); + AST_TEST_UNREGISTER(unserialized_scheduler); + AST_TEST_UNREGISTER(serialized_scheduler); + AST_TEST_UNREGISTER(scheduler_policy); + return 0; +} + +AST_MODULE_INFO_STANDARD(ASTERISK_GPL_KEY, "res_pjsip scheduler test module"); diff --git a/third-party/pjproject/patches/0001-sip_parser.c-Remove-wholesale-strip-from-parse_param.patch b/third-party/pjproject/patches/0001-sip_parser.c-Remove-wholesale-strip-from-parse_param.patch new file mode 100644 index 000000000..e0bd9129c --- /dev/null +++ b/third-party/pjproject/patches/0001-sip_parser.c-Remove-wholesale-strip-from-parse_param.patch @@ -0,0 +1,55 @@ +From ce426249ec1270f27560919791f3e13eaeea9152 Mon Sep 17 00:00:00 2001 +From: George Joseph <george.joseph@fairview5.com> +Date: Tue, 12 Apr 2016 14:09:53 -0600 +Subject: [PATCH] sip_parser.c: Remove wholesale '[]' strip from + parse_param_impl + +The wholesale stripping of '[]' from header parameters causes issues if +something (like a port) occurrs after the final ']'. + +'[2001:a::b]' will correctly parse to '2001:a::b' +'[2001:a::b]:8080' will correctly parse to '2001:a::b' but the scanner is left +with ':8080' and parsing stops with a syntax error. + +I can't even find a case where stripping the '[]' is a good thing anyway. Even +if you continued to parse and resulted in a string that looks like this... +'2001:a::b:8080', it's not valid. + +This came up in Asterisk because Kamailio sends us a Contact with an alias +URI parameter that has an IPv6 address in it like this: +Contact: <sip:1171@127.0.0.1:5080;alias=[2001:1:2::3]~43691~6> +which should be legal but causes a syntax error because of the characters +after the final ']'. Even if it didn't, the '[]' should still not be stripped. + +I've run the Asterisk Test Suite for PJSIP (252 tests) many of which are IPv6 +enabled. No issues were caused by removing the code that strips the '[]'. + +I tried running 'make pjsip-test' but that fails even without my change. :) + +The Asterisk ticket is: https://issues.asterisk.org/jira/browse/ASTERISK-25123 +--- + pjsip/src/pjsip/sip_parser.c | 8 -------- + 1 file changed, 8 deletions(-) + +diff --git a/pjsip/src/pjsip/sip_parser.c b/pjsip/src/pjsip/sip_parser.c +index c18faa3..98eb5ea 100644 +--- a/pjsip/src/pjsip/sip_parser.c ++++ b/pjsip/src/pjsip/sip_parser.c +@@ -1149,14 +1149,6 @@ static void parse_param_imp( pj_scanner *scanner, pj_pool_t *pool, + pvalue->ptr++; + pvalue->slen -= 2; + } +- } else if (*scanner->curptr == '[') { +- /* pvalue can be a quoted IPv6; in this case, the +- * '[' and ']' quote characters are to be removed +- * from the pvalue. +- */ +- pj_scan_get_char(scanner); +- pj_scan_get_until_ch(scanner, ']', pvalue); +- pj_scan_get_char(scanner); + } else if(pj_cis_match(spec, *scanner->curptr)) { + parser_get_and_unescape(scanner, pool, spec, esc_spec, pvalue); + } +-- +2.5.5 + |