summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorMatthew Jordan <mjordan@digium.com>2014-05-22 12:01:37 +0000
committerMatthew Jordan <mjordan@digium.com>2014-05-22 12:01:37 +0000
commit9cee08f50240b7da7ddd5efc04a8ef0d31292e7a (patch)
tree0180f85de28dce14f04ed17a979152bf8d362d9d
parent3bac303dc95afbe60069247eda469d9b48cfd701 (diff)
res_corosync: Update module to work with Stasis (and compile)
This patch fixes res_corosync such that it works with Asterisk 12. This restores the functionality that was present in previous versions of Asterisk, and ensures compatibility with those versions by restoring the binary message format needed to pass information from/to them. The following changes were made in the core to support this: * The event system has been partially restored. All event definition and event types in this patch were pulled from Asterisk 11. Previously, we had hoped that this information would live in res_corosync; however, the approach in this patch seems to be better for a few reasons: (1) Theoretically, ast_events can be used by any module as a binary representation of a Stasis message. Given the structure of an ast_event object, that information has to live in the core to be used universally. For example, defining the payload of a device state ast_event in res_corosync could result in an incompatible device state representation in another module. (2) Much of this representation already lived in the core, and was not easily extensible. (3) The code already existed. :-) * Stasis message types now have a message formatter that converts their payload to an ast_event object. * Stasis message forwarders now handle forwarding to themselves. Previously this would result in an infinite recursive call. Now, this simply creates a new forwarding object with no forwards set up (as it is the thing it is forwarding to). This is advantageous for res_corosync, as returning NULL would also imply an unrecoverable error. Returning a subscription in this case allows for easier handling of message types that are published directly to an aggregate topic that has forwarders. Review: https://reviewboard.asterisk.org/r/3486/ ASTERISK-22912 #close ASTERISK-22372 #close ........ Merged revisions 414330 from http://svn.asterisk.org/svn/asterisk/branches/12 git-svn-id: https://origsvn.digium.com/svn/asterisk/trunk@414331 65c4cc65-6c06-0410-ace0-fbb531ad65f3
-rw-r--r--include/asterisk/devicestate.h2
-rw-r--r--include/asterisk/event.h127
-rw-r--r--include/asterisk/event_defs.h238
-rw-r--r--include/asterisk/stasis.h31
-rw-r--r--main/app.c35
-rw-r--r--main/devicestate.c33
-rw-r--r--main/event.c123
-rw-r--r--main/stasis.c21
-rw-r--r--main/stasis_message.c5
-rw-r--r--res/res_corosync.c390
10 files changed, 804 insertions, 201 deletions
diff --git a/include/asterisk/devicestate.h b/include/asterisk/devicestate.h
index ccc46311e..4f9aa739b 100644
--- a/include/asterisk/devicestate.h
+++ b/include/asterisk/devicestate.h
@@ -59,7 +59,7 @@ enum ast_device_state {
AST_DEVICE_RINGING, /*!< Device is ringing */
AST_DEVICE_RINGINUSE, /*!< Device is ringing *and* in use */
AST_DEVICE_ONHOLD, /*!< Device is on hold */
- AST_DEVICE_TOTAL, /*/ Total num of device states, used for testing */
+ AST_DEVICE_TOTAL, /*!< Total num of device states, used for testing */
};
/*! \brief Device State Cachability
diff --git a/include/asterisk/event.h b/include/asterisk/event.h
index 3178de5c2..7eea0581d 100644
--- a/include/asterisk/event.h
+++ b/include/asterisk/event.h
@@ -25,33 +25,27 @@
/*!
* \page AstGenericEvents Generic event system
*
- * The purpose of this API is to provide a generic way to share events between
- * Asterisk modules. Code can generate events, and other code can subscribe to
- * them.
+ * Prior to the creation of \ref stasis, the purpose of this API was to provide
+ * a generic way to share events between Asterisk modules. Once there was a need
+ * to disseminate data whose definition was provided by the producers/consumers,
+ * it was no longer possible to use the binary representation in the generic
+ * event system.
+ *
+ * That aside, the generic event system is still useful and used by several
+ * modules in Asterisk.
+ * - CEL uses the \ref ast_event representation to pass information to registered
+ * backends.
+ * - The \file res_corosync module publishes \ref ast_event representations of
+ * information to other Asterisk instances in a cluster.
+ * - Security event represent their event types and data using this system.
+ * - Theoretically, any \ref stasis message can use this system to pass
+ * information around in a binary format.
*
* Events have an associated event type, as well as information elements. The
* information elements are the meta data that go along with each event. For
* example, in the case of message waiting indication, the event type is MWI,
* and each MWI event contains at least three information elements: the
* mailbox, the number of new messages, and the number of old messages.
- *
- * Subscriptions to events consist of an event type and information elements,
- * as well. Subscriptions can be to all events, or a certain subset of events.
- * If an event type is provided, only events of that type will be sent to this
- * subscriber. Furthermore, if information elements are supplied with the
- * subscription, only events that contain the specified information elements
- * with specified values will be sent to the subscriber. For example, when a
- * SIP phone subscribes to MWI for mailbox 1234, then chan_sip can subscribe
- * to internal Asterisk MWI events with the MAILBOX information element with
- * a value of "1234".
- *
- * Another key feature of this event system is the ability to cache events.
- * It is useful for some types of events to be able to remember the last known
- * value. These are usually events that indicate some kind of state change.
- * In the example of MWI, app_voicemail can instruct the event core to cache
- * these events based on the mailbox. So, the last known MWI state of each
- * mailbox will be cached, and other modules can retrieve this information
- * on demand without having to poll the mailbox directly.
*/
#ifndef AST_EVENT_H
@@ -109,9 +103,6 @@ struct ast_event *ast_event_new(enum ast_event_type event_type, ...);
*
* \return Nothing
*
- * \note Events that have been queued should *not* be destroyed by the code that
- * created the event. It will be automatically destroyed after being
- * dispatched to the appropriate subscribers.
*/
void ast_event_destroy(struct ast_event *event);
@@ -150,6 +141,55 @@ int ast_event_append_ie_uint(struct ast_event **event, enum ast_event_ie_type ie
uint32_t data);
/*!
+ * \brief Append an information element that has a bitflags payload
+ *
+ * \param event the event that the IE will be appended to
+ * \param ie_type the type of IE to append
+ * \param bitflags the flags that are the payload of the IE
+ *
+ * \retval 0 success
+ * \retval -1 failure
+ * \since 1.8
+ *
+ * The pointer to the event will get updated with the new location for the event
+ * that now contains the appended information element. If the re-allocation of
+ * the memory for this event fails, it will be set to NULL.
+ */
+int ast_event_append_ie_bitflags(struct ast_event **event, enum ast_event_ie_type ie_type,
+ uint32_t bitflags);
+
+/*!
+ * \brief Append an information element that has a raw payload
+ *
+ * \param event the event that the IE will be appended to
+ * \param ie_type the type of IE to append
+ * \param data A pointer to the raw data for the payload of the IE
+ * \param data_len The amount of data to copy into the payload
+ *
+ * \retval 0 success
+ * \retval -1 failure
+ *
+ * The pointer to the event will get updated with the new location for the event
+ * that now contains the appended information element. If the re-allocation of
+ * the memory for this event fails, it will be set to NULL.
+ */
+int ast_event_append_ie_raw(struct ast_event **event, enum ast_event_ie_type ie_type,
+ const void *data, size_t data_len);
+
+/*!
+ * \brief Append the global EID IE
+ *
+ * \param event the event to append IE to
+ *
+ * \note For ast_event_new() that includes IEs, this is done automatically
+ * for you.
+ *
+ * \retval 0 success
+ * \retval -1 failure
+ */
+int ast_event_append_eid(struct ast_event **event);
+
+/*!
* \brief Get the value of an information element that has an integer payload
*
* \param event The event to get the IE from
@@ -173,6 +213,38 @@ uint32_t ast_event_get_ie_uint(const struct ast_event *event, enum ast_event_ie_
const char *ast_event_get_ie_str(const struct ast_event *event, enum ast_event_ie_type ie_type);
/*!
+ * \brief Get the value of an information element that has a raw payload
+ *
+ * \param event The event to get the IE from
+ * \param ie_type the type of information element to retrieve
+ *
+ * \return This returns the payload of the information element with the given type.
+ * If the information element isn't found, NULL will be returned.
+ */
+const void *ast_event_get_ie_raw(const struct ast_event *event, enum ast_event_ie_type ie_type);
+
+/*!
+ * \brief Get the length of the raw payload for a particular IE
+ *
+ * \param event The event to get the IE payload length from
+ * \param ie_type the type of information element to get the length of
+ *
+ * \return If an IE of type ie_type is found, its payload length is returned.
+ * Otherwise, 0 is returned.
+ */
+uint16_t ast_event_get_ie_raw_payload_len(const struct ast_event *event, enum ast_event_ie_type ie_type);
+
+/*!
+ * \brief Get the string representation of the type of the given event
+ *
+ * \arg event the event to get the type of
+ *
+ * \return the string representation of the event type of the provided event
+ * \since 1.6.1
+ */
+const char *ast_event_get_type_name(const struct ast_event *event);
+
+/*!
* \brief Get the string representation of an information element type
*
* \param ie_type the information element type to get the string representation of
@@ -273,6 +345,13 @@ uint32_t ast_event_iterator_get_ie_uint(struct ast_event_iterator *iterator);
*/
const char *ast_event_iterator_get_ie_str(struct ast_event_iterator *iterator);
+/*!
+ * \brief Get the minimum length of an ast_event.
+ *
+ * \return minimum amount of memory that will be consumed by any ast_event.
+ */
+size_t ast_event_minimum_length(void);
+
#if defined(__cplusplus) || defined(c_plusplus)
}
#endif
diff --git a/include/asterisk/event_defs.h b/include/asterisk/event_defs.h
index f8f98ace0..80a8d7dda 100644
--- a/include/asterisk/event_defs.h
+++ b/include/asterisk/event_defs.h
@@ -25,23 +25,41 @@
#ifndef AST_EVENT_DEFS_H
#define AST_EVENT_DEFS_H
-/*! \brief Event types
- * \note These values no longer go over the wire and can change when items are removed. */
enum ast_event_type {
/*! Reserved to provide the ability to subscribe to all events. A specific
* event should never have a payload of 0. */
AST_EVENT_ALL = 0x00,
/*! This event type is reserved for use by third-party modules to create
- * custom events without having to modify this file.
+ * custom events without having to modify this file.
* \note There are no "custom" IE types, because IEs only have to be
* unique to the event itself, not necessarily across all events. */
AST_EVENT_CUSTOM = 0x01,
+ /*! Voicemail message waiting indication */
+ AST_EVENT_MWI = 0x02,
/*! Someone has subscribed to events */
- AST_EVENT_SUB = 0x02,
+ AST_EVENT_SUB = 0x03,
+ /*! Someone has unsubscribed from events */
+ AST_EVENT_UNSUB = 0x04,
+ /*! The aggregate state of a device across all servers configured to be
+ * a part of a device state cluster has changed. */
+ AST_EVENT_DEVICE_STATE = 0x05,
+ /*! The state of a device has changed on _one_ server. This should not be used
+ * directly, in general. Use AST_EVENT_DEVICE_STATE instead. */
+ AST_EVENT_DEVICE_STATE_CHANGE = 0x06,
/*! Channel Event Logging events */
- AST_EVENT_CEL = 0x03,
+ AST_EVENT_CEL = 0x07,
+ /*! A report of a security related event (see security_events.h) */
+ AST_EVENT_SECURITY = 0x08,
+ /*! Used by res_stun_monitor to alert listeners to an exernal network address change. */
+ AST_EVENT_NETWORK_CHANGE = 0x09,
+ /*! The presence state for a presence provider */
+ AST_EVENT_PRESENCE_STATE = 0x0a,
+ /*! Used to alert listeners when a named ACL has changed. */
+ AST_EVENT_ACL_CHANGE = 0x0b,
+ /*! Send out a ping for debugging distributed events */
+ AST_EVENT_PING = 0x0c,
/*! Number of event types. This should be the last event type + 1 */
- AST_EVENT_TOTAL = 0x04,
+ AST_EVENT_TOTAL = 0x0d,
};
/*! \brief Event Information Element types */
@@ -49,199 +67,243 @@ enum ast_event_ie_type {
/*! Used to terminate the arguments to event functions */
AST_EVENT_IE_END = -1,
- /*!
+ /*!
+ * \brief Number of new messages
+ * Used by: AST_EVENT_MWI
+ * Payload type: UINT
+ */
+ AST_EVENT_IE_NEWMSGS = 0x0001,
+ /*!
+ * \brief Number of
+ * Used by: AST_EVENT_MWI
+ * Payload type: UINT
+ */
+ AST_EVENT_IE_OLDMSGS = 0x0002,
+ /*!
+ * \brief Mailbox name \verbatim (mailbox[@context]) \endverbatim
+ * Used by: AST_EVENT_MWI
+ * Payload type: STR
+ */
+ AST_EVENT_IE_MAILBOX = 0x0003,
+ /*!
* \brief Unique ID
* Used by: AST_EVENT_SUB, AST_EVENT_UNSUB
* Payload type: UINT
*/
- AST_EVENT_IE_UNIQUEID = 0x0001,
- /*!
- * \brief Event type
+ AST_EVENT_IE_UNIQUEID = 0x0004,
+ /*!
+ * \brief Event type
* Used by: AST_EVENT_SUB, AST_EVENT_UNSUB
* Payload type: UINT
*/
- AST_EVENT_IE_EVENTTYPE = 0x0002,
+ AST_EVENT_IE_EVENTTYPE = 0x0005,
/*!
* \brief Hint that someone cares that an IE exists
* Used by: AST_EVENT_SUB
* Payload type: UINT (ast_event_ie_type)
*/
- AST_EVENT_IE_EXISTS = 0x0003,
- /*!
- * \brief Context IE
- * Used by AST_EVENT_MWI
- * Payload type: str
- */
- AST_EVENT_IE_CONTEXT = 0x0004,
- /*!
+ AST_EVENT_IE_EXISTS = 0x0006,
+ /*!
+ * \brief Device Name
+ * Used by AST_EVENT_DEVICE_STATE_CHANGE
+ * Payload type: STR
+ */
+ AST_EVENT_IE_DEVICE = 0x0007,
+ /*!
+ * \brief Generic State IE
+ * Used by AST_EVENT_DEVICE_STATE_CHANGE
+ * Payload type: UINT
+ * The actual state values depend on the event which
+ * this IE is a part of.
+ */
+ AST_EVENT_IE_STATE = 0x0008,
+ /*!
+ * \brief Context IE
+ * Used by AST_EVENT_MWI
+ * Payload type: str
+ */
+ AST_EVENT_IE_CONTEXT = 0x0009,
+ /*!
* \brief Channel Event Type
* Used by: AST_EVENT_CEL
* Payload type: UINT
*/
- AST_EVENT_IE_CEL_EVENT_TYPE = 0x0005,
- /*!
+ AST_EVENT_IE_CEL_EVENT_TYPE = 0x000a,
+ /*!
* \brief Channel Event Time (seconds)
* Used by: AST_EVENT_CEL
* Payload type: UINT
*/
- AST_EVENT_IE_CEL_EVENT_TIME = 0x0006,
- /*!
+ AST_EVENT_IE_CEL_EVENT_TIME = 0x000b,
+ /*!
* \brief Channel Event Time (micro-seconds)
* Used by: AST_EVENT_CEL
* Payload type: UINT
*/
- AST_EVENT_IE_CEL_EVENT_TIME_USEC = 0x0007,
- /*!
+ AST_EVENT_IE_CEL_EVENT_TIME_USEC = 0x000c,
+ /*!
* \brief Channel Event User Event Name
* Used by: AST_EVENT_CEL
* Payload type: STR
*/
- AST_EVENT_IE_CEL_USEREVENT_NAME = 0x0008,
- /*!
+ AST_EVENT_IE_CEL_USEREVENT_NAME = 0x000d,
+ /*!
* \brief Channel Event CID name
* Used by: AST_EVENT_CEL
* Payload type: STR
*/
- AST_EVENT_IE_CEL_CIDNAME = 0x0009,
- /*!
+ AST_EVENT_IE_CEL_CIDNAME = 0x000e,
+ /*!
* \brief Channel Event CID num
* Used by: AST_EVENT_CEL
* Payload type: STR
*/
- AST_EVENT_IE_CEL_CIDNUM = 0x000a,
- /*!
+ AST_EVENT_IE_CEL_CIDNUM = 0x000f,
+ /*!
* \brief Channel Event extension name
* Used by: AST_EVENT_CEL
* Payload type: STR
*/
- AST_EVENT_IE_CEL_EXTEN = 0x000b,
- /*!
+ AST_EVENT_IE_CEL_EXTEN = 0x0010,
+ /*!
* \brief Channel Event context name
* Used by: AST_EVENT_CEL
* Payload type: STR
*/
- AST_EVENT_IE_CEL_CONTEXT = 0x000c,
- /*!
+ AST_EVENT_IE_CEL_CONTEXT = 0x0011,
+ /*!
* \brief Channel Event channel name
* Used by: AST_EVENT_CEL
* Payload type: STR
*/
- AST_EVENT_IE_CEL_CHANNAME = 0x000d,
- /*!
+ AST_EVENT_IE_CEL_CHANNAME = 0x0012,
+ /*!
* \brief Channel Event app name
* Used by: AST_EVENT_CEL
* Payload type: STR
*/
- AST_EVENT_IE_CEL_APPNAME = 0x000e,
- /*!
+ AST_EVENT_IE_CEL_APPNAME = 0x0013,
+ /*!
* \brief Channel Event app args/data
* Used by: AST_EVENT_CEL
* Payload type: STR
*/
- AST_EVENT_IE_CEL_APPDATA = 0x000f,
- /*!
+ AST_EVENT_IE_CEL_APPDATA = 0x0014,
+ /*!
* \brief Channel Event AMA flags
* Used by: AST_EVENT_CEL
* Payload type: UINT
*/
- AST_EVENT_IE_CEL_AMAFLAGS = 0x0010,
- /*!
+ AST_EVENT_IE_CEL_AMAFLAGS = 0x0015,
+ /*!
* \brief Channel Event AccountCode
* Used by: AST_EVENT_CEL
* Payload type: STR
*/
- AST_EVENT_IE_CEL_ACCTCODE = 0x0011,
- /*!
+ AST_EVENT_IE_CEL_ACCTCODE = 0x0016,
+ /*!
* \brief Channel Event UniqueID
* Used by: AST_EVENT_CEL
* Payload type: STR
*/
- AST_EVENT_IE_CEL_UNIQUEID = 0x0012,
- /*!
+ AST_EVENT_IE_CEL_UNIQUEID = 0x0017,
+ /*!
* \brief Channel Event Userfield
* Used by: AST_EVENT_CEL
* Payload type: STR
*/
- AST_EVENT_IE_CEL_USERFIELD = 0x0013,
- /*!
+ AST_EVENT_IE_CEL_USERFIELD = 0x0018,
+ /*!
* \brief Channel Event CID ANI field
* Used by: AST_EVENT_CEL
* Payload type: STR
*/
- AST_EVENT_IE_CEL_CIDANI = 0x0014,
- /*!
+ AST_EVENT_IE_CEL_CIDANI = 0x0019,
+ /*!
* \brief Channel Event CID RDNIS field
* Used by: AST_EVENT_CEL
* Payload type: STR
*/
- AST_EVENT_IE_CEL_CIDRDNIS = 0x0015,
- /*!
+ AST_EVENT_IE_CEL_CIDRDNIS = 0x001a,
+ /*!
* \brief Channel Event CID dnid
* Used by: AST_EVENT_CEL
* Payload type: STR
*/
- AST_EVENT_IE_CEL_CIDDNID = 0x0016,
- /*!
+ AST_EVENT_IE_CEL_CIDDNID = 0x001b,
+ /*!
* \brief Channel Event Peer -- for Things involving multiple channels, like BRIDGE
* Used by: AST_EVENT_CEL
* Payload type: STR
*/
- AST_EVENT_IE_CEL_PEER = 0x0017,
- /*!
+ AST_EVENT_IE_CEL_PEER = 0x001c,
+ /*!
* \brief Channel Event LinkedID
* Used by: AST_EVENT_CEL
* Payload type: STR
*/
- AST_EVENT_IE_CEL_LINKEDID = 0x0018,
- /*!
+ AST_EVENT_IE_CEL_LINKEDID = 0x001d,
+ /*!
* \brief Channel Event peeraccount
* Used by: AST_EVENT_CEL
* Payload type: STR
*/
- AST_EVENT_IE_CEL_PEERACCT = 0x0019,
- /*!
+ AST_EVENT_IE_CEL_PEERACCT = 0x001e,
+ /*!
* \brief Channel Event extra data
* Used by: AST_EVENT_CEL
* Payload type: STR
*/
- AST_EVENT_IE_CEL_EXTRA = 0x001a,
+ AST_EVENT_IE_CEL_EXTRA = 0x001f,
/*!
* \brief Description
* Used by: AST_EVENT_SUB, AST_EVENT_UNSUB
* Payload type: STR
*/
- AST_EVENT_IE_DESCRIPTION = 0x001b,
+ AST_EVENT_IE_DESCRIPTION = 0x0020,
/*!
* \brief Entity ID
* Used by All events
* Payload type: RAW
* This IE indicates which server the event originated from
*/
- AST_EVENT_IE_EVENT_VERSION = 0x001c,
- AST_EVENT_IE_SERVICE = 0x001d,
- AST_EVENT_IE_MODULE = 0x001e,
- AST_EVENT_IE_ACCOUNT_ID = 0x001f,
- AST_EVENT_IE_SESSION_ID = 0x0020,
- AST_EVENT_IE_SESSION_TV = 0x0021,
- AST_EVENT_IE_ACL_NAME = 0x0022,
- AST_EVENT_IE_LOCAL_ADDR = 0x0023,
- AST_EVENT_IE_REMOTE_ADDR = 0x0024,
- AST_EVENT_IE_EVENT_TV = 0x0025,
- AST_EVENT_IE_REQUEST_TYPE = 0x0026,
- AST_EVENT_IE_REQUEST_PARAMS = 0x0027,
- AST_EVENT_IE_AUTH_METHOD = 0x0028,
- AST_EVENT_IE_SEVERITY = 0x0029,
- AST_EVENT_IE_EXPECTED_ADDR = 0x002a,
- AST_EVENT_IE_CHALLENGE = 0x002b,
- AST_EVENT_IE_RESPONSE = 0x002c,
- AST_EVENT_IE_EXPECTED_RESPONSE = 0x002e,
- AST_EVENT_IE_RECEIVED_CHALLENGE = 0x002f,
- AST_EVENT_IE_RECEIVED_HASH = 0x0030,
- AST_EVENT_IE_USING_PASSWORD = 0x0031,
- AST_EVENT_IE_ATTEMPTED_TRANSPORT = 0x0032,
+ AST_EVENT_IE_EID = 0x0021,
+ AST_EVENT_IE_SECURITY_EVENT = 0x0022,
+ AST_EVENT_IE_EVENT_VERSION = 0x0023,
+ AST_EVENT_IE_SERVICE = 0x0024,
+ AST_EVENT_IE_MODULE = 0x0025,
+ AST_EVENT_IE_ACCOUNT_ID = 0x0026,
+ AST_EVENT_IE_SESSION_ID = 0x0027,
+ AST_EVENT_IE_SESSION_TV = 0x0028,
+ AST_EVENT_IE_ACL_NAME = 0x0029,
+ AST_EVENT_IE_LOCAL_ADDR = 0x002a,
+ AST_EVENT_IE_REMOTE_ADDR = 0x002b,
+ AST_EVENT_IE_EVENT_TV = 0x002c,
+ AST_EVENT_IE_REQUEST_TYPE = 0x002d,
+ AST_EVENT_IE_REQUEST_PARAMS = 0x002e,
+ AST_EVENT_IE_AUTH_METHOD = 0x002f,
+ AST_EVENT_IE_SEVERITY = 0x0030,
+ AST_EVENT_IE_EXPECTED_ADDR = 0x0031,
+ AST_EVENT_IE_CHALLENGE = 0x0032,
+ AST_EVENT_IE_RESPONSE = 0x0033,
+ AST_EVENT_IE_EXPECTED_RESPONSE = 0x0034,
+ AST_EVENT_IE_RECEIVED_CHALLENGE = 0x0035,
+ AST_EVENT_IE_RECEIVED_HASH = 0x0036,
+ AST_EVENT_IE_USING_PASSWORD = 0x0037,
+ AST_EVENT_IE_ATTEMPTED_TRANSPORT = 0x0038,
+ AST_EVENT_IE_PRESENCE_PROVIDER = 0x0039,
+ AST_EVENT_IE_PRESENCE_STATE = 0x003a,
+ AST_EVENT_IE_PRESENCE_SUBTYPE = 0x003b,
+ AST_EVENT_IE_PRESENCE_MESSAGE = 0x003c,
+ /*!
+ * \brief Event non-cachability flag
+ * Used by: All events
+ * Payload type: UINT
+ */
+ AST_EVENT_IE_CACHABLE = 0x003d,
/*! \brief Must be the last IE value +1 */
- AST_EVENT_IE_TOTAL = 0x0033,
+ AST_EVENT_IE_TOTAL = 0x003e,
};
/*!
@@ -249,12 +311,16 @@ enum ast_event_ie_type {
*/
enum ast_event_ie_pltype {
AST_EVENT_IE_PLTYPE_UNKNOWN = -1,
+ /*! Just check if it exists, not the value */
+ AST_EVENT_IE_PLTYPE_EXISTS,
/*! Unsigned Integer (Can be used for signed, too ...) */
AST_EVENT_IE_PLTYPE_UINT,
/*! String */
AST_EVENT_IE_PLTYPE_STR,
/*! Raw data, compared with memcmp */
AST_EVENT_IE_PLTYPE_RAW,
+ /*! Bit flags (unsigned integer, compared using boolean logic) */
+ AST_EVENT_IE_PLTYPE_BITFLAGS,
};
/*!
diff --git a/include/asterisk/stasis.h b/include/asterisk/stasis.h
index 20870e6d6..f5b4a60d9 100644
--- a/include/asterisk/stasis.h
+++ b/include/asterisk/stasis.h
@@ -171,6 +171,7 @@
#include "asterisk/json.h"
#include "asterisk/manager.h"
#include "asterisk/utils.h"
+#include "asterisk/event.h"
/*! @{ */
@@ -255,6 +256,21 @@ struct stasis_message_vtable {
*/
struct ast_manager_event_blob *(*to_ami)(
struct stasis_message *message);
+
+ /*!
+ * \since 12.3.0
+ * \brief Build the \ref ast_event representation of the message.
+ *
+ * May be \c NULL, or may return \c NULL, to indicate no representation.
+ * The returned object should be free'd.
+ *
+ * \param message Message to convert to an \ref ast_event.
+ * \return Newly allocated \ref ast_event.
+ * \return \c NULL on error.
+ * \return \c NULL if AMI format is not supported.
+ */
+ struct ast_event *(*to_event)(
+ struct stasis_message *message);
};
/*!
@@ -389,6 +405,19 @@ struct ast_json *stasis_message_to_json(struct stasis_message *message, struct s
struct ast_manager_event_blob *stasis_message_to_ami(
struct stasis_message *message);
+/*!
+ * \brief Build the \ref AstGenericEvents representation of the message.
+ *
+ * May return \c NULL, to indicate no representation. The returned object should
+ * be disposed of via \ref ast_event_destroy.
+ *
+ * \param message Message to convert to AMI.
+ * \return \c NULL on error.
+ * \return \c NULL if AMI format is not supported.
+ */
+struct ast_event *stasis_message_to_event(
+ struct stasis_message *message);
+
/*! @} */
/*! @{ */
@@ -1020,6 +1049,7 @@ void stasis_log_bad_type_access(const char *name);
* STASIS_MESSAGE_TYPE_DEFN(ast_foo_type,
* .to_ami = foo_to_ami,
* .to_json = foo_to_json,
+ * .to_event = foo_to_event,
* );
* \endcode
*
@@ -1046,6 +1076,7 @@ void stasis_log_bad_type_access(const char *name);
* STASIS_MESSAGE_TYPE_DEFN_LOCAL(ast_foo_type,
* .to_ami = foo_to_ami,
* .to_json = foo_to_json,
+ * .to_event = foo_to_event,
* );
* \endcode
*
diff --git a/main/app.c b/main/app.c
index a5c4c7af5..3c2f33c51 100644
--- a/main/app.c
+++ b/main/app.c
@@ -96,10 +96,43 @@ static struct stasis_topic *queue_topic_all;
static struct stasis_topic_pool *queue_topic_pool;
/* @} */
+/*! \brief Convert a MWI \ref stasis_message to a \ref ast_event */
+static struct ast_event *mwi_to_event(struct stasis_message *message)
+{
+ struct ast_event *event;
+ struct ast_mwi_state *mwi_state;
+ char *mailbox;
+ char *context;
+
+ if (!message) {
+ return NULL;
+ }
+
+ mwi_state = stasis_message_data(message);
+
+ /* Strip off @context */
+ context = mailbox = ast_strdupa(mwi_state->uniqueid);
+ strsep(&context, "@");
+ if (ast_strlen_zero(context)) {
+ context = "default";
+ }
+
+ event = ast_event_new(AST_EVENT_MWI,
+ AST_EVENT_IE_MAILBOX, AST_EVENT_IE_PLTYPE_STR, mailbox,
+ AST_EVENT_IE_CONTEXT, AST_EVENT_IE_PLTYPE_STR, context,
+ AST_EVENT_IE_NEWMSGS, AST_EVENT_IE_PLTYPE_UINT, mwi_state->new_msgs,
+ AST_EVENT_IE_OLDMSGS, AST_EVENT_IE_PLTYPE_UINT, mwi_state->old_msgs,
+ AST_EVENT_IE_EID, AST_EVENT_IE_PLTYPE_RAW, &mwi_state->eid, sizeof(mwi_state->eid),
+ AST_EVENT_IE_END);
+
+ return event;
+}
+
/*
* @{ \brief Define \ref stasis message types for MWI
*/
-STASIS_MESSAGE_TYPE_DEFN(ast_mwi_state_type);
+STASIS_MESSAGE_TYPE_DEFN(ast_mwi_state_type,
+ .to_event = mwi_to_event, );
STASIS_MESSAGE_TYPE_DEFN(ast_mwi_vm_app_type);
/* @} */
diff --git a/main/devicestate.c b/main/devicestate.c
index 1199e68ab..3580b1af7 100644
--- a/main/devicestate.c
+++ b/main/devicestate.c
@@ -224,9 +224,12 @@ static struct stasis_caching_topic *device_state_topic_cached;
static struct stasis_topic_pool *device_state_topic_pool;
static struct ast_manager_event_blob *devstate_to_ami(struct stasis_message *msg);
+static struct ast_event *devstate_to_event(struct stasis_message *msg);
+
STASIS_MESSAGE_TYPE_DEFN(ast_device_state_message_type,
.to_ami = devstate_to_ami,
+ .to_event = devstate_to_event,
);
/* Forward declarations */
@@ -925,3 +928,33 @@ static struct ast_manager_event_blob *devstate_to_ami(struct stasis_message *msg
"State: %s\r\n",
dev_state->device, ast_devstate_str(dev_state->state));
}
+
+/*! \brief Convert a \ref stasis_message to a \ref ast_event */
+static struct ast_event *devstate_to_event(struct stasis_message *message)
+{
+ struct ast_event *event;
+ struct ast_device_state_message *device_state;
+
+ if (!message) {
+ return NULL;
+ }
+
+ device_state = stasis_message_data(message);
+
+ if (device_state->eid) {
+ event = ast_event_new(AST_EVENT_DEVICE_STATE_CHANGE,
+ AST_EVENT_IE_DEVICE, AST_EVENT_IE_PLTYPE_STR, device_state->device,
+ AST_EVENT_IE_STATE, AST_EVENT_IE_PLTYPE_UINT, device_state->state,
+ AST_EVENT_IE_CACHABLE, AST_EVENT_IE_PLTYPE_UINT, device_state->cachable,
+ AST_EVENT_IE_EID, AST_EVENT_IE_PLTYPE_RAW, device_state->eid, sizeof(*device_state->eid),
+ AST_EVENT_IE_END);
+ } else {
+ event = ast_event_new(AST_EVENT_DEVICE_STATE,
+ AST_EVENT_IE_DEVICE, AST_EVENT_IE_PLTYPE_STR, device_state->device,
+ AST_EVENT_IE_STATE, AST_EVENT_IE_PLTYPE_UINT, device_state->state,
+ AST_EVENT_IE_CACHABLE, AST_EVENT_IE_PLTYPE_UINT, device_state->cachable,
+ AST_EVENT_IE_END);
+ }
+
+ return event;
+}
diff --git a/main/event.c b/main/event.c
index 990f62161..876e53b88 100644
--- a/main/event.c
+++ b/main/event.c
@@ -44,10 +44,6 @@ ASTERISK_FILE_VERSION(__FILE__, "$Revision$")
#include "asterisk/astobj2.h"
#include "asterisk/cli.h"
-static int event_append_ie_raw(struct ast_event **event, enum ast_event_ie_type ie_type,
- const void *data, size_t data_len);
-static const void *event_get_ie_raw(const struct ast_event *event, enum ast_event_ie_type ie_type);
-
/*!
* \brief An event information element
*
@@ -109,19 +105,42 @@ struct ast_event_ie_val {
size_t raw_datalen;
};
-struct ie_map {
- enum ast_event_ie_pltype ie_pltype;
- const char *name;
+/*!
+ * \brief Event Names
+ */
+static const char * const event_names[AST_EVENT_TOTAL] = {
+ [AST_EVENT_ALL] = "All",
+ [AST_EVENT_CUSTOM] = "Custom",
+ [AST_EVENT_MWI] = "MWI",
+ [AST_EVENT_SUB] = "Subscription",
+ [AST_EVENT_UNSUB] = "Unsubscription",
+ [AST_EVENT_DEVICE_STATE] = "DeviceState",
+ [AST_EVENT_DEVICE_STATE_CHANGE] = "DeviceStateChange",
+ [AST_EVENT_CEL] = "CEL",
+ [AST_EVENT_SECURITY] = "Security",
+ [AST_EVENT_NETWORK_CHANGE] = "NetworkChange",
+ [AST_EVENT_PRESENCE_STATE] = "PresenceState",
+ [AST_EVENT_ACL_CHANGE] = "ACLChange",
+ [AST_EVENT_PING] = "Ping",
};
/*!
* \brief IE payload types and names
*/
-static const struct ie_map ie_maps[AST_EVENT_IE_TOTAL] = {
+static const struct ie_map {
+ enum ast_event_ie_pltype ie_pltype;
+ const char *name;
+} ie_maps[AST_EVENT_IE_TOTAL] = {
+ [AST_EVENT_IE_NEWMSGS] = { AST_EVENT_IE_PLTYPE_UINT, "NewMessages" },
+ [AST_EVENT_IE_OLDMSGS] = { AST_EVENT_IE_PLTYPE_UINT, "OldMessages" },
+ [AST_EVENT_IE_MAILBOX] = { AST_EVENT_IE_PLTYPE_STR, "Mailbox" },
[AST_EVENT_IE_UNIQUEID] = { AST_EVENT_IE_PLTYPE_UINT, "UniqueID" },
[AST_EVENT_IE_EVENTTYPE] = { AST_EVENT_IE_PLTYPE_UINT, "EventType" },
[AST_EVENT_IE_EXISTS] = { AST_EVENT_IE_PLTYPE_UINT, "Exists" },
+ [AST_EVENT_IE_DEVICE] = { AST_EVENT_IE_PLTYPE_STR, "Device" },
+ [AST_EVENT_IE_STATE] = { AST_EVENT_IE_PLTYPE_UINT, "State" },
[AST_EVENT_IE_CONTEXT] = { AST_EVENT_IE_PLTYPE_STR, "Context" },
+ [AST_EVENT_IE_EID] = { AST_EVENT_IE_PLTYPE_RAW, "EntityID" },
[AST_EVENT_IE_CEL_EVENT_TYPE] = { AST_EVENT_IE_PLTYPE_UINT, "CELEventType" },
[AST_EVENT_IE_CEL_EVENT_TIME] = { AST_EVENT_IE_PLTYPE_UINT, "CELEventTime" },
[AST_EVENT_IE_CEL_EVENT_TIME_USEC] = { AST_EVENT_IE_PLTYPE_UINT, "CELEventTimeUSec" },
@@ -144,6 +163,7 @@ static const struct ie_map ie_maps[AST_EVENT_IE_TOTAL] = {
[AST_EVENT_IE_CEL_LINKEDID] = { AST_EVENT_IE_PLTYPE_STR, "CELLinkedID" },
[AST_EVENT_IE_CEL_PEERACCT] = { AST_EVENT_IE_PLTYPE_STR, "CELPeerAcct" },
[AST_EVENT_IE_CEL_EXTRA] = { AST_EVENT_IE_PLTYPE_STR, "CELExtra" },
+ [AST_EVENT_IE_SECURITY_EVENT] = { AST_EVENT_IE_PLTYPE_STR, "SecurityEvent" },
[AST_EVENT_IE_EVENT_VERSION] = { AST_EVENT_IE_PLTYPE_UINT, "EventVersion" },
[AST_EVENT_IE_SERVICE] = { AST_EVENT_IE_PLTYPE_STR, "Service" },
[AST_EVENT_IE_MODULE] = { AST_EVENT_IE_PLTYPE_STR, "Module" },
@@ -166,8 +186,27 @@ static const struct ie_map ie_maps[AST_EVENT_IE_TOTAL] = {
[AST_EVENT_IE_RECEIVED_HASH] = { AST_EVENT_IE_PLTYPE_STR, "ReceivedHash" },
[AST_EVENT_IE_USING_PASSWORD] = { AST_EVENT_IE_PLTYPE_UINT, "UsingPassword" },
[AST_EVENT_IE_ATTEMPTED_TRANSPORT] = { AST_EVENT_IE_PLTYPE_STR, "AttemptedTransport" },
+ [AST_EVENT_IE_CACHABLE] = { AST_EVENT_IE_PLTYPE_UINT, "Cachable" },
+ [AST_EVENT_IE_PRESENCE_PROVIDER] = { AST_EVENT_IE_PLTYPE_STR, "PresenceProvider" },
+ [AST_EVENT_IE_PRESENCE_STATE] = { AST_EVENT_IE_PLTYPE_UINT, "PresenceState" },
+ [AST_EVENT_IE_PRESENCE_SUBTYPE] = { AST_EVENT_IE_PLTYPE_STR, "PresenceSubtype" },
+ [AST_EVENT_IE_PRESENCE_MESSAGE] = { AST_EVENT_IE_PLTYPE_STR, "PresenceMessage" },
};
+const char *ast_event_get_type_name(const struct ast_event *event)
+{
+ enum ast_event_type type;
+
+ type = ast_event_get_type(event);
+
+ if (type < 0 || type >= ARRAY_LEN(event_names)) {
+ ast_log(LOG_ERROR, "Invalid event type - '%d'\n", type);
+ return "";
+ }
+
+ return event_names[type];
+}
+
const char *ast_event_get_ie_type_name(enum ast_event_ie_type ie_type)
{
if (ie_type <= 0 || ie_type >= ARRAY_LEN(ie_maps)) {
@@ -257,7 +296,7 @@ uint32_t ast_event_get_ie_uint(const struct ast_event *event, enum ast_event_ie_
{
const uint32_t *ie_val;
- ie_val = event_get_ie_raw(event, ie_type);
+ ie_val = ast_event_get_ie_raw(event, ie_type);
return ie_val ? ntohl(get_unaligned_uint32(ie_val)) : 0;
}
@@ -266,12 +305,12 @@ const char *ast_event_get_ie_str(const struct ast_event *event, enum ast_event_i
{
const struct ast_event_ie_str_payload *str_payload;
- str_payload = event_get_ie_raw(event, ie_type);
+ str_payload = ast_event_get_ie_raw(event, ie_type);
return str_payload ? str_payload->str : NULL;
}
-static const void *event_get_ie_raw(const struct ast_event *event, enum ast_event_ie_type ie_type)
+const void *ast_event_get_ie_raw(const struct ast_event *event, enum ast_event_ie_type ie_type)
{
struct ast_event_iterator iterator;
int res;
@@ -285,6 +324,26 @@ static const void *event_get_ie_raw(const struct ast_event *event, enum ast_even
return NULL;
}
+static uint16_t event_iterator_get_ie_raw_payload_len(struct ast_event_iterator *iterator)
+{
+ return ntohs(iterator->ie->ie_payload_len);
+}
+
+uint16_t ast_event_get_ie_raw_payload_len(const struct ast_event *event, enum ast_event_ie_type ie_type)
+{
+ struct ast_event_iterator iterator;
+ int res;
+
+ for (res = ast_event_iterator_init(&iterator, event); !res; res = ast_event_iterator_next(&iterator)) {
+ if (ast_event_iterator_get_ie_type(&iterator) == ie_type) {
+ return event_iterator_get_ie_raw_payload_len(&iterator);
+ }
+ }
+
+ return 0;
+}
+
+
int ast_event_append_ie_str(struct ast_event **event, enum ast_event_ie_type ie_type,
const char *str)
{
@@ -297,17 +356,24 @@ int ast_event_append_ie_str(struct ast_event **event, enum ast_event_ie_type ie_
strcpy(str_payload->str, str);
str_payload->hash = ast_str_hash(str);
- return event_append_ie_raw(event, ie_type, str_payload, payload_len);
+ return ast_event_append_ie_raw(event, ie_type, str_payload, payload_len);
}
int ast_event_append_ie_uint(struct ast_event **event, enum ast_event_ie_type ie_type,
uint32_t data)
{
data = htonl(data);
- return event_append_ie_raw(event, ie_type, &data, sizeof(data));
+ return ast_event_append_ie_raw(event, ie_type, &data, sizeof(data));
+}
+
+int ast_event_append_ie_bitflags(struct ast_event **event, enum ast_event_ie_type ie_type,
+ uint32_t flags)
+{
+ flags = htonl(flags);
+ return ast_event_append_ie_raw(event, ie_type, &flags, sizeof(flags));
}
-static int event_append_ie_raw(struct ast_event **event, enum ast_event_ie_type ie_type,
+int ast_event_append_ie_raw(struct ast_event **event, enum ast_event_ie_type ie_type,
const void *data, size_t data_len)
{
struct ast_event_ie *ie;
@@ -361,11 +427,16 @@ struct ast_event *ast_event_new(enum ast_event_type type, ...)
memset(ie_value, 0, sizeof(*ie_value));
ie_value->ie_type = ie_type;
ie_value->ie_pltype = va_arg(ap, enum ast_event_ie_pltype);
+
switch (ie_value->ie_pltype) {
case AST_EVENT_IE_PLTYPE_UINT:
ie_value->payload.uint = va_arg(ap, uint32_t);
insert = 1;
break;
+ case AST_EVENT_IE_PLTYPE_BITFLAGS:
+ ie_value->payload.uint = va_arg(ap, uint32_t);
+ insert = 1;
+ break;
case AST_EVENT_IE_PLTYPE_STR:
ie_value->payload.str = va_arg(ap, const char *);
insert = 1;
@@ -381,6 +452,7 @@ struct ast_event *ast_event_new(enum ast_event_type type, ...)
break;
}
case AST_EVENT_IE_PLTYPE_UNKNOWN:
+ case AST_EVENT_IE_PLTYPE_EXISTS:
break;
}
@@ -407,10 +479,14 @@ struct ast_event *ast_event_new(enum ast_event_type type, ...)
case AST_EVENT_IE_PLTYPE_UINT:
ast_event_append_ie_uint(&event, ie_val->ie_type, ie_val->payload.uint);
break;
+ case AST_EVENT_IE_PLTYPE_BITFLAGS:
+ ast_event_append_ie_bitflags(&event, ie_val->ie_type, ie_val->payload.uint);
+ break;
case AST_EVENT_IE_PLTYPE_RAW:
- event_append_ie_raw(&event, ie_val->ie_type,
+ ast_event_append_ie_raw(&event, ie_val->ie_type,
ie_val->payload.raw, ie_val->raw_datalen);
break;
+ case AST_EVENT_IE_PLTYPE_EXISTS:
case AST_EVENT_IE_PLTYPE_UNKNOWN:
break;
}
@@ -421,10 +497,27 @@ struct ast_event *ast_event_new(enum ast_event_type type, ...)
}
}
+ if (!ast_event_get_ie_raw(event, AST_EVENT_IE_EID)) {
+ /* If the event is originating on this server, add the server's
+ * entity ID to the event. */
+ ast_event_append_eid(&event);
+ }
+
return event;
}
+int ast_event_append_eid(struct ast_event **event)
+{
+ return ast_event_append_ie_raw(event, AST_EVENT_IE_EID,
+ &ast_eid_default, sizeof(ast_eid_default));
+}
+
void ast_event_destroy(struct ast_event *event)
{
ast_free(event);
}
+
+size_t ast_event_minimum_length(void)
+{
+ return sizeof(struct ast_event);
+}
diff --git a/main/stasis.c b/main/stasis.c
index 4d05f18e8..5eca791ef 100644
--- a/main/stasis.c
+++ b/main/stasis.c
@@ -686,15 +686,17 @@ struct stasis_forward *stasis_forward_cancel(struct stasis_forward *forward)
from = forward->from_topic;
to = forward->to_topic;
- topic_lock_both(to, from);
- AST_VECTOR_REMOVE_ELEM_UNORDERED(&to->upstream_topics, from,
- AST_VECTOR_ELEM_CLEANUP_NOOP);
+ if (from && to) {
+ topic_lock_both(to, from);
+ AST_VECTOR_REMOVE_ELEM_UNORDERED(&to->upstream_topics, from,
+ AST_VECTOR_ELEM_CLEANUP_NOOP);
- for (idx = 0; idx < AST_VECTOR_SIZE(&to->subscribers); ++idx) {
- topic_remove_subscription(from, AST_VECTOR_GET(&to->subscribers, idx));
+ for (idx = 0; idx < AST_VECTOR_SIZE(&to->subscribers); ++idx) {
+ topic_remove_subscription(from, AST_VECTOR_GET(&to->subscribers, idx));
+ }
+ ao2_unlock(from);
+ ao2_unlock(to);
}
- ao2_unlock(from);
- ao2_unlock(to);
ao2_cleanup(forward);
@@ -717,6 +719,11 @@ struct stasis_forward *stasis_forward_all(struct stasis_topic *from_topic,
return NULL;
}
+ /* Forwards to ourselves are implicit. */
+ if (to_topic == from_topic) {
+ return ao2_bump(forward);
+ }
+
forward->from_topic = ao2_bump(from_topic);
forward->to_topic = ao2_bump(to_topic);
diff --git a/main/stasis_message.c b/main/stasis_message.c
index 1db2ae97a..6132efc20 100644
--- a/main/stasis_message.c
+++ b/main/stasis_message.c
@@ -187,3 +187,8 @@ struct ast_json *stasis_message_to_json(
{
return INVOKE_VIRTUAL(to_json, msg, sanitize);
}
+
+struct ast_event *stasis_message_to_event(struct stasis_message *msg)
+{
+ return INVOKE_VIRTUAL(to_event, msg);
+} \ No newline at end of file
diff --git a/res/res_corosync.c b/res/res_corosync.c
index ce4165498..6c4a3d1e7 100644
--- a/res/res_corosync.c
+++ b/res/res_corosync.c
@@ -44,20 +44,125 @@ ASTERISK_FILE_VERSION(__FILE__, "$Revision$");
#include "asterisk/event.h"
#include "asterisk/cli.h"
#include "asterisk/devicestate.h"
+#include "asterisk/app.h"
+#include "asterisk/stasis.h"
+#include "asterisk/stasis_message_router.h"
AST_RWLOCK_DEFINE_STATIC(event_types_lock);
+static void publish_mwi_to_stasis(struct ast_event *event);
+static void publish_device_state_to_stasis(struct ast_event *event);
+
+/*! \brief The internal topic used for message forwarding and pings */
+static struct stasis_topic *corosync_aggregate_topic;
+
+/*! \brief Our \ref stasis message router */
+static struct stasis_message_router *stasis_router;
+
+/*! \brief Internal accessor for our topic */
+static struct stasis_topic *corosync_topic(void)
+{
+ return corosync_aggregate_topic;
+}
+
+/*! \brief A payload wrapper around a corosync ping event */
+struct corosync_ping_payload {
+ /*! The corosync ping event being passed over \ref stasis */
+ struct ast_event *event;
+};
+
+/*! \brief Destructor for the \ref corosync_ping_payload wrapper object */
+static void corosync_ping_payload_dtor(void *obj)
+{
+ struct corosync_ping_payload *payload = obj;
+
+ ast_free(payload->event);
+}
+
+/*! \brief Convert a Corosync PING to a \ref ast_event */
+static struct ast_event *corosync_ping_to_event(struct stasis_message *message)
+{
+ struct corosync_ping_payload *payload;
+ struct ast_event *event;
+ struct ast_eid *event_eid;
+
+ if (!message) {
+ return NULL;
+ }
+
+ payload = stasis_message_data(message);
+
+ if (!payload->event) {
+ return NULL;
+ }
+
+ event_eid = (struct ast_eid *)ast_event_get_ie_raw(payload->event, AST_EVENT_IE_EID);
+
+ event = ast_event_new(AST_EVENT_PING,
+ AST_EVENT_IE_EID, AST_EVENT_IE_PLTYPE_RAW, event_eid, sizeof(*event_eid),
+ AST_EVENT_IE_END);
+
+ return event;
+}
+
+STASIS_MESSAGE_TYPE_DEFN_LOCAL(corosync_ping_message_type,
+ .to_event = corosync_ping_to_event, );
+
+/*! \brief Publish a Corosync ping to \ref stasis */
+static void publish_corosync_ping_to_stasis(struct ast_event *event)
+{
+ struct corosync_ping_payload *payload;
+ struct stasis_message *message;
+
+ ast_assert(ast_event_get_type(event) == AST_EVENT_PING);
+ ast_assert(event != NULL);
+
+ payload = ao2_t_alloc(sizeof(*payload), corosync_ping_payload_dtor, "Create ping payload");
+ if (!payload) {
+ return;
+ }
+ payload->event = event;
+
+ message = stasis_message_create(corosync_ping_message_type(), payload);
+ if (!message) {
+ ao2_t_ref(payload, -1, "Destroy payload on off nominal");
+ return;
+ }
+
+ stasis_publish(corosync_topic(), message);
+
+ ao2_t_ref(payload, -1, "Hand ref to stasis");
+ ao2_t_ref(message, -1, "Hand ref to stasis");
+}
+
static struct {
const char *name;
- struct ast_event_sub *sub;
+ struct stasis_forward *sub;
unsigned char publish;
unsigned char publish_default;
unsigned char subscribe;
unsigned char subscribe_default;
+ struct stasis_topic *(* topic_fn)(void);
+ struct stasis_cache *(* cache_fn)(void);
+ struct stasis_message_type *(* message_type_fn)(void);
+ void (* publish_to_stasis)(struct ast_event *);
} event_types[] = {
- [AST_EVENT_MWI] = { .name = "mwi", },
- [AST_EVENT_DEVICE_STATE_CHANGE] = { .name = "device_state", },
- [AST_EVENT_PING] = { .name = "ping", .publish_default = 1, .subscribe_default = 1 },
+ [AST_EVENT_MWI] = { .name = "mwi",
+ .topic_fn = ast_mwi_topic_all,
+ .cache_fn = ast_mwi_state_cache,
+ .message_type_fn = ast_mwi_state_type,
+ .publish_to_stasis = publish_mwi_to_stasis, },
+ [AST_EVENT_DEVICE_STATE_CHANGE] = { .name = "device_state",
+ .topic_fn = ast_device_state_topic_all,
+ .cache_fn = ast_device_state_cache,
+ .message_type_fn = ast_device_state_message_type,
+ .publish_to_stasis = publish_device_state_to_stasis, },
+ [AST_EVENT_PING] = { .name = "ping",
+ .publish_default = 1,
+ .subscribe_default = 1,
+ .topic_fn = corosync_topic,
+ .message_type_fn = corosync_ping_message_type,
+ .publish_to_stasis = publish_corosync_ping_to_stasis, },
};
static struct {
@@ -88,6 +193,71 @@ static corosync_cfg_callbacks_t cfg_callbacks = {
.corosync_cfg_shutdown_callback = cfg_shutdown_cb,
};
+/*! \brief Publish a received MWI \ref ast_event to \ref stasis */
+static void publish_mwi_to_stasis(struct ast_event *event)
+{
+ const char *mailbox;
+ const char *context;
+ unsigned int new_msgs;
+ unsigned int old_msgs;
+ struct ast_eid *event_eid;
+
+ ast_assert(ast_event_get_type(event) == AST_EVENT_MWI);
+
+ mailbox = ast_event_get_ie_str(event, AST_EVENT_IE_MAILBOX);
+ context = ast_event_get_ie_str(event, AST_EVENT_IE_CONTEXT);
+ new_msgs = ast_event_get_ie_uint(event, AST_EVENT_IE_NEWMSGS);
+ old_msgs = ast_event_get_ie_uint(event, AST_EVENT_IE_OLDMSGS);
+ event_eid = (struct ast_eid *)ast_event_get_ie_raw(event, AST_EVENT_IE_EID);
+
+ if (ast_strlen_zero(mailbox) || ast_strlen_zero(context)) {
+ return;
+ }
+
+ if (new_msgs > INT_MAX) {
+ new_msgs = INT_MAX;
+ }
+
+ if (old_msgs > INT_MAX) {
+ old_msgs = INT_MAX;
+ }
+
+ if (ast_publish_mwi_state_full(mailbox, context, (int)new_msgs,
+ (int)old_msgs, NULL, event_eid)) {
+ char eid[16];
+ ast_eid_to_str(eid, sizeof(eid), event_eid);
+ ast_log(LOG_WARNING, "Failed to publish MWI message for %s@%s from %s\n",
+ mailbox, context, eid);
+ }
+}
+
+/*! \brief Publish a received device state \ref ast_event to \ref stasis */
+static void publish_device_state_to_stasis(struct ast_event *event)
+{
+ const char *device;
+ enum ast_device_state state;
+ unsigned int cachable;
+ struct ast_eid *event_eid;
+
+ ast_assert(ast_event_get_type(event) == AST_EVENT_DEVICE_STATE_CHANGE);
+
+ device = ast_event_get_ie_str(event, AST_EVENT_IE_DEVICE);
+ state = ast_event_get_ie_uint(event, AST_EVENT_IE_STATE);
+ cachable = ast_event_get_ie_uint(event, AST_EVENT_IE_CACHABLE);
+ event_eid = (struct ast_eid *)ast_event_get_ie_raw(event, AST_EVENT_IE_EID);
+
+ if (ast_strlen_zero(device)) {
+ return;
+ }
+
+ if (ast_publish_device_state_full(device, state, cachable, event_eid)) {
+ char eid[16];
+ ast_eid_to_str(eid, sizeof(eid), event_eid);
+ ast_log(LOG_WARNING, "Failed to publish device state message for %s from %s\n",
+ device, eid);
+ }
+}
+
static void cpg_deliver_cb(cpg_handle_t handle, const struct cpg_name *group_name,
uint32_t nodeid, uint32_t pid, void *msg, size_t msg_len);
@@ -101,8 +271,6 @@ static cpg_callbacks_t cpg_callbacks = {
.cpg_confchg_fn = cpg_confchg_cb,
};
-static void ast_event_cb(const struct ast_event *event, void *data);
-
#ifdef HAVE_COROSYNC_CFG_STATE_TRACK
static void cfg_state_track_cb(
corosync_cfg_state_notification_buffer_t *notification_buffer,
@@ -120,6 +288,8 @@ static void cpg_deliver_cb(cpg_handle_t handle, const struct cpg_name *group_nam
uint32_t nodeid, uint32_t pid, void *msg, size_t msg_len)
{
struct ast_event *event;
+ void (*publish_handler)(struct ast_event *) = NULL;
+ enum ast_event_type event_type;
if (msg_len < ast_event_minimum_length()) {
ast_debug(1, "Ignoring event that's too small. %u < %u\n",
@@ -133,9 +303,17 @@ static void cpg_deliver_cb(cpg_handle_t handle, const struct cpg_name *group_nam
return;
}
+ event_type = ast_event_get_type(msg);
+ if (event_type > AST_EVENT_TOTAL) {
+ /* Egads, we don't support this */
+ return;
+ }
+
ast_rwlock_rdlock(&event_types_lock);
- if (!event_types[ast_event_get_type(msg)].subscribe) {
- /* We are not configured to subscribe to these events. */
+ publish_handler = event_types[event_type].publish_to_stasis;
+ if (!event_types[event_type].subscribe || !publish_handler) {
+ /* We are not configured to subscribe to these events or
+ we have no way to publish it internally. */
ast_rwlock_unlock(&event_types_lock);
return;
}
@@ -147,20 +325,80 @@ static void cpg_deliver_cb(cpg_handle_t handle, const struct cpg_name *group_nam
memcpy(event, msg, msg_len);
+ if (event_type == AST_EVENT_PING) {
+ const struct ast_eid *eid;
+ char buf[128] = "";
+
+ eid = ast_event_get_ie_raw(event, AST_EVENT_IE_EID);
+ ast_eid_to_str(buf, sizeof(buf), (struct ast_eid *) eid);
+ ast_log(LOG_NOTICE, "Got event PING from server with EID: '%s'\n", buf);
+ }
+ ast_debug(5, "Publishing event %s (%d) to stasis\n",
+ ast_event_get_type_name(event), event_type);
+ publish_handler(event);
+}
+
+static void publish_to_corosync(struct stasis_message *message)
+{
+ cs_error_t cs_err;
+ struct iovec iov;
+ struct ast_event *event;
+
+ event = stasis_message_to_event(message);
+ if (!event) {
+ return;
+ }
+
+ if (ast_eid_cmp(&ast_eid_default, ast_event_get_ie_raw(event, AST_EVENT_IE_EID))) {
+ /* If the event didn't originate from this server, don't send it back out. */
+ ast_event_destroy(event);
+ return;
+ }
+
if (ast_event_get_type(event) == AST_EVENT_PING) {
const struct ast_eid *eid;
char buf[128] = "";
eid = ast_event_get_ie_raw(event, AST_EVENT_IE_EID);
ast_eid_to_str(buf, sizeof(buf), (struct ast_eid *) eid);
- ast_log(LOG_NOTICE, "(cpg_deliver_cb) Got event PING from server with EID: '%s'\n", buf);
+ ast_log(LOG_NOTICE, "Sending event PING from this server with EID: '%s'\n", buf);
+ }
+
+ iov.iov_base = (void *)event;
+ iov.iov_len = ast_event_get_size(event);
+
+ ast_debug(5, "Publishing event %s (%d) to corosync\n",
+ ast_event_get_type_name(event), ast_event_get_type(event));
- ast_event_queue(event);
- } else {
- ast_event_queue_and_cache(event);
+ /* The stasis subscription will only exist if we are configured to publish
+ * these events, so just send away. */
+ if ((cs_err = cpg_mcast_joined(cpg_handle, CPG_TYPE_FIFO, &iov, 1)) != CS_OK) {
+ ast_log(LOG_WARNING, "CPG mcast failed (%d)\n", cs_err);
}
}
+static void stasis_message_cb(void *data, struct stasis_subscription *sub, struct stasis_message *message)
+{
+ if (!message) {
+ return;
+ }
+
+ publish_to_corosync(message);
+}
+
+static int dump_cache_cb(void *obj, void *arg, int flags)
+{
+ struct stasis_message *message = obj;
+
+ if (!message) {
+ return 0;
+ }
+
+ publish_to_corosync(message);
+
+ return 0;
+}
+
static void cpg_confchg_cb(cpg_handle_t handle, const struct cpg_name *group_name,
const struct cpg_address *member_list, size_t member_list_entries,
const struct cpg_address *left_list, size_t left_list_entries,
@@ -176,20 +414,27 @@ static void cpg_confchg_cb(cpg_handle_t handle, const struct cpg_name *group_nam
}
for (i = 0; i < ARRAY_LEN(event_types); i++) {
- struct ast_event_sub *event_sub;
+ struct ao2_container *messages;
ast_rwlock_rdlock(&event_types_lock);
if (!event_types[i].publish) {
ast_rwlock_unlock(&event_types_lock);
continue;
}
+
+ if (!event_types[i].cache_fn || !event_types[i].message_type_fn) {
+ ast_rwlock_unlock(&event_types_lock);
+ continue;
+ }
+
+ messages = stasis_cache_dump_by_eid(event_types[i].cache_fn(),
+ event_types[i].message_type_fn(),
+ &ast_eid_default);
ast_rwlock_unlock(&event_types_lock);
- event_sub = ast_event_subscribe_new(i, ast_event_cb, NULL);
- ast_event_sub_append_ie_raw(event_sub, AST_EVENT_IE_EID,
- &ast_eid_default, sizeof(ast_eid_default));
- ast_event_dump_cache(event_sub);
- ast_event_sub_destroy(event_sub);
+ ao2_callback(messages, OBJ_NODATA, dump_cache_cb, NULL);
+
+ ao2_t_ref(messages, -1, "Dispose of dumped cache");
}
}
@@ -231,13 +476,13 @@ static void *dispatch_thread_handler(void *data)
if (pfd[0].revents & POLLIN) {
if ((cs_err = cpg_dispatch(cpg_handle, CS_DISPATCH_ALL)) != CS_OK) {
- ast_log(LOG_WARNING, "Failed CPG dispatch: %u\n", cs_err);
+ ast_log(LOG_WARNING, "Failed CPG dispatch: %d\n", cs_err);
}
}
if (pfd[1].revents & POLLIN) {
if ((cs_err = corosync_cfg_dispatch(cfg_handle, CS_DISPATCH_ALL)) != CS_OK) {
- ast_log(LOG_WARNING, "Failed CFG dispatch: %u\n", cs_err);
+ ast_log(LOG_WARNING, "Failed CFG dispatch: %d\n", cs_err);
}
}
@@ -287,37 +532,6 @@ static void *dispatch_thread_handler(void *data)
return NULL;
}
-static void ast_event_cb(const struct ast_event *event, void *data)
-{
- cs_error_t cs_err;
- struct iovec iov = {
- .iov_base = (void *) event,
- .iov_len = ast_event_get_size(event),
- };
-
- if (ast_event_get_type(event) == AST_EVENT_PING) {
- const struct ast_eid *eid;
- char buf[128] = "";
-
- eid = ast_event_get_ie_raw(event, AST_EVENT_IE_EID);
- ast_eid_to_str(buf, sizeof(buf), (struct ast_eid *) eid);
- ast_log(LOG_NOTICE, "(ast_event_cb) Got event PING from server with EID: '%s'\n", buf);
- }
-
- if (ast_eid_cmp(&ast_eid_default,
- ast_event_get_ie_raw(event, AST_EVENT_IE_EID))) {
- /* If the event didn't originate from this server, don't send it back out. */
- return;
- }
-
- /* The ast_event subscription will only exist if we are configured to publish
- * these events, so just send away. */
-
- if ((cs_err = cpg_mcast_joined(cpg_handle, CPG_TYPE_FIFO, &iov, 1)) != CS_OK) {
- ast_log(LOG_WARNING, "CPG mcast failed (%u)\n", cs_err);
- }
-}
-
static char *corosync_show_members(struct ast_cli_entry *e, int cmd, struct ast_cli_args *a)
{
cs_error_t cs_err;
@@ -368,7 +582,7 @@ static char *corosync_show_members(struct ast_cli_entry *e, int cmd, struct ast_
continue;
}
- ast_cli(a->fd, "=== Node %u\n", i);
+ ast_cli(a->fd, "=== Node %d\n", i);
ast_cli(a->fd, "=== --> Group: %s\n", cpg_desc.group.value);
for (j = 0; j < num_addrs; j++) {
@@ -378,7 +592,7 @@ static char *corosync_show_members(struct ast_cli_entry *e, int cmd, struct ast_
getnameinfo(sa, sa_len, buf, sizeof(buf), NULL, 0, NI_NUMERICHOST);
- ast_cli(a->fd, "=== --> Address %u: %s\n", j + 1, buf);
+ ast_cli(a->fd, "=== --> Address %d: %s\n", j + 1, buf);
}
}
@@ -421,7 +635,9 @@ static char *corosync_ping(struct ast_cli_entry *e, int cmd, struct ast_cli_args
return CLI_FAILURE;
}
- ast_event_queue(event);
+ ast_rwlock_rdlock(&event_types_lock);
+ event_types[AST_EVENT_PING].publish_to_stasis(event);
+ ast_rwlock_unlock(&event_types_lock);
return CLI_SUCCESS;
}
@@ -532,11 +748,16 @@ static int load_general_config(struct ast_config *cfg)
for (i = 0; i < ARRAY_LEN(event_types); i++) {
if (event_types[i].publish && !event_types[i].sub) {
- event_types[i].sub = ast_event_subscribe(i,
- ast_event_cb, "Corosync", NULL,
- AST_EVENT_IE_END);
+ event_types[i].sub = stasis_forward_all(event_types[i].topic_fn(),
+ corosync_topic());
+ stasis_message_router_add(stasis_router,
+ event_types[i].message_type_fn(),
+ stasis_message_cb,
+ NULL);
} else if (!event_types[i].publish && event_types[i].sub) {
- event_types[i].sub = ast_event_unsubscribe(event_types[i].sub);
+ event_types[i].sub = stasis_forward_cancel(event_types[i].sub);
+ stasis_message_router_remove(stasis_router,
+ event_types[i].message_type_fn());
}
}
@@ -577,14 +798,32 @@ static void cleanup_module(void)
cs_error_t cs_err;
unsigned int i;
- for (i = 0; i < ARRAY_LEN(event_types); i++) {
- if (event_types[i].sub) {
- event_types[i].sub = ast_event_unsubscribe(event_types[i].sub);
+ if (stasis_router) {
+
+ /* Unsubscribe all topic forwards and cancel all message routes */
+ ast_rwlock_wrlock(&event_types_lock);
+ for (i = 0; i < ARRAY_LEN(event_types); i++) {
+ if (event_types[i].sub) {
+ event_types[i].sub = stasis_forward_cancel(event_types[i].sub);
+ stasis_message_router_remove(stasis_router,
+ event_types[i].message_type_fn());
+ }
+ event_types[i].publish = 0;
+ event_types[i].subscribe = 0;
}
- event_types[i].publish = 0;
- event_types[i].subscribe = 0;
+ ast_rwlock_unlock(&event_types_lock);
+
+ stasis_message_router_unsubscribe_and_join(stasis_router);
+ stasis_router = NULL;
+ }
+
+ if (corosync_aggregate_topic) {
+ ao2_t_ref(corosync_aggregate_topic, -1, "Dispose of topic on cleanup");
+ corosync_aggregate_topic = NULL;
}
+ STASIS_MESSAGE_TYPE_CLEANUP(corosync_ping_message_type);
+
if (dispatch_thread.id != AST_PTHREADT_NULL) {
char meepmeep = 'x';
dispatch_thread.stop = 1;
@@ -623,13 +862,30 @@ static int load_module(void)
enum ast_module_load_result res = AST_MODULE_LOAD_FAILURE;
struct cpg_name name;
+ corosync_aggregate_topic = stasis_topic_create("corosync_aggregate_topic");
+ if (!corosync_aggregate_topic) {
+ ast_log(AST_LOG_ERROR, "Failed to create stasis topic for corosync\n");
+ goto failed;
+ }
+
+ stasis_router = stasis_message_router_create(corosync_aggregate_topic);
+ if (!stasis_router) {
+ ast_log(AST_LOG_ERROR, "Failed to create message router for corosync topic\n");
+ goto failed;
+ }
+
+ if (STASIS_MESSAGE_TYPE_INIT(corosync_ping_message_type) != 0) {
+ ast_log(AST_LOG_ERROR, "Failed to initialize corosync ping message type\n");
+ goto failed;
+ }
+
if ((cs_err = corosync_cfg_initialize(&cfg_handle, &cfg_callbacks)) != CS_OK) {
- ast_log(LOG_ERROR, "Failed to initialize cfg (%d)\n", (int) cs_err);
- return AST_MODULE_LOAD_DECLINE;
+ ast_log(LOG_ERROR, "Failed to initialize cfg: (%d)\n", (int) cs_err);
+ goto failed;
}
if ((cs_err = cpg_initialize(&cpg_handle, &cpg_callbacks)) != CS_OK) {
- ast_log(LOG_ERROR, "Failed to initialize cpg (%d)\n", (int) cs_err);
+ ast_log(LOG_ERROR, "Failed to initialize cpg: (%d)\n", (int) cs_err);
goto failed;
}
@@ -637,7 +893,7 @@ static int load_module(void)
name.length = strlen(name.value);
if ((cs_err = cpg_join(cpg_handle, &name)) != CS_OK) {
- ast_log(LOG_ERROR, "Failed to join (%d)\n", (int) cs_err);
+ ast_log(LOG_ERROR, "Failed to join: (%d)\n", (int) cs_err);
goto failed;
}