From e2aa3dae84706748ba106c352557c9d9cea90efc Mon Sep 17 00:00:00 2001 From: Benny Prijono Date: Wed, 12 Mar 2008 20:52:16 +0000 Subject: More ticket #485: implementation of TURN UDP client session git-svn-id: http://svn.pjsip.org/repos/pjproject/trunk@1862 74dad513-b988-da41-8d7b-12977e46ad98 --- pjnath/include/pjnath/errno.h | 23 +- pjnath/include/pjnath/stun_session.h | 18 +- pjnath/include/pjnath/turn_session.h | 131 ++++- pjnath/include/pjnath/turn_udp.h | 134 +++++ pjnath/src/pjnath/errno.c | 4 + pjnath/src/pjnath/stun_session.c | 66 ++- pjnath/src/pjnath/turn_session.c | 987 ++++++++++++++++++++++++++++++----- pjnath/src/pjnath/turn_udp.c | 322 ++++++++++++ 8 files changed, 1519 insertions(+), 166 deletions(-) create mode 100644 pjnath/include/pjnath/turn_udp.h create mode 100644 pjnath/src/pjnath/turn_udp.c diff --git a/pjnath/include/pjnath/errno.h b/pjnath/include/pjnath/errno.h index 1f763fc1..56763916 100644 --- a/pjnath/include/pjnath/errno.h +++ b/pjnath/include/pjnath/errno.h @@ -48,22 +48,26 @@ */ #define PJ_STATUS_FROM_STUN_CODE(code) (PJNATH_ERRNO_START+code) +/** + * @hideinitializer + * Invalid STUN message + */ +#define PJNATH_EINSTUNMSG (PJNATH_ERRNO_START+1) /* 370001 */ /** * @hideinitializer * Invalid STUN message length. */ -#define PJNATH_EINSTUNMSGLEN (PJNATH_ERRNO_START+1) /* 370001 */ +#define PJNATH_EINSTUNMSGLEN (PJNATH_ERRNO_START+2) /* 370002 */ /** * @hideinitializer * Invalid or unexpected STUN message type */ -#define PJNATH_EINSTUNMSGTYPE (PJNATH_ERRNO_START+2) /* 370002 */ +#define PJNATH_EINSTUNMSGTYPE (PJNATH_ERRNO_START+3) /* 370003 */ /** * @hideinitializer * STUN transaction has timed out */ -#define PJNATH_ESTUNTIMEDOUT (PJNATH_ERRNO_START+3) /* 370003 */ - +#define PJNATH_ESTUNTIMEDOUT (PJNATH_ERRNO_START+4) /* 370004 */ /** @@ -173,6 +177,17 @@ #define PJNATH_EICENOHOSTCAND (PJNATH_ERRNO_START+92) /* 370092 */ +/************************************************************ + * TURN ERROR CODES + ***********************************************************/ +/** + * @hideinitializer + * Invalid or unsupported TURN transport. + */ +#define PJNATH_ETURNINTP (PJNATH_ERRNO_START+120) /* 370120 */ + + + /** * @} */ diff --git a/pjnath/include/pjnath/stun_session.h b/pjnath/include/pjnath/stun_session.h index 2f096d57..b28763ac 100644 --- a/pjnath/include/pjnath/stun_session.h +++ b/pjnath/include/pjnath/stun_session.h @@ -112,7 +112,7 @@ typedef struct pj_stun_session_cb * Note that when the status is not success, the * response may contain non-NULL value if the * response contains STUN ERROR-CODE attribute. - * @param request The original STUN request. + * @param tdata The original STUN request. * @param response The response message, on successful transaction, * or otherwise MAY BE NULL if status is not success. * Note that when the status is not success, this @@ -223,6 +223,22 @@ PJ_DECL(pj_status_t) pj_stun_session_set_user_data(pj_stun_session *sess, */ PJ_DECL(void*) pj_stun_session_get_user_data(pj_stun_session *sess); +/** + * Change the lock object used by the STUN session. By default, the STUN + * session uses a mutex to protect its internal data. If application already + * protects access to STUN session with higher layer lock, it may disable + * the mutex protection in the STUN session by changing the STUN session + * lock to a NULL mutex. + * + * @param sess The STUN session instance. + * @param lock New lock instance to be used by the STUN session. + * @param auto_del Specify whether STUN session should destroy this + * lock instance when it's destroyed. + */ +PJ_DECL(pj_status_t) pj_stun_session_set_lock(pj_stun_session *sess, + pj_lock_t *lock, + pj_bool_t auto_del); + /** * Set server name to be included in all response. * diff --git a/pjnath/include/pjnath/turn_session.h b/pjnath/include/pjnath/turn_session.h index 19260b77..bb00e0de 100644 --- a/pjnath/include/pjnath/turn_session.h +++ b/pjnath/include/pjnath/turn_session.h @@ -48,15 +48,73 @@ typedef struct pj_turn_session pj_turn_session; #define PJ_TURN_CHANNEL_MAX 0xFFFE /* inclusive */ #define PJ_TURN_NO_TIMEOUT ((long)0x7FFFFFFF) #define PJ_TURN_MAX_PKT_LEN 3000 -#define PJ_TURN_PERM_TIMEOUT 300 -#define PJ_TURN_CHANNEL_TIMEOUT 600 +#define PJ_TURN_PERM_TIMEOUT 300 /* Must be greater than REFRESH_SEC_BEFORE */ +#define PJ_TURN_CHANNEL_TIMEOUT 600 /* Must be greater than REFRESH_SEC_BEFORE */ +#define PJ_TURN_REFRESH_SEC_BEFORE 60 +#define PJ_TURN_KEEP_ALIVE_SEC 15 +#define PJ_TURN_PEER_HTABLE_SIZE 8 -/** Transport types */ -enum { +/** TURN transport types */ +typedef enum pj_turn_tp_type +{ PJ_TURN_TP_UDP = 16, /**< UDP. */ - PJ_TURN_TP_TCP = 6 /**< TCP. */ -}; + PJ_TURN_TP_TCP = 6, /**< TCP. */ + PJ_TURN_TP_TLS = 256 /**< TLS. */ +} pj_turn_tp_type; + + +/** TURN session state */ +typedef enum pj_turn_state_t +{ + /** + * TURN session has just been created. + */ + PJ_TURN_STATE_NULL, + + /** + * TURN server has been configured and now is being resolved via + * DNS SRV resolution. + */ + PJ_TURN_STATE_RESOLVING, + + /** + * TURN server has been resolved. If there is pending allocation to + * be done, it will be invoked immediately. + */ + PJ_TURN_STATE_RESOLVED, + + /** + * TURN session has issued ALLOCATE request and is waiting for response + * from the TURN server. + */ + PJ_TURN_STATE_ALLOCATING, + + /** + * TURN session has successfully allocated relay resoruce and now is + * ready to be used. + */ + PJ_TURN_STATE_READY, + + /** + * TURN session has issued deallocate request and is waiting for a + * response from the TURN server. + */ + PJ_TURN_STATE_DEALLOCATING, + + /** + * Deallocate response has been received. Normally the session will + * proceed to DESTROYING state immediately. + */ + PJ_TURN_STATE_DEALLOCATED, + + /** + * TURN session is being destroyed. + */ + PJ_TURN_STATE_DESTROYING + +} pj_turn_state_t; + /* ChannelData header */ typedef struct pj_turn_channel_data @@ -73,7 +131,9 @@ typedef struct pj_turn_channel_data typedef struct pj_turn_session_cb { /** - * Callback to send outgoing packet. This callback is mandatory. + * This callback will be called by the TURN session whenever it + * needs to send outgoing message. Since the TURN session doesn't + * have a socket on its own, this callback must be implemented. */ pj_status_t (*on_send_pkt)(pj_turn_session *sess, const pj_uint8_t *pkt, @@ -82,14 +142,21 @@ typedef struct pj_turn_session_cb unsigned dst_addr_len); /** - * Notification when allocation completes, either successfully or - * with failure. + * Notification when peer address has been bound successfully to + * a channel number. + * + * This callback is optional. */ - void (*on_allocate_complete)(pj_turn_session *sess, - pj_status_t status); + void (*on_channel_bound)(pj_turn_session *sess, + const pj_sockaddr_t *peer_addr, + unsigned addr_len, + unsigned ch_num); /** - * Notification when data is received. + * Notification when incoming data has been received, either through + * Data indication or ChannelData message from the TURN server. + * + * This callback is optional. */ void (*on_rx_data)(pj_turn_session *sess, const pj_uint8_t *pkt, @@ -98,9 +165,12 @@ typedef struct pj_turn_session_cb unsigned addr_len); /** - * Notification when session has been destroyed. + * Notification when TURN session state has changed. Application should + * implement this callback at least to know that the TURN session is + * going to be destroyed. */ - void (*on_destroyed)(pj_turn_session *sess); + void (*on_state)(pj_turn_session *sess, pj_turn_state_t old_state, + pj_turn_state_t new_state); } pj_turn_session_cb; @@ -112,6 +182,7 @@ typedef struct pj_turn_alloc_param { int bandwidth; int lifetime; + int ka_interval; } pj_turn_alloc_param; @@ -120,15 +191,35 @@ typedef struct pj_turn_alloc_param */ typedef struct pj_turn_session_info { + /** + * The relay address + */ + pj_sockaddr relay_addr; + + /** + * The TURN server address for informational purpose. + */ pj_sockaddr server; + } pj_turn_session_info; +/** + * Get TURN state name. + */ +PJ_DECL(const char*) pj_turn_state_name(pj_turn_state_t state); + + /** * Create TURN client session. */ PJ_DECL(pj_status_t) pj_turn_session_create(pj_stun_config *cfg, + const char *name, + int af, + pj_turn_tp_type conn_type, const pj_turn_session_cb *cb, + void *user_data, + unsigned options, pj_turn_session **p_sess); @@ -138,12 +229,22 @@ PJ_DECL(pj_status_t) pj_turn_session_create(pj_stun_config *cfg, PJ_DECL(pj_status_t) pj_turn_session_destroy(pj_turn_session *sess); +/** + * Re-assign user data. + */ +PJ_DECL(pj_status_t) pj_turn_session_set_user_data(pj_turn_session *sess, + void *user_data); + +/** + * Retrieve user data. + */ +PJ_DECL(void*) pj_turn_session_get_user_data(pj_turn_session *sess); + /** * Set the server or domain name of the server. */ PJ_DECL(pj_status_t) pj_turn_session_set_server(pj_turn_session *sess, const pj_str_t *domain, - const pj_str_t *res_name, int default_port, pj_dns_resolver *resolver); diff --git a/pjnath/include/pjnath/turn_udp.h b/pjnath/include/pjnath/turn_udp.h new file mode 100644 index 00000000..f0d7c3c4 --- /dev/null +++ b/pjnath/include/pjnath/turn_udp.h @@ -0,0 +1,134 @@ +/* $Id$ */ +/* + * Copyright (C) 2003-2007 Benny Prijono + * + * This program is free software; you can redistribute it and/or modify + * it under the terms of the GNU General Public License as published by + * the Free Software Foundation; either version 2 of the License, or + * (at your option) any later version. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU General Public License for more details. + * + * You should have received a copy of the GNU General Public License + * along with this program; if not, write to the Free Software + * Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA + */ +#ifndef __PJNATH_TURN_UDP_H__ +#define __PJNATH_TURN_UDP_H__ + +/** + * @file turn_udp.h + * @brief TURN relay using UDP client as transport protocol + */ +#include + + +PJ_BEGIN_DECL + + +/* **************************************************************************/ +/** + * @defgroup PJNATH_TURN_UDP TURN UDP client + * @brief TURN relay using UDP client as transport protocol + * @ingroup PJNATH_STUN + * @{ + */ + + +/** + * Opaque declaration for TURN UDP client. + */ +typedef struct pj_turn_udp pj_turn_udp; + + +typedef struct pj_turn_udp_cb +{ + /** + * Notification when incoming data has been received, either through + * Data indication or ChannelData message from the TURN server. + * + * This callback is mandatory. + */ + void (*on_rx_data)(pj_turn_udp *udp_rel, + const pj_uint8_t *pkt, + unsigned pkt_len, + const pj_sockaddr_t *peer_addr, + unsigned addr_len); + + /** + * Notification when TURN session state has changed. Application should + * implement this callback to know that the TURN session is no longer + * available. + */ + void (*on_state)(pj_turn_udp *udp_rel, pj_turn_state_t old_state, + pj_turn_state_t new_state); + +} pj_turn_udp_cb; + + +/** + * Create. + */ +PJ_DECL(pj_status_t) pj_turn_udp_create(pj_stun_config *cfg, + int af, + const pj_turn_udp_cb *cb, + unsigned options, + void *user_data, + pj_turn_udp **p_udp_rel); + +/** + * Destroy. + */ +PJ_DECL(void) pj_turn_udp_destroy(pj_turn_udp *udp_rel); + +/** + * Set user data. + */ +PJ_DECL(pj_status_t) pj_turn_udp_set_user_data(pj_turn_udp *udp_rel, + void *user_data); + +/** + * Get user data. + */ +PJ_DECL(void*) pj_turn_udp_get_user_data(pj_turn_udp *udp_rel); + +/** + * Initialize. + */ +PJ_DECL(pj_status_t) pj_turn_udp_init(pj_turn_udp *udp_rel, + const pj_str_t *domain, + int default_port, + pj_dns_resolver *resolver, + const pj_stun_auth_cred *cred, + const pj_turn_alloc_param *param); + +/** + * Send packet. + */ +PJ_DECL(pj_status_t) pj_turn_udp_sendto(pj_turn_udp *udp_rel, + const pj_uint8_t *pkt, + unsigned pkt_len, + const pj_sockaddr_t *addr, + unsigned addr_len); + +/** + * Bind a peer address to a channel number. + */ +PJ_DECL(pj_status_t) pj_turn_udp_bind_channel(pj_turn_udp *udp_rel, + const pj_sockaddr_t *peer, + unsigned addr_len); + + +/** + * @} + */ + + +PJ_END_DECL + + +#endif /* __PJNATH_TURN_UDP_H__ */ + diff --git a/pjnath/src/pjnath/errno.c b/pjnath/src/pjnath/errno.c index af636fcd..a075a0c0 100644 --- a/pjnath/src/pjnath/errno.c +++ b/pjnath/src/pjnath/errno.c @@ -35,6 +35,7 @@ static const struct } err_str[] = { /* STUN related error codes */ + PJ_BUILD_ERR( PJNATH_EINSTUNMSG, "Invalid STUN message"), PJ_BUILD_ERR( PJNATH_EINSTUNMSGLEN, "Invalid STUN message length"), PJ_BUILD_ERR( PJNATH_EINSTUNMSGTYPE, "Invalid or unexpected STUN message type"), PJ_BUILD_ERR( PJNATH_ESTUNTIMEDOUT, "STUN transaction has timed out"), @@ -62,6 +63,9 @@ static const struct PJ_BUILD_ERR( PJNATH_EICEINCANDSDP, "Invalid SDP \"candidate\" attribute"), PJ_BUILD_ERR( PJNATH_EICENOHOSTCAND, "No host candidate associated with srflx"), + /* TURN related errors */ + PJ_BUILD_ERR( PJNATH_ETURNINTP, "Invalid/unsupported transport"), + }; #endif /* PJ_HAS_ERROR_STRING */ diff --git a/pjnath/src/pjnath/stun_session.c b/pjnath/src/pjnath/stun_session.c index 4225fe77..d9ae3b75 100644 --- a/pjnath/src/pjnath/stun_session.c +++ b/pjnath/src/pjnath/stun_session.c @@ -23,7 +23,8 @@ struct pj_stun_session { pj_stun_config *cfg; pj_pool_t *pool; - pj_mutex_t *mutex; + pj_lock_t *lock; + pj_bool_t delete_lock; pj_stun_session_cb cb; void *user_data; @@ -402,11 +403,12 @@ PJ_DEF(pj_status_t) pj_stun_session_create( pj_stun_config *cfg, pj_list_init(&sess->pending_request_list); pj_list_init(&sess->cached_response_list); - status = pj_mutex_create_recursive(pool, name, &sess->mutex); + status = pj_lock_create_recursive_mutex(pool, name, &sess->lock); if (status != PJ_SUCCESS) { pj_pool_release(pool); return status; } + sess->delete_lock = PJ_TRUE; *p_sess = sess; @@ -417,7 +419,7 @@ PJ_DEF(pj_status_t) pj_stun_session_destroy(pj_stun_session *sess) { PJ_ASSERT_RETURN(sess, PJ_EINVAL); - pj_mutex_lock(sess->mutex); + pj_lock_acquire(sess->lock); while (!pj_list_empty(&sess->pending_request_list)) { pj_stun_tx_data *tdata = sess->pending_request_list.next; destroy_tdata(tdata); @@ -426,9 +428,12 @@ PJ_DEF(pj_status_t) pj_stun_session_destroy(pj_stun_session *sess) pj_stun_tx_data *tdata = sess->cached_response_list.next; destroy_tdata(tdata); } - pj_mutex_unlock(sess->mutex); + pj_lock_release(sess->lock); + + if (sess->delete_lock) { + pj_lock_destroy(sess->lock); + } - pj_mutex_destroy(sess->mutex); pj_pool_release(sess->pool); return PJ_SUCCESS; @@ -439,9 +444,9 @@ PJ_DEF(pj_status_t) pj_stun_session_set_user_data( pj_stun_session *sess, void *user_data) { PJ_ASSERT_RETURN(sess, PJ_EINVAL); - pj_mutex_lock(sess->mutex); + pj_lock_acquire(sess->lock); sess->user_data = user_data; - pj_mutex_unlock(sess->mutex); + pj_lock_release(sess->lock); return PJ_SUCCESS; } @@ -451,6 +456,27 @@ PJ_DEF(void*) pj_stun_session_get_user_data(pj_stun_session *sess) return sess->user_data; } +PJ_DEF(pj_status_t) pj_stun_session_set_lock( pj_stun_session *sess, + pj_lock_t *lock, + pj_bool_t auto_del) +{ + pj_lock_t *old_lock = sess->lock; + pj_bool_t old_del; + + PJ_ASSERT_RETURN(sess && lock, PJ_EINVAL); + + pj_lock_acquire(old_lock); + sess->lock = lock; + old_del = sess->delete_lock; + sess->delete_lock = auto_del; + pj_lock_release(old_lock); + + if (old_lock) + pj_lock_destroy(old_lock); + + return PJ_SUCCESS; +} + PJ_DEF(pj_status_t) pj_stun_session_set_server_name(pj_stun_session *sess, const pj_str_t *srv_name) { @@ -602,13 +628,13 @@ PJ_DEF(pj_status_t) pj_stun_session_send_msg( pj_stun_session *sess, tdata->pkt = pj_pool_alloc(tdata->pool, tdata->max_len); /* Start locking the session now */ - pj_mutex_lock(sess->mutex); + pj_lock_acquire(sess->lock); /* Apply options */ status = apply_msg_options(sess, tdata->pool, tdata->msg); if (status != PJ_SUCCESS) { pj_stun_msg_destroy_tdata(sess, tdata); - pj_mutex_unlock(sess->mutex); + pj_lock_release(sess->lock); LOG_ERR_(sess, "Error applying options", status); return status; } @@ -616,7 +642,7 @@ PJ_DEF(pj_status_t) pj_stun_session_send_msg( pj_stun_session *sess, status = get_key(sess, tdata->pool, tdata->msg, &tdata->auth_key); if (status != PJ_SUCCESS) { pj_stun_msg_destroy_tdata(sess, tdata); - pj_mutex_unlock(sess->mutex); + pj_lock_release(sess->lock); LOG_ERR_(sess, "Error getting creadential's key", status); return status; } @@ -628,7 +654,7 @@ PJ_DEF(pj_status_t) pj_stun_session_send_msg( pj_stun_session *sess, &tdata->pkt_size); if (status != PJ_SUCCESS) { pj_stun_msg_destroy_tdata(sess, tdata); - pj_mutex_unlock(sess->mutex); + pj_lock_release(sess->lock); LOG_ERR_(sess, "STUN encode() error", status); return status; } @@ -656,7 +682,7 @@ PJ_DEF(pj_status_t) pj_stun_session_send_msg( pj_stun_session *sess, tdata->pkt, tdata->pkt_size); if (status != PJ_SUCCESS && status != PJ_EPENDING) { pj_stun_msg_destroy_tdata(sess, tdata); - pj_mutex_unlock(sess->mutex); + pj_lock_release(sess->lock); LOG_ERR_(sess, "Error sending STUN request", status); return status; } @@ -684,7 +710,7 @@ PJ_DEF(pj_status_t) pj_stun_session_send_msg( pj_stun_session *sess, &timeout); if (status != PJ_SUCCESS) { pj_stun_msg_destroy_tdata(sess, tdata); - pj_mutex_unlock(sess->mutex); + pj_lock_release(sess->lock); LOG_ERR_(sess, "Error scheduling response timer", status); return status; } @@ -707,7 +733,7 @@ PJ_DEF(pj_status_t) pj_stun_session_send_msg( pj_stun_session *sess, } - pj_mutex_unlock(sess->mutex); + pj_lock_release(sess->lock); return status; } @@ -749,7 +775,7 @@ PJ_DEF(pj_status_t) pj_stun_session_cancel_req( pj_stun_session *sess, PJ_ASSERT_RETURN(!notify || notify_status!=PJ_SUCCESS, PJ_EINVAL); PJ_ASSERT_RETURN(PJ_STUN_IS_REQUEST(tdata->msg->hdr.type), PJ_EINVAL); - pj_mutex_lock(sess->mutex); + pj_lock_acquire(sess->lock); if (notify) { (sess->cb.on_request_complete)(sess, notify_status, tdata, NULL, @@ -759,7 +785,7 @@ PJ_DEF(pj_status_t) pj_stun_session_cancel_req( pj_stun_session *sess, /* Just destroy tdata. This will destroy the transaction as well */ pj_stun_msg_destroy_tdata(sess, tdata); - pj_mutex_unlock(sess->mutex); + pj_lock_release(sess->lock); return PJ_SUCCESS; } @@ -774,11 +800,11 @@ PJ_DEF(pj_status_t) pj_stun_session_retransmit_req(pj_stun_session *sess, PJ_ASSERT_RETURN(sess && tdata, PJ_EINVAL); PJ_ASSERT_RETURN(PJ_STUN_IS_REQUEST(tdata->msg->hdr.type), PJ_EINVAL); - pj_mutex_lock(sess->mutex); + pj_lock_acquire(sess->lock); status = pj_stun_client_tsx_retransmit(tdata->client_tsx); - pj_mutex_unlock(sess->mutex); + pj_lock_release(sess->lock); return status; } @@ -1053,7 +1079,7 @@ PJ_DEF(pj_status_t) pj_stun_session_on_rx_pkt(pj_stun_session *sess, pj_ntohs(((pj_sockaddr_in*)src_addr)->sin_port), pj_stun_msg_dump(msg, dump, PJ_STUN_MAX_PKT_LEN, NULL))); - pj_mutex_lock(sess->mutex); + pj_lock_acquire(sess->lock); /* For requests, check if we have cached response */ status = check_cached_response(sess, tmp_pool, msg, @@ -1088,7 +1114,7 @@ PJ_DEF(pj_status_t) pj_stun_session_on_rx_pkt(pj_stun_session *sess, } on_return: - pj_mutex_unlock(sess->mutex); + pj_lock_release(sess->lock); pj_pool_release(tmp_pool); return status; diff --git a/pjnath/src/pjnath/turn_session.c b/pjnath/src/pjnath/turn_session.c index 9ce7290a..d09d7e42 100644 --- a/pjnath/src/pjnath/turn_session.c +++ b/pjnath/src/pjnath/turn_session.c @@ -17,30 +17,47 @@ * Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA */ #include +#include #include #include #include #include +#include +#include #include +#include #include #include +#define MAX_SRV_CNT 4 +#define REFRESH_SEC_BEFORE 60 -enum state_t +static const char *state_names[] = { - STATE_NULL, - STATE_RESOLVING, - STATE_RESOLVED, - STATE_ALLOCATING, - STATE_READY + "Null", + "Resolving", + "Resolved", + "Allocating", + "Ready", + "Deallocating", + "Deallocated", + "Destroying" }; +enum timer_id_t +{ + TIMER_NONE, + TIMER_KEEP_ALIVE, + TIMER_DESTROY +}; + + struct peer { - unsigned ch_id; - pj_sockaddr peer_addr; + pj_uint16_t ch_id; + pj_bool_t bound; + pj_sockaddr addr; pj_time_val expiry; - pj_uint8_t tsx_id[12]; /* Pending ChannelBind request */ }; struct pj_turn_session @@ -48,20 +65,38 @@ struct pj_turn_session pj_pool_t *pool; const char *obj_name; pj_turn_session_cb cb; + void *user_data; + + pj_lock_t *lock; + int busy; - enum state_t state; + pj_turn_state_t state; + pj_bool_t pending_destroy; + pj_bool_t destroy_notified; pj_stun_session *stun; + unsigned lifetime; + int ka_interval; + pj_time_val expiry; + + pj_timer_heap_t *timer_heap; + pj_timer_entry timer; + pj_dns_async_query *dns_async; + pj_uint16_t default_port; - unsigned srv_addr_cnt; + pj_uint16_t af; + pj_turn_tp_type tp_type; + pj_uint16_t srv_addr_cnt; pj_sockaddr *srv_addr_list; pj_sockaddr *srv_addr; pj_bool_t pending_alloc; pj_turn_alloc_param alloc_param; + pj_hash_table_t *peer_table; + /* tx_pkt must be 16bit aligned */ pj_uint8_t tx_pkt[PJ_TURN_MAX_PKT_LEN]; @@ -72,6 +107,11 @@ struct pj_turn_session /* * Prototypes. */ +static void sess_shutdown(pj_turn_session *sess, + pj_bool_t notify, + pj_status_t status); +static void do_destroy(pj_turn_session *sess); +static void send_refresh(pj_turn_session *sess, int lifetime); static pj_status_t stun_on_send_msg(pj_stun_session *sess, const void *pkt, pj_size_t pkt_size, @@ -92,39 +132,78 @@ static pj_status_t stun_on_rx_indication(pj_stun_session *sess, static void dns_srv_resolver_cb(void *user_data, pj_status_t status, const pj_dns_srv_record *rec); -static void dns_a_resolver_cb(void *user_data, - pj_status_t status, - pj_dns_parsed_packet *response); static struct peer *lookup_peer_by_addr(pj_turn_session *sess, const pj_sockaddr_t *addr, unsigned addr_len, - pj_bool_t update); + pj_bool_t update, + pj_bool_t bind_channel); static struct peer *lookup_peer_by_chnum(pj_turn_session *sess, - unsigned chnum); + pj_uint16_t chnum); +static void on_timer_event(pj_timer_heap_t *th, pj_timer_entry *e); +/** + * Get TURN state name. + */ +PJ_DEF(const char*) pj_turn_state_name(pj_turn_state_t state) +{ + return state_names[state]; +} + /* * Create TURN client session. */ PJ_DEF(pj_status_t) pj_turn_session_create( pj_stun_config *cfg, + const char *name, + int af, + pj_turn_tp_type tp_type, const pj_turn_session_cb *cb, + void *user_data, + unsigned options, pj_turn_session **p_sess) { pj_pool_t *pool; pj_turn_session *sess; pj_stun_session_cb stun_cb; + pj_lock_t *null_lock; pj_status_t status; PJ_ASSERT_RETURN(cfg && cfg->pf && cb && p_sess, PJ_EINVAL); + PJ_ASSERT_RETURN(cb->on_send_pkt, PJ_EINVAL); + + PJ_UNUSED_ARG(options); + + if (name == NULL) + name = "turn%p"; /* Allocate and create TURN session */ - pool = pj_pool_create(cfg->pf, "turn%p", 1000, 1000, NULL); + pool = pj_pool_create(cfg->pf, name, 1000, 1000, NULL); sess = PJ_POOL_ZALLOC_T(pool, pj_turn_session); sess->pool = pool; sess->obj_name = pool->obj_name; + sess->timer_heap = cfg->timer_heap; + sess->af = (pj_uint16_t)af; + sess->tp_type = tp_type; + sess->ka_interval = PJ_TURN_KEEP_ALIVE_SEC; + sess->user_data = user_data; + /* Copy callback */ pj_memcpy(&sess->cb, cb, sizeof(*cb)); + /* Peer hash table */ + sess->peer_table = pj_hash_create(pool, PJ_TURN_PEER_HTABLE_SIZE); + + /* Session lock */ + status = pj_lock_create_recursive_mutex(pool, sess->obj_name, + &sess->lock); + if (status != PJ_SUCCESS) { + do_destroy(sess); + return status; + } + + /* Timer */ + pj_timer_entry_init(&sess->timer, TIMER_NONE, sess, &on_timer_event); + /* Create STUN session */ pj_bzero(&stun_cb, sizeof(stun_cb)); stun_cb.on_send_msg = &stun_on_send_msg; @@ -133,34 +212,178 @@ PJ_DEF(pj_status_t) pj_turn_session_create( pj_stun_config *cfg, status = pj_stun_session_create(cfg, sess->obj_name, &stun_cb, PJ_FALSE, &sess->stun); if (status != PJ_SUCCESS) { - pj_turn_session_destroy(sess); + do_destroy(sess); + return status; + } + + /* Replace mutex in STUN session with a NULL mutex, since access to + * STUN session is serialized. + */ + status = pj_lock_create_null_mutex(pool, name, &null_lock); + if (status != PJ_SUCCESS) { + do_destroy(sess); return status; } + pj_stun_session_set_lock(sess->stun, null_lock, PJ_TRUE); + + /* Done */ + + PJ_LOG(4,(sess->obj_name, "TURN client session created")); - /* Done for now */ *p_sess = sess; return PJ_SUCCESS; } +/* Destroy */ +static void do_destroy(pj_turn_session *sess) +{ + /* Lock session */ + if (sess->lock) { + pj_lock_acquire(sess->lock); + } + + /* Cancel pending timer, if any */ + if (sess->timer.id != TIMER_NONE) { + pj_timer_heap_cancel(sess->timer_heap, &sess->timer); + sess->timer.id = TIMER_NONE; + } + + /* Destroy STUN session */ + if (sess->stun) { + pj_stun_session_destroy(sess->stun); + sess->stun = NULL; + } + + /* Destroy lock */ + if (sess->lock) { + pj_lock_release(sess->lock); + pj_lock_destroy(sess->lock); + sess->lock = NULL; + } + + /* Destroy pool */ + if (sess->pool) { + pj_pool_t *pool = sess->pool; + + PJ_LOG(4,(sess->obj_name, "TURN client session destroyed")); + + sess->pool = NULL; + pj_pool_release(pool); + } +} + + +/* Set session state */ +static void set_state(pj_turn_session *sess, enum pj_turn_state_t state) +{ + pj_turn_state_t old_state = sess->state; + + PJ_LOG(4,(sess->obj_name, "State changed %s --> %s", + state_names[old_state], state_names[state])); + sess->state = state; + + if (sess->cb.on_state) { + (*sess->cb.on_state)(sess, old_state, state); + } +} + +/* + * Notify application and shutdown the TURN session. + */ +static void sess_shutdown(pj_turn_session *sess, + pj_bool_t notify, + pj_status_t status) +{ + pj_bool_t can_destroy = PJ_TRUE; + + PJ_UNUSED_ARG(notify); + + PJ_LOG(4,(sess->obj_name, "Request to shutdown in state %s, cause:%d", + state_names[sess->state], status)); + + switch (sess->state) { + case PJ_TURN_STATE_NULL: + break; + case PJ_TURN_STATE_RESOLVING: + pj_assert(sess->dns_async != NULL); + pj_dns_resolver_cancel_query(sess->dns_async, PJ_FALSE); + sess->dns_async = NULL; + break; + case PJ_TURN_STATE_RESOLVED: + break; + case PJ_TURN_STATE_ALLOCATING: + /* We need to wait until allocation complete */ + sess->pending_destroy = PJ_TRUE; + can_destroy = PJ_FALSE; + break; + case PJ_TURN_STATE_READY: + /* Send REFRESH with LIFETIME=0 */ + can_destroy = PJ_FALSE; + sess->pending_destroy = PJ_TRUE; + break; + case PJ_TURN_STATE_DEALLOCATING: + can_destroy = PJ_FALSE; + /* This may recursively call this function again with + * state==PJ_TURN_STATE_DEALLOCATED. + */ + send_refresh(sess, 0); + break; + case PJ_TURN_STATE_DEALLOCATED: + break; + } + + if (can_destroy) { + /* Schedule destroy */ + pj_time_val delay = {0, 0}; + + if (sess->timer.id != TIMER_NONE) { + pj_timer_heap_cancel(sess->timer_heap, &sess->timer); + sess->timer.id = TIMER_NONE; + } + + set_state(sess, PJ_TURN_STATE_DESTROYING); + + sess->timer.id = TIMER_DESTROY; + pj_timer_heap_schedule(sess->timer_heap, &sess->timer, &delay); + } +} + + /* - * Destroy TURN client session. + * Public API to destroy TURN client session. */ PJ_DEF(pj_status_t) pj_turn_session_destroy(pj_turn_session *sess) { PJ_ASSERT_RETURN(sess, PJ_EINVAL); - /* TODO */ + pj_lock_acquire(sess->lock); + + sess_shutdown(sess, PJ_FALSE, PJ_SUCCESS); + + pj_lock_release(sess->lock); + + return PJ_SUCCESS; } /* - * Notify application and destroy the TURN session. + * Re-assign user data. + */ +PJ_DEF(pj_status_t) pj_turn_session_set_user_data( pj_turn_session *sess, + void *user_data) +{ + sess->user_data = user_data; + return PJ_SUCCESS; +} + + +/** + * Retrieve user data. */ -static void destroy(pj_turn_session *sess, - pj_bool_t notify, - pj_status_t status) +PJ_DEF(void*) pj_turn_session_get_user_data(pj_turn_session *sess) { + return sess->user_data; } @@ -169,67 +392,90 @@ static void destroy(pj_turn_session *sess, */ PJ_DEF(pj_status_t) pj_turn_session_set_server( pj_turn_session *sess, const pj_str_t *domain, - const pj_str_t *res_name, int default_port, pj_dns_resolver *resolver) { pj_status_t status; PJ_ASSERT_RETURN(sess && domain, PJ_EINVAL); + PJ_ASSERT_RETURN(sess->state == PJ_TURN_STATE_NULL, PJ_EINVALIDOP); + + pj_lock_acquire(sess->lock); - if (res_name) { - /* res_name is specified, resolve with DNS SRV resolution. - * Resolver must be specified in this case. + if (resolver) { + /* Resolve with DNS SRV resolution, and fallback to DNS A resolution + * if default_port is specified. */ - PJ_ASSERT_RETURN(resolver, PJ_EINVAL); - - sess->state = STATE_RESOLVING; - status = pj_dns_srv_resolve(domain, res_name, default_port, sess->pool, - resolver, PJ_DNS_SRV_FALLBACK_A, sess, - &dns_srv_resolver_cb, &sess->dns_async); - if (status != PJ_SUCCESS) { - sess->state = STATE_NULL; - return status; + unsigned opt = 0; + pj_str_t res_name; + + switch (sess->tp_type) { + case PJ_TURN_TP_UDP: + res_name = pj_str("_turn._udp."); + break; + case PJ_TURN_TP_TCP: + res_name = pj_str("_turn._tcp."); + break; + case PJ_TURN_TP_TLS: + res_name = pj_str("_turns._tcp."); + break; + default: + status = PJNATH_ETURNINTP; + goto on_return; } - } else if (resolver) { - /* res_name is not specified, but resolver is specified. - * Resolve domain as a hostname with DNS A resolution. - */ - sess->state = STATE_RESOLVING; - status = pj_dns_resolver_start_query(resolver, domain, PJ_DNS_TYPE_A, - 0, &dns_a_resolver_cb, - sess, &sess->dns_async); + /* Fallback to DNS A only if default port is specified */ + if (default_port>0 && default_port<65536) { + opt = PJ_DNS_SRV_FALLBACK_A; + sess->default_port = (pj_uint16_t)default_port; + } + + set_state(sess, PJ_TURN_STATE_RESOLVING); + status = pj_dns_srv_resolve(domain, &res_name, default_port, + sess->pool, resolver, opt, sess, + &dns_srv_resolver_cb, &sess->dns_async); if (status != PJ_SUCCESS) { - sess->state = STATE_NULL; - return status; + set_state(sess, PJ_TURN_STATE_NULL); + goto on_return; } } else { - /* Both res_name and resolver is not specified. - * Resolve with standard gethostbyname() + /* Resolver is not specified, resolve with standard gethostbyname(). + * The default_port MUST be specified in this case. */ - pj_addrinfo ai[3]; - unsigned i, cnt = PJ_ARRAY_SIZE(ai); + pj_addrinfo *ai; + unsigned i, cnt; + + /* Default port must be specified */ + PJ_ASSERT_RETURN(default_port>0 && default_port<65536, PJ_EINVAL); + sess->default_port = (pj_uint16_t)default_port; - status = pj_getaddrinfo(pj_AF_INET(), domain, &cnt, ai); + cnt = MAX_SRV_CNT; + ai = (pj_addrinfo*) + pj_pool_calloc(sess->pool, cnt, sizeof(pj_addrinfo)); + + status = pj_getaddrinfo(sess->af, domain, &cnt, ai); if (status != PJ_SUCCESS) - return status; + goto on_return; - sess->srv_addr_cnt = cnt; + sess->srv_addr_cnt = (pj_uint16_t)cnt; sess->srv_addr_list = (pj_sockaddr*) pj_pool_calloc(sess->pool, cnt, sizeof(pj_sockaddr)); for (i=0; isrv_addr_list[i], &ai[i].ai_addr, - sizeof(pj_sockaddr)); + pj_sockaddr *addr = &sess->srv_addr_list[i]; + pj_memcpy(addr, &ai[i].ai_addr, sizeof(pj_sockaddr)); + addr->addr.sa_family = sess->af; + addr->ipv4.sin_port = pj_htons(sess->default_port); } sess->srv_addr = &sess->srv_addr_list[0]; - sess->state = STATE_RESOLVED; + set_state(sess, PJ_TURN_STATE_RESOLVED); } - return PJ_SUCCESS; +on_return: + pj_lock_release(sess->lock); + return status; } @@ -240,7 +486,13 @@ PJ_DEF(pj_status_t) pj_turn_session_set_cred(pj_turn_session *sess, const pj_stun_auth_cred *cred) { PJ_ASSERT_RETURN(sess && cred, PJ_EINVAL); + + pj_lock_acquire(sess->lock); + pj_stun_session_set_credential(sess->stun, cred); + + pj_lock_release(sess->lock); + return PJ_SUCCESS; } @@ -255,24 +507,34 @@ PJ_DEF(pj_status_t) pj_turn_session_alloc(pj_turn_session *sess, pj_status_t status; PJ_ASSERT_RETURN(sess, PJ_EINVAL); - PJ_ASSERT_RETURN(sess->state <= STATE_RESOLVED, PJ_EINVALIDOP); + PJ_ASSERT_RETURN(sess->state>PJ_TURN_STATE_NULL && sess->state<=PJ_TURN_STATE_RESOLVED, + PJ_EINVALIDOP); - if (sess->state < STATE_RESOLVED) { - if (param) + pj_lock_acquire(sess->lock); + + if (sess->state < PJ_TURN_STATE_RESOLVED) { + if (param && param != &sess->alloc_param) pj_memcpy(&sess->alloc_param, param, sizeof(*param)); sess->pending_alloc = PJ_TRUE; + + PJ_LOG(4,(sess->obj_name, "Pending ALLOCATE in state %s", + state_names[sess->state])); + + pj_lock_release(sess->lock); return PJ_SUCCESS; } /* Ready to allocate */ - pj_assert(sess->state == STATE_RESOLVED); + pj_assert(sess->state == PJ_TURN_STATE_RESOLVED); /* Create a bare request */ status = pj_stun_session_create_req(sess->stun, PJ_STUN_ALLOCATE_REQUEST, PJ_STUN_MAGIC, NULL, &tdata); - if (status != PJ_SUCCESS) + if (status != PJ_SUCCESS) { + pj_lock_release(sess->lock); return status; + } /* MUST include REQUESTED-TRANSPORT attribute */ pj_stun_msg_add_uint_attr(tdata->pool, tdata->msg, @@ -293,55 +555,108 @@ PJ_DEF(pj_status_t) pj_turn_session_alloc(pj_turn_session *sess, sess->alloc_param.lifetime); } - /* Select server address */ + /* Server address must be set */ pj_assert(sess->srv_addr != NULL); /* Send request */ - sess->state = STATE_ALLOCATING; + set_state(sess, PJ_TURN_STATE_ALLOCATING); status = pj_stun_session_send_msg(sess->stun, PJ_FALSE, sess->srv_addr, pj_sockaddr_get_len(sess->srv_addr), tdata); if (status != PJ_SUCCESS) { - sess->state = STATE_RESOLVED; + /* Set state back to RESOLVED. We don't want to destroy session now, + * let the application do it if it wants to. + */ + set_state(sess, PJ_TURN_STATE_RESOLVED); } + pj_lock_release(sess->lock); return status; } +/* + * Send REFRESH + */ +static void send_refresh(pj_turn_session *sess, int lifetime) +{ + pj_stun_tx_data *tdata; + pj_status_t status; + + PJ_ASSERT_ON_FAIL(sess->state==PJ_TURN_STATE_READY, return); + + /* Create a bare REFRESH request */ + status = pj_stun_session_create_req(sess->stun, PJ_STUN_REFRESH_REQUEST, + PJ_STUN_MAGIC, NULL, &tdata); + if (status != PJ_SUCCESS) + goto on_error; + + /* Add LIFETIME */ + if (lifetime >= 0) { + pj_stun_msg_add_uint_attr(tdata->pool, tdata->msg, + PJ_STUN_ATTR_LIFETIME, lifetime); + } + + /* Send request */ + if (lifetime == 0) { + set_state(sess, PJ_TURN_STATE_DEALLOCATING); + } + + status = pj_stun_session_send_msg(sess->stun, PJ_FALSE, sess->srv_addr, + pj_sockaddr_get_len(sess->srv_addr), + tdata); + if (status != PJ_SUCCESS) + goto on_error; + + return; + +on_error: + if (lifetime == 0) { + set_state(sess, PJ_TURN_STATE_DEALLOCATED); + sess_shutdown(sess, PJ_FALSE, status); + } +} + + /** * Relay data to the specified peer through the session. */ PJ_DEF(pj_status_t) pj_turn_session_sendto( pj_turn_session *sess, const pj_uint8_t *pkt, unsigned pkt_len, - const pj_sockaddr_t *peer_addr, + const pj_sockaddr_t *addr, unsigned addr_len) { struct peer *peer; + pj_status_t status; - PJ_ASSERT_RETURN(sess && pkt && pkt_len && peer_addr && addr_len, + PJ_ASSERT_RETURN(sess && pkt && pkt_len && addr && addr_len, PJ_EINVAL); /* Return error if we're not ready */ - if (sess->state != STATE_READY) { + if (sess->state != PJ_TURN_STATE_READY) { return PJ_EIGNORED; } + /* Lock session now */ + pj_lock_acquire(sess->lock); + /* Lookup peer to see whether we've assigned a channel number * to this peer. */ - peer = lookup_peer_by_addr(sess, peer_addr, addr_len, PJ_TRUE); + peer = lookup_peer_by_addr(sess, addr, addr_len, PJ_TRUE, PJ_FALSE); pj_assert(peer != NULL); - if (peer->ch_id != PJ_TURN_INVALID_CHANNEL) { + if (peer->ch_id != PJ_TURN_INVALID_CHANNEL && peer->bound) { /* Peer is assigned Channel number, we can use ChannelData */ pj_turn_channel_data *cd = (pj_turn_channel_data*)sess->tx_pkt; pj_assert(sizeof(*cd)==4); - if (pkt_len > sizeof(sess->tx_pkt)-sizeof(*cd)) - return PJ_ETOOBIG; + if (pkt_len > sizeof(sess->tx_pkt)-sizeof(*cd)) { + status = PJ_ETOOBIG; + goto on_return; + } cd->ch_number = pj_htons((pj_uint16_t)peer->ch_id); cd->length = pj_htons((pj_uint16_t)pkt_len); @@ -349,37 +664,40 @@ PJ_DEF(pj_status_t) pj_turn_session_sendto( pj_turn_session *sess, pj_assert(sess->srv_addr != NULL); - return sess->cb.on_send_pkt(sess, sess->tx_pkt, pkt_len+sizeof(*cd), - sess->srv_addr, - pj_sockaddr_get_len(sess->srv_addr)); + status = sess->cb.on_send_pkt(sess, sess->tx_pkt, pkt_len+sizeof(*cd), + sess->srv_addr, + pj_sockaddr_get_len(sess->srv_addr)); } else { /* Peer has not been assigned Channel number, must use Send * Indication. */ pj_stun_tx_data *tdata; - pj_status_t status; /* Create blank SEND-INDICATION */ status = pj_stun_session_create_ind(sess->stun, PJ_STUN_SEND_INDICATION, &tdata); if (status != PJ_SUCCESS) - return status; + goto on_return; /* Add PEER-ADDRESS */ pj_stun_msg_add_sockaddr_attr(tdata->pool, tdata->msg, PJ_STUN_ATTR_PEER_ADDR, PJ_TRUE, - peer_addr, addr_len); + addr, addr_len); /* Add DATA attribute */ pj_stun_msg_add_binary_attr(tdata->pool, tdata->msg, PJ_STUN_ATTR_DATA, pkt, pkt_len); /* Send the indication */ - return pj_stun_session_send_msg(sess->stun, PJ_FALSE, sess->srv_addr, - pj_sockaddr_get_len(sess->srv_addr), - tdata); + status = pj_stun_session_send_msg(sess->stun, PJ_FALSE, sess->srv_addr, + pj_sockaddr_get_len(sess->srv_addr), + tdata); } + +on_return: + pj_lock_release(sess->lock); + return status; } @@ -392,27 +710,37 @@ PJ_DEF(pj_status_t) pj_turn_session_bind_channel(pj_turn_session *sess, { struct peer *peer; pj_stun_tx_data *tdata; - unsigned ch_num; + pj_uint16_t ch_num; pj_status_t status; - PJ_ASSERT_RETURN(sess && peer && addr_len, PJ_EINVAL); + PJ_ASSERT_RETURN(sess && peer_adr && addr_len, PJ_EINVAL); + PJ_ASSERT_RETURN(sess->state == PJ_TURN_STATE_READY, PJ_EINVALIDOP); + + pj_lock_acquire(sess->lock); /* Create blank ChannelBind request */ status = pj_stun_session_create_req(sess->stun, PJ_STUN_CHANNEL_BIND_REQUEST, PJ_STUN_MAGIC, NULL, &tdata); if (status != PJ_SUCCESS) - return status; + goto on_return; /* Lookup peer */ - peer = lookup_peer_by_addr(sess, peer_adr, addr_len, PJ_TRUE); + peer = lookup_peer_by_addr(sess, peer_adr, addr_len, PJ_TRUE, PJ_FALSE); pj_assert(peer); + /* Associate peer data structure with tdata for future reference + * when we receive the ChannelBind response. + */ + tdata->user_data = peer; + if (peer->ch_id != PJ_TURN_INVALID_CHANNEL) { + /* Channel is already bound. This is a refresh request. */ ch_num = peer->ch_id; } else { - PJ_ASSERT_RETURN(sess->next_ch <= PJ_TURN_CHANNEL_MAX, PJ_ETOOMANY); - ch_num = sess->next_ch++; + PJ_ASSERT_ON_FAIL(sess->next_ch <= PJ_TURN_CHANNEL_MAX, + {status=PJ_ETOOMANY; goto on_return;}); + peer->ch_id = ch_num = sess->next_ch++; } /* Add CHANNEL-NUMBER attribute */ @@ -425,13 +753,14 @@ PJ_DEF(pj_status_t) pj_turn_session_bind_channel(pj_turn_session *sess, PJ_STUN_ATTR_PEER_ADDR, PJ_TRUE, peer_adr, addr_len); - /* Save transaction ID to peer */ - pj_memcpy(peer->tsx_id, tdata->msg->hdr.tsx_id, sizeof(peer->tsx_id)); - /* Send the request */ - return pj_stun_session_send_msg(sess->stun, PJ_FALSE, sess->srv_addr, - pj_sockaddr_get_len(sess->srv_addr), - tdata); + status = pj_stun_session_send_msg(sess->stun, PJ_FALSE, sess->srv_addr, + pj_sockaddr_get_len(sess->srv_addr), + tdata); + +on_return: + pj_lock_release(sess->lock); + return status; } @@ -445,10 +774,15 @@ PJ_DEF(pj_status_t) pj_turn_session_on_rx_pkt(pj_turn_session *sess, pj_bool_t is_datagram) { pj_bool_t is_stun; + pj_status_t status; /* Packet could be ChannelData or STUN message (response or * indication). */ + + /* Start locking the session */ + pj_lock_acquire(sess->lock); + /* Quickly check if this is STUN message */ is_stun = ((pkt[0] & 0xC0) == 0); @@ -459,30 +793,54 @@ PJ_DEF(pj_status_t) pj_turn_session_on_rx_pkt(pj_turn_session *sess, options = PJ_STUN_CHECK_PACKET; if (is_datagram) options |= PJ_STUN_IS_DATAGRAM; - return pj_stun_session_on_rx_pkt(sess->stun, pkt, pkt_len, + status=pj_stun_session_on_rx_pkt(sess->stun, pkt, pkt_len, options, NULL, sess->srv_addr, pj_sockaddr_get_len(sess->srv_addr)); - } else { - /* This must be ChannelData */ + + } else if (sess->cb.on_rx_data) { + + /* This must be ChannelData. Only makes sense when on_rx_data() is + * implemented by application. + */ pj_turn_channel_data cd; struct peer *peer; + PJ_ASSERT_RETURN(pkt_len >= 4, PJ_ETOOSMALL); + /* Lookup peer */ pj_memcpy(&cd, pkt, sizeof(pj_turn_channel_data)); - peer = lookup_peer_by_chnum(sess, pj_ntohs(cd.ch_number)); - if (!peer) - return PJ_ENOTFOUND; + cd.ch_number = pj_ntohs(cd.ch_number); + cd.length = pj_ntohs(cd.length); + peer = lookup_peer_by_chnum(sess, cd.ch_number); + if (!peer || !peer->bound) { + status = PJ_ENOTFOUND; + goto on_return; + } - /* Notify application */ - if (sess->cb.on_rx_data) { - (*sess->cb.on_rx_data)(sess, pkt+sizeof(cd), pj_ntohs(cd.length), - &peer->peer_addr, - pj_sockaddr_get_len(&peer->peer_addr)); + /* Check that size is correct, for UDP */ + if (pkt_len < cd.length+sizeof(cd)) { + status = PJ_ETOOSMALL; + goto on_return; } - return PJ_SUCCESS; + /* Notify application */ + (*sess->cb.on_rx_data)(sess, pkt+sizeof(cd), cd.length, + &peer->addr, + pj_sockaddr_get_len(&peer->addr)); + + status = PJ_SUCCESS; + + } else { + /* This is ChannelData and application doesn't implement + * on_rx_data() callback. Just ignore the packet. + */ + status = PJ_SUCCESS; } + +on_return: + pj_lock_release(sess->lock); + return status; } @@ -503,6 +861,156 @@ static pj_status_t stun_on_send_msg(pj_stun_session *stun, } +/* + * Handle failed ALLOCATE or REFRESH request. This may switch to alternate + * server if we have one. + */ +static void on_session_fail( pj_turn_session *sess, + enum pj_stun_method_e method, + pj_status_t status, + const pj_str_t *reason) +{ + do { + pj_str_t reason1; + char err_msg[PJ_ERR_MSG_SIZE]; + + if (reason == NULL) { + pj_strerror(status, err_msg, sizeof(err_msg)); + reason1 = pj_str(err_msg); + reason = &reason1; + } + + PJ_LOG(4,(sess->obj_name, "%s error: %.*s", + pj_stun_get_method_name(method), + (int)reason->slen, reason->ptr)); + + /* If this is ALLOCATE response and we don't have more server + * addresses to try, notify application and destroy the TURN + * session. + */ + if (method==PJ_STUN_ALLOCATE_METHOD && + sess->srv_addr == &sess->srv_addr_list[sess->srv_addr_cnt-1]) + { + + set_state(sess, PJ_TURN_STATE_DEALLOCATED); + sess_shutdown(sess, PJ_TRUE, status); + return; + } + + /* Otherwise if this is REFRESH response, notify application + * that session has been TERMINATED. + */ + if (method==PJ_STUN_REFRESH_METHOD) { + set_state(sess, PJ_TURN_STATE_DEALLOCATED); + sess_shutdown(sess, PJ_TRUE, status); + return; + } + + /* Try next server */ + ++sess->srv_addr; + reason = NULL; + + PJ_LOG(4,(sess->obj_name, "Trying next server")); + + status = pj_turn_session_alloc(sess, NULL); + + } while (status != PJ_SUCCESS); +} + + +/* + * Handle successful response to ALLOCATE or REFRESH request. + */ +static void on_allocate_success(pj_turn_session *sess, + enum pj_stun_method_e method, + const pj_stun_msg *msg) +{ + const pj_stun_lifetime_attr *lf_attr; + const pj_stun_relay_addr_attr *raddr_attr; + pj_str_t s; + pj_time_val timeout; + + /* Must have LIFETIME attribute */ + lf_attr = (const pj_stun_lifetime_attr*) + pj_stun_msg_find_attr(msg, PJ_STUN_ATTR_LIFETIME, 0); + if (lf_attr == NULL) { + on_session_fail(sess, method, PJNATH_EINSTUNMSG, + pj_cstr(&s, "Error: Missing LIFETIME attribute")); + return; + } + + /* If LIFETIME is zero, this is a deallocation */ + if (lf_attr->value == 0) { + pj_bool_t notify = sess->state < PJ_TURN_STATE_DEALLOCATING; + set_state(sess, PJ_TURN_STATE_DEALLOCATED); + sess_shutdown(sess, notify, PJ_SUCCESS); + return; + } + + /* Update lifetime and keep-alive interval */ + sess->lifetime = lf_attr->value; + pj_gettimeofday(&sess->expiry); + + if (sess->lifetime < PJ_TURN_KEEP_ALIVE_SEC) { + if (sess->lifetime <= 2) { + on_session_fail(sess, method, PJ_ETOOSMALL, + pj_cstr(&s, "Error: LIFETIME too small")); + return; + } + sess->ka_interval = sess->lifetime - 2; + sess->expiry.sec += (sess->ka_interval-1); + } else { + int timeout; + + sess->ka_interval = PJ_TURN_KEEP_ALIVE_SEC; + + timeout = sess->lifetime - PJ_TURN_REFRESH_SEC_BEFORE; + if (timeout < sess->ka_interval) + timeout = sess->ka_interval - 1; + + sess->expiry.sec += timeout; + } + + /* Check that relayed transport address contains correct + * address family. + */ + raddr_attr = (const pj_stun_relay_addr_attr*) + pj_stun_msg_find_attr(msg, PJ_STUN_ATTR_RELAY_ADDR, 0); + if (raddr_attr == NULL && method==PJ_STUN_ALLOCATE_METHOD) { + on_session_fail(sess, method, PJNATH_EINSTUNMSG, + pj_cstr(&s, "Error: Received ALLOCATE without " + "RELAY-ADDRESS attribute")); + return; + } + if (raddr_attr && raddr_attr->sockaddr.addr.sa_family != sess->af) { + on_session_fail(sess, method, PJNATH_EINSTUNMSG, + pj_cstr(&s, "Error: RELAY-ADDRESS with non IPv4" + " address family is not supported " + "for now")); + return; + } + + + /* Success */ + + /* Cancel existing keep-alive timer, if any */ + pj_assert(sess->timer.id != TIMER_DESTROY); + + if (sess->timer.id != TIMER_NONE) { + pj_timer_heap_cancel(sess->timer_heap, &sess->timer); + sess->timer.id = TIMER_NONE; + } + + /* Start keep-alive timer once allocation succeeds */ + timeout.sec = sess->ka_interval; + timeout.msec = 0; + + sess->timer.id = TIMER_KEEP_ALIVE; + pj_timer_heap_schedule(sess->timer_heap, &sess->timer, &timeout); + + set_state(sess, PJ_TURN_STATE_READY); +} + /* * Notification from STUN session on request completion. */ @@ -514,28 +1022,106 @@ static void stun_on_request_complete(pj_stun_session *stun, unsigned src_addr_len) { pj_turn_session *sess; - int method = PJ_STUN_GET_METHOD(response->hdr.type); + int method = PJ_STUN_GET_METHOD(tdata->msg->hdr.type); + + PJ_UNUSED_ARG(src_addr); + PJ_UNUSED_ARG(src_addr_len); sess = (pj_turn_session*)pj_stun_session_get_user_data(stun); if (method == PJ_STUN_ALLOCATE_METHOD) { /* Handle ALLOCATE response */ - if (PJ_STUN_IS_SUCCESS_RESPONSE(response->hdr.type)) { + if (status==PJ_SUCCESS && + PJ_STUN_IS_SUCCESS_RESPONSE(response->hdr.type)) + { + /* Successful Allocate response */ + on_allocate_success(sess, method, response); } else { - /* Error Allocate response */ + /* Failed Allocate request */ + const pj_str_t *err_msg = NULL; + + if (status == PJ_SUCCESS) { + const pj_stun_errcode_attr *err_attr; + err_attr = (const pj_stun_errcode_attr*) + pj_stun_msg_find_attr(response, + PJ_STUN_ATTR_ERROR_CODE, 0); + if (err_attr) { + status = PJ_STATUS_FROM_STUN_CODE(err_attr->err_code); + err_msg = &err_attr->reason; + } else { + status = PJNATH_EINSTUNMSG; + } + } + + on_session_fail(sess, method, status, err_msg); + } + + } else if (method == PJ_STUN_REFRESH_METHOD) { + /* Handle Refresh response */ + if (status==PJ_SUCCESS && + PJ_STUN_IS_SUCCESS_RESPONSE(response->hdr.type)) + { + /* Success, schedule next refresh. */ + on_allocate_success(sess, method, response); + } else { + /* Failed Refresh request */ + const pj_str_t *err_msg = NULL; + + if (status == PJ_SUCCESS) { + const pj_stun_errcode_attr *err_attr; + err_attr = (const pj_stun_errcode_attr*) + pj_stun_msg_find_attr(response, + PJ_STUN_ATTR_ERROR_CODE, 0); + if (err_attr) { + status = PJ_STATUS_FROM_STUN_CODE(err_attr->err_code); + err_msg = &err_attr->reason; + } else { + status = PJNATH_EINSTUNMSG; + } + } + + /* Notify and destroy */ + on_session_fail(sess, method, status, err_msg); } } else if (method == PJ_STUN_CHANNEL_BIND_METHOD) { /* Handle ChannelBind response */ - if (PJ_STUN_IS_SUCCESS_RESPONSE(response->hdr.type)) { + if (status==PJ_SUCCESS && + PJ_STUN_IS_SUCCESS_RESPONSE(response->hdr.type)) + { /* Successful ChannelBind response */ + struct peer *peer = (struct peer*)tdata->user_data; - } else { - /* Error ChannelBind response */ + pj_assert(peer->ch_id != PJ_TURN_INVALID_CHANNEL); + peer->bound = PJ_TRUE; + /* Update hash table */ + lookup_peer_by_addr(sess, &peer->addr, + pj_sockaddr_get_len(&peer->addr), + PJ_TRUE, PJ_TRUE); + + } else { + /* Failed ChannelBind response */ + pj_str_t err_msg = {"", 0}; + + if (status == PJ_SUCCESS) { + const pj_stun_errcode_attr *err_attr; + err_attr = (const pj_stun_errcode_attr*) + pj_stun_msg_find_attr(response, + PJ_STUN_ATTR_ERROR_CODE, 0); + if (err_attr) { + status = PJ_STATUS_FROM_STUN_CODE(err_attr->err_code); + err_msg = err_attr->reason; + } else { + status = PJNATH_EINSTUNMSG; + } + } + + PJ_LOG(4,(sess->obj_name, "ChannelBind failed: %.*s", + (int)err_msg.slen, err_msg.ptr)); } } else { @@ -560,6 +1146,9 @@ static pj_status_t stun_on_rx_indication(pj_stun_session *stun, pj_stun_peer_addr_attr *peer_attr; pj_stun_data_attr *data_attr; + PJ_UNUSED_ARG(src_addr); + PJ_UNUSED_ARG(src_addr_len); + sess = (pj_turn_session*)pj_stun_session_get_user_data(stun); /* Expecting Data Indication only */ @@ -603,28 +1192,43 @@ static void dns_srv_resolver_cb(void *user_data, const pj_dns_srv_record *rec) { pj_turn_session *sess = (pj_turn_session*) user_data; + unsigned i, cnt; + + /* Clear async resolver */ + sess->dns_async = NULL; /* Check failure */ if (status != PJ_SUCCESS) { - destroy(sess, PJ_TRUE, status); + sess_shutdown(sess, PJ_TRUE, status); return; } /* Copy results to server entries */ + for (i=0, cnt=0; icount && cntentry[i].server.addr_count && cntsrv_addr[cnt].ipv4; - /* Run pending allocation */ -} + addr->sin_family = sess->af; + addr->sin_port = pj_htons(rec->entry[i].port); + addr->sin_addr.s_addr = rec->entry[i].server.addr[j].s_addr; + ++cnt; + } + } + sess->srv_addr_cnt = (pj_uint16_t)cnt; -/* - * Notification on completion of DNS A resolution. - */ -static void dns_a_resolver_cb(void *user_data, - pj_status_t status, - pj_dns_parsed_packet *response) -{ + /* Set current server */ + sess->srv_addr = &sess->srv_addr[0]; + + /* Set state to PJ_TURN_STATE_RESOLVED */ + set_state(sess, PJ_TURN_STATE_RESOLVED); + + /* Run pending allocation */ + if (sess->pending_alloc) { + pj_turn_session_alloc(sess, NULL); + } } @@ -634,8 +1238,43 @@ static void dns_a_resolver_cb(void *user_data, static struct peer *lookup_peer_by_addr(pj_turn_session *sess, const pj_sockaddr_t *addr, unsigned addr_len, - pj_bool_t update) + pj_bool_t update, + pj_bool_t bind_channel) { + unsigned hval = 0; + struct peer *peer; + + peer = (struct peer*) pj_hash_get(sess->peer_table, addr, addr_len, &hval); + if (peer == NULL && update) { + peer = PJ_POOL_ZALLOC_T(sess->pool, struct peer); + peer->ch_id = PJ_TURN_INVALID_CHANNEL; + pj_memcpy(&peer->addr, addr, addr_len); + + /* Register by peer address */ + pj_hash_set(sess->pool, sess->peer_table, &peer->addr, addr_len, + hval, peer); + } + + if (peer && update) { + pj_gettimeofday(&peer->expiry); + if (peer->bound) { + peer->expiry.sec += PJ_TURN_CHANNEL_TIMEOUT - 10; + } else { + peer->expiry.sec += PJ_TURN_PERM_TIMEOUT - 10; + } + + if (bind_channel) { + /* Register by channel number */ + pj_assert(peer->ch_id != PJ_TURN_INVALID_CHANNEL && peer->bound); + pj_assert(pj_hash_get(sess->peer_table, &peer->ch_id, + sizeof(peer->ch_id), NULL)==0); + + pj_hash_set(sess->pool, sess->peer_table, &peer->ch_id, + sizeof(peer->ch_id), 0, peer); + } + } + + return peer; } @@ -643,8 +1282,104 @@ static struct peer *lookup_peer_by_addr(pj_turn_session *sess, * Lookup peer descriptor from its channel number. */ static struct peer *lookup_peer_by_chnum(pj_turn_session *sess, - unsigned chnum) + pj_uint16_t chnum) { + return (struct peer*) pj_hash_get(sess->peer_table, &chnum, + sizeof(chnum), NULL); } +/* + * Timer event. + */ +static void on_timer_event(pj_timer_heap_t *th, pj_timer_entry *e) +{ + pj_turn_session *sess = (pj_turn_session*)e->user_data; + enum timer_id_t eid; + + PJ_UNUSED_ARG(th); + + pj_lock_acquire(sess->lock); + + eid = e->id; + e->id = TIMER_NONE; + + if (eid == TIMER_KEEP_ALIVE) { + pj_time_val now; + pj_hash_iterator_t itbuf, *it; + pj_bool_t resched = PJ_TRUE; + pj_bool_t pkt_sent = PJ_FALSE; + + pj_gettimeofday(&now); + + /* Refresh allocation if it's time to do so */ + if (PJ_TIME_VAL_LTE(sess->expiry, now)) { + int lifetime = sess->alloc_param.lifetime; + + if (lifetime == 0) + lifetime = -1; + + send_refresh(sess, lifetime); + resched = PJ_FALSE; + pkt_sent = PJ_TRUE; + } + + /* Scan hash table to refresh bound channels */ + it = pj_hash_first(sess->peer_table, &itbuf); + while (it) { + struct peer *peer = (struct peer*) + pj_hash_this(sess->peer_table, it); + if (peer->bound && PJ_TIME_VAL_LTE(peer->expiry, now)) { + + /* Send ChannelBind to refresh channel binding and + * permission. + */ + pj_turn_session_bind_channel(sess, &peer->addr, + pj_sockaddr_get_len(&peer->addr)); + pkt_sent = PJ_TRUE; + } + + it = pj_hash_next(sess->peer_table, it); + } + + /* If no packet is sent, send a blank Send indication to + * refresh local NAT. + */ + if (!pkt_sent && sess->alloc_param.ka_interval > 0) { + pj_stun_tx_data *tdata; + pj_status_t rc; + + /* Create blank SEND-INDICATION */ + rc = pj_stun_session_create_ind(sess->stun, + PJ_STUN_SEND_INDICATION, &tdata); + if (rc == PJ_SUCCESS) { + /* Add DATA attribute with zero length */ + pj_stun_msg_add_binary_attr(tdata->pool, tdata->msg, + PJ_STUN_ATTR_DATA, NULL, 0); + + /* Send the indication */ + pj_stun_session_send_msg(sess->stun, PJ_FALSE, sess->srv_addr, + pj_sockaddr_get_len(sess->srv_addr), + tdata); + } + } + + /* Reshcedule timer */ + if (resched) { + pj_time_val delay; + + delay.sec = sess->ka_interval; + delay.msec = 0; + + pj_timer_heap_schedule(sess->timer_heap, &sess->timer, &delay); + } + + pj_lock_release(sess->lock); + + } else if (eid == TIMER_DESTROY) { + /* Time to destroy */ + pj_lock_release(sess->lock); + do_destroy(sess); + } +} + diff --git a/pjnath/src/pjnath/turn_udp.c b/pjnath/src/pjnath/turn_udp.c new file mode 100644 index 00000000..7e3bd4c5 --- /dev/null +++ b/pjnath/src/pjnath/turn_udp.c @@ -0,0 +1,322 @@ +/* $Id$ */ +/* + * Copyright (C) 2003-2007 Benny Prijono + * + * This program is free software; you can redistribute it and/or modify + * it under the terms of the GNU General Public License as published by + * the Free Software Foundation; either version 2 of the License, or + * (at your option) any later version. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU General Public License for more details. + * + * You should have received a copy of the GNU General Public License + * along with this program; if not, write to the Free Software + * Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA + */ +#include +#include +#include +#include +#include + +struct pj_turn_udp +{ + pj_pool_t *pool; + pj_turn_session *sess; + pj_turn_udp_cb cb; + void *user_data; + + pj_sock_t sock; + pj_ioqueue_key_t *key; + pj_ioqueue_op_key_t read_key; + pj_uint8_t pkt[PJ_TURN_MAX_PKT_LEN]; + pj_sockaddr src_addr; + int src_addr_len; +}; + + +/* + * Callback prototypes. + */ +static pj_status_t turn_on_send_pkt(pj_turn_session *sess, + const pj_uint8_t *pkt, + unsigned pkt_len, + const pj_sockaddr_t *dst_addr, + unsigned dst_addr_len); +static void turn_on_channel_bound(pj_turn_session *sess, + const pj_sockaddr_t *peer_addr, + unsigned addr_len, + unsigned ch_num); +static void turn_on_rx_data(pj_turn_session *sess, + const pj_uint8_t *pkt, + unsigned pkt_len, + const pj_sockaddr_t *peer_addr, + unsigned addr_len); +static void turn_on_state(pj_turn_session *sess, + pj_turn_state_t old_state, + pj_turn_state_t new_state); +static void on_read_complete(pj_ioqueue_key_t *key, + pj_ioqueue_op_key_t *op_key, + pj_ssize_t bytes_read); + + +/* + * Create. + */ +PJ_DEF(pj_status_t) pj_turn_udp_create( pj_stun_config *cfg, + int af, + const pj_turn_udp_cb *cb, + unsigned options, + void *user_data, + pj_turn_udp **p_udp_rel) +{ + pj_turn_udp *udp_rel; + pj_turn_session_cb sess_cb; + pj_ioqueue_callback ioq_cb; + pj_pool_t *pool; + pj_status_t status; + + PJ_ASSERT_RETURN(cfg && p_udp_rel, PJ_EINVAL); + PJ_ASSERT_RETURN(options==0, PJ_EINVAL); + + /* Create and init basic data structure */ + pool = pj_pool_create(cfg->pf, "udprel%p", 1000, 1000, NULL); + udp_rel = PJ_POOL_ZALLOC_T(pool, pj_turn_udp); + udp_rel->pool = pool; + udp_rel->user_data = user_data; + + if (cb) { + pj_memcpy(&udp_rel->cb, cb, sizeof(*cb)); + } + + /* Init TURN session */ + pj_bzero(&sess_cb, sizeof(sess_cb)); + sess_cb.on_send_pkt = &turn_on_send_pkt; + sess_cb.on_channel_bound = &turn_on_channel_bound; + sess_cb.on_rx_data = &turn_on_rx_data; + sess_cb.on_state = &turn_on_state; + status = pj_turn_session_create(cfg, pool->obj_name, af, PJ_TURN_TP_UDP, + &sess_cb, udp_rel, 0, &udp_rel->sess); + if (status != PJ_SUCCESS) { + pj_turn_udp_destroy(udp_rel); + return status; + } + + /* Init socket */ + status = pj_sock_socket(af, pj_SOCK_DGRAM(), 0, &udp_rel->sock); + if (status != PJ_SUCCESS) { + pj_turn_udp_destroy(udp_rel); + return status; + } + + /* Register to ioqeuue */ + pj_bzero(&ioq_cb, sizeof(ioq_cb)); + ioq_cb.on_read_complete = &on_read_complete; + status = pj_ioqueue_register_sock(udp_rel->pool, cfg->ioqueue, + udp_rel->sock, udp_rel, + &ioq_cb, &udp_rel->key); + if (status != PJ_SUCCESS) { + pj_turn_udp_destroy(udp_rel); + return status; + } + + /* Kick start pending read operation */ + pj_ioqueue_op_key_init(&udp_rel->read_key, sizeof(udp_rel->read_key)); + on_read_complete(udp_rel->key, &udp_rel->read_key, 0); + + *p_udp_rel = udp_rel; + return PJ_SUCCESS; +} + +/* + * Destroy. + */ +PJ_DEF(void) pj_turn_udp_destroy(pj_turn_udp *udp_rel) +{ + if (udp_rel->sess) { + pj_turn_session_destroy(udp_rel->sess); + udp_rel->sess = NULL; + } + + if (udp_rel->pool) { + pj_pool_t *pool = udp_rel->pool; + udp_rel->pool = NULL; + pj_pool_release(pool); + } +} + +/* + * Set user data. + */ +PJ_DEF(pj_status_t) pj_turn_udp_set_user_data( pj_turn_udp *udp_rel, + void *user_data) +{ + udp_rel->user_data = user_data; + return PJ_SUCCESS; +} + +/* + * Get user data. + */ +PJ_DEF(void*) pj_turn_udp_get_user_data(pj_turn_udp *udp_rel) +{ + return udp_rel->user_data; +} + +/* + * Initialize. + */ +PJ_DEF(pj_status_t) pj_turn_udp_init( pj_turn_udp *udp_rel, + const pj_str_t *domain, + int default_port, + pj_dns_resolver *resolver, + const pj_stun_auth_cred *cred, + const pj_turn_alloc_param *param) +{ + pj_status_t status; + + status = pj_turn_session_set_server(udp_rel->sess, domain, default_port, + resolver); + if (status != PJ_SUCCESS) + return status; + + status = pj_turn_session_set_cred(udp_rel->sess, cred); + if (status != PJ_SUCCESS) + return status; + + status = pj_turn_session_alloc(udp_rel->sess, param); + if (status != PJ_SUCCESS) + return status; + + return PJ_SUCCESS; +} + +/* + * Send packet. + */ +PJ_DEF(pj_status_t) pj_turn_udp_sendto( pj_turn_udp *udp_rel, + const pj_uint8_t *pkt, + unsigned pkt_len, + const pj_sockaddr_t *addr, + unsigned addr_len) +{ + return pj_turn_session_sendto(udp_rel->sess, pkt, pkt_len, + addr, addr_len); +} + +/* + * Bind a peer address to a channel number. + */ +PJ_DEF(pj_status_t) pj_turn_udp_bind_channel( pj_turn_udp *udp_rel, + const pj_sockaddr_t *peer, + unsigned addr_len) +{ + return pj_turn_session_bind_channel(udp_rel->sess, peer, addr_len); +} + + +/* + * Notification from ioqueue when incoming UDP packet is received. + */ +static void on_read_complete(pj_ioqueue_key_t *key, + pj_ioqueue_op_key_t *op_key, + pj_ssize_t bytes_read) +{ + pj_turn_udp *udp_rel; + pj_status_t status; + + udp_rel = (pj_turn_udp*) pj_ioqueue_get_user_data(key); + + do { + /* Report incoming packet to TURN session */ + if (bytes_read > 0) { + pj_turn_session_on_rx_pkt(udp_rel->sess, udp_rel->pkt, + bytes_read, PJ_TRUE); + } + + /* Read next packet */ + bytes_read = sizeof(udp_rel->pkt); + udp_rel->src_addr_len = sizeof(udp_rel->src_addr); + + status = pj_ioqueue_recvfrom(udp_rel->key, op_key, + udp_rel->pkt, &bytes_read, 0, + &udp_rel->src_addr, + &udp_rel->src_addr_len); + + if (status != PJ_EPENDING && status != PJ_SUCCESS) + bytes_read = -status; + + } while (status != PJ_EPENDING && status != PJ_ECANCELLED); + +} + + +/* + * Callback from TURN session to send outgoing packet. + */ +static pj_status_t turn_on_send_pkt(pj_turn_session *sess, + const pj_uint8_t *pkt, + unsigned pkt_len, + const pj_sockaddr_t *dst_addr, + unsigned dst_addr_len) +{ + pj_turn_udp *udp_rel = (pj_turn_udp*) + pj_turn_session_get_user_data(sess); + pj_ssize_t len = pkt_len; + + return pj_sock_sendto(udp_rel->sock, pkt, &len, 0, + dst_addr, dst_addr_len); +} + + +/* + * Callback from TURN session when a channel is successfully bound. + */ +static void turn_on_channel_bound(pj_turn_session *sess, + const pj_sockaddr_t *peer_addr, + unsigned addr_len, + unsigned ch_num) +{ + PJ_UNUSED_ARG(sess); + PJ_UNUSED_ARG(peer_addr); + PJ_UNUSED_ARG(addr_len); + PJ_UNUSED_ARG(ch_num); +} + + +/* + * Callback from TURN session upon incoming data. + */ +static void turn_on_rx_data(pj_turn_session *sess, + const pj_uint8_t *pkt, + unsigned pkt_len, + const pj_sockaddr_t *peer_addr, + unsigned addr_len) +{ + pj_turn_udp *udp_rel = (pj_turn_udp*) + pj_turn_session_get_user_data(sess); + if (udp_rel->cb.on_rx_data) { + (*udp_rel->cb.on_rx_data)(udp_rel, pkt, pkt_len, + peer_addr, addr_len); + } +} + + +/* + * Callback from TURN session when state has changed + */ +static void turn_on_state(pj_turn_session *sess, + pj_turn_state_t old_state, + pj_turn_state_t new_state) +{ + pj_turn_udp *udp_rel = (pj_turn_udp*) + pj_turn_session_get_user_data(sess); + if (udp_rel->cb.on_state) { + (*udp_rel->cb.on_state)(udp_rel, old_state, new_state); + } +} + + -- cgit v1.2.3