summaryrefslogtreecommitdiff
path: root/pjmedia/src
diff options
context:
space:
mode:
Diffstat (limited to 'pjmedia/src')
-rw-r--r--pjmedia/src/pjmedia-codec/ffmpeg_codecs.c12
-rw-r--r--pjmedia/src/pjmedia-videodev/colorbar_dev.c1
-rw-r--r--pjmedia/src/pjmedia-videodev/dshow_dev.c1
-rw-r--r--pjmedia/src/pjmedia-videodev/ffmpeg_dev.c1
-rw-r--r--pjmedia/src/pjmedia-videodev/sdl_dev.c10
-rw-r--r--pjmedia/src/pjmedia-videodev/videodev.c6
-rw-r--r--pjmedia/src/pjmedia/event.c394
-rw-r--r--pjmedia/src/pjmedia/port.c14
-rw-r--r--pjmedia/src/pjmedia/vid_codec.c1
-rw-r--r--pjmedia/src/pjmedia/vid_port.c72
-rw-r--r--pjmedia/src/pjmedia/vid_stream.c41
-rw-r--r--pjmedia/src/test/test.c2
-rw-r--r--pjmedia/src/test/vid_codec_test.c16
-rw-r--r--pjmedia/src/test/vid_dev_test.c16
-rw-r--r--pjmedia/src/test/vid_port_test.c16
15 files changed, 368 insertions, 235 deletions
diff --git a/pjmedia/src/pjmedia-codec/ffmpeg_codecs.c b/pjmedia/src/pjmedia-codec/ffmpeg_codecs.c
index 53602b67..ccd0901f 100644
--- a/pjmedia/src/pjmedia-codec/ffmpeg_codecs.c
+++ b/pjmedia/src/pjmedia-codec/ffmpeg_codecs.c
@@ -1609,15 +1609,15 @@ static pj_status_t ffmpeg_codec_decode_whole(pjmedia_vid_codec *codec,
}
/* Broadcast event */
- if (pjmedia_event_publisher_has_sub(&codec->epub)) {
+ {
pjmedia_event event;
pjmedia_event_init(&event, PJMEDIA_EVENT_FMT_CHANGED,
- &input->timestamp, &codec->epub);
+ &input->timestamp, codec);
event.data.fmt_changed.dir = PJMEDIA_DIR_DECODING;
pj_memcpy(&event.data.fmt_changed.new_fmt, &ff->param.dec_fmt,
sizeof(ff->param.dec_fmt));
- pjmedia_event_publish(&codec->epub, &event);
+ pjmedia_event_publish(NULL, codec, &event, 0);
}
}
@@ -1651,13 +1651,13 @@ static pj_status_t ffmpeg_codec_decode_whole(pjmedia_vid_codec *codec,
output->size = vafp->framebytes;
/* Check if we got key frame */
- if (avframe.key_frame && pjmedia_event_publisher_has_sub(&codec->epub))
+ if (avframe.key_frame)
{
pjmedia_event event;
pjmedia_event_init(&event, PJMEDIA_EVENT_KEY_FRAME_FOUND,
- &output->timestamp, &codec->epub);
- pjmedia_event_publish(&codec->epub, &event);
+ &output->timestamp, codec);
+ pjmedia_event_publish(NULL, codec, &event, 0);
}
} else {
output->type = PJMEDIA_FRAME_TYPE_NONE;
diff --git a/pjmedia/src/pjmedia-videodev/colorbar_dev.c b/pjmedia/src/pjmedia-videodev/colorbar_dev.c
index e0d488da..6fb658ef 100644
--- a/pjmedia/src/pjmedia-videodev/colorbar_dev.c
+++ b/pjmedia/src/pjmedia-videodev/colorbar_dev.c
@@ -416,7 +416,6 @@ static pj_status_t cbar_factory_create_stream(
strm->cbfi = cbfi;
pj_memcpy(&strm->vafp, &vafp, sizeof(vafp));
strm->ts_inc = PJMEDIA_SPF2(param->clock_rate, &vfd->fps, 1);
- pjmedia_event_publisher_init(&strm->base.epub, PJMEDIA_SIG_VID_DEV_COLORBAR);
for (i = 0; i < vfi->plane_cnt; ++i) {
strm->first_line[i] = pj_pool_alloc(pool, vafp.strides[i]);
diff --git a/pjmedia/src/pjmedia-videodev/dshow_dev.c b/pjmedia/src/pjmedia-videodev/dshow_dev.c
index f65ed921..c116875f 100644
--- a/pjmedia/src/pjmedia-videodev/dshow_dev.c
+++ b/pjmedia/src/pjmedia-videodev/dshow_dev.c
@@ -869,7 +869,6 @@ static pj_status_t dshow_factory_create_stream(
strm->pool = pool;
pj_memcpy(&strm->vid_cb, cb, sizeof(*cb));
strm->user_data = user_data;
- pjmedia_event_publisher_init(&strm->base.epub, PJMEDIA_SIG_VID_DEV_DSHOW);
if (param->dir & PJMEDIA_DIR_CAPTURE) {
const pjmedia_video_format_detail *vfd;
diff --git a/pjmedia/src/pjmedia-videodev/ffmpeg_dev.c b/pjmedia/src/pjmedia-videodev/ffmpeg_dev.c
index 9f874077..1bfc0c27 100644
--- a/pjmedia/src/pjmedia-videodev/ffmpeg_dev.c
+++ b/pjmedia/src/pjmedia-videodev/ffmpeg_dev.c
@@ -383,7 +383,6 @@ static pj_status_t ffmpeg_factory_create_stream(
strm->factory = (ffmpeg_factory*)f;
strm->pool = pool;
pj_memcpy(&strm->param, param, sizeof(*param));
- pjmedia_event_publisher_init(&strm->base.epub);
/* Done */
strm->base.op = &stream_op;
diff --git a/pjmedia/src/pjmedia-videodev/sdl_dev.c b/pjmedia/src/pjmedia-videodev/sdl_dev.c
index 465e37b5..109b22bd 100644
--- a/pjmedia/src/pjmedia-videodev/sdl_dev.c
+++ b/pjmedia/src/pjmedia-videodev/sdl_dev.c
@@ -308,7 +308,7 @@ static struct sdl_stream* find_stream(struct sdl_factory *sf,
if (strm)
pjmedia_event_init(pevent, PJMEDIA_EVENT_NONE, &strm->last_ts,
- &strm->base.epub);
+ strm);
return strm;
}
@@ -354,7 +354,7 @@ static pj_status_t handle_event(void *data)
if (strm && pevent.type != PJMEDIA_EVENT_NONE) {
pj_status_t status;
- pjmedia_event_publish(&strm->base.epub, &pevent);
+ pjmedia_event_publish(NULL, strm, &pevent, 0);
switch (pevent.type) {
case PJMEDIA_EVENT_WND_RESIZED:
@@ -375,9 +375,8 @@ static pj_status_t handle_event(void *data)
sdl_stream_stop(&strm->base);
sdl_destroy_all(strm);
pjmedia_event_init(&pevent, PJMEDIA_EVENT_WND_CLOSED,
- &strm->last_ts,
- &strm->base.epub);
- pjmedia_event_publish(&strm->base.epub, &pevent);
+ &strm->last_ts, strm);
+ pjmedia_event_publish(NULL, strm, &pevent, 0);
/*
* Note: don't access the stream after this point, it
@@ -916,7 +915,6 @@ static pj_status_t sdl_factory_create_stream(
strm->sf = sf;
pj_memcpy(&strm->vid_cb, cb, sizeof(*cb));
strm->user_data = user_data;
- pjmedia_event_publisher_init(&strm->base.epub, PJMEDIA_SIG_VID_DEV_SDL);
/* Create render stream here */
if (param->dir & PJMEDIA_DIR_RENDER) {
diff --git a/pjmedia/src/pjmedia-videodev/videodev.c b/pjmedia/src/pjmedia-videodev/videodev.c
index 9a82ba6e..9b96520d 100644
--- a/pjmedia/src/pjmedia-videodev/videodev.c
+++ b/pjmedia/src/pjmedia-videodev/videodev.c
@@ -785,12 +785,6 @@ PJ_DEF(pj_status_t) pjmedia_vid_dev_stream_set_cap(
return strm->op->set_cap(strm, cap, value);
}
-PJ_DEF(pjmedia_event_publisher*)
-pjmedia_vid_dev_stream_get_event_publisher(pjmedia_vid_dev_stream *strm)
-{
- return &strm->epub;
-}
-
/* API: Start the stream. */
PJ_DEF(pj_status_t) pjmedia_vid_dev_stream_start(pjmedia_vid_dev_stream *strm)
{
diff --git a/pjmedia/src/pjmedia/event.c b/pjmedia/src/pjmedia/event.c
index 8e92b53e..c499eaa5 100644
--- a/pjmedia/src/pjmedia/event.c
+++ b/pjmedia/src/pjmedia/event.c
@@ -19,150 +19,342 @@
#include <pjmedia/event.h>
#include <pjmedia/errno.h>
#include <pj/assert.h>
+#include <pj/list.h>
#include <pj/log.h>
+#include <pj/os.h>
+#include <pj/pool.h>
#include <pj/string.h>
#define THIS_FILE "event.c"
-#if 1
-# define TRACE_(x) PJ_LOG(6,x)
-#else
-# define TRACE_(x)
-#endif
+#define MAX_EVENTS 16
-PJ_DEF(void) pjmedia_event_init( pjmedia_event *event,
- pjmedia_event_type type,
- const pj_timestamp *ts,
- const pjmedia_event_publisher *epub)
+typedef struct esub esub;
+
+struct esub
{
- pj_bzero(event, sizeof(*event));
- event->type = type;
- if (ts)
- event->timestamp.u64 = ts->u64;
- event->epub = epub;
- if (epub)
- event->epub_sig = epub->sig;
-}
+ PJ_DECL_LIST_MEMBER(esub);
+
+ pjmedia_event_cb *cb;
+ void *user_data;
+ void *epub;
+};
-PJ_DEF(void) pjmedia_event_publisher_init(pjmedia_event_publisher *epub,
- pjmedia_obj_sig sig)
+typedef struct event_queue
{
- pj_bzero(epub, sizeof(*epub));
- pj_list_init(&epub->subscription_list);
- epub->sig = sig;
-}
+ pjmedia_event events[MAX_EVENTS]; /**< array of events. */
+ int head, tail;
+ pj_bool_t is_full;
+} event_queue;
+
+struct pjmedia_event_mgr
+{
+ pj_thread_t *thread; /**< worker thread. */
+ pj_bool_t is_quitting;
+ pj_sem_t *sem;
+ pj_mutex_t *mutex;
+ event_queue ev_queue;
+ event_queue *pub_ev_queue; /**< publish() event queue. */
+ esub esub_list; /**< list of subscribers. */
+ esub *th_next_sub, /**< worker thread's next sub. */
+ *pub_next_sub; /**< publish() next sub. */
+};
-PJ_DEF(void) pjmedia_event_subscription_init( pjmedia_event_subscription *esub,
- pjmedia_event_cb *cb,
- void *user_data)
+static pjmedia_event_mgr *event_manager_instance;
+
+static pj_status_t event_queue_add_event(event_queue* ev_queue,
+ pjmedia_event *event)
{
- pj_bzero(esub, sizeof(*esub));
- esub->cb = cb;
- esub->user_data = user_data;
+ if (ev_queue->is_full) {
+ char ev_name[5];
+
+ /* This event will be ignored. */
+ PJ_LOG(4, (THIS_FILE, "Lost event %s from publisher [0x%p] "
+ "due to full queue.",
+ pjmedia_fourcc_name(event->type, ev_name),
+ event->epub));
+
+ return PJ_ETOOMANY;
+ }
+
+ pj_memcpy(&ev_queue->events[ev_queue->tail], event, sizeof(*event));
+ ev_queue->tail = (ev_queue->tail + 1) % MAX_EVENTS;
+ if (ev_queue->tail == ev_queue->head)
+ ev_queue->is_full = PJ_TRUE;
+
+ return PJ_SUCCESS;
}
-PJ_DEF(pj_bool_t)
-pjmedia_event_publisher_has_sub(pjmedia_event_publisher *epub)
+static pj_status_t event_mgr_distribute_events(pjmedia_event_mgr *mgr,
+ event_queue *ev_queue,
+ esub **next_sub)
{
- PJ_ASSERT_RETURN(epub, PJ_FALSE);
- return epub->subscription_list.next &&
- (!pj_list_empty(&epub->subscription_list));
+ pj_status_t err = PJ_SUCCESS;
+ esub * sub = mgr->esub_list.next;
+ pjmedia_event *ev = &ev_queue->events[ev_queue->head];
+
+ while (sub != &mgr->esub_list) {
+ *next_sub = sub->next;
+
+ /* Check if the subscriber is interested in
+ * receiving the event from the publisher.
+ */
+ if (sub->epub == ev->epub || !sub->epub) {
+ pj_status_t status = (*sub->cb)(ev, sub->user_data);
+ if (status != PJ_SUCCESS && err == PJ_SUCCESS)
+ err = status;
+ }
+ sub = *next_sub;
+ }
+ *next_sub = NULL;
+
+ ev_queue->head = (ev_queue->head + 1) % MAX_EVENTS;
+ ev_queue->is_full = PJ_FALSE;
+
+ return err;
}
-PJ_DEF(pj_status_t) pjmedia_event_subscribe( pjmedia_event_publisher *epub,
- pjmedia_event_subscription *esub)
+/* Event worker thread function. */
+static int event_worker_thread(void *arg)
{
- char epub_name[5];
+ pjmedia_event_mgr *mgr = (pjmedia_event_mgr *)arg;
- PJ_ASSERT_RETURN(epub && esub && esub->cb, PJ_EINVAL);
- /* Must not currently subscribe to anything */
- PJ_ASSERT_RETURN(esub->subscribe_to == NULL, PJ_EINVALIDOP);
+ while (1) {
+ /* Wait until there is an event. */
+ pj_sem_wait(mgr->sem);
- TRACE_((THIS_FILE, "Subscription [0x%p] to publisher %s",
- esub,
- pjmedia_fourcc_name(epub->sig, epub_name)));
- /* Suppress compiler warning if trace is disabled */
- PJ_UNUSED_ARG(epub_name);
+ if (mgr->is_quitting)
+ break;
- pj_list_push_back(&epub->subscription_list, esub);
- esub->subscribe_to = epub;
- return PJ_SUCCESS;
+ pj_mutex_lock(mgr->mutex);
+ event_mgr_distribute_events(mgr, &mgr->ev_queue, &mgr->th_next_sub);
+ pj_mutex_unlock(mgr->mutex);
+ }
+
+ return 0;
}
-PJ_DEF(pj_status_t) pjmedia_event_unsubscribe(pjmedia_event_subscription *esub)
+PJ_DEF(pj_status_t) pjmedia_event_mgr_create(pj_pool_t *pool,
+ unsigned options,
+ pjmedia_event_mgr **p_mgr)
{
- PJ_ASSERT_RETURN(esub, PJ_EINVAL);
-
- if (esub->subscribe_to) {
- char epub_name[5];
- TRACE_((THIS_FILE, "Unsubscription [0x%p] to publisher %s",
- esub,
- pjmedia_fourcc_name(esub->subscribe_to->sig,
- epub_name)));
- /* Suppress compiler warning if trace is disabled */
- PJ_UNUSED_ARG(epub_name);
-
- PJ_ASSERT_RETURN(
- pj_list_find_node(&esub->subscribe_to->subscription_list,
- esub)==esub, PJ_ENOTFOUND);
- pj_list_erase(esub);
- esub->subscribe_to = NULL;
+ pjmedia_event_mgr *mgr;
+ pj_status_t status;
+
+ mgr = PJ_POOL_ZALLOC_T(pool, pjmedia_event_mgr);
+ pj_list_init(&mgr->esub_list);
+
+ if (!(options & PJMEDIA_EVENT_MGR_NO_THREAD)) {
+ status = pj_sem_create(pool, "ev_sem", 0, MAX_EVENTS + 1, &mgr->sem);
+ if (status != PJ_SUCCESS)
+ return status;
+
+ status = pj_thread_create(pool, "ev_thread", &event_worker_thread,
+ mgr, 0, 0, &mgr->thread);
+ if (status != PJ_SUCCESS) {
+ pjmedia_event_mgr_destroy(mgr);
+ return status;
+ }
+ }
+
+ status = pj_mutex_create_recursive(pool, "ev_mutex", &mgr->mutex);
+ if (status != PJ_SUCCESS) {
+ pjmedia_event_mgr_destroy(mgr);
+ return status;
}
+
+ if (!event_manager_instance)
+ event_manager_instance = mgr;
+
+ if (p_mgr)
+ *p_mgr = mgr;
+
return PJ_SUCCESS;
}
-PJ_DEF(pj_status_t) pjmedia_event_publish( pjmedia_event_publisher *epub,
- pjmedia_event *event)
+PJ_DEF(pjmedia_event_mgr*) pjmedia_event_mgr_instance(void)
{
- pjmedia_event_subscription *esub;
- char event_name[5];
- char epub_name[5];
- pj_status_t err = PJ_SUCCESS;
+ return event_manager_instance;
+}
- PJ_ASSERT_RETURN(epub && event, PJ_EINVAL);
+PJ_DEF(void) pjmedia_event_mgr_set_instance(pjmedia_event_mgr *mgr)
+{
+ event_manager_instance = mgr;
+}
- TRACE_((THIS_FILE, "Event %s is published by publisher %s",
- pjmedia_fourcc_name(event->type, event_name),
- pjmedia_fourcc_name(epub->sig, epub_name)));
- /* Suppress compiler warning if trace is disabled */
- PJ_UNUSED_ARG(event_name);
- PJ_UNUSED_ARG(epub_name);
+PJ_DEF(void) pjmedia_event_mgr_destroy(pjmedia_event_mgr *mgr)
+{
+ esub *sub;
- esub = epub->subscription_list.next;
- if (!esub)
- return err;
+ if (!mgr) mgr = pjmedia_event_mgr_instance();
+ PJ_ASSERT_ON_FAIL(mgr != NULL, return);
- while (esub != &epub->subscription_list) {
- pjmedia_event_subscription *next;
- pj_status_t status;
+ if (mgr->thread) {
+ mgr->is_quitting = PJ_TRUE;
+ pj_sem_post(mgr->sem);
+ pj_thread_join(mgr->thread);
+ }
- /* just in case esub is destroyed in the callback */
- next = esub->next;
+ if (mgr->sem) {
+ pj_sem_destroy(mgr->sem);
+ mgr->sem = NULL;
+ }
- status = (*esub->cb)(esub, event);
- if (status != PJ_SUCCESS && err == PJ_SUCCESS)
- err = status;
+ if (mgr->mutex) {
+ pj_mutex_destroy(mgr->mutex);
+ mgr->mutex = NULL;
+ }
- esub = next;
+ sub = mgr->esub_list.next;
+ while (sub != &mgr->esub_list) {
+ esub *next = sub->next;
+ pj_list_erase(sub);
+ sub = next;
}
- return err;
+ if (event_manager_instance == mgr)
+ event_manager_instance = NULL;
}
-static pj_status_t republisher_cb(pjmedia_event_subscription *esub,
- pjmedia_event *event)
+PJ_DEF(void) pjmedia_event_init( pjmedia_event *event,
+ pjmedia_event_type type,
+ const pj_timestamp *ts,
+ const void *src)
{
- return pjmedia_event_publish((pjmedia_event_publisher*)esub->user_data,
- event);
+ pj_bzero(event, sizeof(*event));
+ event->type = type;
+ if (ts)
+ event->timestamp.u64 = ts->u64;
+ event->epub = event->src = src;
}
-PJ_DEF(pj_status_t) pjmedia_event_republish(pjmedia_event_publisher *esrc,
- pjmedia_event_publisher *epub,
- pjmedia_event_subscription *esub)
+PJ_DEF(pj_status_t) pjmedia_event_subscribe( pjmedia_event_mgr *mgr,
+ pj_pool_t *pool,
+ pjmedia_event_cb *cb,
+ void *user_data,
+ void *epub)
{
- PJ_ASSERT_RETURN(esrc && epub && esub, PJ_EINVAL);
+ esub *sub;
+
+ PJ_ASSERT_RETURN(pool && cb, PJ_EINVAL);
+
+ if (!mgr) mgr = pjmedia_event_mgr_instance();
+ PJ_ASSERT_RETURN(mgr, PJ_EINVAL);
+
+ pj_mutex_lock(mgr->mutex);
+ /* Check whether callback function with the same user data is already
+ * subscribed to the publisher. This is to prevent the callback function
+ * receiving the same event from the same publisher more than once.
+ */
+ sub = mgr->esub_list.next;
+ while (sub != &mgr->esub_list) {
+ esub *next = sub->next;
+ if (sub->cb == cb && sub->user_data == user_data &&
+ sub->epub == epub)
+ {
+ pj_mutex_unlock(mgr->mutex);
+ return PJ_SUCCESS;
+ }
+ sub = next;
+ }
+
+ sub = PJ_POOL_ZALLOC_T(pool, esub);
+ sub->cb = cb;
+ sub->user_data = user_data;
+ sub->epub = epub;
+ pj_list_push_back(&mgr->esub_list, sub);
+ pj_mutex_unlock(mgr->mutex);
- pjmedia_event_subscription_init(esub, &republisher_cb, epub);
- return pjmedia_event_subscribe(esrc, esub);
+ return PJ_SUCCESS;
}
+PJ_DEF(pj_status_t)
+pjmedia_event_unsubscribe(pjmedia_event_mgr *mgr,
+ pjmedia_event_cb *cb,
+ void *user_data,
+ void *epub)
+{
+ esub *sub;
+
+ PJ_ASSERT_RETURN(cb, PJ_EINVAL);
+
+ if (!mgr) mgr = pjmedia_event_mgr_instance();
+ PJ_ASSERT_RETURN(mgr, PJ_EINVAL);
+
+ pj_mutex_lock(mgr->mutex);
+ sub = mgr->esub_list.next;
+ while (sub != &mgr->esub_list) {
+ esub *next = sub->next;
+ if (sub->cb == cb && (sub->user_data == user_data || !user_data) &&
+ (sub->epub == epub || !epub))
+ {
+ /* If the worker thread or pjmedia_event_publish() API is
+ * in the process of distributing events, make sure that
+ * its pointer to the next subscriber stays valid.
+ */
+ if (mgr->th_next_sub == sub)
+ mgr->th_next_sub = sub->next;
+ if (mgr->pub_next_sub == sub)
+ mgr->pub_next_sub = sub->next;
+ pj_list_erase(sub);
+ if (user_data && epub)
+ break;
+ }
+ sub = next;
+ }
+ pj_mutex_unlock(mgr->mutex);
+
+ return PJ_SUCCESS;
+}
+
+PJ_DEF(pj_status_t) pjmedia_event_publish( pjmedia_event_mgr *mgr,
+ void *epub,
+ pjmedia_event *event,
+ pjmedia_event_publish_flag flag)
+{
+ pj_status_t err = PJ_SUCCESS;
+
+ PJ_ASSERT_RETURN(epub && event, PJ_EINVAL);
+
+ if (!mgr) mgr = pjmedia_event_mgr_instance();
+ PJ_ASSERT_RETURN(mgr, PJ_EINVAL);
+
+ event->epub = epub;
+
+ pj_mutex_lock(mgr->mutex);
+ if (flag & PJMEDIA_EVENT_PUBLISH_POST_EVENT) {
+ if (event_queue_add_event(&mgr->ev_queue, event) == PJ_SUCCESS)
+ pj_sem_post(mgr->sem);
+ } else {
+ /* For nested pjmedia_event_publish() calls, i.e. calling publish()
+ * inside the subscriber's callback, the function will only add
+ * the event to the event queue of the first publish() call. It
+ * is the first publish() call that will be responsible to
+ * distribute the events.
+ */
+ if (mgr->pub_ev_queue) {
+ event_queue_add_event(mgr->pub_ev_queue, event);
+ } else {
+ static event_queue ev_queue;
+ pj_status_t status;
+
+ ev_queue.head = ev_queue.tail = 0;
+ ev_queue.is_full = PJ_FALSE;
+ mgr->pub_ev_queue = &ev_queue;
+
+ event_queue_add_event(mgr->pub_ev_queue, event);
+
+ do {
+ status = event_mgr_distribute_events(mgr, mgr->pub_ev_queue,
+ &mgr->pub_next_sub);
+ if (status != PJ_SUCCESS && err == PJ_SUCCESS)
+ err = status;
+ } while(ev_queue.head != ev_queue.tail || ev_queue.is_full);
+
+ mgr->pub_ev_queue = NULL;
+ }
+ }
+ pj_mutex_unlock(mgr->mutex);
+
+ return err;
+}
diff --git a/pjmedia/src/pjmedia/port.c b/pjmedia/src/pjmedia/port.c
index e838263a..b7defae5 100644
--- a/pjmedia/src/pjmedia/port.c
+++ b/pjmedia/src/pjmedia/port.c
@@ -117,20 +117,6 @@ PJ_DEF(pj_status_t) pjmedia_port_put_frame( pjmedia_port *port,
return PJ_EINVALIDOP;
}
-/*
- * Get event publisher
- */
-PJ_DEF(pjmedia_event_publisher*)
-pjmedia_port_get_event_publisher(pjmedia_port *port)
-{
- PJ_ASSERT_RETURN(port, NULL);
-
- if (port->get_event_pub)
- return (*port->get_event_pub)(port);
-
- return NULL;
-}
-
/**
* Destroy port (and subsequent downstream ports)
*/
diff --git a/pjmedia/src/pjmedia/vid_codec.c b/pjmedia/src/pjmedia/vid_codec.c
index 04d2f455..2002c6d5 100644
--- a/pjmedia/src/pjmedia/vid_codec.c
+++ b/pjmedia/src/pjmedia/vid_codec.c
@@ -77,7 +77,6 @@ PJ_DEF(void) pjmedia_vid_codec_reset(pjmedia_vid_codec *codec,
pjmedia_obj_sig sig)
{
pj_bzero(codec, sizeof(*codec));
- pjmedia_event_publisher_init(&codec->epub, sig);
}
/*
diff --git a/pjmedia/src/pjmedia/vid_port.c b/pjmedia/src/pjmedia/vid_port.c
index fa5c2da6..1c1cdedf 100644
--- a/pjmedia/src/pjmedia/vid_port.c
+++ b/pjmedia/src/pjmedia/vid_port.c
@@ -61,10 +61,6 @@ struct pjmedia_vid_port
pj_size_t conv_buf_size;
pjmedia_conversion_param conv_param;
- pjmedia_event_publisher epub;
- pjmedia_event_subscription esub_dev;
- pjmedia_event_subscription esub_client_port;
-
pjmedia_clock *clock;
pjmedia_clock_src clocksrc;
@@ -94,10 +90,10 @@ static pj_status_t vidstream_cap_cb(pjmedia_vid_dev_stream *stream,
static pj_status_t vidstream_render_cb(pjmedia_vid_dev_stream *stream,
void *user_data,
pjmedia_frame *frame);
-static pj_status_t vidstream_event_cb(pjmedia_event_subscription *esub,
- pjmedia_event *event);
-static pj_status_t client_port_event_cb(pjmedia_event_subscription *esub,
- pjmedia_event *event);
+static pj_status_t vidstream_event_cb(pjmedia_event *event,
+ void *user_data);
+static pj_status_t client_port_event_cb(pjmedia_event *event,
+ void *user_data);
static void enc_clock_cb(const pj_timestamp *ts, void *user_data);
static void dec_clock_cb(const pj_timestamp *ts, void *user_data);
@@ -206,7 +202,6 @@ PJ_DEF(pj_status_t) pjmedia_vid_port_create( pj_pool_t *pool,
vp->role = prm->active ? ROLE_ACTIVE : ROLE_PASSIVE;
vp->dir = prm->vidparam.dir;
// vp->cap_size = vfd->size;
- pjmedia_event_publisher_init(&vp->epub, SIGNATURE);
vparam = prm->vidparam;
dev_name[0] = '\0';
@@ -272,10 +267,8 @@ PJ_DEF(pj_status_t) pjmedia_vid_port_create( pj_pool_t *pool,
vparam.fmt.det.vid.fps.num, vparam.fmt.det.vid.fps.denum));
/* Subscribe to device's events */
- pjmedia_event_subscription_init(&vp->esub_dev, vidstream_event_cb, vp);
- pjmedia_event_subscribe(
- pjmedia_vid_dev_stream_get_event_publisher(vp->strm),
- &vp->esub_dev);
+ pjmedia_event_subscribe(NULL, vp->pool, &vidstream_event_cb,
+ vp, vp->strm);
if (vp->dir & PJMEDIA_DIR_CAPTURE) {
pjmedia_format_copy(&vp->conv_param.src, &vparam.fmt);
@@ -370,13 +363,6 @@ PJ_DEF(void) pjmedia_vid_port_set_cb(pjmedia_vid_port *vid_port,
vid_port->strm_cb_data = user_data;
}
-PJ_DEF(pjmedia_event_publisher*)
-pjmedia_vid_port_get_event_publisher(pjmedia_vid_port *vid_port)
-{
- PJ_ASSERT_RETURN(vid_port, NULL);
- return &vid_port->epub;
-}
-
PJ_DEF(pjmedia_vid_dev_stream*)
pjmedia_vid_port_get_stream(pjmedia_vid_port *vp)
{
@@ -419,20 +405,14 @@ PJ_DEF(pj_status_t) pjmedia_vid_port_connect(pjmedia_vid_port *vp,
pjmedia_port *port,
pj_bool_t destroy)
{
- pjmedia_event_publisher *epub;
-
PJ_ASSERT_RETURN(vp && vp->role==ROLE_ACTIVE, PJ_EINVAL);
vp->destroy_client_port = destroy;
vp->client_port = port;
/* Subscribe to client port's events */
- epub = pjmedia_port_get_event_publisher(port);
- if (epub) {
- pjmedia_event_subscription_init(&vp->esub_client_port,
- &client_port_event_cb,
- vp);
- pjmedia_event_subscribe(epub, &vp->esub_client_port);
- }
+ pjmedia_event_subscribe(NULL, vp->pool, &client_port_event_cb, vp,
+ vp->client_port);
+
return PJ_SUCCESS;
}
@@ -441,10 +421,10 @@ PJ_DEF(pj_status_t) pjmedia_vid_port_disconnect(pjmedia_vid_port *vp)
{
PJ_ASSERT_RETURN(vp && vp->role==ROLE_ACTIVE, PJ_EINVAL);
- if (vp->client_port) {
- pjmedia_event_unsubscribe(&vp->esub_client_port);
- vp->client_port = NULL;
- }
+ pjmedia_event_unsubscribe(NULL, &client_port_event_cb, vp,
+ vp->client_port);
+ vp->client_port = NULL;
+
return PJ_SUCCESS;
}
@@ -510,10 +490,13 @@ PJ_DEF(void) pjmedia_vid_port_destroy(pjmedia_vid_port *vp)
vp->clock = NULL;
}
if (vp->strm) {
+ pjmedia_event_unsubscribe(NULL, &vidstream_event_cb, vp, vp->strm);
pjmedia_vid_dev_stream_destroy(vp->strm);
vp->strm = NULL;
}
if (vp->client_port) {
+ pjmedia_event_unsubscribe(NULL, &client_port_event_cb, vp,
+ vp->client_port);
if (vp->destroy_client_port)
pjmedia_port_destroy(vp->client_port);
vp->client_port = NULL;
@@ -560,26 +543,24 @@ static void save_rgb_frame(int width, int height, const pjmedia_frame *frm)
*/
/* Handle event from vidstream */
-static pj_status_t vidstream_event_cb(pjmedia_event_subscription *esub,
- pjmedia_event *event)
+static pj_status_t vidstream_event_cb(pjmedia_event *event,
+ void *user_data)
{
- pjmedia_vid_port *vp = (pjmedia_vid_port*)esub->user_data;
+ pjmedia_vid_port *vp = (pjmedia_vid_port*)user_data;
/* Just republish the event to our client */
- return pjmedia_event_publish(&vp->epub, event);
+ return pjmedia_event_publish(NULL, vp, event, 0);
}
-static pj_status_t client_port_event_cb(pjmedia_event_subscription *esub,
- pjmedia_event *event)
+static pj_status_t client_port_event_cb(pjmedia_event *event,
+ void *user_data)
{
- pjmedia_vid_port *vp = (pjmedia_vid_port*)esub->user_data;
+ pjmedia_vid_port *vp = (pjmedia_vid_port*)user_data;
if (event->type == PJMEDIA_EVENT_FMT_CHANGED) {
const pjmedia_video_format_detail *vfd;
pj_status_t status;
- ++event->proc_cnt;
-
pjmedia_vid_port_stop(vp);
/* Retrieve the video format detail */
@@ -633,8 +614,11 @@ static pj_status_t client_port_event_cb(pjmedia_event_subscription *esub,
pjmedia_vid_port_start(vp);
}
- /* Republish the event */
- return pjmedia_event_publish(&vp->epub, event);
+ /* Republish the event, post the event to the event manager
+ * to avoid deadlock if vidport is trying to stop the clock.
+ */
+ return pjmedia_event_publish(NULL, vp, event,
+ PJMEDIA_EVENT_PUBLISH_POST_EVENT);
}
static pj_status_t convert_frame(pjmedia_vid_port *vp,
diff --git a/pjmedia/src/pjmedia/vid_stream.c b/pjmedia/src/pjmedia/vid_stream.c
index 876b0165..9879d9bb 100644
--- a/pjmedia/src/pjmedia/vid_stream.c
+++ b/pjmedia/src/pjmedia/vid_stream.c
@@ -147,9 +147,6 @@ struct pjmedia_vid_stream
pjmedia_vid_codec *codec; /**< Codec instance being used. */
pj_uint32_t last_dec_ts; /**< Last decoded timestamp. */
int last_dec_seq; /**< Last decoded sequence. */
-
- pjmedia_event_subscription esub_codec; /**< To subscribe codec events */
- pjmedia_event_publisher epub; /**< To publish events */
};
/* Prototypes */
@@ -339,18 +336,15 @@ static void dump_port_info(const pjmedia_vid_channel *chan,
/*
* Handle events from stream components.
*/
-static pj_status_t stream_event_cb(pjmedia_event_subscription *esub,
- pjmedia_event *event)
+static pj_status_t stream_event_cb(pjmedia_event *event,
+ void *user_data)
{
- pjmedia_vid_stream *stream = (pjmedia_vid_stream*)esub->user_data;
+ pjmedia_vid_stream *stream = (pjmedia_vid_stream*)user_data;
- if (esub == &stream->esub_codec) {
+ if (event->epub == stream->codec) {
/* This is codec event */
switch (event->type) {
case PJMEDIA_EVENT_FMT_CHANGED:
- /* we process the event */
- ++event->proc_cnt;
-
/* Copy the event to avoid deadlock if we publish the event
* now. This happens because fmt_event may trigger restart
* while we're still holding the jb_mutex.
@@ -362,13 +356,7 @@ static pj_status_t stream_event_cb(pjmedia_event_subscription *esub,
}
}
- return pjmedia_event_publish(&stream->epub, event);
-}
-
-static pjmedia_event_publisher *port_get_epub(pjmedia_port *port)
-{
- pjmedia_vid_stream *stream = (pjmedia_vid_stream*) port->port_data.pdata;
- return &stream->epub;
+ return pjmedia_event_publish(NULL, stream, event, 0);
}
#if defined(PJMEDIA_STREAM_ENABLE_KA) && PJMEDIA_STREAM_ENABLE_KA != 0
@@ -1031,18 +1019,18 @@ static pj_status_t decode_frame(pjmedia_vid_stream *stream,
vfd->fps.num / vfd->fps.denum));
/* Publish PJMEDIA_EVENT_FMT_CHANGED event */
- if (pjmedia_event_publisher_has_sub(&stream->epub)) {
+ {
pjmedia_event event;
dump_port_info(stream->dec, "changed");
pjmedia_event_init(&event, PJMEDIA_EVENT_FMT_CHANGED,
- &frame->timestamp, &stream->epub);
+ &frame->timestamp, &stream);
event.data.fmt_changed.dir = PJMEDIA_DIR_DECODING;
pj_memcpy(&event.data.fmt_changed.new_fmt,
&stream->info.codec_param->dec_fmt,
sizeof(pjmedia_format));
- pjmedia_event_publish(&stream->epub, &event);
+ pjmedia_event_publish(NULL, stream, &event, 0);
}
}
}
@@ -1086,7 +1074,7 @@ static pj_status_t get_frame(pjmedia_port *port,
stream->dec : stream->enc,
"changed");
- pjmedia_event_publish(&stream->epub, &stream->fmt_event);
+ pjmedia_event_publish(NULL, stream, &stream->fmt_event, 0);
stream->fmt_event.type = PJMEDIA_EVENT_NONE;
}
@@ -1211,7 +1199,6 @@ static pj_status_t create_channel( pj_pool_t *pool,
/* Init port. */
channel->port.port_data.pdata = stream;
- channel->port.get_event_pub = &port_get_epub;
PJ_LOG(5, (name.ptr,
"%s channel created %dx%d %s%s%.*s %d/%d(~%d)fps",
@@ -1345,11 +1332,9 @@ PJ_DEF(pj_status_t) pjmedia_vid_stream_create(
if (status != PJ_SUCCESS)
return status;
- /* Init event publisher and subscribe to codec events */
- pjmedia_event_publisher_init(&stream->epub, SIGNATURE);
- pjmedia_event_subscription_init(&stream->esub_codec, &stream_event_cb,
- stream);
- pjmedia_event_subscribe(&stream->codec->epub, &stream->esub_codec);
+ /* Subscribe to codec events */
+ pjmedia_event_subscribe(NULL, pool, &stream_event_cb, stream,
+ stream->codec);
/* Estimate the maximum frame size */
stream->frame_size = vfd_enc->size.w * vfd_enc->size.h * 4;
@@ -1556,6 +1541,8 @@ PJ_DEF(pj_status_t) pjmedia_vid_stream_destroy( pjmedia_vid_stream *stream )
/* Free codec. */
if (stream->codec) {
+ pjmedia_event_unsubscribe(NULL, &stream_event_cb, stream,
+ stream->codec);
pjmedia_vid_codec_close(stream->codec);
pjmedia_vid_codec_mgr_dealloc_codec(stream->codec_mgr, stream->codec);
stream->codec = NULL;
diff --git a/pjmedia/src/test/test.c b/pjmedia/src/test/test.c
index 2b7c9ca5..275d2da0 100644
--- a/pjmedia/src/test/test.c
+++ b/pjmedia/src/test/test.c
@@ -72,6 +72,7 @@ int test_main(void)
#if defined(PJMEDIA_HAS_VIDEO) && (PJMEDIA_HAS_VIDEO != 0)
pjmedia_video_format_mgr_create(pool, 64, 0, NULL);
pjmedia_converter_mgr_create(pool, NULL);
+ pjmedia_event_mgr_create(pool, 0, NULL);
pjmedia_vid_codec_mgr_create(pool, NULL);
#endif
@@ -115,6 +116,7 @@ on_return:
#if defined(PJMEDIA_HAS_VIDEO) && (PJMEDIA_HAS_VIDEO != 0)
pjmedia_video_format_mgr_destroy(pjmedia_video_format_mgr_instance());
pjmedia_converter_mgr_destroy(pjmedia_converter_mgr_instance());
+ pjmedia_event_mgr_destroy(pjmedia_event_mgr_instance());
pjmedia_vid_codec_mgr_destroy(pjmedia_vid_codec_mgr_instance());
#endif
diff --git a/pjmedia/src/test/vid_codec_test.c b/pjmedia/src/test/vid_codec_test.c
index a6c62469..11ef93ab 100644
--- a/pjmedia/src/test/vid_codec_test.c
+++ b/pjmedia/src/test/vid_codec_test.c
@@ -47,18 +47,16 @@ typedef struct codec_port_data_t
pj_size_t pack_buf_size;
} codec_port_data_t;
-static pj_status_t codec_on_event(pjmedia_event_subscription *esub,
- pjmedia_event *event)
+static pj_status_t codec_on_event(pjmedia_event *event,
+ void *user_data)
{
- codec_port_data_t *port_data = (codec_port_data_t*)esub->user_data;
+ codec_port_data_t *port_data = (codec_port_data_t*)user_data;
if (event->type == PJMEDIA_EVENT_FMT_CHANGED) {
pjmedia_vid_codec *codec = port_data->codec;
pjmedia_vid_codec_param codec_param;
pj_status_t status;
- ++event->proc_cnt;
-
status = pjmedia_vid_codec_get_param(codec, &codec_param);
if (status != PJ_SUCCESS)
return status;
@@ -200,7 +198,6 @@ static int encode_decode_test(pj_pool_t *pool, const char *codec_id,
pjmedia_vid_port *capture=NULL, *renderer=NULL;
pjmedia_vid_port_param vport_param;
pjmedia_video_format_detail *vfd;
- pjmedia_event_subscription esub;
char codec_name[5];
pj_status_t status;
int rc = 0;
@@ -323,9 +320,8 @@ static int encode_decode_test(pj_pool_t *pool, const char *codec_id,
codec_param.dec_fmt.det = codec_param.enc_fmt.det;
/* Subscribe to codec events */
- pjmedia_event_subscription_init(&esub, &codec_on_event,
- &codec_port_data);
- pjmedia_event_subscribe(&codec->epub, &esub);
+ pjmedia_event_subscribe(NULL, pool, &codec_on_event, &codec_port_data,
+ codec);
}
pjmedia_vid_port_param_default(&vport_param);
@@ -430,6 +426,8 @@ on_return:
pjmedia_vid_port_destroy(renderer);
}
if (codec) {
+ pjmedia_event_unsubscribe(NULL, &codec_on_event, &codec_port_data,
+ codec);
pjmedia_vid_codec_close(codec);
pjmedia_vid_codec_mgr_dealloc_codec(NULL, codec);
}
diff --git a/pjmedia/src/test/vid_dev_test.c b/pjmedia/src/test/vid_dev_test.c
index 7425e49b..5782337b 100644
--- a/pjmedia/src/test/vid_dev_test.c
+++ b/pjmedia/src/test/vid_dev_test.c
@@ -76,10 +76,10 @@ static int enum_devs(void)
return PJ_SUCCESS;
}
-static pj_status_t vid_event_cb(pjmedia_event_subscription *esub,
- pjmedia_event *event)
+static pj_status_t vid_event_cb(pjmedia_event *event,
+ void *user_data)
{
- PJ_UNUSED_ARG(esub);
+ PJ_UNUSED_ARG(user_data);
if (event->type == PJMEDIA_EVENT_WND_CLOSED)
is_quitting = PJ_TRUE;
@@ -95,7 +95,6 @@ static int capture_render_loopback(int cap_dev_id, int rend_dev_id,
pjmedia_vid_dev_info cdi, rdi;
pjmedia_vid_port_param param;
pjmedia_video_format_detail *vfd;
- pjmedia_event_subscription esub;
pj_status_t status;
int rc = 0, i;
@@ -161,10 +160,7 @@ static int capture_render_loopback(int cap_dev_id, int rend_dev_id,
}
/* Set event handler */
- pjmedia_event_subscription_init(&esub, &vid_event_cb, NULL);
- pjmedia_event_subscribe(
- pjmedia_vid_port_get_event_publisher(renderer),
- &esub);
+ pjmedia_event_subscribe(NULL, pool, &vid_event_cb, NULL, renderer);
/* Connect capture to renderer */
status = pjmedia_vid_port_connect(
@@ -196,8 +192,10 @@ on_return:
if (capture)
pjmedia_vid_port_destroy(capture);
- if (renderer)
+ if (renderer) {
+ pjmedia_event_unsubscribe(NULL, &vid_event_cb, NULL, renderer);
pjmedia_vid_port_destroy(renderer);
+ }
pj_pool_release(pool);
return rc;
diff --git a/pjmedia/src/test/vid_port_test.c b/pjmedia/src/test/vid_port_test.c
index fed61bb1..dd3bf37e 100644
--- a/pjmedia/src/test/vid_port_test.c
+++ b/pjmedia/src/test/vid_port_test.c
@@ -32,10 +32,10 @@
static pj_bool_t is_quitting = PJ_FALSE;
-static pj_status_t vid_event_cb(pjmedia_event_subscription *esub,
- pjmedia_event *event)
+static pj_status_t vid_event_cb(pjmedia_event *event,
+ void *user_data)
{
- PJ_UNUSED_ARG(esub);
+ PJ_UNUSED_ARG(user_data);
if (event->type == PJMEDIA_EVENT_WND_CLOSED)
is_quitting = PJ_TRUE;
@@ -52,7 +52,6 @@ static int capture_render_loopback(pj_bool_t active,
pjmedia_vid_dev_info cdi, rdi;
pjmedia_vid_port_param param;
pjmedia_video_format_detail *vfd;
- pjmedia_event_subscription esub;
pj_status_t status;
int rc = 0, i;
@@ -118,10 +117,7 @@ static int capture_render_loopback(pj_bool_t active,
}
/* Set event handler */
- pjmedia_event_subscription_init(&esub, &vid_event_cb, NULL);
- pjmedia_event_subscribe(
- pjmedia_vid_port_get_event_publisher(renderer),
- &esub);
+ pjmedia_event_subscribe(NULL, pool, &vid_event_cb, NULL, renderer);
/* Connect capture to renderer */
status = pjmedia_vid_port_connect(
@@ -153,8 +149,10 @@ on_return:
if (capture)
pjmedia_vid_port_destroy(capture);
- if (renderer)
+ if (renderer) {
+ pjmedia_event_unsubscribe(NULL, &vid_event_cb, NULL, renderer);
pjmedia_vid_port_destroy(renderer);
+ }
pj_pool_release(pool);
return rc;