summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--include/asterisk/devicestate.h2
-rw-r--r--include/asterisk/event.h127
-rw-r--r--include/asterisk/event_defs.h238
-rw-r--r--include/asterisk/stasis.h31
-rw-r--r--main/app.c35
-rw-r--r--main/devicestate.c33
-rw-r--r--main/event.c123
-rw-r--r--main/stasis.c21
-rw-r--r--main/stasis_message.c5
-rw-r--r--res/res_corosync.c390
10 files changed, 804 insertions, 201 deletions
diff --git a/include/asterisk/devicestate.h b/include/asterisk/devicestate.h
index ccc46311e..4f9aa739b 100644
--- a/include/asterisk/devicestate.h
+++ b/include/asterisk/devicestate.h
@@ -59,7 +59,7 @@ enum ast_device_state {
AST_DEVICE_RINGING, /*!< Device is ringing */
AST_DEVICE_RINGINUSE, /*!< Device is ringing *and* in use */
AST_DEVICE_ONHOLD, /*!< Device is on hold */
- AST_DEVICE_TOTAL, /*/ Total num of device states, used for testing */
+ AST_DEVICE_TOTAL, /*!< Total num of device states, used for testing */
};
/*! \brief Device State Cachability
diff --git a/include/asterisk/event.h b/include/asterisk/event.h
index 3178de5c2..7eea0581d 100644
--- a/include/asterisk/event.h
+++ b/include/asterisk/event.h
@@ -25,33 +25,27 @@
/*!
* \page AstGenericEvents Generic event system
*
- * The purpose of this API is to provide a generic way to share events between
- * Asterisk modules. Code can generate events, and other code can subscribe to
- * them.
+ * Prior to the creation of \ref stasis, the purpose of this API was to provide
+ * a generic way to share events between Asterisk modules. Once there was a need
+ * to disseminate data whose definition was provided by the producers/consumers,
+ * it was no longer possible to use the binary representation in the generic
+ * event system.
+ *
+ * That aside, the generic event system is still useful and used by several
+ * modules in Asterisk.
+ * - CEL uses the \ref ast_event representation to pass information to registered
+ * backends.
+ * - The \file res_corosync module publishes \ref ast_event representations of
+ * information to other Asterisk instances in a cluster.
+ * - Security event represent their event types and data using this system.
+ * - Theoretically, any \ref stasis message can use this system to pass
+ * information around in a binary format.
*
* Events have an associated event type, as well as information elements. The
* information elements are the meta data that go along with each event. For
* example, in the case of message waiting indication, the event type is MWI,
* and each MWI event contains at least three information elements: the
* mailbox, the number of new messages, and the number of old messages.
- *
- * Subscriptions to events consist of an event type and information elements,
- * as well. Subscriptions can be to all events, or a certain subset of events.
- * If an event type is provided, only events of that type will be sent to this
- * subscriber. Furthermore, if information elements are supplied with the
- * subscription, only events that contain the specified information elements
- * with specified values will be sent to the subscriber. For example, when a
- * SIP phone subscribes to MWI for mailbox 1234, then chan_sip can subscribe
- * to internal Asterisk MWI events with the MAILBOX information element with
- * a value of "1234".
- *
- * Another key feature of this event system is the ability to cache events.
- * It is useful for some types of events to be able to remember the last known
- * value. These are usually events that indicate some kind of state change.
- * In the example of MWI, app_voicemail can instruct the event core to cache
- * these events based on the mailbox. So, the last known MWI state of each
- * mailbox will be cached, and other modules can retrieve this information
- * on demand without having to poll the mailbox directly.
*/
#ifndef AST_EVENT_H
@@ -109,9 +103,6 @@ struct ast_event *ast_event_new(enum ast_event_type event_type, ...);
*
* \return Nothing
*
- * \note Events that have been queued should *not* be destroyed by the code that
- * created the event. It will be automatically destroyed after being
- * dispatched to the appropriate subscribers.
*/
void ast_event_destroy(struct ast_event *event);
@@ -150,6 +141,55 @@ int ast_event_append_ie_uint(struct ast_event **event, enum ast_event_ie_type ie
uint32_t data);
/*!
+ * \brief Append an information element that has a bitflags payload
+ *
+ * \param event the event that the IE will be appended to
+ * \param ie_type the type of IE to append
+ * \param bitflags the flags that are the payload of the IE
+ *
+ * \retval 0 success
+ * \retval -1 failure
+ * \since 1.8
+ *
+ * The pointer to the event will get updated with the new location for the event
+ * that now contains the appended information element. If the re-allocation of
+ * the memory for this event fails, it will be set to NULL.
+ */
+int ast_event_append_ie_bitflags(struct ast_event **event, enum ast_event_ie_type ie_type,
+ uint32_t bitflags);
+
+/*!
+ * \brief Append an information element that has a raw payload
+ *
+ * \param event the event that the IE will be appended to
+ * \param ie_type the type of IE to append
+ * \param data A pointer to the raw data for the payload of the IE
+ * \param data_len The amount of data to copy into the payload
+ *
+ * \retval 0 success
+ * \retval -1 failure
+ *
+ * The pointer to the event will get updated with the new location for the event
+ * that now contains the appended information element. If the re-allocation of
+ * the memory for this event fails, it will be set to NULL.
+ */
+int ast_event_append_ie_raw(struct ast_event **event, enum ast_event_ie_type ie_type,
+ const void *data, size_t data_len);
+
+/*!
+ * \brief Append the global EID IE
+ *
+ * \param event the event to append IE to
+ *
+ * \note For ast_event_new() that includes IEs, this is done automatically
+ * for you.
+ *
+ * \retval 0 success
+ * \retval -1 failure
+ */
+int ast_event_append_eid(struct ast_event **event);
+
+/*!
* \brief Get the value of an information element that has an integer payload
*
* \param event The event to get the IE from
@@ -173,6 +213,38 @@ uint32_t ast_event_get_ie_uint(const struct ast_event *event, enum ast_event_ie_
const char *ast_event_get_ie_str(const struct ast_event *event, enum ast_event_ie_type ie_type);
/*!
+ * \brief Get the value of an information element that has a raw payload
+ *
+ * \param event The event to get the IE from
+ * \param ie_type the type of information element to retrieve
+ *
+ * \return This returns the payload of the information element with the given type.
+ * If the information element isn't found, NULL will be returned.
+ */
+const void *ast_event_get_ie_raw(const struct ast_event *event, enum ast_event_ie_type ie_type);
+
+/*!
+ * \brief Get the length of the raw payload for a particular IE
+ *
+ * \param event The event to get the IE payload length from
+ * \param ie_type the type of information element to get the length of
+ *
+ * \return If an IE of type ie_type is found, its payload length is returned.
+ * Otherwise, 0 is returned.
+ */
+uint16_t ast_event_get_ie_raw_payload_len(const struct ast_event *event, enum ast_event_ie_type ie_type);
+
+/*!
+ * \brief Get the string representation of the type of the given event
+ *
+ * \arg event the event to get the type of
+ *
+ * \return the string representation of the event type of the provided event
+ * \since 1.6.1
+ */
+const char *ast_event_get_type_name(const struct ast_event *event);
+
+/*!
* \brief Get the string representation of an information element type
*
* \param ie_type the information element type to get the string representation of
@@ -273,6 +345,13 @@ uint32_t ast_event_iterator_get_ie_uint(struct ast_event_iterator *iterator);
*/
const char *ast_event_iterator_get_ie_str(struct ast_event_iterator *iterator);
+/*!
+ * \brief Get the minimum length of an ast_event.
+ *
+ * \return minimum amount of memory that will be consumed by any ast_event.
+ */
+size_t ast_event_minimum_length(void);
+
#if defined(__cplusplus) || defined(c_plusplus)
}
#endif
diff --git a/include/asterisk/event_defs.h b/include/asterisk/event_defs.h
index f8f98ace0..80a8d7dda 100644
--- a/include/asterisk/event_defs.h
+++ b/include/asterisk/event_defs.h
@@ -25,23 +25,41 @@
#ifndef AST_EVENT_DEFS_H
#define AST_EVENT_DEFS_H
-/*! \brief Event types
- * \note These values no longer go over the wire and can change when items are removed. */
enum ast_event_type {
/*! Reserved to provide the ability to subscribe to all events. A specific
* event should never have a payload of 0. */
AST_EVENT_ALL = 0x00,
/*! This event type is reserved for use by third-party modules to create
- * custom events without having to modify this file.
+ * custom events without having to modify this file.
* \note There are no "custom" IE types, because IEs only have to be
* unique to the event itself, not necessarily across all events. */
AST_EVENT_CUSTOM = 0x01,
+ /*! Voicemail message waiting indication */
+ AST_EVENT_MWI = 0x02,
/*! Someone has subscribed to events */
- AST_EVENT_SUB = 0x02,
+ AST_EVENT_SUB = 0x03,
+ /*! Someone has unsubscribed from events */
+ AST_EVENT_UNSUB = 0x04,
+ /*! The aggregate state of a device across all servers configured to be
+ * a part of a device state cluster has changed. */
+ AST_EVENT_DEVICE_STATE = 0x05,
+ /*! The state of a device has changed on _one_ server. This should not be used
+ * directly, in general. Use AST_EVENT_DEVICE_STATE instead. */
+ AST_EVENT_DEVICE_STATE_CHANGE = 0x06,
/*! Channel Event Logging events */
- AST_EVENT_CEL = 0x03,
+ AST_EVENT_CEL = 0x07,
+ /*! A report of a security related event (see security_events.h) */
+ AST_EVENT_SECURITY = 0x08,
+ /*! Used by res_stun_monitor to alert listeners to an exernal network address change. */
+ AST_EVENT_NETWORK_CHANGE = 0x09,
+ /*! The presence state for a presence provider */
+ AST_EVENT_PRESENCE_STATE = 0x0a,
+ /*! Used to alert listeners when a named ACL has changed. */
+ AST_EVENT_ACL_CHANGE = 0x0b,
+ /*! Send out a ping for debugging distributed events */
+ AST_EVENT_PING = 0x0c,
/*! Number of event types. This should be the last event type + 1 */
- AST_EVENT_TOTAL = 0x04,
+ AST_EVENT_TOTAL = 0x0d,
};
/*! \brief Event Information Element types */
@@ -49,199 +67,243 @@ enum ast_event_ie_type {
/*! Used to terminate the arguments to event functions */
AST_EVENT_IE_END = -1,
- /*!
+ /*!
+ * \brief Number of new messages
+ * Used by: AST_EVENT_MWI
+ * Payload type: UINT
+ */
+ AST_EVENT_IE_NEWMSGS = 0x0001,
+ /*!
+ * \brief Number of
+ * Used by: AST_EVENT_MWI
+ * Payload type: UINT
+ */
+ AST_EVENT_IE_OLDMSGS = 0x0002,
+ /*!
+ * \brief Mailbox name \verbatim (mailbox[@context]) \endverbatim
+ * Used by: AST_EVENT_MWI
+ * Payload type: STR
+ */
+ AST_EVENT_IE_MAILBOX = 0x0003,
+ /*!
* \brief Unique ID
* Used by: AST_EVENT_SUB, AST_EVENT_UNSUB
* Payload type: UINT
*/
- AST_EVENT_IE_UNIQUEID = 0x0001,
- /*!
- * \brief Event type
+ AST_EVENT_IE_UNIQUEID = 0x0004,
+ /*!
+ * \brief Event type
* Used by: AST_EVENT_SUB, AST_EVENT_UNSUB
* Payload type: UINT
*/
- AST_EVENT_IE_EVENTTYPE = 0x0002,
+ AST_EVENT_IE_EVENTTYPE = 0x0005,
/*!
* \brief Hint that someone cares that an IE exists
* Used by: AST_EVENT_SUB
* Payload type: UINT (ast_event_ie_type)
*/
- AST_EVENT_IE_EXISTS = 0x0003,
- /*!
- * \brief Context IE
- * Used by AST_EVENT_MWI
- * Payload type: str
- */
- AST_EVENT_IE_CONTEXT = 0x0004,
- /*!
+ AST_EVENT_IE_EXISTS = 0x0006,
+ /*!
+ * \brief Device Name
+ * Used by AST_EVENT_DEVICE_STATE_CHANGE
+ * Payload type: STR
+ */
+ AST_EVENT_IE_DEVICE = 0x0007,
+ /*!
+ * \brief Generic State IE
+ * Used by AST_EVENT_DEVICE_STATE_CHANGE
+ * Payload type: UINT
+ * The actual state values depend on the event which
+ * this IE is a part of.
+ */
+ AST_EVENT_IE_STATE = 0x0008,
+ /*!
+ * \brief Context IE
+ * Used by AST_EVENT_MWI
+ * Payload type: str
+ */
+ AST_EVENT_IE_CONTEXT = 0x0009,
+ /*!
* \brief Channel Event Type
* Used by: AST_EVENT_CEL
* Payload type: UINT
*/
- AST_EVENT_IE_CEL_EVENT_TYPE = 0x0005,
- /*!
+ AST_EVENT_IE_CEL_EVENT_TYPE = 0x000a,
+ /*!
* \brief Channel Event Time (seconds)
* Used by: AST_EVENT_CEL
* Payload type: UINT
*/
- AST_EVENT_IE_CEL_EVENT_TIME = 0x0006,
- /*!
+ AST_EVENT_IE_CEL_EVENT_TIME = 0x000b,
+ /*!
* \brief Channel Event Time (micro-seconds)
* Used by: AST_EVENT_CEL
* Payload type: UINT
*/
- AST_EVENT_IE_CEL_EVENT_TIME_USEC = 0x0007,
- /*!
+ AST_EVENT_IE_CEL_EVENT_TIME_USEC = 0x000c,
+ /*!
* \brief Channel Event User Event Name
* Used by: AST_EVENT_CEL
* Payload type: STR
*/
- AST_EVENT_IE_CEL_USEREVENT_NAME = 0x0008,
- /*!
+ AST_EVENT_IE_CEL_USEREVENT_NAME = 0x000d,
+ /*!
* \brief Channel Event CID name
* Used by: AST_EVENT_CEL
* Payload type: STR
*/
- AST_EVENT_IE_CEL_CIDNAME = 0x0009,
- /*!
+ AST_EVENT_IE_CEL_CIDNAME = 0x000e,
+ /*!
* \brief Channel Event CID num
* Used by: AST_EVENT_CEL
* Payload type: STR
*/
- AST_EVENT_IE_CEL_CIDNUM = 0x000a,
- /*!
+ AST_EVENT_IE_CEL_CIDNUM = 0x000f,
+ /*!
* \brief Channel Event extension name
* Used by: AST_EVENT_CEL
* Payload type: STR
*/
- AST_EVENT_IE_CEL_EXTEN = 0x000b,
- /*!
+ AST_EVENT_IE_CEL_EXTEN = 0x0010,
+ /*!
* \brief Channel Event context name
* Used by: AST_EVENT_CEL
* Payload type: STR
*/
- AST_EVENT_IE_CEL_CONTEXT = 0x000c,
- /*!
+ AST_EVENT_IE_CEL_CONTEXT = 0x0011,
+ /*!
* \brief Channel Event channel name
* Used by: AST_EVENT_CEL
* Payload type: STR
*/
- AST_EVENT_IE_CEL_CHANNAME = 0x000d,
- /*!
+ AST_EVENT_IE_CEL_CHANNAME = 0x0012,
+ /*!
* \brief Channel Event app name
* Used by: AST_EVENT_CEL
* Payload type: STR
*/
- AST_EVENT_IE_CEL_APPNAME = 0x000e,
- /*!
+ AST_EVENT_IE_CEL_APPNAME = 0x0013,
+ /*!
* \brief Channel Event app args/data
* Used by: AST_EVENT_CEL
* Payload type: STR
*/
- AST_EVENT_IE_CEL_APPDATA = 0x000f,
- /*!
+ AST_EVENT_IE_CEL_APPDATA = 0x0014,
+ /*!
* \brief Channel Event AMA flags
* Used by: AST_EVENT_CEL
* Payload type: UINT
*/
- AST_EVENT_IE_CEL_AMAFLAGS = 0x0010,
- /*!
+ AST_EVENT_IE_CEL_AMAFLAGS = 0x0015,
+ /*!
* \brief Channel Event AccountCode
* Used by: AST_EVENT_CEL
* Payload type: STR
*/
- AST_EVENT_IE_CEL_ACCTCODE = 0x0011,
- /*!
+ AST_EVENT_IE_CEL_ACCTCODE = 0x0016,
+ /*!
* \brief Channel Event UniqueID
* Used by: AST_EVENT_CEL
* Payload type: STR
*/
- AST_EVENT_IE_CEL_UNIQUEID = 0x0012,
- /*!
+ AST_EVENT_IE_CEL_UNIQUEID = 0x0017,
+ /*!
* \brief Channel Event Userfield
* Used by: AST_EVENT_CEL
* Payload type: STR
*/
- AST_EVENT_IE_CEL_USERFIELD = 0x0013,
- /*!
+ AST_EVENT_IE_CEL_USERFIELD = 0x0018,
+ /*!
* \brief Channel Event CID ANI field
* Used by: AST_EVENT_CEL
* Payload type: STR
*/
- AST_EVENT_IE_CEL_CIDANI = 0x0014,
- /*!
+ AST_EVENT_IE_CEL_CIDANI = 0x0019,
+ /*!
* \brief Channel Event CID RDNIS field
* Used by: AST_EVENT_CEL
* Payload type: STR
*/
- AST_EVENT_IE_CEL_CIDRDNIS = 0x0015,
- /*!
+ AST_EVENT_IE_CEL_CIDRDNIS = 0x001a,
+ /*!
* \brief Channel Event CID dnid
* Used by: AST_EVENT_CEL
* Payload type: STR
*/
- AST_EVENT_IE_CEL_CIDDNID = 0x0016,
- /*!
+ AST_EVENT_IE_CEL_CIDDNID = 0x001b,
+ /*!
* \brief Channel Event Peer -- for Things involving multiple channels, like BRIDGE
* Used by: AST_EVENT_CEL
* Payload type: STR
*/
- AST_EVENT_IE_CEL_PEER = 0x0017,
- /*!
+ AST_EVENT_IE_CEL_PEER = 0x001c,
+ /*!
* \brief Channel Event LinkedID
* Used by: AST_EVENT_CEL
* Payload type: STR
*/
- AST_EVENT_IE_CEL_LINKEDID = 0x0018,
- /*!
+ AST_EVENT_IE_CEL_LINKEDID = 0x001d,
+ /*!
* \brief Channel Event peeraccount
* Used by: AST_EVENT_CEL
* Payload type: STR
*/
- AST_EVENT_IE_CEL_PEERACCT = 0x0019,
- /*!
+ AST_EVENT_IE_CEL_PEERACCT = 0x001e,
+ /*!
* \brief Channel Event extra data
* Used by: AST_EVENT_CEL
* Payload type: STR
*/
- AST_EVENT_IE_CEL_EXTRA = 0x001a,
+ AST_EVENT_IE_CEL_EXTRA = 0x001f,
/*!
* \brief Description
* Used by: AST_EVENT_SUB, AST_EVENT_UNSUB
* Payload type: STR
*/
- AST_EVENT_IE_DESCRIPTION = 0x001b,
+ AST_EVENT_IE_DESCRIPTION = 0x0020,
/*!
* \brief Entity ID
* Used by All events
* Payload type: RAW
* This IE indicates which server the event originated from
*/
- AST_EVENT_IE_EVENT_VERSION = 0x001c,
- AST_EVENT_IE_SERVICE = 0x001d,
- AST_EVENT_IE_MODULE = 0x001e,
- AST_EVENT_IE_ACCOUNT_ID = 0x001f,
- AST_EVENT_IE_SESSION_ID = 0x0020,
- AST_EVENT_IE_SESSION_TV = 0x0021,
- AST_EVENT_IE_ACL_NAME = 0x0022,
- AST_EVENT_IE_LOCAL_ADDR = 0x0023,
- AST_EVENT_IE_REMOTE_ADDR = 0x0024,
- AST_EVENT_IE_EVENT_TV = 0x0025,
- AST_EVENT_IE_REQUEST_TYPE = 0x0026,
- AST_EVENT_IE_REQUEST_PARAMS = 0x0027,
- AST_EVENT_IE_AUTH_METHOD = 0x0028,
- AST_EVENT_IE_SEVERITY = 0x0029,
- AST_EVENT_IE_EXPECTED_ADDR = 0x002a,
- AST_EVENT_IE_CHALLENGE = 0x002b,
- AST_EVENT_IE_RESPONSE = 0x002c,
- AST_EVENT_IE_EXPECTED_RESPONSE = 0x002e,
- AST_EVENT_IE_RECEIVED_CHALLENGE = 0x002f,
- AST_EVENT_IE_RECEIVED_HASH = 0x0030,
- AST_EVENT_IE_USING_PASSWORD = 0x0031,
- AST_EVENT_IE_ATTEMPTED_TRANSPORT = 0x0032,
+ AST_EVENT_IE_EID = 0x0021,
+ AST_EVENT_IE_SECURITY_EVENT = 0x0022,
+ AST_EVENT_IE_EVENT_VERSION = 0x0023,
+ AST_EVENT_IE_SERVICE = 0x0024,
+ AST_EVENT_IE_MODULE = 0x0025,
+ AST_EVENT_IE_ACCOUNT_ID = 0x0026,
+ AST_EVENT_IE_SESSION_ID = 0x0027,
+ AST_EVENT_IE_SESSION_TV = 0x0028,
+ AST_EVENT_IE_ACL_NAME = 0x0029,
+ AST_EVENT_IE_LOCAL_ADDR = 0x002a,
+ AST_EVENT_IE_REMOTE_ADDR = 0x002b,
+ AST_EVENT_IE_EVENT_TV = 0x002c,
+ AST_EVENT_IE_REQUEST_TYPE = 0x002d,
+ AST_EVENT_IE_REQUEST_PARAMS = 0x002e,
+ AST_EVENT_IE_AUTH_METHOD = 0x002f,
+ AST_EVENT_IE_SEVERITY = 0x0030,
+ AST_EVENT_IE_EXPECTED_ADDR = 0x0031,
+ AST_EVENT_IE_CHALLENGE = 0x0032,
+ AST_EVENT_IE_RESPONSE = 0x0033,
+ AST_EVENT_IE_EXPECTED_RESPONSE = 0x0034,
+ AST_EVENT_IE_RECEIVED_CHALLENGE = 0x0035,
+ AST_EVENT_IE_RECEIVED_HASH = 0x0036,
+ AST_EVENT_IE_USING_PASSWORD = 0x0037,
+ AST_EVENT_IE_ATTEMPTED_TRANSPORT = 0x0038,
+ AST_EVENT_IE_PRESENCE_PROVIDER = 0x0039,
+ AST_EVENT_IE_PRESENCE_STATE = 0x003a,
+ AST_EVENT_IE_PRESENCE_SUBTYPE = 0x003b,
+ AST_EVENT_IE_PRESENCE_MESSAGE = 0x003c,
+ /*!
+ * \brief Event non-cachability flag
+ * Used by: All events
+ * Payload type: UINT
+ */
+ AST_EVENT_IE_CACHABLE = 0x003d,
/*! \brief Must be the last IE value +1 */
- AST_EVENT_IE_TOTAL = 0x0033,
+ AST_EVENT_IE_TOTAL = 0x003e,
};
/*!
@@ -249,12 +311,16 @@ enum ast_event_ie_type {
*/
enum ast_event_ie_pltype {
AST_EVENT_IE_PLTYPE_UNKNOWN = -1,
+ /*! Just check if it exists, not the value */
+ AST_EVENT_IE_PLTYPE_EXISTS,
/*! Unsigned Integer (Can be used for signed, too ...) */
AST_EVENT_IE_PLTYPE_UINT,
/*! String */
AST_EVENT_IE_PLTYPE_STR,
/*! Raw data, compared with memcmp */
AST_EVENT_IE_PLTYPE_RAW,
+ /*! Bit flags (unsigned integer, compared using boolean logic) */
+ AST_EVENT_IE_PLTYPE_BITFLAGS,
};
/*!
diff --git a/include/asterisk/stasis.h b/include/asterisk/stasis.h
index 20870e6d6..f5b4a60d9 100644
--- a/include/asterisk/stasis.h
+++ b/include/asterisk/stasis.h
@@ -171,6 +171,7 @@
#include "asterisk/json.h"
#include "asterisk/manager.h"
#include "asterisk/utils.h"
+#include "asterisk/event.h"
/*! @{ */
@@ -255,6 +256,21 @@ struct stasis_message_vtable {
*/
struct ast_manager_event_blob *(*to_ami)(
struct stasis_message *message);
+
+ /*!
+ * \since 12.3.0
+ * \brief Build the \ref ast_event representation of the message.
+ *
+ * May be \c NULL, or may return \c NULL, to indicate no representation.
+ * The returned object should be free'd.
+ *
+ * \param message Message to convert to an \ref ast_event.
+ * \return Newly allocated \ref ast_event.
+ * \return \c NULL on error.
+ * \return \c NULL if AMI format is not supported.
+ */
+ struct ast_event *(*to_event)(
+ struct stasis_message *message);
};
/*!
@@ -389,6 +405,19 @@ struct ast_json *stasis_message_to_json(struct stasis_message *message, struct s
struct ast_manager_event_blob *stasis_message_to_ami(
struct stasis_message *message);
+/*!
+ * \brief Build the \ref AstGenericEvents representation of the message.
+ *
+ * May return \c NULL, to indicate no representation. The returned object should
+ * be disposed of via \ref ast_event_destroy.
+ *
+ * \param message Message to convert to AMI.
+ * \return \c NULL on error.
+ * \return \c NULL if AMI format is not supported.
+ */
+struct ast_event *stasis_message_to_event(
+ struct stasis_message *message);
+
/*! @} */
/*! @{ */
@@ -1020,6 +1049,7 @@ void stasis_log_bad_type_access(const char *name);
* STASIS_MESSAGE_TYPE_DEFN(ast_foo_type,
* .to_ami = foo_to_ami,
* .to_json = foo_to_json,
+ * .to_event = foo_to_event,
* );
* \endcode
*
@@ -1046,6 +1076,7 @@ void stasis_log_bad_type_access(const char *name);
* STASIS_MESSAGE_TYPE_DEFN_LOCAL(ast_foo_type,
* .to_ami = foo_to_ami,
* .to_json = foo_to_json,
+ * .to_event = foo_to_event,
* );
* \endcode
*
diff --git a/main/app.c b/main/app.c
index a5c4c7af5..3c2f33c51 100644
--- a/main/app.c
+++ b/main/app.c
@@ -96,10 +96,43 @@ static struct stasis_topic *queue_topic_all;
static struct stasis_topic_pool *queue_topic_pool;
/* @} */
+/*! \brief Convert a MWI \ref stasis_message to a \ref ast_event */
+static struct ast_event *mwi_to_event(struct stasis_message *message)
+{
+ struct ast_event *event;
+ struct ast_mwi_state *mwi_state;
+ char *mailbox;
+ char *context;
+
+ if (!message) {
+ return NULL;
+ }
+
+ mwi_state = stasis_message_data(message);
+
+ /* Strip off @context */
+ context = mailbox = ast_strdupa(mwi_state->uniqueid);
+ strsep(&context, "@");
+ if (ast_strlen_zero(context)) {
+ context = "default";
+ }
+
+ event = ast_event_new(AST_EVENT_MWI,
+ AST_EVENT_IE_MAILBOX, AST_EVENT_IE_PLTYPE_STR, mailbox,
+ AST_EVENT_IE_CONTEXT, AST_EVENT_IE_PLTYPE_STR, context,
+ AST_EVENT_IE_NEWMSGS, AST_EVENT_IE_PLTYPE_UINT, mwi_state->new_msgs,
+ AST_EVENT_IE_OLDMSGS, AST_EVENT_IE_PLTYPE_UINT, mwi_state->old_msgs,
+ AST_EVENT_IE_EID, AST_EVENT_IE_PLTYPE_RAW, &mwi_state->eid, sizeof(mwi_state->eid),
+ AST_EVENT_IE_END);
+
+ return event;
+}
+
/*
* @{ \brief Define \ref stasis message types for MWI
*/
-STASIS_MESSAGE_TYPE_DEFN(ast_mwi_state_type);
+STASIS_MESSAGE_TYPE_DEFN(ast_mwi_state_type,
+ .to_event = mwi_to_event, );
STASIS_MESSAGE_TYPE_DEFN(ast_mwi_vm_app_type);
/* @} */
diff --git a/main/devicestate.c b/main/devicestate.c
index 1199e68ab..3580b1af7 100644
--- a/main/devicestate.c
+++ b/main/devicestate.c
@@ -224,9 +224,12 @@ static struct stasis_caching_topic *device_state_topic_cached;
static struct stasis_topic_pool *device_state_topic_pool;
static struct ast_manager_event_blob *devstate_to_ami(struct stasis_message *msg);
+static struct ast_event *devstate_to_event(struct stasis_message *msg);
+
STASIS_MESSAGE_TYPE_DEFN(ast_device_state_message_type,
.to_ami = devstate_to_ami,
+ .to_event = devstate_to_event,
);
/* Forward declarations */
@@ -925,3 +928,33 @@ static struct ast_manager_event_blob *devstate_to_ami(struct stasis_message *msg
"State: %s\r\n",
dev_state->device, ast_devstate_str(dev_state->state));
}
+
+/*! \brief Convert a \ref stasis_message to a \ref ast_event */
+static struct ast_event *devstate_to_event(struct stasis_message *message)
+{
+ struct ast_event *event;
+ struct ast_device_state_message *device_state;
+
+ if (!message) {
+ return NULL;
+ }
+
+ device_state = stasis_message_data(message);
+
+ if (device_state->eid) {
+ event = ast_event_new(AST_EVENT_DEVICE_STATE_CHANGE,
+ AST_EVENT_IE_DEVICE, AST_EVENT_IE_PLTYPE_STR, device_state->device,
+ AST_EVENT_IE_STATE, AST_EVENT_IE_PLTYPE_UINT, device_state->state,
+ AST_EVENT_IE_CACHABLE, AST_EVENT_IE_PLTYPE_UINT, device_state->cachable,
+ AST_EVENT_IE_EID, AST_EVENT_IE_PLTYPE_RAW, device_state->eid, sizeof(*device_state->eid),
+ AST_EVENT_IE_END);
+ } else {
+ event = ast_event_new(AST_EVENT_DEVICE_STATE,
+ AST_EVENT_IE_DEVICE, AST_EVENT_IE_PLTYPE_STR, device_state->device,
+ AST_EVENT_IE_STATE, AST_EVENT_IE_PLTYPE_UINT, device_state->state,
+ AST_EVENT_IE_CACHABLE, AST_EVENT_IE_PLTYPE_UINT, device_state->cachable,
+ AST_EVENT_IE_END);
+ }
+
+ return event;
+}
diff --git a/main/event.c b/main/event.c
index 990f62161..876e53b88 100644
--- a/main/event.c
+++ b/main/event.c
@@ -44,10 +44,6 @@ ASTERISK_FILE_VERSION(__FILE__, "$Revision$")
#include "asterisk/astobj2.h"
#include "asterisk/cli.h"
-static int event_append_ie_raw(struct ast_event **event, enum ast_event_ie_type ie_type,
- const void *data, size_t data_len);
-static const void *event_get_ie_raw(const struct ast_event *event, enum ast_event_ie_type ie_type);
-
/*!
* \brief An event information element
*
@@ -109,19 +105,42 @@ struct ast_event_ie_val {
size_t raw_datalen;
};
-struct ie_map {
- enum ast_event_ie_pltype ie_pltype;
- const char *name;
+/*!
+ * \brief Event Names
+ */
+static const char * const event_names[AST_EVENT_TOTAL] = {
+ [AST_EVENT_ALL] = "All",
+ [AST_EVENT_CUSTOM] = "Custom",
+ [AST_EVENT_MWI] = "MWI",
+ [AST_EVENT_SUB] = "Subscription",
+ [AST_EVENT_UNSUB] = "Unsubscription",
+ [AST_EVENT_DEVICE_STATE] = "DeviceState",
+ [AST_EVENT_DEVICE_STATE_CHANGE] = "DeviceStateChange",
+ [AST_EVENT_CEL] = "CEL",
+ [AST_EVENT_SECURITY] = "Security",
+ [AST_EVENT_NETWORK_CHANGE] = "NetworkChange",
+ [AST_EVENT_PRESENCE_STATE] = "PresenceState",
+ [AST_EVENT_ACL_CHANGE] = "ACLChange",
+ [AST_EVENT_PING] = "Ping",
};
/*!
* \brief IE payload types and names
*/
-static const struct ie_map ie_maps[AST_EVENT_IE_TOTAL] = {
+static const struct ie_map {
+ enum ast_event_ie_pltype ie_pltype;
+ const char *name;
+} ie_maps[AST_EVENT_IE_TOTAL] = {
+ [AST_EVENT_IE_NEWMSGS] = { AST_EVENT_IE_PLTYPE_UINT, "NewMessages" },
+ [AST_EVENT_IE_OLDMSGS] = { AST_EVENT_IE_PLTYPE_UINT, "OldMessages" },
+ [AST_EVENT_IE_MAILBOX] = { AST_EVENT_IE_PLTYPE_STR, "Mailbox" },
[AST_EVENT_IE_UNIQUEID] = { AST_EVENT_IE_PLTYPE_UINT, "UniqueID" },
[AST_EVENT_IE_EVENTTYPE] = { AST_EVENT_IE_PLTYPE_UINT, "EventType" },
[AST_EVENT_IE_EXISTS] = { AST_EVENT_IE_PLTYPE_UINT, "Exists" },
+ [AST_EVENT_IE_DEVICE] = { AST_EVENT_IE_PLTYPE_STR, "Device" },
+ [AST_EVENT_IE_STATE] = { AST_EVENT_IE_PLTYPE_UINT, "State" },
[AST_EVENT_IE_CONTEXT] = { AST_EVENT_IE_PLTYPE_STR, "Context" },
+ [AST_EVENT_IE_EID] = { AST_EVENT_IE_PLTYPE_RAW, "EntityID" },
[AST_EVENT_IE_CEL_EVENT_TYPE] = { AST_EVENT_IE_PLTYPE_UINT, "CELEventType" },
[AST_EVENT_IE_CEL_EVENT_TIME] = { AST_EVENT_IE_PLTYPE_UINT, "CELEventTime" },
[AST_EVENT_IE_CEL_EVENT_TIME_USEC] = { AST_EVENT_IE_PLTYPE_UINT, "CELEventTimeUSec" },
@@ -144,6 +163,7 @@ static const struct ie_map ie_maps[AST_EVENT_IE_TOTAL] = {
[AST_EVENT_IE_CEL_LINKEDID] = { AST_EVENT_IE_PLTYPE_STR, "CELLinkedID" },
[AST_EVENT_IE_CEL_PEERACCT] = { AST_EVENT_IE_PLTYPE_STR, "CELPeerAcct" },
[AST_EVENT_IE_CEL_EXTRA] = { AST_EVENT_IE_PLTYPE_STR, "CELExtra" },
+ [AST_EVENT_IE_SECURITY_EVENT] = { AST_EVENT_IE_PLTYPE_STR, "SecurityEvent" },
[AST_EVENT_IE_EVENT_VERSION] = { AST_EVENT_IE_PLTYPE_UINT, "EventVersion" },
[AST_EVENT_IE_SERVICE] = { AST_EVENT_IE_PLTYPE_STR, "Service" },
[AST_EVENT_IE_MODULE] = { AST_EVENT_IE_PLTYPE_STR, "Module" },
@@ -166,8 +186,27 @@ static const struct ie_map ie_maps[AST_EVENT_IE_TOTAL] = {
[AST_EVENT_IE_RECEIVED_HASH] = { AST_EVENT_IE_PLTYPE_STR, "ReceivedHash" },
[AST_EVENT_IE_USING_PASSWORD] = { AST_EVENT_IE_PLTYPE_UINT, "UsingPassword" },
[AST_EVENT_IE_ATTEMPTED_TRANSPORT] = { AST_EVENT_IE_PLTYPE_STR, "AttemptedTransport" },
+ [AST_EVENT_IE_CACHABLE] = { AST_EVENT_IE_PLTYPE_UINT, "Cachable" },
+ [AST_EVENT_IE_PRESENCE_PROVIDER] = { AST_EVENT_IE_PLTYPE_STR, "PresenceProvider" },
+ [AST_EVENT_IE_PRESENCE_STATE] = { AST_EVENT_IE_PLTYPE_UINT, "PresenceState" },
+ [AST_EVENT_IE_PRESENCE_SUBTYPE] = { AST_EVENT_IE_PLTYPE_STR, "PresenceSubtype" },
+ [AST_EVENT_IE_PRESENCE_MESSAGE] = { AST_EVENT_IE_PLTYPE_STR, "PresenceMessage" },
};
+const char *ast_event_get_type_name(const struct ast_event *event)
+{
+ enum ast_event_type type;
+
+ type = ast_event_get_type(event);
+
+ if (type < 0 || type >= ARRAY_LEN(event_names)) {
+ ast_log(LOG_ERROR, "Invalid event type - '%d'\n", type);
+ return "";
+ }
+
+ return event_names[type];
+}
+
const char *ast_event_get_ie_type_name(enum ast_event_ie_type ie_type)
{
if (ie_type <= 0 || ie_type >= ARRAY_LEN(ie_maps)) {
@@ -257,7 +296,7 @@ uint32_t ast_event_get_ie_uint(const struct ast_event *event, enum ast_event_ie_
{
const uint32_t *ie_val;
- ie_val = event_get_ie_raw(event, ie_type);
+ ie_val = ast_event_get_ie_raw(event, ie_type);
return ie_val ? ntohl(get_unaligned_uint32(ie_val)) : 0;
}
@@ -266,12 +305,12 @@ const char *ast_event_get_ie_str(const struct ast_event *event, enum ast_event_i
{
const struct ast_event_ie_str_payload *str_payload;
- str_payload = event_get_ie_raw(event, ie_type);
+ str_payload = ast_event_get_ie_raw(event, ie_type);
return str_payload ? str_payload->str : NULL;
}
-static const void *event_get_ie_raw(const struct ast_event *event, enum ast_event_ie_type ie_type)
+const void *ast_event_get_ie_raw(const struct ast_event *event, enum ast_event_ie_type ie_type)
{
struct ast_event_iterator iterator;
int res;
@@ -285,6 +324,26 @@ static const void *event_get_ie_raw(const struct ast_event *event, enum ast_even
return NULL;
}
+static uint16_t event_iterator_get_ie_raw_payload_len(struct ast_event_iterator *iterator)
+{
+ return ntohs(iterator->ie->ie_payload_len);
+}
+
+uint16_t ast_event_get_ie_raw_payload_len(const struct ast_event *event, enum ast_event_ie_type ie_type)
+{
+ struct ast_event_iterator iterator;
+ int res;
+
+ for (res = ast_event_iterator_init(&iterator, event); !res; res = ast_event_iterator_next(&iterator)) {
+ if (ast_event_iterator_get_ie_type(&iterator) == ie_type) {
+ return event_iterator_get_ie_raw_payload_len(&iterator);
+ }
+ }
+
+ return 0;
+}
+
+
int ast_event_append_ie_str(struct ast_event **event, enum ast_event_ie_type ie_type,
const char *str)
{
@@ -297,17 +356,24 @@ int ast_event_append_ie_str(struct ast_event **event, enum ast_event_ie_type ie_
strcpy(str_payload->str, str);
str_payload->hash = ast_str_hash(str);
- return event_append_ie_raw(event, ie_type, str_payload, payload_len);
+ return ast_event_append_ie_raw(event, ie_type, str_payload, payload_len);
}
int ast_event_append_ie_uint(struct ast_event **event, enum ast_event_ie_type ie_type,
uint32_t data)
{
data = htonl(data);
- return event_append_ie_raw(event, ie_type, &data, sizeof(data));
+ return ast_event_append_ie_raw(event, ie_type, &data, sizeof(data));
+}
+
+int ast_event_append_ie_bitflags(struct ast_event **event, enum ast_event_ie_type ie_type,
+ uint32_t flags)
+{
+ flags = htonl(flags);
+ return ast_event_append_ie_raw(event, ie_type, &flags, sizeof(flags));
}
-static int event_append_ie_raw(struct ast_event **event, enum ast_event_ie_type ie_type,
+int ast_event_append_ie_raw(struct ast_event **event, enum ast_event_ie_type ie_type,
const void *data, size_t data_len)
{
struct ast_event_ie *ie;
@@ -361,11 +427,16 @@ struct ast_event *ast_event_new(enum ast_event_type type, ...)
memset(ie_value, 0, sizeof(*ie_value));
ie_value->ie_type = ie_type;
ie_value->ie_pltype = va_arg(ap, enum ast_event_ie_pltype);
+
switch (ie_value->ie_pltype) {
case AST_EVENT_IE_PLTYPE_UINT:
ie_value->payload.uint = va_arg(ap, uint32_t);
insert = 1;
break;
+ case AST_EVENT_IE_PLTYPE_BITFLAGS:
+ ie_value->payload.uint = va_arg(ap, uint32_t);
+ insert = 1;
+ break;
case AST_EVENT_IE_PLTYPE_STR:
ie_value->payload.str = va_arg(ap, const char *);
insert = 1;
@@ -381,6 +452,7 @@ struct ast_event *ast_event_new(enum ast_event_type type, ...)
break;
}
case AST_EVENT_IE_PLTYPE_UNKNOWN:
+ case AST_EVENT_IE_PLTYPE_EXISTS:
break;
}
@@ -407,10 +479,14 @@ struct ast_event *ast_event_new(enum ast_event_type type, ...)
case AST_EVENT_IE_PLTYPE_UINT:
ast_event_append_ie_uint(&event, ie_val->ie_type, ie_val->payload.uint);
break;
+ case AST_EVENT_IE_PLTYPE_BITFLAGS:
+ ast_event_append_ie_bitflags(&event, ie_val->ie_type, ie_val->payload.uint);
+ break;
case AST_EVENT_IE_PLTYPE_RAW:
- event_append_ie_raw(&event, ie_val->ie_type,
+ ast_event_append_ie_raw(&event, ie_val->ie_type,
ie_val->payload.raw, ie_val->raw_datalen);
break;
+ case AST_EVENT_IE_PLTYPE_EXISTS:
case AST_EVENT_IE_PLTYPE_UNKNOWN:
break;
}
@@ -421,10 +497,27 @@ struct ast_event *ast_event_new(enum ast_event_type type, ...)
}
}
+ if (!ast_event_get_ie_raw(event, AST_EVENT_IE_EID)) {
+ /* If the event is originating on this server, add the server's
+ * entity ID to the event. */
+ ast_event_append_eid(&event);
+ }
+
return event;
}
+int ast_event_append_eid(struct ast_event **event)
+{
+ return ast_event_append_ie_raw(event, AST_EVENT_IE_EID,
+ &ast_eid_default, sizeof(ast_eid_default));
+}
+
void ast_event_destroy(struct ast_event *event)
{
ast_free(event);
}
+
+size_t ast_event_minimum_length(void)
+{
+ return sizeof(struct ast_event);
+}
diff --git a/main/stasis.c b/main/stasis.c
index 4d05f18e8..5eca791ef 100644
--- a/main/stasis.c
+++ b/main/stasis.c
@@ -686,15 +686,17 @@ struct stasis_forward *stasis_forward_cancel(struct stasis_forward *forward)
from = forward->from_topic;
to = forward->to_topic;
- topic_lock_both(to, from);
- AST_VECTOR_REMOVE_ELEM_UNORDERED(&to->upstream_topics, from,
- AST_VECTOR_ELEM_CLEANUP_NOOP);
+ if (from && to) {
+ topic_lock_both(to, from);
+ AST_VECTOR_REMOVE_ELEM_UNORDERED(&to->upstream_topics, from,
+ AST_VECTOR_ELEM_CLEANUP_NOOP);
- for (idx = 0; idx < AST_VECTOR_SIZE(&to->subscribers); ++idx) {
- topic_remove_subscription(from, AST_VECTOR_GET(&to->subscribers, idx));
+ for (idx = 0; idx < AST_VECTOR_SIZE(&to->subscribers); ++idx) {
+ topic_remove_subscription(from, AST_VECTOR_GET(&to->subscribers, idx));
+ }
+ ao2_unlock(from);
+ ao2_unlock(to);
}
- ao2_unlock(from);
- ao2_unlock(to);
ao2_cleanup(forward);
@@ -717,6 +719,11 @@ struct stasis_forward *stasis_forward_all(struct stasis_topic *from_topic,
return NULL;
}
+ /* Forwards to ourselves are implicit. */
+ if (to_topic == from_topic) {
+ return ao2_bump(forward);
+ }
+
forward->from_topic = ao2_bump(from_topic);
forward->to_topic = ao2_bump(to_topic);
diff --git a/main/stasis_message.c b/main/stasis_message.c
index 1db2ae97a..6132efc20 100644
--- a/main/stasis_message.c
+++ b/main/stasis_message.c
@@ -187,3 +187,8 @@ struct ast_json *stasis_message_to_json(
{
return INVOKE_VIRTUAL(to_json, msg, sanitize);
}
+
+struct ast_event *stasis_message_to_event(struct stasis_message *msg)
+{
+ return INVOKE_VIRTUAL(to_event, msg);
+} \ No newline at end of file
diff --git a/res/res_corosync.c b/res/res_corosync.c
index ce4165498..6c4a3d1e7 100644
--- a/res/res_corosync.c
+++ b/res/res_corosync.c
@@ -44,20 +44,125 @@ ASTERISK_FILE_VERSION(__FILE__, "$Revision$");
#include "asterisk/event.h"
#include "asterisk/cli.h"
#include "asterisk/devicestate.h"
+#include "asterisk/app.h"
+#include "asterisk/stasis.h"
+#include "asterisk/stasis_message_router.h"
AST_RWLOCK_DEFINE_STATIC(event_types_lock);
+static void publish_mwi_to_stasis(struct ast_event *event);
+static void publish_device_state_to_stasis(struct ast_event *event);
+
+/*! \brief The internal topic used for message forwarding and pings */
+static struct stasis_topic *corosync_aggregate_topic;
+
+/*! \brief Our \ref stasis message router */
+static struct stasis_message_router *stasis_router;
+
+/*! \brief Internal accessor for our topic */
+static struct stasis_topic *corosync_topic(void)
+{
+ return corosync_aggregate_topic;
+}
+
+/*! \brief A payload wrapper around a corosync ping event */
+struct corosync_ping_payload {
+ /*! The corosync ping event being passed over \ref stasis */
+ struct ast_event *event;
+};
+
+/*! \brief Destructor for the \ref corosync_ping_payload wrapper object */
+static void corosync_ping_payload_dtor(void *obj)
+{
+ struct corosync_ping_payload *payload = obj;
+
+ ast_free(payload->event);
+}
+
+/*! \brief Convert a Corosync PING to a \ref ast_event */
+static struct ast_event *corosync_ping_to_event(struct stasis_message *message)
+{
+ struct corosync_ping_payload *payload;
+ struct ast_event *event;
+ struct ast_eid *event_eid;
+
+ if (!message) {
+ return NULL;
+ }
+
+ payload = stasis_message_data(message);
+
+ if (!payload->event) {
+ return NULL;
+ }
+
+ event_eid = (struct ast_eid *)ast_event_get_ie_raw(payload->event, AST_EVENT_IE_EID);
+
+ event = ast_event_new(AST_EVENT_PING,
+ AST_EVENT_IE_EID, AST_EVENT_IE_PLTYPE_RAW, event_eid, sizeof(*event_eid),
+ AST_EVENT_IE_END);
+
+ return event;
+}
+
+STASIS_MESSAGE_TYPE_DEFN_LOCAL(corosync_ping_message_type,
+ .to_event = corosync_ping_to_event, );
+
+/*! \brief Publish a Corosync ping to \ref stasis */
+static void publish_corosync_ping_to_stasis(struct ast_event *event)
+{
+ struct corosync_ping_payload *payload;
+ struct stasis_message *message;
+
+ ast_assert(ast_event_get_type(event) == AST_EVENT_PING);
+ ast_assert(event != NULL);
+
+ payload = ao2_t_alloc(sizeof(*payload), corosync_ping_payload_dtor, "Create ping payload");
+ if (!payload) {
+ return;
+ }
+ payload->event = event;
+
+ message = stasis_message_create(corosync_ping_message_type(), payload);
+ if (!message) {
+ ao2_t_ref(payload, -1, "Destroy payload on off nominal");
+ return;
+ }
+
+ stasis_publish(corosync_topic(), message);
+
+ ao2_t_ref(payload, -1, "Hand ref to stasis");
+ ao2_t_ref(message, -1, "Hand ref to stasis");
+}
+
static struct {
const char *name;
- struct ast_event_sub *sub;
+ struct stasis_forward *sub;
unsigned char publish;
unsigned char publish_default;
unsigned char subscribe;
unsigned char subscribe_default;
+ struct stasis_topic *(* topic_fn)(void);
+ struct stasis_cache *(* cache_fn)(void);
+ struct stasis_message_type *(* message_type_fn)(void);
+ void (* publish_to_stasis)(struct ast_event *);
} event_types[] = {
- [AST_EVENT_MWI] = { .name = "mwi", },
- [AST_EVENT_DEVICE_STATE_CHANGE] = { .name = "device_state", },
- [AST_EVENT_PING] = { .name = "ping", .publish_default = 1, .subscribe_default = 1 },
+ [AST_EVENT_MWI] = { .name = "mwi",
+ .topic_fn = ast_mwi_topic_all,
+ .cache_fn = ast_mwi_state_cache,
+ .message_type_fn = ast_mwi_state_type,
+ .publish_to_stasis = publish_mwi_to_stasis, },
+ [AST_EVENT_DEVICE_STATE_CHANGE] = { .name = "device_state",
+ .topic_fn = ast_device_state_topic_all,
+ .cache_fn = ast_device_state_cache,
+ .message_type_fn = ast_device_state_message_type,
+ .publish_to_stasis = publish_device_state_to_stasis, },
+ [AST_EVENT_PING] = { .name = "ping",
+ .publish_default = 1,
+ .subscribe_default = 1,
+ .topic_fn = corosync_topic,
+ .message_type_fn = corosync_ping_message_type,
+ .publish_to_stasis = publish_corosync_ping_to_stasis, },
};
static struct {
@@ -88,6 +193,71 @@ static corosync_cfg_callbacks_t cfg_callbacks = {
.corosync_cfg_shutdown_callback = cfg_shutdown_cb,
};
+/*! \brief Publish a received MWI \ref ast_event to \ref stasis */
+static void publish_mwi_to_stasis(struct ast_event *event)
+{
+ const char *mailbox;
+ const char *context;
+ unsigned int new_msgs;
+ unsigned int old_msgs;
+ struct ast_eid *event_eid;
+
+ ast_assert(ast_event_get_type(event) == AST_EVENT_MWI);
+
+ mailbox = ast_event_get_ie_str(event, AST_EVENT_IE_MAILBOX);
+ context = ast_event_get_ie_str(event, AST_EVENT_IE_CONTEXT);
+ new_msgs = ast_event_get_ie_uint(event, AST_EVENT_IE_NEWMSGS);
+ old_msgs = ast_event_get_ie_uint(event, AST_EVENT_IE_OLDMSGS);
+ event_eid = (struct ast_eid *)ast_event_get_ie_raw(event, AST_EVENT_IE_EID);
+
+ if (ast_strlen_zero(mailbox) || ast_strlen_zero(context)) {
+ return;
+ }
+
+ if (new_msgs > INT_MAX) {
+ new_msgs = INT_MAX;
+ }
+
+ if (old_msgs > INT_MAX) {
+ old_msgs = INT_MAX;
+ }
+
+ if (ast_publish_mwi_state_full(mailbox, context, (int)new_msgs,
+ (int)old_msgs, NULL, event_eid)) {
+ char eid[16];
+ ast_eid_to_str(eid, sizeof(eid), event_eid);
+ ast_log(LOG_WARNING, "Failed to publish MWI message for %s@%s from %s\n",
+ mailbox, context, eid);
+ }
+}
+
+/*! \brief Publish a received device state \ref ast_event to \ref stasis */
+static void publish_device_state_to_stasis(struct ast_event *event)
+{
+ const char *device;
+ enum ast_device_state state;
+ unsigned int cachable;
+ struct ast_eid *event_eid;
+
+ ast_assert(ast_event_get_type(event) == AST_EVENT_DEVICE_STATE_CHANGE);
+
+ device = ast_event_get_ie_str(event, AST_EVENT_IE_DEVICE);
+ state = ast_event_get_ie_uint(event, AST_EVENT_IE_STATE);
+ cachable = ast_event_get_ie_uint(event, AST_EVENT_IE_CACHABLE);
+ event_eid = (struct ast_eid *)ast_event_get_ie_raw(event, AST_EVENT_IE_EID);
+
+ if (ast_strlen_zero(device)) {
+ return;
+ }
+
+ if (ast_publish_device_state_full(device, state, cachable, event_eid)) {
+ char eid[16];
+ ast_eid_to_str(eid, sizeof(eid), event_eid);
+ ast_log(LOG_WARNING, "Failed to publish device state message for %s from %s\n",
+ device, eid);
+ }
+}
+
static void cpg_deliver_cb(cpg_handle_t handle, const struct cpg_name *group_name,
uint32_t nodeid, uint32_t pid, void *msg, size_t msg_len);
@@ -101,8 +271,6 @@ static cpg_callbacks_t cpg_callbacks = {
.cpg_confchg_fn = cpg_confchg_cb,
};
-static void ast_event_cb(const struct ast_event *event, void *data);
-
#ifdef HAVE_COROSYNC_CFG_STATE_TRACK
static void cfg_state_track_cb(
corosync_cfg_state_notification_buffer_t *notification_buffer,
@@ -120,6 +288,8 @@ static void cpg_deliver_cb(cpg_handle_t handle, const struct cpg_name *group_nam
uint32_t nodeid, uint32_t pid, void *msg, size_t msg_len)
{
struct ast_event *event;
+ void (*publish_handler)(struct ast_event *) = NULL;
+ enum ast_event_type event_type;
if (msg_len < ast_event_minimum_length()) {
ast_debug(1, "Ignoring event that's too small. %u < %u\n",
@@ -133,9 +303,17 @@ static void cpg_deliver_cb(cpg_handle_t handle, const struct cpg_name *group_nam
return;
}
+ event_type = ast_event_get_type(msg);
+ if (event_type > AST_EVENT_TOTAL) {
+ /* Egads, we don't support this */
+ return;
+ }
+
ast_rwlock_rdlock(&event_types_lock);
- if (!event_types[ast_event_get_type(msg)].subscribe) {
- /* We are not configured to subscribe to these events. */
+ publish_handler = event_types[event_type].publish_to_stasis;
+ if (!event_types[event_type].subscribe || !publish_handler) {
+ /* We are not configured to subscribe to these events or
+ we have no way to publish it internally. */
ast_rwlock_unlock(&event_types_lock);
return;
}
@@ -147,20 +325,80 @@ static void cpg_deliver_cb(cpg_handle_t handle, const struct cpg_name *group_nam
memcpy(event, msg, msg_len);
+ if (event_type == AST_EVENT_PING) {
+ const struct ast_eid *eid;
+ char buf[128] = "";
+
+ eid = ast_event_get_ie_raw(event, AST_EVENT_IE_EID);
+ ast_eid_to_str(buf, sizeof(buf), (struct ast_eid *) eid);
+ ast_log(LOG_NOTICE, "Got event PING from server with EID: '%s'\n", buf);
+ }
+ ast_debug(5, "Publishing event %s (%d) to stasis\n",
+ ast_event_get_type_name(event), event_type);
+ publish_handler(event);
+}
+
+static void publish_to_corosync(struct stasis_message *message)
+{
+ cs_error_t cs_err;
+ struct iovec iov;
+ struct ast_event *event;
+
+ event = stasis_message_to_event(message);
+ if (!event) {
+ return;
+ }
+
+ if (ast_eid_cmp(&ast_eid_default, ast_event_get_ie_raw(event, AST_EVENT_IE_EID))) {
+ /* If the event didn't originate from this server, don't send it back out. */
+ ast_event_destroy(event);
+ return;
+ }
+
if (ast_event_get_type(event) == AST_EVENT_PING) {
const struct ast_eid *eid;
char buf[128] = "";
eid = ast_event_get_ie_raw(event, AST_EVENT_IE_EID);
ast_eid_to_str(buf, sizeof(buf), (struct ast_eid *) eid);
- ast_log(LOG_NOTICE, "(cpg_deliver_cb) Got event PING from server with EID: '%s'\n", buf);
+ ast_log(LOG_NOTICE, "Sending event PING from this server with EID: '%s'\n", buf);
+ }
+
+ iov.iov_base = (void *)event;
+ iov.iov_len = ast_event_get_size(event);
+
+ ast_debug(5, "Publishing event %s (%d) to corosync\n",
+ ast_event_get_type_name(event), ast_event_get_type(event));
- ast_event_queue(event);
- } else {
- ast_event_queue_and_cache(event);
+ /* The stasis subscription will only exist if we are configured to publish
+ * these events, so just send away. */
+ if ((cs_err = cpg_mcast_joined(cpg_handle, CPG_TYPE_FIFO, &iov, 1)) != CS_OK) {
+ ast_log(LOG_WARNING, "CPG mcast failed (%d)\n", cs_err);
}
}
+static void stasis_message_cb(void *data, struct stasis_subscription *sub, struct stasis_message *message)
+{
+ if (!message) {
+ return;
+ }
+
+ publish_to_corosync(message);
+}
+
+static int dump_cache_cb(void *obj, void *arg, int flags)
+{
+ struct stasis_message *message = obj;
+
+ if (!message) {
+ return 0;
+ }
+
+ publish_to_corosync(message);
+
+ return 0;
+}
+
static void cpg_confchg_cb(cpg_handle_t handle, const struct cpg_name *group_name,
const struct cpg_address *member_list, size_t member_list_entries,
const struct cpg_address *left_list, size_t left_list_entries,
@@ -176,20 +414,27 @@ static void cpg_confchg_cb(cpg_handle_t handle, const struct cpg_name *group_nam
}
for (i = 0; i < ARRAY_LEN(event_types); i++) {
- struct ast_event_sub *event_sub;
+ struct ao2_container *messages;
ast_rwlock_rdlock(&event_types_lock);
if (!event_types[i].publish) {
ast_rwlock_unlock(&event_types_lock);
continue;
}
+
+ if (!event_types[i].cache_fn || !event_types[i].message_type_fn) {
+ ast_rwlock_unlock(&event_types_lock);
+ continue;
+ }
+
+ messages = stasis_cache_dump_by_eid(event_types[i].cache_fn(),
+ event_types[i].message_type_fn(),
+ &ast_eid_default);
ast_rwlock_unlock(&event_types_lock);
- event_sub = ast_event_subscribe_new(i, ast_event_cb, NULL);
- ast_event_sub_append_ie_raw(event_sub, AST_EVENT_IE_EID,
- &ast_eid_default, sizeof(ast_eid_default));
- ast_event_dump_cache(event_sub);
- ast_event_sub_destroy(event_sub);
+ ao2_callback(messages, OBJ_NODATA, dump_cache_cb, NULL);
+
+ ao2_t_ref(messages, -1, "Dispose of dumped cache");
}
}
@@ -231,13 +476,13 @@ static void *dispatch_thread_handler(void *data)
if (pfd[0].revents & POLLIN) {
if ((cs_err = cpg_dispatch(cpg_handle, CS_DISPATCH_ALL)) != CS_OK) {
- ast_log(LOG_WARNING, "Failed CPG dispatch: %u\n", cs_err);
+ ast_log(LOG_WARNING, "Failed CPG dispatch: %d\n", cs_err);
}
}
if (pfd[1].revents & POLLIN) {
if ((cs_err = corosync_cfg_dispatch(cfg_handle, CS_DISPATCH_ALL)) != CS_OK) {
- ast_log(LOG_WARNING, "Failed CFG dispatch: %u\n", cs_err);
+ ast_log(LOG_WARNING, "Failed CFG dispatch: %d\n", cs_err);
}
}
@@ -287,37 +532,6 @@ static void *dispatch_thread_handler(void *data)
return NULL;
}
-static void ast_event_cb(const struct ast_event *event, void *data)
-{
- cs_error_t cs_err;
- struct iovec iov = {
- .iov_base = (void *) event,
- .iov_len = ast_event_get_size(event),
- };
-
- if (ast_event_get_type(event) == AST_EVENT_PING) {
- const struct ast_eid *eid;
- char buf[128] = "";
-
- eid = ast_event_get_ie_raw(event, AST_EVENT_IE_EID);
- ast_eid_to_str(buf, sizeof(buf), (struct ast_eid *) eid);
- ast_log(LOG_NOTICE, "(ast_event_cb) Got event PING from server with EID: '%s'\n", buf);
- }
-
- if (ast_eid_cmp(&ast_eid_default,
- ast_event_get_ie_raw(event, AST_EVENT_IE_EID))) {
- /* If the event didn't originate from this server, don't send it back out. */
- return;
- }
-
- /* The ast_event subscription will only exist if we are configured to publish
- * these events, so just send away. */
-
- if ((cs_err = cpg_mcast_joined(cpg_handle, CPG_TYPE_FIFO, &iov, 1)) != CS_OK) {
- ast_log(LOG_WARNING, "CPG mcast failed (%u)\n", cs_err);
- }
-}
-
static char *corosync_show_members(struct ast_cli_entry *e, int cmd, struct ast_cli_args *a)
{
cs_error_t cs_err;
@@ -368,7 +582,7 @@ static char *corosync_show_members(struct ast_cli_entry *e, int cmd, struct ast_
continue;
}
- ast_cli(a->fd, "=== Node %u\n", i);
+ ast_cli(a->fd, "=== Node %d\n", i);
ast_cli(a->fd, "=== --> Group: %s\n", cpg_desc.group.value);
for (j = 0; j < num_addrs; j++) {
@@ -378,7 +592,7 @@ static char *corosync_show_members(struct ast_cli_entry *e, int cmd, struct ast_
getnameinfo(sa, sa_len, buf, sizeof(buf), NULL, 0, NI_NUMERICHOST);
- ast_cli(a->fd, "=== --> Address %u: %s\n", j + 1, buf);
+ ast_cli(a->fd, "=== --> Address %d: %s\n", j + 1, buf);
}
}
@@ -421,7 +635,9 @@ static char *corosync_ping(struct ast_cli_entry *e, int cmd, struct ast_cli_args
return CLI_FAILURE;
}
- ast_event_queue(event);
+ ast_rwlock_rdlock(&event_types_lock);
+ event_types[AST_EVENT_PING].publish_to_stasis(event);
+ ast_rwlock_unlock(&event_types_lock);
return CLI_SUCCESS;
}
@@ -532,11 +748,16 @@ static int load_general_config(struct ast_config *cfg)
for (i = 0; i < ARRAY_LEN(event_types); i++) {
if (event_types[i].publish && !event_types[i].sub) {
- event_types[i].sub = ast_event_subscribe(i,
- ast_event_cb, "Corosync", NULL,
- AST_EVENT_IE_END);
+ event_types[i].sub = stasis_forward_all(event_types[i].topic_fn(),
+ corosync_topic());
+ stasis_message_router_add(stasis_router,
+ event_types[i].message_type_fn(),
+ stasis_message_cb,
+ NULL);
} else if (!event_types[i].publish && event_types[i].sub) {
- event_types[i].sub = ast_event_unsubscribe(event_types[i].sub);
+ event_types[i].sub = stasis_forward_cancel(event_types[i].sub);
+ stasis_message_router_remove(stasis_router,
+ event_types[i].message_type_fn());
}
}
@@ -577,14 +798,32 @@ static void cleanup_module(void)
cs_error_t cs_err;
unsigned int i;
- for (i = 0; i < ARRAY_LEN(event_types); i++) {
- if (event_types[i].sub) {
- event_types[i].sub = ast_event_unsubscribe(event_types[i].sub);
+ if (stasis_router) {
+
+ /* Unsubscribe all topic forwards and cancel all message routes */
+ ast_rwlock_wrlock(&event_types_lock);
+ for (i = 0; i < ARRAY_LEN(event_types); i++) {
+ if (event_types[i].sub) {
+ event_types[i].sub = stasis_forward_cancel(event_types[i].sub);
+ stasis_message_router_remove(stasis_router,
+ event_types[i].message_type_fn());
+ }
+ event_types[i].publish = 0;
+ event_types[i].subscribe = 0;
}
- event_types[i].publish = 0;
- event_types[i].subscribe = 0;
+ ast_rwlock_unlock(&event_types_lock);
+
+ stasis_message_router_unsubscribe_and_join(stasis_router);
+ stasis_router = NULL;
+ }
+
+ if (corosync_aggregate_topic) {
+ ao2_t_ref(corosync_aggregate_topic, -1, "Dispose of topic on cleanup");
+ corosync_aggregate_topic = NULL;
}
+ STASIS_MESSAGE_TYPE_CLEANUP(corosync_ping_message_type);
+
if (dispatch_thread.id != AST_PTHREADT_NULL) {
char meepmeep = 'x';
dispatch_thread.stop = 1;
@@ -623,13 +862,30 @@ static int load_module(void)
enum ast_module_load_result res = AST_MODULE_LOAD_FAILURE;
struct cpg_name name;
+ corosync_aggregate_topic = stasis_topic_create("corosync_aggregate_topic");
+ if (!corosync_aggregate_topic) {
+ ast_log(AST_LOG_ERROR, "Failed to create stasis topic for corosync\n");
+ goto failed;
+ }
+
+ stasis_router = stasis_message_router_create(corosync_aggregate_topic);
+ if (!stasis_router) {
+ ast_log(AST_LOG_ERROR, "Failed to create message router for corosync topic\n");
+ goto failed;
+ }
+
+ if (STASIS_MESSAGE_TYPE_INIT(corosync_ping_message_type) != 0) {
+ ast_log(AST_LOG_ERROR, "Failed to initialize corosync ping message type\n");
+ goto failed;
+ }
+
if ((cs_err = corosync_cfg_initialize(&cfg_handle, &cfg_callbacks)) != CS_OK) {
- ast_log(LOG_ERROR, "Failed to initialize cfg (%d)\n", (int) cs_err);
- return AST_MODULE_LOAD_DECLINE;
+ ast_log(LOG_ERROR, "Failed to initialize cfg: (%d)\n", (int) cs_err);
+ goto failed;
}
if ((cs_err = cpg_initialize(&cpg_handle, &cpg_callbacks)) != CS_OK) {
- ast_log(LOG_ERROR, "Failed to initialize cpg (%d)\n", (int) cs_err);
+ ast_log(LOG_ERROR, "Failed to initialize cpg: (%d)\n", (int) cs_err);
goto failed;
}
@@ -637,7 +893,7 @@ static int load_module(void)
name.length = strlen(name.value);
if ((cs_err = cpg_join(cpg_handle, &name)) != CS_OK) {
- ast_log(LOG_ERROR, "Failed to join (%d)\n", (int) cs_err);
+ ast_log(LOG_ERROR, "Failed to join: (%d)\n", (int) cs_err);
goto failed;
}