diff options
Diffstat (limited to 'include/asterisk')
-rw-r--r-- | include/asterisk/stasis.h | 116 | ||||
-rw-r--r-- | include/asterisk/stasis_message_router.h | 24 |
2 files changed, 127 insertions, 13 deletions
diff --git a/include/asterisk/stasis.h b/include/asterisk/stasis.h index 7e46dbf41..e6ea6fa13 100644 --- a/include/asterisk/stasis.h +++ b/include/asterisk/stasis.h @@ -124,6 +124,34 @@ * stasis_subscription. Due to cyclic references, the \ref * stasis_subscription will not be freed until after it has been unsubscribed, * and all other ao2_ref()'s have been cleaned up. + * + * \par Shutdown + * + * Subscriptions have two options for unsubscribing, depending upon the context + * in which you need to unsubscribe. + * + * If your subscription is owned by a module, and you must unsubscribe from the + * module_unload() function, then you'll want to use the + * stasis_unsubscribe_and_join() function. This will block until the final + * message has been received on the subscription. Otherwise, there's the danger + * of invoking the callback function after it has been unloaded. + * + * If your subscription is owned by an object, then your object should have an + * explicit shutdown() function, which calls stasis_unsubscribe(). In your + * subscription handler, when the stasis_subscription_final_message() has been + * received, decrement the refcount on your object. In your object's destructor, + * you may assert that stasis_subscription_is_done() to validate that the + * subscription's callback will no longer be invoked. + * + * \b Note: You may be tempted to simply call stasis_unsubscribe_and_join() from + * an object's destructor. While code that does this may work most of the time, + * it's got one big downside. There's a general assumption that object + * destruction is non-blocking. If you block the destruction waiting for the + * subscription to complete, there's the danger that the subscription may + * process a message which will bump the refcount up by one. Then it does + * whatever it does, decrements the refcount, which then proceeds to re-destroy + * the object. Now you've got hard to reproduce bugs that only show up under + * certain loads. */ #include "asterisk/utils.h" @@ -292,8 +320,7 @@ typedef void (*stasis_subscription_cb)(void *data, struct stasis_subscription *s * \since 12 */ struct stasis_subscription *stasis_subscribe(struct stasis_topic *topic, - stasis_subscription_cb callback, - void *data); + stasis_subscription_cb callback, void *data); /*! * \brief Cancel a subscription. @@ -304,10 +331,52 @@ struct stasis_subscription *stasis_subscribe(struct stasis_topic *topic, * delivery of the final message. * * \param subscription Subscription to cancel. - * \retval NULL for convenience + * \return \c NULL for convenience + * \since 12 + */ +struct stasis_subscription *stasis_unsubscribe( + struct stasis_subscription *subscription); + +/*! + * \brief Block until the last message is processed on a subscription. + * + * This function will not return until the \a subscription's callback for the + * stasis_subscription_final_message() completes. This allows cleanup routines + * to run before unblocking the joining thread. + * + * \param subscription Subscription to block on. + * \since 12 + */ +void stasis_subscription_join(struct stasis_subscription *subscription); + +/*! + * \brief Returns whether \a subscription has received its final message. + * + * Note that a subscription is considered done even while the + * stasis_subscription_final_message() is being processed. This allows cleanup + * routines to check the status of the subscription. + * + * \param subscription Subscription. + * \return True (non-zero) if stasis_subscription_final_message() has been + * received. + * \return False (zero) if waiting for the end. + */ +int stasis_subscription_is_done(struct stasis_subscription *subscription); + +/*! + * \brief Cancel a subscription, blocking until the last message is processed. + * + * While normally it's recommended to stasis_unsubscribe() and wait for + * stasis_subscription_final_message(), there are times (like during a module + * unload) where you have to wait for the final message (otherwise you'll call + * a function in a shared module that no longer exists). + * + * \param subscription Subscription to cancel. + * \return \c NULL for convenience * \since 12 */ -struct stasis_subscription *stasis_unsubscribe(struct stasis_subscription *subscription); +struct stasis_subscription *stasis_unsubscribe_and_join( + struct stasis_subscription *subscription); /*! * \brief Create a subscription which forwards all messages from one topic to @@ -322,7 +391,8 @@ struct stasis_subscription *stasis_unsubscribe(struct stasis_subscription *subsc * \return \c NULL on error. * \since 12 */ -struct stasis_subscription *stasis_forward_all(struct stasis_topic *from_topic, struct stasis_topic *to_topic); +struct stasis_subscription *stasis_forward_all(struct stasis_topic *from_topic, + struct stasis_topic *to_topic); /*! * \brief Get the unique ID for the subscription. @@ -389,7 +459,8 @@ struct stasis_topic_pool; /*! * \brief Create a topic pool that routes messages from dynamically generated topics to the given topic * \param pooled_topic Topic to which messages will be routed - * \retval the new stasis_topic_pool or NULL on failure + * \return the new stasis_topic_pool + * \return \c NULL on failure */ struct stasis_topic_pool *stasis_topic_pool_create(struct stasis_topic *pooled_topic); @@ -397,8 +468,8 @@ struct stasis_topic_pool *stasis_topic_pool_create(struct stasis_topic *pooled_t * \brief Find or create a topic in the pool * \param pool Pool for which to get the topic * \param topic_name Name of the topic to get - * \retval The already stored or newly allocated topic - * \retval NULL if the topic was not found and could not be allocated + * \return The already stored or newly allocated topic + * \return \c NULL if the topic was not found and could not be allocated */ struct stasis_topic *stasis_topic_pool_get_topic(struct stasis_topic_pool *pool, const char *topic_name); @@ -496,12 +567,31 @@ typedef const char *(*snapshot_get_id)(struct stasis_message *message); struct stasis_caching_topic *stasis_caching_topic_create(struct stasis_topic *original_topic, snapshot_get_id id_fn); /*! - * Unsubscribes a caching topic from its upstream topic. + * \brief Unsubscribes a caching topic from its upstream topic. + * + * This function returns immediately, so be sure to cleanup when + * stasis_subscription_final_message() is received. + * + * \param caching_topic Caching topic to unsubscribe + * \return \c NULL for convenience + * \since 12 + */ +struct stasis_caching_topic *stasis_caching_unsubscribe( + struct stasis_caching_topic *caching_topic); + +/*! + * \brief Unsubscribes a caching topic from its upstream topic, blocking until + * all messages have been forwarded. + * + * See stasis_unsubscriben_and_join() for more info on when to use this as + * opposed to stasis_caching_unsubscribe(). + * * \param caching_topic Caching topic to unsubscribe - * \retval NULL for convenience + * \return \c NULL for convenience * \since 12 */ -struct stasis_caching_topic *stasis_caching_unsubscribe(struct stasis_caching_topic *caching_topic); +struct stasis_caching_topic *stasis_caching_unsubscribe_and_join( + struct stasis_caching_topic *caching_topic); /*! * \brief Returns the topic of cached events from a caching topics. @@ -530,9 +620,9 @@ struct stasis_message *stasis_cache_get(struct stasis_caching_topic *caching_top /*! * \brief Dump cached items to a subscription * \param caching_topic The topic returned from stasis_caching_topic_create(). - * \param type Type of message to dump (any type if NULL). + * \param type Type of message to dump (any type if \c NULL). * \return ao2_container containing all matches (must be unreffed by caller) - * \return NULL on allocation error + * \return \c NULL on allocation error * \since 12 */ struct ao2_container *stasis_cache_dump(struct stasis_caching_topic *caching_topic, diff --git a/include/asterisk/stasis_message_router.h b/include/asterisk/stasis_message_router.h index 42770d293..e7d5a4cc6 100644 --- a/include/asterisk/stasis_message_router.h +++ b/include/asterisk/stasis_message_router.h @@ -57,12 +57,36 @@ struct stasis_message_router *stasis_message_router_create( /*! * \brief Unsubscribe the router from the upstream topic. + * * \param router Router to unsubscribe. * \since 12 */ void stasis_message_router_unsubscribe(struct stasis_message_router *router); /*! + * \brief Unsubscribe the router from the upstream topic, blocking until the + * final message has been processed. + * + * See stasis_unsubscribe_and_join() for info on when to use this + * vs. stasis_message_router_unsubscribe(). + * + * \param router Router to unsubscribe. + * \since 12 + */ +void stasis_message_router_unsubscribe_and_join( + struct stasis_message_router *router); + +/*! + * \brief Returns whether \a router has received its final message. + * + * \param router Router. + * \return True (non-zero) if stasis_subscription_final_message() has been + * received. + * \return False (zero) if waiting for the end. + */ +int stasis_message_router_is_done(struct stasis_message_router *router); + +/*! * \brief Add a route to a message router. * \param router Router to add the route to. * \param message_type Type of message to route. |