summaryrefslogtreecommitdiff
path: root/pjlib/src/pj/ioqueue_common_abs.c
diff options
context:
space:
mode:
authorBenny Prijono <bennylp@teluu.com>2005-11-09 15:37:19 +0000
committerBenny Prijono <bennylp@teluu.com>2005-11-09 15:37:19 +0000
commit6e1024262b48b57b771331b8c19e988e43627bd7 (patch)
treea43fdaeb6d7b22cc7afab1633622bf55d39dfd67 /pjlib/src/pj/ioqueue_common_abs.c
parentfb9e3b3a6649cc5cbe0c6747cb1918f3be71ba06 (diff)
Rework pjlib++
git-svn-id: http://svn.pjsip.org/repos/pjproject/main@36 74dad513-b988-da41-8d7b-12977e46ad98
Diffstat (limited to 'pjlib/src/pj/ioqueue_common_abs.c')
-rw-r--r--pjlib/src/pj/ioqueue_common_abs.c115
1 files changed, 103 insertions, 12 deletions
diff --git a/pjlib/src/pj/ioqueue_common_abs.c b/pjlib/src/pj/ioqueue_common_abs.c
index 4cffcae4..75774ede 100644
--- a/pjlib/src/pj/ioqueue_common_abs.c
+++ b/pjlib/src/pj/ioqueue_common_abs.c
@@ -227,7 +227,9 @@ void ioqueue_dispatch_write_event(pj_ioqueue_t *ioqueue, pj_ioqueue_key_t *h)
* so that send() can work in parallel.
*/
if (h->fd_type == PJ_SOCK_DGRAM) {
- pj_list_erase(write_op);
+ pj_list_erase(write_op);
+ write_op->op = 0;
+
if (pj_list_empty(&h->write_list))
ioqueue_remove_from_set(ioqueue, h->fd, WRITEABLE_EVENT);
@@ -267,7 +269,8 @@ void ioqueue_dispatch_write_event(pj_ioqueue_t *ioqueue, pj_ioqueue_key_t *h)
{
if (h->fd_type != PJ_SOCK_DGRAM) {
/* Write completion of the whole stream. */
- pj_list_erase(write_op);
+ pj_list_erase(write_op);
+ write_op->op = 0;
/* Clear operation if there's no more data to send. */
if (pj_list_empty(&h->write_list))
@@ -313,7 +316,8 @@ void ioqueue_dispatch_read_event( pj_ioqueue_t *ioqueue, pj_ioqueue_key_t *h )
/* Get one accept operation from the list. */
accept_op = h->accept_list.next;
- pj_list_erase(accept_op);
+ pj_list_erase(accept_op);
+ accept_op->op = 0;
/* Clear bit in fdset if there is no more pending accept */
if (pj_list_empty(&h->accept_list))
@@ -346,7 +350,8 @@ void ioqueue_dispatch_read_event( pj_ioqueue_t *ioqueue, pj_ioqueue_key_t *h )
/* Get one pending read operation from the list. */
read_op = h->read_list.next;
- pj_list_erase(read_op);
+ pj_list_erase(read_op);
+ read_op->op = 0;
/* Clear fdset if there is no pending read. */
if (pj_list_empty(&h->read_list))
@@ -475,6 +480,9 @@ PJ_DEF(pj_status_t) pj_ioqueue_recv( pj_ioqueue_key_t *key,
PJ_ASSERT_RETURN(key && op_key && buffer && length, PJ_EINVAL);
PJ_CHECK_STACK();
+
+ read_op = (struct read_operation*)op_key;
+ read_op->op = 0;
/* Try to see if there's data immediately available.
*/
@@ -496,8 +504,6 @@ PJ_DEF(pj_status_t) pj_ioqueue_recv( pj_ioqueue_key_t *key,
* No data is immediately available.
* Must schedule asynchronous operation to the ioqueue.
*/
- read_op = (struct read_operation*)op_key;
-
read_op->op = PJ_IOQUEUE_OP_RECV;
read_op->buf = buffer;
read_op->size = *length;
@@ -530,6 +536,9 @@ PJ_DEF(pj_status_t) pj_ioqueue_recvfrom( pj_ioqueue_key_t *key,
PJ_ASSERT_RETURN(key && op_key && buffer && length, PJ_EINVAL);
PJ_CHECK_STACK();
+
+ read_op = (struct read_operation*)op_key;
+ read_op->op = 0;
/* Try to see if there's data immediately available.
*/
@@ -552,8 +561,6 @@ PJ_DEF(pj_status_t) pj_ioqueue_recvfrom( pj_ioqueue_key_t *key,
* No data is immediately available.
* Must schedule asynchronous operation to the ioqueue.
*/
- read_op = (struct read_operation*)op_key;
-
read_op->op = PJ_IOQUEUE_OP_RECV_FROM;
read_op->buf = buffer;
read_op->size = *length;
@@ -586,6 +593,9 @@ PJ_DEF(pj_status_t) pj_ioqueue_send( pj_ioqueue_key_t *key,
PJ_ASSERT_RETURN(key && op_key && data && length, PJ_EINVAL);
PJ_CHECK_STACK();
+
+ write_op = (struct write_operation*)op_key;
+ write_op->op = 0;
/* Fast track:
* Try to send data immediately, only if there's no pending write!
@@ -624,7 +634,6 @@ PJ_DEF(pj_status_t) pj_ioqueue_send( pj_ioqueue_key_t *key,
/*
* Schedule asynchronous send.
*/
- write_op = (struct write_operation*)op_key;
write_op->op = PJ_IOQUEUE_OP_SEND;
write_op->buf = (void*)data;
write_op->size = *length;
@@ -659,6 +668,9 @@ PJ_DEF(pj_status_t) pj_ioqueue_sendto( pj_ioqueue_key_t *key,
PJ_ASSERT_RETURN(key && op_key && data && length, PJ_EINVAL);
PJ_CHECK_STACK();
+
+ write_op = (struct write_operation*)op_key;
+ write_op->op = 0;
/* Fast track:
* Try to send data immediately, only if there's no pending write!
@@ -702,7 +714,6 @@ PJ_DEF(pj_status_t) pj_ioqueue_sendto( pj_ioqueue_key_t *key,
/*
* Schedule asynchronous send.
*/
- write_op = (struct write_operation*)op_key;
write_op->op = PJ_IOQUEUE_OP_SEND_TO;
write_op->buf = (void*)data;
write_op->size = *length;
@@ -735,6 +746,9 @@ PJ_DEF(pj_status_t) pj_ioqueue_accept( pj_ioqueue_key_t *key,
/* check parameters. All must be specified! */
PJ_ASSERT_RETURN(key && op_key && new_sock, PJ_EINVAL);
+
+ accept_op = (struct accept_operation*)op_key;
+ accept_op->op = 0;
/* Fast track:
* See if there's new connection available immediately.
@@ -767,8 +781,6 @@ PJ_DEF(pj_status_t) pj_ioqueue_accept( pj_ioqueue_key_t *key,
* Schedule accept() operation to be completed when there is incoming
* connection available.
*/
- accept_op = (struct accept_operation*)op_key;
-
accept_op->op = PJ_IOQUEUE_OP_ACCEPT;
accept_op->accept_fd = new_sock;
accept_op->rmt_addr = remote;
@@ -821,3 +833,82 @@ PJ_DEF(pj_status_t) pj_ioqueue_connect( pj_ioqueue_key_t *key,
}
#endif /* PJ_HAS_TCP */
+/*
+ * pj_ioqueue_is_pending()
+ */
+PJ_DEF(pj_bool_t) pj_ioqueue_is_pending( pj_ioqueue_key_t *key,
+ pj_ioqueue_op_key_t *op_key )
+{
+ struct generic_operation *op_rec;
+
+ PJ_UNUSED_ARG(key);
+
+ op_rec = (struct generic_operation*)op_key;
+ return op_rec->op != 0;
+}
+
+
+/*
+ * pj_ioqueue_post_completion()
+ */
+PJ_DEF(pj_status_t) pj_ioqueue_post_completion( pj_ioqueue_key_t *key,
+ pj_ioqueue_op_key_t *op_key,
+ pj_ssize_t bytes_status )
+{
+ struct generic_operation *op_rec;
+
+ /*
+ * Find the operation key in all pending operation list to
+ * really make sure that it's still there; then call the callback.
+ */
+ pj_mutex_lock(key->mutex);
+
+ /* Find the operation in the pending read list. */
+ op_rec = (struct generic_operation*)key->read_list.next;
+ while (op_rec != (void*)&key->read_list) {
+ if (op_rec == (void*)op_key) {
+ pj_list_erase(op_rec);
+ op_rec->op = 0;
+ pj_mutex_unlock(key->mutex);
+
+ (*key->cb.on_read_complete)(key, op_key, bytes_status);
+ return PJ_SUCCESS;
+ }
+ op_rec = op_rec->next;
+ }
+
+ /* Find the operation in the pending write list. */
+ op_rec = (struct generic_operation*)key->write_list.next;
+ while (op_rec != (void*)&key->write_list) {
+ if (op_rec == (void*)op_key) {
+ pj_list_erase(op_rec);
+ op_rec->op = 0;
+ pj_mutex_unlock(key->mutex);
+
+ (*key->cb.on_write_complete)(key, op_key, bytes_status);
+ return PJ_SUCCESS;
+ }
+ op_rec = op_rec->next;
+ }
+
+ /* Find the operation in the pending accept list. */
+ op_rec = (struct generic_operation*)key->accept_list.next;
+ while (op_rec != (void*)&key->accept_list) {
+ if (op_rec == (void*)op_key) {
+ pj_list_erase(op_rec);
+ op_rec->op = 0;
+ pj_mutex_unlock(key->mutex);
+
+ (*key->cb.on_accept_complete)(key, op_key,
+ PJ_INVALID_SOCKET,
+ bytes_status);
+ return PJ_SUCCESS;
+ }
+ op_rec = op_rec->next;
+ }
+
+ pj_mutex_unlock(key->mutex);
+
+ return PJ_EINVALIDOP;
+}
+