diff options
author | Benny Prijono <bennylp@teluu.com> | 2005-11-06 16:50:38 +0000 |
---|---|---|
committer | Benny Prijono <bennylp@teluu.com> | 2005-11-06 16:50:38 +0000 |
commit | 33a8c1cb59304d92d517e3ba511bf233c729597f (patch) | |
tree | e6cb65930121480465db749bf5916fa2708ca633 /pjlib/src | |
parent | 6d5fbe07f3dc84c10ea75c5584fe8b5513278d08 (diff) |
Tested new ioqueue framework on Linux with select and epoll
git-svn-id: http://svn.pjsip.org/repos/pjproject/main@14 74dad513-b988-da41-8d7b-12977e46ad98
Diffstat (limited to 'pjlib/src')
-rw-r--r-- | pjlib/src/pj/ioqueue_common_abs.c | 1626 | ||||
-rw-r--r-- | pjlib/src/pj/ioqueue_common_abs.h | 215 | ||||
-rw-r--r-- | pjlib/src/pj/ioqueue_epoll.c | 661 | ||||
-rw-r--r-- | pjlib/src/pj/ioqueue_select.c | 394 | ||||
-rw-r--r-- | pjlib/src/pj/os_core_unix.c | 2 | ||||
-rw-r--r-- | pjlib/src/pj/sock_bsd.c | 4 | ||||
-rw-r--r-- | pjlib/src/pjlib-test/ioq_perf.c | 76 |
7 files changed, 1297 insertions, 1681 deletions
diff --git a/pjlib/src/pj/ioqueue_common_abs.c b/pjlib/src/pj/ioqueue_common_abs.c index b5599d9c..774d53e3 100644 --- a/pjlib/src/pj/ioqueue_common_abs.c +++ b/pjlib/src/pj/ioqueue_common_abs.c @@ -1,813 +1,813 @@ -/* $Id$ */
-
-#include <pj/ioqueue.h>
-#include <pj/errno.h>
-#include <pj/list.h>
-#include <pj/sock.h>
-#include <pj/lock.h>
-#include <pj/assert.h>
-#include <pj/string.h>
-
-
-static void ioqueue_init( pj_ioqueue_t *ioqueue )
-{
- ioqueue->lock = NULL;
- ioqueue->auto_delete_lock = 0;
-}
-
-static pj_status_t ioqueue_destroy(pj_ioqueue_t *ioqueue)
-{
- if (ioqueue->auto_delete_lock && ioqueue->lock )
- return pj_lock_destroy(ioqueue->lock);
- else
- return PJ_SUCCESS;
-}
-
-/*
- * pj_ioqueue_set_lock()
- */
-PJ_DEF(pj_status_t) pj_ioqueue_set_lock( pj_ioqueue_t *ioqueue,
- pj_lock_t *lock,
- pj_bool_t auto_delete )
-{
- PJ_ASSERT_RETURN(ioqueue && lock, PJ_EINVAL);
-
- if (ioqueue->auto_delete_lock && ioqueue->lock) {
- pj_lock_destroy(ioqueue->lock);
- }
-
- ioqueue->lock = lock;
- ioqueue->auto_delete_lock = auto_delete;
-
- return PJ_SUCCESS;
-}
-
-static pj_status_t ioqueue_init_key( pj_pool_t *pool,
- pj_ioqueue_t *ioqueue,
- pj_ioqueue_key_t *key,
- pj_sock_t sock,
- void *user_data,
- const pj_ioqueue_callback *cb)
-{
- pj_status_t rc;
- int optlen;
-
- key->ioqueue = ioqueue;
- key->fd = sock;
- key->user_data = user_data;
- pj_list_init(&key->read_list);
- pj_list_init(&key->write_list);
-#if PJ_HAS_TCP
- pj_list_init(&key->accept_list);
-#endif
-
- /* Save callback. */
- pj_memcpy(&key->cb, cb, sizeof(pj_ioqueue_callback));
-
- /* Get socket type. When socket type is datagram, some optimization
- * will be performed during send to allow parallel send operations.
- */
- optlen = sizeof(key->fd_type);
- rc = pj_sock_getsockopt(sock, PJ_SOL_SOCKET, PJ_SO_TYPE,
- &key->fd_type, &optlen);
- if (rc != PJ_SUCCESS)
- key->fd_type = PJ_SOCK_STREAM;
-
- /* Create mutex for the key. */
- rc = pj_mutex_create_simple(pool, NULL, &key->mutex);
-
- return rc;
-}
-
-static void ioqueue_destroy_key( pj_ioqueue_key_t *key )
-{
- pj_mutex_destroy(key->mutex);
-}
-
-/*
- * pj_ioqueue_get_user_data()
- *
- * Obtain value associated with a key.
- */
-PJ_DEF(void*) pj_ioqueue_get_user_data( pj_ioqueue_key_t *key )
-{
- PJ_ASSERT_RETURN(key != NULL, NULL);
- return key->user_data;
-}
-
-/*
- * pj_ioqueue_set_user_data()
- */
-PJ_DEF(pj_status_t) pj_ioqueue_set_user_data( pj_ioqueue_key_t *key,
- void *user_data,
- void **old_data)
-{
- PJ_ASSERT_RETURN(key, PJ_EINVAL);
-
- if (old_data)
- *old_data = key->user_data;
- key->user_data = user_data;
-
- return PJ_SUCCESS;
-}
-
-PJ_INLINE(int) key_has_pending_write(pj_ioqueue_key_t *key)
-{
- return !pj_list_empty(&key->write_list);
-}
-
-PJ_INLINE(int) key_has_pending_read(pj_ioqueue_key_t *key)
-{
- return !pj_list_empty(&key->read_list);
-}
-
-PJ_INLINE(int) key_has_pending_accept(pj_ioqueue_key_t *key)
-{
-#if PJ_HAS_TCP
- return !pj_list_empty(&key->accept_list);
-#else
- return 0;
-#endif
-}
-
-PJ_INLINE(int) key_has_pending_connect(pj_ioqueue_key_t *key)
-{
- return key->connecting;
-}
-
-
-/*
- * ioqueue_dispatch_event()
- *
- * Report occurence of an event in the key to be processed by the
- * framework.
- */
-void ioqueue_dispatch_write_event(pj_ioqueue_t *ioqueue, pj_ioqueue_key_t *h)
-{
- /* Lock the key. */
- pj_mutex_lock(h->mutex);
-
-#if defined(PJ_HAS_TCP) && PJ_HAS_TCP!=0
- if (h->connecting) {
- /* Completion of connect() operation */
- pj_ssize_t bytes_transfered;
-
- /* Clear operation. */
- h->connecting = 0;
-
- ioqueue_remove_from_set(ioqueue, h->fd, WRITEABLE_EVENT);
- ioqueue_remove_from_set(ioqueue, h->fd, EXCEPTION_EVENT);
-
- /* Unlock; from this point we don't need to hold key's mutex. */
- pj_mutex_unlock(h->mutex);
-
-#if (defined(PJ_HAS_SO_ERROR) && PJ_HAS_SO_ERROR!=0)
- /* from connect(2):
- * On Linux, use getsockopt to read the SO_ERROR option at
- * level SOL_SOCKET to determine whether connect() completed
- * successfully (if SO_ERROR is zero).
- */
- int value;
- socklen_t vallen = sizeof(value);
- int gs_rc = getsockopt(h->fd, SOL_SOCKET, SO_ERROR,
- &value, &vallen);
- if (gs_rc != 0) {
- /* Argh!! What to do now???
- * Just indicate that the socket is connected. The
- * application will get error as soon as it tries to use
- * the socket to send/receive.
- */
- bytes_transfered = 0;
- } else {
- bytes_transfered = value;
- }
-#elif defined(PJ_WIN32) && PJ_WIN32!=0
- bytes_transfered = 0; /* success */
-#else
- /* Excellent information in D.J. Bernstein page:
- * http://cr.yp.to/docs/connect.html
- *
- * Seems like the most portable way of detecting connect()
- * failure is to call getpeername(). If socket is connected,
- * getpeername() will return 0. If the socket is not connected,
- * it will return ENOTCONN, and read(fd, &ch, 1) will produce
- * the right errno through error slippage. This is a combination
- * of suggestions from Douglas C. Schmidt and Ken Keys.
- */
- int gp_rc;
- struct sockaddr_in addr;
- socklen_t addrlen = sizeof(addr);
-
- gp_rc = getpeername(h->fd, (struct sockaddr*)&addr, &addrlen);
- bytes_transfered = gp_rc;
-#endif
-
- /* Call callback. */
- if (h->cb.on_connect_complete)
- (*h->cb.on_connect_complete)(h, bytes_transfered);
-
- /* Done. */
-
- } else
-#endif /* PJ_HAS_TCP */
- if (key_has_pending_write(h)) {
- /* Socket is writable. */
- struct write_operation *write_op;
- pj_ssize_t sent;
- pj_status_t send_rc;
-
- /* Get the first in the queue. */
- write_op = h->write_list.next;
-
- /* For datagrams, we can remove the write_op from the list
- * so that send() can work in parallel.
- */
- if (h->fd_type == PJ_SOCK_DGRAM) {
- pj_list_erase(write_op);
- if (pj_list_empty(&h->write_list))
- ioqueue_remove_from_set(ioqueue, h->fd, WRITEABLE_EVENT);
-
- pj_mutex_unlock(h->mutex);
- }
-
- /* Send the data.
- * Unfortunately we must do this while holding key's mutex, thus
- * preventing parallel write on a single key.. :-((
- */
- sent = write_op->size - write_op->written;
- if (write_op->op == PJ_IOQUEUE_OP_SEND) {
- send_rc = pj_sock_send(h->fd, write_op->buf+write_op->written,
- &sent, write_op->flags);
- } else if (write_op->op == PJ_IOQUEUE_OP_SEND_TO) {
- send_rc = pj_sock_sendto(h->fd,
- write_op->buf+write_op->written,
- &sent, write_op->flags,
- &write_op->rmt_addr,
- write_op->rmt_addrlen);
- } else {
- pj_assert(!"Invalid operation type!");
- send_rc = PJ_EBUG;
- }
-
- if (send_rc == PJ_SUCCESS) {
- write_op->written += sent;
- } else {
- pj_assert(send_rc > 0);
- write_op->written = -send_rc;
- }
-
- /* Are we finished with this buffer? */
- if (send_rc!=PJ_SUCCESS ||
- write_op->written == (pj_ssize_t)write_op->size ||
- h->fd_type == PJ_SOCK_DGRAM)
- {
- if (h->fd_type != PJ_SOCK_DGRAM) {
- /* Write completion of the whole stream. */
- pj_list_erase(write_op);
-
- /* Clear operation if there's no more data to send. */
- if (pj_list_empty(&h->write_list))
- ioqueue_remove_from_set(ioqueue, h->fd, WRITEABLE_EVENT);
-
- /* No need to hold mutex anymore */
- pj_mutex_unlock(h->mutex);
- }
-
- /* Call callback. */
- if (h->cb.on_write_complete) {
- (*h->cb.on_write_complete)(h,
- (pj_ioqueue_op_key_t*)write_op,
- write_op->written);
- }
-
- } else {
- pj_mutex_unlock(h->mutex);
- }
-
- /* Done. */
- } else {
- pj_assert(!"Descriptor is signaled but key "
- "has no pending operation!");
-
- pj_mutex_unlock(h->mutex);
- }
-}
-
-void ioqueue_dispatch_read_event( pj_ioqueue_t *ioqueue, pj_ioqueue_key_t *h )
-{
- pj_status_t rc;
-
- /* Lock the key. */
- pj_mutex_lock(h->mutex);
-
-# if PJ_HAS_TCP
- if (!pj_list_empty(&h->accept_list)) {
-
- struct accept_operation *accept_op;
-
- /* Get one accept operation from the list. */
- accept_op = h->accept_list.next;
- pj_list_erase(accept_op);
-
- /* Clear bit in fdset if there is no more pending accept */
- if (pj_list_empty(&h->accept_list))
- ioqueue_remove_from_set(ioqueue, h->fd, READABLE_EVENT);
-
- /* Unlock; from this point we don't need to hold key's mutex. */
- pj_mutex_unlock(h->mutex);
-
- rc=pj_sock_accept(h->fd, accept_op->accept_fd,
- accept_op->rmt_addr, accept_op->addrlen);
- if (rc==PJ_SUCCESS && accept_op->local_addr) {
- rc = pj_sock_getsockname(*accept_op->accept_fd,
- accept_op->local_addr,
- accept_op->addrlen);
- }
-
- /* Call callback. */
- if (h->cb.on_accept_complete)
- (*h->cb.on_accept_complete)(h,
- (pj_ioqueue_op_key_t*)accept_op,
- *accept_op->accept_fd, rc);
-
- }
- else
-# endif
- if (key_has_pending_read(h)) {
- struct read_operation *read_op;
- pj_ssize_t bytes_read;
-
- pj_assert(!pj_list_empty(&h->read_list));
-
- /* Get one pending read operation from the list. */
- read_op = h->read_list.next;
- pj_list_erase(read_op);
-
- /* Clear fdset if there is no pending read. */
- if (pj_list_empty(&h->read_list))
- ioqueue_remove_from_set(ioqueue, h->fd, READABLE_EVENT);
-
- /* Unlock; from this point we don't need to hold key's mutex. */
- pj_mutex_unlock(h->mutex);
-
- bytes_read = read_op->size;
-
- if ((read_op->op == PJ_IOQUEUE_OP_RECV_FROM)) {
- rc = pj_sock_recvfrom(h->fd, read_op->buf, &bytes_read, 0,
- read_op->rmt_addr,
- read_op->rmt_addrlen);
- } else if ((read_op->op == PJ_IOQUEUE_OP_RECV)) {
- rc = pj_sock_recv(h->fd, read_op->buf, &bytes_read, 0);
- } else {
- pj_assert(read_op->op == PJ_IOQUEUE_OP_READ);
- /*
- * User has specified pj_ioqueue_read().
- * On Win32, we should do ReadFile(). But because we got
- * here because of select() anyway, user must have put a
- * socket descriptor on h->fd, which in this case we can
- * just call pj_sock_recv() instead of ReadFile().
- * On Unix, user may put a file in h->fd, so we'll have
- * to call read() here.
- * This may not compile on systems which doesn't have
- * read(). That's why we only specify PJ_LINUX here so
- * that error is easier to catch.
- */
-# if defined(PJ_WIN32) && PJ_WIN32 != 0
- rc = pj_sock_recv(h->fd, read_op->buf, &bytes_read, 0);
- //rc = ReadFile((HANDLE)h->fd, read_op->buf, read_op->size,
- // &bytes_read, NULL);
-# elif (defined(PJ_HAS_UNISTD_H) && PJ_HAS_UNISTD_H != 0)
- bytes_read = read(h->fd, h->rd_buf, bytes_read);
- rc = (bytes_read >= 0) ? PJ_SUCCESS : pj_get_os_error();
-# elif defined(PJ_LINUX_KERNEL) && PJ_LINUX_KERNEL != 0
- bytes_read = sys_read(h->fd, h->rd_buf, bytes_read);
- rc = (bytes_read >= 0) ? PJ_SUCCESS : -bytes_read;
-# else
-# error "Implement read() for this platform!"
-# endif
- }
-
- if (rc != PJ_SUCCESS) {
-# if defined(PJ_WIN32) && PJ_WIN32 != 0
- /* On Win32, for UDP, WSAECONNRESET on the receive side
- * indicates that previous sending has triggered ICMP Port
- * Unreachable message.
- * But we wouldn't know at this point which one of previous
- * key that has triggered the error, since UDP socket can
- * be shared!
- * So we'll just ignore it!
- */
-
- if (rc == PJ_STATUS_FROM_OS(WSAECONNRESET)) {
- //PJ_LOG(4,(THIS_FILE,
- // "Ignored ICMP port unreach. on key=%p", h));
- }
-# endif
-
- /* In any case we would report this to caller. */
- bytes_read = -rc;
- }
-
- /* Call callback. */
- if (h->cb.on_read_complete) {
- (*h->cb.on_read_complete)(h,
- (pj_ioqueue_op_key_t*)read_op,
- bytes_read);
- }
-
- } else {
- pj_mutex_unlock(h->mutex);
- }
-}
-
-
-void ioqueue_dispatch_exception_event( pj_ioqueue_t *ioqueue,
- pj_ioqueue_key_t *h )
-{
- pj_mutex_lock(h->mutex);
-
- if (!h->connecting) {
- /* It is possible that more than one thread was woken up, thus
- * the remaining thread will see h->connecting as zero because
- * it has been processed by other thread.
- */
- pj_mutex_unlock(h->mutex);
- return;
- }
-
- /* Clear operation. */
- h->connecting = 0;
-
- pj_mutex_unlock(h->mutex);
-
- ioqueue_remove_from_set(ioqueue, h->fd, WRITEABLE_EVENT);
- ioqueue_remove_from_set(ioqueue, h->fd, EXCEPTION_EVENT);
-
- /* Call callback. */
- if (h->cb.on_connect_complete)
- (*h->cb.on_connect_complete)(h, -1);
-}
-
-/*
- * pj_ioqueue_recv()
- *
- * Start asynchronous recv() from the socket.
- */
-PJ_DEF(pj_status_t) pj_ioqueue_recv( pj_ioqueue_key_t *key,
- pj_ioqueue_op_key_t *op_key,
- void *buffer,
- pj_ssize_t *length,
- unsigned flags )
-{
- pj_status_t status;
- pj_ssize_t size;
- struct read_operation *read_op;
-
- PJ_ASSERT_RETURN(key && op_key && buffer && length, PJ_EINVAL);
- PJ_CHECK_STACK();
-
- /* Try to see if there's data immediately available.
- */
- size = *length;
- status = pj_sock_recv(key->fd, buffer, &size, flags);
- if (status == PJ_SUCCESS) {
- /* Yes! Data is available! */
- *length = size;
- return PJ_SUCCESS;
- } else {
- /* If error is not EWOULDBLOCK (or EAGAIN on Linux), report
- * the error to caller.
- */
- if (status != PJ_STATUS_FROM_OS(PJ_BLOCKING_ERROR_VAL))
- return status;
- }
-
- /*
- * No data is immediately available.
- * Must schedule asynchronous operation to the ioqueue.
- */
- read_op = (struct read_operation*)op_key;
-
- read_op->op = PJ_IOQUEUE_OP_RECV;
- read_op->buf = buffer;
- read_op->size = *length;
- read_op->flags = flags;
-
- pj_mutex_lock(key->mutex);
- pj_list_insert_before(&key->read_list, read_op);
- ioqueue_add_to_set(key->ioqueue, key->fd, READABLE_EVENT);
- pj_mutex_unlock(key->mutex);
-
- return PJ_EPENDING;
-}
-
-/*
- * pj_ioqueue_recvfrom()
- *
- * Start asynchronous recvfrom() from the socket.
- */
-PJ_DEF(pj_status_t) pj_ioqueue_recvfrom( pj_ioqueue_key_t *key,
- pj_ioqueue_op_key_t *op_key,
- void *buffer,
- pj_ssize_t *length,
- unsigned flags,
- pj_sockaddr_t *addr,
- int *addrlen)
-{
- pj_status_t status;
- pj_ssize_t size;
- struct read_operation *read_op;
-
- PJ_ASSERT_RETURN(key && op_key && buffer && length, PJ_EINVAL);
- PJ_CHECK_STACK();
-
- /* Try to see if there's data immediately available.
- */
- size = *length;
- status = pj_sock_recvfrom(key->fd, buffer, &size, flags,
- addr, addrlen);
- if (status == PJ_SUCCESS) {
- /* Yes! Data is available! */
- *length = size;
- return PJ_SUCCESS;
- } else {
- /* If error is not EWOULDBLOCK (or EAGAIN on Linux), report
- * the error to caller.
- */
- if (status != PJ_STATUS_FROM_OS(PJ_BLOCKING_ERROR_VAL))
- return status;
- }
-
- /*
- * No data is immediately available.
- * Must schedule asynchronous operation to the ioqueue.
- */
- read_op = (struct read_operation*)op_key;
-
- read_op->op = PJ_IOQUEUE_OP_RECV_FROM;
- read_op->buf = buffer;
- read_op->size = *length;
- read_op->flags = flags;
- read_op->rmt_addr = addr;
- read_op->rmt_addrlen = addrlen;
-
- pj_mutex_lock(key->mutex);
- pj_list_insert_before(&key->read_list, read_op);
- ioqueue_add_to_set(key->ioqueue, key->fd, READABLE_EVENT);
- pj_mutex_unlock(key->mutex);
-
- return PJ_EPENDING;
-}
-
-/*
- * pj_ioqueue_send()
- *
- * Start asynchronous send() to the descriptor.
- */
-PJ_DEF(pj_status_t) pj_ioqueue_send( pj_ioqueue_key_t *key,
- pj_ioqueue_op_key_t *op_key,
- const void *data,
- pj_ssize_t *length,
- unsigned flags)
-{
- struct write_operation *write_op;
- pj_status_t status;
- pj_ssize_t sent;
-
- PJ_ASSERT_RETURN(key && op_key && data && length, PJ_EINVAL);
- PJ_CHECK_STACK();
-
- /* Fast track:
- * Try to send data immediately, only if there's no pending write!
- * Note:
- * We are speculating that the list is empty here without properly
- * acquiring ioqueue's mutex first. This is intentional, to maximize
- * performance via parallelism.
- *
- * This should be safe, because:
- * - by convention, we require caller to make sure that the
- * key is not unregistered while other threads are invoking
- * an operation on the same key.
- * - pj_list_empty() is safe to be invoked by multiple threads,
- * even when other threads are modifying the list.
- */
- if (pj_list_empty(&key->write_list)) {
- /*
- * See if data can be sent immediately.
- */
- sent = *length;
- status = pj_sock_send(key->fd, data, &sent, flags);
- if (status == PJ_SUCCESS) {
- /* Success! */
- *length = sent;
- return PJ_SUCCESS;
- } else {
- /* If error is not EWOULDBLOCK (or EAGAIN on Linux), report
- * the error to caller.
- */
- if (status != PJ_STATUS_FROM_OS(PJ_BLOCKING_ERROR_VAL)) {
- return status;
- }
- }
- }
-
- /*
- * Schedule asynchronous send.
- */
- write_op = (struct write_operation*)op_key;
- write_op->op = PJ_IOQUEUE_OP_SEND;
- write_op->buf = NULL;
- write_op->size = *length;
- write_op->written = 0;
- write_op->flags = flags;
-
- pj_mutex_lock(key->mutex);
- pj_list_insert_before(&key->write_list, write_op);
- ioqueue_add_to_set(key->ioqueue, key->fd, WRITEABLE_EVENT);
- pj_mutex_unlock(key->mutex);
-
- return PJ_EPENDING;
-}
-
-
-/*
- * pj_ioqueue_sendto()
- *
- * Start asynchronous write() to the descriptor.
- */
-PJ_DEF(pj_status_t) pj_ioqueue_sendto( pj_ioqueue_key_t *key,
- pj_ioqueue_op_key_t *op_key,
- const void *data,
- pj_ssize_t *length,
- unsigned flags,
- const pj_sockaddr_t *addr,
- int addrlen)
-{
- struct write_operation *write_op;
- pj_status_t status;
- pj_ssize_t sent;
-
- PJ_ASSERT_RETURN(key && op_key && data && length, PJ_EINVAL);
- PJ_CHECK_STACK();
-
- /* Fast track:
- * Try to send data immediately, only if there's no pending write!
- * Note:
- * We are speculating that the list is empty here without properly
- * acquiring ioqueue's mutex first. This is intentional, to maximize
- * performance via parallelism.
- *
- * This should be safe, because:
- * - by convention, we require caller to make sure that the
- * key is not unregistered while other threads are invoking
- * an operation on the same key.
- * - pj_list_empty() is safe to be invoked by multiple threads,
- * even when other threads are modifying the list.
- */
- if (pj_list_empty(&key->write_list)) {
- /*
- * See if data can be sent immediately.
- */
- sent = *length;
- status = pj_sock_sendto(key->fd, data, &sent, flags, addr, addrlen);
- if (status == PJ_SUCCESS) {
- /* Success! */
- *length = sent;
- return PJ_SUCCESS;
- } else {
- /* If error is not EWOULDBLOCK (or EAGAIN on Linux), report
- * the error to caller.
- */
- if (status != PJ_STATUS_FROM_OS(PJ_BLOCKING_ERROR_VAL)) {
- return status;
- }
- }
- }
-
- /*
- * Check that address storage can hold the address parameter.
- */
- PJ_ASSERT_RETURN(addrlen <= sizeof(pj_sockaddr_in), PJ_EBUG);
-
- /*
- * Schedule asynchronous send.
- */
- write_op = (struct write_operation*)op_key;
- write_op->op = PJ_IOQUEUE_OP_SEND_TO;
- write_op->buf = NULL;
- write_op->size = *length;
- write_op->written = 0;
- write_op->flags = flags;
- pj_memcpy(&write_op->rmt_addr, addr, addrlen);
- write_op->rmt_addrlen = addrlen;
-
- pj_mutex_lock(key->mutex);
- pj_list_insert_before(&key->write_list, write_op);
- ioqueue_add_to_set(key->ioqueue, key->fd, WRITEABLE_EVENT);
- pj_mutex_unlock(key->mutex);
-
- return PJ_EPENDING;
-}
-
-#if PJ_HAS_TCP
-/*
- * Initiate overlapped accept() operation.
- */
-PJ_DEF(pj_status_t) pj_ioqueue_accept( pj_ioqueue_key_t *key,
- pj_ioqueue_op_key_t *op_key,
- pj_sock_t *new_sock,
- pj_sockaddr_t *local,
- pj_sockaddr_t *remote,
- int *addrlen)
-{
- struct accept_operation *accept_op;
- pj_status_t status;
-
- /* check parameters. All must be specified! */
- PJ_ASSERT_RETURN(key && op_key && new_sock, PJ_EINVAL);
-
- /* Fast track:
- * See if there's new connection available immediately.
- */
- if (pj_list_empty(&key->accept_list)) {
- status = pj_sock_accept(key->fd, new_sock, remote, addrlen);
- if (status == PJ_SUCCESS) {
- /* Yes! New connection is available! */
- if (local && addrlen) {
- status = pj_sock_getsockname(*new_sock, local, addrlen);
- if (status != PJ_SUCCESS) {
- pj_sock_close(*new_sock);
- *new_sock = PJ_INVALID_SOCKET;
- return status;
- }
- }
- return PJ_SUCCESS;
- } else {
- /* If error is not EWOULDBLOCK (or EAGAIN on Linux), report
- * the error to caller.
- */
- if (status != PJ_STATUS_FROM_OS(PJ_BLOCKING_ERROR_VAL)) {
- return status;
- }
- }
- }
-
- /*
- * No connection is available immediately.
- * Schedule accept() operation to be completed when there is incoming
- * connection available.
- */
- accept_op = (struct accept_operation*)op_key;
-
- accept_op->op = PJ_IOQUEUE_OP_ACCEPT;
- accept_op->accept_fd = new_sock;
- accept_op->rmt_addr = remote;
- accept_op->addrlen= addrlen;
- accept_op->local_addr = local;
-
- pj_mutex_lock(key->mutex);
- pj_list_insert_before(&key->accept_list, accept_op);
- ioqueue_add_to_set(key->ioqueue, key->fd, READABLE_EVENT);
- pj_mutex_unlock(key->mutex);
-
- return PJ_EPENDING;
-}
-
-/*
- * Initiate overlapped connect() operation (well, it's non-blocking actually,
- * since there's no overlapped version of connect()).
- */
-PJ_DEF(pj_status_t) pj_ioqueue_connect( pj_ioqueue_key_t *key,
- const pj_sockaddr_t *addr,
- int addrlen )
-{
- pj_status_t status;
-
- /* check parameters. All must be specified! */
- PJ_ASSERT_RETURN(key && addr && addrlen, PJ_EINVAL);
-
- /* Check if socket has not been marked for connecting */
- if (key->connecting != 0)
- return PJ_EPENDING;
-
- status = pj_sock_connect(key->fd, addr, addrlen);
- if (status == PJ_SUCCESS) {
- /* Connected! */
- return PJ_SUCCESS;
- } else {
- if (status == PJ_STATUS_FROM_OS(PJ_BLOCKING_CONNECT_ERROR_VAL)) {
- /* Pending! */
- pj_mutex_lock(key->mutex);
- key->connecting = PJ_TRUE;
- ioqueue_add_to_set(key->ioqueue, key->fd, WRITEABLE_EVENT);
- ioqueue_add_to_set(key->ioqueue, key->fd, EXCEPTION_EVENT);
- pj_mutex_unlock(key->mutex);
- return PJ_EPENDING;
- } else {
- /* Error! */
- return status;
- }
- }
-}
-#endif /* PJ_HAS_TCP */
-
+/* $Id$ */ + +/* + * ioqueue_common_abs.c + * + * This contains common functionalities to emulate proactor pattern with + * various event dispatching mechanisms (e.g. select, epoll). + * + * This file will be included by the appropriate ioqueue implementation. + * This file is NOT supposed to be compiled as stand-alone source. + */ + +static void ioqueue_init( pj_ioqueue_t *ioqueue ) +{ + ioqueue->lock = NULL; + ioqueue->auto_delete_lock = 0; +} + +static pj_status_t ioqueue_destroy(pj_ioqueue_t *ioqueue) +{ + if (ioqueue->auto_delete_lock && ioqueue->lock ) + return pj_lock_destroy(ioqueue->lock); + else + return PJ_SUCCESS; +} + +/* + * pj_ioqueue_set_lock() + */ +PJ_DEF(pj_status_t) pj_ioqueue_set_lock( pj_ioqueue_t *ioqueue, + pj_lock_t *lock, + pj_bool_t auto_delete ) +{ + PJ_ASSERT_RETURN(ioqueue && lock, PJ_EINVAL); + + if (ioqueue->auto_delete_lock && ioqueue->lock) { + pj_lock_destroy(ioqueue->lock); + } + + ioqueue->lock = lock; + ioqueue->auto_delete_lock = auto_delete; + + return PJ_SUCCESS; +} + +static pj_status_t ioqueue_init_key( pj_pool_t *pool, + pj_ioqueue_t *ioqueue, + pj_ioqueue_key_t *key, + pj_sock_t sock, + void *user_data, + const pj_ioqueue_callback *cb) +{ + pj_status_t rc; + int optlen; + + key->ioqueue = ioqueue; + key->fd = sock; + key->user_data = user_data; + pj_list_init(&key->read_list); + pj_list_init(&key->write_list); +#if PJ_HAS_TCP + pj_list_init(&key->accept_list); +#endif + + /* Save callback. */ + pj_memcpy(&key->cb, cb, sizeof(pj_ioqueue_callback)); + + /* Get socket type. When socket type is datagram, some optimization + * will be performed during send to allow parallel send operations. + */ + optlen = sizeof(key->fd_type); + rc = pj_sock_getsockopt(sock, PJ_SOL_SOCKET, PJ_SO_TYPE, + &key->fd_type, &optlen); + if (rc != PJ_SUCCESS) + key->fd_type = PJ_SOCK_STREAM; + + /* Create mutex for the key. */ + rc = pj_mutex_create_simple(pool, NULL, &key->mutex); + + return rc; +} + +static void ioqueue_destroy_key( pj_ioqueue_key_t *key ) +{ + pj_mutex_destroy(key->mutex); +} + +/* + * pj_ioqueue_get_user_data() + * + * Obtain value associated with a key. + */ +PJ_DEF(void*) pj_ioqueue_get_user_data( pj_ioqueue_key_t *key ) +{ + PJ_ASSERT_RETURN(key != NULL, NULL); + return key->user_data; +} + +/* + * pj_ioqueue_set_user_data() + */ +PJ_DEF(pj_status_t) pj_ioqueue_set_user_data( pj_ioqueue_key_t *key, + void *user_data, + void **old_data) +{ + PJ_ASSERT_RETURN(key, PJ_EINVAL); + + if (old_data) + *old_data = key->user_data; + key->user_data = user_data; + + return PJ_SUCCESS; +} + +PJ_INLINE(int) key_has_pending_write(pj_ioqueue_key_t *key) +{ + return !pj_list_empty(&key->write_list); +} + +PJ_INLINE(int) key_has_pending_read(pj_ioqueue_key_t *key) +{ + return !pj_list_empty(&key->read_list); +} + +PJ_INLINE(int) key_has_pending_accept(pj_ioqueue_key_t *key) +{ +#if PJ_HAS_TCP + return !pj_list_empty(&key->accept_list); +#else + return 0; +#endif +} + +PJ_INLINE(int) key_has_pending_connect(pj_ioqueue_key_t *key) +{ + return key->connecting; +} + + +/* + * ioqueue_dispatch_event() + * + * Report occurence of an event in the key to be processed by the + * framework. + */ +void ioqueue_dispatch_write_event(pj_ioqueue_t *ioqueue, pj_ioqueue_key_t *h) +{ + /* Lock the key. */ + pj_mutex_lock(h->mutex); + +#if defined(PJ_HAS_TCP) && PJ_HAS_TCP!=0 + if (h->connecting) { + /* Completion of connect() operation */ + pj_ssize_t bytes_transfered; + + /* Clear operation. */ + h->connecting = 0; + + ioqueue_remove_from_set(ioqueue, h->fd, WRITEABLE_EVENT); + ioqueue_remove_from_set(ioqueue, h->fd, EXCEPTION_EVENT); + + /* Unlock; from this point we don't need to hold key's mutex. */ + pj_mutex_unlock(h->mutex); + +#if (defined(PJ_HAS_SO_ERROR) && PJ_HAS_SO_ERROR!=0) + /* from connect(2): + * On Linux, use getsockopt to read the SO_ERROR option at + * level SOL_SOCKET to determine whether connect() completed + * successfully (if SO_ERROR is zero). + */ + int value; + socklen_t vallen = sizeof(value); + int gs_rc = getsockopt(h->fd, SOL_SOCKET, SO_ERROR, + &value, &vallen); + if (gs_rc != 0) { + /* Argh!! What to do now??? + * Just indicate that the socket is connected. The + * application will get error as soon as it tries to use + * the socket to send/receive. + */ + bytes_transfered = 0; + } else { + bytes_transfered = value; + } +#elif defined(PJ_WIN32) && PJ_WIN32!=0 + bytes_transfered = 0; /* success */ +#else + /* Excellent information in D.J. Bernstein page: + * http://cr.yp.to/docs/connect.html + * + * Seems like the most portable way of detecting connect() + * failure is to call getpeername(). If socket is connected, + * getpeername() will return 0. If the socket is not connected, + * it will return ENOTCONN, and read(fd, &ch, 1) will produce + * the right errno through error slippage. This is a combination + * of suggestions from Douglas C. Schmidt and Ken Keys. + */ + int gp_rc; + struct sockaddr_in addr; + socklen_t addrlen = sizeof(addr); + + gp_rc = getpeername(h->fd, (struct sockaddr*)&addr, &addrlen); + bytes_transfered = gp_rc; +#endif + + /* Call callback. */ + if (h->cb.on_connect_complete) + (*h->cb.on_connect_complete)(h, bytes_transfered); + + /* Done. */ + + } else +#endif /* PJ_HAS_TCP */ + if (key_has_pending_write(h)) { + /* Socket is writable. */ + struct write_operation *write_op; + pj_ssize_t sent; + pj_status_t send_rc; + + /* Get the first in the queue. */ + write_op = h->write_list.next; + + /* For datagrams, we can remove the write_op from the list + * so that send() can work in parallel. + */ + if (h->fd_type == PJ_SOCK_DGRAM) { + pj_list_erase(write_op); + if (pj_list_empty(&h->write_list)) + ioqueue_remove_from_set(ioqueue, h->fd, WRITEABLE_EVENT); + + pj_mutex_unlock(h->mutex); + } + + /* Send the data. + * Unfortunately we must do this while holding key's mutex, thus + * preventing parallel write on a single key.. :-(( + */ + sent = write_op->size - write_op->written; + if (write_op->op == PJ_IOQUEUE_OP_SEND) { + send_rc = pj_sock_send(h->fd, write_op->buf+write_op->written, + &sent, write_op->flags); + } else if (write_op->op == PJ_IOQUEUE_OP_SEND_TO) { + send_rc = pj_sock_sendto(h->fd, + write_op->buf+write_op->written, + &sent, write_op->flags, + &write_op->rmt_addr, + write_op->rmt_addrlen); + } else { + pj_assert(!"Invalid operation type!"); + send_rc = PJ_EBUG; + } + + if (send_rc == PJ_SUCCESS) { + write_op->written += sent; + } else { + pj_assert(send_rc > 0); + write_op->written = -send_rc; + } + + /* Are we finished with this buffer? */ + if (send_rc!=PJ_SUCCESS || + write_op->written == (pj_ssize_t)write_op->size || + h->fd_type == PJ_SOCK_DGRAM) + { + if (h->fd_type != PJ_SOCK_DGRAM) { + /* Write completion of the whole stream. */ + pj_list_erase(write_op); + + /* Clear operation if there's no more data to send. */ + if (pj_list_empty(&h->write_list)) + ioqueue_remove_from_set(ioqueue, h->fd, WRITEABLE_EVENT); + + /* No need to hold mutex anymore */ + pj_mutex_unlock(h->mutex); + } + + /* Call callback. */ + if (h->cb.on_write_complete) { + (*h->cb.on_write_complete)(h, + (pj_ioqueue_op_key_t*)write_op, + write_op->written); + } + + } else { + pj_mutex_unlock(h->mutex); + } + + /* Done. */ + } else { + pj_assert(!"Descriptor is signaled but key " + "has no pending operation!"); + + pj_mutex_unlock(h->mutex); + } +} + +void ioqueue_dispatch_read_event( pj_ioqueue_t *ioqueue, pj_ioqueue_key_t *h ) +{ + pj_status_t rc; + + /* Lock the key. */ + pj_mutex_lock(h->mutex); + +# if PJ_HAS_TCP + if (!pj_list_empty(&h->accept_list)) { + + struct accept_operation *accept_op; + + /* Get one accept operation from the list. */ + accept_op = h->accept_list.next; + pj_list_erase(accept_op); + + /* Clear bit in fdset if there is no more pending accept */ + if (pj_list_empty(&h->accept_list)) + ioqueue_remove_from_set(ioqueue, h->fd, READABLE_EVENT); + + /* Unlock; from this point we don't need to hold key's mutex. */ + pj_mutex_unlock(h->mutex); + + rc=pj_sock_accept(h->fd, accept_op->accept_fd, + accept_op->rmt_addr, accept_op->addrlen); + if (rc==PJ_SUCCESS && accept_op->local_addr) { + rc = pj_sock_getsockname(*accept_op->accept_fd, + accept_op->local_addr, + accept_op->addrlen); + } + + /* Call callback. */ + if (h->cb.on_accept_complete) { + (*h->cb.on_accept_complete)(h, + (pj_ioqueue_op_key_t*)accept_op, + *accept_op->accept_fd, rc); + } + + } + else +# endif + if (key_has_pending_read(h)) { + struct read_operation *read_op; + pj_ssize_t bytes_read; + + /* Get one pending read operation from the list. */ + read_op = h->read_list.next; + pj_list_erase(read_op); + + /* Clear fdset if there is no pending read. */ + if (pj_list_empty(&h->read_list)) + ioqueue_remove_from_set(ioqueue, h->fd, READABLE_EVENT); + + /* Unlock; from this point we don't need to hold key's mutex. */ + pj_mutex_unlock(h->mutex); + + bytes_read = read_op->size; + + if ((read_op->op == PJ_IOQUEUE_OP_RECV_FROM)) { + rc = pj_sock_recvfrom(h->fd, read_op->buf, &bytes_read, 0, + read_op->rmt_addr, + read_op->rmt_addrlen); + } else if ((read_op->op == PJ_IOQUEUE_OP_RECV)) { + rc = pj_sock_recv(h->fd, read_op->buf, &bytes_read, 0); + } else { + pj_assert(read_op->op == PJ_IOQUEUE_OP_READ); + /* + * User has specified pj_ioqueue_read(). + * On Win32, we should do ReadFile(). But because we got + * here because of select() anyway, user must have put a + * socket descriptor on h->fd, which in this case we can + * just call pj_sock_recv() instead of ReadFile(). + * On Unix, user may put a file in h->fd, so we'll have + * to call read() here. + * This may not compile on systems which doesn't have + * read(). That's why we only specify PJ_LINUX here so + * that error is easier to catch. + */ +# if defined(PJ_WIN32) && PJ_WIN32 != 0 + rc = pj_sock_recv(h->fd, read_op->buf, &bytes_read, 0); + //rc = ReadFile((HANDLE)h->fd, read_op->buf, read_op->size, + // &bytes_read, NULL); +# elif (defined(PJ_HAS_UNISTD_H) && PJ_HAS_UNISTD_H != 0) + bytes_read = read(h->fd, read_op->buf, bytes_read); + rc = (bytes_read >= 0) ? PJ_SUCCESS : pj_get_os_error(); +# elif defined(PJ_LINUX_KERNEL) && PJ_LINUX_KERNEL != 0 + bytes_read = sys_read(h->fd, read_op->buf, bytes_read); + rc = (bytes_read >= 0) ? PJ_SUCCESS : -bytes_read; +# else +# error "Implement read() for this platform!" +# endif + } + + if (rc != PJ_SUCCESS) { +# if defined(PJ_WIN32) && PJ_WIN32 != 0 + /* On Win32, for UDP, WSAECONNRESET on the receive side + * indicates that previous sending has triggered ICMP Port + * Unreachable message. + * But we wouldn't know at this point which one of previous + * key that has triggered the error, since UDP socket can + * be shared! + * So we'll just ignore it! + */ + + if (rc == PJ_STATUS_FROM_OS(WSAECONNRESET)) { + //PJ_LOG(4,(THIS_FILE, + // "Ignored ICMP port unreach. on key=%p", h)); + } +# endif + + /* In any case we would report this to caller. */ + bytes_read = -rc; + } + + /* Call callback. */ + if (h->cb.on_read_complete) { + (*h->cb.on_read_complete)(h, + (pj_ioqueue_op_key_t*)read_op, + bytes_read); + } + + } else { + pj_mutex_unlock(h->mutex); + } +} + + +void ioqueue_dispatch_exception_event( pj_ioqueue_t *ioqueue, + pj_ioqueue_key_t *h ) +{ + pj_mutex_lock(h->mutex); + + if (!h->connecting) { + /* It is possible that more than one thread was woken up, thus + * the remaining thread will see h->connecting as zero because + * it has been processed by other thread. + */ + pj_mutex_unlock(h->mutex); + return; + } + + /* Clear operation. */ + h->connecting = 0; + + pj_mutex_unlock(h->mutex); + + ioqueue_remove_from_set(ioqueue, h->fd, WRITEABLE_EVENT); + ioqueue_remove_from_set(ioqueue, h->fd, EXCEPTION_EVENT); + + /* Call callback. */ + if (h->cb.on_connect_complete) + (*h->cb.on_connect_complete)(h, -1); +} + +/* + * pj_ioqueue_recv() + * + * Start asynchronous recv() from the socket. + */ +PJ_DEF(pj_status_t) pj_ioqueue_recv( pj_ioqueue_key_t *key, + pj_ioqueue_op_key_t *op_key, + void *buffer, + pj_ssize_t *length, + unsigned flags ) +{ + pj_status_t status; + pj_ssize_t size; + struct read_operation *read_op; + + PJ_ASSERT_RETURN(key && op_key && buffer && length, PJ_EINVAL); + PJ_CHECK_STACK(); + + /* Try to see if there's data immediately available. + */ + size = *length; + status = pj_sock_recv(key->fd, buffer, &size, flags); + if (status == PJ_SUCCESS) { + /* Yes! Data is available! */ + *length = size; + return PJ_SUCCESS; + } else { + /* If error is not EWOULDBLOCK (or EAGAIN on Linux), report + * the error to caller. + */ + if (status != PJ_STATUS_FROM_OS(PJ_BLOCKING_ERROR_VAL)) + return status; + } + + /* + * No data is immediately available. + * Must schedule asynchronous operation to the ioqueue. + */ + read_op = (struct read_operation*)op_key; + + read_op->op = PJ_IOQUEUE_OP_RECV; + read_op->buf = buffer; + read_op->size = *length; + read_op->flags = flags; + + pj_mutex_lock(key->mutex); + pj_list_insert_before(&key->read_list, read_op); + ioqueue_add_to_set(key->ioqueue, key->fd, READABLE_EVENT); + pj_mutex_unlock(key->mutex); + + return PJ_EPENDING; +} + +/* + * pj_ioqueue_recvfrom() + * + * Start asynchronous recvfrom() from the socket. + */ +PJ_DEF(pj_status_t) pj_ioqueue_recvfrom( pj_ioqueue_key_t *key, + pj_ioqueue_op_key_t *op_key, + void *buffer, + pj_ssize_t *length, + unsigned flags, + pj_sockaddr_t *addr, + int *addrlen) +{ + pj_status_t status; + pj_ssize_t size; + struct read_operation *read_op; + + PJ_ASSERT_RETURN(key && op_key && buffer && length, PJ_EINVAL); + PJ_CHECK_STACK(); + + /* Try to see if there's data immediately available. + */ + size = *length; + status = pj_sock_recvfrom(key->fd, buffer, &size, flags, + addr, addrlen); + if (status == PJ_SUCCESS) { + /* Yes! Data is available! */ + *length = size; + return PJ_SUCCESS; + } else { + /* If error is not EWOULDBLOCK (or EAGAIN on Linux), report + * the error to caller. + */ + if (status != PJ_STATUS_FROM_OS(PJ_BLOCKING_ERROR_VAL)) + return status; + } + + /* + * No data is immediately available. + * Must schedule asynchronous operation to the ioqueue. + */ + read_op = (struct read_operation*)op_key; + + read_op->op = PJ_IOQUEUE_OP_RECV_FROM; + read_op->buf = buffer; + read_op->size = *length; + read_op->flags = flags; + read_op->rmt_addr = addr; + read_op->rmt_addrlen = addrlen; + + pj_mutex_lock(key->mutex); + pj_list_insert_before(&key->read_list, read_op); + ioqueue_add_to_set(key->ioqueue, key->fd, READABLE_EVENT); + pj_mutex_unlock(key->mutex); + + return PJ_EPENDING; +} + +/* + * pj_ioqueue_send() + * + * Start asynchronous send() to the descriptor. + */ +PJ_DEF(pj_status_t) pj_ioqueue_send( pj_ioqueue_key_t *key, + pj_ioqueue_op_key_t *op_key, + const void *data, + pj_ssize_t *length, + unsigned flags) +{ + struct write_operation *write_op; + pj_status_t status; + pj_ssize_t sent; + + PJ_ASSERT_RETURN(key && op_key && data && length, PJ_EINVAL); + PJ_CHECK_STACK(); + + /* Fast track: + * Try to send data immediately, only if there's no pending write! + * Note: + * We are speculating that the list is empty here without properly + * acquiring ioqueue's mutex first. This is intentional, to maximize + * performance via parallelism. + * + * This should be safe, because: + * - by convention, we require caller to make sure that the + * key is not unregistered while other threads are invoking + * an operation on the same key. + * - pj_list_empty() is safe to be invoked by multiple threads, + * even when other threads are modifying the list. + */ + if (pj_list_empty(&key->write_list)) { + /* + * See if data can be sent immediately. + */ + sent = *length; + status = pj_sock_send(key->fd, data, &sent, flags); + if (status == PJ_SUCCESS) { + /* Success! */ + *length = sent; + return PJ_SUCCESS; + } else { + /* If error is not EWOULDBLOCK (or EAGAIN on Linux), report + * the error to caller. + */ + if (status != PJ_STATUS_FROM_OS(PJ_BLOCKING_ERROR_VAL)) { + return status; + } + } + } + + /* + * Schedule asynchronous send. + */ + write_op = (struct write_operation*)op_key; + write_op->op = PJ_IOQUEUE_OP_SEND; + write_op->buf = NULL; + write_op->size = *length; + write_op->written = 0; + write_op->flags = flags; + + pj_mutex_lock(key->mutex); + pj_list_insert_before(&key->write_list, write_op); + ioqueue_add_to_set(key->ioqueue, key->fd, WRITEABLE_EVENT); + pj_mutex_unlock(key->mutex); + + return PJ_EPENDING; +} + + +/* + * pj_ioqueue_sendto() + * + * Start asynchronous write() to the descriptor. + */ +PJ_DEF(pj_status_t) pj_ioqueue_sendto( pj_ioqueue_key_t *key, + pj_ioqueue_op_key_t *op_key, + const void *data, + pj_ssize_t *length, + unsigned flags, + const pj_sockaddr_t *addr, + int addrlen) +{ + struct write_operation *write_op; + pj_status_t status; + pj_ssize_t sent; + + PJ_ASSERT_RETURN(key && op_key && data && length, PJ_EINVAL); + PJ_CHECK_STACK(); + + /* Fast track: + * Try to send data immediately, only if there's no pending write! + * Note: + * We are speculating that the list is empty here without properly + * acquiring ioqueue's mutex first. This is intentional, to maximize + * performance via parallelism. + * + * This should be safe, because: + * - by convention, we require caller to make sure that the + * key is not unregistered while other threads are invoking + * an operation on the same key. + * - pj_list_empty() is safe to be invoked by multiple threads, + * even when other threads are modifying the list. + */ + if (pj_list_empty(&key->write_list)) { + /* + * See if data can be sent immediately. + */ + sent = *length; + status = pj_sock_sendto(key->fd, data, &sent, flags, addr, addrlen); + if (status == PJ_SUCCESS) { + /* Success! */ + *length = sent; + return PJ_SUCCESS; + } else { + /* If error is not EWOULDBLOCK (or EAGAIN on Linux), report + * the error to caller. + */ + if (status != PJ_STATUS_FROM_OS(PJ_BLOCKING_ERROR_VAL)) { + return status; + } + } + } + + /* + * Check that address storage can hold the address parameter. + */ + PJ_ASSERT_RETURN(addrlen <= sizeof(pj_sockaddr_in), PJ_EBUG); + + /* + * Schedule asynchronous send. + */ + write_op = (struct write_operation*)op_key; + write_op->op = PJ_IOQUEUE_OP_SEND_TO; + write_op->buf = NULL; + write_op->size = *length; + write_op->written = 0; + write_op->flags = flags; + pj_memcpy(&write_op->rmt_addr, addr, addrlen); + write_op->rmt_addrlen = addrlen; + + pj_mutex_lock(key->mutex); + pj_list_insert_before(&key->write_list, write_op); + ioqueue_add_to_set(key->ioqueue, key->fd, WRITEABLE_EVENT); + pj_mutex_unlock(key->mutex); + + return PJ_EPENDING; +} + +#if PJ_HAS_TCP +/* + * Initiate overlapped accept() operation. + */ +PJ_DEF(pj_status_t) pj_ioqueue_accept( pj_ioqueue_key_t *key, + pj_ioqueue_op_key_t *op_key, + pj_sock_t *new_sock, + pj_sockaddr_t *local, + pj_sockaddr_t *remote, + int *addrlen) +{ + struct accept_operation *accept_op; + pj_status_t status; + + /* check parameters. All must be specified! */ + PJ_ASSERT_RETURN(key && op_key && new_sock, PJ_EINVAL); + + /* Fast track: + * See if there's new connection available immediately. + */ + if (pj_list_empty(&key->accept_list)) { + status = pj_sock_accept(key->fd, new_sock, remote, addrlen); + if (status == PJ_SUCCESS) { + /* Yes! New connection is available! */ + if (local && addrlen) { + status = pj_sock_getsockname(*new_sock, local, addrlen); + if (status != PJ_SUCCESS) { + pj_sock_close(*new_sock); + *new_sock = PJ_INVALID_SOCKET; + return status; + } + } + return PJ_SUCCESS; + } else { + /* If error is not EWOULDBLOCK (or EAGAIN on Linux), report + * the error to caller. + */ + if (status != PJ_STATUS_FROM_OS(PJ_BLOCKING_ERROR_VAL)) { + return status; + } + } + } + + /* + * No connection is available immediately. + * Schedule accept() operation to be completed when there is incoming + * connection available. + */ + accept_op = (struct accept_operation*)op_key; + + accept_op->op = PJ_IOQUEUE_OP_ACCEPT; + accept_op->accept_fd = new_sock; + accept_op->rmt_addr = remote; + accept_op->addrlen= addrlen; + accept_op->local_addr = local; + + pj_mutex_lock(key->mutex); + pj_list_insert_before(&key->accept_list, accept_op); + ioqueue_add_to_set(key->ioqueue, key->fd, READABLE_EVENT); + pj_mutex_unlock(key->mutex); + + return PJ_EPENDING; +} + +/* + * Initiate overlapped connect() operation (well, it's non-blocking actually, + * since there's no overlapped version of connect()). + */ +PJ_DEF(pj_status_t) pj_ioqueue_connect( pj_ioqueue_key_t *key, + const pj_sockaddr_t *addr, + int addrlen ) +{ + pj_status_t status; + + /* check parameters. All must be specified! */ + PJ_ASSERT_RETURN(key && addr && addrlen, PJ_EINVAL); + + /* Check if socket has not been marked for connecting */ + if (key->connecting != 0) + return PJ_EPENDING; + + status = pj_sock_connect(key->fd, addr, addrlen); + if (status == PJ_SUCCESS) { + /* Connected! */ + return PJ_SUCCESS; + } else { + if (status == PJ_STATUS_FROM_OS(PJ_BLOCKING_CONNECT_ERROR_VAL)) { + /* Pending! */ + pj_mutex_lock(key->mutex); + key->connecting = PJ_TRUE; + ioqueue_add_to_set(key->ioqueue, key->fd, WRITEABLE_EVENT); + ioqueue_add_to_set(key->ioqueue, key->fd, EXCEPTION_EVENT); + pj_mutex_unlock(key->mutex); + return PJ_EPENDING; + } else { + /* Error! */ + return status; + } + } +} +#endif /* PJ_HAS_TCP */ + diff --git a/pjlib/src/pj/ioqueue_common_abs.h b/pjlib/src/pj/ioqueue_common_abs.h index c6fc1ff6..1902ff46 100644 --- a/pjlib/src/pj/ioqueue_common_abs.h +++ b/pjlib/src/pj/ioqueue_common_abs.h @@ -1,107 +1,108 @@ -/* $Id */
-
-/* ioqueue_common_abs.h
- *
- * This file contains private declarations for abstracting various
- * event polling/dispatching mechanisms (e.g. select, poll, epoll)
- * to the ioqueue.
- */
-
-#include <pj/list.h>
-
-/*
- * The select ioqueue relies on socket functions (pj_sock_xxx()) to return
- * the correct error code.
- */
-#if PJ_RETURN_OS_ERROR(100) != PJ_STATUS_FROM_OS(100)
-# error "Error reporting must be enabled for this function to work!"
-#endif
-
-
-struct generic_operation
-{
- PJ_DECL_LIST_MEMBER(struct generic_operation);
- pj_ioqueue_operation_e op;
-};
-
-struct read_operation
-{
- PJ_DECL_LIST_MEMBER(struct read_operation);
- pj_ioqueue_operation_e op;
-
- void *buf;
- pj_size_t size;
- unsigned flags;
- pj_sockaddr_t *rmt_addr;
- int *rmt_addrlen;
-};
-
-struct write_operation
-{
- PJ_DECL_LIST_MEMBER(struct write_operation);
- pj_ioqueue_operation_e op;
-
- char *buf;
- pj_size_t size;
- pj_ssize_t written;
- unsigned flags;
- pj_sockaddr_in rmt_addr;
- int rmt_addrlen;
-};
-
-#if PJ_HAS_TCP
-struct accept_operation
-{
- PJ_DECL_LIST_MEMBER(struct accept_operation);
- pj_ioqueue_operation_e op;
-
- pj_sock_t *accept_fd;
- pj_sockaddr_t *local_addr;
- pj_sockaddr_t *rmt_addr;
- int *addrlen;
-};
-#endif
-
-union operation_key
-{
- struct generic_operation generic;
- struct read_operation read;
- struct write_operation write;
-#if PJ_HAS_TCP
- struct accept_operation accept;
-#endif
-};
-
-#define DECLARE_COMMON_KEY \
- PJ_DECL_LIST_MEMBER(struct pj_ioqueue_key_t); \
- pj_ioqueue_t *ioqueue; \
- pj_mutex_t *mutex; \
- pj_sock_t fd; \
- int fd_type; \
- void *user_data; \
- pj_ioqueue_callback cb; \
- int connecting; \
- struct read_operation read_list; \
- struct write_operation write_list; \
- struct accept_operation accept_list;
-
-
-#define DECLARE_COMMON_IOQUEUE \
- pj_lock_t *lock; \
- pj_bool_t auto_delete_lock;
-
-
-enum ioqueue_event_type
-{
- NO_EVENT,
- READABLE_EVENT,
- WRITEABLE_EVENT,
- EXCEPTION_EVENT,
-};
-
-static void ioqueue_add_to_set( pj_ioqueue_t *ioqueue,
- pj_sock_t fd,
- enum ioqueue_event_type event_type );
-static void ioqueue_remove_from_set( pj_ioqueue_t *ioqueue,
- pj_sock_t fd,
- enum ioqueue_event_type event_type);
+/* $Id */ + +/* ioqueue_common_abs.h + * + * This file contains private declarations for abstracting various + * event polling/dispatching mechanisms (e.g. select, poll, epoll) + * to the ioqueue. + */ + +#include <pj/list.h> + +/* + * The select ioqueue relies on socket functions (pj_sock_xxx()) to return + * the correct error code. + */ +#if PJ_RETURN_OS_ERROR(100) != PJ_STATUS_FROM_OS(100) +# error "Proper error reporting must be enabled for ioqueue to work!" +#endif + + +struct generic_operation +{ + PJ_DECL_LIST_MEMBER(struct generic_operation); + pj_ioqueue_operation_e op; +}; + +struct read_operation +{ + PJ_DECL_LIST_MEMBER(struct read_operation); + pj_ioqueue_operation_e op; + + void *buf; + pj_size_t size; + unsigned flags; + pj_sockaddr_t *rmt_addr; + int *rmt_addrlen; +}; + +struct write_operation +{ + PJ_DECL_LIST_MEMBER(struct write_operation); + pj_ioqueue_operation_e op; + + char *buf; + pj_size_t size; + pj_ssize_t written; + unsigned flags; + pj_sockaddr_in rmt_addr; + int rmt_addrlen; +}; + +#if PJ_HAS_TCP +struct accept_operation +{ + PJ_DECL_LIST_MEMBER(struct accept_operation); + pj_ioqueue_operation_e op; + + pj_sock_t *accept_fd; + pj_sockaddr_t *local_addr; + pj_sockaddr_t *rmt_addr; + int *addrlen; +}; +#endif + +union operation_key +{ + struct generic_operation generic; + struct read_operation read; + struct write_operation write; +#if PJ_HAS_TCP + struct accept_operation accept; +#endif +}; + +#define DECLARE_COMMON_KEY \ + PJ_DECL_LIST_MEMBER(struct pj_ioqueue_key_t); \ + pj_ioqueue_t *ioqueue; \ + pj_mutex_t *mutex; \ + pj_sock_t fd; \ + int fd_type; \ + void *user_data; \ + pj_ioqueue_callback cb; \ + int connecting; \ + struct read_operation read_list; \ + struct write_operation write_list; \ + struct accept_operation accept_list; + + +#define DECLARE_COMMON_IOQUEUE \ + pj_lock_t *lock; \ + pj_bool_t auto_delete_lock; + + +enum ioqueue_event_type +{ + NO_EVENT, + READABLE_EVENT, + WRITEABLE_EVENT, + EXCEPTION_EVENT, +}; + +static void ioqueue_add_to_set( pj_ioqueue_t *ioqueue, + pj_sock_t fd, + enum ioqueue_event_type event_type ); +static void ioqueue_remove_from_set( pj_ioqueue_t *ioqueue, + pj_sock_t fd, + enum ioqueue_event_type event_type); + diff --git a/pjlib/src/pj/ioqueue_epoll.c b/pjlib/src/pj/ioqueue_epoll.c index 24f9bfbb..aa012531 100644 --- a/pjlib/src/pj/ioqueue_epoll.c +++ b/pjlib/src/pj/ioqueue_epoll.c @@ -1,5 +1,4 @@ /* $Id$ - * */ /* * ioqueue_epoll.c @@ -30,7 +29,7 @@ # define epoll_data data.ptr # define epoll_data_type void* -# define ioctl_val_type unsigned long* +# define ioctl_val_type unsigned long # define getsockopt_val_ptr int* # define os_getsockopt getsockopt # define os_ioctl ioctl @@ -126,51 +125,20 @@ #define THIS_FILE "ioq_epoll" -#define PJ_IOQUEUE_IS_READ_OP(op) ((op & PJ_IOQUEUE_OP_READ) || \ - (op & PJ_IOQUEUE_OP_RECV) || \ - (op & PJ_IOQUEUE_OP_RECV_FROM)) -#define PJ_IOQUEUE_IS_WRITE_OP(op) ((op & PJ_IOQUEUE_OP_WRITE) || \ - (op & PJ_IOQUEUE_OP_SEND) || \ - (op & PJ_IOQUEUE_OP_SEND_TO)) - - -#if PJ_HAS_TCP -# define PJ_IOQUEUE_IS_ACCEPT_OP(op) (op & PJ_IOQUEUE_OP_ACCEPT) -# define PJ_IOQUEUE_IS_CONNECT_OP(op) (op & PJ_IOQUEUE_OP_CONNECT) -#else -# define PJ_IOQUEUE_IS_ACCEPT_OP(op) 0 -# define PJ_IOQUEUE_IS_CONNECT_OP(op) 0 -#endif - - //#define TRACE_(expr) PJ_LOG(3,expr) #define TRACE_(expr) +/* + * Include common ioqueue abstraction. + */ +#include "ioqueue_common_abs.h" /* * This describes each key. */ struct pj_ioqueue_key_t { - PJ_DECL_LIST_MEMBER(struct pj_ioqueue_key_t) - pj_sock_t fd; - pj_ioqueue_operation_e op; - void *user_data; - pj_ioqueue_callback cb; - - void *rd_buf; - unsigned rd_flags; - pj_size_t rd_buflen; - void *wr_buf; - pj_size_t wr_buflen; - - pj_sockaddr_t *rmt_addr; - int *rmt_addrlen; - - pj_sockaddr_t *local_addr; - int *local_addrlen; - - pj_sock_t *accept_fd; + DECLARE_COMMON_KEY }; /* @@ -178,13 +146,18 @@ struct pj_ioqueue_key_t */ struct pj_ioqueue_t { - pj_lock_t *lock; - pj_bool_t auto_delete_lock; + DECLARE_COMMON_IOQUEUE + unsigned max, count; pj_ioqueue_key_t hlist; int epfd; }; +/* Include implementation for common abstraction after we declare + * pj_ioqueue_key_t and pj_ioqueue_t. + */ +#include "ioqueue_common_abs.c" + /* * pj_ioqueue_create() * @@ -192,37 +165,45 @@ struct pj_ioqueue_t */ PJ_DEF(pj_status_t) pj_ioqueue_create( pj_pool_t *pool, pj_size_t max_fd, - int max_threads, pj_ioqueue_t **p_ioqueue) { - pj_ioqueue_t *ioque; + pj_ioqueue_t *ioqueue; pj_status_t rc; + pj_lock_t *lock; - PJ_UNUSED_ARG(max_threads); + /* Check that arguments are valid. */ + PJ_ASSERT_RETURN(pool != NULL && p_ioqueue != NULL && + max_fd > 0, PJ_EINVAL); - if (max_fd > PJ_IOQUEUE_MAX_HANDLES) { - pj_assert(!"max_fd too large"); - return PJ_EINVAL; - } + /* Check that size of pj_ioqueue_op_key_t is sufficient */ + PJ_ASSERT_RETURN(sizeof(pj_ioqueue_op_key_t)-sizeof(void*) >= + sizeof(union operation_key), PJ_EBUG); + + ioqueue = pj_pool_alloc(pool, sizeof(pj_ioqueue_t)); + + ioqueue_init(ioqueue); - ioque = pj_pool_alloc(pool, sizeof(pj_ioqueue_t)); - ioque->max = max_fd; - ioque->count = 0; - pj_list_init(&ioque->hlist); + ioqueue->max = max_fd; + ioqueue->count = 0; + pj_list_init(&ioqueue->hlist); - rc = pj_lock_create_recursive_mutex(pool, "ioq%p", &ioque->lock); + rc = pj_lock_create_simple_mutex(pool, "ioq%p", &lock); if (rc != PJ_SUCCESS) return rc; - ioque->auto_delete_lock = PJ_TRUE; - ioque->epfd = os_epoll_create(max_fd); - if (ioque->epfd < 0) { + rc = pj_ioqueue_set_lock(ioqueue, lock, PJ_TRUE); + if (rc != PJ_SUCCESS) + return rc; + + ioqueue->epfd = os_epoll_create(max_fd); + if (ioqueue->epfd < 0) { + ioqueue_destroy(ioqueue); return PJ_RETURN_OS_ERROR(pj_get_native_os_error()); } - PJ_LOG(4, ("pjlib", "select() I/O Queue created (%p)", ioque)); + PJ_LOG(4, ("pjlib", "epoll I/O Queue created (%p)", ioqueue)); - *p_ioqueue = ioque; + *p_ioqueue = ioqueue; return PJ_SUCCESS; } @@ -231,47 +212,24 @@ PJ_DEF(pj_status_t) pj_ioqueue_create( pj_pool_t *pool, * * Destroy ioqueue. */ -PJ_DEF(pj_status_t) pj_ioqueue_destroy(pj_ioqueue_t *ioque) +PJ_DEF(pj_status_t) pj_ioqueue_destroy(pj_ioqueue_t *ioqueue) { - PJ_ASSERT_RETURN(ioque, PJ_EINVAL); - PJ_ASSERT_RETURN(ioque->epfd > 0, PJ_EINVALIDOP); - - pj_lock_acquire(ioque->lock); - os_close(ioque->epfd); - ioque->epfd = 0; - if (ioque->auto_delete_lock) - pj_lock_destroy(ioque->lock); - - return PJ_SUCCESS; -} - -/* - * pj_ioqueue_set_lock() - */ -PJ_DEF(pj_status_t) pj_ioqueue_set_lock( pj_ioqueue_t *ioque, - pj_lock_t *lock, - pj_bool_t auto_delete ) -{ - PJ_ASSERT_RETURN(ioque && lock, PJ_EINVAL); - - if (ioque->auto_delete_lock) { - pj_lock_destroy(ioque->lock); - } - - ioque->lock = lock; - ioque->auto_delete_lock = auto_delete; + PJ_ASSERT_RETURN(ioqueue, PJ_EINVAL); + PJ_ASSERT_RETURN(ioqueue->epfd > 0, PJ_EINVALIDOP); - return PJ_SUCCESS; + pj_lock_acquire(ioqueue->lock); + os_close(ioqueue->epfd); + ioqueue->epfd = 0; + return ioqueue_destroy(ioqueue); } - /* * pj_ioqueue_register_sock() * * Register a socket to ioqueue. */ PJ_DEF(pj_status_t) pj_ioqueue_register_sock( pj_pool_t *pool, - pj_ioqueue_t *ioque, + pj_ioqueue_t *ioqueue, pj_sock_t sock, void *user_data, const pj_ioqueue_callback *cb, @@ -283,12 +241,12 @@ PJ_DEF(pj_status_t) pj_ioqueue_register_sock( pj_pool_t *pool, int status; pj_status_t rc = PJ_SUCCESS; - PJ_ASSERT_RETURN(pool && ioque && sock != PJ_INVALID_SOCKET && + PJ_ASSERT_RETURN(pool && ioqueue && sock != PJ_INVALID_SOCKET && cb && p_key, PJ_EINVAL); - pj_lock_acquire(ioque->lock); + pj_lock_acquire(ioqueue->lock); - if (ioque->count >= ioque->max) { + if (ioqueue->count >= ioqueue->max) { rc = PJ_ETOOMANY; TRACE_((THIS_FILE, "pj_ioqueue_register_sock error: too many files")); goto on_return; @@ -305,16 +263,19 @@ PJ_DEF(pj_status_t) pj_ioqueue_register_sock( pj_pool_t *pool, /* Create key. */ key = (pj_ioqueue_key_t*)pj_pool_zalloc(pool, sizeof(pj_ioqueue_key_t)); - key->fd = sock; - key->user_data = user_data; - pj_memcpy(&key->cb, cb, sizeof(pj_ioqueue_callback)); + rc = ioqueue_init_key(pool, ioqueue, key, sock, user_data, cb); + if (rc != PJ_SUCCESS) { + key = NULL; + goto on_return; + } /* os_epoll_ctl. */ ev.events = EPOLLIN | EPOLLOUT | EPOLLERR; ev.epoll_data = (epoll_data_type)key; - status = os_epoll_ctl(ioque->epfd, EPOLL_CTL_ADD, sock, &ev); + status = os_epoll_ctl(ioqueue->epfd, EPOLL_CTL_ADD, sock, &ev); if (status < 0) { rc = pj_get_os_error(); + key = NULL; TRACE_((THIS_FILE, "pj_ioqueue_register_sock error: os_epoll_ctl rc=%d", status)); @@ -322,12 +283,12 @@ PJ_DEF(pj_status_t) pj_ioqueue_register_sock( pj_pool_t *pool, } /* Register */ - pj_list_insert_before(&ioque->hlist, key); - ++ioque->count; + pj_list_insert_before(&ioqueue->hlist, key); + ++ioqueue->count; on_return: *p_key = key; - pj_lock_release(ioque->lock); + pj_lock_release(ioqueue->lock); return rc; } @@ -337,179 +298,116 @@ on_return: * * Unregister handle from ioqueue. */ -PJ_DEF(pj_status_t) pj_ioqueue_unregister( pj_ioqueue_t *ioque, - pj_ioqueue_key_t *key) +PJ_DEF(pj_status_t) pj_ioqueue_unregister( pj_ioqueue_key_t *key) { + pj_ioqueue_t *ioqueue; struct epoll_event ev; int status; - PJ_ASSERT_RETURN(ioque && key, PJ_EINVAL); + PJ_ASSERT_RETURN(key != NULL, PJ_EINVAL); - pj_lock_acquire(ioque->lock); + ioqueue = key->ioqueue; + pj_lock_acquire(ioqueue->lock); - pj_assert(ioque->count > 0); - --ioque->count; + pj_assert(ioqueue->count > 0); + --ioqueue->count; pj_list_erase(key); ev.events = 0; ev.epoll_data = (epoll_data_type)key; - status = os_epoll_ctl( ioque->epfd, EPOLL_CTL_DEL, key->fd, &ev); + status = os_epoll_ctl( ioqueue->epfd, EPOLL_CTL_DEL, key->fd, &ev); if (status != 0) { pj_status_t rc = pj_get_os_error(); - pj_lock_release(ioque->lock); + pj_lock_release(ioqueue->lock); return rc; } - pj_lock_release(ioque->lock); + pj_lock_release(ioqueue->lock); + + /* Destroy the key. */ + ioqueue_destroy_key(key); + return PJ_SUCCESS; } -/* - * pj_ioqueue_get_user_data() - * - * Obtain value associated with a key. +/* ioqueue_remove_from_set() + * This function is called from ioqueue_dispatch_event() to instruct + * the ioqueue to remove the specified descriptor from ioqueue's descriptor + * set for the specified event. */ -PJ_DEF(void*) pj_ioqueue_get_user_data( pj_ioqueue_key_t *key ) +static void ioqueue_remove_from_set( pj_ioqueue_t *ioqueue, + pj_sock_t fd, + enum ioqueue_event_type event_type) { - PJ_ASSERT_RETURN(key != NULL, NULL); - return key->user_data; } +/* + * ioqueue_add_to_set() + * This function is called from pj_ioqueue_recv(), pj_ioqueue_send() etc + * to instruct the ioqueue to add the specified handle to ioqueue's descriptor + * set for the specified event. + */ +static void ioqueue_add_to_set( pj_ioqueue_t *ioqueue, + pj_sock_t fd, + enum ioqueue_event_type event_type ) +{ +} /* * pj_ioqueue_poll() * */ -PJ_DEF(int) pj_ioqueue_poll( pj_ioqueue_t *ioque, const pj_time_val *timeout) +PJ_DEF(int) pj_ioqueue_poll( pj_ioqueue_t *ioqueue, const pj_time_val *timeout) { int i, count, processed; - struct epoll_event events[16]; + struct epoll_event events[PJ_IOQUEUE_MAX_EVENTS_IN_SINGLE_POLL]; int msec; + struct queue { + pj_ioqueue_key_t *key; + enum ioqueue_event_type event_type; + } queue[PJ_IOQUEUE_MAX_EVENTS_IN_SINGLE_POLL]; PJ_CHECK_STACK(); msec = timeout ? PJ_TIME_VAL_MSEC(*timeout) : 9000; - count = os_epoll_wait( ioque->epfd, events, PJ_ARRAY_SIZE(events), msec); + count = os_epoll_wait( ioqueue->epfd, events, PJ_ARRAY_SIZE(events), msec); if (count <= 0) return count; /* Lock ioqueue. */ - pj_lock_acquire(ioque->lock); - - processed = 0; + pj_lock_acquire(ioqueue->lock); - for (i=0; i<count; ++i) { + for (processed=0, i=0; i<count; ++i) { pj_ioqueue_key_t *h = (pj_ioqueue_key_t*)(epoll_data_type) events[i].epoll_data; - pj_status_t rc; - - /* - * Check for completion of read operations. - */ - if ((events[i].events & EPOLLIN) && (PJ_IOQUEUE_IS_READ_OP(h->op))) { - pj_ssize_t bytes_read = h->rd_buflen; - - if ((h->op & PJ_IOQUEUE_OP_RECV_FROM)) { - rc = pj_sock_recvfrom( h->fd, h->rd_buf, &bytes_read, 0, - h->rmt_addr, h->rmt_addrlen); - } else if ((h->op & PJ_IOQUEUE_OP_RECV)) { - rc = pj_sock_recv(h->fd, h->rd_buf, &bytes_read, 0); - } else { - bytes_read = os_read( h->fd, h->rd_buf, bytes_read); - rc = (bytes_read >= 0) ? PJ_SUCCESS : pj_get_os_error(); - } - - if (rc != PJ_SUCCESS) { - bytes_read = -rc; - } - - h->op &= ~(PJ_IOQUEUE_OP_READ | PJ_IOQUEUE_OP_RECV | - PJ_IOQUEUE_OP_RECV_FROM); - - /* Call callback. */ - (*h->cb.on_read_complete)(h, bytes_read); - ++processed; - } /* - * Check for completion of accept() operation. + * Check readability. */ - else if ((events[i].events & EPOLLIN) && - (h->op & PJ_IOQUEUE_OP_ACCEPT)) - { - /* accept() must be the only operation specified on - * server socket - */ - pj_assert( h->op == PJ_IOQUEUE_OP_ACCEPT); - - rc = pj_sock_accept( h->fd, h->accept_fd, - h->rmt_addr, h->rmt_addrlen); - if (rc==PJ_SUCCESS && h->local_addr) { - rc = pj_sock_getsockname(*h->accept_fd, h->local_addr, - h->local_addrlen); - } - - h->op &= ~(PJ_IOQUEUE_OP_ACCEPT); - - /* Call callback. */ - (*h->cb.on_accept_complete)(h, *h->accept_fd, rc); - + if ((events[i].events & EPOLLIN) && + (key_has_pending_read(h) || key_has_pending_accept(h))) { + queue[processed].key = h; + queue[processed].event_type = READABLE_EVENT; ++processed; } /* - * Check for completion of write operations. + * Check for writeability. */ - if ((events[i].events & EPOLLOUT) && PJ_IOQUEUE_IS_WRITE_OP(h->op)) { - /* Completion of write(), send(), or sendto() operation. */ - - /* Clear operation. */ - h->op &= ~(PJ_IOQUEUE_OP_WRITE | PJ_IOQUEUE_OP_SEND | - PJ_IOQUEUE_OP_SEND_TO); - - /* Call callback. */ - /* All data must have been sent? */ - (*h->cb.on_write_complete)(h, h->wr_buflen); - + if ((events[i].events & EPOLLOUT) && key_has_pending_write(h)) { + queue[processed].key = h; + queue[processed].event_type = WRITEABLE_EVENT; ++processed; } + #if PJ_HAS_TCP /* * Check for completion of connect() operation. */ - else if ((events[i].events & EPOLLOUT) && - (h->op & PJ_IOQUEUE_OP_CONNECT)) - { - /* Completion of connect() operation */ - pj_ssize_t bytes_transfered; - - /* from connect(2): - * On Linux, use getsockopt to read the SO_ERROR option at - * level SOL_SOCKET to determine whether connect() completed - * successfully (if SO_ERROR is zero). - */ - int value; - socklen_t vallen = sizeof(value); - int gs_rc = os_getsockopt(h->fd, SOL_SOCKET, SO_ERROR, - (getsockopt_val_ptr)&value, &vallen); - if (gs_rc != 0) { - /* Argh!! What to do now??? - * Just indicate that the socket is connected. The - * application will get error as soon as it tries to use - * the socket to send/receive. - */ - bytes_transfered = 0; - } else { - bytes_transfered = value; - } - - /* Clear operation. */ - h->op &= (~PJ_IOQUEUE_OP_CONNECT); - - /* Call callback. */ - (*h->cb.on_connect_complete)(h, bytes_transfered); - + if ((events[i].events & EPOLLOUT) && (h->connecting)) { + queue[processed].key = h; + queue[processed].event_type = WRITEABLE_EVENT; ++processed; } #endif /* PJ_HAS_TCP */ @@ -517,321 +415,32 @@ PJ_DEF(int) pj_ioqueue_poll( pj_ioqueue_t *ioque, const pj_time_val *timeout) /* * Check for error condition. */ - if (events[i].events & EPOLLERR) { - if (h->op & PJ_IOQUEUE_OP_CONNECT) { - h->op &= ~PJ_IOQUEUE_OP_CONNECT; - - /* Call callback. */ - (*h->cb.on_connect_complete)(h, -1); - - ++processed; - } + if (events[i].events & EPOLLERR && (h->connecting)) { + queue[processed].key = h; + queue[processed].event_type = EXCEPTION_EVENT; + ++processed; } } - - pj_lock_release(ioque->lock); - - return processed; -} - -/* - * pj_ioqueue_read() - * - * Start asynchronous read from the descriptor. - */ -PJ_DEF(pj_status_t) pj_ioqueue_read( pj_ioqueue_t *ioque, - pj_ioqueue_key_t *key, - void *buffer, - pj_size_t buflen) -{ - PJ_ASSERT_RETURN(ioque && key && buffer, PJ_EINVAL); - PJ_CHECK_STACK(); - - /* For consistency with other ioqueue implementation, we would reject - * if descriptor has already been submitted for reading before. - */ - PJ_ASSERT_RETURN(((key->op & PJ_IOQUEUE_OP_READ) == 0 && - (key->op & PJ_IOQUEUE_OP_RECV) == 0 && - (key->op & PJ_IOQUEUE_OP_RECV_FROM) == 0), - PJ_EBUSY); - - pj_lock_acquire(ioque->lock); - - key->op |= PJ_IOQUEUE_OP_READ; - key->rd_flags = 0; - key->rd_buf = buffer; - key->rd_buflen = buflen; - - pj_lock_release(ioque->lock); - return PJ_EPENDING; -} - - -/* - * pj_ioqueue_recv() - * - * Start asynchronous recv() from the socket. - */ -PJ_DEF(pj_status_t) pj_ioqueue_recv( pj_ioqueue_t *ioque, - pj_ioqueue_key_t *key, - void *buffer, - pj_size_t buflen, - unsigned flags ) -{ - PJ_ASSERT_RETURN(ioque && key && buffer, PJ_EINVAL); - PJ_CHECK_STACK(); - - /* For consistency with other ioqueue implementation, we would reject - * if descriptor has already been submitted for reading before. - */ - PJ_ASSERT_RETURN(((key->op & PJ_IOQUEUE_OP_READ) == 0 && - (key->op & PJ_IOQUEUE_OP_RECV) == 0 && - (key->op & PJ_IOQUEUE_OP_RECV_FROM) == 0), - PJ_EBUSY); - - pj_lock_acquire(ioque->lock); - - key->op |= PJ_IOQUEUE_OP_RECV; - key->rd_buf = buffer; - key->rd_buflen = buflen; - key->rd_flags = flags; - - pj_lock_release(ioque->lock); - return PJ_EPENDING; -} - -/* - * pj_ioqueue_recvfrom() - * - * Start asynchronous recvfrom() from the socket. - */ -PJ_DEF(pj_status_t) pj_ioqueue_recvfrom( pj_ioqueue_t *ioque, - pj_ioqueue_key_t *key, - void *buffer, - pj_size_t buflen, - unsigned flags, - pj_sockaddr_t *addr, - int *addrlen) -{ - PJ_ASSERT_RETURN(ioque && key && buffer, PJ_EINVAL); - PJ_CHECK_STACK(); - - /* For consistency with other ioqueue implementation, we would reject - * if descriptor has already been submitted for reading before. - */ - PJ_ASSERT_RETURN(((key->op & PJ_IOQUEUE_OP_READ) == 0 && - (key->op & PJ_IOQUEUE_OP_RECV) == 0 && - (key->op & PJ_IOQUEUE_OP_RECV_FROM) == 0), - PJ_EBUSY); - - pj_lock_acquire(ioque->lock); - - key->op |= PJ_IOQUEUE_OP_RECV_FROM; - key->rd_buf = buffer; - key->rd_buflen = buflen; - key->rd_flags = flags; - key->rmt_addr = addr; - key->rmt_addrlen = addrlen; - - pj_lock_release(ioque->lock); - return PJ_EPENDING; -} - -/* - * pj_ioqueue_write() - * - * Start asynchronous write() to the descriptor. - */ -PJ_DEF(pj_status_t) pj_ioqueue_write( pj_ioqueue_t *ioque, - pj_ioqueue_key_t *key, - const void *data, - pj_size_t datalen) -{ - pj_status_t rc; - pj_ssize_t sent; - - PJ_ASSERT_RETURN(ioque && key && data, PJ_EINVAL); - PJ_CHECK_STACK(); - - /* For consistency with other ioqueue implementation, we would reject - * if descriptor has already been submitted for writing before. - */ - PJ_ASSERT_RETURN(((key->op & PJ_IOQUEUE_OP_WRITE) == 0 && - (key->op & PJ_IOQUEUE_OP_SEND) == 0 && - (key->op & PJ_IOQUEUE_OP_SEND_TO) == 0), - PJ_EBUSY); - - sent = datalen; - /* sent would be -1 after pj_sock_send() if it returns error. */ - rc = pj_sock_send(key->fd, data, &sent, 0); - if (rc != PJ_SUCCESS && rc != PJ_STATUS_FROM_OS(OSERR_EWOULDBLOCK)) { - return rc; - } - - pj_lock_acquire(ioque->lock); - - key->op |= PJ_IOQUEUE_OP_WRITE; - key->wr_buf = NULL; - key->wr_buflen = datalen; - - pj_lock_release(ioque->lock); - - return PJ_EPENDING; -} - -/* - * pj_ioqueue_send() - * - * Start asynchronous send() to the descriptor. - */ -PJ_DEF(pj_status_t) pj_ioqueue_send( pj_ioqueue_t *ioque, - pj_ioqueue_key_t *key, - const void *data, - pj_size_t datalen, - unsigned flags) -{ - pj_status_t rc; - pj_ssize_t sent; - - PJ_ASSERT_RETURN(ioque && key && data, PJ_EINVAL); - PJ_CHECK_STACK(); - - /* For consistency with other ioqueue implementation, we would reject - * if descriptor has already been submitted for writing before. - */ - PJ_ASSERT_RETURN(((key->op & PJ_IOQUEUE_OP_WRITE) == 0 && - (key->op & PJ_IOQUEUE_OP_SEND) == 0 && - (key->op & PJ_IOQUEUE_OP_SEND_TO) == 0), - PJ_EBUSY); - - sent = datalen; - /* sent would be -1 after pj_sock_send() if it returns error. */ - rc = pj_sock_send(key->fd, data, &sent, flags); - if (rc != PJ_SUCCESS && rc != PJ_STATUS_FROM_OS(OSERR_EWOULDBLOCK)) { - return rc; - } - - pj_lock_acquire(ioque->lock); - - key->op |= PJ_IOQUEUE_OP_SEND; - key->wr_buf = NULL; - key->wr_buflen = datalen; - - pj_lock_release(ioque->lock); - - return PJ_EPENDING; -} - - -/* - * pj_ioqueue_sendto() - * - * Start asynchronous write() to the descriptor. - */ -PJ_DEF(pj_status_t) pj_ioqueue_sendto( pj_ioqueue_t *ioque, - pj_ioqueue_key_t *key, - const void *data, - pj_size_t datalen, - unsigned flags, - const pj_sockaddr_t *addr, - int addrlen) -{ - pj_status_t rc; - pj_ssize_t sent; - - PJ_ASSERT_RETURN(ioque && key && data, PJ_EINVAL); - PJ_CHECK_STACK(); - - /* For consistency with other ioqueue implementation, we would reject - * if descriptor has already been submitted for writing before. - */ - PJ_ASSERT_RETURN(((key->op & PJ_IOQUEUE_OP_WRITE) == 0 && - (key->op & PJ_IOQUEUE_OP_SEND) == 0 && - (key->op & PJ_IOQUEUE_OP_SEND_TO) == 0), - PJ_EBUSY); - - sent = datalen; - /* sent would be -1 after pj_sock_sendto() if it returns error. */ - rc = pj_sock_sendto(key->fd, data, &sent, flags, addr, addrlen); - if (rc != PJ_SUCCESS && rc != PJ_STATUS_FROM_OS(OSERR_EWOULDBLOCK)) { - return rc; - } - - pj_lock_acquire(ioque->lock); - - key->op |= PJ_IOQUEUE_OP_SEND_TO; - key->wr_buf = NULL; - key->wr_buflen = datalen; - - pj_lock_release(ioque->lock); - return PJ_EPENDING; -} - -#if PJ_HAS_TCP -/* - * Initiate overlapped accept() operation. - */ -PJ_DEF(int) pj_ioqueue_accept( pj_ioqueue_t *ioqueue, - pj_ioqueue_key_t *key, - pj_sock_t *new_sock, - pj_sockaddr_t *local, - pj_sockaddr_t *remote, - int *addrlen) -{ - /* check parameters. All must be specified! */ - pj_assert(ioqueue && key && new_sock); - - /* Server socket must have no other operation! */ - pj_assert(key->op == 0); - - pj_lock_acquire(ioqueue->lock); - - key->op = PJ_IOQUEUE_OP_ACCEPT; - key->accept_fd = new_sock; - key->rmt_addr = remote; - key->rmt_addrlen = addrlen; - key->local_addr = local; - key->local_addrlen = addrlen; /* use same addr. as rmt_addrlen */ - pj_lock_release(ioqueue->lock); - return PJ_EPENDING; -} - -/* - * Initiate overlapped connect() operation (well, it's non-blocking actually, - * since there's no overlapped version of connect()). - */ -PJ_DEF(pj_status_t) pj_ioqueue_connect( pj_ioqueue_t *ioqueue, - pj_ioqueue_key_t *key, - const pj_sockaddr_t *addr, - int addrlen ) -{ - pj_status_t rc; - - /* check parameters. All must be specified! */ - PJ_ASSERT_RETURN(ioqueue && key && addr && addrlen, PJ_EINVAL); - /* Connecting socket must have no other operation! */ - PJ_ASSERT_RETURN(key->op == 0, PJ_EBUSY); - - rc = pj_sock_connect(key->fd, addr, addrlen); - if (rc == PJ_SUCCESS) { - /* Connected! */ - return PJ_SUCCESS; - } else { - if (rc == PJ_STATUS_FROM_OS(OSERR_EINPROGRESS) || - rc == PJ_STATUS_FROM_OS(OSERR_EWOULDBLOCK)) - { - /* Pending! */ - pj_lock_acquire(ioqueue->lock); - key->op = PJ_IOQUEUE_OP_CONNECT; - pj_lock_release(ioqueue->lock); - return PJ_EPENDING; - } else { - /* Error! */ - return rc; - } + /* Now process the events. */ + for (i=0; i<processed; ++i) { + switch (queue[i].event_type) { + case READABLE_EVENT: + ioqueue_dispatch_read_event(ioqueue, queue[i].key); + break; + case WRITEABLE_EVENT: + ioqueue_dispatch_write_event(ioqueue, queue[i].key); + break; + case EXCEPTION_EVENT: + ioqueue_dispatch_exception_event(ioqueue, queue[i].key); + break; + case NO_EVENT: + pj_assert(!"Invalid event!"); + break; + } } + + return processed; } -#endif /* PJ_HAS_TCP */ diff --git a/pjlib/src/pj/ioqueue_select.c b/pjlib/src/pj/ioqueue_select.c index 24e68564..c2051681 100644 --- a/pjlib/src/pj/ioqueue_select.c +++ b/pjlib/src/pj/ioqueue_select.c @@ -20,11 +20,11 @@ #include <pj/compat/socket.h> #include <pj/sock_select.h> #include <pj/errno.h> -
-/*
- * Include declaration from common abstraction.
- */
-#include "ioqueue_common_abs.h"
+ +/* + * Include declaration from common abstraction. + */ +#include "ioqueue_common_abs.h" /* * ISSUES with ioqueue_select() @@ -38,30 +38,30 @@ * */ #define THIS_FILE "ioq_select" -
-/*
- * The select ioqueue relies on socket functions (pj_sock_xxx()) to return
- * the correct error code.
- */
-#if PJ_RETURN_OS_ERROR(100) != PJ_STATUS_FROM_OS(100)
-# error "Error reporting must be enabled for this function to work!"
-#endif
-
-/**
- * Get the number of descriptors in the set. This is defined in sock_select.c
- * This function will only return the number of sockets set from PJ_FD_SET
- * operation. When the set is modified by other means (such as by select()),
- * the count will not be reflected here.
- *
- * That's why don't export this function in the header file, to avoid
- * misunderstanding.
- *
- * @param fdsetp The descriptor set.
- *
- * @return Number of descriptors in the set.
- */
-PJ_DECL(pj_size_t) PJ_FD_COUNT(const pj_fd_set_t *fdsetp);
-
+ +/* + * The select ioqueue relies on socket functions (pj_sock_xxx()) to return + * the correct error code. + */ +#if PJ_RETURN_OS_ERROR(100) != PJ_STATUS_FROM_OS(100) +# error "Error reporting must be enabled for this function to work!" +#endif + +/** + * Get the number of descriptors in the set. This is defined in sock_select.c + * This function will only return the number of sockets set from PJ_FD_SET + * operation. When the set is modified by other means (such as by select()), + * the count will not be reflected here. + * + * That's why don't export this function in the header file, to avoid + * misunderstanding. + * + * @param fdsetp The descriptor set. + * + * @return Number of descriptors in the set. + */ +PJ_DECL(pj_size_t) PJ_FD_COUNT(const pj_fd_set_t *fdsetp); + /* * During debugging build, VALIDATE_FD_SET is set. @@ -72,12 +72,12 @@ PJ_DECL(pj_size_t) PJ_FD_COUNT(const pj_fd_set_t *fdsetp); #else # define VALIDATE_FD_SET 0 #endif -
+ /* * This describes each key. */ struct pj_ioqueue_key_t -{
+{ DECLARE_COMMON_KEY }; @@ -86,7 +86,7 @@ struct pj_ioqueue_key_t */ struct pj_ioqueue_t { - DECLARE_COMMON_IOQUEUE
+ DECLARE_COMMON_IOQUEUE unsigned max, count; pj_ioqueue_key_t key_list; @@ -96,11 +96,11 @@ struct pj_ioqueue_t pj_fd_set_t xfdset; #endif }; -
-/* Include implementation for common abstraction after we declare
- * pj_ioqueue_key_t and pj_ioqueue_t.
- */
-#include "ioqueue_common_abs.c"
+ +/* Include implementation for common abstraction after we declare + * pj_ioqueue_key_t and pj_ioqueue_t. + */ +#include "ioqueue_common_abs.c" /* * pj_ioqueue_create() @@ -111,22 +111,22 @@ PJ_DEF(pj_status_t) pj_ioqueue_create( pj_pool_t *pool, pj_size_t max_fd, pj_ioqueue_t **p_ioqueue) { - pj_ioqueue_t *ioqueue;
+ pj_ioqueue_t *ioqueue; pj_lock_t *lock; pj_status_t rc; -
- /* Check that arguments are valid. */
- PJ_ASSERT_RETURN(pool != NULL && p_ioqueue != NULL &&
- max_fd > 0 && max_fd <= PJ_IOQUEUE_MAX_HANDLES,
- PJ_EINVAL);
-
- /* Check that size of pj_ioqueue_op_key_t is sufficient */
- PJ_ASSERT_RETURN(sizeof(pj_ioqueue_op_key_t)-sizeof(void*) >=
- sizeof(union operation_key), PJ_EBUG);
-
- ioqueue = pj_pool_alloc(pool, sizeof(pj_ioqueue_t));
-
- ioqueue_init(ioqueue);
+ + /* Check that arguments are valid. */ + PJ_ASSERT_RETURN(pool != NULL && p_ioqueue != NULL && + max_fd > 0 && max_fd <= PJ_IOQUEUE_MAX_HANDLES, + PJ_EINVAL); + + /* Check that size of pj_ioqueue_op_key_t is sufficient */ + PJ_ASSERT_RETURN(sizeof(pj_ioqueue_op_key_t)-sizeof(void*) >= + sizeof(union operation_key), PJ_EBUG); + + ioqueue = pj_pool_alloc(pool, sizeof(pj_ioqueue_t)); + + ioqueue_init(ioqueue); ioqueue->max = max_fd; ioqueue->count = 0; @@ -141,8 +141,8 @@ PJ_DEF(pj_status_t) pj_ioqueue_create( pj_pool_t *pool, if (rc != PJ_SUCCESS) return rc; - rc = pj_ioqueue_set_lock(ioqueue, lock, PJ_TRUE);
- if (rc != PJ_SUCCESS)
+ rc = pj_ioqueue_set_lock(ioqueue, lock, PJ_TRUE); + if (rc != PJ_SUCCESS) return rc; PJ_LOG(4, ("pjlib", "select() I/O Queue created (%p)", ioqueue)); @@ -159,8 +159,8 @@ PJ_DEF(pj_status_t) pj_ioqueue_create( pj_pool_t *pool, PJ_DEF(pj_status_t) pj_ioqueue_destroy(pj_ioqueue_t *ioqueue) { PJ_ASSERT_RETURN(ioqueue, PJ_EINVAL); -
- pj_lock_acquire(ioqueue->lock);
+ + pj_lock_acquire(ioqueue->lock); return ioqueue_destroy(ioqueue); } @@ -203,16 +203,18 @@ PJ_DEF(pj_status_t) pj_ioqueue_register_sock( pj_pool_t *pool, } /* Create key. */ - key = (pj_ioqueue_key_t*)pj_pool_zalloc(pool, sizeof(pj_ioqueue_key_t));
- rc = ioqueue_init_key(pool, ioqueue, key, sock, user_data, cb);
- if (rc != PJ_SUCCESS)
- return rc;
+ key = (pj_ioqueue_key_t*)pj_pool_zalloc(pool, sizeof(pj_ioqueue_key_t)); + rc = ioqueue_init_key(pool, ioqueue, key, sock, user_data, cb); + if (rc != PJ_SUCCESS) { + key = NULL; + goto on_return; + } /* Register */ pj_list_insert_before(&ioqueue->key_list, key); ++ioqueue->count; -on_return:
+on_return: /* On error, socket may be left in non-blocking mode. */ *p_key = key; pj_lock_release(ioqueue->lock); @@ -226,13 +228,13 @@ on_return: * Unregister handle from ioqueue. */ PJ_DEF(pj_status_t) pj_ioqueue_unregister( pj_ioqueue_key_t *key) -{
- pj_ioqueue_t *ioqueue;
+{ + pj_ioqueue_t *ioqueue; PJ_ASSERT_RETURN(key, PJ_EINVAL); -
- ioqueue = key->ioqueue;
-
+ + ioqueue = key->ioqueue; + pj_lock_acquire(ioqueue->lock); pj_assert(ioqueue->count > 0); @@ -243,21 +245,21 @@ PJ_DEF(pj_status_t) pj_ioqueue_unregister( pj_ioqueue_key_t *key) #if PJ_HAS_TCP PJ_FD_CLR(key->fd, &ioqueue->xfdset); #endif -
- /* ioqueue_destroy may try to acquire key's mutex.
- * Since normally the order of locking is to lock key's mutex first
- * then ioqueue's mutex, ioqueue_destroy may deadlock unless we
- * release ioqueue's mutex first.
- */
- pj_lock_release(ioqueue->lock);
-
- /* Destroy the key. */
- ioqueue_destroy_key(key);
+ + /* ioqueue_destroy may try to acquire key's mutex. + * Since normally the order of locking is to lock key's mutex first + * then ioqueue's mutex, ioqueue_destroy may deadlock unless we + * release ioqueue's mutex first. + */ + pj_lock_release(ioqueue->lock); + + /* Destroy the key. */ + ioqueue_destroy_key(key); return PJ_SUCCESS; } -
+ /* This supposed to check whether the fd_set values are consistent * with the operation currently set in each key. */ @@ -307,54 +309,54 @@ static void validate_sets(const pj_ioqueue_t *ioqueue, } } #endif /* VALIDATE_FD_SET */ -
- -/* ioqueue_remove_from_set()
- * This function is called from ioqueue_dispatch_event() to instruct
- * the ioqueue to remove the specified descriptor from ioqueue's descriptor
- * set for the specified event.
- */
-static void ioqueue_remove_from_set( pj_ioqueue_t *ioqueue,
- pj_sock_t fd,
- enum ioqueue_event_type event_type)
-{
- pj_lock_acquire(ioqueue->lock);
-
- if (event_type == READABLE_EVENT)
- PJ_FD_CLR((pj_sock_t)fd, &ioqueue->rfdset);
- else if (event_type == WRITEABLE_EVENT)
- PJ_FD_CLR((pj_sock_t)fd, &ioqueue->wfdset);
- else if (event_type == EXCEPTION_EVENT)
- PJ_FD_CLR((pj_sock_t)fd, &ioqueue->xfdset);
- else
- pj_assert(0);
-
- pj_lock_release(ioqueue->lock);
-}
-
-/*
- * ioqueue_add_to_set()
- * This function is called from pj_ioqueue_recv(), pj_ioqueue_send() etc
- * to instruct the ioqueue to add the specified handle to ioqueue's descriptor
- * set for the specified event.
- */
-static void ioqueue_add_to_set( pj_ioqueue_t *ioqueue,
- pj_sock_t fd,
- enum ioqueue_event_type event_type )
-{
- pj_lock_acquire(ioqueue->lock);
-
- if (event_type == READABLE_EVENT)
- PJ_FD_SET((pj_sock_t)fd, &ioqueue->rfdset);
- else if (event_type == WRITEABLE_EVENT)
- PJ_FD_SET((pj_sock_t)fd, &ioqueue->wfdset);
- else if (event_type == EXCEPTION_EVENT)
- PJ_FD_SET((pj_sock_t)fd, &ioqueue->xfdset);
- else
- pj_assert(0);
-
- pj_lock_release(ioqueue->lock);
-}
+ + +/* ioqueue_remove_from_set() + * This function is called from ioqueue_dispatch_event() to instruct + * the ioqueue to remove the specified descriptor from ioqueue's descriptor + * set for the specified event. + */ +static void ioqueue_remove_from_set( pj_ioqueue_t *ioqueue, + pj_sock_t fd, + enum ioqueue_event_type event_type) +{ + pj_lock_acquire(ioqueue->lock); + + if (event_type == READABLE_EVENT) + PJ_FD_CLR((pj_sock_t)fd, &ioqueue->rfdset); + else if (event_type == WRITEABLE_EVENT) + PJ_FD_CLR((pj_sock_t)fd, &ioqueue->wfdset); + else if (event_type == EXCEPTION_EVENT) + PJ_FD_CLR((pj_sock_t)fd, &ioqueue->xfdset); + else + pj_assert(0); + + pj_lock_release(ioqueue->lock); +} + +/* + * ioqueue_add_to_set() + * This function is called from pj_ioqueue_recv(), pj_ioqueue_send() etc + * to instruct the ioqueue to add the specified handle to ioqueue's descriptor + * set for the specified event. + */ +static void ioqueue_add_to_set( pj_ioqueue_t *ioqueue, + pj_sock_t fd, + enum ioqueue_event_type event_type ) +{ + pj_lock_acquire(ioqueue->lock); + + if (event_type == READABLE_EVENT) + PJ_FD_SET((pj_sock_t)fd, &ioqueue->rfdset); + else if (event_type == WRITEABLE_EVENT) + PJ_FD_SET((pj_sock_t)fd, &ioqueue->wfdset); + else if (event_type == EXCEPTION_EVENT) + PJ_FD_SET((pj_sock_t)fd, &ioqueue->xfdset); + else + pj_assert(0); + + pj_lock_release(ioqueue->lock); +} /* * pj_ioqueue_poll() @@ -378,19 +380,19 @@ PJ_DEF(int) pj_ioqueue_poll( pj_ioqueue_t *ioqueue, const pj_time_val *timeout) pj_fd_set_t rfdset, wfdset, xfdset; int count, counter; pj_ioqueue_key_t *h; - struct event
- {
- pj_ioqueue_key_t *key;
- enum event_type event_type;
- } event[PJ_IOQUEUE_MAX_EVENTS_IN_SINGLE_POLL];
-
- PJ_ASSERT_RETURN(ioqueue, PJ_EINVAL);
+ struct event + { + pj_ioqueue_key_t *key; + enum ioqueue_event_type event_type; + } event[PJ_IOQUEUE_MAX_EVENTS_IN_SINGLE_POLL]; + + PJ_ASSERT_RETURN(ioqueue, PJ_EINVAL); /* Lock ioqueue before making fd_set copies */ pj_lock_acquire(ioqueue->lock); -
- /* We will only do select() when there are sockets to be polled.
- * Otherwise select() will return error.
+ + /* We will only do select() when there are sockets to be polled. + * Otherwise select() will return error. */ if (PJ_FD_COUNT(&ioqueue->rfdset)==0 && PJ_FD_COUNT(&ioqueue->wfdset)==0 && @@ -422,71 +424,71 @@ PJ_DEF(int) pj_ioqueue_poll( pj_ioqueue_t *ioqueue, const pj_time_val *timeout) if (count <= 0) return count; - else if (count > PJ_IOQUEUE_MAX_EVENTS_IN_SINGLE_POLL)
- count = PJ_IOQUEUE_MAX_EVENTS_IN_SINGLE_POLL;
+ else if (count > PJ_IOQUEUE_MAX_EVENTS_IN_SINGLE_POLL) + count = PJ_IOQUEUE_MAX_EVENTS_IN_SINGLE_POLL; - /* Scan descriptor sets for event and add the events in the event
- * array to be processed later in this function. We do this so that
- * events can be processed in parallel without holding ioqueue lock.
+ /* Scan descriptor sets for event and add the events in the event + * array to be processed later in this function. We do this so that + * events can be processed in parallel without holding ioqueue lock. */ pj_lock_acquire(ioqueue->lock); -
- counter = 0;
- - /* Scan for writable sockets first to handle piggy-back data
- * coming with accept().
- */
- h = ioqueue->key_list.next;
- for ( ; h!=&ioqueue->key_list && counter<count; h = h->next) {
- if ( (key_has_pending_write(h) || key_has_pending_connect(h))
- && PJ_FD_ISSET(h->fd, &wfdset))
- {
- event[counter].key = h;
- event[counter].event_type = WRITEABLE_EVENT;
- ++counter;
- }
-
- /* Scan for readable socket. */
- if ((key_has_pending_read(h) || key_has_pending_accept(h))
- && PJ_FD_ISSET(h->fd, &rfdset))
- {
- event[counter].key = h;
- event[counter].event_type = READABLE_EVENT;
- ++counter; }
-
-#if PJ_HAS_TCP
- if (key_has_pending_connect(h) && PJ_FD_ISSET(h->fd, &xfdset)) {
- event[counter].key = h;
- event[counter].event_type = EXCEPTION_EVENT;
- ++counter;
- }
-#endif
- }
-
- pj_lock_release(ioqueue->lock);
-
- count = counter;
-
- /* Now process all events. The dispatch functions will take care
- * of locking in each of the key
- */
- for (counter=0; counter<count; ++counter) {
- switch (event[counter].event_type) {
- case READABLE_EVENT:
- ioqueue_dispatch_read_event(ioqueue, event[counter].key);
- break;
- case WRITEABLE_EVENT:
- ioqueue_dispatch_write_event(ioqueue, event[counter].key);
- break;
- case EXCEPTION_EVENT:
- ioqueue_dispatch_exception_event(ioqueue, event[counter].key);
- break;
- case NO_EVENT:
- default:
- pj_assert(!"Invalid event!");
- break;
- }
- }
+ + counter = 0; + + /* Scan for writable sockets first to handle piggy-back data + * coming with accept(). + */ + h = ioqueue->key_list.next; + for ( ; h!=&ioqueue->key_list && counter<count; h = h->next) { + if ( (key_has_pending_write(h) || key_has_pending_connect(h)) + && PJ_FD_ISSET(h->fd, &wfdset)) + { + event[counter].key = h; + event[counter].event_type = WRITEABLE_EVENT; + ++counter; + } + + /* Scan for readable socket. */ + if ((key_has_pending_read(h) || key_has_pending_accept(h)) + && PJ_FD_ISSET(h->fd, &rfdset)) + { + event[counter].key = h; + event[counter].event_type = READABLE_EVENT; + ++counter; + } + +#if PJ_HAS_TCP + if (key_has_pending_connect(h) && PJ_FD_ISSET(h->fd, &xfdset)) { + event[counter].key = h; + event[counter].event_type = EXCEPTION_EVENT; + ++counter; + } +#endif + } + + pj_lock_release(ioqueue->lock); + + count = counter; + + /* Now process all events. The dispatch functions will take care + * of locking in each of the key + */ + for (counter=0; counter<count; ++counter) { + switch (event[counter].event_type) { + case READABLE_EVENT: + ioqueue_dispatch_read_event(ioqueue, event[counter].key); + break; + case WRITEABLE_EVENT: + ioqueue_dispatch_write_event(ioqueue, event[counter].key); + break; + case EXCEPTION_EVENT: + ioqueue_dispatch_exception_event(ioqueue, event[counter].key); + break; + case NO_EVENT: + pj_assert(!"Invalid event!"); + break; + } + } return count; } diff --git a/pjlib/src/pj/os_core_unix.c b/pjlib/src/pj/os_core_unix.c index 18f4ded2..0c951e99 100644 --- a/pjlib/src/pj/os_core_unix.c +++ b/pjlib/src/pj/os_core_unix.c @@ -715,12 +715,14 @@ static pj_status_t init_mutex(pj_mutex_t *mutex, const char *name, int type) if (type == PJ_MUTEX_SIMPLE) { #if defined(PJ_LINUX) && PJ_LINUX!=0 + extern int pthread_mutexattr_settype(pthread_mutexattr_t*,int); rc = pthread_mutexattr_settype(&attr, PTHREAD_MUTEX_FAST_NP); #else rc = pthread_mutexattr_settype(&attr, PTHREAD_MUTEX_NORMAL); #endif } else { #if defined(PJ_LINUX) && PJ_LINUX!=0 + extern int pthread_mutexattr_settype(pthread_mutexattr_t*,int); rc = pthread_mutexattr_settype(&attr, PTHREAD_MUTEX_RECURSIVE_NP); #else rc = pthread_mutexattr_settype(&attr, PTHREAD_MUTEX_RECURSIVE); diff --git a/pjlib/src/pj/sock_bsd.c b/pjlib/src/pj/sock_bsd.c index 92495df2..21c10d19 100644 --- a/pjlib/src/pj/sock_bsd.c +++ b/pjlib/src/pj/sock_bsd.c @@ -104,7 +104,9 @@ PJ_DEF(pj_uint32_t) pj_htonl(pj_uint32_t hostlong) */ PJ_DEF(char*) pj_inet_ntoa(pj_in_addr inaddr) { - return inet_ntoa(*(struct in_addr*)&inaddr); + struct in_addr addr; + addr.s_addr = inaddr.s_addr; + return inet_ntoa(addr); } /* diff --git a/pjlib/src/pjlib-test/ioq_perf.c b/pjlib/src/pjlib-test/ioq_perf.c index 3305fb60..a4ee005c 100644 --- a/pjlib/src/pjlib-test/ioq_perf.c +++ b/pjlib/src/pjlib-test/ioq_perf.c @@ -38,9 +38,9 @@ typedef struct test_item client_fd; pj_ioqueue_t *ioqueue; pj_ioqueue_key_t *server_key, - *client_key;
- pj_ioqueue_op_key_t recv_op,
- send_op;
+ *client_key; + pj_ioqueue_op_key_t recv_op, + send_op; int has_pending_send; pj_size_t buffer_size; char *outgoing_buffer; @@ -52,16 +52,16 @@ typedef struct test_item /* Callback when data has been read. * Increment item->bytes_recv and ready to read the next data. */ -static void on_read_complete(pj_ioqueue_key_t *key,
- pj_ioqueue_op_key_t *op_key,
+static void on_read_complete(pj_ioqueue_key_t *key, + pj_ioqueue_op_key_t *op_key, pj_ssize_t bytes_read) { test_item *item = pj_ioqueue_get_user_data(key); - pj_status_t rc;
+ pj_status_t rc; int data_is_available = 1; //TRACE_((THIS_FILE, " read complete, bytes_read=%d", bytes_read)); -
+ do { if (thread_quit_flag) return; @@ -76,7 +76,7 @@ static void on_read_complete(pj_ioqueue_key_t *key, PJ_LOG(3,(THIS_FILE, "...error: read error, bytes_read=%d (%s)", bytes_read, errmsg)); PJ_LOG(3,(THIS_FILE, - ".....additional info: total read=%u, total written=%u", + ".....additional info: total read=%u, total sent=%u", item->bytes_recv, item->bytes_sent)); } else { last_error_counter++; @@ -94,44 +94,44 @@ static void on_read_complete(pj_ioqueue_key_t *key, */ if (item->bytes_recv > item->buffer_size * 10000) thread_quit_flag = 1; -
+ bytes_read = item->buffer_size; rc = pj_ioqueue_recv( key, op_key, item->incoming_buffer, &bytes_read, 0 ); - if (rc == PJ_SUCCESS) {
- data_is_available = 1;
- } else if (rc == PJ_EPENDING) {
- data_is_available = 0;
- } else {
+ if (rc == PJ_SUCCESS) { + data_is_available = 1; + } else if (rc == PJ_EPENDING) { + data_is_available = 0; + } else { data_is_available = 0; if (rc != last_error) { last_error = rc; - app_perror("...error: read error", rc); + app_perror("...error: read error(1)", rc); } else { last_error_counter++; } - }
-
- if (!item->has_pending_send) {
- pj_ssize_t sent = item->buffer_size;
- rc = pj_ioqueue_send(item->client_key, &item->send_op,
- item->outgoing_buffer, &sent, 0);
- if (rc != PJ_SUCCESS && rc != PJ_EPENDING) {
- app_perror("...error: write error", rc);
- }
-
- item->has_pending_send = (rc==PJ_EPENDING);
- }
-
+ } + + if (!item->has_pending_send) { + pj_ssize_t sent = item->buffer_size; + rc = pj_ioqueue_send(item->client_key, &item->send_op, + item->outgoing_buffer, &sent, 0); + if (rc != PJ_SUCCESS && rc != PJ_EPENDING) { + app_perror("...error: write error", rc); + } + + item->has_pending_send = (rc==PJ_EPENDING); + } + } while (data_is_available); } /* Callback when data has been written. * Increment item->bytes_sent and write the next data. */ -static void on_write_complete(pj_ioqueue_key_t *key,
- pj_ioqueue_op_key_t *op_key,
+static void on_write_complete(pj_ioqueue_key_t *key, + pj_ioqueue_op_key_t *op_key, pj_ssize_t bytes_sent) { test_item *item = pj_ioqueue_get_user_data(key); @@ -140,7 +140,7 @@ static void on_write_complete(pj_ioqueue_key_t *key, if (thread_quit_flag) return; -
+ item->has_pending_send = 0; item->bytes_sent += bytes_sent; @@ -150,14 +150,14 @@ static void on_write_complete(pj_ioqueue_key_t *key, } else { pj_status_t rc; -
+ bytes_sent = item->buffer_size; rc = pj_ioqueue_send( item->client_key, op_key, item->outgoing_buffer, &bytes_sent, 0); if (rc != PJ_SUCCESS && rc != PJ_EPENDING) { app_perror("...error: write error", rc); - }
-
+ } + item->has_pending_send = (rc==PJ_EPENDING); } } @@ -231,7 +231,7 @@ static int perform_test(int sock_type, const char *type_name, /* Initialize each producer-consumer pair. */ for (i=0; i<sockpair_cnt; ++i) { - pj_ssize_t bytes;
+ pj_ssize_t bytes; items[i].ioqueue = ioqueue; items[i].buffer_size = buffer_size; @@ -274,7 +274,7 @@ static int perform_test(int sock_type, const char *type_name, } /* Start reading. */ - TRACE_((THIS_FILE, " pj_ioqueue_recv.."));
+ TRACE_((THIS_FILE, " pj_ioqueue_recv..")); bytes = items[i].buffer_size; rc = pj_ioqueue_recv(items[i].server_key, &items[i].recv_op, items[i].incoming_buffer, &bytes, @@ -285,7 +285,7 @@ static int perform_test(int sock_type, const char *type_name, } /* Start writing. */ - TRACE_((THIS_FILE, " pj_ioqueue_write.."));
+ TRACE_((THIS_FILE, " pj_ioqueue_write..")); bytes = items[i].buffer_size; rc = pj_ioqueue_send(items[i].client_key, &items[i].recv_op, items[i].outgoing_buffer, &bytes, 0); @@ -293,7 +293,7 @@ static int perform_test(int sock_type, const char *type_name, app_perror("...error: pj_ioqueue_write", rc); return -76; } -
+ items[i].has_pending_send = (rc==PJ_EPENDING); } |