From 97611d9f0a7809a759a0a0603f6d45f5822ad170 Mon Sep 17 00:00:00 2001
From: Benny Prijono
Date: Sun, 6 Nov 2005 13:32:11 +0000
Subject: Put common ioqueue functionalities in separate file to be used by both select() and epoll

git-svn-id: http://svn.pjsip.org/repos/pjproject/main@12 74dad513-b988-da41-8d7b-12977e46ad98
---
 pjlib/src/pj/ioqueue_select.c | 933 ++++++------------------------------------
 1 file changed, 135 insertions(+), 798 deletions(-)

diff --git a/pjlib/src/pj/ioqueue_select.c b/pjlib/src/pj/ioqueue_select.c
index 367ffb5e..24e68564 100644
--- a/pjlib/src/pj/ioqueue_select.c
+++ b/pjlib/src/pj/ioqueue_select.c
@@ -20,6 +20,11 @@
 #include
 #include
 #include
+
+/*
+ * Include declaration from common abstraction.
+ */
+#include "ioqueue_common_abs.h"
 
 /*
  * ISSUES with ioqueue_select()
  */
@@ -58,8 +63,6 @@ PJ_DECL(pj_size_t) PJ_FD_COUNT(const pj_fd_set_t *fdsetp);
 
-
-
 /*
  * During debugging build, VALIDATE_FD_SET is set.
  * This will check the validity of the fd_sets.
@@ -70,76 +73,12 @@ PJ_DECL(pj_size_t) PJ_FD_COUNT(const pj_fd_set_t *fdsetp);
 # define VALIDATE_FD_SET 0
 #endif
 
-struct generic_operation
-{
-    PJ_DECL_LIST_MEMBER(struct generic_operation);
-    pj_ioqueue_operation_e op;
-};
-
-struct read_operation
-{
-    PJ_DECL_LIST_MEMBER(struct read_operation);
-    pj_ioqueue_operation_e op;
-
-    void *buf;
-    pj_size_t size;
-    unsigned flags;
-    pj_sockaddr_t *rmt_addr;
-    int *rmt_addrlen;
-};
-
-struct write_operation
-{
-    PJ_DECL_LIST_MEMBER(struct write_operation);
-    pj_ioqueue_operation_e op;
-
-    char *buf;
-    pj_size_t size;
-    pj_ssize_t written;
-    unsigned flags;
-    pj_sockaddr_in rmt_addr;
-    int rmt_addrlen;
-};
-
-#if PJ_HAS_TCP
-struct accept_operation
-{
-    PJ_DECL_LIST_MEMBER(struct accept_operation);
-    pj_ioqueue_operation_e op;
-
-    pj_sock_t *accept_fd;
-    pj_sockaddr_t *local_addr;
-    pj_sockaddr_t *rmt_addr;
-    int *addrlen;
-};
-#endif
-
-union operation_key
-{
-    struct generic_operation generic;
-    struct read_operation read;
-    struct write_operation write;
-#if PJ_HAS_TCP
-    struct accept_operation accept;
-#endif
-};
-
 /*
  * This describes each key.
  */
 struct pj_ioqueue_key_t
-{
-    PJ_DECL_LIST_MEMBER(struct pj_ioqueue_key_t);
-    pj_ioqueue_t *ioqueue;
-    pj_sock_t fd;
-    void *user_data;
-    pj_ioqueue_callback cb;
-    int connecting;
-    struct read_operation read_list;
-    struct write_operation write_list;
-#if PJ_HAS_TCP
-    struct accept_operation accept_list;
-#endif
+{
+    DECLARE_COMMON_KEY
 };
 
 /*
@@ -147,8 +86,8 @@ struct pj_ioqueue_key_t
  */
 struct pj_ioqueue_t
 {
-    pj_lock_t *lock;
-    pj_bool_t auto_delete_lock;
+    DECLARE_COMMON_IOQUEUE
+
     unsigned max, count;
     pj_ioqueue_key_t key_list;
     pj_fd_set_t rfdset;
@@ -157,6 +96,11 @@ struct pj_ioqueue_t
     pj_fd_set_t xfdset;
 #endif
 };
+
+/* Include implementation for common abstraction after we declare
+ * pj_ioqueue_key_t and pj_ioqueue_t.
+ */
+#include "ioqueue_common_abs.c"
 
 /*
  * pj_ioqueue_create()
  */
@@ -167,7 +111,8 @@ PJ_DEF(pj_status_t) pj_ioqueue_create( pj_pool_t *pool,
                                        pj_size_t max_fd,
                                        pj_ioqueue_t **p_ioqueue)
 {
-    pj_ioqueue_t *ioqueue;
+    pj_ioqueue_t *ioqueue;
+    pj_lock_t *lock;
     pj_status_t rc;
 
     /* Check that arguments are valid. */
@@ -179,7 +124,10 @@ PJ_DEF(pj_status_t) pj_ioqueue_create( pj_pool_t *pool,
     PJ_ASSERT_RETURN(sizeof(pj_ioqueue_op_key_t)-sizeof(void*) >=
                      sizeof(union operation_key), PJ_EBUG);
 
-    ioqueue = pj_pool_alloc(pool, sizeof(pj_ioqueue_t));
+    ioqueue = pj_pool_alloc(pool, sizeof(pj_ioqueue_t));
+
+    ioqueue_init(ioqueue);
+
     ioqueue->max = max_fd;
     ioqueue->count = 0;
     PJ_FD_ZERO(&ioqueue->rfdset);
@@ -189,11 +137,13 @@ PJ_DEF(pj_status_t) pj_ioqueue_create( pj_pool_t *pool,
 #endif
     pj_list_init(&ioqueue->key_list);
 
-    rc = pj_lock_create_recursive_mutex(pool, "ioq%p", &ioqueue->lock);
+    rc = pj_lock_create_simple_mutex(pool, "ioq%p", &lock);
     if (rc != PJ_SUCCESS)
         return rc;
 
-    ioqueue->auto_delete_lock = PJ_TRUE;
+    rc = pj_ioqueue_set_lock(ioqueue, lock, PJ_TRUE);
+    if (rc != PJ_SUCCESS)
+        return rc;
 
     PJ_LOG(4, ("pjlib", "select() I/O Queue created (%p)", ioqueue));
 
@@ -208,16 +158,10 @@ PJ_DEF(pj_status_t) pj_ioqueue_create( pj_pool_t *pool,
  */
 PJ_DEF(pj_status_t) pj_ioqueue_destroy(pj_ioqueue_t *ioqueue)
 {
-    pj_status_t rc = PJ_SUCCESS;
-
     PJ_ASSERT_RETURN(ioqueue, PJ_EINVAL);
 
     pj_lock_acquire(ioqueue->lock);
-
-    if (ioqueue->auto_delete_lock)
-        rc = pj_lock_destroy(ioqueue->lock);
-
-    return rc;
+    return ioqueue_destroy(ioqueue);
 }
 
 
@@ -260,17 +204,9 @@ PJ_DEF(pj_status_t) pj_ioqueue_register_sock( pj_pool_t *pool,
 
     /* Create key. */
     key = (pj_ioqueue_key_t*)pj_pool_zalloc(pool, sizeof(pj_ioqueue_key_t));
-    key->ioqueue = ioqueue;
-    key->fd = sock;
-    key->user_data = user_data;
-    pj_list_init(&key->read_list);
-    pj_list_init(&key->write_list);
-#if PJ_HAS_TCP
-    pj_list_init(&key->accept_list);
-#endif
-
-    /* Save callback. */
-    pj_memcpy(&key->cb, cb, sizeof(pj_ioqueue_callback));
+    rc = ioqueue_init_key(pool, ioqueue, key, sock, user_data, cb);
+    if (rc != PJ_SUCCESS)
+        return rc;
 
     /* Register */
     pj_list_insert_before(&ioqueue->key_list, key);
@@ -296,7 +232,7 @@ PJ_DEF(pj_status_t) pj_ioqueue_unregister( pj_ioqueue_key_t *key)
     PJ_ASSERT_RETURN(key, PJ_EINVAL);
 
     ioqueue = key->ioqueue;
-    
+
     pj_lock_acquire(ioqueue->lock);
 
     pj_assert(ioqueue->count > 0);
@@ -308,39 +244,20 @@ PJ_DEF(pj_status_t) pj_ioqueue_unregister( pj_ioqueue_key_t *key)
     PJ_FD_CLR(key->fd, &ioqueue->xfdset);
 #endif
 
-    pj_lock_release(ioqueue->lock);
-    return PJ_SUCCESS;
-}
+    /* ioqueue_destroy may try to acquire key's mutex.
+     * Since normally the order of locking is to lock key's mutex first
+     * then ioqueue's mutex, ioqueue_destroy may deadlock unless we
+     * release ioqueue's mutex first.
+     */
+    pj_lock_release(ioqueue->lock);
+
+    /* Destroy the key. */
+    ioqueue_destroy_key(key);
 
-/*
- * pj_ioqueue_get_user_data()
- *
- * Obtain value associated with a key.
- */
-PJ_DEF(void*) pj_ioqueue_get_user_data( pj_ioqueue_key_t *key )
-{
-    PJ_ASSERT_RETURN(key != NULL, NULL);
-    return key->user_data;
+    return PJ_SUCCESS;
 }
 
-/*
- * pj_ioqueue_set_user_data()
- */
-PJ_DEF(pj_status_t) pj_ioqueue_set_user_data( pj_ioqueue_key_t *key,
-                                              void *user_data,
-                                              void **old_data)
-{
-    PJ_ASSERT_RETURN(key, PJ_EINVAL);
-
-    if (old_data)
-        *old_data = key->user_data;
-    key->user_data = user_data;
-
-    return PJ_SUCCESS;
-}
-
-
 /* This is supposed to check whether the fd_set values are consistent
  * with the operation currently set in each key.
  */
@@ -390,7 +307,54 @@ static void validate_sets(const pj_ioqueue_t *ioqueue,
         }
     }
 }
 #endif /* VALIDATE_FD_SET */
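
The deadlock note added to pj_ioqueue_unregister() above encodes the locking protocol that the common abstraction relies on: a key's own mutex is normally taken before the ioqueue's mutex, so the reverse order must never be held. A hypothetical sketch of that rule, separate from the patch itself (the helper name is illustrative):

    /* Sketch only: release the ioqueue mutex BEFORE destroying the key,
     * because ioqueue_destroy_key() may take the key's mutex, and the
     * normal lock order is key mutex first, then ioqueue mutex.
     */
    static pj_status_t unregister_ordering(pj_ioqueue_t *ioqueue,
                                           pj_ioqueue_key_t *key)
    {
        pj_lock_acquire(ioqueue->lock);
        /* ...erase key from key_list, clear its fd from the fd_sets... */
        pj_lock_release(ioqueue->lock);   /* drop ioqueue lock first  */
        ioqueue_destroy_key(key);         /* may lock the key's mutex */
        return PJ_SUCCESS;
    }
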
+
+/* ioqueue_remove_from_set()
+ * This function is called from ioqueue_dispatch_event() to instruct
+ * the ioqueue to remove the specified descriptor from ioqueue's descriptor
+ * set for the specified event.
+ */
+static void ioqueue_remove_from_set( pj_ioqueue_t *ioqueue,
+                                     pj_sock_t fd,
+                                     enum ioqueue_event_type event_type)
+{
+    pj_lock_acquire(ioqueue->lock);
+
+    if (event_type == READABLE_EVENT)
+        PJ_FD_CLR((pj_sock_t)fd, &ioqueue->rfdset);
+    else if (event_type == WRITEABLE_EVENT)
+        PJ_FD_CLR((pj_sock_t)fd, &ioqueue->wfdset);
+    else if (event_type == EXCEPTION_EVENT)
+        PJ_FD_CLR((pj_sock_t)fd, &ioqueue->xfdset);
+    else
+        pj_assert(0);
+
+    pj_lock_release(ioqueue->lock);
+}
+
+/*
+ * ioqueue_add_to_set()
+ * This function is called from pj_ioqueue_recv(), pj_ioqueue_send() etc
+ * to instruct the ioqueue to add the specified handle to ioqueue's descriptor
+ * set for the specified event.
+ */
+static void ioqueue_add_to_set( pj_ioqueue_t *ioqueue,
+                                pj_sock_t fd,
+                                enum ioqueue_event_type event_type )
+{
+    pj_lock_acquire(ioqueue->lock);
+
+    if (event_type == READABLE_EVENT)
+        PJ_FD_SET((pj_sock_t)fd, &ioqueue->rfdset);
+    else if (event_type == WRITEABLE_EVENT)
+        PJ_FD_SET((pj_sock_t)fd, &ioqueue->wfdset);
+    else if (event_type == EXCEPTION_EVENT)
+        PJ_FD_SET((pj_sock_t)fd, &ioqueue->xfdset);
+    else
+        pj_assert(0);
+
+    pj_lock_release(ioqueue->lock);
+}
 
 /*
  * pj_ioqueue_poll()
  *
@@ -412,8 +376,13 @@ PJ_DEF(int) pj_ioqueue_poll( pj_ioqueue_t *ioqueue, const pj_time_val *timeout)
 {
     pj_fd_set_t rfdset, wfdset, xfdset;
-    int count;
+    int count, counter;
     pj_ioqueue_key_t *h;
+    struct event
+    {
+        pj_ioqueue_key_t *key;
+        enum event_type event_type;
+    } event[PJ_IOQUEUE_MAX_EVENTS_IN_SINGLE_POLL];
 
     PJ_ASSERT_RETURN(ioqueue, PJ_EINVAL);
 
@@ -453,704 +422,72 @@ PJ_DEF(int) pj_ioqueue_poll( pj_ioqueue_t *ioqueue, const pj_time_val *timeout)
     if (count <= 0)
         return count;
+    else if (count > PJ_IOQUEUE_MAX_EVENTS_IN_SINGLE_POLL)
+        count = PJ_IOQUEUE_MAX_EVENTS_IN_SINGLE_POLL;
 
-    /* Lock ioqueue again before scanning for signalled sockets.
-     * We must strictly use recursive mutex since application may invoke
-     * the ioqueue again inside the callback.
+    /* Scan descriptor sets for event and add the events in the event
+     * array to be processed later in this function. We do this so that
+     * events can be processed in parallel without holding ioqueue lock.
      */
     pj_lock_acquire(ioqueue->lock);
+
+    counter = 0;
 
     /* Scan for writable sockets first to handle piggy-back data
      * coming with accept().
     */
     h = ioqueue->key_list.next;
-do_writable_scan:
-    for ( ; h!=&ioqueue->key_list; h = h->next) {
-        if ( (!pj_list_empty(&h->write_list) || h->connecting)
+    for ( ; h!=&ioqueue->key_list && counter<count; h = h->next) {
+        if ( (key_has_pending_write(h) || key_has_pending_connect(h))
              && PJ_FD_ISSET(h->fd, &wfdset))
         {
-            break;
+            event[counter].key = h;
+            event[counter].event_type = WRITEABLE_EVENT;
+            ++counter;
        }
-    }
-    if (h != &ioqueue->key_list) {
-        pj_assert(!pj_list_empty(&h->write_list) || h->connecting);
-
-#if defined(PJ_HAS_TCP) && PJ_HAS_TCP!=0
-        if (h->connecting) {
-            /* Completion of connect() operation */
-            pj_ssize_t bytes_transfered;
-
-#if (defined(PJ_HAS_SO_ERROR) && PJ_HAS_SO_ERROR!=0)
-            /* from connect(2):
-             * On Linux, use getsockopt to read the SO_ERROR option at
-             * level SOL_SOCKET to determine whether connect() completed
-             * successfully (if SO_ERROR is zero).
-             */
-            int value;
-            socklen_t vallen = sizeof(value);
-            int gs_rc = getsockopt(h->fd, SOL_SOCKET, SO_ERROR,
-                                   &value, &vallen);
-            if (gs_rc != 0) {
-                /* Argh!! What to do now???
-                 * Just indicate that the socket is connected.
The - * application will get error as soon as it tries to use - * the socket to send/receive. - */ - bytes_transfered = 0; - } else { - bytes_transfered = value; - } -#elif defined(PJ_WIN32) && PJ_WIN32!=0 - bytes_transfered = 0; /* success */ -#else - /* Excellent information in D.J. Bernstein page: - * http://cr.yp.to/docs/connect.html - * - * Seems like the most portable way of detecting connect() - * failure is to call getpeername(). If socket is connected, - * getpeername() will return 0. If the socket is not connected, - * it will return ENOTCONN, and read(fd, &ch, 1) will produce - * the right errno through error slippage. This is a combination - * of suggestions from Douglas C. Schmidt and Ken Keys. - */ - int gp_rc; - struct sockaddr_in addr; - socklen_t addrlen = sizeof(addr); - - gp_rc = getpeername(h->fd, (struct sockaddr*)&addr, &addrlen); - bytes_transfered = gp_rc; -#endif - - /* Clear operation. */ - h->connecting = 0; - PJ_FD_CLR(h->fd, &ioqueue->wfdset); - PJ_FD_CLR(h->fd, &ioqueue->xfdset); - - /* Call callback. */ - if (h->cb.on_connect_complete) - (*h->cb.on_connect_complete)(h, bytes_transfered); - - /* Re-scan writable sockets. */ - goto do_writable_scan; - - } else -#endif /* PJ_HAS_TCP */ - { - /* Socket is writable. */ - struct write_operation *write_op; - pj_ssize_t sent; - pj_status_t send_rc; - - /* Get the first in the queue. */ - write_op = h->write_list.next; - - /* Send the data. */ - sent = write_op->size - write_op->written; - if (write_op->op == PJ_IOQUEUE_OP_SEND) { - send_rc = pj_sock_send(h->fd, write_op->buf+write_op->written, - &sent, write_op->flags); - } else if (write_op->op == PJ_IOQUEUE_OP_SEND_TO) { - send_rc = pj_sock_sendto(h->fd, - write_op->buf+write_op->written, - &sent, write_op->flags, - &write_op->rmt_addr, - write_op->rmt_addrlen); - } else { - pj_assert(!"Invalid operation type!"); - send_rc = PJ_EBUG; - } - if (send_rc == PJ_SUCCESS) { - write_op->written += sent; - } else { - pj_assert(send_rc > 0); - write_op->written = -send_rc; - } - - /* In any case we don't need to process this descriptor again. */ - PJ_FD_CLR(h->fd, &wfdset); - - /* Are we finished with this buffer? */ - if (send_rc!=PJ_SUCCESS || - write_op->written == (pj_ssize_t)write_op->size) - { - pj_list_erase(write_op); - - /* Clear operation if there's no more data to send. */ - if (pj_list_empty(&h->write_list)) - PJ_FD_CLR(h->fd, &ioqueue->wfdset); - - /* Call callback. */ - if (h->cb.on_write_complete) { - (*h->cb.on_write_complete)(h, - (pj_ioqueue_op_key_t*)write_op, - write_op->written); - } - } - - /* Re-scan writable sockets. */ - goto do_writable_scan; - } - } + /* Scan for readable socket. */ + if ((key_has_pending_read(h) || key_has_pending_accept(h)) + && PJ_FD_ISSET(h->fd, &rfdset)) + { + event[counter].key = h; + event[counter].event_type = READABLE_EVENT; + ++counter; } - /* Scan for readable socket. 
*/ - h = ioqueue->key_list.next; -do_readable_scan: - for ( ; h!=&ioqueue->key_list; h = h->next) { - if ((!pj_list_empty(&h->read_list) #if PJ_HAS_TCP - || !pj_list_empty(&h->accept_list) -#endif - ) && PJ_FD_ISSET(h->fd, &rfdset)) - { - break; - } - } - if (h != &ioqueue->key_list) { - pj_status_t rc; - -#if PJ_HAS_TCP - pj_assert(!pj_list_empty(&h->read_list) || - !pj_list_empty(&h->accept_list)); -#else - pj_assert(!pj_list_empty(&h->read_list)); + if (key_has_pending_connect(h) && PJ_FD_ISSET(h->fd, &xfdset)) { + event[counter].key = h; + event[counter].event_type = EXCEPTION_EVENT; + ++counter; + } #endif - -# if PJ_HAS_TCP - if (!pj_list_empty(&h->accept_list)) { - - struct accept_operation *accept_op; - - /* Get one accept operation from the list. */ - accept_op = h->accept_list.next; - pj_list_erase(accept_op); - - rc=pj_sock_accept(h->fd, accept_op->accept_fd, - accept_op->rmt_addr, accept_op->addrlen); - if (rc==PJ_SUCCESS && accept_op->local_addr) { - rc = pj_sock_getsockname(*accept_op->accept_fd, - accept_op->local_addr, - accept_op->addrlen); - } - - /* Clear bit in fdset if there is no more pending accept */ - if (pj_list_empty(&h->accept_list)) - PJ_FD_CLR(h->fd, &ioqueue->rfdset); - - /* Call callback. */ - if (h->cb.on_accept_complete) - (*h->cb.on_accept_complete)(h, (pj_ioqueue_op_key_t*)accept_op, - *accept_op->accept_fd, rc); - - /* Re-scan readable sockets. */ - goto do_readable_scan; - } - else { -# endif - struct read_operation *read_op; - pj_ssize_t bytes_read; - - pj_assert(!pj_list_empty(&h->read_list)); - - /* Get one pending read operation from the list. */ - read_op = h->read_list.next; - pj_list_erase(read_op); - - bytes_read = read_op->size; - - if ((read_op->op == PJ_IOQUEUE_OP_RECV_FROM)) { - rc = pj_sock_recvfrom(h->fd, read_op->buf, &bytes_read, 0, - read_op->rmt_addr, - read_op->rmt_addrlen); - } else if ((read_op->op == PJ_IOQUEUE_OP_RECV)) { - rc = pj_sock_recv(h->fd, read_op->buf, &bytes_read, 0); - } else { - pj_assert(read_op->op == PJ_IOQUEUE_OP_READ); - /* - * User has specified pj_ioqueue_read(). - * On Win32, we should do ReadFile(). But because we got - * here because of select() anyway, user must have put a - * socket descriptor on h->fd, which in this case we can - * just call pj_sock_recv() instead of ReadFile(). - * On Unix, user may put a file in h->fd, so we'll have - * to call read() here. - * This may not compile on systems which doesn't have - * read(). That's why we only specify PJ_LINUX here so - * that error is easier to catch. - */ -# if defined(PJ_WIN32) && PJ_WIN32 != 0 - rc = pj_sock_recv(h->fd, read_op->buf, &bytes_read, 0); - //rc = ReadFile((HANDLE)h->fd, read_op->buf, read_op->size, - // &bytes_read, NULL); -# elif (defined(PJ_HAS_UNISTD_H) && PJ_HAS_UNISTD_H != 0) - bytes_read = read(h->fd, h->rd_buf, bytes_read); - rc = (bytes_read >= 0) ? PJ_SUCCESS : pj_get_os_error(); -# elif defined(PJ_LINUX_KERNEL) && PJ_LINUX_KERNEL != 0 - bytes_read = sys_read(h->fd, h->rd_buf, bytes_read); - rc = (bytes_read >= 0) ? PJ_SUCCESS : -bytes_read; -# else -# error "Implement read() for this platform!" -# endif - } - - if (rc != PJ_SUCCESS) { -# if defined(PJ_WIN32) && PJ_WIN32 != 0 - /* On Win32, for UDP, WSAECONNRESET on the receive side - * indicates that previous sending has triggered ICMP Port - * Unreachable message. - * But we wouldn't know at this point which one of previous - * key that has triggered the error, since UDP socket can - * be shared! - * So we'll just ignore it! 
- */ - - if (rc == PJ_STATUS_FROM_OS(WSAECONNRESET)) { - //PJ_LOG(4,(THIS_FILE, - // "Ignored ICMP port unreach. on key=%p", h)); - } -# endif - - /* In any case we would report this to caller. */ - bytes_read = -rc; - } - - /* Clear fdset if there is no pending read. */ - if (pj_list_empty(&h->read_list)) - PJ_FD_CLR(h->fd, &ioqueue->rfdset); - - /* In any case clear from temporary set. */ - PJ_FD_CLR(h->fd, &rfdset); - - /* Call callback. */ - if (h->cb.on_read_complete) - (*h->cb.on_read_complete)(h, (pj_ioqueue_op_key_t*)read_op, - bytes_read); - - /* Re-scan readable sockets. */ - goto do_readable_scan; - - } - } - -#if PJ_HAS_TCP - /* Scan for exception socket for TCP connection error. */ - h = ioqueue->key_list.next; -do_except_scan: - for ( ; h!=&ioqueue->key_list; h = h->next) { - if (h->connecting && PJ_FD_ISSET(h->fd, &xfdset)) - break; - } - if (h != &ioqueue->key_list) { - - pj_assert(h->connecting); - - /* Clear operation. */ - h->connecting = 0; - PJ_FD_CLR(h->fd, &ioqueue->wfdset); - PJ_FD_CLR(h->fd, &ioqueue->xfdset); - PJ_FD_CLR(h->fd, &wfdset); - PJ_FD_CLR(h->fd, &xfdset); - - /* Call callback. */ - if (h->cb.on_connect_complete) - (*h->cb.on_connect_complete)(h, -1); - - /* Re-scan exception list. */ - goto do_except_scan; - } -#endif /* PJ_HAS_TCP */ - - /* Shouldn't happen. */ - /* For strange reason on WinXP select() can return 1 while there is no - * pj_fd_set_t signaled. */ - /* pj_assert(0); */ - - //count = 0; - - pj_lock_release(ioqueue->lock); - return count; -} - -/* - * pj_ioqueue_recv() - * - * Start asynchronous recv() from the socket. - */ -PJ_DEF(pj_status_t) pj_ioqueue_recv( pj_ioqueue_key_t *key, - pj_ioqueue_op_key_t *op_key, - void *buffer, - pj_ssize_t *length, - unsigned flags ) -{ - pj_status_t status; - pj_ssize_t size; - struct read_operation *read_op; - pj_ioqueue_t *ioqueue; - - PJ_ASSERT_RETURN(key && op_key && buffer && length, PJ_EINVAL); - PJ_CHECK_STACK(); - - /* Try to see if there's data immediately available. - */ - size = *length; - status = pj_sock_recv(key->fd, buffer, &size, flags); - if (status == PJ_SUCCESS) { - /* Yes! Data is available! */ - *length = size; - return PJ_SUCCESS; - } else { - /* If error is not EWOULDBLOCK (or EAGAIN on Linux), report - * the error to caller. - */ - if (status != PJ_STATUS_FROM_OS(PJ_BLOCKING_ERROR_VAL)) - return status; - } - - /* - * No data is immediately available. - * Must schedule asynchronous operation to the ioqueue. - */ - ioqueue = key->ioqueue; - pj_lock_acquire(ioqueue->lock); - - read_op = (struct read_operation*)op_key; - - read_op->op = PJ_IOQUEUE_OP_RECV; - read_op->buf = buffer; - read_op->size = *length; - read_op->flags = flags; - - pj_list_insert_before(&key->read_list, read_op); - PJ_FD_SET(key->fd, &ioqueue->rfdset); - - pj_lock_release(ioqueue->lock); - return PJ_EPENDING; -} - -/* - * pj_ioqueue_recvfrom() - * - * Start asynchronous recvfrom() from the socket. - */ -PJ_DEF(pj_status_t) pj_ioqueue_recvfrom( pj_ioqueue_key_t *key, - pj_ioqueue_op_key_t *op_key, - void *buffer, - pj_ssize_t *length, - unsigned flags, - pj_sockaddr_t *addr, - int *addrlen) -{ - pj_status_t status; - pj_ssize_t size; - struct read_operation *read_op; - pj_ioqueue_t *ioqueue; - - PJ_ASSERT_RETURN(key && op_key && buffer && length, PJ_EINVAL); - PJ_CHECK_STACK(); - - /* Try to see if there's data immediately available. - */ - size = *length; - status = pj_sock_recvfrom(key->fd, buffer, &size, flags, - addr, addrlen); - if (status == PJ_SUCCESS) { - /* Yes! Data is available! 
*/ - *length = size; - return PJ_SUCCESS; - } else { - /* If error is not EWOULDBLOCK (or EAGAIN on Linux), report - * the error to caller. - */ - if (status != PJ_STATUS_FROM_OS(PJ_BLOCKING_ERROR_VAL)) - return status; } - /* - * No data is immediately available. - * Must schedule asynchronous operation to the ioqueue. - */ - ioqueue = key->ioqueue; - pj_lock_acquire(ioqueue->lock); - - read_op = (struct read_operation*)op_key; - - read_op->op = PJ_IOQUEUE_OP_RECV_FROM; - read_op->buf = buffer; - read_op->size = *length; - read_op->flags = flags; - read_op->rmt_addr = addr; - read_op->rmt_addrlen = addrlen; - - pj_list_insert_before(&key->read_list, read_op); - PJ_FD_SET(key->fd, &ioqueue->rfdset); - pj_lock_release(ioqueue->lock); - return PJ_EPENDING; -} - -/* - * pj_ioqueue_send() - * - * Start asynchronous send() to the descriptor. - */ -PJ_DEF(pj_status_t) pj_ioqueue_send( pj_ioqueue_key_t *key, - pj_ioqueue_op_key_t *op_key, - const void *data, - pj_ssize_t *length, - unsigned flags) -{ - pj_ioqueue_t *ioqueue; - struct write_operation *write_op; - pj_status_t status; - pj_ssize_t sent; - - PJ_ASSERT_RETURN(key && op_key && data && length, PJ_EINVAL); - PJ_CHECK_STACK(); - /* Fast track: - * Try to send data immediately, only if there's no pending write! - * Note: - * We are speculating that the list is empty here without properly - * acquiring ioqueue's mutex first. This is intentional, to maximize - * performance via parallelism. - * - * This should be safe, because: - * - by convention, we require caller to make sure that the - * key is not unregistered while other threads are invoking - * an operation on the same key. - * - pj_list_empty() is safe to be invoked by multiple threads, - * even when other threads are modifying the list. - */ - if (pj_list_empty(&key->write_list)) { - /* - * See if data can be sent immediately. - */ - sent = *length; - status = pj_sock_send(key->fd, data, &sent, flags); - if (status == PJ_SUCCESS) { - /* Success! */ - *length = sent; - return PJ_SUCCESS; - } else { - /* If error is not EWOULDBLOCK (or EAGAIN on Linux), report - * the error to caller. - */ - if (status != PJ_STATUS_FROM_OS(PJ_BLOCKING_ERROR_VAL)) { - return status; - } - } - } + count = counter; - /* - * Schedule asynchronous send. + /* Now process all events. The dispatch functions will take care + * of locking in each of the key */ - ioqueue = key->ioqueue; - pj_lock_acquire(ioqueue->lock); - - write_op = (struct write_operation*)op_key; - write_op->op = PJ_IOQUEUE_OP_SEND; - write_op->buf = NULL; - write_op->size = *length; - write_op->written = 0; - write_op->flags = flags; - - pj_list_insert_before(&key->write_list, write_op); - PJ_FD_SET(key->fd, &ioqueue->wfdset); - - pj_lock_release(ioqueue->lock); - - return PJ_EPENDING; -} - - -/* - * pj_ioqueue_sendto() - * - * Start asynchronous write() to the descriptor. - */ -PJ_DEF(pj_status_t) pj_ioqueue_sendto( pj_ioqueue_key_t *key, - pj_ioqueue_op_key_t *op_key, - const void *data, - pj_ssize_t *length, - unsigned flags, - const pj_sockaddr_t *addr, - int addrlen) -{ - pj_ioqueue_t *ioqueue; - struct write_operation *write_op; - pj_status_t status; - pj_ssize_t sent; - - PJ_ASSERT_RETURN(key && op_key && data && length, PJ_EINVAL); - PJ_CHECK_STACK(); - - /* Fast track: - * Try to send data immediately, only if there's no pending write! - * Note: - * We are speculating that the list is empty here without properly - * acquiring ioqueue's mutex first. This is intentional, to maximize - * performance via parallelism. 
-     *
-     * This should be safe, because:
-     *  - by convention, we require caller to make sure that the
-     *    key is not unregistered while other threads are invoking
-     *    an operation on the same key.
-     *  - pj_list_empty() is safe to be invoked by multiple threads,
-     *    even when other threads are modifying the list.
-     */
-    if (pj_list_empty(&key->write_list)) {
-        /*
-         * See if data can be sent immediately.
-         */
-        sent = *length;
-        status = pj_sock_send(key->fd, data, &sent, flags);
-        if (status == PJ_SUCCESS) {
-            /* Success! */
-            *length = sent;
-            return PJ_SUCCESS;
-        } else {
-            /* If error is not EWOULDBLOCK (or EAGAIN on Linux), report
-             * the error to caller.
-             */
-            if (status != PJ_STATUS_FROM_OS(PJ_BLOCKING_ERROR_VAL)) {
-                return status;
-            }
-        }
-    }
+    count = counter;
 
-    /*
-     * Schedule asynchronous send.
+    /* Now process all events. The dispatch functions will take care
+     * of locking in each of the key
      */
-    ioqueue = key->ioqueue;
-    pj_lock_acquire(ioqueue->lock);
-
-    write_op = (struct write_operation*)op_key;
-    write_op->op = PJ_IOQUEUE_OP_SEND;
-    write_op->buf = NULL;
-    write_op->size = *length;
-    write_op->written = 0;
-    write_op->flags = flags;
-
-    pj_list_insert_before(&key->write_list, write_op);
-    PJ_FD_SET(key->fd, &ioqueue->wfdset);
-
-    pj_lock_release(ioqueue->lock);
-
-    return PJ_EPENDING;
-}
-
-
-/*
- * pj_ioqueue_sendto()
- *
- * Start asynchronous write() to the descriptor.
- */
-PJ_DEF(pj_status_t) pj_ioqueue_sendto( pj_ioqueue_key_t *key,
-                                       pj_ioqueue_op_key_t *op_key,
-                                       const void *data,
-                                       pj_ssize_t *length,
-                                       unsigned flags,
-                                       const pj_sockaddr_t *addr,
-                                       int addrlen)
-{
-    pj_ioqueue_t *ioqueue;
-    struct write_operation *write_op;
-    pj_status_t status;
-    pj_ssize_t sent;
-
-    PJ_ASSERT_RETURN(key && op_key && data && length, PJ_EINVAL);
-    PJ_CHECK_STACK();
-
-    /* Fast track:
-     *   Try to send data immediately, only if there's no pending write!
-     * Note:
-     *  We are speculating that the list is empty here without properly
-     *  acquiring ioqueue's mutex first. This is intentional, to maximize
-     *  performance via parallelism.
-     *
-     * This should be safe, because:
-     *  - by convention, we require caller to make sure that the
-     *    key is not unregistered while other threads are invoking
-     *    an operation on the same key.
-     *  - pj_list_empty() is safe to be invoked by multiple threads,
-     *    even when other threads are modifying the list.
-     */
-    if (pj_list_empty(&key->write_list)) {
-        /*
-         * See if data can be sent immediately.
-         */
-        sent = *length;
-        status = pj_sock_sendto(key->fd, data, &sent, flags, addr, addrlen);
-        if (status == PJ_SUCCESS) {
-            /* Success! */
-            *length = sent;
-            return PJ_SUCCESS;
-        } else {
-            /* If error is not EWOULDBLOCK (or EAGAIN on Linux), report
-             * the error to caller.
-             */
-            if (status != PJ_STATUS_FROM_OS(PJ_BLOCKING_ERROR_VAL)) {
-                return status;
-            }
     }
+    for (counter=0; counter<count; ++counter) {
+        switch (event[counter].event_type) {
+        case READABLE_EVENT:
+            ioqueue_dispatch_read_event(ioqueue, event[counter].key);
+            break;
+        case WRITEABLE_EVENT:
+            ioqueue_dispatch_write_event(ioqueue, event[counter].key);
+            break;
+        case EXCEPTION_EVENT:
+            ioqueue_dispatch_exception_event(ioqueue, event[counter].key);
+            break;
+        }
+    }
 
-    /*
-     * Schedule asynchronous send.
-     */
-    ioqueue = key->ioqueue;
-    pj_lock_acquire(ioqueue->lock);
-
-    write_op = (struct write_operation*)op_key;
-    write_op->op = PJ_IOQUEUE_OP_SEND_TO;
-    write_op->buf = NULL;
-    write_op->size = *length;
-    write_op->written = 0;
-    write_op->flags = flags;
-    pj_memcpy(&write_op->rmt_addr, addr, addrlen);
-    write_op->rmt_addrlen = addrlen;
-
-    pj_list_insert_before(&key->write_list, write_op);
-    PJ_FD_SET(key->fd, &ioqueue->wfdset);
-
-    pj_lock_release(ioqueue->lock);
-
-    return PJ_EPENDING;
-}
 
 #if PJ_HAS_TCP
-/*
- * Initiate overlapped accept() operation.
- */
-PJ_DEF(pj_status_t) pj_ioqueue_accept( pj_ioqueue_key_t *key,
-                                       pj_ioqueue_op_key_t *op_key,
-                                       pj_sock_t *new_sock,
-                                       pj_sockaddr_t *local,
-                                       pj_sockaddr_t *remote,
-                                       int *addrlen)
-{
-    pj_ioqueue_t *ioqueue;
-    struct accept_operation *accept_op;
-    pj_status_t status;
-
-    /* check parameters. All must be specified! */
-    PJ_ASSERT_RETURN(key && op_key && new_sock, PJ_EINVAL);
-
-    /* Fast track:
-     *  See if there's new connection available immediately.
-     */
-    if (pj_list_empty(&key->accept_list)) {
-        status = pj_sock_accept(key->fd, new_sock, remote, addrlen);
-        if (status == PJ_SUCCESS) {
-            /* Yes! New connection is available! */
-            if (local && addrlen) {
-                status = pj_sock_getsockname(*new_sock, local, addrlen);
-                if (status != PJ_SUCCESS) {
-                    pj_sock_close(*new_sock);
-                    *new_sock = PJ_INVALID_SOCKET;
-                    return status;
-                }
-            }
-            return PJ_SUCCESS;
-        } else {
-            /* If error is not EWOULDBLOCK (or EAGAIN on Linux), report
-             * the error to caller.
-             */
-            if (status != PJ_STATUS_FROM_OS(PJ_BLOCKING_ERROR_VAL)) {
-                return status;
-            }
-        }
-    }
-
-    /*
-     * No connection is available immediately.
-     * Schedule accept() operation to be completed when there is incoming
-     * connection available.
-     */
-    ioqueue = key->ioqueue;
-    accept_op = (struct accept_operation*)op_key;
-
-    pj_lock_acquire(ioqueue->lock);
-
-    accept_op->op = PJ_IOQUEUE_OP_ACCEPT;
-    accept_op->accept_fd = new_sock;
-    accept_op->rmt_addr = remote;
-    accept_op->addrlen= addrlen;
-    accept_op->local_addr = local;
-
-    pj_list_insert_before(&key->accept_list, accept_op);
-    PJ_FD_SET(key->fd, &ioqueue->rfdset);
-
-    pj_lock_release(ioqueue->lock);
-
-    return PJ_EPENDING;
-}
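
The fast-track comments in the removed send, sendto and accept paths above all describe the same caller-visible contract, which the common abstraction keeps: an operation either completes immediately (PJ_SUCCESS) or is queued (PJ_EPENDING) and reported later through the registered callback; any other return value is a real error. A hypothetical caller-side sketch of that contract:

    /* Sketch only: the three outcomes of pj_ioqueue_send(). On PJ_SUCCESS
     * the data was sent synchronously and on_write_complete() will not be
     * called for this op_key.
     */
    static void send_or_queue(pj_ioqueue_key_t *key,
                              pj_ioqueue_op_key_t *op_key,
                              const void *data, pj_ssize_t len)
    {
        pj_status_t status = pj_ioqueue_send(key, op_key, data, &len, 0);

        if (status == PJ_SUCCESS) {
            /* Sent immediately; `len` bytes were written. */
        } else if (status == PJ_EPENDING) {
            /* Queued; completion arrives via cb.on_write_complete(). */
        } else {
            /* Genuine error (not EWOULDBLOCK): handle or report it. */
        }
    }
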
-
-/*
- * Initiate overlapped connect() operation (well, it's non-blocking actually,
- * since there's no overlapped version of connect()).
- */
-PJ_DEF(pj_status_t) pj_ioqueue_connect( pj_ioqueue_key_t *key,
-                                        const pj_sockaddr_t *addr,
-                                        int addrlen )
-{
-    pj_ioqueue_t *ioqueue;
-    pj_status_t status;
-
-    /* check parameters. All must be specified! */
-    PJ_ASSERT_RETURN(key && addr && addrlen, PJ_EINVAL);
-
-    /* Check if socket has not been marked for connecting */
-    if (key->connecting != 0)
-        return PJ_EPENDING;
-
-    status = pj_sock_connect(key->fd, addr, addrlen);
-    if (status == PJ_SUCCESS) {
-        /* Connected! */
-        return PJ_SUCCESS;
-    } else {
-        if (status == PJ_STATUS_FROM_OS(PJ_BLOCKING_CONNECT_ERROR_VAL)) {
-            /* Pending! */
-            ioqueue = key->ioqueue;
-            pj_lock_acquire(ioqueue->lock);
-            key->connecting = PJ_TRUE;
-            PJ_FD_SET(key->fd, &ioqueue->wfdset);
-            PJ_FD_SET(key->fd, &ioqueue->xfdset);
-            pj_lock_release(ioqueue->lock);
-            return PJ_EPENDING;
-        } else {
-            /* Error! */
-            return status;
-        }
-    }
+
+    return count;
 }
-#endif /* PJ_HAS_TCP */
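
The API surface is unchanged for callers after this refactoring: completion is still reported through the pj_ioqueue_callback functions, and pj_ioqueue_poll() now dispatches at most PJ_IOQUEUE_MAX_EVENTS_IN_SINGLE_POLL events per call. A minimal usage sketch (hypothetical application code, assuming PJLIB is initialized and `sock` is a bound UDP socket; error checks omitted):

    #include <pjlib.h>

    static void on_read_complete(pj_ioqueue_key_t *key,
                                 pj_ioqueue_op_key_t *op_key,
                                 pj_ssize_t bytes_read)
    {
        /* Consume the data, then typically schedule the next
         * pj_ioqueue_recv()/recvfrom() with the same op_key.
         */
    }

    int poll_once(pj_pool_t *pool, pj_sock_t sock)
    {
        pj_ioqueue_t *ioqueue;
        pj_ioqueue_key_t *key;
        pj_ioqueue_callback cb;
        pj_time_val timeout = { 0, 100 };   /* 100 msec */

        pj_memset(&cb, 0, sizeof(cb));
        cb.on_read_complete = &on_read_complete;

        pj_ioqueue_create(pool, PJ_IOQUEUE_MAX_HANDLES, &ioqueue);
        pj_ioqueue_register_sock(pool, ioqueue, sock, NULL, &cb, &key);

        /* Returns the number of events dispatched, 0 on timeout. */
        return pj_ioqueue_poll(ioqueue, &timeout);
    }
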