summaryrefslogtreecommitdiff
path: root/pjmedia/src/pjmedia/event.c
diff options
context:
space:
mode:
authorLiong Sauw Ming <ming@teluu.com>2011-12-01 10:49:07 +0000
committerLiong Sauw Ming <ming@teluu.com>2011-12-01 10:49:07 +0000
commitc04000a192a00f047ea6d04e131e42f0b72bc11b (patch)
treea3f1a4ba2cd467087640c5cb2bd9509570c5acff /pjmedia/src/pjmedia/event.c
parent5a41db1f3ba90b676e9485a15841e5fec656ed58 (diff)
Closed #1420: Add support for event manager
git-svn-id: http://svn.pjsip.org/repos/pjproject/trunk@3893 74dad513-b988-da41-8d7b-12977e46ad98
Diffstat (limited to 'pjmedia/src/pjmedia/event.c')
-rw-r--r--pjmedia/src/pjmedia/event.c394
1 files changed, 293 insertions, 101 deletions
diff --git a/pjmedia/src/pjmedia/event.c b/pjmedia/src/pjmedia/event.c
index 8e92b53e..c499eaa5 100644
--- a/pjmedia/src/pjmedia/event.c
+++ b/pjmedia/src/pjmedia/event.c
@@ -19,150 +19,342 @@
#include <pjmedia/event.h>
#include <pjmedia/errno.h>
#include <pj/assert.h>
+#include <pj/list.h>
#include <pj/log.h>
+#include <pj/os.h>
+#include <pj/pool.h>
#include <pj/string.h>
#define THIS_FILE "event.c"
-#if 1
-# define TRACE_(x) PJ_LOG(6,x)
-#else
-# define TRACE_(x)
-#endif
+#define MAX_EVENTS 16
-PJ_DEF(void) pjmedia_event_init( pjmedia_event *event,
- pjmedia_event_type type,
- const pj_timestamp *ts,
- const pjmedia_event_publisher *epub)
+typedef struct esub esub;
+
+struct esub
{
- pj_bzero(event, sizeof(*event));
- event->type = type;
- if (ts)
- event->timestamp.u64 = ts->u64;
- event->epub = epub;
- if (epub)
- event->epub_sig = epub->sig;
-}
+ PJ_DECL_LIST_MEMBER(esub);
+
+ pjmedia_event_cb *cb;
+ void *user_data;
+ void *epub;
+};
-PJ_DEF(void) pjmedia_event_publisher_init(pjmedia_event_publisher *epub,
- pjmedia_obj_sig sig)
+typedef struct event_queue
{
- pj_bzero(epub, sizeof(*epub));
- pj_list_init(&epub->subscription_list);
- epub->sig = sig;
-}
+ pjmedia_event events[MAX_EVENTS]; /**< array of events. */
+ int head, tail;
+ pj_bool_t is_full;
+} event_queue;
+
+struct pjmedia_event_mgr
+{
+ pj_thread_t *thread; /**< worker thread. */
+ pj_bool_t is_quitting;
+ pj_sem_t *sem;
+ pj_mutex_t *mutex;
+ event_queue ev_queue;
+ event_queue *pub_ev_queue; /**< publish() event queue. */
+ esub esub_list; /**< list of subscribers. */
+ esub *th_next_sub, /**< worker thread's next sub. */
+ *pub_next_sub; /**< publish() next sub. */
+};
-PJ_DEF(void) pjmedia_event_subscription_init( pjmedia_event_subscription *esub,
- pjmedia_event_cb *cb,
- void *user_data)
+static pjmedia_event_mgr *event_manager_instance;
+
+static pj_status_t event_queue_add_event(event_queue* ev_queue,
+ pjmedia_event *event)
{
- pj_bzero(esub, sizeof(*esub));
- esub->cb = cb;
- esub->user_data = user_data;
+ if (ev_queue->is_full) {
+ char ev_name[5];
+
+ /* This event will be ignored. */
+ PJ_LOG(4, (THIS_FILE, "Lost event %s from publisher [0x%p] "
+ "due to full queue.",
+ pjmedia_fourcc_name(event->type, ev_name),
+ event->epub));
+
+ return PJ_ETOOMANY;
+ }
+
+ pj_memcpy(&ev_queue->events[ev_queue->tail], event, sizeof(*event));
+ ev_queue->tail = (ev_queue->tail + 1) % MAX_EVENTS;
+ if (ev_queue->tail == ev_queue->head)
+ ev_queue->is_full = PJ_TRUE;
+
+ return PJ_SUCCESS;
}
-PJ_DEF(pj_bool_t)
-pjmedia_event_publisher_has_sub(pjmedia_event_publisher *epub)
+static pj_status_t event_mgr_distribute_events(pjmedia_event_mgr *mgr,
+ event_queue *ev_queue,
+ esub **next_sub)
{
- PJ_ASSERT_RETURN(epub, PJ_FALSE);
- return epub->subscription_list.next &&
- (!pj_list_empty(&epub->subscription_list));
+ pj_status_t err = PJ_SUCCESS;
+ esub * sub = mgr->esub_list.next;
+ pjmedia_event *ev = &ev_queue->events[ev_queue->head];
+
+ while (sub != &mgr->esub_list) {
+ *next_sub = sub->next;
+
+ /* Check if the subscriber is interested in
+ * receiving the event from the publisher.
+ */
+ if (sub->epub == ev->epub || !sub->epub) {
+ pj_status_t status = (*sub->cb)(ev, sub->user_data);
+ if (status != PJ_SUCCESS && err == PJ_SUCCESS)
+ err = status;
+ }
+ sub = *next_sub;
+ }
+ *next_sub = NULL;
+
+ ev_queue->head = (ev_queue->head + 1) % MAX_EVENTS;
+ ev_queue->is_full = PJ_FALSE;
+
+ return err;
}
-PJ_DEF(pj_status_t) pjmedia_event_subscribe( pjmedia_event_publisher *epub,
- pjmedia_event_subscription *esub)
+/* Event worker thread function. */
+static int event_worker_thread(void *arg)
{
- char epub_name[5];
+ pjmedia_event_mgr *mgr = (pjmedia_event_mgr *)arg;
- PJ_ASSERT_RETURN(epub && esub && esub->cb, PJ_EINVAL);
- /* Must not currently subscribe to anything */
- PJ_ASSERT_RETURN(esub->subscribe_to == NULL, PJ_EINVALIDOP);
+ while (1) {
+ /* Wait until there is an event. */
+ pj_sem_wait(mgr->sem);
- TRACE_((THIS_FILE, "Subscription [0x%p] to publisher %s",
- esub,
- pjmedia_fourcc_name(epub->sig, epub_name)));
- /* Suppress compiler warning if trace is disabled */
- PJ_UNUSED_ARG(epub_name);
+ if (mgr->is_quitting)
+ break;
- pj_list_push_back(&epub->subscription_list, esub);
- esub->subscribe_to = epub;
- return PJ_SUCCESS;
+ pj_mutex_lock(mgr->mutex);
+ event_mgr_distribute_events(mgr, &mgr->ev_queue, &mgr->th_next_sub);
+ pj_mutex_unlock(mgr->mutex);
+ }
+
+ return 0;
}
-PJ_DEF(pj_status_t) pjmedia_event_unsubscribe(pjmedia_event_subscription *esub)
+PJ_DEF(pj_status_t) pjmedia_event_mgr_create(pj_pool_t *pool,
+ unsigned options,
+ pjmedia_event_mgr **p_mgr)
{
- PJ_ASSERT_RETURN(esub, PJ_EINVAL);
-
- if (esub->subscribe_to) {
- char epub_name[5];
- TRACE_((THIS_FILE, "Unsubscription [0x%p] to publisher %s",
- esub,
- pjmedia_fourcc_name(esub->subscribe_to->sig,
- epub_name)));
- /* Suppress compiler warning if trace is disabled */
- PJ_UNUSED_ARG(epub_name);
-
- PJ_ASSERT_RETURN(
- pj_list_find_node(&esub->subscribe_to->subscription_list,
- esub)==esub, PJ_ENOTFOUND);
- pj_list_erase(esub);
- esub->subscribe_to = NULL;
+ pjmedia_event_mgr *mgr;
+ pj_status_t status;
+
+ mgr = PJ_POOL_ZALLOC_T(pool, pjmedia_event_mgr);
+ pj_list_init(&mgr->esub_list);
+
+ if (!(options & PJMEDIA_EVENT_MGR_NO_THREAD)) {
+ status = pj_sem_create(pool, "ev_sem", 0, MAX_EVENTS + 1, &mgr->sem);
+ if (status != PJ_SUCCESS)
+ return status;
+
+ status = pj_thread_create(pool, "ev_thread", &event_worker_thread,
+ mgr, 0, 0, &mgr->thread);
+ if (status != PJ_SUCCESS) {
+ pjmedia_event_mgr_destroy(mgr);
+ return status;
+ }
+ }
+
+ status = pj_mutex_create_recursive(pool, "ev_mutex", &mgr->mutex);
+ if (status != PJ_SUCCESS) {
+ pjmedia_event_mgr_destroy(mgr);
+ return status;
}
+
+ if (!event_manager_instance)
+ event_manager_instance = mgr;
+
+ if (p_mgr)
+ *p_mgr = mgr;
+
return PJ_SUCCESS;
}
-PJ_DEF(pj_status_t) pjmedia_event_publish( pjmedia_event_publisher *epub,
- pjmedia_event *event)
+PJ_DEF(pjmedia_event_mgr*) pjmedia_event_mgr_instance(void)
{
- pjmedia_event_subscription *esub;
- char event_name[5];
- char epub_name[5];
- pj_status_t err = PJ_SUCCESS;
+ return event_manager_instance;
+}
- PJ_ASSERT_RETURN(epub && event, PJ_EINVAL);
+PJ_DEF(void) pjmedia_event_mgr_set_instance(pjmedia_event_mgr *mgr)
+{
+ event_manager_instance = mgr;
+}
- TRACE_((THIS_FILE, "Event %s is published by publisher %s",
- pjmedia_fourcc_name(event->type, event_name),
- pjmedia_fourcc_name(epub->sig, epub_name)));
- /* Suppress compiler warning if trace is disabled */
- PJ_UNUSED_ARG(event_name);
- PJ_UNUSED_ARG(epub_name);
+PJ_DEF(void) pjmedia_event_mgr_destroy(pjmedia_event_mgr *mgr)
+{
+ esub *sub;
- esub = epub->subscription_list.next;
- if (!esub)
- return err;
+ if (!mgr) mgr = pjmedia_event_mgr_instance();
+ PJ_ASSERT_ON_FAIL(mgr != NULL, return);
- while (esub != &epub->subscription_list) {
- pjmedia_event_subscription *next;
- pj_status_t status;
+ if (mgr->thread) {
+ mgr->is_quitting = PJ_TRUE;
+ pj_sem_post(mgr->sem);
+ pj_thread_join(mgr->thread);
+ }
- /* just in case esub is destroyed in the callback */
- next = esub->next;
+ if (mgr->sem) {
+ pj_sem_destroy(mgr->sem);
+ mgr->sem = NULL;
+ }
- status = (*esub->cb)(esub, event);
- if (status != PJ_SUCCESS && err == PJ_SUCCESS)
- err = status;
+ if (mgr->mutex) {
+ pj_mutex_destroy(mgr->mutex);
+ mgr->mutex = NULL;
+ }
- esub = next;
+ sub = mgr->esub_list.next;
+ while (sub != &mgr->esub_list) {
+ esub *next = sub->next;
+ pj_list_erase(sub);
+ sub = next;
}
- return err;
+ if (event_manager_instance == mgr)
+ event_manager_instance = NULL;
}
-static pj_status_t republisher_cb(pjmedia_event_subscription *esub,
- pjmedia_event *event)
+PJ_DEF(void) pjmedia_event_init( pjmedia_event *event,
+ pjmedia_event_type type,
+ const pj_timestamp *ts,
+ const void *src)
{
- return pjmedia_event_publish((pjmedia_event_publisher*)esub->user_data,
- event);
+ pj_bzero(event, sizeof(*event));
+ event->type = type;
+ if (ts)
+ event->timestamp.u64 = ts->u64;
+ event->epub = event->src = src;
}
-PJ_DEF(pj_status_t) pjmedia_event_republish(pjmedia_event_publisher *esrc,
- pjmedia_event_publisher *epub,
- pjmedia_event_subscription *esub)
+PJ_DEF(pj_status_t) pjmedia_event_subscribe( pjmedia_event_mgr *mgr,
+ pj_pool_t *pool,
+ pjmedia_event_cb *cb,
+ void *user_data,
+ void *epub)
{
- PJ_ASSERT_RETURN(esrc && epub && esub, PJ_EINVAL);
+ esub *sub;
+
+ PJ_ASSERT_RETURN(pool && cb, PJ_EINVAL);
+
+ if (!mgr) mgr = pjmedia_event_mgr_instance();
+ PJ_ASSERT_RETURN(mgr, PJ_EINVAL);
+
+ pj_mutex_lock(mgr->mutex);
+ /* Check whether callback function with the same user data is already
+ * subscribed to the publisher. This is to prevent the callback function
+ * receiving the same event from the same publisher more than once.
+ */
+ sub = mgr->esub_list.next;
+ while (sub != &mgr->esub_list) {
+ esub *next = sub->next;
+ if (sub->cb == cb && sub->user_data == user_data &&
+ sub->epub == epub)
+ {
+ pj_mutex_unlock(mgr->mutex);
+ return PJ_SUCCESS;
+ }
+ sub = next;
+ }
+
+ sub = PJ_POOL_ZALLOC_T(pool, esub);
+ sub->cb = cb;
+ sub->user_data = user_data;
+ sub->epub = epub;
+ pj_list_push_back(&mgr->esub_list, sub);
+ pj_mutex_unlock(mgr->mutex);
- pjmedia_event_subscription_init(esub, &republisher_cb, epub);
- return pjmedia_event_subscribe(esrc, esub);
+ return PJ_SUCCESS;
}
+PJ_DEF(pj_status_t)
+pjmedia_event_unsubscribe(pjmedia_event_mgr *mgr,
+ pjmedia_event_cb *cb,
+ void *user_data,
+ void *epub)
+{
+ esub *sub;
+
+ PJ_ASSERT_RETURN(cb, PJ_EINVAL);
+
+ if (!mgr) mgr = pjmedia_event_mgr_instance();
+ PJ_ASSERT_RETURN(mgr, PJ_EINVAL);
+
+ pj_mutex_lock(mgr->mutex);
+ sub = mgr->esub_list.next;
+ while (sub != &mgr->esub_list) {
+ esub *next = sub->next;
+ if (sub->cb == cb && (sub->user_data == user_data || !user_data) &&
+ (sub->epub == epub || !epub))
+ {
+ /* If the worker thread or pjmedia_event_publish() API is
+ * in the process of distributing events, make sure that
+ * its pointer to the next subscriber stays valid.
+ */
+ if (mgr->th_next_sub == sub)
+ mgr->th_next_sub = sub->next;
+ if (mgr->pub_next_sub == sub)
+ mgr->pub_next_sub = sub->next;
+ pj_list_erase(sub);
+ if (user_data && epub)
+ break;
+ }
+ sub = next;
+ }
+ pj_mutex_unlock(mgr->mutex);
+
+ return PJ_SUCCESS;
+}
+
+PJ_DEF(pj_status_t) pjmedia_event_publish( pjmedia_event_mgr *mgr,
+ void *epub,
+ pjmedia_event *event,
+ pjmedia_event_publish_flag flag)
+{
+ pj_status_t err = PJ_SUCCESS;
+
+ PJ_ASSERT_RETURN(epub && event, PJ_EINVAL);
+
+ if (!mgr) mgr = pjmedia_event_mgr_instance();
+ PJ_ASSERT_RETURN(mgr, PJ_EINVAL);
+
+ event->epub = epub;
+
+ pj_mutex_lock(mgr->mutex);
+ if (flag & PJMEDIA_EVENT_PUBLISH_POST_EVENT) {
+ if (event_queue_add_event(&mgr->ev_queue, event) == PJ_SUCCESS)
+ pj_sem_post(mgr->sem);
+ } else {
+ /* For nested pjmedia_event_publish() calls, i.e. calling publish()
+ * inside the subscriber's callback, the function will only add
+ * the event to the event queue of the first publish() call. It
+ * is the first publish() call that will be responsible to
+ * distribute the events.
+ */
+ if (mgr->pub_ev_queue) {
+ event_queue_add_event(mgr->pub_ev_queue, event);
+ } else {
+ static event_queue ev_queue;
+ pj_status_t status;
+
+ ev_queue.head = ev_queue.tail = 0;
+ ev_queue.is_full = PJ_FALSE;
+ mgr->pub_ev_queue = &ev_queue;
+
+ event_queue_add_event(mgr->pub_ev_queue, event);
+
+ do {
+ status = event_mgr_distribute_events(mgr, mgr->pub_ev_queue,
+ &mgr->pub_next_sub);
+ if (status != PJ_SUCCESS && err == PJ_SUCCESS)
+ err = status;
+ } while(ev_queue.head != ev_queue.tail || ev_queue.is_full);
+
+ mgr->pub_ev_queue = NULL;
+ }
+ }
+ pj_mutex_unlock(mgr->mutex);
+
+ return err;
+}