author    Benny Prijono <bennylp@teluu.com>    2013-02-21 11:18:36 +0000
committer Benny Prijono <bennylp@teluu.com>    2013-02-21 11:18:36 +0000
commit    9b8e0a5afe9cba0fd430e9642630bd465db9aefa (patch)
tree      550cd346c99b144a79c76096dc75d6be2dfb3c31 /pjlib
parent    99de12689e7c6b8117ee82ff8e3c17e5ec85418d (diff)
Fixed #1616: Implementation of Group lock and other foundation work in PJLIB for fixing synchronization issues
git-svn-id: http://svn.pjsip.org/repos/pjproject/trunk@4359 74dad513-b988-da41-8d7b-12977e46ad98
Diffstat (limited to 'pjlib')
-rw-r--r--  pjlib/include/pj/activesock.h         7
-rw-r--r--  pjlib/include/pj/config.h            16
-rw-r--r--  pjlib/include/pj/errno.h              5
-rw-r--r--  pjlib/include/pj/ioqueue.h           13
-rw-r--r--  pjlib/include/pj/lock.h             252
-rw-r--r--  pjlib/include/pj/timer.h             66
-rw-r--r--  pjlib/include/pj/types.h              3
-rw-r--r--  pjlib/src/pj/activesock.c            56
-rw-r--r--  pjlib/src/pj/errno.c                  3
-rw-r--r--  pjlib/src/pj/ioqueue_common_abs.c   114
-rw-r--r--  pjlib/src/pj/ioqueue_common_abs.h     3
-rw-r--r--  pjlib/src/pj/ioqueue_epoll.c         18
-rw-r--r--  pjlib/src/pj/ioqueue_select.c        81
-rw-r--r--  pjlib/src/pj/lock.c                 515
-rw-r--r--  pjlib/src/pj/os_core_unix.c         121
-rw-r--r--  pjlib/src/pj/timer.c                114
16 files changed, 1283 insertions(+), 104 deletions(-)
diff --git a/pjlib/include/pj/activesock.h b/pjlib/include/pj/activesock.h
index 5a201864..11fa6589 100644
--- a/pjlib/include/pj/activesock.h
+++ b/pjlib/include/pj/activesock.h
@@ -174,6 +174,11 @@ typedef struct pj_activesock_cb
typedef struct pj_activesock_cfg
{
/**
+ * Optional group lock to be assigned to the ioqueue key.
+ */
+ pj_grp_lock_t *grp_lock;
+
+ /**
* Number of concurrent asynchronous operations that is to be supported
* by the active socket. This value only affects socket receive and
* accept operations -- the active socket will issue one or more
@@ -290,7 +295,6 @@ PJ_DECL(pj_status_t) pj_activesock_create_udp(pj_pool_t *pool,
pj_activesock_t **p_asock,
pj_sockaddr *bound_addr);
-
/**
* Close the active socket. This will unregister the socket from the
* ioqueue and ultimately close the socket.
@@ -548,6 +552,7 @@ PJ_DECL(pj_status_t) pj_activesock_start_connect(pj_activesock_t *asock,
const pj_sockaddr_t *remaddr,
int addr_len);
+
#endif /* PJ_HAS_TCP */
/**
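
With the new grp_lock field in place, an application can tie an active socket's lifetime to a group lock. A minimal sketch under stated assumptions: 'pool', 'ioqueue' and 'sock' are created elsewhere, and on_data_read is a hypothetical callback.

    /* Sketch: attach a group lock to an active socket so the underlying
     * ioqueue key holds a reference for as long as it is registered. */
    pj_grp_lock_t *grp_lock;
    pj_activesock_cfg cfg;
    pj_activesock_cb cb;
    pj_activesock_t *asock;
    pj_status_t status;

    status = pj_grp_lock_create(pool, NULL, &grp_lock);
    if (status != PJ_SUCCESS)
        return status;

    /* Hold the application's own reference */
    pj_grp_lock_add_ref(grp_lock);

    pj_activesock_cfg_default(&cfg);
    cfg.grp_lock = grp_lock;            /* the field added above */

    pj_bzero(&cb, sizeof(cb));
    cb.on_data_read = &on_data_read;    /* hypothetical callback */

    status = pj_activesock_create(pool, sock, pj_SOCK_DGRAM(), &cfg,
                                  ioqueue, &cb, NULL, &asock);
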
diff --git a/pjlib/include/pj/config.h b/pjlib/include/pj/config.h
index 2568214d..9b504c3a 100644
--- a/pjlib/include/pj/config.h
+++ b/pjlib/include/pj/config.h
@@ -488,6 +488,14 @@
/**
+ * Set this to 1 to enable debugging on the group lock. Default: 0
+ */
+#ifndef PJ_GRP_LOCK_DEBUG
+# define PJ_GRP_LOCK_DEBUG 0
+#endif
+
+
+/**
* Specify this as \a stack_size argument in #pj_thread_create() to specify
* that thread should use default stack size for the current platform.
*
@@ -1122,6 +1130,14 @@
#endif
/**
+ * Simulate race condition by sleeping the thread in strategic locations.
+ * Default: no!
+ */
+#ifndef PJ_RACE_ME
+# define PJ_RACE_ME(x)
+#endif
+
+/**
* Function attributes to inform that the function may throw exception.
*
* @param x The exception list, enclosed in parenthesis.
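
PJ_RACE_ME above compiles to nothing by default. A stress-test build could map it to a sleep so the race windows this commit closes become easy to reproduce; the pj_thread_sleep() mapping below is an assumption for illustration, not part of this commit.

    /* In pj/config_site.h (sketch): widen race windows during tests.
     * The argument is the sleep length in milliseconds at each of the
     * strategic points marked with PJ_RACE_ME(x) in this commit. */
    #define PJ_RACE_ME(x)  pj_thread_sleep(x)
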
diff --git a/pjlib/include/pj/errno.h b/pjlib/include/pj/errno.h
index 66fea949..e9e403ce 100644
--- a/pjlib/include/pj/errno.h
+++ b/pjlib/include/pj/errno.h
@@ -422,6 +422,11 @@ PJ_DECL(pj_status_t) pj_register_strerror(pj_status_t start_code,
* Unsupported address family
*/
#define PJ_EAFNOTSUP (PJ_ERRNO_START_STATUS + 22)/* 70022 */
+/**
+ * @hideinitializer
+ * Object no longer exists
+ */
+#define PJ_EGONE (PJ_ERRNO_START_STATUS + 23)/* 70023 */
/** @} */ /* pj_errnum */
diff --git a/pjlib/include/pj/ioqueue.h b/pjlib/include/pj/ioqueue.h
index 40bb403c..1e983327 100644
--- a/pjlib/include/pj/ioqueue.h
+++ b/pjlib/include/pj/ioqueue.h
@@ -405,6 +405,19 @@ PJ_DECL(pj_status_t) pj_ioqueue_register_sock( pj_pool_t *pool,
pj_ioqueue_key_t **key );
/**
+ * Variant of pj_ioqueue_register_sock() with an additional group lock
+ * parameter. If a group lock is set for the key, the key will increment the
+ * group lock's reference counter when the socket is registered and decrement
+ * it when the key is destroyed.
+ */
+PJ_DECL(pj_status_t) pj_ioqueue_register_sock2(pj_pool_t *pool,
+ pj_ioqueue_t *ioque,
+ pj_sock_t sock,
+ pj_grp_lock_t *grp_lock,
+ void *user_data,
+ const pj_ioqueue_callback *cb,
+ pj_ioqueue_key_t **key );
+
+/**
* Unregister from the I/O Queue framework. Caller must make sure that
* the key doesn't have any pending operations before calling this function,
* by calling #pj_ioqueue_is_pending() for all previously submitted
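
A short sketch of the new registration variant; the on_read_complete name is hypothetical, and passing NULL for grp_lock behaves exactly like pj_ioqueue_register_sock().

    /* Sketch: register a socket together with its group lock. The key
     * takes a group lock reference here and drops it on unregister. */
    pj_ioqueue_callback io_cb;
    pj_ioqueue_key_t *key;
    pj_status_t status;

    pj_bzero(&io_cb, sizeof(io_cb));
    io_cb.on_read_complete = &on_read_complete;   /* hypothetical */

    status = pj_ioqueue_register_sock2(pool, ioqueue, sock, grp_lock,
                                       user_data, &io_cb, &key);

    /* ... use the socket ... */

    pj_ioqueue_unregister(key);   /* releases the key's grp_lock ref */
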
diff --git a/pjlib/include/pj/lock.h b/pjlib/include/pj/lock.h
index b8bdb0f9..b1a5d72e 100644
--- a/pjlib/include/pj/lock.h
+++ b/pjlib/include/pj/lock.h
@@ -147,6 +147,258 @@ PJ_DECL(pj_status_t) pj_lock_destroy( pj_lock_t *lock );
/** @} */
+
+/**
+ * @defgroup PJ_GRP_LOCK Group Lock
+ * @ingroup PJ_LOCK
+ * @{
+ *
+ * Group lock is a synchronization object to manage concurrency among members
+ * within the same logical group. Example of such groups are:
+ *
+ * - dialog, which has members such as the dialog itself, an invite session,
+ * and several transactions
+ * - ICE, which has members such as ICE stream transport, ICE session, STUN
+ * socket, TURN socket, and down to ioqueue key
+ *
+ * Group lock has three functions:
+ *
+ * - mutual exclusion: to protect resources from being accessed by more than
+ * one thread at the same time
+ * - session management: to make sure that the resource is not destroyed
+ * while others are still using or about to use it.
+ * - lock coordinator: to provide uniform lock ordering among more than one
+ * lock objects, which is necessary to avoid deadlock.
+ *
+ * The requirements of the group lock are:
+ *
+ * - must satisfy all the functions above
+ * - must allow members to join or leave the group (for example,
+ * transaction may be added or removed from a dialog)
+ * - must be able to synchronize with external lock (for example, a dialog
+ * lock must be able to sync itself with PJSUA lock)
+ *
+ * Please see https://trac.pjsip.org/repos/wiki/Group_Lock for more info.
+ */
+
+/**
+ * Settings for creating the group lock.
+ */
+typedef struct pj_grp_lock_config
+{
+ /**
+ * Creation flags, currently must be zero.
+ */
+ unsigned flags;
+
+} pj_grp_lock_config;
+
+
+/**
+ * Initialize the config with the default values.
+ *
+ * @param cfg The config to be initialized.
+ */
+PJ_DECL(void) pj_grp_lock_config_default(pj_grp_lock_config *cfg);
+
+/**
+ * Create a group lock object. Initially the group lock will have reference
+ * counter of one.
+ *
+ * @param pool The group lock only uses the pool parameter to get
+ * the pool factory, from which it will create its own
+ * pool.
+ * @param cfg Optional configuration.
+ * @param p_grp_lock Pointer to receive the newly created group lock.
+ *
+ * @return PJ_SUCCESS or the appropriate error code.
+ */
+PJ_DECL(pj_status_t) pj_grp_lock_create(pj_pool_t *pool,
+ const pj_grp_lock_config *cfg,
+ pj_grp_lock_t **p_grp_lock);
+
+/**
+ * Forcibly destroy the group lock, ignoring the reference counter value.
+ *
+ * @param grp_lock The group lock.
+ *
+ * @return PJ_SUCCESS or the appropriate error code.
+ */
+PJ_DECL(pj_status_t) pj_grp_lock_destroy( pj_grp_lock_t *grp_lock);
+
+/**
+ * Move the contents of the old lock to the new lock and destroy the
+ * old lock.
+ *
+ * @param old_lock The old group lock to be destroyed.
+ * @param new_lock The new group lock.
+ *
+ * @return PJ_SUCCESS or the appropriate error code.
+ */
+PJ_DECL(pj_status_t) pj_grp_lock_replace(pj_grp_lock_t *old_lock,
+ pj_grp_lock_t *new_lock);
+
+/**
+ * Acquire lock on the specified group lock.
+ *
+ * @param grp_lock The group lock.
+ *
+ * @return PJ_SUCCESS or the appropriate error code.
+ */
+PJ_DECL(pj_status_t) pj_grp_lock_acquire( pj_grp_lock_t *grp_lock);
+
+/**
+ * Acquire lock on the specified group lock if it is available, otherwise
+ * return immediately without waiting.
+ *
+ * @param grp_lock The group lock.
+ *
+ * @return PJ_SUCCESS or the appropriate error code.
+ */
+PJ_DECL(pj_status_t) pj_grp_lock_tryacquire( pj_grp_lock_t *grp_lock);
+
+/**
+ * Release the previously held lock. This may cause the group lock
+ * to be destroyed if it is the last one to hold the reference counter.
+ * In that case, the function will return PJ_EGONE.
+ *
+ * @param grp_lock The group lock.
+ *
+ * @return PJ_SUCCESS or the appropriate error code.
+ */
+PJ_DECL(pj_status_t) pj_grp_lock_release( pj_grp_lock_t *grp_lock);
+
+/**
+ * Add a destructor handler, to be called by the group lock when it is
+ * about to be destroyed.
+ *
+ * @param grp_lock The group lock.
+ * @param pool Pool to allocate memory for the handler.
+ * @param member A pointer to be passed to the handler.
+ * @param handler The destroy handler.
+ *
+ * @return PJ_SUCCESS or the appropriate error code.
+ */
+PJ_DECL(pj_status_t) pj_grp_lock_add_handler(pj_grp_lock_t *grp_lock,
+ pj_pool_t *pool,
+ void *member,
+ void (*handler)(void *member));
+
+/**
+ * Remove previously registered handler. All parameters must be the same
+ * as when the handler was added.
+ *
+ * @param grp_lock The group lock.
+ * @param member A pointer to be passed to the handler.
+ * @param handler The destroy handler.
+ *
+ * @return PJ_SUCCESS or the appropriate error code.
+ */
+PJ_DECL(pj_status_t) pj_grp_lock_del_handler(pj_grp_lock_t *grp_lock,
+ void *member,
+ void (*handler)(void *member));
+
+/**
+ * Increment reference counter to prevent the group lock from being destroyed.
+ *
+ * @param grp_lock The group lock.
+ *
+ * @return PJ_SUCCESS or the appropriate error code.
+ */
+#if !PJ_GRP_LOCK_DEBUG
+PJ_DECL(pj_status_t) pj_grp_lock_add_ref(pj_grp_lock_t *grp_lock);
+
+#define pj_grp_lock_add_ref_dbg(grp_lock, x, y) pj_grp_lock_add_ref(grp_lock)
+
+#else
+
+#define pj_grp_lock_add_ref(g) pj_grp_lock_add_ref_dbg(g, __FILE__, __LINE__)
+
+PJ_DECL(pj_status_t) pj_grp_lock_add_ref_dbg(pj_grp_lock_t *grp_lock,
+ const char *file,
+ int line);
+#endif
+
+/**
+ * Decrement the reference counter. When the counter value reaches zero, the
+ * group lock will be destroyed and all destructor handlers will be called.
+ *
+ * @param grp_lock The group lock.
+ *
+ * @return PJ_SUCCESS or the appropriate error code.
+ */
+#if !PJ_GRP_LOCK_DEBUG
+PJ_DECL(pj_status_t) pj_grp_lock_dec_ref(pj_grp_lock_t *grp_lock);
+
+#define pj_grp_lock_dec_ref_dbg(grp_lock, x, y) pj_grp_lock_dec_ref(grp_lock)
+#else
+
+#define pj_grp_lock_dec_ref(g) pj_grp_lock_dec_ref_dbg(g, __FILE__, __LINE__)
+
+PJ_DECL(pj_status_t) pj_grp_lock_dec_ref_dbg(pj_grp_lock_t *grp_lock,
+ const char *file,
+ int line);
+
+#endif
+
+/**
+ * Get the current reference count value. This is normally only used for
+ * debugging purposes.
+ *
+ * @param grp_lock The group lock.
+ *
+ * @return The reference count value.
+ */
+PJ_DECL(int) pj_grp_lock_get_ref(pj_grp_lock_t *grp_lock);
+
+
+/**
+ * Dump group lock info for debugging purposes. If group lock debugging is
+ * enabled (via the PJ_GRP_LOCK_DEBUG macro), this will print the group lock
+ * reference counter value along with the source file and line. If
+ * debugging is disabled, this will only print the reference counter.
+ *
+ * @param grp_lock The group lock.
+ */
+PJ_DECL(void) pj_grp_lock_dump(pj_grp_lock_t *grp_lock);
+
+
+/**
+ * Synchronize an external lock with the group lock, by adding it to the
+ * list of locks to be acquired by the group lock when the group lock is
+ * acquired.
+ *
+ * The ''pos'' argument specifies the lock order and also the relative
+ * position with regard to lock ordering against the group lock. Locks with
+ * lower ''pos'' value will be locked first, and those with negative value
+ * will be locked before the group lock (the group lock's ''pos'' value is
+ * zero).
+ *
+ * @param grp_lock The group lock.
+ * @param ext_lock The external lock
+ * @param pos The position.
+ *
+ * @return PJ_SUCCESS or the appropriate error code.
+ */
+PJ_DECL(pj_status_t) pj_grp_lock_chain_lock(pj_grp_lock_t *grp_lock,
+ pj_lock_t *ext_lock,
+ int pos);
+
+/**
+ * Remove an external lock from group lock's list of synchronized locks.
+ *
+ * @param grp_lock The group lock.
+ * @param ext_lock The external lock
+ *
+ * @return PJ_SUCCESS or the appropriate error code.
+ */
+PJ_DECL(pj_status_t) pj_grp_lock_unchain_lock(pj_grp_lock_t *grp_lock,
+ pj_lock_t *ext_lock);
+
+
+/** @} */
+
+
PJ_END_DECL
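
Putting the API together, the intended lifecycle looks roughly like the sketch below. The 'session' type, its create/destroy functions and the handler are hypothetical names, not part of this commit.

    /* Sketch: tie a hypothetical object's lifetime to the group lock and
     * finalize it from the destroy handler when the last ref is gone. */
    typedef struct session
    {
        pj_grp_lock_t *grp_lock;
        /* ... other members ... */
    } session;

    static void session_on_destroy(void *member)
    {
        session *sess = (session*)member;
        /* Last reference gone; every chained lock was already released. */
        PJ_LOG(4,("session", "session %p destroyed", sess));
    }

    static pj_status_t session_create(pj_pool_t *pool, session **p_sess)
    {
        session *sess = PJ_POOL_ZALLOC_T(pool, session);
        pj_status_t status;

        status = pj_grp_lock_create(pool, NULL, &sess->grp_lock);
        if (status != PJ_SUCCESS)
            return status;

        /* Hold the application's reference and register the destructor */
        pj_grp_lock_add_ref(sess->grp_lock);
        pj_grp_lock_add_handler(sess->grp_lock, pool, sess,
                                &session_on_destroy);

        *p_sess = sess;
        return PJ_SUCCESS;
    }

    static void session_destroy(session *sess)
    {
        /* Runs session_on_destroy() if this was the last reference */
        pj_grp_lock_dec_ref(sess->grp_lock);
    }
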
diff --git a/pjlib/include/pj/timer.h b/pjlib/include/pj/timer.h
index c2182ce3..59149385 100644
--- a/pjlib/include/pj/timer.h
+++ b/pjlib/include/pj/timer.h
@@ -24,6 +24,7 @@
*/
#include <pj/types.h>
+#include <pj/lock.h>
PJ_BEGIN_DECL
@@ -118,6 +119,12 @@ typedef struct pj_timer_entry
*/
pj_time_val _timer_value;
+ /**
+ * Internal: the group lock used by this entry, set when
+ * pj_timer_heap_schedule_w_lock() is used.
+ */
+ pj_grp_lock_t *_grp_lock;
+
#if PJ_TIMER_DEBUG
const char *src_file;
int src_line;
@@ -229,7 +236,46 @@ PJ_DECL(pj_status_t) pj_timer_heap_schedule( pj_timer_heap_t *ht,
#endif /* PJ_TIMER_DEBUG */
/**
- * Cancel a previously registered timer.
+ * Schedule a timer entry which will expire AFTER the specified delay, and
+ * increment the reference counter of the group lock while the timer entry
+ * is active. The group lock reference counter will automatically be released
+ * after the timer callback is called or when the timer is cancelled.
+ *
+ * @param ht The timer heap.
+ * @param entry The entry to be registered.
+ * @param delay The interval to expire.
+ * @param id_val The value to be set to the "id" field of the timer entry
+ * once the timer is scheduled.
+ * @param grp_lock The group lock.
+ *
+ * @return PJ_SUCCESS, or the appropriate error code.
+ */
+#if PJ_TIMER_DEBUG
+# define pj_timer_heap_schedule_w_grp_lock(ht,e,d,id,g) \
+ pj_timer_heap_schedule_w_grp_lock_dbg(ht,e,d,id,g,__FILE__,__LINE__)
+
+ PJ_DECL(pj_status_t) pj_timer_heap_schedule_w_grp_lock_dbg(
+ pj_timer_heap_t *ht,
+ pj_timer_entry *entry,
+ const pj_time_val *delay,
+ int id_val,
+ pj_grp_lock_t *grp_lock,
+ const char *src_file,
+ int src_line);
+#else
+PJ_DECL(pj_status_t) pj_timer_heap_schedule_w_grp_lock(
+ pj_timer_heap_t *ht,
+ pj_timer_entry *entry,
+ const pj_time_val *delay,
+ int id_val,
+ pj_grp_lock_t *grp_lock);
+#endif /* PJ_TIMER_DEBUG */
+
+
+/**
+ * Cancel a previously registered timer. This will also decrement the
+ * reference counter of the group lock associated with the timer entry,
+ * if the entry was scheduled with one.
*
* @param ht The timer heap.
* @param entry The entry to be cancelled.
@@ -241,6 +287,24 @@ PJ_DECL(int) pj_timer_heap_cancel( pj_timer_heap_t *ht,
pj_timer_entry *entry);
/**
+ * Cancel only if the previously registered timer is active. This will
+ * also decrement the reference counter of the group lock associated
+ * with the timer entry, if the entry was scheduled with one. In any
+ * case, set the "id" to the specified value.
+ *
+ * @param ht The timer heap.
+ * @param entry The entry to be cancelled.
+ * @param id_val Value to be set to "id"
+ *
+ * @return The number of timers cancelled, which should be one if the
+ * entry has really been registered, or zero if no timer was
+ * cancelled.
+ */
+PJ_DECL(int) pj_timer_heap_cancel_if_active(pj_timer_heap_t *ht,
+ pj_timer_entry *entry,
+ int id_val);
+
+/**
* Get the number of timer entries.
*
* @param ht The timer heap.
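
Usage of the two new timer functions, as a sketch: 'timer_heap', 'grp_lock' and 'sess' are assumed to exist, on_timeout is a hypothetical pj_timer_heap_callback, and the id convention (0 = inactive, 1 = active) is the caller's choice.

    /* Sketch: schedule with a group lock reference held while active. */
    pj_time_val delay = { 5, 0 };
    pj_timer_entry timer;
    pj_status_t status;

    pj_timer_entry_init(&timer, 0, sess, &on_timeout);

    status = pj_timer_heap_schedule_w_grp_lock(timer_heap, &timer, &delay,
                                               1, grp_lock);

    /* Later: cancel only if still active, set id back to 0, and drop
     * the group lock reference taken at scheduling time. */
    pj_timer_heap_cancel_if_active(timer_heap, &timer, 0);
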
diff --git a/pjlib/include/pj/types.h b/pjlib/include/pj/types.h
index 9f05ce44..04e426b5 100644
--- a/pjlib/include/pj/types.h
+++ b/pjlib/include/pj/types.h
@@ -231,6 +231,9 @@ typedef struct pj_thread_t pj_thread_t;
/** Lock object. */
typedef struct pj_lock_t pj_lock_t;
+/** Group lock */
+typedef struct pj_grp_lock_t pj_grp_lock_t;
+
/** Mutex handle. */
typedef struct pj_mutex_t pj_mutex_t;
diff --git a/pjlib/src/pj/activesock.c b/pjlib/src/pj/activesock.c
index 5452ca4b..2c0cad55 100644
--- a/pjlib/src/pj/activesock.c
+++ b/pjlib/src/pj/activesock.c
@@ -43,6 +43,13 @@ enum read_type
TYPE_RECV_FROM
};
+enum shutdown_dir
+{
+ SHUT_NONE = 0,
+ SHUT_RX = 1,
+ SHUT_TX = 2
+};
+
struct read_op
{
pj_ioqueue_op_key_t op_key;
@@ -77,6 +84,7 @@ struct pj_activesock_t
pj_ioqueue_t *ioqueue;
void *user_data;
unsigned async_count;
+ unsigned shutdown;
unsigned max_loop;
pj_activesock_cb cb;
#if defined(PJ_IPHONE_OS_HAS_MULTITASKING_SUPPORT) && \
@@ -209,8 +217,9 @@ PJ_DEF(pj_status_t) pj_activesock_create( pj_pool_t *pool,
ioq_cb.on_accept_complete = &ioqueue_on_accept_complete;
#endif
- status = pj_ioqueue_register_sock(pool, ioqueue, sock, asock,
- &ioq_cb, &asock->key);
+ status = pj_ioqueue_register_sock2(pool, ioqueue, sock,
+ (opt? opt->grp_lock : NULL),
+ asock, &ioq_cb, &asock->key);
if (status != PJ_SUCCESS) {
pj_activesock_close(asock);
return status;
@@ -283,10 +292,10 @@ PJ_DEF(pj_status_t) pj_activesock_create_udp( pj_pool_t *pool,
return PJ_SUCCESS;
}
-
PJ_DEF(pj_status_t) pj_activesock_close(pj_activesock_t *asock)
{
PJ_ASSERT_RETURN(asock, PJ_EINVAL);
+ asock->shutdown = SHUT_RX | SHUT_TX;
if (asock->key) {
#if defined(PJ_IPHONE_OS_HAS_MULTITASKING_SUPPORT) && \
PJ_IPHONE_OS_HAS_MULTITASKING_SUPPORT!=0
@@ -448,6 +457,10 @@ static void ioqueue_on_read_complete(pj_ioqueue_key_t *key,
asock = (pj_activesock_t*) pj_ioqueue_get_user_data(key);
+ /* Ignore if we've been shutdown */
+ if (asock->shutdown & SHUT_RX)
+ return;
+
do {
unsigned flags;
@@ -569,6 +582,10 @@ static void ioqueue_on_read_complete(pj_ioqueue_key_t *key,
if (!ret)
return;
+ /* Also stop further read if we've been shutdown */
+ if (asock->shutdown & SHUT_RX)
+ return;
+
/* Only stream oriented socket may leave data in the packet */
if (asock->stream_oriented) {
r->size = remainder;
@@ -648,6 +665,9 @@ PJ_DEF(pj_status_t) pj_activesock_send( pj_activesock_t *asock,
{
PJ_ASSERT_RETURN(asock && send_key && data && size, PJ_EINVAL);
+ if (asock->shutdown & SHUT_TX)
+ return PJ_EINVALIDOP;
+
send_key->activesock_data = NULL;
if (asock->whole_data) {
@@ -698,6 +718,9 @@ PJ_DEF(pj_status_t) pj_activesock_sendto( pj_activesock_t *asock,
PJ_ASSERT_RETURN(asock && send_key && data && size && addr && addr_len,
PJ_EINVAL);
+ if (asock->shutdown & SHUT_TX)
+ return PJ_EINVALIDOP;
+
return pj_ioqueue_sendto(asock->key, send_key, data, size, flags,
addr, addr_len);
}
@@ -711,6 +734,13 @@ static void ioqueue_on_write_complete(pj_ioqueue_key_t *key,
asock = (pj_activesock_t*) pj_ioqueue_get_user_data(key);
+ /* Ignore if we've been shutdown. This may cause data to be partially
+ * sent even when 'wholedata' was requested if the OS only sent partial
+ * buffer.
+ */
+ if (asock->shutdown & SHUT_TX)
+ return;
+
if (bytes_sent > 0 && op_key->activesock_data) {
/* whole_data is requested. Make sure we send all the data */
struct send_data *sd = (struct send_data*)op_key->activesock_data;
@@ -756,6 +786,10 @@ PJ_DEF(pj_status_t) pj_activesock_start_accept(pj_activesock_t *asock,
PJ_ASSERT_RETURN(asock, PJ_EINVAL);
PJ_ASSERT_RETURN(asock->accept_op==NULL, PJ_EINVALIDOP);
+ /* Ignore if we've been shutdown */
+ if (asock->shutdown)
+ return PJ_EINVALIDOP;
+
asock->accept_op = (struct accept_op*)
pj_pool_calloc(pool, asock->async_count,
sizeof(struct accept_op));
@@ -798,6 +832,10 @@ static void ioqueue_on_accept_complete(pj_ioqueue_key_t *key,
PJ_UNUSED_ARG(new_sock);
+ /* Ignore if we've been shutdown */
+ if (asock->shutdown)
+ return;
+
do {
if (status == asock->last_err && status != PJ_SUCCESS) {
asock->err_counter++;
@@ -835,6 +873,10 @@ static void ioqueue_on_accept_complete(pj_ioqueue_key_t *key,
pj_sock_close(accept_op->new_sock);
}
+ /* Don't start another accept() if we've been shutdown */
+ if (asock->shutdown)
+ return;
+
/* Prepare next accept() */
accept_op->new_sock = PJ_INVALID_SOCKET;
accept_op->rem_addr_len = sizeof(accept_op->rem_addr);
@@ -853,6 +895,10 @@ PJ_DEF(pj_status_t) pj_activesock_start_connect( pj_activesock_t *asock,
int addr_len)
{
PJ_UNUSED_ARG(pool);
+
+ if (asock->shutdown)
+ return PJ_EINVALIDOP;
+
return pj_ioqueue_connect(asock->key, remaddr, addr_len);
}
@@ -861,6 +907,10 @@ static void ioqueue_on_connect_complete(pj_ioqueue_key_t *key,
{
pj_activesock_t *asock = (pj_activesock_t*) pj_ioqueue_get_user_data(key);
+ /* Ignore if we've been shutdown */
+ if (asock->shutdown)
+ return;
+
if (asock->cb.on_connect_complete) {
pj_bool_t ret;
diff --git a/pjlib/src/pj/errno.c b/pjlib/src/pj/errno.c
index 615d1450..4726b33d 100644
--- a/pjlib/src/pj/errno.c
+++ b/pjlib/src/pj/errno.c
@@ -77,7 +77,8 @@ static const struct
PJ_BUILD_ERR(PJ_ETOOSMALL, "Size is too short"),
PJ_BUILD_ERR(PJ_EIGNORED, "Ignored"),
PJ_BUILD_ERR(PJ_EIPV6NOTSUP, "IPv6 is not supported"),
- PJ_BUILD_ERR(PJ_EAFNOTSUP, "Unsupported address family")
+ PJ_BUILD_ERR(PJ_EAFNOTSUP, "Unsupported address family"),
+ PJ_BUILD_ERR(PJ_EGONE, "Object no longer exists")
};
#endif /* PJ_HAS_ERROR_STRING */
diff --git a/pjlib/src/pj/ioqueue_common_abs.c b/pjlib/src/pj/ioqueue_common_abs.c
index 296aea90..a5dc3f95 100644
--- a/pjlib/src/pj/ioqueue_common_abs.c
+++ b/pjlib/src/pj/ioqueue_common_abs.c
@@ -70,6 +70,7 @@ static pj_status_t ioqueue_init_key( pj_pool_t *pool,
pj_ioqueue_t *ioqueue,
pj_ioqueue_key_t *key,
pj_sock_t sock,
+ pj_grp_lock_t *grp_lock,
void *user_data,
const pj_ioqueue_callback *cb)
{
@@ -114,10 +115,18 @@ static pj_status_t ioqueue_init_key( pj_pool_t *pool,
/* Create mutex for the key. */
#if !PJ_IOQUEUE_HAS_SAFE_UNREG
- rc = pj_mutex_create_simple(pool, NULL, &key->mutex);
+ rc = pj_lock_create_simple_mutex(pool, NULL, &key->lock);
#endif
+ if (rc != PJ_SUCCESS)
+ return rc;
+
+ /* Group lock */
+ key->grp_lock = grp_lock;
+ if (key->grp_lock) {
+ pj_grp_lock_add_ref_dbg(key->grp_lock, "ioqueue", 0);
+ }
- return rc;
+ return PJ_SUCCESS;
}
/*
@@ -189,10 +198,10 @@ PJ_INLINE(int) key_has_pending_connect(pj_ioqueue_key_t *key)
void ioqueue_dispatch_write_event(pj_ioqueue_t *ioqueue, pj_ioqueue_key_t *h)
{
/* Lock the key. */
- pj_mutex_lock(h->mutex);
+ pj_ioqueue_lock_key(h);
if (IS_CLOSING(h)) {
- pj_mutex_unlock(h->mutex);
+ pj_ioqueue_unlock_key(h);
return;
}
@@ -261,7 +270,7 @@ void ioqueue_dispatch_write_event(pj_ioqueue_t *ioqueue, pj_ioqueue_key_t *h)
* save it to a flag.
*/
has_lock = PJ_FALSE;
- pj_mutex_unlock(h->mutex);
+ pj_ioqueue_unlock_key(h);
} else {
has_lock = PJ_TRUE;
}
@@ -272,7 +281,7 @@ void ioqueue_dispatch_write_event(pj_ioqueue_t *ioqueue, pj_ioqueue_key_t *h)
/* Unlock if we still hold the lock */
if (has_lock) {
- pj_mutex_unlock(h->mutex);
+ pj_ioqueue_unlock_key(h);
}
/* Done. */
@@ -379,7 +388,8 @@ void ioqueue_dispatch_write_event(pj_ioqueue_t *ioqueue, pj_ioqueue_key_t *h)
* save it to a flag.
*/
has_lock = PJ_FALSE;
- pj_mutex_unlock(h->mutex);
+ pj_ioqueue_unlock_key(h);
+ PJ_RACE_ME(5);
} else {
has_lock = PJ_TRUE;
}
@@ -392,11 +402,11 @@ void ioqueue_dispatch_write_event(pj_ioqueue_t *ioqueue, pj_ioqueue_key_t *h)
}
if (has_lock) {
- pj_mutex_unlock(h->mutex);
+ pj_ioqueue_unlock_key(h);
}
} else {
- pj_mutex_unlock(h->mutex);
+ pj_ioqueue_unlock_key(h);
}
/* Done. */
@@ -406,7 +416,7 @@ void ioqueue_dispatch_write_event(pj_ioqueue_t *ioqueue, pj_ioqueue_key_t *h)
* are signalled for the same event, but only one thread eventually
* able to process the event.
*/
- pj_mutex_unlock(h->mutex);
+ pj_ioqueue_unlock_key(h);
}
}
@@ -415,10 +425,10 @@ void ioqueue_dispatch_read_event( pj_ioqueue_t *ioqueue, pj_ioqueue_key_t *h )
pj_status_t rc;
/* Lock the key. */
- pj_mutex_lock(h->mutex);
+ pj_ioqueue_lock_key(h);
if (IS_CLOSING(h)) {
- pj_mutex_unlock(h->mutex);
+ pj_ioqueue_unlock_key(h);
return;
}
@@ -453,7 +463,8 @@ void ioqueue_dispatch_read_event( pj_ioqueue_t *ioqueue, pj_ioqueue_key_t *h )
* save it to a flag.
*/
has_lock = PJ_FALSE;
- pj_mutex_unlock(h->mutex);
+ pj_ioqueue_unlock_key(h);
+ PJ_RACE_ME(5);
} else {
has_lock = PJ_TRUE;
}
@@ -466,7 +477,7 @@ void ioqueue_dispatch_read_event( pj_ioqueue_t *ioqueue, pj_ioqueue_key_t *h )
}
if (has_lock) {
- pj_mutex_unlock(h->mutex);
+ pj_ioqueue_unlock_key(h);
}
}
else
@@ -567,7 +578,8 @@ void ioqueue_dispatch_read_event( pj_ioqueue_t *ioqueue, pj_ioqueue_key_t *h )
* save it to a flag.
*/
has_lock = PJ_FALSE;
- pj_mutex_unlock(h->mutex);
+ pj_ioqueue_unlock_key(h);
+ PJ_RACE_ME(5);
} else {
has_lock = PJ_TRUE;
}
@@ -580,7 +592,7 @@ void ioqueue_dispatch_read_event( pj_ioqueue_t *ioqueue, pj_ioqueue_key_t *h )
}
if (has_lock) {
- pj_mutex_unlock(h->mutex);
+ pj_ioqueue_unlock_key(h);
}
} else {
@@ -589,7 +601,7 @@ void ioqueue_dispatch_read_event( pj_ioqueue_t *ioqueue, pj_ioqueue_key_t *h )
* are signalled for the same event, but only one thread eventually
* able to process the event.
*/
- pj_mutex_unlock(h->mutex);
+ pj_ioqueue_unlock_key(h);
}
}
@@ -599,19 +611,19 @@ void ioqueue_dispatch_exception_event( pj_ioqueue_t *ioqueue,
{
pj_bool_t has_lock;
- pj_mutex_lock(h->mutex);
+ pj_ioqueue_lock_key(h);
if (!h->connecting) {
/* It is possible that more than one thread was woken up, thus
* the remaining thread will see h->connecting as zero because
* it has been processed by other thread.
*/
- pj_mutex_unlock(h->mutex);
+ pj_ioqueue_unlock_key(h);
return;
}
if (IS_CLOSING(h)) {
- pj_mutex_unlock(h->mutex);
+ pj_ioqueue_unlock_key(h);
return;
}
@@ -629,7 +641,8 @@ void ioqueue_dispatch_exception_event( pj_ioqueue_t *ioqueue,
* save it to a flag.
*/
has_lock = PJ_FALSE;
- pj_mutex_unlock(h->mutex);
+ pj_ioqueue_unlock_key(h);
+ PJ_RACE_ME(5);
} else {
has_lock = PJ_TRUE;
}
@@ -651,7 +664,7 @@ void ioqueue_dispatch_exception_event( pj_ioqueue_t *ioqueue,
}
if (has_lock) {
- pj_mutex_unlock(h->mutex);
+ pj_ioqueue_unlock_key(h);
}
}
@@ -713,18 +726,18 @@ PJ_DEF(pj_status_t) pj_ioqueue_recv( pj_ioqueue_key_t *key,
read_op->size = *length;
read_op->flags = flags;
- pj_mutex_lock(key->mutex);
+ pj_ioqueue_lock_key(key);
/* Check again. Handle may have been closed after the previous check
* in multithreaded app. If we add bad handle to the set it will
* corrupt the ioqueue set. See #913
*/
if (IS_CLOSING(key)) {
- pj_mutex_unlock(key->mutex);
+ pj_ioqueue_unlock_key(key);
return PJ_ECANCELLED;
}
pj_list_insert_before(&key->read_list, read_op);
ioqueue_add_to_set(key->ioqueue, key, READABLE_EVENT);
- pj_mutex_unlock(key->mutex);
+ pj_ioqueue_unlock_key(key);
return PJ_EPENDING;
}
@@ -789,18 +802,18 @@ PJ_DEF(pj_status_t) pj_ioqueue_recvfrom( pj_ioqueue_key_t *key,
read_op->rmt_addr = addr;
read_op->rmt_addrlen = addrlen;
- pj_mutex_lock(key->mutex);
+ pj_ioqueue_lock_key(key);
/* Check again. Handle may have been closed after the previous check
* in multithreaded app. If we add bad handle to the set it will
* corrupt the ioqueue set. See #913
*/
if (IS_CLOSING(key)) {
- pj_mutex_unlock(key->mutex);
+ pj_ioqueue_unlock_key(key);
return PJ_ECANCELLED;
}
pj_list_insert_before(&key->read_list, read_op);
ioqueue_add_to_set(key->ioqueue, key, READABLE_EVENT);
- pj_mutex_unlock(key->mutex);
+ pj_ioqueue_unlock_key(key);
return PJ_EPENDING;
}
@@ -903,18 +916,18 @@ PJ_DEF(pj_status_t) pj_ioqueue_send( pj_ioqueue_key_t *key,
write_op->written = 0;
write_op->flags = flags;
- pj_mutex_lock(key->mutex);
+ pj_ioqueue_lock_key(key);
/* Check again. Handle may have been closed after the previous check
* in multithreaded app. If we add bad handle to the set it will
* corrupt the ioqueue set. See #913
*/
if (IS_CLOSING(key)) {
- pj_mutex_unlock(key->mutex);
+ pj_ioqueue_unlock_key(key);
return PJ_ECANCELLED;
}
pj_list_insert_before(&key->write_list, write_op);
ioqueue_add_to_set(key->ioqueue, key, WRITEABLE_EVENT);
- pj_mutex_unlock(key->mutex);
+ pj_ioqueue_unlock_key(key);
return PJ_EPENDING;
}
@@ -1050,18 +1063,18 @@ retry_on_restart:
pj_memcpy(&write_op->rmt_addr, addr, addrlen);
write_op->rmt_addrlen = addrlen;
- pj_mutex_lock(key->mutex);
+ pj_ioqueue_lock_key(key);
/* Check again. Handle may have been closed after the previous check
* in multithreaded app. If we add bad handle to the set it will
* corrupt the ioqueue set. See #913
*/
if (IS_CLOSING(key)) {
- pj_mutex_unlock(key->mutex);
+ pj_ioqueue_unlock_key(key);
return PJ_ECANCELLED;
}
pj_list_insert_before(&key->write_list, write_op);
ioqueue_add_to_set(key->ioqueue, key, WRITEABLE_EVENT);
- pj_mutex_unlock(key->mutex);
+ pj_ioqueue_unlock_key(key);
return PJ_EPENDING;
}
@@ -1127,18 +1140,18 @@ PJ_DEF(pj_status_t) pj_ioqueue_accept( pj_ioqueue_key_t *key,
accept_op->addrlen= addrlen;
accept_op->local_addr = local;
- pj_mutex_lock(key->mutex);
+ pj_ioqueue_lock_key(key);
/* Check again. Handle may have been closed after the previous check
* in multithreaded app. If we add bad handle to the set it will
* corrupt the ioqueue set. See #913
*/
if (IS_CLOSING(key)) {
- pj_mutex_unlock(key->mutex);
+ pj_ioqueue_unlock_key(key);
return PJ_ECANCELLED;
}
pj_list_insert_before(&key->accept_list, accept_op);
ioqueue_add_to_set(key->ioqueue, key, READABLE_EVENT);
- pj_mutex_unlock(key->mutex);
+ pj_ioqueue_unlock_key(key);
return PJ_EPENDING;
}
@@ -1171,18 +1184,18 @@ PJ_DEF(pj_status_t) pj_ioqueue_connect( pj_ioqueue_key_t *key,
} else {
if (status == PJ_STATUS_FROM_OS(PJ_BLOCKING_CONNECT_ERROR_VAL)) {
/* Pending! */
- pj_mutex_lock(key->mutex);
+ pj_ioqueue_lock_key(key);
/* Check again. Handle may have been closed after the previous
* check in multithreaded app. See #913
*/
if (IS_CLOSING(key)) {
- pj_mutex_unlock(key->mutex);
+ pj_ioqueue_unlock_key(key);
return PJ_ECANCELLED;
}
key->connecting = PJ_TRUE;
ioqueue_add_to_set(key->ioqueue, key, WRITEABLE_EVENT);
ioqueue_add_to_set(key->ioqueue, key, EXCEPTION_EVENT);
- pj_mutex_unlock(key->mutex);
+ pj_ioqueue_unlock_key(key);
return PJ_EPENDING;
} else {
/* Error! */
@@ -1228,7 +1241,7 @@ PJ_DEF(pj_status_t) pj_ioqueue_post_completion( pj_ioqueue_key_t *key,
* Find the operation key in all pending operation list to
* really make sure that it's still there; then call the callback.
*/
- pj_mutex_lock(key->mutex);
+ pj_ioqueue_lock_key(key);
/* Find the operation in the pending read list. */
op_rec = (struct generic_operation*)key->read_list.next;
@@ -1236,7 +1249,7 @@ PJ_DEF(pj_status_t) pj_ioqueue_post_completion( pj_ioqueue_key_t *key,
if (op_rec == (void*)op_key) {
pj_list_erase(op_rec);
op_rec->op = PJ_IOQUEUE_OP_NONE;
- pj_mutex_unlock(key->mutex);
+ pj_ioqueue_unlock_key(key);
(*key->cb.on_read_complete)(key, op_key, bytes_status);
return PJ_SUCCESS;
@@ -1250,7 +1263,7 @@ PJ_DEF(pj_status_t) pj_ioqueue_post_completion( pj_ioqueue_key_t *key,
if (op_rec == (void*)op_key) {
pj_list_erase(op_rec);
op_rec->op = PJ_IOQUEUE_OP_NONE;
- pj_mutex_unlock(key->mutex);
+ pj_ioqueue_unlock_key(key);
(*key->cb.on_write_complete)(key, op_key, bytes_status);
return PJ_SUCCESS;
@@ -1264,7 +1277,7 @@ PJ_DEF(pj_status_t) pj_ioqueue_post_completion( pj_ioqueue_key_t *key,
if (op_rec == (void*)op_key) {
pj_list_erase(op_rec);
op_rec->op = PJ_IOQUEUE_OP_NONE;
- pj_mutex_unlock(key->mutex);
+ pj_ioqueue_unlock_key(key);
(*key->cb.on_accept_complete)(key, op_key,
PJ_INVALID_SOCKET,
@@ -1274,7 +1287,7 @@ PJ_DEF(pj_status_t) pj_ioqueue_post_completion( pj_ioqueue_key_t *key,
op_rec = op_rec->next;
}
- pj_mutex_unlock(key->mutex);
+ pj_ioqueue_unlock_key(key);
return PJ_EINVALIDOP;
}
@@ -1304,11 +1317,18 @@ PJ_DEF(pj_status_t) pj_ioqueue_set_concurrency(pj_ioqueue_key_t *key,
PJ_DEF(pj_status_t) pj_ioqueue_lock_key(pj_ioqueue_key_t *key)
{
- return pj_mutex_lock(key->mutex);
+ if (key->grp_lock)
+ return pj_grp_lock_acquire(key->grp_lock);
+ else
+ return pj_lock_acquire(key->lock);
}
PJ_DEF(pj_status_t) pj_ioqueue_unlock_key(pj_ioqueue_key_t *key)
{
- return pj_mutex_unlock(key->mutex);
+ if (key->grp_lock)
+ return pj_grp_lock_release(key->grp_lock);
+ else
+ return pj_lock_release(key->lock);
}
+
diff --git a/pjlib/src/pj/ioqueue_common_abs.h b/pjlib/src/pj/ioqueue_common_abs.h
index 3a41051b..3bdbb524 100644
--- a/pjlib/src/pj/ioqueue_common_abs.h
+++ b/pjlib/src/pj/ioqueue_common_abs.h
@@ -101,7 +101,8 @@ union operation_key
#define DECLARE_COMMON_KEY \
PJ_DECL_LIST_MEMBER(struct pj_ioqueue_key_t); \
pj_ioqueue_t *ioqueue; \
- pj_mutex_t *mutex; \
+ pj_grp_lock_t *grp_lock; \
+ pj_lock_t *lock; \
pj_bool_t inside_callback; \
pj_bool_t destroy_requested; \
pj_bool_t allow_concurrent; \
diff --git a/pjlib/src/pj/ioqueue_epoll.c b/pjlib/src/pj/ioqueue_epoll.c
index 4c547791..18b37e8a 100644
--- a/pjlib/src/pj/ioqueue_epoll.c
+++ b/pjlib/src/pj/ioqueue_epoll.c
@@ -262,11 +262,11 @@ PJ_DEF(pj_status_t) pj_ioqueue_create( pj_pool_t *pool,
key = PJ_POOL_ALLOC_T(pool, pj_ioqueue_key_t);
key->ref_count = 0;
- rc = pj_mutex_create_recursive(pool, NULL, &key->mutex);
+ rc = pj_lock_create_recursive_mutex(pool, NULL, &key->lock);
if (rc != PJ_SUCCESS) {
key = ioqueue->free_list.next;
while (key != &ioqueue->free_list) {
- pj_mutex_destroy(key->mutex);
+ pj_lock_destroy(key->lock);
key = key->next;
}
pj_mutex_destroy(ioqueue->ref_cnt_mutex);
@@ -323,19 +323,19 @@ PJ_DEF(pj_status_t) pj_ioqueue_destroy(pj_ioqueue_t *ioqueue)
/* Destroy reference counters */
key = ioqueue->active_list.next;
while (key != &ioqueue->active_list) {
- pj_mutex_destroy(key->mutex);
+ pj_lock_destroy(key->lock);
key = key->next;
}
key = ioqueue->closing_list.next;
while (key != &ioqueue->closing_list) {
- pj_mutex_destroy(key->mutex);
+ pj_lock_destroy(key->lock);
key = key->next;
}
key = ioqueue->free_list.next;
while (key != &ioqueue->free_list) {
- pj_mutex_destroy(key->mutex);
+ pj_lock_destroy(key->lock);
key = key->next;
}
@@ -422,7 +422,7 @@ PJ_DEF(pj_status_t) pj_ioqueue_register_sock( pj_pool_t *pool,
status = os_epoll_ctl(ioqueue->epfd, EPOLL_CTL_ADD, sock, &ev);
if (status < 0) {
rc = pj_get_os_error();
- pj_mutex_destroy(key->mutex);
+ pj_lock_destroy(key->lock);
key = NULL;
TRACE_((THIS_FILE,
"pj_ioqueue_register_sock error: os_epoll_ctl rc=%d",
@@ -497,7 +497,7 @@ PJ_DEF(pj_status_t) pj_ioqueue_unregister( pj_ioqueue_key_t *key)
* the key. We need to lock the key before ioqueue here to prevent
* deadlock.
*/
- pj_mutex_lock(key->mutex);
+ pj_lock_acquire(key->lock);
/* Also lock ioqueue */
pj_lock_acquire(ioqueue->lock);
@@ -531,9 +531,9 @@ PJ_DEF(pj_status_t) pj_ioqueue_unregister( pj_ioqueue_key_t *key)
decrement_counter(key);
/* Done. */
- pj_mutex_unlock(key->mutex);
+ pj_lock_release(key->lock);
#else
- pj_mutex_destroy(key->mutex);
+ pj_lock_destroy(key->lock);
#endif
return PJ_SUCCESS;
diff --git a/pjlib/src/pj/ioqueue_select.c b/pjlib/src/pj/ioqueue_select.c
index 603b7ee8..0d97c0a6 100644
--- a/pjlib/src/pj/ioqueue_select.c
+++ b/pjlib/src/pj/ioqueue_select.c
@@ -39,6 +39,7 @@
#include <pj/sock_select.h>
#include <pj/sock_qos.h>
#include <pj/errno.h>
+#include <pj/rand.h>
/* Now that we have access to OS'es <sys/select>, lets check again that
* PJ_IOQUEUE_MAX_HANDLES is not greater than FD_SETSIZE
@@ -237,11 +238,11 @@ PJ_DEF(pj_status_t) pj_ioqueue_create( pj_pool_t *pool,
key = PJ_POOL_ALLOC_T(pool, pj_ioqueue_key_t);
key->ref_count = 0;
- rc = pj_mutex_create_recursive(pool, NULL, &key->mutex);
+ rc = pj_lock_create_recursive_mutex(pool, NULL, &key->lock);
if (rc != PJ_SUCCESS) {
key = ioqueue->free_list.next;
while (key != &ioqueue->free_list) {
- pj_mutex_destroy(key->mutex);
+ pj_lock_destroy(key->lock);
key = key->next;
}
pj_mutex_destroy(ioqueue->ref_cnt_mutex);
@@ -284,19 +285,19 @@ PJ_DEF(pj_status_t) pj_ioqueue_destroy(pj_ioqueue_t *ioqueue)
/* Destroy reference counters */
key = ioqueue->active_list.next;
while (key != &ioqueue->active_list) {
- pj_mutex_destroy(key->mutex);
+ pj_lock_destroy(key->lock);
key = key->next;
}
key = ioqueue->closing_list.next;
while (key != &ioqueue->closing_list) {
- pj_mutex_destroy(key->mutex);
+ pj_lock_destroy(key->lock);
key = key->next;
}
key = ioqueue->free_list.next;
while (key != &ioqueue->free_list) {
- pj_mutex_destroy(key->mutex);
+ pj_lock_destroy(key->lock);
key = key->next;
}
@@ -312,9 +313,10 @@ PJ_DEF(pj_status_t) pj_ioqueue_destroy(pj_ioqueue_t *ioqueue)
*
* Register socket handle to ioqueue.
*/
-PJ_DEF(pj_status_t) pj_ioqueue_register_sock( pj_pool_t *pool,
+PJ_DEF(pj_status_t) pj_ioqueue_register_sock2(pj_pool_t *pool,
pj_ioqueue_t *ioqueue,
pj_sock_t sock,
+ pj_grp_lock_t *grp_lock,
void *user_data,
const pj_ioqueue_callback *cb,
pj_ioqueue_key_t **p_key)
@@ -358,7 +360,7 @@ PJ_DEF(pj_status_t) pj_ioqueue_register_sock( pj_pool_t *pool,
key = (pj_ioqueue_key_t*)pj_pool_zalloc(pool, sizeof(pj_ioqueue_key_t));
#endif
- rc = ioqueue_init_key(pool, ioqueue, key, sock, user_data, cb);
+ rc = ioqueue_init_key(pool, ioqueue, key, sock, grp_lock, user_data, cb);
if (rc != PJ_SUCCESS) {
key = NULL;
goto on_return;
@@ -386,12 +388,27 @@ PJ_DEF(pj_status_t) pj_ioqueue_register_sock( pj_pool_t *pool,
on_return:
/* On error, socket may be left in non-blocking mode. */
+ if (rc != PJ_SUCCESS) {
+ if (key->grp_lock)
+ pj_grp_lock_dec_ref_dbg(key->grp_lock, "ioqueue", 0);
+ }
*p_key = key;
pj_lock_release(ioqueue->lock);
return rc;
}
+PJ_DEF(pj_status_t) pj_ioqueue_register_sock( pj_pool_t *pool,
+ pj_ioqueue_t *ioqueue,
+ pj_sock_t sock,
+ void *user_data,
+ const pj_ioqueue_callback *cb,
+ pj_ioqueue_key_t **p_key)
+{
+ return pj_ioqueue_register_sock2(pool, ioqueue, sock, NULL, user_data,
+ cb, p_key);
+}
+
#if PJ_IOQUEUE_HAS_SAFE_UNREG
/* Increment key's reference counter */
static void increment_counter(pj_ioqueue_key_t *key)
@@ -446,7 +463,7 @@ PJ_DEF(pj_status_t) pj_ioqueue_unregister( pj_ioqueue_key_t *key)
* the key. We need to lock the key before ioqueue here to prevent
* deadlock.
*/
- pj_mutex_lock(key->mutex);
+ pj_ioqueue_lock_key(key);
/* Also lock ioqueue */
pj_lock_acquire(ioqueue->lock);
@@ -485,9 +502,34 @@ PJ_DEF(pj_status_t) pj_ioqueue_unregister( pj_ioqueue_key_t *key)
decrement_counter(key);
/* Done. */
- pj_mutex_unlock(key->mutex);
+ if (key->grp_lock) {
+ /* Just dec_ref and release; key->grp_lock is deliberately left
+ * untouched (see the note below). */
+ pj_grp_lock_t *grp_lock = key->grp_lock;
+ // Don't set grp_lock to NULL otherwise the other thread
+ // will crash. Just leave it as dangling pointer, but this
+ // should be safe
+ //key->grp_lock = NULL;
+ pj_grp_lock_dec_ref_dbg(grp_lock, "ioqueue", 0);
+ pj_grp_lock_release(grp_lock);
+ } else {
+ pj_ioqueue_unlock_key(key);
+ }
#else
- pj_mutex_destroy(key->mutex);
+ if (key->grp_lock) {
+ /* dec_ref and release the group lock; key->grp_lock is left untouched */
+ pj_grp_lock_t *grp_lock = key->grp_lock;
+ // Don't set grp_lock to NULL otherwise the other thread
+ // will crash. Just leave it as dangling pointer, but this
+ // should be safe
+ //key->grp_lock = NULL;
+ pj_grp_lock_dec_ref_dbg(grp_lock, "ioqueue", 0);
+ pj_grp_lock_release(grp_lock);
+ } else {
+ pj_ioqueue_unlock_key(key);
+ }
+
+ pj_lock_destroy(key->lock);
#endif
return PJ_SUCCESS;
@@ -620,6 +662,10 @@ static void scan_closing_keys(pj_ioqueue_t *ioqueue)
if (PJ_TIME_VAL_GTE(now, h->free_time)) {
pj_list_erase(h);
+ // Don't set grp_lock to NULL otherwise the other thread
+ // will crash. Just leave it as dangling pointer, but this
+ // should be safe
+ //h->grp_lock = NULL;
pj_list_push_back(&ioqueue->free_list, h);
}
h = next;
@@ -781,7 +827,7 @@ on_error:
PJ_DEF(int) pj_ioqueue_poll( pj_ioqueue_t *ioqueue, const pj_time_val *timeout)
{
pj_fd_set_t rfdset, wfdset, xfdset;
- int count, counter;
+ int count, i, counter;
pj_ioqueue_key_t *h;
struct event
{
@@ -892,8 +938,17 @@ PJ_DEF(int) pj_ioqueue_poll( pj_ioqueue_t *ioqueue, const pj_time_val *timeout)
#endif
}
+ for (i=0; i<counter; ++i) {
+ if (event[i].key->grp_lock)
+ pj_grp_lock_add_ref_dbg(event[i].key->grp_lock, "ioqueue", 0);
+ }
+
+ PJ_RACE_ME(5);
+
pj_lock_release(ioqueue->lock);
+ PJ_RACE_ME(5);
+
count = counter;
/* Now process all events. The dispatch functions will take care
@@ -918,6 +973,10 @@ PJ_DEF(int) pj_ioqueue_poll( pj_ioqueue_t *ioqueue, const pj_time_val *timeout)
#if PJ_IOQUEUE_HAS_SAFE_UNREG
decrement_counter(event[counter].key);
#endif
+
+ if (event[counter].key->grp_lock)
+ pj_grp_lock_dec_ref_dbg(event[counter].key->grp_lock,
+ "ioqueue", 0);
}
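
The add_ref/dec_ref pair wrapped around event dispatch above is the general keep-alive idiom that group locks enable. In application code the same pattern reads as below; 'cb', 'user_data' and 'grp_lock' are assumed names.

    /* Sketch: hold a reference across a callback that may destroy the
     * object, then check whether this was the last reference. */
    pj_grp_lock_add_ref(grp_lock);
    (*cb)(user_data);                        /* may dec_ref internally */
    if (pj_grp_lock_dec_ref(grp_lock) == PJ_EGONE) {
        /* The object is gone; don't touch it again. */
    }
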
diff --git a/pjlib/src/pj/lock.c b/pjlib/src/pj/lock.c
index c281f8b3..04b1b670 100644
--- a/pjlib/src/pj/lock.c
+++ b/pjlib/src/pj/lock.c
@@ -20,10 +20,12 @@
#include <pj/lock.h>
#include <pj/os.h>
#include <pj/assert.h>
+#include <pj/log.h>
#include <pj/pool.h>
#include <pj/string.h>
#include <pj/errno.h>
+#define THIS_FILE "lock.c"
typedef void LOCK_OBJ;
@@ -196,3 +198,516 @@ PJ_DEF(pj_status_t) pj_lock_destroy( pj_lock_t *lock )
return (*lock->destroy)(lock->lock_object);
}
+
+/******************************************************************************
+ * Group lock
+ */
+
+/* Individual lock in the group lock */
+typedef struct grp_lock_item
+{
+ PJ_DECL_LIST_MEMBER(struct grp_lock_item);
+ int prio;
+ pj_lock_t *lock;
+
+} grp_lock_item;
+
+/* Destroy callbacks */
+typedef struct grp_destroy_callback
+{
+ PJ_DECL_LIST_MEMBER(struct grp_destroy_callback);
+ void *comp;
+ void (*handler)(void*);
+} grp_destroy_callback;
+
+#if PJ_GRP_LOCK_DEBUG
+/* Store each add_ref caller */
+typedef struct grp_lock_ref
+{
+ PJ_DECL_LIST_MEMBER(struct grp_lock_ref);
+ const char *file;
+ int line;
+} grp_lock_ref;
+#endif
+
+/* The group lock */
+struct pj_grp_lock_t
+{
+ pj_lock_t base;
+
+ pj_pool_t *pool;
+ pj_atomic_t *ref_cnt;
+ pj_lock_t *own_lock;
+
+ pj_thread_t *owner;
+ int owner_cnt;
+
+ grp_lock_item lock_list;
+ grp_destroy_callback destroy_list;
+
+#if PJ_GRP_LOCK_DEBUG
+ grp_lock_ref ref_list;
+ grp_lock_ref ref_free_list;
+#endif
+};
+
+
+PJ_DEF(void) pj_grp_lock_config_default(pj_grp_lock_config *cfg)
+{
+ pj_bzero(cfg, sizeof(*cfg));
+}
+
+static void grp_lock_set_owner_thread(pj_grp_lock_t *glock)
+{
+ if (!glock->owner) {
+ glock->owner = pj_thread_this();
+ glock->owner_cnt = 1;
+ } else {
+ pj_assert(glock->owner == pj_thread_this());
+ glock->owner_cnt++;
+ }
+}
+
+static void grp_lock_unset_owner_thread(pj_grp_lock_t *glock)
+{
+ pj_assert(glock->owner == pj_thread_this());
+ pj_assert(glock->owner_cnt > 0);
+ if (--glock->owner_cnt <= 0) {
+ glock->owner = NULL;
+ glock->owner_cnt = 0;
+ }
+}
+
+static pj_status_t grp_lock_acquire(LOCK_OBJ *p)
+{
+ pj_grp_lock_t *glock = (pj_grp_lock_t*)p;
+ grp_lock_item *lck;
+
+ pj_assert(pj_atomic_get(glock->ref_cnt) > 0);
+
+ lck = glock->lock_list.next;
+ while (lck != &glock->lock_list) {
+ pj_lock_acquire(lck->lock);
+ lck = lck->next;
+ }
+ grp_lock_set_owner_thread(glock);
+ pj_grp_lock_add_ref(glock);
+ return PJ_SUCCESS;
+}
+
+static pj_status_t grp_lock_tryacquire(LOCK_OBJ *p)
+{
+ pj_grp_lock_t *glock = (pj_grp_lock_t*)p;
+ grp_lock_item *lck;
+
+ pj_assert(pj_atomic_get(glock->ref_cnt) > 0);
+
+ lck = glock->lock_list.next;
+ while (lck != &glock->lock_list) {
+ pj_status_t status = pj_lock_tryacquire(lck->lock);
+ if (status != PJ_SUCCESS) {
+ lck = lck->prev;
+ while (lck != &glock->lock_list) {
+ pj_lock_release(lck->lock);
+ lck = lck->prev;
+ }
+ return status;
+ }
+ lck = lck->next;
+ }
+ grp_lock_set_owner_thread(glock);
+ pj_grp_lock_add_ref(glock);
+ return PJ_SUCCESS;
+}
+
+static pj_status_t grp_lock_release(LOCK_OBJ *p)
+{
+ pj_grp_lock_t *glock = (pj_grp_lock_t*)p;
+ grp_lock_item *lck;
+
+ grp_lock_unset_owner_thread(glock);
+
+ lck = glock->lock_list.prev;
+ while (lck != &glock->lock_list) {
+ pj_lock_release(lck->lock);
+ lck = lck->prev;
+ }
+ return pj_grp_lock_dec_ref(glock);
+}
+
+static pj_status_t grp_lock_destroy(LOCK_OBJ *p)
+{
+ pj_grp_lock_t *glock = (pj_grp_lock_t*)p;
+ pj_pool_t *pool = glock->pool;
+ grp_lock_item *lck;
+ grp_destroy_callback *cb;
+
+ if (!glock->pool) {
+ /* already destroyed?! */
+ return PJ_EINVAL;
+ }
+
+ /* Release all chained locks */
+ lck = glock->lock_list.next;
+ while (lck != &glock->lock_list) {
+ if (lck->lock != glock->own_lock) {
+ unsigned i;
+ for (i=0; i<glock->owner_cnt; ++i)
+ pj_lock_release(lck->lock);
+ }
+ lck = lck->next;
+ }
+
+ /* Call callbacks */
+ cb = glock->destroy_list.next;
+ while (cb != &glock->destroy_list) {
+ grp_destroy_callback *next = cb->next;
+ cb->handler(cb->comp);
+ cb = next;
+ }
+
+ pj_lock_destroy(glock->own_lock);
+ pj_atomic_destroy(glock->ref_cnt);
+ glock->pool = NULL;
+ pj_pool_release(pool);
+
+ return PJ_SUCCESS;
+}
+
+
+PJ_DEF(pj_status_t) pj_grp_lock_create( pj_pool_t *pool,
+ const pj_grp_lock_config *cfg,
+ pj_grp_lock_t **p_grp_lock)
+{
+ pj_grp_lock_t *glock;
+ grp_lock_item *own_lock;
+ pj_status_t status;
+
+ PJ_ASSERT_RETURN(pool && p_grp_lock, PJ_EINVAL);
+
+ PJ_UNUSED_ARG(cfg);
+
+ pool = pj_pool_create(pool->factory, "glck%p", 512, 512, NULL);
+ if (!pool)
+ return PJ_ENOMEM;
+
+ glock = PJ_POOL_ZALLOC_T(pool, pj_grp_lock_t);
+ glock->base.lock_object = glock;
+ glock->base.acquire = &grp_lock_acquire;
+ glock->base.tryacquire = &grp_lock_tryacquire;
+ glock->base.release = &grp_lock_release;
+ glock->base.destroy = &grp_lock_destroy;
+
+ glock->pool = pool;
+ pj_list_init(&glock->lock_list);
+ pj_list_init(&glock->destroy_list);
+#if PJ_GRP_LOCK_DEBUG
+ pj_list_init(&glock->ref_list);
+ pj_list_init(&glock->ref_free_list);
+#endif
+
+ status = pj_atomic_create(pool, 0, &glock->ref_cnt);
+ if (status != PJ_SUCCESS)
+ goto on_error;
+
+ status = pj_lock_create_recursive_mutex(pool, pool->obj_name,
+ &glock->own_lock);
+ if (status != PJ_SUCCESS)
+ goto on_error;
+
+ own_lock = PJ_POOL_ZALLOC_T(pool, grp_lock_item);
+ own_lock->lock = glock->own_lock;
+ pj_list_push_back(&glock->lock_list, own_lock);
+
+ *p_grp_lock = glock;
+ return PJ_SUCCESS;
+
+on_error:
+ grp_lock_destroy(glock);
+ return status;
+}
+
+PJ_DEF(pj_status_t) pj_grp_lock_destroy( pj_grp_lock_t *grp_lock)
+{
+ return grp_lock_destroy(grp_lock);
+}
+
+PJ_DEF(pj_status_t) pj_grp_lock_acquire( pj_grp_lock_t *grp_lock)
+{
+ return grp_lock_acquire(grp_lock);
+}
+
+PJ_DEF(pj_status_t) pj_grp_lock_tryacquire( pj_grp_lock_t *grp_lock)
+{
+ return grp_lock_tryacquire(grp_lock);
+}
+
+PJ_DEF(pj_status_t) pj_grp_lock_release( pj_grp_lock_t *grp_lock)
+{
+ return grp_lock_release(grp_lock);
+}
+
+PJ_DEF(pj_status_t) pj_grp_lock_replace( pj_grp_lock_t *old,
+ pj_grp_lock_t *new)
+{
+ grp_destroy_callback *ocb;
+
+ /* Move handlers from old to new */
+ ocb = old->destroy_list.next;
+ while (ocb != &old->destroy_list) {
+ grp_destroy_callback *ncb;
+
+ ncb = PJ_POOL_ALLOC_T(new->pool, grp_destroy_callback);
+ ncb->comp = ocb->comp;
+ ncb->handler = ocb->handler;
+ pj_list_push_back(&new->destroy_list, ncb);
+
+ ocb = ocb->next;
+ }
+
+ pj_list_init(&old->destroy_list);
+
+ grp_lock_destroy(old);
+ return PJ_SUCCESS;
+}
+
+PJ_DEF(pj_status_t) pj_grp_lock_add_handler( pj_grp_lock_t *glock,
+ pj_pool_t *pool,
+ void *comp,
+ void (*destroy)(void *comp))
+{
+ grp_destroy_callback *cb;
+
+ grp_lock_acquire(glock);
+
+ if (pool == NULL)
+ pool = glock->pool;
+
+ cb = PJ_POOL_ZALLOC_T(pool, grp_destroy_callback);
+ cb->comp = comp;
+ cb->handler = destroy;
+ pj_list_push_back(&glock->destroy_list, cb);
+
+ grp_lock_release(glock);
+ return PJ_SUCCESS;
+}
+
+PJ_DEF(pj_status_t) pj_grp_lock_del_handler( pj_grp_lock_t *glock,
+ void *comp,
+ void (*destroy)(void *comp))
+{
+ grp_destroy_callback *cb;
+
+ grp_lock_acquire(glock);
+
+ cb = glock->destroy_list.next;
+ while (cb != &glock->destroy_list) {
+ if (cb->comp == comp && cb->handler == destroy)
+ break;
+ cb = cb->next;
+ }
+
+ if (cb != &glock->destroy_list)
+ pj_list_erase(cb);
+
+ grp_lock_release(glock);
+ return PJ_SUCCESS;
+}
+
+static pj_status_t grp_lock_add_ref(pj_grp_lock_t *glock)
+{
+ pj_atomic_inc(glock->ref_cnt);
+ return PJ_SUCCESS;
+}
+
+static pj_status_t grp_lock_dec_ref(pj_grp_lock_t *glock)
+{
+ int cnt; /* for debugging */
+ if ((cnt=pj_atomic_dec_and_get(glock->ref_cnt)) == 0) {
+ grp_lock_destroy(glock);
+ return PJ_EGONE;
+ }
+ pj_assert(cnt > 0);
+ pj_grp_lock_dump(glock);
+ return PJ_SUCCESS;
+}
+
+#if PJ_GRP_LOCK_DEBUG
+PJ_DEF(pj_status_t) pj_grp_lock_add_ref_dbg(pj_grp_lock_t *glock,
+ const char *file,
+ int line)
+{
+ grp_lock_ref *ref;
+ pj_status_t status;
+
+ pj_enter_critical_section();
+ if (!pj_list_empty(&glock->ref_free_list)) {
+ ref = glock->ref_free_list.next;
+ pj_list_erase(ref);
+ } else {
+ ref = PJ_POOL_ALLOC_T(glock->pool, grp_lock_ref);
+ }
+
+ ref->file = file;
+ ref->line = line;
+ pj_list_push_back(&glock->ref_list, ref);
+
+ pj_leave_critical_section();
+
+ status = grp_lock_add_ref(glock);
+
+ if (status != PJ_SUCCESS) {
+ pj_enter_critical_section();
+ pj_list_erase(ref);
+ pj_list_push_back(&glock->ref_free_list, ref);
+ pj_leave_critical_section();
+ }
+
+ return status;
+}
+
+PJ_DEF(pj_status_t) pj_grp_lock_dec_ref_dbg(pj_grp_lock_t *glock,
+ const char *file,
+ int line)
+{
+ grp_lock_ref *ref;
+
+ pj_enter_critical_section();
+ /* Find the same source file */
+ ref = glock->ref_list.next;
+ while (ref != &glock->ref_list) {
+ if (strcmp(ref->file, file) == 0) {
+ pj_list_erase(ref);
+ pj_list_push_back(&glock->ref_free_list, ref);
+ break;
+ }
+ ref = ref->next;
+ }
+ pj_leave_critical_section();
+
+ if (ref == &glock->ref_list) {
+ PJ_LOG(2,(THIS_FILE, "pj_grp_lock_dec_ref_dbg() could not find "
+ "matching ref for %s", file));
+ }
+
+ return grp_lock_dec_ref(glock);
+}
+#else
+PJ_DEF(pj_status_t) pj_grp_lock_add_ref(pj_grp_lock_t *glock)
+{
+ return grp_lock_add_ref(glock);
+}
+
+PJ_DEF(pj_status_t) pj_grp_lock_dec_ref(pj_grp_lock_t *glock)
+{
+ return grp_lock_dec_ref(glock);
+}
+#endif
+
+PJ_DEF(int) pj_grp_lock_get_ref(pj_grp_lock_t *glock)
+{
+ return pj_atomic_get(glock->ref_cnt);
+}
+
+PJ_DEF(pj_status_t) pj_grp_lock_chain_lock( pj_grp_lock_t *glock,
+ pj_lock_t *lock,
+ int pos)
+{
+ grp_lock_item *lck, *new_lck;
+ unsigned i;
+
+ grp_lock_acquire(glock);
+
+ for (i=0; i<glock->owner_cnt; ++i)
+ pj_lock_acquire(lock);
+
+ lck = glock->lock_list.next;
+ while (lck != &glock->lock_list) {
+ if (lck->prio >= pos)
+ break;
+ lck = lck->next;
+ }
+
+ new_lck = PJ_POOL_ZALLOC_T(glock->pool, grp_lock_item);
+ new_lck->prio = pos;
+ new_lck->lock = lock;
+ pj_list_insert_before(lck, new_lck);
+
+ /* this will also release the new lock */
+ grp_lock_release(glock);
+ return PJ_SUCCESS;
+}
+
+PJ_DEF(pj_status_t) pj_grp_lock_unchain_lock( pj_grp_lock_t *glock,
+ pj_lock_t *lock)
+{
+ grp_lock_item *lck;
+
+ grp_lock_acquire(glock);
+
+ lck = glock->lock_list.next;
+ while (lck != &glock->lock_list) {
+ if (lck->lock == lock)
+ break;
+ lck = lck->next;
+ }
+
+ if (lck != &glock->lock_list) {
+ unsigned i;
+
+ pj_list_erase(lck);
+ for (i=0; i<glock->owner_cnt; ++i)
+ pj_lock_release(lck->lock);
+ }
+
+ grp_lock_release(glock);
+ return PJ_SUCCESS;
+}
+
+PJ_DEF(void) pj_grp_lock_dump(pj_grp_lock_t *grp_lock)
+{
+#if PJ_GRP_LOCK_DEBUG
+ grp_lock_ref *ref = grp_lock->ref_list.next;
+ char info_buf[1000];
+ pj_str_t info;
+
+ info.ptr = info_buf;
+ info.slen = 0;
+
+ pj_grp_lock_acquire(grp_lock);
+ pj_enter_critical_section();
+
+ while (ref != &grp_lock->ref_list && info.slen < sizeof(info_buf)) {
+ char *start = info.ptr + info.slen;
+ int max_len = sizeof(info_buf) - info.slen;
+ int len;
+
+ len = pj_ansi_snprintf(start, max_len, "%s:%d ", ref->file, ref->line);
+ if (len < 1 || len > max_len) {
+ len = strlen(ref->file);
+ if (len > max_len - 1)
+ len = max_len - 1;
+
+ memcpy(start, ref->file, len);
+ start[len++] = ' ';
+ }
+
+ info.slen += len;
+
+ ref = ref->next;
+ }
+
+ if (ref != &grp_lock->ref_list) {
+ int i;
+ for (i=0; i<4; ++i)
+ info_buf[sizeof(info_buf)-i-1] = '.';
+ }
+ info.ptr[info.slen-1] = '\0';
+
+ pj_leave_critical_section();
+ pj_grp_lock_release(grp_lock);
+
+ PJ_LOG(4,(THIS_FILE, "Group lock %p, ref_cnt=%d. Reference holders: %s",
+ grp_lock, pj_grp_lock_get_ref(grp_lock), info.ptr));
+#endif
+}
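
pj_grp_lock_chain_lock() above is what lets a higher-level lock participate in the group's lock ordering. A sketch, assuming 'pool' and 'grp_lock' exist:

    /* Sketch: chain an application lock in front of the group lock.
     * A negative pos orders it before the group's own lock (pos 0),
     * so pj_grp_lock_acquire() always takes 'app_lock' first, giving
     * a uniform lock order and avoiding deadlock. */
    pj_lock_t *app_lock;
    pj_status_t status;

    status = pj_lock_create_recursive_mutex(pool, "app", &app_lock);
    if (status == PJ_SUCCESS)
        status = pj_grp_lock_chain_lock(grp_lock, app_lock, -1);
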
diff --git a/pjlib/src/pj/os_core_unix.c b/pjlib/src/pj/os_core_unix.c
index 810e4b0d..252f7fcc 100644
--- a/pjlib/src/pj/os_core_unix.c
+++ b/pjlib/src/pj/os_core_unix.c
@@ -97,7 +97,18 @@ struct pj_sem_t
#if defined(PJ_HAS_EVENT_OBJ) && PJ_HAS_EVENT_OBJ != 0
struct pj_event_t
{
- char obj_name[PJ_MAX_OBJ_NAME];
+ enum event_state {
+ EV_STATE_OFF,
+ EV_STATE_SET,
+ EV_STATE_PULSED
+ } state;
+
+ pj_mutex_t mutex;
+ pthread_cond_t cond;
+
+ pj_bool_t auto_reset;
+ unsigned threads_waiting;
+ unsigned threads_to_release;
};
#endif /* PJ_HAS_EVENT_OBJ */
@@ -1700,13 +1711,48 @@ PJ_DEF(pj_status_t) pj_event_create(pj_pool_t *pool, const char *name,
pj_bool_t manual_reset, pj_bool_t initial,
pj_event_t **ptr_event)
{
- pj_assert(!"Not supported!");
- PJ_UNUSED_ARG(pool);
- PJ_UNUSED_ARG(name);
- PJ_UNUSED_ARG(manual_reset);
- PJ_UNUSED_ARG(initial);
- PJ_UNUSED_ARG(ptr_event);
- return PJ_EINVALIDOP;
+ pj_event_t *event;
+
+ event = PJ_POOL_ALLOC_T(pool, pj_event_t);
+
+ init_mutex(&event->mutex, name, PJ_MUTEX_SIMPLE);
+ pthread_cond_init(&event->cond, 0);
+ event->auto_reset = !manual_reset;
+ event->threads_waiting = 0;
+
+ if (initial) {
+ event->state = EV_STATE_SET;
+ event->threads_to_release = 1;
+ } else {
+ event->state = EV_STATE_OFF;
+ event->threads_to_release = 0;
+ }
+
+ *ptr_event = event;
+ return PJ_SUCCESS;
+}
+
+static void event_on_one_release(pj_event_t *event)
+{
+ if (event->state == EV_STATE_SET) {
+ if (event->auto_reset) {
+ event->threads_to_release = 0;
+ event->state = EV_STATE_OFF;
+ } else {
+ /* Manual reset remains on */
+ }
+ } else {
+ if (event->auto_reset) {
+ /* Only release one */
+ event->threads_to_release = 0;
+ event->state = EV_STATE_OFF;
+ } else {
+ event->threads_to_release--;
+ pj_assert(event->threads_to_release >= 0);
+ if (event->threads_to_release==0)
+ event->state = EV_STATE_OFF;
+ }
+ }
}
/*
@@ -1714,8 +1760,14 @@ PJ_DEF(pj_status_t) pj_event_create(pj_pool_t *pool, const char *name,
*/
PJ_DEF(pj_status_t) pj_event_wait(pj_event_t *event)
{
- PJ_UNUSED_ARG(event);
- return PJ_EINVALIDOP;
+ pthread_mutex_lock(&event->mutex.mutex);
+ event->threads_waiting++;
+ while (event->state == EV_STATE_OFF)
+ pthread_cond_wait(&event->cond, &event->mutex.mutex);
+ event->threads_waiting--;
+ event_on_one_release(event);
+ pthread_mutex_unlock(&event->mutex.mutex);
+ return PJ_SUCCESS;
}
/*
@@ -1723,8 +1775,16 @@ PJ_DEF(pj_status_t) pj_event_wait(pj_event_t *event)
*/
PJ_DEF(pj_status_t) pj_event_trywait(pj_event_t *event)
{
- PJ_UNUSED_ARG(event);
- return PJ_EINVALIDOP;
+ pj_status_t status;
+
+ pthread_mutex_lock(&event->mutex.mutex);
+ status = event->state != EV_STATE_OFF ? PJ_SUCCESS : -1;
+ if (status==PJ_SUCCESS) {
+ event_on_one_release(event);
+ }
+ pthread_mutex_unlock(&event->mutex.mutex);
+
+ return status;
}
/*
@@ -1732,8 +1792,15 @@ PJ_DEF(pj_status_t) pj_event_trywait(pj_event_t *event)
*/
PJ_DEF(pj_status_t) pj_event_set(pj_event_t *event)
{
- PJ_UNUSED_ARG(event);
- return PJ_EINVALIDOP;
+ pthread_mutex_lock(&event->mutex.mutex);
+ event->threads_to_release = 1;
+ event->state = EV_STATE_SET;
+ if (event->auto_reset)
+ pthread_cond_signal(&event->cond);
+ else
+ pthread_cond_broadcast(&event->cond);
+ pthread_mutex_unlock(&event->mutex.mutex);
+ return PJ_SUCCESS;
}
/*
@@ -1741,8 +1808,18 @@ PJ_DEF(pj_status_t) pj_event_set(pj_event_t *event)
*/
PJ_DEF(pj_status_t) pj_event_pulse(pj_event_t *event)
{
- PJ_UNUSED_ARG(event);
- return PJ_EINVALIDOP;
+ pthread_mutex_lock(&event->mutex.mutex);
+ if (event->threads_waiting) {
+ event->threads_to_release = event->auto_reset ? 1 :
+ event->threads_waiting;
+ event->state = EV_STATE_PULSED;
+ if (event->threads_to_release==1)
+ pthread_cond_signal(&event->cond);
+ else
+ pthread_cond_broadcast(&event->cond);
+ }
+ pthread_mutex_unlock(&event->mutex.mutex);
+ return PJ_SUCCESS;
}
/*
@@ -1750,8 +1827,11 @@ PJ_DEF(pj_status_t) pj_event_pulse(pj_event_t *event)
*/
PJ_DEF(pj_status_t) pj_event_reset(pj_event_t *event)
{
- PJ_UNUSED_ARG(event);
- return PJ_EINVALIDOP;
+ pthread_mutex_lock(&event->mutex.mutex);
+ event->state = EV_STATE_OFF;
+ event->threads_to_release = 0;
+ pthread_mutex_unlock(&event->mutex.mutex);
+ return PJ_SUCCESS;
}
/*
@@ -1759,8 +1839,9 @@ PJ_DEF(pj_status_t) pj_event_reset(pj_event_t *event)
*/
PJ_DEF(pj_status_t) pj_event_destroy(pj_event_t *event)
{
- PJ_UNUSED_ARG(event);
- return PJ_EINVALIDOP;
+ pj_mutex_destroy(&event->mutex);
+ pthread_cond_destroy(&event->cond);
+ return PJ_SUCCESS;
}
#endif /* PJ_HAS_EVENT_OBJ */
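
With the stubs above replaced by a real pthread-based implementation, the event object now works on POSIX. A sketch, assuming 'pool' exists and PJ_HAS_EVENT_OBJ is enabled:

    /* Sketch: with auto-reset, pj_event_set() releases one waiter. */
    pj_event_t *event;
    pj_status_t status;

    status = pj_event_create(pool, "evt", PJ_FALSE /* auto-reset */,
                             PJ_FALSE /* initially non-signaled */,
                             &event);

    /* Consumer thread: */
    pj_event_wait(event);        /* blocks until the event is signaled */

    /* Producer thread: */
    pj_event_set(event);         /* wakes exactly one waiter (auto-reset) */

    pj_event_destroy(event);
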
diff --git a/pjlib/src/pj/timer.c b/pjlib/src/pj/timer.c
index 767a261f..47209fbe 100644
--- a/pjlib/src/pj/timer.c
+++ b/pjlib/src/pj/timer.c
@@ -35,6 +35,7 @@
#include <pj/errno.h>
#include <pj/lock.h>
#include <pj/log.h>
+#include <pj/rand.h>
#define THIS_FILE "timer.c"
@@ -451,20 +452,27 @@ PJ_DEF(pj_timer_entry*) pj_timer_entry_init( pj_timer_entry *entry,
entry->id = id;
entry->user_data = user_data;
entry->cb = cb;
+ entry->_grp_lock = NULL;
return entry;
}
#if PJ_TIMER_DEBUG
-PJ_DEF(pj_status_t) pj_timer_heap_schedule_dbg( pj_timer_heap_t *ht,
- pj_timer_entry *entry,
- const pj_time_val *delay,
- const char *src_file,
- int src_line)
+static pj_status_t schedule_w_grp_lock_dbg(pj_timer_heap_t *ht,
+ pj_timer_entry *entry,
+ const pj_time_val *delay,
+ pj_bool_t set_id,
+ int id_val,
+ pj_grp_lock_t *grp_lock,
+ const char *src_file,
+ int src_line)
#else
-PJ_DEF(pj_status_t) pj_timer_heap_schedule( pj_timer_heap_t *ht,
- pj_timer_entry *entry,
- const pj_time_val *delay)
+static pj_status_t schedule_w_grp_lock(pj_timer_heap_t *ht,
+ pj_timer_entry *entry,
+ const pj_time_val *delay,
+ pj_bool_t set_id,
+ int id_val,
+ pj_grp_lock_t *grp_lock)
#endif
{
pj_status_t status;
@@ -485,13 +493,66 @@ PJ_DEF(pj_status_t) pj_timer_heap_schedule( pj_timer_heap_t *ht,
lock_timer_heap(ht);
status = schedule_entry(ht, entry, &expires);
+ if (status == PJ_SUCCESS) {
+ if (set_id)
+ entry->id = id_val;
+ entry->_grp_lock = grp_lock;
+ if (entry->_grp_lock) {
+ pj_grp_lock_add_ref(entry->_grp_lock);
+ }
+ }
unlock_timer_heap(ht);
return status;
}
-PJ_DEF(int) pj_timer_heap_cancel( pj_timer_heap_t *ht,
- pj_timer_entry *entry)
+
+#if PJ_TIMER_DEBUG
+PJ_DEF(pj_status_t) pj_timer_heap_schedule_dbg( pj_timer_heap_t *ht,
+ pj_timer_entry *entry,
+ const pj_time_val *delay,
+ const char *src_file,
+ int src_line)
+{
+ return schedule_w_grp_lock_dbg(ht, entry, delay, PJ_FALSE, 1, NULL,
+ src_file, src_line);
+}
+
+PJ_DEF(pj_status_t) pj_timer_heap_schedule_w_grp_lock_dbg(
+ pj_timer_heap_t *ht,
+ pj_timer_entry *entry,
+ const pj_time_val *delay,
+ int id_val,
+ pj_grp_lock_t *grp_lock,
+ const char *src_file,
+ int src_line)
+{
+ return schedule_w_grp_lock_dbg(ht, entry, delay, PJ_TRUE, id_val,
+ grp_lock, src_file, src_line);
+}
+
+#else
+PJ_DEF(pj_status_t) pj_timer_heap_schedule( pj_timer_heap_t *ht,
+ pj_timer_entry *entry,
+ const pj_time_val *delay)
+{
+ return schedule_w_grp_lock(ht, entry, delay, PJ_FALSE, 1, NULL);
+}
+
+PJ_DEF(pj_status_t) pj_timer_heap_schedule_w_grp_lock(pj_timer_heap_t *ht,
+ pj_timer_entry *entry,
+ const pj_time_val *delay,
+ int id_val,
+ pj_grp_lock_t *grp_lock)
+{
+ return schedule_w_grp_lock(ht, entry, delay, PJ_TRUE, id_val, grp_lock);
+}
+#endif
+
+static int cancel_timer(pj_timer_heap_t *ht,
+ pj_timer_entry *entry,
+ pj_bool_t set_id,
+ int id_val)
{
int count;
@@ -499,11 +560,32 @@ PJ_DEF(int) pj_timer_heap_cancel( pj_timer_heap_t *ht,
lock_timer_heap(ht);
count = cancel(ht, entry, 1);
+ if (set_id) {
+ entry->id = id_val;
+ }
+ if (entry->_grp_lock) {
+ pj_grp_lock_t *grp_lock = entry->_grp_lock;
+ entry->_grp_lock = NULL;
+ pj_grp_lock_dec_ref(grp_lock);
+ }
unlock_timer_heap(ht);
return count;
}
+PJ_DEF(int) pj_timer_heap_cancel( pj_timer_heap_t *ht,
+ pj_timer_entry *entry)
+{
+ return cancel_timer(ht, entry, PJ_FALSE, 0);
+}
+
+PJ_DEF(int) pj_timer_heap_cancel_if_active(pj_timer_heap_t *ht,
+ pj_timer_entry *entry,
+ int id_val)
+{
+ return cancel_timer(ht, entry, PJ_TRUE, id_val);
+}
+
PJ_DEF(unsigned) pj_timer_heap_poll( pj_timer_heap_t *ht,
pj_time_val *next_delay )
{
@@ -527,11 +609,23 @@ PJ_DEF(unsigned) pj_timer_heap_poll( pj_timer_heap_t *ht,
count < ht->max_entries_per_poll )
{
pj_timer_entry *node = remove_node(ht, 0);
+ pj_grp_lock_t *grp_lock;
+
++count;
+ grp_lock = node->_grp_lock;
+ node->_grp_lock = NULL;
+
unlock_timer_heap(ht);
+
+ PJ_RACE_ME(5);
+
if (node->cb)
(*node->cb)(ht, node);
+
+ if (grp_lock)
+ pj_grp_lock_dec_ref(grp_lock);
+
lock_timer_heap(ht);
}
if (ht->cur_size && next_delay) {
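
The poll changes above determine what a timer callback may safely do. A sketch of a callback under the new scheme; the id convention is the caller's choice.

    /* Sketch: pj_timer_heap_poll() detaches entry->_grp_lock before
     * invoking the callback and drops that reference itself afterwards,
     * so the callback is free to destroy its own session without
     * racing the heap. */
    static void on_timeout(pj_timer_heap_t *ht, pj_timer_entry *e)
    {
        PJ_UNUSED_ARG(ht);
        e->id = 0;    /* 0 = inactive, per the scheduling convention */
        /* ... handle the timeout; no pj_grp_lock_dec_ref() is needed
         * here for the scheduling reference -- the heap releases it. */
    }
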