diff options
Diffstat (limited to 'pjnath/src/pjturn-srv/listener_tcp.c')
-rw-r--r-- | pjnath/src/pjturn-srv/listener_tcp.c | 480 |
1 files changed, 480 insertions, 0 deletions
diff --git a/pjnath/src/pjturn-srv/listener_tcp.c b/pjnath/src/pjturn-srv/listener_tcp.c new file mode 100644 index 00000000..e5369e5a --- /dev/null +++ b/pjnath/src/pjturn-srv/listener_tcp.c @@ -0,0 +1,480 @@ +/* $Id$ */ +/* + * Copyright (C) 2003-2007 Benny Prijono <benny@prijono.org> + * + * This program is free software; you can redistribute it and/or modify + * it under the terms of the GNU General Public License as published by + * the Free Software Foundation; either version 2 of the License, or + * (at your option) any later version. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU General Public License for more details. + * + * You should have received a copy of the GNU General Public License + * along with this program; if not, write to the Free Software + * Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA + */ +#include "turn.h" +#include <pj/compat/socket.h> + +struct accept_op +{ + pj_ioqueue_op_key_t op_key; + pj_sock_t sock; + pj_sockaddr src_addr; + int src_addr_len; +}; + +struct tcp_listener +{ + pj_turn_listener base; + pj_ioqueue_key_t *key; + unsigned accept_cnt; + struct accept_op *accept_op; /* Array of accept_op's */ +}; + + +static void lis_on_accept_complete(pj_ioqueue_key_t *key, + pj_ioqueue_op_key_t *op_key, + pj_sock_t sock, + pj_status_t status); +static pj_status_t lis_destroy(pj_turn_listener *listener); +static void transport_create(pj_sock_t sock, pj_turn_listener *lis, + pj_sockaddr_t *src_addr, int src_addr_len); + +static void show_err(const char *sender, const char *title, + pj_status_t status) +{ + char errmsg[PJ_ERR_MSG_SIZE]; + + pj_strerror(status, errmsg, sizeof(errmsg)); + PJ_LOG(4,(sender, "%s: %s", title, errmsg)); +} + + +/* + * Create a new listener on the specified port. + */ +PJ_DEF(pj_status_t) pj_turn_listener_create_tcp(pj_turn_srv *srv, + int af, + const pj_str_t *bound_addr, + unsigned port, + unsigned concurrency_cnt, + unsigned flags, + pj_turn_listener **p_listener) +{ + pj_pool_t *pool; + struct tcp_listener *tcp_lis; + pj_ioqueue_callback ioqueue_cb; + unsigned i; + pj_status_t status; + + /* Create structure */ + pool = pj_pool_create(srv->core.pf, "tcpl%p", 1000, 1000, NULL); + tcp_lis = PJ_POOL_ZALLOC_T(pool, struct tcp_listener); + tcp_lis->base.pool = pool; + tcp_lis->base.obj_name = pool->obj_name; + tcp_lis->base.server = srv; + tcp_lis->base.tp_type = PJ_TURN_TP_TCP; + tcp_lis->base.sock = PJ_INVALID_SOCKET; + //tcp_lis->base.sendto = &tcp_sendto; + tcp_lis->base.destroy = &lis_destroy; + tcp_lis->accept_cnt = concurrency_cnt; + tcp_lis->base.flags = flags; + + /* Create socket */ + status = pj_sock_socket(af, pj_SOCK_STREAM(), 0, &tcp_lis->base.sock); + if (status != PJ_SUCCESS) + goto on_error; + + /* Init bind address */ + status = pj_sockaddr_init(af, &tcp_lis->base.addr, bound_addr, + (pj_uint16_t)port); + if (status != PJ_SUCCESS) + goto on_error; + + /* Create info */ + pj_ansi_strcpy(tcp_lis->base.info, "TCP:"); + pj_sockaddr_print(&tcp_lis->base.addr, tcp_lis->base.info+4, + sizeof(tcp_lis->base.info)-4, 3); + + /* Bind socket */ + status = pj_sock_bind(tcp_lis->base.sock, &tcp_lis->base.addr, + pj_sockaddr_get_len(&tcp_lis->base.addr)); + if (status != PJ_SUCCESS) + goto on_error; + + /* Listen() */ + status = pj_sock_listen(tcp_lis->base.sock, 5); + if (status != PJ_SUCCESS) + goto on_error; + + /* Register to ioqueue */ + pj_bzero(&ioqueue_cb, sizeof(ioqueue_cb)); + ioqueue_cb.on_accept_complete = &lis_on_accept_complete; + status = pj_ioqueue_register_sock(pool, srv->core.ioqueue, tcp_lis->base.sock, + tcp_lis, &ioqueue_cb, &tcp_lis->key); + + /* Create op keys */ + tcp_lis->accept_op = (struct accept_op*)pj_pool_calloc(pool, concurrency_cnt, + sizeof(struct accept_op)); + + /* Create each accept_op and kick off read operation */ + for (i=0; i<concurrency_cnt; ++i) { + lis_on_accept_complete(tcp_lis->key, &tcp_lis->accept_op[i].op_key, + PJ_INVALID_SOCKET, PJ_EPENDING); + } + + /* Done */ + PJ_LOG(4,(tcp_lis->base.obj_name, "Listener %s created", + tcp_lis->base.info)); + + *p_listener = &tcp_lis->base; + return PJ_SUCCESS; + + +on_error: + lis_destroy(&tcp_lis->base); + return status; +} + + +/* + * Destroy listener. + */ +static pj_status_t lis_destroy(pj_turn_listener *listener) +{ + struct tcp_listener *tcp_lis = (struct tcp_listener *)listener; + unsigned i; + + if (tcp_lis->key) { + pj_ioqueue_unregister(tcp_lis->key); + tcp_lis->key = NULL; + tcp_lis->base.sock = PJ_INVALID_SOCKET; + } else if (tcp_lis->base.sock != PJ_INVALID_SOCKET) { + pj_sock_close(tcp_lis->base.sock); + tcp_lis->base.sock = PJ_INVALID_SOCKET; + } + + for (i=0; i<tcp_lis->accept_cnt; ++i) { + /* Nothing to do */ + } + + if (tcp_lis->base.pool) { + pj_pool_t *pool = tcp_lis->base.pool; + + PJ_LOG(4,(tcp_lis->base.obj_name, "Listener %s destroyed", + tcp_lis->base.info)); + + tcp_lis->base.pool = NULL; + pj_pool_release(pool); + } + return PJ_SUCCESS; +} + + +/* + * Callback on new TCP connection. + */ +static void lis_on_accept_complete(pj_ioqueue_key_t *key, + pj_ioqueue_op_key_t *op_key, + pj_sock_t sock, + pj_status_t status) +{ + struct tcp_listener *tcp_lis; + struct accept_op *accept_op = (struct accept_op*) op_key; + + tcp_lis = (struct tcp_listener*) pj_ioqueue_get_user_data(key); + + PJ_UNUSED_ARG(sock); + + do { + /* Report new connection. */ + if (status == PJ_SUCCESS) { + char addr[PJ_INET6_ADDRSTRLEN+8]; + PJ_LOG(5,(tcp_lis->base.obj_name, "Incoming TCP from %s", + pj_sockaddr_print(&accept_op->src_addr, addr, + sizeof(addr), 3))); + transport_create(accept_op->sock, &tcp_lis->base, + &accept_op->src_addr, accept_op->src_addr_len); + } else if (status != PJ_EPENDING) { + show_err(tcp_lis->base.obj_name, "accept()", status); + } + + /* Prepare next accept() */ + accept_op->src_addr_len = sizeof(accept_op->src_addr); + status = pj_ioqueue_accept(key, op_key, &accept_op->sock, + NULL, + &accept_op->src_addr, + &accept_op->src_addr_len); + + } while (status != PJ_EPENDING && status != PJ_ECANCELLED && + status != PJ_STATUS_FROM_OS(PJ_BLOCKING_ERROR_VAL)); +} + + +/****************************************************************************/ +/* + * Transport + */ +enum +{ + TIMER_NONE, + TIMER_DESTROY +}; + +/* The delay in seconds to be applied before TCP transport is destroyed when + * no allocation is referencing it. This also means the initial time to wait + * after the initial TCP connection establishment to receive a valid STUN + * message in the transport. + */ +#define SHUTDOWN_DELAY 10 + +struct recv_op +{ + pj_ioqueue_op_key_t op_key; + pj_turn_pkt pkt; +}; + +struct tcp_transport +{ + pj_turn_transport base; + pj_pool_t *pool; + pj_timer_entry timer; + + pj_turn_allocation *alloc; + int ref_cnt; + + pj_sock_t sock; + pj_ioqueue_key_t *key; + struct recv_op recv_op; + pj_ioqueue_op_key_t send_op; +}; + + +static void tcp_on_read_complete(pj_ioqueue_key_t *key, + pj_ioqueue_op_key_t *op_key, + pj_ssize_t bytes_read); + +static pj_status_t tcp_sendto(pj_turn_transport *tp, + const void *packet, + pj_size_t size, + unsigned flag, + const pj_sockaddr_t *addr, + int addr_len); +static void tcp_destroy(struct tcp_transport *tcp); +static void tcp_add_ref(pj_turn_transport *tp, + pj_turn_allocation *alloc); +static void tcp_dec_ref(pj_turn_transport *tp, + pj_turn_allocation *alloc); +static void timer_callback(pj_timer_heap_t *timer_heap, + pj_timer_entry *entry); + +static void transport_create(pj_sock_t sock, pj_turn_listener *lis, + pj_sockaddr_t *src_addr, int src_addr_len) +{ + pj_pool_t *pool; + struct tcp_transport *tcp; + pj_ioqueue_callback cb; + pj_status_t status; + + pool = pj_pool_create(lis->server->core.pf, "tcp%p", 1000, 1000, NULL); + + tcp = PJ_POOL_ZALLOC_T(pool, struct tcp_transport); + tcp->base.obj_name = pool->obj_name; + tcp->base.listener = lis; + tcp->base.info = lis->info; + tcp->base.sendto = &tcp_sendto; + tcp->base.add_ref = &tcp_add_ref; + tcp->base.dec_ref = &tcp_dec_ref; + tcp->pool = pool; + tcp->sock = sock; + + pj_timer_entry_init(&tcp->timer, TIMER_NONE, tcp, &timer_callback); + + /* Register to ioqueue */ + pj_bzero(&cb, sizeof(cb)); + cb.on_read_complete = &tcp_on_read_complete; + status = pj_ioqueue_register_sock(pool, lis->server->core.ioqueue, sock, + tcp, &cb, &tcp->key); + if (status != PJ_SUCCESS) { + tcp_destroy(tcp); + return; + } + + /* Init pkt */ + tcp->recv_op.pkt.pool = pj_pool_create(lis->server->core.pf, "tcpkt%p", + 1000, 1000, NULL); + tcp->recv_op.pkt.transport = &tcp->base; + tcp->recv_op.pkt.src.tp_type = PJ_TURN_TP_TCP; + tcp->recv_op.pkt.src_addr_len = src_addr_len; + pj_memcpy(&tcp->recv_op.pkt.src.clt_addr, src_addr, src_addr_len); + + tcp_on_read_complete(tcp->key, &tcp->recv_op.op_key, -PJ_EPENDING); + /* Should not access transport from now, it may have been destroyed */ +} + + +static void tcp_destroy(struct tcp_transport *tcp) +{ + if (tcp->key) { + pj_ioqueue_unregister(tcp->key); + tcp->key = NULL; + tcp->sock = 0; + } else if (tcp->sock) { + pj_sock_close(tcp->sock); + tcp->sock = 0; + } + + if (tcp->pool) { + pj_pool_release(tcp->pool); + } +} + + +static void timer_callback(pj_timer_heap_t *timer_heap, + pj_timer_entry *entry) +{ + struct tcp_transport *tcp = (struct tcp_transport*) entry->user_data; + + PJ_UNUSED_ARG(timer_heap); + + tcp_destroy(tcp); +} + + +static void tcp_on_read_complete(pj_ioqueue_key_t *key, + pj_ioqueue_op_key_t *op_key, + pj_ssize_t bytes_read) +{ + struct tcp_transport *tcp; + struct recv_op *recv_op = (struct recv_op*) op_key; + pj_status_t status; + + tcp = (struct tcp_transport*) pj_ioqueue_get_user_data(key); + + do { + /* Report to server or allocation, if we have allocation */ + if (bytes_read > 0) { + + recv_op->pkt.len = bytes_read; + pj_gettimeofday(&recv_op->pkt.rx_time); + + tcp_add_ref(&tcp->base, NULL); + + if (tcp->alloc) { + pj_turn_allocation_on_rx_client_pkt(tcp->alloc, &recv_op->pkt); + } else { + pj_turn_srv_on_rx_pkt(tcp->base.listener->server, &recv_op->pkt); + } + + pj_assert(tcp->ref_cnt > 0); + tcp_dec_ref(&tcp->base, NULL); + + } else if (bytes_read != -PJ_EPENDING) { + /* TCP connection closed/error. Notify client and then destroy + * ourselves. + * Note: the -PJ_EPENDING is the value passed during init. + */ + ++tcp->ref_cnt; + + if (tcp->alloc) { + if (bytes_read != 0) { + show_err(tcp->base.obj_name, "TCP socket error", + -bytes_read); + } else { + PJ_LOG(5,(tcp->base.obj_name, "TCP socket closed")); + } + pj_turn_allocation_on_transport_closed(tcp->alloc, &tcp->base); + tcp->alloc = NULL; + } + + pj_assert(tcp->ref_cnt > 0); + if (--tcp->ref_cnt == 0) { + tcp_destroy(tcp); + return; + } + } + + /* Reset pool */ + pj_pool_reset(recv_op->pkt.pool); + + /* If packet is full discard it */ + if (recv_op->pkt.len == sizeof(recv_op->pkt.pkt)) { + PJ_LOG(4,(tcp->base.obj_name, "Buffer discarded")); + recv_op->pkt.len = 0; + } + + /* Read next packet */ + bytes_read = sizeof(recv_op->pkt.pkt) - recv_op->pkt.len; + status = pj_ioqueue_recv(tcp->key, op_key, + recv_op->pkt.pkt + recv_op->pkt.len, + &bytes_read, 0); + + if (status != PJ_EPENDING && status != PJ_SUCCESS) + bytes_read = -status; + + } while (status != PJ_EPENDING && status != PJ_ECANCELLED && + status != PJ_STATUS_FROM_OS(PJ_BLOCKING_ERROR_VAL)); + +} + + +static pj_status_t tcp_sendto(pj_turn_transport *tp, + const void *packet, + pj_size_t size, + unsigned flag, + const pj_sockaddr_t *addr, + int addr_len) +{ + struct tcp_transport *tcp = (struct tcp_transport*) tp; + pj_ssize_t length = size; + + PJ_UNUSED_ARG(addr); + PJ_UNUSED_ARG(addr_len); + + return pj_ioqueue_send(tcp->key, &tcp->send_op, packet, &length, flag); +} + + +static void tcp_add_ref(pj_turn_transport *tp, + pj_turn_allocation *alloc) +{ + struct tcp_transport *tcp = (struct tcp_transport*) tp; + + ++tcp->ref_cnt; + + if (tcp->alloc == NULL && alloc) { + tcp->alloc = alloc; + } + + /* Cancel shutdown timer if it's running */ + if (tcp->timer.id != TIMER_NONE) { + pj_timer_heap_cancel(tcp->base.listener->server->core.timer_heap, + &tcp->timer); + tcp->timer.id = TIMER_NONE; + } +} + + +static void tcp_dec_ref(pj_turn_transport *tp, + pj_turn_allocation *alloc) +{ + struct tcp_transport *tcp = (struct tcp_transport*) tp; + + --tcp->ref_cnt; + + if (alloc && alloc == tcp->alloc) { + tcp->alloc = NULL; + } + + if (tcp->ref_cnt == 0 && tcp->timer.id == TIMER_NONE) { + pj_time_val delay = { SHUTDOWN_DELAY, 0 }; + tcp->timer.id = TIMER_DESTROY; + pj_timer_heap_schedule(tcp->base.listener->server->core.timer_heap, + &tcp->timer, &delay); + } +} + |