diff options
author | Benny Prijono <bennylp@teluu.com> | 2005-11-09 15:37:19 +0000 |
---|---|---|
committer | Benny Prijono <bennylp@teluu.com> | 2005-11-09 15:37:19 +0000 |
commit | 6e1024262b48b57b771331b8c19e988e43627bd7 (patch) | |
tree | a43fdaeb6d7b22cc7afab1633622bf55d39dfd67 /pjlib/src/pj | |
parent | fb9e3b3a6649cc5cbe0c6747cb1918f3be71ba06 (diff) |
Rework pjlib++
git-svn-id: http://svn.pjsip.org/repos/pjproject/main@36 74dad513-b988-da41-8d7b-12977e46ad98
Diffstat (limited to 'pjlib/src/pj')
-rw-r--r-- | pjlib/src/pj/errno.c | 3 | ||||
-rw-r--r-- | pjlib/src/pj/file_io_ansi.c | 6 | ||||
-rw-r--r-- | pjlib/src/pj/ioqueue_common_abs.c | 115 | ||||
-rw-r--r-- | pjlib/src/pj/ioqueue_winnt.c | 34 | ||||
-rw-r--r-- | pjlib/src/pj/os_core_unix.c | 75 | ||||
-rw-r--r-- | pjlib/src/pj/os_core_win32.c | 69 | ||||
-rw-r--r-- | pjlib/src/pj/timer.c | 96 |
7 files changed, 313 insertions, 85 deletions
diff --git a/pjlib/src/pj/errno.c b/pjlib/src/pj/errno.c index 682d9b6c..25bf5b7f 100644 --- a/pjlib/src/pj/errno.c +++ b/pjlib/src/pj/errno.c @@ -29,7 +29,8 @@ static const struct { PJ_ETOOMANY, "Too many objects of the specified type"}, { PJ_EBUSY, "Object is busy"}, { PJ_ENOTSUP, "Option/operation is not supported"}, - { PJ_EINVALIDOP, "Invalid operation"} + { PJ_EINVALIDOP, "Invalid operation"},
+ { PJ_ECANCELLED, "Operation cancelled"} }; /* diff --git a/pjlib/src/pj/file_io_ansi.c b/pjlib/src/pj/file_io_ansi.c index f95c74a9..0946eddc 100644 --- a/pjlib/src/pj/file_io_ansi.c +++ b/pjlib/src/pj/file_io_ansi.c @@ -66,7 +66,8 @@ PJ_DEF(pj_status_t) pj_file_write( pj_oshandle_t fd, clearerr((FILE*)fd); written = fwrite(data, 1, *size, (FILE*)fd); - if (ferror((FILE*)fd)) { + if (ferror((FILE*)fd)) {
+ *size = -1; return PJ_RETURN_OS_ERROR(errno); } @@ -82,7 +83,8 @@ PJ_DEF(pj_status_t) pj_file_read( pj_oshandle_t fd, clearerr((FILE*)fd); bytes = fread(data, 1, *size, (FILE*)fd); - if (ferror((FILE*)fd)) { + if (ferror((FILE*)fd)) {
+ *size = -1; return PJ_RETURN_OS_ERROR(errno); } diff --git a/pjlib/src/pj/ioqueue_common_abs.c b/pjlib/src/pj/ioqueue_common_abs.c index 4cffcae4..75774ede 100644 --- a/pjlib/src/pj/ioqueue_common_abs.c +++ b/pjlib/src/pj/ioqueue_common_abs.c @@ -227,7 +227,9 @@ void ioqueue_dispatch_write_event(pj_ioqueue_t *ioqueue, pj_ioqueue_key_t *h) * so that send() can work in parallel. */ if (h->fd_type == PJ_SOCK_DGRAM) { - pj_list_erase(write_op); + pj_list_erase(write_op);
+ write_op->op = 0;
+ if (pj_list_empty(&h->write_list)) ioqueue_remove_from_set(ioqueue, h->fd, WRITEABLE_EVENT); @@ -267,7 +269,8 @@ void ioqueue_dispatch_write_event(pj_ioqueue_t *ioqueue, pj_ioqueue_key_t *h) { if (h->fd_type != PJ_SOCK_DGRAM) { /* Write completion of the whole stream. */ - pj_list_erase(write_op); + pj_list_erase(write_op);
+ write_op->op = 0; /* Clear operation if there's no more data to send. */ if (pj_list_empty(&h->write_list)) @@ -313,7 +316,8 @@ void ioqueue_dispatch_read_event( pj_ioqueue_t *ioqueue, pj_ioqueue_key_t *h ) /* Get one accept operation from the list. */ accept_op = h->accept_list.next; - pj_list_erase(accept_op); + pj_list_erase(accept_op);
+ accept_op->op = 0; /* Clear bit in fdset if there is no more pending accept */ if (pj_list_empty(&h->accept_list)) @@ -346,7 +350,8 @@ void ioqueue_dispatch_read_event( pj_ioqueue_t *ioqueue, pj_ioqueue_key_t *h ) /* Get one pending read operation from the list. */ read_op = h->read_list.next; - pj_list_erase(read_op); + pj_list_erase(read_op);
+ read_op->op = 0; /* Clear fdset if there is no pending read. */ if (pj_list_empty(&h->read_list)) @@ -475,6 +480,9 @@ PJ_DEF(pj_status_t) pj_ioqueue_recv( pj_ioqueue_key_t *key, PJ_ASSERT_RETURN(key && op_key && buffer && length, PJ_EINVAL); PJ_CHECK_STACK(); +
+ read_op = (struct read_operation*)op_key;
+ read_op->op = 0;
/* Try to see if there's data immediately available. */ @@ -496,8 +504,6 @@ PJ_DEF(pj_status_t) pj_ioqueue_recv( pj_ioqueue_key_t *key, * No data is immediately available. * Must schedule asynchronous operation to the ioqueue. */ - read_op = (struct read_operation*)op_key; - read_op->op = PJ_IOQUEUE_OP_RECV; read_op->buf = buffer; read_op->size = *length; @@ -530,6 +536,9 @@ PJ_DEF(pj_status_t) pj_ioqueue_recvfrom( pj_ioqueue_key_t *key, PJ_ASSERT_RETURN(key && op_key && buffer && length, PJ_EINVAL); PJ_CHECK_STACK(); +
+ read_op = (struct read_operation*)op_key;
+ read_op->op = 0;
/* Try to see if there's data immediately available. */ @@ -552,8 +561,6 @@ PJ_DEF(pj_status_t) pj_ioqueue_recvfrom( pj_ioqueue_key_t *key, * No data is immediately available. * Must schedule asynchronous operation to the ioqueue. */ - read_op = (struct read_operation*)op_key; - read_op->op = PJ_IOQUEUE_OP_RECV_FROM; read_op->buf = buffer; read_op->size = *length; @@ -586,6 +593,9 @@ PJ_DEF(pj_status_t) pj_ioqueue_send( pj_ioqueue_key_t *key, PJ_ASSERT_RETURN(key && op_key && data && length, PJ_EINVAL); PJ_CHECK_STACK(); +
+ write_op = (struct write_operation*)op_key;
+ write_op->op = 0;
/* Fast track: * Try to send data immediately, only if there's no pending write! @@ -624,7 +634,6 @@ PJ_DEF(pj_status_t) pj_ioqueue_send( pj_ioqueue_key_t *key, /* * Schedule asynchronous send. */ - write_op = (struct write_operation*)op_key; write_op->op = PJ_IOQUEUE_OP_SEND; write_op->buf = (void*)data; write_op->size = *length; @@ -659,6 +668,9 @@ PJ_DEF(pj_status_t) pj_ioqueue_sendto( pj_ioqueue_key_t *key, PJ_ASSERT_RETURN(key && op_key && data && length, PJ_EINVAL); PJ_CHECK_STACK(); +
+ write_op = (struct write_operation*)op_key;
+ write_op->op = 0;
/* Fast track: * Try to send data immediately, only if there's no pending write! @@ -702,7 +714,6 @@ PJ_DEF(pj_status_t) pj_ioqueue_sendto( pj_ioqueue_key_t *key, /* * Schedule asynchronous send. */ - write_op = (struct write_operation*)op_key; write_op->op = PJ_IOQUEUE_OP_SEND_TO; write_op->buf = (void*)data; write_op->size = *length; @@ -735,6 +746,9 @@ PJ_DEF(pj_status_t) pj_ioqueue_accept( pj_ioqueue_key_t *key, /* check parameters. All must be specified! */ PJ_ASSERT_RETURN(key && op_key && new_sock, PJ_EINVAL); +
+ accept_op = (struct accept_operation*)op_key;
+ accept_op->op = 0;
/* Fast track: * See if there's new connection available immediately. @@ -767,8 +781,6 @@ PJ_DEF(pj_status_t) pj_ioqueue_accept( pj_ioqueue_key_t *key, * Schedule accept() operation to be completed when there is incoming * connection available. */ - accept_op = (struct accept_operation*)op_key; - accept_op->op = PJ_IOQUEUE_OP_ACCEPT; accept_op->accept_fd = new_sock; accept_op->rmt_addr = remote; @@ -821,3 +833,82 @@ PJ_DEF(pj_status_t) pj_ioqueue_connect( pj_ioqueue_key_t *key, } #endif /* PJ_HAS_TCP */ +/*
+ * pj_ioqueue_is_pending()
+ */
+PJ_DEF(pj_bool_t) pj_ioqueue_is_pending( pj_ioqueue_key_t *key,
+ pj_ioqueue_op_key_t *op_key )
+{
+ struct generic_operation *op_rec;
+
+ PJ_UNUSED_ARG(key);
+
+ op_rec = (struct generic_operation*)op_key;
+ return op_rec->op != 0;
+}
+
+
+/*
+ * pj_ioqueue_post_completion()
+ */
+PJ_DEF(pj_status_t) pj_ioqueue_post_completion( pj_ioqueue_key_t *key,
+ pj_ioqueue_op_key_t *op_key,
+ pj_ssize_t bytes_status )
+{
+ struct generic_operation *op_rec;
+
+ /*
+ * Find the operation key in all pending operation list to
+ * really make sure that it's still there; then call the callback.
+ */
+ pj_mutex_lock(key->mutex);
+
+ /* Find the operation in the pending read list. */
+ op_rec = (struct generic_operation*)key->read_list.next;
+ while (op_rec != (void*)&key->read_list) {
+ if (op_rec == (void*)op_key) {
+ pj_list_erase(op_rec);
+ op_rec->op = 0;
+ pj_mutex_unlock(key->mutex);
+
+ (*key->cb.on_read_complete)(key, op_key, bytes_status);
+ return PJ_SUCCESS;
+ }
+ op_rec = op_rec->next;
+ }
+
+ /* Find the operation in the pending write list. */
+ op_rec = (struct generic_operation*)key->write_list.next;
+ while (op_rec != (void*)&key->write_list) {
+ if (op_rec == (void*)op_key) {
+ pj_list_erase(op_rec);
+ op_rec->op = 0;
+ pj_mutex_unlock(key->mutex);
+
+ (*key->cb.on_write_complete)(key, op_key, bytes_status);
+ return PJ_SUCCESS;
+ }
+ op_rec = op_rec->next;
+ }
+
+ /* Find the operation in the pending accept list. */
+ op_rec = (struct generic_operation*)key->accept_list.next;
+ while (op_rec != (void*)&key->accept_list) {
+ if (op_rec == (void*)op_key) {
+ pj_list_erase(op_rec);
+ op_rec->op = 0;
+ pj_mutex_unlock(key->mutex);
+
+ (*key->cb.on_accept_complete)(key, op_key,
+ PJ_INVALID_SOCKET,
+ bytes_status);
+ return PJ_SUCCESS;
+ }
+ op_rec = op_rec->next;
+ }
+
+ pj_mutex_unlock(key->mutex);
+
+ return PJ_EINVALIDOP;
+}
+
diff --git a/pjlib/src/pj/ioqueue_winnt.c b/pjlib/src/pj/ioqueue_winnt.c index 7944953f..93cbb6d5 100644 --- a/pjlib/src/pj/ioqueue_winnt.c +++ b/pjlib/src/pj/ioqueue_winnt.c @@ -891,3 +891,37 @@ PJ_DEF(pj_status_t) pj_ioqueue_connect( pj_ioqueue_key_t *key, } #endif /* #if PJ_HAS_TCP */ +
+
+PJ_DEF(pj_bool_t) pj_ioqueue_is_pending( pj_ioqueue_key_t *key,
+ pj_ioqueue_op_key_t *op_key )
+{
+ BOOL rc;
+ DWORD bytesTransfered;
+
+ rc = GetOverlappedResult( key->hnd, (LPOVERLAPPED)op_key,
+ &bytesTransfered, FALSE );
+
+ if (rc == FALSE) {
+ return GetLastError()==ERROR_IO_INCOMPLETE;
+ }
+
+ return FALSE;
+}
+
+
+PJ_DEF(pj_status_t) pj_ioqueue_post_completion( pj_ioqueue_key_t *key,
+ pj_ioqueue_op_key_t *op_key,
+ pj_ssize_t bytes_status )
+{
+ BOOL rc;
+
+ rc = PostQueuedCompletionStatus(key->ioqueue->iocp, bytes_status,
+ (long)key, (OVERLAPPED*)op_key );
+ if (rc == FALSE) {
+ return PJ_RETURN_OS_ERROR(GetLastError());
+ }
+
+ return PJ_SUCCESS;
+}
+
diff --git a/pjlib/src/pj/os_core_unix.c b/pjlib/src/pj/os_core_unix.c index 3892b64b..cc57aab2 100644 --- a/pjlib/src/pj/os_core_unix.c +++ b/pjlib/src/pj/os_core_unix.c @@ -558,54 +558,91 @@ PJ_DEF(pj_atomic_value_t) pj_atomic_get(pj_atomic_t *atomic_var) return oldval; } +/*
+ * pj_atomic_inc_and_get()
+ */
+PJ_DEF(pj_atomic_value_t) pj_atomic_inc_and_get(pj_atomic_t *atomic_var)
+{
+ pj_atomic_value_t new_value;
+
+ PJ_CHECK_STACK();
+
+#if PJ_HAS_THREADS
+ pj_mutex_lock( atomic_var->mutex );
+#endif
+ new_value = ++atomic_var->value;
+#if PJ_HAS_THREADS
+ pj_mutex_unlock( atomic_var->mutex);
+#endif
+
+ return new_value;
+}
/* * pj_atomic_inc() */ PJ_DEF(void) pj_atomic_inc(pj_atomic_t *atomic_var) { - PJ_CHECK_STACK(); - -#if PJ_HAS_THREADS - pj_mutex_lock( atomic_var->mutex ); -#endif - ++atomic_var->value; -#if PJ_HAS_THREADS - pj_mutex_unlock( atomic_var->mutex); -#endif + pj_atomic_inc_and_get(atomic_var); } /* - * pj_atomic_dec() + * pj_atomic_dec_and_get() */ -PJ_DEF(void) pj_atomic_dec(pj_atomic_t *atomic_var) -{ +PJ_DEF(pj_atomic_value_t) pj_atomic_dec_and_get(pj_atomic_t *atomic_var) +{
+ pj_atomic_value_t new_value;
+ PJ_CHECK_STACK(); #if PJ_HAS_THREADS pj_mutex_lock( atomic_var->mutex ); #endif - --atomic_var->value; + new_value = --atomic_var->value; #if PJ_HAS_THREADS pj_mutex_unlock( atomic_var->mutex); -#endif +#endif
+
+ return new_value; } +
+/*
+ * pj_atomic_dec()
+ */
+PJ_DEF(void) pj_atomic_dec(pj_atomic_t *atomic_var)
+{
+ pj_atomic_dec_and_get(atomic_var);
+}
/* - * pj_atomic_add() + * pj_atomic_add_and_get() */ -PJ_DEF(void) pj_atomic_add( pj_atomic_t *atomic_var, pj_atomic_value_t value ) -{ +PJ_DEF(pj_atomic_value_t) pj_atomic_add_and_get( pj_atomic_t *atomic_var,
+ pj_atomic_value_t value ) +{
+ pj_atomic_value_t new_value;
+ #if PJ_HAS_THREADS pj_mutex_lock(atomic_var->mutex); #endif - atomic_var->value += value; + atomic_var->value += value;
+ new_value = atomic_var->value; #if PJ_HAS_THREADS pj_mutex_unlock(atomic_var->mutex); -#endif +#endif
+
+ return new_value; } +/*
+ * pj_atomic_add()
+ */
+PJ_DEF(void) pj_atomic_add( pj_atomic_t *atomic_var,
+ pj_atomic_value_t value )
+{
+ pj_atomic_add_and_get(atomic_var, value);
+}
/////////////////////////////////////////////////////////////////////////////// /* diff --git a/pjlib/src/pj/os_core_win32.c b/pjlib/src/pj/os_core_win32.c index 68416c66..be770d56 100644 --- a/pjlib/src/pj/os_core_win32.c +++ b/pjlib/src/pj/os_core_win32.c @@ -512,32 +512,48 @@ PJ_DEF(pj_atomic_value_t) pj_atomic_get(pj_atomic_t *atomic_var) return atomic_var->value; } +/*
+ * pj_atomic_inc_and_get()
+ */
+PJ_DEF(pj_atomic_value_t) pj_atomic_inc_and_get(pj_atomic_t *atomic_var)
+{
+ PJ_CHECK_STACK();
+
+#if defined(PJ_WIN32_WINNT) && PJ_WIN32_WINNT >= 0x0400
+ return InterlockedIncrement(&atomic_var->value);
+#else
+# error Fix Me
+#endif
+}
+
/* * pj_atomic_inc() */ PJ_DEF(void) pj_atomic_inc(pj_atomic_t *atomic_var) -{ - PJ_CHECK_STACK(); - -#if defined(PJ_WIN32_WINNT) && PJ_WIN32_WINNT >= 0x0400 - InterlockedIncrement(&atomic_var->value); -#else -# error Fix Me -#endif +{
+ pj_atomic_inc_and_get(atomic_var); } - +
+/*
+ * pj_atomic_dec_and_get()
+ */
+PJ_DEF(pj_atomic_value_t) pj_atomic_dec_and_get(pj_atomic_t *atomic_var)
+{
+ PJ_CHECK_STACK();
+
+#if defined(PJ_WIN32_WINNT) && PJ_WIN32_WINNT >= 0x0400
+ return InterlockedDecrement(&atomic_var->value);
+#else
+# error Fix me
+#endif
+}
+
/* * pj_atomic_dec() */ PJ_DEF(void) pj_atomic_dec(pj_atomic_t *atomic_var) { - PJ_CHECK_STACK(); - -#if defined(PJ_WIN32_WINNT) && PJ_WIN32_WINNT >= 0x0400 - InterlockedDecrement(&atomic_var->value); -#else -# error Fix me -#endif + pj_atomic_dec_and_get(atomic_var); } /* @@ -546,10 +562,27 @@ PJ_DEF(void) pj_atomic_dec(pj_atomic_t *atomic_var) PJ_DEF(void) pj_atomic_add( pj_atomic_t *atomic_var, pj_atomic_value_t value ) { - InterlockedExchangeAdd( &atomic_var->value, value ); +#if defined(PJ_WIN32_WINNT) && PJ_WIN32_WINNT >= 0x0400
+ InterlockedExchangeAdd( &atomic_var->value, value );
+#else
+# error Fix me
+#endif +} +
+/*
+ * pj_atomic_add_and_get()
+ */
+PJ_DEF(pj_atomic_value_t) pj_atomic_add_and_get( pj_atomic_t *atomic_var,
+ pj_atomic_value_t value)
+{
+#if defined(PJ_WIN32_WINNT) && PJ_WIN32_WINNT >= 0x0400
+ long oldValue = InterlockedExchangeAdd( &atomic_var->value, value);
+ return oldValue + value;
+#else
+# error Fix me
+#endif
} - /////////////////////////////////////////////////////////////////////////////// /* * pj_thread_local_alloc() diff --git a/pjlib/src/pj/timer.c b/pjlib/src/pj/timer.c index 9fa190a1..ffec1f4d 100644 --- a/pjlib/src/pj/timer.c +++ b/pjlib/src/pj/timer.c @@ -14,9 +14,13 @@ #include <pj/string.h> #include <pj/assert.h> #include <pj/errno.h> +#include <pj/lock.h>
#define HEAP_PARENT(X) (X == 0 ? 0 : (((X) - 1) / 2)) #define HEAP_LEFT(X) (((X)+(X))+1) +
+
+#define DEFAULT_MAX_TIMED_OUT_PER_POLL (64)
/** @@ -32,9 +36,15 @@ struct pj_timer_heap_t /** Current size of the heap. */ pj_size_t cur_size; +
+ /** Max timed out entries to process per poll. */
+ unsigned max_entries_per_poll;
- /** Mutex for synchronization, or NULL */ - pj_mutex_t *mutex; + /** Lock object. */ + pj_lock_t *lock; +
+ /** Autodelete lock. */
+ pj_bool_t auto_delete_lock;
/** * Current contents of the Heap, which is organized as a "heap" of @@ -71,15 +81,15 @@ struct pj_timer_heap_t PJ_INLINE(void) lock_timer_heap( pj_timer_heap_t *ht ) { - if (ht->mutex) { - pj_mutex_lock(ht->mutex); + if (ht->lock) { + pj_lock_acquire(ht->lock); } } PJ_INLINE(void) unlock_timer_heap( pj_timer_heap_t *ht ) { - if (ht->mutex) { - pj_mutex_unlock(ht->mutex); + if (ht->lock) { + pj_lock_release(ht->lock); } } @@ -319,7 +329,7 @@ PJ_DEF(pj_size_t) pj_timer_heap_mem_size(pj_size_t count) sizeof(pj_timer_heap_t) + /* size of each entry: */ (count+2) * (sizeof(pj_timer_entry*)+sizeof(pj_timer_id_t)) + - /* mutex, pool etc: */ + /* lock, pool etc: */ 132; } @@ -328,7 +338,6 @@ PJ_DEF(pj_size_t) pj_timer_heap_mem_size(pj_size_t count) */ PJ_DEF(pj_status_t) pj_timer_heap_create( pj_pool_t *pool, pj_size_t size, - unsigned flag, pj_timer_heap_t **p_heap) { pj_timer_heap_t *ht; @@ -348,23 +357,14 @@ PJ_DEF(pj_status_t) pj_timer_heap_create( pj_pool_t *pool, /* Initialize timer heap sizes */ ht->max_size = size; - ht->cur_size = 0; + ht->cur_size = 0;
+ ht->max_entries_per_poll = DEFAULT_MAX_TIMED_OUT_PER_POLL; ht->timer_ids_freelist = 1; - ht->pool = pool; - - /* Mutex. */ - if (flag & PJ_TIMER_HEAP_NO_SYNCHRONIZE) { - ht->mutex = NULL; - } else { - pj_status_t rc; - - /* Mutex must be the recursive types. - * See commented code inside pj_timer_heap_poll() - */ - rc = pj_mutex_create(pool, "tmhp%p", PJ_MUTEX_RECURSE, &ht->mutex); - if (rc != PJ_SUCCESS) - return rc; - } + ht->pool = pool;
+ + /* Lock. */ + ht->lock = NULL;
+ ht->auto_delete_lock = 0;
// Create the heap array. ht->heap = pj_pool_alloc(pool, sizeof(pj_timer_entry*) * size); @@ -385,6 +385,34 @@ PJ_DEF(pj_status_t) pj_timer_heap_create( pj_pool_t *pool, *p_heap = ht; return PJ_SUCCESS; } +
+PJ_DEF(void) pj_timer_heap_destroy( pj_timer_heap_t *ht )
+{
+ if (ht->lock && ht->auto_delete_lock) {
+ pj_lock_destroy(ht->lock);
+ ht->lock = NULL;
+ }
+}
+
+PJ_DEF(void) pj_timer_heap_set_lock( pj_timer_heap_t *ht,
+ pj_lock_t *lock,
+ pj_bool_t auto_del )
+{
+ if (ht->lock && ht->auto_delete_lock)
+ pj_lock_destroy(ht->lock);
+
+ ht->lock = lock;
+ ht->auto_delete_lock = auto_del;
+}
+
+
+PJ_DEF(unsigned) pj_timer_heap_set_max_timed_out_per_poll(pj_timer_heap_t *ht,
+ unsigned count )
+{
+ unsigned old_count = ht->max_entries_per_poll;
+ ht->max_entries_per_poll = count;
+ return old_count;
+}
PJ_DEF(pj_timer_entry*) pj_timer_entry_init( pj_timer_entry *entry, int id, @@ -433,12 +461,13 @@ PJ_DEF(int) pj_timer_heap_cancel( pj_timer_heap_t *ht, return count; } -PJ_DEF(int) pj_timer_heap_poll( pj_timer_heap_t *ht, pj_time_val *next_delay ) +PJ_DEF(unsigned) pj_timer_heap_poll( pj_timer_heap_t *ht,
+ pj_time_val *next_delay ) { pj_time_val now; - int count; + unsigned count; - PJ_ASSERT_RETURN(ht, -1); + PJ_ASSERT_RETURN(ht, 0); if (!ht->cur_size && next_delay) { next_delay->sec = next_delay->msec = PJ_MAXINT32; @@ -450,16 +479,15 @@ PJ_DEF(int) pj_timer_heap_poll( pj_timer_heap_t *ht, pj_time_val *next_delay ) lock_timer_heap(ht); while ( ht->cur_size && - PJ_TIME_VAL_LTE(ht->heap[0]->_timer_value, now) ) + PJ_TIME_VAL_LTE(ht->heap[0]->_timer_value, now) &&
+ count < ht->max_entries_per_poll ) { pj_timer_entry *node = remove_node(ht, 0); ++count; - //Better not to temporarily release mutex to save some syscalls. - //But then make sure the mutex must be the recursive types (PJ_MUTEX_RECURSE)! - //unlock_timer_heap(ht); + unlock_timer_heap(ht); (*node->cb)(ht, node); - //lock_timer_heap(ht); + lock_timer_heap(ht); } if (ht->cur_size && next_delay) { *next_delay = ht->heap[0]->_timer_value; @@ -473,7 +501,9 @@ PJ_DEF(int) pj_timer_heap_poll( pj_timer_heap_t *ht, pj_time_val *next_delay ) } PJ_DEF(pj_size_t) pj_timer_heap_count( pj_timer_heap_t *ht ) -{ +{
+ PJ_ASSERT_RETURN(ht, 0);
+ return ht->cur_size; } |