diff options
author | Benny Prijono <bennylp@teluu.com> | 2005-11-09 15:37:19 +0000 |
---|---|---|
committer | Benny Prijono <bennylp@teluu.com> | 2005-11-09 15:37:19 +0000 |
commit | 6e1024262b48b57b771331b8c19e988e43627bd7 (patch) | |
tree | a43fdaeb6d7b22cc7afab1633622bf55d39dfd67 /pjlib/src | |
parent | fb9e3b3a6649cc5cbe0c6747cb1918f3be71ba06 (diff) |
Rework pjlib++
git-svn-id: http://svn.pjsip.org/repos/pjproject/main@36 74dad513-b988-da41-8d7b-12977e46ad98
Diffstat (limited to 'pjlib/src')
-rw-r--r-- | pjlib/src/pj++/compiletest.cpp | 46 | ||||
-rw-r--r-- | pjlib/src/pj++/pj++.cpp | 17 | ||||
-rw-r--r-- | pjlib/src/pj++/proactor.cpp | 298 | ||||
-rw-r--r-- | pjlib/src/pj/errno.c | 3 | ||||
-rw-r--r-- | pjlib/src/pj/file_io_ansi.c | 6 | ||||
-rw-r--r-- | pjlib/src/pj/ioqueue_common_abs.c | 115 | ||||
-rw-r--r-- | pjlib/src/pj/ioqueue_winnt.c | 34 | ||||
-rw-r--r-- | pjlib/src/pj/os_core_unix.c | 75 | ||||
-rw-r--r-- | pjlib/src/pj/os_core_win32.c | 69 | ||||
-rw-r--r-- | pjlib/src/pj/timer.c | 96 | ||||
-rw-r--r-- | pjlib/src/pjlib++-test/main.cpp | 29 |
11 files changed, 342 insertions, 446 deletions
diff --git a/pjlib/src/pj++/compiletest.cpp b/pjlib/src/pj++/compiletest.cpp deleted file mode 100644 index 5bc4f8bd..00000000 --- a/pjlib/src/pj++/compiletest.cpp +++ /dev/null @@ -1,46 +0,0 @@ -/* $Id$ - * - */
-#include <pjlib++.hpp>
-
-
-#if 0
-struct MyNode
-{
- PJ_DECL_LIST_MEMBER(struct MyNode)
- int data;
-};
-
-int test()
-{
- typedef PJ_List<MyNode> MyList;
- MyList list;
- MyList::iterator it, end = list.end();
-
- for (it=list.begin(); it!=end; ++it) {
- MyNode *n = *it;
- }
-
- return 0;
-}
-
-int test_scan()
-{
- PJ_Scanner scan;
- PJ_String s;
- PJ_CharSpec cs;
-
- scan.get(&cs, &s);
- return 0;
-}
-
-int test_scan_c()
-{
- pj_scanner scan;
- pj_str_t s;
- pj_char_spec cs;
-
- pj_scan_get(&scan, cs, &s);
- return 0;
-}
-#endif
diff --git a/pjlib/src/pj++/pj++.cpp b/pjlib/src/pj++/pj++.cpp deleted file mode 100644 index 45442098..00000000 --- a/pjlib/src/pj++/pj++.cpp +++ /dev/null @@ -1,17 +0,0 @@ -/* $Id$ - * - */
-#include <pj++/scanner.hpp>
-#include <pj++/timer.hpp>
-#include <pj/except.h>
-
-void PJ_Scanner::syntax_error_handler_throw_pj(pj_scanner *)
-{
- PJ_THROW( PJ_Scanner::SYNTAX_ERROR );
-}
-
-void PJ_Timer_Entry::timer_heap_callback(pj_timer_heap_t *, pj_timer_entry *e)
-{
- PJ_Timer_Entry *entry = static_cast<PJ_Timer_Entry*>(e);
- entry->on_timeout();
-}
diff --git a/pjlib/src/pj++/proactor.cpp b/pjlib/src/pj++/proactor.cpp deleted file mode 100644 index dba9370b..00000000 --- a/pjlib/src/pj++/proactor.cpp +++ /dev/null @@ -1,298 +0,0 @@ -/* $Id$ - * - */
-#include <pj++/proactor.hpp>
-#include <pj/string.h> // memset
-
-static struct pj_ioqueue_callback ioqueue_cb =
-{
- &PJ_Event_Handler::read_complete_cb,
- &PJ_Event_Handler::write_complete_cb,
- &PJ_Event_Handler::accept_complete_cb,
- &PJ_Event_Handler::connect_complete_cb,
-};
-
-PJ_Event_Handler::PJ_Event_Handler()
-: proactor_(NULL), key_(NULL)
-{
- pj_memset(&timer_, 0, sizeof(timer_));
- timer_.user_data = this;
- timer_.cb = &timer_callback;
-}
-
-PJ_Event_Handler::~PJ_Event_Handler()
-{
-}
-
-#if PJ_HAS_TCP
-bool PJ_Event_Handler::connect(const PJ_INET_Addr &addr)
-{
- pj_assert(key_ != NULL && proactor_ != NULL);
-
- if (key_ == NULL || proactor_ == NULL)
- return false;
-
- int status = pj_ioqueue_connect(proactor_->get_io_queue(), key_,
- &addr, sizeof(PJ_INET_Addr));
- if (status == 0) {
- on_connect_complete(0);
- return true;
- } else if (status == PJ_IOQUEUE_PENDING) {
- return true;
- } else {
- return false;
- }
-}
-
-bool PJ_Event_Handler::accept(PJ_Socket *sock, PJ_INET_Addr *local, PJ_INET_Addr *remote)
-{
- pj_assert(key_ != NULL && proactor_ != NULL);
-
- if (key_ == NULL || proactor_ == NULL)
- return false;
-
- int status = pj_ioqueue_accept(proactor_->get_io_queue(), key_,
- &sock->get_handle(),
- local_addr, remote,
- (remote? sizeof(*remote) : 0));
- if (status == 0) {
- on_accept_complete(0);
- return true;
- } else if (status == PJ_IOQUEUE_PENDING) {
- return true;
- } else {
- return false;
- }
-}
-
-#endif
-
-bool PJ_Event_Handler::read(void *buf, pj_size_t len)
-{
- pj_assert(key_ != NULL && proactor_ != NULL);
-
- if (key_ == NULL || proactor_ == NULL)
- return false;
-
- int bytes_status = pj_ioqueue_read(proactor_->get_io_queue(),
- key_, buf, len);
- if (bytes_status >= 0) {
- on_read_complete(bytes_status);
- return true;
- } else if (bytes_status == PJ_IOQUEUE_PENDING) {
- return true;
- } else {
- return false;
- }
-}
-
-bool PJ_Event_Handler::recvfrom(void *buf, pj_size_t len, PJ_INET_Addr *addr)
-{
- pj_assert(key_ != NULL && proactor_ != NULL);
-
- if (key_ == NULL || proactor_ == NULL)
- return false;
-
-
- tmp_recvfrom_addr_len = sizeof(PJ_INET_Addr);
-
- int bytes_status = pj_ioqueue_recvfrom(proactor_->get_io_queue(),
- key_, buf, len,
- addr,
- (addr? &tmp_recvfrom_addr_len : NULL));
- if (bytes_status >= 0) {
- on_read_complete(bytes_status);
- return true;
- } else if (bytes_status == PJ_IOQUEUE_PENDING) {
- return true;
- } else {
- return false;
- }
-}
-
-bool PJ_Event_Handler::write(const void *data, pj_size_t len)
-{
- pj_assert(key_ != NULL && proactor_ != NULL);
-
- if (key_ == NULL || proactor_ == NULL)
- return false;
-
- int bytes_status = pj_ioqueue_write(proactor_->get_io_queue(),
- key_, data, len);
- if (bytes_status >= 0) {
- on_write_complete(bytes_status);
- return true;
- } else if (bytes_status == PJ_IOQUEUE_PENDING) {
- return true;
- } else {
- return false;
- }
-}
-
-bool PJ_Event_Handler::sendto(const void *data, pj_size_t len, const PJ_INET_Addr &addr)
-{
- pj_assert(key_ != NULL && proactor_ != NULL);
-
- if (key_ == NULL || proactor_ == NULL)
- return false;
-
- int bytes_status = pj_ioqueue_sendto(proactor_->get_io_queue(),
- key_, data, len,
- &addr, sizeof(PJ_INET_Addr));
- if (bytes_status >= 0) {
- on_write_complete(bytes_status);
- return true;
- } else if (bytes_status == PJ_IOQUEUE_PENDING) {
- return true;
- } else {
- return false;
- }
-}
-
-
-void PJ_Event_Handler::read_complete_cb(pj_ioqueue_key_t *key, pj_ssize_t bytes_read)
-{
- PJ_Event_Handler *handler =
- (PJ_Event_Handler*) pj_ioqueue_get_user_data(key);
-
- handler->on_read_complete(bytes_read);
-}
-
-void PJ_Event_Handler::write_complete_cb(pj_ioqueue_key_t *key, pj_ssize_t bytes_sent)
-{
- PJ_Event_Handler *handler =
- (PJ_Event_Handler*) pj_ioqueue_get_user_data(key);
-
- handler->on_write_complete(bytes_sent);
-}
-
-void PJ_Event_Handler::accept_complete_cb(pj_ioqueue_key_t *key, int status)
-{
-#if PJ_HAS_TCP
- PJ_Event_Handler *handler =
- (PJ_Event_Handler*) pj_ioqueue_get_user_data(key);
-
- handler->on_accept_complete(status);
-#endif
-}
-
-void PJ_Event_Handler::connect_complete_cb(pj_ioqueue_key_t *key, int status)
-{
-#if PJ_HAS_TCP
- PJ_Event_Handler *handler =
- (PJ_Event_Handler*) pj_ioqueue_get_user_data(key);
-
- handler->on_connect_complete(status);
-#endif
-}
-
-void PJ_Event_Handler::timer_callback( pj_timer_heap_t *timer_heap,
- struct pj_timer_entry *entry)
-{
- PJ_Event_Handler *handler = (PJ_Event_Handler*) entry->user_data;
- handler->on_timeout(entry->id);
-}
-
-
-PJ_Proactor *PJ_Proactor::create(PJ_Pool *pool, pj_size_t max_fd,
- pj_size_t timer_entry_count, unsigned timer_flags)
-{
- PJ_Proactor *p = (PJ_Proactor*) pool->calloc(1, sizeof(PJ_Proactor));
- if (!p) return NULL;
-
- p->ioq_ = pj_ioqueue_create(pool->pool_(), max_fd);
- if (!p->ioq_) return NULL;
-
- p->th_ = pj_timer_heap_create(pool->pool_(), timer_entry_count, timer_flags);
- if (!p->th_) return NULL;
-
- return p;
-}
-
-void PJ_Proactor::destroy()
-{
- pj_ioqueue_destroy(ioq_);
-}
-
-bool PJ_Proactor::register_handler(PJ_Pool *pool, PJ_Event_Handler *handler)
-{
- pj_assert(handler->key_ == NULL && handler->proactor_ == NULL);
-
- if (handler->key_ != NULL)
- return false;
-
- handler->key_ = pj_ioqueue_register_sock(pool->pool_(), ioq_,
- handler->get_handle(),
- handler, &ioqueue_cb);
- if (handler->key_ != NULL) {
- handler->proactor_ = this;
- return true;
- } else {
- return false;
- }
-}
-
-void PJ_Proactor::unregister_handler(PJ_Event_Handler *handler)
-{
- if (handler->key_ == NULL) return;
- pj_ioqueue_unregister(ioq_, handler->key_);
- handler->key_ = NULL;
- handler->proactor_ = NULL;
-}
-
-bool PJ_Proactor::schedule_timer( pj_timer_heap_t *timer, PJ_Event_Handler *handler,
- const PJ_Time_Val &delay, int id)
-{
- handler->timer_.id = id;
- return pj_timer_heap_schedule(timer, &handler->timer_, &delay) == 0;
-}
-
-bool PJ_Proactor::schedule_timer(PJ_Event_Handler *handler, const PJ_Time_Val &delay,
- int id)
-{
- return schedule_timer(th_, handler, delay, id);
-}
-
-bool PJ_Proactor::cancel_timer(PJ_Event_Handler *handler)
-{
- return pj_timer_heap_cancel(th_, &handler->timer_) == 1;
-}
-
-bool PJ_Proactor::handle_events(PJ_Time_Val *max_timeout)
-{
- pj_time_val timeout;
-
- timeout.sec = timeout.msec = 0; /* timeout is 'out' var. */
-
- if (pj_timer_heap_poll( th_, &timeout ) > 0)
- return true;
-
- if (timeout.sec < 0) timeout.sec = PJ_MAXINT32;
-
- /* If caller specifies maximum time to wait, then compare the value with
- * the timeout to wait from timer, and use the minimum value.
- */
- if (max_timeout && PJ_TIME_VAL_GT(timeout, *max_timeout)) {
- timeout = *max_timeout;
- }
-
- /* Poll events in ioqueue. */
- int result;
-
- result = pj_ioqueue_poll(ioq_, &timeout);
- if (result != 1)
- return false;
-
- return true;
-}
-
-pj_ioqueue_t *PJ_Proactor::get_io_queue()
-{
- return ioq_;
-}
-
-pj_timer_heap_t *PJ_Proactor::get_timer_heap()
-{
- return th_;
-}
-
diff --git a/pjlib/src/pj/errno.c b/pjlib/src/pj/errno.c index 682d9b6c..25bf5b7f 100644 --- a/pjlib/src/pj/errno.c +++ b/pjlib/src/pj/errno.c @@ -29,7 +29,8 @@ static const struct { PJ_ETOOMANY, "Too many objects of the specified type"}, { PJ_EBUSY, "Object is busy"}, { PJ_ENOTSUP, "Option/operation is not supported"}, - { PJ_EINVALIDOP, "Invalid operation"} + { PJ_EINVALIDOP, "Invalid operation"},
+ { PJ_ECANCELLED, "Operation cancelled"} }; /* diff --git a/pjlib/src/pj/file_io_ansi.c b/pjlib/src/pj/file_io_ansi.c index f95c74a9..0946eddc 100644 --- a/pjlib/src/pj/file_io_ansi.c +++ b/pjlib/src/pj/file_io_ansi.c @@ -66,7 +66,8 @@ PJ_DEF(pj_status_t) pj_file_write( pj_oshandle_t fd, clearerr((FILE*)fd); written = fwrite(data, 1, *size, (FILE*)fd); - if (ferror((FILE*)fd)) { + if (ferror((FILE*)fd)) {
+ *size = -1; return PJ_RETURN_OS_ERROR(errno); } @@ -82,7 +83,8 @@ PJ_DEF(pj_status_t) pj_file_read( pj_oshandle_t fd, clearerr((FILE*)fd); bytes = fread(data, 1, *size, (FILE*)fd); - if (ferror((FILE*)fd)) { + if (ferror((FILE*)fd)) {
+ *size = -1; return PJ_RETURN_OS_ERROR(errno); } diff --git a/pjlib/src/pj/ioqueue_common_abs.c b/pjlib/src/pj/ioqueue_common_abs.c index 4cffcae4..75774ede 100644 --- a/pjlib/src/pj/ioqueue_common_abs.c +++ b/pjlib/src/pj/ioqueue_common_abs.c @@ -227,7 +227,9 @@ void ioqueue_dispatch_write_event(pj_ioqueue_t *ioqueue, pj_ioqueue_key_t *h) * so that send() can work in parallel. */ if (h->fd_type == PJ_SOCK_DGRAM) { - pj_list_erase(write_op); + pj_list_erase(write_op);
+ write_op->op = 0;
+ if (pj_list_empty(&h->write_list)) ioqueue_remove_from_set(ioqueue, h->fd, WRITEABLE_EVENT); @@ -267,7 +269,8 @@ void ioqueue_dispatch_write_event(pj_ioqueue_t *ioqueue, pj_ioqueue_key_t *h) { if (h->fd_type != PJ_SOCK_DGRAM) { /* Write completion of the whole stream. */ - pj_list_erase(write_op); + pj_list_erase(write_op);
+ write_op->op = 0; /* Clear operation if there's no more data to send. */ if (pj_list_empty(&h->write_list)) @@ -313,7 +316,8 @@ void ioqueue_dispatch_read_event( pj_ioqueue_t *ioqueue, pj_ioqueue_key_t *h ) /* Get one accept operation from the list. */ accept_op = h->accept_list.next; - pj_list_erase(accept_op); + pj_list_erase(accept_op);
+ accept_op->op = 0; /* Clear bit in fdset if there is no more pending accept */ if (pj_list_empty(&h->accept_list)) @@ -346,7 +350,8 @@ void ioqueue_dispatch_read_event( pj_ioqueue_t *ioqueue, pj_ioqueue_key_t *h ) /* Get one pending read operation from the list. */ read_op = h->read_list.next; - pj_list_erase(read_op); + pj_list_erase(read_op);
+ read_op->op = 0; /* Clear fdset if there is no pending read. */ if (pj_list_empty(&h->read_list)) @@ -475,6 +480,9 @@ PJ_DEF(pj_status_t) pj_ioqueue_recv( pj_ioqueue_key_t *key, PJ_ASSERT_RETURN(key && op_key && buffer && length, PJ_EINVAL); PJ_CHECK_STACK(); +
+ read_op = (struct read_operation*)op_key;
+ read_op->op = 0;
/* Try to see if there's data immediately available. */ @@ -496,8 +504,6 @@ PJ_DEF(pj_status_t) pj_ioqueue_recv( pj_ioqueue_key_t *key, * No data is immediately available. * Must schedule asynchronous operation to the ioqueue. */ - read_op = (struct read_operation*)op_key; - read_op->op = PJ_IOQUEUE_OP_RECV; read_op->buf = buffer; read_op->size = *length; @@ -530,6 +536,9 @@ PJ_DEF(pj_status_t) pj_ioqueue_recvfrom( pj_ioqueue_key_t *key, PJ_ASSERT_RETURN(key && op_key && buffer && length, PJ_EINVAL); PJ_CHECK_STACK(); +
+ read_op = (struct read_operation*)op_key;
+ read_op->op = 0;
/* Try to see if there's data immediately available. */ @@ -552,8 +561,6 @@ PJ_DEF(pj_status_t) pj_ioqueue_recvfrom( pj_ioqueue_key_t *key, * No data is immediately available. * Must schedule asynchronous operation to the ioqueue. */ - read_op = (struct read_operation*)op_key; - read_op->op = PJ_IOQUEUE_OP_RECV_FROM; read_op->buf = buffer; read_op->size = *length; @@ -586,6 +593,9 @@ PJ_DEF(pj_status_t) pj_ioqueue_send( pj_ioqueue_key_t *key, PJ_ASSERT_RETURN(key && op_key && data && length, PJ_EINVAL); PJ_CHECK_STACK(); +
+ write_op = (struct write_operation*)op_key;
+ write_op->op = 0;
/* Fast track: * Try to send data immediately, only if there's no pending write! @@ -624,7 +634,6 @@ PJ_DEF(pj_status_t) pj_ioqueue_send( pj_ioqueue_key_t *key, /* * Schedule asynchronous send. */ - write_op = (struct write_operation*)op_key; write_op->op = PJ_IOQUEUE_OP_SEND; write_op->buf = (void*)data; write_op->size = *length; @@ -659,6 +668,9 @@ PJ_DEF(pj_status_t) pj_ioqueue_sendto( pj_ioqueue_key_t *key, PJ_ASSERT_RETURN(key && op_key && data && length, PJ_EINVAL); PJ_CHECK_STACK(); +
+ write_op = (struct write_operation*)op_key;
+ write_op->op = 0;
/* Fast track: * Try to send data immediately, only if there's no pending write! @@ -702,7 +714,6 @@ PJ_DEF(pj_status_t) pj_ioqueue_sendto( pj_ioqueue_key_t *key, /* * Schedule asynchronous send. */ - write_op = (struct write_operation*)op_key; write_op->op = PJ_IOQUEUE_OP_SEND_TO; write_op->buf = (void*)data; write_op->size = *length; @@ -735,6 +746,9 @@ PJ_DEF(pj_status_t) pj_ioqueue_accept( pj_ioqueue_key_t *key, /* check parameters. All must be specified! */ PJ_ASSERT_RETURN(key && op_key && new_sock, PJ_EINVAL); +
+ accept_op = (struct accept_operation*)op_key;
+ accept_op->op = 0;
/* Fast track: * See if there's new connection available immediately. @@ -767,8 +781,6 @@ PJ_DEF(pj_status_t) pj_ioqueue_accept( pj_ioqueue_key_t *key, * Schedule accept() operation to be completed when there is incoming * connection available. */ - accept_op = (struct accept_operation*)op_key; - accept_op->op = PJ_IOQUEUE_OP_ACCEPT; accept_op->accept_fd = new_sock; accept_op->rmt_addr = remote; @@ -821,3 +833,82 @@ PJ_DEF(pj_status_t) pj_ioqueue_connect( pj_ioqueue_key_t *key, } #endif /* PJ_HAS_TCP */ +/*
+ * pj_ioqueue_is_pending()
+ */
+PJ_DEF(pj_bool_t) pj_ioqueue_is_pending( pj_ioqueue_key_t *key,
+ pj_ioqueue_op_key_t *op_key )
+{
+ struct generic_operation *op_rec;
+
+ PJ_UNUSED_ARG(key);
+
+ op_rec = (struct generic_operation*)op_key;
+ return op_rec->op != 0;
+}
+
+
+/*
+ * pj_ioqueue_post_completion()
+ */
+PJ_DEF(pj_status_t) pj_ioqueue_post_completion( pj_ioqueue_key_t *key,
+ pj_ioqueue_op_key_t *op_key,
+ pj_ssize_t bytes_status )
+{
+ struct generic_operation *op_rec;
+
+ /*
+ * Find the operation key in all pending operation list to
+ * really make sure that it's still there; then call the callback.
+ */
+ pj_mutex_lock(key->mutex);
+
+ /* Find the operation in the pending read list. */
+ op_rec = (struct generic_operation*)key->read_list.next;
+ while (op_rec != (void*)&key->read_list) {
+ if (op_rec == (void*)op_key) {
+ pj_list_erase(op_rec);
+ op_rec->op = 0;
+ pj_mutex_unlock(key->mutex);
+
+ (*key->cb.on_read_complete)(key, op_key, bytes_status);
+ return PJ_SUCCESS;
+ }
+ op_rec = op_rec->next;
+ }
+
+ /* Find the operation in the pending write list. */
+ op_rec = (struct generic_operation*)key->write_list.next;
+ while (op_rec != (void*)&key->write_list) {
+ if (op_rec == (void*)op_key) {
+ pj_list_erase(op_rec);
+ op_rec->op = 0;
+ pj_mutex_unlock(key->mutex);
+
+ (*key->cb.on_write_complete)(key, op_key, bytes_status);
+ return PJ_SUCCESS;
+ }
+ op_rec = op_rec->next;
+ }
+
+ /* Find the operation in the pending accept list. */
+ op_rec = (struct generic_operation*)key->accept_list.next;
+ while (op_rec != (void*)&key->accept_list) {
+ if (op_rec == (void*)op_key) {
+ pj_list_erase(op_rec);
+ op_rec->op = 0;
+ pj_mutex_unlock(key->mutex);
+
+ (*key->cb.on_accept_complete)(key, op_key,
+ PJ_INVALID_SOCKET,
+ bytes_status);
+ return PJ_SUCCESS;
+ }
+ op_rec = op_rec->next;
+ }
+
+ pj_mutex_unlock(key->mutex);
+
+ return PJ_EINVALIDOP;
+}
+
diff --git a/pjlib/src/pj/ioqueue_winnt.c b/pjlib/src/pj/ioqueue_winnt.c index 7944953f..93cbb6d5 100644 --- a/pjlib/src/pj/ioqueue_winnt.c +++ b/pjlib/src/pj/ioqueue_winnt.c @@ -891,3 +891,37 @@ PJ_DEF(pj_status_t) pj_ioqueue_connect( pj_ioqueue_key_t *key, } #endif /* #if PJ_HAS_TCP */ +
+
+PJ_DEF(pj_bool_t) pj_ioqueue_is_pending( pj_ioqueue_key_t *key,
+ pj_ioqueue_op_key_t *op_key )
+{
+ BOOL rc;
+ DWORD bytesTransfered;
+
+ rc = GetOverlappedResult( key->hnd, (LPOVERLAPPED)op_key,
+ &bytesTransfered, FALSE );
+
+ if (rc == FALSE) {
+ return GetLastError()==ERROR_IO_INCOMPLETE;
+ }
+
+ return FALSE;
+}
+
+
+PJ_DEF(pj_status_t) pj_ioqueue_post_completion( pj_ioqueue_key_t *key,
+ pj_ioqueue_op_key_t *op_key,
+ pj_ssize_t bytes_status )
+{
+ BOOL rc;
+
+ rc = PostQueuedCompletionStatus(key->ioqueue->iocp, bytes_status,
+ (long)key, (OVERLAPPED*)op_key );
+ if (rc == FALSE) {
+ return PJ_RETURN_OS_ERROR(GetLastError());
+ }
+
+ return PJ_SUCCESS;
+}
+
diff --git a/pjlib/src/pj/os_core_unix.c b/pjlib/src/pj/os_core_unix.c index 3892b64b..cc57aab2 100644 --- a/pjlib/src/pj/os_core_unix.c +++ b/pjlib/src/pj/os_core_unix.c @@ -558,54 +558,91 @@ PJ_DEF(pj_atomic_value_t) pj_atomic_get(pj_atomic_t *atomic_var) return oldval; } +/*
+ * pj_atomic_inc_and_get()
+ */
+PJ_DEF(pj_atomic_value_t) pj_atomic_inc_and_get(pj_atomic_t *atomic_var)
+{
+ pj_atomic_value_t new_value;
+
+ PJ_CHECK_STACK();
+
+#if PJ_HAS_THREADS
+ pj_mutex_lock( atomic_var->mutex );
+#endif
+ new_value = ++atomic_var->value;
+#if PJ_HAS_THREADS
+ pj_mutex_unlock( atomic_var->mutex);
+#endif
+
+ return new_value;
+}
/* * pj_atomic_inc() */ PJ_DEF(void) pj_atomic_inc(pj_atomic_t *atomic_var) { - PJ_CHECK_STACK(); - -#if PJ_HAS_THREADS - pj_mutex_lock( atomic_var->mutex ); -#endif - ++atomic_var->value; -#if PJ_HAS_THREADS - pj_mutex_unlock( atomic_var->mutex); -#endif + pj_atomic_inc_and_get(atomic_var); } /* - * pj_atomic_dec() + * pj_atomic_dec_and_get() */ -PJ_DEF(void) pj_atomic_dec(pj_atomic_t *atomic_var) -{ +PJ_DEF(pj_atomic_value_t) pj_atomic_dec_and_get(pj_atomic_t *atomic_var) +{
+ pj_atomic_value_t new_value;
+ PJ_CHECK_STACK(); #if PJ_HAS_THREADS pj_mutex_lock( atomic_var->mutex ); #endif - --atomic_var->value; + new_value = --atomic_var->value; #if PJ_HAS_THREADS pj_mutex_unlock( atomic_var->mutex); -#endif +#endif
+
+ return new_value; } +
+/*
+ * pj_atomic_dec()
+ */
+PJ_DEF(void) pj_atomic_dec(pj_atomic_t *atomic_var)
+{
+ pj_atomic_dec_and_get(atomic_var);
+}
/* - * pj_atomic_add() + * pj_atomic_add_and_get() */ -PJ_DEF(void) pj_atomic_add( pj_atomic_t *atomic_var, pj_atomic_value_t value ) -{ +PJ_DEF(pj_atomic_value_t) pj_atomic_add_and_get( pj_atomic_t *atomic_var,
+ pj_atomic_value_t value ) +{
+ pj_atomic_value_t new_value;
+ #if PJ_HAS_THREADS pj_mutex_lock(atomic_var->mutex); #endif - atomic_var->value += value; + atomic_var->value += value;
+ new_value = atomic_var->value; #if PJ_HAS_THREADS pj_mutex_unlock(atomic_var->mutex); -#endif +#endif
+
+ return new_value; } +/*
+ * pj_atomic_add()
+ */
+PJ_DEF(void) pj_atomic_add( pj_atomic_t *atomic_var,
+ pj_atomic_value_t value )
+{
+ pj_atomic_add_and_get(atomic_var, value);
+}
/////////////////////////////////////////////////////////////////////////////// /* diff --git a/pjlib/src/pj/os_core_win32.c b/pjlib/src/pj/os_core_win32.c index 68416c66..be770d56 100644 --- a/pjlib/src/pj/os_core_win32.c +++ b/pjlib/src/pj/os_core_win32.c @@ -512,32 +512,48 @@ PJ_DEF(pj_atomic_value_t) pj_atomic_get(pj_atomic_t *atomic_var) return atomic_var->value; } +/*
+ * pj_atomic_inc_and_get()
+ */
+PJ_DEF(pj_atomic_value_t) pj_atomic_inc_and_get(pj_atomic_t *atomic_var)
+{
+ PJ_CHECK_STACK();
+
+#if defined(PJ_WIN32_WINNT) && PJ_WIN32_WINNT >= 0x0400
+ return InterlockedIncrement(&atomic_var->value);
+#else
+# error Fix Me
+#endif
+}
+
/* * pj_atomic_inc() */ PJ_DEF(void) pj_atomic_inc(pj_atomic_t *atomic_var) -{ - PJ_CHECK_STACK(); - -#if defined(PJ_WIN32_WINNT) && PJ_WIN32_WINNT >= 0x0400 - InterlockedIncrement(&atomic_var->value); -#else -# error Fix Me -#endif +{
+ pj_atomic_inc_and_get(atomic_var); } - +
+/*
+ * pj_atomic_dec_and_get()
+ */
+PJ_DEF(pj_atomic_value_t) pj_atomic_dec_and_get(pj_atomic_t *atomic_var)
+{
+ PJ_CHECK_STACK();
+
+#if defined(PJ_WIN32_WINNT) && PJ_WIN32_WINNT >= 0x0400
+ return InterlockedDecrement(&atomic_var->value);
+#else
+# error Fix me
+#endif
+}
+
/* * pj_atomic_dec() */ PJ_DEF(void) pj_atomic_dec(pj_atomic_t *atomic_var) { - PJ_CHECK_STACK(); - -#if defined(PJ_WIN32_WINNT) && PJ_WIN32_WINNT >= 0x0400 - InterlockedDecrement(&atomic_var->value); -#else -# error Fix me -#endif + pj_atomic_dec_and_get(atomic_var); } /* @@ -546,10 +562,27 @@ PJ_DEF(void) pj_atomic_dec(pj_atomic_t *atomic_var) PJ_DEF(void) pj_atomic_add( pj_atomic_t *atomic_var, pj_atomic_value_t value ) { - InterlockedExchangeAdd( &atomic_var->value, value ); +#if defined(PJ_WIN32_WINNT) && PJ_WIN32_WINNT >= 0x0400
+ InterlockedExchangeAdd( &atomic_var->value, value );
+#else
+# error Fix me
+#endif +} +
+/*
+ * pj_atomic_add_and_get()
+ */
+PJ_DEF(pj_atomic_value_t) pj_atomic_add_and_get( pj_atomic_t *atomic_var,
+ pj_atomic_value_t value)
+{
+#if defined(PJ_WIN32_WINNT) && PJ_WIN32_WINNT >= 0x0400
+ long oldValue = InterlockedExchangeAdd( &atomic_var->value, value);
+ return oldValue + value;
+#else
+# error Fix me
+#endif
} - /////////////////////////////////////////////////////////////////////////////// /* * pj_thread_local_alloc() diff --git a/pjlib/src/pj/timer.c b/pjlib/src/pj/timer.c index 9fa190a1..ffec1f4d 100644 --- a/pjlib/src/pj/timer.c +++ b/pjlib/src/pj/timer.c @@ -14,9 +14,13 @@ #include <pj/string.h> #include <pj/assert.h> #include <pj/errno.h> +#include <pj/lock.h>
#define HEAP_PARENT(X) (X == 0 ? 0 : (((X) - 1) / 2)) #define HEAP_LEFT(X) (((X)+(X))+1) +
+
+#define DEFAULT_MAX_TIMED_OUT_PER_POLL (64)
/** @@ -32,9 +36,15 @@ struct pj_timer_heap_t /** Current size of the heap. */ pj_size_t cur_size; +
+ /** Max timed out entries to process per poll. */
+ unsigned max_entries_per_poll;
- /** Mutex for synchronization, or NULL */ - pj_mutex_t *mutex; + /** Lock object. */ + pj_lock_t *lock; +
+ /** Autodelete lock. */
+ pj_bool_t auto_delete_lock;
/** * Current contents of the Heap, which is organized as a "heap" of @@ -71,15 +81,15 @@ struct pj_timer_heap_t PJ_INLINE(void) lock_timer_heap( pj_timer_heap_t *ht ) { - if (ht->mutex) { - pj_mutex_lock(ht->mutex); + if (ht->lock) { + pj_lock_acquire(ht->lock); } } PJ_INLINE(void) unlock_timer_heap( pj_timer_heap_t *ht ) { - if (ht->mutex) { - pj_mutex_unlock(ht->mutex); + if (ht->lock) { + pj_lock_release(ht->lock); } } @@ -319,7 +329,7 @@ PJ_DEF(pj_size_t) pj_timer_heap_mem_size(pj_size_t count) sizeof(pj_timer_heap_t) + /* size of each entry: */ (count+2) * (sizeof(pj_timer_entry*)+sizeof(pj_timer_id_t)) + - /* mutex, pool etc: */ + /* lock, pool etc: */ 132; } @@ -328,7 +338,6 @@ PJ_DEF(pj_size_t) pj_timer_heap_mem_size(pj_size_t count) */ PJ_DEF(pj_status_t) pj_timer_heap_create( pj_pool_t *pool, pj_size_t size, - unsigned flag, pj_timer_heap_t **p_heap) { pj_timer_heap_t *ht; @@ -348,23 +357,14 @@ PJ_DEF(pj_status_t) pj_timer_heap_create( pj_pool_t *pool, /* Initialize timer heap sizes */ ht->max_size = size; - ht->cur_size = 0; + ht->cur_size = 0;
+ ht->max_entries_per_poll = DEFAULT_MAX_TIMED_OUT_PER_POLL; ht->timer_ids_freelist = 1; - ht->pool = pool; - - /* Mutex. */ - if (flag & PJ_TIMER_HEAP_NO_SYNCHRONIZE) { - ht->mutex = NULL; - } else { - pj_status_t rc; - - /* Mutex must be the recursive types. - * See commented code inside pj_timer_heap_poll() - */ - rc = pj_mutex_create(pool, "tmhp%p", PJ_MUTEX_RECURSE, &ht->mutex); - if (rc != PJ_SUCCESS) - return rc; - } + ht->pool = pool;
+ + /* Lock. */ + ht->lock = NULL;
+ ht->auto_delete_lock = 0;
// Create the heap array. ht->heap = pj_pool_alloc(pool, sizeof(pj_timer_entry*) * size); @@ -385,6 +385,34 @@ PJ_DEF(pj_status_t) pj_timer_heap_create( pj_pool_t *pool, *p_heap = ht; return PJ_SUCCESS; } +
+PJ_DEF(void) pj_timer_heap_destroy( pj_timer_heap_t *ht )
+{
+ if (ht->lock && ht->auto_delete_lock) {
+ pj_lock_destroy(ht->lock);
+ ht->lock = NULL;
+ }
+}
+
+PJ_DEF(void) pj_timer_heap_set_lock( pj_timer_heap_t *ht,
+ pj_lock_t *lock,
+ pj_bool_t auto_del )
+{
+ if (ht->lock && ht->auto_delete_lock)
+ pj_lock_destroy(ht->lock);
+
+ ht->lock = lock;
+ ht->auto_delete_lock = auto_del;
+}
+
+
+PJ_DEF(unsigned) pj_timer_heap_set_max_timed_out_per_poll(pj_timer_heap_t *ht,
+ unsigned count )
+{
+ unsigned old_count = ht->max_entries_per_poll;
+ ht->max_entries_per_poll = count;
+ return old_count;
+}
PJ_DEF(pj_timer_entry*) pj_timer_entry_init( pj_timer_entry *entry, int id, @@ -433,12 +461,13 @@ PJ_DEF(int) pj_timer_heap_cancel( pj_timer_heap_t *ht, return count; } -PJ_DEF(int) pj_timer_heap_poll( pj_timer_heap_t *ht, pj_time_val *next_delay ) +PJ_DEF(unsigned) pj_timer_heap_poll( pj_timer_heap_t *ht,
+ pj_time_val *next_delay ) { pj_time_val now; - int count; + unsigned count; - PJ_ASSERT_RETURN(ht, -1); + PJ_ASSERT_RETURN(ht, 0); if (!ht->cur_size && next_delay) { next_delay->sec = next_delay->msec = PJ_MAXINT32; @@ -450,16 +479,15 @@ PJ_DEF(int) pj_timer_heap_poll( pj_timer_heap_t *ht, pj_time_val *next_delay ) lock_timer_heap(ht); while ( ht->cur_size && - PJ_TIME_VAL_LTE(ht->heap[0]->_timer_value, now) ) + PJ_TIME_VAL_LTE(ht->heap[0]->_timer_value, now) &&
+ count < ht->max_entries_per_poll ) { pj_timer_entry *node = remove_node(ht, 0); ++count; - //Better not to temporarily release mutex to save some syscalls. - //But then make sure the mutex must be the recursive types (PJ_MUTEX_RECURSE)! - //unlock_timer_heap(ht); + unlock_timer_heap(ht); (*node->cb)(ht, node); - //lock_timer_heap(ht); + lock_timer_heap(ht); } if (ht->cur_size && next_delay) { *next_delay = ht->heap[0]->_timer_value; @@ -473,7 +501,9 @@ PJ_DEF(int) pj_timer_heap_poll( pj_timer_heap_t *ht, pj_time_val *next_delay ) } PJ_DEF(pj_size_t) pj_timer_heap_count( pj_timer_heap_t *ht ) -{ +{
+ PJ_ASSERT_RETURN(ht, 0);
+ return ht->cur_size; } diff --git a/pjlib/src/pjlib++-test/main.cpp b/pjlib/src/pjlib++-test/main.cpp new file mode 100644 index 00000000..4a6d0aaa --- /dev/null +++ b/pjlib/src/pjlib++-test/main.cpp @@ -0,0 +1,29 @@ +#include <pj++/file.hpp>
+#include <pj++/list.hpp>
+#include <pj++/lock.hpp>
+#include <pj++/hash.hpp>
+#include <pj++/os.hpp>
+#include <pj++/proactor.hpp>
+#include <pj++/sock.hpp>
+#include <pj++/string.hpp>
+#include <pj++/timer.hpp>
+#include <pj++/tree.hpp>
+
+int main()
+{
+ Pjlib lib;
+ Pj_Caching_Pool mem;
+ Pj_Pool the_pool;
+ Pj_Pool *pool = &the_pool;
+
+ the_pool.attach(mem.create_pool(4000,4000));
+
+ Pj_Semaphore_Lock lsem(pool);
+ Pj_Semaphore_Lock *plsem;
+
+ plsem = new(pool) Pj_Semaphore_Lock(pool);
+ delete plsem;
+
+ return 0;
+}
+
|