diff options
author | Liong Sauw Ming <ming@teluu.com> | 2011-12-01 10:49:07 +0000 |
---|---|---|
committer | Liong Sauw Ming <ming@teluu.com> | 2011-12-01 10:49:07 +0000 |
commit | c04000a192a00f047ea6d04e131e42f0b72bc11b (patch) | |
tree | a3f1a4ba2cd467087640c5cb2bd9509570c5acff /pjmedia/src | |
parent | 5a41db1f3ba90b676e9485a15841e5fec656ed58 (diff) |
Closed #1420: Add support for event manager
git-svn-id: http://svn.pjsip.org/repos/pjproject/trunk@3893 74dad513-b988-da41-8d7b-12977e46ad98
Diffstat (limited to 'pjmedia/src')
-rw-r--r-- | pjmedia/src/pjmedia-codec/ffmpeg_codecs.c | 12 | ||||
-rw-r--r-- | pjmedia/src/pjmedia-videodev/colorbar_dev.c | 1 | ||||
-rw-r--r-- | pjmedia/src/pjmedia-videodev/dshow_dev.c | 1 | ||||
-rw-r--r-- | pjmedia/src/pjmedia-videodev/ffmpeg_dev.c | 1 | ||||
-rw-r--r-- | pjmedia/src/pjmedia-videodev/sdl_dev.c | 10 | ||||
-rw-r--r-- | pjmedia/src/pjmedia-videodev/videodev.c | 6 | ||||
-rw-r--r-- | pjmedia/src/pjmedia/event.c | 394 | ||||
-rw-r--r-- | pjmedia/src/pjmedia/port.c | 14 | ||||
-rw-r--r-- | pjmedia/src/pjmedia/vid_codec.c | 1 | ||||
-rw-r--r-- | pjmedia/src/pjmedia/vid_port.c | 72 | ||||
-rw-r--r-- | pjmedia/src/pjmedia/vid_stream.c | 41 | ||||
-rw-r--r-- | pjmedia/src/test/test.c | 2 | ||||
-rw-r--r-- | pjmedia/src/test/vid_codec_test.c | 16 | ||||
-rw-r--r-- | pjmedia/src/test/vid_dev_test.c | 16 | ||||
-rw-r--r-- | pjmedia/src/test/vid_port_test.c | 16 |
15 files changed, 368 insertions, 235 deletions
diff --git a/pjmedia/src/pjmedia-codec/ffmpeg_codecs.c b/pjmedia/src/pjmedia-codec/ffmpeg_codecs.c index 53602b67..ccd0901f 100644 --- a/pjmedia/src/pjmedia-codec/ffmpeg_codecs.c +++ b/pjmedia/src/pjmedia-codec/ffmpeg_codecs.c @@ -1609,15 +1609,15 @@ static pj_status_t ffmpeg_codec_decode_whole(pjmedia_vid_codec *codec, } /* Broadcast event */ - if (pjmedia_event_publisher_has_sub(&codec->epub)) { + { pjmedia_event event; pjmedia_event_init(&event, PJMEDIA_EVENT_FMT_CHANGED, - &input->timestamp, &codec->epub); + &input->timestamp, codec); event.data.fmt_changed.dir = PJMEDIA_DIR_DECODING; pj_memcpy(&event.data.fmt_changed.new_fmt, &ff->param.dec_fmt, sizeof(ff->param.dec_fmt)); - pjmedia_event_publish(&codec->epub, &event); + pjmedia_event_publish(NULL, codec, &event, 0); } } @@ -1651,13 +1651,13 @@ static pj_status_t ffmpeg_codec_decode_whole(pjmedia_vid_codec *codec, output->size = vafp->framebytes; /* Check if we got key frame */ - if (avframe.key_frame && pjmedia_event_publisher_has_sub(&codec->epub)) + if (avframe.key_frame) { pjmedia_event event; pjmedia_event_init(&event, PJMEDIA_EVENT_KEY_FRAME_FOUND, - &output->timestamp, &codec->epub); - pjmedia_event_publish(&codec->epub, &event); + &output->timestamp, codec); + pjmedia_event_publish(NULL, codec, &event, 0); } } else { output->type = PJMEDIA_FRAME_TYPE_NONE; diff --git a/pjmedia/src/pjmedia-videodev/colorbar_dev.c b/pjmedia/src/pjmedia-videodev/colorbar_dev.c index e0d488da..6fb658ef 100644 --- a/pjmedia/src/pjmedia-videodev/colorbar_dev.c +++ b/pjmedia/src/pjmedia-videodev/colorbar_dev.c @@ -416,7 +416,6 @@ static pj_status_t cbar_factory_create_stream( strm->cbfi = cbfi; pj_memcpy(&strm->vafp, &vafp, sizeof(vafp)); strm->ts_inc = PJMEDIA_SPF2(param->clock_rate, &vfd->fps, 1); - pjmedia_event_publisher_init(&strm->base.epub, PJMEDIA_SIG_VID_DEV_COLORBAR); for (i = 0; i < vfi->plane_cnt; ++i) { strm->first_line[i] = pj_pool_alloc(pool, vafp.strides[i]); diff --git a/pjmedia/src/pjmedia-videodev/dshow_dev.c b/pjmedia/src/pjmedia-videodev/dshow_dev.c index f65ed921..c116875f 100644 --- a/pjmedia/src/pjmedia-videodev/dshow_dev.c +++ b/pjmedia/src/pjmedia-videodev/dshow_dev.c @@ -869,7 +869,6 @@ static pj_status_t dshow_factory_create_stream( strm->pool = pool; pj_memcpy(&strm->vid_cb, cb, sizeof(*cb)); strm->user_data = user_data; - pjmedia_event_publisher_init(&strm->base.epub, PJMEDIA_SIG_VID_DEV_DSHOW); if (param->dir & PJMEDIA_DIR_CAPTURE) { const pjmedia_video_format_detail *vfd; diff --git a/pjmedia/src/pjmedia-videodev/ffmpeg_dev.c b/pjmedia/src/pjmedia-videodev/ffmpeg_dev.c index 9f874077..1bfc0c27 100644 --- a/pjmedia/src/pjmedia-videodev/ffmpeg_dev.c +++ b/pjmedia/src/pjmedia-videodev/ffmpeg_dev.c @@ -383,7 +383,6 @@ static pj_status_t ffmpeg_factory_create_stream( strm->factory = (ffmpeg_factory*)f; strm->pool = pool; pj_memcpy(&strm->param, param, sizeof(*param)); - pjmedia_event_publisher_init(&strm->base.epub); /* Done */ strm->base.op = &stream_op; diff --git a/pjmedia/src/pjmedia-videodev/sdl_dev.c b/pjmedia/src/pjmedia-videodev/sdl_dev.c index 465e37b5..109b22bd 100644 --- a/pjmedia/src/pjmedia-videodev/sdl_dev.c +++ b/pjmedia/src/pjmedia-videodev/sdl_dev.c @@ -308,7 +308,7 @@ static struct sdl_stream* find_stream(struct sdl_factory *sf, if (strm) pjmedia_event_init(pevent, PJMEDIA_EVENT_NONE, &strm->last_ts, - &strm->base.epub); + strm); return strm; } @@ -354,7 +354,7 @@ static pj_status_t handle_event(void *data) if (strm && pevent.type != PJMEDIA_EVENT_NONE) { pj_status_t status; - pjmedia_event_publish(&strm->base.epub, &pevent); + pjmedia_event_publish(NULL, strm, &pevent, 0); switch (pevent.type) { case PJMEDIA_EVENT_WND_RESIZED: @@ -375,9 +375,8 @@ static pj_status_t handle_event(void *data) sdl_stream_stop(&strm->base); sdl_destroy_all(strm); pjmedia_event_init(&pevent, PJMEDIA_EVENT_WND_CLOSED, - &strm->last_ts, - &strm->base.epub); - pjmedia_event_publish(&strm->base.epub, &pevent); + &strm->last_ts, strm); + pjmedia_event_publish(NULL, strm, &pevent, 0); /* * Note: don't access the stream after this point, it @@ -916,7 +915,6 @@ static pj_status_t sdl_factory_create_stream( strm->sf = sf; pj_memcpy(&strm->vid_cb, cb, sizeof(*cb)); strm->user_data = user_data; - pjmedia_event_publisher_init(&strm->base.epub, PJMEDIA_SIG_VID_DEV_SDL); /* Create render stream here */ if (param->dir & PJMEDIA_DIR_RENDER) { diff --git a/pjmedia/src/pjmedia-videodev/videodev.c b/pjmedia/src/pjmedia-videodev/videodev.c index 9a82ba6e..9b96520d 100644 --- a/pjmedia/src/pjmedia-videodev/videodev.c +++ b/pjmedia/src/pjmedia-videodev/videodev.c @@ -785,12 +785,6 @@ PJ_DEF(pj_status_t) pjmedia_vid_dev_stream_set_cap( return strm->op->set_cap(strm, cap, value); } -PJ_DEF(pjmedia_event_publisher*) -pjmedia_vid_dev_stream_get_event_publisher(pjmedia_vid_dev_stream *strm) -{ - return &strm->epub; -} - /* API: Start the stream. */ PJ_DEF(pj_status_t) pjmedia_vid_dev_stream_start(pjmedia_vid_dev_stream *strm) { diff --git a/pjmedia/src/pjmedia/event.c b/pjmedia/src/pjmedia/event.c index 8e92b53e..c499eaa5 100644 --- a/pjmedia/src/pjmedia/event.c +++ b/pjmedia/src/pjmedia/event.c @@ -19,150 +19,342 @@ #include <pjmedia/event.h> #include <pjmedia/errno.h> #include <pj/assert.h> +#include <pj/list.h> #include <pj/log.h> +#include <pj/os.h> +#include <pj/pool.h> #include <pj/string.h> #define THIS_FILE "event.c" -#if 1 -# define TRACE_(x) PJ_LOG(6,x) -#else -# define TRACE_(x) -#endif +#define MAX_EVENTS 16 -PJ_DEF(void) pjmedia_event_init( pjmedia_event *event, - pjmedia_event_type type, - const pj_timestamp *ts, - const pjmedia_event_publisher *epub) +typedef struct esub esub; + +struct esub { - pj_bzero(event, sizeof(*event)); - event->type = type; - if (ts) - event->timestamp.u64 = ts->u64; - event->epub = epub; - if (epub) - event->epub_sig = epub->sig; -} + PJ_DECL_LIST_MEMBER(esub); + + pjmedia_event_cb *cb; + void *user_data; + void *epub; +}; -PJ_DEF(void) pjmedia_event_publisher_init(pjmedia_event_publisher *epub, - pjmedia_obj_sig sig) +typedef struct event_queue { - pj_bzero(epub, sizeof(*epub)); - pj_list_init(&epub->subscription_list); - epub->sig = sig; -} + pjmedia_event events[MAX_EVENTS]; /**< array of events. */ + int head, tail; + pj_bool_t is_full; +} event_queue; + +struct pjmedia_event_mgr +{ + pj_thread_t *thread; /**< worker thread. */ + pj_bool_t is_quitting; + pj_sem_t *sem; + pj_mutex_t *mutex; + event_queue ev_queue; + event_queue *pub_ev_queue; /**< publish() event queue. */ + esub esub_list; /**< list of subscribers. */ + esub *th_next_sub, /**< worker thread's next sub. */ + *pub_next_sub; /**< publish() next sub. */ +}; -PJ_DEF(void) pjmedia_event_subscription_init( pjmedia_event_subscription *esub, - pjmedia_event_cb *cb, - void *user_data) +static pjmedia_event_mgr *event_manager_instance; + +static pj_status_t event_queue_add_event(event_queue* ev_queue, + pjmedia_event *event) { - pj_bzero(esub, sizeof(*esub)); - esub->cb = cb; - esub->user_data = user_data; + if (ev_queue->is_full) { + char ev_name[5]; + + /* This event will be ignored. */ + PJ_LOG(4, (THIS_FILE, "Lost event %s from publisher [0x%p] " + "due to full queue.", + pjmedia_fourcc_name(event->type, ev_name), + event->epub)); + + return PJ_ETOOMANY; + } + + pj_memcpy(&ev_queue->events[ev_queue->tail], event, sizeof(*event)); + ev_queue->tail = (ev_queue->tail + 1) % MAX_EVENTS; + if (ev_queue->tail == ev_queue->head) + ev_queue->is_full = PJ_TRUE; + + return PJ_SUCCESS; } -PJ_DEF(pj_bool_t) -pjmedia_event_publisher_has_sub(pjmedia_event_publisher *epub) +static pj_status_t event_mgr_distribute_events(pjmedia_event_mgr *mgr, + event_queue *ev_queue, + esub **next_sub) { - PJ_ASSERT_RETURN(epub, PJ_FALSE); - return epub->subscription_list.next && - (!pj_list_empty(&epub->subscription_list)); + pj_status_t err = PJ_SUCCESS; + esub * sub = mgr->esub_list.next; + pjmedia_event *ev = &ev_queue->events[ev_queue->head]; + + while (sub != &mgr->esub_list) { + *next_sub = sub->next; + + /* Check if the subscriber is interested in + * receiving the event from the publisher. + */ + if (sub->epub == ev->epub || !sub->epub) { + pj_status_t status = (*sub->cb)(ev, sub->user_data); + if (status != PJ_SUCCESS && err == PJ_SUCCESS) + err = status; + } + sub = *next_sub; + } + *next_sub = NULL; + + ev_queue->head = (ev_queue->head + 1) % MAX_EVENTS; + ev_queue->is_full = PJ_FALSE; + + return err; } -PJ_DEF(pj_status_t) pjmedia_event_subscribe( pjmedia_event_publisher *epub, - pjmedia_event_subscription *esub) +/* Event worker thread function. */ +static int event_worker_thread(void *arg) { - char epub_name[5]; + pjmedia_event_mgr *mgr = (pjmedia_event_mgr *)arg; - PJ_ASSERT_RETURN(epub && esub && esub->cb, PJ_EINVAL); - /* Must not currently subscribe to anything */ - PJ_ASSERT_RETURN(esub->subscribe_to == NULL, PJ_EINVALIDOP); + while (1) { + /* Wait until there is an event. */ + pj_sem_wait(mgr->sem); - TRACE_((THIS_FILE, "Subscription [0x%p] to publisher %s", - esub, - pjmedia_fourcc_name(epub->sig, epub_name))); - /* Suppress compiler warning if trace is disabled */ - PJ_UNUSED_ARG(epub_name); + if (mgr->is_quitting) + break; - pj_list_push_back(&epub->subscription_list, esub); - esub->subscribe_to = epub; - return PJ_SUCCESS; + pj_mutex_lock(mgr->mutex); + event_mgr_distribute_events(mgr, &mgr->ev_queue, &mgr->th_next_sub); + pj_mutex_unlock(mgr->mutex); + } + + return 0; } -PJ_DEF(pj_status_t) pjmedia_event_unsubscribe(pjmedia_event_subscription *esub) +PJ_DEF(pj_status_t) pjmedia_event_mgr_create(pj_pool_t *pool, + unsigned options, + pjmedia_event_mgr **p_mgr) { - PJ_ASSERT_RETURN(esub, PJ_EINVAL); - - if (esub->subscribe_to) { - char epub_name[5]; - TRACE_((THIS_FILE, "Unsubscription [0x%p] to publisher %s", - esub, - pjmedia_fourcc_name(esub->subscribe_to->sig, - epub_name))); - /* Suppress compiler warning if trace is disabled */ - PJ_UNUSED_ARG(epub_name); - - PJ_ASSERT_RETURN( - pj_list_find_node(&esub->subscribe_to->subscription_list, - esub)==esub, PJ_ENOTFOUND); - pj_list_erase(esub); - esub->subscribe_to = NULL; + pjmedia_event_mgr *mgr; + pj_status_t status; + + mgr = PJ_POOL_ZALLOC_T(pool, pjmedia_event_mgr); + pj_list_init(&mgr->esub_list); + + if (!(options & PJMEDIA_EVENT_MGR_NO_THREAD)) { + status = pj_sem_create(pool, "ev_sem", 0, MAX_EVENTS + 1, &mgr->sem); + if (status != PJ_SUCCESS) + return status; + + status = pj_thread_create(pool, "ev_thread", &event_worker_thread, + mgr, 0, 0, &mgr->thread); + if (status != PJ_SUCCESS) { + pjmedia_event_mgr_destroy(mgr); + return status; + } + } + + status = pj_mutex_create_recursive(pool, "ev_mutex", &mgr->mutex); + if (status != PJ_SUCCESS) { + pjmedia_event_mgr_destroy(mgr); + return status; } + + if (!event_manager_instance) + event_manager_instance = mgr; + + if (p_mgr) + *p_mgr = mgr; + return PJ_SUCCESS; } -PJ_DEF(pj_status_t) pjmedia_event_publish( pjmedia_event_publisher *epub, - pjmedia_event *event) +PJ_DEF(pjmedia_event_mgr*) pjmedia_event_mgr_instance(void) { - pjmedia_event_subscription *esub; - char event_name[5]; - char epub_name[5]; - pj_status_t err = PJ_SUCCESS; + return event_manager_instance; +} - PJ_ASSERT_RETURN(epub && event, PJ_EINVAL); +PJ_DEF(void) pjmedia_event_mgr_set_instance(pjmedia_event_mgr *mgr) +{ + event_manager_instance = mgr; +} - TRACE_((THIS_FILE, "Event %s is published by publisher %s", - pjmedia_fourcc_name(event->type, event_name), - pjmedia_fourcc_name(epub->sig, epub_name))); - /* Suppress compiler warning if trace is disabled */ - PJ_UNUSED_ARG(event_name); - PJ_UNUSED_ARG(epub_name); +PJ_DEF(void) pjmedia_event_mgr_destroy(pjmedia_event_mgr *mgr) +{ + esub *sub; - esub = epub->subscription_list.next; - if (!esub) - return err; + if (!mgr) mgr = pjmedia_event_mgr_instance(); + PJ_ASSERT_ON_FAIL(mgr != NULL, return); - while (esub != &epub->subscription_list) { - pjmedia_event_subscription *next; - pj_status_t status; + if (mgr->thread) { + mgr->is_quitting = PJ_TRUE; + pj_sem_post(mgr->sem); + pj_thread_join(mgr->thread); + } - /* just in case esub is destroyed in the callback */ - next = esub->next; + if (mgr->sem) { + pj_sem_destroy(mgr->sem); + mgr->sem = NULL; + } - status = (*esub->cb)(esub, event); - if (status != PJ_SUCCESS && err == PJ_SUCCESS) - err = status; + if (mgr->mutex) { + pj_mutex_destroy(mgr->mutex); + mgr->mutex = NULL; + } - esub = next; + sub = mgr->esub_list.next; + while (sub != &mgr->esub_list) { + esub *next = sub->next; + pj_list_erase(sub); + sub = next; } - return err; + if (event_manager_instance == mgr) + event_manager_instance = NULL; } -static pj_status_t republisher_cb(pjmedia_event_subscription *esub, - pjmedia_event *event) +PJ_DEF(void) pjmedia_event_init( pjmedia_event *event, + pjmedia_event_type type, + const pj_timestamp *ts, + const void *src) { - return pjmedia_event_publish((pjmedia_event_publisher*)esub->user_data, - event); + pj_bzero(event, sizeof(*event)); + event->type = type; + if (ts) + event->timestamp.u64 = ts->u64; + event->epub = event->src = src; } -PJ_DEF(pj_status_t) pjmedia_event_republish(pjmedia_event_publisher *esrc, - pjmedia_event_publisher *epub, - pjmedia_event_subscription *esub) +PJ_DEF(pj_status_t) pjmedia_event_subscribe( pjmedia_event_mgr *mgr, + pj_pool_t *pool, + pjmedia_event_cb *cb, + void *user_data, + void *epub) { - PJ_ASSERT_RETURN(esrc && epub && esub, PJ_EINVAL); + esub *sub; + + PJ_ASSERT_RETURN(pool && cb, PJ_EINVAL); + + if (!mgr) mgr = pjmedia_event_mgr_instance(); + PJ_ASSERT_RETURN(mgr, PJ_EINVAL); + + pj_mutex_lock(mgr->mutex); + /* Check whether callback function with the same user data is already + * subscribed to the publisher. This is to prevent the callback function + * receiving the same event from the same publisher more than once. + */ + sub = mgr->esub_list.next; + while (sub != &mgr->esub_list) { + esub *next = sub->next; + if (sub->cb == cb && sub->user_data == user_data && + sub->epub == epub) + { + pj_mutex_unlock(mgr->mutex); + return PJ_SUCCESS; + } + sub = next; + } + + sub = PJ_POOL_ZALLOC_T(pool, esub); + sub->cb = cb; + sub->user_data = user_data; + sub->epub = epub; + pj_list_push_back(&mgr->esub_list, sub); + pj_mutex_unlock(mgr->mutex); - pjmedia_event_subscription_init(esub, &republisher_cb, epub); - return pjmedia_event_subscribe(esrc, esub); + return PJ_SUCCESS; } +PJ_DEF(pj_status_t) +pjmedia_event_unsubscribe(pjmedia_event_mgr *mgr, + pjmedia_event_cb *cb, + void *user_data, + void *epub) +{ + esub *sub; + + PJ_ASSERT_RETURN(cb, PJ_EINVAL); + + if (!mgr) mgr = pjmedia_event_mgr_instance(); + PJ_ASSERT_RETURN(mgr, PJ_EINVAL); + + pj_mutex_lock(mgr->mutex); + sub = mgr->esub_list.next; + while (sub != &mgr->esub_list) { + esub *next = sub->next; + if (sub->cb == cb && (sub->user_data == user_data || !user_data) && + (sub->epub == epub || !epub)) + { + /* If the worker thread or pjmedia_event_publish() API is + * in the process of distributing events, make sure that + * its pointer to the next subscriber stays valid. + */ + if (mgr->th_next_sub == sub) + mgr->th_next_sub = sub->next; + if (mgr->pub_next_sub == sub) + mgr->pub_next_sub = sub->next; + pj_list_erase(sub); + if (user_data && epub) + break; + } + sub = next; + } + pj_mutex_unlock(mgr->mutex); + + return PJ_SUCCESS; +} + +PJ_DEF(pj_status_t) pjmedia_event_publish( pjmedia_event_mgr *mgr, + void *epub, + pjmedia_event *event, + pjmedia_event_publish_flag flag) +{ + pj_status_t err = PJ_SUCCESS; + + PJ_ASSERT_RETURN(epub && event, PJ_EINVAL); + + if (!mgr) mgr = pjmedia_event_mgr_instance(); + PJ_ASSERT_RETURN(mgr, PJ_EINVAL); + + event->epub = epub; + + pj_mutex_lock(mgr->mutex); + if (flag & PJMEDIA_EVENT_PUBLISH_POST_EVENT) { + if (event_queue_add_event(&mgr->ev_queue, event) == PJ_SUCCESS) + pj_sem_post(mgr->sem); + } else { + /* For nested pjmedia_event_publish() calls, i.e. calling publish() + * inside the subscriber's callback, the function will only add + * the event to the event queue of the first publish() call. It + * is the first publish() call that will be responsible to + * distribute the events. + */ + if (mgr->pub_ev_queue) { + event_queue_add_event(mgr->pub_ev_queue, event); + } else { + static event_queue ev_queue; + pj_status_t status; + + ev_queue.head = ev_queue.tail = 0; + ev_queue.is_full = PJ_FALSE; + mgr->pub_ev_queue = &ev_queue; + + event_queue_add_event(mgr->pub_ev_queue, event); + + do { + status = event_mgr_distribute_events(mgr, mgr->pub_ev_queue, + &mgr->pub_next_sub); + if (status != PJ_SUCCESS && err == PJ_SUCCESS) + err = status; + } while(ev_queue.head != ev_queue.tail || ev_queue.is_full); + + mgr->pub_ev_queue = NULL; + } + } + pj_mutex_unlock(mgr->mutex); + + return err; +} diff --git a/pjmedia/src/pjmedia/port.c b/pjmedia/src/pjmedia/port.c index e838263a..b7defae5 100644 --- a/pjmedia/src/pjmedia/port.c +++ b/pjmedia/src/pjmedia/port.c @@ -117,20 +117,6 @@ PJ_DEF(pj_status_t) pjmedia_port_put_frame( pjmedia_port *port, return PJ_EINVALIDOP; } -/* - * Get event publisher - */ -PJ_DEF(pjmedia_event_publisher*) -pjmedia_port_get_event_publisher(pjmedia_port *port) -{ - PJ_ASSERT_RETURN(port, NULL); - - if (port->get_event_pub) - return (*port->get_event_pub)(port); - - return NULL; -} - /** * Destroy port (and subsequent downstream ports) */ diff --git a/pjmedia/src/pjmedia/vid_codec.c b/pjmedia/src/pjmedia/vid_codec.c index 04d2f455..2002c6d5 100644 --- a/pjmedia/src/pjmedia/vid_codec.c +++ b/pjmedia/src/pjmedia/vid_codec.c @@ -77,7 +77,6 @@ PJ_DEF(void) pjmedia_vid_codec_reset(pjmedia_vid_codec *codec, pjmedia_obj_sig sig) { pj_bzero(codec, sizeof(*codec)); - pjmedia_event_publisher_init(&codec->epub, sig); } /* diff --git a/pjmedia/src/pjmedia/vid_port.c b/pjmedia/src/pjmedia/vid_port.c index fa5c2da6..1c1cdedf 100644 --- a/pjmedia/src/pjmedia/vid_port.c +++ b/pjmedia/src/pjmedia/vid_port.c @@ -61,10 +61,6 @@ struct pjmedia_vid_port pj_size_t conv_buf_size; pjmedia_conversion_param conv_param; - pjmedia_event_publisher epub; - pjmedia_event_subscription esub_dev; - pjmedia_event_subscription esub_client_port; - pjmedia_clock *clock; pjmedia_clock_src clocksrc; @@ -94,10 +90,10 @@ static pj_status_t vidstream_cap_cb(pjmedia_vid_dev_stream *stream, static pj_status_t vidstream_render_cb(pjmedia_vid_dev_stream *stream, void *user_data, pjmedia_frame *frame); -static pj_status_t vidstream_event_cb(pjmedia_event_subscription *esub, - pjmedia_event *event); -static pj_status_t client_port_event_cb(pjmedia_event_subscription *esub, - pjmedia_event *event); +static pj_status_t vidstream_event_cb(pjmedia_event *event, + void *user_data); +static pj_status_t client_port_event_cb(pjmedia_event *event, + void *user_data); static void enc_clock_cb(const pj_timestamp *ts, void *user_data); static void dec_clock_cb(const pj_timestamp *ts, void *user_data); @@ -206,7 +202,6 @@ PJ_DEF(pj_status_t) pjmedia_vid_port_create( pj_pool_t *pool, vp->role = prm->active ? ROLE_ACTIVE : ROLE_PASSIVE; vp->dir = prm->vidparam.dir; // vp->cap_size = vfd->size; - pjmedia_event_publisher_init(&vp->epub, SIGNATURE); vparam = prm->vidparam; dev_name[0] = '\0'; @@ -272,10 +267,8 @@ PJ_DEF(pj_status_t) pjmedia_vid_port_create( pj_pool_t *pool, vparam.fmt.det.vid.fps.num, vparam.fmt.det.vid.fps.denum)); /* Subscribe to device's events */ - pjmedia_event_subscription_init(&vp->esub_dev, vidstream_event_cb, vp); - pjmedia_event_subscribe( - pjmedia_vid_dev_stream_get_event_publisher(vp->strm), - &vp->esub_dev); + pjmedia_event_subscribe(NULL, vp->pool, &vidstream_event_cb, + vp, vp->strm); if (vp->dir & PJMEDIA_DIR_CAPTURE) { pjmedia_format_copy(&vp->conv_param.src, &vparam.fmt); @@ -370,13 +363,6 @@ PJ_DEF(void) pjmedia_vid_port_set_cb(pjmedia_vid_port *vid_port, vid_port->strm_cb_data = user_data; } -PJ_DEF(pjmedia_event_publisher*) -pjmedia_vid_port_get_event_publisher(pjmedia_vid_port *vid_port) -{ - PJ_ASSERT_RETURN(vid_port, NULL); - return &vid_port->epub; -} - PJ_DEF(pjmedia_vid_dev_stream*) pjmedia_vid_port_get_stream(pjmedia_vid_port *vp) { @@ -419,20 +405,14 @@ PJ_DEF(pj_status_t) pjmedia_vid_port_connect(pjmedia_vid_port *vp, pjmedia_port *port, pj_bool_t destroy) { - pjmedia_event_publisher *epub; - PJ_ASSERT_RETURN(vp && vp->role==ROLE_ACTIVE, PJ_EINVAL); vp->destroy_client_port = destroy; vp->client_port = port; /* Subscribe to client port's events */ - epub = pjmedia_port_get_event_publisher(port); - if (epub) { - pjmedia_event_subscription_init(&vp->esub_client_port, - &client_port_event_cb, - vp); - pjmedia_event_subscribe(epub, &vp->esub_client_port); - } + pjmedia_event_subscribe(NULL, vp->pool, &client_port_event_cb, vp, + vp->client_port); + return PJ_SUCCESS; } @@ -441,10 +421,10 @@ PJ_DEF(pj_status_t) pjmedia_vid_port_disconnect(pjmedia_vid_port *vp) { PJ_ASSERT_RETURN(vp && vp->role==ROLE_ACTIVE, PJ_EINVAL); - if (vp->client_port) { - pjmedia_event_unsubscribe(&vp->esub_client_port); - vp->client_port = NULL; - } + pjmedia_event_unsubscribe(NULL, &client_port_event_cb, vp, + vp->client_port); + vp->client_port = NULL; + return PJ_SUCCESS; } @@ -510,10 +490,13 @@ PJ_DEF(void) pjmedia_vid_port_destroy(pjmedia_vid_port *vp) vp->clock = NULL; } if (vp->strm) { + pjmedia_event_unsubscribe(NULL, &vidstream_event_cb, vp, vp->strm); pjmedia_vid_dev_stream_destroy(vp->strm); vp->strm = NULL; } if (vp->client_port) { + pjmedia_event_unsubscribe(NULL, &client_port_event_cb, vp, + vp->client_port); if (vp->destroy_client_port) pjmedia_port_destroy(vp->client_port); vp->client_port = NULL; @@ -560,26 +543,24 @@ static void save_rgb_frame(int width, int height, const pjmedia_frame *frm) */ /* Handle event from vidstream */ -static pj_status_t vidstream_event_cb(pjmedia_event_subscription *esub, - pjmedia_event *event) +static pj_status_t vidstream_event_cb(pjmedia_event *event, + void *user_data) { - pjmedia_vid_port *vp = (pjmedia_vid_port*)esub->user_data; + pjmedia_vid_port *vp = (pjmedia_vid_port*)user_data; /* Just republish the event to our client */ - return pjmedia_event_publish(&vp->epub, event); + return pjmedia_event_publish(NULL, vp, event, 0); } -static pj_status_t client_port_event_cb(pjmedia_event_subscription *esub, - pjmedia_event *event) +static pj_status_t client_port_event_cb(pjmedia_event *event, + void *user_data) { - pjmedia_vid_port *vp = (pjmedia_vid_port*)esub->user_data; + pjmedia_vid_port *vp = (pjmedia_vid_port*)user_data; if (event->type == PJMEDIA_EVENT_FMT_CHANGED) { const pjmedia_video_format_detail *vfd; pj_status_t status; - ++event->proc_cnt; - pjmedia_vid_port_stop(vp); /* Retrieve the video format detail */ @@ -633,8 +614,11 @@ static pj_status_t client_port_event_cb(pjmedia_event_subscription *esub, pjmedia_vid_port_start(vp); } - /* Republish the event */ - return pjmedia_event_publish(&vp->epub, event); + /* Republish the event, post the event to the event manager + * to avoid deadlock if vidport is trying to stop the clock. + */ + return pjmedia_event_publish(NULL, vp, event, + PJMEDIA_EVENT_PUBLISH_POST_EVENT); } static pj_status_t convert_frame(pjmedia_vid_port *vp, diff --git a/pjmedia/src/pjmedia/vid_stream.c b/pjmedia/src/pjmedia/vid_stream.c index 876b0165..9879d9bb 100644 --- a/pjmedia/src/pjmedia/vid_stream.c +++ b/pjmedia/src/pjmedia/vid_stream.c @@ -147,9 +147,6 @@ struct pjmedia_vid_stream pjmedia_vid_codec *codec; /**< Codec instance being used. */ pj_uint32_t last_dec_ts; /**< Last decoded timestamp. */ int last_dec_seq; /**< Last decoded sequence. */ - - pjmedia_event_subscription esub_codec; /**< To subscribe codec events */ - pjmedia_event_publisher epub; /**< To publish events */ }; /* Prototypes */ @@ -339,18 +336,15 @@ static void dump_port_info(const pjmedia_vid_channel *chan, /* * Handle events from stream components. */ -static pj_status_t stream_event_cb(pjmedia_event_subscription *esub, - pjmedia_event *event) +static pj_status_t stream_event_cb(pjmedia_event *event, + void *user_data) { - pjmedia_vid_stream *stream = (pjmedia_vid_stream*)esub->user_data; + pjmedia_vid_stream *stream = (pjmedia_vid_stream*)user_data; - if (esub == &stream->esub_codec) { + if (event->epub == stream->codec) { /* This is codec event */ switch (event->type) { case PJMEDIA_EVENT_FMT_CHANGED: - /* we process the event */ - ++event->proc_cnt; - /* Copy the event to avoid deadlock if we publish the event * now. This happens because fmt_event may trigger restart * while we're still holding the jb_mutex. @@ -362,13 +356,7 @@ static pj_status_t stream_event_cb(pjmedia_event_subscription *esub, } } - return pjmedia_event_publish(&stream->epub, event); -} - -static pjmedia_event_publisher *port_get_epub(pjmedia_port *port) -{ - pjmedia_vid_stream *stream = (pjmedia_vid_stream*) port->port_data.pdata; - return &stream->epub; + return pjmedia_event_publish(NULL, stream, event, 0); } #if defined(PJMEDIA_STREAM_ENABLE_KA) && PJMEDIA_STREAM_ENABLE_KA != 0 @@ -1031,18 +1019,18 @@ static pj_status_t decode_frame(pjmedia_vid_stream *stream, vfd->fps.num / vfd->fps.denum)); /* Publish PJMEDIA_EVENT_FMT_CHANGED event */ - if (pjmedia_event_publisher_has_sub(&stream->epub)) { + { pjmedia_event event; dump_port_info(stream->dec, "changed"); pjmedia_event_init(&event, PJMEDIA_EVENT_FMT_CHANGED, - &frame->timestamp, &stream->epub); + &frame->timestamp, &stream); event.data.fmt_changed.dir = PJMEDIA_DIR_DECODING; pj_memcpy(&event.data.fmt_changed.new_fmt, &stream->info.codec_param->dec_fmt, sizeof(pjmedia_format)); - pjmedia_event_publish(&stream->epub, &event); + pjmedia_event_publish(NULL, stream, &event, 0); } } } @@ -1086,7 +1074,7 @@ static pj_status_t get_frame(pjmedia_port *port, stream->dec : stream->enc, "changed"); - pjmedia_event_publish(&stream->epub, &stream->fmt_event); + pjmedia_event_publish(NULL, stream, &stream->fmt_event, 0); stream->fmt_event.type = PJMEDIA_EVENT_NONE; } @@ -1211,7 +1199,6 @@ static pj_status_t create_channel( pj_pool_t *pool, /* Init port. */ channel->port.port_data.pdata = stream; - channel->port.get_event_pub = &port_get_epub; PJ_LOG(5, (name.ptr, "%s channel created %dx%d %s%s%.*s %d/%d(~%d)fps", @@ -1345,11 +1332,9 @@ PJ_DEF(pj_status_t) pjmedia_vid_stream_create( if (status != PJ_SUCCESS) return status; - /* Init event publisher and subscribe to codec events */ - pjmedia_event_publisher_init(&stream->epub, SIGNATURE); - pjmedia_event_subscription_init(&stream->esub_codec, &stream_event_cb, - stream); - pjmedia_event_subscribe(&stream->codec->epub, &stream->esub_codec); + /* Subscribe to codec events */ + pjmedia_event_subscribe(NULL, pool, &stream_event_cb, stream, + stream->codec); /* Estimate the maximum frame size */ stream->frame_size = vfd_enc->size.w * vfd_enc->size.h * 4; @@ -1556,6 +1541,8 @@ PJ_DEF(pj_status_t) pjmedia_vid_stream_destroy( pjmedia_vid_stream *stream ) /* Free codec. */ if (stream->codec) { + pjmedia_event_unsubscribe(NULL, &stream_event_cb, stream, + stream->codec); pjmedia_vid_codec_close(stream->codec); pjmedia_vid_codec_mgr_dealloc_codec(stream->codec_mgr, stream->codec); stream->codec = NULL; diff --git a/pjmedia/src/test/test.c b/pjmedia/src/test/test.c index 2b7c9ca5..275d2da0 100644 --- a/pjmedia/src/test/test.c +++ b/pjmedia/src/test/test.c @@ -72,6 +72,7 @@ int test_main(void) #if defined(PJMEDIA_HAS_VIDEO) && (PJMEDIA_HAS_VIDEO != 0) pjmedia_video_format_mgr_create(pool, 64, 0, NULL); pjmedia_converter_mgr_create(pool, NULL); + pjmedia_event_mgr_create(pool, 0, NULL); pjmedia_vid_codec_mgr_create(pool, NULL); #endif @@ -115,6 +116,7 @@ on_return: #if defined(PJMEDIA_HAS_VIDEO) && (PJMEDIA_HAS_VIDEO != 0) pjmedia_video_format_mgr_destroy(pjmedia_video_format_mgr_instance()); pjmedia_converter_mgr_destroy(pjmedia_converter_mgr_instance()); + pjmedia_event_mgr_destroy(pjmedia_event_mgr_instance()); pjmedia_vid_codec_mgr_destroy(pjmedia_vid_codec_mgr_instance()); #endif diff --git a/pjmedia/src/test/vid_codec_test.c b/pjmedia/src/test/vid_codec_test.c index a6c62469..11ef93ab 100644 --- a/pjmedia/src/test/vid_codec_test.c +++ b/pjmedia/src/test/vid_codec_test.c @@ -47,18 +47,16 @@ typedef struct codec_port_data_t pj_size_t pack_buf_size; } codec_port_data_t; -static pj_status_t codec_on_event(pjmedia_event_subscription *esub, - pjmedia_event *event) +static pj_status_t codec_on_event(pjmedia_event *event, + void *user_data) { - codec_port_data_t *port_data = (codec_port_data_t*)esub->user_data; + codec_port_data_t *port_data = (codec_port_data_t*)user_data; if (event->type == PJMEDIA_EVENT_FMT_CHANGED) { pjmedia_vid_codec *codec = port_data->codec; pjmedia_vid_codec_param codec_param; pj_status_t status; - ++event->proc_cnt; - status = pjmedia_vid_codec_get_param(codec, &codec_param); if (status != PJ_SUCCESS) return status; @@ -200,7 +198,6 @@ static int encode_decode_test(pj_pool_t *pool, const char *codec_id, pjmedia_vid_port *capture=NULL, *renderer=NULL; pjmedia_vid_port_param vport_param; pjmedia_video_format_detail *vfd; - pjmedia_event_subscription esub; char codec_name[5]; pj_status_t status; int rc = 0; @@ -323,9 +320,8 @@ static int encode_decode_test(pj_pool_t *pool, const char *codec_id, codec_param.dec_fmt.det = codec_param.enc_fmt.det; /* Subscribe to codec events */ - pjmedia_event_subscription_init(&esub, &codec_on_event, - &codec_port_data); - pjmedia_event_subscribe(&codec->epub, &esub); + pjmedia_event_subscribe(NULL, pool, &codec_on_event, &codec_port_data, + codec); } pjmedia_vid_port_param_default(&vport_param); @@ -430,6 +426,8 @@ on_return: pjmedia_vid_port_destroy(renderer); } if (codec) { + pjmedia_event_unsubscribe(NULL, &codec_on_event, &codec_port_data, + codec); pjmedia_vid_codec_close(codec); pjmedia_vid_codec_mgr_dealloc_codec(NULL, codec); } diff --git a/pjmedia/src/test/vid_dev_test.c b/pjmedia/src/test/vid_dev_test.c index 7425e49b..5782337b 100644 --- a/pjmedia/src/test/vid_dev_test.c +++ b/pjmedia/src/test/vid_dev_test.c @@ -76,10 +76,10 @@ static int enum_devs(void) return PJ_SUCCESS; } -static pj_status_t vid_event_cb(pjmedia_event_subscription *esub, - pjmedia_event *event) +static pj_status_t vid_event_cb(pjmedia_event *event, + void *user_data) { - PJ_UNUSED_ARG(esub); + PJ_UNUSED_ARG(user_data); if (event->type == PJMEDIA_EVENT_WND_CLOSED) is_quitting = PJ_TRUE; @@ -95,7 +95,6 @@ static int capture_render_loopback(int cap_dev_id, int rend_dev_id, pjmedia_vid_dev_info cdi, rdi; pjmedia_vid_port_param param; pjmedia_video_format_detail *vfd; - pjmedia_event_subscription esub; pj_status_t status; int rc = 0, i; @@ -161,10 +160,7 @@ static int capture_render_loopback(int cap_dev_id, int rend_dev_id, } /* Set event handler */ - pjmedia_event_subscription_init(&esub, &vid_event_cb, NULL); - pjmedia_event_subscribe( - pjmedia_vid_port_get_event_publisher(renderer), - &esub); + pjmedia_event_subscribe(NULL, pool, &vid_event_cb, NULL, renderer); /* Connect capture to renderer */ status = pjmedia_vid_port_connect( @@ -196,8 +192,10 @@ on_return: if (capture) pjmedia_vid_port_destroy(capture); - if (renderer) + if (renderer) { + pjmedia_event_unsubscribe(NULL, &vid_event_cb, NULL, renderer); pjmedia_vid_port_destroy(renderer); + } pj_pool_release(pool); return rc; diff --git a/pjmedia/src/test/vid_port_test.c b/pjmedia/src/test/vid_port_test.c index fed61bb1..dd3bf37e 100644 --- a/pjmedia/src/test/vid_port_test.c +++ b/pjmedia/src/test/vid_port_test.c @@ -32,10 +32,10 @@ static pj_bool_t is_quitting = PJ_FALSE; -static pj_status_t vid_event_cb(pjmedia_event_subscription *esub, - pjmedia_event *event) +static pj_status_t vid_event_cb(pjmedia_event *event, + void *user_data) { - PJ_UNUSED_ARG(esub); + PJ_UNUSED_ARG(user_data); if (event->type == PJMEDIA_EVENT_WND_CLOSED) is_quitting = PJ_TRUE; @@ -52,7 +52,6 @@ static int capture_render_loopback(pj_bool_t active, pjmedia_vid_dev_info cdi, rdi; pjmedia_vid_port_param param; pjmedia_video_format_detail *vfd; - pjmedia_event_subscription esub; pj_status_t status; int rc = 0, i; @@ -118,10 +117,7 @@ static int capture_render_loopback(pj_bool_t active, } /* Set event handler */ - pjmedia_event_subscription_init(&esub, &vid_event_cb, NULL); - pjmedia_event_subscribe( - pjmedia_vid_port_get_event_publisher(renderer), - &esub); + pjmedia_event_subscribe(NULL, pool, &vid_event_cb, NULL, renderer); /* Connect capture to renderer */ status = pjmedia_vid_port_connect( @@ -153,8 +149,10 @@ on_return: if (capture) pjmedia_vid_port_destroy(capture); - if (renderer) + if (renderer) { + pjmedia_event_unsubscribe(NULL, &vid_event_cb, NULL, renderer); pjmedia_vid_port_destroy(renderer); + } pj_pool_release(pool); return rc; |