diff options
author | Benny Prijono <bennylp@teluu.com> | 2005-11-06 16:50:38 +0000 |
---|---|---|
committer | Benny Prijono <bennylp@teluu.com> | 2005-11-06 16:50:38 +0000 |
commit | 33a8c1cb59304d92d517e3ba511bf233c729597f (patch) | |
tree | e6cb65930121480465db749bf5916fa2708ca633 /pjlib/src/pj/ioqueue_epoll.c | |
parent | 6d5fbe07f3dc84c10ea75c5584fe8b5513278d08 (diff) |
Tested new ioqueue framework on Linux with select and epoll
git-svn-id: http://svn.pjsip.org/repos/pjproject/main@14 74dad513-b988-da41-8d7b-12977e46ad98
Diffstat (limited to 'pjlib/src/pj/ioqueue_epoll.c')
-rw-r--r-- | pjlib/src/pj/ioqueue_epoll.c | 661 |
1 files changed, 135 insertions, 526 deletions
diff --git a/pjlib/src/pj/ioqueue_epoll.c b/pjlib/src/pj/ioqueue_epoll.c index 24f9bfbb..aa012531 100644 --- a/pjlib/src/pj/ioqueue_epoll.c +++ b/pjlib/src/pj/ioqueue_epoll.c @@ -1,5 +1,4 @@ /* $Id$ - * */ /* * ioqueue_epoll.c @@ -30,7 +29,7 @@ # define epoll_data data.ptr # define epoll_data_type void* -# define ioctl_val_type unsigned long* +# define ioctl_val_type unsigned long # define getsockopt_val_ptr int* # define os_getsockopt getsockopt # define os_ioctl ioctl @@ -126,51 +125,20 @@ #define THIS_FILE "ioq_epoll" -#define PJ_IOQUEUE_IS_READ_OP(op) ((op & PJ_IOQUEUE_OP_READ) || \ - (op & PJ_IOQUEUE_OP_RECV) || \ - (op & PJ_IOQUEUE_OP_RECV_FROM)) -#define PJ_IOQUEUE_IS_WRITE_OP(op) ((op & PJ_IOQUEUE_OP_WRITE) || \ - (op & PJ_IOQUEUE_OP_SEND) || \ - (op & PJ_IOQUEUE_OP_SEND_TO)) - - -#if PJ_HAS_TCP -# define PJ_IOQUEUE_IS_ACCEPT_OP(op) (op & PJ_IOQUEUE_OP_ACCEPT) -# define PJ_IOQUEUE_IS_CONNECT_OP(op) (op & PJ_IOQUEUE_OP_CONNECT) -#else -# define PJ_IOQUEUE_IS_ACCEPT_OP(op) 0 -# define PJ_IOQUEUE_IS_CONNECT_OP(op) 0 -#endif - - //#define TRACE_(expr) PJ_LOG(3,expr) #define TRACE_(expr) +/* + * Include common ioqueue abstraction. + */ +#include "ioqueue_common_abs.h" /* * This describes each key. */ struct pj_ioqueue_key_t { - PJ_DECL_LIST_MEMBER(struct pj_ioqueue_key_t) - pj_sock_t fd; - pj_ioqueue_operation_e op; - void *user_data; - pj_ioqueue_callback cb; - - void *rd_buf; - unsigned rd_flags; - pj_size_t rd_buflen; - void *wr_buf; - pj_size_t wr_buflen; - - pj_sockaddr_t *rmt_addr; - int *rmt_addrlen; - - pj_sockaddr_t *local_addr; - int *local_addrlen; - - pj_sock_t *accept_fd; + DECLARE_COMMON_KEY }; /* @@ -178,13 +146,18 @@ struct pj_ioqueue_key_t */ struct pj_ioqueue_t { - pj_lock_t *lock; - pj_bool_t auto_delete_lock; + DECLARE_COMMON_IOQUEUE + unsigned max, count; pj_ioqueue_key_t hlist; int epfd; }; +/* Include implementation for common abstraction after we declare + * pj_ioqueue_key_t and pj_ioqueue_t. + */ +#include "ioqueue_common_abs.c" + /* * pj_ioqueue_create() * @@ -192,37 +165,45 @@ struct pj_ioqueue_t */ PJ_DEF(pj_status_t) pj_ioqueue_create( pj_pool_t *pool, pj_size_t max_fd, - int max_threads, pj_ioqueue_t **p_ioqueue) { - pj_ioqueue_t *ioque; + pj_ioqueue_t *ioqueue; pj_status_t rc; + pj_lock_t *lock; - PJ_UNUSED_ARG(max_threads); + /* Check that arguments are valid. */ + PJ_ASSERT_RETURN(pool != NULL && p_ioqueue != NULL && + max_fd > 0, PJ_EINVAL); - if (max_fd > PJ_IOQUEUE_MAX_HANDLES) { - pj_assert(!"max_fd too large"); - return PJ_EINVAL; - } + /* Check that size of pj_ioqueue_op_key_t is sufficient */ + PJ_ASSERT_RETURN(sizeof(pj_ioqueue_op_key_t)-sizeof(void*) >= + sizeof(union operation_key), PJ_EBUG); + + ioqueue = pj_pool_alloc(pool, sizeof(pj_ioqueue_t)); + + ioqueue_init(ioqueue); - ioque = pj_pool_alloc(pool, sizeof(pj_ioqueue_t)); - ioque->max = max_fd; - ioque->count = 0; - pj_list_init(&ioque->hlist); + ioqueue->max = max_fd; + ioqueue->count = 0; + pj_list_init(&ioqueue->hlist); - rc = pj_lock_create_recursive_mutex(pool, "ioq%p", &ioque->lock); + rc = pj_lock_create_simple_mutex(pool, "ioq%p", &lock); if (rc != PJ_SUCCESS) return rc; - ioque->auto_delete_lock = PJ_TRUE; - ioque->epfd = os_epoll_create(max_fd); - if (ioque->epfd < 0) { + rc = pj_ioqueue_set_lock(ioqueue, lock, PJ_TRUE); + if (rc != PJ_SUCCESS) + return rc; + + ioqueue->epfd = os_epoll_create(max_fd); + if (ioqueue->epfd < 0) { + ioqueue_destroy(ioqueue); return PJ_RETURN_OS_ERROR(pj_get_native_os_error()); } - PJ_LOG(4, ("pjlib", "select() I/O Queue created (%p)", ioque)); + PJ_LOG(4, ("pjlib", "epoll I/O Queue created (%p)", ioqueue)); - *p_ioqueue = ioque; + *p_ioqueue = ioqueue; return PJ_SUCCESS; } @@ -231,47 +212,24 @@ PJ_DEF(pj_status_t) pj_ioqueue_create( pj_pool_t *pool, * * Destroy ioqueue. */ -PJ_DEF(pj_status_t) pj_ioqueue_destroy(pj_ioqueue_t *ioque) +PJ_DEF(pj_status_t) pj_ioqueue_destroy(pj_ioqueue_t *ioqueue) { - PJ_ASSERT_RETURN(ioque, PJ_EINVAL); - PJ_ASSERT_RETURN(ioque->epfd > 0, PJ_EINVALIDOP); - - pj_lock_acquire(ioque->lock); - os_close(ioque->epfd); - ioque->epfd = 0; - if (ioque->auto_delete_lock) - pj_lock_destroy(ioque->lock); - - return PJ_SUCCESS; -} - -/* - * pj_ioqueue_set_lock() - */ -PJ_DEF(pj_status_t) pj_ioqueue_set_lock( pj_ioqueue_t *ioque, - pj_lock_t *lock, - pj_bool_t auto_delete ) -{ - PJ_ASSERT_RETURN(ioque && lock, PJ_EINVAL); - - if (ioque->auto_delete_lock) { - pj_lock_destroy(ioque->lock); - } - - ioque->lock = lock; - ioque->auto_delete_lock = auto_delete; + PJ_ASSERT_RETURN(ioqueue, PJ_EINVAL); + PJ_ASSERT_RETURN(ioqueue->epfd > 0, PJ_EINVALIDOP); - return PJ_SUCCESS; + pj_lock_acquire(ioqueue->lock); + os_close(ioqueue->epfd); + ioqueue->epfd = 0; + return ioqueue_destroy(ioqueue); } - /* * pj_ioqueue_register_sock() * * Register a socket to ioqueue. */ PJ_DEF(pj_status_t) pj_ioqueue_register_sock( pj_pool_t *pool, - pj_ioqueue_t *ioque, + pj_ioqueue_t *ioqueue, pj_sock_t sock, void *user_data, const pj_ioqueue_callback *cb, @@ -283,12 +241,12 @@ PJ_DEF(pj_status_t) pj_ioqueue_register_sock( pj_pool_t *pool, int status; pj_status_t rc = PJ_SUCCESS; - PJ_ASSERT_RETURN(pool && ioque && sock != PJ_INVALID_SOCKET && + PJ_ASSERT_RETURN(pool && ioqueue && sock != PJ_INVALID_SOCKET && cb && p_key, PJ_EINVAL); - pj_lock_acquire(ioque->lock); + pj_lock_acquire(ioqueue->lock); - if (ioque->count >= ioque->max) { + if (ioqueue->count >= ioqueue->max) { rc = PJ_ETOOMANY; TRACE_((THIS_FILE, "pj_ioqueue_register_sock error: too many files")); goto on_return; @@ -305,16 +263,19 @@ PJ_DEF(pj_status_t) pj_ioqueue_register_sock( pj_pool_t *pool, /* Create key. */ key = (pj_ioqueue_key_t*)pj_pool_zalloc(pool, sizeof(pj_ioqueue_key_t)); - key->fd = sock; - key->user_data = user_data; - pj_memcpy(&key->cb, cb, sizeof(pj_ioqueue_callback)); + rc = ioqueue_init_key(pool, ioqueue, key, sock, user_data, cb); + if (rc != PJ_SUCCESS) { + key = NULL; + goto on_return; + } /* os_epoll_ctl. */ ev.events = EPOLLIN | EPOLLOUT | EPOLLERR; ev.epoll_data = (epoll_data_type)key; - status = os_epoll_ctl(ioque->epfd, EPOLL_CTL_ADD, sock, &ev); + status = os_epoll_ctl(ioqueue->epfd, EPOLL_CTL_ADD, sock, &ev); if (status < 0) { rc = pj_get_os_error(); + key = NULL; TRACE_((THIS_FILE, "pj_ioqueue_register_sock error: os_epoll_ctl rc=%d", status)); @@ -322,12 +283,12 @@ PJ_DEF(pj_status_t) pj_ioqueue_register_sock( pj_pool_t *pool, } /* Register */ - pj_list_insert_before(&ioque->hlist, key); - ++ioque->count; + pj_list_insert_before(&ioqueue->hlist, key); + ++ioqueue->count; on_return: *p_key = key; - pj_lock_release(ioque->lock); + pj_lock_release(ioqueue->lock); return rc; } @@ -337,179 +298,116 @@ on_return: * * Unregister handle from ioqueue. */ -PJ_DEF(pj_status_t) pj_ioqueue_unregister( pj_ioqueue_t *ioque, - pj_ioqueue_key_t *key) +PJ_DEF(pj_status_t) pj_ioqueue_unregister( pj_ioqueue_key_t *key) { + pj_ioqueue_t *ioqueue; struct epoll_event ev; int status; - PJ_ASSERT_RETURN(ioque && key, PJ_EINVAL); + PJ_ASSERT_RETURN(key != NULL, PJ_EINVAL); - pj_lock_acquire(ioque->lock); + ioqueue = key->ioqueue; + pj_lock_acquire(ioqueue->lock); - pj_assert(ioque->count > 0); - --ioque->count; + pj_assert(ioqueue->count > 0); + --ioqueue->count; pj_list_erase(key); ev.events = 0; ev.epoll_data = (epoll_data_type)key; - status = os_epoll_ctl( ioque->epfd, EPOLL_CTL_DEL, key->fd, &ev); + status = os_epoll_ctl( ioqueue->epfd, EPOLL_CTL_DEL, key->fd, &ev); if (status != 0) { pj_status_t rc = pj_get_os_error(); - pj_lock_release(ioque->lock); + pj_lock_release(ioqueue->lock); return rc; } - pj_lock_release(ioque->lock); + pj_lock_release(ioqueue->lock); + + /* Destroy the key. */ + ioqueue_destroy_key(key); + return PJ_SUCCESS; } -/* - * pj_ioqueue_get_user_data() - * - * Obtain value associated with a key. +/* ioqueue_remove_from_set() + * This function is called from ioqueue_dispatch_event() to instruct + * the ioqueue to remove the specified descriptor from ioqueue's descriptor + * set for the specified event. */ -PJ_DEF(void*) pj_ioqueue_get_user_data( pj_ioqueue_key_t *key ) +static void ioqueue_remove_from_set( pj_ioqueue_t *ioqueue, + pj_sock_t fd, + enum ioqueue_event_type event_type) { - PJ_ASSERT_RETURN(key != NULL, NULL); - return key->user_data; } +/* + * ioqueue_add_to_set() + * This function is called from pj_ioqueue_recv(), pj_ioqueue_send() etc + * to instruct the ioqueue to add the specified handle to ioqueue's descriptor + * set for the specified event. + */ +static void ioqueue_add_to_set( pj_ioqueue_t *ioqueue, + pj_sock_t fd, + enum ioqueue_event_type event_type ) +{ +} /* * pj_ioqueue_poll() * */ -PJ_DEF(int) pj_ioqueue_poll( pj_ioqueue_t *ioque, const pj_time_val *timeout) +PJ_DEF(int) pj_ioqueue_poll( pj_ioqueue_t *ioqueue, const pj_time_val *timeout) { int i, count, processed; - struct epoll_event events[16]; + struct epoll_event events[PJ_IOQUEUE_MAX_EVENTS_IN_SINGLE_POLL]; int msec; + struct queue { + pj_ioqueue_key_t *key; + enum ioqueue_event_type event_type; + } queue[PJ_IOQUEUE_MAX_EVENTS_IN_SINGLE_POLL]; PJ_CHECK_STACK(); msec = timeout ? PJ_TIME_VAL_MSEC(*timeout) : 9000; - count = os_epoll_wait( ioque->epfd, events, PJ_ARRAY_SIZE(events), msec); + count = os_epoll_wait( ioqueue->epfd, events, PJ_ARRAY_SIZE(events), msec); if (count <= 0) return count; /* Lock ioqueue. */ - pj_lock_acquire(ioque->lock); - - processed = 0; + pj_lock_acquire(ioqueue->lock); - for (i=0; i<count; ++i) { + for (processed=0, i=0; i<count; ++i) { pj_ioqueue_key_t *h = (pj_ioqueue_key_t*)(epoll_data_type) events[i].epoll_data; - pj_status_t rc; - - /* - * Check for completion of read operations. - */ - if ((events[i].events & EPOLLIN) && (PJ_IOQUEUE_IS_READ_OP(h->op))) { - pj_ssize_t bytes_read = h->rd_buflen; - - if ((h->op & PJ_IOQUEUE_OP_RECV_FROM)) { - rc = pj_sock_recvfrom( h->fd, h->rd_buf, &bytes_read, 0, - h->rmt_addr, h->rmt_addrlen); - } else if ((h->op & PJ_IOQUEUE_OP_RECV)) { - rc = pj_sock_recv(h->fd, h->rd_buf, &bytes_read, 0); - } else { - bytes_read = os_read( h->fd, h->rd_buf, bytes_read); - rc = (bytes_read >= 0) ? PJ_SUCCESS : pj_get_os_error(); - } - - if (rc != PJ_SUCCESS) { - bytes_read = -rc; - } - - h->op &= ~(PJ_IOQUEUE_OP_READ | PJ_IOQUEUE_OP_RECV | - PJ_IOQUEUE_OP_RECV_FROM); - - /* Call callback. */ - (*h->cb.on_read_complete)(h, bytes_read); - ++processed; - } /* - * Check for completion of accept() operation. + * Check readability. */ - else if ((events[i].events & EPOLLIN) && - (h->op & PJ_IOQUEUE_OP_ACCEPT)) - { - /* accept() must be the only operation specified on - * server socket - */ - pj_assert( h->op == PJ_IOQUEUE_OP_ACCEPT); - - rc = pj_sock_accept( h->fd, h->accept_fd, - h->rmt_addr, h->rmt_addrlen); - if (rc==PJ_SUCCESS && h->local_addr) { - rc = pj_sock_getsockname(*h->accept_fd, h->local_addr, - h->local_addrlen); - } - - h->op &= ~(PJ_IOQUEUE_OP_ACCEPT); - - /* Call callback. */ - (*h->cb.on_accept_complete)(h, *h->accept_fd, rc); - + if ((events[i].events & EPOLLIN) && + (key_has_pending_read(h) || key_has_pending_accept(h))) { + queue[processed].key = h; + queue[processed].event_type = READABLE_EVENT; ++processed; } /* - * Check for completion of write operations. + * Check for writeability. */ - if ((events[i].events & EPOLLOUT) && PJ_IOQUEUE_IS_WRITE_OP(h->op)) { - /* Completion of write(), send(), or sendto() operation. */ - - /* Clear operation. */ - h->op &= ~(PJ_IOQUEUE_OP_WRITE | PJ_IOQUEUE_OP_SEND | - PJ_IOQUEUE_OP_SEND_TO); - - /* Call callback. */ - /* All data must have been sent? */ - (*h->cb.on_write_complete)(h, h->wr_buflen); - + if ((events[i].events & EPOLLOUT) && key_has_pending_write(h)) { + queue[processed].key = h; + queue[processed].event_type = WRITEABLE_EVENT; ++processed; } + #if PJ_HAS_TCP /* * Check for completion of connect() operation. */ - else if ((events[i].events & EPOLLOUT) && - (h->op & PJ_IOQUEUE_OP_CONNECT)) - { - /* Completion of connect() operation */ - pj_ssize_t bytes_transfered; - - /* from connect(2): - * On Linux, use getsockopt to read the SO_ERROR option at - * level SOL_SOCKET to determine whether connect() completed - * successfully (if SO_ERROR is zero). - */ - int value; - socklen_t vallen = sizeof(value); - int gs_rc = os_getsockopt(h->fd, SOL_SOCKET, SO_ERROR, - (getsockopt_val_ptr)&value, &vallen); - if (gs_rc != 0) { - /* Argh!! What to do now??? - * Just indicate that the socket is connected. The - * application will get error as soon as it tries to use - * the socket to send/receive. - */ - bytes_transfered = 0; - } else { - bytes_transfered = value; - } - - /* Clear operation. */ - h->op &= (~PJ_IOQUEUE_OP_CONNECT); - - /* Call callback. */ - (*h->cb.on_connect_complete)(h, bytes_transfered); - + if ((events[i].events & EPOLLOUT) && (h->connecting)) { + queue[processed].key = h; + queue[processed].event_type = WRITEABLE_EVENT; ++processed; } #endif /* PJ_HAS_TCP */ @@ -517,321 +415,32 @@ PJ_DEF(int) pj_ioqueue_poll( pj_ioqueue_t *ioque, const pj_time_val *timeout) /* * Check for error condition. */ - if (events[i].events & EPOLLERR) { - if (h->op & PJ_IOQUEUE_OP_CONNECT) { - h->op &= ~PJ_IOQUEUE_OP_CONNECT; - - /* Call callback. */ - (*h->cb.on_connect_complete)(h, -1); - - ++processed; - } + if (events[i].events & EPOLLERR && (h->connecting)) { + queue[processed].key = h; + queue[processed].event_type = EXCEPTION_EVENT; + ++processed; } } - - pj_lock_release(ioque->lock); - - return processed; -} - -/* - * pj_ioqueue_read() - * - * Start asynchronous read from the descriptor. - */ -PJ_DEF(pj_status_t) pj_ioqueue_read( pj_ioqueue_t *ioque, - pj_ioqueue_key_t *key, - void *buffer, - pj_size_t buflen) -{ - PJ_ASSERT_RETURN(ioque && key && buffer, PJ_EINVAL); - PJ_CHECK_STACK(); - - /* For consistency with other ioqueue implementation, we would reject - * if descriptor has already been submitted for reading before. - */ - PJ_ASSERT_RETURN(((key->op & PJ_IOQUEUE_OP_READ) == 0 && - (key->op & PJ_IOQUEUE_OP_RECV) == 0 && - (key->op & PJ_IOQUEUE_OP_RECV_FROM) == 0), - PJ_EBUSY); - - pj_lock_acquire(ioque->lock); - - key->op |= PJ_IOQUEUE_OP_READ; - key->rd_flags = 0; - key->rd_buf = buffer; - key->rd_buflen = buflen; - - pj_lock_release(ioque->lock); - return PJ_EPENDING; -} - - -/* - * pj_ioqueue_recv() - * - * Start asynchronous recv() from the socket. - */ -PJ_DEF(pj_status_t) pj_ioqueue_recv( pj_ioqueue_t *ioque, - pj_ioqueue_key_t *key, - void *buffer, - pj_size_t buflen, - unsigned flags ) -{ - PJ_ASSERT_RETURN(ioque && key && buffer, PJ_EINVAL); - PJ_CHECK_STACK(); - - /* For consistency with other ioqueue implementation, we would reject - * if descriptor has already been submitted for reading before. - */ - PJ_ASSERT_RETURN(((key->op & PJ_IOQUEUE_OP_READ) == 0 && - (key->op & PJ_IOQUEUE_OP_RECV) == 0 && - (key->op & PJ_IOQUEUE_OP_RECV_FROM) == 0), - PJ_EBUSY); - - pj_lock_acquire(ioque->lock); - - key->op |= PJ_IOQUEUE_OP_RECV; - key->rd_buf = buffer; - key->rd_buflen = buflen; - key->rd_flags = flags; - - pj_lock_release(ioque->lock); - return PJ_EPENDING; -} - -/* - * pj_ioqueue_recvfrom() - * - * Start asynchronous recvfrom() from the socket. - */ -PJ_DEF(pj_status_t) pj_ioqueue_recvfrom( pj_ioqueue_t *ioque, - pj_ioqueue_key_t *key, - void *buffer, - pj_size_t buflen, - unsigned flags, - pj_sockaddr_t *addr, - int *addrlen) -{ - PJ_ASSERT_RETURN(ioque && key && buffer, PJ_EINVAL); - PJ_CHECK_STACK(); - - /* For consistency with other ioqueue implementation, we would reject - * if descriptor has already been submitted for reading before. - */ - PJ_ASSERT_RETURN(((key->op & PJ_IOQUEUE_OP_READ) == 0 && - (key->op & PJ_IOQUEUE_OP_RECV) == 0 && - (key->op & PJ_IOQUEUE_OP_RECV_FROM) == 0), - PJ_EBUSY); - - pj_lock_acquire(ioque->lock); - - key->op |= PJ_IOQUEUE_OP_RECV_FROM; - key->rd_buf = buffer; - key->rd_buflen = buflen; - key->rd_flags = flags; - key->rmt_addr = addr; - key->rmt_addrlen = addrlen; - - pj_lock_release(ioque->lock); - return PJ_EPENDING; -} - -/* - * pj_ioqueue_write() - * - * Start asynchronous write() to the descriptor. - */ -PJ_DEF(pj_status_t) pj_ioqueue_write( pj_ioqueue_t *ioque, - pj_ioqueue_key_t *key, - const void *data, - pj_size_t datalen) -{ - pj_status_t rc; - pj_ssize_t sent; - - PJ_ASSERT_RETURN(ioque && key && data, PJ_EINVAL); - PJ_CHECK_STACK(); - - /* For consistency with other ioqueue implementation, we would reject - * if descriptor has already been submitted for writing before. - */ - PJ_ASSERT_RETURN(((key->op & PJ_IOQUEUE_OP_WRITE) == 0 && - (key->op & PJ_IOQUEUE_OP_SEND) == 0 && - (key->op & PJ_IOQUEUE_OP_SEND_TO) == 0), - PJ_EBUSY); - - sent = datalen; - /* sent would be -1 after pj_sock_send() if it returns error. */ - rc = pj_sock_send(key->fd, data, &sent, 0); - if (rc != PJ_SUCCESS && rc != PJ_STATUS_FROM_OS(OSERR_EWOULDBLOCK)) { - return rc; - } - - pj_lock_acquire(ioque->lock); - - key->op |= PJ_IOQUEUE_OP_WRITE; - key->wr_buf = NULL; - key->wr_buflen = datalen; - - pj_lock_release(ioque->lock); - - return PJ_EPENDING; -} - -/* - * pj_ioqueue_send() - * - * Start asynchronous send() to the descriptor. - */ -PJ_DEF(pj_status_t) pj_ioqueue_send( pj_ioqueue_t *ioque, - pj_ioqueue_key_t *key, - const void *data, - pj_size_t datalen, - unsigned flags) -{ - pj_status_t rc; - pj_ssize_t sent; - - PJ_ASSERT_RETURN(ioque && key && data, PJ_EINVAL); - PJ_CHECK_STACK(); - - /* For consistency with other ioqueue implementation, we would reject - * if descriptor has already been submitted for writing before. - */ - PJ_ASSERT_RETURN(((key->op & PJ_IOQUEUE_OP_WRITE) == 0 && - (key->op & PJ_IOQUEUE_OP_SEND) == 0 && - (key->op & PJ_IOQUEUE_OP_SEND_TO) == 0), - PJ_EBUSY); - - sent = datalen; - /* sent would be -1 after pj_sock_send() if it returns error. */ - rc = pj_sock_send(key->fd, data, &sent, flags); - if (rc != PJ_SUCCESS && rc != PJ_STATUS_FROM_OS(OSERR_EWOULDBLOCK)) { - return rc; - } - - pj_lock_acquire(ioque->lock); - - key->op |= PJ_IOQUEUE_OP_SEND; - key->wr_buf = NULL; - key->wr_buflen = datalen; - - pj_lock_release(ioque->lock); - - return PJ_EPENDING; -} - - -/* - * pj_ioqueue_sendto() - * - * Start asynchronous write() to the descriptor. - */ -PJ_DEF(pj_status_t) pj_ioqueue_sendto( pj_ioqueue_t *ioque, - pj_ioqueue_key_t *key, - const void *data, - pj_size_t datalen, - unsigned flags, - const pj_sockaddr_t *addr, - int addrlen) -{ - pj_status_t rc; - pj_ssize_t sent; - - PJ_ASSERT_RETURN(ioque && key && data, PJ_EINVAL); - PJ_CHECK_STACK(); - - /* For consistency with other ioqueue implementation, we would reject - * if descriptor has already been submitted for writing before. - */ - PJ_ASSERT_RETURN(((key->op & PJ_IOQUEUE_OP_WRITE) == 0 && - (key->op & PJ_IOQUEUE_OP_SEND) == 0 && - (key->op & PJ_IOQUEUE_OP_SEND_TO) == 0), - PJ_EBUSY); - - sent = datalen; - /* sent would be -1 after pj_sock_sendto() if it returns error. */ - rc = pj_sock_sendto(key->fd, data, &sent, flags, addr, addrlen); - if (rc != PJ_SUCCESS && rc != PJ_STATUS_FROM_OS(OSERR_EWOULDBLOCK)) { - return rc; - } - - pj_lock_acquire(ioque->lock); - - key->op |= PJ_IOQUEUE_OP_SEND_TO; - key->wr_buf = NULL; - key->wr_buflen = datalen; - - pj_lock_release(ioque->lock); - return PJ_EPENDING; -} - -#if PJ_HAS_TCP -/* - * Initiate overlapped accept() operation. - */ -PJ_DEF(int) pj_ioqueue_accept( pj_ioqueue_t *ioqueue, - pj_ioqueue_key_t *key, - pj_sock_t *new_sock, - pj_sockaddr_t *local, - pj_sockaddr_t *remote, - int *addrlen) -{ - /* check parameters. All must be specified! */ - pj_assert(ioqueue && key && new_sock); - - /* Server socket must have no other operation! */ - pj_assert(key->op == 0); - - pj_lock_acquire(ioqueue->lock); - - key->op = PJ_IOQUEUE_OP_ACCEPT; - key->accept_fd = new_sock; - key->rmt_addr = remote; - key->rmt_addrlen = addrlen; - key->local_addr = local; - key->local_addrlen = addrlen; /* use same addr. as rmt_addrlen */ - pj_lock_release(ioqueue->lock); - return PJ_EPENDING; -} - -/* - * Initiate overlapped connect() operation (well, it's non-blocking actually, - * since there's no overlapped version of connect()). - */ -PJ_DEF(pj_status_t) pj_ioqueue_connect( pj_ioqueue_t *ioqueue, - pj_ioqueue_key_t *key, - const pj_sockaddr_t *addr, - int addrlen ) -{ - pj_status_t rc; - - /* check parameters. All must be specified! */ - PJ_ASSERT_RETURN(ioqueue && key && addr && addrlen, PJ_EINVAL); - /* Connecting socket must have no other operation! */ - PJ_ASSERT_RETURN(key->op == 0, PJ_EBUSY); - - rc = pj_sock_connect(key->fd, addr, addrlen); - if (rc == PJ_SUCCESS) { - /* Connected! */ - return PJ_SUCCESS; - } else { - if (rc == PJ_STATUS_FROM_OS(OSERR_EINPROGRESS) || - rc == PJ_STATUS_FROM_OS(OSERR_EWOULDBLOCK)) - { - /* Pending! */ - pj_lock_acquire(ioqueue->lock); - key->op = PJ_IOQUEUE_OP_CONNECT; - pj_lock_release(ioqueue->lock); - return PJ_EPENDING; - } else { - /* Error! */ - return rc; - } + /* Now process the events. */ + for (i=0; i<processed; ++i) { + switch (queue[i].event_type) { + case READABLE_EVENT: + ioqueue_dispatch_read_event(ioqueue, queue[i].key); + break; + case WRITEABLE_EVENT: + ioqueue_dispatch_write_event(ioqueue, queue[i].key); + break; + case EXCEPTION_EVENT: + ioqueue_dispatch_exception_event(ioqueue, queue[i].key); + break; + case NO_EVENT: + pj_assert(!"Invalid event!"); + break; + } } + + return processed; } -#endif /* PJ_HAS_TCP */ |