From 7d07d4d6975f55876bc096627696d23028f56d48 Mon Sep 17 00:00:00 2001 From: Benny Prijono Date: Thu, 21 Feb 2013 11:26:35 +0000 Subject: Fixed #1617: major synchronization fixes in PJNATH with incorporation of group lock to avoid deadlock and crashes due to race conditions git-svn-id: http://svn.pjsip.org/repos/pjproject/trunk@4360 74dad513-b988-da41-8d7b-12977e46ad98 --- pjnath/build/Makefile | 2 +- pjnath/include/pjnath/ice_session.h | 5 +- pjnath/include/pjnath/stun_session.h | 20 +- pjnath/include/pjnath/stun_sock.h | 9 + pjnath/include/pjnath/stun_transaction.h | 8 +- pjnath/include/pjnath/turn_session.h | 6 +- pjnath/include/pjnath/turn_sock.h | 8 + pjnath/src/pjnath-test/concur_test.c | 360 ++++++++++++++++++++++++++++++ pjnath/src/pjnath-test/ice_test.c | 235 +++++++++++++++++--- pjnath/src/pjnath-test/sess_auth.c | 4 +- pjnath/src/pjnath-test/stun_sock_test.c | 12 +- pjnath/src/pjnath-test/test.c | 33 ++- pjnath/src/pjnath-test/test.h | 4 + pjnath/src/pjnath/ice_session.c | 294 +++++++++++++++---------- pjnath/src/pjnath/ice_strans.c | 182 +++++++--------- pjnath/src/pjnath/nat_detect.c | 2 +- pjnath/src/pjnath/stun_session.c | 364 ++++++++++++++++++------------- pjnath/src/pjnath/stun_sock.c | 189 +++++++++++----- pjnath/src/pjnath/stun_transaction.c | 135 +++++++----- pjnath/src/pjnath/turn_session.c | 173 +++++++-------- pjnath/src/pjnath/turn_sock.c | 128 ++++++----- pjnath/src/pjturn-srv/allocation.c | 2 +- pjnath/src/pjturn-srv/server.c | 3 +- 23 files changed, 1478 insertions(+), 700 deletions(-) create mode 100644 pjnath/src/pjnath-test/concur_test.c (limited to 'pjnath') diff --git a/pjnath/build/Makefile b/pjnath/build/Makefile index 66017c79..32b0b50d 100644 --- a/pjnath/build/Makefile +++ b/pjnath/build/Makefile @@ -40,7 +40,7 @@ export PJNATH_CFLAGS += $(_CFLAGS) # Defines for building test application # export PJNATH_TEST_SRCDIR = ../src/pjnath-test -export PJNATH_TEST_OBJS += ice_test.o stun.o sess_auth.o server.o \ +export PJNATH_TEST_OBJS += ice_test.o stun.o sess_auth.o server.o concur_test.o \ stun_sock_test.o turn_sock_test.o test.o export PJNATH_TEST_CFLAGS += $(_CFLAGS) export PJNATH_TEST_LDFLAGS += $(_LDFLAGS) diff --git a/pjnath/include/pjnath/ice_session.h b/pjnath/include/pjnath/ice_session.h index f48e12d4..8cd45864 100644 --- a/pjnath/include/pjnath/ice_session.h +++ b/pjnath/include/pjnath/ice_session.h @@ -612,7 +612,7 @@ struct pj_ice_sess pj_pool_t *pool; /**< Pool instance. */ void *user_data; /**< App. data. */ - pj_mutex_t *mutex; /**< Mutex. */ + pj_grp_lock_t *grp_lock; /**< Group lock */ pj_ice_sess_role role; /**< ICE role. */ pj_ice_sess_options opt; /**< Options */ pj_timestamp tie_breaker; /**< Tie breaker value */ @@ -730,6 +730,8 @@ PJ_DECL(void) pj_ice_sess_options_default(pj_ice_sess_options *opt); * the value is NULL, a random string will be * generated. * @param local_passwd Optional string to be used as local password. + * @param grp_lock Optional group lock to be used by this session. + * If NULL, the session will create one itself. * @param p_ice Pointer to receive the ICE session instance. * * @return PJ_SUCCESS if ICE session is created successfully. 
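The practical effect of the new parameter documented above is that an application, or a higher layer object such as pj_ice_strans, can hand one shared group lock to the ICE session and to the STUN/TURN transports it owns, so they all serialize on the same lock and share one destruction handler. Below is a minimal usage sketch, not part of the patch, of passing an externally created group lock to pj_ice_sess_create(); it assumes pool, stun_cfg, ice_cb and comp_cnt already exist, and the names and error handling are illustrative only:

    pj_grp_lock_t *grp_lock;
    pj_ice_sess *ice;
    pj_status_t status;

    /* Create a group lock owned by the application pool. */
    status = pj_grp_lock_create(pool, NULL, &grp_lock);
    if (status != PJ_SUCCESS)
        return status;

    /* Pass the lock to the ICE session. Passing NULL here keeps the old
     * behaviour: the session creates (and owns) a group lock internally.
     * The session adds its own reference to the lock it is given.
     */
    status = pj_ice_sess_create(&stun_cfg, "ice", PJ_ICE_SESS_ROLE_CONTROLLING,
                                comp_cnt, &ice_cb, NULL, NULL,
                                grp_lock, &ice);
    if (status != PJ_SUCCESS)
        return status;
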
@@ -741,6 +743,7 @@ PJ_DECL(pj_status_t) pj_ice_sess_create(pj_stun_config *stun_cfg, const pj_ice_sess_cb *cb, const pj_str_t *local_ufrag, const pj_str_t *local_passwd, + pj_grp_lock_t *grp_lock, pj_ice_sess **p_ice); /** diff --git a/pjnath/include/pjnath/stun_session.h b/pjnath/include/pjnath/stun_session.h index 22551a8f..88dc78df 100644 --- a/pjnath/include/pjnath/stun_session.h +++ b/pjnath/include/pjnath/stun_session.h @@ -30,6 +30,7 @@ #include #include #include +#include #include PJ_BEGIN_DECL @@ -384,6 +385,8 @@ typedef enum pj_stun_sess_msg_log_flag * name will be used for example for logging purpose. * @param cb Session callback. * @param fingerprint Enable message fingerprint for outgoing messages. + * @param grp_lock Optional group lock to be used by this session. + * If NULL, the session will create one itself. * @param p_sess Pointer to receive STUN session instance. * * @return PJ_SUCCESS on success, or the appropriate error code. @@ -392,6 +395,7 @@ PJ_DECL(pj_status_t) pj_stun_session_create(pj_stun_config *cfg, const char *name, const pj_stun_session_cb *cb, pj_bool_t fingerprint, + pj_grp_lock_t *grp_lock, pj_stun_session **p_sess); /** @@ -430,22 +434,6 @@ PJ_DECL(pj_status_t) pj_stun_session_set_user_data(pj_stun_session *sess, */ PJ_DECL(void*) pj_stun_session_get_user_data(pj_stun_session *sess); -/** - * Change the lock object used by the STUN session. By default, the STUN - * session uses a mutex to protect its internal data. If application already - * protects access to STUN session with higher layer lock, it may disable - * the mutex protection in the STUN session by changing the STUN session - * lock to a NULL mutex. - * - * @param sess The STUN session instance. - * @param lock New lock instance to be used by the STUN session. - * @param auto_del Specify whether STUN session should destroy this - * lock instance when it's destroyed. - */ -PJ_DECL(pj_status_t) pj_stun_session_set_lock(pj_stun_session *sess, - pj_lock_t *lock, - pj_bool_t auto_del); - /** * Set SOFTWARE name to be included in all requests and responses. * diff --git a/pjnath/include/pjnath/stun_sock.h b/pjnath/include/pjnath/stun_sock.h index f9e9be2e..30e94a2d 100644 --- a/pjnath/include/pjnath/stun_sock.h +++ b/pjnath/include/pjnath/stun_sock.h @@ -27,6 +27,7 @@ #include #include #include +#include #include #include @@ -217,6 +218,14 @@ typedef struct pj_stun_sock_info */ typedef struct pj_stun_sock_cfg { + /** + * The group lock to be used by the STUN socket. If NULL, the STUN socket + * will create one internally. + * + * Default: NULL + */ + pj_grp_lock_t *grp_lock; + /** * Packet buffer size. * diff --git a/pjnath/include/pjnath/stun_transaction.h b/pjnath/include/pjnath/stun_transaction.h index 1f10fb7d..41344aa0 100644 --- a/pjnath/include/pjnath/stun_transaction.h +++ b/pjnath/include/pjnath/stun_transaction.h @@ -27,6 +27,7 @@ #include #include +#include PJ_BEGIN_DECL @@ -124,6 +125,7 @@ typedef struct pj_stun_tsx_cb * @param cfg The STUN endpoint, which will be used to retrieve * various settings for the transaction. * @param pool Pool to be used to allocate memory from. + * @param grp_lock Group lock to synchronize. * @param cb Callback structure, to be used by the transaction * to send message and to notify the application about * the completion of the transaction. 
@@ -133,6 +135,7 @@ typedef struct pj_stun_tsx_cb */ PJ_DECL(pj_status_t) pj_stun_client_tsx_create( pj_stun_config *cfg, pj_pool_t *pool, + pj_grp_lock_t *grp_lock, const pj_stun_tsx_cb *cb, pj_stun_client_tsx **p_tsx); @@ -159,15 +162,14 @@ pj_stun_client_tsx_schedule_destroy(pj_stun_client_tsx *tsx, /** - * Destroy a STUN client transaction immediately. This function can be - * called at any time to stop the transaction and destroy it. + * Stop the client transaction. * * @param tsx The STUN transaction. * * @return PJ_SUCCESS on success or PJ_EINVAL if the parameter * is NULL. */ -PJ_DECL(pj_status_t) pj_stun_client_tsx_destroy(pj_stun_client_tsx *tsx); +PJ_DECL(pj_status_t) pj_stun_client_tsx_stop(pj_stun_client_tsx *tsx); /** diff --git a/pjnath/include/pjnath/turn_session.h b/pjnath/include/pjnath/turn_session.h index 64bde93e..d3adc79b 100644 --- a/pjnath/include/pjnath/turn_session.h +++ b/pjnath/include/pjnath/turn_session.h @@ -417,7 +417,10 @@ PJ_DECL(const char*) pj_turn_state_name(pj_turn_state_t state); * @param name Optional name to identify this session in the log. * @param af Address family of the client connection. Currently * pj_AF_INET() and pj_AF_INET6() are supported. - * @param conn_type Connection type to the TURN server. + * @param conn_type Connection type to the TURN server. + * @param grp_lock Optional group lock object to be used by this session. + * If this value is NULL, the session will create + * a group lock internally. * @param cb Callback to receive events from the TURN session. * @param options Option flags, currently this value must be zero. * @param user_data Arbitrary application data to be associated with @@ -432,6 +435,7 @@ PJ_DECL(pj_status_t) pj_turn_session_create(const pj_stun_config *cfg, const char *name, int af, pj_turn_tp_type conn_type, + pj_grp_lock_t *grp_lock, const pj_turn_session_cb *cb, unsigned options, void *user_data, diff --git a/pjnath/include/pjnath/turn_sock.h b/pjnath/include/pjnath/turn_sock.h index f756a3b8..96990baa 100644 --- a/pjnath/include/pjnath/turn_sock.h +++ b/pjnath/include/pjnath/turn_sock.h @@ -108,6 +108,14 @@ typedef struct pj_turn_sock_cb */ typedef struct pj_turn_sock_cfg { + /** + * The group lock to be used by the STUN socket. If NULL, the STUN socket + * will create one internally. + * + * Default: NULL + */ + pj_grp_lock_t *grp_lock; + /** * Packet buffer size. * diff --git a/pjnath/src/pjnath-test/concur_test.c b/pjnath/src/pjnath-test/concur_test.c new file mode 100644 index 00000000..92c2d394 --- /dev/null +++ b/pjnath/src/pjnath-test/concur_test.c @@ -0,0 +1,360 @@ +/* $Id$ */ +/* + * Copyright (C) 2008-2011 Teluu Inc. (http://www.teluu.com) + * Copyright (C) 2003-2008 Benny Prijono + * + * This program is free software; you can redistribute it and/or modify + * it under the terms of the GNU General Public License as published by + * the Free Software Foundation; either version 2 of the License, or + * (at your option) any later version. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU General Public License for more details. 
+ * + * You should have received a copy of the GNU General Public License + * along with this program; if not, write to the Free Software + * Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA + */ +#include "test.h" + +#if INCLUDE_CONCUR_TEST + +#define THIS_FILE "concur_test.c" + +/****************************************************************************/ +#define WORKER_THREAD_CNT 4 +#define SERVER_THREAD_CNT 4 +#define MAX_SOCK_CLIENTS 80 + +struct stun_test_session +{ + pj_stun_config stun_cfg; + + pj_lock_t *lock; + + pj_thread_t *worker_threads[WORKER_THREAD_CNT]; + + pj_sock_t server_sock; + int server_port; + pj_thread_t *server_threads[SERVER_THREAD_CNT]; + pj_event_t *server_event; + + pj_bool_t thread_quit_flag; + + /* Test parameters: */ + struct { + int client_got_response; + + pj_bool_t server_wait_for_event; + pj_bool_t server_drop_request; + int client_sleep_after_start; + int client_sleep_before_destroy; + } param; +}; + +static int server_thread_proc(void *p) +{ + struct stun_test_session *test_sess = (struct stun_test_session*)p; + pj_pool_t *pool; + pj_status_t status; + + PJ_LOG(4,(THIS_FILE, "Server thread running")); + + pool = pj_pool_create(test_sess->stun_cfg.pf, "server", 512, 512, NULL); + + while (!test_sess->thread_quit_flag) { + pj_time_val timeout = {0, 10}; + pj_fd_set_t rdset; + int n; + + /* Serve client */ + PJ_FD_ZERO(&rdset); + PJ_FD_SET(test_sess->server_sock, &rdset); + n = pj_sock_select(test_sess->server_sock+1, &rdset, + NULL, NULL, &timeout); + if (n==1 && PJ_FD_ISSET(test_sess->server_sock, &rdset)) { + pj_uint8_t pkt[512]; + pj_ssize_t pkt_len; + pj_size_t res_len; + pj_sockaddr client_addr; + int addr_len; + + pj_stun_msg *stun_req, *stun_res; + + pj_pool_reset(pool); + + /* Got query */ + pkt_len = sizeof(pkt); + addr_len = sizeof(client_addr); + status = pj_sock_recvfrom(test_sess->server_sock, pkt, &pkt_len, + 0, &client_addr, &addr_len); + if (status != PJ_SUCCESS) { + continue; + } + + status = pj_stun_msg_decode(pool, pkt, pkt_len, + PJ_STUN_IS_DATAGRAM, + &stun_req, NULL, NULL); + if (status != PJ_SUCCESS) { + PJ_PERROR(1,(THIS_FILE, status, "STUN request decode error")); + continue; + } + + status = pj_stun_msg_create_response(pool, stun_req, + PJ_STUN_SC_BAD_REQUEST, NULL, + &stun_res); + if (status != PJ_SUCCESS) { + PJ_PERROR(1,(THIS_FILE, status, "STUN create response error")); + continue; + } + + status = pj_stun_msg_encode(stun_res, pkt, sizeof(pkt), 0, + NULL, &res_len); + if (status != PJ_SUCCESS) { + PJ_PERROR(1,(THIS_FILE, status, "STUN encode error")); + continue; + } + + /* Ignore request */ + if (test_sess->param.server_drop_request) + continue; + + /* Wait for signal to continue */ + if (test_sess->param.server_wait_for_event) + pj_event_wait(test_sess->server_event); + + pkt_len = res_len; + pj_sock_sendto(test_sess->server_sock, pkt, &pkt_len, 0, + &client_addr, pj_sockaddr_get_len(&client_addr)); + } + } + + pj_pool_release(pool); + + PJ_LOG(4,(THIS_FILE, "Server thread quitting")); + return 0; +} + +static int worker_thread_proc(void *p) +{ + struct stun_test_session *test_sess = (struct stun_test_session*)p; + + PJ_LOG(4,(THIS_FILE, "Worker thread running")); + + while (!test_sess->thread_quit_flag) { + pj_time_val timeout = {0, 10}; + pj_timer_heap_poll(test_sess->stun_cfg.timer_heap, NULL); + pj_ioqueue_poll(test_sess->stun_cfg.ioqueue, &timeout); + } + + PJ_LOG(4,(THIS_FILE, "Worker thread quitting")); + return 0; +} + +static pj_bool_t stun_sock_on_status(pj_stun_sock *stun_sock, + 
pj_stun_sock_op op, + pj_status_t status) +{ + struct stun_test_session *test_sess = (struct stun_test_session*)pj_stun_sock_get_user_data(stun_sock); + test_sess->param.client_got_response++; + return PJ_TRUE; +} + +static int stun_destroy_test_session(struct stun_test_session *test_sess) +{ + + unsigned i; + pj_stun_sock_cb stun_cb; + pj_status_t status; + pj_stun_sock *stun_sock[MAX_SOCK_CLIENTS]; + + pj_bzero(&stun_cb, sizeof(stun_cb)); + stun_cb.on_status = &stun_sock_on_status; + + pj_event_reset(test_sess->server_event); + + /* Create all clients first */ + for (i=0; istun_cfg, name, pj_AF_INET(), + &stun_cb, NULL, test_sess, + &stun_sock[i]); + if (status != PJ_SUCCESS) { + PJ_PERROR(1,(THIS_FILE, status, "Error creating stun socket")); + return -10; + } + } + + /* Start resolution */ + for (i=0; iserver_port, NULL); + if (status != PJ_SUCCESS) { + PJ_PERROR(1,(THIS_FILE, status, "Error starting stun socket")); + return -20; + } + } + + /* settle down */ + pj_thread_sleep(test_sess->param.client_sleep_after_start); + + /* Resume server threads */ + pj_event_set(test_sess->server_event); + + pj_thread_sleep(test_sess->param.client_sleep_before_destroy); + + /* Destroy clients */ + for (i=0; ipool = pool; sess->stun_cfg = stun_cfg; + sess->param = test_param; pj_memcpy(&sess->caller.cfg, caller_cfg, sizeof(*caller_cfg)); sess->caller.result.init_status = sess->caller.result.nego_status = PJ_EPENDING; @@ -261,6 +280,8 @@ static int create_sess(pj_stun_config *stun_cfg, /* Destroy test session */ static void destroy_sess(struct test_sess *sess, unsigned wait_msec) { + unsigned i; + if (sess->caller.ice) { pj_ice_strans_destroy(sess->caller.ice); sess->caller.ice = NULL; @@ -271,6 +292,12 @@ static void destroy_sess(struct test_sess *sess, unsigned wait_msec) sess->callee.ice = NULL; } + sess->param->worker_quit = PJ_TRUE; + for (i=0; iparam->worker_cnt; ++i) { + if (sess->worker_threads[i]) + pj_thread_join(sess->worker_threads[i]); + } + poll_events(sess->stun_cfg, wait_msec, PJ_FALSE); if (sess->resolver) { @@ -326,6 +353,9 @@ static void ice_on_ice_complete(pj_ice_strans *ice_st, case PJ_ICE_STRANS_OP_NEGOTIATION: ept->result.nego_status = status; break; + case PJ_ICE_STRANS_OP_KEEP_ALIVE: + /* keep alive failed? 
*/ + break; default: pj_assert(!"Unknown op"); } @@ -384,20 +414,20 @@ static int check_pair(const struct ice_ept *ept1, const struct ice_ept *ept2, c1 = pj_ice_strans_get_valid_pair(ept1->ice, i+1); if (c1 == NULL) { - PJ_LOG(3,("", INDENT "err: unable to get valid pair for ice1 " + PJ_LOG(3,(THIS_FILE, INDENT "err: unable to get valid pair for ice1 " "component %d", i+1)); return start_err - 2; } c2 = pj_ice_strans_get_valid_pair(ept2->ice, i+1); if (c2 == NULL) { - PJ_LOG(3,("", INDENT "err: unable to get valid pair for ice2 " + PJ_LOG(3,(THIS_FILE, INDENT "err: unable to get valid pair for ice2 " "component %d", i+1)); return start_err - 4; } if (pj_sockaddr_cmp(&c1->rcand->addr, &c2->lcand->addr) != 0) { - PJ_LOG(3,("", INDENT "err: candidate pair does not match " + PJ_LOG(3,(THIS_FILE, INDENT "err: candidate pair does not match " "for component %d", i+1)); return start_err - 6; } @@ -408,14 +438,14 @@ static int check_pair(const struct ice_ept *ept1, const struct ice_ept *ept2, if (ept1->cfg.comp_cnt>i && pj_ice_strans_get_valid_pair(ept1->ice, i+1) != NULL) { - PJ_LOG(3,("", INDENT "err: ice1 shouldn't have valid pair " + PJ_LOG(3,(THIS_FILE, INDENT "err: ice1 shouldn't have valid pair " "for component %d", i+1)); return start_err - 8; } if (ept2->cfg.comp_cnt>i && pj_ice_strans_get_valid_pair(ept2->ice, i+1) != NULL) { - PJ_LOG(3,("", INDENT "err: ice2 shouldn't have valid pair " + PJ_LOG(3,(THIS_FILE, INDENT "err: ice2 shouldn't have valid pair " "for component %d", i+1)); return start_err - 9; } @@ -436,26 +466,43 @@ static int check_pair(const struct ice_ept *ept1, const struct ice_ept *ept2, rc = PJ_SUCCESS; \ break; \ } \ - if (t.sec - t0.sec > (timeout)) break; \ + PJ_TIME_VAL_SUB(t, t0); \ + if (PJ_TIME_VAL_MSEC(t) >= (timeout)) break; \ } \ } +int worker_thread_proc(void *data) +{ + pj_status_t rc; + struct test_sess *sess = (struct test_sess *) data; + pj_stun_config *stun_cfg = sess->stun_cfg; + + /* Wait until negotiation is complete on both endpoints */ +#define ALL_DONE (sess->param->worker_quit || \ + (sess->caller.result.nego_status!=PJ_EPENDING && \ + sess->callee.result.nego_status!=PJ_EPENDING)) + WAIT_UNTIL(sess->param->worker_timeout, ALL_DONE, rc); + + return 0; +} -static int perform_test(const char *title, - pj_stun_config *stun_cfg, - unsigned server_flag, - struct test_cfg *caller_cfg, - struct test_cfg *callee_cfg) +static int perform_test2(const char *title, + pj_stun_config *stun_cfg, + unsigned server_flag, + struct test_cfg *caller_cfg, + struct test_cfg *callee_cfg, + struct sess_param *test_param) { pjlib_state pjlib_state; struct test_sess *sess; + unsigned i; int rc; - PJ_LOG(3,("", INDENT "%s", title)); + PJ_LOG(3,(THIS_FILE, INDENT "%s", title)); capture_pjlib_state(stun_cfg, &pjlib_state); - rc = create_sess(stun_cfg, server_flag, caller_cfg, callee_cfg, &sess); + rc = create_sess(stun_cfg, server_flag, caller_cfg, callee_cfg, test_param, &sess); if (rc != 0) return rc; @@ -463,10 +510,10 @@ static int perform_test(const char *title, sess->callee.result.init_status!=PJ_EPENDING) /* Wait until both ICE transports are initialized */ - WAIT_UNTIL(30, ALL_READY, rc); + WAIT_UNTIL(30000, ALL_READY, rc); if (!ALL_READY) { - PJ_LOG(3,("", INDENT "err: init timed-out")); + PJ_LOG(3,(THIS_FILE, INDENT "err: init timed-out")); destroy_sess(sess, 500); return -100; } @@ -489,7 +536,6 @@ static int perform_test(const char *title, rc = 0; goto on_return; } - /* Init ICE on caller */ rc = pj_ice_strans_init_ice(sess->caller.ice, sess->caller.cfg.role, 
&sess->caller.ufrag, &sess->caller.pass); @@ -507,17 +553,14 @@ static int perform_test(const char *title, destroy_sess(sess, 500); return -110; } - /* Start ICE on callee */ rc = start_ice(&sess->callee, &sess->caller); if (rc != PJ_SUCCESS) { destroy_sess(sess, 500); return -120; } - /* Wait for callee's answer_delay */ poll_events(stun_cfg, sess->callee.cfg.answer_delay, PJ_FALSE); - /* Start ICE on caller */ rc = start_ice(&sess->caller, &sess->callee); if (rc != PJ_SUCCESS) { @@ -525,13 +568,37 @@ static int perform_test(const char *title, return -130; } - /* Wait until negotiation is complete on both endpoints */ -#define ALL_DONE (sess->caller.result.nego_status!=PJ_EPENDING && \ - sess->callee.result.nego_status!=PJ_EPENDING) - WAIT_UNTIL(30, ALL_DONE, rc); + for (i=0; iparam->worker_cnt; ++i) { + pj_status_t status; + + status = pj_thread_create(sess->pool, "worker_thread", + worker_thread_proc, sess, 0, 0, + &sess->worker_threads[i]); + if (status != PJ_SUCCESS) { + PJ_LOG(3,(THIS_FILE, INDENT "err: create thread")); + destroy_sess(sess, 500); + return -135; + } + } + + if (sess->param->destroy_after_create) + goto on_destroy; + if (sess->param->destroy_after_one_done) { + while (sess->caller.result.init_status==PJ_EPENDING && + sess->callee.result.init_status==PJ_EPENDING) + { + if (sess->param->worker_cnt) + pj_thread_sleep(0); + else + poll_events(stun_cfg, 0, PJ_FALSE); + } + goto on_destroy; + } + + WAIT_UNTIL(30000, ALL_DONE, rc); if (!ALL_DONE) { - PJ_LOG(3,("", INDENT "err: negotiation timed-out")); + PJ_LOG(3,(THIS_FILE, INDENT "err: negotiation timed-out")); destroy_sess(sess, 500); return -140; } @@ -561,6 +628,7 @@ static int perform_test(const char *title, } /* Looks like everything is okay */ +on_destroy: /* Destroy ICE stream transports first to let it de-allocate * TURN relay (otherwise there'll be timer/memory leak, unless @@ -578,7 +646,7 @@ static int perform_test(const char *title, on_return: /* Wait.. */ - poll_events(stun_cfg, 500, PJ_FALSE); + poll_events(stun_cfg, 200, PJ_FALSE); /* Now destroy everything */ destroy_sess(sess, 500); @@ -591,7 +659,20 @@ on_return: return rc; } - return 0; + return rc; +} + +static int perform_test(const char *title, + pj_stun_config *stun_cfg, + unsigned server_flag, + struct test_cfg *caller_cfg, + struct test_cfg *callee_cfg) +{ + struct sess_param test_param; + + pj_bzero(&test_param, sizeof(test_param)); + return perform_test2(title, stun_cfg, server_flag, caller_cfg, + callee_cfg, &test_param); } #define ROLE1 PJ_ICE_SESS_ROLE_CONTROLLED @@ -680,7 +761,7 @@ int ice_test(void) if (rc != 0) goto on_return; } - + /* Simple test first with srflx candidate */ if (1) { struct sess_cfg_t cfg = @@ -744,7 +825,7 @@ int ice_test(void) {ROLE2, 2, NO, YES, NO, 0, 0, 0, 0, {PJNATH_ESTUNTIMEDOUT, -1}} }; - rc = perform_test(cfg.title, &stun_cfg, cfg.server_flag, + rc = perform_test(cfg.title, &stun_cfg, cfg.server_flag, &cfg.ua1, &cfg.ua2); if (rc != 0) goto on_return; @@ -785,6 +866,7 @@ int ice_test(void) goto on_return; } + /* STUN failure, testing TURN deallocation */ if (1) { struct sess_cfg_t cfg = @@ -792,11 +874,11 @@ int ice_test(void) "STUN failure, testing TURN deallocation", 0xFFFF & (~(CREATE_STUN_SERVER)), /* Role comp# host? stun? turn? flag? 
ans_del snd_del des_del */ - {ROLE1, 2, YES, YES, YES, 0, 0, 0, 0, {PJNATH_ESTUNTIMEDOUT, -1}}, - {ROLE2, 2, YES, YES, YES, 0, 0, 0, 0, {PJNATH_ESTUNTIMEDOUT, -1}} + {ROLE1, 1, YES, YES, YES, 0, 0, 0, 0, {PJNATH_ESTUNTIMEDOUT, -1}}, + {ROLE2, 1, YES, YES, YES, 0, 0, 0, 0, {PJNATH_ESTUNTIMEDOUT, -1}} }; - rc = perform_test(cfg.title, &stun_cfg, cfg.server_flag, + rc = perform_test(cfg.title, &stun_cfg, cfg.server_flag, &cfg.ua1, &cfg.ua2); if (rc != 0) goto on_return; @@ -818,7 +900,7 @@ int ice_test(void) unsigned delay[] = { 50, 2000 }; unsigned d; - PJ_LOG(3,("", " %s", cfg->title)); + PJ_LOG(3,(THIS_FILE, " %s", cfg->title)); /* For each test item, test with various answer delay */ for (d=0; dsess); + status = pj_stun_session_create(&stun_cfg, "server", &sess_cb, PJ_FALSE, NULL, &server->sess); if (status != PJ_SUCCESS) { destroy_server(); return -10; @@ -479,7 +479,7 @@ static int run_client_test(const char *title, pj_bzero(&sess_cb, sizeof(sess_cb)); sess_cb.on_request_complete = &client_on_request_complete; sess_cb.on_send_msg = &client_send_msg; - status = pj_stun_session_create(&stun_cfg, "client", &sess_cb, PJ_FALSE, &client->sess); + status = pj_stun_session_create(&stun_cfg, "client", &sess_cb, PJ_FALSE, NULL, &client->sess); if (status != PJ_SUCCESS) { destroy_client_server(); return -200; diff --git a/pjnath/src/pjnath-test/stun_sock_test.c b/pjnath/src/pjnath-test/stun_sock_test.c index 93e510eb..ebf5a2bd 100644 --- a/pjnath/src/pjnath-test/stun_sock_test.c +++ b/pjnath/src/pjnath-test/stun_sock_test.c @@ -298,7 +298,7 @@ static int timeout_test(pj_stun_config *cfg, pj_bool_t destroy_on_err) struct stun_client *client; pj_str_t srv_addr; pj_time_val timeout, t; - int ret = 0; + int i, ret = 0; pj_status_t status; PJ_LOG(3,(THIS_FILE, " timeout test [%d]", destroy_on_err)); @@ -359,6 +359,8 @@ static int timeout_test(pj_stun_config *cfg, pj_bool_t destroy_on_err) on_return: destroy_server(srv); destroy_client(client); + for (i=0; i<7; ++i) + handle_events(cfg, 50); return ret; } @@ -373,7 +375,7 @@ static int missing_attr_test(pj_stun_config *cfg, pj_bool_t destroy_on_err) struct stun_client *client; pj_str_t srv_addr; pj_time_val timeout, t; - int ret = 0; + int i, ret = 0; pj_status_t status; PJ_LOG(3,(THIS_FILE, " missing attribute test [%d]", destroy_on_err)); @@ -426,6 +428,8 @@ static int missing_attr_test(pj_stun_config *cfg, pj_bool_t destroy_on_err) on_return: destroy_server(srv); destroy_client(client); + for (i=0; i<7; ++i) + handle_events(cfg, 50); return ret; } @@ -440,7 +444,7 @@ static int keep_alive_test(pj_stun_config *cfg) pj_stun_sock_info info; pj_str_t srv_addr; pj_time_val timeout, t; - int ret = 0; + int i, ret = 0; pj_status_t status; PJ_LOG(3,(THIS_FILE, " normal operation")); @@ -791,6 +795,8 @@ static int keep_alive_test(pj_stun_config *cfg) on_return: destroy_server(srv); destroy_client(client); + for (i=0; i<7; ++i) + handle_events(cfg, 50); return ret; } diff --git a/pjnath/src/pjnath-test/test.c b/pjnath/src/pjnath-test/test.c index 12ee0ac3..b100af3d 100644 --- a/pjnath/src/pjnath-test/test.c +++ b/pjnath/src/pjnath-test/test.c @@ -34,6 +34,7 @@ pj_status_t create_stun_config(pj_pool_t *pool, pj_stun_config *stun_cfg) { pj_ioqueue_t *ioqueue; pj_timer_heap_t *timer_heap; + pj_lock_t *lock; pj_status_t status; status = pj_ioqueue_create(pool, 64, &ioqueue); @@ -49,6 +50,9 @@ pj_status_t create_stun_config(pj_pool_t *pool, pj_stun_config *stun_cfg) return status; } + pj_lock_create_recursive_mutex(pool, NULL, &lock); + 
pj_timer_heap_set_lock(timer_heap, lock, PJ_TRUE); + pj_stun_config_init(stun_cfg, mem, 0, ioqueue, timer_heap); return PJ_SUCCESS; @@ -105,7 +109,7 @@ void capture_pjlib_state(pj_stun_config *cfg, struct pjlib_state *st) st->timer_cnt = pj_timer_heap_count(cfg->timer_heap); - cp = (pj_caching_pool*)mem; + cp = (pj_caching_pool*)cfg->pf; st->pool_used_cnt = cp->used_count; } @@ -120,6 +124,10 @@ int check_pjlib_state(pj_stun_config *cfg, if (current_state.timer_cnt > initial_st->timer_cnt) { PJ_LOG(3,("", " error: possibly leaking timer")); rc |= ERR_TIMER_LEAK; + +#if PJ_TIMER_DEBUG + pj_timer_heap_dump(cfg->timer_heap); +#endif } if (current_state.pool_used_cnt > initial_st->pool_used_cnt) { @@ -148,6 +156,18 @@ pj_pool_factory *mem; int param_log_decor = PJ_LOG_HAS_NEWLINE | PJ_LOG_HAS_TIME | PJ_LOG_HAS_MICRO_SEC; +pj_log_func *orig_log_func; +FILE *log_file; + +static void test_log_func(int level, const char *data, int len) +{ + if (log_file) { + fwrite(data, len, 1, log_file); + } + if (level <= 3) + orig_log_func(level, data, len); +} + static int test_inner(void) { pj_caching_pool caching_pool; @@ -158,6 +178,11 @@ static int test_inner(void) #if 1 pj_log_set_level(3); pj_log_set_decor(param_log_decor); +#elif 1 + log_file = fopen("pjnath-test.log", "wt"); + pj_log_set_level(5); + orig_log_func = pj_log_get_log_func(); + pj_log_set_log_func(&test_log_func); #endif rc = pj_init(); @@ -189,7 +214,13 @@ static int test_inner(void) DO_TEST(turn_sock_test()); #endif +#if INCLUDE_CONCUR_TEST + DO_TEST(concur_test()); +#endif + on_return: + if (log_file) + fclose(log_file); return rc; } diff --git a/pjnath/src/pjnath-test/test.h b/pjnath/src/pjnath-test/test.h index 28b0cd9c..6a57bc0c 100644 --- a/pjnath/src/pjnath-test/test.h +++ b/pjnath/src/pjnath-test/test.h @@ -25,17 +25,21 @@ #define INCLUDE_ICE_TEST 1 #define INCLUDE_STUN_SOCK_TEST 1 #define INCLUDE_TURN_SOCK_TEST 1 +#define INCLUDE_CONCUR_TEST 1 int stun_test(void); int sess_auth_test(void); int stun_sock_test(void); int turn_sock_test(void); int ice_test(void); +int concur_test(void); int test_main(void); extern void app_perror(const char *title, pj_status_t rc); extern pj_pool_factory *mem; +int ice_one_conc_test(pj_stun_config *stun_cfg, int err_quit); + //////////////////////////////////// /* * Utilities diff --git a/pjnath/src/pjnath/ice_session.c b/pjnath/src/pjnath/ice_session.c index 81aa8143..b6159f96 100644 --- a/pjnath/src/pjnath/ice_session.c +++ b/pjnath/src/pjnath/ice_session.c @@ -97,6 +97,7 @@ static pj_uint8_t cand_type_prefs[4] = #endif }; +#define THIS_FILE "ice_session.c" #define CHECK_NAME_LEN 128 #define LOG4(expr) PJ_LOG(4,expr) #define LOG5(expr) PJ_LOG(4,expr) @@ -134,6 +135,7 @@ typedef struct timer_data static void on_timer(pj_timer_heap_t *th, pj_timer_entry *te); static void on_ice_complete(pj_ice_sess *ice, pj_status_t status); static void ice_keep_alive(pj_ice_sess *ice, pj_bool_t send_now); +static void ice_on_destroy(void *obj); static void destroy_ice(pj_ice_sess *ice, pj_status_t reason); static pj_status_t start_periodic_check(pj_timer_heap_t *th, @@ -288,6 +290,7 @@ static pj_status_t init_comp(pj_ice_sess *ice, /* Create STUN session for this candidate */ status = pj_stun_session_create(&ice->stun_cfg, NULL, &sess_cb, PJ_TRUE, + ice->grp_lock, &comp->stun_sess); if (status != PJ_SUCCESS) return status; @@ -332,6 +335,7 @@ PJ_DEF(pj_status_t) pj_ice_sess_create(pj_stun_config *stun_cfg, const pj_ice_sess_cb *cb, const pj_str_t *local_ufrag, const pj_str_t *local_passwd, + pj_grp_lock_t 
*grp_lock, pj_ice_sess **p_ice) { pj_pool_t *pool; @@ -359,13 +363,20 @@ PJ_DEF(pj_status_t) pj_ice_sess_create(pj_stun_config *stun_cfg, pj_ansi_snprintf(ice->obj_name, sizeof(ice->obj_name), name, ice); - status = pj_mutex_create_recursive(pool, ice->obj_name, - &ice->mutex); - if (status != PJ_SUCCESS) { - destroy_ice(ice, status); - return status; + if (grp_lock) { + ice->grp_lock = grp_lock; + } else { + status = pj_grp_lock_create(pool, NULL, &ice->grp_lock); + if (status != PJ_SUCCESS) { + pj_pool_release(pool); + return status; + } } + pj_grp_lock_add_ref(ice->grp_lock); + pj_grp_lock_add_handler(ice->grp_lock, pool, ice, + &ice_on_destroy); + pj_memcpy(&ice->cb, cb, sizeof(*cb)); pj_memcpy(&ice->stun_cfg, stun_cfg, sizeof(*stun_cfg)); @@ -443,6 +454,21 @@ PJ_DEF(pj_status_t) pj_ice_sess_set_options(pj_ice_sess *ice, } +/* + * Callback to really destroy the session + */ +static void ice_on_destroy(void *obj) +{ + pj_ice_sess *ice = (pj_ice_sess*) obj; + + if (ice->pool) { + pj_pool_t *pool = ice->pool; + ice->pool = NULL; + pj_pool_release(pool); + } + LOG4((THIS_FILE, "ICE session %p destroyed", ice)); +} + /* * Destroy */ @@ -452,22 +478,20 @@ static void destroy_ice(pj_ice_sess *ice, unsigned i; if (reason == PJ_SUCCESS) { - LOG4((ice->obj_name, "Destroying ICE session")); + LOG4((ice->obj_name, "Destroying ICE session %p", ice)); } - ice->is_destroying = PJ_TRUE; + pj_grp_lock_acquire(ice->grp_lock); - /* Let other callbacks finish */ - if (ice->mutex) { - pj_mutex_lock(ice->mutex); - pj_mutex_unlock(ice->mutex); + if (ice->is_destroying) { + pj_grp_lock_release(ice->grp_lock); + return; } - if (ice->timer.id) { - pj_timer_heap_cancel(ice->stun_cfg.timer_heap, - &ice->timer); - ice->timer.id = PJ_FALSE; - } + ice->is_destroying = PJ_TRUE; + + pj_timer_heap_cancel_if_active(ice->stun_cfg.timer_heap, + &ice->timer, PJ_FALSE); for (i=0; icomp_cnt; ++i) { if (ice->comp[i].stun_sess) { @@ -476,21 +500,12 @@ static void destroy_ice(pj_ice_sess *ice, } } - if (ice->clist.timer.id) { - pj_timer_heap_cancel(ice->stun_cfg.timer_heap, &ice->clist.timer); - ice->clist.timer.id = PJ_FALSE; - } - - if (ice->mutex) { - pj_mutex_destroy(ice->mutex); - ice->mutex = NULL; - } + pj_timer_heap_cancel_if_active(ice->stun_cfg.timer_heap, + &ice->clist.timer, + PJ_FALSE); - if (ice->pool) { - pj_pool_t *pool = ice->pool; - ice->pool = NULL; - pj_pool_release(pool); - } + pj_grp_lock_dec_ref(ice->grp_lock); + pj_grp_lock_release(ice->grp_lock); } @@ -709,7 +724,7 @@ PJ_DEF(pj_status_t) pj_ice_sess_add_cand(pj_ice_sess *ice, PJ_EINVAL); PJ_ASSERT_RETURN(comp_id <= ice->comp_cnt, PJ_EINVAL); - pj_mutex_lock(ice->mutex); + pj_grp_lock_acquire(ice->grp_lock); if (ice->lcand_cnt >= PJ_ARRAY_SIZE(ice->lcand)) { status = PJ_ETOOMANY; @@ -749,7 +764,7 @@ PJ_DEF(pj_status_t) pj_ice_sess_add_cand(pj_ice_sess *ice, ++ice->lcand_cnt; on_error: - pj_mutex_unlock(ice->mutex); + pj_grp_lock_release(ice->grp_lock); return status; } @@ -766,7 +781,7 @@ PJ_DEF(pj_status_t) pj_ice_sess_find_default_cand(pj_ice_sess *ice, *cand_id = -1; - pj_mutex_lock(ice->mutex); + pj_grp_lock_acquire(ice->grp_lock); /* First find in valid list if we have nominated pair */ for (i=0; ivalid_list.count; ++i) { @@ -774,7 +789,7 @@ PJ_DEF(pj_status_t) pj_ice_sess_find_default_cand(pj_ice_sess *ice, if (check->lcand->comp_id == comp_id) { *cand_id = GET_LCAND_ID(check->lcand); - pj_mutex_unlock(ice->mutex); + pj_grp_lock_release(ice->grp_lock); return PJ_SUCCESS; } } @@ -786,7 +801,7 @@ PJ_DEF(pj_status_t) 
pj_ice_sess_find_default_cand(pj_ice_sess *ice, lcand->type == PJ_ICE_CAND_TYPE_RELAYED) { *cand_id = GET_LCAND_ID(lcand); - pj_mutex_unlock(ice->mutex); + pj_grp_lock_release(ice->grp_lock); return PJ_SUCCESS; } } @@ -799,7 +814,7 @@ PJ_DEF(pj_status_t) pj_ice_sess_find_default_cand(pj_ice_sess *ice, lcand->type == PJ_ICE_CAND_TYPE_PRFLX)) { *cand_id = GET_LCAND_ID(lcand); - pj_mutex_unlock(ice->mutex); + pj_grp_lock_release(ice->grp_lock); return PJ_SUCCESS; } } @@ -811,13 +826,13 @@ PJ_DEF(pj_status_t) pj_ice_sess_find_default_cand(pj_ice_sess *ice, lcand->type == PJ_ICE_CAND_TYPE_HOST) { *cand_id = GET_LCAND_ID(lcand); - pj_mutex_unlock(ice->mutex); + pj_grp_lock_release(ice->grp_lock); return PJ_SUCCESS; } } /* Still no candidate is found! :( */ - pj_mutex_unlock(ice->mutex); + pj_grp_lock_release(ice->grp_lock); pj_assert(!"Should have a candidate by now"); return PJ_EBUG; @@ -1127,14 +1142,20 @@ static void on_timer(pj_timer_heap_t *th, pj_timer_entry *te) { pj_ice_sess *ice = (pj_ice_sess*) te->user_data; enum timer_type type = (enum timer_type)te->id; - pj_bool_t has_mutex = PJ_TRUE; PJ_UNUSED_ARG(th); - pj_mutex_lock(ice->mutex); + pj_grp_lock_acquire(ice->grp_lock); te->id = TIMER_NONE; + if (ice->is_destroying) { + /* Stray timer, could happen when destroy is invoked while callback + * is pending. */ + pj_grp_lock_release(ice->grp_lock); + return; + } + switch (type) { case TIMER_CONTROLLED_WAIT_NOM: LOG4((ice->obj_name, @@ -1157,8 +1178,6 @@ static void on_timer(pj_timer_heap_t *th, pj_timer_entry *te) /* Release mutex in case app destroy us in the callback */ ice_status = ice->ice_status; on_ice_complete = ice->cb.on_ice_complete; - has_mutex = PJ_FALSE; - pj_mutex_unlock(ice->mutex); /* Notify app about ICE completion*/ if (on_ice_complete) @@ -1176,8 +1195,7 @@ static void on_timer(pj_timer_heap_t *th, pj_timer_entry *te) break; } - if (has_mutex) - pj_mutex_unlock(ice->mutex); + pj_grp_lock_release(ice->grp_lock); } /* Send keep-alive */ @@ -1235,8 +1253,10 @@ done: ice->comp_cnt; pj_time_val_normalize(&delay); - ice->timer.id = TIMER_KEEP_ALIVE; - pj_timer_heap_schedule(ice->stun_cfg.timer_heap, &ice->timer, &delay); + pj_timer_heap_schedule_w_grp_lock(ice->stun_cfg.timer_heap, + &ice->timer, &delay, + TIMER_KEEP_ALIVE, + ice->grp_lock); } else { pj_assert(!"Not expected any timer active"); @@ -1250,10 +1270,8 @@ static void on_ice_complete(pj_ice_sess *ice, pj_status_t status) ice->is_complete = PJ_TRUE; ice->ice_status = status; - if (ice->timer.id != TIMER_NONE) { - pj_timer_heap_cancel(ice->stun_cfg.timer_heap, &ice->timer); - ice->timer.id = TIMER_NONE; - } + pj_timer_heap_cancel_if_active(ice->stun_cfg.timer_heap, &ice->timer, + TIMER_NONE); /* Log message */ LOG4((ice->obj_name, "ICE process complete, status=%s", @@ -1266,9 +1284,10 @@ static void on_ice_complete(pj_ice_sess *ice, pj_status_t status) if (ice->cb.on_ice_complete) { pj_time_val delay = {0, 0}; - ice->timer.id = TIMER_COMPLETION_CALLBACK; - pj_timer_heap_schedule(ice->stun_cfg.timer_heap, - &ice->timer, &delay); + pj_timer_heap_schedule_w_grp_lock(ice->stun_cfg.timer_heap, + &ice->timer, &delay, + TIMER_COMPLETION_CALLBACK, + ice->grp_lock); } } } @@ -1496,10 +1515,11 @@ static pj_bool_t on_check_complete(pj_ice_sess *ice, delay.msec = ice->opt.controlled_agent_want_nom_timeout; pj_time_val_normalize(&delay); - ice->timer.id = TIMER_CONTROLLED_WAIT_NOM; - pj_timer_heap_schedule(ice->stun_cfg.timer_heap, - &ice->timer, - &delay); + pj_timer_heap_schedule_w_grp_lock( + ice->stun_cfg.timer_heap, + 
&ice->timer, &delay, + TIMER_CONTROLLED_WAIT_NOM, + ice->grp_lock); LOG5((ice->obj_name, "All checks have completed. Controlled agent now " @@ -1575,10 +1595,8 @@ static pj_bool_t on_check_complete(pj_ice_sess *ice, "Scheduling nominated check in %d ms", ice->opt.nominated_check_delay)); - if (ice->timer.id != TIMER_NONE) { - pj_timer_heap_cancel(ice->stun_cfg.timer_heap, &ice->timer); - ice->timer.id = TIMER_NONE; - } + pj_timer_heap_cancel_if_active(ice->stun_cfg.timer_heap, &ice->timer, + TIMER_NONE); /* All components have valid pair. Let connectivity checks run for * a little bit more time, then start our nominated check. @@ -1587,8 +1605,10 @@ static pj_bool_t on_check_complete(pj_ice_sess *ice, delay.msec = ice->opt.nominated_check_delay; pj_time_val_normalize(&delay); - ice->timer.id = TIMER_START_NOMINATED_CHECK; - pj_timer_heap_schedule(ice->stun_cfg.timer_heap, &ice->timer, &delay); + pj_timer_heap_schedule_w_grp_lock(ice->stun_cfg.timer_heap, + &ice->timer, &delay, + TIMER_START_NOMINATED_CHECK, + ice->grp_lock); return PJ_FALSE; } @@ -1618,7 +1638,7 @@ PJ_DEF(pj_status_t) pj_ice_sess_create_check_list( PJ_ASSERT_RETURN(rcand_cnt + ice->rcand_cnt <= PJ_ICE_MAX_CAND, PJ_ETOOMANY); - pj_mutex_lock(ice->mutex); + pj_grp_lock_acquire(ice->grp_lock); /* Save credentials */ username.ptr = buf; @@ -1666,7 +1686,7 @@ PJ_DEF(pj_status_t) pj_ice_sess_create_check_list( pj_ice_sess_check *chk = &clist->checks[clist->count]; if (clist->count >= PJ_ICE_MAX_CHECKS) { - pj_mutex_unlock(ice->mutex); + pj_grp_lock_release(ice->grp_lock); return PJ_ETOOMANY; } @@ -1694,7 +1714,7 @@ PJ_DEF(pj_status_t) pj_ice_sess_create_check_list( /* This could happen if candidates have no matching address families */ if (clist->count == 0) { LOG4((ice->obj_name, "Error: no checklist can be created")); - pj_mutex_unlock(ice->mutex); + pj_grp_lock_release(ice->grp_lock); return PJ_ENOTFOUND; } @@ -1704,7 +1724,7 @@ PJ_DEF(pj_status_t) pj_ice_sess_create_check_list( /* Prune the checklist */ status = prune_checklist(ice, clist); if (status != PJ_SUCCESS) { - pj_mutex_unlock(ice->mutex); + pj_grp_lock_release(ice->grp_lock); return status; } @@ -1731,7 +1751,7 @@ PJ_DEF(pj_status_t) pj_ice_sess_create_check_list( /* Log checklist */ dump_checklist("Checklist created:", ice, clist); - pj_mutex_unlock(ice->mutex); + pj_grp_lock_release(ice->grp_lock); return PJ_SUCCESS; } @@ -1850,13 +1870,10 @@ static pj_status_t start_periodic_check(pj_timer_heap_t *th, ice = td->ice; clist = td->clist; - if (ice->is_destroying) - return PJ_SUCCESS; - - pj_mutex_lock(ice->mutex); + pj_grp_lock_acquire(ice->grp_lock); if (ice->is_destroying) { - pj_mutex_unlock(ice->mutex); + pj_grp_lock_release(ice->grp_lock); return PJ_SUCCESS; } @@ -1878,7 +1895,7 @@ static pj_status_t start_periodic_check(pj_timer_heap_t *th, if (check->state == PJ_ICE_SESS_CHECK_STATE_WAITING) { status = perform_check(ice, clist, i, ice->is_nominating); if (status != PJ_SUCCESS) { - pj_mutex_unlock(ice->mutex); + pj_grp_lock_release(ice->grp_lock); pj_log_pop_indent(); return status; } @@ -1898,7 +1915,7 @@ static pj_status_t start_periodic_check(pj_timer_heap_t *th, if (check->state == PJ_ICE_SESS_CHECK_STATE_FROZEN) { status = perform_check(ice, clist, i, ice->is_nominating); if (status != PJ_SUCCESS) { - pj_mutex_unlock(ice->mutex); + pj_grp_lock_release(ice->grp_lock); pj_log_pop_indent(); return status; } @@ -1915,12 +1932,12 @@ static pj_status_t start_periodic_check(pj_timer_heap_t *th, /* Schedule for next timer */ pj_time_val timeout = {0, 
PJ_ICE_TA_VAL}; - te->id = PJ_TRUE; pj_time_val_normalize(&timeout); - pj_timer_heap_schedule(th, te, &timeout); + pj_timer_heap_schedule_w_grp_lock(th, te, &timeout, PJ_TRUE, + ice->grp_lock); } - pj_mutex_unlock(ice->mutex); + pj_grp_lock_release(ice->grp_lock); pj_log_pop_indent(); return PJ_SUCCESS; } @@ -1940,8 +1957,8 @@ static void start_nominated_check(pj_ice_sess *ice) /* Stop our timer if it's active */ if (ice->timer.id == TIMER_START_NOMINATED_CHECK) { - pj_timer_heap_cancel(ice->stun_cfg.timer_heap, &ice->timer); - ice->timer.id = TIMER_NONE; + pj_timer_heap_cancel_if_active(ice->stun_cfg.timer_heap, &ice->timer, + TIMER_NONE); } /* For each component, set the check state of valid check with @@ -1969,18 +1986,15 @@ static void start_nominated_check(pj_ice_sess *ice) } /* And (re)start the periodic check */ - if (ice->clist.timer.id) { - pj_timer_heap_cancel(ice->stun_cfg.timer_heap, &ice->clist.timer); - ice->clist.timer.id = PJ_FALSE; - } + pj_timer_heap_cancel_if_active(ice->stun_cfg.timer_heap, + &ice->clist.timer, PJ_FALSE); - ice->clist.timer.id = PJ_TRUE; delay.sec = delay.msec = 0; - status = pj_timer_heap_schedule(ice->stun_cfg.timer_heap, - &ice->clist.timer, &delay); - if (status != PJ_SUCCESS) { - ice->clist.timer.id = PJ_FALSE; - } else { + status = pj_timer_heap_schedule_w_grp_lock(ice->stun_cfg.timer_heap, + &ice->clist.timer, &delay, + PJ_TRUE, + ice->grp_lock); + if (status == PJ_SUCCESS) { LOG5((ice->obj_name, "Periodic timer rescheduled..")); } @@ -2030,7 +2044,7 @@ PJ_DEF(pj_status_t) pj_ice_sess_start_check(pj_ice_sess *ice) PJ_ASSERT_RETURN(ice->clist.count > 0, PJ_EINVALIDOP); /* Lock session */ - pj_mutex_lock(ice->mutex); + pj_grp_lock_acquire(ice->grp_lock); LOG4((ice->obj_name, "Starting ICE check..")); pj_log_push_indent(); @@ -2060,7 +2074,7 @@ PJ_DEF(pj_status_t) pj_ice_sess_start_check(pj_ice_sess *ice) } if (i == clist->count) { pj_assert(!"Unable to find checklist for component 1"); - pj_mutex_unlock(ice->mutex); + pj_grp_lock_release(ice->grp_lock); pj_log_pop_indent(); return PJNATH_EICEINCOMPID; } @@ -2114,15 +2128,15 @@ PJ_DEF(pj_status_t) pj_ice_sess_start_check(pj_ice_sess *ice) * instead to reduce stack usage: * return start_periodic_check(ice->stun_cfg.timer_heap, &clist->timer); */ - clist->timer.id = PJ_TRUE; delay.sec = delay.msec = 0; - status = pj_timer_heap_schedule(ice->stun_cfg.timer_heap, - &clist->timer, &delay); + status = pj_timer_heap_schedule_w_grp_lock(ice->stun_cfg.timer_heap, + &clist->timer, &delay, + PJ_TRUE, ice->grp_lock); if (status != PJ_SUCCESS) { clist->timer.id = PJ_FALSE; } - pj_mutex_unlock(ice->mutex); + pj_grp_lock_release(ice->grp_lock); pj_log_pop_indent(); return status; } @@ -2143,9 +2157,22 @@ static pj_status_t on_stun_send_msg(pj_stun_session *sess, stun_data *sd = (stun_data*) pj_stun_session_get_user_data(sess); pj_ice_sess *ice = sd->ice; pj_ice_msg_data *msg_data = (pj_ice_msg_data*) token; + pj_status_t status; - return (*ice->cb.on_tx_pkt)(ice, sd->comp_id, msg_data->transport_id, - pkt, pkt_size, dst_addr, addr_len); + pj_grp_lock_acquire(ice->grp_lock); + + if (ice->is_destroying) { + /* Stray retransmit timer that could happen while + * we're being destroyed */ + pj_grp_lock_release(ice->grp_lock); + return PJ_EINVALIDOP; + } + + status = (*ice->cb.on_tx_pkt)(ice, sd->comp_id, msg_data->transport_id, + pkt, pkt_size, dst_addr, addr_len); + + pj_grp_lock_release(ice->grp_lock); + return status; } @@ -2180,7 +2207,13 @@ static void on_stun_request_complete(pj_stun_session *stun_sess, 
pj_assert(tdata == check->tdata); check->tdata = NULL; - pj_mutex_lock(ice->mutex); + pj_grp_lock_acquire(ice->grp_lock); + + if (ice->is_destroying) { + /* Not sure if this is possible but just in case */ + pj_grp_lock_release(ice->grp_lock); + return; + } /* Init lcand to NULL. lcand will be found from the mapped address * found in the response. @@ -2231,7 +2264,7 @@ static void on_stun_request_complete(pj_stun_session *stun_sess, perform_check(ice, clist, msg_data->data.req.ckid, check->nominated || ice->is_nominating); pj_log_pop_indent(); - pj_mutex_unlock(ice->mutex); + pj_grp_lock_release(ice->grp_lock); return; } @@ -2246,7 +2279,7 @@ static void on_stun_request_complete(pj_stun_session *stun_sess, check_set_state(ice, check, PJ_ICE_SESS_CHECK_STATE_FAILED, status); on_check_complete(ice, check); pj_log_pop_indent(); - pj_mutex_unlock(ice->mutex); + pj_grp_lock_release(ice->grp_lock); return; } @@ -2270,7 +2303,7 @@ static void on_stun_request_complete(pj_stun_session *stun_sess, check_set_state(ice, check, PJ_ICE_SESS_CHECK_STATE_FAILED, status); on_check_complete(ice, check); pj_log_pop_indent(); - pj_mutex_unlock(ice->mutex); + pj_grp_lock_release(ice->grp_lock); return; } @@ -2303,7 +2336,7 @@ static void on_stun_request_complete(pj_stun_session *stun_sess, check_set_state(ice, check, PJ_ICE_SESS_CHECK_STATE_FAILED, PJNATH_ESTUNNOMAPPEDADDR); on_check_complete(ice, check); - pj_mutex_unlock(ice->mutex); + pj_grp_lock_release(ice->grp_lock); return; } @@ -2351,7 +2384,7 @@ static void on_stun_request_complete(pj_stun_session *stun_sess, check_set_state(ice, check, PJ_ICE_SESS_CHECK_STATE_FAILED, status); on_check_complete(ice, check); - pj_mutex_unlock(ice->mutex); + pj_grp_lock_release(ice->grp_lock); return; } @@ -2411,11 +2444,11 @@ static void on_stun_request_complete(pj_stun_session *stun_sess, */ if (on_check_complete(ice, check)) { /* ICE complete! 
*/ - pj_mutex_unlock(ice->mutex); + pj_grp_lock_release(ice->grp_lock); return; } - pj_mutex_unlock(ice->mutex); + pj_grp_lock_release(ice->grp_lock); } @@ -2456,7 +2489,12 @@ static pj_status_t on_stun_rx_request(pj_stun_session *sess, sd = (stun_data*) pj_stun_session_get_user_data(sess); ice = sd->ice; - pj_mutex_lock(ice->mutex); + pj_grp_lock_acquire(ice->grp_lock); + + if (ice->is_destroying) { + pj_grp_lock_release(ice->grp_lock); + return PJ_EINVALIDOP; + } /* * Note: @@ -2471,7 +2509,7 @@ static pj_status_t on_stun_rx_request(pj_stun_session *sess, pj_stun_msg_find_attr(msg, PJ_STUN_ATTR_PRIORITY, 0); if (prio_attr == NULL) { LOG5((ice->obj_name, "Received Binding request with no PRIORITY")); - pj_mutex_unlock(ice->mutex); + pj_grp_lock_release(ice->grp_lock); return PJ_SUCCESS; } @@ -2516,7 +2554,7 @@ static pj_status_t on_stun_rx_request(pj_stun_session *sess, pj_stun_session_respond(sess, rdata, PJ_STUN_SC_ROLE_CONFLICT, NULL, token, PJ_TRUE, src_addr, src_addr_len); - pj_mutex_unlock(ice->mutex); + pj_grp_lock_release(ice->grp_lock); return PJ_SUCCESS; } @@ -2528,7 +2566,7 @@ static pj_status_t on_stun_rx_request(pj_stun_session *sess, pj_stun_session_respond(sess, rdata, PJ_STUN_SC_ROLE_CONFLICT, NULL, token, PJ_TRUE, src_addr, src_addr_len); - pj_mutex_unlock(ice->mutex); + pj_grp_lock_release(ice->grp_lock); return PJ_SUCCESS; } else { /* Switch role to controlled */ @@ -2543,7 +2581,7 @@ static pj_status_t on_stun_rx_request(pj_stun_session *sess, */ status = pj_stun_session_create_res(sess, rdata, 0, NULL, &tdata); if (status != PJ_SUCCESS) { - pj_mutex_unlock(ice->mutex); + pj_grp_lock_release(ice->grp_lock); return status; } @@ -2595,7 +2633,7 @@ static pj_status_t on_stun_rx_request(pj_stun_session *sess, handle_incoming_check(ice, rcheck); } - pj_mutex_unlock(ice->mutex); + pj_grp_lock_release(ice->grp_lock); return PJ_SUCCESS; } @@ -2884,18 +2922,23 @@ PJ_DEF(pj_status_t) pj_ice_sess_send_data(pj_ice_sess *ice, return PJNATH_EICEINCOMPID; } - pj_mutex_lock(ice->mutex); + pj_grp_lock_acquire(ice->grp_lock); + + if (ice->is_destroying) { + pj_grp_lock_release(ice->grp_lock); + return PJ_EINVALIDOP; + } comp = find_comp(ice, comp_id); if (comp == NULL) { status = PJNATH_EICEINCOMPID; - pj_mutex_unlock(ice->mutex); + pj_grp_lock_release(ice->grp_lock); goto on_return; } if (comp->valid_check == NULL) { status = PJNATH_EICEINPROGRESS; - pj_mutex_unlock(ice->mutex); + pj_grp_lock_release(ice->grp_lock); goto on_return; } @@ -2904,7 +2947,9 @@ PJ_DEF(pj_status_t) pj_ice_sess_send_data(pj_ice_sess *ice, pj_sockaddr_cp(&addr, &comp->valid_check->rcand->addr); /* Release the mutex now to avoid deadlock (see ticket #1451). 
*/ - pj_mutex_unlock(ice->mutex); + pj_grp_lock_release(ice->grp_lock); + + PJ_RACE_ME(5); status = (*ice->cb.on_tx_pkt)(ice, comp_id, transport_id, data, data_len, @@ -2931,11 +2976,16 @@ PJ_DEF(pj_status_t) pj_ice_sess_on_rx_pkt(pj_ice_sess *ice, PJ_ASSERT_RETURN(ice, PJ_EINVAL); - pj_mutex_lock(ice->mutex); + pj_grp_lock_acquire(ice->grp_lock); + + if (ice->is_destroying) { + pj_grp_lock_release(ice->grp_lock); + return PJ_EINVALIDOP; + } comp = find_comp(ice, comp_id); if (comp == NULL) { - pj_mutex_unlock(ice->mutex); + pj_grp_lock_release(ice->grp_lock); return PJNATH_EICEINCOMPID; } @@ -2948,7 +2998,7 @@ PJ_DEF(pj_status_t) pj_ice_sess_on_rx_pkt(pj_ice_sess *ice, } if (msg_data == NULL) { pj_assert(!"Invalid transport ID"); - pj_mutex_unlock(ice->mutex); + pj_grp_lock_release(ice->grp_lock); return PJ_EINVAL; } @@ -2968,12 +3018,14 @@ PJ_DEF(pj_status_t) pj_ice_sess_on_rx_pkt(pj_ice_sess *ice, LOG4((ice->obj_name, "Error processing incoming message: %s", ice->tmp.errmsg)); } - pj_mutex_unlock(ice->mutex); + pj_grp_lock_release(ice->grp_lock); } else { /* Not a STUN packet. Call application's callback instead, but release * the mutex now or otherwise we may get deadlock. */ - pj_mutex_unlock(ice->mutex); + pj_grp_lock_release(ice->grp_lock); + + PJ_RACE_ME(5); (*ice->cb.on_rx_data)(ice, comp_id, transport_id, pkt, pkt_size, src_addr, src_addr_len); diff --git a/pjnath/src/pjnath/ice_strans.c b/pjnath/src/pjnath/ice_strans.c index a798586b..4c1182fd 100644 --- a/pjnath/src/pjnath/ice_strans.c +++ b/pjnath/src/pjnath/ice_strans.c @@ -126,13 +126,11 @@ static void turn_on_state(pj_turn_sock *turn_sock, pj_turn_state_t old_state, /* Forward decls */ +static void ice_st_on_destroy(void *obj); static void destroy_ice_st(pj_ice_strans *ice_st); #define ice_st_perror(ice_st,msg,rc) pjnath_perror(ice_st->obj_name,msg,rc) static void sess_init_update(pj_ice_strans *ice_st); -static void sess_add_ref(pj_ice_strans *ice_st); -static pj_bool_t sess_dec_ref(pj_ice_strans *ice_st); - /** * This structure describes an ICE stream transport component. A component * in ICE stream transport typically corresponds to a single socket created @@ -172,7 +170,7 @@ struct pj_ice_strans void *user_data; /**< Application data. */ pj_ice_strans_cfg cfg; /**< Configuration. */ pj_ice_strans_cb cb; /**< Application callback. */ - pj_lock_t *init_lock; /**< Initialization mutex. */ + pj_grp_lock_t *grp_lock; /**< Group lock. */ pj_ice_strans_state state; /**< Session state. */ pj_ice_sess *ice; /**< ICE session. */ @@ -183,7 +181,6 @@ struct pj_ice_strans pj_timer_entry ka_timer; /**< STUN keep-alive timer. */ - pj_atomic_t *busy_cnt; /**< To prevent destroy */ pj_bool_t destroy_req;/**< Destroy has been called? 
*/ pj_bool_t cb_called; /**< Init error callback called?*/ }; @@ -551,23 +548,22 @@ PJ_DEF(pj_status_t) pj_ice_strans_create( const char *name, comp_cnt)); pj_log_push_indent(); - pj_ice_strans_cfg_copy(pool, &ice_st->cfg, cfg); - pj_memcpy(&ice_st->cb, cb, sizeof(*cb)); - - status = pj_atomic_create(pool, 0, &ice_st->busy_cnt); - if (status != PJ_SUCCESS) { - destroy_ice_st(ice_st); - return status; - } - - status = pj_lock_create_recursive_mutex(pool, ice_st->obj_name, - &ice_st->init_lock); + status = pj_grp_lock_create(pool, NULL, &ice_st->grp_lock); if (status != PJ_SUCCESS) { - destroy_ice_st(ice_st); + pj_pool_release(pool); pj_log_pop_indent(); return status; } + pj_grp_lock_add_ref(ice_st->grp_lock); + pj_grp_lock_add_handler(ice_st->grp_lock, pool, ice_st, + &ice_st_on_destroy); + + pj_ice_strans_cfg_copy(pool, &ice_st->cfg, cfg); + ice_st->cfg.stun.cfg.grp_lock = ice_st->grp_lock; + ice_st->cfg.turn.cfg.grp_lock = ice_st->grp_lock; + pj_memcpy(&ice_st->cb, cb, sizeof(*cb)); + ice_st->comp_cnt = comp_cnt; ice_st->comp = (pj_ice_strans_comp**) pj_pool_calloc(pool, comp_cnt, sizeof(pj_ice_strans_comp*)); @@ -578,12 +574,12 @@ PJ_DEF(pj_status_t) pj_ice_strans_create( const char *name, /* Acquire initialization mutex to prevent callback to be * called before we finish initialization. */ - pj_lock_acquire(ice_st->init_lock); + pj_grp_lock_acquire(ice_st->grp_lock); for (i=0; iinit_lock); + pj_grp_lock_release(ice_st->grp_lock); destroy_ice_st(ice_st); pj_log_pop_indent(); return status; @@ -591,9 +587,9 @@ PJ_DEF(pj_status_t) pj_ice_strans_create( const char *name, } /* Done with initialization */ - pj_lock_release(ice_st->init_lock); + pj_grp_lock_release(ice_st->grp_lock); - PJ_LOG(4,(ice_st->obj_name, "ICE stream transport created")); + PJ_LOG(4,(ice_st->obj_name, "ICE stream transport %p created", ice_st)); *p_ice_st = ice_st; @@ -605,14 +601,35 @@ PJ_DEF(pj_status_t) pj_ice_strans_create( const char *name, return PJ_SUCCESS; } +/* REALLY destroy ICE */ +static void ice_st_on_destroy(void *obj) +{ + pj_ice_strans *ice_st = (pj_ice_strans*)obj; + + PJ_LOG(4,(ice_st->obj_name, "ICE stream transport %p destroyed", obj)); + + /* Done */ + pj_pool_release(ice_st->pool); +} + /* Destroy ICE */ static void destroy_ice_st(pj_ice_strans *ice_st) { unsigned i; - PJ_LOG(5,(ice_st->obj_name, "ICE stream transport destroying..")); + PJ_LOG(5,(ice_st->obj_name, "ICE stream transport %p destroy request..", + ice_st)); pj_log_push_indent(); + pj_grp_lock_acquire(ice_st->grp_lock); + + if (ice_st->destroy_req) { + pj_grp_lock_release(ice_st->grp_lock); + return; + } + + ice_st->destroy_req = PJ_TRUE; + /* Destroy ICE if we have ICE */ if (ice_st->ice) { pj_ice_sess_destroy(ice_st->ice); @@ -623,38 +640,19 @@ static void destroy_ice_st(pj_ice_strans *ice_st) for (i=0; icomp_cnt; ++i) { if (ice_st->comp[i]) { if (ice_st->comp[i]->stun_sock) { - pj_stun_sock_set_user_data(ice_st->comp[i]->stun_sock, NULL); pj_stun_sock_destroy(ice_st->comp[i]->stun_sock); ice_st->comp[i]->stun_sock = NULL; } if (ice_st->comp[i]->turn_sock) { - pj_turn_sock_set_user_data(ice_st->comp[i]->turn_sock, NULL); pj_turn_sock_destroy(ice_st->comp[i]->turn_sock); ice_st->comp[i]->turn_sock = NULL; } } } - ice_st->comp_cnt = 0; - - /* Destroy mutex */ - if (ice_st->init_lock) { - pj_lock_acquire(ice_st->init_lock); - pj_lock_release(ice_st->init_lock); - pj_lock_destroy(ice_st->init_lock); - ice_st->init_lock = NULL; - } - /* Destroy reference counter */ - if (ice_st->busy_cnt) { - 
pj_assert(pj_atomic_get(ice_st->busy_cnt)==0); - pj_atomic_destroy(ice_st->busy_cnt); - ice_st->busy_cnt = NULL; - } + pj_grp_lock_dec_ref(ice_st->grp_lock); + pj_grp_lock_release(ice_st->grp_lock); - PJ_LOG(4,(ice_st->obj_name, "ICE stream transport destroyed")); - - /* Done */ - pj_pool_release(ice_st->pool); pj_log_pop_indent(); } @@ -739,44 +737,11 @@ static void sess_init_update(pj_ice_strans *ice_st) */ PJ_DEF(pj_status_t) pj_ice_strans_destroy(pj_ice_strans *ice_st) { - PJ_ASSERT_RETURN(ice_st, PJ_EINVAL); - - sess_add_ref(ice_st); - ice_st->destroy_req = PJ_TRUE; - if (sess_dec_ref(ice_st)) { - PJ_LOG(5,(ice_st->obj_name, - "ICE strans object is busy, will destroy later")); - return PJ_EPENDING; - } - + destroy_ice_st(ice_st); return PJ_SUCCESS; } -/* - * Increment busy counter. - */ -static void sess_add_ref(pj_ice_strans *ice_st) -{ - pj_atomic_inc(ice_st->busy_cnt); -} - -/* - * Decrement busy counter. If the counter has reached zero and destroy - * has been requested, destroy the object and return FALSE. - */ -static pj_bool_t sess_dec_ref(pj_ice_strans *ice_st) -{ - int count = pj_atomic_dec_and_get(ice_st->busy_cnt); - pj_assert(count >= 0); - if (count==0 && ice_st->destroy_req) { - destroy_ice_st(ice_st); - return PJ_FALSE; - } else { - return PJ_TRUE; - } -} - /* * Get user data */ @@ -840,7 +805,9 @@ PJ_DEF(pj_status_t) pj_ice_strans_init_ice(pj_ice_strans *ice_st, /* Create! */ status = pj_ice_sess_create(&ice_st->cfg.stun_cfg, ice_st->obj_name, role, ice_st->comp_cnt, &ice_cb, - local_ufrag, local_passwd, &ice_st->ice); + local_ufrag, local_passwd, + ice_st->grp_lock, + &ice_st->ice); if (status != PJ_SUCCESS) return status; @@ -1255,7 +1222,7 @@ static void on_ice_complete(pj_ice_sess *ice, pj_status_t status) pj_time_val t; unsigned msec; - sess_add_ref(ice_st); + pj_grp_lock_add_ref(ice_st->grp_lock); pj_gettimeofday(&t); PJ_TIME_VAL_SUB(t, ice_st->start_time); @@ -1337,7 +1304,7 @@ static void on_ice_complete(pj_ice_sess *ice, pj_status_t status) } - sess_dec_ref(ice_st); + pj_grp_lock_dec_ref(ice_st->grp_lock); } /* @@ -1426,7 +1393,7 @@ static pj_bool_t stun_on_rx_data(pj_stun_sock *stun_sock, ice_st = comp->ice_st; - sess_add_ref(ice_st); + pj_grp_lock_add_ref(ice_st->grp_lock); if (ice_st->ice == NULL) { /* The ICE session is gone, but we're still receiving packets. @@ -1451,7 +1418,7 @@ static pj_bool_t stun_on_rx_data(pj_stun_sock *stun_sock, } } - return sess_dec_ref(ice_st); + return pj_grp_lock_dec_ref(ice_st->grp_lock) ? PJ_FALSE : PJ_TRUE; } /* Notifification when asynchronous send operation to the STUN socket @@ -1482,10 +1449,10 @@ static pj_bool_t stun_on_status(pj_stun_sock *stun_sock, comp = (pj_ice_strans_comp*) pj_stun_sock_get_user_data(stun_sock); ice_st = comp->ice_st; - sess_add_ref(ice_st); + pj_grp_lock_add_ref(ice_st->grp_lock); /* Wait until initialization completes */ - pj_lock_acquire(ice_st->init_lock); + pj_grp_lock_acquire(ice_st->grp_lock); /* Find the srflx cancidate */ for (i=0; icand_cnt; ++i) { @@ -1495,14 +1462,14 @@ static pj_bool_t stun_on_status(pj_stun_sock *stun_sock, } } - pj_lock_release(ice_st->init_lock); + pj_grp_lock_release(ice_st->grp_lock); /* It is possible that we don't have srflx candidate even though this * callback is called. This could happen when we cancel adding srflx * candidate due to initialization error. */ if (cand == NULL) { - return sess_dec_ref(ice_st); + return pj_grp_lock_dec_ref(ice_st->grp_lock) ? 
PJ_FALSE : PJ_TRUE; } switch (op) { @@ -1618,7 +1585,7 @@ static pj_bool_t stun_on_status(pj_stun_sock *stun_sock, break; } - return sess_dec_ref(ice_st); + return pj_grp_lock_dec_ref(ice_st->grp_lock)? PJ_FALSE : PJ_TRUE; } /* Callback when TURN socket has received a packet */ @@ -1637,7 +1604,7 @@ static void turn_on_rx_data(pj_turn_sock *turn_sock, return; } - sess_add_ref(comp->ice_st); + pj_grp_lock_add_ref(comp->ice_st->grp_lock); if (comp->ice_st->ice == NULL) { /* The ICE session is gone, but we're still receiving packets. @@ -1664,7 +1631,7 @@ static void turn_on_rx_data(pj_turn_sock *turn_sock, } } - sess_dec_ref(comp->ice_st); + pj_grp_lock_dec_ref(comp->ice_st->grp_lock); } @@ -1686,7 +1653,7 @@ static void turn_on_state(pj_turn_sock *turn_sock, pj_turn_state_t old_state, pj_turn_state_name(old_state), pj_turn_state_name(new_state))); pj_log_push_indent(); - sess_add_ref(comp->ice_st); + pj_grp_lock_add_ref(comp->ice_st->grp_lock); if (new_state == PJ_TURN_STATE_READY) { pj_turn_session_info rel_info; @@ -1700,7 +1667,7 @@ static void turn_on_state(pj_turn_sock *turn_sock, pj_turn_state_t old_state, pj_turn_sock_get_info(turn_sock, &rel_info); /* Wait until initialization completes */ - pj_lock_acquire(comp->ice_st->init_lock); + pj_grp_lock_acquire(comp->ice_st->grp_lock); /* Find relayed candidate in the component */ for (i=0; icand_cnt; ++i) { @@ -1711,7 +1678,7 @@ static void turn_on_state(pj_turn_sock *turn_sock, pj_turn_state_t old_state, } pj_assert(cand != NULL); - pj_lock_release(comp->ice_st->init_lock); + pj_grp_lock_release(comp->ice_st->grp_lock); /* Update candidate */ pj_sockaddr_cp(&cand->addr, &rel_info.relay_addr); @@ -1744,22 +1711,27 @@ static void turn_on_state(pj_turn_sock *turn_sock, pj_turn_state_t old_state, pj_turn_sock_set_user_data(turn_sock, NULL); comp->turn_sock = NULL; - /* Set session to fail if we're still initializing */ - if (comp->ice_st->state < PJ_ICE_STRANS_STATE_READY) { - sess_fail(comp->ice_st, PJ_ICE_STRANS_OP_INIT, - "TURN allocation failed", info.last_status); - } else if (comp->turn_err_cnt > 1) { - sess_fail(comp->ice_st, PJ_ICE_STRANS_OP_KEEP_ALIVE, - "TURN refresh failed", info.last_status); - } else { - PJ_PERROR(4,(comp->ice_st->obj_name, info.last_status, - "Comp %d: TURN allocation failed, retrying", - comp->comp_id)); - add_update_turn(comp->ice_st, comp); + /* Set session to fail on error. 
last_status PJ_SUCCESS means normal + * deallocation, which should not trigger sess_fail as it may have + * been initiated by ICE destroy + */ + if (info.last_status != PJ_SUCCESS) { + if (comp->ice_st->state < PJ_ICE_STRANS_STATE_READY) { + sess_fail(comp->ice_st, PJ_ICE_STRANS_OP_INIT, + "TURN allocation failed", info.last_status); + } else if (comp->turn_err_cnt > 1) { + sess_fail(comp->ice_st, PJ_ICE_STRANS_OP_KEEP_ALIVE, + "TURN refresh failed", info.last_status); + } else { + PJ_PERROR(4,(comp->ice_st->obj_name, info.last_status, + "Comp %d: TURN allocation failed, retrying", + comp->comp_id)); + add_update_turn(comp->ice_st, comp); + } } } - sess_dec_ref(comp->ice_st); + pj_grp_lock_dec_ref(comp->ice_st->grp_lock); pj_log_pop_indent(); } diff --git a/pjnath/src/pjnath/nat_detect.c b/pjnath/src/pjnath/nat_detect.c index f9644c2b..3dfed92a 100644 --- a/pjnath/src/pjnath/nat_detect.c +++ b/pjnath/src/pjnath/nat_detect.c @@ -307,7 +307,7 @@ PJ_DEF(pj_status_t) pj_stun_detect_nat_type(const pj_sockaddr_in *server, sess_cb.on_request_complete = &on_request_complete; sess_cb.on_send_msg = &on_send_msg; status = pj_stun_session_create(stun_cfg, pool->obj_name, &sess_cb, - PJ_FALSE, &sess->stun_sess); + PJ_FALSE, NULL, &sess->stun_sess); if (status != PJ_SUCCESS) goto on_error; diff --git a/pjnath/src/pjnath/stun_session.c b/pjnath/src/pjnath/stun_session.c index 4feae906..173c729d 100644 --- a/pjnath/src/pjnath/stun_session.c +++ b/pjnath/src/pjnath/stun_session.c @@ -25,13 +25,10 @@ struct pj_stun_session { pj_stun_config *cfg; pj_pool_t *pool; - pj_lock_t *lock; - pj_bool_t delete_lock; + pj_grp_lock_t *grp_lock; pj_stun_session_cb cb; void *user_data; - - pj_atomic_t *busy; - pj_bool_t destroy_request; + pj_bool_t is_destroying; pj_bool_t use_fingerprint; @@ -55,14 +52,15 @@ struct pj_stun_session }; #define SNAME(s_) ((s_)->pool->obj_name) +#define THIS_FILE "stun_session.c" -#if PJ_LOG_MAX_LEVEL >= 5 +#if 1 # define TRACE_(expr) PJ_LOG(5,expr) #else # define TRACE_(expr) #endif -#define LOG_ERR_(sess,title,rc) pjnath_perror(sess->pool->obj_name,title,rc) +#define LOG_ERR_(sess,title,rc) PJ_PERROR(3,(sess->pool->obj_name,rc,title)) #define TDATA_POOL_SIZE PJNATH_POOL_LEN_STUN_TDATA #define TDATA_POOL_INC PJNATH_POOL_INC_STUN_TDATA @@ -77,6 +75,7 @@ static pj_status_t stun_tsx_on_send_msg(pj_stun_client_tsx *tsx, const void *stun_pkt, pj_size_t pkt_size); static void stun_tsx_on_destroy(pj_stun_client_tsx *tsx); +static void stun_sess_on_destroy(void *comp); static pj_stun_tsx_cb tsx_cb = { @@ -148,31 +147,38 @@ static void stun_tsx_on_destroy(pj_stun_client_tsx *tsx) pj_stun_tx_data *tdata; tdata = (pj_stun_tx_data*) pj_stun_client_tsx_get_data(tsx); - tsx_erase(tdata->sess, tdata); + pj_stun_client_tsx_stop(tsx); + if (tdata) { + tsx_erase(tdata->sess, tdata); + pj_pool_release(tdata->pool); + } - pj_stun_client_tsx_destroy(tsx); - pj_pool_release(tdata->pool); + TRACE_((THIS_FILE, "STUN transaction %p destroyed", tsx)); } static void destroy_tdata(pj_stun_tx_data *tdata, pj_bool_t force) { + TRACE_((THIS_FILE, "tdata %p destroy request, force=%d, tsx=%p", tdata, + force, tdata->client_tsx)); + if (tdata->res_timer.id != PJ_FALSE) { - pj_timer_heap_cancel(tdata->sess->cfg->timer_heap, - &tdata->res_timer); - tdata->res_timer.id = PJ_FALSE; + pj_timer_heap_cancel_if_active(tdata->sess->cfg->timer_heap, + &tdata->res_timer, PJ_FALSE); pj_list_erase(tdata); } if (force) { + pj_list_erase(tdata); if (tdata->client_tsx) { - tsx_erase(tdata->sess, tdata); - 
pj_stun_client_tsx_destroy(tdata->client_tsx); + pj_stun_client_tsx_stop(tdata->client_tsx); + pj_stun_client_tsx_set_data(tdata->client_tsx, NULL); } pj_pool_release(tdata->pool); } else { if (tdata->client_tsx) { - pj_time_val delay = {2, 0}; + /* "Probably" this is to absorb retransmission */ + pj_time_val delay = {0, 300}; pj_stun_client_tsx_schedule_destroy(tdata->client_tsx, &delay); } else { @@ -206,7 +212,7 @@ static void on_cache_timeout(pj_timer_heap_t *timer_heap, PJ_LOG(5,(SNAME(tdata->sess), "Response cache deleted")); pj_list_erase(tdata); - pj_stun_msg_destroy_tdata(tdata->sess, tdata); + destroy_tdata(tdata, PJ_FALSE); } static pj_status_t apply_msg_options(pj_stun_session *sess, @@ -419,8 +425,12 @@ static void stun_tsx_on_complete(pj_stun_client_tsx *tsx, sess = tdata->sess; /* Lock the session and prevent user from destroying us in the callback */ - pj_atomic_inc(sess->busy); - pj_lock_acquire(sess->lock); + pj_grp_lock_acquire(sess->grp_lock); + if (sess->is_destroying) { + pj_stun_msg_destroy_tdata(sess, tdata); + pj_grp_lock_release(sess->grp_lock); + return; + } /* Handle authentication challenge */ handle_auth_challenge(sess, tdata, response, src_addr, @@ -434,15 +444,13 @@ static void stun_tsx_on_complete(pj_stun_client_tsx *tsx, /* Destroy the transmit data. This will remove the transaction * from the pending list too. */ - pj_stun_msg_destroy_tdata(sess, tdata); + if (status == PJNATH_ESTUNTIMEDOUT) + destroy_tdata(tdata, PJ_TRUE); + else + destroy_tdata(tdata, PJ_FALSE); tdata = NULL; - pj_lock_release(sess->lock); - - if (pj_atomic_dec_and_get(sess->busy)==0 && sess->destroy_request) { - pj_stun_session_destroy(sess); - return; - } + pj_grp_lock_release(sess->grp_lock); } static pj_status_t stun_tsx_on_send_msg(pj_stun_client_tsx *tsx, @@ -457,20 +465,21 @@ static pj_status_t stun_tsx_on_send_msg(pj_stun_client_tsx *tsx, sess = tdata->sess; /* Lock the session and prevent user from destroying us in the callback */ - pj_atomic_inc(sess->busy); - pj_lock_acquire(sess->lock); + pj_grp_lock_acquire(sess->grp_lock); + if (sess->is_destroying) { + /* Stray timer */ + pj_grp_lock_release(sess->grp_lock); + return PJ_EINVALIDOP; + } + status = sess->cb.on_send_msg(tdata->sess, tdata->token, stun_pkt, pkt_size, tdata->dst_addr, tdata->addr_len); - pj_lock_release(sess->lock); + if (pj_grp_lock_release(sess->grp_lock)) + return PJ_EGONE; - if (pj_atomic_dec_and_get(sess->busy)==0 && sess->destroy_request) { - pj_stun_session_destroy(sess); - return PJNATH_ESTUNDESTROYED; - } else { - return status; - } + return status; } /* **************************************************************************/ @@ -479,6 +488,7 @@ PJ_DEF(pj_status_t) pj_stun_session_create( pj_stun_config *cfg, const char *name, const pj_stun_session_cb *cb, pj_bool_t fingerprint, + pj_grp_lock_t *grp_lock, pj_stun_session **p_sess) { pj_pool_t *pool; @@ -501,46 +511,37 @@ PJ_DEF(pj_status_t) pj_stun_session_create( pj_stun_config *cfg, sess->use_fingerprint = fingerprint; sess->log_flag = 0xFFFF; + if (grp_lock) { + sess->grp_lock = grp_lock; + } else { + status = pj_grp_lock_create(pool, NULL, &sess->grp_lock); + if (status != PJ_SUCCESS) { + pj_pool_release(pool); + return status; + } + } + + pj_grp_lock_add_ref(sess->grp_lock); + pj_grp_lock_add_handler(sess->grp_lock, pool, sess, + &stun_sess_on_destroy); + pj_stun_session_set_software_name(sess, &cfg->software_name); - sess->rx_pool = pj_pool_create(sess->cfg->pf, name, - PJNATH_POOL_LEN_STUN_TDATA, + sess->rx_pool = 
pj_pool_create(sess->cfg->pf, name, + PJNATH_POOL_LEN_STUN_TDATA, PJNATH_POOL_INC_STUN_TDATA, NULL); pj_list_init(&sess->pending_request_list); pj_list_init(&sess->cached_response_list); - status = pj_lock_create_recursive_mutex(pool, name, &sess->lock); - if (status != PJ_SUCCESS) { - pj_pool_release(pool); - return status; - } - sess->delete_lock = PJ_TRUE; - - status = pj_atomic_create(pool, 0, &sess->busy); - if (status != PJ_SUCCESS) { - pj_lock_destroy(sess->lock); - pj_pool_release(pool); - return status; - } - *p_sess = sess; return PJ_SUCCESS; } -PJ_DEF(pj_status_t) pj_stun_session_destroy(pj_stun_session *sess) +static void stun_sess_on_destroy(void *comp) { - PJ_ASSERT_RETURN(sess, PJ_EINVAL); - - pj_lock_acquire(sess->lock); - - /* Can't destroy if we're in a callback */ - sess->destroy_request = PJ_TRUE; - if (pj_atomic_get(sess->busy)) { - pj_lock_release(sess->lock); - return PJ_EPENDING; - } + pj_stun_session *sess = (pj_stun_session*)comp; while (!pj_list_empty(&sess->pending_request_list)) { pj_stun_tx_data *tdata = sess->pending_request_list.next; @@ -551,11 +552,6 @@ PJ_DEF(pj_status_t) pj_stun_session_destroy(pj_stun_session *sess) pj_stun_tx_data *tdata = sess->cached_response_list.next; destroy_tdata(tdata, PJ_TRUE); } - pj_lock_release(sess->lock); - - if (sess->delete_lock) { - pj_lock_destroy(sess->lock); - } if (sess->rx_pool) { pj_pool_release(sess->rx_pool); @@ -564,6 +560,47 @@ PJ_DEF(pj_status_t) pj_stun_session_destroy(pj_stun_session *sess) pj_pool_release(sess->pool); + TRACE_((THIS_FILE, "STUN session %p destroyed", sess)); +} + +PJ_DEF(pj_status_t) pj_stun_session_destroy(pj_stun_session *sess) +{ + pj_stun_tx_data *tdata; + + PJ_ASSERT_RETURN(sess, PJ_EINVAL); + + TRACE_((SNAME(sess), "STUN session %p destroy request, ref_cnt=%d", + sess, pj_grp_lock_get_ref(sess->grp_lock))); + + pj_grp_lock_acquire(sess->grp_lock); + + if (sess->is_destroying) { + /* Prevent from decrementing the ref counter more than once */ + pj_grp_lock_release(sess->grp_lock); + return PJ_EINVALIDOP; + } + + sess->is_destroying = PJ_TRUE; + + /* We need to stop transactions and cached response because they are + * holding the group lock's reference counter while retransmitting. 
+ */ + tdata = sess->pending_request_list.next; + while (tdata != &sess->pending_request_list) { + if (tdata->client_tsx) + pj_stun_client_tsx_stop(tdata->client_tsx); + tdata = tdata->next; + } + + tdata = sess->cached_response_list.next; + while (tdata != &sess->cached_response_list) { + pj_timer_heap_cancel_if_active(tdata->sess->cfg->timer_heap, + &tdata->res_timer, PJ_FALSE); + tdata = tdata->next; + } + + pj_grp_lock_dec_ref(sess->grp_lock); + pj_grp_lock_release(sess->grp_lock); return PJ_SUCCESS; } @@ -572,9 +609,9 @@ PJ_DEF(pj_status_t) pj_stun_session_set_user_data( pj_stun_session *sess, void *user_data) { PJ_ASSERT_RETURN(sess, PJ_EINVAL); - pj_lock_acquire(sess->lock); + pj_grp_lock_acquire(sess->grp_lock); sess->user_data = user_data; - pj_lock_release(sess->lock); + pj_grp_lock_release(sess->grp_lock); return PJ_SUCCESS; } @@ -584,35 +621,16 @@ PJ_DEF(void*) pj_stun_session_get_user_data(pj_stun_session *sess) return sess->user_data; } -PJ_DEF(pj_status_t) pj_stun_session_set_lock( pj_stun_session *sess, - pj_lock_t *lock, - pj_bool_t auto_del) -{ - pj_lock_t *old_lock = sess->lock; - pj_bool_t old_del; - - PJ_ASSERT_RETURN(sess && lock, PJ_EINVAL); - - pj_lock_acquire(old_lock); - sess->lock = lock; - old_del = sess->delete_lock; - sess->delete_lock = auto_del; - pj_lock_release(old_lock); - - if (old_lock) - pj_lock_destroy(old_lock); - - return PJ_SUCCESS; -} - PJ_DEF(pj_status_t) pj_stun_session_set_software_name(pj_stun_session *sess, const pj_str_t *sw) { PJ_ASSERT_RETURN(sess, PJ_EINVAL); + pj_grp_lock_acquire(sess->grp_lock); if (sw && sw->slen) pj_strdup(sess->pool, &sess->srv_name, sw); else sess->srv_name.slen = 0; + pj_grp_lock_release(sess->grp_lock); return PJ_SUCCESS; } @@ -622,6 +640,7 @@ PJ_DEF(pj_status_t) pj_stun_session_set_credential(pj_stun_session *sess, { PJ_ASSERT_RETURN(sess, PJ_EINVAL); + pj_grp_lock_acquire(sess->grp_lock); sess->auth_type = auth_type; if (cred) { pj_stun_auth_cred_dup(sess->pool, &sess->cred, cred); @@ -629,6 +648,7 @@ PJ_DEF(pj_status_t) pj_stun_session_set_credential(pj_stun_session *sess, sess->auth_type = PJ_STUN_AUTH_NONE; pj_bzero(&sess->cred, sizeof(sess->cred)); } + pj_grp_lock_release(sess->grp_lock); return PJ_SUCCESS; } @@ -705,17 +725,21 @@ PJ_DEF(pj_status_t) pj_stun_session_create_req(pj_stun_session *sess, PJ_ASSERT_RETURN(sess && p_tdata, PJ_EINVAL); + pj_grp_lock_acquire(sess->grp_lock); + if (sess->is_destroying) { + pj_grp_lock_release(sess->grp_lock); + return PJ_EINVALIDOP; + } + status = create_tdata(sess, &tdata); if (status != PJ_SUCCESS) - return status; + goto on_error; /* Create STUN message */ status = pj_stun_msg_create(tdata->pool, method, magic, tsx_id, &tdata->msg); - if (status != PJ_SUCCESS) { - pj_pool_release(tdata->pool); - return status; - } + if (status != PJ_SUCCESS) + goto on_error; /* copy the request's transaction ID as the transaction key. 
*/ pj_assert(sizeof(tdata->msg_key)==sizeof(tdata->msg->hdr.tsx_id)); @@ -731,10 +755,8 @@ PJ_DEF(pj_status_t) pj_stun_session_create_req(pj_stun_session *sess, } else if (sess->auth_type == PJ_STUN_AUTH_SHORT_TERM) { /* MUST put authentication in request */ status = get_auth(sess, tdata); - if (status != PJ_SUCCESS) { - pj_pool_release(tdata->pool); - return status; - } + if (status != PJ_SUCCESS) + goto on_error; } else if (sess->auth_type == PJ_STUN_AUTH_LONG_TERM) { /* Only put authentication information if we've received @@ -742,22 +764,27 @@ PJ_DEF(pj_status_t) pj_stun_session_create_req(pj_stun_session *sess, */ if (sess->next_nonce.slen != 0) { status = get_auth(sess, tdata); - if (status != PJ_SUCCESS) { - pj_pool_release(tdata->pool); - return status; - } + if (status != PJ_SUCCESS) + goto on_error; tdata->auth_info.nonce = sess->next_nonce; tdata->auth_info.realm = sess->server_realm; } } else { pj_assert(!"Invalid authentication type"); - pj_pool_release(tdata->pool); - return PJ_EBUG; + status = PJ_EBUG; + goto on_error; } *p_tdata = tdata; + pj_grp_lock_release(sess->grp_lock); return PJ_SUCCESS; + +on_error: + if (tdata) + pj_pool_release(tdata->pool); + pj_grp_lock_release(sess->grp_lock); + return status; } PJ_DEF(pj_status_t) pj_stun_session_create_ind(pj_stun_session *sess, @@ -769,9 +796,17 @@ PJ_DEF(pj_status_t) pj_stun_session_create_ind(pj_stun_session *sess, PJ_ASSERT_RETURN(sess && p_tdata, PJ_EINVAL); + pj_grp_lock_acquire(sess->grp_lock); + if (sess->is_destroying) { + pj_grp_lock_release(sess->grp_lock); + return PJ_EINVALIDOP; + } + status = create_tdata(sess, &tdata); - if (status != PJ_SUCCESS) + if (status != PJ_SUCCESS) { + pj_grp_lock_release(sess->grp_lock); return status; + } /* Create STUN message */ msg_type |= PJ_STUN_INDICATION_BIT; @@ -779,10 +814,13 @@ PJ_DEF(pj_status_t) pj_stun_session_create_ind(pj_stun_session *sess, NULL, &tdata->msg); if (status != PJ_SUCCESS) { pj_pool_release(tdata->pool); + pj_grp_lock_release(sess->grp_lock); return status; } *p_tdata = tdata; + + pj_grp_lock_release(sess->grp_lock); return PJ_SUCCESS; } @@ -798,15 +836,24 @@ PJ_DEF(pj_status_t) pj_stun_session_create_res( pj_stun_session *sess, pj_status_t status; pj_stun_tx_data *tdata = NULL; + pj_grp_lock_acquire(sess->grp_lock); + if (sess->is_destroying) { + pj_grp_lock_release(sess->grp_lock); + return PJ_EINVALIDOP; + } + status = create_tdata(sess, &tdata); - if (status != PJ_SUCCESS) + if (status != PJ_SUCCESS) { + pj_grp_lock_release(sess->grp_lock); return status; + } /* Create STUN response message */ status = pj_stun_msg_create_response(tdata->pool, rdata->msg, err_code, err_msg, &tdata->msg); if (status != PJ_SUCCESS) { pj_pool_release(tdata->pool); + pj_grp_lock_release(sess->grp_lock); return status; } @@ -821,6 +868,8 @@ PJ_DEF(pj_status_t) pj_stun_session_create_res( pj_stun_session *sess, *p_tdata = tdata; + pj_grp_lock_release(sess->grp_lock); + return PJ_SUCCESS; } @@ -867,6 +916,13 @@ PJ_DEF(pj_status_t) pj_stun_session_send_msg( pj_stun_session *sess, PJ_ASSERT_RETURN(sess && addr_len && server && tdata, PJ_EINVAL); + /* Lock the session and prevent user from destroying us in the callback */ + pj_grp_lock_acquire(sess->grp_lock); + if (sess->is_destroying) { + pj_grp_lock_release(sess->grp_lock); + return PJ_EINVALIDOP; + } + pj_log_push_indent(); /* Allocate packet */ @@ -876,10 +932,6 @@ PJ_DEF(pj_status_t) pj_stun_session_send_msg( pj_stun_session *sess, tdata->token = token; tdata->retransmit = retransmit; - /* Lock the session and prevent user 
from destroying us in the callback */ - pj_atomic_inc(sess->busy); - pj_lock_acquire(sess->lock); - /* Apply options */ status = apply_msg_options(sess, tdata->pool, &tdata->auth_info, tdata->msg); @@ -909,7 +961,8 @@ PJ_DEF(pj_status_t) pj_stun_session_send_msg( pj_stun_session *sess, if (PJ_STUN_IS_REQUEST(tdata->msg->hdr.type)) { /* Create STUN client transaction */ - status = pj_stun_client_tsx_create(sess->cfg, tdata->pool, + status = pj_stun_client_tsx_create(sess->cfg, tdata->pool, + sess->grp_lock, &tsx_cb, &tdata->client_tsx); PJ_ASSERT_RETURN(status==PJ_SUCCESS, status); pj_stun_client_tsx_set_data(tdata->client_tsx, (void*)tdata); @@ -939,17 +992,17 @@ PJ_DEF(pj_status_t) pj_stun_session_send_msg( pj_stun_session *sess, pj_time_val timeout; pj_memset(&tdata->res_timer, 0, sizeof(tdata->res_timer)); - pj_timer_entry_init(&tdata->res_timer, PJ_TRUE, tdata, + pj_timer_entry_init(&tdata->res_timer, PJ_FALSE, tdata, &on_cache_timeout); timeout.sec = sess->cfg->res_cache_msec / 1000; timeout.msec = sess->cfg->res_cache_msec % 1000; - status = pj_timer_heap_schedule(sess->cfg->timer_heap, - &tdata->res_timer, - &timeout); + status = pj_timer_heap_schedule_w_grp_lock(sess->cfg->timer_heap, + &tdata->res_timer, + &timeout, PJ_TRUE, + sess->grp_lock); if (status != PJ_SUCCESS) { - tdata->res_timer.id = PJ_FALSE; pj_stun_msg_destroy_tdata(sess, tdata); LOG_ERR_(sess, "Error scheduling response timer", status); goto on_return; @@ -975,15 +1028,10 @@ PJ_DEF(pj_status_t) pj_stun_session_send_msg( pj_stun_session *sess, } on_return: - pj_lock_release(sess->lock); - pj_log_pop_indent(); - /* Check if application has called destroy() in the callback */ - if (pj_atomic_dec_and_get(sess->busy)==0 && sess->destroy_request) { - pj_stun_session_destroy(sess); - return PJNATH_ESTUNDESTROYED; - } + if (pj_grp_lock_release(sess->grp_lock)) + return PJ_EGONE; return status; } @@ -1005,14 +1053,25 @@ PJ_DEF(pj_status_t) pj_stun_session_respond( pj_stun_session *sess, pj_str_t reason; pj_stun_tx_data *tdata; + pj_grp_lock_acquire(sess->grp_lock); + if (sess->is_destroying) { + pj_grp_lock_release(sess->grp_lock); + return PJ_EINVALIDOP; + } + status = pj_stun_session_create_res(sess, rdata, code, (errmsg?pj_cstr(&reason,errmsg):NULL), &tdata); - if (status != PJ_SUCCESS) + if (status != PJ_SUCCESS) { + pj_grp_lock_release(sess->grp_lock); return status; + } - return pj_stun_session_send_msg(sess, token, cache, PJ_FALSE, - dst_addr, addr_len, tdata); + status = pj_stun_session_send_msg(sess, token, cache, PJ_FALSE, + dst_addr, addr_len, tdata); + + pj_grp_lock_release(sess->grp_lock); + return status; } @@ -1029,8 +1088,11 @@ PJ_DEF(pj_status_t) pj_stun_session_cancel_req( pj_stun_session *sess, PJ_ASSERT_RETURN(PJ_STUN_IS_REQUEST(tdata->msg->hdr.type), PJ_EINVAL); /* Lock the session and prevent user from destroying us in the callback */ - pj_atomic_inc(sess->busy); - pj_lock_acquire(sess->lock); + pj_grp_lock_acquire(sess->grp_lock); + if (sess->is_destroying) { + pj_grp_lock_release(sess->grp_lock); + return PJ_EINVALIDOP; + } if (notify) { (sess->cb.on_request_complete)(sess, notify_status, tdata->token, @@ -1040,12 +1102,7 @@ PJ_DEF(pj_status_t) pj_stun_session_cancel_req( pj_stun_session *sess, /* Just destroy tdata. 
This will destroy the transaction as well */ pj_stun_msg_destroy_tdata(sess, tdata); - pj_lock_release(sess->lock); - - if (pj_atomic_dec_and_get(sess->busy)==0 && sess->destroy_request) { - pj_stun_session_destroy(sess); - return PJNATH_ESTUNDESTROYED; - } + pj_grp_lock_release(sess->grp_lock); return PJ_SUCCESS; } @@ -1063,17 +1120,15 @@ PJ_DEF(pj_status_t) pj_stun_session_retransmit_req(pj_stun_session *sess, PJ_ASSERT_RETURN(PJ_STUN_IS_REQUEST(tdata->msg->hdr.type), PJ_EINVAL); /* Lock the session and prevent user from destroying us in the callback */ - pj_atomic_inc(sess->busy); - pj_lock_acquire(sess->lock); + pj_grp_lock_acquire(sess->grp_lock); + if (sess->is_destroying) { + pj_grp_lock_release(sess->grp_lock); + return PJ_EINVALIDOP; + } status = pj_stun_client_tsx_retransmit(tdata->client_tsx, mod_count); - pj_lock_release(sess->lock); - - if (pj_atomic_dec_and_get(sess->busy)==0 && sess->destroy_request) { - pj_stun_session_destroy(sess); - return PJNATH_ESTUNDESTROYED; - } + pj_grp_lock_release(sess->grp_lock); return status; } @@ -1361,11 +1416,15 @@ PJ_DEF(pj_status_t) pj_stun_session_on_rx_pkt(pj_stun_session *sess, PJ_ASSERT_RETURN(sess && packet && pkt_size, PJ_EINVAL); - pj_log_push_indent(); - /* Lock the session and prevent user from destroying us in the callback */ - pj_atomic_inc(sess->busy); - pj_lock_acquire(sess->lock); + pj_grp_lock_acquire(sess->grp_lock); + + if (sess->is_destroying) { + pj_grp_lock_release(sess->grp_lock); + return PJ_EINVALIDOP; + } + + pj_log_push_indent(); /* Reset pool */ pj_pool_reset(sess->rx_pool); @@ -1418,17 +1477,10 @@ PJ_DEF(pj_status_t) pj_stun_session_on_rx_pkt(pj_stun_session *sess, } on_return: - pj_lock_release(sess->lock); - pj_log_pop_indent(); - /* If we've received destroy request while we're on the callback, - * destroy the session now. - */ - if (pj_atomic_dec_and_get(sess->busy)==0 && sess->destroy_request) { - pj_stun_session_destroy(sess); - return PJNATH_ESTUNDESTROYED; - } + if (pj_grp_lock_release(sess->grp_lock)) + return PJ_EGONE; return status; } diff --git a/pjnath/src/pjnath/stun_sock.c b/pjnath/src/pjnath/stun_sock.c index 3f756f50..bc395411 100644 --- a/pjnath/src/pjnath/stun_sock.c +++ b/pjnath/src/pjnath/stun_sock.c @@ -28,9 +28,15 @@ #include #include #include +#include #include #include +#if 1 +# define TRACE_(x) PJ_LOG(5,x) +#else +# define TRACE_(x) +#endif enum { MAX_BIND_RETRY = 100 }; @@ -39,7 +45,7 @@ struct pj_stun_sock char *obj_name; /* Log identification */ pj_pool_t *pool; /* Pool */ void *user_data; /* Application user data */ - + pj_bool_t is_destroying; /* Destroy already called */ int af; /* Address family */ pj_stun_config stun_cfg; /* STUN config (ioqueue etc)*/ pj_stun_sock_cb cb; /* Application callbacks */ @@ -58,13 +64,16 @@ struct pj_stun_sock pj_uint16_t tsx_id[6]; /* .. 
to match STUN msg */ pj_stun_session *stun_sess; /* STUN session */ - + pj_grp_lock_t *grp_lock; /* Session group lock */ }; /* * Prototypes for static functions */ +/* Destructor for group lock */ +static void stun_sock_destructor(void *obj); + /* This callback is called by the STUN session to send packet */ static pj_status_t sess_on_send_msg(pj_stun_session *sess, void *token, @@ -202,6 +211,20 @@ PJ_DEF(pj_status_t) pj_stun_sock_create( pj_stun_config *stun_cfg, if (stun_sock->ka_interval == 0) stun_sock->ka_interval = PJ_STUN_KEEP_ALIVE_SEC; + if (cfg && cfg->grp_lock) { + stun_sock->grp_lock = cfg->grp_lock; + } else { + status = pj_grp_lock_create(pool, NULL, &stun_sock->grp_lock); + if (status != PJ_SUCCESS) { + pj_pool_release(pool); + return status; + } + } + + pj_grp_lock_add_ref(stun_sock->grp_lock); + pj_grp_lock_add_handler(stun_sock->grp_lock, pool, stun_sock, + &stun_sock_destructor); + /* Create socket and bind socket */ status = pj_sock_socket(af, pj_SOCK_DGRAM(), 0, &stun_sock->sock_fd); if (status != PJ_SUCCESS) @@ -252,6 +275,7 @@ PJ_DEF(pj_status_t) pj_stun_sock_create( pj_stun_config *stun_cfg, pj_activesock_cb activesock_cb; pj_activesock_cfg_default(&activesock_cfg); + activesock_cfg.grp_lock = stun_sock->grp_lock; activesock_cfg.async_cnt = cfg->async_cnt; activesock_cfg.concurrency = 0; @@ -290,6 +314,7 @@ PJ_DEF(pj_status_t) pj_stun_sock_create( pj_stun_config *stun_cfg, status = pj_stun_session_create(&stun_sock->stun_cfg, stun_sock->obj_name, &sess_cb, PJ_FALSE, + stun_sock->grp_lock, &stun_sock->stun_sess); if (status != PJ_SUCCESS) goto on_error; @@ -332,6 +357,8 @@ PJ_DEF(pj_status_t) pj_stun_sock_start( pj_stun_sock *stun_sock, PJ_ASSERT_RETURN(stun_sock && domain && default_port, PJ_EINVAL); + pj_grp_lock_acquire(stun_sock->grp_lock); + /* Check whether the domain contains IP address */ stun_sock->srv_addr.addr.sa_family = (pj_uint16_t)stun_sock->af; status = pj_inet_pton(stun_sock->af, domain, @@ -360,7 +387,6 @@ PJ_DEF(pj_status_t) pj_stun_sock_start( pj_stun_sock *stun_sock, &stun_sock->q); /* Processing will resume when the DNS SRV callback is called */ - return status; } else { @@ -378,53 +404,70 @@ PJ_DEF(pj_status_t) pj_stun_sock_start( pj_stun_sock *stun_sock, pj_sockaddr_set_port(&stun_sock->srv_addr, (pj_uint16_t)default_port); /* Start sending Binding request */ - return get_mapped_addr(stun_sock); + status = get_mapped_addr(stun_sock); } + + pj_grp_lock_release(stun_sock->grp_lock); + return status; } -/* Destroy */ -PJ_DEF(pj_status_t) pj_stun_sock_destroy(pj_stun_sock *stun_sock) +/* Destructor */ +static void stun_sock_destructor(void *obj) { + pj_stun_sock *stun_sock = (pj_stun_sock*)obj; + if (stun_sock->q) { pj_dns_srv_cancel_query(stun_sock->q, PJ_FALSE); stun_sock->q = NULL; } + /* if (stun_sock->stun_sess) { - pj_stun_session_set_user_data(stun_sock->stun_sess, NULL); + pj_stun_session_destroy(stun_sock->stun_sess); + stun_sock->stun_sess = NULL; } - - /* Destroy the active socket first just in case we'll get - * stray callback. 
- */ + */ + + if (stun_sock->pool) { + pj_pool_t *pool = stun_sock->pool; + stun_sock->pool = NULL; + pj_pool_release(pool); + } + + TRACE_(("", "STUN sock %p destroyed", stun_sock)); + +} + +/* Destroy */ +PJ_DEF(pj_status_t) pj_stun_sock_destroy(pj_stun_sock *stun_sock) +{ + TRACE_((stun_sock->obj_name, "STUN sock %p request, ref_cnt=%d", + stun_sock, pj_grp_lock_get_ref(stun_sock->grp_lock))); + + pj_grp_lock_acquire(stun_sock->grp_lock); + if (stun_sock->is_destroying) { + /* Destroy already called */ + pj_grp_lock_release(stun_sock->grp_lock); + return PJ_EINVALIDOP; + } + + stun_sock->is_destroying = PJ_TRUE; + pj_timer_heap_cancel_if_active(stun_sock->stun_cfg.timer_heap, + &stun_sock->ka_timer, 0); + if (stun_sock->active_sock != NULL) { - pj_activesock_t *asock = stun_sock->active_sock; - stun_sock->active_sock = NULL; stun_sock->sock_fd = PJ_INVALID_SOCKET; - pj_activesock_set_user_data(asock, NULL); - pj_activesock_close(asock); + pj_activesock_close(stun_sock->active_sock); } else if (stun_sock->sock_fd != PJ_INVALID_SOCKET) { pj_sock_close(stun_sock->sock_fd); stun_sock->sock_fd = PJ_INVALID_SOCKET; } - if (stun_sock->ka_timer.id != 0) { - pj_timer_heap_cancel(stun_sock->stun_cfg.timer_heap, - &stun_sock->ka_timer); - stun_sock->ka_timer.id = 0; - } - if (stun_sock->stun_sess) { pj_stun_session_destroy(stun_sock->stun_sess); - stun_sock->stun_sess = NULL; } - - if (stun_sock->pool) { - pj_pool_t *pool = stun_sock->pool; - stun_sock->pool = NULL; - pj_pool_release(pool); - } - + pj_grp_lock_dec_ref(stun_sock->grp_lock); + pj_grp_lock_release(stun_sock->grp_lock); return PJ_SUCCESS; } @@ -468,12 +511,15 @@ static void dns_srv_resolver_cb(void *user_data, { pj_stun_sock *stun_sock = (pj_stun_sock*) user_data; + pj_grp_lock_acquire(stun_sock->grp_lock); + /* Clear query */ stun_sock->q = NULL; /* Handle error */ if (status != PJ_SUCCESS) { sess_fail(stun_sock, PJ_STUN_SOCK_DNS_OP, status); + pj_grp_lock_release(stun_sock->grp_lock); return; } @@ -490,6 +536,8 @@ static void dns_srv_resolver_cb(void *user_data, /* Start sending Binding request */ get_mapped_addr(stun_sock); + + pj_grp_lock_release(stun_sock->grp_lock); } @@ -533,6 +581,8 @@ PJ_DEF(pj_status_t) pj_stun_sock_get_info( pj_stun_sock *stun_sock, PJ_ASSERT_RETURN(stun_sock && info, PJ_EINVAL); + pj_grp_lock_acquire(stun_sock->grp_lock); + /* Copy STUN server address and mapped address */ pj_memcpy(&info->srv_addr, &stun_sock->srv_addr, sizeof(pj_sockaddr)); @@ -543,8 +593,10 @@ PJ_DEF(pj_status_t) pj_stun_sock_get_info( pj_stun_sock *stun_sock, addr_len = sizeof(info->bound_addr); status = pj_sock_getsockname(stun_sock->sock_fd, &info->bound_addr, &addr_len); - if (status != PJ_SUCCESS) + if (status != PJ_SUCCESS) { + pj_grp_lock_release(stun_sock->grp_lock); return status; + } /* If socket is bound to a specific interface, then only put that * interface in the alias list. 
Otherwise query all the interfaces @@ -560,8 +612,10 @@ PJ_DEF(pj_status_t) pj_stun_sock_get_info( pj_stun_sock *stun_sock, /* Get the default address */ status = pj_gethostip(stun_sock->af, &def_addr); - if (status != PJ_SUCCESS) + if (status != PJ_SUCCESS) { + pj_grp_lock_release(stun_sock->grp_lock); return status; + } pj_sockaddr_set_port(&def_addr, port); @@ -569,8 +623,10 @@ PJ_DEF(pj_status_t) pj_stun_sock_get_info( pj_stun_sock *stun_sock, info->alias_cnt = PJ_ARRAY_SIZE(info->aliases); status = pj_enum_ip_interface(stun_sock->af, &info->alias_cnt, info->aliases); - if (status != PJ_SUCCESS) + if (status != PJ_SUCCESS) { + pj_grp_lock_release(stun_sock->grp_lock); return status; + } /* Set the port number for each address. */ @@ -590,6 +646,7 @@ PJ_DEF(pj_status_t) pj_stun_sock_get_info( pj_stun_sock *stun_sock, } } + pj_grp_lock_release(stun_sock->grp_lock); return PJ_SUCCESS; } @@ -603,14 +660,29 @@ PJ_DEF(pj_status_t) pj_stun_sock_sendto( pj_stun_sock *stun_sock, unsigned addr_len) { pj_ssize_t size; + pj_status_t status; + PJ_ASSERT_RETURN(stun_sock && pkt && dst_addr && addr_len, PJ_EINVAL); + pj_grp_lock_acquire(stun_sock->grp_lock); + + if (!stun_sock->active_sock) { + /* We have been shutdown, but this callback may still get called + * by retransmit timer. + */ + pj_grp_lock_release(stun_sock->grp_lock); + return PJ_EINVALIDOP; + } + if (send_key==NULL) send_key = &stun_sock->send_key; size = pkt_len; - return pj_activesock_sendto(stun_sock->active_sock, send_key, - pkt, &size, flag, dst_addr, addr_len); + status = pj_activesock_sendto(stun_sock->active_sock, send_key, + pkt, &size, flag, dst_addr, addr_len); + + pj_grp_lock_release(stun_sock->grp_lock); + return status; } /* This callback is called by the STUN session to send packet */ @@ -625,14 +697,18 @@ static pj_status_t sess_on_send_msg(pj_stun_session *sess, pj_ssize_t size; stun_sock = (pj_stun_sock *) pj_stun_session_get_user_data(sess); - if (!stun_sock || !stun_sock->active_sock) + if (!stun_sock || !stun_sock->active_sock) { + /* We have been shutdown, but this callback may still get called + * by retransmit timer. 
+ */ return PJ_EINVALIDOP; + } pj_assert(token==INTERNAL_MSG_TOKEN); PJ_UNUSED_ARG(token); size = pkt_size; - return pj_activesock_sendto(stun_sock->active_sock, + return pj_activesock_sendto(stun_sock->active_sock, &stun_sock->int_send_key, pkt, &size, 0, dst_addr, addr_len); } @@ -726,25 +802,20 @@ on_return: /* Schedule keep-alive timer */ static void start_ka_timer(pj_stun_sock *stun_sock) { - if (stun_sock->ka_timer.id != 0) { - pj_timer_heap_cancel(stun_sock->stun_cfg.timer_heap, - &stun_sock->ka_timer); - stun_sock->ka_timer.id = 0; - } + pj_timer_heap_cancel_if_active(stun_sock->stun_cfg.timer_heap, + &stun_sock->ka_timer, 0); pj_assert(stun_sock->ka_interval != 0); - if (stun_sock->ka_interval > 0) { + if (stun_sock->ka_interval > 0 && !stun_sock->is_destroying) { pj_time_val delay; delay.sec = stun_sock->ka_interval; delay.msec = 0; - if (pj_timer_heap_schedule(stun_sock->stun_cfg.timer_heap, - &stun_sock->ka_timer, - &delay) == PJ_SUCCESS) - { - stun_sock->ka_timer.id = PJ_TRUE; - } + pj_timer_heap_schedule_w_grp_lock(stun_sock->stun_cfg.timer_heap, + &stun_sock->ka_timer, + &delay, PJ_TRUE, + stun_sock->grp_lock); } } @@ -756,14 +827,18 @@ static void ka_timer_cb(pj_timer_heap_t *th, pj_timer_entry *te) stun_sock = (pj_stun_sock *) te->user_data; PJ_UNUSED_ARG(th); + pj_grp_lock_acquire(stun_sock->grp_lock); /* Time to send STUN Binding request */ - if (get_mapped_addr(stun_sock) != PJ_SUCCESS) + if (get_mapped_addr(stun_sock) != PJ_SUCCESS) { + pj_grp_lock_release(stun_sock->grp_lock); return; + } /* Next keep-alive timer will be scheduled once the request * is complete. */ + pj_grp_lock_release(stun_sock->grp_lock); } /* Callback from active socket when incoming packet is received */ @@ -788,6 +863,8 @@ static pj_bool_t on_data_recvfrom(pj_activesock_t *asock, return PJ_TRUE; } + pj_grp_lock_acquire(stun_sock->grp_lock); + /* Check that this is STUN message */ status = pj_stun_msg_check((const pj_uint8_t*)data, size, PJ_STUN_IS_DATAGRAM | PJ_STUN_CHECK_PACKET); @@ -823,7 +900,10 @@ static pj_bool_t on_data_recvfrom(pj_activesock_t *asock, status = pj_stun_session_on_rx_pkt(stun_sock->stun_sess, data, size, PJ_STUN_IS_DATAGRAM, NULL, NULL, src_addr, addr_len); - return status!=PJNATH_ESTUNDESTROYED ? PJ_TRUE : PJ_FALSE; + + status = pj_grp_lock_release(stun_sock->grp_lock); + + return status!=PJ_EGONE ? PJ_TRUE : PJ_FALSE; process_app_data: if (stun_sock->cb.on_rx_data) { @@ -831,10 +911,12 @@ process_app_data: ret = (*stun_sock->cb.on_rx_data)(stun_sock, data, size, src_addr, addr_len); - return ret; + status = pj_grp_lock_release(stun_sock->grp_lock); + return status!=PJ_EGONE ? PJ_TRUE : PJ_FALSE; } - return PJ_TRUE; + status = pj_grp_lock_release(stun_sock->grp_lock); + return status!=PJ_EGONE ? 
PJ_TRUE : PJ_FALSE; } /* Callback from active socket about send status */ @@ -857,6 +939,8 @@ static pj_bool_t on_data_sent(pj_activesock_t *asock, if (stun_sock->cb.on_data_sent) { pj_bool_t ret; + pj_grp_lock_acquire(stun_sock->grp_lock); + /* If app gives NULL send_key in sendto() function, then give * NULL in the callback too */ @@ -866,6 +950,7 @@ static pj_bool_t on_data_sent(pj_activesock_t *asock, /* Call callback */ ret = (*stun_sock->cb.on_data_sent)(stun_sock, send_key, sent); + pj_grp_lock_release(stun_sock->grp_lock); return ret; } diff --git a/pjnath/src/pjnath/stun_transaction.c b/pjnath/src/pjnath/stun_transaction.c index 8677d322..390f67f7 100644 --- a/pjnath/src/pjnath/stun_transaction.c +++ b/pjnath/src/pjnath/stun_transaction.c @@ -26,6 +26,8 @@ #include +#define THIS_FILE "stun_transaction.c" +#define TIMER_INACTIVE 0 #define TIMER_ACTIVE 1 @@ -34,6 +36,7 @@ struct pj_stun_client_tsx char obj_name[PJ_MAX_OBJ_NAME]; pj_stun_tsx_cb cb; void *user_data; + pj_grp_lock_t *grp_lock; pj_bool_t complete; @@ -51,18 +54,24 @@ struct pj_stun_client_tsx }; +#if 1 +# define TRACE_(expr) PJ_LOG(5,expr) +#else +# define TRACE_(expr) +#endif + + static void retransmit_timer_callback(pj_timer_heap_t *timer_heap, pj_timer_entry *timer); static void destroy_timer_callback(pj_timer_heap_t *timer_heap, pj_timer_entry *timer); -#define stun_perror(tsx,msg,rc) pjnath_perror(tsx->obj_name, msg, rc) - /* * Create a STUN client transaction. */ PJ_DEF(pj_status_t) pj_stun_client_tsx_create(pj_stun_config *cfg, pj_pool_t *pool, + pj_grp_lock_t *grp_lock, const pj_stun_tsx_cb *cb, pj_stun_client_tsx **p_tsx) { @@ -74,6 +83,7 @@ PJ_DEF(pj_status_t) pj_stun_client_tsx_create(pj_stun_config *cfg, tsx = PJ_POOL_ZALLOC_T(pool, pj_stun_client_tsx); tsx->rto_msec = cfg->rto_msec; tsx->timer_heap = cfg->timer_heap; + tsx->grp_lock = grp_lock; pj_memcpy(&tsx->cb, cb, sizeof(*cb)); tsx->retransmit_timer.cb = &retransmit_timer_callback; @@ -82,7 +92,7 @@ PJ_DEF(pj_status_t) pj_stun_client_tsx_create(pj_stun_config *cfg, tsx->destroy_timer.cb = &destroy_timer_callback; tsx->destroy_timer.user_data = tsx; - pj_ansi_snprintf(tsx->obj_name, sizeof(tsx->obj_name), "stuntsx%p", tsx); + pj_ansi_snprintf(tsx->obj_name, sizeof(tsx->obj_name), "utsx%p", tsx); *p_tsx = tsx; @@ -100,26 +110,30 @@ PJ_DEF(pj_status_t) pj_stun_client_tsx_schedule_destroy( PJ_ASSERT_RETURN(tsx && delay, PJ_EINVAL); PJ_ASSERT_RETURN(tsx->cb.on_destroy, PJ_EINVAL); + pj_grp_lock_acquire(tsx->grp_lock); + /* Cancel previously registered timer */ - if (tsx->destroy_timer.id != 0) { - pj_timer_heap_cancel(tsx->timer_heap, &tsx->destroy_timer); - tsx->destroy_timer.id = 0; - } + pj_timer_heap_cancel_if_active(tsx->timer_heap, &tsx->destroy_timer, + TIMER_INACTIVE); /* Stop retransmission, just in case */ - if (tsx->retransmit_timer.id != 0) { - pj_timer_heap_cancel(tsx->timer_heap, &tsx->retransmit_timer); - tsx->retransmit_timer.id = 0; - } + pj_timer_heap_cancel_if_active(tsx->timer_heap, &tsx->retransmit_timer, + TIMER_INACTIVE); - status = pj_timer_heap_schedule(tsx->timer_heap, - &tsx->destroy_timer, delay); - if (status != PJ_SUCCESS) + status = pj_timer_heap_schedule_w_grp_lock(tsx->timer_heap, + &tsx->destroy_timer, delay, + TIMER_ACTIVE, tsx->grp_lock); + if (status != PJ_SUCCESS) { + pj_grp_lock_release(tsx->grp_lock); return status; + } - tsx->destroy_timer.id = TIMER_ACTIVE; tsx->cb.on_complete = NULL; + pj_grp_lock_release(tsx->grp_lock); + + TRACE_((tsx->obj_name, "STUN transaction %p schedule destroy", tsx)); + return PJ_SUCCESS; } 
@@ -127,20 +141,21 @@ PJ_DEF(pj_status_t) pj_stun_client_tsx_schedule_destroy( /* * Destroy transaction immediately. */ -PJ_DEF(pj_status_t) pj_stun_client_tsx_destroy(pj_stun_client_tsx *tsx) +PJ_DEF(pj_status_t) pj_stun_client_tsx_stop(pj_stun_client_tsx *tsx) { PJ_ASSERT_RETURN(tsx, PJ_EINVAL); - if (tsx->retransmit_timer.id != 0) { - pj_timer_heap_cancel(tsx->timer_heap, &tsx->retransmit_timer); - tsx->retransmit_timer.id = 0; - } - if (tsx->destroy_timer.id != 0) { - pj_timer_heap_cancel(tsx->timer_heap, &tsx->destroy_timer); - tsx->destroy_timer.id = 0; - } + /* Don't call grp_lock_acquire() because we might be called on + * group lock's destructor. + */ + pj_timer_heap_cancel_if_active(tsx->timer_heap, &tsx->retransmit_timer, + TIMER_INACTIVE); + pj_timer_heap_cancel_if_active(tsx->timer_heap, &tsx->destroy_timer, + TIMER_INACTIVE); + + PJ_LOG(5,(tsx->obj_name, "STUN client transaction %p stopped, ref_cnt=%d", + tsx, pj_grp_lock_get_ref(tsx->grp_lock))); - PJ_LOG(5,(tsx->obj_name, "STUN client transaction destroyed")); return PJ_SUCCESS; } @@ -185,7 +200,7 @@ static pj_status_t tsx_transmit_msg(pj_stun_client_tsx *tsx, { pj_status_t status; - PJ_ASSERT_RETURN(tsx->retransmit_timer.id == 0 || + PJ_ASSERT_RETURN(tsx->retransmit_timer.id == TIMER_INACTIVE || !tsx->require_retransmit, PJ_EBUSY); if (tsx->require_retransmit && mod_count) { @@ -211,14 +226,15 @@ static pj_status_t tsx_transmit_msg(pj_stun_client_tsx *tsx, * cancel it (as opposed to when schedule_timer() failed we cannot * cancel transmission). */; - status = pj_timer_heap_schedule(tsx->timer_heap, - &tsx->retransmit_timer, - &tsx->retransmit_time); + status = pj_timer_heap_schedule_w_grp_lock(tsx->timer_heap, + &tsx->retransmit_timer, + &tsx->retransmit_time, + TIMER_ACTIVE, + tsx->grp_lock); if (status != PJ_SUCCESS) { - tsx->retransmit_timer.id = 0; + tsx->retransmit_timer.id = TIMER_INACTIVE; return status; } - tsx->retransmit_timer.id = TIMER_ACTIVE; } @@ -235,12 +251,12 @@ static pj_status_t tsx_transmit_msg(pj_stun_client_tsx *tsx, if (status == PJNATH_ESTUNDESTROYED) { /* We've been destroyed, don't access the object. */ } else if (status != PJ_SUCCESS) { - if (tsx->retransmit_timer.id != 0 && mod_count) { - pj_timer_heap_cancel(tsx->timer_heap, - &tsx->retransmit_timer); - tsx->retransmit_timer.id = 0; + if (mod_count) { + pj_timer_heap_cancel_if_active( tsx->timer_heap, + &tsx->retransmit_timer, + TIMER_INACTIVE); } - stun_perror(tsx, "STUN error sending message", status); + PJ_PERROR(4, (tsx->obj_name, status, "STUN error sending message")); } pj_log_pop_indent(); @@ -261,6 +277,8 @@ PJ_DEF(pj_status_t) pj_stun_client_tsx_send_msg(pj_stun_client_tsx *tsx, PJ_ASSERT_RETURN(tsx && pkt && pkt_len, PJ_EINVAL); PJ_ASSERT_RETURN(tsx->retransmit_timer.id == 0, PJ_EBUSY); + pj_grp_lock_acquire(tsx->grp_lock); + /* Encode message */ tsx->last_pkt = pkt; tsx->last_pkt_size = pkt_len; @@ -286,27 +304,29 @@ PJ_DEF(pj_status_t) pj_stun_client_tsx_send_msg(pj_stun_client_tsx *tsx, * cancel it (as opposed to when schedule_timer() failed we cannot * cancel transmission). 
*/; - status = pj_timer_heap_schedule(tsx->timer_heap, - &tsx->retransmit_timer, - &tsx->retransmit_time); + status = pj_timer_heap_schedule_w_grp_lock(tsx->timer_heap, + &tsx->retransmit_timer, + &tsx->retransmit_time, + TIMER_ACTIVE, + tsx->grp_lock); if (status != PJ_SUCCESS) { - tsx->retransmit_timer.id = 0; + tsx->retransmit_timer.id = TIMER_INACTIVE; + pj_grp_lock_release(tsx->grp_lock); return status; } - tsx->retransmit_timer.id = TIMER_ACTIVE; } /* Send the message */ status = tsx_transmit_msg(tsx, PJ_TRUE); if (status != PJ_SUCCESS) { - if (tsx->retransmit_timer.id != 0) { - pj_timer_heap_cancel(tsx->timer_heap, - &tsx->retransmit_timer); - tsx->retransmit_timer.id = 0; - } + pj_timer_heap_cancel_if_active(tsx->timer_heap, + &tsx->retransmit_timer, + TIMER_INACTIVE); + pj_grp_lock_release(tsx->grp_lock); return status; } + pj_grp_lock_release(tsx->grp_lock); return PJ_SUCCESS; } @@ -319,6 +339,7 @@ static void retransmit_timer_callback(pj_timer_heap_t *timer_heap, pj_status_t status; PJ_UNUSED_ARG(timer_heap); + pj_grp_lock_acquire(tsx->grp_lock); if (tsx->transmit_count >= PJ_STUN_MAX_TRANSMIT_COUNT) { /* Retransmission count exceeded. Transaction has failed */ @@ -331,6 +352,7 @@ static void retransmit_timer_callback(pj_timer_heap_t *timer_heap, tsx->cb.on_complete(tsx, PJNATH_ESTUNTIMEDOUT, NULL, NULL, 0); } } + pj_grp_lock_release(tsx->grp_lock); /* We might have been destroyed, don't try to access the object */ pj_log_pop_indent(); return; @@ -338,9 +360,7 @@ static void retransmit_timer_callback(pj_timer_heap_t *timer_heap, tsx->retransmit_timer.id = 0; status = tsx_transmit_msg(tsx, PJ_TRUE); - if (status == PJNATH_ESTUNDESTROYED) { - /* We've been destroyed, don't try to access the object */ - } else if (status != PJ_SUCCESS) { + if (status != PJ_SUCCESS) { tsx->retransmit_timer.id = 0; if (!tsx->complete) { tsx->complete = PJ_TRUE; @@ -348,8 +368,10 @@ static void retransmit_timer_callback(pj_timer_heap_t *timer_heap, tsx->cb.on_complete(tsx, status, NULL, NULL, 0); } } - /* We might have been destroyed, don't try to access the object */ } + + pj_grp_lock_release(tsx->grp_lock); + /* We might have been destroyed, don't try to access the object */ } /* @@ -362,10 +384,8 @@ PJ_DEF(pj_status_t) pj_stun_client_tsx_retransmit(pj_stun_client_tsx *tsx, return PJ_SUCCESS; } - if (tsx->retransmit_timer.id != 0) { - pj_timer_heap_cancel(tsx->timer_heap, &tsx->retransmit_timer); - tsx->retransmit_timer.id = 0; - } + pj_timer_heap_cancel_if_active(tsx->timer_heap, &tsx->retransmit_timer, + TIMER_INACTIVE); return tsx_transmit_msg(tsx, mod_count); } @@ -379,6 +399,7 @@ static void destroy_timer_callback(pj_timer_heap_t *timer_heap, PJ_UNUSED_ARG(timer_heap); tsx->destroy_timer.id = PJ_FALSE; + tsx->cb.on_destroy(tsx); /* Don't access transaction after this */ } @@ -408,10 +429,8 @@ PJ_DEF(pj_status_t) pj_stun_client_tsx_on_rx_msg(pj_stun_client_tsx *tsx, /* We have a response with matching transaction ID. * We can cancel retransmit timer now. 
*/ - if (tsx->retransmit_timer.id) { - pj_timer_heap_cancel(tsx->timer_heap, &tsx->retransmit_timer); - tsx->retransmit_timer.id = 0; - } + pj_timer_heap_cancel_if_active(tsx->timer_heap, &tsx->retransmit_timer, + TIMER_INACTIVE); /* Find STUN error code attribute */ err_attr = (pj_stun_errcode_attr*) diff --git a/pjnath/src/pjnath/turn_session.c b/pjnath/src/pjnath/turn_session.c index ea89e304..e783912b 100644 --- a/pjnath/src/pjnath/turn_session.c +++ b/pjnath/src/pjnath/turn_session.c @@ -112,8 +112,9 @@ struct pj_turn_session pj_turn_session_cb cb; void *user_data; pj_stun_config stun_cfg; + pj_bool_t is_destroying; - pj_lock_t *lock; + pj_grp_lock_t *grp_lock; int busy; pj_turn_state_t state; @@ -161,6 +162,7 @@ struct pj_turn_session */ static void sess_shutdown(pj_turn_session *sess, pj_status_t status); +static void turn_sess_on_destroy(void *comp); static void do_destroy(pj_turn_session *sess); static void send_refresh(pj_turn_session *sess, int lifetime); static pj_status_t stun_on_send_msg(pj_stun_session *sess, @@ -236,6 +238,7 @@ PJ_DEF(pj_status_t) pj_turn_session_create( const pj_stun_config *cfg, const char *name, int af, pj_turn_tp_type conn_type, + pj_grp_lock_t *grp_lock, const pj_turn_session_cb *cb, unsigned options, void *user_data, @@ -244,7 +247,6 @@ PJ_DEF(pj_status_t) pj_turn_session_create( const pj_stun_config *cfg, pj_pool_t *pool; pj_turn_session *sess; pj_stun_session_cb stun_cb; - pj_lock_t *null_lock; pj_status_t status; PJ_ASSERT_RETURN(cfg && cfg->pf && cb && p_sess, PJ_EINVAL); @@ -281,13 +283,20 @@ PJ_DEF(pj_status_t) pj_turn_session_create( const pj_stun_config *cfg, sess->perm_table = pj_hash_create(pool, PJ_TURN_PERM_HTABLE_SIZE); /* Session lock */ - status = pj_lock_create_recursive_mutex(pool, sess->obj_name, - &sess->lock); - if (status != PJ_SUCCESS) { - do_destroy(sess); - return status; + if (grp_lock) { + sess->grp_lock = grp_lock; + } else { + status = pj_grp_lock_create(pool, NULL, &sess->grp_lock); + if (status != PJ_SUCCESS) { + pj_pool_release(pool); + return status; + } } + pj_grp_lock_add_ref(sess->grp_lock); + pj_grp_lock_add_handler(sess->grp_lock, pool, sess, + &turn_sess_on_destroy); + /* Timer */ pj_timer_entry_init(&sess->timer, TIMER_NONE, sess, &on_timer_event); @@ -297,7 +306,7 @@ PJ_DEF(pj_status_t) pj_turn_session_create( const pj_stun_config *cfg, stun_cb.on_request_complete = &stun_on_request_complete; stun_cb.on_rx_indication = &stun_on_rx_indication; status = pj_stun_session_create(&sess->stun_cfg, sess->obj_name, &stun_cb, - PJ_FALSE, &sess->stun); + PJ_FALSE, sess->grp_lock, &sess->stun); if (status != PJ_SUCCESS) { do_destroy(sess); return status; @@ -306,16 +315,6 @@ PJ_DEF(pj_status_t) pj_turn_session_create( const pj_stun_config *cfg, /* Attach ourself to STUN session */ pj_stun_session_set_user_data(sess->stun, sess); - /* Replace mutex in STUN session with a NULL mutex, since access to - * STUN session is serialized. 
- */ - status = pj_lock_create_null_mutex(pool, name, &null_lock); - if (status != PJ_SUCCESS) { - do_destroy(sess); - return status; - } - pj_stun_session_set_lock(sess->stun, null_lock, PJ_TRUE); - /* Done */ PJ_LOG(4,(sess->obj_name, "TURN client session created")); @@ -325,32 +324,9 @@ PJ_DEF(pj_status_t) pj_turn_session_create( const pj_stun_config *cfg, } -/* Destroy */ -static void do_destroy(pj_turn_session *sess) +static void turn_sess_on_destroy(void *comp) { - /* Lock session */ - if (sess->lock) { - pj_lock_acquire(sess->lock); - } - - /* Cancel pending timer, if any */ - if (sess->timer.id != TIMER_NONE) { - pj_timer_heap_cancel(sess->timer_heap, &sess->timer); - sess->timer.id = TIMER_NONE; - } - - /* Destroy STUN session */ - if (sess->stun) { - pj_stun_session_destroy(sess->stun); - sess->stun = NULL; - } - - /* Destroy lock */ - if (sess->lock) { - pj_lock_release(sess->lock); - pj_lock_destroy(sess->lock); - sess->lock = NULL; - } + pj_turn_session *sess = (pj_turn_session*) comp; /* Destroy pool */ if (sess->pool) { @@ -363,6 +339,26 @@ static void do_destroy(pj_turn_session *sess) } } +/* Destroy */ +static void do_destroy(pj_turn_session *sess) +{ + PJ_LOG(4,(sess->obj_name, "TURN session destroy request, ref_cnt=%d", + pj_grp_lock_get_ref(sess->grp_lock))); + + pj_grp_lock_acquire(sess->grp_lock); + if (sess->is_destroying) { + pj_grp_lock_release(sess->grp_lock); + return; + } + + sess->is_destroying = PJ_TRUE; + pj_timer_heap_cancel_if_active(sess->timer_heap, &sess->timer, TIMER_NONE); + pj_stun_session_destroy(sess->stun); + + pj_grp_lock_dec_ref(sess->grp_lock); + pj_grp_lock_release(sess->grp_lock); +} + /* Set session state */ static void set_state(pj_turn_session *sess, enum pj_turn_state_t state) @@ -437,13 +433,11 @@ static void sess_shutdown(pj_turn_session *sess, set_state(sess, PJ_TURN_STATE_DESTROYING); - if (sess->timer.id != TIMER_NONE) { - pj_timer_heap_cancel(sess->timer_heap, &sess->timer); - sess->timer.id = TIMER_NONE; - } - - sess->timer.id = TIMER_DESTROY; - pj_timer_heap_schedule(sess->timer_heap, &sess->timer, &delay); + pj_timer_heap_cancel_if_active(sess->timer_heap, &sess->timer, + TIMER_NONE); + pj_timer_heap_schedule_w_grp_lock(sess->timer_heap, &sess->timer, + &delay, TIMER_DESTROY, + sess->grp_lock); } } @@ -455,11 +449,11 @@ PJ_DEF(pj_status_t) pj_turn_session_shutdown(pj_turn_session *sess) { PJ_ASSERT_RETURN(sess, PJ_EINVAL); - pj_lock_acquire(sess->lock); + pj_grp_lock_acquire(sess->grp_lock); sess_shutdown(sess, PJ_SUCCESS); - pj_lock_release(sess->lock); + pj_grp_lock_release(sess->grp_lock); return PJ_SUCCESS; } @@ -553,9 +547,9 @@ PJ_DEF(pj_status_t) pj_turn_session_set_software_name( pj_turn_session *sess, { pj_status_t status; - pj_lock_acquire(sess->lock); + pj_grp_lock_acquire(sess->grp_lock); status = pj_stun_session_set_software_name(sess->stun, sw); - pj_lock_release(sess->lock); + pj_grp_lock_release(sess->grp_lock); return status; } @@ -576,7 +570,7 @@ PJ_DEF(pj_status_t) pj_turn_session_set_server( pj_turn_session *sess, PJ_ASSERT_RETURN(sess && domain, PJ_EINVAL); PJ_ASSERT_RETURN(sess->state == PJ_TURN_STATE_NULL, PJ_EINVALIDOP); - pj_lock_acquire(sess->lock); + pj_grp_lock_acquire(sess->grp_lock); /* See if "domain" contains just IP address */ tmp_addr.addr.sa_family = sess->af; @@ -676,7 +670,7 @@ PJ_DEF(pj_status_t) pj_turn_session_set_server( pj_turn_session *sess, } on_return: - pj_lock_release(sess->lock); + pj_grp_lock_release(sess->grp_lock); return status; } @@ -690,11 +684,11 @@ PJ_DEF(pj_status_t) 
pj_turn_session_set_credential(pj_turn_session *sess,
     PJ_ASSERT_RETURN(sess && cred, PJ_EINVAL);
     PJ_ASSERT_RETURN(sess->stun, PJ_EINVALIDOP);
 
-    pj_lock_acquire(sess->lock);
+    pj_grp_lock_acquire(sess->grp_lock);
 
     pj_stun_session_set_credential(sess->stun, PJ_STUN_AUTH_LONG_TERM, cred);
 
-    pj_lock_release(sess->lock);
+    pj_grp_lock_release(sess->grp_lock);
 
     return PJ_SUCCESS;
 }
 
@@ -715,7 +709,7 @@ PJ_DEF(pj_status_t) pj_turn_session_alloc(pj_turn_session *sess,
                      sess->state<=PJ_TURN_STATE_RESOLVED, PJ_EINVALIDOP);
 
-    pj_lock_acquire(sess->lock);
+    pj_grp_lock_acquire(sess->grp_lock);
 
     if (param && param != &sess->alloc_param)
         pj_turn_alloc_param_copy(sess->pool, &sess->alloc_param, param);
@@ -726,7 +720,7 @@ PJ_DEF(pj_status_t) pj_turn_session_alloc(pj_turn_session *sess,
         PJ_LOG(4,(sess->obj_name, "Pending ALLOCATE in state %s",
                   state_names[sess->state]));
-        pj_lock_release(sess->lock);
+        pj_grp_lock_release(sess->grp_lock);
         return PJ_SUCCESS;
     }
 
@@ -738,7 +732,7 @@ PJ_DEF(pj_status_t) pj_turn_session_alloc(pj_turn_session *sess,
     status = pj_stun_session_create_req(sess->stun, PJ_STUN_ALLOCATE_REQUEST,
                                         PJ_STUN_MAGIC, NULL, &tdata);
     if (status != PJ_SUCCESS) {
-        pj_lock_release(sess->lock);
+        pj_grp_lock_release(sess->grp_lock);
         return status;
     }
 
@@ -778,7 +772,7 @@ PJ_DEF(pj_status_t) pj_turn_session_alloc(pj_turn_session *sess,
         set_state(sess, PJ_TURN_STATE_RESOLVED);
     }
 
-    pj_lock_release(sess->lock);
+    pj_grp_lock_release(sess->grp_lock);
 
     return status;
 }
@@ -799,14 +793,14 @@ PJ_DEF(pj_status_t) pj_turn_session_set_perm( pj_turn_session *sess,
 
     PJ_ASSERT_RETURN(sess && addr_cnt && addr, PJ_EINVAL);
 
-    pj_lock_acquire(sess->lock);
+    pj_grp_lock_acquire(sess->grp_lock);
 
     /* Create a bare CreatePermission request */
     status = pj_stun_session_create_req(sess->stun,
                                         PJ_STUN_CREATE_PERM_REQUEST,
                                         PJ_STUN_MAGIC, NULL, &tdata);
     if (status != PJ_SUCCESS) {
-        pj_lock_release(sess->lock);
+        pj_grp_lock_release(sess->grp_lock);
         return status;
     }
 
@@ -857,7 +851,7 @@ PJ_DEF(pj_status_t) pj_turn_session_set_perm( pj_turn_session *sess,
             goto on_error;
     }
 
-    pj_lock_release(sess->lock);
+    pj_grp_lock_release(sess->grp_lock);
     return PJ_SUCCESS;
 
 on_error:
@@ -874,7 +868,7 @@ on_error:
         if (perm->req_token == req_token)
             invalidate_perm(sess, perm);
     }
-    pj_lock_release(sess->lock);
+    pj_grp_lock_release(sess->grp_lock);
     return status;
 }
 
@@ -945,7 +939,7 @@ PJ_DEF(pj_status_t) pj_turn_session_sendto( pj_turn_session *sess,
     }
 
     /* Lock session now */
-    pj_lock_acquire(sess->lock);
+    pj_grp_lock_acquire(sess->grp_lock);
 
     /* Lookup permission first */
     perm = lookup_perm(sess, addr, pj_sockaddr_get_len(addr), PJ_FALSE);
@@ -960,7 +954,7 @@ PJ_DEF(pj_status_t) pj_turn_session_sendto( pj_turn_session *sess,
         status = pj_turn_session_set_perm(sess, 1, (const pj_sockaddr*)addr,
                                           0);
         if (status != PJ_SUCCESS) {
-            pj_lock_release(sess->lock);
+            pj_grp_lock_release(sess->grp_lock);
             return status;
         }
     }
@@ -1035,7 +1029,7 @@ PJ_DEF(pj_status_t) pj_turn_session_sendto( pj_turn_session *sess,
     }
 
 on_return:
-    pj_lock_release(sess->lock);
+    pj_grp_lock_release(sess->grp_lock);
     return status;
 }
 
@@ -1055,7 +1049,7 @@ PJ_DEF(pj_status_t) pj_turn_session_bind_channel(pj_turn_session *sess,
     PJ_ASSERT_RETURN(sess && peer_adr && addr_len, PJ_EINVAL);
     PJ_ASSERT_RETURN(sess->state == PJ_TURN_STATE_READY, PJ_EINVALIDOP);
 
-    pj_lock_acquire(sess->lock);
+    pj_grp_lock_acquire(sess->grp_lock);
 
     /* Create blank ChannelBind request */
     status = pj_stun_session_create_req(sess->stun,
@@ -1098,7 +1092,7 @@ PJ_DEF(pj_status_t) pj_turn_session_bind_channel(pj_turn_session *sess,
                                tdata);
 
 on_return:
-    pj_lock_release(sess->lock);
+    pj_grp_lock_release(sess->grp_lock);
     return status;
 }
 
@@ -1121,7 +1115,7 @@ PJ_DEF(pj_status_t) pj_turn_session_on_rx_pkt(pj_turn_session *sess,
      */
 
     /* Start locking the session */
-    pj_lock_acquire(sess->lock);
+    pj_grp_lock_acquire(sess->grp_lock);
 
     is_datagram = (sess->conn_type==PJ_TURN_TP_UDP);
 
@@ -1193,7 +1187,7 @@ PJ_DEF(pj_status_t) pj_turn_session_on_rx_pkt(pj_turn_session *sess,
     }
 
 on_return:
-    pj_lock_release(sess->lock);
+    pj_grp_lock_release(sess->grp_lock);
     return status;
 }
 
@@ -1385,20 +1379,22 @@ static void on_allocate_success(pj_turn_session *sess,
 
     /* Cancel existing keep-alive timer, if any */
     pj_assert(sess->timer.id != TIMER_DESTROY);
-
-    if (sess->timer.id != TIMER_NONE) {
-        pj_timer_heap_cancel(sess->timer_heap, &sess->timer);
-        sess->timer.id = TIMER_NONE;
+    if (sess->timer.id == TIMER_KEEP_ALIVE) {
+        pj_timer_heap_cancel_if_active(sess->timer_heap, &sess->timer,
+                                       TIMER_NONE);
     }
 
     /* Start keep-alive timer once allocation succeeds */
-    timeout.sec = sess->ka_interval;
-    timeout.msec = 0;
+    if (sess->state < PJ_TURN_STATE_DEALLOCATING) {
+        timeout.sec = sess->ka_interval;
+        timeout.msec = 0;
 
-    sess->timer.id = TIMER_KEEP_ALIVE;
-    pj_timer_heap_schedule(sess->timer_heap, &sess->timer, &timeout);
+        pj_timer_heap_schedule_w_grp_lock(sess->timer_heap, &sess->timer,
+                                          &timeout, TIMER_KEEP_ALIVE,
+                                          sess->grp_lock);
 
-    set_state(sess, PJ_TURN_STATE_READY);
+        set_state(sess, PJ_TURN_STATE_READY);
+    }
 }
 
 /*
@@ -1948,7 +1944,7 @@ static void on_timer_event(pj_timer_heap_t *th, pj_timer_entry *e)
 
     PJ_UNUSED_ARG(th);
 
-    pj_lock_acquire(sess->lock);
+    pj_grp_lock_acquire(sess->grp_lock);
 
     eid = (enum timer_id_t) e->id;
     e->id = TIMER_NONE;
 
@@ -2025,19 +2021,18 @@ static void on_timer_event(pj_timer_heap_t *th, pj_timer_entry *e)
             delay.sec = sess->ka_interval;
             delay.msec = 0;
 
-            sess->timer.id = TIMER_KEEP_ALIVE;
-            pj_timer_heap_schedule(sess->timer_heap, &sess->timer, &delay);
+            pj_timer_heap_schedule_w_grp_lock(sess->timer_heap, &sess->timer,
+                                              &delay, TIMER_KEEP_ALIVE,
+                                              sess->grp_lock);
         }
 
-        pj_lock_release(sess->lock);
-
     } else if (eid == TIMER_DESTROY) {
         /* Time to destroy */
-        pj_lock_release(sess->lock);
         do_destroy(sess);
     } else {
         pj_assert(!"Unknown timer event");
-        pj_lock_release(sess->lock);
     }
+
+    pj_grp_lock_release(sess->grp_lock);
 }
diff --git a/pjnath/src/pjnath/turn_sock.c b/pjnath/src/pjnath/turn_sock.c
index 08831a4c..6392428f 100644
--- a/pjnath/src/pjnath/turn_sock.c
+++ b/pjnath/src/pjnath/turn_sock.c
@@ -46,13 +46,13 @@ struct pj_turn_sock
     pj_turn_sock_cb      cb;
     void                *user_data;
 
-    pj_lock_t           *lock;
+    pj_bool_t            is_destroying;
+    pj_grp_lock_t       *grp_lock;
 
     pj_turn_alloc_param  alloc_param;
     pj_stun_config       cfg;
     pj_turn_sock_cfg     setting;
 
-    pj_bool_t            destroy_request;
     pj_timer_entry       timer;
 
     int                  af;
@@ -93,6 +93,7 @@ static pj_bool_t on_connect_complete(pj_activesock_t *asock,
 
 
+static void turn_sock_on_destroy(void *comp);
 static void destroy(pj_turn_sock *turn_sock);
 static void timer_cb(pj_timer_heap_t *th, pj_timer_entry *e);
 
@@ -168,14 +169,21 @@ PJ_DEF(pj_status_t) pj_turn_sock_create(pj_stun_config *cfg,
         pj_memcpy(&turn_sock->cb, cb, sizeof(*cb));
     }
 
-    /* Create lock */
-    status = pj_lock_create_recursive_mutex(pool, turn_sock->obj_name,
-                                            &turn_sock->lock);
-    if (status != PJ_SUCCESS) {
-        destroy(turn_sock);
-        return status;
+    /* Session lock */
+    if (setting && setting->grp_lock) {
+        turn_sock->grp_lock = setting->grp_lock;
+    } else {
+        status = pj_grp_lock_create(pool, NULL, &turn_sock->grp_lock);
+        if (status != PJ_SUCCESS) {
+            pj_pool_release(pool);
+            return status;
+        }
     }
 
+    pj_grp_lock_add_ref(turn_sock->grp_lock);
+    pj_grp_lock_add_handler(turn_sock->grp_lock, pool, turn_sock,
+                            &turn_sock_on_destroy);
+
     /* Init timer */
     pj_timer_entry_init(&turn_sock->timer, TIMER_NONE, turn_sock, &timer_cb);
 
@@ -186,7 +194,8 @@ PJ_DEF(pj_status_t) pj_turn_sock_create(pj_stun_config *cfg,
     sess_cb.on_rx_data = &turn_on_rx_data;
     sess_cb.on_state = &turn_on_state;
     status = pj_turn_session_create(cfg, pool->obj_name, af, conn_type,
-                                    &sess_cb, 0, turn_sock, &turn_sock->sess);
+                                    turn_sock->grp_lock, &sess_cb, 0,
+                                    turn_sock, &turn_sock->sess);
     if (status != PJ_SUCCESS) {
         destroy(turn_sock);
         return status;
@@ -203,42 +212,45 @@ PJ_DEF(pj_status_t) pj_turn_sock_create(pj_stun_config *cfg,
 /*
  * Destroy.
  */
-static void destroy(pj_turn_sock *turn_sock)
+static void turn_sock_on_destroy(void *comp)
 {
-    if (turn_sock->lock) {
-        pj_lock_acquire(turn_sock->lock);
-    }
-
-    if (turn_sock->sess) {
-        pj_turn_session_set_user_data(turn_sock->sess, NULL);
-        pj_turn_session_shutdown(turn_sock->sess);
-        turn_sock->sess = NULL;
-    }
-
-    if (turn_sock->active_sock) {
-        pj_activesock_set_user_data(turn_sock->active_sock, NULL);
-        pj_activesock_close(turn_sock->active_sock);
-        turn_sock->active_sock = NULL;
-    }
-
-    if (turn_sock->lock) {
-        pj_lock_release(turn_sock->lock);
-        pj_lock_destroy(turn_sock->lock);
-        turn_sock->lock = NULL;
-    }
+    pj_turn_sock *turn_sock = (pj_turn_sock*) comp;
 
     if (turn_sock->pool) {
         pj_pool_t *pool = turn_sock->pool;
+        PJ_LOG(4,(turn_sock->obj_name, "TURN socket destroyed"));
         turn_sock->pool = NULL;
         pj_pool_release(pool);
     }
 }
 
+static void destroy(pj_turn_sock *turn_sock)
+{
+    PJ_LOG(4,(turn_sock->obj_name, "TURN socket destroy request, ref_cnt=%d",
+              pj_grp_lock_get_ref(turn_sock->grp_lock)));
+
+    pj_grp_lock_acquire(turn_sock->grp_lock);
+    if (turn_sock->is_destroying) {
+        pj_grp_lock_release(turn_sock->grp_lock);
+        return;
+    }
+
+    turn_sock->is_destroying = PJ_TRUE;
+    if (turn_sock->sess)
+        pj_turn_session_shutdown(turn_sock->sess);
+    if (turn_sock->active_sock)
+        pj_activesock_close(turn_sock->active_sock);
+    pj_grp_lock_dec_ref(turn_sock->grp_lock);
+    pj_grp_lock_release(turn_sock->grp_lock);
+}
 
 PJ_DEF(void) pj_turn_sock_destroy(pj_turn_sock *turn_sock)
 {
-    pj_lock_acquire(turn_sock->lock);
-    turn_sock->destroy_request = PJ_TRUE;
+    pj_grp_lock_acquire(turn_sock->grp_lock);
+    if (turn_sock->is_destroying) {
+        pj_grp_lock_release(turn_sock->grp_lock);
+        return;
+    }
 
     if (turn_sock->sess) {
         pj_turn_session_shutdown(turn_sock->sess);
@@ -246,12 +258,11 @@ PJ_DEF(void) pj_turn_sock_destroy(pj_turn_sock *turn_sock)
          * session state is DESTROYING we will schedule a timer to
          * destroy ourselves.
          */
-        pj_lock_release(turn_sock->lock);
     } else {
-        pj_lock_release(turn_sock->lock);
         destroy(turn_sock);
     }
+    pj_grp_lock_release(turn_sock->grp_lock);
 }
 
 
@@ -267,7 +278,6 @@ static void timer_cb(pj_timer_heap_t *th, pj_timer_entry *e)
 
     switch (eid) {
     case TIMER_DESTROY:
-        PJ_LOG(5,(turn_sock->obj_name, "Destroying TURN"));
         destroy(turn_sock);
         break;
     default:
@@ -337,7 +347,7 @@ PJ_DEF(pj_status_t) pj_turn_sock_get_info(pj_turn_sock *turn_sock,
  */
 PJ_DEF(pj_status_t) pj_turn_sock_lock(pj_turn_sock *turn_sock)
 {
-    return pj_lock_acquire(turn_sock->lock);
+    return pj_grp_lock_acquire(turn_sock->grp_lock);
 }
 
 /**
@@ -345,7 +355,7 @@ PJ_DEF(pj_status_t) pj_turn_sock_lock(pj_turn_sock *turn_sock)
  */
 PJ_DEF(pj_status_t) pj_turn_sock_unlock(pj_turn_sock *turn_sock)
 {
-    return pj_lock_release(turn_sock->lock);
+    return pj_grp_lock_release(turn_sock->grp_lock);
 }
 
 /*
@@ -381,6 +391,8 @@ PJ_DEF(pj_status_t) pj_turn_sock_alloc(pj_turn_sock *turn_sock,
     PJ_ASSERT_RETURN(turn_sock && domain, PJ_EINVAL);
     PJ_ASSERT_RETURN(turn_sock->sess, PJ_EINVALIDOP);
 
+    pj_grp_lock_acquire(turn_sock->grp_lock);
+
     /* Copy alloc param. We will call session_alloc() only after the
      * server address has been resolved.
      */
@@ -395,6 +407,7 @@ PJ_DEF(pj_status_t) pj_turn_sock_alloc(pj_turn_sock *turn_sock,
         status = pj_turn_session_set_credential(turn_sock->sess, cred);
         if (status != PJ_SUCCESS) {
             sess_fail(turn_sock, "Error setting credential", status);
+            pj_grp_lock_release(turn_sock->grp_lock);
             return status;
         }
     }
@@ -404,13 +417,14 @@ PJ_DEF(pj_status_t) pj_turn_sock_alloc(pj_turn_sock *turn_sock,
                                        resolver);
     if (status != PJ_SUCCESS) {
         sess_fail(turn_sock, "Error setting TURN server", status);
+        pj_grp_lock_release(turn_sock->grp_lock);
         return status;
     }
 
     /* Done for now. The next work will be done when session state moved
      * to RESOLVED state.
      */
-
+    pj_grp_lock_release(turn_sock->grp_lock);
     return PJ_SUCCESS;
 }
 
@@ -472,16 +486,20 @@ static pj_bool_t on_connect_complete(pj_activesock_t *asock,
     if (!turn_sock)
         return PJ_FALSE;
 
+    pj_grp_lock_acquire(turn_sock->grp_lock);
+
     /* TURN session may have already been destroyed here.
     * See ticket #1557 (http://trac.pjsip.org/repos/ticket/1557).
     */
    if (!turn_sock->sess) {
        sess_fail(turn_sock, "TURN session already destroyed", status);
+        pj_grp_lock_release(turn_sock->grp_lock);
        return PJ_FALSE;
    }
 
    if (status != PJ_SUCCESS) {
        sess_fail(turn_sock, "TCP connect() error", status);
+        pj_grp_lock_release(turn_sock->grp_lock);
        return PJ_FALSE;
    }
 
@@ -500,9 +518,11 @@ static pj_bool_t on_connect_complete(pj_activesock_t *asock,
     status = pj_turn_session_alloc(turn_sock->sess, &turn_sock->alloc_param);
     if (status != PJ_SUCCESS) {
         sess_fail(turn_sock, "Error sending ALLOCATE", status);
+        pj_grp_lock_release(turn_sock->grp_lock);
         return PJ_FALSE;
     }
 
+    pj_grp_lock_release(turn_sock->grp_lock);
     return PJ_TRUE;
 }
 
@@ -562,9 +582,9 @@ static pj_bool_t on_data_read(pj_activesock_t *asock,
     pj_bool_t ret = PJ_TRUE;
 
     turn_sock = (pj_turn_sock*) pj_activesock_get_user_data(asock);
-    pj_lock_acquire(turn_sock->lock);
+    pj_grp_lock_acquire(turn_sock->grp_lock);
 
-    if (status == PJ_SUCCESS && turn_sock->sess) {
+    if (status == PJ_SUCCESS && turn_sock->sess && !turn_sock->is_destroying) {
         /* Report incoming packet to TURN session, repeat while we have
          * "packet" in the buffer (required for stream-oriented transports)
          */
@@ -614,7 +634,7 @@ static pj_bool_t on_data_read(pj_activesock_t *asock,
     }
 
 on_return:
-    pj_lock_release(turn_sock->lock);
+    pj_grp_lock_release(turn_sock->grp_lock);
 
     return ret;
 }
@@ -634,7 +654,7 @@ static pj_status_t turn_on_send_pkt(pj_turn_session *sess,
     pj_ssize_t len = pkt_len;
     pj_status_t status;
 
-    if (turn_sock == NULL) {
+    if (turn_sock == NULL || turn_sock->is_destroying) {
         /* We've been destroyed */
         // https://trac.pjsip.org/repos/ticket/1316
         //pj_assert(!"We should shutdown gracefully");
@@ -680,7 +700,7 @@ static void turn_on_rx_data(pj_turn_session *sess,
 {
     pj_turn_sock *turn_sock = (pj_turn_sock*) pj_turn_session_get_user_data(sess);
 
-    if (turn_sock == NULL) {
+    if (turn_sock == NULL || turn_sock->is_destroying) {
         /* We've been destroyed */
         return;
     }
@@ -729,6 +749,7 @@ static void turn_on_state(pj_turn_session *sess,
         char addrtxt[PJ_INET6_ADDRSTRLEN+8];
         int sock_type;
         pj_sock_t sock;
+        pj_activesock_cfg asock_cfg;
         pj_activesock_cb asock_cb;
         pj_sockaddr bound_addr, *cfg_bind_addr;
         pj_uint16_t max_bind_retry;
@@ -790,11 +811,14 @@ static void turn_on_state(pj_turn_session *sess,
         }
 
         /* Create active socket */
+        pj_activesock_cfg_default(&asock_cfg);
+        asock_cfg.grp_lock = turn_sock->grp_lock;
+
         pj_bzero(&asock_cb, sizeof(asock_cb));
         asock_cb.on_data_read = &on_data_read;
         asock_cb.on_connect_complete = &on_connect_complete;
         status = pj_activesock_create(turn_sock->pool, sock,
-                                      sock_type, NULL,
+                                      sock_type, &asock_cfg,
                                       turn_sock->cfg.ioqueue, &asock_cb,
                                       turn_sock,
                                       &turn_sock->active_sock);
@@ -835,14 +859,12 @@ static void turn_on_state(pj_turn_session *sess,
         turn_sock->sess = NULL;
         pj_turn_session_set_user_data(sess, NULL);
 
-        if (turn_sock->timer.id) {
-            pj_timer_heap_cancel(turn_sock->cfg.timer_heap, &turn_sock->timer);
-            turn_sock->timer.id = 0;
-        }
-
-        turn_sock->timer.id = TIMER_DESTROY;
-        pj_timer_heap_schedule(turn_sock->cfg.timer_heap, &turn_sock->timer,
-                               &delay);
+        pj_timer_heap_cancel_if_active(turn_sock->cfg.timer_heap,
+                                       &turn_sock->timer, 0);
+        pj_timer_heap_schedule_w_grp_lock(turn_sock->cfg.timer_heap,
+                                          &turn_sock->timer,
+                                          &delay, TIMER_DESTROY,
+                                          turn_sock->grp_lock);
     }
 }
diff --git a/pjnath/src/pjturn-srv/allocation.c b/pjnath/src/pjturn-srv/allocation.c
index 253e2f0b..b13f02f1 100644
--- a/pjnath/src/pjturn-srv/allocation.c
+++ b/pjnath/src/pjturn-srv/allocation.c
@@ -338,7 +338,7 @@ PJ_DEF(pj_status_t) pj_turn_allocation_create(pj_turn_transport *transport,
     sess_cb.on_rx_request = &stun_on_rx_request;
     sess_cb.on_rx_indication = &stun_on_rx_indication;
     status = pj_stun_session_create(&srv->core.stun_cfg, alloc->obj_name,
-                                    &sess_cb, PJ_FALSE, &alloc->sess);
+                                    &sess_cb, PJ_FALSE, NULL, &alloc->sess);
     if (status != PJ_SUCCESS) {
         goto on_error;
     }
diff --git a/pjnath/src/pjturn-srv/server.c b/pjnath/src/pjturn-srv/server.c
index da1afdce..f53243d5 100644
--- a/pjnath/src/pjturn-srv/server.c
+++ b/pjnath/src/pjturn-srv/server.c
@@ -155,7 +155,8 @@ PJ_DEF(pj_status_t) pj_turn_srv_create(pj_pool_factory *pf,
 
     sess_cb.on_send_msg = &on_tx_stun_msg;
     status = pj_stun_session_create(&srv->core.stun_cfg, srv->obj_name,
-                                    &sess_cb, PJ_FALSE, &srv->core.stun_sess);
+                                    &sess_cb, PJ_FALSE, NULL,
+                                    &srv->core.stun_sess);
     if (status != PJ_SUCCESS) {
         goto on_error;
     }
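
The turn_sock.c hunks above apply the group-lock ownership pattern that this change introduces: the object creates or adopts a pj_grp_lock_t, takes one reference, registers a destroy handler, and its memory is released only when the last pj_grp_lock_dec_ref() fires that handler. The sketch below restates that pattern outside pjnath, using only the PJLIB calls that appear in the diff (pj_grp_lock_create, pj_grp_lock_add_ref, pj_grp_lock_add_handler, pj_grp_lock_acquire/release, pj_grp_lock_dec_ref); my_obj and its functions are illustrative names, not part of this patch.

    #include <pjlib.h>

    /* Hypothetical object guarded by a group lock, mirroring the pattern
     * applied to pj_turn_sock in this patch.
     */
    typedef struct my_obj
    {
        pj_pool_t     *pool;
        pj_grp_lock_t *grp_lock;
        pj_bool_t      is_destroying;
    } my_obj;

    /* Invoked by the group lock after the last reference is released. */
    static void my_obj_on_destroy(void *comp)
    {
        my_obj *obj = (my_obj*) comp;
        pj_pool_release(obj->pool);     /* memory is freed only here */
    }

    static pj_status_t my_obj_create(pj_pool_t *pool, my_obj **p_obj)
    {
        my_obj *obj = PJ_POOL_ZALLOC_T(pool, my_obj);
        pj_status_t status;

        obj->pool = pool;
        status = pj_grp_lock_create(pool, NULL, &obj->grp_lock);
        if (status != PJ_SUCCESS)
            return status;

        /* One reference for the owner, plus a handler that runs when the
         * reference count drops to zero.
         */
        pj_grp_lock_add_ref(obj->grp_lock);
        pj_grp_lock_add_handler(obj->grp_lock, pool, obj, &my_obj_on_destroy);

        *p_obj = obj;
        return PJ_SUCCESS;
    }

    /* Request destruction: mark the flag under the lock and drop the owner
     * reference; components still holding references (timers, sockets)
     * keep the object alive until they drop theirs. The dec_ref-then-release
     * ordering follows destroy() in the patch.
     */
    static void my_obj_destroy(my_obj *obj)
    {
        pj_grp_lock_acquire(obj->grp_lock);
        if (obj->is_destroying) {
            pj_grp_lock_release(obj->grp_lock);
            return;
        }
        obj->is_destroying = PJ_TRUE;
        pj_grp_lock_dec_ref(obj->grp_lock);
        pj_grp_lock_release(obj->grp_lock);
    }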
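
The same hunks also replace manual timer bookkeeping (setting timer.id, pj_timer_heap_cancel(), pj_timer_heap_schedule()) with pj_timer_heap_cancel_if_active() and pj_timer_heap_schedule_w_grp_lock(), so the entry id and the group-lock reference stay consistent even when the timer fires on another thread. A minimal sketch of that rescheduling step, assuming the same PJLIB calls used in the diff; restart_keep_alive() and the TIMER_* ids are illustrative only.

    #include <pjlib.h>

    enum { TIMER_NONE, TIMER_KEEP_ALIVE };

    /* Re-arm a keep-alive style timer the way the patch does: cancel any
     * pending entry and reset its id in one call, then schedule it again
     * together with the group lock so the owner stays referenced while
     * the entry is pending in the heap.
     */
    static void restart_keep_alive(pj_timer_heap_t *th,
                                   pj_timer_entry *timer,
                                   pj_grp_lock_t *grp_lock,
                                   unsigned interval_sec)
    {
        pj_time_val delay;

        delay.sec  = interval_sec;
        delay.msec = 0;

        pj_timer_heap_cancel_if_active(th, timer, TIMER_NONE);
        pj_timer_heap_schedule_w_grp_lock(th, timer, &delay,
                                          TIMER_KEEP_ALIVE, grp_lock);
    }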