From 97611d9f0a7809a759a0a0603f6d45f5822ad170 Mon Sep 17 00:00:00 2001 From: Benny Prijono Date: Sun, 6 Nov 2005 13:32:11 +0000 Subject: Put common ioqueue functionality in a separate file to be used by both select() and epoll git-svn-id: http://svn.pjsip.org/repos/pjproject/main@12 74dad513-b988-da41-8d7b-12977e46ad98 --- pjlib/build/pjlib.dsp | 22 +- pjlib/include/pj/ioqueue.h | 24 +- pjlib/include/pj/sock.h | 27 +- pjlib/src/pj/ioqueue_common_abs.c | 813 +++++++++++++++++++++++++++++++++ pjlib/src/pj/ioqueue_common_abs.h | 107 +++++ pjlib/src/pj/ioqueue_select.c | 933 ++++++-------------------------------- pjlib/src/pj/sock_bsd.c | 15 +- pjlib/src/pj/sock_linux_kernel.c | 13 +- pjlib/src/pjlib-test/ioq_perf.c | 2 +- pjlib/src/pjlib-test/ioq_tcp.c | 4 - 10 files changed, 1113 insertions(+), 847 deletions(-) create mode 100644 pjlib/src/pj/ioqueue_common_abs.c create mode 100644 pjlib/src/pj/ioqueue_common_abs.h diff --git a/pjlib/build/pjlib.dsp b/pjlib/build/pjlib.dsp index 23e9f0a2..07550f90 100644 --- a/pjlib/build/pjlib.dsp +++ b/pjlib/build/pjlib.dsp @@ -206,29 +206,21 @@ SOURCE=..\src\pj\hash.c # End Source File # Begin Source File -SOURCE=..\src\pj\ioqueue_select.c - -!IF "$(CFG)" == "pjlib - Win32 Release" - +SOURCE=..\src\pj\ioqueue_common_abs.c # PROP Exclude_From_Build 1 +# End Source File +# Begin Source File -!ELSEIF "$(CFG)" == "pjlib - Win32 Debug" - -!ENDIF +SOURCE=..\src\pj\ioqueue_common_abs.h +# End Source File +# Begin Source File +SOURCE=..\src\pj\ioqueue_select.c # End Source File # Begin Source File SOURCE=..\src\pj\ioqueue_winnt.c - -!IF "$(CFG)" == "pjlib - Win32 Release" - -!ELSEIF "$(CFG)" == "pjlib - Win32 Debug" - # PROP Exclude_From_Build 1 - -!ENDIF - # End Source File # Begin Source File diff --git a/pjlib/include/pj/ioqueue.h b/pjlib/include/pj/ioqueue.h index 6a7b827e..2e084fd3 100644 --- a/pjlib/include/pj/ioqueue.h +++ b/pjlib/include/pj/ioqueue.h @@ -82,11 +82,10 @@ PJ_BEGIN_DECL * * The items below describe rules that must be obeyed when using the I/O * queue, with regard to concurrency: - * - in general, the I/O queue is thread safe (assuming the lock strategy - * is not changed to disable mutex protection). All operations, except - * unregistration which is described below, can be safely invoked - * simultaneously by multiple threads. - * - however, care must be taken when unregistering a key from the + * - simultaneous operations (by different threads) on different keys are safe. + * - simultaneous operations on the same key are also safe, except + * unregistration, which is described below. + * - care must be taken when unregistering a key from the * ioqueue. Application must take care that when one thread is issuing * an unregistration, other thread is not simultaneously invoking an * operation to the same key. @@ -205,11 +204,16 @@ typedef enum pj_ioqueue_operation_e } pj_ioqueue_operation_e; -/** - * Indicates that the I/O Queue should be created to handle reasonable - * number of threads. - */ -#define PJ_IOQUEUE_DEFAULT_THREADS 0 +/** + * This macro specifies the maximum number of events that can be + * processed by the ioqueue on a single poll cycle, on implementations + * that support it. The value is only meaningful when specified + * during PJLIB build. + */ +#ifndef PJ_IOQUEUE_MAX_EVENTS_IN_SINGLE_POLL +# define PJ_IOQUEUE_MAX_EVENTS_IN_SINGLE_POLL (16) +#endif + /** * Create a new I/O Queue framework. 
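The concurrency rules above translate into a simple pattern on the application side: serialize pj_ioqueue_unregister() against every other operation on the same key. Below is a minimal sketch of one way to do that; the app_key_t wrapper and its mutex are illustrative assumptions, not part of this patch or of PJLIB:

    /* Sketch: application-level guard so that no thread can be inside
     * pj_ioqueue_recv()/pj_ioqueue_send() etc. while another thread
     * unregisters the same key. app_key_t is hypothetical.
     */
    typedef struct app_key_t
    {
        pj_mutex_t       *op_mutex;   /* guards ops vs. unregistration */
        pj_ioqueue_key_t *key;
    } app_key_t;

    static pj_status_t app_recv(app_key_t *ak, pj_ioqueue_op_key_t *op,
                                void *buf, pj_ssize_t *len)
    {
        pj_status_t rc;
        pj_mutex_lock(ak->op_mutex);
        /* Refuse the operation once the key has been unregistered. */
        rc = ak->key ? pj_ioqueue_recv(ak->key, op, buf, len, 0) : PJ_EINVAL;
        pj_mutex_unlock(ak->op_mutex);
        return rc;
    }

    static pj_status_t app_unregister(app_key_t *ak)
    {
        pj_status_t rc;
        pj_mutex_lock(ak->op_mutex);
        rc = pj_ioqueue_unregister(ak->key);
        ak->key = NULL;               /* no further ops may use it */
        pj_mutex_unlock(ak->op_mutex);
        return rc;
    }
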
diff --git a/pjlib/include/pj/sock.h b/pjlib/include/pj/sock.h index 79f161f2..6dfbaf29 100644 --- a/pjlib/include/pj/sock.h +++ b/pjlib/include/pj/sock.h @@ -69,7 +69,7 @@ extern const pj_uint16_t PJ_SOCK_RDM; /**< Reliably-delivered messages. */ /** - * Socket level specified in #pj_sock_setsockopt(). + * Socket level specified in #pj_sock_setsockopt() or #pj_sock_getsockopt(). * APPLICATION MUST USE THESE VALUES INSTEAD OF NORMAL SOL_*, BECAUSE * THE LIBRARY WILL TRANSLATE THE VALUE TO THE NATIVE VALUE. */ @@ -78,6 +78,15 @@ extern const pj_uint16_t PJ_SOL_IP; /**< IP level. */ extern const pj_uint16_t PJ_SOL_TCP; /**< TCP level. */ extern const pj_uint16_t PJ_SOL_UDP; /**< UDP level. */ extern const pj_uint16_t PJ_SOL_IPV6; /**< IP version 6 */ + +/** + * Values to be specified as \c optname when calling #pj_sock_setsockopt() + * or #pj_sock_getsockopt(). + */ +extern const pj_uint16_t PJ_SO_TYPE; /**< Socket type. */ +extern const pj_uint16_t PJ_SO_RCVBUF; /**< Buffer size for receive. */ +extern const pj_uint16_t PJ_SO_SNDBUF; /**< Buffer size for send. */ + /** * Flags to be specified in #pj_sock_recv, #pj_sock_send, etc. @@ -419,7 +428,7 @@ PJ_DECL(pj_status_t) pj_sock_socket(int family, * @return Zero on success. */ PJ_DECL(pj_status_t) pj_sock_close(pj_sock_t sockfd); - + /** * This function gives the socket sockfd the local address my_addr. my_addr is @@ -539,8 +548,7 @@ PJ_DECL(pj_status_t) pj_sock_getsockname( pj_sock_t sockfd, * * @param sockfd The socket descriptor. * @param level The level which to get the option from. - * @param optname The option name, which will be passed uninterpreted - * by the library. + * @param optname The option name. * @param optval Identifies the buffer which the value will be * returned. * @param optlen Initially contains the length of the buffer, upon @@ -549,8 +557,8 @@ PJ_DECL(pj_status_t) pj_sock_getsockname( pj_sock_t sockfd, * @return Zero on success. */ PJ_DECL(pj_status_t) pj_sock_getsockopt( pj_sock_t sockfd, - int level, - int optname, + pj_uint16_t level, + pj_uint16_t optname, void *optval, int *optlen); /** @@ -560,16 +568,15 @@ PJ_DECL(pj_status_t) pj_sock_getsockopt( pj_sock_t sockfd, * * @param sockfd The socket descriptor. * @param level The level which to get the option from. - * @param optname The option name, which will be passed uninterpreted - * by the library. + * @param optname The option name. * @param optval Identifies the buffer which contain the value. * @param optlen The length of the value. * * @return PJ_SUCCESS or the status code. 
*/ PJ_DECL(pj_status_t) pj_sock_setsockopt( pj_sock_t sockfd, - int level, - int optname, + pj_uint16_t level, + pj_uint16_t optname, const void *optval, int optlen); diff --git a/pjlib/src/pj/ioqueue_common_abs.c b/pjlib/src/pj/ioqueue_common_abs.c new file mode 100644 index 00000000..b5599d9c --- /dev/null +++ b/pjlib/src/pj/ioqueue_common_abs.c @@ -0,0 +1,813 @@ +/* $Id$ */ + +#include +#include +#include +#include +#include +#include +#include + + +static void ioqueue_init( pj_ioqueue_t *ioqueue ) +{ + ioqueue->lock = NULL; + ioqueue->auto_delete_lock = 0; +} + +static pj_status_t ioqueue_destroy(pj_ioqueue_t *ioqueue) +{ + if (ioqueue->auto_delete_lock && ioqueue->lock ) + return pj_lock_destroy(ioqueue->lock); + else + return PJ_SUCCESS; +} + +/* + * pj_ioqueue_set_lock() + */ +PJ_DEF(pj_status_t) pj_ioqueue_set_lock( pj_ioqueue_t *ioqueue, + pj_lock_t *lock, + pj_bool_t auto_delete ) +{ + PJ_ASSERT_RETURN(ioqueue && lock, PJ_EINVAL); + + if (ioqueue->auto_delete_lock && ioqueue->lock) { + pj_lock_destroy(ioqueue->lock); + } + + ioqueue->lock = lock; + ioqueue->auto_delete_lock = auto_delete; + + return PJ_SUCCESS; +} + +static pj_status_t ioqueue_init_key( pj_pool_t *pool, + pj_ioqueue_t *ioqueue, + pj_ioqueue_key_t *key, + pj_sock_t sock, + void *user_data, + const pj_ioqueue_callback *cb) +{ + pj_status_t rc; + int optlen; + + key->ioqueue = ioqueue; + key->fd = sock; + key->user_data = user_data; + pj_list_init(&key->read_list); + pj_list_init(&key->write_list); +#if PJ_HAS_TCP + pj_list_init(&key->accept_list); +#endif + + /* Save callback. */ + pj_memcpy(&key->cb, cb, sizeof(pj_ioqueue_callback)); + + /* Get socket type. When socket type is datagram, some optimization + * will be performed during send to allow parallel send operations. + */ + optlen = sizeof(key->fd_type); + rc = pj_sock_getsockopt(sock, PJ_SOL_SOCKET, PJ_SO_TYPE, + &key->fd_type, &optlen); + if (rc != PJ_SUCCESS) + key->fd_type = PJ_SOCK_STREAM; + + /* Create mutex for the key. */ + rc = pj_mutex_create_simple(pool, NULL, &key->mutex); + + return rc; +} + +static void ioqueue_destroy_key( pj_ioqueue_key_t *key ) +{ + pj_mutex_destroy(key->mutex); +} + +/* + * pj_ioqueue_get_user_data() + * + * Obtain value associated with a key. + */ +PJ_DEF(void*) pj_ioqueue_get_user_data( pj_ioqueue_key_t *key ) +{ + PJ_ASSERT_RETURN(key != NULL, NULL); + return key->user_data; } + +/* + * pj_ioqueue_set_user_data() + */ +PJ_DEF(pj_status_t) pj_ioqueue_set_user_data( pj_ioqueue_key_t *key, + void *user_data, + void **old_data) +{ + PJ_ASSERT_RETURN(key, PJ_EINVAL); + + if (old_data) + *old_data = key->user_data; + key->user_data = user_data; + + return PJ_SUCCESS; +} + +PJ_INLINE(int) key_has_pending_write(pj_ioqueue_key_t *key) +{ + return !pj_list_empty(&key->write_list); +} + +PJ_INLINE(int) key_has_pending_read(pj_ioqueue_key_t *key) +{ + return !pj_list_empty(&key->read_list); +} + +PJ_INLINE(int) key_has_pending_accept(pj_ioqueue_key_t *key) +{ +#if PJ_HAS_TCP + return !pj_list_empty(&key->accept_list); +#else + return 0; +#endif +} + +PJ_INLINE(int) key_has_pending_connect(pj_ioqueue_key_t *key) +{ + return key->connecting; +} + + +/* + * ioqueue_dispatch_event() + * + * Report occurrence of an event in the key to be processed by the + * framework. + */ +void ioqueue_dispatch_write_event(pj_ioqueue_t *ioqueue, pj_ioqueue_key_t *h) +{ + /* Lock the key. 
*/ + pj_mutex_lock(h->mutex); + +#if defined(PJ_HAS_TCP) && PJ_HAS_TCP!=0 + if (h->connecting) { + /* Completion of connect() operation */ + pj_ssize_t bytes_transfered; + + /* Clear operation. */ + h->connecting = 0; + + ioqueue_remove_from_set(ioqueue, h->fd, WRITEABLE_EVENT); + ioqueue_remove_from_set(ioqueue, h->fd, EXCEPTION_EVENT); + + /* Unlock; from this point we don't need to hold key's mutex. */ + pj_mutex_unlock(h->mutex); + +#if (defined(PJ_HAS_SO_ERROR) && PJ_HAS_SO_ERROR!=0) + /* from connect(2): + * On Linux, use getsockopt to read the SO_ERROR option at + * level SOL_SOCKET to determine whether connect() completed + * successfully (if SO_ERROR is zero). + */ + int value; + socklen_t vallen = sizeof(value); + int gs_rc = getsockopt(h->fd, SOL_SOCKET, SO_ERROR, + &value, &vallen); + if (gs_rc != 0) { + /* Argh!! What to do now??? + * Just indicate that the socket is connected. The + * application will get error as soon as it tries to use + * the socket to send/receive. + */ + bytes_transfered = 0; + } else { + bytes_transfered = value; + } +#elif defined(PJ_WIN32) && PJ_WIN32!=0 + bytes_transfered = 0; /* success */ +#else + /* Excellent information in D.J. Bernstein page: + * http://cr.yp.to/docs/connect.html + * + * Seems like the most portable way of detecting connect() + * failure is to call getpeername(). If socket is connected, + * getpeername() will return 0. If the socket is not connected, + * it will return ENOTCONN, and read(fd, &ch, 1) will produce + * the right errno through error slippage. This is a combination + * of suggestions from Douglas C. Schmidt and Ken Keys. + */ + int gp_rc; + struct sockaddr_in addr; + socklen_t addrlen = sizeof(addr); + + gp_rc = getpeername(h->fd, (struct sockaddr*)&addr, &addrlen); + bytes_transfered = gp_rc; +#endif + + /* Call callback. */ + if (h->cb.on_connect_complete) + (*h->cb.on_connect_complete)(h, bytes_transfered); + + /* Done. */ + + } else +#endif /* PJ_HAS_TCP */ + if (key_has_pending_write(h)) { + /* Socket is writable. */ + struct write_operation *write_op; + pj_ssize_t sent; + pj_status_t send_rc; + + /* Get the first in the queue. */ + write_op = h->write_list.next; + + /* For datagrams, we can remove the write_op from the list + * so that send() can work in parallel. + */ + if (h->fd_type == PJ_SOCK_DGRAM) { + pj_list_erase(write_op); + if (pj_list_empty(&h->write_list)) + ioqueue_remove_from_set(ioqueue, h->fd, WRITEABLE_EVENT); + + pj_mutex_unlock(h->mutex); + } + + /* Send the data. + * Unfortunately we must do this while holding key's mutex, thus + * preventing parallel write on a single key.. :-(( + */ + sent = write_op->size - write_op->written; + if (write_op->op == PJ_IOQUEUE_OP_SEND) { + send_rc = pj_sock_send(h->fd, write_op->buf+write_op->written, + &sent, write_op->flags); + } else if (write_op->op == PJ_IOQUEUE_OP_SEND_TO) { + send_rc = pj_sock_sendto(h->fd, + write_op->buf+write_op->written, + &sent, write_op->flags, + &write_op->rmt_addr, + write_op->rmt_addrlen); + } else { + pj_assert(!"Invalid operation type!"); + send_rc = PJ_EBUG; + } + + if (send_rc == PJ_SUCCESS) { + write_op->written += sent; + } else { + pj_assert(send_rc > 0); + write_op->written = -send_rc; + } + + /* Are we finished with this buffer? */ + if (send_rc!=PJ_SUCCESS || + write_op->written == (pj_ssize_t)write_op->size || + h->fd_type == PJ_SOCK_DGRAM) + { + if (h->fd_type != PJ_SOCK_DGRAM) { + /* Write completion of the whole stream. */ + pj_list_erase(write_op); + + /* Clear operation if there's no more data to send. 
*/ + if (pj_list_empty(&h->write_list)) + ioqueue_remove_from_set(ioqueue, h->fd, WRITEABLE_EVENT); + + /* No need to hold mutex anymore */ + pj_mutex_unlock(h->mutex); + } + + /* Call callback. */ + if (h->cb.on_write_complete) { + (*h->cb.on_write_complete)(h, + (pj_ioqueue_op_key_t*)write_op, + write_op->written); + } + + } else { + pj_mutex_unlock(h->mutex); + } + + /* Done. */ + } else { + pj_assert(!"Descriptor is signaled but key " + "has no pending operation!"); + + pj_mutex_unlock(h->mutex); + } +} + +void ioqueue_dispatch_read_event( pj_ioqueue_t *ioqueue, pj_ioqueue_key_t *h ) +{ + pj_status_t rc; + + /* Lock the key. */ + pj_mutex_lock(h->mutex); + +# if PJ_HAS_TCP + if (!pj_list_empty(&h->accept_list)) { + + struct accept_operation *accept_op; + + /* Get one accept operation from the list. */ + accept_op = h->accept_list.next; + pj_list_erase(accept_op); + + /* Clear bit in fdset if there is no more pending accept */ + if (pj_list_empty(&h->accept_list)) + ioqueue_remove_from_set(ioqueue, h->fd, READABLE_EVENT); + + /* Unlock; from this point we don't need to hold key's mutex. */ + pj_mutex_unlock(h->mutex); + + rc=pj_sock_accept(h->fd, accept_op->accept_fd, + accept_op->rmt_addr, accept_op->addrlen); + if (rc==PJ_SUCCESS && accept_op->local_addr) { + rc = pj_sock_getsockname(*accept_op->accept_fd, + accept_op->local_addr, + accept_op->addrlen); + } + + /* Call callback. */ + if (h->cb.on_accept_complete) + (*h->cb.on_accept_complete)(h, + (pj_ioqueue_op_key_t*)accept_op, + *accept_op->accept_fd, rc); + + } + else +# endif + if (key_has_pending_read(h)) { + struct read_operation *read_op; + pj_ssize_t bytes_read; + + pj_assert(!pj_list_empty(&h->read_list)); + + /* Get one pending read operation from the list. */ + read_op = h->read_list.next; + pj_list_erase(read_op); + + /* Clear fdset if there is no pending read. */ + if (pj_list_empty(&h->read_list)) + ioqueue_remove_from_set(ioqueue, h->fd, READABLE_EVENT); + + /* Unlock; from this point we don't need to hold key's mutex. */ + pj_mutex_unlock(h->mutex); + + bytes_read = read_op->size; + + if ((read_op->op == PJ_IOQUEUE_OP_RECV_FROM)) { + rc = pj_sock_recvfrom(h->fd, read_op->buf, &bytes_read, 0, + read_op->rmt_addr, + read_op->rmt_addrlen); + } else if ((read_op->op == PJ_IOQUEUE_OP_RECV)) { + rc = pj_sock_recv(h->fd, read_op->buf, &bytes_read, 0); + } else { + pj_assert(read_op->op == PJ_IOQUEUE_OP_READ); + /* + * User has specified pj_ioqueue_read(). + * On Win32, we should do ReadFile(). But because we got + * here because of select() anyway, user must have put a + * socket descriptor on h->fd, which in this case we can + * just call pj_sock_recv() instead of ReadFile(). + * On Unix, user may put a file in h->fd, so we'll have + * to call read() here. + * This may not compile on systems which don't have + * read(). That's why we only specify PJ_LINUX here so + * that error is easier to catch. + */ +# if defined(PJ_WIN32) && PJ_WIN32 != 0 + rc = pj_sock_recv(h->fd, read_op->buf, &bytes_read, 0); + //rc = ReadFile((HANDLE)h->fd, read_op->buf, read_op->size, + // &bytes_read, NULL); +# elif (defined(PJ_HAS_UNISTD_H) && PJ_HAS_UNISTD_H != 0) + bytes_read = read(h->fd, read_op->buf, bytes_read); + rc = (bytes_read >= 0) ? PJ_SUCCESS : pj_get_os_error(); +# elif defined(PJ_LINUX_KERNEL) && PJ_LINUX_KERNEL != 0 + bytes_read = sys_read(h->fd, read_op->buf, bytes_read); + rc = (bytes_read >= 0) ? PJ_SUCCESS : -bytes_read; +# else +# error "Implement read() for this platform!" 
+# endif + } + + if (rc != PJ_SUCCESS) { +# if defined(PJ_WIN32) && PJ_WIN32 != 0 + /* On Win32, for UDP, WSAECONNRESET on the receive side + * indicates that previous sending has triggered ICMP Port + * Unreachable message. + * But we wouldn't know at this point which of the previous + * keys triggered the error, since UDP socket can + * be shared! + * So we'll just ignore it! + */ + + if (rc == PJ_STATUS_FROM_OS(WSAECONNRESET)) { + //PJ_LOG(4,(THIS_FILE, + // "Ignored ICMP port unreach. on key=%p", h)); + } +# endif + + /* In any case we would report this to caller. */ + bytes_read = -rc; + } + + /* Call callback. */ + if (h->cb.on_read_complete) { + (*h->cb.on_read_complete)(h, + (pj_ioqueue_op_key_t*)read_op, + bytes_read); + } + + } else { + pj_mutex_unlock(h->mutex); + } +} + + +void ioqueue_dispatch_exception_event( pj_ioqueue_t *ioqueue, + pj_ioqueue_key_t *h ) +{ + pj_mutex_lock(h->mutex); + + if (!h->connecting) { + /* It is possible that more than one thread was woken up, thus + * the remaining thread will see h->connecting as zero because + * it has been processed by another thread. + */ + pj_mutex_unlock(h->mutex); + return; + } + + /* Clear operation. */ + h->connecting = 0; + + pj_mutex_unlock(h->mutex); + + ioqueue_remove_from_set(ioqueue, h->fd, WRITEABLE_EVENT); + ioqueue_remove_from_set(ioqueue, h->fd, EXCEPTION_EVENT); + + /* Call callback. */ + if (h->cb.on_connect_complete) + (*h->cb.on_connect_complete)(h, -1); +} + +/* + * pj_ioqueue_recv() + * + * Start asynchronous recv() from the socket. + */ +PJ_DEF(pj_status_t) pj_ioqueue_recv( pj_ioqueue_key_t *key, + pj_ioqueue_op_key_t *op_key, + void *buffer, + pj_ssize_t *length, + unsigned flags ) +{ + pj_status_t status; + pj_ssize_t size; + struct read_operation *read_op; + + PJ_ASSERT_RETURN(key && op_key && buffer && length, PJ_EINVAL); + PJ_CHECK_STACK(); + + /* Try to see if there's data immediately available. + */ + size = *length; + status = pj_sock_recv(key->fd, buffer, &size, flags); + if (status == PJ_SUCCESS) { + /* Yes! Data is available! */ + *length = size; + return PJ_SUCCESS; + } else { + /* If error is not EWOULDBLOCK (or EAGAIN on Linux), report + * the error to caller. + */ + if (status != PJ_STATUS_FROM_OS(PJ_BLOCKING_ERROR_VAL)) + return status; + } + + /* + * No data is immediately available. + * Must schedule asynchronous operation to the ioqueue. + */ + read_op = (struct read_operation*)op_key; + + read_op->op = PJ_IOQUEUE_OP_RECV; + read_op->buf = buffer; + read_op->size = *length; + read_op->flags = flags; + + pj_mutex_lock(key->mutex); + pj_list_insert_before(&key->read_list, read_op); + ioqueue_add_to_set(key->ioqueue, key->fd, READABLE_EVENT); + pj_mutex_unlock(key->mutex); + + return PJ_EPENDING; +} + +/* + * pj_ioqueue_recvfrom() + * + * Start asynchronous recvfrom() from the socket. + */ +PJ_DEF(pj_status_t) pj_ioqueue_recvfrom( pj_ioqueue_key_t *key, + pj_ioqueue_op_key_t *op_key, + void *buffer, + pj_ssize_t *length, + unsigned flags, + pj_sockaddr_t *addr, + int *addrlen) +{ + pj_status_t status; + pj_ssize_t size; + struct read_operation *read_op; + + PJ_ASSERT_RETURN(key && op_key && buffer && length, PJ_EINVAL); + PJ_CHECK_STACK(); + + /* Try to see if there's data immediately available. + */ + size = *length; + status = pj_sock_recvfrom(key->fd, buffer, &size, flags, + addr, addrlen); + if (status == PJ_SUCCESS) { + /* Yes! Data is available! 
*/ + *length = size; + return PJ_SUCCESS; + } else { + /* If error is not EWOULDBLOCK (or EAGAIN on Linux), report + * the error to caller. + */ + if (status != PJ_STATUS_FROM_OS(PJ_BLOCKING_ERROR_VAL)) + return status; + } + + /* + * No data is immediately available. + * Must schedule asynchronous operation to the ioqueue. + */ + read_op = (struct read_operation*)op_key; + + read_op->op = PJ_IOQUEUE_OP_RECV_FROM; + read_op->buf = buffer; + read_op->size = *length; + read_op->flags = flags; + read_op->rmt_addr = addr; + read_op->rmt_addrlen = addrlen; + + pj_mutex_lock(key->mutex); + pj_list_insert_before(&key->read_list, read_op); + ioqueue_add_to_set(key->ioqueue, key->fd, READABLE_EVENT); + pj_mutex_unlock(key->mutex); + + return PJ_EPENDING; +} + +/* + * pj_ioqueue_send() + * + * Start asynchronous send() to the descriptor. + */ +PJ_DEF(pj_status_t) pj_ioqueue_send( pj_ioqueue_key_t *key, + pj_ioqueue_op_key_t *op_key, + const void *data, + pj_ssize_t *length, + unsigned flags) +{ + struct write_operation *write_op; + pj_status_t status; + pj_ssize_t sent; + + PJ_ASSERT_RETURN(key && op_key && data && length, PJ_EINVAL); + PJ_CHECK_STACK(); + + /* Fast track: + * Try to send data immediately, only if there's no pending write! + * Note: + * We are speculating that the list is empty here without properly + * acquiring ioqueue's mutex first. This is intentional, to maximize + * performance via parallelism. + * + * This should be safe, because: + * - by convention, we require caller to make sure that the + * key is not unregistered while other threads are invoking + * an operation on the same key. + * - pj_list_empty() is safe to be invoked by multiple threads, + * even when other threads are modifying the list. + */ + if (pj_list_empty(&key->write_list)) { + /* + * See if data can be sent immediately. + */ + sent = *length; + status = pj_sock_send(key->fd, data, &sent, flags); + if (status == PJ_SUCCESS) { + /* Success! */ + *length = sent; + return PJ_SUCCESS; + } else { + /* If error is not EWOULDBLOCK (or EAGAIN on Linux), report + * the error to caller. + */ + if (status != PJ_STATUS_FROM_OS(PJ_BLOCKING_ERROR_VAL)) { + return status; + } + } + } + + /* + * Schedule asynchronous send. + */ + write_op = (struct write_operation*)op_key; + write_op->op = PJ_IOQUEUE_OP_SEND; + write_op->buf = NULL; + write_op->size = *length; + write_op->written = 0; + write_op->flags = flags; + + pj_mutex_lock(key->mutex); + pj_list_insert_before(&key->write_list, write_op); + ioqueue_add_to_set(key->ioqueue, key->fd, WRITEABLE_EVENT); + pj_mutex_unlock(key->mutex); + + return PJ_EPENDING; +} + + +/* + * pj_ioqueue_sendto() + * + * Start asynchronous write() to the descriptor. + */ +PJ_DEF(pj_status_t) pj_ioqueue_sendto( pj_ioqueue_key_t *key, + pj_ioqueue_op_key_t *op_key, + const void *data, + pj_ssize_t *length, + unsigned flags, + const pj_sockaddr_t *addr, + int addrlen) +{ + struct write_operation *write_op; + pj_status_t status; + pj_ssize_t sent; + + PJ_ASSERT_RETURN(key && op_key && data && length, PJ_EINVAL); + PJ_CHECK_STACK(); + + /* Fast track: + * Try to send data immediately, only if there's no pending write! + * Note: + * We are speculating that the list is empty here without properly + * acquiring ioqueue's mutex first. This is intentional, to maximize + * performance via parallelism. + * + * This should be safe, because: + * - by convention, we require caller to make sure that the + * key is not unregistered while other threads are invoking + * an operation on the same key. 
+ * - pj_list_empty() is safe to be invoked by multiple threads, + * even when other threads are modifying the list. + */ + if (pj_list_empty(&key->write_list)) { + /* + * See if data can be sent immediately. + */ + sent = *length; + status = pj_sock_sendto(key->fd, data, &sent, flags, addr, addrlen); + if (status == PJ_SUCCESS) { + /* Success! */ + *length = sent; + return PJ_SUCCESS; + } else { + /* If error is not EWOULDBLOCK (or EAGAIN on Linux), report + * the error to caller. + */ + if (status != PJ_STATUS_FROM_OS(PJ_BLOCKING_ERROR_VAL)) { + return status; + } + } + } + + /* + * Check that address storage can hold the address parameter. + */ + PJ_ASSERT_RETURN(addrlen <= sizeof(pj_sockaddr_in), PJ_EBUG); + + /* + * Schedule asynchronous send. + */ + write_op = (struct write_operation*)op_key; + write_op->op = PJ_IOQUEUE_OP_SEND_TO; + write_op->buf = NULL; + write_op->size = *length; + write_op->written = 0; + write_op->flags = flags; + pj_memcpy(&write_op->rmt_addr, addr, addrlen); + write_op->rmt_addrlen = addrlen; + + pj_mutex_lock(key->mutex); + pj_list_insert_before(&key->write_list, write_op); + ioqueue_add_to_set(key->ioqueue, key->fd, WRITEABLE_EVENT); + pj_mutex_unlock(key->mutex); + + return PJ_EPENDING; +} + +#if PJ_HAS_TCP +/* + * Initiate overlapped accept() operation. + */ +PJ_DEF(pj_status_t) pj_ioqueue_accept( pj_ioqueue_key_t *key, + pj_ioqueue_op_key_t *op_key, + pj_sock_t *new_sock, + pj_sockaddr_t *local, + pj_sockaddr_t *remote, + int *addrlen) +{ + struct accept_operation *accept_op; + pj_status_t status; + + /* check parameters. All must be specified! */ + PJ_ASSERT_RETURN(key && op_key && new_sock, PJ_EINVAL); + + /* Fast track: + * See if there's new connection available immediately. + */ + if (pj_list_empty(&key->accept_list)) { + status = pj_sock_accept(key->fd, new_sock, remote, addrlen); + if (status == PJ_SUCCESS) { + /* Yes! New connection is available! */ + if (local && addrlen) { + status = pj_sock_getsockname(*new_sock, local, addrlen); + if (status != PJ_SUCCESS) { + pj_sock_close(*new_sock); + *new_sock = PJ_INVALID_SOCKET; + return status; + } + } + return PJ_SUCCESS; + } else { + /* If error is not EWOULDBLOCK (or EAGAIN on Linux), report + * the error to caller. + */ + if (status != PJ_STATUS_FROM_OS(PJ_BLOCKING_ERROR_VAL)) { + return status; + } + } + } + + /* + * No connection is available immediately. + * Schedule accept() operation to be completed when there is incoming + * connection available. + */ + accept_op = (struct accept_operation*)op_key; + + accept_op->op = PJ_IOQUEUE_OP_ACCEPT; + accept_op->accept_fd = new_sock; + accept_op->rmt_addr = remote; + accept_op->addrlen= addrlen; + accept_op->local_addr = local; + + pj_mutex_lock(key->mutex); + pj_list_insert_before(&key->accept_list, accept_op); + ioqueue_add_to_set(key->ioqueue, key->fd, READABLE_EVENT); + pj_mutex_unlock(key->mutex); + + return PJ_EPENDING; +} + +/* + * Initiate overlapped connect() operation (well, it's non-blocking actually, + * since there's no overlapped version of connect()). + */ +PJ_DEF(pj_status_t) pj_ioqueue_connect( pj_ioqueue_key_t *key, + const pj_sockaddr_t *addr, + int addrlen ) +{ + pj_status_t status; + + /* check parameters. All must be specified! */ + PJ_ASSERT_RETURN(key && addr && addrlen, PJ_EINVAL); + + /* Check if socket has not been marked for connecting */ + if (key->connecting != 0) + return PJ_EPENDING; + + status = pj_sock_connect(key->fd, addr, addrlen); + if (status == PJ_SUCCESS) { + /* Connected! 
*/ + return PJ_SUCCESS; + } else { + if (status == PJ_STATUS_FROM_OS(PJ_BLOCKING_CONNECT_ERROR_VAL)) { + /* Pending! */ + pj_mutex_lock(key->mutex); + key->connecting = PJ_TRUE; + ioqueue_add_to_set(key->ioqueue, key->fd, WRITEABLE_EVENT); + ioqueue_add_to_set(key->ioqueue, key->fd, EXCEPTION_EVENT); + pj_mutex_unlock(key->mutex); + return PJ_EPENDING; + } else { + /* Error! */ + return status; + } + } +} +#endif /* PJ_HAS_TCP */ + diff --git a/pjlib/src/pj/ioqueue_common_abs.h b/pjlib/src/pj/ioqueue_common_abs.h new file mode 100644 index 00000000..c6fc1ff6 --- /dev/null +++ b/pjlib/src/pj/ioqueue_common_abs.h @@ -0,0 +1,107 @@ +/* $Id$ */ + +/* ioqueue_common_abs.h + * + * This file contains private declarations for abstracting various + * event polling/dispatching mechanisms (e.g. select, poll, epoll) + * to the ioqueue. + */ + +#include + +/* + * The select ioqueue relies on socket functions (pj_sock_xxx()) to return + * the correct error code. + */ +#if PJ_RETURN_OS_ERROR(100) != PJ_STATUS_FROM_OS(100) +# error "Error reporting must be enabled for this function to work!" +#endif + + +struct generic_operation +{ + PJ_DECL_LIST_MEMBER(struct generic_operation); + pj_ioqueue_operation_e op; +}; + +struct read_operation +{ + PJ_DECL_LIST_MEMBER(struct read_operation); + pj_ioqueue_operation_e op; + + void *buf; + pj_size_t size; + unsigned flags; + pj_sockaddr_t *rmt_addr; + int *rmt_addrlen; +}; + +struct write_operation +{ + PJ_DECL_LIST_MEMBER(struct write_operation); + pj_ioqueue_operation_e op; + + char *buf; + pj_size_t size; + pj_ssize_t written; + unsigned flags; + pj_sockaddr_in rmt_addr; + int rmt_addrlen; +}; + +#if PJ_HAS_TCP +struct accept_operation +{ + PJ_DECL_LIST_MEMBER(struct accept_operation); + pj_ioqueue_operation_e op; + + pj_sock_t *accept_fd; + pj_sockaddr_t *local_addr; + pj_sockaddr_t *rmt_addr; + int *addrlen; +}; +#endif + +union operation_key +{ + struct generic_operation generic; + struct read_operation read; + struct write_operation write; +#if PJ_HAS_TCP + struct accept_operation accept; +#endif +}; + +#define DECLARE_COMMON_KEY \ + PJ_DECL_LIST_MEMBER(struct pj_ioqueue_key_t); \ + pj_ioqueue_t *ioqueue; \ + pj_mutex_t *mutex; \ + pj_sock_t fd; \ + int fd_type; \ + void *user_data; \ + pj_ioqueue_callback cb; \ + int connecting; \ + struct read_operation read_list; \ + struct write_operation write_list; \ + struct accept_operation accept_list; + + +#define DECLARE_COMMON_IOQUEUE \ + pj_lock_t *lock; \ + pj_bool_t auto_delete_lock; + + +enum ioqueue_event_type +{ + NO_EVENT, + READABLE_EVENT, + WRITEABLE_EVENT, + EXCEPTION_EVENT, +}; + +static void ioqueue_add_to_set( pj_ioqueue_t *ioqueue, + pj_sock_t fd, + enum ioqueue_event_type event_type ); +static void ioqueue_remove_from_set( pj_ioqueue_t *ioqueue, + pj_sock_t fd, + enum ioqueue_event_type event_type); diff --git a/pjlib/src/pj/ioqueue_select.c b/pjlib/src/pj/ioqueue_select.c index 367ffb5e..24e68564 100644 --- a/pjlib/src/pj/ioqueue_select.c +++ b/pjlib/src/pj/ioqueue_select.c @@ -20,6 +20,11 @@ #include #include #include + +/* + * Include declaration from common abstraction. + */ +#include "ioqueue_common_abs.h" /* * ISSUES with ioqueue_select() */ PJ_DECL(pj_size_t) PJ_FD_COUNT(const pj_fd_set_t *fdsetp); - - /* * During debugging build, VALIDATE_FD_SET is set. * This will check the validity of the fd_sets. 
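Before the select() changes themselves, it helps to see what the two hooks declared in ioqueue_common_abs.h buy us. Here is a rough sketch, not part of this commit, of how an epoll-based backend could implement one of them; the epfd member and the event mapping are assumptions for illustration only:

    /* Hypothetical epoll backend sketch -- illustrative only. It shows
     * how a dispatcher other than select() can satisfy the hooks that
     * ioqueue_common_abs.c relies on.
     */
    #include <sys/epoll.h>

    struct pj_ioqueue_t
    {
        DECLARE_COMMON_IOQUEUE
        int epfd;    /* epoll descriptor; assumed member, not in the patch */
    };

    static void ioqueue_add_to_set( pj_ioqueue_t *ioqueue,
                                    pj_sock_t fd,
                                    enum ioqueue_event_type event_type )
    {
        struct epoll_event ev;

        /* Map the abstract event to the epoll flag. */
        ev.events = (event_type == READABLE_EVENT)  ? EPOLLIN  :
                    (event_type == WRITEABLE_EVENT) ? EPOLLOUT : EPOLLERR;
        ev.data.fd = (int)fd;

        /* A real implementation must use EPOLL_CTL_MOD and OR the new
         * flag in when the fd is already registered; the flag merging
         * is omitted from this sketch. */
        epoll_ctl(ioqueue->epfd, EPOLL_CTL_ADD, (int)fd, &ev);
    }

ioqueue_remove_from_set() would be the mirror image, clearing the flag or issuing EPOLL_CTL_DEL; the point is that ioqueue_common_abs.c never touches an fd_set directly.
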
@@ -70,76 +73,12 @@ PJ_DECL(pj_size_t) PJ_FD_COUNT(const pj_fd_set_t *fdsetp); # define VALIDATE_FD_SET 0 #endif -struct generic_operation -{ - PJ_DECL_LIST_MEMBER(struct generic_operation); - pj_ioqueue_operation_e op; -}; - -struct read_operation -{ - PJ_DECL_LIST_MEMBER(struct read_operation); - pj_ioqueue_operation_e op; - - void *buf; - pj_size_t size; - unsigned flags; - pj_sockaddr_t *rmt_addr; - int *rmt_addrlen; -}; - -struct write_operation -{ - PJ_DECL_LIST_MEMBER(struct write_operation); - pj_ioqueue_operation_e op; - - char *buf; - pj_size_t size; - pj_ssize_t written; - unsigned flags; - pj_sockaddr_in rmt_addr; - int rmt_addrlen; -}; - -#if PJ_HAS_TCP -struct accept_operation -{ - PJ_DECL_LIST_MEMBER(struct accept_operation); - pj_ioqueue_operation_e op; - - pj_sock_t *accept_fd; - pj_sockaddr_t *local_addr; - pj_sockaddr_t *rmt_addr; - int *addrlen; -}; -#endif - -union operation_key -{ - struct generic_operation generic; - struct read_operation read; - struct write_operation write; -#if PJ_HAS_TCP - struct accept_operation accept; -#endif -}; - /* * This describes each key. */ struct pj_ioqueue_key_t -{ - PJ_DECL_LIST_MEMBER(struct pj_ioqueue_key_t); - pj_ioqueue_t *ioqueue; - pj_sock_t fd; - void *user_data; - pj_ioqueue_callback cb; - int connecting; - struct read_operation read_list; - struct write_operation write_list; -#if PJ_HAS_TCP - struct accept_operation accept_list; -#endif +{ + DECLARE_COMMON_KEY }; /* @@ -147,8 +86,8 @@ struct pj_ioqueue_key_t */ struct pj_ioqueue_t { - pj_lock_t *lock; - pj_bool_t auto_delete_lock; + DECLARE_COMMON_IOQUEUE + unsigned max, count; pj_ioqueue_key_t key_list; pj_fd_set_t rfdset; @@ -157,6 +96,11 @@ struct pj_ioqueue_t pj_fd_set_t xfdset; #endif }; + +/* Include implementation for common abstraction after we declare + * pj_ioqueue_key_t and pj_ioqueue_t. + */ +#include "ioqueue_common_abs.c" /* * pj_ioqueue_create() @@ -167,7 +111,8 @@ PJ_DEF(pj_status_t) pj_ioqueue_create( pj_pool_t *pool, pj_size_t max_fd, pj_ioqueue_t **p_ioqueue) { - pj_ioqueue_t *ioqueue; + pj_ioqueue_t *ioqueue; + pj_lock_t *lock; pj_status_t rc; /* Check that arguments are valid. */ @@ -179,7 +124,10 @@ PJ_DEF(pj_status_t) pj_ioqueue_create( pj_pool_t *pool, PJ_ASSERT_RETURN(sizeof(pj_ioqueue_op_key_t)-sizeof(void*) >= sizeof(union operation_key), PJ_EBUG); - ioqueue = pj_pool_alloc(pool, sizeof(pj_ioqueue_t)); + ioqueue = pj_pool_alloc(pool, sizeof(pj_ioqueue_t)); + + ioqueue_init(ioqueue); + ioqueue->max = max_fd; ioqueue->count = 0; PJ_FD_ZERO(&ioqueue->rfdset); @@ -189,11 +137,13 @@ PJ_DEF(pj_status_t) pj_ioqueue_create( pj_pool_t *pool, #endif pj_list_init(&ioqueue->key_list); - rc = pj_lock_create_recursive_mutex(pool, "ioq%p", &ioqueue->lock); + rc = pj_lock_create_simple_mutex(pool, "ioq%p", &lock); if (rc != PJ_SUCCESS) return rc; - ioqueue->auto_delete_lock = PJ_TRUE; + rc = pj_ioqueue_set_lock(ioqueue, lock, PJ_TRUE); + if (rc != PJ_SUCCESS) + return rc; PJ_LOG(4, ("pjlib", "select() I/O Queue created (%p)", ioqueue)); @@ -208,16 +158,10 @@ PJ_DEF(pj_status_t) pj_ioqueue_create( pj_pool_t *pool, */ PJ_DEF(pj_status_t) pj_ioqueue_destroy(pj_ioqueue_t *ioqueue) { - pj_status_t rc = PJ_SUCCESS; - PJ_ASSERT_RETURN(ioqueue, PJ_EINVAL); pj_lock_acquire(ioqueue->lock); - - if (ioqueue->auto_delete_lock) - rc = pj_lock_destroy(ioqueue->lock); - - return rc; + return ioqueue_destroy(ioqueue); } @@ -260,17 +204,9 @@ PJ_DEF(pj_status_t) pj_ioqueue_register_sock( pj_pool_t *pool, /* Create key. 
*/ key = (pj_ioqueue_key_t*)pj_pool_zalloc(pool, sizeof(pj_ioqueue_key_t)); - key->ioqueue = ioqueue; - key->fd = sock; - key->user_data = user_data; - pj_list_init(&key->read_list); - pj_list_init(&key->write_list); -#if PJ_HAS_TCP - pj_list_init(&key->accept_list); -#endif - - /* Save callback. */ - pj_memcpy(&key->cb, cb, sizeof(pj_ioqueue_callback)); + rc = ioqueue_init_key(pool, ioqueue, key, sock, user_data, cb); + if (rc != PJ_SUCCESS) + return rc; /* Register */ pj_list_insert_before(&ioqueue->key_list, key); @@ -296,7 +232,7 @@ PJ_DEF(pj_status_t) pj_ioqueue_unregister( pj_ioqueue_key_t *key) PJ_ASSERT_RETURN(key, PJ_EINVAL); ioqueue = key->ioqueue; - + pj_lock_acquire(ioqueue->lock); pj_assert(ioqueue->count > 0); @@ -308,39 +244,20 @@ PJ_DEF(pj_status_t) pj_ioqueue_unregister( pj_ioqueue_key_t *key) PJ_FD_CLR(key->fd, &ioqueue->xfdset); #endif - pj_lock_release(ioqueue->lock); - return PJ_SUCCESS; -} + /* ioqueue_destroy_key may try to acquire key's mutex. + * Since normally the order of locking is to lock key's mutex first + * then ioqueue's mutex, ioqueue_destroy_key may deadlock unless we + * release ioqueue's mutex first. + */ + pj_lock_release(ioqueue->lock); + + /* Destroy the key. */ + ioqueue_destroy_key(key); -/* - * pj_ioqueue_get_user_data() - * - * Obtain value associated with a key. - */ -PJ_DEF(void*) pj_ioqueue_get_user_data( pj_ioqueue_key_t *key ) -{ - PJ_ASSERT_RETURN(key != NULL, NULL); - return key->user_data; + return PJ_SUCCESS; } -/* - * pj_ioqueue_set_user_data() - */ -PJ_DEF(pj_status_t) pj_ioqueue_set_user_data( pj_ioqueue_key_t *key, - void *user_data, - void **old_data) -{ - PJ_ASSERT_RETURN(key, PJ_EINVAL); - - if (old_data) - *old_data = key->user_data; - key->user_data = user_data; - - return PJ_SUCCESS; -} - - /* This supposed to check whether the fd_set values are consistent * with the operation currently set in each key. */ @@ -390,7 +307,54 @@ static void validate_sets(const pj_ioqueue_t *ioqueue, } } #endif /* VALIDATE_FD_SET */ + +/* ioqueue_remove_from_set() + * This function is called from ioqueue_dispatch_event() to instruct + * the ioqueue to remove the specified descriptor from ioqueue's descriptor + * set for the specified event. + */ +static void ioqueue_remove_from_set( pj_ioqueue_t *ioqueue, + pj_sock_t fd, + enum ioqueue_event_type event_type) +{ + pj_lock_acquire(ioqueue->lock); + + if (event_type == READABLE_EVENT) + PJ_FD_CLR((pj_sock_t)fd, &ioqueue->rfdset); + else if (event_type == WRITEABLE_EVENT) + PJ_FD_CLR((pj_sock_t)fd, &ioqueue->wfdset); + else if (event_type == EXCEPTION_EVENT) + PJ_FD_CLR((pj_sock_t)fd, &ioqueue->xfdset); + else + pj_assert(0); + + pj_lock_release(ioqueue->lock); +} + +/* + * ioqueue_add_to_set() + * This function is called from pj_ioqueue_recv(), pj_ioqueue_send() etc + * to instruct the ioqueue to add the specified handle to ioqueue's descriptor + * set for the specified event. 
+ */ +static void ioqueue_add_to_set( pj_ioqueue_t *ioqueue, + pj_sock_t fd, + enum ioqueue_event_type event_type ) +{ + pj_lock_acquire(ioqueue->lock); + + if (event_type == READABLE_EVENT) + PJ_FD_SET((pj_sock_t)fd, &ioqueue->rfdset); + else if (event_type == WRITEABLE_EVENT) + PJ_FD_SET((pj_sock_t)fd, &ioqueue->wfdset); + else if (event_type == EXCEPTION_EVENT) + PJ_FD_SET((pj_sock_t)fd, &ioqueue->xfdset); + else + pj_assert(0); + + pj_lock_release(ioqueue->lock); +} /* * pj_ioqueue_poll() */ PJ_DEF(int) pj_ioqueue_poll( pj_ioqueue_t *ioqueue, const pj_time_val *timeout) { pj_fd_set_t rfdset, wfdset, xfdset; - int count; + int count, counter; pj_ioqueue_key_t *h; + struct event + { + pj_ioqueue_key_t *key; + enum ioqueue_event_type event_type; + } event[PJ_IOQUEUE_MAX_EVENTS_IN_SINGLE_POLL]; PJ_ASSERT_RETURN(ioqueue, PJ_EINVAL); @@ -453,704 +422,72 @@ PJ_DEF(int) pj_ioqueue_poll( pj_ioqueue_t *ioqueue, const pj_time_val *timeout) if (count <= 0) return count; + else if (count > PJ_IOQUEUE_MAX_EVENTS_IN_SINGLE_POLL) + count = PJ_IOQUEUE_MAX_EVENTS_IN_SINGLE_POLL; - /* Lock ioqueue again before scanning for signalled sockets. - * We must strictly use recursive mutex since application may invoke - * the ioqueue again inside the callback. + /* Scan descriptor sets for events and add the events to the event + * array, to be processed later in this function. We do this so that + * events can be processed in parallel without holding the ioqueue lock. */ pj_lock_acquire(ioqueue->lock); + + counter = 0; /* Scan for writable sockets first to handle piggy-back data * coming with accept(). */ h = ioqueue->key_list.next; -do_writable_scan: - for ( ; h!=&ioqueue->key_list; h = h->next) { - if ( (!pj_list_empty(&h->write_list) || h->connecting) + for ( ; h!=&ioqueue->key_list && counter<PJ_IOQUEUE_MAX_EVENTS_IN_SINGLE_POLL; h = h->next) { + if ( (key_has_pending_write(h) || key_has_pending_connect(h)) && PJ_FD_ISSET(h->fd, &wfdset)) { - break; + event[counter].key = h; + event[counter].event_type = WRITEABLE_EVENT; + ++counter; } - } - if (h != &ioqueue->key_list) { - pj_assert(!pj_list_empty(&h->write_list) || h->connecting); - -#if defined(PJ_HAS_TCP) && PJ_HAS_TCP!=0 - if (h->connecting) { - /* Completion of connect() operation */ - pj_ssize_t bytes_transfered; - -#if (defined(PJ_HAS_SO_ERROR) && PJ_HAS_SO_ERROR!=0) - /* from connect(2): - * On Linux, use getsockopt to read the SO_ERROR option at - * level SOL_SOCKET to determine whether connect() completed - * successfully (if SO_ERROR is zero). - */ - int value; - socklen_t vallen = sizeof(value); - int gs_rc = getsockopt(h->fd, SOL_SOCKET, SO_ERROR, - &value, &vallen); - if (gs_rc != 0) { - /* Argh!! What to do now??? - * Just indicate that the socket is connected. The - * application will get error as soon as it tries to use - * the socket to send/receive. - */ - bytes_transfered = 0; - } else { - bytes_transfered = value; - } -#elif defined(PJ_WIN32) && PJ_WIN32!=0 - bytes_transfered = 0; /* success */ -#else - /* Excellent information in D.J. Bernstein page: - * http://cr.yp.to/docs/connect.html - * - * Seems like the most portable way of detecting connect() - * failure is to call getpeername(). If socket is connected, - * getpeername() will return 0. If the socket is not connected, - * it will return ENOTCONN, and read(fd, &ch, 1) will produce - * the right errno through error slippage. This is a combination - * of suggestions from Douglas C. Schmidt and Ken Keys. 
- */ - int gp_rc; - struct sockaddr_in addr; - socklen_t addrlen = sizeof(addr); - - gp_rc = getpeername(h->fd, (struct sockaddr*)&addr, &addrlen); - bytes_transfered = gp_rc; -#endif - - /* Clear operation. */ - h->connecting = 0; - PJ_FD_CLR(h->fd, &ioqueue->wfdset); - PJ_FD_CLR(h->fd, &ioqueue->xfdset); - - /* Call callback. */ - if (h->cb.on_connect_complete) - (*h->cb.on_connect_complete)(h, bytes_transfered); - - /* Re-scan writable sockets. */ - goto do_writable_scan; - - } else -#endif /* PJ_HAS_TCP */ - { - /* Socket is writable. */ - struct write_operation *write_op; - pj_ssize_t sent; - pj_status_t send_rc; - - /* Get the first in the queue. */ - write_op = h->write_list.next; - - /* Send the data. */ - sent = write_op->size - write_op->written; - if (write_op->op == PJ_IOQUEUE_OP_SEND) { - send_rc = pj_sock_send(h->fd, write_op->buf+write_op->written, - &sent, write_op->flags); - } else if (write_op->op == PJ_IOQUEUE_OP_SEND_TO) { - send_rc = pj_sock_sendto(h->fd, - write_op->buf+write_op->written, - &sent, write_op->flags, - &write_op->rmt_addr, - write_op->rmt_addrlen); - } else { - pj_assert(!"Invalid operation type!"); - send_rc = PJ_EBUG; - } - if (send_rc == PJ_SUCCESS) { - write_op->written += sent; - } else { - pj_assert(send_rc > 0); - write_op->written = -send_rc; - } - - /* In any case we don't need to process this descriptor again. */ - PJ_FD_CLR(h->fd, &wfdset); - - /* Are we finished with this buffer? */ - if (send_rc!=PJ_SUCCESS || - write_op->written == (pj_ssize_t)write_op->size) - { - pj_list_erase(write_op); - - /* Clear operation if there's no more data to send. */ - if (pj_list_empty(&h->write_list)) - PJ_FD_CLR(h->fd, &ioqueue->wfdset); - - /* Call callback. */ - if (h->cb.on_write_complete) { - (*h->cb.on_write_complete)(h, - (pj_ioqueue_op_key_t*)write_op, - write_op->written); - } - } - - /* Re-scan writable sockets. */ - goto do_writable_scan; - } - } + /* Scan for readable socket. */ + if ((key_has_pending_read(h) || key_has_pending_accept(h)) + && PJ_FD_ISSET(h->fd, &rfdset)) + { + event[counter].key = h; + event[counter].event_type = READABLE_EVENT; + ++counter; } - /* Scan for readable socket. */ - h = ioqueue->key_list.next; -do_readable_scan: - for ( ; h!=&ioqueue->key_list; h = h->next) { - if ((!pj_list_empty(&h->read_list) #if PJ_HAS_TCP - || !pj_list_empty(&h->accept_list) -#endif - ) && PJ_FD_ISSET(h->fd, &rfdset)) - { - break; - } - } - if (h != &ioqueue->key_list) { - pj_status_t rc; - -#if PJ_HAS_TCP - pj_assert(!pj_list_empty(&h->read_list) || - !pj_list_empty(&h->accept_list)); -#else - pj_assert(!pj_list_empty(&h->read_list)); + if (key_has_pending_connect(h) && PJ_FD_ISSET(h->fd, &xfdset)) { + event[counter].key = h; + event[counter].event_type = EXCEPTION_EVENT; + ++counter; + } #endif - -# if PJ_HAS_TCP - if (!pj_list_empty(&h->accept_list)) { - - struct accept_operation *accept_op; - - /* Get one accept operation from the list. */ - accept_op = h->accept_list.next; - pj_list_erase(accept_op); - - rc=pj_sock_accept(h->fd, accept_op->accept_fd, - accept_op->rmt_addr, accept_op->addrlen); - if (rc==PJ_SUCCESS && accept_op->local_addr) { - rc = pj_sock_getsockname(*accept_op->accept_fd, - accept_op->local_addr, - accept_op->addrlen); - } - - /* Clear bit in fdset if there is no more pending accept */ - if (pj_list_empty(&h->accept_list)) - PJ_FD_CLR(h->fd, &ioqueue->rfdset); - - /* Call callback. 
*/ - if (h->cb.on_accept_complete) - (*h->cb.on_accept_complete)(h, (pj_ioqueue_op_key_t*)accept_op, - *accept_op->accept_fd, rc); - - /* Re-scan readable sockets. */ - goto do_readable_scan; - } - else { -# endif - struct read_operation *read_op; - pj_ssize_t bytes_read; - - pj_assert(!pj_list_empty(&h->read_list)); - - /* Get one pending read operation from the list. */ - read_op = h->read_list.next; - pj_list_erase(read_op); - - bytes_read = read_op->size; - - if ((read_op->op == PJ_IOQUEUE_OP_RECV_FROM)) { - rc = pj_sock_recvfrom(h->fd, read_op->buf, &bytes_read, 0, - read_op->rmt_addr, - read_op->rmt_addrlen); - } else if ((read_op->op == PJ_IOQUEUE_OP_RECV)) { - rc = pj_sock_recv(h->fd, read_op->buf, &bytes_read, 0); - } else { - pj_assert(read_op->op == PJ_IOQUEUE_OP_READ); - /* - * User has specified pj_ioqueue_read(). - * On Win32, we should do ReadFile(). But because we got - * here because of select() anyway, user must have put a - * socket descriptor on h->fd, which in this case we can - * just call pj_sock_recv() instead of ReadFile(). - * On Unix, user may put a file in h->fd, so we'll have - * to call read() here. - * This may not compile on systems which doesn't have - * read(). That's why we only specify PJ_LINUX here so - * that error is easier to catch. - */ -# if defined(PJ_WIN32) && PJ_WIN32 != 0 - rc = pj_sock_recv(h->fd, read_op->buf, &bytes_read, 0); - //rc = ReadFile((HANDLE)h->fd, read_op->buf, read_op->size, - // &bytes_read, NULL); -# elif (defined(PJ_HAS_UNISTD_H) && PJ_HAS_UNISTD_H != 0) - bytes_read = read(h->fd, h->rd_buf, bytes_read); - rc = (bytes_read >= 0) ? PJ_SUCCESS : pj_get_os_error(); -# elif defined(PJ_LINUX_KERNEL) && PJ_LINUX_KERNEL != 0 - bytes_read = sys_read(h->fd, h->rd_buf, bytes_read); - rc = (bytes_read >= 0) ? PJ_SUCCESS : -bytes_read; -# else -# error "Implement read() for this platform!" -# endif - } - - if (rc != PJ_SUCCESS) { -# if defined(PJ_WIN32) && PJ_WIN32 != 0 - /* On Win32, for UDP, WSAECONNRESET on the receive side - * indicates that previous sending has triggered ICMP Port - * Unreachable message. - * But we wouldn't know at this point which one of previous - * key that has triggered the error, since UDP socket can - * be shared! - * So we'll just ignore it! - */ - - if (rc == PJ_STATUS_FROM_OS(WSAECONNRESET)) { - //PJ_LOG(4,(THIS_FILE, - // "Ignored ICMP port unreach. on key=%p", h)); - } -# endif - - /* In any case we would report this to caller. */ - bytes_read = -rc; - } - - /* Clear fdset if there is no pending read. */ - if (pj_list_empty(&h->read_list)) - PJ_FD_CLR(h->fd, &ioqueue->rfdset); - - /* In any case clear from temporary set. */ - PJ_FD_CLR(h->fd, &rfdset); - - /* Call callback. */ - if (h->cb.on_read_complete) - (*h->cb.on_read_complete)(h, (pj_ioqueue_op_key_t*)read_op, - bytes_read); - - /* Re-scan readable sockets. */ - goto do_readable_scan; - - } - } - -#if PJ_HAS_TCP - /* Scan for exception socket for TCP connection error. */ - h = ioqueue->key_list.next; -do_except_scan: - for ( ; h!=&ioqueue->key_list; h = h->next) { - if (h->connecting && PJ_FD_ISSET(h->fd, &xfdset)) - break; - } - if (h != &ioqueue->key_list) { - - pj_assert(h->connecting); - - /* Clear operation. */ - h->connecting = 0; - PJ_FD_CLR(h->fd, &ioqueue->wfdset); - PJ_FD_CLR(h->fd, &ioqueue->xfdset); - PJ_FD_CLR(h->fd, &wfdset); - PJ_FD_CLR(h->fd, &xfdset); - - /* Call callback. */ - if (h->cb.on_connect_complete) - (*h->cb.on_connect_complete)(h, -1); - - /* Re-scan exception list. 
*/ - goto do_except_scan; - } -#endif /* PJ_HAS_TCP */ - - /* Shouldn't happen. */ - /* For strange reason on WinXP select() can return 1 while there is no - * pj_fd_set_t signaled. */ - /* pj_assert(0); */ - - //count = 0; - - pj_lock_release(ioqueue->lock); - return count; -} - -/* - * pj_ioqueue_recv() - * - * Start asynchronous recv() from the socket. - */ -PJ_DEF(pj_status_t) pj_ioqueue_recv( pj_ioqueue_key_t *key, - pj_ioqueue_op_key_t *op_key, - void *buffer, - pj_ssize_t *length, - unsigned flags ) -{ - pj_status_t status; - pj_ssize_t size; - struct read_operation *read_op; - pj_ioqueue_t *ioqueue; - - PJ_ASSERT_RETURN(key && op_key && buffer && length, PJ_EINVAL); - PJ_CHECK_STACK(); - - /* Try to see if there's data immediately available. - */ - size = *length; - status = pj_sock_recv(key->fd, buffer, &size, flags); - if (status == PJ_SUCCESS) { - /* Yes! Data is available! */ - *length = size; - return PJ_SUCCESS; - } else { - /* If error is not EWOULDBLOCK (or EAGAIN on Linux), report - * the error to caller. - */ - if (status != PJ_STATUS_FROM_OS(PJ_BLOCKING_ERROR_VAL)) - return status; - } - - /* - * No data is immediately available. - * Must schedule asynchronous operation to the ioqueue. - */ - ioqueue = key->ioqueue; - pj_lock_acquire(ioqueue->lock); - - read_op = (struct read_operation*)op_key; - - read_op->op = PJ_IOQUEUE_OP_RECV; - read_op->buf = buffer; - read_op->size = *length; - read_op->flags = flags; - - pj_list_insert_before(&key->read_list, read_op); - PJ_FD_SET(key->fd, &ioqueue->rfdset); - - pj_lock_release(ioqueue->lock); - return PJ_EPENDING; -} - -/* - * pj_ioqueue_recvfrom() - * - * Start asynchronous recvfrom() from the socket. - */ -PJ_DEF(pj_status_t) pj_ioqueue_recvfrom( pj_ioqueue_key_t *key, - pj_ioqueue_op_key_t *op_key, - void *buffer, - pj_ssize_t *length, - unsigned flags, - pj_sockaddr_t *addr, - int *addrlen) -{ - pj_status_t status; - pj_ssize_t size; - struct read_operation *read_op; - pj_ioqueue_t *ioqueue; - - PJ_ASSERT_RETURN(key && op_key && buffer && length, PJ_EINVAL); - PJ_CHECK_STACK(); - - /* Try to see if there's data immediately available. - */ - size = *length; - status = pj_sock_recvfrom(key->fd, buffer, &size, flags, - addr, addrlen); - if (status == PJ_SUCCESS) { - /* Yes! Data is available! */ - *length = size; - return PJ_SUCCESS; - } else { - /* If error is not EWOULDBLOCK (or EAGAIN on Linux), report - * the error to caller. - */ - if (status != PJ_STATUS_FROM_OS(PJ_BLOCKING_ERROR_VAL)) - return status; } - /* - * No data is immediately available. - * Must schedule asynchronous operation to the ioqueue. - */ - ioqueue = key->ioqueue; - pj_lock_acquire(ioqueue->lock); - - read_op = (struct read_operation*)op_key; - - read_op->op = PJ_IOQUEUE_OP_RECV_FROM; - read_op->buf = buffer; - read_op->size = *length; - read_op->flags = flags; - read_op->rmt_addr = addr; - read_op->rmt_addrlen = addrlen; - - pj_list_insert_before(&key->read_list, read_op); - PJ_FD_SET(key->fd, &ioqueue->rfdset); - pj_lock_release(ioqueue->lock); - return PJ_EPENDING; -} - -/* - * pj_ioqueue_send() - * - * Start asynchronous send() to the descriptor. 
- */ -PJ_DEF(pj_status_t) pj_ioqueue_send( pj_ioqueue_key_t *key, - pj_ioqueue_op_key_t *op_key, - const void *data, - pj_ssize_t *length, - unsigned flags) -{ - pj_ioqueue_t *ioqueue; - struct write_operation *write_op; - pj_status_t status; - pj_ssize_t sent; - - PJ_ASSERT_RETURN(key && op_key && data && length, PJ_EINVAL); - PJ_CHECK_STACK(); - /* Fast track: - * Try to send data immediately, only if there's no pending write! - * Note: - * We are speculating that the list is empty here without properly - * acquiring ioqueue's mutex first. This is intentional, to maximize - * performance via parallelism. - * - * This should be safe, because: - * - by convention, we require caller to make sure that the - * key is not unregistered while other threads are invoking - * an operation on the same key. - * - pj_list_empty() is safe to be invoked by multiple threads, - * even when other threads are modifying the list. - */ - if (pj_list_empty(&key->write_list)) { - /* - * See if data can be sent immediately. - */ - sent = *length; - status = pj_sock_send(key->fd, data, &sent, flags); - if (status == PJ_SUCCESS) { - /* Success! */ - *length = sent; - return PJ_SUCCESS; - } else { - /* If error is not EWOULDBLOCK (or EAGAIN on Linux), report - * the error to caller. - */ - if (status != PJ_STATUS_FROM_OS(PJ_BLOCKING_ERROR_VAL)) { - return status; - } - } - } + count = counter; - /* - * Schedule asynchronous send. + /* Now process all events. The dispatch functions will take care + * of locking in each of the key */ - ioqueue = key->ioqueue; - pj_lock_acquire(ioqueue->lock); - - write_op = (struct write_operation*)op_key; - write_op->op = PJ_IOQUEUE_OP_SEND; - write_op->buf = NULL; - write_op->size = *length; - write_op->written = 0; - write_op->flags = flags; - - pj_list_insert_before(&key->write_list, write_op); - PJ_FD_SET(key->fd, &ioqueue->wfdset); - - pj_lock_release(ioqueue->lock); - - return PJ_EPENDING; -} - - -/* - * pj_ioqueue_sendto() - * - * Start asynchronous write() to the descriptor. - */ -PJ_DEF(pj_status_t) pj_ioqueue_sendto( pj_ioqueue_key_t *key, - pj_ioqueue_op_key_t *op_key, - const void *data, - pj_ssize_t *length, - unsigned flags, - const pj_sockaddr_t *addr, - int addrlen) -{ - pj_ioqueue_t *ioqueue; - struct write_operation *write_op; - pj_status_t status; - pj_ssize_t sent; - - PJ_ASSERT_RETURN(key && op_key && data && length, PJ_EINVAL); - PJ_CHECK_STACK(); - - /* Fast track: - * Try to send data immediately, only if there's no pending write! - * Note: - * We are speculating that the list is empty here without properly - * acquiring ioqueue's mutex first. This is intentional, to maximize - * performance via parallelism. - * - * This should be safe, because: - * - by convention, we require caller to make sure that the - * key is not unregistered while other threads are invoking - * an operation on the same key. - * - pj_list_empty() is safe to be invoked by multiple threads, - * even when other threads are modifying the list. - */ - if (pj_list_empty(&key->write_list)) { - /* - * See if data can be sent immediately. - */ - sent = *length; - status = pj_sock_sendto(key->fd, data, &sent, flags, addr, addrlen); - if (status == PJ_SUCCESS) { - /* Success! */ - *length = sent; - return PJ_SUCCESS; - } else { - /* If error is not EWOULDBLOCK (or EAGAIN on Linux), report - * the error to caller. 
- */ - if (status != PJ_STATUS_FROM_OS(PJ_BLOCKING_ERROR_VAL)) { - return status; - } - } - } + count = counter; - /* - * Schedule asynchronous send. + /* Now process all events. The dispatch functions will take care + * of locking for each of the keys. */ + for (counter=0; counter<count; ++counter) { + switch (event[counter].event_type) { + case READABLE_EVENT: + ioqueue_dispatch_read_event(ioqueue, event[counter].key); + break; + case WRITEABLE_EVENT: + ioqueue_dispatch_write_event(ioqueue, event[counter].key); + break; + case EXCEPTION_EVENT: + ioqueue_dispatch_exception_event(ioqueue, event[counter].key); + break; + case NO_EVENT: + pj_assert(!"Invalid event!"); + break; + } + } - ioqueue = key->ioqueue; - pj_lock_acquire(ioqueue->lock); - - write_op = (struct write_operation*)op_key; - write_op->op = PJ_IOQUEUE_OP_SEND_TO; - write_op->buf = NULL; - write_op->size = *length; - write_op->written = 0; - write_op->flags = flags; - pj_memcpy(&write_op->rmt_addr, addr, addrlen); - write_op->rmt_addrlen = addrlen; - - pj_list_insert_before(&key->write_list, write_op); - PJ_FD_SET(key->fd, &ioqueue->wfdset); - - pj_lock_release(ioqueue->lock); - - return PJ_EPENDING; -} -#if PJ_HAS_TCP -/* - * Initiate overlapped accept() operation. - */ -PJ_DEF(pj_status_t) pj_ioqueue_accept( pj_ioqueue_key_t *key, - pj_ioqueue_op_key_t *op_key, - pj_sock_t *new_sock, - pj_sockaddr_t *local, - pj_sockaddr_t *remote, - int *addrlen) -{ - pj_ioqueue_t *ioqueue; - struct accept_operation *accept_op; - pj_status_t status; - - /* check parameters. All must be specified! */ - PJ_ASSERT_RETURN(key && op_key && new_sock, PJ_EINVAL); - - /* Fast track: - * See if there's new connection available immediately. - */ - if (pj_list_empty(&key->accept_list)) { - status = pj_sock_accept(key->fd, new_sock, remote, addrlen); - if (status == PJ_SUCCESS) { - /* Yes! New connection is available! */ - if (local && addrlen) { - status = pj_sock_getsockname(*new_sock, local, addrlen); - if (status != PJ_SUCCESS) { - pj_sock_close(*new_sock); - *new_sock = PJ_INVALID_SOCKET; - return status; - } - } - return PJ_SUCCESS; - } else { - /* If error is not EWOULDBLOCK (or EAGAIN on Linux), report - * the error to caller. - */ - if (status != PJ_STATUS_FROM_OS(PJ_BLOCKING_ERROR_VAL)) { - return status; - } - } - } - - /* - * No connection is available immediately. - * Schedule accept() operation to be completed when there is incoming - * connection available. - */ - ioqueue = key->ioqueue; - accept_op = (struct accept_operation*)op_key; - - pj_lock_acquire(ioqueue->lock); - - accept_op->op = PJ_IOQUEUE_OP_ACCEPT; - accept_op->accept_fd = new_sock; - accept_op->rmt_addr = remote; - accept_op->addrlen= addrlen; - accept_op->local_addr = local; - - pj_list_insert_before(&key->accept_list, accept_op); - PJ_FD_SET(key->fd, &ioqueue->rfdset); - - pj_lock_release(ioqueue->lock); - - return PJ_EPENDING; -} - -/* - * Initiate overlapped connect() operation (well, it's non-blocking actually, - * since there's no overlapped version of connect()). - */ -PJ_DEF(pj_status_t) pj_ioqueue_connect( pj_ioqueue_key_t *key, - const pj_sockaddr_t *addr, - int addrlen ) -{ - pj_ioqueue_t *ioqueue; - pj_status_t status; - - /* check parameters. All must be specified! */ - PJ_ASSERT_RETURN(key && addr && addrlen, PJ_EINVAL); - - /* Check if socket has not been marked for connecting */ - if (key->connecting != 0) - return PJ_EPENDING; - - status = pj_sock_connect(key->fd, addr, addrlen); - if (status == PJ_SUCCESS) { - /* Connected! */ - return PJ_SUCCESS; - } else { - if (status == PJ_STATUS_FROM_OS(PJ_BLOCKING_CONNECT_ERROR_VAL)) { - /* Pending! */ - ioqueue = key->ioqueue; - pj_lock_acquire(ioqueue->lock); - key->connecting = PJ_TRUE; - PJ_FD_SET(key->fd, &ioqueue->wfdset); - PJ_FD_SET(key->fd, &ioqueue->xfdset); - pj_lock_release(ioqueue->lock); - return PJ_EPENDING; - } else { - /* Error! 
*/ - return status; - } - } + return count; } -#endif /* PJ_HAS_TCP */ diff --git a/pjlib/src/pj/sock_bsd.c b/pjlib/src/pj/sock_bsd.c index f9439d17..92495df2 100644 --- a/pjlib/src/pj/sock_bsd.c +++ b/pjlib/src/pj/sock_bsd.c @@ -59,7 +59,12 @@ const pj_uint16_t PJ_SOL_IPV6 = SOL_IPV6; #else const pj_uint16_t PJ_SOL_IPV6 = 0xFFFF; #endif - + +/* optname values. */ +const pj_uint16_t PJ_SO_TYPE = SO_TYPE; +const pj_uint16_t PJ_SO_RCVBUF = SO_RCVBUF; +const pj_uint16_t PJ_SO_SNDBUF = SO_SNDBUF; + /* * Convert 16-bit value from network byte order to host byte order. @@ -464,8 +469,8 @@ PJ_DEF(pj_status_t) pj_sock_recvfrom(pj_sock_t sock, * Get socket option. */ PJ_DEF(pj_status_t) pj_sock_getsockopt( pj_sock_t sock, - int level, - int optname, + pj_uint16_t level, + pj_uint16_t optname, void *optval, int *optlen) { @@ -482,8 +487,8 @@ PJ_DEF(pj_status_t) pj_sock_getsockopt( pj_sock_t sock, * Set socket option. */ PJ_DEF(pj_status_t) pj_sock_setsockopt( pj_sock_t sock, - int level, - int optname, + pj_uint16_t level, + pj_uint16_t optname, const void *optval, int optlen) { diff --git a/pjlib/src/pj/sock_linux_kernel.c b/pjlib/src/pj/sock_linux_kernel.c index 4748af92..d7924f91 100644 --- a/pjlib/src/pj/sock_linux_kernel.c +++ b/pjlib/src/pj/sock_linux_kernel.c @@ -71,6 +71,11 @@ const pj_uint16_t PJ_SOL_IPV6 = SOL_IPV6; #else # error "SOL_IPV6 undeclared!" #endif + +/* optname values. */ +const pj_uint16_t PJ_SO_TYPE = SO_TYPE; +const pj_uint16_t PJ_SO_RCVBUF = SO_RCVBUF; +const pj_uint16_t PJ_SO_SNDBUF = SO_SNDBUF; /* * Convert 16-bit value from network byte order to host byte order. @@ -553,8 +558,8 @@ PJ_DEF(pj_status_t) pj_sock_recvfrom( pj_sock_t sockfd, * Get socket option. */ PJ_DEF(pj_status_t) pj_sock_getsockopt( pj_sock_t sockfd, - int level, - int optname, + pj_uint16_t level, + pj_uint16_t optname, void *optval, int *optlen) { @@ -580,8 +585,8 @@ PJ_DEF(pj_status_t) pj_sock_getsockopt( pj_sock_t sockfd, * Set socket option. */ PJ_DEF(pj_status_t) pj_sock_setsockopt( pj_sock_t sockfd, - int level, - int optname, + pj_uint16_t level, + pj_uint16_t optname, const void *optval, int optlen) { diff --git a/pjlib/src/pjlib-test/ioq_perf.c b/pjlib/src/pjlib-test/ioq_perf.c index 4cd11068..3305fb60 100644 --- a/pjlib/src/pjlib-test/ioq_perf.c +++ b/pjlib/src/pjlib-test/ioq_perf.c @@ -71,7 +71,7 @@ static void on_read_complete(pj_ioqueue_key_t *key, char errmsg[128]; if (rc != last_error) { - last_error = rc; + //last_error = rc; pj_strerror(rc, errmsg, sizeof(errmsg)); PJ_LOG(3,(THIS_FILE, "...error: read error, bytes_read=%d (%s)", bytes_read, errmsg)); diff --git a/pjlib/src/pjlib-test/ioq_tcp.c b/pjlib/src/pjlib-test/ioq_tcp.c index fd5329e5..14813c79 100644 --- a/pjlib/src/pjlib-test/ioq_tcp.c +++ b/pjlib/src/pjlib-test/ioq_tcp.c @@ -179,10 +179,6 @@ static int send_recv_test(pj_ioqueue_t *ioque, pj_get_timestamp(&t2); t_elapsed->u32.lo += (t2.u32.lo - t1.u32.lo); - if (status < 0) { - return -176; - } - // Compare recv buffer with send buffer. if (pj_memcmp(send_buf, recv_buf, bufsize) != 0) { return -180; -- cgit v1.2.3
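For reference, the proactor pattern that the refactored select() backend (and a future epoll backend) exposes looks like this from the application side. This is a minimal usage sketch consistent with the API in this patch, not code from the commit; pool and socket setup, error handling, and the buffer size are illustrative assumptions:

    /* Minimal usage sketch: one thread drives the ioqueue; completion
     * is reported through the registered callback.
     */
    #include <pj/ioqueue.h>
    #include <pj/sock.h>
    #include <pj/string.h>

    static void on_read_complete(pj_ioqueue_key_t *key,
                                 pj_ioqueue_op_key_t *op_key,
                                 pj_ssize_t bytes_read)
    {
        /* Consume the data, then typically re-issue pj_ioqueue_recv(). */
    }

    static int io_thread(pj_pool_t *pool, pj_sock_t sock)
    {
        pj_ioqueue_t *ioqueue;
        pj_ioqueue_key_t *key;
        pj_ioqueue_op_key_t op_key;
        pj_ioqueue_callback cb;
        char buf[512];
        pj_ssize_t len;

        pj_memset(&cb, 0, sizeof(cb));
        cb.on_read_complete = &on_read_complete;

        pj_ioqueue_create(pool, 16, &ioqueue);
        pj_ioqueue_register_sock(pool, ioqueue, sock, NULL, &cb, &key);

        /* Returns PJ_EPENDING when the operation is queued; completion
         * then arrives via on_read_complete() from pj_ioqueue_poll().
         */
        len = sizeof(buf);
        pj_ioqueue_recv(key, &op_key, buf, &len, 0);

        for (;;) {
            pj_time_val timeout;
            timeout.sec = 0;
            timeout.msec = 500;
            pj_ioqueue_poll(ioqueue, &timeout);
        }
    }

Note how the fast-track paths added in ioqueue_common_abs.c keep this loop cheap: when data or a connection is immediately available, the operation completes synchronously and the event array machinery in pj_ioqueue_poll() is never involved.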