diff options
Diffstat (limited to 'pjnath/src/pjnath/turn_sock.c')
-rw-r--r-- | pjnath/src/pjnath/turn_sock.c | 216 |
1 files changed, 119 insertions, 97 deletions
diff --git a/pjnath/src/pjnath/turn_sock.c b/pjnath/src/pjnath/turn_sock.c index cd2e08d4..e595d271 100644 --- a/pjnath/src/pjnath/turn_sock.c +++ b/pjnath/src/pjnath/turn_sock.c @@ -17,6 +17,7 @@ * Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA */ #include <pjnath/turn_sock.h> +#include <pj/activesock.h> #include <pj/assert.h> #include <pj/errno.h> #include <pj/lock.h> @@ -50,11 +51,8 @@ struct pj_turn_sock int af; pj_turn_tp_type conn_type; - pj_sock_t sock; - pj_ioqueue_key_t *key; - pj_ioqueue_op_key_t read_key; + pj_activesock_t *active_sock; pj_ioqueue_op_key_t send_key; - pj_uint8_t pkt[PJ_TURN_MAX_PKT_LEN]; }; @@ -71,18 +69,22 @@ static void turn_on_channel_bound(pj_turn_session *sess, unsigned addr_len, unsigned ch_num); static void turn_on_rx_data(pj_turn_session *sess, - const pj_uint8_t *pkt, + void *pkt, unsigned pkt_len, const pj_sockaddr_t *peer_addr, unsigned addr_len); static void turn_on_state(pj_turn_session *sess, pj_turn_state_t old_state, pj_turn_state_t new_state); -static void on_read_complete(pj_ioqueue_key_t *key, - pj_ioqueue_op_key_t *op_key, - pj_ssize_t bytes_read); -static void on_connect_complete(pj_ioqueue_key_t *key, - pj_status_t status); + +static pj_bool_t on_data_read(pj_activesock_t *asock, + void *data, + pj_size_t size, + pj_status_t status, + pj_size_t *remainder); +static pj_bool_t on_connect_complete(pj_activesock_t *asock, + pj_status_t status); + static void destroy(pj_turn_sock *turn_sock); @@ -158,7 +160,7 @@ PJ_DEF(pj_status_t) pj_turn_sock_create(pj_stun_config *cfg, sess_cb.on_rx_data = &turn_on_rx_data; sess_cb.on_state = &turn_on_state; status = pj_turn_session_create(cfg, pool->obj_name, af, conn_type, - &sess_cb, turn_sock, 0, &turn_sock->sess); + &sess_cb, 0, turn_sock, &turn_sock->sess); if (status != PJ_SUCCESS) { destroy(turn_sock); return status; @@ -187,13 +189,9 @@ static void destroy(pj_turn_sock *turn_sock) turn_sock->sess = NULL; } - if (turn_sock->key) { - pj_ioqueue_unregister(turn_sock->key); - turn_sock->key = NULL; - turn_sock->sock = 0; - } else if (turn_sock->sock) { - pj_sock_close(turn_sock->sock); - turn_sock->sock = 0; + if (turn_sock->active_sock) { + pj_activesock_close(turn_sock->active_sock); + turn_sock->active_sock = NULL; } if (turn_sock->lock) { @@ -271,7 +269,8 @@ static void sess_fail(pj_turn_sock *turn_sock, const char *title, pj_status_t status) { show_err(turn_sock, title, status); - pj_turn_session_destroy(turn_sock->sess); + if (turn_sock->sess) + pj_turn_session_destroy(turn_sock->sess); } /* @@ -280,6 +279,7 @@ static void sess_fail(pj_turn_sock *turn_sock, const char *title, PJ_DEF(pj_status_t) pj_turn_sock_set_user_data( pj_turn_sock *turn_sock, void *user_data) { + PJ_ASSERT_RETURN(turn_sock, PJ_EINVAL); turn_sock->user_data = user_data; return PJ_SUCCESS; } @@ -289,6 +289,7 @@ PJ_DEF(pj_status_t) pj_turn_sock_set_user_data( pj_turn_sock *turn_sock, */ PJ_DEF(void*) pj_turn_sock_get_user_data(pj_turn_sock *turn_sock) { + PJ_ASSERT_RETURN(turn_sock, NULL); return turn_sock->user_data; } @@ -296,7 +297,7 @@ PJ_DEF(void*) pj_turn_sock_get_user_data(pj_turn_sock *turn_sock) * Get info. */ PJ_DEF(pj_status_t) pj_turn_sock_get_info(pj_turn_sock *turn_sock, - pj_turn_session_info *info) + pj_turn_session_info *info) { PJ_ASSERT_RETURN(turn_sock && info, PJ_EINVAL); @@ -309,15 +310,41 @@ PJ_DEF(pj_status_t) pj_turn_sock_get_info(pj_turn_sock *turn_sock, } } +/** + * Lock the TURN socket. Application may need to call this function to + * synchronize access to other objects to avoid deadlock. + */ +PJ_DEF(pj_status_t) pj_turn_sock_lock(pj_turn_sock *turn_sock) +{ + return pj_lock_acquire(turn_sock->lock); +} + +/** + * Unlock the TURN socket. + */ +PJ_DEF(pj_status_t) pj_turn_sock_unlock(pj_turn_sock *turn_sock) +{ + return pj_lock_release(turn_sock->lock); +} + +/* + * Set STUN message logging for this TURN session. + */ +PJ_DEF(void) pj_turn_sock_set_log( pj_turn_sock *turn_sock, + unsigned flags) +{ + pj_turn_session_set_log(turn_sock->sess, flags); +} + /* * Initialize. */ -PJ_DEF(pj_status_t) pj_turn_sock_init(pj_turn_sock *turn_sock, - const pj_str_t *domain, - int default_port, - pj_dns_resolver *resolver, - const pj_stun_auth_cred *cred, - const pj_turn_alloc_param *param) +PJ_DEF(pj_status_t) pj_turn_sock_alloc(pj_turn_sock *turn_sock, + const pj_str_t *domain, + int default_port, + pj_dns_resolver *resolver, + const pj_stun_auth_cred *cred, + const pj_turn_alloc_param *param) { pj_status_t status; @@ -392,16 +419,16 @@ PJ_DEF(pj_status_t) pj_turn_sock_bind_channel( pj_turn_sock *turn_sock, /* * Notification when outgoing TCP socket has been connected. */ -static void on_connect_complete(pj_ioqueue_key_t *key, - pj_status_t status) +static pj_bool_t on_connect_complete(pj_activesock_t *asock, + pj_status_t status) { pj_turn_sock *turn_sock; - turn_sock = (pj_turn_sock*) pj_ioqueue_get_user_data(key); + turn_sock = (pj_turn_sock*) pj_activesock_get_user_data(asock); if (status != PJ_SUCCESS) { sess_fail(turn_sock, "TCP connect() error", status); - return; + return PJ_FALSE; } if (turn_sock->conn_type != PJ_TURN_TP_UDP) { @@ -409,8 +436,8 @@ static void on_connect_complete(pj_ioqueue_key_t *key, } /* Kick start pending read operation */ - pj_ioqueue_op_key_init(&turn_sock->read_key, sizeof(turn_sock->read_key)); - on_read_complete(turn_sock->key, &turn_sock->read_key, INIT); + status = pj_activesock_start_read(asock, turn_sock->pool, + PJ_TURN_MAX_PKT_LEN, 0); /* Init send_key */ pj_ioqueue_op_key_init(&turn_sock->send_key, sizeof(turn_sock->send_key)); @@ -419,56 +446,43 @@ static void on_connect_complete(pj_ioqueue_key_t *key, status = pj_turn_session_alloc(turn_sock->sess, &turn_sock->alloc_param); if (status != PJ_SUCCESS) { sess_fail(turn_sock, "Error sending ALLOCATE", status); - return; + return PJ_FALSE; } + + return PJ_TRUE; } /* * Notification from ioqueue when incoming UDP packet is received. */ -static void on_read_complete(pj_ioqueue_key_t *key, - pj_ioqueue_op_key_t *op_key, - pj_ssize_t bytes_read) +static pj_bool_t on_data_read(pj_activesock_t *asock, + void *data, + pj_size_t size, + pj_status_t status, + pj_size_t *remainder) { - enum { MAX_RETRY = 10 }; pj_turn_sock *turn_sock; - int retry = 0; - pj_status_t status; + pj_bool_t ret = PJ_TRUE; - turn_sock = (pj_turn_sock*) pj_ioqueue_get_user_data(key); + turn_sock = (pj_turn_sock*) pj_activesock_get_user_data(asock); pj_lock_acquire(turn_sock->lock); - do { - if (bytes_read == INIT) { - /* Special instruction to initialize pending read() */ - } else if (bytes_read > 0 && turn_sock->sess) { - /* Report incoming packet to TURN session */ - pj_turn_session_on_rx_pkt(turn_sock->sess, turn_sock->pkt, - bytes_read, - turn_sock->conn_type == PJ_TURN_TP_UDP); - } else if (bytes_read <= 0 && turn_sock->conn_type != PJ_TURN_TP_UDP) { - sess_fail(turn_sock, "TCP connection closed", -bytes_read); - goto on_return; - } - - /* Read next packet */ - bytes_read = sizeof(turn_sock->pkt); - status = pj_ioqueue_recv(turn_sock->key, op_key, - turn_sock->pkt, &bytes_read, 0); - - if (status != PJ_EPENDING && status != PJ_SUCCESS) { - char errmsg[PJ_ERR_MSG_SIZE]; - - pj_strerror(status, errmsg, sizeof(errmsg)); - sess_fail(turn_sock, "Socket recv() error", status); - goto on_return; - } - - } while (status != PJ_EPENDING && status != PJ_ECANCELLED && - ++retry < MAX_RETRY); + if (status == PJ_SUCCESS && turn_sock->sess) { + /* Report incoming packet to TURN session */ + PJ_TODO(REPORT_PARSED_LEN); + pj_turn_session_on_rx_pkt(turn_sock->sess, data, size); + } else if (status != PJ_SUCCESS && + turn_sock->conn_type != PJ_TURN_TP_UDP) + { + sess_fail(turn_sock, "TCP connection closed", status); + ret = PJ_FALSE; + goto on_return; + } on_return: pj_lock_release(turn_sock->lock); + + return ret; } @@ -482,7 +496,7 @@ static pj_status_t turn_on_send_pkt(pj_turn_session *sess, unsigned dst_addr_len) { pj_turn_sock *turn_sock = (pj_turn_sock*) - pj_turn_session_get_user_data(sess); + pj_turn_session_get_user_data(sess); pj_ssize_t len = pkt_len; pj_status_t status; @@ -495,8 +509,8 @@ static pj_status_t turn_on_send_pkt(pj_turn_session *sess, PJ_UNUSED_ARG(dst_addr); PJ_UNUSED_ARG(dst_addr_len); - status = pj_ioqueue_send(turn_sock->key, &turn_sock->send_key, - pkt, &len, 0); + status = pj_activesock_send(turn_sock->active_sock, &turn_sock->send_key, + pkt, &len, 0); if (status != PJ_SUCCESS && status != PJ_EPENDING) { show_err(turn_sock, "socket send()", status); } @@ -524,7 +538,7 @@ static void turn_on_channel_bound(pj_turn_session *sess, * Callback from TURN session upon incoming data. */ static void turn_on_rx_data(pj_turn_session *sess, - const pj_uint8_t *pkt, + void *pkt, unsigned pkt_len, const pj_sockaddr_t *peer_addr, unsigned addr_len) @@ -559,7 +573,19 @@ static void turn_on_state(pj_turn_session *sess, return; } - if (new_state == PJ_TURN_STATE_RESOLVED) { + /* Notify app first */ + if (turn_sock->cb.on_state) { + (*turn_sock->cb.on_state)(turn_sock, old_state, new_state); + } + + /* Make sure user hasn't destroyed us in the callback */ + if (turn_sock->sess && new_state == PJ_TURN_STATE_RESOLVED) { + pj_turn_session_info info; + pj_turn_session_get_info(turn_sock->sess, &info); + new_state = info.state; + } + + if (turn_sock->sess && new_state == PJ_TURN_STATE_RESOLVED) { /* * Once server has been resolved, initiate outgoing TCP * connection to the server. @@ -567,19 +593,16 @@ static void turn_on_state(pj_turn_session *sess, pj_turn_session_info info; char addrtxt[PJ_INET6_ADDRSTRLEN+8]; int sock_type; - pj_ioqueue_callback ioq_cb; + pj_sock_t sock; + pj_activesock_cb asock_cb; /* Close existing connection, if any. This happens when * we're switching to alternate TURN server when either TCP * connection or ALLOCATE request failed. */ - if (turn_sock->key) { - pj_ioqueue_unregister(turn_sock->key); - turn_sock->key = NULL; - turn_sock->sock = 0; - } else if (turn_sock->sock) { - pj_sock_close(turn_sock->sock); - turn_sock->sock = 0; + if (turn_sock->active_sock) { + pj_activesock_close(turn_sock->active_sock); + turn_sock->active_sock = NULL; } /* Get server address from session info */ @@ -591,20 +614,21 @@ static void turn_on_state(pj_turn_session *sess, sock_type = pj_SOCK_STREAM(); /* Init socket */ - status = pj_sock_socket(turn_sock->af, sock_type, 0, - &turn_sock->sock); + status = pj_sock_socket(turn_sock->af, sock_type, 0, &sock); if (status != PJ_SUCCESS) { pj_turn_sock_destroy(turn_sock); return; } - /* Register to ioqeuue */ - pj_bzero(&ioq_cb, sizeof(ioq_cb)); - ioq_cb.on_read_complete = &on_read_complete; - ioq_cb.on_connect_complete = &on_connect_complete; - status = pj_ioqueue_register_sock(turn_sock->pool, turn_sock->cfg.ioqueue, - turn_sock->sock, turn_sock, - &ioq_cb, &turn_sock->key); + /* Create active socket */ + pj_bzero(&asock_cb, sizeof(asock_cb)); + asock_cb.on_data_read = &on_data_read; + asock_cb.on_connect_complete = &on_connect_complete; + status = pj_activesock_create(turn_sock->pool, sock, + sock_type, NULL, + turn_sock->cfg.ioqueue, &asock_cb, + turn_sock, + &turn_sock->active_sock); if (status != PJ_SUCCESS) { pj_turn_sock_destroy(turn_sock); return; @@ -616,10 +640,12 @@ static void turn_on_state(pj_turn_session *sess, sizeof(addrtxt), 3))); /* Initiate non-blocking connect */ - status = pj_ioqueue_connect(turn_sock->key, &info.server, - pj_sockaddr_get_len(&info.server)); + status=pj_activesock_start_connect(turn_sock->active_sock, + turn_sock->pool, + &info.server, + pj_sockaddr_get_len(&info.server)); if (status == PJ_SUCCESS) { - on_connect_complete(turn_sock->key, PJ_SUCCESS); + on_connect_complete(turn_sock->active_sock, PJ_SUCCESS); } else if (status != PJ_EPENDING) { pj_turn_sock_destroy(turn_sock); return; @@ -630,10 +656,6 @@ static void turn_on_state(pj_turn_session *sess, */ } - if (turn_sock->cb.on_state) { - (*turn_sock->cb.on_state)(turn_sock, old_state, new_state); - } - if (new_state >= PJ_TURN_STATE_DESTROYING && turn_sock->sess) { pj_time_val delay = {0, 0}; |