summaryrefslogtreecommitdiff
path: root/pjnath/src/pjnath/turn_sock.c
diff options
context:
space:
mode:
Diffstat (limited to 'pjnath/src/pjnath/turn_sock.c')
-rw-r--r--pjnath/src/pjnath/turn_sock.c216
1 files changed, 119 insertions, 97 deletions
diff --git a/pjnath/src/pjnath/turn_sock.c b/pjnath/src/pjnath/turn_sock.c
index cd2e08d4..e595d271 100644
--- a/pjnath/src/pjnath/turn_sock.c
+++ b/pjnath/src/pjnath/turn_sock.c
@@ -17,6 +17,7 @@
* Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
*/
#include <pjnath/turn_sock.h>
+#include <pj/activesock.h>
#include <pj/assert.h>
#include <pj/errno.h>
#include <pj/lock.h>
@@ -50,11 +51,8 @@ struct pj_turn_sock
int af;
pj_turn_tp_type conn_type;
- pj_sock_t sock;
- pj_ioqueue_key_t *key;
- pj_ioqueue_op_key_t read_key;
+ pj_activesock_t *active_sock;
pj_ioqueue_op_key_t send_key;
- pj_uint8_t pkt[PJ_TURN_MAX_PKT_LEN];
};
@@ -71,18 +69,22 @@ static void turn_on_channel_bound(pj_turn_session *sess,
unsigned addr_len,
unsigned ch_num);
static void turn_on_rx_data(pj_turn_session *sess,
- const pj_uint8_t *pkt,
+ void *pkt,
unsigned pkt_len,
const pj_sockaddr_t *peer_addr,
unsigned addr_len);
static void turn_on_state(pj_turn_session *sess,
pj_turn_state_t old_state,
pj_turn_state_t new_state);
-static void on_read_complete(pj_ioqueue_key_t *key,
- pj_ioqueue_op_key_t *op_key,
- pj_ssize_t bytes_read);
-static void on_connect_complete(pj_ioqueue_key_t *key,
- pj_status_t status);
+
+static pj_bool_t on_data_read(pj_activesock_t *asock,
+ void *data,
+ pj_size_t size,
+ pj_status_t status,
+ pj_size_t *remainder);
+static pj_bool_t on_connect_complete(pj_activesock_t *asock,
+ pj_status_t status);
+
static void destroy(pj_turn_sock *turn_sock);
@@ -158,7 +160,7 @@ PJ_DEF(pj_status_t) pj_turn_sock_create(pj_stun_config *cfg,
sess_cb.on_rx_data = &turn_on_rx_data;
sess_cb.on_state = &turn_on_state;
status = pj_turn_session_create(cfg, pool->obj_name, af, conn_type,
- &sess_cb, turn_sock, 0, &turn_sock->sess);
+ &sess_cb, 0, turn_sock, &turn_sock->sess);
if (status != PJ_SUCCESS) {
destroy(turn_sock);
return status;
@@ -187,13 +189,9 @@ static void destroy(pj_turn_sock *turn_sock)
turn_sock->sess = NULL;
}
- if (turn_sock->key) {
- pj_ioqueue_unregister(turn_sock->key);
- turn_sock->key = NULL;
- turn_sock->sock = 0;
- } else if (turn_sock->sock) {
- pj_sock_close(turn_sock->sock);
- turn_sock->sock = 0;
+ if (turn_sock->active_sock) {
+ pj_activesock_close(turn_sock->active_sock);
+ turn_sock->active_sock = NULL;
}
if (turn_sock->lock) {
@@ -271,7 +269,8 @@ static void sess_fail(pj_turn_sock *turn_sock, const char *title,
pj_status_t status)
{
show_err(turn_sock, title, status);
- pj_turn_session_destroy(turn_sock->sess);
+ if (turn_sock->sess)
+ pj_turn_session_destroy(turn_sock->sess);
}
/*
@@ -280,6 +279,7 @@ static void sess_fail(pj_turn_sock *turn_sock, const char *title,
PJ_DEF(pj_status_t) pj_turn_sock_set_user_data( pj_turn_sock *turn_sock,
void *user_data)
{
+ PJ_ASSERT_RETURN(turn_sock, PJ_EINVAL);
turn_sock->user_data = user_data;
return PJ_SUCCESS;
}
@@ -289,6 +289,7 @@ PJ_DEF(pj_status_t) pj_turn_sock_set_user_data( pj_turn_sock *turn_sock,
*/
PJ_DEF(void*) pj_turn_sock_get_user_data(pj_turn_sock *turn_sock)
{
+ PJ_ASSERT_RETURN(turn_sock, NULL);
return turn_sock->user_data;
}
@@ -296,7 +297,7 @@ PJ_DEF(void*) pj_turn_sock_get_user_data(pj_turn_sock *turn_sock)
* Get info.
*/
PJ_DEF(pj_status_t) pj_turn_sock_get_info(pj_turn_sock *turn_sock,
- pj_turn_session_info *info)
+ pj_turn_session_info *info)
{
PJ_ASSERT_RETURN(turn_sock && info, PJ_EINVAL);
@@ -309,15 +310,41 @@ PJ_DEF(pj_status_t) pj_turn_sock_get_info(pj_turn_sock *turn_sock,
}
}
+/**
+ * Lock the TURN socket. Application may need to call this function to
+ * synchronize access to other objects to avoid deadlock.
+ */
+PJ_DEF(pj_status_t) pj_turn_sock_lock(pj_turn_sock *turn_sock)
+{
+ return pj_lock_acquire(turn_sock->lock);
+}
+
+/**
+ * Unlock the TURN socket.
+ */
+PJ_DEF(pj_status_t) pj_turn_sock_unlock(pj_turn_sock *turn_sock)
+{
+ return pj_lock_release(turn_sock->lock);
+}
+
+/*
+ * Set STUN message logging for this TURN session.
+ */
+PJ_DEF(void) pj_turn_sock_set_log( pj_turn_sock *turn_sock,
+ unsigned flags)
+{
+ pj_turn_session_set_log(turn_sock->sess, flags);
+}
+
/*
* Initialize.
*/
-PJ_DEF(pj_status_t) pj_turn_sock_init(pj_turn_sock *turn_sock,
- const pj_str_t *domain,
- int default_port,
- pj_dns_resolver *resolver,
- const pj_stun_auth_cred *cred,
- const pj_turn_alloc_param *param)
+PJ_DEF(pj_status_t) pj_turn_sock_alloc(pj_turn_sock *turn_sock,
+ const pj_str_t *domain,
+ int default_port,
+ pj_dns_resolver *resolver,
+ const pj_stun_auth_cred *cred,
+ const pj_turn_alloc_param *param)
{
pj_status_t status;
@@ -392,16 +419,16 @@ PJ_DEF(pj_status_t) pj_turn_sock_bind_channel( pj_turn_sock *turn_sock,
/*
* Notification when outgoing TCP socket has been connected.
*/
-static void on_connect_complete(pj_ioqueue_key_t *key,
- pj_status_t status)
+static pj_bool_t on_connect_complete(pj_activesock_t *asock,
+ pj_status_t status)
{
pj_turn_sock *turn_sock;
- turn_sock = (pj_turn_sock*) pj_ioqueue_get_user_data(key);
+ turn_sock = (pj_turn_sock*) pj_activesock_get_user_data(asock);
if (status != PJ_SUCCESS) {
sess_fail(turn_sock, "TCP connect() error", status);
- return;
+ return PJ_FALSE;
}
if (turn_sock->conn_type != PJ_TURN_TP_UDP) {
@@ -409,8 +436,8 @@ static void on_connect_complete(pj_ioqueue_key_t *key,
}
/* Kick start pending read operation */
- pj_ioqueue_op_key_init(&turn_sock->read_key, sizeof(turn_sock->read_key));
- on_read_complete(turn_sock->key, &turn_sock->read_key, INIT);
+ status = pj_activesock_start_read(asock, turn_sock->pool,
+ PJ_TURN_MAX_PKT_LEN, 0);
/* Init send_key */
pj_ioqueue_op_key_init(&turn_sock->send_key, sizeof(turn_sock->send_key));
@@ -419,56 +446,43 @@ static void on_connect_complete(pj_ioqueue_key_t *key,
status = pj_turn_session_alloc(turn_sock->sess, &turn_sock->alloc_param);
if (status != PJ_SUCCESS) {
sess_fail(turn_sock, "Error sending ALLOCATE", status);
- return;
+ return PJ_FALSE;
}
+
+ return PJ_TRUE;
}
/*
* Notification from ioqueue when incoming UDP packet is received.
*/
-static void on_read_complete(pj_ioqueue_key_t *key,
- pj_ioqueue_op_key_t *op_key,
- pj_ssize_t bytes_read)
+static pj_bool_t on_data_read(pj_activesock_t *asock,
+ void *data,
+ pj_size_t size,
+ pj_status_t status,
+ pj_size_t *remainder)
{
- enum { MAX_RETRY = 10 };
pj_turn_sock *turn_sock;
- int retry = 0;
- pj_status_t status;
+ pj_bool_t ret = PJ_TRUE;
- turn_sock = (pj_turn_sock*) pj_ioqueue_get_user_data(key);
+ turn_sock = (pj_turn_sock*) pj_activesock_get_user_data(asock);
pj_lock_acquire(turn_sock->lock);
- do {
- if (bytes_read == INIT) {
- /* Special instruction to initialize pending read() */
- } else if (bytes_read > 0 && turn_sock->sess) {
- /* Report incoming packet to TURN session */
- pj_turn_session_on_rx_pkt(turn_sock->sess, turn_sock->pkt,
- bytes_read,
- turn_sock->conn_type == PJ_TURN_TP_UDP);
- } else if (bytes_read <= 0 && turn_sock->conn_type != PJ_TURN_TP_UDP) {
- sess_fail(turn_sock, "TCP connection closed", -bytes_read);
- goto on_return;
- }
-
- /* Read next packet */
- bytes_read = sizeof(turn_sock->pkt);
- status = pj_ioqueue_recv(turn_sock->key, op_key,
- turn_sock->pkt, &bytes_read, 0);
-
- if (status != PJ_EPENDING && status != PJ_SUCCESS) {
- char errmsg[PJ_ERR_MSG_SIZE];
-
- pj_strerror(status, errmsg, sizeof(errmsg));
- sess_fail(turn_sock, "Socket recv() error", status);
- goto on_return;
- }
-
- } while (status != PJ_EPENDING && status != PJ_ECANCELLED &&
- ++retry < MAX_RETRY);
+ if (status == PJ_SUCCESS && turn_sock->sess) {
+ /* Report incoming packet to TURN session */
+ PJ_TODO(REPORT_PARSED_LEN);
+ pj_turn_session_on_rx_pkt(turn_sock->sess, data, size);
+ } else if (status != PJ_SUCCESS &&
+ turn_sock->conn_type != PJ_TURN_TP_UDP)
+ {
+ sess_fail(turn_sock, "TCP connection closed", status);
+ ret = PJ_FALSE;
+ goto on_return;
+ }
on_return:
pj_lock_release(turn_sock->lock);
+
+ return ret;
}
@@ -482,7 +496,7 @@ static pj_status_t turn_on_send_pkt(pj_turn_session *sess,
unsigned dst_addr_len)
{
pj_turn_sock *turn_sock = (pj_turn_sock*)
- pj_turn_session_get_user_data(sess);
+ pj_turn_session_get_user_data(sess);
pj_ssize_t len = pkt_len;
pj_status_t status;
@@ -495,8 +509,8 @@ static pj_status_t turn_on_send_pkt(pj_turn_session *sess,
PJ_UNUSED_ARG(dst_addr);
PJ_UNUSED_ARG(dst_addr_len);
- status = pj_ioqueue_send(turn_sock->key, &turn_sock->send_key,
- pkt, &len, 0);
+ status = pj_activesock_send(turn_sock->active_sock, &turn_sock->send_key,
+ pkt, &len, 0);
if (status != PJ_SUCCESS && status != PJ_EPENDING) {
show_err(turn_sock, "socket send()", status);
}
@@ -524,7 +538,7 @@ static void turn_on_channel_bound(pj_turn_session *sess,
* Callback from TURN session upon incoming data.
*/
static void turn_on_rx_data(pj_turn_session *sess,
- const pj_uint8_t *pkt,
+ void *pkt,
unsigned pkt_len,
const pj_sockaddr_t *peer_addr,
unsigned addr_len)
@@ -559,7 +573,19 @@ static void turn_on_state(pj_turn_session *sess,
return;
}
- if (new_state == PJ_TURN_STATE_RESOLVED) {
+ /* Notify app first */
+ if (turn_sock->cb.on_state) {
+ (*turn_sock->cb.on_state)(turn_sock, old_state, new_state);
+ }
+
+ /* Make sure user hasn't destroyed us in the callback */
+ if (turn_sock->sess && new_state == PJ_TURN_STATE_RESOLVED) {
+ pj_turn_session_info info;
+ pj_turn_session_get_info(turn_sock->sess, &info);
+ new_state = info.state;
+ }
+
+ if (turn_sock->sess && new_state == PJ_TURN_STATE_RESOLVED) {
/*
* Once server has been resolved, initiate outgoing TCP
* connection to the server.
@@ -567,19 +593,16 @@ static void turn_on_state(pj_turn_session *sess,
pj_turn_session_info info;
char addrtxt[PJ_INET6_ADDRSTRLEN+8];
int sock_type;
- pj_ioqueue_callback ioq_cb;
+ pj_sock_t sock;
+ pj_activesock_cb asock_cb;
/* Close existing connection, if any. This happens when
* we're switching to alternate TURN server when either TCP
* connection or ALLOCATE request failed.
*/
- if (turn_sock->key) {
- pj_ioqueue_unregister(turn_sock->key);
- turn_sock->key = NULL;
- turn_sock->sock = 0;
- } else if (turn_sock->sock) {
- pj_sock_close(turn_sock->sock);
- turn_sock->sock = 0;
+ if (turn_sock->active_sock) {
+ pj_activesock_close(turn_sock->active_sock);
+ turn_sock->active_sock = NULL;
}
/* Get server address from session info */
@@ -591,20 +614,21 @@ static void turn_on_state(pj_turn_session *sess,
sock_type = pj_SOCK_STREAM();
/* Init socket */
- status = pj_sock_socket(turn_sock->af, sock_type, 0,
- &turn_sock->sock);
+ status = pj_sock_socket(turn_sock->af, sock_type, 0, &sock);
if (status != PJ_SUCCESS) {
pj_turn_sock_destroy(turn_sock);
return;
}
- /* Register to ioqeuue */
- pj_bzero(&ioq_cb, sizeof(ioq_cb));
- ioq_cb.on_read_complete = &on_read_complete;
- ioq_cb.on_connect_complete = &on_connect_complete;
- status = pj_ioqueue_register_sock(turn_sock->pool, turn_sock->cfg.ioqueue,
- turn_sock->sock, turn_sock,
- &ioq_cb, &turn_sock->key);
+ /* Create active socket */
+ pj_bzero(&asock_cb, sizeof(asock_cb));
+ asock_cb.on_data_read = &on_data_read;
+ asock_cb.on_connect_complete = &on_connect_complete;
+ status = pj_activesock_create(turn_sock->pool, sock,
+ sock_type, NULL,
+ turn_sock->cfg.ioqueue, &asock_cb,
+ turn_sock,
+ &turn_sock->active_sock);
if (status != PJ_SUCCESS) {
pj_turn_sock_destroy(turn_sock);
return;
@@ -616,10 +640,12 @@ static void turn_on_state(pj_turn_session *sess,
sizeof(addrtxt), 3)));
/* Initiate non-blocking connect */
- status = pj_ioqueue_connect(turn_sock->key, &info.server,
- pj_sockaddr_get_len(&info.server));
+ status=pj_activesock_start_connect(turn_sock->active_sock,
+ turn_sock->pool,
+ &info.server,
+ pj_sockaddr_get_len(&info.server));
if (status == PJ_SUCCESS) {
- on_connect_complete(turn_sock->key, PJ_SUCCESS);
+ on_connect_complete(turn_sock->active_sock, PJ_SUCCESS);
} else if (status != PJ_EPENDING) {
pj_turn_sock_destroy(turn_sock);
return;
@@ -630,10 +656,6 @@ static void turn_on_state(pj_turn_session *sess,
*/
}
- if (turn_sock->cb.on_state) {
- (*turn_sock->cb.on_state)(turn_sock, old_state, new_state);
- }
-
if (new_state >= PJ_TURN_STATE_DESTROYING && turn_sock->sess) {
pj_time_val delay = {0, 0};