summaryrefslogtreecommitdiff
path: root/pjlib/src
diff options
context:
space:
mode:
authorBenny Prijono <bennylp@teluu.com>2005-11-06 16:50:38 +0000
committerBenny Prijono <bennylp@teluu.com>2005-11-06 16:50:38 +0000
commit33a8c1cb59304d92d517e3ba511bf233c729597f (patch)
treee6cb65930121480465db749bf5916fa2708ca633 /pjlib/src
parent6d5fbe07f3dc84c10ea75c5584fe8b5513278d08 (diff)
Tested new ioqueue framework on Linux with select and epoll
git-svn-id: http://svn.pjsip.org/repos/pjproject/main@14 74dad513-b988-da41-8d7b-12977e46ad98
Diffstat (limited to 'pjlib/src')
-rw-r--r--pjlib/src/pj/ioqueue_common_abs.c1626
-rw-r--r--pjlib/src/pj/ioqueue_common_abs.h215
-rw-r--r--pjlib/src/pj/ioqueue_epoll.c661
-rw-r--r--pjlib/src/pj/ioqueue_select.c394
-rw-r--r--pjlib/src/pj/os_core_unix.c2
-rw-r--r--pjlib/src/pj/sock_bsd.c4
-rw-r--r--pjlib/src/pjlib-test/ioq_perf.c76
7 files changed, 1297 insertions, 1681 deletions
diff --git a/pjlib/src/pj/ioqueue_common_abs.c b/pjlib/src/pj/ioqueue_common_abs.c
index b5599d9c..774d53e3 100644
--- a/pjlib/src/pj/ioqueue_common_abs.c
+++ b/pjlib/src/pj/ioqueue_common_abs.c
@@ -1,813 +1,813 @@
-/* $Id$ */
-
-#include <pj/ioqueue.h>
-#include <pj/errno.h>
-#include <pj/list.h>
-#include <pj/sock.h>
-#include <pj/lock.h>
-#include <pj/assert.h>
-#include <pj/string.h>
-
-
-static void ioqueue_init( pj_ioqueue_t *ioqueue )
-{
- ioqueue->lock = NULL;
- ioqueue->auto_delete_lock = 0;
-}
-
-static pj_status_t ioqueue_destroy(pj_ioqueue_t *ioqueue)
-{
- if (ioqueue->auto_delete_lock && ioqueue->lock )
- return pj_lock_destroy(ioqueue->lock);
- else
- return PJ_SUCCESS;
-}
-
-/*
- * pj_ioqueue_set_lock()
- */
-PJ_DEF(pj_status_t) pj_ioqueue_set_lock( pj_ioqueue_t *ioqueue,
- pj_lock_t *lock,
- pj_bool_t auto_delete )
-{
- PJ_ASSERT_RETURN(ioqueue && lock, PJ_EINVAL);
-
- if (ioqueue->auto_delete_lock && ioqueue->lock) {
- pj_lock_destroy(ioqueue->lock);
- }
-
- ioqueue->lock = lock;
- ioqueue->auto_delete_lock = auto_delete;
-
- return PJ_SUCCESS;
-}
-
-static pj_status_t ioqueue_init_key( pj_pool_t *pool,
- pj_ioqueue_t *ioqueue,
- pj_ioqueue_key_t *key,
- pj_sock_t sock,
- void *user_data,
- const pj_ioqueue_callback *cb)
-{
- pj_status_t rc;
- int optlen;
-
- key->ioqueue = ioqueue;
- key->fd = sock;
- key->user_data = user_data;
- pj_list_init(&key->read_list);
- pj_list_init(&key->write_list);
-#if PJ_HAS_TCP
- pj_list_init(&key->accept_list);
-#endif
-
- /* Save callback. */
- pj_memcpy(&key->cb, cb, sizeof(pj_ioqueue_callback));
-
- /* Get socket type. When socket type is datagram, some optimization
- * will be performed during send to allow parallel send operations.
- */
- optlen = sizeof(key->fd_type);
- rc = pj_sock_getsockopt(sock, PJ_SOL_SOCKET, PJ_SO_TYPE,
- &key->fd_type, &optlen);
- if (rc != PJ_SUCCESS)
- key->fd_type = PJ_SOCK_STREAM;
-
- /* Create mutex for the key. */
- rc = pj_mutex_create_simple(pool, NULL, &key->mutex);
-
- return rc;
-}
-
-static void ioqueue_destroy_key( pj_ioqueue_key_t *key )
-{
- pj_mutex_destroy(key->mutex);
-}
-
-/*
- * pj_ioqueue_get_user_data()
- *
- * Obtain value associated with a key.
- */
-PJ_DEF(void*) pj_ioqueue_get_user_data( pj_ioqueue_key_t *key )
-{
- PJ_ASSERT_RETURN(key != NULL, NULL);
- return key->user_data;
-}
-
-/*
- * pj_ioqueue_set_user_data()
- */
-PJ_DEF(pj_status_t) pj_ioqueue_set_user_data( pj_ioqueue_key_t *key,
- void *user_data,
- void **old_data)
-{
- PJ_ASSERT_RETURN(key, PJ_EINVAL);
-
- if (old_data)
- *old_data = key->user_data;
- key->user_data = user_data;
-
- return PJ_SUCCESS;
-}
-
-PJ_INLINE(int) key_has_pending_write(pj_ioqueue_key_t *key)
-{
- return !pj_list_empty(&key->write_list);
-}
-
-PJ_INLINE(int) key_has_pending_read(pj_ioqueue_key_t *key)
-{
- return !pj_list_empty(&key->read_list);
-}
-
-PJ_INLINE(int) key_has_pending_accept(pj_ioqueue_key_t *key)
-{
-#if PJ_HAS_TCP
- return !pj_list_empty(&key->accept_list);
-#else
- return 0;
-#endif
-}
-
-PJ_INLINE(int) key_has_pending_connect(pj_ioqueue_key_t *key)
-{
- return key->connecting;
-}
-
-
-/*
- * ioqueue_dispatch_event()
- *
- * Report occurence of an event in the key to be processed by the
- * framework.
- */
-void ioqueue_dispatch_write_event(pj_ioqueue_t *ioqueue, pj_ioqueue_key_t *h)
-{
- /* Lock the key. */
- pj_mutex_lock(h->mutex);
-
-#if defined(PJ_HAS_TCP) && PJ_HAS_TCP!=0
- if (h->connecting) {
- /* Completion of connect() operation */
- pj_ssize_t bytes_transfered;
-
- /* Clear operation. */
- h->connecting = 0;
-
- ioqueue_remove_from_set(ioqueue, h->fd, WRITEABLE_EVENT);
- ioqueue_remove_from_set(ioqueue, h->fd, EXCEPTION_EVENT);
-
- /* Unlock; from this point we don't need to hold key's mutex. */
- pj_mutex_unlock(h->mutex);
-
-#if (defined(PJ_HAS_SO_ERROR) && PJ_HAS_SO_ERROR!=0)
- /* from connect(2):
- * On Linux, use getsockopt to read the SO_ERROR option at
- * level SOL_SOCKET to determine whether connect() completed
- * successfully (if SO_ERROR is zero).
- */
- int value;
- socklen_t vallen = sizeof(value);
- int gs_rc = getsockopt(h->fd, SOL_SOCKET, SO_ERROR,
- &value, &vallen);
- if (gs_rc != 0) {
- /* Argh!! What to do now???
- * Just indicate that the socket is connected. The
- * application will get error as soon as it tries to use
- * the socket to send/receive.
- */
- bytes_transfered = 0;
- } else {
- bytes_transfered = value;
- }
-#elif defined(PJ_WIN32) && PJ_WIN32!=0
- bytes_transfered = 0; /* success */
-#else
- /* Excellent information in D.J. Bernstein page:
- * http://cr.yp.to/docs/connect.html
- *
- * Seems like the most portable way of detecting connect()
- * failure is to call getpeername(). If socket is connected,
- * getpeername() will return 0. If the socket is not connected,
- * it will return ENOTCONN, and read(fd, &ch, 1) will produce
- * the right errno through error slippage. This is a combination
- * of suggestions from Douglas C. Schmidt and Ken Keys.
- */
- int gp_rc;
- struct sockaddr_in addr;
- socklen_t addrlen = sizeof(addr);
-
- gp_rc = getpeername(h->fd, (struct sockaddr*)&addr, &addrlen);
- bytes_transfered = gp_rc;
-#endif
-
- /* Call callback. */
- if (h->cb.on_connect_complete)
- (*h->cb.on_connect_complete)(h, bytes_transfered);
-
- /* Done. */
-
- } else
-#endif /* PJ_HAS_TCP */
- if (key_has_pending_write(h)) {
- /* Socket is writable. */
- struct write_operation *write_op;
- pj_ssize_t sent;
- pj_status_t send_rc;
-
- /* Get the first in the queue. */
- write_op = h->write_list.next;
-
- /* For datagrams, we can remove the write_op from the list
- * so that send() can work in parallel.
- */
- if (h->fd_type == PJ_SOCK_DGRAM) {
- pj_list_erase(write_op);
- if (pj_list_empty(&h->write_list))
- ioqueue_remove_from_set(ioqueue, h->fd, WRITEABLE_EVENT);
-
- pj_mutex_unlock(h->mutex);
- }
-
- /* Send the data.
- * Unfortunately we must do this while holding key's mutex, thus
- * preventing parallel write on a single key.. :-((
- */
- sent = write_op->size - write_op->written;
- if (write_op->op == PJ_IOQUEUE_OP_SEND) {
- send_rc = pj_sock_send(h->fd, write_op->buf+write_op->written,
- &sent, write_op->flags);
- } else if (write_op->op == PJ_IOQUEUE_OP_SEND_TO) {
- send_rc = pj_sock_sendto(h->fd,
- write_op->buf+write_op->written,
- &sent, write_op->flags,
- &write_op->rmt_addr,
- write_op->rmt_addrlen);
- } else {
- pj_assert(!"Invalid operation type!");
- send_rc = PJ_EBUG;
- }
-
- if (send_rc == PJ_SUCCESS) {
- write_op->written += sent;
- } else {
- pj_assert(send_rc > 0);
- write_op->written = -send_rc;
- }
-
- /* Are we finished with this buffer? */
- if (send_rc!=PJ_SUCCESS ||
- write_op->written == (pj_ssize_t)write_op->size ||
- h->fd_type == PJ_SOCK_DGRAM)
- {
- if (h->fd_type != PJ_SOCK_DGRAM) {
- /* Write completion of the whole stream. */
- pj_list_erase(write_op);
-
- /* Clear operation if there's no more data to send. */
- if (pj_list_empty(&h->write_list))
- ioqueue_remove_from_set(ioqueue, h->fd, WRITEABLE_EVENT);
-
- /* No need to hold mutex anymore */
- pj_mutex_unlock(h->mutex);
- }
-
- /* Call callback. */
- if (h->cb.on_write_complete) {
- (*h->cb.on_write_complete)(h,
- (pj_ioqueue_op_key_t*)write_op,
- write_op->written);
- }
-
- } else {
- pj_mutex_unlock(h->mutex);
- }
-
- /* Done. */
- } else {
- pj_assert(!"Descriptor is signaled but key "
- "has no pending operation!");
-
- pj_mutex_unlock(h->mutex);
- }
-}
-
-void ioqueue_dispatch_read_event( pj_ioqueue_t *ioqueue, pj_ioqueue_key_t *h )
-{
- pj_status_t rc;
-
- /* Lock the key. */
- pj_mutex_lock(h->mutex);
-
-# if PJ_HAS_TCP
- if (!pj_list_empty(&h->accept_list)) {
-
- struct accept_operation *accept_op;
-
- /* Get one accept operation from the list. */
- accept_op = h->accept_list.next;
- pj_list_erase(accept_op);
-
- /* Clear bit in fdset if there is no more pending accept */
- if (pj_list_empty(&h->accept_list))
- ioqueue_remove_from_set(ioqueue, h->fd, READABLE_EVENT);
-
- /* Unlock; from this point we don't need to hold key's mutex. */
- pj_mutex_unlock(h->mutex);
-
- rc=pj_sock_accept(h->fd, accept_op->accept_fd,
- accept_op->rmt_addr, accept_op->addrlen);
- if (rc==PJ_SUCCESS && accept_op->local_addr) {
- rc = pj_sock_getsockname(*accept_op->accept_fd,
- accept_op->local_addr,
- accept_op->addrlen);
- }
-
- /* Call callback. */
- if (h->cb.on_accept_complete)
- (*h->cb.on_accept_complete)(h,
- (pj_ioqueue_op_key_t*)accept_op,
- *accept_op->accept_fd, rc);
-
- }
- else
-# endif
- if (key_has_pending_read(h)) {
- struct read_operation *read_op;
- pj_ssize_t bytes_read;
-
- pj_assert(!pj_list_empty(&h->read_list));
-
- /* Get one pending read operation from the list. */
- read_op = h->read_list.next;
- pj_list_erase(read_op);
-
- /* Clear fdset if there is no pending read. */
- if (pj_list_empty(&h->read_list))
- ioqueue_remove_from_set(ioqueue, h->fd, READABLE_EVENT);
-
- /* Unlock; from this point we don't need to hold key's mutex. */
- pj_mutex_unlock(h->mutex);
-
- bytes_read = read_op->size;
-
- if ((read_op->op == PJ_IOQUEUE_OP_RECV_FROM)) {
- rc = pj_sock_recvfrom(h->fd, read_op->buf, &bytes_read, 0,
- read_op->rmt_addr,
- read_op->rmt_addrlen);
- } else if ((read_op->op == PJ_IOQUEUE_OP_RECV)) {
- rc = pj_sock_recv(h->fd, read_op->buf, &bytes_read, 0);
- } else {
- pj_assert(read_op->op == PJ_IOQUEUE_OP_READ);
- /*
- * User has specified pj_ioqueue_read().
- * On Win32, we should do ReadFile(). But because we got
- * here because of select() anyway, user must have put a
- * socket descriptor on h->fd, which in this case we can
- * just call pj_sock_recv() instead of ReadFile().
- * On Unix, user may put a file in h->fd, so we'll have
- * to call read() here.
- * This may not compile on systems which doesn't have
- * read(). That's why we only specify PJ_LINUX here so
- * that error is easier to catch.
- */
-# if defined(PJ_WIN32) && PJ_WIN32 != 0
- rc = pj_sock_recv(h->fd, read_op->buf, &bytes_read, 0);
- //rc = ReadFile((HANDLE)h->fd, read_op->buf, read_op->size,
- // &bytes_read, NULL);
-# elif (defined(PJ_HAS_UNISTD_H) && PJ_HAS_UNISTD_H != 0)
- bytes_read = read(h->fd, h->rd_buf, bytes_read);
- rc = (bytes_read >= 0) ? PJ_SUCCESS : pj_get_os_error();
-# elif defined(PJ_LINUX_KERNEL) && PJ_LINUX_KERNEL != 0
- bytes_read = sys_read(h->fd, h->rd_buf, bytes_read);
- rc = (bytes_read >= 0) ? PJ_SUCCESS : -bytes_read;
-# else
-# error "Implement read() for this platform!"
-# endif
- }
-
- if (rc != PJ_SUCCESS) {
-# if defined(PJ_WIN32) && PJ_WIN32 != 0
- /* On Win32, for UDP, WSAECONNRESET on the receive side
- * indicates that previous sending has triggered ICMP Port
- * Unreachable message.
- * But we wouldn't know at this point which one of previous
- * key that has triggered the error, since UDP socket can
- * be shared!
- * So we'll just ignore it!
- */
-
- if (rc == PJ_STATUS_FROM_OS(WSAECONNRESET)) {
- //PJ_LOG(4,(THIS_FILE,
- // "Ignored ICMP port unreach. on key=%p", h));
- }
-# endif
-
- /* In any case we would report this to caller. */
- bytes_read = -rc;
- }
-
- /* Call callback. */
- if (h->cb.on_read_complete) {
- (*h->cb.on_read_complete)(h,
- (pj_ioqueue_op_key_t*)read_op,
- bytes_read);
- }
-
- } else {
- pj_mutex_unlock(h->mutex);
- }
-}
-
-
-void ioqueue_dispatch_exception_event( pj_ioqueue_t *ioqueue,
- pj_ioqueue_key_t *h )
-{
- pj_mutex_lock(h->mutex);
-
- if (!h->connecting) {
- /* It is possible that more than one thread was woken up, thus
- * the remaining thread will see h->connecting as zero because
- * it has been processed by other thread.
- */
- pj_mutex_unlock(h->mutex);
- return;
- }
-
- /* Clear operation. */
- h->connecting = 0;
-
- pj_mutex_unlock(h->mutex);
-
- ioqueue_remove_from_set(ioqueue, h->fd, WRITEABLE_EVENT);
- ioqueue_remove_from_set(ioqueue, h->fd, EXCEPTION_EVENT);
-
- /* Call callback. */
- if (h->cb.on_connect_complete)
- (*h->cb.on_connect_complete)(h, -1);
-}
-
-/*
- * pj_ioqueue_recv()
- *
- * Start asynchronous recv() from the socket.
- */
-PJ_DEF(pj_status_t) pj_ioqueue_recv( pj_ioqueue_key_t *key,
- pj_ioqueue_op_key_t *op_key,
- void *buffer,
- pj_ssize_t *length,
- unsigned flags )
-{
- pj_status_t status;
- pj_ssize_t size;
- struct read_operation *read_op;
-
- PJ_ASSERT_RETURN(key && op_key && buffer && length, PJ_EINVAL);
- PJ_CHECK_STACK();
-
- /* Try to see if there's data immediately available.
- */
- size = *length;
- status = pj_sock_recv(key->fd, buffer, &size, flags);
- if (status == PJ_SUCCESS) {
- /* Yes! Data is available! */
- *length = size;
- return PJ_SUCCESS;
- } else {
- /* If error is not EWOULDBLOCK (or EAGAIN on Linux), report
- * the error to caller.
- */
- if (status != PJ_STATUS_FROM_OS(PJ_BLOCKING_ERROR_VAL))
- return status;
- }
-
- /*
- * No data is immediately available.
- * Must schedule asynchronous operation to the ioqueue.
- */
- read_op = (struct read_operation*)op_key;
-
- read_op->op = PJ_IOQUEUE_OP_RECV;
- read_op->buf = buffer;
- read_op->size = *length;
- read_op->flags = flags;
-
- pj_mutex_lock(key->mutex);
- pj_list_insert_before(&key->read_list, read_op);
- ioqueue_add_to_set(key->ioqueue, key->fd, READABLE_EVENT);
- pj_mutex_unlock(key->mutex);
-
- return PJ_EPENDING;
-}
-
-/*
- * pj_ioqueue_recvfrom()
- *
- * Start asynchronous recvfrom() from the socket.
- */
-PJ_DEF(pj_status_t) pj_ioqueue_recvfrom( pj_ioqueue_key_t *key,
- pj_ioqueue_op_key_t *op_key,
- void *buffer,
- pj_ssize_t *length,
- unsigned flags,
- pj_sockaddr_t *addr,
- int *addrlen)
-{
- pj_status_t status;
- pj_ssize_t size;
- struct read_operation *read_op;
-
- PJ_ASSERT_RETURN(key && op_key && buffer && length, PJ_EINVAL);
- PJ_CHECK_STACK();
-
- /* Try to see if there's data immediately available.
- */
- size = *length;
- status = pj_sock_recvfrom(key->fd, buffer, &size, flags,
- addr, addrlen);
- if (status == PJ_SUCCESS) {
- /* Yes! Data is available! */
- *length = size;
- return PJ_SUCCESS;
- } else {
- /* If error is not EWOULDBLOCK (or EAGAIN on Linux), report
- * the error to caller.
- */
- if (status != PJ_STATUS_FROM_OS(PJ_BLOCKING_ERROR_VAL))
- return status;
- }
-
- /*
- * No data is immediately available.
- * Must schedule asynchronous operation to the ioqueue.
- */
- read_op = (struct read_operation*)op_key;
-
- read_op->op = PJ_IOQUEUE_OP_RECV_FROM;
- read_op->buf = buffer;
- read_op->size = *length;
- read_op->flags = flags;
- read_op->rmt_addr = addr;
- read_op->rmt_addrlen = addrlen;
-
- pj_mutex_lock(key->mutex);
- pj_list_insert_before(&key->read_list, read_op);
- ioqueue_add_to_set(key->ioqueue, key->fd, READABLE_EVENT);
- pj_mutex_unlock(key->mutex);
-
- return PJ_EPENDING;
-}
-
-/*
- * pj_ioqueue_send()
- *
- * Start asynchronous send() to the descriptor.
- */
-PJ_DEF(pj_status_t) pj_ioqueue_send( pj_ioqueue_key_t *key,
- pj_ioqueue_op_key_t *op_key,
- const void *data,
- pj_ssize_t *length,
- unsigned flags)
-{
- struct write_operation *write_op;
- pj_status_t status;
- pj_ssize_t sent;
-
- PJ_ASSERT_RETURN(key && op_key && data && length, PJ_EINVAL);
- PJ_CHECK_STACK();
-
- /* Fast track:
- * Try to send data immediately, only if there's no pending write!
- * Note:
- * We are speculating that the list is empty here without properly
- * acquiring ioqueue's mutex first. This is intentional, to maximize
- * performance via parallelism.
- *
- * This should be safe, because:
- * - by convention, we require caller to make sure that the
- * key is not unregistered while other threads are invoking
- * an operation on the same key.
- * - pj_list_empty() is safe to be invoked by multiple threads,
- * even when other threads are modifying the list.
- */
- if (pj_list_empty(&key->write_list)) {
- /*
- * See if data can be sent immediately.
- */
- sent = *length;
- status = pj_sock_send(key->fd, data, &sent, flags);
- if (status == PJ_SUCCESS) {
- /* Success! */
- *length = sent;
- return PJ_SUCCESS;
- } else {
- /* If error is not EWOULDBLOCK (or EAGAIN on Linux), report
- * the error to caller.
- */
- if (status != PJ_STATUS_FROM_OS(PJ_BLOCKING_ERROR_VAL)) {
- return status;
- }
- }
- }
-
- /*
- * Schedule asynchronous send.
- */
- write_op = (struct write_operation*)op_key;
- write_op->op = PJ_IOQUEUE_OP_SEND;
- write_op->buf = NULL;
- write_op->size = *length;
- write_op->written = 0;
- write_op->flags = flags;
-
- pj_mutex_lock(key->mutex);
- pj_list_insert_before(&key->write_list, write_op);
- ioqueue_add_to_set(key->ioqueue, key->fd, WRITEABLE_EVENT);
- pj_mutex_unlock(key->mutex);
-
- return PJ_EPENDING;
-}
-
-
-/*
- * pj_ioqueue_sendto()
- *
- * Start asynchronous write() to the descriptor.
- */
-PJ_DEF(pj_status_t) pj_ioqueue_sendto( pj_ioqueue_key_t *key,
- pj_ioqueue_op_key_t *op_key,
- const void *data,
- pj_ssize_t *length,
- unsigned flags,
- const pj_sockaddr_t *addr,
- int addrlen)
-{
- struct write_operation *write_op;
- pj_status_t status;
- pj_ssize_t sent;
-
- PJ_ASSERT_RETURN(key && op_key && data && length, PJ_EINVAL);
- PJ_CHECK_STACK();
-
- /* Fast track:
- * Try to send data immediately, only if there's no pending write!
- * Note:
- * We are speculating that the list is empty here without properly
- * acquiring ioqueue's mutex first. This is intentional, to maximize
- * performance via parallelism.
- *
- * This should be safe, because:
- * - by convention, we require caller to make sure that the
- * key is not unregistered while other threads are invoking
- * an operation on the same key.
- * - pj_list_empty() is safe to be invoked by multiple threads,
- * even when other threads are modifying the list.
- */
- if (pj_list_empty(&key->write_list)) {
- /*
- * See if data can be sent immediately.
- */
- sent = *length;
- status = pj_sock_sendto(key->fd, data, &sent, flags, addr, addrlen);
- if (status == PJ_SUCCESS) {
- /* Success! */
- *length = sent;
- return PJ_SUCCESS;
- } else {
- /* If error is not EWOULDBLOCK (or EAGAIN on Linux), report
- * the error to caller.
- */
- if (status != PJ_STATUS_FROM_OS(PJ_BLOCKING_ERROR_VAL)) {
- return status;
- }
- }
- }
-
- /*
- * Check that address storage can hold the address parameter.
- */
- PJ_ASSERT_RETURN(addrlen <= sizeof(pj_sockaddr_in), PJ_EBUG);
-
- /*
- * Schedule asynchronous send.
- */
- write_op = (struct write_operation*)op_key;
- write_op->op = PJ_IOQUEUE_OP_SEND_TO;
- write_op->buf = NULL;
- write_op->size = *length;
- write_op->written = 0;
- write_op->flags = flags;
- pj_memcpy(&write_op->rmt_addr, addr, addrlen);
- write_op->rmt_addrlen = addrlen;
-
- pj_mutex_lock(key->mutex);
- pj_list_insert_before(&key->write_list, write_op);
- ioqueue_add_to_set(key->ioqueue, key->fd, WRITEABLE_EVENT);
- pj_mutex_unlock(key->mutex);
-
- return PJ_EPENDING;
-}
-
-#if PJ_HAS_TCP
-/*
- * Initiate overlapped accept() operation.
- */
-PJ_DEF(pj_status_t) pj_ioqueue_accept( pj_ioqueue_key_t *key,
- pj_ioqueue_op_key_t *op_key,
- pj_sock_t *new_sock,
- pj_sockaddr_t *local,
- pj_sockaddr_t *remote,
- int *addrlen)
-{
- struct accept_operation *accept_op;
- pj_status_t status;
-
- /* check parameters. All must be specified! */
- PJ_ASSERT_RETURN(key && op_key && new_sock, PJ_EINVAL);
-
- /* Fast track:
- * See if there's new connection available immediately.
- */
- if (pj_list_empty(&key->accept_list)) {
- status = pj_sock_accept(key->fd, new_sock, remote, addrlen);
- if (status == PJ_SUCCESS) {
- /* Yes! New connection is available! */
- if (local && addrlen) {
- status = pj_sock_getsockname(*new_sock, local, addrlen);
- if (status != PJ_SUCCESS) {
- pj_sock_close(*new_sock);
- *new_sock = PJ_INVALID_SOCKET;
- return status;
- }
- }
- return PJ_SUCCESS;
- } else {
- /* If error is not EWOULDBLOCK (or EAGAIN on Linux), report
- * the error to caller.
- */
- if (status != PJ_STATUS_FROM_OS(PJ_BLOCKING_ERROR_VAL)) {
- return status;
- }
- }
- }
-
- /*
- * No connection is available immediately.
- * Schedule accept() operation to be completed when there is incoming
- * connection available.
- */
- accept_op = (struct accept_operation*)op_key;
-
- accept_op->op = PJ_IOQUEUE_OP_ACCEPT;
- accept_op->accept_fd = new_sock;
- accept_op->rmt_addr = remote;
- accept_op->addrlen= addrlen;
- accept_op->local_addr = local;
-
- pj_mutex_lock(key->mutex);
- pj_list_insert_before(&key->accept_list, accept_op);
- ioqueue_add_to_set(key->ioqueue, key->fd, READABLE_EVENT);
- pj_mutex_unlock(key->mutex);
-
- return PJ_EPENDING;
-}
-
-/*
- * Initiate overlapped connect() operation (well, it's non-blocking actually,
- * since there's no overlapped version of connect()).
- */
-PJ_DEF(pj_status_t) pj_ioqueue_connect( pj_ioqueue_key_t *key,
- const pj_sockaddr_t *addr,
- int addrlen )
-{
- pj_status_t status;
-
- /* check parameters. All must be specified! */
- PJ_ASSERT_RETURN(key && addr && addrlen, PJ_EINVAL);
-
- /* Check if socket has not been marked for connecting */
- if (key->connecting != 0)
- return PJ_EPENDING;
-
- status = pj_sock_connect(key->fd, addr, addrlen);
- if (status == PJ_SUCCESS) {
- /* Connected! */
- return PJ_SUCCESS;
- } else {
- if (status == PJ_STATUS_FROM_OS(PJ_BLOCKING_CONNECT_ERROR_VAL)) {
- /* Pending! */
- pj_mutex_lock(key->mutex);
- key->connecting = PJ_TRUE;
- ioqueue_add_to_set(key->ioqueue, key->fd, WRITEABLE_EVENT);
- ioqueue_add_to_set(key->ioqueue, key->fd, EXCEPTION_EVENT);
- pj_mutex_unlock(key->mutex);
- return PJ_EPENDING;
- } else {
- /* Error! */
- return status;
- }
- }
-}
-#endif /* PJ_HAS_TCP */
-
+/* $Id$ */
+
+/*
+ * ioqueue_common_abs.c
+ *
+ * This contains common functionalities to emulate proactor pattern with
+ * various event dispatching mechanisms (e.g. select, epoll).
+ *
+ * This file will be included by the appropriate ioqueue implementation.
+ * This file is NOT supposed to be compiled as stand-alone source.
+ */
+
+static void ioqueue_init( pj_ioqueue_t *ioqueue )
+{
+ ioqueue->lock = NULL;
+ ioqueue->auto_delete_lock = 0;
+}
+
+static pj_status_t ioqueue_destroy(pj_ioqueue_t *ioqueue)
+{
+ if (ioqueue->auto_delete_lock && ioqueue->lock )
+ return pj_lock_destroy(ioqueue->lock);
+ else
+ return PJ_SUCCESS;
+}
+
+/*
+ * pj_ioqueue_set_lock()
+ */
+PJ_DEF(pj_status_t) pj_ioqueue_set_lock( pj_ioqueue_t *ioqueue,
+ pj_lock_t *lock,
+ pj_bool_t auto_delete )
+{
+ PJ_ASSERT_RETURN(ioqueue && lock, PJ_EINVAL);
+
+ if (ioqueue->auto_delete_lock && ioqueue->lock) {
+ pj_lock_destroy(ioqueue->lock);
+ }
+
+ ioqueue->lock = lock;
+ ioqueue->auto_delete_lock = auto_delete;
+
+ return PJ_SUCCESS;
+}
+
+static pj_status_t ioqueue_init_key( pj_pool_t *pool,
+ pj_ioqueue_t *ioqueue,
+ pj_ioqueue_key_t *key,
+ pj_sock_t sock,
+ void *user_data,
+ const pj_ioqueue_callback *cb)
+{
+ pj_status_t rc;
+ int optlen;
+
+ key->ioqueue = ioqueue;
+ key->fd = sock;
+ key->user_data = user_data;
+ pj_list_init(&key->read_list);
+ pj_list_init(&key->write_list);
+#if PJ_HAS_TCP
+ pj_list_init(&key->accept_list);
+#endif
+
+ /* Save callback. */
+ pj_memcpy(&key->cb, cb, sizeof(pj_ioqueue_callback));
+
+ /* Get socket type. When socket type is datagram, some optimization
+ * will be performed during send to allow parallel send operations.
+ */
+ optlen = sizeof(key->fd_type);
+ rc = pj_sock_getsockopt(sock, PJ_SOL_SOCKET, PJ_SO_TYPE,
+ &key->fd_type, &optlen);
+ if (rc != PJ_SUCCESS)
+ key->fd_type = PJ_SOCK_STREAM;
+
+ /* Create mutex for the key. */
+ rc = pj_mutex_create_simple(pool, NULL, &key->mutex);
+
+ return rc;
+}
+
+static void ioqueue_destroy_key( pj_ioqueue_key_t *key )
+{
+ pj_mutex_destroy(key->mutex);
+}
+
+/*
+ * pj_ioqueue_get_user_data()
+ *
+ * Obtain value associated with a key.
+ */
+PJ_DEF(void*) pj_ioqueue_get_user_data( pj_ioqueue_key_t *key )
+{
+ PJ_ASSERT_RETURN(key != NULL, NULL);
+ return key->user_data;
+}
+
+/*
+ * pj_ioqueue_set_user_data()
+ */
+PJ_DEF(pj_status_t) pj_ioqueue_set_user_data( pj_ioqueue_key_t *key,
+ void *user_data,
+ void **old_data)
+{
+ PJ_ASSERT_RETURN(key, PJ_EINVAL);
+
+ if (old_data)
+ *old_data = key->user_data;
+ key->user_data = user_data;
+
+ return PJ_SUCCESS;
+}
+
+PJ_INLINE(int) key_has_pending_write(pj_ioqueue_key_t *key)
+{
+ return !pj_list_empty(&key->write_list);
+}
+
+PJ_INLINE(int) key_has_pending_read(pj_ioqueue_key_t *key)
+{
+ return !pj_list_empty(&key->read_list);
+}
+
+PJ_INLINE(int) key_has_pending_accept(pj_ioqueue_key_t *key)
+{
+#if PJ_HAS_TCP
+ return !pj_list_empty(&key->accept_list);
+#else
+ return 0;
+#endif
+}
+
+PJ_INLINE(int) key_has_pending_connect(pj_ioqueue_key_t *key)
+{
+ return key->connecting;
+}
+
+
+/*
+ * ioqueue_dispatch_event()
+ *
+ * Report occurence of an event in the key to be processed by the
+ * framework.
+ */
+void ioqueue_dispatch_write_event(pj_ioqueue_t *ioqueue, pj_ioqueue_key_t *h)
+{
+ /* Lock the key. */
+ pj_mutex_lock(h->mutex);
+
+#if defined(PJ_HAS_TCP) && PJ_HAS_TCP!=0
+ if (h->connecting) {
+ /* Completion of connect() operation */
+ pj_ssize_t bytes_transfered;
+
+ /* Clear operation. */
+ h->connecting = 0;
+
+ ioqueue_remove_from_set(ioqueue, h->fd, WRITEABLE_EVENT);
+ ioqueue_remove_from_set(ioqueue, h->fd, EXCEPTION_EVENT);
+
+ /* Unlock; from this point we don't need to hold key's mutex. */
+ pj_mutex_unlock(h->mutex);
+
+#if (defined(PJ_HAS_SO_ERROR) && PJ_HAS_SO_ERROR!=0)
+ /* from connect(2):
+ * On Linux, use getsockopt to read the SO_ERROR option at
+ * level SOL_SOCKET to determine whether connect() completed
+ * successfully (if SO_ERROR is zero).
+ */
+ int value;
+ socklen_t vallen = sizeof(value);
+ int gs_rc = getsockopt(h->fd, SOL_SOCKET, SO_ERROR,
+ &value, &vallen);
+ if (gs_rc != 0) {
+ /* Argh!! What to do now???
+ * Just indicate that the socket is connected. The
+ * application will get error as soon as it tries to use
+ * the socket to send/receive.
+ */
+ bytes_transfered = 0;
+ } else {
+ bytes_transfered = value;
+ }
+#elif defined(PJ_WIN32) && PJ_WIN32!=0
+ bytes_transfered = 0; /* success */
+#else
+ /* Excellent information in D.J. Bernstein page:
+ * http://cr.yp.to/docs/connect.html
+ *
+ * Seems like the most portable way of detecting connect()
+ * failure is to call getpeername(). If socket is connected,
+ * getpeername() will return 0. If the socket is not connected,
+ * it will return ENOTCONN, and read(fd, &ch, 1) will produce
+ * the right errno through error slippage. This is a combination
+ * of suggestions from Douglas C. Schmidt and Ken Keys.
+ */
+ int gp_rc;
+ struct sockaddr_in addr;
+ socklen_t addrlen = sizeof(addr);
+
+ gp_rc = getpeername(h->fd, (struct sockaddr*)&addr, &addrlen);
+ bytes_transfered = gp_rc;
+#endif
+
+ /* Call callback. */
+ if (h->cb.on_connect_complete)
+ (*h->cb.on_connect_complete)(h, bytes_transfered);
+
+ /* Done. */
+
+ } else
+#endif /* PJ_HAS_TCP */
+ if (key_has_pending_write(h)) {
+ /* Socket is writable. */
+ struct write_operation *write_op;
+ pj_ssize_t sent;
+ pj_status_t send_rc;
+
+ /* Get the first in the queue. */
+ write_op = h->write_list.next;
+
+ /* For datagrams, we can remove the write_op from the list
+ * so that send() can work in parallel.
+ */
+ if (h->fd_type == PJ_SOCK_DGRAM) {
+ pj_list_erase(write_op);
+ if (pj_list_empty(&h->write_list))
+ ioqueue_remove_from_set(ioqueue, h->fd, WRITEABLE_EVENT);
+
+ pj_mutex_unlock(h->mutex);
+ }
+
+ /* Send the data.
+ * Unfortunately we must do this while holding key's mutex, thus
+ * preventing parallel write on a single key.. :-((
+ */
+ sent = write_op->size - write_op->written;
+ if (write_op->op == PJ_IOQUEUE_OP_SEND) {
+ send_rc = pj_sock_send(h->fd, write_op->buf+write_op->written,
+ &sent, write_op->flags);
+ } else if (write_op->op == PJ_IOQUEUE_OP_SEND_TO) {
+ send_rc = pj_sock_sendto(h->fd,
+ write_op->buf+write_op->written,
+ &sent, write_op->flags,
+ &write_op->rmt_addr,
+ write_op->rmt_addrlen);
+ } else {
+ pj_assert(!"Invalid operation type!");
+ send_rc = PJ_EBUG;
+ }
+
+ if (send_rc == PJ_SUCCESS) {
+ write_op->written += sent;
+ } else {
+ pj_assert(send_rc > 0);
+ write_op->written = -send_rc;
+ }
+
+ /* Are we finished with this buffer? */
+ if (send_rc!=PJ_SUCCESS ||
+ write_op->written == (pj_ssize_t)write_op->size ||
+ h->fd_type == PJ_SOCK_DGRAM)
+ {
+ if (h->fd_type != PJ_SOCK_DGRAM) {
+ /* Write completion of the whole stream. */
+ pj_list_erase(write_op);
+
+ /* Clear operation if there's no more data to send. */
+ if (pj_list_empty(&h->write_list))
+ ioqueue_remove_from_set(ioqueue, h->fd, WRITEABLE_EVENT);
+
+ /* No need to hold mutex anymore */
+ pj_mutex_unlock(h->mutex);
+ }
+
+ /* Call callback. */
+ if (h->cb.on_write_complete) {
+ (*h->cb.on_write_complete)(h,
+ (pj_ioqueue_op_key_t*)write_op,
+ write_op->written);
+ }
+
+ } else {
+ pj_mutex_unlock(h->mutex);
+ }
+
+ /* Done. */
+ } else {
+ pj_assert(!"Descriptor is signaled but key "
+ "has no pending operation!");
+
+ pj_mutex_unlock(h->mutex);
+ }
+}
+
+void ioqueue_dispatch_read_event( pj_ioqueue_t *ioqueue, pj_ioqueue_key_t *h )
+{
+ pj_status_t rc;
+
+ /* Lock the key. */
+ pj_mutex_lock(h->mutex);
+
+# if PJ_HAS_TCP
+ if (!pj_list_empty(&h->accept_list)) {
+
+ struct accept_operation *accept_op;
+
+ /* Get one accept operation from the list. */
+ accept_op = h->accept_list.next;
+ pj_list_erase(accept_op);
+
+ /* Clear bit in fdset if there is no more pending accept */
+ if (pj_list_empty(&h->accept_list))
+ ioqueue_remove_from_set(ioqueue, h->fd, READABLE_EVENT);
+
+ /* Unlock; from this point we don't need to hold key's mutex. */
+ pj_mutex_unlock(h->mutex);
+
+ rc=pj_sock_accept(h->fd, accept_op->accept_fd,
+ accept_op->rmt_addr, accept_op->addrlen);
+ if (rc==PJ_SUCCESS && accept_op->local_addr) {
+ rc = pj_sock_getsockname(*accept_op->accept_fd,
+ accept_op->local_addr,
+ accept_op->addrlen);
+ }
+
+ /* Call callback. */
+ if (h->cb.on_accept_complete) {
+ (*h->cb.on_accept_complete)(h,
+ (pj_ioqueue_op_key_t*)accept_op,
+ *accept_op->accept_fd, rc);
+ }
+
+ }
+ else
+# endif
+ if (key_has_pending_read(h)) {
+ struct read_operation *read_op;
+ pj_ssize_t bytes_read;
+
+ /* Get one pending read operation from the list. */
+ read_op = h->read_list.next;
+ pj_list_erase(read_op);
+
+ /* Clear fdset if there is no pending read. */
+ if (pj_list_empty(&h->read_list))
+ ioqueue_remove_from_set(ioqueue, h->fd, READABLE_EVENT);
+
+ /* Unlock; from this point we don't need to hold key's mutex. */
+ pj_mutex_unlock(h->mutex);
+
+ bytes_read = read_op->size;
+
+ if ((read_op->op == PJ_IOQUEUE_OP_RECV_FROM)) {
+ rc = pj_sock_recvfrom(h->fd, read_op->buf, &bytes_read, 0,
+ read_op->rmt_addr,
+ read_op->rmt_addrlen);
+ } else if ((read_op->op == PJ_IOQUEUE_OP_RECV)) {
+ rc = pj_sock_recv(h->fd, read_op->buf, &bytes_read, 0);
+ } else {
+ pj_assert(read_op->op == PJ_IOQUEUE_OP_READ);
+ /*
+ * User has specified pj_ioqueue_read().
+ * On Win32, we should do ReadFile(). But because we got
+ * here because of select() anyway, user must have put a
+ * socket descriptor on h->fd, which in this case we can
+ * just call pj_sock_recv() instead of ReadFile().
+ * On Unix, user may put a file in h->fd, so we'll have
+ * to call read() here.
+ * This may not compile on systems which doesn't have
+ * read(). That's why we only specify PJ_LINUX here so
+ * that error is easier to catch.
+ */
+# if defined(PJ_WIN32) && PJ_WIN32 != 0
+ rc = pj_sock_recv(h->fd, read_op->buf, &bytes_read, 0);
+ //rc = ReadFile((HANDLE)h->fd, read_op->buf, read_op->size,
+ // &bytes_read, NULL);
+# elif (defined(PJ_HAS_UNISTD_H) && PJ_HAS_UNISTD_H != 0)
+ bytes_read = read(h->fd, read_op->buf, bytes_read);
+ rc = (bytes_read >= 0) ? PJ_SUCCESS : pj_get_os_error();
+# elif defined(PJ_LINUX_KERNEL) && PJ_LINUX_KERNEL != 0
+ bytes_read = sys_read(h->fd, read_op->buf, bytes_read);
+ rc = (bytes_read >= 0) ? PJ_SUCCESS : -bytes_read;
+# else
+# error "Implement read() for this platform!"
+# endif
+ }
+
+ if (rc != PJ_SUCCESS) {
+# if defined(PJ_WIN32) && PJ_WIN32 != 0
+ /* On Win32, for UDP, WSAECONNRESET on the receive side
+ * indicates that previous sending has triggered ICMP Port
+ * Unreachable message.
+ * But we wouldn't know at this point which one of previous
+ * key that has triggered the error, since UDP socket can
+ * be shared!
+ * So we'll just ignore it!
+ */
+
+ if (rc == PJ_STATUS_FROM_OS(WSAECONNRESET)) {
+ //PJ_LOG(4,(THIS_FILE,
+ // "Ignored ICMP port unreach. on key=%p", h));
+ }
+# endif
+
+ /* In any case we would report this to caller. */
+ bytes_read = -rc;
+ }
+
+ /* Call callback. */
+ if (h->cb.on_read_complete) {
+ (*h->cb.on_read_complete)(h,
+ (pj_ioqueue_op_key_t*)read_op,
+ bytes_read);
+ }
+
+ } else {
+ pj_mutex_unlock(h->mutex);
+ }
+}
+
+
+void ioqueue_dispatch_exception_event( pj_ioqueue_t *ioqueue,
+ pj_ioqueue_key_t *h )
+{
+ pj_mutex_lock(h->mutex);
+
+ if (!h->connecting) {
+ /* It is possible that more than one thread was woken up, thus
+ * the remaining thread will see h->connecting as zero because
+ * it has been processed by other thread.
+ */
+ pj_mutex_unlock(h->mutex);
+ return;
+ }
+
+ /* Clear operation. */
+ h->connecting = 0;
+
+ pj_mutex_unlock(h->mutex);
+
+ ioqueue_remove_from_set(ioqueue, h->fd, WRITEABLE_EVENT);
+ ioqueue_remove_from_set(ioqueue, h->fd, EXCEPTION_EVENT);
+
+ /* Call callback. */
+ if (h->cb.on_connect_complete)
+ (*h->cb.on_connect_complete)(h, -1);
+}
+
+/*
+ * pj_ioqueue_recv()
+ *
+ * Start asynchronous recv() from the socket.
+ */
+PJ_DEF(pj_status_t) pj_ioqueue_recv( pj_ioqueue_key_t *key,
+ pj_ioqueue_op_key_t *op_key,
+ void *buffer,
+ pj_ssize_t *length,
+ unsigned flags )
+{
+ pj_status_t status;
+ pj_ssize_t size;
+ struct read_operation *read_op;
+
+ PJ_ASSERT_RETURN(key && op_key && buffer && length, PJ_EINVAL);
+ PJ_CHECK_STACK();
+
+ /* Try to see if there's data immediately available.
+ */
+ size = *length;
+ status = pj_sock_recv(key->fd, buffer, &size, flags);
+ if (status == PJ_SUCCESS) {
+ /* Yes! Data is available! */
+ *length = size;
+ return PJ_SUCCESS;
+ } else {
+ /* If error is not EWOULDBLOCK (or EAGAIN on Linux), report
+ * the error to caller.
+ */
+ if (status != PJ_STATUS_FROM_OS(PJ_BLOCKING_ERROR_VAL))
+ return status;
+ }
+
+ /*
+ * No data is immediately available.
+ * Must schedule asynchronous operation to the ioqueue.
+ */
+ read_op = (struct read_operation*)op_key;
+
+ read_op->op = PJ_IOQUEUE_OP_RECV;
+ read_op->buf = buffer;
+ read_op->size = *length;
+ read_op->flags = flags;
+
+ pj_mutex_lock(key->mutex);
+ pj_list_insert_before(&key->read_list, read_op);
+ ioqueue_add_to_set(key->ioqueue, key->fd, READABLE_EVENT);
+ pj_mutex_unlock(key->mutex);
+
+ return PJ_EPENDING;
+}
+
+/*
+ * pj_ioqueue_recvfrom()
+ *
+ * Start asynchronous recvfrom() from the socket.
+ */
+PJ_DEF(pj_status_t) pj_ioqueue_recvfrom( pj_ioqueue_key_t *key,
+ pj_ioqueue_op_key_t *op_key,
+ void *buffer,
+ pj_ssize_t *length,
+ unsigned flags,
+ pj_sockaddr_t *addr,
+ int *addrlen)
+{
+ pj_status_t status;
+ pj_ssize_t size;
+ struct read_operation *read_op;
+
+ PJ_ASSERT_RETURN(key && op_key && buffer && length, PJ_EINVAL);
+ PJ_CHECK_STACK();
+
+ /* Try to see if there's data immediately available.
+ */
+ size = *length;
+ status = pj_sock_recvfrom(key->fd, buffer, &size, flags,
+ addr, addrlen);
+ if (status == PJ_SUCCESS) {
+ /* Yes! Data is available! */
+ *length = size;
+ return PJ_SUCCESS;
+ } else {
+ /* If error is not EWOULDBLOCK (or EAGAIN on Linux), report
+ * the error to caller.
+ */
+ if (status != PJ_STATUS_FROM_OS(PJ_BLOCKING_ERROR_VAL))
+ return status;
+ }
+
+ /*
+ * No data is immediately available.
+ * Must schedule asynchronous operation to the ioqueue.
+ */
+ read_op = (struct read_operation*)op_key;
+
+ read_op->op = PJ_IOQUEUE_OP_RECV_FROM;
+ read_op->buf = buffer;
+ read_op->size = *length;
+ read_op->flags = flags;
+ read_op->rmt_addr = addr;
+ read_op->rmt_addrlen = addrlen;
+
+ pj_mutex_lock(key->mutex);
+ pj_list_insert_before(&key->read_list, read_op);
+ ioqueue_add_to_set(key->ioqueue, key->fd, READABLE_EVENT);
+ pj_mutex_unlock(key->mutex);
+
+ return PJ_EPENDING;
+}
+
+/*
+ * pj_ioqueue_send()
+ *
+ * Start asynchronous send() to the descriptor.
+ */
+PJ_DEF(pj_status_t) pj_ioqueue_send( pj_ioqueue_key_t *key,
+ pj_ioqueue_op_key_t *op_key,
+ const void *data,
+ pj_ssize_t *length,
+ unsigned flags)
+{
+ struct write_operation *write_op;
+ pj_status_t status;
+ pj_ssize_t sent;
+
+ PJ_ASSERT_RETURN(key && op_key && data && length, PJ_EINVAL);
+ PJ_CHECK_STACK();
+
+ /* Fast track:
+ * Try to send data immediately, only if there's no pending write!
+ * Note:
+ * We are speculating that the list is empty here without properly
+ * acquiring ioqueue's mutex first. This is intentional, to maximize
+ * performance via parallelism.
+ *
+ * This should be safe, because:
+ * - by convention, we require caller to make sure that the
+ * key is not unregistered while other threads are invoking
+ * an operation on the same key.
+ * - pj_list_empty() is safe to be invoked by multiple threads,
+ * even when other threads are modifying the list.
+ */
+ if (pj_list_empty(&key->write_list)) {
+ /*
+ * See if data can be sent immediately.
+ */
+ sent = *length;
+ status = pj_sock_send(key->fd, data, &sent, flags);
+ if (status == PJ_SUCCESS) {
+ /* Success! */
+ *length = sent;
+ return PJ_SUCCESS;
+ } else {
+ /* If error is not EWOULDBLOCK (or EAGAIN on Linux), report
+ * the error to caller.
+ */
+ if (status != PJ_STATUS_FROM_OS(PJ_BLOCKING_ERROR_VAL)) {
+ return status;
+ }
+ }
+ }
+
+ /*
+ * Schedule asynchronous send.
+ */
+ write_op = (struct write_operation*)op_key;
+ write_op->op = PJ_IOQUEUE_OP_SEND;
+ write_op->buf = NULL;
+ write_op->size = *length;
+ write_op->written = 0;
+ write_op->flags = flags;
+
+ pj_mutex_lock(key->mutex);
+ pj_list_insert_before(&key->write_list, write_op);
+ ioqueue_add_to_set(key->ioqueue, key->fd, WRITEABLE_EVENT);
+ pj_mutex_unlock(key->mutex);
+
+ return PJ_EPENDING;
+}
+
+
+/*
+ * pj_ioqueue_sendto()
+ *
+ * Start asynchronous write() to the descriptor.
+ */
+PJ_DEF(pj_status_t) pj_ioqueue_sendto( pj_ioqueue_key_t *key,
+ pj_ioqueue_op_key_t *op_key,
+ const void *data,
+ pj_ssize_t *length,
+ unsigned flags,
+ const pj_sockaddr_t *addr,
+ int addrlen)
+{
+ struct write_operation *write_op;
+ pj_status_t status;
+ pj_ssize_t sent;
+
+ PJ_ASSERT_RETURN(key && op_key && data && length, PJ_EINVAL);
+ PJ_CHECK_STACK();
+
+ /* Fast track:
+ * Try to send data immediately, only if there's no pending write!
+ * Note:
+ * We are speculating that the list is empty here without properly
+ * acquiring ioqueue's mutex first. This is intentional, to maximize
+ * performance via parallelism.
+ *
+ * This should be safe, because:
+ * - by convention, we require caller to make sure that the
+ * key is not unregistered while other threads are invoking
+ * an operation on the same key.
+ * - pj_list_empty() is safe to be invoked by multiple threads,
+ * even when other threads are modifying the list.
+ */
+ if (pj_list_empty(&key->write_list)) {
+ /*
+ * See if data can be sent immediately.
+ */
+ sent = *length;
+ status = pj_sock_sendto(key->fd, data, &sent, flags, addr, addrlen);
+ if (status == PJ_SUCCESS) {
+ /* Success! */
+ *length = sent;
+ return PJ_SUCCESS;
+ } else {
+ /* If error is not EWOULDBLOCK (or EAGAIN on Linux), report
+ * the error to caller.
+ */
+ if (status != PJ_STATUS_FROM_OS(PJ_BLOCKING_ERROR_VAL)) {
+ return status;
+ }
+ }
+ }
+
+ /*
+ * Check that address storage can hold the address parameter.
+ */
+ PJ_ASSERT_RETURN(addrlen <= sizeof(pj_sockaddr_in), PJ_EBUG);
+
+ /*
+ * Schedule asynchronous send.
+ */
+ write_op = (struct write_operation*)op_key;
+ write_op->op = PJ_IOQUEUE_OP_SEND_TO;
+ write_op->buf = NULL;
+ write_op->size = *length;
+ write_op->written = 0;
+ write_op->flags = flags;
+ pj_memcpy(&write_op->rmt_addr, addr, addrlen);
+ write_op->rmt_addrlen = addrlen;
+
+ pj_mutex_lock(key->mutex);
+ pj_list_insert_before(&key->write_list, write_op);
+ ioqueue_add_to_set(key->ioqueue, key->fd, WRITEABLE_EVENT);
+ pj_mutex_unlock(key->mutex);
+
+ return PJ_EPENDING;
+}
+
+#if PJ_HAS_TCP
+/*
+ * Initiate overlapped accept() operation.
+ */
+PJ_DEF(pj_status_t) pj_ioqueue_accept( pj_ioqueue_key_t *key,
+ pj_ioqueue_op_key_t *op_key,
+ pj_sock_t *new_sock,
+ pj_sockaddr_t *local,
+ pj_sockaddr_t *remote,
+ int *addrlen)
+{
+ struct accept_operation *accept_op;
+ pj_status_t status;
+
+ /* check parameters. All must be specified! */
+ PJ_ASSERT_RETURN(key && op_key && new_sock, PJ_EINVAL);
+
+ /* Fast track:
+ * See if there's new connection available immediately.
+ */
+ if (pj_list_empty(&key->accept_list)) {
+ status = pj_sock_accept(key->fd, new_sock, remote, addrlen);
+ if (status == PJ_SUCCESS) {
+ /* Yes! New connection is available! */
+ if (local && addrlen) {
+ status = pj_sock_getsockname(*new_sock, local, addrlen);
+ if (status != PJ_SUCCESS) {
+ pj_sock_close(*new_sock);
+ *new_sock = PJ_INVALID_SOCKET;
+ return status;
+ }
+ }
+ return PJ_SUCCESS;
+ } else {
+ /* If error is not EWOULDBLOCK (or EAGAIN on Linux), report
+ * the error to caller.
+ */
+ if (status != PJ_STATUS_FROM_OS(PJ_BLOCKING_ERROR_VAL)) {
+ return status;
+ }
+ }
+ }
+
+ /*
+ * No connection is available immediately.
+ * Schedule accept() operation to be completed when there is incoming
+ * connection available.
+ */
+ accept_op = (struct accept_operation*)op_key;
+
+ accept_op->op = PJ_IOQUEUE_OP_ACCEPT;
+ accept_op->accept_fd = new_sock;
+ accept_op->rmt_addr = remote;
+ accept_op->addrlen= addrlen;
+ accept_op->local_addr = local;
+
+ pj_mutex_lock(key->mutex);
+ pj_list_insert_before(&key->accept_list, accept_op);
+ ioqueue_add_to_set(key->ioqueue, key->fd, READABLE_EVENT);
+ pj_mutex_unlock(key->mutex);
+
+ return PJ_EPENDING;
+}
+
+/*
+ * Initiate overlapped connect() operation (well, it's non-blocking actually,
+ * since there's no overlapped version of connect()).
+ */
+PJ_DEF(pj_status_t) pj_ioqueue_connect( pj_ioqueue_key_t *key,
+ const pj_sockaddr_t *addr,
+ int addrlen )
+{
+ pj_status_t status;
+
+ /* check parameters. All must be specified! */
+ PJ_ASSERT_RETURN(key && addr && addrlen, PJ_EINVAL);
+
+ /* Check if socket has not been marked for connecting */
+ if (key->connecting != 0)
+ return PJ_EPENDING;
+
+ status = pj_sock_connect(key->fd, addr, addrlen);
+ if (status == PJ_SUCCESS) {
+ /* Connected! */
+ return PJ_SUCCESS;
+ } else {
+ if (status == PJ_STATUS_FROM_OS(PJ_BLOCKING_CONNECT_ERROR_VAL)) {
+ /* Pending! */
+ pj_mutex_lock(key->mutex);
+ key->connecting = PJ_TRUE;
+ ioqueue_add_to_set(key->ioqueue, key->fd, WRITEABLE_EVENT);
+ ioqueue_add_to_set(key->ioqueue, key->fd, EXCEPTION_EVENT);
+ pj_mutex_unlock(key->mutex);
+ return PJ_EPENDING;
+ } else {
+ /* Error! */
+ return status;
+ }
+ }
+}
+#endif /* PJ_HAS_TCP */
+
diff --git a/pjlib/src/pj/ioqueue_common_abs.h b/pjlib/src/pj/ioqueue_common_abs.h
index c6fc1ff6..1902ff46 100644
--- a/pjlib/src/pj/ioqueue_common_abs.h
+++ b/pjlib/src/pj/ioqueue_common_abs.h
@@ -1,107 +1,108 @@
-/* $Id */
-
-/* ioqueue_common_abs.h
- *
- * This file contains private declarations for abstracting various
- * event polling/dispatching mechanisms (e.g. select, poll, epoll)
- * to the ioqueue.
- */
-
-#include <pj/list.h>
-
-/*
- * The select ioqueue relies on socket functions (pj_sock_xxx()) to return
- * the correct error code.
- */
-#if PJ_RETURN_OS_ERROR(100) != PJ_STATUS_FROM_OS(100)
-# error "Error reporting must be enabled for this function to work!"
-#endif
-
-
-struct generic_operation
-{
- PJ_DECL_LIST_MEMBER(struct generic_operation);
- pj_ioqueue_operation_e op;
-};
-
-struct read_operation
-{
- PJ_DECL_LIST_MEMBER(struct read_operation);
- pj_ioqueue_operation_e op;
-
- void *buf;
- pj_size_t size;
- unsigned flags;
- pj_sockaddr_t *rmt_addr;
- int *rmt_addrlen;
-};
-
-struct write_operation
-{
- PJ_DECL_LIST_MEMBER(struct write_operation);
- pj_ioqueue_operation_e op;
-
- char *buf;
- pj_size_t size;
- pj_ssize_t written;
- unsigned flags;
- pj_sockaddr_in rmt_addr;
- int rmt_addrlen;
-};
-
-#if PJ_HAS_TCP
-struct accept_operation
-{
- PJ_DECL_LIST_MEMBER(struct accept_operation);
- pj_ioqueue_operation_e op;
-
- pj_sock_t *accept_fd;
- pj_sockaddr_t *local_addr;
- pj_sockaddr_t *rmt_addr;
- int *addrlen;
-};
-#endif
-
-union operation_key
-{
- struct generic_operation generic;
- struct read_operation read;
- struct write_operation write;
-#if PJ_HAS_TCP
- struct accept_operation accept;
-#endif
-};
-
-#define DECLARE_COMMON_KEY \
- PJ_DECL_LIST_MEMBER(struct pj_ioqueue_key_t); \
- pj_ioqueue_t *ioqueue; \
- pj_mutex_t *mutex; \
- pj_sock_t fd; \
- int fd_type; \
- void *user_data; \
- pj_ioqueue_callback cb; \
- int connecting; \
- struct read_operation read_list; \
- struct write_operation write_list; \
- struct accept_operation accept_list;
-
-
-#define DECLARE_COMMON_IOQUEUE \
- pj_lock_t *lock; \
- pj_bool_t auto_delete_lock;
-
-
-enum ioqueue_event_type
-{
- NO_EVENT,
- READABLE_EVENT,
- WRITEABLE_EVENT,
- EXCEPTION_EVENT,
-};
-
-static void ioqueue_add_to_set( pj_ioqueue_t *ioqueue,
- pj_sock_t fd,
- enum ioqueue_event_type event_type );
-static void ioqueue_remove_from_set( pj_ioqueue_t *ioqueue,
- pj_sock_t fd,
- enum ioqueue_event_type event_type);
+/* $Id */
+
+/* ioqueue_common_abs.h
+ *
+ * This file contains private declarations for abstracting various
+ * event polling/dispatching mechanisms (e.g. select, poll, epoll)
+ * to the ioqueue.
+ */
+
+#include <pj/list.h>
+
+/*
+ * The select ioqueue relies on socket functions (pj_sock_xxx()) to return
+ * the correct error code.
+ */
+#if PJ_RETURN_OS_ERROR(100) != PJ_STATUS_FROM_OS(100)
+# error "Proper error reporting must be enabled for ioqueue to work!"
+#endif
+
+
+struct generic_operation
+{
+ PJ_DECL_LIST_MEMBER(struct generic_operation);
+ pj_ioqueue_operation_e op;
+};
+
+struct read_operation
+{
+ PJ_DECL_LIST_MEMBER(struct read_operation);
+ pj_ioqueue_operation_e op;
+
+ void *buf;
+ pj_size_t size;
+ unsigned flags;
+ pj_sockaddr_t *rmt_addr;
+ int *rmt_addrlen;
+};
+
+struct write_operation
+{
+ PJ_DECL_LIST_MEMBER(struct write_operation);
+ pj_ioqueue_operation_e op;
+
+ char *buf;
+ pj_size_t size;
+ pj_ssize_t written;
+ unsigned flags;
+ pj_sockaddr_in rmt_addr;
+ int rmt_addrlen;
+};
+
+#if PJ_HAS_TCP
+struct accept_operation
+{
+ PJ_DECL_LIST_MEMBER(struct accept_operation);
+ pj_ioqueue_operation_e op;
+
+ pj_sock_t *accept_fd;
+ pj_sockaddr_t *local_addr;
+ pj_sockaddr_t *rmt_addr;
+ int *addrlen;
+};
+#endif
+
+union operation_key
+{
+ struct generic_operation generic;
+ struct read_operation read;
+ struct write_operation write;
+#if PJ_HAS_TCP
+ struct accept_operation accept;
+#endif
+};
+
+#define DECLARE_COMMON_KEY \
+ PJ_DECL_LIST_MEMBER(struct pj_ioqueue_key_t); \
+ pj_ioqueue_t *ioqueue; \
+ pj_mutex_t *mutex; \
+ pj_sock_t fd; \
+ int fd_type; \
+ void *user_data; \
+ pj_ioqueue_callback cb; \
+ int connecting; \
+ struct read_operation read_list; \
+ struct write_operation write_list; \
+ struct accept_operation accept_list;
+
+
+#define DECLARE_COMMON_IOQUEUE \
+ pj_lock_t *lock; \
+ pj_bool_t auto_delete_lock;
+
+
+enum ioqueue_event_type
+{
+ NO_EVENT,
+ READABLE_EVENT,
+ WRITEABLE_EVENT,
+ EXCEPTION_EVENT,
+};
+
+static void ioqueue_add_to_set( pj_ioqueue_t *ioqueue,
+ pj_sock_t fd,
+ enum ioqueue_event_type event_type );
+static void ioqueue_remove_from_set( pj_ioqueue_t *ioqueue,
+ pj_sock_t fd,
+ enum ioqueue_event_type event_type);
+
diff --git a/pjlib/src/pj/ioqueue_epoll.c b/pjlib/src/pj/ioqueue_epoll.c
index 24f9bfbb..aa012531 100644
--- a/pjlib/src/pj/ioqueue_epoll.c
+++ b/pjlib/src/pj/ioqueue_epoll.c
@@ -1,5 +1,4 @@
/* $Id$
- *
*/
/*
* ioqueue_epoll.c
@@ -30,7 +29,7 @@
# define epoll_data data.ptr
# define epoll_data_type void*
-# define ioctl_val_type unsigned long*
+# define ioctl_val_type unsigned long
# define getsockopt_val_ptr int*
# define os_getsockopt getsockopt
# define os_ioctl ioctl
@@ -126,51 +125,20 @@
#define THIS_FILE "ioq_epoll"
-#define PJ_IOQUEUE_IS_READ_OP(op) ((op & PJ_IOQUEUE_OP_READ) || \
- (op & PJ_IOQUEUE_OP_RECV) || \
- (op & PJ_IOQUEUE_OP_RECV_FROM))
-#define PJ_IOQUEUE_IS_WRITE_OP(op) ((op & PJ_IOQUEUE_OP_WRITE) || \
- (op & PJ_IOQUEUE_OP_SEND) || \
- (op & PJ_IOQUEUE_OP_SEND_TO))
-
-
-#if PJ_HAS_TCP
-# define PJ_IOQUEUE_IS_ACCEPT_OP(op) (op & PJ_IOQUEUE_OP_ACCEPT)
-# define PJ_IOQUEUE_IS_CONNECT_OP(op) (op & PJ_IOQUEUE_OP_CONNECT)
-#else
-# define PJ_IOQUEUE_IS_ACCEPT_OP(op) 0
-# define PJ_IOQUEUE_IS_CONNECT_OP(op) 0
-#endif
-
-
//#define TRACE_(expr) PJ_LOG(3,expr)
#define TRACE_(expr)
+/*
+ * Include common ioqueue abstraction.
+ */
+#include "ioqueue_common_abs.h"
/*
* This describes each key.
*/
struct pj_ioqueue_key_t
{
- PJ_DECL_LIST_MEMBER(struct pj_ioqueue_key_t)
- pj_sock_t fd;
- pj_ioqueue_operation_e op;
- void *user_data;
- pj_ioqueue_callback cb;
-
- void *rd_buf;
- unsigned rd_flags;
- pj_size_t rd_buflen;
- void *wr_buf;
- pj_size_t wr_buflen;
-
- pj_sockaddr_t *rmt_addr;
- int *rmt_addrlen;
-
- pj_sockaddr_t *local_addr;
- int *local_addrlen;
-
- pj_sock_t *accept_fd;
+ DECLARE_COMMON_KEY
};
/*
@@ -178,13 +146,18 @@ struct pj_ioqueue_key_t
*/
struct pj_ioqueue_t
{
- pj_lock_t *lock;
- pj_bool_t auto_delete_lock;
+ DECLARE_COMMON_IOQUEUE
+
unsigned max, count;
pj_ioqueue_key_t hlist;
int epfd;
};
+/* Include implementation for common abstraction after we declare
+ * pj_ioqueue_key_t and pj_ioqueue_t.
+ */
+#include "ioqueue_common_abs.c"
+
/*
* pj_ioqueue_create()
*
@@ -192,37 +165,45 @@ struct pj_ioqueue_t
*/
PJ_DEF(pj_status_t) pj_ioqueue_create( pj_pool_t *pool,
pj_size_t max_fd,
- int max_threads,
pj_ioqueue_t **p_ioqueue)
{
- pj_ioqueue_t *ioque;
+ pj_ioqueue_t *ioqueue;
pj_status_t rc;
+ pj_lock_t *lock;
- PJ_UNUSED_ARG(max_threads);
+ /* Check that arguments are valid. */
+ PJ_ASSERT_RETURN(pool != NULL && p_ioqueue != NULL &&
+ max_fd > 0, PJ_EINVAL);
- if (max_fd > PJ_IOQUEUE_MAX_HANDLES) {
- pj_assert(!"max_fd too large");
- return PJ_EINVAL;
- }
+ /* Check that size of pj_ioqueue_op_key_t is sufficient */
+ PJ_ASSERT_RETURN(sizeof(pj_ioqueue_op_key_t)-sizeof(void*) >=
+ sizeof(union operation_key), PJ_EBUG);
+
+ ioqueue = pj_pool_alloc(pool, sizeof(pj_ioqueue_t));
+
+ ioqueue_init(ioqueue);
- ioque = pj_pool_alloc(pool, sizeof(pj_ioqueue_t));
- ioque->max = max_fd;
- ioque->count = 0;
- pj_list_init(&ioque->hlist);
+ ioqueue->max = max_fd;
+ ioqueue->count = 0;
+ pj_list_init(&ioqueue->hlist);
- rc = pj_lock_create_recursive_mutex(pool, "ioq%p", &ioque->lock);
+ rc = pj_lock_create_simple_mutex(pool, "ioq%p", &lock);
if (rc != PJ_SUCCESS)
return rc;
- ioque->auto_delete_lock = PJ_TRUE;
- ioque->epfd = os_epoll_create(max_fd);
- if (ioque->epfd < 0) {
+ rc = pj_ioqueue_set_lock(ioqueue, lock, PJ_TRUE);
+ if (rc != PJ_SUCCESS)
+ return rc;
+
+ ioqueue->epfd = os_epoll_create(max_fd);
+ if (ioqueue->epfd < 0) {
+ ioqueue_destroy(ioqueue);
return PJ_RETURN_OS_ERROR(pj_get_native_os_error());
}
- PJ_LOG(4, ("pjlib", "select() I/O Queue created (%p)", ioque));
+ PJ_LOG(4, ("pjlib", "epoll I/O Queue created (%p)", ioqueue));
- *p_ioqueue = ioque;
+ *p_ioqueue = ioqueue;
return PJ_SUCCESS;
}
@@ -231,47 +212,24 @@ PJ_DEF(pj_status_t) pj_ioqueue_create( pj_pool_t *pool,
*
* Destroy ioqueue.
*/
-PJ_DEF(pj_status_t) pj_ioqueue_destroy(pj_ioqueue_t *ioque)
+PJ_DEF(pj_status_t) pj_ioqueue_destroy(pj_ioqueue_t *ioqueue)
{
- PJ_ASSERT_RETURN(ioque, PJ_EINVAL);
- PJ_ASSERT_RETURN(ioque->epfd > 0, PJ_EINVALIDOP);
-
- pj_lock_acquire(ioque->lock);
- os_close(ioque->epfd);
- ioque->epfd = 0;
- if (ioque->auto_delete_lock)
- pj_lock_destroy(ioque->lock);
-
- return PJ_SUCCESS;
-}
-
-/*
- * pj_ioqueue_set_lock()
- */
-PJ_DEF(pj_status_t) pj_ioqueue_set_lock( pj_ioqueue_t *ioque,
- pj_lock_t *lock,
- pj_bool_t auto_delete )
-{
- PJ_ASSERT_RETURN(ioque && lock, PJ_EINVAL);
-
- if (ioque->auto_delete_lock) {
- pj_lock_destroy(ioque->lock);
- }
-
- ioque->lock = lock;
- ioque->auto_delete_lock = auto_delete;
+ PJ_ASSERT_RETURN(ioqueue, PJ_EINVAL);
+ PJ_ASSERT_RETURN(ioqueue->epfd > 0, PJ_EINVALIDOP);
- return PJ_SUCCESS;
+ pj_lock_acquire(ioqueue->lock);
+ os_close(ioqueue->epfd);
+ ioqueue->epfd = 0;
+ return ioqueue_destroy(ioqueue);
}
-
/*
* pj_ioqueue_register_sock()
*
* Register a socket to ioqueue.
*/
PJ_DEF(pj_status_t) pj_ioqueue_register_sock( pj_pool_t *pool,
- pj_ioqueue_t *ioque,
+ pj_ioqueue_t *ioqueue,
pj_sock_t sock,
void *user_data,
const pj_ioqueue_callback *cb,
@@ -283,12 +241,12 @@ PJ_DEF(pj_status_t) pj_ioqueue_register_sock( pj_pool_t *pool,
int status;
pj_status_t rc = PJ_SUCCESS;
- PJ_ASSERT_RETURN(pool && ioque && sock != PJ_INVALID_SOCKET &&
+ PJ_ASSERT_RETURN(pool && ioqueue && sock != PJ_INVALID_SOCKET &&
cb && p_key, PJ_EINVAL);
- pj_lock_acquire(ioque->lock);
+ pj_lock_acquire(ioqueue->lock);
- if (ioque->count >= ioque->max) {
+ if (ioqueue->count >= ioqueue->max) {
rc = PJ_ETOOMANY;
TRACE_((THIS_FILE, "pj_ioqueue_register_sock error: too many files"));
goto on_return;
@@ -305,16 +263,19 @@ PJ_DEF(pj_status_t) pj_ioqueue_register_sock( pj_pool_t *pool,
/* Create key. */
key = (pj_ioqueue_key_t*)pj_pool_zalloc(pool, sizeof(pj_ioqueue_key_t));
- key->fd = sock;
- key->user_data = user_data;
- pj_memcpy(&key->cb, cb, sizeof(pj_ioqueue_callback));
+ rc = ioqueue_init_key(pool, ioqueue, key, sock, user_data, cb);
+ if (rc != PJ_SUCCESS) {
+ key = NULL;
+ goto on_return;
+ }
/* os_epoll_ctl. */
ev.events = EPOLLIN | EPOLLOUT | EPOLLERR;
ev.epoll_data = (epoll_data_type)key;
- status = os_epoll_ctl(ioque->epfd, EPOLL_CTL_ADD, sock, &ev);
+ status = os_epoll_ctl(ioqueue->epfd, EPOLL_CTL_ADD, sock, &ev);
if (status < 0) {
rc = pj_get_os_error();
+ key = NULL;
TRACE_((THIS_FILE,
"pj_ioqueue_register_sock error: os_epoll_ctl rc=%d",
status));
@@ -322,12 +283,12 @@ PJ_DEF(pj_status_t) pj_ioqueue_register_sock( pj_pool_t *pool,
}
/* Register */
- pj_list_insert_before(&ioque->hlist, key);
- ++ioque->count;
+ pj_list_insert_before(&ioqueue->hlist, key);
+ ++ioqueue->count;
on_return:
*p_key = key;
- pj_lock_release(ioque->lock);
+ pj_lock_release(ioqueue->lock);
return rc;
}
@@ -337,179 +298,116 @@ on_return:
*
* Unregister handle from ioqueue.
*/
-PJ_DEF(pj_status_t) pj_ioqueue_unregister( pj_ioqueue_t *ioque,
- pj_ioqueue_key_t *key)
+PJ_DEF(pj_status_t) pj_ioqueue_unregister( pj_ioqueue_key_t *key)
{
+ pj_ioqueue_t *ioqueue;
struct epoll_event ev;
int status;
- PJ_ASSERT_RETURN(ioque && key, PJ_EINVAL);
+ PJ_ASSERT_RETURN(key != NULL, PJ_EINVAL);
- pj_lock_acquire(ioque->lock);
+ ioqueue = key->ioqueue;
+ pj_lock_acquire(ioqueue->lock);
- pj_assert(ioque->count > 0);
- --ioque->count;
+ pj_assert(ioqueue->count > 0);
+ --ioqueue->count;
pj_list_erase(key);
ev.events = 0;
ev.epoll_data = (epoll_data_type)key;
- status = os_epoll_ctl( ioque->epfd, EPOLL_CTL_DEL, key->fd, &ev);
+ status = os_epoll_ctl( ioqueue->epfd, EPOLL_CTL_DEL, key->fd, &ev);
if (status != 0) {
pj_status_t rc = pj_get_os_error();
- pj_lock_release(ioque->lock);
+ pj_lock_release(ioqueue->lock);
return rc;
}
- pj_lock_release(ioque->lock);
+ pj_lock_release(ioqueue->lock);
+
+ /* Destroy the key. */
+ ioqueue_destroy_key(key);
+
return PJ_SUCCESS;
}
-/*
- * pj_ioqueue_get_user_data()
- *
- * Obtain value associated with a key.
+/* ioqueue_remove_from_set()
+ * This function is called from ioqueue_dispatch_event() to instruct
+ * the ioqueue to remove the specified descriptor from ioqueue's descriptor
+ * set for the specified event.
*/
-PJ_DEF(void*) pj_ioqueue_get_user_data( pj_ioqueue_key_t *key )
+static void ioqueue_remove_from_set( pj_ioqueue_t *ioqueue,
+ pj_sock_t fd,
+ enum ioqueue_event_type event_type)
{
- PJ_ASSERT_RETURN(key != NULL, NULL);
- return key->user_data;
}
+/*
+ * ioqueue_add_to_set()
+ * This function is called from pj_ioqueue_recv(), pj_ioqueue_send() etc
+ * to instruct the ioqueue to add the specified handle to ioqueue's descriptor
+ * set for the specified event.
+ */
+static void ioqueue_add_to_set( pj_ioqueue_t *ioqueue,
+ pj_sock_t fd,
+ enum ioqueue_event_type event_type )
+{
+}
/*
* pj_ioqueue_poll()
*
*/
-PJ_DEF(int) pj_ioqueue_poll( pj_ioqueue_t *ioque, const pj_time_val *timeout)
+PJ_DEF(int) pj_ioqueue_poll( pj_ioqueue_t *ioqueue, const pj_time_val *timeout)
{
int i, count, processed;
- struct epoll_event events[16];
+ struct epoll_event events[PJ_IOQUEUE_MAX_EVENTS_IN_SINGLE_POLL];
int msec;
+ struct queue {
+ pj_ioqueue_key_t *key;
+ enum ioqueue_event_type event_type;
+ } queue[PJ_IOQUEUE_MAX_EVENTS_IN_SINGLE_POLL];
PJ_CHECK_STACK();
msec = timeout ? PJ_TIME_VAL_MSEC(*timeout) : 9000;
- count = os_epoll_wait( ioque->epfd, events, PJ_ARRAY_SIZE(events), msec);
+ count = os_epoll_wait( ioqueue->epfd, events, PJ_ARRAY_SIZE(events), msec);
if (count <= 0)
return count;
/* Lock ioqueue. */
- pj_lock_acquire(ioque->lock);
-
- processed = 0;
+ pj_lock_acquire(ioqueue->lock);
- for (i=0; i<count; ++i) {
+ for (processed=0, i=0; i<count; ++i) {
pj_ioqueue_key_t *h = (pj_ioqueue_key_t*)(epoll_data_type)
events[i].epoll_data;
- pj_status_t rc;
-
- /*
- * Check for completion of read operations.
- */
- if ((events[i].events & EPOLLIN) && (PJ_IOQUEUE_IS_READ_OP(h->op))) {
- pj_ssize_t bytes_read = h->rd_buflen;
-
- if ((h->op & PJ_IOQUEUE_OP_RECV_FROM)) {
- rc = pj_sock_recvfrom( h->fd, h->rd_buf, &bytes_read, 0,
- h->rmt_addr, h->rmt_addrlen);
- } else if ((h->op & PJ_IOQUEUE_OP_RECV)) {
- rc = pj_sock_recv(h->fd, h->rd_buf, &bytes_read, 0);
- } else {
- bytes_read = os_read( h->fd, h->rd_buf, bytes_read);
- rc = (bytes_read >= 0) ? PJ_SUCCESS : pj_get_os_error();
- }
-
- if (rc != PJ_SUCCESS) {
- bytes_read = -rc;
- }
-
- h->op &= ~(PJ_IOQUEUE_OP_READ | PJ_IOQUEUE_OP_RECV |
- PJ_IOQUEUE_OP_RECV_FROM);
-
- /* Call callback. */
- (*h->cb.on_read_complete)(h, bytes_read);
- ++processed;
- }
/*
- * Check for completion of accept() operation.
+ * Check readability.
*/
- else if ((events[i].events & EPOLLIN) &&
- (h->op & PJ_IOQUEUE_OP_ACCEPT))
- {
- /* accept() must be the only operation specified on
- * server socket
- */
- pj_assert( h->op == PJ_IOQUEUE_OP_ACCEPT);
-
- rc = pj_sock_accept( h->fd, h->accept_fd,
- h->rmt_addr, h->rmt_addrlen);
- if (rc==PJ_SUCCESS && h->local_addr) {
- rc = pj_sock_getsockname(*h->accept_fd, h->local_addr,
- h->local_addrlen);
- }
-
- h->op &= ~(PJ_IOQUEUE_OP_ACCEPT);
-
- /* Call callback. */
- (*h->cb.on_accept_complete)(h, *h->accept_fd, rc);
-
+ if ((events[i].events & EPOLLIN) &&
+ (key_has_pending_read(h) || key_has_pending_accept(h))) {
+ queue[processed].key = h;
+ queue[processed].event_type = READABLE_EVENT;
++processed;
}
/*
- * Check for completion of write operations.
+ * Check for writeability.
*/
- if ((events[i].events & EPOLLOUT) && PJ_IOQUEUE_IS_WRITE_OP(h->op)) {
- /* Completion of write(), send(), or sendto() operation. */
-
- /* Clear operation. */
- h->op &= ~(PJ_IOQUEUE_OP_WRITE | PJ_IOQUEUE_OP_SEND |
- PJ_IOQUEUE_OP_SEND_TO);
-
- /* Call callback. */
- /* All data must have been sent? */
- (*h->cb.on_write_complete)(h, h->wr_buflen);
-
+ if ((events[i].events & EPOLLOUT) && key_has_pending_write(h)) {
+ queue[processed].key = h;
+ queue[processed].event_type = WRITEABLE_EVENT;
++processed;
}
+
#if PJ_HAS_TCP
/*
* Check for completion of connect() operation.
*/
- else if ((events[i].events & EPOLLOUT) &&
- (h->op & PJ_IOQUEUE_OP_CONNECT))
- {
- /* Completion of connect() operation */
- pj_ssize_t bytes_transfered;
-
- /* from connect(2):
- * On Linux, use getsockopt to read the SO_ERROR option at
- * level SOL_SOCKET to determine whether connect() completed
- * successfully (if SO_ERROR is zero).
- */
- int value;
- socklen_t vallen = sizeof(value);
- int gs_rc = os_getsockopt(h->fd, SOL_SOCKET, SO_ERROR,
- (getsockopt_val_ptr)&value, &vallen);
- if (gs_rc != 0) {
- /* Argh!! What to do now???
- * Just indicate that the socket is connected. The
- * application will get error as soon as it tries to use
- * the socket to send/receive.
- */
- bytes_transfered = 0;
- } else {
- bytes_transfered = value;
- }
-
- /* Clear operation. */
- h->op &= (~PJ_IOQUEUE_OP_CONNECT);
-
- /* Call callback. */
- (*h->cb.on_connect_complete)(h, bytes_transfered);
-
+ if ((events[i].events & EPOLLOUT) && (h->connecting)) {
+ queue[processed].key = h;
+ queue[processed].event_type = WRITEABLE_EVENT;
++processed;
}
#endif /* PJ_HAS_TCP */
@@ -517,321 +415,32 @@ PJ_DEF(int) pj_ioqueue_poll( pj_ioqueue_t *ioque, const pj_time_val *timeout)
/*
* Check for error condition.
*/
- if (events[i].events & EPOLLERR) {
- if (h->op & PJ_IOQUEUE_OP_CONNECT) {
- h->op &= ~PJ_IOQUEUE_OP_CONNECT;
-
- /* Call callback. */
- (*h->cb.on_connect_complete)(h, -1);
-
- ++processed;
- }
+ if (events[i].events & EPOLLERR && (h->connecting)) {
+ queue[processed].key = h;
+ queue[processed].event_type = EXCEPTION_EVENT;
+ ++processed;
}
}
-
- pj_lock_release(ioque->lock);
-
- return processed;
-}
-
-/*
- * pj_ioqueue_read()
- *
- * Start asynchronous read from the descriptor.
- */
-PJ_DEF(pj_status_t) pj_ioqueue_read( pj_ioqueue_t *ioque,
- pj_ioqueue_key_t *key,
- void *buffer,
- pj_size_t buflen)
-{
- PJ_ASSERT_RETURN(ioque && key && buffer, PJ_EINVAL);
- PJ_CHECK_STACK();
-
- /* For consistency with other ioqueue implementation, we would reject
- * if descriptor has already been submitted for reading before.
- */
- PJ_ASSERT_RETURN(((key->op & PJ_IOQUEUE_OP_READ) == 0 &&
- (key->op & PJ_IOQUEUE_OP_RECV) == 0 &&
- (key->op & PJ_IOQUEUE_OP_RECV_FROM) == 0),
- PJ_EBUSY);
-
- pj_lock_acquire(ioque->lock);
-
- key->op |= PJ_IOQUEUE_OP_READ;
- key->rd_flags = 0;
- key->rd_buf = buffer;
- key->rd_buflen = buflen;
-
- pj_lock_release(ioque->lock);
- return PJ_EPENDING;
-}
-
-
-/*
- * pj_ioqueue_recv()
- *
- * Start asynchronous recv() from the socket.
- */
-PJ_DEF(pj_status_t) pj_ioqueue_recv( pj_ioqueue_t *ioque,
- pj_ioqueue_key_t *key,
- void *buffer,
- pj_size_t buflen,
- unsigned flags )
-{
- PJ_ASSERT_RETURN(ioque && key && buffer, PJ_EINVAL);
- PJ_CHECK_STACK();
-
- /* For consistency with other ioqueue implementation, we would reject
- * if descriptor has already been submitted for reading before.
- */
- PJ_ASSERT_RETURN(((key->op & PJ_IOQUEUE_OP_READ) == 0 &&
- (key->op & PJ_IOQUEUE_OP_RECV) == 0 &&
- (key->op & PJ_IOQUEUE_OP_RECV_FROM) == 0),
- PJ_EBUSY);
-
- pj_lock_acquire(ioque->lock);
-
- key->op |= PJ_IOQUEUE_OP_RECV;
- key->rd_buf = buffer;
- key->rd_buflen = buflen;
- key->rd_flags = flags;
-
- pj_lock_release(ioque->lock);
- return PJ_EPENDING;
-}
-
-/*
- * pj_ioqueue_recvfrom()
- *
- * Start asynchronous recvfrom() from the socket.
- */
-PJ_DEF(pj_status_t) pj_ioqueue_recvfrom( pj_ioqueue_t *ioque,
- pj_ioqueue_key_t *key,
- void *buffer,
- pj_size_t buflen,
- unsigned flags,
- pj_sockaddr_t *addr,
- int *addrlen)
-{
- PJ_ASSERT_RETURN(ioque && key && buffer, PJ_EINVAL);
- PJ_CHECK_STACK();
-
- /* For consistency with other ioqueue implementation, we would reject
- * if descriptor has already been submitted for reading before.
- */
- PJ_ASSERT_RETURN(((key->op & PJ_IOQUEUE_OP_READ) == 0 &&
- (key->op & PJ_IOQUEUE_OP_RECV) == 0 &&
- (key->op & PJ_IOQUEUE_OP_RECV_FROM) == 0),
- PJ_EBUSY);
-
- pj_lock_acquire(ioque->lock);
-
- key->op |= PJ_IOQUEUE_OP_RECV_FROM;
- key->rd_buf = buffer;
- key->rd_buflen = buflen;
- key->rd_flags = flags;
- key->rmt_addr = addr;
- key->rmt_addrlen = addrlen;
-
- pj_lock_release(ioque->lock);
- return PJ_EPENDING;
-}
-
-/*
- * pj_ioqueue_write()
- *
- * Start asynchronous write() to the descriptor.
- */
-PJ_DEF(pj_status_t) pj_ioqueue_write( pj_ioqueue_t *ioque,
- pj_ioqueue_key_t *key,
- const void *data,
- pj_size_t datalen)
-{
- pj_status_t rc;
- pj_ssize_t sent;
-
- PJ_ASSERT_RETURN(ioque && key && data, PJ_EINVAL);
- PJ_CHECK_STACK();
-
- /* For consistency with other ioqueue implementation, we would reject
- * if descriptor has already been submitted for writing before.
- */
- PJ_ASSERT_RETURN(((key->op & PJ_IOQUEUE_OP_WRITE) == 0 &&
- (key->op & PJ_IOQUEUE_OP_SEND) == 0 &&
- (key->op & PJ_IOQUEUE_OP_SEND_TO) == 0),
- PJ_EBUSY);
-
- sent = datalen;
- /* sent would be -1 after pj_sock_send() if it returns error. */
- rc = pj_sock_send(key->fd, data, &sent, 0);
- if (rc != PJ_SUCCESS && rc != PJ_STATUS_FROM_OS(OSERR_EWOULDBLOCK)) {
- return rc;
- }
-
- pj_lock_acquire(ioque->lock);
-
- key->op |= PJ_IOQUEUE_OP_WRITE;
- key->wr_buf = NULL;
- key->wr_buflen = datalen;
-
- pj_lock_release(ioque->lock);
-
- return PJ_EPENDING;
-}
-
-/*
- * pj_ioqueue_send()
- *
- * Start asynchronous send() to the descriptor.
- */
-PJ_DEF(pj_status_t) pj_ioqueue_send( pj_ioqueue_t *ioque,
- pj_ioqueue_key_t *key,
- const void *data,
- pj_size_t datalen,
- unsigned flags)
-{
- pj_status_t rc;
- pj_ssize_t sent;
-
- PJ_ASSERT_RETURN(ioque && key && data, PJ_EINVAL);
- PJ_CHECK_STACK();
-
- /* For consistency with other ioqueue implementation, we would reject
- * if descriptor has already been submitted for writing before.
- */
- PJ_ASSERT_RETURN(((key->op & PJ_IOQUEUE_OP_WRITE) == 0 &&
- (key->op & PJ_IOQUEUE_OP_SEND) == 0 &&
- (key->op & PJ_IOQUEUE_OP_SEND_TO) == 0),
- PJ_EBUSY);
-
- sent = datalen;
- /* sent would be -1 after pj_sock_send() if it returns error. */
- rc = pj_sock_send(key->fd, data, &sent, flags);
- if (rc != PJ_SUCCESS && rc != PJ_STATUS_FROM_OS(OSERR_EWOULDBLOCK)) {
- return rc;
- }
-
- pj_lock_acquire(ioque->lock);
-
- key->op |= PJ_IOQUEUE_OP_SEND;
- key->wr_buf = NULL;
- key->wr_buflen = datalen;
-
- pj_lock_release(ioque->lock);
-
- return PJ_EPENDING;
-}
-
-
-/*
- * pj_ioqueue_sendto()
- *
- * Start asynchronous write() to the descriptor.
- */
-PJ_DEF(pj_status_t) pj_ioqueue_sendto( pj_ioqueue_t *ioque,
- pj_ioqueue_key_t *key,
- const void *data,
- pj_size_t datalen,
- unsigned flags,
- const pj_sockaddr_t *addr,
- int addrlen)
-{
- pj_status_t rc;
- pj_ssize_t sent;
-
- PJ_ASSERT_RETURN(ioque && key && data, PJ_EINVAL);
- PJ_CHECK_STACK();
-
- /* For consistency with other ioqueue implementation, we would reject
- * if descriptor has already been submitted for writing before.
- */
- PJ_ASSERT_RETURN(((key->op & PJ_IOQUEUE_OP_WRITE) == 0 &&
- (key->op & PJ_IOQUEUE_OP_SEND) == 0 &&
- (key->op & PJ_IOQUEUE_OP_SEND_TO) == 0),
- PJ_EBUSY);
-
- sent = datalen;
- /* sent would be -1 after pj_sock_sendto() if it returns error. */
- rc = pj_sock_sendto(key->fd, data, &sent, flags, addr, addrlen);
- if (rc != PJ_SUCCESS && rc != PJ_STATUS_FROM_OS(OSERR_EWOULDBLOCK)) {
- return rc;
- }
-
- pj_lock_acquire(ioque->lock);
-
- key->op |= PJ_IOQUEUE_OP_SEND_TO;
- key->wr_buf = NULL;
- key->wr_buflen = datalen;
-
- pj_lock_release(ioque->lock);
- return PJ_EPENDING;
-}
-
-#if PJ_HAS_TCP
-/*
- * Initiate overlapped accept() operation.
- */
-PJ_DEF(int) pj_ioqueue_accept( pj_ioqueue_t *ioqueue,
- pj_ioqueue_key_t *key,
- pj_sock_t *new_sock,
- pj_sockaddr_t *local,
- pj_sockaddr_t *remote,
- int *addrlen)
-{
- /* check parameters. All must be specified! */
- pj_assert(ioqueue && key && new_sock);
-
- /* Server socket must have no other operation! */
- pj_assert(key->op == 0);
-
- pj_lock_acquire(ioqueue->lock);
-
- key->op = PJ_IOQUEUE_OP_ACCEPT;
- key->accept_fd = new_sock;
- key->rmt_addr = remote;
- key->rmt_addrlen = addrlen;
- key->local_addr = local;
- key->local_addrlen = addrlen; /* use same addr. as rmt_addrlen */
-
pj_lock_release(ioqueue->lock);
- return PJ_EPENDING;
-}
-
-/*
- * Initiate overlapped connect() operation (well, it's non-blocking actually,
- * since there's no overlapped version of connect()).
- */
-PJ_DEF(pj_status_t) pj_ioqueue_connect( pj_ioqueue_t *ioqueue,
- pj_ioqueue_key_t *key,
- const pj_sockaddr_t *addr,
- int addrlen )
-{
- pj_status_t rc;
-
- /* check parameters. All must be specified! */
- PJ_ASSERT_RETURN(ioqueue && key && addr && addrlen, PJ_EINVAL);
- /* Connecting socket must have no other operation! */
- PJ_ASSERT_RETURN(key->op == 0, PJ_EBUSY);
-
- rc = pj_sock_connect(key->fd, addr, addrlen);
- if (rc == PJ_SUCCESS) {
- /* Connected! */
- return PJ_SUCCESS;
- } else {
- if (rc == PJ_STATUS_FROM_OS(OSERR_EINPROGRESS) ||
- rc == PJ_STATUS_FROM_OS(OSERR_EWOULDBLOCK))
- {
- /* Pending! */
- pj_lock_acquire(ioqueue->lock);
- key->op = PJ_IOQUEUE_OP_CONNECT;
- pj_lock_release(ioqueue->lock);
- return PJ_EPENDING;
- } else {
- /* Error! */
- return rc;
- }
+ /* Now process the events. */
+ for (i=0; i<processed; ++i) {
+ switch (queue[i].event_type) {
+ case READABLE_EVENT:
+ ioqueue_dispatch_read_event(ioqueue, queue[i].key);
+ break;
+ case WRITEABLE_EVENT:
+ ioqueue_dispatch_write_event(ioqueue, queue[i].key);
+ break;
+ case EXCEPTION_EVENT:
+ ioqueue_dispatch_exception_event(ioqueue, queue[i].key);
+ break;
+ case NO_EVENT:
+ pj_assert(!"Invalid event!");
+ break;
+ }
}
+
+ return processed;
}
-#endif /* PJ_HAS_TCP */
diff --git a/pjlib/src/pj/ioqueue_select.c b/pjlib/src/pj/ioqueue_select.c
index 24e68564..c2051681 100644
--- a/pjlib/src/pj/ioqueue_select.c
+++ b/pjlib/src/pj/ioqueue_select.c
@@ -20,11 +20,11 @@
#include <pj/compat/socket.h>
#include <pj/sock_select.h>
#include <pj/errno.h>
-
-/*
- * Include declaration from common abstraction.
- */
-#include "ioqueue_common_abs.h"
+
+/*
+ * Include declaration from common abstraction.
+ */
+#include "ioqueue_common_abs.h"
/*
* ISSUES with ioqueue_select()
@@ -38,30 +38,30 @@
*
*/
#define THIS_FILE "ioq_select"
-
-/*
- * The select ioqueue relies on socket functions (pj_sock_xxx()) to return
- * the correct error code.
- */
-#if PJ_RETURN_OS_ERROR(100) != PJ_STATUS_FROM_OS(100)
-# error "Error reporting must be enabled for this function to work!"
-#endif
-
-/**
- * Get the number of descriptors in the set. This is defined in sock_select.c
- * This function will only return the number of sockets set from PJ_FD_SET
- * operation. When the set is modified by other means (such as by select()),
- * the count will not be reflected here.
- *
- * That's why don't export this function in the header file, to avoid
- * misunderstanding.
- *
- * @param fdsetp The descriptor set.
- *
- * @return Number of descriptors in the set.
- */
-PJ_DECL(pj_size_t) PJ_FD_COUNT(const pj_fd_set_t *fdsetp);
-
+
+/*
+ * The select ioqueue relies on socket functions (pj_sock_xxx()) to return
+ * the correct error code.
+ */
+#if PJ_RETURN_OS_ERROR(100) != PJ_STATUS_FROM_OS(100)
+# error "Error reporting must be enabled for this function to work!"
+#endif
+
+/**
+ * Get the number of descriptors in the set. This is defined in sock_select.c
+ * This function will only return the number of sockets set from PJ_FD_SET
+ * operation. When the set is modified by other means (such as by select()),
+ * the count will not be reflected here.
+ *
+ * That's why don't export this function in the header file, to avoid
+ * misunderstanding.
+ *
+ * @param fdsetp The descriptor set.
+ *
+ * @return Number of descriptors in the set.
+ */
+PJ_DECL(pj_size_t) PJ_FD_COUNT(const pj_fd_set_t *fdsetp);
+
/*
* During debugging build, VALIDATE_FD_SET is set.
@@ -72,12 +72,12 @@ PJ_DECL(pj_size_t) PJ_FD_COUNT(const pj_fd_set_t *fdsetp);
#else
# define VALIDATE_FD_SET 0
#endif
-
+
/*
* This describes each key.
*/
struct pj_ioqueue_key_t
-{
+{
DECLARE_COMMON_KEY
};
@@ -86,7 +86,7 @@ struct pj_ioqueue_key_t
*/
struct pj_ioqueue_t
{
- DECLARE_COMMON_IOQUEUE
+ DECLARE_COMMON_IOQUEUE
unsigned max, count;
pj_ioqueue_key_t key_list;
@@ -96,11 +96,11 @@ struct pj_ioqueue_t
pj_fd_set_t xfdset;
#endif
};
-
-/* Include implementation for common abstraction after we declare
- * pj_ioqueue_key_t and pj_ioqueue_t.
- */
-#include "ioqueue_common_abs.c"
+
+/* Include implementation for common abstraction after we declare
+ * pj_ioqueue_key_t and pj_ioqueue_t.
+ */
+#include "ioqueue_common_abs.c"
/*
* pj_ioqueue_create()
@@ -111,22 +111,22 @@ PJ_DEF(pj_status_t) pj_ioqueue_create( pj_pool_t *pool,
pj_size_t max_fd,
pj_ioqueue_t **p_ioqueue)
{
- pj_ioqueue_t *ioqueue;
+ pj_ioqueue_t *ioqueue;
pj_lock_t *lock;
pj_status_t rc;
-
- /* Check that arguments are valid. */
- PJ_ASSERT_RETURN(pool != NULL && p_ioqueue != NULL &&
- max_fd > 0 && max_fd <= PJ_IOQUEUE_MAX_HANDLES,
- PJ_EINVAL);
-
- /* Check that size of pj_ioqueue_op_key_t is sufficient */
- PJ_ASSERT_RETURN(sizeof(pj_ioqueue_op_key_t)-sizeof(void*) >=
- sizeof(union operation_key), PJ_EBUG);
-
- ioqueue = pj_pool_alloc(pool, sizeof(pj_ioqueue_t));
-
- ioqueue_init(ioqueue);
+
+ /* Check that arguments are valid. */
+ PJ_ASSERT_RETURN(pool != NULL && p_ioqueue != NULL &&
+ max_fd > 0 && max_fd <= PJ_IOQUEUE_MAX_HANDLES,
+ PJ_EINVAL);
+
+ /* Check that size of pj_ioqueue_op_key_t is sufficient */
+ PJ_ASSERT_RETURN(sizeof(pj_ioqueue_op_key_t)-sizeof(void*) >=
+ sizeof(union operation_key), PJ_EBUG);
+
+ ioqueue = pj_pool_alloc(pool, sizeof(pj_ioqueue_t));
+
+ ioqueue_init(ioqueue);
ioqueue->max = max_fd;
ioqueue->count = 0;
@@ -141,8 +141,8 @@ PJ_DEF(pj_status_t) pj_ioqueue_create( pj_pool_t *pool,
if (rc != PJ_SUCCESS)
return rc;
- rc = pj_ioqueue_set_lock(ioqueue, lock, PJ_TRUE);
- if (rc != PJ_SUCCESS)
+ rc = pj_ioqueue_set_lock(ioqueue, lock, PJ_TRUE);
+ if (rc != PJ_SUCCESS)
return rc;
PJ_LOG(4, ("pjlib", "select() I/O Queue created (%p)", ioqueue));
@@ -159,8 +159,8 @@ PJ_DEF(pj_status_t) pj_ioqueue_create( pj_pool_t *pool,
PJ_DEF(pj_status_t) pj_ioqueue_destroy(pj_ioqueue_t *ioqueue)
{
PJ_ASSERT_RETURN(ioqueue, PJ_EINVAL);
-
- pj_lock_acquire(ioqueue->lock);
+
+ pj_lock_acquire(ioqueue->lock);
return ioqueue_destroy(ioqueue);
}
@@ -203,16 +203,18 @@ PJ_DEF(pj_status_t) pj_ioqueue_register_sock( pj_pool_t *pool,
}
/* Create key. */
- key = (pj_ioqueue_key_t*)pj_pool_zalloc(pool, sizeof(pj_ioqueue_key_t));
- rc = ioqueue_init_key(pool, ioqueue, key, sock, user_data, cb);
- if (rc != PJ_SUCCESS)
- return rc;
+ key = (pj_ioqueue_key_t*)pj_pool_zalloc(pool, sizeof(pj_ioqueue_key_t));
+ rc = ioqueue_init_key(pool, ioqueue, key, sock, user_data, cb);
+ if (rc != PJ_SUCCESS) {
+ key = NULL;
+ goto on_return;
+ }
/* Register */
pj_list_insert_before(&ioqueue->key_list, key);
++ioqueue->count;
-on_return:
+on_return:
/* On error, socket may be left in non-blocking mode. */
*p_key = key;
pj_lock_release(ioqueue->lock);
@@ -226,13 +228,13 @@ on_return:
* Unregister handle from ioqueue.
*/
PJ_DEF(pj_status_t) pj_ioqueue_unregister( pj_ioqueue_key_t *key)
-{
- pj_ioqueue_t *ioqueue;
+{
+ pj_ioqueue_t *ioqueue;
PJ_ASSERT_RETURN(key, PJ_EINVAL);
-
- ioqueue = key->ioqueue;
-
+
+ ioqueue = key->ioqueue;
+
pj_lock_acquire(ioqueue->lock);
pj_assert(ioqueue->count > 0);
@@ -243,21 +245,21 @@ PJ_DEF(pj_status_t) pj_ioqueue_unregister( pj_ioqueue_key_t *key)
#if PJ_HAS_TCP
PJ_FD_CLR(key->fd, &ioqueue->xfdset);
#endif
-
- /* ioqueue_destroy may try to acquire key's mutex.
- * Since normally the order of locking is to lock key's mutex first
- * then ioqueue's mutex, ioqueue_destroy may deadlock unless we
- * release ioqueue's mutex first.
- */
- pj_lock_release(ioqueue->lock);
-
- /* Destroy the key. */
- ioqueue_destroy_key(key);
+
+ /* ioqueue_destroy may try to acquire key's mutex.
+ * Since normally the order of locking is to lock key's mutex first
+ * then ioqueue's mutex, ioqueue_destroy may deadlock unless we
+ * release ioqueue's mutex first.
+ */
+ pj_lock_release(ioqueue->lock);
+
+ /* Destroy the key. */
+ ioqueue_destroy_key(key);
return PJ_SUCCESS;
}
-
+
/* This supposed to check whether the fd_set values are consistent
* with the operation currently set in each key.
*/
@@ -307,54 +309,54 @@ static void validate_sets(const pj_ioqueue_t *ioqueue,
}
}
#endif /* VALIDATE_FD_SET */
-
-
-/* ioqueue_remove_from_set()
- * This function is called from ioqueue_dispatch_event() to instruct
- * the ioqueue to remove the specified descriptor from ioqueue's descriptor
- * set for the specified event.
- */
-static void ioqueue_remove_from_set( pj_ioqueue_t *ioqueue,
- pj_sock_t fd,
- enum ioqueue_event_type event_type)
-{
- pj_lock_acquire(ioqueue->lock);
-
- if (event_type == READABLE_EVENT)
- PJ_FD_CLR((pj_sock_t)fd, &ioqueue->rfdset);
- else if (event_type == WRITEABLE_EVENT)
- PJ_FD_CLR((pj_sock_t)fd, &ioqueue->wfdset);
- else if (event_type == EXCEPTION_EVENT)
- PJ_FD_CLR((pj_sock_t)fd, &ioqueue->xfdset);
- else
- pj_assert(0);
-
- pj_lock_release(ioqueue->lock);
-}
-
-/*
- * ioqueue_add_to_set()
- * This function is called from pj_ioqueue_recv(), pj_ioqueue_send() etc
- * to instruct the ioqueue to add the specified handle to ioqueue's descriptor
- * set for the specified event.
- */
-static void ioqueue_add_to_set( pj_ioqueue_t *ioqueue,
- pj_sock_t fd,
- enum ioqueue_event_type event_type )
-{
- pj_lock_acquire(ioqueue->lock);
-
- if (event_type == READABLE_EVENT)
- PJ_FD_SET((pj_sock_t)fd, &ioqueue->rfdset);
- else if (event_type == WRITEABLE_EVENT)
- PJ_FD_SET((pj_sock_t)fd, &ioqueue->wfdset);
- else if (event_type == EXCEPTION_EVENT)
- PJ_FD_SET((pj_sock_t)fd, &ioqueue->xfdset);
- else
- pj_assert(0);
-
- pj_lock_release(ioqueue->lock);
-}
+
+
+/* ioqueue_remove_from_set()
+ * This function is called from ioqueue_dispatch_event() to instruct
+ * the ioqueue to remove the specified descriptor from ioqueue's descriptor
+ * set for the specified event.
+ */
+static void ioqueue_remove_from_set( pj_ioqueue_t *ioqueue,
+ pj_sock_t fd,
+ enum ioqueue_event_type event_type)
+{
+ pj_lock_acquire(ioqueue->lock);
+
+ if (event_type == READABLE_EVENT)
+ PJ_FD_CLR((pj_sock_t)fd, &ioqueue->rfdset);
+ else if (event_type == WRITEABLE_EVENT)
+ PJ_FD_CLR((pj_sock_t)fd, &ioqueue->wfdset);
+ else if (event_type == EXCEPTION_EVENT)
+ PJ_FD_CLR((pj_sock_t)fd, &ioqueue->xfdset);
+ else
+ pj_assert(0);
+
+ pj_lock_release(ioqueue->lock);
+}
+
+/*
+ * ioqueue_add_to_set()
+ * This function is called from pj_ioqueue_recv(), pj_ioqueue_send() etc
+ * to instruct the ioqueue to add the specified handle to ioqueue's descriptor
+ * set for the specified event.
+ */
+static void ioqueue_add_to_set( pj_ioqueue_t *ioqueue,
+ pj_sock_t fd,
+ enum ioqueue_event_type event_type )
+{
+ pj_lock_acquire(ioqueue->lock);
+
+ if (event_type == READABLE_EVENT)
+ PJ_FD_SET((pj_sock_t)fd, &ioqueue->rfdset);
+ else if (event_type == WRITEABLE_EVENT)
+ PJ_FD_SET((pj_sock_t)fd, &ioqueue->wfdset);
+ else if (event_type == EXCEPTION_EVENT)
+ PJ_FD_SET((pj_sock_t)fd, &ioqueue->xfdset);
+ else
+ pj_assert(0);
+
+ pj_lock_release(ioqueue->lock);
+}
/*
* pj_ioqueue_poll()
@@ -378,19 +380,19 @@ PJ_DEF(int) pj_ioqueue_poll( pj_ioqueue_t *ioqueue, const pj_time_val *timeout)
pj_fd_set_t rfdset, wfdset, xfdset;
int count, counter;
pj_ioqueue_key_t *h;
- struct event
- {
- pj_ioqueue_key_t *key;
- enum event_type event_type;
- } event[PJ_IOQUEUE_MAX_EVENTS_IN_SINGLE_POLL];
-
- PJ_ASSERT_RETURN(ioqueue, PJ_EINVAL);
+ struct event
+ {
+ pj_ioqueue_key_t *key;
+ enum ioqueue_event_type event_type;
+ } event[PJ_IOQUEUE_MAX_EVENTS_IN_SINGLE_POLL];
+
+ PJ_ASSERT_RETURN(ioqueue, PJ_EINVAL);
/* Lock ioqueue before making fd_set copies */
pj_lock_acquire(ioqueue->lock);
-
- /* We will only do select() when there are sockets to be polled.
- * Otherwise select() will return error.
+
+ /* We will only do select() when there are sockets to be polled.
+ * Otherwise select() will return error.
*/
if (PJ_FD_COUNT(&ioqueue->rfdset)==0 &&
PJ_FD_COUNT(&ioqueue->wfdset)==0 &&
@@ -422,71 +424,71 @@ PJ_DEF(int) pj_ioqueue_poll( pj_ioqueue_t *ioqueue, const pj_time_val *timeout)
if (count <= 0)
return count;
- else if (count > PJ_IOQUEUE_MAX_EVENTS_IN_SINGLE_POLL)
- count = PJ_IOQUEUE_MAX_EVENTS_IN_SINGLE_POLL;
+ else if (count > PJ_IOQUEUE_MAX_EVENTS_IN_SINGLE_POLL)
+ count = PJ_IOQUEUE_MAX_EVENTS_IN_SINGLE_POLL;
- /* Scan descriptor sets for event and add the events in the event
- * array to be processed later in this function. We do this so that
- * events can be processed in parallel without holding ioqueue lock.
+ /* Scan descriptor sets for event and add the events in the event
+ * array to be processed later in this function. We do this so that
+ * events can be processed in parallel without holding ioqueue lock.
*/
pj_lock_acquire(ioqueue->lock);
-
- counter = 0;
-
- /* Scan for writable sockets first to handle piggy-back data
- * coming with accept().
- */
- h = ioqueue->key_list.next;
- for ( ; h!=&ioqueue->key_list && counter<count; h = h->next) {
- if ( (key_has_pending_write(h) || key_has_pending_connect(h))
- && PJ_FD_ISSET(h->fd, &wfdset))
- {
- event[counter].key = h;
- event[counter].event_type = WRITEABLE_EVENT;
- ++counter;
- }
-
- /* Scan for readable socket. */
- if ((key_has_pending_read(h) || key_has_pending_accept(h))
- && PJ_FD_ISSET(h->fd, &rfdset))
- {
- event[counter].key = h;
- event[counter].event_type = READABLE_EVENT;
- ++counter; }
-
-#if PJ_HAS_TCP
- if (key_has_pending_connect(h) && PJ_FD_ISSET(h->fd, &xfdset)) {
- event[counter].key = h;
- event[counter].event_type = EXCEPTION_EVENT;
- ++counter;
- }
-#endif
- }
-
- pj_lock_release(ioqueue->lock);
-
- count = counter;
-
- /* Now process all events. The dispatch functions will take care
- * of locking in each of the key
- */
- for (counter=0; counter<count; ++counter) {
- switch (event[counter].event_type) {
- case READABLE_EVENT:
- ioqueue_dispatch_read_event(ioqueue, event[counter].key);
- break;
- case WRITEABLE_EVENT:
- ioqueue_dispatch_write_event(ioqueue, event[counter].key);
- break;
- case EXCEPTION_EVENT:
- ioqueue_dispatch_exception_event(ioqueue, event[counter].key);
- break;
- case NO_EVENT:
- default:
- pj_assert(!"Invalid event!");
- break;
- }
- }
+
+ counter = 0;
+
+ /* Scan for writable sockets first to handle piggy-back data
+ * coming with accept().
+ */
+ h = ioqueue->key_list.next;
+ for ( ; h!=&ioqueue->key_list && counter<count; h = h->next) {
+ if ( (key_has_pending_write(h) || key_has_pending_connect(h))
+ && PJ_FD_ISSET(h->fd, &wfdset))
+ {
+ event[counter].key = h;
+ event[counter].event_type = WRITEABLE_EVENT;
+ ++counter;
+ }
+
+ /* Scan for readable socket. */
+ if ((key_has_pending_read(h) || key_has_pending_accept(h))
+ && PJ_FD_ISSET(h->fd, &rfdset))
+ {
+ event[counter].key = h;
+ event[counter].event_type = READABLE_EVENT;
+ ++counter;
+ }
+
+#if PJ_HAS_TCP
+ if (key_has_pending_connect(h) && PJ_FD_ISSET(h->fd, &xfdset)) {
+ event[counter].key = h;
+ event[counter].event_type = EXCEPTION_EVENT;
+ ++counter;
+ }
+#endif
+ }
+
+ pj_lock_release(ioqueue->lock);
+
+ count = counter;
+
+ /* Now process all events. The dispatch functions will take care
+ * of locking in each of the key
+ */
+ for (counter=0; counter<count; ++counter) {
+ switch (event[counter].event_type) {
+ case READABLE_EVENT:
+ ioqueue_dispatch_read_event(ioqueue, event[counter].key);
+ break;
+ case WRITEABLE_EVENT:
+ ioqueue_dispatch_write_event(ioqueue, event[counter].key);
+ break;
+ case EXCEPTION_EVENT:
+ ioqueue_dispatch_exception_event(ioqueue, event[counter].key);
+ break;
+ case NO_EVENT:
+ pj_assert(!"Invalid event!");
+ break;
+ }
+ }
return count;
}
diff --git a/pjlib/src/pj/os_core_unix.c b/pjlib/src/pj/os_core_unix.c
index 18f4ded2..0c951e99 100644
--- a/pjlib/src/pj/os_core_unix.c
+++ b/pjlib/src/pj/os_core_unix.c
@@ -715,12 +715,14 @@ static pj_status_t init_mutex(pj_mutex_t *mutex, const char *name, int type)
if (type == PJ_MUTEX_SIMPLE) {
#if defined(PJ_LINUX) && PJ_LINUX!=0
+ extern int pthread_mutexattr_settype(pthread_mutexattr_t*,int);
rc = pthread_mutexattr_settype(&attr, PTHREAD_MUTEX_FAST_NP);
#else
rc = pthread_mutexattr_settype(&attr, PTHREAD_MUTEX_NORMAL);
#endif
} else {
#if defined(PJ_LINUX) && PJ_LINUX!=0
+ extern int pthread_mutexattr_settype(pthread_mutexattr_t*,int);
rc = pthread_mutexattr_settype(&attr, PTHREAD_MUTEX_RECURSIVE_NP);
#else
rc = pthread_mutexattr_settype(&attr, PTHREAD_MUTEX_RECURSIVE);
diff --git a/pjlib/src/pj/sock_bsd.c b/pjlib/src/pj/sock_bsd.c
index 92495df2..21c10d19 100644
--- a/pjlib/src/pj/sock_bsd.c
+++ b/pjlib/src/pj/sock_bsd.c
@@ -104,7 +104,9 @@ PJ_DEF(pj_uint32_t) pj_htonl(pj_uint32_t hostlong)
*/
PJ_DEF(char*) pj_inet_ntoa(pj_in_addr inaddr)
{
- return inet_ntoa(*(struct in_addr*)&inaddr);
+ struct in_addr addr;
+ addr.s_addr = inaddr.s_addr;
+ return inet_ntoa(addr);
}
/*
diff --git a/pjlib/src/pjlib-test/ioq_perf.c b/pjlib/src/pjlib-test/ioq_perf.c
index 3305fb60..a4ee005c 100644
--- a/pjlib/src/pjlib-test/ioq_perf.c
+++ b/pjlib/src/pjlib-test/ioq_perf.c
@@ -38,9 +38,9 @@ typedef struct test_item
client_fd;
pj_ioqueue_t *ioqueue;
pj_ioqueue_key_t *server_key,
- *client_key;
- pj_ioqueue_op_key_t recv_op,
- send_op;
+ *client_key;
+ pj_ioqueue_op_key_t recv_op,
+ send_op;
int has_pending_send;
pj_size_t buffer_size;
char *outgoing_buffer;
@@ -52,16 +52,16 @@ typedef struct test_item
/* Callback when data has been read.
* Increment item->bytes_recv and ready to read the next data.
*/
-static void on_read_complete(pj_ioqueue_key_t *key,
- pj_ioqueue_op_key_t *op_key,
+static void on_read_complete(pj_ioqueue_key_t *key,
+ pj_ioqueue_op_key_t *op_key,
pj_ssize_t bytes_read)
{
test_item *item = pj_ioqueue_get_user_data(key);
- pj_status_t rc;
+ pj_status_t rc;
int data_is_available = 1;
//TRACE_((THIS_FILE, " read complete, bytes_read=%d", bytes_read));
-
+
do {
if (thread_quit_flag)
return;
@@ -76,7 +76,7 @@ static void on_read_complete(pj_ioqueue_key_t *key,
PJ_LOG(3,(THIS_FILE, "...error: read error, bytes_read=%d (%s)",
bytes_read, errmsg));
PJ_LOG(3,(THIS_FILE,
- ".....additional info: total read=%u, total written=%u",
+ ".....additional info: total read=%u, total sent=%u",
item->bytes_recv, item->bytes_sent));
} else {
last_error_counter++;
@@ -94,44 +94,44 @@ static void on_read_complete(pj_ioqueue_key_t *key,
*/
if (item->bytes_recv > item->buffer_size * 10000)
thread_quit_flag = 1;
-
+
bytes_read = item->buffer_size;
rc = pj_ioqueue_recv( key, op_key,
item->incoming_buffer, &bytes_read, 0 );
- if (rc == PJ_SUCCESS) {
- data_is_available = 1;
- } else if (rc == PJ_EPENDING) {
- data_is_available = 0;
- } else {
+ if (rc == PJ_SUCCESS) {
+ data_is_available = 1;
+ } else if (rc == PJ_EPENDING) {
+ data_is_available = 0;
+ } else {
data_is_available = 0;
if (rc != last_error) {
last_error = rc;
- app_perror("...error: read error", rc);
+ app_perror("...error: read error(1)", rc);
} else {
last_error_counter++;
}
- }
-
- if (!item->has_pending_send) {
- pj_ssize_t sent = item->buffer_size;
- rc = pj_ioqueue_send(item->client_key, &item->send_op,
- item->outgoing_buffer, &sent, 0);
- if (rc != PJ_SUCCESS && rc != PJ_EPENDING) {
- app_perror("...error: write error", rc);
- }
-
- item->has_pending_send = (rc==PJ_EPENDING);
- }
-
+ }
+
+ if (!item->has_pending_send) {
+ pj_ssize_t sent = item->buffer_size;
+ rc = pj_ioqueue_send(item->client_key, &item->send_op,
+ item->outgoing_buffer, &sent, 0);
+ if (rc != PJ_SUCCESS && rc != PJ_EPENDING) {
+ app_perror("...error: write error", rc);
+ }
+
+ item->has_pending_send = (rc==PJ_EPENDING);
+ }
+
} while (data_is_available);
}
/* Callback when data has been written.
* Increment item->bytes_sent and write the next data.
*/
-static void on_write_complete(pj_ioqueue_key_t *key,
- pj_ioqueue_op_key_t *op_key,
+static void on_write_complete(pj_ioqueue_key_t *key,
+ pj_ioqueue_op_key_t *op_key,
pj_ssize_t bytes_sent)
{
test_item *item = pj_ioqueue_get_user_data(key);
@@ -140,7 +140,7 @@ static void on_write_complete(pj_ioqueue_key_t *key,
if (thread_quit_flag)
return;
-
+
item->has_pending_send = 0;
item->bytes_sent += bytes_sent;
@@ -150,14 +150,14 @@ static void on_write_complete(pj_ioqueue_key_t *key,
}
else {
pj_status_t rc;
-
+
bytes_sent = item->buffer_size;
rc = pj_ioqueue_send( item->client_key, op_key,
item->outgoing_buffer, &bytes_sent, 0);
if (rc != PJ_SUCCESS && rc != PJ_EPENDING) {
app_perror("...error: write error", rc);
- }
-
+ }
+
item->has_pending_send = (rc==PJ_EPENDING);
}
}
@@ -231,7 +231,7 @@ static int perform_test(int sock_type, const char *type_name,
/* Initialize each producer-consumer pair. */
for (i=0; i<sockpair_cnt; ++i) {
- pj_ssize_t bytes;
+ pj_ssize_t bytes;
items[i].ioqueue = ioqueue;
items[i].buffer_size = buffer_size;
@@ -274,7 +274,7 @@ static int perform_test(int sock_type, const char *type_name,
}
/* Start reading. */
- TRACE_((THIS_FILE, " pj_ioqueue_recv.."));
+ TRACE_((THIS_FILE, " pj_ioqueue_recv.."));
bytes = items[i].buffer_size;
rc = pj_ioqueue_recv(items[i].server_key, &items[i].recv_op,
items[i].incoming_buffer, &bytes,
@@ -285,7 +285,7 @@ static int perform_test(int sock_type, const char *type_name,
}
/* Start writing. */
- TRACE_((THIS_FILE, " pj_ioqueue_write.."));
+ TRACE_((THIS_FILE, " pj_ioqueue_write.."));
bytes = items[i].buffer_size;
rc = pj_ioqueue_send(items[i].client_key, &items[i].recv_op,
items[i].outgoing_buffer, &bytes, 0);
@@ -293,7 +293,7 @@ static int perform_test(int sock_type, const char *type_name,
app_perror("...error: pj_ioqueue_write", rc);
return -76;
}
-
+
items[i].has_pending_send = (rc==PJ_EPENDING);
}