summaryrefslogtreecommitdiff
path: root/pjnath/src/pjnath
diff options
context:
space:
mode:
Diffstat (limited to 'pjnath/src/pjnath')
-rw-r--r--pjnath/src/pjnath/ice_session.c294
-rw-r--r--pjnath/src/pjnath/ice_strans.c182
-rw-r--r--pjnath/src/pjnath/nat_detect.c2
-rw-r--r--pjnath/src/pjnath/stun_session.c364
-rw-r--r--pjnath/src/pjnath/stun_sock.c189
-rw-r--r--pjnath/src/pjnath/stun_transaction.c135
-rw-r--r--pjnath/src/pjnath/turn_session.c173
-rw-r--r--pjnath/src/pjnath/turn_sock.c128
8 files changed, 832 insertions, 635 deletions
diff --git a/pjnath/src/pjnath/ice_session.c b/pjnath/src/pjnath/ice_session.c
index 81aa8143..b6159f96 100644
--- a/pjnath/src/pjnath/ice_session.c
+++ b/pjnath/src/pjnath/ice_session.c
@@ -97,6 +97,7 @@ static pj_uint8_t cand_type_prefs[4] =
#endif
};
+#define THIS_FILE "ice_session.c"
#define CHECK_NAME_LEN 128
#define LOG4(expr) PJ_LOG(4,expr)
#define LOG5(expr) PJ_LOG(4,expr)
@@ -134,6 +135,7 @@ typedef struct timer_data
static void on_timer(pj_timer_heap_t *th, pj_timer_entry *te);
static void on_ice_complete(pj_ice_sess *ice, pj_status_t status);
static void ice_keep_alive(pj_ice_sess *ice, pj_bool_t send_now);
+static void ice_on_destroy(void *obj);
static void destroy_ice(pj_ice_sess *ice,
pj_status_t reason);
static pj_status_t start_periodic_check(pj_timer_heap_t *th,
@@ -288,6 +290,7 @@ static pj_status_t init_comp(pj_ice_sess *ice,
/* Create STUN session for this candidate */
status = pj_stun_session_create(&ice->stun_cfg, NULL,
&sess_cb, PJ_TRUE,
+ ice->grp_lock,
&comp->stun_sess);
if (status != PJ_SUCCESS)
return status;
@@ -332,6 +335,7 @@ PJ_DEF(pj_status_t) pj_ice_sess_create(pj_stun_config *stun_cfg,
const pj_ice_sess_cb *cb,
const pj_str_t *local_ufrag,
const pj_str_t *local_passwd,
+ pj_grp_lock_t *grp_lock,
pj_ice_sess **p_ice)
{
pj_pool_t *pool;
@@ -359,13 +363,20 @@ PJ_DEF(pj_status_t) pj_ice_sess_create(pj_stun_config *stun_cfg,
pj_ansi_snprintf(ice->obj_name, sizeof(ice->obj_name),
name, ice);
- status = pj_mutex_create_recursive(pool, ice->obj_name,
- &ice->mutex);
- if (status != PJ_SUCCESS) {
- destroy_ice(ice, status);
- return status;
+ if (grp_lock) {
+ ice->grp_lock = grp_lock;
+ } else {
+ status = pj_grp_lock_create(pool, NULL, &ice->grp_lock);
+ if (status != PJ_SUCCESS) {
+ pj_pool_release(pool);
+ return status;
+ }
}
+ pj_grp_lock_add_ref(ice->grp_lock);
+ pj_grp_lock_add_handler(ice->grp_lock, pool, ice,
+ &ice_on_destroy);
+
pj_memcpy(&ice->cb, cb, sizeof(*cb));
pj_memcpy(&ice->stun_cfg, stun_cfg, sizeof(*stun_cfg));
@@ -444,6 +455,21 @@ PJ_DEF(pj_status_t) pj_ice_sess_set_options(pj_ice_sess *ice,
/*
+ * Callback to really destroy the session
+ */
+static void ice_on_destroy(void *obj)
+{
+ pj_ice_sess *ice = (pj_ice_sess*) obj;
+
+ if (ice->pool) {
+ pj_pool_t *pool = ice->pool;
+ ice->pool = NULL;
+ pj_pool_release(pool);
+ }
+ LOG4((THIS_FILE, "ICE session %p destroyed", ice));
+}
+
+/*
* Destroy
*/
static void destroy_ice(pj_ice_sess *ice,
@@ -452,22 +478,20 @@ static void destroy_ice(pj_ice_sess *ice,
unsigned i;
if (reason == PJ_SUCCESS) {
- LOG4((ice->obj_name, "Destroying ICE session"));
+ LOG4((ice->obj_name, "Destroying ICE session %p", ice));
}
- ice->is_destroying = PJ_TRUE;
+ pj_grp_lock_acquire(ice->grp_lock);
- /* Let other callbacks finish */
- if (ice->mutex) {
- pj_mutex_lock(ice->mutex);
- pj_mutex_unlock(ice->mutex);
+ if (ice->is_destroying) {
+ pj_grp_lock_release(ice->grp_lock);
+ return;
}
- if (ice->timer.id) {
- pj_timer_heap_cancel(ice->stun_cfg.timer_heap,
- &ice->timer);
- ice->timer.id = PJ_FALSE;
- }
+ ice->is_destroying = PJ_TRUE;
+
+ pj_timer_heap_cancel_if_active(ice->stun_cfg.timer_heap,
+ &ice->timer, PJ_FALSE);
for (i=0; i<ice->comp_cnt; ++i) {
if (ice->comp[i].stun_sess) {
@@ -476,21 +500,12 @@ static void destroy_ice(pj_ice_sess *ice,
}
}
- if (ice->clist.timer.id) {
- pj_timer_heap_cancel(ice->stun_cfg.timer_heap, &ice->clist.timer);
- ice->clist.timer.id = PJ_FALSE;
- }
-
- if (ice->mutex) {
- pj_mutex_destroy(ice->mutex);
- ice->mutex = NULL;
- }
+ pj_timer_heap_cancel_if_active(ice->stun_cfg.timer_heap,
+ &ice->clist.timer,
+ PJ_FALSE);
- if (ice->pool) {
- pj_pool_t *pool = ice->pool;
- ice->pool = NULL;
- pj_pool_release(pool);
- }
+ pj_grp_lock_dec_ref(ice->grp_lock);
+ pj_grp_lock_release(ice->grp_lock);
}
@@ -709,7 +724,7 @@ PJ_DEF(pj_status_t) pj_ice_sess_add_cand(pj_ice_sess *ice,
PJ_EINVAL);
PJ_ASSERT_RETURN(comp_id <= ice->comp_cnt, PJ_EINVAL);
- pj_mutex_lock(ice->mutex);
+ pj_grp_lock_acquire(ice->grp_lock);
if (ice->lcand_cnt >= PJ_ARRAY_SIZE(ice->lcand)) {
status = PJ_ETOOMANY;
@@ -749,7 +764,7 @@ PJ_DEF(pj_status_t) pj_ice_sess_add_cand(pj_ice_sess *ice,
++ice->lcand_cnt;
on_error:
- pj_mutex_unlock(ice->mutex);
+ pj_grp_lock_release(ice->grp_lock);
return status;
}
@@ -766,7 +781,7 @@ PJ_DEF(pj_status_t) pj_ice_sess_find_default_cand(pj_ice_sess *ice,
*cand_id = -1;
- pj_mutex_lock(ice->mutex);
+ pj_grp_lock_acquire(ice->grp_lock);
/* First find in valid list if we have nominated pair */
for (i=0; i<ice->valid_list.count; ++i) {
@@ -774,7 +789,7 @@ PJ_DEF(pj_status_t) pj_ice_sess_find_default_cand(pj_ice_sess *ice,
if (check->lcand->comp_id == comp_id) {
*cand_id = GET_LCAND_ID(check->lcand);
- pj_mutex_unlock(ice->mutex);
+ pj_grp_lock_release(ice->grp_lock);
return PJ_SUCCESS;
}
}
@@ -786,7 +801,7 @@ PJ_DEF(pj_status_t) pj_ice_sess_find_default_cand(pj_ice_sess *ice,
lcand->type == PJ_ICE_CAND_TYPE_RELAYED)
{
*cand_id = GET_LCAND_ID(lcand);
- pj_mutex_unlock(ice->mutex);
+ pj_grp_lock_release(ice->grp_lock);
return PJ_SUCCESS;
}
}
@@ -799,7 +814,7 @@ PJ_DEF(pj_status_t) pj_ice_sess_find_default_cand(pj_ice_sess *ice,
lcand->type == PJ_ICE_CAND_TYPE_PRFLX))
{
*cand_id = GET_LCAND_ID(lcand);
- pj_mutex_unlock(ice->mutex);
+ pj_grp_lock_release(ice->grp_lock);
return PJ_SUCCESS;
}
}
@@ -811,13 +826,13 @@ PJ_DEF(pj_status_t) pj_ice_sess_find_default_cand(pj_ice_sess *ice,
lcand->type == PJ_ICE_CAND_TYPE_HOST)
{
*cand_id = GET_LCAND_ID(lcand);
- pj_mutex_unlock(ice->mutex);
+ pj_grp_lock_release(ice->grp_lock);
return PJ_SUCCESS;
}
}
/* Still no candidate is found! :( */
- pj_mutex_unlock(ice->mutex);
+ pj_grp_lock_release(ice->grp_lock);
pj_assert(!"Should have a candidate by now");
return PJ_EBUG;
@@ -1127,14 +1142,20 @@ static void on_timer(pj_timer_heap_t *th, pj_timer_entry *te)
{
pj_ice_sess *ice = (pj_ice_sess*) te->user_data;
enum timer_type type = (enum timer_type)te->id;
- pj_bool_t has_mutex = PJ_TRUE;
PJ_UNUSED_ARG(th);
- pj_mutex_lock(ice->mutex);
+ pj_grp_lock_acquire(ice->grp_lock);
te->id = TIMER_NONE;
+ if (ice->is_destroying) {
+ /* Stray timer, could happen when destroy is invoked while callback
+ * is pending. */
+ pj_grp_lock_release(ice->grp_lock);
+ return;
+ }
+
switch (type) {
case TIMER_CONTROLLED_WAIT_NOM:
LOG4((ice->obj_name,
@@ -1157,8 +1178,6 @@ static void on_timer(pj_timer_heap_t *th, pj_timer_entry *te)
/* Release mutex in case app destroy us in the callback */
ice_status = ice->ice_status;
on_ice_complete = ice->cb.on_ice_complete;
- has_mutex = PJ_FALSE;
- pj_mutex_unlock(ice->mutex);
/* Notify app about ICE completion*/
if (on_ice_complete)
@@ -1176,8 +1195,7 @@ static void on_timer(pj_timer_heap_t *th, pj_timer_entry *te)
break;
}
- if (has_mutex)
- pj_mutex_unlock(ice->mutex);
+ pj_grp_lock_release(ice->grp_lock);
}
/* Send keep-alive */
@@ -1235,8 +1253,10 @@ done:
ice->comp_cnt;
pj_time_val_normalize(&delay);
- ice->timer.id = TIMER_KEEP_ALIVE;
- pj_timer_heap_schedule(ice->stun_cfg.timer_heap, &ice->timer, &delay);
+ pj_timer_heap_schedule_w_grp_lock(ice->stun_cfg.timer_heap,
+ &ice->timer, &delay,
+ TIMER_KEEP_ALIVE,
+ ice->grp_lock);
} else {
pj_assert(!"Not expected any timer active");
@@ -1250,10 +1270,8 @@ static void on_ice_complete(pj_ice_sess *ice, pj_status_t status)
ice->is_complete = PJ_TRUE;
ice->ice_status = status;
- if (ice->timer.id != TIMER_NONE) {
- pj_timer_heap_cancel(ice->stun_cfg.timer_heap, &ice->timer);
- ice->timer.id = TIMER_NONE;
- }
+ pj_timer_heap_cancel_if_active(ice->stun_cfg.timer_heap, &ice->timer,
+ TIMER_NONE);
/* Log message */
LOG4((ice->obj_name, "ICE process complete, status=%s",
@@ -1266,9 +1284,10 @@ static void on_ice_complete(pj_ice_sess *ice, pj_status_t status)
if (ice->cb.on_ice_complete) {
pj_time_val delay = {0, 0};
- ice->timer.id = TIMER_COMPLETION_CALLBACK;
- pj_timer_heap_schedule(ice->stun_cfg.timer_heap,
- &ice->timer, &delay);
+ pj_timer_heap_schedule_w_grp_lock(ice->stun_cfg.timer_heap,
+ &ice->timer, &delay,
+ TIMER_COMPLETION_CALLBACK,
+ ice->grp_lock);
}
}
}
@@ -1496,10 +1515,11 @@ static pj_bool_t on_check_complete(pj_ice_sess *ice,
delay.msec = ice->opt.controlled_agent_want_nom_timeout;
pj_time_val_normalize(&delay);
- ice->timer.id = TIMER_CONTROLLED_WAIT_NOM;
- pj_timer_heap_schedule(ice->stun_cfg.timer_heap,
- &ice->timer,
- &delay);
+ pj_timer_heap_schedule_w_grp_lock(
+ ice->stun_cfg.timer_heap,
+ &ice->timer, &delay,
+ TIMER_CONTROLLED_WAIT_NOM,
+ ice->grp_lock);
LOG5((ice->obj_name,
"All checks have completed. Controlled agent now "
@@ -1575,10 +1595,8 @@ static pj_bool_t on_check_complete(pj_ice_sess *ice,
"Scheduling nominated check in %d ms",
ice->opt.nominated_check_delay));
- if (ice->timer.id != TIMER_NONE) {
- pj_timer_heap_cancel(ice->stun_cfg.timer_heap, &ice->timer);
- ice->timer.id = TIMER_NONE;
- }
+ pj_timer_heap_cancel_if_active(ice->stun_cfg.timer_heap, &ice->timer,
+ TIMER_NONE);
/* All components have valid pair. Let connectivity checks run for
* a little bit more time, then start our nominated check.
@@ -1587,8 +1605,10 @@ static pj_bool_t on_check_complete(pj_ice_sess *ice,
delay.msec = ice->opt.nominated_check_delay;
pj_time_val_normalize(&delay);
- ice->timer.id = TIMER_START_NOMINATED_CHECK;
- pj_timer_heap_schedule(ice->stun_cfg.timer_heap, &ice->timer, &delay);
+ pj_timer_heap_schedule_w_grp_lock(ice->stun_cfg.timer_heap,
+ &ice->timer, &delay,
+ TIMER_START_NOMINATED_CHECK,
+ ice->grp_lock);
return PJ_FALSE;
}
@@ -1618,7 +1638,7 @@ PJ_DEF(pj_status_t) pj_ice_sess_create_check_list(
PJ_ASSERT_RETURN(rcand_cnt + ice->rcand_cnt <= PJ_ICE_MAX_CAND,
PJ_ETOOMANY);
- pj_mutex_lock(ice->mutex);
+ pj_grp_lock_acquire(ice->grp_lock);
/* Save credentials */
username.ptr = buf;
@@ -1666,7 +1686,7 @@ PJ_DEF(pj_status_t) pj_ice_sess_create_check_list(
pj_ice_sess_check *chk = &clist->checks[clist->count];
if (clist->count >= PJ_ICE_MAX_CHECKS) {
- pj_mutex_unlock(ice->mutex);
+ pj_grp_lock_release(ice->grp_lock);
return PJ_ETOOMANY;
}
@@ -1694,7 +1714,7 @@ PJ_DEF(pj_status_t) pj_ice_sess_create_check_list(
/* This could happen if candidates have no matching address families */
if (clist->count == 0) {
LOG4((ice->obj_name, "Error: no checklist can be created"));
- pj_mutex_unlock(ice->mutex);
+ pj_grp_lock_release(ice->grp_lock);
return PJ_ENOTFOUND;
}
@@ -1704,7 +1724,7 @@ PJ_DEF(pj_status_t) pj_ice_sess_create_check_list(
/* Prune the checklist */
status = prune_checklist(ice, clist);
if (status != PJ_SUCCESS) {
- pj_mutex_unlock(ice->mutex);
+ pj_grp_lock_release(ice->grp_lock);
return status;
}
@@ -1731,7 +1751,7 @@ PJ_DEF(pj_status_t) pj_ice_sess_create_check_list(
/* Log checklist */
dump_checklist("Checklist created:", ice, clist);
- pj_mutex_unlock(ice->mutex);
+ pj_grp_lock_release(ice->grp_lock);
return PJ_SUCCESS;
}
@@ -1850,13 +1870,10 @@ static pj_status_t start_periodic_check(pj_timer_heap_t *th,
ice = td->ice;
clist = td->clist;
- if (ice->is_destroying)
- return PJ_SUCCESS;
-
- pj_mutex_lock(ice->mutex);
+ pj_grp_lock_acquire(ice->grp_lock);
if (ice->is_destroying) {
- pj_mutex_unlock(ice->mutex);
+ pj_grp_lock_release(ice->grp_lock);
return PJ_SUCCESS;
}
@@ -1878,7 +1895,7 @@ static pj_status_t start_periodic_check(pj_timer_heap_t *th,
if (check->state == PJ_ICE_SESS_CHECK_STATE_WAITING) {
status = perform_check(ice, clist, i, ice->is_nominating);
if (status != PJ_SUCCESS) {
- pj_mutex_unlock(ice->mutex);
+ pj_grp_lock_release(ice->grp_lock);
pj_log_pop_indent();
return status;
}
@@ -1898,7 +1915,7 @@ static pj_status_t start_periodic_check(pj_timer_heap_t *th,
if (check->state == PJ_ICE_SESS_CHECK_STATE_FROZEN) {
status = perform_check(ice, clist, i, ice->is_nominating);
if (status != PJ_SUCCESS) {
- pj_mutex_unlock(ice->mutex);
+ pj_grp_lock_release(ice->grp_lock);
pj_log_pop_indent();
return status;
}
@@ -1915,12 +1932,12 @@ static pj_status_t start_periodic_check(pj_timer_heap_t *th,
/* Schedule for next timer */
pj_time_val timeout = {0, PJ_ICE_TA_VAL};
- te->id = PJ_TRUE;
pj_time_val_normalize(&timeout);
- pj_timer_heap_schedule(th, te, &timeout);
+ pj_timer_heap_schedule_w_grp_lock(th, te, &timeout, PJ_TRUE,
+ ice->grp_lock);
}
- pj_mutex_unlock(ice->mutex);
+ pj_grp_lock_release(ice->grp_lock);
pj_log_pop_indent();
return PJ_SUCCESS;
}
@@ -1940,8 +1957,8 @@ static void start_nominated_check(pj_ice_sess *ice)
/* Stop our timer if it's active */
if (ice->timer.id == TIMER_START_NOMINATED_CHECK) {
- pj_timer_heap_cancel(ice->stun_cfg.timer_heap, &ice->timer);
- ice->timer.id = TIMER_NONE;
+ pj_timer_heap_cancel_if_active(ice->stun_cfg.timer_heap, &ice->timer,
+ TIMER_NONE);
}
/* For each component, set the check state of valid check with
@@ -1969,18 +1986,15 @@ static void start_nominated_check(pj_ice_sess *ice)
}
/* And (re)start the periodic check */
- if (ice->clist.timer.id) {
- pj_timer_heap_cancel(ice->stun_cfg.timer_heap, &ice->clist.timer);
- ice->clist.timer.id = PJ_FALSE;
- }
+ pj_timer_heap_cancel_if_active(ice->stun_cfg.timer_heap,
+ &ice->clist.timer, PJ_FALSE);
- ice->clist.timer.id = PJ_TRUE;
delay.sec = delay.msec = 0;
- status = pj_timer_heap_schedule(ice->stun_cfg.timer_heap,
- &ice->clist.timer, &delay);
- if (status != PJ_SUCCESS) {
- ice->clist.timer.id = PJ_FALSE;
- } else {
+ status = pj_timer_heap_schedule_w_grp_lock(ice->stun_cfg.timer_heap,
+ &ice->clist.timer, &delay,
+ PJ_TRUE,
+ ice->grp_lock);
+ if (status == PJ_SUCCESS) {
LOG5((ice->obj_name, "Periodic timer rescheduled.."));
}
@@ -2030,7 +2044,7 @@ PJ_DEF(pj_status_t) pj_ice_sess_start_check(pj_ice_sess *ice)
PJ_ASSERT_RETURN(ice->clist.count > 0, PJ_EINVALIDOP);
/* Lock session */
- pj_mutex_lock(ice->mutex);
+ pj_grp_lock_acquire(ice->grp_lock);
LOG4((ice->obj_name, "Starting ICE check.."));
pj_log_push_indent();
@@ -2060,7 +2074,7 @@ PJ_DEF(pj_status_t) pj_ice_sess_start_check(pj_ice_sess *ice)
}
if (i == clist->count) {
pj_assert(!"Unable to find checklist for component 1");
- pj_mutex_unlock(ice->mutex);
+ pj_grp_lock_release(ice->grp_lock);
pj_log_pop_indent();
return PJNATH_EICEINCOMPID;
}
@@ -2114,15 +2128,15 @@ PJ_DEF(pj_status_t) pj_ice_sess_start_check(pj_ice_sess *ice)
* instead to reduce stack usage:
* return start_periodic_check(ice->stun_cfg.timer_heap, &clist->timer);
*/
- clist->timer.id = PJ_TRUE;
delay.sec = delay.msec = 0;
- status = pj_timer_heap_schedule(ice->stun_cfg.timer_heap,
- &clist->timer, &delay);
+ status = pj_timer_heap_schedule_w_grp_lock(ice->stun_cfg.timer_heap,
+ &clist->timer, &delay,
+ PJ_TRUE, ice->grp_lock);
if (status != PJ_SUCCESS) {
clist->timer.id = PJ_FALSE;
}
- pj_mutex_unlock(ice->mutex);
+ pj_grp_lock_release(ice->grp_lock);
pj_log_pop_indent();
return status;
}
@@ -2143,9 +2157,22 @@ static pj_status_t on_stun_send_msg(pj_stun_session *sess,
stun_data *sd = (stun_data*) pj_stun_session_get_user_data(sess);
pj_ice_sess *ice = sd->ice;
pj_ice_msg_data *msg_data = (pj_ice_msg_data*) token;
+ pj_status_t status;
- return (*ice->cb.on_tx_pkt)(ice, sd->comp_id, msg_data->transport_id,
- pkt, pkt_size, dst_addr, addr_len);
+ pj_grp_lock_acquire(ice->grp_lock);
+
+ if (ice->is_destroying) {
+ /* Stray retransmit timer that could happen while
+ * we're being destroyed */
+ pj_grp_lock_release(ice->grp_lock);
+ return PJ_EINVALIDOP;
+ }
+
+ status = (*ice->cb.on_tx_pkt)(ice, sd->comp_id, msg_data->transport_id,
+ pkt, pkt_size, dst_addr, addr_len);
+
+ pj_grp_lock_release(ice->grp_lock);
+ return status;
}
@@ -2180,7 +2207,13 @@ static void on_stun_request_complete(pj_stun_session *stun_sess,
pj_assert(tdata == check->tdata);
check->tdata = NULL;
- pj_mutex_lock(ice->mutex);
+ pj_grp_lock_acquire(ice->grp_lock);
+
+ if (ice->is_destroying) {
+ /* Not sure if this is possible but just in case */
+ pj_grp_lock_release(ice->grp_lock);
+ return;
+ }
/* Init lcand to NULL. lcand will be found from the mapped address
* found in the response.
@@ -2231,7 +2264,7 @@ static void on_stun_request_complete(pj_stun_session *stun_sess,
perform_check(ice, clist, msg_data->data.req.ckid,
check->nominated || ice->is_nominating);
pj_log_pop_indent();
- pj_mutex_unlock(ice->mutex);
+ pj_grp_lock_release(ice->grp_lock);
return;
}
@@ -2246,7 +2279,7 @@ static void on_stun_request_complete(pj_stun_session *stun_sess,
check_set_state(ice, check, PJ_ICE_SESS_CHECK_STATE_FAILED, status);
on_check_complete(ice, check);
pj_log_pop_indent();
- pj_mutex_unlock(ice->mutex);
+ pj_grp_lock_release(ice->grp_lock);
return;
}
@@ -2270,7 +2303,7 @@ static void on_stun_request_complete(pj_stun_session *stun_sess,
check_set_state(ice, check, PJ_ICE_SESS_CHECK_STATE_FAILED, status);
on_check_complete(ice, check);
pj_log_pop_indent();
- pj_mutex_unlock(ice->mutex);
+ pj_grp_lock_release(ice->grp_lock);
return;
}
@@ -2303,7 +2336,7 @@ static void on_stun_request_complete(pj_stun_session *stun_sess,
check_set_state(ice, check, PJ_ICE_SESS_CHECK_STATE_FAILED,
PJNATH_ESTUNNOMAPPEDADDR);
on_check_complete(ice, check);
- pj_mutex_unlock(ice->mutex);
+ pj_grp_lock_release(ice->grp_lock);
return;
}
@@ -2351,7 +2384,7 @@ static void on_stun_request_complete(pj_stun_session *stun_sess,
check_set_state(ice, check, PJ_ICE_SESS_CHECK_STATE_FAILED,
status);
on_check_complete(ice, check);
- pj_mutex_unlock(ice->mutex);
+ pj_grp_lock_release(ice->grp_lock);
return;
}
@@ -2411,11 +2444,11 @@ static void on_stun_request_complete(pj_stun_session *stun_sess,
*/
if (on_check_complete(ice, check)) {
/* ICE complete! */
- pj_mutex_unlock(ice->mutex);
+ pj_grp_lock_release(ice->grp_lock);
return;
}
- pj_mutex_unlock(ice->mutex);
+ pj_grp_lock_release(ice->grp_lock);
}
@@ -2456,7 +2489,12 @@ static pj_status_t on_stun_rx_request(pj_stun_session *sess,
sd = (stun_data*) pj_stun_session_get_user_data(sess);
ice = sd->ice;
- pj_mutex_lock(ice->mutex);
+ pj_grp_lock_acquire(ice->grp_lock);
+
+ if (ice->is_destroying) {
+ pj_grp_lock_release(ice->grp_lock);
+ return PJ_EINVALIDOP;
+ }
/*
* Note:
@@ -2471,7 +2509,7 @@ static pj_status_t on_stun_rx_request(pj_stun_session *sess,
pj_stun_msg_find_attr(msg, PJ_STUN_ATTR_PRIORITY, 0);
if (prio_attr == NULL) {
LOG5((ice->obj_name, "Received Binding request with no PRIORITY"));
- pj_mutex_unlock(ice->mutex);
+ pj_grp_lock_release(ice->grp_lock);
return PJ_SUCCESS;
}
@@ -2516,7 +2554,7 @@ static pj_status_t on_stun_rx_request(pj_stun_session *sess,
pj_stun_session_respond(sess, rdata, PJ_STUN_SC_ROLE_CONFLICT,
NULL, token, PJ_TRUE,
src_addr, src_addr_len);
- pj_mutex_unlock(ice->mutex);
+ pj_grp_lock_release(ice->grp_lock);
return PJ_SUCCESS;
}
@@ -2528,7 +2566,7 @@ static pj_status_t on_stun_rx_request(pj_stun_session *sess,
pj_stun_session_respond(sess, rdata, PJ_STUN_SC_ROLE_CONFLICT,
NULL, token, PJ_TRUE,
src_addr, src_addr_len);
- pj_mutex_unlock(ice->mutex);
+ pj_grp_lock_release(ice->grp_lock);
return PJ_SUCCESS;
} else {
/* Switch role to controlled */
@@ -2543,7 +2581,7 @@ static pj_status_t on_stun_rx_request(pj_stun_session *sess,
*/
status = pj_stun_session_create_res(sess, rdata, 0, NULL, &tdata);
if (status != PJ_SUCCESS) {
- pj_mutex_unlock(ice->mutex);
+ pj_grp_lock_release(ice->grp_lock);
return status;
}
@@ -2595,7 +2633,7 @@ static pj_status_t on_stun_rx_request(pj_stun_session *sess,
handle_incoming_check(ice, rcheck);
}
- pj_mutex_unlock(ice->mutex);
+ pj_grp_lock_release(ice->grp_lock);
return PJ_SUCCESS;
}
@@ -2884,18 +2922,23 @@ PJ_DEF(pj_status_t) pj_ice_sess_send_data(pj_ice_sess *ice,
return PJNATH_EICEINCOMPID;
}
- pj_mutex_lock(ice->mutex);
+ pj_grp_lock_acquire(ice->grp_lock);
+
+ if (ice->is_destroying) {
+ pj_grp_lock_release(ice->grp_lock);
+ return PJ_EINVALIDOP;
+ }
comp = find_comp(ice, comp_id);
if (comp == NULL) {
status = PJNATH_EICEINCOMPID;
- pj_mutex_unlock(ice->mutex);
+ pj_grp_lock_release(ice->grp_lock);
goto on_return;
}
if (comp->valid_check == NULL) {
status = PJNATH_EICEINPROGRESS;
- pj_mutex_unlock(ice->mutex);
+ pj_grp_lock_release(ice->grp_lock);
goto on_return;
}
@@ -2904,7 +2947,9 @@ PJ_DEF(pj_status_t) pj_ice_sess_send_data(pj_ice_sess *ice,
pj_sockaddr_cp(&addr, &comp->valid_check->rcand->addr);
/* Release the mutex now to avoid deadlock (see ticket #1451). */
- pj_mutex_unlock(ice->mutex);
+ pj_grp_lock_release(ice->grp_lock);
+
+ PJ_RACE_ME(5);
status = (*ice->cb.on_tx_pkt)(ice, comp_id, transport_id,
data, data_len,
@@ -2931,11 +2976,16 @@ PJ_DEF(pj_status_t) pj_ice_sess_on_rx_pkt(pj_ice_sess *ice,
PJ_ASSERT_RETURN(ice, PJ_EINVAL);
- pj_mutex_lock(ice->mutex);
+ pj_grp_lock_acquire(ice->grp_lock);
+
+ if (ice->is_destroying) {
+ pj_grp_lock_release(ice->grp_lock);
+ return PJ_EINVALIDOP;
+ }
comp = find_comp(ice, comp_id);
if (comp == NULL) {
- pj_mutex_unlock(ice->mutex);
+ pj_grp_lock_release(ice->grp_lock);
return PJNATH_EICEINCOMPID;
}
@@ -2948,7 +2998,7 @@ PJ_DEF(pj_status_t) pj_ice_sess_on_rx_pkt(pj_ice_sess *ice,
}
if (msg_data == NULL) {
pj_assert(!"Invalid transport ID");
- pj_mutex_unlock(ice->mutex);
+ pj_grp_lock_release(ice->grp_lock);
return PJ_EINVAL;
}
@@ -2968,12 +3018,14 @@ PJ_DEF(pj_status_t) pj_ice_sess_on_rx_pkt(pj_ice_sess *ice,
LOG4((ice->obj_name, "Error processing incoming message: %s",
ice->tmp.errmsg));
}
- pj_mutex_unlock(ice->mutex);
+ pj_grp_lock_release(ice->grp_lock);
} else {
/* Not a STUN packet. Call application's callback instead, but release
* the mutex now or otherwise we may get deadlock.
*/
- pj_mutex_unlock(ice->mutex);
+ pj_grp_lock_release(ice->grp_lock);
+
+ PJ_RACE_ME(5);
(*ice->cb.on_rx_data)(ice, comp_id, transport_id, pkt, pkt_size,
src_addr, src_addr_len);
diff --git a/pjnath/src/pjnath/ice_strans.c b/pjnath/src/pjnath/ice_strans.c
index a798586b..4c1182fd 100644
--- a/pjnath/src/pjnath/ice_strans.c
+++ b/pjnath/src/pjnath/ice_strans.c
@@ -126,13 +126,11 @@ static void turn_on_state(pj_turn_sock *turn_sock, pj_turn_state_t old_state,
/* Forward decls */
+static void ice_st_on_destroy(void *obj);
static void destroy_ice_st(pj_ice_strans *ice_st);
#define ice_st_perror(ice_st,msg,rc) pjnath_perror(ice_st->obj_name,msg,rc)
static void sess_init_update(pj_ice_strans *ice_st);
-static void sess_add_ref(pj_ice_strans *ice_st);
-static pj_bool_t sess_dec_ref(pj_ice_strans *ice_st);
-
/**
* This structure describes an ICE stream transport component. A component
* in ICE stream transport typically corresponds to a single socket created
@@ -172,7 +170,7 @@ struct pj_ice_strans
void *user_data; /**< Application data. */
pj_ice_strans_cfg cfg; /**< Configuration. */
pj_ice_strans_cb cb; /**< Application callback. */
- pj_lock_t *init_lock; /**< Initialization mutex. */
+ pj_grp_lock_t *grp_lock; /**< Group lock. */
pj_ice_strans_state state; /**< Session state. */
pj_ice_sess *ice; /**< ICE session. */
@@ -183,7 +181,6 @@ struct pj_ice_strans
pj_timer_entry ka_timer; /**< STUN keep-alive timer. */
- pj_atomic_t *busy_cnt; /**< To prevent destroy */
pj_bool_t destroy_req;/**< Destroy has been called? */
pj_bool_t cb_called; /**< Init error callback called?*/
};
@@ -551,23 +548,22 @@ PJ_DEF(pj_status_t) pj_ice_strans_create( const char *name,
comp_cnt));
pj_log_push_indent();
- pj_ice_strans_cfg_copy(pool, &ice_st->cfg, cfg);
- pj_memcpy(&ice_st->cb, cb, sizeof(*cb));
-
- status = pj_atomic_create(pool, 0, &ice_st->busy_cnt);
- if (status != PJ_SUCCESS) {
- destroy_ice_st(ice_st);
- return status;
- }
-
- status = pj_lock_create_recursive_mutex(pool, ice_st->obj_name,
- &ice_st->init_lock);
+ status = pj_grp_lock_create(pool, NULL, &ice_st->grp_lock);
if (status != PJ_SUCCESS) {
- destroy_ice_st(ice_st);
+ pj_pool_release(pool);
pj_log_pop_indent();
return status;
}
+ pj_grp_lock_add_ref(ice_st->grp_lock);
+ pj_grp_lock_add_handler(ice_st->grp_lock, pool, ice_st,
+ &ice_st_on_destroy);
+
+ pj_ice_strans_cfg_copy(pool, &ice_st->cfg, cfg);
+ ice_st->cfg.stun.cfg.grp_lock = ice_st->grp_lock;
+ ice_st->cfg.turn.cfg.grp_lock = ice_st->grp_lock;
+ pj_memcpy(&ice_st->cb, cb, sizeof(*cb));
+
ice_st->comp_cnt = comp_cnt;
ice_st->comp = (pj_ice_strans_comp**)
pj_pool_calloc(pool, comp_cnt, sizeof(pj_ice_strans_comp*));
@@ -578,12 +574,12 @@ PJ_DEF(pj_status_t) pj_ice_strans_create( const char *name,
/* Acquire initialization mutex to prevent callback to be
* called before we finish initialization.
*/
- pj_lock_acquire(ice_st->init_lock);
+ pj_grp_lock_acquire(ice_st->grp_lock);
for (i=0; i<comp_cnt; ++i) {
status = create_comp(ice_st, i+1);
if (status != PJ_SUCCESS) {
- pj_lock_release(ice_st->init_lock);
+ pj_grp_lock_release(ice_st->grp_lock);
destroy_ice_st(ice_st);
pj_log_pop_indent();
return status;
@@ -591,9 +587,9 @@ PJ_DEF(pj_status_t) pj_ice_strans_create( const char *name,
}
/* Done with initialization */
- pj_lock_release(ice_st->init_lock);
+ pj_grp_lock_release(ice_st->grp_lock);
- PJ_LOG(4,(ice_st->obj_name, "ICE stream transport created"));
+ PJ_LOG(4,(ice_st->obj_name, "ICE stream transport %p created", ice_st));
*p_ice_st = ice_st;
@@ -605,14 +601,35 @@ PJ_DEF(pj_status_t) pj_ice_strans_create( const char *name,
return PJ_SUCCESS;
}
+/* REALLY destroy ICE */
+static void ice_st_on_destroy(void *obj)
+{
+ pj_ice_strans *ice_st = (pj_ice_strans*)obj;
+
+ PJ_LOG(4,(ice_st->obj_name, "ICE stream transport %p destroyed", obj));
+
+ /* Done */
+ pj_pool_release(ice_st->pool);
+}
+
/* Destroy ICE */
static void destroy_ice_st(pj_ice_strans *ice_st)
{
unsigned i;
- PJ_LOG(5,(ice_st->obj_name, "ICE stream transport destroying.."));
+ PJ_LOG(5,(ice_st->obj_name, "ICE stream transport %p destroy request..",
+ ice_st));
pj_log_push_indent();
+ pj_grp_lock_acquire(ice_st->grp_lock);
+
+ if (ice_st->destroy_req) {
+ pj_grp_lock_release(ice_st->grp_lock);
+ return;
+ }
+
+ ice_st->destroy_req = PJ_TRUE;
+
/* Destroy ICE if we have ICE */
if (ice_st->ice) {
pj_ice_sess_destroy(ice_st->ice);
@@ -623,38 +640,19 @@ static void destroy_ice_st(pj_ice_strans *ice_st)
for (i=0; i<ice_st->comp_cnt; ++i) {
if (ice_st->comp[i]) {
if (ice_st->comp[i]->stun_sock) {
- pj_stun_sock_set_user_data(ice_st->comp[i]->stun_sock, NULL);
pj_stun_sock_destroy(ice_st->comp[i]->stun_sock);
ice_st->comp[i]->stun_sock = NULL;
}
if (ice_st->comp[i]->turn_sock) {
- pj_turn_sock_set_user_data(ice_st->comp[i]->turn_sock, NULL);
pj_turn_sock_destroy(ice_st->comp[i]->turn_sock);
ice_st->comp[i]->turn_sock = NULL;
}
}
}
- ice_st->comp_cnt = 0;
-
- /* Destroy mutex */
- if (ice_st->init_lock) {
- pj_lock_acquire(ice_st->init_lock);
- pj_lock_release(ice_st->init_lock);
- pj_lock_destroy(ice_st->init_lock);
- ice_st->init_lock = NULL;
- }
- /* Destroy reference counter */
- if (ice_st->busy_cnt) {
- pj_assert(pj_atomic_get(ice_st->busy_cnt)==0);
- pj_atomic_destroy(ice_st->busy_cnt);
- ice_st->busy_cnt = NULL;
- }
+ pj_grp_lock_dec_ref(ice_st->grp_lock);
+ pj_grp_lock_release(ice_st->grp_lock);
- PJ_LOG(4,(ice_st->obj_name, "ICE stream transport destroyed"));
-
- /* Done */
- pj_pool_release(ice_st->pool);
pj_log_pop_indent();
}
@@ -739,45 +737,12 @@ static void sess_init_update(pj_ice_strans *ice_st)
*/
PJ_DEF(pj_status_t) pj_ice_strans_destroy(pj_ice_strans *ice_st)
{
- PJ_ASSERT_RETURN(ice_st, PJ_EINVAL);
-
- sess_add_ref(ice_st);
- ice_st->destroy_req = PJ_TRUE;
- if (sess_dec_ref(ice_st)) {
- PJ_LOG(5,(ice_st->obj_name,
- "ICE strans object is busy, will destroy later"));
- return PJ_EPENDING;
- }
-
+ destroy_ice_st(ice_st);
return PJ_SUCCESS;
}
/*
- * Increment busy counter.
- */
-static void sess_add_ref(pj_ice_strans *ice_st)
-{
- pj_atomic_inc(ice_st->busy_cnt);
-}
-
-/*
- * Decrement busy counter. If the counter has reached zero and destroy
- * has been requested, destroy the object and return FALSE.
- */
-static pj_bool_t sess_dec_ref(pj_ice_strans *ice_st)
-{
- int count = pj_atomic_dec_and_get(ice_st->busy_cnt);
- pj_assert(count >= 0);
- if (count==0 && ice_st->destroy_req) {
- destroy_ice_st(ice_st);
- return PJ_FALSE;
- } else {
- return PJ_TRUE;
- }
-}
-
-/*
* Get user data
*/
PJ_DEF(void*) pj_ice_strans_get_user_data(pj_ice_strans *ice_st)
@@ -840,7 +805,9 @@ PJ_DEF(pj_status_t) pj_ice_strans_init_ice(pj_ice_strans *ice_st,
/* Create! */
status = pj_ice_sess_create(&ice_st->cfg.stun_cfg, ice_st->obj_name, role,
ice_st->comp_cnt, &ice_cb,
- local_ufrag, local_passwd, &ice_st->ice);
+ local_ufrag, local_passwd,
+ ice_st->grp_lock,
+ &ice_st->ice);
if (status != PJ_SUCCESS)
return status;
@@ -1255,7 +1222,7 @@ static void on_ice_complete(pj_ice_sess *ice, pj_status_t status)
pj_time_val t;
unsigned msec;
- sess_add_ref(ice_st);
+ pj_grp_lock_add_ref(ice_st->grp_lock);
pj_gettimeofday(&t);
PJ_TIME_VAL_SUB(t, ice_st->start_time);
@@ -1337,7 +1304,7 @@ static void on_ice_complete(pj_ice_sess *ice, pj_status_t status)
}
- sess_dec_ref(ice_st);
+ pj_grp_lock_dec_ref(ice_st->grp_lock);
}
/*
@@ -1426,7 +1393,7 @@ static pj_bool_t stun_on_rx_data(pj_stun_sock *stun_sock,
ice_st = comp->ice_st;
- sess_add_ref(ice_st);
+ pj_grp_lock_add_ref(ice_st->grp_lock);
if (ice_st->ice == NULL) {
/* The ICE session is gone, but we're still receiving packets.
@@ -1451,7 +1418,7 @@ static pj_bool_t stun_on_rx_data(pj_stun_sock *stun_sock,
}
}
- return sess_dec_ref(ice_st);
+ return pj_grp_lock_dec_ref(ice_st->grp_lock) ? PJ_FALSE : PJ_TRUE;
}
/* Notifification when asynchronous send operation to the STUN socket
@@ -1482,10 +1449,10 @@ static pj_bool_t stun_on_status(pj_stun_sock *stun_sock,
comp = (pj_ice_strans_comp*) pj_stun_sock_get_user_data(stun_sock);
ice_st = comp->ice_st;
- sess_add_ref(ice_st);
+ pj_grp_lock_add_ref(ice_st->grp_lock);
/* Wait until initialization completes */
- pj_lock_acquire(ice_st->init_lock);
+ pj_grp_lock_acquire(ice_st->grp_lock);
/* Find the srflx cancidate */
for (i=0; i<comp->cand_cnt; ++i) {
@@ -1495,14 +1462,14 @@ static pj_bool_t stun_on_status(pj_stun_sock *stun_sock,
}
}
- pj_lock_release(ice_st->init_lock);
+ pj_grp_lock_release(ice_st->grp_lock);
/* It is possible that we don't have srflx candidate even though this
* callback is called. This could happen when we cancel adding srflx
* candidate due to initialization error.
*/
if (cand == NULL) {
- return sess_dec_ref(ice_st);
+ return pj_grp_lock_dec_ref(ice_st->grp_lock) ? PJ_FALSE : PJ_TRUE;
}
switch (op) {
@@ -1618,7 +1585,7 @@ static pj_bool_t stun_on_status(pj_stun_sock *stun_sock,
break;
}
- return sess_dec_ref(ice_st);
+ return pj_grp_lock_dec_ref(ice_st->grp_lock)? PJ_FALSE : PJ_TRUE;
}
/* Callback when TURN socket has received a packet */
@@ -1637,7 +1604,7 @@ static void turn_on_rx_data(pj_turn_sock *turn_sock,
return;
}
- sess_add_ref(comp->ice_st);
+ pj_grp_lock_add_ref(comp->ice_st->grp_lock);
if (comp->ice_st->ice == NULL) {
/* The ICE session is gone, but we're still receiving packets.
@@ -1664,7 +1631,7 @@ static void turn_on_rx_data(pj_turn_sock *turn_sock,
}
}
- sess_dec_ref(comp->ice_st);
+ pj_grp_lock_dec_ref(comp->ice_st->grp_lock);
}
@@ -1686,7 +1653,7 @@ static void turn_on_state(pj_turn_sock *turn_sock, pj_turn_state_t old_state,
pj_turn_state_name(old_state), pj_turn_state_name(new_state)));
pj_log_push_indent();
- sess_add_ref(comp->ice_st);
+ pj_grp_lock_add_ref(comp->ice_st->grp_lock);
if (new_state == PJ_TURN_STATE_READY) {
pj_turn_session_info rel_info;
@@ -1700,7 +1667,7 @@ static void turn_on_state(pj_turn_sock *turn_sock, pj_turn_state_t old_state,
pj_turn_sock_get_info(turn_sock, &rel_info);
/* Wait until initialization completes */
- pj_lock_acquire(comp->ice_st->init_lock);
+ pj_grp_lock_acquire(comp->ice_st->grp_lock);
/* Find relayed candidate in the component */
for (i=0; i<comp->cand_cnt; ++i) {
@@ -1711,7 +1678,7 @@ static void turn_on_state(pj_turn_sock *turn_sock, pj_turn_state_t old_state,
}
pj_assert(cand != NULL);
- pj_lock_release(comp->ice_st->init_lock);
+ pj_grp_lock_release(comp->ice_st->grp_lock);
/* Update candidate */
pj_sockaddr_cp(&cand->addr, &rel_info.relay_addr);
@@ -1744,22 +1711,27 @@ static void turn_on_state(pj_turn_sock *turn_sock, pj_turn_state_t old_state,
pj_turn_sock_set_user_data(turn_sock, NULL);
comp->turn_sock = NULL;
- /* Set session to fail if we're still initializing */
- if (comp->ice_st->state < PJ_ICE_STRANS_STATE_READY) {
- sess_fail(comp->ice_st, PJ_ICE_STRANS_OP_INIT,
- "TURN allocation failed", info.last_status);
- } else if (comp->turn_err_cnt > 1) {
- sess_fail(comp->ice_st, PJ_ICE_STRANS_OP_KEEP_ALIVE,
- "TURN refresh failed", info.last_status);
- } else {
- PJ_PERROR(4,(comp->ice_st->obj_name, info.last_status,
- "Comp %d: TURN allocation failed, retrying",
- comp->comp_id));
- add_update_turn(comp->ice_st, comp);
+ /* Set session to fail on error. last_status PJ_SUCCESS means normal
+ * deallocation, which should not trigger sess_fail as it may have
+ * been initiated by ICE destroy
+ */
+ if (info.last_status != PJ_SUCCESS) {
+ if (comp->ice_st->state < PJ_ICE_STRANS_STATE_READY) {
+ sess_fail(comp->ice_st, PJ_ICE_STRANS_OP_INIT,
+ "TURN allocation failed", info.last_status);
+ } else if (comp->turn_err_cnt > 1) {
+ sess_fail(comp->ice_st, PJ_ICE_STRANS_OP_KEEP_ALIVE,
+ "TURN refresh failed", info.last_status);
+ } else {
+ PJ_PERROR(4,(comp->ice_st->obj_name, info.last_status,
+ "Comp %d: TURN allocation failed, retrying",
+ comp->comp_id));
+ add_update_turn(comp->ice_st, comp);
+ }
}
}
- sess_dec_ref(comp->ice_st);
+ pj_grp_lock_dec_ref(comp->ice_st->grp_lock);
pj_log_pop_indent();
}
diff --git a/pjnath/src/pjnath/nat_detect.c b/pjnath/src/pjnath/nat_detect.c
index f9644c2b..3dfed92a 100644
--- a/pjnath/src/pjnath/nat_detect.c
+++ b/pjnath/src/pjnath/nat_detect.c
@@ -307,7 +307,7 @@ PJ_DEF(pj_status_t) pj_stun_detect_nat_type(const pj_sockaddr_in *server,
sess_cb.on_request_complete = &on_request_complete;
sess_cb.on_send_msg = &on_send_msg;
status = pj_stun_session_create(stun_cfg, pool->obj_name, &sess_cb,
- PJ_FALSE, &sess->stun_sess);
+ PJ_FALSE, NULL, &sess->stun_sess);
if (status != PJ_SUCCESS)
goto on_error;
diff --git a/pjnath/src/pjnath/stun_session.c b/pjnath/src/pjnath/stun_session.c
index 4feae906..173c729d 100644
--- a/pjnath/src/pjnath/stun_session.c
+++ b/pjnath/src/pjnath/stun_session.c
@@ -25,13 +25,10 @@ struct pj_stun_session
{
pj_stun_config *cfg;
pj_pool_t *pool;
- pj_lock_t *lock;
- pj_bool_t delete_lock;
+ pj_grp_lock_t *grp_lock;
pj_stun_session_cb cb;
void *user_data;
-
- pj_atomic_t *busy;
- pj_bool_t destroy_request;
+ pj_bool_t is_destroying;
pj_bool_t use_fingerprint;
@@ -55,14 +52,15 @@ struct pj_stun_session
};
#define SNAME(s_) ((s_)->pool->obj_name)
+#define THIS_FILE "stun_session.c"
-#if PJ_LOG_MAX_LEVEL >= 5
+#if 1
# define TRACE_(expr) PJ_LOG(5,expr)
#else
# define TRACE_(expr)
#endif
-#define LOG_ERR_(sess,title,rc) pjnath_perror(sess->pool->obj_name,title,rc)
+#define LOG_ERR_(sess,title,rc) PJ_PERROR(3,(sess->pool->obj_name,rc,title))
#define TDATA_POOL_SIZE PJNATH_POOL_LEN_STUN_TDATA
#define TDATA_POOL_INC PJNATH_POOL_INC_STUN_TDATA
@@ -77,6 +75,7 @@ static pj_status_t stun_tsx_on_send_msg(pj_stun_client_tsx *tsx,
const void *stun_pkt,
pj_size_t pkt_size);
static void stun_tsx_on_destroy(pj_stun_client_tsx *tsx);
+static void stun_sess_on_destroy(void *comp);
static pj_stun_tsx_cb tsx_cb =
{
@@ -148,31 +147,38 @@ static void stun_tsx_on_destroy(pj_stun_client_tsx *tsx)
pj_stun_tx_data *tdata;
tdata = (pj_stun_tx_data*) pj_stun_client_tsx_get_data(tsx);
- tsx_erase(tdata->sess, tdata);
+ pj_stun_client_tsx_stop(tsx);
+ if (tdata) {
+ tsx_erase(tdata->sess, tdata);
+ pj_pool_release(tdata->pool);
+ }
- pj_stun_client_tsx_destroy(tsx);
- pj_pool_release(tdata->pool);
+ TRACE_((THIS_FILE, "STUN transaction %p destroyed", tsx));
}
static void destroy_tdata(pj_stun_tx_data *tdata, pj_bool_t force)
{
+ TRACE_((THIS_FILE, "tdata %p destroy request, force=%d, tsx=%p", tdata,
+ force, tdata->client_tsx));
+
if (tdata->res_timer.id != PJ_FALSE) {
- pj_timer_heap_cancel(tdata->sess->cfg->timer_heap,
- &tdata->res_timer);
- tdata->res_timer.id = PJ_FALSE;
+ pj_timer_heap_cancel_if_active(tdata->sess->cfg->timer_heap,
+ &tdata->res_timer, PJ_FALSE);
pj_list_erase(tdata);
}
if (force) {
+ pj_list_erase(tdata);
if (tdata->client_tsx) {
- tsx_erase(tdata->sess, tdata);
- pj_stun_client_tsx_destroy(tdata->client_tsx);
+ pj_stun_client_tsx_stop(tdata->client_tsx);
+ pj_stun_client_tsx_set_data(tdata->client_tsx, NULL);
}
pj_pool_release(tdata->pool);
} else {
if (tdata->client_tsx) {
- pj_time_val delay = {2, 0};
+ /* "Probably" this is to absorb retransmission */
+ pj_time_val delay = {0, 300};
pj_stun_client_tsx_schedule_destroy(tdata->client_tsx, &delay);
} else {
@@ -206,7 +212,7 @@ static void on_cache_timeout(pj_timer_heap_t *timer_heap,
PJ_LOG(5,(SNAME(tdata->sess), "Response cache deleted"));
pj_list_erase(tdata);
- pj_stun_msg_destroy_tdata(tdata->sess, tdata);
+ destroy_tdata(tdata, PJ_FALSE);
}
static pj_status_t apply_msg_options(pj_stun_session *sess,
@@ -419,8 +425,12 @@ static void stun_tsx_on_complete(pj_stun_client_tsx *tsx,
sess = tdata->sess;
/* Lock the session and prevent user from destroying us in the callback */
- pj_atomic_inc(sess->busy);
- pj_lock_acquire(sess->lock);
+ pj_grp_lock_acquire(sess->grp_lock);
+ if (sess->is_destroying) {
+ pj_stun_msg_destroy_tdata(sess, tdata);
+ pj_grp_lock_release(sess->grp_lock);
+ return;
+ }
/* Handle authentication challenge */
handle_auth_challenge(sess, tdata, response, src_addr,
@@ -434,15 +444,13 @@ static void stun_tsx_on_complete(pj_stun_client_tsx *tsx,
/* Destroy the transmit data. This will remove the transaction
* from the pending list too.
*/
- pj_stun_msg_destroy_tdata(sess, tdata);
+ if (status == PJNATH_ESTUNTIMEDOUT)
+ destroy_tdata(tdata, PJ_TRUE);
+ else
+ destroy_tdata(tdata, PJ_FALSE);
tdata = NULL;
- pj_lock_release(sess->lock);
-
- if (pj_atomic_dec_and_get(sess->busy)==0 && sess->destroy_request) {
- pj_stun_session_destroy(sess);
- return;
- }
+ pj_grp_lock_release(sess->grp_lock);
}
static pj_status_t stun_tsx_on_send_msg(pj_stun_client_tsx *tsx,
@@ -457,20 +465,21 @@ static pj_status_t stun_tsx_on_send_msg(pj_stun_client_tsx *tsx,
sess = tdata->sess;
/* Lock the session and prevent user from destroying us in the callback */
- pj_atomic_inc(sess->busy);
- pj_lock_acquire(sess->lock);
+ pj_grp_lock_acquire(sess->grp_lock);
+ if (sess->is_destroying) {
+ /* Stray timer */
+ pj_grp_lock_release(sess->grp_lock);
+ return PJ_EINVALIDOP;
+ }
+
status = sess->cb.on_send_msg(tdata->sess, tdata->token, stun_pkt,
pkt_size, tdata->dst_addr,
tdata->addr_len);
- pj_lock_release(sess->lock);
+ if (pj_grp_lock_release(sess->grp_lock))
+ return PJ_EGONE;
- if (pj_atomic_dec_and_get(sess->busy)==0 && sess->destroy_request) {
- pj_stun_session_destroy(sess);
- return PJNATH_ESTUNDESTROYED;
- } else {
- return status;
- }
+ return status;
}
/* **************************************************************************/
@@ -479,6 +488,7 @@ PJ_DEF(pj_status_t) pj_stun_session_create( pj_stun_config *cfg,
const char *name,
const pj_stun_session_cb *cb,
pj_bool_t fingerprint,
+ pj_grp_lock_t *grp_lock,
pj_stun_session **p_sess)
{
pj_pool_t *pool;
@@ -501,46 +511,37 @@ PJ_DEF(pj_status_t) pj_stun_session_create( pj_stun_config *cfg,
sess->use_fingerprint = fingerprint;
sess->log_flag = 0xFFFF;
+ if (grp_lock) {
+ sess->grp_lock = grp_lock;
+ } else {
+ status = pj_grp_lock_create(pool, NULL, &sess->grp_lock);
+ if (status != PJ_SUCCESS) {
+ pj_pool_release(pool);
+ return status;
+ }
+ }
+
+ pj_grp_lock_add_ref(sess->grp_lock);
+ pj_grp_lock_add_handler(sess->grp_lock, pool, sess,
+ &stun_sess_on_destroy);
+
pj_stun_session_set_software_name(sess, &cfg->software_name);
- sess->rx_pool = pj_pool_create(sess->cfg->pf, name,
- PJNATH_POOL_LEN_STUN_TDATA,
+ sess->rx_pool = pj_pool_create(sess->cfg->pf, name,
+ PJNATH_POOL_LEN_STUN_TDATA,
PJNATH_POOL_INC_STUN_TDATA, NULL);
pj_list_init(&sess->pending_request_list);
pj_list_init(&sess->cached_response_list);
- status = pj_lock_create_recursive_mutex(pool, name, &sess->lock);
- if (status != PJ_SUCCESS) {
- pj_pool_release(pool);
- return status;
- }
- sess->delete_lock = PJ_TRUE;
-
- status = pj_atomic_create(pool, 0, &sess->busy);
- if (status != PJ_SUCCESS) {
- pj_lock_destroy(sess->lock);
- pj_pool_release(pool);
- return status;
- }
-
*p_sess = sess;
return PJ_SUCCESS;
}
-PJ_DEF(pj_status_t) pj_stun_session_destroy(pj_stun_session *sess)
+static void stun_sess_on_destroy(void *comp)
{
- PJ_ASSERT_RETURN(sess, PJ_EINVAL);
-
- pj_lock_acquire(sess->lock);
-
- /* Can't destroy if we're in a callback */
- sess->destroy_request = PJ_TRUE;
- if (pj_atomic_get(sess->busy)) {
- pj_lock_release(sess->lock);
- return PJ_EPENDING;
- }
+ pj_stun_session *sess = (pj_stun_session*)comp;
while (!pj_list_empty(&sess->pending_request_list)) {
pj_stun_tx_data *tdata = sess->pending_request_list.next;
@@ -551,11 +552,6 @@ PJ_DEF(pj_status_t) pj_stun_session_destroy(pj_stun_session *sess)
pj_stun_tx_data *tdata = sess->cached_response_list.next;
destroy_tdata(tdata, PJ_TRUE);
}
- pj_lock_release(sess->lock);
-
- if (sess->delete_lock) {
- pj_lock_destroy(sess->lock);
- }
if (sess->rx_pool) {
pj_pool_release(sess->rx_pool);
@@ -564,6 +560,47 @@ PJ_DEF(pj_status_t) pj_stun_session_destroy(pj_stun_session *sess)
pj_pool_release(sess->pool);
+ TRACE_((THIS_FILE, "STUN session %p destroyed", sess));
+}
+
+PJ_DEF(pj_status_t) pj_stun_session_destroy(pj_stun_session *sess)
+{
+ pj_stun_tx_data *tdata;
+
+ PJ_ASSERT_RETURN(sess, PJ_EINVAL);
+
+ TRACE_((SNAME(sess), "STUN session %p destroy request, ref_cnt=%d",
+ sess, pj_grp_lock_get_ref(sess->grp_lock)));
+
+ pj_grp_lock_acquire(sess->grp_lock);
+
+ if (sess->is_destroying) {
+ /* Prevent from decrementing the ref counter more than once */
+ pj_grp_lock_release(sess->grp_lock);
+ return PJ_EINVALIDOP;
+ }
+
+ sess->is_destroying = PJ_TRUE;
+
+ /* We need to stop transactions and cached response because they are
+ * holding the group lock's reference counter while retransmitting.
+ */
+ tdata = sess->pending_request_list.next;
+ while (tdata != &sess->pending_request_list) {
+ if (tdata->client_tsx)
+ pj_stun_client_tsx_stop(tdata->client_tsx);
+ tdata = tdata->next;
+ }
+
+ tdata = sess->cached_response_list.next;
+ while (tdata != &sess->cached_response_list) {
+ pj_timer_heap_cancel_if_active(tdata->sess->cfg->timer_heap,
+ &tdata->res_timer, PJ_FALSE);
+ tdata = tdata->next;
+ }
+
+ pj_grp_lock_dec_ref(sess->grp_lock);
+ pj_grp_lock_release(sess->grp_lock);
return PJ_SUCCESS;
}
@@ -572,9 +609,9 @@ PJ_DEF(pj_status_t) pj_stun_session_set_user_data( pj_stun_session *sess,
void *user_data)
{
PJ_ASSERT_RETURN(sess, PJ_EINVAL);
- pj_lock_acquire(sess->lock);
+ pj_grp_lock_acquire(sess->grp_lock);
sess->user_data = user_data;
- pj_lock_release(sess->lock);
+ pj_grp_lock_release(sess->grp_lock);
return PJ_SUCCESS;
}
@@ -584,35 +621,16 @@ PJ_DEF(void*) pj_stun_session_get_user_data(pj_stun_session *sess)
return sess->user_data;
}
-PJ_DEF(pj_status_t) pj_stun_session_set_lock( pj_stun_session *sess,
- pj_lock_t *lock,
- pj_bool_t auto_del)
-{
- pj_lock_t *old_lock = sess->lock;
- pj_bool_t old_del;
-
- PJ_ASSERT_RETURN(sess && lock, PJ_EINVAL);
-
- pj_lock_acquire(old_lock);
- sess->lock = lock;
- old_del = sess->delete_lock;
- sess->delete_lock = auto_del;
- pj_lock_release(old_lock);
-
- if (old_lock)
- pj_lock_destroy(old_lock);
-
- return PJ_SUCCESS;
-}
-
PJ_DEF(pj_status_t) pj_stun_session_set_software_name(pj_stun_session *sess,
const pj_str_t *sw)
{
PJ_ASSERT_RETURN(sess, PJ_EINVAL);
+ pj_grp_lock_acquire(sess->grp_lock);
if (sw && sw->slen)
pj_strdup(sess->pool, &sess->srv_name, sw);
else
sess->srv_name.slen = 0;
+ pj_grp_lock_release(sess->grp_lock);
return PJ_SUCCESS;
}
@@ -622,6 +640,7 @@ PJ_DEF(pj_status_t) pj_stun_session_set_credential(pj_stun_session *sess,
{
PJ_ASSERT_RETURN(sess, PJ_EINVAL);
+ pj_grp_lock_acquire(sess->grp_lock);
sess->auth_type = auth_type;
if (cred) {
pj_stun_auth_cred_dup(sess->pool, &sess->cred, cred);
@@ -629,6 +648,7 @@ PJ_DEF(pj_status_t) pj_stun_session_set_credential(pj_stun_session *sess,
sess->auth_type = PJ_STUN_AUTH_NONE;
pj_bzero(&sess->cred, sizeof(sess->cred));
}
+ pj_grp_lock_release(sess->grp_lock);
return PJ_SUCCESS;
}
@@ -705,17 +725,21 @@ PJ_DEF(pj_status_t) pj_stun_session_create_req(pj_stun_session *sess,
PJ_ASSERT_RETURN(sess && p_tdata, PJ_EINVAL);
+ pj_grp_lock_acquire(sess->grp_lock);
+ if (sess->is_destroying) {
+ pj_grp_lock_release(sess->grp_lock);
+ return PJ_EINVALIDOP;
+ }
+
status = create_tdata(sess, &tdata);
if (status != PJ_SUCCESS)
- return status;
+ goto on_error;
/* Create STUN message */
status = pj_stun_msg_create(tdata->pool, method, magic,
tsx_id, &tdata->msg);
- if (status != PJ_SUCCESS) {
- pj_pool_release(tdata->pool);
- return status;
- }
+ if (status != PJ_SUCCESS)
+ goto on_error;
/* copy the request's transaction ID as the transaction key. */
pj_assert(sizeof(tdata->msg_key)==sizeof(tdata->msg->hdr.tsx_id));
@@ -731,10 +755,8 @@ PJ_DEF(pj_status_t) pj_stun_session_create_req(pj_stun_session *sess,
} else if (sess->auth_type == PJ_STUN_AUTH_SHORT_TERM) {
/* MUST put authentication in request */
status = get_auth(sess, tdata);
- if (status != PJ_SUCCESS) {
- pj_pool_release(tdata->pool);
- return status;
- }
+ if (status != PJ_SUCCESS)
+ goto on_error;
} else if (sess->auth_type == PJ_STUN_AUTH_LONG_TERM) {
/* Only put authentication information if we've received
@@ -742,22 +764,27 @@ PJ_DEF(pj_status_t) pj_stun_session_create_req(pj_stun_session *sess,
*/
if (sess->next_nonce.slen != 0) {
status = get_auth(sess, tdata);
- if (status != PJ_SUCCESS) {
- pj_pool_release(tdata->pool);
- return status;
- }
+ if (status != PJ_SUCCESS)
+ goto on_error;
tdata->auth_info.nonce = sess->next_nonce;
tdata->auth_info.realm = sess->server_realm;
}
} else {
pj_assert(!"Invalid authentication type");
- pj_pool_release(tdata->pool);
- return PJ_EBUG;
+ status = PJ_EBUG;
+ goto on_error;
}
*p_tdata = tdata;
+ pj_grp_lock_release(sess->grp_lock);
return PJ_SUCCESS;
+
+on_error:
+ if (tdata)
+ pj_pool_release(tdata->pool);
+ pj_grp_lock_release(sess->grp_lock);
+ return status;
}
PJ_DEF(pj_status_t) pj_stun_session_create_ind(pj_stun_session *sess,
@@ -769,9 +796,17 @@ PJ_DEF(pj_status_t) pj_stun_session_create_ind(pj_stun_session *sess,
PJ_ASSERT_RETURN(sess && p_tdata, PJ_EINVAL);
+ pj_grp_lock_acquire(sess->grp_lock);
+ if (sess->is_destroying) {
+ pj_grp_lock_release(sess->grp_lock);
+ return PJ_EINVALIDOP;
+ }
+
status = create_tdata(sess, &tdata);
- if (status != PJ_SUCCESS)
+ if (status != PJ_SUCCESS) {
+ pj_grp_lock_release(sess->grp_lock);
return status;
+ }
/* Create STUN message */
msg_type |= PJ_STUN_INDICATION_BIT;
@@ -779,10 +814,13 @@ PJ_DEF(pj_status_t) pj_stun_session_create_ind(pj_stun_session *sess,
NULL, &tdata->msg);
if (status != PJ_SUCCESS) {
pj_pool_release(tdata->pool);
+ pj_grp_lock_release(sess->grp_lock);
return status;
}
*p_tdata = tdata;
+
+ pj_grp_lock_release(sess->grp_lock);
return PJ_SUCCESS;
}
@@ -798,15 +836,24 @@ PJ_DEF(pj_status_t) pj_stun_session_create_res( pj_stun_session *sess,
pj_status_t status;
pj_stun_tx_data *tdata = NULL;
+ pj_grp_lock_acquire(sess->grp_lock);
+ if (sess->is_destroying) {
+ pj_grp_lock_release(sess->grp_lock);
+ return PJ_EINVALIDOP;
+ }
+
status = create_tdata(sess, &tdata);
- if (status != PJ_SUCCESS)
+ if (status != PJ_SUCCESS) {
+ pj_grp_lock_release(sess->grp_lock);
return status;
+ }
/* Create STUN response message */
status = pj_stun_msg_create_response(tdata->pool, rdata->msg,
err_code, err_msg, &tdata->msg);
if (status != PJ_SUCCESS) {
pj_pool_release(tdata->pool);
+ pj_grp_lock_release(sess->grp_lock);
return status;
}
@@ -821,6 +868,8 @@ PJ_DEF(pj_status_t) pj_stun_session_create_res( pj_stun_session *sess,
*p_tdata = tdata;
+ pj_grp_lock_release(sess->grp_lock);
+
return PJ_SUCCESS;
}
@@ -867,6 +916,13 @@ PJ_DEF(pj_status_t) pj_stun_session_send_msg( pj_stun_session *sess,
PJ_ASSERT_RETURN(sess && addr_len && server && tdata, PJ_EINVAL);
+ /* Lock the session and prevent user from destroying us in the callback */
+ pj_grp_lock_acquire(sess->grp_lock);
+ if (sess->is_destroying) {
+ pj_grp_lock_release(sess->grp_lock);
+ return PJ_EINVALIDOP;
+ }
+
pj_log_push_indent();
/* Allocate packet */
@@ -876,10 +932,6 @@ PJ_DEF(pj_status_t) pj_stun_session_send_msg( pj_stun_session *sess,
tdata->token = token;
tdata->retransmit = retransmit;
- /* Lock the session and prevent user from destroying us in the callback */
- pj_atomic_inc(sess->busy);
- pj_lock_acquire(sess->lock);
-
/* Apply options */
status = apply_msg_options(sess, tdata->pool, &tdata->auth_info,
tdata->msg);
@@ -909,7 +961,8 @@ PJ_DEF(pj_status_t) pj_stun_session_send_msg( pj_stun_session *sess,
if (PJ_STUN_IS_REQUEST(tdata->msg->hdr.type)) {
/* Create STUN client transaction */
- status = pj_stun_client_tsx_create(sess->cfg, tdata->pool,
+ status = pj_stun_client_tsx_create(sess->cfg, tdata->pool,
+ sess->grp_lock,
&tsx_cb, &tdata->client_tsx);
PJ_ASSERT_RETURN(status==PJ_SUCCESS, status);
pj_stun_client_tsx_set_data(tdata->client_tsx, (void*)tdata);
@@ -939,17 +992,17 @@ PJ_DEF(pj_status_t) pj_stun_session_send_msg( pj_stun_session *sess,
pj_time_val timeout;
pj_memset(&tdata->res_timer, 0, sizeof(tdata->res_timer));
- pj_timer_entry_init(&tdata->res_timer, PJ_TRUE, tdata,
+ pj_timer_entry_init(&tdata->res_timer, PJ_FALSE, tdata,
&on_cache_timeout);
timeout.sec = sess->cfg->res_cache_msec / 1000;
timeout.msec = sess->cfg->res_cache_msec % 1000;
- status = pj_timer_heap_schedule(sess->cfg->timer_heap,
- &tdata->res_timer,
- &timeout);
+ status = pj_timer_heap_schedule_w_grp_lock(sess->cfg->timer_heap,
+ &tdata->res_timer,
+ &timeout, PJ_TRUE,
+ sess->grp_lock);
if (status != PJ_SUCCESS) {
- tdata->res_timer.id = PJ_FALSE;
pj_stun_msg_destroy_tdata(sess, tdata);
LOG_ERR_(sess, "Error scheduling response timer", status);
goto on_return;
@@ -975,15 +1028,10 @@ PJ_DEF(pj_status_t) pj_stun_session_send_msg( pj_stun_session *sess,
}
on_return:
- pj_lock_release(sess->lock);
-
pj_log_pop_indent();
- /* Check if application has called destroy() in the callback */
- if (pj_atomic_dec_and_get(sess->busy)==0 && sess->destroy_request) {
- pj_stun_session_destroy(sess);
- return PJNATH_ESTUNDESTROYED;
- }
+ if (pj_grp_lock_release(sess->grp_lock))
+ return PJ_EGONE;
return status;
}
@@ -1005,14 +1053,25 @@ PJ_DEF(pj_status_t) pj_stun_session_respond( pj_stun_session *sess,
pj_str_t reason;
pj_stun_tx_data *tdata;
+ pj_grp_lock_acquire(sess->grp_lock);
+ if (sess->is_destroying) {
+ pj_grp_lock_release(sess->grp_lock);
+ return PJ_EINVALIDOP;
+ }
+
status = pj_stun_session_create_res(sess, rdata, code,
(errmsg?pj_cstr(&reason,errmsg):NULL),
&tdata);
- if (status != PJ_SUCCESS)
+ if (status != PJ_SUCCESS) {
+ pj_grp_lock_release(sess->grp_lock);
return status;
+ }
- return pj_stun_session_send_msg(sess, token, cache, PJ_FALSE,
- dst_addr, addr_len, tdata);
+ status = pj_stun_session_send_msg(sess, token, cache, PJ_FALSE,
+ dst_addr, addr_len, tdata);
+
+ pj_grp_lock_release(sess->grp_lock);
+ return status;
}
@@ -1029,8 +1088,11 @@ PJ_DEF(pj_status_t) pj_stun_session_cancel_req( pj_stun_session *sess,
PJ_ASSERT_RETURN(PJ_STUN_IS_REQUEST(tdata->msg->hdr.type), PJ_EINVAL);
/* Lock the session and prevent user from destroying us in the callback */
- pj_atomic_inc(sess->busy);
- pj_lock_acquire(sess->lock);
+ pj_grp_lock_acquire(sess->grp_lock);
+ if (sess->is_destroying) {
+ pj_grp_lock_release(sess->grp_lock);
+ return PJ_EINVALIDOP;
+ }
if (notify) {
(sess->cb.on_request_complete)(sess, notify_status, tdata->token,
@@ -1040,12 +1102,7 @@ PJ_DEF(pj_status_t) pj_stun_session_cancel_req( pj_stun_session *sess,
/* Just destroy tdata. This will destroy the transaction as well */
pj_stun_msg_destroy_tdata(sess, tdata);
- pj_lock_release(sess->lock);
-
- if (pj_atomic_dec_and_get(sess->busy)==0 && sess->destroy_request) {
- pj_stun_session_destroy(sess);
- return PJNATH_ESTUNDESTROYED;
- }
+ pj_grp_lock_release(sess->grp_lock);
return PJ_SUCCESS;
}
@@ -1063,17 +1120,15 @@ PJ_DEF(pj_status_t) pj_stun_session_retransmit_req(pj_stun_session *sess,
PJ_ASSERT_RETURN(PJ_STUN_IS_REQUEST(tdata->msg->hdr.type), PJ_EINVAL);
/* Lock the session and prevent user from destroying us in the callback */
- pj_atomic_inc(sess->busy);
- pj_lock_acquire(sess->lock);
+ pj_grp_lock_acquire(sess->grp_lock);
+ if (sess->is_destroying) {
+ pj_grp_lock_release(sess->grp_lock);
+ return PJ_EINVALIDOP;
+ }
status = pj_stun_client_tsx_retransmit(tdata->client_tsx, mod_count);
- pj_lock_release(sess->lock);
-
- if (pj_atomic_dec_and_get(sess->busy)==0 && sess->destroy_request) {
- pj_stun_session_destroy(sess);
- return PJNATH_ESTUNDESTROYED;
- }
+ pj_grp_lock_release(sess->grp_lock);
return status;
}
@@ -1361,11 +1416,15 @@ PJ_DEF(pj_status_t) pj_stun_session_on_rx_pkt(pj_stun_session *sess,
PJ_ASSERT_RETURN(sess && packet && pkt_size, PJ_EINVAL);
- pj_log_push_indent();
-
/* Lock the session and prevent user from destroying us in the callback */
- pj_atomic_inc(sess->busy);
- pj_lock_acquire(sess->lock);
+ pj_grp_lock_acquire(sess->grp_lock);
+
+ if (sess->is_destroying) {
+ pj_grp_lock_release(sess->grp_lock);
+ return PJ_EINVALIDOP;
+ }
+
+ pj_log_push_indent();
/* Reset pool */
pj_pool_reset(sess->rx_pool);
@@ -1418,17 +1477,10 @@ PJ_DEF(pj_status_t) pj_stun_session_on_rx_pkt(pj_stun_session *sess,
}
on_return:
- pj_lock_release(sess->lock);
-
pj_log_pop_indent();
- /* If we've received destroy request while we're on the callback,
- * destroy the session now.
- */
- if (pj_atomic_dec_and_get(sess->busy)==0 && sess->destroy_request) {
- pj_stun_session_destroy(sess);
- return PJNATH_ESTUNDESTROYED;
- }
+ if (pj_grp_lock_release(sess->grp_lock))
+ return PJ_EGONE;
return status;
}
diff --git a/pjnath/src/pjnath/stun_sock.c b/pjnath/src/pjnath/stun_sock.c
index 3f756f50..bc395411 100644
--- a/pjnath/src/pjnath/stun_sock.c
+++ b/pjnath/src/pjnath/stun_sock.c
@@ -28,9 +28,15 @@
#include <pj/assert.h>
#include <pj/ip_helper.h>
#include <pj/log.h>
+#include <pj/os.h>
#include <pj/pool.h>
#include <pj/rand.h>
+#if 1
+# define TRACE_(x) PJ_LOG(5,x)
+#else
+# define TRACE_(x)
+#endif
enum { MAX_BIND_RETRY = 100 };
@@ -39,7 +45,7 @@ struct pj_stun_sock
char *obj_name; /* Log identification */
pj_pool_t *pool; /* Pool */
void *user_data; /* Application user data */
-
+ pj_bool_t is_destroying; /* Destroy already called */
int af; /* Address family */
pj_stun_config stun_cfg; /* STUN config (ioqueue etc)*/
pj_stun_sock_cb cb; /* Application callbacks */
@@ -58,13 +64,16 @@ struct pj_stun_sock
pj_uint16_t tsx_id[6]; /* .. to match STUN msg */
pj_stun_session *stun_sess; /* STUN session */
-
+ pj_grp_lock_t *grp_lock; /* Session group lock */
};
/*
* Prototypes for static functions
*/
+/* Destructor for group lock */
+static void stun_sock_destructor(void *obj);
+
/* This callback is called by the STUN session to send packet */
static pj_status_t sess_on_send_msg(pj_stun_session *sess,
void *token,
@@ -202,6 +211,20 @@ PJ_DEF(pj_status_t) pj_stun_sock_create( pj_stun_config *stun_cfg,
if (stun_sock->ka_interval == 0)
stun_sock->ka_interval = PJ_STUN_KEEP_ALIVE_SEC;
+ if (cfg && cfg->grp_lock) {
+ stun_sock->grp_lock = cfg->grp_lock;
+ } else {
+ status = pj_grp_lock_create(pool, NULL, &stun_sock->grp_lock);
+ if (status != PJ_SUCCESS) {
+ pj_pool_release(pool);
+ return status;
+ }
+ }
+
+ pj_grp_lock_add_ref(stun_sock->grp_lock);
+ pj_grp_lock_add_handler(stun_sock->grp_lock, pool, stun_sock,
+ &stun_sock_destructor);
+
/* Create socket and bind socket */
status = pj_sock_socket(af, pj_SOCK_DGRAM(), 0, &stun_sock->sock_fd);
if (status != PJ_SUCCESS)
@@ -252,6 +275,7 @@ PJ_DEF(pj_status_t) pj_stun_sock_create( pj_stun_config *stun_cfg,
pj_activesock_cb activesock_cb;
pj_activesock_cfg_default(&activesock_cfg);
+ activesock_cfg.grp_lock = stun_sock->grp_lock;
activesock_cfg.async_cnt = cfg->async_cnt;
activesock_cfg.concurrency = 0;
@@ -290,6 +314,7 @@ PJ_DEF(pj_status_t) pj_stun_sock_create( pj_stun_config *stun_cfg,
status = pj_stun_session_create(&stun_sock->stun_cfg,
stun_sock->obj_name,
&sess_cb, PJ_FALSE,
+ stun_sock->grp_lock,
&stun_sock->stun_sess);
if (status != PJ_SUCCESS)
goto on_error;
@@ -332,6 +357,8 @@ PJ_DEF(pj_status_t) pj_stun_sock_start( pj_stun_sock *stun_sock,
PJ_ASSERT_RETURN(stun_sock && domain && default_port, PJ_EINVAL);
+ pj_grp_lock_acquire(stun_sock->grp_lock);
+
/* Check whether the domain contains IP address */
stun_sock->srv_addr.addr.sa_family = (pj_uint16_t)stun_sock->af;
status = pj_inet_pton(stun_sock->af, domain,
@@ -360,7 +387,6 @@ PJ_DEF(pj_status_t) pj_stun_sock_start( pj_stun_sock *stun_sock,
&stun_sock->q);
/* Processing will resume when the DNS SRV callback is called */
- return status;
} else {
@@ -378,53 +404,70 @@ PJ_DEF(pj_status_t) pj_stun_sock_start( pj_stun_sock *stun_sock,
pj_sockaddr_set_port(&stun_sock->srv_addr, (pj_uint16_t)default_port);
/* Start sending Binding request */
- return get_mapped_addr(stun_sock);
+ status = get_mapped_addr(stun_sock);
}
+
+ pj_grp_lock_release(stun_sock->grp_lock);
+ return status;
}
-/* Destroy */
-PJ_DEF(pj_status_t) pj_stun_sock_destroy(pj_stun_sock *stun_sock)
+/* Destructor */
+static void stun_sock_destructor(void *obj)
{
+ pj_stun_sock *stun_sock = (pj_stun_sock*)obj;
+
if (stun_sock->q) {
pj_dns_srv_cancel_query(stun_sock->q, PJ_FALSE);
stun_sock->q = NULL;
}
+ /*
if (stun_sock->stun_sess) {
- pj_stun_session_set_user_data(stun_sock->stun_sess, NULL);
+ pj_stun_session_destroy(stun_sock->stun_sess);
+ stun_sock->stun_sess = NULL;
}
-
- /* Destroy the active socket first just in case we'll get
- * stray callback.
- */
+ */
+
+ if (stun_sock->pool) {
+ pj_pool_t *pool = stun_sock->pool;
+ stun_sock->pool = NULL;
+ pj_pool_release(pool);
+ }
+
+ TRACE_(("", "STUN sock %p destroyed", stun_sock));
+
+}
+
+/* Destroy */
+PJ_DEF(pj_status_t) pj_stun_sock_destroy(pj_stun_sock *stun_sock)
+{
+ TRACE_((stun_sock->obj_name, "STUN sock %p request, ref_cnt=%d",
+ stun_sock, pj_grp_lock_get_ref(stun_sock->grp_lock)));
+
+ pj_grp_lock_acquire(stun_sock->grp_lock);
+ if (stun_sock->is_destroying) {
+ /* Destroy already called */
+ pj_grp_lock_release(stun_sock->grp_lock);
+ return PJ_EINVALIDOP;
+ }
+
+ stun_sock->is_destroying = PJ_TRUE;
+ pj_timer_heap_cancel_if_active(stun_sock->stun_cfg.timer_heap,
+ &stun_sock->ka_timer, 0);
+
if (stun_sock->active_sock != NULL) {
- pj_activesock_t *asock = stun_sock->active_sock;
- stun_sock->active_sock = NULL;
stun_sock->sock_fd = PJ_INVALID_SOCKET;
- pj_activesock_set_user_data(asock, NULL);
- pj_activesock_close(asock);
+ pj_activesock_close(stun_sock->active_sock);
} else if (stun_sock->sock_fd != PJ_INVALID_SOCKET) {
pj_sock_close(stun_sock->sock_fd);
stun_sock->sock_fd = PJ_INVALID_SOCKET;
}
- if (stun_sock->ka_timer.id != 0) {
- pj_timer_heap_cancel(stun_sock->stun_cfg.timer_heap,
- &stun_sock->ka_timer);
- stun_sock->ka_timer.id = 0;
- }
-
if (stun_sock->stun_sess) {
pj_stun_session_destroy(stun_sock->stun_sess);
- stun_sock->stun_sess = NULL;
}
-
- if (stun_sock->pool) {
- pj_pool_t *pool = stun_sock->pool;
- stun_sock->pool = NULL;
- pj_pool_release(pool);
- }
-
+ pj_grp_lock_dec_ref(stun_sock->grp_lock);
+ pj_grp_lock_release(stun_sock->grp_lock);
return PJ_SUCCESS;
}
@@ -468,12 +511,15 @@ static void dns_srv_resolver_cb(void *user_data,
{
pj_stun_sock *stun_sock = (pj_stun_sock*) user_data;
+ pj_grp_lock_acquire(stun_sock->grp_lock);
+
/* Clear query */
stun_sock->q = NULL;
/* Handle error */
if (status != PJ_SUCCESS) {
sess_fail(stun_sock, PJ_STUN_SOCK_DNS_OP, status);
+ pj_grp_lock_release(stun_sock->grp_lock);
return;
}
@@ -490,6 +536,8 @@ static void dns_srv_resolver_cb(void *user_data,
/* Start sending Binding request */
get_mapped_addr(stun_sock);
+
+ pj_grp_lock_release(stun_sock->grp_lock);
}
@@ -533,6 +581,8 @@ PJ_DEF(pj_status_t) pj_stun_sock_get_info( pj_stun_sock *stun_sock,
PJ_ASSERT_RETURN(stun_sock && info, PJ_EINVAL);
+ pj_grp_lock_acquire(stun_sock->grp_lock);
+
/* Copy STUN server address and mapped address */
pj_memcpy(&info->srv_addr, &stun_sock->srv_addr,
sizeof(pj_sockaddr));
@@ -543,8 +593,10 @@ PJ_DEF(pj_status_t) pj_stun_sock_get_info( pj_stun_sock *stun_sock,
addr_len = sizeof(info->bound_addr);
status = pj_sock_getsockname(stun_sock->sock_fd, &info->bound_addr,
&addr_len);
- if (status != PJ_SUCCESS)
+ if (status != PJ_SUCCESS) {
+ pj_grp_lock_release(stun_sock->grp_lock);
return status;
+ }
/* If socket is bound to a specific interface, then only put that
* interface in the alias list. Otherwise query all the interfaces
@@ -560,8 +612,10 @@ PJ_DEF(pj_status_t) pj_stun_sock_get_info( pj_stun_sock *stun_sock,
/* Get the default address */
status = pj_gethostip(stun_sock->af, &def_addr);
- if (status != PJ_SUCCESS)
+ if (status != PJ_SUCCESS) {
+ pj_grp_lock_release(stun_sock->grp_lock);
return status;
+ }
pj_sockaddr_set_port(&def_addr, port);
@@ -569,8 +623,10 @@ PJ_DEF(pj_status_t) pj_stun_sock_get_info( pj_stun_sock *stun_sock,
info->alias_cnt = PJ_ARRAY_SIZE(info->aliases);
status = pj_enum_ip_interface(stun_sock->af, &info->alias_cnt,
info->aliases);
- if (status != PJ_SUCCESS)
+ if (status != PJ_SUCCESS) {
+ pj_grp_lock_release(stun_sock->grp_lock);
return status;
+ }
/* Set the port number for each address.
*/
@@ -590,6 +646,7 @@ PJ_DEF(pj_status_t) pj_stun_sock_get_info( pj_stun_sock *stun_sock,
}
}
+ pj_grp_lock_release(stun_sock->grp_lock);
return PJ_SUCCESS;
}
@@ -603,14 +660,29 @@ PJ_DEF(pj_status_t) pj_stun_sock_sendto( pj_stun_sock *stun_sock,
unsigned addr_len)
{
pj_ssize_t size;
+ pj_status_t status;
+
PJ_ASSERT_RETURN(stun_sock && pkt && dst_addr && addr_len, PJ_EINVAL);
+ pj_grp_lock_acquire(stun_sock->grp_lock);
+
+ if (!stun_sock->active_sock) {
+ /* We have been shutdown, but this callback may still get called
+ * by retransmit timer.
+ */
+ pj_grp_lock_release(stun_sock->grp_lock);
+ return PJ_EINVALIDOP;
+ }
+
if (send_key==NULL)
send_key = &stun_sock->send_key;
size = pkt_len;
- return pj_activesock_sendto(stun_sock->active_sock, send_key,
- pkt, &size, flag, dst_addr, addr_len);
+ status = pj_activesock_sendto(stun_sock->active_sock, send_key,
+ pkt, &size, flag, dst_addr, addr_len);
+
+ pj_grp_lock_release(stun_sock->grp_lock);
+ return status;
}
/* This callback is called by the STUN session to send packet */
@@ -625,14 +697,18 @@ static pj_status_t sess_on_send_msg(pj_stun_session *sess,
pj_ssize_t size;
stun_sock = (pj_stun_sock *) pj_stun_session_get_user_data(sess);
- if (!stun_sock || !stun_sock->active_sock)
+ if (!stun_sock || !stun_sock->active_sock) {
+ /* We have been shutdown, but this callback may still get called
+ * by retransmit timer.
+ */
return PJ_EINVALIDOP;
+ }
pj_assert(token==INTERNAL_MSG_TOKEN);
PJ_UNUSED_ARG(token);
size = pkt_size;
- return pj_activesock_sendto(stun_sock->active_sock,
+ return pj_activesock_sendto(stun_sock->active_sock,
&stun_sock->int_send_key,
pkt, &size, 0, dst_addr, addr_len);
}
@@ -726,25 +802,20 @@ on_return:
/* Schedule keep-alive timer */
static void start_ka_timer(pj_stun_sock *stun_sock)
{
- if (stun_sock->ka_timer.id != 0) {
- pj_timer_heap_cancel(stun_sock->stun_cfg.timer_heap,
- &stun_sock->ka_timer);
- stun_sock->ka_timer.id = 0;
- }
+ pj_timer_heap_cancel_if_active(stun_sock->stun_cfg.timer_heap,
+ &stun_sock->ka_timer, 0);
pj_assert(stun_sock->ka_interval != 0);
- if (stun_sock->ka_interval > 0) {
+ if (stun_sock->ka_interval > 0 && !stun_sock->is_destroying) {
pj_time_val delay;
delay.sec = stun_sock->ka_interval;
delay.msec = 0;
- if (pj_timer_heap_schedule(stun_sock->stun_cfg.timer_heap,
- &stun_sock->ka_timer,
- &delay) == PJ_SUCCESS)
- {
- stun_sock->ka_timer.id = PJ_TRUE;
- }
+ pj_timer_heap_schedule_w_grp_lock(stun_sock->stun_cfg.timer_heap,
+ &stun_sock->ka_timer,
+ &delay, PJ_TRUE,
+ stun_sock->grp_lock);
}
}
@@ -756,14 +827,18 @@ static void ka_timer_cb(pj_timer_heap_t *th, pj_timer_entry *te)
stun_sock = (pj_stun_sock *) te->user_data;
PJ_UNUSED_ARG(th);
+ pj_grp_lock_acquire(stun_sock->grp_lock);
/* Time to send STUN Binding request */
- if (get_mapped_addr(stun_sock) != PJ_SUCCESS)
+ if (get_mapped_addr(stun_sock) != PJ_SUCCESS) {
+ pj_grp_lock_release(stun_sock->grp_lock);
return;
+ }
/* Next keep-alive timer will be scheduled once the request
* is complete.
*/
+ pj_grp_lock_release(stun_sock->grp_lock);
}
/* Callback from active socket when incoming packet is received */
@@ -788,6 +863,8 @@ static pj_bool_t on_data_recvfrom(pj_activesock_t *asock,
return PJ_TRUE;
}
+ pj_grp_lock_acquire(stun_sock->grp_lock);
+
/* Check that this is STUN message */
status = pj_stun_msg_check((const pj_uint8_t*)data, size,
PJ_STUN_IS_DATAGRAM | PJ_STUN_CHECK_PACKET);
@@ -823,7 +900,10 @@ static pj_bool_t on_data_recvfrom(pj_activesock_t *asock,
status = pj_stun_session_on_rx_pkt(stun_sock->stun_sess, data, size,
PJ_STUN_IS_DATAGRAM, NULL, NULL,
src_addr, addr_len);
- return status!=PJNATH_ESTUNDESTROYED ? PJ_TRUE : PJ_FALSE;
+
+ status = pj_grp_lock_release(stun_sock->grp_lock);
+
+ return status!=PJ_EGONE ? PJ_TRUE : PJ_FALSE;
process_app_data:
if (stun_sock->cb.on_rx_data) {
@@ -831,10 +911,12 @@ process_app_data:
ret = (*stun_sock->cb.on_rx_data)(stun_sock, data, size,
src_addr, addr_len);
- return ret;
+ status = pj_grp_lock_release(stun_sock->grp_lock);
+ return status!=PJ_EGONE ? PJ_TRUE : PJ_FALSE;
}
- return PJ_TRUE;
+ status = pj_grp_lock_release(stun_sock->grp_lock);
+ return status!=PJ_EGONE ? PJ_TRUE : PJ_FALSE;
}
/* Callback from active socket about send status */
@@ -857,6 +939,8 @@ static pj_bool_t on_data_sent(pj_activesock_t *asock,
if (stun_sock->cb.on_data_sent) {
pj_bool_t ret;
+ pj_grp_lock_acquire(stun_sock->grp_lock);
+
/* If app gives NULL send_key in sendto() function, then give
* NULL in the callback too
*/
@@ -866,6 +950,7 @@ static pj_bool_t on_data_sent(pj_activesock_t *asock,
/* Call callback */
ret = (*stun_sock->cb.on_data_sent)(stun_sock, send_key, sent);
+ pj_grp_lock_release(stun_sock->grp_lock);
return ret;
}
diff --git a/pjnath/src/pjnath/stun_transaction.c b/pjnath/src/pjnath/stun_transaction.c
index 8677d322..390f67f7 100644
--- a/pjnath/src/pjnath/stun_transaction.c
+++ b/pjnath/src/pjnath/stun_transaction.c
@@ -26,6 +26,8 @@
#include <pj/timer.h>
+#define THIS_FILE "stun_transaction.c"
+#define TIMER_INACTIVE 0
#define TIMER_ACTIVE 1
@@ -34,6 +36,7 @@ struct pj_stun_client_tsx
char obj_name[PJ_MAX_OBJ_NAME];
pj_stun_tsx_cb cb;
void *user_data;
+ pj_grp_lock_t *grp_lock;
pj_bool_t complete;
@@ -51,18 +54,24 @@ struct pj_stun_client_tsx
};
+#if 1
+# define TRACE_(expr) PJ_LOG(5,expr)
+#else
+# define TRACE_(expr)
+#endif
+
+
static void retransmit_timer_callback(pj_timer_heap_t *timer_heap,
pj_timer_entry *timer);
static void destroy_timer_callback(pj_timer_heap_t *timer_heap,
pj_timer_entry *timer);
-#define stun_perror(tsx,msg,rc) pjnath_perror(tsx->obj_name, msg, rc)
-
/*
* Create a STUN client transaction.
*/
PJ_DEF(pj_status_t) pj_stun_client_tsx_create(pj_stun_config *cfg,
pj_pool_t *pool,
+ pj_grp_lock_t *grp_lock,
const pj_stun_tsx_cb *cb,
pj_stun_client_tsx **p_tsx)
{
@@ -74,6 +83,7 @@ PJ_DEF(pj_status_t) pj_stun_client_tsx_create(pj_stun_config *cfg,
tsx = PJ_POOL_ZALLOC_T(pool, pj_stun_client_tsx);
tsx->rto_msec = cfg->rto_msec;
tsx->timer_heap = cfg->timer_heap;
+ tsx->grp_lock = grp_lock;
pj_memcpy(&tsx->cb, cb, sizeof(*cb));
tsx->retransmit_timer.cb = &retransmit_timer_callback;
@@ -82,7 +92,7 @@ PJ_DEF(pj_status_t) pj_stun_client_tsx_create(pj_stun_config *cfg,
tsx->destroy_timer.cb = &destroy_timer_callback;
tsx->destroy_timer.user_data = tsx;
- pj_ansi_snprintf(tsx->obj_name, sizeof(tsx->obj_name), "stuntsx%p", tsx);
+ pj_ansi_snprintf(tsx->obj_name, sizeof(tsx->obj_name), "utsx%p", tsx);
*p_tsx = tsx;
@@ -100,26 +110,30 @@ PJ_DEF(pj_status_t) pj_stun_client_tsx_schedule_destroy(
PJ_ASSERT_RETURN(tsx && delay, PJ_EINVAL);
PJ_ASSERT_RETURN(tsx->cb.on_destroy, PJ_EINVAL);
+ pj_grp_lock_acquire(tsx->grp_lock);
+
/* Cancel previously registered timer */
- if (tsx->destroy_timer.id != 0) {
- pj_timer_heap_cancel(tsx->timer_heap, &tsx->destroy_timer);
- tsx->destroy_timer.id = 0;
- }
+ pj_timer_heap_cancel_if_active(tsx->timer_heap, &tsx->destroy_timer,
+ TIMER_INACTIVE);
/* Stop retransmission, just in case */
- if (tsx->retransmit_timer.id != 0) {
- pj_timer_heap_cancel(tsx->timer_heap, &tsx->retransmit_timer);
- tsx->retransmit_timer.id = 0;
- }
+ pj_timer_heap_cancel_if_active(tsx->timer_heap, &tsx->retransmit_timer,
+ TIMER_INACTIVE);
- status = pj_timer_heap_schedule(tsx->timer_heap,
- &tsx->destroy_timer, delay);
- if (status != PJ_SUCCESS)
+ status = pj_timer_heap_schedule_w_grp_lock(tsx->timer_heap,
+ &tsx->destroy_timer, delay,
+ TIMER_ACTIVE, tsx->grp_lock);
+ if (status != PJ_SUCCESS) {
+ pj_grp_lock_release(tsx->grp_lock);
return status;
+ }
- tsx->destroy_timer.id = TIMER_ACTIVE;
tsx->cb.on_complete = NULL;
+ pj_grp_lock_release(tsx->grp_lock);
+
+ TRACE_((tsx->obj_name, "STUN transaction %p schedule destroy", tsx));
+
return PJ_SUCCESS;
}
@@ -127,20 +141,21 @@ PJ_DEF(pj_status_t) pj_stun_client_tsx_schedule_destroy(
/*
* Destroy transaction immediately.
*/
-PJ_DEF(pj_status_t) pj_stun_client_tsx_destroy(pj_stun_client_tsx *tsx)
+PJ_DEF(pj_status_t) pj_stun_client_tsx_stop(pj_stun_client_tsx *tsx)
{
PJ_ASSERT_RETURN(tsx, PJ_EINVAL);
- if (tsx->retransmit_timer.id != 0) {
- pj_timer_heap_cancel(tsx->timer_heap, &tsx->retransmit_timer);
- tsx->retransmit_timer.id = 0;
- }
- if (tsx->destroy_timer.id != 0) {
- pj_timer_heap_cancel(tsx->timer_heap, &tsx->destroy_timer);
- tsx->destroy_timer.id = 0;
- }
+ /* Don't call grp_lock_acquire() because we might be called on
+ * group lock's destructor.
+ */
+ pj_timer_heap_cancel_if_active(tsx->timer_heap, &tsx->retransmit_timer,
+ TIMER_INACTIVE);
+ pj_timer_heap_cancel_if_active(tsx->timer_heap, &tsx->destroy_timer,
+ TIMER_INACTIVE);
+
+ PJ_LOG(5,(tsx->obj_name, "STUN client transaction %p stopped, ref_cnt=%d",
+ tsx, pj_grp_lock_get_ref(tsx->grp_lock)));
- PJ_LOG(5,(tsx->obj_name, "STUN client transaction destroyed"));
return PJ_SUCCESS;
}
@@ -185,7 +200,7 @@ static pj_status_t tsx_transmit_msg(pj_stun_client_tsx *tsx,
{
pj_status_t status;
- PJ_ASSERT_RETURN(tsx->retransmit_timer.id == 0 ||
+ PJ_ASSERT_RETURN(tsx->retransmit_timer.id == TIMER_INACTIVE ||
!tsx->require_retransmit, PJ_EBUSY);
if (tsx->require_retransmit && mod_count) {
@@ -211,14 +226,15 @@ static pj_status_t tsx_transmit_msg(pj_stun_client_tsx *tsx,
* cancel it (as opposed to when schedule_timer() failed we cannot
* cancel transmission).
*/;
- status = pj_timer_heap_schedule(tsx->timer_heap,
- &tsx->retransmit_timer,
- &tsx->retransmit_time);
+ status = pj_timer_heap_schedule_w_grp_lock(tsx->timer_heap,
+ &tsx->retransmit_timer,
+ &tsx->retransmit_time,
+ TIMER_ACTIVE,
+ tsx->grp_lock);
if (status != PJ_SUCCESS) {
- tsx->retransmit_timer.id = 0;
+ tsx->retransmit_timer.id = TIMER_INACTIVE;
return status;
}
- tsx->retransmit_timer.id = TIMER_ACTIVE;
}
@@ -235,12 +251,12 @@ static pj_status_t tsx_transmit_msg(pj_stun_client_tsx *tsx,
if (status == PJNATH_ESTUNDESTROYED) {
/* We've been destroyed, don't access the object. */
} else if (status != PJ_SUCCESS) {
- if (tsx->retransmit_timer.id != 0 && mod_count) {
- pj_timer_heap_cancel(tsx->timer_heap,
- &tsx->retransmit_timer);
- tsx->retransmit_timer.id = 0;
+ if (mod_count) {
+ pj_timer_heap_cancel_if_active( tsx->timer_heap,
+ &tsx->retransmit_timer,
+ TIMER_INACTIVE);
}
- stun_perror(tsx, "STUN error sending message", status);
+ PJ_PERROR(4, (tsx->obj_name, status, "STUN error sending message"));
}
pj_log_pop_indent();
@@ -261,6 +277,8 @@ PJ_DEF(pj_status_t) pj_stun_client_tsx_send_msg(pj_stun_client_tsx *tsx,
PJ_ASSERT_RETURN(tsx && pkt && pkt_len, PJ_EINVAL);
PJ_ASSERT_RETURN(tsx->retransmit_timer.id == 0, PJ_EBUSY);
+ pj_grp_lock_acquire(tsx->grp_lock);
+
/* Encode message */
tsx->last_pkt = pkt;
tsx->last_pkt_size = pkt_len;
@@ -286,27 +304,29 @@ PJ_DEF(pj_status_t) pj_stun_client_tsx_send_msg(pj_stun_client_tsx *tsx,
* cancel it (as opposed to when schedule_timer() failed we cannot
* cancel transmission).
*/;
- status = pj_timer_heap_schedule(tsx->timer_heap,
- &tsx->retransmit_timer,
- &tsx->retransmit_time);
+ status = pj_timer_heap_schedule_w_grp_lock(tsx->timer_heap,
+ &tsx->retransmit_timer,
+ &tsx->retransmit_time,
+ TIMER_ACTIVE,
+ tsx->grp_lock);
if (status != PJ_SUCCESS) {
- tsx->retransmit_timer.id = 0;
+ tsx->retransmit_timer.id = TIMER_INACTIVE;
+ pj_grp_lock_release(tsx->grp_lock);
return status;
}
- tsx->retransmit_timer.id = TIMER_ACTIVE;
}
/* Send the message */
status = tsx_transmit_msg(tsx, PJ_TRUE);
if (status != PJ_SUCCESS) {
- if (tsx->retransmit_timer.id != 0) {
- pj_timer_heap_cancel(tsx->timer_heap,
- &tsx->retransmit_timer);
- tsx->retransmit_timer.id = 0;
- }
+ pj_timer_heap_cancel_if_active(tsx->timer_heap,
+ &tsx->retransmit_timer,
+ TIMER_INACTIVE);
+ pj_grp_lock_release(tsx->grp_lock);
return status;
}
+ pj_grp_lock_release(tsx->grp_lock);
return PJ_SUCCESS;
}
@@ -319,6 +339,7 @@ static void retransmit_timer_callback(pj_timer_heap_t *timer_heap,
pj_status_t status;
PJ_UNUSED_ARG(timer_heap);
+ pj_grp_lock_acquire(tsx->grp_lock);
if (tsx->transmit_count >= PJ_STUN_MAX_TRANSMIT_COUNT) {
/* Retransmission count exceeded. Transaction has failed */
@@ -331,6 +352,7 @@ static void retransmit_timer_callback(pj_timer_heap_t *timer_heap,
tsx->cb.on_complete(tsx, PJNATH_ESTUNTIMEDOUT, NULL, NULL, 0);
}
}
+ pj_grp_lock_release(tsx->grp_lock);
/* We might have been destroyed, don't try to access the object */
pj_log_pop_indent();
return;
@@ -338,9 +360,7 @@ static void retransmit_timer_callback(pj_timer_heap_t *timer_heap,
tsx->retransmit_timer.id = 0;
status = tsx_transmit_msg(tsx, PJ_TRUE);
- if (status == PJNATH_ESTUNDESTROYED) {
- /* We've been destroyed, don't try to access the object */
- } else if (status != PJ_SUCCESS) {
+ if (status != PJ_SUCCESS) {
tsx->retransmit_timer.id = 0;
if (!tsx->complete) {
tsx->complete = PJ_TRUE;
@@ -348,8 +368,10 @@ static void retransmit_timer_callback(pj_timer_heap_t *timer_heap,
tsx->cb.on_complete(tsx, status, NULL, NULL, 0);
}
}
- /* We might have been destroyed, don't try to access the object */
}
+
+ pj_grp_lock_release(tsx->grp_lock);
+ /* We might have been destroyed, don't try to access the object */
}
/*
@@ -362,10 +384,8 @@ PJ_DEF(pj_status_t) pj_stun_client_tsx_retransmit(pj_stun_client_tsx *tsx,
return PJ_SUCCESS;
}
- if (tsx->retransmit_timer.id != 0) {
- pj_timer_heap_cancel(tsx->timer_heap, &tsx->retransmit_timer);
- tsx->retransmit_timer.id = 0;
- }
+ pj_timer_heap_cancel_if_active(tsx->timer_heap, &tsx->retransmit_timer,
+ TIMER_INACTIVE);
return tsx_transmit_msg(tsx, mod_count);
}
@@ -379,6 +399,7 @@ static void destroy_timer_callback(pj_timer_heap_t *timer_heap,
PJ_UNUSED_ARG(timer_heap);
tsx->destroy_timer.id = PJ_FALSE;
+
tsx->cb.on_destroy(tsx);
/* Don't access transaction after this */
}
@@ -408,10 +429,8 @@ PJ_DEF(pj_status_t) pj_stun_client_tsx_on_rx_msg(pj_stun_client_tsx *tsx,
/* We have a response with matching transaction ID.
* We can cancel retransmit timer now.
*/
- if (tsx->retransmit_timer.id) {
- pj_timer_heap_cancel(tsx->timer_heap, &tsx->retransmit_timer);
- tsx->retransmit_timer.id = 0;
- }
+ pj_timer_heap_cancel_if_active(tsx->timer_heap, &tsx->retransmit_timer,
+ TIMER_INACTIVE);
/* Find STUN error code attribute */
err_attr = (pj_stun_errcode_attr*)
diff --git a/pjnath/src/pjnath/turn_session.c b/pjnath/src/pjnath/turn_session.c
index ea89e304..e783912b 100644
--- a/pjnath/src/pjnath/turn_session.c
+++ b/pjnath/src/pjnath/turn_session.c
@@ -112,8 +112,9 @@ struct pj_turn_session
pj_turn_session_cb cb;
void *user_data;
pj_stun_config stun_cfg;
+ pj_bool_t is_destroying;
- pj_lock_t *lock;
+ pj_grp_lock_t *grp_lock;
int busy;
pj_turn_state_t state;
@@ -161,6 +162,7 @@ struct pj_turn_session
*/
static void sess_shutdown(pj_turn_session *sess,
pj_status_t status);
+static void turn_sess_on_destroy(void *comp);
static void do_destroy(pj_turn_session *sess);
static void send_refresh(pj_turn_session *sess, int lifetime);
static pj_status_t stun_on_send_msg(pj_stun_session *sess,
@@ -236,6 +238,7 @@ PJ_DEF(pj_status_t) pj_turn_session_create( const pj_stun_config *cfg,
const char *name,
int af,
pj_turn_tp_type conn_type,
+ pj_grp_lock_t *grp_lock,
const pj_turn_session_cb *cb,
unsigned options,
void *user_data,
@@ -244,7 +247,6 @@ PJ_DEF(pj_status_t) pj_turn_session_create( const pj_stun_config *cfg,
pj_pool_t *pool;
pj_turn_session *sess;
pj_stun_session_cb stun_cb;
- pj_lock_t *null_lock;
pj_status_t status;
PJ_ASSERT_RETURN(cfg && cfg->pf && cb && p_sess, PJ_EINVAL);
@@ -281,13 +283,20 @@ PJ_DEF(pj_status_t) pj_turn_session_create( const pj_stun_config *cfg,
sess->perm_table = pj_hash_create(pool, PJ_TURN_PERM_HTABLE_SIZE);
/* Session lock */
- status = pj_lock_create_recursive_mutex(pool, sess->obj_name,
- &sess->lock);
- if (status != PJ_SUCCESS) {
- do_destroy(sess);
- return status;
+ if (grp_lock) {
+ sess->grp_lock = grp_lock;
+ } else {
+ status = pj_grp_lock_create(pool, NULL, &sess->grp_lock);
+ if (status != PJ_SUCCESS) {
+ pj_pool_release(pool);
+ return status;
+ }
}
+ pj_grp_lock_add_ref(sess->grp_lock);
+ pj_grp_lock_add_handler(sess->grp_lock, pool, sess,
+ &turn_sess_on_destroy);
+
/* Timer */
pj_timer_entry_init(&sess->timer, TIMER_NONE, sess, &on_timer_event);
@@ -297,7 +306,7 @@ PJ_DEF(pj_status_t) pj_turn_session_create( const pj_stun_config *cfg,
stun_cb.on_request_complete = &stun_on_request_complete;
stun_cb.on_rx_indication = &stun_on_rx_indication;
status = pj_stun_session_create(&sess->stun_cfg, sess->obj_name, &stun_cb,
- PJ_FALSE, &sess->stun);
+ PJ_FALSE, sess->grp_lock, &sess->stun);
if (status != PJ_SUCCESS) {
do_destroy(sess);
return status;
@@ -306,16 +315,6 @@ PJ_DEF(pj_status_t) pj_turn_session_create( const pj_stun_config *cfg,
/* Attach ourself to STUN session */
pj_stun_session_set_user_data(sess->stun, sess);
- /* Replace mutex in STUN session with a NULL mutex, since access to
- * STUN session is serialized.
- */
- status = pj_lock_create_null_mutex(pool, name, &null_lock);
- if (status != PJ_SUCCESS) {
- do_destroy(sess);
- return status;
- }
- pj_stun_session_set_lock(sess->stun, null_lock, PJ_TRUE);
-
/* Done */
PJ_LOG(4,(sess->obj_name, "TURN client session created"));
@@ -325,32 +324,9 @@ PJ_DEF(pj_status_t) pj_turn_session_create( const pj_stun_config *cfg,
}
-/* Destroy */
-static void do_destroy(pj_turn_session *sess)
+static void turn_sess_on_destroy(void *comp)
{
- /* Lock session */
- if (sess->lock) {
- pj_lock_acquire(sess->lock);
- }
-
- /* Cancel pending timer, if any */
- if (sess->timer.id != TIMER_NONE) {
- pj_timer_heap_cancel(sess->timer_heap, &sess->timer);
- sess->timer.id = TIMER_NONE;
- }
-
- /* Destroy STUN session */
- if (sess->stun) {
- pj_stun_session_destroy(sess->stun);
- sess->stun = NULL;
- }
-
- /* Destroy lock */
- if (sess->lock) {
- pj_lock_release(sess->lock);
- pj_lock_destroy(sess->lock);
- sess->lock = NULL;
- }
+ pj_turn_session *sess = (pj_turn_session*) comp;
/* Destroy pool */
if (sess->pool) {
@@ -363,6 +339,26 @@ static void do_destroy(pj_turn_session *sess)
}
}
+/* Destroy */
+static void do_destroy(pj_turn_session *sess)
+{
+ PJ_LOG(4,(sess->obj_name, "TURN session destroy request, ref_cnt=%d",
+ pj_grp_lock_get_ref(sess->grp_lock)));
+
+ pj_grp_lock_acquire(sess->grp_lock);
+ if (sess->is_destroying) {
+ pj_grp_lock_release(sess->grp_lock);
+ return;
+ }
+
+ sess->is_destroying = PJ_TRUE;
+ pj_timer_heap_cancel_if_active(sess->timer_heap, &sess->timer, TIMER_NONE);
+ pj_stun_session_destroy(sess->stun);
+
+ pj_grp_lock_dec_ref(sess->grp_lock);
+ pj_grp_lock_release(sess->grp_lock);
+}
+
/* Set session state */
static void set_state(pj_turn_session *sess, enum pj_turn_state_t state)
@@ -437,13 +433,11 @@ static void sess_shutdown(pj_turn_session *sess,
set_state(sess, PJ_TURN_STATE_DESTROYING);
- if (sess->timer.id != TIMER_NONE) {
- pj_timer_heap_cancel(sess->timer_heap, &sess->timer);
- sess->timer.id = TIMER_NONE;
- }
-
- sess->timer.id = TIMER_DESTROY;
- pj_timer_heap_schedule(sess->timer_heap, &sess->timer, &delay);
+ pj_timer_heap_cancel_if_active(sess->timer_heap, &sess->timer,
+ TIMER_NONE);
+ pj_timer_heap_schedule_w_grp_lock(sess->timer_heap, &sess->timer,
+ &delay, TIMER_DESTROY,
+ sess->grp_lock);
}
}
@@ -455,11 +449,11 @@ PJ_DEF(pj_status_t) pj_turn_session_shutdown(pj_turn_session *sess)
{
PJ_ASSERT_RETURN(sess, PJ_EINVAL);
- pj_lock_acquire(sess->lock);
+ pj_grp_lock_acquire(sess->grp_lock);
sess_shutdown(sess, PJ_SUCCESS);
- pj_lock_release(sess->lock);
+ pj_grp_lock_release(sess->grp_lock);
return PJ_SUCCESS;
}
@@ -553,9 +547,9 @@ PJ_DEF(pj_status_t) pj_turn_session_set_software_name( pj_turn_session *sess,
{
pj_status_t status;
- pj_lock_acquire(sess->lock);
+ pj_grp_lock_acquire(sess->grp_lock);
status = pj_stun_session_set_software_name(sess->stun, sw);
- pj_lock_release(sess->lock);
+ pj_grp_lock_release(sess->grp_lock);
return status;
}
@@ -576,7 +570,7 @@ PJ_DEF(pj_status_t) pj_turn_session_set_server( pj_turn_session *sess,
PJ_ASSERT_RETURN(sess && domain, PJ_EINVAL);
PJ_ASSERT_RETURN(sess->state == PJ_TURN_STATE_NULL, PJ_EINVALIDOP);
- pj_lock_acquire(sess->lock);
+ pj_grp_lock_acquire(sess->grp_lock);
/* See if "domain" contains just IP address */
tmp_addr.addr.sa_family = sess->af;
@@ -676,7 +670,7 @@ PJ_DEF(pj_status_t) pj_turn_session_set_server( pj_turn_session *sess,
}
on_return:
- pj_lock_release(sess->lock);
+ pj_grp_lock_release(sess->grp_lock);
return status;
}
@@ -690,11 +684,11 @@ PJ_DEF(pj_status_t) pj_turn_session_set_credential(pj_turn_session *sess,
PJ_ASSERT_RETURN(sess && cred, PJ_EINVAL);
PJ_ASSERT_RETURN(sess->stun, PJ_EINVALIDOP);
- pj_lock_acquire(sess->lock);
+ pj_grp_lock_acquire(sess->grp_lock);
pj_stun_session_set_credential(sess->stun, PJ_STUN_AUTH_LONG_TERM, cred);
- pj_lock_release(sess->lock);
+ pj_grp_lock_release(sess->grp_lock);
return PJ_SUCCESS;
}
@@ -715,7 +709,7 @@ PJ_DEF(pj_status_t) pj_turn_session_alloc(pj_turn_session *sess,
sess->state<=PJ_TURN_STATE_RESOLVED,
PJ_EINVALIDOP);
- pj_lock_acquire(sess->lock);
+ pj_grp_lock_acquire(sess->grp_lock);
if (param && param != &sess->alloc_param)
pj_turn_alloc_param_copy(sess->pool, &sess->alloc_param, param);
@@ -726,7 +720,7 @@ PJ_DEF(pj_status_t) pj_turn_session_alloc(pj_turn_session *sess,
PJ_LOG(4,(sess->obj_name, "Pending ALLOCATE in state %s",
state_names[sess->state]));
- pj_lock_release(sess->lock);
+ pj_grp_lock_release(sess->grp_lock);
return PJ_SUCCESS;
}
@@ -738,7 +732,7 @@ PJ_DEF(pj_status_t) pj_turn_session_alloc(pj_turn_session *sess,
status = pj_stun_session_create_req(sess->stun, PJ_STUN_ALLOCATE_REQUEST,
PJ_STUN_MAGIC, NULL, &tdata);
if (status != PJ_SUCCESS) {
- pj_lock_release(sess->lock);
+ pj_grp_lock_release(sess->grp_lock);
return status;
}
@@ -778,7 +772,7 @@ PJ_DEF(pj_status_t) pj_turn_session_alloc(pj_turn_session *sess,
set_state(sess, PJ_TURN_STATE_RESOLVED);
}
- pj_lock_release(sess->lock);
+ pj_grp_lock_release(sess->grp_lock);
return status;
}
@@ -799,14 +793,14 @@ PJ_DEF(pj_status_t) pj_turn_session_set_perm( pj_turn_session *sess,
PJ_ASSERT_RETURN(sess && addr_cnt && addr, PJ_EINVAL);
- pj_lock_acquire(sess->lock);
+ pj_grp_lock_acquire(sess->grp_lock);
/* Create a bare CreatePermission request */
status = pj_stun_session_create_req(sess->stun,
PJ_STUN_CREATE_PERM_REQUEST,
PJ_STUN_MAGIC, NULL, &tdata);
if (status != PJ_SUCCESS) {
- pj_lock_release(sess->lock);
+ pj_grp_lock_release(sess->grp_lock);
return status;
}
@@ -857,7 +851,7 @@ PJ_DEF(pj_status_t) pj_turn_session_set_perm( pj_turn_session *sess,
goto on_error;
}
- pj_lock_release(sess->lock);
+ pj_grp_lock_release(sess->grp_lock);
return PJ_SUCCESS;
on_error:
@@ -874,7 +868,7 @@ on_error:
if (perm->req_token == req_token)
invalidate_perm(sess, perm);
}
- pj_lock_release(sess->lock);
+ pj_grp_lock_release(sess->grp_lock);
return status;
}
@@ -945,7 +939,7 @@ PJ_DEF(pj_status_t) pj_turn_session_sendto( pj_turn_session *sess,
}
/* Lock session now */
- pj_lock_acquire(sess->lock);
+ pj_grp_lock_acquire(sess->grp_lock);
/* Lookup permission first */
perm = lookup_perm(sess, addr, pj_sockaddr_get_len(addr), PJ_FALSE);
@@ -960,7 +954,7 @@ PJ_DEF(pj_status_t) pj_turn_session_sendto( pj_turn_session *sess,
status = pj_turn_session_set_perm(sess, 1, (const pj_sockaddr*)addr,
0);
if (status != PJ_SUCCESS) {
- pj_lock_release(sess->lock);
+ pj_grp_lock_release(sess->grp_lock);
return status;
}
}
@@ -1035,7 +1029,7 @@ PJ_DEF(pj_status_t) pj_turn_session_sendto( pj_turn_session *sess,
}
on_return:
- pj_lock_release(sess->lock);
+ pj_grp_lock_release(sess->grp_lock);
return status;
}
@@ -1055,7 +1049,7 @@ PJ_DEF(pj_status_t) pj_turn_session_bind_channel(pj_turn_session *sess,
PJ_ASSERT_RETURN(sess && peer_adr && addr_len, PJ_EINVAL);
PJ_ASSERT_RETURN(sess->state == PJ_TURN_STATE_READY, PJ_EINVALIDOP);
- pj_lock_acquire(sess->lock);
+ pj_grp_lock_acquire(sess->grp_lock);
/* Create blank ChannelBind request */
status = pj_stun_session_create_req(sess->stun,
@@ -1098,7 +1092,7 @@ PJ_DEF(pj_status_t) pj_turn_session_bind_channel(pj_turn_session *sess,
tdata);
on_return:
- pj_lock_release(sess->lock);
+ pj_grp_lock_release(sess->grp_lock);
return status;
}
@@ -1121,7 +1115,7 @@ PJ_DEF(pj_status_t) pj_turn_session_on_rx_pkt(pj_turn_session *sess,
*/
/* Start locking the session */
- pj_lock_acquire(sess->lock);
+ pj_grp_lock_acquire(sess->grp_lock);
is_datagram = (sess->conn_type==PJ_TURN_TP_UDP);
@@ -1193,7 +1187,7 @@ PJ_DEF(pj_status_t) pj_turn_session_on_rx_pkt(pj_turn_session *sess,
}
on_return:
- pj_lock_release(sess->lock);
+ pj_grp_lock_release(sess->grp_lock);
return status;
}
@@ -1385,20 +1379,22 @@ static void on_allocate_success(pj_turn_session *sess,
/* Cancel existing keep-alive timer, if any */
pj_assert(sess->timer.id != TIMER_DESTROY);
-
- if (sess->timer.id != TIMER_NONE) {
- pj_timer_heap_cancel(sess->timer_heap, &sess->timer);
- sess->timer.id = TIMER_NONE;
+ if (sess->timer.id == TIMER_KEEP_ALIVE) {
+ pj_timer_heap_cancel_if_active(sess->timer_heap, &sess->timer,
+ TIMER_NONE);
}
/* Start keep-alive timer once allocation succeeds */
- timeout.sec = sess->ka_interval;
- timeout.msec = 0;
+ if (sess->state < PJ_TURN_STATE_DEALLOCATING) {
+ timeout.sec = sess->ka_interval;
+ timeout.msec = 0;
- sess->timer.id = TIMER_KEEP_ALIVE;
- pj_timer_heap_schedule(sess->timer_heap, &sess->timer, &timeout);
+ pj_timer_heap_schedule_w_grp_lock(sess->timer_heap, &sess->timer,
+ &timeout, TIMER_KEEP_ALIVE,
+ sess->grp_lock);
- set_state(sess, PJ_TURN_STATE_READY);
+ set_state(sess, PJ_TURN_STATE_READY);
+ }
}
/*
@@ -1948,7 +1944,7 @@ static void on_timer_event(pj_timer_heap_t *th, pj_timer_entry *e)
PJ_UNUSED_ARG(th);
- pj_lock_acquire(sess->lock);
+ pj_grp_lock_acquire(sess->grp_lock);
eid = (enum timer_id_t) e->id;
e->id = TIMER_NONE;
@@ -2025,19 +2021,18 @@ static void on_timer_event(pj_timer_heap_t *th, pj_timer_entry *e)
delay.sec = sess->ka_interval;
delay.msec = 0;
- sess->timer.id = TIMER_KEEP_ALIVE;
- pj_timer_heap_schedule(sess->timer_heap, &sess->timer, &delay);
+ pj_timer_heap_schedule_w_grp_lock(sess->timer_heap, &sess->timer,
+ &delay, TIMER_KEEP_ALIVE,
+ sess->grp_lock);
}
- pj_lock_release(sess->lock);
-
} else if (eid == TIMER_DESTROY) {
/* Time to destroy */
- pj_lock_release(sess->lock);
do_destroy(sess);
} else {
pj_assert(!"Unknown timer event");
- pj_lock_release(sess->lock);
}
+
+ pj_grp_lock_release(sess->grp_lock);
}
diff --git a/pjnath/src/pjnath/turn_sock.c b/pjnath/src/pjnath/turn_sock.c
index 08831a4c..6392428f 100644
--- a/pjnath/src/pjnath/turn_sock.c
+++ b/pjnath/src/pjnath/turn_sock.c
@@ -46,13 +46,13 @@ struct pj_turn_sock
pj_turn_sock_cb cb;
void *user_data;
- pj_lock_t *lock;
+ pj_bool_t is_destroying;
+ pj_grp_lock_t *grp_lock;
pj_turn_alloc_param alloc_param;
pj_stun_config cfg;
pj_turn_sock_cfg setting;
- pj_bool_t destroy_request;
pj_timer_entry timer;
int af;
@@ -93,6 +93,7 @@ static pj_bool_t on_connect_complete(pj_activesock_t *asock,
+static void turn_sock_on_destroy(void *comp);
static void destroy(pj_turn_sock *turn_sock);
static void timer_cb(pj_timer_heap_t *th, pj_timer_entry *e);
@@ -168,14 +169,21 @@ PJ_DEF(pj_status_t) pj_turn_sock_create(pj_stun_config *cfg,
pj_memcpy(&turn_sock->cb, cb, sizeof(*cb));
}
- /* Create lock */
- status = pj_lock_create_recursive_mutex(pool, turn_sock->obj_name,
- &turn_sock->lock);
- if (status != PJ_SUCCESS) {
- destroy(turn_sock);
- return status;
+ /* Session lock */
+ if (setting && setting->grp_lock) {
+ turn_sock->grp_lock = setting->grp_lock;
+ } else {
+ status = pj_grp_lock_create(pool, NULL, &turn_sock->grp_lock);
+ if (status != PJ_SUCCESS) {
+ pj_pool_release(pool);
+ return status;
+ }
}
+ pj_grp_lock_add_ref(turn_sock->grp_lock);
+ pj_grp_lock_add_handler(turn_sock->grp_lock, pool, turn_sock,
+ &turn_sock_on_destroy);
+
/* Init timer */
pj_timer_entry_init(&turn_sock->timer, TIMER_NONE, turn_sock, &timer_cb);
@@ -186,7 +194,8 @@ PJ_DEF(pj_status_t) pj_turn_sock_create(pj_stun_config *cfg,
sess_cb.on_rx_data = &turn_on_rx_data;
sess_cb.on_state = &turn_on_state;
status = pj_turn_session_create(cfg, pool->obj_name, af, conn_type,
- &sess_cb, 0, turn_sock, &turn_sock->sess);
+ turn_sock->grp_lock, &sess_cb, 0,
+ turn_sock, &turn_sock->sess);
if (status != PJ_SUCCESS) {
destroy(turn_sock);
return status;
@@ -203,42 +212,45 @@ PJ_DEF(pj_status_t) pj_turn_sock_create(pj_stun_config *cfg,
/*
* Destroy.
*/
-static void destroy(pj_turn_sock *turn_sock)
+static void turn_sock_on_destroy(void *comp)
{
- if (turn_sock->lock) {
- pj_lock_acquire(turn_sock->lock);
- }
-
- if (turn_sock->sess) {
- pj_turn_session_set_user_data(turn_sock->sess, NULL);
- pj_turn_session_shutdown(turn_sock->sess);
- turn_sock->sess = NULL;
- }
-
- if (turn_sock->active_sock) {
- pj_activesock_set_user_data(turn_sock->active_sock, NULL);
- pj_activesock_close(turn_sock->active_sock);
- turn_sock->active_sock = NULL;
- }
-
- if (turn_sock->lock) {
- pj_lock_release(turn_sock->lock);
- pj_lock_destroy(turn_sock->lock);
- turn_sock->lock = NULL;
- }
+ pj_turn_sock *turn_sock = (pj_turn_sock*) comp;
if (turn_sock->pool) {
pj_pool_t *pool = turn_sock->pool;
+ PJ_LOG(4,(turn_sock->obj_name, "TURN socket destroyed"));
turn_sock->pool = NULL;
pj_pool_release(pool);
}
}
+static void destroy(pj_turn_sock *turn_sock)
+{
+ PJ_LOG(4,(turn_sock->obj_name, "TURN socket destroy request, ref_cnt=%d",
+ pj_grp_lock_get_ref(turn_sock->grp_lock)));
+
+ pj_grp_lock_acquire(turn_sock->grp_lock);
+ if (turn_sock->is_destroying) {
+ pj_grp_lock_release(turn_sock->grp_lock);
+ return;
+ }
+
+ turn_sock->is_destroying = PJ_TRUE;
+ if (turn_sock->sess)
+ pj_turn_session_shutdown(turn_sock->sess);
+ if (turn_sock->active_sock)
+ pj_activesock_close(turn_sock->active_sock);
+ pj_grp_lock_dec_ref(turn_sock->grp_lock);
+ pj_grp_lock_release(turn_sock->grp_lock);
+}
PJ_DEF(void) pj_turn_sock_destroy(pj_turn_sock *turn_sock)
{
- pj_lock_acquire(turn_sock->lock);
- turn_sock->destroy_request = PJ_TRUE;
+ pj_grp_lock_acquire(turn_sock->grp_lock);
+ if (turn_sock->is_destroying) {
+ pj_grp_lock_release(turn_sock->grp_lock);
+ return;
+ }
if (turn_sock->sess) {
pj_turn_session_shutdown(turn_sock->sess);
@@ -246,12 +258,11 @@ PJ_DEF(void) pj_turn_sock_destroy(pj_turn_sock *turn_sock)
* session state is DESTROYING we will schedule a timer to
* destroy ourselves.
*/
- pj_lock_release(turn_sock->lock);
} else {
- pj_lock_release(turn_sock->lock);
destroy(turn_sock);
}
+ pj_grp_lock_release(turn_sock->grp_lock);
}
@@ -267,7 +278,6 @@ static void timer_cb(pj_timer_heap_t *th, pj_timer_entry *e)
switch (eid) {
case TIMER_DESTROY:
- PJ_LOG(5,(turn_sock->obj_name, "Destroying TURN"));
destroy(turn_sock);
break;
default:
@@ -337,7 +347,7 @@ PJ_DEF(pj_status_t) pj_turn_sock_get_info(pj_turn_sock *turn_sock,
*/
PJ_DEF(pj_status_t) pj_turn_sock_lock(pj_turn_sock *turn_sock)
{
- return pj_lock_acquire(turn_sock->lock);
+ return pj_grp_lock_acquire(turn_sock->grp_lock);
}
/**
@@ -345,7 +355,7 @@ PJ_DEF(pj_status_t) pj_turn_sock_lock(pj_turn_sock *turn_sock)
*/
PJ_DEF(pj_status_t) pj_turn_sock_unlock(pj_turn_sock *turn_sock)
{
- return pj_lock_release(turn_sock->lock);
+ return pj_grp_lock_release(turn_sock->grp_lock);
}
/*
@@ -381,6 +391,8 @@ PJ_DEF(pj_status_t) pj_turn_sock_alloc(pj_turn_sock *turn_sock,
PJ_ASSERT_RETURN(turn_sock && domain, PJ_EINVAL);
PJ_ASSERT_RETURN(turn_sock->sess, PJ_EINVALIDOP);
+ pj_grp_lock_acquire(turn_sock->grp_lock);
+
/* Copy alloc param. We will call session_alloc() only after the
* server address has been resolved.
*/
@@ -395,6 +407,7 @@ PJ_DEF(pj_status_t) pj_turn_sock_alloc(pj_turn_sock *turn_sock,
status = pj_turn_session_set_credential(turn_sock->sess, cred);
if (status != PJ_SUCCESS) {
sess_fail(turn_sock, "Error setting credential", status);
+ pj_grp_lock_release(turn_sock->grp_lock);
return status;
}
}
@@ -404,13 +417,14 @@ PJ_DEF(pj_status_t) pj_turn_sock_alloc(pj_turn_sock *turn_sock,
resolver);
if (status != PJ_SUCCESS) {
sess_fail(turn_sock, "Error setting TURN server", status);
+ pj_grp_lock_release(turn_sock->grp_lock);
return status;
}
/* Done for now. The next work will be done when session state moved
* to RESOLVED state.
*/
-
+ pj_grp_lock_release(turn_sock->grp_lock);
return PJ_SUCCESS;
}
@@ -472,16 +486,20 @@ static pj_bool_t on_connect_complete(pj_activesock_t *asock,
if (!turn_sock)
return PJ_FALSE;
+ pj_grp_lock_acquire(turn_sock->grp_lock);
+
/* TURN session may have already been destroyed here.
* See ticket #1557 (http://trac.pjsip.org/repos/ticket/1557).
*/
if (!turn_sock->sess) {
sess_fail(turn_sock, "TURN session already destroyed", status);
+ pj_grp_lock_release(turn_sock->grp_lock);
return PJ_FALSE;
}
if (status != PJ_SUCCESS) {
sess_fail(turn_sock, "TCP connect() error", status);
+ pj_grp_lock_release(turn_sock->grp_lock);
return PJ_FALSE;
}
@@ -500,9 +518,11 @@ static pj_bool_t on_connect_complete(pj_activesock_t *asock,
status = pj_turn_session_alloc(turn_sock->sess, &turn_sock->alloc_param);
if (status != PJ_SUCCESS) {
sess_fail(turn_sock, "Error sending ALLOCATE", status);
+ pj_grp_lock_release(turn_sock->grp_lock);
return PJ_FALSE;
}
+ pj_grp_lock_release(turn_sock->grp_lock);
return PJ_TRUE;
}
@@ -562,9 +582,9 @@ static pj_bool_t on_data_read(pj_activesock_t *asock,
pj_bool_t ret = PJ_TRUE;
turn_sock = (pj_turn_sock*) pj_activesock_get_user_data(asock);
- pj_lock_acquire(turn_sock->lock);
+ pj_grp_lock_acquire(turn_sock->grp_lock);
- if (status == PJ_SUCCESS && turn_sock->sess) {
+ if (status == PJ_SUCCESS && turn_sock->sess && !turn_sock->is_destroying) {
/* Report incoming packet to TURN session, repeat while we have
* "packet" in the buffer (required for stream-oriented transports)
*/
@@ -614,7 +634,7 @@ static pj_bool_t on_data_read(pj_activesock_t *asock,
}
on_return:
- pj_lock_release(turn_sock->lock);
+ pj_grp_lock_release(turn_sock->grp_lock);
return ret;
}
@@ -634,7 +654,7 @@ static pj_status_t turn_on_send_pkt(pj_turn_session *sess,
pj_ssize_t len = pkt_len;
pj_status_t status;
- if (turn_sock == NULL) {
+ if (turn_sock == NULL || turn_sock->is_destroying) {
/* We've been destroyed */
// https://trac.pjsip.org/repos/ticket/1316
//pj_assert(!"We should shutdown gracefully");
@@ -680,7 +700,7 @@ static void turn_on_rx_data(pj_turn_session *sess,
{
pj_turn_sock *turn_sock = (pj_turn_sock*)
pj_turn_session_get_user_data(sess);
- if (turn_sock == NULL) {
+ if (turn_sock == NULL || turn_sock->is_destroying) {
/* We've been destroyed */
return;
}
@@ -729,6 +749,7 @@ static void turn_on_state(pj_turn_session *sess,
char addrtxt[PJ_INET6_ADDRSTRLEN+8];
int sock_type;
pj_sock_t sock;
+ pj_activesock_cfg asock_cfg;
pj_activesock_cb asock_cb;
pj_sockaddr bound_addr, *cfg_bind_addr;
pj_uint16_t max_bind_retry;
@@ -790,11 +811,14 @@ static void turn_on_state(pj_turn_session *sess,
}
/* Create active socket */
+ pj_activesock_cfg_default(&asock_cfg);
+ asock_cfg.grp_lock = turn_sock->grp_lock;
+
pj_bzero(&asock_cb, sizeof(asock_cb));
asock_cb.on_data_read = &on_data_read;
asock_cb.on_connect_complete = &on_connect_complete;
status = pj_activesock_create(turn_sock->pool, sock,
- sock_type, NULL,
+ sock_type, &asock_cfg,
turn_sock->cfg.ioqueue, &asock_cb,
turn_sock,
&turn_sock->active_sock);
@@ -835,14 +859,12 @@ static void turn_on_state(pj_turn_session *sess,
turn_sock->sess = NULL;
pj_turn_session_set_user_data(sess, NULL);
- if (turn_sock->timer.id) {
- pj_timer_heap_cancel(turn_sock->cfg.timer_heap, &turn_sock->timer);
- turn_sock->timer.id = 0;
- }
-
- turn_sock->timer.id = TIMER_DESTROY;
- pj_timer_heap_schedule(turn_sock->cfg.timer_heap, &turn_sock->timer,
- &delay);
+ pj_timer_heap_cancel_if_active(turn_sock->cfg.timer_heap,
+ &turn_sock->timer, 0);
+ pj_timer_heap_schedule_w_grp_lock(turn_sock->cfg.timer_heap,
+ &turn_sock->timer,
+ &delay, TIMER_DESTROY,
+ turn_sock->grp_lock);
}
}