diff options
Diffstat (limited to 'pjnath/src/pjnath/ice_strans.c')
-rw-r--r-- | pjnath/src/pjnath/ice_strans.c | 182 |
1 files changed, 77 insertions, 105 deletions
diff --git a/pjnath/src/pjnath/ice_strans.c b/pjnath/src/pjnath/ice_strans.c index a798586b..4c1182fd 100644 --- a/pjnath/src/pjnath/ice_strans.c +++ b/pjnath/src/pjnath/ice_strans.c @@ -126,13 +126,11 @@ static void turn_on_state(pj_turn_sock *turn_sock, pj_turn_state_t old_state, /* Forward decls */ +static void ice_st_on_destroy(void *obj); static void destroy_ice_st(pj_ice_strans *ice_st); #define ice_st_perror(ice_st,msg,rc) pjnath_perror(ice_st->obj_name,msg,rc) static void sess_init_update(pj_ice_strans *ice_st); -static void sess_add_ref(pj_ice_strans *ice_st); -static pj_bool_t sess_dec_ref(pj_ice_strans *ice_st); - /** * This structure describes an ICE stream transport component. A component * in ICE stream transport typically corresponds to a single socket created @@ -172,7 +170,7 @@ struct pj_ice_strans void *user_data; /**< Application data. */ pj_ice_strans_cfg cfg; /**< Configuration. */ pj_ice_strans_cb cb; /**< Application callback. */ - pj_lock_t *init_lock; /**< Initialization mutex. */ + pj_grp_lock_t *grp_lock; /**< Group lock. */ pj_ice_strans_state state; /**< Session state. */ pj_ice_sess *ice; /**< ICE session. */ @@ -183,7 +181,6 @@ struct pj_ice_strans pj_timer_entry ka_timer; /**< STUN keep-alive timer. */ - pj_atomic_t *busy_cnt; /**< To prevent destroy */ pj_bool_t destroy_req;/**< Destroy has been called? */ pj_bool_t cb_called; /**< Init error callback called?*/ }; @@ -551,23 +548,22 @@ PJ_DEF(pj_status_t) pj_ice_strans_create( const char *name, comp_cnt)); pj_log_push_indent(); - pj_ice_strans_cfg_copy(pool, &ice_st->cfg, cfg); - pj_memcpy(&ice_st->cb, cb, sizeof(*cb)); - - status = pj_atomic_create(pool, 0, &ice_st->busy_cnt); - if (status != PJ_SUCCESS) { - destroy_ice_st(ice_st); - return status; - } - - status = pj_lock_create_recursive_mutex(pool, ice_st->obj_name, - &ice_st->init_lock); + status = pj_grp_lock_create(pool, NULL, &ice_st->grp_lock); if (status != PJ_SUCCESS) { - destroy_ice_st(ice_st); + pj_pool_release(pool); pj_log_pop_indent(); return status; } + pj_grp_lock_add_ref(ice_st->grp_lock); + pj_grp_lock_add_handler(ice_st->grp_lock, pool, ice_st, + &ice_st_on_destroy); + + pj_ice_strans_cfg_copy(pool, &ice_st->cfg, cfg); + ice_st->cfg.stun.cfg.grp_lock = ice_st->grp_lock; + ice_st->cfg.turn.cfg.grp_lock = ice_st->grp_lock; + pj_memcpy(&ice_st->cb, cb, sizeof(*cb)); + ice_st->comp_cnt = comp_cnt; ice_st->comp = (pj_ice_strans_comp**) pj_pool_calloc(pool, comp_cnt, sizeof(pj_ice_strans_comp*)); @@ -578,12 +574,12 @@ PJ_DEF(pj_status_t) pj_ice_strans_create( const char *name, /* Acquire initialization mutex to prevent callback to be * called before we finish initialization. */ - pj_lock_acquire(ice_st->init_lock); + pj_grp_lock_acquire(ice_st->grp_lock); for (i=0; i<comp_cnt; ++i) { status = create_comp(ice_st, i+1); if (status != PJ_SUCCESS) { - pj_lock_release(ice_st->init_lock); + pj_grp_lock_release(ice_st->grp_lock); destroy_ice_st(ice_st); pj_log_pop_indent(); return status; @@ -591,9 +587,9 @@ PJ_DEF(pj_status_t) pj_ice_strans_create( const char *name, } /* Done with initialization */ - pj_lock_release(ice_st->init_lock); + pj_grp_lock_release(ice_st->grp_lock); - PJ_LOG(4,(ice_st->obj_name, "ICE stream transport created")); + PJ_LOG(4,(ice_st->obj_name, "ICE stream transport %p created", ice_st)); *p_ice_st = ice_st; @@ -605,14 +601,35 @@ PJ_DEF(pj_status_t) pj_ice_strans_create( const char *name, return PJ_SUCCESS; } +/* REALLY destroy ICE */ +static void ice_st_on_destroy(void *obj) +{ + pj_ice_strans *ice_st = (pj_ice_strans*)obj; + + PJ_LOG(4,(ice_st->obj_name, "ICE stream transport %p destroyed", obj)); + + /* Done */ + pj_pool_release(ice_st->pool); +} + /* Destroy ICE */ static void destroy_ice_st(pj_ice_strans *ice_st) { unsigned i; - PJ_LOG(5,(ice_st->obj_name, "ICE stream transport destroying..")); + PJ_LOG(5,(ice_st->obj_name, "ICE stream transport %p destroy request..", + ice_st)); pj_log_push_indent(); + pj_grp_lock_acquire(ice_st->grp_lock); + + if (ice_st->destroy_req) { + pj_grp_lock_release(ice_st->grp_lock); + return; + } + + ice_st->destroy_req = PJ_TRUE; + /* Destroy ICE if we have ICE */ if (ice_st->ice) { pj_ice_sess_destroy(ice_st->ice); @@ -623,38 +640,19 @@ static void destroy_ice_st(pj_ice_strans *ice_st) for (i=0; i<ice_st->comp_cnt; ++i) { if (ice_st->comp[i]) { if (ice_st->comp[i]->stun_sock) { - pj_stun_sock_set_user_data(ice_st->comp[i]->stun_sock, NULL); pj_stun_sock_destroy(ice_st->comp[i]->stun_sock); ice_st->comp[i]->stun_sock = NULL; } if (ice_st->comp[i]->turn_sock) { - pj_turn_sock_set_user_data(ice_st->comp[i]->turn_sock, NULL); pj_turn_sock_destroy(ice_st->comp[i]->turn_sock); ice_st->comp[i]->turn_sock = NULL; } } } - ice_st->comp_cnt = 0; - - /* Destroy mutex */ - if (ice_st->init_lock) { - pj_lock_acquire(ice_st->init_lock); - pj_lock_release(ice_st->init_lock); - pj_lock_destroy(ice_st->init_lock); - ice_st->init_lock = NULL; - } - /* Destroy reference counter */ - if (ice_st->busy_cnt) { - pj_assert(pj_atomic_get(ice_st->busy_cnt)==0); - pj_atomic_destroy(ice_st->busy_cnt); - ice_st->busy_cnt = NULL; - } + pj_grp_lock_dec_ref(ice_st->grp_lock); + pj_grp_lock_release(ice_st->grp_lock); - PJ_LOG(4,(ice_st->obj_name, "ICE stream transport destroyed")); - - /* Done */ - pj_pool_release(ice_st->pool); pj_log_pop_indent(); } @@ -739,45 +737,12 @@ static void sess_init_update(pj_ice_strans *ice_st) */ PJ_DEF(pj_status_t) pj_ice_strans_destroy(pj_ice_strans *ice_st) { - PJ_ASSERT_RETURN(ice_st, PJ_EINVAL); - - sess_add_ref(ice_st); - ice_st->destroy_req = PJ_TRUE; - if (sess_dec_ref(ice_st)) { - PJ_LOG(5,(ice_st->obj_name, - "ICE strans object is busy, will destroy later")); - return PJ_EPENDING; - } - + destroy_ice_st(ice_st); return PJ_SUCCESS; } /* - * Increment busy counter. - */ -static void sess_add_ref(pj_ice_strans *ice_st) -{ - pj_atomic_inc(ice_st->busy_cnt); -} - -/* - * Decrement busy counter. If the counter has reached zero and destroy - * has been requested, destroy the object and return FALSE. - */ -static pj_bool_t sess_dec_ref(pj_ice_strans *ice_st) -{ - int count = pj_atomic_dec_and_get(ice_st->busy_cnt); - pj_assert(count >= 0); - if (count==0 && ice_st->destroy_req) { - destroy_ice_st(ice_st); - return PJ_FALSE; - } else { - return PJ_TRUE; - } -} - -/* * Get user data */ PJ_DEF(void*) pj_ice_strans_get_user_data(pj_ice_strans *ice_st) @@ -840,7 +805,9 @@ PJ_DEF(pj_status_t) pj_ice_strans_init_ice(pj_ice_strans *ice_st, /* Create! */ status = pj_ice_sess_create(&ice_st->cfg.stun_cfg, ice_st->obj_name, role, ice_st->comp_cnt, &ice_cb, - local_ufrag, local_passwd, &ice_st->ice); + local_ufrag, local_passwd, + ice_st->grp_lock, + &ice_st->ice); if (status != PJ_SUCCESS) return status; @@ -1255,7 +1222,7 @@ static void on_ice_complete(pj_ice_sess *ice, pj_status_t status) pj_time_val t; unsigned msec; - sess_add_ref(ice_st); + pj_grp_lock_add_ref(ice_st->grp_lock); pj_gettimeofday(&t); PJ_TIME_VAL_SUB(t, ice_st->start_time); @@ -1337,7 +1304,7 @@ static void on_ice_complete(pj_ice_sess *ice, pj_status_t status) } - sess_dec_ref(ice_st); + pj_grp_lock_dec_ref(ice_st->grp_lock); } /* @@ -1426,7 +1393,7 @@ static pj_bool_t stun_on_rx_data(pj_stun_sock *stun_sock, ice_st = comp->ice_st; - sess_add_ref(ice_st); + pj_grp_lock_add_ref(ice_st->grp_lock); if (ice_st->ice == NULL) { /* The ICE session is gone, but we're still receiving packets. @@ -1451,7 +1418,7 @@ static pj_bool_t stun_on_rx_data(pj_stun_sock *stun_sock, } } - return sess_dec_ref(ice_st); + return pj_grp_lock_dec_ref(ice_st->grp_lock) ? PJ_FALSE : PJ_TRUE; } /* Notifification when asynchronous send operation to the STUN socket @@ -1482,10 +1449,10 @@ static pj_bool_t stun_on_status(pj_stun_sock *stun_sock, comp = (pj_ice_strans_comp*) pj_stun_sock_get_user_data(stun_sock); ice_st = comp->ice_st; - sess_add_ref(ice_st); + pj_grp_lock_add_ref(ice_st->grp_lock); /* Wait until initialization completes */ - pj_lock_acquire(ice_st->init_lock); + pj_grp_lock_acquire(ice_st->grp_lock); /* Find the srflx cancidate */ for (i=0; i<comp->cand_cnt; ++i) { @@ -1495,14 +1462,14 @@ static pj_bool_t stun_on_status(pj_stun_sock *stun_sock, } } - pj_lock_release(ice_st->init_lock); + pj_grp_lock_release(ice_st->grp_lock); /* It is possible that we don't have srflx candidate even though this * callback is called. This could happen when we cancel adding srflx * candidate due to initialization error. */ if (cand == NULL) { - return sess_dec_ref(ice_st); + return pj_grp_lock_dec_ref(ice_st->grp_lock) ? PJ_FALSE : PJ_TRUE; } switch (op) { @@ -1618,7 +1585,7 @@ static pj_bool_t stun_on_status(pj_stun_sock *stun_sock, break; } - return sess_dec_ref(ice_st); + return pj_grp_lock_dec_ref(ice_st->grp_lock)? PJ_FALSE : PJ_TRUE; } /* Callback when TURN socket has received a packet */ @@ -1637,7 +1604,7 @@ static void turn_on_rx_data(pj_turn_sock *turn_sock, return; } - sess_add_ref(comp->ice_st); + pj_grp_lock_add_ref(comp->ice_st->grp_lock); if (comp->ice_st->ice == NULL) { /* The ICE session is gone, but we're still receiving packets. @@ -1664,7 +1631,7 @@ static void turn_on_rx_data(pj_turn_sock *turn_sock, } } - sess_dec_ref(comp->ice_st); + pj_grp_lock_dec_ref(comp->ice_st->grp_lock); } @@ -1686,7 +1653,7 @@ static void turn_on_state(pj_turn_sock *turn_sock, pj_turn_state_t old_state, pj_turn_state_name(old_state), pj_turn_state_name(new_state))); pj_log_push_indent(); - sess_add_ref(comp->ice_st); + pj_grp_lock_add_ref(comp->ice_st->grp_lock); if (new_state == PJ_TURN_STATE_READY) { pj_turn_session_info rel_info; @@ -1700,7 +1667,7 @@ static void turn_on_state(pj_turn_sock *turn_sock, pj_turn_state_t old_state, pj_turn_sock_get_info(turn_sock, &rel_info); /* Wait until initialization completes */ - pj_lock_acquire(comp->ice_st->init_lock); + pj_grp_lock_acquire(comp->ice_st->grp_lock); /* Find relayed candidate in the component */ for (i=0; i<comp->cand_cnt; ++i) { @@ -1711,7 +1678,7 @@ static void turn_on_state(pj_turn_sock *turn_sock, pj_turn_state_t old_state, } pj_assert(cand != NULL); - pj_lock_release(comp->ice_st->init_lock); + pj_grp_lock_release(comp->ice_st->grp_lock); /* Update candidate */ pj_sockaddr_cp(&cand->addr, &rel_info.relay_addr); @@ -1744,22 +1711,27 @@ static void turn_on_state(pj_turn_sock *turn_sock, pj_turn_state_t old_state, pj_turn_sock_set_user_data(turn_sock, NULL); comp->turn_sock = NULL; - /* Set session to fail if we're still initializing */ - if (comp->ice_st->state < PJ_ICE_STRANS_STATE_READY) { - sess_fail(comp->ice_st, PJ_ICE_STRANS_OP_INIT, - "TURN allocation failed", info.last_status); - } else if (comp->turn_err_cnt > 1) { - sess_fail(comp->ice_st, PJ_ICE_STRANS_OP_KEEP_ALIVE, - "TURN refresh failed", info.last_status); - } else { - PJ_PERROR(4,(comp->ice_st->obj_name, info.last_status, - "Comp %d: TURN allocation failed, retrying", - comp->comp_id)); - add_update_turn(comp->ice_st, comp); + /* Set session to fail on error. last_status PJ_SUCCESS means normal + * deallocation, which should not trigger sess_fail as it may have + * been initiated by ICE destroy + */ + if (info.last_status != PJ_SUCCESS) { + if (comp->ice_st->state < PJ_ICE_STRANS_STATE_READY) { + sess_fail(comp->ice_st, PJ_ICE_STRANS_OP_INIT, + "TURN allocation failed", info.last_status); + } else if (comp->turn_err_cnt > 1) { + sess_fail(comp->ice_st, PJ_ICE_STRANS_OP_KEEP_ALIVE, + "TURN refresh failed", info.last_status); + } else { + PJ_PERROR(4,(comp->ice_st->obj_name, info.last_status, + "Comp %d: TURN allocation failed, retrying", + comp->comp_id)); + add_update_turn(comp->ice_st, comp); + } } } - sess_dec_ref(comp->ice_st); + pj_grp_lock_dec_ref(comp->ice_st->grp_lock); pj_log_pop_indent(); } |