summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--include/asterisk/channel.h117
-rw-r--r--include/asterisk/channel_internal.h1
-rw-r--r--include/asterisk/stasis.h506
-rw-r--r--main/asterisk.c6
-rw-r--r--main/asterisk.exports.in1
-rw-r--r--main/channel.c287
-rw-r--r--main/channel_internal_api.c21
-rw-r--r--main/manager.c200
-rw-r--r--main/pbx.c13
-rw-r--r--main/stasis.c514
-rw-r--r--main/stasis_cache.c443
-rw-r--r--main/stasis_message.c135
-rw-r--r--tests/test_stasis.c683
13 files changed, 2812 insertions, 115 deletions
diff --git a/include/asterisk/channel.h b/include/asterisk/channel.h
index f7e4d2d39..eee0828be 100644
--- a/include/asterisk/channel.h
+++ b/include/asterisk/channel.h
@@ -125,7 +125,6 @@ References:
#include "asterisk/abstract_jb.h"
#include "asterisk/astobj2.h"
-
#include "asterisk/poll-compat.h"
#if defined(__cplusplus) || defined(c_plusplus)
@@ -151,6 +150,7 @@ extern "C" {
#include "asterisk/channelstate.h"
#include "asterisk/ccss.h"
#include "asterisk/framehook.h"
+#include "asterisk/stasis.h"
#define DATASTORE_INHERIT_FOREVER INT_MAX
@@ -4102,4 +4102,119 @@ int ast_channel_dialed_causes_add(const struct ast_channel *chan, const struct a
void ast_channel_dialed_causes_clear(const struct ast_channel *chan);
struct ast_flags *ast_channel_flags(struct ast_channel *chan);
+
+/*!
+ * \since 12
+ * \brief Structure representing a snapshot of channel state.
+ *
+ * While not enforced programmatically, this object is shared across multiple
+ * threads, and should be threated as an immutable object.
+ */
+struct ast_channel_snapshot {
+ AST_DECLARE_STRING_FIELDS(
+ AST_STRING_FIELD(name); /*!< ASCII unique channel name */
+ AST_STRING_FIELD(accountcode); /*!< Account code for billing */
+ AST_STRING_FIELD(peeraccount); /*!< Peer account code for billing */
+ AST_STRING_FIELD(userfield); /*!< Userfield for CEL billing */
+ AST_STRING_FIELD(uniqueid); /*!< Unique Channel Identifier */
+ AST_STRING_FIELD(linkedid); /*!< Linked Channel Identifier -- gets propagated by linkage */
+ AST_STRING_FIELD(parkinglot); /*!< Default parking lot, if empty, default parking lot */
+ AST_STRING_FIELD(hangupsource); /*!< Who is responsible for hanging up this channel */
+ AST_STRING_FIELD(appl); /*!< Current application */
+ AST_STRING_FIELD(data); /*!< Data passed to current application */
+ AST_STRING_FIELD(context); /*!< Dialplan: Current extension context */
+ AST_STRING_FIELD(exten); /*!< Dialplan: Current extension number */
+ AST_STRING_FIELD(caller_name); /*!< Caller ID Name */
+ AST_STRING_FIELD(caller_number); /*!< Caller ID Number */
+ AST_STRING_FIELD(connected_name); /*!< Connected Line Name */
+ AST_STRING_FIELD(connected_number); /*!< Connected Line Number */
+ );
+
+ struct timeval creationtime; /*!< The time of channel creation */
+ enum ast_channel_state state; /*!< State of line */
+ int priority; /*!< Dialplan: Current extension priority */
+ int amaflags; /*!< AMA flags for billing */
+ int hangupcause; /*!< Why is the channel hanged up. See causes.h */
+ struct ast_flags flags; /*!< channel flags of AST_FLAG_ type */
+};
+
+/*!
+ * \since 12
+ * \brief Generate a snapshot of the channel state. This is an ao2 object, so
+ * ao2_cleanup() to deallocate.
+ *
+ * \param chan The channel from which to generate a snapshot
+ *
+ * \retval pointer on success (must be ast_freed)
+ * \retval NULL on error
+ */
+struct ast_channel_snapshot *ast_channel_snapshot_create(struct ast_channel *chan);
+
+/*!
+ * \since 12
+ * \brief Message type for \ref ast_channel_snapshot.
+ *
+ * \retval Message type for \ref ast_channel_snapshot.
+ */
+struct stasis_message_type *ast_channel_snapshot(void);
+
+/*!
+ * \since 12
+ * \brief A topic which publishes the events for a particular channel.
+ *
+ * \param chan Channel.
+ *
+ * \retval Topic for channel's events.
+ * \retval \c NULL if \a chan is \c NULL.
+ */
+struct stasis_topic *ast_channel_topic(struct ast_channel *chan);
+
+/*!
+ * \since 12
+ * \brief A topic which publishes the events for all channels.
+ * \retval Topic for all channel events.
+ */
+struct stasis_topic *ast_channel_topic_all(void);
+
+/*!
+ * \since 12
+ * \brief A caching topic which caches \ref ast_channel_snapshot messages from
+ * ast_channel_events_all(void).
+ *
+ * \retval Topic for all channel events.
+ */
+struct stasis_caching_topic *ast_channel_topic_all_cached(void);
+
+/*!
+ * \since 12
+ * \brief Variable set event.
+ */
+struct ast_channel_varset {
+ /*! Channel variable was set on (or NULL for global variable) */
+ struct ast_channel_snapshot *snapshot;
+ /*! Variable name */
+ char *variable;
+ /*! New value */
+ char *value;
+};
+
+/*!
+ * \since 12
+ * \brief Message type for \ref ast_channel_varset messages.
+ *
+ * \retval Message type for \ref ast_channel_varset messages.
+ */
+struct stasis_message_type *ast_channel_varset(void);
+
+/*!
+ * \since 12
+ * \brief Publish a \ref ast_channel_varset for a channel.
+ *
+ * \param chan Channel to pulish the event for, or \c NULL for 'none'.
+ * \param variable Name of the variable being set
+ * \param value Value.
+ */
+void ast_channel_publish_varset(struct ast_channel *chan,
+ const char *variable, const char *value);
+
#endif /* _ASTERISK_CHANNEL_H */
diff --git a/include/asterisk/channel_internal.h b/include/asterisk/channel_internal.h
index 1b01fe042..38776c1e1 100644
--- a/include/asterisk/channel_internal.h
+++ b/include/asterisk/channel_internal.h
@@ -23,4 +23,5 @@ struct ast_channel *__ast_channel_internal_alloc(void (*destructor)(void *obj),
void ast_channel_internal_finalize(struct ast_channel *chan);
int ast_channel_internal_is_finalized(struct ast_channel *chan);
void ast_channel_internal_cleanup(struct ast_channel *chan);
+void ast_channel_internal_setup_topics(struct ast_channel *chan);
diff --git a/include/asterisk/stasis.h b/include/asterisk/stasis.h
new file mode 100644
index 000000000..a6ab4212f
--- /dev/null
+++ b/include/asterisk/stasis.h
@@ -0,0 +1,506 @@
+/*
+ * Asterisk -- An open source telephony toolkit.
+ *
+ * Copyright (C) 2013, Digium, Inc.
+ *
+ * David M. Lee, II <dlee@digium.com>
+ *
+ * See http://www.asterisk.org for more information about
+ * the Asterisk project. Please do not directly contact
+ * any of the maintainers of this project for assistance;
+ * the project provides a web site, mailing lists and IRC
+ * channels for your use.
+ *
+ * This program is free software, distributed under the terms of
+ * the GNU General Public License Version 2. See the LICENSE file
+ * at the top of the source tree.
+ */
+
+#ifndef _ASTERISK_STASIS_H
+#define _ASTERISK_STASIS_H
+
+/*! \file
+ *
+ * \brief Stasis Message Bus API. See \ref stasis "Stasis Message Bus API" for
+ * detailed documentation.
+ *
+ * \author David M. Lee, II <dlee@digium.com>
+ * \since 12
+ *
+ * \page stasis Stasis Message Bus API
+ *
+ * \par Intro
+ *
+ * The Stasis Message Bus is a loosely typed mechanism for distributing messages
+ * within Asterisk. It is designed to be:
+ * - Loosely coupled; new message types can be added in seperate modules.
+ * - Easy to use; publishing and subscribing are straightforward operations.
+ * - Consistent memory management; all message bus objects are AO2 managed
+ * objects, using ao2_ref() and ao2_cleanup() to manage the reference
+ * counting.
+ *
+ * There are three main concepts for using the Stasis Message Bus:
+ * - \ref stasis_message
+ * - \ref stasis_topic
+ * - \ref stasis_subscription
+ *
+ * \par stasis_message
+ *
+ * Central to the Stasis Message Bus is the \ref stasis_message, the messages
+ * that are sent on the bus. These messages have:
+ * - a type (as defined by a \ref stasis_message_type)
+ * - a value - a \c void pointer to an AO2 object
+ * - a timestamp when it was created
+ *
+ * Once a \ref stasis_message has been created, it is immutable and cannot
+ * change. The same goes for the value of the message (although this cannot be
+ * enforced in code). Messages themselves are reference-counted, AO2 objects,
+ * along with their values. By being both reference counted and immutable,
+ * messages can be shared throughout the system without any concerns for
+ * threading. (Well, the objects must be allocated with \ref
+ * AO2_ALLOC_OPT_LOCK_MUTEX so that the reference counting operations are thread
+ * safe. But other than that, no worries).
+ *
+ * The type of a message is defined by an instance of \ref stasis_message_type,
+ * which can be created by calling stasis_message_type_create(). Message types
+ * are named, which is useful in debugging. It is recommended that the string
+ * name for a message type match the name of the struct that's stored in the
+ * message. For example, name for \ref stasis_cache_update's message type is \c
+ * "stasis_cache_update".
+ *
+ * \par stasis_topic
+ *
+ * A \ref stasis_topic is an object to which \ref stasis_subscriber's may be
+ * subscribed, and \ref stasis_message's may be published. Any message published
+ * to the topic is dispatched to all of its subscribers. The topic itself may be
+ * named, which is useful in debugging.
+ *
+ * Topics themselves are reference counted objects, and automagically
+ * unsubscribe all of their subscribers when they are destroyed. Topics are also
+ * thread safe, so no worries about publishing/subscribing/unsubscribing to a
+ * topic concurrently from multiple threads. It's also designed to handle the
+ * case of unsubscribing from a topic from within the subscription handler.
+ *
+ * \par Forwarding
+ *
+ * There is one special case of topics that's worth noting: forwarding
+ * messages. It's a fairly common use case to want to forward all the messages
+ * published on one topic to another one (for example, an aggregator topic that
+ * publishes all the events from a set of other topics). This can be
+ * accomplished easily using stasis_forward_all(). This sets up the forwarding
+ * between the two topics, and returns a \ref stasis_subscription, which can be
+ * unsubscribed to stop the forwarding.
+ *
+ * \par Caching
+ *
+ * Another common use case is to want to cache certain messages that are
+ * published on the bus. Usually these events are snapshots of the current state
+ * in the system, and it's desirable to query that state from the cache without
+ * locking the original object. It's also desirable for subscribers of the
+ * caching topic to receive messages that have both the old cache value and the
+ * new value being put into the cache. For this, we have
+ * stasis_caching_topic_create(), providing it with the topic which publishes
+ * the messages that you wish to cache, and a function that can identify
+ * cacheable messages.
+ *
+ * The returned \ref stasis_caching_topic provides a topic that forwards
+ * non-cacheable messages unchanged. A cacheable message is wrapped in a \ref
+ * stasis_cache_update message which provides the old snapshot (or \c NULL if
+ * this is a new cache entry), and the new snapshot (or \c NULL if the entry was
+ * removed from the cache). A stasis_cache_clear_create() message must be sent
+ * to the topic in order to remove entries from the cache.
+ *
+ * As with all things Stasis, the \ref stasis_caching_topic is a reference
+ * counted AO2 object.
+ *
+ * \par stasis_subscriber
+ *
+ * Any topic may be subscribed to by simply providing stasis_subscribe() the
+ * \ref stasis_topic to subscribe to, a handler function and \c void pointer to
+ * data that is passed back to the handler. Invocations on the subscription's
+ * handler are serialized, but different invocations may occur on different
+ * threads (this usually isn't important unless you use thread locals or
+ * something similar).
+ *
+ * Since the topic (by necessity) holds a reference to the subscription,
+ * reference counting alone is insufficient to terminate a subscription. In
+ * order to stop receiving messages, call stasis_unsubscribe() with your \ref
+ * stasis_subscription. This will remove the topic's reference to the
+ * subscription, and allow it to be destroyed when all of the other references
+ * are cleaned up.
+ */
+
+#include "asterisk/utils.h"
+
+/*! @{ */
+
+/*!
+ * \brief Metadata about a \ref stasis_message.
+ * \since 12
+ */
+struct stasis_message_type;
+
+/*!
+ * \brief Register a new message type.
+ *
+ * \ref stasis_message_type is an AO2 object, so ao2_cleanup() when you're done
+ * with it.
+ *
+ * \param name Name of the new type.
+ * \return Pointer to the new type.
+ * \return \c NULL on error.
+ * \since 12
+ */
+struct stasis_message_type *stasis_message_type_create(const char *name);
+
+/*!
+ * \brief Gets the name of a given message type
+ * \param type The type to get.
+ * \return Name of the type.
+ * \return \c NULL if \a type is \c NULL.
+ * \since 12
+ */
+const char *stasis_message_type_name(const struct stasis_message_type *type);
+
+/*!
+ * \brief Opaque type for a Stasis message.
+ * \since 12
+ */
+struct stasis_message;
+
+/*!
+ * \brief Create a new message.
+ *
+ * This message is an \c ao2 object, and must be ao2_cleanup()'ed when you are done
+ * with it. Messages are also immutable, and must not be modified after they
+ * are initialized. Especially the \a data in the message.
+ *
+ * \param type Type of the message
+ * \param data Immutable data that is the actual contents of the message
+ * \return New message
+ * \return \c NULL on error
+ * \since 12
+ */
+struct stasis_message *stasis_message_create(struct stasis_message_type *type, void *data);
+
+/*!
+ * \brief Get the message type for a \ref stasis_message.
+ * \param msg Message to type
+ * \return Type of \a msg
+ * \return \c NULL if \a msg is \c NULL.
+ * \since 12
+ */
+struct stasis_message_type *stasis_message_type(const struct stasis_message *msg);
+
+/*!
+ * \brief Get the data contained in a message.
+ * \param msg Message.
+ * \return Immutable data pointer
+ * \return \c NULL if msg is \c NULL.
+ * \since 12
+ */
+void *stasis_message_data(const struct stasis_message *msg);
+
+/*!
+ * \brief Get the time when a message was created.
+ * \param msg Message.
+ * \return Pointer to the \a timeval when the message was created.
+ * \return \c NULL if msg is \c NULL.
+ * \since 12
+ */
+const struct timeval *stasis_message_timestamp(const struct stasis_message *msg);
+
+/*! @} */
+
+/*! @{ */
+
+/*!
+ * \brief A topic to which messages may be posted, and subscribers, well, subscribe
+ * \since 12
+ */
+struct stasis_topic;
+
+/*!
+ * \brief Create a new topic.
+ * \param name Name of the new topic.
+ * \return New topic instance.
+ * \return \c NULL on error.
+ * \since 12
+ */
+struct stasis_topic *stasis_topic_create(const char *name);
+
+/*!
+ * \brief Return the name of a topic.
+ * \param topic Topic.
+ * \return Name of the topic.
+ * \return \c NULL if topic is \c NULL.
+ * \since 12
+ */
+const char *stasis_topic_name(const struct stasis_topic *topic);
+
+/*!
+ * \brief Publish a message to a topic's subscribers.
+ * \param topic Topic.
+ * \param message Message to publish.
+ * \since 12
+ */
+void stasis_publish(struct stasis_topic *topic, struct stasis_message *message);
+
+/*!
+ * \brief Publish a message from a specified topic to all the subscribers of a
+ * possibly different topic.
+ * \param topic Topic to publish message to.
+ * \param topic Original topic message was from.
+ * \param message Message
+ * \since 12
+ */
+void stasis_forward_message(struct stasis_topic *topic,
+ struct stasis_topic *publisher_topic,
+ struct stasis_message *message);
+
+/*! @} */
+
+/*! @{ */
+
+/*!
+ * \brief Opaque type for a Stasis subscription.
+ * \since 12
+ */
+struct stasis_subscription;
+
+/*!
+ * \brief Callback function type for Stasis subscriptions.
+ * \param data Data field provided with subscription.
+ * \param topic Topic to which the message was published.
+ * \param message Published message.
+ * \since 12
+ */
+typedef void (*stasis_subscription_cb)(void *data, struct stasis_subscription *sub, struct stasis_topic *topic, struct stasis_message *message);
+
+/*!
+ * \brief Create a subscription.
+ *
+ * In addition to being AO2 managed memory (requiring an ao2_cleanup() to free
+ * up this reference), the subscription must be explicitly unsubscribed from its
+ * topic using stasis_unsubscribe().
+ *
+ * The invocations of the callback are serialized, but may not always occur on
+ * the same thread. The invocation order of different subscriptions is
+ * unspecified.
+ *
+ * \param topic Topic to subscribe to.
+ * \param callback Callback function for subscription messages.
+ * \param data Data to be passed to the callback, in addition to the message.
+ * \return New \ref stasis_subscription object.
+ * \return \c NULL on error.
+ * \since 12
+ */
+struct stasis_subscription *stasis_subscribe(struct stasis_topic *topic,
+ stasis_subscription_cb callback,
+ void *data);
+
+/*!
+ * \brief Cancel a subscription.
+ *
+ * Note that in an asynchronous system, there may still be messages queued or
+ * in transit to the subscription's callback. These will still be delivered.
+ * There will be a final 'SubscriptionCancelled' message, indicating the
+ * delivery of the final message.
+ *
+ * \param subscription Subscription to cancel.
+ * \since 12
+ */
+void stasis_unsubscribe(struct stasis_subscription *subscription);
+
+/*!
+ * \brief Create a subscription which forwards all messages from one topic to
+ * another.
+ *
+ * Note that the \a topic parameter of the invoked callback will the be \a topic
+ * the message was sent to, not the topic the subscriber subscribed to.
+ *
+ * \param from_topic Topic to forward.
+ * \param to_topic Destination topic of forwarded messages.
+ * \return New forwarding subscription.
+ * \return \c NULL on error.
+ * \since 12
+ */
+struct stasis_subscription *stasis_forward_all(struct stasis_topic *from_topic, struct stasis_topic *to_topic);
+
+/*!
+ * \brief Get the unique ID for the subscription.
+ *
+ * \param sub Subscription for which to get the unique ID.
+ * \return Unique ID for the subscription.
+ * \since 12
+ */
+const char *stasis_subscription_uniqueid(const struct stasis_subscription *sub);
+
+/*!
+ * \brief Returns whether a subscription is currently subscribed.
+ *
+ * Note that there may still be messages queued up to be dispatched to this
+ * subscription, but the stasis_subscription_final_message() has been enqueued.
+ *
+ * \param sub Subscription to check
+ * \return False (zero) if subscription is not subscribed.
+ * \return True (non-zero) if still subscribed.
+ */
+int stasis_subscription_is_subscribed(const struct stasis_subscription *sub);
+
+/*!
+ * \brief Determine whether a message is the final message to be received on a subscription.
+ *
+ * \param sub Subscription on which the message was received.
+ * \param msg Message to check.
+ * \return zero if the provided message is not the final message.
+ * \return non-zero if the provided message is the final message.
+ * \since 12
+ */
+int stasis_subscription_final_message(struct stasis_subscription *sub, struct stasis_message *msg);
+
+/*!
+ * \brief Holds details about changes to subscriptions for the specified topic
+ * \since 12
+ */
+struct stasis_subscription_change {
+ AST_DECLARE_STRING_FIELDS(
+ AST_STRING_FIELD(uniqueid); /*!< The unique ID associated with this subscription */
+ AST_STRING_FIELD(description); /*!< The description of the change to the subscription associated with the uniqueid */
+ );
+ struct stasis_topic *topic; /*!< The topic the subscription is/was subscribing to */
+};
+
+/*!
+ * \brief Gets the message type for subscription change notices
+ * \return The stasis_message_type for subscription change notices
+ * \since 12
+ */
+struct stasis_message_type *stasis_subscription_change(void);
+
+/*! @} */
+
+/*! @{ */
+
+/*!
+ * \brief A topic wrapper, which caches certain messages.
+ * \since 12
+ */
+struct stasis_caching_topic;
+
+/*!
+ * \brief Message type for cache update messages.
+ * \return Message type for cache update messages.
+ * \since 12
+ */
+struct stasis_message_type *stasis_cache_update(void);
+
+/*!
+ * \brief Cache update message
+ * \since 12
+ */
+struct stasis_cache_update {
+ /*! \brief Topic that published \c new_snapshot */
+ struct stasis_topic *topic;
+ /*! \brief Convenience reference to snapshot type */
+ struct stasis_message_type *type;
+ /*! \brief Old value from the cache */
+ struct stasis_message *old_snapshot;
+ /*! \brief New value */
+ struct stasis_message *new_snapshot;
+};
+
+/*!
+ * \brief A message which instructs the caching topic to remove an entry from its cache.
+ * \param type Message type.
+ * \param id Unique id of the snapshot to clear.
+ * \return Message which, when sent to the \a topic, will clear the item from the cache.
+ * \return \c NULL on error.
+ * \since 12
+ */
+struct stasis_message *stasis_cache_clear_create(struct stasis_message_type *type, const char *id);
+
+/*!
+ * \brief Callback extract a unique identity from a snapshot message.
+ *
+ * This identity is unique to the underlying object of the snapshot, such as the
+ * UniqueId field of a channel.
+ *
+ * \param message Message to extract id from.
+ * \return String representing the snapshot's id.
+ * \return \c NULL if the message_type of the message isn't a handled snapshot.
+ * \since 12
+ */
+typedef const char *(*snapshot_get_id)(struct stasis_message *message);
+
+/*!
+ * \brief Create a topic which monitors and caches messages from another topic.
+ *
+ * The idea is that some topics publish 'snapshots' of some other object's state
+ * that should be cached. When these snapshot messages are received, the cache
+ * is updated, and a stasis_cache_update() message is forwarded, which has both
+ * the original snapshot message and the new message.
+ *
+ * \param original_topic Topic publishing snapshot messages.
+ * \param id_fn Callback to extract the id from a snapshot message.
+ * \return New topic which changes snapshot messages to stasis_cache_update()
+ * messages, and forwards all other messages from the original topic.
+ * \since 12
+ */
+struct stasis_caching_topic *stasis_caching_topic_create(struct stasis_topic *original_topic, snapshot_get_id id_fn);
+
+/*!
+ * Unsubscribes a caching topic from its upstream topic.
+ * \param caching_topic Caching topic to unsubscribe
+ * \since 12
+ */
+void stasis_caching_unsubscribe(struct stasis_caching_topic *caching_topic);
+
+/*!
+ * \brief Returns the topic of cached events from a caching topics.
+ * \param caching_topic The caching topic.
+ * \return The topic that publishes cache update events, along with passthrough events
+ * from the underlying topic.
+ * \return \c NULL if \a caching_topic is \c NULL.
+ * \since 12
+ */
+struct stasis_topic *stasis_caching_get_topic(struct stasis_caching_topic *caching_topic);
+
+/*!
+ * \brief Retrieve an item from the cache.
+ * \param caching_topic The topic returned from stasis_caching_topic_create().
+ * \param type Type of message to retrieve.
+ * \param id Identity of the snapshot to retrieve.
+ * \return Message from the cache. The cache still owns the message, so
+ * ao2_ref() if you want to keep it.
+ * \return \c NULL if message is not found.
+ * \since 12
+ */
+struct stasis_message *stasis_cache_get(struct stasis_caching_topic *caching_topic,
+ struct stasis_message_type *type,
+ const char *id);
+
+/*! @} */
+
+/*! @{ */
+
+/*!
+ * \brief Initialize the Stasis subsystem
+ * \return 0 on success.
+ * \return Non-zero on error.
+ * \since 12
+ */
+int stasis_init(void);
+
+/*!
+ * \private
+ * \brief called by stasis_init() for cache initialization.
+ * \return 0 on success.
+ * \return Non-zero on error.
+ * \since 12
+ */
+int stasis_cache_init(void);
+
+/*! @} */
+
+#endif /* _ASTERISK_STASIS_H */
diff --git a/main/asterisk.c b/main/asterisk.c
index 1d5371938..4e5e58c05 100644
--- a/main/asterisk.c
+++ b/main/asterisk.c
@@ -240,6 +240,7 @@ int daemon(int, int); /* defined in libresolv of all places */
#include "asterisk/aoc.h"
#include "asterisk/uuid.h"
#include "asterisk/sorcery.h"
+#include "asterisk/stasis.h"
#include "../defaults.h"
@@ -4120,6 +4121,11 @@ int main(int argc, char *argv[])
exit(1);
}
+ if (stasis_init()) {
+ printf("Stasis initialization failed.\n%s", term_quit());
+ exit(1);
+ }
+
ast_makesocket();
sigemptyset(&sigs);
sigaddset(&sigs, SIGHUP);
diff --git a/main/asterisk.exports.in b/main/asterisk.exports.in
index 49d3a44a4..3157b8319 100644
--- a/main/asterisk.exports.in
+++ b/main/asterisk.exports.in
@@ -32,6 +32,7 @@
LINKER_SYMBOL_PREFIXdialed_interface_info;
LINKER_SYMBOL_PREFIXstrsep;
LINKER_SYMBOL_PREFIXsetenv;
+ LINKER_SYMBOL_PREFIXstasis_*;
LINKER_SYMBOL_PREFIXunsetenv;
LINKER_SYMBOL_PREFIXstrcasestr;
LINKER_SYMBOL_PREFIXstrnlen;
diff --git a/main/channel.c b/main/channel.c
index 9c8f32cb5..df0a67b3b 100644
--- a/main/channel.c
+++ b/main/channel.c
@@ -152,6 +152,15 @@ static AST_RWLIST_HEAD_STATIC(backends, chanlist);
/*! \brief All active channels on the system */
static struct ao2_container *channels;
+/*! \brief Message type for channel snapshot events */
+static struct stasis_message_type *__channel_snapshot;
+
+static struct stasis_message_type *__channel_varset;
+
+struct stasis_topic *__channel_topic_all;
+
+struct stasis_caching_topic *__channel_topic_all_cached;
+
/*! \brief map AST_CAUSE's to readable string representations
*
* \ref causes.h
@@ -214,6 +223,77 @@ static const struct causes_map causes[] = {
{ AST_CAUSE_INTERWORKING, "INTERWORKING", "Interworking, unspecified" },
};
+static void publish_channel_state(struct ast_channel *chan)
+{
+ RAII_VAR(struct ast_channel_snapshot *, snapshot, NULL, ao2_cleanup);
+ RAII_VAR(struct stasis_message *, message, NULL, ao2_cleanup);
+
+ snapshot = ast_channel_snapshot_create(chan);
+ if (!snapshot) {
+ ast_log(LOG_ERROR, "Allocation error\n");
+ return;
+ }
+
+ message = stasis_message_create(ast_channel_snapshot(), snapshot);
+ if (!message) {
+ return;
+ }
+
+ ast_assert(ast_channel_topic(chan) != NULL);
+ stasis_publish(ast_channel_topic(chan), message);
+}
+
+static void channel_varset_dtor(void *obj)
+{
+ struct ast_channel_varset *event = obj;
+ ao2_cleanup(event->snapshot);
+ ast_free(event->variable);
+ ast_free(event->value);
+}
+
+void ast_channel_publish_varset(struct ast_channel *chan, const char *name, const char *value)
+{
+ RAII_VAR(struct ast_channel_varset *, event, NULL, ao2_cleanup);
+ RAII_VAR(struct stasis_message *, msg, NULL, ao2_cleanup);
+
+ event = ao2_alloc(sizeof(*event), channel_varset_dtor);
+ if (!event) {
+ return;
+ }
+
+ if (chan) {
+ event->snapshot = ast_channel_snapshot_create(chan);
+ if (event->snapshot == NULL) {
+ return;
+ }
+ }
+ event->variable = ast_strdup(name);
+ event->value = ast_strdup(value);
+ if (event->variable == NULL || event->value == NULL) {
+ return;
+ }
+
+ msg = stasis_message_create(ast_channel_varset(), event);
+ if (!msg) {
+ return;
+ }
+
+ if (chan) {
+ stasis_publish(ast_channel_topic(chan), msg);
+ } else {
+ stasis_publish(ast_channel_topic_all(), msg);
+ }
+}
+
+
+static void publish_cache_clear(struct ast_channel *chan)
+{
+ RAII_VAR(struct stasis_message *, message, NULL, ao2_cleanup);
+
+ message = stasis_cache_clear_create(ast_channel_snapshot(), ast_channel_uniqueid(chan));
+ stasis_publish(ast_channel_topic(chan), message);
+}
+
struct ast_variable *ast_channeltype_list(void)
{
struct chanlist *cl;
@@ -1073,6 +1153,8 @@ __ast_channel_alloc_ap(int needqueue, int state, const char *cid_num, const char
ast_channel_linkedid_set(tmp, ast_channel_uniqueid(tmp));
}
+ ast_channel_internal_setup_topics(tmp);
+
if (!ast_strlen_zero(name_fmt)) {
char *slash, *slash2;
/* Almost every channel is calling this function, and setting the name via the ast_string_field_build() call.
@@ -1145,34 +1227,7 @@ __ast_channel_alloc_ap(int needqueue, int state, const char *cid_num, const char
* a lot of data into this func to do it here!
*/
if (ast_get_channel_tech(tech) || (tech2 && ast_get_channel_tech(tech2))) {
- /*** DOCUMENTATION
- <managerEventInstance>
- <synopsis>Raised when a new channel is created.</synopsis>
- <syntax>
- <xi:include xpointer="xpointer(/docs/managerEvent[@name='Newstate']/managerEventInstance/syntax/parameter[@name='ChannelState'])" />
- <xi:include xpointer="xpointer(/docs/managerEvent[@name='Newstate']/managerEventInstance/syntax/parameter[@name='ChannelStateDesc'])" />
- </syntax>
- </managerEventInstance>
- ***/
- ast_manager_event(tmp, EVENT_FLAG_CALL, "Newchannel",
- "Channel: %s\r\n"
- "ChannelState: %d\r\n"
- "ChannelStateDesc: %s\r\n"
- "CallerIDNum: %s\r\n"
- "CallerIDName: %s\r\n"
- "AccountCode: %s\r\n"
- "Exten: %s\r\n"
- "Context: %s\r\n"
- "Uniqueid: %s\r\n",
- ast_channel_name(tmp),
- state,
- ast_state2str(state),
- S_OR(cid_num, ""),
- S_OR(cid_name, ""),
- ast_channel_accountcode(tmp),
- S_OR(exten, ""),
- S_OR(context, ""),
- ast_channel_uniqueid(tmp));
+ publish_channel_state(tmp);
}
ast_channel_internal_finalize(tmp);
@@ -2893,39 +2948,9 @@ int ast_hangup(struct ast_channel *chan)
ast_channel_unlock(chan);
ast_cc_offer(chan);
- /*** DOCUMENTATION
- <managerEventInstance>
- <synopsis>Raised when a channel is hung up.</synopsis>
- <syntax>
- <parameter name="Cause">
- <para>A numeric cause code for why the channel was hung up.</para>
- </parameter>
- <parameter name="Cause-txt">
- <para>A description of why the channel was hung up.</para>
- </parameter>
- </syntax>
- </managerEventInstance>
- ***/
- ast_manager_event(chan, EVENT_FLAG_CALL, "Hangup",
- "Channel: %s\r\n"
- "Uniqueid: %s\r\n"
- "CallerIDNum: %s\r\n"
- "CallerIDName: %s\r\n"
- "ConnectedLineNum: %s\r\n"
- "ConnectedLineName: %s\r\n"
- "AccountCode: %s\r\n"
- "Cause: %d\r\n"
- "Cause-txt: %s\r\n",
- ast_channel_name(chan),
- ast_channel_uniqueid(chan),
- S_COR(ast_channel_caller(chan)->id.number.valid, ast_channel_caller(chan)->id.number.str, "<unknown>"),
- S_COR(ast_channel_caller(chan)->id.name.valid, ast_channel_caller(chan)->id.name.str, "<unknown>"),
- S_COR(ast_channel_connected(chan)->id.number.valid, ast_channel_connected(chan)->id.number.str, "<unknown>"),
- S_COR(ast_channel_connected(chan)->id.name.valid, ast_channel_connected(chan)->id.name.str, "<unknown>"),
- ast_channel_accountcode(chan),
- ast_channel_hangupcause(chan),
- ast_cause2str(ast_channel_hangupcause(chan))
- );
+
+ publish_channel_state(chan);
+ publish_cache_clear(chan);
if (ast_channel_cdr(chan) && !ast_test_flag(ast_channel_cdr(chan), AST_CDR_FLAG_BRIDGED) &&
!ast_test_flag(ast_channel_cdr(chan), AST_CDR_FLAG_POST_DISABLED) &&
@@ -7435,47 +7460,7 @@ int ast_setstate(struct ast_channel *chan, enum ast_channel_state state)
* we override what they are saying the state is and things go amuck. */
ast_devstate_changed_literal(AST_DEVICE_UNKNOWN, (ast_test_flag(ast_channel_flags(chan), AST_FLAG_DISABLE_DEVSTATE_CACHE) ? AST_DEVSTATE_NOT_CACHABLE : AST_DEVSTATE_CACHABLE), name);
- /* setstate used to conditionally report Newchannel; this is no more */
- /*** DOCUMENTATION
- <managerEventInstance>
- <synopsis>Raised when a channel's state changes.</synopsis>
- <syntax>
- <parameter name="ChannelState">
- <para>A numeric code for the channel's current state, related to ChannelStateDesc</para>
- </parameter>
- <parameter name="ChannelStateDesc">
- <enumlist>
- <enum name="Down"/>
- <enum name="Rsrvd"/>
- <enum name="OffHook"/>
- <enum name="Dialing"/>
- <enum name="Ring"/>
- <enum name="Ringing"/>
- <enum name="Up"/>
- <enum name="Busy"/>
- <enum name="Dialing Offhook"/>
- <enum name="Pre-ring"/>
- <enum name="Unknown"/>
- </enumlist>
- </parameter>
- </syntax>
- </managerEventInstance>
- ***/
- ast_manager_event(chan, EVENT_FLAG_CALL, "Newstate",
- "Channel: %s\r\n"
- "ChannelState: %d\r\n"
- "ChannelStateDesc: %s\r\n"
- "CallerIDNum: %s\r\n"
- "CallerIDName: %s\r\n"
- "ConnectedLineNum: %s\r\n"
- "ConnectedLineName: %s\r\n"
- "Uniqueid: %s\r\n",
- ast_channel_name(chan), ast_channel_state(chan), ast_state2str(ast_channel_state(chan)),
- S_COR(ast_channel_caller(chan)->id.number.valid, ast_channel_caller(chan)->id.number.str, ""),
- S_COR(ast_channel_caller(chan)->id.name.valid, ast_channel_caller(chan)->id.name.str, ""),
- S_COR(ast_channel_connected(chan)->id.number.valid, ast_channel_connected(chan)->id.number.str, ""),
- S_COR(ast_channel_connected(chan)->id.name.valid, ast_channel_connected(chan)->id.name.str, ""),
- ast_channel_uniqueid(chan));
+ publish_channel_state(chan);
return 0;
}
@@ -8644,6 +8629,14 @@ static void prnt_channel_key(void *v_obj, void *where, ao2_prnt_fn *prnt)
static void channels_shutdown(void)
{
+ ao2_cleanup(__channel_snapshot);
+ __channel_snapshot = NULL;
+ ao2_cleanup(__channel_varset);
+ __channel_varset = NULL;
+ ao2_cleanup(__channel_topic_all);
+ __channel_topic_all = NULL;
+ stasis_caching_unsubscribe(__channel_topic_all_cached);
+ __channel_topic_all_cached = NULL;
ast_data_unregister(NULL);
ast_cli_unregister_multiple(cli_channel, ARRAY_LEN(cli_channel));
if (channels) {
@@ -8653,6 +8646,16 @@ static void channels_shutdown(void)
}
}
+static const char *channel_snapshot_get_id(struct stasis_message *message)
+{
+ struct ast_channel_snapshot *snapshot;
+ if (ast_channel_snapshot() != stasis_message_type(message)) {
+ return NULL;
+ }
+ snapshot = stasis_message_data(message);
+ return snapshot->uniqueid;
+}
+
void ast_channels_init(void)
{
channels = ao2_container_alloc(NUM_CHANNEL_BUCKETS,
@@ -8661,6 +8664,12 @@ void ast_channels_init(void)
ao2_container_register("channels", channels, prnt_channel_key);
}
+ __channel_snapshot = stasis_message_type_create("ast_channel_snapshot");
+ __channel_varset = stasis_message_type_create("ast_channel_varset");
+
+ __channel_topic_all = stasis_topic_create("ast_channel_topic_all");
+ __channel_topic_all_cached = stasis_caching_topic_create(__channel_topic_all, channel_snapshot_get_id);
+
ast_cli_register_multiple(cli_channel, ARRAY_LEN(cli_channel));
ast_data_register_multiple_core(channel_providers, ARRAY_LEN(channel_providers));
@@ -8668,6 +8677,7 @@ void ast_channels_init(void)
ast_plc_reload();
ast_register_atexit(channels_shutdown);
+
}
/*! \brief Print call group and pickup group ---*/
@@ -11241,6 +11251,79 @@ int ast_channel_get_cc_agent_type(struct ast_channel *chan, char *agent_type, si
return 0;
}
+static void ast_channel_snapshot_dtor(void *obj)
+{
+ struct ast_channel_snapshot *snapshot = obj;
+ ast_string_field_free_memory(snapshot);
+}
+
+struct ast_channel_snapshot *ast_channel_snapshot_create(struct ast_channel *chan)
+{
+ RAII_VAR(struct ast_channel_snapshot *, snapshot, NULL, ao2_cleanup);
+
+ snapshot = ao2_alloc(sizeof(*snapshot), ast_channel_snapshot_dtor);
+ if (ast_string_field_init(snapshot, 1024)) {
+ return NULL;
+ }
+
+ ast_string_field_set(snapshot, name, ast_channel_name(chan));
+ ast_string_field_set(snapshot, accountcode, ast_channel_accountcode(chan));
+ ast_string_field_set(snapshot, peeraccount, ast_channel_peeraccount(chan));
+ ast_string_field_set(snapshot, userfield, ast_channel_userfield(chan));
+ ast_string_field_set(snapshot, uniqueid, ast_channel_uniqueid(chan));
+ ast_string_field_set(snapshot, linkedid, ast_channel_linkedid(chan));
+ ast_string_field_set(snapshot, parkinglot, ast_channel_parkinglot(chan));
+ ast_string_field_set(snapshot, hangupsource, ast_channel_hangupsource(chan));
+ if (ast_channel_appl(chan)) {
+ ast_string_field_set(snapshot, appl, ast_channel_appl(chan));
+ }
+ if (ast_channel_data(chan)) {
+ ast_string_field_set(snapshot, data, ast_channel_data(chan));
+ }
+ ast_string_field_set(snapshot, context, ast_channel_context(chan));
+ ast_string_field_set(snapshot, exten, ast_channel_exten(chan));
+
+ ast_string_field_set(snapshot, caller_name,
+ S_COR(ast_channel_caller(chan)->id.name.valid, ast_channel_caller(chan)->id.name.str, ""));
+ ast_string_field_set(snapshot, caller_number,
+ S_COR(ast_channel_caller(chan)->id.number.valid, ast_channel_caller(chan)->id.number.str, ""));
+
+ ast_string_field_set(snapshot, connected_name,
+ S_COR(ast_channel_connected(chan)->id.name.valid, ast_channel_connected(chan)->id.name.str, ""));
+ ast_string_field_set(snapshot, connected_number,
+ S_COR(ast_channel_connected(chan)->id.number.valid, ast_channel_connected(chan)->id.number.str, ""));
+
+ snapshot->creationtime = ast_channel_creationtime(chan);
+ snapshot->state = ast_channel_state(chan);
+ snapshot->priority = ast_channel_priority(chan);
+ snapshot->amaflags = ast_channel_amaflags(chan);
+ snapshot->hangupcause = ast_channel_hangupcause(chan);
+ snapshot->flags = *ast_channel_flags(chan);
+
+ ao2_ref(snapshot, +1);
+ return snapshot;
+}
+
+struct stasis_message_type *ast_channel_varset(void)
+{
+ return __channel_varset;
+}
+
+struct stasis_message_type *ast_channel_snapshot(void)
+{
+ return __channel_snapshot;
+}
+
+struct stasis_topic *ast_channel_topic_all(void)
+{
+ return __channel_topic_all;
+}
+
+struct stasis_caching_topic *ast_channel_topic_all_cached(void)
+{
+ return __channel_topic_all_cached;
+}
+
/* DO NOT PUT ADDITIONAL FUNCTIONS BELOW THIS BOUNDARY
*
* ONLY FUNCTIONS FOR PROVIDING BACKWARDS ABI COMPATIBILITY BELONG HERE
diff --git a/main/channel_internal_api.c b/main/channel_internal_api.c
index 3f892ddef..8cc2e6c62 100644
--- a/main/channel_internal_api.c
+++ b/main/channel_internal_api.c
@@ -195,6 +195,8 @@ struct ast_channel {
char dtmf_digit_to_emulate; /*!< Digit being emulated */
char sending_dtmf_digit; /*!< Digit this channel is currently sending out. (zero if not sending) */
struct timeval sending_dtmf_tv; /*!< The time this channel started sending the current digit. (Invalid if sending_dtmf_digit is zero.) */
+ struct stasis_topic *topic; /*!< Topic for all channel's events */
+ struct stasis_subscription *forwarder; /*!< Subscription for event forwarding to all topic */
};
/* AST_DATA definitions, which will probably have to be re-thought since the channel will be opaque */
@@ -1364,6 +1366,12 @@ void ast_channel_internal_cleanup(struct ast_channel *chan)
}
ast_string_field_free_memory(chan);
+
+ stasis_unsubscribe(chan->forwarder);
+ chan->forwarder = NULL;
+
+ ao2_cleanup(chan->topic);
+ chan->topic = NULL;
}
void ast_channel_internal_finalize(struct ast_channel *chan)
@@ -1375,3 +1383,16 @@ int ast_channel_internal_is_finalized(struct ast_channel *chan)
{
return chan->finalized;
}
+
+struct stasis_topic *ast_channel_topic(struct ast_channel *chan)
+{
+ return chan->topic;
+}
+
+void ast_channel_internal_setup_topics(struct ast_channel *chan)
+{
+ ast_assert(chan->topic == NULL);
+ ast_assert(chan->forwarder == NULL);
+ chan->topic = stasis_topic_create(chan->uniqueid);
+ chan->forwarder = stasis_forward_all(chan->topic, ast_channel_topic_all());
+}
diff --git a/main/manager.c b/main/manager.c
index fc0ec2631..10a3a3397 100644
--- a/main/manager.c
+++ b/main/manager.c
@@ -91,6 +91,7 @@ ASTERISK_FILE_VERSION(__FILE__, "$Revision$")
#include "asterisk/aoc.h"
#include "asterisk/stringfields.h"
#include "asterisk/presencestate.h"
+#include "asterisk/stasis.h"
/*** DOCUMENTATION
<manager name="Ping" language="en_US">
@@ -963,6 +964,73 @@ ASTERISK_FILE_VERSION(__FILE__, "$Revision$")
manager.conf will be present upon starting a new session.</para>
</description>
</manager>
+ <managerEvent language="en_US" name="Newchannel">
+ <managerEventInstance class="EVENT_FLAG_CALL">
+ <synopsis>Raised when a new channel is created.</synopsis>
+ <syntax>
+ <parameter name="Channel">
+ </parameter>
+ <parameter name="ChannelState">
+ <para>A numeric code for the channel's current state, related to ChannelStateDesc</para>
+ </parameter>
+ <parameter name="ChannelStateDesc">
+ <enumlist>
+ <enum name="Down"/>
+ <enum name="Rsrvd"/>
+ <enum name="OffHook"/>
+ <enum name="Dialing"/>
+ <enum name="Ring"/>
+ <enum name="Ringing"/>
+ <enum name="Up"/>
+ <enum name="Busy"/>
+ <enum name="Dialing Offhook"/>
+ <enum name="Pre-ring"/>
+ <enum name="Unknown"/>
+ </enumlist>
+ </parameter>
+ <parameter name="CallerIDNum">
+ </parameter>
+ <parameter name="CallerIDName">
+ </parameter>
+ <parameter name="ConnectedLineNum">
+ </parameter>
+ <parameter name="ConnectedLineName">
+ </parameter>
+ <parameter name="AccountCode">
+ </parameter>
+ <parameter name="Context">
+ </parameter>
+ <parameter name="Exten">
+ </parameter>
+ <parameter name="Priority">
+ </parameter>
+ <parameter name="Uniqueid">
+ </parameter>
+ <parameter name="Cause">
+ <para>A numeric cause code for why the channel was hung up.</para>
+ </parameter>
+ <parameter name="Cause-txt">
+ <para>A description of why the channel was hung up.</para>
+ </parameter>
+ </syntax>
+ </managerEventInstance>
+ </managerEvent>
+ <managerEvent language="en_US" name="Newstate">
+ <managerEventInstance class="EVENT_FLAG_CALL">
+ <synopsis>Raised when a channel's state changes.</synopsis>
+ <syntax>
+ <xi:include xpointer="xpointer(/docs/managerEvent[@name='Newchannel']/managerEventInstance/syntax/parameter)" />
+ </syntax>
+ </managerEventInstance>
+ </managerEvent>
+ <managerEvent language="en_US" name="Hangup">
+ <managerEventInstance class="EVENT_FLAG_CALL">
+ <synopsis>Raised when a channel is hung up.</synopsis>
+ <syntax>
+ <xi:include xpointer="xpointer(/docs/managerEvent[@name='Newchannel']/managerEventInstance/syntax/parameter)" />
+ </syntax>
+ </managerEventInstance>
+ </managerEvent>
***/
/*! \addtogroup Group_AMI AMI functions
@@ -1060,6 +1128,8 @@ static const struct {
{{ "restart", "gracefully", NULL }},
};
+static struct stasis_subscription *channel_state_sub;
+
static void acl_change_event_cb(const struct ast_event *event, void *userdata);
static void acl_change_event_subscribe(void)
@@ -7376,6 +7446,127 @@ static void load_channelvars(struct ast_variable *var)
AST_RWLIST_UNLOCK(&channelvars);
}
+/*!
+ * \brief Generate the AMI message body from a channel snapshot
+ * \internal
+ *
+ * \param snapshot the channel snapshot for which to generate an AMI message body
+ *
+ * \retval NULL on error
+ * \retval ast_str* on success (must be ast_freed by caller)
+ */
+static struct ast_str *manager_build_channel_state_string(const struct ast_channel_snapshot *snapshot)
+{
+ struct ast_str *out = ast_str_create(1024);
+ int res = 0;
+ if (!out) {
+ return NULL;
+ }
+ res = ast_str_set(&out, 0,
+ "Channel: %s\r\n"
+ "ChannelState: %d\r\n"
+ "ChannelStateDesc: %s\r\n"
+ "CallerIDNum: %s\r\n"
+ "CallerIDName: %s\r\n"
+ "ConnectedLineNum: %s\r\n"
+ "ConnectedLineName: %s\r\n"
+ "AccountCode: %s\r\n"
+ "Context: %s\r\n"
+ "Exten: %s\r\n"
+ "Priority: %d\r\n"
+ "Uniqueid: %s\r\n"
+ "Cause: %d\r\n"
+ "Cause-txt: %s\r\n",
+ snapshot->name,
+ snapshot->state,
+ ast_state2str(snapshot->state),
+ snapshot->caller_number,
+ snapshot->caller_name,
+ snapshot->connected_number,
+ snapshot->connected_name,
+ snapshot->accountcode,
+ snapshot->context,
+ snapshot->exten,
+ snapshot->priority,
+ snapshot->uniqueid,
+ snapshot->hangupcause,
+ ast_cause2str(snapshot->hangupcause));
+
+ if (!res) {
+ return NULL;
+ }
+
+ return out;
+}
+
+static void channel_snapshot_update(struct ast_channel_snapshot *old_snapshot, struct ast_channel_snapshot *new_snapshot)
+{
+ int is_hungup;
+ char *manager_event = NULL;
+
+ if (!new_snapshot) {
+ /* Ignore cache clearing events; we'll see the hangup first */
+ return;
+ }
+
+ is_hungup = ast_test_flag(&new_snapshot->flags, AST_FLAG_ZOMBIE) ? 1 : 0;
+
+ if (!old_snapshot) {
+ manager_event = "Newchannel";
+ }
+
+ if (old_snapshot && old_snapshot->state != new_snapshot->state) {
+ manager_event = "Newstate";
+ }
+
+ if (old_snapshot && is_hungup) {
+ manager_event = "Hangup";
+ }
+
+ if (manager_event) {
+ RAII_VAR(struct ast_str *, channel_event_string, NULL, ast_free);
+
+ channel_event_string = manager_build_channel_state_string(new_snapshot);
+ if (channel_event_string) {
+ manager_event(EVENT_FLAG_CALL, manager_event, "%s", ast_str_buffer(channel_event_string));
+ }
+ }
+}
+
+static void channel_varset(const char *channel_name, const char *uniqueid, const char *name, const char *value)
+{
+ /*** DOCUMENTATION
+ <managerEventInstance>
+ <synopsis>Raised when a variable is set to a particular value.</synopsis>
+ </managerEventInstance>
+ ***/
+ manager_event(EVENT_FLAG_DIALPLAN, "VarSet",
+ "Channel: %s\r\n"
+ "Variable: %s\r\n"
+ "Value: %s\r\n"
+ "Uniqueid: %s\r\n",
+ channel_name, name, value, uniqueid);
+}
+
+static void channel_event_cb(void *data, struct stasis_subscription *sub, struct stasis_topic *topic, struct stasis_message *message)
+{
+ if (stasis_message_type(message) == stasis_cache_update()) {
+ struct stasis_cache_update *update = stasis_message_data(message);
+ if (ast_channel_snapshot() == update->type) {
+ struct ast_channel_snapshot *old_snapshot =
+ stasis_message_data(update->old_snapshot);
+ struct ast_channel_snapshot *new_snapshot =
+ stasis_message_data(update->new_snapshot);
+ channel_snapshot_update(old_snapshot, new_snapshot);
+ }
+ } else if (stasis_message_type(message) == ast_channel_varset()) {
+ struct ast_channel_varset *varset = stasis_message_data(message);
+ const char *name = varset->snapshot ? varset->snapshot->name : "none";
+ const char *uniqueid = varset->snapshot ? varset->snapshot->uniqueid : "none";
+ channel_varset(name, uniqueid, varset->variable, varset->value);
+ }
+}
+
/*! \internal \brief Free a user record. Should already be removed from the list */
static void manager_free_user(struct ast_manager_user *user)
{
@@ -7399,6 +7590,9 @@ static void manager_shutdown(void)
{
struct ast_manager_user *user;
+ stasis_unsubscribe(channel_state_sub);
+ channel_state_sub = NULL;
+
if (registered) {
ast_manager_unregister("Ping");
ast_manager_unregister("Events");
@@ -7490,6 +7684,12 @@ static int __init_manager(int reload, int by_external_config)
manager_enabled = 0;
+ if (!channel_state_sub) {
+ channel_state_sub = stasis_subscribe(
+ stasis_caching_get_topic(ast_channel_topic_all_cached()),
+ channel_event_cb, NULL);
+ }
+
if (!registered) {
/* Register default actions */
ast_manager_register_xml_core("Ping", 0, action_ping);
diff --git a/main/pbx.c b/main/pbx.c
index bf95ccbe2..82bbb5257 100644
--- a/main/pbx.c
+++ b/main/pbx.c
@@ -11453,18 +11453,7 @@ int pbx_builtin_setvar_helper(struct ast_channel *chan, const char *name, const
ast_verb(2, "Setting global variable '%s' to '%s'\n", name, value);
newvariable = ast_var_assign(name, value);
AST_LIST_INSERT_HEAD(headp, newvariable, entries);
- /*** DOCUMENTATION
- <managerEventInstance>
- <synopsis>Raised when a variable is set to a particular value.</synopsis>
- </managerEventInstance>
- ***/
- manager_event(EVENT_FLAG_DIALPLAN, "VarSet",
- "Channel: %s\r\n"
- "Variable: %s\r\n"
- "Value: %s\r\n"
- "Uniqueid: %s\r\n",
- chan ? ast_channel_name(chan) : "none", name, value,
- chan ? ast_channel_uniqueid(chan) : "none");
+ ast_channel_publish_varset(chan, name, value);
}
if (chan)
diff --git a/main/stasis.c b/main/stasis.c
new file mode 100644
index 000000000..f94736bf1
--- /dev/null
+++ b/main/stasis.c
@@ -0,0 +1,514 @@
+/*
+ * Asterisk -- An open source telephony toolkit.
+ *
+ * Copyright (C) 2013, Digium, Inc.
+ *
+ * David M. Lee, II <dlee@digium.com>
+ *
+ * See http://www.asterisk.org for more information about
+ * the Asterisk project. Please do not directly contact
+ * any of the maintainers of this project for assistance;
+ * the project provides a web site, mailing lists and IRC
+ * channels for your use.
+ *
+ * This program is free software, distributed under the terms of
+ * the GNU General Public License Version 2. See the LICENSE file
+ * at the top of the source tree.
+ */
+
+/*! \file
+ *
+ * \brief Stasis Message Bus API.
+ *
+ * \author David M. Lee, II <dlee@digium.com>
+ */
+
+/*** MODULEINFO
+ <support_level>core</support_level>
+ ***/
+
+#include "asterisk.h"
+
+ASTERISK_FILE_VERSION(__FILE__, "$Revision$")
+
+#include "asterisk/astobj2.h"
+#include "asterisk/stasis.h"
+#include "asterisk/threadpool.h"
+#include "asterisk/taskprocessor.h"
+#include "asterisk/utils.h"
+#include "asterisk/uuid.h"
+
+/*! Initial size of the subscribers list. */
+#define INITIAL_SUBSCRIBERS_MAX 4
+
+/*! Threadpool for dispatching notifications to subscribers */
+static struct ast_threadpool *pool;
+
+static struct stasis_message_type *__subscription_change_message_type;
+
+/*! \private */
+struct stasis_topic {
+ char *name;
+ /*! Variable length array of the subscribers (raw pointer to avoid cyclic references) */
+ struct stasis_subscription **subscribers;
+ /*! Allocated length of the subscribers array */
+ size_t num_subscribers_max;
+ /*! Current size of the subscribers array */
+ size_t num_subscribers_current;
+};
+
+/* Forward declarations for the tightly-coupled subscription object */
+struct stasis_subscription;
+static int topic_add_subscription(struct stasis_topic *topic, struct stasis_subscription *sub);
+
+static void topic_dtor(void *obj)
+{
+ struct stasis_topic *topic = obj;
+ ast_free(topic->name);
+ topic->name = NULL;
+ ast_free(topic->subscribers);
+ topic->subscribers = NULL;
+}
+
+struct stasis_topic *stasis_topic_create(const char *name)
+{
+ RAII_VAR(struct stasis_topic *, topic, NULL, ao2_cleanup);
+
+ topic = ao2_alloc(sizeof(*topic), topic_dtor);
+
+ if (!topic) {
+ return NULL;
+ }
+
+ topic->name = ast_strdup(name);
+ if (!topic->name) {
+ return NULL;
+ }
+
+ topic->num_subscribers_max = INITIAL_SUBSCRIBERS_MAX;
+ topic->subscribers = ast_calloc(topic->num_subscribers_max, sizeof(topic->subscribers));
+ if (!topic->subscribers) {
+ return NULL;
+ }
+
+ ao2_ref(topic, +1);
+ return topic;
+}
+
+const char *stasis_topic_name(const struct stasis_topic *topic)
+{
+ return topic->name;
+}
+
+/*! \private */
+struct stasis_subscription {
+ /*! Unique ID for this subscription */
+ char *uniqueid;
+ /*! Topic subscribed to. */
+ struct stasis_topic *topic;
+ /*! Mailbox for processing incoming messages. */
+ struct ast_taskprocessor *mailbox;
+ /*! Callback function for incoming message processing. */
+ stasis_subscription_cb callback;
+ /*! Data pointer to be handed to the callback. */
+ void *data;
+};
+
+static void subscription_dtor(void *obj)
+{
+ struct stasis_subscription *sub = obj;
+ ast_assert(!stasis_subscription_is_subscribed(sub));
+ ast_free(sub->uniqueid);
+ sub->uniqueid = NULL;
+ ao2_cleanup(sub->topic);
+ sub->topic = NULL;
+ ast_taskprocessor_unreference(sub->mailbox);
+ sub->mailbox = NULL;
+}
+
+static void send_subscription_change_message(struct stasis_topic *topic, char *uniqueid, char *description);
+
+static struct stasis_subscription *__stasis_subscribe(struct stasis_topic *topic, stasis_subscription_cb callback, void *data, int needs_mailbox)
+{
+ RAII_VAR(struct stasis_subscription *, sub, NULL, ao2_cleanup);
+ RAII_VAR(struct ast_uuid *, id, NULL, ast_free);
+ char uniqueid[AST_UUID_STR_LEN];
+
+ sub = ao2_alloc(sizeof(*sub), subscription_dtor);
+ if (!sub) {
+ return NULL;
+ }
+
+ id = ast_uuid_generate();
+ if (!id) {
+ ast_log(LOG_ERROR, "UUID generation failed\n");
+ return NULL;
+ }
+ ast_uuid_to_str(id, uniqueid, sizeof(uniqueid));
+ if (needs_mailbox) {
+ sub->mailbox = ast_threadpool_serializer(uniqueid, pool);
+ if (!sub->mailbox) {
+ return NULL;
+ }
+ }
+
+ sub->uniqueid = ast_strdup(uniqueid);
+ ao2_ref(topic, +1);
+ sub->topic = topic;
+ sub->callback = callback;
+ sub->data = data;
+
+ if (topic_add_subscription(topic, sub) != 0) {
+ return NULL;
+ }
+ send_subscription_change_message(topic, uniqueid, "Subscribe");
+
+ ao2_ref(sub, +1);
+ return sub;
+}
+
+struct stasis_subscription *stasis_subscribe(struct stasis_topic *topic, stasis_subscription_cb callback, void *data)
+{
+ return __stasis_subscribe(topic, callback, data, 1);
+}
+
+void stasis_unsubscribe(struct stasis_subscription *sub)
+{
+ if (sub) {
+ size_t i;
+ struct stasis_topic *topic = sub->topic;
+ SCOPED_AO2LOCK(lock_topic, topic);
+
+ for (i = 0; i < topic->num_subscribers_current; ++i) {
+ if (topic->subscribers[i] == sub) {
+ send_subscription_change_message(topic, sub->uniqueid, "Unsubscribe");
+ /* swap [i] with last entry; remove last entry */
+ topic->subscribers[i] = topic->subscribers[--topic->num_subscribers_current];
+ /* Unsubscribing unrefs the subscription */
+ ao2_cleanup(sub);
+ return;
+ }
+ }
+
+ ast_log(LOG_ERROR, "Internal error: subscription has invalid topic\n");
+ }
+}
+
+int stasis_subscription_is_subscribed(const struct stasis_subscription *sub)
+{
+ if (sub) {
+ size_t i;
+ struct stasis_topic *topic = sub->topic;
+ SCOPED_AO2LOCK(lock_topic, topic);
+
+ for (i = 0; i < topic->num_subscribers_current; ++i) {
+ if (topic->subscribers[i] == sub) {
+ return 1;
+ }
+ }
+ }
+
+ return 0;
+}
+
+const char *stasis_subscription_uniqueid(const struct stasis_subscription *sub)
+{
+ return sub->uniqueid;
+}
+
+int stasis_subscription_final_message(struct stasis_subscription *sub, struct stasis_message *msg)
+{
+ struct stasis_subscription_change *change;
+ if (stasis_message_type(msg) != stasis_subscription_change()) {
+ return 0;
+ }
+
+ change = stasis_message_data(msg);
+ if (strcmp("Unsubscribe", change->description)) {
+ return 0;
+ }
+
+ if (strcmp(stasis_subscription_uniqueid(sub), change->uniqueid)) {
+ return 0;
+ }
+
+ return 1;
+}
+
+/*!
+ * \brief Add a subscriber to a topic.
+ * \param topic Topic
+ * \param sub Subscriber
+ * \return 0 on success
+ * \return Non-zero on error
+ */
+static int topic_add_subscription(struct stasis_topic *topic, struct stasis_subscription *sub)
+{
+ struct stasis_subscription **subscribers;
+ SCOPED_AO2LOCK(lock, topic);
+
+ /* Increase list size, if needed */
+ if (topic->num_subscribers_current + 1 > topic->num_subscribers_max) {
+ subscribers = realloc(topic->subscribers, 2 * topic->num_subscribers_max * sizeof(*subscribers));
+ if (!subscribers) {
+ return -1;
+ }
+ topic->subscribers = subscribers;
+ topic->num_subscribers_max *= 2;
+ }
+
+ /* Don't ref sub here or we'll cause a reference cycle. */
+ topic->subscribers[topic->num_subscribers_current++] = sub;
+ return 0;
+}
+
+/*!
+ * \private
+ * \brief Information needed to dispatch a message to a subscription
+ */
+struct dispatch {
+ /*! Topic message was published to */
+ struct stasis_topic *topic;
+ /*! The message itself */
+ struct stasis_message *message;
+ /*! Subscription receiving the message */
+ struct stasis_subscription *sub;
+};
+
+static void dispatch_dtor(void *data)
+{
+ struct dispatch *dispatch = data;
+ ao2_cleanup(dispatch->topic);
+ ao2_cleanup(dispatch->message);
+ ao2_cleanup(dispatch->sub);
+}
+
+static struct dispatch *dispatch_create(struct stasis_topic *topic, struct stasis_message *message, struct stasis_subscription *sub)
+{
+ RAII_VAR(struct dispatch *, dispatch, NULL, ao2_cleanup);
+
+ ast_assert(topic != NULL);
+ ast_assert(message != NULL);
+ ast_assert(sub != NULL);
+
+ dispatch = ao2_alloc(sizeof(*dispatch), dispatch_dtor);
+ if (!dispatch) {
+ return NULL;
+ }
+
+ dispatch->topic = topic;
+ ao2_ref(topic, +1);
+
+ dispatch->message = message;
+ ao2_ref(message, +1);
+
+ dispatch->sub = sub;
+ ao2_ref(sub, +1);
+
+ ao2_ref(dispatch, +1);
+ return dispatch;
+}
+
+/*!
+ * \brief Dispatch a message to a subscriber
+ * \param data \ref dispatch object
+ * \return 0
+ */
+static int dispatch_exec(void *data)
+{
+ RAII_VAR(struct dispatch *, dispatch, data, ao2_cleanup);
+ RAII_VAR(struct stasis_topic *, sub_topic, NULL, ao2_cleanup);
+
+ /* Since sub->topic doesn't change, no need to lock sub */
+ ast_assert(dispatch->sub->topic != NULL);
+ ao2_ref(dispatch->sub->topic, +1);
+ sub_topic = dispatch->sub->topic;
+
+ dispatch->sub->callback(dispatch->sub->data,
+ dispatch->sub,
+ sub_topic,
+ dispatch->message);
+
+ return 0;
+}
+
+void stasis_forward_message(struct stasis_topic *topic, struct stasis_topic *publisher_topic, struct stasis_message *message)
+{
+ struct stasis_subscription **subscribers = NULL;
+ size_t num_subscribers, i;
+
+ /* Copy the subscribers, so we don't have to hold the mutex for long */
+ {
+ SCOPED_AO2LOCK(lock, topic);
+ num_subscribers = topic->num_subscribers_current;
+ subscribers = ast_malloc(num_subscribers * sizeof(*subscribers));
+ if (subscribers) {
+ for (i = 0; i < num_subscribers; ++i) {
+ ao2_ref(topic->subscribers[i], +1);
+ subscribers[i] = topic->subscribers[i];
+ }
+ }
+ }
+
+ if (!subscribers) {
+ ast_log(LOG_ERROR, "Dropping message\n");
+ return;
+ }
+
+ for (i = 0; i < num_subscribers; ++i) {
+ struct stasis_subscription *sub = subscribers[i];
+
+ ast_assert(sub != NULL);
+
+ if (sub->mailbox) {
+ RAII_VAR(struct dispatch *, dispatch, NULL, ao2_cleanup);
+
+ dispatch = dispatch_create(publisher_topic, message, sub);
+ if (!dispatch) {
+ ast_log(LOG_DEBUG, "Dropping dispatch\n");
+ break;
+ }
+
+ if (ast_taskprocessor_push(sub->mailbox, dispatch_exec, dispatch) == 0) {
+ dispatch = NULL; /* Ownership transferred to mailbox */
+ }
+ } else {
+ /* No mailbox; dispatch directly */
+ sub->callback(sub->data, sub, sub->topic, message);
+ }
+ }
+
+ for (i = 0; i < num_subscribers; ++i) {
+ ao2_cleanup(subscribers[i]);
+ }
+ ast_free(subscribers);
+}
+
+void stasis_publish(struct stasis_topic *topic, struct stasis_message *message)
+{
+ stasis_forward_message(topic, topic, message);
+}
+
+/*! \brief Forwarding subscriber */
+static void stasis_forward_cb(void *data, struct stasis_subscription *sub, struct stasis_topic *topic, struct stasis_message *message)
+{
+ struct stasis_topic *to_topic = data;
+ stasis_forward_message(to_topic, topic, message);
+
+ if (stasis_subscription_final_message(sub, message)) {
+ ao2_cleanup(to_topic);
+ }
+}
+
+struct stasis_subscription *stasis_forward_all(struct stasis_topic *from_topic, struct stasis_topic *to_topic)
+{
+ struct stasis_subscription *sub;
+ if (!from_topic || !to_topic) {
+ return NULL;
+ }
+ /* Subscribe without a mailbox, since we're just forwarding messages */
+ sub = __stasis_subscribe(from_topic, stasis_forward_cb, to_topic, 0);
+ if (sub) {
+ /* hold a ref to to_topic for this forwarding subscription */
+ ao2_ref(to_topic, +1);
+ }
+ return sub;
+}
+
+static void subscription_change_dtor(void *obj)
+{
+ struct stasis_subscription_change *change = obj;
+ ast_string_field_free_memory(change);
+ ao2_cleanup(change->topic);
+}
+
+static struct stasis_subscription_change *subscription_change_alloc(struct stasis_topic *topic, char *uniqueid, char *description)
+{
+ RAII_VAR(struct stasis_subscription_change *, change, NULL, ao2_cleanup);
+
+ change = ao2_alloc(sizeof(struct stasis_subscription_change), subscription_change_dtor);
+ if (ast_string_field_init(change, 128)) {
+ return NULL;
+ }
+
+ ast_string_field_set(change, uniqueid, uniqueid);
+ ast_string_field_set(change, description, description);
+ ao2_ref(topic, +1);
+ change->topic = topic;
+
+ ao2_ref(change, +1);
+ return change;
+}
+
+struct stasis_message_type *stasis_subscription_change(void)
+{
+ return __subscription_change_message_type;
+}
+
+static void send_subscription_change_message(struct stasis_topic *topic, char *uniqueid, char *description)
+{
+ RAII_VAR(struct stasis_subscription_change *, change, NULL, ao2_cleanup);
+ RAII_VAR(struct stasis_message *, msg, NULL, ao2_cleanup);
+
+ change = subscription_change_alloc(topic, uniqueid, description);
+
+ if (!change) {
+ return;
+ }
+
+ msg = stasis_message_create(stasis_subscription_change(), change);
+
+ if (!msg) {
+ return;
+ }
+
+ stasis_publish(topic, msg);
+}
+
+/*! \brief Cleanup function */
+static void stasis_exit(void)
+{
+ ao2_cleanup(__subscription_change_message_type);
+ __subscription_change_message_type = NULL;
+ ast_threadpool_shutdown(pool);
+ pool = NULL;
+}
+
+int stasis_init(void)
+{
+ int cache_init;
+
+ /* XXX Should this be configurable? */
+ struct ast_threadpool_options opts = {
+ .version = AST_THREADPOOL_OPTIONS_VERSION,
+ .idle_timeout = 20,
+ .auto_increment = 1,
+ .initial_size = 0,
+ .max_size = 200
+ };
+
+ ast_register_atexit(stasis_exit);
+
+ if (pool) {
+ ast_log(LOG_ERROR, "Stasis double-initialized\n");
+ return -1;
+ }
+
+ pool = ast_threadpool_create("stasis-core", NULL, &opts);
+ if (!pool) {
+ ast_log(LOG_ERROR, "Stasis threadpool allocation failed\n");
+ return -1;
+ }
+
+ cache_init = stasis_cache_init();
+ if (cache_init != 0) {
+ return -1;
+ }
+
+ __subscription_change_message_type = stasis_message_type_create("stasis_subscription_change");
+ if (!__subscription_change_message_type) {
+ return -1;
+ }
+
+ return 0;
+}
diff --git a/main/stasis_cache.c b/main/stasis_cache.c
new file mode 100644
index 000000000..2f4cf52fd
--- /dev/null
+++ b/main/stasis_cache.c
@@ -0,0 +1,443 @@
+/*
+ * Asterisk -- An open source telephony toolkit.
+ *
+ * Copyright (C) 2013, Digium, Inc.
+ *
+ * David M. Lee, II <dlee@digium.com>
+ *
+ * See http://www.asterisk.org for more information about
+ * the Asterisk project. Please do not directly contact
+ * any of the maintainers of this project for assistance;
+ * the project provides a web site, mailing lists and IRC
+ * channels for your use.
+ *
+ * This program is free software, distributed under the terms of
+ * the GNU General Public License Version 2. See the LICENSE file
+ * at the top of the source tree.
+ */
+
+/*! \file
+ *
+ * \brief Stasis Message API.
+ *
+ * \author David M. Lee, II <dlee@digium.com>
+ */
+
+/*** MODULEINFO
+ <support_level>core</support_level>
+ ***/
+
+#include "asterisk.h"
+
+ASTERISK_FILE_VERSION(__FILE__, "$Revision$")
+
+#include "asterisk/astobj2.h"
+#include "asterisk/hashtab.h"
+#include "asterisk/stasis.h"
+#include "asterisk/utils.h"
+
+#ifdef LOW_MEMORY
+#define NUM_CACHE_BUCKETS 17
+#else
+#define NUM_CACHE_BUCKETS 563
+#endif
+
+struct stasis_caching_topic {
+ struct ao2_container *cache;
+ struct stasis_topic *topic;
+ struct stasis_subscription *sub;
+ snapshot_get_id id_fn;
+};
+
+static void stasis_caching_topic_dtor(void *obj) {
+ struct stasis_caching_topic *caching_topic = obj;
+ ast_assert(!stasis_subscription_is_subscribed(caching_topic->sub));
+ caching_topic->sub = NULL;
+ ao2_cleanup(caching_topic->cache);
+ caching_topic->cache = NULL;
+ ao2_cleanup(caching_topic->topic);
+ caching_topic->topic = NULL;
+}
+
+struct stasis_topic *stasis_caching_get_topic(struct stasis_caching_topic *caching_topic)
+{
+ return caching_topic->topic;
+}
+
+void stasis_caching_unsubscribe(struct stasis_caching_topic *caching_topic)
+{
+ if (caching_topic) {
+ if (stasis_subscription_is_subscribed(caching_topic->sub)) {
+ stasis_unsubscribe(caching_topic->sub);
+ } else {
+ ast_log(LOG_ERROR, "stasis_caching_topic unsubscribed multiple times\n");
+ }
+ }
+}
+
+struct cache_entry {
+ struct stasis_message_type *type;
+ char *id;
+ struct stasis_message *snapshot;
+};
+
+static void cache_entry_dtor(void *obj)
+{
+ struct cache_entry *entry = obj;
+ ao2_cleanup(entry->type);
+ entry->type = NULL;
+ ast_free(entry->id);
+ entry->id = NULL;
+ ao2_cleanup(entry->snapshot);
+ entry->snapshot = NULL;
+}
+
+static struct cache_entry *cache_entry_create(struct stasis_message_type *type, const char *id, struct stasis_message *snapshot)
+{
+ RAII_VAR(struct cache_entry *, entry, NULL, ao2_cleanup);
+
+ ast_assert(type != NULL);
+ ast_assert(id != NULL);
+
+ entry = ao2_alloc(sizeof(*entry), cache_entry_dtor);
+ if (!entry) {
+ return NULL;
+ }
+
+ entry->id = ast_strdup(id);
+ if (!entry->id) {
+ return NULL;
+ }
+
+ ao2_ref(type, +1);
+ entry->type = type;
+ if (snapshot != NULL) {
+ ao2_ref(snapshot, +1);
+ entry->snapshot = snapshot;
+ }
+
+ ao2_ref(entry, +1);
+ return entry;
+}
+
+static int cache_entry_hash(const void *obj, int flags)
+{
+ const struct cache_entry *entry = obj;
+ int hash = 0;
+
+ ast_assert(!(flags & OBJ_KEY));
+
+ hash += ast_hashtab_hash_string(stasis_message_type_name(entry->type));
+ hash += ast_hashtab_hash_string(entry->id);
+ return hash;
+}
+
+static int cache_entry_cmp(void *obj, void *arg, int flags)
+{
+ const struct cache_entry *left = obj;
+ const struct cache_entry *right = arg;
+
+ ast_assert(!(flags & OBJ_KEY));
+
+ if (left->type == right->type && strcmp(left->id, right->id) == 0) {
+ return CMP_MATCH | CMP_STOP;
+ }
+
+ return 0;
+}
+
+static struct stasis_message *cache_put(struct stasis_caching_topic *caching_topic, struct stasis_message_type *type, const char *id, struct stasis_message *new_snapshot)
+{
+ RAII_VAR(struct cache_entry *, new_entry, NULL, ao2_cleanup);
+ RAII_VAR(struct cache_entry *, cached_entry, NULL, ao2_cleanup);
+ struct stasis_message *old_snapshot = NULL;
+
+ ast_assert(caching_topic->cache != NULL);
+
+ new_entry = cache_entry_create(type, id, new_snapshot);
+
+ if (new_snapshot == NULL) {
+ /* Remove entry from cache */
+ cached_entry = ao2_find(caching_topic->cache, new_entry, OBJ_POINTER | OBJ_UNLINK);
+ if (cached_entry) {
+ old_snapshot = cached_entry->snapshot;
+ cached_entry->snapshot = NULL;
+ }
+ } else {
+ /* Insert/update cache */
+ SCOPED_AO2LOCK(lock, caching_topic->cache);
+
+ cached_entry = ao2_find(caching_topic->cache, new_entry, OBJ_POINTER | OBJ_NOLOCK);
+ if (cached_entry) {
+ /* Update cache. Because objects are moving, no need to update refcounts. */
+ old_snapshot = cached_entry->snapshot;
+ cached_entry->snapshot = new_entry->snapshot;
+ new_entry->snapshot = NULL;
+ } else {
+ /* Insert into the cache */
+ ao2_link_flags(caching_topic->cache, new_entry, OBJ_NOLOCK);
+ }
+
+ }
+
+ return old_snapshot;
+}
+
+struct stasis_message *stasis_cache_get(struct stasis_caching_topic *caching_topic, struct stasis_message_type *type, const char *id)
+{
+ RAII_VAR(struct cache_entry *, search_entry, NULL, ao2_cleanup);
+ RAII_VAR(struct cache_entry *, cached_entry, NULL, ao2_cleanup);
+
+ ast_assert(caching_topic->cache != NULL);
+
+ search_entry = cache_entry_create(type, id, NULL);
+ if (search_entry == NULL) {
+ return NULL;
+ }
+
+ cached_entry = ao2_find(caching_topic->cache, search_entry, OBJ_POINTER);
+ if (cached_entry == NULL) {
+ return NULL;
+ }
+
+ ast_assert(cached_entry->snapshot != NULL);
+ ao2_ref(cached_entry->snapshot, +1);
+ return cached_entry->snapshot;
+}
+
+static struct stasis_message_type *__cache_clear_data;
+
+static struct stasis_message_type *cache_clear_data(void)
+{
+ ast_assert(__cache_clear_data != NULL);
+ return __cache_clear_data;
+}
+
+static struct stasis_message_type *__cache_update;
+
+struct stasis_message_type *stasis_cache_update(void)
+{
+ ast_assert(__cache_update != NULL);
+ return __cache_update;
+}
+
+struct cache_clear_data {
+ struct stasis_message_type *type;
+ char *id;
+};
+
+static void cache_clear_data_dtor(void *obj)
+{
+ struct cache_clear_data *ev = obj;
+ ast_free(ev->id);
+ ev->id = NULL;
+ ao2_cleanup(ev->type);
+ ev->type = NULL;
+}
+
+struct stasis_message *stasis_cache_clear_create(struct stasis_message_type *type, const char *id)
+{
+ RAII_VAR(struct cache_clear_data *, ev, NULL, ao2_cleanup);
+ RAII_VAR(struct stasis_message *, msg, NULL, ao2_cleanup);
+
+ ev = ao2_alloc(sizeof(*ev), cache_clear_data_dtor);
+ if (!ev) {
+ return NULL;
+ }
+
+ ev->id = ast_strdup(id);
+ if (!ev->id) {
+ return NULL;
+ }
+ ao2_ref(type, +1);
+ ev->type = type;
+
+ msg = stasis_message_create(cache_clear_data(), ev);
+
+ if (!msg) {
+ return NULL;
+ }
+
+ ao2_ref(msg, +1);
+ return msg;
+}
+
+static void stasis_cache_update_dtor(void *obj)
+{
+ struct stasis_cache_update *update = obj;
+ ao2_cleanup(update->topic);
+ update->topic = NULL;
+ ao2_cleanup(update->old_snapshot);
+ update->old_snapshot = NULL;
+ ao2_cleanup(update->new_snapshot);
+ update->new_snapshot = NULL;
+ ao2_cleanup(update->type);
+ update->type = NULL;
+}
+
+static struct stasis_message *update_create(struct stasis_topic *topic, struct stasis_message *old_snapshot, struct stasis_message *new_snapshot)
+{
+ RAII_VAR(struct stasis_cache_update *, update, NULL, ao2_cleanup);
+ RAII_VAR(struct stasis_message *, msg, NULL, ao2_cleanup);
+
+ ast_assert(topic != NULL);
+ ast_assert(old_snapshot != NULL || new_snapshot != NULL);
+
+ update = ao2_alloc(sizeof(*update), stasis_cache_update_dtor);
+ if (!update) {
+ return NULL;
+ }
+
+ ao2_ref(topic, +1);
+ update->topic = topic;
+ if (old_snapshot) {
+ ao2_ref(old_snapshot, +1);
+ update->old_snapshot = old_snapshot;
+ if (!new_snapshot) {
+ ao2_ref(stasis_message_type(old_snapshot), +1);
+ update->type = stasis_message_type(old_snapshot);
+ }
+ }
+ if (new_snapshot) {
+ ao2_ref(new_snapshot, +1);
+ update->new_snapshot = new_snapshot;
+ ao2_ref(stasis_message_type(new_snapshot), +1);
+ update->type = stasis_message_type(new_snapshot);
+ }
+
+ msg = stasis_message_create(stasis_cache_update(), update);
+ if (!msg) {
+ return NULL;
+ }
+
+ ao2_ref(msg, +1);
+ return msg;
+}
+
+static void caching_topic_exec(void *data, struct stasis_subscription *sub, struct stasis_topic *topic, struct stasis_message *message)
+{
+ RAII_VAR(struct stasis_caching_topic *, caching_topic_needs_unref, NULL, ao2_cleanup);
+ struct stasis_caching_topic *caching_topic = data;
+ const char *id = NULL;
+
+ ast_assert(caching_topic->topic != NULL);
+ ast_assert(caching_topic->id_fn != NULL);
+
+ if (stasis_subscription_final_message(sub, message)) {
+ caching_topic_needs_unref = caching_topic;
+ }
+
+ /* Handle cache clear event */
+ if (cache_clear_data() == stasis_message_type(message)) {
+ RAII_VAR(struct stasis_message *, old_snapshot, NULL, ao2_cleanup);
+ RAII_VAR(struct stasis_message *, update, NULL, ao2_cleanup);
+ struct cache_clear_data *clear = stasis_message_data(message);
+ ast_assert(clear->type != NULL);
+ ast_assert(clear->id != NULL);
+ old_snapshot = cache_put(caching_topic, clear->type, clear->id, NULL);
+ if (old_snapshot) {
+ update = update_create(topic, old_snapshot, NULL);
+ stasis_publish(caching_topic->topic, update);
+ } else {
+ ast_log(LOG_ERROR,
+ "Attempting to remove an item from the cache that isn't there: %s %s\n",
+ stasis_message_type_name(clear->type), clear->id);
+ }
+ return;
+ }
+
+ id = caching_topic->id_fn(message);
+ if (id == NULL) {
+ /* Object isn't cached; forward */
+ stasis_forward_message(caching_topic->topic, topic, message);
+ } else {
+ /* Update the cache */
+ RAII_VAR(struct stasis_message *, old_snapshot, NULL, ao2_cleanup);
+ RAII_VAR(struct stasis_message *, update, NULL, ao2_cleanup);
+
+ old_snapshot = cache_put(caching_topic, stasis_message_type(message), id, message);
+
+ update = update_create(topic, old_snapshot, message);
+ if (update == NULL) {
+ return;
+ }
+
+ stasis_publish(caching_topic->topic, update);
+ }
+
+ if (stasis_subscription_final_message(sub, message)) {
+ ao2_cleanup(caching_topic);
+ }
+}
+
+struct stasis_caching_topic *stasis_caching_topic_create(struct stasis_topic *original_topic, snapshot_get_id id_fn)
+{
+ RAII_VAR(struct stasis_caching_topic *, caching_topic, NULL, ao2_cleanup);
+ struct stasis_subscription *sub;
+ RAII_VAR(char *, new_name, NULL, free);
+ int ret;
+
+ ret = asprintf(&new_name, "%s-cached", stasis_topic_name(original_topic));
+ if (ret < 0) {
+ return NULL;
+ }
+
+ caching_topic = ao2_alloc(sizeof(*caching_topic), stasis_caching_topic_dtor);
+ if (caching_topic == NULL) {
+ return NULL;
+ }
+
+ caching_topic->cache = ao2_container_alloc(NUM_CACHE_BUCKETS, cache_entry_hash, cache_entry_cmp);
+ if (!caching_topic->cache) {
+ ast_log(LOG_ERROR, "Stasis cache allocation failed\n");
+ return NULL;
+ }
+
+ caching_topic->topic = stasis_topic_create(new_name);
+ if (caching_topic->topic == NULL) {
+ return NULL;
+ }
+
+ caching_topic->id_fn = id_fn;
+
+ sub = stasis_subscribe(original_topic, caching_topic_exec, caching_topic);
+ if (sub == NULL) {
+ return NULL;
+ }
+ /* This is for the reference contained in the subscription above */
+ ao2_ref(caching_topic, +1);
+ caching_topic->sub = sub;
+
+ ao2_ref(caching_topic, +1);
+ return caching_topic;
+}
+
+static void stasis_cache_exit(void)
+{
+ ao2_cleanup(__cache_clear_data);
+ __cache_clear_data = NULL;
+ ao2_cleanup(__cache_update);
+ __cache_update = NULL;
+}
+
+int stasis_cache_init(void)
+{
+ ast_register_atexit(stasis_cache_exit);
+
+ if (__cache_clear_data || __cache_update) {
+ ast_log(LOG_ERROR, "Stasis cache double initialized\n");
+ return -1;
+ }
+
+ __cache_update = stasis_message_type_create("stasis_cache_update");
+ if (!__cache_update) {
+ return -1;
+ }
+
+ __cache_clear_data = stasis_message_type_create("StasisCacheClear");
+ if (!__cache_clear_data) {
+ return -1;
+ }
+ return 0;
+}
+
diff --git a/main/stasis_message.c b/main/stasis_message.c
new file mode 100644
index 000000000..8d397b935
--- /dev/null
+++ b/main/stasis_message.c
@@ -0,0 +1,135 @@
+/*
+ * Asterisk -- An open source telephony toolkit.
+ *
+ * Copyright (C) 2013, Digium, Inc.
+ *
+ * David M. Lee, II <dlee@digium.com>
+ *
+ * See http://www.asterisk.org for more information about
+ * the Asterisk project. Please do not directly contact
+ * any of the maintainers of this project for assistance;
+ * the project provides a web site, mailing lists and IRC
+ * channels for your use.
+ *
+ * This program is free software, distributed under the terms of
+ * the GNU General Public License Version 2. See the LICENSE file
+ * at the top of the source tree.
+ */
+
+/*! \file
+ *
+ * \brief Stasis Message API.
+ *
+ * \author David M. Lee, II <dlee@digium.com>
+ */
+
+/*** MODULEINFO
+ <support_level>core</support_level>
+ ***/
+
+#include "asterisk.h"
+
+ASTERISK_FILE_VERSION(__FILE__, "$Revision$")
+
+#include "asterisk/astobj2.h"
+#include "asterisk/stasis.h"
+#include "asterisk/utils.h"
+
+/*! \private */
+struct stasis_message_type {
+ char *name;
+};
+
+static void message_type_dtor(void *obj)
+{
+ struct stasis_message_type *type = obj;
+ ast_free(type->name);
+ type->name = NULL;
+}
+
+struct stasis_message_type *stasis_message_type_create(const char *name)
+{
+ RAII_VAR(struct stasis_message_type *, type, NULL, ao2_cleanup);
+
+ type = ao2_alloc(sizeof(*type), message_type_dtor);
+ if (!type) {
+ return NULL;
+ }
+
+ type->name = ast_strdup(name);
+ if (!type->name) {
+ return NULL;
+ }
+
+ ao2_ref(type, +1);
+ return type;
+}
+
+const char *stasis_message_type_name(const struct stasis_message_type *type)
+{
+ return type->name;
+}
+
+/*! \private */
+struct stasis_message {
+ /*! Time the message was created */
+ struct timeval timestamp;
+ /*! Type of the message */
+ struct stasis_message_type *type;
+ /*! Message content */
+ void *data;
+};
+
+static void stasis_message_dtor(void *obj)
+{
+ struct stasis_message *message = obj;
+ ao2_cleanup(message->type);
+ ao2_cleanup(message->data);
+}
+
+struct stasis_message *stasis_message_create(struct stasis_message_type *type, void *data)
+{
+ RAII_VAR(struct stasis_message *, message, NULL, ao2_cleanup);
+
+ if (type == NULL || data == NULL) {
+ return NULL;
+ }
+
+ message = ao2_alloc(sizeof(*message), stasis_message_dtor);
+ if (message == NULL) {
+ return NULL;
+ }
+
+ message->timestamp = ast_tvnow();
+ ao2_ref(type, +1);
+ message->type = type;
+ ao2_ref(data, +1);
+ message->data = data;
+
+ ao2_ref(message, +1);
+ return message;
+}
+
+struct stasis_message_type *stasis_message_type(const struct stasis_message *msg)
+{
+ if (msg == NULL) {
+ return NULL;
+ }
+ return msg->type;
+}
+
+void *stasis_message_data(const struct stasis_message *msg)
+{
+ if (msg == NULL) {
+ return NULL;
+ }
+ return msg->data;
+}
+
+const struct timeval *stasis_message_timestamp(const struct stasis_message *msg)
+{
+ if (msg == NULL) {
+ return NULL;
+ }
+ return &msg->timestamp;
+}
diff --git a/tests/test_stasis.c b/tests/test_stasis.c
new file mode 100644
index 000000000..b05264106
--- /dev/null
+++ b/tests/test_stasis.c
@@ -0,0 +1,683 @@
+/*
+ * Asterisk -- An open source telephony toolkit.
+ *
+ * Copyright (C) 2013, Digium, Inc.
+ *
+ * David M. Lee, II <dlee@digium.com>
+ *
+ * See http://www.asterisk.org for more information about
+ * the Asterisk project. Please do not directly contact
+ * any of the maintainers of this project for assistance;
+ * the project provides a web site, mailing lists and IRC
+ * channels for your use.
+ *
+ * This program is free software, distributed under the terms of
+ * the GNU General Public License Version 2. See the LICENSE file
+ * at the top of the source tree.
+ */
+
+/*!
+ * \file \brief Test Stasis message bus.
+ *
+ * \author\verbatim David M. Lee, II <dlee@digium.com> \endverbatim
+ *
+ * \ingroup tests
+ */
+
+/*** MODULEINFO
+ <depend>TEST_FRAMEWORK</depend>
+ <support_level>core</support_level>
+ ***/
+
+#include "asterisk.h"
+
+ASTERISK_FILE_VERSION(__FILE__, "$Revision$")
+
+#include "asterisk/astobj2.h"
+#include "asterisk/module.h"
+#include "asterisk/stasis.h"
+#include "asterisk/test.h"
+
+static const char *test_category = "/stasis/core/";
+
+AST_TEST_DEFINE(message_type)
+{
+ RAII_VAR(struct stasis_message_type *, uut, NULL, ao2_cleanup);
+
+ switch (cmd) {
+ case TEST_INIT:
+ info->name = __func__;
+ info->category = test_category;
+ info->summary = "Test basic message_type functions";
+ info->description = "Test basic message_type functions";
+ return AST_TEST_NOT_RUN;
+ case TEST_EXECUTE:
+ break;
+ }
+
+ ast_test_validate(test, NULL == stasis_message_type_create(NULL));
+ uut = stasis_message_type_create("SomeMessage");
+ ast_test_validate(test, 0 == strcmp(stasis_message_type_name(uut), "SomeMessage"));
+
+ return AST_TEST_PASS;
+}
+
+AST_TEST_DEFINE(message)
+{
+ RAII_VAR(struct stasis_message_type *, type, NULL, ao2_cleanup);
+ RAII_VAR(struct stasis_message *, uut, NULL, ao2_cleanup);
+ RAII_VAR(char *, data, NULL, ao2_cleanup);
+ char *expected = "SomeData";
+ struct timeval expected_timestamp;
+ struct timeval time_diff;
+
+ switch (cmd) {
+ case TEST_INIT:
+ info->name = __func__;
+ info->category = test_category;
+ info->summary = "Test basic message functions";
+ info->description = "Test basic message functions";
+ return AST_TEST_NOT_RUN;
+ case TEST_EXECUTE:
+ break;
+ }
+
+
+ type = stasis_message_type_create("SomeMessage");
+
+ ast_test_validate(test, NULL == stasis_message_create(NULL, NULL));
+ ast_test_validate(test, NULL == stasis_message_create(type, NULL));
+
+ data = ao2_alloc(strlen(expected) + 1, NULL);
+ strcpy(data, expected);
+ expected_timestamp = ast_tvnow();
+ uut = stasis_message_create(type, data);
+
+ ast_test_validate(test, NULL != uut);
+ ast_test_validate(test, type == stasis_message_type(uut));
+ ast_test_validate(test, 0 == strcmp(expected, stasis_message_data(uut)));
+ ast_test_validate(test, 2 == ao2_ref(data, 0)); /* uut has ref to data */
+
+ time_diff = ast_tvsub(*stasis_message_timestamp(uut), expected_timestamp);
+ /* 10ms is certainly long enough for the two calls to complete */
+ ast_test_validate(test, time_diff.tv_sec == 0);
+ ast_test_validate(test, time_diff.tv_usec < 10000);
+
+ ao2_ref(uut, -1);
+ uut = NULL;
+ ast_test_validate(test, 1 == ao2_ref(data, 0)); /* uut unreffed data */
+
+ return AST_TEST_PASS;
+}
+
+struct consumer {
+ ast_mutex_t lock;
+ ast_cond_t out;
+ struct stasis_message **messages_rxed;
+ size_t messages_rxed_len;
+ int ignore_subscriptions;
+ int complete;
+};
+
+static void consumer_dtor(void *obj) {
+ struct consumer *consumer = obj;
+
+ ast_mutex_destroy(&consumer->lock);
+ ast_cond_destroy(&consumer->out);
+
+ while (consumer->messages_rxed_len > 0) {
+ ao2_cleanup(consumer->messages_rxed[--consumer->messages_rxed_len]);
+ }
+ ast_free(consumer->messages_rxed);
+ consumer->messages_rxed = NULL;
+}
+
+static struct consumer *consumer_create(int ignore_subscriptions) {
+ RAII_VAR(struct consumer *, consumer, NULL, ao2_cleanup);
+
+ consumer = ao2_alloc(sizeof(*consumer), consumer_dtor);
+
+ if (!consumer) {
+ return NULL;
+ }
+
+ consumer->ignore_subscriptions = ignore_subscriptions;
+ consumer->messages_rxed = ast_malloc(0);
+ if (!consumer->messages_rxed) {
+ return NULL;
+ }
+
+ ast_mutex_init(&consumer->lock);
+ ast_cond_init(&consumer->out, NULL);
+
+ ao2_ref(consumer, +1);
+ return consumer;
+}
+
+static void consumer_exec(void *data, struct stasis_subscription *sub, struct stasis_topic *topic, struct stasis_message *message)
+{
+ struct consumer *consumer = data;
+ RAII_VAR(struct consumer *, consumer_needs_cleanup, NULL, ao2_cleanup);
+ SCOPED_MUTEX(lock, &consumer->lock);
+
+ if (!consumer->ignore_subscriptions || stasis_message_type(message) != stasis_subscription_change()) {
+
+ ++consumer->messages_rxed_len;
+ consumer->messages_rxed = ast_realloc(consumer->messages_rxed, sizeof(*consumer->messages_rxed) * consumer->messages_rxed_len);
+ ast_assert(consumer->messages_rxed != NULL);
+ consumer->messages_rxed[consumer->messages_rxed_len - 1] = message;
+ ao2_ref(message, +1);
+ }
+
+ if (stasis_subscription_final_message(sub, message)) {
+ consumer->complete = 1;
+ consumer_needs_cleanup = consumer;
+ }
+
+ ast_cond_signal(&consumer->out);
+}
+
+static int consumer_wait_for(struct consumer *consumer, size_t expected_len)
+{
+ struct timeval start = ast_tvnow();
+ struct timespec end = {
+ .tv_sec = start.tv_sec + 30,
+ .tv_nsec = start.tv_usec * 1000
+ };
+
+ SCOPED_MUTEX(lock, &consumer->lock);
+
+ while (consumer->messages_rxed_len < expected_len) {
+ int r = ast_cond_timedwait(&consumer->out, &consumer->lock, &end);
+ if (r == ETIMEDOUT) {
+ break;
+ }
+ ast_assert(r == 0); /* Not expecting any othet types of errors */
+ }
+ return consumer->messages_rxed_len;
+}
+
+static int consumer_wait_for_completion(struct consumer *consumer)
+{
+ struct timeval start = ast_tvnow();
+ struct timespec end = {
+ .tv_sec = start.tv_sec + 30,
+ .tv_nsec = start.tv_usec * 1000
+ };
+
+ SCOPED_MUTEX(lock, &consumer->lock);
+
+ while (!consumer->complete) {
+ int r = ast_cond_timedwait(&consumer->out, &consumer->lock, &end);
+ if (r == ETIMEDOUT) {
+ break;
+ }
+ ast_assert(r == 0); /* Not expecting any othet types of errors */
+ }
+ return consumer->complete;
+}
+
+static int consumer_should_stay(struct consumer *consumer, size_t expected_len)
+{
+ struct timeval start = ast_tvnow();
+ struct timeval diff = {
+ .tv_sec = 0,
+ .tv_usec = 100000 /* wait for 100ms */
+ };
+ struct timeval end_tv = ast_tvadd(start, diff);
+ struct timespec end = {
+ .tv_sec = end_tv.tv_sec,
+ .tv_nsec = end_tv.tv_usec * 1000
+ };
+
+ SCOPED_MUTEX(lock, &consumer->lock);
+
+ while (consumer->messages_rxed_len == expected_len) {
+ int r = ast_cond_timedwait(&consumer->out, &consumer->lock, &end);
+ if (r == ETIMEDOUT) {
+ break;
+ }
+ ast_assert(r == 0); /* Not expecting any othet types of errors */
+ }
+ return consumer->messages_rxed_len;
+}
+
+AST_TEST_DEFINE(subscription_messages)
+{
+ RAII_VAR(struct stasis_topic *, topic, NULL, ao2_cleanup);
+ RAII_VAR(struct stasis_subscription *, uut, NULL, stasis_unsubscribe);
+ RAII_VAR(char *, test_data, NULL, ao2_cleanup);
+ RAII_VAR(struct stasis_message_type *, test_message_type, NULL, ao2_cleanup);
+ RAII_VAR(struct stasis_message *, test_message, NULL, ao2_cleanup);
+ RAII_VAR(struct consumer *, consumer, NULL, ao2_cleanup);
+ RAII_VAR(char *, expected_uniqueid, NULL, ast_free);
+ int complete;
+ struct stasis_subscription_change *change;
+
+ switch (cmd) {
+ case TEST_INIT:
+ info->name = __func__;
+ info->category = test_category;
+ info->summary = "Test subscribe/unsubscribe messages";
+ info->description = "Test subscribe/unsubscribe messages";
+ return AST_TEST_NOT_RUN;
+ case TEST_EXECUTE:
+ break;
+ }
+
+ topic = stasis_topic_create("TestTopic");
+ ast_test_validate(test, NULL != topic);
+
+ consumer = consumer_create(0);
+ ast_test_validate(test, NULL != consumer);
+
+ uut = stasis_subscribe(topic, consumer_exec, consumer);
+ ast_test_validate(test, NULL != uut);
+ ao2_ref(consumer, +1);
+ expected_uniqueid = ast_strdup(stasis_subscription_uniqueid(uut));
+
+ stasis_unsubscribe(uut);
+ uut = NULL;
+ complete = consumer_wait_for_completion(consumer);
+ ast_test_validate(test, 1 == complete);
+
+ ast_test_validate(test, 2 == consumer->messages_rxed_len);
+ ast_test_validate(test, stasis_subscription_change() == stasis_message_type(consumer->messages_rxed[0]));
+ ast_test_validate(test, stasis_subscription_change() == stasis_message_type(consumer->messages_rxed[1]));
+
+ change = stasis_message_data(consumer->messages_rxed[0]);
+ ast_test_validate(test, topic == change->topic);
+ ast_test_validate(test, 0 == strcmp("Subscribe", change->description));
+ ast_test_validate(test, 0 == strcmp(expected_uniqueid, change->uniqueid));
+
+ change = stasis_message_data(consumer->messages_rxed[1]);
+ ast_test_validate(test, topic == change->topic);
+ ast_test_validate(test, 0 == strcmp("Unsubscribe", change->description));
+ ast_test_validate(test, 0 == strcmp(expected_uniqueid, change->uniqueid));
+
+ return AST_TEST_PASS;
+}
+
+AST_TEST_DEFINE(publish)
+{
+ RAII_VAR(struct stasis_topic *, topic, NULL, ao2_cleanup);
+ RAII_VAR(struct stasis_subscription *, uut, NULL, stasis_unsubscribe);
+ RAII_VAR(char *, test_data, NULL, ao2_cleanup);
+ RAII_VAR(struct stasis_message_type *, test_message_type, NULL, ao2_cleanup);
+ RAII_VAR(struct stasis_message *, test_message, NULL, ao2_cleanup);
+ RAII_VAR(struct consumer *, consumer, NULL, ao2_cleanup);
+ int actual_len;
+ const char *actual;
+
+ switch (cmd) {
+ case TEST_INIT:
+ info->name = __func__;
+ info->category = test_category;
+ info->summary = "Test simple subscriptions";
+ info->description = "Test simple subscriptions";
+ return AST_TEST_NOT_RUN;
+ case TEST_EXECUTE:
+ break;
+ }
+
+ topic = stasis_topic_create("TestTopic");
+ ast_test_validate(test, NULL != topic);
+
+ consumer = consumer_create(1);
+ ast_test_validate(test, NULL != consumer);
+
+ uut = stasis_subscribe(topic, consumer_exec, consumer);
+ ast_test_validate(test, NULL != uut);
+ ao2_ref(consumer, +1);
+
+ test_data = ao2_alloc(1, NULL);
+ ast_test_validate(test, NULL != test_data);
+ test_message_type = stasis_message_type_create("TestMessage");
+ test_message = stasis_message_create(test_message_type, test_data);
+
+ stasis_publish(topic, test_message);
+
+ actual_len = consumer_wait_for(consumer, 1);
+ ast_test_validate(test, 1 == actual_len);
+ actual = stasis_message_data(consumer->messages_rxed[0]);
+ ast_test_validate(test, test_data == actual);
+
+ return AST_TEST_PASS;
+}
+
+AST_TEST_DEFINE(unsubscribe_stops_messages)
+{
+ RAII_VAR(struct stasis_topic *, topic, NULL, ao2_cleanup);
+ RAII_VAR(struct consumer *, consumer, NULL, ao2_cleanup);
+ RAII_VAR(struct stasis_subscription *, uut, NULL, stasis_unsubscribe);
+ RAII_VAR(char *, test_data, NULL, ao2_cleanup);
+ RAII_VAR(struct stasis_message_type *, test_message_type, NULL, ao2_cleanup);
+ RAII_VAR(struct stasis_message *, test_message, NULL, ao2_cleanup);
+ int actual_len;
+
+ switch (cmd) {
+ case TEST_INIT:
+ info->name = __func__;
+ info->category = test_category;
+ info->summary = "Test simple subscriptions";
+ info->description = "Test simple subscriptions";
+ return AST_TEST_NOT_RUN;
+ case TEST_EXECUTE:
+ break;
+ }
+
+ topic = stasis_topic_create("TestTopic");
+ ast_test_validate(test, NULL != topic);
+
+ consumer = consumer_create(1);
+ ast_test_validate(test, NULL != consumer);
+
+ uut = stasis_subscribe(topic, consumer_exec, consumer);
+ ast_test_validate(test, NULL != uut);
+ ao2_ref(consumer, +1);
+
+ stasis_unsubscribe(uut);
+ uut = NULL;
+
+ test_data = ao2_alloc(1, NULL);
+ ast_test_validate(test, NULL != test_data);
+ test_message_type = stasis_message_type_create("TestMessage");
+ test_message = stasis_message_create(test_message_type, test_data);
+
+ stasis_publish(topic, test_message);
+
+ actual_len = consumer_should_stay(consumer, 0);
+ ast_test_validate(test, 0 == actual_len);
+
+ return AST_TEST_PASS;
+}
+
+
+AST_TEST_DEFINE(forward)
+{
+ RAII_VAR(struct stasis_topic *, parent_topic, NULL, ao2_cleanup);
+ RAII_VAR(struct stasis_topic *, topic, NULL, ao2_cleanup);
+
+ RAII_VAR(struct consumer *, parent_consumer, NULL, ao2_cleanup);
+ RAII_VAR(struct consumer *, consumer, NULL, ao2_cleanup);
+
+ RAII_VAR(struct stasis_subscription *, forward_sub, NULL, stasis_unsubscribe);
+ RAII_VAR(struct stasis_subscription *, parent_sub, NULL, stasis_unsubscribe);
+ RAII_VAR(struct stasis_subscription *, sub, NULL, stasis_unsubscribe);
+
+ RAII_VAR(char *, test_data, NULL, ao2_cleanup);
+ RAII_VAR(struct stasis_message_type *, test_message_type, NULL, ao2_cleanup);
+ RAII_VAR(struct stasis_message *, test_message, NULL, ao2_cleanup);
+ int actual_len;
+
+ switch (cmd) {
+ case TEST_INIT:
+ info->name = __func__;
+ info->category = test_category;
+ info->summary = "Test sending events to a parent topic";
+ info->description = "Test sending events to a parent topic.\n"
+ "This test creates three topics (one parent, two children)\n"
+ "and publishes a message to one child, and verifies it's\n"
+ "only seen by that child and the parent";
+ return AST_TEST_NOT_RUN;
+ case TEST_EXECUTE:
+ break;
+ }
+
+ parent_topic = stasis_topic_create("ParentTestTopic");
+ ast_test_validate(test, NULL != parent_topic);
+ topic = stasis_topic_create("TestTopic");
+ ast_test_validate(test, NULL != topic);
+
+ forward_sub = stasis_forward_all(topic, parent_topic);
+ ast_test_validate(test, NULL != forward_sub);
+
+ parent_consumer = consumer_create(1);
+ ast_test_validate(test, NULL != parent_consumer);
+ consumer = consumer_create(1);
+ ast_test_validate(test, NULL != consumer);
+
+ parent_sub = stasis_subscribe(parent_topic, consumer_exec, parent_consumer);
+ ast_test_validate(test, NULL != parent_sub);
+ ao2_ref(parent_consumer, +1);
+ sub = stasis_subscribe(topic, consumer_exec, consumer);
+ ast_test_validate(test, NULL != sub);
+ ao2_ref(consumer, +1);
+
+ test_data = ao2_alloc(1, NULL);
+ ast_test_validate(test, NULL != test_data);
+ test_message_type = stasis_message_type_create("TestMessage");
+ test_message = stasis_message_create(test_message_type, test_data);
+
+ stasis_publish(topic, test_message);
+
+ actual_len = consumer_wait_for(consumer, 1);
+ ast_test_validate(test, 1 == actual_len);
+ actual_len = consumer_wait_for(parent_consumer, 1);
+ ast_test_validate(test, 1 == actual_len);
+
+ return AST_TEST_PASS;
+}
+
+struct cache_test_data {
+ char *id;
+ char *value;
+};
+
+static void cache_test_data_dtor(void *obj)
+{
+ struct cache_test_data *data = obj;
+ ast_free(data->id);
+ ast_free(data->value);
+}
+
+static struct stasis_message *cache_test_message_create(struct stasis_message_type *type, const char *name, const char *value)
+{
+ RAII_VAR(struct cache_test_data *, data, NULL, ao2_cleanup);
+
+ data = ao2_alloc(sizeof(*data), cache_test_data_dtor);
+ if (data == NULL) {
+ return NULL;
+ }
+
+ ast_assert(name != NULL);
+ ast_assert(value != NULL);
+
+ data->id = ast_strdup(name);
+ data->value = ast_strdup(value);
+ if (!data->id || !data->value) {
+ return NULL;
+ }
+
+ return stasis_message_create(type, data);
+}
+
+static const char *cache_test_data_id(struct stasis_message *message) {
+ struct cache_test_data *cachable = stasis_message_data(message);
+
+ if (0 != strcmp("Cacheable", stasis_message_type_name(stasis_message_type(message)))) {
+ return NULL;
+ }
+ return cachable->id;
+}
+
+AST_TEST_DEFINE(cache_passthrough)
+{
+ RAII_VAR(struct stasis_message_type *, non_cache_type, NULL, ao2_cleanup);
+ RAII_VAR(struct stasis_topic *, topic, NULL, ao2_cleanup);
+ RAII_VAR(struct stasis_caching_topic *, caching_topic, NULL, stasis_caching_unsubscribe);
+ RAII_VAR(struct consumer *, consumer, NULL, ao2_cleanup);
+ RAII_VAR(struct stasis_subscription *, sub, NULL, stasis_unsubscribe);
+ RAII_VAR(struct stasis_message *, test_message, NULL, ao2_cleanup);
+ int actual_len;
+ struct stasis_message_type *actual_type;
+
+ switch (cmd) {
+ case TEST_INIT:
+ info->name = __func__;
+ info->category = test_category;
+ info->summary = "Test passing messages through cache topic unscathed.";
+ info->description = "Test passing messages through cache topic unscathed.";
+ return AST_TEST_NOT_RUN;
+ case TEST_EXECUTE:
+ break;
+ }
+
+ non_cache_type = stasis_message_type_create("NonCacheable");
+ ast_test_validate(test, NULL != non_cache_type);
+ topic = stasis_topic_create("SomeTopic");
+ ast_test_validate(test, NULL != topic);
+ caching_topic = stasis_caching_topic_create(topic, cache_test_data_id);
+ ast_test_validate(test, NULL != caching_topic);
+ consumer = consumer_create(1);
+ ast_test_validate(test, NULL != consumer);
+ sub = stasis_subscribe(stasis_caching_get_topic(caching_topic), consumer_exec, consumer);
+ ast_test_validate(test, NULL != sub);
+ ao2_ref(consumer, +1);
+
+ test_message = cache_test_message_create(non_cache_type, "1", "1");
+ ast_test_validate(test, NULL != test_message);
+
+ stasis_publish(topic, test_message);
+
+ actual_len = consumer_wait_for(consumer, 1);
+ ast_test_validate(test, 1 == actual_len);
+
+ actual_type = stasis_message_type(consumer->messages_rxed[0]);
+ ast_test_validate(test, non_cache_type == actual_type);
+
+ ast_test_validate(test, test_message == consumer->messages_rxed[0]);
+
+ return AST_TEST_PASS;
+}
+
+AST_TEST_DEFINE(cache)
+{
+ RAII_VAR(struct stasis_message_type *, cache_type, NULL, ao2_cleanup);
+ RAII_VAR(struct stasis_topic *, topic, NULL, ao2_cleanup);
+ RAII_VAR(struct stasis_caching_topic *, caching_topic, NULL, stasis_caching_unsubscribe);
+ RAII_VAR(struct consumer *, consumer, NULL, ao2_cleanup);
+ RAII_VAR(struct stasis_subscription *, sub, NULL, stasis_unsubscribe);
+ RAII_VAR(struct stasis_message *, test_message1_1, NULL, ao2_cleanup);
+ RAII_VAR(struct stasis_message *, test_message2_1, NULL, ao2_cleanup);
+ RAII_VAR(struct stasis_message *, test_message2_2, NULL, ao2_cleanup);
+ RAII_VAR(struct stasis_message *, test_message1_clear, NULL, ao2_cleanup);
+ int actual_len;
+ struct stasis_cache_update *actual_update;
+
+ switch (cmd) {
+ case TEST_INIT:
+ info->name = __func__;
+ info->category = test_category;
+ info->summary = "Test passing messages through cache topic unscathed.";
+ info->description = "Test passing messages through cache topic unscathed.";
+ return AST_TEST_NOT_RUN;
+ case TEST_EXECUTE:
+ break;
+ }
+
+ cache_type = stasis_message_type_create("Cacheable");
+ ast_test_validate(test, NULL != cache_type);
+ topic = stasis_topic_create("SomeTopic");
+ ast_test_validate(test, NULL != topic);
+ caching_topic = stasis_caching_topic_create(topic, cache_test_data_id);
+ ast_test_validate(test, NULL != caching_topic);
+ consumer = consumer_create(1);
+ ast_test_validate(test, NULL != consumer);
+ sub = stasis_subscribe(stasis_caching_get_topic(caching_topic), consumer_exec, consumer);
+ ast_test_validate(test, NULL != sub);
+ ao2_ref(consumer, +1);
+
+ test_message1_1 = cache_test_message_create(cache_type, "1", "1");
+ ast_test_validate(test, NULL != test_message1_1);
+ test_message2_1 = cache_test_message_create(cache_type, "2", "1");
+ ast_test_validate(test, NULL != test_message2_1);
+
+ /* Post a couple of snapshots */
+ stasis_publish(topic, test_message1_1);
+ stasis_publish(topic, test_message2_1);
+ actual_len = consumer_wait_for(consumer, 2);
+ ast_test_validate(test, 2 == actual_len);
+
+ /* Check for new snapshot messages */
+ ast_test_validate(test, stasis_cache_update() == stasis_message_type(consumer->messages_rxed[0]));
+ actual_update = stasis_message_data(consumer->messages_rxed[0]);
+ ast_test_validate(test, topic == actual_update->topic);
+ ast_test_validate(test, NULL == actual_update->old_snapshot);
+ ast_test_validate(test, test_message1_1 == actual_update->new_snapshot);
+ ast_test_validate(test, test_message1_1 == stasis_cache_get(caching_topic, cache_type, "1"));
+ /* stasis_cache_get returned a ref, so unref test_message1_1 */
+ ao2_ref(test_message1_1, -1);
+
+ ast_test_validate(test, stasis_cache_update() == stasis_message_type(consumer->messages_rxed[1]));
+ actual_update = stasis_message_data(consumer->messages_rxed[1]);
+ ast_test_validate(test, topic == actual_update->topic);
+ ast_test_validate(test, NULL == actual_update->old_snapshot);
+ ast_test_validate(test, test_message2_1 == actual_update->new_snapshot);
+ ast_test_validate(test, test_message2_1 == stasis_cache_get(caching_topic, cache_type, "2"));
+ /* stasis_cache_get returned a ref, so unref test_message2_1 */
+ ao2_ref(test_message2_1, -1);
+
+ /* Update snapshot 2 */
+ test_message2_2 = cache_test_message_create(cache_type, "2", "2");
+ ast_test_validate(test, NULL != test_message2_2);
+ stasis_publish(topic, test_message2_2);
+
+ actual_len = consumer_wait_for(consumer, 3);
+ ast_test_validate(test, 3 == actual_len);
+
+ actual_update = stasis_message_data(consumer->messages_rxed[2]);
+ ast_test_validate(test, topic == actual_update->topic);
+ ast_test_validate(test, test_message2_1 == actual_update->old_snapshot);
+ ast_test_validate(test, test_message2_2 == actual_update->new_snapshot);
+ ast_test_validate(test, test_message2_2 == stasis_cache_get(caching_topic, cache_type, "2"));
+ /* stasis_cache_get returned a ref, so unref test_message2_2 */
+ ao2_ref(test_message2_2, -1);
+
+ /* Clear snapshot 1 */
+ test_message1_clear = stasis_cache_clear_create(cache_type, "1");
+ ast_test_validate(test, NULL != test_message1_clear);
+ stasis_publish(topic, test_message1_clear);
+
+ actual_len = consumer_wait_for(consumer, 4);
+ ast_test_validate(test, 4 == actual_len);
+
+ actual_update = stasis_message_data(consumer->messages_rxed[3]);
+ ast_test_validate(test, topic == actual_update->topic);
+ ast_test_validate(test, test_message1_1 == actual_update->old_snapshot);
+ ast_test_validate(test, NULL == actual_update->new_snapshot);
+ ast_test_validate(test, NULL == stasis_cache_get(caching_topic, cache_type, "1"));
+
+ return AST_TEST_PASS;
+}
+
+static int unload_module(void)
+{
+ AST_TEST_UNREGISTER(message_type);
+ AST_TEST_UNREGISTER(message);
+ AST_TEST_UNREGISTER(subscription_messages);
+ AST_TEST_UNREGISTER(publish);
+ AST_TEST_UNREGISTER(unsubscribe_stops_messages);
+ AST_TEST_UNREGISTER(forward);
+ AST_TEST_UNREGISTER(cache_passthrough);
+ AST_TEST_UNREGISTER(cache);
+ return 0;
+}
+
+static int load_module(void)
+{
+ AST_TEST_REGISTER(message_type);
+ AST_TEST_REGISTER(message);
+ AST_TEST_REGISTER(subscription_messages);
+ AST_TEST_REGISTER(publish);
+ AST_TEST_REGISTER(unsubscribe_stops_messages);
+ AST_TEST_REGISTER(forward);
+ AST_TEST_REGISTER(cache_passthrough);
+ AST_TEST_REGISTER(cache);
+ return AST_MODULE_LOAD_SUCCESS;
+}
+
+AST_MODULE_INFO(ASTERISK_GPL_KEY, 0, "Stasis testing",
+ .load = load_module,
+ .unload = unload_module
+ );