summaryrefslogtreecommitdiff
path: root/res/res_rtp_asterisk.c
diff options
context:
space:
mode:
authorRichard Mudgett <rmudgett@digium.com>2017-03-22 16:05:49 -0500
committerRichard Mudgett <rmudgett@digium.com>2017-04-19 13:40:57 -0500
commitd165079cbcb51c94ce729b97f25b42cdbd7b443e (patch)
tree4aa762d04c6f313acd7b3d5333af1d6dfcbcc1d7 /res/res_rtp_asterisk.c
parentb84abac1449c56d6c9f781525a533691b326ad8c (diff)
rtp_engine/res_rtp_asterisk: Fix RTP struct reentrancy crashes.
The struct ast_rtp_instance has historically been indirectly protected from reentrancy issues by the channel lock because early channel drivers held the lock for really long times. Holding the channel lock for such a long time has caused many deadlock problems in the past. Along comes chan_pjsip/res_pjsip which doesn't necessarily hold the channel lock because sometimes there may not be an associated channel created yet or the channel pointer isn't available. In the case of ASTERISK-26835 a pjsip serializer thread was processing a message's SDP body while another thread was reading a RTP packet from the socket. Both threads wound up changing the rtp->rtcp->local_addr_str string and interfering with each other. The classic reentrancy problem resulted in a crash. In the case of ASTERISK-26853 a pjsip serializer thread was processing a message's SDP body while another thread was reading a RTP packet from the socket. Both threads wound up processing ICE candidates in PJPROJECT and interfering with each other. The classic reentrancy problem resulted in a crash. * rtp_engine.c: Make the ast_rtp_instance_xxx() calls lock the RTP instance struct. * rtp_engine.c: Make ICE and DTLS wrapper functions to lock the RTP instance struct for the API call. * res_rtp_asterisk.c: Lock the RTP instance to prevent a reentrancy problem with rtp->rtcp->local_addr_str in the scheduler thread running ast_rtcp_write(). * res_rtp_asterisk.c: Avoid deadlock when local RTP bridging in bridge_p2p_rtp_write() because there are two RTP instance structs involved. * res_rtp_asterisk.c: Avoid deadlock when trying to stop scheduler callbacks. We cannot hold the instance lock when trying to stop a scheduler callback. * res_rtp_asterisk.c: Remove the lock in struct dtls_details and use the struct ast_rtp_instance ao2 object lock instead. The lock was used to synchronize two threads to prevent a race condition between starting and stopping a timeout timer. The race condition is no longer present between dtls_perform_handshake() and __rtp_recvfrom() because the instance lock prevents these functions from overlapping each other with regards to the timeout timer. * res_rtp_asterisk.c: Remove the lock in struct ast_rtp and use the struct ast_rtp_instance ao2 object lock instead. The lock was used to synchronize two threads using a condition signal to know when TURN negotiations complete. * res_rtp_asterisk.c: Avoid deadlock when trying to stop the TURN ioqueue_worker_thread(). We cannot hold the instance lock when trying to create or shut down the worker thread without a risk of deadlock. This patch exposed a race condition between a PJSIP serializer thread setting up an ICE session in ice_create() and another thread reading RTP packets. * res_rtp_asterisk.c:ice_create(): Set the new rtp->ice pointer after we have re-locked the RTP instance to prevent the other thread from trying to process ICE packets on an incomplete ICE session setup. A similar race condition is between a PJSIP serializer thread resetting up an ICE session in ice_create() and the timer_worker_thread() processing the completion of the previous ICE session. * res_rtp_asterisk.c:ast_rtp_on_ice_complete(): Protect against an uninitialized/null remote_address after calling update_address_with_ice_candidate(). * res_rtp_asterisk.c: Eliminate the chance of ice_reset_session() destroying and setting the rtp->ice pointer to NULL while other threads are using it by adding an ao2 wrapper around the PJPROJECT ice pointer. Now when we have to unlock the RTP instance object to call a PJPROJECT ICE function we will hold a ref to the wrapper. Also added some rtp->ice NULL checks after we relock the RTP instance and have to do something with the ICE structure. ASTERISK-26835 #close ASTERISK-26853 #close Change-Id: I780b39ec935dcefcce880d50c1a7261744f1d1b4
Diffstat (limited to 'res/res_rtp_asterisk.c')
-rw-r--r--res/res_rtp_asterisk.c622
1 files changed, 477 insertions, 145 deletions
diff --git a/res/res_rtp_asterisk.c b/res/res_rtp_asterisk.c
index a7103860b..e2638320c 100644
--- a/res/res_rtp_asterisk.c
+++ b/res/res_rtp_asterisk.c
@@ -222,7 +222,6 @@ struct rtp_learning_info {
#ifdef HAVE_OPENSSL_SRTP
struct dtls_details {
- ast_mutex_t lock; /*!< Lock for timeout timer synchronization */
SSL *ssl; /*!< SSL session */
BIO *read_bio; /*!< Memory buffer for reading */
BIO *write_bio; /*!< Memory buffer for writing */
@@ -232,6 +231,13 @@ struct dtls_details {
};
#endif
+#ifdef HAVE_PJPROJECT
+/*! An ao2 wrapper protecting the PJPROJECT ice structure with ref counting. */
+struct ice_wrap {
+ pj_ice_sess *real_ice; /*!< ICE session */
+};
+#endif
+
/*! \brief RTP session description */
struct ast_rtp {
int s;
@@ -307,11 +313,10 @@ struct ast_rtp {
struct rtp_red *red;
- ast_mutex_t lock; /*!< Lock for synchronization purposes */
- ast_cond_t cond; /*!< Condition for signaling */
-
#ifdef HAVE_PJPROJECT
- pj_ice_sess *ice; /*!< ICE session */
+ ast_cond_t cond; /*!< ICE/TURN condition for signaling */
+
+ struct ice_wrap *ice; /*!< ao2 wrapped ICE session */
pj_turn_sock *turn_rtp; /*!< RTP TURN relay */
pj_turn_sock *turn_rtcp; /*!< RTCP TURN relay */
pj_turn_state_t turn_state; /*!< Current state of the TURN relay session */
@@ -477,7 +482,7 @@ static void dtls_srtp_start_timeout_timer(struct ast_rtp_instance *instance, str
static void dtls_srtp_stop_timeout_timer(struct ast_rtp_instance *instance, struct ast_rtp *rtp, int rtcp);
#endif
-static int __rtp_sendto(struct ast_rtp_instance *instance, void *buf, size_t size, int flags, struct ast_sockaddr *sa, int rtcp, int *ice, int use_srtp);
+static int __rtp_sendto(struct ast_rtp_instance *instance, void *buf, size_t size, int flags, struct ast_sockaddr *sa, int rtcp, int *via_ice, int use_srtp);
#ifdef HAVE_PJPROJECT
/*! \brief Helper function which clears the ICE host candidate mapping */
@@ -513,17 +518,20 @@ static void host_candidate_overrides_apply(unsigned int count, pj_sockaddr addrs
}
/*! \brief Helper function which updates an ast_sockaddr with the candidate used for the component */
-static void update_address_with_ice_candidate(struct ast_rtp *rtp, enum ast_rtp_ice_component_type component,
+static void update_address_with_ice_candidate(pj_ice_sess *ice, enum ast_rtp_ice_component_type component,
struct ast_sockaddr *cand_address)
{
char address[PJ_INET6_ADDRSTRLEN];
- if (!rtp->ice || (component < 1) || !rtp->ice->comp[component - 1].valid_check) {
+ if (component < 1 || !ice->comp[component - 1].valid_check) {
return;
}
- ast_sockaddr_parse(cand_address, pj_sockaddr_print(&rtp->ice->comp[component - 1].valid_check->rcand->addr, address, sizeof(address), 0), 0);
- ast_sockaddr_set_port(cand_address, pj_sockaddr_get_port(&rtp->ice->comp[component - 1].valid_check->rcand->addr));
+ ast_sockaddr_parse(cand_address,
+ pj_sockaddr_print(&ice->comp[component - 1].valid_check->rcand->addr, address,
+ sizeof(address), 0), 0);
+ ast_sockaddr_set_port(cand_address,
+ pj_sockaddr_get_port(&ice->comp[component - 1].valid_check->rcand->addr));
}
/*! \brief Destructor for locally created ICE candidates */
@@ -540,6 +548,7 @@ static void ast_rtp_ice_candidate_destroy(void *obj)
}
}
+/*! \pre instance is locked */
static void ast_rtp_ice_set_authentication(struct ast_rtp_instance *instance, const char *ufrag, const char *password)
{
struct ast_rtp *rtp = ast_rtp_instance_get_data(instance);
@@ -567,6 +576,7 @@ static int ice_candidate_cmp(void *obj, void *arg, int flags)
return CMP_MATCH | CMP_STOP;
}
+/*! \pre instance is locked */
static void ast_rtp_ice_add_remote_candidate(struct ast_rtp_instance *instance, const struct ast_rtp_engine_ice_candidate *candidate)
{
struct ast_rtp *rtp = ast_rtp_instance_get_data(instance);
@@ -631,40 +641,60 @@ static void pj_thread_register_check(void)
static int ice_create(struct ast_rtp_instance *instance, struct ast_sockaddr *addr,
int port, int replace);
+/*! \pre instance is locked */
static void ast_rtp_ice_stop(struct ast_rtp_instance *instance)
{
struct ast_rtp *rtp = ast_rtp_instance_get_data(instance);
+ struct ice_wrap *ice;
- if (!rtp->ice) {
- return;
+ ice = rtp->ice;
+ rtp->ice = NULL;
+ if (ice) {
+ /* Release the instance lock to avoid deadlock with PJPROJECT group lock */
+ ao2_unlock(instance);
+ ao2_ref(ice, -1);
+ ao2_lock(instance);
}
+}
- pj_thread_register_check();
+/*!
+ * \brief ao2 ICE wrapper object destructor.
+ *
+ * \param vdoomed Object being destroyed.
+ *
+ * \note The associated struct ast_rtp_instance object must not
+ * be locked when unreffing the object. Otherwise we could
+ * deadlock trying to destroy the PJPROJECT ICE structure.
+ */
+static void ice_wrap_dtor(void *vdoomed)
+{
+ struct ice_wrap *ice = vdoomed;
- pj_ice_sess_destroy(rtp->ice);
- rtp->ice = NULL;
+ if (ice->real_ice) {
+ pj_thread_register_check();
+
+ pj_ice_sess_destroy(ice->real_ice);
+ }
}
+/*! \pre instance is locked */
static int ice_reset_session(struct ast_rtp_instance *instance)
{
struct ast_rtp *rtp = ast_rtp_instance_get_data(instance);
- pj_ice_sess_role role = rtp->ice->role;
+ pj_ice_sess_role role = rtp->ice->real_ice->role;
int res;
ast_debug(3, "Resetting ICE for RTP instance '%p'\n", instance);
- if (!rtp->ice->is_nominating && !rtp->ice->is_complete) {
+ if (!rtp->ice->real_ice->is_nominating && !rtp->ice->real_ice->is_complete) {
ast_debug(3, "Nevermind. ICE isn't ready for a reset\n");
return 0;
}
- ast_debug(3, "Stopping ICE for RTP instance '%p'\n", instance);
- ast_rtp_ice_stop(instance);
-
ast_debug(3, "Recreating ICE session %s (%d) for RTP instance '%p'\n", ast_sockaddr_stringify(&rtp->ice_original_rtp_addr), rtp->ice_port, instance);
res = ice_create(instance, &rtp->ice_original_rtp_addr, rtp->ice_port, 1);
if (!res) {
/* Preserve the role that the old ICE session used */
- pj_ice_sess_change_role(rtp->ice, role);
+ pj_ice_sess_change_role(rtp->ice->real_ice, role);
}
/* If we only have one component now, and we previously set up TURN for RTCP,
@@ -674,13 +704,15 @@ static int ice_reset_session(struct ast_rtp_instance *instance)
struct timeval wait = ast_tvadd(ast_tvnow(), ast_samp2tv(TURN_STATE_WAIT_TIME, 1000));
struct timespec ts = { .tv_sec = wait.tv_sec, .tv_nsec = wait.tv_usec * 1000, };
- ast_mutex_lock(&rtp->lock);
- pj_turn_sock_destroy(rtp->turn_rtcp);
rtp->turn_state = PJ_TURN_STATE_NULL;
+
+ /* Release the instance lock to avoid deadlock with PJPROJECT group lock */
+ ao2_unlock(instance);
+ pj_turn_sock_destroy(rtp->turn_rtcp);
+ ao2_lock(instance);
while (rtp->turn_state != PJ_TURN_STATE_DESTROYING) {
- ast_cond_timedwait(&rtp->cond, &rtp->lock, &ts);
+ ast_cond_timedwait(&rtp->cond, ao2_object_get_lockaddr(instance), &ts);
}
- ast_mutex_unlock(&rtp->lock);
}
return res;
@@ -713,6 +745,7 @@ static int ice_candidates_compare(struct ao2_container *left, struct ao2_contain
return 0;
}
+/*! \pre instance is locked */
static void ast_rtp_ice_start(struct ast_rtp_instance *instance)
{
struct ast_rtp *rtp = ast_rtp_instance_get_data(instance);
@@ -757,7 +790,8 @@ static void ast_rtp_ice_start(struct ast_rtp_instance *instance)
has_rtp |= candidate->id == AST_RTP_ICE_COMPONENT_RTP;
has_rtcp |= candidate->id == AST_RTP_ICE_COMPONENT_RTCP;
- pj_strdup2(rtp->ice->pool, &candidates[cand_cnt].foundation, candidate->foundation);
+ pj_strdup2(rtp->ice->real_ice->pool, &candidates[cand_cnt].foundation,
+ candidate->foundation);
candidates[cand_cnt].comp_id = candidate->id;
candidates[cand_cnt].prio = candidate->priority;
@@ -777,10 +811,16 @@ static void ast_rtp_ice_start(struct ast_rtp_instance *instance)
if (candidate->id == AST_RTP_ICE_COMPONENT_RTP && rtp->turn_rtp) {
ast_debug(3, "RTP candidate %s (%p)\n", ast_sockaddr_stringify(&candidate->address), instance);
+ /* Release the instance lock to avoid deadlock with PJPROJECT group lock */
+ ao2_unlock(instance);
pj_turn_sock_set_perm(rtp->turn_rtp, 1, &candidates[cand_cnt].addr, 1);
+ ao2_lock(instance);
} else if (candidate->id == AST_RTP_ICE_COMPONENT_RTCP && rtp->turn_rtcp) {
ast_debug(3, "RTCP candidate %s (%p)\n", ast_sockaddr_stringify(&candidate->address), instance);
+ /* Release the instance lock to avoid deadlock with PJPROJECT group lock */
+ ao2_unlock(instance);
pj_turn_sock_set_perm(rtp->turn_rtcp, 1, &candidates[cand_cnt].addr, 1);
+ ao2_lock(instance);
}
cand_cnt++;
@@ -803,18 +843,28 @@ static void ast_rtp_ice_start(struct ast_rtp_instance *instance)
ast_log(LOG_WARNING, "No RTCP candidates; skipping ICE checklist (%p)\n", instance);
}
- if (has_rtp && (has_rtcp || rtp->ice_num_components == 1)) {
- pj_status_t res = pj_ice_sess_create_check_list(rtp->ice, &ufrag, &passwd, cand_cnt, &candidates[0]);
+ if (rtp->ice && has_rtp && (has_rtcp || rtp->ice_num_components == 1)) {
+ pj_status_t res;
char reason[80];
+ struct ice_wrap *ice;
+ /* Release the instance lock to avoid deadlock with PJPROJECT group lock */
+ ice = rtp->ice;
+ ao2_ref(ice, +1);
+ ao2_unlock(instance);
+ res = pj_ice_sess_create_check_list(ice->real_ice, &ufrag, &passwd, cand_cnt, &candidates[0]);
if (res == PJ_SUCCESS) {
ast_debug(3, "Successfully created ICE checklist (%p)\n", instance);
ast_test_suite_event_notify("ICECHECKLISTCREATE", "Result: SUCCESS");
- pj_ice_sess_start_check(rtp->ice);
+ pj_ice_sess_start_check(ice->real_ice);
pj_timer_heap_poll(timer_heap, NULL);
+ ao2_ref(ice, -1);
+ ao2_lock(instance);
rtp->strict_rtp_state = STRICT_RTP_OPEN;
return;
}
+ ao2_ref(ice, -1);
+ ao2_lock(instance);
pj_strerror(res, reason, sizeof(reason));
ast_log(LOG_WARNING, "Failed to create ICE session check list: %s (%p)\n", reason, instance);
@@ -828,9 +878,12 @@ static void ast_rtp_ice_start(struct ast_rtp_instance *instance)
this function may be re-entered */
ao2_ref(rtp->ice_active_remote_candidates, -1);
rtp->ice_active_remote_candidates = NULL;
- rtp->ice->rcand_cnt = rtp->ice->clist.count = 0;
+ if (rtp->ice) {
+ rtp->ice->real_ice->rcand_cnt = rtp->ice->real_ice->clist.count = 0;
+ }
}
+/*! \pre instance is locked */
static const char *ast_rtp_ice_get_ufrag(struct ast_rtp_instance *instance)
{
struct ast_rtp *rtp = ast_rtp_instance_get_data(instance);
@@ -838,6 +891,7 @@ static const char *ast_rtp_ice_get_ufrag(struct ast_rtp_instance *instance)
return rtp->local_ufrag;
}
+/*! \pre instance is locked */
static const char *ast_rtp_ice_get_password(struct ast_rtp_instance *instance)
{
struct ast_rtp *rtp = ast_rtp_instance_get_data(instance);
@@ -845,6 +899,7 @@ static const char *ast_rtp_ice_get_password(struct ast_rtp_instance *instance)
return rtp->local_passwd;
}
+/*! \pre instance is locked */
static struct ao2_container *ast_rtp_ice_get_local_candidates(struct ast_rtp_instance *instance)
{
struct ast_rtp *rtp = ast_rtp_instance_get_data(instance);
@@ -856,6 +911,7 @@ static struct ao2_container *ast_rtp_ice_get_local_candidates(struct ast_rtp_ins
return rtp->ice_local_candidates;
}
+/*! \pre instance is locked */
static void ast_rtp_ice_lite(struct ast_rtp_instance *instance)
{
struct ast_rtp *rtp = ast_rtp_instance_get_data(instance);
@@ -866,9 +922,10 @@ static void ast_rtp_ice_lite(struct ast_rtp_instance *instance)
pj_thread_register_check();
- pj_ice_sess_change_role(rtp->ice, PJ_ICE_SESS_ROLE_CONTROLLING);
+ pj_ice_sess_change_role(rtp->ice->real_ice, PJ_ICE_SESS_ROLE_CONTROLLING);
}
+/*! \pre instance is locked */
static void ast_rtp_ice_set_role(struct ast_rtp_instance *instance, enum ast_rtp_ice_role role)
{
struct ast_rtp *rtp = ast_rtp_instance_get_data(instance);
@@ -883,20 +940,29 @@ static void ast_rtp_ice_set_role(struct ast_rtp_instance *instance, enum ast_rtp
pj_thread_register_check();
- pj_ice_sess_change_role(rtp->ice, role == AST_RTP_ICE_ROLE_CONTROLLED ?
+ pj_ice_sess_change_role(rtp->ice->real_ice, role == AST_RTP_ICE_ROLE_CONTROLLED ?
PJ_ICE_SESS_ROLE_CONTROLLED : PJ_ICE_SESS_ROLE_CONTROLLING);
}
-static void ast_rtp_ice_add_cand(struct ast_rtp *rtp, unsigned comp_id, unsigned transport_id, pj_ice_cand_type type, pj_uint16_t local_pref,
- const pj_sockaddr_t *addr, const pj_sockaddr_t *base_addr, const pj_sockaddr_t *rel_addr, int addr_len)
+/*! \pre instance is locked */
+static void ast_rtp_ice_add_cand(struct ast_rtp_instance *instance, struct ast_rtp *rtp,
+ unsigned comp_id, unsigned transport_id, pj_ice_cand_type type, pj_uint16_t local_pref,
+ const pj_sockaddr_t *addr, const pj_sockaddr_t *base_addr, const pj_sockaddr_t *rel_addr,
+ int addr_len)
{
pj_str_t foundation;
struct ast_rtp_engine_ice_candidate *candidate, *existing;
+ struct ice_wrap *ice;
char address[PJ_INET6_ADDRSTRLEN];
+ pj_status_t status;
+
+ if (!rtp->ice) {
+ return;
+ }
pj_thread_register_check();
- pj_ice_calc_foundation(rtp->ice->pool, &foundation, type, addr);
+ pj_ice_calc_foundation(rtp->ice->real_ice->pool, &foundation, type, addr);
if (!rtp->ice_local_candidates && !(rtp->ice_local_candidates = ao2_container_alloc(1, NULL, ice_candidate_cmp))) {
return;
@@ -932,42 +998,60 @@ static void ast_rtp_ice_add_cand(struct ast_rtp *rtp, unsigned comp_id, unsigned
return;
}
- if (pj_ice_sess_add_cand(rtp->ice, comp_id, transport_id, type, local_pref, &foundation, addr, base_addr, rel_addr, addr_len, NULL) != PJ_SUCCESS) {
+ /* Release the instance lock to avoid deadlock with PJPROJECT group lock */
+ ice = rtp->ice;
+ ao2_ref(ice, +1);
+ ao2_unlock(instance);
+ status = pj_ice_sess_add_cand(ice->real_ice, comp_id, transport_id, type, local_pref,
+ &foundation, addr, base_addr, rel_addr, addr_len, NULL);
+ ao2_ref(ice, -1);
+ ao2_lock(instance);
+ if (!rtp->ice || status != PJ_SUCCESS) {
ao2_ref(candidate, -1);
return;
}
/* By placing the candidate into the ICE session it will have produced the priority, so update the local candidate with it */
- candidate->priority = rtp->ice->lcand[rtp->ice->lcand_cnt - 1].prio;
+ candidate->priority = rtp->ice->real_ice->lcand[rtp->ice->real_ice->lcand_cnt - 1].prio;
ao2_link(rtp->ice_local_candidates, candidate);
ao2_ref(candidate, -1);
}
+/* PJPROJECT TURN callback */
static void ast_rtp_on_turn_rx_rtp_data(pj_turn_sock *turn_sock, void *pkt, unsigned pkt_len, const pj_sockaddr_t *peer_addr, unsigned addr_len)
{
struct ast_rtp_instance *instance = pj_turn_sock_get_user_data(turn_sock);
struct ast_rtp *rtp = ast_rtp_instance_get_data(instance);
+ struct ice_wrap *ice;
pj_status_t status;
- status = pj_ice_sess_on_rx_pkt(rtp->ice, AST_RTP_ICE_COMPONENT_RTP, TRANSPORT_TURN_RTP, pkt, pkt_len, peer_addr,
- addr_len);
- if (status != PJ_SUCCESS) {
- char buf[100];
+ ao2_lock(instance);
+ ice = ao2_bump(rtp->ice);
+ ao2_unlock(instance);
- pj_strerror(status, buf, sizeof(buf));
- ast_log(LOG_WARNING, "PJ ICE Rx error status code: %d '%s'.\n",
- (int)status, buf);
- return;
- }
- if (!rtp->rtp_passthrough) {
- return;
+ if (ice) {
+ status = pj_ice_sess_on_rx_pkt(ice->real_ice, AST_RTP_ICE_COMPONENT_RTP,
+ TRANSPORT_TURN_RTP, pkt, pkt_len, peer_addr, addr_len);
+ ao2_ref(ice, -1);
+ if (status != PJ_SUCCESS) {
+ char buf[100];
+
+ pj_strerror(status, buf, sizeof(buf));
+ ast_log(LOG_WARNING, "PJ ICE Rx error status code: %d '%s'.\n",
+ (int)status, buf);
+ return;
+ }
+ if (!rtp->rtp_passthrough) {
+ return;
+ }
+ rtp->rtp_passthrough = 0;
}
- rtp->rtp_passthrough = 0;
ast_sendto(rtp->s, pkt, pkt_len, 0, &rtp->rtp_loop);
}
+/* PJPROJECT TURN callback */
static void ast_rtp_on_turn_rtp_state(pj_turn_sock *turn_sock, pj_turn_state_t old_state, pj_turn_state_t new_state)
{
struct ast_rtp_instance *instance = pj_turn_sock_get_user_data(turn_sock);
@@ -980,8 +1064,9 @@ static void ast_rtp_on_turn_rtp_state(pj_turn_sock *turn_sock, pj_turn_state_t o
rtp = ast_rtp_instance_get_data(instance);
+ ao2_lock(instance);
+
/* We store the new state so the other thread can actually handle it */
- ast_mutex_lock(&rtp->lock);
rtp->turn_state = new_state;
ast_cond_signal(&rtp->cond);
@@ -990,7 +1075,7 @@ static void ast_rtp_on_turn_rtp_state(pj_turn_sock *turn_sock, pj_turn_state_t o
rtp->turn_rtp = NULL;
}
- ast_mutex_unlock(&rtp->lock);
+ ao2_unlock(instance);
}
/* RTP TURN Socket interface declaration */
@@ -999,34 +1084,44 @@ static pj_turn_sock_cb ast_rtp_turn_rtp_sock_cb = {
.on_state = ast_rtp_on_turn_rtp_state,
};
+/* PJPROJECT TURN callback */
static void ast_rtp_on_turn_rx_rtcp_data(pj_turn_sock *turn_sock, void *pkt, unsigned pkt_len, const pj_sockaddr_t *peer_addr, unsigned addr_len)
{
struct ast_rtp_instance *instance = pj_turn_sock_get_user_data(turn_sock);
struct ast_rtp *rtp = ast_rtp_instance_get_data(instance);
+ struct ice_wrap *ice;
pj_status_t status;
- status = pj_ice_sess_on_rx_pkt(rtp->ice, AST_RTP_ICE_COMPONENT_RTCP, TRANSPORT_TURN_RTCP, pkt, pkt_len, peer_addr,
- addr_len);
- if (status != PJ_SUCCESS) {
- char buf[100];
+ ao2_lock(instance);
+ ice = ao2_bump(rtp->ice);
+ ao2_unlock(instance);
- pj_strerror(status, buf, sizeof(buf));
- ast_log(LOG_WARNING, "PJ ICE Rx error status code: %d '%s'.\n",
- (int)status, buf);
- return;
- }
- if (!rtp->rtcp_passthrough) {
- return;
+ if (ice) {
+ status = pj_ice_sess_on_rx_pkt(ice->real_ice, AST_RTP_ICE_COMPONENT_RTCP,
+ TRANSPORT_TURN_RTCP, pkt, pkt_len, peer_addr, addr_len);
+ ao2_ref(ice, -1);
+ if (status != PJ_SUCCESS) {
+ char buf[100];
+
+ pj_strerror(status, buf, sizeof(buf));
+ ast_log(LOG_WARNING, "PJ ICE Rx error status code: %d '%s'.\n",
+ (int)status, buf);
+ return;
+ }
+ if (!rtp->rtcp_passthrough) {
+ return;
+ }
+ rtp->rtcp_passthrough = 0;
}
- rtp->rtcp_passthrough = 0;
ast_sendto(rtp->rtcp->s, pkt, pkt_len, 0, &rtp->rtcp_loop);
}
+/* PJPROJECT TURN callback */
static void ast_rtp_on_turn_rtcp_state(pj_turn_sock *turn_sock, pj_turn_state_t old_state, pj_turn_state_t new_state)
{
struct ast_rtp_instance *instance = pj_turn_sock_get_user_data(turn_sock);
- struct ast_rtp *rtp = NULL;
+ struct ast_rtp *rtp;
/* If this is a leftover from an already destroyed RTP instance just ignore the state change */
if (!instance) {
@@ -1035,8 +1130,9 @@ static void ast_rtp_on_turn_rtcp_state(pj_turn_sock *turn_sock, pj_turn_state_t
rtp = ast_rtp_instance_get_data(instance);
+ ao2_lock(instance);
+
/* We store the new state so the other thread can actually handle it */
- ast_mutex_lock(&rtp->lock);
rtp->turn_state = new_state;
ast_cond_signal(&rtp->cond);
@@ -1045,7 +1141,7 @@ static void ast_rtp_on_turn_rtcp_state(pj_turn_sock *turn_sock, pj_turn_state_t
rtp->turn_rtcp = NULL;
}
- ast_mutex_unlock(&rtp->lock);
+ ao2_unlock(instance);
}
/* RTCP TURN Socket interface declaration */
@@ -1177,6 +1273,7 @@ end:
return ioqueue;
}
+/*! \pre instance is locked */
static void ast_rtp_ice_turn_request(struct ast_rtp_instance *instance, enum ast_rtp_ice_component_type component,
enum ast_transport transport, const char *server, unsigned int port, const char *username, const char *password)
{
@@ -1193,6 +1290,7 @@ static void ast_rtp_ice_turn_request(struct ast_rtp_instance *instance, enum ast
struct timespec ts = { .tv_sec = wait.tv_sec, .tv_nsec = wait.tv_usec * 1000, };
pj_turn_session_info info;
struct ast_sockaddr local, loop;
+ pj_status_t status;
ast_rtp_instance_get_local_address(instance, &local);
if (ast_sockaddr_is_ipv4(&local)) {
@@ -1227,18 +1325,27 @@ static void ast_rtp_ice_turn_request(struct ast_rtp_instance *instance, enum ast
ast_sockaddr_parse(&addr, server, PARSE_PORT_FORBID);
- ast_mutex_lock(&rtp->lock);
if (*turn_sock) {
- pj_turn_sock_destroy(*turn_sock);
rtp->turn_state = PJ_TURN_STATE_NULL;
+
+ /* Release the instance lock to avoid deadlock with PJPROJECT group lock */
+ ao2_unlock(instance);
+ pj_turn_sock_destroy(*turn_sock);
+ ao2_lock(instance);
while (rtp->turn_state != PJ_TURN_STATE_DESTROYING) {
- ast_cond_timedwait(&rtp->cond, &rtp->lock, &ts);
+ ast_cond_timedwait(&rtp->cond, ao2_object_get_lockaddr(instance), &ts);
}
}
- ast_mutex_unlock(&rtp->lock);
if (component == AST_RTP_ICE_COMPONENT_RTP && !rtp->ioqueue) {
+ /*
+ * We cannot hold the instance lock because we could wait
+ * for the ioqueue thread to die and we might deadlock as
+ * a result.
+ */
+ ao2_unlock(instance);
rtp->ioqueue = rtp_ioqueue_thread_get_or_create();
+ ao2_lock(instance);
if (!rtp->ioqueue) {
return;
}
@@ -1246,9 +1353,14 @@ static void ast_rtp_ice_turn_request(struct ast_rtp_instance *instance, enum ast
pj_stun_config_init(&stun_config, &cachingpool.factory, 0, rtp->ioqueue->ioqueue, rtp->ioqueue->timerheap);
- if (pj_turn_sock_create(&stun_config, ast_sockaddr_is_ipv4(&addr) ? pj_AF_INET() : pj_AF_INET6(), conn_type,
- turn_cb, NULL, instance, turn_sock) != PJ_SUCCESS) {
+ /* Release the instance lock to avoid deadlock with PJPROJECT group lock */
+ ao2_unlock(instance);
+ status = pj_turn_sock_create(&stun_config,
+ ast_sockaddr_is_ipv4(&addr) ? pj_AF_INET() : pj_AF_INET6(), conn_type,
+ turn_cb, NULL, instance, turn_sock);
+ if (status != PJ_SUCCESS) {
ast_log(LOG_WARNING, "Could not create a TURN client socket\n");
+ ao2_lock(instance);
return;
}
@@ -1257,13 +1369,16 @@ static void ast_rtp_ice_turn_request(struct ast_rtp_instance *instance, enum ast
cred.data.static_cred.data_type = PJ_STUN_PASSWD_PLAIN;
pj_strset2(&cred.data.static_cred.data, (char*)password);
- /* Because the TURN socket is asynchronous but we are synchronous we need to wait until it is done */
- ast_mutex_lock(&rtp->lock);
pj_turn_sock_alloc(*turn_sock, pj_cstr(&turn_addr, server), port, NULL, &cred, NULL);
+ ao2_lock(instance);
+
+ /*
+ * Because the TURN socket is asynchronous and we are synchronous we need to
+ * wait until it is done
+ */
while (rtp->turn_state < PJ_TURN_STATE_READY) {
- ast_cond_timedwait(&rtp->cond, &rtp->lock, &ts);
+ ast_cond_timedwait(&rtp->cond, ao2_object_get_lockaddr(instance), &ts);
}
- ast_mutex_unlock(&rtp->lock);
/* If a TURN session was allocated add it as a candidate */
if (rtp->turn_state != PJ_TURN_STATE_READY) {
@@ -1272,8 +1387,9 @@ static void ast_rtp_ice_turn_request(struct ast_rtp_instance *instance, enum ast
pj_turn_sock_get_info(*turn_sock, &info);
- ast_rtp_ice_add_cand(rtp, component, conn_transport, PJ_ICE_CAND_TYPE_RELAYED, 65535, &info.relay_addr,
- &info.relay_addr, &info.mapped_addr, pj_sockaddr_get_len(&info.relay_addr));
+ ast_rtp_ice_add_cand(instance, rtp, component, conn_transport,
+ PJ_ICE_CAND_TYPE_RELAYED, 65535, &info.relay_addr, &info.relay_addr,
+ &info.mapped_addr, pj_sockaddr_get_len(&info.relay_addr));
if (component == AST_RTP_ICE_COMPONENT_RTP) {
ast_sockaddr_copy(&rtp->rtp_loop, &loop);
@@ -1295,6 +1411,7 @@ static char *generate_random_string(char *buf, size_t size)
return buf;
}
+/*! \pre instance is locked */
static void ast_rtp_ice_change_components(struct ast_rtp_instance *instance, int num_components)
{
struct ast_rtp *rtp = ast_rtp_instance_get_data(instance);
@@ -1364,8 +1481,6 @@ static int dtls_details_initialize(struct dtls_details *dtls, SSL_CTX *ssl_ctx,
}
dtls->connection = AST_RTP_DTLS_CONNECTION_NEW;
- ast_mutex_init(&dtls->lock);
-
return 0;
error:
@@ -1397,6 +1512,7 @@ static int dtls_setup_rtcp(struct ast_rtp_instance *instance)
return dtls_details_initialize(&rtp->rtcp->dtls, rtp->ssl_ctx, rtp->dtls.dtls_setup);
}
+/*! \pre instance is locked */
static int ast_rtp_dtls_set_configuration(struct ast_rtp_instance *instance, const struct ast_rtp_dtls_cfg *dtls_cfg)
{
struct ast_rtp *rtp = ast_rtp_instance_get_data(instance);
@@ -1572,6 +1688,7 @@ static int ast_rtp_dtls_set_configuration(struct ast_rtp_instance *instance, con
return res;
}
+/*! \pre instance is locked */
static int ast_rtp_dtls_active(struct ast_rtp_instance *instance)
{
struct ast_rtp *rtp = ast_rtp_instance_get_data(instance);
@@ -1579,12 +1696,15 @@ static int ast_rtp_dtls_active(struct ast_rtp_instance *instance)
return !rtp->ssl_ctx ? 0 : 1;
}
+/*! \pre instance is locked */
static void ast_rtp_dtls_stop(struct ast_rtp_instance *instance)
{
struct ast_rtp *rtp = ast_rtp_instance_get_data(instance);
SSL *ssl = rtp->dtls.ssl;
+ ao2_unlock(instance);
dtls_srtp_stop_timeout_timer(instance, rtp, 0);
+ ao2_lock(instance);
if (rtp->ssl_ctx) {
SSL_CTX_free(rtp->ssl_ctx);
@@ -1594,20 +1714,21 @@ static void ast_rtp_dtls_stop(struct ast_rtp_instance *instance)
if (rtp->dtls.ssl) {
SSL_free(rtp->dtls.ssl);
rtp->dtls.ssl = NULL;
- ast_mutex_destroy(&rtp->dtls.lock);
}
if (rtp->rtcp) {
+ ao2_unlock(instance);
dtls_srtp_stop_timeout_timer(instance, rtp, 1);
+ ao2_lock(instance);
if (rtp->rtcp->dtls.ssl && (rtp->rtcp->dtls.ssl != ssl)) {
SSL_free(rtp->rtcp->dtls.ssl);
rtp->rtcp->dtls.ssl = NULL;
- ast_mutex_destroy(&rtp->rtcp->dtls.lock);
}
}
}
+/*! \pre instance is locked */
static void ast_rtp_dtls_reset(struct ast_rtp_instance *instance)
{
struct ast_rtp *rtp = ast_rtp_instance_get_data(instance);
@@ -1623,6 +1744,7 @@ static void ast_rtp_dtls_reset(struct ast_rtp_instance *instance)
}
}
+/*! \pre instance is locked */
static enum ast_rtp_dtls_connection ast_rtp_dtls_get_connection(struct ast_rtp_instance *instance)
{
struct ast_rtp *rtp = ast_rtp_instance_get_data(instance);
@@ -1630,6 +1752,7 @@ static enum ast_rtp_dtls_connection ast_rtp_dtls_get_connection(struct ast_rtp_i
return rtp->dtls.connection;
}
+/*! \pre instance is locked */
static enum ast_rtp_dtls_setup ast_rtp_dtls_get_setup(struct ast_rtp_instance *instance)
{
struct ast_rtp *rtp = ast_rtp_instance_get_data(instance);
@@ -1681,6 +1804,7 @@ static void dtls_set_setup(enum ast_rtp_dtls_setup *dtls_setup, enum ast_rtp_dtl
}
}
+/*! \pre instance is locked */
static void ast_rtp_dtls_set_setup(struct ast_rtp_instance *instance, enum ast_rtp_dtls_setup setup)
{
struct ast_rtp *rtp = ast_rtp_instance_get_data(instance);
@@ -1694,6 +1818,7 @@ static void ast_rtp_dtls_set_setup(struct ast_rtp_instance *instance, enum ast_r
}
}
+/*! \pre instance is locked */
static void ast_rtp_dtls_set_fingerprint(struct ast_rtp_instance *instance, enum ast_rtp_dtls_hash hash, const char *fingerprint)
{
char *tmp = ast_strdupa(fingerprint), *value;
@@ -1711,6 +1836,7 @@ static void ast_rtp_dtls_set_fingerprint(struct ast_rtp_instance *instance, enum
}
}
+/*! \pre instance is locked */
static enum ast_rtp_dtls_hash ast_rtp_dtls_get_fingerprint_hash(struct ast_rtp_instance *instance)
{
struct ast_rtp *rtp = ast_rtp_instance_get_data(instance);
@@ -1718,6 +1844,7 @@ static enum ast_rtp_dtls_hash ast_rtp_dtls_get_fingerprint_hash(struct ast_rtp_i
return rtp->local_hash;
}
+/*! \pre instance is locked */
static const char *ast_rtp_dtls_get_fingerprint(struct ast_rtp_instance *instance)
{
struct ast_rtp *rtp = ast_rtp_instance_get_data(instance);
@@ -1777,6 +1904,7 @@ static struct ast_rtp_engine asterisk_rtp_engine = {
};
#ifdef HAVE_OPENSSL_SRTP
+/*! \pre instance is locked */
static void dtls_perform_handshake(struct ast_rtp_instance *instance, struct dtls_details *dtls, int rtcp)
{
struct ast_rtp *rtp = ast_rtp_instance_get_data(instance);
@@ -1791,40 +1919,51 @@ static void dtls_perform_handshake(struct ast_rtp_instance *instance, struct dtl
SSL_do_handshake(dtls->ssl);
- /* Since the handshake is started in a thread outside of the channel thread it's possible
- * for the response to be handled in the channel thread before we start the timeout timer.
- * To ensure this doesn't actually happen we hold the DTLS lock. The channel thread will
- * block until we're done at which point the timeout timer will be immediately stopped.
+ /*
+ * A race condition is prevented between this function and __rtp_recvfrom()
+ * because both functions have to get the instance lock before they can do
+ * anything. Without holding the instance lock, this function could start
+ * the SSL handshake above in one thread and the __rtp_recvfrom() function
+ * called by the channel thread could read the response and stop the timeout
+ * timer before we have a chance to even start it.
*/
- ast_mutex_lock(&dtls->lock);
- dtls_srtp_check_pending(instance, rtp, rtcp);
dtls_srtp_start_timeout_timer(instance, rtp, rtcp);
- ast_mutex_unlock(&dtls->lock);
+
+ /*
+ * We must call dtls_srtp_check_pending() after starting the timer.
+ * Otherwise we won't prevent the race condition.
+ */
+ dtls_srtp_check_pending(instance, rtp, rtcp);
}
#endif
#ifdef HAVE_PJPROJECT
static void rtp_learning_seq_init(struct rtp_learning_info *info, uint16_t seq);
+/* PJPROJECT ICE callback */
static void ast_rtp_on_ice_complete(pj_ice_sess *ice, pj_status_t status)
{
struct ast_rtp_instance *instance = ice->user_data;
struct ast_rtp *rtp = ast_rtp_instance_get_data(instance);
+ ao2_lock(instance);
if (status == PJ_SUCCESS) {
struct ast_sockaddr remote_address;
- /* Symmetric RTP must be disabled for the remote address to not get overwritten */
- ast_rtp_instance_set_prop(instance, AST_RTP_PROPERTY_NAT, 0);
+ ast_sockaddr_setnull(&remote_address);
+ update_address_with_ice_candidate(ice, AST_RTP_ICE_COMPONENT_RTP, &remote_address);
+ if (!ast_sockaddr_isnull(&remote_address)) {
+ /* Symmetric RTP must be disabled for the remote address to not get overwritten */
+ ast_rtp_instance_set_prop(instance, AST_RTP_PROPERTY_NAT, 0);
- update_address_with_ice_candidate(rtp, AST_RTP_ICE_COMPONENT_RTP, &remote_address);
- ast_rtp_instance_set_remote_address(instance, &remote_address);
+ ast_rtp_instance_set_remote_address(instance, &remote_address);
+ }
if (rtp->rtcp) {
- update_address_with_ice_candidate(rtp, AST_RTP_ICE_COMPONENT_RTCP, &rtp->rtcp->them);
+ update_address_with_ice_candidate(ice, AST_RTP_ICE_COMPONENT_RTCP, &rtp->rtcp->them);
}
}
-
+
#ifdef HAVE_OPENSSL_SRTP
dtls_perform_handshake(instance, &rtp->dtls, 0);
@@ -1834,13 +1973,16 @@ static void ast_rtp_on_ice_complete(pj_ice_sess *ice, pj_status_t status)
#endif
if (!strictrtp) {
+ ao2_unlock(instance);
return;
}
rtp->strict_rtp_state = STRICT_RTP_LEARN;
rtp_learning_seq_init(&rtp->rtp_source_learn, (uint16_t)rtp->seqno);
+ ao2_unlock(instance);
}
+/* PJPROJECT ICE callback */
static void ast_rtp_on_ice_rx_data(pj_ice_sess *ice, unsigned comp_id, unsigned transport_id, void *pkt, pj_size_t size, const pj_sockaddr_t *src_addr, unsigned src_addr_len)
{
struct ast_rtp_instance *instance = ice->user_data;
@@ -1857,6 +1999,7 @@ static void ast_rtp_on_ice_rx_data(pj_ice_sess *ice, unsigned comp_id, unsigned
}
}
+/* PJPROJECT ICE callback */
static pj_status_t ast_rtp_on_ice_tx_pkt(pj_ice_sess *ice, unsigned comp_id, unsigned transport_id, const void *pkt, pj_size_t size, const pj_sockaddr_t *dst_addr, unsigned dst_addr_len)
{
struct ast_rtp_instance *instance = ice->user_data;
@@ -1953,6 +2096,7 @@ static inline int rtcp_debug_test_addr(struct ast_sockaddr *addr)
}
#ifdef HAVE_OPENSSL_SRTP
+/*! \pre instance is locked */
static int dtls_srtp_handle_timeout(struct ast_rtp_instance *instance, int rtcp)
{
struct ast_rtp *rtp = ast_rtp_instance_get_data(instance);
@@ -1971,13 +2115,15 @@ static int dtls_srtp_handle_timeout(struct ast_rtp_instance *instance, int rtcp)
return dtls_timeout.tv_sec * 1000 + dtls_timeout.tv_usec / 1000;
}
+/* Scheduler callback */
static int dtls_srtp_handle_rtp_timeout(const void *data)
{
struct ast_rtp_instance *instance = (struct ast_rtp_instance *)data;
int reschedule;
+ ao2_lock(instance);
reschedule = dtls_srtp_handle_timeout(instance, 0);
-
+ ao2_unlock(instance);
if (!reschedule) {
ao2_ref(instance, -1);
}
@@ -1985,13 +2131,15 @@ static int dtls_srtp_handle_rtp_timeout(const void *data)
return reschedule;
}
+/* Scheduler callback */
static int dtls_srtp_handle_rtcp_timeout(const void *data)
{
struct ast_rtp_instance *instance = (struct ast_rtp_instance *)data;
int reschedule;
+ ao2_lock(instance);
reschedule = dtls_srtp_handle_timeout(instance, 1);
-
+ ao2_unlock(instance);
if (!reschedule) {
ao2_ref(instance, -1);
}
@@ -2019,6 +2167,7 @@ static void dtls_srtp_start_timeout_timer(struct ast_rtp_instance *instance, str
}
}
+/*! \pre Must not be called with the instance locked. */
static void dtls_srtp_stop_timeout_timer(struct ast_rtp_instance *instance, struct ast_rtp *rtp, int rtcp)
{
struct dtls_details *dtls = !rtcp ? &rtp->dtls : &rtp->rtcp->dtls;
@@ -2026,6 +2175,7 @@ static void dtls_srtp_stop_timeout_timer(struct ast_rtp_instance *instance, stru
AST_SCHED_DEL_UNREF(rtp->sched, dtls->timeout_timer, ao2_ref(instance, -1));
}
+/*! \pre instance is locked */
static void dtls_srtp_check_pending(struct ast_rtp_instance *instance, struct ast_rtp *rtp, int rtcp)
{
struct dtls_details *dtls = !rtcp ? &rtp->dtls : &rtp->rtcp->dtls;
@@ -2059,11 +2209,14 @@ static void dtls_srtp_check_pending(struct ast_rtp_instance *instance, struct as
}
}
+/* Scheduler callback */
static int dtls_srtp_renegotiate(const void *data)
{
struct ast_rtp_instance *instance = (struct ast_rtp_instance *)data;
struct ast_rtp *rtp = ast_rtp_instance_get_data(instance);
+ ao2_lock(instance);
+
SSL_renegotiate(rtp->dtls.ssl);
SSL_do_handshake(rtp->dtls.ssl);
dtls_srtp_check_pending(instance, rtp, 0);
@@ -2075,6 +2228,8 @@ static int dtls_srtp_renegotiate(const void *data)
}
rtp->rekeyid = -1;
+
+ ao2_unlock(instance);
ao2_ref(instance, -1);
return 0;
@@ -2216,6 +2371,7 @@ error:
}
#endif
+/*! \pre instance is locked */
static int __rtp_recvfrom(struct ast_rtp_instance *instance, void *buf, size_t size, int flags, struct ast_sockaddr *sa, int rtcp)
{
int len;
@@ -2244,14 +2400,18 @@ static int __rtp_recvfrom(struct ast_rtp_instance *instance, void *buf, size_t s
return -1;
}
- /* This mutex is locked so that this thread blocks until the dtls_perform_handshake function
- * completes.
+ /*
+ * A race condition is prevented between dtls_perform_handshake()
+ * and this function because both functions have to get the
+ * instance lock before they can do anything. The
+ * dtls_perform_handshake() function needs to start the timer
+ * before we stop it below.
*/
- ast_mutex_lock(&dtls->lock);
- ast_mutex_unlock(&dtls->lock);
/* Before we feed data into OpenSSL ensure that the timeout timer is either stopped or completed */
+ ao2_unlock(instance);
dtls_srtp_stop_timeout_timer(instance, rtp, rtcp);
+ ao2_lock(instance);
/* If we don't yet know if we are active or passive and we receive a packet... we are obviously passive */
if (dtls->dtls_setup == AST_RTP_DTLS_SETUP_ACTPASS) {
@@ -2302,14 +2462,22 @@ static int __rtp_recvfrom(struct ast_rtp_instance *instance, void *buf, size_t s
pj_str_t combined = pj_str(ast_sockaddr_stringify(sa));
pj_sockaddr address;
pj_status_t status;
+ struct ice_wrap *ice;
pj_thread_register_check();
pj_sockaddr_parse(pj_AF_UNSPEC(), 0, &combined, &address);
- status = pj_ice_sess_on_rx_pkt(rtp->ice, rtcp ? AST_RTP_ICE_COMPONENT_RTCP : AST_RTP_ICE_COMPONENT_RTP,
+ /* Release the instance lock to avoid deadlock with PJPROJECT group lock */
+ ice = rtp->ice;
+ ao2_ref(ice, +1);
+ ao2_unlock(instance);
+ status = pj_ice_sess_on_rx_pkt(ice->real_ice,
+ rtcp ? AST_RTP_ICE_COMPONENT_RTCP : AST_RTP_ICE_COMPONENT_RTP,
rtcp ? TRANSPORT_SOCKET_RTCP : TRANSPORT_SOCKET_RTP, buf, len, &address,
pj_sockaddr_get_len(&address));
+ ao2_ref(ice, -1);
+ ao2_lock(instance);
if (status != PJ_SUCCESS) {
char buf[100];
@@ -2332,17 +2500,20 @@ static int __rtp_recvfrom(struct ast_rtp_instance *instance, void *buf, size_t s
return len;
}
+/*! \pre instance is locked */
static int rtcp_recvfrom(struct ast_rtp_instance *instance, void *buf, size_t size, int flags, struct ast_sockaddr *sa)
{
return __rtp_recvfrom(instance, buf, size, flags, sa, 1);
}
+/*! \pre instance is locked */
static int rtp_recvfrom(struct ast_rtp_instance *instance, void *buf, size_t size, int flags, struct ast_sockaddr *sa)
{
return __rtp_recvfrom(instance, buf, size, flags, sa, 0);
}
-static int __rtp_sendto(struct ast_rtp_instance *instance, void *buf, size_t size, int flags, struct ast_sockaddr *sa, int rtcp, int *ice, int use_srtp)
+/*! \pre instance is locked */
+static int __rtp_sendto(struct ast_rtp_instance *instance, void *buf, size_t size, int flags, struct ast_sockaddr *sa, int rtcp, int *via_ice, int use_srtp)
{
int len = size;
void *temp = buf;
@@ -2350,7 +2521,7 @@ static int __rtp_sendto(struct ast_rtp_instance *instance, void *buf, size_t siz
struct ast_srtp *srtp = ast_rtp_instance_get_srtp(instance, rtcp);
int res;
- *ice = 0;
+ *via_ice = 0;
if (use_srtp && res_srtp && srtp && res_srtp->protect(srtp, &temp, &len, rtcp) < 0) {
return -1;
@@ -2358,10 +2529,21 @@ static int __rtp_sendto(struct ast_rtp_instance *instance, void *buf, size_t siz
#ifdef HAVE_PJPROJECT
if (rtp->ice) {
+ pj_status_t status;
+ struct ice_wrap *ice;
+
pj_thread_register_check();
- if (pj_ice_sess_send_data(rtp->ice, rtcp ? AST_RTP_ICE_COMPONENT_RTCP : AST_RTP_ICE_COMPONENT_RTP, temp, len) == PJ_SUCCESS) {
- *ice = 1;
+ /* Release the instance lock to avoid deadlock with PJPROJECT group lock */
+ ice = rtp->ice;
+ ao2_ref(ice, +1);
+ ao2_unlock(instance);
+ status = pj_ice_sess_send_data(ice->real_ice,
+ rtcp ? AST_RTP_ICE_COMPONENT_RTCP : AST_RTP_ICE_COMPONENT_RTP, temp, len);
+ ao2_ref(ice, -1);
+ ao2_lock(instance);
+ if (status == PJ_SUCCESS) {
+ *via_ice = 1;
return len;
}
}
@@ -2375,11 +2557,13 @@ static int __rtp_sendto(struct ast_rtp_instance *instance, void *buf, size_t siz
return res;
}
+/*! \pre instance is locked */
static int rtcp_sendto(struct ast_rtp_instance *instance, void *buf, size_t size, int flags, struct ast_sockaddr *sa, int *ice)
{
return __rtp_sendto(instance, buf, size, flags, sa, 1, ice, 1);
}
+/*! \pre instance is locked */
static int rtp_sendto(struct ast_rtp_instance *instance, void *buf, size_t size, int flags, struct ast_sockaddr *sa, int *ice)
{
struct ast_rtp *rtp = ast_rtp_instance_get_data(instance);
@@ -2554,6 +2738,7 @@ static int stun_address_is_blacklisted(const struct ast_sockaddr *addr)
return result;
}
+/*! \pre instance is locked */
static void rtp_add_candidates_to_ice(struct ast_rtp_instance *instance, struct ast_rtp *rtp, struct ast_sockaddr *addr, int port, int component,
int transport)
{
@@ -2578,8 +2763,9 @@ static void rtp_add_candidates_to_ice(struct ast_rtp_instance *instance, struct
basepos = pos;
}
pj_sockaddr_set_port(&address[pos], port);
- ast_rtp_ice_add_cand(rtp, component, transport, PJ_ICE_CAND_TYPE_HOST, 65535, &address[pos], &address[pos], NULL,
- pj_sockaddr_get_len(&address[pos]));
+ ast_rtp_ice_add_cand(instance, rtp, component, transport,
+ PJ_ICE_CAND_TYPE_HOST, 65535, &address[pos], &address[pos], NULL,
+ pj_sockaddr_get_len(&address[pos]));
}
}
if (basepos == -1) {
@@ -2591,8 +2777,17 @@ static void rtp_add_candidates_to_ice(struct ast_rtp_instance *instance, struct
if (stunaddr.sin_addr.s_addr && count && ast_sockaddr_is_ipv4(addr)
&& !stun_address_is_blacklisted(addr)) {
struct sockaddr_in answer;
+ int rsp;
- if (!ast_stun_request(component == AST_RTP_ICE_COMPONENT_RTCP ? rtp->rtcp->s : rtp->s, &stunaddr, NULL, &answer)) {
+ /*
+ * The instance should not be locked because we can block
+ * waiting for a STUN respone.
+ */
+ ao2_unlock(instance);
+ rsp = ast_stun_request(component == AST_RTP_ICE_COMPONENT_RTCP
+ ? rtp->rtcp->s : rtp->s, &stunaddr, NULL, &answer);
+ ao2_lock(instance);
+ if (!rsp) {
pj_sockaddr base;
pj_sockaddr ext;
pj_str_t mapped = pj_str(ast_strdupa(ast_inet_ntoa(answer.sin_addr)));
@@ -2612,8 +2807,9 @@ static void rtp_add_candidates_to_ice(struct ast_rtp_instance *instance, struct
}
if (srflx) {
- ast_rtp_ice_add_cand(rtp, component, transport, PJ_ICE_CAND_TYPE_SRFLX, 65535, &ext, &base,
- &base, pj_sockaddr_get_len(&ext));
+ ast_rtp_ice_add_cand(instance, rtp, component, transport,
+ PJ_ICE_CAND_TYPE_SRFLX, 65535, &ext, &base, &base,
+ pj_sockaddr_get_len(&ext));
}
}
}
@@ -2665,6 +2861,8 @@ static unsigned int calc_txstamp(struct ast_rtp *rtp, struct timeval *delivery)
* \param port port to use for adding RTP candidates to the ICE session
* \param replace 0 when creating a new session, 1 when replacing a destroyed session
*
+ * \pre instance is locked
+ *
* \retval 0 on success
* \retval -1 on failure
*/
@@ -2673,11 +2871,21 @@ static int ice_create(struct ast_rtp_instance *instance, struct ast_sockaddr *ad
{
pj_stun_config stun_config;
pj_str_t ufrag, passwd;
+ pj_status_t status;
+ struct ice_wrap *ice_old;
+ struct ice_wrap *ice;
+ pj_ice_sess *real_ice = NULL;
struct ast_rtp *rtp = ast_rtp_instance_get_data(instance);
ao2_cleanup(rtp->ice_local_candidates);
rtp->ice_local_candidates = NULL;
+ ice = ao2_alloc_options(sizeof(*ice), ice_wrap_dtor, AO2_ALLOC_OPT_LOCK_NOLOCK);
+ if (!ice) {
+ ast_rtp_ice_stop(instance);
+ return -1;
+ }
+
pj_thread_register_check();
pj_stun_config_init(&stun_config, &cachingpool.factory, 0, NULL, timer_heap);
@@ -2685,11 +2893,23 @@ static int ice_create(struct ast_rtp_instance *instance, struct ast_sockaddr *ad
ufrag = pj_str(rtp->local_ufrag);
passwd = pj_str(rtp->local_passwd);
+ /* Release the instance lock to avoid deadlock with PJPROJECT group lock */
+ ao2_unlock(instance);
/* Create an ICE session for ICE negotiation */
- if (pj_ice_sess_create(&stun_config, NULL, PJ_ICE_SESS_ROLE_UNKNOWN, rtp->ice_num_components,
- &ast_rtp_ice_sess_cb, &ufrag, &passwd, NULL, &rtp->ice) == PJ_SUCCESS) {
- /* Make this available for the callbacks */
- rtp->ice->user_data = instance;
+ status = pj_ice_sess_create(&stun_config, NULL, PJ_ICE_SESS_ROLE_UNKNOWN,
+ rtp->ice_num_components, &ast_rtp_ice_sess_cb, &ufrag, &passwd, NULL, &real_ice);
+ ao2_lock(instance);
+ if (status == PJ_SUCCESS) {
+ /* Safely complete linking the ICE session into the instance */
+ real_ice->user_data = instance;
+ ice->real_ice = real_ice;
+ ice_old = rtp->ice;
+ rtp->ice = ice;
+ if (ice_old) {
+ ao2_unlock(instance);
+ ao2_ref(ice_old, -1);
+ ao2_lock(instance);
+ }
/* Add all of the available candidates to the ICE session */
rtp_add_candidates_to_ice(instance, rtp, addr, port, AST_RTP_ICE_COMPONENT_RTP,
@@ -2707,11 +2927,19 @@ static int ice_create(struct ast_rtp_instance *instance, struct ast_sockaddr *ad
return 0;
}
+ /*
+ * It is safe to unref this while instance is locked here.
+ * It was not initialized with a real_ice pointer.
+ */
+ ao2_ref(ice, -1);
+
+ ast_rtp_ice_stop(instance);
return -1;
}
#endif
+/*! \pre instance is locked */
static int ast_rtp_new(struct ast_rtp_instance *instance,
struct ast_sched_context *sched, struct ast_sockaddr *addr,
void *data)
@@ -2724,10 +2952,6 @@ static int ast_rtp_new(struct ast_rtp_instance *instance,
return -1;
}
- /* Initialize synchronization aspects */
- ast_mutex_init(&rtp->lock);
- ast_cond_init(&rtp->cond, NULL);
-
/* Set default parameters on the newly created RTP structure */
rtp->ssrc = ast_random();
rtp->seqno = ast_random() & 0x7fff;
@@ -2776,6 +3000,9 @@ static int ast_rtp_new(struct ast_rtp_instance *instance,
}
#ifdef HAVE_PJPROJECT
+ /* Initialize synchronization aspects */
+ ast_cond_init(&rtp->cond, NULL);
+
generate_random_string(rtp->local_ufrag, sizeof(rtp->local_ufrag));
generate_random_string(rtp->local_passwd, sizeof(rtp->local_passwd));
#endif
@@ -2786,7 +3013,7 @@ static int ast_rtp_new(struct ast_rtp_instance *instance,
rtp->ice_num_components = 2;
ast_debug(3, "Creating ICE session %s (%d) for RTP instance '%p'\n", ast_sockaddr_stringify(addr), x, instance);
if (ice_create(instance, addr, x, 0)) {
- ast_log(LOG_NOTICE, "Failed to start ICE session\n");
+ ast_log(LOG_NOTICE, "Failed to create ICE session\n");
} else {
rtp->ice_port = x;
ast_sockaddr_copy(&rtp->ice_original_rtp_addr, addr);
@@ -2808,6 +3035,7 @@ static int ast_rtp_new(struct ast_rtp_instance *instance,
return 0;
}
+/*! \pre instance is locked */
static int ast_rtp_destroy(struct ast_rtp_instance *instance)
{
struct ast_rtp *rtp = ast_rtp_instance_get_data(instance);
@@ -2844,7 +3072,9 @@ static int ast_rtp_destroy(struct ast_rtp_instance *instance)
/* Destroy RED if it was being used */
if (rtp->red) {
+ ao2_unlock(instance);
AST_SCHED_DEL(rtp->sched, rtp->red->schedid);
+ ao2_lock(instance);
ast_free(rtp->red);
rtp->red = NULL;
}
@@ -2852,34 +3082,38 @@ static int ast_rtp_destroy(struct ast_rtp_instance *instance)
#ifdef HAVE_PJPROJECT
pj_thread_register_check();
- /* Destroy the RTP TURN relay if being used */
- ast_mutex_lock(&rtp->lock);
+ /*
+ * The instance lock is already held.
+ *
+ * Destroy the RTP TURN relay if being used
+ */
if (rtp->turn_rtp) {
- pj_turn_sock_destroy(rtp->turn_rtp);
rtp->turn_state = PJ_TURN_STATE_NULL;
+
+ /* Release the instance lock to avoid deadlock with PJPROJECT group lock */
+ ao2_unlock(instance);
+ pj_turn_sock_destroy(rtp->turn_rtp);
+ ao2_lock(instance);
while (rtp->turn_state != PJ_TURN_STATE_DESTROYING) {
- ast_cond_timedwait(&rtp->cond, &rtp->lock, &ts);
+ ast_cond_timedwait(&rtp->cond, ao2_object_get_lockaddr(instance), &ts);
}
}
/* Destroy the RTCP TURN relay if being used */
if (rtp->turn_rtcp) {
- pj_turn_sock_destroy(rtp->turn_rtcp);
rtp->turn_state = PJ_TURN_STATE_NULL;
+
+ /* Release the instance lock to avoid deadlock with PJPROJECT group lock */
+ ao2_unlock(instance);
+ pj_turn_sock_destroy(rtp->turn_rtcp);
+ ao2_lock(instance);
while (rtp->turn_state != PJ_TURN_STATE_DESTROYING) {
- ast_cond_timedwait(&rtp->cond, &rtp->lock, &ts);
+ ast_cond_timedwait(&rtp->cond, ao2_object_get_lockaddr(instance), &ts);
}
}
- ast_mutex_unlock(&rtp->lock);
-
- if (rtp->ioqueue) {
- rtp_ioqueue_thread_remove(rtp->ioqueue);
- }
- /* Destroy the ICE session if being used */
- if (rtp->ice) {
- pj_ice_sess_destroy(rtp->ice);
- }
+ /* Destroy any ICE session */
+ ast_rtp_ice_stop(instance);
/* Destroy any candidates */
if (rtp->ice_local_candidates) {
@@ -2889,15 +3123,27 @@ static int ast_rtp_destroy(struct ast_rtp_instance *instance)
if (rtp->ice_active_remote_candidates) {
ao2_ref(rtp->ice_active_remote_candidates, -1);
}
+
+ if (rtp->ioqueue) {
+ /*
+ * We cannot hold the instance lock because we could wait
+ * for the ioqueue thread to die and we might deadlock as
+ * a result.
+ */
+ ao2_unlock(instance);
+ rtp_ioqueue_thread_remove(rtp->ioqueue);
+ ao2_lock(instance);
+ }
#endif
ao2_cleanup(rtp->lasttxformat);
ao2_cleanup(rtp->lastrxformat);
ao2_cleanup(rtp->f.subclass.format);
+#ifdef HAVE_PJPROJECT
/* Destroy synchronization items */
- ast_mutex_destroy(&rtp->lock);
ast_cond_destroy(&rtp->cond);
+#endif
/* Finally destroy ourselves */
ast_free(rtp);
@@ -2905,6 +3151,7 @@ static int ast_rtp_destroy(struct ast_rtp_instance *instance)
return 0;
}
+/*! \pre instance is locked */
static int ast_rtp_dtmf_mode_set(struct ast_rtp_instance *instance, enum ast_rtp_dtmf_mode dtmf_mode)
{
struct ast_rtp *rtp = ast_rtp_instance_get_data(instance);
@@ -2912,12 +3159,14 @@ static int ast_rtp_dtmf_mode_set(struct ast_rtp_instance *instance, enum ast_rtp
return 0;
}
+/*! \pre instance is locked */
static enum ast_rtp_dtmf_mode ast_rtp_dtmf_mode_get(struct ast_rtp_instance *instance)
{
struct ast_rtp *rtp = ast_rtp_instance_get_data(instance);
return rtp->dtmfmode;
}
+/*! \pre instance is locked */
static int ast_rtp_dtmf_begin(struct ast_rtp_instance *instance, char digit)
{
struct ast_rtp *rtp = ast_rtp_instance_get_data(instance);
@@ -2992,6 +3241,7 @@ static int ast_rtp_dtmf_begin(struct ast_rtp_instance *instance, char digit)
return 0;
}
+/*! \pre instance is locked */
static int ast_rtp_dtmf_continuation(struct ast_rtp_instance *instance)
{
struct ast_rtp *rtp = ast_rtp_instance_get_data(instance);
@@ -3037,6 +3287,7 @@ static int ast_rtp_dtmf_continuation(struct ast_rtp_instance *instance)
return 0;
}
+/*! \pre instance is locked */
static int ast_rtp_dtmf_end_with_duration(struct ast_rtp_instance *instance, char digit, unsigned int duration)
{
struct ast_rtp *rtp = ast_rtp_instance_get_data(instance);
@@ -3116,11 +3367,13 @@ cleanup:
return res;
}
+/*! \pre instance is locked */
static int ast_rtp_dtmf_end(struct ast_rtp_instance *instance, char digit)
{
return ast_rtp_dtmf_end_with_duration(instance, digit, 0);
}
+/*! \pre instance is locked */
static void ast_rtp_update_source(struct ast_rtp_instance *instance)
{
struct ast_rtp *rtp = ast_rtp_instance_get_data(instance);
@@ -3132,6 +3385,7 @@ static void ast_rtp_update_source(struct ast_rtp_instance *instance)
return;
}
+/*! \pre instance is locked */
static void ast_rtp_change_source(struct ast_rtp_instance *instance)
{
struct ast_rtp *rtp = ast_rtp_instance_get_data(instance);
@@ -3254,7 +3508,11 @@ static void calculate_lost_packet_statistics(struct ast_rtp *rtp,
rtp->rtcp->rxlost_count++;
}
-/*! \brief Send RTCP SR or RR report */
+/*!
+ * \brief Send RTCP SR or RR report
+ *
+ * \pre instance is locked
+ */
static int ast_rtcp_write_report(struct ast_rtp_instance *instance, int sr)
{
struct ast_rtp *rtp = ast_rtp_instance_get_data(instance);
@@ -3408,9 +3666,14 @@ static int ast_rtcp_write_report(struct ast_rtp_instance *instance, int sr)
return res;
}
-/*! \brief Write and RTCP packet to the far end
+/*!
+ * \brief Write a RTCP packet to the far end
+ *
* \note Decide if we are going to send an SR (with Reception Block) or RR
- * RR is sent if we have not sent any rtp packets in the previous interval */
+ * RR is sent if we have not sent any rtp packets in the previous interval
+ *
+ * Scheduler callback
+ */
static int ast_rtcp_write(const void *data)
{
struct ast_rtp_instance *instance = (struct ast_rtp_instance *) data;
@@ -3422,6 +3685,7 @@ static int ast_rtcp_write(const void *data)
return 0;
}
+ ao2_lock(instance);
if (rtp->txcount > rtp->rtcp->lastsrtxcount) {
/* Send an SR */
res = ast_rtcp_write_report(instance, 1);
@@ -3429,6 +3693,7 @@ static int ast_rtcp_write(const void *data)
/* Send an RR */
res = ast_rtcp_write_report(instance, 0);
}
+ ao2_unlock(instance);
if (!res) {
/*
@@ -3441,6 +3706,7 @@ static int ast_rtcp_write(const void *data)
return res;
}
+/*! \pre instance is locked */
static int ast_rtp_raw_write(struct ast_rtp_instance *instance, struct ast_frame *frame, int codec)
{
struct ast_rtp *rtp = ast_rtp_instance_get_data(instance);
@@ -3607,6 +3873,7 @@ static struct ast_frame *red_t140_to_red(struct rtp_red *red)
return &red->t140red;
}
+/*! \pre instance is locked */
static int ast_rtp_write(struct ast_rtp_instance *instance, struct ast_frame *frame)
{
struct ast_rtp *rtp = ast_rtp_instance_get_data(instance);
@@ -4432,6 +4699,7 @@ static struct ast_frame *ast_rtcp_interpret(struct ast_rtp_instance *instance, c
}
+/*! \pre instance is locked */
static struct ast_frame *ast_rtcp_read(struct ast_rtp_instance *instance)
{
struct ast_rtp *rtp = ast_rtp_instance_get_data(instance);
@@ -4483,10 +4751,12 @@ static struct ast_frame *ast_rtcp_read(struct ast_rtp_instance *instance)
return ast_rtcp_interpret(instance, read_area, res, &addr);
}
-static int bridge_p2p_rtp_write(struct ast_rtp_instance *instance, unsigned int *rtpheader, int len, int hdrlen)
+/*! \pre instance is locked */
+static int bridge_p2p_rtp_write(struct ast_rtp_instance *instance,
+ struct ast_rtp_instance *instance1, unsigned int *rtpheader, int len, int hdrlen)
{
- struct ast_rtp_instance *instance1 = ast_rtp_instance_get_bridged(instance);
- struct ast_rtp *rtp = ast_rtp_instance_get_data(instance), *bridged = ast_rtp_instance_get_data(instance1);
+ struct ast_rtp *rtp = ast_rtp_instance_get_data(instance);
+ struct ast_rtp *bridged = ast_rtp_instance_get_data(instance1);
int res = 0, payload = 0, bridged_payload = 0, mark;
RAII_VAR(struct ast_rtp_payload_type *, payload_type, NULL, ao2_cleanup);
int reconstruct = ntohl(rtpheader[0]);
@@ -4550,10 +4820,27 @@ static int bridge_p2p_rtp_write(struct ast_rtp_instance *instance, unsigned int
reconstruct |= (mark << 23);
rtpheader[0] = htonl(reconstruct);
+ /*
+ * We have now determined that we need to send the RTP packet
+ * out the bridged instance to do local bridging so we must unlock
+ * the receiving instance to prevent deadlock with the bridged
+ * instance.
+ *
+ * Technically we should grab a ref to instance1 so it won't go
+ * away on us. However, we should be safe because the bridged
+ * instance won't change without both channels involved being
+ * locked and we currently have the channel lock for the receiving
+ * instance.
+ */
+ ao2_unlock(instance);
+ ao2_lock(instance1);
+
ast_rtp_instance_get_remote_address(instance1, &remote_address);
if (ast_sockaddr_isnull(&remote_address)) {
ast_debug(5, "Remote address is null, most likely RTP has been stopped\n");
+ ao2_unlock(instance1);
+ ao2_lock(instance);
return 0;
}
@@ -4575,6 +4862,8 @@ static int bridge_p2p_rtp_write(struct ast_rtp_instance *instance, unsigned int
}
ast_set_flag(bridged, FLAG_NAT_INACTIVE_NOWARN);
}
+ ao2_unlock(instance1);
+ ao2_lock(instance);
return 0;
}
@@ -4585,6 +4874,8 @@ static int bridge_p2p_rtp_write(struct ast_rtp_instance *instance, unsigned int
bridged_payload, len - hdrlen);
}
+ ao2_unlock(instance1);
+ ao2_lock(instance);
return 0;
}
@@ -4621,9 +4912,11 @@ static int rtcp_mux(struct ast_rtp *rtp, const unsigned char *packet)
return 0;
}
+/*! \pre instance is locked */
static struct ast_frame *ast_rtp_read(struct ast_rtp_instance *instance, int rtcp)
{
struct ast_rtp *rtp = ast_rtp_instance_get_data(instance);
+ struct ast_rtp_instance *instance1;
struct ast_sockaddr addr;
int res, hdrlen = 12, version, payloadtype, padding, mark, ext, cc, prev_seqno;
unsigned char *read_area = rtp->rawdata + AST_FRIENDLY_OFFSET;
@@ -4762,7 +5055,9 @@ static struct ast_frame *ast_rtp_read(struct ast_rtp_instance *instance, int rtc
}
/* If we are directly bridged to another instance send the audio directly out */
- if (ast_rtp_instance_get_bridged(instance) && !bridge_p2p_rtp_write(instance, rtpheader, res, hdrlen)) {
+ instance1 = ast_rtp_instance_get_bridged(instance);
+ if (instance1
+ && !bridge_p2p_rtp_write(instance, instance1, rtpheader, res, hdrlen)) {
return &ast_null_frame;
}
@@ -5053,6 +5348,7 @@ static struct ast_frame *ast_rtp_read(struct ast_rtp_instance *instance, int rtc
return AST_LIST_FIRST(&frames);
}
+/*! \pre instance is locked */
static void ast_rtp_prop_set(struct ast_rtp_instance *instance, enum ast_rtp_property property, int value)
{
struct ast_rtp *rtp = ast_rtp_instance_get_data(instance);
@@ -5168,14 +5464,17 @@ static void ast_rtp_prop_set(struct ast_rtp_instance *instance, enum ast_rtp_pro
} else {
if (rtp->rtcp) {
if (rtp->rtcp->schedid > -1) {
+ ao2_unlock(instance);
if (!ast_sched_del(rtp->sched, rtp->rtcp->schedid)) {
/* Successfully cancelled scheduler entry. */
ao2_ref(instance, -1);
} else {
/* Unable to cancel scheduler entry */
ast_debug(1, "Failed to tear down RTCP on RTP instance '%p'\n", instance);
+ ao2_lock(instance);
return;
}
+ ao2_lock(instance);
rtp->rtcp->schedid = -1;
}
if (rtp->rtcp->s > -1 && rtp->rtcp->s != rtp->s) {
@@ -5197,6 +5496,7 @@ static void ast_rtp_prop_set(struct ast_rtp_instance *instance, enum ast_rtp_pro
return;
}
+/*! \pre instance is locked */
static int ast_rtp_fd(struct ast_rtp_instance *instance, int rtcp)
{
struct ast_rtp *rtp = ast_rtp_instance_get_data(instance);
@@ -5204,6 +5504,7 @@ static int ast_rtp_fd(struct ast_rtp_instance *instance, int rtcp)
return rtcp ? (rtp->rtcp ? rtp->rtcp->s : -1) : rtp->s;
}
+/*! \pre instance is locked */
static void ast_rtp_remote_address_set(struct ast_rtp_instance *instance, struct ast_sockaddr *addr)
{
struct ast_rtp *rtp = ast_rtp_instance_get_data(instance);
@@ -5245,19 +5546,26 @@ static void ast_rtp_remote_address_set(struct ast_rtp_instance *instance, struct
}
}
-/*! \brief Write t140 redundacy frame
+/*!
+ * \brief Write t140 redundacy frame
+ *
* \param data primary data to be buffered
+ *
+ * Scheduler callback
*/
static int red_write(const void *data)
{
struct ast_rtp_instance *instance = (struct ast_rtp_instance*) data;
struct ast_rtp *rtp = ast_rtp_instance_get_data(instance);
+ ao2_lock(instance);
ast_rtp_write(instance, &rtp->red->t140);
+ ao2_unlock(instance);
return 1;
}
+/*! \pre instance is locked */
static int rtp_red_init(struct ast_rtp_instance *instance, int buffer_time, int *payloads, int generations)
{
struct ast_rtp *rtp = ast_rtp_instance_get_data(instance);
@@ -5290,6 +5598,7 @@ static int rtp_red_init(struct ast_rtp_instance *instance, int buffer_time, int
return 0;
}
+/*! \pre instance is locked */
static int rtp_red_buffer(struct ast_rtp_instance *instance, struct ast_frame *frame)
{
struct ast_rtp *rtp = ast_rtp_instance_get_data(instance);
@@ -5304,15 +5613,19 @@ static int rtp_red_buffer(struct ast_rtp_instance *instance, struct ast_frame *f
return 0;
}
+/*! \pre Neither instance0 nor instance1 are locked */
static int ast_rtp_local_bridge(struct ast_rtp_instance *instance0, struct ast_rtp_instance *instance1)
{
struct ast_rtp *rtp = ast_rtp_instance_get_data(instance0);
+ ao2_lock(instance0);
ast_set_flag(rtp, FLAG_NEED_MARKER_BIT);
+ ao2_unlock(instance0);
return 0;
}
+/*! \pre instance is locked */
static int ast_rtp_get_stat(struct ast_rtp_instance *instance, struct ast_rtp_instance_stats *stats, enum ast_rtp_instance_stat stat)
{
struct ast_rtp *rtp = ast_rtp_instance_get_data(instance);
@@ -5364,6 +5677,7 @@ static int ast_rtp_get_stat(struct ast_rtp_instance *instance, struct ast_rtp_in
return 0;
}
+/*! \pre Neither instance0 nor instance1 are locked */
static int ast_rtp_dtmf_compatible(struct ast_channel *chan0, struct ast_rtp_instance *instance0, struct ast_channel *chan1, struct ast_rtp_instance *instance1)
{
/* If both sides are not using the same method of DTMF transmission
@@ -5380,40 +5694,52 @@ static int ast_rtp_dtmf_compatible(struct ast_channel *chan0, struct ast_rtp_ins
(!ast_channel_tech(chan0)->send_digit_begin != !ast_channel_tech(chan1)->send_digit_begin)) ? 0 : 1);
}
+/*! \pre instance is NOT locked */
static void ast_rtp_stun_request(struct ast_rtp_instance *instance, struct ast_sockaddr *suggestion, const char *username)
{
struct ast_rtp *rtp = ast_rtp_instance_get_data(instance);
struct sockaddr_in suggestion_tmp;
+ /*
+ * The instance should not be locked because we can block
+ * waiting for a STUN respone.
+ */
ast_sockaddr_to_sin(suggestion, &suggestion_tmp);
ast_stun_request(rtp->s, &suggestion_tmp, username, NULL);
ast_sockaddr_from_sin(suggestion, &suggestion_tmp);
}
+/*! \pre instance is locked */
static void ast_rtp_stop(struct ast_rtp_instance *instance)
{
struct ast_rtp *rtp = ast_rtp_instance_get_data(instance);
struct ast_sockaddr addr = { {0,} };
#ifdef HAVE_OPENSSL_SRTP
+ ao2_unlock(instance);
AST_SCHED_DEL_UNREF(rtp->sched, rtp->rekeyid, ao2_ref(instance, -1));
dtls_srtp_stop_timeout_timer(instance, rtp, 0);
if (rtp->rtcp) {
dtls_srtp_stop_timeout_timer(instance, rtp, 1);
}
+ ao2_lock(instance);
#endif
if (rtp->rtcp && rtp->rtcp->schedid > -1) {
+ ao2_unlock(instance);
if (!ast_sched_del(rtp->sched, rtp->rtcp->schedid)) {
/* successfully cancelled scheduler entry. */
ao2_ref(instance, -1);
}
+ ao2_lock(instance);
rtp->rtcp->schedid = -1;
}
if (rtp->red) {
+ ao2_unlock(instance);
AST_SCHED_DEL(rtp->sched, rtp->red->schedid);
+ ao2_lock(instance);
ast_free(rtp->red);
rtp->red = NULL;
}
@@ -5426,6 +5752,7 @@ static void ast_rtp_stop(struct ast_rtp_instance *instance)
ast_set_flag(rtp, FLAG_NEED_MARKER_BIT);
}
+/*! \pre instance is locked */
static int ast_rtp_qos_set(struct ast_rtp_instance *instance, int tos, int cos, const char *desc)
{
struct ast_rtp *rtp = ast_rtp_instance_get_data(instance);
@@ -5433,7 +5760,11 @@ static int ast_rtp_qos_set(struct ast_rtp_instance *instance, int tos, int cos,
return ast_set_qos(rtp->s, tos, cos, desc);
}
-/*! \brief generate comfort noice (CNG) */
+/*!
+ * \brief generate comfort noice (CNG)
+ *
+ * \pre instance is locked
+ */
static int ast_rtp_sendcng(struct ast_rtp_instance *instance, int level)
{
unsigned int *rtpheader;
@@ -5498,6 +5829,7 @@ static void dtls_perform_setup(struct dtls_details *dtls)
dtls->connection = AST_RTP_DTLS_CONNECTION_NEW;
}
+/*! \pre instance is locked */
static int ast_rtp_activate(struct ast_rtp_instance *instance)
{
struct ast_rtp *rtp = ast_rtp_instance_get_data(instance);