summaryrefslogtreecommitdiff
path: root/pjlib/src/pj
diff options
context:
space:
mode:
authorBenny Prijono <bennylp@teluu.com>2005-11-09 15:37:19 +0000
committerBenny Prijono <bennylp@teluu.com>2005-11-09 15:37:19 +0000
commit6e1024262b48b57b771331b8c19e988e43627bd7 (patch)
treea43fdaeb6d7b22cc7afab1633622bf55d39dfd67 /pjlib/src/pj
parentfb9e3b3a6649cc5cbe0c6747cb1918f3be71ba06 (diff)
Rework pjlib++
git-svn-id: http://svn.pjsip.org/repos/pjproject/main@36 74dad513-b988-da41-8d7b-12977e46ad98
Diffstat (limited to 'pjlib/src/pj')
-rw-r--r--pjlib/src/pj/errno.c3
-rw-r--r--pjlib/src/pj/file_io_ansi.c6
-rw-r--r--pjlib/src/pj/ioqueue_common_abs.c115
-rw-r--r--pjlib/src/pj/ioqueue_winnt.c34
-rw-r--r--pjlib/src/pj/os_core_unix.c75
-rw-r--r--pjlib/src/pj/os_core_win32.c69
-rw-r--r--pjlib/src/pj/timer.c96
7 files changed, 313 insertions, 85 deletions
diff --git a/pjlib/src/pj/errno.c b/pjlib/src/pj/errno.c
index 682d9b6c..25bf5b7f 100644
--- a/pjlib/src/pj/errno.c
+++ b/pjlib/src/pj/errno.c
@@ -29,7 +29,8 @@ static const struct
{ PJ_ETOOMANY, "Too many objects of the specified type"},
{ PJ_EBUSY, "Object is busy"},
{ PJ_ENOTSUP, "Option/operation is not supported"},
- { PJ_EINVALIDOP, "Invalid operation"}
+ { PJ_EINVALIDOP, "Invalid operation"},
+ { PJ_ECANCELLED, "Operation cancelled"}
};
/*
diff --git a/pjlib/src/pj/file_io_ansi.c b/pjlib/src/pj/file_io_ansi.c
index f95c74a9..0946eddc 100644
--- a/pjlib/src/pj/file_io_ansi.c
+++ b/pjlib/src/pj/file_io_ansi.c
@@ -66,7 +66,8 @@ PJ_DEF(pj_status_t) pj_file_write( pj_oshandle_t fd,
clearerr((FILE*)fd);
written = fwrite(data, 1, *size, (FILE*)fd);
- if (ferror((FILE*)fd)) {
+ if (ferror((FILE*)fd)) {
+ *size = -1;
return PJ_RETURN_OS_ERROR(errno);
}
@@ -82,7 +83,8 @@ PJ_DEF(pj_status_t) pj_file_read( pj_oshandle_t fd,
clearerr((FILE*)fd);
bytes = fread(data, 1, *size, (FILE*)fd);
- if (ferror((FILE*)fd)) {
+ if (ferror((FILE*)fd)) {
+ *size = -1;
return PJ_RETURN_OS_ERROR(errno);
}
diff --git a/pjlib/src/pj/ioqueue_common_abs.c b/pjlib/src/pj/ioqueue_common_abs.c
index 4cffcae4..75774ede 100644
--- a/pjlib/src/pj/ioqueue_common_abs.c
+++ b/pjlib/src/pj/ioqueue_common_abs.c
@@ -227,7 +227,9 @@ void ioqueue_dispatch_write_event(pj_ioqueue_t *ioqueue, pj_ioqueue_key_t *h)
* so that send() can work in parallel.
*/
if (h->fd_type == PJ_SOCK_DGRAM) {
- pj_list_erase(write_op);
+ pj_list_erase(write_op);
+ write_op->op = 0;
+
if (pj_list_empty(&h->write_list))
ioqueue_remove_from_set(ioqueue, h->fd, WRITEABLE_EVENT);
@@ -267,7 +269,8 @@ void ioqueue_dispatch_write_event(pj_ioqueue_t *ioqueue, pj_ioqueue_key_t *h)
{
if (h->fd_type != PJ_SOCK_DGRAM) {
/* Write completion of the whole stream. */
- pj_list_erase(write_op);
+ pj_list_erase(write_op);
+ write_op->op = 0;
/* Clear operation if there's no more data to send. */
if (pj_list_empty(&h->write_list))
@@ -313,7 +316,8 @@ void ioqueue_dispatch_read_event( pj_ioqueue_t *ioqueue, pj_ioqueue_key_t *h )
/* Get one accept operation from the list. */
accept_op = h->accept_list.next;
- pj_list_erase(accept_op);
+ pj_list_erase(accept_op);
+ accept_op->op = 0;
/* Clear bit in fdset if there is no more pending accept */
if (pj_list_empty(&h->accept_list))
@@ -346,7 +350,8 @@ void ioqueue_dispatch_read_event( pj_ioqueue_t *ioqueue, pj_ioqueue_key_t *h )
/* Get one pending read operation from the list. */
read_op = h->read_list.next;
- pj_list_erase(read_op);
+ pj_list_erase(read_op);
+ read_op->op = 0;
/* Clear fdset if there is no pending read. */
if (pj_list_empty(&h->read_list))
@@ -475,6 +480,9 @@ PJ_DEF(pj_status_t) pj_ioqueue_recv( pj_ioqueue_key_t *key,
PJ_ASSERT_RETURN(key && op_key && buffer && length, PJ_EINVAL);
PJ_CHECK_STACK();
+
+ read_op = (struct read_operation*)op_key;
+ read_op->op = 0;
/* Try to see if there's data immediately available.
*/
@@ -496,8 +504,6 @@ PJ_DEF(pj_status_t) pj_ioqueue_recv( pj_ioqueue_key_t *key,
* No data is immediately available.
* Must schedule asynchronous operation to the ioqueue.
*/
- read_op = (struct read_operation*)op_key;
-
read_op->op = PJ_IOQUEUE_OP_RECV;
read_op->buf = buffer;
read_op->size = *length;
@@ -530,6 +536,9 @@ PJ_DEF(pj_status_t) pj_ioqueue_recvfrom( pj_ioqueue_key_t *key,
PJ_ASSERT_RETURN(key && op_key && buffer && length, PJ_EINVAL);
PJ_CHECK_STACK();
+
+ read_op = (struct read_operation*)op_key;
+ read_op->op = 0;
/* Try to see if there's data immediately available.
*/
@@ -552,8 +561,6 @@ PJ_DEF(pj_status_t) pj_ioqueue_recvfrom( pj_ioqueue_key_t *key,
* No data is immediately available.
* Must schedule asynchronous operation to the ioqueue.
*/
- read_op = (struct read_operation*)op_key;
-
read_op->op = PJ_IOQUEUE_OP_RECV_FROM;
read_op->buf = buffer;
read_op->size = *length;
@@ -586,6 +593,9 @@ PJ_DEF(pj_status_t) pj_ioqueue_send( pj_ioqueue_key_t *key,
PJ_ASSERT_RETURN(key && op_key && data && length, PJ_EINVAL);
PJ_CHECK_STACK();
+
+ write_op = (struct write_operation*)op_key;
+ write_op->op = 0;
/* Fast track:
* Try to send data immediately, only if there's no pending write!
@@ -624,7 +634,6 @@ PJ_DEF(pj_status_t) pj_ioqueue_send( pj_ioqueue_key_t *key,
/*
* Schedule asynchronous send.
*/
- write_op = (struct write_operation*)op_key;
write_op->op = PJ_IOQUEUE_OP_SEND;
write_op->buf = (void*)data;
write_op->size = *length;
@@ -659,6 +668,9 @@ PJ_DEF(pj_status_t) pj_ioqueue_sendto( pj_ioqueue_key_t *key,
PJ_ASSERT_RETURN(key && op_key && data && length, PJ_EINVAL);
PJ_CHECK_STACK();
+
+ write_op = (struct write_operation*)op_key;
+ write_op->op = 0;
/* Fast track:
* Try to send data immediately, only if there's no pending write!
@@ -702,7 +714,6 @@ PJ_DEF(pj_status_t) pj_ioqueue_sendto( pj_ioqueue_key_t *key,
/*
* Schedule asynchronous send.
*/
- write_op = (struct write_operation*)op_key;
write_op->op = PJ_IOQUEUE_OP_SEND_TO;
write_op->buf = (void*)data;
write_op->size = *length;
@@ -735,6 +746,9 @@ PJ_DEF(pj_status_t) pj_ioqueue_accept( pj_ioqueue_key_t *key,
/* check parameters. All must be specified! */
PJ_ASSERT_RETURN(key && op_key && new_sock, PJ_EINVAL);
+
+ accept_op = (struct accept_operation*)op_key;
+ accept_op->op = 0;
/* Fast track:
* See if there's new connection available immediately.
@@ -767,8 +781,6 @@ PJ_DEF(pj_status_t) pj_ioqueue_accept( pj_ioqueue_key_t *key,
* Schedule accept() operation to be completed when there is incoming
* connection available.
*/
- accept_op = (struct accept_operation*)op_key;
-
accept_op->op = PJ_IOQUEUE_OP_ACCEPT;
accept_op->accept_fd = new_sock;
accept_op->rmt_addr = remote;
@@ -821,3 +833,82 @@ PJ_DEF(pj_status_t) pj_ioqueue_connect( pj_ioqueue_key_t *key,
}
#endif /* PJ_HAS_TCP */
+/*
+ * pj_ioqueue_is_pending()
+ */
+PJ_DEF(pj_bool_t) pj_ioqueue_is_pending( pj_ioqueue_key_t *key,
+ pj_ioqueue_op_key_t *op_key )
+{
+ struct generic_operation *op_rec;
+
+ PJ_UNUSED_ARG(key);
+
+ op_rec = (struct generic_operation*)op_key;
+ return op_rec->op != 0;
+}
+
+
+/*
+ * pj_ioqueue_post_completion()
+ */
+PJ_DEF(pj_status_t) pj_ioqueue_post_completion( pj_ioqueue_key_t *key,
+ pj_ioqueue_op_key_t *op_key,
+ pj_ssize_t bytes_status )
+{
+ struct generic_operation *op_rec;
+
+ /*
+ * Find the operation key in all pending operation list to
+ * really make sure that it's still there; then call the callback.
+ */
+ pj_mutex_lock(key->mutex);
+
+ /* Find the operation in the pending read list. */
+ op_rec = (struct generic_operation*)key->read_list.next;
+ while (op_rec != (void*)&key->read_list) {
+ if (op_rec == (void*)op_key) {
+ pj_list_erase(op_rec);
+ op_rec->op = 0;
+ pj_mutex_unlock(key->mutex);
+
+ (*key->cb.on_read_complete)(key, op_key, bytes_status);
+ return PJ_SUCCESS;
+ }
+ op_rec = op_rec->next;
+ }
+
+ /* Find the operation in the pending write list. */
+ op_rec = (struct generic_operation*)key->write_list.next;
+ while (op_rec != (void*)&key->write_list) {
+ if (op_rec == (void*)op_key) {
+ pj_list_erase(op_rec);
+ op_rec->op = 0;
+ pj_mutex_unlock(key->mutex);
+
+ (*key->cb.on_write_complete)(key, op_key, bytes_status);
+ return PJ_SUCCESS;
+ }
+ op_rec = op_rec->next;
+ }
+
+ /* Find the operation in the pending accept list. */
+ op_rec = (struct generic_operation*)key->accept_list.next;
+ while (op_rec != (void*)&key->accept_list) {
+ if (op_rec == (void*)op_key) {
+ pj_list_erase(op_rec);
+ op_rec->op = 0;
+ pj_mutex_unlock(key->mutex);
+
+ (*key->cb.on_accept_complete)(key, op_key,
+ PJ_INVALID_SOCKET,
+ bytes_status);
+ return PJ_SUCCESS;
+ }
+ op_rec = op_rec->next;
+ }
+
+ pj_mutex_unlock(key->mutex);
+
+ return PJ_EINVALIDOP;
+}
+
diff --git a/pjlib/src/pj/ioqueue_winnt.c b/pjlib/src/pj/ioqueue_winnt.c
index 7944953f..93cbb6d5 100644
--- a/pjlib/src/pj/ioqueue_winnt.c
+++ b/pjlib/src/pj/ioqueue_winnt.c
@@ -891,3 +891,37 @@ PJ_DEF(pj_status_t) pj_ioqueue_connect( pj_ioqueue_key_t *key,
}
#endif /* #if PJ_HAS_TCP */
+
+
+PJ_DEF(pj_bool_t) pj_ioqueue_is_pending( pj_ioqueue_key_t *key,
+ pj_ioqueue_op_key_t *op_key )
+{
+ BOOL rc;
+ DWORD bytesTransfered;
+
+ rc = GetOverlappedResult( key->hnd, (LPOVERLAPPED)op_key,
+ &bytesTransfered, FALSE );
+
+ if (rc == FALSE) {
+ return GetLastError()==ERROR_IO_INCOMPLETE;
+ }
+
+ return FALSE;
+}
+
+
+PJ_DEF(pj_status_t) pj_ioqueue_post_completion( pj_ioqueue_key_t *key,
+ pj_ioqueue_op_key_t *op_key,
+ pj_ssize_t bytes_status )
+{
+ BOOL rc;
+
+ rc = PostQueuedCompletionStatus(key->ioqueue->iocp, bytes_status,
+ (long)key, (OVERLAPPED*)op_key );
+ if (rc == FALSE) {
+ return PJ_RETURN_OS_ERROR(GetLastError());
+ }
+
+ return PJ_SUCCESS;
+}
+
diff --git a/pjlib/src/pj/os_core_unix.c b/pjlib/src/pj/os_core_unix.c
index 3892b64b..cc57aab2 100644
--- a/pjlib/src/pj/os_core_unix.c
+++ b/pjlib/src/pj/os_core_unix.c
@@ -558,54 +558,91 @@ PJ_DEF(pj_atomic_value_t) pj_atomic_get(pj_atomic_t *atomic_var)
return oldval;
}
+/*
+ * pj_atomic_inc_and_get()
+ */
+PJ_DEF(pj_atomic_value_t) pj_atomic_inc_and_get(pj_atomic_t *atomic_var)
+{
+ pj_atomic_value_t new_value;
+
+ PJ_CHECK_STACK();
+
+#if PJ_HAS_THREADS
+ pj_mutex_lock( atomic_var->mutex );
+#endif
+ new_value = ++atomic_var->value;
+#if PJ_HAS_THREADS
+ pj_mutex_unlock( atomic_var->mutex);
+#endif
+
+ return new_value;
+}
/*
* pj_atomic_inc()
*/
PJ_DEF(void) pj_atomic_inc(pj_atomic_t *atomic_var)
{
- PJ_CHECK_STACK();
-
-#if PJ_HAS_THREADS
- pj_mutex_lock( atomic_var->mutex );
-#endif
- ++atomic_var->value;
-#if PJ_HAS_THREADS
- pj_mutex_unlock( atomic_var->mutex);
-#endif
+ pj_atomic_inc_and_get(atomic_var);
}
/*
- * pj_atomic_dec()
+ * pj_atomic_dec_and_get()
*/
-PJ_DEF(void) pj_atomic_dec(pj_atomic_t *atomic_var)
-{
+PJ_DEF(pj_atomic_value_t) pj_atomic_dec_and_get(pj_atomic_t *atomic_var)
+{
+ pj_atomic_value_t new_value;
+
PJ_CHECK_STACK();
#if PJ_HAS_THREADS
pj_mutex_lock( atomic_var->mutex );
#endif
- --atomic_var->value;
+ new_value = --atomic_var->value;
#if PJ_HAS_THREADS
pj_mutex_unlock( atomic_var->mutex);
-#endif
+#endif
+
+ return new_value;
}
+
+/*
+ * pj_atomic_dec()
+ */
+PJ_DEF(void) pj_atomic_dec(pj_atomic_t *atomic_var)
+{
+ pj_atomic_dec_and_get(atomic_var);
+}
/*
- * pj_atomic_add()
+ * pj_atomic_add_and_get()
*/
-PJ_DEF(void) pj_atomic_add( pj_atomic_t *atomic_var, pj_atomic_value_t value )
-{
+PJ_DEF(pj_atomic_value_t) pj_atomic_add_and_get( pj_atomic_t *atomic_var,
+ pj_atomic_value_t value )
+{
+ pj_atomic_value_t new_value;
+
#if PJ_HAS_THREADS
pj_mutex_lock(atomic_var->mutex);
#endif
- atomic_var->value += value;
+ atomic_var->value += value;
+ new_value = atomic_var->value;
#if PJ_HAS_THREADS
pj_mutex_unlock(atomic_var->mutex);
-#endif
+#endif
+
+ return new_value;
}
+/*
+ * pj_atomic_add()
+ */
+PJ_DEF(void) pj_atomic_add( pj_atomic_t *atomic_var,
+ pj_atomic_value_t value )
+{
+ pj_atomic_add_and_get(atomic_var, value);
+}
///////////////////////////////////////////////////////////////////////////////
/*
diff --git a/pjlib/src/pj/os_core_win32.c b/pjlib/src/pj/os_core_win32.c
index 68416c66..be770d56 100644
--- a/pjlib/src/pj/os_core_win32.c
+++ b/pjlib/src/pj/os_core_win32.c
@@ -512,32 +512,48 @@ PJ_DEF(pj_atomic_value_t) pj_atomic_get(pj_atomic_t *atomic_var)
return atomic_var->value;
}
+/*
+ * pj_atomic_inc_and_get()
+ */
+PJ_DEF(pj_atomic_value_t) pj_atomic_inc_and_get(pj_atomic_t *atomic_var)
+{
+ PJ_CHECK_STACK();
+
+#if defined(PJ_WIN32_WINNT) && PJ_WIN32_WINNT >= 0x0400
+ return InterlockedIncrement(&atomic_var->value);
+#else
+# error Fix Me
+#endif
+}
+
/*
* pj_atomic_inc()
*/
PJ_DEF(void) pj_atomic_inc(pj_atomic_t *atomic_var)
-{
- PJ_CHECK_STACK();
-
-#if defined(PJ_WIN32_WINNT) && PJ_WIN32_WINNT >= 0x0400
- InterlockedIncrement(&atomic_var->value);
-#else
-# error Fix Me
-#endif
+{
+ pj_atomic_inc_and_get(atomic_var);
}
-
+
+/*
+ * pj_atomic_dec_and_get()
+ */
+PJ_DEF(pj_atomic_value_t) pj_atomic_dec_and_get(pj_atomic_t *atomic_var)
+{
+ PJ_CHECK_STACK();
+
+#if defined(PJ_WIN32_WINNT) && PJ_WIN32_WINNT >= 0x0400
+ return InterlockedDecrement(&atomic_var->value);
+#else
+# error Fix me
+#endif
+}
+
/*
* pj_atomic_dec()
*/
PJ_DEF(void) pj_atomic_dec(pj_atomic_t *atomic_var)
{
- PJ_CHECK_STACK();
-
-#if defined(PJ_WIN32_WINNT) && PJ_WIN32_WINNT >= 0x0400
- InterlockedDecrement(&atomic_var->value);
-#else
-# error Fix me
-#endif
+ pj_atomic_dec_and_get(atomic_var);
}
/*
@@ -546,10 +562,27 @@ PJ_DEF(void) pj_atomic_dec(pj_atomic_t *atomic_var)
PJ_DEF(void) pj_atomic_add( pj_atomic_t *atomic_var,
pj_atomic_value_t value )
{
- InterlockedExchangeAdd( &atomic_var->value, value );
+#if defined(PJ_WIN32_WINNT) && PJ_WIN32_WINNT >= 0x0400
+ InterlockedExchangeAdd( &atomic_var->value, value );
+#else
+# error Fix me
+#endif
+}
+
+/*
+ * pj_atomic_add_and_get()
+ */
+PJ_DEF(pj_atomic_value_t) pj_atomic_add_and_get( pj_atomic_t *atomic_var,
+ pj_atomic_value_t value)
+{
+#if defined(PJ_WIN32_WINNT) && PJ_WIN32_WINNT >= 0x0400
+ long oldValue = InterlockedExchangeAdd( &atomic_var->value, value);
+ return oldValue + value;
+#else
+# error Fix me
+#endif
}
-
///////////////////////////////////////////////////////////////////////////////
/*
* pj_thread_local_alloc()
diff --git a/pjlib/src/pj/timer.c b/pjlib/src/pj/timer.c
index 9fa190a1..ffec1f4d 100644
--- a/pjlib/src/pj/timer.c
+++ b/pjlib/src/pj/timer.c
@@ -14,9 +14,13 @@
#include <pj/string.h>
#include <pj/assert.h>
#include <pj/errno.h>
+#include <pj/lock.h>
#define HEAP_PARENT(X) (X == 0 ? 0 : (((X) - 1) / 2))
#define HEAP_LEFT(X) (((X)+(X))+1)
+
+
+#define DEFAULT_MAX_TIMED_OUT_PER_POLL (64)
/**
@@ -32,9 +36,15 @@ struct pj_timer_heap_t
/** Current size of the heap. */
pj_size_t cur_size;
+
+ /** Max timed out entries to process per poll. */
+ unsigned max_entries_per_poll;
- /** Mutex for synchronization, or NULL */
- pj_mutex_t *mutex;
+ /** Lock object. */
+ pj_lock_t *lock;
+
+ /** Autodelete lock. */
+ pj_bool_t auto_delete_lock;
/**
* Current contents of the Heap, which is organized as a "heap" of
@@ -71,15 +81,15 @@ struct pj_timer_heap_t
PJ_INLINE(void) lock_timer_heap( pj_timer_heap_t *ht )
{
- if (ht->mutex) {
- pj_mutex_lock(ht->mutex);
+ if (ht->lock) {
+ pj_lock_acquire(ht->lock);
}
}
PJ_INLINE(void) unlock_timer_heap( pj_timer_heap_t *ht )
{
- if (ht->mutex) {
- pj_mutex_unlock(ht->mutex);
+ if (ht->lock) {
+ pj_lock_release(ht->lock);
}
}
@@ -319,7 +329,7 @@ PJ_DEF(pj_size_t) pj_timer_heap_mem_size(pj_size_t count)
sizeof(pj_timer_heap_t) +
/* size of each entry: */
(count+2) * (sizeof(pj_timer_entry*)+sizeof(pj_timer_id_t)) +
- /* mutex, pool etc: */
+ /* lock, pool etc: */
132;
}
@@ -328,7 +338,6 @@ PJ_DEF(pj_size_t) pj_timer_heap_mem_size(pj_size_t count)
*/
PJ_DEF(pj_status_t) pj_timer_heap_create( pj_pool_t *pool,
pj_size_t size,
- unsigned flag,
pj_timer_heap_t **p_heap)
{
pj_timer_heap_t *ht;
@@ -348,23 +357,14 @@ PJ_DEF(pj_status_t) pj_timer_heap_create( pj_pool_t *pool,
/* Initialize timer heap sizes */
ht->max_size = size;
- ht->cur_size = 0;
+ ht->cur_size = 0;
+ ht->max_entries_per_poll = DEFAULT_MAX_TIMED_OUT_PER_POLL;
ht->timer_ids_freelist = 1;
- ht->pool = pool;
-
- /* Mutex. */
- if (flag & PJ_TIMER_HEAP_NO_SYNCHRONIZE) {
- ht->mutex = NULL;
- } else {
- pj_status_t rc;
-
- /* Mutex must be the recursive types.
- * See commented code inside pj_timer_heap_poll()
- */
- rc = pj_mutex_create(pool, "tmhp%p", PJ_MUTEX_RECURSE, &ht->mutex);
- if (rc != PJ_SUCCESS)
- return rc;
- }
+ ht->pool = pool;
+
+ /* Lock. */
+ ht->lock = NULL;
+ ht->auto_delete_lock = 0;
// Create the heap array.
ht->heap = pj_pool_alloc(pool, sizeof(pj_timer_entry*) * size);
@@ -385,6 +385,34 @@ PJ_DEF(pj_status_t) pj_timer_heap_create( pj_pool_t *pool,
*p_heap = ht;
return PJ_SUCCESS;
}
+
+PJ_DEF(void) pj_timer_heap_destroy( pj_timer_heap_t *ht )
+{
+ if (ht->lock && ht->auto_delete_lock) {
+ pj_lock_destroy(ht->lock);
+ ht->lock = NULL;
+ }
+}
+
+PJ_DEF(void) pj_timer_heap_set_lock( pj_timer_heap_t *ht,
+ pj_lock_t *lock,
+ pj_bool_t auto_del )
+{
+ if (ht->lock && ht->auto_delete_lock)
+ pj_lock_destroy(ht->lock);
+
+ ht->lock = lock;
+ ht->auto_delete_lock = auto_del;
+}
+
+
+PJ_DEF(unsigned) pj_timer_heap_set_max_timed_out_per_poll(pj_timer_heap_t *ht,
+ unsigned count )
+{
+ unsigned old_count = ht->max_entries_per_poll;
+ ht->max_entries_per_poll = count;
+ return old_count;
+}
PJ_DEF(pj_timer_entry*) pj_timer_entry_init( pj_timer_entry *entry,
int id,
@@ -433,12 +461,13 @@ PJ_DEF(int) pj_timer_heap_cancel( pj_timer_heap_t *ht,
return count;
}
-PJ_DEF(int) pj_timer_heap_poll( pj_timer_heap_t *ht, pj_time_val *next_delay )
+PJ_DEF(unsigned) pj_timer_heap_poll( pj_timer_heap_t *ht,
+ pj_time_val *next_delay )
{
pj_time_val now;
- int count;
+ unsigned count;
- PJ_ASSERT_RETURN(ht, -1);
+ PJ_ASSERT_RETURN(ht, 0);
if (!ht->cur_size && next_delay) {
next_delay->sec = next_delay->msec = PJ_MAXINT32;
@@ -450,16 +479,15 @@ PJ_DEF(int) pj_timer_heap_poll( pj_timer_heap_t *ht, pj_time_val *next_delay )
lock_timer_heap(ht);
while ( ht->cur_size &&
- PJ_TIME_VAL_LTE(ht->heap[0]->_timer_value, now) )
+ PJ_TIME_VAL_LTE(ht->heap[0]->_timer_value, now) &&
+ count < ht->max_entries_per_poll )
{
pj_timer_entry *node = remove_node(ht, 0);
++count;
- //Better not to temporarily release mutex to save some syscalls.
- //But then make sure the mutex must be the recursive types (PJ_MUTEX_RECURSE)!
- //unlock_timer_heap(ht);
+ unlock_timer_heap(ht);
(*node->cb)(ht, node);
- //lock_timer_heap(ht);
+ lock_timer_heap(ht);
}
if (ht->cur_size && next_delay) {
*next_delay = ht->heap[0]->_timer_value;
@@ -473,7 +501,9 @@ PJ_DEF(int) pj_timer_heap_poll( pj_timer_heap_t *ht, pj_time_val *next_delay )
}
PJ_DEF(pj_size_t) pj_timer_heap_count( pj_timer_heap_t *ht )
-{
+{
+ PJ_ASSERT_RETURN(ht, 0);
+
return ht->cur_size;
}