diff options
-rw-r--r-- | include/asterisk/channel.h | 117 | ||||
-rw-r--r-- | include/asterisk/channel_internal.h | 1 | ||||
-rw-r--r-- | include/asterisk/stasis.h | 506 | ||||
-rw-r--r-- | main/asterisk.c | 6 | ||||
-rw-r--r-- | main/asterisk.exports.in | 1 | ||||
-rw-r--r-- | main/channel.c | 287 | ||||
-rw-r--r-- | main/channel_internal_api.c | 21 | ||||
-rw-r--r-- | main/manager.c | 200 | ||||
-rw-r--r-- | main/pbx.c | 13 | ||||
-rw-r--r-- | main/stasis.c | 514 | ||||
-rw-r--r-- | main/stasis_cache.c | 443 | ||||
-rw-r--r-- | main/stasis_message.c | 135 | ||||
-rw-r--r-- | tests/test_stasis.c | 683 |
13 files changed, 2812 insertions, 115 deletions
diff --git a/include/asterisk/channel.h b/include/asterisk/channel.h index f7e4d2d39..eee0828be 100644 --- a/include/asterisk/channel.h +++ b/include/asterisk/channel.h @@ -125,7 +125,6 @@ References: #include "asterisk/abstract_jb.h" #include "asterisk/astobj2.h" - #include "asterisk/poll-compat.h" #if defined(__cplusplus) || defined(c_plusplus) @@ -151,6 +150,7 @@ extern "C" { #include "asterisk/channelstate.h" #include "asterisk/ccss.h" #include "asterisk/framehook.h" +#include "asterisk/stasis.h" #define DATASTORE_INHERIT_FOREVER INT_MAX @@ -4102,4 +4102,119 @@ int ast_channel_dialed_causes_add(const struct ast_channel *chan, const struct a void ast_channel_dialed_causes_clear(const struct ast_channel *chan); struct ast_flags *ast_channel_flags(struct ast_channel *chan); + +/*! + * \since 12 + * \brief Structure representing a snapshot of channel state. + * + * While not enforced programmatically, this object is shared across multiple + * threads, and should be threated as an immutable object. + */ +struct ast_channel_snapshot { + AST_DECLARE_STRING_FIELDS( + AST_STRING_FIELD(name); /*!< ASCII unique channel name */ + AST_STRING_FIELD(accountcode); /*!< Account code for billing */ + AST_STRING_FIELD(peeraccount); /*!< Peer account code for billing */ + AST_STRING_FIELD(userfield); /*!< Userfield for CEL billing */ + AST_STRING_FIELD(uniqueid); /*!< Unique Channel Identifier */ + AST_STRING_FIELD(linkedid); /*!< Linked Channel Identifier -- gets propagated by linkage */ + AST_STRING_FIELD(parkinglot); /*!< Default parking lot, if empty, default parking lot */ + AST_STRING_FIELD(hangupsource); /*!< Who is responsible for hanging up this channel */ + AST_STRING_FIELD(appl); /*!< Current application */ + AST_STRING_FIELD(data); /*!< Data passed to current application */ + AST_STRING_FIELD(context); /*!< Dialplan: Current extension context */ + AST_STRING_FIELD(exten); /*!< Dialplan: Current extension number */ + AST_STRING_FIELD(caller_name); /*!< Caller ID Name */ + AST_STRING_FIELD(caller_number); /*!< Caller ID Number */ + AST_STRING_FIELD(connected_name); /*!< Connected Line Name */ + AST_STRING_FIELD(connected_number); /*!< Connected Line Number */ + ); + + struct timeval creationtime; /*!< The time of channel creation */ + enum ast_channel_state state; /*!< State of line */ + int priority; /*!< Dialplan: Current extension priority */ + int amaflags; /*!< AMA flags for billing */ + int hangupcause; /*!< Why is the channel hanged up. See causes.h */ + struct ast_flags flags; /*!< channel flags of AST_FLAG_ type */ +}; + +/*! + * \since 12 + * \brief Generate a snapshot of the channel state. This is an ao2 object, so + * ao2_cleanup() to deallocate. + * + * \param chan The channel from which to generate a snapshot + * + * \retval pointer on success (must be ast_freed) + * \retval NULL on error + */ +struct ast_channel_snapshot *ast_channel_snapshot_create(struct ast_channel *chan); + +/*! + * \since 12 + * \brief Message type for \ref ast_channel_snapshot. + * + * \retval Message type for \ref ast_channel_snapshot. + */ +struct stasis_message_type *ast_channel_snapshot(void); + +/*! + * \since 12 + * \brief A topic which publishes the events for a particular channel. + * + * \param chan Channel. + * + * \retval Topic for channel's events. + * \retval \c NULL if \a chan is \c NULL. + */ +struct stasis_topic *ast_channel_topic(struct ast_channel *chan); + +/*! + * \since 12 + * \brief A topic which publishes the events for all channels. + * \retval Topic for all channel events. + */ +struct stasis_topic *ast_channel_topic_all(void); + +/*! + * \since 12 + * \brief A caching topic which caches \ref ast_channel_snapshot messages from + * ast_channel_events_all(void). + * + * \retval Topic for all channel events. + */ +struct stasis_caching_topic *ast_channel_topic_all_cached(void); + +/*! + * \since 12 + * \brief Variable set event. + */ +struct ast_channel_varset { + /*! Channel variable was set on (or NULL for global variable) */ + struct ast_channel_snapshot *snapshot; + /*! Variable name */ + char *variable; + /*! New value */ + char *value; +}; + +/*! + * \since 12 + * \brief Message type for \ref ast_channel_varset messages. + * + * \retval Message type for \ref ast_channel_varset messages. + */ +struct stasis_message_type *ast_channel_varset(void); + +/*! + * \since 12 + * \brief Publish a \ref ast_channel_varset for a channel. + * + * \param chan Channel to pulish the event for, or \c NULL for 'none'. + * \param variable Name of the variable being set + * \param value Value. + */ +void ast_channel_publish_varset(struct ast_channel *chan, + const char *variable, const char *value); + #endif /* _ASTERISK_CHANNEL_H */ diff --git a/include/asterisk/channel_internal.h b/include/asterisk/channel_internal.h index 1b01fe042..38776c1e1 100644 --- a/include/asterisk/channel_internal.h +++ b/include/asterisk/channel_internal.h @@ -23,4 +23,5 @@ struct ast_channel *__ast_channel_internal_alloc(void (*destructor)(void *obj), void ast_channel_internal_finalize(struct ast_channel *chan); int ast_channel_internal_is_finalized(struct ast_channel *chan); void ast_channel_internal_cleanup(struct ast_channel *chan); +void ast_channel_internal_setup_topics(struct ast_channel *chan); diff --git a/include/asterisk/stasis.h b/include/asterisk/stasis.h new file mode 100644 index 000000000..a6ab4212f --- /dev/null +++ b/include/asterisk/stasis.h @@ -0,0 +1,506 @@ +/* + * Asterisk -- An open source telephony toolkit. + * + * Copyright (C) 2013, Digium, Inc. + * + * David M. Lee, II <dlee@digium.com> + * + * See http://www.asterisk.org for more information about + * the Asterisk project. Please do not directly contact + * any of the maintainers of this project for assistance; + * the project provides a web site, mailing lists and IRC + * channels for your use. + * + * This program is free software, distributed under the terms of + * the GNU General Public License Version 2. See the LICENSE file + * at the top of the source tree. + */ + +#ifndef _ASTERISK_STASIS_H +#define _ASTERISK_STASIS_H + +/*! \file + * + * \brief Stasis Message Bus API. See \ref stasis "Stasis Message Bus API" for + * detailed documentation. + * + * \author David M. Lee, II <dlee@digium.com> + * \since 12 + * + * \page stasis Stasis Message Bus API + * + * \par Intro + * + * The Stasis Message Bus is a loosely typed mechanism for distributing messages + * within Asterisk. It is designed to be: + * - Loosely coupled; new message types can be added in seperate modules. + * - Easy to use; publishing and subscribing are straightforward operations. + * - Consistent memory management; all message bus objects are AO2 managed + * objects, using ao2_ref() and ao2_cleanup() to manage the reference + * counting. + * + * There are three main concepts for using the Stasis Message Bus: + * - \ref stasis_message + * - \ref stasis_topic + * - \ref stasis_subscription + * + * \par stasis_message + * + * Central to the Stasis Message Bus is the \ref stasis_message, the messages + * that are sent on the bus. These messages have: + * - a type (as defined by a \ref stasis_message_type) + * - a value - a \c void pointer to an AO2 object + * - a timestamp when it was created + * + * Once a \ref stasis_message has been created, it is immutable and cannot + * change. The same goes for the value of the message (although this cannot be + * enforced in code). Messages themselves are reference-counted, AO2 objects, + * along with their values. By being both reference counted and immutable, + * messages can be shared throughout the system without any concerns for + * threading. (Well, the objects must be allocated with \ref + * AO2_ALLOC_OPT_LOCK_MUTEX so that the reference counting operations are thread + * safe. But other than that, no worries). + * + * The type of a message is defined by an instance of \ref stasis_message_type, + * which can be created by calling stasis_message_type_create(). Message types + * are named, which is useful in debugging. It is recommended that the string + * name for a message type match the name of the struct that's stored in the + * message. For example, name for \ref stasis_cache_update's message type is \c + * "stasis_cache_update". + * + * \par stasis_topic + * + * A \ref stasis_topic is an object to which \ref stasis_subscriber's may be + * subscribed, and \ref stasis_message's may be published. Any message published + * to the topic is dispatched to all of its subscribers. The topic itself may be + * named, which is useful in debugging. + * + * Topics themselves are reference counted objects, and automagically + * unsubscribe all of their subscribers when they are destroyed. Topics are also + * thread safe, so no worries about publishing/subscribing/unsubscribing to a + * topic concurrently from multiple threads. It's also designed to handle the + * case of unsubscribing from a topic from within the subscription handler. + * + * \par Forwarding + * + * There is one special case of topics that's worth noting: forwarding + * messages. It's a fairly common use case to want to forward all the messages + * published on one topic to another one (for example, an aggregator topic that + * publishes all the events from a set of other topics). This can be + * accomplished easily using stasis_forward_all(). This sets up the forwarding + * between the two topics, and returns a \ref stasis_subscription, which can be + * unsubscribed to stop the forwarding. + * + * \par Caching + * + * Another common use case is to want to cache certain messages that are + * published on the bus. Usually these events are snapshots of the current state + * in the system, and it's desirable to query that state from the cache without + * locking the original object. It's also desirable for subscribers of the + * caching topic to receive messages that have both the old cache value and the + * new value being put into the cache. For this, we have + * stasis_caching_topic_create(), providing it with the topic which publishes + * the messages that you wish to cache, and a function that can identify + * cacheable messages. + * + * The returned \ref stasis_caching_topic provides a topic that forwards + * non-cacheable messages unchanged. A cacheable message is wrapped in a \ref + * stasis_cache_update message which provides the old snapshot (or \c NULL if + * this is a new cache entry), and the new snapshot (or \c NULL if the entry was + * removed from the cache). A stasis_cache_clear_create() message must be sent + * to the topic in order to remove entries from the cache. + * + * As with all things Stasis, the \ref stasis_caching_topic is a reference + * counted AO2 object. + * + * \par stasis_subscriber + * + * Any topic may be subscribed to by simply providing stasis_subscribe() the + * \ref stasis_topic to subscribe to, a handler function and \c void pointer to + * data that is passed back to the handler. Invocations on the subscription's + * handler are serialized, but different invocations may occur on different + * threads (this usually isn't important unless you use thread locals or + * something similar). + * + * Since the topic (by necessity) holds a reference to the subscription, + * reference counting alone is insufficient to terminate a subscription. In + * order to stop receiving messages, call stasis_unsubscribe() with your \ref + * stasis_subscription. This will remove the topic's reference to the + * subscription, and allow it to be destroyed when all of the other references + * are cleaned up. + */ + +#include "asterisk/utils.h" + +/*! @{ */ + +/*! + * \brief Metadata about a \ref stasis_message. + * \since 12 + */ +struct stasis_message_type; + +/*! + * \brief Register a new message type. + * + * \ref stasis_message_type is an AO2 object, so ao2_cleanup() when you're done + * with it. + * + * \param name Name of the new type. + * \return Pointer to the new type. + * \return \c NULL on error. + * \since 12 + */ +struct stasis_message_type *stasis_message_type_create(const char *name); + +/*! + * \brief Gets the name of a given message type + * \param type The type to get. + * \return Name of the type. + * \return \c NULL if \a type is \c NULL. + * \since 12 + */ +const char *stasis_message_type_name(const struct stasis_message_type *type); + +/*! + * \brief Opaque type for a Stasis message. + * \since 12 + */ +struct stasis_message; + +/*! + * \brief Create a new message. + * + * This message is an \c ao2 object, and must be ao2_cleanup()'ed when you are done + * with it. Messages are also immutable, and must not be modified after they + * are initialized. Especially the \a data in the message. + * + * \param type Type of the message + * \param data Immutable data that is the actual contents of the message + * \return New message + * \return \c NULL on error + * \since 12 + */ +struct stasis_message *stasis_message_create(struct stasis_message_type *type, void *data); + +/*! + * \brief Get the message type for a \ref stasis_message. + * \param msg Message to type + * \return Type of \a msg + * \return \c NULL if \a msg is \c NULL. + * \since 12 + */ +struct stasis_message_type *stasis_message_type(const struct stasis_message *msg); + +/*! + * \brief Get the data contained in a message. + * \param msg Message. + * \return Immutable data pointer + * \return \c NULL if msg is \c NULL. + * \since 12 + */ +void *stasis_message_data(const struct stasis_message *msg); + +/*! + * \brief Get the time when a message was created. + * \param msg Message. + * \return Pointer to the \a timeval when the message was created. + * \return \c NULL if msg is \c NULL. + * \since 12 + */ +const struct timeval *stasis_message_timestamp(const struct stasis_message *msg); + +/*! @} */ + +/*! @{ */ + +/*! + * \brief A topic to which messages may be posted, and subscribers, well, subscribe + * \since 12 + */ +struct stasis_topic; + +/*! + * \brief Create a new topic. + * \param name Name of the new topic. + * \return New topic instance. + * \return \c NULL on error. + * \since 12 + */ +struct stasis_topic *stasis_topic_create(const char *name); + +/*! + * \brief Return the name of a topic. + * \param topic Topic. + * \return Name of the topic. + * \return \c NULL if topic is \c NULL. + * \since 12 + */ +const char *stasis_topic_name(const struct stasis_topic *topic); + +/*! + * \brief Publish a message to a topic's subscribers. + * \param topic Topic. + * \param message Message to publish. + * \since 12 + */ +void stasis_publish(struct stasis_topic *topic, struct stasis_message *message); + +/*! + * \brief Publish a message from a specified topic to all the subscribers of a + * possibly different topic. + * \param topic Topic to publish message to. + * \param topic Original topic message was from. + * \param message Message + * \since 12 + */ +void stasis_forward_message(struct stasis_topic *topic, + struct stasis_topic *publisher_topic, + struct stasis_message *message); + +/*! @} */ + +/*! @{ */ + +/*! + * \brief Opaque type for a Stasis subscription. + * \since 12 + */ +struct stasis_subscription; + +/*! + * \brief Callback function type for Stasis subscriptions. + * \param data Data field provided with subscription. + * \param topic Topic to which the message was published. + * \param message Published message. + * \since 12 + */ +typedef void (*stasis_subscription_cb)(void *data, struct stasis_subscription *sub, struct stasis_topic *topic, struct stasis_message *message); + +/*! + * \brief Create a subscription. + * + * In addition to being AO2 managed memory (requiring an ao2_cleanup() to free + * up this reference), the subscription must be explicitly unsubscribed from its + * topic using stasis_unsubscribe(). + * + * The invocations of the callback are serialized, but may not always occur on + * the same thread. The invocation order of different subscriptions is + * unspecified. + * + * \param topic Topic to subscribe to. + * \param callback Callback function for subscription messages. + * \param data Data to be passed to the callback, in addition to the message. + * \return New \ref stasis_subscription object. + * \return \c NULL on error. + * \since 12 + */ +struct stasis_subscription *stasis_subscribe(struct stasis_topic *topic, + stasis_subscription_cb callback, + void *data); + +/*! + * \brief Cancel a subscription. + * + * Note that in an asynchronous system, there may still be messages queued or + * in transit to the subscription's callback. These will still be delivered. + * There will be a final 'SubscriptionCancelled' message, indicating the + * delivery of the final message. + * + * \param subscription Subscription to cancel. + * \since 12 + */ +void stasis_unsubscribe(struct stasis_subscription *subscription); + +/*! + * \brief Create a subscription which forwards all messages from one topic to + * another. + * + * Note that the \a topic parameter of the invoked callback will the be \a topic + * the message was sent to, not the topic the subscriber subscribed to. + * + * \param from_topic Topic to forward. + * \param to_topic Destination topic of forwarded messages. + * \return New forwarding subscription. + * \return \c NULL on error. + * \since 12 + */ +struct stasis_subscription *stasis_forward_all(struct stasis_topic *from_topic, struct stasis_topic *to_topic); + +/*! + * \brief Get the unique ID for the subscription. + * + * \param sub Subscription for which to get the unique ID. + * \return Unique ID for the subscription. + * \since 12 + */ +const char *stasis_subscription_uniqueid(const struct stasis_subscription *sub); + +/*! + * \brief Returns whether a subscription is currently subscribed. + * + * Note that there may still be messages queued up to be dispatched to this + * subscription, but the stasis_subscription_final_message() has been enqueued. + * + * \param sub Subscription to check + * \return False (zero) if subscription is not subscribed. + * \return True (non-zero) if still subscribed. + */ +int stasis_subscription_is_subscribed(const struct stasis_subscription *sub); + +/*! + * \brief Determine whether a message is the final message to be received on a subscription. + * + * \param sub Subscription on which the message was received. + * \param msg Message to check. + * \return zero if the provided message is not the final message. + * \return non-zero if the provided message is the final message. + * \since 12 + */ +int stasis_subscription_final_message(struct stasis_subscription *sub, struct stasis_message *msg); + +/*! + * \brief Holds details about changes to subscriptions for the specified topic + * \since 12 + */ +struct stasis_subscription_change { + AST_DECLARE_STRING_FIELDS( + AST_STRING_FIELD(uniqueid); /*!< The unique ID associated with this subscription */ + AST_STRING_FIELD(description); /*!< The description of the change to the subscription associated with the uniqueid */ + ); + struct stasis_topic *topic; /*!< The topic the subscription is/was subscribing to */ +}; + +/*! + * \brief Gets the message type for subscription change notices + * \return The stasis_message_type for subscription change notices + * \since 12 + */ +struct stasis_message_type *stasis_subscription_change(void); + +/*! @} */ + +/*! @{ */ + +/*! + * \brief A topic wrapper, which caches certain messages. + * \since 12 + */ +struct stasis_caching_topic; + +/*! + * \brief Message type for cache update messages. + * \return Message type for cache update messages. + * \since 12 + */ +struct stasis_message_type *stasis_cache_update(void); + +/*! + * \brief Cache update message + * \since 12 + */ +struct stasis_cache_update { + /*! \brief Topic that published \c new_snapshot */ + struct stasis_topic *topic; + /*! \brief Convenience reference to snapshot type */ + struct stasis_message_type *type; + /*! \brief Old value from the cache */ + struct stasis_message *old_snapshot; + /*! \brief New value */ + struct stasis_message *new_snapshot; +}; + +/*! + * \brief A message which instructs the caching topic to remove an entry from its cache. + * \param type Message type. + * \param id Unique id of the snapshot to clear. + * \return Message which, when sent to the \a topic, will clear the item from the cache. + * \return \c NULL on error. + * \since 12 + */ +struct stasis_message *stasis_cache_clear_create(struct stasis_message_type *type, const char *id); + +/*! + * \brief Callback extract a unique identity from a snapshot message. + * + * This identity is unique to the underlying object of the snapshot, such as the + * UniqueId field of a channel. + * + * \param message Message to extract id from. + * \return String representing the snapshot's id. + * \return \c NULL if the message_type of the message isn't a handled snapshot. + * \since 12 + */ +typedef const char *(*snapshot_get_id)(struct stasis_message *message); + +/*! + * \brief Create a topic which monitors and caches messages from another topic. + * + * The idea is that some topics publish 'snapshots' of some other object's state + * that should be cached. When these snapshot messages are received, the cache + * is updated, and a stasis_cache_update() message is forwarded, which has both + * the original snapshot message and the new message. + * + * \param original_topic Topic publishing snapshot messages. + * \param id_fn Callback to extract the id from a snapshot message. + * \return New topic which changes snapshot messages to stasis_cache_update() + * messages, and forwards all other messages from the original topic. + * \since 12 + */ +struct stasis_caching_topic *stasis_caching_topic_create(struct stasis_topic *original_topic, snapshot_get_id id_fn); + +/*! + * Unsubscribes a caching topic from its upstream topic. + * \param caching_topic Caching topic to unsubscribe + * \since 12 + */ +void stasis_caching_unsubscribe(struct stasis_caching_topic *caching_topic); + +/*! + * \brief Returns the topic of cached events from a caching topics. + * \param caching_topic The caching topic. + * \return The topic that publishes cache update events, along with passthrough events + * from the underlying topic. + * \return \c NULL if \a caching_topic is \c NULL. + * \since 12 + */ +struct stasis_topic *stasis_caching_get_topic(struct stasis_caching_topic *caching_topic); + +/*! + * \brief Retrieve an item from the cache. + * \param caching_topic The topic returned from stasis_caching_topic_create(). + * \param type Type of message to retrieve. + * \param id Identity of the snapshot to retrieve. + * \return Message from the cache. The cache still owns the message, so + * ao2_ref() if you want to keep it. + * \return \c NULL if message is not found. + * \since 12 + */ +struct stasis_message *stasis_cache_get(struct stasis_caching_topic *caching_topic, + struct stasis_message_type *type, + const char *id); + +/*! @} */ + +/*! @{ */ + +/*! + * \brief Initialize the Stasis subsystem + * \return 0 on success. + * \return Non-zero on error. + * \since 12 + */ +int stasis_init(void); + +/*! + * \private + * \brief called by stasis_init() for cache initialization. + * \return 0 on success. + * \return Non-zero on error. + * \since 12 + */ +int stasis_cache_init(void); + +/*! @} */ + +#endif /* _ASTERISK_STASIS_H */ diff --git a/main/asterisk.c b/main/asterisk.c index 1d5371938..4e5e58c05 100644 --- a/main/asterisk.c +++ b/main/asterisk.c @@ -240,6 +240,7 @@ int daemon(int, int); /* defined in libresolv of all places */ #include "asterisk/aoc.h" #include "asterisk/uuid.h" #include "asterisk/sorcery.h" +#include "asterisk/stasis.h" #include "../defaults.h" @@ -4120,6 +4121,11 @@ int main(int argc, char *argv[]) exit(1); } + if (stasis_init()) { + printf("Stasis initialization failed.\n%s", term_quit()); + exit(1); + } + ast_makesocket(); sigemptyset(&sigs); sigaddset(&sigs, SIGHUP); diff --git a/main/asterisk.exports.in b/main/asterisk.exports.in index 49d3a44a4..3157b8319 100644 --- a/main/asterisk.exports.in +++ b/main/asterisk.exports.in @@ -32,6 +32,7 @@ LINKER_SYMBOL_PREFIXdialed_interface_info; LINKER_SYMBOL_PREFIXstrsep; LINKER_SYMBOL_PREFIXsetenv; + LINKER_SYMBOL_PREFIXstasis_*; LINKER_SYMBOL_PREFIXunsetenv; LINKER_SYMBOL_PREFIXstrcasestr; LINKER_SYMBOL_PREFIXstrnlen; diff --git a/main/channel.c b/main/channel.c index 9c8f32cb5..df0a67b3b 100644 --- a/main/channel.c +++ b/main/channel.c @@ -152,6 +152,15 @@ static AST_RWLIST_HEAD_STATIC(backends, chanlist); /*! \brief All active channels on the system */ static struct ao2_container *channels; +/*! \brief Message type for channel snapshot events */ +static struct stasis_message_type *__channel_snapshot; + +static struct stasis_message_type *__channel_varset; + +struct stasis_topic *__channel_topic_all; + +struct stasis_caching_topic *__channel_topic_all_cached; + /*! \brief map AST_CAUSE's to readable string representations * * \ref causes.h @@ -214,6 +223,77 @@ static const struct causes_map causes[] = { { AST_CAUSE_INTERWORKING, "INTERWORKING", "Interworking, unspecified" }, }; +static void publish_channel_state(struct ast_channel *chan) +{ + RAII_VAR(struct ast_channel_snapshot *, snapshot, NULL, ao2_cleanup); + RAII_VAR(struct stasis_message *, message, NULL, ao2_cleanup); + + snapshot = ast_channel_snapshot_create(chan); + if (!snapshot) { + ast_log(LOG_ERROR, "Allocation error\n"); + return; + } + + message = stasis_message_create(ast_channel_snapshot(), snapshot); + if (!message) { + return; + } + + ast_assert(ast_channel_topic(chan) != NULL); + stasis_publish(ast_channel_topic(chan), message); +} + +static void channel_varset_dtor(void *obj) +{ + struct ast_channel_varset *event = obj; + ao2_cleanup(event->snapshot); + ast_free(event->variable); + ast_free(event->value); +} + +void ast_channel_publish_varset(struct ast_channel *chan, const char *name, const char *value) +{ + RAII_VAR(struct ast_channel_varset *, event, NULL, ao2_cleanup); + RAII_VAR(struct stasis_message *, msg, NULL, ao2_cleanup); + + event = ao2_alloc(sizeof(*event), channel_varset_dtor); + if (!event) { + return; + } + + if (chan) { + event->snapshot = ast_channel_snapshot_create(chan); + if (event->snapshot == NULL) { + return; + } + } + event->variable = ast_strdup(name); + event->value = ast_strdup(value); + if (event->variable == NULL || event->value == NULL) { + return; + } + + msg = stasis_message_create(ast_channel_varset(), event); + if (!msg) { + return; + } + + if (chan) { + stasis_publish(ast_channel_topic(chan), msg); + } else { + stasis_publish(ast_channel_topic_all(), msg); + } +} + + +static void publish_cache_clear(struct ast_channel *chan) +{ + RAII_VAR(struct stasis_message *, message, NULL, ao2_cleanup); + + message = stasis_cache_clear_create(ast_channel_snapshot(), ast_channel_uniqueid(chan)); + stasis_publish(ast_channel_topic(chan), message); +} + struct ast_variable *ast_channeltype_list(void) { struct chanlist *cl; @@ -1073,6 +1153,8 @@ __ast_channel_alloc_ap(int needqueue, int state, const char *cid_num, const char ast_channel_linkedid_set(tmp, ast_channel_uniqueid(tmp)); } + ast_channel_internal_setup_topics(tmp); + if (!ast_strlen_zero(name_fmt)) { char *slash, *slash2; /* Almost every channel is calling this function, and setting the name via the ast_string_field_build() call. @@ -1145,34 +1227,7 @@ __ast_channel_alloc_ap(int needqueue, int state, const char *cid_num, const char * a lot of data into this func to do it here! */ if (ast_get_channel_tech(tech) || (tech2 && ast_get_channel_tech(tech2))) { - /*** DOCUMENTATION - <managerEventInstance> - <synopsis>Raised when a new channel is created.</synopsis> - <syntax> - <xi:include xpointer="xpointer(/docs/managerEvent[@name='Newstate']/managerEventInstance/syntax/parameter[@name='ChannelState'])" /> - <xi:include xpointer="xpointer(/docs/managerEvent[@name='Newstate']/managerEventInstance/syntax/parameter[@name='ChannelStateDesc'])" /> - </syntax> - </managerEventInstance> - ***/ - ast_manager_event(tmp, EVENT_FLAG_CALL, "Newchannel", - "Channel: %s\r\n" - "ChannelState: %d\r\n" - "ChannelStateDesc: %s\r\n" - "CallerIDNum: %s\r\n" - "CallerIDName: %s\r\n" - "AccountCode: %s\r\n" - "Exten: %s\r\n" - "Context: %s\r\n" - "Uniqueid: %s\r\n", - ast_channel_name(tmp), - state, - ast_state2str(state), - S_OR(cid_num, ""), - S_OR(cid_name, ""), - ast_channel_accountcode(tmp), - S_OR(exten, ""), - S_OR(context, ""), - ast_channel_uniqueid(tmp)); + publish_channel_state(tmp); } ast_channel_internal_finalize(tmp); @@ -2893,39 +2948,9 @@ int ast_hangup(struct ast_channel *chan) ast_channel_unlock(chan); ast_cc_offer(chan); - /*** DOCUMENTATION - <managerEventInstance> - <synopsis>Raised when a channel is hung up.</synopsis> - <syntax> - <parameter name="Cause"> - <para>A numeric cause code for why the channel was hung up.</para> - </parameter> - <parameter name="Cause-txt"> - <para>A description of why the channel was hung up.</para> - </parameter> - </syntax> - </managerEventInstance> - ***/ - ast_manager_event(chan, EVENT_FLAG_CALL, "Hangup", - "Channel: %s\r\n" - "Uniqueid: %s\r\n" - "CallerIDNum: %s\r\n" - "CallerIDName: %s\r\n" - "ConnectedLineNum: %s\r\n" - "ConnectedLineName: %s\r\n" - "AccountCode: %s\r\n" - "Cause: %d\r\n" - "Cause-txt: %s\r\n", - ast_channel_name(chan), - ast_channel_uniqueid(chan), - S_COR(ast_channel_caller(chan)->id.number.valid, ast_channel_caller(chan)->id.number.str, "<unknown>"), - S_COR(ast_channel_caller(chan)->id.name.valid, ast_channel_caller(chan)->id.name.str, "<unknown>"), - S_COR(ast_channel_connected(chan)->id.number.valid, ast_channel_connected(chan)->id.number.str, "<unknown>"), - S_COR(ast_channel_connected(chan)->id.name.valid, ast_channel_connected(chan)->id.name.str, "<unknown>"), - ast_channel_accountcode(chan), - ast_channel_hangupcause(chan), - ast_cause2str(ast_channel_hangupcause(chan)) - ); + + publish_channel_state(chan); + publish_cache_clear(chan); if (ast_channel_cdr(chan) && !ast_test_flag(ast_channel_cdr(chan), AST_CDR_FLAG_BRIDGED) && !ast_test_flag(ast_channel_cdr(chan), AST_CDR_FLAG_POST_DISABLED) && @@ -7435,47 +7460,7 @@ int ast_setstate(struct ast_channel *chan, enum ast_channel_state state) * we override what they are saying the state is and things go amuck. */ ast_devstate_changed_literal(AST_DEVICE_UNKNOWN, (ast_test_flag(ast_channel_flags(chan), AST_FLAG_DISABLE_DEVSTATE_CACHE) ? AST_DEVSTATE_NOT_CACHABLE : AST_DEVSTATE_CACHABLE), name); - /* setstate used to conditionally report Newchannel; this is no more */ - /*** DOCUMENTATION - <managerEventInstance> - <synopsis>Raised when a channel's state changes.</synopsis> - <syntax> - <parameter name="ChannelState"> - <para>A numeric code for the channel's current state, related to ChannelStateDesc</para> - </parameter> - <parameter name="ChannelStateDesc"> - <enumlist> - <enum name="Down"/> - <enum name="Rsrvd"/> - <enum name="OffHook"/> - <enum name="Dialing"/> - <enum name="Ring"/> - <enum name="Ringing"/> - <enum name="Up"/> - <enum name="Busy"/> - <enum name="Dialing Offhook"/> - <enum name="Pre-ring"/> - <enum name="Unknown"/> - </enumlist> - </parameter> - </syntax> - </managerEventInstance> - ***/ - ast_manager_event(chan, EVENT_FLAG_CALL, "Newstate", - "Channel: %s\r\n" - "ChannelState: %d\r\n" - "ChannelStateDesc: %s\r\n" - "CallerIDNum: %s\r\n" - "CallerIDName: %s\r\n" - "ConnectedLineNum: %s\r\n" - "ConnectedLineName: %s\r\n" - "Uniqueid: %s\r\n", - ast_channel_name(chan), ast_channel_state(chan), ast_state2str(ast_channel_state(chan)), - S_COR(ast_channel_caller(chan)->id.number.valid, ast_channel_caller(chan)->id.number.str, ""), - S_COR(ast_channel_caller(chan)->id.name.valid, ast_channel_caller(chan)->id.name.str, ""), - S_COR(ast_channel_connected(chan)->id.number.valid, ast_channel_connected(chan)->id.number.str, ""), - S_COR(ast_channel_connected(chan)->id.name.valid, ast_channel_connected(chan)->id.name.str, ""), - ast_channel_uniqueid(chan)); + publish_channel_state(chan); return 0; } @@ -8644,6 +8629,14 @@ static void prnt_channel_key(void *v_obj, void *where, ao2_prnt_fn *prnt) static void channels_shutdown(void) { + ao2_cleanup(__channel_snapshot); + __channel_snapshot = NULL; + ao2_cleanup(__channel_varset); + __channel_varset = NULL; + ao2_cleanup(__channel_topic_all); + __channel_topic_all = NULL; + stasis_caching_unsubscribe(__channel_topic_all_cached); + __channel_topic_all_cached = NULL; ast_data_unregister(NULL); ast_cli_unregister_multiple(cli_channel, ARRAY_LEN(cli_channel)); if (channels) { @@ -8653,6 +8646,16 @@ static void channels_shutdown(void) } } +static const char *channel_snapshot_get_id(struct stasis_message *message) +{ + struct ast_channel_snapshot *snapshot; + if (ast_channel_snapshot() != stasis_message_type(message)) { + return NULL; + } + snapshot = stasis_message_data(message); + return snapshot->uniqueid; +} + void ast_channels_init(void) { channels = ao2_container_alloc(NUM_CHANNEL_BUCKETS, @@ -8661,6 +8664,12 @@ void ast_channels_init(void) ao2_container_register("channels", channels, prnt_channel_key); } + __channel_snapshot = stasis_message_type_create("ast_channel_snapshot"); + __channel_varset = stasis_message_type_create("ast_channel_varset"); + + __channel_topic_all = stasis_topic_create("ast_channel_topic_all"); + __channel_topic_all_cached = stasis_caching_topic_create(__channel_topic_all, channel_snapshot_get_id); + ast_cli_register_multiple(cli_channel, ARRAY_LEN(cli_channel)); ast_data_register_multiple_core(channel_providers, ARRAY_LEN(channel_providers)); @@ -8668,6 +8677,7 @@ void ast_channels_init(void) ast_plc_reload(); ast_register_atexit(channels_shutdown); + } /*! \brief Print call group and pickup group ---*/ @@ -11241,6 +11251,79 @@ int ast_channel_get_cc_agent_type(struct ast_channel *chan, char *agent_type, si return 0; } +static void ast_channel_snapshot_dtor(void *obj) +{ + struct ast_channel_snapshot *snapshot = obj; + ast_string_field_free_memory(snapshot); +} + +struct ast_channel_snapshot *ast_channel_snapshot_create(struct ast_channel *chan) +{ + RAII_VAR(struct ast_channel_snapshot *, snapshot, NULL, ao2_cleanup); + + snapshot = ao2_alloc(sizeof(*snapshot), ast_channel_snapshot_dtor); + if (ast_string_field_init(snapshot, 1024)) { + return NULL; + } + + ast_string_field_set(snapshot, name, ast_channel_name(chan)); + ast_string_field_set(snapshot, accountcode, ast_channel_accountcode(chan)); + ast_string_field_set(snapshot, peeraccount, ast_channel_peeraccount(chan)); + ast_string_field_set(snapshot, userfield, ast_channel_userfield(chan)); + ast_string_field_set(snapshot, uniqueid, ast_channel_uniqueid(chan)); + ast_string_field_set(snapshot, linkedid, ast_channel_linkedid(chan)); + ast_string_field_set(snapshot, parkinglot, ast_channel_parkinglot(chan)); + ast_string_field_set(snapshot, hangupsource, ast_channel_hangupsource(chan)); + if (ast_channel_appl(chan)) { + ast_string_field_set(snapshot, appl, ast_channel_appl(chan)); + } + if (ast_channel_data(chan)) { + ast_string_field_set(snapshot, data, ast_channel_data(chan)); + } + ast_string_field_set(snapshot, context, ast_channel_context(chan)); + ast_string_field_set(snapshot, exten, ast_channel_exten(chan)); + + ast_string_field_set(snapshot, caller_name, + S_COR(ast_channel_caller(chan)->id.name.valid, ast_channel_caller(chan)->id.name.str, "")); + ast_string_field_set(snapshot, caller_number, + S_COR(ast_channel_caller(chan)->id.number.valid, ast_channel_caller(chan)->id.number.str, "")); + + ast_string_field_set(snapshot, connected_name, + S_COR(ast_channel_connected(chan)->id.name.valid, ast_channel_connected(chan)->id.name.str, "")); + ast_string_field_set(snapshot, connected_number, + S_COR(ast_channel_connected(chan)->id.number.valid, ast_channel_connected(chan)->id.number.str, "")); + + snapshot->creationtime = ast_channel_creationtime(chan); + snapshot->state = ast_channel_state(chan); + snapshot->priority = ast_channel_priority(chan); + snapshot->amaflags = ast_channel_amaflags(chan); + snapshot->hangupcause = ast_channel_hangupcause(chan); + snapshot->flags = *ast_channel_flags(chan); + + ao2_ref(snapshot, +1); + return snapshot; +} + +struct stasis_message_type *ast_channel_varset(void) +{ + return __channel_varset; +} + +struct stasis_message_type *ast_channel_snapshot(void) +{ + return __channel_snapshot; +} + +struct stasis_topic *ast_channel_topic_all(void) +{ + return __channel_topic_all; +} + +struct stasis_caching_topic *ast_channel_topic_all_cached(void) +{ + return __channel_topic_all_cached; +} + /* DO NOT PUT ADDITIONAL FUNCTIONS BELOW THIS BOUNDARY * * ONLY FUNCTIONS FOR PROVIDING BACKWARDS ABI COMPATIBILITY BELONG HERE diff --git a/main/channel_internal_api.c b/main/channel_internal_api.c index 3f892ddef..8cc2e6c62 100644 --- a/main/channel_internal_api.c +++ b/main/channel_internal_api.c @@ -195,6 +195,8 @@ struct ast_channel { char dtmf_digit_to_emulate; /*!< Digit being emulated */ char sending_dtmf_digit; /*!< Digit this channel is currently sending out. (zero if not sending) */ struct timeval sending_dtmf_tv; /*!< The time this channel started sending the current digit. (Invalid if sending_dtmf_digit is zero.) */ + struct stasis_topic *topic; /*!< Topic for all channel's events */ + struct stasis_subscription *forwarder; /*!< Subscription for event forwarding to all topic */ }; /* AST_DATA definitions, which will probably have to be re-thought since the channel will be opaque */ @@ -1364,6 +1366,12 @@ void ast_channel_internal_cleanup(struct ast_channel *chan) } ast_string_field_free_memory(chan); + + stasis_unsubscribe(chan->forwarder); + chan->forwarder = NULL; + + ao2_cleanup(chan->topic); + chan->topic = NULL; } void ast_channel_internal_finalize(struct ast_channel *chan) @@ -1375,3 +1383,16 @@ int ast_channel_internal_is_finalized(struct ast_channel *chan) { return chan->finalized; } + +struct stasis_topic *ast_channel_topic(struct ast_channel *chan) +{ + return chan->topic; +} + +void ast_channel_internal_setup_topics(struct ast_channel *chan) +{ + ast_assert(chan->topic == NULL); + ast_assert(chan->forwarder == NULL); + chan->topic = stasis_topic_create(chan->uniqueid); + chan->forwarder = stasis_forward_all(chan->topic, ast_channel_topic_all()); +} diff --git a/main/manager.c b/main/manager.c index fc0ec2631..10a3a3397 100644 --- a/main/manager.c +++ b/main/manager.c @@ -91,6 +91,7 @@ ASTERISK_FILE_VERSION(__FILE__, "$Revision$") #include "asterisk/aoc.h" #include "asterisk/stringfields.h" #include "asterisk/presencestate.h" +#include "asterisk/stasis.h" /*** DOCUMENTATION <manager name="Ping" language="en_US"> @@ -963,6 +964,73 @@ ASTERISK_FILE_VERSION(__FILE__, "$Revision$") manager.conf will be present upon starting a new session.</para> </description> </manager> + <managerEvent language="en_US" name="Newchannel"> + <managerEventInstance class="EVENT_FLAG_CALL"> + <synopsis>Raised when a new channel is created.</synopsis> + <syntax> + <parameter name="Channel"> + </parameter> + <parameter name="ChannelState"> + <para>A numeric code for the channel's current state, related to ChannelStateDesc</para> + </parameter> + <parameter name="ChannelStateDesc"> + <enumlist> + <enum name="Down"/> + <enum name="Rsrvd"/> + <enum name="OffHook"/> + <enum name="Dialing"/> + <enum name="Ring"/> + <enum name="Ringing"/> + <enum name="Up"/> + <enum name="Busy"/> + <enum name="Dialing Offhook"/> + <enum name="Pre-ring"/> + <enum name="Unknown"/> + </enumlist> + </parameter> + <parameter name="CallerIDNum"> + </parameter> + <parameter name="CallerIDName"> + </parameter> + <parameter name="ConnectedLineNum"> + </parameter> + <parameter name="ConnectedLineName"> + </parameter> + <parameter name="AccountCode"> + </parameter> + <parameter name="Context"> + </parameter> + <parameter name="Exten"> + </parameter> + <parameter name="Priority"> + </parameter> + <parameter name="Uniqueid"> + </parameter> + <parameter name="Cause"> + <para>A numeric cause code for why the channel was hung up.</para> + </parameter> + <parameter name="Cause-txt"> + <para>A description of why the channel was hung up.</para> + </parameter> + </syntax> + </managerEventInstance> + </managerEvent> + <managerEvent language="en_US" name="Newstate"> + <managerEventInstance class="EVENT_FLAG_CALL"> + <synopsis>Raised when a channel's state changes.</synopsis> + <syntax> + <xi:include xpointer="xpointer(/docs/managerEvent[@name='Newchannel']/managerEventInstance/syntax/parameter)" /> + </syntax> + </managerEventInstance> + </managerEvent> + <managerEvent language="en_US" name="Hangup"> + <managerEventInstance class="EVENT_FLAG_CALL"> + <synopsis>Raised when a channel is hung up.</synopsis> + <syntax> + <xi:include xpointer="xpointer(/docs/managerEvent[@name='Newchannel']/managerEventInstance/syntax/parameter)" /> + </syntax> + </managerEventInstance> + </managerEvent> ***/ /*! \addtogroup Group_AMI AMI functions @@ -1060,6 +1128,8 @@ static const struct { {{ "restart", "gracefully", NULL }}, }; +static struct stasis_subscription *channel_state_sub; + static void acl_change_event_cb(const struct ast_event *event, void *userdata); static void acl_change_event_subscribe(void) @@ -7376,6 +7446,127 @@ static void load_channelvars(struct ast_variable *var) AST_RWLIST_UNLOCK(&channelvars); } +/*! + * \brief Generate the AMI message body from a channel snapshot + * \internal + * + * \param snapshot the channel snapshot for which to generate an AMI message body + * + * \retval NULL on error + * \retval ast_str* on success (must be ast_freed by caller) + */ +static struct ast_str *manager_build_channel_state_string(const struct ast_channel_snapshot *snapshot) +{ + struct ast_str *out = ast_str_create(1024); + int res = 0; + if (!out) { + return NULL; + } + res = ast_str_set(&out, 0, + "Channel: %s\r\n" + "ChannelState: %d\r\n" + "ChannelStateDesc: %s\r\n" + "CallerIDNum: %s\r\n" + "CallerIDName: %s\r\n" + "ConnectedLineNum: %s\r\n" + "ConnectedLineName: %s\r\n" + "AccountCode: %s\r\n" + "Context: %s\r\n" + "Exten: %s\r\n" + "Priority: %d\r\n" + "Uniqueid: %s\r\n" + "Cause: %d\r\n" + "Cause-txt: %s\r\n", + snapshot->name, + snapshot->state, + ast_state2str(snapshot->state), + snapshot->caller_number, + snapshot->caller_name, + snapshot->connected_number, + snapshot->connected_name, + snapshot->accountcode, + snapshot->context, + snapshot->exten, + snapshot->priority, + snapshot->uniqueid, + snapshot->hangupcause, + ast_cause2str(snapshot->hangupcause)); + + if (!res) { + return NULL; + } + + return out; +} + +static void channel_snapshot_update(struct ast_channel_snapshot *old_snapshot, struct ast_channel_snapshot *new_snapshot) +{ + int is_hungup; + char *manager_event = NULL; + + if (!new_snapshot) { + /* Ignore cache clearing events; we'll see the hangup first */ + return; + } + + is_hungup = ast_test_flag(&new_snapshot->flags, AST_FLAG_ZOMBIE) ? 1 : 0; + + if (!old_snapshot) { + manager_event = "Newchannel"; + } + + if (old_snapshot && old_snapshot->state != new_snapshot->state) { + manager_event = "Newstate"; + } + + if (old_snapshot && is_hungup) { + manager_event = "Hangup"; + } + + if (manager_event) { + RAII_VAR(struct ast_str *, channel_event_string, NULL, ast_free); + + channel_event_string = manager_build_channel_state_string(new_snapshot); + if (channel_event_string) { + manager_event(EVENT_FLAG_CALL, manager_event, "%s", ast_str_buffer(channel_event_string)); + } + } +} + +static void channel_varset(const char *channel_name, const char *uniqueid, const char *name, const char *value) +{ + /*** DOCUMENTATION + <managerEventInstance> + <synopsis>Raised when a variable is set to a particular value.</synopsis> + </managerEventInstance> + ***/ + manager_event(EVENT_FLAG_DIALPLAN, "VarSet", + "Channel: %s\r\n" + "Variable: %s\r\n" + "Value: %s\r\n" + "Uniqueid: %s\r\n", + channel_name, name, value, uniqueid); +} + +static void channel_event_cb(void *data, struct stasis_subscription *sub, struct stasis_topic *topic, struct stasis_message *message) +{ + if (stasis_message_type(message) == stasis_cache_update()) { + struct stasis_cache_update *update = stasis_message_data(message); + if (ast_channel_snapshot() == update->type) { + struct ast_channel_snapshot *old_snapshot = + stasis_message_data(update->old_snapshot); + struct ast_channel_snapshot *new_snapshot = + stasis_message_data(update->new_snapshot); + channel_snapshot_update(old_snapshot, new_snapshot); + } + } else if (stasis_message_type(message) == ast_channel_varset()) { + struct ast_channel_varset *varset = stasis_message_data(message); + const char *name = varset->snapshot ? varset->snapshot->name : "none"; + const char *uniqueid = varset->snapshot ? varset->snapshot->uniqueid : "none"; + channel_varset(name, uniqueid, varset->variable, varset->value); + } +} + /*! \internal \brief Free a user record. Should already be removed from the list */ static void manager_free_user(struct ast_manager_user *user) { @@ -7399,6 +7590,9 @@ static void manager_shutdown(void) { struct ast_manager_user *user; + stasis_unsubscribe(channel_state_sub); + channel_state_sub = NULL; + if (registered) { ast_manager_unregister("Ping"); ast_manager_unregister("Events"); @@ -7490,6 +7684,12 @@ static int __init_manager(int reload, int by_external_config) manager_enabled = 0; + if (!channel_state_sub) { + channel_state_sub = stasis_subscribe( + stasis_caching_get_topic(ast_channel_topic_all_cached()), + channel_event_cb, NULL); + } + if (!registered) { /* Register default actions */ ast_manager_register_xml_core("Ping", 0, action_ping); diff --git a/main/pbx.c b/main/pbx.c index bf95ccbe2..82bbb5257 100644 --- a/main/pbx.c +++ b/main/pbx.c @@ -11453,18 +11453,7 @@ int pbx_builtin_setvar_helper(struct ast_channel *chan, const char *name, const ast_verb(2, "Setting global variable '%s' to '%s'\n", name, value); newvariable = ast_var_assign(name, value); AST_LIST_INSERT_HEAD(headp, newvariable, entries); - /*** DOCUMENTATION - <managerEventInstance> - <synopsis>Raised when a variable is set to a particular value.</synopsis> - </managerEventInstance> - ***/ - manager_event(EVENT_FLAG_DIALPLAN, "VarSet", - "Channel: %s\r\n" - "Variable: %s\r\n" - "Value: %s\r\n" - "Uniqueid: %s\r\n", - chan ? ast_channel_name(chan) : "none", name, value, - chan ? ast_channel_uniqueid(chan) : "none"); + ast_channel_publish_varset(chan, name, value); } if (chan) diff --git a/main/stasis.c b/main/stasis.c new file mode 100644 index 000000000..f94736bf1 --- /dev/null +++ b/main/stasis.c @@ -0,0 +1,514 @@ +/* + * Asterisk -- An open source telephony toolkit. + * + * Copyright (C) 2013, Digium, Inc. + * + * David M. Lee, II <dlee@digium.com> + * + * See http://www.asterisk.org for more information about + * the Asterisk project. Please do not directly contact + * any of the maintainers of this project for assistance; + * the project provides a web site, mailing lists and IRC + * channels for your use. + * + * This program is free software, distributed under the terms of + * the GNU General Public License Version 2. See the LICENSE file + * at the top of the source tree. + */ + +/*! \file + * + * \brief Stasis Message Bus API. + * + * \author David M. Lee, II <dlee@digium.com> + */ + +/*** MODULEINFO + <support_level>core</support_level> + ***/ + +#include "asterisk.h" + +ASTERISK_FILE_VERSION(__FILE__, "$Revision$") + +#include "asterisk/astobj2.h" +#include "asterisk/stasis.h" +#include "asterisk/threadpool.h" +#include "asterisk/taskprocessor.h" +#include "asterisk/utils.h" +#include "asterisk/uuid.h" + +/*! Initial size of the subscribers list. */ +#define INITIAL_SUBSCRIBERS_MAX 4 + +/*! Threadpool for dispatching notifications to subscribers */ +static struct ast_threadpool *pool; + +static struct stasis_message_type *__subscription_change_message_type; + +/*! \private */ +struct stasis_topic { + char *name; + /*! Variable length array of the subscribers (raw pointer to avoid cyclic references) */ + struct stasis_subscription **subscribers; + /*! Allocated length of the subscribers array */ + size_t num_subscribers_max; + /*! Current size of the subscribers array */ + size_t num_subscribers_current; +}; + +/* Forward declarations for the tightly-coupled subscription object */ +struct stasis_subscription; +static int topic_add_subscription(struct stasis_topic *topic, struct stasis_subscription *sub); + +static void topic_dtor(void *obj) +{ + struct stasis_topic *topic = obj; + ast_free(topic->name); + topic->name = NULL; + ast_free(topic->subscribers); + topic->subscribers = NULL; +} + +struct stasis_topic *stasis_topic_create(const char *name) +{ + RAII_VAR(struct stasis_topic *, topic, NULL, ao2_cleanup); + + topic = ao2_alloc(sizeof(*topic), topic_dtor); + + if (!topic) { + return NULL; + } + + topic->name = ast_strdup(name); + if (!topic->name) { + return NULL; + } + + topic->num_subscribers_max = INITIAL_SUBSCRIBERS_MAX; + topic->subscribers = ast_calloc(topic->num_subscribers_max, sizeof(topic->subscribers)); + if (!topic->subscribers) { + return NULL; + } + + ao2_ref(topic, +1); + return topic; +} + +const char *stasis_topic_name(const struct stasis_topic *topic) +{ + return topic->name; +} + +/*! \private */ +struct stasis_subscription { + /*! Unique ID for this subscription */ + char *uniqueid; + /*! Topic subscribed to. */ + struct stasis_topic *topic; + /*! Mailbox for processing incoming messages. */ + struct ast_taskprocessor *mailbox; + /*! Callback function for incoming message processing. */ + stasis_subscription_cb callback; + /*! Data pointer to be handed to the callback. */ + void *data; +}; + +static void subscription_dtor(void *obj) +{ + struct stasis_subscription *sub = obj; + ast_assert(!stasis_subscription_is_subscribed(sub)); + ast_free(sub->uniqueid); + sub->uniqueid = NULL; + ao2_cleanup(sub->topic); + sub->topic = NULL; + ast_taskprocessor_unreference(sub->mailbox); + sub->mailbox = NULL; +} + +static void send_subscription_change_message(struct stasis_topic *topic, char *uniqueid, char *description); + +static struct stasis_subscription *__stasis_subscribe(struct stasis_topic *topic, stasis_subscription_cb callback, void *data, int needs_mailbox) +{ + RAII_VAR(struct stasis_subscription *, sub, NULL, ao2_cleanup); + RAII_VAR(struct ast_uuid *, id, NULL, ast_free); + char uniqueid[AST_UUID_STR_LEN]; + + sub = ao2_alloc(sizeof(*sub), subscription_dtor); + if (!sub) { + return NULL; + } + + id = ast_uuid_generate(); + if (!id) { + ast_log(LOG_ERROR, "UUID generation failed\n"); + return NULL; + } + ast_uuid_to_str(id, uniqueid, sizeof(uniqueid)); + if (needs_mailbox) { + sub->mailbox = ast_threadpool_serializer(uniqueid, pool); + if (!sub->mailbox) { + return NULL; + } + } + + sub->uniqueid = ast_strdup(uniqueid); + ao2_ref(topic, +1); + sub->topic = topic; + sub->callback = callback; + sub->data = data; + + if (topic_add_subscription(topic, sub) != 0) { + return NULL; + } + send_subscription_change_message(topic, uniqueid, "Subscribe"); + + ao2_ref(sub, +1); + return sub; +} + +struct stasis_subscription *stasis_subscribe(struct stasis_topic *topic, stasis_subscription_cb callback, void *data) +{ + return __stasis_subscribe(topic, callback, data, 1); +} + +void stasis_unsubscribe(struct stasis_subscription *sub) +{ + if (sub) { + size_t i; + struct stasis_topic *topic = sub->topic; + SCOPED_AO2LOCK(lock_topic, topic); + + for (i = 0; i < topic->num_subscribers_current; ++i) { + if (topic->subscribers[i] == sub) { + send_subscription_change_message(topic, sub->uniqueid, "Unsubscribe"); + /* swap [i] with last entry; remove last entry */ + topic->subscribers[i] = topic->subscribers[--topic->num_subscribers_current]; + /* Unsubscribing unrefs the subscription */ + ao2_cleanup(sub); + return; + } + } + + ast_log(LOG_ERROR, "Internal error: subscription has invalid topic\n"); + } +} + +int stasis_subscription_is_subscribed(const struct stasis_subscription *sub) +{ + if (sub) { + size_t i; + struct stasis_topic *topic = sub->topic; + SCOPED_AO2LOCK(lock_topic, topic); + + for (i = 0; i < topic->num_subscribers_current; ++i) { + if (topic->subscribers[i] == sub) { + return 1; + } + } + } + + return 0; +} + +const char *stasis_subscription_uniqueid(const struct stasis_subscription *sub) +{ + return sub->uniqueid; +} + +int stasis_subscription_final_message(struct stasis_subscription *sub, struct stasis_message *msg) +{ + struct stasis_subscription_change *change; + if (stasis_message_type(msg) != stasis_subscription_change()) { + return 0; + } + + change = stasis_message_data(msg); + if (strcmp("Unsubscribe", change->description)) { + return 0; + } + + if (strcmp(stasis_subscription_uniqueid(sub), change->uniqueid)) { + return 0; + } + + return 1; +} + +/*! + * \brief Add a subscriber to a topic. + * \param topic Topic + * \param sub Subscriber + * \return 0 on success + * \return Non-zero on error + */ +static int topic_add_subscription(struct stasis_topic *topic, struct stasis_subscription *sub) +{ + struct stasis_subscription **subscribers; + SCOPED_AO2LOCK(lock, topic); + + /* Increase list size, if needed */ + if (topic->num_subscribers_current + 1 > topic->num_subscribers_max) { + subscribers = realloc(topic->subscribers, 2 * topic->num_subscribers_max * sizeof(*subscribers)); + if (!subscribers) { + return -1; + } + topic->subscribers = subscribers; + topic->num_subscribers_max *= 2; + } + + /* Don't ref sub here or we'll cause a reference cycle. */ + topic->subscribers[topic->num_subscribers_current++] = sub; + return 0; +} + +/*! + * \private + * \brief Information needed to dispatch a message to a subscription + */ +struct dispatch { + /*! Topic message was published to */ + struct stasis_topic *topic; + /*! The message itself */ + struct stasis_message *message; + /*! Subscription receiving the message */ + struct stasis_subscription *sub; +}; + +static void dispatch_dtor(void *data) +{ + struct dispatch *dispatch = data; + ao2_cleanup(dispatch->topic); + ao2_cleanup(dispatch->message); + ao2_cleanup(dispatch->sub); +} + +static struct dispatch *dispatch_create(struct stasis_topic *topic, struct stasis_message *message, struct stasis_subscription *sub) +{ + RAII_VAR(struct dispatch *, dispatch, NULL, ao2_cleanup); + + ast_assert(topic != NULL); + ast_assert(message != NULL); + ast_assert(sub != NULL); + + dispatch = ao2_alloc(sizeof(*dispatch), dispatch_dtor); + if (!dispatch) { + return NULL; + } + + dispatch->topic = topic; + ao2_ref(topic, +1); + + dispatch->message = message; + ao2_ref(message, +1); + + dispatch->sub = sub; + ao2_ref(sub, +1); + + ao2_ref(dispatch, +1); + return dispatch; +} + +/*! + * \brief Dispatch a message to a subscriber + * \param data \ref dispatch object + * \return 0 + */ +static int dispatch_exec(void *data) +{ + RAII_VAR(struct dispatch *, dispatch, data, ao2_cleanup); + RAII_VAR(struct stasis_topic *, sub_topic, NULL, ao2_cleanup); + + /* Since sub->topic doesn't change, no need to lock sub */ + ast_assert(dispatch->sub->topic != NULL); + ao2_ref(dispatch->sub->topic, +1); + sub_topic = dispatch->sub->topic; + + dispatch->sub->callback(dispatch->sub->data, + dispatch->sub, + sub_topic, + dispatch->message); + + return 0; +} + +void stasis_forward_message(struct stasis_topic *topic, struct stasis_topic *publisher_topic, struct stasis_message *message) +{ + struct stasis_subscription **subscribers = NULL; + size_t num_subscribers, i; + + /* Copy the subscribers, so we don't have to hold the mutex for long */ + { + SCOPED_AO2LOCK(lock, topic); + num_subscribers = topic->num_subscribers_current; + subscribers = ast_malloc(num_subscribers * sizeof(*subscribers)); + if (subscribers) { + for (i = 0; i < num_subscribers; ++i) { + ao2_ref(topic->subscribers[i], +1); + subscribers[i] = topic->subscribers[i]; + } + } + } + + if (!subscribers) { + ast_log(LOG_ERROR, "Dropping message\n"); + return; + } + + for (i = 0; i < num_subscribers; ++i) { + struct stasis_subscription *sub = subscribers[i]; + + ast_assert(sub != NULL); + + if (sub->mailbox) { + RAII_VAR(struct dispatch *, dispatch, NULL, ao2_cleanup); + + dispatch = dispatch_create(publisher_topic, message, sub); + if (!dispatch) { + ast_log(LOG_DEBUG, "Dropping dispatch\n"); + break; + } + + if (ast_taskprocessor_push(sub->mailbox, dispatch_exec, dispatch) == 0) { + dispatch = NULL; /* Ownership transferred to mailbox */ + } + } else { + /* No mailbox; dispatch directly */ + sub->callback(sub->data, sub, sub->topic, message); + } + } + + for (i = 0; i < num_subscribers; ++i) { + ao2_cleanup(subscribers[i]); + } + ast_free(subscribers); +} + +void stasis_publish(struct stasis_topic *topic, struct stasis_message *message) +{ + stasis_forward_message(topic, topic, message); +} + +/*! \brief Forwarding subscriber */ +static void stasis_forward_cb(void *data, struct stasis_subscription *sub, struct stasis_topic *topic, struct stasis_message *message) +{ + struct stasis_topic *to_topic = data; + stasis_forward_message(to_topic, topic, message); + + if (stasis_subscription_final_message(sub, message)) { + ao2_cleanup(to_topic); + } +} + +struct stasis_subscription *stasis_forward_all(struct stasis_topic *from_topic, struct stasis_topic *to_topic) +{ + struct stasis_subscription *sub; + if (!from_topic || !to_topic) { + return NULL; + } + /* Subscribe without a mailbox, since we're just forwarding messages */ + sub = __stasis_subscribe(from_topic, stasis_forward_cb, to_topic, 0); + if (sub) { + /* hold a ref to to_topic for this forwarding subscription */ + ao2_ref(to_topic, +1); + } + return sub; +} + +static void subscription_change_dtor(void *obj) +{ + struct stasis_subscription_change *change = obj; + ast_string_field_free_memory(change); + ao2_cleanup(change->topic); +} + +static struct stasis_subscription_change *subscription_change_alloc(struct stasis_topic *topic, char *uniqueid, char *description) +{ + RAII_VAR(struct stasis_subscription_change *, change, NULL, ao2_cleanup); + + change = ao2_alloc(sizeof(struct stasis_subscription_change), subscription_change_dtor); + if (ast_string_field_init(change, 128)) { + return NULL; + } + + ast_string_field_set(change, uniqueid, uniqueid); + ast_string_field_set(change, description, description); + ao2_ref(topic, +1); + change->topic = topic; + + ao2_ref(change, +1); + return change; +} + +struct stasis_message_type *stasis_subscription_change(void) +{ + return __subscription_change_message_type; +} + +static void send_subscription_change_message(struct stasis_topic *topic, char *uniqueid, char *description) +{ + RAII_VAR(struct stasis_subscription_change *, change, NULL, ao2_cleanup); + RAII_VAR(struct stasis_message *, msg, NULL, ao2_cleanup); + + change = subscription_change_alloc(topic, uniqueid, description); + + if (!change) { + return; + } + + msg = stasis_message_create(stasis_subscription_change(), change); + + if (!msg) { + return; + } + + stasis_publish(topic, msg); +} + +/*! \brief Cleanup function */ +static void stasis_exit(void) +{ + ao2_cleanup(__subscription_change_message_type); + __subscription_change_message_type = NULL; + ast_threadpool_shutdown(pool); + pool = NULL; +} + +int stasis_init(void) +{ + int cache_init; + + /* XXX Should this be configurable? */ + struct ast_threadpool_options opts = { + .version = AST_THREADPOOL_OPTIONS_VERSION, + .idle_timeout = 20, + .auto_increment = 1, + .initial_size = 0, + .max_size = 200 + }; + + ast_register_atexit(stasis_exit); + + if (pool) { + ast_log(LOG_ERROR, "Stasis double-initialized\n"); + return -1; + } + + pool = ast_threadpool_create("stasis-core", NULL, &opts); + if (!pool) { + ast_log(LOG_ERROR, "Stasis threadpool allocation failed\n"); + return -1; + } + + cache_init = stasis_cache_init(); + if (cache_init != 0) { + return -1; + } + + __subscription_change_message_type = stasis_message_type_create("stasis_subscription_change"); + if (!__subscription_change_message_type) { + return -1; + } + + return 0; +} diff --git a/main/stasis_cache.c b/main/stasis_cache.c new file mode 100644 index 000000000..2f4cf52fd --- /dev/null +++ b/main/stasis_cache.c @@ -0,0 +1,443 @@ +/* + * Asterisk -- An open source telephony toolkit. + * + * Copyright (C) 2013, Digium, Inc. + * + * David M. Lee, II <dlee@digium.com> + * + * See http://www.asterisk.org for more information about + * the Asterisk project. Please do not directly contact + * any of the maintainers of this project for assistance; + * the project provides a web site, mailing lists and IRC + * channels for your use. + * + * This program is free software, distributed under the terms of + * the GNU General Public License Version 2. See the LICENSE file + * at the top of the source tree. + */ + +/*! \file + * + * \brief Stasis Message API. + * + * \author David M. Lee, II <dlee@digium.com> + */ + +/*** MODULEINFO + <support_level>core</support_level> + ***/ + +#include "asterisk.h" + +ASTERISK_FILE_VERSION(__FILE__, "$Revision$") + +#include "asterisk/astobj2.h" +#include "asterisk/hashtab.h" +#include "asterisk/stasis.h" +#include "asterisk/utils.h" + +#ifdef LOW_MEMORY +#define NUM_CACHE_BUCKETS 17 +#else +#define NUM_CACHE_BUCKETS 563 +#endif + +struct stasis_caching_topic { + struct ao2_container *cache; + struct stasis_topic *topic; + struct stasis_subscription *sub; + snapshot_get_id id_fn; +}; + +static void stasis_caching_topic_dtor(void *obj) { + struct stasis_caching_topic *caching_topic = obj; + ast_assert(!stasis_subscription_is_subscribed(caching_topic->sub)); + caching_topic->sub = NULL; + ao2_cleanup(caching_topic->cache); + caching_topic->cache = NULL; + ao2_cleanup(caching_topic->topic); + caching_topic->topic = NULL; +} + +struct stasis_topic *stasis_caching_get_topic(struct stasis_caching_topic *caching_topic) +{ + return caching_topic->topic; +} + +void stasis_caching_unsubscribe(struct stasis_caching_topic *caching_topic) +{ + if (caching_topic) { + if (stasis_subscription_is_subscribed(caching_topic->sub)) { + stasis_unsubscribe(caching_topic->sub); + } else { + ast_log(LOG_ERROR, "stasis_caching_topic unsubscribed multiple times\n"); + } + } +} + +struct cache_entry { + struct stasis_message_type *type; + char *id; + struct stasis_message *snapshot; +}; + +static void cache_entry_dtor(void *obj) +{ + struct cache_entry *entry = obj; + ao2_cleanup(entry->type); + entry->type = NULL; + ast_free(entry->id); + entry->id = NULL; + ao2_cleanup(entry->snapshot); + entry->snapshot = NULL; +} + +static struct cache_entry *cache_entry_create(struct stasis_message_type *type, const char *id, struct stasis_message *snapshot) +{ + RAII_VAR(struct cache_entry *, entry, NULL, ao2_cleanup); + + ast_assert(type != NULL); + ast_assert(id != NULL); + + entry = ao2_alloc(sizeof(*entry), cache_entry_dtor); + if (!entry) { + return NULL; + } + + entry->id = ast_strdup(id); + if (!entry->id) { + return NULL; + } + + ao2_ref(type, +1); + entry->type = type; + if (snapshot != NULL) { + ao2_ref(snapshot, +1); + entry->snapshot = snapshot; + } + + ao2_ref(entry, +1); + return entry; +} + +static int cache_entry_hash(const void *obj, int flags) +{ + const struct cache_entry *entry = obj; + int hash = 0; + + ast_assert(!(flags & OBJ_KEY)); + + hash += ast_hashtab_hash_string(stasis_message_type_name(entry->type)); + hash += ast_hashtab_hash_string(entry->id); + return hash; +} + +static int cache_entry_cmp(void *obj, void *arg, int flags) +{ + const struct cache_entry *left = obj; + const struct cache_entry *right = arg; + + ast_assert(!(flags & OBJ_KEY)); + + if (left->type == right->type && strcmp(left->id, right->id) == 0) { + return CMP_MATCH | CMP_STOP; + } + + return 0; +} + +static struct stasis_message *cache_put(struct stasis_caching_topic *caching_topic, struct stasis_message_type *type, const char *id, struct stasis_message *new_snapshot) +{ + RAII_VAR(struct cache_entry *, new_entry, NULL, ao2_cleanup); + RAII_VAR(struct cache_entry *, cached_entry, NULL, ao2_cleanup); + struct stasis_message *old_snapshot = NULL; + + ast_assert(caching_topic->cache != NULL); + + new_entry = cache_entry_create(type, id, new_snapshot); + + if (new_snapshot == NULL) { + /* Remove entry from cache */ + cached_entry = ao2_find(caching_topic->cache, new_entry, OBJ_POINTER | OBJ_UNLINK); + if (cached_entry) { + old_snapshot = cached_entry->snapshot; + cached_entry->snapshot = NULL; + } + } else { + /* Insert/update cache */ + SCOPED_AO2LOCK(lock, caching_topic->cache); + + cached_entry = ao2_find(caching_topic->cache, new_entry, OBJ_POINTER | OBJ_NOLOCK); + if (cached_entry) { + /* Update cache. Because objects are moving, no need to update refcounts. */ + old_snapshot = cached_entry->snapshot; + cached_entry->snapshot = new_entry->snapshot; + new_entry->snapshot = NULL; + } else { + /* Insert into the cache */ + ao2_link_flags(caching_topic->cache, new_entry, OBJ_NOLOCK); + } + + } + + return old_snapshot; +} + +struct stasis_message *stasis_cache_get(struct stasis_caching_topic *caching_topic, struct stasis_message_type *type, const char *id) +{ + RAII_VAR(struct cache_entry *, search_entry, NULL, ao2_cleanup); + RAII_VAR(struct cache_entry *, cached_entry, NULL, ao2_cleanup); + + ast_assert(caching_topic->cache != NULL); + + search_entry = cache_entry_create(type, id, NULL); + if (search_entry == NULL) { + return NULL; + } + + cached_entry = ao2_find(caching_topic->cache, search_entry, OBJ_POINTER); + if (cached_entry == NULL) { + return NULL; + } + + ast_assert(cached_entry->snapshot != NULL); + ao2_ref(cached_entry->snapshot, +1); + return cached_entry->snapshot; +} + +static struct stasis_message_type *__cache_clear_data; + +static struct stasis_message_type *cache_clear_data(void) +{ + ast_assert(__cache_clear_data != NULL); + return __cache_clear_data; +} + +static struct stasis_message_type *__cache_update; + +struct stasis_message_type *stasis_cache_update(void) +{ + ast_assert(__cache_update != NULL); + return __cache_update; +} + +struct cache_clear_data { + struct stasis_message_type *type; + char *id; +}; + +static void cache_clear_data_dtor(void *obj) +{ + struct cache_clear_data *ev = obj; + ast_free(ev->id); + ev->id = NULL; + ao2_cleanup(ev->type); + ev->type = NULL; +} + +struct stasis_message *stasis_cache_clear_create(struct stasis_message_type *type, const char *id) +{ + RAII_VAR(struct cache_clear_data *, ev, NULL, ao2_cleanup); + RAII_VAR(struct stasis_message *, msg, NULL, ao2_cleanup); + + ev = ao2_alloc(sizeof(*ev), cache_clear_data_dtor); + if (!ev) { + return NULL; + } + + ev->id = ast_strdup(id); + if (!ev->id) { + return NULL; + } + ao2_ref(type, +1); + ev->type = type; + + msg = stasis_message_create(cache_clear_data(), ev); + + if (!msg) { + return NULL; + } + + ao2_ref(msg, +1); + return msg; +} + +static void stasis_cache_update_dtor(void *obj) +{ + struct stasis_cache_update *update = obj; + ao2_cleanup(update->topic); + update->topic = NULL; + ao2_cleanup(update->old_snapshot); + update->old_snapshot = NULL; + ao2_cleanup(update->new_snapshot); + update->new_snapshot = NULL; + ao2_cleanup(update->type); + update->type = NULL; +} + +static struct stasis_message *update_create(struct stasis_topic *topic, struct stasis_message *old_snapshot, struct stasis_message *new_snapshot) +{ + RAII_VAR(struct stasis_cache_update *, update, NULL, ao2_cleanup); + RAII_VAR(struct stasis_message *, msg, NULL, ao2_cleanup); + + ast_assert(topic != NULL); + ast_assert(old_snapshot != NULL || new_snapshot != NULL); + + update = ao2_alloc(sizeof(*update), stasis_cache_update_dtor); + if (!update) { + return NULL; + } + + ao2_ref(topic, +1); + update->topic = topic; + if (old_snapshot) { + ao2_ref(old_snapshot, +1); + update->old_snapshot = old_snapshot; + if (!new_snapshot) { + ao2_ref(stasis_message_type(old_snapshot), +1); + update->type = stasis_message_type(old_snapshot); + } + } + if (new_snapshot) { + ao2_ref(new_snapshot, +1); + update->new_snapshot = new_snapshot; + ao2_ref(stasis_message_type(new_snapshot), +1); + update->type = stasis_message_type(new_snapshot); + } + + msg = stasis_message_create(stasis_cache_update(), update); + if (!msg) { + return NULL; + } + + ao2_ref(msg, +1); + return msg; +} + +static void caching_topic_exec(void *data, struct stasis_subscription *sub, struct stasis_topic *topic, struct stasis_message *message) +{ + RAII_VAR(struct stasis_caching_topic *, caching_topic_needs_unref, NULL, ao2_cleanup); + struct stasis_caching_topic *caching_topic = data; + const char *id = NULL; + + ast_assert(caching_topic->topic != NULL); + ast_assert(caching_topic->id_fn != NULL); + + if (stasis_subscription_final_message(sub, message)) { + caching_topic_needs_unref = caching_topic; + } + + /* Handle cache clear event */ + if (cache_clear_data() == stasis_message_type(message)) { + RAII_VAR(struct stasis_message *, old_snapshot, NULL, ao2_cleanup); + RAII_VAR(struct stasis_message *, update, NULL, ao2_cleanup); + struct cache_clear_data *clear = stasis_message_data(message); + ast_assert(clear->type != NULL); + ast_assert(clear->id != NULL); + old_snapshot = cache_put(caching_topic, clear->type, clear->id, NULL); + if (old_snapshot) { + update = update_create(topic, old_snapshot, NULL); + stasis_publish(caching_topic->topic, update); + } else { + ast_log(LOG_ERROR, + "Attempting to remove an item from the cache that isn't there: %s %s\n", + stasis_message_type_name(clear->type), clear->id); + } + return; + } + + id = caching_topic->id_fn(message); + if (id == NULL) { + /* Object isn't cached; forward */ + stasis_forward_message(caching_topic->topic, topic, message); + } else { + /* Update the cache */ + RAII_VAR(struct stasis_message *, old_snapshot, NULL, ao2_cleanup); + RAII_VAR(struct stasis_message *, update, NULL, ao2_cleanup); + + old_snapshot = cache_put(caching_topic, stasis_message_type(message), id, message); + + update = update_create(topic, old_snapshot, message); + if (update == NULL) { + return; + } + + stasis_publish(caching_topic->topic, update); + } + + if (stasis_subscription_final_message(sub, message)) { + ao2_cleanup(caching_topic); + } +} + +struct stasis_caching_topic *stasis_caching_topic_create(struct stasis_topic *original_topic, snapshot_get_id id_fn) +{ + RAII_VAR(struct stasis_caching_topic *, caching_topic, NULL, ao2_cleanup); + struct stasis_subscription *sub; + RAII_VAR(char *, new_name, NULL, free); + int ret; + + ret = asprintf(&new_name, "%s-cached", stasis_topic_name(original_topic)); + if (ret < 0) { + return NULL; + } + + caching_topic = ao2_alloc(sizeof(*caching_topic), stasis_caching_topic_dtor); + if (caching_topic == NULL) { + return NULL; + } + + caching_topic->cache = ao2_container_alloc(NUM_CACHE_BUCKETS, cache_entry_hash, cache_entry_cmp); + if (!caching_topic->cache) { + ast_log(LOG_ERROR, "Stasis cache allocation failed\n"); + return NULL; + } + + caching_topic->topic = stasis_topic_create(new_name); + if (caching_topic->topic == NULL) { + return NULL; + } + + caching_topic->id_fn = id_fn; + + sub = stasis_subscribe(original_topic, caching_topic_exec, caching_topic); + if (sub == NULL) { + return NULL; + } + /* This is for the reference contained in the subscription above */ + ao2_ref(caching_topic, +1); + caching_topic->sub = sub; + + ao2_ref(caching_topic, +1); + return caching_topic; +} + +static void stasis_cache_exit(void) +{ + ao2_cleanup(__cache_clear_data); + __cache_clear_data = NULL; + ao2_cleanup(__cache_update); + __cache_update = NULL; +} + +int stasis_cache_init(void) +{ + ast_register_atexit(stasis_cache_exit); + + if (__cache_clear_data || __cache_update) { + ast_log(LOG_ERROR, "Stasis cache double initialized\n"); + return -1; + } + + __cache_update = stasis_message_type_create("stasis_cache_update"); + if (!__cache_update) { + return -1; + } + + __cache_clear_data = stasis_message_type_create("StasisCacheClear"); + if (!__cache_clear_data) { + return -1; + } + return 0; +} + diff --git a/main/stasis_message.c b/main/stasis_message.c new file mode 100644 index 000000000..8d397b935 --- /dev/null +++ b/main/stasis_message.c @@ -0,0 +1,135 @@ +/* + * Asterisk -- An open source telephony toolkit. + * + * Copyright (C) 2013, Digium, Inc. + * + * David M. Lee, II <dlee@digium.com> + * + * See http://www.asterisk.org for more information about + * the Asterisk project. Please do not directly contact + * any of the maintainers of this project for assistance; + * the project provides a web site, mailing lists and IRC + * channels for your use. + * + * This program is free software, distributed under the terms of + * the GNU General Public License Version 2. See the LICENSE file + * at the top of the source tree. + */ + +/*! \file + * + * \brief Stasis Message API. + * + * \author David M. Lee, II <dlee@digium.com> + */ + +/*** MODULEINFO + <support_level>core</support_level> + ***/ + +#include "asterisk.h" + +ASTERISK_FILE_VERSION(__FILE__, "$Revision$") + +#include "asterisk/astobj2.h" +#include "asterisk/stasis.h" +#include "asterisk/utils.h" + +/*! \private */ +struct stasis_message_type { + char *name; +}; + +static void message_type_dtor(void *obj) +{ + struct stasis_message_type *type = obj; + ast_free(type->name); + type->name = NULL; +} + +struct stasis_message_type *stasis_message_type_create(const char *name) +{ + RAII_VAR(struct stasis_message_type *, type, NULL, ao2_cleanup); + + type = ao2_alloc(sizeof(*type), message_type_dtor); + if (!type) { + return NULL; + } + + type->name = ast_strdup(name); + if (!type->name) { + return NULL; + } + + ao2_ref(type, +1); + return type; +} + +const char *stasis_message_type_name(const struct stasis_message_type *type) +{ + return type->name; +} + +/*! \private */ +struct stasis_message { + /*! Time the message was created */ + struct timeval timestamp; + /*! Type of the message */ + struct stasis_message_type *type; + /*! Message content */ + void *data; +}; + +static void stasis_message_dtor(void *obj) +{ + struct stasis_message *message = obj; + ao2_cleanup(message->type); + ao2_cleanup(message->data); +} + +struct stasis_message *stasis_message_create(struct stasis_message_type *type, void *data) +{ + RAII_VAR(struct stasis_message *, message, NULL, ao2_cleanup); + + if (type == NULL || data == NULL) { + return NULL; + } + + message = ao2_alloc(sizeof(*message), stasis_message_dtor); + if (message == NULL) { + return NULL; + } + + message->timestamp = ast_tvnow(); + ao2_ref(type, +1); + message->type = type; + ao2_ref(data, +1); + message->data = data; + + ao2_ref(message, +1); + return message; +} + +struct stasis_message_type *stasis_message_type(const struct stasis_message *msg) +{ + if (msg == NULL) { + return NULL; + } + return msg->type; +} + +void *stasis_message_data(const struct stasis_message *msg) +{ + if (msg == NULL) { + return NULL; + } + return msg->data; +} + +const struct timeval *stasis_message_timestamp(const struct stasis_message *msg) +{ + if (msg == NULL) { + return NULL; + } + return &msg->timestamp; +} diff --git a/tests/test_stasis.c b/tests/test_stasis.c new file mode 100644 index 000000000..b05264106 --- /dev/null +++ b/tests/test_stasis.c @@ -0,0 +1,683 @@ +/* + * Asterisk -- An open source telephony toolkit. + * + * Copyright (C) 2013, Digium, Inc. + * + * David M. Lee, II <dlee@digium.com> + * + * See http://www.asterisk.org for more information about + * the Asterisk project. Please do not directly contact + * any of the maintainers of this project for assistance; + * the project provides a web site, mailing lists and IRC + * channels for your use. + * + * This program is free software, distributed under the terms of + * the GNU General Public License Version 2. See the LICENSE file + * at the top of the source tree. + */ + +/*! + * \file \brief Test Stasis message bus. + * + * \author\verbatim David M. Lee, II <dlee@digium.com> \endverbatim + * + * \ingroup tests + */ + +/*** MODULEINFO + <depend>TEST_FRAMEWORK</depend> + <support_level>core</support_level> + ***/ + +#include "asterisk.h" + +ASTERISK_FILE_VERSION(__FILE__, "$Revision$") + +#include "asterisk/astobj2.h" +#include "asterisk/module.h" +#include "asterisk/stasis.h" +#include "asterisk/test.h" + +static const char *test_category = "/stasis/core/"; + +AST_TEST_DEFINE(message_type) +{ + RAII_VAR(struct stasis_message_type *, uut, NULL, ao2_cleanup); + + switch (cmd) { + case TEST_INIT: + info->name = __func__; + info->category = test_category; + info->summary = "Test basic message_type functions"; + info->description = "Test basic message_type functions"; + return AST_TEST_NOT_RUN; + case TEST_EXECUTE: + break; + } + + ast_test_validate(test, NULL == stasis_message_type_create(NULL)); + uut = stasis_message_type_create("SomeMessage"); + ast_test_validate(test, 0 == strcmp(stasis_message_type_name(uut), "SomeMessage")); + + return AST_TEST_PASS; +} + +AST_TEST_DEFINE(message) +{ + RAII_VAR(struct stasis_message_type *, type, NULL, ao2_cleanup); + RAII_VAR(struct stasis_message *, uut, NULL, ao2_cleanup); + RAII_VAR(char *, data, NULL, ao2_cleanup); + char *expected = "SomeData"; + struct timeval expected_timestamp; + struct timeval time_diff; + + switch (cmd) { + case TEST_INIT: + info->name = __func__; + info->category = test_category; + info->summary = "Test basic message functions"; + info->description = "Test basic message functions"; + return AST_TEST_NOT_RUN; + case TEST_EXECUTE: + break; + } + + + type = stasis_message_type_create("SomeMessage"); + + ast_test_validate(test, NULL == stasis_message_create(NULL, NULL)); + ast_test_validate(test, NULL == stasis_message_create(type, NULL)); + + data = ao2_alloc(strlen(expected) + 1, NULL); + strcpy(data, expected); + expected_timestamp = ast_tvnow(); + uut = stasis_message_create(type, data); + + ast_test_validate(test, NULL != uut); + ast_test_validate(test, type == stasis_message_type(uut)); + ast_test_validate(test, 0 == strcmp(expected, stasis_message_data(uut))); + ast_test_validate(test, 2 == ao2_ref(data, 0)); /* uut has ref to data */ + + time_diff = ast_tvsub(*stasis_message_timestamp(uut), expected_timestamp); + /* 10ms is certainly long enough for the two calls to complete */ + ast_test_validate(test, time_diff.tv_sec == 0); + ast_test_validate(test, time_diff.tv_usec < 10000); + + ao2_ref(uut, -1); + uut = NULL; + ast_test_validate(test, 1 == ao2_ref(data, 0)); /* uut unreffed data */ + + return AST_TEST_PASS; +} + +struct consumer { + ast_mutex_t lock; + ast_cond_t out; + struct stasis_message **messages_rxed; + size_t messages_rxed_len; + int ignore_subscriptions; + int complete; +}; + +static void consumer_dtor(void *obj) { + struct consumer *consumer = obj; + + ast_mutex_destroy(&consumer->lock); + ast_cond_destroy(&consumer->out); + + while (consumer->messages_rxed_len > 0) { + ao2_cleanup(consumer->messages_rxed[--consumer->messages_rxed_len]); + } + ast_free(consumer->messages_rxed); + consumer->messages_rxed = NULL; +} + +static struct consumer *consumer_create(int ignore_subscriptions) { + RAII_VAR(struct consumer *, consumer, NULL, ao2_cleanup); + + consumer = ao2_alloc(sizeof(*consumer), consumer_dtor); + + if (!consumer) { + return NULL; + } + + consumer->ignore_subscriptions = ignore_subscriptions; + consumer->messages_rxed = ast_malloc(0); + if (!consumer->messages_rxed) { + return NULL; + } + + ast_mutex_init(&consumer->lock); + ast_cond_init(&consumer->out, NULL); + + ao2_ref(consumer, +1); + return consumer; +} + +static void consumer_exec(void *data, struct stasis_subscription *sub, struct stasis_topic *topic, struct stasis_message *message) +{ + struct consumer *consumer = data; + RAII_VAR(struct consumer *, consumer_needs_cleanup, NULL, ao2_cleanup); + SCOPED_MUTEX(lock, &consumer->lock); + + if (!consumer->ignore_subscriptions || stasis_message_type(message) != stasis_subscription_change()) { + + ++consumer->messages_rxed_len; + consumer->messages_rxed = ast_realloc(consumer->messages_rxed, sizeof(*consumer->messages_rxed) * consumer->messages_rxed_len); + ast_assert(consumer->messages_rxed != NULL); + consumer->messages_rxed[consumer->messages_rxed_len - 1] = message; + ao2_ref(message, +1); + } + + if (stasis_subscription_final_message(sub, message)) { + consumer->complete = 1; + consumer_needs_cleanup = consumer; + } + + ast_cond_signal(&consumer->out); +} + +static int consumer_wait_for(struct consumer *consumer, size_t expected_len) +{ + struct timeval start = ast_tvnow(); + struct timespec end = { + .tv_sec = start.tv_sec + 30, + .tv_nsec = start.tv_usec * 1000 + }; + + SCOPED_MUTEX(lock, &consumer->lock); + + while (consumer->messages_rxed_len < expected_len) { + int r = ast_cond_timedwait(&consumer->out, &consumer->lock, &end); + if (r == ETIMEDOUT) { + break; + } + ast_assert(r == 0); /* Not expecting any othet types of errors */ + } + return consumer->messages_rxed_len; +} + +static int consumer_wait_for_completion(struct consumer *consumer) +{ + struct timeval start = ast_tvnow(); + struct timespec end = { + .tv_sec = start.tv_sec + 30, + .tv_nsec = start.tv_usec * 1000 + }; + + SCOPED_MUTEX(lock, &consumer->lock); + + while (!consumer->complete) { + int r = ast_cond_timedwait(&consumer->out, &consumer->lock, &end); + if (r == ETIMEDOUT) { + break; + } + ast_assert(r == 0); /* Not expecting any othet types of errors */ + } + return consumer->complete; +} + +static int consumer_should_stay(struct consumer *consumer, size_t expected_len) +{ + struct timeval start = ast_tvnow(); + struct timeval diff = { + .tv_sec = 0, + .tv_usec = 100000 /* wait for 100ms */ + }; + struct timeval end_tv = ast_tvadd(start, diff); + struct timespec end = { + .tv_sec = end_tv.tv_sec, + .tv_nsec = end_tv.tv_usec * 1000 + }; + + SCOPED_MUTEX(lock, &consumer->lock); + + while (consumer->messages_rxed_len == expected_len) { + int r = ast_cond_timedwait(&consumer->out, &consumer->lock, &end); + if (r == ETIMEDOUT) { + break; + } + ast_assert(r == 0); /* Not expecting any othet types of errors */ + } + return consumer->messages_rxed_len; +} + +AST_TEST_DEFINE(subscription_messages) +{ + RAII_VAR(struct stasis_topic *, topic, NULL, ao2_cleanup); + RAII_VAR(struct stasis_subscription *, uut, NULL, stasis_unsubscribe); + RAII_VAR(char *, test_data, NULL, ao2_cleanup); + RAII_VAR(struct stasis_message_type *, test_message_type, NULL, ao2_cleanup); + RAII_VAR(struct stasis_message *, test_message, NULL, ao2_cleanup); + RAII_VAR(struct consumer *, consumer, NULL, ao2_cleanup); + RAII_VAR(char *, expected_uniqueid, NULL, ast_free); + int complete; + struct stasis_subscription_change *change; + + switch (cmd) { + case TEST_INIT: + info->name = __func__; + info->category = test_category; + info->summary = "Test subscribe/unsubscribe messages"; + info->description = "Test subscribe/unsubscribe messages"; + return AST_TEST_NOT_RUN; + case TEST_EXECUTE: + break; + } + + topic = stasis_topic_create("TestTopic"); + ast_test_validate(test, NULL != topic); + + consumer = consumer_create(0); + ast_test_validate(test, NULL != consumer); + + uut = stasis_subscribe(topic, consumer_exec, consumer); + ast_test_validate(test, NULL != uut); + ao2_ref(consumer, +1); + expected_uniqueid = ast_strdup(stasis_subscription_uniqueid(uut)); + + stasis_unsubscribe(uut); + uut = NULL; + complete = consumer_wait_for_completion(consumer); + ast_test_validate(test, 1 == complete); + + ast_test_validate(test, 2 == consumer->messages_rxed_len); + ast_test_validate(test, stasis_subscription_change() == stasis_message_type(consumer->messages_rxed[0])); + ast_test_validate(test, stasis_subscription_change() == stasis_message_type(consumer->messages_rxed[1])); + + change = stasis_message_data(consumer->messages_rxed[0]); + ast_test_validate(test, topic == change->topic); + ast_test_validate(test, 0 == strcmp("Subscribe", change->description)); + ast_test_validate(test, 0 == strcmp(expected_uniqueid, change->uniqueid)); + + change = stasis_message_data(consumer->messages_rxed[1]); + ast_test_validate(test, topic == change->topic); + ast_test_validate(test, 0 == strcmp("Unsubscribe", change->description)); + ast_test_validate(test, 0 == strcmp(expected_uniqueid, change->uniqueid)); + + return AST_TEST_PASS; +} + +AST_TEST_DEFINE(publish) +{ + RAII_VAR(struct stasis_topic *, topic, NULL, ao2_cleanup); + RAII_VAR(struct stasis_subscription *, uut, NULL, stasis_unsubscribe); + RAII_VAR(char *, test_data, NULL, ao2_cleanup); + RAII_VAR(struct stasis_message_type *, test_message_type, NULL, ao2_cleanup); + RAII_VAR(struct stasis_message *, test_message, NULL, ao2_cleanup); + RAII_VAR(struct consumer *, consumer, NULL, ao2_cleanup); + int actual_len; + const char *actual; + + switch (cmd) { + case TEST_INIT: + info->name = __func__; + info->category = test_category; + info->summary = "Test simple subscriptions"; + info->description = "Test simple subscriptions"; + return AST_TEST_NOT_RUN; + case TEST_EXECUTE: + break; + } + + topic = stasis_topic_create("TestTopic"); + ast_test_validate(test, NULL != topic); + + consumer = consumer_create(1); + ast_test_validate(test, NULL != consumer); + + uut = stasis_subscribe(topic, consumer_exec, consumer); + ast_test_validate(test, NULL != uut); + ao2_ref(consumer, +1); + + test_data = ao2_alloc(1, NULL); + ast_test_validate(test, NULL != test_data); + test_message_type = stasis_message_type_create("TestMessage"); + test_message = stasis_message_create(test_message_type, test_data); + + stasis_publish(topic, test_message); + + actual_len = consumer_wait_for(consumer, 1); + ast_test_validate(test, 1 == actual_len); + actual = stasis_message_data(consumer->messages_rxed[0]); + ast_test_validate(test, test_data == actual); + + return AST_TEST_PASS; +} + +AST_TEST_DEFINE(unsubscribe_stops_messages) +{ + RAII_VAR(struct stasis_topic *, topic, NULL, ao2_cleanup); + RAII_VAR(struct consumer *, consumer, NULL, ao2_cleanup); + RAII_VAR(struct stasis_subscription *, uut, NULL, stasis_unsubscribe); + RAII_VAR(char *, test_data, NULL, ao2_cleanup); + RAII_VAR(struct stasis_message_type *, test_message_type, NULL, ao2_cleanup); + RAII_VAR(struct stasis_message *, test_message, NULL, ao2_cleanup); + int actual_len; + + switch (cmd) { + case TEST_INIT: + info->name = __func__; + info->category = test_category; + info->summary = "Test simple subscriptions"; + info->description = "Test simple subscriptions"; + return AST_TEST_NOT_RUN; + case TEST_EXECUTE: + break; + } + + topic = stasis_topic_create("TestTopic"); + ast_test_validate(test, NULL != topic); + + consumer = consumer_create(1); + ast_test_validate(test, NULL != consumer); + + uut = stasis_subscribe(topic, consumer_exec, consumer); + ast_test_validate(test, NULL != uut); + ao2_ref(consumer, +1); + + stasis_unsubscribe(uut); + uut = NULL; + + test_data = ao2_alloc(1, NULL); + ast_test_validate(test, NULL != test_data); + test_message_type = stasis_message_type_create("TestMessage"); + test_message = stasis_message_create(test_message_type, test_data); + + stasis_publish(topic, test_message); + + actual_len = consumer_should_stay(consumer, 0); + ast_test_validate(test, 0 == actual_len); + + return AST_TEST_PASS; +} + + +AST_TEST_DEFINE(forward) +{ + RAII_VAR(struct stasis_topic *, parent_topic, NULL, ao2_cleanup); + RAII_VAR(struct stasis_topic *, topic, NULL, ao2_cleanup); + + RAII_VAR(struct consumer *, parent_consumer, NULL, ao2_cleanup); + RAII_VAR(struct consumer *, consumer, NULL, ao2_cleanup); + + RAII_VAR(struct stasis_subscription *, forward_sub, NULL, stasis_unsubscribe); + RAII_VAR(struct stasis_subscription *, parent_sub, NULL, stasis_unsubscribe); + RAII_VAR(struct stasis_subscription *, sub, NULL, stasis_unsubscribe); + + RAII_VAR(char *, test_data, NULL, ao2_cleanup); + RAII_VAR(struct stasis_message_type *, test_message_type, NULL, ao2_cleanup); + RAII_VAR(struct stasis_message *, test_message, NULL, ao2_cleanup); + int actual_len; + + switch (cmd) { + case TEST_INIT: + info->name = __func__; + info->category = test_category; + info->summary = "Test sending events to a parent topic"; + info->description = "Test sending events to a parent topic.\n" + "This test creates three topics (one parent, two children)\n" + "and publishes a message to one child, and verifies it's\n" + "only seen by that child and the parent"; + return AST_TEST_NOT_RUN; + case TEST_EXECUTE: + break; + } + + parent_topic = stasis_topic_create("ParentTestTopic"); + ast_test_validate(test, NULL != parent_topic); + topic = stasis_topic_create("TestTopic"); + ast_test_validate(test, NULL != topic); + + forward_sub = stasis_forward_all(topic, parent_topic); + ast_test_validate(test, NULL != forward_sub); + + parent_consumer = consumer_create(1); + ast_test_validate(test, NULL != parent_consumer); + consumer = consumer_create(1); + ast_test_validate(test, NULL != consumer); + + parent_sub = stasis_subscribe(parent_topic, consumer_exec, parent_consumer); + ast_test_validate(test, NULL != parent_sub); + ao2_ref(parent_consumer, +1); + sub = stasis_subscribe(topic, consumer_exec, consumer); + ast_test_validate(test, NULL != sub); + ao2_ref(consumer, +1); + + test_data = ao2_alloc(1, NULL); + ast_test_validate(test, NULL != test_data); + test_message_type = stasis_message_type_create("TestMessage"); + test_message = stasis_message_create(test_message_type, test_data); + + stasis_publish(topic, test_message); + + actual_len = consumer_wait_for(consumer, 1); + ast_test_validate(test, 1 == actual_len); + actual_len = consumer_wait_for(parent_consumer, 1); + ast_test_validate(test, 1 == actual_len); + + return AST_TEST_PASS; +} + +struct cache_test_data { + char *id; + char *value; +}; + +static void cache_test_data_dtor(void *obj) +{ + struct cache_test_data *data = obj; + ast_free(data->id); + ast_free(data->value); +} + +static struct stasis_message *cache_test_message_create(struct stasis_message_type *type, const char *name, const char *value) +{ + RAII_VAR(struct cache_test_data *, data, NULL, ao2_cleanup); + + data = ao2_alloc(sizeof(*data), cache_test_data_dtor); + if (data == NULL) { + return NULL; + } + + ast_assert(name != NULL); + ast_assert(value != NULL); + + data->id = ast_strdup(name); + data->value = ast_strdup(value); + if (!data->id || !data->value) { + return NULL; + } + + return stasis_message_create(type, data); +} + +static const char *cache_test_data_id(struct stasis_message *message) { + struct cache_test_data *cachable = stasis_message_data(message); + + if (0 != strcmp("Cacheable", stasis_message_type_name(stasis_message_type(message)))) { + return NULL; + } + return cachable->id; +} + +AST_TEST_DEFINE(cache_passthrough) +{ + RAII_VAR(struct stasis_message_type *, non_cache_type, NULL, ao2_cleanup); + RAII_VAR(struct stasis_topic *, topic, NULL, ao2_cleanup); + RAII_VAR(struct stasis_caching_topic *, caching_topic, NULL, stasis_caching_unsubscribe); + RAII_VAR(struct consumer *, consumer, NULL, ao2_cleanup); + RAII_VAR(struct stasis_subscription *, sub, NULL, stasis_unsubscribe); + RAII_VAR(struct stasis_message *, test_message, NULL, ao2_cleanup); + int actual_len; + struct stasis_message_type *actual_type; + + switch (cmd) { + case TEST_INIT: + info->name = __func__; + info->category = test_category; + info->summary = "Test passing messages through cache topic unscathed."; + info->description = "Test passing messages through cache topic unscathed."; + return AST_TEST_NOT_RUN; + case TEST_EXECUTE: + break; + } + + non_cache_type = stasis_message_type_create("NonCacheable"); + ast_test_validate(test, NULL != non_cache_type); + topic = stasis_topic_create("SomeTopic"); + ast_test_validate(test, NULL != topic); + caching_topic = stasis_caching_topic_create(topic, cache_test_data_id); + ast_test_validate(test, NULL != caching_topic); + consumer = consumer_create(1); + ast_test_validate(test, NULL != consumer); + sub = stasis_subscribe(stasis_caching_get_topic(caching_topic), consumer_exec, consumer); + ast_test_validate(test, NULL != sub); + ao2_ref(consumer, +1); + + test_message = cache_test_message_create(non_cache_type, "1", "1"); + ast_test_validate(test, NULL != test_message); + + stasis_publish(topic, test_message); + + actual_len = consumer_wait_for(consumer, 1); + ast_test_validate(test, 1 == actual_len); + + actual_type = stasis_message_type(consumer->messages_rxed[0]); + ast_test_validate(test, non_cache_type == actual_type); + + ast_test_validate(test, test_message == consumer->messages_rxed[0]); + + return AST_TEST_PASS; +} + +AST_TEST_DEFINE(cache) +{ + RAII_VAR(struct stasis_message_type *, cache_type, NULL, ao2_cleanup); + RAII_VAR(struct stasis_topic *, topic, NULL, ao2_cleanup); + RAII_VAR(struct stasis_caching_topic *, caching_topic, NULL, stasis_caching_unsubscribe); + RAII_VAR(struct consumer *, consumer, NULL, ao2_cleanup); + RAII_VAR(struct stasis_subscription *, sub, NULL, stasis_unsubscribe); + RAII_VAR(struct stasis_message *, test_message1_1, NULL, ao2_cleanup); + RAII_VAR(struct stasis_message *, test_message2_1, NULL, ao2_cleanup); + RAII_VAR(struct stasis_message *, test_message2_2, NULL, ao2_cleanup); + RAII_VAR(struct stasis_message *, test_message1_clear, NULL, ao2_cleanup); + int actual_len; + struct stasis_cache_update *actual_update; + + switch (cmd) { + case TEST_INIT: + info->name = __func__; + info->category = test_category; + info->summary = "Test passing messages through cache topic unscathed."; + info->description = "Test passing messages through cache topic unscathed."; + return AST_TEST_NOT_RUN; + case TEST_EXECUTE: + break; + } + + cache_type = stasis_message_type_create("Cacheable"); + ast_test_validate(test, NULL != cache_type); + topic = stasis_topic_create("SomeTopic"); + ast_test_validate(test, NULL != topic); + caching_topic = stasis_caching_topic_create(topic, cache_test_data_id); + ast_test_validate(test, NULL != caching_topic); + consumer = consumer_create(1); + ast_test_validate(test, NULL != consumer); + sub = stasis_subscribe(stasis_caching_get_topic(caching_topic), consumer_exec, consumer); + ast_test_validate(test, NULL != sub); + ao2_ref(consumer, +1); + + test_message1_1 = cache_test_message_create(cache_type, "1", "1"); + ast_test_validate(test, NULL != test_message1_1); + test_message2_1 = cache_test_message_create(cache_type, "2", "1"); + ast_test_validate(test, NULL != test_message2_1); + + /* Post a couple of snapshots */ + stasis_publish(topic, test_message1_1); + stasis_publish(topic, test_message2_1); + actual_len = consumer_wait_for(consumer, 2); + ast_test_validate(test, 2 == actual_len); + + /* Check for new snapshot messages */ + ast_test_validate(test, stasis_cache_update() == stasis_message_type(consumer->messages_rxed[0])); + actual_update = stasis_message_data(consumer->messages_rxed[0]); + ast_test_validate(test, topic == actual_update->topic); + ast_test_validate(test, NULL == actual_update->old_snapshot); + ast_test_validate(test, test_message1_1 == actual_update->new_snapshot); + ast_test_validate(test, test_message1_1 == stasis_cache_get(caching_topic, cache_type, "1")); + /* stasis_cache_get returned a ref, so unref test_message1_1 */ + ao2_ref(test_message1_1, -1); + + ast_test_validate(test, stasis_cache_update() == stasis_message_type(consumer->messages_rxed[1])); + actual_update = stasis_message_data(consumer->messages_rxed[1]); + ast_test_validate(test, topic == actual_update->topic); + ast_test_validate(test, NULL == actual_update->old_snapshot); + ast_test_validate(test, test_message2_1 == actual_update->new_snapshot); + ast_test_validate(test, test_message2_1 == stasis_cache_get(caching_topic, cache_type, "2")); + /* stasis_cache_get returned a ref, so unref test_message2_1 */ + ao2_ref(test_message2_1, -1); + + /* Update snapshot 2 */ + test_message2_2 = cache_test_message_create(cache_type, "2", "2"); + ast_test_validate(test, NULL != test_message2_2); + stasis_publish(topic, test_message2_2); + + actual_len = consumer_wait_for(consumer, 3); + ast_test_validate(test, 3 == actual_len); + + actual_update = stasis_message_data(consumer->messages_rxed[2]); + ast_test_validate(test, topic == actual_update->topic); + ast_test_validate(test, test_message2_1 == actual_update->old_snapshot); + ast_test_validate(test, test_message2_2 == actual_update->new_snapshot); + ast_test_validate(test, test_message2_2 == stasis_cache_get(caching_topic, cache_type, "2")); + /* stasis_cache_get returned a ref, so unref test_message2_2 */ + ao2_ref(test_message2_2, -1); + + /* Clear snapshot 1 */ + test_message1_clear = stasis_cache_clear_create(cache_type, "1"); + ast_test_validate(test, NULL != test_message1_clear); + stasis_publish(topic, test_message1_clear); + + actual_len = consumer_wait_for(consumer, 4); + ast_test_validate(test, 4 == actual_len); + + actual_update = stasis_message_data(consumer->messages_rxed[3]); + ast_test_validate(test, topic == actual_update->topic); + ast_test_validate(test, test_message1_1 == actual_update->old_snapshot); + ast_test_validate(test, NULL == actual_update->new_snapshot); + ast_test_validate(test, NULL == stasis_cache_get(caching_topic, cache_type, "1")); + + return AST_TEST_PASS; +} + +static int unload_module(void) +{ + AST_TEST_UNREGISTER(message_type); + AST_TEST_UNREGISTER(message); + AST_TEST_UNREGISTER(subscription_messages); + AST_TEST_UNREGISTER(publish); + AST_TEST_UNREGISTER(unsubscribe_stops_messages); + AST_TEST_UNREGISTER(forward); + AST_TEST_UNREGISTER(cache_passthrough); + AST_TEST_UNREGISTER(cache); + return 0; +} + +static int load_module(void) +{ + AST_TEST_REGISTER(message_type); + AST_TEST_REGISTER(message); + AST_TEST_REGISTER(subscription_messages); + AST_TEST_REGISTER(publish); + AST_TEST_REGISTER(unsubscribe_stops_messages); + AST_TEST_REGISTER(forward); + AST_TEST_REGISTER(cache_passthrough); + AST_TEST_REGISTER(cache); + return AST_MODULE_LOAD_SUCCESS; +} + +AST_MODULE_INFO(ASTERISK_GPL_KEY, 0, "Stasis testing", + .load = load_module, + .unload = unload_module + ); |