diff options
Diffstat (limited to 'pjlib/include/pj/ioqueue.h')
-rw-r--r-- | pjlib/include/pj/ioqueue.h | 420 |
1 files changed, 258 insertions, 162 deletions
diff --git a/pjlib/include/pj/ioqueue.h b/pjlib/include/pj/ioqueue.h index 9d26b082..6a7b827e 100644 --- a/pjlib/include/pj/ioqueue.h +++ b/pjlib/include/pj/ioqueue.h @@ -1,5 +1,4 @@ /* $Id$ - * */ #ifndef __PJ_IOQUEUE_H__ @@ -48,17 +47,54 @@ PJ_BEGIN_DECL * @ingroup PJ_IO * @{ * - * This file provides abstraction for various event dispatching mechanisms. - * The interfaces for event dispatching vary alot, even in a single - * operating system. The abstraction here hopefully is suitable for most of - * the event dispatching available. - * - * Currently, the I/O Queue supports: - * - select(), as the common denominator, but the least efficient. - * - I/O Completion ports in Windows NT/2000/XP, which is the most efficient - * way to dispatch events in Windows NT based OSes, and most importantly, - * it doesn't have the limit on how many handles to monitor. And it works - * with files (not only sockets) as well. + * I/O Queue provides API for performing asynchronous I/O operations. It
+ * conforms to proactor pattern, which allows application to submit an
+ * asynchronous operation and to be notified later when the operation has
+ * completed. + *
+ * The framework works natively in platforms where asynchronous operation API
+ * exists, such as in Windows NT with IoCompletionPort/IOCP. In other
+ * platforms, the I/O queue abstracts the operating system's event poll API
+ * to provide semantics similar to IoCompletionPort with minimal penalties
+ * (i.e. per ioqueue and per handle mutex protection).
+ *
+ * The I/O queue provides more than just unified abstraction. It also:
+ * - makes sure that the operation uses the most effective way to utilize
+ * the underlying mechanism, to provide the maximum theoritical
+ * throughput possible on a given platform.
+ * - choose the most efficient mechanism for event polling on a given
+ * platform.
+ * + * Currently, the I/O Queue is implemented using: + * - <tt><b>select()</b></tt>, as the common denominator, but the least
+ * efficient. Also the number of descriptor is limited to
+ * \c PJ_IOQUEUE_MAX_HANDLES (which by default is 64).
+ * - <tt><b>/dev/epoll</b></tt> on Linux (user mode and kernel mode),
+ * a much faster replacement for select() on Linux (and more importantly
+ * doesn't have limitation on number of descriptors). + * - <b>I/O Completion ports</b> on Windows NT/2000/XP, which is the most
+ * efficient way to dispatch events in Windows NT based OSes, and most
+ * importantly, it doesn't have the limit on how many handles to monitor.
+ * And it works with files (not only sockets) as well. + *
+ *
+ * \section pj_ioqueue_concurrency_sec Concurrency Rules
+ *
+ * The items below describe rules that must be obeyed when using the I/O
+ * queue, with regard to concurrency:
+ * - in general, the I/O queue is thread safe (assuming the lock strategy
+ * is not changed to disable mutex protection). All operations, except
+ * unregistration which is described below, can be safely invoked
+ * simultaneously by multiple threads.
+ * - however, <b>care must be taken when unregistering a key</b> from the
+ * ioqueue. Application must take care that when one thread is issuing
+ * an unregistration, other thread is not simultaneously invoking an
+ * operation <b>to the same key</b>.
+ *\n
+ * This happens because the ioqueue functions are working with a pointer
+ * to the key, and there is a possible race condition where the pointer
+ * has been rendered invalid by other threads before the ioqueue has a
+ * chance to acquire mutex on it.
* * \section pj_ioqeuue_examples_sec Examples * @@ -68,53 +104,90 @@ PJ_BEGIN_DECL * - \ref page_pjlib_ioqueue_udp_test * - \ref page_pjlib_ioqueue_perf_test */ +
+
+
+/**
+ * This structure describes operation specific key to be submitted to
+ * I/O Queue when performing the asynchronous operation. This key will
+ * be returned to the application when completion callback is called.
+ *
+ * Application normally wants to attach it's specific data in the
+ * \c user_data field so that it can keep track of which operation has
+ * completed when the callback is called. Alternatively, application can
+ * also extend this struct to include its data, because the pointer that
+ * is returned in the completion callback will be exactly the same as
+ * the pointer supplied when the asynchronous function is called.
+ */
+typedef struct pj_ioqueue_op_key_t
+{
+ void *internal__[32]; /**< Internal I/O Queue data. */
+ void *user_data; /**< Application data. */
+} pj_ioqueue_op_key_t;
- /** - * This structure describes the callbacks to be called when I/O operation - * completes. - */ +/** + * This structure describes the callbacks to be called when I/O operation + * completes. + */ typedef struct pj_ioqueue_callback { /** - * This callback is called when #pj_ioqueue_read or #pj_ioqueue_recvfrom + * This callback is called when #pj_ioqueue_recv or #pj_ioqueue_recvfrom * completes. * - * @param key The key. - * @param bytes_read The size of data that has just been read. + * @param key The key.
+ * @param op_key Operation key. + * @param bytes_read >= 0 to indicate the amount of data read,
+ * otherwise negative value containing the error
+ * code. To obtain the pj_status_t error code, use
+ * (pj_status_t code = -bytes_read). */ - void (*on_read_complete)(pj_ioqueue_key_t *key, pj_ssize_t bytes_read); + void (*on_read_complete)(pj_ioqueue_key_t *key,
+ pj_ioqueue_op_key_t *op_key,
+ pj_ssize_t bytes_read); /** * This callback is called when #pj_ioqueue_write or #pj_ioqueue_sendto * completes. * - * @param key The key. - * @param bytes_read The size of data that has just been read. + * @param key The key.
+ * @param op_key Operation key. + * @param bytes_sent >= 0 to indicate the amount of data written,
+ * otherwise negative value containing the error
+ * code. To obtain the pj_status_t error code, use
+ * (pj_status_t code = -bytes_sent).
*/ - void (*on_write_complete)(pj_ioqueue_key_t *key, pj_ssize_t bytes_sent); + void (*on_write_complete)(pj_ioqueue_key_t *key,
+ pj_ioqueue_op_key_t *op_key,
+ pj_ssize_t bytes_sent); /** * This callback is called when #pj_ioqueue_accept completes. * - * @param key The key. + * @param key The key.
+ * @param op_key Operation key. * @param sock Newly connected socket. * @param status Zero if the operation completes successfully. */ - void (*on_accept_complete)(pj_ioqueue_key_t *key, pj_sock_t sock, - int status); + void (*on_accept_complete)(pj_ioqueue_key_t *key,
+ pj_ioqueue_op_key_t *op_key,
+ pj_sock_t sock, + pj_status_t status); /** * This callback is called when #pj_ioqueue_connect completes. * - * @param key The key. - * @param status Zero if the operation completes successfully. + * @param key The key.
+ * @param status PJ_SUCCESS if the operation completes successfully. */ - void (*on_connect_complete)(pj_ioqueue_key_t *key, int status); + void (*on_connect_complete)(pj_ioqueue_key_t *key,
+ pj_status_t status); } pj_ioqueue_callback; /** - * Types of I/O Queue operation. + * Types of pending I/O Queue operation. This enumeration is only used
+ * internally within the ioqueue. */ typedef enum pj_ioqueue_operation_e { @@ -137,25 +210,19 @@ typedef enum pj_ioqueue_operation_e * number of threads. */ #define PJ_IOQUEUE_DEFAULT_THREADS 0 - - +
/** * Create a new I/O Queue framework. * * @param pool The pool to allocate the I/O queue structure. * @param max_fd The maximum number of handles to be supported, which * should not exceed PJ_IOQUEUE_MAX_HANDLES. - * @param max_threads The maximum number of threads that are allowed to - * operate on a single descriptor simultaneously. If - * the value is zero, the framework will set it - * to a reasonable value. * @param ioqueue Pointer to hold the newly created I/O Queue. * * @return PJ_SUCCESS on success. */ PJ_DECL(pj_status_t) pj_ioqueue_create( pj_pool_t *pool, pj_size_t max_fd, - int max_threads, pj_ioqueue_t **ioqueue); /** @@ -199,8 +266,10 @@ PJ_DECL(pj_status_t) pj_ioqueue_set_lock( pj_ioqueue_t *ioque, * @param sock The socket. * @param user_data User data to be associated with the key, which can be * retrieved later. - * @param cb Callback to be called when I/O operation completes. - * @param key Pointer to receive the returned key. + * @param cb Callback to be called when I/O operation completes.
+ * @param key Pointer to receive the key to be associated with this
+ * socket. Subsequent I/O queue operation will need this
+ * key. * * @return PJ_SUCCESS on success, or the error code. */ @@ -208,49 +277,66 @@ PJ_DECL(pj_status_t) pj_ioqueue_register_sock( pj_pool_t *pool, pj_ioqueue_t *ioque, pj_sock_t sock, void *user_data, - const pj_ioqueue_callback *cb, - pj_ioqueue_key_t **key); + const pj_ioqueue_callback *cb,
+ pj_ioqueue_key_t **key ); /** - * Unregister a handle from the I/O Queue framework. + * Unregister from the I/O Queue framework. Caller must make sure that
+ * the key doesn't have any pending operation before calling this function,
+ * or otherwise the behaviour is undefined (either callback will be called
+ * later when the data is sent/received, or the callback will not be called,
+ * or even something else). * - * @param ioque The I/O Queue. - * @param key The key that uniquely identifies the handle, which is - * returned from the function #pj_ioqueue_register_sock() - * or other registration functions. + * @param key The key that was previously obtained from registration.
* * @return PJ_SUCCESS on success or the error code. */ -PJ_DECL(pj_status_t) pj_ioqueue_unregister( pj_ioqueue_t *ioque, - pj_ioqueue_key_t *key ); +PJ_DECL(pj_status_t) pj_ioqueue_unregister( pj_ioqueue_key_t *key ); /** - * Get user data associated with the I/O Queue key. + * Get user data associated with an ioqueue key. + *
+ * @param key The key that was previously obtained from registration.
* - * @param key The key previously associated with the socket/handle with - * #pj_ioqueue_register_sock() (or other registration - * functions). - * - * @return The user data associated with the key, or NULL on error - * of if no data is associated with the key during + * @return The user data associated with the descriptor, or NULL
+ * on error or if no data is associated with the key during * registration. */ PJ_DECL(void*) pj_ioqueue_get_user_data( pj_ioqueue_key_t *key ); +
+/**
+ * Set or change the user data to be associated with the file descriptor or
+ * handle or socket descriptor.
+ *
+ * @param key The key that was previously obtained from registration.
+ * @param user_data User data to be associated with the descriptor.
+ * @param old_data Optional parameter to retrieve the old user data.
+ *
+ * @return PJ_SUCCESS on success or the error code.
+ */
+PJ_DECL(pj_status_t) pj_ioqueue_set_user_data( pj_ioqueue_key_t *key,
+ void *user_data,
+ void **old_data);
#if defined(PJ_HAS_TCP) && PJ_HAS_TCP != 0 /** - * Instruct I/O Queue to wait for incoming connections on the specified - * listening socket. This function will return - * immediately (i.e. non-blocking) regardless whether some data has been - * transfered. If the function can't complete immediately, and the caller will - * be notified about the completion when it calls pj_ioqueue_poll(). - * - * @param ioqueue The I/O Queue - * @param key The key which registered to the server socket. - * @param sock Argument which contain pointer to receive - * the socket for the incoming connection. + * Instruct I/O Queue to accept incoming connection on the specified + * listening socket. This function will return immediately (i.e. non-blocking)
+ * regardless whether a connection is immediately available. If the function
+ * can't complete immediately, the caller will be notified about the incoming
+ * connection when it calls pj_ioqueue_poll(). If a new connection is
+ * immediately available, the function returns PJ_SUCCESS with the new
+ * connection; in this case, the callback WILL NOT be called. + * + * @param key The key which registered to the server socket.
+ * @param op_key An operation specific key to be associated with the
+ * pending operation, so that application can keep track of
+ * which operation has been completed when the callback is
+ * called. + * @param new_sock Argument which contain pointer to receive the new socket + * for the incoming connection. * @param local Optional argument which contain pointer to variable to * receive local address. * @param remote Optional argument which contain pointer to variable to @@ -259,14 +345,16 @@ PJ_DECL(void*) pj_ioqueue_get_user_data( pj_ioqueue_key_t *key ); * address, and on output, contains the actual length of the * address. This argument is optional. * @return - * - PJ_SUCCESS If there's a connection available immediately, which - * in this case the callback should have been called before - * the function returns. - * - PJ_EPENDING If accept is queued, or + * - PJ_SUCCESS When connection is available immediately, and the
+ * parameters will be updated to contain information about
+ * the new connection. In this case, a completion callback
+ * WILL NOT be called. + * - PJ_EPENDING If no connection is available immediately. When a new
+ * connection arrives, the callback will be called. * - non-zero which indicates the appropriate error code. */ -PJ_DECL(pj_status_t) pj_ioqueue_accept( pj_ioqueue_t *ioqueue, - pj_ioqueue_key_t *key, +PJ_DECL(pj_status_t) pj_ioqueue_accept( pj_ioqueue_key_t *key,
+ pj_ioqueue_op_key_t *op_key, pj_sock_t *sock, pj_sockaddr_t *local, pj_sockaddr_t *remote, @@ -274,21 +362,22 @@ PJ_DECL(pj_status_t) pj_ioqueue_accept( pj_ioqueue_t *ioqueue, /** * Initiate non-blocking socket connect. If the socket can NOT be connected - * immediately, the result will be reported during poll. + * immediately, asynchronous connect() will be scheduled and caller will be
+ * notified via completion callback when it calls pj_ioqueue_poll(). If
+ * socket is connected immediately, the function returns PJ_SUCCESS and
+ * completion callback WILL NOT be called. * - * @param ioqueue The ioqueue * @param key The key associated with TCP socket * @param addr The remote address. * @param addrlen The remote address length. * * @return - * - PJ_SUCCESS If socket is connected immediately, which in this case - * the callback should have been called. + * - PJ_SUCCESS If socket is connected immediately. In this case, the
+ * completion callback WILL NOT be called. * - PJ_EPENDING If operation is queued, or * - non-zero Indicates the error code. */ -PJ_DECL(pj_status_t) pj_ioqueue_connect( pj_ioqueue_t *ioqueue, - pj_ioqueue_key_t *key, +PJ_DECL(pj_status_t) pj_ioqueue_connect( pj_ioqueue_key_t *key, const pj_sockaddr_t *addr, int addrlen ); @@ -309,74 +398,75 @@ PJ_DECL(pj_status_t) pj_ioqueue_connect( pj_ioqueue_t *ioqueue, PJ_DECL(int) pj_ioqueue_poll( pj_ioqueue_t *ioque, const pj_time_val *timeout); -/** - * Instruct the I/O Queue to read from the specified handle. This function - * returns immediately (i.e. non-blocking) regardless whether some data has - * been transfered. If the operation can't complete immediately, caller will - * be notified about the completion when it calls pj_ioqueue_poll(). - * - * @param ioque The I/O Queue. - * @param key The key that uniquely identifies the handle. - * @param buffer The buffer to hold the read data. The caller MUST make sure - * that this buffer remain valid until the framework completes - * reading the handle. - * @param buflen The maximum size to be read. - * - * @return - * - PJ_SUCCESS If immediate data has been received. In this case, the - * callback must have been called before this function - * returns, and no pending operation is scheduled. - * - PJ_EPENDING If the operation has been queued. - * - non-zero The return value indicates the error code. - */ -PJ_DECL(pj_status_t) pj_ioqueue_read( pj_ioqueue_t *ioque, - pj_ioqueue_key_t *key, - void *buffer, - pj_size_t buflen); - /** - * This function behaves similarly as #pj_ioqueue_read(), except that it is - * normally called for socket. + * Instruct the I/O Queue to read from the specified handle. This function
+ * returns immediately (i.e. non-blocking) regardless whether some data has
+ * been transfered. If the operation can't complete immediately, caller will
+ * be notified about the completion when it calls pj_ioqueue_poll(). If data
+ * is immediately available, the function will return PJ_SUCCESS and the
+ * callback WILL NOT be called.
* - * @param ioque The I/O Queue. * @param key The key that uniquely identifies the handle. + * @param op_key An operation specific key to be associated with the
+ * pending operation, so that application can keep track of
+ * which operation has been completed when the callback is
+ * called. Caller must make sure that this key remains
+ * valid until the function completes.
* @param buffer The buffer to hold the read data. The caller MUST make sure * that this buffer remain valid until the framework completes * reading the handle. - * @param buflen The maximum size to be read. + * @param length On input, it specifies the size of the buffer. If data is
+ * available to be read immediately, the function returns
+ * PJ_SUCCESS and this argument will be filled with the
+ * amount of data read. If the function is pending, caller
+ * will be notified about the amount of data read in the
+ * callback. This parameter can point to local variable in
+ * caller's stack and doesn't have to remain valid for the
+ * duration of pending operation. * @param flags Recv flag. * * @return - * - PJ_SUCCESS If immediate data has been received. In this case, the - * callback must have been called before this function - * returns, and no pending operation is scheduled. - * - PJ_EPENDING If the operation has been queued. + * - PJ_SUCCESS If immediate data has been received in the buffer. In this
+ * case, the callback WILL NOT be called. + * - PJ_EPENDING If the operation has been queued, and the callback will be
+ * called when data has been received. * - non-zero The return value indicates the error code. */ -PJ_DECL(pj_status_t) pj_ioqueue_recv( pj_ioqueue_t *ioque, - pj_ioqueue_key_t *key, +PJ_DECL(pj_status_t) pj_ioqueue_recv( pj_ioqueue_key_t *key,
+ pj_ioqueue_op_key_t *op_key, void *buffer, - pj_size_t buflen, + pj_ssize_t *length, unsigned flags ); /** - * This function behaves similarly as #pj_ioqueue_read(), except that it is + * This function behaves similarly as #pj_ioqueue_recv(), except that it is * normally called for socket, and the remote address will also be returned * along with the data. Caller MUST make sure that both buffer and addr * remain valid until the framework completes reading the data. * - * @param ioque The I/O Queue. * @param key The key that uniquely identifies the handle. + * @param op_key An operation specific key to be associated with the
+ * pending operation, so that application can keep track of
+ * which operation has been completed when the callback is
+ * called.
* @param buffer The buffer to hold the read data. The caller MUST make sure * that this buffer remain valid until the framework completes * reading the handle. - * @param buflen The maximum size to be read. + * @param length On input, it specifies the size of the buffer. If data is
+ * available to be read immediately, the function returns
+ * PJ_SUCCESS and this argument will be filled with the
+ * amount of data read. If the function is pending, caller
+ * will be notified about the amount of data read in the
+ * callback. This parameter can point to local variable in
+ * caller's stack and doesn't have to remain valid for the
+ * duration of pending operation.
* @param flags Recv flag. - * @param addr Pointer to buffer to receive the address, or NULL. + * @param addr Optional Pointer to buffer to receive the address. * @param addrlen On input, specifies the length of the address buffer. * On output, it will be filled with the actual length of - * the address. + * the address. This argument can be NULL if \c addr is not
+ * specified. * * @return * - PJ_SUCCESS If immediate data has been received. In this case, the @@ -385,56 +475,52 @@ PJ_DECL(pj_status_t) pj_ioqueue_recv( pj_ioqueue_t *ioque, * - PJ_EPENDING If the operation has been queued. * - non-zero The return value indicates the error code. */ -PJ_DECL(pj_status_t) pj_ioqueue_recvfrom( pj_ioqueue_t *ioque, - pj_ioqueue_key_t *key, +PJ_DECL(pj_status_t) pj_ioqueue_recvfrom( pj_ioqueue_key_t *key,
+ pj_ioqueue_op_key_t *op_key, void *buffer, - pj_size_t buflen, + pj_ssize_t *length, unsigned flags, pj_sockaddr_t *addr, int *addrlen); -/** - * Instruct the I/O Queue to write to the handle. This function will return - * immediately (i.e. non-blocking) regardless whether some data has been - * transfered. If the function can't complete immediately, and the caller will - * be notified about the completion when it calls pj_ioqueue_poll(). - * - * @param ioque the I/O Queue. - * @param key the key that identifies the handle. - * @param data the data to send. Caller MUST make sure that this buffer - * remains valid until the write operation completes. - * @param datalen the length of the data. - * - * @return - * - PJ_SUCCESS If data was immediately written. - * - PJ_EPENDING If the operation has been queued. - * - non-zero The return value indicates the error code. - */ -PJ_DECL(pj_status_t) pj_ioqueue_write( pj_ioqueue_t *ioque, - pj_ioqueue_key_t *key, - const void *data, - pj_size_t datalen); /** - * This function behaves similarly as #pj_ioqueue_write(), except that - * pj_sock_send() (or equivalent) will be called to send the data. - * - * @param ioque the I/O Queue. - * @param key the key that identifies the handle. - * @param data the data to send. Caller MUST make sure that this buffer + * Instruct the I/O Queue to write to the handle. This function will return
+ * immediately (i.e. non-blocking) regardless whether some data has been
+ * transfered. If the function can't complete immediately, the caller will
+ * be notified about the completion when it calls pj_ioqueue_poll(). If
+ * operation completes immediately and data has been transfered, the function
+ * returns PJ_SUCCESS and the callback will NOT be called.
+ *
+ * @param key The key that identifies the handle. + * @param op_key An operation specific key to be associated with the
+ * pending operation, so that application can keep track of
+ * which operation has been completed when the callback is
+ * called.
+ * @param data The data to send. Caller MUST make sure that this buffer * remains valid until the write operation completes. - * @param datalen the length of the data. - * @param flags send flags. + * @param length On input, it specifies the length of data to send. When
+ * data was sent immediately, this function returns PJ_SUCCESS
+ * and this parameter contains the length of data sent. If
+ * data can not be sent immediately, an asynchronous operation
+ * is scheduled and caller will be notified via callback the
+ * number of bytes sent. This parameter can point to local
+ * variable on caller's stack and doesn't have to remain
+ * valid until the operation has completed. + * @param flags Send flags. * * @return - * - PJ_SUCCESS If data was immediately written. - * - PJ_EPENDING If the operation has been queued. + * - PJ_SUCCESS If data was immediately transfered. In this case, no
+ * pending operation has been scheduled and the callback
+ * WILL NOT be called. + * - PJ_EPENDING If the operation has been queued. Once data base been
+ * transfered, the callback will be called. * - non-zero The return value indicates the error code. */ -PJ_DECL(pj_status_t) pj_ioqueue_send( pj_ioqueue_t *ioque, - pj_ioqueue_key_t *key, +PJ_DECL(pj_status_t) pj_ioqueue_send( pj_ioqueue_key_t *key,
+ pj_ioqueue_op_key_t *op_key, const void *data, - pj_size_t datalen, + pj_ssize_t *length, unsigned flags ); @@ -442,24 +528,34 @@ PJ_DECL(pj_status_t) pj_ioqueue_send( pj_ioqueue_t *ioque, * This function behaves similarly as #pj_ioqueue_write(), except that * pj_sock_sendto() (or equivalent) will be called to send the data. * - * @param ioque the I/O Queue. * @param key the key that identifies the handle. + * @param op_key An operation specific key to be associated with the
+ * pending operation, so that application can keep track of
+ * which operation has been completed when the callback is
+ * called.
* @param data the data to send. Caller MUST make sure that this buffer * remains valid until the write operation completes. - * @param datalen the length of the data. + * @param length On input, it specifies the length of data to send. When
+ * data was sent immediately, this function returns PJ_SUCCESS
+ * and this parameter contains the length of data sent. If
+ * data can not be sent immediately, an asynchronous operation
+ * is scheduled and caller will be notified via callback the
+ * number of bytes sent. This parameter can point to local
+ * variable on caller's stack and doesn't have to remain
+ * valid until the operation has completed.
* @param flags send flags. - * @param addr remote address. - * @param addrlen remote address length. + * @param addr Optional remote address. + * @param addrlen Remote address length, \c addr is specified. * * @return * - PJ_SUCCESS If data was immediately written. * - PJ_EPENDING If the operation has been queued. * - non-zero The return value indicates the error code. */ -PJ_DECL(pj_status_t) pj_ioqueue_sendto( pj_ioqueue_t *ioque, - pj_ioqueue_key_t *key, +PJ_DECL(pj_status_t) pj_ioqueue_sendto( pj_ioqueue_key_t *key,
+ pj_ioqueue_op_key_t *op_key, const void *data, - pj_size_t datalen, + pj_ssize_t *length, unsigned flags, const pj_sockaddr_t *addr, int addrlen); |