summaryrefslogtreecommitdiff
path: root/pjnath/src/pjnath-test/concur_test.c
diff options
context:
space:
mode:
Diffstat (limited to 'pjnath/src/pjnath-test/concur_test.c')
-rw-r--r--pjnath/src/pjnath-test/concur_test.c367
1 files changed, 367 insertions, 0 deletions
diff --git a/pjnath/src/pjnath-test/concur_test.c b/pjnath/src/pjnath-test/concur_test.c
new file mode 100644
index 0000000..bf54e94
--- /dev/null
+++ b/pjnath/src/pjnath-test/concur_test.c
@@ -0,0 +1,367 @@
+/* $Id: concur_test.c 4412 2013-03-05 03:12:32Z riza $ */
+/*
+ * Copyright (C) 2008-2011 Teluu Inc. (http://www.teluu.com)
+ * Copyright (C) 2003-2008 Benny Prijono <benny@prijono.org>
+ *
+ * This program is free software; you can redistribute it and/or modify
+ * it under the terms of the GNU General Public License as published by
+ * the Free Software Foundation; either version 2 of the License, or
+ * (at your option) any later version.
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ * GNU General Public License for more details.
+ *
+ * You should have received a copy of the GNU General Public License
+ * along with this program; if not, write to the Free Software
+ * Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
+ */
+#include "test.h"
+
+#if INCLUDE_CONCUR_TEST
+
+#define THIS_FILE "concur_test.c"
+
+/****************************************************************************/
+#define WORKER_THREAD_CNT 4
+#define SERVER_THREAD_CNT 4
+#define MAX_SOCK_CLIENTS 80
+
+struct stun_test_session
+{
+ pj_stun_config stun_cfg;
+
+ pj_lock_t *lock;
+
+ pj_thread_t *worker_threads[WORKER_THREAD_CNT];
+
+ pj_sock_t server_sock;
+ int server_port;
+ pj_thread_t *server_threads[SERVER_THREAD_CNT];
+ pj_event_t *server_event;
+
+ pj_bool_t thread_quit_flag;
+
+ /* Test parameters: */
+ struct {
+ int client_got_response;
+
+ pj_bool_t server_wait_for_event;
+ pj_bool_t server_drop_request;
+ int client_sleep_after_start;
+ int client_sleep_before_destroy;
+ } param;
+};
+
+static int server_thread_proc(void *p)
+{
+ struct stun_test_session *test_sess = (struct stun_test_session*)p;
+ pj_pool_t *pool;
+ pj_status_t status;
+
+ PJ_LOG(4,(THIS_FILE, "Server thread running"));
+
+ pool = pj_pool_create(test_sess->stun_cfg.pf, "server", 512, 512, NULL);
+
+ while (!test_sess->thread_quit_flag) {
+ pj_time_val timeout = {0, 10};
+ pj_fd_set_t rdset;
+ int n;
+
+ /* Serve client */
+ PJ_FD_ZERO(&rdset);
+ PJ_FD_SET(test_sess->server_sock, &rdset);
+ n = pj_sock_select(test_sess->server_sock+1, &rdset,
+ NULL, NULL, &timeout);
+ if (n==1 && PJ_FD_ISSET(test_sess->server_sock, &rdset)) {
+ pj_uint8_t pkt[512];
+ pj_ssize_t pkt_len;
+ pj_size_t res_len;
+ pj_sockaddr client_addr;
+ int addr_len;
+
+ pj_stun_msg *stun_req, *stun_res;
+
+ pj_pool_reset(pool);
+
+ /* Got query */
+ pkt_len = sizeof(pkt);
+ addr_len = sizeof(client_addr);
+ status = pj_sock_recvfrom(test_sess->server_sock, pkt, &pkt_len,
+ 0, &client_addr, &addr_len);
+ if (status != PJ_SUCCESS) {
+ continue;
+ }
+
+ status = pj_stun_msg_decode(pool, pkt, pkt_len,
+ PJ_STUN_IS_DATAGRAM,
+ &stun_req, NULL, NULL);
+ if (status != PJ_SUCCESS) {
+ PJ_PERROR(1,(THIS_FILE, status, "STUN request decode error"));
+ continue;
+ }
+
+ status = pj_stun_msg_create_response(pool, stun_req,
+ PJ_STUN_SC_BAD_REQUEST, NULL,
+ &stun_res);
+ if (status != PJ_SUCCESS) {
+ PJ_PERROR(1,(THIS_FILE, status, "STUN create response error"));
+ continue;
+ }
+
+ status = pj_stun_msg_encode(stun_res, pkt, sizeof(pkt), 0,
+ NULL, &res_len);
+ if (status != PJ_SUCCESS) {
+ PJ_PERROR(1,(THIS_FILE, status, "STUN encode error"));
+ continue;
+ }
+
+ /* Ignore request */
+ if (test_sess->param.server_drop_request)
+ continue;
+
+ /* Wait for signal to continue */
+ if (test_sess->param.server_wait_for_event)
+ pj_event_wait(test_sess->server_event);
+
+ pkt_len = res_len;
+ pj_sock_sendto(test_sess->server_sock, pkt, &pkt_len, 0,
+ &client_addr, pj_sockaddr_get_len(&client_addr));
+ }
+ }
+
+ pj_pool_release(pool);
+
+ PJ_LOG(4,(THIS_FILE, "Server thread quitting"));
+ return 0;
+}
+
+static int worker_thread_proc(void *p)
+{
+ struct stun_test_session *test_sess = (struct stun_test_session*)p;
+
+ PJ_LOG(4,(THIS_FILE, "Worker thread running"));
+
+ while (!test_sess->thread_quit_flag) {
+ pj_time_val timeout = {0, 10};
+ pj_timer_heap_poll(test_sess->stun_cfg.timer_heap, NULL);
+ pj_ioqueue_poll(test_sess->stun_cfg.ioqueue, &timeout);
+ }
+
+ PJ_LOG(4,(THIS_FILE, "Worker thread quitting"));
+ return 0;
+}
+
+static pj_bool_t stun_sock_on_status(pj_stun_sock *stun_sock,
+ pj_stun_sock_op op,
+ pj_status_t status)
+{
+ struct stun_test_session *test_sess = (struct stun_test_session*)pj_stun_sock_get_user_data(stun_sock);
+
+ PJ_UNUSED_ARG(op);
+ PJ_UNUSED_ARG(status);
+
+ test_sess->param.client_got_response++;
+ return PJ_TRUE;
+}
+
+static int stun_destroy_test_session(struct stun_test_session *test_sess)
+{
+
+ unsigned i;
+ pj_stun_sock_cb stun_cb;
+ pj_status_t status;
+ pj_stun_sock *stun_sock[MAX_SOCK_CLIENTS];
+
+ pj_bzero(&stun_cb, sizeof(stun_cb));
+ stun_cb.on_status = &stun_sock_on_status;
+
+ pj_event_reset(test_sess->server_event);
+
+ /* Create all clients first */
+ for (i=0; i<MAX_SOCK_CLIENTS; ++i) {
+ char name[10];
+ sprintf(name, "stun%02d", i);
+ status = pj_stun_sock_create(&test_sess->stun_cfg, name, pj_AF_INET(),
+ &stun_cb, NULL, test_sess,
+ &stun_sock[i]);
+ if (status != PJ_SUCCESS) {
+ PJ_PERROR(1,(THIS_FILE, status, "Error creating stun socket"));
+ return -10;
+ }
+ }
+
+ /* Start resolution */
+ for (i=0; i<MAX_SOCK_CLIENTS; ++i) {
+ pj_str_t server_ip = pj_str("127.0.0.1");
+ status = pj_stun_sock_start(stun_sock[i], &server_ip,
+ (pj_uint16_t)test_sess->server_port, NULL);
+ if (status != PJ_SUCCESS) {
+ PJ_PERROR(1,(THIS_FILE, status, "Error starting stun socket"));
+ return -20;
+ }
+ }
+
+ /* settle down */
+ pj_thread_sleep(test_sess->param.client_sleep_after_start);
+
+ /* Resume server threads */
+ pj_event_set(test_sess->server_event);
+
+ pj_thread_sleep(test_sess->param.client_sleep_before_destroy);
+
+ /* Destroy clients */
+ for (i=0; i<MAX_SOCK_CLIENTS; ++i) {
+ status = pj_stun_sock_destroy(stun_sock[i]);
+ if (status != PJ_SUCCESS) {
+ PJ_PERROR(1,(THIS_FILE, status, "Error destroying stun socket"));
+ }
+ }
+
+ return 0;
+}
+
+static int stun_destroy_test(void)
+{
+ enum { LOOP = 500 };
+ struct stun_test_session test_sess;
+ pj_sockaddr bind_addr;
+ int addr_len;
+ pj_caching_pool cp;
+ pj_pool_t *pool;
+ unsigned i;
+ pj_status_t status;
+ int rc = 0;
+
+ PJ_LOG(3,(THIS_FILE, " STUN destroy concurrency test"));
+
+ pj_bzero(&test_sess, sizeof(test_sess));
+
+ pj_caching_pool_init(&cp, NULL, 0);
+ pool = pj_pool_create(&cp.factory, "testsess", 512, 512, NULL);
+
+ pj_stun_config_init(&test_sess.stun_cfg, &cp.factory, 0, NULL, NULL);
+
+ status = pj_timer_heap_create(pool, 1023, &test_sess.stun_cfg.timer_heap);
+ pj_assert(status == PJ_SUCCESS);
+
+ status = pj_lock_create_recursive_mutex(pool, NULL, &test_sess.lock);
+ pj_assert(status == PJ_SUCCESS);
+
+ pj_timer_heap_set_lock(test_sess.stun_cfg.timer_heap, test_sess.lock, PJ_TRUE);
+ pj_assert(status == PJ_SUCCESS);
+
+ status = pj_ioqueue_create(pool, 512, &test_sess.stun_cfg.ioqueue);
+ pj_assert(status == PJ_SUCCESS);
+
+ pj_sock_socket(pj_AF_INET(), pj_SOCK_DGRAM(), 0, &test_sess.server_sock);
+ pj_sockaddr_init(pj_AF_INET(), &bind_addr, NULL, 0);
+ status = pj_sock_bind(test_sess.server_sock, &bind_addr, pj_sockaddr_get_len(&bind_addr));
+ pj_assert(status == PJ_SUCCESS);
+
+ addr_len = sizeof(bind_addr);
+ status = pj_sock_getsockname(test_sess.server_sock, &bind_addr, &addr_len);
+ pj_assert(status == PJ_SUCCESS);
+
+ test_sess.server_port = pj_sockaddr_get_port(&bind_addr);
+
+ status = pj_event_create(pool, NULL, PJ_TRUE, PJ_FALSE, &test_sess.server_event);
+ pj_assert(status == PJ_SUCCESS);
+
+ for (i=0; i<SERVER_THREAD_CNT; ++i) {
+ status = pj_thread_create(pool, NULL,
+ &server_thread_proc, &test_sess,
+ 0, 0, &test_sess.server_threads[i]);
+ pj_assert(status == PJ_SUCCESS);
+ }
+
+ for (i=0; i<WORKER_THREAD_CNT; ++i) {
+ status = pj_thread_create(pool, NULL,
+ &worker_thread_proc, &test_sess,
+ 0, 0, &test_sess.worker_threads[i]);
+ pj_assert(status == PJ_SUCCESS);
+ }
+
+ /* Test 1: Main thread calls destroy while callback is processing response */
+ PJ_LOG(3,(THIS_FILE, " Destroy in main thread while callback is running"));
+ for (i=0; i<LOOP; ++i) {
+ int sleep = pj_rand() % 5;
+
+ PJ_LOG(3,(THIS_FILE, " Try %-3d of %d", i+1, LOOP));
+
+ /* Test 1: destroy at the same time when receiving response */
+ pj_bzero(&test_sess.param, sizeof(test_sess.param));
+ test_sess.param.client_sleep_after_start = 20;
+ test_sess.param.client_sleep_before_destroy = sleep;
+ test_sess.param.server_wait_for_event = PJ_TRUE;
+ stun_destroy_test_session(&test_sess);
+ PJ_LOG(3,(THIS_FILE,
+ " stun test a: sleep delay:%d: clients with response: %d",
+ sleep, test_sess.param.client_got_response));
+
+ /* Test 2: destroy at the same time with STUN retransmit timer */
+ test_sess.param.server_drop_request = PJ_TRUE;
+ test_sess.param.client_sleep_after_start = 0;
+ test_sess.param.client_sleep_before_destroy = PJ_STUN_RTO_VALUE;
+ test_sess.param.server_wait_for_event = PJ_FALSE;
+ stun_destroy_test_session(&test_sess);
+ PJ_LOG(3,(THIS_FILE, " stun test b: retransmit concurrency"));
+
+ /* Test 3: destroy at the same time with receiving response
+ * AND STUN retransmit timer */
+ test_sess.param.client_got_response = 0;
+ test_sess.param.server_drop_request = PJ_FALSE;
+ test_sess.param.client_sleep_after_start = PJ_STUN_RTO_VALUE;
+ test_sess.param.client_sleep_before_destroy = 0;
+ test_sess.param.server_wait_for_event = PJ_TRUE;
+ stun_destroy_test_session(&test_sess);
+ PJ_LOG(3,(THIS_FILE,
+ " stun test c: clients with response: %d",
+ test_sess.param.client_got_response));
+
+ pj_thread_sleep(10);
+
+ ice_one_conc_test(&test_sess.stun_cfg, PJ_FALSE);
+
+ pj_thread_sleep(10);
+ }
+
+ /* Avoid compiler warning */
+ goto on_return;
+
+
+on_return:
+ test_sess.thread_quit_flag = PJ_TRUE;
+
+ for (i=0; i<SERVER_THREAD_CNT; ++i) {
+ pj_thread_join(test_sess.server_threads[i]);
+ }
+
+ for (i=0; i<WORKER_THREAD_CNT; ++i) {
+ pj_thread_join(test_sess.worker_threads[i]);
+ }
+
+ pj_event_destroy(test_sess.server_event);
+ pj_sock_close(test_sess.server_sock);
+ pj_ioqueue_destroy(test_sess.stun_cfg.ioqueue);
+ pj_timer_heap_destroy(test_sess.stun_cfg.timer_heap);
+
+ pj_pool_release(pool);
+ pj_caching_pool_destroy(&cp);
+
+ PJ_LOG(3,(THIS_FILE, " Done. rc=%d", rc));
+ return rc;
+}
+
+
+int concur_test(void)
+{
+ int rc = 0;
+
+ rc += stun_destroy_test();
+
+ return 0;
+}
+
+#endif /* INCLUDE_CONCUR_TEST */