summaryrefslogtreecommitdiff
path: root/pjsip/src/pjsip/sip_transport.c
diff options
context:
space:
mode:
Diffstat (limited to 'pjsip/src/pjsip/sip_transport.c')
-rw-r--r--pjsip/src/pjsip/sip_transport.c679
1 files changed, 368 insertions, 311 deletions
diff --git a/pjsip/src/pjsip/sip_transport.c b/pjsip/src/pjsip/sip_transport.c
index 65b8ec45..09eb6cfb 100644
--- a/pjsip/src/pjsip/sip_transport.c
+++ b/pjsip/src/pjsip/sip_transport.c
@@ -1,17 +1,18 @@
/* $Id$
- *
*/
#include <pjsip/sip_transport.h>
#include <pjsip/sip_endpoint.h>
#include <pjsip/sip_parser.h>
#include <pjsip/sip_msg.h>
#include <pjsip/sip_private.h>
+#include <pjsip/sip_errno.h>
#include <pj/os.h>
#include <pj/log.h>
#include <pj/ioqueue.h>
#include <pj/hash.h>
#include <pj/string.h>
#include <pj/pool.h>
+#include <pj/assert.h>
#define MGR_IDLE_CHECK_INTERVAL 30
#define MGR_HASH_TABLE_SIZE PJSIP_MAX_DIALOG_COUNT
@@ -44,7 +45,7 @@ struct pjsip_transport_t
/** Standard list members, for chaining the transport in the
* listener list.
*/
- PJ_DECL_LIST_MEMBER(struct pjsip_transport_t)
+ PJ_DECL_LIST_MEMBER(struct pjsip_transport_t);
/** Transport's pool. */
pj_pool_t *pool;
@@ -66,10 +67,13 @@ struct pjsip_transport_t
/** I/O Queue key */
pj_ioqueue_key_t *key;
-
+
+ /** Accept key. */
+ pj_ioqueue_op_key_t accept_op;
+
/** Receive data buffer */
pjsip_rx_data *rdata;
-
+
/** Pointer to transport manager */
pjsip_transport_mgr *mgr;
@@ -114,7 +118,9 @@ struct pjsip_transport_mgr
pj_mutex_t *mutex;
pjsip_endpoint *endpt;
pj_ioqueue_t *ioqueue;
- pj_time_val next_idle_check;
+ pj_time_val next_idle_check;
+ pj_size_t send_buf_size;
+ pj_size_t recv_buf_size;
void (*message_callback)(pjsip_endpoint*, pjsip_rx_data *rdata);
};
@@ -144,7 +150,7 @@ typedef struct transport_key
*/
struct transport_callback
{
- PJ_DECL_LIST_MEMBER(struct transport_callback)
+ PJ_DECL_LIST_MEMBER(struct transport_callback);
/** User defined token to be passed to the callback. */
void *token;
@@ -173,10 +179,18 @@ const struct
#endif
};
-static void on_ioqueue_read(pj_ioqueue_key_t *key, pj_ssize_t bytes_read);
-static void on_ioqueue_write(pj_ioqueue_key_t *key, pj_ssize_t bytes_sent);
-static void on_ioqueue_accept(pj_ioqueue_key_t *key, int status);
-static void on_ioqueue_connect(pj_ioqueue_key_t *key, int status);
+static void on_ioqueue_read(pj_ioqueue_key_t *key,
+ pj_ioqueue_op_key_t *op_key,
+ pj_ssize_t bytes_read);
+static void on_ioqueue_write(pj_ioqueue_key_t *key,
+ pj_ioqueue_op_key_t *op_key,
+ pj_ssize_t bytes_sent);
+static void on_ioqueue_accept(pj_ioqueue_key_t *key,
+ pj_ioqueue_op_key_t *op_key,
+ pj_sock_t newsock,
+ int status);
+static void on_ioqueue_connect(pj_ioqueue_key_t *key,
+ int status);
static pj_ioqueue_callback ioqueue_transport_callback =
{
@@ -194,11 +208,11 @@ static void init_key_from_transport(transport_key *key,
key->type = (pj_uint8_t)tr->type;
key->zero = 0;
- key->addr = pj_sockaddr_get_addr(&tr->remote_addr);
- key->port = pj_sockaddr_get_port(&tr->remote_addr);
+ key->addr = pj_sockaddr_in_get_addr(&tr->remote_addr).s_addr;
+ key->port = pj_sockaddr_in_get_port(&tr->remote_addr);
/*
if (key->port == 0) {
- key->port = pj_sockaddr_get_port(&tr->local_addr);
+ key->port = pj_sockaddr_in_get_port(&tr->local_addr);
}
*/
}
@@ -212,15 +226,15 @@ static void init_tcp_key(transport_key *key, pjsip_transport_type_e type,
key->type = (pj_uint8_t)type;
key->zero = 0;
- key->addr = pj_sockaddr_get_addr(addr);
- key->port = pj_sockaddr_get_port(addr);
+ key->addr = pj_sockaddr_in_get_addr(addr).s_addr;
+ key->port = pj_sockaddr_in_get_port(addr);
}
#endif
static void init_udp_key(transport_key *key, pjsip_transport_type_e type,
const pj_sockaddr_in *addr)
{
- PJ_UNUSED_ARG(addr)
+ PJ_UNUSED_ARG(addr);
/* This is to detect alignment problems. */
pj_assert(sizeof(transport_key) == 8);
@@ -236,7 +250,7 @@ static void init_udp_key(transport_key *key, pjsip_transport_type_e type,
pj_str_t localaddr = pj_str("127.0.0.1");
pj_sockaddr_in addr;
pj_sockaddr_set_str_addr(&addr, &localaddr);
- key->addr = pj_sockaddr_get_addr(&addr);
+ key->addr = pj_sockaddr_in_get_addr(&addr);
}
#endif
}
@@ -297,31 +311,36 @@ pjsip_transport_get_type_from_name(const pj_str_t *name)
/*
* Create new transmit buffer.
*/
-pjsip_tx_data* pjsip_tx_data_create( pjsip_transport_mgr *mgr )
+pj_status_t pjsip_tx_data_create( pjsip_transport_mgr *mgr,
+ pjsip_tx_data **p_tdata )
{
pj_pool_t *pool;
pjsip_tx_data *tdata;
+ pj_status_t status;
PJ_LOG(5, ("", "pjsip_tx_data_create"));
+
+ PJ_ASSERT_RETURN(mgr && p_tdata, PJ_EINVAL);
pool = pjsip_endpt_create_pool( mgr->endpt, "ptdt%p",
PJSIP_POOL_LEN_TDATA,
PJSIP_POOL_INC_TDATA );
if (!pool) {
- return NULL;
+ return PJ_ENOMEM;
}
tdata = pj_pool_calloc(pool, 1, sizeof(pjsip_tx_data));
tdata->pool = pool;
tdata->mgr = mgr;
- sprintf(tdata->obj_name,"txd%p", tdata);
+ pj_sprintf(tdata->obj_name,"txd%p", tdata);
- tdata->ref_cnt = pj_atomic_create(tdata->pool, 0);
- if (!tdata->ref_cnt) {
+ status = pj_atomic_create(tdata->pool, 0, &tdata->ref_cnt);
+ if (status != PJ_SUCCESS) {
pjsip_endpt_destroy_pool( mgr->endpt, tdata->pool );
- return NULL;
+ return status;
}
-
- return tdata;
+
+ *p_tdata = tdata;
+ return PJ_SUCCESS;
}
/*
@@ -339,7 +358,7 @@ PJ_DEF(void) pjsip_tx_data_add_ref( pjsip_tx_data *tdata )
PJ_DEF(void) pjsip_tx_data_dec_ref( pjsip_tx_data *tdata )
{
pj_assert( pj_atomic_get(tdata->ref_cnt) > 0);
- if (pj_atomic_dec(tdata->ref_cnt) <= 0) {
+ if (pj_atomic_dec_and_get(tdata->ref_cnt) <= 0) {
PJ_LOG(6,(tdata->obj_name, "destroying txdata"));
pj_atomic_destroy( tdata->ref_cnt );
pjsip_endpt_destroy_pool( tdata->mgr->endpt, tdata->pool );
@@ -375,7 +394,7 @@ PJ_DEF(pjsip_transport_type_e) pjsip_get_transport_type_from_flag(unsigned flag)
return PJSIP_TRANSPORT_TCP;
} else
#else
- PJ_UNUSED_ARG(flag)
+ PJ_UNUSED_ARG(flag);
#endif
{
return PJSIP_TRANSPORT_UDP;
@@ -452,7 +471,7 @@ PJ_DEF(void) pjsip_transport_add_ref( pjsip_transport_t * tr )
PJ_DEF(void) pjsip_transport_dec_ref( pjsip_transport_t *tr )
{
pj_assert(tr->ref_cnt > 0);
- if (pj_atomic_dec(tr->ref_cnt) == 0) {
+ if (pj_atomic_dec_and_get(tr->ref_cnt) == 0) {
pj_gettimeofday(&tr->close_time);
tr->close_time.sec += PJSIP_TRANSPORT_CLOSE_TIMEOUT;
}
@@ -461,13 +480,15 @@ PJ_DEF(void) pjsip_transport_dec_ref( pjsip_transport_t *tr )
/*
* Open the underlying transport.
*/
-static pj_sock_t create_socket( pjsip_transport_type_e type,
- pj_sockaddr_in *local )
+static pj_status_t create_socket( pjsip_transport_type_e type,
+ pj_sockaddr_in *local,
+ pj_sock_t *p_sock)
{
int sock_family;
int sock_type;
int sock_proto;
- int len;
+ int len;
+ pj_status_t status;
pj_sock_t sock;
/* Set socket parameters */
@@ -483,30 +504,23 @@ static pj_sock_t create_socket( pjsip_transport_type_e type,
sock_proto = 0;
#endif
} else {
- PJ_LOG(2,("", "create_socket: unsupported transport type %s",
- get_type_name(type)));
- return PJ_INVALID_SOCKET;
+ return PJ_EINVAL;
}
/* Create socket. */
- sock = pj_sock_socket( sock_family, sock_type, sock_proto, PJ_SOCK_ASYNC);
- if (sock == PJ_INVALID_SOCKET) {
- PJ_PERROR((THIS_FILE, "%s socket()", get_type_name(type)));
- return PJ_INVALID_SOCKET;
- }
+ status = pj_sock_socket( sock_family, sock_type, sock_proto, &sock);
+ if (status != PJ_SUCCESS)
+ return status;
/* Bind the socket to the requested address, or if no address is
* specified, let the operating system chooses the address.
*/
if (/*local->sin_addr.s_addr != 0 &&*/ local->sin_port != 0) {
- /* Bind to the requested address. */
- if (pj_sock_bind(sock, local, sizeof(*local)) != 0) {
- PJ_PERROR((THIS_FILE, "bind() to %s %s:%d",
- get_type_name(type),
- pj_sockaddr_get_str_addr(local),
- pj_sockaddr_get_port(local)));
+ /* Bind to the requested address. */
+ status = pj_sock_bind(sock, local, sizeof(*local));
+ if (status != PJ_SUCCESS) {
pj_sock_close(sock);
- return PJ_INVALID_SOCKET;
+ return status;
}
} else if (type == PJSIP_TRANSPORT_UDP) {
/* Only for UDP sockets: bind to any address so that the operating
@@ -515,23 +529,24 @@ static pj_sock_t create_socket( pjsip_transport_type_e type,
* get 0.0.0.0 as local address).
*/
pj_memset(local, 0, sizeof(*local));
- local->sin_family = PJ_AF_INET;
- if (pj_sock_bind(sock, local, sizeof(*local)) != 0) {
- PJ_PERROR((THIS_FILE, "bind() to %s 0.0.0.0:0", get_type_name(type)));
+ local->sin_family = PJ_AF_INET;
+ status = pj_sock_bind(sock, local, sizeof(*local));
+ if (status != PJ_SUCCESS) {
pj_sock_close(sock);
- return PJ_INVALID_SOCKET;
+ return status;
}
/* Get the local address. */
- len = sizeof(pj_sockaddr_in);
- if (pj_sock_getsockname(sock, local, &len)) {
- PJ_PERROR((THIS_FILE, "getsockname()"));
+ len = sizeof(pj_sockaddr_in);
+ status = pj_sock_getsockname(sock, local, &len);
+ if (status != PJ_SUCCESS) {
pj_sock_close(sock);
- return -1;
+ return status;
}
}
-
- return sock;
+
+ *p_sock = sock;
+ return PJ_SUCCESS;
}
/*
@@ -547,21 +562,24 @@ static void destroy_socket( pjsip_transport_t * tr)
/*
* Create a new transport object.
*/
-static pjsip_transport_t* create_transport( pjsip_transport_mgr *mgr,
- pjsip_transport_type_e type,
- pj_sock_t sock_hnd,
- const pj_sockaddr_in *local_addr,
- const pj_sockaddr_in *addr_name)
+static pj_status_t create_transport( pjsip_transport_mgr *mgr,
+ pjsip_transport_type_e type,
+ pj_sock_t sock_hnd,
+ const pj_sockaddr_in *local_addr,
+ const pj_sockaddr_in *addr_name,
+ pjsip_transport_t **p_transport )
{
pj_pool_t *tr_pool=NULL, *rdata_pool=NULL;
- pjsip_transport_t *tr = NULL;
+ pjsip_transport_t *tr = NULL;
+ pj_status_t status;
/* Allocate pool for transport from endpoint. */
tr_pool = pjsip_endpt_create_pool( mgr->endpt,
transport_get_name_format(type),
PJSIP_POOL_LEN_TRANSPORT,
PJSIP_POOL_INC_TRANSPORT );
- if (!tr_pool) {
+ if (!tr_pool) {
+ status = PJ_ENOMEM;
goto on_error;
}
@@ -570,7 +588,8 @@ static pjsip_transport_t* create_transport( pjsip_transport_mgr *mgr,
"prdt%p",
PJSIP_POOL_LEN_RDATA,
PJSIP_POOL_INC_RDATA );
- if (!rdata_pool) {
+ if (!rdata_pool) {
+ status = PJ_ENOMEM;
goto on_error;
}
@@ -582,7 +601,7 @@ static pjsip_transport_t* create_transport( pjsip_transport_mgr *mgr,
tr->sock = sock_hnd;
pj_memcpy(&tr->local_addr, local_addr, sizeof(pj_sockaddr_in));
pj_list_init(&tr->cb_list);
- sprintf(tr->obj_name, transport_get_name_format(type), tr);
+ pj_sprintf(tr->obj_name, transport_get_name_format(type), tr);
if (type != PJSIP_TRANSPORT_UDP) {
tr->flag |= PJSIP_TRANSPORT_RELIABLE;
@@ -594,11 +613,10 @@ static pjsip_transport_t* create_transport( pjsip_transport_mgr *mgr,
}
pj_memcpy(&tr->addr_name, addr_name, sizeof(*addr_name));
- /* Create atomic */
- tr->ref_cnt = pj_atomic_create(tr_pool, 0);
- if (!tr->ref_cnt) {
+ /* Create atomic */
+ status = pj_atomic_create(tr_pool, 0, &tr->ref_cnt);
+ if (status != PJ_SUCCESS)
goto on_error;
- }
/* Init rdata in the transport. */
tr->rdata = pj_pool_alloc(rdata_pool, sizeof(*tr->rdata));
@@ -607,22 +625,20 @@ static pjsip_transport_t* create_transport( pjsip_transport_mgr *mgr,
tr->rdata->transport = tr;
/* Init transport mutex. */
- tr->tr_mutex = pj_mutex_create(tr_pool, "mtr%p", 0);
- if (!tr->tr_mutex) {
- PJ_PERROR((tr->obj_name, "pj_mutex_create()"));
+ status = pj_mutex_create_recursive(tr_pool, "mtr%p", &tr->tr_mutex);
+ if (status != PJ_SUCCESS)
goto on_error;
- }
/* Register to I/O Queue */
- tr->key = pj_ioqueue_register(tr_pool, mgr->ioqueue,
- (pj_oshandle_t)tr->sock, tr,
- &ioqueue_transport_callback);
- if (tr->key == NULL) {
- PJ_PERROR((tr->obj_name, "pj_ioqueue_register()"));
+ status = pj_ioqueue_register_sock( tr_pool, mgr->ioqueue,
+ tr->sock, tr,
+ &ioqueue_transport_callback,
+ &tr->key);
+ if (status != PJ_SUCCESS)
goto on_error;
- }
-
- return tr;
+
+ *p_transport = tr;
+ return PJ_SUCCESS;
on_error:
if (tr && tr->tr_mutex) {
@@ -634,18 +650,18 @@ on_error:
if (rdata_pool) {
pjsip_endpt_destroy_pool(mgr->endpt, rdata_pool);
}
- return NULL;
+ return status;
}
/*
* Destroy transport.
*/
-static void destroy_transport( pjsip_transport_mgr *mgr, pjsip_transport_t *tr )
+static void destroy_transport( pjsip_transport_mgr *mgr, pjsip_transport_t *tr)
{
transport_key hash_key;
/* Remove from I/O queue. */
- pj_ioqueue_unregister( mgr->ioqueue, tr->key );
+ pj_ioqueue_unregister( tr->key );
/* Remove from hash table */
init_key_from_transport(&hash_key, tr);
@@ -668,12 +684,18 @@ static void destroy_transport( pjsip_transport_mgr *mgr, pjsip_transport_t *tr )
}
-static int transport_send_msg( pjsip_transport_t *tr, pjsip_tx_data *tdata,
- const pj_sockaddr_in *addr)
+static pj_status_t transport_send_msg( pjsip_transport_t *tr,
+ pjsip_tx_data *tdata,
+ const pj_sockaddr_in *addr,
+ pj_ssize_t *p_sent)
{
const char *buf = tdata->buf.start;
- int sent;
- int len;
+ pj_ssize_t size;
+ pj_status_t status;
+
+ /* Can only send if tdata is not being sent! */
+ if (pj_ioqueue_is_pending(tr->key, &tdata->op_key))
+ return PJSIP_EPENDINGTX;
/* Allocate buffer if necessary. */
if (tdata->buf.start == NULL) {
@@ -684,34 +706,31 @@ static int transport_send_msg( pjsip_transport_t *tr, pjsip_tx_data *tdata,
/* Print the message if it's not printed */
if (tdata->buf.cur <= tdata->buf.start) {
- len = pjsip_msg_print(tdata->msg, tdata->buf.start,
- tdata->buf.end - tdata->buf.start);
- if (len < 1) {
- return len;
- }
- tdata->buf.cur += len;
- tdata->buf.cur[len] = '\0';
+ size = pjsip_msg_print( tdata->msg, tdata->buf.start,
+ tdata->buf.end - tdata->buf.start);
+ if (size < 0) {
+ return PJSIP_EMSGTOOLONG;
+ }
+ pj_assert(size != 0);
+ tdata->buf.cur += size;
+ tdata->buf.cur[size] = '\0';
}
- /* BUG BUG BUG */
- /* MUST CHECK THAT THE SOCKET IS READY TO SEND (IOQueue)! */
- PJ_TODO(BUG_BUG_BUG___SENDING_DATAGRAM_WHILE_SOCKET_IS_PENDING__)
-
/* Send the message. */
buf = tdata->buf.start;
- len = tdata->buf.cur - tdata->buf.start;
+ size = tdata->buf.cur - tdata->buf.start;
if (tr->type == PJSIP_TRANSPORT_UDP) {
PJ_LOG(4,(tr->obj_name, "sendto %s:%d, %d bytes, data:\n"
"----------- begin msg ------------\n"
"%s"
"------------ end msg -------------",
- pj_sockaddr_get_str_addr(addr),
- pj_sockaddr_get_port(addr),
- len, buf));
-
- sent = pj_ioqueue_sendto( tr->mgr->ioqueue, tr->key,
- buf, len, addr, sizeof(*addr));
+ pj_inet_ntoa(addr->sin_addr),
+ pj_sockaddr_in_get_port(addr),
+ size, buf));
+
+ status = pj_ioqueue_sendto( tr->key, &tdata->op_key,
+ buf, &size, 0, addr, sizeof(*addr));
}
#if PJ_HAS_TCP
else {
@@ -719,40 +738,33 @@ static int transport_send_msg( pjsip_transport_t *tr, pjsip_tx_data *tdata,
"----------- begin msg ------------\n"
"%s"
"------------ end msg -------------",
- len, buf));
+ size, buf));
- sent = pj_ioqueue_write (tr->mgr->ioqueue, tr->key, buf, len);
+ status = pj_ioqueue_send(tr->key, &tdata->op_key, buf, &size, 0);
}
#else
else {
- pj_assert(0);
- sent = -1;
+ pj_assert(!"Unsupported transport");
+ status = PJSIP_EUNSUPTRANSPORT;
}
#endif
-
- if (sent == len || sent == PJ_IOQUEUE_PENDING) {
- return len;
- }
-
- /* On error, clear the flag. */
- PJ_PERROR((tr->obj_name, tr->type == PJSIP_TRANSPORT_UDP ? "pj_ioqueue_sendto()" : "pj_ioqueue_write()"));
- return -1;
+
+ *p_sent = size;
+ return status;
}
/*
* Send a SIP message using the specified transport, to the address specified
* in the outgoing data.
*/
-PJ_DEF(int) pjsip_transport_send_msg( pjsip_transport_t *tr,
- pjsip_tx_data *tdata,
- const pj_sockaddr_in *addr)
+PJ_DEF(pj_status_t) pjsip_transport_send_msg( pjsip_transport_t *tr,
+ pjsip_tx_data *tdata,
+ const pj_sockaddr_in *addr,
+ pj_ssize_t *sent)
{
- int sent;
-
PJ_LOG(5, (tr->obj_name, "pjsip_transport_send_msg(tdata=%s)", tdata->obj_name));
- sent = transport_send_msg(tr, tdata, addr );
- return sent;
+ return transport_send_msg(tr, tdata, addr, sent );
}
///////////////////////////////////////////////////////////////////////////////
@@ -760,44 +772,47 @@ PJ_DEF(int) pjsip_transport_send_msg( pjsip_transport_t *tr,
/*
* Create a new transport manager.
*/
-PJ_DEF(pjsip_transport_mgr*)
-pjsip_transport_mgr_create( pj_pool_t *pool,
- pjsip_endpoint * endpt,
- void (*cb)(pjsip_endpoint*,pjsip_rx_data *) )
+PJ_DEF(pj_status_t) pjsip_transport_mgr_create( pj_pool_t *pool,
+ pjsip_endpoint * endpt,
+ void (*cb)(pjsip_endpoint*,
+ pjsip_rx_data *),
+ pjsip_transport_mgr **p_mgr)
{
pjsip_transport_mgr *mgr;
+ pj_status_t status;
PJ_LOG(5, (LOG_TRANSPORT_MGR, "pjsip_transport_mgr_create()"));
mgr = pj_pool_alloc(pool, sizeof(*mgr));
mgr->endpt = endpt;
mgr->message_callback = cb;
-
+ mgr->send_buf_size = DEFAULT_SO_SNDBUF;
+ mgr->recv_buf_size = DEFAULT_SO_RCVBUF;
+
mgr->transport_table = pj_hash_create(pool, MGR_HASH_TABLE_SIZE);
if (!mgr->transport_table) {
- PJ_LOG(3, (LOG_TRANSPORT_MGR, "error creating transport manager hash table"));
- return NULL;
+ return PJ_ENOMEM;
}
- mgr->ioqueue = pj_ioqueue_create(pool, PJSIP_MAX_TRANSPORTS);
- if (!mgr->ioqueue) {
- PJ_LOG(3, (LOG_TRANSPORT_MGR, "error creating IO queue"));
- return NULL;
+ status = pj_ioqueue_create(pool, PJSIP_MAX_TRANSPORTS, &mgr->ioqueue);
+ if (status != PJ_SUCCESS) {
+ return status;
}
- mgr->mutex = pj_mutex_create(pool, "tmgr%p", 0);
- if (!mgr->mutex) {
- PJ_LOG(3, (LOG_TRANSPORT_MGR, "error creating mutex"));
+ status = pj_mutex_create_recursive(pool, "tmgr%p", &mgr->mutex);
+ if (status != PJ_SUCCESS) {
pj_ioqueue_destroy(mgr->ioqueue);
- return NULL;
+ return status;
}
pj_gettimeofday(&mgr->next_idle_check);
- mgr->next_idle_check.sec += MGR_IDLE_CHECK_INTERVAL;
- return mgr;
+ mgr->next_idle_check.sec += MGR_IDLE_CHECK_INTERVAL;
+
+ *p_mgr = mgr;
+ return status;
}
/*
* Destroy transport manager.
*/
-PJ_DEF(void) pjsip_transport_mgr_destroy( pjsip_transport_mgr *mgr )
+PJ_DEF(pj_status_t) pjsip_transport_mgr_destroy( pjsip_transport_mgr *mgr )
{
pj_hash_iterator_t itr_val;
pj_hash_iterator_t *itr;
@@ -822,7 +837,9 @@ PJ_DEF(void) pjsip_transport_mgr_destroy( pjsip_transport_mgr *mgr )
}
pj_ioqueue_destroy(mgr->ioqueue);
- pj_mutex_unlock(mgr->mutex);
+ pj_mutex_unlock(mgr->mutex);
+
+ return PJ_SUCCESS;
}
/*
@@ -835,62 +852,81 @@ static pj_status_t create_listener( pjsip_transport_mgr *mgr,
const pj_sockaddr_in *addr_name)
{
pjsip_transport_t *tr;
- struct transport_key *hash_key;
- int opt_val;
-
- opt_val = DEFAULT_SO_SNDBUF;
- if (pj_sock_setsockopt( sock_hnd, SOL_SOCKET, SO_SNDBUF, &opt_val, sizeof(opt_val)) != PJ_SUCCESS) {
- PJ_LOG(3, (LOG_TRANSPORT_MGR, "create listener: error setting SNDBUF to %d", DEFAULT_SO_SNDBUF));
- // Just ignore the error.
+ struct transport_key *hash_key;
+ const pj_str_t loopback_addr = { "127.0.0.1", 9 };
+ pj_status_t status;
+
+ if (mgr->send_buf_size != 0) {
+ int opt_val = mgr->send_buf_size;
+ status = pj_sock_setsockopt( sock_hnd, PJ_SOL_SOCKET,
+ PJ_SO_SNDBUF,
+ &opt_val, sizeof(opt_val));
+
+ if (status != PJ_SUCCESS) {
+ return status;
+ }
}
-
- opt_val = DEFAULT_SO_RCVBUF;
- if (pj_sock_setsockopt( sock_hnd, SOL_SOCKET, SO_RCVBUF, &opt_val, sizeof(opt_val)) != PJ_SUCCESS) {
- PJ_LOG(3, (LOG_TRANSPORT_MGR, "create listener: error setting RCVBUF to %d", DEFAULT_SO_SNDBUF));
- // Just ignore the error
+
+ if (mgr->recv_buf_size != 0) {
+ int opt_val = mgr->recv_buf_size;
+ status = pj_sock_setsockopt( sock_hnd, PJ_SOL_SOCKET,
+ PJ_SO_RCVBUF,
+ &opt_val, sizeof(opt_val));
+ if (status != PJ_SUCCESS) {
+ return status;
+ }
}
- tr = create_transport(mgr, type, sock_hnd, local_addr, addr_name);
- if (!tr) {
+ status = create_transport(mgr, type, sock_hnd, local_addr, addr_name, &tr);
+ if (status != PJ_SUCCESS) {
pj_sock_close(sock_hnd);
- return -1;
+ return status;
}
#if PJ_HAS_TCP
if (type == PJSIP_TRANSPORT_TCP) {
- pj_status_t status;
-
- if (pj_sock_listen(tr->sock, BACKLOG) != 0) {
- PJ_PERROR((tr->obj_name, "listen()"));
- destroy_transport(mgr, tr);
- return -1;
- }
- tr->accept_data.addrlen = sizeof(tr->accept_data.local);
- status = pj_ioqueue_accept(mgr->ioqueue, tr->key,
- &tr->accept_data.sock,
- &tr->accept_data.local,
- &tr->accept_data.remote,
- &tr->accept_data.addrlen);
- if (status != PJ_IOQUEUE_PENDING) {
- PJ_PERROR((tr->obj_name, "pj_ioqueue_accept()"));
+
+ status = pj_sock_listen(tr->sock, BACKLOG);
+ if (status != 0) {
destroy_transport(mgr, tr);
- return -1;
+ return status;
}
+
+ /* Discard immediate connections. */
+ do {
+ tr->accept_data.addrlen = sizeof(tr->accept_data.local);
+ status = pj_ioqueue_accept(tr->key, &tr->accept_op,
+ &tr->accept_data.sock,
+ &tr->accept_data.local,
+ &tr->accept_data.remote,
+ &tr->accept_data.addrlen);
+ if (status==PJ_SUCCESS) {
+ pj_sock_close(tr->accept_data.sock);
+ } else if (status != PJ_EPENDING) {
+ destroy_transport(mgr, tr);
+ return status;
+ }
+ } while (status==PJ_SUCCESS);
} else
#endif
- if (type == PJSIP_TRANSPORT_UDP) {
- pj_status_t status;
-
- tr->rdata->addr_len = sizeof(tr->rdata->addr);
- status = pj_ioqueue_recvfrom( mgr->ioqueue, tr->key,
- tr->rdata->packet, PJSIP_MAX_PKT_LEN,
- &tr->rdata->addr,
- &tr->rdata->addr_len);
- if (status != PJ_IOQUEUE_PENDING) {
- PJ_PERROR((tr->obj_name, "pj_ioqueue_recvfrom()"));
- destroy_transport(mgr, tr);
- return -1;
- }
+ if (type == PJSIP_TRANSPORT_UDP) {
+ pj_ssize_t bytes;
+
+ /* Discard immediate data. */
+ do {
+ tr->rdata->addr_len = sizeof(tr->rdata->addr);
+ bytes = PJSIP_MAX_PKT_LEN;
+ status = pj_ioqueue_recvfrom( tr->key, &tr->rdata->op_key,
+ tr->rdata->packet, &bytes, 0,
+ &tr->rdata->addr,
+ &tr->rdata->addr_len);
+ if (status == PJ_SUCCESS) {
+ ;
+ } else if (status != PJ_EPENDING) {
+ destroy_transport(mgr, tr);
+ return status;
+ }
+ } while (status == PJ_SUCCESS);
}
pj_atomic_set(tr->ref_cnt, 1);
@@ -900,26 +936,29 @@ static pj_status_t create_listener( pjsip_transport_mgr *mgr,
/* Set remote address to 127.0.0.1 for UDP socket bound to 127.0.0.1.
* See further comments on struct pjsip_transport_t definition.
- */
- if (type == PJSIP_TRANSPORT_UDP && local_addr->sin_addr.s_addr == inet_addr("127.0.0.1")) {
+ */
+ if (type == PJSIP_TRANSPORT_UDP &&
+ local_addr->sin_addr.s_addr == pj_inet_addr(&loopback_addr).s_addr)
+ {
pj_str_t localaddr = pj_str("127.0.0.1");
- pj_sockaddr_set_str_addr( &tr->remote_addr, &localaddr);
+ pj_sockaddr_in_set_str_addr( &tr->remote_addr, &localaddr);
}
hash_key = pj_pool_alloc(tr->pool, sizeof(transport_key));
init_key_from_transport(hash_key, tr);
pj_mutex_lock(mgr->mutex);
- pj_hash_set(tr->pool, mgr->transport_table, hash_key, sizeof(transport_key), tr);
+ pj_hash_set(tr->pool, mgr->transport_table,
+ hash_key, sizeof(transport_key), tr);
pj_mutex_unlock(mgr->mutex);
PJ_LOG(4,(tr->obj_name, "Listening at %s %s:%d",
get_type_name(tr->type),
- pj_sockaddr_get_str_addr(&tr->local_addr),
- pj_sockaddr_get_port(&tr->local_addr)));
+ pj_inet_ntoa(tr->local_addr.sin_addr),
+ pj_sockaddr_in_get_port(&tr->local_addr)));
PJ_LOG(4,(tr->obj_name, "Listener public address is at %s %s:%d",
get_type_name(tr->type),
- pj_sockaddr_get_str_addr(&tr->addr_name),
- pj_sockaddr_get_port(&tr->addr_name)));
+ pj_inet_ntoa(tr->addr_name.sin_addr),
+ pj_sockaddr_in_get_port(&tr->addr_name)));
return PJ_SUCCESS;
}
@@ -932,12 +971,13 @@ PJ_DEF(pj_status_t) pjsip_create_listener( pjsip_transport_mgr *mgr,
const pj_sockaddr_in *addr_name)
{
pj_sock_t sock_hnd;
+ pj_status_t status;
PJ_LOG(5, (LOG_TRANSPORT_MGR, "pjsip_create_listener(type=%d)", type));
- sock_hnd = create_socket(type, local_addr);
- if (sock_hnd == PJ_INVALID_SOCKET) {
- return -1;
+ status = create_socket(type, local_addr, &sock_hnd);
+ if (status != PJ_SUCCESS) {
+ return status;
}
return create_listener(mgr, type, sock_hnd, local_addr, addr_name);
@@ -950,13 +990,16 @@ PJ_DEF(pj_status_t) pjsip_create_udp_listener( pjsip_transport_mgr *mgr,
pj_sock_t sock,
const pj_sockaddr_in *addr_name)
{
- pj_sockaddr_in local_addr;
+ pj_sockaddr_in local_addr;
+ pj_status_t status;
int addrlen = sizeof(local_addr);
+
+ status = pj_sock_getsockname(sock, (pj_sockaddr_t*)&local_addr, &addrlen);
+ if (status != PJ_SUCCESS)
+ return status;
- if (pj_sock_getsockname(sock, (pj_sockaddr_t*)&local_addr, &addrlen) != 0)
- return -1;
-
- return create_listener(mgr, PJSIP_TRANSPORT_UDP, sock, &local_addr, addr_name);
+ return create_listener(mgr, PJSIP_TRANSPORT_UDP, sock,
+ &local_addr, addr_name);
}
/*
@@ -973,7 +1016,7 @@ PJ_DEF(void) pjsip_transport_get( pjsip_transport_mgr *mgr,
transport_key search_key, *hash_key;
pjsip_transport_t *tr;
pj_sockaddr_in local;
- int sock_hnd;
+ pj_sock_t sock_hnd;
pj_status_t status;
struct transport_callback *cb_rec;
@@ -1029,49 +1072,50 @@ PJ_DEF(void) pjsip_transport_get( pjsip_transport_mgr *mgr,
/* Transport not found. Create new one. */
pj_memset(&local, 0, sizeof(local));
local.sin_family = PJ_AF_INET;
- sock_hnd = create_socket(type, &local);
- if (sock_hnd == PJ_INVALID_SOCKET) {
+ status = create_socket(type, &local, &sock_hnd);
+ if (status != PJ_SUCCESS) {
pj_mutex_unlock(mgr->mutex);
- (*cb_rec->cb)(NULL, cb_rec->token, -1);
+ (*cb_rec->cb)(NULL, cb_rec->token, status);
return;
}
- tr = create_transport(mgr, type, sock_hnd, &local, NULL);
- if (!tr) {
+ status = create_transport(mgr, type, sock_hnd, &local, NULL, &tr);
+ if (status != PJ_SUCCESS) {
pj_mutex_unlock(mgr->mutex);
- (*cb_rec->cb)(NULL, cb_rec->token, -1);
+ (*cb_rec->cb)(NULL, cb_rec->token, status);
return;
}
#if PJ_HAS_TCP
if (type == PJSIP_TRANSPORT_TCP) {
pj_memcpy(&tr->remote_addr, remote, sizeof(pj_sockaddr_in));
- status = pj_ioqueue_connect(mgr->ioqueue, tr->key,
- &tr->remote_addr, sizeof(pj_sockaddr_in));
+ status = pj_ioqueue_connect(tr->key, &tr->remote_addr,
+ sizeof(pj_sockaddr_in));
pj_assert(status != 0);
- if (status != PJ_IOQUEUE_PENDING) {
- PJ_PERROR((tr->obj_name, "pj_ioqueue_connect()"));
+ if (status != PJ_EPENDING) {
+ PJ_TODO(HANDLE_IMMEDIATE_CONNECT);
destroy_transport(mgr, tr);
pj_mutex_unlock(mgr->mutex);
- (*cb_rec->cb)(NULL, cb_rec->token, -1);
+ (*cb_rec->cb)(NULL, cb_rec->token, status);
return;
}
} else
#endif
if (type == PJSIP_TRANSPORT_UDP) {
- int len;
+ pj_ssize_t size;
do {
- tr->rdata->addr_len = sizeof(tr->rdata->addr);
- len = pj_ioqueue_recvfrom( mgr->ioqueue, tr->key,
- tr->rdata->packet, PJSIP_MAX_PKT_LEN,
- &tr->rdata->addr,
- &tr->rdata->addr_len);
- pj_assert(len < 0);
- if (len != PJ_IOQUEUE_PENDING) {
- PJ_PERROR((tr->obj_name, "pj_ioqueue_recvfrom()"));
+ tr->rdata->addr_len = sizeof(tr->rdata->addr);
+ size = PJSIP_MAX_PKT_LEN;
+ status = pj_ioqueue_recvfrom( tr->key, &tr->rdata->op_key,
+ tr->rdata->packet, &size, 0,
+ &tr->rdata->addr,
+ &tr->rdata->addr_len);
+ if (status == PJ_SUCCESS)
+ ;
+ else if (status != PJ_EPENDING) {
destroy_transport(mgr, tr);
pj_mutex_unlock(mgr->mutex);
- (*cb_rec->cb)(NULL, cb_rec->token, -1);
+ (*cb_rec->cb)(NULL, cb_rec->token, status);
return;
}
@@ -1082,7 +1126,7 @@ PJ_DEF(void) pjsip_transport_get( pjsip_transport_mgr *mgr,
*/
PJ_TODO(FIXED_BUG_ON_IMMEDIATE_TRANSPORT_DATA);
- } while (len != PJ_IOQUEUE_PENDING);
+ } while (status == PJ_SUCCESS);
//Bug: cb will never be called!
// Must force status to PJ_SUCCESS;
@@ -1092,15 +1136,16 @@ PJ_DEF(void) pjsip_transport_get( pjsip_transport_mgr *mgr,
} else {
pj_mutex_unlock(mgr->mutex);
- (*cb_rec->cb)(NULL, cb_rec->token, -1);
+ (*cb_rec->cb)(NULL, cb_rec->token, PJSIP_EUNSUPTRANSPORT);
return;
}
- pj_assert(status==PJ_IOQUEUE_PENDING || status==PJ_SUCCESS);
+ pj_assert(status==PJ_EPENDING || status==PJ_SUCCESS);
pj_mutex_lock(tr->tr_mutex);
hash_key = pj_pool_alloc(tr->pool, sizeof(transport_key));
pj_memcpy(hash_key, &search_key, sizeof(transport_key));
- pj_hash_set(tr->pool, mgr->transport_table, hash_key, sizeof(transport_key), tr);
+ pj_hash_set(tr->pool, mgr->transport_table,
+ hash_key, sizeof(transport_key), tr);
if (status == PJ_SUCCESS) {
pj_mutex_unlock(tr->tr_mutex);
pj_mutex_unlock(mgr->mutex);
@@ -1123,24 +1168,28 @@ static void handle_new_connection( pjsip_transport_mgr *mgr,
pj_status_t status )
{
pjsip_transport_t *tr;
- transport_key *hash_key;
+ transport_key *hash_key;
+ pj_ssize_t size;
pj_assert (listener->type == PJSIP_TRANSPORT_TCP);
if (status != PJ_SUCCESS) {
- PJ_PERROR((listener->obj_name, "accept() returned error"));
- return;
+ PJSIP_ENDPT_LOG_ERROR((mgr->endpt, listener->obj_name, status,
+ "Error in accept() completion"));
+ goto on_return;
}
PJ_LOG(4,(listener->obj_name, "incoming tcp connection from %s:%d",
- pj_sockaddr_get_str_addr(&listener->accept_data.remote),
- pj_sockaddr_get_port(&listener->accept_data.remote)));
-
- tr = create_transport(mgr, listener->type,
- listener->accept_data.sock,
- &listener->accept_data.local,
- NULL);
- if (!tr) {
+ pj_inet_ntoa(listener->accept_data.remote.sin_addr),
+ pj_sockaddr_in_get_port(&listener->accept_data.remote)));
+
+ status = create_transport(mgr, listener->type,
+ listener->accept_data.sock,
+ &listener->accept_data.local,
+ NULL, &tr);
+ if (status != PJ_SUCCESS) {
+ PJSIP_ENDPT_LOG_ERROR((mgr->endpt, listener->obj_name, status,
+ "Error in creating new incoming TCP"));
goto on_return;
}
@@ -1153,32 +1202,40 @@ static void handle_new_connection( pjsip_transport_mgr *mgr,
*/
tr->rdata->addr = listener->accept_data.remote;
tr->rdata->addr_len = listener->accept_data.addrlen;
-
- status = pj_ioqueue_read (mgr->ioqueue, tr->key, tr->rdata->packet, PJSIP_MAX_PKT_LEN);
- if (status != PJ_IOQUEUE_PENDING) {
- PJ_PERROR((tr->obj_name, "pj_ioqueue_read()"));
+
+ size = PJSIP_MAX_PKT_LEN;
+ status = pj_ioqueue_recv(tr->key, &tr->rdata->op_key,
+ tr->rdata->packet, &size, 0);
+ if (status != PJ_EPENDING) {
+ PJSIP_ENDPT_LOG_ERROR((mgr->endpt, listener->obj_name, status,
+ "Error in receiving data"));
+ PJ_TODO(IMMEDIATE_DATA);
destroy_transport(mgr, tr);
goto on_return;
}
- pj_memcpy(&tr->remote_addr, &listener->accept_data.remote, listener->accept_data.addrlen);
+ pj_memcpy(&tr->remote_addr, &listener->accept_data.remote,
+ listener->accept_data.addrlen);
hash_key = pj_pool_alloc(tr->pool, sizeof(transport_key));
init_key_from_transport(hash_key, tr);
pj_mutex_lock(mgr->mutex);
- pj_hash_set(tr->pool, mgr->transport_table, hash_key, sizeof(transport_key), tr);
+ pj_hash_set(tr->pool, mgr->transport_table, hash_key,
+ sizeof(transport_key), tr);
pj_mutex_unlock(mgr->mutex);
on_return:
/* Re-initiate asynchronous accept() */
listener->accept_data.addrlen = sizeof(listener->accept_data.local);
- status = pj_ioqueue_accept(mgr->ioqueue, listener->key,
+ status = pj_ioqueue_accept(listener->key, &listener->accept_op,
&listener->accept_data.sock,
&listener->accept_data.local,
&listener->accept_data.remote,
&listener->accept_data.addrlen);
- if (status != PJ_IOQUEUE_PENDING) {
- PJ_PERROR((listener->obj_name, "pj_ioqueue_accept()"));
+ if (status != PJ_EPENDING) {
+ PJSIP_ENDPT_LOG_ERROR((mgr->endpt, listener->obj_name, status,
+ "Error in receiving data"));
+ PJ_TODO(IMMEDIATE_ACCEPT);
return;
}
}
@@ -1193,8 +1250,9 @@ static void handle_connect_completion( pjsip_transport_mgr *mgr,
{
struct transport_callback new_list;
struct transport_callback *cb_rec;
+ pj_ssize_t recv_size;
- PJ_UNUSED_ARG(mgr)
+ PJ_UNUSED_ARG(mgr);
/* On connect completion, we must call all registered callbacks in
* the transport.
@@ -1221,11 +1279,12 @@ static void handle_connect_completion( pjsip_transport_mgr *mgr,
*/
if (status == PJ_SUCCESS) {
int addrlen = sizeof(tr->local_addr);
- int rc;
- if ((rc=pj_sock_getsockname(tr->sock, (pj_sockaddr_t*)&tr->local_addr, &addrlen)) == 0) {
+
+ status = pj_sock_getsockname(tr->sock,
+ (pj_sockaddr_t*)&tr->local_addr,
+ &addrlen);
+ if (status == PJ_SUCCESS) {
pj_memcpy(&tr->addr_name, &tr->local_addr, sizeof(tr->addr_name));
- } else {
- PJ_LOG(4,(tr->obj_name, "Unable to get local address (getsockname=%d)", rc));
}
}
@@ -1243,15 +1302,18 @@ static void handle_connect_completion( pjsip_transport_mgr *mgr,
/* Success? */
if (status != PJ_SUCCESS) {
- destroy_transport(mgr, tr);
+ destroy_transport(mgr, tr);
+ PJ_TODO(WTF);
return;
}
- /* Initiate read operation to socket. */
- status = pj_ioqueue_read (mgr->ioqueue, tr->key, tr->rdata->packet, PJSIP_MAX_PKT_LEN);
- if (status != PJ_IOQUEUE_PENDING) {
- PJ_PERROR((tr->obj_name, "pj_ioqueue_read()"));
- destroy_transport(mgr, tr);
+ /* Initiate read operation to socket. */
+ recv_size = PJSIP_MAX_PKT_LEN;
+ status = pj_ioqueue_recv( tr->key, &tr->rdata->op_key, tr->rdata->packet,
+ &recv_size, 0);
+ if (status != PJ_EPENDING) {
+ destroy_transport(mgr, tr);
+ PJ_TODO(IMMEDIATE_DATA);
return;
}
}
@@ -1269,7 +1331,6 @@ static void handle_received_data( pjsip_transport_mgr *mgr,
pj_ssize_t size )
{
pjsip_msg *msg;
- pjsip_cid_hdr *call_id;
pjsip_rx_data *rdata = tr->rdata;
pj_pool_t *rdata_pool;
pjsip_hdr *hdr;
@@ -1315,8 +1376,8 @@ static void handle_received_data( pjsip_transport_mgr *mgr,
rdata->packet[rdata->len] = '\0';
/* Get source address and port for logging purpose. */
- src_addr = pj_sockaddr_get_str_addr(&rdata->addr);
- src_port = pj_sockaddr_get_port(&rdata->addr);
+ src_addr = pj_inet_ntoa(rdata->addr.sin_addr);
+ src_port = pj_sockaddr_in_get_port(&rdata->addr);
/* Print the whole data to the log. */
PJ_LOG(4,(tr->obj_name, "%d bytes recvfrom %s:%d:\n"
@@ -1333,15 +1394,15 @@ static void handle_received_data( pjsip_transport_mgr *mgr,
#if PJ_HAS_TCP
/* For TCP transport, check if the whole message has been received. */
if (tr->type != PJSIP_TRANSPORT_UDP) {
- pj_bool_t is_complete;
- is_complete = pjsip_find_msg(rdata->packet, rdata->len, PJ_FALSE, &msg_fragment_size);
- if (!is_complete) {
- if (rdata->len == PJSIP_MAX_PKT_LEN) {
- PJ_LOG(1,(tr->obj_name,
- "Transport buffer full (%d bytes) for TCP socket %s:%d "
- "(probably too many invalid fragments received). "
- "Buffer will be discarded.",
- PJSIP_MAX_PKT_LEN, src_addr, src_port));
+ pj_status_t msg_status;
+ msg_status = pjsip_find_msg(rdata->packet, rdata->len, PJ_FALSE,
+ &msg_fragment_size);
+ if (msg_status != PJ_SUCCESS) {
+ if (rdata->len == PJSIP_MAX_PKT_LEN) {
+ PJSIP_ENDPT_LOG_ERROR((mgr->endpt, tr->obj_name,
+ PJSIP_EOVERFLOW,
+ "Buffer discarded for %s:%d",
+ src_addr, src_port));
goto on_return;
} else {
goto tcp_read_packet;
@@ -1357,36 +1418,21 @@ static void handle_received_data( pjsip_transport_mgr *mgr,
PJ_LOG(5,(tr->obj_name, "Parsing %d bytes from %s:%d", msg_fragment_size,
src_addr, src_port));
- msg = pjsip_parse_msg( rdata->pool, rdata->packet, msg_fragment_size,
- &rdata->parse_err);
+ msg = pjsip_parse_rdata( rdata->packet, msg_fragment_size, rdata);
if (msg == NULL) {
PJ_LOG(3,(tr->obj_name, "Bad message (%d bytes from %s:%d)", msg_fragment_size,
src_addr, src_port));
goto finish_process_fragment;
}
-
- /* Attach newly created message to rdata. */
- rdata->msg = msg;
-
- /* Extract Call-ID, From and To header and tags, topmost Via, and CSeq
- * header from the message.
- */
- call_id = pjsip_msg_find_hdr( msg, PJSIP_H_CALL_ID, NULL);
- rdata->from = pjsip_msg_find_hdr( msg, PJSIP_H_FROM, NULL);
- rdata->to = pjsip_msg_find_hdr( msg, PJSIP_H_TO, NULL);
- rdata->via = pjsip_msg_find_hdr( msg, PJSIP_H_VIA, NULL);
- rdata->cseq = pjsip_msg_find_hdr( msg, PJSIP_H_CSEQ, NULL);
-
- if (call_id == NULL || rdata->from == NULL || rdata->to == NULL ||
- rdata->via == NULL || rdata->cseq == NULL)
+
+ /* Perform basic header checking. */
+ if (rdata->call_id.ptr == NULL || rdata->from == NULL ||
+ rdata->to == NULL || rdata->via == NULL || rdata->cseq == NULL)
{
PJ_LOG(3,(tr->obj_name, "Bad message from %s:%d: missing some header",
src_addr, src_port));
goto finish_process_fragment;
}
- rdata->call_id = call_id->id;
- rdata->from_tag = rdata->from->tag;
- rdata->to_tag = rdata->to->tag;
/* If message is received from address that's different from the sent-by,
* MUST add received parameter to the via.
@@ -1401,14 +1447,14 @@ static void handle_received_data( pjsip_transport_mgr *mgr,
* If message contains "rport" param, put the received port there.
*/
if (rdata->via->rport_param == 0) {
- rdata->via->rport_param = pj_sockaddr_get_port(&rdata->addr);
+ rdata->via->rport_param = pj_sockaddr_in_get_port(&rdata->addr);
}
/* Drop response message if it has more than one Via.
*/
if (msg->type == PJSIP_RESPONSE_MSG) {
hdr = (pjsip_hdr*)rdata->via->next;
- if (hdr) {
+ if (hdr != &rdata->msg->hdr) {
hdr = pjsip_msg_find_hdr(msg, PJSIP_H_VIA, hdr);
if (hdr) {
PJ_LOG(3,(tr->obj_name, "Bad message from %s:%d: "
@@ -1444,10 +1490,12 @@ on_return:
/* Read the next packet. */
rdata->addr_len = sizeof(rdata->addr);
- if (tr->type == PJSIP_TRANSPORT_UDP) {
- pj_ioqueue_recvfrom( tr->mgr->ioqueue, tr->key,
- tr->rdata->packet, PJSIP_MAX_PKT_LEN,
- &rdata->addr, &rdata->addr_len);
+ if (tr->type == PJSIP_TRANSPORT_UDP) {
+ pj_ssize_t size = PJSIP_MAX_PKT_LEN;
+ pj_ioqueue_recvfrom(tr->key, &tr->rdata->op_key,
+ tr->rdata->packet, &size, 0,
+ &rdata->addr, &rdata->addr_len);
+ PJ_TODO(HANDLE_IMMEDIATE_DATA);
}
#if PJ_HAS_TCP
@@ -1455,10 +1503,12 @@ on_return:
label inside the '#if PJ_HAS_TCP' block to avoid 'unreferenced label' warning.
*/
tcp_read_packet:
- if (tr->type == PJSIP_TRANSPORT_TCP) {
- pj_ioqueue_read( tr->mgr->ioqueue, tr->key,
+ if (tr->type == PJSIP_TRANSPORT_TCP) {
+ pj_ssize_t size = PJSIP_MAX_PKT_LEN - tr->rdata->len;
+ pj_ioqueue_recv( tr->key, &tr->rdata->op_key,
tr->rdata->packet + tr->rdata->len,
- PJSIP_MAX_PKT_LEN - tr->rdata->len);
+ &size, 0);
+ PJ_TODO(HANDLE_IMMEDIATE_DATA_1);
}
#endif
}
@@ -1507,7 +1557,9 @@ static void transport_mgr_on_idle( pjsip_transport_mgr *mgr )
pj_mutex_unlock(mgr->mutex);
}
-static void on_ioqueue_read(pj_ioqueue_key_t *key, pj_ssize_t bytes_read)
+static void on_ioqueue_read(pj_ioqueue_key_t *key,
+ pj_ioqueue_op_key_t *op_key,
+ pj_ssize_t bytes_read)
{
pjsip_transport_t *t;
t = pj_ioqueue_get_user_data(key);
@@ -1515,17 +1567,22 @@ static void on_ioqueue_read(pj_ioqueue_key_t *key, pj_ssize_t bytes_read)
handle_received_data( t->mgr, t, bytes_read );
}
-static void on_ioqueue_write(pj_ioqueue_key_t *key, pj_ssize_t bytes_sent)
+static void on_ioqueue_write(pj_ioqueue_key_t *key,
+ pj_ioqueue_op_key_t *op_key,
+ pj_ssize_t bytes_sent)
{
- PJ_UNUSED_ARG(key)
- PJ_UNUSED_ARG(bytes_sent)
+ PJ_UNUSED_ARG(key);
+ PJ_UNUSED_ARG(bytes_sent);
/* Completion of write operation.
* Do nothing.
*/
}
-static void on_ioqueue_accept(pj_ioqueue_key_t *key, int status)
+static void on_ioqueue_accept(pj_ioqueue_key_t *key,
+ pj_ioqueue_op_key_t *op_key,
+ pj_sock_t newsock,
+ int status)
{
#if PJ_HAS_TCP
pjsip_transport_t *t;
@@ -1533,8 +1590,8 @@ static void on_ioqueue_accept(pj_ioqueue_key_t *key, int status)
handle_new_connection( t->mgr, t, status );
#else
- PJ_UNUSED_ARG(key)
- PJ_UNUSED_ARG(status)
+ PJ_UNUSED_ARG(key);
+ PJ_UNUSED_ARG(status);
#endif
}
@@ -1546,8 +1603,8 @@ static void on_ioqueue_connect(pj_ioqueue_key_t *key, int status)
handle_connect_completion( t->mgr, t, status);
#else
- PJ_UNUSED_ARG(key)
- PJ_UNUSED_ARG(status)
+ PJ_UNUSED_ARG(key);
+ PJ_UNUSED_ARG(status);
#endif
}