diff options
author | Jason Parker <jparker@digium.com> | 2013-03-11 15:09:56 -0500 |
---|---|---|
committer | Jason Parker <jparker@digium.com> | 2013-03-11 15:09:56 -0500 |
commit | 483805f79570115ab95c69698792d238c1719b1b (patch) | |
tree | 6b53ab2fd2b2478f864ccc8bd1b0bfaedc4d2050 /pjnath | |
parent | f3ab456a17af1c89a6e3be4d20c5944853df1cb0 (diff) |
Import pjproject-2.1
Diffstat (limited to 'pjnath')
26 files changed, 1805 insertions, 809 deletions
diff --git a/pjnath/build/Makefile b/pjnath/build/Makefile index 573d957..32b0b50 100644 --- a/pjnath/build/Makefile +++ b/pjnath/build/Makefile @@ -40,7 +40,7 @@ export PJNATH_CFLAGS += $(_CFLAGS) # Defines for building test application # export PJNATH_TEST_SRCDIR = ../src/pjnath-test -export PJNATH_TEST_OBJS += ice_test.o stun.o sess_auth.o server.o \ +export PJNATH_TEST_OBJS += ice_test.o stun.o sess_auth.o server.o concur_test.o \ stun_sock_test.o turn_sock_test.o test.o export PJNATH_TEST_CFLAGS += $(_CFLAGS) export PJNATH_TEST_LDFLAGS += $(_LDFLAGS) @@ -97,6 +97,8 @@ distclean: realclean pjnath: $(MAKE) -f $(RULES_MAK) APP=PJNATH app=pjnath $(PJNATH_LIB) +$$(PJNATH_LIB): pjnath + pjnath-test: $(PJLIB_LIB) $(PJLIB_UTIL_LIB) $(PJNATH_LIB) $(MAKE) -f $(RULES_MAK) APP=PJNATH_TEST app=pjnath-test $(PJNATH_TEST_EXE) diff --git a/pjnath/build/pjnath_test.vcproj b/pjnath/build/pjnath_test.vcproj index ced2508..4ec5bbf 100644 --- a/pjnath/build/pjnath_test.vcproj +++ b/pjnath/build/pjnath_test.vcproj @@ -2999,6 +2999,64 @@ Filter="cpp;c;cxx;rc;def;r;odl;idl;hpj;bat"
>
<File
+ RelativePath="..\src\pjnath-test\concur_test.c"
+ >
+ <FileConfiguration
+ Name="Debug|Win32"
+ >
+ <Tool
+ Name="VCCLCompilerTool"
+ AdditionalIncludeDirectories=""
+ PreprocessorDefinitions=""
+ />
+ </FileConfiguration>
+ <FileConfiguration
+ Name="Release|Win32"
+ >
+ <Tool
+ Name="VCCLCompilerTool"
+ AdditionalIncludeDirectories=""
+ PreprocessorDefinitions=""
+ />
+ </FileConfiguration>
+ <FileConfiguration
+ Name="Debug-Static|Win32"
+ >
+ <Tool
+ Name="VCCLCompilerTool"
+ AdditionalIncludeDirectories=""
+ PreprocessorDefinitions=""
+ />
+ </FileConfiguration>
+ <FileConfiguration
+ Name="Release-Dynamic|Win32"
+ >
+ <Tool
+ Name="VCCLCompilerTool"
+ AdditionalIncludeDirectories=""
+ PreprocessorDefinitions=""
+ />
+ </FileConfiguration>
+ <FileConfiguration
+ Name="Debug-Dynamic|Win32"
+ >
+ <Tool
+ Name="VCCLCompilerTool"
+ AdditionalIncludeDirectories=""
+ PreprocessorDefinitions=""
+ />
+ </FileConfiguration>
+ <FileConfiguration
+ Name="Release-Static|Win32"
+ >
+ <Tool
+ Name="VCCLCompilerTool"
+ AdditionalIncludeDirectories=""
+ PreprocessorDefinitions=""
+ />
+ </FileConfiguration>
+ </File>
+ <File
RelativePath="..\src\pjnath-test\ice_test.c"
>
<FileConfiguration
diff --git a/pjnath/include/pjnath/config.h b/pjnath/include/pjnath/config.h index 06368df..3ee5d91 100644 --- a/pjnath/include/pjnath/config.h +++ b/pjnath/include/pjnath/config.h @@ -1,4 +1,4 @@ -/* $Id: config.h 3553 2011-05-05 06:14:19Z nanang $ */ +/* $Id: config.h 4199 2012-07-05 10:52:55Z nanang $ */ /* * Copyright (C) 2008-2011 Teluu Inc. (http://www.teluu.com) * Copyright (C) 2003-2008 Benny Prijono <benny@prijono.org> @@ -487,6 +487,17 @@ # define PJNATH_POOL_INC_TURN_SOCK 1000 #endif +/** Default STUN software name */ +#ifndef PJNATH_STUN_SOFTWARE_NAME +# define PJNATH_MAKE_SW_NAME(a,b,c,d) "pjnath-" #a "." #b "." #c d +# define PJNATH_MAKE_SW_NAME2(a,b,c,d) PJNATH_MAKE_SW_NAME(a,b,c,d) +# define PJNATH_STUN_SOFTWARE_NAME PJNATH_MAKE_SW_NAME2( \ + PJ_VERSION_NUM_MAJOR, \ + PJ_VERSION_NUM_MINOR, \ + PJ_VERSION_NUM_REV, \ + PJ_VERSION_NUM_EXTRA) +#endif + /** * @} */ diff --git a/pjnath/include/pjnath/ice_session.h b/pjnath/include/pjnath/ice_session.h index 1e65943..e21f520 100644 --- a/pjnath/include/pjnath/ice_session.h +++ b/pjnath/include/pjnath/ice_session.h @@ -1,4 +1,4 @@ -/* $Id: ice_session.h 3553 2011-05-05 06:14:19Z nanang $ */ +/* $Id: ice_session.h 4360 2013-02-21 11:26:35Z bennylp $ */ /* * Copyright (C) 2008-2011 Teluu Inc. (http://www.teluu.com) * Copyright (C) 2003-2008 Benny Prijono <benny@prijono.org> @@ -612,13 +612,14 @@ struct pj_ice_sess pj_pool_t *pool; /**< Pool instance. */ void *user_data; /**< App. data. */ - pj_mutex_t *mutex; /**< Mutex. */ + pj_grp_lock_t *grp_lock; /**< Group lock */ pj_ice_sess_role role; /**< ICE role. */ pj_ice_sess_options opt; /**< Options */ pj_timestamp tie_breaker; /**< Tie breaker value */ pj_uint8_t *prefs; /**< Type preference. */ pj_bool_t is_nominating; /**< Nominating stage */ pj_bool_t is_complete; /**< Complete? */ + pj_bool_t is_destroying; /**< Destroy is called */ pj_status_t ice_status; /**< Error status. */ pj_timer_entry timer; /**< ICE timer. */ pj_ice_sess_cb cb; /**< Callback. */ @@ -729,6 +730,8 @@ PJ_DECL(void) pj_ice_sess_options_default(pj_ice_sess_options *opt); * the value is NULL, a random string will be * generated. * @param local_passwd Optional string to be used as local password. + * @param grp_lock Optional group lock to be used by this session. + * If NULL, the session will create one itself. * @param p_ice Pointer to receive the ICE session instance. * * @return PJ_SUCCESS if ICE session is created successfully. @@ -740,6 +743,7 @@ PJ_DECL(pj_status_t) pj_ice_sess_create(pj_stun_config *stun_cfg, const pj_ice_sess_cb *cb, const pj_str_t *local_ufrag, const pj_str_t *local_passwd, + pj_grp_lock_t *grp_lock, pj_ice_sess **p_ice); /** diff --git a/pjnath/include/pjnath/stun_config.h b/pjnath/include/pjnath/stun_config.h index 199c452..e5a0b98 100644 --- a/pjnath/include/pjnath/stun_config.h +++ b/pjnath/include/pjnath/stun_config.h @@ -1,4 +1,4 @@ -/* $Id: stun_config.h 3553 2011-05-05 06:14:19Z nanang $ */ +/* $Id: stun_config.h 4199 2012-07-05 10:52:55Z nanang $ */ /* * Copyright (C) 2008-2011 Teluu Inc. (http://www.teluu.com) * Copyright (C) 2003-2008 Benny Prijono <benny@prijono.org> @@ -81,6 +81,13 @@ typedef struct pj_stun_config */ unsigned res_cache_msec; + /** + * Software name to be included in all STUN requests and responses. + * + * Default: PJNATH_STUN_SOFTWARE_NAME. + */ + pj_str_t software_name; + } pj_stun_config; @@ -102,6 +109,7 @@ PJ_INLINE(void) pj_stun_config_init(pj_stun_config *cfg, cfg->timer_heap = timer_heap; cfg->rto_msec = PJ_STUN_RTO_VALUE; cfg->res_cache_msec = PJ_STUN_RES_CACHE_DURATION; + cfg->software_name = pj_str((char*)PJNATH_STUN_SOFTWARE_NAME); } diff --git a/pjnath/include/pjnath/stun_session.h b/pjnath/include/pjnath/stun_session.h index 29efe1a..dd12e91 100644 --- a/pjnath/include/pjnath/stun_session.h +++ b/pjnath/include/pjnath/stun_session.h @@ -1,4 +1,4 @@ -/* $Id: stun_session.h 3553 2011-05-05 06:14:19Z nanang $ */ +/* $Id: stun_session.h 4360 2013-02-21 11:26:35Z bennylp $ */ /* * Copyright (C) 2008-2011 Teluu Inc. (http://www.teluu.com) * Copyright (C) 2003-2008 Benny Prijono <benny@prijono.org> @@ -30,6 +30,7 @@ #include <pjnath/stun_config.h> #include <pjnath/stun_transaction.h> #include <pj/list.h> +#include <pj/lock.h> #include <pj/timer.h> PJ_BEGIN_DECL @@ -384,6 +385,8 @@ typedef enum pj_stun_sess_msg_log_flag * name will be used for example for logging purpose. * @param cb Session callback. * @param fingerprint Enable message fingerprint for outgoing messages. + * @param grp_lock Optional group lock to be used by this session. + * If NULL, the session will create one itself. * @param p_sess Pointer to receive STUN session instance. * * @return PJ_SUCCESS on success, or the appropriate error code. @@ -392,6 +395,7 @@ PJ_DECL(pj_status_t) pj_stun_session_create(pj_stun_config *cfg, const char *name, const pj_stun_session_cb *cb, pj_bool_t fingerprint, + pj_grp_lock_t *grp_lock, pj_stun_session **p_sess); /** @@ -431,22 +435,6 @@ PJ_DECL(pj_status_t) pj_stun_session_set_user_data(pj_stun_session *sess, PJ_DECL(void*) pj_stun_session_get_user_data(pj_stun_session *sess); /** - * Change the lock object used by the STUN session. By default, the STUN - * session uses a mutex to protect its internal data. If application already - * protects access to STUN session with higher layer lock, it may disable - * the mutex protection in the STUN session by changing the STUN session - * lock to a NULL mutex. - * - * @param sess The STUN session instance. - * @param lock New lock instance to be used by the STUN session. - * @param auto_del Specify whether STUN session should destroy this - * lock instance when it's destroyed. - */ -PJ_DECL(pj_status_t) pj_stun_session_set_lock(pj_stun_session *sess, - pj_lock_t *lock, - pj_bool_t auto_del); - -/** * Set SOFTWARE name to be included in all requests and responses. * * @param sess The STUN session instance. @@ -682,6 +670,8 @@ PJ_DECL(pj_status_t) pj_stun_session_cancel_req(pj_stun_session *sess, * * @param sess The STUN session instance. * @param tdata The request message previously sent. + * @param mod_count Boolean flag to indicate whether transmission count + * needs to be incremented. * * @return PJ_SUCCESS on success, or the appropriate error. * This function will return PJNATH_ESTUNDESTROYED if @@ -689,7 +679,8 @@ PJ_DECL(pj_status_t) pj_stun_session_cancel_req(pj_stun_session *sess, * callback. */ PJ_DECL(pj_status_t) pj_stun_session_retransmit_req(pj_stun_session *sess, - pj_stun_tx_data *tdata); + pj_stun_tx_data *tdata, + pj_bool_t mod_count); /** diff --git a/pjnath/include/pjnath/stun_sock.h b/pjnath/include/pjnath/stun_sock.h index decba9a..c18741a 100644 --- a/pjnath/include/pjnath/stun_sock.h +++ b/pjnath/include/pjnath/stun_sock.h @@ -1,4 +1,4 @@ -/* $Id: stun_sock.h 3553 2011-05-05 06:14:19Z nanang $ */ +/* $Id: stun_sock.h 4360 2013-02-21 11:26:35Z bennylp $ */ /* * Copyright (C) 2008-2011 Teluu Inc. (http://www.teluu.com) * Copyright (C) 2003-2008 Benny Prijono <benny@prijono.org> @@ -27,6 +27,7 @@ #include <pjnath/stun_config.h> #include <pjlib-util/resolver.h> #include <pj/ioqueue.h> +#include <pj/lock.h> #include <pj/sock.h> #include <pj/sock_qos.h> @@ -218,7 +219,17 @@ typedef struct pj_stun_sock_info typedef struct pj_stun_sock_cfg { /** - * Packet buffer size. Default value is PJ_STUN_SOCK_PKT_LEN. + * The group lock to be used by the STUN socket. If NULL, the STUN socket + * will create one internally. + * + * Default: NULL + */ + pj_grp_lock_t *grp_lock; + + /** + * Packet buffer size. + * + * Default value is PJ_STUN_SOCK_PKT_LEN. */ unsigned max_pkt_size; @@ -236,11 +247,21 @@ typedef struct pj_stun_sock_cfg * address is zero, socket will be bound to INADDR_ANY. If the address * is non-zero, socket will be bound to this address only, and the * transport will have only one address alias (the \a alias_cnt field - * in #pj_stun_sock_info structure. + * in #pj_stun_sock_info structure. If the port is set to zero, the + * socket will bind at any port (chosen by the OS). */ pj_sockaddr bound_addr; /** + * Specify the port range for STUN socket binding, relative to the start + * port number specified in \a bound_addr. Note that this setting is only + * applicable when the start port number is non zero. + * + * Default value is zero. + */ + pj_uint16_t port_range; + + /** * Specify the STUN keep-alive duration, in seconds. The STUN transport * does keep-alive by sending STUN Binding request to the STUN server. * If this value is zero, the PJ_STUN_KEEP_ALIVE_SEC value will be used. diff --git a/pjnath/include/pjnath/stun_transaction.h b/pjnath/include/pjnath/stun_transaction.h index 526186f..2acb56c 100644 --- a/pjnath/include/pjnath/stun_transaction.h +++ b/pjnath/include/pjnath/stun_transaction.h @@ -1,4 +1,4 @@ -/* $Id: stun_transaction.h 3553 2011-05-05 06:14:19Z nanang $ */ +/* $Id: stun_transaction.h 4360 2013-02-21 11:26:35Z bennylp $ */ /* * Copyright (C) 2008-2011 Teluu Inc. (http://www.teluu.com) * Copyright (C) 2003-2008 Benny Prijono <benny@prijono.org> @@ -27,6 +27,7 @@ #include <pjnath/stun_msg.h> #include <pjnath/stun_config.h> +#include <pj/lock.h> PJ_BEGIN_DECL @@ -124,6 +125,7 @@ typedef struct pj_stun_tsx_cb * @param cfg The STUN endpoint, which will be used to retrieve * various settings for the transaction. * @param pool Pool to be used to allocate memory from. + * @param grp_lock Group lock to synchronize. * @param cb Callback structure, to be used by the transaction * to send message and to notify the application about * the completion of the transaction. @@ -133,6 +135,7 @@ typedef struct pj_stun_tsx_cb */ PJ_DECL(pj_status_t) pj_stun_client_tsx_create( pj_stun_config *cfg, pj_pool_t *pool, + pj_grp_lock_t *grp_lock, const pj_stun_tsx_cb *cb, pj_stun_client_tsx **p_tsx); @@ -159,15 +162,14 @@ pj_stun_client_tsx_schedule_destroy(pj_stun_client_tsx *tsx, /** - * Destroy a STUN client transaction immediately. This function can be - * called at any time to stop the transaction and destroy it. + * Stop the client transaction. * * @param tsx The STUN transaction. * * @return PJ_SUCCESS on success or PJ_EINVAL if the parameter * is NULL. */ -PJ_DECL(pj_status_t) pj_stun_client_tsx_destroy(pj_stun_client_tsx *tsx); +PJ_DECL(pj_status_t) pj_stun_client_tsx_stop(pj_stun_client_tsx *tsx); /** @@ -234,13 +236,16 @@ PJ_DECL(pj_status_t) pj_stun_client_tsx_send_msg(pj_stun_client_tsx *tsx, * but this functionality is needed by ICE. * * @param tsx The STUN client transaction instance. + * @param mod_count Boolean flag to indicate whether transmission count + * needs to be incremented. * * @return PJ_SUCCESS on success, or PJNATH_ESTUNDESTROYED * when the user has destroyed the transaction in * \a on_send_msg() callback, or any other error code * as returned by \a on_send_msg() callback. */ -PJ_DECL(pj_status_t) pj_stun_client_tsx_retransmit(pj_stun_client_tsx *tsx); +PJ_DECL(pj_status_t) pj_stun_client_tsx_retransmit(pj_stun_client_tsx *tsx, + pj_bool_t mod_count); /** diff --git a/pjnath/include/pjnath/turn_session.h b/pjnath/include/pjnath/turn_session.h index 0eeb6e3..eb6d16b 100644 --- a/pjnath/include/pjnath/turn_session.h +++ b/pjnath/include/pjnath/turn_session.h @@ -1,4 +1,4 @@ -/* $Id: turn_session.h 3553 2011-05-05 06:14:19Z nanang $ */ +/* $Id: turn_session.h 4360 2013-02-21 11:26:35Z bennylp $ */ /* * Copyright (C) 2008-2011 Teluu Inc. (http://www.teluu.com) * Copyright (C) 2003-2008 Benny Prijono <benny@prijono.org> @@ -417,7 +417,10 @@ PJ_DECL(const char*) pj_turn_state_name(pj_turn_state_t state); * @param name Optional name to identify this session in the log. * @param af Address family of the client connection. Currently * pj_AF_INET() and pj_AF_INET6() are supported. - * @param conn_type Connection type to the TURN server. + * @param conn_type Connection type to the TURN server. + * @param grp_lock Optional group lock object to be used by this session. + * If this value is NULL, the session will create + * a group lock internally. * @param cb Callback to receive events from the TURN session. * @param options Option flags, currently this value must be zero. * @param user_data Arbitrary application data to be associated with @@ -432,6 +435,7 @@ PJ_DECL(pj_status_t) pj_turn_session_create(const pj_stun_config *cfg, const char *name, int af, pj_turn_tp_type conn_type, + pj_grp_lock_t *grp_lock, const pj_turn_session_cb *cb, unsigned options, void *user_data, diff --git a/pjnath/include/pjnath/turn_sock.h b/pjnath/include/pjnath/turn_sock.h index 1a75a64..b13057c 100644 --- a/pjnath/include/pjnath/turn_sock.h +++ b/pjnath/include/pjnath/turn_sock.h @@ -1,4 +1,4 @@ -/* $Id: turn_sock.h 3553 2011-05-05 06:14:19Z nanang $ */ +/* $Id: turn_sock.h 4360 2013-02-21 11:26:35Z bennylp $ */ /* * Copyright (C) 2008-2011 Teluu Inc. (http://www.teluu.com) * Copyright (C) 2003-2008 Benny Prijono <benny@prijono.org> @@ -109,6 +109,21 @@ typedef struct pj_turn_sock_cb typedef struct pj_turn_sock_cfg { /** + * The group lock to be used by the STUN socket. If NULL, the STUN socket + * will create one internally. + * + * Default: NULL + */ + pj_grp_lock_t *grp_lock; + + /** + * Packet buffer size. + * + * Default value is PJ_TURN_MAX_PKT_LEN. + */ + unsigned max_pkt_size; + + /** * QoS traffic type to be set on this transport. When application wants * to apply QoS tagging to the transport, it's preferable to set this * field rather than \a qos_param fields since this is more portable. @@ -134,6 +149,23 @@ typedef struct pj_turn_sock_cfg */ pj_bool_t qos_ignore_error; + /** + * Specify the interface where the socket should be bound to. If the + * address is zero, socket will be bound to INADDR_ANY. If the address + * is non-zero, socket will be bound to this address only. If the port is + * set to zero, the socket will bind at any port (chosen by the OS). + */ + pj_sockaddr bound_addr; + + /** + * Specify the port range for TURN socket binding, relative to the start + * port number specified in \a bound_addr. Note that this setting is only + * applicable when the start port number is non zero. + * + * Default value is zero. + */ + pj_uint16_t port_range; + } pj_turn_sock_cfg; diff --git a/pjnath/src/pjnath-test/concur_test.c b/pjnath/src/pjnath-test/concur_test.c new file mode 100644 index 0000000..bf54e94 --- /dev/null +++ b/pjnath/src/pjnath-test/concur_test.c @@ -0,0 +1,367 @@ +/* $Id: concur_test.c 4412 2013-03-05 03:12:32Z riza $ */ +/* + * Copyright (C) 2008-2011 Teluu Inc. (http://www.teluu.com) + * Copyright (C) 2003-2008 Benny Prijono <benny@prijono.org> + * + * This program is free software; you can redistribute it and/or modify + * it under the terms of the GNU General Public License as published by + * the Free Software Foundation; either version 2 of the License, or + * (at your option) any later version. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU General Public License for more details. + * + * You should have received a copy of the GNU General Public License + * along with this program; if not, write to the Free Software + * Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA + */ +#include "test.h" + +#if INCLUDE_CONCUR_TEST + +#define THIS_FILE "concur_test.c" + +/****************************************************************************/ +#define WORKER_THREAD_CNT 4 +#define SERVER_THREAD_CNT 4 +#define MAX_SOCK_CLIENTS 80 + +struct stun_test_session +{ + pj_stun_config stun_cfg; + + pj_lock_t *lock; + + pj_thread_t *worker_threads[WORKER_THREAD_CNT]; + + pj_sock_t server_sock; + int server_port; + pj_thread_t *server_threads[SERVER_THREAD_CNT]; + pj_event_t *server_event; + + pj_bool_t thread_quit_flag; + + /* Test parameters: */ + struct { + int client_got_response; + + pj_bool_t server_wait_for_event; + pj_bool_t server_drop_request; + int client_sleep_after_start; + int client_sleep_before_destroy; + } param; +}; + +static int server_thread_proc(void *p) +{ + struct stun_test_session *test_sess = (struct stun_test_session*)p; + pj_pool_t *pool; + pj_status_t status; + + PJ_LOG(4,(THIS_FILE, "Server thread running")); + + pool = pj_pool_create(test_sess->stun_cfg.pf, "server", 512, 512, NULL); + + while (!test_sess->thread_quit_flag) { + pj_time_val timeout = {0, 10}; + pj_fd_set_t rdset; + int n; + + /* Serve client */ + PJ_FD_ZERO(&rdset); + PJ_FD_SET(test_sess->server_sock, &rdset); + n = pj_sock_select(test_sess->server_sock+1, &rdset, + NULL, NULL, &timeout); + if (n==1 && PJ_FD_ISSET(test_sess->server_sock, &rdset)) { + pj_uint8_t pkt[512]; + pj_ssize_t pkt_len; + pj_size_t res_len; + pj_sockaddr client_addr; + int addr_len; + + pj_stun_msg *stun_req, *stun_res; + + pj_pool_reset(pool); + + /* Got query */ + pkt_len = sizeof(pkt); + addr_len = sizeof(client_addr); + status = pj_sock_recvfrom(test_sess->server_sock, pkt, &pkt_len, + 0, &client_addr, &addr_len); + if (status != PJ_SUCCESS) { + continue; + } + + status = pj_stun_msg_decode(pool, pkt, pkt_len, + PJ_STUN_IS_DATAGRAM, + &stun_req, NULL, NULL); + if (status != PJ_SUCCESS) { + PJ_PERROR(1,(THIS_FILE, status, "STUN request decode error")); + continue; + } + + status = pj_stun_msg_create_response(pool, stun_req, + PJ_STUN_SC_BAD_REQUEST, NULL, + &stun_res); + if (status != PJ_SUCCESS) { + PJ_PERROR(1,(THIS_FILE, status, "STUN create response error")); + continue; + } + + status = pj_stun_msg_encode(stun_res, pkt, sizeof(pkt), 0, + NULL, &res_len); + if (status != PJ_SUCCESS) { + PJ_PERROR(1,(THIS_FILE, status, "STUN encode error")); + continue; + } + + /* Ignore request */ + if (test_sess->param.server_drop_request) + continue; + + /* Wait for signal to continue */ + if (test_sess->param.server_wait_for_event) + pj_event_wait(test_sess->server_event); + + pkt_len = res_len; + pj_sock_sendto(test_sess->server_sock, pkt, &pkt_len, 0, + &client_addr, pj_sockaddr_get_len(&client_addr)); + } + } + + pj_pool_release(pool); + + PJ_LOG(4,(THIS_FILE, "Server thread quitting")); + return 0; +} + +static int worker_thread_proc(void *p) +{ + struct stun_test_session *test_sess = (struct stun_test_session*)p; + + PJ_LOG(4,(THIS_FILE, "Worker thread running")); + + while (!test_sess->thread_quit_flag) { + pj_time_val timeout = {0, 10}; + pj_timer_heap_poll(test_sess->stun_cfg.timer_heap, NULL); + pj_ioqueue_poll(test_sess->stun_cfg.ioqueue, &timeout); + } + + PJ_LOG(4,(THIS_FILE, "Worker thread quitting")); + return 0; +} + +static pj_bool_t stun_sock_on_status(pj_stun_sock *stun_sock, + pj_stun_sock_op op, + pj_status_t status) +{ + struct stun_test_session *test_sess = (struct stun_test_session*)pj_stun_sock_get_user_data(stun_sock); + + PJ_UNUSED_ARG(op); + PJ_UNUSED_ARG(status); + + test_sess->param.client_got_response++; + return PJ_TRUE; +} + +static int stun_destroy_test_session(struct stun_test_session *test_sess) +{ + + unsigned i; + pj_stun_sock_cb stun_cb; + pj_status_t status; + pj_stun_sock *stun_sock[MAX_SOCK_CLIENTS]; + + pj_bzero(&stun_cb, sizeof(stun_cb)); + stun_cb.on_status = &stun_sock_on_status; + + pj_event_reset(test_sess->server_event); + + /* Create all clients first */ + for (i=0; i<MAX_SOCK_CLIENTS; ++i) { + char name[10]; + sprintf(name, "stun%02d", i); + status = pj_stun_sock_create(&test_sess->stun_cfg, name, pj_AF_INET(), + &stun_cb, NULL, test_sess, + &stun_sock[i]); + if (status != PJ_SUCCESS) { + PJ_PERROR(1,(THIS_FILE, status, "Error creating stun socket")); + return -10; + } + } + + /* Start resolution */ + for (i=0; i<MAX_SOCK_CLIENTS; ++i) { + pj_str_t server_ip = pj_str("127.0.0.1"); + status = pj_stun_sock_start(stun_sock[i], &server_ip, + (pj_uint16_t)test_sess->server_port, NULL); + if (status != PJ_SUCCESS) { + PJ_PERROR(1,(THIS_FILE, status, "Error starting stun socket")); + return -20; + } + } + + /* settle down */ + pj_thread_sleep(test_sess->param.client_sleep_after_start); + + /* Resume server threads */ + pj_event_set(test_sess->server_event); + + pj_thread_sleep(test_sess->param.client_sleep_before_destroy); + + /* Destroy clients */ + for (i=0; i<MAX_SOCK_CLIENTS; ++i) { + status = pj_stun_sock_destroy(stun_sock[i]); + if (status != PJ_SUCCESS) { + PJ_PERROR(1,(THIS_FILE, status, "Error destroying stun socket")); + } + } + + return 0; +} + +static int stun_destroy_test(void) +{ + enum { LOOP = 500 }; + struct stun_test_session test_sess; + pj_sockaddr bind_addr; + int addr_len; + pj_caching_pool cp; + pj_pool_t *pool; + unsigned i; + pj_status_t status; + int rc = 0; + + PJ_LOG(3,(THIS_FILE, " STUN destroy concurrency test")); + + pj_bzero(&test_sess, sizeof(test_sess)); + + pj_caching_pool_init(&cp, NULL, 0); + pool = pj_pool_create(&cp.factory, "testsess", 512, 512, NULL); + + pj_stun_config_init(&test_sess.stun_cfg, &cp.factory, 0, NULL, NULL); + + status = pj_timer_heap_create(pool, 1023, &test_sess.stun_cfg.timer_heap); + pj_assert(status == PJ_SUCCESS); + + status = pj_lock_create_recursive_mutex(pool, NULL, &test_sess.lock); + pj_assert(status == PJ_SUCCESS); + + pj_timer_heap_set_lock(test_sess.stun_cfg.timer_heap, test_sess.lock, PJ_TRUE); + pj_assert(status == PJ_SUCCESS); + + status = pj_ioqueue_create(pool, 512, &test_sess.stun_cfg.ioqueue); + pj_assert(status == PJ_SUCCESS); + + pj_sock_socket(pj_AF_INET(), pj_SOCK_DGRAM(), 0, &test_sess.server_sock); + pj_sockaddr_init(pj_AF_INET(), &bind_addr, NULL, 0); + status = pj_sock_bind(test_sess.server_sock, &bind_addr, pj_sockaddr_get_len(&bind_addr)); + pj_assert(status == PJ_SUCCESS); + + addr_len = sizeof(bind_addr); + status = pj_sock_getsockname(test_sess.server_sock, &bind_addr, &addr_len); + pj_assert(status == PJ_SUCCESS); + + test_sess.server_port = pj_sockaddr_get_port(&bind_addr); + + status = pj_event_create(pool, NULL, PJ_TRUE, PJ_FALSE, &test_sess.server_event); + pj_assert(status == PJ_SUCCESS); + + for (i=0; i<SERVER_THREAD_CNT; ++i) { + status = pj_thread_create(pool, NULL, + &server_thread_proc, &test_sess, + 0, 0, &test_sess.server_threads[i]); + pj_assert(status == PJ_SUCCESS); + } + + for (i=0; i<WORKER_THREAD_CNT; ++i) { + status = pj_thread_create(pool, NULL, + &worker_thread_proc, &test_sess, + 0, 0, &test_sess.worker_threads[i]); + pj_assert(status == PJ_SUCCESS); + } + + /* Test 1: Main thread calls destroy while callback is processing response */ + PJ_LOG(3,(THIS_FILE, " Destroy in main thread while callback is running")); + for (i=0; i<LOOP; ++i) { + int sleep = pj_rand() % 5; + + PJ_LOG(3,(THIS_FILE, " Try %-3d of %d", i+1, LOOP)); + + /* Test 1: destroy at the same time when receiving response */ + pj_bzero(&test_sess.param, sizeof(test_sess.param)); + test_sess.param.client_sleep_after_start = 20; + test_sess.param.client_sleep_before_destroy = sleep; + test_sess.param.server_wait_for_event = PJ_TRUE; + stun_destroy_test_session(&test_sess); + PJ_LOG(3,(THIS_FILE, + " stun test a: sleep delay:%d: clients with response: %d", + sleep, test_sess.param.client_got_response)); + + /* Test 2: destroy at the same time with STUN retransmit timer */ + test_sess.param.server_drop_request = PJ_TRUE; + test_sess.param.client_sleep_after_start = 0; + test_sess.param.client_sleep_before_destroy = PJ_STUN_RTO_VALUE; + test_sess.param.server_wait_for_event = PJ_FALSE; + stun_destroy_test_session(&test_sess); + PJ_LOG(3,(THIS_FILE, " stun test b: retransmit concurrency")); + + /* Test 3: destroy at the same time with receiving response + * AND STUN retransmit timer */ + test_sess.param.client_got_response = 0; + test_sess.param.server_drop_request = PJ_FALSE; + test_sess.param.client_sleep_after_start = PJ_STUN_RTO_VALUE; + test_sess.param.client_sleep_before_destroy = 0; + test_sess.param.server_wait_for_event = PJ_TRUE; + stun_destroy_test_session(&test_sess); + PJ_LOG(3,(THIS_FILE, + " stun test c: clients with response: %d", + test_sess.param.client_got_response)); + + pj_thread_sleep(10); + + ice_one_conc_test(&test_sess.stun_cfg, PJ_FALSE); + + pj_thread_sleep(10); + } + + /* Avoid compiler warning */ + goto on_return; + + +on_return: + test_sess.thread_quit_flag = PJ_TRUE; + + for (i=0; i<SERVER_THREAD_CNT; ++i) { + pj_thread_join(test_sess.server_threads[i]); + } + + for (i=0; i<WORKER_THREAD_CNT; ++i) { + pj_thread_join(test_sess.worker_threads[i]); + } + + pj_event_destroy(test_sess.server_event); + pj_sock_close(test_sess.server_sock); + pj_ioqueue_destroy(test_sess.stun_cfg.ioqueue); + pj_timer_heap_destroy(test_sess.stun_cfg.timer_heap); + + pj_pool_release(pool); + pj_caching_pool_destroy(&cp); + + PJ_LOG(3,(THIS_FILE, " Done. rc=%d", rc)); + return rc; +} + + +int concur_test(void) +{ + int rc = 0; + + rc += stun_destroy_test(); + + return 0; +} + +#endif /* INCLUDE_CONCUR_TEST */ diff --git a/pjnath/src/pjnath-test/ice_test.c b/pjnath/src/pjnath-test/ice_test.c index fe0ee8d..2b74811 100644 --- a/pjnath/src/pjnath-test/ice_test.c +++ b/pjnath/src/pjnath-test/ice_test.c @@ -1,4 +1,4 @@ -/* $Id: ice_test.c 3553 2011-05-05 06:14:19Z nanang $ */ +/* $Id: ice_test.c 4412 2013-03-05 03:12:32Z riza $ */ /* * Copyright (C) 2008-2011 Teluu Inc. (http://www.teluu.com) * Copyright (C) 2003-2008 Benny Prijono <benny@prijono.org> @@ -29,7 +29,9 @@ enum #define NODELAY 0xFFFFFFFF #define SRV_DOMAIN "pjsip.lab.domain" +#define MAX_THREADS 16 +#define THIS_FILE "ice_test.c" #define INDENT " " /* Client flags */ @@ -48,7 +50,7 @@ struct test_result unsigned rx_cnt[4]; /* Number of data received */ }; - +/* Role comp# host? stun? turn? flag? ans_del snd_del des_del */ /* Test session configuration */ struct test_cfg { @@ -60,8 +62,8 @@ struct test_cfg unsigned client_flag; /* Client flags */ unsigned answer_delay; /* Delay before sending SDP */ - unsigned send_delay; /* Delay before sending data */ - unsigned destroy_delay; /* Delay before destroy() */ + unsigned send_delay; /* unused */ + unsigned destroy_delay; /* unused */ struct test_result expected;/* Expected result */ @@ -79,6 +81,17 @@ struct ice_ept pj_str_t pass; /* password */ }; +/* Session param */ +struct sess_param +{ + unsigned worker_cnt; + unsigned worker_timeout; + pj_bool_t worker_quit; + + pj_bool_t destroy_after_create; + pj_bool_t destroy_after_one_done; +}; + /* The test session */ struct test_sess { @@ -86,8 +99,12 @@ struct test_sess pj_stun_config *stun_cfg; pj_dns_resolver *resolver; + struct sess_param *param; + test_server *server; + pj_thread_t *worker_threads[MAX_THREADS]; + unsigned server_flag; struct ice_ept caller; struct ice_ept callee; @@ -190,6 +207,7 @@ static int create_sess(pj_stun_config *stun_cfg, unsigned server_flag, struct test_cfg *caller_cfg, struct test_cfg *callee_cfg, + struct sess_param *test_param, struct test_sess **p_sess) { pj_pool_t *pool; @@ -204,6 +222,7 @@ static int create_sess(pj_stun_config *stun_cfg, sess = PJ_POOL_ZALLOC_T(pool, struct test_sess); sess->pool = pool; sess->stun_cfg = stun_cfg; + sess->param = test_param; pj_memcpy(&sess->caller.cfg, caller_cfg, sizeof(*caller_cfg)); sess->caller.result.init_status = sess->caller.result.nego_status = PJ_EPENDING; @@ -261,6 +280,8 @@ static int create_sess(pj_stun_config *stun_cfg, /* Destroy test session */ static void destroy_sess(struct test_sess *sess, unsigned wait_msec) { + unsigned i; + if (sess->caller.ice) { pj_ice_strans_destroy(sess->caller.ice); sess->caller.ice = NULL; @@ -271,6 +292,12 @@ static void destroy_sess(struct test_sess *sess, unsigned wait_msec) sess->callee.ice = NULL; } + sess->param->worker_quit = PJ_TRUE; + for (i=0; i<sess->param->worker_cnt; ++i) { + if (sess->worker_threads[i]) + pj_thread_join(sess->worker_threads[i]); + } + poll_events(sess->stun_cfg, wait_msec, PJ_FALSE); if (sess->resolver) { @@ -326,6 +353,9 @@ static void ice_on_ice_complete(pj_ice_strans *ice_st, case PJ_ICE_STRANS_OP_NEGOTIATION: ept->result.nego_status = status; break; + case PJ_ICE_STRANS_OP_KEEP_ALIVE: + /* keep alive failed? */ + break; default: pj_assert(!"Unknown op"); } @@ -384,20 +414,20 @@ static int check_pair(const struct ice_ept *ept1, const struct ice_ept *ept2, c1 = pj_ice_strans_get_valid_pair(ept1->ice, i+1); if (c1 == NULL) { - PJ_LOG(3,("", INDENT "err: unable to get valid pair for ice1 " + PJ_LOG(3,(THIS_FILE, INDENT "err: unable to get valid pair for ice1 " "component %d", i+1)); return start_err - 2; } c2 = pj_ice_strans_get_valid_pair(ept2->ice, i+1); if (c2 == NULL) { - PJ_LOG(3,("", INDENT "err: unable to get valid pair for ice2 " + PJ_LOG(3,(THIS_FILE, INDENT "err: unable to get valid pair for ice2 " "component %d", i+1)); return start_err - 4; } if (pj_sockaddr_cmp(&c1->rcand->addr, &c2->lcand->addr) != 0) { - PJ_LOG(3,("", INDENT "err: candidate pair does not match " + PJ_LOG(3,(THIS_FILE, INDENT "err: candidate pair does not match " "for component %d", i+1)); return start_err - 6; } @@ -408,14 +438,14 @@ static int check_pair(const struct ice_ept *ept1, const struct ice_ept *ept2, if (ept1->cfg.comp_cnt>i && pj_ice_strans_get_valid_pair(ept1->ice, i+1) != NULL) { - PJ_LOG(3,("", INDENT "err: ice1 shouldn't have valid pair " + PJ_LOG(3,(THIS_FILE, INDENT "err: ice1 shouldn't have valid pair " "for component %d", i+1)); return start_err - 8; } if (ept2->cfg.comp_cnt>i && pj_ice_strans_get_valid_pair(ept2->ice, i+1) != NULL) { - PJ_LOG(3,("", INDENT "err: ice2 shouldn't have valid pair " + PJ_LOG(3,(THIS_FILE, INDENT "err: ice2 shouldn't have valid pair " "for component %d", i+1)); return start_err - 9; } @@ -436,26 +466,44 @@ static int check_pair(const struct ice_ept *ept1, const struct ice_ept *ept2, rc = PJ_SUCCESS; \ break; \ } \ - if (t.sec - t0.sec > (timeout)) break; \ + PJ_TIME_VAL_SUB(t, t0); \ + if ((unsigned)PJ_TIME_VAL_MSEC(t) >= (timeout)) \ + break; \ } \ } +int worker_thread_proc(void *data) +{ + pj_status_t rc; + struct test_sess *sess = (struct test_sess *) data; + pj_stun_config *stun_cfg = sess->stun_cfg; + + /* Wait until negotiation is complete on both endpoints */ +#define ALL_DONE (sess->param->worker_quit || \ + (sess->caller.result.nego_status!=PJ_EPENDING && \ + sess->callee.result.nego_status!=PJ_EPENDING)) + WAIT_UNTIL(sess->param->worker_timeout, ALL_DONE, rc); + + return 0; +} -static int perform_test(const char *title, - pj_stun_config *stun_cfg, - unsigned server_flag, - struct test_cfg *caller_cfg, - struct test_cfg *callee_cfg) +static int perform_test2(const char *title, + pj_stun_config *stun_cfg, + unsigned server_flag, + struct test_cfg *caller_cfg, + struct test_cfg *callee_cfg, + struct sess_param *test_param) { pjlib_state pjlib_state; struct test_sess *sess; + unsigned i; int rc; - PJ_LOG(3,("", INDENT "%s", title)); + PJ_LOG(3,(THIS_FILE, INDENT "%s", title)); capture_pjlib_state(stun_cfg, &pjlib_state); - rc = create_sess(stun_cfg, server_flag, caller_cfg, callee_cfg, &sess); + rc = create_sess(stun_cfg, server_flag, caller_cfg, callee_cfg, test_param, &sess); if (rc != 0) return rc; @@ -463,10 +511,10 @@ static int perform_test(const char *title, sess->callee.result.init_status!=PJ_EPENDING) /* Wait until both ICE transports are initialized */ - WAIT_UNTIL(30, ALL_READY, rc); + WAIT_UNTIL(30000, ALL_READY, rc); if (!ALL_READY) { - PJ_LOG(3,("", INDENT "err: init timed-out")); + PJ_LOG(3,(THIS_FILE, INDENT "err: init timed-out")); destroy_sess(sess, 500); return -100; } @@ -489,7 +537,6 @@ static int perform_test(const char *title, rc = 0; goto on_return; } - /* Init ICE on caller */ rc = pj_ice_strans_init_ice(sess->caller.ice, sess->caller.cfg.role, &sess->caller.ufrag, &sess->caller.pass); @@ -507,17 +554,14 @@ static int perform_test(const char *title, destroy_sess(sess, 500); return -110; } - /* Start ICE on callee */ rc = start_ice(&sess->callee, &sess->caller); if (rc != PJ_SUCCESS) { destroy_sess(sess, 500); return -120; } - /* Wait for callee's answer_delay */ poll_events(stun_cfg, sess->callee.cfg.answer_delay, PJ_FALSE); - /* Start ICE on caller */ rc = start_ice(&sess->caller, &sess->callee); if (rc != PJ_SUCCESS) { @@ -525,13 +569,37 @@ static int perform_test(const char *title, return -130; } - /* Wait until negotiation is complete on both endpoints */ -#define ALL_DONE (sess->caller.result.nego_status!=PJ_EPENDING && \ - sess->callee.result.nego_status!=PJ_EPENDING) - WAIT_UNTIL(30, ALL_DONE, rc); + for (i=0; i<sess->param->worker_cnt; ++i) { + pj_status_t status; + status = pj_thread_create(sess->pool, "worker_thread", + worker_thread_proc, sess, 0, 0, + &sess->worker_threads[i]); + if (status != PJ_SUCCESS) { + PJ_LOG(3,(THIS_FILE, INDENT "err: create thread")); + destroy_sess(sess, 500); + return -135; + } + } + + if (sess->param->destroy_after_create) + goto on_destroy; + + if (sess->param->destroy_after_one_done) { + while (sess->caller.result.init_status==PJ_EPENDING && + sess->callee.result.init_status==PJ_EPENDING) + { + if (sess->param->worker_cnt) + pj_thread_sleep(0); + else + poll_events(stun_cfg, 0, PJ_FALSE); + } + goto on_destroy; + } + + WAIT_UNTIL(30000, ALL_DONE, rc); if (!ALL_DONE) { - PJ_LOG(3,("", INDENT "err: negotiation timed-out")); + PJ_LOG(3,(THIS_FILE, INDENT "err: negotiation timed-out")); destroy_sess(sess, 500); return -140; } @@ -561,6 +629,7 @@ static int perform_test(const char *title, } /* Looks like everything is okay */ +on_destroy: /* Destroy ICE stream transports first to let it de-allocate * TURN relay (otherwise there'll be timer/memory leak, unless @@ -578,7 +647,7 @@ static int perform_test(const char *title, on_return: /* Wait.. */ - poll_events(stun_cfg, 500, PJ_FALSE); + poll_events(stun_cfg, 200, PJ_FALSE); /* Now destroy everything */ destroy_sess(sess, 500); @@ -591,7 +660,20 @@ on_return: return rc; } - return 0; + return rc; +} + +static int perform_test(const char *title, + pj_stun_config *stun_cfg, + unsigned server_flag, + struct test_cfg *caller_cfg, + struct test_cfg *callee_cfg) +{ + struct sess_param test_param; + + pj_bzero(&test_param, sizeof(test_param)); + return perform_test2(title, stun_cfg, server_flag, caller_cfg, + callee_cfg, &test_param); } #define ROLE1 PJ_ICE_SESS_ROLE_CONTROLLED @@ -680,7 +762,7 @@ int ice_test(void) if (rc != 0) goto on_return; } - + /* Simple test first with srflx candidate */ if (1) { struct sess_cfg_t cfg = @@ -744,7 +826,7 @@ int ice_test(void) {ROLE2, 2, NO, YES, NO, 0, 0, 0, 0, {PJNATH_ESTUNTIMEDOUT, -1}} }; - rc = perform_test(cfg.title, &stun_cfg, cfg.server_flag, + rc = perform_test(cfg.title, &stun_cfg, cfg.server_flag, &cfg.ua1, &cfg.ua2); if (rc != 0) goto on_return; @@ -785,6 +867,7 @@ int ice_test(void) goto on_return; } + /* STUN failure, testing TURN deallocation */ if (1) { struct sess_cfg_t cfg = @@ -792,11 +875,11 @@ int ice_test(void) "STUN failure, testing TURN deallocation", 0xFFFF & (~(CREATE_STUN_SERVER)), /* Role comp# host? stun? turn? flag? ans_del snd_del des_del */ - {ROLE1, 2, YES, YES, YES, 0, 0, 0, 0, {PJNATH_ESTUNTIMEDOUT, -1}}, - {ROLE2, 2, YES, YES, YES, 0, 0, 0, 0, {PJNATH_ESTUNTIMEDOUT, -1}} + {ROLE1, 1, YES, YES, YES, 0, 0, 0, 0, {PJNATH_ESTUNTIMEDOUT, -1}}, + {ROLE2, 1, YES, YES, YES, 0, 0, 0, 0, {PJNATH_ESTUNTIMEDOUT, -1}} }; - rc = perform_test(cfg.title, &stun_cfg, cfg.server_flag, + rc = perform_test(cfg.title, &stun_cfg, cfg.server_flag, &cfg.ua1, &cfg.ua2); if (rc != 0) goto on_return; @@ -818,7 +901,7 @@ int ice_test(void) unsigned delay[] = { 50, 2000 }; unsigned d; - PJ_LOG(3,("", " %s", cfg->title)); + PJ_LOG(3,(THIS_FILE, " %s", cfg->title)); /* For each test item, test with various answer delay */ for (d=0; d<PJ_ARRAY_SIZE(delay); ++d) { @@ -876,3 +959,89 @@ on_return: return rc; } +int ice_one_conc_test(pj_stun_config *stun_cfg, int err_quit) +{ + struct sess_cfg_t { + const char *title; + unsigned server_flag; + struct test_cfg ua1; + struct test_cfg ua2; + } cfg = + { + "Concurrency test", + 0xFFFF, + /* Role comp# host? stun? turn? flag? ans_del snd_del des_del */ + {ROLE1, 1, YES, YES, YES, 0, 0, 0, 0, {PJ_SUCCESS, PJ_SUCCESS}}, + {ROLE2, 1, YES, YES, YES, 0, 0, 0, 0, {PJ_SUCCESS, PJ_SUCCESS}} + }; + struct sess_param test_param; + int rc; + + + /* test a: destroy as soon as nego starts */ + cfg.title = " ice test a: immediate destroy"; + pj_bzero(&test_param, sizeof(test_param)); + test_param.worker_cnt = 4; + test_param.worker_timeout = 1000; + test_param.destroy_after_create = PJ_TRUE; + + rc = perform_test2(cfg.title, stun_cfg, cfg.server_flag, + &cfg.ua1, &cfg.ua2, &test_param); + if (rc != 0 && err_quit) + return rc; + + /* test b: destroy as soon as one is done */ + cfg.title = " ice test b: destroy after 1 success"; + test_param.destroy_after_create = PJ_FALSE; + test_param.destroy_after_one_done = PJ_TRUE; + + rc = perform_test2(cfg.title, stun_cfg, cfg.server_flag, + &cfg.ua1, &cfg.ua2, &test_param); + if (rc != 0 && err_quit) + return rc; + + /* test c: normal */ + cfg.title = " ice test c: normal flow"; + pj_bzero(&test_param, sizeof(test_param)); + test_param.worker_cnt = 4; + test_param.worker_timeout = 1000; + + rc = perform_test2(cfg.title, stun_cfg, cfg.server_flag, + &cfg.ua1, &cfg.ua2, &test_param); + if (rc != 0 && err_quit) + return rc; + + return 0; +} + +int ice_conc_test(void) +{ + const unsigned LOOP = 100; + pj_pool_t *pool; + pj_stun_config stun_cfg; + unsigned i; + int rc; + + pool = pj_pool_create(mem, NULL, 512, 512, NULL); + rc = create_stun_config(pool, &stun_cfg); + if (rc != PJ_SUCCESS) { + pj_pool_release(pool); + return -7; + } + + for (i = 0; i < LOOP; i++) { + PJ_LOG(3,(THIS_FILE, INDENT "Test %d of %d", i+1, LOOP)); + rc = ice_one_conc_test(&stun_cfg, PJ_TRUE); + if (rc) + break; + } + + /* Avoid compiler warning */ + goto on_return; + +on_return: + destroy_stun_config(&stun_cfg); + pj_pool_release(pool); + + return rc; +} diff --git a/pjnath/src/pjnath-test/sess_auth.c b/pjnath/src/pjnath-test/sess_auth.c index 05a6209..2364260 100644 --- a/pjnath/src/pjnath-test/sess_auth.c +++ b/pjnath/src/pjnath-test/sess_auth.c @@ -1,4 +1,4 @@ -/* $Id: sess_auth.c 3553 2011-05-05 06:14:19Z nanang $ */ +/* $Id: sess_auth.c 4360 2013-02-21 11:26:35Z bennylp $ */ /* * Copyright (C) 2008-2011 Teluu Inc. (http://www.teluu.com) * Copyright (C) 2003-2008 Benny Prijono <benny@prijono.org> @@ -247,7 +247,7 @@ static int create_std_server(pj_stun_auth_type auth_type, pj_bzero(&sess_cb, sizeof(sess_cb)); sess_cb.on_rx_request = &server_on_rx_request; sess_cb.on_send_msg = &server_send_msg; - status = pj_stun_session_create(&stun_cfg, "server", &sess_cb, PJ_FALSE, &server->sess); + status = pj_stun_session_create(&stun_cfg, "server", &sess_cb, PJ_FALSE, NULL, &server->sess); if (status != PJ_SUCCESS) { destroy_server(); return -10; @@ -479,7 +479,7 @@ static int run_client_test(const char *title, pj_bzero(&sess_cb, sizeof(sess_cb)); sess_cb.on_request_complete = &client_on_request_complete; sess_cb.on_send_msg = &client_send_msg; - status = pj_stun_session_create(&stun_cfg, "client", &sess_cb, PJ_FALSE, &client->sess); + status = pj_stun_session_create(&stun_cfg, "client", &sess_cb, PJ_FALSE, NULL, &client->sess); if (status != PJ_SUCCESS) { destroy_client_server(); return -200; diff --git a/pjnath/src/pjnath-test/stun_sock_test.c b/pjnath/src/pjnath-test/stun_sock_test.c index 7a309ea..3c3cecc 100644 --- a/pjnath/src/pjnath-test/stun_sock_test.c +++ b/pjnath/src/pjnath-test/stun_sock_test.c @@ -1,4 +1,4 @@ -/* $Id: stun_sock_test.c 3553 2011-05-05 06:14:19Z nanang $ */ +/* $Id: stun_sock_test.c 4360 2013-02-21 11:26:35Z bennylp $ */ /* * Copyright (C) 2008-2011 Teluu Inc. (http://www.teluu.com) * Copyright (C) 2003-2008 Benny Prijono <benny@prijono.org> @@ -298,7 +298,7 @@ static int timeout_test(pj_stun_config *cfg, pj_bool_t destroy_on_err) struct stun_client *client; pj_str_t srv_addr; pj_time_val timeout, t; - int ret = 0; + int i, ret = 0; pj_status_t status; PJ_LOG(3,(THIS_FILE, " timeout test [%d]", destroy_on_err)); @@ -359,6 +359,8 @@ static int timeout_test(pj_stun_config *cfg, pj_bool_t destroy_on_err) on_return: destroy_server(srv); destroy_client(client); + for (i=0; i<7; ++i) + handle_events(cfg, 50); return ret; } @@ -373,7 +375,7 @@ static int missing_attr_test(pj_stun_config *cfg, pj_bool_t destroy_on_err) struct stun_client *client; pj_str_t srv_addr; pj_time_val timeout, t; - int ret = 0; + int i, ret = 0; pj_status_t status; PJ_LOG(3,(THIS_FILE, " missing attribute test [%d]", destroy_on_err)); @@ -426,6 +428,8 @@ static int missing_attr_test(pj_stun_config *cfg, pj_bool_t destroy_on_err) on_return: destroy_server(srv); destroy_client(client); + for (i=0; i<7; ++i) + handle_events(cfg, 50); return ret; } @@ -440,7 +444,7 @@ static int keep_alive_test(pj_stun_config *cfg) pj_stun_sock_info info; pj_str_t srv_addr; pj_time_val timeout, t; - int ret = 0; + int i, ret = 0; pj_status_t status; PJ_LOG(3,(THIS_FILE, " normal operation")); @@ -791,6 +795,8 @@ static int keep_alive_test(pj_stun_config *cfg) on_return: destroy_server(srv); destroy_client(client); + for (i=0; i<7; ++i) + handle_events(cfg, 50); return ret; } diff --git a/pjnath/src/pjnath-test/test.c b/pjnath/src/pjnath-test/test.c index 081df25..5277585 100644 --- a/pjnath/src/pjnath-test/test.c +++ b/pjnath/src/pjnath-test/test.c @@ -1,4 +1,4 @@ -/* $Id: test.c 3553 2011-05-05 06:14:19Z nanang $ */ +/* $Id: test.c 4360 2013-02-21 11:26:35Z bennylp $ */ /* * Copyright (C) 2008-2011 Teluu Inc. (http://www.teluu.com) * Copyright (C) 2003-2008 Benny Prijono <benny@prijono.org> @@ -34,6 +34,7 @@ pj_status_t create_stun_config(pj_pool_t *pool, pj_stun_config *stun_cfg) { pj_ioqueue_t *ioqueue; pj_timer_heap_t *timer_heap; + pj_lock_t *lock; pj_status_t status; status = pj_ioqueue_create(pool, 64, &ioqueue); @@ -49,6 +50,9 @@ pj_status_t create_stun_config(pj_pool_t *pool, pj_stun_config *stun_cfg) return status; } + pj_lock_create_recursive_mutex(pool, NULL, &lock); + pj_timer_heap_set_lock(timer_heap, lock, PJ_TRUE); + pj_stun_config_init(stun_cfg, mem, 0, ioqueue, timer_heap); return PJ_SUCCESS; @@ -105,7 +109,7 @@ void capture_pjlib_state(pj_stun_config *cfg, struct pjlib_state *st) st->timer_cnt = pj_timer_heap_count(cfg->timer_heap); - cp = (pj_caching_pool*)mem; + cp = (pj_caching_pool*)cfg->pf; st->pool_used_cnt = cp->used_count; } @@ -120,6 +124,10 @@ int check_pjlib_state(pj_stun_config *cfg, if (current_state.timer_cnt > initial_st->timer_cnt) { PJ_LOG(3,("", " error: possibly leaking timer")); rc |= ERR_TIMER_LEAK; + +#if PJ_TIMER_DEBUG + pj_timer_heap_dump(cfg->timer_heap); +#endif } if (current_state.pool_used_cnt > initial_st->pool_used_cnt) { @@ -148,6 +156,18 @@ pj_pool_factory *mem; int param_log_decor = PJ_LOG_HAS_NEWLINE | PJ_LOG_HAS_TIME | PJ_LOG_HAS_MICRO_SEC; +pj_log_func *orig_log_func; +FILE *log_file; + +static void test_log_func(int level, const char *data, int len) +{ + if (log_file) { + fwrite(data, len, 1, log_file); + } + if (level <= 3) + orig_log_func(level, data, len); +} + static int test_inner(void) { pj_caching_pool caching_pool; @@ -158,6 +178,11 @@ static int test_inner(void) #if 1 pj_log_set_level(3); pj_log_set_decor(param_log_decor); +#elif 1 + log_file = fopen("pjnath-test.log", "wt"); + pj_log_set_level(5); + orig_log_func = pj_log_get_log_func(); + pj_log_set_log_func(&test_log_func); #endif rc = pj_init(); @@ -189,7 +214,13 @@ static int test_inner(void) DO_TEST(turn_sock_test()); #endif +#if INCLUDE_CONCUR_TEST + DO_TEST(concur_test()); +#endif + on_return: + if (log_file) + fclose(log_file); return rc; } diff --git a/pjnath/src/pjnath-test/test.h b/pjnath/src/pjnath-test/test.h index bbba992..504f2f7 100644 --- a/pjnath/src/pjnath-test/test.h +++ b/pjnath/src/pjnath-test/test.h @@ -1,4 +1,4 @@ -/* $Id: test.h 3553 2011-05-05 06:14:19Z nanang $ */ +/* $Id: test.h 4360 2013-02-21 11:26:35Z bennylp $ */ /* * Copyright (C) 2008-2011 Teluu Inc. (http://www.teluu.com) * Copyright (C) 2003-2008 Benny Prijono <benny@prijono.org> @@ -25,17 +25,21 @@ #define INCLUDE_ICE_TEST 1 #define INCLUDE_STUN_SOCK_TEST 1 #define INCLUDE_TURN_SOCK_TEST 1 +#define INCLUDE_CONCUR_TEST 1 int stun_test(void); int sess_auth_test(void); int stun_sock_test(void); int turn_sock_test(void); int ice_test(void); +int concur_test(void); int test_main(void); extern void app_perror(const char *title, pj_status_t rc); extern pj_pool_factory *mem; +int ice_one_conc_test(pj_stun_config *stun_cfg, int err_quit); + //////////////////////////////////// /* * Utilities diff --git a/pjnath/src/pjnath/ice_session.c b/pjnath/src/pjnath/ice_session.c index 05f39bc..30a40f6 100644 --- a/pjnath/src/pjnath/ice_session.c +++ b/pjnath/src/pjnath/ice_session.c @@ -1,4 +1,4 @@ -/* $Id: ice_session.c 3999 2012-03-30 07:10:13Z bennylp $ */ +/* $Id: ice_session.c 4365 2013-02-21 18:06:51Z bennylp $ */ /* * Copyright (C) 2008-2011 Teluu Inc. (http://www.teluu.com) * Copyright (C) 2003-2008 Benny Prijono <benny@prijono.org> @@ -97,6 +97,7 @@ static pj_uint8_t cand_type_prefs[4] = #endif }; +#define THIS_FILE "ice_session.c" #define CHECK_NAME_LEN 128 #define LOG4(expr) PJ_LOG(4,expr) #define LOG5(expr) PJ_LOG(4,expr) @@ -134,6 +135,7 @@ typedef struct timer_data static void on_timer(pj_timer_heap_t *th, pj_timer_entry *te); static void on_ice_complete(pj_ice_sess *ice, pj_status_t status); static void ice_keep_alive(pj_ice_sess *ice, pj_bool_t send_now); +static void ice_on_destroy(void *obj); static void destroy_ice(pj_ice_sess *ice, pj_status_t reason); static pj_status_t start_periodic_check(pj_timer_heap_t *th, @@ -288,6 +290,7 @@ static pj_status_t init_comp(pj_ice_sess *ice, /* Create STUN session for this candidate */ status = pj_stun_session_create(&ice->stun_cfg, NULL, &sess_cb, PJ_TRUE, + ice->grp_lock, &comp->stun_sess); if (status != PJ_SUCCESS) return status; @@ -332,6 +335,7 @@ PJ_DEF(pj_status_t) pj_ice_sess_create(pj_stun_config *stun_cfg, const pj_ice_sess_cb *cb, const pj_str_t *local_ufrag, const pj_str_t *local_passwd, + pj_grp_lock_t *grp_lock, pj_ice_sess **p_ice) { pj_pool_t *pool; @@ -359,13 +363,20 @@ PJ_DEF(pj_status_t) pj_ice_sess_create(pj_stun_config *stun_cfg, pj_ansi_snprintf(ice->obj_name, sizeof(ice->obj_name), name, ice); - status = pj_mutex_create_recursive(pool, ice->obj_name, - &ice->mutex); - if (status != PJ_SUCCESS) { - destroy_ice(ice, status); - return status; + if (grp_lock) { + ice->grp_lock = grp_lock; + } else { + status = pj_grp_lock_create(pool, NULL, &ice->grp_lock); + if (status != PJ_SUCCESS) { + pj_pool_release(pool); + return status; + } } + pj_grp_lock_add_ref(ice->grp_lock); + pj_grp_lock_add_handler(ice->grp_lock, pool, ice, + &ice_on_destroy); + pj_memcpy(&ice->cb, cb, sizeof(*cb)); pj_memcpy(&ice->stun_cfg, stun_cfg, sizeof(*stun_cfg)); @@ -444,6 +455,21 @@ PJ_DEF(pj_status_t) pj_ice_sess_set_options(pj_ice_sess *ice, /* + * Callback to really destroy the session + */ +static void ice_on_destroy(void *obj) +{ + pj_ice_sess *ice = (pj_ice_sess*) obj; + + if (ice->pool) { + pj_pool_t *pool = ice->pool; + ice->pool = NULL; + pj_pool_release(pool); + } + LOG4((THIS_FILE, "ICE session %p destroyed", ice)); +} + +/* * Destroy */ static void destroy_ice(pj_ice_sess *ice, @@ -452,21 +478,21 @@ static void destroy_ice(pj_ice_sess *ice, unsigned i; if (reason == PJ_SUCCESS) { - LOG4((ice->obj_name, "Destroying ICE session")); + LOG4((ice->obj_name, "Destroying ICE session %p", ice)); } - /* Let other callbacks finish */ - if (ice->mutex) { - pj_mutex_lock(ice->mutex); - pj_mutex_unlock(ice->mutex); - } + pj_grp_lock_acquire(ice->grp_lock); - if (ice->timer.id) { - pj_timer_heap_cancel(ice->stun_cfg.timer_heap, - &ice->timer); - ice->timer.id = PJ_FALSE; + if (ice->is_destroying) { + pj_grp_lock_release(ice->grp_lock); + return; } + ice->is_destroying = PJ_TRUE; + + pj_timer_heap_cancel_if_active(ice->stun_cfg.timer_heap, + &ice->timer, PJ_FALSE); + for (i=0; i<ice->comp_cnt; ++i) { if (ice->comp[i].stun_sess) { pj_stun_session_destroy(ice->comp[i].stun_sess); @@ -474,21 +500,12 @@ static void destroy_ice(pj_ice_sess *ice, } } - if (ice->clist.timer.id) { - pj_timer_heap_cancel(ice->stun_cfg.timer_heap, &ice->clist.timer); - ice->clist.timer.id = PJ_FALSE; - } + pj_timer_heap_cancel_if_active(ice->stun_cfg.timer_heap, + &ice->clist.timer, + PJ_FALSE); - if (ice->mutex) { - pj_mutex_destroy(ice->mutex); - ice->mutex = NULL; - } - - if (ice->pool) { - pj_pool_t *pool = ice->pool; - ice->pool = NULL; - pj_pool_release(pool); - } + pj_grp_lock_dec_ref(ice->grp_lock); + pj_grp_lock_release(ice->grp_lock); } @@ -701,13 +718,14 @@ PJ_DEF(pj_status_t) pj_ice_sess_add_cand(pj_ice_sess *ice, { pj_ice_sess_cand *lcand; pj_status_t status = PJ_SUCCESS; + char address[PJ_INET6_ADDRSTRLEN]; PJ_ASSERT_RETURN(ice && comp_id && foundation && addr && base_addr && addr_len, PJ_EINVAL); PJ_ASSERT_RETURN(comp_id <= ice->comp_cnt, PJ_EINVAL); - pj_mutex_lock(ice->mutex); + pj_grp_lock_acquire(ice->grp_lock); if (ice->lcand_cnt >= PJ_ARRAY_SIZE(ice->lcand)) { status = PJ_ETOOMANY; @@ -720,13 +738,14 @@ PJ_DEF(pj_status_t) pj_ice_sess_add_cand(pj_ice_sess *ice, lcand->type = type; pj_strdup(ice->pool, &lcand->foundation, foundation); lcand->prio = CALC_CAND_PRIO(ice, type, local_pref, lcand->comp_id); - pj_memcpy(&lcand->addr, addr, addr_len); - pj_memcpy(&lcand->base_addr, base_addr, addr_len); + pj_sockaddr_cp(&lcand->addr, addr); + pj_sockaddr_cp(&lcand->base_addr, base_addr); if (rel_addr == NULL) rel_addr = base_addr; pj_memcpy(&lcand->rel_addr, rel_addr, addr_len); - pj_ansi_strcpy(ice->tmp.txt, pj_inet_ntoa(lcand->addr.ipv4.sin_addr)); + pj_ansi_strcpy(ice->tmp.txt, pj_sockaddr_print(&lcand->addr, address, + sizeof(address), 0)); LOG4((ice->obj_name, "Candidate %d added: comp_id=%d, type=%s, foundation=%.*s, " "addr=%s:%d, base=%s:%d, prio=0x%x (%u)", @@ -736,9 +755,9 @@ PJ_DEF(pj_status_t) pj_ice_sess_add_cand(pj_ice_sess *ice, (int)lcand->foundation.slen, lcand->foundation.ptr, ice->tmp.txt, - (int)pj_ntohs(lcand->addr.ipv4.sin_port), - pj_inet_ntoa(lcand->base_addr.ipv4.sin_addr), - (int)pj_htons(lcand->base_addr.ipv4.sin_port), + pj_sockaddr_get_port(&lcand->addr), + pj_sockaddr_print(&lcand->base_addr, address, sizeof(address), 0), + pj_sockaddr_get_port(&lcand->base_addr), lcand->prio, lcand->prio)); if (p_cand_id) @@ -747,7 +766,7 @@ PJ_DEF(pj_status_t) pj_ice_sess_add_cand(pj_ice_sess *ice, ++ice->lcand_cnt; on_error: - pj_mutex_unlock(ice->mutex); + pj_grp_lock_release(ice->grp_lock); return status; } @@ -764,7 +783,7 @@ PJ_DEF(pj_status_t) pj_ice_sess_find_default_cand(pj_ice_sess *ice, *cand_id = -1; - pj_mutex_lock(ice->mutex); + pj_grp_lock_acquire(ice->grp_lock); /* First find in valid list if we have nominated pair */ for (i=0; i<ice->valid_list.count; ++i) { @@ -772,7 +791,7 @@ PJ_DEF(pj_status_t) pj_ice_sess_find_default_cand(pj_ice_sess *ice, if (check->lcand->comp_id == comp_id) { *cand_id = GET_LCAND_ID(check->lcand); - pj_mutex_unlock(ice->mutex); + pj_grp_lock_release(ice->grp_lock); return PJ_SUCCESS; } } @@ -784,7 +803,7 @@ PJ_DEF(pj_status_t) pj_ice_sess_find_default_cand(pj_ice_sess *ice, lcand->type == PJ_ICE_CAND_TYPE_RELAYED) { *cand_id = GET_LCAND_ID(lcand); - pj_mutex_unlock(ice->mutex); + pj_grp_lock_release(ice->grp_lock); return PJ_SUCCESS; } } @@ -797,7 +816,7 @@ PJ_DEF(pj_status_t) pj_ice_sess_find_default_cand(pj_ice_sess *ice, lcand->type == PJ_ICE_CAND_TYPE_PRFLX)) { *cand_id = GET_LCAND_ID(lcand); - pj_mutex_unlock(ice->mutex); + pj_grp_lock_release(ice->grp_lock); return PJ_SUCCESS; } } @@ -809,13 +828,13 @@ PJ_DEF(pj_status_t) pj_ice_sess_find_default_cand(pj_ice_sess *ice, lcand->type == PJ_ICE_CAND_TYPE_HOST) { *cand_id = GET_LCAND_ID(lcand); - pj_mutex_unlock(ice->mutex); + pj_grp_lock_release(ice->grp_lock); return PJ_SUCCESS; } } /* Still no candidate is found! :( */ - pj_mutex_unlock(ice->mutex); + pj_grp_lock_release(ice->grp_lock); pj_assert(!"Should have a candidate by now"); return PJ_EBUG; @@ -875,25 +894,24 @@ static const char *dump_check(char *buffer, unsigned bufsize, { const pj_ice_sess_cand *lcand = check->lcand; const pj_ice_sess_cand *rcand = check->rcand; - char laddr[PJ_INET6_ADDRSTRLEN]; + char laddr[PJ_INET6_ADDRSTRLEN], raddr[PJ_INET6_ADDRSTRLEN]; int len; PJ_CHECK_STACK(); - pj_ansi_strcpy(laddr, pj_inet_ntoa(lcand->addr.ipv4.sin_addr)); - - if (lcand->addr.addr.sa_family == pj_AF_INET()) { - len = pj_ansi_snprintf(buffer, bufsize, - "%d: [%d] %s:%d-->%s:%d", - (int)GET_CHECK_ID(clist, check), - check->lcand->comp_id, - laddr, (int)pj_ntohs(lcand->addr.ipv4.sin_port), - pj_inet_ntoa(rcand->addr.ipv4.sin_addr), - (int)pj_ntohs(rcand->addr.ipv4.sin_port)); - } else { - len = pj_ansi_snprintf(buffer, bufsize, "IPv6->IPv6"); - } + pj_ansi_strcpy(laddr, pj_sockaddr_print(&lcand->addr, laddr, + sizeof(laddr), 0)); + len = pj_ansi_snprintf(buffer, bufsize, + "%d: [%d] %s:%d-->%s:%d", + (int)GET_CHECK_ID(clist, check), + check->lcand->comp_id, + pj_sockaddr_print(&lcand->addr, laddr, + sizeof(laddr), 0), + pj_sockaddr_get_port(&lcand->addr), + pj_sockaddr_print(&rcand->addr, raddr, + sizeof(raddr), 0), + pj_sockaddr_get_port(&rcand->addr)); if (len < 0) len = 0; @@ -964,6 +982,7 @@ static void sort_checklist(pj_ice_sess *ice, pj_ice_sess_checklist *clist) } } + pj_assert(clist->count > 0); for (i=0; i<clist->count-1; ++i) { unsigned j, highest = i; @@ -996,32 +1015,6 @@ static void sort_checklist(pj_ice_sess *ice, pj_ice_sess_checklist *clist) } } -enum -{ - SOCKADDR_EQUAL = 0, - SOCKADDR_NOT_EQUAL = 1 -}; - -/* Utility: compare sockaddr. - * Returns 0 if equal. - */ -static int sockaddr_cmp(const pj_sockaddr *a1, const pj_sockaddr *a2) -{ - if (a1->addr.sa_family != a2->addr.sa_family) - return SOCKADDR_NOT_EQUAL; - - if (a1->addr.sa_family == pj_AF_INET()) { - return !(a1->ipv4.sin_addr.s_addr == a2->ipv4.sin_addr.s_addr && - a1->ipv4.sin_port == a2->ipv4.sin_port); - } else if (a1->addr.sa_family == pj_AF_INET6()) { - return pj_memcmp(&a1->ipv6, &a2->ipv6, sizeof(a1->ipv6)); - } else { - pj_assert(!"Invalid address family!"); - return SOCKADDR_NOT_EQUAL; - } -} - - /* Prune checklist, this must have been done after the checklist * is sorted. */ @@ -1053,7 +1046,7 @@ static pj_status_t prune_checklist(pj_ice_sess *ice, if (host->type != PJ_ICE_CAND_TYPE_HOST) continue; - if (sockaddr_cmp(&srflx->base_addr, &host->addr) == 0) { + if (pj_sockaddr_cmp(&srflx->base_addr, &host->addr) == 0) { /* Replace this SRFLX with its BASE */ clist->checks[i].lcand = host; break; @@ -1061,11 +1054,13 @@ static pj_status_t prune_checklist(pj_ice_sess *ice, } if (j==ice->lcand_cnt) { + char baddr[PJ_INET6_ADDRSTRLEN]; /* Host candidate not found this this srflx! */ LOG4((ice->obj_name, "Base candidate %s:%d not found for srflx candidate %d", - pj_inet_ntoa(srflx->base_addr.ipv4.sin_addr), - pj_ntohs(srflx->base_addr.ipv4.sin_port), + pj_sockaddr_print(&srflx->base_addr, baddr, + sizeof(baddr), 0), + pj_sockaddr_get_port(&srflx->base_addr), GET_LCAND_ID(clist->checks[i].lcand))); return PJNATH_EICENOHOSTCAND; } @@ -1093,7 +1088,7 @@ static pj_status_t prune_checklist(pj_ice_sess *ice, if ((licand == ljcand) && (ricand == rjcand)) { reason = "duplicate found"; } else if ((rjcand == ricand) && - (sockaddr_cmp(&ljcand->base_addr, + (pj_sockaddr_cmp(&ljcand->base_addr, &licand->base_addr)==0)) { reason = "equal base"; @@ -1124,14 +1119,20 @@ static void on_timer(pj_timer_heap_t *th, pj_timer_entry *te) { pj_ice_sess *ice = (pj_ice_sess*) te->user_data; enum timer_type type = (enum timer_type)te->id; - pj_bool_t has_mutex = PJ_TRUE; PJ_UNUSED_ARG(th); - pj_mutex_lock(ice->mutex); + pj_grp_lock_acquire(ice->grp_lock); te->id = TIMER_NONE; + if (ice->is_destroying) { + /* Stray timer, could happen when destroy is invoked while callback + * is pending. */ + pj_grp_lock_release(ice->grp_lock); + return; + } + switch (type) { case TIMER_CONTROLLED_WAIT_NOM: LOG4((ice->obj_name, @@ -1154,8 +1155,6 @@ static void on_timer(pj_timer_heap_t *th, pj_timer_entry *te) /* Release mutex in case app destroy us in the callback */ ice_status = ice->ice_status; on_ice_complete = ice->cb.on_ice_complete; - has_mutex = PJ_FALSE; - pj_mutex_unlock(ice->mutex); /* Notify app about ICE completion*/ if (on_ice_complete) @@ -1173,8 +1172,7 @@ static void on_timer(pj_timer_heap_t *th, pj_timer_entry *te) break; } - if (has_mutex) - pj_mutex_unlock(ice->mutex); + pj_grp_lock_release(ice->grp_lock); } /* Send keep-alive */ @@ -1232,8 +1230,10 @@ done: ice->comp_cnt; pj_time_val_normalize(&delay); - ice->timer.id = TIMER_KEEP_ALIVE; - pj_timer_heap_schedule(ice->stun_cfg.timer_heap, &ice->timer, &delay); + pj_timer_heap_schedule_w_grp_lock(ice->stun_cfg.timer_heap, + &ice->timer, &delay, + TIMER_KEEP_ALIVE, + ice->grp_lock); } else { pj_assert(!"Not expected any timer active"); @@ -1247,10 +1247,8 @@ static void on_ice_complete(pj_ice_sess *ice, pj_status_t status) ice->is_complete = PJ_TRUE; ice->ice_status = status; - if (ice->timer.id != TIMER_NONE) { - pj_timer_heap_cancel(ice->stun_cfg.timer_heap, &ice->timer); - ice->timer.id = TIMER_NONE; - } + pj_timer_heap_cancel_if_active(ice->stun_cfg.timer_heap, &ice->timer, + TIMER_NONE); /* Log message */ LOG4((ice->obj_name, "ICE process complete, status=%s", @@ -1263,9 +1261,10 @@ static void on_ice_complete(pj_ice_sess *ice, pj_status_t status) if (ice->cb.on_ice_complete) { pj_time_val delay = {0, 0}; - ice->timer.id = TIMER_COMPLETION_CALLBACK; - pj_timer_heap_schedule(ice->stun_cfg.timer_heap, - &ice->timer, &delay); + pj_timer_heap_schedule_w_grp_lock(ice->stun_cfg.timer_heap, + &ice->timer, &delay, + TIMER_COMPLETION_CALLBACK, + ice->grp_lock); } } } @@ -1493,10 +1492,11 @@ static pj_bool_t on_check_complete(pj_ice_sess *ice, delay.msec = ice->opt.controlled_agent_want_nom_timeout; pj_time_val_normalize(&delay); - ice->timer.id = TIMER_CONTROLLED_WAIT_NOM; - pj_timer_heap_schedule(ice->stun_cfg.timer_heap, - &ice->timer, - &delay); + pj_timer_heap_schedule_w_grp_lock( + ice->stun_cfg.timer_heap, + &ice->timer, &delay, + TIMER_CONTROLLED_WAIT_NOM, + ice->grp_lock); LOG5((ice->obj_name, "All checks have completed. Controlled agent now " @@ -1572,10 +1572,8 @@ static pj_bool_t on_check_complete(pj_ice_sess *ice, "Scheduling nominated check in %d ms", ice->opt.nominated_check_delay)); - if (ice->timer.id != TIMER_NONE) { - pj_timer_heap_cancel(ice->stun_cfg.timer_heap, &ice->timer); - ice->timer.id = TIMER_NONE; - } + pj_timer_heap_cancel_if_active(ice->stun_cfg.timer_heap, &ice->timer, + TIMER_NONE); /* All components have valid pair. Let connectivity checks run for * a little bit more time, then start our nominated check. @@ -1584,8 +1582,10 @@ static pj_bool_t on_check_complete(pj_ice_sess *ice, delay.msec = ice->opt.nominated_check_delay; pj_time_val_normalize(&delay); - ice->timer.id = TIMER_START_NOMINATED_CHECK; - pj_timer_heap_schedule(ice->stun_cfg.timer_heap, &ice->timer, &delay); + pj_timer_heap_schedule_w_grp_lock(ice->stun_cfg.timer_heap, + &ice->timer, &delay, + TIMER_START_NOMINATED_CHECK, + ice->grp_lock); return PJ_FALSE; } @@ -1615,7 +1615,7 @@ PJ_DEF(pj_status_t) pj_ice_sess_create_check_list( PJ_ASSERT_RETURN(rcand_cnt + ice->rcand_cnt <= PJ_ICE_MAX_CAND, PJ_ETOOMANY); - pj_mutex_lock(ice->mutex); + pj_grp_lock_acquire(ice->grp_lock); /* Save credentials */ username.ptr = buf; @@ -1663,7 +1663,7 @@ PJ_DEF(pj_status_t) pj_ice_sess_create_check_list( pj_ice_sess_check *chk = &clist->checks[clist->count]; if (clist->count >= PJ_ICE_MAX_CHECKS) { - pj_mutex_unlock(ice->mutex); + pj_grp_lock_release(ice->grp_lock); return PJ_ETOOMANY; } @@ -1688,13 +1688,20 @@ PJ_DEF(pj_status_t) pj_ice_sess_create_check_list( } } + /* This could happen if candidates have no matching address families */ + if (clist->count == 0) { + LOG4((ice->obj_name, "Error: no checklist can be created")); + pj_grp_lock_release(ice->grp_lock); + return PJ_ENOTFOUND; + } + /* Sort checklist based on priority */ sort_checklist(ice, clist); /* Prune the checklist */ status = prune_checklist(ice, clist); if (status != PJ_SUCCESS) { - pj_mutex_unlock(ice->mutex); + pj_grp_lock_release(ice->grp_lock); return status; } @@ -1721,7 +1728,7 @@ PJ_DEF(pj_status_t) pj_ice_sess_create_check_list( /* Log checklist */ dump_checklist("Checklist created:", ice, clist); - pj_mutex_unlock(ice->mutex); + pj_grp_lock_release(ice->grp_lock); return PJ_SUCCESS; } @@ -1809,7 +1816,8 @@ static pj_status_t perform_check(pj_ice_sess *ice, /* Initiate STUN transaction to send the request */ status = pj_stun_session_send_msg(comp->stun_sess, msg_data, PJ_FALSE, PJ_TRUE, &rcand->addr, - sizeof(pj_sockaddr_in), check->tdata); + pj_sockaddr_get_len(&rcand->addr), + check->tdata); if (status != PJ_SUCCESS) { check->tdata = NULL; pjnath_perror(ice->obj_name, "Error sending STUN request", status); @@ -1840,7 +1848,12 @@ static pj_status_t start_periodic_check(pj_timer_heap_t *th, ice = td->ice; clist = td->clist; - pj_mutex_lock(ice->mutex); + pj_grp_lock_acquire(ice->grp_lock); + + if (ice->is_destroying) { + pj_grp_lock_release(ice->grp_lock); + return PJ_SUCCESS; + } /* Set timer ID to FALSE first */ te->id = PJ_FALSE; @@ -1860,7 +1873,7 @@ static pj_status_t start_periodic_check(pj_timer_heap_t *th, if (check->state == PJ_ICE_SESS_CHECK_STATE_WAITING) { status = perform_check(ice, clist, i, ice->is_nominating); if (status != PJ_SUCCESS) { - pj_mutex_unlock(ice->mutex); + pj_grp_lock_release(ice->grp_lock); pj_log_pop_indent(); return status; } @@ -1880,7 +1893,7 @@ static pj_status_t start_periodic_check(pj_timer_heap_t *th, if (check->state == PJ_ICE_SESS_CHECK_STATE_FROZEN) { status = perform_check(ice, clist, i, ice->is_nominating); if (status != PJ_SUCCESS) { - pj_mutex_unlock(ice->mutex); + pj_grp_lock_release(ice->grp_lock); pj_log_pop_indent(); return status; } @@ -1897,12 +1910,12 @@ static pj_status_t start_periodic_check(pj_timer_heap_t *th, /* Schedule for next timer */ pj_time_val timeout = {0, PJ_ICE_TA_VAL}; - te->id = PJ_TRUE; pj_time_val_normalize(&timeout); - pj_timer_heap_schedule(th, te, &timeout); + pj_timer_heap_schedule_w_grp_lock(th, te, &timeout, PJ_TRUE, + ice->grp_lock); } - pj_mutex_unlock(ice->mutex); + pj_grp_lock_release(ice->grp_lock); pj_log_pop_indent(); return PJ_SUCCESS; } @@ -1922,8 +1935,8 @@ static void start_nominated_check(pj_ice_sess *ice) /* Stop our timer if it's active */ if (ice->timer.id == TIMER_START_NOMINATED_CHECK) { - pj_timer_heap_cancel(ice->stun_cfg.timer_heap, &ice->timer); - ice->timer.id = TIMER_NONE; + pj_timer_heap_cancel_if_active(ice->stun_cfg.timer_heap, &ice->timer, + TIMER_NONE); } /* For each component, set the check state of valid check with @@ -1951,18 +1964,15 @@ static void start_nominated_check(pj_ice_sess *ice) } /* And (re)start the periodic check */ - if (ice->clist.timer.id) { - pj_timer_heap_cancel(ice->stun_cfg.timer_heap, &ice->clist.timer); - ice->clist.timer.id = PJ_FALSE; - } + pj_timer_heap_cancel_if_active(ice->stun_cfg.timer_heap, + &ice->clist.timer, PJ_FALSE); - ice->clist.timer.id = PJ_TRUE; delay.sec = delay.msec = 0; - status = pj_timer_heap_schedule(ice->stun_cfg.timer_heap, - &ice->clist.timer, &delay); - if (status != PJ_SUCCESS) { - ice->clist.timer.id = PJ_FALSE; - } else { + status = pj_timer_heap_schedule_w_grp_lock(ice->stun_cfg.timer_heap, + &ice->clist.timer, &delay, + PJ_TRUE, + ice->grp_lock); + if (status == PJ_SUCCESS) { LOG5((ice->obj_name, "Periodic timer rescheduled..")); } @@ -2012,7 +2022,7 @@ PJ_DEF(pj_status_t) pj_ice_sess_start_check(pj_ice_sess *ice) PJ_ASSERT_RETURN(ice->clist.count > 0, PJ_EINVALIDOP); /* Lock session */ - pj_mutex_lock(ice->mutex); + pj_grp_lock_acquire(ice->grp_lock); LOG4((ice->obj_name, "Starting ICE check..")); pj_log_push_indent(); @@ -2042,7 +2052,7 @@ PJ_DEF(pj_status_t) pj_ice_sess_start_check(pj_ice_sess *ice) } if (i == clist->count) { pj_assert(!"Unable to find checklist for component 1"); - pj_mutex_unlock(ice->mutex); + pj_grp_lock_release(ice->grp_lock); pj_log_pop_indent(); return PJNATH_EICEINCOMPID; } @@ -2096,15 +2106,15 @@ PJ_DEF(pj_status_t) pj_ice_sess_start_check(pj_ice_sess *ice) * instead to reduce stack usage: * return start_periodic_check(ice->stun_cfg.timer_heap, &clist->timer); */ - clist->timer.id = PJ_TRUE; delay.sec = delay.msec = 0; - status = pj_timer_heap_schedule(ice->stun_cfg.timer_heap, - &clist->timer, &delay); + status = pj_timer_heap_schedule_w_grp_lock(ice->stun_cfg.timer_heap, + &clist->timer, &delay, + PJ_TRUE, ice->grp_lock); if (status != PJ_SUCCESS) { clist->timer.id = PJ_FALSE; } - pj_mutex_unlock(ice->mutex); + pj_grp_lock_release(ice->grp_lock); pj_log_pop_indent(); return status; } @@ -2125,9 +2135,22 @@ static pj_status_t on_stun_send_msg(pj_stun_session *sess, stun_data *sd = (stun_data*) pj_stun_session_get_user_data(sess); pj_ice_sess *ice = sd->ice; pj_ice_msg_data *msg_data = (pj_ice_msg_data*) token; + pj_status_t status; - return (*ice->cb.on_tx_pkt)(ice, sd->comp_id, msg_data->transport_id, - pkt, pkt_size, dst_addr, addr_len); + pj_grp_lock_acquire(ice->grp_lock); + + if (ice->is_destroying) { + /* Stray retransmit timer that could happen while + * we're being destroyed */ + pj_grp_lock_release(ice->grp_lock); + return PJ_EINVALIDOP; + } + + status = (*ice->cb.on_tx_pkt)(ice, sd->comp_id, msg_data->transport_id, + pkt, pkt_size, dst_addr, addr_len); + + pj_grp_lock_release(ice->grp_lock); + return status; } @@ -2162,7 +2185,13 @@ static void on_stun_request_complete(pj_stun_session *stun_sess, pj_assert(tdata == check->tdata); check->tdata = NULL; - pj_mutex_lock(ice->mutex); + pj_grp_lock_acquire(ice->grp_lock); + + if (ice->is_destroying) { + /* Not sure if this is possible but just in case */ + pj_grp_lock_release(ice->grp_lock); + return; + } /* Init lcand to NULL. lcand will be found from the mapped address * found in the response. @@ -2213,7 +2242,7 @@ static void on_stun_request_complete(pj_stun_session *stun_sess, perform_check(ice, clist, msg_data->data.req.ckid, check->nominated || ice->is_nominating); pj_log_pop_indent(); - pj_mutex_unlock(ice->mutex); + pj_grp_lock_release(ice->grp_lock); return; } @@ -2228,7 +2257,7 @@ static void on_stun_request_complete(pj_stun_session *stun_sess, check_set_state(ice, check, PJ_ICE_SESS_CHECK_STATE_FAILED, status); on_check_complete(ice, check); pj_log_pop_indent(); - pj_mutex_unlock(ice->mutex); + pj_grp_lock_release(ice->grp_lock); return; } @@ -2241,7 +2270,8 @@ static void on_stun_request_complete(pj_stun_session *stun_sess, * the response match the source IP address and port that the Binding * Request was sent from. */ - if (sockaddr_cmp(&check->rcand->addr, (const pj_sockaddr*)src_addr) != 0) { + if (pj_sockaddr_cmp(&check->rcand->addr, (const pj_sockaddr*)src_addr)!=0) + { status = PJNATH_EICEINSRCADDR; LOG4((ice->obj_name, "Check %s%s: connectivity check FAILED: source address mismatch", @@ -2252,7 +2282,7 @@ static void on_stun_request_complete(pj_stun_session *stun_sess, check_set_state(ice, check, PJ_ICE_SESS_CHECK_STATE_FAILED, status); on_check_complete(ice, check); pj_log_pop_indent(); - pj_mutex_unlock(ice->mutex); + pj_grp_lock_release(ice->grp_lock); return; } @@ -2285,14 +2315,14 @@ static void on_stun_request_complete(pj_stun_session *stun_sess, check_set_state(ice, check, PJ_ICE_SESS_CHECK_STATE_FAILED, PJNATH_ESTUNNOMAPPEDADDR); on_check_complete(ice, check); - pj_mutex_unlock(ice->mutex); + pj_grp_lock_release(ice->grp_lock); return; } /* Find local candidate that matches the XOR-MAPPED-ADDRESS */ pj_assert(lcand == NULL); for (i=0; i<ice->lcand_cnt; ++i) { - if (sockaddr_cmp(&xaddr->sockaddr, &ice->lcand[i].addr) == 0) { + if (pj_sockaddr_cmp(&xaddr->sockaddr, &ice->lcand[i].addr) == 0) { /* Match */ lcand = &ice->lcand[i]; break; @@ -2328,12 +2358,13 @@ static void on_stun_request_complete(pj_stun_session *stun_sess, &xaddr->sockaddr, &check->lcand->base_addr, &check->lcand->base_addr, - sizeof(pj_sockaddr_in), &cand_id); + pj_sockaddr_get_len(&xaddr->sockaddr), + &cand_id); if (status != PJ_SUCCESS) { check_set_state(ice, check, PJ_ICE_SESS_CHECK_STATE_FAILED, status); on_check_complete(ice, check); - pj_mutex_unlock(ice->mutex); + pj_grp_lock_release(ice->grp_lock); return; } @@ -2393,11 +2424,11 @@ static void on_stun_request_complete(pj_stun_session *stun_sess, */ if (on_check_complete(ice, check)) { /* ICE complete! */ - pj_mutex_unlock(ice->mutex); + pj_grp_lock_release(ice->grp_lock); return; } - pj_mutex_unlock(ice->mutex); + pj_grp_lock_release(ice->grp_lock); } @@ -2438,7 +2469,12 @@ static pj_status_t on_stun_rx_request(pj_stun_session *sess, sd = (stun_data*) pj_stun_session_get_user_data(sess); ice = sd->ice; - pj_mutex_lock(ice->mutex); + pj_grp_lock_acquire(ice->grp_lock); + + if (ice->is_destroying) { + pj_grp_lock_release(ice->grp_lock); + return PJ_EINVALIDOP; + } /* * Note: @@ -2453,7 +2489,7 @@ static pj_status_t on_stun_rx_request(pj_stun_session *sess, pj_stun_msg_find_attr(msg, PJ_STUN_ATTR_PRIORITY, 0); if (prio_attr == NULL) { LOG5((ice->obj_name, "Received Binding request with no PRIORITY")); - pj_mutex_unlock(ice->mutex); + pj_grp_lock_release(ice->grp_lock); return PJ_SUCCESS; } @@ -2498,7 +2534,7 @@ static pj_status_t on_stun_rx_request(pj_stun_session *sess, pj_stun_session_respond(sess, rdata, PJ_STUN_SC_ROLE_CONFLICT, NULL, token, PJ_TRUE, src_addr, src_addr_len); - pj_mutex_unlock(ice->mutex); + pj_grp_lock_release(ice->grp_lock); return PJ_SUCCESS; } @@ -2510,7 +2546,7 @@ static pj_status_t on_stun_rx_request(pj_stun_session *sess, pj_stun_session_respond(sess, rdata, PJ_STUN_SC_ROLE_CONFLICT, NULL, token, PJ_TRUE, src_addr, src_addr_len); - pj_mutex_unlock(ice->mutex); + pj_grp_lock_release(ice->grp_lock); return PJ_SUCCESS; } else { /* Switch role to controlled */ @@ -2525,7 +2561,7 @@ static pj_status_t on_stun_rx_request(pj_stun_session *sess, */ status = pj_stun_session_create_res(sess, rdata, 0, NULL, &tdata); if (status != PJ_SUCCESS) { - pj_mutex_unlock(ice->mutex); + pj_grp_lock_release(ice->grp_lock); return status; } @@ -2562,7 +2598,7 @@ static pj_status_t on_stun_rx_request(pj_stun_session *sess, rcheck->comp_id = sd->comp_id; rcheck->transport_id = ((pj_ice_msg_data*)token)->transport_id; rcheck->src_addr_len = src_addr_len; - pj_memcpy(&rcheck->src_addr, src_addr, src_addr_len); + pj_sockaddr_cp(&rcheck->src_addr, src_addr); rcheck->use_candidate = (uc_attr != NULL); rcheck->priority = prio_attr->value; rcheck->role_attr = role_attr; @@ -2577,7 +2613,7 @@ static pj_status_t on_stun_rx_request(pj_stun_session *sess, handle_incoming_check(ice, rcheck); } - pj_mutex_unlock(ice->mutex); + pj_grp_lock_release(ice->grp_lock); return PJ_SUCCESS; } @@ -2600,7 +2636,7 @@ static void handle_incoming_check(pj_ice_sess *ice, * the request. */ for (i=0; i<ice->rcand_cnt; ++i) { - if (sockaddr_cmp(&rcheck->src_addr, &ice->rcand[i].addr)==0) + if (pj_sockaddr_cmp(&rcheck->src_addr, &ice->rcand[i].addr)==0) break; } @@ -2610,6 +2646,7 @@ static void handle_incoming_check(pj_ice_sess *ice, * candidate. */ if (i == ice->rcand_cnt) { + char raddr[PJ_INET6_ADDRSTRLEN]; if (ice->rcand_cnt >= PJ_ICE_MAX_CAND) { LOG4((ice->obj_name, "Unable to add new peer reflexive candidate: too many " @@ -2621,7 +2658,7 @@ static void handle_incoming_check(pj_ice_sess *ice, rcand->comp_id = (pj_uint8_t)rcheck->comp_id; rcand->type = PJ_ICE_CAND_TYPE_PRFLX; rcand->prio = rcheck->priority; - pj_memcpy(&rcand->addr, &rcheck->src_addr, rcheck->src_addr_len); + pj_sockaddr_cp(&rcand->addr, &rcheck->src_addr); /* Foundation is random, unique from other foundation */ rcand->foundation.ptr = (char*) pj_pool_alloc(ice->pool, 36); @@ -2630,9 +2667,9 @@ static void handle_incoming_check(pj_ice_sess *ice, rcand->foundation.ptr); LOG4((ice->obj_name, - "Added new remote candidate from the request: %s:%d", - pj_inet_ntoa(rcand->addr.ipv4.sin_addr), - (int)pj_ntohs(rcand->addr.ipv4.sin_port))); + "Added new remote candidate from the request: %s:%d", + pj_sockaddr_print(&rcand->addr, raddr, sizeof(raddr), 0), + pj_sockaddr_get_port(&rcand->addr))); } else { /* Remote candidate found */ @@ -2649,7 +2686,7 @@ static void handle_incoming_check(pj_ice_sess *ice, for (i=0; i<ice->clist.count; ++i) { pj_ice_sess_check *c = &ice->clist.checks[i]; if (/*c->lcand == lcand ||*/ - sockaddr_cmp(&c->lcand->base_addr, &lcand->base_addr)==0) + pj_sockaddr_cmp(&c->lcand->base_addr, &lcand->base_addr)==0) { lcand = c->lcand; break; @@ -2732,7 +2769,7 @@ static void handle_incoming_check(pj_ice_sess *ice, LOG5((ice->obj_name, "Triggered check for check %d not performed " "because it's in progress. Retransmitting", i)); pj_log_push_indent(); - pj_stun_session_retransmit_req(comp->stun_sess, c->tdata); + pj_stun_session_retransmit_req(comp->stun_sess, c->tdata, PJ_FALSE); pj_log_pop_indent(); } else if (c->state == PJ_ICE_SESS_CHECK_STATE_SUCCEEDED) { @@ -2866,18 +2903,23 @@ PJ_DEF(pj_status_t) pj_ice_sess_send_data(pj_ice_sess *ice, return PJNATH_EICEINCOMPID; } - pj_mutex_lock(ice->mutex); + pj_grp_lock_acquire(ice->grp_lock); + + if (ice->is_destroying) { + pj_grp_lock_release(ice->grp_lock); + return PJ_EINVALIDOP; + } comp = find_comp(ice, comp_id); if (comp == NULL) { status = PJNATH_EICEINCOMPID; - pj_mutex_unlock(ice->mutex); + pj_grp_lock_release(ice->grp_lock); goto on_return; } if (comp->valid_check == NULL) { status = PJNATH_EICEINPROGRESS; - pj_mutex_unlock(ice->mutex); + pj_grp_lock_release(ice->grp_lock); goto on_return; } @@ -2886,12 +2928,14 @@ PJ_DEF(pj_status_t) pj_ice_sess_send_data(pj_ice_sess *ice, pj_sockaddr_cp(&addr, &comp->valid_check->rcand->addr); /* Release the mutex now to avoid deadlock (see ticket #1451). */ - pj_mutex_unlock(ice->mutex); + pj_grp_lock_release(ice->grp_lock); + + PJ_RACE_ME(5); status = (*ice->cb.on_tx_pkt)(ice, comp_id, transport_id, data, data_len, &addr, - sizeof(pj_sockaddr_in)); + pj_sockaddr_get_len(&addr)); on_return: return status; @@ -2913,11 +2957,16 @@ PJ_DEF(pj_status_t) pj_ice_sess_on_rx_pkt(pj_ice_sess *ice, PJ_ASSERT_RETURN(ice, PJ_EINVAL); - pj_mutex_lock(ice->mutex); + pj_grp_lock_acquire(ice->grp_lock); + + if (ice->is_destroying) { + pj_grp_lock_release(ice->grp_lock); + return PJ_EINVALIDOP; + } comp = find_comp(ice, comp_id); if (comp == NULL) { - pj_mutex_unlock(ice->mutex); + pj_grp_lock_release(ice->grp_lock); return PJNATH_EICEINCOMPID; } @@ -2930,7 +2979,7 @@ PJ_DEF(pj_status_t) pj_ice_sess_on_rx_pkt(pj_ice_sess *ice, } if (msg_data == NULL) { pj_assert(!"Invalid transport ID"); - pj_mutex_unlock(ice->mutex); + pj_grp_lock_release(ice->grp_lock); return PJ_EINVAL; } @@ -2950,12 +2999,14 @@ PJ_DEF(pj_status_t) pj_ice_sess_on_rx_pkt(pj_ice_sess *ice, LOG4((ice->obj_name, "Error processing incoming message: %s", ice->tmp.errmsg)); } - pj_mutex_unlock(ice->mutex); + pj_grp_lock_release(ice->grp_lock); } else { /* Not a STUN packet. Call application's callback instead, but release * the mutex now or otherwise we may get deadlock. */ - pj_mutex_unlock(ice->mutex); + pj_grp_lock_release(ice->grp_lock); + + PJ_RACE_ME(5); (*ice->cb.on_rx_data)(ice, comp_id, transport_id, pkt, pkt_size, src_addr, src_addr_len); diff --git a/pjnath/src/pjnath/ice_strans.c b/pjnath/src/pjnath/ice_strans.c index 8ae2a90..2df77bf 100644 --- a/pjnath/src/pjnath/ice_strans.c +++ b/pjnath/src/pjnath/ice_strans.c @@ -1,4 +1,4 @@ -/* $Id: ice_strans.c 4133 2012-05-21 14:00:17Z bennylp $ */ +/* $Id: ice_strans.c 4412 2013-03-05 03:12:32Z riza $ */ /* * Copyright (C) 2008-2011 Teluu Inc. (http://www.teluu.com) * Copyright (C) 2003-2008 Benny Prijono <benny@prijono.org> @@ -31,8 +31,9 @@ #include <pj/string.h> #include <pj/compat/socket.h> +#define ENABLE_TRACE 0 -#if 0 +#if defined(ENABLE_TRACE) && (ENABLE_TRACE != 0) # define TRACE_PKT(expr) PJ_LOG(5,expr) #else # define TRACE_PKT(expr) @@ -126,13 +127,11 @@ static void turn_on_state(pj_turn_sock *turn_sock, pj_turn_state_t old_state, /* Forward decls */ +static void ice_st_on_destroy(void *obj); static void destroy_ice_st(pj_ice_strans *ice_st); #define ice_st_perror(ice_st,msg,rc) pjnath_perror(ice_st->obj_name,msg,rc) static void sess_init_update(pj_ice_strans *ice_st); -static void sess_add_ref(pj_ice_strans *ice_st); -static pj_bool_t sess_dec_ref(pj_ice_strans *ice_st); - /** * This structure describes an ICE stream transport component. A component * in ICE stream transport typically corresponds to a single socket created @@ -172,7 +171,7 @@ struct pj_ice_strans void *user_data; /**< Application data. */ pj_ice_strans_cfg cfg; /**< Configuration. */ pj_ice_strans_cb cb; /**< Application callback. */ - pj_lock_t *init_lock; /**< Initialization mutex. */ + pj_grp_lock_t *grp_lock; /**< Group lock. */ pj_ice_strans_state state; /**< Session state. */ pj_ice_sess *ice; /**< ICE session. */ @@ -183,7 +182,6 @@ struct pj_ice_strans pj_timer_entry ka_timer; /**< STUN keep-alive timer. */ - pj_atomic_t *busy_cnt; /**< To prevent destroy */ pj_bool_t destroy_req;/**< Destroy has been called? */ pj_bool_t cb_called; /**< Init error callback called?*/ }; @@ -503,6 +501,13 @@ static pj_status_t create_comp(pj_ice_strans *ice_st, unsigned comp_id) add_update_turn(ice_st, comp); } + /* It's possible that we end up without any candidates */ + if (comp->cand_cnt == 0) { + PJ_LOG(4,(ice_st->obj_name, + "Error: no candidate is created due to settings")); + return PJ_EINVAL; + } + return PJ_SUCCESS; } @@ -544,23 +549,22 @@ PJ_DEF(pj_status_t) pj_ice_strans_create( const char *name, comp_cnt)); pj_log_push_indent(); - pj_ice_strans_cfg_copy(pool, &ice_st->cfg, cfg); - pj_memcpy(&ice_st->cb, cb, sizeof(*cb)); - - status = pj_atomic_create(pool, 0, &ice_st->busy_cnt); + status = pj_grp_lock_create(pool, NULL, &ice_st->grp_lock); if (status != PJ_SUCCESS) { - destroy_ice_st(ice_st); - return status; - } - - status = pj_lock_create_recursive_mutex(pool, ice_st->obj_name, - &ice_st->init_lock); - if (status != PJ_SUCCESS) { - destroy_ice_st(ice_st); + pj_pool_release(pool); pj_log_pop_indent(); return status; } + pj_grp_lock_add_ref(ice_st->grp_lock); + pj_grp_lock_add_handler(ice_st->grp_lock, pool, ice_st, + &ice_st_on_destroy); + + pj_ice_strans_cfg_copy(pool, &ice_st->cfg, cfg); + ice_st->cfg.stun.cfg.grp_lock = ice_st->grp_lock; + ice_st->cfg.turn.cfg.grp_lock = ice_st->grp_lock; + pj_memcpy(&ice_st->cb, cb, sizeof(*cb)); + ice_st->comp_cnt = comp_cnt; ice_st->comp = (pj_ice_strans_comp**) pj_pool_calloc(pool, comp_cnt, sizeof(pj_ice_strans_comp*)); @@ -571,12 +575,12 @@ PJ_DEF(pj_status_t) pj_ice_strans_create( const char *name, /* Acquire initialization mutex to prevent callback to be * called before we finish initialization. */ - pj_lock_acquire(ice_st->init_lock); + pj_grp_lock_acquire(ice_st->grp_lock); for (i=0; i<comp_cnt; ++i) { status = create_comp(ice_st, i+1); if (status != PJ_SUCCESS) { - pj_lock_release(ice_st->init_lock); + pj_grp_lock_release(ice_st->grp_lock); destroy_ice_st(ice_st); pj_log_pop_indent(); return status; @@ -584,9 +588,9 @@ PJ_DEF(pj_status_t) pj_ice_strans_create( const char *name, } /* Done with initialization */ - pj_lock_release(ice_st->init_lock); + pj_grp_lock_release(ice_st->grp_lock); - PJ_LOG(4,(ice_st->obj_name, "ICE stream transport created")); + PJ_LOG(4,(ice_st->obj_name, "ICE stream transport %p created", ice_st)); *p_ice_st = ice_st; @@ -598,14 +602,35 @@ PJ_DEF(pj_status_t) pj_ice_strans_create( const char *name, return PJ_SUCCESS; } +/* REALLY destroy ICE */ +static void ice_st_on_destroy(void *obj) +{ + pj_ice_strans *ice_st = (pj_ice_strans*)obj; + + PJ_LOG(4,(ice_st->obj_name, "ICE stream transport %p destroyed", obj)); + + /* Done */ + pj_pool_release(ice_st->pool); +} + /* Destroy ICE */ static void destroy_ice_st(pj_ice_strans *ice_st) { unsigned i; - PJ_LOG(5,(ice_st->obj_name, "ICE stream transport destroying..")); + PJ_LOG(5,(ice_st->obj_name, "ICE stream transport %p destroy request..", + ice_st)); pj_log_push_indent(); + pj_grp_lock_acquire(ice_st->grp_lock); + + if (ice_st->destroy_req) { + pj_grp_lock_release(ice_st->grp_lock); + return; + } + + ice_st->destroy_req = PJ_TRUE; + /* Destroy ICE if we have ICE */ if (ice_st->ice) { pj_ice_sess_destroy(ice_st->ice); @@ -616,38 +641,19 @@ static void destroy_ice_st(pj_ice_strans *ice_st) for (i=0; i<ice_st->comp_cnt; ++i) { if (ice_st->comp[i]) { if (ice_st->comp[i]->stun_sock) { - pj_stun_sock_set_user_data(ice_st->comp[i]->stun_sock, NULL); pj_stun_sock_destroy(ice_st->comp[i]->stun_sock); ice_st->comp[i]->stun_sock = NULL; } if (ice_st->comp[i]->turn_sock) { - pj_turn_sock_set_user_data(ice_st->comp[i]->turn_sock, NULL); pj_turn_sock_destroy(ice_st->comp[i]->turn_sock); ice_st->comp[i]->turn_sock = NULL; } } } - ice_st->comp_cnt = 0; - - /* Destroy mutex */ - if (ice_st->init_lock) { - pj_lock_acquire(ice_st->init_lock); - pj_lock_release(ice_st->init_lock); - pj_lock_destroy(ice_st->init_lock); - ice_st->init_lock = NULL; - } - - /* Destroy reference counter */ - if (ice_st->busy_cnt) { - pj_assert(pj_atomic_get(ice_st->busy_cnt)==0); - pj_atomic_destroy(ice_st->busy_cnt); - ice_st->busy_cnt = NULL; - } - PJ_LOG(4,(ice_st->obj_name, "ICE stream transport destroyed")); + pj_grp_lock_dec_ref(ice_st->grp_lock); + pj_grp_lock_release(ice_st->grp_lock); - /* Done */ - pj_pool_release(ice_st->pool); pj_log_pop_indent(); } @@ -732,45 +738,12 @@ static void sess_init_update(pj_ice_strans *ice_st) */ PJ_DEF(pj_status_t) pj_ice_strans_destroy(pj_ice_strans *ice_st) { - PJ_ASSERT_RETURN(ice_st, PJ_EINVAL); - - ice_st->destroy_req = PJ_TRUE; - if (pj_atomic_get(ice_st->busy_cnt) > 0) { - PJ_LOG(5,(ice_st->obj_name, - "ICE strans object is busy, will destroy later")); - return PJ_EPENDING; - } - destroy_ice_st(ice_st); return PJ_SUCCESS; } /* - * Increment busy counter. - */ -static void sess_add_ref(pj_ice_strans *ice_st) -{ - pj_atomic_inc(ice_st->busy_cnt); -} - -/* - * Decrement busy counter. If the counter has reached zero and destroy - * has been requested, destroy the object and return FALSE. - */ -static pj_bool_t sess_dec_ref(pj_ice_strans *ice_st) -{ - int count = pj_atomic_dec_and_get(ice_st->busy_cnt); - pj_assert(count >= 0); - if (count==0 && ice_st->destroy_req) { - pj_ice_strans_destroy(ice_st); - return PJ_FALSE; - } else { - return PJ_TRUE; - } -} - -/* * Get user data */ PJ_DEF(void*) pj_ice_strans_get_user_data(pj_ice_strans *ice_st) @@ -833,7 +806,9 @@ PJ_DEF(pj_status_t) pj_ice_strans_init_ice(pj_ice_strans *ice_st, /* Create! */ status = pj_ice_sess_create(&ice_st->cfg.stun_cfg, ice_st->obj_name, role, ice_st->comp_cnt, &ice_cb, - local_ufrag, local_passwd, &ice_st->ice); + local_ufrag, local_passwd, + ice_st->grp_lock, + &ice_st->ice); if (status != PJ_SUCCESS) return status; @@ -1145,6 +1120,8 @@ pj_ice_strans_get_valid_pair(const pj_ice_strans *ice_st, */ PJ_DEF(pj_status_t) pj_ice_strans_stop_ice(pj_ice_strans *ice_st) { + PJ_ASSERT_RETURN(ice_st, PJ_EINVAL); + if (ice_st->ice) { pj_ice_sess_destroy(ice_st->ice); ice_st->ice = NULL; @@ -1246,7 +1223,7 @@ static void on_ice_complete(pj_ice_sess *ice, pj_status_t status) pj_time_val t; unsigned msec; - sess_add_ref(ice_st); + pj_grp_lock_add_ref(ice_st->grp_lock); pj_gettimeofday(&t); PJ_TIME_VAL_SUB(t, ice_st->start_time); @@ -1328,7 +1305,7 @@ static void on_ice_complete(pj_ice_sess *ice, pj_status_t status) } - sess_dec_ref(ice_st); + pj_grp_lock_dec_ref(ice_st->grp_lock); } /* @@ -1344,17 +1321,20 @@ static pj_status_t ice_tx_pkt(pj_ice_sess *ice, pj_ice_strans *ice_st = (pj_ice_strans*)ice->user_data; pj_ice_strans_comp *comp; pj_status_t status; +#if defined(ENABLE_TRACE) && (ENABLE_TRACE != 0) + char daddr[PJ_INET6_ADDRSTRLEN]; +#endif PJ_ASSERT_RETURN(comp_id && comp_id <= ice_st->comp_cnt, PJ_EINVAL); comp = ice_st->comp[comp_id-1]; TRACE_PKT((comp->ice_st->obj_name, - "Component %d TX packet to %s:%d with transport %d", - comp_id, - pj_inet_ntoa(((pj_sockaddr_in*)dst_addr)->sin_addr), - (int)pj_ntohs(((pj_sockaddr_in*)dst_addr)->sin_port), - transport_id)); + "Component %d TX packet to %s:%d with transport %d", + comp_id, + pj_sockaddr_print(dst_addr, daddr, sizeof(addr), 0), + pj_sockaddr_get_port(dst_addr), + transport_id)); if (transport_id == TP_TURN) { if (comp->turn_sock) { @@ -1417,7 +1397,7 @@ static pj_bool_t stun_on_rx_data(pj_stun_sock *stun_sock, ice_st = comp->ice_st; - sess_add_ref(ice_st); + pj_grp_lock_add_ref(ice_st->grp_lock); if (ice_st->ice == NULL) { /* The ICE session is gone, but we're still receiving packets. @@ -1442,7 +1422,7 @@ static pj_bool_t stun_on_rx_data(pj_stun_sock *stun_sock, } } - return sess_dec_ref(ice_st); + return pj_grp_lock_dec_ref(ice_st->grp_lock) ? PJ_FALSE : PJ_TRUE; } /* Notifification when asynchronous send operation to the STUN socket @@ -1473,10 +1453,10 @@ static pj_bool_t stun_on_status(pj_stun_sock *stun_sock, comp = (pj_ice_strans_comp*) pj_stun_sock_get_user_data(stun_sock); ice_st = comp->ice_st; - sess_add_ref(ice_st); + pj_grp_lock_add_ref(ice_st->grp_lock); /* Wait until initialization completes */ - pj_lock_acquire(ice_st->init_lock); + pj_grp_lock_acquire(ice_st->grp_lock); /* Find the srflx cancidate */ for (i=0; i<comp->cand_cnt; ++i) { @@ -1486,14 +1466,14 @@ static pj_bool_t stun_on_status(pj_stun_sock *stun_sock, } } - pj_lock_release(ice_st->init_lock); + pj_grp_lock_release(ice_st->grp_lock); /* It is possible that we don't have srflx candidate even though this * callback is called. This could happen when we cancel adding srflx * candidate due to initialization error. */ if (cand == NULL) { - return sess_dec_ref(ice_st); + return pj_grp_lock_dec_ref(ice_st->grp_lock) ? PJ_FALSE : PJ_TRUE; } switch (op) { @@ -1546,7 +1526,7 @@ static pj_bool_t stun_on_status(pj_stun_sock *stun_sock, if (comp->default_cand > idx) { --comp->default_cand; } else if (comp->default_cand == idx) { - comp->default_cand = !idx; + comp->default_cand = 0; } /* Remove srflx candidate */ @@ -1574,7 +1554,7 @@ static pj_bool_t stun_on_status(pj_stun_sock *stun_sock, /* May not have cand, e.g. when error during init */ if (cand) cand->status = status; - if (!ice_st->cfg.stun.ignore_stun_error) { + if (!ice_st->cfg.stun.ignore_stun_error || comp->cand_cnt==1) { sess_fail(ice_st, PJ_ICE_STRANS_OP_INIT, "STUN binding request failed", status); } else { @@ -1609,7 +1589,7 @@ static pj_bool_t stun_on_status(pj_stun_sock *stun_sock, break; } - return sess_dec_ref(ice_st); + return pj_grp_lock_dec_ref(ice_st->grp_lock)? PJ_FALSE : PJ_TRUE; } /* Callback when TURN socket has received a packet */ @@ -1628,7 +1608,7 @@ static void turn_on_rx_data(pj_turn_sock *turn_sock, return; } - sess_add_ref(comp->ice_st); + pj_grp_lock_add_ref(comp->ice_st->grp_lock); if (comp->ice_st->ice == NULL) { /* The ICE session is gone, but we're still receiving packets. @@ -1655,7 +1635,7 @@ static void turn_on_rx_data(pj_turn_sock *turn_sock, } } - sess_dec_ref(comp->ice_st); + pj_grp_lock_dec_ref(comp->ice_st->grp_lock); } @@ -1677,7 +1657,7 @@ static void turn_on_state(pj_turn_sock *turn_sock, pj_turn_state_t old_state, pj_turn_state_name(old_state), pj_turn_state_name(new_state))); pj_log_push_indent(); - sess_add_ref(comp->ice_st); + pj_grp_lock_add_ref(comp->ice_st->grp_lock); if (new_state == PJ_TURN_STATE_READY) { pj_turn_session_info rel_info; @@ -1691,7 +1671,7 @@ static void turn_on_state(pj_turn_sock *turn_sock, pj_turn_state_t old_state, pj_turn_sock_get_info(turn_sock, &rel_info); /* Wait until initialization completes */ - pj_lock_acquire(comp->ice_st->init_lock); + pj_grp_lock_acquire(comp->ice_st->grp_lock); /* Find relayed candidate in the component */ for (i=0; i<comp->cand_cnt; ++i) { @@ -1702,7 +1682,7 @@ static void turn_on_state(pj_turn_sock *turn_sock, pj_turn_state_t old_state, } pj_assert(cand != NULL); - pj_lock_release(comp->ice_st->init_lock); + pj_grp_lock_release(comp->ice_st->grp_lock); /* Update candidate */ pj_sockaddr_cp(&cand->addr, &rel_info.relay_addr); @@ -1735,22 +1715,27 @@ static void turn_on_state(pj_turn_sock *turn_sock, pj_turn_state_t old_state, pj_turn_sock_set_user_data(turn_sock, NULL); comp->turn_sock = NULL; - /* Set session to fail if we're still initializing */ - if (comp->ice_st->state < PJ_ICE_STRANS_STATE_READY) { - sess_fail(comp->ice_st, PJ_ICE_STRANS_OP_INIT, - "TURN allocation failed", info.last_status); - } else if (comp->turn_err_cnt > 1) { - sess_fail(comp->ice_st, PJ_ICE_STRANS_OP_KEEP_ALIVE, - "TURN refresh failed", info.last_status); - } else { - PJ_PERROR(4,(comp->ice_st->obj_name, info.last_status, - "Comp %d: TURN allocation failed, retrying", - comp->comp_id)); - add_update_turn(comp->ice_st, comp); + /* Set session to fail on error. last_status PJ_SUCCESS means normal + * deallocation, which should not trigger sess_fail as it may have + * been initiated by ICE destroy + */ + if (info.last_status != PJ_SUCCESS) { + if (comp->ice_st->state < PJ_ICE_STRANS_STATE_READY) { + sess_fail(comp->ice_st, PJ_ICE_STRANS_OP_INIT, + "TURN allocation failed", info.last_status); + } else if (comp->turn_err_cnt > 1) { + sess_fail(comp->ice_st, PJ_ICE_STRANS_OP_KEEP_ALIVE, + "TURN refresh failed", info.last_status); + } else { + PJ_PERROR(4,(comp->ice_st->obj_name, info.last_status, + "Comp %d: TURN allocation failed, retrying", + comp->comp_id)); + add_update_turn(comp->ice_st, comp); + } } } - sess_dec_ref(comp->ice_st); + pj_grp_lock_dec_ref(comp->ice_st->grp_lock); pj_log_pop_indent(); } diff --git a/pjnath/src/pjnath/nat_detect.c b/pjnath/src/pjnath/nat_detect.c index 86ac694..0eaf9bf 100644 --- a/pjnath/src/pjnath/nat_detect.c +++ b/pjnath/src/pjnath/nat_detect.c @@ -1,4 +1,4 @@ -/* $Id: nat_detect.c 3553 2011-05-05 06:14:19Z nanang $ */ +/* $Id: nat_detect.c 4360 2013-02-21 11:26:35Z bennylp $ */ /* * Copyright (C) 2008-2011 Teluu Inc. (http://www.teluu.com) * Copyright (C) 2003-2008 Benny Prijono <benny@prijono.org> @@ -307,7 +307,7 @@ PJ_DEF(pj_status_t) pj_stun_detect_nat_type(const pj_sockaddr_in *server, sess_cb.on_request_complete = &on_request_complete; sess_cb.on_send_msg = &on_send_msg; status = pj_stun_session_create(stun_cfg, pool->obj_name, &sess_cb, - PJ_FALSE, &sess->stun_sess); + PJ_FALSE, NULL, &sess->stun_sess); if (status != PJ_SUCCESS) goto on_error; diff --git a/pjnath/src/pjnath/stun_session.c b/pjnath/src/pjnath/stun_session.c index 45d5313..4018b93 100644 --- a/pjnath/src/pjnath/stun_session.c +++ b/pjnath/src/pjnath/stun_session.c @@ -1,4 +1,4 @@ -/* $Id: stun_session.c 3843 2011-10-24 14:13:35Z bennylp $ */ +/* $Id: stun_session.c 4360 2013-02-21 11:26:35Z bennylp $ */ /* * Copyright (C) 2008-2011 Teluu Inc. (http://www.teluu.com) * Copyright (C) 2003-2008 Benny Prijono <benny@prijono.org> @@ -25,13 +25,10 @@ struct pj_stun_session { pj_stun_config *cfg; pj_pool_t *pool; - pj_lock_t *lock; - pj_bool_t delete_lock; + pj_grp_lock_t *grp_lock; pj_stun_session_cb cb; void *user_data; - - pj_atomic_t *busy; - pj_bool_t destroy_request; + pj_bool_t is_destroying; pj_bool_t use_fingerprint; @@ -55,14 +52,15 @@ struct pj_stun_session }; #define SNAME(s_) ((s_)->pool->obj_name) +#define THIS_FILE "stun_session.c" -#if PJ_LOG_MAX_LEVEL >= 5 +#if 1 # define TRACE_(expr) PJ_LOG(5,expr) #else # define TRACE_(expr) #endif -#define LOG_ERR_(sess,title,rc) pjnath_perror(sess->pool->obj_name,title,rc) +#define LOG_ERR_(sess,title,rc) PJ_PERROR(3,(sess->pool->obj_name,rc,title)) #define TDATA_POOL_SIZE PJNATH_POOL_LEN_STUN_TDATA #define TDATA_POOL_INC PJNATH_POOL_INC_STUN_TDATA @@ -77,6 +75,7 @@ static pj_status_t stun_tsx_on_send_msg(pj_stun_client_tsx *tsx, const void *stun_pkt, pj_size_t pkt_size); static void stun_tsx_on_destroy(pj_stun_client_tsx *tsx); +static void stun_sess_on_destroy(void *comp); static pj_stun_tsx_cb tsx_cb = { @@ -148,31 +147,38 @@ static void stun_tsx_on_destroy(pj_stun_client_tsx *tsx) pj_stun_tx_data *tdata; tdata = (pj_stun_tx_data*) pj_stun_client_tsx_get_data(tsx); - tsx_erase(tdata->sess, tdata); + pj_stun_client_tsx_stop(tsx); + if (tdata) { + tsx_erase(tdata->sess, tdata); + pj_pool_release(tdata->pool); + } - pj_stun_client_tsx_destroy(tsx); - pj_pool_release(tdata->pool); + TRACE_((THIS_FILE, "STUN transaction %p destroyed", tsx)); } static void destroy_tdata(pj_stun_tx_data *tdata, pj_bool_t force) { + TRACE_((THIS_FILE, "tdata %p destroy request, force=%d, tsx=%p", tdata, + force, tdata->client_tsx)); + if (tdata->res_timer.id != PJ_FALSE) { - pj_timer_heap_cancel(tdata->sess->cfg->timer_heap, - &tdata->res_timer); - tdata->res_timer.id = PJ_FALSE; + pj_timer_heap_cancel_if_active(tdata->sess->cfg->timer_heap, + &tdata->res_timer, PJ_FALSE); pj_list_erase(tdata); } if (force) { + pj_list_erase(tdata); if (tdata->client_tsx) { - tsx_erase(tdata->sess, tdata); - pj_stun_client_tsx_destroy(tdata->client_tsx); + pj_stun_client_tsx_stop(tdata->client_tsx); + pj_stun_client_tsx_set_data(tdata->client_tsx, NULL); } pj_pool_release(tdata->pool); } else { if (tdata->client_tsx) { - pj_time_val delay = {2, 0}; + /* "Probably" this is to absorb retransmission */ + pj_time_val delay = {0, 300}; pj_stun_client_tsx_schedule_destroy(tdata->client_tsx, &delay); } else { @@ -206,7 +212,7 @@ static void on_cache_timeout(pj_timer_heap_t *timer_heap, PJ_LOG(5,(SNAME(tdata->sess), "Response cache deleted")); pj_list_erase(tdata); - pj_stun_msg_destroy_tdata(tdata->sess, tdata); + destroy_tdata(tdata, PJ_FALSE); } static pj_status_t apply_msg_options(pj_stun_session *sess, @@ -419,8 +425,12 @@ static void stun_tsx_on_complete(pj_stun_client_tsx *tsx, sess = tdata->sess; /* Lock the session and prevent user from destroying us in the callback */ - pj_atomic_inc(sess->busy); - pj_lock_acquire(sess->lock); + pj_grp_lock_acquire(sess->grp_lock); + if (sess->is_destroying) { + pj_stun_msg_destroy_tdata(sess, tdata); + pj_grp_lock_release(sess->grp_lock); + return; + } /* Handle authentication challenge */ handle_auth_challenge(sess, tdata, response, src_addr, @@ -434,15 +444,13 @@ static void stun_tsx_on_complete(pj_stun_client_tsx *tsx, /* Destroy the transmit data. This will remove the transaction * from the pending list too. */ - pj_stun_msg_destroy_tdata(sess, tdata); + if (status == PJNATH_ESTUNTIMEDOUT) + destroy_tdata(tdata, PJ_TRUE); + else + destroy_tdata(tdata, PJ_FALSE); tdata = NULL; - pj_lock_release(sess->lock); - - if (pj_atomic_dec_and_get(sess->busy)==0 && sess->destroy_request) { - pj_stun_session_destroy(sess); - return; - } + pj_grp_lock_release(sess->grp_lock); } static pj_status_t stun_tsx_on_send_msg(pj_stun_client_tsx *tsx, @@ -457,20 +465,21 @@ static pj_status_t stun_tsx_on_send_msg(pj_stun_client_tsx *tsx, sess = tdata->sess; /* Lock the session and prevent user from destroying us in the callback */ - pj_atomic_inc(sess->busy); - pj_lock_acquire(sess->lock); + pj_grp_lock_acquire(sess->grp_lock); + if (sess->is_destroying) { + /* Stray timer */ + pj_grp_lock_release(sess->grp_lock); + return PJ_EINVALIDOP; + } + status = sess->cb.on_send_msg(tdata->sess, tdata->token, stun_pkt, pkt_size, tdata->dst_addr, tdata->addr_len); - pj_lock_release(sess->lock); + if (pj_grp_lock_release(sess->grp_lock)) + return PJ_EGONE; - if (pj_atomic_dec_and_get(sess->busy)==0 && sess->destroy_request) { - pj_stun_session_destroy(sess); - return PJNATH_ESTUNDESTROYED; - } else { - return status; - } + return status; } /* **************************************************************************/ @@ -479,6 +488,7 @@ PJ_DEF(pj_status_t) pj_stun_session_create( pj_stun_config *cfg, const char *name, const pj_stun_session_cb *cb, pj_bool_t fingerprint, + pj_grp_lock_t *grp_lock, pj_stun_session **p_sess) { pj_pool_t *pool; @@ -500,49 +510,38 @@ PJ_DEF(pj_status_t) pj_stun_session_create( pj_stun_config *cfg, pj_memcpy(&sess->cb, cb, sizeof(*cb)); sess->use_fingerprint = fingerprint; sess->log_flag = 0xFFFF; - - sess->srv_name.ptr = (char*) pj_pool_alloc(pool, 32); - sess->srv_name.slen = pj_ansi_snprintf(sess->srv_name.ptr, 32, - "pjnath-%s", pj_get_version()); - sess->rx_pool = pj_pool_create(sess->cfg->pf, name, - PJNATH_POOL_LEN_STUN_TDATA, + if (grp_lock) { + sess->grp_lock = grp_lock; + } else { + status = pj_grp_lock_create(pool, NULL, &sess->grp_lock); + if (status != PJ_SUCCESS) { + pj_pool_release(pool); + return status; + } + } + + pj_grp_lock_add_ref(sess->grp_lock); + pj_grp_lock_add_handler(sess->grp_lock, pool, sess, + &stun_sess_on_destroy); + + pj_stun_session_set_software_name(sess, &cfg->software_name); + + sess->rx_pool = pj_pool_create(sess->cfg->pf, name, + PJNATH_POOL_LEN_STUN_TDATA, PJNATH_POOL_INC_STUN_TDATA, NULL); pj_list_init(&sess->pending_request_list); pj_list_init(&sess->cached_response_list); - status = pj_lock_create_recursive_mutex(pool, name, &sess->lock); - if (status != PJ_SUCCESS) { - pj_pool_release(pool); - return status; - } - sess->delete_lock = PJ_TRUE; - - status = pj_atomic_create(pool, 0, &sess->busy); - if (status != PJ_SUCCESS) { - pj_lock_destroy(sess->lock); - pj_pool_release(pool); - return status; - } - *p_sess = sess; return PJ_SUCCESS; } -PJ_DEF(pj_status_t) pj_stun_session_destroy(pj_stun_session *sess) +static void stun_sess_on_destroy(void *comp) { - PJ_ASSERT_RETURN(sess, PJ_EINVAL); - - pj_lock_acquire(sess->lock); - - /* Can't destroy if we're in a callback */ - sess->destroy_request = PJ_TRUE; - if (pj_atomic_get(sess->busy)) { - pj_lock_release(sess->lock); - return PJ_EPENDING; - } + pj_stun_session *sess = (pj_stun_session*)comp; while (!pj_list_empty(&sess->pending_request_list)) { pj_stun_tx_data *tdata = sess->pending_request_list.next; @@ -553,11 +552,6 @@ PJ_DEF(pj_status_t) pj_stun_session_destroy(pj_stun_session *sess) pj_stun_tx_data *tdata = sess->cached_response_list.next; destroy_tdata(tdata, PJ_TRUE); } - pj_lock_release(sess->lock); - - if (sess->delete_lock) { - pj_lock_destroy(sess->lock); - } if (sess->rx_pool) { pj_pool_release(sess->rx_pool); @@ -566,6 +560,47 @@ PJ_DEF(pj_status_t) pj_stun_session_destroy(pj_stun_session *sess) pj_pool_release(sess->pool); + TRACE_((THIS_FILE, "STUN session %p destroyed", sess)); +} + +PJ_DEF(pj_status_t) pj_stun_session_destroy(pj_stun_session *sess) +{ + pj_stun_tx_data *tdata; + + PJ_ASSERT_RETURN(sess, PJ_EINVAL); + + TRACE_((SNAME(sess), "STUN session %p destroy request, ref_cnt=%d", + sess, pj_grp_lock_get_ref(sess->grp_lock))); + + pj_grp_lock_acquire(sess->grp_lock); + + if (sess->is_destroying) { + /* Prevent from decrementing the ref counter more than once */ + pj_grp_lock_release(sess->grp_lock); + return PJ_EINVALIDOP; + } + + sess->is_destroying = PJ_TRUE; + + /* We need to stop transactions and cached response because they are + * holding the group lock's reference counter while retransmitting. + */ + tdata = sess->pending_request_list.next; + while (tdata != &sess->pending_request_list) { + if (tdata->client_tsx) + pj_stun_client_tsx_stop(tdata->client_tsx); + tdata = tdata->next; + } + + tdata = sess->cached_response_list.next; + while (tdata != &sess->cached_response_list) { + pj_timer_heap_cancel_if_active(tdata->sess->cfg->timer_heap, + &tdata->res_timer, PJ_FALSE); + tdata = tdata->next; + } + + pj_grp_lock_dec_ref(sess->grp_lock); + pj_grp_lock_release(sess->grp_lock); return PJ_SUCCESS; } @@ -574,9 +609,9 @@ PJ_DEF(pj_status_t) pj_stun_session_set_user_data( pj_stun_session *sess, void *user_data) { PJ_ASSERT_RETURN(sess, PJ_EINVAL); - pj_lock_acquire(sess->lock); + pj_grp_lock_acquire(sess->grp_lock); sess->user_data = user_data; - pj_lock_release(sess->lock); + pj_grp_lock_release(sess->grp_lock); return PJ_SUCCESS; } @@ -586,35 +621,16 @@ PJ_DEF(void*) pj_stun_session_get_user_data(pj_stun_session *sess) return sess->user_data; } -PJ_DEF(pj_status_t) pj_stun_session_set_lock( pj_stun_session *sess, - pj_lock_t *lock, - pj_bool_t auto_del) -{ - pj_lock_t *old_lock = sess->lock; - pj_bool_t old_del; - - PJ_ASSERT_RETURN(sess && lock, PJ_EINVAL); - - pj_lock_acquire(old_lock); - sess->lock = lock; - old_del = sess->delete_lock; - sess->delete_lock = auto_del; - pj_lock_release(old_lock); - - if (old_lock) - pj_lock_destroy(old_lock); - - return PJ_SUCCESS; -} - PJ_DEF(pj_status_t) pj_stun_session_set_software_name(pj_stun_session *sess, const pj_str_t *sw) { PJ_ASSERT_RETURN(sess, PJ_EINVAL); + pj_grp_lock_acquire(sess->grp_lock); if (sw && sw->slen) pj_strdup(sess->pool, &sess->srv_name, sw); else sess->srv_name.slen = 0; + pj_grp_lock_release(sess->grp_lock); return PJ_SUCCESS; } @@ -624,6 +640,7 @@ PJ_DEF(pj_status_t) pj_stun_session_set_credential(pj_stun_session *sess, { PJ_ASSERT_RETURN(sess, PJ_EINVAL); + pj_grp_lock_acquire(sess->grp_lock); sess->auth_type = auth_type; if (cred) { pj_stun_auth_cred_dup(sess->pool, &sess->cred, cred); @@ -631,6 +648,7 @@ PJ_DEF(pj_status_t) pj_stun_session_set_credential(pj_stun_session *sess, sess->auth_type = PJ_STUN_AUTH_NONE; pj_bzero(&sess->cred, sizeof(sess->cred)); } + pj_grp_lock_release(sess->grp_lock); return PJ_SUCCESS; } @@ -707,17 +725,21 @@ PJ_DEF(pj_status_t) pj_stun_session_create_req(pj_stun_session *sess, PJ_ASSERT_RETURN(sess && p_tdata, PJ_EINVAL); + pj_grp_lock_acquire(sess->grp_lock); + if (sess->is_destroying) { + pj_grp_lock_release(sess->grp_lock); + return PJ_EINVALIDOP; + } + status = create_tdata(sess, &tdata); if (status != PJ_SUCCESS) - return status; + goto on_error; /* Create STUN message */ status = pj_stun_msg_create(tdata->pool, method, magic, tsx_id, &tdata->msg); - if (status != PJ_SUCCESS) { - pj_pool_release(tdata->pool); - return status; - } + if (status != PJ_SUCCESS) + goto on_error; /* copy the request's transaction ID as the transaction key. */ pj_assert(sizeof(tdata->msg_key)==sizeof(tdata->msg->hdr.tsx_id)); @@ -733,10 +755,8 @@ PJ_DEF(pj_status_t) pj_stun_session_create_req(pj_stun_session *sess, } else if (sess->auth_type == PJ_STUN_AUTH_SHORT_TERM) { /* MUST put authentication in request */ status = get_auth(sess, tdata); - if (status != PJ_SUCCESS) { - pj_pool_release(tdata->pool); - return status; - } + if (status != PJ_SUCCESS) + goto on_error; } else if (sess->auth_type == PJ_STUN_AUTH_LONG_TERM) { /* Only put authentication information if we've received @@ -744,22 +764,27 @@ PJ_DEF(pj_status_t) pj_stun_session_create_req(pj_stun_session *sess, */ if (sess->next_nonce.slen != 0) { status = get_auth(sess, tdata); - if (status != PJ_SUCCESS) { - pj_pool_release(tdata->pool); - return status; - } + if (status != PJ_SUCCESS) + goto on_error; tdata->auth_info.nonce = sess->next_nonce; tdata->auth_info.realm = sess->server_realm; } } else { pj_assert(!"Invalid authentication type"); - pj_pool_release(tdata->pool); - return PJ_EBUG; + status = PJ_EBUG; + goto on_error; } *p_tdata = tdata; + pj_grp_lock_release(sess->grp_lock); return PJ_SUCCESS; + +on_error: + if (tdata) + pj_pool_release(tdata->pool); + pj_grp_lock_release(sess->grp_lock); + return status; } PJ_DEF(pj_status_t) pj_stun_session_create_ind(pj_stun_session *sess, @@ -771,9 +796,17 @@ PJ_DEF(pj_status_t) pj_stun_session_create_ind(pj_stun_session *sess, PJ_ASSERT_RETURN(sess && p_tdata, PJ_EINVAL); + pj_grp_lock_acquire(sess->grp_lock); + if (sess->is_destroying) { + pj_grp_lock_release(sess->grp_lock); + return PJ_EINVALIDOP; + } + status = create_tdata(sess, &tdata); - if (status != PJ_SUCCESS) + if (status != PJ_SUCCESS) { + pj_grp_lock_release(sess->grp_lock); return status; + } /* Create STUN message */ msg_type |= PJ_STUN_INDICATION_BIT; @@ -781,10 +814,13 @@ PJ_DEF(pj_status_t) pj_stun_session_create_ind(pj_stun_session *sess, NULL, &tdata->msg); if (status != PJ_SUCCESS) { pj_pool_release(tdata->pool); + pj_grp_lock_release(sess->grp_lock); return status; } *p_tdata = tdata; + + pj_grp_lock_release(sess->grp_lock); return PJ_SUCCESS; } @@ -800,15 +836,24 @@ PJ_DEF(pj_status_t) pj_stun_session_create_res( pj_stun_session *sess, pj_status_t status; pj_stun_tx_data *tdata = NULL; + pj_grp_lock_acquire(sess->grp_lock); + if (sess->is_destroying) { + pj_grp_lock_release(sess->grp_lock); + return PJ_EINVALIDOP; + } + status = create_tdata(sess, &tdata); - if (status != PJ_SUCCESS) + if (status != PJ_SUCCESS) { + pj_grp_lock_release(sess->grp_lock); return status; + } /* Create STUN response message */ status = pj_stun_msg_create_response(tdata->pool, rdata->msg, err_code, err_msg, &tdata->msg); if (status != PJ_SUCCESS) { pj_pool_release(tdata->pool); + pj_grp_lock_release(sess->grp_lock); return status; } @@ -823,6 +868,8 @@ PJ_DEF(pj_status_t) pj_stun_session_create_res( pj_stun_session *sess, *p_tdata = tdata; + pj_grp_lock_release(sess->grp_lock); + return PJ_SUCCESS; } @@ -869,6 +916,13 @@ PJ_DEF(pj_status_t) pj_stun_session_send_msg( pj_stun_session *sess, PJ_ASSERT_RETURN(sess && addr_len && server && tdata, PJ_EINVAL); + /* Lock the session and prevent user from destroying us in the callback */ + pj_grp_lock_acquire(sess->grp_lock); + if (sess->is_destroying) { + pj_grp_lock_release(sess->grp_lock); + return PJ_EINVALIDOP; + } + pj_log_push_indent(); /* Allocate packet */ @@ -878,10 +932,6 @@ PJ_DEF(pj_status_t) pj_stun_session_send_msg( pj_stun_session *sess, tdata->token = token; tdata->retransmit = retransmit; - /* Lock the session and prevent user from destroying us in the callback */ - pj_atomic_inc(sess->busy); - pj_lock_acquire(sess->lock); - /* Apply options */ status = apply_msg_options(sess, tdata->pool, &tdata->auth_info, tdata->msg); @@ -911,7 +961,8 @@ PJ_DEF(pj_status_t) pj_stun_session_send_msg( pj_stun_session *sess, if (PJ_STUN_IS_REQUEST(tdata->msg->hdr.type)) { /* Create STUN client transaction */ - status = pj_stun_client_tsx_create(sess->cfg, tdata->pool, + status = pj_stun_client_tsx_create(sess->cfg, tdata->pool, + sess->grp_lock, &tsx_cb, &tdata->client_tsx); PJ_ASSERT_RETURN(status==PJ_SUCCESS, status); pj_stun_client_tsx_set_data(tdata->client_tsx, (void*)tdata); @@ -941,17 +992,17 @@ PJ_DEF(pj_status_t) pj_stun_session_send_msg( pj_stun_session *sess, pj_time_val timeout; pj_memset(&tdata->res_timer, 0, sizeof(tdata->res_timer)); - pj_timer_entry_init(&tdata->res_timer, PJ_TRUE, tdata, + pj_timer_entry_init(&tdata->res_timer, PJ_FALSE, tdata, &on_cache_timeout); timeout.sec = sess->cfg->res_cache_msec / 1000; timeout.msec = sess->cfg->res_cache_msec % 1000; - status = pj_timer_heap_schedule(sess->cfg->timer_heap, - &tdata->res_timer, - &timeout); + status = pj_timer_heap_schedule_w_grp_lock(sess->cfg->timer_heap, + &tdata->res_timer, + &timeout, PJ_TRUE, + sess->grp_lock); if (status != PJ_SUCCESS) { - tdata->res_timer.id = PJ_FALSE; pj_stun_msg_destroy_tdata(sess, tdata); LOG_ERR_(sess, "Error scheduling response timer", status); goto on_return; @@ -977,15 +1028,10 @@ PJ_DEF(pj_status_t) pj_stun_session_send_msg( pj_stun_session *sess, } on_return: - pj_lock_release(sess->lock); - pj_log_pop_indent(); - /* Check if application has called destroy() in the callback */ - if (pj_atomic_dec_and_get(sess->busy)==0 && sess->destroy_request) { - pj_stun_session_destroy(sess); - return PJNATH_ESTUNDESTROYED; - } + if (pj_grp_lock_release(sess->grp_lock)) + return PJ_EGONE; return status; } @@ -1007,14 +1053,25 @@ PJ_DEF(pj_status_t) pj_stun_session_respond( pj_stun_session *sess, pj_str_t reason; pj_stun_tx_data *tdata; + pj_grp_lock_acquire(sess->grp_lock); + if (sess->is_destroying) { + pj_grp_lock_release(sess->grp_lock); + return PJ_EINVALIDOP; + } + status = pj_stun_session_create_res(sess, rdata, code, (errmsg?pj_cstr(&reason,errmsg):NULL), &tdata); - if (status != PJ_SUCCESS) + if (status != PJ_SUCCESS) { + pj_grp_lock_release(sess->grp_lock); return status; + } - return pj_stun_session_send_msg(sess, token, cache, PJ_FALSE, - dst_addr, addr_len, tdata); + status = pj_stun_session_send_msg(sess, token, cache, PJ_FALSE, + dst_addr, addr_len, tdata); + + pj_grp_lock_release(sess->grp_lock); + return status; } @@ -1031,8 +1088,11 @@ PJ_DEF(pj_status_t) pj_stun_session_cancel_req( pj_stun_session *sess, PJ_ASSERT_RETURN(PJ_STUN_IS_REQUEST(tdata->msg->hdr.type), PJ_EINVAL); /* Lock the session and prevent user from destroying us in the callback */ - pj_atomic_inc(sess->busy); - pj_lock_acquire(sess->lock); + pj_grp_lock_acquire(sess->grp_lock); + if (sess->is_destroying) { + pj_grp_lock_release(sess->grp_lock); + return PJ_EINVALIDOP; + } if (notify) { (sess->cb.on_request_complete)(sess, notify_status, tdata->token, @@ -1042,12 +1102,7 @@ PJ_DEF(pj_status_t) pj_stun_session_cancel_req( pj_stun_session *sess, /* Just destroy tdata. This will destroy the transaction as well */ pj_stun_msg_destroy_tdata(sess, tdata); - pj_lock_release(sess->lock); - - if (pj_atomic_dec_and_get(sess->busy)==0 && sess->destroy_request) { - pj_stun_session_destroy(sess); - return PJNATH_ESTUNDESTROYED; - } + pj_grp_lock_release(sess->grp_lock); return PJ_SUCCESS; } @@ -1056,7 +1111,8 @@ PJ_DEF(pj_status_t) pj_stun_session_cancel_req( pj_stun_session *sess, * Explicitly request retransmission of the request. */ PJ_DEF(pj_status_t) pj_stun_session_retransmit_req(pj_stun_session *sess, - pj_stun_tx_data *tdata) + pj_stun_tx_data *tdata, + pj_bool_t mod_count) { pj_status_t status; @@ -1064,17 +1120,15 @@ PJ_DEF(pj_status_t) pj_stun_session_retransmit_req(pj_stun_session *sess, PJ_ASSERT_RETURN(PJ_STUN_IS_REQUEST(tdata->msg->hdr.type), PJ_EINVAL); /* Lock the session and prevent user from destroying us in the callback */ - pj_atomic_inc(sess->busy); - pj_lock_acquire(sess->lock); - - status = pj_stun_client_tsx_retransmit(tdata->client_tsx); + pj_grp_lock_acquire(sess->grp_lock); + if (sess->is_destroying) { + pj_grp_lock_release(sess->grp_lock); + return PJ_EINVALIDOP; + } - pj_lock_release(sess->lock); + status = pj_stun_client_tsx_retransmit(tdata->client_tsx, mod_count); - if (pj_atomic_dec_and_get(sess->busy)==0 && sess->destroy_request) { - pj_stun_session_destroy(sess); - return PJNATH_ESTUNDESTROYED; - } + pj_grp_lock_release(sess->grp_lock); return status; } @@ -1362,11 +1416,15 @@ PJ_DEF(pj_status_t) pj_stun_session_on_rx_pkt(pj_stun_session *sess, PJ_ASSERT_RETURN(sess && packet && pkt_size, PJ_EINVAL); - pj_log_push_indent(); - /* Lock the session and prevent user from destroying us in the callback */ - pj_atomic_inc(sess->busy); - pj_lock_acquire(sess->lock); + pj_grp_lock_acquire(sess->grp_lock); + + if (sess->is_destroying) { + pj_grp_lock_release(sess->grp_lock); + return PJ_EINVALIDOP; + } + + pj_log_push_indent(); /* Reset pool */ pj_pool_reset(sess->rx_pool); @@ -1419,17 +1477,10 @@ PJ_DEF(pj_status_t) pj_stun_session_on_rx_pkt(pj_stun_session *sess, } on_return: - pj_lock_release(sess->lock); - pj_log_pop_indent(); - /* If we've received destroy request while we're on the callback, - * destroy the session now. - */ - if (pj_atomic_dec_and_get(sess->busy)==0 && sess->destroy_request) { - pj_stun_session_destroy(sess); - return PJNATH_ESTUNDESTROYED; - } + if (pj_grp_lock_release(sess->grp_lock)) + return PJ_EGONE; return status; } diff --git a/pjnath/src/pjnath/stun_sock.c b/pjnath/src/pjnath/stun_sock.c index ff7dc16..cc7bd2c 100644 --- a/pjnath/src/pjnath/stun_sock.c +++ b/pjnath/src/pjnath/stun_sock.c @@ -1,4 +1,4 @@ -/* $Id: stun_sock.c 3999 2012-03-30 07:10:13Z bennylp $ */ +/* $Id: stun_sock.c 4360 2013-02-21 11:26:35Z bennylp $ */ /* * Copyright (C) 2008-2011 Teluu Inc. (http://www.teluu.com) * Copyright (C) 2003-2008 Benny Prijono <benny@prijono.org> @@ -28,16 +28,24 @@ #include <pj/assert.h> #include <pj/ip_helper.h> #include <pj/log.h> +#include <pj/os.h> #include <pj/pool.h> #include <pj/rand.h> +#if 1 +# define TRACE_(x) PJ_LOG(5,x) +#else +# define TRACE_(x) +#endif + +enum { MAX_BIND_RETRY = 100 }; struct pj_stun_sock { char *obj_name; /* Log identification */ pj_pool_t *pool; /* Pool */ void *user_data; /* Application user data */ - + pj_bool_t is_destroying; /* Destroy already called */ int af; /* Address family */ pj_stun_config stun_cfg; /* STUN config (ioqueue etc)*/ pj_stun_sock_cb cb; /* Application callbacks */ @@ -56,13 +64,16 @@ struct pj_stun_sock pj_uint16_t tsx_id[6]; /* .. to match STUN msg */ pj_stun_session *stun_sess; /* STUN session */ - + pj_grp_lock_t *grp_lock; /* Session group lock */ }; /* * Prototypes for static functions */ +/* Destructor for group lock */ +static void stun_sock_destructor(void *obj); + /* This callback is called by the STUN session to send packet */ static pj_status_t sess_on_send_msg(pj_stun_session *sess, void *token, @@ -162,7 +173,9 @@ PJ_DEF(pj_status_t) pj_stun_sock_create( pj_stun_config *stun_cfg, pj_pool_t *pool; pj_stun_sock *stun_sock; pj_stun_sock_cfg default_cfg; + pj_sockaddr bound_addr; unsigned i; + pj_uint16_t max_bind_retry; pj_status_t status; PJ_ASSERT_RETURN(stun_cfg && cb && p_stun_sock, PJ_EINVAL); @@ -198,6 +211,20 @@ PJ_DEF(pj_status_t) pj_stun_sock_create( pj_stun_config *stun_cfg, if (stun_sock->ka_interval == 0) stun_sock->ka_interval = PJ_STUN_KEEP_ALIVE_SEC; + if (cfg && cfg->grp_lock) { + stun_sock->grp_lock = cfg->grp_lock; + } else { + status = pj_grp_lock_create(pool, NULL, &stun_sock->grp_lock); + if (status != PJ_SUCCESS) { + pj_pool_release(pool); + return status; + } + } + + pj_grp_lock_add_ref(stun_sock->grp_lock); + pj_grp_lock_add_handler(stun_sock->grp_lock, pool, stun_sock, + &stun_sock_destructor); + /* Create socket and bind socket */ status = pj_sock_socket(af, pj_SOCK_DGRAM(), 0, &stun_sock->sock_fd); if (status != PJ_SUCCESS) @@ -211,17 +238,17 @@ PJ_DEF(pj_status_t) pj_stun_sock_create( pj_stun_config *stun_cfg, goto on_error; /* Bind socket */ - if (pj_sockaddr_has_addr(&cfg->bound_addr)) { - status = pj_sock_bind(stun_sock->sock_fd, &cfg->bound_addr, - pj_sockaddr_get_len(&cfg->bound_addr)); - } else { - pj_sockaddr bound_addr; - - pj_sockaddr_init(af, &bound_addr, NULL, 0); - status = pj_sock_bind(stun_sock->sock_fd, &bound_addr, - pj_sockaddr_get_len(&bound_addr)); + max_bind_retry = MAX_BIND_RETRY; + if (cfg->port_range && cfg->port_range < max_bind_retry) + max_bind_retry = cfg->port_range; + pj_sockaddr_init(af, &bound_addr, NULL, 0); + if (cfg->bound_addr.addr.sa_family == pj_AF_INET() || + cfg->bound_addr.addr.sa_family == pj_AF_INET6()) + { + pj_sockaddr_cp(&bound_addr, &cfg->bound_addr); } - + status = pj_sock_bind_random(stun_sock->sock_fd, &bound_addr, + cfg->port_range, max_bind_retry); if (status != PJ_SUCCESS) goto on_error; @@ -248,6 +275,7 @@ PJ_DEF(pj_status_t) pj_stun_sock_create( pj_stun_config *stun_cfg, pj_activesock_cb activesock_cb; pj_activesock_cfg_default(&activesock_cfg); + activesock_cfg.grp_lock = stun_sock->grp_lock; activesock_cfg.async_cnt = cfg->async_cnt; activesock_cfg.concurrency = 0; @@ -286,6 +314,7 @@ PJ_DEF(pj_status_t) pj_stun_sock_create( pj_stun_config *stun_cfg, status = pj_stun_session_create(&stun_sock->stun_cfg, stun_sock->obj_name, &sess_cb, PJ_FALSE, + stun_sock->grp_lock, &stun_sock->stun_sess); if (status != PJ_SUCCESS) goto on_error; @@ -328,6 +357,8 @@ PJ_DEF(pj_status_t) pj_stun_sock_start( pj_stun_sock *stun_sock, PJ_ASSERT_RETURN(stun_sock && domain && default_port, PJ_EINVAL); + pj_grp_lock_acquire(stun_sock->grp_lock); + /* Check whether the domain contains IP address */ stun_sock->srv_addr.addr.sa_family = (pj_uint16_t)stun_sock->af; status = pj_inet_pton(stun_sock->af, domain, @@ -356,7 +387,6 @@ PJ_DEF(pj_status_t) pj_stun_sock_start( pj_stun_sock *stun_sock, &stun_sock->q); /* Processing will resume when the DNS SRV callback is called */ - return status; } else { @@ -374,40 +404,29 @@ PJ_DEF(pj_status_t) pj_stun_sock_start( pj_stun_sock *stun_sock, pj_sockaddr_set_port(&stun_sock->srv_addr, (pj_uint16_t)default_port); /* Start sending Binding request */ - return get_mapped_addr(stun_sock); + status = get_mapped_addr(stun_sock); } + + pj_grp_lock_release(stun_sock->grp_lock); + return status; } -/* Destroy */ -PJ_DEF(pj_status_t) pj_stun_sock_destroy(pj_stun_sock *stun_sock) +/* Destructor */ +static void stun_sock_destructor(void *obj) { + pj_stun_sock *stun_sock = (pj_stun_sock*)obj; + if (stun_sock->q) { pj_dns_srv_cancel_query(stun_sock->q, PJ_FALSE); stun_sock->q = NULL; } - /* Destroy the active socket first just in case we'll get - * stray callback. - */ - if (stun_sock->active_sock != NULL) { - pj_activesock_close(stun_sock->active_sock); - stun_sock->active_sock = NULL; - stun_sock->sock_fd = PJ_INVALID_SOCKET; - } else if (stun_sock->sock_fd != PJ_INVALID_SOCKET) { - pj_sock_close(stun_sock->sock_fd); - stun_sock->sock_fd = PJ_INVALID_SOCKET; - } - - if (stun_sock->ka_timer.id != 0) { - pj_timer_heap_cancel(stun_sock->stun_cfg.timer_heap, - &stun_sock->ka_timer); - stun_sock->ka_timer.id = 0; - } - + /* if (stun_sock->stun_sess) { pj_stun_session_destroy(stun_sock->stun_sess); stun_sock->stun_sess = NULL; } + */ if (stun_sock->pool) { pj_pool_t *pool = stun_sock->pool; @@ -415,6 +434,40 @@ PJ_DEF(pj_status_t) pj_stun_sock_destroy(pj_stun_sock *stun_sock) pj_pool_release(pool); } + TRACE_(("", "STUN sock %p destroyed", stun_sock)); + +} + +/* Destroy */ +PJ_DEF(pj_status_t) pj_stun_sock_destroy(pj_stun_sock *stun_sock) +{ + TRACE_((stun_sock->obj_name, "STUN sock %p request, ref_cnt=%d", + stun_sock, pj_grp_lock_get_ref(stun_sock->grp_lock))); + + pj_grp_lock_acquire(stun_sock->grp_lock); + if (stun_sock->is_destroying) { + /* Destroy already called */ + pj_grp_lock_release(stun_sock->grp_lock); + return PJ_EINVALIDOP; + } + + stun_sock->is_destroying = PJ_TRUE; + pj_timer_heap_cancel_if_active(stun_sock->stun_cfg.timer_heap, + &stun_sock->ka_timer, 0); + + if (stun_sock->active_sock != NULL) { + stun_sock->sock_fd = PJ_INVALID_SOCKET; + pj_activesock_close(stun_sock->active_sock); + } else if (stun_sock->sock_fd != PJ_INVALID_SOCKET) { + pj_sock_close(stun_sock->sock_fd); + stun_sock->sock_fd = PJ_INVALID_SOCKET; + } + + if (stun_sock->stun_sess) { + pj_stun_session_destroy(stun_sock->stun_sess); + } + pj_grp_lock_dec_ref(stun_sock->grp_lock); + pj_grp_lock_release(stun_sock->grp_lock); return PJ_SUCCESS; } @@ -458,12 +511,15 @@ static void dns_srv_resolver_cb(void *user_data, { pj_stun_sock *stun_sock = (pj_stun_sock*) user_data; + pj_grp_lock_acquire(stun_sock->grp_lock); + /* Clear query */ stun_sock->q = NULL; /* Handle error */ if (status != PJ_SUCCESS) { sess_fail(stun_sock, PJ_STUN_SOCK_DNS_OP, status); + pj_grp_lock_release(stun_sock->grp_lock); return; } @@ -480,6 +536,8 @@ static void dns_srv_resolver_cb(void *user_data, /* Start sending Binding request */ get_mapped_addr(stun_sock); + + pj_grp_lock_release(stun_sock->grp_lock); } @@ -523,6 +581,8 @@ PJ_DEF(pj_status_t) pj_stun_sock_get_info( pj_stun_sock *stun_sock, PJ_ASSERT_RETURN(stun_sock && info, PJ_EINVAL); + pj_grp_lock_acquire(stun_sock->grp_lock); + /* Copy STUN server address and mapped address */ pj_memcpy(&info->srv_addr, &stun_sock->srv_addr, sizeof(pj_sockaddr)); @@ -533,8 +593,10 @@ PJ_DEF(pj_status_t) pj_stun_sock_get_info( pj_stun_sock *stun_sock, addr_len = sizeof(info->bound_addr); status = pj_sock_getsockname(stun_sock->sock_fd, &info->bound_addr, &addr_len); - if (status != PJ_SUCCESS) + if (status != PJ_SUCCESS) { + pj_grp_lock_release(stun_sock->grp_lock); return status; + } /* If socket is bound to a specific interface, then only put that * interface in the alias list. Otherwise query all the interfaces @@ -550,8 +612,10 @@ PJ_DEF(pj_status_t) pj_stun_sock_get_info( pj_stun_sock *stun_sock, /* Get the default address */ status = pj_gethostip(stun_sock->af, &def_addr); - if (status != PJ_SUCCESS) + if (status != PJ_SUCCESS) { + pj_grp_lock_release(stun_sock->grp_lock); return status; + } pj_sockaddr_set_port(&def_addr, port); @@ -559,8 +623,10 @@ PJ_DEF(pj_status_t) pj_stun_sock_get_info( pj_stun_sock *stun_sock, info->alias_cnt = PJ_ARRAY_SIZE(info->aliases); status = pj_enum_ip_interface(stun_sock->af, &info->alias_cnt, info->aliases); - if (status != PJ_SUCCESS) + if (status != PJ_SUCCESS) { + pj_grp_lock_release(stun_sock->grp_lock); return status; + } /* Set the port number for each address. */ @@ -580,6 +646,7 @@ PJ_DEF(pj_status_t) pj_stun_sock_get_info( pj_stun_sock *stun_sock, } } + pj_grp_lock_release(stun_sock->grp_lock); return PJ_SUCCESS; } @@ -593,14 +660,29 @@ PJ_DEF(pj_status_t) pj_stun_sock_sendto( pj_stun_sock *stun_sock, unsigned addr_len) { pj_ssize_t size; + pj_status_t status; + PJ_ASSERT_RETURN(stun_sock && pkt && dst_addr && addr_len, PJ_EINVAL); + pj_grp_lock_acquire(stun_sock->grp_lock); + + if (!stun_sock->active_sock) { + /* We have been shutdown, but this callback may still get called + * by retransmit timer. + */ + pj_grp_lock_release(stun_sock->grp_lock); + return PJ_EINVALIDOP; + } + if (send_key==NULL) send_key = &stun_sock->send_key; size = pkt_len; - return pj_activesock_sendto(stun_sock->active_sock, send_key, - pkt, &size, flag, dst_addr, addr_len); + status = pj_activesock_sendto(stun_sock->active_sock, send_key, + pkt, &size, flag, dst_addr, addr_len); + + pj_grp_lock_release(stun_sock->grp_lock); + return status; } /* This callback is called by the STUN session to send packet */ @@ -615,12 +697,18 @@ static pj_status_t sess_on_send_msg(pj_stun_session *sess, pj_ssize_t size; stun_sock = (pj_stun_sock *) pj_stun_session_get_user_data(sess); + if (!stun_sock || !stun_sock->active_sock) { + /* We have been shutdown, but this callback may still get called + * by retransmit timer. + */ + return PJ_EINVALIDOP; + } pj_assert(token==INTERNAL_MSG_TOKEN); PJ_UNUSED_ARG(token); size = pkt_size; - return pj_activesock_sendto(stun_sock->active_sock, + return pj_activesock_sendto(stun_sock->active_sock, &stun_sock->int_send_key, pkt, &size, 0, dst_addr, addr_len); } @@ -643,6 +731,8 @@ static void sess_on_request_complete(pj_stun_session *sess, pj_bool_t resched = PJ_TRUE; stun_sock = (pj_stun_sock *) pj_stun_session_get_user_data(sess); + if (!stun_sock) + return; PJ_UNUSED_ARG(tdata); PJ_UNUSED_ARG(token); @@ -712,25 +802,20 @@ on_return: /* Schedule keep-alive timer */ static void start_ka_timer(pj_stun_sock *stun_sock) { - if (stun_sock->ka_timer.id != 0) { - pj_timer_heap_cancel(stun_sock->stun_cfg.timer_heap, - &stun_sock->ka_timer); - stun_sock->ka_timer.id = 0; - } + pj_timer_heap_cancel_if_active(stun_sock->stun_cfg.timer_heap, + &stun_sock->ka_timer, 0); pj_assert(stun_sock->ka_interval != 0); - if (stun_sock->ka_interval > 0) { + if (stun_sock->ka_interval > 0 && !stun_sock->is_destroying) { pj_time_val delay; delay.sec = stun_sock->ka_interval; delay.msec = 0; - if (pj_timer_heap_schedule(stun_sock->stun_cfg.timer_heap, - &stun_sock->ka_timer, - &delay) == PJ_SUCCESS) - { - stun_sock->ka_timer.id = PJ_TRUE; - } + pj_timer_heap_schedule_w_grp_lock(stun_sock->stun_cfg.timer_heap, + &stun_sock->ka_timer, + &delay, PJ_TRUE, + stun_sock->grp_lock); } } @@ -742,14 +827,18 @@ static void ka_timer_cb(pj_timer_heap_t *th, pj_timer_entry *te) stun_sock = (pj_stun_sock *) te->user_data; PJ_UNUSED_ARG(th); + pj_grp_lock_acquire(stun_sock->grp_lock); /* Time to send STUN Binding request */ - if (get_mapped_addr(stun_sock) != PJ_SUCCESS) + if (get_mapped_addr(stun_sock) != PJ_SUCCESS) { + pj_grp_lock_release(stun_sock->grp_lock); return; + } /* Next keep-alive timer will be scheduled once the request * is complete. */ + pj_grp_lock_release(stun_sock->grp_lock); } /* Callback from active socket when incoming packet is received */ @@ -765,6 +854,8 @@ static pj_bool_t on_data_recvfrom(pj_activesock_t *asock, pj_uint16_t type; stun_sock = (pj_stun_sock*) pj_activesock_get_user_data(asock); + if (!stun_sock) + return PJ_FALSE; /* Log socket error */ if (status != PJ_SUCCESS) { @@ -772,6 +863,8 @@ static pj_bool_t on_data_recvfrom(pj_activesock_t *asock, return PJ_TRUE; } + pj_grp_lock_acquire(stun_sock->grp_lock); + /* Check that this is STUN message */ status = pj_stun_msg_check((const pj_uint8_t*)data, size, PJ_STUN_IS_DATAGRAM | PJ_STUN_CHECK_PACKET); @@ -807,7 +900,10 @@ static pj_bool_t on_data_recvfrom(pj_activesock_t *asock, status = pj_stun_session_on_rx_pkt(stun_sock->stun_sess, data, size, PJ_STUN_IS_DATAGRAM, NULL, NULL, src_addr, addr_len); - return status!=PJNATH_ESTUNDESTROYED ? PJ_TRUE : PJ_FALSE; + + status = pj_grp_lock_release(stun_sock->grp_lock); + + return status!=PJ_EGONE ? PJ_TRUE : PJ_FALSE; process_app_data: if (stun_sock->cb.on_rx_data) { @@ -815,10 +911,12 @@ process_app_data: ret = (*stun_sock->cb.on_rx_data)(stun_sock, data, size, src_addr, addr_len); - return ret; + status = pj_grp_lock_release(stun_sock->grp_lock); + return status!=PJ_EGONE ? PJ_TRUE : PJ_FALSE; } - return PJ_TRUE; + status = pj_grp_lock_release(stun_sock->grp_lock); + return status!=PJ_EGONE ? PJ_TRUE : PJ_FALSE; } /* Callback from active socket about send status */ @@ -829,6 +927,8 @@ static pj_bool_t on_data_sent(pj_activesock_t *asock, pj_stun_sock *stun_sock; stun_sock = (pj_stun_sock*) pj_activesock_get_user_data(asock); + if (!stun_sock) + return PJ_FALSE; /* Don't report to callback if this is internal message */ if (send_key == &stun_sock->int_send_key) { @@ -839,6 +939,8 @@ static pj_bool_t on_data_sent(pj_activesock_t *asock, if (stun_sock->cb.on_data_sent) { pj_bool_t ret; + pj_grp_lock_acquire(stun_sock->grp_lock); + /* If app gives NULL send_key in sendto() function, then give * NULL in the callback too */ @@ -848,6 +950,7 @@ static pj_bool_t on_data_sent(pj_activesock_t *asock, /* Call callback */ ret = (*stun_sock->cb.on_data_sent)(stun_sock, send_key, sent); + pj_grp_lock_release(stun_sock->grp_lock); return ret; } diff --git a/pjnath/src/pjnath/stun_transaction.c b/pjnath/src/pjnath/stun_transaction.c index d714ecf..58eca26 100644 --- a/pjnath/src/pjnath/stun_transaction.c +++ b/pjnath/src/pjnath/stun_transaction.c @@ -1,4 +1,4 @@ -/* $Id: stun_transaction.c 3753 2011-09-18 14:59:56Z bennylp $ */ +/* $Id: stun_transaction.c 4413 2013-03-05 06:29:15Z ming $ */ /* * Copyright (C) 2008-2011 Teluu Inc. (http://www.teluu.com) * Copyright (C) 2003-2008 Benny Prijono <benny@prijono.org> @@ -26,6 +26,8 @@ #include <pj/timer.h> +#define THIS_FILE "stun_transaction.c" +#define TIMER_INACTIVE 0 #define TIMER_ACTIVE 1 @@ -34,6 +36,7 @@ struct pj_stun_client_tsx char obj_name[PJ_MAX_OBJ_NAME]; pj_stun_tsx_cb cb; void *user_data; + pj_grp_lock_t *grp_lock; pj_bool_t complete; @@ -51,18 +54,24 @@ struct pj_stun_client_tsx }; +#if 1 +# define TRACE_(expr) PJ_LOG(5,expr) +#else +# define TRACE_(expr) +#endif + + static void retransmit_timer_callback(pj_timer_heap_t *timer_heap, pj_timer_entry *timer); static void destroy_timer_callback(pj_timer_heap_t *timer_heap, pj_timer_entry *timer); -#define stun_perror(tsx,msg,rc) pjnath_perror(tsx->obj_name, msg, rc) - /* * Create a STUN client transaction. */ PJ_DEF(pj_status_t) pj_stun_client_tsx_create(pj_stun_config *cfg, pj_pool_t *pool, + pj_grp_lock_t *grp_lock, const pj_stun_tsx_cb *cb, pj_stun_client_tsx **p_tsx) { @@ -74,6 +83,7 @@ PJ_DEF(pj_status_t) pj_stun_client_tsx_create(pj_stun_config *cfg, tsx = PJ_POOL_ZALLOC_T(pool, pj_stun_client_tsx); tsx->rto_msec = cfg->rto_msec; tsx->timer_heap = cfg->timer_heap; + tsx->grp_lock = grp_lock; pj_memcpy(&tsx->cb, cb, sizeof(*cb)); tsx->retransmit_timer.cb = &retransmit_timer_callback; @@ -82,7 +92,7 @@ PJ_DEF(pj_status_t) pj_stun_client_tsx_create(pj_stun_config *cfg, tsx->destroy_timer.cb = &destroy_timer_callback; tsx->destroy_timer.user_data = tsx; - pj_ansi_snprintf(tsx->obj_name, sizeof(tsx->obj_name), "stuntsx%p", tsx); + pj_ansi_snprintf(tsx->obj_name, sizeof(tsx->obj_name), "utsx%p", tsx); *p_tsx = tsx; @@ -100,26 +110,30 @@ PJ_DEF(pj_status_t) pj_stun_client_tsx_schedule_destroy( PJ_ASSERT_RETURN(tsx && delay, PJ_EINVAL); PJ_ASSERT_RETURN(tsx->cb.on_destroy, PJ_EINVAL); + pj_grp_lock_acquire(tsx->grp_lock); + /* Cancel previously registered timer */ - if (tsx->destroy_timer.id != 0) { - pj_timer_heap_cancel(tsx->timer_heap, &tsx->destroy_timer); - tsx->destroy_timer.id = 0; - } + pj_timer_heap_cancel_if_active(tsx->timer_heap, &tsx->destroy_timer, + TIMER_INACTIVE); /* Stop retransmission, just in case */ - if (tsx->retransmit_timer.id != 0) { - pj_timer_heap_cancel(tsx->timer_heap, &tsx->retransmit_timer); - tsx->retransmit_timer.id = 0; - } + pj_timer_heap_cancel_if_active(tsx->timer_heap, &tsx->retransmit_timer, + TIMER_INACTIVE); - status = pj_timer_heap_schedule(tsx->timer_heap, - &tsx->destroy_timer, delay); - if (status != PJ_SUCCESS) + status = pj_timer_heap_schedule_w_grp_lock(tsx->timer_heap, + &tsx->destroy_timer, delay, + TIMER_ACTIVE, tsx->grp_lock); + if (status != PJ_SUCCESS) { + pj_grp_lock_release(tsx->grp_lock); return status; + } - tsx->destroy_timer.id = TIMER_ACTIVE; tsx->cb.on_complete = NULL; + pj_grp_lock_release(tsx->grp_lock); + + TRACE_((tsx->obj_name, "STUN transaction %p schedule destroy", tsx)); + return PJ_SUCCESS; } @@ -127,20 +141,21 @@ PJ_DEF(pj_status_t) pj_stun_client_tsx_schedule_destroy( /* * Destroy transaction immediately. */ -PJ_DEF(pj_status_t) pj_stun_client_tsx_destroy(pj_stun_client_tsx *tsx) +PJ_DEF(pj_status_t) pj_stun_client_tsx_stop(pj_stun_client_tsx *tsx) { PJ_ASSERT_RETURN(tsx, PJ_EINVAL); - if (tsx->retransmit_timer.id != 0) { - pj_timer_heap_cancel(tsx->timer_heap, &tsx->retransmit_timer); - tsx->retransmit_timer.id = 0; - } - if (tsx->destroy_timer.id != 0) { - pj_timer_heap_cancel(tsx->timer_heap, &tsx->destroy_timer); - tsx->destroy_timer.id = 0; - } + /* Don't call grp_lock_acquire() because we might be called on + * group lock's destructor. + */ + pj_timer_heap_cancel_if_active(tsx->timer_heap, &tsx->retransmit_timer, + TIMER_INACTIVE); + pj_timer_heap_cancel_if_active(tsx->timer_heap, &tsx->destroy_timer, + TIMER_INACTIVE); + + PJ_LOG(5,(tsx->obj_name, "STUN client transaction %p stopped, ref_cnt=%d", + tsx, pj_grp_lock_get_ref(tsx->grp_lock))); - PJ_LOG(5,(tsx->obj_name, "STUN client transaction destroyed")); return PJ_SUCCESS; } @@ -180,14 +195,15 @@ PJ_DEF(void*) pj_stun_client_tsx_get_data(pj_stun_client_tsx *tsx) /* * Transmit message. */ -static pj_status_t tsx_transmit_msg(pj_stun_client_tsx *tsx) +static pj_status_t tsx_transmit_msg(pj_stun_client_tsx *tsx, + pj_bool_t mod_count) { pj_status_t status; - PJ_ASSERT_RETURN(tsx->retransmit_timer.id == 0 || + PJ_ASSERT_RETURN(tsx->retransmit_timer.id == TIMER_INACTIVE || !tsx->require_retransmit, PJ_EBUSY); - if (tsx->require_retransmit) { + if (tsx->require_retransmit && mod_count) { /* Calculate retransmit/timeout delay */ if (tsx->transmit_count == 0) { tsx->retransmit_time.sec = 0; @@ -210,18 +226,20 @@ static pj_status_t tsx_transmit_msg(pj_stun_client_tsx *tsx) * cancel it (as opposed to when schedule_timer() failed we cannot * cancel transmission). */; - status = pj_timer_heap_schedule(tsx->timer_heap, - &tsx->retransmit_timer, - &tsx->retransmit_time); + status = pj_timer_heap_schedule_w_grp_lock(tsx->timer_heap, + &tsx->retransmit_timer, + &tsx->retransmit_time, + TIMER_ACTIVE, + tsx->grp_lock); if (status != PJ_SUCCESS) { - tsx->retransmit_timer.id = 0; + tsx->retransmit_timer.id = TIMER_INACTIVE; return status; } - tsx->retransmit_timer.id = TIMER_ACTIVE; } - tsx->transmit_count++; + if (mod_count) + tsx->transmit_count++; PJ_LOG(5,(tsx->obj_name, "STUN sending message (transmit count=%d)", tsx->transmit_count)); @@ -233,12 +251,12 @@ static pj_status_t tsx_transmit_msg(pj_stun_client_tsx *tsx) if (status == PJNATH_ESTUNDESTROYED) { /* We've been destroyed, don't access the object. */ } else if (status != PJ_SUCCESS) { - if (tsx->retransmit_timer.id != 0) { - pj_timer_heap_cancel(tsx->timer_heap, - &tsx->retransmit_timer); - tsx->retransmit_timer.id = 0; + if (mod_count) { + pj_timer_heap_cancel_if_active( tsx->timer_heap, + &tsx->retransmit_timer, + TIMER_INACTIVE); } - stun_perror(tsx, "STUN error sending message", status); + PJ_PERROR(4, (tsx->obj_name, status, "STUN error sending message")); } pj_log_pop_indent(); @@ -259,6 +277,8 @@ PJ_DEF(pj_status_t) pj_stun_client_tsx_send_msg(pj_stun_client_tsx *tsx, PJ_ASSERT_RETURN(tsx && pkt && pkt_len, PJ_EINVAL); PJ_ASSERT_RETURN(tsx->retransmit_timer.id == 0, PJ_EBUSY); + pj_grp_lock_acquire(tsx->grp_lock); + /* Encode message */ tsx->last_pkt = pkt; tsx->last_pkt_size = pkt_len; @@ -284,27 +304,29 @@ PJ_DEF(pj_status_t) pj_stun_client_tsx_send_msg(pj_stun_client_tsx *tsx, * cancel it (as opposed to when schedule_timer() failed we cannot * cancel transmission). */; - status = pj_timer_heap_schedule(tsx->timer_heap, - &tsx->retransmit_timer, - &tsx->retransmit_time); + status = pj_timer_heap_schedule_w_grp_lock(tsx->timer_heap, + &tsx->retransmit_timer, + &tsx->retransmit_time, + TIMER_ACTIVE, + tsx->grp_lock); if (status != PJ_SUCCESS) { - tsx->retransmit_timer.id = 0; + tsx->retransmit_timer.id = TIMER_INACTIVE; + pj_grp_lock_release(tsx->grp_lock); return status; } - tsx->retransmit_timer.id = TIMER_ACTIVE; } /* Send the message */ - status = tsx_transmit_msg(tsx); + status = tsx_transmit_msg(tsx, PJ_TRUE); if (status != PJ_SUCCESS) { - if (tsx->retransmit_timer.id != 0) { - pj_timer_heap_cancel(tsx->timer_heap, - &tsx->retransmit_timer); - tsx->retransmit_timer.id = 0; - } + pj_timer_heap_cancel_if_active(tsx->timer_heap, + &tsx->retransmit_timer, + TIMER_INACTIVE); + pj_grp_lock_release(tsx->grp_lock); return status; } + pj_grp_lock_release(tsx->grp_lock); return PJ_SUCCESS; } @@ -317,8 +339,12 @@ static void retransmit_timer_callback(pj_timer_heap_t *timer_heap, pj_status_t status; PJ_UNUSED_ARG(timer_heap); + pj_grp_lock_acquire(tsx->grp_lock); if (tsx->transmit_count >= PJ_STUN_MAX_TRANSMIT_COUNT) { + /* tsx may be destroyed when calling the callback below */ + pj_grp_lock_t *grp_lock = tsx->grp_lock; + /* Retransmission count exceeded. Transaction has failed */ tsx->retransmit_timer.id = 0; PJ_LOG(4,(tsx->obj_name, "STUN timeout waiting for response")); @@ -329,16 +355,15 @@ static void retransmit_timer_callback(pj_timer_heap_t *timer_heap, tsx->cb.on_complete(tsx, PJNATH_ESTUNTIMEDOUT, NULL, NULL, 0); } } + pj_grp_lock_release(grp_lock); /* We might have been destroyed, don't try to access the object */ pj_log_pop_indent(); return; } tsx->retransmit_timer.id = 0; - status = tsx_transmit_msg(tsx); - if (status == PJNATH_ESTUNDESTROYED) { - /* We've been destroyed, don't try to access the object */ - } else if (status != PJ_SUCCESS) { + status = tsx_transmit_msg(tsx, PJ_TRUE); + if (status != PJ_SUCCESS) { tsx->retransmit_timer.id = 0; if (!tsx->complete) { tsx->complete = PJ_TRUE; @@ -346,25 +371,26 @@ static void retransmit_timer_callback(pj_timer_heap_t *timer_heap, tsx->cb.on_complete(tsx, status, NULL, NULL, 0); } } - /* We might have been destroyed, don't try to access the object */ } + + pj_grp_lock_release(tsx->grp_lock); + /* We might have been destroyed, don't try to access the object */ } /* * Request to retransmit the request. */ -PJ_DEF(pj_status_t) pj_stun_client_tsx_retransmit(pj_stun_client_tsx *tsx) +PJ_DEF(pj_status_t) pj_stun_client_tsx_retransmit(pj_stun_client_tsx *tsx, + pj_bool_t mod_count) { if (tsx->destroy_timer.id != 0) { return PJ_SUCCESS; } - if (tsx->retransmit_timer.id != 0) { - pj_timer_heap_cancel(tsx->timer_heap, &tsx->retransmit_timer); - tsx->retransmit_timer.id = 0; - } + pj_timer_heap_cancel_if_active(tsx->timer_heap, &tsx->retransmit_timer, + TIMER_INACTIVE); - return tsx_transmit_msg(tsx); + return tsx_transmit_msg(tsx, mod_count); } /* Timer callback to destroy transaction */ @@ -376,6 +402,7 @@ static void destroy_timer_callback(pj_timer_heap_t *timer_heap, PJ_UNUSED_ARG(timer_heap); tsx->destroy_timer.id = PJ_FALSE; + tsx->cb.on_destroy(tsx); /* Don't access transaction after this */ } @@ -405,10 +432,8 @@ PJ_DEF(pj_status_t) pj_stun_client_tsx_on_rx_msg(pj_stun_client_tsx *tsx, /* We have a response with matching transaction ID. * We can cancel retransmit timer now. */ - if (tsx->retransmit_timer.id) { - pj_timer_heap_cancel(tsx->timer_heap, &tsx->retransmit_timer); - tsx->retransmit_timer.id = 0; - } + pj_timer_heap_cancel_if_active(tsx->timer_heap, &tsx->retransmit_timer, + TIMER_INACTIVE); /* Find STUN error code attribute */ err_attr = (pj_stun_errcode_attr*) diff --git a/pjnath/src/pjnath/turn_session.c b/pjnath/src/pjnath/turn_session.c index cbe8f5c..7031cd3 100644 --- a/pjnath/src/pjnath/turn_session.c +++ b/pjnath/src/pjnath/turn_session.c @@ -1,4 +1,4 @@ -/* $Id: turn_session.c 3844 2011-10-24 15:03:43Z bennylp $ */ +/* $Id: turn_session.c 4368 2013-02-21 21:53:28Z bennylp $ */ /* * Copyright (C) 2008-2011 Teluu Inc. (http://www.teluu.com) * Copyright (C) 2003-2008 Benny Prijono <benny@prijono.org> @@ -112,8 +112,9 @@ struct pj_turn_session pj_turn_session_cb cb; void *user_data; pj_stun_config stun_cfg; + pj_bool_t is_destroying; - pj_lock_t *lock; + pj_grp_lock_t *grp_lock; int busy; pj_turn_state_t state; @@ -161,6 +162,7 @@ struct pj_turn_session */ static void sess_shutdown(pj_turn_session *sess, pj_status_t status); +static void turn_sess_on_destroy(void *comp); static void do_destroy(pj_turn_session *sess); static void send_refresh(pj_turn_session *sess, int lifetime); static pj_status_t stun_on_send_msg(pj_stun_session *sess, @@ -236,6 +238,7 @@ PJ_DEF(pj_status_t) pj_turn_session_create( const pj_stun_config *cfg, const char *name, int af, pj_turn_tp_type conn_type, + pj_grp_lock_t *grp_lock, const pj_turn_session_cb *cb, unsigned options, void *user_data, @@ -244,7 +247,6 @@ PJ_DEF(pj_status_t) pj_turn_session_create( const pj_stun_config *cfg, pj_pool_t *pool; pj_turn_session *sess; pj_stun_session_cb stun_cb; - pj_lock_t *null_lock; pj_status_t status; PJ_ASSERT_RETURN(cfg && cfg->pf && cb && p_sess, PJ_EINVAL); @@ -281,13 +283,20 @@ PJ_DEF(pj_status_t) pj_turn_session_create( const pj_stun_config *cfg, sess->perm_table = pj_hash_create(pool, PJ_TURN_PERM_HTABLE_SIZE); /* Session lock */ - status = pj_lock_create_recursive_mutex(pool, sess->obj_name, - &sess->lock); - if (status != PJ_SUCCESS) { - do_destroy(sess); - return status; + if (grp_lock) { + sess->grp_lock = grp_lock; + } else { + status = pj_grp_lock_create(pool, NULL, &sess->grp_lock); + if (status != PJ_SUCCESS) { + pj_pool_release(pool); + return status; + } } + pj_grp_lock_add_ref(sess->grp_lock); + pj_grp_lock_add_handler(sess->grp_lock, pool, sess, + &turn_sess_on_destroy); + /* Timer */ pj_timer_entry_init(&sess->timer, TIMER_NONE, sess, &on_timer_event); @@ -297,7 +306,7 @@ PJ_DEF(pj_status_t) pj_turn_session_create( const pj_stun_config *cfg, stun_cb.on_request_complete = &stun_on_request_complete; stun_cb.on_rx_indication = &stun_on_rx_indication; status = pj_stun_session_create(&sess->stun_cfg, sess->obj_name, &stun_cb, - PJ_FALSE, &sess->stun); + PJ_FALSE, sess->grp_lock, &sess->stun); if (status != PJ_SUCCESS) { do_destroy(sess); return status; @@ -306,16 +315,6 @@ PJ_DEF(pj_status_t) pj_turn_session_create( const pj_stun_config *cfg, /* Attach ourself to STUN session */ pj_stun_session_set_user_data(sess->stun, sess); - /* Replace mutex in STUN session with a NULL mutex, since access to - * STUN session is serialized. - */ - status = pj_lock_create_null_mutex(pool, name, &null_lock); - if (status != PJ_SUCCESS) { - do_destroy(sess); - return status; - } - pj_stun_session_set_lock(sess->stun, null_lock, PJ_TRUE); - /* Done */ PJ_LOG(4,(sess->obj_name, "TURN client session created")); @@ -325,32 +324,9 @@ PJ_DEF(pj_status_t) pj_turn_session_create( const pj_stun_config *cfg, } -/* Destroy */ -static void do_destroy(pj_turn_session *sess) +static void turn_sess_on_destroy(void *comp) { - /* Lock session */ - if (sess->lock) { - pj_lock_acquire(sess->lock); - } - - /* Cancel pending timer, if any */ - if (sess->timer.id != TIMER_NONE) { - pj_timer_heap_cancel(sess->timer_heap, &sess->timer); - sess->timer.id = TIMER_NONE; - } - - /* Destroy STUN session */ - if (sess->stun) { - pj_stun_session_destroy(sess->stun); - sess->stun = NULL; - } - - /* Destroy lock */ - if (sess->lock) { - pj_lock_release(sess->lock); - pj_lock_destroy(sess->lock); - sess->lock = NULL; - } + pj_turn_session *sess = (pj_turn_session*) comp; /* Destroy pool */ if (sess->pool) { @@ -363,6 +339,26 @@ static void do_destroy(pj_turn_session *sess) } } +/* Destroy */ +static void do_destroy(pj_turn_session *sess) +{ + PJ_LOG(4,(sess->obj_name, "TURN session destroy request, ref_cnt=%d", + pj_grp_lock_get_ref(sess->grp_lock))); + + pj_grp_lock_acquire(sess->grp_lock); + if (sess->is_destroying) { + pj_grp_lock_release(sess->grp_lock); + return; + } + + sess->is_destroying = PJ_TRUE; + pj_timer_heap_cancel_if_active(sess->timer_heap, &sess->timer, TIMER_NONE); + pj_stun_session_destroy(sess->stun); + + pj_grp_lock_dec_ref(sess->grp_lock); + pj_grp_lock_release(sess->grp_lock); +} + /* Set session state */ static void set_state(pj_turn_session *sess, enum pj_turn_state_t state) @@ -421,7 +417,10 @@ static void sess_shutdown(pj_turn_session *sess, /* This may recursively call this function again with * state==PJ_TURN_STATE_DEALLOCATED. */ + /* No need to deallocate as we're already deallocating! + * See https://trac.pjsip.org/repos/ticket/1551 send_refresh(sess, 0); + */ break; case PJ_TURN_STATE_DEALLOCATED: case PJ_TURN_STATE_DESTROYING: @@ -434,13 +433,11 @@ static void sess_shutdown(pj_turn_session *sess, set_state(sess, PJ_TURN_STATE_DESTROYING); - if (sess->timer.id != TIMER_NONE) { - pj_timer_heap_cancel(sess->timer_heap, &sess->timer); - sess->timer.id = TIMER_NONE; - } - - sess->timer.id = TIMER_DESTROY; - pj_timer_heap_schedule(sess->timer_heap, &sess->timer, &delay); + pj_timer_heap_cancel_if_active(sess->timer_heap, &sess->timer, + TIMER_NONE); + pj_timer_heap_schedule_w_grp_lock(sess->timer_heap, &sess->timer, + &delay, TIMER_DESTROY, + sess->grp_lock); } } @@ -452,11 +449,11 @@ PJ_DEF(pj_status_t) pj_turn_session_shutdown(pj_turn_session *sess) { PJ_ASSERT_RETURN(sess, PJ_EINVAL); - pj_lock_acquire(sess->lock); + pj_grp_lock_acquire(sess->grp_lock); sess_shutdown(sess, PJ_SUCCESS); - pj_lock_release(sess->lock); + pj_grp_lock_release(sess->grp_lock); return PJ_SUCCESS; } @@ -550,9 +547,9 @@ PJ_DEF(pj_status_t) pj_turn_session_set_software_name( pj_turn_session *sess, { pj_status_t status; - pj_lock_acquire(sess->lock); + pj_grp_lock_acquire(sess->grp_lock); status = pj_stun_session_set_software_name(sess->stun, sw); - pj_lock_release(sess->lock); + pj_grp_lock_release(sess->grp_lock); return status; } @@ -573,7 +570,7 @@ PJ_DEF(pj_status_t) pj_turn_session_set_server( pj_turn_session *sess, PJ_ASSERT_RETURN(sess && domain, PJ_EINVAL); PJ_ASSERT_RETURN(sess->state == PJ_TURN_STATE_NULL, PJ_EINVALIDOP); - pj_lock_acquire(sess->lock); + pj_grp_lock_acquire(sess->grp_lock); /* See if "domain" contains just IP address */ tmp_addr.addr.sa_family = sess->af; @@ -673,7 +670,7 @@ PJ_DEF(pj_status_t) pj_turn_session_set_server( pj_turn_session *sess, } on_return: - pj_lock_release(sess->lock); + pj_grp_lock_release(sess->grp_lock); return status; } @@ -687,11 +684,11 @@ PJ_DEF(pj_status_t) pj_turn_session_set_credential(pj_turn_session *sess, PJ_ASSERT_RETURN(sess && cred, PJ_EINVAL); PJ_ASSERT_RETURN(sess->stun, PJ_EINVALIDOP); - pj_lock_acquire(sess->lock); + pj_grp_lock_acquire(sess->grp_lock); pj_stun_session_set_credential(sess->stun, PJ_STUN_AUTH_LONG_TERM, cred); - pj_lock_release(sess->lock); + pj_grp_lock_release(sess->grp_lock); return PJ_SUCCESS; } @@ -712,7 +709,7 @@ PJ_DEF(pj_status_t) pj_turn_session_alloc(pj_turn_session *sess, sess->state<=PJ_TURN_STATE_RESOLVED, PJ_EINVALIDOP); - pj_lock_acquire(sess->lock); + pj_grp_lock_acquire(sess->grp_lock); if (param && param != &sess->alloc_param) pj_turn_alloc_param_copy(sess->pool, &sess->alloc_param, param); @@ -723,7 +720,7 @@ PJ_DEF(pj_status_t) pj_turn_session_alloc(pj_turn_session *sess, PJ_LOG(4,(sess->obj_name, "Pending ALLOCATE in state %s", state_names[sess->state])); - pj_lock_release(sess->lock); + pj_grp_lock_release(sess->grp_lock); return PJ_SUCCESS; } @@ -735,7 +732,7 @@ PJ_DEF(pj_status_t) pj_turn_session_alloc(pj_turn_session *sess, status = pj_stun_session_create_req(sess->stun, PJ_STUN_ALLOCATE_REQUEST, PJ_STUN_MAGIC, NULL, &tdata); if (status != PJ_SUCCESS) { - pj_lock_release(sess->lock); + pj_grp_lock_release(sess->grp_lock); return status; } @@ -775,7 +772,7 @@ PJ_DEF(pj_status_t) pj_turn_session_alloc(pj_turn_session *sess, set_state(sess, PJ_TURN_STATE_RESOLVED); } - pj_lock_release(sess->lock); + pj_grp_lock_release(sess->grp_lock); return status; } @@ -796,14 +793,14 @@ PJ_DEF(pj_status_t) pj_turn_session_set_perm( pj_turn_session *sess, PJ_ASSERT_RETURN(sess && addr_cnt && addr, PJ_EINVAL); - pj_lock_acquire(sess->lock); + pj_grp_lock_acquire(sess->grp_lock); /* Create a bare CreatePermission request */ status = pj_stun_session_create_req(sess->stun, PJ_STUN_CREATE_PERM_REQUEST, PJ_STUN_MAGIC, NULL, &tdata); if (status != PJ_SUCCESS) { - pj_lock_release(sess->lock); + pj_grp_lock_release(sess->grp_lock); return status; } @@ -854,7 +851,7 @@ PJ_DEF(pj_status_t) pj_turn_session_set_perm( pj_turn_session *sess, goto on_error; } - pj_lock_release(sess->lock); + pj_grp_lock_release(sess->grp_lock); return PJ_SUCCESS; on_error: @@ -871,7 +868,7 @@ on_error: if (perm->req_token == req_token) invalidate_perm(sess, perm); } - pj_lock_release(sess->lock); + pj_grp_lock_release(sess->grp_lock); return status; } @@ -942,7 +939,7 @@ PJ_DEF(pj_status_t) pj_turn_session_sendto( pj_turn_session *sess, } /* Lock session now */ - pj_lock_acquire(sess->lock); + pj_grp_lock_acquire(sess->grp_lock); /* Lookup permission first */ perm = lookup_perm(sess, addr, pj_sockaddr_get_len(addr), PJ_FALSE); @@ -957,7 +954,7 @@ PJ_DEF(pj_status_t) pj_turn_session_sendto( pj_turn_session *sess, status = pj_turn_session_set_perm(sess, 1, (const pj_sockaddr*)addr, 0); if (status != PJ_SUCCESS) { - pj_lock_release(sess->lock); + pj_grp_lock_release(sess->grp_lock); return status; } } @@ -1032,7 +1029,7 @@ PJ_DEF(pj_status_t) pj_turn_session_sendto( pj_turn_session *sess, } on_return: - pj_lock_release(sess->lock); + pj_grp_lock_release(sess->grp_lock); return status; } @@ -1052,7 +1049,7 @@ PJ_DEF(pj_status_t) pj_turn_session_bind_channel(pj_turn_session *sess, PJ_ASSERT_RETURN(sess && peer_adr && addr_len, PJ_EINVAL); PJ_ASSERT_RETURN(sess->state == PJ_TURN_STATE_READY, PJ_EINVALIDOP); - pj_lock_acquire(sess->lock); + pj_grp_lock_acquire(sess->grp_lock); /* Create blank ChannelBind request */ status = pj_stun_session_create_req(sess->stun, @@ -1095,7 +1092,7 @@ PJ_DEF(pj_status_t) pj_turn_session_bind_channel(pj_turn_session *sess, tdata); on_return: - pj_lock_release(sess->lock); + pj_grp_lock_release(sess->grp_lock); return status; } @@ -1118,7 +1115,7 @@ PJ_DEF(pj_status_t) pj_turn_session_on_rx_pkt(pj_turn_session *sess, */ /* Start locking the session */ - pj_lock_acquire(sess->lock); + pj_grp_lock_acquire(sess->grp_lock); is_datagram = (sess->conn_type==PJ_TURN_TP_UDP); @@ -1190,7 +1187,7 @@ PJ_DEF(pj_status_t) pj_turn_session_on_rx_pkt(pj_turn_session *sess, } on_return: - pj_lock_release(sess->lock); + pj_grp_lock_release(sess->grp_lock); return status; } @@ -1382,20 +1379,22 @@ static void on_allocate_success(pj_turn_session *sess, /* Cancel existing keep-alive timer, if any */ pj_assert(sess->timer.id != TIMER_DESTROY); - - if (sess->timer.id != TIMER_NONE) { - pj_timer_heap_cancel(sess->timer_heap, &sess->timer); - sess->timer.id = TIMER_NONE; + if (sess->timer.id == TIMER_KEEP_ALIVE) { + pj_timer_heap_cancel_if_active(sess->timer_heap, &sess->timer, + TIMER_NONE); } /* Start keep-alive timer once allocation succeeds */ - timeout.sec = sess->ka_interval; - timeout.msec = 0; + if (sess->state < PJ_TURN_STATE_DEALLOCATING) { + timeout.sec = sess->ka_interval; + timeout.msec = 0; - sess->timer.id = TIMER_KEEP_ALIVE; - pj_timer_heap_schedule(sess->timer_heap, &sess->timer, &timeout); + pj_timer_heap_schedule_w_grp_lock(sess->timer_heap, &sess->timer, + &timeout, TIMER_KEEP_ALIVE, + sess->grp_lock); - set_state(sess, PJ_TURN_STATE_READY); + set_state(sess, PJ_TURN_STATE_READY); + } } /* @@ -1945,7 +1944,7 @@ static void on_timer_event(pj_timer_heap_t *th, pj_timer_entry *e) PJ_UNUSED_ARG(th); - pj_lock_acquire(sess->lock); + pj_grp_lock_acquire(sess->grp_lock); eid = (enum timer_id_t) e->id; e->id = TIMER_NONE; @@ -1956,6 +1955,11 @@ static void on_timer_event(pj_timer_heap_t *th, pj_timer_entry *e) pj_bool_t resched = PJ_TRUE; pj_bool_t pkt_sent = PJ_FALSE; + if (sess->state >= PJ_TURN_STATE_DEALLOCATING) { + /* Ignore if we're deallocating */ + goto on_return; + } + pj_gettimeofday(&now); /* Refresh allocation if it's time to do so */ @@ -2022,19 +2026,19 @@ static void on_timer_event(pj_timer_heap_t *th, pj_timer_entry *e) delay.sec = sess->ka_interval; delay.msec = 0; - sess->timer.id = TIMER_KEEP_ALIVE; - pj_timer_heap_schedule(sess->timer_heap, &sess->timer, &delay); + pj_timer_heap_schedule_w_grp_lock(sess->timer_heap, &sess->timer, + &delay, TIMER_KEEP_ALIVE, + sess->grp_lock); } - pj_lock_release(sess->lock); - } else if (eid == TIMER_DESTROY) { /* Time to destroy */ - pj_lock_release(sess->lock); do_destroy(sess); } else { pj_assert(!"Unknown timer event"); - pj_lock_release(sess->lock); } + +on_return: + pj_grp_lock_release(sess->grp_lock); } diff --git a/pjnath/src/pjnath/turn_sock.c b/pjnath/src/pjnath/turn_sock.c index 799b557..970a955 100644 --- a/pjnath/src/pjnath/turn_sock.c +++ b/pjnath/src/pjnath/turn_sock.c @@ -1,4 +1,4 @@ -/* $Id: turn_sock.c 3841 2011-10-24 09:28:13Z ming $ */ +/* $Id: turn_sock.c 4360 2013-02-21 11:26:35Z bennylp $ */ /* * Copyright (C) 2008-2011 Teluu Inc. (http://www.teluu.com) * Copyright (C) 2003-2008 Benny Prijono <benny@prijono.org> @@ -32,6 +32,10 @@ enum TIMER_DESTROY }; + +enum { MAX_BIND_RETRY = 100 }; + + #define INIT 0x1FFFFFFF struct pj_turn_sock @@ -42,13 +46,13 @@ struct pj_turn_sock pj_turn_sock_cb cb; void *user_data; - pj_lock_t *lock; + pj_bool_t is_destroying; + pj_grp_lock_t *grp_lock; pj_turn_alloc_param alloc_param; pj_stun_config cfg; pj_turn_sock_cfg setting; - pj_bool_t destroy_request; pj_timer_entry timer; int af; @@ -89,6 +93,7 @@ static pj_bool_t on_connect_complete(pj_activesock_t *asock, +static void turn_sock_on_destroy(void *comp); static void destroy(pj_turn_sock *turn_sock); static void timer_cb(pj_timer_heap_t *th, pj_timer_entry *e); @@ -97,10 +102,12 @@ static void timer_cb(pj_timer_heap_t *th, pj_timer_entry *e); PJ_DEF(void) pj_turn_sock_cfg_default(pj_turn_sock_cfg *cfg) { pj_bzero(cfg, sizeof(*cfg)); + cfg->max_pkt_size = PJ_TURN_MAX_PKT_LEN; cfg->qos_type = PJ_QOS_TYPE_BEST_EFFORT; cfg->qos_ignore_error = PJ_TRUE; } + /* * Create. */ @@ -162,14 +169,21 @@ PJ_DEF(pj_status_t) pj_turn_sock_create(pj_stun_config *cfg, pj_memcpy(&turn_sock->cb, cb, sizeof(*cb)); } - /* Create lock */ - status = pj_lock_create_recursive_mutex(pool, turn_sock->obj_name, - &turn_sock->lock); - if (status != PJ_SUCCESS) { - destroy(turn_sock); - return status; + /* Session lock */ + if (setting && setting->grp_lock) { + turn_sock->grp_lock = setting->grp_lock; + } else { + status = pj_grp_lock_create(pool, NULL, &turn_sock->grp_lock); + if (status != PJ_SUCCESS) { + pj_pool_release(pool); + return status; + } } + pj_grp_lock_add_ref(turn_sock->grp_lock); + pj_grp_lock_add_handler(turn_sock->grp_lock, pool, turn_sock, + &turn_sock_on_destroy); + /* Init timer */ pj_timer_entry_init(&turn_sock->timer, TIMER_NONE, turn_sock, &timer_cb); @@ -180,7 +194,8 @@ PJ_DEF(pj_status_t) pj_turn_sock_create(pj_stun_config *cfg, sess_cb.on_rx_data = &turn_on_rx_data; sess_cb.on_state = &turn_on_state; status = pj_turn_session_create(cfg, pool->obj_name, af, conn_type, - &sess_cb, 0, turn_sock, &turn_sock->sess); + turn_sock->grp_lock, &sess_cb, 0, + turn_sock, &turn_sock->sess); if (status != PJ_SUCCESS) { destroy(turn_sock); return status; @@ -197,41 +212,45 @@ PJ_DEF(pj_status_t) pj_turn_sock_create(pj_stun_config *cfg, /* * Destroy. */ -static void destroy(pj_turn_sock *turn_sock) +static void turn_sock_on_destroy(void *comp) { - if (turn_sock->lock) { - pj_lock_acquire(turn_sock->lock); - } - - if (turn_sock->sess) { - pj_turn_session_set_user_data(turn_sock->sess, NULL); - pj_turn_session_shutdown(turn_sock->sess); - turn_sock->sess = NULL; - } - - if (turn_sock->active_sock) { - pj_activesock_close(turn_sock->active_sock); - turn_sock->active_sock = NULL; - } - - if (turn_sock->lock) { - pj_lock_release(turn_sock->lock); - pj_lock_destroy(turn_sock->lock); - turn_sock->lock = NULL; - } + pj_turn_sock *turn_sock = (pj_turn_sock*) comp; if (turn_sock->pool) { pj_pool_t *pool = turn_sock->pool; + PJ_LOG(4,(turn_sock->obj_name, "TURN socket destroyed")); turn_sock->pool = NULL; pj_pool_release(pool); } } +static void destroy(pj_turn_sock *turn_sock) +{ + PJ_LOG(4,(turn_sock->obj_name, "TURN socket destroy request, ref_cnt=%d", + pj_grp_lock_get_ref(turn_sock->grp_lock))); + + pj_grp_lock_acquire(turn_sock->grp_lock); + if (turn_sock->is_destroying) { + pj_grp_lock_release(turn_sock->grp_lock); + return; + } + + turn_sock->is_destroying = PJ_TRUE; + if (turn_sock->sess) + pj_turn_session_shutdown(turn_sock->sess); + if (turn_sock->active_sock) + pj_activesock_close(turn_sock->active_sock); + pj_grp_lock_dec_ref(turn_sock->grp_lock); + pj_grp_lock_release(turn_sock->grp_lock); +} PJ_DEF(void) pj_turn_sock_destroy(pj_turn_sock *turn_sock) { - pj_lock_acquire(turn_sock->lock); - turn_sock->destroy_request = PJ_TRUE; + pj_grp_lock_acquire(turn_sock->grp_lock); + if (turn_sock->is_destroying) { + pj_grp_lock_release(turn_sock->grp_lock); + return; + } if (turn_sock->sess) { pj_turn_session_shutdown(turn_sock->sess); @@ -239,12 +258,11 @@ PJ_DEF(void) pj_turn_sock_destroy(pj_turn_sock *turn_sock) * session state is DESTROYING we will schedule a timer to * destroy ourselves. */ - pj_lock_release(turn_sock->lock); } else { - pj_lock_release(turn_sock->lock); destroy(turn_sock); } + pj_grp_lock_release(turn_sock->grp_lock); } @@ -260,7 +278,6 @@ static void timer_cb(pj_timer_heap_t *th, pj_timer_entry *e) switch (eid) { case TIMER_DESTROY: - PJ_LOG(5,(turn_sock->obj_name, "Destroying TURN")); destroy(turn_sock); break; default: @@ -330,7 +347,7 @@ PJ_DEF(pj_status_t) pj_turn_sock_get_info(pj_turn_sock *turn_sock, */ PJ_DEF(pj_status_t) pj_turn_sock_lock(pj_turn_sock *turn_sock) { - return pj_lock_acquire(turn_sock->lock); + return pj_grp_lock_acquire(turn_sock->grp_lock); } /** @@ -338,7 +355,7 @@ PJ_DEF(pj_status_t) pj_turn_sock_lock(pj_turn_sock *turn_sock) */ PJ_DEF(pj_status_t) pj_turn_sock_unlock(pj_turn_sock *turn_sock) { - return pj_lock_release(turn_sock->lock); + return pj_grp_lock_release(turn_sock->grp_lock); } /* @@ -374,6 +391,8 @@ PJ_DEF(pj_status_t) pj_turn_sock_alloc(pj_turn_sock *turn_sock, PJ_ASSERT_RETURN(turn_sock && domain, PJ_EINVAL); PJ_ASSERT_RETURN(turn_sock->sess, PJ_EINVALIDOP); + pj_grp_lock_acquire(turn_sock->grp_lock); + /* Copy alloc param. We will call session_alloc() only after the * server address has been resolved. */ @@ -388,6 +407,7 @@ PJ_DEF(pj_status_t) pj_turn_sock_alloc(pj_turn_sock *turn_sock, status = pj_turn_session_set_credential(turn_sock->sess, cred); if (status != PJ_SUCCESS) { sess_fail(turn_sock, "Error setting credential", status); + pj_grp_lock_release(turn_sock->grp_lock); return status; } } @@ -397,13 +417,14 @@ PJ_DEF(pj_status_t) pj_turn_sock_alloc(pj_turn_sock *turn_sock, resolver); if (status != PJ_SUCCESS) { sess_fail(turn_sock, "Error setting TURN server", status); + pj_grp_lock_release(turn_sock->grp_lock); return status; } /* Done for now. The next work will be done when session state moved * to RESOLVED state. */ - + pj_grp_lock_release(turn_sock->grp_lock); return PJ_SUCCESS; } @@ -462,9 +483,23 @@ static pj_bool_t on_connect_complete(pj_activesock_t *asock, pj_turn_sock *turn_sock; turn_sock = (pj_turn_sock*) pj_activesock_get_user_data(asock); + if (!turn_sock) + return PJ_FALSE; + + pj_grp_lock_acquire(turn_sock->grp_lock); + + /* TURN session may have already been destroyed here. + * See ticket #1557 (http://trac.pjsip.org/repos/ticket/1557). + */ + if (!turn_sock->sess) { + sess_fail(turn_sock, "TURN session already destroyed", status); + pj_grp_lock_release(turn_sock->grp_lock); + return PJ_FALSE; + } if (status != PJ_SUCCESS) { sess_fail(turn_sock, "TCP connect() error", status); + pj_grp_lock_release(turn_sock->grp_lock); return PJ_FALSE; } @@ -474,7 +509,7 @@ static pj_bool_t on_connect_complete(pj_activesock_t *asock, /* Kick start pending read operation */ status = pj_activesock_start_read(asock, turn_sock->pool, - PJ_TURN_MAX_PKT_LEN, 0); + turn_sock->setting.max_pkt_size, 0); /* Init send_key */ pj_ioqueue_op_key_init(&turn_sock->send_key, sizeof(turn_sock->send_key)); @@ -483,9 +518,11 @@ static pj_bool_t on_connect_complete(pj_activesock_t *asock, status = pj_turn_session_alloc(turn_sock->sess, &turn_sock->alloc_param); if (status != PJ_SUCCESS) { sess_fail(turn_sock, "Error sending ALLOCATE", status); + pj_grp_lock_release(turn_sock->grp_lock); return PJ_FALSE; } + pj_grp_lock_release(turn_sock->grp_lock); return PJ_TRUE; } @@ -545,9 +582,9 @@ static pj_bool_t on_data_read(pj_activesock_t *asock, pj_bool_t ret = PJ_TRUE; turn_sock = (pj_turn_sock*) pj_activesock_get_user_data(asock); - pj_lock_acquire(turn_sock->lock); + pj_grp_lock_acquire(turn_sock->grp_lock); - if (status == PJ_SUCCESS && turn_sock->sess) { + if (status == PJ_SUCCESS && turn_sock->sess && !turn_sock->is_destroying) { /* Report incoming packet to TURN session, repeat while we have * "packet" in the buffer (required for stream-oriented transports) */ @@ -597,7 +634,7 @@ static pj_bool_t on_data_read(pj_activesock_t *asock, } on_return: - pj_lock_release(turn_sock->lock); + pj_grp_lock_release(turn_sock->grp_lock); return ret; } @@ -617,7 +654,7 @@ static pj_status_t turn_on_send_pkt(pj_turn_session *sess, pj_ssize_t len = pkt_len; pj_status_t status; - if (turn_sock == NULL) { + if (turn_sock == NULL || turn_sock->is_destroying) { /* We've been destroyed */ // https://trac.pjsip.org/repos/ticket/1316 //pj_assert(!"We should shutdown gracefully"); @@ -663,7 +700,7 @@ static void turn_on_rx_data(pj_turn_session *sess, { pj_turn_sock *turn_sock = (pj_turn_sock*) pj_turn_session_get_user_data(sess); - if (turn_sock == NULL) { + if (turn_sock == NULL || turn_sock->is_destroying) { /* We've been destroyed */ return; } @@ -712,7 +749,10 @@ static void turn_on_state(pj_turn_session *sess, char addrtxt[PJ_INET6_ADDRSTRLEN+8]; int sock_type; pj_sock_t sock; + pj_activesock_cfg asock_cfg; pj_activesock_cb asock_cb; + pj_sockaddr bound_addr, *cfg_bind_addr; + pj_uint16_t max_bind_retry; /* Close existing connection, if any. This happens when * we're switching to alternate TURN server when either TCP @@ -738,7 +778,29 @@ static void turn_on_state(pj_turn_session *sess, return; } - /* Apply QoS, if specified */ + /* Bind socket */ + cfg_bind_addr = &turn_sock->setting.bound_addr; + max_bind_retry = MAX_BIND_RETRY; + if (turn_sock->setting.port_range && + turn_sock->setting.port_range < max_bind_retry) + { + max_bind_retry = turn_sock->setting.port_range; + } + pj_sockaddr_init(turn_sock->af, &bound_addr, NULL, 0); + if (cfg_bind_addr->addr.sa_family == pj_AF_INET() || + cfg_bind_addr->addr.sa_family == pj_AF_INET6()) + { + pj_sockaddr_cp(&bound_addr, cfg_bind_addr); + } + status = pj_sock_bind_random(sock, &bound_addr, + turn_sock->setting.port_range, + max_bind_retry); + if (status != PJ_SUCCESS) { + pj_turn_sock_destroy(turn_sock); + return; + } + + /* Apply QoS, if specified */ status = pj_sock_apply_qos2(sock, turn_sock->setting.qos_type, &turn_sock->setting.qos_params, (turn_sock->setting.qos_ignore_error?2:1), @@ -749,11 +811,14 @@ static void turn_on_state(pj_turn_session *sess, } /* Create active socket */ + pj_activesock_cfg_default(&asock_cfg); + asock_cfg.grp_lock = turn_sock->grp_lock; + pj_bzero(&asock_cb, sizeof(asock_cb)); asock_cb.on_data_read = &on_data_read; asock_cb.on_connect_complete = &on_connect_complete; status = pj_activesock_create(turn_sock->pool, sock, - sock_type, NULL, + sock_type, &asock_cfg, turn_sock->cfg.ioqueue, &asock_cb, turn_sock, &turn_sock->active_sock); @@ -794,14 +859,12 @@ static void turn_on_state(pj_turn_session *sess, turn_sock->sess = NULL; pj_turn_session_set_user_data(sess, NULL); - if (turn_sock->timer.id) { - pj_timer_heap_cancel(turn_sock->cfg.timer_heap, &turn_sock->timer); - turn_sock->timer.id = 0; - } - - turn_sock->timer.id = TIMER_DESTROY; - pj_timer_heap_schedule(turn_sock->cfg.timer_heap, &turn_sock->timer, - &delay); + pj_timer_heap_cancel_if_active(turn_sock->cfg.timer_heap, + &turn_sock->timer, 0); + pj_timer_heap_schedule_w_grp_lock(turn_sock->cfg.timer_heap, + &turn_sock->timer, + &delay, TIMER_DESTROY, + turn_sock->grp_lock); } } diff --git a/pjnath/src/pjturn-srv/allocation.c b/pjnath/src/pjturn-srv/allocation.c index 9371635..f267bf8 100644 --- a/pjnath/src/pjturn-srv/allocation.c +++ b/pjnath/src/pjturn-srv/allocation.c @@ -1,4 +1,4 @@ -/* $Id: allocation.c 3553 2011-05-05 06:14:19Z nanang $ */ +/* $Id: allocation.c 4360 2013-02-21 11:26:35Z bennylp $ */ /* * Copyright (C) 2008-2011 Teluu Inc. (http://www.teluu.com) * Copyright (C) 2003-2008 Benny Prijono <benny@prijono.org> @@ -338,7 +338,7 @@ PJ_DEF(pj_status_t) pj_turn_allocation_create(pj_turn_transport *transport, sess_cb.on_rx_request = &stun_on_rx_request; sess_cb.on_rx_indication = &stun_on_rx_indication; status = pj_stun_session_create(&srv->core.stun_cfg, alloc->obj_name, - &sess_cb, PJ_FALSE, &alloc->sess); + &sess_cb, PJ_FALSE, NULL, &alloc->sess); if (status != PJ_SUCCESS) { goto on_error; } diff --git a/pjnath/src/pjturn-srv/server.c b/pjnath/src/pjturn-srv/server.c index 3732898..f27d3a9 100644 --- a/pjnath/src/pjturn-srv/server.c +++ b/pjnath/src/pjturn-srv/server.c @@ -1,4 +1,4 @@ -/* $Id: server.c 3553 2011-05-05 06:14:19Z nanang $ */ +/* $Id: server.c 4360 2013-02-21 11:26:35Z bennylp $ */ /* * Copyright (C) 2008-2011 Teluu Inc. (http://www.teluu.com) * Copyright (C) 2003-2008 Benny Prijono <benny@prijono.org> @@ -155,7 +155,8 @@ PJ_DEF(pj_status_t) pj_turn_srv_create(pj_pool_factory *pf, sess_cb.on_send_msg = &on_tx_stun_msg; status = pj_stun_session_create(&srv->core.stun_cfg, srv->obj_name, - &sess_cb, PJ_FALSE, &srv->core.stun_sess); + &sess_cb, PJ_FALSE, NULL, + &srv->core.stun_sess); if (status != PJ_SUCCESS) { goto on_error; } |