summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--apps/app_queue.c152
-rw-r--r--apps/app_talkdetect.c2
-rw-r--r--apps/app_voicemail.c12
-rw-r--r--apps/confbridge/conf_chan_announce.c1
-rw-r--r--bridges/bridge_softmix.c13
-rw-r--r--channels/chan_sip.c32
-rw-r--r--configs/samples/sip.conf.sample1
-rw-r--r--funcs/func_odbc.c314
-rw-r--r--include/asterisk/astobj2.h134
-rw-r--r--include/asterisk/bridge_channel_internal.h38
-rw-r--r--include/asterisk/bridge_technology.h3
-rw-r--r--include/asterisk/features.h9
-rw-r--r--include/asterisk/res_pjsip.h235
-rw-r--r--include/asterisk/stasis.h24
-rw-r--r--main/bridge.c222
-rw-r--r--main/bridge_channel.c31
-rw-r--r--main/core_unreal.c1
-rw-r--r--main/features.c1
-rw-r--r--main/format_cap.c2
-rw-r--r--main/lock.c16
-rw-r--r--main/manager.c202
-rw-r--r--main/manager_channels.c24
-rw-r--r--main/stasis_message.c35
-rw-r--r--res/res_agi.c40
-rw-r--r--res/res_pjsip.c17
-rw-r--r--res/res_pjsip/include/res_pjsip_private.h19
-rw-r--r--res/res_pjsip/location.c56
-rw-r--r--res/res_pjsip/pjsip_scheduler.c495
-rw-r--r--res/res_pjsip_caller_id.c6
-rw-r--r--res/res_pjsip_registrar.c3
-rw-r--r--res/res_pjsip_transport_management.c18
-rw-r--r--res/stasis/control.c14
-rw-r--r--res/stasis/control.h18
-rw-r--r--res/stasis/stasis_bridge.c61
-rw-r--r--tests/test_channel_feature_hooks.c15
-rw-r--r--tests/test_message.c8
-rw-r--r--tests/test_res_pjsip_scheduler.c400
-rw-r--r--third-party/pjproject/patches/0001-sip_parser.c-Remove-wholesale-strip-from-parse_param.patch55
38 files changed, 2341 insertions, 388 deletions
diff --git a/apps/app_queue.c b/apps/app_queue.c
index 939a0e2ad..dbd83938d 100644
--- a/apps/app_queue.c
+++ b/apps/app_queue.c
@@ -1510,7 +1510,6 @@ struct member {
struct call_queue *lastqueue; /*!< Last queue we received a call */
unsigned int dead:1; /*!< Used to detect members deleted in realtime */
unsigned int delme:1; /*!< Flag to delete entry on reload */
- unsigned int call_pending:1; /*!< TRUE if the Q is attempting to place a call to the member. */
char rt_uniqueid[80]; /*!< Unique id of realtime member entry */
unsigned int ringinuse:1; /*!< Flag to ring queue members even if their status is 'inuse' */
};
@@ -2267,6 +2266,70 @@ static int get_member_status(struct call_queue *q, int max_penalty, int min_pena
return -1;
}
+/*
+ * A "pool" of member objects that calls are currently pending on. If an
+ * agent is a member of multiple queues it's possible for that agent to be
+ * called by each of the queues at the same time. This happens because device
+ * state is slow to notify the queue app of one of it's member's being rung.
+ * This "pool" allows us to track which members are currently being rung while
+ * we wait on the device state change.
+ */
+static struct ao2_container *pending_members;
+#define MAX_CALL_ATTEMPT_BUCKETS 353
+
+static int pending_members_hash(const void *obj, const int flags)
+{
+ const struct member *object;
+ const char *key;
+
+ switch (flags & OBJ_SEARCH_MASK) {
+ case OBJ_SEARCH_KEY:
+ key = obj;
+ break;
+ case OBJ_SEARCH_OBJECT:
+ object = obj;
+ key = object->interface;
+ break;
+ default:
+ ast_assert(0);
+ return 0;
+ }
+ return ast_str_case_hash(key);
+}
+
+static int pending_members_cmp(void *obj, void *arg, int flags)
+{
+ const struct member *object_left = obj;
+ const struct member *object_right = arg;
+ const char *right_key = arg;
+ int cmp;
+
+ switch (flags & OBJ_SEARCH_MASK) {
+ case OBJ_SEARCH_OBJECT:
+ right_key = object_right->interface;
+ /* Fall through */
+ case OBJ_SEARCH_KEY:
+ cmp = strcasecmp(object_left->interface, right_key);
+ break;
+ case OBJ_SEARCH_PARTIAL_KEY:
+ /* Not supported by container. */
+ ast_assert(0);
+ return 0;
+ default:
+ cmp = 0;
+ break;
+ }
+ if (cmp) {
+ return 0;
+ }
+ return CMP_MATCH;
+}
+
+static void pending_members_remove(struct member *mem)
+{
+ ao2_find(pending_members, mem, OBJ_POINTER | OBJ_NODATA | OBJ_UNLINK);
+}
+
/*! \brief set a member's status based on device state of that member's state_interface.
*
* Lock interface list find sc, iterate through each queues queue_member list for member to
@@ -2276,6 +2339,9 @@ static void update_status(struct call_queue *q, struct member *m, const int stat
{
m->status = status;
+ /* Whatever the status is clear the member from the pending members pool */
+ pending_members_remove(m);
+
queue_publish_member_blob(queue_member_status_type(), queue_member_blob_create(q, m));
}
@@ -3132,6 +3198,7 @@ static void member_add_to_queue(struct call_queue *queue, struct member *mem)
*/
static void member_remove_from_queue(struct call_queue *queue, struct member *mem)
{
+ pending_members_remove(mem);
ao2_lock(queue->members);
ast_devstate_changed(QUEUE_UNKNOWN_PAUSED_DEVSTATE, AST_DEVSTATE_CACHABLE, "Queue:%s_pause_%s", queue->name, mem->interface);
queue_member_follower_removal(queue, mem);
@@ -4110,41 +4177,6 @@ static int member_status_available(int status)
/*!
* \internal
- * \brief Clear the member call pending flag.
- *
- * \param mem Queue member.
- *
- * \return Nothing
- */
-static void member_call_pending_clear(struct member *mem)
-{
- ao2_lock(mem);
- mem->call_pending = 0;
- ao2_unlock(mem);
-}
-
-/*!
- * \internal
- * \brief Set the member call pending flag.
- *
- * \param mem Queue member.
- *
- * \retval non-zero if call pending flag was already set.
- */
-static int member_call_pending_set(struct member *mem)
-{
- int old_pending;
-
- ao2_lock(mem);
- old_pending = mem->call_pending;
- mem->call_pending = 1;
- ao2_unlock(mem);
-
- return old_pending;
-}
-
-/*!
- * \internal
* \brief Determine if can ring a queue entry.
*
* \param qe Queue entry to check.
@@ -4164,7 +4196,7 @@ static int can_ring_entry(struct queue_ent *qe, struct callattempt *call)
return 0;
}
- if (call->member->in_call && call->lastqueue->wrapuptime) {
+ if (call->member->in_call && call->lastqueue && call->lastqueue->wrapuptime) {
ast_debug(1, "%s is in call, so not available (wrapuptime %d)\n",
call->interface, call->lastqueue->wrapuptime);
return 0;
@@ -4185,13 +4217,32 @@ static int can_ring_entry(struct queue_ent *qe, struct callattempt *call)
}
if (!call->member->ringinuse) {
- if (member_call_pending_set(call->member)) {
- ast_debug(1, "%s has another call pending, can't receive call\n",
- call->interface);
+ struct member *mem;
+
+ ao2_lock(pending_members);
+
+ mem = ao2_find(pending_members, call->member,
+ OBJ_SEARCH_OBJECT | OBJ_NOLOCK);
+ if (mem) {
+ /*
+ * If found that means this member is currently being attempted
+ * from another calling thread, so stop trying from this thread
+ */
+ ast_debug(1, "%s has another call trying, can't receive call\n",
+ call->interface);
+ ao2_ref(mem, -1);
+ ao2_unlock(pending_members);
return 0;
}
/*
+ * If not found add it to the container so another queue
+ * won't attempt to call this member at the same time.
+ */
+ ao2_link(pending_members, call->member);
+ ao2_unlock(pending_members);
+
+ /*
* The queue member is available. Get current status to be sure
* because the device state and extension state callbacks may
* not have updated the status yet.
@@ -4199,7 +4250,7 @@ static int can_ring_entry(struct queue_ent *qe, struct callattempt *call)
if (!member_status_available(get_queue_member_status(call->member))) {
ast_debug(1, "%s actually not available, can't receive call\n",
call->interface);
- member_call_pending_clear(call->member);
+ pending_members_remove(call->member);
return 0;
}
}
@@ -4236,7 +4287,6 @@ static int ring_entry(struct queue_ent *qe, struct callattempt *tmp, int *busies
++*busies;
return 0;
}
- ast_assert(tmp->member->ringinuse || tmp->member->call_pending);
ast_copy_string(tech, tmp->interface, sizeof(tech));
if ((location = strchr(tech, '/'))) {
@@ -4253,7 +4303,7 @@ static int ring_entry(struct queue_ent *qe, struct callattempt *tmp, int *busies
qe->linpos++;
ao2_unlock(qe->parent);
- member_call_pending_clear(tmp->member);
+ pending_members_remove(tmp->member);
publish_dial_end_event(qe->chan, tmp, NULL, "BUSY");
tmp->stillgoing = 0;
@@ -4324,7 +4374,7 @@ static int ring_entry(struct queue_ent *qe, struct callattempt *tmp, int *busies
/* Again, keep going even if there's an error */
ast_verb(3, "Couldn't call %s\n", tmp->interface);
do_hang(tmp);
- member_call_pending_clear(tmp->member);
+ pending_members_remove(tmp->member);
++*busies;
return 0;
}
@@ -4344,7 +4394,6 @@ static int ring_entry(struct queue_ent *qe, struct callattempt *tmp, int *busies
ast_verb(3, "Called %s\n", tmp->interface);
- member_call_pending_clear(tmp->member);
return 1;
}
@@ -9472,7 +9521,7 @@ static int manager_queues_summary(struct mansession *s, const struct message *m)
ao2_lock(q);
/* List queue properties */
- if (ast_strlen_zero(queuefilter) || !strcmp(q->name, queuefilter)) {
+ if (ast_strlen_zero(queuefilter) || !strcasecmp(q->name, queuefilter)) {
/* Reset the necessary local variables if no queuefilter is set*/
qmemcount = 0;
qmemavail = 0;
@@ -9550,7 +9599,7 @@ static int manager_queues_status(struct mansession *s, const struct message *m)
ao2_lock(q);
/* List queue properties */
- if (ast_strlen_zero(queuefilter) || !strcmp(q->name, queuefilter)) {
+ if (ast_strlen_zero(queuefilter) || !strcasecmp(q->name, queuefilter)) {
sl = ((q->callscompleted > 0) ? 100 * ((float)q->callscompletedinsl / (float)q->callscompleted) : 0);
astman_append(s, "Event: QueueParams\r\n"
"Queue: %s\r\n"
@@ -10806,6 +10855,8 @@ static int unload_module(void)
ast_unload_realtime("queue_members");
ao2_cleanup(queues);
+ ao2_cleanup(pending_members);
+
queues = NULL;
return 0;
}
@@ -10833,6 +10884,13 @@ static int load_module(void)
return AST_MODULE_LOAD_DECLINE;
}
+ pending_members = ao2_container_alloc(
+ MAX_CALL_ATTEMPT_BUCKETS, pending_members_hash, pending_members_cmp);
+ if (!pending_members) {
+ unload_module();
+ return AST_MODULE_LOAD_DECLINE;
+ }
+
use_weight = 0;
if (reload_handler(0, &mask, NULL)) {
diff --git a/apps/app_talkdetect.c b/apps/app_talkdetect.c
index a021252de..f7086fdd9 100644
--- a/apps/app_talkdetect.c
+++ b/apps/app_talkdetect.c
@@ -26,7 +26,7 @@
*/
/*** MODULEINFO
- <support_level>extended</support_level>
+ <support_level>core</support_level>
***/
#include "asterisk.h"
diff --git a/apps/app_voicemail.c b/apps/app_voicemail.c
index 798f844fa..a0b668d8d 100644
--- a/apps/app_voicemail.c
+++ b/apps/app_voicemail.c
@@ -622,12 +622,12 @@ static AST_LIST_HEAD_STATIC(vmstates, vmstate);
#define OPERATOR_EXIT 300
enum vm_box {
- NEW_FOLDER,
- OLD_FOLDER,
- WORK_FOLDER,
- FAMILY_FOLDER,
- FRIENDS_FOLDER,
- GREETINGS_FOLDER
+ NEW_FOLDER = 0,
+ OLD_FOLDER = 1,
+ WORK_FOLDER = 2,
+ FAMILY_FOLDER = 3,
+ FRIENDS_FOLDER = 4,
+ GREETINGS_FOLDER = -1
};
enum vm_option_flags {
diff --git a/apps/confbridge/conf_chan_announce.c b/apps/confbridge/conf_chan_announce.c
index 6596a8537..ee4660687 100644
--- a/apps/confbridge/conf_chan_announce.c
+++ b/apps/confbridge/conf_chan_announce.c
@@ -199,7 +199,6 @@ int conf_announce_channel_push(struct ast_channel *ast)
/* Impart the output channel into the bridge */
if (ast_bridge_impart(p->bridge, chan, NULL, features,
AST_BRIDGE_IMPART_CHAN_DEPARTABLE)) {
- ast_bridge_features_destroy(features);
ast_channel_unref(chan);
return -1;
}
diff --git a/bridges/bridge_softmix.c b/bridges/bridge_softmix.c
index e3df18fe5..fe058e4e6 100644
--- a/bridges/bridge_softmix.c
+++ b/bridges/bridge_softmix.c
@@ -359,6 +359,9 @@ static void set_softmix_bridge_data(int rate, int interval, struct ast_bridge_ch
struct ast_format *slin_format;
int setup_fail;
+ /* The callers have already ensured that sc is never NULL. */
+ ast_assert(sc != NULL);
+
slin_format = ast_format_cache_get_slin_by_rate(rate);
ast_mutex_lock(&sc->lock);
@@ -714,7 +717,7 @@ static int softmix_bridge_write(struct ast_bridge *bridge, struct ast_bridge_cha
{
int res = 0;
- if (!bridge->tech_pvt || (bridge_channel && !bridge_channel->tech_pvt)) {
+ if (!bridge->tech_pvt || !bridge_channel || !bridge_channel->tech_pvt) {
/* "Accept" the frame and discard it. */
return 0;
}
@@ -984,6 +987,11 @@ static int softmix_mixing_loop(struct ast_bridge *bridge)
AST_LIST_TRAVERSE(&bridge->channels, bridge_channel, entry) {
struct softmix_channel *sc = bridge_channel->tech_pvt;
+ if (!sc) {
+ /* This channel failed to join successfully. */
+ continue;
+ }
+
/* Update the sample rate to match the bridge's native sample rate if necessary. */
if (update_all_rates) {
set_softmix_bridge_data(softmix_data->internal_rate, softmix_data->internal_mixing_interval, bridge_channel, 1);
@@ -1019,7 +1027,8 @@ static int softmix_mixing_loop(struct ast_bridge *bridge)
AST_LIST_TRAVERSE(&bridge->channels, bridge_channel, entry) {
struct softmix_channel *sc = bridge_channel->tech_pvt;
- if (bridge_channel->suspended) {
+ if (!sc || bridge_channel->suspended) {
+ /* This channel failed to join successfully or is suspended. */
continue;
}
diff --git a/channels/chan_sip.c b/channels/chan_sip.c
index 91fb0b546..cbbda4e73 100644
--- a/channels/chan_sip.c
+++ b/channels/chan_sip.c
@@ -35170,17 +35170,19 @@ static int load_module(void)
/* And start the monitor for the first time */
restart_monitor();
- ast_realtime_require_field(ast_check_realtime("sipregs") ? "sipregs" : "sippeers",
- "name", RQ_CHAR, 10,
- "ipaddr", RQ_CHAR, INET6_ADDRSTRLEN - 1,
- "port", RQ_UINTEGER2, 5,
- "regseconds", RQ_INTEGER4, 11,
- "defaultuser", RQ_CHAR, 10,
- "fullcontact", RQ_CHAR, 35,
- "regserver", RQ_CHAR, 20,
- "useragent", RQ_CHAR, 20,
- "lastms", RQ_INTEGER4, 11,
- SENTINEL);
+ if (sip_cfg.peer_rtupdate) {
+ ast_realtime_require_field(ast_check_realtime("sipregs") ? "sipregs" : "sippeers",
+ "name", RQ_CHAR, 10,
+ "ipaddr", RQ_CHAR, INET6_ADDRSTRLEN - 1,
+ "port", RQ_UINTEGER2, 5,
+ "regseconds", RQ_INTEGER4, 11,
+ "defaultuser", RQ_CHAR, 10,
+ "fullcontact", RQ_CHAR, 35,
+ "regserver", RQ_CHAR, 20,
+ "useragent", RQ_CHAR, 20,
+ "lastms", RQ_INTEGER4, 11,
+ SENTINEL);
+ }
sip_register_tests();
@@ -35199,7 +35201,7 @@ static int unload_module(void)
struct sip_pvt *p;
struct sip_threadinfo *th;
struct ao2_iterator i;
- int wait_count;
+ struct timeval start;
ast_sip_api_provider_unregister();
@@ -35349,11 +35351,11 @@ static int unload_module(void)
* joinable. They can die on their own and remove themselves
* from the container thus resulting in a huge memory leak.
*/
- wait_count = 1000;
- while (ao2_container_count(threadt) && --wait_count) {
+ start = ast_tvnow();
+ while (ao2_container_count(threadt) && (ast_tvdiff_sec(ast_tvnow(), start) < 5)) {
sched_yield();
}
- if (!wait_count) {
+ if (ao2_container_count(threadt)) {
ast_debug(2, "TCP/TLS thread container did not become empty :(\n");
}
diff --git a/configs/samples/sip.conf.sample b/configs/samples/sip.conf.sample
index a24ab30a6..5c3238e2a 100644
--- a/configs/samples/sip.conf.sample
+++ b/configs/samples/sip.conf.sample
@@ -1479,7 +1479,6 @@ srvlookup=yes ; Enable DNS SRV lookups on outbound calls
;allow=ulaw
;allow=alaw
;mailbox=1234@default,1233@default ; Subscribe to status of multiple mailboxes
-;registertrying=yes ; Send a 100 Trying when the device registers.
;[snom]
;type=friend ; Friends place calls and receive calls
diff --git a/funcs/func_odbc.c b/funcs/func_odbc.c
index 0af3fd1c8..d17debd4c 100644
--- a/funcs/func_odbc.c
+++ b/funcs/func_odbc.c
@@ -137,6 +137,163 @@ struct odbc_datastore {
char names[0];
};
+/* \brief Data source name
+ *
+ * This holds data that pertains to a DSN
+ */
+struct dsn {
+ /*! A connection to the database */
+ struct odbc_obj *connection;
+ /*! The name of the DSN as defined in res_odbc.conf */
+ char name[0];
+};
+
+#define DSN_BUCKETS 37
+
+struct ao2_container *dsns;
+
+static int dsn_hash(const void *obj, const int flags)
+{
+ const struct dsn *object;
+ const char *key;
+
+ switch (flags & OBJ_SEARCH_MASK) {
+ case OBJ_SEARCH_KEY:
+ key = obj;
+ break;
+ case OBJ_SEARCH_OBJECT:
+ object = obj;
+ key = object->name;
+ break;
+ default:
+ ast_assert(0);
+ return 0;
+ }
+ return ast_str_hash(key);
+}
+
+static int dsn_cmp(void *obj, void *arg, int flags)
+{
+ const struct dsn *object_left = obj;
+ const struct dsn *object_right = arg;
+ const char *right_key = arg;
+ int cmp;
+
+ switch (flags & OBJ_SEARCH_MASK) {
+ case OBJ_SEARCH_OBJECT:
+ right_key = object_right->name;
+ /* Fall through */
+ case OBJ_SEARCH_KEY:
+ cmp = strcmp(object_left->name, right_key);
+ break;
+ case OBJ_SEARCH_PARTIAL_KEY:
+ cmp = strncmp(object_left->name, right_key, strlen(right_key));
+ break;
+ default:
+ cmp = 0;
+ break;
+ }
+
+ if (cmp) {
+ return 0;
+ }
+
+ return CMP_MATCH;
+}
+
+static void dsn_destructor(void *obj)
+{
+ struct dsn *dsn = obj;
+
+ if (dsn->connection) {
+ ast_odbc_release_obj(dsn->connection);
+ }
+}
+
+/*!
+ * \brief Create a DSN and connect to the database
+ *
+ * \param name The name of the DSN as found in res_odbc.conf
+ * \retval NULL Fail
+ * \retval non-NULL The newly-created structure
+ */
+static struct dsn *create_dsn(const char *name)
+{
+ struct dsn *dsn;
+
+ dsn = ao2_alloc(sizeof(*dsn) + strlen(name) + 1, dsn_destructor);
+ if (!dsn) {
+ return NULL;
+ }
+
+ /* Safe */
+ strcpy(dsn->name, name);
+
+ dsn->connection = ast_odbc_request_obj(name, 0);
+ if (!dsn->connection) {
+ ao2_ref(dsn, -1);
+ return NULL;
+ }
+
+ if (!ao2_link_flags(dsns, dsn, OBJ_NOLOCK)) {
+ ao2_ref(dsn, -1);
+ return NULL;
+ }
+
+ return dsn;
+}
+
+/*!
+ * \brief Retrieve a DSN, or create it if it does not exist.
+ *
+ * The created DSN is returned locked. This should be inconsequential
+ * to callers in most cases.
+ *
+ * When finished with the returned structure, the caller must call
+ * \ref release_dsn
+ *
+ * \param name Name of the DSN as found in res_odbc.conf
+ * \retval NULL Unable to retrieve or create the DSN
+ * \retval non-NULL The retrieved/created locked DSN
+ */
+static struct dsn *get_dsn(const char *name)
+{
+ struct dsn *dsn;
+
+ ao2_lock(dsns);
+ dsn = ao2_find(dsns, name, OBJ_SEARCH_KEY | OBJ_NOLOCK);
+ if (!dsn) {
+ dsn = create_dsn(name);
+ }
+ ao2_unlock(dsns);
+
+ if (!dsn) {
+ return NULL;
+ }
+
+ ao2_lock(dsn->connection);
+
+ return dsn;
+}
+
+/*!
+ * \brief Unlock and unreference a DSN
+ *
+ * \param dsn The dsn to unlock and unreference
+ * \return NULL
+ */
+static void *release_dsn(struct dsn *dsn)
+{
+ if (!dsn) {
+ return NULL;
+ }
+
+ ao2_unlock(dsn->connection);
+ ao2_ref(dsn, -1);
+
+ return NULL;
+}
+
static AST_RWLIST_HEAD_STATIC(queries, acf_odbc_query);
static int resultcount = 0;
@@ -214,7 +371,7 @@ static int acf_odbc_write(struct ast_channel *chan, const char *cmd, char *s, co
struct odbc_obj *obj = NULL;
struct acf_odbc_query *query;
char *t, varname[15];
- int i, dsn, bogus_chan = 0;
+ int i, dsn_num, bogus_chan = 0;
int transactional = 0;
AST_DECLARE_APP_ARGS(values,
AST_APP_ARG(field)[100];
@@ -227,6 +384,7 @@ static int acf_odbc_write(struct ast_channel *chan, const char *cmd, char *s, co
struct ast_str *buf = ast_str_thread_get(&sql_buf, 16);
struct ast_str *insertbuf = ast_str_thread_get(&sql2_buf, 16);
const char *status = "FAILURE";
+ struct dsn *dsn = NULL;
if (!buf || !insertbuf) {
return -1;
@@ -324,17 +482,21 @@ static int acf_odbc_write(struct ast_channel *chan, const char *cmd, char *s, co
* to multiple DSNs. We MUST have a single handle all the way through the
* transaction, or else we CANNOT enforce atomicity.
*/
- for (dsn = 0; dsn < 5; dsn++) {
- if (!ast_strlen_zero(query->writehandle[dsn])) {
+ for (dsn_num = 0; dsn_num < 5; dsn_num++) {
+ if (!ast_strlen_zero(query->writehandle[dsn_num])) {
if (transactional) {
/* This can only happen second time through or greater. */
ast_log(LOG_WARNING, "Transactions do not work well with multiple DSNs for 'writehandle'\n");
}
- if ((obj = ast_odbc_retrieve_transaction_obj(chan, query->writehandle[dsn]))) {
+ if ((obj = ast_odbc_retrieve_transaction_obj(chan, query->writehandle[dsn_num]))) {
transactional = 1;
} else {
- obj = ast_odbc_request_obj(query->writehandle[dsn], 0);
+ dsn = get_dsn(query->writehandle[dsn_num]);
+ if (!dsn) {
+ continue;
+ }
+ obj = dsn->connection;
transactional = 0;
}
@@ -342,10 +504,7 @@ static int acf_odbc_write(struct ast_channel *chan, const char *cmd, char *s, co
break;
}
- if (obj && !transactional) {
- ast_odbc_release_obj(obj);
- obj = NULL;
- }
+ dsn = release_dsn(dsn);
}
}
@@ -358,25 +517,25 @@ static int acf_odbc_write(struct ast_channel *chan, const char *cmd, char *s, co
status = "SUCCESS";
} else if (query->sql_insert) {
- if (obj && !transactional) {
- ast_odbc_release_obj(obj);
- obj = NULL;
- }
+ dsn = release_dsn(dsn);
- for (transactional = 0, dsn = 0; dsn < 5; dsn++) {
- if (!ast_strlen_zero(query->writehandle[dsn])) {
+ for (transactional = 0, dsn_num = 0; dsn_num < 5; dsn_num++) {
+ if (!ast_strlen_zero(query->writehandle[dsn_num])) {
if (transactional) {
/* This can only happen second time through or greater. */
ast_log(LOG_WARNING, "Transactions do not work well with multiple DSNs for 'writehandle'\n");
} else if (obj) {
- ast_odbc_release_obj(obj);
- obj = NULL;
+ dsn = release_dsn(dsn);
}
- if ((obj = ast_odbc_retrieve_transaction_obj(chan, query->writehandle[dsn]))) {
+ if ((obj = ast_odbc_retrieve_transaction_obj(chan, query->writehandle[dsn_num]))) {
transactional = 1;
} else {
- obj = ast_odbc_request_obj(query->writehandle[dsn], 0);
+ dsn = get_dsn(query->writehandle[dsn_num]);
+ if (!dsn) {
+ continue;
+ }
+ obj = dsn->connection;
transactional = 0;
}
if (obj) {
@@ -406,10 +565,7 @@ static int acf_odbc_write(struct ast_channel *chan, const char *cmd, char *s, co
pbx_builtin_setvar_helper(chan, "ODBCSTATUS", status);
}
- if (obj && !transactional) {
- ast_odbc_release_obj(obj);
- obj = NULL;
- }
+ dsn = release_dsn(dsn);
if (!bogus_chan) {
ast_autoservice_stop(chan);
@@ -420,11 +576,10 @@ static int acf_odbc_write(struct ast_channel *chan, const char *cmd, char *s, co
static int acf_odbc_read(struct ast_channel *chan, const char *cmd, char *s, char *buf, size_t len)
{
- struct odbc_obj *obj = NULL;
struct acf_odbc_query *query;
char varname[15], rowcount[12] = "-1";
struct ast_str *colnames = ast_str_thread_get(&colnames_buf, 16);
- int res, x, y, buflen = 0, escapecommas, rowlimit = 1, multirow = 0, dsn, bogus_chan = 0;
+ int res, x, y, buflen = 0, escapecommas, rowlimit = 1, multirow = 0, dsn_num, bogus_chan = 0;
AST_DECLARE_APP_ARGS(args,
AST_APP_ARG(field)[100];
);
@@ -436,6 +591,7 @@ static int acf_odbc_read(struct ast_channel *chan, const char *cmd, char *s, cha
struct odbc_datastore_row *row = NULL;
struct ast_str *sql = ast_str_thread_get(&sql_buf, 16);
const char *status = "FAILURE";
+ struct dsn *dsn = NULL;
if (!sql || !colnames) {
if (chan) {
@@ -523,28 +679,23 @@ static int acf_odbc_read(struct ast_channel *chan, const char *cmd, char *s, cha
}
AST_RWLIST_UNLOCK(&queries);
- for (dsn = 0; dsn < 5; dsn++) {
- if (!ast_strlen_zero(query->readhandle[dsn])) {
- obj = ast_odbc_request_obj(query->readhandle[dsn], 0);
- if (obj) {
- stmt = ast_odbc_direct_execute(obj, generic_execute, ast_str_buffer(sql));
+ for (dsn_num = 0; dsn_num < 5; dsn_num++) {
+ if (!ast_strlen_zero(query->readhandle[dsn_num])) {
+ dsn = get_dsn(query->readhandle[dsn_num]);
+ if (!dsn) {
+ continue;
}
+ stmt = ast_odbc_direct_execute(dsn->connection, generic_execute, ast_str_buffer(sql));
}
if (stmt) {
break;
}
- if (obj) {
- ast_odbc_release_obj(obj);
- obj = NULL;
- }
+ dsn = release_dsn(dsn);
}
if (!stmt) {
ast_log(LOG_ERROR, "Unable to execute query [%s]\n", ast_str_buffer(sql));
- if (obj) {
- ast_odbc_release_obj(obj);
- obj = NULL;
- }
+ dsn = release_dsn(dsn);
if (!bogus_chan) {
pbx_builtin_setvar_helper(chan, "ODBCROWS", rowcount);
ast_autoservice_stop(chan);
@@ -558,8 +709,7 @@ static int acf_odbc_read(struct ast_channel *chan, const char *cmd, char *s, cha
ast_log(LOG_WARNING, "SQL Column Count error!\n[%s]\n\n", ast_str_buffer(sql));
SQLCloseCursor(stmt);
SQLFreeHandle (SQL_HANDLE_STMT, stmt);
- ast_odbc_release_obj(obj);
- obj = NULL;
+ dsn = release_dsn(dsn);
if (!bogus_chan) {
pbx_builtin_setvar_helper(chan, "ODBCROWS", rowcount);
ast_autoservice_stop(chan);
@@ -583,8 +733,7 @@ static int acf_odbc_read(struct ast_channel *chan, const char *cmd, char *s, cha
}
SQLCloseCursor(stmt);
SQLFreeHandle(SQL_HANDLE_STMT, stmt);
- ast_odbc_release_obj(obj);
- obj = NULL;
+ dsn = release_dsn(dsn);
if (!bogus_chan) {
pbx_builtin_setvar_helper(chan, "ODBCROWS", rowcount);
pbx_builtin_setvar_helper(chan, "ODBCSTATUS", status);
@@ -607,8 +756,7 @@ static int acf_odbc_read(struct ast_channel *chan, const char *cmd, char *s, cha
odbc_datastore_free(resultset);
SQLCloseCursor(stmt);
SQLFreeHandle(SQL_HANDLE_STMT, stmt);
- ast_odbc_release_obj(obj);
- obj = NULL;
+ dsn = release_dsn(dsn);
if (!bogus_chan) {
pbx_builtin_setvar_helper(chan, "ODBCSTATUS", "MEMERROR");
ast_autoservice_stop(chan);
@@ -640,8 +788,7 @@ static int acf_odbc_read(struct ast_channel *chan, const char *cmd, char *s, cha
odbc_datastore_free(resultset);
SQLCloseCursor(stmt);
SQLFreeHandle(SQL_HANDLE_STMT, stmt);
- ast_odbc_release_obj(obj);
- obj = NULL;
+ dsn = release_dsn(dsn);
if (!bogus_chan) {
pbx_builtin_setvar_helper(chan, "ODBCROWS", rowcount);
pbx_builtin_setvar_helper(chan, "ODBCSTATUS", "MEMERROR");
@@ -750,8 +897,7 @@ end_acf_read:
odbc_datastore_free(resultset);
SQLCloseCursor(stmt);
SQLFreeHandle(SQL_HANDLE_STMT, stmt);
- ast_odbc_release_obj(obj);
- obj = NULL;
+ dsn = release_dsn(dsn);
pbx_builtin_setvar_helper(chan, "ODBCSTATUS", "MEMERROR");
ast_autoservice_stop(chan);
return -1;
@@ -764,8 +910,7 @@ end_acf_read:
}
SQLCloseCursor(stmt);
SQLFreeHandle(SQL_HANDLE_STMT, stmt);
- ast_odbc_release_obj(obj);
- obj = NULL;
+ dsn = release_dsn(dsn);
if (resultset && !multirow) {
/* Fetch the first resultset */
if (!acf_fetch(chan, "", buf, buf, len)) {
@@ -1192,8 +1337,8 @@ static char *cli_odbc_read(struct ast_cli_entry *e, int cmd, struct ast_cli_args
if (a->argc == 5 && !strcmp(a->argv[4], "exec")) {
/* Execute the query */
- struct odbc_obj *obj = NULL;
- int dsn, executed = 0;
+ struct dsn *dsn = NULL;
+ int dsn_num, executed = 0;
SQLHSTMT stmt;
int rows = 0, res, x;
SQLSMALLINT colcount = 0, collength;
@@ -1207,19 +1352,18 @@ static char *cli_odbc_read(struct ast_cli_entry *e, int cmd, struct ast_cli_args
return CLI_SUCCESS;
}
- for (dsn = 0; dsn < 5; dsn++) {
- if (ast_strlen_zero(query->readhandle[dsn])) {
+ for (dsn_num = 0; dsn_num < 5; dsn_num++) {
+ if (ast_strlen_zero(query->readhandle[dsn_num])) {
continue;
}
- ast_debug(1, "Found handle %s\n", query->readhandle[dsn]);
- if (!(obj = ast_odbc_request_obj(query->readhandle[dsn], 0))) {
+ dsn = get_dsn(query->readhandle[dsn_num]);
+ if (!dsn) {
continue;
}
+ ast_debug(1, "Found handle %s\n", query->readhandle[dsn_num]);
- ast_debug(1, "Got obj\n");
- if (!(stmt = ast_odbc_direct_execute(obj, generic_execute, ast_str_buffer(sql)))) {
- ast_odbc_release_obj(obj);
- obj = NULL;
+ if (!(stmt = ast_odbc_direct_execute(dsn->connection, generic_execute, ast_str_buffer(sql)))) {
+ dsn = release_dsn(dsn);
continue;
}
@@ -1230,8 +1374,7 @@ static char *cli_odbc_read(struct ast_cli_entry *e, int cmd, struct ast_cli_args
ast_cli(a->fd, "SQL Column Count error!\n[%s]\n\n", ast_str_buffer(sql));
SQLCloseCursor(stmt);
SQLFreeHandle (SQL_HANDLE_STMT, stmt);
- ast_odbc_release_obj(obj);
- obj = NULL;
+ dsn = release_dsn(dsn);
AST_RWLIST_UNLOCK(&queries);
return CLI_SUCCESS;
}
@@ -1240,10 +1383,9 @@ static char *cli_odbc_read(struct ast_cli_entry *e, int cmd, struct ast_cli_args
if ((res != SQL_SUCCESS) && (res != SQL_SUCCESS_WITH_INFO)) {
SQLCloseCursor(stmt);
SQLFreeHandle(SQL_HANDLE_STMT, stmt);
- ast_odbc_release_obj(obj);
- obj = NULL;
+ dsn = release_dsn(dsn);
if (res == SQL_NO_DATA) {
- ast_cli(a->fd, "Returned %d rows. Query executed on handle %d:%s [%s]\n", rows, dsn, query->readhandle[dsn], ast_str_buffer(sql));
+ ast_cli(a->fd, "Returned %d rows. Query executed on handle %d:%s [%s]\n", rows, dsn_num, query->readhandle[dsn_num], ast_str_buffer(sql));
break;
} else {
ast_cli(a->fd, "Error %d in FETCH [%s]\n", res, ast_str_buffer(sql));
@@ -1270,8 +1412,7 @@ static char *cli_odbc_read(struct ast_cli_entry *e, int cmd, struct ast_cli_args
ast_cli(a->fd, "SQL Get Data error %d!\n[%s]\n\n", res, ast_str_buffer(sql));
SQLCloseCursor(stmt);
SQLFreeHandle(SQL_HANDLE_STMT, stmt);
- ast_odbc_release_obj(obj);
- obj = NULL;
+ dsn = release_dsn(dsn);
AST_RWLIST_UNLOCK(&queries);
return CLI_SUCCESS;
}
@@ -1289,15 +1430,11 @@ static char *cli_odbc_read(struct ast_cli_entry *e, int cmd, struct ast_cli_args
}
SQLCloseCursor(stmt);
SQLFreeHandle(SQL_HANDLE_STMT, stmt);
- ast_odbc_release_obj(obj);
- obj = NULL;
- ast_cli(a->fd, "Returned %d row%s. Query executed on handle %d [%s]\n", rows, rows == 1 ? "" : "s", dsn, query->readhandle[dsn]);
+ dsn = release_dsn(dsn);
+ ast_cli(a->fd, "Returned %d row%s. Query executed on handle %d [%s]\n", rows, rows == 1 ? "" : "s", dsn_num, query->readhandle[dsn_num]);
break;
}
- if (obj) {
- ast_odbc_release_obj(obj);
- obj = NULL;
- }
+ dsn = release_dsn(dsn);
if (!executed) {
ast_cli(a->fd, "Failed to execute query. [%s]\n", ast_str_buffer(sql));
@@ -1420,30 +1557,29 @@ static char *cli_odbc_write(struct ast_cli_entry *e, int cmd, struct ast_cli_arg
if (a->argc == 6 && !strcmp(a->argv[5], "exec")) {
/* Execute the query */
- struct odbc_obj *obj = NULL;
- int dsn, executed = 0;
+ struct dsn *dsn;
+ int dsn_num, executed = 0;
SQLHSTMT stmt;
SQLLEN rows = -1;
- for (dsn = 0; dsn < 5; dsn++) {
- if (ast_strlen_zero(query->writehandle[dsn])) {
+ for (dsn_num = 0; dsn_num < 5; dsn_num++) {
+ if (ast_strlen_zero(query->writehandle[dsn_num])) {
continue;
}
- if (!(obj = ast_odbc_request_obj(query->writehandle[dsn], 0))) {
+ dsn = get_dsn(query->writehandle[dsn_num]);
+ if (!dsn) {
continue;
}
- if (!(stmt = ast_odbc_direct_execute(obj, generic_execute, ast_str_buffer(sql)))) {
- ast_odbc_release_obj(obj);
- obj = NULL;
+ if (!(stmt = ast_odbc_direct_execute(dsn->connection, generic_execute, ast_str_buffer(sql)))) {
+ dsn = release_dsn(dsn);
continue;
}
SQLRowCount(stmt, &rows);
SQLCloseCursor(stmt);
SQLFreeHandle(SQL_HANDLE_STMT, stmt);
- ast_odbc_release_obj(obj);
- obj = NULL;
- ast_cli(a->fd, "Affected %d rows. Query executed on handle %d [%s]\n", (int)rows, dsn, query->writehandle[dsn]);
+ dsn = release_dsn(dsn);
+ ast_cli(a->fd, "Affected %d rows. Query executed on handle %d [%s]\n", (int)rows, dsn_num, query->writehandle[dsn_num]);
executed = 1;
break;
}
@@ -1470,6 +1606,11 @@ static int load_module(void)
char *catg;
struct ast_flags config_flags = { 0 };
+ dsns = ao2_container_alloc(DSN_BUCKETS, dsn_hash, dsn_cmp);
+ if (!dsns) {
+ return AST_MODULE_LOAD_DECLINE;
+ }
+
res |= ast_custom_function_register(&fetch_function);
res |= ast_register_application_xml(app_odbcfinish, exec_odbcfinish);
AST_RWLIST_WRLOCK(&queries);
@@ -1478,6 +1619,7 @@ static int load_module(void)
if (!cfg || cfg == CONFIG_STATUS_FILEINVALID) {
ast_log(LOG_NOTICE, "Unable to load config for func_odbc: %s\n", config);
AST_RWLIST_UNLOCK(&queries);
+ ao2_ref(dsns, -1);
return AST_MODULE_LOAD_DECLINE;
}
@@ -1531,6 +1673,8 @@ static int unload_module(void)
AST_RWLIST_WRLOCK(&queries);
AST_RWLIST_UNLOCK(&queries);
+
+ ao2_ref(dsns, -1);
return res;
}
diff --git a/include/asterisk/astobj2.h b/include/asterisk/astobj2.h
index 692cc7cb4..4bd44db76 100644
--- a/include/asterisk/astobj2.h
+++ b/include/asterisk/astobj2.h
@@ -19,6 +19,7 @@
#include "asterisk/compat.h"
#include "asterisk/lock.h"
+#include "asterisk/inline_api.h"
/*! \file
* \ref AstObj2
@@ -638,6 +639,46 @@ int __ao2_trylock(void *a, enum ao2_lock_req lock_how, const char *file, const c
void *ao2_object_get_lockaddr(void *obj);
+/*!
+ * \brief Increment reference count on an object and lock it
+ * \since 13.9.0
+ *
+ * \param[in] obj A pointer to the ao2 object
+ * \retval 0 The object is not an ao2 object or wasn't locked successfully
+ * \retval 1 The object's reference count was incremented and was locked
+ */
+AST_INLINE_API(
+int ao2_ref_and_lock(void *obj),
+{
+ ao2_ref(obj, +1);
+ if (ao2_lock(obj)) {
+ ao2_ref(obj, -1);
+ return 0;
+ }
+ return 1;
+}
+)
+
+/*!
+ * \brief Unlock an object and decrement its reference count
+ * \since 13.9.0
+ *
+ * \param[in] obj A pointer to the ao2 object
+ * \retval 0 The object is not an ao2 object or wasn't unlocked successfully
+ * \retval 1 The object was unlocked and it's reference count was decremented
+ */
+AST_INLINE_API(
+int ao2_unlock_and_unref(void *obj),
+{
+ if (ao2_unlock(obj)) {
+ return 0;
+ }
+ ao2_ref(obj, -1);
+
+ return 1;
+}
+)
+
/*! Global ao2 object holder structure. */
struct ao2_global_obj {
/*! Access lock to the held ao2 object. */
@@ -1985,4 +2026,97 @@ void ao2_iterator_cleanup(struct ao2_iterator *iter);
*/
int ao2_iterator_count(struct ao2_iterator *iter);
+/*!
+ * \brief Creates a hash function for a structure string field.
+ * \param stype The structure type
+ * \param field The string field in the structure to hash
+ *
+ * AO2_STRING_FIELD_HASH_CB(mystruct, myfield) will produce a function
+ * named mystruct_hash_fn which hashes mystruct->myfield.
+ */
+#define AO2_STRING_FIELD_HASH_FN(stype, field) \
+static int stype ## _hash_fn(const void *obj, const int flags) \
+{ \
+ const struct stype *object = obj; \
+ const char *key; \
+ switch (flags & OBJ_SEARCH_MASK) { \
+ case OBJ_SEARCH_KEY: \
+ key = obj; \
+ break; \
+ case OBJ_SEARCH_OBJECT: \
+ key = object->field; \
+ break; \
+ default: \
+ ast_assert(0); \
+ return 0; \
+ } \
+ return ast_str_hash(key); \
+}
+
+/*!
+ * \brief Creates a compare function for a structure string field.
+ * \param stype The structure type
+ * \param field The string field in the structure to compare
+ *
+ * AO2_STRING_FIELD_CMP_FN(mystruct, myfield) will produce a function
+ * named mystruct_cmp_fn which compares mystruct->myfield.
+ */
+#define AO2_STRING_FIELD_CMP_FN(stype, field) \
+static int stype ## _cmp_fn(void *obj, void *arg, int flags) \
+{ \
+ const struct stype *object_left = obj, *object_right = arg; \
+ const char *right_key = arg; \
+ int cmp; \
+ switch (flags & OBJ_SEARCH_MASK) { \
+ case OBJ_SEARCH_OBJECT: \
+ right_key = object_right->field; \
+ case OBJ_SEARCH_KEY: \
+ cmp = strcmp(object_left->field, right_key); \
+ break; \
+ case OBJ_SEARCH_PARTIAL_KEY: \
+ cmp = strncmp(object_left->field, right_key, strlen(right_key)); \
+ break; \
+ default: \
+ cmp = 0; \
+ break; \
+ } \
+ if (cmp) { \
+ return 0; \
+ } \
+ return CMP_MATCH; \
+}
+
+/*!
+ * \brief Creates a sort function for a structure string field.
+ * \param stype The structure type
+ * \param field The string field in the structure to compare
+ *
+ * AO2_STRING_FIELD_SORT_FN(mystruct, myfield) will produce a function
+ * named mystruct_sort_fn which compares mystruct->myfield.
+ */
+#define AO2_STRING_FIELD_SORT_FN(stype, field) \
+static int stype ## _sort_fn(const void *obj, const void *arg, int flags) \
+{ \
+ const struct stype *object_left = obj; \
+ const struct stype *object_right = arg; \
+ const char *right_key = arg; \
+ int cmp; \
+\
+ switch (flags & OBJ_SEARCH_MASK) { \
+ case OBJ_SEARCH_OBJECT: \
+ right_key = object_right->field; \
+ /* Fall through */ \
+ case OBJ_SEARCH_KEY: \
+ cmp = strcmp(object_left->field, right_key); \
+ break; \
+ case OBJ_SEARCH_PARTIAL_KEY: \
+ cmp = strncmp(object_left->field, right_key, strlen(right_key)); \
+ break; \
+ default: \
+ cmp = 0; \
+ break; \
+ } \
+ return cmp; \
+}
+
#endif /* _ASTERISK_ASTOBJ2_H */
diff --git a/include/asterisk/bridge_channel_internal.h b/include/asterisk/bridge_channel_internal.h
index 7f7d5a88b..fb8e781e8 100644
--- a/include/asterisk/bridge_channel_internal.h
+++ b/include/asterisk/bridge_channel_internal.h
@@ -151,47 +151,20 @@ int bridge_channel_internal_push_full(struct ast_bridge_channel *bridge_channel,
void bridge_channel_internal_pull(struct ast_bridge_channel *bridge_channel);
/*!
- * \brief Internal bridge channel wait condition and associated result.
- */
-struct bridge_channel_internal_cond {
- /*! Lock for the data structure */
- ast_mutex_t lock;
- /*! Wait condition */
- ast_cond_t cond;
- /*! Wait until done */
- int done;
- /*! The bridge channel */
- struct ast_bridge_channel *bridge_channel;
-};
-
-/*!
- * \internal
- * \brief Wait for the expected signal.
- * \since 13.5.0
- *
- * \param cond the wait object
- *
- * \return Nothing
- */
-void bridge_channel_internal_wait(struct bridge_channel_internal_cond *cond);
-
-/*!
- * \internal
- * \brief Signal the condition wait.
- * \since 13.5.0
+ * \brief Signal imparting threads to wake up.
+ * \since 13.9.0
*
- * \param cond the wait object
+ * \param chan Channel imparted that we need to signal.
*
* \return Nothing
*/
-void bridge_channel_internal_signal(struct bridge_channel_internal_cond *cond);
+void bridge_channel_impart_signal(struct ast_channel *chan);
/*!
* \internal
* \brief Join the bridge_channel to the bridge (blocking)
*
* \param bridge_channel The Channel in the bridge
- * \param cond data used for signaling
*
* \note The bridge_channel->swap holds a channel reference for the swap
* channel going into the bridging system. The ref ensures that the swap
@@ -206,8 +179,7 @@ void bridge_channel_internal_signal(struct bridge_channel_internal_cond *cond);
* \retval 0 bridge channel successfully joined the bridge
* \retval -1 bridge channel failed to join the bridge
*/
-int bridge_channel_internal_join(struct ast_bridge_channel *bridge_channel,
- struct bridge_channel_internal_cond *cond);
+int bridge_channel_internal_join(struct ast_bridge_channel *bridge_channel);
/*!
* \internal
diff --git a/include/asterisk/bridge_technology.h b/include/asterisk/bridge_technology.h
index 7de573a23..7f5d746f8 100644
--- a/include/asterisk/bridge_technology.h
+++ b/include/asterisk/bridge_technology.h
@@ -107,6 +107,9 @@ struct ast_bridge_technology {
* \retval -1 on failure
*
* \note On entry, bridge is already locked.
+ *
+ * \note The bridge technology must tollerate a failed to join channel
+ * until it can be kicked from the bridge.
*/
int (*join)(struct ast_bridge *bridge, struct ast_bridge_channel *bridge_channel);
/*!
diff --git a/include/asterisk/features.h b/include/asterisk/features.h
index b63124c2f..a4aed5d18 100644
--- a/include/asterisk/features.h
+++ b/include/asterisk/features.h
@@ -51,6 +51,7 @@ int ast_bridge_call(struct ast_channel *chan, struct ast_channel *peer, struct a
/*!
* \brief Bridge a call, and add additional flags to the bridge
*
+ * \details
* This does the same thing as \ref ast_bridge_call, except that once the bridge
* is created, the provided flags are set on the bridge. The provided flags are
* added to the bridge's flags; they will not clear any flags already set.
@@ -70,6 +71,7 @@ int ast_bridge_call_with_flags(struct ast_channel *chan, struct ast_channel *pee
* \brief Add an arbitrary channel to a bridge
* \since 12.0.0
*
+ * \details
* The channel that is being added to the bridge can be in any state: unbridged,
* bridged, answered, unanswered, etc. The channel will be added asynchronously,
* meaning that when this function returns once the channel has been added to
@@ -87,11 +89,16 @@ int ast_bridge_call_with_flags(struct ast_channel *chan, struct ast_channel *pee
* \param features Features for this channel in the bridge
* \param play_tone Indicates if a tone should be played to the channel
* \param xfersound Sound that should be used to indicate transfer with play_tone
+ *
+ * \note The features parameter must be NULL or obtained by
+ * ast_bridge_features_new(). You must not dereference features
+ * after calling even if the call fails.
+ *
* \retval 0 Success
* \retval -1 Failure
*/
int ast_bridge_add_channel(struct ast_bridge *bridge, struct ast_channel *chan,
- struct ast_bridge_features *features, int play_tone, const char *xfersound);
+ struct ast_bridge_features *features, int play_tone, const char *xfersound);
diff --git a/include/asterisk/res_pjsip.h b/include/asterisk/res_pjsip.h
index 125473fa5..696074159 100644
--- a/include/asterisk/res_pjsip.h
+++ b/include/asterisk/res_pjsip.h
@@ -19,6 +19,13 @@
#ifndef _RES_PJSIP_H
#define _RES_PJSIP_H
+#include <pjsip.h>
+/* Needed for SUBSCRIBE, NOTIFY, and PUBLISH method definitions */
+#include <pjsip_simple.h>
+#include <pjsip/sip_transaction.h>
+#include <pj/timer.h>
+#include <pjlib.h>
+
#include "asterisk/stringfields.h"
/* Needed for struct ast_sockaddr */
#include "asterisk/netsock2.h"
@@ -1174,8 +1181,9 @@ struct ast_sip_auth *ast_sip_get_artificial_auth(void);
*/
struct ast_sip_endpoint *ast_sip_get_artificial_endpoint(void);
-/*!
- * \page Threading model for SIP
+/*! \defgroup pjsip_threading PJSIP Threading Model
+ * @{
+ * \page PJSIP PJSIP Threading Model
*
* There are three major types of threads that SIP will have to deal with:
* \li Asterisk threads
@@ -1224,6 +1232,19 @@ struct ast_sip_endpoint *ast_sip_get_artificial_endpoint(void);
* previous tasks pushed with the same serializer have completed. For more information
* on serializers and the benefits they provide, see \ref ast_threadpool_serializer
*
+ * \par Scheduler
+ *
+ * Some situations require that a task run periodically or at a future time. Normally
+ * the ast_sched functionality would be used but ast_sched only uses 1 thread for all
+ * tasks and that thread isn't registered with PJLIB and therefore can't do any PJSIP
+ * related work.
+ *
+ * ast_sip_sched uses ast_sched only as a scheduled queue. When a task is ready to run,
+ * it's pushed to a Serializer to be invoked asynchronously by a Servant. This ensures
+ * that the task is executed in a PJLIB registered thread and allows the ast_sched thread
+ * to immediately continue processing the queue. The Serializer used by ast_sip_sched
+ * is one of your choosing or a random one from the res_pjsip pool if you don't choose one.
+ *
* \note
*
* Do not make assumptions about individual threads based on a corresponding serializer.
@@ -1232,6 +1253,8 @@ struct ast_sip_endpoint *ast_sip_get_artificial_endpoint(void);
* tasks, even though they are all guaranteed to be executed in sequence.
*/
+typedef int (*ast_sip_task)(void *user_data);
+
/*!
* \brief Create a new serializer for SIP tasks
*
@@ -1369,6 +1392,214 @@ int ast_sip_push_task_synchronous(struct ast_taskprocessor *serializer, int (*si
int ast_sip_thread_is_servant(void);
/*!
+ * \brief Task flags for the res_pjsip scheduler
+ *
+ * The default is AST_SIP_SCHED_TASK_FIXED
+ * | AST_SIP_SCHED_TASK_DATA_NOT_AO2
+ * | AST_SIP_SCHED_TASK_DATA_NO_CLEANUP
+ * | AST_SIP_SCHED_TASK_PERIODIC
+ */
+enum ast_sip_scheduler_task_flags {
+ /*!
+ * The defaults
+ */
+ AST_SIP_SCHED_TASK_DEFAULTS = (0 << 0),
+
+ /*!
+ * Run at a fixed interval.
+ * Stop scheduling if the callback returns 0.
+ * Any other value is ignored.
+ */
+ AST_SIP_SCHED_TASK_FIXED = (0 << 0),
+ /*!
+ * Run at a variable interval.
+ * Stop scheduling if the callback returns 0.
+ * Any other return value is used as the new interval.
+ */
+ AST_SIP_SCHED_TASK_VARIABLE = (1 << 0),
+
+ /*!
+ * The task data is not an AO2 object.
+ */
+ AST_SIP_SCHED_TASK_DATA_NOT_AO2 = (0 << 1),
+ /*!
+ * The task data is an AO2 object.
+ * A reference count will be held by the scheduler until
+ * after the task has run for the final time (if ever).
+ */
+ AST_SIP_SCHED_TASK_DATA_AO2 = (1 << 1),
+
+ /*!
+ * Don't take any cleanup action on the data
+ */
+ AST_SIP_SCHED_TASK_DATA_NO_CLEANUP = (0 << 3),
+ /*!
+ * If AST_SIP_SCHED_TASK_DATA_AO2 is set, decrement the reference count
+ * otherwise call ast_free on it.
+ */
+ AST_SIP_SCHED_TASK_DATA_FREE = ( 1 << 3 ),
+
+ /*! \brief AST_SIP_SCHED_TASK_PERIODIC
+ * The task is scheduled at multiples of interval
+ * \see Interval
+ */
+ AST_SIP_SCHED_TASK_PERIODIC = (0 << 4),
+ /*! \brief AST_SIP_SCHED_TASK_DELAY
+ * The next invocation of the task is at last finish + interval
+ * \see Interval
+ */
+ AST_SIP_SCHED_TASK_DELAY = (1 << 4),
+};
+
+/*!
+ * \brief Scheduler task data structure
+ */
+struct ast_sip_sched_task;
+
+/*!
+ * \brief Schedule a task to run in the res_pjsip thread pool
+ * \since 13.9.0
+ *
+ * \param serializer The serializer to use. If NULL, don't use a serializer (see note below)
+ * \param interval The invocation interval in milliseconds (see note below)
+ * \param sip_task The task to invoke
+ * \param name An optional name to associate with the task
+ * \param task_data Optional data to pass to the task
+ * \param flags One of enum ast_sip_scheduler_task_type
+ *
+ * \returns Pointer to \ref ast_sip_sched_task ao2 object which must be dereferenced when done.
+ *
+ * \paragraph Serialization
+ *
+ * Specifying a serializer guarantees serialized execution but NOT specifying a serializer
+ * may still result in tasks being effectively serialized if the thread pool is busy.
+ * The point of the serializer BTW is not to prevent parallel executions of the SAME task.
+ * That happens automatically (see below). It's to prevent the task from running at the same
+ * time as other work using the same serializer, whether or not it's being run by the scheduler.
+ *
+ * \paragraph Interval
+ *
+ * The interval is used to calculate the next time the task should run. There are two models.
+ *
+ * \ref AST_SIP_SCHED_TASK_PERIODIC specifies that the invocations of the task occur at the
+ * specific interval. That is, every \ref "interval" milliseconds, regardless of how long the task
+ * takes. If the task takes longer than \ref interval, it will be scheduled at the next available
+ * multiple of \ref interval. For exmaple: If the task has an interval of 60 seconds and the task
+ * takes 70 seconds, the next invocation will happen at 120 seconds.
+ *
+ * \ref AST_SIP_SCHED_TASK_DELAY specifies that the next invocation of the task should start
+ * at \ref interval milliseconds after the current invocation has finished.
+ *
+ */
+struct ast_sip_sched_task *ast_sip_schedule_task(struct ast_taskprocessor *serializer,
+ int interval, ast_sip_task sip_task, char *name, void *task_data,
+ enum ast_sip_scheduler_task_flags flags);
+
+/*!
+ * \brief Cancels the next invocation of a task
+ * \since 13.9.0
+ *
+ * \param schtd The task structure pointer
+ * \retval 0 Success
+ * \retval -1 Failure
+ * \note Only cancels future invocations not the currently running invocation.
+ */
+int ast_sip_sched_task_cancel(struct ast_sip_sched_task *schtd);
+
+/*!
+ * \brief Cancels the next invocation of a task by name
+ * \since 13.9.0
+ *
+ * \param name The task name
+ * \retval 0 Success
+ * \retval -1 Failure
+ * \note Only cancels future invocations not the currently running invocation.
+ */
+int ast_sip_sched_task_cancel_by_name(const char *name);
+
+/*!
+ * \brief Gets the last start and end times of the task
+ * \since 13.9.0
+ *
+ * \param schtd The task structure pointer
+ * \param[out] when_queued Pointer to a timeval structure to contain the time when queued
+ * \param[out] last_start Pointer to a timeval structure to contain the time when last started
+ * \param[out] last_end Pointer to a timeval structure to contain the time when last ended
+ * \retval 0 Success
+ * \retval -1 Failure
+ * \note Any of the pointers can be NULL if you don't need them.
+ */
+int ast_sip_sched_task_get_times(struct ast_sip_sched_task *schtd,
+ struct timeval *when_queued, struct timeval *last_start, struct timeval *last_end);
+
+/*!
+ * \brief Gets the last start and end times of the task by name
+ * \since 13.9.0
+ *
+ * \param name The task name
+ * \param[out] when_queued Pointer to a timeval structure to contain the time when queued
+ * \param[out] last_start Pointer to a timeval structure to contain the time when last started
+ * \param[out] last_end Pointer to a timeval structure to contain the time when last ended
+ * \retval 0 Success
+ * \retval -1 Failure
+ * \note Any of the pointers can be NULL if you don't need them.
+ */
+int ast_sip_sched_task_get_times_by_name(const char *name,
+ struct timeval *when_queued, struct timeval *last_start, struct timeval *last_end);
+
+/*!
+ * \brief Gets the number of milliseconds until the next invocation
+ * \since 13.9.0
+ *
+ * \param schtd The task structure pointer
+ * \return The number of milliseconds until the next invocation or -1 if the task isn't scheduled
+ */
+int ast_sip_sched_task_get_next_run(struct ast_sip_sched_task *schtd);
+
+/*!
+ * \brief Gets the number of milliseconds until the next invocation
+ * \since 13.9.0
+ *
+ * \param name The task name
+ * \return The number of milliseconds until the next invocation or -1 if the task isn't scheduled
+ */
+int ast_sip_sched_task_get_next_run_by_name(const char *name);
+
+/*!
+ * \brief Checks if the task is currently running
+ * \since 13.9.0
+ *
+ * \param schtd The task structure pointer
+ * \retval 0 not running
+ * \retval 1 running
+ */
+int ast_sip_sched_is_task_running(struct ast_sip_sched_task *schtd);
+
+/*!
+ * \brief Checks if the task is currently running
+ * \since 13.9.0
+ *
+ * \param name The task name
+ * \retval 0 not running or not found
+ * \retval 1 running
+ */
+int ast_sip_sched_is_task_running_by_name(const char *name);
+
+/*!
+ * \brief Gets the task name
+ * \since 13.9.0
+ *
+ * \param schtd The task structure pointer
+ * \retval 0 success
+ * \retval 1 failure
+ */
+int ast_sip_sched_task_get_name(struct ast_sip_sched_task *schtd, char *name, size_t maxlen);
+
+/*!
+ * @}
+ */
+
+/*!
* \brief SIP body description
*
* This contains a type and subtype that will be added as
diff --git a/include/asterisk/stasis.h b/include/asterisk/stasis.h
index 16b30ccb3..4fc295bc4 100644
--- a/include/asterisk/stasis.h
+++ b/include/asterisk/stasis.h
@@ -416,14 +416,14 @@ const struct timeval *stasis_message_timestamp(const struct stasis_message *msg)
* May return \c NULL, to indicate no representation. The returned object should
* be ast_json_unref()'ed.
*
- * \param message Message to convert to JSON string.
+ * \param msg Message to convert to JSON string.
* \param sanitize Snapshot sanitization callback.
*
* \return Newly allocated string with JSON message.
* \return \c NULL on error.
* \return \c NULL if JSON format is not supported.
*/
-struct ast_json *stasis_message_to_json(struct stasis_message *message, struct stasis_message_sanitizer *sanitize);
+struct ast_json *stasis_message_to_json(struct stasis_message *msg, struct stasis_message_sanitizer *sanitize);
/*!
* \brief Build the AMI representation of the message.
@@ -431,12 +431,21 @@ struct ast_json *stasis_message_to_json(struct stasis_message *message, struct s
* May return \c NULL, to indicate no representation. The returned object should
* be ao2_cleanup()'ed.
*
- * \param message Message to convert to AMI.
+ * \param msg Message to convert to AMI.
* \return \c NULL on error.
* \return \c NULL if AMI format is not supported.
*/
-struct ast_manager_event_blob *stasis_message_to_ami(
- struct stasis_message *message);
+struct ast_manager_event_blob *stasis_message_to_ami(struct stasis_message *msg);
+
+/*!
+ * \brief Determine if the given message can be converted to AMI.
+ *
+ * \param msg Message to see if can be converted to AMI.
+ *
+ * \retval 0 Cannot be converted
+ * \retval non-zero Can be converted
+ */
+int stasis_message_can_be_ami(struct stasis_message *msg);
/*!
* \brief Build the \ref AstGenericEvents representation of the message.
@@ -444,12 +453,11 @@ struct ast_manager_event_blob *stasis_message_to_ami(
* May return \c NULL, to indicate no representation. The returned object should
* be disposed of via \ref ast_event_destroy.
*
- * \param message Message to convert to AMI.
+ * \param msg Message to convert to AMI.
* \return \c NULL on error.
* \return \c NULL if AMI format is not supported.
*/
-struct ast_event *stasis_message_to_event(
- struct stasis_message *message);
+struct ast_event *stasis_message_to_event(struct stasis_message *msg);
/*! @} */
diff --git a/main/bridge.c b/main/bridge.c
index fd83cfb7b..ee5ad735b 100644
--- a/main/bridge.c
+++ b/main/bridge.c
@@ -420,10 +420,12 @@ static void bridge_channel_complete_join(struct ast_bridge *bridge, struct ast_b
bridge->technology->name);
if (bridge->technology->join
&& bridge->technology->join(bridge, bridge_channel)) {
- ast_debug(1, "Bridge %s: %p(%s) failed to join %s technology\n",
+ /* We cannot leave the channel partially in the bridge so we must kick it out */
+ ast_debug(1, "Bridge %s: %p(%s) failed to join %s technology (Kicking it out)\n",
bridge->uniqueid, bridge_channel, ast_channel_name(bridge_channel->chan),
bridge->technology->name);
bridge_channel->just_joined = 1;
+ ast_bridge_channel_leave_bridge(bridge_channel, BRIDGE_CHANNEL_STATE_END, 0);
return;
}
@@ -1483,6 +1485,150 @@ void ast_bridge_notify_masquerade(struct ast_channel *chan)
ao2_ref(bridge_channel, -1);
}
+/*!
+ * \brief Internal bridge impart wait condition and associated conditional.
+ */
+struct bridge_channel_impart_cond {
+ AST_LIST_ENTRY(bridge_channel_impart_cond) node;
+ /*! Lock for the data structure */
+ ast_mutex_t lock;
+ /*! Wait condition */
+ ast_cond_t cond;
+ /*! Wait until done */
+ int done;
+};
+
+AST_LIST_HEAD_NOLOCK(bridge_channel_impart_ds_head, bridge_channel_impart_cond);
+
+/*!
+ * \internal
+ * \brief Signal imparting threads to wake up.
+ * \since 13.9.0
+ *
+ * \param ds_head List of imparting threads to wake up.
+ *
+ * \return Nothing
+ */
+static void bridge_channel_impart_ds_head_signal(struct bridge_channel_impart_ds_head *ds_head)
+{
+ if (ds_head) {
+ struct bridge_channel_impart_cond *cond;
+
+ while ((cond = AST_LIST_REMOVE_HEAD(ds_head, node))) {
+ ast_mutex_lock(&cond->lock);
+ cond->done = 1;
+ ast_cond_signal(&cond->cond);
+ ast_mutex_unlock(&cond->lock);
+ }
+ }
+}
+
+static void bridge_channel_impart_ds_head_dtor(void *doomed)
+{
+ bridge_channel_impart_ds_head_signal(doomed);
+ ast_free(doomed);
+}
+
+/*!
+ * \internal
+ * \brief Fixup the bridge impart datastore.
+ * \since 13.9.0
+ *
+ * \param data Bridge impart datastore data to fixup from old_chan.
+ * \param old_chan The datastore is moving from this channel.
+ * \param new_chan The datastore is moving to this channel.
+ *
+ * \return Nothing
+ */
+static void bridge_channel_impart_ds_head_fixup(void *data, struct ast_channel *old_chan, struct ast_channel *new_chan)
+{
+ /*
+ * Signal any waiting impart threads. The masquerade is going to kill
+ * old_chan and we don't need to be waiting on new_chan.
+ */
+ bridge_channel_impart_ds_head_signal(data);
+}
+
+static const struct ast_datastore_info bridge_channel_impart_ds_info = {
+ .type = "bridge-impart-ds",
+ .destroy = bridge_channel_impart_ds_head_dtor,
+ .chan_fixup = bridge_channel_impart_ds_head_fixup,
+};
+
+/*!
+ * \internal
+ * \brief Add impart wait datastore conditional to channel.
+ * \since 13.9.0
+ *
+ * \param chan Channel to add the impart wait conditional.
+ * \param cond Imparting conditional to add.
+ *
+ * \retval 0 on success.
+ * \retval -1 on error.
+ */
+static int bridge_channel_impart_add(struct ast_channel *chan, struct bridge_channel_impart_cond *cond)
+{
+ struct ast_datastore *datastore;
+ struct bridge_channel_impart_ds_head *ds_head;
+
+ ast_channel_lock(chan);
+
+ datastore = ast_channel_datastore_find(chan, &bridge_channel_impart_ds_info, NULL);
+ if (!datastore) {
+ datastore = ast_datastore_alloc(&bridge_channel_impart_ds_info, NULL);
+ if (!datastore) {
+ ast_channel_unlock(chan);
+ return -1;
+ }
+ ds_head = ast_calloc(1, sizeof(*ds_head));
+ if (!ds_head) {
+ ast_channel_unlock(chan);
+ ast_datastore_free(datastore);
+ return -1;
+ }
+ datastore->data = ds_head;
+ ast_channel_datastore_add(chan, datastore);
+ } else {
+ ds_head = datastore->data;
+ ast_assert(ds_head != NULL);
+ }
+
+ AST_LIST_INSERT_TAIL(ds_head, cond, node);
+
+ ast_channel_unlock(chan);
+ return 0;
+}
+
+void bridge_channel_impart_signal(struct ast_channel *chan)
+{
+ struct ast_datastore *datastore;
+
+ ast_channel_lock(chan);
+ datastore = ast_channel_datastore_find(chan, &bridge_channel_impart_ds_info, NULL);
+ if (datastore) {
+ bridge_channel_impart_ds_head_signal(datastore->data);
+ }
+ ast_channel_unlock(chan);
+}
+
+/*!
+ * \internal
+ * \brief Block imparting channel thread until signaled.
+ * \since 13.9.0
+ *
+ * \param cond Imparting conditional to wait for.
+ *
+ * \return Nothing
+ */
+static void bridge_channel_impart_wait(struct bridge_channel_impart_cond *cond)
+{
+ ast_mutex_lock(&cond->lock);
+ while (!cond->done) {
+ ast_cond_wait(&cond->cond, &cond->lock);
+ }
+ ast_mutex_unlock(&cond->lock);
+}
+
/*
* XXX ASTERISK-21271 make ast_bridge_join() require features to be allocated just like ast_bridge_impart() and not expect the struct back.
*
@@ -1551,7 +1697,7 @@ int ast_bridge_join(struct ast_bridge *bridge,
}
if (!res) {
- res = bridge_channel_internal_join(bridge_channel, NULL);
+ res = bridge_channel_internal_join(bridge_channel);
}
/* Cleanup all the data in the bridge channel after it leaves the bridge. */
@@ -1568,6 +1714,7 @@ int ast_bridge_join(struct ast_bridge *bridge,
join_exit:;
ast_bridge_run_after_callback(chan);
+ bridge_channel_impart_signal(chan);
if (!(ast_channel_softhangup_internal_flag(chan) & AST_SOFTHANGUP_ASYNCGOTO)
&& !ast_bridge_setup_after_goto(chan)) {
/* Claim the after bridge goto is an async goto destination. */
@@ -1581,14 +1728,13 @@ join_exit:;
/*! \brief Thread responsible for imparted bridged channels to be departed */
static void *bridge_channel_depart_thread(void *data)
{
- struct bridge_channel_internal_cond *cond = data;
- struct ast_bridge_channel *bridge_channel = cond->bridge_channel;
+ struct ast_bridge_channel *bridge_channel = data;
if (bridge_channel->callid) {
ast_callid_threadassoc_add(bridge_channel->callid);
}
- bridge_channel_internal_join(bridge_channel, cond);
+ bridge_channel_internal_join(bridge_channel);
/*
* cleanup
@@ -1601,6 +1747,8 @@ static void *bridge_channel_depart_thread(void *data)
bridge_channel->features = NULL;
ast_bridge_discard_after_callback(bridge_channel->chan, AST_BRIDGE_AFTER_CB_REASON_DEPART);
+ /* If join failed there will be impart threads waiting. */
+ bridge_channel_impart_signal(bridge_channel->chan);
ast_bridge_discard_after_goto(bridge_channel->chan);
return NULL;
@@ -1609,15 +1757,14 @@ static void *bridge_channel_depart_thread(void *data)
/*! \brief Thread responsible for independent imparted bridged channels */
static void *bridge_channel_ind_thread(void *data)
{
- struct bridge_channel_internal_cond *cond = data;
- struct ast_bridge_channel *bridge_channel = cond->bridge_channel;
+ struct ast_bridge_channel *bridge_channel = data;
struct ast_channel *chan;
if (bridge_channel->callid) {
ast_callid_threadassoc_add(bridge_channel->callid);
}
- bridge_channel_internal_join(bridge_channel, cond);
+ bridge_channel_internal_join(bridge_channel);
chan = bridge_channel->chan;
/* cleanup */
@@ -1634,15 +1781,18 @@ static void *bridge_channel_ind_thread(void *data)
ao2_ref(bridge_channel, -1);
ast_bridge_run_after_callback(chan);
+ /* If join failed there will be impart threads waiting. */
+ bridge_channel_impart_signal(chan);
ast_bridge_run_after_goto(chan);
return NULL;
}
-int ast_bridge_impart(struct ast_bridge *bridge,
+static int bridge_impart_internal(struct ast_bridge *bridge,
struct ast_channel *chan,
struct ast_channel *swap,
struct ast_bridge_features *features,
- enum ast_bridge_impart_flags flags)
+ enum ast_bridge_impart_flags flags,
+ struct bridge_channel_impart_cond *cond)
{
int res = 0;
struct ast_bridge_channel *bridge_channel;
@@ -1701,27 +1851,20 @@ int ast_bridge_impart(struct ast_bridge *bridge,
/* Actually create the thread that will handle the channel */
if (!res) {
- struct bridge_channel_internal_cond cond = {
- .done = 0,
- .bridge_channel = bridge_channel
- };
- ast_mutex_init(&cond.lock);
- ast_cond_init(&cond.cond, NULL);
-
+ res = bridge_channel_impart_add(chan, cond);
+ }
+ if (!res) {
if ((flags & AST_BRIDGE_IMPART_CHAN_MASK) == AST_BRIDGE_IMPART_CHAN_INDEPENDENT) {
res = ast_pthread_create_detached(&bridge_channel->thread, NULL,
- bridge_channel_ind_thread, &cond);
+ bridge_channel_ind_thread, bridge_channel);
} else {
res = ast_pthread_create(&bridge_channel->thread, NULL,
- bridge_channel_depart_thread, &cond);
+ bridge_channel_depart_thread, bridge_channel);
}
if (!res) {
- bridge_channel_internal_wait(&cond);
+ bridge_channel_impart_wait(cond);
}
-
- ast_cond_destroy(&cond.cond);
- ast_mutex_destroy(&cond.lock);
}
if (res) {
@@ -1742,6 +1885,32 @@ int ast_bridge_impart(struct ast_bridge *bridge,
return 0;
}
+int ast_bridge_impart(struct ast_bridge *bridge,
+ struct ast_channel *chan,
+ struct ast_channel *swap,
+ struct ast_bridge_features *features,
+ enum ast_bridge_impart_flags flags)
+{
+ struct bridge_channel_impart_cond cond = {
+ .done = 0,
+ };
+ int res;
+
+ ast_mutex_init(&cond.lock);
+ ast_cond_init(&cond.cond, NULL);
+
+ res = bridge_impart_internal(bridge, chan, swap, features, flags, &cond);
+ if (res) {
+ /* Impart failed. Signal any other waiting impart threads */
+ bridge_channel_impart_signal(chan);
+ }
+
+ ast_cond_destroy(&cond.cond);
+ ast_mutex_destroy(&cond.lock);
+
+ return res;
+}
+
int ast_bridge_depart(struct ast_channel *chan)
{
struct ast_bridge_channel *bridge_channel;
@@ -2318,6 +2487,9 @@ int ast_bridge_add_channel(struct ast_bridge *bridge, struct ast_channel *chan,
if (chan_bridge) {
struct ast_bridge_channel *bridge_channel;
+ /* The channel is in a bridge so it is not getting any new features. */
+ ast_bridge_features_destroy(features);
+
ast_bridge_lock_both(bridge, chan_bridge);
bridge_channel = bridge_find_channel(chan_bridge, chan);
@@ -2340,9 +2512,6 @@ int ast_bridge_add_channel(struct ast_bridge *bridge, struct ast_channel *chan,
bridge_dissolve_check_stolen(chan_bridge, bridge_channel);
ast_bridge_unlock(chan_bridge);
ast_bridge_unlock(bridge);
-
- /* The channel was in a bridge so it is not getting any new features. */
- ast_bridge_features_destroy(features);
} else {
/* Slightly less easy case. We need to yank channel A from
* where he currently is and impart him into our bridge.
@@ -2350,6 +2519,7 @@ int ast_bridge_add_channel(struct ast_bridge *bridge, struct ast_channel *chan,
yanked_chan = ast_channel_yank(chan);
if (!yanked_chan) {
ast_log(LOG_WARNING, "Could not gain control of channel %s\n", ast_channel_name(chan));
+ ast_bridge_features_destroy(features);
return -1;
}
if (ast_channel_state(yanked_chan) != AST_STATE_UP) {
diff --git a/main/bridge_channel.c b/main/bridge_channel.c
index c9262a84a..4baae3cc5 100644
--- a/main/bridge_channel.c
+++ b/main/bridge_channel.c
@@ -2117,13 +2117,14 @@ int bridge_channel_internal_push_full(struct ast_bridge_channel *bridge_channel,
if (bridge->dissolved
|| bridge_channel->state != BRIDGE_CHANNEL_STATE_WAIT
|| (swap && swap->state != BRIDGE_CHANNEL_STATE_WAIT)
- || bridge->v_table->push(bridge, bridge_channel, swap)
- || ast_bridge_channel_establish_roles(bridge_channel)) {
+ || bridge->v_table->push(bridge, bridge_channel, swap)) {
ast_debug(1, "Bridge %s: pushing %p(%s) into bridge failed\n",
bridge->uniqueid, bridge_channel, ast_channel_name(bridge_channel->chan));
return -1;
}
+ ast_bridge_channel_establish_roles(bridge_channel);
+
if (swap) {
int dissolve = ast_test_flag(&bridge->feature_flags, AST_BRIDGE_FLAG_DISSOLVE_EMPTY);
@@ -2636,27 +2637,7 @@ static void bridge_channel_event_join_leave(struct ast_bridge_channel *bridge_ch
ao2_iterator_destroy(&iter);
}
-void bridge_channel_internal_wait(struct bridge_channel_internal_cond *cond)
-{
- ast_mutex_lock(&cond->lock);
- while (!cond->done) {
- ast_cond_wait(&cond->cond, &cond->lock);
- }
- ast_mutex_unlock(&cond->lock);
-}
-
-void bridge_channel_internal_signal(struct bridge_channel_internal_cond *cond)
-{
- if (cond) {
- ast_mutex_lock(&cond->lock);
- cond->done = 1;
- ast_cond_signal(&cond->cond);
- ast_mutex_unlock(&cond->lock);
- }
-}
-
-int bridge_channel_internal_join(struct ast_bridge_channel *bridge_channel,
- struct bridge_channel_internal_cond *cond)
+int bridge_channel_internal_join(struct ast_bridge_channel *bridge_channel)
{
int res = 0;
struct ast_bridge_features *channel_features;
@@ -2686,7 +2667,6 @@ int bridge_channel_internal_join(struct ast_bridge_channel *bridge_channel,
bridge_channel->bridge->uniqueid,
bridge_channel,
ast_channel_name(bridge_channel->chan));
- bridge_channel_internal_signal(cond);
return -1;
}
ast_channel_internal_bridge_set(bridge_channel->chan, bridge_channel->bridge);
@@ -2721,8 +2701,6 @@ int bridge_channel_internal_join(struct ast_bridge_channel *bridge_channel,
}
bridge_reconfigured(bridge_channel->bridge, !bridge_channel->inhibit_colp);
- bridge_channel_internal_signal(cond);
-
if (bridge_channel->state == BRIDGE_CHANNEL_STATE_WAIT) {
/*
* Indicate a source change since this channel is entering the
@@ -2734,6 +2712,7 @@ int bridge_channel_internal_join(struct ast_bridge_channel *bridge_channel,
ast_indicate(bridge_channel->chan, AST_CONTROL_SRCCHANGE);
}
+ bridge_channel_impart_signal(bridge_channel->chan);
ast_bridge_unlock(bridge_channel->bridge);
/* Must release any swap ref after unlocking the bridge. */
diff --git a/main/core_unreal.c b/main/core_unreal.c
index e9b7a8d66..f2404dfca 100644
--- a/main/core_unreal.c
+++ b/main/core_unreal.c
@@ -808,7 +808,6 @@ int ast_unreal_channel_push_to_bridge(struct ast_channel *ast, struct ast_bridge
/* Impart the semi2 channel into the bridge */
if (ast_bridge_impart(bridge, chan, NULL, features,
AST_BRIDGE_IMPART_CHAN_INDEPENDENT)) {
- ast_bridge_features_destroy(features);
ast_channel_unref(chan);
return -1;
}
diff --git a/main/features.c b/main/features.c
index 1810b1556..b96cbd68c 100644
--- a/main/features.c
+++ b/main/features.c
@@ -1104,7 +1104,6 @@ static int bridge_exec(struct ast_channel *chan, const char *data)
xfer_cfg ? xfer_cfg->xfersound : NULL);
ao2_cleanup(xfer_cfg);
if (bridge_add_failed) {
- ast_bridge_features_destroy(peer_features);
ast_bridge_features_cleanup(&chan_features);
ast_bridge_destroy(bridge, 0);
goto done;
diff --git a/main/format_cap.c b/main/format_cap.c
index 17ae18cd4..bf3bd1c4b 100644
--- a/main/format_cap.c
+++ b/main/format_cap.c
@@ -376,7 +376,7 @@ int ast_format_cap_update_by_allow_disallow(struct ast_format_cap *cap, const ch
}
- while ((this = strsep(&parse, ",|"))) {
+ while ((this = ast_strip(strsep(&parse, ",|")))) {
int framems = 0;
struct ast_format *format = NULL;
diff --git a/main/lock.c b/main/lock.c
index dd90d7bd9..03f1cd974 100644
--- a/main/lock.c
+++ b/main/lock.c
@@ -286,17 +286,19 @@ int __ast_pthread_mutex_lock(const char *filename, int lineno, const char *func,
if (wait_time > reported_wait && (wait_time % 5) == 0) {
__ast_mutex_logger("%s line %d (%s): Deadlock? waited %d sec for mutex '%s'?\n",
filename, lineno, func, (int) wait_time, mutex_name);
- ast_reentrancy_lock(lt);
+ if (lt) {
+ ast_reentrancy_lock(lt);
#ifdef HAVE_BKTR
- __dump_backtrace(&lt->backtrace[lt->reentrancy], canlog);
+ __dump_backtrace(&lt->backtrace[lt->reentrancy], canlog);
#endif
- __ast_mutex_logger("%s line %d (%s): '%s' was locked here.\n",
- lt->file[ROFFSET], lt->lineno[ROFFSET],
- lt->func[ROFFSET], mutex_name);
+ __ast_mutex_logger("%s line %d (%s): '%s' was locked here.\n",
+ lt->file[ROFFSET], lt->lineno[ROFFSET],
+ lt->func[ROFFSET], mutex_name);
#ifdef HAVE_BKTR
- __dump_backtrace(&lt->backtrace[ROFFSET], canlog);
+ __dump_backtrace(&lt->backtrace[ROFFSET], canlog);
#endif
- ast_reentrancy_unlock(lt);
+ ast_reentrancy_unlock(lt);
+ }
reported_wait = wait_time;
}
usleep(200);
diff --git a/main/manager.c b/main/manager.c
index 7c2155015..ba261e8e9 100644
--- a/main/manager.c
+++ b/main/manager.c
@@ -1541,6 +1541,17 @@ static AST_RWLIST_HEAD_STATIC(manager_hooks, manager_custom_hook);
/*! \brief A container of event documentation nodes */
static AO2_GLOBAL_OBJ_STATIC(event_docs);
+static int __attribute__((format(printf, 9, 0))) __manager_event_sessions(
+ struct ao2_container *sessions,
+ int category,
+ const char *event,
+ int chancount,
+ struct ast_channel **chans,
+ const char *file,
+ int line,
+ const char *func,
+ const char *fmt,
+ ...);
static enum add_filter_result manager_add_filter(const char *filter_pattern, struct ao2_container *whitefilters, struct ao2_container *blackfilters);
static int match_filter(struct mansession *s, char *eventdata);
@@ -1679,37 +1690,75 @@ struct ast_str *ast_manager_str_from_json_object(struct ast_json *blob, key_excl
return res;
}
+#define manager_event_sessions(sessions, category, event, contents , ...) \
+ __manager_event_sessions(sessions, category, event, 0, NULL, __FILE__, __LINE__, __PRETTY_FUNCTION__, contents , ## __VA_ARGS__)
+
+#define any_manager_listeners(sessions) \
+ ((sessions && ao2_container_count(sessions)) || !AST_RWLIST_EMPTY(&manager_hooks))
+
static void manager_default_msg_cb(void *data, struct stasis_subscription *sub,
struct stasis_message *message)
{
- RAII_VAR(struct ast_manager_event_blob *, ev, NULL, ao2_cleanup);
+ struct ao2_container *sessions;
+ struct ast_manager_event_blob *ev;
- ev = stasis_message_to_ami(message);
+ if (!stasis_message_can_be_ami(message)) {
+ /* Not an AMI message; disregard */
+ return;
+ }
+
+ sessions = ao2_global_obj_ref(mgr_sessions);
+ if (!any_manager_listeners(sessions)) {
+ /* Nobody is listening */
+ ao2_cleanup(sessions);
+ return;
+ }
- if (ev == NULL) {
- /* Not and AMI message; disregard */
+ ev = stasis_message_to_ami(message);
+ if (!ev) {
+ /* Conversion failure */
+ ao2_cleanup(sessions);
return;
}
- manager_event(ev->event_flags, ev->manager_event, "%s",
- ev->extra_fields);
+ manager_event_sessions(sessions, ev->event_flags, ev->manager_event,
+ "%s", ev->extra_fields);
+ ao2_ref(ev, -1);
+ ao2_cleanup(sessions);
}
static void manager_generic_msg_cb(void *data, struct stasis_subscription *sub,
struct stasis_message *message)
{
- struct ast_json_payload *payload = stasis_message_data(message);
- int class_type = ast_json_integer_get(ast_json_object_get(payload->json, "class_type"));
- const char *type = ast_json_string_get(ast_json_object_get(payload->json, "type"));
- struct ast_json *event = ast_json_object_get(payload->json, "event");
- RAII_VAR(struct ast_str *, event_buffer, NULL, ast_free);
+ struct ast_json_payload *payload;
+ int class_type;
+ const char *type;
+ struct ast_json *event;
+ struct ast_str *event_buffer;
+ struct ao2_container *sessions;
+
+ sessions = ao2_global_obj_ref(mgr_sessions);
+ if (!any_manager_listeners(sessions)) {
+ /* Nobody is listening */
+ ao2_cleanup(sessions);
+ return;
+ }
+
+ payload = stasis_message_data(message);
+ class_type = ast_json_integer_get(ast_json_object_get(payload->json, "class_type"));
+ type = ast_json_string_get(ast_json_object_get(payload->json, "type"));
+ event = ast_json_object_get(payload->json, "event");
event_buffer = ast_manager_str_from_json_object(event, NULL);
if (!event_buffer) {
ast_log(AST_LOG_WARNING, "Error while creating payload for event %s\n", type);
+ ao2_cleanup(sessions);
return;
}
- manager_event(class_type, type, "%s", ast_str_buffer(event_buffer));
+ manager_event_sessions(sessions, class_type, type,
+ "%s", ast_str_buffer(event_buffer));
+ ast_free(event_buffer);
+ ao2_cleanup(sessions);
}
void ast_manager_publish_event(const char *type, int class_type, struct ast_json *obj)
@@ -4698,7 +4747,7 @@ static int action_blind_transfer(struct mansession *s, const struct message *m)
const char *name = astman_get_header(m, "Channel");
const char *exten = astman_get_header(m, "Exten");
const char *context = astman_get_header(m, "Context");
- RAII_VAR(struct ast_channel *, chan, NULL, ao2_cleanup);
+ struct ast_channel *chan;
if (ast_strlen_zero(name)) {
astman_send_error(s, m, "No channel specified");
@@ -4735,6 +4784,7 @@ static int action_blind_transfer(struct mansession *s, const struct message *m)
break;
}
+ ast_channel_unref(chan);
return 0;
}
@@ -5907,7 +5957,7 @@ static int action_coreshowchannels(struct mansession *s, const struct message *m
const char *actionid = astman_get_header(m, "ActionID");
char idText[256];
int numchans = 0;
- RAII_VAR(struct ao2_container *, channels, NULL, ao2_cleanup);
+ struct ao2_container *channels;
struct ao2_iterator it_chans;
struct stasis_message *msg;
@@ -5917,7 +5967,8 @@ static int action_coreshowchannels(struct mansession *s, const struct message *m
idText[0] = '\0';
}
- if (!(channels = stasis_cache_dump(ast_channel_cache_by_name(), ast_channel_snapshot_type()))) {
+ channels = stasis_cache_dump(ast_channel_cache_by_name(), ast_channel_snapshot_type());
+ if (!channels) {
astman_send_error(s, m, "Could not get cached channels");
return 0;
}
@@ -5969,6 +6020,7 @@ static int action_coreshowchannels(struct mansession *s, const struct message *m
astman_send_list_complete_start(s, m, "CoreShowChannelsComplete", numchans);
astman_send_list_complete_end(s);
+ ao2_ref(channels, -1);
return 0;
}
@@ -6597,11 +6649,10 @@ static int append_event(const char *str, int category)
static void append_channel_vars(struct ast_str **pbuf, struct ast_channel *chan)
{
- RAII_VAR(struct varshead *, vars, NULL, ao2_cleanup);
+ struct varshead *vars;
struct ast_var_t *var;
vars = ast_channel_get_manager_vars(chan);
-
if (!vars) {
return;
}
@@ -6609,62 +6660,67 @@ static void append_channel_vars(struct ast_str **pbuf, struct ast_channel *chan)
AST_LIST_TRAVERSE(vars, var, entries) {
ast_str_append(pbuf, 0, "ChanVariable(%s): %s=%s\r\n", ast_channel_name(chan), var->name, var->value);
}
+ ao2_ref(vars, -1);
}
/* XXX see if can be moved inside the function */
AST_THREADSTORAGE(manager_event_buf);
#define MANAGER_EVENT_BUF_INITSIZE 256
-int __ast_manager_event_multichan(int category, const char *event, int chancount,
- struct ast_channel **chans, const char *file, int line, const char *func,
- const char *fmt, ...)
+static int __attribute__((format(printf, 9, 0))) __manager_event_sessions_va(
+ struct ao2_container *sessions,
+ int category,
+ const char *event,
+ int chancount,
+ struct ast_channel **chans,
+ const char *file,
+ int line,
+ const char *func,
+ const char *fmt,
+ va_list ap)
{
- RAII_VAR(struct ao2_container *, sessions, ao2_global_obj_ref(mgr_sessions), ao2_cleanup);
- struct mansession_session *session;
- struct manager_custom_hook *hook;
struct ast_str *auth = ast_str_alloca(MAX_AUTH_PERM_STRING);
const char *cat_str;
- va_list ap;
struct timeval now;
struct ast_str *buf;
int i;
- if (!(sessions && ao2_container_count(sessions)) && AST_RWLIST_EMPTY(&manager_hooks)) {
- return 0;
- }
-
- if (!(buf = ast_str_thread_get(&manager_event_buf, MANAGER_EVENT_BUF_INITSIZE))) {
+ buf = ast_str_thread_get(&manager_event_buf, MANAGER_EVENT_BUF_INITSIZE);
+ if (!buf) {
return -1;
}
cat_str = authority_to_str(category, &auth);
ast_str_set(&buf, 0,
- "Event: %s\r\nPrivilege: %s\r\n",
- event, cat_str);
+ "Event: %s\r\n"
+ "Privilege: %s\r\n",
+ event, cat_str);
if (timestampevents) {
now = ast_tvnow();
ast_str_append(&buf, 0,
- "Timestamp: %ld.%06lu\r\n",
- (long)now.tv_sec, (unsigned long) now.tv_usec);
+ "Timestamp: %ld.%06lu\r\n",
+ (long)now.tv_sec, (unsigned long) now.tv_usec);
}
if (manager_debug) {
static int seq;
+
ast_str_append(&buf, 0,
- "SequenceNumber: %d\r\n",
- ast_atomic_fetchadd_int(&seq, 1));
+ "SequenceNumber: %d\r\n",
+ ast_atomic_fetchadd_int(&seq, 1));
ast_str_append(&buf, 0,
- "File: %s\r\nLine: %d\r\nFunc: %s\r\n", file, line, func);
+ "File: %s\r\n"
+ "Line: %d\r\n"
+ "Func: %s\r\n",
+ file, line, func);
}
if (!ast_strlen_zero(ast_config_AST_SYSTEM_NAME)) {
ast_str_append(&buf, 0,
- "SystemName: %s\r\n",
- ast_config_AST_SYSTEM_NAME);
+ "SystemName: %s\r\n",
+ ast_config_AST_SYSTEM_NAME);
}
- va_start(ap, fmt);
ast_str_append_va(&buf, 0, fmt, ap);
- va_end(ap);
for (i = 0; i < chancount; i++) {
append_channel_vars(&buf, chans[i]);
}
@@ -6675,9 +6731,11 @@ int __ast_manager_event_multichan(int category, const char *event, int chancount
/* Wake up any sleeping sessions */
if (sessions) {
- struct ao2_iterator i;
- i = ao2_iterator_init(sessions, 0);
- while ((session = ao2_iterator_next(&i))) {
+ struct ao2_iterator iter;
+ struct mansession_session *session;
+
+ iter = ao2_iterator_init(sessions, 0);
+ while ((session = ao2_iterator_next(&iter))) {
ao2_lock(session);
if (session->waiting_thread != AST_PTHREADT_NULL) {
pthread_kill(session->waiting_thread, SIGURG);
@@ -6692,10 +6750,12 @@ int __ast_manager_event_multichan(int category, const char *event, int chancount
ao2_unlock(session);
unref_mansession(session);
}
- ao2_iterator_destroy(&i);
+ ao2_iterator_destroy(&iter);
}
if (category != EVENT_FLAG_SHUTDOWN && !AST_RWLIST_EMPTY(&manager_hooks)) {
+ struct manager_custom_hook *hook;
+
AST_RWLIST_RDLOCK(&manager_hooks);
AST_RWLIST_TRAVERSE(&manager_hooks, hook, list) {
hook->helper(category, event, ast_str_buffer(buf));
@@ -6706,6 +6766,50 @@ int __ast_manager_event_multichan(int category, const char *event, int chancount
return 0;
}
+static int __attribute__((format(printf, 9, 0))) __manager_event_sessions(
+ struct ao2_container *sessions,
+ int category,
+ const char *event,
+ int chancount,
+ struct ast_channel **chans,
+ const char *file,
+ int line,
+ const char *func,
+ const char *fmt,
+ ...)
+{
+ va_list ap;
+ int res;
+
+ va_start(ap, fmt);
+ res = __manager_event_sessions_va(sessions, category, event, chancount, chans,
+ file, line, func, fmt, ap);
+ va_end(ap);
+ return res;
+}
+
+int __ast_manager_event_multichan(int category, const char *event, int chancount,
+ struct ast_channel **chans, const char *file, int line, const char *func,
+ const char *fmt, ...)
+{
+ struct ao2_container *sessions = ao2_global_obj_ref(mgr_sessions);
+ va_list ap;
+ int res;
+
+ if (!any_manager_listeners(sessions)) {
+ /* Nobody is listening */
+ ao2_cleanup(sessions);
+ return 0;
+ }
+
+ va_start(ap, fmt);
+ res = __manager_event_sessions_va(sessions, category, event, chancount, chans,
+ file, line, func, fmt, ap);
+ va_end(ap);
+ ao2_cleanup(sessions);
+ return res;
+}
+
/*! \brief
* support functions to register/unregister AMI action handlers,
*/
@@ -9184,6 +9288,7 @@ int ast_str_append_event_header(struct ast_str **fields_string,
static void manager_event_blob_dtor(void *obj)
{
struct ast_manager_event_blob *ev = obj;
+
ast_string_field_free_memory(ev);
}
@@ -9195,18 +9300,19 @@ ast_manager_event_blob_create(
const char *extra_fields_fmt,
...)
{
- RAII_VAR(struct ast_manager_event_blob *, ev, NULL, ao2_cleanup);
+ struct ast_manager_event_blob *ev;
va_list argp;
ast_assert(extra_fields_fmt != NULL);
ast_assert(manager_event != NULL);
- ev = ao2_alloc(sizeof(*ev), manager_event_blob_dtor);
+ ev = ao2_alloc_options(sizeof(*ev), manager_event_blob_dtor, AO2_ALLOC_OPT_LOCK_NOLOCK);
if (!ev) {
return NULL;
}
if (ast_string_field_init(ev, 20)) {
+ ao2_ref(ev, -1);
return NULL;
}
@@ -9214,10 +9320,8 @@ ast_manager_event_blob_create(
ev->event_flags = event_flags;
va_start(argp, extra_fields_fmt);
- ast_string_field_ptr_build_va(ev, &ev->extra_fields, extra_fields_fmt,
- argp);
+ ast_string_field_ptr_build_va(ev, &ev->extra_fields, extra_fields_fmt, argp);
va_end(argp);
- ao2_ref(ev, +1);
return ev;
}
diff --git a/main/manager_channels.c b/main/manager_channels.c
index adef639e8..ef71c65b1 100644
--- a/main/manager_channels.c
+++ b/main/manager_channels.c
@@ -697,28 +697,33 @@ static void channel_hangup_request_cb(void *data,
struct stasis_message *message)
{
struct ast_channel_blob *obj = stasis_message_data(message);
- RAII_VAR(struct ast_str *, extra, NULL, ast_free);
- RAII_VAR(struct ast_str *, channel_event_string, NULL, ast_free);
+ struct ast_str *extra;
+ struct ast_str *channel_event_string;
struct ast_json *cause;
int is_soft;
char *manager_event = "HangupRequest";
+ if (!obj->snapshot) {
+ /* No snapshot? Likely an earlier allocation failure creating it. */
+ return;
+ }
+
extra = ast_str_create(20);
if (!extra) {
return;
}
channel_event_string = ast_manager_build_channel_state_string(obj->snapshot);
-
if (!channel_event_string) {
+ ast_free(extra);
return;
}
cause = ast_json_object_get(obj->blob, "cause");
if (cause) {
ast_str_append(&extra, 0,
- "Cause: %jd\r\n",
- ast_json_integer_get(cause));
+ "Cause: %jd\r\n",
+ ast_json_integer_get(cause));
}
is_soft = ast_json_is_true(ast_json_object_get(obj->blob, "soft"));
@@ -727,9 +732,12 @@ static void channel_hangup_request_cb(void *data,
}
manager_event(EVENT_FLAG_CALL, manager_event,
- "%s%s",
- ast_str_buffer(channel_event_string),
- ast_str_buffer(extra));
+ "%s%s",
+ ast_str_buffer(channel_event_string),
+ ast_str_buffer(extra));
+
+ ast_free(channel_event_string);
+ ast_free(extra);
}
static void channel_chanspy_stop_cb(void *data, struct stasis_subscription *sub,
diff --git a/main/stasis_message.c b/main/stasis_message.c
index c797cdfa0..99721ef3c 100644
--- a/main/stasis_message.c
+++ b/main/stasis_message.c
@@ -170,17 +170,17 @@ const struct timeval *stasis_message_timestamp(const struct stasis_message *msg)
return &msg->timestamp;
}
-#define INVOKE_VIRTUAL(fn, ...) \
- ({ \
- if (msg == NULL) { \
- return NULL; \
- } \
- ast_assert(msg->type != NULL); \
+#define INVOKE_VIRTUAL(fn, ...) \
+ ({ \
+ if (!msg) { \
+ return NULL; \
+ } \
+ ast_assert(msg->type != NULL); \
ast_assert(msg->type->vtable != NULL); \
- if (msg->type->vtable->fn == NULL) { \
- return NULL; \
- } \
- msg->type->vtable->fn(__VA_ARGS__); \
+ if (!msg->type->vtable->fn) { \
+ return NULL; \
+ } \
+ msg->type->vtable->fn(__VA_ARGS__); \
})
struct ast_manager_event_blob *stasis_message_to_ami(struct stasis_message *msg)
@@ -199,3 +199,18 @@ struct ast_event *stasis_message_to_event(struct stasis_message *msg)
{
return INVOKE_VIRTUAL(to_event, msg);
}
+
+#define HAS_VIRTUAL(fn, msg) \
+ ({ \
+ if (!msg) { \
+ return 0; \
+ } \
+ ast_assert(msg->type != NULL); \
+ ast_assert(msg->type->vtable != NULL); \
+ !!msg->type->vtable->fn; \
+ })
+
+int stasis_message_can_be_ami(struct stasis_message *msg)
+{
+ return HAS_VIRTUAL(to_ami, msg);
+}
diff --git a/res/res_agi.c b/res/res_agi.c
index ff3358062..e3839dd6d 100644
--- a/res/res_agi.c
+++ b/res/res_agi.c
@@ -3736,6 +3736,24 @@ static enum agi_result agi_handle_command(struct ast_channel *chan, AGI *agi, ch
return AGI_RESULT_SUCCESS;
}
+
+AST_LIST_HEAD_NOLOCK(deferred_frames, ast_frame);
+
+static void queue_deferred_frames(struct deferred_frames *deferred_frames,
+ struct ast_channel *chan)
+{
+ struct ast_frame *f;
+
+ if (!AST_LIST_EMPTY(deferred_frames)) {
+ ast_channel_lock(chan);
+ while ((f = AST_LIST_REMOVE_HEAD(deferred_frames, frame_list))) {
+ ast_queue_frame_head(chan, f);
+ ast_frfree(f);
+ }
+ ast_channel_unlock(chan);
+ }
+}
+
static enum agi_result run_agi(struct ast_channel *chan, char *request, AGI *agi, int pid, int *status, int dead, int argc, char *argv[])
{
struct ast_channel *c;
@@ -3754,6 +3772,9 @@ static enum agi_result run_agi(struct ast_channel *chan, char *request, AGI *agi
const char *sighup_str;
const char *exit_on_hangup_str;
int exit_on_hangup;
+ struct deferred_frames deferred_frames;
+
+ AST_LIST_HEAD_INIT_NOLOCK(&deferred_frames);
ast_channel_lock(chan);
sighup_str = pbx_builtin_getvar_helper(chan, "AGISIGHUP");
@@ -3815,8 +3836,20 @@ static enum agi_result run_agi(struct ast_channel *chan, char *request, AGI *agi
/* Write, ignoring errors */
if (write(agi->audio, f->data.ptr, f->datalen) < 0) {
}
+ ast_frfree(f);
+ } else if (ast_is_deferrable_frame(f)) {
+ struct ast_frame *dup_f;
+
+ if ((dup_f = ast_frisolate(f))) {
+ AST_LIST_INSERT_HEAD(&deferred_frames, dup_f, frame_list);
+ }
+
+ if (dup_f != f) {
+ ast_frfree(f);
+ }
+ } else {
+ ast_frfree(f);
}
- ast_frfree(f);
}
} else if (outfd > -1) {
size_t len = sizeof(buf);
@@ -3864,6 +3897,8 @@ static enum agi_result run_agi(struct ast_channel *chan, char *request, AGI *agi
buf[buflen - 1] = '\0';
}
+ queue_deferred_frames(&deferred_frames, chan);
+
if (agidebug)
ast_verbose("<%s>AGI Rx << %s\n", ast_channel_name(chan), buf);
cmd_status = agi_handle_command(chan, agi, buf, dead);
@@ -3885,6 +3920,9 @@ static enum agi_result run_agi(struct ast_channel *chan, char *request, AGI *agi
}
}
}
+
+ queue_deferred_frames(&deferred_frames, chan);
+
if (agi->speech) {
ast_speech_destroy(agi->speech);
}
diff --git a/res/res_pjsip.c b/res/res_pjsip.c
index 7f6175115..fc9fbe4dc 100644
--- a/res/res_pjsip.c
+++ b/res/res_pjsip.c
@@ -3608,11 +3608,7 @@ int ast_sip_push_task(struct ast_taskprocessor *serializer, int (*sip_task)(void
serializer = serializer_pool[pos];
}
- if (serializer) {
- return ast_taskprocessor_push(serializer, sip_task, task_data);
- } else {
- return ast_threadpool_push(sip_threadpool, sip_task, task_data);
- }
+ return ast_taskprocessor_push(serializer, sip_task, task_data);
}
struct sync_task_data {
@@ -4131,6 +4127,11 @@ static int load_module(void)
goto error;
}
+ if (ast_sip_initialize_scheduler()) {
+ ast_log(LOG_ERROR, "Failed to start scheduler. Aborting load\n");
+ goto error;
+ }
+
/* Now load all the pjproject infrastructure. */
if (load_pjsip()) {
goto error;
@@ -4171,8 +4172,10 @@ static int load_module(void)
return AST_MODULE_LOAD_SUCCESS;
error:
- /* These functions all check for NULLs and are safe to call at any time */
unload_pjsip(NULL);
+
+ /* These functions all check for NULLs and are safe to call at any time */
+ ast_sip_destroy_scheduler();
serializer_pool_shutdown();
ast_threadpool_shutdown(sip_threadpool);
@@ -4203,7 +4206,7 @@ static int unload_module(void)
* so we have to push the work to the threadpool to handle
*/
ast_sip_push_task_synchronous(NULL, unload_pjsip, NULL);
-
+ ast_sip_destroy_scheduler();
serializer_pool_shutdown();
ast_threadpool_shutdown(sip_threadpool);
diff --git a/res/res_pjsip/include/res_pjsip_private.h b/res/res_pjsip/include/res_pjsip_private.h
index 72a4387f1..04cd85408 100644
--- a/res/res_pjsip/include/res_pjsip_private.h
+++ b/res/res_pjsip/include/res_pjsip_private.h
@@ -325,4 +325,23 @@ struct ast_sip_contact_status *ast_res_pjsip_find_or_create_contact_status(const
*/
int ast_sip_validate_uri_length(const char *uri);
+/*!
+ * \brief Initialize scheduler
+ * \since 13.9.0
+ *
+ * \retval -1 failure
+ * \retval 0 success
+ */
+int ast_sip_initialize_scheduler(void);
+
+/*!
+ * \internal
+ * \brief Destroy scheduler
+ * \since 13.9.0
+ *
+ * \retval -1 failure
+ * \retval 0 success
+ */
+int ast_sip_destroy_scheduler(void);
+
#endif /* RES_PJSIP_PRIVATE_H_ */
diff --git a/res/res_pjsip/location.c b/res/res_pjsip/location.c
index 19fa803f0..2a7dbb54f 100644
--- a/res/res_pjsip/location.c
+++ b/res/res_pjsip/location.c
@@ -417,38 +417,64 @@ static int permanent_uri_sort_fn(const void *obj_left, const void *obj_right, in
int ast_sip_validate_uri_length(const char *contact_uri)
{
- pjsip_uri *uri;
- pjsip_sip_uri *sip_uri;
- pj_pool_t *pool;
int max_length = pj_max_hostname - 1;
+ char *contact = ast_strdupa(contact_uri);
+ char *host;
+ char *at;
+ int theres_a_port = 0;
if (strlen(contact_uri) > pjsip_max_url_size - 1) {
return -1;
}
- if (!(pool = pjsip_endpt_create_pool(ast_sip_get_pjsip_endpoint(), "uri validation", 512, 512))) {
- ast_log(LOG_ERROR, "Unable to allocate pool for uri validation\n");
+ contact = ast_strip_quoted(contact, "<", ">");
+
+ if (!strncasecmp(contact, "sip:", 4)) {
+ host = contact + 4;
+ } else if (!strncasecmp(contact, "sips:", 5)) {
+ host = contact + 5;
+ } else {
+ /* Not a SIP URI */
return -1;
}
- if (!(uri = pjsip_parse_uri(pool, (char *)contact_uri, strlen(contact_uri), 0)) ||
- (!PJSIP_URI_SCHEME_IS_SIP(uri) && !PJSIP_URI_SCHEME_IS_SIPS(uri))) {
- pjsip_endpt_release_pool(ast_sip_get_pjsip_endpoint(), pool);
- return -1;
+ at = strchr(contact, '@');
+ if (at) {
+ /* sip[s]:user@host */
+ host = at + 1;
+ }
+
+ if (host[0] == '[') {
+ /* Host is an IPv6 address. Just get up to the matching bracket */
+ char *close_bracket;
+
+ close_bracket = strchr(host, ']');
+ if (!close_bracket) {
+ return -1;
+ }
+ close_bracket++;
+ if (*close_bracket == ':') {
+ theres_a_port = 1;
+ }
+ *close_bracket = '\0';
+ } else {
+ /* uri parameters could contain ';' so trim them off first */
+ host = strsep(&host, ";?");
+ /* Host is FQDN or IPv4 address. Need to find closing delimiter */
+ if (strchr(host, ':')) {
+ theres_a_port = 1;
+ host = strsep(&host, ":");
+ }
}
- sip_uri = pjsip_uri_get_uri(uri);
- if (sip_uri->port == 0) {
+ if (!theres_a_port) {
max_length -= strlen("_sips.tcp.");
}
- if (sip_uri->host.slen > max_length) {
- pjsip_endpt_release_pool(ast_sip_get_pjsip_endpoint(), pool);
+ if (strlen(host) > max_length) {
return -1;
}
- pjsip_endpt_release_pool(ast_sip_get_pjsip_endpoint(), pool);
-
return 0;
}
diff --git a/res/res_pjsip/pjsip_scheduler.c b/res/res_pjsip/pjsip_scheduler.c
new file mode 100644
index 000000000..a5d406cb5
--- /dev/null
+++ b/res/res_pjsip/pjsip_scheduler.c
@@ -0,0 +1,495 @@
+/*
+ * Asterisk -- An open source telephony toolkit.
+ *
+ * Copyright (C) 2016, Fairview 5 Engineering, LLC
+ *
+ * George Joseph <george.joseph@fairview5.com>
+ *
+ * See http://www.asterisk.org for more information about
+ * the Asterisk project. Please do not directly contact
+ * any of the maintainers of this project for assistance;
+ * the project provides a web site, mailing lists and IRC
+ * channels for your use.
+ *
+ * This program is free software, distributed under the terms of
+ * the GNU General Public License Version 2. See the LICENSE file
+ * at the top of the source tree.
+ */
+
+/*! \file
+ *
+ * \brief res_pjsip Scheduler
+ *
+ * \author George Joseph <george.joseph@fairview5.com>
+ */
+
+#include "asterisk.h"
+
+ASTERISK_REGISTER_FILE()
+
+#include "asterisk/res_pjsip.h"
+#include "include/res_pjsip_private.h"
+#include "asterisk/res_pjsip_cli.h"
+
+#define TASK_BUCKETS 53
+
+static struct ast_sched_context *scheduler_context;
+static struct ao2_container *tasks;
+static int task_count;
+
+struct ast_sip_sched_task {
+ /*! ast_sip_sched task id */
+ uint32_t task_id;
+ /*! ast_sched scheudler id */
+ int current_scheduler_id;
+ /*! task is currently running */
+ int is_running;
+ /*! task */
+ ast_sip_task task;
+ /*! task data */
+ void *task_data;
+ /*! reschedule interval in milliseconds */
+ int interval;
+ /*! the time the task was queued */
+ struct timeval when_queued;
+ /*! the last time the task was started */
+ struct timeval last_start;
+ /*! the last time the task was ended */
+ struct timeval last_end;
+ /*! times run */
+ int run_count;
+ /*! the task reschedule, cleanup and policy flags */
+ enum ast_sip_scheduler_task_flags flags;
+ /*! the serializer to be used (if any) */
+ struct ast_taskprocessor *serializer;
+ /* A name to be associated with the task */
+ char name[0];
+};
+
+AO2_STRING_FIELD_HASH_FN(ast_sip_sched_task, name);
+AO2_STRING_FIELD_CMP_FN(ast_sip_sched_task, name);
+AO2_STRING_FIELD_SORT_FN(ast_sip_sched_task, name);
+
+static int push_to_serializer(const void *data);
+
+/*
+ * This function is run in the context of the serializer.
+ * It runs the task with a simple call and reschedules based on the result.
+ */
+static int run_task(void *data)
+{
+ RAII_VAR(struct ast_sip_sched_task *, schtd, ao2_bump(data), ao2_cleanup);
+ int res;
+ int delay;
+
+ ao2_lock(schtd);
+ schtd->last_start = ast_tvnow();
+ schtd->is_running = 1;
+ schtd->run_count++;
+ ao2_unlock(schtd);
+
+ res = schtd->task(schtd->task_data);
+
+ ao2_lock(schtd);
+ schtd->is_running = 0;
+ schtd->last_end = ast_tvnow();
+
+ /*
+ * Don't restart if the task returned 0 or if the interval
+ * was set to 0 while the task was running
+ */
+ if (!res || !schtd->interval) {
+ schtd->interval = 0;
+ ao2_unlock(schtd);
+ ao2_unlink(tasks, schtd);
+ return -1;
+ }
+
+ if (schtd->flags & AST_SIP_SCHED_TASK_VARIABLE) {
+ schtd->interval = res;
+ }
+
+ if (schtd->flags & AST_SIP_SCHED_TASK_DELAY) {
+ delay = schtd->interval;
+ } else {
+ delay = schtd->interval - (ast_tvdiff_ms(schtd->last_end, schtd->last_start) % schtd->interval);
+ }
+
+ schtd->current_scheduler_id = ast_sched_add(scheduler_context, delay, push_to_serializer, (const void *)schtd);
+ if (schtd->current_scheduler_id < 0) {
+ schtd->interval = 0;
+ ao2_unlock(schtd);
+ ao2_unlink(tasks, schtd);
+ return -1;
+ }
+
+ ao2_unlock(schtd);
+
+ return 0;
+}
+
+/*
+ * This function is run by the scheduler thread. Its only job is to push the task
+ * to the serialize and return. It returns 0 so it's not rescheduled.
+ */
+static int push_to_serializer(const void *data)
+{
+ struct ast_sip_sched_task *schtd = (struct ast_sip_sched_task *)data;
+
+ if (ast_sip_push_task(schtd->serializer, run_task, schtd)) {
+ ao2_ref(schtd, -1);
+ }
+
+ return 0;
+}
+
+int ast_sip_sched_task_cancel(struct ast_sip_sched_task *schtd)
+{
+ int res;
+
+ if (!ao2_ref_and_lock(schtd)) {
+ return -1;
+ }
+
+ if (schtd->current_scheduler_id < 0 || schtd->interval <= 0) {
+ ao2_unlock_and_unref(schtd);
+ return 0;
+ }
+
+ schtd->interval = 0;
+ ao2_unlock_and_unref(schtd);
+ ao2_unlink(tasks, schtd);
+ res = ast_sched_del(scheduler_context, schtd->current_scheduler_id);
+
+ return res;
+}
+
+int ast_sip_sched_task_cancel_by_name(const char *name)
+{
+ RAII_VAR(struct ast_sip_sched_task *, schtd, NULL, ao2_cleanup);
+
+ if (ast_strlen_zero(name)) {
+ return -1;
+ }
+
+ schtd = ao2_find(tasks, name, OBJ_SEARCH_KEY | OBJ_NOLOCK);
+ if (!schtd) {
+ return -1;
+ }
+
+ return ast_sip_sched_task_cancel(schtd);
+}
+
+
+int ast_sip_sched_task_get_times(struct ast_sip_sched_task *schtd,
+ struct timeval *queued, struct timeval *last_start, struct timeval *last_end)
+{
+ if (!ao2_ref_and_lock(schtd)) {
+ return -1;
+ }
+
+ if (queued) {
+ memcpy(queued, &schtd->when_queued, sizeof(struct timeval));
+ }
+ if (last_start) {
+ memcpy(last_start, &schtd->last_start, sizeof(struct timeval));
+ }
+ if (last_end) {
+ memcpy(last_end, &schtd->last_end, sizeof(struct timeval));
+ }
+
+ ao2_unlock_and_unref(schtd);
+
+ return 0;
+}
+
+int ast_sip_sched_task_get_times_by_name(const char *name,
+ struct timeval *queued, struct timeval *last_start, struct timeval *last_end)
+{
+ RAII_VAR(struct ast_sip_sched_task *, schtd, NULL, ao2_cleanup);
+
+ if (ast_strlen_zero(name)) {
+ return -1;
+ }
+
+ schtd = ao2_find(tasks, name, OBJ_SEARCH_KEY | OBJ_NOLOCK);
+ if (!schtd) {
+ return -1;
+ }
+
+ return ast_sip_sched_task_get_times(schtd, queued, last_start, last_end);
+}
+
+int ast_sip_sched_task_get_name(struct ast_sip_sched_task *schtd, char *name, size_t maxlen)
+{
+ if (maxlen <= 0) {
+ return -1;
+ }
+
+ if (!ao2_ref_and_lock(schtd)) {
+ return -1;
+ }
+
+ ast_copy_string(name, schtd->name, maxlen);
+
+ ao2_unlock_and_unref(schtd);
+
+ return 0;
+}
+
+int ast_sip_sched_task_get_next_run(struct ast_sip_sched_task *schtd)
+{
+ int delay;
+ struct timeval since_when;
+ struct timeval now;
+
+ if (!ao2_ref_and_lock(schtd)) {
+ return -1;
+ }
+
+ if (schtd->interval) {
+ delay = schtd->interval;
+ now = ast_tvnow();
+
+ if (schtd->flags & AST_SIP_SCHED_TASK_DELAY) {
+ since_when = schtd->is_running ? now : schtd->last_end;
+ } else {
+ since_when = schtd->last_start.tv_sec ? schtd->last_start : schtd->when_queued;
+ }
+
+ delay -= ast_tvdiff_ms(now, since_when);
+
+ delay = delay < 0 ? 0 : delay;
+ } else {
+ delay = -1;
+ }
+
+ ao2_unlock_and_unref(schtd);
+
+ return delay;
+}
+
+int ast_sip_sched_task_get_next_run_by_name(const char *name)
+{
+ RAII_VAR(struct ast_sip_sched_task *, schtd, NULL, ao2_cleanup);
+
+ if (ast_strlen_zero(name)) {
+ return -1;
+ }
+
+ schtd = ao2_find(tasks, name, OBJ_SEARCH_KEY | OBJ_NOLOCK);
+ if (!schtd) {
+ return -1;
+ }
+
+ return ast_sip_sched_task_get_next_run(schtd);
+}
+
+int ast_sip_sched_is_task_running(struct ast_sip_sched_task *schtd)
+{
+ if (!schtd) {
+ return 0;
+ }
+
+ return schtd->is_running;
+}
+
+int ast_sip_sched_is_task_running_by_name(const char *name)
+{
+ RAII_VAR(struct ast_sip_sched_task *, schtd, NULL, ao2_cleanup);
+
+ if (ast_strlen_zero(name)) {
+ return 0;
+ }
+
+ schtd = ao2_find(tasks, name, OBJ_SEARCH_KEY | OBJ_NOLOCK);
+ if (!schtd) {
+ return 0;
+ }
+
+ return schtd->is_running;
+}
+
+static void schtd_destructor(void *data)
+{
+ struct ast_sip_sched_task *schtd = data;
+
+ if (schtd->flags & AST_SIP_SCHED_TASK_DATA_AO2) {
+ /* release our own ref, then release the callers if asked to do so */
+ ao2_ref(schtd->task_data, (schtd->flags & AST_SIP_SCHED_TASK_DATA_FREE) ? -2 : -1);
+ } else if (schtd->task_data && (schtd->flags & AST_SIP_SCHED_TASK_DATA_FREE)) {
+ ast_free(schtd->task_data);
+ }
+}
+
+struct ast_sip_sched_task *ast_sip_schedule_task(struct ast_taskprocessor *serializer,
+ int interval, ast_sip_task sip_task, char *name, void *task_data, enum ast_sip_scheduler_task_flags flags)
+{
+#define ID_LEN 13 /* task_deadbeef */
+ struct ast_sip_sched_task *schtd;
+ int res;
+
+ if (interval < 0) {
+ return NULL;
+ }
+
+ schtd = ao2_alloc((sizeof(*schtd) + (!ast_strlen_zero(name) ? strlen(name) : ID_LEN) + 1), schtd_destructor);
+ if (!schtd) {
+ return NULL;
+ }
+
+ schtd->task_id = ast_atomic_fetchadd_int(&task_count, 1);
+ schtd->serializer = serializer;
+ schtd->task = sip_task;
+ if (!ast_strlen_zero(name)) {
+ strcpy(schtd->name, name); /* Safe */
+ } else {
+ sprintf(schtd->name, "task_%08x", schtd->task_id);
+ }
+ schtd->task_data = task_data;
+ schtd->flags = flags;
+ schtd->interval = interval;
+ schtd->when_queued = ast_tvnow();
+
+ if (flags & AST_SIP_SCHED_TASK_DATA_AO2) {
+ ao2_ref(task_data, +1);
+ }
+ res = ast_sched_add(scheduler_context, interval, push_to_serializer, (const void *)schtd);
+ if (res < 0) {
+ ao2_ref(schtd, -1);
+ return NULL;
+ } else {
+ schtd->current_scheduler_id = res;
+ ao2_link(tasks, schtd);
+ }
+
+ return schtd;
+#undef ID_LEN
+}
+
+static char *cli_show_tasks(struct ast_cli_entry *e, int cmd, struct ast_cli_args *a)
+{
+ struct ao2_iterator i;
+ struct ast_sip_sched_task *schtd;
+ const char *log_format = ast_logger_get_dateformat();
+ struct ast_tm tm;
+ char queued[32];
+ char last_start[32];
+ char last_end[32];
+ int datelen;
+ struct timeval now = ast_tvnow();
+ const char *separator = "======================================";
+
+ switch (cmd) {
+ case CLI_INIT:
+ e->command = "pjsip show scheduled_tasks";
+ e->usage = "Usage: pjsip show scheduled_tasks\n"
+ " Show all scheduled tasks\n";
+ return NULL;
+ case CLI_GENERATE:
+ return NULL;
+ }
+
+ if (a->argc != 3) {
+ return CLI_SHOWUSAGE;
+ }
+
+ ast_localtime(&now, &tm, NULL);
+ datelen = ast_strftime(queued, sizeof(queued), log_format, &tm);
+
+ ast_cli(a->fd, "PJSIP Scheduled Tasks:\n\n");
+
+ ast_cli(a->fd, " %1$-24s %2$-8s %3$-9s %4$-7s %6$-*5$s %7$-*5$s %8$-*5$s\n",
+ "Task Name", "Interval", "Times Run", "State",
+ datelen, "Queued", "Last Started", "Last Ended");
+
+ ast_cli(a->fd, " %1$-24.24s %2$-8.8s %3$-9.9s %4$-7.7s %6$-*5$.*5$s %7$-*5$.*5$s %8$-*5$.*5$s\n",
+ separator, separator, separator, separator,
+ datelen, separator, separator, separator);
+
+
+ ao2_ref(tasks, +1);
+ ao2_rdlock(tasks);
+ i = ao2_iterator_init(tasks, 0);
+ while ((schtd = ao2_iterator_next(&i))) {
+
+ ast_localtime(&schtd->when_queued, &tm, NULL);
+ ast_strftime(queued, sizeof(queued), log_format, &tm);
+
+ if (ast_tvzero(schtd->last_start)) {
+ strcpy(last_start, "not yet started");
+ } else {
+ ast_localtime(&schtd->last_start, &tm, NULL);
+ ast_strftime(last_start, sizeof(last_start), log_format, &tm);
+ }
+
+ if (ast_tvzero(schtd->last_end)) {
+ if (ast_tvzero(schtd->last_start)) {
+ strcpy(last_end, "not yet started");
+ } else {
+ strcpy(last_end, "running");
+ }
+ } else {
+ ast_localtime(&schtd->last_end, &tm, NULL);
+ ast_strftime(last_end, sizeof(last_end), log_format, &tm);
+ }
+
+ ast_cli(a->fd, " %1$-24.24s %2$-8.3f %3$-9d %4$-7s %6$-*5$s %7$-*5$s %8$-*5$s\n",
+ schtd->name,
+ schtd->interval / 1000.0,
+ schtd->run_count,
+ schtd->is_running ? "running" : "waiting",
+ datelen, queued, last_start, last_end);
+ ao2_cleanup(schtd);
+ }
+ ao2_iterator_destroy(&i);
+ ao2_unlock(tasks);
+ ao2_ref(tasks, -1);
+ ast_cli(a->fd, "\n");
+
+ return CLI_SUCCESS;
+}
+
+static struct ast_cli_entry cli_commands[] = {
+ AST_CLI_DEFINE(cli_show_tasks, "Show all scheduled tasks"),
+};
+
+int ast_sip_initialize_scheduler(void)
+{
+ if (!(scheduler_context = ast_sched_context_create())) {
+ ast_log(LOG_ERROR, "Failed to create scheduler. Aborting load\n");
+ return -1;
+ }
+
+ if (ast_sched_start_thread(scheduler_context)) {
+ ast_log(LOG_ERROR, "Failed to start scheduler. Aborting load\n");
+ ast_sched_context_destroy(scheduler_context);
+ return -1;
+ }
+
+ tasks = ao2_container_alloc_hash(AO2_ALLOC_OPT_LOCK_RWLOCK, AO2_CONTAINER_ALLOC_OPT_DUPS_REJECT,
+ TASK_BUCKETS, ast_sip_sched_task_hash_fn, ast_sip_sched_task_sort_fn, ast_sip_sched_task_cmp_fn);
+ if (!tasks) {
+ ast_log(LOG_ERROR, "Failed to allocate task container. Aborting load\n");
+ ast_sched_context_destroy(scheduler_context);
+ return -1;
+ }
+
+ ast_cli_register_multiple(cli_commands, ARRAY_LEN(cli_commands));
+
+ return 0;
+}
+
+int ast_sip_destroy_scheduler(void)
+{
+ ast_cli_unregister_multiple(cli_commands, ARRAY_LEN(cli_commands));
+
+ if (scheduler_context) {
+ ast_sched_context_destroy(scheduler_context);
+ }
+
+ ao2_cleanup(tasks);
+ tasks = NULL;
+
+ return 0;
+}
diff --git a/res/res_pjsip_caller_id.c b/res/res_pjsip_caller_id.c
index 9af2a8a64..efa1b89a8 100644
--- a/res/res_pjsip_caller_id.c
+++ b/res/res_pjsip_caller_id.c
@@ -424,6 +424,12 @@ static pjsip_fromto_hdr *create_new_id_hdr(const pj_str_t *hdr_name, pjsip_fromt
ast_escape_quoted(id->name.str, name_buf, name_buf_len);
pj_strdup2(tdata->pool, &id_name_addr->display, name_buf);
+ } else {
+ /*
+ * We need to clear the remnants of the clone or it'll be left set.
+ * pj_strdup2 is safe to call with a NULL src and it resets both slen and ptr.
+ */
+ pj_strdup2(tdata->pool, &id_name_addr->display, NULL);
}
pj_strdup2(tdata->pool, &id_uri->user, id->number.str);
diff --git a/res/res_pjsip_registrar.c b/res/res_pjsip_registrar.c
index 776700490..851a5f77d 100644
--- a/res/res_pjsip_registrar.c
+++ b/res/res_pjsip_registrar.c
@@ -576,7 +576,6 @@ static int rx_task_core(struct rx_task_data *task_data, struct ao2_container *co
ao2_cleanup(contact_update);
} else {
/* We want to report the user agent that was actually in the removed contact */
- user_agent = ast_strdupa(contact->user_agent);
ast_sip_location_delete_contact(contact);
ast_verb(3, "Removed contact '%s' from AOR '%s' due to request\n", contact_uri, aor_name);
ast_test_suite_event_notify("AOR_CONTACT_REMOVED",
@@ -585,7 +584,7 @@ static int rx_task_core(struct rx_task_data *task_data, struct ao2_container *co
"UserAgent: %s",
contact_uri,
aor_name,
- user_agent);
+ contact->user_agent);
}
}
diff --git a/res/res_pjsip_transport_management.c b/res/res_pjsip_transport_management.c
index eb0240438..afd94eb1f 100644
--- a/res/res_pjsip_transport_management.c
+++ b/res/res_pjsip_transport_management.c
@@ -24,6 +24,8 @@
#include "asterisk.h"
+#include <signal.h>
+
#include <pjsip.h>
#include <pjsip_ua.h>
@@ -93,7 +95,7 @@ static void *keepalive_transport_thread(void *data)
/* Once loaded this module just keeps on going as it is unsafe to stop and change the underlying
* callback for the transport manager.
*/
- while (1) {
+ while (keepalive_interval) {
sleep(keepalive_interval);
ao2_callback(transports, OBJ_NODATA, keepalive_transport_cb, NULL);
}
@@ -347,7 +349,19 @@ static int load_module(void)
static int unload_module(void)
{
- /* This will never get called */
+ pjsip_tpmgr *tpmgr = pjsip_endpt_get_tpmgr(ast_sip_get_pjsip_endpoint());
+
+ if (keepalive_interval) {
+ keepalive_interval = 0;
+ pthread_kill(keepalive_thread, SIGURG);
+ pthread_join(keepalive_thread, NULL);
+ }
+
+ ast_sched_context_destroy(sched);
+ ao2_ref(transports, -1);
+
+ ast_sip_unregister_service(&idle_monitor_module);
+ pjsip_tpmgr_set_state_cb(tpmgr, tpmgr_state_callback);
return 0;
}
diff --git a/res/stasis/control.c b/res/stasis/control.c
index 97b0b8809..b2b076b73 100644
--- a/res/stasis/control.c
+++ b/res/stasis/control.c
@@ -323,7 +323,7 @@ static int app_control_dial(struct stasis_app_control *control,
AST_BRIDGE_IMPART_CHAN_INDEPENDENT)) {
ast_hangup(new_chan);
} else {
- control_add_channel_to_bridge(control, chan, bridge);
+ control_swap_channel_in_bridge(control, bridge, chan, NULL);
}
return 0;
@@ -982,11 +982,8 @@ static void bridge_after_cb_failed(enum ast_bridge_after_cb_reason reason,
ast_bridge_after_cb_reason_string(reason));
}
-int control_add_channel_to_bridge(
- struct stasis_app_control *control,
- struct ast_channel *chan, void *data)
+int control_swap_channel_in_bridge(struct stasis_app_control *control, struct ast_bridge *bridge, struct ast_channel *chan, struct ast_channel *swap)
{
- struct ast_bridge *bridge = data;
int res;
if (!control || !bridge) {
@@ -1039,7 +1036,7 @@ int control_add_channel_to_bridge(
res = ast_bridge_impart(bridge,
chan,
- NULL, /* swap channel */
+ swap,
NULL, /* features */
AST_BRIDGE_IMPART_CHAN_DEPARTABLE);
if (res != 0) {
@@ -1055,6 +1052,11 @@ int control_add_channel_to_bridge(
return 0;
}
+int control_add_channel_to_bridge(struct stasis_app_control *control, struct ast_channel *chan, void *data)
+{
+ return control_swap_channel_in_bridge(control, data, chan, NULL);
+}
+
int stasis_app_control_add_channel_to_bridge(
struct stasis_app_control *control, struct ast_bridge *bridge)
{
diff --git a/res/stasis/control.h b/res/stasis/control.h
index 1d37a494a..868a8091b 100644
--- a/res/stasis/control.h
+++ b/res/stasis/control.h
@@ -111,12 +111,20 @@ struct stasis_app *control_app(struct stasis_app_control *control);
* \brief Command callback for adding a channel to a bridge
*
* \param control The control for chan
- * \param channel The channel on which commands should be executed
- * \param bridge Data to be passed to the callback
+ * \param chan The channel on which commands should be executed
+ * \param data Bridge to be passed to the callback
+ */
+int control_add_channel_to_bridge(struct stasis_app_control *control, struct ast_channel *chan, void *data);
+
+/*!
+ * \brief Command for swapping a channel in a bridge
+ *
+ * \param control The control for chan
+ * \param chan The channel on which commands should be executed
+ * \param bridge Bridge to be passed to the callback
+ * \param swap Channel to swap with when joining the bridge
*/
-int control_add_channel_to_bridge(
- struct stasis_app_control *control,
- struct ast_channel *chan, void *obj);
+int control_swap_channel_in_bridge(struct stasis_app_control *control, struct ast_bridge *bridge, struct ast_channel *chan, struct ast_channel *swap);
/*!
* \brief Stop playing silence to a channel right now.
diff --git a/res/stasis/stasis_bridge.c b/res/stasis/stasis_bridge.c
index e41088134..9ffc2d7be 100644
--- a/res/stasis/stasis_bridge.c
+++ b/res/stasis/stasis_bridge.c
@@ -76,24 +76,54 @@ static void bridge_stasis_run_cb(struct ast_channel *chan, void *data)
pbx_exec(chan, app_stasis, app_name);
}
-static int add_channel_to_bridge(
+struct defer_bridge_add_obj {
+ /*! Bridge to join (has ref) */
+ struct ast_bridge *bridge;
+ /*!
+ * \brief Channel to swap with in the bridge. (has ref)
+ *
+ * \note NULL if not swapping with a channel.
+ */
+ struct ast_channel *swap;
+};
+
+static void defer_bridge_add_dtor(void *obj)
+{
+ struct defer_bridge_add_obj *defer = obj;
+
+ ao2_cleanup(defer->bridge);
+ ast_channel_cleanup(defer->swap);
+}
+
+static int defer_bridge_add(
struct stasis_app_control *control,
struct ast_channel *chan, void *obj)
{
- struct ast_bridge *bridge = obj;
- int res;
+ struct defer_bridge_add_obj *defer = obj;
- res = control_add_channel_to_bridge(control,
- chan, bridge);
- return res;
+ return control_swap_channel_in_bridge(control, defer->bridge, chan, defer->swap);
}
static void bridge_stasis_queue_join_action(struct ast_bridge *self,
- struct ast_bridge_channel *bridge_channel)
+ struct ast_bridge_channel *bridge_channel, struct ast_bridge_channel *swap)
{
+ struct defer_bridge_add_obj *defer;
+
+ defer = ao2_alloc_options(sizeof(*defer), defer_bridge_add_dtor,
+ AO2_ALLOC_OPT_LOCK_NOLOCK);
+ if (!defer) {
+ return;
+ }
+ ao2_ref(self, +1);
+ defer->bridge = self;
+ if (swap) {
+ ast_channel_ref(swap->chan);
+ defer->swap = swap->chan;
+ }
+
ast_channel_lock(bridge_channel->chan);
- command_prestart_queue_command(bridge_channel->chan, add_channel_to_bridge,
- ao2_bump(self), __ao2_cleanup);
+ command_prestart_queue_command(bridge_channel->chan, defer_bridge_add,
+ defer, __ao2_cleanup);
ast_channel_unlock(bridge_channel->chan);
}
@@ -167,18 +197,19 @@ static int bridge_stasis_push(struct ast_bridge *self, struct ast_bridge_channel
if (!control && !stasis_app_channel_is_internal(bridge_channel->chan)) {
/* channel not in Stasis(), get it there */
+ ast_debug(1, "Bridge %s: pushing non-stasis %p(%s) setup to come back in under stasis\n",
+ self->uniqueid, bridge_channel, ast_channel_name(bridge_channel->chan));
+
/* Attach after-bridge callback and pass ownership of swap_app to it */
if (ast_bridge_set_after_callback(bridge_channel->chan,
bridge_stasis_run_cb, NULL, NULL)) {
- ast_log(LOG_ERROR, "Failed to set after bridge callback\n");
+ ast_log(LOG_ERROR,
+ "Failed to set after bridge callback for bridge %s non-stasis push of %s\n",
+ self->uniqueid, ast_channel_name(bridge_channel->chan));
return -1;
}
- bridge_stasis_queue_join_action(self, bridge_channel);
- if (swap) {
- /* nudge the swap channel out of the bridge */
- ast_bridge_channel_leave_bridge(swap, BRIDGE_CHANNEL_STATE_END_NO_DISSOLVE, 0);
- }
+ bridge_stasis_queue_join_action(self, bridge_channel, swap);
/* Return -1 so the push fails and the after-bridge callback gets called
* This keeps the bridging framework from putting the channel into the bridge
diff --git a/tests/test_channel_feature_hooks.c b/tests/test_channel_feature_hooks.c
index fbc9786cc..c5d3b9b86 100644
--- a/tests/test_channel_feature_hooks.c
+++ b/tests/test_channel_feature_hooks.c
@@ -40,6 +40,7 @@ ASTERISK_FILE_VERSION(__FILE__, "$Revision$")
#include "asterisk/bridge.h"
#include "asterisk/bridge_basic.h"
#include "asterisk/features.h"
+#include "asterisk/format_cache.h"
#define TEST_CATEGORY "/channels/features/"
@@ -47,6 +48,8 @@ ASTERISK_FILE_VERSION(__FILE__, "$Revision$")
#define TEST_BACKEND_NAME "Features Test Logging"
+#define TEST_CHANNEL_FORMAT ast_format_slin
+
/*! \brief A channel technology used for the unit tests */
static struct ast_channel_tech test_features_chan_tech = {
.type = CHANNEL_TECH_NAME,
@@ -94,6 +97,11 @@ static void wait_for_unbridged(struct ast_channel *channel)
#define START_CHANNEL(channel, name, number) do { \
channel = ast_channel_alloc(0, AST_STATE_UP, number, name, number, number, \
"default", NULL, NULL, 0, CHANNEL_TECH_NAME "/" name); \
+ ast_channel_nativeformats_set(channel, test_features_chan_tech.capabilities); \
+ ast_channel_set_rawwriteformat(channel, TEST_CHANNEL_FORMAT); \
+ ast_channel_set_rawreadformat(channel, TEST_CHANNEL_FORMAT); \
+ ast_channel_set_writeformat(channel, TEST_CHANNEL_FORMAT); \
+ ast_channel_set_readformat(channel, TEST_CHANNEL_FORMAT); \
ast_channel_unlock(channel); \
} while (0)
@@ -329,12 +337,19 @@ static int unload_module(void)
AST_TEST_UNREGISTER(test_features_channel_interval);
ast_channel_unregister(&test_features_chan_tech);
+ ao2_cleanup(test_features_chan_tech.capabilities);
+ test_features_chan_tech.capabilities = NULL;
return 0;
}
static int load_module(void)
{
+ test_features_chan_tech.capabilities = ast_format_cap_alloc(AST_FORMAT_CAP_FLAG_DEFAULT);
+ if (!test_features_chan_tech.capabilities) {
+ return AST_MODULE_LOAD_FAILURE;
+ }
+ ast_format_cap_append(test_features_chan_tech.capabilities, TEST_CHANNEL_FORMAT, 0);
ast_channel_register(&test_features_chan_tech);
AST_TEST_REGISTER(test_features_channel_dtmf);
diff --git a/tests/test_message.c b/tests/test_message.c
index f7ee02730..f73901ea6 100644
--- a/tests/test_message.c
+++ b/tests/test_message.c
@@ -232,8 +232,8 @@ static int user_event_hook_cb(int category, const char *event, char *body)
static int handler_wait_for_message(struct ast_test *test)
{
int error = 0;
- struct timeval wait_now = ast_tvnow();
- struct timespec wait_time = { .tv_sec = wait_now.tv_sec + 1, .tv_nsec = wait_now.tv_usec * 1000 };
+ struct timeval wait = ast_tvadd(ast_tvnow(), ast_tv(5 /* seconds */, 0));
+ struct timespec wait_time = { .tv_sec = wait.tv_sec, .tv_nsec = wait.tv_usec * 1000 };
ast_mutex_lock(&handler_lock);
while (!handler_received_message) {
@@ -253,8 +253,8 @@ static int handler_wait_for_message(struct ast_test *test)
static int user_event_wait_for_events(struct ast_test *test, int expected_events)
{
int error;
- struct timeval wait_now = ast_tvnow();
- struct timespec wait_time = { .tv_sec = wait_now.tv_sec + 1, .tv_nsec = wait_now.tv_usec * 1000 };
+ struct timeval wait = ast_tvadd(ast_tvnow(), ast_tv(5 /* seconds */, 0));
+ struct timespec wait_time = { .tv_sec = wait.tv_sec, .tv_nsec = wait.tv_usec * 1000 };
expected_user_events = expected_events;
diff --git a/tests/test_res_pjsip_scheduler.c b/tests/test_res_pjsip_scheduler.c
new file mode 100644
index 000000000..f9a1633ac
--- /dev/null
+++ b/tests/test_res_pjsip_scheduler.c
@@ -0,0 +1,400 @@
+/*
+ * Asterisk -- An open source telephony toolkit.
+ *
+ * Copyright (C) 2016, Fairview 5 Engineering, LLC
+ *
+ * George Joseph <george.joseph@fairview5.com>
+ *
+ * See http://www.asterisk.org for more information about
+ * the Asterisk project. Please do not directly contact
+ * any of the maintainers of this project for assistance;
+ * the project provides a web site, mailing lists and IRC
+ * channels for your use.
+ *
+ * This program is free software, distributed under the terms of
+ * the GNU General Public License Version 2. See the LICENSE file
+ * at the top of the source tree.
+ */
+
+/*!
+ * \file
+ * \brief res_pjsip scheduler tests
+ *
+ * \author George Joseph <george.joseph@fairview5.com>
+ *
+ */
+
+/*** MODULEINFO
+ <depend>TEST_FRAMEWORK</depend>
+ <depend>res_pjsip</depend>
+ <support_level>core</support_level>
+ ***/
+
+#include "asterisk.h"
+
+ASTERISK_REGISTER_FILE()
+
+#include <pjsip.h>
+#include "asterisk/test.h"
+#include "asterisk/module.h"
+#include "asterisk/taskprocessor.h"
+#include "asterisk/res_pjsip.h"
+#include "asterisk/utils.h"
+
+#define CATEGORY "/res/res_pjsip/scheduler/"
+
+struct test_data {
+ ast_mutex_t lock;
+ ast_cond_t cond;
+ pthread_t tid;
+ struct timeval test_start;
+ struct timeval task_start;
+ struct timeval task_end;
+ int is_servant;
+ int interval;
+ int sleep;
+ int done;
+ struct ast_test *test;
+};
+
+#define S2U(x) (long int)(x * 1000 * 1000)
+#define M2U(x) (long int)(x * 1000)
+
+static int task_1(void *data)
+{
+ struct test_data *test = data;
+
+ test->done = 0;
+ test->task_start = ast_tvnow();
+ test->tid = pthread_self();
+ test->is_servant = ast_sip_thread_is_servant();
+ usleep(M2U(test->sleep));
+ test->task_end = ast_tvnow();
+
+ ast_mutex_lock(&test->lock);
+ test->done = 1;
+ ast_mutex_unlock(&test->lock);
+ ast_cond_signal(&test->cond);
+
+ return test->interval;
+}
+
+
+static void data_cleanup(void *data)
+{
+ struct test_data *test_data = data;
+ ast_mutex_destroy(&test_data->lock);
+ ast_cond_destroy(&test_data->cond);
+}
+
+#define waitfor(x) \
+{ \
+ ast_mutex_lock(&(x)->lock); \
+ while (!(x)->done) { \
+ ast_cond_wait(&(x)->cond, &(x)->lock); \
+ } \
+ (x)->done = 0; \
+ ast_mutex_unlock(&(x)->lock); \
+}
+
+static int scheduler(struct ast_test *test, int serialized)
+{
+ RAII_VAR(struct ast_taskprocessor *, tp1, NULL, ast_taskprocessor_unreference);
+ RAII_VAR(struct test_data *, test_data1, ao2_alloc(sizeof(*test_data1), data_cleanup), ao2_cleanup);
+ RAII_VAR(struct test_data *, test_data2, ao2_alloc(sizeof(*test_data2), data_cleanup), ao2_cleanup);
+ RAII_VAR(struct ast_sip_sched_task *, task1, NULL, ao2_cleanup);
+ RAII_VAR(struct ast_sip_sched_task *, task2, NULL, ao2_cleanup);
+ int duration;
+ int delay;
+ struct timeval task1_start;
+
+ ast_test_validate(test, test_data1 != NULL);
+ ast_test_validate(test, test_data2 != NULL);
+
+ test_data1->test = test;
+ test_data1->test_start = ast_tvnow();
+ test_data1->interval = 2000;
+ test_data1->sleep = 1000;
+ ast_mutex_init(&test_data1->lock);
+ ast_cond_init(&test_data1->cond, NULL);
+
+ test_data2->test = test;
+ test_data2->test_start = ast_tvnow();
+ test_data2->interval = 2000;
+ test_data2->sleep = 1000;
+ ast_mutex_init(&test_data2->lock);
+ ast_cond_init(&test_data2->cond, NULL);
+
+ if (serialized) {
+ ast_test_status_update(test, "This test will take about %3.1f seconds\n",
+ (test_data1->interval + test_data1->sleep + (MAX(test_data1->interval - test_data2->interval, 0)) + test_data2->sleep) / 1000.0);
+ tp1 = ast_sip_create_serializer();
+ ast_test_validate(test, (tp1 != NULL));
+ } else {
+ ast_test_status_update(test, "This test will take about %3.1f seconds\n",
+ ((MAX(test_data1->interval, test_data2->interval) + MAX(test_data1->sleep, test_data2->sleep)) / 1000.0));
+ }
+
+ task1 = ast_sip_schedule_task(tp1, test_data1->interval, task_1, NULL, test_data1, AST_SIP_SCHED_TASK_FIXED);
+ ast_test_validate(test, task1 != NULL);
+
+ task2 = ast_sip_schedule_task(tp1, test_data2->interval, task_1, NULL, test_data2, AST_SIP_SCHED_TASK_FIXED);
+ ast_test_validate(test, task2 != NULL);
+
+ waitfor(test_data1);
+ ast_sip_sched_task_cancel(task1);
+ ast_test_validate(test, test_data1->is_servant);
+
+ duration = ast_tvdiff_ms(test_data1->task_end, test_data1->test_start);
+ ast_test_validate(test, (duration > ((test_data1->interval + test_data1->sleep) * 0.9))
+ && (duration < ((test_data1->interval + test_data1->sleep) * 1.1)));
+
+ ast_sip_sched_task_get_times(task1, NULL, &task1_start, NULL);
+ delay = ast_tvdiff_ms(task1_start, test_data1->test_start);
+ ast_test_validate(test, (delay > (test_data1->interval * 0.9)
+ && (delay < (test_data1->interval * 1.1))));
+
+ waitfor(test_data2);
+ ast_sip_sched_task_cancel(task2);
+ ast_test_validate(test, test_data2->is_servant);
+
+ if (serialized) {
+ ast_test_validate(test, test_data1->tid == test_data2->tid);
+ ast_test_validate(test, ast_tvdiff_ms(test_data2->task_start, test_data1->task_end) >= 0);
+ } else {
+ ast_test_validate(test, test_data1->tid != test_data2->tid);
+ }
+
+ return AST_TEST_PASS;
+}
+
+AST_TEST_DEFINE(serialized_scheduler)
+{
+
+ switch (cmd) {
+ case TEST_INIT:
+ info->name = __func__;
+ info->category = CATEGORY;
+ info->summary = "Test res_pjsip serialized scheduler";
+ info->description = "Test res_pjsip serialized scheduler";
+ return AST_TEST_NOT_RUN;
+ case TEST_EXECUTE:
+ break;
+ }
+
+ return scheduler(test, 1);
+}
+
+AST_TEST_DEFINE(unserialized_scheduler)
+{
+
+ switch (cmd) {
+ case TEST_INIT:
+ info->name = __func__;
+ info->category = CATEGORY;
+ info->summary = "Test res_pjsip unserialized scheduler";
+ info->description = "Test res_pjsip unserialized scheduler";
+ return AST_TEST_NOT_RUN;
+ case TEST_EXECUTE:
+ break;
+ }
+
+ return scheduler(test, 0);
+}
+
+static int run_count;
+static int destruct_count;
+
+static int dummy_task(void *data)
+{
+ int *sleep = data;
+
+ usleep(M2U(*sleep));
+ run_count++;
+
+ return 0;
+}
+
+static void test_destructor(void *data)
+{
+ destruct_count++;
+}
+
+AST_TEST_DEFINE(scheduler_cleanup)
+{
+ RAII_VAR(int *, sleep, NULL, ao2_cleanup);
+ RAII_VAR(struct ast_sip_sched_task *, task, NULL, ao2_cleanup);
+ int interval;
+ int when;
+
+ switch (cmd) {
+ case TEST_INIT:
+ info->name = __func__;
+ info->category = CATEGORY;
+ info->summary = "Test res_pjsip scheduler cleanup";
+ info->description = "Test res_pjsip scheduler cleanup";
+ return AST_TEST_NOT_RUN;
+ case TEST_EXECUTE:
+ break;
+ }
+
+ destruct_count = 0;
+ interval = 1000;
+
+ sleep = ao2_alloc(sizeof(*sleep), test_destructor);
+ ast_test_validate(test, sleep != NULL);
+ *sleep = 500;
+
+ ast_test_status_update(test, "This test will take about %3.1f seconds\n",
+ ((interval * 1.1) + *sleep) / 1000.0);
+
+ task = ast_sip_schedule_task(NULL, interval, dummy_task, "dummy", sleep,
+ AST_SIP_SCHED_TASK_DATA_AO2 | AST_SIP_SCHED_TASK_DATA_FREE);
+ ast_test_validate(test, task != NULL);
+ usleep(M2U(interval * 0.5));
+ when = ast_sip_sched_task_get_next_run(task);
+ ast_test_validate(test, (when > (interval * 0.4) && when < (interval * 0.6)));
+ usleep(M2U(interval * 0.6));
+ ast_test_validate(test, ast_sip_sched_is_task_running(task));
+
+ usleep(M2U(*sleep));
+
+ ast_test_validate(test, (ast_sip_sched_is_task_running(task) == 0));
+ when = ast_sip_sched_task_get_next_run(task);
+ ast_test_validate(test, (when < 0), res, error);
+ ast_test_validate(test, (ao2_ref(task, 0) == 1));
+ ao2_ref(task, -1);
+ task = NULL;
+ ast_test_validate(test, (destruct_count == 1));
+ sleep = NULL;
+
+ return AST_TEST_PASS;
+}
+
+AST_TEST_DEFINE(scheduler_cancel)
+{
+ RAII_VAR(int *, sleep, NULL, ao2_cleanup);
+ RAII_VAR(struct ast_sip_sched_task *, task, NULL, ao2_cleanup);
+ int interval;
+ int when;
+
+ switch (cmd) {
+ case TEST_INIT:
+ info->name = __func__;
+ info->category = CATEGORY;
+ info->summary = "Test res_pjsip scheduler cancel task";
+ info->description = "Test res_pjsip scheduler cancel task";
+ return AST_TEST_NOT_RUN;
+ case TEST_EXECUTE:
+ break;
+ }
+
+ destruct_count = 0;
+ interval = 1000;
+
+ sleep = ao2_alloc(sizeof(*sleep), test_destructor);
+ ast_test_validate(test, sleep != NULL);
+ *sleep = 500;
+
+ ast_test_status_update(test, "This test will take about %3.1f seconds\n",
+ (interval + *sleep) / 1000.0);
+
+ task = ast_sip_schedule_task(NULL, interval, dummy_task, "dummy", sleep, AST_SIP_SCHED_TASK_DATA_NO_CLEANUP);
+ ast_test_validate(test, task != NULL);
+
+ usleep(M2U(interval * 0.5));
+ when = ast_sip_sched_task_get_next_run_by_name("dummy");
+ ast_test_validate(test, (when > (interval * 0.4) && when < (interval * 0.6)));
+ ast_test_validate(test, !ast_sip_sched_is_task_running_by_name("dummy"));
+ ast_test_validate(test, ao2_ref(task, 0) == 2);
+
+ ast_sip_sched_task_cancel_by_name("dummy");
+
+ when = ast_sip_sched_task_get_next_run(task);
+ ast_test_validate(test, when < 0);
+
+ usleep(M2U(interval));
+ ast_test_validate(test, run_count == 0);
+ ast_test_validate(test, destruct_count == 0);
+ ast_test_validate(test, ao2_ref(task, 0) == 1);
+
+ return AST_TEST_PASS;
+}
+
+AST_TEST_DEFINE(scheduler_policy)
+{
+ RAII_VAR(struct test_data *, test_data1, ao2_alloc(sizeof(*test_data1), data_cleanup), ao2_cleanup);
+ RAII_VAR(struct ast_sip_sched_task *, task, NULL, ao2_cleanup);
+ int when;
+
+ switch (cmd) {
+ case TEST_INIT:
+ info->name = __func__;
+ info->category = CATEGORY;
+ info->summary = "Test res_pjsip scheduler cancel task";
+ info->description = "Test res_pjsip scheduler cancel task";
+ return AST_TEST_NOT_RUN;
+ case TEST_EXECUTE:
+ break;
+ }
+
+ ast_test_validate(test, test_data1 != NULL);
+
+ destruct_count = 0;
+ run_count = 0;
+ test_data1->test = test;
+ test_data1->test_start = ast_tvnow();
+ test_data1->interval = 1000;
+ test_data1->sleep = 500;
+ ast_mutex_init(&test_data1->lock);
+ ast_cond_init(&test_data1->cond, NULL);
+
+ ast_test_status_update(test, "This test will take about %3.1f seconds\n",
+ ((test_data1->interval * 3) + test_data1->sleep) / 1000.0);
+
+ task = ast_sip_schedule_task(NULL, test_data1->interval, task_1, "test_1", test_data1,
+ AST_SIP_SCHED_TASK_DATA_NO_CLEANUP | AST_SIP_SCHED_TASK_PERIODIC);
+ ast_test_validate(test, task != NULL);
+
+ waitfor(test_data1);
+ when = ast_tvdiff_ms(test_data1->task_start, test_data1->test_start);
+ ast_test_validate(test, when > test_data1->interval * 0.9 && when < test_data1->interval * 1.1);
+
+ waitfor(test_data1);
+ when = ast_tvdiff_ms(test_data1->task_start, test_data1->test_start);
+ ast_test_validate(test, when > test_data1->interval * 2 * 0.9 && when < test_data1->interval * 2 * 1.1);
+
+ waitfor(test_data1);
+ when = ast_tvdiff_ms(test_data1->task_start, test_data1->test_start);
+ ast_test_validate(test, when > test_data1->interval * 3 * 0.9 && when < test_data1->interval * 3 * 1.1);
+
+ ast_sip_sched_task_cancel(task);
+ ao2_ref(task, -1);
+ task = NULL;
+
+ return AST_TEST_PASS;
+}
+
+static int load_module(void)
+{
+ CHECK_PJSIP_MODULE_LOADED();
+
+ AST_TEST_REGISTER(serialized_scheduler);
+ AST_TEST_REGISTER(unserialized_scheduler);
+ AST_TEST_REGISTER(scheduler_cleanup);
+ AST_TEST_REGISTER(scheduler_cancel);
+ AST_TEST_REGISTER(scheduler_policy);
+ return AST_MODULE_LOAD_SUCCESS;
+}
+
+static int unload_module(void)
+{
+ AST_TEST_UNREGISTER(scheduler_cancel);
+ AST_TEST_UNREGISTER(scheduler_cleanup);
+ AST_TEST_UNREGISTER(unserialized_scheduler);
+ AST_TEST_UNREGISTER(serialized_scheduler);
+ AST_TEST_UNREGISTER(scheduler_policy);
+ return 0;
+}
+
+AST_MODULE_INFO_STANDARD(ASTERISK_GPL_KEY, "res_pjsip scheduler test module");
diff --git a/third-party/pjproject/patches/0001-sip_parser.c-Remove-wholesale-strip-from-parse_param.patch b/third-party/pjproject/patches/0001-sip_parser.c-Remove-wholesale-strip-from-parse_param.patch
new file mode 100644
index 000000000..e0bd9129c
--- /dev/null
+++ b/third-party/pjproject/patches/0001-sip_parser.c-Remove-wholesale-strip-from-parse_param.patch
@@ -0,0 +1,55 @@
+From ce426249ec1270f27560919791f3e13eaeea9152 Mon Sep 17 00:00:00 2001
+From: George Joseph <george.joseph@fairview5.com>
+Date: Tue, 12 Apr 2016 14:09:53 -0600
+Subject: [PATCH] sip_parser.c: Remove wholesale '[]' strip from
+ parse_param_impl
+
+The wholesale stripping of '[]' from header parameters causes issues if
+something (like a port) occurrs after the final ']'.
+
+'[2001:a::b]' will correctly parse to '2001:a::b'
+'[2001:a::b]:8080' will correctly parse to '2001:a::b' but the scanner is left
+with ':8080' and parsing stops with a syntax error.
+
+I can't even find a case where stripping the '[]' is a good thing anyway. Even
+if you continued to parse and resulted in a string that looks like this...
+'2001:a::b:8080', it's not valid.
+
+This came up in Asterisk because Kamailio sends us a Contact with an alias
+URI parameter that has an IPv6 address in it like this:
+Contact: <sip:1171@127.0.0.1:5080;alias=[2001:1:2::3]~43691~6>
+which should be legal but causes a syntax error because of the characters
+after the final ']'. Even if it didn't, the '[]' should still not be stripped.
+
+I've run the Asterisk Test Suite for PJSIP (252 tests) many of which are IPv6
+enabled. No issues were caused by removing the code that strips the '[]'.
+
+I tried running 'make pjsip-test' but that fails even without my change. :)
+
+The Asterisk ticket is: https://issues.asterisk.org/jira/browse/ASTERISK-25123
+---
+ pjsip/src/pjsip/sip_parser.c | 8 --------
+ 1 file changed, 8 deletions(-)
+
+diff --git a/pjsip/src/pjsip/sip_parser.c b/pjsip/src/pjsip/sip_parser.c
+index c18faa3..98eb5ea 100644
+--- a/pjsip/src/pjsip/sip_parser.c
++++ b/pjsip/src/pjsip/sip_parser.c
+@@ -1149,14 +1149,6 @@ static void parse_param_imp( pj_scanner *scanner, pj_pool_t *pool,
+ pvalue->ptr++;
+ pvalue->slen -= 2;
+ }
+- } else if (*scanner->curptr == '[') {
+- /* pvalue can be a quoted IPv6; in this case, the
+- * '[' and ']' quote characters are to be removed
+- * from the pvalue.
+- */
+- pj_scan_get_char(scanner);
+- pj_scan_get_until_ch(scanner, ']', pvalue);
+- pj_scan_get_char(scanner);
+ } else if(pj_cis_match(spec, *scanner->curptr)) {
+ parser_get_and_unescape(scanner, pool, spec, esc_spec, pvalue);
+ }
+--
+2.5.5
+