diff options
Diffstat (limited to 'include/asterisk/stasis.h')
-rw-r--r-- | include/asterisk/stasis.h | 130 |
1 files changed, 88 insertions, 42 deletions
diff --git a/include/asterisk/stasis.h b/include/asterisk/stasis.h index fd60724c0..3b9cec34f 100644 --- a/include/asterisk/stasis.h +++ b/include/asterisk/stasis.h @@ -94,13 +94,24 @@ * in the system, and it's desirable to query that state from the cache without * locking the original object. It's also desirable for subscribers of the * caching topic to receive messages that have both the old cache value and the - * new value being put into the cache. For this, we have - * stasis_caching_topic_create(), providing it with the topic which publishes - * the messages that you wish to cache, and a function that can identify - * cacheable messages. - * - * The returned \ref stasis_caching_topic provides a topic that forwards - * non-cacheable messages unchanged. A cacheable message is wrapped in a \ref + * new value being put into the cache. For this, we have stasis_cache_create() + * and stasis_caching_topic_create(), providing them with the topic which + * publishes the messages that you wish to cache, and a function that can + * identify cacheable messages. + * + * The \ref stasis_cache is designed so that it may be shared amongst several + * \ref stasis_caching_topic objects. This allows you to have individual caching + * topics per-object (i.e. so you can subscribe to updates for a single object), + * and still have a single cache to query for the state of all objects. While a + * cache may be shared amongst different message types, such a usage is probably + * not a good idea. + * + * The \ref stasis_cache can only be written to by \ref stasis_caching_topics. + * It's a thread safe container, so freely use the stasis_cache_get() and + * stasis_cache_dump() to query the cache. + * + * The \ref stasis_caching_topic provides a topic that forwards non-cacheable + * messages unchanged. A cacheable message is wrapped in a \ref * stasis_cache_update message which provides the old snapshot (or \c NULL if * this is a new cache entry), and the new snapshot (or \c NULL if the entry was * removed from the cache). A stasis_cache_clear_create() message must be sent @@ -111,6 +122,9 @@ * stasis_caching_topic will not be freed until after it has been unsubscribed, * and all other ao2_ref()'s have been cleaned up. * + * The \ref stasis_cache object is a normal AO2 managed object, which can be + * release with ao2_cleanup(). + * * \par stasis_subscriber * * Any topic may be subscribed to by simply providing stasis_subscribe() the @@ -345,6 +359,15 @@ void stasis_forward_message(struct stasis_topic *topic, struct stasis_topic *publisher_topic, struct stasis_message *message); +/*! + * \brief Wait for all pending messages on a given topic to be processed. + * \param topic Topic to await pending messages on. + * \return 0 on success. + * \return Non-zero on error. + * \since 12 + */ +int stasis_topic_wait(struct stasis_topic *topic); + /*! @} */ /*! @{ */ @@ -514,6 +537,8 @@ struct stasis_message_type *stasis_subscription_change_type(void); /*! @} */ +/*! @{ */ + /*! * \brief Pool for topic aggregation */ @@ -575,22 +600,17 @@ struct stasis_message_type *stasis_cache_clear_type(void); /*! @{ */ /*! - * \brief A topic wrapper, which caches certain messages. + * \brief A message cache, for use with \ref stasis_caching_topic. * \since 12 */ -struct stasis_caching_topic; +struct stasis_cache; /*! - * \brief A message which instructs the caching topic to remove an entry from its cache. - * - * \param message Message representative of the cache entry that should be cleared. - * This will become the data held in the stasis_cache_clear message. - * - * \return Message which, when sent to the \a topic, will clear the item from the cache. - * \return \c NULL on error. + * \brief A topic wrapper, which caches certain messages. * \since 12 */ -struct stasis_message *stasis_cache_clear_create(struct stasis_message *message); +struct stasis_caching_topic; + /*! * \brief Callback extract a unique identity from a snapshot message. @@ -606,6 +626,21 @@ struct stasis_message *stasis_cache_clear_create(struct stasis_message *message) typedef const char *(*snapshot_get_id)(struct stasis_message *message); /*! + * \brief Create a cache. + * + * This is the backend store for a \ref stasis_caching_topic. The cache is + * thread safe, allowing concurrent reads and writes. + * + * The returned object is AO2 managed, so ao2_cleanup() when you're done. + * + * \param id_fn Callback to extract the id from a snapshot message. + * \return New cache indexed by \a id_fn. + * \return \c NULL on error + * \since 12 + */ +struct stasis_cache *stasis_cache_create(snapshot_get_id id_fn); + +/*! * \brief Create a topic which monitors and caches messages from another topic. * * The idea is that some topics publish 'snapshots' of some other object's state @@ -613,13 +648,17 @@ typedef const char *(*snapshot_get_id)(struct stasis_message *message); * is updated, and a stasis_cache_update() message is forwarded, which has both * the original snapshot message and the new message. * + * The returned object is AO2 managed, so ao2_cleanup() when done with it. + * * \param original_topic Topic publishing snapshot messages. - * \param id_fn Callback to extract the id from a snapshot message. + * \param cache Backend cache in which to keep snapshots. * \return New topic which changes snapshot messages to stasis_cache_update() * messages, and forwards all other messages from the original topic. + * \return \c NULL on error * \since 12 */ -struct stasis_caching_topic *stasis_caching_topic_create(struct stasis_topic *original_topic, snapshot_get_id id_fn); +struct stasis_caching_topic *stasis_caching_topic_create( + struct stasis_topic *original_topic, struct stasis_cache *cache); /*! * \brief Unsubscribes a caching topic from its upstream topic. @@ -651,53 +690,55 @@ struct stasis_caching_topic *stasis_caching_unsubscribe_and_join( /*! * \brief Returns the topic of cached events from a caching topics. * \param caching_topic The caching topic. - * \return The topic that publishes cache update events, along with passthrough events - * from the underlying topic. + * \return The topic that publishes cache update events, along with passthrough + * events from the underlying topic. * \return \c NULL if \a caching_topic is \c NULL. * \since 12 */ -struct stasis_topic *stasis_caching_get_topic(struct stasis_caching_topic *caching_topic); +struct stasis_topic *stasis_caching_get_topic( + struct stasis_caching_topic *caching_topic); /*! - * \brief Retrieve an item from the cache. + * \brief A message which instructs the caching topic to remove an entry from + * its cache. * - * The returned item is AO2 managed, so ao2_cleanup() when you're done with it. + * \param message Message representative of the cache entry that should be + * cleared. This will become the data held in the + * stasis_cache_clear message. * - * \param caching_topic The topic returned from stasis_caching_topic_create(). - * \param type Type of message to retrieve. - * \param id Identity of the snapshot to retrieve. - * \return Message from the cache. - * \return \c NULL if message is not found. + * \return Message which, when sent to a \ref stasis_caching_topic, will clear + * the item from the cache. + * \return \c NULL on error. * \since 12 */ -#define stasis_cache_get(caching_topic, type, id) stasis_cache_get_extended(caching_topic, type, id, 0) +struct stasis_message *stasis_cache_clear_create(struct stasis_message *message); /*! * \brief Retrieve an item from the cache. - * \param caching_topic The topic returned from stasis_caching_topic_create(). + * + * The returned item is AO2 managed, so ao2_cleanup() when you're done with it. + * + * \param cache The cache to query. * \param type Type of message to retrieve. * \param id Identity of the snapshot to retrieve. - * \param guaranteed If set to 1 it is guaranteed that any pending messages have been processed. - * \return Message from the cache. The cache still owns the message, so - * ao2_ref() if you want to keep it. + * \return Message from the cache. * \return \c NULL if message is not found. * \since 12 */ -struct stasis_message *stasis_cache_get_extended(struct stasis_caching_topic *caching_topic, - struct stasis_message_type *type, - const char *id, - unsigned int guaranteed); +struct stasis_message *stasis_cache_get( + struct stasis_cache *cache, struct stasis_message_type *type, + const char *id); /*! * \brief Dump cached items to a subscription - * \param caching_topic The topic returned from stasis_caching_topic_create(). + * \param cache The cache to query. * \param type Type of message to dump (any type if \c NULL). * \return ao2_container containing all matches (must be unreffed by caller) * \return \c NULL on allocation error * \since 12 */ -struct ao2_container *stasis_cache_dump(struct stasis_caching_topic *caching_topic, - struct stasis_message_type *type); +struct ao2_container *stasis_cache_dump(struct stasis_cache *cache, + struct stasis_message_type *type); /*! @} */ @@ -831,13 +872,18 @@ int stasis_cache_init(void); /*! * \internal - * \brief called by stasis_init for config initialization. + * \brief called by stasis_init() for config initialization. * \return 0 on success. * \return Non-zero on error. * \since 12 */ int stasis_config_init(void); +/*! + * \internal + */ +int stasis_wait_init(void); + struct ast_threadpool_options; /*! |