diff options
author | Benny Prijono <bennylp@teluu.com> | 2005-11-01 21:46:17 +0000 |
---|---|---|
committer | Benny Prijono <bennylp@teluu.com> | 2005-11-01 21:46:17 +0000 |
commit | 2659094addd65fee3d26bc2fe21f4b42d825bdfb (patch) | |
tree | 373c7a58cefb6d06b9bdcb842628c3cfec2fafbe | |
parent | 7520eb2058c85109a0b137b0d0f0487a149c9a0c (diff) |
Changed atomic interface and fixed bugs in echo test client
git-svn-id: http://svn.pjsip.org/repos/pjproject/main@5 74dad513-b988-da41-8d7b-12977e46ad98
-rw-r--r-- | pjlib/include/pj/os.h | 21 | ||||
-rw-r--r-- | pjlib/src/pj/os_core_linux_kernel.c | 17 | ||||
-rw-r--r-- | pjlib/src/pj/os_core_unix.c | 41 | ||||
-rw-r--r-- | pjlib/src/pj/os_core_win32.c | 26 | ||||
-rw-r--r-- | pjlib/src/pjlib-test/echo_clt.c | 130 | ||||
-rw-r--r-- | pjlib/src/pjlib-test/test.h | 2 |
6 files changed, 105 insertions, 132 deletions
diff --git a/pjlib/include/pj/os.h b/pjlib/include/pj/os.h index 7b91b7ac..536992f2 100644 --- a/pjlib/include/pj/os.h +++ b/pjlib/include/pj/os.h @@ -304,11 +304,9 @@ PJ_DECL(pj_status_t) pj_atomic_destroy( pj_atomic_t *atomic_var ); * * @param atomic_var the atomic variable. * @param value value to be set to the variable. - * - * @return the previous value of the variable. */ -PJ_DECL(pj_atomic_value_t) pj_atomic_set(pj_atomic_t *atomic_var, - pj_atomic_value_t value); +PJ_DECL(void) pj_atomic_set( pj_atomic_t *atomic_var, + pj_atomic_value_t value); /** * Get the value of an atomic type. @@ -323,19 +321,24 @@ PJ_DECL(pj_atomic_value_t) pj_atomic_get(pj_atomic_t *atomic_var); * Increment the value of an atomic type. * * @param atomic_var the atomic variable. - * - * @return the result. */ -PJ_DECL(pj_atomic_value_t) pj_atomic_inc(pj_atomic_t *atomic_var); +PJ_DECL(void) pj_atomic_inc(pj_atomic_t *atomic_var); /** * Decrement the value of an atomic type. * * @param atomic_var the atomic variable. + */ +PJ_DECL(void) pj_atomic_dec(pj_atomic_t *atomic_var); + +/** + * Add a value to an atomic type. * - * @return the result. + * @param atomic_var The atomic variable. + * @param value Value to be added. */ -PJ_DECL(pj_atomic_value_t) pj_atomic_dec(pj_atomic_t *atomic_var); +PJ_DECL(void) pj_atomic_add( pj_atomic_t *atomic_var, + pj_atomic_value_t value); /** * @} diff --git a/pjlib/src/pj/os_core_linux_kernel.c b/pjlib/src/pj/os_core_linux_kernel.c index 01d06ab2..14337087 100644 --- a/pjlib/src/pj/os_core_linux_kernel.c +++ b/pjlib/src/pj/os_core_linux_kernel.c @@ -415,15 +415,12 @@ PJ_DEF(pj_status_t) pj_atomic_create( pj_pool_t *pool, PJ_DEF(pj_status_t) pj_atomic_destroy( pj_atomic_t *var ) { - return 0; + return PJ_SUCCESS; } -PJ_DEF(pj_atomic_value_t) pj_atomic_set(pj_atomic_t *var, - pj_atomic_value_t value) +PJ_DEF(void) pj_atomic_set(pj_atomic_t *var, pj_atomic_value_t value) { - pj_atomic_value_t oldval = atomic_read(&var->atom); atomic_set(&var->atom, value); - return oldval; } PJ_DEF(pj_atomic_value_t) pj_atomic_get(pj_atomic_t *var) @@ -431,18 +428,20 @@ PJ_DEF(pj_atomic_value_t) pj_atomic_get(pj_atomic_t *var) return atomic_read(&var->atom); } -PJ_DEF(pj_atomic_value_t) pj_atomic_inc(pj_atomic_t *var) +PJ_DEF(void) pj_atomic_inc(pj_atomic_t *var) { atomic_inc(&var->atom); - return atomic_read(&var->atom); } -PJ_DEF(pj_atomic_value_t) pj_atomic_dec(pj_atomic_t *var) +PJ_DEF(void) pj_atomic_dec(pj_atomic_t *var) { atomic_dec(&var->atom); - return atomic_read(&var->atom); } +PJ_DEF(void) pj_atomic_add( pj_atomic_t *var, pj_atomic_value_t value ) +{ + atomic_add(value, &var->atom); +} /////////////////////////////////////////////////////////////////////////////// diff --git a/pjlib/src/pj/os_core_unix.c b/pjlib/src/pj/os_core_unix.c index 4b0de399..18f4ded2 100644 --- a/pjlib/src/pj/os_core_unix.c +++ b/pjlib/src/pj/os_core_unix.c @@ -526,23 +526,17 @@ PJ_DEF(pj_status_t) pj_atomic_destroy( pj_atomic_t *atomic_var ) /* * pj_atomic_set() */ -PJ_DEF(pj_atomic_value_t) pj_atomic_set(pj_atomic_t *atomic_var, - pj_atomic_value_t value) +PJ_DEF(void) pj_atomic_set(pj_atomic_t *atomic_var, pj_atomic_value_t value) { - pj_atomic_value_t oldval; - PJ_CHECK_STACK(); - PJ_ASSERT_RETURN(atomic_var, 0); #if PJ_HAS_THREADS pj_mutex_lock( atomic_var->mutex ); #endif - oldval = atomic_var->value; atomic_var->value = value; #if PJ_HAS_THREADS pj_mutex_unlock( atomic_var->mutex); #endif - return oldval; } /* @@ -553,7 +547,6 @@ PJ_DEF(pj_atomic_value_t) pj_atomic_get(pj_atomic_t *atomic_var) pj_atomic_value_t oldval; PJ_CHECK_STACK(); - PJ_ASSERT_RETURN(atomic_var, 0); #if PJ_HAS_THREADS pj_mutex_lock( atomic_var->mutex ); @@ -568,41 +561,49 @@ PJ_DEF(pj_atomic_value_t) pj_atomic_get(pj_atomic_t *atomic_var) /* * pj_atomic_inc() */ -PJ_DEF(pj_atomic_value_t) pj_atomic_inc(pj_atomic_t *atomic_var) +PJ_DEF(void) pj_atomic_inc(pj_atomic_t *atomic_var) { - pj_atomic_value_t newval; - PJ_CHECK_STACK(); - PJ_ASSERT_RETURN(atomic_var, 0); #if PJ_HAS_THREADS pj_mutex_lock( atomic_var->mutex ); #endif - newval = ++atomic_var->value; + ++atomic_var->value; #if PJ_HAS_THREADS pj_mutex_unlock( atomic_var->mutex); #endif - return newval; } /* * pj_atomic_dec() */ -PJ_DEF(pj_atomic_value_t) pj_atomic_dec(pj_atomic_t *atomic_var) +PJ_DEF(void) pj_atomic_dec(pj_atomic_t *atomic_var) { - pj_atomic_value_t newval; - PJ_CHECK_STACK(); - PJ_ASSERT_RETURN(atomic_var, 0); #if PJ_HAS_THREADS pj_mutex_lock( atomic_var->mutex ); #endif - newval = --atomic_var->value; + --atomic_var->value; #if PJ_HAS_THREADS pj_mutex_unlock( atomic_var->mutex); #endif - return newval; +} + +/* + * pj_atomic_add() + */ +PJ_DEF(void) pj_atomic_add( pj_atomic_t *atomic_var, pj_atomic_value_t value ) +{ +#if PJ_HAS_THREADS + pj_mutex_lock(atomic_var->mutex); +#endif + + atomic_var->value += value; + +#if PJ_HAS_THREADS + pj_mutex_unlock(atomic_var->mutex); +#endif } diff --git a/pjlib/src/pj/os_core_win32.c b/pjlib/src/pj/os_core_win32.c index ad69799d..6ba87493 100644 --- a/pjlib/src/pj/os_core_win32.c +++ b/pjlib/src/pj/os_core_win32.c @@ -495,18 +495,17 @@ PJ_DEF(pj_status_t) pj_atomic_destroy( pj_atomic_t *var ) /* * pj_atomic_set() */ -PJ_DEF(long) pj_atomic_set(pj_atomic_t *atomic_var, long value) +PJ_DEF(void) pj_atomic_set( pj_atomic_t *atomic_var, pj_atomic_value_t value) { PJ_CHECK_STACK(); - PJ_ASSERT_RETURN(atomic_var, 0); - return InterlockedExchange(&atomic_var->value, value); + InterlockedExchange(&atomic_var->value, value); } /* * pj_atomic_get() */ -PJ_DEF(long) pj_atomic_get(pj_atomic_t *atomic_var) +PJ_DEF(pj_atomic_value_t) pj_atomic_get(pj_atomic_t *atomic_var) { PJ_CHECK_STACK(); PJ_ASSERT_RETURN(atomic_var, 0); @@ -517,13 +516,12 @@ PJ_DEF(long) pj_atomic_get(pj_atomic_t *atomic_var) /* * pj_atomic_inc() */ -PJ_DEF(long) pj_atomic_inc(pj_atomic_t *atomic_var) +PJ_DEF(void) pj_atomic_inc(pj_atomic_t *atomic_var) { PJ_CHECK_STACK(); - PJ_ASSERT_RETURN(atomic_var, 0); #if defined(PJ_WIN32_WINNT) && PJ_WIN32_WINNT >= 0x0400 - return InterlockedIncrement(&atomic_var->value); + InterlockedIncrement(&atomic_var->value); #else # error Fix Me #endif @@ -532,19 +530,27 @@ PJ_DEF(long) pj_atomic_inc(pj_atomic_t *atomic_var) /* * pj_atomic_dec() */ -PJ_DEF(long) pj_atomic_dec(pj_atomic_t *atomic_var) +PJ_DEF(void) pj_atomic_dec(pj_atomic_t *atomic_var) { PJ_CHECK_STACK(); - PJ_ASSERT_RETURN(atomic_var, 0); #if defined(PJ_WIN32_WINNT) && PJ_WIN32_WINNT >= 0x0400 - return InterlockedDecrement(&atomic_var->value); + InterlockedDecrement(&atomic_var->value); #else # error Fix me #endif } +/* + * pj_atomic_add() + */ +PJ_DEF(void) pj_atomic_add( pj_atomic_t *atomic_var, + pj_atomic_value_t value ) +{ + InterlockedExchangeAdd( &atomic_var->value, value ); +} + /////////////////////////////////////////////////////////////////////////////// /* * pj_thread_local_alloc() diff --git a/pjlib/src/pjlib-test/echo_clt.c b/pjlib/src/pjlib-test/echo_clt.c index fcc00da7..897e94d6 100644 --- a/pjlib/src/pjlib-test/echo_clt.c +++ b/pjlib/src/pjlib-test/echo_clt.c @@ -28,11 +28,7 @@ struct client int port; }; -static pj_sem_t *sem; -static pj_mutex_t *mutex; -static pj_size_t total_bw; -static unsigned total_poster; -static pj_time_val first_report; +static pj_atomic_t *totalBytes; #define MSEC_PRINT_DURATION 1000 @@ -61,9 +57,7 @@ static int echo_client_thread(void *arg) pj_status_t rc; struct client *client = arg; pj_status_t last_recv_err = PJ_SUCCESS, last_send_err = PJ_SUCCESS; - - pj_time_val last_report, next_report; - pj_size_t thread_total; + unsigned counter = 0; rc = app_socket(PJ_AF_INET, client->sock_type, 0, -1, &sock); if (rc != PJ_SUCCESS) { @@ -89,21 +83,19 @@ static int echo_client_thread(void *arg) pj_inet_ntoa(addr.sin_addr), pj_ntohs(addr.sin_port))); - pj_create_random_string(send_buf, BUF_SIZE); - thread_total = 0; + pj_memset(send_buf, 'A', BUF_SIZE); + send_buf[BUF_SIZE-1]='\0'; /* Give other thread chance to initialize themselves! */ - pj_thread_sleep(500); - - pj_gettimeofday(&last_report); - next_report = first_report; + pj_thread_sleep(200); //PJ_LOG(3,("", "...thread %p running", pj_thread_this())); for (;;) { int rc; pj_ssize_t bytes; - pj_time_val now; + + ++counter; /* Send a packet. */ bytes = BUF_SIZE; @@ -147,51 +139,20 @@ static int echo_client_thread(void *arg) } while (bytes != BUF_SIZE && bytes != 0); } - /* Accumulate total received. */ - thread_total = thread_total + bytes; - - /* Report current bandwidth on due. */ - pj_gettimeofday(&now); - - if (PJ_TIME_VAL_GTE(now, next_report)) { - pj_uint32_t bw; - pj_bool_t signal_parent = 0; - pj_time_val duration; - pj_uint32_t msec; - - duration = now; - PJ_TIME_VAL_SUB(duration, last_report); - msec = PJ_TIME_VAL_MSEC(duration); - - bw = thread_total * 1000 / msec; - - /* Post result to parent */ - pj_mutex_lock(mutex); - total_bw += bw; - total_poster++; - //PJ_LOG(3,("", "...thread %p posting result", pj_thread_this())); - if (total_poster >= ECHO_CLIENT_MAX_THREADS) - signal_parent = 1; - pj_mutex_unlock(mutex); - - thread_total = 0; - last_report = now; - next_report.sec++; - - if (signal_parent) { - pj_sem_post(sem); - } - - pj_thread_sleep(0); - } - if (bytes == 0) continue; if (pj_memcmp(send_buf, recv_buf, BUF_SIZE) != 0) { - //PJ_LOG(3,("", "...error: buffer has changed!")); - break; + recv_buf[BUF_SIZE-1] = '\0'; + PJ_LOG(3,("", "...error: buffer %u has changed!\n" + "send_buf=%s\n" + "recv_buf=%s\n", + counter, send_buf, recv_buf)); + //break; } + + /* Accumulate total received. */ + pj_atomic_add(totalBytes, bytes); } pj_sock_close(sock); @@ -205,6 +166,8 @@ int echo_client(int sock_type, const char *server, int port) pj_status_t rc; struct client client; int i; + pj_atomic_value_t last_received; + pj_timestamp last_report; client.sock_type = sock_type; client.server = server; @@ -212,34 +175,17 @@ int echo_client(int sock_type, const char *server, int port) pool = pj_pool_create( mem, NULL, 4000, 4000, NULL ); - rc = pj_sem_create(pool, NULL, 0, ECHO_CLIENT_MAX_THREADS+1, &sem); - if (rc != PJ_SUCCESS) { - PJ_LOG(3,("", "...error: unable to create semaphore", rc)); - return -10; - } - - rc = pj_mutex_create_simple(pool, NULL, &mutex); - if (rc != PJ_SUCCESS) { - PJ_LOG(3,("", "...error: unable to create mutex", rc)); - return -20; - } - - /* - rc = pj_atomic_create(pool, 0, &atom); + rc = pj_atomic_create(pool, 0, &totalBytes); if (rc != PJ_SUCCESS) { PJ_LOG(3,("", "...error: unable to create atomic variable", rc)); return -30; } - */ PJ_LOG(3,("", "Echo client started")); PJ_LOG(3,("", " Destination: %s:%d", ECHO_SERVER_ADDRESS, ECHO_SERVER_START_PORT)); PJ_LOG(3,("", " Press Ctrl-C to exit")); - pj_gettimeofday(&first_report); - first_report.sec += 2; - for (i=0; i<ECHO_CLIENT_MAX_THREADS; ++i) { rc = pj_thread_create( pool, NULL, &echo_client_thread, &client, PJ_THREAD_DEFAULT_STACK_SIZE, 0, @@ -250,19 +196,37 @@ int echo_client(int sock_type, const char *server, int port) } } - for (;;) { - pj_uint32_t bw; + last_received = 0; + pj_get_timestamp(&last_report); - pj_sem_wait(sem); - - pj_mutex_lock(mutex); - bw = total_bw; - total_bw = 0; - total_poster = 0; - pj_mutex_unlock(mutex); + for (;;) { + pj_timestamp now; + unsigned long received, cur_received; + unsigned msec; + pj_highprec_t bw; + pj_time_val elapsed; + unsigned bw32; + + pj_thread_sleep(1000); + + pj_get_timestamp(&now); + elapsed = pj_elapsed_time(&last_report, &now); + msec = PJ_TIME_VAL_MSEC(elapsed); + + received = pj_atomic_get(totalBytes); + cur_received = received - last_received; + + bw = cur_received; + pj_highprec_mul(bw, 1000); + pj_highprec_div(bw, msec); + + bw32 = (unsigned)bw; + + last_report = now; + last_received = received; PJ_LOG(3,("", "...%d threads, total bandwidth: %d KB/s", - ECHO_CLIENT_MAX_THREADS, bw/1000)); + ECHO_CLIENT_MAX_THREADS, bw32/1000)); } for (i=0; i<ECHO_CLIENT_MAX_THREADS; ++i) { diff --git a/pjlib/src/pjlib-test/test.h b/pjlib/src/pjlib-test/test.h index 28c5123c..53ba0e1a 100644 --- a/pjlib/src/pjlib-test/test.h +++ b/pjlib/src/pjlib-test/test.h @@ -44,7 +44,7 @@ #define ECHO_SERVER_ADDRESS "compaq.home" #define ECHO_SERVER_DURATION_MSEC (60*60*1000) -#define ECHO_CLIENT_MAX_THREADS 2 +#define ECHO_CLIENT_MAX_THREADS 6 PJ_BEGIN_DECL |