diff options
author | Benny Prijono <bennylp@teluu.com> | 2005-11-06 16:50:38 +0000 |
---|---|---|
committer | Benny Prijono <bennylp@teluu.com> | 2005-11-06 16:50:38 +0000 |
commit | 33a8c1cb59304d92d517e3ba511bf233c729597f (patch) | |
tree | e6cb65930121480465db749bf5916fa2708ca633 /pjlib | |
parent | 6d5fbe07f3dc84c10ea75c5584fe8b5513278d08 (diff) |
Tested new ioqueue framework on Linux with select and epoll
git-svn-id: http://svn.pjsip.org/repos/pjproject/main@14 74dad513-b988-da41-8d7b-12977e46ad98
Diffstat (limited to 'pjlib')
-rw-r--r-- | pjlib/build/Makefile | 8 | ||||
-rw-r--r-- | pjlib/build/os-linux.mak | 4 | ||||
-rw-r--r-- | pjlib/include/pj/compat/os_linux.h | 30 | ||||
-rw-r--r-- | pjlib/include/pj/compat/os_linux_kernel.h | 30 | ||||
-rw-r--r-- | pjlib/include/pj/compat/os_sunos.h | 30 | ||||
-rw-r--r-- | pjlib/include/pj/compat/os_win32.h | 30 | ||||
-rw-r--r-- | pjlib/include/pj/ioqueue.h | 410 | ||||
-rw-r--r-- | pjlib/src/pj/ioqueue_common_abs.c | 1626 | ||||
-rw-r--r-- | pjlib/src/pj/ioqueue_common_abs.h | 215 | ||||
-rw-r--r-- | pjlib/src/pj/ioqueue_epoll.c | 661 | ||||
-rw-r--r-- | pjlib/src/pj/ioqueue_select.c | 394 | ||||
-rw-r--r-- | pjlib/src/pj/os_core_unix.c | 2 | ||||
-rw-r--r-- | pjlib/src/pj/sock_bsd.c | 4 | ||||
-rw-r--r-- | pjlib/src/pjlib-test/ioq_perf.c | 76 |
14 files changed, 1569 insertions, 1951 deletions
diff --git a/pjlib/build/Makefile b/pjlib/build/Makefile index 21b2d387..e6a1bd9f 100644 --- a/pjlib/build/Makefile +++ b/pjlib/build/Makefile @@ -60,7 +60,7 @@ export PJLIB_LIB := ../lib/libpj-$(MACHINE_NAME)-$(OS_NAME)-$(CC_NAME)$(LIBEXT) ############################################################################### # Gather all flags. # -export _CFLAGS := -O2 $(CC_CFLAGS) $(OS_CFLAGS) $(HOST_CFLAGS) $(M_CFLAGS) \ +export _CFLAGS := -O2 -g $(CC_CFLAGS) $(OS_CFLAGS) $(HOST_CFLAGS) $(M_CFLAGS) \ $(CFLAGS) $(CC_INC)../include export _CXXFLAGS:= $(_CFLAGS) $(CC_CXXFLAGS) $(OS_CXXFLAGS) $(M_CXXFLAGS) \ $(HOST_CXXFLAGS) $(CXXFLAGS) @@ -98,7 +98,6 @@ export TEST_EXE := ../bin/pjlib-test-$(MACHINE_NAME)-$(OS_NAME)-$(CC_NAME)$(HOST export CC_OUT CC AR RANLIB HOST_MV HOST_RM HOST_RMDIR HOST_MKDIR OBJEXT LD LDOUT - ############################################################################### # Main entry # @@ -124,9 +123,12 @@ depend: dep: depend -pjlib: +pjlib: ../include/pj/config_site.h $(MAKE) -f $(RULES_MAK) APP=PJLIB app=pjlib $(PJLIB_LIB) +../include/pj/config_site.h: + touch ../include/pj/config_site.h + pjlib-test: $(MAKE) -f $(RULES_MAK) APP=TEST app=pjlib-test $(TEST_EXE) diff --git a/pjlib/build/os-linux.mak b/pjlib/build/os-linux.mak index fd9fc307..ab220411 100644 --- a/pjlib/build/os-linux.mak +++ b/pjlib/build/os-linux.mak @@ -14,8 +14,8 @@ export PJLIB_OBJS += addr_resolv_sock.o guid_simple.o \ os_time_ansi.o \ pool_policy_malloc.o sock_bsd.o sock_select.o -export PJLIB_OBJS += ioqueue_select.o -#export PJLIB_OBJS += ioqueue_epoll.o +#export PJLIB_OBJS += ioqueue_select.o +export PJLIB_OBJS += ioqueue_epoll.o # # TEST_OBJS are operating system specific object files to be included in diff --git a/pjlib/include/pj/compat/os_linux.h b/pjlib/include/pj/compat/os_linux.h index a2f97f65..efb661a4 100644 --- a/pjlib/include/pj/compat/os_linux.h +++ b/pjlib/include/pj/compat/os_linux.h @@ -54,22 +54,22 @@ #define PJ_HAS_WINSOCK_H 0 #define PJ_HAS_WINSOCK2_H 0 -#define PJ_SOCK_HAS_INET_ATON 1
-
-/* When this macro is set, getsockopt(SOL_SOCKET, SO_ERROR) will return
- * the status of non-blocking connect() operation.
+#define PJ_SOCK_HAS_INET_ATON 1 + +/* When this macro is set, getsockopt(SOL_SOCKET, SO_ERROR) will return + * the status of non-blocking connect() operation. + */ +#define PJ_HAS_SO_ERROR 1 + +/* This value specifies the value set in errno by the OS when a non-blocking + * socket recv() can not return immediate daata. + */ +#define PJ_BLOCKING_ERROR_VAL EAGAIN + +/* This value specifies the value set in errno by the OS when a non-blocking + * socket connect() can not get connected immediately. */ -#define PJ_HAS_SO_ERROR 1
-
-/* This value specifies the value set in errno by the OS when a non-blocking
- * socket recv() can not return immediate daata.
- */
-#define PJ_BLOCKING_ERROR_VAL EAGAIN
-
-/* This value specifies the value set in errno by the OS when a non-blocking
- * socket connect() can not get connected immediately.
- */
-#define PJ_BLOCKING_CONNECT_ERROR_VAL EINPROGRESS
+#define PJ_BLOCKING_CONNECT_ERROR_VAL EINPROGRESS /* Default threading is enabled, unless it's overridden. */ #ifndef PJ_HAS_THREADS diff --git a/pjlib/include/pj/compat/os_linux_kernel.h b/pjlib/include/pj/compat/os_linux_kernel.h index 0d44ef0e..ccae3418 100644 --- a/pjlib/include/pj/compat/os_linux_kernel.h +++ b/pjlib/include/pj/compat/os_linux_kernel.h @@ -52,21 +52,21 @@ #define PJ_HAS_WINSOCK2_H 0 #define PJ_SOCK_HAS_INET_ATON 0 -
-/* When this macro is set, getsockopt(SOL_SOCKET, SO_ERROR) will return
- * the status of non-blocking connect() operation.
- */
-#define PJ_HAS_SO_ERROR 1
-
-/* This value specifies the value set in errno by the OS when a non-blocking
- * socket recv() can not return immediate daata.
- */
-#define PJ_BLOCKING_ERROR_VAL EAGAIN
-
-/* This value specifies the value set in errno by the OS when a non-blocking
- * socket connect() can not get connected immediately.
- */
-#define PJ_BLOCKING_CONNECT_ERROR_VAL EINPROGRESS
+ +/* When this macro is set, getsockopt(SOL_SOCKET, SO_ERROR) will return + * the status of non-blocking connect() operation. + */ +#define PJ_HAS_SO_ERROR 1 + +/* This value specifies the value set in errno by the OS when a non-blocking + * socket recv() can not return immediate daata. + */ +#define PJ_BLOCKING_ERROR_VAL EAGAIN + +/* This value specifies the value set in errno by the OS when a non-blocking + * socket connect() can not get connected immediately. + */ +#define PJ_BLOCKING_CONNECT_ERROR_VAL EINPROGRESS #ifndef PJ_HAS_THREADS # define PJ_HAS_THREADS (1) diff --git a/pjlib/include/pj/compat/os_sunos.h b/pjlib/include/pj/compat/os_sunos.h index 990fb57d..87c408ab 100644 --- a/pjlib/include/pj/compat/os_sunos.h +++ b/pjlib/include/pj/compat/os_sunos.h @@ -39,21 +39,21 @@ #define PJ_HAS_WINSOCK2_H 0 #define PJ_SOCK_HAS_INET_ATON 0 -
-/* When this macro is set, getsockopt(SOL_SOCKET, SO_ERROR) will return
- * the status of non-blocking connect() operation.
- */
-#define PJ_HAS_SO_ERROR 0
-
-/* This value specifies the value set in errno by the OS when a non-blocking
- * socket recv() can not return immediate daata.
- */
-#define PJ_BLOCKING_ERROR_VAL EWOULDBLOCK
-
-/* This value specifies the value set in errno by the OS when a non-blocking
- * socket connect() can not get connected immediately.
- */
-#define PJ_BLOCKING_CONNECT_ERROR_VAL EINPROGRESS
+ +/* When this macro is set, getsockopt(SOL_SOCKET, SO_ERROR) will return + * the status of non-blocking connect() operation. + */ +#define PJ_HAS_SO_ERROR 0 + +/* This value specifies the value set in errno by the OS when a non-blocking + * socket recv() can not return immediate daata. + */ +#define PJ_BLOCKING_ERROR_VAL EWOULDBLOCK + +/* This value specifies the value set in errno by the OS when a non-blocking + * socket connect() can not get connected immediately. + */ +#define PJ_BLOCKING_CONNECT_ERROR_VAL EINPROGRESS /* Default threading is enabled, unless it's overridden. */ #ifndef PJ_HAS_THREADS diff --git a/pjlib/include/pj/compat/os_win32.h b/pjlib/include/pj/compat/os_win32.h index 87ff7520..e8391b94 100644 --- a/pjlib/include/pj/compat/os_win32.h +++ b/pjlib/include/pj/compat/os_win32.h @@ -59,21 +59,21 @@ #define PJ_HAS_WINSOCK2_H 1 #define PJ_SOCK_HAS_INET_ATON 0 -
-/* When this macro is set, getsockopt(SOL_SOCKET, SO_ERROR) will return
- * the status of non-blocking connect() operation.
- */
-#define PJ_HAS_SO_ERROR 0
-
-/* This value specifies the value set in errno by the OS when a non-blocking
- * socket recv() or send() can not return immediately.
- */
-#define PJ_BLOCKING_ERROR_VAL WSAEWOULDBLOCK
-
-/* This value specifies the value set in errno by the OS when a non-blocking
- * socket connect() can not get connected immediately.
- */
-#define PJ_BLOCKING_CONNECT_ERROR_VAL WSAEWOULDBLOCK
+ +/* When this macro is set, getsockopt(SOL_SOCKET, SO_ERROR) will return + * the status of non-blocking connect() operation. + */ +#define PJ_HAS_SO_ERROR 0 + +/* This value specifies the value set in errno by the OS when a non-blocking + * socket recv() or send() can not return immediately. + */ +#define PJ_BLOCKING_ERROR_VAL WSAEWOULDBLOCK + +/* This value specifies the value set in errno by the OS when a non-blocking + * socket connect() can not get connected immediately. + */ +#define PJ_BLOCKING_CONNECT_ERROR_VAL WSAEWOULDBLOCK /* Default threading is enabled, unless it's overridden. */ #ifndef PJ_HAS_THREADS diff --git a/pjlib/include/pj/ioqueue.h b/pjlib/include/pj/ioqueue.h index 2e084fd3..ce30c9f9 100644 --- a/pjlib/include/pj/ioqueue.h +++ b/pjlib/include/pj/ioqueue.h @@ -47,53 +47,53 @@ PJ_BEGIN_DECL * @ingroup PJ_IO * @{ * - * I/O Queue provides API for performing asynchronous I/O operations. It
- * conforms to proactor pattern, which allows application to submit an
- * asynchronous operation and to be notified later when the operation has
+ * I/O Queue provides API for performing asynchronous I/O operations. It + * conforms to proactor pattern, which allows application to submit an + * asynchronous operation and to be notified later when the operation has * completed. - *
- * The framework works natively in platforms where asynchronous operation API
- * exists, such as in Windows NT with IoCompletionPort/IOCP. In other
- * platforms, the I/O queue abstracts the operating system's event poll API
- * to provide semantics similar to IoCompletionPort with minimal penalties
- * (i.e. per ioqueue and per handle mutex protection).
- *
- * The I/O queue provides more than just unified abstraction. It also:
- * - makes sure that the operation uses the most effective way to utilize
- * the underlying mechanism, to provide the maximum theoritical
- * throughput possible on a given platform.
- * - choose the most efficient mechanism for event polling on a given
- * platform.
+ * + * The framework works natively in platforms where asynchronous operation API + * exists, such as in Windows NT with IoCompletionPort/IOCP. In other + * platforms, the I/O queue abstracts the operating system's event poll API + * to provide semantics similar to IoCompletionPort with minimal penalties + * (i.e. per ioqueue and per handle mutex protection). + * + * The I/O queue provides more than just unified abstraction. It also: + * - makes sure that the operation uses the most effective way to utilize + * the underlying mechanism, to provide the maximum theoritical + * throughput possible on a given platform. + * - choose the most efficient mechanism for event polling on a given + * platform. * * Currently, the I/O Queue is implemented using: - * - <tt><b>select()</b></tt>, as the common denominator, but the least
- * efficient. Also the number of descriptor is limited to
- * \c PJ_IOQUEUE_MAX_HANDLES (which by default is 64).
- * - <tt><b>/dev/epoll</b></tt> on Linux (user mode and kernel mode),
- * a much faster replacement for select() on Linux (and more importantly
+ * - <tt><b>select()</b></tt>, as the common denominator, but the least + * efficient. Also the number of descriptor is limited to + * \c PJ_IOQUEUE_MAX_HANDLES (which by default is 64). + * - <tt><b>/dev/epoll</b></tt> on Linux (user mode and kernel mode), + * a much faster replacement for select() on Linux (and more importantly * doesn't have limitation on number of descriptors). - * - <b>I/O Completion ports</b> on Windows NT/2000/XP, which is the most
- * efficient way to dispatch events in Windows NT based OSes, and most
- * importantly, it doesn't have the limit on how many handles to monitor.
+ * - <b>I/O Completion ports</b> on Windows NT/2000/XP, which is the most + * efficient way to dispatch events in Windows NT based OSes, and most + * importantly, it doesn't have the limit on how many handles to monitor. * And it works with files (not only sockets) as well. - *
- *
- * \section pj_ioqueue_concurrency_sec Concurrency Rules
- *
- * The items below describe rules that must be obeyed when using the I/O
- * queue, with regard to concurrency:
- * - simultaneous operations (by different threads) to different key is safe.
- * - simultaneous operations to the same key is also safe, except
- * <b>unregistration</b>, which is described below.
- * - <b>care must be taken when unregistering a key</b> from the
- * ioqueue. Application must take care that when one thread is issuing
- * an unregistration, other thread is not simultaneously invoking an
- * operation <b>to the same key</b>.
- *\n
- * This happens because the ioqueue functions are working with a pointer
- * to the key, and there is a possible race condition where the pointer
- * has been rendered invalid by other threads before the ioqueue has a
- * chance to acquire mutex on it.
+ * + * + * \section pj_ioqueue_concurrency_sec Concurrency Rules + * + * The items below describe rules that must be obeyed when using the I/O + * queue, with regard to concurrency: + * - simultaneous operations (by different threads) to different key is safe. + * - simultaneous operations to the same key is also safe, except + * <b>unregistration</b>, which is described below. + * - <b>care must be taken when unregistering a key</b> from the + * ioqueue. Application must take care that when one thread is issuing + * an unregistration, other thread is not simultaneously invoking an + * operation <b>to the same key</b>. + *\n + * This happens because the ioqueue functions are working with a pointer + * to the key, and there is a possible race condition where the pointer + * has been rendered invalid by other threads before the ioqueue has a + * chance to acquire mutex on it. * * \section pj_ioqeuue_examples_sec Examples * @@ -103,26 +103,26 @@ PJ_BEGIN_DECL * - \ref page_pjlib_ioqueue_udp_test * - \ref page_pjlib_ioqueue_perf_test */ -
-
-
-/**
- * This structure describes operation specific key to be submitted to
- * I/O Queue when performing the asynchronous operation. This key will
- * be returned to the application when completion callback is called.
- *
- * Application normally wants to attach it's specific data in the
- * \c user_data field so that it can keep track of which operation has
- * completed when the callback is called. Alternatively, application can
- * also extend this struct to include its data, because the pointer that
- * is returned in the completion callback will be exactly the same as
- * the pointer supplied when the asynchronous function is called.
- */
-typedef struct pj_ioqueue_op_key_t
-{
- void *internal__[32]; /**< Internal I/O Queue data. */
- void *user_data; /**< Application data. */
-} pj_ioqueue_op_key_t;
+ + + +/** + * This structure describes operation specific key to be submitted to + * I/O Queue when performing the asynchronous operation. This key will + * be returned to the application when completion callback is called. + * + * Application normally wants to attach it's specific data in the + * \c user_data field so that it can keep track of which operation has + * completed when the callback is called. Alternatively, application can + * also extend this struct to include its data, because the pointer that + * is returned in the completion callback will be exactly the same as + * the pointer supplied when the asynchronous function is called. + */ +typedef struct pj_ioqueue_op_key_t +{ + void *internal__[32]; /**< Internal I/O Queue data. */ + void *user_data; /**< Application data. */ +} pj_ioqueue_op_key_t; /** * This structure describes the callbacks to be called when I/O operation @@ -134,58 +134,58 @@ typedef struct pj_ioqueue_callback * This callback is called when #pj_ioqueue_recv or #pj_ioqueue_recvfrom * completes. * - * @param key The key.
+ * @param key The key. * @param op_key Operation key. - * @param bytes_read >= 0 to indicate the amount of data read,
- * otherwise negative value containing the error
- * code. To obtain the pj_status_t error code, use
+ * @param bytes_read >= 0 to indicate the amount of data read, + * otherwise negative value containing the error + * code. To obtain the pj_status_t error code, use * (pj_status_t code = -bytes_read). */ - void (*on_read_complete)(pj_ioqueue_key_t *key,
- pj_ioqueue_op_key_t *op_key,
+ void (*on_read_complete)(pj_ioqueue_key_t *key, + pj_ioqueue_op_key_t *op_key, pj_ssize_t bytes_read); /** * This callback is called when #pj_ioqueue_write or #pj_ioqueue_sendto * completes. * - * @param key The key.
+ * @param key The key. * @param op_key Operation key. - * @param bytes_sent >= 0 to indicate the amount of data written,
- * otherwise negative value containing the error
- * code. To obtain the pj_status_t error code, use
- * (pj_status_t code = -bytes_sent).
+ * @param bytes_sent >= 0 to indicate the amount of data written, + * otherwise negative value containing the error + * code. To obtain the pj_status_t error code, use + * (pj_status_t code = -bytes_sent). */ - void (*on_write_complete)(pj_ioqueue_key_t *key,
- pj_ioqueue_op_key_t *op_key,
+ void (*on_write_complete)(pj_ioqueue_key_t *key, + pj_ioqueue_op_key_t *op_key, pj_ssize_t bytes_sent); /** * This callback is called when #pj_ioqueue_accept completes. * - * @param key The key.
+ * @param key The key. * @param op_key Operation key. * @param sock Newly connected socket. * @param status Zero if the operation completes successfully. */ - void (*on_accept_complete)(pj_ioqueue_key_t *key,
- pj_ioqueue_op_key_t *op_key,
+ void (*on_accept_complete)(pj_ioqueue_key_t *key, + pj_ioqueue_op_key_t *op_key, pj_sock_t sock, pj_status_t status); /** * This callback is called when #pj_ioqueue_connect completes. * - * @param key The key.
+ * @param key The key. * @param status PJ_SUCCESS if the operation completes successfully. */ - void (*on_connect_complete)(pj_ioqueue_key_t *key,
+ void (*on_connect_complete)(pj_ioqueue_key_t *key, pj_status_t status); } pj_ioqueue_callback; /** - * Types of pending I/O Queue operation. This enumeration is only used
+ * Types of pending I/O Queue operation. This enumeration is only used * internally within the ioqueue. */ typedef enum pj_ioqueue_operation_e @@ -204,17 +204,17 @@ typedef enum pj_ioqueue_operation_e } pj_ioqueue_operation_e; -/**
- * This macro specifies the maximum number of events that can be
- * processed by the ioqueue on a single poll cycle, on implementation
- * that supports it. The value is only meaningfull when specified
- * during PJLIB build.
- */
-#ifndef PJ_IOQUEUE_MAX_EVENTS_IN_SINGLE_POLL
-# define PJ_IOQUEUE_MAX_EVENTS_IN_SINGLE_POLL (16)
-#endif
+/** + * This macro specifies the maximum number of events that can be + * processed by the ioqueue on a single poll cycle, on implementation + * that supports it. The value is only meaningfull when specified + * during PJLIB build. + */ +#ifndef PJ_IOQUEUE_MAX_EVENTS_IN_SINGLE_POLL +# define PJ_IOQUEUE_MAX_EVENTS_IN_SINGLE_POLL (16) +#endif + -
/** * Create a new I/O Queue framework. * @@ -270,9 +270,9 @@ PJ_DECL(pj_status_t) pj_ioqueue_set_lock( pj_ioqueue_t *ioque, * @param sock The socket. * @param user_data User data to be associated with the key, which can be * retrieved later. - * @param cb Callback to be called when I/O operation completes.
- * @param key Pointer to receive the key to be associated with this
- * socket. Subsequent I/O queue operation will need this
+ * @param cb Callback to be called when I/O operation completes. + * @param key Pointer to receive the key to be associated with this + * socket. Subsequent I/O queue operation will need this * key. * * @return PJ_SUCCESS on success, or the error code. @@ -281,17 +281,17 @@ PJ_DECL(pj_status_t) pj_ioqueue_register_sock( pj_pool_t *pool, pj_ioqueue_t *ioque, pj_sock_t sock, void *user_data, - const pj_ioqueue_callback *cb,
+ const pj_ioqueue_callback *cb, pj_ioqueue_key_t **key ); /** - * Unregister from the I/O Queue framework. Caller must make sure that
- * the key doesn't have any pending operation before calling this function,
- * or otherwise the behaviour is undefined (either callback will be called
- * later when the data is sent/received, or the callback will not be called,
+ * Unregister from the I/O Queue framework. Caller must make sure that + * the key doesn't have any pending operation before calling this function, + * or otherwise the behaviour is undefined (either callback will be called + * later when the data is sent/received, or the callback will not be called, * or even something else). * - * @param key The key that was previously obtained from registration.
+ * @param key The key that was previously obtained from registration. * * @return PJ_SUCCESS on success or the error code. */ @@ -300,44 +300,44 @@ PJ_DECL(pj_status_t) pj_ioqueue_unregister( pj_ioqueue_key_t *key ); /** * Get user data associated with an ioqueue key. - *
- * @param key The key that was previously obtained from registration.
* - * @return The user data associated with the descriptor, or NULL
+ * @param key The key that was previously obtained from registration. + * + * @return The user data associated with the descriptor, or NULL * on error or if no data is associated with the key during * registration. */ PJ_DECL(void*) pj_ioqueue_get_user_data( pj_ioqueue_key_t *key ); -
-/**
- * Set or change the user data to be associated with the file descriptor or
- * handle or socket descriptor.
- *
- * @param key The key that was previously obtained from registration.
- * @param user_data User data to be associated with the descriptor.
- * @param old_data Optional parameter to retrieve the old user data.
- *
- * @return PJ_SUCCESS on success or the error code.
- */
-PJ_DECL(pj_status_t) pj_ioqueue_set_user_data( pj_ioqueue_key_t *key,
- void *user_data,
- void **old_data);
+ +/** + * Set or change the user data to be associated with the file descriptor or + * handle or socket descriptor. + * + * @param key The key that was previously obtained from registration. + * @param user_data User data to be associated with the descriptor. + * @param old_data Optional parameter to retrieve the old user data. + * + * @return PJ_SUCCESS on success or the error code. + */ +PJ_DECL(pj_status_t) pj_ioqueue_set_user_data( pj_ioqueue_key_t *key, + void *user_data, + void **old_data); #if defined(PJ_HAS_TCP) && PJ_HAS_TCP != 0 /** * Instruct I/O Queue to accept incoming connection on the specified - * listening socket. This function will return immediately (i.e. non-blocking)
- * regardless whether a connection is immediately available. If the function
- * can't complete immediately, the caller will be notified about the incoming
- * connection when it calls pj_ioqueue_poll(). If a new connection is
- * immediately available, the function returns PJ_SUCCESS with the new
+ * listening socket. This function will return immediately (i.e. non-blocking) + * regardless whether a connection is immediately available. If the function + * can't complete immediately, the caller will be notified about the incoming + * connection when it calls pj_ioqueue_poll(). If a new connection is + * immediately available, the function returns PJ_SUCCESS with the new * connection; in this case, the callback WILL NOT be called. * - * @param key The key which registered to the server socket.
- * @param op_key An operation specific key to be associated with the
- * pending operation, so that application can keep track of
- * which operation has been completed when the callback is
+ * @param key The key which registered to the server socket. + * @param op_key An operation specific key to be associated with the + * pending operation, so that application can keep track of + * which operation has been completed when the callback is * called. * @param new_sock Argument which contain pointer to receive the new socket * for the incoming connection. @@ -349,15 +349,15 @@ PJ_DECL(pj_status_t) pj_ioqueue_set_user_data( pj_ioqueue_key_t *key, * address, and on output, contains the actual length of the * address. This argument is optional. * @return - * - PJ_SUCCESS When connection is available immediately, and the
- * parameters will be updated to contain information about
- * the new connection. In this case, a completion callback
+ * - PJ_SUCCESS When connection is available immediately, and the + * parameters will be updated to contain information about + * the new connection. In this case, a completion callback * WILL NOT be called. - * - PJ_EPENDING If no connection is available immediately. When a new
+ * - PJ_EPENDING If no connection is available immediately. When a new * connection arrives, the callback will be called. * - non-zero which indicates the appropriate error code. */ -PJ_DECL(pj_status_t) pj_ioqueue_accept( pj_ioqueue_key_t *key,
+PJ_DECL(pj_status_t) pj_ioqueue_accept( pj_ioqueue_key_t *key, pj_ioqueue_op_key_t *op_key, pj_sock_t *sock, pj_sockaddr_t *local, @@ -366,9 +366,9 @@ PJ_DECL(pj_status_t) pj_ioqueue_accept( pj_ioqueue_key_t *key, /** * Initiate non-blocking socket connect. If the socket can NOT be connected - * immediately, asynchronous connect() will be scheduled and caller will be
- * notified via completion callback when it calls pj_ioqueue_poll(). If
- * socket is connected immediately, the function returns PJ_SUCCESS and
+ * immediately, asynchronous connect() will be scheduled and caller will be + * notified via completion callback when it calls pj_ioqueue_poll(). If + * socket is connected immediately, the function returns PJ_SUCCESS and * completion callback WILL NOT be called. * * @param key The key associated with TCP socket @@ -376,7 +376,7 @@ PJ_DECL(pj_status_t) pj_ioqueue_accept( pj_ioqueue_key_t *key, * @param addrlen The remote address length. * * @return - * - PJ_SUCCESS If socket is connected immediately. In this case, the
+ * - PJ_SUCCESS If socket is connected immediately. In this case, the * completion callback WILL NOT be called. * - PJ_EPENDING If operation is queued, or * - non-zero Indicates the error code. @@ -404,40 +404,40 @@ PJ_DECL(int) pj_ioqueue_poll( pj_ioqueue_t *ioque, /** - * Instruct the I/O Queue to read from the specified handle. This function
- * returns immediately (i.e. non-blocking) regardless whether some data has
- * been transfered. If the operation can't complete immediately, caller will
- * be notified about the completion when it calls pj_ioqueue_poll(). If data
- * is immediately available, the function will return PJ_SUCCESS and the
- * callback WILL NOT be called.
+ * Instruct the I/O Queue to read from the specified handle. This function + * returns immediately (i.e. non-blocking) regardless whether some data has + * been transfered. If the operation can't complete immediately, caller will + * be notified about the completion when it calls pj_ioqueue_poll(). If data + * is immediately available, the function will return PJ_SUCCESS and the + * callback WILL NOT be called. * * @param key The key that uniquely identifies the handle. - * @param op_key An operation specific key to be associated with the
- * pending operation, so that application can keep track of
- * which operation has been completed when the callback is
- * called. Caller must make sure that this key remains
- * valid until the function completes.
+ * @param op_key An operation specific key to be associated with the + * pending operation, so that application can keep track of + * which operation has been completed when the callback is + * called. Caller must make sure that this key remains + * valid until the function completes. * @param buffer The buffer to hold the read data. The caller MUST make sure * that this buffer remain valid until the framework completes * reading the handle. - * @param length On input, it specifies the size of the buffer. If data is
- * available to be read immediately, the function returns
- * PJ_SUCCESS and this argument will be filled with the
- * amount of data read. If the function is pending, caller
- * will be notified about the amount of data read in the
- * callback. This parameter can point to local variable in
- * caller's stack and doesn't have to remain valid for the
+ * @param length On input, it specifies the size of the buffer. If data is + * available to be read immediately, the function returns + * PJ_SUCCESS and this argument will be filled with the + * amount of data read. If the function is pending, caller + * will be notified about the amount of data read in the + * callback. This parameter can point to local variable in + * caller's stack and doesn't have to remain valid for the * duration of pending operation. * @param flags Recv flag. * * @return - * - PJ_SUCCESS If immediate data has been received in the buffer. In this
+ * - PJ_SUCCESS If immediate data has been received in the buffer. In this * case, the callback WILL NOT be called. - * - PJ_EPENDING If the operation has been queued, and the callback will be
+ * - PJ_EPENDING If the operation has been queued, and the callback will be * called when data has been received. * - non-zero The return value indicates the error code. */ -PJ_DECL(pj_status_t) pj_ioqueue_recv( pj_ioqueue_key_t *key,
+PJ_DECL(pj_status_t) pj_ioqueue_recv( pj_ioqueue_key_t *key, pj_ioqueue_op_key_t *op_key, void *buffer, pj_ssize_t *length, @@ -450,26 +450,26 @@ PJ_DECL(pj_status_t) pj_ioqueue_recv( pj_ioqueue_key_t *key, * remain valid until the framework completes reading the data. * * @param key The key that uniquely identifies the handle. - * @param op_key An operation specific key to be associated with the
- * pending operation, so that application can keep track of
- * which operation has been completed when the callback is
- * called.
+ * @param op_key An operation specific key to be associated with the + * pending operation, so that application can keep track of + * which operation has been completed when the callback is + * called. * @param buffer The buffer to hold the read data. The caller MUST make sure * that this buffer remain valid until the framework completes * reading the handle. - * @param length On input, it specifies the size of the buffer. If data is
- * available to be read immediately, the function returns
- * PJ_SUCCESS and this argument will be filled with the
- * amount of data read. If the function is pending, caller
- * will be notified about the amount of data read in the
- * callback. This parameter can point to local variable in
- * caller's stack and doesn't have to remain valid for the
- * duration of pending operation.
+ * @param length On input, it specifies the size of the buffer. If data is + * available to be read immediately, the function returns + * PJ_SUCCESS and this argument will be filled with the + * amount of data read. If the function is pending, caller + * will be notified about the amount of data read in the + * callback. This parameter can point to local variable in + * caller's stack and doesn't have to remain valid for the + * duration of pending operation. * @param flags Recv flag. * @param addr Optional Pointer to buffer to receive the address. * @param addrlen On input, specifies the length of the address buffer. * On output, it will be filled with the actual length of - * the address. This argument can be NULL if \c addr is not
+ * the address. This argument can be NULL if \c addr is not * specified. * * @return @@ -479,7 +479,7 @@ PJ_DECL(pj_status_t) pj_ioqueue_recv( pj_ioqueue_key_t *key, * - PJ_EPENDING If the operation has been queued. * - non-zero The return value indicates the error code. */ -PJ_DECL(pj_status_t) pj_ioqueue_recvfrom( pj_ioqueue_key_t *key,
+PJ_DECL(pj_status_t) pj_ioqueue_recvfrom( pj_ioqueue_key_t *key, pj_ioqueue_op_key_t *op_key, void *buffer, pj_ssize_t *length, @@ -489,39 +489,39 @@ PJ_DECL(pj_status_t) pj_ioqueue_recvfrom( pj_ioqueue_key_t *key, /** - * Instruct the I/O Queue to write to the handle. This function will return
- * immediately (i.e. non-blocking) regardless whether some data has been
- * transfered. If the function can't complete immediately, the caller will
- * be notified about the completion when it calls pj_ioqueue_poll(). If
- * operation completes immediately and data has been transfered, the function
- * returns PJ_SUCCESS and the callback will NOT be called.
- *
+ * Instruct the I/O Queue to write to the handle. This function will return + * immediately (i.e. non-blocking) regardless whether some data has been + * transfered. If the function can't complete immediately, the caller will + * be notified about the completion when it calls pj_ioqueue_poll(). If + * operation completes immediately and data has been transfered, the function + * returns PJ_SUCCESS and the callback will NOT be called. + * * @param key The key that identifies the handle. - * @param op_key An operation specific key to be associated with the
- * pending operation, so that application can keep track of
- * which operation has been completed when the callback is
- * called.
+ * @param op_key An operation specific key to be associated with the + * pending operation, so that application can keep track of + * which operation has been completed when the callback is + * called. * @param data The data to send. Caller MUST make sure that this buffer * remains valid until the write operation completes. - * @param length On input, it specifies the length of data to send. When
- * data was sent immediately, this function returns PJ_SUCCESS
- * and this parameter contains the length of data sent. If
- * data can not be sent immediately, an asynchronous operation
- * is scheduled and caller will be notified via callback the
- * number of bytes sent. This parameter can point to local
- * variable on caller's stack and doesn't have to remain
+ * @param length On input, it specifies the length of data to send. When + * data was sent immediately, this function returns PJ_SUCCESS + * and this parameter contains the length of data sent. If + * data can not be sent immediately, an asynchronous operation + * is scheduled and caller will be notified via callback the + * number of bytes sent. This parameter can point to local + * variable on caller's stack and doesn't have to remain * valid until the operation has completed. * @param flags Send flags. * * @return - * - PJ_SUCCESS If data was immediately transfered. In this case, no
- * pending operation has been scheduled and the callback
+ * - PJ_SUCCESS If data was immediately transfered. In this case, no + * pending operation has been scheduled and the callback * WILL NOT be called. - * - PJ_EPENDING If the operation has been queued. Once data base been
+ * - PJ_EPENDING If the operation has been queued. Once data base been * transfered, the callback will be called. * - non-zero The return value indicates the error code. */ -PJ_DECL(pj_status_t) pj_ioqueue_send( pj_ioqueue_key_t *key,
+PJ_DECL(pj_status_t) pj_ioqueue_send( pj_ioqueue_key_t *key, pj_ioqueue_op_key_t *op_key, const void *data, pj_ssize_t *length, @@ -533,20 +533,20 @@ PJ_DECL(pj_status_t) pj_ioqueue_send( pj_ioqueue_key_t *key, * pj_sock_sendto() (or equivalent) will be called to send the data. * * @param key the key that identifies the handle. - * @param op_key An operation specific key to be associated with the
- * pending operation, so that application can keep track of
- * which operation has been completed when the callback is
- * called.
+ * @param op_key An operation specific key to be associated with the + * pending operation, so that application can keep track of + * which operation has been completed when the callback is + * called. * @param data the data to send. Caller MUST make sure that this buffer * remains valid until the write operation completes. - * @param length On input, it specifies the length of data to send. When
- * data was sent immediately, this function returns PJ_SUCCESS
- * and this parameter contains the length of data sent. If
- * data can not be sent immediately, an asynchronous operation
- * is scheduled and caller will be notified via callback the
- * number of bytes sent. This parameter can point to local
- * variable on caller's stack and doesn't have to remain
- * valid until the operation has completed.
+ * @param length On input, it specifies the length of data to send. When + * data was sent immediately, this function returns PJ_SUCCESS + * and this parameter contains the length of data sent. If + * data can not be sent immediately, an asynchronous operation + * is scheduled and caller will be notified via callback the + * number of bytes sent. This parameter can point to local + * variable on caller's stack and doesn't have to remain + * valid until the operation has completed. * @param flags send flags. * @param addr Optional remote address. * @param addrlen Remote address length, \c addr is specified. @@ -556,7 +556,7 @@ PJ_DECL(pj_status_t) pj_ioqueue_send( pj_ioqueue_key_t *key, * - PJ_EPENDING If the operation has been queued. * - non-zero The return value indicates the error code. */ -PJ_DECL(pj_status_t) pj_ioqueue_sendto( pj_ioqueue_key_t *key,
+PJ_DECL(pj_status_t) pj_ioqueue_sendto( pj_ioqueue_key_t *key, pj_ioqueue_op_key_t *op_key, const void *data, pj_ssize_t *length, diff --git a/pjlib/src/pj/ioqueue_common_abs.c b/pjlib/src/pj/ioqueue_common_abs.c index b5599d9c..774d53e3 100644 --- a/pjlib/src/pj/ioqueue_common_abs.c +++ b/pjlib/src/pj/ioqueue_common_abs.c @@ -1,813 +1,813 @@ -/* $Id$ */
-
-#include <pj/ioqueue.h>
-#include <pj/errno.h>
-#include <pj/list.h>
-#include <pj/sock.h>
-#include <pj/lock.h>
-#include <pj/assert.h>
-#include <pj/string.h>
-
-
-static void ioqueue_init( pj_ioqueue_t *ioqueue )
-{
- ioqueue->lock = NULL;
- ioqueue->auto_delete_lock = 0;
-}
-
-static pj_status_t ioqueue_destroy(pj_ioqueue_t *ioqueue)
-{
- if (ioqueue->auto_delete_lock && ioqueue->lock )
- return pj_lock_destroy(ioqueue->lock);
- else
- return PJ_SUCCESS;
-}
-
-/*
- * pj_ioqueue_set_lock()
- */
-PJ_DEF(pj_status_t) pj_ioqueue_set_lock( pj_ioqueue_t *ioqueue,
- pj_lock_t *lock,
- pj_bool_t auto_delete )
-{
- PJ_ASSERT_RETURN(ioqueue && lock, PJ_EINVAL);
-
- if (ioqueue->auto_delete_lock && ioqueue->lock) {
- pj_lock_destroy(ioqueue->lock);
- }
-
- ioqueue->lock = lock;
- ioqueue->auto_delete_lock = auto_delete;
-
- return PJ_SUCCESS;
-}
-
-static pj_status_t ioqueue_init_key( pj_pool_t *pool,
- pj_ioqueue_t *ioqueue,
- pj_ioqueue_key_t *key,
- pj_sock_t sock,
- void *user_data,
- const pj_ioqueue_callback *cb)
-{
- pj_status_t rc;
- int optlen;
-
- key->ioqueue = ioqueue;
- key->fd = sock;
- key->user_data = user_data;
- pj_list_init(&key->read_list);
- pj_list_init(&key->write_list);
-#if PJ_HAS_TCP
- pj_list_init(&key->accept_list);
-#endif
-
- /* Save callback. */
- pj_memcpy(&key->cb, cb, sizeof(pj_ioqueue_callback));
-
- /* Get socket type. When socket type is datagram, some optimization
- * will be performed during send to allow parallel send operations.
- */
- optlen = sizeof(key->fd_type);
- rc = pj_sock_getsockopt(sock, PJ_SOL_SOCKET, PJ_SO_TYPE,
- &key->fd_type, &optlen);
- if (rc != PJ_SUCCESS)
- key->fd_type = PJ_SOCK_STREAM;
-
- /* Create mutex for the key. */
- rc = pj_mutex_create_simple(pool, NULL, &key->mutex);
-
- return rc;
-}
-
-static void ioqueue_destroy_key( pj_ioqueue_key_t *key )
-{
- pj_mutex_destroy(key->mutex);
-}
-
-/*
- * pj_ioqueue_get_user_data()
- *
- * Obtain value associated with a key.
- */
-PJ_DEF(void*) pj_ioqueue_get_user_data( pj_ioqueue_key_t *key )
-{
- PJ_ASSERT_RETURN(key != NULL, NULL);
- return key->user_data;
-}
-
-/*
- * pj_ioqueue_set_user_data()
- */
-PJ_DEF(pj_status_t) pj_ioqueue_set_user_data( pj_ioqueue_key_t *key,
- void *user_data,
- void **old_data)
-{
- PJ_ASSERT_RETURN(key, PJ_EINVAL);
-
- if (old_data)
- *old_data = key->user_data;
- key->user_data = user_data;
-
- return PJ_SUCCESS;
-}
-
-PJ_INLINE(int) key_has_pending_write(pj_ioqueue_key_t *key)
-{
- return !pj_list_empty(&key->write_list);
-}
-
-PJ_INLINE(int) key_has_pending_read(pj_ioqueue_key_t *key)
-{
- return !pj_list_empty(&key->read_list);
-}
-
-PJ_INLINE(int) key_has_pending_accept(pj_ioqueue_key_t *key)
-{
-#if PJ_HAS_TCP
- return !pj_list_empty(&key->accept_list);
-#else
- return 0;
-#endif
-}
-
-PJ_INLINE(int) key_has_pending_connect(pj_ioqueue_key_t *key)
-{
- return key->connecting;
-}
-
-
-/*
- * ioqueue_dispatch_event()
- *
- * Report occurence of an event in the key to be processed by the
- * framework.
- */
-void ioqueue_dispatch_write_event(pj_ioqueue_t *ioqueue, pj_ioqueue_key_t *h)
-{
- /* Lock the key. */
- pj_mutex_lock(h->mutex);
-
-#if defined(PJ_HAS_TCP) && PJ_HAS_TCP!=0
- if (h->connecting) {
- /* Completion of connect() operation */
- pj_ssize_t bytes_transfered;
-
- /* Clear operation. */
- h->connecting = 0;
-
- ioqueue_remove_from_set(ioqueue, h->fd, WRITEABLE_EVENT);
- ioqueue_remove_from_set(ioqueue, h->fd, EXCEPTION_EVENT);
-
- /* Unlock; from this point we don't need to hold key's mutex. */
- pj_mutex_unlock(h->mutex);
-
-#if (defined(PJ_HAS_SO_ERROR) && PJ_HAS_SO_ERROR!=0)
- /* from connect(2):
- * On Linux, use getsockopt to read the SO_ERROR option at
- * level SOL_SOCKET to determine whether connect() completed
- * successfully (if SO_ERROR is zero).
- */
- int value;
- socklen_t vallen = sizeof(value);
- int gs_rc = getsockopt(h->fd, SOL_SOCKET, SO_ERROR,
- &value, &vallen);
- if (gs_rc != 0) {
- /* Argh!! What to do now???
- * Just indicate that the socket is connected. The
- * application will get error as soon as it tries to use
- * the socket to send/receive.
- */
- bytes_transfered = 0;
- } else {
- bytes_transfered = value;
- }
-#elif defined(PJ_WIN32) && PJ_WIN32!=0
- bytes_transfered = 0; /* success */
-#else
- /* Excellent information in D.J. Bernstein page:
- * http://cr.yp.to/docs/connect.html
- *
- * Seems like the most portable way of detecting connect()
- * failure is to call getpeername(). If socket is connected,
- * getpeername() will return 0. If the socket is not connected,
- * it will return ENOTCONN, and read(fd, &ch, 1) will produce
- * the right errno through error slippage. This is a combination
- * of suggestions from Douglas C. Schmidt and Ken Keys.
- */
- int gp_rc;
- struct sockaddr_in addr;
- socklen_t addrlen = sizeof(addr);
-
- gp_rc = getpeername(h->fd, (struct sockaddr*)&addr, &addrlen);
- bytes_transfered = gp_rc;
-#endif
-
- /* Call callback. */
- if (h->cb.on_connect_complete)
- (*h->cb.on_connect_complete)(h, bytes_transfered);
-
- /* Done. */
-
- } else
-#endif /* PJ_HAS_TCP */
- if (key_has_pending_write(h)) {
- /* Socket is writable. */
- struct write_operation *write_op;
- pj_ssize_t sent;
- pj_status_t send_rc;
-
- /* Get the first in the queue. */
- write_op = h->write_list.next;
-
- /* For datagrams, we can remove the write_op from the list
- * so that send() can work in parallel.
- */
- if (h->fd_type == PJ_SOCK_DGRAM) {
- pj_list_erase(write_op);
- if (pj_list_empty(&h->write_list))
- ioqueue_remove_from_set(ioqueue, h->fd, WRITEABLE_EVENT);
-
- pj_mutex_unlock(h->mutex);
- }
-
- /* Send the data.
- * Unfortunately we must do this while holding key's mutex, thus
- * preventing parallel write on a single key.. :-((
- */
- sent = write_op->size - write_op->written;
- if (write_op->op == PJ_IOQUEUE_OP_SEND) {
- send_rc = pj_sock_send(h->fd, write_op->buf+write_op->written,
- &sent, write_op->flags);
- } else if (write_op->op == PJ_IOQUEUE_OP_SEND_TO) {
- send_rc = pj_sock_sendto(h->fd,
- write_op->buf+write_op->written,
- &sent, write_op->flags,
- &write_op->rmt_addr,
- write_op->rmt_addrlen);
- } else {
- pj_assert(!"Invalid operation type!");
- send_rc = PJ_EBUG;
- }
-
- if (send_rc == PJ_SUCCESS) {
- write_op->written += sent;
- } else {
- pj_assert(send_rc > 0);
- write_op->written = -send_rc;
- }
-
- /* Are we finished with this buffer? */
- if (send_rc!=PJ_SUCCESS ||
- write_op->written == (pj_ssize_t)write_op->size ||
- h->fd_type == PJ_SOCK_DGRAM)
- {
- if (h->fd_type != PJ_SOCK_DGRAM) {
- /* Write completion of the whole stream. */
- pj_list_erase(write_op);
-
- /* Clear operation if there's no more data to send. */
- if (pj_list_empty(&h->write_list))
- ioqueue_remove_from_set(ioqueue, h->fd, WRITEABLE_EVENT);
-
- /* No need to hold mutex anymore */
- pj_mutex_unlock(h->mutex);
- }
-
- /* Call callback. */
- if (h->cb.on_write_complete) {
- (*h->cb.on_write_complete)(h,
- (pj_ioqueue_op_key_t*)write_op,
- write_op->written);
- }
-
- } else {
- pj_mutex_unlock(h->mutex);
- }
-
- /* Done. */
- } else {
- pj_assert(!"Descriptor is signaled but key "
- "has no pending operation!");
-
- pj_mutex_unlock(h->mutex);
- }
-}
-
-void ioqueue_dispatch_read_event( pj_ioqueue_t *ioqueue, pj_ioqueue_key_t *h )
-{
- pj_status_t rc;
-
- /* Lock the key. */
- pj_mutex_lock(h->mutex);
-
-# if PJ_HAS_TCP
- if (!pj_list_empty(&h->accept_list)) {
-
- struct accept_operation *accept_op;
-
- /* Get one accept operation from the list. */
- accept_op = h->accept_list.next;
- pj_list_erase(accept_op);
-
- /* Clear bit in fdset if there is no more pending accept */
- if (pj_list_empty(&h->accept_list))
- ioqueue_remove_from_set(ioqueue, h->fd, READABLE_EVENT);
-
- /* Unlock; from this point we don't need to hold key's mutex. */
- pj_mutex_unlock(h->mutex);
-
- rc=pj_sock_accept(h->fd, accept_op->accept_fd,
- accept_op->rmt_addr, accept_op->addrlen);
- if (rc==PJ_SUCCESS && accept_op->local_addr) {
- rc = pj_sock_getsockname(*accept_op->accept_fd,
- accept_op->local_addr,
- accept_op->addrlen);
- }
-
- /* Call callback. */
- if (h->cb.on_accept_complete)
- (*h->cb.on_accept_complete)(h,
- (pj_ioqueue_op_key_t*)accept_op,
- *accept_op->accept_fd, rc);
-
- }
- else
-# endif
- if (key_has_pending_read(h)) {
- struct read_operation *read_op;
- pj_ssize_t bytes_read;
-
- pj_assert(!pj_list_empty(&h->read_list));
-
- /* Get one pending read operation from the list. */
- read_op = h->read_list.next;
- pj_list_erase(read_op);
-
- /* Clear fdset if there is no pending read. */
- if (pj_list_empty(&h->read_list))
- ioqueue_remove_from_set(ioqueue, h->fd, READABLE_EVENT);
-
- /* Unlock; from this point we don't need to hold key's mutex. */
- pj_mutex_unlock(h->mutex);
-
- bytes_read = read_op->size;
-
- if ((read_op->op == PJ_IOQUEUE_OP_RECV_FROM)) {
- rc = pj_sock_recvfrom(h->fd, read_op->buf, &bytes_read, 0,
- read_op->rmt_addr,
- read_op->rmt_addrlen);
- } else if ((read_op->op == PJ_IOQUEUE_OP_RECV)) {
- rc = pj_sock_recv(h->fd, read_op->buf, &bytes_read, 0);
- } else {
- pj_assert(read_op->op == PJ_IOQUEUE_OP_READ);
- /*
- * User has specified pj_ioqueue_read().
- * On Win32, we should do ReadFile(). But because we got
- * here because of select() anyway, user must have put a
- * socket descriptor on h->fd, which in this case we can
- * just call pj_sock_recv() instead of ReadFile().
- * On Unix, user may put a file in h->fd, so we'll have
- * to call read() here.
- * This may not compile on systems which doesn't have
- * read(). That's why we only specify PJ_LINUX here so
- * that error is easier to catch.
- */
-# if defined(PJ_WIN32) && PJ_WIN32 != 0
- rc = pj_sock_recv(h->fd, read_op->buf, &bytes_read, 0);
- //rc = ReadFile((HANDLE)h->fd, read_op->buf, read_op->size,
- // &bytes_read, NULL);
-# elif (defined(PJ_HAS_UNISTD_H) && PJ_HAS_UNISTD_H != 0)
- bytes_read = read(h->fd, h->rd_buf, bytes_read);
- rc = (bytes_read >= 0) ? PJ_SUCCESS : pj_get_os_error();
-# elif defined(PJ_LINUX_KERNEL) && PJ_LINUX_KERNEL != 0
- bytes_read = sys_read(h->fd, h->rd_buf, bytes_read);
- rc = (bytes_read >= 0) ? PJ_SUCCESS : -bytes_read;
-# else
-# error "Implement read() for this platform!"
-# endif
- }
-
- if (rc != PJ_SUCCESS) {
-# if defined(PJ_WIN32) && PJ_WIN32 != 0
- /* On Win32, for UDP, WSAECONNRESET on the receive side
- * indicates that previous sending has triggered ICMP Port
- * Unreachable message.
- * But we wouldn't know at this point which one of previous
- * key that has triggered the error, since UDP socket can
- * be shared!
- * So we'll just ignore it!
- */
-
- if (rc == PJ_STATUS_FROM_OS(WSAECONNRESET)) {
- //PJ_LOG(4,(THIS_FILE,
- // "Ignored ICMP port unreach. on key=%p", h));
- }
-# endif
-
- /* In any case we would report this to caller. */
- bytes_read = -rc;
- }
-
- /* Call callback. */
- if (h->cb.on_read_complete) {
- (*h->cb.on_read_complete)(h,
- (pj_ioqueue_op_key_t*)read_op,
- bytes_read);
- }
-
- } else {
- pj_mutex_unlock(h->mutex);
- }
-}
-
-
-void ioqueue_dispatch_exception_event( pj_ioqueue_t *ioqueue,
- pj_ioqueue_key_t *h )
-{
- pj_mutex_lock(h->mutex);
-
- if (!h->connecting) {
- /* It is possible that more than one thread was woken up, thus
- * the remaining thread will see h->connecting as zero because
- * it has been processed by other thread.
- */
- pj_mutex_unlock(h->mutex);
- return;
- }
-
- /* Clear operation. */
- h->connecting = 0;
-
- pj_mutex_unlock(h->mutex);
-
- ioqueue_remove_from_set(ioqueue, h->fd, WRITEABLE_EVENT);
- ioqueue_remove_from_set(ioqueue, h->fd, EXCEPTION_EVENT);
-
- /* Call callback. */
- if (h->cb.on_connect_complete)
- (*h->cb.on_connect_complete)(h, -1);
-}
-
-/*
- * pj_ioqueue_recv()
- *
- * Start asynchronous recv() from the socket.
- */
-PJ_DEF(pj_status_t) pj_ioqueue_recv( pj_ioqueue_key_t *key,
- pj_ioqueue_op_key_t *op_key,
- void *buffer,
- pj_ssize_t *length,
- unsigned flags )
-{
- pj_status_t status;
- pj_ssize_t size;
- struct read_operation *read_op;
-
- PJ_ASSERT_RETURN(key && op_key && buffer && length, PJ_EINVAL);
- PJ_CHECK_STACK();
-
- /* Try to see if there's data immediately available.
- */
- size = *length;
- status = pj_sock_recv(key->fd, buffer, &size, flags);
- if (status == PJ_SUCCESS) {
- /* Yes! Data is available! */
- *length = size;
- return PJ_SUCCESS;
- } else {
- /* If error is not EWOULDBLOCK (or EAGAIN on Linux), report
- * the error to caller.
- */
- if (status != PJ_STATUS_FROM_OS(PJ_BLOCKING_ERROR_VAL))
- return status;
- }
-
- /*
- * No data is immediately available.
- * Must schedule asynchronous operation to the ioqueue.
- */
- read_op = (struct read_operation*)op_key;
-
- read_op->op = PJ_IOQUEUE_OP_RECV;
- read_op->buf = buffer;
- read_op->size = *length;
- read_op->flags = flags;
-
- pj_mutex_lock(key->mutex);
- pj_list_insert_before(&key->read_list, read_op);
- ioqueue_add_to_set(key->ioqueue, key->fd, READABLE_EVENT);
- pj_mutex_unlock(key->mutex);
-
- return PJ_EPENDING;
-}
-
-/*
- * pj_ioqueue_recvfrom()
- *
- * Start asynchronous recvfrom() from the socket.
- */
-PJ_DEF(pj_status_t) pj_ioqueue_recvfrom( pj_ioqueue_key_t *key,
- pj_ioqueue_op_key_t *op_key,
- void *buffer,
- pj_ssize_t *length,
- unsigned flags,
- pj_sockaddr_t *addr,
- int *addrlen)
-{
- pj_status_t status;
- pj_ssize_t size;
- struct read_operation *read_op;
-
- PJ_ASSERT_RETURN(key && op_key && buffer && length, PJ_EINVAL);
- PJ_CHECK_STACK();
-
- /* Try to see if there's data immediately available.
- */
- size = *length;
- status = pj_sock_recvfrom(key->fd, buffer, &size, flags,
- addr, addrlen);
- if (status == PJ_SUCCESS) {
- /* Yes! Data is available! */
- *length = size;
- return PJ_SUCCESS;
- } else {
- /* If error is not EWOULDBLOCK (or EAGAIN on Linux), report
- * the error to caller.
- */
- if (status != PJ_STATUS_FROM_OS(PJ_BLOCKING_ERROR_VAL))
- return status;
- }
-
- /*
- * No data is immediately available.
- * Must schedule asynchronous operation to the ioqueue.
- */
- read_op = (struct read_operation*)op_key;
-
- read_op->op = PJ_IOQUEUE_OP_RECV_FROM;
- read_op->buf = buffer;
- read_op->size = *length;
- read_op->flags = flags;
- read_op->rmt_addr = addr;
- read_op->rmt_addrlen = addrlen;
-
- pj_mutex_lock(key->mutex);
- pj_list_insert_before(&key->read_list, read_op);
- ioqueue_add_to_set(key->ioqueue, key->fd, READABLE_EVENT);
- pj_mutex_unlock(key->mutex);
-
- return PJ_EPENDING;
-}
-
-/*
- * pj_ioqueue_send()
- *
- * Start asynchronous send() to the descriptor.
- */
-PJ_DEF(pj_status_t) pj_ioqueue_send( pj_ioqueue_key_t *key,
- pj_ioqueue_op_key_t *op_key,
- const void *data,
- pj_ssize_t *length,
- unsigned flags)
-{
- struct write_operation *write_op;
- pj_status_t status;
- pj_ssize_t sent;
-
- PJ_ASSERT_RETURN(key && op_key && data && length, PJ_EINVAL);
- PJ_CHECK_STACK();
-
- /* Fast track:
- * Try to send data immediately, only if there's no pending write!
- * Note:
- * We are speculating that the list is empty here without properly
- * acquiring ioqueue's mutex first. This is intentional, to maximize
- * performance via parallelism.
- *
- * This should be safe, because:
- * - by convention, we require caller to make sure that the
- * key is not unregistered while other threads are invoking
- * an operation on the same key.
- * - pj_list_empty() is safe to be invoked by multiple threads,
- * even when other threads are modifying the list.
- */
- if (pj_list_empty(&key->write_list)) {
- /*
- * See if data can be sent immediately.
- */
- sent = *length;
- status = pj_sock_send(key->fd, data, &sent, flags);
- if (status == PJ_SUCCESS) {
- /* Success! */
- *length = sent;
- return PJ_SUCCESS;
- } else {
- /* If error is not EWOULDBLOCK (or EAGAIN on Linux), report
- * the error to caller.
- */
- if (status != PJ_STATUS_FROM_OS(PJ_BLOCKING_ERROR_VAL)) {
- return status;
- }
- }
- }
-
- /*
- * Schedule asynchronous send.
- */
- write_op = (struct write_operation*)op_key;
- write_op->op = PJ_IOQUEUE_OP_SEND;
- write_op->buf = NULL;
- write_op->size = *length;
- write_op->written = 0;
- write_op->flags = flags;
-
- pj_mutex_lock(key->mutex);
- pj_list_insert_before(&key->write_list, write_op);
- ioqueue_add_to_set(key->ioqueue, key->fd, WRITEABLE_EVENT);
- pj_mutex_unlock(key->mutex);
-
- return PJ_EPENDING;
-}
-
-
-/*
- * pj_ioqueue_sendto()
- *
- * Start asynchronous write() to the descriptor.
- */
-PJ_DEF(pj_status_t) pj_ioqueue_sendto( pj_ioqueue_key_t *key,
- pj_ioqueue_op_key_t *op_key,
- const void *data,
- pj_ssize_t *length,
- unsigned flags,
- const pj_sockaddr_t *addr,
- int addrlen)
-{
- struct write_operation *write_op;
- pj_status_t status;
- pj_ssize_t sent;
-
- PJ_ASSERT_RETURN(key && op_key && data && length, PJ_EINVAL);
- PJ_CHECK_STACK();
-
- /* Fast track:
- * Try to send data immediately, only if there's no pending write!
- * Note:
- * We are speculating that the list is empty here without properly
- * acquiring ioqueue's mutex first. This is intentional, to maximize
- * performance via parallelism.
- *
- * This should be safe, because:
- * - by convention, we require caller to make sure that the
- * key is not unregistered while other threads are invoking
- * an operation on the same key.
- * - pj_list_empty() is safe to be invoked by multiple threads,
- * even when other threads are modifying the list.
- */
- if (pj_list_empty(&key->write_list)) {
- /*
- * See if data can be sent immediately.
- */
- sent = *length;
- status = pj_sock_sendto(key->fd, data, &sent, flags, addr, addrlen);
- if (status == PJ_SUCCESS) {
- /* Success! */
- *length = sent;
- return PJ_SUCCESS;
- } else {
- /* If error is not EWOULDBLOCK (or EAGAIN on Linux), report
- * the error to caller.
- */
- if (status != PJ_STATUS_FROM_OS(PJ_BLOCKING_ERROR_VAL)) {
- return status;
- }
- }
- }
-
- /*
- * Check that address storage can hold the address parameter.
- */
- PJ_ASSERT_RETURN(addrlen <= sizeof(pj_sockaddr_in), PJ_EBUG);
-
- /*
- * Schedule asynchronous send.
- */
- write_op = (struct write_operation*)op_key;
- write_op->op = PJ_IOQUEUE_OP_SEND_TO;
- write_op->buf = NULL;
- write_op->size = *length;
- write_op->written = 0;
- write_op->flags = flags;
- pj_memcpy(&write_op->rmt_addr, addr, addrlen);
- write_op->rmt_addrlen = addrlen;
-
- pj_mutex_lock(key->mutex);
- pj_list_insert_before(&key->write_list, write_op);
- ioqueue_add_to_set(key->ioqueue, key->fd, WRITEABLE_EVENT);
- pj_mutex_unlock(key->mutex);
-
- return PJ_EPENDING;
-}
-
-#if PJ_HAS_TCP
-/*
- * Initiate overlapped accept() operation.
- */
-PJ_DEF(pj_status_t) pj_ioqueue_accept( pj_ioqueue_key_t *key,
- pj_ioqueue_op_key_t *op_key,
- pj_sock_t *new_sock,
- pj_sockaddr_t *local,
- pj_sockaddr_t *remote,
- int *addrlen)
-{
- struct accept_operation *accept_op;
- pj_status_t status;
-
- /* check parameters. All must be specified! */
- PJ_ASSERT_RETURN(key && op_key && new_sock, PJ_EINVAL);
-
- /* Fast track:
- * See if there's new connection available immediately.
- */
- if (pj_list_empty(&key->accept_list)) {
- status = pj_sock_accept(key->fd, new_sock, remote, addrlen);
- if (status == PJ_SUCCESS) {
- /* Yes! New connection is available! */
- if (local && addrlen) {
- status = pj_sock_getsockname(*new_sock, local, addrlen);
- if (status != PJ_SUCCESS) {
- pj_sock_close(*new_sock);
- *new_sock = PJ_INVALID_SOCKET;
- return status;
- }
- }
- return PJ_SUCCESS;
- } else {
- /* If error is not EWOULDBLOCK (or EAGAIN on Linux), report
- * the error to caller.
- */
- if (status != PJ_STATUS_FROM_OS(PJ_BLOCKING_ERROR_VAL)) {
- return status;
- }
- }
- }
-
- /*
- * No connection is available immediately.
- * Schedule accept() operation to be completed when there is incoming
- * connection available.
- */
- accept_op = (struct accept_operation*)op_key;
-
- accept_op->op = PJ_IOQUEUE_OP_ACCEPT;
- accept_op->accept_fd = new_sock;
- accept_op->rmt_addr = remote;
- accept_op->addrlen= addrlen;
- accept_op->local_addr = local;
-
- pj_mutex_lock(key->mutex);
- pj_list_insert_before(&key->accept_list, accept_op);
- ioqueue_add_to_set(key->ioqueue, key->fd, READABLE_EVENT);
- pj_mutex_unlock(key->mutex);
-
- return PJ_EPENDING;
-}
-
-/*
- * Initiate overlapped connect() operation (well, it's non-blocking actually,
- * since there's no overlapped version of connect()).
- */
-PJ_DEF(pj_status_t) pj_ioqueue_connect( pj_ioqueue_key_t *key,
- const pj_sockaddr_t *addr,
- int addrlen )
-{
- pj_status_t status;
-
- /* check parameters. All must be specified! */
- PJ_ASSERT_RETURN(key && addr && addrlen, PJ_EINVAL);
-
- /* Check if socket has not been marked for connecting */
- if (key->connecting != 0)
- return PJ_EPENDING;
-
- status = pj_sock_connect(key->fd, addr, addrlen);
- if (status == PJ_SUCCESS) {
- /* Connected! */
- return PJ_SUCCESS;
- } else {
- if (status == PJ_STATUS_FROM_OS(PJ_BLOCKING_CONNECT_ERROR_VAL)) {
- /* Pending! */
- pj_mutex_lock(key->mutex);
- key->connecting = PJ_TRUE;
- ioqueue_add_to_set(key->ioqueue, key->fd, WRITEABLE_EVENT);
- ioqueue_add_to_set(key->ioqueue, key->fd, EXCEPTION_EVENT);
- pj_mutex_unlock(key->mutex);
- return PJ_EPENDING;
- } else {
- /* Error! */
- return status;
- }
- }
-}
-#endif /* PJ_HAS_TCP */
-
+/* $Id$ */ + +/* + * ioqueue_common_abs.c + * + * This contains common functionalities to emulate proactor pattern with + * various event dispatching mechanisms (e.g. select, epoll). + * + * This file will be included by the appropriate ioqueue implementation. + * This file is NOT supposed to be compiled as stand-alone source. + */ + +static void ioqueue_init( pj_ioqueue_t *ioqueue ) +{ + ioqueue->lock = NULL; + ioqueue->auto_delete_lock = 0; +} + +static pj_status_t ioqueue_destroy(pj_ioqueue_t *ioqueue) +{ + if (ioqueue->auto_delete_lock && ioqueue->lock ) + return pj_lock_destroy(ioqueue->lock); + else + return PJ_SUCCESS; +} + +/* + * pj_ioqueue_set_lock() + */ +PJ_DEF(pj_status_t) pj_ioqueue_set_lock( pj_ioqueue_t *ioqueue, + pj_lock_t *lock, + pj_bool_t auto_delete ) +{ + PJ_ASSERT_RETURN(ioqueue && lock, PJ_EINVAL); + + if (ioqueue->auto_delete_lock && ioqueue->lock) { + pj_lock_destroy(ioqueue->lock); + } + + ioqueue->lock = lock; + ioqueue->auto_delete_lock = auto_delete; + + return PJ_SUCCESS; +} + +static pj_status_t ioqueue_init_key( pj_pool_t *pool, + pj_ioqueue_t *ioqueue, + pj_ioqueue_key_t *key, + pj_sock_t sock, + void *user_data, + const pj_ioqueue_callback *cb) +{ + pj_status_t rc; + int optlen; + + key->ioqueue = ioqueue; + key->fd = sock; + key->user_data = user_data; + pj_list_init(&key->read_list); + pj_list_init(&key->write_list); +#if PJ_HAS_TCP + pj_list_init(&key->accept_list); +#endif + + /* Save callback. */ + pj_memcpy(&key->cb, cb, sizeof(pj_ioqueue_callback)); + + /* Get socket type. When socket type is datagram, some optimization + * will be performed during send to allow parallel send operations. + */ + optlen = sizeof(key->fd_type); + rc = pj_sock_getsockopt(sock, PJ_SOL_SOCKET, PJ_SO_TYPE, + &key->fd_type, &optlen); + if (rc != PJ_SUCCESS) + key->fd_type = PJ_SOCK_STREAM; + + /* Create mutex for the key. */ + rc = pj_mutex_create_simple(pool, NULL, &key->mutex); + + return rc; +} + +static void ioqueue_destroy_key( pj_ioqueue_key_t *key ) +{ + pj_mutex_destroy(key->mutex); +} + +/* + * pj_ioqueue_get_user_data() + * + * Obtain value associated with a key. + */ +PJ_DEF(void*) pj_ioqueue_get_user_data( pj_ioqueue_key_t *key ) +{ + PJ_ASSERT_RETURN(key != NULL, NULL); + return key->user_data; +} + +/* + * pj_ioqueue_set_user_data() + */ +PJ_DEF(pj_status_t) pj_ioqueue_set_user_data( pj_ioqueue_key_t *key, + void *user_data, + void **old_data) +{ + PJ_ASSERT_RETURN(key, PJ_EINVAL); + + if (old_data) + *old_data = key->user_data; + key->user_data = user_data; + + return PJ_SUCCESS; +} + +PJ_INLINE(int) key_has_pending_write(pj_ioqueue_key_t *key) +{ + return !pj_list_empty(&key->write_list); +} + +PJ_INLINE(int) key_has_pending_read(pj_ioqueue_key_t *key) +{ + return !pj_list_empty(&key->read_list); +} + +PJ_INLINE(int) key_has_pending_accept(pj_ioqueue_key_t *key) +{ +#if PJ_HAS_TCP + return !pj_list_empty(&key->accept_list); +#else + return 0; +#endif +} + +PJ_INLINE(int) key_has_pending_connect(pj_ioqueue_key_t *key) +{ + return key->connecting; +} + + +/* + * ioqueue_dispatch_event() + * + * Report occurence of an event in the key to be processed by the + * framework. + */ +void ioqueue_dispatch_write_event(pj_ioqueue_t *ioqueue, pj_ioqueue_key_t *h) +{ + /* Lock the key. */ + pj_mutex_lock(h->mutex); + +#if defined(PJ_HAS_TCP) && PJ_HAS_TCP!=0 + if (h->connecting) { + /* Completion of connect() operation */ + pj_ssize_t bytes_transfered; + + /* Clear operation. */ + h->connecting = 0; + + ioqueue_remove_from_set(ioqueue, h->fd, WRITEABLE_EVENT); + ioqueue_remove_from_set(ioqueue, h->fd, EXCEPTION_EVENT); + + /* Unlock; from this point we don't need to hold key's mutex. */ + pj_mutex_unlock(h->mutex); + +#if (defined(PJ_HAS_SO_ERROR) && PJ_HAS_SO_ERROR!=0) + /* from connect(2): + * On Linux, use getsockopt to read the SO_ERROR option at + * level SOL_SOCKET to determine whether connect() completed + * successfully (if SO_ERROR is zero). + */ + int value; + socklen_t vallen = sizeof(value); + int gs_rc = getsockopt(h->fd, SOL_SOCKET, SO_ERROR, + &value, &vallen); + if (gs_rc != 0) { + /* Argh!! What to do now??? + * Just indicate that the socket is connected. The + * application will get error as soon as it tries to use + * the socket to send/receive. + */ + bytes_transfered = 0; + } else { + bytes_transfered = value; + } +#elif defined(PJ_WIN32) && PJ_WIN32!=0 + bytes_transfered = 0; /* success */ +#else + /* Excellent information in D.J. Bernstein page: + * http://cr.yp.to/docs/connect.html + * + * Seems like the most portable way of detecting connect() + * failure is to call getpeername(). If socket is connected, + * getpeername() will return 0. If the socket is not connected, + * it will return ENOTCONN, and read(fd, &ch, 1) will produce + * the right errno through error slippage. This is a combination + * of suggestions from Douglas C. Schmidt and Ken Keys. + */ + int gp_rc; + struct sockaddr_in addr; + socklen_t addrlen = sizeof(addr); + + gp_rc = getpeername(h->fd, (struct sockaddr*)&addr, &addrlen); + bytes_transfered = gp_rc; +#endif + + /* Call callback. */ + if (h->cb.on_connect_complete) + (*h->cb.on_connect_complete)(h, bytes_transfered); + + /* Done. */ + + } else +#endif /* PJ_HAS_TCP */ + if (key_has_pending_write(h)) { + /* Socket is writable. */ + struct write_operation *write_op; + pj_ssize_t sent; + pj_status_t send_rc; + + /* Get the first in the queue. */ + write_op = h->write_list.next; + + /* For datagrams, we can remove the write_op from the list + * so that send() can work in parallel. + */ + if (h->fd_type == PJ_SOCK_DGRAM) { + pj_list_erase(write_op); + if (pj_list_empty(&h->write_list)) + ioqueue_remove_from_set(ioqueue, h->fd, WRITEABLE_EVENT); + + pj_mutex_unlock(h->mutex); + } + + /* Send the data. + * Unfortunately we must do this while holding key's mutex, thus + * preventing parallel write on a single key.. :-(( + */ + sent = write_op->size - write_op->written; + if (write_op->op == PJ_IOQUEUE_OP_SEND) { + send_rc = pj_sock_send(h->fd, write_op->buf+write_op->written, + &sent, write_op->flags); + } else if (write_op->op == PJ_IOQUEUE_OP_SEND_TO) { + send_rc = pj_sock_sendto(h->fd, + write_op->buf+write_op->written, + &sent, write_op->flags, + &write_op->rmt_addr, + write_op->rmt_addrlen); + } else { + pj_assert(!"Invalid operation type!"); + send_rc = PJ_EBUG; + } + + if (send_rc == PJ_SUCCESS) { + write_op->written += sent; + } else { + pj_assert(send_rc > 0); + write_op->written = -send_rc; + } + + /* Are we finished with this buffer? */ + if (send_rc!=PJ_SUCCESS || + write_op->written == (pj_ssize_t)write_op->size || + h->fd_type == PJ_SOCK_DGRAM) + { + if (h->fd_type != PJ_SOCK_DGRAM) { + /* Write completion of the whole stream. */ + pj_list_erase(write_op); + + /* Clear operation if there's no more data to send. */ + if (pj_list_empty(&h->write_list)) + ioqueue_remove_from_set(ioqueue, h->fd, WRITEABLE_EVENT); + + /* No need to hold mutex anymore */ + pj_mutex_unlock(h->mutex); + } + + /* Call callback. */ + if (h->cb.on_write_complete) { + (*h->cb.on_write_complete)(h, + (pj_ioqueue_op_key_t*)write_op, + write_op->written); + } + + } else { + pj_mutex_unlock(h->mutex); + } + + /* Done. */ + } else { + pj_assert(!"Descriptor is signaled but key " + "has no pending operation!"); + + pj_mutex_unlock(h->mutex); + } +} + +void ioqueue_dispatch_read_event( pj_ioqueue_t *ioqueue, pj_ioqueue_key_t *h ) +{ + pj_status_t rc; + + /* Lock the key. */ + pj_mutex_lock(h->mutex); + +# if PJ_HAS_TCP + if (!pj_list_empty(&h->accept_list)) { + + struct accept_operation *accept_op; + + /* Get one accept operation from the list. */ + accept_op = h->accept_list.next; + pj_list_erase(accept_op); + + /* Clear bit in fdset if there is no more pending accept */ + if (pj_list_empty(&h->accept_list)) + ioqueue_remove_from_set(ioqueue, h->fd, READABLE_EVENT); + + /* Unlock; from this point we don't need to hold key's mutex. */ + pj_mutex_unlock(h->mutex); + + rc=pj_sock_accept(h->fd, accept_op->accept_fd, + accept_op->rmt_addr, accept_op->addrlen); + if (rc==PJ_SUCCESS && accept_op->local_addr) { + rc = pj_sock_getsockname(*accept_op->accept_fd, + accept_op->local_addr, + accept_op->addrlen); + } + + /* Call callback. */ + if (h->cb.on_accept_complete) { + (*h->cb.on_accept_complete)(h, + (pj_ioqueue_op_key_t*)accept_op, + *accept_op->accept_fd, rc); + } + + } + else +# endif + if (key_has_pending_read(h)) { + struct read_operation *read_op; + pj_ssize_t bytes_read; + + /* Get one pending read operation from the list. */ + read_op = h->read_list.next; + pj_list_erase(read_op); + + /* Clear fdset if there is no pending read. */ + if (pj_list_empty(&h->read_list)) + ioqueue_remove_from_set(ioqueue, h->fd, READABLE_EVENT); + + /* Unlock; from this point we don't need to hold key's mutex. */ + pj_mutex_unlock(h->mutex); + + bytes_read = read_op->size; + + if ((read_op->op == PJ_IOQUEUE_OP_RECV_FROM)) { + rc = pj_sock_recvfrom(h->fd, read_op->buf, &bytes_read, 0, + read_op->rmt_addr, + read_op->rmt_addrlen); + } else if ((read_op->op == PJ_IOQUEUE_OP_RECV)) { + rc = pj_sock_recv(h->fd, read_op->buf, &bytes_read, 0); + } else { + pj_assert(read_op->op == PJ_IOQUEUE_OP_READ); + /* + * User has specified pj_ioqueue_read(). + * On Win32, we should do ReadFile(). But because we got + * here because of select() anyway, user must have put a + * socket descriptor on h->fd, which in this case we can + * just call pj_sock_recv() instead of ReadFile(). + * On Unix, user may put a file in h->fd, so we'll have + * to call read() here. + * This may not compile on systems which doesn't have + * read(). That's why we only specify PJ_LINUX here so + * that error is easier to catch. + */ +# if defined(PJ_WIN32) && PJ_WIN32 != 0 + rc = pj_sock_recv(h->fd, read_op->buf, &bytes_read, 0); + //rc = ReadFile((HANDLE)h->fd, read_op->buf, read_op->size, + // &bytes_read, NULL); +# elif (defined(PJ_HAS_UNISTD_H) && PJ_HAS_UNISTD_H != 0) + bytes_read = read(h->fd, read_op->buf, bytes_read); + rc = (bytes_read >= 0) ? PJ_SUCCESS : pj_get_os_error(); +# elif defined(PJ_LINUX_KERNEL) && PJ_LINUX_KERNEL != 0 + bytes_read = sys_read(h->fd, read_op->buf, bytes_read); + rc = (bytes_read >= 0) ? PJ_SUCCESS : -bytes_read; +# else +# error "Implement read() for this platform!" +# endif + } + + if (rc != PJ_SUCCESS) { +# if defined(PJ_WIN32) && PJ_WIN32 != 0 + /* On Win32, for UDP, WSAECONNRESET on the receive side + * indicates that previous sending has triggered ICMP Port + * Unreachable message. + * But we wouldn't know at this point which one of previous + * key that has triggered the error, since UDP socket can + * be shared! + * So we'll just ignore it! + */ + + if (rc == PJ_STATUS_FROM_OS(WSAECONNRESET)) { + //PJ_LOG(4,(THIS_FILE, + // "Ignored ICMP port unreach. on key=%p", h)); + } +# endif + + /* In any case we would report this to caller. */ + bytes_read = -rc; + } + + /* Call callback. */ + if (h->cb.on_read_complete) { + (*h->cb.on_read_complete)(h, + (pj_ioqueue_op_key_t*)read_op, + bytes_read); + } + + } else { + pj_mutex_unlock(h->mutex); + } +} + + +void ioqueue_dispatch_exception_event( pj_ioqueue_t *ioqueue, + pj_ioqueue_key_t *h ) +{ + pj_mutex_lock(h->mutex); + + if (!h->connecting) { + /* It is possible that more than one thread was woken up, thus + * the remaining thread will see h->connecting as zero because + * it has been processed by other thread. + */ + pj_mutex_unlock(h->mutex); + return; + } + + /* Clear operation. */ + h->connecting = 0; + + pj_mutex_unlock(h->mutex); + + ioqueue_remove_from_set(ioqueue, h->fd, WRITEABLE_EVENT); + ioqueue_remove_from_set(ioqueue, h->fd, EXCEPTION_EVENT); + + /* Call callback. */ + if (h->cb.on_connect_complete) + (*h->cb.on_connect_complete)(h, -1); +} + +/* + * pj_ioqueue_recv() + * + * Start asynchronous recv() from the socket. + */ +PJ_DEF(pj_status_t) pj_ioqueue_recv( pj_ioqueue_key_t *key, + pj_ioqueue_op_key_t *op_key, + void *buffer, + pj_ssize_t *length, + unsigned flags ) +{ + pj_status_t status; + pj_ssize_t size; + struct read_operation *read_op; + + PJ_ASSERT_RETURN(key && op_key && buffer && length, PJ_EINVAL); + PJ_CHECK_STACK(); + + /* Try to see if there's data immediately available. + */ + size = *length; + status = pj_sock_recv(key->fd, buffer, &size, flags); + if (status == PJ_SUCCESS) { + /* Yes! Data is available! */ + *length = size; + return PJ_SUCCESS; + } else { + /* If error is not EWOULDBLOCK (or EAGAIN on Linux), report + * the error to caller. + */ + if (status != PJ_STATUS_FROM_OS(PJ_BLOCKING_ERROR_VAL)) + return status; + } + + /* + * No data is immediately available. + * Must schedule asynchronous operation to the ioqueue. + */ + read_op = (struct read_operation*)op_key; + + read_op->op = PJ_IOQUEUE_OP_RECV; + read_op->buf = buffer; + read_op->size = *length; + read_op->flags = flags; + + pj_mutex_lock(key->mutex); + pj_list_insert_before(&key->read_list, read_op); + ioqueue_add_to_set(key->ioqueue, key->fd, READABLE_EVENT); + pj_mutex_unlock(key->mutex); + + return PJ_EPENDING; +} + +/* + * pj_ioqueue_recvfrom() + * + * Start asynchronous recvfrom() from the socket. + */ +PJ_DEF(pj_status_t) pj_ioqueue_recvfrom( pj_ioqueue_key_t *key, + pj_ioqueue_op_key_t *op_key, + void *buffer, + pj_ssize_t *length, + unsigned flags, + pj_sockaddr_t *addr, + int *addrlen) +{ + pj_status_t status; + pj_ssize_t size; + struct read_operation *read_op; + + PJ_ASSERT_RETURN(key && op_key && buffer && length, PJ_EINVAL); + PJ_CHECK_STACK(); + + /* Try to see if there's data immediately available. + */ + size = *length; + status = pj_sock_recvfrom(key->fd, buffer, &size, flags, + addr, addrlen); + if (status == PJ_SUCCESS) { + /* Yes! Data is available! */ + *length = size; + return PJ_SUCCESS; + } else { + /* If error is not EWOULDBLOCK (or EAGAIN on Linux), report + * the error to caller. + */ + if (status != PJ_STATUS_FROM_OS(PJ_BLOCKING_ERROR_VAL)) + return status; + } + + /* + * No data is immediately available. + * Must schedule asynchronous operation to the ioqueue. + */ + read_op = (struct read_operation*)op_key; + + read_op->op = PJ_IOQUEUE_OP_RECV_FROM; + read_op->buf = buffer; + read_op->size = *length; + read_op->flags = flags; + read_op->rmt_addr = addr; + read_op->rmt_addrlen = addrlen; + + pj_mutex_lock(key->mutex); + pj_list_insert_before(&key->read_list, read_op); + ioqueue_add_to_set(key->ioqueue, key->fd, READABLE_EVENT); + pj_mutex_unlock(key->mutex); + + return PJ_EPENDING; +} + +/* + * pj_ioqueue_send() + * + * Start asynchronous send() to the descriptor. + */ +PJ_DEF(pj_status_t) pj_ioqueue_send( pj_ioqueue_key_t *key, + pj_ioqueue_op_key_t *op_key, + const void *data, + pj_ssize_t *length, + unsigned flags) +{ + struct write_operation *write_op; + pj_status_t status; + pj_ssize_t sent; + + PJ_ASSERT_RETURN(key && op_key && data && length, PJ_EINVAL); + PJ_CHECK_STACK(); + + /* Fast track: + * Try to send data immediately, only if there's no pending write! + * Note: + * We are speculating that the list is empty here without properly + * acquiring ioqueue's mutex first. This is intentional, to maximize + * performance via parallelism. + * + * This should be safe, because: + * - by convention, we require caller to make sure that the + * key is not unregistered while other threads are invoking + * an operation on the same key. + * - pj_list_empty() is safe to be invoked by multiple threads, + * even when other threads are modifying the list. + */ + if (pj_list_empty(&key->write_list)) { + /* + * See if data can be sent immediately. + */ + sent = *length; + status = pj_sock_send(key->fd, data, &sent, flags); + if (status == PJ_SUCCESS) { + /* Success! */ + *length = sent; + return PJ_SUCCESS; + } else { + /* If error is not EWOULDBLOCK (or EAGAIN on Linux), report + * the error to caller. + */ + if (status != PJ_STATUS_FROM_OS(PJ_BLOCKING_ERROR_VAL)) { + return status; + } + } + } + + /* + * Schedule asynchronous send. + */ + write_op = (struct write_operation*)op_key; + write_op->op = PJ_IOQUEUE_OP_SEND; + write_op->buf = NULL; + write_op->size = *length; + write_op->written = 0; + write_op->flags = flags; + + pj_mutex_lock(key->mutex); + pj_list_insert_before(&key->write_list, write_op); + ioqueue_add_to_set(key->ioqueue, key->fd, WRITEABLE_EVENT); + pj_mutex_unlock(key->mutex); + + return PJ_EPENDING; +} + + +/* + * pj_ioqueue_sendto() + * + * Start asynchronous write() to the descriptor. + */ +PJ_DEF(pj_status_t) pj_ioqueue_sendto( pj_ioqueue_key_t *key, + pj_ioqueue_op_key_t *op_key, + const void *data, + pj_ssize_t *length, + unsigned flags, + const pj_sockaddr_t *addr, + int addrlen) +{ + struct write_operation *write_op; + pj_status_t status; + pj_ssize_t sent; + + PJ_ASSERT_RETURN(key && op_key && data && length, PJ_EINVAL); + PJ_CHECK_STACK(); + + /* Fast track: + * Try to send data immediately, only if there's no pending write! + * Note: + * We are speculating that the list is empty here without properly + * acquiring ioqueue's mutex first. This is intentional, to maximize + * performance via parallelism. + * + * This should be safe, because: + * - by convention, we require caller to make sure that the + * key is not unregistered while other threads are invoking + * an operation on the same key. + * - pj_list_empty() is safe to be invoked by multiple threads, + * even when other threads are modifying the list. + */ + if (pj_list_empty(&key->write_list)) { + /* + * See if data can be sent immediately. + */ + sent = *length; + status = pj_sock_sendto(key->fd, data, &sent, flags, addr, addrlen); + if (status == PJ_SUCCESS) { + /* Success! */ + *length = sent; + return PJ_SUCCESS; + } else { + /* If error is not EWOULDBLOCK (or EAGAIN on Linux), report + * the error to caller. + */ + if (status != PJ_STATUS_FROM_OS(PJ_BLOCKING_ERROR_VAL)) { + return status; + } + } + } + + /* + * Check that address storage can hold the address parameter. + */ + PJ_ASSERT_RETURN(addrlen <= sizeof(pj_sockaddr_in), PJ_EBUG); + + /* + * Schedule asynchronous send. + */ + write_op = (struct write_operation*)op_key; + write_op->op = PJ_IOQUEUE_OP_SEND_TO; + write_op->buf = NULL; + write_op->size = *length; + write_op->written = 0; + write_op->flags = flags; + pj_memcpy(&write_op->rmt_addr, addr, addrlen); + write_op->rmt_addrlen = addrlen; + + pj_mutex_lock(key->mutex); + pj_list_insert_before(&key->write_list, write_op); + ioqueue_add_to_set(key->ioqueue, key->fd, WRITEABLE_EVENT); + pj_mutex_unlock(key->mutex); + + return PJ_EPENDING; +} + +#if PJ_HAS_TCP +/* + * Initiate overlapped accept() operation. + */ +PJ_DEF(pj_status_t) pj_ioqueue_accept( pj_ioqueue_key_t *key, + pj_ioqueue_op_key_t *op_key, + pj_sock_t *new_sock, + pj_sockaddr_t *local, + pj_sockaddr_t *remote, + int *addrlen) +{ + struct accept_operation *accept_op; + pj_status_t status; + + /* check parameters. All must be specified! */ + PJ_ASSERT_RETURN(key && op_key && new_sock, PJ_EINVAL); + + /* Fast track: + * See if there's new connection available immediately. + */ + if (pj_list_empty(&key->accept_list)) { + status = pj_sock_accept(key->fd, new_sock, remote, addrlen); + if (status == PJ_SUCCESS) { + /* Yes! New connection is available! */ + if (local && addrlen) { + status = pj_sock_getsockname(*new_sock, local, addrlen); + if (status != PJ_SUCCESS) { + pj_sock_close(*new_sock); + *new_sock = PJ_INVALID_SOCKET; + return status; + } + } + return PJ_SUCCESS; + } else { + /* If error is not EWOULDBLOCK (or EAGAIN on Linux), report + * the error to caller. + */ + if (status != PJ_STATUS_FROM_OS(PJ_BLOCKING_ERROR_VAL)) { + return status; + } + } + } + + /* + * No connection is available immediately. + * Schedule accept() operation to be completed when there is incoming + * connection available. + */ + accept_op = (struct accept_operation*)op_key; + + accept_op->op = PJ_IOQUEUE_OP_ACCEPT; + accept_op->accept_fd = new_sock; + accept_op->rmt_addr = remote; + accept_op->addrlen= addrlen; + accept_op->local_addr = local; + + pj_mutex_lock(key->mutex); + pj_list_insert_before(&key->accept_list, accept_op); + ioqueue_add_to_set(key->ioqueue, key->fd, READABLE_EVENT); + pj_mutex_unlock(key->mutex); + + return PJ_EPENDING; +} + +/* + * Initiate overlapped connect() operation (well, it's non-blocking actually, + * since there's no overlapped version of connect()). + */ +PJ_DEF(pj_status_t) pj_ioqueue_connect( pj_ioqueue_key_t *key, + const pj_sockaddr_t *addr, + int addrlen ) +{ + pj_status_t status; + + /* check parameters. All must be specified! */ + PJ_ASSERT_RETURN(key && addr && addrlen, PJ_EINVAL); + + /* Check if socket has not been marked for connecting */ + if (key->connecting != 0) + return PJ_EPENDING; + + status = pj_sock_connect(key->fd, addr, addrlen); + if (status == PJ_SUCCESS) { + /* Connected! */ + return PJ_SUCCESS; + } else { + if (status == PJ_STATUS_FROM_OS(PJ_BLOCKING_CONNECT_ERROR_VAL)) { + /* Pending! */ + pj_mutex_lock(key->mutex); + key->connecting = PJ_TRUE; + ioqueue_add_to_set(key->ioqueue, key->fd, WRITEABLE_EVENT); + ioqueue_add_to_set(key->ioqueue, key->fd, EXCEPTION_EVENT); + pj_mutex_unlock(key->mutex); + return PJ_EPENDING; + } else { + /* Error! */ + return status; + } + } +} +#endif /* PJ_HAS_TCP */ + diff --git a/pjlib/src/pj/ioqueue_common_abs.h b/pjlib/src/pj/ioqueue_common_abs.h index c6fc1ff6..1902ff46 100644 --- a/pjlib/src/pj/ioqueue_common_abs.h +++ b/pjlib/src/pj/ioqueue_common_abs.h @@ -1,107 +1,108 @@ -/* $Id */
-
-/* ioqueue_common_abs.h
- *
- * This file contains private declarations for abstracting various
- * event polling/dispatching mechanisms (e.g. select, poll, epoll)
- * to the ioqueue.
- */
-
-#include <pj/list.h>
-
-/*
- * The select ioqueue relies on socket functions (pj_sock_xxx()) to return
- * the correct error code.
- */
-#if PJ_RETURN_OS_ERROR(100) != PJ_STATUS_FROM_OS(100)
-# error "Error reporting must be enabled for this function to work!"
-#endif
-
-
-struct generic_operation
-{
- PJ_DECL_LIST_MEMBER(struct generic_operation);
- pj_ioqueue_operation_e op;
-};
-
-struct read_operation
-{
- PJ_DECL_LIST_MEMBER(struct read_operation);
- pj_ioqueue_operation_e op;
-
- void *buf;
- pj_size_t size;
- unsigned flags;
- pj_sockaddr_t *rmt_addr;
- int *rmt_addrlen;
-};
-
-struct write_operation
-{
- PJ_DECL_LIST_MEMBER(struct write_operation);
- pj_ioqueue_operation_e op;
-
- char *buf;
- pj_size_t size;
- pj_ssize_t written;
- unsigned flags;
- pj_sockaddr_in rmt_addr;
- int rmt_addrlen;
-};
-
-#if PJ_HAS_TCP
-struct accept_operation
-{
- PJ_DECL_LIST_MEMBER(struct accept_operation);
- pj_ioqueue_operation_e op;
-
- pj_sock_t *accept_fd;
- pj_sockaddr_t *local_addr;
- pj_sockaddr_t *rmt_addr;
- int *addrlen;
-};
-#endif
-
-union operation_key
-{
- struct generic_operation generic;
- struct read_operation read;
- struct write_operation write;
-#if PJ_HAS_TCP
- struct accept_operation accept;
-#endif
-};
-
-#define DECLARE_COMMON_KEY \
- PJ_DECL_LIST_MEMBER(struct pj_ioqueue_key_t); \
- pj_ioqueue_t *ioqueue; \
- pj_mutex_t *mutex; \
- pj_sock_t fd; \
- int fd_type; \
- void *user_data; \
- pj_ioqueue_callback cb; \
- int connecting; \
- struct read_operation read_list; \
- struct write_operation write_list; \
- struct accept_operation accept_list;
-
-
-#define DECLARE_COMMON_IOQUEUE \
- pj_lock_t *lock; \
- pj_bool_t auto_delete_lock;
-
-
-enum ioqueue_event_type
-{
- NO_EVENT,
- READABLE_EVENT,
- WRITEABLE_EVENT,
- EXCEPTION_EVENT,
-};
-
-static void ioqueue_add_to_set( pj_ioqueue_t *ioqueue,
- pj_sock_t fd,
- enum ioqueue_event_type event_type );
-static void ioqueue_remove_from_set( pj_ioqueue_t *ioqueue,
- pj_sock_t fd,
- enum ioqueue_event_type event_type);
+/* $Id */ + +/* ioqueue_common_abs.h + * + * This file contains private declarations for abstracting various + * event polling/dispatching mechanisms (e.g. select, poll, epoll) + * to the ioqueue. + */ + +#include <pj/list.h> + +/* + * The select ioqueue relies on socket functions (pj_sock_xxx()) to return + * the correct error code. + */ +#if PJ_RETURN_OS_ERROR(100) != PJ_STATUS_FROM_OS(100) +# error "Proper error reporting must be enabled for ioqueue to work!" +#endif + + +struct generic_operation +{ + PJ_DECL_LIST_MEMBER(struct generic_operation); + pj_ioqueue_operation_e op; +}; + +struct read_operation +{ + PJ_DECL_LIST_MEMBER(struct read_operation); + pj_ioqueue_operation_e op; + + void *buf; + pj_size_t size; + unsigned flags; + pj_sockaddr_t *rmt_addr; + int *rmt_addrlen; +}; + +struct write_operation +{ + PJ_DECL_LIST_MEMBER(struct write_operation); + pj_ioqueue_operation_e op; + + char *buf; + pj_size_t size; + pj_ssize_t written; + unsigned flags; + pj_sockaddr_in rmt_addr; + int rmt_addrlen; +}; + +#if PJ_HAS_TCP +struct accept_operation +{ + PJ_DECL_LIST_MEMBER(struct accept_operation); + pj_ioqueue_operation_e op; + + pj_sock_t *accept_fd; + pj_sockaddr_t *local_addr; + pj_sockaddr_t *rmt_addr; + int *addrlen; +}; +#endif + +union operation_key +{ + struct generic_operation generic; + struct read_operation read; + struct write_operation write; +#if PJ_HAS_TCP + struct accept_operation accept; +#endif +}; + +#define DECLARE_COMMON_KEY \ + PJ_DECL_LIST_MEMBER(struct pj_ioqueue_key_t); \ + pj_ioqueue_t *ioqueue; \ + pj_mutex_t *mutex; \ + pj_sock_t fd; \ + int fd_type; \ + void *user_data; \ + pj_ioqueue_callback cb; \ + int connecting; \ + struct read_operation read_list; \ + struct write_operation write_list; \ + struct accept_operation accept_list; + + +#define DECLARE_COMMON_IOQUEUE \ + pj_lock_t *lock; \ + pj_bool_t auto_delete_lock; + + +enum ioqueue_event_type +{ + NO_EVENT, + READABLE_EVENT, + WRITEABLE_EVENT, + EXCEPTION_EVENT, +}; + +static void ioqueue_add_to_set( pj_ioqueue_t *ioqueue, + pj_sock_t fd, + enum ioqueue_event_type event_type ); +static void ioqueue_remove_from_set( pj_ioqueue_t *ioqueue, + pj_sock_t fd, + enum ioqueue_event_type event_type); + diff --git a/pjlib/src/pj/ioqueue_epoll.c b/pjlib/src/pj/ioqueue_epoll.c index 24f9bfbb..aa012531 100644 --- a/pjlib/src/pj/ioqueue_epoll.c +++ b/pjlib/src/pj/ioqueue_epoll.c @@ -1,5 +1,4 @@ /* $Id$ - * */ /* * ioqueue_epoll.c @@ -30,7 +29,7 @@ # define epoll_data data.ptr # define epoll_data_type void* -# define ioctl_val_type unsigned long* +# define ioctl_val_type unsigned long # define getsockopt_val_ptr int* # define os_getsockopt getsockopt # define os_ioctl ioctl @@ -126,51 +125,20 @@ #define THIS_FILE "ioq_epoll" -#define PJ_IOQUEUE_IS_READ_OP(op) ((op & PJ_IOQUEUE_OP_READ) || \ - (op & PJ_IOQUEUE_OP_RECV) || \ - (op & PJ_IOQUEUE_OP_RECV_FROM)) -#define PJ_IOQUEUE_IS_WRITE_OP(op) ((op & PJ_IOQUEUE_OP_WRITE) || \ - (op & PJ_IOQUEUE_OP_SEND) || \ - (op & PJ_IOQUEUE_OP_SEND_TO)) - - -#if PJ_HAS_TCP -# define PJ_IOQUEUE_IS_ACCEPT_OP(op) (op & PJ_IOQUEUE_OP_ACCEPT) -# define PJ_IOQUEUE_IS_CONNECT_OP(op) (op & PJ_IOQUEUE_OP_CONNECT) -#else -# define PJ_IOQUEUE_IS_ACCEPT_OP(op) 0 -# define PJ_IOQUEUE_IS_CONNECT_OP(op) 0 -#endif - - //#define TRACE_(expr) PJ_LOG(3,expr) #define TRACE_(expr) +/* + * Include common ioqueue abstraction. + */ +#include "ioqueue_common_abs.h" /* * This describes each key. */ struct pj_ioqueue_key_t { - PJ_DECL_LIST_MEMBER(struct pj_ioqueue_key_t) - pj_sock_t fd; - pj_ioqueue_operation_e op; - void *user_data; - pj_ioqueue_callback cb; - - void *rd_buf; - unsigned rd_flags; - pj_size_t rd_buflen; - void *wr_buf; - pj_size_t wr_buflen; - - pj_sockaddr_t *rmt_addr; - int *rmt_addrlen; - - pj_sockaddr_t *local_addr; - int *local_addrlen; - - pj_sock_t *accept_fd; + DECLARE_COMMON_KEY }; /* @@ -178,13 +146,18 @@ struct pj_ioqueue_key_t */ struct pj_ioqueue_t { - pj_lock_t *lock; - pj_bool_t auto_delete_lock; + DECLARE_COMMON_IOQUEUE + unsigned max, count; pj_ioqueue_key_t hlist; int epfd; }; +/* Include implementation for common abstraction after we declare + * pj_ioqueue_key_t and pj_ioqueue_t. + */ +#include "ioqueue_common_abs.c" + /* * pj_ioqueue_create() * @@ -192,37 +165,45 @@ struct pj_ioqueue_t */ PJ_DEF(pj_status_t) pj_ioqueue_create( pj_pool_t *pool, pj_size_t max_fd, - int max_threads, pj_ioqueue_t **p_ioqueue) { - pj_ioqueue_t *ioque; + pj_ioqueue_t *ioqueue; pj_status_t rc; + pj_lock_t *lock; - PJ_UNUSED_ARG(max_threads); + /* Check that arguments are valid. */ + PJ_ASSERT_RETURN(pool != NULL && p_ioqueue != NULL && + max_fd > 0, PJ_EINVAL); - if (max_fd > PJ_IOQUEUE_MAX_HANDLES) { - pj_assert(!"max_fd too large"); - return PJ_EINVAL; - } + /* Check that size of pj_ioqueue_op_key_t is sufficient */ + PJ_ASSERT_RETURN(sizeof(pj_ioqueue_op_key_t)-sizeof(void*) >= + sizeof(union operation_key), PJ_EBUG); + + ioqueue = pj_pool_alloc(pool, sizeof(pj_ioqueue_t)); + + ioqueue_init(ioqueue); - ioque = pj_pool_alloc(pool, sizeof(pj_ioqueue_t)); - ioque->max = max_fd; - ioque->count = 0; - pj_list_init(&ioque->hlist); + ioqueue->max = max_fd; + ioqueue->count = 0; + pj_list_init(&ioqueue->hlist); - rc = pj_lock_create_recursive_mutex(pool, "ioq%p", &ioque->lock); + rc = pj_lock_create_simple_mutex(pool, "ioq%p", &lock); if (rc != PJ_SUCCESS) return rc; - ioque->auto_delete_lock = PJ_TRUE; - ioque->epfd = os_epoll_create(max_fd); - if (ioque->epfd < 0) { + rc = pj_ioqueue_set_lock(ioqueue, lock, PJ_TRUE); + if (rc != PJ_SUCCESS) + return rc; + + ioqueue->epfd = os_epoll_create(max_fd); + if (ioqueue->epfd < 0) { + ioqueue_destroy(ioqueue); return PJ_RETURN_OS_ERROR(pj_get_native_os_error()); } - PJ_LOG(4, ("pjlib", "select() I/O Queue created (%p)", ioque)); + PJ_LOG(4, ("pjlib", "epoll I/O Queue created (%p)", ioqueue)); - *p_ioqueue = ioque; + *p_ioqueue = ioqueue; return PJ_SUCCESS; } @@ -231,47 +212,24 @@ PJ_DEF(pj_status_t) pj_ioqueue_create( pj_pool_t *pool, * * Destroy ioqueue. */ -PJ_DEF(pj_status_t) pj_ioqueue_destroy(pj_ioqueue_t *ioque) +PJ_DEF(pj_status_t) pj_ioqueue_destroy(pj_ioqueue_t *ioqueue) { - PJ_ASSERT_RETURN(ioque, PJ_EINVAL); - PJ_ASSERT_RETURN(ioque->epfd > 0, PJ_EINVALIDOP); - - pj_lock_acquire(ioque->lock); - os_close(ioque->epfd); - ioque->epfd = 0; - if (ioque->auto_delete_lock) - pj_lock_destroy(ioque->lock); - - return PJ_SUCCESS; -} - -/* - * pj_ioqueue_set_lock() - */ -PJ_DEF(pj_status_t) pj_ioqueue_set_lock( pj_ioqueue_t *ioque, - pj_lock_t *lock, - pj_bool_t auto_delete ) -{ - PJ_ASSERT_RETURN(ioque && lock, PJ_EINVAL); - - if (ioque->auto_delete_lock) { - pj_lock_destroy(ioque->lock); - } - - ioque->lock = lock; - ioque->auto_delete_lock = auto_delete; + PJ_ASSERT_RETURN(ioqueue, PJ_EINVAL); + PJ_ASSERT_RETURN(ioqueue->epfd > 0, PJ_EINVALIDOP); - return PJ_SUCCESS; + pj_lock_acquire(ioqueue->lock); + os_close(ioqueue->epfd); + ioqueue->epfd = 0; + return ioqueue_destroy(ioqueue); } - /* * pj_ioqueue_register_sock() * * Register a socket to ioqueue. */ PJ_DEF(pj_status_t) pj_ioqueue_register_sock( pj_pool_t *pool, - pj_ioqueue_t *ioque, + pj_ioqueue_t *ioqueue, pj_sock_t sock, void *user_data, const pj_ioqueue_callback *cb, @@ -283,12 +241,12 @@ PJ_DEF(pj_status_t) pj_ioqueue_register_sock( pj_pool_t *pool, int status; pj_status_t rc = PJ_SUCCESS; - PJ_ASSERT_RETURN(pool && ioque && sock != PJ_INVALID_SOCKET && + PJ_ASSERT_RETURN(pool && ioqueue && sock != PJ_INVALID_SOCKET && cb && p_key, PJ_EINVAL); - pj_lock_acquire(ioque->lock); + pj_lock_acquire(ioqueue->lock); - if (ioque->count >= ioque->max) { + if (ioqueue->count >= ioqueue->max) { rc = PJ_ETOOMANY; TRACE_((THIS_FILE, "pj_ioqueue_register_sock error: too many files")); goto on_return; @@ -305,16 +263,19 @@ PJ_DEF(pj_status_t) pj_ioqueue_register_sock( pj_pool_t *pool, /* Create key. */ key = (pj_ioqueue_key_t*)pj_pool_zalloc(pool, sizeof(pj_ioqueue_key_t)); - key->fd = sock; - key->user_data = user_data; - pj_memcpy(&key->cb, cb, sizeof(pj_ioqueue_callback)); + rc = ioqueue_init_key(pool, ioqueue, key, sock, user_data, cb); + if (rc != PJ_SUCCESS) { + key = NULL; + goto on_return; + } /* os_epoll_ctl. */ ev.events = EPOLLIN | EPOLLOUT | EPOLLERR; ev.epoll_data = (epoll_data_type)key; - status = os_epoll_ctl(ioque->epfd, EPOLL_CTL_ADD, sock, &ev); + status = os_epoll_ctl(ioqueue->epfd, EPOLL_CTL_ADD, sock, &ev); if (status < 0) { rc = pj_get_os_error(); + key = NULL; TRACE_((THIS_FILE, "pj_ioqueue_register_sock error: os_epoll_ctl rc=%d", status)); @@ -322,12 +283,12 @@ PJ_DEF(pj_status_t) pj_ioqueue_register_sock( pj_pool_t *pool, } /* Register */ - pj_list_insert_before(&ioque->hlist, key); - ++ioque->count; + pj_list_insert_before(&ioqueue->hlist, key); + ++ioqueue->count; on_return: *p_key = key; - pj_lock_release(ioque->lock); + pj_lock_release(ioqueue->lock); return rc; } @@ -337,179 +298,116 @@ on_return: * * Unregister handle from ioqueue. */ -PJ_DEF(pj_status_t) pj_ioqueue_unregister( pj_ioqueue_t *ioque, - pj_ioqueue_key_t *key) +PJ_DEF(pj_status_t) pj_ioqueue_unregister( pj_ioqueue_key_t *key) { + pj_ioqueue_t *ioqueue; struct epoll_event ev; int status; - PJ_ASSERT_RETURN(ioque && key, PJ_EINVAL); + PJ_ASSERT_RETURN(key != NULL, PJ_EINVAL); - pj_lock_acquire(ioque->lock); + ioqueue = key->ioqueue; + pj_lock_acquire(ioqueue->lock); - pj_assert(ioque->count > 0); - --ioque->count; + pj_assert(ioqueue->count > 0); + --ioqueue->count; pj_list_erase(key); ev.events = 0; ev.epoll_data = (epoll_data_type)key; - status = os_epoll_ctl( ioque->epfd, EPOLL_CTL_DEL, key->fd, &ev); + status = os_epoll_ctl( ioqueue->epfd, EPOLL_CTL_DEL, key->fd, &ev); if (status != 0) { pj_status_t rc = pj_get_os_error(); - pj_lock_release(ioque->lock); + pj_lock_release(ioqueue->lock); return rc; } - pj_lock_release(ioque->lock); + pj_lock_release(ioqueue->lock); + + /* Destroy the key. */ + ioqueue_destroy_key(key); + return PJ_SUCCESS; } -/* - * pj_ioqueue_get_user_data() - * - * Obtain value associated with a key. +/* ioqueue_remove_from_set() + * This function is called from ioqueue_dispatch_event() to instruct + * the ioqueue to remove the specified descriptor from ioqueue's descriptor + * set for the specified event. */ -PJ_DEF(void*) pj_ioqueue_get_user_data( pj_ioqueue_key_t *key ) +static void ioqueue_remove_from_set( pj_ioqueue_t *ioqueue, + pj_sock_t fd, + enum ioqueue_event_type event_type) { - PJ_ASSERT_RETURN(key != NULL, NULL); - return key->user_data; } +/* + * ioqueue_add_to_set() + * This function is called from pj_ioqueue_recv(), pj_ioqueue_send() etc + * to instruct the ioqueue to add the specified handle to ioqueue's descriptor + * set for the specified event. + */ +static void ioqueue_add_to_set( pj_ioqueue_t *ioqueue, + pj_sock_t fd, + enum ioqueue_event_type event_type ) +{ +} /* * pj_ioqueue_poll() * */ -PJ_DEF(int) pj_ioqueue_poll( pj_ioqueue_t *ioque, const pj_time_val *timeout) +PJ_DEF(int) pj_ioqueue_poll( pj_ioqueue_t *ioqueue, const pj_time_val *timeout) { int i, count, processed; - struct epoll_event events[16]; + struct epoll_event events[PJ_IOQUEUE_MAX_EVENTS_IN_SINGLE_POLL]; int msec; + struct queue { + pj_ioqueue_key_t *key; + enum ioqueue_event_type event_type; + } queue[PJ_IOQUEUE_MAX_EVENTS_IN_SINGLE_POLL]; PJ_CHECK_STACK(); msec = timeout ? PJ_TIME_VAL_MSEC(*timeout) : 9000; - count = os_epoll_wait( ioque->epfd, events, PJ_ARRAY_SIZE(events), msec); + count = os_epoll_wait( ioqueue->epfd, events, PJ_ARRAY_SIZE(events), msec); if (count <= 0) return count; /* Lock ioqueue. */ - pj_lock_acquire(ioque->lock); - - processed = 0; + pj_lock_acquire(ioqueue->lock); - for (i=0; i<count; ++i) { + for (processed=0, i=0; i<count; ++i) { pj_ioqueue_key_t *h = (pj_ioqueue_key_t*)(epoll_data_type) events[i].epoll_data; - pj_status_t rc; - - /* - * Check for completion of read operations. - */ - if ((events[i].events & EPOLLIN) && (PJ_IOQUEUE_IS_READ_OP(h->op))) { - pj_ssize_t bytes_read = h->rd_buflen; - - if ((h->op & PJ_IOQUEUE_OP_RECV_FROM)) { - rc = pj_sock_recvfrom( h->fd, h->rd_buf, &bytes_read, 0, - h->rmt_addr, h->rmt_addrlen); - } else if ((h->op & PJ_IOQUEUE_OP_RECV)) { - rc = pj_sock_recv(h->fd, h->rd_buf, &bytes_read, 0); - } else { - bytes_read = os_read( h->fd, h->rd_buf, bytes_read); - rc = (bytes_read >= 0) ? PJ_SUCCESS : pj_get_os_error(); - } - - if (rc != PJ_SUCCESS) { - bytes_read = -rc; - } - - h->op &= ~(PJ_IOQUEUE_OP_READ | PJ_IOQUEUE_OP_RECV | - PJ_IOQUEUE_OP_RECV_FROM); - - /* Call callback. */ - (*h->cb.on_read_complete)(h, bytes_read); - ++processed; - } /* - * Check for completion of accept() operation. + * Check readability. */ - else if ((events[i].events & EPOLLIN) && - (h->op & PJ_IOQUEUE_OP_ACCEPT)) - { - /* accept() must be the only operation specified on - * server socket - */ - pj_assert( h->op == PJ_IOQUEUE_OP_ACCEPT); - - rc = pj_sock_accept( h->fd, h->accept_fd, - h->rmt_addr, h->rmt_addrlen); - if (rc==PJ_SUCCESS && h->local_addr) { - rc = pj_sock_getsockname(*h->accept_fd, h->local_addr, - h->local_addrlen); - } - - h->op &= ~(PJ_IOQUEUE_OP_ACCEPT); - - /* Call callback. */ - (*h->cb.on_accept_complete)(h, *h->accept_fd, rc); - + if ((events[i].events & EPOLLIN) && + (key_has_pending_read(h) || key_has_pending_accept(h))) { + queue[processed].key = h; + queue[processed].event_type = READABLE_EVENT; ++processed; } /* - * Check for completion of write operations. + * Check for writeability. */ - if ((events[i].events & EPOLLOUT) && PJ_IOQUEUE_IS_WRITE_OP(h->op)) { - /* Completion of write(), send(), or sendto() operation. */ - - /* Clear operation. */ - h->op &= ~(PJ_IOQUEUE_OP_WRITE | PJ_IOQUEUE_OP_SEND | - PJ_IOQUEUE_OP_SEND_TO); - - /* Call callback. */ - /* All data must have been sent? */ - (*h->cb.on_write_complete)(h, h->wr_buflen); - + if ((events[i].events & EPOLLOUT) && key_has_pending_write(h)) { + queue[processed].key = h; + queue[processed].event_type = WRITEABLE_EVENT; ++processed; } + #if PJ_HAS_TCP /* * Check for completion of connect() operation. */ - else if ((events[i].events & EPOLLOUT) && - (h->op & PJ_IOQUEUE_OP_CONNECT)) - { - /* Completion of connect() operation */ - pj_ssize_t bytes_transfered; - - /* from connect(2): - * On Linux, use getsockopt to read the SO_ERROR option at - * level SOL_SOCKET to determine whether connect() completed - * successfully (if SO_ERROR is zero). - */ - int value; - socklen_t vallen = sizeof(value); - int gs_rc = os_getsockopt(h->fd, SOL_SOCKET, SO_ERROR, - (getsockopt_val_ptr)&value, &vallen); - if (gs_rc != 0) { - /* Argh!! What to do now??? - * Just indicate that the socket is connected. The - * application will get error as soon as it tries to use - * the socket to send/receive. - */ - bytes_transfered = 0; - } else { - bytes_transfered = value; - } - - /* Clear operation. */ - h->op &= (~PJ_IOQUEUE_OP_CONNECT); - - /* Call callback. */ - (*h->cb.on_connect_complete)(h, bytes_transfered); - + if ((events[i].events & EPOLLOUT) && (h->connecting)) { + queue[processed].key = h; + queue[processed].event_type = WRITEABLE_EVENT; ++processed; } #endif /* PJ_HAS_TCP */ @@ -517,321 +415,32 @@ PJ_DEF(int) pj_ioqueue_poll( pj_ioqueue_t *ioque, const pj_time_val *timeout) /* * Check for error condition. */ - if (events[i].events & EPOLLERR) { - if (h->op & PJ_IOQUEUE_OP_CONNECT) { - h->op &= ~PJ_IOQUEUE_OP_CONNECT; - - /* Call callback. */ - (*h->cb.on_connect_complete)(h, -1); - - ++processed; - } + if (events[i].events & EPOLLERR && (h->connecting)) { + queue[processed].key = h; + queue[processed].event_type = EXCEPTION_EVENT; + ++processed; } } - - pj_lock_release(ioque->lock); - - return processed; -} - -/* - * pj_ioqueue_read() - * - * Start asynchronous read from the descriptor. - */ -PJ_DEF(pj_status_t) pj_ioqueue_read( pj_ioqueue_t *ioque, - pj_ioqueue_key_t *key, - void *buffer, - pj_size_t buflen) -{ - PJ_ASSERT_RETURN(ioque && key && buffer, PJ_EINVAL); - PJ_CHECK_STACK(); - - /* For consistency with other ioqueue implementation, we would reject - * if descriptor has already been submitted for reading before. - */ - PJ_ASSERT_RETURN(((key->op & PJ_IOQUEUE_OP_READ) == 0 && - (key->op & PJ_IOQUEUE_OP_RECV) == 0 && - (key->op & PJ_IOQUEUE_OP_RECV_FROM) == 0), - PJ_EBUSY); - - pj_lock_acquire(ioque->lock); - - key->op |= PJ_IOQUEUE_OP_READ; - key->rd_flags = 0; - key->rd_buf = buffer; - key->rd_buflen = buflen; - - pj_lock_release(ioque->lock); - return PJ_EPENDING; -} - - -/* - * pj_ioqueue_recv() - * - * Start asynchronous recv() from the socket. - */ -PJ_DEF(pj_status_t) pj_ioqueue_recv( pj_ioqueue_t *ioque, - pj_ioqueue_key_t *key, - void *buffer, - pj_size_t buflen, - unsigned flags ) -{ - PJ_ASSERT_RETURN(ioque && key && buffer, PJ_EINVAL); - PJ_CHECK_STACK(); - - /* For consistency with other ioqueue implementation, we would reject - * if descriptor has already been submitted for reading before. - */ - PJ_ASSERT_RETURN(((key->op & PJ_IOQUEUE_OP_READ) == 0 && - (key->op & PJ_IOQUEUE_OP_RECV) == 0 && - (key->op & PJ_IOQUEUE_OP_RECV_FROM) == 0), - PJ_EBUSY); - - pj_lock_acquire(ioque->lock); - - key->op |= PJ_IOQUEUE_OP_RECV; - key->rd_buf = buffer; - key->rd_buflen = buflen; - key->rd_flags = flags; - - pj_lock_release(ioque->lock); - return PJ_EPENDING; -} - -/* - * pj_ioqueue_recvfrom() - * - * Start asynchronous recvfrom() from the socket. - */ -PJ_DEF(pj_status_t) pj_ioqueue_recvfrom( pj_ioqueue_t *ioque, - pj_ioqueue_key_t *key, - void *buffer, - pj_size_t buflen, - unsigned flags, - pj_sockaddr_t *addr, - int *addrlen) -{ - PJ_ASSERT_RETURN(ioque && key && buffer, PJ_EINVAL); - PJ_CHECK_STACK(); - - /* For consistency with other ioqueue implementation, we would reject - * if descriptor has already been submitted for reading before. - */ - PJ_ASSERT_RETURN(((key->op & PJ_IOQUEUE_OP_READ) == 0 && - (key->op & PJ_IOQUEUE_OP_RECV) == 0 && - (key->op & PJ_IOQUEUE_OP_RECV_FROM) == 0), - PJ_EBUSY); - - pj_lock_acquire(ioque->lock); - - key->op |= PJ_IOQUEUE_OP_RECV_FROM; - key->rd_buf = buffer; - key->rd_buflen = buflen; - key->rd_flags = flags; - key->rmt_addr = addr; - key->rmt_addrlen = addrlen; - - pj_lock_release(ioque->lock); - return PJ_EPENDING; -} - -/* - * pj_ioqueue_write() - * - * Start asynchronous write() to the descriptor. - */ -PJ_DEF(pj_status_t) pj_ioqueue_write( pj_ioqueue_t *ioque, - pj_ioqueue_key_t *key, - const void *data, - pj_size_t datalen) -{ - pj_status_t rc; - pj_ssize_t sent; - - PJ_ASSERT_RETURN(ioque && key && data, PJ_EINVAL); - PJ_CHECK_STACK(); - - /* For consistency with other ioqueue implementation, we would reject - * if descriptor has already been submitted for writing before. - */ - PJ_ASSERT_RETURN(((key->op & PJ_IOQUEUE_OP_WRITE) == 0 && - (key->op & PJ_IOQUEUE_OP_SEND) == 0 && - (key->op & PJ_IOQUEUE_OP_SEND_TO) == 0), - PJ_EBUSY); - - sent = datalen; - /* sent would be -1 after pj_sock_send() if it returns error. */ - rc = pj_sock_send(key->fd, data, &sent, 0); - if (rc != PJ_SUCCESS && rc != PJ_STATUS_FROM_OS(OSERR_EWOULDBLOCK)) { - return rc; - } - - pj_lock_acquire(ioque->lock); - - key->op |= PJ_IOQUEUE_OP_WRITE; - key->wr_buf = NULL; - key->wr_buflen = datalen; - - pj_lock_release(ioque->lock); - - return PJ_EPENDING; -} - -/* - * pj_ioqueue_send() - * - * Start asynchronous send() to the descriptor. - */ -PJ_DEF(pj_status_t) pj_ioqueue_send( pj_ioqueue_t *ioque, - pj_ioqueue_key_t *key, - const void *data, - pj_size_t datalen, - unsigned flags) -{ - pj_status_t rc; - pj_ssize_t sent; - - PJ_ASSERT_RETURN(ioque && key && data, PJ_EINVAL); - PJ_CHECK_STACK(); - - /* For consistency with other ioqueue implementation, we would reject - * if descriptor has already been submitted for writing before. - */ - PJ_ASSERT_RETURN(((key->op & PJ_IOQUEUE_OP_WRITE) == 0 && - (key->op & PJ_IOQUEUE_OP_SEND) == 0 && - (key->op & PJ_IOQUEUE_OP_SEND_TO) == 0), - PJ_EBUSY); - - sent = datalen; - /* sent would be -1 after pj_sock_send() if it returns error. */ - rc = pj_sock_send(key->fd, data, &sent, flags); - if (rc != PJ_SUCCESS && rc != PJ_STATUS_FROM_OS(OSERR_EWOULDBLOCK)) { - return rc; - } - - pj_lock_acquire(ioque->lock); - - key->op |= PJ_IOQUEUE_OP_SEND; - key->wr_buf = NULL; - key->wr_buflen = datalen; - - pj_lock_release(ioque->lock); - - return PJ_EPENDING; -} - - -/* - * pj_ioqueue_sendto() - * - * Start asynchronous write() to the descriptor. - */ -PJ_DEF(pj_status_t) pj_ioqueue_sendto( pj_ioqueue_t *ioque, - pj_ioqueue_key_t *key, - const void *data, - pj_size_t datalen, - unsigned flags, - const pj_sockaddr_t *addr, - int addrlen) -{ - pj_status_t rc; - pj_ssize_t sent; - - PJ_ASSERT_RETURN(ioque && key && data, PJ_EINVAL); - PJ_CHECK_STACK(); - - /* For consistency with other ioqueue implementation, we would reject - * if descriptor has already been submitted for writing before. - */ - PJ_ASSERT_RETURN(((key->op & PJ_IOQUEUE_OP_WRITE) == 0 && - (key->op & PJ_IOQUEUE_OP_SEND) == 0 && - (key->op & PJ_IOQUEUE_OP_SEND_TO) == 0), - PJ_EBUSY); - - sent = datalen; - /* sent would be -1 after pj_sock_sendto() if it returns error. */ - rc = pj_sock_sendto(key->fd, data, &sent, flags, addr, addrlen); - if (rc != PJ_SUCCESS && rc != PJ_STATUS_FROM_OS(OSERR_EWOULDBLOCK)) { - return rc; - } - - pj_lock_acquire(ioque->lock); - - key->op |= PJ_IOQUEUE_OP_SEND_TO; - key->wr_buf = NULL; - key->wr_buflen = datalen; - - pj_lock_release(ioque->lock); - return PJ_EPENDING; -} - -#if PJ_HAS_TCP -/* - * Initiate overlapped accept() operation. - */ -PJ_DEF(int) pj_ioqueue_accept( pj_ioqueue_t *ioqueue, - pj_ioqueue_key_t *key, - pj_sock_t *new_sock, - pj_sockaddr_t *local, - pj_sockaddr_t *remote, - int *addrlen) -{ - /* check parameters. All must be specified! */ - pj_assert(ioqueue && key && new_sock); - - /* Server socket must have no other operation! */ - pj_assert(key->op == 0); - - pj_lock_acquire(ioqueue->lock); - - key->op = PJ_IOQUEUE_OP_ACCEPT; - key->accept_fd = new_sock; - key->rmt_addr = remote; - key->rmt_addrlen = addrlen; - key->local_addr = local; - key->local_addrlen = addrlen; /* use same addr. as rmt_addrlen */ - pj_lock_release(ioqueue->lock); - return PJ_EPENDING; -} - -/* - * Initiate overlapped connect() operation (well, it's non-blocking actually, - * since there's no overlapped version of connect()). - */ -PJ_DEF(pj_status_t) pj_ioqueue_connect( pj_ioqueue_t *ioqueue, - pj_ioqueue_key_t *key, - const pj_sockaddr_t *addr, - int addrlen ) -{ - pj_status_t rc; - - /* check parameters. All must be specified! */ - PJ_ASSERT_RETURN(ioqueue && key && addr && addrlen, PJ_EINVAL); - /* Connecting socket must have no other operation! */ - PJ_ASSERT_RETURN(key->op == 0, PJ_EBUSY); - - rc = pj_sock_connect(key->fd, addr, addrlen); - if (rc == PJ_SUCCESS) { - /* Connected! */ - return PJ_SUCCESS; - } else { - if (rc == PJ_STATUS_FROM_OS(OSERR_EINPROGRESS) || - rc == PJ_STATUS_FROM_OS(OSERR_EWOULDBLOCK)) - { - /* Pending! */ - pj_lock_acquire(ioqueue->lock); - key->op = PJ_IOQUEUE_OP_CONNECT; - pj_lock_release(ioqueue->lock); - return PJ_EPENDING; - } else { - /* Error! */ - return rc; - } + /* Now process the events. */ + for (i=0; i<processed; ++i) { + switch (queue[i].event_type) { + case READABLE_EVENT: + ioqueue_dispatch_read_event(ioqueue, queue[i].key); + break; + case WRITEABLE_EVENT: + ioqueue_dispatch_write_event(ioqueue, queue[i].key); + break; + case EXCEPTION_EVENT: + ioqueue_dispatch_exception_event(ioqueue, queue[i].key); + break; + case NO_EVENT: + pj_assert(!"Invalid event!"); + break; + } } + + return processed; } -#endif /* PJ_HAS_TCP */ diff --git a/pjlib/src/pj/ioqueue_select.c b/pjlib/src/pj/ioqueue_select.c index 24e68564..c2051681 100644 --- a/pjlib/src/pj/ioqueue_select.c +++ b/pjlib/src/pj/ioqueue_select.c @@ -20,11 +20,11 @@ #include <pj/compat/socket.h> #include <pj/sock_select.h> #include <pj/errno.h> -
-/*
- * Include declaration from common abstraction.
- */
-#include "ioqueue_common_abs.h"
+ +/* + * Include declaration from common abstraction. + */ +#include "ioqueue_common_abs.h" /* * ISSUES with ioqueue_select() @@ -38,30 +38,30 @@ * */ #define THIS_FILE "ioq_select" -
-/*
- * The select ioqueue relies on socket functions (pj_sock_xxx()) to return
- * the correct error code.
- */
-#if PJ_RETURN_OS_ERROR(100) != PJ_STATUS_FROM_OS(100)
-# error "Error reporting must be enabled for this function to work!"
-#endif
-
-/**
- * Get the number of descriptors in the set. This is defined in sock_select.c
- * This function will only return the number of sockets set from PJ_FD_SET
- * operation. When the set is modified by other means (such as by select()),
- * the count will not be reflected here.
- *
- * That's why don't export this function in the header file, to avoid
- * misunderstanding.
- *
- * @param fdsetp The descriptor set.
- *
- * @return Number of descriptors in the set.
- */
-PJ_DECL(pj_size_t) PJ_FD_COUNT(const pj_fd_set_t *fdsetp);
-
+ +/* + * The select ioqueue relies on socket functions (pj_sock_xxx()) to return + * the correct error code. + */ +#if PJ_RETURN_OS_ERROR(100) != PJ_STATUS_FROM_OS(100) +# error "Error reporting must be enabled for this function to work!" +#endif + +/** + * Get the number of descriptors in the set. This is defined in sock_select.c + * This function will only return the number of sockets set from PJ_FD_SET + * operation. When the set is modified by other means (such as by select()), + * the count will not be reflected here. + * + * That's why don't export this function in the header file, to avoid + * misunderstanding. + * + * @param fdsetp The descriptor set. + * + * @return Number of descriptors in the set. + */ +PJ_DECL(pj_size_t) PJ_FD_COUNT(const pj_fd_set_t *fdsetp); + /* * During debugging build, VALIDATE_FD_SET is set. @@ -72,12 +72,12 @@ PJ_DECL(pj_size_t) PJ_FD_COUNT(const pj_fd_set_t *fdsetp); #else # define VALIDATE_FD_SET 0 #endif -
+ /* * This describes each key. */ struct pj_ioqueue_key_t -{
+{ DECLARE_COMMON_KEY }; @@ -86,7 +86,7 @@ struct pj_ioqueue_key_t */ struct pj_ioqueue_t { - DECLARE_COMMON_IOQUEUE
+ DECLARE_COMMON_IOQUEUE unsigned max, count; pj_ioqueue_key_t key_list; @@ -96,11 +96,11 @@ struct pj_ioqueue_t pj_fd_set_t xfdset; #endif }; -
-/* Include implementation for common abstraction after we declare
- * pj_ioqueue_key_t and pj_ioqueue_t.
- */
-#include "ioqueue_common_abs.c"
+ +/* Include implementation for common abstraction after we declare + * pj_ioqueue_key_t and pj_ioqueue_t. + */ +#include "ioqueue_common_abs.c" /* * pj_ioqueue_create() @@ -111,22 +111,22 @@ PJ_DEF(pj_status_t) pj_ioqueue_create( pj_pool_t *pool, pj_size_t max_fd, pj_ioqueue_t **p_ioqueue) { - pj_ioqueue_t *ioqueue;
+ pj_ioqueue_t *ioqueue; pj_lock_t *lock; pj_status_t rc; -
- /* Check that arguments are valid. */
- PJ_ASSERT_RETURN(pool != NULL && p_ioqueue != NULL &&
- max_fd > 0 && max_fd <= PJ_IOQUEUE_MAX_HANDLES,
- PJ_EINVAL);
-
- /* Check that size of pj_ioqueue_op_key_t is sufficient */
- PJ_ASSERT_RETURN(sizeof(pj_ioqueue_op_key_t)-sizeof(void*) >=
- sizeof(union operation_key), PJ_EBUG);
-
- ioqueue = pj_pool_alloc(pool, sizeof(pj_ioqueue_t));
-
- ioqueue_init(ioqueue);
+ + /* Check that arguments are valid. */ + PJ_ASSERT_RETURN(pool != NULL && p_ioqueue != NULL && + max_fd > 0 && max_fd <= PJ_IOQUEUE_MAX_HANDLES, + PJ_EINVAL); + + /* Check that size of pj_ioqueue_op_key_t is sufficient */ + PJ_ASSERT_RETURN(sizeof(pj_ioqueue_op_key_t)-sizeof(void*) >= + sizeof(union operation_key), PJ_EBUG); + + ioqueue = pj_pool_alloc(pool, sizeof(pj_ioqueue_t)); + + ioqueue_init(ioqueue); ioqueue->max = max_fd; ioqueue->count = 0; @@ -141,8 +141,8 @@ PJ_DEF(pj_status_t) pj_ioqueue_create( pj_pool_t *pool, if (rc != PJ_SUCCESS) return rc; - rc = pj_ioqueue_set_lock(ioqueue, lock, PJ_TRUE);
- if (rc != PJ_SUCCESS)
+ rc = pj_ioqueue_set_lock(ioqueue, lock, PJ_TRUE); + if (rc != PJ_SUCCESS) return rc; PJ_LOG(4, ("pjlib", "select() I/O Queue created (%p)", ioqueue)); @@ -159,8 +159,8 @@ PJ_DEF(pj_status_t) pj_ioqueue_create( pj_pool_t *pool, PJ_DEF(pj_status_t) pj_ioqueue_destroy(pj_ioqueue_t *ioqueue) { PJ_ASSERT_RETURN(ioqueue, PJ_EINVAL); -
- pj_lock_acquire(ioqueue->lock);
+ + pj_lock_acquire(ioqueue->lock); return ioqueue_destroy(ioqueue); } @@ -203,16 +203,18 @@ PJ_DEF(pj_status_t) pj_ioqueue_register_sock( pj_pool_t *pool, } /* Create key. */ - key = (pj_ioqueue_key_t*)pj_pool_zalloc(pool, sizeof(pj_ioqueue_key_t));
- rc = ioqueue_init_key(pool, ioqueue, key, sock, user_data, cb);
- if (rc != PJ_SUCCESS)
- return rc;
+ key = (pj_ioqueue_key_t*)pj_pool_zalloc(pool, sizeof(pj_ioqueue_key_t)); + rc = ioqueue_init_key(pool, ioqueue, key, sock, user_data, cb); + if (rc != PJ_SUCCESS) { + key = NULL; + goto on_return; + } /* Register */ pj_list_insert_before(&ioqueue->key_list, key); ++ioqueue->count; -on_return:
+on_return: /* On error, socket may be left in non-blocking mode. */ *p_key = key; pj_lock_release(ioqueue->lock); @@ -226,13 +228,13 @@ on_return: * Unregister handle from ioqueue. */ PJ_DEF(pj_status_t) pj_ioqueue_unregister( pj_ioqueue_key_t *key) -{
- pj_ioqueue_t *ioqueue;
+{ + pj_ioqueue_t *ioqueue; PJ_ASSERT_RETURN(key, PJ_EINVAL); -
- ioqueue = key->ioqueue;
-
+ + ioqueue = key->ioqueue; + pj_lock_acquire(ioqueue->lock); pj_assert(ioqueue->count > 0); @@ -243,21 +245,21 @@ PJ_DEF(pj_status_t) pj_ioqueue_unregister( pj_ioqueue_key_t *key) #if PJ_HAS_TCP PJ_FD_CLR(key->fd, &ioqueue->xfdset); #endif -
- /* ioqueue_destroy may try to acquire key's mutex.
- * Since normally the order of locking is to lock key's mutex first
- * then ioqueue's mutex, ioqueue_destroy may deadlock unless we
- * release ioqueue's mutex first.
- */
- pj_lock_release(ioqueue->lock);
-
- /* Destroy the key. */
- ioqueue_destroy_key(key);
+ + /* ioqueue_destroy may try to acquire key's mutex. + * Since normally the order of locking is to lock key's mutex first + * then ioqueue's mutex, ioqueue_destroy may deadlock unless we + * release ioqueue's mutex first. + */ + pj_lock_release(ioqueue->lock); + + /* Destroy the key. */ + ioqueue_destroy_key(key); return PJ_SUCCESS; } -
+ /* This supposed to check whether the fd_set values are consistent * with the operation currently set in each key. */ @@ -307,54 +309,54 @@ static void validate_sets(const pj_ioqueue_t *ioqueue, } } #endif /* VALIDATE_FD_SET */ -
- -/* ioqueue_remove_from_set()
- * This function is called from ioqueue_dispatch_event() to instruct
- * the ioqueue to remove the specified descriptor from ioqueue's descriptor
- * set for the specified event.
- */
-static void ioqueue_remove_from_set( pj_ioqueue_t *ioqueue,
- pj_sock_t fd,
- enum ioqueue_event_type event_type)
-{
- pj_lock_acquire(ioqueue->lock);
-
- if (event_type == READABLE_EVENT)
- PJ_FD_CLR((pj_sock_t)fd, &ioqueue->rfdset);
- else if (event_type == WRITEABLE_EVENT)
- PJ_FD_CLR((pj_sock_t)fd, &ioqueue->wfdset);
- else if (event_type == EXCEPTION_EVENT)
- PJ_FD_CLR((pj_sock_t)fd, &ioqueue->xfdset);
- else
- pj_assert(0);
-
- pj_lock_release(ioqueue->lock);
-}
-
-/*
- * ioqueue_add_to_set()
- * This function is called from pj_ioqueue_recv(), pj_ioqueue_send() etc
- * to instruct the ioqueue to add the specified handle to ioqueue's descriptor
- * set for the specified event.
- */
-static void ioqueue_add_to_set( pj_ioqueue_t *ioqueue,
- pj_sock_t fd,
- enum ioqueue_event_type event_type )
-{
- pj_lock_acquire(ioqueue->lock);
-
- if (event_type == READABLE_EVENT)
- PJ_FD_SET((pj_sock_t)fd, &ioqueue->rfdset);
- else if (event_type == WRITEABLE_EVENT)
- PJ_FD_SET((pj_sock_t)fd, &ioqueue->wfdset);
- else if (event_type == EXCEPTION_EVENT)
- PJ_FD_SET((pj_sock_t)fd, &ioqueue->xfdset);
- else
- pj_assert(0);
-
- pj_lock_release(ioqueue->lock);
-}
+ + +/* ioqueue_remove_from_set() + * This function is called from ioqueue_dispatch_event() to instruct + * the ioqueue to remove the specified descriptor from ioqueue's descriptor + * set for the specified event. + */ +static void ioqueue_remove_from_set( pj_ioqueue_t *ioqueue, + pj_sock_t fd, + enum ioqueue_event_type event_type) +{ + pj_lock_acquire(ioqueue->lock); + + if (event_type == READABLE_EVENT) + PJ_FD_CLR((pj_sock_t)fd, &ioqueue->rfdset); + else if (event_type == WRITEABLE_EVENT) + PJ_FD_CLR((pj_sock_t)fd, &ioqueue->wfdset); + else if (event_type == EXCEPTION_EVENT) + PJ_FD_CLR((pj_sock_t)fd, &ioqueue->xfdset); + else + pj_assert(0); + + pj_lock_release(ioqueue->lock); +} + +/* + * ioqueue_add_to_set() + * This function is called from pj_ioqueue_recv(), pj_ioqueue_send() etc + * to instruct the ioqueue to add the specified handle to ioqueue's descriptor + * set for the specified event. + */ +static void ioqueue_add_to_set( pj_ioqueue_t *ioqueue, + pj_sock_t fd, + enum ioqueue_event_type event_type ) +{ + pj_lock_acquire(ioqueue->lock); + + if (event_type == READABLE_EVENT) + PJ_FD_SET((pj_sock_t)fd, &ioqueue->rfdset); + else if (event_type == WRITEABLE_EVENT) + PJ_FD_SET((pj_sock_t)fd, &ioqueue->wfdset); + else if (event_type == EXCEPTION_EVENT) + PJ_FD_SET((pj_sock_t)fd, &ioqueue->xfdset); + else + pj_assert(0); + + pj_lock_release(ioqueue->lock); +} /* * pj_ioqueue_poll() @@ -378,19 +380,19 @@ PJ_DEF(int) pj_ioqueue_poll( pj_ioqueue_t *ioqueue, const pj_time_val *timeout) pj_fd_set_t rfdset, wfdset, xfdset; int count, counter; pj_ioqueue_key_t *h; - struct event
- {
- pj_ioqueue_key_t *key;
- enum event_type event_type;
- } event[PJ_IOQUEUE_MAX_EVENTS_IN_SINGLE_POLL];
-
- PJ_ASSERT_RETURN(ioqueue, PJ_EINVAL);
+ struct event + { + pj_ioqueue_key_t *key; + enum ioqueue_event_type event_type; + } event[PJ_IOQUEUE_MAX_EVENTS_IN_SINGLE_POLL]; + + PJ_ASSERT_RETURN(ioqueue, PJ_EINVAL); /* Lock ioqueue before making fd_set copies */ pj_lock_acquire(ioqueue->lock); -
- /* We will only do select() when there are sockets to be polled.
- * Otherwise select() will return error.
+ + /* We will only do select() when there are sockets to be polled. + * Otherwise select() will return error. */ if (PJ_FD_COUNT(&ioqueue->rfdset)==0 && PJ_FD_COUNT(&ioqueue->wfdset)==0 && @@ -422,71 +424,71 @@ PJ_DEF(int) pj_ioqueue_poll( pj_ioqueue_t *ioqueue, const pj_time_val *timeout) if (count <= 0) return count; - else if (count > PJ_IOQUEUE_MAX_EVENTS_IN_SINGLE_POLL)
- count = PJ_IOQUEUE_MAX_EVENTS_IN_SINGLE_POLL;
+ else if (count > PJ_IOQUEUE_MAX_EVENTS_IN_SINGLE_POLL) + count = PJ_IOQUEUE_MAX_EVENTS_IN_SINGLE_POLL; - /* Scan descriptor sets for event and add the events in the event
- * array to be processed later in this function. We do this so that
- * events can be processed in parallel without holding ioqueue lock.
+ /* Scan descriptor sets for event and add the events in the event + * array to be processed later in this function. We do this so that + * events can be processed in parallel without holding ioqueue lock. */ pj_lock_acquire(ioqueue->lock); -
- counter = 0;
- - /* Scan for writable sockets first to handle piggy-back data
- * coming with accept().
- */
- h = ioqueue->key_list.next;
- for ( ; h!=&ioqueue->key_list && counter<count; h = h->next) {
- if ( (key_has_pending_write(h) || key_has_pending_connect(h))
- && PJ_FD_ISSET(h->fd, &wfdset))
- {
- event[counter].key = h;
- event[counter].event_type = WRITEABLE_EVENT;
- ++counter;
- }
-
- /* Scan for readable socket. */
- if ((key_has_pending_read(h) || key_has_pending_accept(h))
- && PJ_FD_ISSET(h->fd, &rfdset))
- {
- event[counter].key = h;
- event[counter].event_type = READABLE_EVENT;
- ++counter; }
-
-#if PJ_HAS_TCP
- if (key_has_pending_connect(h) && PJ_FD_ISSET(h->fd, &xfdset)) {
- event[counter].key = h;
- event[counter].event_type = EXCEPTION_EVENT;
- ++counter;
- }
-#endif
- }
-
- pj_lock_release(ioqueue->lock);
-
- count = counter;
-
- /* Now process all events. The dispatch functions will take care
- * of locking in each of the key
- */
- for (counter=0; counter<count; ++counter) {
- switch (event[counter].event_type) {
- case READABLE_EVENT:
- ioqueue_dispatch_read_event(ioqueue, event[counter].key);
- break;
- case WRITEABLE_EVENT:
- ioqueue_dispatch_write_event(ioqueue, event[counter].key);
- break;
- case EXCEPTION_EVENT:
- ioqueue_dispatch_exception_event(ioqueue, event[counter].key);
- break;
- case NO_EVENT:
- default:
- pj_assert(!"Invalid event!");
- break;
- }
- }
+ + counter = 0; + + /* Scan for writable sockets first to handle piggy-back data + * coming with accept(). + */ + h = ioqueue->key_list.next; + for ( ; h!=&ioqueue->key_list && counter<count; h = h->next) { + if ( (key_has_pending_write(h) || key_has_pending_connect(h)) + && PJ_FD_ISSET(h->fd, &wfdset)) + { + event[counter].key = h; + event[counter].event_type = WRITEABLE_EVENT; + ++counter; + } + + /* Scan for readable socket. */ + if ((key_has_pending_read(h) || key_has_pending_accept(h)) + && PJ_FD_ISSET(h->fd, &rfdset)) + { + event[counter].key = h; + event[counter].event_type = READABLE_EVENT; + ++counter; + } + +#if PJ_HAS_TCP + if (key_has_pending_connect(h) && PJ_FD_ISSET(h->fd, &xfdset)) { + event[counter].key = h; + event[counter].event_type = EXCEPTION_EVENT; + ++counter; + } +#endif + } + + pj_lock_release(ioqueue->lock); + + count = counter; + + /* Now process all events. The dispatch functions will take care + * of locking in each of the key + */ + for (counter=0; counter<count; ++counter) { + switch (event[counter].event_type) { + case READABLE_EVENT: + ioqueue_dispatch_read_event(ioqueue, event[counter].key); + break; + case WRITEABLE_EVENT: + ioqueue_dispatch_write_event(ioqueue, event[counter].key); + break; + case EXCEPTION_EVENT: + ioqueue_dispatch_exception_event(ioqueue, event[counter].key); + break; + case NO_EVENT: + pj_assert(!"Invalid event!"); + break; + } + } return count; } diff --git a/pjlib/src/pj/os_core_unix.c b/pjlib/src/pj/os_core_unix.c index 18f4ded2..0c951e99 100644 --- a/pjlib/src/pj/os_core_unix.c +++ b/pjlib/src/pj/os_core_unix.c @@ -715,12 +715,14 @@ static pj_status_t init_mutex(pj_mutex_t *mutex, const char *name, int type) if (type == PJ_MUTEX_SIMPLE) { #if defined(PJ_LINUX) && PJ_LINUX!=0 + extern int pthread_mutexattr_settype(pthread_mutexattr_t*,int); rc = pthread_mutexattr_settype(&attr, PTHREAD_MUTEX_FAST_NP); #else rc = pthread_mutexattr_settype(&attr, PTHREAD_MUTEX_NORMAL); #endif } else { #if defined(PJ_LINUX) && PJ_LINUX!=0 + extern int pthread_mutexattr_settype(pthread_mutexattr_t*,int); rc = pthread_mutexattr_settype(&attr, PTHREAD_MUTEX_RECURSIVE_NP); #else rc = pthread_mutexattr_settype(&attr, PTHREAD_MUTEX_RECURSIVE); diff --git a/pjlib/src/pj/sock_bsd.c b/pjlib/src/pj/sock_bsd.c index 92495df2..21c10d19 100644 --- a/pjlib/src/pj/sock_bsd.c +++ b/pjlib/src/pj/sock_bsd.c @@ -104,7 +104,9 @@ PJ_DEF(pj_uint32_t) pj_htonl(pj_uint32_t hostlong) */ PJ_DEF(char*) pj_inet_ntoa(pj_in_addr inaddr) { - return inet_ntoa(*(struct in_addr*)&inaddr); + struct in_addr addr; + addr.s_addr = inaddr.s_addr; + return inet_ntoa(addr); } /* diff --git a/pjlib/src/pjlib-test/ioq_perf.c b/pjlib/src/pjlib-test/ioq_perf.c index 3305fb60..a4ee005c 100644 --- a/pjlib/src/pjlib-test/ioq_perf.c +++ b/pjlib/src/pjlib-test/ioq_perf.c @@ -38,9 +38,9 @@ typedef struct test_item client_fd; pj_ioqueue_t *ioqueue; pj_ioqueue_key_t *server_key, - *client_key;
- pj_ioqueue_op_key_t recv_op,
- send_op;
+ *client_key; + pj_ioqueue_op_key_t recv_op, + send_op; int has_pending_send; pj_size_t buffer_size; char *outgoing_buffer; @@ -52,16 +52,16 @@ typedef struct test_item /* Callback when data has been read. * Increment item->bytes_recv and ready to read the next data. */ -static void on_read_complete(pj_ioqueue_key_t *key,
- pj_ioqueue_op_key_t *op_key,
+static void on_read_complete(pj_ioqueue_key_t *key, + pj_ioqueue_op_key_t *op_key, pj_ssize_t bytes_read) { test_item *item = pj_ioqueue_get_user_data(key); - pj_status_t rc;
+ pj_status_t rc; int data_is_available = 1; //TRACE_((THIS_FILE, " read complete, bytes_read=%d", bytes_read)); -
+ do { if (thread_quit_flag) return; @@ -76,7 +76,7 @@ static void on_read_complete(pj_ioqueue_key_t *key, PJ_LOG(3,(THIS_FILE, "...error: read error, bytes_read=%d (%s)", bytes_read, errmsg)); PJ_LOG(3,(THIS_FILE, - ".....additional info: total read=%u, total written=%u", + ".....additional info: total read=%u, total sent=%u", item->bytes_recv, item->bytes_sent)); } else { last_error_counter++; @@ -94,44 +94,44 @@ static void on_read_complete(pj_ioqueue_key_t *key, */ if (item->bytes_recv > item->buffer_size * 10000) thread_quit_flag = 1; -
+ bytes_read = item->buffer_size; rc = pj_ioqueue_recv( key, op_key, item->incoming_buffer, &bytes_read, 0 ); - if (rc == PJ_SUCCESS) {
- data_is_available = 1;
- } else if (rc == PJ_EPENDING) {
- data_is_available = 0;
- } else {
+ if (rc == PJ_SUCCESS) { + data_is_available = 1; + } else if (rc == PJ_EPENDING) { + data_is_available = 0; + } else { data_is_available = 0; if (rc != last_error) { last_error = rc; - app_perror("...error: read error", rc); + app_perror("...error: read error(1)", rc); } else { last_error_counter++; } - }
-
- if (!item->has_pending_send) {
- pj_ssize_t sent = item->buffer_size;
- rc = pj_ioqueue_send(item->client_key, &item->send_op,
- item->outgoing_buffer, &sent, 0);
- if (rc != PJ_SUCCESS && rc != PJ_EPENDING) {
- app_perror("...error: write error", rc);
- }
-
- item->has_pending_send = (rc==PJ_EPENDING);
- }
-
+ } + + if (!item->has_pending_send) { + pj_ssize_t sent = item->buffer_size; + rc = pj_ioqueue_send(item->client_key, &item->send_op, + item->outgoing_buffer, &sent, 0); + if (rc != PJ_SUCCESS && rc != PJ_EPENDING) { + app_perror("...error: write error", rc); + } + + item->has_pending_send = (rc==PJ_EPENDING); + } + } while (data_is_available); } /* Callback when data has been written. * Increment item->bytes_sent and write the next data. */ -static void on_write_complete(pj_ioqueue_key_t *key,
- pj_ioqueue_op_key_t *op_key,
+static void on_write_complete(pj_ioqueue_key_t *key, + pj_ioqueue_op_key_t *op_key, pj_ssize_t bytes_sent) { test_item *item = pj_ioqueue_get_user_data(key); @@ -140,7 +140,7 @@ static void on_write_complete(pj_ioqueue_key_t *key, if (thread_quit_flag) return; -
+ item->has_pending_send = 0; item->bytes_sent += bytes_sent; @@ -150,14 +150,14 @@ static void on_write_complete(pj_ioqueue_key_t *key, } else { pj_status_t rc; -
+ bytes_sent = item->buffer_size; rc = pj_ioqueue_send( item->client_key, op_key, item->outgoing_buffer, &bytes_sent, 0); if (rc != PJ_SUCCESS && rc != PJ_EPENDING) { app_perror("...error: write error", rc); - }
-
+ } + item->has_pending_send = (rc==PJ_EPENDING); } } @@ -231,7 +231,7 @@ static int perform_test(int sock_type, const char *type_name, /* Initialize each producer-consumer pair. */ for (i=0; i<sockpair_cnt; ++i) { - pj_ssize_t bytes;
+ pj_ssize_t bytes; items[i].ioqueue = ioqueue; items[i].buffer_size = buffer_size; @@ -274,7 +274,7 @@ static int perform_test(int sock_type, const char *type_name, } /* Start reading. */ - TRACE_((THIS_FILE, " pj_ioqueue_recv.."));
+ TRACE_((THIS_FILE, " pj_ioqueue_recv..")); bytes = items[i].buffer_size; rc = pj_ioqueue_recv(items[i].server_key, &items[i].recv_op, items[i].incoming_buffer, &bytes, @@ -285,7 +285,7 @@ static int perform_test(int sock_type, const char *type_name, } /* Start writing. */ - TRACE_((THIS_FILE, " pj_ioqueue_write.."));
+ TRACE_((THIS_FILE, " pj_ioqueue_write..")); bytes = items[i].buffer_size; rc = pj_ioqueue_send(items[i].client_key, &items[i].recv_op, items[i].outgoing_buffer, &bytes, 0); @@ -293,7 +293,7 @@ static int perform_test(int sock_type, const char *type_name, app_perror("...error: pj_ioqueue_write", rc); return -76; } -
+ items[i].has_pending_send = (rc==PJ_EPENDING); } |