diff options
author | David M. Lee <dlee@digium.com> | 2013-01-07 14:24:28 -0600 |
---|---|---|
committer | David M. Lee <dlee@digium.com> | 2013-01-07 14:24:28 -0600 |
commit | f3ab456a17af1c89a6e3be4d20c5944853df1cb0 (patch) | |
tree | d00e1a332cd038a6d906a1ea0ac91e1a4458e617 /pjlib/src/pj/ioqueue_common_abs.c |
Import pjproject-2.0.1
Diffstat (limited to 'pjlib/src/pj/ioqueue_common_abs.c')
-rw-r--r-- | pjlib/src/pj/ioqueue_common_abs.c | 1314 |
1 files changed, 1314 insertions, 0 deletions
diff --git a/pjlib/src/pj/ioqueue_common_abs.c b/pjlib/src/pj/ioqueue_common_abs.c new file mode 100644 index 0000000..ee4506d --- /dev/null +++ b/pjlib/src/pj/ioqueue_common_abs.c @@ -0,0 +1,1314 @@ +/* $Id: ioqueue_common_abs.c 3666 2011-07-19 08:40:20Z nanang $ */ +/* + * Copyright (C) 2008-2011 Teluu Inc. (http://www.teluu.com) + * Copyright (C) 2003-2008 Benny Prijono <benny@prijono.org> + * + * This program is free software; you can redistribute it and/or modify + * it under the terms of the GNU General Public License as published by + * the Free Software Foundation; either version 2 of the License, or + * (at your option) any later version. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU General Public License for more details. + * + * You should have received a copy of the GNU General Public License + * along with this program; if not, write to the Free Software + * Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA + */ + +/* + * ioqueue_common_abs.c + * + * This contains common functionalities to emulate proactor pattern with + * various event dispatching mechanisms (e.g. select, epoll). + * + * This file will be included by the appropriate ioqueue implementation. + * This file is NOT supposed to be compiled as stand-alone source. + */ + +#define PENDING_RETRY 2 + +static void ioqueue_init( pj_ioqueue_t *ioqueue ) +{ + ioqueue->lock = NULL; + ioqueue->auto_delete_lock = 0; + ioqueue->default_concurrency = PJ_IOQUEUE_DEFAULT_ALLOW_CONCURRENCY; +} + +static pj_status_t ioqueue_destroy(pj_ioqueue_t *ioqueue) +{ + if (ioqueue->auto_delete_lock && ioqueue->lock ) { + pj_lock_release(ioqueue->lock); + return pj_lock_destroy(ioqueue->lock); + } + + return PJ_SUCCESS; +} + +/* + * pj_ioqueue_set_lock() + */ +PJ_DEF(pj_status_t) pj_ioqueue_set_lock( pj_ioqueue_t *ioqueue, + pj_lock_t *lock, + pj_bool_t auto_delete ) +{ + PJ_ASSERT_RETURN(ioqueue && lock, PJ_EINVAL); + + if (ioqueue->auto_delete_lock && ioqueue->lock) { + pj_lock_destroy(ioqueue->lock); + } + + ioqueue->lock = lock; + ioqueue->auto_delete_lock = auto_delete; + + return PJ_SUCCESS; +} + +static pj_status_t ioqueue_init_key( pj_pool_t *pool, + pj_ioqueue_t *ioqueue, + pj_ioqueue_key_t *key, + pj_sock_t sock, + void *user_data, + const pj_ioqueue_callback *cb) +{ + pj_status_t rc; + int optlen; + + PJ_UNUSED_ARG(pool); + + key->ioqueue = ioqueue; + key->fd = sock; + key->user_data = user_data; + pj_list_init(&key->read_list); + pj_list_init(&key->write_list); +#if PJ_HAS_TCP + pj_list_init(&key->accept_list); + key->connecting = 0; +#endif + + /* Save callback. */ + pj_memcpy(&key->cb, cb, sizeof(pj_ioqueue_callback)); + +#if PJ_IOQUEUE_HAS_SAFE_UNREG + /* Set initial reference count to 1 */ + pj_assert(key->ref_count == 0); + ++key->ref_count; + + key->closing = 0; +#endif + + rc = pj_ioqueue_set_concurrency(key, ioqueue->default_concurrency); + if (rc != PJ_SUCCESS) + return rc; + + /* Get socket type. When socket type is datagram, some optimization + * will be performed during send to allow parallel send operations. + */ + optlen = sizeof(key->fd_type); + rc = pj_sock_getsockopt(sock, pj_SOL_SOCKET(), pj_SO_TYPE(), + &key->fd_type, &optlen); + if (rc != PJ_SUCCESS) + key->fd_type = pj_SOCK_STREAM(); + + /* Create mutex for the key. */ +#if !PJ_IOQUEUE_HAS_SAFE_UNREG + rc = pj_mutex_create_simple(pool, NULL, &key->mutex); +#endif + + return rc; +} + +/* + * pj_ioqueue_get_user_data() + * + * Obtain value associated with a key. + */ +PJ_DEF(void*) pj_ioqueue_get_user_data( pj_ioqueue_key_t *key ) +{ + PJ_ASSERT_RETURN(key != NULL, NULL); + return key->user_data; +} + +/* + * pj_ioqueue_set_user_data() + */ +PJ_DEF(pj_status_t) pj_ioqueue_set_user_data( pj_ioqueue_key_t *key, + void *user_data, + void **old_data) +{ + PJ_ASSERT_RETURN(key, PJ_EINVAL); + + if (old_data) + *old_data = key->user_data; + key->user_data = user_data; + + return PJ_SUCCESS; +} + +PJ_INLINE(int) key_has_pending_write(pj_ioqueue_key_t *key) +{ + return !pj_list_empty(&key->write_list); +} + +PJ_INLINE(int) key_has_pending_read(pj_ioqueue_key_t *key) +{ + return !pj_list_empty(&key->read_list); +} + +PJ_INLINE(int) key_has_pending_accept(pj_ioqueue_key_t *key) +{ +#if PJ_HAS_TCP + return !pj_list_empty(&key->accept_list); +#else + PJ_UNUSED_ARG(key); + return 0; +#endif +} + +PJ_INLINE(int) key_has_pending_connect(pj_ioqueue_key_t *key) +{ + return key->connecting; +} + + +#if PJ_IOQUEUE_HAS_SAFE_UNREG +# define IS_CLOSING(key) (key->closing) +#else +# define IS_CLOSING(key) (0) +#endif + + +/* + * ioqueue_dispatch_event() + * + * Report occurence of an event in the key to be processed by the + * framework. + */ +void ioqueue_dispatch_write_event(pj_ioqueue_t *ioqueue, pj_ioqueue_key_t *h) +{ + /* Lock the key. */ + pj_mutex_lock(h->mutex); + + if (IS_CLOSING(h)) { + pj_mutex_unlock(h->mutex); + return; + } + +#if defined(PJ_HAS_TCP) && PJ_HAS_TCP!=0 + if (h->connecting) { + /* Completion of connect() operation */ + pj_status_t status; + pj_bool_t has_lock; + + /* Clear operation. */ + h->connecting = 0; + + ioqueue_remove_from_set(ioqueue, h, WRITEABLE_EVENT); + ioqueue_remove_from_set(ioqueue, h, EXCEPTION_EVENT); + + +#if (defined(PJ_HAS_SO_ERROR) && PJ_HAS_SO_ERROR!=0) + /* from connect(2): + * On Linux, use getsockopt to read the SO_ERROR option at + * level SOL_SOCKET to determine whether connect() completed + * successfully (if SO_ERROR is zero). + */ + { + int value; + int vallen = sizeof(value); + int gs_rc = pj_sock_getsockopt(h->fd, SOL_SOCKET, SO_ERROR, + &value, &vallen); + if (gs_rc != 0) { + /* Argh!! What to do now??? + * Just indicate that the socket is connected. The + * application will get error as soon as it tries to use + * the socket to send/receive. + */ + status = PJ_SUCCESS; + } else { + status = PJ_STATUS_FROM_OS(value); + } + } +#elif defined(PJ_WIN32) && PJ_WIN32!=0 + status = PJ_SUCCESS; /* success */ +#else + /* Excellent information in D.J. Bernstein page: + * http://cr.yp.to/docs/connect.html + * + * Seems like the most portable way of detecting connect() + * failure is to call getpeername(). If socket is connected, + * getpeername() will return 0. If the socket is not connected, + * it will return ENOTCONN, and read(fd, &ch, 1) will produce + * the right errno through error slippage. This is a combination + * of suggestions from Douglas C. Schmidt and Ken Keys. + */ + { + struct sockaddr_in addr; + int addrlen = sizeof(addr); + + status = pj_sock_getpeername(h->fd, (struct sockaddr*)&addr, + &addrlen); + } +#endif + + /* Unlock; from this point we don't need to hold key's mutex + * (unless concurrency is disabled, which in this case we should + * hold the mutex while calling the callback) */ + if (h->allow_concurrent) { + /* concurrency may be changed while we're in the callback, so + * save it to a flag. + */ + has_lock = PJ_FALSE; + pj_mutex_unlock(h->mutex); + } else { + has_lock = PJ_TRUE; + } + + /* Call callback. */ + if (h->cb.on_connect_complete && !IS_CLOSING(h)) + (*h->cb.on_connect_complete)(h, status); + + /* Unlock if we still hold the lock */ + if (has_lock) { + pj_mutex_unlock(h->mutex); + } + + /* Done. */ + + } else +#endif /* PJ_HAS_TCP */ + if (key_has_pending_write(h)) { + /* Socket is writable. */ + struct write_operation *write_op; + pj_ssize_t sent; + pj_status_t send_rc = PJ_SUCCESS; + + /* Get the first in the queue. */ + write_op = h->write_list.next; + + /* For datagrams, we can remove the write_op from the list + * so that send() can work in parallel. + */ + if (h->fd_type == pj_SOCK_DGRAM()) { + pj_list_erase(write_op); + + if (pj_list_empty(&h->write_list)) + ioqueue_remove_from_set(ioqueue, h, WRITEABLE_EVENT); + + } + + /* Send the data. + * Unfortunately we must do this while holding key's mutex, thus + * preventing parallel write on a single key.. :-(( + */ + sent = write_op->size - write_op->written; + if (write_op->op == PJ_IOQUEUE_OP_SEND) { + send_rc = pj_sock_send(h->fd, write_op->buf+write_op->written, + &sent, write_op->flags); + /* Can't do this. We only clear "op" after we're finished sending + * the whole buffer. + */ + //write_op->op = 0; + } else if (write_op->op == PJ_IOQUEUE_OP_SEND_TO) { + int retry = 2; + while (--retry >= 0) { + send_rc = pj_sock_sendto(h->fd, + write_op->buf+write_op->written, + &sent, write_op->flags, + &write_op->rmt_addr, + write_op->rmt_addrlen); +#if defined(PJ_IPHONE_OS_HAS_MULTITASKING_SUPPORT) && \ + PJ_IPHONE_OS_HAS_MULTITASKING_SUPPORT!=0 + /* Special treatment for dead UDP sockets here, see ticket #1107 */ + if (send_rc==PJ_STATUS_FROM_OS(EPIPE) && !IS_CLOSING(h) && + h->fd_type==pj_SOCK_DGRAM()) + { + PJ_PERROR(4,(THIS_FILE, send_rc, + "Send error for socket %d, retrying", + h->fd)); + replace_udp_sock(h); + continue; + } +#endif + break; + } + + /* Can't do this. We only clear "op" after we're finished sending + * the whole buffer. + */ + //write_op->op = 0; + } else { + pj_assert(!"Invalid operation type!"); + write_op->op = PJ_IOQUEUE_OP_NONE; + send_rc = PJ_EBUG; + } + + if (send_rc == PJ_SUCCESS) { + write_op->written += sent; + } else { + pj_assert(send_rc > 0); + write_op->written = -send_rc; + } + + /* Are we finished with this buffer? */ + if (send_rc!=PJ_SUCCESS || + write_op->written == (pj_ssize_t)write_op->size || + h->fd_type == pj_SOCK_DGRAM()) + { + pj_bool_t has_lock; + + write_op->op = PJ_IOQUEUE_OP_NONE; + + if (h->fd_type != pj_SOCK_DGRAM()) { + /* Write completion of the whole stream. */ + pj_list_erase(write_op); + + /* Clear operation if there's no more data to send. */ + if (pj_list_empty(&h->write_list)) + ioqueue_remove_from_set(ioqueue, h, WRITEABLE_EVENT); + + } + + /* Unlock; from this point we don't need to hold key's mutex + * (unless concurrency is disabled, which in this case we should + * hold the mutex while calling the callback) */ + if (h->allow_concurrent) { + /* concurrency may be changed while we're in the callback, so + * save it to a flag. + */ + has_lock = PJ_FALSE; + pj_mutex_unlock(h->mutex); + } else { + has_lock = PJ_TRUE; + } + + /* Call callback. */ + if (h->cb.on_write_complete && !IS_CLOSING(h)) { + (*h->cb.on_write_complete)(h, + (pj_ioqueue_op_key_t*)write_op, + write_op->written); + } + + if (has_lock) { + pj_mutex_unlock(h->mutex); + } + + } else { + pj_mutex_unlock(h->mutex); + } + + /* Done. */ + } else { + /* + * This is normal; execution may fall here when multiple threads + * are signalled for the same event, but only one thread eventually + * able to process the event. + */ + pj_mutex_unlock(h->mutex); + } +} + +void ioqueue_dispatch_read_event( pj_ioqueue_t *ioqueue, pj_ioqueue_key_t *h ) +{ + pj_status_t rc; + + /* Lock the key. */ + pj_mutex_lock(h->mutex); + + if (IS_CLOSING(h)) { + pj_mutex_unlock(h->mutex); + return; + } + +# if PJ_HAS_TCP + if (!pj_list_empty(&h->accept_list)) { + + struct accept_operation *accept_op; + pj_bool_t has_lock; + + /* Get one accept operation from the list. */ + accept_op = h->accept_list.next; + pj_list_erase(accept_op); + accept_op->op = PJ_IOQUEUE_OP_NONE; + + /* Clear bit in fdset if there is no more pending accept */ + if (pj_list_empty(&h->accept_list)) + ioqueue_remove_from_set(ioqueue, h, READABLE_EVENT); + + rc=pj_sock_accept(h->fd, accept_op->accept_fd, + accept_op->rmt_addr, accept_op->addrlen); + if (rc==PJ_SUCCESS && accept_op->local_addr) { + rc = pj_sock_getsockname(*accept_op->accept_fd, + accept_op->local_addr, + accept_op->addrlen); + } + + /* Unlock; from this point we don't need to hold key's mutex + * (unless concurrency is disabled, which in this case we should + * hold the mutex while calling the callback) */ + if (h->allow_concurrent) { + /* concurrency may be changed while we're in the callback, so + * save it to a flag. + */ + has_lock = PJ_FALSE; + pj_mutex_unlock(h->mutex); + } else { + has_lock = PJ_TRUE; + } + + /* Call callback. */ + if (h->cb.on_accept_complete && !IS_CLOSING(h)) { + (*h->cb.on_accept_complete)(h, + (pj_ioqueue_op_key_t*)accept_op, + *accept_op->accept_fd, rc); + } + + if (has_lock) { + pj_mutex_unlock(h->mutex); + } + } + else +# endif + if (key_has_pending_read(h)) { + struct read_operation *read_op; + pj_ssize_t bytes_read; + pj_bool_t has_lock; + + /* Get one pending read operation from the list. */ + read_op = h->read_list.next; + pj_list_erase(read_op); + + /* Clear fdset if there is no pending read. */ + if (pj_list_empty(&h->read_list)) + ioqueue_remove_from_set(ioqueue, h, READABLE_EVENT); + + bytes_read = read_op->size; + + if ((read_op->op == PJ_IOQUEUE_OP_RECV_FROM)) { + read_op->op = PJ_IOQUEUE_OP_NONE; + rc = pj_sock_recvfrom(h->fd, read_op->buf, &bytes_read, + read_op->flags, + read_op->rmt_addr, + read_op->rmt_addrlen); + } else if ((read_op->op == PJ_IOQUEUE_OP_RECV)) { + read_op->op = PJ_IOQUEUE_OP_NONE; + rc = pj_sock_recv(h->fd, read_op->buf, &bytes_read, + read_op->flags); + } else { + pj_assert(read_op->op == PJ_IOQUEUE_OP_READ); + read_op->op = PJ_IOQUEUE_OP_NONE; + /* + * User has specified pj_ioqueue_read(). + * On Win32, we should do ReadFile(). But because we got + * here because of select() anyway, user must have put a + * socket descriptor on h->fd, which in this case we can + * just call pj_sock_recv() instead of ReadFile(). + * On Unix, user may put a file in h->fd, so we'll have + * to call read() here. + * This may not compile on systems which doesn't have + * read(). That's why we only specify PJ_LINUX here so + * that error is easier to catch. + */ +# if defined(PJ_WIN32) && PJ_WIN32 != 0 || \ + defined(PJ_WIN32_WINCE) && PJ_WIN32_WINCE != 0 + rc = pj_sock_recv(h->fd, read_op->buf, &bytes_read, + read_op->flags); + //rc = ReadFile((HANDLE)h->fd, read_op->buf, read_op->size, + // &bytes_read, NULL); +# elif (defined(PJ_HAS_UNISTD_H) && PJ_HAS_UNISTD_H != 0) + bytes_read = read(h->fd, read_op->buf, bytes_read); + rc = (bytes_read >= 0) ? PJ_SUCCESS : pj_get_os_error(); +# elif defined(PJ_LINUX_KERNEL) && PJ_LINUX_KERNEL != 0 + bytes_read = sys_read(h->fd, read_op->buf, bytes_read); + rc = (bytes_read >= 0) ? PJ_SUCCESS : -bytes_read; +# else +# error "Implement read() for this platform!" +# endif + } + + if (rc != PJ_SUCCESS) { +# if defined(PJ_WIN32) && PJ_WIN32 != 0 + /* On Win32, for UDP, WSAECONNRESET on the receive side + * indicates that previous sending has triggered ICMP Port + * Unreachable message. + * But we wouldn't know at this point which one of previous + * key that has triggered the error, since UDP socket can + * be shared! + * So we'll just ignore it! + */ + + if (rc == PJ_STATUS_FROM_OS(WSAECONNRESET)) { + //PJ_LOG(4,(THIS_FILE, + // "Ignored ICMP port unreach. on key=%p", h)); + } +# endif + + /* In any case we would report this to caller. */ + bytes_read = -rc; + +#if defined(PJ_IPHONE_OS_HAS_MULTITASKING_SUPPORT) && \ + PJ_IPHONE_OS_HAS_MULTITASKING_SUPPORT!=0 + /* Special treatment for dead UDP sockets here, see ticket #1107 */ + if (rc == PJ_STATUS_FROM_OS(ENOTCONN) && !IS_CLOSING(h) && + h->fd_type==pj_SOCK_DGRAM()) + { + replace_udp_sock(h); + } +#endif + } + + /* Unlock; from this point we don't need to hold key's mutex + * (unless concurrency is disabled, which in this case we should + * hold the mutex while calling the callback) */ + if (h->allow_concurrent) { + /* concurrency may be changed while we're in the callback, so + * save it to a flag. + */ + has_lock = PJ_FALSE; + pj_mutex_unlock(h->mutex); + } else { + has_lock = PJ_TRUE; + } + + /* Call callback. */ + if (h->cb.on_read_complete && !IS_CLOSING(h)) { + (*h->cb.on_read_complete)(h, + (pj_ioqueue_op_key_t*)read_op, + bytes_read); + } + + if (has_lock) { + pj_mutex_unlock(h->mutex); + } + + } else { + /* + * This is normal; execution may fall here when multiple threads + * are signalled for the same event, but only one thread eventually + * able to process the event. + */ + pj_mutex_unlock(h->mutex); + } +} + + +void ioqueue_dispatch_exception_event( pj_ioqueue_t *ioqueue, + pj_ioqueue_key_t *h ) +{ + pj_bool_t has_lock; + + pj_mutex_lock(h->mutex); + + if (!h->connecting) { + /* It is possible that more than one thread was woken up, thus + * the remaining thread will see h->connecting as zero because + * it has been processed by other thread. + */ + pj_mutex_unlock(h->mutex); + return; + } + + if (IS_CLOSING(h)) { + pj_mutex_unlock(h->mutex); + return; + } + + /* Clear operation. */ + h->connecting = 0; + + ioqueue_remove_from_set(ioqueue, h, WRITEABLE_EVENT); + ioqueue_remove_from_set(ioqueue, h, EXCEPTION_EVENT); + + /* Unlock; from this point we don't need to hold key's mutex + * (unless concurrency is disabled, which in this case we should + * hold the mutex while calling the callback) */ + if (h->allow_concurrent) { + /* concurrency may be changed while we're in the callback, so + * save it to a flag. + */ + has_lock = PJ_FALSE; + pj_mutex_unlock(h->mutex); + } else { + has_lock = PJ_TRUE; + } + + /* Call callback. */ + if (h->cb.on_connect_complete && !IS_CLOSING(h)) { + pj_status_t status = -1; +#if (defined(PJ_HAS_SO_ERROR) && PJ_HAS_SO_ERROR!=0) + int value; + int vallen = sizeof(value); + int gs_rc = pj_sock_getsockopt(h->fd, SOL_SOCKET, SO_ERROR, + &value, &vallen); + if (gs_rc == 0) { + status = PJ_RETURN_OS_ERROR(value); + } +#endif + + (*h->cb.on_connect_complete)(h, status); + } + + if (has_lock) { + pj_mutex_unlock(h->mutex); + } +} + +/* + * pj_ioqueue_recv() + * + * Start asynchronous recv() from the socket. + */ +PJ_DEF(pj_status_t) pj_ioqueue_recv( pj_ioqueue_key_t *key, + pj_ioqueue_op_key_t *op_key, + void *buffer, + pj_ssize_t *length, + unsigned flags ) +{ + struct read_operation *read_op; + + PJ_ASSERT_RETURN(key && op_key && buffer && length, PJ_EINVAL); + PJ_CHECK_STACK(); + + /* Check if key is closing (need to do this first before accessing + * other variables, since they might have been destroyed. See ticket + * #469). + */ + if (IS_CLOSING(key)) + return PJ_ECANCELLED; + + read_op = (struct read_operation*)op_key; + read_op->op = PJ_IOQUEUE_OP_NONE; + + /* Try to see if there's data immediately available. + */ + if ((flags & PJ_IOQUEUE_ALWAYS_ASYNC) == 0) { + pj_status_t status; + pj_ssize_t size; + + size = *length; + status = pj_sock_recv(key->fd, buffer, &size, flags); + if (status == PJ_SUCCESS) { + /* Yes! Data is available! */ + *length = size; + return PJ_SUCCESS; + } else { + /* If error is not EWOULDBLOCK (or EAGAIN on Linux), report + * the error to caller. + */ + if (status != PJ_STATUS_FROM_OS(PJ_BLOCKING_ERROR_VAL)) + return status; + } + } + + flags &= ~(PJ_IOQUEUE_ALWAYS_ASYNC); + + /* + * No data is immediately available. + * Must schedule asynchronous operation to the ioqueue. + */ + read_op->op = PJ_IOQUEUE_OP_RECV; + read_op->buf = buffer; + read_op->size = *length; + read_op->flags = flags; + + pj_mutex_lock(key->mutex); + /* Check again. Handle may have been closed after the previous check + * in multithreaded app. If we add bad handle to the set it will + * corrupt the ioqueue set. See #913 + */ + if (IS_CLOSING(key)) { + pj_mutex_unlock(key->mutex); + return PJ_ECANCELLED; + } + pj_list_insert_before(&key->read_list, read_op); + ioqueue_add_to_set(key->ioqueue, key, READABLE_EVENT); + pj_mutex_unlock(key->mutex); + + return PJ_EPENDING; +} + +/* + * pj_ioqueue_recvfrom() + * + * Start asynchronous recvfrom() from the socket. + */ +PJ_DEF(pj_status_t) pj_ioqueue_recvfrom( pj_ioqueue_key_t *key, + pj_ioqueue_op_key_t *op_key, + void *buffer, + pj_ssize_t *length, + unsigned flags, + pj_sockaddr_t *addr, + int *addrlen) +{ + struct read_operation *read_op; + + PJ_ASSERT_RETURN(key && op_key && buffer && length, PJ_EINVAL); + PJ_CHECK_STACK(); + + /* Check if key is closing. */ + if (IS_CLOSING(key)) + return PJ_ECANCELLED; + + read_op = (struct read_operation*)op_key; + read_op->op = PJ_IOQUEUE_OP_NONE; + + /* Try to see if there's data immediately available. + */ + if ((flags & PJ_IOQUEUE_ALWAYS_ASYNC) == 0) { + pj_status_t status; + pj_ssize_t size; + + size = *length; + status = pj_sock_recvfrom(key->fd, buffer, &size, flags, + addr, addrlen); + if (status == PJ_SUCCESS) { + /* Yes! Data is available! */ + *length = size; + return PJ_SUCCESS; + } else { + /* If error is not EWOULDBLOCK (or EAGAIN on Linux), report + * the error to caller. + */ + if (status != PJ_STATUS_FROM_OS(PJ_BLOCKING_ERROR_VAL)) + return status; + } + } + + flags &= ~(PJ_IOQUEUE_ALWAYS_ASYNC); + + /* + * No data is immediately available. + * Must schedule asynchronous operation to the ioqueue. + */ + read_op->op = PJ_IOQUEUE_OP_RECV_FROM; + read_op->buf = buffer; + read_op->size = *length; + read_op->flags = flags; + read_op->rmt_addr = addr; + read_op->rmt_addrlen = addrlen; + + pj_mutex_lock(key->mutex); + /* Check again. Handle may have been closed after the previous check + * in multithreaded app. If we add bad handle to the set it will + * corrupt the ioqueue set. See #913 + */ + if (IS_CLOSING(key)) { + pj_mutex_unlock(key->mutex); + return PJ_ECANCELLED; + } + pj_list_insert_before(&key->read_list, read_op); + ioqueue_add_to_set(key->ioqueue, key, READABLE_EVENT); + pj_mutex_unlock(key->mutex); + + return PJ_EPENDING; +} + +/* + * pj_ioqueue_send() + * + * Start asynchronous send() to the descriptor. + */ +PJ_DEF(pj_status_t) pj_ioqueue_send( pj_ioqueue_key_t *key, + pj_ioqueue_op_key_t *op_key, + const void *data, + pj_ssize_t *length, + unsigned flags) +{ + struct write_operation *write_op; + pj_status_t status; + unsigned retry; + pj_ssize_t sent; + + PJ_ASSERT_RETURN(key && op_key && data && length, PJ_EINVAL); + PJ_CHECK_STACK(); + + /* Check if key is closing. */ + if (IS_CLOSING(key)) + return PJ_ECANCELLED; + + /* We can not use PJ_IOQUEUE_ALWAYS_ASYNC for socket write. */ + flags &= ~(PJ_IOQUEUE_ALWAYS_ASYNC); + + /* Fast track: + * Try to send data immediately, only if there's no pending write! + * Note: + * We are speculating that the list is empty here without properly + * acquiring ioqueue's mutex first. This is intentional, to maximize + * performance via parallelism. + * + * This should be safe, because: + * - by convention, we require caller to make sure that the + * key is not unregistered while other threads are invoking + * an operation on the same key. + * - pj_list_empty() is safe to be invoked by multiple threads, + * even when other threads are modifying the list. + */ + if (pj_list_empty(&key->write_list)) { + /* + * See if data can be sent immediately. + */ + sent = *length; + status = pj_sock_send(key->fd, data, &sent, flags); + if (status == PJ_SUCCESS) { + /* Success! */ + *length = sent; + return PJ_SUCCESS; + } else { + /* If error is not EWOULDBLOCK (or EAGAIN on Linux), report + * the error to caller. + */ + if (status != PJ_STATUS_FROM_OS(PJ_BLOCKING_ERROR_VAL)) { + return status; + } + } + } + + /* + * Schedule asynchronous send. + */ + write_op = (struct write_operation*)op_key; + + /* Spin if write_op has pending operation */ + for (retry=0; write_op->op != 0 && retry<PENDING_RETRY; ++retry) + pj_thread_sleep(0); + + /* Last chance */ + if (write_op->op) { + /* Unable to send packet because there is already pending write in the + * write_op. We could not put the operation into the write_op + * because write_op already contains a pending operation! And + * we could not send the packet directly with send() either, + * because that will break the order of the packet. So we can + * only return error here. + * + * This could happen for example in multithreads program, + * where polling is done by one thread, while other threads are doing + * the sending only. If the polling thread runs on lower priority + * than the sending thread, then it's possible that the pending + * write flag is not cleared in-time because clearing is only done + * during polling. + * + * Aplication should specify multiple write operation keys on + * situation like this. + */ + //pj_assert(!"ioqueue: there is pending operation on this key!"); + return PJ_EBUSY; + } + + write_op->op = PJ_IOQUEUE_OP_SEND; + write_op->buf = (char*)data; + write_op->size = *length; + write_op->written = 0; + write_op->flags = flags; + + pj_mutex_lock(key->mutex); + /* Check again. Handle may have been closed after the previous check + * in multithreaded app. If we add bad handle to the set it will + * corrupt the ioqueue set. See #913 + */ + if (IS_CLOSING(key)) { + pj_mutex_unlock(key->mutex); + return PJ_ECANCELLED; + } + pj_list_insert_before(&key->write_list, write_op); + ioqueue_add_to_set(key->ioqueue, key, WRITEABLE_EVENT); + pj_mutex_unlock(key->mutex); + + return PJ_EPENDING; +} + + +/* + * pj_ioqueue_sendto() + * + * Start asynchronous write() to the descriptor. + */ +PJ_DEF(pj_status_t) pj_ioqueue_sendto( pj_ioqueue_key_t *key, + pj_ioqueue_op_key_t *op_key, + const void *data, + pj_ssize_t *length, + pj_uint32_t flags, + const pj_sockaddr_t *addr, + int addrlen) +{ + struct write_operation *write_op; + unsigned retry; + pj_bool_t restart_retry = PJ_FALSE; + pj_status_t status; + pj_ssize_t sent; + + PJ_ASSERT_RETURN(key && op_key && data && length, PJ_EINVAL); + PJ_CHECK_STACK(); + +#if defined(PJ_IPHONE_OS_HAS_MULTITASKING_SUPPORT) && \ + PJ_IPHONE_OS_HAS_MULTITASKING_SUPPORT!=0 +retry_on_restart: +#else + PJ_UNUSED_ARG(restart_retry); +#endif + /* Check if key is closing. */ + if (IS_CLOSING(key)) + return PJ_ECANCELLED; + + /* We can not use PJ_IOQUEUE_ALWAYS_ASYNC for socket write */ + flags &= ~(PJ_IOQUEUE_ALWAYS_ASYNC); + + /* Fast track: + * Try to send data immediately, only if there's no pending write! + * Note: + * We are speculating that the list is empty here without properly + * acquiring ioqueue's mutex first. This is intentional, to maximize + * performance via parallelism. + * + * This should be safe, because: + * - by convention, we require caller to make sure that the + * key is not unregistered while other threads are invoking + * an operation on the same key. + * - pj_list_empty() is safe to be invoked by multiple threads, + * even when other threads are modifying the list. + */ + if (pj_list_empty(&key->write_list)) { + /* + * See if data can be sent immediately. + */ + sent = *length; + status = pj_sock_sendto(key->fd, data, &sent, flags, addr, addrlen); + if (status == PJ_SUCCESS) { + /* Success! */ + *length = sent; + return PJ_SUCCESS; + } else { + /* If error is not EWOULDBLOCK (or EAGAIN on Linux), report + * the error to caller. + */ + if (status != PJ_STATUS_FROM_OS(PJ_BLOCKING_ERROR_VAL)) { +#if defined(PJ_IPHONE_OS_HAS_MULTITASKING_SUPPORT) && \ + PJ_IPHONE_OS_HAS_MULTITASKING_SUPPORT!=0 + /* Special treatment for dead UDP sockets here, see ticket #1107 */ + if (status==PJ_STATUS_FROM_OS(EPIPE) && !IS_CLOSING(key) && + key->fd_type==pj_SOCK_DGRAM() && !restart_retry) + { + PJ_PERROR(4,(THIS_FILE, status, + "Send error for socket %d, retrying", + key->fd)); + replace_udp_sock(key); + restart_retry = PJ_TRUE; + goto retry_on_restart; + } +#endif + + return status; + } + status = status; + } + } + + /* + * Check that address storage can hold the address parameter. + */ + PJ_ASSERT_RETURN(addrlen <= (int)sizeof(pj_sockaddr_in), PJ_EBUG); + + /* + * Schedule asynchronous send. + */ + write_op = (struct write_operation*)op_key; + + /* Spin if write_op has pending operation */ + for (retry=0; write_op->op != 0 && retry<PENDING_RETRY; ++retry) + pj_thread_sleep(0); + + /* Last chance */ + if (write_op->op) { + /* Unable to send packet because there is already pending write on the + * write_op. We could not put the operation into the write_op + * because write_op already contains a pending operation! And + * we could not send the packet directly with sendto() either, + * because that will break the order of the packet. So we can + * only return error here. + * + * This could happen for example in multithreads program, + * where polling is done by one thread, while other threads are doing + * the sending only. If the polling thread runs on lower priority + * than the sending thread, then it's possible that the pending + * write flag is not cleared in-time because clearing is only done + * during polling. + * + * Aplication should specify multiple write operation keys on + * situation like this. + */ + //pj_assert(!"ioqueue: there is pending operation on this key!"); + return PJ_EBUSY; + } + + write_op->op = PJ_IOQUEUE_OP_SEND_TO; + write_op->buf = (char*)data; + write_op->size = *length; + write_op->written = 0; + write_op->flags = flags; + pj_memcpy(&write_op->rmt_addr, addr, addrlen); + write_op->rmt_addrlen = addrlen; + + pj_mutex_lock(key->mutex); + /* Check again. Handle may have been closed after the previous check + * in multithreaded app. If we add bad handle to the set it will + * corrupt the ioqueue set. See #913 + */ + if (IS_CLOSING(key)) { + pj_mutex_unlock(key->mutex); + return PJ_ECANCELLED; + } + pj_list_insert_before(&key->write_list, write_op); + ioqueue_add_to_set(key->ioqueue, key, WRITEABLE_EVENT); + pj_mutex_unlock(key->mutex); + + return PJ_EPENDING; +} + +#if PJ_HAS_TCP +/* + * Initiate overlapped accept() operation. + */ +PJ_DEF(pj_status_t) pj_ioqueue_accept( pj_ioqueue_key_t *key, + pj_ioqueue_op_key_t *op_key, + pj_sock_t *new_sock, + pj_sockaddr_t *local, + pj_sockaddr_t *remote, + int *addrlen) +{ + struct accept_operation *accept_op; + pj_status_t status; + + /* check parameters. All must be specified! */ + PJ_ASSERT_RETURN(key && op_key && new_sock, PJ_EINVAL); + + /* Check if key is closing. */ + if (IS_CLOSING(key)) + return PJ_ECANCELLED; + + accept_op = (struct accept_operation*)op_key; + accept_op->op = PJ_IOQUEUE_OP_NONE; + + /* Fast track: + * See if there's new connection available immediately. + */ + if (pj_list_empty(&key->accept_list)) { + status = pj_sock_accept(key->fd, new_sock, remote, addrlen); + if (status == PJ_SUCCESS) { + /* Yes! New connection is available! */ + if (local && addrlen) { + status = pj_sock_getsockname(*new_sock, local, addrlen); + if (status != PJ_SUCCESS) { + pj_sock_close(*new_sock); + *new_sock = PJ_INVALID_SOCKET; + return status; + } + } + return PJ_SUCCESS; + } else { + /* If error is not EWOULDBLOCK (or EAGAIN on Linux), report + * the error to caller. + */ + if (status != PJ_STATUS_FROM_OS(PJ_BLOCKING_ERROR_VAL)) { + return status; + } + } + } + + /* + * No connection is available immediately. + * Schedule accept() operation to be completed when there is incoming + * connection available. + */ + accept_op->op = PJ_IOQUEUE_OP_ACCEPT; + accept_op->accept_fd = new_sock; + accept_op->rmt_addr = remote; + accept_op->addrlen= addrlen; + accept_op->local_addr = local; + + pj_mutex_lock(key->mutex); + /* Check again. Handle may have been closed after the previous check + * in multithreaded app. If we add bad handle to the set it will + * corrupt the ioqueue set. See #913 + */ + if (IS_CLOSING(key)) { + pj_mutex_unlock(key->mutex); + return PJ_ECANCELLED; + } + pj_list_insert_before(&key->accept_list, accept_op); + ioqueue_add_to_set(key->ioqueue, key, READABLE_EVENT); + pj_mutex_unlock(key->mutex); + + return PJ_EPENDING; +} + +/* + * Initiate overlapped connect() operation (well, it's non-blocking actually, + * since there's no overlapped version of connect()). + */ +PJ_DEF(pj_status_t) pj_ioqueue_connect( pj_ioqueue_key_t *key, + const pj_sockaddr_t *addr, + int addrlen ) +{ + pj_status_t status; + + /* check parameters. All must be specified! */ + PJ_ASSERT_RETURN(key && addr && addrlen, PJ_EINVAL); + + /* Check if key is closing. */ + if (IS_CLOSING(key)) + return PJ_ECANCELLED; + + /* Check if socket has not been marked for connecting */ + if (key->connecting != 0) + return PJ_EPENDING; + + status = pj_sock_connect(key->fd, addr, addrlen); + if (status == PJ_SUCCESS) { + /* Connected! */ + return PJ_SUCCESS; + } else { + if (status == PJ_STATUS_FROM_OS(PJ_BLOCKING_CONNECT_ERROR_VAL)) { + /* Pending! */ + pj_mutex_lock(key->mutex); + /* Check again. Handle may have been closed after the previous + * check in multithreaded app. See #913 + */ + if (IS_CLOSING(key)) { + pj_mutex_unlock(key->mutex); + return PJ_ECANCELLED; + } + key->connecting = PJ_TRUE; + ioqueue_add_to_set(key->ioqueue, key, WRITEABLE_EVENT); + ioqueue_add_to_set(key->ioqueue, key, EXCEPTION_EVENT); + pj_mutex_unlock(key->mutex); + return PJ_EPENDING; + } else { + /* Error! */ + return status; + } + } +} +#endif /* PJ_HAS_TCP */ + + +PJ_DEF(void) pj_ioqueue_op_key_init( pj_ioqueue_op_key_t *op_key, + pj_size_t size ) +{ + pj_bzero(op_key, size); +} + + +/* + * pj_ioqueue_is_pending() + */ +PJ_DEF(pj_bool_t) pj_ioqueue_is_pending( pj_ioqueue_key_t *key, + pj_ioqueue_op_key_t *op_key ) +{ + struct generic_operation *op_rec; + + PJ_UNUSED_ARG(key); + + op_rec = (struct generic_operation*)op_key; + return op_rec->op != 0; +} + + +/* + * pj_ioqueue_post_completion() + */ +PJ_DEF(pj_status_t) pj_ioqueue_post_completion( pj_ioqueue_key_t *key, + pj_ioqueue_op_key_t *op_key, + pj_ssize_t bytes_status ) +{ + struct generic_operation *op_rec; + + /* + * Find the operation key in all pending operation list to + * really make sure that it's still there; then call the callback. + */ + pj_mutex_lock(key->mutex); + + /* Find the operation in the pending read list. */ + op_rec = (struct generic_operation*)key->read_list.next; + while (op_rec != (void*)&key->read_list) { + if (op_rec == (void*)op_key) { + pj_list_erase(op_rec); + op_rec->op = PJ_IOQUEUE_OP_NONE; + pj_mutex_unlock(key->mutex); + + (*key->cb.on_read_complete)(key, op_key, bytes_status); + return PJ_SUCCESS; + } + op_rec = op_rec->next; + } + + /* Find the operation in the pending write list. */ + op_rec = (struct generic_operation*)key->write_list.next; + while (op_rec != (void*)&key->write_list) { + if (op_rec == (void*)op_key) { + pj_list_erase(op_rec); + op_rec->op = PJ_IOQUEUE_OP_NONE; + pj_mutex_unlock(key->mutex); + + (*key->cb.on_write_complete)(key, op_key, bytes_status); + return PJ_SUCCESS; + } + op_rec = op_rec->next; + } + + /* Find the operation in the pending accept list. */ + op_rec = (struct generic_operation*)key->accept_list.next; + while (op_rec != (void*)&key->accept_list) { + if (op_rec == (void*)op_key) { + pj_list_erase(op_rec); + op_rec->op = PJ_IOQUEUE_OP_NONE; + pj_mutex_unlock(key->mutex); + + (*key->cb.on_accept_complete)(key, op_key, + PJ_INVALID_SOCKET, + bytes_status); + return PJ_SUCCESS; + } + op_rec = op_rec->next; + } + + pj_mutex_unlock(key->mutex); + + return PJ_EINVALIDOP; +} + +PJ_DEF(pj_status_t) pj_ioqueue_set_default_concurrency( pj_ioqueue_t *ioqueue, + pj_bool_t allow) +{ + PJ_ASSERT_RETURN(ioqueue != NULL, PJ_EINVAL); + ioqueue->default_concurrency = allow; + return PJ_SUCCESS; +} + + +PJ_DEF(pj_status_t) pj_ioqueue_set_concurrency(pj_ioqueue_key_t *key, + pj_bool_t allow) +{ + PJ_ASSERT_RETURN(key, PJ_EINVAL); + + /* PJ_IOQUEUE_HAS_SAFE_UNREG must be enabled if concurrency is + * disabled. + */ + PJ_ASSERT_RETURN(allow || PJ_IOQUEUE_HAS_SAFE_UNREG, PJ_EINVAL); + + key->allow_concurrent = allow; + return PJ_SUCCESS; +} + +PJ_DEF(pj_status_t) pj_ioqueue_lock_key(pj_ioqueue_key_t *key) +{ + return pj_mutex_lock(key->mutex); +} + +PJ_DEF(pj_status_t) pj_ioqueue_unlock_key(pj_ioqueue_key_t *key) +{ + return pj_mutex_unlock(key->mutex); +} + |