summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--pjlib/build/pjlib.dsp4
-rw-r--r--pjlib/build/pjlib_test.dsp4
-rw-r--r--pjlib/include/pj/compat/os_linux.h17
-rw-r--r--pjlib/include/pj/compat/os_linux_kernel.h15
-rw-r--r--pjlib/include/pj/compat/os_palmos.h15
-rw-r--r--pjlib/include/pj/compat/os_sunos.h15
-rw-r--r--pjlib/include/pj/compat/os_win32.h15
-rw-r--r--pjlib/include/pj/doxygen.h55
-rw-r--r--pjlib/include/pj/ioqueue.h420
-rw-r--r--pjlib/include/pj/list.h4
-rw-r--r--pjlib/include/pj/pool.h4
-rw-r--r--pjlib/include/pj/sock_select.h10
-rw-r--r--pjlib/include/pj/xml.h6
-rw-r--r--pjlib/src/pj/config.c2
-rw-r--r--pjlib/src/pj/ioqueue_select.c1235
-rw-r--r--pjlib/src/pj/ioqueue_winnt.c834
-rw-r--r--pjlib/src/pjlib-test/atomic.c24
-rw-r--r--pjlib/src/pjlib-test/ioq_perf.c178
-rw-r--r--pjlib/src/pjlib-test/ioq_tcp.c172
-rw-r--r--pjlib/src/pjlib-test/ioq_udp.c157
-rw-r--r--pjlib/src/pjlib-test/main.c3
-rw-r--r--pjlib/src/pjlib-test/test.c8
-rw-r--r--pjlib/src/pjlib-test/test.h14
-rw-r--r--pjlib/src/pjlib-test/udp_echo_srv_ioqueue.c181
-rw-r--r--pjlib/src/pjlib-test/udp_echo_srv_sync.c56
25 files changed, 2105 insertions, 1343 deletions
diff --git a/pjlib/build/pjlib.dsp b/pjlib/build/pjlib.dsp
index e7aaed0b..23e9f0a2 100644
--- a/pjlib/build/pjlib.dsp
+++ b/pjlib/build/pjlib.dsp
@@ -391,6 +391,10 @@ SOURCE=..\include\pj\compat\os_palmos.h
# End Source File
# Begin Source File
+SOURCE=..\include\pj\compat\os_sunos.h
+# End Source File
+# Begin Source File
+
SOURCE=..\include\pj\compat\os_win32.h
# End Source File
# Begin Source File
diff --git a/pjlib/build/pjlib_test.dsp b/pjlib/build/pjlib_test.dsp
index 3ba9536d..d17af417 100644
--- a/pjlib/build/pjlib_test.dsp
+++ b/pjlib/build/pjlib_test.dsp
@@ -196,6 +196,10 @@ SOURCE="..\src\pjlib-test\timestamp.c"
# End Source File
# Begin Source File
+SOURCE="..\src\pjlib-test\udp_echo_srv_ioqueue.c"
+# End Source File
+# Begin Source File
+
SOURCE="..\src\pjlib-test\udp_echo_srv_sync.c"
# End Source File
# Begin Source File
diff --git a/pjlib/include/pj/compat/os_linux.h b/pjlib/include/pj/compat/os_linux.h
index c75589d6..a2f97f65 100644
--- a/pjlib/include/pj/compat/os_linux.h
+++ b/pjlib/include/pj/compat/os_linux.h
@@ -54,7 +54,22 @@
#define PJ_HAS_WINSOCK_H 0
#define PJ_HAS_WINSOCK2_H 0
-#define PJ_SOCK_HAS_INET_ATON 1
+#define PJ_SOCK_HAS_INET_ATON 1
+
+/* When this macro is set, getsockopt(SOL_SOCKET, SO_ERROR) will return
+ * the status of non-blocking connect() operation.
+ */
+#define PJ_HAS_SO_ERROR 1
+
+/* This value specifies the value set in errno by the OS when a non-blocking
+ * socket recv() can not return immediate daata.
+ */
+#define PJ_BLOCKING_ERROR_VAL EAGAIN
+
+/* This value specifies the value set in errno by the OS when a non-blocking
+ * socket connect() can not get connected immediately.
+ */
+#define PJ_BLOCKING_CONNECT_ERROR_VAL EINPROGRESS
/* Default threading is enabled, unless it's overridden. */
#ifndef PJ_HAS_THREADS
diff --git a/pjlib/include/pj/compat/os_linux_kernel.h b/pjlib/include/pj/compat/os_linux_kernel.h
index 91c41773..0d44ef0e 100644
--- a/pjlib/include/pj/compat/os_linux_kernel.h
+++ b/pjlib/include/pj/compat/os_linux_kernel.h
@@ -52,6 +52,21 @@
#define PJ_HAS_WINSOCK2_H 0
#define PJ_SOCK_HAS_INET_ATON 0
+
+/* When this macro is set, getsockopt(SOL_SOCKET, SO_ERROR) will return
+ * the status of non-blocking connect() operation.
+ */
+#define PJ_HAS_SO_ERROR 1
+
+/* This value specifies the value set in errno by the OS when a non-blocking
+ * socket recv() can not return immediate daata.
+ */
+#define PJ_BLOCKING_ERROR_VAL EAGAIN
+
+/* This value specifies the value set in errno by the OS when a non-blocking
+ * socket connect() can not get connected immediately.
+ */
+#define PJ_BLOCKING_CONNECT_ERROR_VAL EINPROGRESS
#ifndef PJ_HAS_THREADS
# define PJ_HAS_THREADS (1)
diff --git a/pjlib/include/pj/compat/os_palmos.h b/pjlib/include/pj/compat/os_palmos.h
index 63cd8273..e2ac4b52 100644
--- a/pjlib/include/pj/compat/os_palmos.h
+++ b/pjlib/include/pj/compat/os_palmos.h
@@ -44,6 +44,21 @@
#define PJ_HAS_WINSOCK2_H 0
#define PJ_SOCK_HAS_INET_ATON 0
+
+/* When this macro is set, getsockopt(SOL_SOCKET, SO_ERROR) will return
+ * the status of non-blocking connect() operation.
+ */
+#define PJ_HAS_SO_ERROR 0
+
+/* This value specifies the value set in errno by the OS when a non-blocking
+ * socket recv() can not return immediate daata.
+ */
+#define PJ_BLOCKING_ERROR_VAL xxx
+
+/* This value specifies the value set in errno by the OS when a non-blocking
+ * socket connect() can not get connected immediately.
+ */
+#define PJ_BLOCKING_CONNECT_ERROR_VAL xxx
/* Default threading is enabled, unless it's overridden. */
#ifndef PJ_HAS_THREADS
diff --git a/pjlib/include/pj/compat/os_sunos.h b/pjlib/include/pj/compat/os_sunos.h
index b0f48c38..990fb57d 100644
--- a/pjlib/include/pj/compat/os_sunos.h
+++ b/pjlib/include/pj/compat/os_sunos.h
@@ -39,6 +39,21 @@
#define PJ_HAS_WINSOCK2_H 0
#define PJ_SOCK_HAS_INET_ATON 0
+
+/* When this macro is set, getsockopt(SOL_SOCKET, SO_ERROR) will return
+ * the status of non-blocking connect() operation.
+ */
+#define PJ_HAS_SO_ERROR 0
+
+/* This value specifies the value set in errno by the OS when a non-blocking
+ * socket recv() can not return immediate daata.
+ */
+#define PJ_BLOCKING_ERROR_VAL EWOULDBLOCK
+
+/* This value specifies the value set in errno by the OS when a non-blocking
+ * socket connect() can not get connected immediately.
+ */
+#define PJ_BLOCKING_CONNECT_ERROR_VAL EINPROGRESS
/* Default threading is enabled, unless it's overridden. */
#ifndef PJ_HAS_THREADS
diff --git a/pjlib/include/pj/compat/os_win32.h b/pjlib/include/pj/compat/os_win32.h
index db7b1b4f..87ff7520 100644
--- a/pjlib/include/pj/compat/os_win32.h
+++ b/pjlib/include/pj/compat/os_win32.h
@@ -59,6 +59,21 @@
#define PJ_HAS_WINSOCK2_H 1
#define PJ_SOCK_HAS_INET_ATON 0
+
+/* When this macro is set, getsockopt(SOL_SOCKET, SO_ERROR) will return
+ * the status of non-blocking connect() operation.
+ */
+#define PJ_HAS_SO_ERROR 0
+
+/* This value specifies the value set in errno by the OS when a non-blocking
+ * socket recv() or send() can not return immediately.
+ */
+#define PJ_BLOCKING_ERROR_VAL WSAEWOULDBLOCK
+
+/* This value specifies the value set in errno by the OS when a non-blocking
+ * socket connect() can not get connected immediately.
+ */
+#define PJ_BLOCKING_CONNECT_ERROR_VAL WSAEWOULDBLOCK
/* Default threading is enabled, unless it's overridden. */
#ifndef PJ_HAS_THREADS
diff --git a/pjlib/include/pj/doxygen.h b/pjlib/include/pj/doxygen.h
index 9c32e4e2..30f642a6 100644
--- a/pjlib/include/pj/doxygen.h
+++ b/pjlib/include/pj/doxygen.h
@@ -87,9 +87,12 @@
* Alternatively, to get the list of all examples, you can click on
* <b>Related Pages</b> on the top of HTML document or on
* <b>PJLIB Page Documentation</b> on navigation pane of your PDF reader.
- *
- *
- *
+ *
+ * - <b>How to Submit Code to PJLIB Project</b>
+ *\n
+ * Please read \ref pjlib_coding_convention_page before submitting
+ * your code. Send your code as patch against current Subversion tree
+ * to the appropriate mailing list.
*
*
* @section features_sec Features
@@ -395,6 +398,52 @@
+/*////////////////////////////////////////////////////////////////////////// */
+/*
+ CODING CONVENTION
+ */
+
+/**
+ * @page pjlib_coding_convention_page Coding Convention
+ *
+ * Before you submit your code/patches to be included with PJLIB, you must
+ * make sure that your code is compliant with PJLIB coding convention.
+ * <b>This is very important!</b> Otherwise we would not accept your code.
+ *
+ * @section coding_conv_editor_sec Editor Settings
+ *
+ * The single most important thing in the whole coding convention is editor
+ * settings. It's more important than the correctness of your code (bugs will
+ * only crash the system, but incorrect tab size is mental!).
+ *
+ * Kindly set your editor as follows:
+ * - tab size to \b 8.
+ * - indentation to \b 4.
+ *
+ * With \c vi, you can do it with:
+ * <pre>
+ * :se ts=8
+ * :se sts=4
+ * </pre>
+ *
+ * You should replace tab with eight spaces.
+ *
+ * @section coding_conv_detail_sec Coding Style
+ *
+ * Coding style MUST strictly follow K&R style. The rest of coding style
+ * must follow current style. You SHOULD be able to observe the style
+ * currently used by PJLIB from PJLIB sources, and apply the style to your
+ * code. If you're not able to do simple thing like to observe PJLIB
+ * coding style from the sources, then logic dictates that your ability to
+ * observe more difficult area in PJLIB such as memory allocation strategy,
+ * concurrency, etc is questionable.
+ *
+ * @section coding_conv_comment_sec Commenting Your Code
+ *
+ * Public API (e.g. in header files) MUST have doxygen compliant comments.
+ *
+ */
+
/*////////////////////////////////////////////////////////////////////////// */
/*
diff --git a/pjlib/include/pj/ioqueue.h b/pjlib/include/pj/ioqueue.h
index 9d26b082..6a7b827e 100644
--- a/pjlib/include/pj/ioqueue.h
+++ b/pjlib/include/pj/ioqueue.h
@@ -1,5 +1,4 @@
/* $Id$
- *
*/
#ifndef __PJ_IOQUEUE_H__
@@ -48,17 +47,54 @@ PJ_BEGIN_DECL
* @ingroup PJ_IO
* @{
*
- * This file provides abstraction for various event dispatching mechanisms.
- * The interfaces for event dispatching vary alot, even in a single
- * operating system. The abstraction here hopefully is suitable for most of
- * the event dispatching available.
- *
- * Currently, the I/O Queue supports:
- * - select(), as the common denominator, but the least efficient.
- * - I/O Completion ports in Windows NT/2000/XP, which is the most efficient
- * way to dispatch events in Windows NT based OSes, and most importantly,
- * it doesn't have the limit on how many handles to monitor. And it works
- * with files (not only sockets) as well.
+ * I/O Queue provides API for performing asynchronous I/O operations. It
+ * conforms to proactor pattern, which allows application to submit an
+ * asynchronous operation and to be notified later when the operation has
+ * completed.
+ *
+ * The framework works natively in platforms where asynchronous operation API
+ * exists, such as in Windows NT with IoCompletionPort/IOCP. In other
+ * platforms, the I/O queue abstracts the operating system's event poll API
+ * to provide semantics similar to IoCompletionPort with minimal penalties
+ * (i.e. per ioqueue and per handle mutex protection).
+ *
+ * The I/O queue provides more than just unified abstraction. It also:
+ * - makes sure that the operation uses the most effective way to utilize
+ * the underlying mechanism, to provide the maximum theoritical
+ * throughput possible on a given platform.
+ * - choose the most efficient mechanism for event polling on a given
+ * platform.
+ *
+ * Currently, the I/O Queue is implemented using:
+ * - <tt><b>select()</b></tt>, as the common denominator, but the least
+ * efficient. Also the number of descriptor is limited to
+ * \c PJ_IOQUEUE_MAX_HANDLES (which by default is 64).
+ * - <tt><b>/dev/epoll</b></tt> on Linux (user mode and kernel mode),
+ * a much faster replacement for select() on Linux (and more importantly
+ * doesn't have limitation on number of descriptors).
+ * - <b>I/O Completion ports</b> on Windows NT/2000/XP, which is the most
+ * efficient way to dispatch events in Windows NT based OSes, and most
+ * importantly, it doesn't have the limit on how many handles to monitor.
+ * And it works with files (not only sockets) as well.
+ *
+ *
+ * \section pj_ioqueue_concurrency_sec Concurrency Rules
+ *
+ * The items below describe rules that must be obeyed when using the I/O
+ * queue, with regard to concurrency:
+ * - in general, the I/O queue is thread safe (assuming the lock strategy
+ * is not changed to disable mutex protection). All operations, except
+ * unregistration which is described below, can be safely invoked
+ * simultaneously by multiple threads.
+ * - however, <b>care must be taken when unregistering a key</b> from the
+ * ioqueue. Application must take care that when one thread is issuing
+ * an unregistration, other thread is not simultaneously invoking an
+ * operation <b>to the same key</b>.
+ *\n
+ * This happens because the ioqueue functions are working with a pointer
+ * to the key, and there is a possible race condition where the pointer
+ * has been rendered invalid by other threads before the ioqueue has a
+ * chance to acquire mutex on it.
*
* \section pj_ioqeuue_examples_sec Examples
*
@@ -68,53 +104,90 @@ PJ_BEGIN_DECL
* - \ref page_pjlib_ioqueue_udp_test
* - \ref page_pjlib_ioqueue_perf_test
*/
+
+
+
+/**
+ * This structure describes operation specific key to be submitted to
+ * I/O Queue when performing the asynchronous operation. This key will
+ * be returned to the application when completion callback is called.
+ *
+ * Application normally wants to attach it's specific data in the
+ * \c user_data field so that it can keep track of which operation has
+ * completed when the callback is called. Alternatively, application can
+ * also extend this struct to include its data, because the pointer that
+ * is returned in the completion callback will be exactly the same as
+ * the pointer supplied when the asynchronous function is called.
+ */
+typedef struct pj_ioqueue_op_key_t
+{
+ void *internal__[32]; /**< Internal I/O Queue data. */
+ void *user_data; /**< Application data. */
+} pj_ioqueue_op_key_t;
- /**
- * This structure describes the callbacks to be called when I/O operation
- * completes.
- */
+/**
+ * This structure describes the callbacks to be called when I/O operation
+ * completes.
+ */
typedef struct pj_ioqueue_callback
{
/**
- * This callback is called when #pj_ioqueue_read or #pj_ioqueue_recvfrom
+ * This callback is called when #pj_ioqueue_recv or #pj_ioqueue_recvfrom
* completes.
*
- * @param key The key.
- * @param bytes_read The size of data that has just been read.
+ * @param key The key.
+ * @param op_key Operation key.
+ * @param bytes_read >= 0 to indicate the amount of data read,
+ * otherwise negative value containing the error
+ * code. To obtain the pj_status_t error code, use
+ * (pj_status_t code = -bytes_read).
*/
- void (*on_read_complete)(pj_ioqueue_key_t *key, pj_ssize_t bytes_read);
+ void (*on_read_complete)(pj_ioqueue_key_t *key,
+ pj_ioqueue_op_key_t *op_key,
+ pj_ssize_t bytes_read);
/**
* This callback is called when #pj_ioqueue_write or #pj_ioqueue_sendto
* completes.
*
- * @param key The key.
- * @param bytes_read The size of data that has just been read.
+ * @param key The key.
+ * @param op_key Operation key.
+ * @param bytes_sent >= 0 to indicate the amount of data written,
+ * otherwise negative value containing the error
+ * code. To obtain the pj_status_t error code, use
+ * (pj_status_t code = -bytes_sent).
*/
- void (*on_write_complete)(pj_ioqueue_key_t *key, pj_ssize_t bytes_sent);
+ void (*on_write_complete)(pj_ioqueue_key_t *key,
+ pj_ioqueue_op_key_t *op_key,
+ pj_ssize_t bytes_sent);
/**
* This callback is called when #pj_ioqueue_accept completes.
*
- * @param key The key.
+ * @param key The key.
+ * @param op_key Operation key.
* @param sock Newly connected socket.
* @param status Zero if the operation completes successfully.
*/
- void (*on_accept_complete)(pj_ioqueue_key_t *key, pj_sock_t sock,
- int status);
+ void (*on_accept_complete)(pj_ioqueue_key_t *key,
+ pj_ioqueue_op_key_t *op_key,
+ pj_sock_t sock,
+ pj_status_t status);
/**
* This callback is called when #pj_ioqueue_connect completes.
*
- * @param key The key.
- * @param status Zero if the operation completes successfully.
+ * @param key The key.
+ * @param status PJ_SUCCESS if the operation completes successfully.
*/
- void (*on_connect_complete)(pj_ioqueue_key_t *key, int status);
+ void (*on_connect_complete)(pj_ioqueue_key_t *key,
+ pj_status_t status);
} pj_ioqueue_callback;
/**
- * Types of I/O Queue operation.
+ * Types of pending I/O Queue operation. This enumeration is only used
+ * internally within the ioqueue.
*/
typedef enum pj_ioqueue_operation_e
{
@@ -137,25 +210,19 @@ typedef enum pj_ioqueue_operation_e
* number of threads.
*/
#define PJ_IOQUEUE_DEFAULT_THREADS 0
-
-
+
/**
* Create a new I/O Queue framework.
*
* @param pool The pool to allocate the I/O queue structure.
* @param max_fd The maximum number of handles to be supported, which
* should not exceed PJ_IOQUEUE_MAX_HANDLES.
- * @param max_threads The maximum number of threads that are allowed to
- * operate on a single descriptor simultaneously. If
- * the value is zero, the framework will set it
- * to a reasonable value.
* @param ioqueue Pointer to hold the newly created I/O Queue.
*
* @return PJ_SUCCESS on success.
*/
PJ_DECL(pj_status_t) pj_ioqueue_create( pj_pool_t *pool,
pj_size_t max_fd,
- int max_threads,
pj_ioqueue_t **ioqueue);
/**
@@ -199,8 +266,10 @@ PJ_DECL(pj_status_t) pj_ioqueue_set_lock( pj_ioqueue_t *ioque,
* @param sock The socket.
* @param user_data User data to be associated with the key, which can be
* retrieved later.
- * @param cb Callback to be called when I/O operation completes.
- * @param key Pointer to receive the returned key.
+ * @param cb Callback to be called when I/O operation completes.
+ * @param key Pointer to receive the key to be associated with this
+ * socket. Subsequent I/O queue operation will need this
+ * key.
*
* @return PJ_SUCCESS on success, or the error code.
*/
@@ -208,49 +277,66 @@ PJ_DECL(pj_status_t) pj_ioqueue_register_sock( pj_pool_t *pool,
pj_ioqueue_t *ioque,
pj_sock_t sock,
void *user_data,
- const pj_ioqueue_callback *cb,
- pj_ioqueue_key_t **key);
+ const pj_ioqueue_callback *cb,
+ pj_ioqueue_key_t **key );
/**
- * Unregister a handle from the I/O Queue framework.
+ * Unregister from the I/O Queue framework. Caller must make sure that
+ * the key doesn't have any pending operation before calling this function,
+ * or otherwise the behaviour is undefined (either callback will be called
+ * later when the data is sent/received, or the callback will not be called,
+ * or even something else).
*
- * @param ioque The I/O Queue.
- * @param key The key that uniquely identifies the handle, which is
- * returned from the function #pj_ioqueue_register_sock()
- * or other registration functions.
+ * @param key The key that was previously obtained from registration.
*
* @return PJ_SUCCESS on success or the error code.
*/
-PJ_DECL(pj_status_t) pj_ioqueue_unregister( pj_ioqueue_t *ioque,
- pj_ioqueue_key_t *key );
+PJ_DECL(pj_status_t) pj_ioqueue_unregister( pj_ioqueue_key_t *key );
/**
- * Get user data associated with the I/O Queue key.
+ * Get user data associated with an ioqueue key.
+ *
+ * @param key The key that was previously obtained from registration.
*
- * @param key The key previously associated with the socket/handle with
- * #pj_ioqueue_register_sock() (or other registration
- * functions).
- *
- * @return The user data associated with the key, or NULL on error
- * of if no data is associated with the key during
+ * @return The user data associated with the descriptor, or NULL
+ * on error or if no data is associated with the key during
* registration.
*/
PJ_DECL(void*) pj_ioqueue_get_user_data( pj_ioqueue_key_t *key );
+
+/**
+ * Set or change the user data to be associated with the file descriptor or
+ * handle or socket descriptor.
+ *
+ * @param key The key that was previously obtained from registration.
+ * @param user_data User data to be associated with the descriptor.
+ * @param old_data Optional parameter to retrieve the old user data.
+ *
+ * @return PJ_SUCCESS on success or the error code.
+ */
+PJ_DECL(pj_status_t) pj_ioqueue_set_user_data( pj_ioqueue_key_t *key,
+ void *user_data,
+ void **old_data);
#if defined(PJ_HAS_TCP) && PJ_HAS_TCP != 0
/**
- * Instruct I/O Queue to wait for incoming connections on the specified
- * listening socket. This function will return
- * immediately (i.e. non-blocking) regardless whether some data has been
- * transfered. If the function can't complete immediately, and the caller will
- * be notified about the completion when it calls pj_ioqueue_poll().
- *
- * @param ioqueue The I/O Queue
- * @param key The key which registered to the server socket.
- * @param sock Argument which contain pointer to receive
- * the socket for the incoming connection.
+ * Instruct I/O Queue to accept incoming connection on the specified
+ * listening socket. This function will return immediately (i.e. non-blocking)
+ * regardless whether a connection is immediately available. If the function
+ * can't complete immediately, the caller will be notified about the incoming
+ * connection when it calls pj_ioqueue_poll(). If a new connection is
+ * immediately available, the function returns PJ_SUCCESS with the new
+ * connection; in this case, the callback WILL NOT be called.
+ *
+ * @param key The key which registered to the server socket.
+ * @param op_key An operation specific key to be associated with the
+ * pending operation, so that application can keep track of
+ * which operation has been completed when the callback is
+ * called.
+ * @param new_sock Argument which contain pointer to receive the new socket
+ * for the incoming connection.
* @param local Optional argument which contain pointer to variable to
* receive local address.
* @param remote Optional argument which contain pointer to variable to
@@ -259,14 +345,16 @@ PJ_DECL(void*) pj_ioqueue_get_user_data( pj_ioqueue_key_t *key );
* address, and on output, contains the actual length of the
* address. This argument is optional.
* @return
- * - PJ_SUCCESS If there's a connection available immediately, which
- * in this case the callback should have been called before
- * the function returns.
- * - PJ_EPENDING If accept is queued, or
+ * - PJ_SUCCESS When connection is available immediately, and the
+ * parameters will be updated to contain information about
+ * the new connection. In this case, a completion callback
+ * WILL NOT be called.
+ * - PJ_EPENDING If no connection is available immediately. When a new
+ * connection arrives, the callback will be called.
* - non-zero which indicates the appropriate error code.
*/
-PJ_DECL(pj_status_t) pj_ioqueue_accept( pj_ioqueue_t *ioqueue,
- pj_ioqueue_key_t *key,
+PJ_DECL(pj_status_t) pj_ioqueue_accept( pj_ioqueue_key_t *key,
+ pj_ioqueue_op_key_t *op_key,
pj_sock_t *sock,
pj_sockaddr_t *local,
pj_sockaddr_t *remote,
@@ -274,21 +362,22 @@ PJ_DECL(pj_status_t) pj_ioqueue_accept( pj_ioqueue_t *ioqueue,
/**
* Initiate non-blocking socket connect. If the socket can NOT be connected
- * immediately, the result will be reported during poll.
+ * immediately, asynchronous connect() will be scheduled and caller will be
+ * notified via completion callback when it calls pj_ioqueue_poll(). If
+ * socket is connected immediately, the function returns PJ_SUCCESS and
+ * completion callback WILL NOT be called.
*
- * @param ioqueue The ioqueue
* @param key The key associated with TCP socket
* @param addr The remote address.
* @param addrlen The remote address length.
*
* @return
- * - PJ_SUCCESS If socket is connected immediately, which in this case
- * the callback should have been called.
+ * - PJ_SUCCESS If socket is connected immediately. In this case, the
+ * completion callback WILL NOT be called.
* - PJ_EPENDING If operation is queued, or
* - non-zero Indicates the error code.
*/
-PJ_DECL(pj_status_t) pj_ioqueue_connect( pj_ioqueue_t *ioqueue,
- pj_ioqueue_key_t *key,
+PJ_DECL(pj_status_t) pj_ioqueue_connect( pj_ioqueue_key_t *key,
const pj_sockaddr_t *addr,
int addrlen );
@@ -309,74 +398,75 @@ PJ_DECL(pj_status_t) pj_ioqueue_connect( pj_ioqueue_t *ioqueue,
PJ_DECL(int) pj_ioqueue_poll( pj_ioqueue_t *ioque,
const pj_time_val *timeout);
-/**
- * Instruct the I/O Queue to read from the specified handle. This function
- * returns immediately (i.e. non-blocking) regardless whether some data has
- * been transfered. If the operation can't complete immediately, caller will
- * be notified about the completion when it calls pj_ioqueue_poll().
- *
- * @param ioque The I/O Queue.
- * @param key The key that uniquely identifies the handle.
- * @param buffer The buffer to hold the read data. The caller MUST make sure
- * that this buffer remain valid until the framework completes
- * reading the handle.
- * @param buflen The maximum size to be read.
- *
- * @return
- * - PJ_SUCCESS If immediate data has been received. In this case, the
- * callback must have been called before this function
- * returns, and no pending operation is scheduled.
- * - PJ_EPENDING If the operation has been queued.
- * - non-zero The return value indicates the error code.
- */
-PJ_DECL(pj_status_t) pj_ioqueue_read( pj_ioqueue_t *ioque,
- pj_ioqueue_key_t *key,
- void *buffer,
- pj_size_t buflen);
-
/**
- * This function behaves similarly as #pj_ioqueue_read(), except that it is
- * normally called for socket.
+ * Instruct the I/O Queue to read from the specified handle. This function
+ * returns immediately (i.e. non-blocking) regardless whether some data has
+ * been transfered. If the operation can't complete immediately, caller will
+ * be notified about the completion when it calls pj_ioqueue_poll(). If data
+ * is immediately available, the function will return PJ_SUCCESS and the
+ * callback WILL NOT be called.
*
- * @param ioque The I/O Queue.
* @param key The key that uniquely identifies the handle.
+ * @param op_key An operation specific key to be associated with the
+ * pending operation, so that application can keep track of
+ * which operation has been completed when the callback is
+ * called. Caller must make sure that this key remains
+ * valid until the function completes.
* @param buffer The buffer to hold the read data. The caller MUST make sure
* that this buffer remain valid until the framework completes
* reading the handle.
- * @param buflen The maximum size to be read.
+ * @param length On input, it specifies the size of the buffer. If data is
+ * available to be read immediately, the function returns
+ * PJ_SUCCESS and this argument will be filled with the
+ * amount of data read. If the function is pending, caller
+ * will be notified about the amount of data read in the
+ * callback. This parameter can point to local variable in
+ * caller's stack and doesn't have to remain valid for the
+ * duration of pending operation.
* @param flags Recv flag.
*
* @return
- * - PJ_SUCCESS If immediate data has been received. In this case, the
- * callback must have been called before this function
- * returns, and no pending operation is scheduled.
- * - PJ_EPENDING If the operation has been queued.
+ * - PJ_SUCCESS If immediate data has been received in the buffer. In this
+ * case, the callback WILL NOT be called.
+ * - PJ_EPENDING If the operation has been queued, and the callback will be
+ * called when data has been received.
* - non-zero The return value indicates the error code.
*/
-PJ_DECL(pj_status_t) pj_ioqueue_recv( pj_ioqueue_t *ioque,
- pj_ioqueue_key_t *key,
+PJ_DECL(pj_status_t) pj_ioqueue_recv( pj_ioqueue_key_t *key,
+ pj_ioqueue_op_key_t *op_key,
void *buffer,
- pj_size_t buflen,
+ pj_ssize_t *length,
unsigned flags );
/**
- * This function behaves similarly as #pj_ioqueue_read(), except that it is
+ * This function behaves similarly as #pj_ioqueue_recv(), except that it is
* normally called for socket, and the remote address will also be returned
* along with the data. Caller MUST make sure that both buffer and addr
* remain valid until the framework completes reading the data.
*
- * @param ioque The I/O Queue.
* @param key The key that uniquely identifies the handle.
+ * @param op_key An operation specific key to be associated with the
+ * pending operation, so that application can keep track of
+ * which operation has been completed when the callback is
+ * called.
* @param buffer The buffer to hold the read data. The caller MUST make sure
* that this buffer remain valid until the framework completes
* reading the handle.
- * @param buflen The maximum size to be read.
+ * @param length On input, it specifies the size of the buffer. If data is
+ * available to be read immediately, the function returns
+ * PJ_SUCCESS and this argument will be filled with the
+ * amount of data read. If the function is pending, caller
+ * will be notified about the amount of data read in the
+ * callback. This parameter can point to local variable in
+ * caller's stack and doesn't have to remain valid for the
+ * duration of pending operation.
* @param flags Recv flag.
- * @param addr Pointer to buffer to receive the address, or NULL.
+ * @param addr Optional Pointer to buffer to receive the address.
* @param addrlen On input, specifies the length of the address buffer.
* On output, it will be filled with the actual length of
- * the address.
+ * the address. This argument can be NULL if \c addr is not
+ * specified.
*
* @return
* - PJ_SUCCESS If immediate data has been received. In this case, the
@@ -385,56 +475,52 @@ PJ_DECL(pj_status_t) pj_ioqueue_recv( pj_ioqueue_t *ioque,
* - PJ_EPENDING If the operation has been queued.
* - non-zero The return value indicates the error code.
*/
-PJ_DECL(pj_status_t) pj_ioqueue_recvfrom( pj_ioqueue_t *ioque,
- pj_ioqueue_key_t *key,
+PJ_DECL(pj_status_t) pj_ioqueue_recvfrom( pj_ioqueue_key_t *key,
+ pj_ioqueue_op_key_t *op_key,
void *buffer,
- pj_size_t buflen,
+ pj_ssize_t *length,
unsigned flags,
pj_sockaddr_t *addr,
int *addrlen);
-/**
- * Instruct the I/O Queue to write to the handle. This function will return
- * immediately (i.e. non-blocking) regardless whether some data has been
- * transfered. If the function can't complete immediately, and the caller will
- * be notified about the completion when it calls pj_ioqueue_poll().
- *
- * @param ioque the I/O Queue.
- * @param key the key that identifies the handle.
- * @param data the data to send. Caller MUST make sure that this buffer
- * remains valid until the write operation completes.
- * @param datalen the length of the data.
- *
- * @return
- * - PJ_SUCCESS If data was immediately written.
- * - PJ_EPENDING If the operation has been queued.
- * - non-zero The return value indicates the error code.
- */
-PJ_DECL(pj_status_t) pj_ioqueue_write( pj_ioqueue_t *ioque,
- pj_ioqueue_key_t *key,
- const void *data,
- pj_size_t datalen);
/**
- * This function behaves similarly as #pj_ioqueue_write(), except that
- * pj_sock_send() (or equivalent) will be called to send the data.
- *
- * @param ioque the I/O Queue.
- * @param key the key that identifies the handle.
- * @param data the data to send. Caller MUST make sure that this buffer
+ * Instruct the I/O Queue to write to the handle. This function will return
+ * immediately (i.e. non-blocking) regardless whether some data has been
+ * transfered. If the function can't complete immediately, the caller will
+ * be notified about the completion when it calls pj_ioqueue_poll(). If
+ * operation completes immediately and data has been transfered, the function
+ * returns PJ_SUCCESS and the callback will NOT be called.
+ *
+ * @param key The key that identifies the handle.
+ * @param op_key An operation specific key to be associated with the
+ * pending operation, so that application can keep track of
+ * which operation has been completed when the callback is
+ * called.
+ * @param data The data to send. Caller MUST make sure that this buffer
* remains valid until the write operation completes.
- * @param datalen the length of the data.
- * @param flags send flags.
+ * @param length On input, it specifies the length of data to send. When
+ * data was sent immediately, this function returns PJ_SUCCESS
+ * and this parameter contains the length of data sent. If
+ * data can not be sent immediately, an asynchronous operation
+ * is scheduled and caller will be notified via callback the
+ * number of bytes sent. This parameter can point to local
+ * variable on caller's stack and doesn't have to remain
+ * valid until the operation has completed.
+ * @param flags Send flags.
*
* @return
- * - PJ_SUCCESS If data was immediately written.
- * - PJ_EPENDING If the operation has been queued.
+ * - PJ_SUCCESS If data was immediately transfered. In this case, no
+ * pending operation has been scheduled and the callback
+ * WILL NOT be called.
+ * - PJ_EPENDING If the operation has been queued. Once data base been
+ * transfered, the callback will be called.
* - non-zero The return value indicates the error code.
*/
-PJ_DECL(pj_status_t) pj_ioqueue_send( pj_ioqueue_t *ioque,
- pj_ioqueue_key_t *key,
+PJ_DECL(pj_status_t) pj_ioqueue_send( pj_ioqueue_key_t *key,
+ pj_ioqueue_op_key_t *op_key,
const void *data,
- pj_size_t datalen,
+ pj_ssize_t *length,
unsigned flags );
@@ -442,24 +528,34 @@ PJ_DECL(pj_status_t) pj_ioqueue_send( pj_ioqueue_t *ioque,
* This function behaves similarly as #pj_ioqueue_write(), except that
* pj_sock_sendto() (or equivalent) will be called to send the data.
*
- * @param ioque the I/O Queue.
* @param key the key that identifies the handle.
+ * @param op_key An operation specific key to be associated with the
+ * pending operation, so that application can keep track of
+ * which operation has been completed when the callback is
+ * called.
* @param data the data to send. Caller MUST make sure that this buffer
* remains valid until the write operation completes.
- * @param datalen the length of the data.
+ * @param length On input, it specifies the length of data to send. When
+ * data was sent immediately, this function returns PJ_SUCCESS
+ * and this parameter contains the length of data sent. If
+ * data can not be sent immediately, an asynchronous operation
+ * is scheduled and caller will be notified via callback the
+ * number of bytes sent. This parameter can point to local
+ * variable on caller's stack and doesn't have to remain
+ * valid until the operation has completed.
* @param flags send flags.
- * @param addr remote address.
- * @param addrlen remote address length.
+ * @param addr Optional remote address.
+ * @param addrlen Remote address length, \c addr is specified.
*
* @return
* - PJ_SUCCESS If data was immediately written.
* - PJ_EPENDING If the operation has been queued.
* - non-zero The return value indicates the error code.
*/
-PJ_DECL(pj_status_t) pj_ioqueue_sendto( pj_ioqueue_t *ioque,
- pj_ioqueue_key_t *key,
+PJ_DECL(pj_status_t) pj_ioqueue_sendto( pj_ioqueue_key_t *key,
+ pj_ioqueue_op_key_t *op_key,
const void *data,
- pj_size_t datalen,
+ pj_ssize_t *length,
unsigned flags,
const pj_sockaddr_t *addr,
int addrlen);
diff --git a/pjlib/include/pj/list.h b/pjlib/include/pj/list.h
index 32bd5aec..95ae99fd 100644
--- a/pjlib/include/pj/list.h
+++ b/pjlib/include/pj/list.h
@@ -46,7 +46,7 @@ PJ_BEGIN_DECL
* @hideinitializer
*/
#define PJ_DECL_LIST_MEMBER(type) type *prev; /** List @a prev. */ \
- type *next; /** List @a next. */
+ type *next /** List @a next. */
/**
@@ -56,7 +56,7 @@ PJ_BEGIN_DECL
*/
struct pj_list
{
- PJ_DECL_LIST_MEMBER(void)
+ PJ_DECL_LIST_MEMBER(void);
};
diff --git a/pjlib/include/pj/pool.h b/pjlib/include/pj/pool.h
index ca6ac20a..4001079f 100644
--- a/pjlib/include/pj/pool.h
+++ b/pjlib/include/pj/pool.h
@@ -113,7 +113,7 @@ typedef void pj_pool_callback(pj_pool_t *pool, pj_size_t size);
*/
typedef struct pj_pool_block
{
- PJ_DECL_LIST_MEMBER(struct pj_pool_block) /**< List's prev and next. */
+ PJ_DECL_LIST_MEMBER(struct pj_pool_block); /**< List's prev and next. */
unsigned char *buf; /**< Start of buffer. */
unsigned char *cur; /**< Current alloc ptr. */
unsigned char *end; /**< End of buffer. */
@@ -126,7 +126,7 @@ typedef struct pj_pool_block
*/
struct pj_pool_t
{
- PJ_DECL_LIST_MEMBER(struct pj_pool_t)
+ PJ_DECL_LIST_MEMBER(struct pj_pool_t);
/** Pool name */
char obj_name[PJ_MAX_OBJ_NAME];
diff --git a/pjlib/include/pj/sock_select.h b/pjlib/include/pj/sock_select.h
index 730d05e8..5b0c7002 100644
--- a/pjlib/include/pj/sock_select.h
+++ b/pjlib/include/pj/sock_select.h
@@ -97,16 +97,6 @@ PJ_DECL(pj_bool_t) PJ_FD_ISSET(pj_sock_t fd, const pj_fd_set_t *fdsetp);
/**
- * Get the number of descriptors in the set.
- *
- * @param fdsetp The descriptor set.
- *
- * @return Number of descriptors in the set.
- */
-PJ_DECL(pj_size_t) PJ_FD_COUNT(const pj_fd_set_t *fdsetp);
-
-
-/**
* This function wait for a number of file descriptors to change status.
* The behaviour is the same as select() function call which appear in
* standard BSD socket libraries.
diff --git a/pjlib/include/pj/xml.h b/pjlib/include/pj/xml.h
index d487cc88..9777a514 100644
--- a/pjlib/include/pj/xml.h
+++ b/pjlib/include/pj/xml.h
@@ -30,7 +30,7 @@ typedef struct pj_xml_node pj_xml_node;
/** This structure declares XML attribute. */
struct pj_xml_attr
{
- PJ_DECL_LIST_MEMBER(pj_xml_attr)
+ PJ_DECL_LIST_MEMBER(pj_xml_attr);
pj_str_t name; /**< Attribute name. */
pj_str_t value; /**< Attribute value. */
};
@@ -39,13 +39,13 @@ struct pj_xml_attr
*/
typedef struct pj_xml_node_head
{
- PJ_DECL_LIST_MEMBER(pj_xml_node)
+ PJ_DECL_LIST_MEMBER(pj_xml_node);
} pj_xml_node_head;
/** This structure describes XML node. */
struct pj_xml_node
{
- PJ_DECL_LIST_MEMBER(pj_xml_node) /** List @a prev and @a next member */
+ PJ_DECL_LIST_MEMBER(pj_xml_node); /** List @a prev and @a next member */
pj_str_t name; /** Node name. */
pj_xml_attr attr_head; /** Attribute list. */
pj_xml_node_head node_head; /** Node list. */
diff --git a/pjlib/src/pj/config.c b/pjlib/src/pj/config.c
index 962fbdcf..3354f133 100644
--- a/pjlib/src/pj/config.c
+++ b/pjlib/src/pj/config.c
@@ -4,7 +4,7 @@
#include <pj/log.h>
static const char *id = "config.c";
-const char *PJ_VERSION = "0.3.0-pre1";
+const char *PJ_VERSION = "0.3.0-pre4";
PJ_DEF(void) pj_dump_config(void)
{
diff --git a/pjlib/src/pj/ioqueue_select.c b/pjlib/src/pj/ioqueue_select.c
index 37d6f661..367ffb5e 100644
--- a/pjlib/src/pj/ioqueue_select.c
+++ b/pjlib/src/pj/ioqueue_select.c
@@ -33,23 +33,33 @@
*
*/
#define THIS_FILE "ioq_select"
+
+/*
+ * The select ioqueue relies on socket functions (pj_sock_xxx()) to return
+ * the correct error code.
+ */
+#if PJ_RETURN_OS_ERROR(100) != PJ_STATUS_FROM_OS(100)
+# error "Error reporting must be enabled for this function to work!"
+#endif
+
+/**
+ * Get the number of descriptors in the set. This is defined in sock_select.c
+ * This function will only return the number of sockets set from PJ_FD_SET
+ * operation. When the set is modified by other means (such as by select()),
+ * the count will not be reflected here.
+ *
+ * That's why don't export this function in the header file, to avoid
+ * misunderstanding.
+ *
+ * @param fdsetp The descriptor set.
+ *
+ * @return Number of descriptors in the set.
+ */
+PJ_DECL(pj_size_t) PJ_FD_COUNT(const pj_fd_set_t *fdsetp);
+
-#define PJ_IOQUEUE_IS_READ_OP(op) ((op & PJ_IOQUEUE_OP_READ) || \
- (op & PJ_IOQUEUE_OP_RECV) || \
- (op & PJ_IOQUEUE_OP_RECV_FROM))
-#define PJ_IOQUEUE_IS_WRITE_OP(op) ((op & PJ_IOQUEUE_OP_WRITE) || \
- (op & PJ_IOQUEUE_OP_SEND) || \
- (op & PJ_IOQUEUE_OP_SEND_TO))
-#if PJ_HAS_TCP
-# define PJ_IOQUEUE_IS_ACCEPT_OP(op) (op & PJ_IOQUEUE_OP_ACCEPT)
-# define PJ_IOQUEUE_IS_CONNECT_OP(op) (op & PJ_IOQUEUE_OP_CONNECT)
-#else
-# define PJ_IOQUEUE_IS_ACCEPT_OP(op) 0
-# define PJ_IOQUEUE_IS_CONNECT_OP(op) 0
-#endif
-
/*
* During debugging build, VALIDATE_FD_SET is set.
* This will check the validity of the fd_sets.
@@ -59,31 +69,77 @@
#else
# define VALIDATE_FD_SET 0
#endif
+
+struct generic_operation
+{
+ PJ_DECL_LIST_MEMBER(struct generic_operation);
+ pj_ioqueue_operation_e op;
+};
+
+struct read_operation
+{
+ PJ_DECL_LIST_MEMBER(struct read_operation);
+ pj_ioqueue_operation_e op;
+
+ void *buf;
+ pj_size_t size;
+ unsigned flags;
+ pj_sockaddr_t *rmt_addr;
+ int *rmt_addrlen;
+};
+
+struct write_operation
+{
+ PJ_DECL_LIST_MEMBER(struct write_operation);
+ pj_ioqueue_operation_e op;
+
+ char *buf;
+ pj_size_t size;
+ pj_ssize_t written;
+ unsigned flags;
+ pj_sockaddr_in rmt_addr;
+ int rmt_addrlen;
+};
+
+#if PJ_HAS_TCP
+struct accept_operation
+{
+ PJ_DECL_LIST_MEMBER(struct accept_operation);
+ pj_ioqueue_operation_e op;
+
+ pj_sock_t *accept_fd;
+ pj_sockaddr_t *local_addr;
+ pj_sockaddr_t *rmt_addr;
+ int *addrlen;
+};
+#endif
+
+union operation_key
+{
+ struct generic_operation generic;
+ struct read_operation read;
+ struct write_operation write;
+#if PJ_HAS_TCP
+ struct accept_operation accept;
+#endif
+};
/*
* This describes each key.
*/
struct pj_ioqueue_key_t
{
- PJ_DECL_LIST_MEMBER(struct pj_ioqueue_key_t)
+ PJ_DECL_LIST_MEMBER(struct pj_ioqueue_key_t);
+ pj_ioqueue_t *ioqueue;
pj_sock_t fd;
- pj_ioqueue_operation_e op;
void *user_data;
- pj_ioqueue_callback cb;
-
- void *rd_buf;
- unsigned rd_flags;
- pj_size_t rd_buflen;
- void *wr_buf;
- pj_size_t wr_buflen;
-
- pj_sockaddr_t *rmt_addr;
- int *rmt_addrlen;
-
- pj_sockaddr_t *local_addr;
- int *local_addrlen;
-
- pj_sock_t *accept_fd;
+ pj_ioqueue_callback cb;
+ int connecting;
+ struct read_operation read_list;
+ struct write_operation write_list;
+#if PJ_HAS_TCP
+ struct accept_operation accept_list;
+#endif
};
/*
@@ -94,7 +150,7 @@ struct pj_ioqueue_t
pj_lock_t *lock;
pj_bool_t auto_delete_lock;
unsigned max, count;
- pj_ioqueue_key_t hlist;
+ pj_ioqueue_key_t key_list;
pj_fd_set_t rfdset;
pj_fd_set_t wfdset;
#if PJ_HAS_TCP
@@ -109,38 +165,39 @@ struct pj_ioqueue_t
*/
PJ_DEF(pj_status_t) pj_ioqueue_create( pj_pool_t *pool,
pj_size_t max_fd,
- int max_threads,
pj_ioqueue_t **p_ioqueue)
{
- pj_ioqueue_t *ioque;
+ pj_ioqueue_t *ioqueue;
pj_status_t rc;
-
- PJ_UNUSED_ARG(max_threads);
-
- if (max_fd > PJ_IOQUEUE_MAX_HANDLES) {
- pj_assert(!"max_fd too large");
- return PJ_EINVAL;
- }
-
- ioque = pj_pool_alloc(pool, sizeof(pj_ioqueue_t));
- ioque->max = max_fd;
- ioque->count = 0;
- PJ_FD_ZERO(&ioque->rfdset);
- PJ_FD_ZERO(&ioque->wfdset);
+
+ /* Check that arguments are valid. */
+ PJ_ASSERT_RETURN(pool != NULL && p_ioqueue != NULL &&
+ max_fd > 0 && max_fd <= PJ_IOQUEUE_MAX_HANDLES,
+ PJ_EINVAL);
+
+ /* Check that size of pj_ioqueue_op_key_t is sufficient */
+ PJ_ASSERT_RETURN(sizeof(pj_ioqueue_op_key_t)-sizeof(void*) >=
+ sizeof(union operation_key), PJ_EBUG);
+
+ ioqueue = pj_pool_alloc(pool, sizeof(pj_ioqueue_t));
+ ioqueue->max = max_fd;
+ ioqueue->count = 0;
+ PJ_FD_ZERO(&ioqueue->rfdset);
+ PJ_FD_ZERO(&ioqueue->wfdset);
#if PJ_HAS_TCP
- PJ_FD_ZERO(&ioque->xfdset);
+ PJ_FD_ZERO(&ioqueue->xfdset);
#endif
- pj_list_init(&ioque->hlist);
+ pj_list_init(&ioqueue->key_list);
- rc = pj_lock_create_recursive_mutex(pool, "ioq%p", &ioque->lock);
+ rc = pj_lock_create_recursive_mutex(pool, "ioq%p", &ioqueue->lock);
if (rc != PJ_SUCCESS)
return rc;
- ioque->auto_delete_lock = PJ_TRUE;
+ ioqueue->auto_delete_lock = PJ_TRUE;
- PJ_LOG(4, ("pjlib", "select() I/O Queue created (%p)", ioque));
+ PJ_LOG(4, ("pjlib", "select() I/O Queue created (%p)", ioqueue));
- *p_ioqueue = ioque;
+ *p_ioqueue = ioqueue;
return PJ_SUCCESS;
}
@@ -149,46 +206,28 @@ PJ_DEF(pj_status_t) pj_ioqueue_create( pj_pool_t *pool,
*
* Destroy ioqueue.
*/
-PJ_DEF(pj_status_t) pj_ioqueue_destroy(pj_ioqueue_t *ioque)
+PJ_DEF(pj_status_t) pj_ioqueue_destroy(pj_ioqueue_t *ioqueue)
{
pj_status_t rc = PJ_SUCCESS;
- PJ_ASSERT_RETURN(ioque, PJ_EINVAL);
+ PJ_ASSERT_RETURN(ioqueue, PJ_EINVAL);
+
+ pj_lock_acquire(ioqueue->lock);
- if (ioque->auto_delete_lock)
- rc = pj_lock_destroy(ioque->lock);
+ if (ioqueue->auto_delete_lock)
+ rc = pj_lock_destroy(ioqueue->lock);
return rc;
}
/*
- * pj_ioqueue_set_lock()
- */
-PJ_DEF(pj_status_t) pj_ioqueue_set_lock( pj_ioqueue_t *ioque,
- pj_lock_t *lock,
- pj_bool_t auto_delete )
-{
- PJ_ASSERT_RETURN(ioque && lock, PJ_EINVAL);
-
- if (ioque->auto_delete_lock) {
- pj_lock_destroy(ioque->lock);
- }
-
- ioque->lock = lock;
- ioque->auto_delete_lock = auto_delete;
-
- return PJ_SUCCESS;
-}
-
-
-/*
* pj_ioqueue_register_sock()
*
* Register a handle to ioqueue.
*/
PJ_DEF(pj_status_t) pj_ioqueue_register_sock( pj_pool_t *pool,
- pj_ioqueue_t *ioque,
+ pj_ioqueue_t *ioqueue,
pj_sock_t sock,
void *user_data,
const pj_ioqueue_callback *cb,
@@ -198,12 +237,12 @@ PJ_DEF(pj_status_t) pj_ioqueue_register_sock( pj_pool_t *pool,
pj_uint32_t value;
pj_status_t rc = PJ_SUCCESS;
- PJ_ASSERT_RETURN(pool && ioque && sock != PJ_INVALID_SOCKET &&
+ PJ_ASSERT_RETURN(pool && ioqueue && sock != PJ_INVALID_SOCKET &&
cb && p_key, PJ_EINVAL);
- pj_lock_acquire(ioque->lock);
+ pj_lock_acquire(ioqueue->lock);
- if (ioque->count >= ioque->max) {
+ if (ioqueue->count >= ioqueue->max) {
rc = PJ_ETOOMANY;
goto on_return;
}
@@ -211,7 +250,7 @@ PJ_DEF(pj_status_t) pj_ioqueue_register_sock( pj_pool_t *pool,
/* Set socket to nonblocking. */
value = 1;
#ifdef PJ_WIN32
- if (ioctlsocket(sock, FIONBIO, (unsigned long*)&value)) {
+ if (ioctlsocket(sock, FIONBIO, (u_long*)&value)) {
#else
if (ioctl(sock, FIONBIO, &value)) {
#endif
@@ -220,20 +259,27 @@ PJ_DEF(pj_status_t) pj_ioqueue_register_sock( pj_pool_t *pool,
}
/* Create key. */
- key = (pj_ioqueue_key_t*)pj_pool_zalloc(pool, sizeof(pj_ioqueue_key_t));
+ key = (pj_ioqueue_key_t*)pj_pool_zalloc(pool, sizeof(pj_ioqueue_key_t));
+ key->ioqueue = ioqueue;
key->fd = sock;
- key->user_data = user_data;
-
+ key->user_data = user_data;
+ pj_list_init(&key->read_list);
+ pj_list_init(&key->write_list);
+#if PJ_HAS_TCP
+ pj_list_init(&key->accept_list);
+#endif
+
/* Save callback. */
pj_memcpy(&key->cb, cb, sizeof(pj_ioqueue_callback));
/* Register */
- pj_list_insert_before(&ioque->hlist, key);
- ++ioque->count;
+ pj_list_insert_before(&ioqueue->key_list, key);
+ ++ioqueue->count;
-on_return:
+on_return:
+ /* On error, socket may be left in non-blocking mode. */
*p_key = key;
- pj_lock_release(ioque->lock);
+ pj_lock_release(ioqueue->lock);
return rc;
}
@@ -243,23 +289,26 @@ on_return:
*
* Unregister handle from ioqueue.
*/
-PJ_DEF(pj_status_t) pj_ioqueue_unregister( pj_ioqueue_t *ioque,
- pj_ioqueue_key_t *key)
-{
- PJ_ASSERT_RETURN(ioque && key, PJ_EINVAL);
+PJ_DEF(pj_status_t) pj_ioqueue_unregister( pj_ioqueue_key_t *key)
+{
+ pj_ioqueue_t *ioqueue;
+
+ PJ_ASSERT_RETURN(key, PJ_EINVAL);
+
+ ioqueue = key->ioqueue;
- pj_lock_acquire(ioque->lock);
+ pj_lock_acquire(ioqueue->lock);
- pj_assert(ioque->count > 0);
- --ioque->count;
+ pj_assert(ioqueue->count > 0);
+ --ioqueue->count;
pj_list_erase(key);
- PJ_FD_CLR(key->fd, &ioque->rfdset);
- PJ_FD_CLR(key->fd, &ioque->wfdset);
+ PJ_FD_CLR(key->fd, &ioqueue->rfdset);
+ PJ_FD_CLR(key->fd, &ioqueue->wfdset);
#if PJ_HAS_TCP
- PJ_FD_CLR(key->fd, &ioque->xfdset);
+ PJ_FD_CLR(key->fd, &ioqueue->xfdset);
#endif
-
- pj_lock_release(ioque->lock);
+
+ pj_lock_release(ioqueue->lock);
return PJ_SUCCESS;
}
@@ -274,25 +323,40 @@ PJ_DEF(void*) pj_ioqueue_get_user_data( pj_ioqueue_key_t *key )
return key->user_data;
}
+
+/*
+ * pj_ioqueue_set_user_data()
+ */
+PJ_DEF(pj_status_t) pj_ioqueue_set_user_data( pj_ioqueue_key_t *key,
+ void *user_data,
+ void **old_data)
+{
+ PJ_ASSERT_RETURN(key, PJ_EINVAL);
+
+ if (old_data)
+ *old_data = key->user_data;
+ key->user_data = user_data;
+
+ return PJ_SUCCESS;
+}
+
/* This supposed to check whether the fd_set values are consistent
* with the operation currently set in each key.
*/
#if VALIDATE_FD_SET
-static void validate_sets(const pj_ioqueue_t *ioque,
+static void validate_sets(const pj_ioqueue_t *ioqueue,
const pj_fd_set_t *rfdset,
const pj_fd_set_t *wfdset,
const pj_fd_set_t *xfdset)
{
pj_ioqueue_key_t *key;
- key = ioque->hlist.next;
- while (key != &ioque->hlist) {
- if ((key->op & PJ_IOQUEUE_OP_READ)
- || (key->op & PJ_IOQUEUE_OP_RECV)
- || (key->op & PJ_IOQUEUE_OP_RECV_FROM)
+ key = ioqueue->key_list.next;
+ while (key != &ioqueue->key_list) {
+ if (!pj_list_empty(&key->read_list)
#if defined(PJ_HAS_TCP) && PJ_HAS_TCP != 0
- || (key->op & PJ_IOQUEUE_OP_ACCEPT)
+ || !pj_list_empty(&key->accept_list)
#endif
)
{
@@ -301,11 +365,9 @@ static void validate_sets(const pj_ioqueue_t *ioque,
else {
pj_assert(PJ_FD_ISSET(key->fd, rfdset) == 0);
}
- if ((key->op & PJ_IOQUEUE_OP_WRITE)
- || (key->op & PJ_IOQUEUE_OP_SEND)
- || (key->op & PJ_IOQUEUE_OP_SEND_TO)
+ if (!pj_list_empty(&key->write_list)
#if defined(PJ_HAS_TCP) && PJ_HAS_TCP != 0
- || (key->op & PJ_IOQUEUE_OP_CONNECT)
+ || key->connecting
#endif
)
{
@@ -315,7 +377,7 @@ static void validate_sets(const pj_ioqueue_t *ioque,
pj_assert(PJ_FD_ISSET(key->fd, wfdset) == 0);
}
#if defined(PJ_HAS_TCP) && PJ_HAS_TCP != 0
- if (key->op & PJ_IOQUEUE_OP_CONNECT)
+ if (key->connecting)
{
pj_assert(PJ_FD_ISSET(key->fd, xfdset));
}
@@ -347,124 +409,263 @@ static void validate_sets(const pj_ioqueue_t *ioque,
* - to guarantee preemptiveness etc, the poll function must strictly
* work on fd_set copy of the ioqueue (not the original one).
*/
-PJ_DEF(int) pj_ioqueue_poll( pj_ioqueue_t *ioque, const pj_time_val *timeout)
+PJ_DEF(int) pj_ioqueue_poll( pj_ioqueue_t *ioqueue, const pj_time_val *timeout)
{
pj_fd_set_t rfdset, wfdset, xfdset;
int count;
pj_ioqueue_key_t *h;
+
+ PJ_ASSERT_RETURN(ioqueue, PJ_EINVAL);
/* Lock ioqueue before making fd_set copies */
- pj_lock_acquire(ioque->lock);
-
- if (PJ_FD_COUNT(&ioque->rfdset)==0 &&
- PJ_FD_COUNT(&ioque->wfdset)==0 &&
- PJ_FD_COUNT(&ioque->xfdset)==0)
+ pj_lock_acquire(ioqueue->lock);
+
+ /* We will only do select() when there are sockets to be polled.
+ * Otherwise select() will return error.
+ */
+ if (PJ_FD_COUNT(&ioqueue->rfdset)==0 &&
+ PJ_FD_COUNT(&ioqueue->wfdset)==0 &&
+ PJ_FD_COUNT(&ioqueue->xfdset)==0)
{
- pj_lock_release(ioque->lock);
+ pj_lock_release(ioqueue->lock);
if (timeout)
pj_thread_sleep(PJ_TIME_VAL_MSEC(*timeout));
return 0;
}
/* Copy ioqueue's pj_fd_set_t to local variables. */
- pj_memcpy(&rfdset, &ioque->rfdset, sizeof(pj_fd_set_t));
- pj_memcpy(&wfdset, &ioque->wfdset, sizeof(pj_fd_set_t));
+ pj_memcpy(&rfdset, &ioqueue->rfdset, sizeof(pj_fd_set_t));
+ pj_memcpy(&wfdset, &ioqueue->wfdset, sizeof(pj_fd_set_t));
#if PJ_HAS_TCP
- pj_memcpy(&xfdset, &ioque->xfdset, sizeof(pj_fd_set_t));
+ pj_memcpy(&xfdset, &ioqueue->xfdset, sizeof(pj_fd_set_t));
#else
PJ_FD_ZERO(&xfdset);
#endif
#if VALIDATE_FD_SET
- validate_sets(ioque, &rfdset, &wfdset, &xfdset);
+ validate_sets(ioqueue, &rfdset, &wfdset, &xfdset);
#endif
/* Unlock ioqueue before select(). */
- pj_lock_release(ioque->lock);
+ pj_lock_release(ioqueue->lock);
count = pj_sock_select(FD_SETSIZE, &rfdset, &wfdset, &xfdset, timeout);
if (count <= 0)
return count;
- /* Lock ioqueue again before scanning for signalled sockets. */
- pj_lock_acquire(ioque->lock);
-
-#if PJ_HAS_TCP
- /* Scan for exception socket */
- h = ioque->hlist.next;
-do_except_scan:
- for ( ; h!=&ioque->hlist; h = h->next) {
- if ((h->op & PJ_IOQUEUE_OP_CONNECT) && PJ_FD_ISSET(h->fd, &xfdset))
- break;
- }
- if (h != &ioque->hlist) {
- /* 'connect()' should be the only operation. */
- pj_assert((h->op == PJ_IOQUEUE_OP_CONNECT));
-
- /* Clear operation. */
- h->op &= ~(PJ_IOQUEUE_OP_CONNECT);
- PJ_FD_CLR(h->fd, &ioque->wfdset);
- PJ_FD_CLR(h->fd, &ioque->xfdset);
- PJ_FD_CLR(h->fd, &wfdset);
- PJ_FD_CLR(h->fd, &xfdset);
-
- /* Call callback. */
- if (h->cb.on_connect_complete)
- (*h->cb.on_connect_complete)(h, -1);
-
- /* Re-scan exception list. */
- goto do_except_scan;
- }
-#endif /* PJ_HAS_TCP */
+ /* Lock ioqueue again before scanning for signalled sockets.
+ * We must strictly use recursive mutex since application may invoke
+ * the ioqueue again inside the callback.
+ */
+ pj_lock_acquire(ioqueue->lock);
+ /* Scan for writable sockets first to handle piggy-back data
+ * coming with accept().
+ */
+ h = ioqueue->key_list.next;
+do_writable_scan:
+ for ( ; h!=&ioqueue->key_list; h = h->next) {
+ if ( (!pj_list_empty(&h->write_list) || h->connecting)
+ && PJ_FD_ISSET(h->fd, &wfdset))
+ {
+ break;
+ }
+ }
+ if (h != &ioqueue->key_list) {
+ pj_assert(!pj_list_empty(&h->write_list) || h->connecting);
+
+#if defined(PJ_HAS_TCP) && PJ_HAS_TCP!=0
+ if (h->connecting) {
+ /* Completion of connect() operation */
+ pj_ssize_t bytes_transfered;
+
+#if (defined(PJ_HAS_SO_ERROR) && PJ_HAS_SO_ERROR!=0)
+ /* from connect(2):
+ * On Linux, use getsockopt to read the SO_ERROR option at
+ * level SOL_SOCKET to determine whether connect() completed
+ * successfully (if SO_ERROR is zero).
+ */
+ int value;
+ socklen_t vallen = sizeof(value);
+ int gs_rc = getsockopt(h->fd, SOL_SOCKET, SO_ERROR,
+ &value, &vallen);
+ if (gs_rc != 0) {
+ /* Argh!! What to do now???
+ * Just indicate that the socket is connected. The
+ * application will get error as soon as it tries to use
+ * the socket to send/receive.
+ */
+ bytes_transfered = 0;
+ } else {
+ bytes_transfered = value;
+ }
+#elif defined(PJ_WIN32) && PJ_WIN32!=0
+ bytes_transfered = 0; /* success */
+#else
+ /* Excellent information in D.J. Bernstein page:
+ * http://cr.yp.to/docs/connect.html
+ *
+ * Seems like the most portable way of detecting connect()
+ * failure is to call getpeername(). If socket is connected,
+ * getpeername() will return 0. If the socket is not connected,
+ * it will return ENOTCONN, and read(fd, &ch, 1) will produce
+ * the right errno through error slippage. This is a combination
+ * of suggestions from Douglas C. Schmidt and Ken Keys.
+ */
+ int gp_rc;
+ struct sockaddr_in addr;
+ socklen_t addrlen = sizeof(addr);
+
+ gp_rc = getpeername(h->fd, (struct sockaddr*)&addr, &addrlen);
+ bytes_transfered = gp_rc;
+#endif
+
+ /* Clear operation. */
+ h->connecting = 0;
+ PJ_FD_CLR(h->fd, &ioqueue->wfdset);
+ PJ_FD_CLR(h->fd, &ioqueue->xfdset);
+
+ /* Call callback. */
+ if (h->cb.on_connect_complete)
+ (*h->cb.on_connect_complete)(h, bytes_transfered);
+
+ /* Re-scan writable sockets. */
+ goto do_writable_scan;
+
+ } else
+#endif /* PJ_HAS_TCP */
+ {
+ /* Socket is writable. */
+ struct write_operation *write_op;
+ pj_ssize_t sent;
+ pj_status_t send_rc;
+
+ /* Get the first in the queue. */
+ write_op = h->write_list.next;
+
+ /* Send the data. */
+ sent = write_op->size - write_op->written;
+ if (write_op->op == PJ_IOQUEUE_OP_SEND) {
+ send_rc = pj_sock_send(h->fd, write_op->buf+write_op->written,
+ &sent, write_op->flags);
+ } else if (write_op->op == PJ_IOQUEUE_OP_SEND_TO) {
+ send_rc = pj_sock_sendto(h->fd,
+ write_op->buf+write_op->written,
+ &sent, write_op->flags,
+ &write_op->rmt_addr,
+ write_op->rmt_addrlen);
+ } else {
+ pj_assert(!"Invalid operation type!");
+ send_rc = PJ_EBUG;
+ }
+
+ if (send_rc == PJ_SUCCESS) {
+ write_op->written += sent;
+ } else {
+ pj_assert(send_rc > 0);
+ write_op->written = -send_rc;
+ }
+
+ /* In any case we don't need to process this descriptor again. */
+ PJ_FD_CLR(h->fd, &wfdset);
+
+ /* Are we finished with this buffer? */
+ if (send_rc!=PJ_SUCCESS ||
+ write_op->written == (pj_ssize_t)write_op->size)
+ {
+ pj_list_erase(write_op);
+
+ /* Clear operation if there's no more data to send. */
+ if (pj_list_empty(&h->write_list))
+ PJ_FD_CLR(h->fd, &ioqueue->wfdset);
+
+ /* Call callback. */
+ if (h->cb.on_write_complete) {
+ (*h->cb.on_write_complete)(h,
+ (pj_ioqueue_op_key_t*)write_op,
+ write_op->written);
+ }
+ }
+
+ /* Re-scan writable sockets. */
+ goto do_writable_scan;
+ }
+ }
+
/* Scan for readable socket. */
- h = ioque->hlist.next;
+ h = ioqueue->key_list.next;
do_readable_scan:
- for ( ; h!=&ioque->hlist; h = h->next) {
- if ((PJ_IOQUEUE_IS_READ_OP(h->op) || PJ_IOQUEUE_IS_ACCEPT_OP(h->op)) &&
- PJ_FD_ISSET(h->fd, &rfdset))
+ for ( ; h!=&ioqueue->key_list; h = h->next) {
+ if ((!pj_list_empty(&h->read_list)
+#if PJ_HAS_TCP
+ || !pj_list_empty(&h->accept_list)
+#endif
+ ) && PJ_FD_ISSET(h->fd, &rfdset))
{
break;
}
}
- if (h != &ioque->hlist) {
+ if (h != &ioqueue->key_list) {
pj_status_t rc;
- pj_assert(PJ_IOQUEUE_IS_READ_OP(h->op) ||
- PJ_IOQUEUE_IS_ACCEPT_OP(h->op));
+#if PJ_HAS_TCP
+ pj_assert(!pj_list_empty(&h->read_list) ||
+ !pj_list_empty(&h->accept_list));
+#else
+ pj_assert(!pj_list_empty(&h->read_list));
+#endif
# if PJ_HAS_TCP
- if ((h->op & PJ_IOQUEUE_OP_ACCEPT)) {
- /* accept() must be the only operation specified on server socket */
- pj_assert(h->op == PJ_IOQUEUE_OP_ACCEPT);
-
- rc=pj_sock_accept(h->fd, h->accept_fd, h->rmt_addr, h->rmt_addrlen);
- if (rc==0 && h->local_addr) {
- rc = pj_sock_getsockname(*h->accept_fd, h->local_addr,
- h->local_addrlen);
+ if (!pj_list_empty(&h->accept_list)) {
+
+ struct accept_operation *accept_op;
+
+ /* Get one accept operation from the list. */
+ accept_op = h->accept_list.next;
+ pj_list_erase(accept_op);
+
+ rc=pj_sock_accept(h->fd, accept_op->accept_fd,
+ accept_op->rmt_addr, accept_op->addrlen);
+ if (rc==PJ_SUCCESS && accept_op->local_addr) {
+ rc = pj_sock_getsockname(*accept_op->accept_fd,
+ accept_op->local_addr,
+ accept_op->addrlen);
}
- h->op &= ~(PJ_IOQUEUE_OP_ACCEPT);
- PJ_FD_CLR(h->fd, &ioque->rfdset);
+ /* Clear bit in fdset if there is no more pending accept */
+ if (pj_list_empty(&h->accept_list))
+ PJ_FD_CLR(h->fd, &ioqueue->rfdset);
/* Call callback. */
if (h->cb.on_accept_complete)
- (*h->cb.on_accept_complete)(h, *h->accept_fd, rc);
+ (*h->cb.on_accept_complete)(h, (pj_ioqueue_op_key_t*)accept_op,
+ *accept_op->accept_fd, rc);
/* Re-scan readable sockets. */
goto do_readable_scan;
- }
+ }
else {
-# endif
- pj_ssize_t bytes_read = h->rd_buflen;
-
- if ((h->op & PJ_IOQUEUE_OP_RECV_FROM)) {
- rc = pj_sock_recvfrom(h->fd, h->rd_buf, &bytes_read, 0,
- h->rmt_addr, h->rmt_addrlen);
- } else if ((h->op & PJ_IOQUEUE_OP_RECV)) {
- rc = pj_sock_recv(h->fd, h->rd_buf, &bytes_read, 0);
- } else {
+# endif
+ struct read_operation *read_op;
+ pj_ssize_t bytes_read;
+
+ pj_assert(!pj_list_empty(&h->read_list));
+
+ /* Get one pending read operation from the list. */
+ read_op = h->read_list.next;
+ pj_list_erase(read_op);
+
+ bytes_read = read_op->size;
+
+ if ((read_op->op == PJ_IOQUEUE_OP_RECV_FROM)) {
+ rc = pj_sock_recvfrom(h->fd, read_op->buf, &bytes_read, 0,
+ read_op->rmt_addr,
+ read_op->rmt_addrlen);
+ } else if ((read_op->op == PJ_IOQUEUE_OP_RECV)) {
+ rc = pj_sock_recv(h->fd, read_op->buf, &bytes_read, 0);
+ } else {
+ pj_assert(read_op->op == PJ_IOQUEUE_OP_READ);
/*
* User has specified pj_ioqueue_read().
* On Win32, we should do ReadFile(). But because we got
@@ -478,9 +679,10 @@ do_readable_scan:
* that error is easier to catch.
*/
# if defined(PJ_WIN32) && PJ_WIN32 != 0
- rc = pj_sock_recv(h->fd, h->rd_buf, &bytes_read, 0);
-# elif (defined(PJ_LINUX) && PJ_LINUX != 0) || \
- (defined(PJ_SUNOS) && PJ_SUNOS != 0)
+ rc = pj_sock_recv(h->fd, read_op->buf, &bytes_read, 0);
+ //rc = ReadFile((HANDLE)h->fd, read_op->buf, read_op->size,
+ // &bytes_read, NULL);
+# elif (defined(PJ_HAS_UNISTD_H) && PJ_HAS_UNISTD_H != 0)
bytes_read = read(h->fd, h->rd_buf, bytes_read);
rc = (bytes_read >= 0) ? PJ_SUCCESS : pj_get_os_error();
# elif defined(PJ_LINUX_KERNEL) && PJ_LINUX_KERNEL != 0
@@ -503,124 +705,61 @@ do_readable_scan:
*/
if (rc == PJ_STATUS_FROM_OS(WSAECONNRESET)) {
- PJ_LOG(4,(THIS_FILE,
- "Ignored ICMP port unreach. on key=%p", h));
+ //PJ_LOG(4,(THIS_FILE,
+ // "Ignored ICMP port unreach. on key=%p", h));
}
# endif
/* In any case we would report this to caller. */
bytes_read = -rc;
}
-
- h->op &= ~(PJ_IOQUEUE_OP_READ | PJ_IOQUEUE_OP_RECV |
- PJ_IOQUEUE_OP_RECV_FROM);
- PJ_FD_CLR(h->fd, &ioque->rfdset);
+
+ /* Clear fdset if there is no pending read. */
+ if (pj_list_empty(&h->read_list))
+ PJ_FD_CLR(h->fd, &ioqueue->rfdset);
+
+ /* In any case clear from temporary set. */
PJ_FD_CLR(h->fd, &rfdset);
/* Call callback. */
if (h->cb.on_read_complete)
- (*h->cb.on_read_complete)(h, bytes_read);
+ (*h->cb.on_read_complete)(h, (pj_ioqueue_op_key_t*)read_op,
+ bytes_read);
/* Re-scan readable sockets. */
goto do_readable_scan;
}
}
-
- /* Scan for writable socket */
- h = ioque->hlist.next;
-do_writable_scan:
- for ( ; h!=&ioque->hlist; h = h->next) {
- if ((PJ_IOQUEUE_IS_WRITE_OP(h->op) || PJ_IOQUEUE_IS_CONNECT_OP(h->op))
- && PJ_FD_ISSET(h->fd, &wfdset))
- {
- break;
- }
- }
- if (h != &ioque->hlist) {
- pj_assert(PJ_IOQUEUE_IS_WRITE_OP(h->op) ||
- PJ_IOQUEUE_IS_CONNECT_OP(h->op));
-
-#if defined(PJ_HAS_TCP) && PJ_HAS_TCP!=0
- if ((h->op & PJ_IOQUEUE_OP_CONNECT)) {
- /* Completion of connect() operation */
- pj_ssize_t bytes_transfered;
-
-#if (defined(PJ_LINUX) && PJ_LINUX!=0) || \
- (defined(PJ_LINUX_KERNEL) && PJ_LINUX_KERNEL!=0)
- /* from connect(2):
- * On Linux, use getsockopt to read the SO_ERROR option at
- * level SOL_SOCKET to determine whether connect() completed
- * successfully (if SO_ERROR is zero).
- */
- int value;
- socklen_t vallen = sizeof(value);
- int gs_rc = getsockopt(h->fd, SOL_SOCKET, SO_ERROR,
- &value, &vallen);
- if (gs_rc != 0) {
- /* Argh!! What to do now???
- * Just indicate that the socket is connected. The
- * application will get error as soon as it tries to use
- * the socket to send/receive.
- */
- bytes_transfered = 0;
- } else {
- bytes_transfered = value;
- }
-#elif defined(PJ_WIN32) && PJ_WIN32!=0
- bytes_transfered = 0; /* success */
-#else
- /* Excellent information in D.J. Bernstein page:
- * http://cr.yp.to/docs/connect.html
- *
- * Seems like the most portable way of detecting connect()
- * failure is to call getpeername(). If socket is connected,
- * getpeername() will return 0. If the socket is not connected,
- * it will return ENOTCONN, and read(fd, &ch, 1) will produce
- * the right errno through error slippage. This is a combination
- * of suggestions from Douglas C. Schmidt and Ken Keys.
- */
- int gp_rc;
- struct sockaddr_in addr;
- socklen_t addrlen = sizeof(addr);
-
- gp_rc = getpeername(h->fd, (struct sockaddr*)&addr, &addrlen);
- bytes_transfered = gp_rc;
-#endif
-
- /* Clear operation. */
- h->op &= (~PJ_IOQUEUE_OP_CONNECT);
- PJ_FD_CLR(h->fd, &ioque->wfdset);
- PJ_FD_CLR(h->fd, &ioque->xfdset);
-
- /* Call callback. */
- if (h->cb.on_connect_complete)
- (*h->cb.on_connect_complete)(h, bytes_transfered);
-
- /* Re-scan writable sockets. */
- goto do_writable_scan;
-
- } else
-#endif /* PJ_HAS_TCP */
- {
- /* Completion of write(), send(), or sendto() operation. */
-
- /* Clear operation. */
- h->op &= ~(PJ_IOQUEUE_OP_WRITE | PJ_IOQUEUE_OP_SEND |
- PJ_IOQUEUE_OP_SEND_TO);
- PJ_FD_CLR(h->fd, &ioque->wfdset);
- PJ_FD_CLR(h->fd, &wfdset);
-
- /* Call callback. */
- /* All data must have been sent? */
- if (h->cb.on_write_complete)
- (*h->cb.on_write_complete)(h, h->wr_buflen);
-
- /* Re-scan writable sockets. */
- goto do_writable_scan;
- }
- }
-
+
+#if PJ_HAS_TCP
+ /* Scan for exception socket for TCP connection error. */
+ h = ioqueue->key_list.next;
+do_except_scan:
+ for ( ; h!=&ioqueue->key_list; h = h->next) {
+ if (h->connecting && PJ_FD_ISSET(h->fd, &xfdset))
+ break;
+ }
+ if (h != &ioqueue->key_list) {
+
+ pj_assert(h->connecting);
+
+ /* Clear operation. */
+ h->connecting = 0;
+ PJ_FD_CLR(h->fd, &ioqueue->wfdset);
+ PJ_FD_CLR(h->fd, &ioqueue->xfdset);
+ PJ_FD_CLR(h->fd, &wfdset);
+ PJ_FD_CLR(h->fd, &xfdset);
+
+ /* Call callback. */
+ if (h->cb.on_connect_complete)
+ (*h->cb.on_connect_complete)(h, -1);
+
+ /* Re-scan exception list. */
+ goto do_except_scan;
+ }
+#endif /* PJ_HAS_TCP */
+
/* Shouldn't happen. */
/* For strange reason on WinXP select() can return 1 while there is no
* pj_fd_set_t signaled. */
@@ -628,75 +767,63 @@ do_writable_scan:
//count = 0;
- pj_lock_release(ioque->lock);
+ pj_lock_release(ioqueue->lock);
return count;
}
/*
- * pj_ioqueue_read()
- *
- * Start asynchronous read from the descriptor.
- */
-PJ_DEF(pj_status_t) pj_ioqueue_read( pj_ioqueue_t *ioque,
- pj_ioqueue_key_t *key,
- void *buffer,
- pj_size_t buflen)
-{
- PJ_ASSERT_RETURN(ioque && key && buffer, PJ_EINVAL);
- PJ_CHECK_STACK();
-
- /* For consistency with other ioqueue implementation, we would reject
- * if descriptor has already been submitted for reading before.
- */
- PJ_ASSERT_RETURN(((key->op & PJ_IOQUEUE_OP_READ) == 0 &&
- (key->op & PJ_IOQUEUE_OP_RECV) == 0 &&
- (key->op & PJ_IOQUEUE_OP_RECV_FROM) == 0),
- PJ_EBUSY);
-
- pj_lock_acquire(ioque->lock);
-
- key->op |= PJ_IOQUEUE_OP_READ;
- key->rd_flags = 0;
- key->rd_buf = buffer;
- key->rd_buflen = buflen;
- PJ_FD_SET(key->fd, &ioque->rfdset);
-
- pj_lock_release(ioque->lock);
- return PJ_EPENDING;
-}
-
-
-/*
* pj_ioqueue_recv()
*
* Start asynchronous recv() from the socket.
*/
-PJ_DEF(pj_status_t) pj_ioqueue_recv( pj_ioqueue_t *ioque,
- pj_ioqueue_key_t *key,
+PJ_DEF(pj_status_t) pj_ioqueue_recv( pj_ioqueue_key_t *key,
+ pj_ioqueue_op_key_t *op_key,
void *buffer,
- pj_size_t buflen,
+ pj_ssize_t *length,
unsigned flags )
-{
- PJ_ASSERT_RETURN(ioque && key && buffer, PJ_EINVAL);
- PJ_CHECK_STACK();
-
- /* For consistency with other ioqueue implementation, we would reject
- * if descriptor has already been submitted for reading before.
- */
- PJ_ASSERT_RETURN(((key->op & PJ_IOQUEUE_OP_READ) == 0 &&
- (key->op & PJ_IOQUEUE_OP_RECV) == 0 &&
- (key->op & PJ_IOQUEUE_OP_RECV_FROM) == 0),
- PJ_EBUSY);
-
- pj_lock_acquire(ioque->lock);
+{
+ pj_status_t status;
+ pj_ssize_t size;
+ struct read_operation *read_op;
+ pj_ioqueue_t *ioqueue;
- key->op |= PJ_IOQUEUE_OP_RECV;
- key->rd_buf = buffer;
- key->rd_buflen = buflen;
- key->rd_flags = flags;
- PJ_FD_SET(key->fd, &ioque->rfdset);
+ PJ_ASSERT_RETURN(key && op_key && buffer && length, PJ_EINVAL);
+ PJ_CHECK_STACK();
+
+ /* Try to see if there's data immediately available.
+ */
+ size = *length;
+ status = pj_sock_recv(key->fd, buffer, &size, flags);
+ if (status == PJ_SUCCESS) {
+ /* Yes! Data is available! */
+ *length = size;
+ return PJ_SUCCESS;
+ } else {
+ /* If error is not EWOULDBLOCK (or EAGAIN on Linux), report
+ * the error to caller.
+ */
+ if (status != PJ_STATUS_FROM_OS(PJ_BLOCKING_ERROR_VAL))
+ return status;
+ }
+
+ /*
+ * No data is immediately available.
+ * Must schedule asynchronous operation to the ioqueue.
+ */
+ ioqueue = key->ioqueue;
+ pj_lock_acquire(ioqueue->lock);
+
+ read_op = (struct read_operation*)op_key;
+
+ read_op->op = PJ_IOQUEUE_OP_RECV;
+ read_op->buf = buffer;
+ read_op->size = *length;
+ read_op->flags = flags;
+
+ pj_list_insert_before(&key->read_list, read_op);
+ PJ_FD_SET(key->fd, &ioqueue->rfdset);
- pj_lock_release(ioque->lock);
+ pj_lock_release(ioqueue->lock);
return PJ_EPENDING;
}
@@ -705,80 +832,60 @@ PJ_DEF(pj_status_t) pj_ioqueue_recv( pj_ioqueue_t *ioque,
*
* Start asynchronous recvfrom() from the socket.
*/
-PJ_DEF(pj_status_t) pj_ioqueue_recvfrom( pj_ioqueue_t *ioque,
- pj_ioqueue_key_t *key,
+PJ_DEF(pj_status_t) pj_ioqueue_recvfrom( pj_ioqueue_key_t *key,
+ pj_ioqueue_op_key_t *op_key,
void *buffer,
- pj_size_t buflen,
+ pj_ssize_t *length,
unsigned flags,
pj_sockaddr_t *addr,
int *addrlen)
{
- PJ_ASSERT_RETURN(ioque && key && buffer, PJ_EINVAL);
- PJ_CHECK_STACK();
-
- /* For consistency with other ioqueue implementation, we would reject
- * if descriptor has already been submitted for reading before.
- */
- PJ_ASSERT_RETURN(((key->op & PJ_IOQUEUE_OP_READ) == 0 &&
- (key->op & PJ_IOQUEUE_OP_RECV) == 0 &&
- (key->op & PJ_IOQUEUE_OP_RECV_FROM) == 0),
- PJ_EBUSY);
-
- pj_lock_acquire(ioque->lock);
-
- key->op |= PJ_IOQUEUE_OP_RECV_FROM;
- key->rd_buf = buffer;
- key->rd_buflen = buflen;
- key->rd_flags = flags;
- key->rmt_addr = addr;
- key->rmt_addrlen = addrlen;
- PJ_FD_SET(key->fd, &ioque->rfdset);
-
- pj_lock_release(ioque->lock);
- return PJ_EPENDING;
-}
-
-/*
- * pj_ioqueue_write()
- *
- * Start asynchronous write() to the descriptor.
- */
-PJ_DEF(pj_status_t) pj_ioqueue_write( pj_ioqueue_t *ioque,
- pj_ioqueue_key_t *key,
- const void *data,
- pj_size_t datalen)
-{
- pj_status_t rc;
- pj_ssize_t sent;
-
- PJ_ASSERT_RETURN(ioque && key && data, PJ_EINVAL);
- PJ_CHECK_STACK();
-
- /* For consistency with other ioqueue implementation, we would reject
- * if descriptor has already been submitted for writing before.
- */
- PJ_ASSERT_RETURN(((key->op & PJ_IOQUEUE_OP_WRITE) == 0 &&
- (key->op & PJ_IOQUEUE_OP_SEND) == 0 &&
- (key->op & PJ_IOQUEUE_OP_SEND_TO) == 0),
- PJ_EBUSY);
-
- sent = datalen;
- /* sent would be -1 after pj_sock_send() if it returns error. */
- rc = pj_sock_send(key->fd, data, &sent, 0);
- if (rc != PJ_SUCCESS && rc != PJ_STATUS_FROM_OS(OSERR_EWOULDBLOCK)) {
- return rc;
- }
-
- pj_lock_acquire(ioque->lock);
-
- key->op |= PJ_IOQUEUE_OP_WRITE;
- key->wr_buf = NULL;
- key->wr_buflen = datalen;
- PJ_FD_SET(key->fd, &ioque->wfdset);
-
- pj_lock_release(ioque->lock);
-
- return PJ_EPENDING;
+ pj_status_t status;
+ pj_ssize_t size;
+ struct read_operation *read_op;
+ pj_ioqueue_t *ioqueue;
+
+ PJ_ASSERT_RETURN(key && op_key && buffer && length, PJ_EINVAL);
+ PJ_CHECK_STACK();
+
+ /* Try to see if there's data immediately available.
+ */
+ size = *length;
+ status = pj_sock_recvfrom(key->fd, buffer, &size, flags,
+ addr, addrlen);
+ if (status == PJ_SUCCESS) {
+ /* Yes! Data is available! */
+ *length = size;
+ return PJ_SUCCESS;
+ } else {
+ /* If error is not EWOULDBLOCK (or EAGAIN on Linux), report
+ * the error to caller.
+ */
+ if (status != PJ_STATUS_FROM_OS(PJ_BLOCKING_ERROR_VAL))
+ return status;
+ }
+
+ /*
+ * No data is immediately available.
+ * Must schedule asynchronous operation to the ioqueue.
+ */
+ ioqueue = key->ioqueue;
+ pj_lock_acquire(ioqueue->lock);
+
+ read_op = (struct read_operation*)op_key;
+
+ read_op->op = PJ_IOQUEUE_OP_RECV_FROM;
+ read_op->buf = buffer;
+ read_op->size = *length;
+ read_op->flags = flags;
+ read_op->rmt_addr = addr;
+ read_op->rmt_addrlen = addrlen;
+
+ pj_list_insert_before(&key->read_list, read_op);
+ PJ_FD_SET(key->fd, &ioqueue->rfdset);
+
+ pj_lock_release(ioqueue->lock);
+ return PJ_EPENDING;
}
/*
@@ -786,41 +893,71 @@ PJ_DEF(pj_status_t) pj_ioqueue_write( pj_ioqueue_t *ioque,
*
* Start asynchronous send() to the descriptor.
*/
-PJ_DEF(pj_status_t) pj_ioqueue_send( pj_ioqueue_t *ioque,
- pj_ioqueue_key_t *key,
+PJ_DEF(pj_status_t) pj_ioqueue_send( pj_ioqueue_key_t *key,
+ pj_ioqueue_op_key_t *op_key,
const void *data,
- pj_size_t datalen,
+ pj_ssize_t *length,
unsigned flags)
-{
- pj_status_t rc;
+{
+ pj_ioqueue_t *ioqueue;
+ struct write_operation *write_op;
+ pj_status_t status;
pj_ssize_t sent;
- PJ_ASSERT_RETURN(ioque && key && data, PJ_EINVAL);
+ PJ_ASSERT_RETURN(key && op_key && data && length, PJ_EINVAL);
PJ_CHECK_STACK();
-
- /* For consistency with other ioqueue implementation, we would reject
- * if descriptor has already been submitted for writing before.
- */
- PJ_ASSERT_RETURN(((key->op & PJ_IOQUEUE_OP_WRITE) == 0 &&
- (key->op & PJ_IOQUEUE_OP_SEND) == 0 &&
- (key->op & PJ_IOQUEUE_OP_SEND_TO) == 0),
- PJ_EBUSY);
-
- sent = datalen;
- /* sent would be -1 after pj_sock_send() if it returns error. */
- rc = pj_sock_send(key->fd, data, &sent, flags);
- if (rc != PJ_SUCCESS && rc != PJ_STATUS_FROM_OS(OSERR_EWOULDBLOCK)) {
- return rc;
+
+ /* Fast track:
+ * Try to send data immediately, only if there's no pending write!
+ * Note:
+ * We are speculating that the list is empty here without properly
+ * acquiring ioqueue's mutex first. This is intentional, to maximize
+ * performance via parallelism.
+ *
+ * This should be safe, because:
+ * - by convention, we require caller to make sure that the
+ * key is not unregistered while other threads are invoking
+ * an operation on the same key.
+ * - pj_list_empty() is safe to be invoked by multiple threads,
+ * even when other threads are modifying the list.
+ */
+ if (pj_list_empty(&key->write_list)) {
+ /*
+ * See if data can be sent immediately.
+ */
+ sent = *length;
+ status = pj_sock_send(key->fd, data, &sent, flags);
+ if (status == PJ_SUCCESS) {
+ /* Success! */
+ *length = sent;
+ return PJ_SUCCESS;
+ } else {
+ /* If error is not EWOULDBLOCK (or EAGAIN on Linux), report
+ * the error to caller.
+ */
+ if (status != PJ_STATUS_FROM_OS(PJ_BLOCKING_ERROR_VAL)) {
+ return status;
+ }
+ }
}
+
+ /*
+ * Schedule asynchronous send.
+ */
+ ioqueue = key->ioqueue;
+ pj_lock_acquire(ioqueue->lock);
+
+ write_op = (struct write_operation*)op_key;
+ write_op->op = PJ_IOQUEUE_OP_SEND;
+ write_op->buf = NULL;
+ write_op->size = *length;
+ write_op->written = 0;
+ write_op->flags = flags;
+
+ pj_list_insert_before(&key->write_list, write_op);
+ PJ_FD_SET(key->fd, &ioqueue->wfdset);
- pj_lock_acquire(ioque->lock);
-
- key->op |= PJ_IOQUEUE_OP_SEND;
- key->wr_buf = NULL;
- key->wr_buflen = datalen;
- PJ_FD_SET(key->fd, &ioque->wfdset);
-
- pj_lock_release(ioque->lock);
+ pj_lock_release(ioqueue->lock);
return PJ_EPENDING;
}
@@ -831,75 +968,149 @@ PJ_DEF(pj_status_t) pj_ioqueue_send( pj_ioqueue_t *ioque,
*
* Start asynchronous write() to the descriptor.
*/
-PJ_DEF(pj_status_t) pj_ioqueue_sendto( pj_ioqueue_t *ioque,
- pj_ioqueue_key_t *key,
+PJ_DEF(pj_status_t) pj_ioqueue_sendto( pj_ioqueue_key_t *key,
+ pj_ioqueue_op_key_t *op_key,
const void *data,
- pj_size_t datalen,
+ pj_ssize_t *length,
unsigned flags,
const pj_sockaddr_t *addr,
int addrlen)
{
- pj_status_t rc;
- pj_ssize_t sent;
-
- PJ_ASSERT_RETURN(ioque && key && data, PJ_EINVAL);
- PJ_CHECK_STACK();
-
- /* For consistency with other ioqueue implementation, we would reject
- * if descriptor has already been submitted for writing before.
- */
- PJ_ASSERT_RETURN(((key->op & PJ_IOQUEUE_OP_WRITE) == 0 &&
- (key->op & PJ_IOQUEUE_OP_SEND) == 0 &&
- (key->op & PJ_IOQUEUE_OP_SEND_TO) == 0),
- PJ_EBUSY);
-
- sent = datalen;
- /* sent would be -1 after pj_sock_sendto() if it returns error. */
- rc = pj_sock_sendto(key->fd, data, &sent, flags, addr, addrlen);
- if (rc != PJ_SUCCESS && rc != PJ_STATUS_FROM_OS(OSERR_EWOULDBLOCK)) {
- return rc;
- }
-
- pj_lock_acquire(ioque->lock);
-
- key->op |= PJ_IOQUEUE_OP_SEND_TO;
- key->wr_buf = NULL;
- key->wr_buflen = datalen;
- PJ_FD_SET(key->fd, &ioque->wfdset);
-
- pj_lock_release(ioque->lock);
- return PJ_EPENDING;
+ pj_ioqueue_t *ioqueue;
+ struct write_operation *write_op;
+ pj_status_t status;
+ pj_ssize_t sent;
+
+ PJ_ASSERT_RETURN(key && op_key && data && length, PJ_EINVAL);
+ PJ_CHECK_STACK();
+
+ /* Fast track:
+ * Try to send data immediately, only if there's no pending write!
+ * Note:
+ * We are speculating that the list is empty here without properly
+ * acquiring ioqueue's mutex first. This is intentional, to maximize
+ * performance via parallelism.
+ *
+ * This should be safe, because:
+ * - by convention, we require caller to make sure that the
+ * key is not unregistered while other threads are invoking
+ * an operation on the same key.
+ * - pj_list_empty() is safe to be invoked by multiple threads,
+ * even when other threads are modifying the list.
+ */
+ if (pj_list_empty(&key->write_list)) {
+ /*
+ * See if data can be sent immediately.
+ */
+ sent = *length;
+ status = pj_sock_sendto(key->fd, data, &sent, flags, addr, addrlen);
+ if (status == PJ_SUCCESS) {
+ /* Success! */
+ *length = sent;
+ return PJ_SUCCESS;
+ } else {
+ /* If error is not EWOULDBLOCK (or EAGAIN on Linux), report
+ * the error to caller.
+ */
+ if (status != PJ_STATUS_FROM_OS(PJ_BLOCKING_ERROR_VAL)) {
+ return status;
+ }
+ }
+ }
+
+ /*
+ * Check that address storage can hold the address parameter.
+ */
+ PJ_ASSERT_RETURN(addrlen <= sizeof(pj_sockaddr_in), PJ_EBUG);
+
+ /*
+ * Schedule asynchronous send.
+ */
+ ioqueue = key->ioqueue;
+ pj_lock_acquire(ioqueue->lock);
+
+ write_op = (struct write_operation*)op_key;
+ write_op->op = PJ_IOQUEUE_OP_SEND_TO;
+ write_op->buf = NULL;
+ write_op->size = *length;
+ write_op->written = 0;
+ write_op->flags = flags;
+ pj_memcpy(&write_op->rmt_addr, addr, addrlen);
+ write_op->rmt_addrlen = addrlen;
+
+ pj_list_insert_before(&key->write_list, write_op);
+ PJ_FD_SET(key->fd, &ioqueue->wfdset);
+
+ pj_lock_release(ioqueue->lock);
+
+ return PJ_EPENDING;
}
#if PJ_HAS_TCP
/*
* Initiate overlapped accept() operation.
*/
-PJ_DEF(int) pj_ioqueue_accept( pj_ioqueue_t *ioqueue,
- pj_ioqueue_key_t *key,
- pj_sock_t *new_sock,
- pj_sockaddr_t *local,
- pj_sockaddr_t *remote,
- int *addrlen)
-{
- /* check parameters. All must be specified! */
- pj_assert(ioqueue && key && new_sock);
-
- /* Server socket must have no other operation! */
- pj_assert(key->op == 0);
-
- pj_lock_acquire(ioqueue->lock);
-
- key->op = PJ_IOQUEUE_OP_ACCEPT;
- key->accept_fd = new_sock;
- key->rmt_addr = remote;
- key->rmt_addrlen = addrlen;
- key->local_addr = local;
- key->local_addrlen = addrlen; /* use same addr. as rmt_addrlen */
+PJ_DEF(pj_status_t) pj_ioqueue_accept( pj_ioqueue_key_t *key,
+ pj_ioqueue_op_key_t *op_key,
+ pj_sock_t *new_sock,
+ pj_sockaddr_t *local,
+ pj_sockaddr_t *remote,
+ int *addrlen)
+{
+ pj_ioqueue_t *ioqueue;
+ struct accept_operation *accept_op;
+ pj_status_t status;
+ /* check parameters. All must be specified! */
+ PJ_ASSERT_RETURN(key && op_key && new_sock, PJ_EINVAL);
+
+ /* Fast track:
+ * See if there's new connection available immediately.
+ */
+ if (pj_list_empty(&key->accept_list)) {
+ status = pj_sock_accept(key->fd, new_sock, remote, addrlen);
+ if (status == PJ_SUCCESS) {
+ /* Yes! New connection is available! */
+ if (local && addrlen) {
+ status = pj_sock_getsockname(*new_sock, local, addrlen);
+ if (status != PJ_SUCCESS) {
+ pj_sock_close(*new_sock);
+ *new_sock = PJ_INVALID_SOCKET;
+ return status;
+ }
+ }
+ return PJ_SUCCESS;
+ } else {
+ /* If error is not EWOULDBLOCK (or EAGAIN on Linux), report
+ * the error to caller.
+ */
+ if (status != PJ_STATUS_FROM_OS(PJ_BLOCKING_ERROR_VAL)) {
+ return status;
+ }
+ }
+ }
+
+ /*
+ * No connection is available immediately.
+ * Schedule accept() operation to be completed when there is incoming
+ * connection available.
+ */
+ ioqueue = key->ioqueue;
+ accept_op = (struct accept_operation*)op_key;
+
+ pj_lock_acquire(ioqueue->lock);
+
+ accept_op->op = PJ_IOQUEUE_OP_ACCEPT;
+ accept_op->accept_fd = new_sock;
+ accept_op->rmt_addr = remote;
+ accept_op->addrlen= addrlen;
+ accept_op->local_addr = local;
+
+ pj_list_insert_before(&key->accept_list, accept_op);
PJ_FD_SET(key->fd, &ioqueue->rfdset);
+
+ pj_lock_release(ioqueue->lock);
- pj_lock_release(ioqueue->lock);
return PJ_EPENDING;
}
@@ -907,37 +1118,37 @@ PJ_DEF(int) pj_ioqueue_accept( pj_ioqueue_t *ioqueue,
* Initiate overlapped connect() operation (well, it's non-blocking actually,
* since there's no overlapped version of connect()).
*/
-PJ_DEF(pj_status_t) pj_ioqueue_connect( pj_ioqueue_t *ioqueue,
- pj_ioqueue_key_t *key,
+PJ_DEF(pj_status_t) pj_ioqueue_connect( pj_ioqueue_key_t *key,
const pj_sockaddr_t *addr,
int addrlen )
-{
- pj_status_t rc;
+{
+ pj_ioqueue_t *ioqueue;
+ pj_status_t status;
/* check parameters. All must be specified! */
- PJ_ASSERT_RETURN(ioqueue && key && addr && addrlen, PJ_EINVAL);
+ PJ_ASSERT_RETURN(key && addr && addrlen, PJ_EINVAL);
- /* Connecting socket must have no other operation! */
- PJ_ASSERT_RETURN(key->op == 0, PJ_EBUSY);
+ /* Check if socket has not been marked for connecting */
+ if (key->connecting != 0)
+ return PJ_EPENDING;
- rc = pj_sock_connect(key->fd, addr, addrlen);
- if (rc == PJ_SUCCESS) {
+ status = pj_sock_connect(key->fd, addr, addrlen);
+ if (status == PJ_SUCCESS) {
/* Connected! */
return PJ_SUCCESS;
} else {
- if (rc == PJ_STATUS_FROM_OS(OSERR_EINPROGRESS) ||
- rc == PJ_STATUS_FROM_OS(OSERR_EWOULDBLOCK))
- {
- /* Pending! */
+ if (status == PJ_STATUS_FROM_OS(PJ_BLOCKING_CONNECT_ERROR_VAL)) {
+ /* Pending! */
+ ioqueue = key->ioqueue;
pj_lock_acquire(ioqueue->lock);
- key->op = PJ_IOQUEUE_OP_CONNECT;
+ key->connecting = PJ_TRUE;
PJ_FD_SET(key->fd, &ioqueue->wfdset);
PJ_FD_SET(key->fd, &ioqueue->xfdset);
pj_lock_release(ioqueue->lock);
return PJ_EPENDING;
} else {
/* Error! */
- return rc;
+ return status;
}
}
}
diff --git a/pjlib/src/pj/ioqueue_winnt.c b/pjlib/src/pj/ioqueue_winnt.c
index dbf883a4..afb75c54 100644
--- a/pjlib/src/pj/ioqueue_winnt.c
+++ b/pjlib/src/pj/ioqueue_winnt.c
@@ -22,17 +22,28 @@
# include <mswsock.h>
#endif
-
-#define ACCEPT_ADDR_LEN (sizeof(pj_sockaddr_in)+20)
+
+/* The address specified in AcceptEx() must be 16 more than the size of
+ * SOCKADDR (source: MSDN).
+ */
+#define ACCEPT_ADDR_LEN (sizeof(pj_sockaddr_in)+16)
+
+typedef struct generic_overlapped
+{
+ WSAOVERLAPPED overlapped;
+ pj_ioqueue_operation_e operation;
+} generic_overlapped;
/*
* OVERLAP structure for send and receive.
*/
typedef struct ioqueue_overlapped
{
- WSAOVERLAPPED overlapped;
+ WSAOVERLAPPED overlapped;
pj_ioqueue_operation_e operation;
- WSABUF wsabuf;
+ WSABUF wsabuf;
+ pj_sockaddr_in dummy_addr;
+ int dummy_addrlen;
} ioqueue_overlapped;
#if PJ_HAS_TCP
@@ -41,7 +52,7 @@ typedef struct ioqueue_overlapped
*/
typedef struct ioqueue_accept_rec
{
- WSAOVERLAPPED overlapped;
+ WSAOVERLAPPED overlapped;
pj_ioqueue_operation_e operation;
pj_sock_t newsock;
pj_sock_t *newsock_ptr;
@@ -51,19 +62,29 @@ typedef struct ioqueue_accept_rec
char accept_buf[2 * ACCEPT_ADDR_LEN];
} ioqueue_accept_rec;
#endif
+
+/*
+ * Structure to hold pending operation key.
+ */
+union operation_key
+{
+ generic_overlapped generic;
+ ioqueue_overlapped overlapped;
+#if PJ_HAS_TCP
+ ioqueue_accept_rec accept;
+#endif
+};
/*
* Structure for individual socket.
*/
struct pj_ioqueue_key_t
-{
+{
+ pj_ioqueue_t *ioqueue;
HANDLE hnd;
void *user_data;
- ioqueue_overlapped recv_overlapped;
- ioqueue_overlapped send_overlapped;
#if PJ_HAS_TCP
int connecting;
- ioqueue_accept_rec accept_overlapped;
#endif
pj_ioqueue_callback cb;
};
@@ -106,9 +127,14 @@ static void ioqueue_on_accept_complete(ioqueue_accept_rec *accept_overlapped)
&local,
&locallen,
&remote,
- &remotelen);
- pj_memcpy(accept_overlapped->local, local, locallen);
- pj_memcpy(accept_overlapped->remote, remote, locallen);
+ &remotelen);
+ if (*accept_overlapped->addrlen > locallen) {
+ pj_memcpy(accept_overlapped->local, local, locallen);
+ pj_memcpy(accept_overlapped->remote, remote, locallen);
+ } else {
+ pj_memset(accept_overlapped->local, 0, *accept_overlapped->addrlen);
+ pj_memset(accept_overlapped->remote, 0, *accept_overlapped->addrlen);
+ }
*accept_overlapped->addrlen = locallen;
if (accept_overlapped->newsock_ptr)
*accept_overlapped->newsock_ptr = accept_overlapped->newsock;
@@ -120,7 +146,6 @@ static void erase_connecting_socket( pj_ioqueue_t *ioqueue, unsigned pos)
{
pj_ioqueue_key_t *key = ioqueue->connecting_keys[pos];
HANDLE hEvent = ioqueue->connecting_handles[pos];
- unsigned long optval;
/* Remove key from array of connecting handles. */
pj_array_erase(ioqueue->connecting_keys, sizeof(key),
@@ -143,12 +168,6 @@ static void erase_connecting_socket( pj_ioqueue_t *ioqueue, unsigned pos)
CloseHandle(hEvent);
}
- /* Set socket to blocking again. */
- optval = 0;
- if (ioctlsocket((pj_sock_t)key->hnd, FIONBIO, &optval) != 0) {
- DWORD dwStatus;
- dwStatus = WSAGetLastError();
- }
}
/*
@@ -183,7 +202,8 @@ static pj_ioqueue_key_t *check_connecting( pj_ioqueue_t *ioqueue,
WSAEnumNetworkEvents((pj_sock_t)key->hnd,
ioqueue->connecting_handles[pos],
&net_events);
- *connect_err = net_events.iErrorCode[FD_CONNECT_BIT];
+ *connect_err =
+ PJ_STATUS_FROM_OS(net_events.iErrorCode[FD_CONNECT_BIT]);
/* Erase socket from pending connect. */
erase_connecting_socket(ioqueue, pos);
@@ -194,95 +214,121 @@ static pj_ioqueue_key_t *check_connecting( pj_ioqueue_t *ioqueue,
}
#endif
-
+/*
+ * pj_ioqueue_create()
+ */
PJ_DEF(pj_status_t) pj_ioqueue_create( pj_pool_t *pool,
pj_size_t max_fd,
- int max_threads,
- pj_ioqueue_t **ioqueue)
+ pj_ioqueue_t **p_ioqueue)
{
- pj_ioqueue_t *ioq;
+ pj_ioqueue_t *ioqueue;
pj_status_t rc;
PJ_UNUSED_ARG(max_fd);
- PJ_ASSERT_RETURN(pool && ioqueue, PJ_EINVAL);
-
- ioq = pj_pool_zalloc(pool, sizeof(*ioq));
- ioq->iocp = CreateIoCompletionPort(INVALID_HANDLE_VALUE, NULL, 0, max_threads);
- if (ioq->iocp == NULL)
+ PJ_ASSERT_RETURN(pool && p_ioqueue, PJ_EINVAL);
+
+ rc = sizeof(union operation_key);
+
+ /* Check that sizeof(pj_ioqueue_op_key_t) makes sense. */
+ PJ_ASSERT_RETURN(sizeof(pj_ioqueue_op_key_t)-sizeof(void*) >=
+ sizeof(union operation_key), PJ_EBUG);
+
+ ioqueue = pj_pool_zalloc(pool, sizeof(*ioqueue));
+ ioqueue->iocp = CreateIoCompletionPort(INVALID_HANDLE_VALUE, NULL, 0, 0);
+ if (ioqueue->iocp == NULL)
return PJ_RETURN_OS_ERROR(GetLastError());
- rc = pj_lock_create_simple_mutex(pool, NULL, &ioq->lock);
+ rc = pj_lock_create_simple_mutex(pool, NULL, &ioqueue->lock);
if (rc != PJ_SUCCESS) {
- CloseHandle(ioq->iocp);
+ CloseHandle(ioqueue->iocp);
return rc;
}
- ioq->auto_delete_lock = PJ_TRUE;
+ ioqueue->auto_delete_lock = PJ_TRUE;
- *ioqueue = ioq;
+ *p_ioqueue = ioqueue;
- PJ_LOG(4, ("pjlib", "WinNT IOCP I/O Queue created (%p)", ioq));
+ PJ_LOG(4, ("pjlib", "WinNT IOCP I/O Queue created (%p)", ioqueue));
return PJ_SUCCESS;
}
-
-PJ_DEF(pj_status_t) pj_ioqueue_destroy( pj_ioqueue_t *ioque )
+
+/*
+ * pj_ioqueue_destroy()
+ */
+PJ_DEF(pj_status_t) pj_ioqueue_destroy( pj_ioqueue_t *ioqueue )
{
unsigned i;
PJ_CHECK_STACK();
- PJ_ASSERT_RETURN(ioque, PJ_EINVAL);
+ PJ_ASSERT_RETURN(ioqueue, PJ_EINVAL);
/* Destroy events in the pool */
- for (i=0; i<ioque->event_count; ++i) {
- CloseHandle(ioque->event_pool[i]);
+ for (i=0; i<ioqueue->event_count; ++i) {
+ CloseHandle(ioqueue->event_pool[i]);
}
- ioque->event_count = 0;
-
- if (ioque->auto_delete_lock)
- pj_lock_destroy(ioque->lock);
-
- if (CloseHandle(ioque->iocp) == TRUE)
- return PJ_SUCCESS;
- else
- return PJ_RETURN_OS_ERROR(GetLastError());
+ ioqueue->event_count = 0;
+
+ if (CloseHandle(ioqueue->iocp) != TRUE)
+ return PJ_RETURN_OS_ERROR(GetLastError());
+
+ if (ioqueue->auto_delete_lock)
+ pj_lock_destroy(ioqueue->lock);
+
+ return PJ_SUCCESS;
}
-
-PJ_DEF(pj_status_t) pj_ioqueue_set_lock( pj_ioqueue_t *ioque,
+
+/*
+ * pj_ioqueue_set_lock()
+ */
+PJ_DEF(pj_status_t) pj_ioqueue_set_lock( pj_ioqueue_t *ioqueue,
pj_lock_t *lock,
pj_bool_t auto_delete )
{
- PJ_ASSERT_RETURN(ioque && lock, PJ_EINVAL);
+ PJ_ASSERT_RETURN(ioqueue && lock, PJ_EINVAL);
- if (ioque->auto_delete_lock) {
- pj_lock_destroy(ioque->lock);
+ if (ioqueue->auto_delete_lock) {
+ pj_lock_destroy(ioqueue->lock);
}
- ioque->lock = lock;
- ioque->auto_delete_lock = auto_delete;
+ ioqueue->lock = lock;
+ ioqueue->auto_delete_lock = auto_delete;
return PJ_SUCCESS;
}
-
+
+/*
+ * pj_ioqueue_register_sock()
+ */
PJ_DEF(pj_status_t) pj_ioqueue_register_sock( pj_pool_t *pool,
- pj_ioqueue_t *ioque,
- pj_sock_t hnd,
+ pj_ioqueue_t *ioqueue,
+ pj_sock_t sock,
void *user_data,
const pj_ioqueue_callback *cb,
pj_ioqueue_key_t **key )
{
HANDLE hioq;
- pj_ioqueue_key_t *rec;
-
- PJ_ASSERT_RETURN(pool && ioque && cb && key, PJ_EINVAL);
-
- rec = pj_pool_zalloc(pool, sizeof(pj_ioqueue_key_t));
- rec->hnd = (HANDLE)hnd;
+ pj_ioqueue_key_t *rec;
+ u_long value;
+ int rc;
+
+ PJ_ASSERT_RETURN(pool && ioqueue && cb && key, PJ_EINVAL);
+
+ /* Build the key for this socket. */
+ rec = pj_pool_zalloc(pool, sizeof(pj_ioqueue_key_t));
+ rec->ioqueue = ioqueue;
+ rec->hnd = (HANDLE)sock;
rec->user_data = user_data;
pj_memcpy(&rec->cb, cb, sizeof(pj_ioqueue_callback));
-#if PJ_HAS_TCP
- rec->accept_overlapped.newsock = PJ_INVALID_SOCKET;
-#endif
- hioq = CreateIoCompletionPort((HANDLE)hnd, ioque->iocp, (DWORD)rec, 0);
+
+ /* Set socket to nonblocking. */
+ value = 1;
+ rc = ioctlsocket(sock, FIONBIO, &value);
+ if (rc != 0) {
+ return PJ_RETURN_OS_ERROR(WSAGetLastError());
+ }
+
+ /* Associate with IOCP */
+ hioq = CreateIoCompletionPort((HANDLE)sock, ioqueue->iocp, (DWORD)rec, 0);
if (!hioq) {
return PJ_RETURN_OS_ERROR(GetLastError());
}
@@ -291,58 +337,78 @@ PJ_DEF(pj_status_t) pj_ioqueue_register_sock( pj_pool_t *pool,
return PJ_SUCCESS;
}
-
-
-PJ_DEF(pj_status_t) pj_ioqueue_unregister( pj_ioqueue_t *ioque,
- pj_ioqueue_key_t *key )
+/*
+ * pj_ioqueue_unregister()
+ */
+PJ_DEF(pj_status_t) pj_ioqueue_unregister( pj_ioqueue_key_t *key )
{
- PJ_ASSERT_RETURN(ioque && key, PJ_EINVAL);
+ PJ_ASSERT_RETURN(key, PJ_EINVAL);
#if PJ_HAS_TCP
if (key->connecting) {
unsigned pos;
+ pj_ioqueue_t *ioqueue;
+
+ ioqueue = key->ioqueue;
/* Erase from connecting_handles */
- pj_lock_acquire(ioque->lock);
- for (pos=0; pos < ioque->connecting_count; ++pos) {
- if (ioque->connecting_keys[pos] == key) {
- erase_connecting_socket(ioque, pos);
- if (key->accept_overlapped.newsock_ptr) {
- /* ??? shouldn't it be newsock instead of newsock_ptr??? */
- closesocket(*key->accept_overlapped.newsock_ptr);
- }
+ pj_lock_acquire(ioqueue->lock);
+ for (pos=0; pos < ioqueue->connecting_count; ++pos) {
+ if (ioqueue->connecting_keys[pos] == key) {
+ erase_connecting_socket(ioqueue, pos);
break;
}
}
- pj_lock_release(ioque->lock);
- key->connecting = 0;
+ key->connecting = 0;
+ pj_lock_release(ioqueue->lock);
}
#endif
return PJ_SUCCESS;
}
-
+
+/*
+ * pj_ioqueue_get_user_data()
+ */
PJ_DEF(void*) pj_ioqueue_get_user_data( pj_ioqueue_key_t *key )
{
PJ_ASSERT_RETURN(key, NULL);
return key->user_data;
}
-
-/*
+
+/*
+ * pj_ioqueue_set_user_data()
+ */
+PJ_DEF(pj_status_t) pj_ioqueue_set_user_data( pj_ioqueue_key_t *key,
+ void *user_data,
+ void **old_data )
+{
+ PJ_ASSERT_RETURN(key, PJ_EINVAL);
+
+ if (old_data)
+ *old_data = key->user_data;
+
+ key->user_data = user_data;
+ return PJ_SUCCESS;
+}
+
+/*
+ * pj_ioqueue_poll()
+ *
* Poll for events.
*/
-PJ_DEF(int) pj_ioqueue_poll( pj_ioqueue_t *ioque, const pj_time_val *timeout)
+PJ_DEF(int) pj_ioqueue_poll( pj_ioqueue_t *ioqueue, const pj_time_val *timeout)
{
DWORD dwMsec, dwBytesTransfered, dwKey;
- ioqueue_overlapped *ov;
+ generic_overlapped *pOv;
pj_ioqueue_key_t *key;
pj_ssize_t size_status;
BOOL rc;
- PJ_ASSERT_RETURN(ioque, -PJ_EINVAL);
+ PJ_ASSERT_RETURN(ioqueue, -PJ_EINVAL);
/* Check the connecting array. */
#if PJ_HAS_TCP
- key = check_connecting(ioque, &size_status);
+ key = check_connecting(ioqueue, &size_status);
if (key != NULL) {
key->cb.on_connect_complete(key, (int)size_status);
return 1;
@@ -353,40 +419,46 @@ PJ_DEF(int) pj_ioqueue_poll( pj_ioqueue_t *ioque, const pj_time_val *timeout)
dwMsec = timeout ? timeout->sec*1000 + timeout->msec : INFINITE;
/* Poll for completion status. */
- rc = GetQueuedCompletionStatus(ioque->iocp, &dwBytesTransfered, &dwKey,
- (OVERLAPPED**)&ov, dwMsec);
+ rc = GetQueuedCompletionStatus(ioqueue->iocp, &dwBytesTransfered, &dwKey,
+ (OVERLAPPED**)&pOv, dwMsec);
/* The return value is:
* - nonzero if event was dequeued.
- * - zero and ov==NULL if no event was dequeued.
- * - zero and ov!=NULL if event for failed I/O was dequeued.
+ * - zero and pOv==NULL if no event was dequeued.
+ * - zero and pOv!=NULL if event for failed I/O was dequeued.
*/
- if (ov) {
+ if (pOv) {
/* Event was dequeued for either successfull or failed I/O */
key = (pj_ioqueue_key_t*)dwKey;
size_status = dwBytesTransfered;
- switch (ov->operation) {
+ switch (pOv->operation) {
case PJ_IOQUEUE_OP_READ:
case PJ_IOQUEUE_OP_RECV:
case PJ_IOQUEUE_OP_RECV_FROM:
- key->recv_overlapped.operation = 0;
+ pOv->operation = 0;
if (key->cb.on_read_complete)
- key->cb.on_read_complete(key, size_status);
+ key->cb.on_read_complete(key, (pj_ioqueue_op_key_t*)pOv,
+ size_status);
break;
case PJ_IOQUEUE_OP_WRITE:
case PJ_IOQUEUE_OP_SEND:
case PJ_IOQUEUE_OP_SEND_TO:
- key->send_overlapped.operation = 0;
+ pOv->operation = 0;
if (key->cb.on_write_complete)
- key->cb.on_write_complete(key, size_status);
+ key->cb.on_write_complete(key, (pj_ioqueue_op_key_t*)pOv,
+ size_status);
break;
#if PJ_HAS_TCP
case PJ_IOQUEUE_OP_ACCEPT:
/* special case for accept. */
- ioqueue_on_accept_complete((ioqueue_accept_rec*)ov);
- if (key->cb.on_accept_complete)
- key->cb.on_accept_complete(key, key->accept_overlapped.newsock,
- 0);
+ ioqueue_on_accept_complete((ioqueue_accept_rec*)pOv);
+ if (key->cb.on_accept_complete) {
+ ioqueue_accept_rec *accept_rec = (ioqueue_accept_rec*)pOv;
+ key->cb.on_accept_complete(key,
+ (pj_ioqueue_op_key_t*)pOv,
+ accept_rec->newsock,
+ PJ_SUCCESS);
+ }
break;
case PJ_IOQUEUE_OP_CONNECT:
#endif
@@ -398,9 +470,9 @@ PJ_DEF(int) pj_ioqueue_poll( pj_ioqueue_t *ioque, const pj_time_val *timeout)
}
if (GetLastError()==WAIT_TIMEOUT) {
- /* Check the connecting array. */
+ /* Check the connecting array (again). */
#if PJ_HAS_TCP
- key = check_connecting(ioque, &size_status);
+ key = check_connecting(ioqueue, &size_status);
if (key != NULL) {
key->cb.on_connect_complete(key, (int)size_status);
return 1;
@@ -412,95 +484,72 @@ PJ_DEF(int) pj_ioqueue_poll( pj_ioqueue_t *ioque, const pj_time_val *timeout)
}
/*
- * pj_ioqueue_read()
- *
- * Initiate overlapped ReadFile operation.
- */
-PJ_DEF(pj_status_t) pj_ioqueue_read( pj_ioqueue_t *ioque,
- pj_ioqueue_key_t *key,
- void *buffer,
- pj_size_t buflen)
-{
- BOOL rc;
- DWORD bytesRead;
-
- PJ_CHECK_STACK();
- PJ_UNUSED_ARG(ioque);
-
- if (key->recv_overlapped.operation != PJ_IOQUEUE_OP_NONE) {
- pj_assert(!"Operation already pending for this descriptor");
- return PJ_EBUSY;
- }
-
- pj_memset(&key->recv_overlapped, 0, sizeof(key->recv_overlapped));
- key->recv_overlapped.operation = PJ_IOQUEUE_OP_READ;
-
- rc = ReadFile(key->hnd, buffer, buflen, &bytesRead,
- &key->recv_overlapped.overlapped);
- if (rc == FALSE) {
- DWORD dwStatus = GetLastError();
- if (dwStatus==ERROR_IO_PENDING)
- return PJ_EPENDING;
- else
- return PJ_STATUS_FROM_OS(dwStatus);
- } else {
- /*
- * This is workaround to a probable bug in Win2000 (probably NT too).
- * Even if 'rc' is TRUE, which indicates operation has completed,
- * GetQueuedCompletionStatus still will return the key.
- * So as work around, we always return PJ_EPENDING here.
- */
- return PJ_EPENDING;
- }
-}
-
-/*
* pj_ioqueue_recv()
*
* Initiate overlapped WSARecv() operation.
*/
-PJ_DEF(pj_status_t) pj_ioqueue_recv( pj_ioqueue_t *ioque,
- pj_ioqueue_key_t *key,
+PJ_DEF(pj_status_t) pj_ioqueue_recv( pj_ioqueue_key_t *key,
+ pj_ioqueue_op_key_t *op_key,
void *buffer,
- pj_size_t buflen,
+ pj_ssize_t *length,
unsigned flags )
-{
- int rc;
- DWORD bytesRead;
- DWORD dwFlags = 0;
-
- PJ_CHECK_STACK();
- PJ_UNUSED_ARG(ioque);
-
- if (key->recv_overlapped.operation != PJ_IOQUEUE_OP_NONE) {
- pj_assert(!"Operation already pending for this socket");
- return PJ_EBUSY;
- }
-
- pj_memset(&key->recv_overlapped, 0, sizeof(key->recv_overlapped));
- key->recv_overlapped.operation = PJ_IOQUEUE_OP_READ;
-
- key->recv_overlapped.wsabuf.buf = buffer;
- key->recv_overlapped.wsabuf.len = buflen;
-
- dwFlags = flags;
-
- rc = WSARecv((SOCKET)key->hnd, &key->recv_overlapped.wsabuf, 1,
- &bytesRead, &dwFlags,
- &key->recv_overlapped.overlapped, NULL);
- if (rc == SOCKET_ERROR) {
- DWORD dwStatus = WSAGetLastError();
- if (dwStatus==WSA_IO_PENDING)
- return PJ_EPENDING;
- else
- return PJ_STATUS_FROM_OS(dwStatus);
- } else {
- /* Must always return pending status.
- * See comments on pj_ioqueue_read
- * return bytesRead;
- */
- return PJ_EPENDING;
- }
+{
+ /*
+ * Ideally we should just call pj_ioqueue_recvfrom() with NULL addr and
+ * addrlen here. But unfortunately it generates EINVAL... :-(
+ * -bennylp
+ */
+ int rc;
+ DWORD bytesRead;
+ DWORD dwFlags = 0;
+ union operation_key *op_key_rec;
+
+ PJ_CHECK_STACK();
+ PJ_ASSERT_RETURN(key && op_key && buffer, PJ_EINVAL);
+
+ op_key_rec = (union operation_key*)op_key->internal__;
+ op_key_rec->overlapped.wsabuf.buf = buffer;
+ op_key_rec->overlapped.wsabuf.len = *length;
+
+ dwFlags = flags;
+
+ /* Try non-overlapped received first to see if data is
+ * immediately available.
+ */
+ rc = WSARecv((SOCKET)key->hnd, &op_key_rec->overlapped.wsabuf, 1,
+ &bytesRead, &dwFlags, NULL, NULL);
+ if (rc == 0) {
+ *length = bytesRead;
+ return PJ_SUCCESS;
+ } else {
+ DWORD dwError = WSAGetLastError();
+ if (dwError != WSAEWOULDBLOCK) {
+ *length = -1;
+ return PJ_RETURN_OS_ERROR(dwError);
+ }
+ }
+
+ /*
+ * No immediate data available.
+ * Register overlapped Recv() operation.
+ */
+ pj_memset(&op_key_rec->overlapped.overlapped, 0,
+ sizeof(op_key_rec->overlapped.overlapped));
+ op_key_rec->overlapped.operation = PJ_IOQUEUE_OP_RECV;
+
+ rc = WSARecv((SOCKET)key->hnd, &op_key_rec->overlapped.wsabuf, 1,
+ &bytesRead, &dwFlags,
+ &op_key_rec->overlapped.overlapped, NULL);
+ if (rc == SOCKET_ERROR) {
+ DWORD dwStatus = WSAGetLastError();
+ if (dwStatus!=WSA_IO_PENDING) {
+ *length = -1;
+ return PJ_STATUS_FROM_OS(dwStatus);
+ }
+ }
+
+ /* Pending operation has been scheduled. */
+ return PJ_EPENDING;
}
/*
@@ -508,136 +557,79 @@ PJ_DEF(pj_status_t) pj_ioqueue_recv( pj_ioqueue_t *ioque,
*
* Initiate overlapped RecvFrom() operation.
*/
-PJ_DEF(pj_status_t) pj_ioqueue_recvfrom( pj_ioqueue_t *ioque,
- pj_ioqueue_key_t *key,
+PJ_DEF(pj_status_t) pj_ioqueue_recvfrom( pj_ioqueue_key_t *key,
+ pj_ioqueue_op_key_t *op_key,
void *buffer,
- pj_size_t buflen,
+ pj_ssize_t *length,
unsigned flags,
pj_sockaddr_t *addr,
int *addrlen)
{
- BOOL rc;
- DWORD bytesRead;
- DWORD dwFlags;
-
- PJ_CHECK_STACK();
- PJ_UNUSED_ARG(ioque);
-
- if (key->recv_overlapped.operation != PJ_IOQUEUE_OP_NONE) {
- pj_assert(!"Operation already pending for this socket");
- return PJ_EBUSY;
- }
-
- pj_memset(&key->recv_overlapped, 0, sizeof(key->recv_overlapped));
- key->recv_overlapped.operation = PJ_IOQUEUE_OP_RECV_FROM;
- key->recv_overlapped.wsabuf.buf = buffer;
- key->recv_overlapped.wsabuf.len = buflen;
- dwFlags = flags;
- rc = WSARecvFrom((SOCKET)key->hnd, &key->recv_overlapped.wsabuf, 1,
- &bytesRead, &dwFlags,
- addr, addrlen,
- &key->recv_overlapped.overlapped, NULL);
- if (rc == SOCKET_ERROR) {
- DWORD dwStatus = WSAGetLastError();
- if (dwStatus==WSA_IO_PENDING)
- return PJ_EPENDING;
- else
- return PJ_STATUS_FROM_OS(dwStatus);
- } else {
- /* Must always return pending status.
- * See comments on pj_ioqueue_read
- * return bytesRead;
- */
- return PJ_EPENDING;
- }
-}
-
-/*
- * pj_ioqueue_write()
- *
- * Initiate overlapped WriteFile() operation.
- */
-PJ_DEF(pj_status_t) pj_ioqueue_write( pj_ioqueue_t *ioque,
- pj_ioqueue_key_t *key,
- const void *data,
- pj_size_t datalen)
-{
- BOOL rc;
- DWORD bytesWritten;
-
- PJ_CHECK_STACK();
- PJ_UNUSED_ARG(ioque);
-
- if (key->send_overlapped.operation != PJ_IOQUEUE_OP_NONE) {
- pj_assert(!"Operation already pending for this descriptor");
- return PJ_EBUSY;
- }
-
- pj_memset(&key->send_overlapped, 0, sizeof(key->send_overlapped));
- key->send_overlapped.operation = PJ_IOQUEUE_OP_WRITE;
- rc = WriteFile(key->hnd, data, datalen, &bytesWritten,
- &key->send_overlapped.overlapped);
-
- if (rc == FALSE) {
- DWORD dwStatus = GetLastError();
- if (dwStatus==ERROR_IO_PENDING)
- return PJ_EPENDING;
- else
- return PJ_STATUS_FROM_OS(dwStatus);
- } else {
- /* Must always return pending status.
- * See comments on pj_ioqueue_read
- * return bytesWritten;
- */
- return PJ_EPENDING;
- }
+ int rc;
+ DWORD bytesRead;
+ DWORD dwFlags = 0;
+ union operation_key *op_key_rec;
+
+ PJ_CHECK_STACK();
+ PJ_ASSERT_RETURN(key && op_key && buffer, PJ_EINVAL);
+
+ op_key_rec = (union operation_key*)op_key->internal__;
+ op_key_rec->overlapped.wsabuf.buf = buffer;
+ op_key_rec->overlapped.wsabuf.len = *length;
+
+ dwFlags = flags;
+
+ /* Try non-overlapped received first to see if data is
+ * immediately available.
+ */
+ rc = WSARecvFrom((SOCKET)key->hnd, &op_key_rec->overlapped.wsabuf, 1,
+ &bytesRead, &dwFlags, addr, addrlen, NULL, NULL);
+ if (rc == 0) {
+ *length = bytesRead;
+ return PJ_SUCCESS;
+ } else {
+ DWORD dwError = WSAGetLastError();
+ if (dwError != WSAEWOULDBLOCK) {
+ *length = -1;
+ return PJ_RETURN_OS_ERROR(dwError);
+ }
+ }
+
+ /*
+ * No immediate data available.
+ * Register overlapped Recv() operation.
+ */
+ pj_memset(&op_key_rec->overlapped.overlapped, 0,
+ sizeof(op_key_rec->overlapped.overlapped));
+ op_key_rec->overlapped.operation = PJ_IOQUEUE_OP_RECV;
+
+ rc = WSARecvFrom((SOCKET)key->hnd, &op_key_rec->overlapped.wsabuf, 1,
+ &bytesRead, &dwFlags, addr, addrlen,
+ &op_key_rec->overlapped.overlapped, NULL);
+ if (rc == SOCKET_ERROR) {
+ DWORD dwStatus = WSAGetLastError();
+ if (dwStatus!=WSA_IO_PENDING) {
+ *length = -1;
+ return PJ_STATUS_FROM_OS(dwStatus);
+ }
+ }
+
+ /* Pending operation has been scheduled. */
+ return PJ_EPENDING;
}
-
/*
* pj_ioqueue_send()
*
* Initiate overlapped Send operation.
*/
-PJ_DEF(pj_status_t) pj_ioqueue_send( pj_ioqueue_t *ioque,
- pj_ioqueue_key_t *key,
+PJ_DEF(pj_status_t) pj_ioqueue_send( pj_ioqueue_key_t *key,
+ pj_ioqueue_op_key_t *op_key,
const void *data,
- pj_size_t datalen,
+ pj_ssize_t *length,
unsigned flags )
{
- int rc;
- DWORD bytesWritten;
- DWORD dwFlags;
-
- PJ_CHECK_STACK();
- PJ_UNUSED_ARG(ioque);
-
- if (key->send_overlapped.operation != PJ_IOQUEUE_OP_NONE) {
- pj_assert(!"Operation already pending for this socket");
- return PJ_EBUSY;
- }
-
- pj_memset(&key->send_overlapped, 0, sizeof(key->send_overlapped));
- key->send_overlapped.operation = PJ_IOQUEUE_OP_WRITE;
- key->send_overlapped.wsabuf.buf = (void*)data;
- key->send_overlapped.wsabuf.len = datalen;
- dwFlags = flags;
- rc = WSASend((SOCKET)key->hnd, &key->send_overlapped.wsabuf, 1,
- &bytesWritten, dwFlags,
- &key->send_overlapped.overlapped, NULL);
- if (rc == SOCKET_ERROR) {
- DWORD dwStatus = WSAGetLastError();
- if (dwStatus==WSA_IO_PENDING)
- return PJ_EPENDING;
- else
- return PJ_STATUS_FROM_OS(dwStatus);
- } else {
- /* Must always return pending status.
- * See comments on pj_ioqueue_read
- * return bytesRead;
- */
- return PJ_EPENDING;
- }
+ return pj_ioqueue_sendto(key, op_key, data, length, flags, NULL, 0);
}
@@ -646,46 +638,65 @@ PJ_DEF(pj_status_t) pj_ioqueue_send( pj_ioqueue_t *ioque,
*
* Initiate overlapped SendTo operation.
*/
-PJ_DEF(pj_status_t) pj_ioqueue_sendto( pj_ioqueue_t *ioque,
- pj_ioqueue_key_t *key,
+PJ_DEF(pj_status_t) pj_ioqueue_sendto( pj_ioqueue_key_t *key,
+ pj_ioqueue_op_key_t *op_key,
const void *data,
- pj_size_t datalen,
+ pj_ssize_t *length,
unsigned flags,
const pj_sockaddr_t *addr,
int addrlen)
-{
- BOOL rc;
- DWORD bytesSent;
- DWORD dwFlags;
-
- PJ_CHECK_STACK();
- PJ_UNUSED_ARG(ioque);
-
- if (key->send_overlapped.operation != PJ_IOQUEUE_OP_NONE) {
- pj_assert(!"Operation already pending for this socket");
- return PJ_EBUSY;
- }
-
- pj_memset(&key->send_overlapped, 0, sizeof(key->send_overlapped));
- key->send_overlapped.operation = PJ_IOQUEUE_OP_SEND_TO;
- key->send_overlapped.wsabuf.buf = (char*)data;
- key->send_overlapped.wsabuf.len = datalen;
- dwFlags = flags;
- rc = WSASendTo((SOCKET)key->hnd, &key->send_overlapped.wsabuf, 1,
- &bytesSent, dwFlags, addr,
- addrlen, &key->send_overlapped.overlapped, NULL);
- if (rc == SOCKET_ERROR) {
- DWORD dwStatus = WSAGetLastError();
- if (dwStatus==WSA_IO_PENDING)
- return PJ_EPENDING;
- else
- return PJ_STATUS_FROM_OS(dwStatus);
- } else {
- // Must always return pending status.
- // See comments on pj_ioqueue_read
- // return bytesSent;
- return PJ_EPENDING;
- }
+{
+ int rc;
+ DWORD bytesWritten;
+ DWORD dwFlags;
+ union operation_key *op_key_rec;
+
+ PJ_CHECK_STACK();
+ PJ_ASSERT_RETURN(key && op_key && data, PJ_EINVAL);
+
+ op_key_rec = (union operation_key*)op_key->internal__;
+
+ dwFlags = flags;
+
+ /*
+ * First try blocking write.
+ */
+ op_key_rec->overlapped.wsabuf.buf = (void*)data;
+ op_key_rec->overlapped.wsabuf.len = *length;
+
+ rc = WSASendTo((SOCKET)key->hnd, &op_key_rec->overlapped.wsabuf, 1,
+ &bytesWritten, dwFlags, addr, addrlen,
+ NULL, NULL);
+ if (rc == 0) {
+ *length = bytesWritten;
+ return PJ_SUCCESS;
+ } else {
+ DWORD dwStatus = WSAGetLastError();
+ if (dwStatus != WSAEWOULDBLOCK) {
+ *length = -1;
+ return PJ_RETURN_OS_ERROR(dwStatus);
+ }
+ }
+
+ /*
+ * Data can't be sent immediately.
+ * Schedule asynchronous WSASend().
+ */
+ pj_memset(&op_key_rec->overlapped.overlapped, 0,
+ sizeof(op_key_rec->overlapped.overlapped));
+ op_key_rec->overlapped.operation = PJ_IOQUEUE_OP_SEND;
+
+ rc = WSASendTo((SOCKET)key->hnd, &op_key_rec->overlapped.wsabuf, 1,
+ &bytesWritten, dwFlags, addr, addrlen,
+ &op_key_rec->overlapped.overlapped, NULL);
+ if (rc == SOCKET_ERROR) {
+ DWORD dwStatus = WSAGetLastError();
+ if (dwStatus!=WSA_IO_PENDING)
+ return PJ_STATUS_FROM_OS(dwStatus);
+ }
+
+ /* Asynchronous operation successfully submitted. */
+ return PJ_EPENDING;
}
#if PJ_HAS_TCP
@@ -695,59 +706,93 @@ PJ_DEF(pj_status_t) pj_ioqueue_sendto( pj_ioqueue_t *ioque,
*
* Initiate overlapped accept() operation.
*/
-PJ_DEF(int) pj_ioqueue_accept( pj_ioqueue_t *ioqueue,
- pj_ioqueue_key_t *key,
- pj_sock_t *new_sock,
- pj_sockaddr_t *local,
- pj_sockaddr_t *remote,
- int *addrlen)
+PJ_DEF(pj_status_t) pj_ioqueue_accept( pj_ioqueue_key_t *key,
+ pj_ioqueue_op_key_t *op_key,
+ pj_sock_t *new_sock,
+ pj_sockaddr_t *local,
+ pj_sockaddr_t *remote,
+ int *addrlen)
{
- BOOL rc;
+ BOOL rc;
DWORD bytesReceived;
- pj_status_t status;
+ pj_status_t status;
+ union operation_key *op_key_rec;
+ SOCKET sock;
PJ_CHECK_STACK();
- PJ_UNUSED_ARG(ioqueue);
-
- if (key->accept_overlapped.operation != PJ_IOQUEUE_OP_NONE) {
- pj_assert(!"Operation already pending for this socket");
- return PJ_EBUSY;
- }
-
- if (key->accept_overlapped.newsock == PJ_INVALID_SOCKET) {
- pj_sock_t sock;
- status = pj_sock_socket(PJ_AF_INET, PJ_SOCK_STREAM, 0, &sock);
- if (status != PJ_SUCCESS)
- return status;
-
- key->accept_overlapped.newsock = sock;
- }
- key->accept_overlapped.operation = PJ_IOQUEUE_OP_ACCEPT;
- key->accept_overlapped.addrlen = addrlen;
- key->accept_overlapped.local = local;
- key->accept_overlapped.remote = remote;
- key->accept_overlapped.newsock_ptr = new_sock;
- pj_memset(&key->accept_overlapped.overlapped, 0,
- sizeof(key->accept_overlapped.overlapped));
-
- rc = AcceptEx( (SOCKET)key->hnd, (SOCKET)key->accept_overlapped.newsock,
- key->accept_overlapped.accept_buf,
+ PJ_ASSERT_RETURN(key && op_key && new_sock, PJ_EINVAL);
+
+ /*
+ * See if there is a new connection immediately available.
+ */
+ sock = WSAAccept((SOCKET)key->hnd, remote, addrlen, NULL, 0);
+ if (sock != INVALID_SOCKET) {
+ /* Yes! New socket is available! */
+ int status;
+
+ status = getsockname(sock, local, addrlen);
+ if (status != 0) {
+ DWORD dwError = WSAGetLastError();
+ closesocket(sock);
+ return PJ_RETURN_OS_ERROR(dwError);
+ }
+
+ *new_sock = sock;
+ return PJ_SUCCESS;
+
+ } else {
+ DWORD dwError = WSAGetLastError();
+ if (dwError != WSAEWOULDBLOCK) {
+ return PJ_RETURN_OS_ERROR(dwError);
+ }
+ }
+
+ /*
+ * No connection is immediately available.
+ * Must schedule an asynchronous operation.
+ */
+ op_key_rec = (union operation_key*)op_key->internal__;
+
+ status = pj_sock_socket(PJ_AF_INET, PJ_SOCK_STREAM, 0,
+ &op_key_rec->accept.newsock);
+ if (status != PJ_SUCCESS)
+ return status;
+
+ /* On WinXP or later, use SO_UPDATE_ACCEPT_CONTEXT so that socket
+ * addresses can be obtained with getsockname() and getpeername().
+ */
+ status = setsockopt(op_key_rec->accept.newsock, SOL_SOCKET,
+ SO_UPDATE_ACCEPT_CONTEXT,
+ (char*)&key->hnd, sizeof(SOCKET));
+ /* SO_UPDATE_ACCEPT_CONTEXT is for WinXP or later.
+ * So ignore the error status.
+ */
+
+ op_key_rec->accept.operation = PJ_IOQUEUE_OP_ACCEPT;
+ op_key_rec->accept.addrlen = addrlen;
+ op_key_rec->accept.local = local;
+ op_key_rec->accept.remote = remote;
+ op_key_rec->accept.newsock_ptr = new_sock;
+ pj_memset(&op_key_rec->accept.overlapped, 0,
+ sizeof(op_key_rec->accept.overlapped));
+
+ rc = AcceptEx( (SOCKET)key->hnd, (SOCKET)op_key_rec->accept.newsock,
+ op_key_rec->accept.accept_buf,
0, ACCEPT_ADDR_LEN, ACCEPT_ADDR_LEN,
&bytesReceived,
- &key->accept_overlapped.overlapped);
+ &op_key_rec->accept.overlapped );
if (rc == TRUE) {
- ioqueue_on_accept_complete(&key->accept_overlapped);
- if (key->cb.on_accept_complete)
- key->cb.on_accept_complete(key, key->accept_overlapped.newsock, 0);
+ ioqueue_on_accept_complete(&op_key_rec->accept);
return PJ_SUCCESS;
} else {
DWORD dwStatus = WSAGetLastError();
- if (dwStatus==WSA_IO_PENDING)
- return PJ_EPENDING;
- else
+ if (dwStatus!=WSA_IO_PENDING)
return PJ_STATUS_FROM_OS(dwStatus);
- }
+ }
+
+ /* Asynchronous Accept() has been submitted. */
+ return PJ_EPENDING;
}
@@ -757,42 +802,29 @@ PJ_DEF(int) pj_ioqueue_accept( pj_ioqueue_t *ioqueue,
* Initiate overlapped connect() operation (well, it's non-blocking actually,
* since there's no overlapped version of connect()).
*/
-PJ_DEF(pj_status_t) pj_ioqueue_connect( pj_ioqueue_t *ioqueue,
- pj_ioqueue_key_t *key,
+PJ_DEF(pj_status_t) pj_ioqueue_connect( pj_ioqueue_key_t *key,
const pj_sockaddr_t *addr,
int addrlen )
{
- unsigned long optval = 1;
- HANDLE hEvent;
+ HANDLE hEvent;
+ pj_ioqueue_t *ioqueue;
PJ_CHECK_STACK();
-
- /* Set socket to non-blocking. */
- if (ioctlsocket((pj_sock_t)key->hnd, FIONBIO, &optval) != 0) {
- return PJ_RETURN_OS_ERROR(WSAGetLastError());
- }
+ PJ_ASSERT_RETURN(key && addr && addrlen, PJ_EINVAL);
/* Initiate connect() */
if (connect((pj_sock_t)key->hnd, addr, addrlen) != 0) {
DWORD dwStatus;
dwStatus = WSAGetLastError();
- if (dwStatus != WSAEWOULDBLOCK) {
- /* Permanent error */
+ if (dwStatus != WSAEWOULDBLOCK) {
return PJ_RETURN_OS_ERROR(dwStatus);
- } else {
- /* Pending operation. This is what we're looking for. */
}
} else {
/* Connect has completed immediately! */
- /* Restore to blocking mode. */
- optval = 0;
- if (ioctlsocket((pj_sock_t)key->hnd, FIONBIO, &optval) != 0) {
- return PJ_RETURN_OS_ERROR(WSAGetLastError());
- }
-
- key->cb.on_connect_complete(key, 0);
return PJ_SUCCESS;
}
+
+ ioqueue = key->ioqueue;
/* Add to the array of connecting socket to be polled */
pj_lock_acquire(ioqueue->lock);
diff --git a/pjlib/src/pjlib-test/atomic.c b/pjlib/src/pjlib-test/atomic.c
index 429085e1..09bdfdba 100644
--- a/pjlib/src/pjlib-test/atomic.c
+++ b/pjlib/src/pjlib-test/atomic.c
@@ -47,21 +47,29 @@ int atomic_test(void)
/* get: check the value. */
if (pj_atomic_get(atomic_var) != 111)
return -30;
-
- /* increment. */
- if (pj_atomic_inc(atomic_var) != 112)
+
+ /* increment. */
+ pj_atomic_inc(atomic_var);
+ if (pj_atomic_get(atomic_var) != 112)
return -40;
- /* decrement. */
- if (pj_atomic_dec(atomic_var) != 111)
+ /* decrement. */
+ pj_atomic_dec(atomic_var);
+ if (pj_atomic_get(atomic_var) != 111)
return -50;
- /* set */
- if (pj_atomic_set(atomic_var, 211) != 111)
+ /* set */
+ pj_atomic_set(atomic_var, 211);
+ if (pj_atomic_get(atomic_var) != 211)
return -60;
+
+ /* add */
+ pj_atomic_add(atomic_var, 10);
+ if (pj_atomic_get(atomic_var) != 221)
+ return -60;
/* check the value again. */
- if (pj_atomic_get(atomic_var) != 211)
+ if (pj_atomic_get(atomic_var) != 221)
return -70;
/* destroy */
diff --git a/pjlib/src/pjlib-test/ioq_perf.c b/pjlib/src/pjlib-test/ioq_perf.c
index cb93c4cf..4cd11068 100644
--- a/pjlib/src/pjlib-test/ioq_perf.c
+++ b/pjlib/src/pjlib-test/ioq_perf.c
@@ -34,77 +34,105 @@ static unsigned last_error_counter;
/* Descriptor for each producer/consumer pair. */
typedef struct test_item
{
- pj_sock_t server_fd,
- client_fd;
- pj_ioqueue_t *ioqueue;
- pj_ioqueue_key_t *server_key,
- *client_key;
- pj_size_t buffer_size;
- char *outgoing_buffer;
- char *incoming_buffer;
- pj_size_t bytes_sent,
- bytes_recv;
+ pj_sock_t server_fd,
+ client_fd;
+ pj_ioqueue_t *ioqueue;
+ pj_ioqueue_key_t *server_key,
+ *client_key;
+ pj_ioqueue_op_key_t recv_op,
+ send_op;
+ int has_pending_send;
+ pj_size_t buffer_size;
+ char *outgoing_buffer;
+ char *incoming_buffer;
+ pj_size_t bytes_sent,
+ bytes_recv;
} test_item;
/* Callback when data has been read.
* Increment item->bytes_recv and ready to read the next data.
*/
-static void on_read_complete(pj_ioqueue_key_t *key, pj_ssize_t bytes_read)
+static void on_read_complete(pj_ioqueue_key_t *key,
+ pj_ioqueue_op_key_t *op_key,
+ pj_ssize_t bytes_read)
{
test_item *item = pj_ioqueue_get_user_data(key);
- pj_status_t rc;
+ pj_status_t rc;
+ int data_is_available = 1;
//TRACE_((THIS_FILE, " read complete, bytes_read=%d", bytes_read));
+
+ do {
+ if (thread_quit_flag)
+ return;
+
+ if (bytes_read < 0) {
+ pj_status_t rc = -bytes_read;
+ char errmsg[128];
+
+ if (rc != last_error) {
+ last_error = rc;
+ pj_strerror(rc, errmsg, sizeof(errmsg));
+ PJ_LOG(3,(THIS_FILE, "...error: read error, bytes_read=%d (%s)",
+ bytes_read, errmsg));
+ PJ_LOG(3,(THIS_FILE,
+ ".....additional info: total read=%u, total written=%u",
+ item->bytes_recv, item->bytes_sent));
+ } else {
+ last_error_counter++;
+ }
+ bytes_read = 0;
+
+ } else if (bytes_read == 0) {
+ PJ_LOG(3,(THIS_FILE, "...socket has closed!"));
+ }
- if (thread_quit_flag)
- return;
-
- if (bytes_read < 0) {
- pj_status_t rc = -bytes_read;
- char errmsg[128];
-
- if (rc != last_error) {
- last_error = rc;
- pj_strerror(rc, errmsg, sizeof(errmsg));
- PJ_LOG(3,(THIS_FILE, "...error: read error, bytes_read=%d (%s)",
- bytes_read, errmsg));
- PJ_LOG(3,(THIS_FILE,
- ".....additional info: total read=%u, total written=%u",
- item->bytes_recv, item->bytes_sent));
- } else {
- last_error_counter++;
- }
- bytes_read = 0;
-
- } else if (bytes_read == 0) {
- PJ_LOG(3,(THIS_FILE, "...socket has closed!"));
- }
-
- item->bytes_recv += bytes_read;
+ item->bytes_recv += bytes_read;
- /* To assure that the test quits, even if main thread
- * doesn't have time to run.
- */
- if (item->bytes_recv > item->buffer_size * 10000)
- thread_quit_flag = 1;
-
- rc = pj_ioqueue_recv( item->ioqueue, item->server_key,
- item->incoming_buffer, item->buffer_size, 0 );
-
- if (rc != PJ_SUCCESS && rc != PJ_EPENDING) {
- if (rc != last_error) {
- last_error = rc;
- app_perror("...error: read error", rc);
- } else {
- last_error_counter++;
- }
- }
+ /* To assure that the test quits, even if main thread
+ * doesn't have time to run.
+ */
+ if (item->bytes_recv > item->buffer_size * 10000)
+ thread_quit_flag = 1;
+
+ bytes_read = item->buffer_size;
+ rc = pj_ioqueue_recv( key, op_key,
+ item->incoming_buffer, &bytes_read, 0 );
+
+ if (rc == PJ_SUCCESS) {
+ data_is_available = 1;
+ } else if (rc == PJ_EPENDING) {
+ data_is_available = 0;
+ } else {
+ data_is_available = 0;
+ if (rc != last_error) {
+ last_error = rc;
+ app_perror("...error: read error", rc);
+ } else {
+ last_error_counter++;
+ }
+ }
+
+ if (!item->has_pending_send) {
+ pj_ssize_t sent = item->buffer_size;
+ rc = pj_ioqueue_send(item->client_key, &item->send_op,
+ item->outgoing_buffer, &sent, 0);
+ if (rc != PJ_SUCCESS && rc != PJ_EPENDING) {
+ app_perror("...error: write error", rc);
+ }
+
+ item->has_pending_send = (rc==PJ_EPENDING);
+ }
+
+ } while (data_is_available);
}
/* Callback when data has been written.
* Increment item->bytes_sent and write the next data.
*/
-static void on_write_complete(pj_ioqueue_key_t *key, pj_ssize_t bytes_sent)
+static void on_write_complete(pj_ioqueue_key_t *key,
+ pj_ioqueue_op_key_t *op_key,
+ pj_ssize_t bytes_sent)
{
test_item *item = pj_ioqueue_get_user_data(key);
@@ -112,7 +140,8 @@ static void on_write_complete(pj_ioqueue_key_t *key, pj_ssize_t bytes_sent)
if (thread_quit_flag)
return;
-
+
+ item->has_pending_send = 0;
item->bytes_sent += bytes_sent;
if (bytes_sent <= 0) {
@@ -121,12 +150,15 @@ static void on_write_complete(pj_ioqueue_key_t *key, pj_ssize_t bytes_sent)
}
else {
pj_status_t rc;
-
- rc = pj_ioqueue_write(item->ioqueue, item->client_key,
- item->outgoing_buffer, item->buffer_size);
+
+ bytes_sent = item->buffer_size;
+ rc = pj_ioqueue_send( item->client_key, op_key,
+ item->outgoing_buffer, &bytes_sent, 0);
if (rc != PJ_SUCCESS && rc != PJ_EPENDING) {
app_perror("...error: write error", rc);
- }
+ }
+
+ item->has_pending_send = (rc==PJ_EPENDING);
}
}
@@ -191,7 +223,7 @@ static int perform_test(int sock_type, const char *type_name,
thread = pj_pool_alloc(pool, thread_cnt*sizeof(pj_thread_t*));
TRACE_((THIS_FILE, " creating ioqueue.."));
- rc = pj_ioqueue_create(pool, sockpair_cnt*2, thread_cnt, &ioqueue);
+ rc = pj_ioqueue_create(pool, sockpair_cnt*2, &ioqueue);
if (rc != PJ_SUCCESS) {
app_perror("...error: unable to create ioqueue", rc);
return -15;
@@ -199,6 +231,7 @@ static int perform_test(int sock_type, const char *type_name,
/* Initialize each producer-consumer pair. */
for (i=0; i<sockpair_cnt; ++i) {
+ pj_ssize_t bytes;
items[i].ioqueue = ioqueue;
items[i].buffer_size = buffer_size;
@@ -241,24 +274,27 @@ static int perform_test(int sock_type, const char *type_name,
}
/* Start reading. */
- TRACE_((THIS_FILE, " pj_ioqueue_recv.."));
- rc = pj_ioqueue_recv(ioqueue, items[i].server_key,
- items[i].incoming_buffer, items[i].buffer_size,
+ TRACE_((THIS_FILE, " pj_ioqueue_recv.."));
+ bytes = items[i].buffer_size;
+ rc = pj_ioqueue_recv(items[i].server_key, &items[i].recv_op,
+ items[i].incoming_buffer, &bytes,
0);
- if (rc != PJ_SUCCESS && rc != PJ_EPENDING) {
+ if (rc != PJ_EPENDING) {
app_perror("...error: pj_ioqueue_recv", rc);
return -73;
}
/* Start writing. */
- TRACE_((THIS_FILE, " pj_ioqueue_write.."));
- rc = pj_ioqueue_write(ioqueue, items[i].client_key,
- items[i].outgoing_buffer, items[i].buffer_size);
+ TRACE_((THIS_FILE, " pj_ioqueue_write.."));
+ bytes = items[i].buffer_size;
+ rc = pj_ioqueue_send(items[i].client_key, &items[i].recv_op,
+ items[i].outgoing_buffer, &bytes, 0);
if (rc != PJ_SUCCESS && rc != PJ_EPENDING) {
app_perror("...error: pj_ioqueue_write", rc);
return -76;
}
-
+
+ items[i].has_pending_send = (rc==PJ_EPENDING);
}
/* Create the threads. */
@@ -324,8 +360,8 @@ static int perform_test(int sock_type, const char *type_name,
/* Close all sockets. */
TRACE_((THIS_FILE, " closing all sockets.."));
for (i=0; i<sockpair_cnt; ++i) {
- pj_ioqueue_unregister(ioqueue, items[i].server_key);
- pj_ioqueue_unregister(ioqueue, items[i].client_key);
+ pj_ioqueue_unregister(items[i].server_key);
+ pj_ioqueue_unregister(items[i].client_key);
pj_sock_close(items[i].server_fd);
pj_sock_close(items[i].client_fd);
}
diff --git a/pjlib/src/pjlib-test/ioq_tcp.c b/pjlib/src/pjlib-test/ioq_tcp.c
index ebce633b..fd5329e5 100644
--- a/pjlib/src/pjlib-test/ioq_tcp.c
+++ b/pjlib/src/pjlib-test/ioq_tcp.c
@@ -31,33 +31,45 @@
#define SOCK_INACTIVE_MAX (PJ_IOQUEUE_MAX_HANDLES - 2)
#define POOL_SIZE (2*BUF_MAX_SIZE + SOCK_INACTIVE_MAX*128 + 2048)
-static pj_ssize_t callback_read_size,
- callback_write_size,
- callback_accept_status,
- callback_connect_status;
-static pj_ioqueue_key_t*callback_read_key,
- *callback_write_key,
- *callback_accept_key,
- *callback_connect_key;
-
-static void on_ioqueue_read(pj_ioqueue_key_t *key, pj_ssize_t bytes_read)
+static pj_ssize_t callback_read_size,
+ callback_write_size,
+ callback_accept_status,
+ callback_connect_status;
+static pj_ioqueue_key_t *callback_read_key,
+ *callback_write_key,
+ *callback_accept_key,
+ *callback_connect_key;
+static pj_ioqueue_op_key_t *callback_read_op,
+ *callback_write_op,
+ *callback_accept_op;
+
+static void on_ioqueue_read(pj_ioqueue_key_t *key,
+ pj_ioqueue_op_key_t *op_key,
+ pj_ssize_t bytes_read)
{
- callback_read_key = key;
+ callback_read_key = key;
+ callback_read_op = op_key;
callback_read_size = bytes_read;
}
-static void on_ioqueue_write(pj_ioqueue_key_t *key, pj_ssize_t bytes_written)
+static void on_ioqueue_write(pj_ioqueue_key_t *key,
+ pj_ioqueue_op_key_t *op_key,
+ pj_ssize_t bytes_written)
{
- callback_write_key = key;
+ callback_write_key = key;
+ callback_write_op = op_key;
callback_write_size = bytes_written;
}
-static void on_ioqueue_accept(pj_ioqueue_key_t *key, pj_sock_t sock,
+static void on_ioqueue_accept(pj_ioqueue_key_t *key,
+ pj_ioqueue_op_key_t *op_key,
+ pj_sock_t sock,
int status)
{
PJ_UNUSED_ARG(sock);
- callback_accept_key = key;
+ callback_accept_key = key;
+ callback_accept_op = op_key;
callback_accept_status = status;
}
@@ -83,28 +95,38 @@ static int send_recv_test(pj_ioqueue_t *ioque,
pj_ssize_t bufsize,
pj_timestamp *t_elapsed)
{
- int rc;
- pj_ssize_t bytes;
+ pj_status_t status;
+ pj_ssize_t bytes;
+ pj_time_val timeout;
pj_timestamp t1, t2;
- int pending_op = 0;
-
- // Start reading on the server side.
- rc = pj_ioqueue_read(ioque, skey, recv_buf, bufsize);
- if (rc != 0 && rc != PJ_EPENDING) {
+ int pending_op = 0;
+ pj_ioqueue_op_key_t read_op, write_op;
+
+ // Start reading on the server side.
+ bytes = bufsize;
+ status = pj_ioqueue_recv(skey, &read_op, recv_buf, &bytes, 0);
+ if (status != PJ_SUCCESS && status != PJ_EPENDING) {
+ app_perror("...pj_ioqueue_recv error", status);
return -100;
}
-
- ++pending_op;
+
+ if (status == PJ_EPENDING)
+ ++pending_op;
+ else {
+ /* Does not expect to return error or immediate data. */
+ return -115;
+ }
// Randomize send buffer.
pj_create_random_string((char*)send_buf, bufsize);
- // Starts send on the client side.
- bytes = pj_ioqueue_write(ioque, ckey, send_buf, bufsize);
- if (bytes != bufsize && bytes != PJ_EPENDING) {
+ // Starts send on the client side.
+ bytes = bufsize;
+ status = pj_ioqueue_send(ckey, &write_op, send_buf, &bytes, 0);
+ if (status != PJ_SUCCESS && bytes != PJ_EPENDING) {
return -120;
}
- if (bytes == PJ_EPENDING) {
+ if (status == PJ_EPENDING) {
++pending_op;
}
@@ -113,37 +135,52 @@ static int send_recv_test(pj_ioqueue_t *ioque,
// Reset indicators
callback_read_size = callback_write_size = 0;
- callback_read_key = callback_write_key = NULL;
+ callback_read_key = callback_write_key = NULL;
+ callback_read_op = callback_write_op = NULL;
// Poll the queue until we've got completion event in the server side.
- rc = 0;
- while (pending_op > 0) {
- rc = pj_ioqueue_poll(ioque, NULL);
- if (rc > 0) {
+ status = 0;
+ while (pending_op > 0) {
+ timeout.sec = 1; timeout.msec = 0;
+ status = pj_ioqueue_poll(ioque, &timeout);
+ if (status > 0) {
if (callback_read_size) {
- if (callback_read_size != bufsize) {
+ if (callback_read_size != bufsize)
return -160;
- }
if (callback_read_key != skey)
- return -161;
+ return -161;
+ if (callback_read_op != &read_op)
+ return -162;
}
if (callback_write_size) {
if (callback_write_key != ckey)
- return -162;
+ return -163;
+ if (callback_write_op != &write_op)
+ return -164;
}
- pending_op -= rc;
- }
- if (rc < 0) {
+ pending_op -= status;
+ }
+ if (status == 0) {
+ PJ_LOG(3,("", "...error: timed out"));
+ }
+ if (status < 0) {
return -170;
}
}
+
+ // Pending op is zero.
+ // Subsequent poll should yield zero too.
+ timeout.sec = timeout.msec = 0;
+ status = pj_ioqueue_poll(ioque, &timeout);
+ if (status != 0)
+ return -173;
// End time.
pj_get_timestamp(&t2);
t_elapsed->u32.lo += (t2.u32.lo - t1.u32.lo);
- if (rc < 0) {
- return -150;
+ if (status < 0) {
+ return -176;
}
// Compare recv buffer with send buffer.
@@ -167,7 +204,8 @@ static int compliance_test_0(void)
pj_pool_t *pool = NULL;
char *send_buf, *recv_buf;
pj_ioqueue_t *ioque = NULL;
- pj_ioqueue_key_t *skey, *ckey0, *ckey1;
+ pj_ioqueue_key_t *skey, *ckey0, *ckey1;
+ pj_ioqueue_op_key_t accept_op;
int bufsize = BUF_MIN_SIZE;
pj_ssize_t status = -1;
int pending_op = 0;
@@ -205,7 +243,7 @@ static int compliance_test_0(void)
}
// Create I/O Queue.
- rc = pj_ioqueue_create(pool, PJ_IOQUEUE_MAX_HANDLES, 0, &ioque);
+ rc = pj_ioqueue_create(pool, PJ_IOQUEUE_MAX_HANDLES, &ioque);
if (rc != PJ_SUCCESS) {
app_perror("...ERROR in pj_ioqueue_create()", rc);
status=-20; goto on_error;
@@ -231,7 +269,8 @@ static int compliance_test_0(void)
// Server socket accept()
client_addr_len = sizeof(pj_sockaddr_in);
- status = pj_ioqueue_accept(ioque, skey, &csock0, &client_addr, &rmt_addr, &client_addr_len);
+ status = pj_ioqueue_accept(skey, &accept_op, &csock0,
+ &client_addr, &rmt_addr, &client_addr_len);
if (status != PJ_EPENDING) {
app_perror("...ERROR in pj_ioqueue_accept()", rc);
status=-30; goto on_error;
@@ -247,7 +286,7 @@ static int compliance_test_0(void)
addr.sin_addr = pj_inet_addr(pj_cstr(&s, "127.0.0.1"));
// Client socket connect()
- status = pj_ioqueue_connect(ioque, ckey1, &addr, sizeof(addr));
+ status = pj_ioqueue_connect(ckey1, &addr, sizeof(addr));
if (status!=PJ_SUCCESS && status != PJ_EPENDING) {
app_perror("...ERROR in pj_ioqueue_connect()", rc);
status=-40; goto on_error;
@@ -262,6 +301,7 @@ static int compliance_test_0(void)
callback_read_key = callback_write_key =
callback_accept_key = callback_connect_key = NULL;
+ callback_accept_op = callback_read_op = callback_write_op = NULL;
while (pending_op) {
pj_time_val timeout = {1, 0};
@@ -273,8 +313,12 @@ static int compliance_test_0(void)
status=-41; goto on_error;
}
if (callback_accept_key != skey) {
- status=-41; goto on_error;
- }
+ status=-42; goto on_error;
+ }
+ if (callback_accept_op != &accept_op) {
+ status=-43; goto on_error;
+ }
+ callback_accept_status = -2;
}
if (callback_connect_status != -2) {
@@ -283,7 +327,8 @@ static int compliance_test_0(void)
}
if (callback_connect_key != ckey1) {
status=-51; goto on_error;
- }
+ }
+ callback_connect_status = -2;
}
pending_op -= status;
@@ -293,6 +338,16 @@ static int compliance_test_0(void)
}
}
}
+
+ // There's no pending operation.
+ // When we poll the ioqueue, there must not be events.
+ if (pending_op == 0) {
+ pj_time_val timeout = {1, 0};
+ status = pj_ioqueue_poll(ioque, &timeout);
+ if (status != 0) {
+ status=-60; goto on_error;
+ }
+ }
// Check accepted socket.
if (csock0 == PJ_INVALID_SOCKET) {
@@ -312,7 +367,8 @@ static int compliance_test_0(void)
// Test send and receive.
t_elapsed.u32.lo = 0;
- status = send_recv_test(ioque, ckey0, ckey1, send_buf, recv_buf, bufsize, &t_elapsed);
+ status = send_recv_test(ioque, ckey0, ckey1, send_buf,
+ recv_buf, bufsize, &t_elapsed);
if (status != 0) {
goto on_error;
}
@@ -354,7 +410,7 @@ static int compliance_test_1(void)
pool = pj_pool_create(mem, NULL, POOL_SIZE, 4000, NULL);
// Create I/O Queue.
- rc = pj_ioqueue_create(pool, PJ_IOQUEUE_MAX_HANDLES, 0, &ioque);
+ rc = pj_ioqueue_create(pool, PJ_IOQUEUE_MAX_HANDLES, &ioque);
if (!ioque) {
status=-20; goto on_error;
}
@@ -381,7 +437,7 @@ static int compliance_test_1(void)
addr.sin_addr = pj_inet_addr(pj_cstr(&s, "127.0.0.1"));
// Client socket connect()
- status = pj_ioqueue_connect(ioque, ckey1, &addr, sizeof(addr));
+ status = pj_ioqueue_connect(ckey1, &addr, sizeof(addr));
if (status==PJ_SUCCESS) {
// unexpectedly success!
status = -30;
@@ -416,7 +472,17 @@ static int compliance_test_1(void)
}
}
}
-
+
+ // There's no pending operation.
+ // When we poll the ioqueue, there must not be events.
+ if (pending_op == 0) {
+ pj_time_val timeout = {1, 0};
+ status = pj_ioqueue_poll(ioque, &timeout);
+ if (status != 0) {
+ status=-60; goto on_error;
+ }
+ }
+
// Success
status = 0;
diff --git a/pjlib/src/pjlib-test/ioq_udp.c b/pjlib/src/pjlib-test/ioq_udp.c
index a59dac88..6ee90e42 100644
--- a/pjlib/src/pjlib-test/ioq_udp.c
+++ b/pjlib/src/pjlib-test/ioq_udp.c
@@ -34,31 +34,43 @@
#undef TRACE_
#define TRACE_(msg) PJ_LOG(3,(THIS_FILE,"....." msg))
-static pj_ssize_t callback_read_size,
- callback_write_size,
- callback_accept_status,
- callback_connect_status;
-static pj_ioqueue_key_t *callback_read_key,
- *callback_write_key,
- *callback_accept_key,
- *callback_connect_key;
-
-static void on_ioqueue_read(pj_ioqueue_key_t *key, pj_ssize_t bytes_read)
+static pj_ssize_t callback_read_size,
+ callback_write_size,
+ callback_accept_status,
+ callback_connect_status;
+static pj_ioqueue_key_t *callback_read_key,
+ *callback_write_key,
+ *callback_accept_key,
+ *callback_connect_key;
+static pj_ioqueue_op_key_t *callback_read_op,
+ *callback_write_op,
+ *callback_accept_op;
+
+static void on_ioqueue_read(pj_ioqueue_key_t *key,
+ pj_ioqueue_op_key_t *op_key,
+ pj_ssize_t bytes_read)
{
- callback_read_key = key;
+ callback_read_key = key;
+ callback_read_op = op_key;
callback_read_size = bytes_read;
}
-static void on_ioqueue_write(pj_ioqueue_key_t *key, pj_ssize_t bytes_written)
+static void on_ioqueue_write(pj_ioqueue_key_t *key,
+ pj_ioqueue_op_key_t *op_key,
+ pj_ssize_t bytes_written)
{
- callback_write_key = key;
+ callback_write_key = key;
+ callback_write_op = op_key;
callback_write_size = bytes_written;
}
-static void on_ioqueue_accept(pj_ioqueue_key_t *key, pj_sock_t sock, int status)
+static void on_ioqueue_accept(pj_ioqueue_key_t *key,
+ pj_ioqueue_op_key_t *op_key,
+ pj_sock_t sock, int status)
{
PJ_UNUSED_ARG(sock);
- callback_accept_key = key;
+ callback_accept_key = key;
+ callback_accept_op = op_key;
callback_accept_status = status;
}
@@ -83,29 +95,6 @@ static pj_ioqueue_callback test_cb =
#endif
/*
- * native_format_test()
- * This is just a simple test to verify that various structures in sock.h
- * are really compatible with operating system's definitions.
- */
-static int native_format_test(void)
-{
- pj_status_t rc;
-
- // Test that PJ_INVALID_SOCKET is working.
- {
- pj_sock_t sock;
- rc = pj_sock_socket(PJ_AF_INET, PJ_SOCK_STREAM, -1, &sock);
- if (rc == PJ_SUCCESS)
- return -1020;
- }
-
- // Previous func will set errno var.
- pj_set_os_error(PJ_SUCCESS);
-
- return 0;
-}
-
-/*
* compliance_test()
* To test that the basic IOQueue functionality works. It will just exchange
* data between two sockets.
@@ -118,7 +107,8 @@ static int compliance_test(void)
pj_pool_t *pool = NULL;
char *send_buf, *recv_buf;
pj_ioqueue_t *ioque = NULL;
- pj_ioqueue_key_t *skey, *ckey;
+ pj_ioqueue_key_t *skey, *ckey;
+ pj_ioqueue_op_key_t read_op, write_op;
int bufsize = BUF_MIN_SIZE;
pj_ssize_t bytes, status = -1;
pj_str_t temp;
@@ -157,8 +147,7 @@ static int compliance_test(void)
// Create I/O Queue.
TRACE_("create ioqueue...");
- rc = pj_ioqueue_create(pool, PJ_IOQUEUE_MAX_HANDLES,
- PJ_IOQUEUE_DEFAULT_THREADS, &ioque);
+ rc = pj_ioqueue_create(pool, PJ_IOQUEUE_MAX_HANDLES, &ioque);
if (rc != PJ_SUCCESS) {
status=-20; goto on_error;
}
@@ -194,12 +183,14 @@ static int compliance_test(void)
// Register reading from ioqueue.
TRACE_("start recvfrom...");
- addrlen = sizeof(addr);
- bytes = pj_ioqueue_recvfrom(ioque, skey, recv_buf, bufsize, 0,
- &addr, &addrlen);
- if (bytes < 0 && bytes != PJ_EPENDING) {
+ addrlen = sizeof(addr);
+ bytes = bufsize;
+ rc = pj_ioqueue_recvfrom(skey, &read_op, recv_buf, &bytes, 0,
+ &addr, &addrlen);
+ if (rc != PJ_SUCCESS && rc != PJ_EPENDING) {
+ app_perror("...error: pj_ioqueue_recvfrom", rc);
status=-28; goto on_error;
- } else if (bytes == PJ_EPENDING) {
+ } else if (rc == PJ_EPENDING) {
recv_pending = 1;
PJ_LOG(3, (THIS_FILE,
"......ok: recvfrom returned pending"));
@@ -210,14 +201,14 @@ static int compliance_test(void)
}
// Write must return the number of bytes.
- TRACE_("start sendto...");
- bytes = pj_ioqueue_sendto(ioque, ckey, send_buf, bufsize, 0, &addr,
- sizeof(addr));
- if (bytes != bufsize && bytes != PJ_EPENDING) {
- PJ_LOG(1,(THIS_FILE,
- "......error: sendto returned %d", bytes));
+ TRACE_("start sendto...");
+ bytes = bufsize;
+ rc = pj_ioqueue_sendto(ckey, &write_op, send_buf, &bytes, 0, &addr,
+ sizeof(addr));
+ if (rc != PJ_SUCCESS && rc != PJ_EPENDING) {
+ app_perror("...error: pj_ioqueue_sendto", rc);
status=-30; goto on_error;
- } else if (bytes == PJ_EPENDING) {
+ } else if (rc == PJ_EPENDING) {
send_pending = 1;
PJ_LOG(3, (THIS_FILE,
"......ok: sendto returned pending"));
@@ -232,9 +223,10 @@ static int compliance_test(void)
callback_accept_status = callback_connect_status = -2;
callback_read_key = callback_write_key =
callback_accept_key = callback_connect_key = NULL;
+ callback_read_op = callback_write_op = NULL;
// Poll if pending.
- while (send_pending && recv_pending) {
+ while (send_pending || recv_pending) {
int rc;
pj_time_val timeout = { 5, 0 };
@@ -253,9 +245,11 @@ static int compliance_test(void)
if (callback_read_size != bufsize) {
status=-61; goto on_error;
}
-
if (callback_read_key != skey) {
status=-65; goto on_error;
+ }
+ if (callback_read_op != &read_op) {
+ status=-66; goto on_error;
}
if (memcmp(send_buf, recv_buf, bufsize) != 0) {
@@ -270,9 +264,11 @@ static int compliance_test(void)
if (callback_write_size != bufsize) {
status=-73; goto on_error;
}
-
if (callback_write_key != ckey) {
status=-75; goto on_error;
+ }
+ if (callback_write_op != &write_op) {
+ status=-76; goto on_error;
}
send_pending = 0;
@@ -326,9 +322,7 @@ static int many_handles_test(void)
sock = pj_pool_alloc(pool, MAX*sizeof(pj_sock_t));
/* Create IOQueue */
- rc = pj_ioqueue_create(pool, MAX,
- PJ_IOQUEUE_DEFAULT_THREADS,
- &ioqueue);
+ rc = pj_ioqueue_create(pool, MAX, &ioqueue);
if (rc != PJ_SUCCESS || ioqueue == NULL) {
app_perror("...error in pj_ioqueue_create", rc);
return -10;
@@ -358,7 +352,7 @@ static int many_handles_test(void)
/* Now deregister and close all handles. */
for (i=0; i<count; ++i) {
- rc = pj_ioqueue_unregister(ioqueue, key[i]);
+ rc = pj_ioqueue_unregister(key[i]);
if (rc != PJ_SUCCESS) {
app_perror("...error in pj_ioqueue_unregister", rc);
}
@@ -392,7 +386,8 @@ static int bench_test(int bufsize, int inactive_sock_count)
pj_sock_t ssock=-1, csock=-1;
pj_sockaddr_in addr;
pj_pool_t *pool = NULL;
- pj_sock_t *inactive_sock=NULL;
+ pj_sock_t *inactive_sock=NULL;
+ pj_ioqueue_op_key_t *inactive_read_op;
char *send_buf, *recv_buf;
pj_ioqueue_t *ioque = NULL;
pj_ioqueue_key_t *skey, *ckey, *key;
@@ -429,8 +424,7 @@ static int bench_test(int bufsize, int inactive_sock_count)
pj_assert(inactive_sock_count+2 <= PJ_IOQUEUE_MAX_HANDLES);
// Create I/O Queue.
- rc = pj_ioqueue_create(pool, PJ_IOQUEUE_MAX_HANDLES,
- PJ_IOQUEUE_DEFAULT_THREADS, &ioque);
+ rc = pj_ioqueue_create(pool, PJ_IOQUEUE_MAX_HANDLES, &ioque);
if (rc != PJ_SUCCESS) {
app_perror("...error: pj_ioqueue_create()", rc);
goto on_error;
@@ -439,10 +433,14 @@ static int bench_test(int bufsize, int inactive_sock_count)
// Allocate inactive sockets, and bind them to some arbitrary address.
// Then register them to the I/O queue, and start a read operation.
inactive_sock = (pj_sock_t*)pj_pool_alloc(pool,
- inactive_sock_count*sizeof(pj_sock_t));
+ inactive_sock_count*sizeof(pj_sock_t));
+ inactive_read_op = (pj_ioqueue_op_key_t*)pj_pool_alloc(pool,
+ inactive_sock_count*sizeof(pj_ioqueue_op_key_t));
memset(&addr, 0, sizeof(addr));
addr.sin_family = PJ_AF_INET;
- for (i=0; i<inactive_sock_count; ++i) {
+ for (i=0; i<inactive_sock_count; ++i) {
+ pj_ssize_t bytes;
+
rc = pj_sock_socket(PJ_AF_INET, PJ_SOCK_DGRAM, 0, &inactive_sock[i]);
if (rc != PJ_SUCCESS || inactive_sock[i] < 0) {
app_perror("...error: pj_sock_socket()", rc);
@@ -462,8 +460,9 @@ static int bench_test(int bufsize, int inactive_sock_count)
app_perror("...error(1): pj_ioqueue_register_sock()", rc);
PJ_LOG(3,(THIS_FILE, "....i=%d", i));
goto on_error;
- }
- rc = pj_ioqueue_read(ioque, key, recv_buf, bufsize);
+ }
+ bytes = bufsize;
+ rc = pj_ioqueue_recv(key, &inactive_read_op[i], recv_buf, &bytes, 0);
if ( rc < 0 && rc != PJ_EPENDING) {
pj_sock_close(inactive_sock[i]);
inactive_sock[i] = PJ_INVALID_SOCKET;
@@ -495,22 +494,25 @@ static int bench_test(int bufsize, int inactive_sock_count)
// Test loop.
t_elapsed.u64 = 0;
for (i=0; i<LOOP; ++i) {
- pj_ssize_t bytes;
+ pj_ssize_t bytes;
+ pj_ioqueue_op_key_t read_op, write_op;
// Randomize send buffer.
pj_create_random_string(send_buf, bufsize);
- // Start reading on the server side.
- rc = pj_ioqueue_read(ioque, skey, recv_buf, bufsize);
+ // Start reading on the server side.
+ bytes = bufsize;
+ rc = pj_ioqueue_recv(skey, &read_op, recv_buf, &bytes, 0);
if (rc < 0 && rc != PJ_EPENDING) {
app_perror("...error: pj_ioqueue_read()", rc);
break;
}
- // Starts send on the client side.
- bytes = pj_ioqueue_sendto(ioque, ckey, send_buf, bufsize, 0,
- &addr, sizeof(addr));
- if (bytes != bufsize && bytes != PJ_EPENDING) {
+ // Starts send on the client side.
+ bytes = bufsize;
+ rc = pj_ioqueue_sendto(ckey, &write_op, send_buf, &bytes, 0,
+ &addr, sizeof(addr));
+ if (rc != PJ_SUCCESS && rc != PJ_EPENDING) {
app_perror("...error: pj_ioqueue_write()", bytes);
rc = -1;
break;
@@ -585,7 +587,7 @@ on_error:
pj_sock_close(csock);
for (i=0; i<inactive_sock_count && inactive_sock &&
inactive_sock[i]!=PJ_INVALID_SOCKET; ++i)
- {
+ {
pj_sock_close(inactive_sock[i]);
}
if (ioque != NULL)
@@ -599,11 +601,6 @@ int udp_ioqueue_test()
int status;
int bufsize, sock_count;
- PJ_LOG(3, (THIS_FILE, "...format test"));
- if ((status = native_format_test()) != 0)
- return status;
- PJ_LOG(3, (THIS_FILE, "....native format test ok"));
-
PJ_LOG(3, (THIS_FILE, "...compliance test"));
if ((status=compliance_test()) != 0) {
return status;
diff --git a/pjlib/src/pjlib-test/main.c b/pjlib/src/pjlib-test/main.c
index 96acc925..6a764e64 100644
--- a/pjlib/src/pjlib-test/main.c
+++ b/pjlib/src/pjlib-test/main.c
@@ -11,7 +11,8 @@ extern const char *param_echo_server;
extern int param_echo_port;
-#if defined(PJ_WIN32) && PJ_WIN32!=0
+//#if defined(PJ_WIN32) && PJ_WIN32!=0
+#if 0
#include <windows.h>
static void boost(void)
{
diff --git a/pjlib/src/pjlib-test/test.c b/pjlib/src/pjlib-test/test.c
index 5e804f69..e9a62e33 100644
--- a/pjlib/src/pjlib-test/test.c
+++ b/pjlib/src/pjlib-test/test.c
@@ -121,10 +121,10 @@ int test_inner(void)
#if PJ_HAS_TCP && INCLUDE_TCP_IOQUEUE_TEST
DO_TEST( tcp_ioqueue_test() );
#endif
-
-#if INCLUDE_IOQUEUE_PERF_TEST
- DO_TEST( ioqueue_perf_test() );
-#endif
+
+#if INCLUDE_IOQUEUE_PERF_TEST
+ DO_TEST( ioqueue_perf_test() );
+#endif
#if INCLUDE_XML_TEST
DO_TEST( xml_test() );
diff --git a/pjlib/src/pjlib-test/test.h b/pjlib/src/pjlib-test/test.h
index 8efe20d0..f4408513 100644
--- a/pjlib/src/pjlib-test/test.h
+++ b/pjlib/src/pjlib-test/test.h
@@ -8,7 +8,7 @@
#define GROUP_LIBC 0
#define GROUP_OS 0
#define GROUP_DATA_STRUCTURE 0
-#define GROUP_NETWORK 0
+#define GROUP_NETWORK 1
#define GROUP_EXTRA 0
#define INCLUDE_ERRNO_TEST GROUP_LIBC
@@ -30,13 +30,13 @@
#define INCLUDE_SOCK_PERF_TEST GROUP_NETWORK
#define INCLUDE_SELECT_TEST GROUP_NETWORK
#define INCLUDE_UDP_IOQUEUE_TEST GROUP_NETWORK
-#define INCLUDE_TCP_IOQUEUE_TEST GROUP_NETWORK
+#define INCLUDE_TCP_IOQUEUE_TEST GROUP_NETWORK
#define INCLUDE_IOQUEUE_PERF_TEST GROUP_NETWORK
#define INCLUDE_XML_TEST GROUP_EXTRA
-
#define INCLUDE_ECHO_SERVER 0
-#define INCLUDE_ECHO_CLIENT 1
+#define INCLUDE_ECHO_CLIENT 0
+
#define ECHO_SERVER_MAX_THREADS 4
#define ECHO_SERVER_START_PORT 65000
@@ -66,12 +66,16 @@ extern int sock_test(void);
extern int sock_perf_test(void);
extern int select_test(void);
extern int udp_ioqueue_test(void);
-extern int tcp_ioqueue_test(void);
+extern int tcp_ioqueue_test(void);
extern int ioqueue_perf_test(void);
extern int xml_test(void);
extern int echo_server(void);
extern int echo_client(int sock_type, const char *server, int port);
+
+extern int echo_srv_sync(void);
+extern int udp_echo_srv_ioqueue(void);
+extern int echo_srv_common_loop(pj_atomic_t *bytes_counter);
extern pj_pool_factory *mem;
diff --git a/pjlib/src/pjlib-test/udp_echo_srv_ioqueue.c b/pjlib/src/pjlib-test/udp_echo_srv_ioqueue.c
new file mode 100644
index 00000000..5fe6d6f2
--- /dev/null
+++ b/pjlib/src/pjlib-test/udp_echo_srv_ioqueue.c
@@ -0,0 +1,181 @@
+/* $Id$
+ */
+#include <pjlib.h>
+#include "test.h"
+
+static pj_ioqueue_key_t *key;
+static pj_atomic_t *total_bytes;
+
+struct op_key
+{
+ pj_ioqueue_op_key_t op_key_;
+ struct op_key *peer;
+ char *buffer;
+ pj_size_t size;
+ int is_pending;
+ pj_status_t last_err;
+};
+
+static void on_read_complete(pj_ioqueue_key_t *key,
+ pj_ioqueue_op_key_t *op_key,
+ pj_ssize_t bytes_received)
+{
+ pj_status_t rc;
+ struct op_key *recv_rec = (struct op_key *)op_key;
+
+ for (;;) {
+ struct op_key *send_rec = recv_rec->peer;
+ recv_rec->is_pending = 0;
+
+ if (bytes_received < 0) {
+ PJ_LOG(3,("","...error receiving data, received=%d",
+ bytes_received));
+ } else if (bytes_received == 0) {
+ /* note: previous error, or write callback */
+ } else {
+ pj_atomic_add(total_bytes, bytes_received);
+
+ if (!send_rec->is_pending) {
+ pj_ssize_t sent = bytes_received;
+ pj_memcpy(send_rec->buffer, recv_rec->buffer, bytes_received);
+ rc = pj_ioqueue_send(key, &send_rec->op_key_,
+ send_rec->buffer, &sent, 0);
+ send_rec->is_pending = (rc==PJ_EPENDING);
+
+ if (rc!=PJ_SUCCESS && rc!=PJ_EPENDING) {
+ app_perror("...send error", rc);
+ }
+ }
+ }
+
+ if (!send_rec->is_pending) {
+ bytes_received = recv_rec->size;
+ rc = pj_ioqueue_recv(key, &recv_rec->op_key_,
+ recv_rec->buffer, &bytes_received, 0);
+ recv_rec->is_pending = (rc==PJ_EPENDING);
+ if (rc == PJ_SUCCESS) {
+ /* fall through next loop. */
+ } else if (rc == PJ_EPENDING) {
+ /* quit callback. */
+ break;
+ } else {
+ /* error */
+ app_perror("...recv error", rc);
+ recv_rec->last_err = rc;
+
+ bytes_received = 0;
+ /* fall through next loop. */
+ }
+ } else {
+ /* recv will be done when write completion callback is called. */
+ break;
+ }
+ }
+}
+
+static void on_write_complete(pj_ioqueue_key_t *key,
+ pj_ioqueue_op_key_t *op_key,
+ pj_ssize_t bytes_sent)
+{
+ struct op_key *send_rec = (struct op_key*)op_key;
+
+ if (bytes_sent <= 0) {
+ pj_status_t rc = pj_get_netos_error();
+ app_perror("...send error", rc);
+ }
+
+ send_rec->is_pending = 0;
+ on_read_complete(key, &send_rec->peer->op_key_, 0);
+}
+
+static int worker_thread(void *arg)
+{
+ pj_ioqueue_t *ioqueue = arg;
+ struct op_key read_op, write_op;
+ char recv_buf[512], send_buf[512];
+ pj_ssize_t length;
+ pj_status_t rc;
+
+ read_op.peer = &write_op;
+ read_op.is_pending = 0;
+ read_op.last_err = 0;
+ read_op.buffer = recv_buf;
+ read_op.size = sizeof(recv_buf);
+ write_op.peer = &read_op;
+ write_op.is_pending = 0;
+ write_op.last_err = 0;
+ write_op.buffer = send_buf;
+ write_op.size = sizeof(send_buf);
+
+ length = sizeof(recv_buf);
+ rc = pj_ioqueue_recv(key, &read_op.op_key_, recv_buf, &length, 0);
+ if (rc == PJ_SUCCESS) {
+ read_op.is_pending = 1;
+ on_read_complete(key, &read_op.op_key_, length);
+ }
+
+ for (;;) {
+ pj_time_val timeout;
+ timeout.sec = 0; timeout.msec = 10;
+ rc = pj_ioqueue_poll(ioqueue, &timeout);
+ }
+}
+
+int udp_echo_srv_ioqueue(void)
+{
+ pj_pool_t *pool;
+ pj_sock_t sock;
+ pj_ioqueue_t *ioqueue;
+ pj_ioqueue_callback callback;
+ int i;
+ pj_thread_t *thread[ECHO_SERVER_MAX_THREADS];
+ pj_status_t rc;
+
+ pj_memset(&callback, 0, sizeof(callback));
+ callback.on_read_complete = &on_read_complete;
+ callback.on_write_complete = &on_write_complete;
+
+ pool = pj_pool_create(mem, NULL, 4000, 4000, NULL);
+ if (!pool)
+ return -10;
+
+ rc = pj_ioqueue_create(pool, 2, &ioqueue);
+ if (rc != PJ_SUCCESS) {
+ app_perror("...pj_ioqueue_create error", rc);
+ return -20;
+ }
+
+ rc = app_socket(PJ_AF_INET, PJ_SOCK_DGRAM, 0,
+ ECHO_SERVER_START_PORT, &sock);
+ if (rc != PJ_SUCCESS) {
+ app_perror("...app_socket error", rc);
+ return -30;
+ }
+
+ rc = pj_ioqueue_register_sock(pool, ioqueue, sock, NULL,
+ &callback, &key);
+ if (rc != PJ_SUCCESS) {
+ app_perror("...error registering socket", rc);
+ return -40;
+ }
+
+ rc = pj_atomic_create(pool, 0, &total_bytes);
+ if (rc != PJ_SUCCESS) {
+ app_perror("...error creating atomic variable", rc);
+ return -45;
+ }
+
+ for (i=0; i<ECHO_SERVER_MAX_THREADS; ++i) {
+ rc = pj_thread_create(pool, NULL, &worker_thread, ioqueue,
+ PJ_THREAD_DEFAULT_STACK_SIZE, 0,
+ &thread[i]);
+ if (rc != PJ_SUCCESS) {
+ app_perror("...create thread error", rc);
+ return -50;
+ }
+ }
+
+ echo_srv_common_loop(total_bytes);
+
+ return 0;
+}
diff --git a/pjlib/src/pjlib-test/udp_echo_srv_sync.c b/pjlib/src/pjlib-test/udp_echo_srv_sync.c
index 0e73b134..19ee702c 100644
--- a/pjlib/src/pjlib-test/udp_echo_srv_sync.c
+++ b/pjlib/src/pjlib-test/udp_echo_srv_sync.c
@@ -8,7 +8,7 @@ static pj_atomic_t *total_bytes;
static int worker_thread(void *arg)
{
pj_sock_t sock = (pj_sock_t)arg;
- char buf[1516];
+ char buf[512];
pj_status_t last_recv_err = PJ_SUCCESS, last_write_err = PJ_SUCCESS;
for (;;) {
@@ -48,9 +48,6 @@ int echo_srv_sync(void)
pj_sock_t sock;
pj_thread_t *thread[ECHO_SERVER_MAX_THREADS];
pj_status_t rc;
- pj_highprec_t last_received, avg_bw, highest_bw;
- pj_time_val last_print;
- unsigned count;
int i;
pool = pj_pool_create(mem, NULL, 4000, 4000, NULL);
@@ -83,25 +80,36 @@ int echo_srv_sync(void)
ECHO_SERVER_MAX_THREADS, ECHO_SERVER_START_PORT));
PJ_LOG(3,("", "...Press Ctrl-C to abort"));
+ echo_srv_common_loop(total_bytes);
+ return 0;
+}
+
+
+int echo_srv_common_loop(pj_atomic_t *bytes_counter)
+{
+ pj_highprec_t last_received, avg_bw, highest_bw;
+ pj_time_val last_print;
+ unsigned count;
+
last_received = 0;
pj_gettimeofday(&last_print);
avg_bw = highest_bw = 0;
- count = 0;
-
+ count = 0;
+
for (;;) {
- pj_highprec_t received, cur_received, bw;
+ pj_highprec_t received, cur_received, bw;
unsigned msec;
pj_time_val now, duration;
pj_thread_sleep(1000);
-
- received = cur_received = pj_atomic_get(total_bytes);
+
+ received = cur_received = pj_atomic_get(bytes_counter);
cur_received = cur_received - last_received;
-
+
pj_gettimeofday(&now);
duration = now;
PJ_TIME_VAL_SUB(duration, last_print);
- msec = PJ_TIME_VAL_MSEC(duration);
+ msec = PJ_TIME_VAL_MSEC(duration);
bw = cur_received;
pj_highprec_mul(bw, 1000);
@@ -113,13 +121,13 @@ int echo_srv_sync(void)
avg_bw = avg_bw + bw;
count++;
- PJ_LOG(3,("", "Synchronous UDP (%d threads): %u KB/s (avg=%u KB/s) %s",
- ECHO_SERVER_MAX_THREADS,
- (unsigned)(bw / 1000),
- (unsigned)(avg_bw / count / 1000),
- (count==20 ? "<ses avg>" : "")));
-
- if (count==20) {
+ PJ_LOG(3,("", "Synchronous UDP (%d threads): %u KB/s (avg=%u KB/s) %s",
+ ECHO_SERVER_MAX_THREADS,
+ (unsigned)(bw / 1000),
+ (unsigned)(avg_bw / count / 1000),
+ (count==20 ? "<ses avg>" : "")));
+
+ if (count==20) {
if (avg_bw/count > highest_bw)
highest_bw = avg_bw/count;
@@ -127,9 +135,9 @@ int echo_srv_sync(void)
avg_bw = 0;
PJ_LOG(3,("", "Highest average bandwidth=%u KB/s",
- (unsigned)(highest_bw/1000)));
- }
- }
-}
-
-
+ (unsigned)(highest_bw/1000)));
+ }
+ }
+}
+
+