/* $Id$ */ /* * Copyright (C) 2008-2011 Teluu Inc. (http://www.teluu.com) * Copyright (C) 2003-2008 Benny Prijono * * This program is free software; you can redistribute it and/or modify * it under the terms of the GNU General Public License as published by * the Free Software Foundation; either version 2 of the License, or * (at your option) any later version. * * This program is distributed in the hope that it will be useful, * but WITHOUT ANY WARRANTY; without even the implied warranty of * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the * GNU General Public License for more details. * * You should have received a copy of the GNU General Public License * along with this program; if not, write to the Free Software * Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA */ #include "turn.h" #include "auth.h" #define MAX_CLIENTS 32 #define MAX_PEERS_PER_CLIENT 8 //#define MAX_HANDLES (MAX_CLIENTS*MAX_PEERS_PER_CLIENT+MAX_LISTENERS) #define MAX_HANDLES PJ_IOQUEUE_MAX_HANDLES #define MAX_TIMER (MAX_HANDLES * 2) #define MIN_PORT 49152 #define MAX_PORT 65535 #define MAX_LISTENERS 16 #define MAX_THREADS 2 #define MAX_NET_EVENTS 1000 /* Prototypes */ static int server_thread_proc(void *arg); static pj_status_t on_tx_stun_msg( pj_stun_session *sess, void *token, const void *pkt, pj_size_t pkt_size, const pj_sockaddr_t *dst_addr, unsigned addr_len); static pj_status_t on_rx_stun_request(pj_stun_session *sess, const pj_uint8_t *pkt, unsigned pkt_len, const pj_stun_rx_data *rdata, void *user_data, const pj_sockaddr_t *src_addr, unsigned src_addr_len); struct saved_cred { pj_str_t realm; pj_str_t username; pj_str_t nonce; int data_type; pj_str_t data; }; /* * Get transport type name, normally for logging purpose only. */ PJ_DEF(const char*) pj_turn_tp_type_name(int tp_type) { /* Must be 3 characters long! */ if (tp_type == PJ_TURN_TP_UDP) { return "UDP"; } else if (tp_type == PJ_TURN_TP_TCP) { return "TCP"; } else { pj_assert(!"Unsupported transport"); return "???"; } } /* * Create server. */ PJ_DEF(pj_status_t) pj_turn_srv_create(pj_pool_factory *pf, pj_turn_srv **p_srv) { pj_pool_t *pool; pj_stun_session_cb sess_cb; pj_turn_srv *srv; unsigned i; pj_status_t status; PJ_ASSERT_RETURN(pf && p_srv, PJ_EINVAL); /* Create server and init core settings */ pool = pj_pool_create(pf, "srv%p", 1000, 1000, NULL); srv = PJ_POOL_ZALLOC_T(pool, pj_turn_srv); srv->obj_name = pool->obj_name; srv->core.pf = pf; srv->core.pool = pool; srv->core.tls_key = srv->core.tls_data = -1; /* Create ioqueue */ status = pj_ioqueue_create(pool, MAX_HANDLES, &srv->core.ioqueue); if (status != PJ_SUCCESS) goto on_error; /* Server mutex */ status = pj_lock_create_recursive_mutex(pool, srv->obj_name, &srv->core.lock); if (status != PJ_SUCCESS) goto on_error; /* Allocate TLS */ status = pj_thread_local_alloc(&srv->core.tls_key); if (status != PJ_SUCCESS) goto on_error; status = pj_thread_local_alloc(&srv->core.tls_data); if (status != PJ_SUCCESS) goto on_error; /* Create timer heap */ status = pj_timer_heap_create(pool, MAX_TIMER, &srv->core.timer_heap); if (status != PJ_SUCCESS) goto on_error; /* Configure lock for the timer heap */ pj_timer_heap_set_lock(srv->core.timer_heap, srv->core.lock, PJ_FALSE); /* Array of listeners */ srv->core.listener = (pj_turn_listener**) pj_pool_calloc(pool, MAX_LISTENERS, sizeof(srv->core.listener[0])); /* Create hash tables */ srv->tables.alloc = pj_hash_create(pool, MAX_CLIENTS); srv->tables.res = pj_hash_create(pool, MAX_CLIENTS); /* Init ports settings */ srv->ports.min_udp = srv->ports.next_udp = MIN_PORT; srv->ports.max_udp = MAX_PORT; srv->ports.min_tcp = srv->ports.next_tcp = MIN_PORT; srv->ports.max_tcp = MAX_PORT; /* Init STUN config */ pj_stun_config_init(&srv->core.stun_cfg, pf, 0, srv->core.ioqueue, srv->core.timer_heap); /* Init STUN credential */ srv->core.cred.type = PJ_STUN_AUTH_CRED_DYNAMIC; srv->core.cred.data.dyn_cred.user_data = srv; srv->core.cred.data.dyn_cred.get_auth = &pj_turn_get_auth; srv->core.cred.data.dyn_cred.get_password = &pj_turn_get_password; srv->core.cred.data.dyn_cred.verify_nonce = &pj_turn_verify_nonce; /* Create STUN session to handle new allocation */ pj_bzero(&sess_cb, sizeof(sess_cb)); sess_cb.on_rx_request = &on_rx_stun_request; sess_cb.on_send_msg = &on_tx_stun_msg; status = pj_stun_session_create(&srv->core.stun_cfg, srv->obj_name, &sess_cb, PJ_FALSE, NULL, &srv->core.stun_sess); if (status != PJ_SUCCESS) { goto on_error; } pj_stun_session_set_user_data(srv->core.stun_sess, srv); pj_stun_session_set_credential(srv->core.stun_sess, PJ_STUN_AUTH_LONG_TERM, &srv->core.cred); /* Array of worker threads */ srv->core.thread_cnt = MAX_THREADS; srv->core.thread = (pj_thread_t**) pj_pool_calloc(pool, srv->core.thread_cnt, sizeof(pj_thread_t*)); /* Start the worker threads */ for (i=0; icore.thread_cnt; ++i) { status = pj_thread_create(pool, srv->obj_name, &server_thread_proc, srv, 0, 0, &srv->core.thread[i]); if (status != PJ_SUCCESS) goto on_error; } /* We're done. Application should add listeners now */ PJ_LOG(4,(srv->obj_name, "TURN server v%s is running", pj_get_version())); *p_srv = srv; return PJ_SUCCESS; on_error: pj_turn_srv_destroy(srv); return status; } /* * Handle timer and network events */ static void srv_handle_events(pj_turn_srv *srv, const pj_time_val *max_timeout) { /* timeout is 'out' var. This just to make compiler happy. */ pj_time_val timeout = { 0, 0}; unsigned net_event_count = 0; int c; /* Poll the timer. The timer heap has its own mutex for better * granularity, so we don't need to lock the server. */ timeout.sec = timeout.msec = 0; c = pj_timer_heap_poll( srv->core.timer_heap, &timeout ); /* timer_heap_poll should never ever returns negative value, or otherwise * ioqueue_poll() will block forever! */ pj_assert(timeout.sec >= 0 && timeout.msec >= 0); if (timeout.msec >= 1000) timeout.msec = 999; /* If caller specifies maximum time to wait, then compare the value with * the timeout to wait from timer, and use the minimum value. */ if (max_timeout && PJ_TIME_VAL_GT(timeout, *max_timeout)) { timeout = *max_timeout; } /* Poll ioqueue. * Repeat polling the ioqueue while we have immediate events, because * timer heap may process more than one events, so if we only process * one network events at a time (such as when IOCP backend is used), * the ioqueue may have trouble keeping up with the request rate. * * For example, for each send() request, one network event will be * reported by ioqueue for the send() completion. If we don't poll * the ioqueue often enough, the send() completion will not be * reported in timely manner. */ do { c = pj_ioqueue_poll( srv->core.ioqueue, &timeout); if (c < 0) { pj_thread_sleep(PJ_TIME_VAL_MSEC(timeout)); return; } else if (c == 0) { break; } else { net_event_count += c; timeout.sec = timeout.msec = 0; } } while (c > 0 && net_event_count < MAX_NET_EVENTS); } /* * Server worker thread proc. */ static int server_thread_proc(void *arg) { pj_turn_srv *srv = (pj_turn_srv*)arg; while (!srv->core.quit) { pj_time_val timeout_max = {0, 100}; srv_handle_events(srv, &timeout_max); } return 0; } /* * Destroy the server. */ PJ_DEF(pj_status_t) pj_turn_srv_destroy(pj_turn_srv *srv) { pj_hash_iterator_t itbuf, *it; unsigned i; /* Stop all worker threads */ srv->core.quit = PJ_TRUE; for (i=0; icore.thread_cnt; ++i) { if (srv->core.thread[i]) { pj_thread_join(srv->core.thread[i]); pj_thread_destroy(srv->core.thread[i]); srv->core.thread[i] = NULL; } } /* Destroy all allocations FIRST */ if (srv->tables.alloc) { it = pj_hash_first(srv->tables.alloc, &itbuf); while (it != NULL) { pj_turn_allocation *alloc = (pj_turn_allocation*) pj_hash_this(srv->tables.alloc, it); pj_hash_iterator_t *next = pj_hash_next(srv->tables.alloc, it); pj_turn_allocation_destroy(alloc); it = next; } } /* Destroy all listeners. */ for (i=0; icore.lis_cnt; ++i) { if (srv->core.listener[i]) { pj_turn_listener_destroy(srv->core.listener[i]); srv->core.listener[i] = NULL; } } /* Destroy STUN session */ if (srv->core.stun_sess) { pj_stun_session_destroy(srv->core.stun_sess); srv->core.stun_sess = NULL; } /* Destroy hash tables (well, sort of) */ if (srv->tables.alloc) { srv->tables.alloc = NULL; srv->tables.res = NULL; } /* Destroy timer heap */ if (srv->core.timer_heap) { pj_timer_heap_destroy(srv->core.timer_heap); srv->core.timer_heap = NULL; } /* Destroy ioqueue */ if (srv->core.ioqueue) { pj_ioqueue_destroy(srv->core.ioqueue); srv->core.ioqueue = NULL; } /* Destroy thread local IDs */ if (srv->core.tls_key != -1) { pj_thread_local_free(srv->core.tls_key); srv->core.tls_key = -1; } if (srv->core.tls_data != -1) { pj_thread_local_free(srv->core.tls_data); srv->core.tls_data = -1; } /* Destroy server lock */ if (srv->core.lock) { pj_lock_destroy(srv->core.lock); srv->core.lock = NULL; } /* Release pool */ if (srv->core.pool) { pj_pool_t *pool = srv->core.pool; srv->core.pool = NULL; pj_pool_release(pool); } /* Done */ return PJ_SUCCESS; } /* * Add listener. */ PJ_DEF(pj_status_t) pj_turn_srv_add_listener(pj_turn_srv *srv, pj_turn_listener *lis) { unsigned index; PJ_ASSERT_RETURN(srv && lis, PJ_EINVAL); PJ_ASSERT_RETURN(srv->core.lis_cnt < MAX_LISTENERS, PJ_ETOOMANY); /* Add to array */ index = srv->core.lis_cnt; srv->core.listener[index] = lis; lis->server = srv; lis->id = index; srv->core.lis_cnt++; PJ_LOG(4,(srv->obj_name, "Listener %s/%s added at index %d", lis->obj_name, lis->info, lis->id)); return PJ_SUCCESS; } /* * Destroy listener. */ PJ_DEF(pj_status_t) pj_turn_listener_destroy(pj_turn_listener *listener) { pj_turn_srv *srv = listener->server; unsigned i; /* Remove from our listener list */ pj_lock_acquire(srv->core.lock); for (i=0; icore.lis_cnt; ++i) { if (srv->core.listener[i] == listener) { srv->core.listener[i] = NULL; srv->core.lis_cnt--; listener->id = PJ_TURN_INVALID_LIS_ID; break; } } pj_lock_release(srv->core.lock); /* Destroy */ return listener->destroy(listener); } /** * Add a reference to a transport. */ PJ_DEF(void) pj_turn_transport_add_ref( pj_turn_transport *transport, pj_turn_allocation *alloc) { transport->add_ref(transport, alloc); } /** * Decrement transport reference counter. */ PJ_DEF(void) pj_turn_transport_dec_ref( pj_turn_transport *transport, pj_turn_allocation *alloc) { transport->dec_ref(transport, alloc); } /* * Register an allocation to the hash tables. */ PJ_DEF(pj_status_t) pj_turn_srv_register_allocation(pj_turn_srv *srv, pj_turn_allocation *alloc) { /* Add to hash tables */ pj_lock_acquire(srv->core.lock); pj_hash_set(alloc->pool, srv->tables.alloc, &alloc->hkey, sizeof(alloc->hkey), 0, alloc); pj_hash_set(alloc->pool, srv->tables.res, &alloc->relay.hkey, sizeof(alloc->relay.hkey), 0, &alloc->relay); pj_lock_release(srv->core.lock); return PJ_SUCCESS; } /* * Unregister an allocation from the hash tables. */ PJ_DEF(pj_status_t) pj_turn_srv_unregister_allocation(pj_turn_srv *srv, pj_turn_allocation *alloc) { /* Unregister from hash tables */ pj_lock_acquire(srv->core.lock); pj_hash_set(alloc->pool, srv->tables.alloc, &alloc->hkey, sizeof(alloc->hkey), 0, NULL); pj_hash_set(alloc->pool, srv->tables.res, &alloc->relay.hkey, sizeof(alloc->relay.hkey), 0, NULL); pj_lock_release(srv->core.lock); return PJ_SUCCESS; } /* Callback from our own STUN session whenever it needs to send * outgoing STUN packet. */ static pj_status_t on_tx_stun_msg( pj_stun_session *sess, void *token, const void *pdu, pj_size_t pdu_size, const pj_sockaddr_t *dst_addr, unsigned addr_len) { pj_turn_transport *transport = (pj_turn_transport*) token; PJ_ASSERT_RETURN(transport!=NULL, PJ_EINVALIDOP); PJ_UNUSED_ARG(sess); return transport->sendto(transport, pdu, pdu_size, 0, dst_addr, addr_len); } /* Respond to STUN request */ static pj_status_t stun_respond(pj_stun_session *sess, pj_turn_transport *transport, const pj_stun_rx_data *rdata, unsigned code, const char *errmsg, pj_bool_t cache, const pj_sockaddr_t *dst_addr, unsigned addr_len) { pj_status_t status; pj_str_t reason; pj_stun_tx_data *tdata; /* Create response */ status = pj_stun_session_create_res(sess, rdata, code, (errmsg?pj_cstr(&reason,errmsg):NULL), &tdata); if (status != PJ_SUCCESS) return status; /* Send the response */ return pj_stun_session_send_msg(sess, transport, cache, PJ_FALSE, dst_addr, addr_len, tdata); } /* Callback from our own STUN session when incoming request arrives. * This function is triggered by pj_stun_session_on_rx_pkt() call in * pj_turn_srv_on_rx_pkt() function below. */ static pj_status_t on_rx_stun_request(pj_stun_session *sess, const pj_uint8_t *pdu, unsigned pdu_len, const pj_stun_rx_data *rdata, void *token, const pj_sockaddr_t *src_addr, unsigned src_addr_len) { pj_turn_transport *transport; const pj_stun_msg *msg = rdata->msg; pj_turn_srv *srv; pj_turn_allocation *alloc; pj_status_t status; PJ_UNUSED_ARG(pdu); PJ_UNUSED_ARG(pdu_len); transport = (pj_turn_transport*) token; srv = transport->listener->server; /* Respond any requests other than ALLOCATE with 437 response */ if (msg->hdr.type != PJ_STUN_ALLOCATE_REQUEST) { stun_respond(sess, transport, rdata, PJ_STUN_SC_ALLOCATION_MISMATCH, NULL, PJ_FALSE, src_addr, src_addr_len); return PJ_SUCCESS; } /* Create new allocation. The relay resource will be allocated * in this function. */ status = pj_turn_allocation_create(transport, src_addr, src_addr_len, rdata, sess, &alloc); if (status != PJ_SUCCESS) { /* STUN response has been sent, no need to reply here */ return PJ_SUCCESS; } /* Done. */ return PJ_SUCCESS; } /* Handle STUN Binding request */ static void handle_binding_request(pj_turn_pkt *pkt, unsigned options) { pj_stun_msg *request, *response; pj_uint8_t pdu[200]; pj_size_t len; pj_status_t status; /* Decode request */ status = pj_stun_msg_decode(pkt->pool, pkt->pkt, pkt->len, options, &request, NULL, NULL); if (status != PJ_SUCCESS) return; /* Create response */ status = pj_stun_msg_create_response(pkt->pool, request, 0, NULL, &response); if (status != PJ_SUCCESS) return; /* Add XOR-MAPPED-ADDRESS */ pj_stun_msg_add_sockaddr_attr(pkt->pool, response, PJ_STUN_ATTR_XOR_MAPPED_ADDR, PJ_TRUE, &pkt->src.clt_addr, pkt->src_addr_len); /* Encode */ status = pj_stun_msg_encode(response, pdu, sizeof(pdu), 0, NULL, &len); if (status != PJ_SUCCESS) return; /* Send response */ pkt->transport->sendto(pkt->transport, pdu, len, 0, &pkt->src.clt_addr, pkt->src_addr_len); } /* * This callback is called by UDP listener on incoming packet. This is * the first entry for incoming packet (from client) to the server. From * here, the packet may be handed over to an allocation if an allocation * is found for the client address, or handed over to owned STUN session * if an allocation is not found. */ PJ_DEF(void) pj_turn_srv_on_rx_pkt(pj_turn_srv *srv, pj_turn_pkt *pkt) { pj_turn_allocation *alloc; /* Get TURN allocation from the source address */ pj_lock_acquire(srv->core.lock); alloc = (pj_turn_allocation*) pj_hash_get(srv->tables.alloc, &pkt->src, sizeof(pkt->src), NULL); pj_lock_release(srv->core.lock); /* If allocation is found, just hand over the packet to the * allocation. */ if (alloc) { pj_turn_allocation_on_rx_client_pkt(alloc, pkt); } else { /* Otherwise this is a new client */ unsigned options; pj_size_t parsed_len; pj_status_t status; /* Check that this is a STUN message */ options = PJ_STUN_CHECK_PACKET | PJ_STUN_NO_FINGERPRINT_CHECK; if (pkt->transport->listener->tp_type == PJ_TURN_TP_UDP) options |= PJ_STUN_IS_DATAGRAM; status = pj_stun_msg_check(pkt->pkt, pkt->len, options); if (status != PJ_SUCCESS) { /* If the first byte are not STUN, drop the packet. First byte * of STUN message is always 0x00 or 0x01. Otherwise wait for * more data as the data might have come from TCP. * * Also drop packet if it's unreasonably too big, as this might * indicate invalid data that's building up in the buffer. * * Or if packet is a datagram. */ if ((*pkt->pkt != 0x00 && *pkt->pkt != 0x01) || pkt->len > 1600 || (options & PJ_STUN_IS_DATAGRAM)) { char errmsg[PJ_ERR_MSG_SIZE]; char ip[PJ_INET6_ADDRSTRLEN+10]; pkt->len = 0; pj_strerror(status, errmsg, sizeof(errmsg)); PJ_LOG(5,(srv->obj_name, "Non-STUN packet from %s is dropped: %s", pj_sockaddr_print(&pkt->src.clt_addr, ip, sizeof(ip), 3), errmsg)); } return; } /* Special handling for Binding Request. We won't give it to the * STUN session since this request is not authenticated. */ if (pkt->pkt[1] == 1) { handle_binding_request(pkt, options); return; } /* Hand over processing to STUN session. This will trigger * on_rx_stun_request() callback to be called if the STUN * message is a request. */ options &= ~PJ_STUN_CHECK_PACKET; parsed_len = 0; status = pj_stun_session_on_rx_pkt(srv->core.stun_sess, pkt->pkt, pkt->len, options, pkt->transport, &parsed_len, &pkt->src.clt_addr, pkt->src_addr_len); if (status != PJ_SUCCESS) { char errmsg[PJ_ERR_MSG_SIZE]; char ip[PJ_INET6_ADDRSTRLEN+10]; pj_strerror(status, errmsg, sizeof(errmsg)); PJ_LOG(5,(srv->obj_name, "Error processing STUN packet from %s: %s", pj_sockaddr_print(&pkt->src.clt_addr, ip, sizeof(ip), 3), errmsg)); } if (pkt->transport->listener->tp_type == PJ_TURN_TP_UDP) { pkt->len = 0; } else if (parsed_len > 0) { if (parsed_len == pkt->len) { pkt->len = 0; } else { pj_memmove(pkt->pkt, pkt->pkt+parsed_len, pkt->len - parsed_len); pkt->len -= parsed_len; } } } }