diff options
Diffstat (limited to 'include')
-rw-r--r-- | include/asterisk/channel.h | 117 | ||||
-rw-r--r-- | include/asterisk/channel_internal.h | 1 | ||||
-rw-r--r-- | include/asterisk/stasis.h | 506 |
3 files changed, 623 insertions, 1 deletions
diff --git a/include/asterisk/channel.h b/include/asterisk/channel.h index f7e4d2d39..eee0828be 100644 --- a/include/asterisk/channel.h +++ b/include/asterisk/channel.h @@ -125,7 +125,6 @@ References: #include "asterisk/abstract_jb.h" #include "asterisk/astobj2.h" - #include "asterisk/poll-compat.h" #if defined(__cplusplus) || defined(c_plusplus) @@ -151,6 +150,7 @@ extern "C" { #include "asterisk/channelstate.h" #include "asterisk/ccss.h" #include "asterisk/framehook.h" +#include "asterisk/stasis.h" #define DATASTORE_INHERIT_FOREVER INT_MAX @@ -4102,4 +4102,119 @@ int ast_channel_dialed_causes_add(const struct ast_channel *chan, const struct a void ast_channel_dialed_causes_clear(const struct ast_channel *chan); struct ast_flags *ast_channel_flags(struct ast_channel *chan); + +/*! + * \since 12 + * \brief Structure representing a snapshot of channel state. + * + * While not enforced programmatically, this object is shared across multiple + * threads, and should be threated as an immutable object. + */ +struct ast_channel_snapshot { + AST_DECLARE_STRING_FIELDS( + AST_STRING_FIELD(name); /*!< ASCII unique channel name */ + AST_STRING_FIELD(accountcode); /*!< Account code for billing */ + AST_STRING_FIELD(peeraccount); /*!< Peer account code for billing */ + AST_STRING_FIELD(userfield); /*!< Userfield for CEL billing */ + AST_STRING_FIELD(uniqueid); /*!< Unique Channel Identifier */ + AST_STRING_FIELD(linkedid); /*!< Linked Channel Identifier -- gets propagated by linkage */ + AST_STRING_FIELD(parkinglot); /*!< Default parking lot, if empty, default parking lot */ + AST_STRING_FIELD(hangupsource); /*!< Who is responsible for hanging up this channel */ + AST_STRING_FIELD(appl); /*!< Current application */ + AST_STRING_FIELD(data); /*!< Data passed to current application */ + AST_STRING_FIELD(context); /*!< Dialplan: Current extension context */ + AST_STRING_FIELD(exten); /*!< Dialplan: Current extension number */ + AST_STRING_FIELD(caller_name); /*!< Caller ID Name */ + AST_STRING_FIELD(caller_number); /*!< Caller ID Number */ + AST_STRING_FIELD(connected_name); /*!< Connected Line Name */ + AST_STRING_FIELD(connected_number); /*!< Connected Line Number */ + ); + + struct timeval creationtime; /*!< The time of channel creation */ + enum ast_channel_state state; /*!< State of line */ + int priority; /*!< Dialplan: Current extension priority */ + int amaflags; /*!< AMA flags for billing */ + int hangupcause; /*!< Why is the channel hanged up. See causes.h */ + struct ast_flags flags; /*!< channel flags of AST_FLAG_ type */ +}; + +/*! + * \since 12 + * \brief Generate a snapshot of the channel state. This is an ao2 object, so + * ao2_cleanup() to deallocate. + * + * \param chan The channel from which to generate a snapshot + * + * \retval pointer on success (must be ast_freed) + * \retval NULL on error + */ +struct ast_channel_snapshot *ast_channel_snapshot_create(struct ast_channel *chan); + +/*! + * \since 12 + * \brief Message type for \ref ast_channel_snapshot. + * + * \retval Message type for \ref ast_channel_snapshot. + */ +struct stasis_message_type *ast_channel_snapshot(void); + +/*! + * \since 12 + * \brief A topic which publishes the events for a particular channel. + * + * \param chan Channel. + * + * \retval Topic for channel's events. + * \retval \c NULL if \a chan is \c NULL. + */ +struct stasis_topic *ast_channel_topic(struct ast_channel *chan); + +/*! + * \since 12 + * \brief A topic which publishes the events for all channels. + * \retval Topic for all channel events. + */ +struct stasis_topic *ast_channel_topic_all(void); + +/*! + * \since 12 + * \brief A caching topic which caches \ref ast_channel_snapshot messages from + * ast_channel_events_all(void). + * + * \retval Topic for all channel events. + */ +struct stasis_caching_topic *ast_channel_topic_all_cached(void); + +/*! + * \since 12 + * \brief Variable set event. + */ +struct ast_channel_varset { + /*! Channel variable was set on (or NULL for global variable) */ + struct ast_channel_snapshot *snapshot; + /*! Variable name */ + char *variable; + /*! New value */ + char *value; +}; + +/*! + * \since 12 + * \brief Message type for \ref ast_channel_varset messages. + * + * \retval Message type for \ref ast_channel_varset messages. + */ +struct stasis_message_type *ast_channel_varset(void); + +/*! + * \since 12 + * \brief Publish a \ref ast_channel_varset for a channel. + * + * \param chan Channel to pulish the event for, or \c NULL for 'none'. + * \param variable Name of the variable being set + * \param value Value. + */ +void ast_channel_publish_varset(struct ast_channel *chan, + const char *variable, const char *value); + #endif /* _ASTERISK_CHANNEL_H */ diff --git a/include/asterisk/channel_internal.h b/include/asterisk/channel_internal.h index 1b01fe042..38776c1e1 100644 --- a/include/asterisk/channel_internal.h +++ b/include/asterisk/channel_internal.h @@ -23,4 +23,5 @@ struct ast_channel *__ast_channel_internal_alloc(void (*destructor)(void *obj), void ast_channel_internal_finalize(struct ast_channel *chan); int ast_channel_internal_is_finalized(struct ast_channel *chan); void ast_channel_internal_cleanup(struct ast_channel *chan); +void ast_channel_internal_setup_topics(struct ast_channel *chan); diff --git a/include/asterisk/stasis.h b/include/asterisk/stasis.h new file mode 100644 index 000000000..a6ab4212f --- /dev/null +++ b/include/asterisk/stasis.h @@ -0,0 +1,506 @@ +/* + * Asterisk -- An open source telephony toolkit. + * + * Copyright (C) 2013, Digium, Inc. + * + * David M. Lee, II <dlee@digium.com> + * + * See http://www.asterisk.org for more information about + * the Asterisk project. Please do not directly contact + * any of the maintainers of this project for assistance; + * the project provides a web site, mailing lists and IRC + * channels for your use. + * + * This program is free software, distributed under the terms of + * the GNU General Public License Version 2. See the LICENSE file + * at the top of the source tree. + */ + +#ifndef _ASTERISK_STASIS_H +#define _ASTERISK_STASIS_H + +/*! \file + * + * \brief Stasis Message Bus API. See \ref stasis "Stasis Message Bus API" for + * detailed documentation. + * + * \author David M. Lee, II <dlee@digium.com> + * \since 12 + * + * \page stasis Stasis Message Bus API + * + * \par Intro + * + * The Stasis Message Bus is a loosely typed mechanism for distributing messages + * within Asterisk. It is designed to be: + * - Loosely coupled; new message types can be added in seperate modules. + * - Easy to use; publishing and subscribing are straightforward operations. + * - Consistent memory management; all message bus objects are AO2 managed + * objects, using ao2_ref() and ao2_cleanup() to manage the reference + * counting. + * + * There are three main concepts for using the Stasis Message Bus: + * - \ref stasis_message + * - \ref stasis_topic + * - \ref stasis_subscription + * + * \par stasis_message + * + * Central to the Stasis Message Bus is the \ref stasis_message, the messages + * that are sent on the bus. These messages have: + * - a type (as defined by a \ref stasis_message_type) + * - a value - a \c void pointer to an AO2 object + * - a timestamp when it was created + * + * Once a \ref stasis_message has been created, it is immutable and cannot + * change. The same goes for the value of the message (although this cannot be + * enforced in code). Messages themselves are reference-counted, AO2 objects, + * along with their values. By being both reference counted and immutable, + * messages can be shared throughout the system without any concerns for + * threading. (Well, the objects must be allocated with \ref + * AO2_ALLOC_OPT_LOCK_MUTEX so that the reference counting operations are thread + * safe. But other than that, no worries). + * + * The type of a message is defined by an instance of \ref stasis_message_type, + * which can be created by calling stasis_message_type_create(). Message types + * are named, which is useful in debugging. It is recommended that the string + * name for a message type match the name of the struct that's stored in the + * message. For example, name for \ref stasis_cache_update's message type is \c + * "stasis_cache_update". + * + * \par stasis_topic + * + * A \ref stasis_topic is an object to which \ref stasis_subscriber's may be + * subscribed, and \ref stasis_message's may be published. Any message published + * to the topic is dispatched to all of its subscribers. The topic itself may be + * named, which is useful in debugging. + * + * Topics themselves are reference counted objects, and automagically + * unsubscribe all of their subscribers when they are destroyed. Topics are also + * thread safe, so no worries about publishing/subscribing/unsubscribing to a + * topic concurrently from multiple threads. It's also designed to handle the + * case of unsubscribing from a topic from within the subscription handler. + * + * \par Forwarding + * + * There is one special case of topics that's worth noting: forwarding + * messages. It's a fairly common use case to want to forward all the messages + * published on one topic to another one (for example, an aggregator topic that + * publishes all the events from a set of other topics). This can be + * accomplished easily using stasis_forward_all(). This sets up the forwarding + * between the two topics, and returns a \ref stasis_subscription, which can be + * unsubscribed to stop the forwarding. + * + * \par Caching + * + * Another common use case is to want to cache certain messages that are + * published on the bus. Usually these events are snapshots of the current state + * in the system, and it's desirable to query that state from the cache without + * locking the original object. It's also desirable for subscribers of the + * caching topic to receive messages that have both the old cache value and the + * new value being put into the cache. For this, we have + * stasis_caching_topic_create(), providing it with the topic which publishes + * the messages that you wish to cache, and a function that can identify + * cacheable messages. + * + * The returned \ref stasis_caching_topic provides a topic that forwards + * non-cacheable messages unchanged. A cacheable message is wrapped in a \ref + * stasis_cache_update message which provides the old snapshot (or \c NULL if + * this is a new cache entry), and the new snapshot (or \c NULL if the entry was + * removed from the cache). A stasis_cache_clear_create() message must be sent + * to the topic in order to remove entries from the cache. + * + * As with all things Stasis, the \ref stasis_caching_topic is a reference + * counted AO2 object. + * + * \par stasis_subscriber + * + * Any topic may be subscribed to by simply providing stasis_subscribe() the + * \ref stasis_topic to subscribe to, a handler function and \c void pointer to + * data that is passed back to the handler. Invocations on the subscription's + * handler are serialized, but different invocations may occur on different + * threads (this usually isn't important unless you use thread locals or + * something similar). + * + * Since the topic (by necessity) holds a reference to the subscription, + * reference counting alone is insufficient to terminate a subscription. In + * order to stop receiving messages, call stasis_unsubscribe() with your \ref + * stasis_subscription. This will remove the topic's reference to the + * subscription, and allow it to be destroyed when all of the other references + * are cleaned up. + */ + +#include "asterisk/utils.h" + +/*! @{ */ + +/*! + * \brief Metadata about a \ref stasis_message. + * \since 12 + */ +struct stasis_message_type; + +/*! + * \brief Register a new message type. + * + * \ref stasis_message_type is an AO2 object, so ao2_cleanup() when you're done + * with it. + * + * \param name Name of the new type. + * \return Pointer to the new type. + * \return \c NULL on error. + * \since 12 + */ +struct stasis_message_type *stasis_message_type_create(const char *name); + +/*! + * \brief Gets the name of a given message type + * \param type The type to get. + * \return Name of the type. + * \return \c NULL if \a type is \c NULL. + * \since 12 + */ +const char *stasis_message_type_name(const struct stasis_message_type *type); + +/*! + * \brief Opaque type for a Stasis message. + * \since 12 + */ +struct stasis_message; + +/*! + * \brief Create a new message. + * + * This message is an \c ao2 object, and must be ao2_cleanup()'ed when you are done + * with it. Messages are also immutable, and must not be modified after they + * are initialized. Especially the \a data in the message. + * + * \param type Type of the message + * \param data Immutable data that is the actual contents of the message + * \return New message + * \return \c NULL on error + * \since 12 + */ +struct stasis_message *stasis_message_create(struct stasis_message_type *type, void *data); + +/*! + * \brief Get the message type for a \ref stasis_message. + * \param msg Message to type + * \return Type of \a msg + * \return \c NULL if \a msg is \c NULL. + * \since 12 + */ +struct stasis_message_type *stasis_message_type(const struct stasis_message *msg); + +/*! + * \brief Get the data contained in a message. + * \param msg Message. + * \return Immutable data pointer + * \return \c NULL if msg is \c NULL. + * \since 12 + */ +void *stasis_message_data(const struct stasis_message *msg); + +/*! + * \brief Get the time when a message was created. + * \param msg Message. + * \return Pointer to the \a timeval when the message was created. + * \return \c NULL if msg is \c NULL. + * \since 12 + */ +const struct timeval *stasis_message_timestamp(const struct stasis_message *msg); + +/*! @} */ + +/*! @{ */ + +/*! + * \brief A topic to which messages may be posted, and subscribers, well, subscribe + * \since 12 + */ +struct stasis_topic; + +/*! + * \brief Create a new topic. + * \param name Name of the new topic. + * \return New topic instance. + * \return \c NULL on error. + * \since 12 + */ +struct stasis_topic *stasis_topic_create(const char *name); + +/*! + * \brief Return the name of a topic. + * \param topic Topic. + * \return Name of the topic. + * \return \c NULL if topic is \c NULL. + * \since 12 + */ +const char *stasis_topic_name(const struct stasis_topic *topic); + +/*! + * \brief Publish a message to a topic's subscribers. + * \param topic Topic. + * \param message Message to publish. + * \since 12 + */ +void stasis_publish(struct stasis_topic *topic, struct stasis_message *message); + +/*! + * \brief Publish a message from a specified topic to all the subscribers of a + * possibly different topic. + * \param topic Topic to publish message to. + * \param topic Original topic message was from. + * \param message Message + * \since 12 + */ +void stasis_forward_message(struct stasis_topic *topic, + struct stasis_topic *publisher_topic, + struct stasis_message *message); + +/*! @} */ + +/*! @{ */ + +/*! + * \brief Opaque type for a Stasis subscription. + * \since 12 + */ +struct stasis_subscription; + +/*! + * \brief Callback function type for Stasis subscriptions. + * \param data Data field provided with subscription. + * \param topic Topic to which the message was published. + * \param message Published message. + * \since 12 + */ +typedef void (*stasis_subscription_cb)(void *data, struct stasis_subscription *sub, struct stasis_topic *topic, struct stasis_message *message); + +/*! + * \brief Create a subscription. + * + * In addition to being AO2 managed memory (requiring an ao2_cleanup() to free + * up this reference), the subscription must be explicitly unsubscribed from its + * topic using stasis_unsubscribe(). + * + * The invocations of the callback are serialized, but may not always occur on + * the same thread. The invocation order of different subscriptions is + * unspecified. + * + * \param topic Topic to subscribe to. + * \param callback Callback function for subscription messages. + * \param data Data to be passed to the callback, in addition to the message. + * \return New \ref stasis_subscription object. + * \return \c NULL on error. + * \since 12 + */ +struct stasis_subscription *stasis_subscribe(struct stasis_topic *topic, + stasis_subscription_cb callback, + void *data); + +/*! + * \brief Cancel a subscription. + * + * Note that in an asynchronous system, there may still be messages queued or + * in transit to the subscription's callback. These will still be delivered. + * There will be a final 'SubscriptionCancelled' message, indicating the + * delivery of the final message. + * + * \param subscription Subscription to cancel. + * \since 12 + */ +void stasis_unsubscribe(struct stasis_subscription *subscription); + +/*! + * \brief Create a subscription which forwards all messages from one topic to + * another. + * + * Note that the \a topic parameter of the invoked callback will the be \a topic + * the message was sent to, not the topic the subscriber subscribed to. + * + * \param from_topic Topic to forward. + * \param to_topic Destination topic of forwarded messages. + * \return New forwarding subscription. + * \return \c NULL on error. + * \since 12 + */ +struct stasis_subscription *stasis_forward_all(struct stasis_topic *from_topic, struct stasis_topic *to_topic); + +/*! + * \brief Get the unique ID for the subscription. + * + * \param sub Subscription for which to get the unique ID. + * \return Unique ID for the subscription. + * \since 12 + */ +const char *stasis_subscription_uniqueid(const struct stasis_subscription *sub); + +/*! + * \brief Returns whether a subscription is currently subscribed. + * + * Note that there may still be messages queued up to be dispatched to this + * subscription, but the stasis_subscription_final_message() has been enqueued. + * + * \param sub Subscription to check + * \return False (zero) if subscription is not subscribed. + * \return True (non-zero) if still subscribed. + */ +int stasis_subscription_is_subscribed(const struct stasis_subscription *sub); + +/*! + * \brief Determine whether a message is the final message to be received on a subscription. + * + * \param sub Subscription on which the message was received. + * \param msg Message to check. + * \return zero if the provided message is not the final message. + * \return non-zero if the provided message is the final message. + * \since 12 + */ +int stasis_subscription_final_message(struct stasis_subscription *sub, struct stasis_message *msg); + +/*! + * \brief Holds details about changes to subscriptions for the specified topic + * \since 12 + */ +struct stasis_subscription_change { + AST_DECLARE_STRING_FIELDS( + AST_STRING_FIELD(uniqueid); /*!< The unique ID associated with this subscription */ + AST_STRING_FIELD(description); /*!< The description of the change to the subscription associated with the uniqueid */ + ); + struct stasis_topic *topic; /*!< The topic the subscription is/was subscribing to */ +}; + +/*! + * \brief Gets the message type for subscription change notices + * \return The stasis_message_type for subscription change notices + * \since 12 + */ +struct stasis_message_type *stasis_subscription_change(void); + +/*! @} */ + +/*! @{ */ + +/*! + * \brief A topic wrapper, which caches certain messages. + * \since 12 + */ +struct stasis_caching_topic; + +/*! + * \brief Message type for cache update messages. + * \return Message type for cache update messages. + * \since 12 + */ +struct stasis_message_type *stasis_cache_update(void); + +/*! + * \brief Cache update message + * \since 12 + */ +struct stasis_cache_update { + /*! \brief Topic that published \c new_snapshot */ + struct stasis_topic *topic; + /*! \brief Convenience reference to snapshot type */ + struct stasis_message_type *type; + /*! \brief Old value from the cache */ + struct stasis_message *old_snapshot; + /*! \brief New value */ + struct stasis_message *new_snapshot; +}; + +/*! + * \brief A message which instructs the caching topic to remove an entry from its cache. + * \param type Message type. + * \param id Unique id of the snapshot to clear. + * \return Message which, when sent to the \a topic, will clear the item from the cache. + * \return \c NULL on error. + * \since 12 + */ +struct stasis_message *stasis_cache_clear_create(struct stasis_message_type *type, const char *id); + +/*! + * \brief Callback extract a unique identity from a snapshot message. + * + * This identity is unique to the underlying object of the snapshot, such as the + * UniqueId field of a channel. + * + * \param message Message to extract id from. + * \return String representing the snapshot's id. + * \return \c NULL if the message_type of the message isn't a handled snapshot. + * \since 12 + */ +typedef const char *(*snapshot_get_id)(struct stasis_message *message); + +/*! + * \brief Create a topic which monitors and caches messages from another topic. + * + * The idea is that some topics publish 'snapshots' of some other object's state + * that should be cached. When these snapshot messages are received, the cache + * is updated, and a stasis_cache_update() message is forwarded, which has both + * the original snapshot message and the new message. + * + * \param original_topic Topic publishing snapshot messages. + * \param id_fn Callback to extract the id from a snapshot message. + * \return New topic which changes snapshot messages to stasis_cache_update() + * messages, and forwards all other messages from the original topic. + * \since 12 + */ +struct stasis_caching_topic *stasis_caching_topic_create(struct stasis_topic *original_topic, snapshot_get_id id_fn); + +/*! + * Unsubscribes a caching topic from its upstream topic. + * \param caching_topic Caching topic to unsubscribe + * \since 12 + */ +void stasis_caching_unsubscribe(struct stasis_caching_topic *caching_topic); + +/*! + * \brief Returns the topic of cached events from a caching topics. + * \param caching_topic The caching topic. + * \return The topic that publishes cache update events, along with passthrough events + * from the underlying topic. + * \return \c NULL if \a caching_topic is \c NULL. + * \since 12 + */ +struct stasis_topic *stasis_caching_get_topic(struct stasis_caching_topic *caching_topic); + +/*! + * \brief Retrieve an item from the cache. + * \param caching_topic The topic returned from stasis_caching_topic_create(). + * \param type Type of message to retrieve. + * \param id Identity of the snapshot to retrieve. + * \return Message from the cache. The cache still owns the message, so + * ao2_ref() if you want to keep it. + * \return \c NULL if message is not found. + * \since 12 + */ +struct stasis_message *stasis_cache_get(struct stasis_caching_topic *caching_topic, + struct stasis_message_type *type, + const char *id); + +/*! @} */ + +/*! @{ */ + +/*! + * \brief Initialize the Stasis subsystem + * \return 0 on success. + * \return Non-zero on error. + * \since 12 + */ +int stasis_init(void); + +/*! + * \private + * \brief called by stasis_init() for cache initialization. + * \return 0 on success. + * \return Non-zero on error. + * \since 12 + */ +int stasis_cache_init(void); + +/*! @} */ + +#endif /* _ASTERISK_STASIS_H */ |