diff options
Diffstat (limited to 'pjnath/src/pjstun-srv-test')
-rw-r--r-- | pjnath/src/pjstun-srv-test/bind_usage.c | 206 | ||||
-rw-r--r-- | pjnath/src/pjstun-srv-test/main.c | 146 | ||||
-rw-r--r-- | pjnath/src/pjstun-srv-test/server.c | 185 | ||||
-rw-r--r-- | pjnath/src/pjstun-srv-test/server.h | 135 | ||||
-rw-r--r-- | pjnath/src/pjstun-srv-test/turn_usage.c | 1408 | ||||
-rw-r--r-- | pjnath/src/pjstun-srv-test/usage.c | 271 |
6 files changed, 2351 insertions, 0 deletions
diff --git a/pjnath/src/pjstun-srv-test/bind_usage.c b/pjnath/src/pjstun-srv-test/bind_usage.c new file mode 100644 index 00000000..fc10fb91 --- /dev/null +++ b/pjnath/src/pjstun-srv-test/bind_usage.c @@ -0,0 +1,206 @@ +/* $Id$ */ +/* + * Copyright (C) 2003-2005 Benny Prijono <benny@prijono.org> + * + * This program is free software; you can redistribute it and/or modify + * it under the terms of the GNU General Public License as published by + * the Free Software Foundation; either version 2 of the License, or + * (at your option) any later version. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU General Public License for more details. + * + * You should have received a copy of the GNU General Public License + * along with this program; if not, write to the Free Software + * Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA + */ +#include "server.h" + +#define THIS_FILE "bind_usage.c" + +static void usage_on_rx_data(pj_stun_usage *usage, + void *pkt, + pj_size_t pkt_size, + const pj_sockaddr_t *src_addr, + unsigned src_addr_len); +static void usage_on_destroy(pj_stun_usage *usage); +static pj_status_t sess_on_send_msg(pj_stun_session *sess, + const void *pkt, + pj_size_t pkt_size, + const pj_sockaddr_t *dst_addr, + unsigned addr_len); +static pj_status_t sess_on_rx_request(pj_stun_session *sess, + const pj_uint8_t *pkt, + unsigned pkt_len, + const pj_stun_msg *msg, + const pj_sockaddr_t *src_addr, + unsigned src_addr_len); + +struct bind_usage +{ + pj_pool_t *pool; + pj_stun_usage *usage; + pj_stun_session *session; +}; + + +PJ_DEF(pj_status_t) pj_stun_bind_usage_create(pj_stun_server *srv, + const pj_str_t *ip_addr, + unsigned port, + pj_stun_usage **p_bu) +{ + pj_pool_t *pool; + struct bind_usage *bu; + pj_stun_server_info *si; + pj_stun_usage_cb usage_cb; + pj_stun_session_cb sess_cb; + pj_sockaddr_in local_addr; + pj_status_t status; + + si = pj_stun_server_get_info(srv); + + pool = pj_pool_create(si->pf, "bind%p", 128, 128, NULL); + bu = PJ_POOL_ZALLOC_T(pool, struct bind_usage); + bu->pool = pool; + + status = pj_sockaddr_in_init(&local_addr, ip_addr, (pj_uint16_t)port); + if (status != PJ_SUCCESS) + return status; + + pj_bzero(&usage_cb, sizeof(usage_cb)); + usage_cb.on_rx_data = &usage_on_rx_data; + usage_cb.on_destroy = &usage_on_destroy; + + status = pj_stun_usage_create(srv, "bind%p", &usage_cb, + PJ_AF_INET, PJ_SOCK_DGRAM, 0, + &local_addr, sizeof(local_addr), + &bu->usage); + if (status != PJ_SUCCESS) { + pj_pool_release(pool); + return status; + } + + pj_bzero(&sess_cb, sizeof(sess_cb)); + sess_cb.on_send_msg = &sess_on_send_msg; + sess_cb.on_rx_request = &sess_on_rx_request; + status = pj_stun_session_create(si->endpt, "bind%p", &sess_cb, PJ_FALSE, + &bu->session); + if (status != PJ_SUCCESS) { + pj_stun_usage_destroy(bu->usage); + return status; + } + + pj_stun_usage_set_user_data(bu->usage, bu); + pj_stun_session_set_user_data(bu->session, bu); + + if (p_bu) + *p_bu = bu->usage; + + return PJ_SUCCESS; +} + + +static void usage_on_rx_data(pj_stun_usage *usage, + void *pkt, + pj_size_t pkt_size, + const pj_sockaddr_t *src_addr, + unsigned src_addr_len) +{ + struct bind_usage *bu; + pj_stun_session *session; + pj_status_t status; + + bu = (struct bind_usage*) pj_stun_usage_get_user_data(usage); + session = bu->session; + + /* Handle packet to session */ + status = pj_stun_session_on_rx_pkt(session, (pj_uint8_t*)pkt, pkt_size, + PJ_STUN_IS_DATAGRAM | PJ_STUN_CHECK_PACKET, + NULL, src_addr, src_addr_len); + if (status != PJ_SUCCESS) { + pj_stun_perror(THIS_FILE, "Error handling incoming packet", status); + return; + } +} + + +static pj_status_t sess_on_send_msg(pj_stun_session *sess, + const void *pkt, + pj_size_t pkt_size, + const pj_sockaddr_t *dst_addr, + unsigned addr_len) +{ + struct bind_usage *bu; + pj_stun_usage *usage; + + bu = (struct bind_usage*) pj_stun_session_get_user_data(sess); + usage = bu->usage; + + return pj_stun_usage_sendto(usage, pkt, pkt_size, 0, + dst_addr, addr_len); +} + + +static pj_status_t sess_on_rx_request(pj_stun_session *sess, + const pj_uint8_t *pkt, + unsigned pkt_len, + const pj_stun_msg *msg, + const pj_sockaddr_t *src_addr, + unsigned src_addr_len) +{ + pj_stun_tx_data *tdata; + pj_status_t status; + + PJ_UNUSED_ARG(pkt); + PJ_UNUSED_ARG(pkt_len); + + /* Create response */ + status = pj_stun_session_create_response(sess, msg, 0, NULL, &tdata); + if (status != PJ_SUCCESS) + return status; + + /* Create MAPPED-ADDRESS attribute */ + status = pj_stun_msg_add_sockaddr_attr(tdata->pool, tdata->msg, + PJ_STUN_ATTR_MAPPED_ADDR, + PJ_FALSE, + src_addr, src_addr_len); + if (status != PJ_SUCCESS) { + pj_stun_perror(THIS_FILE, "Error creating response", status); + pj_stun_msg_destroy_tdata(sess, tdata); + return status; + } + + /* On the presence of magic, create XOR-MAPPED-ADDRESS attribute */ + if (msg->hdr.magic == PJ_STUN_MAGIC) { + status = + pj_stun_msg_add_sockaddr_attr(tdata->pool, tdata->msg, + PJ_STUN_ATTR_XOR_MAPPED_ADDR, + PJ_TRUE, + src_addr, src_addr_len); + if (status != PJ_SUCCESS) { + pj_stun_perror(THIS_FILE, "Error creating response", status); + pj_stun_msg_destroy_tdata(sess, tdata); + return status; + } + } + + /* Send */ + status = pj_stun_session_send_msg(sess, PJ_TRUE, + src_addr, src_addr_len, tdata); + return status; + +} + +static void usage_on_destroy(pj_stun_usage *usage) +{ + struct bind_usage *bu; + + bu = (struct bind_usage*) pj_stun_usage_get_user_data(usage); + if (bu==NULL) + return; + + pj_stun_session_destroy(bu->session); + pj_pool_release(bu->pool); +} diff --git a/pjnath/src/pjstun-srv-test/main.c b/pjnath/src/pjstun-srv-test/main.c new file mode 100644 index 00000000..c462d47e --- /dev/null +++ b/pjnath/src/pjstun-srv-test/main.c @@ -0,0 +1,146 @@ +/* $Id$ */ +/* + * Copyright (C) 2003-2005 Benny Prijono <benny@prijono.org> + * + * This program is free software; you can redistribute it and/or modify + * it under the terms of the GNU General Public License as published by + * the Free Software Foundation; either version 2 of the License, or + * (at your option) any later version. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU General Public License for more details. + * + * You should have received a copy of the GNU General Public License + * along with this program; if not, write to the Free Software + * Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA + */ +#include "server.h" + +#define THIS_FILE "main.c" + +struct options +{ + char *realm; + char *user_name; + char *password; + char *nonce; + pj_bool_t use_fingerprint; +} o; + +static void usage(void) +{ + puts("Usage: pjstun_srv_test [OPTIONS]"); + puts(""); + puts("where OPTIONS:"); + puts(" --realm, -r Set realm of the credential"); + puts(" --username, -u Set username of the credential"); + puts(" --password, -p Set password of the credential"); + puts(" --nonce, -N Set NONCE"); + puts(" --fingerprint, -F Use fingerprint for outgoing requests"); + puts(" --help, -h"); +} + + +static void server_main(pj_stun_server *srv) +{ + int quit = 0; + + while (!quit) { + char line[10]; + + printf("Menu:\n" + " d Dump status\n" + " q Quit\n" + "Choice:"); + + fgets(line, sizeof(line), stdin); + if (line[0] == 'q') { + quit = 1; + } else if (line[0] == 'd') { + pj_stun_server_info *si = pj_stun_server_get_info(srv); + pj_pool_factory_dump(si->pf, PJ_TRUE); + } + } +} + +int main(int argc, char *argv[]) +{ + struct pj_getopt_option long_options[] = { + { "realm", 1, 0, 'r'}, + { "username", 1, 0, 'u'}, + { "password", 1, 0, 'p'}, + { "nonce", 1, 0, 'N'}, + { "fingerprint",0, 0, 'F'}, + { "help", 0, 0, 'h'} + }; + int c, opt_id; + pj_caching_pool cp; + pj_stun_server *srv; + pj_status_t status; + + while((c=pj_getopt_long(argc,argv, "r:u:p:hF", long_options, &opt_id))!=-1) { + switch (c) { + case 'r': + o.realm = pj_optarg; + break; + case 'u': + o.user_name = pj_optarg; + break; + case 'p': + o.password = pj_optarg; + break; + case 'N': + o.nonce = pj_optarg; + break; + case 'h': + usage(); + return 0; + case 'F': + o.use_fingerprint = PJ_TRUE; + break; + default: + printf("Argument \"%s\" is not valid. Use -h to see help", + argv[pj_optind]); + return 1; + } + } + + if (pj_optind != argc) { + puts("Error: invalid arguments"); + return 1; + } + + pj_init(); + pjlib_util_init(); + pj_caching_pool_init(&cp, &pj_pool_factory_default_policy, 0); + + status = pj_stun_server_create(&cp.factory, 1, &srv); + if (status != PJ_SUCCESS) { + pj_stun_perror(THIS_FILE, "Unable to create server", status); + return 1; + } + + /* + status = pj_stun_bind_usage_create(srv, NULL, 3478, NULL); + if (status != PJ_SUCCESS) { + pj_stun_perror(THIS_FILE, "Unable to create bind usage", status); + return 1; + } + */ + + status = pj_stun_turn_usage_create(srv, PJ_SOCK_DGRAM, NULL, + 3478, NULL); + if (status != PJ_SUCCESS) { + pj_stun_perror(THIS_FILE, "Unable to create bind usage", status); + return 1; + } + + server_main(srv); + + pj_stun_server_destroy(srv); + pj_pool_factory_dump(&cp.factory, PJ_TRUE); + pj_shutdown(); + return 0; +} diff --git a/pjnath/src/pjstun-srv-test/server.c b/pjnath/src/pjstun-srv-test/server.c new file mode 100644 index 00000000..5fdb233e --- /dev/null +++ b/pjnath/src/pjstun-srv-test/server.c @@ -0,0 +1,185 @@ +/* $Id$ */ +/* + * Copyright (C) 2003-2005 Benny Prijono <benny@prijono.org> + * + * This program is free software; you can redistribute it and/or modify + * it under the terms of the GNU General Public License as published by + * the Free Software Foundation; either version 2 of the License, or + * (at your option) any later version. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU General Public License for more details. + * + * You should have received a copy of the GNU General Public License + * along with this program; if not, write to the Free Software + * Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA + */ + +#include "server.h" + +#define THIS_FILE "server.c" + +struct pj_stun_server +{ + pj_stun_server_info si; + + pj_pool_t *pool; + + pj_bool_t thread_quit_flag; + pj_thread_t **threads; + + unsigned usage_cnt; + pj_stun_usage *usage[32]; +}; + +PJ_DEF(pj_status_t) pj_stun_perror( const char *sender, + const char *title, + pj_status_t status) +{ + char errmsg[PJ_ERR_MSG_SIZE]; + pj_strerror(status, errmsg, sizeof(errmsg)); + + PJ_LOG(3,(sender, "%s: %s", title, errmsg)); + return status; +} + +static int worker_thread(void *p) +{ + pj_stun_server *srv = (pj_stun_server*)p; + + while (!srv->thread_quit_flag) { + pj_time_val timeout = { 0, 50 }; + pj_timer_heap_poll(srv->si.timer_heap, NULL); + pj_ioqueue_poll(srv->si.ioqueue, &timeout); + } + + return 0; +} + + +PJ_DEF(pj_status_t) pj_stun_server_create(pj_pool_factory *pf, + unsigned thread_cnt, + pj_stun_server **p_srv) +{ + pj_pool_t *pool; + pj_stun_server *srv; + unsigned i; + pj_status_t status; + + pool = pj_pool_create(pf, "server%p", 4000, 4000, NULL); + + srv = PJ_POOL_ZALLOC_T(pool, pj_stun_server); + srv->pool = pool; + srv->si.pf = pf; + + status = pj_ioqueue_create(srv->pool, PJ_IOQUEUE_MAX_HANDLES, + &srv->si.ioqueue); + if (status != PJ_SUCCESS) + goto on_error; + + status = pj_timer_heap_create(srv->pool, 1024, &srv->si.timer_heap); + if (status != PJ_SUCCESS) + goto on_error; + + status = pj_stun_endpoint_create(srv->si.pf, 0, srv->si.ioqueue, + srv->si.timer_heap, &srv->si.endpt); + if (status != PJ_SUCCESS) + goto on_error; + + srv->si.thread_cnt = thread_cnt; + srv->threads = pj_pool_calloc(pool, thread_cnt, sizeof(pj_thread_t*)); + for (i=0; i<thread_cnt; ++i) { + status = pj_thread_create(pool, "worker%p", &worker_thread, + srv, 0, 0, &srv->threads[i]); + if (status != PJ_SUCCESS) + goto on_error; + } + + *p_srv = srv; + return PJ_SUCCESS; + +on_error: + pj_stun_server_destroy(srv); + return status; +} + + +PJ_DEF(pj_stun_server_info*) pj_stun_server_get_info(pj_stun_server *srv) +{ + return &srv->si; +} + + +pj_status_t pj_stun_server_register_usage(pj_stun_server *srv, + pj_stun_usage *usage) +{ + unsigned i; + + for (i=0; i<PJ_ARRAY_SIZE(srv->usage); ++i) { + if (srv->usage[i] == usage) + return PJ_SUCCESS; + } + + for (i=0; i<PJ_ARRAY_SIZE(srv->usage); ++i) { + if (srv->usage[i] == NULL) + break; + } + + if (i == PJ_ARRAY_SIZE(srv->usage)) + return PJ_ETOOMANY; + + srv->usage[i] = usage; + ++srv->usage_cnt; + + return PJ_SUCCESS; +} + +pj_status_t pj_stun_server_unregister_usage(pj_stun_server *srv, + pj_stun_usage *usage) +{ + unsigned i; + + for (i=0; i<PJ_ARRAY_SIZE(srv->usage); ++i) { + if (srv->usage[i] == usage) + break; + } + + if (i != PJ_ARRAY_SIZE(srv->usage)) { + srv->usage[i] = NULL; + --srv->usage_cnt; + return PJ_SUCCESS; + } + + return PJ_ENOTFOUND; +} + + +PJ_DEF(pj_status_t) pj_stun_server_destroy(pj_stun_server *srv) +{ + unsigned i; + + for (i=0; i<PJ_ARRAY_SIZE(srv->usage); ++i) { + if (!srv->usage[i]) + continue; + + pj_stun_usage_destroy(srv->usage[i]); + pj_stun_server_unregister_usage(srv, srv->usage[i]); + } + + srv->thread_quit_flag = PJ_TRUE; + for (i=0; i<srv->si.thread_cnt; ++i) { + pj_thread_join(srv->threads[i]); + srv->threads[i] = NULL; + } + + pj_stun_endpoint_destroy(srv->si.endpt); + pj_timer_heap_destroy(srv->si.timer_heap); + pj_ioqueue_destroy(srv->si.ioqueue); + pj_pool_release(srv->pool); + + return PJ_SUCCESS; +} + + diff --git a/pjnath/src/pjstun-srv-test/server.h b/pjnath/src/pjstun-srv-test/server.h new file mode 100644 index 00000000..a88d87c2 --- /dev/null +++ b/pjnath/src/pjstun-srv-test/server.h @@ -0,0 +1,135 @@ +/* $Id$ */ +/* + * Copyright (C) 2003-2005 Benny Prijono <benny@prijono.org> + * + * This program is free software; you can redistribute it and/or modify + * it under the terms of the GNU General Public License as published by + * the Free Software Foundation; either version 2 of the License, or + * (at your option) any later version. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU General Public License for more details. + * + * You should have received a copy of the GNU General Public License + * along with this program; if not, write to the Free Software + * Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA + */ +#ifndef __STUN_SERVER_H__ +#define __STUN_SERVER_H__ + +#include <pjlib-util.h> +#include <pjlib.h> + + +/** Opaque declaration for STUN server instance */ +typedef struct pj_stun_server pj_stun_server; + +/** STUN server info */ +typedef struct pj_stun_server_info +{ + pj_pool_factory *pf; + pj_stun_endpoint *endpt; + pj_ioqueue_t *ioqueue; + pj_timer_heap_t *timer_heap; + unsigned thread_cnt; +} pj_stun_server_info; + +/** STUN usage */ +typedef struct pj_stun_usage pj_stun_usage; + +/** STUN usage callback */ +typedef struct pj_stun_usage_cb +{ + void (*on_rx_data)(pj_stun_usage *usage, + void *pkt, + pj_size_t pkt_size, + const pj_sockaddr_t *src_addr, + unsigned src_addr_len); + void (*on_destroy)(pj_stun_usage *usage); +} pj_stun_usage_cb; + + +PJ_DECL(pj_status_t) pj_stun_perror(const char *sender, + const char *title, + pj_status_t status); + +/** + * Create instance of STUN server. + */ +PJ_DECL(pj_status_t) pj_stun_server_create(pj_pool_factory *pf, + unsigned thread_cnt, + pj_stun_server **p_srv); + +/** + * Get STUN server info. + */ +PJ_DECL(pj_stun_server_info*) pj_stun_server_get_info(pj_stun_server *srv); + + +/** + * Destroy STUN server. + */ +PJ_DECL(pj_status_t) pj_stun_server_destroy(pj_stun_server *srv); + + +/** + * Create STUN usage. + */ +PJ_DECL(pj_status_t) pj_stun_usage_create(pj_stun_server *srv, + const char *name, + const pj_stun_usage_cb *cb, + int family, + int type, + int protocol, + const pj_sockaddr_t *local_addr, + int addr_len, + pj_stun_usage **p_usage); + +/** + * Destroy usage. + */ +PJ_DECL(pj_status_t) pj_stun_usage_destroy(pj_stun_usage *usage); + +/** + * Set user data. + */ +PJ_DECL(pj_status_t) pj_stun_usage_set_user_data(pj_stun_usage *usage, + void *user_data); +/** + * Get user data. + */ +PJ_DECL(void*) pj_stun_usage_get_user_data(pj_stun_usage *usage); + +/** + * Send with the usage. + */ +PJ_DECL(pj_status_t) pj_stun_usage_sendto(pj_stun_usage *usage, + const void *pkt, + pj_size_t pkt_size, + unsigned flags, + const pj_sockaddr_t *dst_addr, + unsigned addr_len); + +PJ_DECL(pj_status_t) pj_stun_bind_usage_create(pj_stun_server *srv, + const pj_str_t *ip_addr, + unsigned port, + pj_stun_usage **p_bu); + +PJ_DECL(pj_status_t) pj_stun_turn_usage_create(pj_stun_server *srv, + int type, + const pj_str_t *ip_addr, + unsigned port, + pj_stun_usage **p_bu); + + +pj_status_t pj_stun_server_register_usage(pj_stun_server *srv, + pj_stun_usage *usage); +pj_status_t pj_stun_server_unregister_usage(pj_stun_server *srv, + pj_stun_usage *usage); + + +#endif /* __STUN_SERVER_H__ */ + + diff --git a/pjnath/src/pjstun-srv-test/turn_usage.c b/pjnath/src/pjstun-srv-test/turn_usage.c new file mode 100644 index 00000000..e3d2e595 --- /dev/null +++ b/pjnath/src/pjstun-srv-test/turn_usage.c @@ -0,0 +1,1408 @@ +/* $Id$ */ +/* + * Copyright (C) 2003-2005 Benny Prijono <benny@prijono.org> + * + * This program is free software; you can redistribute it and/or modify + * it under the terms of the GNU General Public License as published by + * the Free Software Foundation; either version 2 of the License, or + * (at your option) any later version. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU General Public License for more details. + * + * You should have received a copy of the GNU General Public License + * along with this program; if not, write to the Free Software + * Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA + */ +#include "server.h" + +#define THIS_FILE "turn_usage.c" + +#define MAX_CLIENTS 8000 +#define MAX_PEER_PER_CLIENT 16 +#define START_PORT 2000 +#define END_PORT 65530 + +/* + * Forward declarations. + */ +struct turn_usage; +struct turn_client; + +static void tu_on_rx_data(pj_stun_usage *usage, + void *pkt, + pj_size_t pkt_size, + const pj_sockaddr_t *src_addr, + unsigned src_addr_len); +static void tu_on_destroy(pj_stun_usage *usage); +static pj_status_t tu_sess_on_send_msg(pj_stun_session *sess, + const void *pkt, + pj_size_t pkt_size, + const pj_sockaddr_t *dst_addr, + unsigned addr_len); +static pj_status_t tu_sess_on_rx_request(pj_stun_session *sess, + const pj_uint8_t *pkt, + unsigned pkt_len, + const pj_stun_msg *msg, + const pj_sockaddr_t *src_addr, + unsigned src_addr_len); + +static pj_status_t handle_binding_req(pj_stun_session *session, + const pj_stun_msg *msg, + const pj_sockaddr_t *src_addr, + unsigned src_addr_len); + +static pj_status_t client_create(struct turn_usage *tu, + const pj_sockaddr_t *src_addr, + unsigned src_addr_len, + struct turn_client **p_client); +static pj_status_t client_destroy(struct turn_client *client, + pj_status_t reason); +static pj_status_t client_handle_stun_msg(struct turn_client *client, + const pj_stun_msg *msg, + const pj_sockaddr_t *src_addr, + unsigned src_addr_len); + + +struct turn_usage +{ + pj_pool_factory *pf; + pj_stun_endpoint *endpt; + pj_ioqueue_t *ioqueue; + pj_timer_heap_t *timer_heap; + pj_pool_t *pool; + pj_mutex_t *mutex; + pj_stun_usage *usage; + int type; + pj_stun_session *default_session; + pj_hash_table_t *client_htable; + + unsigned max_bw_kbps; + unsigned max_lifetime; + + unsigned next_port; +}; + +struct peer; + +struct turn_client +{ + char obj_name[PJ_MAX_OBJ_NAME]; + struct turn_usage *tu; + pj_pool_t *pool; + pj_stun_session *session; + pj_mutex_t *mutex; + + pj_sockaddr_in client_src_addr; + + /* Socket and socket address of the allocated port */ + int sock_type; + pj_sock_t sock; + pj_ioqueue_key_t *key; + pj_sockaddr_in alloc_addr; + + /* Allocation properties */ + unsigned bw_kbps; + unsigned lifetime; + pj_timer_entry expiry_timer; + + + /* Hash table to keep all peers, key-ed by their address */ + pj_hash_table_t *peer_htable; + + /* Active destination, or sin_addr.s_addr will be zero if + * no active destination is set. + */ + struct peer *active_peer; + + /* Current packet received/sent from/to the allocated port */ + pj_uint8_t pkt[4000]; + pj_sockaddr_in pkt_src_addr; + int pkt_src_addr_len; + pj_ioqueue_op_key_t pkt_read_key; + pj_ioqueue_op_key_t pkt_write_key; +}; + +struct peer +{ + struct turn_client *client; + pj_sockaddr_in addr; +}; + +struct session_data +{ + struct turn_usage *tu; + struct turn_client *client; +}; + + +/* + * This is the only public API, to create and start the TURN usage. + */ +PJ_DEF(pj_status_t) pj_stun_turn_usage_create(pj_stun_server *srv, + int type, + const pj_str_t *ip_addr, + unsigned port, + pj_stun_usage **p_bu) +{ + pj_pool_t *pool; + struct turn_usage *tu; + pj_stun_server_info *si; + pj_stun_usage_cb usage_cb; + pj_stun_session_cb sess_cb; + struct session_data *sd; + pj_sockaddr_in local_addr; + pj_status_t status; + + PJ_ASSERT_RETURN(srv && (type==PJ_SOCK_DGRAM||type==PJ_SOCK_STREAM), + PJ_EINVAL); + si = pj_stun_server_get_info(srv); + + pool = pj_pool_create(si->pf, "turn%p", 4000, 4000, NULL); + tu = PJ_POOL_ZALLOC_T(pool, struct turn_usage); + tu->pool = pool; + tu->type = type; + tu->pf = si->pf; + tu->endpt = si->endpt; + tu->ioqueue = si->ioqueue; + tu->timer_heap = si->timer_heap; + tu->next_port = START_PORT; + tu->max_bw_kbps = 64; + tu->max_lifetime = 10 * 60; + + status = pj_sockaddr_in_init(&local_addr, ip_addr, (pj_uint16_t)port); + if (status != PJ_SUCCESS) + return status; + + /* Create usage */ + pj_bzero(&usage_cb, sizeof(usage_cb)); + usage_cb.on_rx_data = &tu_on_rx_data; + usage_cb.on_destroy = &tu_on_destroy; + status = pj_stun_usage_create(srv, "turn%p", &usage_cb, + PJ_AF_INET, tu->type, 0, + &local_addr, sizeof(local_addr), + &tu->usage); + if (status != PJ_SUCCESS) { + pj_pool_release(pool); + return status; + } + pj_stun_usage_set_user_data(tu->usage, tu); + + /* Init hash tables */ + tu->client_htable = pj_hash_create(tu->pool, MAX_CLIENTS); + + /* Create default session */ + pj_bzero(&sess_cb, sizeof(sess_cb)); + sess_cb.on_send_msg = &tu_sess_on_send_msg; + sess_cb.on_rx_request = &tu_sess_on_rx_request; + status = pj_stun_session_create(si->endpt, "turns%p", &sess_cb, PJ_FALSE, + &tu->default_session); + if (status != PJ_SUCCESS) { + pj_stun_usage_destroy(tu->usage); + return status; + } + + sd = PJ_POOL_ZALLOC_T(pool, struct session_data); + sd->tu = tu; + pj_stun_session_set_user_data(tu->default_session, sd); + + /* Create mutex */ + status = pj_mutex_create_recursive(pool, "turn%p", &tu->mutex); + if (status != PJ_SUCCESS) { + pj_stun_usage_destroy(tu->usage); + return status; + } + + if (p_bu) { + *p_bu = tu->usage; + } + + return PJ_SUCCESS; +} + + +/* + * This is a callback called by usage.c when the particular STUN usage + * is to be destroyed. + */ +static void tu_on_destroy(pj_stun_usage *usage) +{ + struct turn_usage *tu; + pj_hash_iterator_t hit, *it; + + tu = (struct turn_usage*) pj_stun_usage_get_user_data(usage); + + /* Destroy all clients */ + if (tu->client_htable) { + it = pj_hash_first(tu->client_htable, &hit); + while (it) { + struct turn_client *client; + + client = (struct turn_client *)pj_hash_this(tu->client_htable, it); + client_destroy(client, PJ_SUCCESS); + + it = pj_hash_first(tu->client_htable, &hit); + } + } + + pj_stun_session_destroy(tu->default_session); + pj_mutex_destroy(tu->mutex); + pj_pool_release(tu->pool); +} + + +/* + * This is a callback called by the usage.c to notify the TURN usage, + * that incoming packet (may or may not be a STUN packet) is received + * on the port where the TURN usage is listening. + */ +static void tu_on_rx_data(pj_stun_usage *usage, + void *pkt, + pj_size_t pkt_size, + const pj_sockaddr_t *src_addr, + unsigned src_addr_len) +{ + struct turn_usage *tu; + struct turn_client *client; + unsigned flags; + pj_status_t status; + + /* Which usage instance is this */ + tu = (struct turn_usage*) pj_stun_usage_get_user_data(usage); + + /* Lookup client structure based on source address */ + client = (struct turn_client*) pj_hash_get(tu->client_htable, src_addr, + src_addr_len, NULL); + + /* STUN message decoding flag */ + flags = 0; + if (tu->type == PJ_SOCK_DGRAM) + flags |= PJ_STUN_IS_DATAGRAM; + + + if (client) { + status = pj_stun_msg_check(pkt, pkt_size, flags); + + if (status == PJ_SUCCESS) { + /* Received STUN message */ + status = pj_stun_session_on_rx_pkt(client->session, + (pj_uint8_t*)pkt, pkt_size, + flags, NULL, + src_addr, src_addr_len); + } else if (client->active_peer) { + /* Received non-STUN message and client has active destination */ + pj_ssize_t sz = pkt_size; + pj_ioqueue_sendto(client->key, &client->pkt_write_key, + pkt, &sz, 0, + &client->active_peer->addr, + sizeof(client->active_peer->addr)); + } else { + /* Received non-STUN message and client doesn't have active + * destination. + */ + /* Ignore */ + } + + } else { + /* Received packet (could be STUN or no) from new source */ + flags |= PJ_STUN_CHECK_PACKET; + pj_stun_session_on_rx_pkt(tu->default_session, (pj_uint8_t*)pkt, + pkt_size, flags, NULL, + src_addr, src_addr_len); + } +} + + +/* + * This is a utility function provided by TU (Turn Usage) to reserve + * or allocate internal port/socket. The allocation needs to be + * coordinated to minimize bind() collissions. + */ +static pj_status_t tu_alloc_port(struct turn_usage *tu, + int type, + unsigned rpp_bits, + const pj_sockaddr_in *req_addr, + pj_sock_t *p_sock, + int *err_code) +{ + enum { RETRY = 100 }; + pj_sockaddr_in addr; + pj_sock_t sock = PJ_INVALID_SOCKET; + unsigned retry; + pj_status_t status; + + if (req_addr && req_addr->sin_port != 0) { + + *err_code = PJ_STUN_SC_INVALID_PORT; + + /* Allocate specific port */ + status = pj_sock_socket(PJ_AF_INET, type, 0, &sock); + if (status != PJ_SUCCESS) + return status; + + /* Bind */ + status = pj_sock_bind(sock, req_addr, sizeof(pj_sockaddr_in)); + if (status != PJ_SUCCESS) { + pj_sock_close(sock); + return status; + } + + /* Success */ + *p_sock = sock; + return PJ_SUCCESS; + + } else { + status = -1; + *err_code = PJ_STUN_SC_INSUFFICIENT_CAPACITY; + + if (req_addr && req_addr->sin_addr.s_addr) { + *err_code = PJ_STUN_SC_INVALID_IP_ADDR; + pj_memcpy(&addr, req_addr, sizeof(pj_sockaddr_in)); + } else { + pj_sockaddr_in_init(&addr, NULL, 0); + } + + for (retry=0; retry<RETRY && sock == PJ_INVALID_SOCKET; ++retry) { + switch (rpp_bits) { + case 1: + if ((tu->next_port & 0x01)==0) + tu->next_port++; + break; + case 2: + case 3: + if ((tu->next_port & 0x01)==1) + tu->next_port++; + break; + } + + status = pj_sock_socket(PJ_AF_INET, type, 0, &sock); + if (status != PJ_SUCCESS) + return status; + + addr.sin_port = pj_htons((pj_uint16_t)tu->next_port); + + if (++tu->next_port > END_PORT) + tu->next_port = START_PORT; + + status = pj_sock_bind(sock, &addr, sizeof(addr)); + if (status != PJ_SUCCESS) { + pj_sock_close(sock); + sock = PJ_INVALID_SOCKET; + + /* If client requested specific IP address, assume that + * bind failed because the IP address is not valid. We + * don't want to retry that since it will exhaust our + * port space. + */ + if (req_addr && req_addr->sin_addr.s_addr) + break; + } + } + + if (sock == PJ_INVALID_SOCKET) { + return status; + } + + *p_sock = sock; + return PJ_SUCCESS; + } +} + + +/* + * This callback is called by the TU's STUN session when it receives + * a valid STUN message. This is called from tu_on_rx_data above. + */ +static pj_status_t tu_sess_on_rx_request(pj_stun_session *sess, + const pj_uint8_t *pkt, + unsigned pkt_len, + const pj_stun_msg *msg, + const pj_sockaddr_t *src_addr, + unsigned src_addr_len) +{ + struct session_data *sd; + struct turn_client *client; + pj_stun_tx_data *tdata; + pj_status_t status; + + PJ_UNUSED_ARG(pkt); + PJ_UNUSED_ARG(pkt_len); + + sd = (struct session_data*) pj_stun_session_get_user_data(sess); + + pj_assert(sd->client == NULL); + + if (msg->hdr.type == PJ_STUN_BINDING_REQUEST) { + return handle_binding_req(sess, msg, src_addr, src_addr_len); + + } else if (msg->hdr.type != PJ_STUN_ALLOCATE_REQUEST) { + if (PJ_STUN_IS_REQUEST(msg->hdr.type)) { + status = pj_stun_session_create_response(sess, msg, + PJ_STUN_SC_NO_BINDING, + NULL, &tdata); + if (status==PJ_SUCCESS) { + status = pj_stun_session_send_msg(sess, PJ_FALSE, + src_addr, src_addr_len, + tdata); + } + } else { + PJ_LOG(4,(THIS_FILE, + "Received %s %s without matching Allocation, " + "ignored", pj_stun_get_method_name(msg->hdr.type), + pj_stun_get_class_name(msg->hdr.type))); + } + return PJ_SUCCESS; + } + + status = client_create(sd->tu, src_addr, src_addr_len, &client); + if (status != PJ_SUCCESS) { + pj_stun_perror(THIS_FILE, "Error creating new TURN client", + status); + return status; + } + + + /* Hand over message to client */ + pj_mutex_lock(client->mutex); + status = client_handle_stun_msg(client, msg, src_addr, src_addr_len); + pj_mutex_unlock(client->mutex); + + return status; +} + + +/* + * This callback is called by STUN session when it needs to send packet + * to the network. + */ +static pj_status_t tu_sess_on_send_msg(pj_stun_session *sess, + const void *pkt, + pj_size_t pkt_size, + const pj_sockaddr_t *dst_addr, + unsigned addr_len) +{ + struct session_data *sd; + + sd = (struct session_data*) pj_stun_session_get_user_data(sess); + + if (sd->tu->type == PJ_SOCK_DGRAM) { + return pj_stun_usage_sendto(sd->tu->usage, pkt, pkt_size, 0, + dst_addr, addr_len); + } else { + return PJ_ENOTSUP; + } +} + + +/****************************************************************************/ +/* + * TURN client operations. + */ + +/* Function prototypes */ +static pj_status_t client_create_relay(struct turn_client *client); +static pj_status_t client_destroy_relay(struct turn_client *client); +static void client_on_expired(pj_timer_heap_t *th, pj_timer_entry *e); +static void client_on_read_complete(pj_ioqueue_key_t *key, + pj_ioqueue_op_key_t *op_key, + pj_ssize_t bytes_read); +static pj_status_t client_respond(struct turn_client *client, + const pj_stun_msg *msg, + int err_code, + const char *err_msg, + const pj_sockaddr_t *dst_addr, + int dst_addr_len); +static struct peer* client_get_peer(struct turn_client *client, + const pj_sockaddr_in *peer_addr, + pj_uint32_t *hval); +static struct peer* client_add_peer(struct turn_client *client, + const pj_sockaddr_in *peer_addr, + pj_uint32_t hval); + +static const char *get_tp_type(int type) +{ + if (type==PJ_SOCK_DGRAM) + return "udp"; + else if (type==PJ_SOCK_STREAM) + return "tcp"; + else + return "???"; +} + + +/* + * This callback is called when incoming STUN message is received + * in the TURN usage. This is called from by tu_on_rx_data() when + * the packet is handed over to the client. + */ +static pj_status_t client_sess_on_rx_msg(pj_stun_session *sess, + const pj_uint8_t *pkt, + unsigned pkt_len, + const pj_stun_msg *msg, + const pj_sockaddr_t *src_addr, + unsigned src_addr_len) +{ + struct session_data *sd; + + PJ_UNUSED_ARG(pkt); + PJ_UNUSED_ARG(pkt_len); + + sd = (struct session_data*) pj_stun_session_get_user_data(sess); + pj_assert(sd->client != PJ_SUCCESS); + + return client_handle_stun_msg(sd->client, msg, src_addr, src_addr_len); +} + + +/* + * This callback is called by client's STUN session to send outgoing + * STUN packet. It's called when client calls pj_stun_session_send_msg() + * function. + */ +static pj_status_t client_sess_on_send_msg(pj_stun_session *sess, + const void *pkt, + pj_size_t pkt_size, + const pj_sockaddr_t *dst_addr, + unsigned addr_len) +{ + struct session_data *sd; + + sd = (struct session_data*) pj_stun_session_get_user_data(sess); + + if (sd->tu->type == PJ_SOCK_DGRAM) { + return pj_stun_usage_sendto(sd->tu->usage, pkt, pkt_size, 0, + dst_addr, addr_len); + } else { + return PJ_ENOTSUP; + } +} + + +/* + * Create a new TURN client for the specified source address. + */ +static pj_status_t client_create(struct turn_usage *tu, + const pj_sockaddr_t *src_addr, + unsigned src_addr_len, + struct turn_client **p_client) +{ + pj_pool_t *pool; + struct turn_client *client; + pj_stun_session_cb sess_cb; + struct session_data *sd; + pj_status_t status; + + PJ_ASSERT_RETURN(src_addr_len==sizeof(pj_sockaddr_in), PJ_EINVAL); + + pool = pj_pool_create(tu->pf, "turnc%p", 4000, 4000, NULL); + client = PJ_POOL_ZALLOC_T(pool, struct turn_client); + client->pool = pool; + client->tu = tu; + client->sock = PJ_INVALID_SOCKET; + + pj_memcpy(&client->client_src_addr, src_addr, + sizeof(client->client_src_addr)); + + if (src_addr) { + const pj_sockaddr_in *a4 = (const pj_sockaddr_in *)src_addr; + pj_ansi_snprintf(client->obj_name, sizeof(client->obj_name), + "%s:%s:%d", + get_tp_type(tu->type), + pj_inet_ntoa(a4->sin_addr), + (int)pj_ntohs(a4->sin_port)); + client->obj_name[sizeof(client->obj_name)-1] = '\0'; + } + + /* Create session */ + pj_bzero(&sess_cb, sizeof(sess_cb)); + sess_cb.on_send_msg = &client_sess_on_send_msg; + sess_cb.on_rx_request = &client_sess_on_rx_msg; + sess_cb.on_rx_indication = &client_sess_on_rx_msg; + status = pj_stun_session_create(tu->endpt, client->obj_name, + &sess_cb, PJ_FALSE, + &client->session); + if (status != PJ_SUCCESS) { + pj_pool_release(pool); + return status; + } + + sd = PJ_POOL_ZALLOC_T(pool, struct session_data); + sd->tu = tu; + sd->client = client; + pj_stun_session_set_user_data(client->session, sd); + + /* Mutex */ + status = pj_mutex_create_recursive(client->pool, pool->obj_name, + &client->mutex); + if (status != PJ_SUCCESS) { + client_destroy(client, status); + return status; + } + + /* Create hash table */ + client->peer_htable = pj_hash_create(client->pool, MAX_PEER_PER_CLIENT); + if (client->peer_htable == NULL) { + client_destroy(client, status); + return PJ_ENOMEM; + } + + /* Init timer entry */ + client->expiry_timer.user_data = client; + client->expiry_timer.cb = &client_on_expired; + client->expiry_timer.id = 0; + + /* Register to hash table */ + pj_mutex_lock(tu->mutex); + pj_hash_set(pool, tu->client_htable, src_addr, src_addr_len, 0, client); + pj_mutex_unlock(tu->mutex); + + /* Done */ + *p_client = client; + + PJ_LOG(4,(THIS_FILE, "TURN client %s created", client->obj_name)); + + return PJ_SUCCESS; +} + + +/* + * Destroy TURN client. + */ +static pj_status_t client_destroy(struct turn_client *client, + pj_status_t reason) +{ + struct turn_usage *tu = client->tu; + char name[PJ_MAX_OBJ_NAME]; + + pj_assert(sizeof(name)==sizeof(client->obj_name)); + pj_memcpy(name, client->obj_name, sizeof(name)); + + /* Kill timer if it's active */ + if (client->expiry_timer.id != 0) { + pj_timer_heap_cancel(tu->timer_heap, &client->expiry_timer); + client->expiry_timer.id = PJ_FALSE; + } + + /* Destroy relay */ + client_destroy_relay(client); + + /* Unregister client from hash table */ + pj_mutex_lock(tu->mutex); + pj_hash_set(NULL, tu->client_htable, + &client->client_src_addr, sizeof(client->client_src_addr), + 0, NULL); + pj_mutex_unlock(tu->mutex); + + /* Destroy STUN session */ + if (client->session) { + pj_stun_session_destroy(client->session); + client->session = NULL; + } + + /* Mutex */ + if (client->mutex) { + pj_mutex_destroy(client->mutex); + client->mutex = NULL; + } + + /* Finally destroy pool */ + if (client->pool) { + pj_pool_t *pool = client->pool; + client->pool = NULL; + pj_pool_release(pool); + } + + if (reason == PJ_SUCCESS) { + PJ_LOG(4,(THIS_FILE, "TURN client %s destroyed", name)); + } + + return PJ_SUCCESS; +} + + +/* + * This utility function is used to setup relay (with ioqueue) after + * socket has been allocated for the TURN client. + */ +static pj_status_t client_create_relay(struct turn_client *client) +{ + pj_ioqueue_callback client_ioq_cb; + int addrlen; + pj_status_t status; + + /* Update address */ + addrlen = sizeof(pj_sockaddr_in); + status = pj_sock_getsockname(client->sock, &client->alloc_addr, + &addrlen); + if (status != PJ_SUCCESS) { + pj_sock_close(client->sock); + client->sock = PJ_INVALID_SOCKET; + return status; + } + + if (client->alloc_addr.sin_addr.s_addr == 0) { + status = pj_gethostip(&client->alloc_addr.sin_addr); + if (status != PJ_SUCCESS) { + pj_sock_close(client->sock); + client->sock = PJ_INVALID_SOCKET; + return status; + } + } + + /* Register to ioqueue */ + pj_bzero(&client_ioq_cb, sizeof(client_ioq_cb)); + client_ioq_cb.on_read_complete = &client_on_read_complete; + status = pj_ioqueue_register_sock(client->pool, client->tu->ioqueue, + client->sock, client, + &client_ioq_cb, &client->key); + if (status != PJ_SUCCESS) { + pj_sock_close(client->sock); + client->sock = PJ_INVALID_SOCKET; + return status; + } + + pj_ioqueue_op_key_init(&client->pkt_read_key, + sizeof(client->pkt_read_key)); + pj_ioqueue_op_key_init(&client->pkt_write_key, + sizeof(client->pkt_write_key)); + + /* Trigger the first read */ + client_on_read_complete(client->key, &client->pkt_read_key, 0); + + PJ_LOG(4,(THIS_FILE, "TURN client %s: relay allocated on %s:%s:%d", + client->obj_name, + get_tp_type(client->sock_type), + pj_inet_ntoa(client->alloc_addr.sin_addr), + (int)pj_ntohs(client->alloc_addr.sin_port))); + + return PJ_SUCCESS; +} + + +/* + * This utility function is used to destroy the port allocated for + * the TURN client. + */ +static pj_status_t client_destroy_relay(struct turn_client *client) +{ + /* Close socket */ + if (client->key) { + pj_ioqueue_unregister(client->key); + client->key = NULL; + client->sock = PJ_INVALID_SOCKET; + } else if (client->sock && client->sock != PJ_INVALID_SOCKET) { + pj_sock_close(client->sock); + client->sock = PJ_INVALID_SOCKET; + } + + PJ_LOG(4,(THIS_FILE, "TURN client %s: relay allocation %s:%s:%d destroyed", + client->obj_name, + get_tp_type(client->sock_type), + pj_inet_ntoa(client->alloc_addr.sin_addr), + (int)pj_ntohs(client->alloc_addr.sin_port))); + return PJ_SUCCESS; +} + + +/* + * From the source packet address, get the peer instance from hash table. + */ +static struct peer* client_get_peer(struct turn_client *client, + const pj_sockaddr_in *peer_addr, + pj_uint32_t *hval) +{ + return (struct peer*) + pj_hash_get(client->peer_htable, peer_addr, sizeof(*peer_addr), hval); +} + + +/* + * Add a peer instance to the peer hash table. + */ +static struct peer* client_add_peer(struct turn_client *client, + const pj_sockaddr_in *peer_addr, + unsigned hval) +{ + struct peer *peer; + + peer = PJ_POOL_ZALLOC_T(client->pool, struct peer); + peer->client = client; + pj_memcpy(&peer->addr, peer_addr, sizeof(peer->addr)); + + pj_hash_set(client->pool, client->peer_htable, + &peer->addr, sizeof(peer->addr), hval, peer); + + PJ_LOG(4,(THIS_FILE, "TURN client %s: peer %s:%s:%d added", + client->obj_name, get_tp_type(client->sock_type), + pj_inet_ntoa(peer->addr.sin_addr), + (int)pj_ntohs(peer->addr.sin_port))); + + return peer; +} + + +/* + * Utility to send STUN response message (normally to send error response). + */ +static pj_status_t client_respond(struct turn_client *client, + const pj_stun_msg *msg, + int err_code, + const char *custom_msg, + const pj_sockaddr_t *dst_addr, + int dst_addr_len) +{ + pj_str_t err_msg; + pj_str_t *p_err_msg = NULL; + pj_stun_tx_data *response; + pj_status_t status; + + if (custom_msg) + pj_cstr(&err_msg, custom_msg), p_err_msg = &err_msg; + + status = pj_stun_session_create_response(client->session, msg, + err_code, p_err_msg, + &response); + if (status == PJ_SUCCESS) + status = pj_stun_session_send_msg(client->session, PJ_TRUE, + dst_addr, dst_addr_len, response); + + return status; +} + + +/* + * Handle incoming initial or subsequent Allocate Request. + * This function is called by client_handle_stun_msg() below. + */ +static pj_status_t client_handle_allocate_req(struct turn_client *client, + const pj_stun_msg *msg, + const pj_sockaddr_t *src_addr, + unsigned src_addr_len) +{ + const pj_stun_bandwidth_attr *a_bw; + const pj_stun_lifetime_attr *a_lf; + const pj_stun_req_port_props_attr *a_rpp; + const pj_stun_req_transport_attr *a_rt; + const pj_stun_req_ip_attr *a_rip; + pj_stun_tx_data *response; + pj_sockaddr_in req_addr; + int addr_len; + unsigned req_bw, rpp_bits; + pj_time_val timeout; + pj_status_t status; + + a_bw = (const pj_stun_bandwidth_attr *) + pj_stun_msg_find_attr(msg, PJ_STUN_ATTR_BANDWIDTH, 0); + a_lf = (const pj_stun_lifetime_attr*) + pj_stun_msg_find_attr(msg, PJ_STUN_ATTR_LIFETIME, 0); + a_rpp = (const pj_stun_req_port_props_attr*) + pj_stun_msg_find_attr(msg, PJ_STUN_ATTR_REQ_PORT_PROPS, 0); + a_rt = (const pj_stun_req_transport_attr*) + pj_stun_msg_find_attr(msg, PJ_STUN_ATTR_REQ_TRANSPORT, 0); + a_rip = (const pj_stun_req_ip_attr*) + pj_stun_msg_find_attr(msg, PJ_STUN_ATTR_REQ_IP, 0); + + /* Init requested local address */ + pj_sockaddr_in_init(&req_addr, NULL, 0); + + /* Process BANDWIDTH attribute */ + if (a_bw && a_bw->value > client->tu->max_bw_kbps) { + client_respond(client, msg, PJ_STUN_SC_INSUFFICIENT_CAPACITY, NULL, + src_addr, src_addr_len); + return PJ_SUCCESS; + } else if (a_bw) { + client->bw_kbps = req_bw = a_bw->value; + } else { + req_bw = 0; + client->bw_kbps = client->tu->max_bw_kbps; + } + + /* Process REQUESTED-TRANSPORT attribute */ + if (a_rt && a_rt->value != 0) { + client_respond(client, msg, PJ_STUN_SC_UNSUPP_TRANSPORT_PROTO, NULL, + src_addr, src_addr_len); + return PJ_SUCCESS; + } else if (a_rt) { + client->sock_type = a_rt->value ? PJ_SOCK_STREAM : PJ_SOCK_DGRAM; + } else { + client->sock_type = client->tu->type;; + } + + /* Process REQUESTED-IP attribute */ + if (a_rip && a_rip->addr.addr.sa_family != PJ_AF_INET) { + client_respond(client, msg, PJ_STUN_SC_INVALID_IP_ADDR, NULL, + src_addr, src_addr_len); + return PJ_SUCCESS; + + } else if (a_rip) { + req_addr.sin_addr.s_addr = a_rip->addr.ipv4.sin_addr.s_addr; + } + + /* Process REQUESTED-PORT-PROPS attribute */ + if (a_rpp) { + unsigned port; + + rpp_bits = (a_rpp->value & 0x00030000) >> 16; + port = (a_rpp->value & 0xFFFF); + req_addr.sin_port = pj_htons((pj_uint8_t)port); + } else { + rpp_bits = 0; + } + + /* Process LIFETIME attribute */ + if (a_lf && a_lf->value > client->tu->max_lifetime) { + client->lifetime = client->tu->max_lifetime; + } else if (a_lf) { + client->lifetime = a_lf->value; + } else { + client->lifetime = client->tu->max_lifetime; + } + + /* Allocate socket if we don't have one */ + if (client->key == NULL) { + int err_code; + + PJ_LOG(4,(THIS_FILE, "TURN client %s: received initial Allocate " + "request, requested type:addr:port=%s:%s:%d, rpp " + "bits=%d, bw=%dkbps, lifetime=%d", + client->obj_name, get_tp_type(client->sock_type), + pj_inet_ntoa(req_addr.sin_addr), pj_ntohs(req_addr.sin_port), + rpp_bits, client->bw_kbps, client->lifetime)); + + status = tu_alloc_port(client->tu, client->sock_type, rpp_bits, + &req_addr, &client->sock, &err_code); + if (status != PJ_SUCCESS) { + char errmsg[PJ_ERR_MSG_SIZE]; + + pj_strerror(status, errmsg, sizeof(errmsg)); + PJ_LOG(4,(THIS_FILE, "TURN client %s: error allocating relay port" + ": %s", + client->obj_name, errmsg)); + + client_respond(client, msg, err_code, NULL, + src_addr, src_addr_len); + + return status; + } + + status = client_create_relay(client); + if (status != PJ_SUCCESS) { + client_respond(client, msg, PJ_STUN_SC_SERVER_ERROR, NULL, + src_addr, src_addr_len); + return status; + } + } else { + /* Otherwise check if the port parameter stays the same */ + /* TODO */ + PJ_LOG(4,(THIS_FILE, "TURN client %s: received Allocate refresh, " + "lifetime=%d", + client->obj_name, client->lifetime)); + } + + /* Refresh timer */ + if (client->expiry_timer.id != PJ_FALSE) { + pj_timer_heap_cancel(client->tu->timer_heap, &client->expiry_timer); + client->expiry_timer.id = PJ_FALSE; + } + timeout.sec = client->lifetime; + timeout.msec = 0; + pj_timer_heap_schedule(client->tu->timer_heap, &client->expiry_timer, &timeout); + client->expiry_timer.id = PJ_TRUE; + + /* Done successfully, create and send success response */ + status = pj_stun_session_create_response(client->session, msg, + 0, NULL, &response); + if (status != PJ_SUCCESS) { + return status; + } + + pj_stun_msg_add_uint_attr(response->pool, response->msg, + PJ_STUN_ATTR_BANDWIDTH, client->bw_kbps); + pj_stun_msg_add_uint_attr(response->pool, response->msg, + PJ_STUN_ATTR_LIFETIME, client->lifetime); + pj_stun_msg_add_sockaddr_attr(response->pool, response->msg, + PJ_STUN_ATTR_MAPPED_ADDR, PJ_FALSE, + src_addr, src_addr_len); + pj_stun_msg_add_sockaddr_attr(response->pool, response->msg, + PJ_STUN_ATTR_XOR_MAPPED_ADDR, PJ_TRUE, + src_addr, src_addr_len); + + addr_len = sizeof(req_addr); + pj_sock_getsockname(client->sock, &req_addr, &addr_len); + pj_stun_msg_add_sockaddr_attr(response->pool, response->msg, + PJ_STUN_ATTR_RELAY_ADDR, PJ_FALSE, + &client->alloc_addr, addr_len); + + PJ_LOG(4,(THIS_FILE, "TURN client %s: relay allocated or refreshed, " + "internal address is %s:%s:%d", + client->obj_name, + get_tp_type(client->sock_type), + pj_inet_ntoa(req_addr.sin_addr), + (int)pj_ntohs(req_addr.sin_port))); + + return pj_stun_session_send_msg(client->session, PJ_TRUE, + src_addr, src_addr_len, response); +} + + +/* + * Handle incoming Binding request. + * This function is called by client_handle_stun_msg() below. + */ +static pj_status_t handle_binding_req(pj_stun_session *session, + const pj_stun_msg *msg, + const pj_sockaddr_t *src_addr, + unsigned src_addr_len) +{ + pj_stun_tx_data *tdata; + pj_status_t status; + + /* Create response */ + status = pj_stun_session_create_response(session, msg, 0, NULL, + &tdata); + if (status != PJ_SUCCESS) + return status; + + /* Create MAPPED-ADDRESS attribute */ + pj_stun_msg_add_sockaddr_attr(tdata->pool, tdata->msg, + PJ_STUN_ATTR_MAPPED_ADDR, + PJ_FALSE, + src_addr, src_addr_len); + + /* On the presence of magic, create XOR-MAPPED-ADDRESS attribute */ + if (msg->hdr.magic == PJ_STUN_MAGIC) { + status = + pj_stun_msg_add_sockaddr_attr(tdata->pool, tdata->msg, + PJ_STUN_ATTR_XOR_MAPPED_ADDR, + PJ_TRUE, + src_addr, src_addr_len); + } + + /* Send */ + status = pj_stun_session_send_msg(session, PJ_TRUE, + src_addr, src_addr_len, tdata); + return status; +} + + +/* + * client handling incoming STUN Set Active Destination request + * This function is called by client_handle_stun_msg() below. + */ +static pj_status_t client_handle_sad(struct turn_client *client, + const pj_stun_msg *msg, + const pj_sockaddr_t *src_addr, + unsigned src_addr_len) +{ + pj_stun_remote_addr_attr *a_raddr; + + a_raddr = (pj_stun_remote_addr_attr*) + pj_stun_msg_find_attr(msg, PJ_STUN_ATTR_REMOTE_ADDR, 0); + if (!a_raddr) { + /* Remote active destination needs to be cleared */ + client->active_peer = NULL; + + } else if (a_raddr->addr.addr.sa_family != PJ_AF_INET) { + /* Bad request (not IPv4) */ + client_respond(client, msg, PJ_STUN_SC_BAD_REQUEST, NULL, + src_addr, src_addr_len); + return PJ_SUCCESS; + + } else if (client->active_peer) { + /* Client tries to set new active destination without clearing + * it first. Reject with 439. + */ + client_respond(client, msg, PJ_STUN_SC_TRANSITIONING, NULL, + src_addr, src_addr_len); + return PJ_SUCCESS; + + } else { + struct peer *peer; + pj_uint32_t hval = 0; + + /* Add a new peer/permission if we don't have one for this address */ + peer = client_get_peer(client, &a_raddr->addr.ipv4, &hval); + if (peer==NULL) { + peer = client_add_peer(client, &a_raddr->addr.ipv4, hval); + } + + /* Set active destination */ + client->active_peer = peer; + } + + if (client->active_peer) { + PJ_LOG(4,(THIS_FILE, + "TURN client %s: active destination set to %s:%d", + client->obj_name, + pj_inet_ntoa(client->active_peer->addr.sin_addr), + (int)pj_ntohs(client->active_peer->addr.sin_port))); + } else { + PJ_LOG(4,(THIS_FILE, "TURN client %s: active destination cleared", + client->obj_name)); + } + + /* Respond with successful response */ + client_respond(client, msg, 0, NULL, src_addr, src_addr_len); + + return PJ_SUCCESS; +} + + +/* + * client handling incoming STUN Send Indication + * This function is called by client_handle_stun_msg() below. + */ +static pj_status_t client_handle_send_ind(struct turn_client *client, + const pj_stun_msg *msg) +{ + pj_stun_remote_addr_attr *a_raddr; + pj_stun_data_attr *a_data; + pj_uint32_t hval = 0; + const pj_uint8_t *data; + pj_ssize_t datalen; + + /* Get REMOTE-ADDRESS attribute */ + a_raddr = (pj_stun_remote_addr_attr*) + pj_stun_msg_find_attr(msg, PJ_STUN_ATTR_REMOTE_ADDR, 0); + if (!a_raddr) { + /* REMOTE-ADDRESS not present, discard packet */ + return PJ_SUCCESS; + + } else if (a_raddr->addr.addr.sa_family != PJ_AF_INET) { + /* REMOTE-ADDRESS present but not IPv4, discard packet */ + return PJ_SUCCESS; + + } + + /* Get the DATA attribute */ + a_data = (pj_stun_data_attr*) + pj_stun_msg_find_attr(msg, PJ_STUN_ATTR_DATA, 0); + if (a_data) { + data = (const pj_uint8_t *)a_data->data; + datalen = a_data->length; + + } else if (client->sock_type == PJ_SOCK_STREAM) { + /* Discard if no Data and Allocation type is TCP */ + return PJ_SUCCESS; + + } else { + data = (const pj_uint8_t *)""; + datalen = 0; + } + + /* Add to peer table if necessary */ + if (client_get_peer(client, &a_raddr->addr.ipv4, &hval)==NULL) + client_add_peer(client, &a_raddr->addr.ipv4, hval); + + /* Send the packet */ + pj_ioqueue_sendto(client->key, &client->pkt_write_key, + data, &datalen, 0, + &a_raddr->addr.ipv4, sizeof(a_raddr->addr.ipv4)); + + return PJ_SUCCESS; +} + + +/* + * client handling unknown incoming STUN message. + * This function is called by client_handle_stun_msg() below. + */ +static pj_status_t client_handle_unknown_msg(struct turn_client *client, + const pj_stun_msg *msg, + const pj_sockaddr_t *src_addr, + unsigned src_addr_len) +{ + PJ_LOG(4,(THIS_FILE, "TURN client %s: unhandled %s %s", + client->obj_name, pj_stun_get_method_name(msg->hdr.type), + pj_stun_get_class_name(msg->hdr.type))); + + if (PJ_STUN_IS_REQUEST(msg->hdr.type)) { + return client_respond(client, msg, PJ_STUN_SC_BAD_REQUEST, NULL, + src_addr, src_addr_len); + } else { + /* Ignore */ + return PJ_SUCCESS; + } +} + + +/* + * Main entry for handling STUN messages arriving on the main TURN port, + * for this client + */ +static pj_status_t client_handle_stun_msg(struct turn_client *client, + const pj_stun_msg *msg, + const pj_sockaddr_t *src_addr, + unsigned src_addr_len) +{ + pj_status_t status; + + switch (msg->hdr.type) { + case PJ_STUN_SEND_INDICATION: + status = client_handle_send_ind(client, msg); + break; + + case PJ_STUN_SET_ACTIVE_DESTINATION_REQUEST: + status = client_handle_sad(client, msg, + src_addr, src_addr_len); + break; + + case PJ_STUN_ALLOCATE_REQUEST: + status = client_handle_allocate_req(client, msg, + src_addr, src_addr_len); + break; + + case PJ_STUN_BINDING_REQUEST: + status = handle_binding_req(client->session, msg, + src_addr, src_addr_len); + break; + + default: + status = client_handle_unknown_msg(client, msg, + src_addr, src_addr_len); + break; + } + + return status; +} + + +PJ_INLINE(pj_uint32_t) GET_VAL32(const pj_uint8_t *pdu, unsigned pos) +{ + return (pdu[pos+0] << 24) + + (pdu[pos+1] << 16) + + (pdu[pos+2] << 8) + + (pdu[pos+3]); +} + + +/* + * Handle incoming data from peer + * This function is called by client_on_read_complete() below. + */ +static void client_handle_peer_data(struct turn_client *client, + unsigned bytes_read) +{ + struct peer *peer; + pj_bool_t has_magic_cookie; + pj_status_t status; + + /* Has the sender been registered as peer? */ + peer = client_get_peer(client, &client->pkt_src_addr, NULL); + if (peer == NULL) { + /* Nope. Discard packet */ + PJ_LOG(5,(THIS_FILE, + "TURN client %s: discarded data from %s:%d", + client->obj_name, + pj_inet_ntoa(client->pkt_src_addr.sin_addr), + (int)pj_ntohs(client->pkt_src_addr.sin_port))); + return; + } + + /* Check if packet has STUN magic cookie */ + has_magic_cookie = (GET_VAL32(client->pkt, 4) == PJ_STUN_MAGIC); + + /* If this is the Active Destination and the packet doesn't have + * STUN magic cookie, send the packet to client as is. + */ + if (peer == client->active_peer && !has_magic_cookie) { + pj_stun_usage_sendto(client->tu->usage, client->pkt, bytes_read, 0, + &client->pkt_src_addr, client->pkt_src_addr_len); + } else { + /* Otherwise wrap in Data Indication */ + pj_stun_tx_data *data_ind; + + status = pj_stun_session_create_ind(client->session, + PJ_STUN_DATA_INDICATION, + &data_ind); + if (status != PJ_SUCCESS) + return; + + pj_stun_msg_add_sockaddr_attr(data_ind->pool, data_ind->msg, + PJ_STUN_ATTR_REMOTE_ADDR, PJ_FALSE, + &client->pkt_src_addr, + client->pkt_src_addr_len); + pj_stun_msg_add_binary_attr(data_ind->pool, data_ind->msg, + PJ_STUN_ATTR_DATA, + client->pkt, bytes_read); + + + pj_stun_session_send_msg(client->session, PJ_FALSE, + &client->pkt_src_addr, + client->pkt_src_addr_len, + data_ind); + } +} + + +/* + * This callback is called by the ioqueue when read operation has + * completed on the allocated relay port. + */ +static void client_on_read_complete(pj_ioqueue_key_t *key, + pj_ioqueue_op_key_t *op_key, + pj_ssize_t bytes_read) +{ + enum { MAX_LOOP = 10 }; + struct turn_client *client; + unsigned count; + pj_status_t status; + + PJ_UNUSED_ARG(op_key); + + client = pj_ioqueue_get_user_data(key); + + /* Lock client */ + pj_mutex_lock(client->mutex); + + for (count=0; ; ++count) { + unsigned flags; + + if (bytes_read > 0) { + /* Received data from peer! */ + client_handle_peer_data(client, bytes_read); + + } else if (bytes_read < 0) { + char errmsg[PJ_ERR_MSG_SIZE]; + pj_strerror(-bytes_read, errmsg, sizeof(errmsg)); + PJ_LOG(4,(THIS_FILE, "TURN client %s: error reading data " + "from allocated relay port: %s", + client->obj_name, errmsg)); + } + + bytes_read = sizeof(client->pkt); + flags = (count >= MAX_LOOP) ? PJ_IOQUEUE_ALWAYS_ASYNC : 0; + client->pkt_src_addr_len = sizeof(client->pkt_src_addr); + status = pj_ioqueue_recvfrom(client->key, + &client->pkt_read_key, + client->pkt, &bytes_read, flags, + &client->pkt_src_addr, + &client->pkt_src_addr_len); + if (status == PJ_EPENDING) + break; + } + + /* Unlock client */ + pj_mutex_unlock(client->mutex); +} + + +/* On Allocation timer timeout (i.e. we don't receive new Allocate request + * to refresh the allocation in time) + */ +static void client_on_expired(pj_timer_heap_t *th, pj_timer_entry *e) +{ + struct turn_client *client; + + PJ_UNUSED_ARG(th); + + client = (struct turn_client*) e->user_data; + + PJ_LOG(4,(THIS_FILE, "TURN client %s: allocation timer timeout, " + "destroying client", + client->obj_name)); + client_destroy(client, PJ_SUCCESS); +} + diff --git a/pjnath/src/pjstun-srv-test/usage.c b/pjnath/src/pjstun-srv-test/usage.c new file mode 100644 index 00000000..a8a5c274 --- /dev/null +++ b/pjnath/src/pjstun-srv-test/usage.c @@ -0,0 +1,271 @@ +/* $Id$ */ +/* + * Copyright (C) 2003-2005 Benny Prijono <benny@prijono.org> + * + * This program is free software; you can redistribute it and/or modify + * it under the terms of the GNU General Public License as published by + * the Free Software Foundation; either version 2 of the License, or + * (at your option) any later version. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU General Public License for more details. + * + * You should have received a copy of the GNU General Public License + * along with this program; if not, write to the Free Software + * Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA + */ +#include "server.h" + +struct worker +{ + pj_ioqueue_op_key_t read_key; + unsigned index; + pj_uint8_t readbuf[4000]; + pj_sockaddr src_addr; + int src_addr_len; +}; + +struct pj_stun_usage +{ + pj_pool_t *pool; + pj_stun_server *srv; + pj_mutex_t *mutex; + pj_stun_usage_cb cb; + int type; + pj_sock_t sock; + pj_ioqueue_key_t *key; + unsigned worker_cnt; + struct worker *worker; + + pj_ioqueue_op_key_t *send_key; + unsigned send_count, send_index; + + pj_bool_t quitting; + void *user_data; +}; + + +static void on_read_complete(pj_ioqueue_key_t *key, + pj_ioqueue_op_key_t *op_key, + pj_ssize_t bytes_read); + +/* + * Create STUN usage. + */ +PJ_DEF(pj_status_t) pj_stun_usage_create( pj_stun_server *srv, + const char *name, + const pj_stun_usage_cb *cb, + int family, + int type, + int protocol, + const pj_sockaddr_t *local_addr, + int addr_len, + pj_stun_usage **p_usage) +{ + pj_stun_server_info *si; + pj_pool_t *pool; + pj_stun_usage *usage; + pj_ioqueue_callback ioqueue_cb; + unsigned i; + pj_status_t status; + + si = pj_stun_server_get_info(srv); + + pool = pj_pool_create(si->pf, name, 4000, 4000, NULL); + usage = PJ_POOL_ZALLOC_T(pool, pj_stun_usage); + usage->pool = pool; + usage->srv = srv; + + status = pj_mutex_create_simple(pool, name, &usage->mutex); + if (status != PJ_SUCCESS) + goto on_error; + + usage->type = type; + status = pj_sock_socket(family, type, protocol, &usage->sock); + if (status != PJ_SUCCESS) + goto on_error; + + status = pj_sock_bind(usage->sock, local_addr, addr_len); + if (status != PJ_SUCCESS) + goto on_error; + + pj_bzero(&ioqueue_cb, sizeof(ioqueue_cb)); + ioqueue_cb.on_read_complete = &on_read_complete; + status = pj_ioqueue_register_sock(usage->pool, si->ioqueue, usage->sock, + usage, &ioqueue_cb, &usage->key); + if (status != PJ_SUCCESS) + goto on_error; + + usage->worker_cnt = si->thread_cnt; + usage->worker = pj_pool_calloc(pool, si->thread_cnt, + sizeof(struct worker)); + for (i=0; i<si->thread_cnt; ++i) { + pj_ioqueue_op_key_init(&usage->worker[i].read_key, + sizeof(usage->worker[i].read_key)); + usage->worker[i].index = i; + } + + usage->send_count = usage->worker_cnt * 2; + usage->send_key = pj_pool_calloc(pool, usage->send_count, + sizeof(pj_ioqueue_op_key_t)); + for (i=0; i<usage->send_count; ++i) { + pj_ioqueue_op_key_init(&usage->send_key[i], + sizeof(usage->send_key[i])); + } + + for (i=0; i<si->thread_cnt; ++i) { + pj_ssize_t size; + + size = sizeof(usage->worker[i].readbuf); + usage->worker[i].src_addr_len = sizeof(usage->worker[i].src_addr); + status = pj_ioqueue_recvfrom(usage->key, &usage->worker[i].read_key, + usage->worker[i].readbuf, &size, + PJ_IOQUEUE_ALWAYS_ASYNC, + &usage->worker[i].src_addr, + &usage->worker[i].src_addr_len); + if (status != PJ_EPENDING) + goto on_error; + } + + pj_stun_server_register_usage(srv, usage); + + /* Only after everything has been initialized we copy the callback, + * to prevent callback from being called when we encounter error + * during initialiation (decendant would not expect this). + */ + pj_memcpy(&usage->cb, cb, sizeof(*cb)); + + *p_usage = usage; + return PJ_SUCCESS; + +on_error: + pj_stun_usage_destroy(usage); + return status; +} + + +/** + * Destroy usage. + */ +PJ_DEF(pj_status_t) pj_stun_usage_destroy(pj_stun_usage *usage) +{ + pj_stun_server_unregister_usage(usage->srv, usage); + if (usage->cb.on_destroy) + (*usage->cb.on_destroy)(usage); + + if (usage->key) { + pj_ioqueue_unregister(usage->key); + usage->key = NULL; + usage->sock = PJ_INVALID_SOCKET; + } else if (usage->sock != 0 && usage->sock != PJ_INVALID_SOCKET) { + pj_sock_close(usage->sock); + usage->sock = PJ_INVALID_SOCKET; + } + + if (usage->mutex) { + pj_mutex_destroy(usage->mutex); + usage->mutex = NULL; + } + + if (usage->pool) { + pj_pool_t *pool = usage->pool; + usage->pool = NULL; + pj_pool_release(pool); + } + + return PJ_SUCCESS; +} + + +/** + * Set user data. + */ +PJ_DEF(pj_status_t) pj_stun_usage_set_user_data( pj_stun_usage *usage, + void *user_data) +{ + usage->user_data = user_data; + return PJ_SUCCESS; +} + +/** + * Get user data. + */ +PJ_DEF(void*) pj_stun_usage_get_user_data(pj_stun_usage *usage) +{ + return usage->user_data; +} + + +/** + * Send with the usage. + */ +PJ_DEF(pj_status_t) pj_stun_usage_sendto( pj_stun_usage *usage, + const void *pkt, + pj_size_t pkt_size, + unsigned flags, + const pj_sockaddr_t *dst_addr, + unsigned addr_len) +{ + pj_ssize_t size = pkt_size; + unsigned i, count = usage->send_count, index; + + pj_mutex_lock(usage->mutex); + for (i=0, ++usage->send_index; i<count; ++i, ++usage->send_index) { + if (usage->send_index >= usage->send_count) + usage->send_index = 0; + + if (pj_ioqueue_is_pending(usage->key, &usage->send_key[usage->send_index])==0) { + break; + } + } + + if (i==count) { + pj_mutex_unlock(usage->mutex); + return PJ_EBUSY; + } + + index = usage->send_index; + pj_mutex_unlock(usage->mutex); + + return pj_ioqueue_sendto(usage->key, &usage->send_key[index], + pkt, &size, flags, + dst_addr, addr_len); +} + + +static void on_read_complete(pj_ioqueue_key_t *key, + pj_ioqueue_op_key_t *op_key, + pj_ssize_t bytes_read) +{ + enum { MAX_LOOP = 10 }; + pj_stun_usage *usage = pj_ioqueue_get_user_data(key); + struct worker *worker = (struct worker*) op_key; + unsigned count; + pj_status_t status; + + for (count=0; !usage->quitting; ++count) { + unsigned flags; + + if (bytes_read > 0) { + (*usage->cb.on_rx_data)(usage, worker->readbuf, bytes_read, + &worker->src_addr, worker->src_addr_len); + } else if (bytes_read < 0) { + pj_stun_perror(usage->pool->obj_name, "recv() error", -bytes_read); + } + + if (usage->quitting) + break; + + bytes_read = sizeof(worker->readbuf); + flags = (count >= MAX_LOOP) ? PJ_IOQUEUE_ALWAYS_ASYNC : 0; + worker->src_addr_len = sizeof(worker->src_addr); + status = pj_ioqueue_recvfrom(usage->key, &worker->read_key, + worker->readbuf, &bytes_read, flags, + &worker->src_addr, &worker->src_addr_len); + if (status == PJ_EPENDING) + break; + } +} + |