summaryrefslogtreecommitdiff
path: root/pjlib/src/pj/activesock.c
diff options
context:
space:
mode:
authorDavid M. Lee <dlee@digium.com>2013-01-07 14:24:28 -0600
committerDavid M. Lee <dlee@digium.com>2013-01-07 14:24:28 -0600
commitf3ab456a17af1c89a6e3be4d20c5944853df1cb0 (patch)
treed00e1a332cd038a6d906a1ea0ac91e1a4458e617 /pjlib/src/pj/activesock.c
Import pjproject-2.0.1
Diffstat (limited to 'pjlib/src/pj/activesock.c')
-rw-r--r--pjlib/src/pj/activesock.c882
1 files changed, 882 insertions, 0 deletions
diff --git a/pjlib/src/pj/activesock.c b/pjlib/src/pj/activesock.c
new file mode 100644
index 0000000..5c91383
--- /dev/null
+++ b/pjlib/src/pj/activesock.c
@@ -0,0 +1,882 @@
+/* $Id: activesock.c 3553 2011-05-05 06:14:19Z nanang $ */
+/*
+ * Copyright (C) 2008-2011 Teluu Inc. (http://www.teluu.com)
+ * Copyright (C) 2003-2008 Benny Prijono <benny@prijono.org>
+ *
+ * This program is free software; you can redistribute it and/or modify
+ * it under the terms of the GNU General Public License as published by
+ * the Free Software Foundation; either version 2 of the License, or
+ * (at your option) any later version.
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ * GNU General Public License for more details.
+ *
+ * You should have received a copy of the GNU General Public License
+ * along with this program; if not, write to the Free Software
+ * Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
+ */
+#include <pj/activesock.h>
+#include <pj/compat/socket.h>
+#include <pj/assert.h>
+#include <pj/errno.h>
+#include <pj/log.h>
+#include <pj/pool.h>
+#include <pj/sock.h>
+#include <pj/string.h>
+
+#if defined(PJ_IPHONE_OS_HAS_MULTITASKING_SUPPORT) && \
+ PJ_IPHONE_OS_HAS_MULTITASKING_SUPPORT!=0
+# include <CFNetwork/CFNetwork.h>
+
+ static pj_bool_t ios_bg_support = PJ_TRUE;
+#endif
+
+#define PJ_ACTIVESOCK_MAX_LOOP 50
+
+
+enum read_type
+{
+ TYPE_NONE,
+ TYPE_RECV,
+ TYPE_RECV_FROM
+};
+
+struct read_op
+{
+ pj_ioqueue_op_key_t op_key;
+ pj_uint8_t *pkt;
+ unsigned max_size;
+ pj_size_t size;
+ pj_sockaddr src_addr;
+ int src_addr_len;
+};
+
+struct accept_op
+{
+ pj_ioqueue_op_key_t op_key;
+ pj_sock_t new_sock;
+ pj_sockaddr rem_addr;
+ int rem_addr_len;
+};
+
+struct send_data
+{
+ pj_uint8_t *data;
+ pj_ssize_t len;
+ pj_ssize_t sent;
+ unsigned flags;
+};
+
+struct pj_activesock_t
+{
+ pj_ioqueue_key_t *key;
+ pj_bool_t stream_oriented;
+ pj_bool_t whole_data;
+ pj_ioqueue_t *ioqueue;
+ void *user_data;
+ unsigned async_count;
+ unsigned max_loop;
+ pj_activesock_cb cb;
+#if defined(PJ_IPHONE_OS_HAS_MULTITASKING_SUPPORT) && \
+ PJ_IPHONE_OS_HAS_MULTITASKING_SUPPORT!=0
+ int bg_setting;
+ pj_sock_t sock;
+ CFReadStreamRef readStream;
+#endif
+
+ unsigned err_counter;
+ pj_status_t last_err;
+
+ struct send_data send_data;
+
+ struct read_op *read_op;
+ pj_uint32_t read_flags;
+ enum read_type read_type;
+
+ struct accept_op *accept_op;
+};
+
+
+static void ioqueue_on_read_complete(pj_ioqueue_key_t *key,
+ pj_ioqueue_op_key_t *op_key,
+ pj_ssize_t bytes_read);
+static void ioqueue_on_write_complete(pj_ioqueue_key_t *key,
+ pj_ioqueue_op_key_t *op_key,
+ pj_ssize_t bytes_sent);
+#if PJ_HAS_TCP
+static void ioqueue_on_accept_complete(pj_ioqueue_key_t *key,
+ pj_ioqueue_op_key_t *op_key,
+ pj_sock_t sock,
+ pj_status_t status);
+static void ioqueue_on_connect_complete(pj_ioqueue_key_t *key,
+ pj_status_t status);
+#endif
+
+PJ_DEF(void) pj_activesock_cfg_default(pj_activesock_cfg *cfg)
+{
+ pj_bzero(cfg, sizeof(*cfg));
+ cfg->async_cnt = 1;
+ cfg->concurrency = -1;
+ cfg->whole_data = PJ_TRUE;
+}
+
+#if defined(PJ_IPHONE_OS_HAS_MULTITASKING_SUPPORT) && \
+ PJ_IPHONE_OS_HAS_MULTITASKING_SUPPORT!=0
+static void activesock_destroy_iphone_os_stream(pj_activesock_t *asock)
+{
+ if (asock->readStream) {
+ CFReadStreamClose(asock->readStream);
+ CFRelease(asock->readStream);
+ asock->readStream = NULL;
+ }
+}
+
+static void activesock_create_iphone_os_stream(pj_activesock_t *asock)
+{
+ if (ios_bg_support && asock->bg_setting && asock->stream_oriented) {
+ activesock_destroy_iphone_os_stream(asock);
+
+ CFStreamCreatePairWithSocket(kCFAllocatorDefault, asock->sock,
+ &asock->readStream, NULL);
+
+ if (!asock->readStream ||
+ CFReadStreamSetProperty(asock->readStream,
+ kCFStreamNetworkServiceType,
+ kCFStreamNetworkServiceTypeVoIP)
+ != TRUE ||
+ CFReadStreamOpen(asock->readStream) != TRUE)
+ {
+ PJ_LOG(2,("", "Failed to configure TCP transport for VoIP "
+ "usage. Background mode will not be supported."));
+
+ activesock_destroy_iphone_os_stream(asock);
+ }
+ }
+}
+
+
+PJ_DEF(void) pj_activesock_set_iphone_os_bg(pj_activesock_t *asock,
+ int val)
+{
+ asock->bg_setting = val;
+ if (asock->bg_setting)
+ activesock_create_iphone_os_stream(asock);
+ else
+ activesock_destroy_iphone_os_stream(asock);
+}
+
+PJ_DEF(void) pj_activesock_enable_iphone_os_bg(pj_bool_t val)
+{
+ ios_bg_support = val;
+}
+#endif
+
+PJ_DEF(pj_status_t) pj_activesock_create( pj_pool_t *pool,
+ pj_sock_t sock,
+ int sock_type,
+ const pj_activesock_cfg *opt,
+ pj_ioqueue_t *ioqueue,
+ const pj_activesock_cb *cb,
+ void *user_data,
+ pj_activesock_t **p_asock)
+{
+ pj_activesock_t *asock;
+ pj_ioqueue_callback ioq_cb;
+ pj_status_t status;
+
+ PJ_ASSERT_RETURN(pool && ioqueue && cb && p_asock, PJ_EINVAL);
+ PJ_ASSERT_RETURN(sock!=0 && sock!=PJ_INVALID_SOCKET, PJ_EINVAL);
+ PJ_ASSERT_RETURN(sock_type==pj_SOCK_STREAM() ||
+ sock_type==pj_SOCK_DGRAM(), PJ_EINVAL);
+ PJ_ASSERT_RETURN(!opt || opt->async_cnt >= 1, PJ_EINVAL);
+
+ asock = PJ_POOL_ZALLOC_T(pool, pj_activesock_t);
+ asock->ioqueue = ioqueue;
+ asock->stream_oriented = (sock_type == pj_SOCK_STREAM());
+ asock->async_count = (opt? opt->async_cnt : 1);
+ asock->whole_data = (opt? opt->whole_data : 1);
+ asock->max_loop = PJ_ACTIVESOCK_MAX_LOOP;
+ asock->user_data = user_data;
+ pj_memcpy(&asock->cb, cb, sizeof(*cb));
+
+ pj_bzero(&ioq_cb, sizeof(ioq_cb));
+ ioq_cb.on_read_complete = &ioqueue_on_read_complete;
+ ioq_cb.on_write_complete = &ioqueue_on_write_complete;
+#if PJ_HAS_TCP
+ ioq_cb.on_connect_complete = &ioqueue_on_connect_complete;
+ ioq_cb.on_accept_complete = &ioqueue_on_accept_complete;
+#endif
+
+ status = pj_ioqueue_register_sock(pool, ioqueue, sock, asock,
+ &ioq_cb, &asock->key);
+ if (status != PJ_SUCCESS) {
+ pj_activesock_close(asock);
+ return status;
+ }
+
+ if (asock->whole_data) {
+ /* Must disable concurrency otherwise there is a race condition */
+ pj_ioqueue_set_concurrency(asock->key, 0);
+ } else if (opt && opt->concurrency >= 0) {
+ pj_ioqueue_set_concurrency(asock->key, opt->concurrency);
+ }
+
+#if defined(PJ_IPHONE_OS_HAS_MULTITASKING_SUPPORT) && \
+ PJ_IPHONE_OS_HAS_MULTITASKING_SUPPORT!=0
+ asock->sock = sock;
+ asock->bg_setting = PJ_ACTIVESOCK_TCP_IPHONE_OS_BG;
+#endif
+
+ *p_asock = asock;
+ return PJ_SUCCESS;
+}
+
+
+PJ_DEF(pj_status_t) pj_activesock_create_udp( pj_pool_t *pool,
+ const pj_sockaddr *addr,
+ const pj_activesock_cfg *opt,
+ pj_ioqueue_t *ioqueue,
+ const pj_activesock_cb *cb,
+ void *user_data,
+ pj_activesock_t **p_asock,
+ pj_sockaddr *bound_addr)
+{
+ pj_sock_t sock_fd;
+ pj_sockaddr default_addr;
+ pj_status_t status;
+
+ if (addr == NULL) {
+ pj_sockaddr_init(pj_AF_INET(), &default_addr, NULL, 0);
+ addr = &default_addr;
+ }
+
+ status = pj_sock_socket(addr->addr.sa_family, pj_SOCK_DGRAM(), 0,
+ &sock_fd);
+ if (status != PJ_SUCCESS) {
+ return status;
+ }
+
+ status = pj_sock_bind(sock_fd, addr, pj_sockaddr_get_len(addr));
+ if (status != PJ_SUCCESS) {
+ pj_sock_close(sock_fd);
+ return status;
+ }
+
+ status = pj_activesock_create(pool, sock_fd, pj_SOCK_DGRAM(), opt,
+ ioqueue, cb, user_data, p_asock);
+ if (status != PJ_SUCCESS) {
+ pj_sock_close(sock_fd);
+ return status;
+ }
+
+ if (bound_addr) {
+ int addr_len = sizeof(*bound_addr);
+ status = pj_sock_getsockname(sock_fd, bound_addr, &addr_len);
+ if (status != PJ_SUCCESS) {
+ pj_activesock_close(*p_asock);
+ return status;
+ }
+ }
+
+ return PJ_SUCCESS;
+}
+
+
+PJ_DEF(pj_status_t) pj_activesock_close(pj_activesock_t *asock)
+{
+ PJ_ASSERT_RETURN(asock, PJ_EINVAL);
+ if (asock->key) {
+#if defined(PJ_IPHONE_OS_HAS_MULTITASKING_SUPPORT) && \
+ PJ_IPHONE_OS_HAS_MULTITASKING_SUPPORT!=0
+ activesock_destroy_iphone_os_stream(asock);
+#endif
+
+ pj_ioqueue_unregister(asock->key);
+ asock->key = NULL;
+ }
+ return PJ_SUCCESS;
+}
+
+
+PJ_DEF(pj_status_t) pj_activesock_set_user_data( pj_activesock_t *asock,
+ void *user_data)
+{
+ PJ_ASSERT_RETURN(asock, PJ_EINVAL);
+ asock->user_data = user_data;
+ return PJ_SUCCESS;
+}
+
+
+PJ_DEF(void*) pj_activesock_get_user_data(pj_activesock_t *asock)
+{
+ PJ_ASSERT_RETURN(asock, NULL);
+ return asock->user_data;
+}
+
+
+PJ_DEF(pj_status_t) pj_activesock_start_read(pj_activesock_t *asock,
+ pj_pool_t *pool,
+ unsigned buff_size,
+ pj_uint32_t flags)
+{
+ void **readbuf;
+ unsigned i;
+
+ PJ_ASSERT_RETURN(asock && pool && buff_size, PJ_EINVAL);
+
+ readbuf = (void**) pj_pool_calloc(pool, asock->async_count,
+ sizeof(void*));
+
+ for (i=0; i<asock->async_count; ++i) {
+ readbuf[i] = pj_pool_alloc(pool, buff_size);
+ }
+
+ return pj_activesock_start_read2(asock, pool, buff_size, readbuf, flags);
+}
+
+
+PJ_DEF(pj_status_t) pj_activesock_start_read2( pj_activesock_t *asock,
+ pj_pool_t *pool,
+ unsigned buff_size,
+ void *readbuf[],
+ pj_uint32_t flags)
+{
+ unsigned i;
+ pj_status_t status;
+
+ PJ_ASSERT_RETURN(asock && pool && buff_size, PJ_EINVAL);
+ PJ_ASSERT_RETURN(asock->read_type == TYPE_NONE, PJ_EINVALIDOP);
+ PJ_ASSERT_RETURN(asock->read_op == NULL, PJ_EINVALIDOP);
+
+ asock->read_op = (struct read_op*)
+ pj_pool_calloc(pool, asock->async_count,
+ sizeof(struct read_op));
+ asock->read_type = TYPE_RECV;
+ asock->read_flags = flags;
+
+ for (i=0; i<asock->async_count; ++i) {
+ struct read_op *r = &asock->read_op[i];
+ pj_ssize_t size_to_read;
+
+ r->pkt = (pj_uint8_t*)readbuf[i];
+ r->max_size = size_to_read = buff_size;
+
+ status = pj_ioqueue_recv(asock->key, &r->op_key, r->pkt, &size_to_read,
+ PJ_IOQUEUE_ALWAYS_ASYNC | flags);
+ PJ_ASSERT_RETURN(status != PJ_SUCCESS, PJ_EBUG);
+
+ if (status != PJ_EPENDING)
+ return status;
+ }
+
+ return PJ_SUCCESS;
+}
+
+
+PJ_DEF(pj_status_t) pj_activesock_start_recvfrom(pj_activesock_t *asock,
+ pj_pool_t *pool,
+ unsigned buff_size,
+ pj_uint32_t flags)
+{
+ void **readbuf;
+ unsigned i;
+
+ PJ_ASSERT_RETURN(asock && pool && buff_size, PJ_EINVAL);
+
+ readbuf = (void**) pj_pool_calloc(pool, asock->async_count,
+ sizeof(void*));
+
+ for (i=0; i<asock->async_count; ++i) {
+ readbuf[i] = pj_pool_alloc(pool, buff_size);
+ }
+
+ return pj_activesock_start_recvfrom2(asock, pool, buff_size,
+ readbuf, flags);
+}
+
+
+PJ_DEF(pj_status_t) pj_activesock_start_recvfrom2( pj_activesock_t *asock,
+ pj_pool_t *pool,
+ unsigned buff_size,
+ void *readbuf[],
+ pj_uint32_t flags)
+{
+ unsigned i;
+ pj_status_t status;
+
+ PJ_ASSERT_RETURN(asock && pool && buff_size, PJ_EINVAL);
+ PJ_ASSERT_RETURN(asock->read_type == TYPE_NONE, PJ_EINVALIDOP);
+
+ asock->read_op = (struct read_op*)
+ pj_pool_calloc(pool, asock->async_count,
+ sizeof(struct read_op));
+ asock->read_type = TYPE_RECV_FROM;
+ asock->read_flags = flags;
+
+ for (i=0; i<asock->async_count; ++i) {
+ struct read_op *r = &asock->read_op[i];
+ pj_ssize_t size_to_read;
+
+ r->pkt = (pj_uint8_t*) readbuf[i];
+ r->max_size = size_to_read = buff_size;
+ r->src_addr_len = sizeof(r->src_addr);
+
+ status = pj_ioqueue_recvfrom(asock->key, &r->op_key, r->pkt,
+ &size_to_read,
+ PJ_IOQUEUE_ALWAYS_ASYNC | flags,
+ &r->src_addr, &r->src_addr_len);
+ PJ_ASSERT_RETURN(status != PJ_SUCCESS, PJ_EBUG);
+
+ if (status != PJ_EPENDING)
+ return status;
+ }
+
+ return PJ_SUCCESS;
+}
+
+
+static void ioqueue_on_read_complete(pj_ioqueue_key_t *key,
+ pj_ioqueue_op_key_t *op_key,
+ pj_ssize_t bytes_read)
+{
+ pj_activesock_t *asock;
+ struct read_op *r = (struct read_op*)op_key;
+ unsigned loop = 0;
+ pj_status_t status;
+
+ asock = (pj_activesock_t*) pj_ioqueue_get_user_data(key);
+
+ do {
+ unsigned flags;
+
+ if (bytes_read > 0) {
+ /*
+ * We've got new data.
+ */
+ pj_size_t remainder;
+ pj_bool_t ret;
+
+ /* Append this new data to existing data. If socket is stream
+ * oriented, user might have left some data in the buffer.
+ * Otherwise if socket is datagram there will be nothing in
+ * existing packet hence the packet will contain only the new
+ * packet.
+ */
+ r->size += bytes_read;
+
+ /* Set default remainder to zero */
+ remainder = 0;
+
+ /* And return value to TRUE */
+ ret = PJ_TRUE;
+
+ /* Notify callback */
+ if (asock->read_type == TYPE_RECV && asock->cb.on_data_read) {
+ ret = (*asock->cb.on_data_read)(asock, r->pkt, r->size,
+ PJ_SUCCESS, &remainder);
+ } else if (asock->read_type == TYPE_RECV_FROM &&
+ asock->cb.on_data_recvfrom)
+ {
+ ret = (*asock->cb.on_data_recvfrom)(asock, r->pkt, r->size,
+ &r->src_addr,
+ r->src_addr_len,
+ PJ_SUCCESS);
+ }
+
+ /* If callback returns false, we have been destroyed! */
+ if (!ret)
+ return;
+
+ /* Only stream oriented socket may leave data in the packet */
+ if (asock->stream_oriented) {
+ r->size = remainder;
+ } else {
+ r->size = 0;
+ }
+
+ } else if (bytes_read <= 0 &&
+ -bytes_read != PJ_STATUS_FROM_OS(OSERR_EWOULDBLOCK) &&
+ -bytes_read != PJ_STATUS_FROM_OS(OSERR_EINPROGRESS) &&
+ (asock->stream_oriented ||
+ -bytes_read != PJ_STATUS_FROM_OS(OSERR_ECONNRESET)))
+ {
+ pj_size_t remainder;
+ pj_bool_t ret;
+
+ if (bytes_read == 0) {
+ /* For stream/connection oriented socket, this means the
+ * connection has been closed. For datagram sockets, it means
+ * we've received datagram with zero length.
+ */
+ if (asock->stream_oriented)
+ status = PJ_EEOF;
+ else
+ status = PJ_SUCCESS;
+ } else {
+ /* This means we've got an error. If this is stream/connection
+ * oriented, it means connection has been closed. For datagram
+ * sockets, it means we've got some error (e.g. EWOULDBLOCK).
+ */
+ status = -bytes_read;
+ }
+
+ /* Set default remainder to zero */
+ remainder = 0;
+
+ /* And return value to TRUE */
+ ret = PJ_TRUE;
+
+ /* Notify callback */
+ if (asock->read_type == TYPE_RECV && asock->cb.on_data_read) {
+ /* For connection oriented socket, we still need to report
+ * the remainder data (if any) to the user to let user do
+ * processing with the remainder data before it closes the
+ * connection.
+ * If there is no remainder data, set the packet to NULL.
+ */
+
+ /* Shouldn't set the packet to NULL, as there may be active
+ * socket user, such as SSL socket, that needs to have access
+ * to the read buffer packet.
+ */
+ //ret = (*asock->cb.on_data_read)(asock, (r->size? r->pkt:NULL),
+ // r->size, status, &remainder);
+ ret = (*asock->cb.on_data_read)(asock, r->pkt, r->size,
+ status, &remainder);
+
+ } else if (asock->read_type == TYPE_RECV_FROM &&
+ asock->cb.on_data_recvfrom)
+ {
+ /* This would always be datagram oriented hence there's
+ * nothing in the packet. We can't be sure if there will be
+ * anything useful in the source_addr, so just put NULL
+ * there too.
+ */
+ /* In some scenarios, status may be PJ_SUCCESS. The upper
+ * layer application may not expect the callback to be called
+ * with successful status and NULL data, so lets not call the
+ * callback if the status is PJ_SUCCESS.
+ */
+ if (status != PJ_SUCCESS ) {
+ ret = (*asock->cb.on_data_recvfrom)(asock, NULL, 0,
+ NULL, 0, status);
+ }
+ }
+
+ /* If callback returns false, we have been destroyed! */
+ if (!ret)
+ return;
+
+ /* Only stream oriented socket may leave data in the packet */
+ if (asock->stream_oriented) {
+ r->size = remainder;
+ } else {
+ r->size = 0;
+ }
+ }
+
+ /* Read next data. We limit ourselves to processing max_loop immediate
+ * data, so when the loop counter has exceeded this value, force the
+ * read()/recvfrom() to return pending operation to allow the program
+ * to do other jobs.
+ */
+ bytes_read = r->max_size - r->size;
+ flags = asock->read_flags;
+ if (++loop >= asock->max_loop)
+ flags |= PJ_IOQUEUE_ALWAYS_ASYNC;
+
+ if (asock->read_type == TYPE_RECV) {
+ status = pj_ioqueue_recv(key, op_key, r->pkt + r->size,
+ &bytes_read, flags);
+ } else {
+ r->src_addr_len = sizeof(r->src_addr);
+ status = pj_ioqueue_recvfrom(key, op_key, r->pkt + r->size,
+ &bytes_read, flags,
+ &r->src_addr, &r->src_addr_len);
+ }
+
+ if (status == PJ_SUCCESS) {
+ /* Immediate data */
+ ;
+ } else if (status != PJ_EPENDING && status != PJ_ECANCELLED) {
+ /* Error */
+ bytes_read = -status;
+ } else {
+ break;
+ }
+ } while (1);
+
+}
+
+
+static pj_status_t send_remaining(pj_activesock_t *asock,
+ pj_ioqueue_op_key_t *send_key)
+{
+ struct send_data *sd = (struct send_data*)send_key->activesock_data;
+ pj_status_t status;
+
+ do {
+ pj_ssize_t size;
+
+ size = sd->len - sd->sent;
+ status = pj_ioqueue_send(asock->key, send_key,
+ sd->data+sd->sent, &size, sd->flags);
+ if (status != PJ_SUCCESS) {
+ /* Pending or error */
+ break;
+ }
+
+ sd->sent += size;
+ if (sd->sent == sd->len) {
+ /* The whole data has been sent. */
+ return PJ_SUCCESS;
+ }
+
+ } while (sd->sent < sd->len);
+
+ return status;
+}
+
+
+PJ_DEF(pj_status_t) pj_activesock_send( pj_activesock_t *asock,
+ pj_ioqueue_op_key_t *send_key,
+ const void *data,
+ pj_ssize_t *size,
+ unsigned flags)
+{
+ PJ_ASSERT_RETURN(asock && send_key && data && size, PJ_EINVAL);
+
+ send_key->activesock_data = NULL;
+
+ if (asock->whole_data) {
+ pj_ssize_t whole;
+ pj_status_t status;
+
+ whole = *size;
+
+ status = pj_ioqueue_send(asock->key, send_key, data, size, flags);
+ if (status != PJ_SUCCESS) {
+ /* Pending or error */
+ return status;
+ }
+
+ if (*size == whole) {
+ /* The whole data has been sent. */
+ return PJ_SUCCESS;
+ }
+
+ /* Data was partially sent */
+ asock->send_data.data = (pj_uint8_t*)data;
+ asock->send_data.len = whole;
+ asock->send_data.sent = *size;
+ asock->send_data.flags = flags;
+ send_key->activesock_data = &asock->send_data;
+
+ /* Try again */
+ status = send_remaining(asock, send_key);
+ if (status == PJ_SUCCESS) {
+ *size = whole;
+ }
+ return status;
+
+ } else {
+ return pj_ioqueue_send(asock->key, send_key, data, size, flags);
+ }
+}
+
+
+PJ_DEF(pj_status_t) pj_activesock_sendto( pj_activesock_t *asock,
+ pj_ioqueue_op_key_t *send_key,
+ const void *data,
+ pj_ssize_t *size,
+ unsigned flags,
+ const pj_sockaddr_t *addr,
+ int addr_len)
+{
+ PJ_ASSERT_RETURN(asock && send_key && data && size && addr && addr_len,
+ PJ_EINVAL);
+
+ return pj_ioqueue_sendto(asock->key, send_key, data, size, flags,
+ addr, addr_len);
+}
+
+
+static void ioqueue_on_write_complete(pj_ioqueue_key_t *key,
+ pj_ioqueue_op_key_t *op_key,
+ pj_ssize_t bytes_sent)
+{
+ pj_activesock_t *asock;
+
+ asock = (pj_activesock_t*) pj_ioqueue_get_user_data(key);
+
+ if (bytes_sent > 0 && op_key->activesock_data) {
+ /* whole_data is requested. Make sure we send all the data */
+ struct send_data *sd = (struct send_data*)op_key->activesock_data;
+
+ sd->sent += bytes_sent;
+ if (sd->sent == sd->len) {
+ /* all has been sent */
+ bytes_sent = sd->sent;
+ op_key->activesock_data = NULL;
+ } else {
+ /* send remaining data */
+ pj_status_t status;
+
+ status = send_remaining(asock, op_key);
+ if (status == PJ_EPENDING)
+ return;
+ else if (status == PJ_SUCCESS)
+ bytes_sent = sd->sent;
+ else
+ bytes_sent = -status;
+
+ op_key->activesock_data = NULL;
+ }
+ }
+
+ if (asock->cb.on_data_sent) {
+ pj_bool_t ret;
+
+ ret = (*asock->cb.on_data_sent)(asock, op_key, bytes_sent);
+
+ /* If callback returns false, we have been destroyed! */
+ if (!ret)
+ return;
+ }
+}
+
+#if PJ_HAS_TCP
+PJ_DEF(pj_status_t) pj_activesock_start_accept(pj_activesock_t *asock,
+ pj_pool_t *pool)
+{
+ unsigned i;
+
+ PJ_ASSERT_RETURN(asock, PJ_EINVAL);
+ PJ_ASSERT_RETURN(asock->accept_op==NULL, PJ_EINVALIDOP);
+
+ asock->accept_op = (struct accept_op*)
+ pj_pool_calloc(pool, asock->async_count,
+ sizeof(struct accept_op));
+ for (i=0; i<asock->async_count; ++i) {
+ struct accept_op *a = &asock->accept_op[i];
+ pj_status_t status;
+
+ do {
+ a->new_sock = PJ_INVALID_SOCKET;
+ a->rem_addr_len = sizeof(a->rem_addr);
+
+ status = pj_ioqueue_accept(asock->key, &a->op_key, &a->new_sock,
+ NULL, &a->rem_addr, &a->rem_addr_len);
+ if (status == PJ_SUCCESS) {
+ /* We've got immediate connection. Not sure if it's a good
+ * idea to call the callback now (probably application will
+ * not be prepared to process it), so lets just silently
+ * close the socket.
+ */
+ pj_sock_close(a->new_sock);
+ }
+ } while (status == PJ_SUCCESS);
+
+ if (status != PJ_EPENDING) {
+ return status;
+ }
+ }
+
+ return PJ_SUCCESS;
+}
+
+
+static void ioqueue_on_accept_complete(pj_ioqueue_key_t *key,
+ pj_ioqueue_op_key_t *op_key,
+ pj_sock_t new_sock,
+ pj_status_t status)
+{
+ pj_activesock_t *asock = (pj_activesock_t*) pj_ioqueue_get_user_data(key);
+ struct accept_op *accept_op = (struct accept_op*) op_key;
+
+ PJ_UNUSED_ARG(new_sock);
+
+ do {
+ if (status == asock->last_err && status != PJ_SUCCESS) {
+ asock->err_counter++;
+ if (asock->err_counter >= PJ_ACTIVESOCK_MAX_CONSECUTIVE_ACCEPT_ERROR) {
+ PJ_LOG(3, ("", "Received %d consecutive errors: %d for the accept()"
+ " operation, stopping further ioqueue accepts.",
+ asock->err_counter, asock->last_err));
+ return;
+ }
+ } else {
+ asock->err_counter = 0;
+ asock->last_err = status;
+ }
+
+ if (status==PJ_SUCCESS && asock->cb.on_accept_complete) {
+ pj_bool_t ret;
+
+ /* Notify callback */
+ ret = (*asock->cb.on_accept_complete)(asock, accept_op->new_sock,
+ &accept_op->rem_addr,
+ accept_op->rem_addr_len);
+
+ /* If callback returns false, we have been destroyed! */
+ if (!ret)
+ return;
+
+#if defined(PJ_IPHONE_OS_HAS_MULTITASKING_SUPPORT) && \
+ PJ_IPHONE_OS_HAS_MULTITASKING_SUPPORT!=0
+ activesock_create_iphone_os_stream(asock);
+#endif
+ } else if (status==PJ_SUCCESS) {
+ /* Application doesn't handle the new socket, we need to
+ * close it to avoid resource leak.
+ */
+ pj_sock_close(accept_op->new_sock);
+ }
+
+ /* Prepare next accept() */
+ accept_op->new_sock = PJ_INVALID_SOCKET;
+ accept_op->rem_addr_len = sizeof(accept_op->rem_addr);
+
+ status = pj_ioqueue_accept(asock->key, op_key, &accept_op->new_sock,
+ NULL, &accept_op->rem_addr,
+ &accept_op->rem_addr_len);
+
+ } while (status != PJ_EPENDING && status != PJ_ECANCELLED);
+}
+
+
+PJ_DEF(pj_status_t) pj_activesock_start_connect( pj_activesock_t *asock,
+ pj_pool_t *pool,
+ const pj_sockaddr_t *remaddr,
+ int addr_len)
+{
+ PJ_UNUSED_ARG(pool);
+ return pj_ioqueue_connect(asock->key, remaddr, addr_len);
+}
+
+static void ioqueue_on_connect_complete(pj_ioqueue_key_t *key,
+ pj_status_t status)
+{
+ pj_activesock_t *asock = (pj_activesock_t*) pj_ioqueue_get_user_data(key);
+
+ if (asock->cb.on_connect_complete) {
+ pj_bool_t ret;
+
+ ret = (*asock->cb.on_connect_complete)(asock, status);
+
+ if (!ret) {
+ /* We've been destroyed */
+ return;
+ }
+
+#if defined(PJ_IPHONE_OS_HAS_MULTITASKING_SUPPORT) && \
+ PJ_IPHONE_OS_HAS_MULTITASKING_SUPPORT!=0
+ activesock_create_iphone_os_stream(asock);
+#endif
+
+ }
+}
+#endif /* PJ_HAS_TCP */
+