summaryrefslogtreecommitdiff
path: root/pjlib/src/pj/ioqueue_common_abs.c
diff options
context:
space:
mode:
Diffstat (limited to 'pjlib/src/pj/ioqueue_common_abs.c')
-rw-r--r--pjlib/src/pj/ioqueue_common_abs.c1910
1 files changed, 955 insertions, 955 deletions
diff --git a/pjlib/src/pj/ioqueue_common_abs.c b/pjlib/src/pj/ioqueue_common_abs.c
index b383a31f..285aefc9 100644
--- a/pjlib/src/pj/ioqueue_common_abs.c
+++ b/pjlib/src/pj/ioqueue_common_abs.c
@@ -1,955 +1,955 @@
-/* $Id$ */
-/*
- * Copyright (C)2003-2006 Benny Prijono <benny@prijono.org>
- *
- * This program is free software; you can redistribute it and/or modify
- * it under the terms of the GNU General Public License as published by
- * the Free Software Foundation; either version 2 of the License, or
- * (at your option) any later version.
- *
- * This program is distributed in the hope that it will be useful,
- * but WITHOUT ANY WARRANTY; without even the implied warranty of
- * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
- * GNU General Public License for more details.
- *
- * You should have received a copy of the GNU General Public License
- * along with this program; if not, write to the Free Software
- * Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
- */
-
-/*
- * ioqueue_common_abs.c
- *
- * This contains common functionalities to emulate proactor pattern with
- * various event dispatching mechanisms (e.g. select, epoll).
- *
- * This file will be included by the appropriate ioqueue implementation.
- * This file is NOT supposed to be compiled as stand-alone source.
- */
-
-static void ioqueue_init( pj_ioqueue_t *ioqueue )
-{
- ioqueue->lock = NULL;
- ioqueue->auto_delete_lock = 0;
-}
-
-static pj_status_t ioqueue_destroy(pj_ioqueue_t *ioqueue)
-{
- if (ioqueue->auto_delete_lock && ioqueue->lock ) {
- pj_lock_release(ioqueue->lock);
- return pj_lock_destroy(ioqueue->lock);
- } else
- return PJ_SUCCESS;
-}
-
-/*
- * pj_ioqueue_set_lock()
- */
-PJ_DEF(pj_status_t) pj_ioqueue_set_lock( pj_ioqueue_t *ioqueue,
- pj_lock_t *lock,
- pj_bool_t auto_delete )
-{
- PJ_ASSERT_RETURN(ioqueue && lock, PJ_EINVAL);
-
- if (ioqueue->auto_delete_lock && ioqueue->lock) {
- pj_lock_destroy(ioqueue->lock);
- }
-
- ioqueue->lock = lock;
- ioqueue->auto_delete_lock = auto_delete;
-
- return PJ_SUCCESS;
-}
-
-static pj_status_t ioqueue_init_key( pj_pool_t *pool,
- pj_ioqueue_t *ioqueue,
- pj_ioqueue_key_t *key,
- pj_sock_t sock,
- void *user_data,
- const pj_ioqueue_callback *cb)
-{
- pj_status_t rc;
- int optlen;
-
- key->ioqueue = ioqueue;
- key->fd = sock;
- key->user_data = user_data;
- pj_list_init(&key->read_list);
- pj_list_init(&key->write_list);
-#if PJ_HAS_TCP
- pj_list_init(&key->accept_list);
-#endif
-
- /* Save callback. */
- pj_memcpy(&key->cb, cb, sizeof(pj_ioqueue_callback));
-
- /* Get socket type. When socket type is datagram, some optimization
- * will be performed during send to allow parallel send operations.
- */
- optlen = sizeof(key->fd_type);
- rc = pj_sock_getsockopt(sock, PJ_SOL_SOCKET, PJ_SO_TYPE,
- &key->fd_type, &optlen);
- if (rc != PJ_SUCCESS)
- key->fd_type = PJ_SOCK_STREAM;
-
- /* Create mutex for the key. */
- rc = pj_mutex_create_simple(pool, NULL, &key->mutex);
-
- return rc;
-}
-
-static void ioqueue_destroy_key( pj_ioqueue_key_t *key )
-{
- pj_mutex_destroy(key->mutex);
-}
-
-/*
- * pj_ioqueue_get_user_data()
- *
- * Obtain value associated with a key.
- */
-PJ_DEF(void*) pj_ioqueue_get_user_data( pj_ioqueue_key_t *key )
-{
- PJ_ASSERT_RETURN(key != NULL, NULL);
- return key->user_data;
-}
-
-/*
- * pj_ioqueue_set_user_data()
- */
-PJ_DEF(pj_status_t) pj_ioqueue_set_user_data( pj_ioqueue_key_t *key,
- void *user_data,
- void **old_data)
-{
- PJ_ASSERT_RETURN(key, PJ_EINVAL);
-
- if (old_data)
- *old_data = key->user_data;
- key->user_data = user_data;
-
- return PJ_SUCCESS;
-}
-
-PJ_INLINE(int) key_has_pending_write(pj_ioqueue_key_t *key)
-{
- return !pj_list_empty(&key->write_list);
-}
-
-PJ_INLINE(int) key_has_pending_read(pj_ioqueue_key_t *key)
-{
- return !pj_list_empty(&key->read_list);
-}
-
-PJ_INLINE(int) key_has_pending_accept(pj_ioqueue_key_t *key)
-{
-#if PJ_HAS_TCP
- return !pj_list_empty(&key->accept_list);
-#else
- return 0;
-#endif
-}
-
-PJ_INLINE(int) key_has_pending_connect(pj_ioqueue_key_t *key)
-{
- return key->connecting;
-}
-
-
-/*
- * ioqueue_dispatch_event()
- *
- * Report occurence of an event in the key to be processed by the
- * framework.
- */
-void ioqueue_dispatch_write_event(pj_ioqueue_t *ioqueue, pj_ioqueue_key_t *h)
-{
- /* Lock the key. */
- pj_mutex_lock(h->mutex);
-
-#if defined(PJ_HAS_TCP) && PJ_HAS_TCP!=0
- if (h->connecting) {
- /* Completion of connect() operation */
- pj_ssize_t bytes_transfered;
-
- /* Clear operation. */
- h->connecting = 0;
-
- ioqueue_remove_from_set(ioqueue, h->fd, WRITEABLE_EVENT);
- ioqueue_remove_from_set(ioqueue, h->fd, EXCEPTION_EVENT);
-
- /* Unlock; from this point we don't need to hold key's mutex. */
- pj_mutex_unlock(h->mutex);
-
-#if (defined(PJ_HAS_SO_ERROR) && PJ_HAS_SO_ERROR!=0)
- /* from connect(2):
- * On Linux, use getsockopt to read the SO_ERROR option at
- * level SOL_SOCKET to determine whether connect() completed
- * successfully (if SO_ERROR is zero).
- */
- {
- int value;
- socklen_t vallen = sizeof(value);
- int gs_rc = getsockopt(h->fd, SOL_SOCKET, SO_ERROR,
- &value, &vallen);
- if (gs_rc != 0) {
- /* Argh!! What to do now???
- * Just indicate that the socket is connected. The
- * application will get error as soon as it tries to use
- * the socket to send/receive.
- */
- bytes_transfered = 0;
- } else {
- bytes_transfered = value;
- }
- }
-#elif defined(PJ_WIN32) && PJ_WIN32!=0
- bytes_transfered = 0; /* success */
-#else
- /* Excellent information in D.J. Bernstein page:
- * http://cr.yp.to/docs/connect.html
- *
- * Seems like the most portable way of detecting connect()
- * failure is to call getpeername(). If socket is connected,
- * getpeername() will return 0. If the socket is not connected,
- * it will return ENOTCONN, and read(fd, &ch, 1) will produce
- * the right errno through error slippage. This is a combination
- * of suggestions from Douglas C. Schmidt and Ken Keys.
- */
- int gp_rc;
- struct sockaddr_in addr;
- socklen_t addrlen = sizeof(addr);
-
- gp_rc = getpeername(h->fd, (struct sockaddr*)&addr, &addrlen);
- bytes_transfered = gp_rc;
-#endif
-
- /* Call callback. */
- if (h->cb.on_connect_complete)
- (*h->cb.on_connect_complete)(h, bytes_transfered);
-
- /* Done. */
-
- } else
-#endif /* PJ_HAS_TCP */
- if (key_has_pending_write(h)) {
- /* Socket is writable. */
- struct write_operation *write_op;
- pj_ssize_t sent;
- pj_status_t send_rc;
-
- /* Get the first in the queue. */
- write_op = h->write_list.next;
-
- /* For datagrams, we can remove the write_op from the list
- * so that send() can work in parallel.
- */
- if (h->fd_type == PJ_SOCK_DGRAM) {
- pj_list_erase(write_op);
- write_op->op = 0;
-
- if (pj_list_empty(&h->write_list))
- ioqueue_remove_from_set(ioqueue, h->fd, WRITEABLE_EVENT);
-
- pj_mutex_unlock(h->mutex);
- }
-
- /* Send the data.
- * Unfortunately we must do this while holding key's mutex, thus
- * preventing parallel write on a single key.. :-((
- */
- sent = write_op->size - write_op->written;
- if (write_op->op == PJ_IOQUEUE_OP_SEND) {
- send_rc = pj_sock_send(h->fd, write_op->buf+write_op->written,
- &sent, write_op->flags);
- } else if (write_op->op == PJ_IOQUEUE_OP_SEND_TO) {
- send_rc = pj_sock_sendto(h->fd,
- write_op->buf+write_op->written,
- &sent, write_op->flags,
- &write_op->rmt_addr,
- write_op->rmt_addrlen);
- } else {
- pj_assert(!"Invalid operation type!");
- send_rc = PJ_EBUG;
- }
-
- if (send_rc == PJ_SUCCESS) {
- write_op->written += sent;
- } else {
- pj_assert(send_rc > 0);
- write_op->written = -send_rc;
- }
-
- /* Are we finished with this buffer? */
- if (send_rc!=PJ_SUCCESS ||
- write_op->written == (pj_ssize_t)write_op->size ||
- h->fd_type == PJ_SOCK_DGRAM)
- {
- if (h->fd_type != PJ_SOCK_DGRAM) {
- /* Write completion of the whole stream. */
- pj_list_erase(write_op);
- write_op->op = 0;
-
- /* Clear operation if there's no more data to send. */
- if (pj_list_empty(&h->write_list))
- ioqueue_remove_from_set(ioqueue, h->fd, WRITEABLE_EVENT);
-
- /* No need to hold mutex anymore */
- pj_mutex_unlock(h->mutex);
- }
-
- /* Call callback. */
- if (h->cb.on_write_complete) {
- (*h->cb.on_write_complete)(h,
- (pj_ioqueue_op_key_t*)write_op,
- write_op->written);
- }
-
- } else {
- pj_mutex_unlock(h->mutex);
- }
-
- /* Done. */
- } else {
- /*
- * This is normal; execution may fall here when multiple threads
- * are signalled for the same event, but only one thread eventually
- * able to process the event.
- */
- pj_mutex_unlock(h->mutex);
- }
-}
-
-void ioqueue_dispatch_read_event( pj_ioqueue_t *ioqueue, pj_ioqueue_key_t *h )
-{
- pj_status_t rc;
-
- /* Lock the key. */
- pj_mutex_lock(h->mutex);
-
-# if PJ_HAS_TCP
- if (!pj_list_empty(&h->accept_list)) {
-
- struct accept_operation *accept_op;
-
- /* Get one accept operation from the list. */
- accept_op = h->accept_list.next;
- pj_list_erase(accept_op);
- accept_op->op = 0;
-
- /* Clear bit in fdset if there is no more pending accept */
- if (pj_list_empty(&h->accept_list))
- ioqueue_remove_from_set(ioqueue, h->fd, READABLE_EVENT);
-
- /* Unlock; from this point we don't need to hold key's mutex. */
- pj_mutex_unlock(h->mutex);
-
- rc=pj_sock_accept(h->fd, accept_op->accept_fd,
- accept_op->rmt_addr, accept_op->addrlen);
- if (rc==PJ_SUCCESS && accept_op->local_addr) {
- rc = pj_sock_getsockname(*accept_op->accept_fd,
- accept_op->local_addr,
- accept_op->addrlen);
- }
-
- /* Call callback. */
- if (h->cb.on_accept_complete) {
- (*h->cb.on_accept_complete)(h,
- (pj_ioqueue_op_key_t*)accept_op,
- *accept_op->accept_fd, rc);
- }
-
- }
- else
-# endif
- if (key_has_pending_read(h)) {
- struct read_operation *read_op;
- pj_ssize_t bytes_read;
-
- /* Get one pending read operation from the list. */
- read_op = h->read_list.next;
- pj_list_erase(read_op);
- read_op->op = 0;
-
- /* Clear fdset if there is no pending read. */
- if (pj_list_empty(&h->read_list))
- ioqueue_remove_from_set(ioqueue, h->fd, READABLE_EVENT);
-
- /* Unlock; from this point we don't need to hold key's mutex. */
- pj_mutex_unlock(h->mutex);
-
- bytes_read = read_op->size;
-
- if ((read_op->op == PJ_IOQUEUE_OP_RECV_FROM)) {
- rc = pj_sock_recvfrom(h->fd, read_op->buf, &bytes_read, 0,
- read_op->rmt_addr,
- read_op->rmt_addrlen);
- } else if ((read_op->op == PJ_IOQUEUE_OP_RECV)) {
- rc = pj_sock_recv(h->fd, read_op->buf, &bytes_read, 0);
- } else {
- pj_assert(read_op->op == PJ_IOQUEUE_OP_READ);
- /*
- * User has specified pj_ioqueue_read().
- * On Win32, we should do ReadFile(). But because we got
- * here because of select() anyway, user must have put a
- * socket descriptor on h->fd, which in this case we can
- * just call pj_sock_recv() instead of ReadFile().
- * On Unix, user may put a file in h->fd, so we'll have
- * to call read() here.
- * This may not compile on systems which doesn't have
- * read(). That's why we only specify PJ_LINUX here so
- * that error is easier to catch.
- */
-# if defined(PJ_WIN32) && PJ_WIN32 != 0
- rc = pj_sock_recv(h->fd, read_op->buf, &bytes_read, 0);
- //rc = ReadFile((HANDLE)h->fd, read_op->buf, read_op->size,
- // &bytes_read, NULL);
-# elif (defined(PJ_HAS_UNISTD_H) && PJ_HAS_UNISTD_H != 0)
- bytes_read = read(h->fd, read_op->buf, bytes_read);
- rc = (bytes_read >= 0) ? PJ_SUCCESS : pj_get_os_error();
-# elif defined(PJ_LINUX_KERNEL) && PJ_LINUX_KERNEL != 0
- bytes_read = sys_read(h->fd, read_op->buf, bytes_read);
- rc = (bytes_read >= 0) ? PJ_SUCCESS : -bytes_read;
-# else
-# error "Implement read() for this platform!"
-# endif
- }
-
- if (rc != PJ_SUCCESS) {
-# if defined(PJ_WIN32) && PJ_WIN32 != 0
- /* On Win32, for UDP, WSAECONNRESET on the receive side
- * indicates that previous sending has triggered ICMP Port
- * Unreachable message.
- * But we wouldn't know at this point which one of previous
- * key that has triggered the error, since UDP socket can
- * be shared!
- * So we'll just ignore it!
- */
-
- if (rc == PJ_STATUS_FROM_OS(WSAECONNRESET)) {
- //PJ_LOG(4,(THIS_FILE,
- // "Ignored ICMP port unreach. on key=%p", h));
- }
-# endif
-
- /* In any case we would report this to caller. */
- bytes_read = -rc;
- }
-
- /* Call callback. */
- if (h->cb.on_read_complete) {
- (*h->cb.on_read_complete)(h,
- (pj_ioqueue_op_key_t*)read_op,
- bytes_read);
- }
-
- } else {
- /*
- * This is normal; execution may fall here when multiple threads
- * are signalled for the same event, but only one thread eventually
- * able to process the event.
- */
- pj_mutex_unlock(h->mutex);
- }
-}
-
-
-void ioqueue_dispatch_exception_event( pj_ioqueue_t *ioqueue,
- pj_ioqueue_key_t *h )
-{
- pj_mutex_lock(h->mutex);
-
- if (!h->connecting) {
- /* It is possible that more than one thread was woken up, thus
- * the remaining thread will see h->connecting as zero because
- * it has been processed by other thread.
- */
- pj_mutex_unlock(h->mutex);
- return;
- }
-
- /* Clear operation. */
- h->connecting = 0;
-
- pj_mutex_unlock(h->mutex);
-
- ioqueue_remove_from_set(ioqueue, h->fd, WRITEABLE_EVENT);
- ioqueue_remove_from_set(ioqueue, h->fd, EXCEPTION_EVENT);
-
- /* Call callback. */
- if (h->cb.on_connect_complete)
- (*h->cb.on_connect_complete)(h, -1);
-}
-
-/*
- * pj_ioqueue_recv()
- *
- * Start asynchronous recv() from the socket.
- */
-PJ_DEF(pj_status_t) pj_ioqueue_recv( pj_ioqueue_key_t *key,
- pj_ioqueue_op_key_t *op_key,
- void *buffer,
- pj_ssize_t *length,
- unsigned flags )
-{
- struct read_operation *read_op;
-
- PJ_ASSERT_RETURN(key && op_key && buffer && length, PJ_EINVAL);
- PJ_CHECK_STACK();
-
- read_op = (struct read_operation*)op_key;
- read_op->op = 0;
-
- /* Try to see if there's data immediately available.
- */
- if ((flags & PJ_IOQUEUE_ALWAYS_ASYNC) == 0) {
- pj_status_t status;
- pj_ssize_t size;
-
- size = *length;
- status = pj_sock_recv(key->fd, buffer, &size, flags);
- if (status == PJ_SUCCESS) {
- /* Yes! Data is available! */
- *length = size;
- return PJ_SUCCESS;
- } else {
- /* If error is not EWOULDBLOCK (or EAGAIN on Linux), report
- * the error to caller.
- */
- if (status != PJ_STATUS_FROM_OS(PJ_BLOCKING_ERROR_VAL))
- return status;
- }
- }
-
- flags &= ~(PJ_IOQUEUE_ALWAYS_ASYNC);
-
- /*
- * No data is immediately available.
- * Must schedule asynchronous operation to the ioqueue.
- */
- read_op->op = PJ_IOQUEUE_OP_RECV;
- read_op->buf = buffer;
- read_op->size = *length;
- read_op->flags = flags;
-
- pj_mutex_lock(key->mutex);
- pj_list_insert_before(&key->read_list, read_op);
- ioqueue_add_to_set(key->ioqueue, key->fd, READABLE_EVENT);
- pj_mutex_unlock(key->mutex);
-
- return PJ_EPENDING;
-}
-
-/*
- * pj_ioqueue_recvfrom()
- *
- * Start asynchronous recvfrom() from the socket.
- */
-PJ_DEF(pj_status_t) pj_ioqueue_recvfrom( pj_ioqueue_key_t *key,
- pj_ioqueue_op_key_t *op_key,
- void *buffer,
- pj_ssize_t *length,
- unsigned flags,
- pj_sockaddr_t *addr,
- int *addrlen)
-{
- struct read_operation *read_op;
-
- PJ_ASSERT_RETURN(key && op_key && buffer && length, PJ_EINVAL);
- PJ_CHECK_STACK();
-
- read_op = (struct read_operation*)op_key;
- read_op->op = 0;
-
- /* Try to see if there's data immediately available.
- */
- if ((flags & PJ_IOQUEUE_ALWAYS_ASYNC) == 0) {
- pj_status_t status;
- pj_ssize_t size;
-
- size = *length;
- status = pj_sock_recvfrom(key->fd, buffer, &size, flags,
- addr, addrlen);
- if (status == PJ_SUCCESS) {
- /* Yes! Data is available! */
- *length = size;
- return PJ_SUCCESS;
- } else {
- /* If error is not EWOULDBLOCK (or EAGAIN on Linux), report
- * the error to caller.
- */
- if (status != PJ_STATUS_FROM_OS(PJ_BLOCKING_ERROR_VAL))
- return status;
- }
- }
-
- flags &= ~(PJ_IOQUEUE_ALWAYS_ASYNC);
-
- /*
- * No data is immediately available.
- * Must schedule asynchronous operation to the ioqueue.
- */
- read_op->op = PJ_IOQUEUE_OP_RECV_FROM;
- read_op->buf = buffer;
- read_op->size = *length;
- read_op->flags = flags;
- read_op->rmt_addr = addr;
- read_op->rmt_addrlen = addrlen;
-
- pj_mutex_lock(key->mutex);
- pj_list_insert_before(&key->read_list, read_op);
- ioqueue_add_to_set(key->ioqueue, key->fd, READABLE_EVENT);
- pj_mutex_unlock(key->mutex);
-
- return PJ_EPENDING;
-}
-
-/*
- * pj_ioqueue_send()
- *
- * Start asynchronous send() to the descriptor.
- */
-PJ_DEF(pj_status_t) pj_ioqueue_send( pj_ioqueue_key_t *key,
- pj_ioqueue_op_key_t *op_key,
- const void *data,
- pj_ssize_t *length,
- unsigned flags)
-{
- struct write_operation *write_op;
- pj_status_t status;
- pj_ssize_t sent;
-
- PJ_ASSERT_RETURN(key && op_key && data && length, PJ_EINVAL);
- PJ_CHECK_STACK();
-
- write_op = (struct write_operation*)op_key;
- write_op->op = 0;
-
- /* We can not use PJ_IOQUEUE_ALWAYS_ASYNC for socket write. */
- flags &= ~(PJ_IOQUEUE_ALWAYS_ASYNC);
-
- /* Fast track:
- * Try to send data immediately, only if there's no pending write!
- * Note:
- * We are speculating that the list is empty here without properly
- * acquiring ioqueue's mutex first. This is intentional, to maximize
- * performance via parallelism.
- *
- * This should be safe, because:
- * - by convention, we require caller to make sure that the
- * key is not unregistered while other threads are invoking
- * an operation on the same key.
- * - pj_list_empty() is safe to be invoked by multiple threads,
- * even when other threads are modifying the list.
- */
- if (pj_list_empty(&key->write_list)) {
- /*
- * See if data can be sent immediately.
- */
- sent = *length;
- status = pj_sock_send(key->fd, data, &sent, flags);
- if (status == PJ_SUCCESS) {
- /* Success! */
- *length = sent;
- return PJ_SUCCESS;
- } else {
- /* If error is not EWOULDBLOCK (or EAGAIN on Linux), report
- * the error to caller.
- */
- if (status != PJ_STATUS_FROM_OS(PJ_BLOCKING_ERROR_VAL)) {
- return status;
- }
- }
- }
-
- /*
- * Schedule asynchronous send.
- */
- write_op->op = PJ_IOQUEUE_OP_SEND;
- write_op->buf = (void*)data;
- write_op->size = *length;
- write_op->written = 0;
- write_op->flags = flags;
-
- pj_mutex_lock(key->mutex);
- pj_list_insert_before(&key->write_list, write_op);
- ioqueue_add_to_set(key->ioqueue, key->fd, WRITEABLE_EVENT);
- pj_mutex_unlock(key->mutex);
-
- return PJ_EPENDING;
-}
-
-
-/*
- * pj_ioqueue_sendto()
- *
- * Start asynchronous write() to the descriptor.
- */
-PJ_DEF(pj_status_t) pj_ioqueue_sendto( pj_ioqueue_key_t *key,
- pj_ioqueue_op_key_t *op_key,
- const void *data,
- pj_ssize_t *length,
- pj_uint32_t flags,
- const pj_sockaddr_t *addr,
- int addrlen)
-{
- struct write_operation *write_op;
- pj_status_t status;
- pj_ssize_t sent;
-
- PJ_ASSERT_RETURN(key && op_key && data && length, PJ_EINVAL);
- PJ_CHECK_STACK();
-
- write_op = (struct write_operation*)op_key;
- write_op->op = 0;
-
- /* We can not use PJ_IOQUEUE_ALWAYS_ASYNC for socket write */
- flags &= ~(PJ_IOQUEUE_ALWAYS_ASYNC);
-
- /* Fast track:
- * Try to send data immediately, only if there's no pending write!
- * Note:
- * We are speculating that the list is empty here without properly
- * acquiring ioqueue's mutex first. This is intentional, to maximize
- * performance via parallelism.
- *
- * This should be safe, because:
- * - by convention, we require caller to make sure that the
- * key is not unregistered while other threads are invoking
- * an operation on the same key.
- * - pj_list_empty() is safe to be invoked by multiple threads,
- * even when other threads are modifying the list.
- */
- if (pj_list_empty(&key->write_list)) {
- /*
- * See if data can be sent immediately.
- */
- sent = *length;
- status = pj_sock_sendto(key->fd, data, &sent, flags, addr, addrlen);
- if (status == PJ_SUCCESS) {
- /* Success! */
- *length = sent;
- return PJ_SUCCESS;
- } else {
- /* If error is not EWOULDBLOCK (or EAGAIN on Linux), report
- * the error to caller.
- */
- if (status != PJ_STATUS_FROM_OS(PJ_BLOCKING_ERROR_VAL)) {
- return status;
- }
- }
- }
-
- /*
- * Check that address storage can hold the address parameter.
- */
- PJ_ASSERT_RETURN(addrlen <= sizeof(pj_sockaddr_in), PJ_EBUG);
-
- /*
- * Schedule asynchronous send.
- */
- write_op->op = PJ_IOQUEUE_OP_SEND_TO;
- write_op->buf = (void*)data;
- write_op->size = *length;
- write_op->written = 0;
- write_op->flags = flags;
- pj_memcpy(&write_op->rmt_addr, addr, addrlen);
- write_op->rmt_addrlen = addrlen;
-
- pj_mutex_lock(key->mutex);
- pj_list_insert_before(&key->write_list, write_op);
- ioqueue_add_to_set(key->ioqueue, key->fd, WRITEABLE_EVENT);
- pj_mutex_unlock(key->mutex);
-
- return PJ_EPENDING;
-}
-
-#if PJ_HAS_TCP
-/*
- * Initiate overlapped accept() operation.
- */
-PJ_DEF(pj_status_t) pj_ioqueue_accept( pj_ioqueue_key_t *key,
- pj_ioqueue_op_key_t *op_key,
- pj_sock_t *new_sock,
- pj_sockaddr_t *local,
- pj_sockaddr_t *remote,
- int *addrlen)
-{
- struct accept_operation *accept_op;
- pj_status_t status;
-
- /* check parameters. All must be specified! */
- PJ_ASSERT_RETURN(key && op_key && new_sock, PJ_EINVAL);
-
- accept_op = (struct accept_operation*)op_key;
- accept_op->op = 0;
-
- /* Fast track:
- * See if there's new connection available immediately.
- */
- if (pj_list_empty(&key->accept_list)) {
- status = pj_sock_accept(key->fd, new_sock, remote, addrlen);
- if (status == PJ_SUCCESS) {
- /* Yes! New connection is available! */
- if (local && addrlen) {
- status = pj_sock_getsockname(*new_sock, local, addrlen);
- if (status != PJ_SUCCESS) {
- pj_sock_close(*new_sock);
- *new_sock = PJ_INVALID_SOCKET;
- return status;
- }
- }
- return PJ_SUCCESS;
- } else {
- /* If error is not EWOULDBLOCK (or EAGAIN on Linux), report
- * the error to caller.
- */
- if (status != PJ_STATUS_FROM_OS(PJ_BLOCKING_ERROR_VAL)) {
- return status;
- }
- }
- }
-
- /*
- * No connection is available immediately.
- * Schedule accept() operation to be completed when there is incoming
- * connection available.
- */
- accept_op->op = PJ_IOQUEUE_OP_ACCEPT;
- accept_op->accept_fd = new_sock;
- accept_op->rmt_addr = remote;
- accept_op->addrlen= addrlen;
- accept_op->local_addr = local;
-
- pj_mutex_lock(key->mutex);
- pj_list_insert_before(&key->accept_list, accept_op);
- ioqueue_add_to_set(key->ioqueue, key->fd, READABLE_EVENT);
- pj_mutex_unlock(key->mutex);
-
- return PJ_EPENDING;
-}
-
-/*
- * Initiate overlapped connect() operation (well, it's non-blocking actually,
- * since there's no overlapped version of connect()).
- */
-PJ_DEF(pj_status_t) pj_ioqueue_connect( pj_ioqueue_key_t *key,
- const pj_sockaddr_t *addr,
- int addrlen )
-{
- pj_status_t status;
-
- /* check parameters. All must be specified! */
- PJ_ASSERT_RETURN(key && addr && addrlen, PJ_EINVAL);
-
- /* Check if socket has not been marked for connecting */
- if (key->connecting != 0)
- return PJ_EPENDING;
-
- status = pj_sock_connect(key->fd, addr, addrlen);
- if (status == PJ_SUCCESS) {
- /* Connected! */
- return PJ_SUCCESS;
- } else {
- if (status == PJ_STATUS_FROM_OS(PJ_BLOCKING_CONNECT_ERROR_VAL)) {
- /* Pending! */
- pj_mutex_lock(key->mutex);
- key->connecting = PJ_TRUE;
- ioqueue_add_to_set(key->ioqueue, key->fd, WRITEABLE_EVENT);
- ioqueue_add_to_set(key->ioqueue, key->fd, EXCEPTION_EVENT);
- pj_mutex_unlock(key->mutex);
- return PJ_EPENDING;
- } else {
- /* Error! */
- return status;
- }
- }
-}
-#endif /* PJ_HAS_TCP */
-
-
-PJ_DEF(void) pj_ioqueue_op_key_init( pj_ioqueue_op_key_t *op_key,
- pj_size_t size )
-{
- pj_memset(op_key, 0, size);
-}
-
-
-/*
- * pj_ioqueue_is_pending()
- */
-PJ_DEF(pj_bool_t) pj_ioqueue_is_pending( pj_ioqueue_key_t *key,
- pj_ioqueue_op_key_t *op_key )
-{
- struct generic_operation *op_rec;
-
- PJ_UNUSED_ARG(key);
-
- op_rec = (struct generic_operation*)op_key;
- return op_rec->op != 0;
-}
-
-
-/*
- * pj_ioqueue_post_completion()
- */
-PJ_DEF(pj_status_t) pj_ioqueue_post_completion( pj_ioqueue_key_t *key,
- pj_ioqueue_op_key_t *op_key,
- pj_ssize_t bytes_status )
-{
- struct generic_operation *op_rec;
-
- /*
- * Find the operation key in all pending operation list to
- * really make sure that it's still there; then call the callback.
- */
- pj_mutex_lock(key->mutex);
-
- /* Find the operation in the pending read list. */
- op_rec = (struct generic_operation*)key->read_list.next;
- while (op_rec != (void*)&key->read_list) {
- if (op_rec == (void*)op_key) {
- pj_list_erase(op_rec);
- op_rec->op = 0;
- pj_mutex_unlock(key->mutex);
-
- (*key->cb.on_read_complete)(key, op_key, bytes_status);
- return PJ_SUCCESS;
- }
- op_rec = op_rec->next;
- }
-
- /* Find the operation in the pending write list. */
- op_rec = (struct generic_operation*)key->write_list.next;
- while (op_rec != (void*)&key->write_list) {
- if (op_rec == (void*)op_key) {
- pj_list_erase(op_rec);
- op_rec->op = 0;
- pj_mutex_unlock(key->mutex);
-
- (*key->cb.on_write_complete)(key, op_key, bytes_status);
- return PJ_SUCCESS;
- }
- op_rec = op_rec->next;
- }
-
- /* Find the operation in the pending accept list. */
- op_rec = (struct generic_operation*)key->accept_list.next;
- while (op_rec != (void*)&key->accept_list) {
- if (op_rec == (void*)op_key) {
- pj_list_erase(op_rec);
- op_rec->op = 0;
- pj_mutex_unlock(key->mutex);
-
- (*key->cb.on_accept_complete)(key, op_key,
- PJ_INVALID_SOCKET,
- bytes_status);
- return PJ_SUCCESS;
- }
- op_rec = op_rec->next;
- }
-
- pj_mutex_unlock(key->mutex);
-
- return PJ_EINVALIDOP;
-}
-
+/* $Id$ */
+/*
+ * Copyright (C)2003-2006 Benny Prijono <benny@prijono.org>
+ *
+ * This program is free software; you can redistribute it and/or modify
+ * it under the terms of the GNU General Public License as published by
+ * the Free Software Foundation; either version 2 of the License, or
+ * (at your option) any later version.
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ * GNU General Public License for more details.
+ *
+ * You should have received a copy of the GNU General Public License
+ * along with this program; if not, write to the Free Software
+ * Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
+ */
+
+/*
+ * ioqueue_common_abs.c
+ *
+ * This contains common functionalities to emulate proactor pattern with
+ * various event dispatching mechanisms (e.g. select, epoll).
+ *
+ * This file will be included by the appropriate ioqueue implementation.
+ * This file is NOT supposed to be compiled as stand-alone source.
+ */
+
+static void ioqueue_init( pj_ioqueue_t *ioqueue )
+{
+ ioqueue->lock = NULL;
+ ioqueue->auto_delete_lock = 0;
+}
+
+static pj_status_t ioqueue_destroy(pj_ioqueue_t *ioqueue)
+{
+ if (ioqueue->auto_delete_lock && ioqueue->lock ) {
+ pj_lock_release(ioqueue->lock);
+ return pj_lock_destroy(ioqueue->lock);
+ } else
+ return PJ_SUCCESS;
+}
+
+/*
+ * pj_ioqueue_set_lock()
+ */
+PJ_DEF(pj_status_t) pj_ioqueue_set_lock( pj_ioqueue_t *ioqueue,
+ pj_lock_t *lock,
+ pj_bool_t auto_delete )
+{
+ PJ_ASSERT_RETURN(ioqueue && lock, PJ_EINVAL);
+
+ if (ioqueue->auto_delete_lock && ioqueue->lock) {
+ pj_lock_destroy(ioqueue->lock);
+ }
+
+ ioqueue->lock = lock;
+ ioqueue->auto_delete_lock = auto_delete;
+
+ return PJ_SUCCESS;
+}
+
+static pj_status_t ioqueue_init_key( pj_pool_t *pool,
+ pj_ioqueue_t *ioqueue,
+ pj_ioqueue_key_t *key,
+ pj_sock_t sock,
+ void *user_data,
+ const pj_ioqueue_callback *cb)
+{
+ pj_status_t rc;
+ int optlen;
+
+ key->ioqueue = ioqueue;
+ key->fd = sock;
+ key->user_data = user_data;
+ pj_list_init(&key->read_list);
+ pj_list_init(&key->write_list);
+#if PJ_HAS_TCP
+ pj_list_init(&key->accept_list);
+#endif
+
+ /* Save callback. */
+ pj_memcpy(&key->cb, cb, sizeof(pj_ioqueue_callback));
+
+ /* Get socket type. When socket type is datagram, some optimization
+ * will be performed during send to allow parallel send operations.
+ */
+ optlen = sizeof(key->fd_type);
+ rc = pj_sock_getsockopt(sock, PJ_SOL_SOCKET, PJ_SO_TYPE,
+ &key->fd_type, &optlen);
+ if (rc != PJ_SUCCESS)
+ key->fd_type = PJ_SOCK_STREAM;
+
+ /* Create mutex for the key. */
+ rc = pj_mutex_create_simple(pool, NULL, &key->mutex);
+
+ return rc;
+}
+
+static void ioqueue_destroy_key( pj_ioqueue_key_t *key )
+{
+ pj_mutex_destroy(key->mutex);
+}
+
+/*
+ * pj_ioqueue_get_user_data()
+ *
+ * Obtain value associated with a key.
+ */
+PJ_DEF(void*) pj_ioqueue_get_user_data( pj_ioqueue_key_t *key )
+{
+ PJ_ASSERT_RETURN(key != NULL, NULL);
+ return key->user_data;
+}
+
+/*
+ * pj_ioqueue_set_user_data()
+ */
+PJ_DEF(pj_status_t) pj_ioqueue_set_user_data( pj_ioqueue_key_t *key,
+ void *user_data,
+ void **old_data)
+{
+ PJ_ASSERT_RETURN(key, PJ_EINVAL);
+
+ if (old_data)
+ *old_data = key->user_data;
+ key->user_data = user_data;
+
+ return PJ_SUCCESS;
+}
+
+PJ_INLINE(int) key_has_pending_write(pj_ioqueue_key_t *key)
+{
+ return !pj_list_empty(&key->write_list);
+}
+
+PJ_INLINE(int) key_has_pending_read(pj_ioqueue_key_t *key)
+{
+ return !pj_list_empty(&key->read_list);
+}
+
+PJ_INLINE(int) key_has_pending_accept(pj_ioqueue_key_t *key)
+{
+#if PJ_HAS_TCP
+ return !pj_list_empty(&key->accept_list);
+#else
+ return 0;
+#endif
+}
+
+PJ_INLINE(int) key_has_pending_connect(pj_ioqueue_key_t *key)
+{
+ return key->connecting;
+}
+
+
+/*
+ * ioqueue_dispatch_event()
+ *
+ * Report occurence of an event in the key to be processed by the
+ * framework.
+ */
+void ioqueue_dispatch_write_event(pj_ioqueue_t *ioqueue, pj_ioqueue_key_t *h)
+{
+ /* Lock the key. */
+ pj_mutex_lock(h->mutex);
+
+#if defined(PJ_HAS_TCP) && PJ_HAS_TCP!=0
+ if (h->connecting) {
+ /* Completion of connect() operation */
+ pj_ssize_t bytes_transfered;
+
+ /* Clear operation. */
+ h->connecting = 0;
+
+ ioqueue_remove_from_set(ioqueue, h->fd, WRITEABLE_EVENT);
+ ioqueue_remove_from_set(ioqueue, h->fd, EXCEPTION_EVENT);
+
+ /* Unlock; from this point we don't need to hold key's mutex. */
+ pj_mutex_unlock(h->mutex);
+
+#if (defined(PJ_HAS_SO_ERROR) && PJ_HAS_SO_ERROR!=0)
+ /* from connect(2):
+ * On Linux, use getsockopt to read the SO_ERROR option at
+ * level SOL_SOCKET to determine whether connect() completed
+ * successfully (if SO_ERROR is zero).
+ */
+ {
+ int value;
+ socklen_t vallen = sizeof(value);
+ int gs_rc = getsockopt(h->fd, SOL_SOCKET, SO_ERROR,
+ &value, &vallen);
+ if (gs_rc != 0) {
+ /* Argh!! What to do now???
+ * Just indicate that the socket is connected. The
+ * application will get error as soon as it tries to use
+ * the socket to send/receive.
+ */
+ bytes_transfered = 0;
+ } else {
+ bytes_transfered = value;
+ }
+ }
+#elif defined(PJ_WIN32) && PJ_WIN32!=0
+ bytes_transfered = 0; /* success */
+#else
+ /* Excellent information in D.J. Bernstein page:
+ * http://cr.yp.to/docs/connect.html
+ *
+ * Seems like the most portable way of detecting connect()
+ * failure is to call getpeername(). If socket is connected,
+ * getpeername() will return 0. If the socket is not connected,
+ * it will return ENOTCONN, and read(fd, &ch, 1) will produce
+ * the right errno through error slippage. This is a combination
+ * of suggestions from Douglas C. Schmidt and Ken Keys.
+ */
+ int gp_rc;
+ struct sockaddr_in addr;
+ socklen_t addrlen = sizeof(addr);
+
+ gp_rc = getpeername(h->fd, (struct sockaddr*)&addr, &addrlen);
+ bytes_transfered = gp_rc;
+#endif
+
+ /* Call callback. */
+ if (h->cb.on_connect_complete)
+ (*h->cb.on_connect_complete)(h, bytes_transfered);
+
+ /* Done. */
+
+ } else
+#endif /* PJ_HAS_TCP */
+ if (key_has_pending_write(h)) {
+ /* Socket is writable. */
+ struct write_operation *write_op;
+ pj_ssize_t sent;
+ pj_status_t send_rc;
+
+ /* Get the first in the queue. */
+ write_op = h->write_list.next;
+
+ /* For datagrams, we can remove the write_op from the list
+ * so that send() can work in parallel.
+ */
+ if (h->fd_type == PJ_SOCK_DGRAM) {
+ pj_list_erase(write_op);
+ write_op->op = 0;
+
+ if (pj_list_empty(&h->write_list))
+ ioqueue_remove_from_set(ioqueue, h->fd, WRITEABLE_EVENT);
+
+ pj_mutex_unlock(h->mutex);
+ }
+
+ /* Send the data.
+ * Unfortunately we must do this while holding key's mutex, thus
+ * preventing parallel write on a single key.. :-((
+ */
+ sent = write_op->size - write_op->written;
+ if (write_op->op == PJ_IOQUEUE_OP_SEND) {
+ send_rc = pj_sock_send(h->fd, write_op->buf+write_op->written,
+ &sent, write_op->flags);
+ } else if (write_op->op == PJ_IOQUEUE_OP_SEND_TO) {
+ send_rc = pj_sock_sendto(h->fd,
+ write_op->buf+write_op->written,
+ &sent, write_op->flags,
+ &write_op->rmt_addr,
+ write_op->rmt_addrlen);
+ } else {
+ pj_assert(!"Invalid operation type!");
+ send_rc = PJ_EBUG;
+ }
+
+ if (send_rc == PJ_SUCCESS) {
+ write_op->written += sent;
+ } else {
+ pj_assert(send_rc > 0);
+ write_op->written = -send_rc;
+ }
+
+ /* Are we finished with this buffer? */
+ if (send_rc!=PJ_SUCCESS ||
+ write_op->written == (pj_ssize_t)write_op->size ||
+ h->fd_type == PJ_SOCK_DGRAM)
+ {
+ if (h->fd_type != PJ_SOCK_DGRAM) {
+ /* Write completion of the whole stream. */
+ pj_list_erase(write_op);
+ write_op->op = 0;
+
+ /* Clear operation if there's no more data to send. */
+ if (pj_list_empty(&h->write_list))
+ ioqueue_remove_from_set(ioqueue, h->fd, WRITEABLE_EVENT);
+
+ /* No need to hold mutex anymore */
+ pj_mutex_unlock(h->mutex);
+ }
+
+ /* Call callback. */
+ if (h->cb.on_write_complete) {
+ (*h->cb.on_write_complete)(h,
+ (pj_ioqueue_op_key_t*)write_op,
+ write_op->written);
+ }
+
+ } else {
+ pj_mutex_unlock(h->mutex);
+ }
+
+ /* Done. */
+ } else {
+ /*
+ * This is normal; execution may fall here when multiple threads
+ * are signalled for the same event, but only one thread eventually
+ * able to process the event.
+ */
+ pj_mutex_unlock(h->mutex);
+ }
+}
+
+void ioqueue_dispatch_read_event( pj_ioqueue_t *ioqueue, pj_ioqueue_key_t *h )
+{
+ pj_status_t rc;
+
+ /* Lock the key. */
+ pj_mutex_lock(h->mutex);
+
+# if PJ_HAS_TCP
+ if (!pj_list_empty(&h->accept_list)) {
+
+ struct accept_operation *accept_op;
+
+ /* Get one accept operation from the list. */
+ accept_op = h->accept_list.next;
+ pj_list_erase(accept_op);
+ accept_op->op = 0;
+
+ /* Clear bit in fdset if there is no more pending accept */
+ if (pj_list_empty(&h->accept_list))
+ ioqueue_remove_from_set(ioqueue, h->fd, READABLE_EVENT);
+
+ /* Unlock; from this point we don't need to hold key's mutex. */
+ pj_mutex_unlock(h->mutex);
+
+ rc=pj_sock_accept(h->fd, accept_op->accept_fd,
+ accept_op->rmt_addr, accept_op->addrlen);
+ if (rc==PJ_SUCCESS && accept_op->local_addr) {
+ rc = pj_sock_getsockname(*accept_op->accept_fd,
+ accept_op->local_addr,
+ accept_op->addrlen);
+ }
+
+ /* Call callback. */
+ if (h->cb.on_accept_complete) {
+ (*h->cb.on_accept_complete)(h,
+ (pj_ioqueue_op_key_t*)accept_op,
+ *accept_op->accept_fd, rc);
+ }
+
+ }
+ else
+# endif
+ if (key_has_pending_read(h)) {
+ struct read_operation *read_op;
+ pj_ssize_t bytes_read;
+
+ /* Get one pending read operation from the list. */
+ read_op = h->read_list.next;
+ pj_list_erase(read_op);
+ read_op->op = 0;
+
+ /* Clear fdset if there is no pending read. */
+ if (pj_list_empty(&h->read_list))
+ ioqueue_remove_from_set(ioqueue, h->fd, READABLE_EVENT);
+
+ /* Unlock; from this point we don't need to hold key's mutex. */
+ pj_mutex_unlock(h->mutex);
+
+ bytes_read = read_op->size;
+
+ if ((read_op->op == PJ_IOQUEUE_OP_RECV_FROM)) {
+ rc = pj_sock_recvfrom(h->fd, read_op->buf, &bytes_read, 0,
+ read_op->rmt_addr,
+ read_op->rmt_addrlen);
+ } else if ((read_op->op == PJ_IOQUEUE_OP_RECV)) {
+ rc = pj_sock_recv(h->fd, read_op->buf, &bytes_read, 0);
+ } else {
+ pj_assert(read_op->op == PJ_IOQUEUE_OP_READ);
+ /*
+ * User has specified pj_ioqueue_read().
+ * On Win32, we should do ReadFile(). But because we got
+ * here because of select() anyway, user must have put a
+ * socket descriptor on h->fd, which in this case we can
+ * just call pj_sock_recv() instead of ReadFile().
+ * On Unix, user may put a file in h->fd, so we'll have
+ * to call read() here.
+ * This may not compile on systems which doesn't have
+ * read(). That's why we only specify PJ_LINUX here so
+ * that error is easier to catch.
+ */
+# if defined(PJ_WIN32) && PJ_WIN32 != 0
+ rc = pj_sock_recv(h->fd, read_op->buf, &bytes_read, 0);
+ //rc = ReadFile((HANDLE)h->fd, read_op->buf, read_op->size,
+ // &bytes_read, NULL);
+# elif (defined(PJ_HAS_UNISTD_H) && PJ_HAS_UNISTD_H != 0)
+ bytes_read = read(h->fd, read_op->buf, bytes_read);
+ rc = (bytes_read >= 0) ? PJ_SUCCESS : pj_get_os_error();
+# elif defined(PJ_LINUX_KERNEL) && PJ_LINUX_KERNEL != 0
+ bytes_read = sys_read(h->fd, read_op->buf, bytes_read);
+ rc = (bytes_read >= 0) ? PJ_SUCCESS : -bytes_read;
+# else
+# error "Implement read() for this platform!"
+# endif
+ }
+
+ if (rc != PJ_SUCCESS) {
+# if defined(PJ_WIN32) && PJ_WIN32 != 0
+ /* On Win32, for UDP, WSAECONNRESET on the receive side
+ * indicates that previous sending has triggered ICMP Port
+ * Unreachable message.
+ * But we wouldn't know at this point which one of previous
+ * key that has triggered the error, since UDP socket can
+ * be shared!
+ * So we'll just ignore it!
+ */
+
+ if (rc == PJ_STATUS_FROM_OS(WSAECONNRESET)) {
+ //PJ_LOG(4,(THIS_FILE,
+ // "Ignored ICMP port unreach. on key=%p", h));
+ }
+# endif
+
+ /* In any case we would report this to caller. */
+ bytes_read = -rc;
+ }
+
+ /* Call callback. */
+ if (h->cb.on_read_complete) {
+ (*h->cb.on_read_complete)(h,
+ (pj_ioqueue_op_key_t*)read_op,
+ bytes_read);
+ }
+
+ } else {
+ /*
+ * This is normal; execution may fall here when multiple threads
+ * are signalled for the same event, but only one thread eventually
+ * able to process the event.
+ */
+ pj_mutex_unlock(h->mutex);
+ }
+}
+
+
+void ioqueue_dispatch_exception_event( pj_ioqueue_t *ioqueue,
+ pj_ioqueue_key_t *h )
+{
+ pj_mutex_lock(h->mutex);
+
+ if (!h->connecting) {
+ /* It is possible that more than one thread was woken up, thus
+ * the remaining thread will see h->connecting as zero because
+ * it has been processed by other thread.
+ */
+ pj_mutex_unlock(h->mutex);
+ return;
+ }
+
+ /* Clear operation. */
+ h->connecting = 0;
+
+ pj_mutex_unlock(h->mutex);
+
+ ioqueue_remove_from_set(ioqueue, h->fd, WRITEABLE_EVENT);
+ ioqueue_remove_from_set(ioqueue, h->fd, EXCEPTION_EVENT);
+
+ /* Call callback. */
+ if (h->cb.on_connect_complete)
+ (*h->cb.on_connect_complete)(h, -1);
+}
+
+/*
+ * pj_ioqueue_recv()
+ *
+ * Start asynchronous recv() from the socket.
+ */
+PJ_DEF(pj_status_t) pj_ioqueue_recv( pj_ioqueue_key_t *key,
+ pj_ioqueue_op_key_t *op_key,
+ void *buffer,
+ pj_ssize_t *length,
+ unsigned flags )
+{
+ struct read_operation *read_op;
+
+ PJ_ASSERT_RETURN(key && op_key && buffer && length, PJ_EINVAL);
+ PJ_CHECK_STACK();
+
+ read_op = (struct read_operation*)op_key;
+ read_op->op = 0;
+
+ /* Try to see if there's data immediately available.
+ */
+ if ((flags & PJ_IOQUEUE_ALWAYS_ASYNC) == 0) {
+ pj_status_t status;
+ pj_ssize_t size;
+
+ size = *length;
+ status = pj_sock_recv(key->fd, buffer, &size, flags);
+ if (status == PJ_SUCCESS) {
+ /* Yes! Data is available! */
+ *length = size;
+ return PJ_SUCCESS;
+ } else {
+ /* If error is not EWOULDBLOCK (or EAGAIN on Linux), report
+ * the error to caller.
+ */
+ if (status != PJ_STATUS_FROM_OS(PJ_BLOCKING_ERROR_VAL))
+ return status;
+ }
+ }
+
+ flags &= ~(PJ_IOQUEUE_ALWAYS_ASYNC);
+
+ /*
+ * No data is immediately available.
+ * Must schedule asynchronous operation to the ioqueue.
+ */
+ read_op->op = PJ_IOQUEUE_OP_RECV;
+ read_op->buf = buffer;
+ read_op->size = *length;
+ read_op->flags = flags;
+
+ pj_mutex_lock(key->mutex);
+ pj_list_insert_before(&key->read_list, read_op);
+ ioqueue_add_to_set(key->ioqueue, key->fd, READABLE_EVENT);
+ pj_mutex_unlock(key->mutex);
+
+ return PJ_EPENDING;
+}
+
+/*
+ * pj_ioqueue_recvfrom()
+ *
+ * Start asynchronous recvfrom() from the socket.
+ */
+PJ_DEF(pj_status_t) pj_ioqueue_recvfrom( pj_ioqueue_key_t *key,
+ pj_ioqueue_op_key_t *op_key,
+ void *buffer,
+ pj_ssize_t *length,
+ unsigned flags,
+ pj_sockaddr_t *addr,
+ int *addrlen)
+{
+ struct read_operation *read_op;
+
+ PJ_ASSERT_RETURN(key && op_key && buffer && length, PJ_EINVAL);
+ PJ_CHECK_STACK();
+
+ read_op = (struct read_operation*)op_key;
+ read_op->op = 0;
+
+ /* Try to see if there's data immediately available.
+ */
+ if ((flags & PJ_IOQUEUE_ALWAYS_ASYNC) == 0) {
+ pj_status_t status;
+ pj_ssize_t size;
+
+ size = *length;
+ status = pj_sock_recvfrom(key->fd, buffer, &size, flags,
+ addr, addrlen);
+ if (status == PJ_SUCCESS) {
+ /* Yes! Data is available! */
+ *length = size;
+ return PJ_SUCCESS;
+ } else {
+ /* If error is not EWOULDBLOCK (or EAGAIN on Linux), report
+ * the error to caller.
+ */
+ if (status != PJ_STATUS_FROM_OS(PJ_BLOCKING_ERROR_VAL))
+ return status;
+ }
+ }
+
+ flags &= ~(PJ_IOQUEUE_ALWAYS_ASYNC);
+
+ /*
+ * No data is immediately available.
+ * Must schedule asynchronous operation to the ioqueue.
+ */
+ read_op->op = PJ_IOQUEUE_OP_RECV_FROM;
+ read_op->buf = buffer;
+ read_op->size = *length;
+ read_op->flags = flags;
+ read_op->rmt_addr = addr;
+ read_op->rmt_addrlen = addrlen;
+
+ pj_mutex_lock(key->mutex);
+ pj_list_insert_before(&key->read_list, read_op);
+ ioqueue_add_to_set(key->ioqueue, key->fd, READABLE_EVENT);
+ pj_mutex_unlock(key->mutex);
+
+ return PJ_EPENDING;
+}
+
+/*
+ * pj_ioqueue_send()
+ *
+ * Start asynchronous send() to the descriptor.
+ */
+PJ_DEF(pj_status_t) pj_ioqueue_send( pj_ioqueue_key_t *key,
+ pj_ioqueue_op_key_t *op_key,
+ const void *data,
+ pj_ssize_t *length,
+ unsigned flags)
+{
+ struct write_operation *write_op;
+ pj_status_t status;
+ pj_ssize_t sent;
+
+ PJ_ASSERT_RETURN(key && op_key && data && length, PJ_EINVAL);
+ PJ_CHECK_STACK();
+
+ write_op = (struct write_operation*)op_key;
+ write_op->op = 0;
+
+ /* We can not use PJ_IOQUEUE_ALWAYS_ASYNC for socket write. */
+ flags &= ~(PJ_IOQUEUE_ALWAYS_ASYNC);
+
+ /* Fast track:
+ * Try to send data immediately, only if there's no pending write!
+ * Note:
+ * We are speculating that the list is empty here without properly
+ * acquiring ioqueue's mutex first. This is intentional, to maximize
+ * performance via parallelism.
+ *
+ * This should be safe, because:
+ * - by convention, we require caller to make sure that the
+ * key is not unregistered while other threads are invoking
+ * an operation on the same key.
+ * - pj_list_empty() is safe to be invoked by multiple threads,
+ * even when other threads are modifying the list.
+ */
+ if (pj_list_empty(&key->write_list)) {
+ /*
+ * See if data can be sent immediately.
+ */
+ sent = *length;
+ status = pj_sock_send(key->fd, data, &sent, flags);
+ if (status == PJ_SUCCESS) {
+ /* Success! */
+ *length = sent;
+ return PJ_SUCCESS;
+ } else {
+ /* If error is not EWOULDBLOCK (or EAGAIN on Linux), report
+ * the error to caller.
+ */
+ if (status != PJ_STATUS_FROM_OS(PJ_BLOCKING_ERROR_VAL)) {
+ return status;
+ }
+ }
+ }
+
+ /*
+ * Schedule asynchronous send.
+ */
+ write_op->op = PJ_IOQUEUE_OP_SEND;
+ write_op->buf = (void*)data;
+ write_op->size = *length;
+ write_op->written = 0;
+ write_op->flags = flags;
+
+ pj_mutex_lock(key->mutex);
+ pj_list_insert_before(&key->write_list, write_op);
+ ioqueue_add_to_set(key->ioqueue, key->fd, WRITEABLE_EVENT);
+ pj_mutex_unlock(key->mutex);
+
+ return PJ_EPENDING;
+}
+
+
+/*
+ * pj_ioqueue_sendto()
+ *
+ * Start asynchronous write() to the descriptor.
+ */
+PJ_DEF(pj_status_t) pj_ioqueue_sendto( pj_ioqueue_key_t *key,
+ pj_ioqueue_op_key_t *op_key,
+ const void *data,
+ pj_ssize_t *length,
+ pj_uint32_t flags,
+ const pj_sockaddr_t *addr,
+ int addrlen)
+{
+ struct write_operation *write_op;
+ pj_status_t status;
+ pj_ssize_t sent;
+
+ PJ_ASSERT_RETURN(key && op_key && data && length, PJ_EINVAL);
+ PJ_CHECK_STACK();
+
+ write_op = (struct write_operation*)op_key;
+ write_op->op = 0;
+
+ /* We can not use PJ_IOQUEUE_ALWAYS_ASYNC for socket write */
+ flags &= ~(PJ_IOQUEUE_ALWAYS_ASYNC);
+
+ /* Fast track:
+ * Try to send data immediately, only if there's no pending write!
+ * Note:
+ * We are speculating that the list is empty here without properly
+ * acquiring ioqueue's mutex first. This is intentional, to maximize
+ * performance via parallelism.
+ *
+ * This should be safe, because:
+ * - by convention, we require caller to make sure that the
+ * key is not unregistered while other threads are invoking
+ * an operation on the same key.
+ * - pj_list_empty() is safe to be invoked by multiple threads,
+ * even when other threads are modifying the list.
+ */
+ if (pj_list_empty(&key->write_list)) {
+ /*
+ * See if data can be sent immediately.
+ */
+ sent = *length;
+ status = pj_sock_sendto(key->fd, data, &sent, flags, addr, addrlen);
+ if (status == PJ_SUCCESS) {
+ /* Success! */
+ *length = sent;
+ return PJ_SUCCESS;
+ } else {
+ /* If error is not EWOULDBLOCK (or EAGAIN on Linux), report
+ * the error to caller.
+ */
+ if (status != PJ_STATUS_FROM_OS(PJ_BLOCKING_ERROR_VAL)) {
+ return status;
+ }
+ }
+ }
+
+ /*
+ * Check that address storage can hold the address parameter.
+ */
+ PJ_ASSERT_RETURN(addrlen <= sizeof(pj_sockaddr_in), PJ_EBUG);
+
+ /*
+ * Schedule asynchronous send.
+ */
+ write_op->op = PJ_IOQUEUE_OP_SEND_TO;
+ write_op->buf = (void*)data;
+ write_op->size = *length;
+ write_op->written = 0;
+ write_op->flags = flags;
+ pj_memcpy(&write_op->rmt_addr, addr, addrlen);
+ write_op->rmt_addrlen = addrlen;
+
+ pj_mutex_lock(key->mutex);
+ pj_list_insert_before(&key->write_list, write_op);
+ ioqueue_add_to_set(key->ioqueue, key->fd, WRITEABLE_EVENT);
+ pj_mutex_unlock(key->mutex);
+
+ return PJ_EPENDING;
+}
+
+#if PJ_HAS_TCP
+/*
+ * Initiate overlapped accept() operation.
+ */
+PJ_DEF(pj_status_t) pj_ioqueue_accept( pj_ioqueue_key_t *key,
+ pj_ioqueue_op_key_t *op_key,
+ pj_sock_t *new_sock,
+ pj_sockaddr_t *local,
+ pj_sockaddr_t *remote,
+ int *addrlen)
+{
+ struct accept_operation *accept_op;
+ pj_status_t status;
+
+ /* check parameters. All must be specified! */
+ PJ_ASSERT_RETURN(key && op_key && new_sock, PJ_EINVAL);
+
+ accept_op = (struct accept_operation*)op_key;
+ accept_op->op = 0;
+
+ /* Fast track:
+ * See if there's new connection available immediately.
+ */
+ if (pj_list_empty(&key->accept_list)) {
+ status = pj_sock_accept(key->fd, new_sock, remote, addrlen);
+ if (status == PJ_SUCCESS) {
+ /* Yes! New connection is available! */
+ if (local && addrlen) {
+ status = pj_sock_getsockname(*new_sock, local, addrlen);
+ if (status != PJ_SUCCESS) {
+ pj_sock_close(*new_sock);
+ *new_sock = PJ_INVALID_SOCKET;
+ return status;
+ }
+ }
+ return PJ_SUCCESS;
+ } else {
+ /* If error is not EWOULDBLOCK (or EAGAIN on Linux), report
+ * the error to caller.
+ */
+ if (status != PJ_STATUS_FROM_OS(PJ_BLOCKING_ERROR_VAL)) {
+ return status;
+ }
+ }
+ }
+
+ /*
+ * No connection is available immediately.
+ * Schedule accept() operation to be completed when there is incoming
+ * connection available.
+ */
+ accept_op->op = PJ_IOQUEUE_OP_ACCEPT;
+ accept_op->accept_fd = new_sock;
+ accept_op->rmt_addr = remote;
+ accept_op->addrlen= addrlen;
+ accept_op->local_addr = local;
+
+ pj_mutex_lock(key->mutex);
+ pj_list_insert_before(&key->accept_list, accept_op);
+ ioqueue_add_to_set(key->ioqueue, key->fd, READABLE_EVENT);
+ pj_mutex_unlock(key->mutex);
+
+ return PJ_EPENDING;
+}
+
+/*
+ * Initiate overlapped connect() operation (well, it's non-blocking actually,
+ * since there's no overlapped version of connect()).
+ */
+PJ_DEF(pj_status_t) pj_ioqueue_connect( pj_ioqueue_key_t *key,
+ const pj_sockaddr_t *addr,
+ int addrlen )
+{
+ pj_status_t status;
+
+ /* check parameters. All must be specified! */
+ PJ_ASSERT_RETURN(key && addr && addrlen, PJ_EINVAL);
+
+ /* Check if socket has not been marked for connecting */
+ if (key->connecting != 0)
+ return PJ_EPENDING;
+
+ status = pj_sock_connect(key->fd, addr, addrlen);
+ if (status == PJ_SUCCESS) {
+ /* Connected! */
+ return PJ_SUCCESS;
+ } else {
+ if (status == PJ_STATUS_FROM_OS(PJ_BLOCKING_CONNECT_ERROR_VAL)) {
+ /* Pending! */
+ pj_mutex_lock(key->mutex);
+ key->connecting = PJ_TRUE;
+ ioqueue_add_to_set(key->ioqueue, key->fd, WRITEABLE_EVENT);
+ ioqueue_add_to_set(key->ioqueue, key->fd, EXCEPTION_EVENT);
+ pj_mutex_unlock(key->mutex);
+ return PJ_EPENDING;
+ } else {
+ /* Error! */
+ return status;
+ }
+ }
+}
+#endif /* PJ_HAS_TCP */
+
+
+PJ_DEF(void) pj_ioqueue_op_key_init( pj_ioqueue_op_key_t *op_key,
+ pj_size_t size )
+{
+ pj_memset(op_key, 0, size);
+}
+
+
+/*
+ * pj_ioqueue_is_pending()
+ */
+PJ_DEF(pj_bool_t) pj_ioqueue_is_pending( pj_ioqueue_key_t *key,
+ pj_ioqueue_op_key_t *op_key )
+{
+ struct generic_operation *op_rec;
+
+ PJ_UNUSED_ARG(key);
+
+ op_rec = (struct generic_operation*)op_key;
+ return op_rec->op != 0;
+}
+
+
+/*
+ * pj_ioqueue_post_completion()
+ */
+PJ_DEF(pj_status_t) pj_ioqueue_post_completion( pj_ioqueue_key_t *key,
+ pj_ioqueue_op_key_t *op_key,
+ pj_ssize_t bytes_status )
+{
+ struct generic_operation *op_rec;
+
+ /*
+ * Find the operation key in all pending operation list to
+ * really make sure that it's still there; then call the callback.
+ */
+ pj_mutex_lock(key->mutex);
+
+ /* Find the operation in the pending read list. */
+ op_rec = (struct generic_operation*)key->read_list.next;
+ while (op_rec != (void*)&key->read_list) {
+ if (op_rec == (void*)op_key) {
+ pj_list_erase(op_rec);
+ op_rec->op = 0;
+ pj_mutex_unlock(key->mutex);
+
+ (*key->cb.on_read_complete)(key, op_key, bytes_status);
+ return PJ_SUCCESS;
+ }
+ op_rec = op_rec->next;
+ }
+
+ /* Find the operation in the pending write list. */
+ op_rec = (struct generic_operation*)key->write_list.next;
+ while (op_rec != (void*)&key->write_list) {
+ if (op_rec == (void*)op_key) {
+ pj_list_erase(op_rec);
+ op_rec->op = 0;
+ pj_mutex_unlock(key->mutex);
+
+ (*key->cb.on_write_complete)(key, op_key, bytes_status);
+ return PJ_SUCCESS;
+ }
+ op_rec = op_rec->next;
+ }
+
+ /* Find the operation in the pending accept list. */
+ op_rec = (struct generic_operation*)key->accept_list.next;
+ while (op_rec != (void*)&key->accept_list) {
+ if (op_rec == (void*)op_key) {
+ pj_list_erase(op_rec);
+ op_rec->op = 0;
+ pj_mutex_unlock(key->mutex);
+
+ (*key->cb.on_accept_complete)(key, op_key,
+ PJ_INVALID_SOCKET,
+ bytes_status);
+ return PJ_SUCCESS;
+ }
+ op_rec = op_rec->next;
+ }
+
+ pj_mutex_unlock(key->mutex);
+
+ return PJ_EINVALIDOP;
+}
+