diff options
Diffstat (limited to 'pjnath/src/pjnath/turn_sock.c')
-rw-r--r-- | pjnath/src/pjnath/turn_sock.c | 128 |
1 files changed, 75 insertions, 53 deletions
diff --git a/pjnath/src/pjnath/turn_sock.c b/pjnath/src/pjnath/turn_sock.c index 08831a4c..6392428f 100644 --- a/pjnath/src/pjnath/turn_sock.c +++ b/pjnath/src/pjnath/turn_sock.c @@ -46,13 +46,13 @@ struct pj_turn_sock pj_turn_sock_cb cb; void *user_data; - pj_lock_t *lock; + pj_bool_t is_destroying; + pj_grp_lock_t *grp_lock; pj_turn_alloc_param alloc_param; pj_stun_config cfg; pj_turn_sock_cfg setting; - pj_bool_t destroy_request; pj_timer_entry timer; int af; @@ -93,6 +93,7 @@ static pj_bool_t on_connect_complete(pj_activesock_t *asock, +static void turn_sock_on_destroy(void *comp); static void destroy(pj_turn_sock *turn_sock); static void timer_cb(pj_timer_heap_t *th, pj_timer_entry *e); @@ -168,14 +169,21 @@ PJ_DEF(pj_status_t) pj_turn_sock_create(pj_stun_config *cfg, pj_memcpy(&turn_sock->cb, cb, sizeof(*cb)); } - /* Create lock */ - status = pj_lock_create_recursive_mutex(pool, turn_sock->obj_name, - &turn_sock->lock); - if (status != PJ_SUCCESS) { - destroy(turn_sock); - return status; + /* Session lock */ + if (setting && setting->grp_lock) { + turn_sock->grp_lock = setting->grp_lock; + } else { + status = pj_grp_lock_create(pool, NULL, &turn_sock->grp_lock); + if (status != PJ_SUCCESS) { + pj_pool_release(pool); + return status; + } } + pj_grp_lock_add_ref(turn_sock->grp_lock); + pj_grp_lock_add_handler(turn_sock->grp_lock, pool, turn_sock, + &turn_sock_on_destroy); + /* Init timer */ pj_timer_entry_init(&turn_sock->timer, TIMER_NONE, turn_sock, &timer_cb); @@ -186,7 +194,8 @@ PJ_DEF(pj_status_t) pj_turn_sock_create(pj_stun_config *cfg, sess_cb.on_rx_data = &turn_on_rx_data; sess_cb.on_state = &turn_on_state; status = pj_turn_session_create(cfg, pool->obj_name, af, conn_type, - &sess_cb, 0, turn_sock, &turn_sock->sess); + turn_sock->grp_lock, &sess_cb, 0, + turn_sock, &turn_sock->sess); if (status != PJ_SUCCESS) { destroy(turn_sock); return status; @@ -203,42 +212,45 @@ PJ_DEF(pj_status_t) pj_turn_sock_create(pj_stun_config *cfg, /* * Destroy. */ -static void destroy(pj_turn_sock *turn_sock) +static void turn_sock_on_destroy(void *comp) { - if (turn_sock->lock) { - pj_lock_acquire(turn_sock->lock); - } - - if (turn_sock->sess) { - pj_turn_session_set_user_data(turn_sock->sess, NULL); - pj_turn_session_shutdown(turn_sock->sess); - turn_sock->sess = NULL; - } - - if (turn_sock->active_sock) { - pj_activesock_set_user_data(turn_sock->active_sock, NULL); - pj_activesock_close(turn_sock->active_sock); - turn_sock->active_sock = NULL; - } - - if (turn_sock->lock) { - pj_lock_release(turn_sock->lock); - pj_lock_destroy(turn_sock->lock); - turn_sock->lock = NULL; - } + pj_turn_sock *turn_sock = (pj_turn_sock*) comp; if (turn_sock->pool) { pj_pool_t *pool = turn_sock->pool; + PJ_LOG(4,(turn_sock->obj_name, "TURN socket destroyed")); turn_sock->pool = NULL; pj_pool_release(pool); } } +static void destroy(pj_turn_sock *turn_sock) +{ + PJ_LOG(4,(turn_sock->obj_name, "TURN socket destroy request, ref_cnt=%d", + pj_grp_lock_get_ref(turn_sock->grp_lock))); + + pj_grp_lock_acquire(turn_sock->grp_lock); + if (turn_sock->is_destroying) { + pj_grp_lock_release(turn_sock->grp_lock); + return; + } + + turn_sock->is_destroying = PJ_TRUE; + if (turn_sock->sess) + pj_turn_session_shutdown(turn_sock->sess); + if (turn_sock->active_sock) + pj_activesock_close(turn_sock->active_sock); + pj_grp_lock_dec_ref(turn_sock->grp_lock); + pj_grp_lock_release(turn_sock->grp_lock); +} PJ_DEF(void) pj_turn_sock_destroy(pj_turn_sock *turn_sock) { - pj_lock_acquire(turn_sock->lock); - turn_sock->destroy_request = PJ_TRUE; + pj_grp_lock_acquire(turn_sock->grp_lock); + if (turn_sock->is_destroying) { + pj_grp_lock_release(turn_sock->grp_lock); + return; + } if (turn_sock->sess) { pj_turn_session_shutdown(turn_sock->sess); @@ -246,12 +258,11 @@ PJ_DEF(void) pj_turn_sock_destroy(pj_turn_sock *turn_sock) * session state is DESTROYING we will schedule a timer to * destroy ourselves. */ - pj_lock_release(turn_sock->lock); } else { - pj_lock_release(turn_sock->lock); destroy(turn_sock); } + pj_grp_lock_release(turn_sock->grp_lock); } @@ -267,7 +278,6 @@ static void timer_cb(pj_timer_heap_t *th, pj_timer_entry *e) switch (eid) { case TIMER_DESTROY: - PJ_LOG(5,(turn_sock->obj_name, "Destroying TURN")); destroy(turn_sock); break; default: @@ -337,7 +347,7 @@ PJ_DEF(pj_status_t) pj_turn_sock_get_info(pj_turn_sock *turn_sock, */ PJ_DEF(pj_status_t) pj_turn_sock_lock(pj_turn_sock *turn_sock) { - return pj_lock_acquire(turn_sock->lock); + return pj_grp_lock_acquire(turn_sock->grp_lock); } /** @@ -345,7 +355,7 @@ PJ_DEF(pj_status_t) pj_turn_sock_lock(pj_turn_sock *turn_sock) */ PJ_DEF(pj_status_t) pj_turn_sock_unlock(pj_turn_sock *turn_sock) { - return pj_lock_release(turn_sock->lock); + return pj_grp_lock_release(turn_sock->grp_lock); } /* @@ -381,6 +391,8 @@ PJ_DEF(pj_status_t) pj_turn_sock_alloc(pj_turn_sock *turn_sock, PJ_ASSERT_RETURN(turn_sock && domain, PJ_EINVAL); PJ_ASSERT_RETURN(turn_sock->sess, PJ_EINVALIDOP); + pj_grp_lock_acquire(turn_sock->grp_lock); + /* Copy alloc param. We will call session_alloc() only after the * server address has been resolved. */ @@ -395,6 +407,7 @@ PJ_DEF(pj_status_t) pj_turn_sock_alloc(pj_turn_sock *turn_sock, status = pj_turn_session_set_credential(turn_sock->sess, cred); if (status != PJ_SUCCESS) { sess_fail(turn_sock, "Error setting credential", status); + pj_grp_lock_release(turn_sock->grp_lock); return status; } } @@ -404,13 +417,14 @@ PJ_DEF(pj_status_t) pj_turn_sock_alloc(pj_turn_sock *turn_sock, resolver); if (status != PJ_SUCCESS) { sess_fail(turn_sock, "Error setting TURN server", status); + pj_grp_lock_release(turn_sock->grp_lock); return status; } /* Done for now. The next work will be done when session state moved * to RESOLVED state. */ - + pj_grp_lock_release(turn_sock->grp_lock); return PJ_SUCCESS; } @@ -472,16 +486,20 @@ static pj_bool_t on_connect_complete(pj_activesock_t *asock, if (!turn_sock) return PJ_FALSE; + pj_grp_lock_acquire(turn_sock->grp_lock); + /* TURN session may have already been destroyed here. * See ticket #1557 (http://trac.pjsip.org/repos/ticket/1557). */ if (!turn_sock->sess) { sess_fail(turn_sock, "TURN session already destroyed", status); + pj_grp_lock_release(turn_sock->grp_lock); return PJ_FALSE; } if (status != PJ_SUCCESS) { sess_fail(turn_sock, "TCP connect() error", status); + pj_grp_lock_release(turn_sock->grp_lock); return PJ_FALSE; } @@ -500,9 +518,11 @@ static pj_bool_t on_connect_complete(pj_activesock_t *asock, status = pj_turn_session_alloc(turn_sock->sess, &turn_sock->alloc_param); if (status != PJ_SUCCESS) { sess_fail(turn_sock, "Error sending ALLOCATE", status); + pj_grp_lock_release(turn_sock->grp_lock); return PJ_FALSE; } + pj_grp_lock_release(turn_sock->grp_lock); return PJ_TRUE; } @@ -562,9 +582,9 @@ static pj_bool_t on_data_read(pj_activesock_t *asock, pj_bool_t ret = PJ_TRUE; turn_sock = (pj_turn_sock*) pj_activesock_get_user_data(asock); - pj_lock_acquire(turn_sock->lock); + pj_grp_lock_acquire(turn_sock->grp_lock); - if (status == PJ_SUCCESS && turn_sock->sess) { + if (status == PJ_SUCCESS && turn_sock->sess && !turn_sock->is_destroying) { /* Report incoming packet to TURN session, repeat while we have * "packet" in the buffer (required for stream-oriented transports) */ @@ -614,7 +634,7 @@ static pj_bool_t on_data_read(pj_activesock_t *asock, } on_return: - pj_lock_release(turn_sock->lock); + pj_grp_lock_release(turn_sock->grp_lock); return ret; } @@ -634,7 +654,7 @@ static pj_status_t turn_on_send_pkt(pj_turn_session *sess, pj_ssize_t len = pkt_len; pj_status_t status; - if (turn_sock == NULL) { + if (turn_sock == NULL || turn_sock->is_destroying) { /* We've been destroyed */ // https://trac.pjsip.org/repos/ticket/1316 //pj_assert(!"We should shutdown gracefully"); @@ -680,7 +700,7 @@ static void turn_on_rx_data(pj_turn_session *sess, { pj_turn_sock *turn_sock = (pj_turn_sock*) pj_turn_session_get_user_data(sess); - if (turn_sock == NULL) { + if (turn_sock == NULL || turn_sock->is_destroying) { /* We've been destroyed */ return; } @@ -729,6 +749,7 @@ static void turn_on_state(pj_turn_session *sess, char addrtxt[PJ_INET6_ADDRSTRLEN+8]; int sock_type; pj_sock_t sock; + pj_activesock_cfg asock_cfg; pj_activesock_cb asock_cb; pj_sockaddr bound_addr, *cfg_bind_addr; pj_uint16_t max_bind_retry; @@ -790,11 +811,14 @@ static void turn_on_state(pj_turn_session *sess, } /* Create active socket */ + pj_activesock_cfg_default(&asock_cfg); + asock_cfg.grp_lock = turn_sock->grp_lock; + pj_bzero(&asock_cb, sizeof(asock_cb)); asock_cb.on_data_read = &on_data_read; asock_cb.on_connect_complete = &on_connect_complete; status = pj_activesock_create(turn_sock->pool, sock, - sock_type, NULL, + sock_type, &asock_cfg, turn_sock->cfg.ioqueue, &asock_cb, turn_sock, &turn_sock->active_sock); @@ -835,14 +859,12 @@ static void turn_on_state(pj_turn_session *sess, turn_sock->sess = NULL; pj_turn_session_set_user_data(sess, NULL); - if (turn_sock->timer.id) { - pj_timer_heap_cancel(turn_sock->cfg.timer_heap, &turn_sock->timer); - turn_sock->timer.id = 0; - } - - turn_sock->timer.id = TIMER_DESTROY; - pj_timer_heap_schedule(turn_sock->cfg.timer_heap, &turn_sock->timer, - &delay); + pj_timer_heap_cancel_if_active(turn_sock->cfg.timer_heap, + &turn_sock->timer, 0); + pj_timer_heap_schedule_w_grp_lock(turn_sock->cfg.timer_heap, + &turn_sock->timer, + &delay, TIMER_DESTROY, + turn_sock->grp_lock); } } |