diff options
author | Benny Prijono <bennylp@teluu.com> | 2008-05-11 18:12:16 +0000 |
---|---|---|
committer | Benny Prijono <bennylp@teluu.com> | 2008-05-11 18:12:16 +0000 |
commit | 0ee6805a9757610af92f19a88d142276586a41ef (patch) | |
tree | 16071cefcbd45c40e65f78a20d4cb7a93b7cff98 /pjlib/src | |
parent | 15eb39a3eaec0bd8260e6cce261c197110b6a916 (diff) |
Implement ticket #531: active socket abstraction to make ioqueue programming easier
git-svn-id: http://svn.pjsip.org/repos/pjproject/trunk@1953 74dad513-b988-da41-8d7b-12977e46ad98
Diffstat (limited to 'pjlib/src')
-rw-r--r-- | pjlib/src/pj/activesock.c | 594 | ||||
-rw-r--r-- | pjlib/src/pjlib-test/activesock.c | 268 | ||||
-rw-r--r-- | pjlib/src/pjlib-test/test.c | 4 | ||||
-rw-r--r-- | pjlib/src/pjlib-test/test.h | 2 |
4 files changed, 868 insertions, 0 deletions
diff --git a/pjlib/src/pj/activesock.c b/pjlib/src/pj/activesock.c new file mode 100644 index 00000000..10c47abe --- /dev/null +++ b/pjlib/src/pj/activesock.c @@ -0,0 +1,594 @@ +/* $Id$ */ +/* + * Copyright (C)2003-2007 Benny Prijono <benny@prijono.org> + * + * This program is free software; you can redistribute it and/or modify + * it under the terms of the GNU General Public License as published by + * the Free Software Foundation; either version 2 of the License, or + * (at your option) any later version. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU General Public License for more details. + * + * You should have received a copy of the GNU General Public License + * along with this program; if not, write to the Free Software + * Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA + */ +#include <pj/activesock.h> +#include <pj/assert.h> +#include <pj/errno.h> +#include <pj/pool.h> +#include <pj/sock.h> +#include <pj/string.h> + +#define PJ_ACTIVESOCK_MAX_LOOP 50 + + +enum read_type +{ + TYPE_NONE, + TYPE_RECV, + TYPE_RECV_FROM +}; + +struct read_op +{ + pj_ioqueue_op_key_t op_key; + pj_uint8_t *pkt; + unsigned max_size; + pj_size_t size; + pj_sockaddr src_addr; + int src_addr_len; +}; + +struct accept_op +{ + pj_ioqueue_op_key_t op_key; + pj_sock_t new_sock; + pj_sockaddr rem_addr; + int rem_addr_len; +}; + +struct pj_activesock_t +{ + pj_ioqueue_key_t *key; + pj_bool_t stream_oriented; + pj_ioqueue_t *ioqueue; + void *user_data; + unsigned async_count; + unsigned max_loop; + pj_activesock_cb cb; + + struct read_op *read_op; + pj_uint32_t read_flags; + enum read_type read_type; + + struct accept_op *accept_op; +}; + + +static void ioqueue_on_read_complete(pj_ioqueue_key_t *key, + pj_ioqueue_op_key_t *op_key, + pj_ssize_t bytes_read); +static void ioqueue_on_write_complete(pj_ioqueue_key_t *key, + pj_ioqueue_op_key_t *op_key, + pj_ssize_t bytes_sent); +static void ioqueue_on_accept_complete(pj_ioqueue_key_t *key, + pj_ioqueue_op_key_t *op_key, + pj_sock_t sock, + pj_status_t status); +static void ioqueue_on_connect_complete(pj_ioqueue_key_t *key, + pj_status_t status); + + +PJ_DEF(void) pj_activesock_cfg_default(pj_activesock_cfg *cfg) +{ + pj_bzero(cfg, sizeof(*cfg)); + cfg->async_cnt = 1; + cfg->concurrency = -1; +} + + +PJ_DEF(pj_status_t) pj_activesock_create( pj_pool_t *pool, + pj_sock_t sock, + int sock_type, + const pj_activesock_cfg *opt, + pj_ioqueue_t *ioqueue, + const pj_activesock_cb *cb, + pj_activesock_t **p_asock) +{ + pj_activesock_t *asock; + pj_ioqueue_callback ioq_cb; + pj_status_t status; + + PJ_ASSERT_RETURN(pool && ioqueue && cb && p_asock, PJ_EINVAL); + PJ_ASSERT_RETURN(sock!=0 && sock!=PJ_INVALID_SOCKET, PJ_EINVAL); + PJ_ASSERT_RETURN(sock_type==pj_SOCK_STREAM() || + sock_type==pj_SOCK_DGRAM(), PJ_EINVAL); + PJ_ASSERT_RETURN(!opt || opt->async_cnt >= 1, PJ_EINVAL); + + asock = PJ_POOL_ZALLOC_T(pool, pj_activesock_t); + asock->ioqueue = ioqueue; + asock->stream_oriented = (sock_type == pj_SOCK_STREAM()); + asock->async_count = (opt? opt->async_cnt : 1); + asock->max_loop = PJ_ACTIVESOCK_MAX_LOOP; + pj_memcpy(&asock->cb, cb, sizeof(*cb)); + + pj_bzero(&ioq_cb, sizeof(ioq_cb)); + ioq_cb.on_read_complete = &ioqueue_on_read_complete; + ioq_cb.on_write_complete = &ioqueue_on_write_complete; + ioq_cb.on_connect_complete = &ioqueue_on_connect_complete; + ioq_cb.on_accept_complete = &ioqueue_on_accept_complete; + + status = pj_ioqueue_register_sock(pool, ioqueue, sock, asock, + &ioq_cb, &asock->key); + if (status != PJ_SUCCESS) { + pj_activesock_close(asock); + return status; + } + + if (opt && opt->concurrency >= 0) { + pj_ioqueue_set_concurrency(asock->key, opt->concurrency); + } + + *p_asock = asock; + return PJ_SUCCESS; +} + + +PJ_DEF(pj_status_t) pj_activesock_create_udp( pj_pool_t *pool, + const pj_sockaddr *addr, + const pj_activesock_cfg *opt, + pj_ioqueue_t *ioqueue, + const pj_activesock_cb *cb, + pj_activesock_t **p_asock, + pj_sockaddr *bound_addr) +{ + pj_sock_t sock_fd; + pj_sockaddr default_addr; + pj_status_t status; + + if (addr == NULL) { + pj_sockaddr_init(pj_AF_INET(), &default_addr, NULL, 0); + addr = &default_addr; + } + + status = pj_sock_socket(addr->addr.sa_family, pj_SOCK_DGRAM(), 0, + &sock_fd); + if (status != PJ_SUCCESS) { + return status; + } + + status = pj_sock_bind(sock_fd, addr, pj_sockaddr_get_len(addr)); + if (status != PJ_SUCCESS) { + pj_sock_close(sock_fd); + return status; + } + + status = pj_activesock_create(pool, sock_fd, pj_SOCK_DGRAM(), opt, + ioqueue, cb, p_asock); + if (status != PJ_SUCCESS) { + pj_sock_close(sock_fd); + return status; + } + + if (bound_addr) { + int addr_len = sizeof(*bound_addr); + status = pj_sock_getsockname(sock_fd, bound_addr, &addr_len); + if (status != PJ_SUCCESS) { + pj_activesock_close(*p_asock); + return status; + } + } + + return PJ_SUCCESS; +} + + +PJ_DEF(pj_status_t) pj_activesock_close(pj_activesock_t *asock) +{ + PJ_ASSERT_RETURN(asock, PJ_EINVAL); + if (asock->key) { + pj_ioqueue_unregister(asock->key); + asock->key = NULL; + } + return PJ_SUCCESS; +} + + +PJ_DEF(pj_status_t) pj_activesock_set_user_data( pj_activesock_t *asock, + void *user_data) +{ + PJ_ASSERT_RETURN(asock, PJ_EINVAL); + asock->user_data = user_data; + return PJ_SUCCESS; +} + + +PJ_DEF(void*) pj_activesock_get_user_data(pj_activesock_t *asock) +{ + PJ_ASSERT_RETURN(asock, NULL); + return asock->user_data; +} + + +PJ_DEF(pj_status_t) pj_activesock_start_read(pj_activesock_t *asock, + pj_pool_t *pool, + unsigned buff_size, + pj_uint32_t flags) +{ + unsigned i; + pj_status_t status; + + PJ_ASSERT_RETURN(asock && pool && buff_size, PJ_EINVAL); + PJ_ASSERT_RETURN(asock->read_type == TYPE_NONE, PJ_EINVALIDOP); + PJ_ASSERT_RETURN(asock->read_op == NULL, PJ_EINVALIDOP); + + asock->read_op = (struct read_op*) + pj_pool_calloc(pool, asock->async_count, + sizeof(struct read_op)); + asock->read_type = TYPE_RECV; + asock->read_flags = flags; + + for (i=0; i<asock->async_count; ++i) { + struct read_op *r = &asock->read_op[i]; + pj_ssize_t size_to_read; + + r->pkt = pj_pool_alloc(pool, buff_size); + r->max_size = size_to_read = buff_size; + + status = pj_ioqueue_recv(asock->key, &r->op_key, r->pkt, &size_to_read, + PJ_IOQUEUE_ALWAYS_ASYNC | flags); + PJ_ASSERT_RETURN(status != PJ_SUCCESS, PJ_EBUG); + + if (status != PJ_EPENDING) + return status; + } + + return PJ_SUCCESS; +} + + +PJ_DEF(pj_status_t) pj_activesock_start_recvfrom(pj_activesock_t *asock, + pj_pool_t *pool, + unsigned buff_size, + pj_uint32_t flags) +{ + unsigned i; + pj_status_t status; + + PJ_ASSERT_RETURN(asock && pool && buff_size, PJ_EINVAL); + PJ_ASSERT_RETURN(asock->read_type == TYPE_NONE, PJ_EINVALIDOP); + + asock->read_op = (struct read_op*) + pj_pool_calloc(pool, asock->async_count, + sizeof(struct read_op)); + asock->read_type = TYPE_RECV_FROM; + asock->read_flags = flags; + + for (i=0; i<asock->async_count; ++i) { + struct read_op *r = &asock->read_op[i]; + pj_ssize_t size_to_read; + + r->pkt = pj_pool_alloc(pool, buff_size); + r->max_size = size_to_read = buff_size; + r->src_addr_len = sizeof(r->src_addr); + + status = pj_ioqueue_recvfrom(asock->key, &r->op_key, r->pkt, + &size_to_read, + PJ_IOQUEUE_ALWAYS_ASYNC | flags, + &r->src_addr, &r->src_addr_len); + PJ_ASSERT_RETURN(status != PJ_SUCCESS, PJ_EBUG); + + if (status != PJ_EPENDING) + return status; + } + + return PJ_SUCCESS; +} + + +static void ioqueue_on_read_complete(pj_ioqueue_key_t *key, + pj_ioqueue_op_key_t *op_key, + pj_ssize_t bytes_read) +{ + pj_activesock_t *asock; + struct read_op *r = (struct read_op*)op_key; + unsigned loop = 0; + pj_status_t status; + + asock = (pj_activesock_t*) pj_ioqueue_get_user_data(key); + + do { + unsigned flags; + + if (bytes_read > 0) { + /* + * We've got new data. + */ + pj_size_t remainder; + pj_bool_t ret; + + /* Append this new data to existing data. If socket is stream + * oriented, user might have left some data in the buffer. + * Otherwise if socket is datagram there will be nothing in + * existing packet hence the packet will contain only the new + * packet. + */ + r->size += bytes_read; + + /* Set default remainder to zero */ + remainder = 0; + + /* And return value to TRUE */ + ret = PJ_TRUE; + + /* Notify callback */ + if (asock->read_type == TYPE_RECV && asock->cb.on_data_read) { + ret = (*asock->cb.on_data_read)(asock, r->pkt, r->size, + PJ_SUCCESS, &remainder); + } else if (asock->read_type == TYPE_RECV_FROM && + asock->cb.on_data_recvfrom) + { + ret = (*asock->cb.on_data_recvfrom)(asock, r->pkt, r->size, + &r->src_addr, + r->src_addr_len, + PJ_SUCCESS); + } + + /* If callback returns false, we have been destroyed! */ + if (!ret) + return; + + /* Only stream oriented socket may leave data in the packet */ + if (asock->stream_oriented) { + r->size = remainder; + } else { + r->size = 0; + } + + } else if (bytes_read <= 0) { + + pj_size_t remainder; + pj_bool_t ret; + + if (bytes_read == 0) { + /* For stream/connection oriented socket, this means the + * connection has been closed. For datagram sockets, it means + * we've received datagram with zero length. + */ + if (asock->stream_oriented) + status = PJ_EEOF; + else + status = PJ_SUCCESS; + } else { + /* This means we've got an error. If this is stream/connection + * oriented, it means connection has been closed. For datagram + * sockets, it means we've got some error (e.g. EWOULDBLOCK). + */ + status = -bytes_read; + } + + /* Set default remainder to zero */ + remainder = 0; + + /* And return value to TRUE */ + ret = PJ_TRUE; + + /* Notify callback */ + if (asock->read_type == TYPE_RECV && asock->cb.on_data_read) { + /* For connection oriented socket, we still need to report + * the remainder data (if any) to the user to let user do + * processing with the remainder data before it closes the + * connection. + * If there is no remainder data, set the packet to NULL. + */ + ret = (*asock->cb.on_data_read)(asock, (r->size? r->pkt:NULL), + r->size, status, &remainder); + + } else if (asock->read_type == TYPE_RECV_FROM && + asock->cb.on_data_recvfrom) + { + /* This would always be datagram oriented hence there's + * nothing in the packet. We can't be sure if there will be + * anything useful in the source_addr, so just put NULL + * there too. + */ + ret = (*asock->cb.on_data_recvfrom)(asock, NULL, 0, + NULL, 0, status); + } + + /* If callback returns false, we have been destroyed! */ + if (!ret) + return; + + /* Only stream oriented socket may leave data in the packet */ + if (asock->stream_oriented) { + r->size = remainder; + } else { + r->size = 0; + } + } + + /* Read next data. We limit ourselves to processing max_loop immediate + * data, so when the loop counter has exceeded this value, force the + * read()/recvfrom() to return pending operation to allow the program + * to do other jobs. + */ + bytes_read = r->max_size - r->size; + flags = asock->read_flags; + if (++loop >= asock->max_loop) + flags |= PJ_IOQUEUE_ALWAYS_ASYNC; + + if (asock->read_type == TYPE_RECV) { + status = pj_ioqueue_recv(key, op_key, r->pkt + r->size, + &bytes_read, flags); + } else { + r->src_addr_len = sizeof(r->src_addr); + status = pj_ioqueue_recvfrom(key, op_key, r->pkt + r->size, + &bytes_read, flags, + &r->src_addr, &r->src_addr_len); + } + + } while (status != PJ_EPENDING && status != PJ_ECANCELLED); + +} + + +PJ_DEF(pj_status_t) pj_activesock_send( pj_activesock_t *asock, + pj_ioqueue_op_key_t *send_key, + const void *data, + pj_ssize_t *size, + unsigned flags) +{ + PJ_ASSERT_RETURN(asock && send_key && data && size, PJ_EINVAL); + + return pj_ioqueue_send(asock->key, send_key, data, size, flags); +} + + +PJ_DEF(pj_status_t) pj_activesock_sendto( pj_activesock_t *asock, + pj_ioqueue_op_key_t *send_key, + const void *data, + pj_ssize_t *size, + unsigned flags, + const pj_sockaddr_t *addr, + int addr_len) +{ + PJ_ASSERT_RETURN(asock && send_key && data && size && addr && addr_len, + PJ_EINVAL); + + return pj_ioqueue_sendto(asock->key, send_key, data, size, flags, + addr, addr_len); +} + + +static void ioqueue_on_write_complete(pj_ioqueue_key_t *key, + pj_ioqueue_op_key_t *op_key, + pj_ssize_t bytes_sent) +{ + pj_activesock_t *asock; + + asock = (pj_activesock_t*) pj_ioqueue_get_user_data(key); + + if (asock->cb.on_data_sent) { + pj_bool_t ret; + + ret = (*asock->cb.on_data_sent)(asock, op_key, bytes_sent); + + /* If callback returns false, we have been destroyed! */ + if (!ret) + return; + } +} + + +PJ_DEF(pj_status_t) pj_activesock_start_accept(pj_activesock_t *asock, + pj_pool_t *pool) +{ + unsigned i; + + PJ_ASSERT_RETURN(asock, PJ_EINVAL); + PJ_ASSERT_RETURN(asock->accept_op==NULL, PJ_EINVALIDOP); + + asock->accept_op = (struct accept_op*) + pj_pool_calloc(pool, asock->async_count, + sizeof(struct accept_op)); + for (i=0; i<asock->async_count; ++i) { + struct accept_op *a = &asock->accept_op[i]; + pj_status_t status; + + do { + a->new_sock = PJ_INVALID_SOCKET; + a->rem_addr_len = sizeof(a->rem_addr); + + status = pj_ioqueue_accept(asock->key, &a->op_key, &a->new_sock, + NULL, &a->rem_addr, &a->rem_addr_len); + if (status == PJ_SUCCESS) { + /* We've got immediate connection. Not sure if it's a good + * idea to call the callback now (probably application will + * not be prepared to process it), so lets just silently + * close the socket. + */ + pj_sock_close(a->new_sock); + } + } while (status == PJ_SUCCESS); + + if (status != PJ_EPENDING) { + return status; + } + } + + return PJ_SUCCESS; +} + + +static void ioqueue_on_accept_complete(pj_ioqueue_key_t *key, + pj_ioqueue_op_key_t *op_key, + pj_sock_t new_sock, + pj_status_t status) +{ + pj_activesock_t *asock = (pj_activesock_t*) pj_ioqueue_get_user_data(key); + struct accept_op *accept_op = (struct accept_op*) op_key; + + do { + if (status==PJ_SUCCESS && asock->cb.on_accept_complete) { + pj_bool_t ret; + + /* Notify callback */ + ret = (*asock->cb.on_accept_complete)(asock, new_sock, + &accept_op->rem_addr, + accept_op->rem_addr_len); + + /* If callback returns false, we have been destroyed! */ + if (!ret) + return; + + } else if (status==PJ_SUCCESS) { + /* Application doesn't handle the new socket, we need to + * close it to avoid resource leak. + */ + pj_sock_close(accept_op->new_sock); + } + + /* Prepare next accept() */ + accept_op->new_sock = PJ_INVALID_SOCKET; + accept_op->rem_addr_len = sizeof(accept_op->rem_addr); + + status = pj_ioqueue_accept(asock->key, op_key, &accept_op->new_sock, + NULL, &accept_op->rem_addr, + &accept_op->rem_addr_len); + + } while (status != PJ_EPENDING && status != PJ_ECANCELLED); +} + + +PJ_DEF(pj_status_t) pj_activesock_start_connect( pj_activesock_t *asock, + pj_pool_t *pool, + const pj_sockaddr_t *remaddr, + int addr_len) +{ + PJ_UNUSED_ARG(pool); + return pj_ioqueue_connect(asock->key, remaddr, addr_len); +} + + +static void ioqueue_on_connect_complete(pj_ioqueue_key_t *key, + pj_status_t status) +{ + pj_activesock_t *asock = (pj_activesock_t*) pj_ioqueue_get_user_data(key); + + if (asock->cb.on_connect_complete) { + pj_bool_t ret; + + ret = (*asock->cb.on_connect_complete)(asock, status); + + if (!ret) { + /* We've been destroyed */ + return; + } + } +} + diff --git a/pjlib/src/pjlib-test/activesock.c b/pjlib/src/pjlib-test/activesock.c new file mode 100644 index 00000000..0e38eea9 --- /dev/null +++ b/pjlib/src/pjlib-test/activesock.c @@ -0,0 +1,268 @@ +/* $Id$ */ +/* + * Copyright (C)2003-2007 Benny Prijono <benny@prijono.org> + * + * This program is free software; you can redistribute it and/or modify + * it under the terms of the GNU General Public License as published by + * the Free Software Foundation; either version 2 of the License, or + * (at your option) any later version. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU General Public License for more details. + * + * You should have received a copy of the GNU General Public License + * along with this program; if not, write to the Free Software + * Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA + */ +#include "test.h" +#include <pjlib.h> + +/** + * \page page_pjlib_activesock_test Test: Active Socket + * + * This file is <b>pjlib-test/activesock.c</b> + * + * \include pjlib-test/activesock.c + */ + +#if INCLUDE_ACTIVESOCK_TEST + +#define THIS_FILE "activesock.c" + + +/******************************************************************* + * Simple UDP echo server. + */ +struct udp_echo_srv +{ + pj_activesock_t *asock; + pj_bool_t echo_enabled; + pj_uint16_t port; + pj_ioqueue_op_key_t send_key; + pj_status_t status; + unsigned rx_cnt; + unsigned rx_err_cnt, tx_err_cnt; +}; + +static void udp_echo_err(const char *title, pj_status_t status) +{ + char errmsg[PJ_ERR_MSG_SIZE]; + + pj_strerror(status, errmsg, sizeof(errmsg)); + PJ_LOG(3,(THIS_FILE, " error: %s: %s", title, errmsg)); +} + +static pj_bool_t udp_echo_srv_on_data_recvfrom(pj_activesock_t *asock, + void *data, + pj_size_t size, + const pj_sockaddr_t *src_addr, + int addr_len, + pj_status_t status) +{ + struct udp_echo_srv *srv; + pj_ssize_t sent; + + + srv = (struct udp_echo_srv*) pj_activesock_get_user_data(asock); + + if (status != PJ_SUCCESS) { + srv->status = status; + srv->rx_err_cnt++; + udp_echo_err("recvfrom() callback", status); + return PJ_TRUE; + } + + srv->rx_cnt++; + + /* Send back if echo is enabled */ + if (srv->echo_enabled) { + sent = size; + srv->status = pj_activesock_sendto(asock, &srv->send_key, data, + &sent, 0, + src_addr, addr_len); + if (srv->status != PJ_SUCCESS) { + srv->tx_err_cnt++; + udp_echo_err("sendto()", status); + } + } + + return PJ_TRUE; +} + + +static pj_status_t udp_echo_srv_create(pj_pool_t *pool, + pj_ioqueue_t *ioqueue, + pj_bool_t enable_echo, + struct udp_echo_srv **p_srv) +{ + struct udp_echo_srv *srv; + pj_sock_t sock_fd = PJ_INVALID_SOCKET; + pj_sockaddr addr; + int addr_len; + pj_activesock_cb activesock_cb; + pj_status_t status; + + srv = PJ_POOL_ZALLOC_T(pool, struct udp_echo_srv); + srv->echo_enabled = enable_echo; + + pj_sockaddr_in_init(&addr.ipv4, NULL, 0); + addr_len = sizeof(addr); + + pj_bzero(&activesock_cb, sizeof(activesock_cb)); + activesock_cb.on_data_recvfrom = &udp_echo_srv_on_data_recvfrom; + + status = pj_activesock_create_udp(pool, &addr, NULL, ioqueue, &activesock_cb, + &srv->asock, &addr); + if (status != PJ_SUCCESS) { + pj_sock_close(sock_fd); + udp_echo_err("pj_activesock_create()", status); + return status; + } + + srv->port = pj_ntohs(addr.ipv4.sin_port); + + pj_activesock_set_user_data(srv->asock, srv); + pj_ioqueue_op_key_init(&srv->send_key, sizeof(srv->send_key)); + + status = pj_activesock_start_recvfrom(srv->asock, pool, 32, 0); + if (status != PJ_SUCCESS) { + pj_activesock_close(srv->asock); + udp_echo_err("pj_activesock_start_recvfrom()", status); + return status; + } + + + *p_srv = srv; + return PJ_SUCCESS; +} + +static void udp_echo_srv_destroy(struct udp_echo_srv *srv) +{ + pj_activesock_close(srv->asock); +} + +/******************************************************************* + * UDP ping pong test (send packet back and forth between two UDP echo + * servers. + */ +static int udp_ping_pong_test(void) +{ + pj_ioqueue_t *ioqueue = NULL; + pj_pool_t *pool = NULL; + struct udp_echo_srv *srv1=NULL, *srv2=NULL; + pj_bool_t need_send = PJ_TRUE; + unsigned data = 0; + int count, ret; + pj_status_t status; + + pool = pj_pool_create(mem, "pingpong", 512, 512, NULL); + if (!pool) + return -10; + + status = pj_ioqueue_create(pool, 4, &ioqueue); + if (status != PJ_SUCCESS) { + ret = -20; + udp_echo_err("pj_ioqueue_create()", status); + goto on_return; + } + + status = udp_echo_srv_create(pool, ioqueue, PJ_TRUE, &srv1); + if (status != PJ_SUCCESS) { + ret = -30; + goto on_return; + } + + status = udp_echo_srv_create(pool, ioqueue, PJ_TRUE, &srv2); + if (status != PJ_SUCCESS) { + ret = -40; + goto on_return; + } + + /* initiate the first send */ + for (count=0; count<1000; ++count) { + unsigned last_rx1, last_rx2; + unsigned i; + + if (need_send) { + pj_str_t loopback; + pj_sockaddr_in addr; + pj_ssize_t sent; + + ++data; + + sent = sizeof(data); + loopback = pj_str("127.0.0.1"); + pj_sockaddr_in_init(&addr, &loopback, srv2->port); + status = pj_activesock_sendto(srv1->asock, &srv1->send_key, + &data, &sent, 0, + &addr, sizeof(addr)); + if (status != PJ_SUCCESS && status != PJ_EPENDING) { + ret = -50; + udp_echo_err("sendto()", status); + goto on_return; + } + + need_send = PJ_FALSE; + } + + last_rx1 = srv1->rx_cnt; + last_rx2 = srv2->rx_cnt; + + for (i=0; i<10 && last_rx1 == srv1->rx_cnt && last_rx2 == srv2->rx_cnt; ++i) { + pj_time_val delay = {0, 10}; + pj_ioqueue_poll(ioqueue, &delay); + } + + if (srv1->rx_err_cnt+srv1->tx_err_cnt != 0 || + srv2->rx_err_cnt+srv2->tx_err_cnt != 0) + { + /* Got error */ + ret = -60; + goto on_return; + } + + if (last_rx1 == srv1->rx_cnt && last_rx2 == srv2->rx_cnt) { + /* Packet lost */ + ret = -70; + udp_echo_err("packets have been lost", PJ_ETIMEDOUT); + goto on_return; + } + } + + ret = 0; + +on_return: + if (srv2) + udp_echo_srv_destroy(srv2); + if (srv1) + udp_echo_srv_destroy(srv1); + if (ioqueue) + pj_ioqueue_destroy(ioqueue); + if (pool) + pj_pool_release(pool); + + return ret; +} + + +int activesock_test(void) +{ + int ret; + + PJ_LOG(3,("", "..udp ping/pong test")); + ret = udp_ping_pong_test(); + if (ret != 0) + return ret; + + return 0; +} + +#else /* INCLUDE_ACTIVESOCK_TEST */ +/* To prevent warning about "translation unit is empty" + * when this test is disabled. + */ +int dummy_active_sock_test; +#endif /* INCLUDE_ACTIVESOCK_TEST */ + diff --git a/pjlib/src/pjlib-test/test.c b/pjlib/src/pjlib-test/test.c index 0202a3da..949cf243 100644 --- a/pjlib/src/pjlib-test/test.c +++ b/pjlib/src/pjlib-test/test.c @@ -154,6 +154,10 @@ int test_inner(void) DO_TEST( udp_ioqueue_unreg_test() ); #endif +#if INCLUDE_ACTIVESOCK_TEST + DO_TEST( activesock_test() ); +#endif + #if INCLUDE_FILE_TEST DO_TEST( file_test() ); #endif diff --git a/pjlib/src/pjlib-test/test.h b/pjlib/src/pjlib-test/test.h index 327fb0ee..89afc8fe 100644 --- a/pjlib/src/pjlib-test/test.h +++ b/pjlib/src/pjlib-test/test.h @@ -51,6 +51,7 @@ #define INCLUDE_SELECT_TEST GROUP_NETWORK #define INCLUDE_UDP_IOQUEUE_TEST GROUP_NETWORK #define INCLUDE_TCP_IOQUEUE_TEST GROUP_NETWORK +#define INCLUDE_ACTIVESOCK_TEST GROUP_NETWORK #define INCLUDE_IOQUEUE_PERF_TEST (PJ_HAS_THREADS && GROUP_NETWORK) #define INCLUDE_IOQUEUE_UNREG_TEST (PJ_HAS_THREADS && GROUP_NETWORK) #define INCLUDE_FILE_TEST GROUP_FILE @@ -90,6 +91,7 @@ extern int udp_ioqueue_test(void); extern int udp_ioqueue_unreg_test(void); extern int tcp_ioqueue_test(void); extern int ioqueue_perf_test(void); +extern int activesock_test(void); extern int file_test(void); extern int echo_server(void); |