From c04000a192a00f047ea6d04e131e42f0b72bc11b Mon Sep 17 00:00:00 2001 From: Liong Sauw Ming Date: Thu, 1 Dec 2011 10:49:07 +0000 Subject: Closed #1420: Add support for event manager git-svn-id: http://svn.pjsip.org/repos/pjproject/trunk@3893 74dad513-b988-da41-8d7b-12977e46ad98 --- pjmedia/include/pjmedia-videodev/videodev.h | 13 - pjmedia/include/pjmedia-videodev/videodev_imp.h | 3 - pjmedia/include/pjmedia/event.h | 254 ++++++++------- pjmedia/include/pjmedia/port.h | 16 - pjmedia/include/pjmedia/vid_codec.h | 3 - pjmedia/include/pjmedia/vid_port.h | 10 - pjmedia/src/pjmedia-codec/ffmpeg_codecs.c | 12 +- pjmedia/src/pjmedia-videodev/colorbar_dev.c | 1 - pjmedia/src/pjmedia-videodev/dshow_dev.c | 1 - pjmedia/src/pjmedia-videodev/ffmpeg_dev.c | 1 - pjmedia/src/pjmedia-videodev/sdl_dev.c | 10 +- pjmedia/src/pjmedia-videodev/videodev.c | 6 - pjmedia/src/pjmedia/event.c | 394 ++++++++++++++++++------ pjmedia/src/pjmedia/port.c | 14 - pjmedia/src/pjmedia/vid_codec.c | 1 - pjmedia/src/pjmedia/vid_port.c | 72 ++--- pjmedia/src/pjmedia/vid_stream.c | 41 +-- pjmedia/src/test/test.c | 2 + pjmedia/src/test/vid_codec_test.c | 16 +- pjmedia/src/test/vid_dev_test.c | 16 +- pjmedia/src/test/vid_port_test.c | 16 +- 21 files changed, 490 insertions(+), 412 deletions(-) (limited to 'pjmedia') diff --git a/pjmedia/include/pjmedia-videodev/videodev.h b/pjmedia/include/pjmedia-videodev/videodev.h index 767fdf73..fa6c2697 100644 --- a/pjmedia/include/pjmedia-videodev/videodev.h +++ b/pjmedia/include/pjmedia-videodev/videodev.h @@ -750,19 +750,6 @@ PJ_DECL(pj_status_t) pjmedia_vid_dev_stream_start( PJ_DECL(pj_bool_t) pjmedia_vid_dev_stream_is_running(pjmedia_vid_dev_stream *strm); -/** - * Get the event publisher object for the video stream. Caller typically use - * the returned object to subscribe or unsubscribe events from the video - * stream. - * - * @param strm The video stream. - * - * @return The event publisher object. - */ -PJ_DECL(pjmedia_event_publisher*) -pjmedia_vid_dev_stream_get_event_publisher(pjmedia_vid_dev_stream *strm); - - /** * Request one frame from the stream. Application needs to call this function * periodically only if the stream doesn't support "active interface", i.e. diff --git a/pjmedia/include/pjmedia-videodev/videodev_imp.h b/pjmedia/include/pjmedia-videodev/videodev_imp.h index 8ddfd968..0a1d9830 100644 --- a/pjmedia/include/pjmedia-videodev/videodev_imp.h +++ b/pjmedia/include/pjmedia-videodev/videodev_imp.h @@ -188,9 +188,6 @@ struct pjmedia_vid_dev_stream /** Operations */ pjmedia_vid_dev_stream_op *op; - - /** Event producer */ - pjmedia_event_publisher epub; }; diff --git a/pjmedia/include/pjmedia/event.h b/pjmedia/include/pjmedia/event.h index 259f7264..7a779e79 100644 --- a/pjmedia/include/pjmedia/event.h +++ b/pjmedia/include/pjmedia/event.h @@ -25,7 +25,6 @@ */ #include #include -#include PJ_BEGIN_DECL @@ -87,16 +86,6 @@ typedef enum pjmedia_event_type } pjmedia_event_type; -/** - * Forward declaration for event subscription. - */ -typedef struct pjmedia_event_subscription pjmedia_event_subscription; - -/** - * Forward declaration for event publisher. - */ -typedef struct pjmedia_event_publisher pjmedia_event_publisher; - /** * Additional data/parameters for media format changed event * (PJMEDIA_EVENT_FMT_CHANGED). @@ -162,8 +151,8 @@ typedef char pjmedia_event_user_data[PJMEDIA_EVENT_DATA_MAX_SIZE]; /** * This structure describes a media event. It consists mainly of the event - * type and additional data/parameters for the event. Event publishers need - * to use #pjmedia_event_init() to initialize this event structure with + * type and additional data/parameters for the event. Applications can + * use #pjmedia_event_init() to initialize this event structure with * basic information about the event. */ typedef struct pjmedia_event @@ -179,27 +168,22 @@ typedef struct pjmedia_event pj_timestamp timestamp; /** - * This keeps count on the number of subscribers that have - * processed this event. - */ - unsigned proc_cnt; - - /** - * The object signature of the event publisher. Application may use - * this to check which publisher published the event. + * Pointer information about the source of this event. This field + * is provided mainly for comparison purpose so that event subscribers + * can check which source the event originated from. Usage of this + * pointer for other purpose may require special care such as mutex + * locking or checking whether the object is already destroyed. */ - pjmedia_obj_sig epub_sig; + const void *src; /** - * Pointer information about the source of this event. This field - * is provided mainly so that the event subscribers can compare it - * against the publisher that it subscribed the events from initially, - * a publisher can republish events from other publisher. Event - * subscription must be careful when using this pointer other than for - * comparison purpose, since access to the publisher may require special - * care (e.g. mutex locking). + * Pointer information about the publisher of this event. This field + * is provided mainly for comparison purpose so that event subscribers + * can check which object published the event. Usage of this + * pointer for other purpose may require special care such as mutex + * locking or checking whether the object is already destroyed. */ - const pjmedia_event_publisher *epub; + const void *epub; /** * Additional data/parameters about the event. The type of data @@ -238,164 +222,170 @@ typedef struct pjmedia_event } pjmedia_event; /** - * The callback to receive media events. The callback should increase - * \a proc_cnt field of the event if it processes the event. + * The callback to receive media events. * - * @param esub The subscription that was made initially to receive - * this event. - * @param event The media event itself. + * @param event The media event. + * @param user_data The user data associated with the callback. * * @return If the callback returns non-PJ_SUCCESS, this return - * code may be propagated back to the producer. + * code may be propagated back to the caller. */ -typedef pj_status_t pjmedia_event_cb(pjmedia_event_subscription *esub, - pjmedia_event *event); +typedef pj_status_t pjmedia_event_cb(pjmedia_event *event, + void *user_data); /** - * This structure keeps the data needed to maintain an event subscription. - * This data is normally kept by event publishers. + * This enumeration describes flags for event publication via + * #pjmedia_event_publish(). */ -struct pjmedia_event_subscription +typedef enum pjmedia_event_publish_flag { - /** Standard list members */ - PJ_DECL_LIST_MEMBER(pjmedia_event_subscription); + /** + * Publisher will only post the event to the event manager. It is the + * event manager that will later notify all the publisher's subscribers. + */ + PJMEDIA_EVENT_PUBLISH_POST_EVENT = 1 - /** Callback that will be called by publisher to report events. */ - pjmedia_event_cb *cb; +} pjmedia_event_publish_flag; - /** User data for this subscription */ - void *user_data; +/** + * Event manager flag. + */ +typedef enum pjmedia_event_mgr_flag +{ + /** + * Tell the event manager not to create any event worker thread. + */ + PJMEDIA_EVENT_MGR_NO_THREAD = 1 - /** Current publisher it is subscribed to */ - pjmedia_event_publisher *subscribe_to; -}; +} pjmedia_event_mgr_flag; /** - * This describes an event publisher. An event publisher is an object that - * maintains event subscriptions. When an event is published on behalf of - * a publisher with #pjmedia_event_publish(), that event will be propagated - * to all of the subscribers registered to the publisher. + * Opaque data type for event manager. Typically, the event manager + * is a singleton instance, although application may instantiate more than one + * instances of this if required. */ -struct pjmedia_event_publisher -{ - /** The object signature of the publisher */ - pjmedia_obj_sig sig; +typedef struct pjmedia_event_mgr pjmedia_event_mgr; - /** List of subscriptions for this event publisher */ - pjmedia_event_subscription subscription_list; -}; +/** + * Create a new event manager instance. This will also set the pointer + * to the singleton instance if the value is still NULL. + * + * @param pool Pool to allocate memory from. + * @param options Options. Bitmask flags from #pjmedia_event_mgr_flag + * @param mgr Pointer to hold the created instance of the + * event manager. + * + * @return PJ_SUCCESS on success or the appropriate error code. + */ +PJ_DECL(pj_status_t) pjmedia_event_mgr_create(pj_pool_t *pool, + unsigned options, + pjmedia_event_mgr **mgr); /** - * Initialize event structure with basic data about the event. + * Get the singleton instance of the event manager. * - * @param event The event to be initialized. - * @param type The event type to be set for this event. - * @param ts Event timestamp. May be set to NULL to set the event - * timestamp to zero. - * @param epub Event publisher. + * @return The instance. */ -PJ_DECL(void) pjmedia_event_init(pjmedia_event *event, - pjmedia_event_type type, - const pj_timestamp *ts, - const pjmedia_event_publisher *epub); +PJ_DECL(pjmedia_event_mgr*) pjmedia_event_mgr_instance(void); /** - * Initialize an event publisher structure. + * Manually assign a specific event manager instance as the singleton + * instance. Normally this is not needed if only one instance is ever + * going to be created, as the library automatically assign the singleton + * instance. * - * @param epub The event publisher. - * @param sig The object signature of the publisher. + * @param mgr The instance to be used as the singleton instance. + * Application may specify NULL to clear the singleton + * singleton instance. */ -PJ_DECL(void) pjmedia_event_publisher_init(pjmedia_event_publisher *epub, - pjmedia_obj_sig sig); +PJ_DECL(void) pjmedia_event_mgr_set_instance(pjmedia_event_mgr *mgr); /** - * Initialize subscription data. + * Destroy an event manager. If the manager happens to be the singleton + * instance, the singleton instance will be set to NULL. * - * @param esub The event subscription. - * @param cb The callback to receive events. - * @param user_data Arbitrary user data to be associated with the - * subscription. + * @param mgr The eventmanager. Specify NULL to use + * the singleton instance. */ -PJ_DECL(void) pjmedia_event_subscription_init(pjmedia_event_subscription *esub, - pjmedia_event_cb *cb, - void *user_data); +PJ_DECL(void) pjmedia_event_mgr_destroy(pjmedia_event_mgr *mgr); /** - * Subscribe to events published by the specified publisher using the - * specified subscription object. The callback and user data fields of - * the subscription object must have been initialized prior to calling - * this function, and the subscription object must be kept alive throughout - * the duration of the subscription (e.g. it must not be allocated from - * the stack). + * Initialize event structure with basic data about the event. * - * Note that the subscriber may receive not only events emitted by + * @param event The event to be initialized. + * @param type The event type to be set for this event. + * @param ts Event timestamp. May be set to NULL to set the event + * timestamp to zero. + * @param src Event source. + */ +PJ_DECL(void) pjmedia_event_init(pjmedia_event *event, + pjmedia_event_type type, + const pj_timestamp *ts, + const void *src); + +/** + * Subscribe a callback function to events published by the specified + * publisher. Note that the subscriber may receive not only events emitted by * the specific publisher specified in the argument, but also from other * publishers contained by the publisher, if the publisher is republishing * events from other publishers. * + * @param mgr The event manager. + * @param pool Pool to allocate memory from. + * @param cb The callback function to receive the event. + * @param user_data The user data to be associated with the callback + * function. * @param epub The event publisher. - * @param esub The event subscription object. * * @return PJ_SUCCESS on success or the appropriate error code. */ -PJ_DECL(pj_status_t) pjmedia_event_subscribe(pjmedia_event_publisher *epub, - pjmedia_event_subscription *esub); +PJ_DECL(pj_status_t) pjmedia_event_subscribe(pjmedia_event_mgr *mgr, + pj_pool_t *pool, + pjmedia_event_cb *cb, + void *user_data, + void *epub); /** - * Unsubscribe the specified subscription object from publisher it is - * currently subscribed to. If the subscription object is not currently - * subscribed to anything, the function will do nothing. + * Unsubscribe the callback associated with the user data from a publisher. + * If the user data is not specified, this function will do the + * unsubscription for all user data. If the publisher, epub, is not + * specified, this function will do the unsubscription from all publishers. * - * @param esub The event subscription object, which must be the same - * object that was given to #pjmedia_event_subscribe(). + * @param mgr The event manager. + * @param cb The callback function. + * @param user_data The user data associated with the callback + * function, can be NULL. + * @param epub The event publisher, can be NULL. * * @return PJ_SUCCESS on success or the appropriate error code. */ PJ_DECL(pj_status_t) -pjmedia_event_unsubscribe(pjmedia_event_subscription *esub); - -/** - * Check if the specified publisher has subscribers. - * - * @param epub The event publisher. - * - * @return PJ_TRUE if the publisher has at least one subscriber. - */ -PJ_DECL(pj_bool_t) -pjmedia_event_publisher_has_sub(pjmedia_event_publisher *epub); +pjmedia_event_unsubscribe(pjmedia_event_mgr *mgr, + pjmedia_event_cb *cb, + void *user_data, + void *epub); /** * Publish the specified event to all subscribers of the specified event - * publisher. + * publisher. By default, the function will call all the subcribers' + * callbacks immediately. If the publisher uses the flag + * PJMEDIA_EVENT_PUBLISH_POST_EVENT, publisher will only post the event + * to the event manager and return immediately. It is the event manager + * that will later notify all the publisher's subscribers. * + * @param mgr The event manager. * @param epub The event publisher. * @param event The event to be published. + * @param flag Publication flag. * * @return PJ_SUCCESS only if all subscription callbacks returned * PJ_SUCCESS. */ -PJ_DECL(pj_status_t) pjmedia_event_publish(pjmedia_event_publisher *epub, - pjmedia_event *event); +PJ_DECL(pj_status_t) pjmedia_event_publish(pjmedia_event_mgr *mgr, + void *epub, + pjmedia_event *event, + pjmedia_event_publish_flag flag); -/** - * Subscribe to events produced by the source publisher in \a esrc and - * republish the events to all subscribers in \a epub publisher. - * - * @param esrc The event source from which events will be - * republished. - * @param epub Events from the event source above will be - * republished to subscribers of this publisher. - * @param esub The subscription object to be used to subscribe - * to \a esrc. This doesn't need to be initialized, - * but it must be kept alive throughout the lifetime - * of the subsciption. - * - * @return PJ_SUCCESS only if all subscription callbacks returned - * PJ_SUCCESS. - */ -PJ_DECL(pj_status_t) pjmedia_event_republish(pjmedia_event_publisher *esrc, - pjmedia_event_publisher *epub, - pjmedia_event_subscription *esub); /** * @} PJMEDIA_EVENT diff --git a/pjmedia/include/pjmedia/port.h b/pjmedia/include/pjmedia/port.h index b6493413..4ea5de1d 100644 --- a/pjmedia/include/pjmedia/port.h +++ b/pjmedia/include/pjmedia/port.h @@ -399,11 +399,6 @@ typedef struct pjmedia_port */ pj_status_t (*on_destroy)(struct pjmedia_port *this_port); - /** - * Get event publisher for this media port, if any. - */ - pjmedia_event_publisher *(*get_event_pub)(struct pjmedia_port *this_port); - } pjmedia_port; @@ -483,17 +478,6 @@ PJ_DECL(pj_status_t) pjmedia_port_get_frame( pjmedia_port *port, PJ_DECL(pj_status_t) pjmedia_port_put_frame( pjmedia_port *port, pjmedia_frame *frame ); -/** - * Get the event publisher for the media port, if any. - * - * @param port The media port. - * - * @return The event publisher, or NULL if the port does not publish - * events. - */ -PJ_DECL(pjmedia_event_publisher*) -pjmedia_port_get_event_publisher(pjmedia_port *port); - /** * Destroy port (and subsequent downstream ports) * diff --git a/pjmedia/include/pjmedia/vid_codec.h b/pjmedia/include/pjmedia/vid_codec.h index 4726d2b2..9716e540 100644 --- a/pjmedia/include/pjmedia/vid_codec.h +++ b/pjmedia/include/pjmedia/vid_codec.h @@ -249,9 +249,6 @@ struct pjmedia_vid_codec /** Operations to codec. */ pjmedia_vid_codec_op *op; - - /** Event publisher object */ - pjmedia_event_publisher epub; }; diff --git a/pjmedia/include/pjmedia/vid_port.h b/pjmedia/include/pjmedia/vid_port.h index 66c31c84..3de28346 100644 --- a/pjmedia/include/pjmedia/vid_port.h +++ b/pjmedia/include/pjmedia/vid_port.h @@ -104,16 +104,6 @@ PJ_DECL(void) pjmedia_vid_port_set_cb(pjmedia_vid_port *vid_port, const pjmedia_vid_dev_cb *cb, void *user_data); -/** - * Get the event publisher instance of the video port. - * - * @param vid_port The video port. - * - * @return The event publisher of the video port. - */ -PJ_DECL(pjmedia_event_publisher*) -pjmedia_vid_port_get_event_publisher(pjmedia_vid_port *vid_port); - /** * Return the underlying video stream of the video port. * diff --git a/pjmedia/src/pjmedia-codec/ffmpeg_codecs.c b/pjmedia/src/pjmedia-codec/ffmpeg_codecs.c index 53602b67..ccd0901f 100644 --- a/pjmedia/src/pjmedia-codec/ffmpeg_codecs.c +++ b/pjmedia/src/pjmedia-codec/ffmpeg_codecs.c @@ -1609,15 +1609,15 @@ static pj_status_t ffmpeg_codec_decode_whole(pjmedia_vid_codec *codec, } /* Broadcast event */ - if (pjmedia_event_publisher_has_sub(&codec->epub)) { + { pjmedia_event event; pjmedia_event_init(&event, PJMEDIA_EVENT_FMT_CHANGED, - &input->timestamp, &codec->epub); + &input->timestamp, codec); event.data.fmt_changed.dir = PJMEDIA_DIR_DECODING; pj_memcpy(&event.data.fmt_changed.new_fmt, &ff->param.dec_fmt, sizeof(ff->param.dec_fmt)); - pjmedia_event_publish(&codec->epub, &event); + pjmedia_event_publish(NULL, codec, &event, 0); } } @@ -1651,13 +1651,13 @@ static pj_status_t ffmpeg_codec_decode_whole(pjmedia_vid_codec *codec, output->size = vafp->framebytes; /* Check if we got key frame */ - if (avframe.key_frame && pjmedia_event_publisher_has_sub(&codec->epub)) + if (avframe.key_frame) { pjmedia_event event; pjmedia_event_init(&event, PJMEDIA_EVENT_KEY_FRAME_FOUND, - &output->timestamp, &codec->epub); - pjmedia_event_publish(&codec->epub, &event); + &output->timestamp, codec); + pjmedia_event_publish(NULL, codec, &event, 0); } } else { output->type = PJMEDIA_FRAME_TYPE_NONE; diff --git a/pjmedia/src/pjmedia-videodev/colorbar_dev.c b/pjmedia/src/pjmedia-videodev/colorbar_dev.c index e0d488da..6fb658ef 100644 --- a/pjmedia/src/pjmedia-videodev/colorbar_dev.c +++ b/pjmedia/src/pjmedia-videodev/colorbar_dev.c @@ -416,7 +416,6 @@ static pj_status_t cbar_factory_create_stream( strm->cbfi = cbfi; pj_memcpy(&strm->vafp, &vafp, sizeof(vafp)); strm->ts_inc = PJMEDIA_SPF2(param->clock_rate, &vfd->fps, 1); - pjmedia_event_publisher_init(&strm->base.epub, PJMEDIA_SIG_VID_DEV_COLORBAR); for (i = 0; i < vfi->plane_cnt; ++i) { strm->first_line[i] = pj_pool_alloc(pool, vafp.strides[i]); diff --git a/pjmedia/src/pjmedia-videodev/dshow_dev.c b/pjmedia/src/pjmedia-videodev/dshow_dev.c index f65ed921..c116875f 100644 --- a/pjmedia/src/pjmedia-videodev/dshow_dev.c +++ b/pjmedia/src/pjmedia-videodev/dshow_dev.c @@ -869,7 +869,6 @@ static pj_status_t dshow_factory_create_stream( strm->pool = pool; pj_memcpy(&strm->vid_cb, cb, sizeof(*cb)); strm->user_data = user_data; - pjmedia_event_publisher_init(&strm->base.epub, PJMEDIA_SIG_VID_DEV_DSHOW); if (param->dir & PJMEDIA_DIR_CAPTURE) { const pjmedia_video_format_detail *vfd; diff --git a/pjmedia/src/pjmedia-videodev/ffmpeg_dev.c b/pjmedia/src/pjmedia-videodev/ffmpeg_dev.c index 9f874077..1bfc0c27 100644 --- a/pjmedia/src/pjmedia-videodev/ffmpeg_dev.c +++ b/pjmedia/src/pjmedia-videodev/ffmpeg_dev.c @@ -383,7 +383,6 @@ static pj_status_t ffmpeg_factory_create_stream( strm->factory = (ffmpeg_factory*)f; strm->pool = pool; pj_memcpy(&strm->param, param, sizeof(*param)); - pjmedia_event_publisher_init(&strm->base.epub); /* Done */ strm->base.op = &stream_op; diff --git a/pjmedia/src/pjmedia-videodev/sdl_dev.c b/pjmedia/src/pjmedia-videodev/sdl_dev.c index 465e37b5..109b22bd 100644 --- a/pjmedia/src/pjmedia-videodev/sdl_dev.c +++ b/pjmedia/src/pjmedia-videodev/sdl_dev.c @@ -308,7 +308,7 @@ static struct sdl_stream* find_stream(struct sdl_factory *sf, if (strm) pjmedia_event_init(pevent, PJMEDIA_EVENT_NONE, &strm->last_ts, - &strm->base.epub); + strm); return strm; } @@ -354,7 +354,7 @@ static pj_status_t handle_event(void *data) if (strm && pevent.type != PJMEDIA_EVENT_NONE) { pj_status_t status; - pjmedia_event_publish(&strm->base.epub, &pevent); + pjmedia_event_publish(NULL, strm, &pevent, 0); switch (pevent.type) { case PJMEDIA_EVENT_WND_RESIZED: @@ -375,9 +375,8 @@ static pj_status_t handle_event(void *data) sdl_stream_stop(&strm->base); sdl_destroy_all(strm); pjmedia_event_init(&pevent, PJMEDIA_EVENT_WND_CLOSED, - &strm->last_ts, - &strm->base.epub); - pjmedia_event_publish(&strm->base.epub, &pevent); + &strm->last_ts, strm); + pjmedia_event_publish(NULL, strm, &pevent, 0); /* * Note: don't access the stream after this point, it @@ -916,7 +915,6 @@ static pj_status_t sdl_factory_create_stream( strm->sf = sf; pj_memcpy(&strm->vid_cb, cb, sizeof(*cb)); strm->user_data = user_data; - pjmedia_event_publisher_init(&strm->base.epub, PJMEDIA_SIG_VID_DEV_SDL); /* Create render stream here */ if (param->dir & PJMEDIA_DIR_RENDER) { diff --git a/pjmedia/src/pjmedia-videodev/videodev.c b/pjmedia/src/pjmedia-videodev/videodev.c index 9a82ba6e..9b96520d 100644 --- a/pjmedia/src/pjmedia-videodev/videodev.c +++ b/pjmedia/src/pjmedia-videodev/videodev.c @@ -785,12 +785,6 @@ PJ_DEF(pj_status_t) pjmedia_vid_dev_stream_set_cap( return strm->op->set_cap(strm, cap, value); } -PJ_DEF(pjmedia_event_publisher*) -pjmedia_vid_dev_stream_get_event_publisher(pjmedia_vid_dev_stream *strm) -{ - return &strm->epub; -} - /* API: Start the stream. */ PJ_DEF(pj_status_t) pjmedia_vid_dev_stream_start(pjmedia_vid_dev_stream *strm) { diff --git a/pjmedia/src/pjmedia/event.c b/pjmedia/src/pjmedia/event.c index 8e92b53e..c499eaa5 100644 --- a/pjmedia/src/pjmedia/event.c +++ b/pjmedia/src/pjmedia/event.c @@ -19,150 +19,342 @@ #include #include #include +#include #include +#include +#include #include #define THIS_FILE "event.c" -#if 1 -# define TRACE_(x) PJ_LOG(6,x) -#else -# define TRACE_(x) -#endif +#define MAX_EVENTS 16 -PJ_DEF(void) pjmedia_event_init( pjmedia_event *event, - pjmedia_event_type type, - const pj_timestamp *ts, - const pjmedia_event_publisher *epub) +typedef struct esub esub; + +struct esub { - pj_bzero(event, sizeof(*event)); - event->type = type; - if (ts) - event->timestamp.u64 = ts->u64; - event->epub = epub; - if (epub) - event->epub_sig = epub->sig; -} + PJ_DECL_LIST_MEMBER(esub); + + pjmedia_event_cb *cb; + void *user_data; + void *epub; +}; -PJ_DEF(void) pjmedia_event_publisher_init(pjmedia_event_publisher *epub, - pjmedia_obj_sig sig) +typedef struct event_queue { - pj_bzero(epub, sizeof(*epub)); - pj_list_init(&epub->subscription_list); - epub->sig = sig; -} + pjmedia_event events[MAX_EVENTS]; /**< array of events. */ + int head, tail; + pj_bool_t is_full; +} event_queue; + +struct pjmedia_event_mgr +{ + pj_thread_t *thread; /**< worker thread. */ + pj_bool_t is_quitting; + pj_sem_t *sem; + pj_mutex_t *mutex; + event_queue ev_queue; + event_queue *pub_ev_queue; /**< publish() event queue. */ + esub esub_list; /**< list of subscribers. */ + esub *th_next_sub, /**< worker thread's next sub. */ + *pub_next_sub; /**< publish() next sub. */ +}; -PJ_DEF(void) pjmedia_event_subscription_init( pjmedia_event_subscription *esub, - pjmedia_event_cb *cb, - void *user_data) +static pjmedia_event_mgr *event_manager_instance; + +static pj_status_t event_queue_add_event(event_queue* ev_queue, + pjmedia_event *event) { - pj_bzero(esub, sizeof(*esub)); - esub->cb = cb; - esub->user_data = user_data; + if (ev_queue->is_full) { + char ev_name[5]; + + /* This event will be ignored. */ + PJ_LOG(4, (THIS_FILE, "Lost event %s from publisher [0x%p] " + "due to full queue.", + pjmedia_fourcc_name(event->type, ev_name), + event->epub)); + + return PJ_ETOOMANY; + } + + pj_memcpy(&ev_queue->events[ev_queue->tail], event, sizeof(*event)); + ev_queue->tail = (ev_queue->tail + 1) % MAX_EVENTS; + if (ev_queue->tail == ev_queue->head) + ev_queue->is_full = PJ_TRUE; + + return PJ_SUCCESS; } -PJ_DEF(pj_bool_t) -pjmedia_event_publisher_has_sub(pjmedia_event_publisher *epub) +static pj_status_t event_mgr_distribute_events(pjmedia_event_mgr *mgr, + event_queue *ev_queue, + esub **next_sub) { - PJ_ASSERT_RETURN(epub, PJ_FALSE); - return epub->subscription_list.next && - (!pj_list_empty(&epub->subscription_list)); + pj_status_t err = PJ_SUCCESS; + esub * sub = mgr->esub_list.next; + pjmedia_event *ev = &ev_queue->events[ev_queue->head]; + + while (sub != &mgr->esub_list) { + *next_sub = sub->next; + + /* Check if the subscriber is interested in + * receiving the event from the publisher. + */ + if (sub->epub == ev->epub || !sub->epub) { + pj_status_t status = (*sub->cb)(ev, sub->user_data); + if (status != PJ_SUCCESS && err == PJ_SUCCESS) + err = status; + } + sub = *next_sub; + } + *next_sub = NULL; + + ev_queue->head = (ev_queue->head + 1) % MAX_EVENTS; + ev_queue->is_full = PJ_FALSE; + + return err; } -PJ_DEF(pj_status_t) pjmedia_event_subscribe( pjmedia_event_publisher *epub, - pjmedia_event_subscription *esub) +/* Event worker thread function. */ +static int event_worker_thread(void *arg) { - char epub_name[5]; + pjmedia_event_mgr *mgr = (pjmedia_event_mgr *)arg; - PJ_ASSERT_RETURN(epub && esub && esub->cb, PJ_EINVAL); - /* Must not currently subscribe to anything */ - PJ_ASSERT_RETURN(esub->subscribe_to == NULL, PJ_EINVALIDOP); + while (1) { + /* Wait until there is an event. */ + pj_sem_wait(mgr->sem); - TRACE_((THIS_FILE, "Subscription [0x%p] to publisher %s", - esub, - pjmedia_fourcc_name(epub->sig, epub_name))); - /* Suppress compiler warning if trace is disabled */ - PJ_UNUSED_ARG(epub_name); + if (mgr->is_quitting) + break; - pj_list_push_back(&epub->subscription_list, esub); - esub->subscribe_to = epub; - return PJ_SUCCESS; + pj_mutex_lock(mgr->mutex); + event_mgr_distribute_events(mgr, &mgr->ev_queue, &mgr->th_next_sub); + pj_mutex_unlock(mgr->mutex); + } + + return 0; } -PJ_DEF(pj_status_t) pjmedia_event_unsubscribe(pjmedia_event_subscription *esub) +PJ_DEF(pj_status_t) pjmedia_event_mgr_create(pj_pool_t *pool, + unsigned options, + pjmedia_event_mgr **p_mgr) { - PJ_ASSERT_RETURN(esub, PJ_EINVAL); - - if (esub->subscribe_to) { - char epub_name[5]; - TRACE_((THIS_FILE, "Unsubscription [0x%p] to publisher %s", - esub, - pjmedia_fourcc_name(esub->subscribe_to->sig, - epub_name))); - /* Suppress compiler warning if trace is disabled */ - PJ_UNUSED_ARG(epub_name); - - PJ_ASSERT_RETURN( - pj_list_find_node(&esub->subscribe_to->subscription_list, - esub)==esub, PJ_ENOTFOUND); - pj_list_erase(esub); - esub->subscribe_to = NULL; + pjmedia_event_mgr *mgr; + pj_status_t status; + + mgr = PJ_POOL_ZALLOC_T(pool, pjmedia_event_mgr); + pj_list_init(&mgr->esub_list); + + if (!(options & PJMEDIA_EVENT_MGR_NO_THREAD)) { + status = pj_sem_create(pool, "ev_sem", 0, MAX_EVENTS + 1, &mgr->sem); + if (status != PJ_SUCCESS) + return status; + + status = pj_thread_create(pool, "ev_thread", &event_worker_thread, + mgr, 0, 0, &mgr->thread); + if (status != PJ_SUCCESS) { + pjmedia_event_mgr_destroy(mgr); + return status; + } + } + + status = pj_mutex_create_recursive(pool, "ev_mutex", &mgr->mutex); + if (status != PJ_SUCCESS) { + pjmedia_event_mgr_destroy(mgr); + return status; } + + if (!event_manager_instance) + event_manager_instance = mgr; + + if (p_mgr) + *p_mgr = mgr; + return PJ_SUCCESS; } -PJ_DEF(pj_status_t) pjmedia_event_publish( pjmedia_event_publisher *epub, - pjmedia_event *event) +PJ_DEF(pjmedia_event_mgr*) pjmedia_event_mgr_instance(void) { - pjmedia_event_subscription *esub; - char event_name[5]; - char epub_name[5]; - pj_status_t err = PJ_SUCCESS; + return event_manager_instance; +} - PJ_ASSERT_RETURN(epub && event, PJ_EINVAL); +PJ_DEF(void) pjmedia_event_mgr_set_instance(pjmedia_event_mgr *mgr) +{ + event_manager_instance = mgr; +} - TRACE_((THIS_FILE, "Event %s is published by publisher %s", - pjmedia_fourcc_name(event->type, event_name), - pjmedia_fourcc_name(epub->sig, epub_name))); - /* Suppress compiler warning if trace is disabled */ - PJ_UNUSED_ARG(event_name); - PJ_UNUSED_ARG(epub_name); +PJ_DEF(void) pjmedia_event_mgr_destroy(pjmedia_event_mgr *mgr) +{ + esub *sub; - esub = epub->subscription_list.next; - if (!esub) - return err; + if (!mgr) mgr = pjmedia_event_mgr_instance(); + PJ_ASSERT_ON_FAIL(mgr != NULL, return); - while (esub != &epub->subscription_list) { - pjmedia_event_subscription *next; - pj_status_t status; + if (mgr->thread) { + mgr->is_quitting = PJ_TRUE; + pj_sem_post(mgr->sem); + pj_thread_join(mgr->thread); + } - /* just in case esub is destroyed in the callback */ - next = esub->next; + if (mgr->sem) { + pj_sem_destroy(mgr->sem); + mgr->sem = NULL; + } - status = (*esub->cb)(esub, event); - if (status != PJ_SUCCESS && err == PJ_SUCCESS) - err = status; + if (mgr->mutex) { + pj_mutex_destroy(mgr->mutex); + mgr->mutex = NULL; + } - esub = next; + sub = mgr->esub_list.next; + while (sub != &mgr->esub_list) { + esub *next = sub->next; + pj_list_erase(sub); + sub = next; } - return err; + if (event_manager_instance == mgr) + event_manager_instance = NULL; } -static pj_status_t republisher_cb(pjmedia_event_subscription *esub, - pjmedia_event *event) +PJ_DEF(void) pjmedia_event_init( pjmedia_event *event, + pjmedia_event_type type, + const pj_timestamp *ts, + const void *src) { - return pjmedia_event_publish((pjmedia_event_publisher*)esub->user_data, - event); + pj_bzero(event, sizeof(*event)); + event->type = type; + if (ts) + event->timestamp.u64 = ts->u64; + event->epub = event->src = src; } -PJ_DEF(pj_status_t) pjmedia_event_republish(pjmedia_event_publisher *esrc, - pjmedia_event_publisher *epub, - pjmedia_event_subscription *esub) +PJ_DEF(pj_status_t) pjmedia_event_subscribe( pjmedia_event_mgr *mgr, + pj_pool_t *pool, + pjmedia_event_cb *cb, + void *user_data, + void *epub) { - PJ_ASSERT_RETURN(esrc && epub && esub, PJ_EINVAL); + esub *sub; + + PJ_ASSERT_RETURN(pool && cb, PJ_EINVAL); + + if (!mgr) mgr = pjmedia_event_mgr_instance(); + PJ_ASSERT_RETURN(mgr, PJ_EINVAL); + + pj_mutex_lock(mgr->mutex); + /* Check whether callback function with the same user data is already + * subscribed to the publisher. This is to prevent the callback function + * receiving the same event from the same publisher more than once. + */ + sub = mgr->esub_list.next; + while (sub != &mgr->esub_list) { + esub *next = sub->next; + if (sub->cb == cb && sub->user_data == user_data && + sub->epub == epub) + { + pj_mutex_unlock(mgr->mutex); + return PJ_SUCCESS; + } + sub = next; + } + + sub = PJ_POOL_ZALLOC_T(pool, esub); + sub->cb = cb; + sub->user_data = user_data; + sub->epub = epub; + pj_list_push_back(&mgr->esub_list, sub); + pj_mutex_unlock(mgr->mutex); - pjmedia_event_subscription_init(esub, &republisher_cb, epub); - return pjmedia_event_subscribe(esrc, esub); + return PJ_SUCCESS; } +PJ_DEF(pj_status_t) +pjmedia_event_unsubscribe(pjmedia_event_mgr *mgr, + pjmedia_event_cb *cb, + void *user_data, + void *epub) +{ + esub *sub; + + PJ_ASSERT_RETURN(cb, PJ_EINVAL); + + if (!mgr) mgr = pjmedia_event_mgr_instance(); + PJ_ASSERT_RETURN(mgr, PJ_EINVAL); + + pj_mutex_lock(mgr->mutex); + sub = mgr->esub_list.next; + while (sub != &mgr->esub_list) { + esub *next = sub->next; + if (sub->cb == cb && (sub->user_data == user_data || !user_data) && + (sub->epub == epub || !epub)) + { + /* If the worker thread or pjmedia_event_publish() API is + * in the process of distributing events, make sure that + * its pointer to the next subscriber stays valid. + */ + if (mgr->th_next_sub == sub) + mgr->th_next_sub = sub->next; + if (mgr->pub_next_sub == sub) + mgr->pub_next_sub = sub->next; + pj_list_erase(sub); + if (user_data && epub) + break; + } + sub = next; + } + pj_mutex_unlock(mgr->mutex); + + return PJ_SUCCESS; +} + +PJ_DEF(pj_status_t) pjmedia_event_publish( pjmedia_event_mgr *mgr, + void *epub, + pjmedia_event *event, + pjmedia_event_publish_flag flag) +{ + pj_status_t err = PJ_SUCCESS; + + PJ_ASSERT_RETURN(epub && event, PJ_EINVAL); + + if (!mgr) mgr = pjmedia_event_mgr_instance(); + PJ_ASSERT_RETURN(mgr, PJ_EINVAL); + + event->epub = epub; + + pj_mutex_lock(mgr->mutex); + if (flag & PJMEDIA_EVENT_PUBLISH_POST_EVENT) { + if (event_queue_add_event(&mgr->ev_queue, event) == PJ_SUCCESS) + pj_sem_post(mgr->sem); + } else { + /* For nested pjmedia_event_publish() calls, i.e. calling publish() + * inside the subscriber's callback, the function will only add + * the event to the event queue of the first publish() call. It + * is the first publish() call that will be responsible to + * distribute the events. + */ + if (mgr->pub_ev_queue) { + event_queue_add_event(mgr->pub_ev_queue, event); + } else { + static event_queue ev_queue; + pj_status_t status; + + ev_queue.head = ev_queue.tail = 0; + ev_queue.is_full = PJ_FALSE; + mgr->pub_ev_queue = &ev_queue; + + event_queue_add_event(mgr->pub_ev_queue, event); + + do { + status = event_mgr_distribute_events(mgr, mgr->pub_ev_queue, + &mgr->pub_next_sub); + if (status != PJ_SUCCESS && err == PJ_SUCCESS) + err = status; + } while(ev_queue.head != ev_queue.tail || ev_queue.is_full); + + mgr->pub_ev_queue = NULL; + } + } + pj_mutex_unlock(mgr->mutex); + + return err; +} diff --git a/pjmedia/src/pjmedia/port.c b/pjmedia/src/pjmedia/port.c index e838263a..b7defae5 100644 --- a/pjmedia/src/pjmedia/port.c +++ b/pjmedia/src/pjmedia/port.c @@ -117,20 +117,6 @@ PJ_DEF(pj_status_t) pjmedia_port_put_frame( pjmedia_port *port, return PJ_EINVALIDOP; } -/* - * Get event publisher - */ -PJ_DEF(pjmedia_event_publisher*) -pjmedia_port_get_event_publisher(pjmedia_port *port) -{ - PJ_ASSERT_RETURN(port, NULL); - - if (port->get_event_pub) - return (*port->get_event_pub)(port); - - return NULL; -} - /** * Destroy port (and subsequent downstream ports) */ diff --git a/pjmedia/src/pjmedia/vid_codec.c b/pjmedia/src/pjmedia/vid_codec.c index 04d2f455..2002c6d5 100644 --- a/pjmedia/src/pjmedia/vid_codec.c +++ b/pjmedia/src/pjmedia/vid_codec.c @@ -77,7 +77,6 @@ PJ_DEF(void) pjmedia_vid_codec_reset(pjmedia_vid_codec *codec, pjmedia_obj_sig sig) { pj_bzero(codec, sizeof(*codec)); - pjmedia_event_publisher_init(&codec->epub, sig); } /* diff --git a/pjmedia/src/pjmedia/vid_port.c b/pjmedia/src/pjmedia/vid_port.c index fa5c2da6..1c1cdedf 100644 --- a/pjmedia/src/pjmedia/vid_port.c +++ b/pjmedia/src/pjmedia/vid_port.c @@ -61,10 +61,6 @@ struct pjmedia_vid_port pj_size_t conv_buf_size; pjmedia_conversion_param conv_param; - pjmedia_event_publisher epub; - pjmedia_event_subscription esub_dev; - pjmedia_event_subscription esub_client_port; - pjmedia_clock *clock; pjmedia_clock_src clocksrc; @@ -94,10 +90,10 @@ static pj_status_t vidstream_cap_cb(pjmedia_vid_dev_stream *stream, static pj_status_t vidstream_render_cb(pjmedia_vid_dev_stream *stream, void *user_data, pjmedia_frame *frame); -static pj_status_t vidstream_event_cb(pjmedia_event_subscription *esub, - pjmedia_event *event); -static pj_status_t client_port_event_cb(pjmedia_event_subscription *esub, - pjmedia_event *event); +static pj_status_t vidstream_event_cb(pjmedia_event *event, + void *user_data); +static pj_status_t client_port_event_cb(pjmedia_event *event, + void *user_data); static void enc_clock_cb(const pj_timestamp *ts, void *user_data); static void dec_clock_cb(const pj_timestamp *ts, void *user_data); @@ -206,7 +202,6 @@ PJ_DEF(pj_status_t) pjmedia_vid_port_create( pj_pool_t *pool, vp->role = prm->active ? ROLE_ACTIVE : ROLE_PASSIVE; vp->dir = prm->vidparam.dir; // vp->cap_size = vfd->size; - pjmedia_event_publisher_init(&vp->epub, SIGNATURE); vparam = prm->vidparam; dev_name[0] = '\0'; @@ -272,10 +267,8 @@ PJ_DEF(pj_status_t) pjmedia_vid_port_create( pj_pool_t *pool, vparam.fmt.det.vid.fps.num, vparam.fmt.det.vid.fps.denum)); /* Subscribe to device's events */ - pjmedia_event_subscription_init(&vp->esub_dev, vidstream_event_cb, vp); - pjmedia_event_subscribe( - pjmedia_vid_dev_stream_get_event_publisher(vp->strm), - &vp->esub_dev); + pjmedia_event_subscribe(NULL, vp->pool, &vidstream_event_cb, + vp, vp->strm); if (vp->dir & PJMEDIA_DIR_CAPTURE) { pjmedia_format_copy(&vp->conv_param.src, &vparam.fmt); @@ -370,13 +363,6 @@ PJ_DEF(void) pjmedia_vid_port_set_cb(pjmedia_vid_port *vid_port, vid_port->strm_cb_data = user_data; } -PJ_DEF(pjmedia_event_publisher*) -pjmedia_vid_port_get_event_publisher(pjmedia_vid_port *vid_port) -{ - PJ_ASSERT_RETURN(vid_port, NULL); - return &vid_port->epub; -} - PJ_DEF(pjmedia_vid_dev_stream*) pjmedia_vid_port_get_stream(pjmedia_vid_port *vp) { @@ -419,20 +405,14 @@ PJ_DEF(pj_status_t) pjmedia_vid_port_connect(pjmedia_vid_port *vp, pjmedia_port *port, pj_bool_t destroy) { - pjmedia_event_publisher *epub; - PJ_ASSERT_RETURN(vp && vp->role==ROLE_ACTIVE, PJ_EINVAL); vp->destroy_client_port = destroy; vp->client_port = port; /* Subscribe to client port's events */ - epub = pjmedia_port_get_event_publisher(port); - if (epub) { - pjmedia_event_subscription_init(&vp->esub_client_port, - &client_port_event_cb, - vp); - pjmedia_event_subscribe(epub, &vp->esub_client_port); - } + pjmedia_event_subscribe(NULL, vp->pool, &client_port_event_cb, vp, + vp->client_port); + return PJ_SUCCESS; } @@ -441,10 +421,10 @@ PJ_DEF(pj_status_t) pjmedia_vid_port_disconnect(pjmedia_vid_port *vp) { PJ_ASSERT_RETURN(vp && vp->role==ROLE_ACTIVE, PJ_EINVAL); - if (vp->client_port) { - pjmedia_event_unsubscribe(&vp->esub_client_port); - vp->client_port = NULL; - } + pjmedia_event_unsubscribe(NULL, &client_port_event_cb, vp, + vp->client_port); + vp->client_port = NULL; + return PJ_SUCCESS; } @@ -510,10 +490,13 @@ PJ_DEF(void) pjmedia_vid_port_destroy(pjmedia_vid_port *vp) vp->clock = NULL; } if (vp->strm) { + pjmedia_event_unsubscribe(NULL, &vidstream_event_cb, vp, vp->strm); pjmedia_vid_dev_stream_destroy(vp->strm); vp->strm = NULL; } if (vp->client_port) { + pjmedia_event_unsubscribe(NULL, &client_port_event_cb, vp, + vp->client_port); if (vp->destroy_client_port) pjmedia_port_destroy(vp->client_port); vp->client_port = NULL; @@ -560,26 +543,24 @@ static void save_rgb_frame(int width, int height, const pjmedia_frame *frm) */ /* Handle event from vidstream */ -static pj_status_t vidstream_event_cb(pjmedia_event_subscription *esub, - pjmedia_event *event) +static pj_status_t vidstream_event_cb(pjmedia_event *event, + void *user_data) { - pjmedia_vid_port *vp = (pjmedia_vid_port*)esub->user_data; + pjmedia_vid_port *vp = (pjmedia_vid_port*)user_data; /* Just republish the event to our client */ - return pjmedia_event_publish(&vp->epub, event); + return pjmedia_event_publish(NULL, vp, event, 0); } -static pj_status_t client_port_event_cb(pjmedia_event_subscription *esub, - pjmedia_event *event) +static pj_status_t client_port_event_cb(pjmedia_event *event, + void *user_data) { - pjmedia_vid_port *vp = (pjmedia_vid_port*)esub->user_data; + pjmedia_vid_port *vp = (pjmedia_vid_port*)user_data; if (event->type == PJMEDIA_EVENT_FMT_CHANGED) { const pjmedia_video_format_detail *vfd; pj_status_t status; - ++event->proc_cnt; - pjmedia_vid_port_stop(vp); /* Retrieve the video format detail */ @@ -633,8 +614,11 @@ static pj_status_t client_port_event_cb(pjmedia_event_subscription *esub, pjmedia_vid_port_start(vp); } - /* Republish the event */ - return pjmedia_event_publish(&vp->epub, event); + /* Republish the event, post the event to the event manager + * to avoid deadlock if vidport is trying to stop the clock. + */ + return pjmedia_event_publish(NULL, vp, event, + PJMEDIA_EVENT_PUBLISH_POST_EVENT); } static pj_status_t convert_frame(pjmedia_vid_port *vp, diff --git a/pjmedia/src/pjmedia/vid_stream.c b/pjmedia/src/pjmedia/vid_stream.c index 876b0165..9879d9bb 100644 --- a/pjmedia/src/pjmedia/vid_stream.c +++ b/pjmedia/src/pjmedia/vid_stream.c @@ -147,9 +147,6 @@ struct pjmedia_vid_stream pjmedia_vid_codec *codec; /**< Codec instance being used. */ pj_uint32_t last_dec_ts; /**< Last decoded timestamp. */ int last_dec_seq; /**< Last decoded sequence. */ - - pjmedia_event_subscription esub_codec; /**< To subscribe codec events */ - pjmedia_event_publisher epub; /**< To publish events */ }; /* Prototypes */ @@ -339,18 +336,15 @@ static void dump_port_info(const pjmedia_vid_channel *chan, /* * Handle events from stream components. */ -static pj_status_t stream_event_cb(pjmedia_event_subscription *esub, - pjmedia_event *event) +static pj_status_t stream_event_cb(pjmedia_event *event, + void *user_data) { - pjmedia_vid_stream *stream = (pjmedia_vid_stream*)esub->user_data; + pjmedia_vid_stream *stream = (pjmedia_vid_stream*)user_data; - if (esub == &stream->esub_codec) { + if (event->epub == stream->codec) { /* This is codec event */ switch (event->type) { case PJMEDIA_EVENT_FMT_CHANGED: - /* we process the event */ - ++event->proc_cnt; - /* Copy the event to avoid deadlock if we publish the event * now. This happens because fmt_event may trigger restart * while we're still holding the jb_mutex. @@ -362,13 +356,7 @@ static pj_status_t stream_event_cb(pjmedia_event_subscription *esub, } } - return pjmedia_event_publish(&stream->epub, event); -} - -static pjmedia_event_publisher *port_get_epub(pjmedia_port *port) -{ - pjmedia_vid_stream *stream = (pjmedia_vid_stream*) port->port_data.pdata; - return &stream->epub; + return pjmedia_event_publish(NULL, stream, event, 0); } #if defined(PJMEDIA_STREAM_ENABLE_KA) && PJMEDIA_STREAM_ENABLE_KA != 0 @@ -1031,18 +1019,18 @@ static pj_status_t decode_frame(pjmedia_vid_stream *stream, vfd->fps.num / vfd->fps.denum)); /* Publish PJMEDIA_EVENT_FMT_CHANGED event */ - if (pjmedia_event_publisher_has_sub(&stream->epub)) { + { pjmedia_event event; dump_port_info(stream->dec, "changed"); pjmedia_event_init(&event, PJMEDIA_EVENT_FMT_CHANGED, - &frame->timestamp, &stream->epub); + &frame->timestamp, &stream); event.data.fmt_changed.dir = PJMEDIA_DIR_DECODING; pj_memcpy(&event.data.fmt_changed.new_fmt, &stream->info.codec_param->dec_fmt, sizeof(pjmedia_format)); - pjmedia_event_publish(&stream->epub, &event); + pjmedia_event_publish(NULL, stream, &event, 0); } } } @@ -1086,7 +1074,7 @@ static pj_status_t get_frame(pjmedia_port *port, stream->dec : stream->enc, "changed"); - pjmedia_event_publish(&stream->epub, &stream->fmt_event); + pjmedia_event_publish(NULL, stream, &stream->fmt_event, 0); stream->fmt_event.type = PJMEDIA_EVENT_NONE; } @@ -1211,7 +1199,6 @@ static pj_status_t create_channel( pj_pool_t *pool, /* Init port. */ channel->port.port_data.pdata = stream; - channel->port.get_event_pub = &port_get_epub; PJ_LOG(5, (name.ptr, "%s channel created %dx%d %s%s%.*s %d/%d(~%d)fps", @@ -1345,11 +1332,9 @@ PJ_DEF(pj_status_t) pjmedia_vid_stream_create( if (status != PJ_SUCCESS) return status; - /* Init event publisher and subscribe to codec events */ - pjmedia_event_publisher_init(&stream->epub, SIGNATURE); - pjmedia_event_subscription_init(&stream->esub_codec, &stream_event_cb, - stream); - pjmedia_event_subscribe(&stream->codec->epub, &stream->esub_codec); + /* Subscribe to codec events */ + pjmedia_event_subscribe(NULL, pool, &stream_event_cb, stream, + stream->codec); /* Estimate the maximum frame size */ stream->frame_size = vfd_enc->size.w * vfd_enc->size.h * 4; @@ -1556,6 +1541,8 @@ PJ_DEF(pj_status_t) pjmedia_vid_stream_destroy( pjmedia_vid_stream *stream ) /* Free codec. */ if (stream->codec) { + pjmedia_event_unsubscribe(NULL, &stream_event_cb, stream, + stream->codec); pjmedia_vid_codec_close(stream->codec); pjmedia_vid_codec_mgr_dealloc_codec(stream->codec_mgr, stream->codec); stream->codec = NULL; diff --git a/pjmedia/src/test/test.c b/pjmedia/src/test/test.c index 2b7c9ca5..275d2da0 100644 --- a/pjmedia/src/test/test.c +++ b/pjmedia/src/test/test.c @@ -72,6 +72,7 @@ int test_main(void) #if defined(PJMEDIA_HAS_VIDEO) && (PJMEDIA_HAS_VIDEO != 0) pjmedia_video_format_mgr_create(pool, 64, 0, NULL); pjmedia_converter_mgr_create(pool, NULL); + pjmedia_event_mgr_create(pool, 0, NULL); pjmedia_vid_codec_mgr_create(pool, NULL); #endif @@ -115,6 +116,7 @@ on_return: #if defined(PJMEDIA_HAS_VIDEO) && (PJMEDIA_HAS_VIDEO != 0) pjmedia_video_format_mgr_destroy(pjmedia_video_format_mgr_instance()); pjmedia_converter_mgr_destroy(pjmedia_converter_mgr_instance()); + pjmedia_event_mgr_destroy(pjmedia_event_mgr_instance()); pjmedia_vid_codec_mgr_destroy(pjmedia_vid_codec_mgr_instance()); #endif diff --git a/pjmedia/src/test/vid_codec_test.c b/pjmedia/src/test/vid_codec_test.c index a6c62469..11ef93ab 100644 --- a/pjmedia/src/test/vid_codec_test.c +++ b/pjmedia/src/test/vid_codec_test.c @@ -47,18 +47,16 @@ typedef struct codec_port_data_t pj_size_t pack_buf_size; } codec_port_data_t; -static pj_status_t codec_on_event(pjmedia_event_subscription *esub, - pjmedia_event *event) +static pj_status_t codec_on_event(pjmedia_event *event, + void *user_data) { - codec_port_data_t *port_data = (codec_port_data_t*)esub->user_data; + codec_port_data_t *port_data = (codec_port_data_t*)user_data; if (event->type == PJMEDIA_EVENT_FMT_CHANGED) { pjmedia_vid_codec *codec = port_data->codec; pjmedia_vid_codec_param codec_param; pj_status_t status; - ++event->proc_cnt; - status = pjmedia_vid_codec_get_param(codec, &codec_param); if (status != PJ_SUCCESS) return status; @@ -200,7 +198,6 @@ static int encode_decode_test(pj_pool_t *pool, const char *codec_id, pjmedia_vid_port *capture=NULL, *renderer=NULL; pjmedia_vid_port_param vport_param; pjmedia_video_format_detail *vfd; - pjmedia_event_subscription esub; char codec_name[5]; pj_status_t status; int rc = 0; @@ -323,9 +320,8 @@ static int encode_decode_test(pj_pool_t *pool, const char *codec_id, codec_param.dec_fmt.det = codec_param.enc_fmt.det; /* Subscribe to codec events */ - pjmedia_event_subscription_init(&esub, &codec_on_event, - &codec_port_data); - pjmedia_event_subscribe(&codec->epub, &esub); + pjmedia_event_subscribe(NULL, pool, &codec_on_event, &codec_port_data, + codec); } pjmedia_vid_port_param_default(&vport_param); @@ -430,6 +426,8 @@ on_return: pjmedia_vid_port_destroy(renderer); } if (codec) { + pjmedia_event_unsubscribe(NULL, &codec_on_event, &codec_port_data, + codec); pjmedia_vid_codec_close(codec); pjmedia_vid_codec_mgr_dealloc_codec(NULL, codec); } diff --git a/pjmedia/src/test/vid_dev_test.c b/pjmedia/src/test/vid_dev_test.c index 7425e49b..5782337b 100644 --- a/pjmedia/src/test/vid_dev_test.c +++ b/pjmedia/src/test/vid_dev_test.c @@ -76,10 +76,10 @@ static int enum_devs(void) return PJ_SUCCESS; } -static pj_status_t vid_event_cb(pjmedia_event_subscription *esub, - pjmedia_event *event) +static pj_status_t vid_event_cb(pjmedia_event *event, + void *user_data) { - PJ_UNUSED_ARG(esub); + PJ_UNUSED_ARG(user_data); if (event->type == PJMEDIA_EVENT_WND_CLOSED) is_quitting = PJ_TRUE; @@ -95,7 +95,6 @@ static int capture_render_loopback(int cap_dev_id, int rend_dev_id, pjmedia_vid_dev_info cdi, rdi; pjmedia_vid_port_param param; pjmedia_video_format_detail *vfd; - pjmedia_event_subscription esub; pj_status_t status; int rc = 0, i; @@ -161,10 +160,7 @@ static int capture_render_loopback(int cap_dev_id, int rend_dev_id, } /* Set event handler */ - pjmedia_event_subscription_init(&esub, &vid_event_cb, NULL); - pjmedia_event_subscribe( - pjmedia_vid_port_get_event_publisher(renderer), - &esub); + pjmedia_event_subscribe(NULL, pool, &vid_event_cb, NULL, renderer); /* Connect capture to renderer */ status = pjmedia_vid_port_connect( @@ -196,8 +192,10 @@ on_return: if (capture) pjmedia_vid_port_destroy(capture); - if (renderer) + if (renderer) { + pjmedia_event_unsubscribe(NULL, &vid_event_cb, NULL, renderer); pjmedia_vid_port_destroy(renderer); + } pj_pool_release(pool); return rc; diff --git a/pjmedia/src/test/vid_port_test.c b/pjmedia/src/test/vid_port_test.c index fed61bb1..dd3bf37e 100644 --- a/pjmedia/src/test/vid_port_test.c +++ b/pjmedia/src/test/vid_port_test.c @@ -32,10 +32,10 @@ static pj_bool_t is_quitting = PJ_FALSE; -static pj_status_t vid_event_cb(pjmedia_event_subscription *esub, - pjmedia_event *event) +static pj_status_t vid_event_cb(pjmedia_event *event, + void *user_data) { - PJ_UNUSED_ARG(esub); + PJ_UNUSED_ARG(user_data); if (event->type == PJMEDIA_EVENT_WND_CLOSED) is_quitting = PJ_TRUE; @@ -52,7 +52,6 @@ static int capture_render_loopback(pj_bool_t active, pjmedia_vid_dev_info cdi, rdi; pjmedia_vid_port_param param; pjmedia_video_format_detail *vfd; - pjmedia_event_subscription esub; pj_status_t status; int rc = 0, i; @@ -118,10 +117,7 @@ static int capture_render_loopback(pj_bool_t active, } /* Set event handler */ - pjmedia_event_subscription_init(&esub, &vid_event_cb, NULL); - pjmedia_event_subscribe( - pjmedia_vid_port_get_event_publisher(renderer), - &esub); + pjmedia_event_subscribe(NULL, pool, &vid_event_cb, NULL, renderer); /* Connect capture to renderer */ status = pjmedia_vid_port_connect( @@ -153,8 +149,10 @@ on_return: if (capture) pjmedia_vid_port_destroy(capture); - if (renderer) + if (renderer) { + pjmedia_event_unsubscribe(NULL, &vid_event_cb, NULL, renderer); pjmedia_vid_port_destroy(renderer); + } pj_pool_release(pool); return rc; -- cgit v1.2.3