diff options
author | Benny Prijono <bennylp@teluu.com> | 2005-11-09 15:37:19 +0000 |
---|---|---|
committer | Benny Prijono <bennylp@teluu.com> | 2005-11-09 15:37:19 +0000 |
commit | 6e1024262b48b57b771331b8c19e988e43627bd7 (patch) | |
tree | a43fdaeb6d7b22cc7afab1633622bf55d39dfd67 /pjlib/src/pj/ioqueue_common_abs.c | |
parent | fb9e3b3a6649cc5cbe0c6747cb1918f3be71ba06 (diff) |
Rework pjlib++
git-svn-id: http://svn.pjsip.org/repos/pjproject/main@36 74dad513-b988-da41-8d7b-12977e46ad98
Diffstat (limited to 'pjlib/src/pj/ioqueue_common_abs.c')
-rw-r--r-- | pjlib/src/pj/ioqueue_common_abs.c | 115 |
1 files changed, 103 insertions, 12 deletions
diff --git a/pjlib/src/pj/ioqueue_common_abs.c b/pjlib/src/pj/ioqueue_common_abs.c index 4cffcae4..75774ede 100644 --- a/pjlib/src/pj/ioqueue_common_abs.c +++ b/pjlib/src/pj/ioqueue_common_abs.c @@ -227,7 +227,9 @@ void ioqueue_dispatch_write_event(pj_ioqueue_t *ioqueue, pj_ioqueue_key_t *h) * so that send() can work in parallel. */ if (h->fd_type == PJ_SOCK_DGRAM) { - pj_list_erase(write_op); + pj_list_erase(write_op);
+ write_op->op = 0;
+ if (pj_list_empty(&h->write_list)) ioqueue_remove_from_set(ioqueue, h->fd, WRITEABLE_EVENT); @@ -267,7 +269,8 @@ void ioqueue_dispatch_write_event(pj_ioqueue_t *ioqueue, pj_ioqueue_key_t *h) { if (h->fd_type != PJ_SOCK_DGRAM) { /* Write completion of the whole stream. */ - pj_list_erase(write_op); + pj_list_erase(write_op);
+ write_op->op = 0; /* Clear operation if there's no more data to send. */ if (pj_list_empty(&h->write_list)) @@ -313,7 +316,8 @@ void ioqueue_dispatch_read_event( pj_ioqueue_t *ioqueue, pj_ioqueue_key_t *h ) /* Get one accept operation from the list. */ accept_op = h->accept_list.next; - pj_list_erase(accept_op); + pj_list_erase(accept_op);
+ accept_op->op = 0; /* Clear bit in fdset if there is no more pending accept */ if (pj_list_empty(&h->accept_list)) @@ -346,7 +350,8 @@ void ioqueue_dispatch_read_event( pj_ioqueue_t *ioqueue, pj_ioqueue_key_t *h ) /* Get one pending read operation from the list. */ read_op = h->read_list.next; - pj_list_erase(read_op); + pj_list_erase(read_op);
+ read_op->op = 0; /* Clear fdset if there is no pending read. */ if (pj_list_empty(&h->read_list)) @@ -475,6 +480,9 @@ PJ_DEF(pj_status_t) pj_ioqueue_recv( pj_ioqueue_key_t *key, PJ_ASSERT_RETURN(key && op_key && buffer && length, PJ_EINVAL); PJ_CHECK_STACK(); +
+ read_op = (struct read_operation*)op_key;
+ read_op->op = 0;
/* Try to see if there's data immediately available. */ @@ -496,8 +504,6 @@ PJ_DEF(pj_status_t) pj_ioqueue_recv( pj_ioqueue_key_t *key, * No data is immediately available. * Must schedule asynchronous operation to the ioqueue. */ - read_op = (struct read_operation*)op_key; - read_op->op = PJ_IOQUEUE_OP_RECV; read_op->buf = buffer; read_op->size = *length; @@ -530,6 +536,9 @@ PJ_DEF(pj_status_t) pj_ioqueue_recvfrom( pj_ioqueue_key_t *key, PJ_ASSERT_RETURN(key && op_key && buffer && length, PJ_EINVAL); PJ_CHECK_STACK(); +
+ read_op = (struct read_operation*)op_key;
+ read_op->op = 0;
/* Try to see if there's data immediately available. */ @@ -552,8 +561,6 @@ PJ_DEF(pj_status_t) pj_ioqueue_recvfrom( pj_ioqueue_key_t *key, * No data is immediately available. * Must schedule asynchronous operation to the ioqueue. */ - read_op = (struct read_operation*)op_key; - read_op->op = PJ_IOQUEUE_OP_RECV_FROM; read_op->buf = buffer; read_op->size = *length; @@ -586,6 +593,9 @@ PJ_DEF(pj_status_t) pj_ioqueue_send( pj_ioqueue_key_t *key, PJ_ASSERT_RETURN(key && op_key && data && length, PJ_EINVAL); PJ_CHECK_STACK(); +
+ write_op = (struct write_operation*)op_key;
+ write_op->op = 0;
/* Fast track: * Try to send data immediately, only if there's no pending write! @@ -624,7 +634,6 @@ PJ_DEF(pj_status_t) pj_ioqueue_send( pj_ioqueue_key_t *key, /* * Schedule asynchronous send. */ - write_op = (struct write_operation*)op_key; write_op->op = PJ_IOQUEUE_OP_SEND; write_op->buf = (void*)data; write_op->size = *length; @@ -659,6 +668,9 @@ PJ_DEF(pj_status_t) pj_ioqueue_sendto( pj_ioqueue_key_t *key, PJ_ASSERT_RETURN(key && op_key && data && length, PJ_EINVAL); PJ_CHECK_STACK(); +
+ write_op = (struct write_operation*)op_key;
+ write_op->op = 0;
/* Fast track: * Try to send data immediately, only if there's no pending write! @@ -702,7 +714,6 @@ PJ_DEF(pj_status_t) pj_ioqueue_sendto( pj_ioqueue_key_t *key, /* * Schedule asynchronous send. */ - write_op = (struct write_operation*)op_key; write_op->op = PJ_IOQUEUE_OP_SEND_TO; write_op->buf = (void*)data; write_op->size = *length; @@ -735,6 +746,9 @@ PJ_DEF(pj_status_t) pj_ioqueue_accept( pj_ioqueue_key_t *key, /* check parameters. All must be specified! */ PJ_ASSERT_RETURN(key && op_key && new_sock, PJ_EINVAL); +
+ accept_op = (struct accept_operation*)op_key;
+ accept_op->op = 0;
/* Fast track: * See if there's new connection available immediately. @@ -767,8 +781,6 @@ PJ_DEF(pj_status_t) pj_ioqueue_accept( pj_ioqueue_key_t *key, * Schedule accept() operation to be completed when there is incoming * connection available. */ - accept_op = (struct accept_operation*)op_key; - accept_op->op = PJ_IOQUEUE_OP_ACCEPT; accept_op->accept_fd = new_sock; accept_op->rmt_addr = remote; @@ -821,3 +833,82 @@ PJ_DEF(pj_status_t) pj_ioqueue_connect( pj_ioqueue_key_t *key, } #endif /* PJ_HAS_TCP */ +/*
+ * pj_ioqueue_is_pending()
+ */
+PJ_DEF(pj_bool_t) pj_ioqueue_is_pending( pj_ioqueue_key_t *key,
+ pj_ioqueue_op_key_t *op_key )
+{
+ struct generic_operation *op_rec;
+
+ PJ_UNUSED_ARG(key);
+
+ op_rec = (struct generic_operation*)op_key;
+ return op_rec->op != 0;
+}
+
+
+/*
+ * pj_ioqueue_post_completion()
+ */
+PJ_DEF(pj_status_t) pj_ioqueue_post_completion( pj_ioqueue_key_t *key,
+ pj_ioqueue_op_key_t *op_key,
+ pj_ssize_t bytes_status )
+{
+ struct generic_operation *op_rec;
+
+ /*
+ * Find the operation key in all pending operation list to
+ * really make sure that it's still there; then call the callback.
+ */
+ pj_mutex_lock(key->mutex);
+
+ /* Find the operation in the pending read list. */
+ op_rec = (struct generic_operation*)key->read_list.next;
+ while (op_rec != (void*)&key->read_list) {
+ if (op_rec == (void*)op_key) {
+ pj_list_erase(op_rec);
+ op_rec->op = 0;
+ pj_mutex_unlock(key->mutex);
+
+ (*key->cb.on_read_complete)(key, op_key, bytes_status);
+ return PJ_SUCCESS;
+ }
+ op_rec = op_rec->next;
+ }
+
+ /* Find the operation in the pending write list. */
+ op_rec = (struct generic_operation*)key->write_list.next;
+ while (op_rec != (void*)&key->write_list) {
+ if (op_rec == (void*)op_key) {
+ pj_list_erase(op_rec);
+ op_rec->op = 0;
+ pj_mutex_unlock(key->mutex);
+
+ (*key->cb.on_write_complete)(key, op_key, bytes_status);
+ return PJ_SUCCESS;
+ }
+ op_rec = op_rec->next;
+ }
+
+ /* Find the operation in the pending accept list. */
+ op_rec = (struct generic_operation*)key->accept_list.next;
+ while (op_rec != (void*)&key->accept_list) {
+ if (op_rec == (void*)op_key) {
+ pj_list_erase(op_rec);
+ op_rec->op = 0;
+ pj_mutex_unlock(key->mutex);
+
+ (*key->cb.on_accept_complete)(key, op_key,
+ PJ_INVALID_SOCKET,
+ bytes_status);
+ return PJ_SUCCESS;
+ }
+ op_rec = op_rec->next;
+ }
+
+ pj_mutex_unlock(key->mutex);
+
+ return PJ_EINVALIDOP;
+}
+
|