From 9b8e0a5afe9cba0fd430e9642630bd465db9aefa Mon Sep 17 00:00:00 2001 From: Benny Prijono Date: Thu, 21 Feb 2013 11:18:36 +0000 Subject: Fixed #1616: Implementation of Group lock and other foundation in PJLIB for fixing synchronization issues git-svn-id: http://svn.pjsip.org/repos/pjproject/trunk@4359 74dad513-b988-da41-8d7b-12977e46ad98 --- pjlib/src/pj/activesock.c | 56 ++++++++++++++++++++++++++++++++++++++++++++--- 1 file changed, 53 insertions(+), 3 deletions(-) (limited to 'pjlib/src/pj/activesock.c') diff --git a/pjlib/src/pj/activesock.c b/pjlib/src/pj/activesock.c index 5452ca4b..2c0cad55 100644 --- a/pjlib/src/pj/activesock.c +++ b/pjlib/src/pj/activesock.c @@ -43,6 +43,13 @@ enum read_type TYPE_RECV_FROM }; +enum shutdown_dir +{ + SHUT_NONE = 0, + SHUT_RX = 1, + SHUT_TX = 2 +}; + struct read_op { pj_ioqueue_op_key_t op_key; @@ -77,6 +84,7 @@ struct pj_activesock_t pj_ioqueue_t *ioqueue; void *user_data; unsigned async_count; + unsigned shutdown; unsigned max_loop; pj_activesock_cb cb; #if defined(PJ_IPHONE_OS_HAS_MULTITASKING_SUPPORT) && \ @@ -209,8 +217,9 @@ PJ_DEF(pj_status_t) pj_activesock_create( pj_pool_t *pool, ioq_cb.on_accept_complete = &ioqueue_on_accept_complete; #endif - status = pj_ioqueue_register_sock(pool, ioqueue, sock, asock, - &ioq_cb, &asock->key); + status = pj_ioqueue_register_sock2(pool, ioqueue, sock, + (opt? opt->grp_lock : NULL), + asock, &ioq_cb, &asock->key); if (status != PJ_SUCCESS) { pj_activesock_close(asock); return status; @@ -283,10 +292,10 @@ PJ_DEF(pj_status_t) pj_activesock_create_udp( pj_pool_t *pool, return PJ_SUCCESS; } - PJ_DEF(pj_status_t) pj_activesock_close(pj_activesock_t *asock) { PJ_ASSERT_RETURN(asock, PJ_EINVAL); + asock->shutdown = SHUT_RX | SHUT_TX; if (asock->key) { #if defined(PJ_IPHONE_OS_HAS_MULTITASKING_SUPPORT) && \ PJ_IPHONE_OS_HAS_MULTITASKING_SUPPORT!=0 @@ -448,6 +457,10 @@ static void ioqueue_on_read_complete(pj_ioqueue_key_t *key, asock = (pj_activesock_t*) pj_ioqueue_get_user_data(key); + /* Ignore if we've been shutdown */ + if (asock->shutdown & SHUT_RX) + return; + do { unsigned flags; @@ -569,6 +582,10 @@ static void ioqueue_on_read_complete(pj_ioqueue_key_t *key, if (!ret) return; + /* Also stop further read if we've been shutdown */ + if (asock->shutdown & SHUT_RX) + return; + /* Only stream oriented socket may leave data in the packet */ if (asock->stream_oriented) { r->size = remainder; @@ -648,6 +665,9 @@ PJ_DEF(pj_status_t) pj_activesock_send( pj_activesock_t *asock, { PJ_ASSERT_RETURN(asock && send_key && data && size, PJ_EINVAL); + if (asock->shutdown & SHUT_TX) + return PJ_EINVALIDOP; + send_key->activesock_data = NULL; if (asock->whole_data) { @@ -698,6 +718,9 @@ PJ_DEF(pj_status_t) pj_activesock_sendto( pj_activesock_t *asock, PJ_ASSERT_RETURN(asock && send_key && data && size && addr && addr_len, PJ_EINVAL); + if (asock->shutdown & SHUT_TX) + return PJ_EINVALIDOP; + return pj_ioqueue_sendto(asock->key, send_key, data, size, flags, addr, addr_len); } @@ -711,6 +734,13 @@ static void ioqueue_on_write_complete(pj_ioqueue_key_t *key, asock = (pj_activesock_t*) pj_ioqueue_get_user_data(key); + /* Ignore if we've been shutdown. This may cause data to be partially + * sent even when 'wholedata' was requested if the OS only sent partial + * buffer. + */ + if (asock->shutdown & SHUT_TX) + return; + if (bytes_sent > 0 && op_key->activesock_data) { /* whole_data is requested. Make sure we send all the data */ struct send_data *sd = (struct send_data*)op_key->activesock_data; @@ -756,6 +786,10 @@ PJ_DEF(pj_status_t) pj_activesock_start_accept(pj_activesock_t *asock, PJ_ASSERT_RETURN(asock, PJ_EINVAL); PJ_ASSERT_RETURN(asock->accept_op==NULL, PJ_EINVALIDOP); + /* Ignore if we've been shutdown */ + if (asock->shutdown) + return PJ_EINVALIDOP; + asock->accept_op = (struct accept_op*) pj_pool_calloc(pool, asock->async_count, sizeof(struct accept_op)); @@ -798,6 +832,10 @@ static void ioqueue_on_accept_complete(pj_ioqueue_key_t *key, PJ_UNUSED_ARG(new_sock); + /* Ignore if we've been shutdown */ + if (asock->shutdown) + return; + do { if (status == asock->last_err && status != PJ_SUCCESS) { asock->err_counter++; @@ -835,6 +873,10 @@ static void ioqueue_on_accept_complete(pj_ioqueue_key_t *key, pj_sock_close(accept_op->new_sock); } + /* Don't start another accept() if we've been shutdown */ + if (asock->shutdown) + return; + /* Prepare next accept() */ accept_op->new_sock = PJ_INVALID_SOCKET; accept_op->rem_addr_len = sizeof(accept_op->rem_addr); @@ -853,6 +895,10 @@ PJ_DEF(pj_status_t) pj_activesock_start_connect( pj_activesock_t *asock, int addr_len) { PJ_UNUSED_ARG(pool); + + if (asock->shutdown) + return PJ_EINVALIDOP; + return pj_ioqueue_connect(asock->key, remaddr, addr_len); } @@ -861,6 +907,10 @@ static void ioqueue_on_connect_complete(pj_ioqueue_key_t *key, { pj_activesock_t *asock = (pj_activesock_t*) pj_ioqueue_get_user_data(key); + /* Ignore if we've been shutdown */ + if (asock->shutdown) + return; + if (asock->cb.on_connect_complete) { pj_bool_t ret; -- cgit v1.2.3