diff options
Diffstat (limited to 'pjmedia/src/pjmedia/event.c')
-rw-r--r-- | pjmedia/src/pjmedia/event.c | 377 |
1 files changed, 377 insertions, 0 deletions
diff --git a/pjmedia/src/pjmedia/event.c b/pjmedia/src/pjmedia/event.c new file mode 100644 index 0000000..bc03ac2 --- /dev/null +++ b/pjmedia/src/pjmedia/event.c @@ -0,0 +1,377 @@ +/* $Id: event.c 3905 2011-12-09 05:15:39Z ming $ */ +/* + * Copyright (C) 2011-2011 Teluu Inc. (http://www.teluu.com) + * + * This program is free software; you can redistribute it and/or modify + * it under the terms of the GNU General Public License as published by + * the Free Software Foundation; either version 2 of the License, or + * (at your option) any later version. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU General Public License for more details. + * + * You should have received a copy of the GNU General Public License + * along with this program; if not, write to the Free Software + * Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA + */ +#include <pjmedia/event.h> +#include <pjmedia/errno.h> +#include <pj/assert.h> +#include <pj/list.h> +#include <pj/log.h> +#include <pj/os.h> +#include <pj/pool.h> +#include <pj/string.h> + +#define THIS_FILE "event.c" + +#define MAX_EVENTS 16 + +typedef struct esub esub; + +struct esub +{ + PJ_DECL_LIST_MEMBER(esub); + + pjmedia_event_cb *cb; + void *user_data; + void *epub; +}; + +typedef struct event_queue +{ + pjmedia_event events[MAX_EVENTS]; /**< array of events. */ + int head, tail; + pj_bool_t is_full; +} event_queue; + +struct pjmedia_event_mgr +{ + pj_pool_t *pool; + pj_thread_t *thread; /**< worker thread. */ + pj_bool_t is_quitting; + pj_sem_t *sem; + pj_mutex_t *mutex; + event_queue ev_queue; + event_queue *pub_ev_queue; /**< publish() event queue. */ + esub esub_list; /**< list of subscribers. */ + esub free_esub_list; /**< list of subscribers. */ + esub *th_next_sub, /**< worker thread's next sub. */ + *pub_next_sub; /**< publish() next sub. */ +}; + +static pjmedia_event_mgr *event_manager_instance; + +static pj_status_t event_queue_add_event(event_queue* ev_queue, + pjmedia_event *event) +{ + if (ev_queue->is_full) { + char ev_name[5]; + + /* This event will be ignored. */ + PJ_LOG(4, (THIS_FILE, "Lost event %s from publisher [0x%p] " + "due to full queue.", + pjmedia_fourcc_name(event->type, ev_name), + event->epub)); + + return PJ_ETOOMANY; + } + + pj_memcpy(&ev_queue->events[ev_queue->tail], event, sizeof(*event)); + ev_queue->tail = (ev_queue->tail + 1) % MAX_EVENTS; + if (ev_queue->tail == ev_queue->head) + ev_queue->is_full = PJ_TRUE; + + return PJ_SUCCESS; +} + +static pj_status_t event_mgr_distribute_events(pjmedia_event_mgr *mgr, + event_queue *ev_queue, + esub **next_sub, + pj_bool_t rls_lock) +{ + pj_status_t err = PJ_SUCCESS; + esub * sub = mgr->esub_list.next; + pjmedia_event *ev = &ev_queue->events[ev_queue->head]; + + while (sub != &mgr->esub_list) { + *next_sub = sub->next; + + /* Check if the subscriber is interested in + * receiving the event from the publisher. + */ + if (sub->epub == ev->epub || !sub->epub) { + pjmedia_event_cb *cb = sub->cb; + void *user_data = sub->user_data; + pj_status_t status; + + if (rls_lock) + pj_mutex_unlock(mgr->mutex); + + status = (*cb)(ev, user_data); + if (status != PJ_SUCCESS && err == PJ_SUCCESS) + err = status; + + if (rls_lock) + pj_mutex_lock(mgr->mutex); + } + sub = *next_sub; + } + *next_sub = NULL; + + ev_queue->head = (ev_queue->head + 1) % MAX_EVENTS; + ev_queue->is_full = PJ_FALSE; + + return err; +} + +/* Event worker thread function. */ +static int event_worker_thread(void *arg) +{ + pjmedia_event_mgr *mgr = (pjmedia_event_mgr *)arg; + + while (1) { + /* Wait until there is an event. */ + pj_sem_wait(mgr->sem); + + if (mgr->is_quitting) + break; + + pj_mutex_lock(mgr->mutex); + event_mgr_distribute_events(mgr, &mgr->ev_queue, + &mgr->th_next_sub, PJ_TRUE); + pj_mutex_unlock(mgr->mutex); + } + + return 0; +} + +PJ_DEF(pj_status_t) pjmedia_event_mgr_create(pj_pool_t *pool, + unsigned options, + pjmedia_event_mgr **p_mgr) +{ + pjmedia_event_mgr *mgr; + pj_status_t status; + + mgr = PJ_POOL_ZALLOC_T(pool, pjmedia_event_mgr); + mgr->pool = pj_pool_create(pool->factory, "evt mgr", 500, 500, NULL); + pj_list_init(&mgr->esub_list); + pj_list_init(&mgr->free_esub_list); + + if (!(options & PJMEDIA_EVENT_MGR_NO_THREAD)) { + status = pj_sem_create(mgr->pool, "ev_sem", 0, MAX_EVENTS + 1, + &mgr->sem); + if (status != PJ_SUCCESS) + return status; + + status = pj_thread_create(mgr->pool, "ev_thread", + &event_worker_thread, + mgr, 0, 0, &mgr->thread); + if (status != PJ_SUCCESS) { + pjmedia_event_mgr_destroy(mgr); + return status; + } + } + + status = pj_mutex_create_recursive(mgr->pool, "ev_mutex", &mgr->mutex); + if (status != PJ_SUCCESS) { + pjmedia_event_mgr_destroy(mgr); + return status; + } + + if (!event_manager_instance) + event_manager_instance = mgr; + + if (p_mgr) + *p_mgr = mgr; + + return PJ_SUCCESS; +} + +PJ_DEF(pjmedia_event_mgr*) pjmedia_event_mgr_instance(void) +{ + return event_manager_instance; +} + +PJ_DEF(void) pjmedia_event_mgr_set_instance(pjmedia_event_mgr *mgr) +{ + event_manager_instance = mgr; +} + +PJ_DEF(void) pjmedia_event_mgr_destroy(pjmedia_event_mgr *mgr) +{ + if (!mgr) mgr = pjmedia_event_mgr_instance(); + PJ_ASSERT_ON_FAIL(mgr != NULL, return); + + if (mgr->thread) { + mgr->is_quitting = PJ_TRUE; + pj_sem_post(mgr->sem); + pj_thread_join(mgr->thread); + } + + if (mgr->sem) { + pj_sem_destroy(mgr->sem); + mgr->sem = NULL; + } + + if (mgr->mutex) { + pj_mutex_destroy(mgr->mutex); + mgr->mutex = NULL; + } + + if (mgr->pool) + pj_pool_release(mgr->pool); + + if (event_manager_instance == mgr) + event_manager_instance = NULL; +} + +PJ_DEF(void) pjmedia_event_init( pjmedia_event *event, + pjmedia_event_type type, + const pj_timestamp *ts, + const void *src) +{ + pj_bzero(event, sizeof(*event)); + event->type = type; + if (ts) + event->timestamp.u64 = ts->u64; + event->epub = event->src = src; +} + +PJ_DEF(pj_status_t) pjmedia_event_subscribe( pjmedia_event_mgr *mgr, + pjmedia_event_cb *cb, + void *user_data, + void *epub) +{ + esub *sub; + + PJ_ASSERT_RETURN(cb, PJ_EINVAL); + + if (!mgr) mgr = pjmedia_event_mgr_instance(); + PJ_ASSERT_RETURN(mgr, PJ_EINVAL); + + pj_mutex_lock(mgr->mutex); + /* Check whether callback function with the same user data is already + * subscribed to the publisher. This is to prevent the callback function + * receiving the same event from the same publisher more than once. + */ + sub = mgr->esub_list.next; + while (sub != &mgr->esub_list) { + esub *next = sub->next; + if (sub->cb == cb && sub->user_data == user_data && + sub->epub == epub) + { + pj_mutex_unlock(mgr->mutex); + return PJ_SUCCESS; + } + sub = next; + } + + if (mgr->free_esub_list.next != &mgr->free_esub_list) { + sub = mgr->free_esub_list.next; + pj_list_erase(sub); + } else + sub = PJ_POOL_ZALLOC_T(mgr->pool, esub); + sub->cb = cb; + sub->user_data = user_data; + sub->epub = epub; + pj_list_push_back(&mgr->esub_list, sub); + pj_mutex_unlock(mgr->mutex); + + return PJ_SUCCESS; +} + +PJ_DEF(pj_status_t) +pjmedia_event_unsubscribe(pjmedia_event_mgr *mgr, + pjmedia_event_cb *cb, + void *user_data, + void *epub) +{ + esub *sub; + + PJ_ASSERT_RETURN(cb, PJ_EINVAL); + + if (!mgr) mgr = pjmedia_event_mgr_instance(); + PJ_ASSERT_RETURN(mgr, PJ_EINVAL); + + pj_mutex_lock(mgr->mutex); + sub = mgr->esub_list.next; + while (sub != &mgr->esub_list) { + esub *next = sub->next; + if (sub->cb == cb && (sub->user_data == user_data || !user_data) && + (sub->epub == epub || !epub)) + { + /* If the worker thread or pjmedia_event_publish() API is + * in the process of distributing events, make sure that + * its pointer to the next subscriber stays valid. + */ + if (mgr->th_next_sub == sub) + mgr->th_next_sub = sub->next; + if (mgr->pub_next_sub == sub) + mgr->pub_next_sub = sub->next; + pj_list_erase(sub); + pj_list_push_back(&mgr->free_esub_list, sub); + if (user_data && epub) + break; + } + sub = next; + } + pj_mutex_unlock(mgr->mutex); + + return PJ_SUCCESS; +} + +PJ_DEF(pj_status_t) pjmedia_event_publish( pjmedia_event_mgr *mgr, + void *epub, + pjmedia_event *event, + pjmedia_event_publish_flag flag) +{ + pj_status_t err = PJ_SUCCESS; + + PJ_ASSERT_RETURN(epub && event, PJ_EINVAL); + + if (!mgr) mgr = pjmedia_event_mgr_instance(); + PJ_ASSERT_RETURN(mgr, PJ_EINVAL); + + event->epub = epub; + + pj_mutex_lock(mgr->mutex); + if (flag & PJMEDIA_EVENT_PUBLISH_POST_EVENT) { + if (event_queue_add_event(&mgr->ev_queue, event) == PJ_SUCCESS) + pj_sem_post(mgr->sem); + } else { + /* For nested pjmedia_event_publish() calls, i.e. calling publish() + * inside the subscriber's callback, the function will only add + * the event to the event queue of the first publish() call. It + * is the first publish() call that will be responsible to + * distribute the events. + */ + if (mgr->pub_ev_queue) { + event_queue_add_event(mgr->pub_ev_queue, event); + } else { + static event_queue ev_queue; + pj_status_t status; + + ev_queue.head = ev_queue.tail = 0; + ev_queue.is_full = PJ_FALSE; + mgr->pub_ev_queue = &ev_queue; + + event_queue_add_event(mgr->pub_ev_queue, event); + + do { + status = event_mgr_distribute_events(mgr, mgr->pub_ev_queue, + &mgr->pub_next_sub, + PJ_FALSE); + if (status != PJ_SUCCESS && err == PJ_SUCCESS) + err = status; + } while(ev_queue.head != ev_queue.tail || ev_queue.is_full); + + mgr->pub_ev_queue = NULL; + } + } + pj_mutex_unlock(mgr->mutex); + + return err; +} |