diff options
author | Liong Sauw Ming <ming@teluu.com> | 2011-12-01 10:49:07 +0000 |
---|---|---|
committer | Liong Sauw Ming <ming@teluu.com> | 2011-12-01 10:49:07 +0000 |
commit | c04000a192a00f047ea6d04e131e42f0b72bc11b (patch) | |
tree | a3f1a4ba2cd467087640c5cb2bd9509570c5acff /pjmedia/src/pjmedia/event.c | |
parent | 5a41db1f3ba90b676e9485a15841e5fec656ed58 (diff) |
Closed #1420: Add support for event manager
git-svn-id: http://svn.pjsip.org/repos/pjproject/trunk@3893 74dad513-b988-da41-8d7b-12977e46ad98
Diffstat (limited to 'pjmedia/src/pjmedia/event.c')
-rw-r--r-- | pjmedia/src/pjmedia/event.c | 394 |
1 files changed, 293 insertions, 101 deletions
diff --git a/pjmedia/src/pjmedia/event.c b/pjmedia/src/pjmedia/event.c index 8e92b53e..c499eaa5 100644 --- a/pjmedia/src/pjmedia/event.c +++ b/pjmedia/src/pjmedia/event.c @@ -19,150 +19,342 @@ #include <pjmedia/event.h> #include <pjmedia/errno.h> #include <pj/assert.h> +#include <pj/list.h> #include <pj/log.h> +#include <pj/os.h> +#include <pj/pool.h> #include <pj/string.h> #define THIS_FILE "event.c" -#if 1 -# define TRACE_(x) PJ_LOG(6,x) -#else -# define TRACE_(x) -#endif +#define MAX_EVENTS 16 -PJ_DEF(void) pjmedia_event_init( pjmedia_event *event, - pjmedia_event_type type, - const pj_timestamp *ts, - const pjmedia_event_publisher *epub) +typedef struct esub esub; + +struct esub { - pj_bzero(event, sizeof(*event)); - event->type = type; - if (ts) - event->timestamp.u64 = ts->u64; - event->epub = epub; - if (epub) - event->epub_sig = epub->sig; -} + PJ_DECL_LIST_MEMBER(esub); + + pjmedia_event_cb *cb; + void *user_data; + void *epub; +}; -PJ_DEF(void) pjmedia_event_publisher_init(pjmedia_event_publisher *epub, - pjmedia_obj_sig sig) +typedef struct event_queue { - pj_bzero(epub, sizeof(*epub)); - pj_list_init(&epub->subscription_list); - epub->sig = sig; -} + pjmedia_event events[MAX_EVENTS]; /**< array of events. */ + int head, tail; + pj_bool_t is_full; +} event_queue; + +struct pjmedia_event_mgr +{ + pj_thread_t *thread; /**< worker thread. */ + pj_bool_t is_quitting; + pj_sem_t *sem; + pj_mutex_t *mutex; + event_queue ev_queue; + event_queue *pub_ev_queue; /**< publish() event queue. */ + esub esub_list; /**< list of subscribers. */ + esub *th_next_sub, /**< worker thread's next sub. */ + *pub_next_sub; /**< publish() next sub. */ +}; -PJ_DEF(void) pjmedia_event_subscription_init( pjmedia_event_subscription *esub, - pjmedia_event_cb *cb, - void *user_data) +static pjmedia_event_mgr *event_manager_instance; + +static pj_status_t event_queue_add_event(event_queue* ev_queue, + pjmedia_event *event) { - pj_bzero(esub, sizeof(*esub)); - esub->cb = cb; - esub->user_data = user_data; + if (ev_queue->is_full) { + char ev_name[5]; + + /* This event will be ignored. */ + PJ_LOG(4, (THIS_FILE, "Lost event %s from publisher [0x%p] " + "due to full queue.", + pjmedia_fourcc_name(event->type, ev_name), + event->epub)); + + return PJ_ETOOMANY; + } + + pj_memcpy(&ev_queue->events[ev_queue->tail], event, sizeof(*event)); + ev_queue->tail = (ev_queue->tail + 1) % MAX_EVENTS; + if (ev_queue->tail == ev_queue->head) + ev_queue->is_full = PJ_TRUE; + + return PJ_SUCCESS; } -PJ_DEF(pj_bool_t) -pjmedia_event_publisher_has_sub(pjmedia_event_publisher *epub) +static pj_status_t event_mgr_distribute_events(pjmedia_event_mgr *mgr, + event_queue *ev_queue, + esub **next_sub) { - PJ_ASSERT_RETURN(epub, PJ_FALSE); - return epub->subscription_list.next && - (!pj_list_empty(&epub->subscription_list)); + pj_status_t err = PJ_SUCCESS; + esub * sub = mgr->esub_list.next; + pjmedia_event *ev = &ev_queue->events[ev_queue->head]; + + while (sub != &mgr->esub_list) { + *next_sub = sub->next; + + /* Check if the subscriber is interested in + * receiving the event from the publisher. + */ + if (sub->epub == ev->epub || !sub->epub) { + pj_status_t status = (*sub->cb)(ev, sub->user_data); + if (status != PJ_SUCCESS && err == PJ_SUCCESS) + err = status; + } + sub = *next_sub; + } + *next_sub = NULL; + + ev_queue->head = (ev_queue->head + 1) % MAX_EVENTS; + ev_queue->is_full = PJ_FALSE; + + return err; } -PJ_DEF(pj_status_t) pjmedia_event_subscribe( pjmedia_event_publisher *epub, - pjmedia_event_subscription *esub) +/* Event worker thread function. */ +static int event_worker_thread(void *arg) { - char epub_name[5]; + pjmedia_event_mgr *mgr = (pjmedia_event_mgr *)arg; - PJ_ASSERT_RETURN(epub && esub && esub->cb, PJ_EINVAL); - /* Must not currently subscribe to anything */ - PJ_ASSERT_RETURN(esub->subscribe_to == NULL, PJ_EINVALIDOP); + while (1) { + /* Wait until there is an event. */ + pj_sem_wait(mgr->sem); - TRACE_((THIS_FILE, "Subscription [0x%p] to publisher %s", - esub, - pjmedia_fourcc_name(epub->sig, epub_name))); - /* Suppress compiler warning if trace is disabled */ - PJ_UNUSED_ARG(epub_name); + if (mgr->is_quitting) + break; - pj_list_push_back(&epub->subscription_list, esub); - esub->subscribe_to = epub; - return PJ_SUCCESS; + pj_mutex_lock(mgr->mutex); + event_mgr_distribute_events(mgr, &mgr->ev_queue, &mgr->th_next_sub); + pj_mutex_unlock(mgr->mutex); + } + + return 0; } -PJ_DEF(pj_status_t) pjmedia_event_unsubscribe(pjmedia_event_subscription *esub) +PJ_DEF(pj_status_t) pjmedia_event_mgr_create(pj_pool_t *pool, + unsigned options, + pjmedia_event_mgr **p_mgr) { - PJ_ASSERT_RETURN(esub, PJ_EINVAL); - - if (esub->subscribe_to) { - char epub_name[5]; - TRACE_((THIS_FILE, "Unsubscription [0x%p] to publisher %s", - esub, - pjmedia_fourcc_name(esub->subscribe_to->sig, - epub_name))); - /* Suppress compiler warning if trace is disabled */ - PJ_UNUSED_ARG(epub_name); - - PJ_ASSERT_RETURN( - pj_list_find_node(&esub->subscribe_to->subscription_list, - esub)==esub, PJ_ENOTFOUND); - pj_list_erase(esub); - esub->subscribe_to = NULL; + pjmedia_event_mgr *mgr; + pj_status_t status; + + mgr = PJ_POOL_ZALLOC_T(pool, pjmedia_event_mgr); + pj_list_init(&mgr->esub_list); + + if (!(options & PJMEDIA_EVENT_MGR_NO_THREAD)) { + status = pj_sem_create(pool, "ev_sem", 0, MAX_EVENTS + 1, &mgr->sem); + if (status != PJ_SUCCESS) + return status; + + status = pj_thread_create(pool, "ev_thread", &event_worker_thread, + mgr, 0, 0, &mgr->thread); + if (status != PJ_SUCCESS) { + pjmedia_event_mgr_destroy(mgr); + return status; + } + } + + status = pj_mutex_create_recursive(pool, "ev_mutex", &mgr->mutex); + if (status != PJ_SUCCESS) { + pjmedia_event_mgr_destroy(mgr); + return status; } + + if (!event_manager_instance) + event_manager_instance = mgr; + + if (p_mgr) + *p_mgr = mgr; + return PJ_SUCCESS; } -PJ_DEF(pj_status_t) pjmedia_event_publish( pjmedia_event_publisher *epub, - pjmedia_event *event) +PJ_DEF(pjmedia_event_mgr*) pjmedia_event_mgr_instance(void) { - pjmedia_event_subscription *esub; - char event_name[5]; - char epub_name[5]; - pj_status_t err = PJ_SUCCESS; + return event_manager_instance; +} - PJ_ASSERT_RETURN(epub && event, PJ_EINVAL); +PJ_DEF(void) pjmedia_event_mgr_set_instance(pjmedia_event_mgr *mgr) +{ + event_manager_instance = mgr; +} - TRACE_((THIS_FILE, "Event %s is published by publisher %s", - pjmedia_fourcc_name(event->type, event_name), - pjmedia_fourcc_name(epub->sig, epub_name))); - /* Suppress compiler warning if trace is disabled */ - PJ_UNUSED_ARG(event_name); - PJ_UNUSED_ARG(epub_name); +PJ_DEF(void) pjmedia_event_mgr_destroy(pjmedia_event_mgr *mgr) +{ + esub *sub; - esub = epub->subscription_list.next; - if (!esub) - return err; + if (!mgr) mgr = pjmedia_event_mgr_instance(); + PJ_ASSERT_ON_FAIL(mgr != NULL, return); - while (esub != &epub->subscription_list) { - pjmedia_event_subscription *next; - pj_status_t status; + if (mgr->thread) { + mgr->is_quitting = PJ_TRUE; + pj_sem_post(mgr->sem); + pj_thread_join(mgr->thread); + } - /* just in case esub is destroyed in the callback */ - next = esub->next; + if (mgr->sem) { + pj_sem_destroy(mgr->sem); + mgr->sem = NULL; + } - status = (*esub->cb)(esub, event); - if (status != PJ_SUCCESS && err == PJ_SUCCESS) - err = status; + if (mgr->mutex) { + pj_mutex_destroy(mgr->mutex); + mgr->mutex = NULL; + } - esub = next; + sub = mgr->esub_list.next; + while (sub != &mgr->esub_list) { + esub *next = sub->next; + pj_list_erase(sub); + sub = next; } - return err; + if (event_manager_instance == mgr) + event_manager_instance = NULL; } -static pj_status_t republisher_cb(pjmedia_event_subscription *esub, - pjmedia_event *event) +PJ_DEF(void) pjmedia_event_init( pjmedia_event *event, + pjmedia_event_type type, + const pj_timestamp *ts, + const void *src) { - return pjmedia_event_publish((pjmedia_event_publisher*)esub->user_data, - event); + pj_bzero(event, sizeof(*event)); + event->type = type; + if (ts) + event->timestamp.u64 = ts->u64; + event->epub = event->src = src; } -PJ_DEF(pj_status_t) pjmedia_event_republish(pjmedia_event_publisher *esrc, - pjmedia_event_publisher *epub, - pjmedia_event_subscription *esub) +PJ_DEF(pj_status_t) pjmedia_event_subscribe( pjmedia_event_mgr *mgr, + pj_pool_t *pool, + pjmedia_event_cb *cb, + void *user_data, + void *epub) { - PJ_ASSERT_RETURN(esrc && epub && esub, PJ_EINVAL); + esub *sub; + + PJ_ASSERT_RETURN(pool && cb, PJ_EINVAL); + + if (!mgr) mgr = pjmedia_event_mgr_instance(); + PJ_ASSERT_RETURN(mgr, PJ_EINVAL); + + pj_mutex_lock(mgr->mutex); + /* Check whether callback function with the same user data is already + * subscribed to the publisher. This is to prevent the callback function + * receiving the same event from the same publisher more than once. + */ + sub = mgr->esub_list.next; + while (sub != &mgr->esub_list) { + esub *next = sub->next; + if (sub->cb == cb && sub->user_data == user_data && + sub->epub == epub) + { + pj_mutex_unlock(mgr->mutex); + return PJ_SUCCESS; + } + sub = next; + } + + sub = PJ_POOL_ZALLOC_T(pool, esub); + sub->cb = cb; + sub->user_data = user_data; + sub->epub = epub; + pj_list_push_back(&mgr->esub_list, sub); + pj_mutex_unlock(mgr->mutex); - pjmedia_event_subscription_init(esub, &republisher_cb, epub); - return pjmedia_event_subscribe(esrc, esub); + return PJ_SUCCESS; } +PJ_DEF(pj_status_t) +pjmedia_event_unsubscribe(pjmedia_event_mgr *mgr, + pjmedia_event_cb *cb, + void *user_data, + void *epub) +{ + esub *sub; + + PJ_ASSERT_RETURN(cb, PJ_EINVAL); + + if (!mgr) mgr = pjmedia_event_mgr_instance(); + PJ_ASSERT_RETURN(mgr, PJ_EINVAL); + + pj_mutex_lock(mgr->mutex); + sub = mgr->esub_list.next; + while (sub != &mgr->esub_list) { + esub *next = sub->next; + if (sub->cb == cb && (sub->user_data == user_data || !user_data) && + (sub->epub == epub || !epub)) + { + /* If the worker thread or pjmedia_event_publish() API is + * in the process of distributing events, make sure that + * its pointer to the next subscriber stays valid. + */ + if (mgr->th_next_sub == sub) + mgr->th_next_sub = sub->next; + if (mgr->pub_next_sub == sub) + mgr->pub_next_sub = sub->next; + pj_list_erase(sub); + if (user_data && epub) + break; + } + sub = next; + } + pj_mutex_unlock(mgr->mutex); + + return PJ_SUCCESS; +} + +PJ_DEF(pj_status_t) pjmedia_event_publish( pjmedia_event_mgr *mgr, + void *epub, + pjmedia_event *event, + pjmedia_event_publish_flag flag) +{ + pj_status_t err = PJ_SUCCESS; + + PJ_ASSERT_RETURN(epub && event, PJ_EINVAL); + + if (!mgr) mgr = pjmedia_event_mgr_instance(); + PJ_ASSERT_RETURN(mgr, PJ_EINVAL); + + event->epub = epub; + + pj_mutex_lock(mgr->mutex); + if (flag & PJMEDIA_EVENT_PUBLISH_POST_EVENT) { + if (event_queue_add_event(&mgr->ev_queue, event) == PJ_SUCCESS) + pj_sem_post(mgr->sem); + } else { + /* For nested pjmedia_event_publish() calls, i.e. calling publish() + * inside the subscriber's callback, the function will only add + * the event to the event queue of the first publish() call. It + * is the first publish() call that will be responsible to + * distribute the events. + */ + if (mgr->pub_ev_queue) { + event_queue_add_event(mgr->pub_ev_queue, event); + } else { + static event_queue ev_queue; + pj_status_t status; + + ev_queue.head = ev_queue.tail = 0; + ev_queue.is_full = PJ_FALSE; + mgr->pub_ev_queue = &ev_queue; + + event_queue_add_event(mgr->pub_ev_queue, event); + + do { + status = event_mgr_distribute_events(mgr, mgr->pub_ev_queue, + &mgr->pub_next_sub); + if (status != PJ_SUCCESS && err == PJ_SUCCESS) + err = status; + } while(ev_queue.head != ev_queue.tail || ev_queue.is_full); + + mgr->pub_ev_queue = NULL; + } + } + pj_mutex_unlock(mgr->mutex); + + return err; +} |