summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorBenny Prijono <bennylp@teluu.com>2005-11-01 21:46:17 +0000
committerBenny Prijono <bennylp@teluu.com>2005-11-01 21:46:17 +0000
commit2659094addd65fee3d26bc2fe21f4b42d825bdfb (patch)
tree373c7a58cefb6d06b9bdcb842628c3cfec2fafbe
parent7520eb2058c85109a0b137b0d0f0487a149c9a0c (diff)
Changed atomic interface and fixed bugs in echo test client
git-svn-id: http://svn.pjsip.org/repos/pjproject/main@5 74dad513-b988-da41-8d7b-12977e46ad98
-rw-r--r--pjlib/include/pj/os.h21
-rw-r--r--pjlib/src/pj/os_core_linux_kernel.c17
-rw-r--r--pjlib/src/pj/os_core_unix.c41
-rw-r--r--pjlib/src/pj/os_core_win32.c26
-rw-r--r--pjlib/src/pjlib-test/echo_clt.c130
-rw-r--r--pjlib/src/pjlib-test/test.h2
6 files changed, 105 insertions, 132 deletions
diff --git a/pjlib/include/pj/os.h b/pjlib/include/pj/os.h
index 7b91b7ac..536992f2 100644
--- a/pjlib/include/pj/os.h
+++ b/pjlib/include/pj/os.h
@@ -304,11 +304,9 @@ PJ_DECL(pj_status_t) pj_atomic_destroy( pj_atomic_t *atomic_var );
*
* @param atomic_var the atomic variable.
* @param value value to be set to the variable.
- *
- * @return the previous value of the variable.
*/
-PJ_DECL(pj_atomic_value_t) pj_atomic_set(pj_atomic_t *atomic_var,
- pj_atomic_value_t value);
+PJ_DECL(void) pj_atomic_set( pj_atomic_t *atomic_var,
+ pj_atomic_value_t value);
/**
* Get the value of an atomic type.
@@ -323,19 +321,24 @@ PJ_DECL(pj_atomic_value_t) pj_atomic_get(pj_atomic_t *atomic_var);
* Increment the value of an atomic type.
*
* @param atomic_var the atomic variable.
- *
- * @return the result.
*/
-PJ_DECL(pj_atomic_value_t) pj_atomic_inc(pj_atomic_t *atomic_var);
+PJ_DECL(void) pj_atomic_inc(pj_atomic_t *atomic_var);
/**
* Decrement the value of an atomic type.
*
* @param atomic_var the atomic variable.
+ */
+PJ_DECL(void) pj_atomic_dec(pj_atomic_t *atomic_var);
+
+/**
+ * Add a value to an atomic type.
*
- * @return the result.
+ * @param atomic_var The atomic variable.
+ * @param value Value to be added.
*/
-PJ_DECL(pj_atomic_value_t) pj_atomic_dec(pj_atomic_t *atomic_var);
+PJ_DECL(void) pj_atomic_add( pj_atomic_t *atomic_var,
+ pj_atomic_value_t value);
/**
* @}
diff --git a/pjlib/src/pj/os_core_linux_kernel.c b/pjlib/src/pj/os_core_linux_kernel.c
index 01d06ab2..14337087 100644
--- a/pjlib/src/pj/os_core_linux_kernel.c
+++ b/pjlib/src/pj/os_core_linux_kernel.c
@@ -415,15 +415,12 @@ PJ_DEF(pj_status_t) pj_atomic_create( pj_pool_t *pool,
PJ_DEF(pj_status_t) pj_atomic_destroy( pj_atomic_t *var )
{
- return 0;
+ return PJ_SUCCESS;
}
-PJ_DEF(pj_atomic_value_t) pj_atomic_set(pj_atomic_t *var,
- pj_atomic_value_t value)
+PJ_DEF(void) pj_atomic_set(pj_atomic_t *var, pj_atomic_value_t value)
{
- pj_atomic_value_t oldval = atomic_read(&var->atom);
atomic_set(&var->atom, value);
- return oldval;
}
PJ_DEF(pj_atomic_value_t) pj_atomic_get(pj_atomic_t *var)
@@ -431,18 +428,20 @@ PJ_DEF(pj_atomic_value_t) pj_atomic_get(pj_atomic_t *var)
return atomic_read(&var->atom);
}
-PJ_DEF(pj_atomic_value_t) pj_atomic_inc(pj_atomic_t *var)
+PJ_DEF(void) pj_atomic_inc(pj_atomic_t *var)
{
atomic_inc(&var->atom);
- return atomic_read(&var->atom);
}
-PJ_DEF(pj_atomic_value_t) pj_atomic_dec(pj_atomic_t *var)
+PJ_DEF(void) pj_atomic_dec(pj_atomic_t *var)
{
atomic_dec(&var->atom);
- return atomic_read(&var->atom);
}
+PJ_DEF(void) pj_atomic_add( pj_atomic_t *var, pj_atomic_value_t value )
+{
+ atomic_add(value, &var->atom);
+}
///////////////////////////////////////////////////////////////////////////////
diff --git a/pjlib/src/pj/os_core_unix.c b/pjlib/src/pj/os_core_unix.c
index 4b0de399..18f4ded2 100644
--- a/pjlib/src/pj/os_core_unix.c
+++ b/pjlib/src/pj/os_core_unix.c
@@ -526,23 +526,17 @@ PJ_DEF(pj_status_t) pj_atomic_destroy( pj_atomic_t *atomic_var )
/*
* pj_atomic_set()
*/
-PJ_DEF(pj_atomic_value_t) pj_atomic_set(pj_atomic_t *atomic_var,
- pj_atomic_value_t value)
+PJ_DEF(void) pj_atomic_set(pj_atomic_t *atomic_var, pj_atomic_value_t value)
{
- pj_atomic_value_t oldval;
-
PJ_CHECK_STACK();
- PJ_ASSERT_RETURN(atomic_var, 0);
#if PJ_HAS_THREADS
pj_mutex_lock( atomic_var->mutex );
#endif
- oldval = atomic_var->value;
atomic_var->value = value;
#if PJ_HAS_THREADS
pj_mutex_unlock( atomic_var->mutex);
#endif
- return oldval;
}
/*
@@ -553,7 +547,6 @@ PJ_DEF(pj_atomic_value_t) pj_atomic_get(pj_atomic_t *atomic_var)
pj_atomic_value_t oldval;
PJ_CHECK_STACK();
- PJ_ASSERT_RETURN(atomic_var, 0);
#if PJ_HAS_THREADS
pj_mutex_lock( atomic_var->mutex );
@@ -568,41 +561,49 @@ PJ_DEF(pj_atomic_value_t) pj_atomic_get(pj_atomic_t *atomic_var)
/*
* pj_atomic_inc()
*/
-PJ_DEF(pj_atomic_value_t) pj_atomic_inc(pj_atomic_t *atomic_var)
+PJ_DEF(void) pj_atomic_inc(pj_atomic_t *atomic_var)
{
- pj_atomic_value_t newval;
-
PJ_CHECK_STACK();
- PJ_ASSERT_RETURN(atomic_var, 0);
#if PJ_HAS_THREADS
pj_mutex_lock( atomic_var->mutex );
#endif
- newval = ++atomic_var->value;
+ ++atomic_var->value;
#if PJ_HAS_THREADS
pj_mutex_unlock( atomic_var->mutex);
#endif
- return newval;
}
/*
* pj_atomic_dec()
*/
-PJ_DEF(pj_atomic_value_t) pj_atomic_dec(pj_atomic_t *atomic_var)
+PJ_DEF(void) pj_atomic_dec(pj_atomic_t *atomic_var)
{
- pj_atomic_value_t newval;
-
PJ_CHECK_STACK();
- PJ_ASSERT_RETURN(atomic_var, 0);
#if PJ_HAS_THREADS
pj_mutex_lock( atomic_var->mutex );
#endif
- newval = --atomic_var->value;
+ --atomic_var->value;
#if PJ_HAS_THREADS
pj_mutex_unlock( atomic_var->mutex);
#endif
- return newval;
+}
+
+/*
+ * pj_atomic_add()
+ */
+PJ_DEF(void) pj_atomic_add( pj_atomic_t *atomic_var, pj_atomic_value_t value )
+{
+#if PJ_HAS_THREADS
+ pj_mutex_lock(atomic_var->mutex);
+#endif
+
+ atomic_var->value += value;
+
+#if PJ_HAS_THREADS
+ pj_mutex_unlock(atomic_var->mutex);
+#endif
}
diff --git a/pjlib/src/pj/os_core_win32.c b/pjlib/src/pj/os_core_win32.c
index ad69799d..6ba87493 100644
--- a/pjlib/src/pj/os_core_win32.c
+++ b/pjlib/src/pj/os_core_win32.c
@@ -495,18 +495,17 @@ PJ_DEF(pj_status_t) pj_atomic_destroy( pj_atomic_t *var )
/*
* pj_atomic_set()
*/
-PJ_DEF(long) pj_atomic_set(pj_atomic_t *atomic_var, long value)
+PJ_DEF(void) pj_atomic_set( pj_atomic_t *atomic_var, pj_atomic_value_t value)
{
PJ_CHECK_STACK();
- PJ_ASSERT_RETURN(atomic_var, 0);
- return InterlockedExchange(&atomic_var->value, value);
+ InterlockedExchange(&atomic_var->value, value);
}
/*
* pj_atomic_get()
*/
-PJ_DEF(long) pj_atomic_get(pj_atomic_t *atomic_var)
+PJ_DEF(pj_atomic_value_t) pj_atomic_get(pj_atomic_t *atomic_var)
{
PJ_CHECK_STACK();
PJ_ASSERT_RETURN(atomic_var, 0);
@@ -517,13 +516,12 @@ PJ_DEF(long) pj_atomic_get(pj_atomic_t *atomic_var)
/*
* pj_atomic_inc()
*/
-PJ_DEF(long) pj_atomic_inc(pj_atomic_t *atomic_var)
+PJ_DEF(void) pj_atomic_inc(pj_atomic_t *atomic_var)
{
PJ_CHECK_STACK();
- PJ_ASSERT_RETURN(atomic_var, 0);
#if defined(PJ_WIN32_WINNT) && PJ_WIN32_WINNT >= 0x0400
- return InterlockedIncrement(&atomic_var->value);
+ InterlockedIncrement(&atomic_var->value);
#else
# error Fix Me
#endif
@@ -532,19 +530,27 @@ PJ_DEF(long) pj_atomic_inc(pj_atomic_t *atomic_var)
/*
* pj_atomic_dec()
*/
-PJ_DEF(long) pj_atomic_dec(pj_atomic_t *atomic_var)
+PJ_DEF(void) pj_atomic_dec(pj_atomic_t *atomic_var)
{
PJ_CHECK_STACK();
- PJ_ASSERT_RETURN(atomic_var, 0);
#if defined(PJ_WIN32_WINNT) && PJ_WIN32_WINNT >= 0x0400
- return InterlockedDecrement(&atomic_var->value);
+ InterlockedDecrement(&atomic_var->value);
#else
# error Fix me
#endif
}
+/*
+ * pj_atomic_add()
+ */
+PJ_DEF(void) pj_atomic_add( pj_atomic_t *atomic_var,
+ pj_atomic_value_t value )
+{
+ InterlockedExchangeAdd( &atomic_var->value, value );
+}
+
///////////////////////////////////////////////////////////////////////////////
/*
* pj_thread_local_alloc()
diff --git a/pjlib/src/pjlib-test/echo_clt.c b/pjlib/src/pjlib-test/echo_clt.c
index fcc00da7..897e94d6 100644
--- a/pjlib/src/pjlib-test/echo_clt.c
+++ b/pjlib/src/pjlib-test/echo_clt.c
@@ -28,11 +28,7 @@ struct client
int port;
};
-static pj_sem_t *sem;
-static pj_mutex_t *mutex;
-static pj_size_t total_bw;
-static unsigned total_poster;
-static pj_time_val first_report;
+static pj_atomic_t *totalBytes;
#define MSEC_PRINT_DURATION 1000
@@ -61,9 +57,7 @@ static int echo_client_thread(void *arg)
pj_status_t rc;
struct client *client = arg;
pj_status_t last_recv_err = PJ_SUCCESS, last_send_err = PJ_SUCCESS;
-
- pj_time_val last_report, next_report;
- pj_size_t thread_total;
+ unsigned counter = 0;
rc = app_socket(PJ_AF_INET, client->sock_type, 0, -1, &sock);
if (rc != PJ_SUCCESS) {
@@ -89,21 +83,19 @@ static int echo_client_thread(void *arg)
pj_inet_ntoa(addr.sin_addr),
pj_ntohs(addr.sin_port)));
- pj_create_random_string(send_buf, BUF_SIZE);
- thread_total = 0;
+ pj_memset(send_buf, 'A', BUF_SIZE);
+ send_buf[BUF_SIZE-1]='\0';
/* Give other thread chance to initialize themselves! */
- pj_thread_sleep(500);
-
- pj_gettimeofday(&last_report);
- next_report = first_report;
+ pj_thread_sleep(200);
//PJ_LOG(3,("", "...thread %p running", pj_thread_this()));
for (;;) {
int rc;
pj_ssize_t bytes;
- pj_time_val now;
+
+ ++counter;
/* Send a packet. */
bytes = BUF_SIZE;
@@ -147,51 +139,20 @@ static int echo_client_thread(void *arg)
} while (bytes != BUF_SIZE && bytes != 0);
}
- /* Accumulate total received. */
- thread_total = thread_total + bytes;
-
- /* Report current bandwidth on due. */
- pj_gettimeofday(&now);
-
- if (PJ_TIME_VAL_GTE(now, next_report)) {
- pj_uint32_t bw;
- pj_bool_t signal_parent = 0;
- pj_time_val duration;
- pj_uint32_t msec;
-
- duration = now;
- PJ_TIME_VAL_SUB(duration, last_report);
- msec = PJ_TIME_VAL_MSEC(duration);
-
- bw = thread_total * 1000 / msec;
-
- /* Post result to parent */
- pj_mutex_lock(mutex);
- total_bw += bw;
- total_poster++;
- //PJ_LOG(3,("", "...thread %p posting result", pj_thread_this()));
- if (total_poster >= ECHO_CLIENT_MAX_THREADS)
- signal_parent = 1;
- pj_mutex_unlock(mutex);
-
- thread_total = 0;
- last_report = now;
- next_report.sec++;
-
- if (signal_parent) {
- pj_sem_post(sem);
- }
-
- pj_thread_sleep(0);
- }
-
if (bytes == 0)
continue;
if (pj_memcmp(send_buf, recv_buf, BUF_SIZE) != 0) {
- //PJ_LOG(3,("", "...error: buffer has changed!"));
- break;
+ recv_buf[BUF_SIZE-1] = '\0';
+ PJ_LOG(3,("", "...error: buffer %u has changed!\n"
+ "send_buf=%s\n"
+ "recv_buf=%s\n",
+ counter, send_buf, recv_buf));
+ //break;
}
+
+ /* Accumulate total received. */
+ pj_atomic_add(totalBytes, bytes);
}
pj_sock_close(sock);
@@ -205,6 +166,8 @@ int echo_client(int sock_type, const char *server, int port)
pj_status_t rc;
struct client client;
int i;
+ pj_atomic_value_t last_received;
+ pj_timestamp last_report;
client.sock_type = sock_type;
client.server = server;
@@ -212,34 +175,17 @@ int echo_client(int sock_type, const char *server, int port)
pool = pj_pool_create( mem, NULL, 4000, 4000, NULL );
- rc = pj_sem_create(pool, NULL, 0, ECHO_CLIENT_MAX_THREADS+1, &sem);
- if (rc != PJ_SUCCESS) {
- PJ_LOG(3,("", "...error: unable to create semaphore", rc));
- return -10;
- }
-
- rc = pj_mutex_create_simple(pool, NULL, &mutex);
- if (rc != PJ_SUCCESS) {
- PJ_LOG(3,("", "...error: unable to create mutex", rc));
- return -20;
- }
-
- /*
- rc = pj_atomic_create(pool, 0, &atom);
+ rc = pj_atomic_create(pool, 0, &totalBytes);
if (rc != PJ_SUCCESS) {
PJ_LOG(3,("", "...error: unable to create atomic variable", rc));
return -30;
}
- */
PJ_LOG(3,("", "Echo client started"));
PJ_LOG(3,("", " Destination: %s:%d",
ECHO_SERVER_ADDRESS, ECHO_SERVER_START_PORT));
PJ_LOG(3,("", " Press Ctrl-C to exit"));
- pj_gettimeofday(&first_report);
- first_report.sec += 2;
-
for (i=0; i<ECHO_CLIENT_MAX_THREADS; ++i) {
rc = pj_thread_create( pool, NULL, &echo_client_thread, &client,
PJ_THREAD_DEFAULT_STACK_SIZE, 0,
@@ -250,19 +196,37 @@ int echo_client(int sock_type, const char *server, int port)
}
}
- for (;;) {
- pj_uint32_t bw;
+ last_received = 0;
+ pj_get_timestamp(&last_report);
- pj_sem_wait(sem);
-
- pj_mutex_lock(mutex);
- bw = total_bw;
- total_bw = 0;
- total_poster = 0;
- pj_mutex_unlock(mutex);
+ for (;;) {
+ pj_timestamp now;
+ unsigned long received, cur_received;
+ unsigned msec;
+ pj_highprec_t bw;
+ pj_time_val elapsed;
+ unsigned bw32;
+
+ pj_thread_sleep(1000);
+
+ pj_get_timestamp(&now);
+ elapsed = pj_elapsed_time(&last_report, &now);
+ msec = PJ_TIME_VAL_MSEC(elapsed);
+
+ received = pj_atomic_get(totalBytes);
+ cur_received = received - last_received;
+
+ bw = cur_received;
+ pj_highprec_mul(bw, 1000);
+ pj_highprec_div(bw, msec);
+
+ bw32 = (unsigned)bw;
+
+ last_report = now;
+ last_received = received;
PJ_LOG(3,("", "...%d threads, total bandwidth: %d KB/s",
- ECHO_CLIENT_MAX_THREADS, bw/1000));
+ ECHO_CLIENT_MAX_THREADS, bw32/1000));
}
for (i=0; i<ECHO_CLIENT_MAX_THREADS; ++i) {
diff --git a/pjlib/src/pjlib-test/test.h b/pjlib/src/pjlib-test/test.h
index 28c5123c..53ba0e1a 100644
--- a/pjlib/src/pjlib-test/test.h
+++ b/pjlib/src/pjlib-test/test.h
@@ -44,7 +44,7 @@
#define ECHO_SERVER_ADDRESS "compaq.home"
#define ECHO_SERVER_DURATION_MSEC (60*60*1000)
-#define ECHO_CLIENT_MAX_THREADS 2
+#define ECHO_CLIENT_MAX_THREADS 6
PJ_BEGIN_DECL