diff options
Diffstat (limited to 'pjnath/src/pjnath/turn_sock.c')
-rw-r--r-- | pjnath/src/pjnath/turn_sock.c | 173 |
1 files changed, 118 insertions, 55 deletions
diff --git a/pjnath/src/pjnath/turn_sock.c b/pjnath/src/pjnath/turn_sock.c index 799b557..970a955 100644 --- a/pjnath/src/pjnath/turn_sock.c +++ b/pjnath/src/pjnath/turn_sock.c @@ -1,4 +1,4 @@ -/* $Id: turn_sock.c 3841 2011-10-24 09:28:13Z ming $ */ +/* $Id: turn_sock.c 4360 2013-02-21 11:26:35Z bennylp $ */ /* * Copyright (C) 2008-2011 Teluu Inc. (http://www.teluu.com) * Copyright (C) 2003-2008 Benny Prijono <benny@prijono.org> @@ -32,6 +32,10 @@ enum TIMER_DESTROY }; + +enum { MAX_BIND_RETRY = 100 }; + + #define INIT 0x1FFFFFFF struct pj_turn_sock @@ -42,13 +46,13 @@ struct pj_turn_sock pj_turn_sock_cb cb; void *user_data; - pj_lock_t *lock; + pj_bool_t is_destroying; + pj_grp_lock_t *grp_lock; pj_turn_alloc_param alloc_param; pj_stun_config cfg; pj_turn_sock_cfg setting; - pj_bool_t destroy_request; pj_timer_entry timer; int af; @@ -89,6 +93,7 @@ static pj_bool_t on_connect_complete(pj_activesock_t *asock, +static void turn_sock_on_destroy(void *comp); static void destroy(pj_turn_sock *turn_sock); static void timer_cb(pj_timer_heap_t *th, pj_timer_entry *e); @@ -97,10 +102,12 @@ static void timer_cb(pj_timer_heap_t *th, pj_timer_entry *e); PJ_DEF(void) pj_turn_sock_cfg_default(pj_turn_sock_cfg *cfg) { pj_bzero(cfg, sizeof(*cfg)); + cfg->max_pkt_size = PJ_TURN_MAX_PKT_LEN; cfg->qos_type = PJ_QOS_TYPE_BEST_EFFORT; cfg->qos_ignore_error = PJ_TRUE; } + /* * Create. */ @@ -162,14 +169,21 @@ PJ_DEF(pj_status_t) pj_turn_sock_create(pj_stun_config *cfg, pj_memcpy(&turn_sock->cb, cb, sizeof(*cb)); } - /* Create lock */ - status = pj_lock_create_recursive_mutex(pool, turn_sock->obj_name, - &turn_sock->lock); - if (status != PJ_SUCCESS) { - destroy(turn_sock); - return status; + /* Session lock */ + if (setting && setting->grp_lock) { + turn_sock->grp_lock = setting->grp_lock; + } else { + status = pj_grp_lock_create(pool, NULL, &turn_sock->grp_lock); + if (status != PJ_SUCCESS) { + pj_pool_release(pool); + return status; + } } + pj_grp_lock_add_ref(turn_sock->grp_lock); + pj_grp_lock_add_handler(turn_sock->grp_lock, pool, turn_sock, + &turn_sock_on_destroy); + /* Init timer */ pj_timer_entry_init(&turn_sock->timer, TIMER_NONE, turn_sock, &timer_cb); @@ -180,7 +194,8 @@ PJ_DEF(pj_status_t) pj_turn_sock_create(pj_stun_config *cfg, sess_cb.on_rx_data = &turn_on_rx_data; sess_cb.on_state = &turn_on_state; status = pj_turn_session_create(cfg, pool->obj_name, af, conn_type, - &sess_cb, 0, turn_sock, &turn_sock->sess); + turn_sock->grp_lock, &sess_cb, 0, + turn_sock, &turn_sock->sess); if (status != PJ_SUCCESS) { destroy(turn_sock); return status; @@ -197,41 +212,45 @@ PJ_DEF(pj_status_t) pj_turn_sock_create(pj_stun_config *cfg, /* * Destroy. */ -static void destroy(pj_turn_sock *turn_sock) +static void turn_sock_on_destroy(void *comp) { - if (turn_sock->lock) { - pj_lock_acquire(turn_sock->lock); - } - - if (turn_sock->sess) { - pj_turn_session_set_user_data(turn_sock->sess, NULL); - pj_turn_session_shutdown(turn_sock->sess); - turn_sock->sess = NULL; - } - - if (turn_sock->active_sock) { - pj_activesock_close(turn_sock->active_sock); - turn_sock->active_sock = NULL; - } - - if (turn_sock->lock) { - pj_lock_release(turn_sock->lock); - pj_lock_destroy(turn_sock->lock); - turn_sock->lock = NULL; - } + pj_turn_sock *turn_sock = (pj_turn_sock*) comp; if (turn_sock->pool) { pj_pool_t *pool = turn_sock->pool; + PJ_LOG(4,(turn_sock->obj_name, "TURN socket destroyed")); turn_sock->pool = NULL; pj_pool_release(pool); } } +static void destroy(pj_turn_sock *turn_sock) +{ + PJ_LOG(4,(turn_sock->obj_name, "TURN socket destroy request, ref_cnt=%d", + pj_grp_lock_get_ref(turn_sock->grp_lock))); + + pj_grp_lock_acquire(turn_sock->grp_lock); + if (turn_sock->is_destroying) { + pj_grp_lock_release(turn_sock->grp_lock); + return; + } + + turn_sock->is_destroying = PJ_TRUE; + if (turn_sock->sess) + pj_turn_session_shutdown(turn_sock->sess); + if (turn_sock->active_sock) + pj_activesock_close(turn_sock->active_sock); + pj_grp_lock_dec_ref(turn_sock->grp_lock); + pj_grp_lock_release(turn_sock->grp_lock); +} PJ_DEF(void) pj_turn_sock_destroy(pj_turn_sock *turn_sock) { - pj_lock_acquire(turn_sock->lock); - turn_sock->destroy_request = PJ_TRUE; + pj_grp_lock_acquire(turn_sock->grp_lock); + if (turn_sock->is_destroying) { + pj_grp_lock_release(turn_sock->grp_lock); + return; + } if (turn_sock->sess) { pj_turn_session_shutdown(turn_sock->sess); @@ -239,12 +258,11 @@ PJ_DEF(void) pj_turn_sock_destroy(pj_turn_sock *turn_sock) * session state is DESTROYING we will schedule a timer to * destroy ourselves. */ - pj_lock_release(turn_sock->lock); } else { - pj_lock_release(turn_sock->lock); destroy(turn_sock); } + pj_grp_lock_release(turn_sock->grp_lock); } @@ -260,7 +278,6 @@ static void timer_cb(pj_timer_heap_t *th, pj_timer_entry *e) switch (eid) { case TIMER_DESTROY: - PJ_LOG(5,(turn_sock->obj_name, "Destroying TURN")); destroy(turn_sock); break; default: @@ -330,7 +347,7 @@ PJ_DEF(pj_status_t) pj_turn_sock_get_info(pj_turn_sock *turn_sock, */ PJ_DEF(pj_status_t) pj_turn_sock_lock(pj_turn_sock *turn_sock) { - return pj_lock_acquire(turn_sock->lock); + return pj_grp_lock_acquire(turn_sock->grp_lock); } /** @@ -338,7 +355,7 @@ PJ_DEF(pj_status_t) pj_turn_sock_lock(pj_turn_sock *turn_sock) */ PJ_DEF(pj_status_t) pj_turn_sock_unlock(pj_turn_sock *turn_sock) { - return pj_lock_release(turn_sock->lock); + return pj_grp_lock_release(turn_sock->grp_lock); } /* @@ -374,6 +391,8 @@ PJ_DEF(pj_status_t) pj_turn_sock_alloc(pj_turn_sock *turn_sock, PJ_ASSERT_RETURN(turn_sock && domain, PJ_EINVAL); PJ_ASSERT_RETURN(turn_sock->sess, PJ_EINVALIDOP); + pj_grp_lock_acquire(turn_sock->grp_lock); + /* Copy alloc param. We will call session_alloc() only after the * server address has been resolved. */ @@ -388,6 +407,7 @@ PJ_DEF(pj_status_t) pj_turn_sock_alloc(pj_turn_sock *turn_sock, status = pj_turn_session_set_credential(turn_sock->sess, cred); if (status != PJ_SUCCESS) { sess_fail(turn_sock, "Error setting credential", status); + pj_grp_lock_release(turn_sock->grp_lock); return status; } } @@ -397,13 +417,14 @@ PJ_DEF(pj_status_t) pj_turn_sock_alloc(pj_turn_sock *turn_sock, resolver); if (status != PJ_SUCCESS) { sess_fail(turn_sock, "Error setting TURN server", status); + pj_grp_lock_release(turn_sock->grp_lock); return status; } /* Done for now. The next work will be done when session state moved * to RESOLVED state. */ - + pj_grp_lock_release(turn_sock->grp_lock); return PJ_SUCCESS; } @@ -462,9 +483,23 @@ static pj_bool_t on_connect_complete(pj_activesock_t *asock, pj_turn_sock *turn_sock; turn_sock = (pj_turn_sock*) pj_activesock_get_user_data(asock); + if (!turn_sock) + return PJ_FALSE; + + pj_grp_lock_acquire(turn_sock->grp_lock); + + /* TURN session may have already been destroyed here. + * See ticket #1557 (http://trac.pjsip.org/repos/ticket/1557). + */ + if (!turn_sock->sess) { + sess_fail(turn_sock, "TURN session already destroyed", status); + pj_grp_lock_release(turn_sock->grp_lock); + return PJ_FALSE; + } if (status != PJ_SUCCESS) { sess_fail(turn_sock, "TCP connect() error", status); + pj_grp_lock_release(turn_sock->grp_lock); return PJ_FALSE; } @@ -474,7 +509,7 @@ static pj_bool_t on_connect_complete(pj_activesock_t *asock, /* Kick start pending read operation */ status = pj_activesock_start_read(asock, turn_sock->pool, - PJ_TURN_MAX_PKT_LEN, 0); + turn_sock->setting.max_pkt_size, 0); /* Init send_key */ pj_ioqueue_op_key_init(&turn_sock->send_key, sizeof(turn_sock->send_key)); @@ -483,9 +518,11 @@ static pj_bool_t on_connect_complete(pj_activesock_t *asock, status = pj_turn_session_alloc(turn_sock->sess, &turn_sock->alloc_param); if (status != PJ_SUCCESS) { sess_fail(turn_sock, "Error sending ALLOCATE", status); + pj_grp_lock_release(turn_sock->grp_lock); return PJ_FALSE; } + pj_grp_lock_release(turn_sock->grp_lock); return PJ_TRUE; } @@ -545,9 +582,9 @@ static pj_bool_t on_data_read(pj_activesock_t *asock, pj_bool_t ret = PJ_TRUE; turn_sock = (pj_turn_sock*) pj_activesock_get_user_data(asock); - pj_lock_acquire(turn_sock->lock); + pj_grp_lock_acquire(turn_sock->grp_lock); - if (status == PJ_SUCCESS && turn_sock->sess) { + if (status == PJ_SUCCESS && turn_sock->sess && !turn_sock->is_destroying) { /* Report incoming packet to TURN session, repeat while we have * "packet" in the buffer (required for stream-oriented transports) */ @@ -597,7 +634,7 @@ static pj_bool_t on_data_read(pj_activesock_t *asock, } on_return: - pj_lock_release(turn_sock->lock); + pj_grp_lock_release(turn_sock->grp_lock); return ret; } @@ -617,7 +654,7 @@ static pj_status_t turn_on_send_pkt(pj_turn_session *sess, pj_ssize_t len = pkt_len; pj_status_t status; - if (turn_sock == NULL) { + if (turn_sock == NULL || turn_sock->is_destroying) { /* We've been destroyed */ // https://trac.pjsip.org/repos/ticket/1316 //pj_assert(!"We should shutdown gracefully"); @@ -663,7 +700,7 @@ static void turn_on_rx_data(pj_turn_session *sess, { pj_turn_sock *turn_sock = (pj_turn_sock*) pj_turn_session_get_user_data(sess); - if (turn_sock == NULL) { + if (turn_sock == NULL || turn_sock->is_destroying) { /* We've been destroyed */ return; } @@ -712,7 +749,10 @@ static void turn_on_state(pj_turn_session *sess, char addrtxt[PJ_INET6_ADDRSTRLEN+8]; int sock_type; pj_sock_t sock; + pj_activesock_cfg asock_cfg; pj_activesock_cb asock_cb; + pj_sockaddr bound_addr, *cfg_bind_addr; + pj_uint16_t max_bind_retry; /* Close existing connection, if any. This happens when * we're switching to alternate TURN server when either TCP @@ -738,7 +778,29 @@ static void turn_on_state(pj_turn_session *sess, return; } - /* Apply QoS, if specified */ + /* Bind socket */ + cfg_bind_addr = &turn_sock->setting.bound_addr; + max_bind_retry = MAX_BIND_RETRY; + if (turn_sock->setting.port_range && + turn_sock->setting.port_range < max_bind_retry) + { + max_bind_retry = turn_sock->setting.port_range; + } + pj_sockaddr_init(turn_sock->af, &bound_addr, NULL, 0); + if (cfg_bind_addr->addr.sa_family == pj_AF_INET() || + cfg_bind_addr->addr.sa_family == pj_AF_INET6()) + { + pj_sockaddr_cp(&bound_addr, cfg_bind_addr); + } + status = pj_sock_bind_random(sock, &bound_addr, + turn_sock->setting.port_range, + max_bind_retry); + if (status != PJ_SUCCESS) { + pj_turn_sock_destroy(turn_sock); + return; + } + + /* Apply QoS, if specified */ status = pj_sock_apply_qos2(sock, turn_sock->setting.qos_type, &turn_sock->setting.qos_params, (turn_sock->setting.qos_ignore_error?2:1), @@ -749,11 +811,14 @@ static void turn_on_state(pj_turn_session *sess, } /* Create active socket */ + pj_activesock_cfg_default(&asock_cfg); + asock_cfg.grp_lock = turn_sock->grp_lock; + pj_bzero(&asock_cb, sizeof(asock_cb)); asock_cb.on_data_read = &on_data_read; asock_cb.on_connect_complete = &on_connect_complete; status = pj_activesock_create(turn_sock->pool, sock, - sock_type, NULL, + sock_type, &asock_cfg, turn_sock->cfg.ioqueue, &asock_cb, turn_sock, &turn_sock->active_sock); @@ -794,14 +859,12 @@ static void turn_on_state(pj_turn_session *sess, turn_sock->sess = NULL; pj_turn_session_set_user_data(sess, NULL); - if (turn_sock->timer.id) { - pj_timer_heap_cancel(turn_sock->cfg.timer_heap, &turn_sock->timer); - turn_sock->timer.id = 0; - } - - turn_sock->timer.id = TIMER_DESTROY; - pj_timer_heap_schedule(turn_sock->cfg.timer_heap, &turn_sock->timer, - &delay); + pj_timer_heap_cancel_if_active(turn_sock->cfg.timer_heap, + &turn_sock->timer, 0); + pj_timer_heap_schedule_w_grp_lock(turn_sock->cfg.timer_heap, + &turn_sock->timer, + &delay, TIMER_DESTROY, + turn_sock->grp_lock); } } |