From 9b8e0a5afe9cba0fd430e9642630bd465db9aefa Mon Sep 17 00:00:00 2001 From: Benny Prijono Date: Thu, 21 Feb 2013 11:18:36 +0000 Subject: Fixed #1616: Implementation of Group lock and other foundation in PJLIB for fixing synchronization issues git-svn-id: http://svn.pjsip.org/repos/pjproject/trunk@4359 74dad513-b988-da41-8d7b-12977e46ad98 --- pjlib/include/pj/activesock.h | 7 +- pjlib/include/pj/config.h | 16 ++ pjlib/include/pj/errno.h | 5 + pjlib/include/pj/ioqueue.h | 13 + pjlib/include/pj/lock.h | 252 +++++++++++++++++++ pjlib/include/pj/timer.h | 66 ++++- pjlib/include/pj/types.h | 3 + pjlib/src/pj/activesock.c | 56 ++++- pjlib/src/pj/errno.c | 3 +- pjlib/src/pj/ioqueue_common_abs.c | 114 +++++---- pjlib/src/pj/ioqueue_common_abs.h | 3 +- pjlib/src/pj/ioqueue_epoll.c | 18 +- pjlib/src/pj/ioqueue_select.c | 81 +++++- pjlib/src/pj/lock.c | 515 ++++++++++++++++++++++++++++++++++++++ pjlib/src/pj/os_core_unix.c | 121 +++++++-- pjlib/src/pj/timer.c | 114 ++++++++- 16 files changed, 1283 insertions(+), 104 deletions(-) (limited to 'pjlib') diff --git a/pjlib/include/pj/activesock.h b/pjlib/include/pj/activesock.h index 5a201864..11fa6589 100644 --- a/pjlib/include/pj/activesock.h +++ b/pjlib/include/pj/activesock.h @@ -173,6 +173,11 @@ typedef struct pj_activesock_cb */ typedef struct pj_activesock_cfg { + /** + * Optional group lock to be assigned to the ioqueue key. + */ + pj_grp_lock_t *grp_lock; + /** * Number of concurrent asynchronous operations that is to be supported * by the active socket. This value only affects socket receive and @@ -290,7 +295,6 @@ PJ_DECL(pj_status_t) pj_activesock_create_udp(pj_pool_t *pool, pj_activesock_t **p_asock, pj_sockaddr *bound_addr); - /** * Close the active socket. This will unregister the socket from the * ioqueue and ultimately close the socket. @@ -548,6 +552,7 @@ PJ_DECL(pj_status_t) pj_activesock_start_connect(pj_activesock_t *asock, const pj_sockaddr_t *remaddr, int addr_len); + #endif /* PJ_HAS_TCP */ /** diff --git a/pjlib/include/pj/config.h b/pjlib/include/pj/config.h index 2568214d..9b504c3a 100644 --- a/pjlib/include/pj/config.h +++ b/pjlib/include/pj/config.h @@ -487,6 +487,14 @@ #endif +/** + * Set this to 1 to enable debugging on the group lock. Default: 0 + */ +#ifndef PJ_GRP_LOCK_DEBUG +# define PJ_GRP_LOCK_DEBUG 0 +#endif + + /** * Specify this as \a stack_size argument in #pj_thread_create() to specify * that thread should use default stack size for the current platform. @@ -1121,6 +1129,14 @@ # define PJ_TODO(id) TODO___##id: #endif +/** + * Simulate race condition by sleeping the thread in strategic locations. + * Default: no! + */ +#ifndef PJ_RACE_ME +# define PJ_RACE_ME(x) +#endif + /** * Function attributes to inform that the function may throw exception. * diff --git a/pjlib/include/pj/errno.h b/pjlib/include/pj/errno.h index 66fea949..e9e403ce 100644 --- a/pjlib/include/pj/errno.h +++ b/pjlib/include/pj/errno.h @@ -422,6 +422,11 @@ PJ_DECL(pj_status_t) pj_register_strerror(pj_status_t start_code, * Unsupported address family */ #define PJ_EAFNOTSUP (PJ_ERRNO_START_STATUS + 22)/* 70022 */ +/** + * @hideinitializer + * Object no longer exists + */ +#define PJ_EGONE (PJ_ERRNO_START_STATUS + 23)/* 70023 */ /** @} */ /* pj_errnum */ diff --git a/pjlib/include/pj/ioqueue.h b/pjlib/include/pj/ioqueue.h index 40bb403c..1e983327 100644 --- a/pjlib/include/pj/ioqueue.h +++ b/pjlib/include/pj/ioqueue.h @@ -404,6 +404,19 @@ PJ_DECL(pj_status_t) pj_ioqueue_register_sock( pj_pool_t *pool, const pj_ioqueue_callback *cb, pj_ioqueue_key_t **key ); +/** + * Variant of pj_ioqueue_register_sock() with additional group lock parameter. + * If group lock is set for the key, the key will add the reference counter + * when the socket is registered and decrease it when it is destroyed. + */ +PJ_DECL(pj_status_t) pj_ioqueue_register_sock2(pj_pool_t *pool, + pj_ioqueue_t *ioque, + pj_sock_t sock, + pj_grp_lock_t *grp_lock, + void *user_data, + const pj_ioqueue_callback *cb, + pj_ioqueue_key_t **key ); + /** * Unregister from the I/O Queue framework. Caller must make sure that * the key doesn't have any pending operations before calling this function, diff --git a/pjlib/include/pj/lock.h b/pjlib/include/pj/lock.h index b8bdb0f9..b1a5d72e 100644 --- a/pjlib/include/pj/lock.h +++ b/pjlib/include/pj/lock.h @@ -147,6 +147,258 @@ PJ_DECL(pj_status_t) pj_lock_destroy( pj_lock_t *lock ); /** @} */ + +/** + * @defgroup PJ_GRP_LOCK Group Lock + * @ingroup PJ_LOCK + * @{ + * + * Group lock is a synchronization object to manage concurrency among members + * within the same logical group. Example of such groups are: + * + * - dialog, which has members such as the dialog itself, an invite session, + * and several transactions + * - ICE, which has members such as ICE stream transport, ICE session, STUN + * socket, TURN socket, and down to ioqueue key + * + * Group lock has three functions: + * + * - mutual exclusion: to protect resources from being accessed by more than + * one threads at the same time + * - session management: to make sure that the resource is not destroyed + * while others are still using or about to use it. + * - lock coordinator: to provide uniform lock ordering among more than one + * lock objects, which is necessary to avoid deadlock. + * + * The requirements of the group lock are: + * + * - must satisfy all the functions above + * - must allow members to join or leave the group (for example, + * transaction may be added or removed from a dialog) + * - must be able to synchronize with external lock (for example, a dialog + * lock must be able to sync itself with PJSUA lock) + * + * Please see https://trac.pjsip.org/repos/wiki/Group_Lock for more info. + */ + +/** + * Settings for creating the group lock. + */ +typedef struct pj_grp_lock_config +{ + /** + * Creation flags, currently must be zero. + */ + unsigned flags; + +} pj_grp_lock_config; + + +/** + * Initialize the config with the default values. + * + * @param cfg The config to be initialized. + */ +PJ_DECL(void) pj_grp_lock_config_default(pj_grp_lock_config *cfg); + +/** + * Create a group lock object. Initially the group lock will have reference + * counter of one. + * + * @param pool The group lock only uses the pool parameter to get + * the pool factory, from which it will create its own + * pool. + * @param cfg Optional configuration. + * @param p_grp_lock Pointer to receive the newly created group lock. + * + * @return PJ_SUCCESS or the appropriate error code. + */ +PJ_DECL(pj_status_t) pj_grp_lock_create(pj_pool_t *pool, + const pj_grp_lock_config *cfg, + pj_grp_lock_t **p_grp_lock); + +/** + * Forcibly destroy the group lock, ignoring the reference counter value. + * + * @param grp_lock The group lock. + * + * @return PJ_SUCCESS or the appropriate error code. + */ +PJ_DECL(pj_status_t) pj_grp_lock_destroy( pj_grp_lock_t *grp_lock); + +/** + * Move the contents of the old lock to the new lock and destroy the + * old lock. + * + * @param old_lock The old group lock to be destroyed. + * @param new_lock The new group lock. + * + * @return PJ_SUCCESS or the appropriate error code. + */ +PJ_DECL(pj_status_t) pj_grp_lock_replace(pj_grp_lock_t *old_lock, + pj_grp_lock_t *new_lock); + +/** + * Acquire lock on the specified group lock. + * + * @param grp_lock The group lock. + * + * @return PJ_SUCCESS or the appropriate error code. + */ +PJ_DECL(pj_status_t) pj_grp_lock_acquire( pj_grp_lock_t *grp_lock); + +/** + * Acquire lock on the specified group lock if it is available, otherwise + * return immediately wihout waiting. + * + * @param grp_lock The group lock. + * + * @return PJ_SUCCESS or the appropriate error code. + */ +PJ_DECL(pj_status_t) pj_grp_lock_tryacquire( pj_grp_lock_t *grp_lock); + +/** + * Release the previously held lock. This may cause the group lock + * to be destroyed if it is the last one to hold the reference counter. + * In that case, the function will return PJ_EGONE. + * + * @param grp_lock The group lock. + * + * @return PJ_SUCCESS or the appropriate error code. + */ +PJ_DECL(pj_status_t) pj_grp_lock_release( pj_grp_lock_t *grp_lock); + +/** + * Add a destructor handler, to be called by the group lock when it is + * about to be destroyed. + * + * @param grp_lock The group lock. + * @param pool Pool to allocate memory for the handler. + * @param member A pointer to be passed to the handler. + * @param handler The destroy handler. + * + * @return PJ_SUCCESS or the appropriate error code. + */ +PJ_DECL(pj_status_t) pj_grp_lock_add_handler(pj_grp_lock_t *grp_lock, + pj_pool_t *pool, + void *member, + void (*handler)(void *member)); + +/** + * Remove previously registered handler. All parameters must be the same + * as when the handler was added. + * + * @param grp_lock The group lock. + * @param member A pointer to be passed to the handler. + * @param handler The destroy handler. + * + * @return PJ_SUCCESS or the appropriate error code. + */ +PJ_DECL(pj_status_t) pj_grp_lock_del_handler(pj_grp_lock_t *grp_lock, + void *member, + void (*handler)(void *member)); + +/** + * Increment reference counter to prevent the group lock grom being destroyed. + * + * @param grp_lock The group lock. + * + * @return PJ_SUCCESS or the appropriate error code. + */ +#if !PJ_GRP_LOCK_DEBUG +PJ_DECL(pj_status_t) pj_grp_lock_add_ref(pj_grp_lock_t *grp_lock); + +#define pj_grp_lock_add_ref_dbg(grp_lock, x, y) pj_grp_lock_add_ref(grp_lock) + +#else + +#define pj_grp_lock_add_ref(g) pj_grp_lock_add_ref_dbg(g, __FILE__, __LINE__) + +PJ_DECL(pj_status_t) pj_grp_lock_add_ref_dbg(pj_grp_lock_t *grp_lock, + const char *file, + int line); +#endif + +/** + * Decrement the reference counter. When the counter value reaches zero, the + * group lock will be destroyed and all destructor handlers will be called. + * + * @param grp_lock The group lock. + * + * @return PJ_SUCCESS or the appropriate error code. + */ +#if !PJ_GRP_LOCK_DEBUG +PJ_DECL(pj_status_t) pj_grp_lock_dec_ref(pj_grp_lock_t *grp_lock); + +#define pj_grp_lock_dec_ref_dbg(grp_lock, x, y) pj_grp_lock_dec_ref(grp_lock) +#else + +#define pj_grp_lock_dec_ref(g) pj_grp_lock_dec_ref_dbg(g, __FILE__, __LINE__) + +PJ_DECL(pj_status_t) pj_grp_lock_dec_ref_dbg(pj_grp_lock_t *grp_lock, + const char *file, + int line); + +#endif + +/** + * Get current reference count value. This normally is only used for + * debugging purpose. + * + * @param grp_lock The group lock. + * + * @return The reference count value. + */ +PJ_DECL(int) pj_grp_lock_get_ref(pj_grp_lock_t *grp_lock); + + +/** + * Dump group lock info for debugging purpose. If group lock debugging is + * enabled (via PJ_GRP_LOCK_DEBUG) macro, this will print the group lock + * reference counter value along with the source file and line. If + * debugging is disabled, this will only print the reference counter. + * + * @param grp_lock The group lock. + */ +PJ_DECL(void) pj_grp_lock_dump(pj_grp_lock_t *grp_lock); + + +/** + * Synchronize an external lock with the group lock, by adding it to the + * list of locks to be acquired by the group lock when the group lock is + * acquired. + * + * The ''pos'' argument specifies the lock order and also the relative + * position with regard to lock ordering against the group lock. Locks with + * lower ''pos'' value will be locked first, and those with negative value + * will be locked before the group lock (the group lock's ''pos'' value is + * zero). + * + * @param grp_lock The group lock. + * @param ext_lock The external lock + * @param pos The position. + * + * @return PJ_SUCCESS or the appropriate error code. + */ +PJ_DECL(pj_status_t) pj_grp_lock_chain_lock(pj_grp_lock_t *grp_lock, + pj_lock_t *ext_lock, + int pos); + +/** + * Remove an external lock from group lock's list of synchronized locks. + * + * @param grp_lock The group lock. + * @param ext_lock The external lock + * + * @return PJ_SUCCESS or the appropriate error code. + */ +PJ_DECL(pj_status_t) pj_grp_lock_unchain_lock(pj_grp_lock_t *grp_lock, + pj_lock_t *ext_lock); + + +/** @} */ + + PJ_END_DECL diff --git a/pjlib/include/pj/timer.h b/pjlib/include/pj/timer.h index c2182ce3..59149385 100644 --- a/pjlib/include/pj/timer.h +++ b/pjlib/include/pj/timer.h @@ -24,6 +24,7 @@ */ #include +#include PJ_BEGIN_DECL @@ -118,6 +119,12 @@ typedef struct pj_timer_entry */ pj_time_val _timer_value; + /** + * Internal: the group lock used by this entry, set when + * pj_timer_heap_schedule_w_lock() is used. + */ + pj_grp_lock_t *_grp_lock; + #if PJ_TIMER_DEBUG const char *src_file; int src_line; @@ -229,7 +236,46 @@ PJ_DECL(pj_status_t) pj_timer_heap_schedule( pj_timer_heap_t *ht, #endif /* PJ_TIMER_DEBUG */ /** - * Cancel a previously registered timer. + * Schedule a timer entry which will expire AFTER the specified delay, and + * increment the reference counter of the group lock while the timer entry + * is active. The group lock reference counter will automatically be released + * after the timer callback is called or when the timer is cancelled. + * + * @param ht The timer heap. + * @param entry The entry to be registered. + * @param id_val The value to be set to the "id" field of the timer entry + * once the timer is scheduled. + * @param delay The interval to expire. + * @param grp_lock The group lock. + * + * @return PJ_SUCCESS, or the appropriate error code. + */ +#if PJ_TIMER_DEBUG +# define pj_timer_heap_schedule_w_grp_lock(ht,e,d,id,g) \ + pj_timer_heap_schedule_w_grp_lock_dbg(ht,e,d,id,g,__FILE__,__LINE__) + + PJ_DECL(pj_status_t) pj_timer_heap_schedule_w_grp_lock_dbg( + pj_timer_heap_t *ht, + pj_timer_entry *entry, + const pj_time_val *delay, + int id_val, + pj_grp_lock_t *grp_lock, + const char *src_file, + int src_line); +#else +PJ_DECL(pj_status_t) pj_timer_heap_schedule_w_grp_lock( + pj_timer_heap_t *ht, + pj_timer_entry *entry, + const pj_time_val *delay, + int id_val, + pj_grp_lock_t *grp_lock); +#endif /* PJ_TIMER_DEBUG */ + + +/** + * Cancel a previously registered timer. This will also decrement the + * reference counter of the group lock associated with the timer entry, + * if the entry was scheduled with one. * * @param ht The timer heap. * @param entry The entry to be cancelled. @@ -240,6 +286,24 @@ PJ_DECL(pj_status_t) pj_timer_heap_schedule( pj_timer_heap_t *ht, PJ_DECL(int) pj_timer_heap_cancel( pj_timer_heap_t *ht, pj_timer_entry *entry); +/** + * Cancel only if the previously registered timer is active. This will + * also decrement the reference counter of the group lock associated + * with the timer entry, if the entry was scheduled with one. In any + * case, set the "id" to the specified value. + * + * @param ht The timer heap. + * @param entry The entry to be cancelled. + * @param id_val Value to be set to "id" + * + * @return The number of timer cancelled, which should be one if the + * entry has really been registered, or zero if no timer was + * cancelled. + */ +PJ_DECL(int) pj_timer_heap_cancel_if_active(pj_timer_heap_t *ht, + pj_timer_entry *entry, + int id_val); + /** * Get the number of timer entries. * diff --git a/pjlib/include/pj/types.h b/pjlib/include/pj/types.h index 9f05ce44..04e426b5 100644 --- a/pjlib/include/pj/types.h +++ b/pjlib/include/pj/types.h @@ -231,6 +231,9 @@ typedef struct pj_thread_t pj_thread_t; /** Lock object. */ typedef struct pj_lock_t pj_lock_t; +/** Group lock */ +typedef struct pj_grp_lock_t pj_grp_lock_t; + /** Mutex handle. */ typedef struct pj_mutex_t pj_mutex_t; diff --git a/pjlib/src/pj/activesock.c b/pjlib/src/pj/activesock.c index 5452ca4b..2c0cad55 100644 --- a/pjlib/src/pj/activesock.c +++ b/pjlib/src/pj/activesock.c @@ -43,6 +43,13 @@ enum read_type TYPE_RECV_FROM }; +enum shutdown_dir +{ + SHUT_NONE = 0, + SHUT_RX = 1, + SHUT_TX = 2 +}; + struct read_op { pj_ioqueue_op_key_t op_key; @@ -77,6 +84,7 @@ struct pj_activesock_t pj_ioqueue_t *ioqueue; void *user_data; unsigned async_count; + unsigned shutdown; unsigned max_loop; pj_activesock_cb cb; #if defined(PJ_IPHONE_OS_HAS_MULTITASKING_SUPPORT) && \ @@ -209,8 +217,9 @@ PJ_DEF(pj_status_t) pj_activesock_create( pj_pool_t *pool, ioq_cb.on_accept_complete = &ioqueue_on_accept_complete; #endif - status = pj_ioqueue_register_sock(pool, ioqueue, sock, asock, - &ioq_cb, &asock->key); + status = pj_ioqueue_register_sock2(pool, ioqueue, sock, + (opt? opt->grp_lock : NULL), + asock, &ioq_cb, &asock->key); if (status != PJ_SUCCESS) { pj_activesock_close(asock); return status; @@ -283,10 +292,10 @@ PJ_DEF(pj_status_t) pj_activesock_create_udp( pj_pool_t *pool, return PJ_SUCCESS; } - PJ_DEF(pj_status_t) pj_activesock_close(pj_activesock_t *asock) { PJ_ASSERT_RETURN(asock, PJ_EINVAL); + asock->shutdown = SHUT_RX | SHUT_TX; if (asock->key) { #if defined(PJ_IPHONE_OS_HAS_MULTITASKING_SUPPORT) && \ PJ_IPHONE_OS_HAS_MULTITASKING_SUPPORT!=0 @@ -448,6 +457,10 @@ static void ioqueue_on_read_complete(pj_ioqueue_key_t *key, asock = (pj_activesock_t*) pj_ioqueue_get_user_data(key); + /* Ignore if we've been shutdown */ + if (asock->shutdown & SHUT_RX) + return; + do { unsigned flags; @@ -569,6 +582,10 @@ static void ioqueue_on_read_complete(pj_ioqueue_key_t *key, if (!ret) return; + /* Also stop further read if we've been shutdown */ + if (asock->shutdown & SHUT_RX) + return; + /* Only stream oriented socket may leave data in the packet */ if (asock->stream_oriented) { r->size = remainder; @@ -648,6 +665,9 @@ PJ_DEF(pj_status_t) pj_activesock_send( pj_activesock_t *asock, { PJ_ASSERT_RETURN(asock && send_key && data && size, PJ_EINVAL); + if (asock->shutdown & SHUT_TX) + return PJ_EINVALIDOP; + send_key->activesock_data = NULL; if (asock->whole_data) { @@ -698,6 +718,9 @@ PJ_DEF(pj_status_t) pj_activesock_sendto( pj_activesock_t *asock, PJ_ASSERT_RETURN(asock && send_key && data && size && addr && addr_len, PJ_EINVAL); + if (asock->shutdown & SHUT_TX) + return PJ_EINVALIDOP; + return pj_ioqueue_sendto(asock->key, send_key, data, size, flags, addr, addr_len); } @@ -711,6 +734,13 @@ static void ioqueue_on_write_complete(pj_ioqueue_key_t *key, asock = (pj_activesock_t*) pj_ioqueue_get_user_data(key); + /* Ignore if we've been shutdown. This may cause data to be partially + * sent even when 'wholedata' was requested if the OS only sent partial + * buffer. + */ + if (asock->shutdown & SHUT_TX) + return; + if (bytes_sent > 0 && op_key->activesock_data) { /* whole_data is requested. Make sure we send all the data */ struct send_data *sd = (struct send_data*)op_key->activesock_data; @@ -756,6 +786,10 @@ PJ_DEF(pj_status_t) pj_activesock_start_accept(pj_activesock_t *asock, PJ_ASSERT_RETURN(asock, PJ_EINVAL); PJ_ASSERT_RETURN(asock->accept_op==NULL, PJ_EINVALIDOP); + /* Ignore if we've been shutdown */ + if (asock->shutdown) + return PJ_EINVALIDOP; + asock->accept_op = (struct accept_op*) pj_pool_calloc(pool, asock->async_count, sizeof(struct accept_op)); @@ -798,6 +832,10 @@ static void ioqueue_on_accept_complete(pj_ioqueue_key_t *key, PJ_UNUSED_ARG(new_sock); + /* Ignore if we've been shutdown */ + if (asock->shutdown) + return; + do { if (status == asock->last_err && status != PJ_SUCCESS) { asock->err_counter++; @@ -835,6 +873,10 @@ static void ioqueue_on_accept_complete(pj_ioqueue_key_t *key, pj_sock_close(accept_op->new_sock); } + /* Don't start another accept() if we've been shutdown */ + if (asock->shutdown) + return; + /* Prepare next accept() */ accept_op->new_sock = PJ_INVALID_SOCKET; accept_op->rem_addr_len = sizeof(accept_op->rem_addr); @@ -853,6 +895,10 @@ PJ_DEF(pj_status_t) pj_activesock_start_connect( pj_activesock_t *asock, int addr_len) { PJ_UNUSED_ARG(pool); + + if (asock->shutdown) + return PJ_EINVALIDOP; + return pj_ioqueue_connect(asock->key, remaddr, addr_len); } @@ -861,6 +907,10 @@ static void ioqueue_on_connect_complete(pj_ioqueue_key_t *key, { pj_activesock_t *asock = (pj_activesock_t*) pj_ioqueue_get_user_data(key); + /* Ignore if we've been shutdown */ + if (asock->shutdown) + return; + if (asock->cb.on_connect_complete) { pj_bool_t ret; diff --git a/pjlib/src/pj/errno.c b/pjlib/src/pj/errno.c index 615d1450..4726b33d 100644 --- a/pjlib/src/pj/errno.c +++ b/pjlib/src/pj/errno.c @@ -77,7 +77,8 @@ static const struct PJ_BUILD_ERR(PJ_ETOOSMALL, "Size is too short"), PJ_BUILD_ERR(PJ_EIGNORED, "Ignored"), PJ_BUILD_ERR(PJ_EIPV6NOTSUP, "IPv6 is not supported"), - PJ_BUILD_ERR(PJ_EAFNOTSUP, "Unsupported address family") + PJ_BUILD_ERR(PJ_EAFNOTSUP, "Unsupported address family"), + PJ_BUILD_ERR(PJ_EGONE, "Object no longer exists") }; #endif /* PJ_HAS_ERROR_STRING */ diff --git a/pjlib/src/pj/ioqueue_common_abs.c b/pjlib/src/pj/ioqueue_common_abs.c index 296aea90..a5dc3f95 100644 --- a/pjlib/src/pj/ioqueue_common_abs.c +++ b/pjlib/src/pj/ioqueue_common_abs.c @@ -70,6 +70,7 @@ static pj_status_t ioqueue_init_key( pj_pool_t *pool, pj_ioqueue_t *ioqueue, pj_ioqueue_key_t *key, pj_sock_t sock, + pj_grp_lock_t *grp_lock, void *user_data, const pj_ioqueue_callback *cb) { @@ -114,10 +115,18 @@ static pj_status_t ioqueue_init_key( pj_pool_t *pool, /* Create mutex for the key. */ #if !PJ_IOQUEUE_HAS_SAFE_UNREG - rc = pj_mutex_create_simple(pool, NULL, &key->mutex); + rc = pj_lock_create_simple_mutex(poll, NULL, &key->lock); #endif + if (rc != PJ_SUCCESS) + return rc; + + /* Group lock */ + key->grp_lock = grp_lock; + if (key->grp_lock) { + pj_grp_lock_add_ref_dbg(key->grp_lock, "ioqueue", 0); + } - return rc; + return PJ_SUCCESS; } /* @@ -189,10 +198,10 @@ PJ_INLINE(int) key_has_pending_connect(pj_ioqueue_key_t *key) void ioqueue_dispatch_write_event(pj_ioqueue_t *ioqueue, pj_ioqueue_key_t *h) { /* Lock the key. */ - pj_mutex_lock(h->mutex); + pj_ioqueue_lock_key(h); if (IS_CLOSING(h)) { - pj_mutex_unlock(h->mutex); + pj_ioqueue_unlock_key(h); return; } @@ -261,7 +270,7 @@ void ioqueue_dispatch_write_event(pj_ioqueue_t *ioqueue, pj_ioqueue_key_t *h) * save it to a flag. */ has_lock = PJ_FALSE; - pj_mutex_unlock(h->mutex); + pj_ioqueue_unlock_key(h); } else { has_lock = PJ_TRUE; } @@ -272,7 +281,7 @@ void ioqueue_dispatch_write_event(pj_ioqueue_t *ioqueue, pj_ioqueue_key_t *h) /* Unlock if we still hold the lock */ if (has_lock) { - pj_mutex_unlock(h->mutex); + pj_ioqueue_unlock_key(h); } /* Done. */ @@ -379,7 +388,8 @@ void ioqueue_dispatch_write_event(pj_ioqueue_t *ioqueue, pj_ioqueue_key_t *h) * save it to a flag. */ has_lock = PJ_FALSE; - pj_mutex_unlock(h->mutex); + pj_ioqueue_unlock_key(h); + PJ_RACE_ME(5); } else { has_lock = PJ_TRUE; } @@ -392,11 +402,11 @@ void ioqueue_dispatch_write_event(pj_ioqueue_t *ioqueue, pj_ioqueue_key_t *h) } if (has_lock) { - pj_mutex_unlock(h->mutex); + pj_ioqueue_unlock_key(h); } } else { - pj_mutex_unlock(h->mutex); + pj_ioqueue_unlock_key(h); } /* Done. */ @@ -406,7 +416,7 @@ void ioqueue_dispatch_write_event(pj_ioqueue_t *ioqueue, pj_ioqueue_key_t *h) * are signalled for the same event, but only one thread eventually * able to process the event. */ - pj_mutex_unlock(h->mutex); + pj_ioqueue_unlock_key(h); } } @@ -415,10 +425,10 @@ void ioqueue_dispatch_read_event( pj_ioqueue_t *ioqueue, pj_ioqueue_key_t *h ) pj_status_t rc; /* Lock the key. */ - pj_mutex_lock(h->mutex); + pj_ioqueue_lock_key(h); if (IS_CLOSING(h)) { - pj_mutex_unlock(h->mutex); + pj_ioqueue_unlock_key(h); return; } @@ -453,7 +463,8 @@ void ioqueue_dispatch_read_event( pj_ioqueue_t *ioqueue, pj_ioqueue_key_t *h ) * save it to a flag. */ has_lock = PJ_FALSE; - pj_mutex_unlock(h->mutex); + pj_ioqueue_unlock_key(h); + PJ_RACE_ME(5); } else { has_lock = PJ_TRUE; } @@ -466,7 +477,7 @@ void ioqueue_dispatch_read_event( pj_ioqueue_t *ioqueue, pj_ioqueue_key_t *h ) } if (has_lock) { - pj_mutex_unlock(h->mutex); + pj_ioqueue_unlock_key(h); } } else @@ -567,7 +578,8 @@ void ioqueue_dispatch_read_event( pj_ioqueue_t *ioqueue, pj_ioqueue_key_t *h ) * save it to a flag. */ has_lock = PJ_FALSE; - pj_mutex_unlock(h->mutex); + pj_ioqueue_unlock_key(h); + PJ_RACE_ME(5); } else { has_lock = PJ_TRUE; } @@ -580,7 +592,7 @@ void ioqueue_dispatch_read_event( pj_ioqueue_t *ioqueue, pj_ioqueue_key_t *h ) } if (has_lock) { - pj_mutex_unlock(h->mutex); + pj_ioqueue_unlock_key(h); } } else { @@ -589,7 +601,7 @@ void ioqueue_dispatch_read_event( pj_ioqueue_t *ioqueue, pj_ioqueue_key_t *h ) * are signalled for the same event, but only one thread eventually * able to process the event. */ - pj_mutex_unlock(h->mutex); + pj_ioqueue_unlock_key(h); } } @@ -599,19 +611,19 @@ void ioqueue_dispatch_exception_event( pj_ioqueue_t *ioqueue, { pj_bool_t has_lock; - pj_mutex_lock(h->mutex); + pj_ioqueue_lock_key(h); if (!h->connecting) { /* It is possible that more than one thread was woken up, thus * the remaining thread will see h->connecting as zero because * it has been processed by other thread. */ - pj_mutex_unlock(h->mutex); + pj_ioqueue_unlock_key(h); return; } if (IS_CLOSING(h)) { - pj_mutex_unlock(h->mutex); + pj_ioqueue_unlock_key(h); return; } @@ -629,7 +641,8 @@ void ioqueue_dispatch_exception_event( pj_ioqueue_t *ioqueue, * save it to a flag. */ has_lock = PJ_FALSE; - pj_mutex_unlock(h->mutex); + pj_ioqueue_unlock_key(h); + PJ_RACE_ME(5); } else { has_lock = PJ_TRUE; } @@ -651,7 +664,7 @@ void ioqueue_dispatch_exception_event( pj_ioqueue_t *ioqueue, } if (has_lock) { - pj_mutex_unlock(h->mutex); + pj_ioqueue_unlock_key(h); } } @@ -713,18 +726,18 @@ PJ_DEF(pj_status_t) pj_ioqueue_recv( pj_ioqueue_key_t *key, read_op->size = *length; read_op->flags = flags; - pj_mutex_lock(key->mutex); + pj_ioqueue_lock_key(key); /* Check again. Handle may have been closed after the previous check * in multithreaded app. If we add bad handle to the set it will * corrupt the ioqueue set. See #913 */ if (IS_CLOSING(key)) { - pj_mutex_unlock(key->mutex); + pj_ioqueue_unlock_key(key); return PJ_ECANCELLED; } pj_list_insert_before(&key->read_list, read_op); ioqueue_add_to_set(key->ioqueue, key, READABLE_EVENT); - pj_mutex_unlock(key->mutex); + pj_ioqueue_unlock_key(key); return PJ_EPENDING; } @@ -789,18 +802,18 @@ PJ_DEF(pj_status_t) pj_ioqueue_recvfrom( pj_ioqueue_key_t *key, read_op->rmt_addr = addr; read_op->rmt_addrlen = addrlen; - pj_mutex_lock(key->mutex); + pj_ioqueue_lock_key(key); /* Check again. Handle may have been closed after the previous check * in multithreaded app. If we add bad handle to the set it will * corrupt the ioqueue set. See #913 */ if (IS_CLOSING(key)) { - pj_mutex_unlock(key->mutex); + pj_ioqueue_unlock_key(key); return PJ_ECANCELLED; } pj_list_insert_before(&key->read_list, read_op); ioqueue_add_to_set(key->ioqueue, key, READABLE_EVENT); - pj_mutex_unlock(key->mutex); + pj_ioqueue_unlock_key(key); return PJ_EPENDING; } @@ -903,18 +916,18 @@ PJ_DEF(pj_status_t) pj_ioqueue_send( pj_ioqueue_key_t *key, write_op->written = 0; write_op->flags = flags; - pj_mutex_lock(key->mutex); + pj_ioqueue_lock_key(key); /* Check again. Handle may have been closed after the previous check * in multithreaded app. If we add bad handle to the set it will * corrupt the ioqueue set. See #913 */ if (IS_CLOSING(key)) { - pj_mutex_unlock(key->mutex); + pj_ioqueue_unlock_key(key); return PJ_ECANCELLED; } pj_list_insert_before(&key->write_list, write_op); ioqueue_add_to_set(key->ioqueue, key, WRITEABLE_EVENT); - pj_mutex_unlock(key->mutex); + pj_ioqueue_unlock_key(key); return PJ_EPENDING; } @@ -1050,18 +1063,18 @@ retry_on_restart: pj_memcpy(&write_op->rmt_addr, addr, addrlen); write_op->rmt_addrlen = addrlen; - pj_mutex_lock(key->mutex); + pj_ioqueue_lock_key(key); /* Check again. Handle may have been closed after the previous check * in multithreaded app. If we add bad handle to the set it will * corrupt the ioqueue set. See #913 */ if (IS_CLOSING(key)) { - pj_mutex_unlock(key->mutex); + pj_ioqueue_unlock_key(key); return PJ_ECANCELLED; } pj_list_insert_before(&key->write_list, write_op); ioqueue_add_to_set(key->ioqueue, key, WRITEABLE_EVENT); - pj_mutex_unlock(key->mutex); + pj_ioqueue_unlock_key(key); return PJ_EPENDING; } @@ -1127,18 +1140,18 @@ PJ_DEF(pj_status_t) pj_ioqueue_accept( pj_ioqueue_key_t *key, accept_op->addrlen= addrlen; accept_op->local_addr = local; - pj_mutex_lock(key->mutex); + pj_ioqueue_lock_key(key); /* Check again. Handle may have been closed after the previous check * in multithreaded app. If we add bad handle to the set it will * corrupt the ioqueue set. See #913 */ if (IS_CLOSING(key)) { - pj_mutex_unlock(key->mutex); + pj_ioqueue_unlock_key(key); return PJ_ECANCELLED; } pj_list_insert_before(&key->accept_list, accept_op); ioqueue_add_to_set(key->ioqueue, key, READABLE_EVENT); - pj_mutex_unlock(key->mutex); + pj_ioqueue_unlock_key(key); return PJ_EPENDING; } @@ -1171,18 +1184,18 @@ PJ_DEF(pj_status_t) pj_ioqueue_connect( pj_ioqueue_key_t *key, } else { if (status == PJ_STATUS_FROM_OS(PJ_BLOCKING_CONNECT_ERROR_VAL)) { /* Pending! */ - pj_mutex_lock(key->mutex); + pj_ioqueue_lock_key(key); /* Check again. Handle may have been closed after the previous * check in multithreaded app. See #913 */ if (IS_CLOSING(key)) { - pj_mutex_unlock(key->mutex); + pj_ioqueue_unlock_key(key); return PJ_ECANCELLED; } key->connecting = PJ_TRUE; ioqueue_add_to_set(key->ioqueue, key, WRITEABLE_EVENT); ioqueue_add_to_set(key->ioqueue, key, EXCEPTION_EVENT); - pj_mutex_unlock(key->mutex); + pj_ioqueue_unlock_key(key); return PJ_EPENDING; } else { /* Error! */ @@ -1228,7 +1241,7 @@ PJ_DEF(pj_status_t) pj_ioqueue_post_completion( pj_ioqueue_key_t *key, * Find the operation key in all pending operation list to * really make sure that it's still there; then call the callback. */ - pj_mutex_lock(key->mutex); + pj_ioqueue_lock_key(key); /* Find the operation in the pending read list. */ op_rec = (struct generic_operation*)key->read_list.next; @@ -1236,7 +1249,7 @@ PJ_DEF(pj_status_t) pj_ioqueue_post_completion( pj_ioqueue_key_t *key, if (op_rec == (void*)op_key) { pj_list_erase(op_rec); op_rec->op = PJ_IOQUEUE_OP_NONE; - pj_mutex_unlock(key->mutex); + pj_ioqueue_unlock_key(key); (*key->cb.on_read_complete)(key, op_key, bytes_status); return PJ_SUCCESS; @@ -1250,7 +1263,7 @@ PJ_DEF(pj_status_t) pj_ioqueue_post_completion( pj_ioqueue_key_t *key, if (op_rec == (void*)op_key) { pj_list_erase(op_rec); op_rec->op = PJ_IOQUEUE_OP_NONE; - pj_mutex_unlock(key->mutex); + pj_ioqueue_unlock_key(key); (*key->cb.on_write_complete)(key, op_key, bytes_status); return PJ_SUCCESS; @@ -1264,7 +1277,7 @@ PJ_DEF(pj_status_t) pj_ioqueue_post_completion( pj_ioqueue_key_t *key, if (op_rec == (void*)op_key) { pj_list_erase(op_rec); op_rec->op = PJ_IOQUEUE_OP_NONE; - pj_mutex_unlock(key->mutex); + pj_ioqueue_unlock_key(key); (*key->cb.on_accept_complete)(key, op_key, PJ_INVALID_SOCKET, @@ -1274,7 +1287,7 @@ PJ_DEF(pj_status_t) pj_ioqueue_post_completion( pj_ioqueue_key_t *key, op_rec = op_rec->next; } - pj_mutex_unlock(key->mutex); + pj_ioqueue_unlock_key(key); return PJ_EINVALIDOP; } @@ -1304,11 +1317,18 @@ PJ_DEF(pj_status_t) pj_ioqueue_set_concurrency(pj_ioqueue_key_t *key, PJ_DEF(pj_status_t) pj_ioqueue_lock_key(pj_ioqueue_key_t *key) { - return pj_mutex_lock(key->mutex); + if (key->grp_lock) + return pj_grp_lock_acquire(key->grp_lock); + else + return pj_lock_acquire(key->lock); } PJ_DEF(pj_status_t) pj_ioqueue_unlock_key(pj_ioqueue_key_t *key) { - return pj_mutex_unlock(key->mutex); + if (key->grp_lock) + return pj_grp_lock_release(key->grp_lock); + else + return pj_lock_release(key->lock); } + diff --git a/pjlib/src/pj/ioqueue_common_abs.h b/pjlib/src/pj/ioqueue_common_abs.h index 3a41051b..3bdbb524 100644 --- a/pjlib/src/pj/ioqueue_common_abs.h +++ b/pjlib/src/pj/ioqueue_common_abs.h @@ -101,7 +101,8 @@ union operation_key #define DECLARE_COMMON_KEY \ PJ_DECL_LIST_MEMBER(struct pj_ioqueue_key_t); \ pj_ioqueue_t *ioqueue; \ - pj_mutex_t *mutex; \ + pj_grp_lock_t *grp_lock; \ + pj_lock_t *lock; \ pj_bool_t inside_callback; \ pj_bool_t destroy_requested; \ pj_bool_t allow_concurrent; \ diff --git a/pjlib/src/pj/ioqueue_epoll.c b/pjlib/src/pj/ioqueue_epoll.c index 4c547791..18b37e8a 100644 --- a/pjlib/src/pj/ioqueue_epoll.c +++ b/pjlib/src/pj/ioqueue_epoll.c @@ -262,11 +262,11 @@ PJ_DEF(pj_status_t) pj_ioqueue_create( pj_pool_t *pool, key = PJ_POOL_ALLOC_T(pool, pj_ioqueue_key_t); key->ref_count = 0; - rc = pj_mutex_create_recursive(pool, NULL, &key->mutex); + rc = pj_lock_create_recursive_mutex(pool, NULL, &key->lock); if (rc != PJ_SUCCESS) { key = ioqueue->free_list.next; while (key != &ioqueue->free_list) { - pj_mutex_destroy(key->mutex); + pj_lock_destroy(key->lock); key = key->next; } pj_mutex_destroy(ioqueue->ref_cnt_mutex); @@ -323,19 +323,19 @@ PJ_DEF(pj_status_t) pj_ioqueue_destroy(pj_ioqueue_t *ioqueue) /* Destroy reference counters */ key = ioqueue->active_list.next; while (key != &ioqueue->active_list) { - pj_mutex_destroy(key->mutex); + pj_lock_destroy(key->lock); key = key->next; } key = ioqueue->closing_list.next; while (key != &ioqueue->closing_list) { - pj_mutex_destroy(key->mutex); + pj_lock_destroy(key->lock); key = key->next; } key = ioqueue->free_list.next; while (key != &ioqueue->free_list) { - pj_mutex_destroy(key->mutex); + pj_lock_destroy(key->lock); key = key->next; } @@ -422,7 +422,7 @@ PJ_DEF(pj_status_t) pj_ioqueue_register_sock( pj_pool_t *pool, status = os_epoll_ctl(ioqueue->epfd, EPOLL_CTL_ADD, sock, &ev); if (status < 0) { rc = pj_get_os_error(); - pj_mutex_destroy(key->mutex); + pj_lock_destroy(key->lock); key = NULL; TRACE_((THIS_FILE, "pj_ioqueue_register_sock error: os_epoll_ctl rc=%d", @@ -497,7 +497,7 @@ PJ_DEF(pj_status_t) pj_ioqueue_unregister( pj_ioqueue_key_t *key) * the key. We need to lock the key before ioqueue here to prevent * deadlock. */ - pj_mutex_lock(key->mutex); + pj_lock_acquire(key->lock); /* Also lock ioqueue */ pj_lock_acquire(ioqueue->lock); @@ -531,9 +531,9 @@ PJ_DEF(pj_status_t) pj_ioqueue_unregister( pj_ioqueue_key_t *key) decrement_counter(key); /* Done. */ - pj_mutex_unlock(key->mutex); + pj_lock_release(key->lock); #else - pj_mutex_destroy(key->mutex); + pj_lock_destroy(key->lock); #endif return PJ_SUCCESS; diff --git a/pjlib/src/pj/ioqueue_select.c b/pjlib/src/pj/ioqueue_select.c index 603b7ee8..0d97c0a6 100644 --- a/pjlib/src/pj/ioqueue_select.c +++ b/pjlib/src/pj/ioqueue_select.c @@ -39,6 +39,7 @@ #include #include #include +#include /* Now that we have access to OS'es , lets check again that * PJ_IOQUEUE_MAX_HANDLES is not greater than FD_SETSIZE @@ -237,11 +238,11 @@ PJ_DEF(pj_status_t) pj_ioqueue_create( pj_pool_t *pool, key = PJ_POOL_ALLOC_T(pool, pj_ioqueue_key_t); key->ref_count = 0; - rc = pj_mutex_create_recursive(pool, NULL, &key->mutex); + rc = pj_lock_create_recursive_mutex(pool, NULL, &key->lock); if (rc != PJ_SUCCESS) { key = ioqueue->free_list.next; while (key != &ioqueue->free_list) { - pj_mutex_destroy(key->mutex); + pj_lock_destroy(key->lock); key = key->next; } pj_mutex_destroy(ioqueue->ref_cnt_mutex); @@ -284,19 +285,19 @@ PJ_DEF(pj_status_t) pj_ioqueue_destroy(pj_ioqueue_t *ioqueue) /* Destroy reference counters */ key = ioqueue->active_list.next; while (key != &ioqueue->active_list) { - pj_mutex_destroy(key->mutex); + pj_lock_destroy(key->lock); key = key->next; } key = ioqueue->closing_list.next; while (key != &ioqueue->closing_list) { - pj_mutex_destroy(key->mutex); + pj_lock_destroy(key->lock); key = key->next; } key = ioqueue->free_list.next; while (key != &ioqueue->free_list) { - pj_mutex_destroy(key->mutex); + pj_lock_destroy(key->lock); key = key->next; } @@ -312,9 +313,10 @@ PJ_DEF(pj_status_t) pj_ioqueue_destroy(pj_ioqueue_t *ioqueue) * * Register socket handle to ioqueue. */ -PJ_DEF(pj_status_t) pj_ioqueue_register_sock( pj_pool_t *pool, +PJ_DEF(pj_status_t) pj_ioqueue_register_sock2(pj_pool_t *pool, pj_ioqueue_t *ioqueue, pj_sock_t sock, + pj_grp_lock_t *grp_lock, void *user_data, const pj_ioqueue_callback *cb, pj_ioqueue_key_t **p_key) @@ -358,7 +360,7 @@ PJ_DEF(pj_status_t) pj_ioqueue_register_sock( pj_pool_t *pool, key = (pj_ioqueue_key_t*)pj_pool_zalloc(pool, sizeof(pj_ioqueue_key_t)); #endif - rc = ioqueue_init_key(pool, ioqueue, key, sock, user_data, cb); + rc = ioqueue_init_key(pool, ioqueue, key, sock, grp_lock, user_data, cb); if (rc != PJ_SUCCESS) { key = NULL; goto on_return; @@ -386,12 +388,27 @@ PJ_DEF(pj_status_t) pj_ioqueue_register_sock( pj_pool_t *pool, on_return: /* On error, socket may be left in non-blocking mode. */ + if (rc != PJ_SUCCESS) { + if (key->grp_lock) + pj_grp_lock_dec_ref_dbg(key->grp_lock, "ioqueue", 0); + } *p_key = key; pj_lock_release(ioqueue->lock); return rc; } +PJ_DEF(pj_status_t) pj_ioqueue_register_sock( pj_pool_t *pool, + pj_ioqueue_t *ioqueue, + pj_sock_t sock, + void *user_data, + const pj_ioqueue_callback *cb, + pj_ioqueue_key_t **p_key) +{ + return pj_ioqueue_register_sock2(pool, ioqueue, sock, NULL, user_data, + cb, p_key); +} + #if PJ_IOQUEUE_HAS_SAFE_UNREG /* Increment key's reference counter */ static void increment_counter(pj_ioqueue_key_t *key) @@ -446,7 +463,7 @@ PJ_DEF(pj_status_t) pj_ioqueue_unregister( pj_ioqueue_key_t *key) * the key. We need to lock the key before ioqueue here to prevent * deadlock. */ - pj_mutex_lock(key->mutex); + pj_ioqueue_lock_key(key); /* Also lock ioqueue */ pj_lock_acquire(ioqueue->lock); @@ -485,9 +502,34 @@ PJ_DEF(pj_status_t) pj_ioqueue_unregister( pj_ioqueue_key_t *key) decrement_counter(key); /* Done. */ - pj_mutex_unlock(key->mutex); + if (key->grp_lock) { + /* just dec_ref and unlock. we will set grp_lock to NULL + * elsewhere */ + pj_grp_lock_t *grp_lock = key->grp_lock; + // Don't set grp_lock to NULL otherwise the other thread + // will crash. Just leave it as dangling pointer, but this + // should be safe + //key->grp_lock = NULL; + pj_grp_lock_dec_ref_dbg(grp_lock, "ioqueue", 0); + pj_grp_lock_release(grp_lock); + } else { + pj_ioqueue_unlock_key(key); + } #else - pj_mutex_destroy(key->mutex); + if (key->grp_lock) { + /* set grp_lock to NULL and unlock */ + pj_grp_lock_t *grp_lock = key->grp_lock; + // Don't set grp_lock to NULL otherwise the other thread + // will crash. Just leave it as dangling pointer, but this + // should be safe + //key->grp_lock = NULL; + pj_grp_lock_dec_ref_dbg(grp_lock, "ioqueue", 0); + pj_grp_lock_release(grp_lock); + } else { + pj_ioqueue_unlock_key(key); + } + + pj_lock_destroy(key->lock); #endif return PJ_SUCCESS; @@ -620,6 +662,10 @@ static void scan_closing_keys(pj_ioqueue_t *ioqueue) if (PJ_TIME_VAL_GTE(now, h->free_time)) { pj_list_erase(h); + // Don't set grp_lock to NULL otherwise the other thread + // will crash. Just leave it as dangling pointer, but this + // should be safe + //h->grp_lock = NULL; pj_list_push_back(&ioqueue->free_list, h); } h = next; @@ -781,7 +827,7 @@ on_error: PJ_DEF(int) pj_ioqueue_poll( pj_ioqueue_t *ioqueue, const pj_time_val *timeout) { pj_fd_set_t rfdset, wfdset, xfdset; - int count, counter; + int count, i, counter; pj_ioqueue_key_t *h; struct event { @@ -892,8 +938,17 @@ PJ_DEF(int) pj_ioqueue_poll( pj_ioqueue_t *ioqueue, const pj_time_val *timeout) #endif } + for (i=0; igrp_lock) + pj_grp_lock_add_ref_dbg(event[i].key->grp_lock, "ioqueue", 0); + } + + PJ_RACE_ME(5); + pj_lock_release(ioqueue->lock); + PJ_RACE_ME(5); + count = counter; /* Now process all events. The dispatch functions will take care @@ -918,6 +973,10 @@ PJ_DEF(int) pj_ioqueue_poll( pj_ioqueue_t *ioqueue, const pj_time_val *timeout) #if PJ_IOQUEUE_HAS_SAFE_UNREG decrement_counter(event[counter].key); #endif + + if (event[counter].key->grp_lock) + pj_grp_lock_dec_ref_dbg(event[counter].key->grp_lock, + "ioqueue", 0); } diff --git a/pjlib/src/pj/lock.c b/pjlib/src/pj/lock.c index c281f8b3..04b1b670 100644 --- a/pjlib/src/pj/lock.c +++ b/pjlib/src/pj/lock.c @@ -20,10 +20,12 @@ #include #include #include +#include #include #include #include +#define THIS_FILE "lock.c" typedef void LOCK_OBJ; @@ -196,3 +198,516 @@ PJ_DEF(pj_status_t) pj_lock_destroy( pj_lock_t *lock ) return (*lock->destroy)(lock->lock_object); } + +/****************************************************************************** + * Group lock + */ + +/* Individual lock in the group lock */ +typedef struct grp_lock_item +{ + PJ_DECL_LIST_MEMBER(struct grp_lock_item); + int prio; + pj_lock_t *lock; + +} grp_lock_item; + +/* Destroy callbacks */ +typedef struct grp_destroy_callback +{ + PJ_DECL_LIST_MEMBER(struct grp_destroy_callback); + void *comp; + void (*handler)(void*); +} grp_destroy_callback; + +#if PJ_GRP_LOCK_DEBUG +/* Store each add_ref caller */ +typedef struct grp_lock_ref +{ + PJ_DECL_LIST_MEMBER(struct grp_lock_ref); + const char *file; + int line; +} grp_lock_ref; +#endif + +/* The group lock */ +struct pj_grp_lock_t +{ + pj_lock_t base; + + pj_pool_t *pool; + pj_atomic_t *ref_cnt; + pj_lock_t *own_lock; + + pj_thread_t *owner; + int owner_cnt; + + grp_lock_item lock_list; + grp_destroy_callback destroy_list; + +#if PJ_GRP_LOCK_DEBUG + grp_lock_ref ref_list; + grp_lock_ref ref_free_list; +#endif +}; + + +PJ_DEF(void) pj_grp_lock_config_default(pj_grp_lock_config *cfg) +{ + pj_bzero(cfg, sizeof(*cfg)); +} + +static void grp_lock_set_owner_thread(pj_grp_lock_t *glock) +{ + if (!glock->owner) { + glock->owner = pj_thread_this(); + glock->owner_cnt = 1; + } else { + pj_assert(glock->owner == pj_thread_this()); + glock->owner_cnt++; + } +} + +static void grp_lock_unset_owner_thread(pj_grp_lock_t *glock) +{ + pj_assert(glock->owner == pj_thread_this()); + pj_assert(glock->owner_cnt > 0); + if (--glock->owner_cnt <= 0) { + glock->owner = NULL; + glock->owner_cnt = 0; + } +} + +static pj_status_t grp_lock_acquire(LOCK_OBJ *p) +{ + pj_grp_lock_t *glock = (pj_grp_lock_t*)p; + grp_lock_item *lck; + + pj_assert(pj_atomic_get(glock->ref_cnt) > 0); + + lck = glock->lock_list.next; + while (lck != &glock->lock_list) { + pj_lock_acquire(lck->lock); + lck = lck->next; + } + grp_lock_set_owner_thread(glock); + pj_grp_lock_add_ref(glock); + return PJ_SUCCESS; +} + +static pj_status_t grp_lock_tryacquire(LOCK_OBJ *p) +{ + pj_grp_lock_t *glock = (pj_grp_lock_t*)p; + grp_lock_item *lck; + + pj_assert(pj_atomic_get(glock->ref_cnt) > 0); + + lck = glock->lock_list.next; + while (lck != &glock->lock_list) { + pj_status_t status = pj_lock_tryacquire(lck->lock); + if (status != PJ_SUCCESS) { + lck = lck->prev; + while (lck != &glock->lock_list) { + pj_lock_release(lck->lock); + lck = lck->prev; + } + return status; + } + lck = lck->next; + } + grp_lock_set_owner_thread(glock); + pj_grp_lock_add_ref(glock); + return PJ_SUCCESS; +} + +static pj_status_t grp_lock_release(LOCK_OBJ *p) +{ + pj_grp_lock_t *glock = (pj_grp_lock_t*)p; + grp_lock_item *lck; + + grp_lock_unset_owner_thread(glock); + + lck = glock->lock_list.prev; + while (lck != &glock->lock_list) { + pj_lock_release(lck->lock); + lck = lck->prev; + } + return pj_grp_lock_dec_ref(glock); +} + +static pj_status_t grp_lock_destroy(LOCK_OBJ *p) +{ + pj_grp_lock_t *glock = (pj_grp_lock_t*)p; + pj_pool_t *pool = glock->pool; + grp_lock_item *lck; + grp_destroy_callback *cb; + + if (!glock->pool) { + /* already destroyed?! */ + return PJ_EINVAL; + } + + /* Release all chained locks */ + lck = glock->lock_list.next; + while (lck != &glock->lock_list) { + if (lck->lock != glock->own_lock) { + unsigned i; + for (i=0; iowner_cnt; ++i) + pj_lock_release(lck->lock); + } + lck = lck->next; + } + + /* Call callbacks */ + cb = glock->destroy_list.next; + while (cb != &glock->destroy_list) { + grp_destroy_callback *next = cb->next; + cb->handler(cb->comp); + cb = next; + } + + pj_lock_destroy(glock->own_lock); + pj_atomic_destroy(glock->ref_cnt); + glock->pool = NULL; + pj_pool_release(pool); + + return PJ_SUCCESS; +} + + +PJ_DEF(pj_status_t) pj_grp_lock_create( pj_pool_t *pool, + const pj_grp_lock_config *cfg, + pj_grp_lock_t **p_grp_lock) +{ + pj_grp_lock_t *glock; + grp_lock_item *own_lock; + pj_status_t status; + + PJ_ASSERT_RETURN(pool && p_grp_lock, PJ_EINVAL); + + PJ_UNUSED_ARG(cfg); + + pool = pj_pool_create(pool->factory, "glck%p", 512, 512, NULL); + if (!pool) + return PJ_ENOMEM; + + glock = PJ_POOL_ZALLOC_T(pool, pj_grp_lock_t); + glock->base.lock_object = glock; + glock->base.acquire = &grp_lock_acquire; + glock->base.tryacquire = &grp_lock_tryacquire; + glock->base.release = &grp_lock_release; + glock->base.destroy = &grp_lock_destroy; + + glock->pool = pool; + pj_list_init(&glock->lock_list); + pj_list_init(&glock->destroy_list); +#if PJ_GRP_LOCK_DEBUG + pj_list_init(&glock->ref_list); + pj_list_init(&glock->ref_free_list); +#endif + + status = pj_atomic_create(pool, 0, &glock->ref_cnt); + if (status != PJ_SUCCESS) + goto on_error; + + status = pj_lock_create_recursive_mutex(pool, pool->obj_name, + &glock->own_lock); + if (status != PJ_SUCCESS) + goto on_error; + + own_lock = PJ_POOL_ZALLOC_T(pool, grp_lock_item); + own_lock->lock = glock->own_lock; + pj_list_push_back(&glock->lock_list, own_lock); + + *p_grp_lock = glock; + return PJ_SUCCESS; + +on_error: + grp_lock_destroy(glock); + return status; +} + +PJ_DEF(pj_status_t) pj_grp_lock_destroy( pj_grp_lock_t *grp_lock) +{ + return grp_lock_destroy(grp_lock); +} + +PJ_DEF(pj_status_t) pj_grp_lock_acquire( pj_grp_lock_t *grp_lock) +{ + return grp_lock_acquire(grp_lock); +} + +PJ_DEF(pj_status_t) pj_grp_lock_tryacquire( pj_grp_lock_t *grp_lock) +{ + return grp_lock_tryacquire(grp_lock); +} + +PJ_DEF(pj_status_t) pj_grp_lock_release( pj_grp_lock_t *grp_lock) +{ + return grp_lock_release(grp_lock); +} + +PJ_DEF(pj_status_t) pj_grp_lock_replace( pj_grp_lock_t *old, + pj_grp_lock_t *new) +{ + grp_destroy_callback *ocb; + + /* Move handlers from old to new */ + ocb = old->destroy_list.next; + while (ocb != &old->destroy_list) { + grp_destroy_callback *ncb; + + ncb = PJ_POOL_ALLOC_T(new->pool, grp_destroy_callback); + ncb->comp = ocb->comp; + ncb->handler = ocb->handler; + pj_list_push_back(&new->destroy_list, ncb); + + ocb = ocb->next; + } + + pj_list_init(&old->destroy_list); + + grp_lock_destroy(old); + return PJ_SUCCESS; +} + +PJ_DEF(pj_status_t) pj_grp_lock_add_handler( pj_grp_lock_t *glock, + pj_pool_t *pool, + void *comp, + void (*destroy)(void *comp)) +{ + grp_destroy_callback *cb; + + grp_lock_acquire(glock); + + if (pool == NULL) + pool = glock->pool; + + cb = PJ_POOL_ZALLOC_T(pool, grp_destroy_callback); + cb->comp = comp; + cb->handler = destroy; + pj_list_push_back(&glock->destroy_list, cb); + + grp_lock_release(glock); + return PJ_SUCCESS; +} + +PJ_DEF(pj_status_t) pj_grp_lock_del_handler( pj_grp_lock_t *glock, + void *comp, + void (*destroy)(void *comp)) +{ + grp_destroy_callback *cb; + + grp_lock_acquire(glock); + + cb = glock->destroy_list.next; + while (cb != &glock->destroy_list) { + if (cb->comp == comp && cb->handler == destroy) + break; + cb = cb->next; + } + + if (cb != &glock->destroy_list) + pj_list_erase(cb); + + grp_lock_release(glock); + return PJ_SUCCESS; +} + +static pj_status_t grp_lock_add_ref(pj_grp_lock_t *glock) +{ + pj_atomic_inc(glock->ref_cnt); + return PJ_SUCCESS; +} + +static pj_status_t grp_lock_dec_ref(pj_grp_lock_t *glock) +{ + int cnt; /* for debugging */ + if ((cnt=pj_atomic_dec_and_get(glock->ref_cnt)) == 0) { + grp_lock_destroy(glock); + return PJ_EGONE; + } + pj_assert(cnt > 0); + pj_grp_lock_dump(glock); + return PJ_SUCCESS; +} + +#if PJ_GRP_LOCK_DEBUG +PJ_DEF(pj_status_t) pj_grp_lock_add_ref_dbg(pj_grp_lock_t *glock, + const char *file, + int line) +{ + grp_lock_ref *ref; + pj_status_t status; + + pj_enter_critical_section(); + if (!pj_list_empty(&glock->ref_free_list)) { + ref = glock->ref_free_list.next; + pj_list_erase(ref); + } else { + ref = PJ_POOL_ALLOC_T(glock->pool, grp_lock_ref); + } + + ref->file = file; + ref->line = line; + pj_list_push_back(&glock->ref_list, ref); + + pj_leave_critical_section(); + + status = grp_lock_add_ref(glock); + + if (status != PJ_SUCCESS) { + pj_enter_critical_section(); + pj_list_erase(ref); + pj_list_push_back(&glock->ref_free_list, ref); + pj_leave_critical_section(); + } + + return status; +} + +PJ_DEF(pj_status_t) pj_grp_lock_dec_ref_dbg(pj_grp_lock_t *glock, + const char *file, + int line) +{ + grp_lock_ref *ref; + + pj_enter_critical_section(); + /* Find the same source file */ + ref = glock->ref_list.next; + while (ref != &glock->ref_list) { + if (strcmp(ref->file, file) == 0) { + pj_list_erase(ref); + pj_list_push_back(&glock->ref_free_list, ref); + break; + } + ref = ref->next; + } + pj_leave_critical_section(); + + if (ref == &glock->ref_list) { + PJ_LOG(2,(THIS_FILE, "pj_grp_lock_dec_ref_dbg() could not find " + "matching ref for %s", file)); + } + + return grp_lock_dec_ref(glock); +} +#else +PJ_DEF(pj_status_t) pj_grp_lock_add_ref(pj_grp_lock_t *glock) +{ + return grp_lock_add_ref(glock); +} + +PJ_DEF(pj_status_t) pj_grp_lock_dec_ref(pj_grp_lock_t *glock) +{ + return grp_lock_dec_ref(glock); +} +#endif + +PJ_DEF(int) pj_grp_lock_get_ref(pj_grp_lock_t *glock) +{ + return pj_atomic_get(glock->ref_cnt); +} + +PJ_DEF(pj_status_t) pj_grp_lock_chain_lock( pj_grp_lock_t *glock, + pj_lock_t *lock, + int pos) +{ + grp_lock_item *lck, *new_lck; + unsigned i; + + grp_lock_acquire(glock); + + for (i=0; iowner_cnt; ++i) + pj_lock_acquire(lock); + + lck = glock->lock_list.next; + while (lck != &glock->lock_list) { + if (lck->prio >= pos) + break; + lck = lck->next; + } + + new_lck = PJ_POOL_ZALLOC_T(glock->pool, grp_lock_item); + new_lck->prio = pos; + new_lck->lock = lock; + pj_list_insert_before(lck, new_lck); + + /* this will also release the new lock */ + grp_lock_release(glock); + return PJ_SUCCESS; +} + +PJ_DEF(pj_status_t) pj_grp_lock_unchain_lock( pj_grp_lock_t *glock, + pj_lock_t *lock) +{ + grp_lock_item *lck; + + grp_lock_acquire(glock); + + lck = glock->lock_list.next; + while (lck != &glock->lock_list) { + if (lck->lock == lock) + break; + lck = lck->next; + } + + if (lck != &glock->lock_list) { + unsigned i; + + pj_list_erase(lck); + for (i=0; iowner_cnt; ++i) + pj_lock_release(lck->lock); + } + + grp_lock_release(glock); + return PJ_SUCCESS; +} + +PJ_DEF(void) pj_grp_lock_dump(pj_grp_lock_t *grp_lock) +{ +#if PJ_GRP_LOCK_DEBUG + grp_lock_ref *ref = grp_lock->ref_list.next; + char info_buf[1000]; + pj_str_t info; + + info.ptr = info_buf; + info.slen = 0; + + pj_grp_lock_acquire(grp_lock); + pj_enter_critical_section(); + + while (ref != &grp_lock->ref_list && info.slen < sizeof(info_buf)) { + char *start = info.ptr + info.slen; + int max_len = sizeof(info_buf) - info.slen; + int len; + + len = pj_ansi_snprintf(start, max_len, "%s:%d ", ref->file, ref->line); + if (len < 1 || len > max_len) { + len = strlen(ref->file); + if (len > max_len - 1) + len = max_len - 1; + + memcpy(start, ref->file, len); + start[len++] = ' '; + } + + info.slen += len; + + ref = ref->next; + } + + if (ref != &grp_lock->ref_list) { + int i; + for (i=0; i<4; ++i) + info_buf[sizeof(info_buf)-i-1] = '.'; + } + info.ptr[info.slen-1] = '\0'; + + pj_leave_critical_section(); + pj_grp_lock_release(grp_lock); + + PJ_LOG(4,(THIS_FILE, "Group lock %p, ref_cnt=%d. Reference holders: %s", + grp_lock, pj_grp_lock_get_ref(grp_lock), info.ptr)); +#endif +} diff --git a/pjlib/src/pj/os_core_unix.c b/pjlib/src/pj/os_core_unix.c index 810e4b0d..252f7fcc 100644 --- a/pjlib/src/pj/os_core_unix.c +++ b/pjlib/src/pj/os_core_unix.c @@ -97,7 +97,18 @@ struct pj_sem_t #if defined(PJ_HAS_EVENT_OBJ) && PJ_HAS_EVENT_OBJ != 0 struct pj_event_t { - char obj_name[PJ_MAX_OBJ_NAME]; + enum event_state { + EV_STATE_OFF, + EV_STATE_SET, + EV_STATE_PULSED + } state; + + pj_mutex_t mutex; + pthread_cond_t cond; + + pj_bool_t auto_reset; + unsigned threads_waiting; + unsigned threads_to_release; }; #endif /* PJ_HAS_EVENT_OBJ */ @@ -1700,13 +1711,48 @@ PJ_DEF(pj_status_t) pj_event_create(pj_pool_t *pool, const char *name, pj_bool_t manual_reset, pj_bool_t initial, pj_event_t **ptr_event) { - pj_assert(!"Not supported!"); - PJ_UNUSED_ARG(pool); - PJ_UNUSED_ARG(name); - PJ_UNUSED_ARG(manual_reset); - PJ_UNUSED_ARG(initial); - PJ_UNUSED_ARG(ptr_event); - return PJ_EINVALIDOP; + pj_event_t *event; + + event = PJ_POOL_ALLOC_T(pool, pj_event_t); + + init_mutex(&event->mutex, name, PJ_MUTEX_SIMPLE); + pthread_cond_init(&event->cond, 0); + event->auto_reset = !manual_reset; + event->threads_waiting = 0; + + if (initial) { + event->state = EV_STATE_SET; + event->threads_to_release = 1; + } else { + event->state = EV_STATE_OFF; + event->threads_to_release = 0; + } + + *ptr_event = event; + return PJ_SUCCESS; +} + +static void event_on_one_release(pj_event_t *event) +{ + if (event->state == EV_STATE_SET) { + if (event->auto_reset) { + event->threads_to_release = 0; + event->state = EV_STATE_OFF; + } else { + /* Manual reset remains on */ + } + } else { + if (event->auto_reset) { + /* Only release one */ + event->threads_to_release = 0; + event->state = EV_STATE_OFF; + } else { + event->threads_to_release--; + pj_assert(event->threads_to_release >= 0); + if (event->threads_to_release==0) + event->state = EV_STATE_OFF; + } + } } /* @@ -1714,8 +1760,14 @@ PJ_DEF(pj_status_t) pj_event_create(pj_pool_t *pool, const char *name, */ PJ_DEF(pj_status_t) pj_event_wait(pj_event_t *event) { - PJ_UNUSED_ARG(event); - return PJ_EINVALIDOP; + pthread_mutex_lock(&event->mutex.mutex); + event->threads_waiting++; + while (event->state == EV_STATE_OFF) + pthread_cond_wait(&event->cond, &event->mutex.mutex); + event->threads_waiting--; + event_on_one_release(event); + pthread_mutex_unlock(&event->mutex.mutex); + return PJ_SUCCESS; } /* @@ -1723,8 +1775,16 @@ PJ_DEF(pj_status_t) pj_event_wait(pj_event_t *event) */ PJ_DEF(pj_status_t) pj_event_trywait(pj_event_t *event) { - PJ_UNUSED_ARG(event); - return PJ_EINVALIDOP; + pj_status_t status; + + pthread_mutex_lock(&event->mutex.mutex); + status = event->state != EV_STATE_OFF ? PJ_SUCCESS : -1; + if (status==PJ_SUCCESS) { + event_on_one_release(event); + } + pthread_mutex_unlock(&event->mutex.mutex); + + return status; } /* @@ -1732,8 +1792,15 @@ PJ_DEF(pj_status_t) pj_event_trywait(pj_event_t *event) */ PJ_DEF(pj_status_t) pj_event_set(pj_event_t *event) { - PJ_UNUSED_ARG(event); - return PJ_EINVALIDOP; + pthread_mutex_lock(&event->mutex.mutex); + event->threads_to_release = 1; + event->state = EV_STATE_SET; + if (event->auto_reset) + pthread_cond_signal(&event->cond); + else + pthread_cond_broadcast(&event->cond); + pthread_mutex_unlock(&event->mutex.mutex); + return PJ_SUCCESS; } /* @@ -1741,8 +1808,18 @@ PJ_DEF(pj_status_t) pj_event_set(pj_event_t *event) */ PJ_DEF(pj_status_t) pj_event_pulse(pj_event_t *event) { - PJ_UNUSED_ARG(event); - return PJ_EINVALIDOP; + pthread_mutex_lock(&event->mutex.mutex); + if (event->threads_waiting) { + event->threads_to_release = event->auto_reset ? 1 : + event->threads_waiting; + event->state = EV_STATE_PULSED; + if (event->threads_to_release==1) + pthread_cond_signal(&event->cond); + else + pthread_cond_broadcast(&event->cond); + } + pthread_mutex_unlock(&event->mutex.mutex); + return PJ_SUCCESS; } /* @@ -1750,8 +1827,11 @@ PJ_DEF(pj_status_t) pj_event_pulse(pj_event_t *event) */ PJ_DEF(pj_status_t) pj_event_reset(pj_event_t *event) { - PJ_UNUSED_ARG(event); - return PJ_EINVALIDOP; + pthread_mutex_lock(&event->mutex.mutex); + event->state = EV_STATE_OFF; + event->threads_to_release = 0; + pthread_mutex_unlock(&event->mutex.mutex); + return PJ_SUCCESS; } /* @@ -1759,8 +1839,9 @@ PJ_DEF(pj_status_t) pj_event_reset(pj_event_t *event) */ PJ_DEF(pj_status_t) pj_event_destroy(pj_event_t *event) { - PJ_UNUSED_ARG(event); - return PJ_EINVALIDOP; + pj_mutex_destroy(&event->mutex); + pthread_cond_destroy(&event->cond); + return PJ_SUCCESS; } #endif /* PJ_HAS_EVENT_OBJ */ diff --git a/pjlib/src/pj/timer.c b/pjlib/src/pj/timer.c index 767a261f..47209fbe 100644 --- a/pjlib/src/pj/timer.c +++ b/pjlib/src/pj/timer.c @@ -35,6 +35,7 @@ #include #include #include +#include #define THIS_FILE "timer.c" @@ -451,20 +452,27 @@ PJ_DEF(pj_timer_entry*) pj_timer_entry_init( pj_timer_entry *entry, entry->id = id; entry->user_data = user_data; entry->cb = cb; + entry->_grp_lock = NULL; return entry; } #if PJ_TIMER_DEBUG -PJ_DEF(pj_status_t) pj_timer_heap_schedule_dbg( pj_timer_heap_t *ht, - pj_timer_entry *entry, - const pj_time_val *delay, - const char *src_file, - int src_line) +static pj_status_t schedule_w_grp_lock_dbg(pj_timer_heap_t *ht, + pj_timer_entry *entry, + const pj_time_val *delay, + pj_bool_t set_id, + int id_val, + pj_grp_lock_t *grp_lock, + const char *src_file, + int src_line) #else -PJ_DEF(pj_status_t) pj_timer_heap_schedule( pj_timer_heap_t *ht, - pj_timer_entry *entry, - const pj_time_val *delay) +static pj_status_t schedule_w_grp_lock(pj_timer_heap_t *ht, + pj_timer_entry *entry, + const pj_time_val *delay, + pj_bool_t set_id, + int id_val, + pj_grp_lock_t *grp_lock) #endif { pj_status_t status; @@ -485,13 +493,66 @@ PJ_DEF(pj_status_t) pj_timer_heap_schedule( pj_timer_heap_t *ht, lock_timer_heap(ht); status = schedule_entry(ht, entry, &expires); + if (status == PJ_SUCCESS) { + if (set_id) + entry->id = id_val; + entry->_grp_lock = grp_lock; + if (entry->_grp_lock) { + pj_grp_lock_add_ref(entry->_grp_lock); + } + } unlock_timer_heap(ht); return status; } -PJ_DEF(int) pj_timer_heap_cancel( pj_timer_heap_t *ht, - pj_timer_entry *entry) + +#if PJ_TIMER_DEBUG +PJ_DEF(pj_status_t) pj_timer_heap_schedule_dbg( pj_timer_heap_t *ht, + pj_timer_entry *entry, + const pj_time_val *delay, + const char *src_file, + int src_line) +{ + return schedule_w_grp_lock_dbg(ht, entry, delay, PJ_FALSE, 1, NULL, + src_file, src_line); +} + +PJ_DEF(pj_status_t) pj_timer_heap_schedule_w_grp_lock_dbg( + pj_timer_heap_t *ht, + pj_timer_entry *entry, + const pj_time_val *delay, + int id_val, + pj_grp_lock_t *grp_lock, + const char *src_file, + int src_line) +{ + return schedule_w_grp_lock_dbg(ht, entry, delay, PJ_TRUE, id_val, + grp_lock, src_file, src_line); +} + +#else +PJ_DEF(pj_status_t) pj_timer_heap_schedule( pj_timer_heap_t *ht, + pj_timer_entry *entry, + const pj_time_val *delay) +{ + return schedule_w_grp_lock(ht, entry, delay, PJ_FALSE, 1, NULL); +} + +PJ_DEF(pj_status_t) pj_timer_heap_schedule_w_grp_lock(pj_timer_heap_t *ht, + pj_timer_entry *entry, + const pj_time_val *delay, + int id_val, + pj_grp_lock_t *grp_lock) +{ + return schedule_w_grp_lock(ht, entry, delay, PJ_TRUE, id_val, grp_lock); +} +#endif + +static int cancel_timer(pj_timer_heap_t *ht, + pj_timer_entry *entry, + pj_bool_t set_id, + int id_val) { int count; @@ -499,11 +560,32 @@ PJ_DEF(int) pj_timer_heap_cancel( pj_timer_heap_t *ht, lock_timer_heap(ht); count = cancel(ht, entry, 1); + if (set_id) { + entry->id = id_val; + } + if (entry->_grp_lock) { + pj_grp_lock_t *grp_lock = entry->_grp_lock; + entry->_grp_lock = NULL; + pj_grp_lock_dec_ref(grp_lock); + } unlock_timer_heap(ht); return count; } +PJ_DEF(int) pj_timer_heap_cancel( pj_timer_heap_t *ht, + pj_timer_entry *entry) +{ + return cancel_timer(ht, entry, PJ_FALSE, 0); +} + +PJ_DEF(int) pj_timer_heap_cancel_if_active(pj_timer_heap_t *ht, + pj_timer_entry *entry, + int id_val) +{ + return cancel_timer(ht, entry, PJ_TRUE, id_val); +} + PJ_DEF(unsigned) pj_timer_heap_poll( pj_timer_heap_t *ht, pj_time_val *next_delay ) { @@ -527,11 +609,23 @@ PJ_DEF(unsigned) pj_timer_heap_poll( pj_timer_heap_t *ht, count < ht->max_entries_per_poll ) { pj_timer_entry *node = remove_node(ht, 0); + pj_grp_lock_t *grp_lock; + ++count; + grp_lock = node->_grp_lock; + node->_grp_lock = NULL; + unlock_timer_heap(ht); + + PJ_RACE_ME(5); + if (node->cb) (*node->cb)(ht, node); + + if (grp_lock) + pj_grp_lock_dec_ref(grp_lock); + lock_timer_heap(ht); } if (ht->cur_size && next_delay) { -- cgit v1.2.3