From d0b1cc04536aa71c104dc3b14ae0f6f4ffb36066 Mon Sep 17 00:00:00 2001 From: Benny Prijono Date: Wed, 9 Apr 2008 09:38:12 +0000 Subject: More ticket #485: huge changeset to support TURN TCP. Please see ticket #485 for the details git-svn-id: http://svn.pjsip.org/repos/pjproject/trunk@1913 74dad513-b988-da41-8d7b-12977e46ad98 --- pjnath/src/pjturn-srv/allocation.c | 157 +++++++++++++++++++++++-------------- 1 file changed, 99 insertions(+), 58 deletions(-) (limited to 'pjnath/src/pjturn-srv/allocation.c') diff --git a/pjnath/src/pjturn-srv/allocation.c b/pjnath/src/pjturn-srv/allocation.c index 339b2924..b552bc45 100644 --- a/pjnath/src/pjturn-srv/allocation.c +++ b/pjnath/src/pjturn-srv/allocation.c @@ -65,6 +65,7 @@ static void on_rx_from_peer(pj_ioqueue_key_t *key, pj_ioqueue_op_key_t *op_key, pj_ssize_t bytes_read); static pj_status_t stun_on_send_msg(pj_stun_session *sess, + void *token, const void *pkt, pj_size_t pkt_size, const pj_sockaddr_t *dst_addr, @@ -73,12 +74,14 @@ static pj_status_t stun_on_rx_request(pj_stun_session *sess, const pj_uint8_t *pkt, unsigned pkt_len, const pj_stun_rx_data *rdata, + void *token, const pj_sockaddr_t *src_addr, unsigned src_addr_len); static pj_status_t stun_on_rx_indication(pj_stun_session *sess, const pj_uint8_t *pkt, unsigned pkt_len, const pj_stun_msg *msg, + void *token, const pj_sockaddr_t *src_addr, unsigned src_addr_len); @@ -123,7 +126,7 @@ static pj_status_t parse_allocate_req(alloc_request *cfg, if (cfg->bandwidth > MAX_CLIENT_BANDWIDTH) { pj_stun_session_respond(sess, rdata, PJ_STUN_SC_ALLOCATION_QUOTA_REACHED, - "Invalid bandwidth", PJ_TRUE, + "Invalid bandwidth", NULL, PJ_TRUE, src_addr, src_addr_len); return PJ_STATUS_FROM_STUN_CODE(PJ_STUN_SC_ALLOCATION_QUOTA_REACHED); } @@ -134,7 +137,7 @@ static pj_status_t parse_allocate_req(alloc_request *cfg, if (attr_req_tp == NULL) { pj_stun_session_respond(sess, rdata, PJ_STUN_SC_BAD_REQUEST, "Missing REQUESTED-TRANSPORT attribute", - PJ_TRUE, src_addr, src_addr_len); + NULL, PJ_TRUE, src_addr, src_addr_len); return PJ_STATUS_FROM_STUN_CODE(PJ_STUN_SC_BAD_REQUEST); } @@ -143,7 +146,7 @@ static pj_status_t parse_allocate_req(alloc_request *cfg, /* Can only support UDP for now */ if (cfg->tp_type != PJ_TURN_TP_UDP) { pj_stun_session_respond(sess, rdata, PJ_STUN_SC_UNSUPP_TRANSPORT_PROTO, - NULL, PJ_TRUE, src_addr, src_addr_len); + NULL, NULL, PJ_TRUE, src_addr, src_addr_len); return PJ_STATUS_FROM_STUN_CODE(PJ_STUN_SC_UNSUPP_TRANSPORT_PROTO); } @@ -155,8 +158,8 @@ static pj_status_t parse_allocate_req(alloc_request *cfg, /* We don't support RESERVATION-TOKEN for now */ pj_stun_session_respond(sess, rdata, PJ_STUN_SC_BAD_REQUEST, - "RESERVATION-TOKEN is not supported", PJ_TRUE, - src_addr, src_addr_len); + "RESERVATION-TOKEN is not supported", NULL, + PJ_TRUE, src_addr, src_addr_len); return PJ_STATUS_FROM_STUN_CODE(PJ_STUN_SC_BAD_REQUEST); } @@ -167,8 +170,8 @@ static pj_status_t parse_allocate_req(alloc_request *cfg, /* We don't support REQUESTED-PROPS for now */ pj_stun_session_respond(sess, rdata, PJ_STUN_SC_BAD_REQUEST, - "REQUESTED-PROPS is not supported", PJ_TRUE, - src_addr, src_addr_len); + "REQUESTED-PROPS is not supported", + NULL, PJ_TRUE, src_addr, src_addr_len); return PJ_STATUS_FROM_STUN_CODE(PJ_STUN_SC_BAD_REQUEST); } @@ -179,8 +182,8 @@ static pj_status_t parse_allocate_req(alloc_request *cfg, cfg->lifetime = attr_lifetime->value; if (cfg->lifetime < MIN_LIFETIME) { pj_stun_session_respond(sess, rdata, PJ_STUN_SC_BAD_REQUEST, - "LIFETIME too short", PJ_TRUE, - src_addr, src_addr_len); + "LIFETIME too short", NULL, + PJ_TRUE, src_addr, src_addr_len); return PJ_STATUS_FROM_STUN_CODE(PJ_STUN_SC_BAD_REQUEST); } if (cfg->lifetime > MAX_LIFETIME) @@ -196,6 +199,7 @@ static pj_status_t parse_allocate_req(alloc_request *cfg, /* Respond to ALLOCATE request */ static pj_status_t send_allocate_response(pj_turn_allocation *alloc, pj_stun_session *srv_sess, + pj_turn_transport *transport, const pj_stun_rx_data *rdata) { pj_stun_tx_data *tdata; @@ -232,8 +236,8 @@ static pj_status_t send_allocate_response(pj_turn_allocation *alloc, pj_sockaddr_get_len(&alloc->hkey.clt_addr)); /* Send the response */ - return pj_stun_session_send_msg(srv_sess, PJ_TRUE, - &alloc->hkey.clt_addr, + return pj_stun_session_send_msg(srv_sess, transport, PJ_TRUE, + PJ_FALSE, &alloc->hkey.clt_addr, pj_sockaddr_get_len(&alloc->hkey.clt_addr), tdata); } @@ -283,14 +287,14 @@ static pj_status_t init_cred(pj_turn_allocation *alloc, const pj_stun_msg *req) /* * Create new allocation. */ -PJ_DEF(pj_status_t) pj_turn_allocation_create(pj_turn_listener *listener, +PJ_DEF(pj_status_t) pj_turn_allocation_create(pj_turn_transport *transport, const pj_sockaddr_t *src_addr, unsigned src_addr_len, const pj_stun_rx_data *rdata, pj_stun_session *srv_sess, pj_turn_allocation **p_alloc) { - pj_turn_srv *srv = listener->server; + pj_turn_srv *srv = transport->listener->server; const pj_stun_msg *msg = rdata->msg; pj_pool_t *pool; alloc_request req; @@ -310,13 +314,16 @@ PJ_DEF(pj_status_t) pj_turn_allocation_create(pj_turn_listener *listener, alloc = PJ_POOL_ZALLOC_T(pool, pj_turn_allocation); alloc->pool = pool; alloc->obj_name = pool->obj_name; - alloc->listener = listener; - alloc->clt_sock = PJ_INVALID_SOCKET; alloc->relay.tp.sock = PJ_INVALID_SOCKET; + alloc->server = transport->listener->server; alloc->bandwidth = req.bandwidth; - alloc->hkey.tp_type = listener->tp_type; + /* Set transport */ + alloc->transport = transport; + pj_turn_transport_add_ref(transport, alloc); + + alloc->hkey.tp_type = transport->listener->tp_type; pj_memcpy(&alloc->hkey.clt_addr, src_addr, src_addr_len); status = pj_lock_create_recursive_mutex(pool, alloc->obj_name, @@ -332,7 +339,8 @@ PJ_DEF(pj_status_t) pj_turn_allocation_create(pj_turn_listener *listener, alloc->ch_table = pj_hash_create(pool, PEER_TABLE_SIZE); /* Print info */ - pj_ansi_strcpy(alloc->info, pj_turn_tp_type_name(listener->tp_type)); + pj_ansi_strcpy(alloc->info, + pj_turn_tp_type_name(transport->listener->tp_type)); alloc->info[3] = ':'; pj_sockaddr_print(src_addr, alloc->info+4, sizeof(alloc->info)-4, 3); @@ -370,7 +378,7 @@ PJ_DEF(pj_status_t) pj_turn_allocation_create(pj_turn_listener *listener, pj_turn_srv_register_allocation(srv, alloc); /* Respond to ALLOCATE request */ - status = send_allocate_response(alloc, srv_sess, rdata); + status = send_allocate_response(alloc, srv_sess, transport, rdata); if (status != PJ_SUCCESS) goto on_error; @@ -388,7 +396,7 @@ on_error: /* Send reply to the ALLOCATE request */ pj_strerror(status, str_tmp, sizeof(str_tmp)); pj_stun_session_respond(srv_sess, rdata, PJ_STUN_SC_BAD_REQUEST, str_tmp, - PJ_TRUE, src_addr, src_addr_len); + transport, PJ_TRUE, src_addr, src_addr_len); /* Cleanup */ destroy_allocation(alloc); @@ -400,7 +408,7 @@ on_error: static void destroy_relay(pj_turn_relay_res *relay) { if (relay->timer.id) { - pj_timer_heap_cancel(relay->allocation->listener->server->core.timer_heap, + pj_timer_heap_cancel(relay->allocation->server->core.timer_heap, &relay->timer); relay->timer.id = PJ_FALSE; } @@ -427,7 +435,7 @@ static void destroy_allocation(pj_turn_allocation *alloc) pj_pool_t *pool; /* Unregister this allocation */ - pj_turn_srv_unregister_allocation(alloc->listener->server, alloc); + pj_turn_srv_unregister_allocation(alloc->server, alloc); /* Destroy relay */ destroy_relay(&alloc->relay); @@ -437,6 +445,12 @@ static void destroy_allocation(pj_turn_allocation *alloc) pj_lock_acquire(alloc->lock); } + /* Unreference transport */ + if (alloc->transport) { + pj_turn_transport_dec_ref(alloc->transport, alloc); + alloc->transport = NULL; + } + /* Destroy STUN session */ if (alloc->sess) { pj_stun_session_destroy(alloc->sess); @@ -465,6 +479,20 @@ PJ_DECL(void) pj_turn_allocation_destroy(pj_turn_allocation *alloc) } +/* + * Handle transport closure. + */ +PJ_DEF(void) pj_turn_allocation_on_transport_closed( pj_turn_allocation *alloc, + pj_turn_transport *tp) +{ + PJ_LOG(5,(alloc->obj_name, "Transport %s unexpectedly closed, destroying " + "allocation %s", tp->info, alloc->info)); + pj_turn_transport_dec_ref(tp, alloc); + alloc->transport = NULL; + destroy_allocation(alloc); +} + + /* Initiate shutdown sequence for this allocation and start destroy timer. * Once allocation is marked as shutting down, any packets will be * rejected/discarded @@ -476,7 +504,7 @@ static void alloc_shutdown(pj_turn_allocation *alloc) /* Work with existing schedule */ if (alloc->relay.timer.id == TIMER_ID_TIMEOUT) { /* Cancel existing shutdown timer */ - pj_timer_heap_cancel(alloc->listener->server->core.timer_heap, + pj_timer_heap_cancel(alloc->server->core.timer_heap, &alloc->relay.timer); alloc->relay.timer.id = TIMER_ID_NONE; @@ -498,7 +526,7 @@ static void alloc_shutdown(pj_turn_allocation *alloc) /* Schedule destroy timer */ alloc->relay.timer.id = TIMER_ID_DESTROY; - pj_timer_heap_schedule(alloc->listener->server->core.timer_heap, + pj_timer_heap_schedule(alloc->server->core.timer_heap, &alloc->relay.timer, &destroy_delay); } @@ -514,7 +542,7 @@ static pj_status_t resched_timeout(pj_turn_allocation *alloc) pj_assert(alloc->relay.timer.id != TIMER_ID_DESTROY); if (alloc->relay.timer.id != 0) { - pj_timer_heap_cancel(alloc->listener->server->core.timer_heap, + pj_timer_heap_cancel(alloc->server->core.timer_heap, &alloc->relay.timer); alloc->relay.timer.id = TIMER_ID_NONE; } @@ -523,7 +551,7 @@ static pj_status_t resched_timeout(pj_turn_allocation *alloc) delay.msec = 0; alloc->relay.timer.id = TIMER_ID_TIMEOUT; - status = pj_timer_heap_schedule(alloc->listener->server->core.timer_heap, + status = pj_timer_heap_schedule(alloc->server->core.timer_heap, &alloc->relay.timer, &delay); if (status != PJ_SUCCESS) { alloc->relay.timer.id = TIMER_ID_NONE; @@ -589,7 +617,7 @@ static pj_status_t create_relay(pj_turn_srv *srv, relay->tp.sock = PJ_INVALID_SOCKET; /* TODO: get the requested address family from somewhere */ - af = alloc->listener->addr.addr.sa_family; + af = alloc->transport->listener->addr.addr.sa_family; /* Save realm */ sa = (pj_stun_string_attr*) @@ -686,7 +714,8 @@ static pj_status_t create_relay(pj_turn_srv *srv, return status; } if (!pj_sockaddr_has_addr(&relay->hkey.addr)) { - pj_sockaddr_copy_addr(&relay->hkey.addr, &alloc->listener->addr); + pj_sockaddr_copy_addr(&relay->hkey.addr, + &alloc->transport->listener->addr); } if (!pj_sockaddr_has_addr(&relay->hkey.addr)) { pj_sockaddr tmp_addr; @@ -724,8 +753,8 @@ static void send_reply_err(pj_turn_allocation *alloc, { pj_status_t status; - status = pj_stun_session_respond(alloc->sess, rdata, code, errmsg, cache, - &alloc->hkey.clt_addr, + status = pj_stun_session_respond(alloc->sess, rdata, code, errmsg, NULL, + cache, &alloc->hkey.clt_addr, pj_sockaddr_get_len(&alloc->hkey.clt_addr.addr)); if (status != PJ_SUCCESS) { alloc_err(alloc, "Error sending STUN error response", status); @@ -769,8 +798,8 @@ static void send_reply_ok(pj_turn_allocation *alloc, } } - status = pj_stun_session_send_msg(alloc->sess, PJ_TRUE, - &alloc->hkey.clt_addr, + status = pj_stun_session_send_msg(alloc->sess, NULL, PJ_TRUE, + PJ_FALSE, &alloc->hkey.clt_addr, pj_sockaddr_get_len(&alloc->hkey.clt_addr), tdata); if (status != PJ_SUCCESS) { @@ -790,13 +819,6 @@ static pj_turn_permission *create_permission(pj_turn_allocation *alloc, perm = PJ_POOL_ZALLOC_T(alloc->pool, pj_turn_permission); pj_memcpy(&perm->hkey.peer_addr, peer_addr, addr_len); - if (alloc->listener->tp_type == PJ_TURN_TP_UDP) { - perm->sock = alloc->listener->sock; - } else { - pj_assert(!"TCP is not supported yet"); - return NULL; - } - perm->allocation = alloc; perm->channel = PJ_TURN_INVALID_CHANNEL; @@ -900,13 +922,28 @@ PJ_DEF(void) pj_turn_allocation_on_rx_client_pkt(pj_turn_allocation *alloc, * callbacks. */ unsigned options = PJ_STUN_CHECK_PACKET; - if (pkt->listener->tp_type == PJ_TURN_TP_UDP) + unsigned parsed_len = 0; + + if (pkt->transport->listener->tp_type == PJ_TURN_TP_UDP) options |= PJ_STUN_IS_DATAGRAM; status = pj_stun_session_on_rx_pkt(alloc->sess, pkt->pkt, pkt->len, - options, NULL, + options, NULL, &parsed_len, &pkt->src.clt_addr, pkt->src_addr_len); + + if (pkt->transport->listener->tp_type == PJ_TURN_TP_UDP) { + pkt->len = 0; + } else if (parsed_len > 0) { + if (parsed_len == pkt->len) { + pkt->len = 0; + } else { + pj_memmove(pkt->pkt, pkt->pkt+parsed_len, + pkt->len - parsed_len); + pkt->len -= parsed_len; + } + } + if (status != PJ_SUCCESS) { alloc_err(alloc, "Error handling STUN packet", status); goto on_return; @@ -923,7 +960,7 @@ PJ_DEF(void) pj_turn_allocation_on_rx_client_pkt(pj_turn_allocation *alloc, pj_assert(sizeof(*cd)==4); /* For UDP check the packet length */ - if (alloc->listener->tp_type == PJ_TURN_TP_UDP) { + if (alloc->transport->listener->tp_type == PJ_TURN_TP_UDP) { if (pkt->len < pj_ntohs(cd->length)+sizeof(*cd)) { PJ_LOG(4,(alloc->obj_name, "ChannelData from %s discarded: UDP size error", @@ -1003,10 +1040,10 @@ static void handle_peer_pkt(pj_turn_allocation *alloc, pj_memcpy(rel->tp.tx_pkt+sizeof(pj_turn_channel_data), pkt, len); /* Send to client */ - pj_turn_listener_sendto(alloc->listener, rel->tp.tx_pkt, - len+sizeof(pj_turn_channel_data), 0, - &alloc->hkey.clt_addr, - pj_sockaddr_get_len(&alloc->hkey.clt_addr)); + alloc->transport->sendto(alloc->transport, rel->tp.tx_pkt, + len+sizeof(pj_turn_channel_data), 0, + &alloc->hkey.clt_addr, + pj_sockaddr_get_len(&alloc->hkey.clt_addr)); } else { /* Send Data Indication */ pj_stun_tx_data *tdata; @@ -1026,8 +1063,8 @@ static void handle_peer_pkt(pj_turn_allocation *alloc, PJ_STUN_ATTR_DATA, (const pj_uint8_t*)pkt, len); - pj_stun_session_send_msg(alloc->sess, PJ_FALSE, - &alloc->hkey.clt_addr, + pj_stun_session_send_msg(alloc->sess, NULL, PJ_FALSE, + PJ_FALSE, &alloc->hkey.clt_addr, pj_sockaddr_get_len(&alloc->hkey.clt_addr), tdata); } @@ -1076,6 +1113,7 @@ static void on_rx_from_peer(pj_ioqueue_key_t *key, * a STUN message towards the client. */ static pj_status_t stun_on_send_msg(pj_stun_session *sess, + void *token, const void *pkt, pj_size_t pkt_size, const pj_sockaddr_t *dst_addr, @@ -1083,10 +1121,12 @@ static pj_status_t stun_on_send_msg(pj_stun_session *sess, { pj_turn_allocation *alloc; + PJ_UNUSED_ARG(token); + alloc = (pj_turn_allocation*) pj_stun_session_get_user_data(sess); - return pj_turn_listener_sendto(alloc->listener, pkt, pkt_size, 0, - dst_addr, addr_len); + return alloc->transport->sendto(alloc->transport, pkt, pkt_size, 0, + dst_addr, addr_len); } /* @@ -1098,6 +1138,7 @@ static pj_status_t stun_on_rx_request(pj_stun_session *sess, const pj_uint8_t *pkt, unsigned pkt_len, const pj_stun_rx_data *rdata, + void *token, const pj_sockaddr_t *src_addr, unsigned src_addr_len) { @@ -1106,6 +1147,7 @@ static pj_status_t stun_on_rx_request(pj_stun_session *sess, PJ_UNUSED_ARG(pkt); PJ_UNUSED_ARG(pkt_len); + PJ_UNUSED_ARG(token); PJ_UNUSED_ARG(src_addr); PJ_UNUSED_ARG(src_addr_len); @@ -1274,6 +1316,7 @@ static pj_status_t stun_on_rx_indication(pj_stun_session *sess, const pj_uint8_t *pkt, unsigned pkt_len, const pj_stun_msg *msg, + void *token, const pj_sockaddr_t *src_addr, unsigned src_addr_len) { @@ -1281,9 +1324,11 @@ static pj_status_t stun_on_rx_indication(pj_stun_session *sess, pj_stun_data_attr *data_attr; pj_turn_allocation *alloc; pj_turn_permission *perm; + pj_ssize_t len; PJ_UNUSED_ARG(pkt); PJ_UNUSED_ARG(pkt_len); + PJ_UNUSED_ARG(token); PJ_UNUSED_ARG(src_addr); PJ_UNUSED_ARG(src_addr_len); @@ -1320,15 +1365,11 @@ static pj_status_t stun_on_rx_indication(pj_stun_session *sess, if (data_attr == NULL) return PJ_SUCCESS; - /* Relay the data to client */ - if (alloc->hkey.tp_type == PJ_TURN_TP_UDP) { - pj_ssize_t len = data_attr->length; - pj_sock_sendto(alloc->listener->sock, data_attr->data, - &len, 0, &peer_attr->sockaddr, - pj_sockaddr_get_len(&peer_attr->sockaddr)); - } else { - pj_assert(!"TCP is not supported"); - } + /* Relay the data to peer */ + len = data_attr->length; + pj_sock_sendto(alloc->relay.tp.sock, data_attr->data, + &len, 0, &peer_attr->sockaddr, + pj_sockaddr_get_len(&peer_attr->sockaddr)); return PJ_SUCCESS; } -- cgit v1.2.3