summaryrefslogtreecommitdiff
path: root/pjnath/src/pjturn-srv
diff options
context:
space:
mode:
authorBenny Prijono <bennylp@teluu.com>2008-04-09 09:38:12 +0000
committerBenny Prijono <bennylp@teluu.com>2008-04-09 09:38:12 +0000
commitd0b1cc04536aa71c104dc3b14ae0f6f4ffb36066 (patch)
treee74dae3fac329ce74fba07ada0e6a3080b94e584 /pjnath/src/pjturn-srv
parent02ca90e766f49bf2c03e784669220f838eb19805 (diff)
More ticket #485: huge changeset to support TURN TCP. Please see ticket #485 for the details
git-svn-id: http://svn.pjsip.org/repos/pjproject/trunk@1913 74dad513-b988-da41-8d7b-12977e46ad98
Diffstat (limited to 'pjnath/src/pjturn-srv')
-rw-r--r--pjnath/src/pjturn-srv/allocation.c157
-rw-r--r--pjnath/src/pjturn-srv/listener_tcp.c480
-rw-r--r--pjnath/src/pjturn-srv/listener_udp.c54
-rw-r--r--pjnath/src/pjturn-srv/main.c5
-rw-r--r--pjnath/src/pjturn-srv/server.c199
-rw-r--r--pjnath/src/pjturn-srv/turn.h126
6 files changed, 826 insertions, 195 deletions
diff --git a/pjnath/src/pjturn-srv/allocation.c b/pjnath/src/pjturn-srv/allocation.c
index 339b2924..b552bc45 100644
--- a/pjnath/src/pjturn-srv/allocation.c
+++ b/pjnath/src/pjturn-srv/allocation.c
@@ -65,6 +65,7 @@ static void on_rx_from_peer(pj_ioqueue_key_t *key,
pj_ioqueue_op_key_t *op_key,
pj_ssize_t bytes_read);
static pj_status_t stun_on_send_msg(pj_stun_session *sess,
+ void *token,
const void *pkt,
pj_size_t pkt_size,
const pj_sockaddr_t *dst_addr,
@@ -73,12 +74,14 @@ static pj_status_t stun_on_rx_request(pj_stun_session *sess,
const pj_uint8_t *pkt,
unsigned pkt_len,
const pj_stun_rx_data *rdata,
+ void *token,
const pj_sockaddr_t *src_addr,
unsigned src_addr_len);
static pj_status_t stun_on_rx_indication(pj_stun_session *sess,
const pj_uint8_t *pkt,
unsigned pkt_len,
const pj_stun_msg *msg,
+ void *token,
const pj_sockaddr_t *src_addr,
unsigned src_addr_len);
@@ -123,7 +126,7 @@ static pj_status_t parse_allocate_req(alloc_request *cfg,
if (cfg->bandwidth > MAX_CLIENT_BANDWIDTH) {
pj_stun_session_respond(sess, rdata,
PJ_STUN_SC_ALLOCATION_QUOTA_REACHED,
- "Invalid bandwidth", PJ_TRUE,
+ "Invalid bandwidth", NULL, PJ_TRUE,
src_addr, src_addr_len);
return PJ_STATUS_FROM_STUN_CODE(PJ_STUN_SC_ALLOCATION_QUOTA_REACHED);
}
@@ -134,7 +137,7 @@ static pj_status_t parse_allocate_req(alloc_request *cfg,
if (attr_req_tp == NULL) {
pj_stun_session_respond(sess, rdata, PJ_STUN_SC_BAD_REQUEST,
"Missing REQUESTED-TRANSPORT attribute",
- PJ_TRUE, src_addr, src_addr_len);
+ NULL, PJ_TRUE, src_addr, src_addr_len);
return PJ_STATUS_FROM_STUN_CODE(PJ_STUN_SC_BAD_REQUEST);
}
@@ -143,7 +146,7 @@ static pj_status_t parse_allocate_req(alloc_request *cfg,
/* Can only support UDP for now */
if (cfg->tp_type != PJ_TURN_TP_UDP) {
pj_stun_session_respond(sess, rdata, PJ_STUN_SC_UNSUPP_TRANSPORT_PROTO,
- NULL, PJ_TRUE, src_addr, src_addr_len);
+ NULL, NULL, PJ_TRUE, src_addr, src_addr_len);
return PJ_STATUS_FROM_STUN_CODE(PJ_STUN_SC_UNSUPP_TRANSPORT_PROTO);
}
@@ -155,8 +158,8 @@ static pj_status_t parse_allocate_req(alloc_request *cfg,
/* We don't support RESERVATION-TOKEN for now */
pj_stun_session_respond(sess, rdata,
PJ_STUN_SC_BAD_REQUEST,
- "RESERVATION-TOKEN is not supported", PJ_TRUE,
- src_addr, src_addr_len);
+ "RESERVATION-TOKEN is not supported", NULL,
+ PJ_TRUE, src_addr, src_addr_len);
return PJ_STATUS_FROM_STUN_CODE(PJ_STUN_SC_BAD_REQUEST);
}
@@ -167,8 +170,8 @@ static pj_status_t parse_allocate_req(alloc_request *cfg,
/* We don't support REQUESTED-PROPS for now */
pj_stun_session_respond(sess, rdata,
PJ_STUN_SC_BAD_REQUEST,
- "REQUESTED-PROPS is not supported", PJ_TRUE,
- src_addr, src_addr_len);
+ "REQUESTED-PROPS is not supported",
+ NULL, PJ_TRUE, src_addr, src_addr_len);
return PJ_STATUS_FROM_STUN_CODE(PJ_STUN_SC_BAD_REQUEST);
}
@@ -179,8 +182,8 @@ static pj_status_t parse_allocate_req(alloc_request *cfg,
cfg->lifetime = attr_lifetime->value;
if (cfg->lifetime < MIN_LIFETIME) {
pj_stun_session_respond(sess, rdata, PJ_STUN_SC_BAD_REQUEST,
- "LIFETIME too short", PJ_TRUE,
- src_addr, src_addr_len);
+ "LIFETIME too short", NULL,
+ PJ_TRUE, src_addr, src_addr_len);
return PJ_STATUS_FROM_STUN_CODE(PJ_STUN_SC_BAD_REQUEST);
}
if (cfg->lifetime > MAX_LIFETIME)
@@ -196,6 +199,7 @@ static pj_status_t parse_allocate_req(alloc_request *cfg,
/* Respond to ALLOCATE request */
static pj_status_t send_allocate_response(pj_turn_allocation *alloc,
pj_stun_session *srv_sess,
+ pj_turn_transport *transport,
const pj_stun_rx_data *rdata)
{
pj_stun_tx_data *tdata;
@@ -232,8 +236,8 @@ static pj_status_t send_allocate_response(pj_turn_allocation *alloc,
pj_sockaddr_get_len(&alloc->hkey.clt_addr));
/* Send the response */
- return pj_stun_session_send_msg(srv_sess, PJ_TRUE,
- &alloc->hkey.clt_addr,
+ return pj_stun_session_send_msg(srv_sess, transport, PJ_TRUE,
+ PJ_FALSE, &alloc->hkey.clt_addr,
pj_sockaddr_get_len(&alloc->hkey.clt_addr),
tdata);
}
@@ -283,14 +287,14 @@ static pj_status_t init_cred(pj_turn_allocation *alloc, const pj_stun_msg *req)
/*
* Create new allocation.
*/
-PJ_DEF(pj_status_t) pj_turn_allocation_create(pj_turn_listener *listener,
+PJ_DEF(pj_status_t) pj_turn_allocation_create(pj_turn_transport *transport,
const pj_sockaddr_t *src_addr,
unsigned src_addr_len,
const pj_stun_rx_data *rdata,
pj_stun_session *srv_sess,
pj_turn_allocation **p_alloc)
{
- pj_turn_srv *srv = listener->server;
+ pj_turn_srv *srv = transport->listener->server;
const pj_stun_msg *msg = rdata->msg;
pj_pool_t *pool;
alloc_request req;
@@ -310,13 +314,16 @@ PJ_DEF(pj_status_t) pj_turn_allocation_create(pj_turn_listener *listener,
alloc = PJ_POOL_ZALLOC_T(pool, pj_turn_allocation);
alloc->pool = pool;
alloc->obj_name = pool->obj_name;
- alloc->listener = listener;
- alloc->clt_sock = PJ_INVALID_SOCKET;
alloc->relay.tp.sock = PJ_INVALID_SOCKET;
+ alloc->server = transport->listener->server;
alloc->bandwidth = req.bandwidth;
- alloc->hkey.tp_type = listener->tp_type;
+ /* Set transport */
+ alloc->transport = transport;
+ pj_turn_transport_add_ref(transport, alloc);
+
+ alloc->hkey.tp_type = transport->listener->tp_type;
pj_memcpy(&alloc->hkey.clt_addr, src_addr, src_addr_len);
status = pj_lock_create_recursive_mutex(pool, alloc->obj_name,
@@ -332,7 +339,8 @@ PJ_DEF(pj_status_t) pj_turn_allocation_create(pj_turn_listener *listener,
alloc->ch_table = pj_hash_create(pool, PEER_TABLE_SIZE);
/* Print info */
- pj_ansi_strcpy(alloc->info, pj_turn_tp_type_name(listener->tp_type));
+ pj_ansi_strcpy(alloc->info,
+ pj_turn_tp_type_name(transport->listener->tp_type));
alloc->info[3] = ':';
pj_sockaddr_print(src_addr, alloc->info+4, sizeof(alloc->info)-4, 3);
@@ -370,7 +378,7 @@ PJ_DEF(pj_status_t) pj_turn_allocation_create(pj_turn_listener *listener,
pj_turn_srv_register_allocation(srv, alloc);
/* Respond to ALLOCATE request */
- status = send_allocate_response(alloc, srv_sess, rdata);
+ status = send_allocate_response(alloc, srv_sess, transport, rdata);
if (status != PJ_SUCCESS)
goto on_error;
@@ -388,7 +396,7 @@ on_error:
/* Send reply to the ALLOCATE request */
pj_strerror(status, str_tmp, sizeof(str_tmp));
pj_stun_session_respond(srv_sess, rdata, PJ_STUN_SC_BAD_REQUEST, str_tmp,
- PJ_TRUE, src_addr, src_addr_len);
+ transport, PJ_TRUE, src_addr, src_addr_len);
/* Cleanup */
destroy_allocation(alloc);
@@ -400,7 +408,7 @@ on_error:
static void destroy_relay(pj_turn_relay_res *relay)
{
if (relay->timer.id) {
- pj_timer_heap_cancel(relay->allocation->listener->server->core.timer_heap,
+ pj_timer_heap_cancel(relay->allocation->server->core.timer_heap,
&relay->timer);
relay->timer.id = PJ_FALSE;
}
@@ -427,7 +435,7 @@ static void destroy_allocation(pj_turn_allocation *alloc)
pj_pool_t *pool;
/* Unregister this allocation */
- pj_turn_srv_unregister_allocation(alloc->listener->server, alloc);
+ pj_turn_srv_unregister_allocation(alloc->server, alloc);
/* Destroy relay */
destroy_relay(&alloc->relay);
@@ -437,6 +445,12 @@ static void destroy_allocation(pj_turn_allocation *alloc)
pj_lock_acquire(alloc->lock);
}
+ /* Unreference transport */
+ if (alloc->transport) {
+ pj_turn_transport_dec_ref(alloc->transport, alloc);
+ alloc->transport = NULL;
+ }
+
/* Destroy STUN session */
if (alloc->sess) {
pj_stun_session_destroy(alloc->sess);
@@ -465,6 +479,20 @@ PJ_DECL(void) pj_turn_allocation_destroy(pj_turn_allocation *alloc)
}
+/*
+ * Handle transport closure.
+ */
+PJ_DEF(void) pj_turn_allocation_on_transport_closed( pj_turn_allocation *alloc,
+ pj_turn_transport *tp)
+{
+ PJ_LOG(5,(alloc->obj_name, "Transport %s unexpectedly closed, destroying "
+ "allocation %s", tp->info, alloc->info));
+ pj_turn_transport_dec_ref(tp, alloc);
+ alloc->transport = NULL;
+ destroy_allocation(alloc);
+}
+
+
/* Initiate shutdown sequence for this allocation and start destroy timer.
* Once allocation is marked as shutting down, any packets will be
* rejected/discarded
@@ -476,7 +504,7 @@ static void alloc_shutdown(pj_turn_allocation *alloc)
/* Work with existing schedule */
if (alloc->relay.timer.id == TIMER_ID_TIMEOUT) {
/* Cancel existing shutdown timer */
- pj_timer_heap_cancel(alloc->listener->server->core.timer_heap,
+ pj_timer_heap_cancel(alloc->server->core.timer_heap,
&alloc->relay.timer);
alloc->relay.timer.id = TIMER_ID_NONE;
@@ -498,7 +526,7 @@ static void alloc_shutdown(pj_turn_allocation *alloc)
/* Schedule destroy timer */
alloc->relay.timer.id = TIMER_ID_DESTROY;
- pj_timer_heap_schedule(alloc->listener->server->core.timer_heap,
+ pj_timer_heap_schedule(alloc->server->core.timer_heap,
&alloc->relay.timer, &destroy_delay);
}
@@ -514,7 +542,7 @@ static pj_status_t resched_timeout(pj_turn_allocation *alloc)
pj_assert(alloc->relay.timer.id != TIMER_ID_DESTROY);
if (alloc->relay.timer.id != 0) {
- pj_timer_heap_cancel(alloc->listener->server->core.timer_heap,
+ pj_timer_heap_cancel(alloc->server->core.timer_heap,
&alloc->relay.timer);
alloc->relay.timer.id = TIMER_ID_NONE;
}
@@ -523,7 +551,7 @@ static pj_status_t resched_timeout(pj_turn_allocation *alloc)
delay.msec = 0;
alloc->relay.timer.id = TIMER_ID_TIMEOUT;
- status = pj_timer_heap_schedule(alloc->listener->server->core.timer_heap,
+ status = pj_timer_heap_schedule(alloc->server->core.timer_heap,
&alloc->relay.timer, &delay);
if (status != PJ_SUCCESS) {
alloc->relay.timer.id = TIMER_ID_NONE;
@@ -589,7 +617,7 @@ static pj_status_t create_relay(pj_turn_srv *srv,
relay->tp.sock = PJ_INVALID_SOCKET;
/* TODO: get the requested address family from somewhere */
- af = alloc->listener->addr.addr.sa_family;
+ af = alloc->transport->listener->addr.addr.sa_family;
/* Save realm */
sa = (pj_stun_string_attr*)
@@ -686,7 +714,8 @@ static pj_status_t create_relay(pj_turn_srv *srv,
return status;
}
if (!pj_sockaddr_has_addr(&relay->hkey.addr)) {
- pj_sockaddr_copy_addr(&relay->hkey.addr, &alloc->listener->addr);
+ pj_sockaddr_copy_addr(&relay->hkey.addr,
+ &alloc->transport->listener->addr);
}
if (!pj_sockaddr_has_addr(&relay->hkey.addr)) {
pj_sockaddr tmp_addr;
@@ -724,8 +753,8 @@ static void send_reply_err(pj_turn_allocation *alloc,
{
pj_status_t status;
- status = pj_stun_session_respond(alloc->sess, rdata, code, errmsg, cache,
- &alloc->hkey.clt_addr,
+ status = pj_stun_session_respond(alloc->sess, rdata, code, errmsg, NULL,
+ cache, &alloc->hkey.clt_addr,
pj_sockaddr_get_len(&alloc->hkey.clt_addr.addr));
if (status != PJ_SUCCESS) {
alloc_err(alloc, "Error sending STUN error response", status);
@@ -769,8 +798,8 @@ static void send_reply_ok(pj_turn_allocation *alloc,
}
}
- status = pj_stun_session_send_msg(alloc->sess, PJ_TRUE,
- &alloc->hkey.clt_addr,
+ status = pj_stun_session_send_msg(alloc->sess, NULL, PJ_TRUE,
+ PJ_FALSE, &alloc->hkey.clt_addr,
pj_sockaddr_get_len(&alloc->hkey.clt_addr),
tdata);
if (status != PJ_SUCCESS) {
@@ -790,13 +819,6 @@ static pj_turn_permission *create_permission(pj_turn_allocation *alloc,
perm = PJ_POOL_ZALLOC_T(alloc->pool, pj_turn_permission);
pj_memcpy(&perm->hkey.peer_addr, peer_addr, addr_len);
- if (alloc->listener->tp_type == PJ_TURN_TP_UDP) {
- perm->sock = alloc->listener->sock;
- } else {
- pj_assert(!"TCP is not supported yet");
- return NULL;
- }
-
perm->allocation = alloc;
perm->channel = PJ_TURN_INVALID_CHANNEL;
@@ -900,13 +922,28 @@ PJ_DEF(void) pj_turn_allocation_on_rx_client_pkt(pj_turn_allocation *alloc,
* callbacks.
*/
unsigned options = PJ_STUN_CHECK_PACKET;
- if (pkt->listener->tp_type == PJ_TURN_TP_UDP)
+ unsigned parsed_len = 0;
+
+ if (pkt->transport->listener->tp_type == PJ_TURN_TP_UDP)
options |= PJ_STUN_IS_DATAGRAM;
status = pj_stun_session_on_rx_pkt(alloc->sess, pkt->pkt, pkt->len,
- options, NULL,
+ options, NULL, &parsed_len,
&pkt->src.clt_addr,
pkt->src_addr_len);
+
+ if (pkt->transport->listener->tp_type == PJ_TURN_TP_UDP) {
+ pkt->len = 0;
+ } else if (parsed_len > 0) {
+ if (parsed_len == pkt->len) {
+ pkt->len = 0;
+ } else {
+ pj_memmove(pkt->pkt, pkt->pkt+parsed_len,
+ pkt->len - parsed_len);
+ pkt->len -= parsed_len;
+ }
+ }
+
if (status != PJ_SUCCESS) {
alloc_err(alloc, "Error handling STUN packet", status);
goto on_return;
@@ -923,7 +960,7 @@ PJ_DEF(void) pj_turn_allocation_on_rx_client_pkt(pj_turn_allocation *alloc,
pj_assert(sizeof(*cd)==4);
/* For UDP check the packet length */
- if (alloc->listener->tp_type == PJ_TURN_TP_UDP) {
+ if (alloc->transport->listener->tp_type == PJ_TURN_TP_UDP) {
if (pkt->len < pj_ntohs(cd->length)+sizeof(*cd)) {
PJ_LOG(4,(alloc->obj_name,
"ChannelData from %s discarded: UDP size error",
@@ -1003,10 +1040,10 @@ static void handle_peer_pkt(pj_turn_allocation *alloc,
pj_memcpy(rel->tp.tx_pkt+sizeof(pj_turn_channel_data), pkt, len);
/* Send to client */
- pj_turn_listener_sendto(alloc->listener, rel->tp.tx_pkt,
- len+sizeof(pj_turn_channel_data), 0,
- &alloc->hkey.clt_addr,
- pj_sockaddr_get_len(&alloc->hkey.clt_addr));
+ alloc->transport->sendto(alloc->transport, rel->tp.tx_pkt,
+ len+sizeof(pj_turn_channel_data), 0,
+ &alloc->hkey.clt_addr,
+ pj_sockaddr_get_len(&alloc->hkey.clt_addr));
} else {
/* Send Data Indication */
pj_stun_tx_data *tdata;
@@ -1026,8 +1063,8 @@ static void handle_peer_pkt(pj_turn_allocation *alloc,
PJ_STUN_ATTR_DATA,
(const pj_uint8_t*)pkt, len);
- pj_stun_session_send_msg(alloc->sess, PJ_FALSE,
- &alloc->hkey.clt_addr,
+ pj_stun_session_send_msg(alloc->sess, NULL, PJ_FALSE,
+ PJ_FALSE, &alloc->hkey.clt_addr,
pj_sockaddr_get_len(&alloc->hkey.clt_addr),
tdata);
}
@@ -1076,6 +1113,7 @@ static void on_rx_from_peer(pj_ioqueue_key_t *key,
* a STUN message towards the client.
*/
static pj_status_t stun_on_send_msg(pj_stun_session *sess,
+ void *token,
const void *pkt,
pj_size_t pkt_size,
const pj_sockaddr_t *dst_addr,
@@ -1083,10 +1121,12 @@ static pj_status_t stun_on_send_msg(pj_stun_session *sess,
{
pj_turn_allocation *alloc;
+ PJ_UNUSED_ARG(token);
+
alloc = (pj_turn_allocation*) pj_stun_session_get_user_data(sess);
- return pj_turn_listener_sendto(alloc->listener, pkt, pkt_size, 0,
- dst_addr, addr_len);
+ return alloc->transport->sendto(alloc->transport, pkt, pkt_size, 0,
+ dst_addr, addr_len);
}
/*
@@ -1098,6 +1138,7 @@ static pj_status_t stun_on_rx_request(pj_stun_session *sess,
const pj_uint8_t *pkt,
unsigned pkt_len,
const pj_stun_rx_data *rdata,
+ void *token,
const pj_sockaddr_t *src_addr,
unsigned src_addr_len)
{
@@ -1106,6 +1147,7 @@ static pj_status_t stun_on_rx_request(pj_stun_session *sess,
PJ_UNUSED_ARG(pkt);
PJ_UNUSED_ARG(pkt_len);
+ PJ_UNUSED_ARG(token);
PJ_UNUSED_ARG(src_addr);
PJ_UNUSED_ARG(src_addr_len);
@@ -1274,6 +1316,7 @@ static pj_status_t stun_on_rx_indication(pj_stun_session *sess,
const pj_uint8_t *pkt,
unsigned pkt_len,
const pj_stun_msg *msg,
+ void *token,
const pj_sockaddr_t *src_addr,
unsigned src_addr_len)
{
@@ -1281,9 +1324,11 @@ static pj_status_t stun_on_rx_indication(pj_stun_session *sess,
pj_stun_data_attr *data_attr;
pj_turn_allocation *alloc;
pj_turn_permission *perm;
+ pj_ssize_t len;
PJ_UNUSED_ARG(pkt);
PJ_UNUSED_ARG(pkt_len);
+ PJ_UNUSED_ARG(token);
PJ_UNUSED_ARG(src_addr);
PJ_UNUSED_ARG(src_addr_len);
@@ -1320,15 +1365,11 @@ static pj_status_t stun_on_rx_indication(pj_stun_session *sess,
if (data_attr == NULL)
return PJ_SUCCESS;
- /* Relay the data to client */
- if (alloc->hkey.tp_type == PJ_TURN_TP_UDP) {
- pj_ssize_t len = data_attr->length;
- pj_sock_sendto(alloc->listener->sock, data_attr->data,
- &len, 0, &peer_attr->sockaddr,
- pj_sockaddr_get_len(&peer_attr->sockaddr));
- } else {
- pj_assert(!"TCP is not supported");
- }
+ /* Relay the data to peer */
+ len = data_attr->length;
+ pj_sock_sendto(alloc->relay.tp.sock, data_attr->data,
+ &len, 0, &peer_attr->sockaddr,
+ pj_sockaddr_get_len(&peer_attr->sockaddr));
return PJ_SUCCESS;
}
diff --git a/pjnath/src/pjturn-srv/listener_tcp.c b/pjnath/src/pjturn-srv/listener_tcp.c
new file mode 100644
index 00000000..e5369e5a
--- /dev/null
+++ b/pjnath/src/pjturn-srv/listener_tcp.c
@@ -0,0 +1,480 @@
+/* $Id$ */
+/*
+ * Copyright (C) 2003-2007 Benny Prijono <benny@prijono.org>
+ *
+ * This program is free software; you can redistribute it and/or modify
+ * it under the terms of the GNU General Public License as published by
+ * the Free Software Foundation; either version 2 of the License, or
+ * (at your option) any later version.
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ * GNU General Public License for more details.
+ *
+ * You should have received a copy of the GNU General Public License
+ * along with this program; if not, write to the Free Software
+ * Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
+ */
+#include "turn.h"
+#include <pj/compat/socket.h>
+
+struct accept_op
+{
+ pj_ioqueue_op_key_t op_key;
+ pj_sock_t sock;
+ pj_sockaddr src_addr;
+ int src_addr_len;
+};
+
+struct tcp_listener
+{
+ pj_turn_listener base;
+ pj_ioqueue_key_t *key;
+ unsigned accept_cnt;
+ struct accept_op *accept_op; /* Array of accept_op's */
+};
+
+
+static void lis_on_accept_complete(pj_ioqueue_key_t *key,
+ pj_ioqueue_op_key_t *op_key,
+ pj_sock_t sock,
+ pj_status_t status);
+static pj_status_t lis_destroy(pj_turn_listener *listener);
+static void transport_create(pj_sock_t sock, pj_turn_listener *lis,
+ pj_sockaddr_t *src_addr, int src_addr_len);
+
+static void show_err(const char *sender, const char *title,
+ pj_status_t status)
+{
+ char errmsg[PJ_ERR_MSG_SIZE];
+
+ pj_strerror(status, errmsg, sizeof(errmsg));
+ PJ_LOG(4,(sender, "%s: %s", title, errmsg));
+}
+
+
+/*
+ * Create a new listener on the specified port.
+ */
+PJ_DEF(pj_status_t) pj_turn_listener_create_tcp(pj_turn_srv *srv,
+ int af,
+ const pj_str_t *bound_addr,
+ unsigned port,
+ unsigned concurrency_cnt,
+ unsigned flags,
+ pj_turn_listener **p_listener)
+{
+ pj_pool_t *pool;
+ struct tcp_listener *tcp_lis;
+ pj_ioqueue_callback ioqueue_cb;
+ unsigned i;
+ pj_status_t status;
+
+ /* Create structure */
+ pool = pj_pool_create(srv->core.pf, "tcpl%p", 1000, 1000, NULL);
+ tcp_lis = PJ_POOL_ZALLOC_T(pool, struct tcp_listener);
+ tcp_lis->base.pool = pool;
+ tcp_lis->base.obj_name = pool->obj_name;
+ tcp_lis->base.server = srv;
+ tcp_lis->base.tp_type = PJ_TURN_TP_TCP;
+ tcp_lis->base.sock = PJ_INVALID_SOCKET;
+ //tcp_lis->base.sendto = &tcp_sendto;
+ tcp_lis->base.destroy = &lis_destroy;
+ tcp_lis->accept_cnt = concurrency_cnt;
+ tcp_lis->base.flags = flags;
+
+ /* Create socket */
+ status = pj_sock_socket(af, pj_SOCK_STREAM(), 0, &tcp_lis->base.sock);
+ if (status != PJ_SUCCESS)
+ goto on_error;
+
+ /* Init bind address */
+ status = pj_sockaddr_init(af, &tcp_lis->base.addr, bound_addr,
+ (pj_uint16_t)port);
+ if (status != PJ_SUCCESS)
+ goto on_error;
+
+ /* Create info */
+ pj_ansi_strcpy(tcp_lis->base.info, "TCP:");
+ pj_sockaddr_print(&tcp_lis->base.addr, tcp_lis->base.info+4,
+ sizeof(tcp_lis->base.info)-4, 3);
+
+ /* Bind socket */
+ status = pj_sock_bind(tcp_lis->base.sock, &tcp_lis->base.addr,
+ pj_sockaddr_get_len(&tcp_lis->base.addr));
+ if (status != PJ_SUCCESS)
+ goto on_error;
+
+ /* Listen() */
+ status = pj_sock_listen(tcp_lis->base.sock, 5);
+ if (status != PJ_SUCCESS)
+ goto on_error;
+
+ /* Register to ioqueue */
+ pj_bzero(&ioqueue_cb, sizeof(ioqueue_cb));
+ ioqueue_cb.on_accept_complete = &lis_on_accept_complete;
+ status = pj_ioqueue_register_sock(pool, srv->core.ioqueue, tcp_lis->base.sock,
+ tcp_lis, &ioqueue_cb, &tcp_lis->key);
+
+ /* Create op keys */
+ tcp_lis->accept_op = (struct accept_op*)pj_pool_calloc(pool, concurrency_cnt,
+ sizeof(struct accept_op));
+
+ /* Create each accept_op and kick off read operation */
+ for (i=0; i<concurrency_cnt; ++i) {
+ lis_on_accept_complete(tcp_lis->key, &tcp_lis->accept_op[i].op_key,
+ PJ_INVALID_SOCKET, PJ_EPENDING);
+ }
+
+ /* Done */
+ PJ_LOG(4,(tcp_lis->base.obj_name, "Listener %s created",
+ tcp_lis->base.info));
+
+ *p_listener = &tcp_lis->base;
+ return PJ_SUCCESS;
+
+
+on_error:
+ lis_destroy(&tcp_lis->base);
+ return status;
+}
+
+
+/*
+ * Destroy listener.
+ */
+static pj_status_t lis_destroy(pj_turn_listener *listener)
+{
+ struct tcp_listener *tcp_lis = (struct tcp_listener *)listener;
+ unsigned i;
+
+ if (tcp_lis->key) {
+ pj_ioqueue_unregister(tcp_lis->key);
+ tcp_lis->key = NULL;
+ tcp_lis->base.sock = PJ_INVALID_SOCKET;
+ } else if (tcp_lis->base.sock != PJ_INVALID_SOCKET) {
+ pj_sock_close(tcp_lis->base.sock);
+ tcp_lis->base.sock = PJ_INVALID_SOCKET;
+ }
+
+ for (i=0; i<tcp_lis->accept_cnt; ++i) {
+ /* Nothing to do */
+ }
+
+ if (tcp_lis->base.pool) {
+ pj_pool_t *pool = tcp_lis->base.pool;
+
+ PJ_LOG(4,(tcp_lis->base.obj_name, "Listener %s destroyed",
+ tcp_lis->base.info));
+
+ tcp_lis->base.pool = NULL;
+ pj_pool_release(pool);
+ }
+ return PJ_SUCCESS;
+}
+
+
+/*
+ * Callback on new TCP connection.
+ */
+static void lis_on_accept_complete(pj_ioqueue_key_t *key,
+ pj_ioqueue_op_key_t *op_key,
+ pj_sock_t sock,
+ pj_status_t status)
+{
+ struct tcp_listener *tcp_lis;
+ struct accept_op *accept_op = (struct accept_op*) op_key;
+
+ tcp_lis = (struct tcp_listener*) pj_ioqueue_get_user_data(key);
+
+ PJ_UNUSED_ARG(sock);
+
+ do {
+ /* Report new connection. */
+ if (status == PJ_SUCCESS) {
+ char addr[PJ_INET6_ADDRSTRLEN+8];
+ PJ_LOG(5,(tcp_lis->base.obj_name, "Incoming TCP from %s",
+ pj_sockaddr_print(&accept_op->src_addr, addr,
+ sizeof(addr), 3)));
+ transport_create(accept_op->sock, &tcp_lis->base,
+ &accept_op->src_addr, accept_op->src_addr_len);
+ } else if (status != PJ_EPENDING) {
+ show_err(tcp_lis->base.obj_name, "accept()", status);
+ }
+
+ /* Prepare next accept() */
+ accept_op->src_addr_len = sizeof(accept_op->src_addr);
+ status = pj_ioqueue_accept(key, op_key, &accept_op->sock,
+ NULL,
+ &accept_op->src_addr,
+ &accept_op->src_addr_len);
+
+ } while (status != PJ_EPENDING && status != PJ_ECANCELLED &&
+ status != PJ_STATUS_FROM_OS(PJ_BLOCKING_ERROR_VAL));
+}
+
+
+/****************************************************************************/
+/*
+ * Transport
+ */
+enum
+{
+ TIMER_NONE,
+ TIMER_DESTROY
+};
+
+/* The delay in seconds to be applied before TCP transport is destroyed when
+ * no allocation is referencing it. This also means the initial time to wait
+ * after the initial TCP connection establishment to receive a valid STUN
+ * message in the transport.
+ */
+#define SHUTDOWN_DELAY 10
+
+struct recv_op
+{
+ pj_ioqueue_op_key_t op_key;
+ pj_turn_pkt pkt;
+};
+
+struct tcp_transport
+{
+ pj_turn_transport base;
+ pj_pool_t *pool;
+ pj_timer_entry timer;
+
+ pj_turn_allocation *alloc;
+ int ref_cnt;
+
+ pj_sock_t sock;
+ pj_ioqueue_key_t *key;
+ struct recv_op recv_op;
+ pj_ioqueue_op_key_t send_op;
+};
+
+
+static void tcp_on_read_complete(pj_ioqueue_key_t *key,
+ pj_ioqueue_op_key_t *op_key,
+ pj_ssize_t bytes_read);
+
+static pj_status_t tcp_sendto(pj_turn_transport *tp,
+ const void *packet,
+ pj_size_t size,
+ unsigned flag,
+ const pj_sockaddr_t *addr,
+ int addr_len);
+static void tcp_destroy(struct tcp_transport *tcp);
+static void tcp_add_ref(pj_turn_transport *tp,
+ pj_turn_allocation *alloc);
+static void tcp_dec_ref(pj_turn_transport *tp,
+ pj_turn_allocation *alloc);
+static void timer_callback(pj_timer_heap_t *timer_heap,
+ pj_timer_entry *entry);
+
+static void transport_create(pj_sock_t sock, pj_turn_listener *lis,
+ pj_sockaddr_t *src_addr, int src_addr_len)
+{
+ pj_pool_t *pool;
+ struct tcp_transport *tcp;
+ pj_ioqueue_callback cb;
+ pj_status_t status;
+
+ pool = pj_pool_create(lis->server->core.pf, "tcp%p", 1000, 1000, NULL);
+
+ tcp = PJ_POOL_ZALLOC_T(pool, struct tcp_transport);
+ tcp->base.obj_name = pool->obj_name;
+ tcp->base.listener = lis;
+ tcp->base.info = lis->info;
+ tcp->base.sendto = &tcp_sendto;
+ tcp->base.add_ref = &tcp_add_ref;
+ tcp->base.dec_ref = &tcp_dec_ref;
+ tcp->pool = pool;
+ tcp->sock = sock;
+
+ pj_timer_entry_init(&tcp->timer, TIMER_NONE, tcp, &timer_callback);
+
+ /* Register to ioqueue */
+ pj_bzero(&cb, sizeof(cb));
+ cb.on_read_complete = &tcp_on_read_complete;
+ status = pj_ioqueue_register_sock(pool, lis->server->core.ioqueue, sock,
+ tcp, &cb, &tcp->key);
+ if (status != PJ_SUCCESS) {
+ tcp_destroy(tcp);
+ return;
+ }
+
+ /* Init pkt */
+ tcp->recv_op.pkt.pool = pj_pool_create(lis->server->core.pf, "tcpkt%p",
+ 1000, 1000, NULL);
+ tcp->recv_op.pkt.transport = &tcp->base;
+ tcp->recv_op.pkt.src.tp_type = PJ_TURN_TP_TCP;
+ tcp->recv_op.pkt.src_addr_len = src_addr_len;
+ pj_memcpy(&tcp->recv_op.pkt.src.clt_addr, src_addr, src_addr_len);
+
+ tcp_on_read_complete(tcp->key, &tcp->recv_op.op_key, -PJ_EPENDING);
+ /* Should not access transport from now, it may have been destroyed */
+}
+
+
+static void tcp_destroy(struct tcp_transport *tcp)
+{
+ if (tcp->key) {
+ pj_ioqueue_unregister(tcp->key);
+ tcp->key = NULL;
+ tcp->sock = 0;
+ } else if (tcp->sock) {
+ pj_sock_close(tcp->sock);
+ tcp->sock = 0;
+ }
+
+ if (tcp->pool) {
+ pj_pool_release(tcp->pool);
+ }
+}
+
+
+static void timer_callback(pj_timer_heap_t *timer_heap,
+ pj_timer_entry *entry)
+{
+ struct tcp_transport *tcp = (struct tcp_transport*) entry->user_data;
+
+ PJ_UNUSED_ARG(timer_heap);
+
+ tcp_destroy(tcp);
+}
+
+
+static void tcp_on_read_complete(pj_ioqueue_key_t *key,
+ pj_ioqueue_op_key_t *op_key,
+ pj_ssize_t bytes_read)
+{
+ struct tcp_transport *tcp;
+ struct recv_op *recv_op = (struct recv_op*) op_key;
+ pj_status_t status;
+
+ tcp = (struct tcp_transport*) pj_ioqueue_get_user_data(key);
+
+ do {
+ /* Report to server or allocation, if we have allocation */
+ if (bytes_read > 0) {
+
+ recv_op->pkt.len = bytes_read;
+ pj_gettimeofday(&recv_op->pkt.rx_time);
+
+ tcp_add_ref(&tcp->base, NULL);
+
+ if (tcp->alloc) {
+ pj_turn_allocation_on_rx_client_pkt(tcp->alloc, &recv_op->pkt);
+ } else {
+ pj_turn_srv_on_rx_pkt(tcp->base.listener->server, &recv_op->pkt);
+ }
+
+ pj_assert(tcp->ref_cnt > 0);
+ tcp_dec_ref(&tcp->base, NULL);
+
+ } else if (bytes_read != -PJ_EPENDING) {
+ /* TCP connection closed/error. Notify client and then destroy
+ * ourselves.
+ * Note: the -PJ_EPENDING is the value passed during init.
+ */
+ ++tcp->ref_cnt;
+
+ if (tcp->alloc) {
+ if (bytes_read != 0) {
+ show_err(tcp->base.obj_name, "TCP socket error",
+ -bytes_read);
+ } else {
+ PJ_LOG(5,(tcp->base.obj_name, "TCP socket closed"));
+ }
+ pj_turn_allocation_on_transport_closed(tcp->alloc, &tcp->base);
+ tcp->alloc = NULL;
+ }
+
+ pj_assert(tcp->ref_cnt > 0);
+ if (--tcp->ref_cnt == 0) {
+ tcp_destroy(tcp);
+ return;
+ }
+ }
+
+ /* Reset pool */
+ pj_pool_reset(recv_op->pkt.pool);
+
+ /* If packet is full discard it */
+ if (recv_op->pkt.len == sizeof(recv_op->pkt.pkt)) {
+ PJ_LOG(4,(tcp->base.obj_name, "Buffer discarded"));
+ recv_op->pkt.len = 0;
+ }
+
+ /* Read next packet */
+ bytes_read = sizeof(recv_op->pkt.pkt) - recv_op->pkt.len;
+ status = pj_ioqueue_recv(tcp->key, op_key,
+ recv_op->pkt.pkt + recv_op->pkt.len,
+ &bytes_read, 0);
+
+ if (status != PJ_EPENDING && status != PJ_SUCCESS)
+ bytes_read = -status;
+
+ } while (status != PJ_EPENDING && status != PJ_ECANCELLED &&
+ status != PJ_STATUS_FROM_OS(PJ_BLOCKING_ERROR_VAL));
+
+}
+
+
+static pj_status_t tcp_sendto(pj_turn_transport *tp,
+ const void *packet,
+ pj_size_t size,
+ unsigned flag,
+ const pj_sockaddr_t *addr,
+ int addr_len)
+{
+ struct tcp_transport *tcp = (struct tcp_transport*) tp;
+ pj_ssize_t length = size;
+
+ PJ_UNUSED_ARG(addr);
+ PJ_UNUSED_ARG(addr_len);
+
+ return pj_ioqueue_send(tcp->key, &tcp->send_op, packet, &length, flag);
+}
+
+
+static void tcp_add_ref(pj_turn_transport *tp,
+ pj_turn_allocation *alloc)
+{
+ struct tcp_transport *tcp = (struct tcp_transport*) tp;
+
+ ++tcp->ref_cnt;
+
+ if (tcp->alloc == NULL && alloc) {
+ tcp->alloc = alloc;
+ }
+
+ /* Cancel shutdown timer if it's running */
+ if (tcp->timer.id != TIMER_NONE) {
+ pj_timer_heap_cancel(tcp->base.listener->server->core.timer_heap,
+ &tcp->timer);
+ tcp->timer.id = TIMER_NONE;
+ }
+}
+
+
+static void tcp_dec_ref(pj_turn_transport *tp,
+ pj_turn_allocation *alloc)
+{
+ struct tcp_transport *tcp = (struct tcp_transport*) tp;
+
+ --tcp->ref_cnt;
+
+ if (alloc && alloc == tcp->alloc) {
+ tcp->alloc = NULL;
+ }
+
+ if (tcp->ref_cnt == 0 && tcp->timer.id == TIMER_NONE) {
+ pj_time_val delay = { SHUTDOWN_DELAY, 0 };
+ tcp->timer.id = TIMER_DESTROY;
+ pj_timer_heap_schedule(tcp->base.listener->server->core.timer_heap,
+ &tcp->timer, &delay);
+ }
+}
+
diff --git a/pjnath/src/pjturn-srv/listener_udp.c b/pjnath/src/pjturn-srv/listener_udp.c
index b634d092..2c0eccdc 100644
--- a/pjnath/src/pjturn-srv/listener_udp.c
+++ b/pjnath/src/pjturn-srv/listener_udp.c
@@ -17,6 +17,7 @@
* Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
*/
#include "turn.h"
+#include <pj/compat/socket.h>
struct read_op
{
@@ -27,22 +28,30 @@ struct read_op
struct udp_listener
{
pj_turn_listener base;
+
pj_ioqueue_key_t *key;
unsigned read_cnt;
struct read_op **read_op; /* Array of read_op's */
+
+ pj_turn_transport tp; /* Transport instance */
};
-static pj_status_t udp_sendto(pj_turn_listener *listener,
+static pj_status_t udp_destroy(pj_turn_listener *udp);
+static void on_read_complete(pj_ioqueue_key_t *key,
+ pj_ioqueue_op_key_t *op_key,
+ pj_ssize_t bytes_read);
+
+static pj_status_t udp_sendto(pj_turn_transport *tp,
const void *packet,
pj_size_t size,
unsigned flag,
const pj_sockaddr_t *addr,
int addr_len);
-static pj_status_t udp_destroy(pj_turn_listener *udp);
-static void on_read_complete(pj_ioqueue_key_t *key,
- pj_ioqueue_op_key_t *op_key,
- pj_ssize_t bytes_read);
+static void udp_add_ref(pj_turn_transport *tp,
+ pj_turn_allocation *alloc);
+static void udp_dec_ref(pj_turn_transport *tp,
+ pj_turn_allocation *alloc);
/*
@@ -70,11 +79,17 @@ PJ_DEF(pj_status_t) pj_turn_listener_create_udp( pj_turn_srv *srv,
udp->base.server = srv;
udp->base.tp_type = PJ_TURN_TP_UDP;
udp->base.sock = PJ_INVALID_SOCKET;
- udp->base.sendto = &udp_sendto;
udp->base.destroy = &udp_destroy;
udp->read_cnt = concurrency_cnt;
udp->base.flags = flags;
+ udp->tp.obj_name = udp->base.obj_name;
+ udp->tp.info = udp->base.info;
+ udp->tp.listener = &udp->base;
+ udp->tp.sendto = &udp_sendto;
+ udp->tp.add_ref = &udp_add_ref;
+ udp->tp.dec_ref = &udp_dec_ref;
+
/* Create socket */
status = pj_sock_socket(af, pj_SOCK_DGRAM(), 0, &udp->base.sock);
if (status != PJ_SUCCESS)
@@ -172,7 +187,7 @@ static pj_status_t udp_destroy(pj_turn_listener *listener)
/*
* Callback to send packet.
*/
-static pj_status_t udp_sendto(pj_turn_listener *listener,
+static pj_status_t udp_sendto(pj_turn_transport *tp,
const void *packet,
pj_size_t size,
unsigned flag,
@@ -180,9 +195,27 @@ static pj_status_t udp_sendto(pj_turn_listener *listener,
int addr_len)
{
pj_ssize_t len = size;
- return pj_sock_sendto(listener->sock, packet, &len, flag, addr, addr_len);
+ return pj_sock_sendto(tp->listener->sock, packet, &len, flag, addr, addr_len);
+}
+
+
+static void udp_add_ref(pj_turn_transport *tp,
+ pj_turn_allocation *alloc)
+{
+ /* Do nothing */
+ PJ_UNUSED_ARG(tp);
+ PJ_UNUSED_ARG(alloc);
}
+static void udp_dec_ref(pj_turn_transport *tp,
+ pj_turn_allocation *alloc)
+{
+ /* Do nothing */
+ PJ_UNUSED_ARG(tp);
+ PJ_UNUSED_ARG(alloc);
+}
+
+
/*
* Callback on received packet.
*/
@@ -211,7 +244,7 @@ static void on_read_complete(pj_ioqueue_key_t *key,
rpool = read_op->pkt.pool;
pj_pool_reset(rpool);
read_op->pkt.pool = rpool;
- read_op->pkt.listener = &udp->base;
+ read_op->pkt.transport = &udp->tp;
read_op->pkt.src.tp_type = udp->base.tp_type;
/* Read next packet */
@@ -227,6 +260,7 @@ static void on_read_complete(pj_ioqueue_key_t *key,
if (status != PJ_EPENDING && status != PJ_SUCCESS)
bytes_read = -status;
- } while (status != PJ_EPENDING && status != PJ_ECANCELLED);
+ } while (status != PJ_EPENDING && status != PJ_ECANCELLED &&
+ status != PJ_STATUS_FROM_OS(PJ_BLOCKING_ERROR_VAL));
}
diff --git a/pjnath/src/pjturn-srv/main.c b/pjnath/src/pjturn-srv/main.c
index 348ad3db..4ebdcde8 100644
--- a/pjnath/src/pjturn-srv/main.c
+++ b/pjnath/src/pjturn-srv/main.c
@@ -140,6 +140,11 @@ int main()
status = pj_turn_listener_create_udp(srv, pj_AF_INET(), NULL,
PJ_STUN_PORT, 1, 0, &listener);
if (status != PJ_SUCCESS)
+ return err("Error creating UDP listener", status);
+
+ status = pj_turn_listener_create_tcp(srv, pj_AF_INET(), NULL,
+ PJ_STUN_PORT, 1, 0, &listener);
+ if (status != PJ_SUCCESS)
return err("Error creating listener", status);
status = pj_turn_srv_add_listener(srv, listener);
diff --git a/pjnath/src/pjturn-srv/server.c b/pjnath/src/pjturn-srv/server.c
index 66f1c6a7..17ded7b0 100644
--- a/pjnath/src/pjturn-srv/server.c
+++ b/pjnath/src/pjturn-srv/server.c
@@ -28,11 +28,12 @@
#define MAX_PORT 65535
#define MAX_LISTENERS 16
#define MAX_THREADS 2
-#define MAX_NET_EVENTS 10
+#define MAX_NET_EVENTS 1000
/* Prototypes */
static int server_thread_proc(void *arg);
static pj_status_t on_tx_stun_msg( pj_stun_session *sess,
+ void *token,
const void *pkt,
pj_size_t pkt_size,
const pj_sockaddr_t *dst_addr,
@@ -41,6 +42,7 @@ static pj_status_t on_rx_stun_request(pj_stun_session *sess,
const pj_uint8_t *pkt,
unsigned pkt_len,
const pj_stun_rx_data *rdata,
+ void *user_data,
const pj_sockaddr_t *src_addr,
unsigned src_addr_len);
@@ -77,6 +79,7 @@ PJ_DEF(pj_status_t) pj_turn_srv_create(pj_pool_factory *pf,
pj_turn_srv **p_srv)
{
pj_pool_t *pool;
+ pj_stun_session_cb sess_cb;
pj_turn_srv *srv;
unsigned i;
pj_status_t status;
@@ -124,11 +127,6 @@ PJ_DEF(pj_status_t) pj_turn_srv_create(pj_pool_factory *pf,
pj_pool_calloc(pool, MAX_LISTENERS,
sizeof(srv->core.listener[0]));
- /* Array of STUN sessions, one for each listener */
- srv->core.stun_sess = (pj_stun_session**)
- pj_pool_calloc(pool, MAX_LISTENERS,
- (sizeof(srv->core.stun_sess[0])));
-
/* Create hash tables */
srv->tables.alloc = pj_hash_create(pool, MAX_CLIENTS);
srv->tables.res = pj_hash_create(pool, MAX_CLIENTS);
@@ -150,6 +148,22 @@ PJ_DEF(pj_status_t) pj_turn_srv_create(pj_pool_factory *pf,
srv->core.cred.data.dyn_cred.get_password = &pj_turn_get_password;
srv->core.cred.data.dyn_cred.verify_nonce = &pj_turn_verify_nonce;
+ /* Create STUN session to handle new allocation */
+ pj_bzero(&sess_cb, sizeof(sess_cb));
+ sess_cb.on_rx_request = &on_rx_stun_request;
+ sess_cb.on_send_msg = &on_tx_stun_msg;
+
+ status = pj_stun_session_create(&srv->core.stun_cfg, srv->obj_name,
+ &sess_cb, PJ_FALSE, &srv->core.stun_sess);
+ if (status != PJ_SUCCESS) {
+ goto on_error;
+ }
+
+ pj_stun_session_set_user_data(srv->core.stun_sess, srv);
+ pj_stun_session_set_credential(srv->core.stun_sess, PJ_STUN_AUTH_LONG_TERM,
+ &srv->core.cred);
+
+
/* Array of worker threads */
srv->core.thread_cnt = MAX_THREADS;
srv->core.thread = (pj_thread_t**)
@@ -240,7 +254,7 @@ static int server_thread_proc(void *arg)
pj_turn_srv *srv = (pj_turn_srv*)arg;
while (!srv->core.quit) {
- pj_time_val timeout_max = {0, 500};
+ pj_time_val timeout_max = {0, 100};
srv_handle_events(srv, &timeout_max);
}
@@ -277,16 +291,18 @@ PJ_DEF(pj_status_t) pj_turn_srv_destroy(pj_turn_srv *srv)
}
}
- /* Destroy all listeners and STUN sessions associated with them. */
+ /* Destroy all listeners. */
for (i=0; i<srv->core.lis_cnt; ++i) {
if (srv->core.listener[i]) {
pj_turn_listener_destroy(srv->core.listener[i]);
srv->core.listener[i] = NULL;
}
- if (srv->core.stun_sess[i]) {
- pj_stun_session_destroy(srv->core.stun_sess[i]);
- srv->core.stun_sess[i] = NULL;
- }
+ }
+
+ /* Destroy STUN session */
+ if (srv->core.stun_sess) {
+ pj_stun_session_destroy(srv->core.stun_sess);
+ srv->core.stun_sess = NULL;
}
/* Destroy hash tables (well, sort of) */
@@ -341,10 +357,7 @@ PJ_DEF(pj_status_t) pj_turn_srv_destroy(pj_turn_srv *srv)
PJ_DEF(pj_status_t) pj_turn_srv_add_listener(pj_turn_srv *srv,
pj_turn_listener *lis)
{
- pj_stun_session_cb sess_cb;
unsigned index;
- pj_stun_session *sess;
- pj_status_t status;
PJ_ASSERT_RETURN(srv && lis, PJ_EINVAL);
PJ_ASSERT_RETURN(srv->core.lis_cnt < MAX_LISTENERS, PJ_ETOOMANY);
@@ -353,24 +366,6 @@ PJ_DEF(pj_status_t) pj_turn_srv_add_listener(pj_turn_srv *srv,
index = srv->core.lis_cnt;
srv->core.listener[index] = lis;
lis->server = srv;
-
- /* Create STUN session to handle new allocation */
- pj_bzero(&sess_cb, sizeof(sess_cb));
- sess_cb.on_rx_request = &on_rx_stun_request;
- sess_cb.on_send_msg = &on_tx_stun_msg;
-
- status = pj_stun_session_create(&srv->core.stun_cfg, lis->obj_name,
- &sess_cb, PJ_FALSE, &sess);
- if (status != PJ_SUCCESS) {
- srv->core.listener[index] = NULL;
- return status;
- }
-
- pj_stun_session_set_user_data(sess, lis);
- pj_stun_session_set_credential(sess, PJ_STUN_AUTH_LONG_TERM,
- &srv->core.cred);
-
- srv->core.stun_sess[index] = sess;
lis->id = index;
srv->core.lis_cnt++;
@@ -382,21 +377,6 @@ PJ_DEF(pj_status_t) pj_turn_srv_add_listener(pj_turn_srv *srv,
/*
- * Send packet with this listener.
- */
-PJ_DEF(pj_status_t) pj_turn_listener_sendto(pj_turn_listener *listener,
- const void *packet,
- pj_size_t size,
- unsigned flag,
- const pj_sockaddr_t *addr,
- int addr_len)
-{
- pj_assert(listener->id != PJ_TURN_INVALID_LIS_ID);
- return listener->sendto(listener, packet, size, flag, addr, addr_len);
-}
-
-
-/*
* Destroy listener.
*/
PJ_DEF(pj_status_t) pj_turn_listener_destroy(pj_turn_listener *listener)
@@ -411,10 +391,6 @@ PJ_DEF(pj_status_t) pj_turn_listener_destroy(pj_turn_listener *listener)
srv->core.listener[i] = NULL;
srv->core.lis_cnt--;
listener->id = PJ_TURN_INVALID_LIS_ID;
- if (srv->core.stun_sess[i]) {
- pj_stun_session_destroy(srv->core.stun_sess[i]);
- srv->core.stun_sess[i] = NULL;
- }
break;
}
}
@@ -425,6 +401,26 @@ PJ_DEF(pj_status_t) pj_turn_listener_destroy(pj_turn_listener *listener)
}
+/**
+ * Add a reference to a transport.
+ */
+PJ_DEF(void) pj_turn_transport_add_ref( pj_turn_transport *transport,
+ pj_turn_allocation *alloc)
+{
+ transport->add_ref(transport, alloc);
+}
+
+
+/**
+ * Decrement transport reference counter.
+ */
+PJ_DEF(void) pj_turn_transport_dec_ref( pj_turn_transport *transport,
+ pj_turn_allocation *alloc)
+{
+ transport->dec_ref(transport, alloc);
+}
+
+
/*
* Register an allocation to the hash tables.
*/
@@ -466,24 +462,26 @@ PJ_DEF(pj_status_t) pj_turn_srv_unregister_allocation(pj_turn_srv *srv,
* outgoing STUN packet.
*/
static pj_status_t on_tx_stun_msg( pj_stun_session *sess,
- const void *pkt,
- pj_size_t pkt_size,
+ void *token,
+ const void *pdu,
+ pj_size_t pdu_size,
const pj_sockaddr_t *dst_addr,
unsigned addr_len)
{
- pj_turn_listener *listener;
+ pj_turn_transport *transport = (pj_turn_transport*) token;
- listener = (pj_turn_listener*) pj_stun_session_get_user_data(sess);
+ PJ_ASSERT_RETURN(transport!=NULL, PJ_EINVALIDOP);
- PJ_ASSERT_RETURN(listener!=NULL, PJ_EINVALIDOP);
+ PJ_UNUSED_ARG(sess);
- return pj_turn_listener_sendto(listener, pkt, pkt_size, 0,
- dst_addr, addr_len);
+ return transport->sendto(transport, pdu, pdu_size, 0,
+ dst_addr, addr_len);
}
/* Respond to STUN request */
static pj_status_t stun_respond(pj_stun_session *sess,
+ pj_turn_transport *transport,
const pj_stun_rx_data *rdata,
unsigned code,
const char *errmsg,
@@ -503,36 +501,38 @@ static pj_status_t stun_respond(pj_stun_session *sess,
return status;
/* Send the response */
- return pj_stun_session_send_msg(sess, cache, dst_addr, addr_len, tdata);
+ return pj_stun_session_send_msg(sess, transport, cache, PJ_FALSE,
+ dst_addr, addr_len, tdata);
}
/* Callback from our own STUN session when incoming request arrives.
* This function is triggered by pj_stun_session_on_rx_pkt() call in
- * pj_turn_srv_on_rx_pkt() function below.
+ * pj_turn_srv_on_rx_pkt() function below.
*/
static pj_status_t on_rx_stun_request(pj_stun_session *sess,
- const pj_uint8_t *pkt,
- unsigned pkt_len,
+ const pj_uint8_t *pdu,
+ unsigned pdu_len,
const pj_stun_rx_data *rdata,
+ void *token,
const pj_sockaddr_t *src_addr,
unsigned src_addr_len)
{
- pj_turn_listener *listener;
+ pj_turn_transport *transport;
const pj_stun_msg *msg = rdata->msg;
pj_turn_srv *srv;
pj_turn_allocation *alloc;
pj_status_t status;
- PJ_UNUSED_ARG(pkt);
- PJ_UNUSED_ARG(pkt_len);
+ PJ_UNUSED_ARG(pdu);
+ PJ_UNUSED_ARG(pdu_len);
- listener = (pj_turn_listener*) pj_stun_session_get_user_data(sess);
- srv = listener->server;
+ transport = (pj_turn_transport*) token;
+ srv = transport->listener->server;
/* Respond any requests other than ALLOCATE with 437 response */
if (msg->hdr.type != PJ_STUN_ALLOCATE_REQUEST) {
- stun_respond(sess, rdata, PJ_STUN_SC_ALLOCATION_MISMATCH,
+ stun_respond(sess, transport, rdata, PJ_STUN_SC_ALLOCATION_MISMATCH,
NULL, PJ_FALSE, src_addr, src_addr_len);
return PJ_SUCCESS;
}
@@ -540,7 +540,7 @@ static pj_status_t on_rx_stun_request(pj_stun_session *sess,
/* Create new allocation. The relay resource will be allocated
* in this function.
*/
- status = pj_turn_allocation_create(listener, src_addr, src_addr_len,
+ status = pj_turn_allocation_create(transport, src_addr, src_addr_len,
rdata, sess, &alloc);
if (status != PJ_SUCCESS) {
/* STUN response has been sent, no need to reply here */
@@ -576,37 +576,53 @@ PJ_DEF(void) pj_turn_srv_on_rx_pkt(pj_turn_srv *srv,
pj_turn_allocation_on_rx_client_pkt(alloc, pkt);
} else {
/* Otherwise this is a new client */
- unsigned options, lis_id;
+ unsigned options;
+ unsigned parsed_len;
pj_status_t status;
/* Check that this is a STUN message */
options = PJ_STUN_CHECK_PACKET;
- if (pkt->listener->tp_type == PJ_TURN_TP_UDP)
+ if (pkt->transport->listener->tp_type == PJ_TURN_TP_UDP)
options |= PJ_STUN_IS_DATAGRAM;
status = pj_stun_msg_check(pkt->pkt, pkt->len, options);
if (status != PJ_SUCCESS) {
- char errmsg[PJ_ERR_MSG_SIZE];
- char ip[PJ_INET6_ADDRSTRLEN+10];
-
- pj_strerror(status, errmsg, sizeof(errmsg));
- PJ_LOG(5,(srv->obj_name,
- "Non STUN packet from %s is dropped: %s",
- pj_sockaddr_print(&pkt->src.clt_addr, ip, sizeof(ip), 3),
- errmsg));
+ /* If the first byte are not STUN, drop the packet. First byte
+ * of STUN message is always 0x00 or 0x01. Otherwise wait for
+ * more data as the data might have come from TCP.
+ *
+ * Also drop packet if it's unreasonably too big, as this might
+ * indicate invalid data that's building up in the buffer.
+ *
+ * Or if packet is a datagram.
+ */
+ if ((*pkt->pkt != 0x00 && *pkt->pkt != 0x01) ||
+ pkt->len > 1600 ||
+ (options & PJ_STUN_IS_DATAGRAM))
+ {
+ char errmsg[PJ_ERR_MSG_SIZE];
+ char ip[PJ_INET6_ADDRSTRLEN+10];
+
+ pkt->len = 0;
+
+ pj_strerror(status, errmsg, sizeof(errmsg));
+ PJ_LOG(5,(srv->obj_name,
+ "Non-STUN packet from %s is dropped: %s",
+ pj_sockaddr_print(&pkt->src.clt_addr, ip, sizeof(ip), 3),
+ errmsg));
+ }
return;
}
- lis_id = pkt->listener->id;
-
/* Hand over processing to STUN session. This will trigger
* on_rx_stun_request() callback to be called if the STUN
* message is a request.
*/
options &= ~PJ_STUN_CHECK_PACKET;
- status = pj_stun_session_on_rx_pkt(srv->core.stun_sess[lis_id],
- pkt->pkt, pkt->len, options, NULL,
- &pkt->src.clt_addr,
+ parsed_len = 0;
+ status = pj_stun_session_on_rx_pkt(srv->core.stun_sess, pkt->pkt,
+ pkt->len, options, pkt->transport,
+ &parsed_len, &pkt->src.clt_addr,
pkt->src_addr_len);
if (status != PJ_SUCCESS) {
char errmsg[PJ_ERR_MSG_SIZE];
@@ -617,7 +633,18 @@ PJ_DEF(void) pj_turn_srv_on_rx_pkt(pj_turn_srv *srv,
"Error processing STUN packet from %s: %s",
pj_sockaddr_print(&pkt->src.clt_addr, ip, sizeof(ip), 3),
errmsg));
- return;
+ }
+
+ if (pkt->transport->listener->tp_type == PJ_TURN_TP_UDP) {
+ pkt->len = 0;
+ } else if (parsed_len > 0) {
+ if (parsed_len == pkt->len) {
+ pkt->len = 0;
+ } else {
+ pj_memmove(pkt->pkt, pkt->pkt+parsed_len,
+ pkt->len - parsed_len);
+ pkt->len -= parsed_len;
+ }
}
}
}
diff --git a/pjnath/src/pjturn-srv/turn.h b/pjnath/src/pjturn-srv/turn.h
index 337e8746..ae9cc590 100644
--- a/pjnath/src/pjturn-srv/turn.h
+++ b/pjnath/src/pjturn-srv/turn.h
@@ -22,10 +22,11 @@
#include <pjlib.h>
#include <pjnath.h>
-typedef struct pj_turn_relay_res pj_turn_relay_res;
+typedef struct pj_turn_relay_res pj_turn_relay_res;
typedef struct pj_turn_listener pj_turn_listener;
-typedef struct pj_turn_permission pj_turn_permission;
-typedef struct pj_turn_allocation pj_turn_allocation;
+typedef struct pj_turn_transport pj_turn_transport;
+typedef struct pj_turn_permission pj_turn_permission;
+typedef struct pj_turn_allocation pj_turn_allocation;
typedef struct pj_turn_srv pj_turn_srv;
typedef struct pj_turn_pkt pj_turn_pkt;
@@ -132,11 +133,11 @@ struct pj_turn_allocation
/** Mutex */
pj_lock_t *lock;
- /** TURN listener. */
- pj_turn_listener *listener;
+ /** Server instance. */
+ pj_turn_srv *server;
- /** Client socket, if connection to client is using TCP. */
- pj_sock_t clt_sock;
+ /** Transport to send/receive packets to/from client. */
+ pj_turn_transport *transport;
/** The relay resource for this allocation. */
pj_turn_relay_res relay;
@@ -181,11 +182,6 @@ struct pj_turn_permission
/** Hash table key */
pj_turn_permission_key hkey;
- /** Transport socket. If TCP is used, the value will be the actual
- * TCP socket. If UDP is used, the value will be the relay address
- */
- pj_sock_t sock;
-
/** TURN allocation that owns this permission/channel */
pj_turn_allocation *allocation;
@@ -201,12 +197,12 @@ struct pj_turn_permission
/**
* Create new allocation.
*/
-PJ_DECL(pj_status_t) pj_turn_allocation_create(pj_turn_listener *listener,
- const pj_sockaddr_t *src_addr,
- unsigned src_addr_len,
- const pj_stun_rx_data *rdata,
- pj_stun_session *srv_sess,
- pj_turn_allocation **p_alloc);
+PJ_DECL(pj_status_t) pj_turn_allocation_create(pj_turn_transport *transport,
+ const pj_sockaddr_t *src_addr,
+ unsigned src_addr_len,
+ const pj_stun_rx_data *rdata,
+ pj_stun_session *srv_sess,
+ pj_turn_allocation **p_alloc);
/**
* Destroy allocation.
*/
@@ -217,7 +213,13 @@ PJ_DECL(void) pj_turn_allocation_destroy(pj_turn_allocation *alloc);
* Handle incoming packet from client.
*/
PJ_DECL(void) pj_turn_allocation_on_rx_client_pkt(pj_turn_allocation *alloc,
- pj_turn_pkt *pkt);
+ pj_turn_pkt *pkt);
+
+/**
+ * Handle transport closure.
+ */
+PJ_DECL(void) pj_turn_allocation_on_transport_closed(pj_turn_allocation *alloc,
+ pj_turn_transport *tp);
/****************************************************************************/
/*
@@ -257,16 +259,42 @@ struct pj_turn_listener
/** Flags. */
unsigned flags;
+ /** Destroy handler */
+ pj_status_t (*destroy)(pj_turn_listener*);
+};
+
+
+/**
+ * This structure describes TURN transport socket which is used to send and
+ * receive packets towards client.
+ */
+struct pj_turn_transport
+{
+ /** Object name/identification */
+ char *obj_name;
+
+ /** Slightly longer info about this listener */
+ char *info;
+
+ /** Listener instance */
+ pj_turn_listener *listener;
+
/** Sendto handler */
- pj_status_t (*sendto)(pj_turn_listener *listener,
+ pj_status_t (*sendto)(pj_turn_transport *tp,
const void *packet,
pj_size_t size,
unsigned flag,
const pj_sockaddr_t *addr,
int addr_len);
- /** Destroy handler */
- pj_status_t (*destroy)(pj_turn_listener*);
+ /** Addref handler */
+ void (*add_ref)(pj_turn_transport *tp,
+ pj_turn_allocation *alloc);
+
+ /** Decref handler */
+ void (*dec_ref)(pj_turn_transport *tp,
+ pj_turn_allocation *alloc);
+
};
@@ -278,8 +306,8 @@ struct pj_turn_pkt
/** Pool for this packet */
pj_pool_t *pool;
- /** Listener that owns this. */
- pj_turn_listener *listener;
+ /** Transport where the packet was received. */
+ pj_turn_transport *transport;
/** Packet buffer (must be 32bit aligned). */
pj_uint8_t pkt[PJ_TURN_MAX_PKT_LEN];
@@ -299,25 +327,26 @@ struct pj_turn_pkt
/**
- * Create a new listener on the specified port.
+ * Create a UDP listener on the specified port.
*/
PJ_DECL(pj_status_t) pj_turn_listener_create_udp(pj_turn_srv *srv,
- int af,
- const pj_str_t *bound_addr,
- unsigned port,
- unsigned concurrency_cnt,
- unsigned flags,
- pj_turn_listener **p_listener);
+ int af,
+ const pj_str_t *bound_addr,
+ unsigned port,
+ unsigned concurrency_cnt,
+ unsigned flags,
+ pj_turn_listener **p_lis);
/**
- * Send packet with this listener.
+ * Create a TCP listener on the specified port.
*/
-PJ_DECL(pj_status_t) pj_turn_listener_sendto(pj_turn_listener *listener,
- const void *packet,
- pj_size_t size,
- unsigned flag,
- const pj_sockaddr_t *addr,
- int addr_len);
+PJ_DECL(pj_status_t) pj_turn_listener_create_tcp(pj_turn_srv *srv,
+ int af,
+ const pj_str_t *bound_addr,
+ unsigned port,
+ unsigned concurrency_cnt,
+ unsigned flags,
+ pj_turn_listener **p_lis);
/**
* Destroy listener.
@@ -325,6 +354,21 @@ PJ_DECL(pj_status_t) pj_turn_listener_sendto(pj_turn_listener *listener,
PJ_DECL(pj_status_t) pj_turn_listener_destroy(pj_turn_listener *listener);
+/**
+ * Add a reference to a transport.
+ */
+PJ_DECL(void) pj_turn_transport_add_ref(pj_turn_transport *transport,
+ pj_turn_allocation *alloc);
+
+
+/**
+ * Decrement transport reference counter.
+ */
+PJ_DECL(void) pj_turn_transport_dec_ref(pj_turn_transport *transport,
+ pj_turn_allocation *alloc);
+
+
+
/****************************************************************************/
/*
* TURN Server API
@@ -360,8 +404,8 @@ struct pj_turn_srv
/** Array of listeners. */
pj_turn_listener **listener;
- /** Array of STUN sessions, one for each listeners. */
- pj_stun_session **stun_sess;
+ /** STUN session to handle initial Allocate request. */
+ pj_stun_session *stun_sess;
/** Number of worker threads. */
unsigned thread_cnt;
@@ -456,7 +500,7 @@ PJ_DECL(pj_status_t) pj_turn_srv_unregister_allocation(pj_turn_srv *srv,
* This callback is called by UDP listener on incoming packet.
*/
PJ_DECL(void) pj_turn_srv_on_rx_pkt(pj_turn_srv *srv,
- pj_turn_pkt *pkt);
+ pj_turn_pkt *pkt);
#endif /* __PJ_TURN_SRV_TURN_H__ */