summaryrefslogtreecommitdiff
path: root/main
diff options
context:
space:
mode:
Diffstat (limited to 'main')
-rw-r--r--main/asterisk.c6
-rw-r--r--main/asterisk.exports.in1
-rw-r--r--main/channel.c287
-rw-r--r--main/channel_internal_api.c21
-rw-r--r--main/manager.c200
-rw-r--r--main/pbx.c13
-rw-r--r--main/stasis.c514
-rw-r--r--main/stasis_cache.c443
-rw-r--r--main/stasis_message.c135
9 files changed, 1506 insertions, 114 deletions
diff --git a/main/asterisk.c b/main/asterisk.c
index 1d5371938..4e5e58c05 100644
--- a/main/asterisk.c
+++ b/main/asterisk.c
@@ -240,6 +240,7 @@ int daemon(int, int); /* defined in libresolv of all places */
#include "asterisk/aoc.h"
#include "asterisk/uuid.h"
#include "asterisk/sorcery.h"
+#include "asterisk/stasis.h"
#include "../defaults.h"
@@ -4120,6 +4121,11 @@ int main(int argc, char *argv[])
exit(1);
}
+ if (stasis_init()) {
+ printf("Stasis initialization failed.\n%s", term_quit());
+ exit(1);
+ }
+
ast_makesocket();
sigemptyset(&sigs);
sigaddset(&sigs, SIGHUP);
diff --git a/main/asterisk.exports.in b/main/asterisk.exports.in
index 49d3a44a4..3157b8319 100644
--- a/main/asterisk.exports.in
+++ b/main/asterisk.exports.in
@@ -32,6 +32,7 @@
LINKER_SYMBOL_PREFIXdialed_interface_info;
LINKER_SYMBOL_PREFIXstrsep;
LINKER_SYMBOL_PREFIXsetenv;
+ LINKER_SYMBOL_PREFIXstasis_*;
LINKER_SYMBOL_PREFIXunsetenv;
LINKER_SYMBOL_PREFIXstrcasestr;
LINKER_SYMBOL_PREFIXstrnlen;
diff --git a/main/channel.c b/main/channel.c
index 9c8f32cb5..df0a67b3b 100644
--- a/main/channel.c
+++ b/main/channel.c
@@ -152,6 +152,15 @@ static AST_RWLIST_HEAD_STATIC(backends, chanlist);
/*! \brief All active channels on the system */
static struct ao2_container *channels;
+/*! \brief Message type for channel snapshot events */
+static struct stasis_message_type *__channel_snapshot;
+
+static struct stasis_message_type *__channel_varset;
+
+struct stasis_topic *__channel_topic_all;
+
+struct stasis_caching_topic *__channel_topic_all_cached;
+
/*! \brief map AST_CAUSE's to readable string representations
*
* \ref causes.h
@@ -214,6 +223,77 @@ static const struct causes_map causes[] = {
{ AST_CAUSE_INTERWORKING, "INTERWORKING", "Interworking, unspecified" },
};
+static void publish_channel_state(struct ast_channel *chan)
+{
+ RAII_VAR(struct ast_channel_snapshot *, snapshot, NULL, ao2_cleanup);
+ RAII_VAR(struct stasis_message *, message, NULL, ao2_cleanup);
+
+ snapshot = ast_channel_snapshot_create(chan);
+ if (!snapshot) {
+ ast_log(LOG_ERROR, "Allocation error\n");
+ return;
+ }
+
+ message = stasis_message_create(ast_channel_snapshot(), snapshot);
+ if (!message) {
+ return;
+ }
+
+ ast_assert(ast_channel_topic(chan) != NULL);
+ stasis_publish(ast_channel_topic(chan), message);
+}
+
+static void channel_varset_dtor(void *obj)
+{
+ struct ast_channel_varset *event = obj;
+ ao2_cleanup(event->snapshot);
+ ast_free(event->variable);
+ ast_free(event->value);
+}
+
+void ast_channel_publish_varset(struct ast_channel *chan, const char *name, const char *value)
+{
+ RAII_VAR(struct ast_channel_varset *, event, NULL, ao2_cleanup);
+ RAII_VAR(struct stasis_message *, msg, NULL, ao2_cleanup);
+
+ event = ao2_alloc(sizeof(*event), channel_varset_dtor);
+ if (!event) {
+ return;
+ }
+
+ if (chan) {
+ event->snapshot = ast_channel_snapshot_create(chan);
+ if (event->snapshot == NULL) {
+ return;
+ }
+ }
+ event->variable = ast_strdup(name);
+ event->value = ast_strdup(value);
+ if (event->variable == NULL || event->value == NULL) {
+ return;
+ }
+
+ msg = stasis_message_create(ast_channel_varset(), event);
+ if (!msg) {
+ return;
+ }
+
+ if (chan) {
+ stasis_publish(ast_channel_topic(chan), msg);
+ } else {
+ stasis_publish(ast_channel_topic_all(), msg);
+ }
+}
+
+
+static void publish_cache_clear(struct ast_channel *chan)
+{
+ RAII_VAR(struct stasis_message *, message, NULL, ao2_cleanup);
+
+ message = stasis_cache_clear_create(ast_channel_snapshot(), ast_channel_uniqueid(chan));
+ stasis_publish(ast_channel_topic(chan), message);
+}
+
struct ast_variable *ast_channeltype_list(void)
{
struct chanlist *cl;
@@ -1073,6 +1153,8 @@ __ast_channel_alloc_ap(int needqueue, int state, const char *cid_num, const char
ast_channel_linkedid_set(tmp, ast_channel_uniqueid(tmp));
}
+ ast_channel_internal_setup_topics(tmp);
+
if (!ast_strlen_zero(name_fmt)) {
char *slash, *slash2;
/* Almost every channel is calling this function, and setting the name via the ast_string_field_build() call.
@@ -1145,34 +1227,7 @@ __ast_channel_alloc_ap(int needqueue, int state, const char *cid_num, const char
* a lot of data into this func to do it here!
*/
if (ast_get_channel_tech(tech) || (tech2 && ast_get_channel_tech(tech2))) {
- /*** DOCUMENTATION
- <managerEventInstance>
- <synopsis>Raised when a new channel is created.</synopsis>
- <syntax>
- <xi:include xpointer="xpointer(/docs/managerEvent[@name='Newstate']/managerEventInstance/syntax/parameter[@name='ChannelState'])" />
- <xi:include xpointer="xpointer(/docs/managerEvent[@name='Newstate']/managerEventInstance/syntax/parameter[@name='ChannelStateDesc'])" />
- </syntax>
- </managerEventInstance>
- ***/
- ast_manager_event(tmp, EVENT_FLAG_CALL, "Newchannel",
- "Channel: %s\r\n"
- "ChannelState: %d\r\n"
- "ChannelStateDesc: %s\r\n"
- "CallerIDNum: %s\r\n"
- "CallerIDName: %s\r\n"
- "AccountCode: %s\r\n"
- "Exten: %s\r\n"
- "Context: %s\r\n"
- "Uniqueid: %s\r\n",
- ast_channel_name(tmp),
- state,
- ast_state2str(state),
- S_OR(cid_num, ""),
- S_OR(cid_name, ""),
- ast_channel_accountcode(tmp),
- S_OR(exten, ""),
- S_OR(context, ""),
- ast_channel_uniqueid(tmp));
+ publish_channel_state(tmp);
}
ast_channel_internal_finalize(tmp);
@@ -2893,39 +2948,9 @@ int ast_hangup(struct ast_channel *chan)
ast_channel_unlock(chan);
ast_cc_offer(chan);
- /*** DOCUMENTATION
- <managerEventInstance>
- <synopsis>Raised when a channel is hung up.</synopsis>
- <syntax>
- <parameter name="Cause">
- <para>A numeric cause code for why the channel was hung up.</para>
- </parameter>
- <parameter name="Cause-txt">
- <para>A description of why the channel was hung up.</para>
- </parameter>
- </syntax>
- </managerEventInstance>
- ***/
- ast_manager_event(chan, EVENT_FLAG_CALL, "Hangup",
- "Channel: %s\r\n"
- "Uniqueid: %s\r\n"
- "CallerIDNum: %s\r\n"
- "CallerIDName: %s\r\n"
- "ConnectedLineNum: %s\r\n"
- "ConnectedLineName: %s\r\n"
- "AccountCode: %s\r\n"
- "Cause: %d\r\n"
- "Cause-txt: %s\r\n",
- ast_channel_name(chan),
- ast_channel_uniqueid(chan),
- S_COR(ast_channel_caller(chan)->id.number.valid, ast_channel_caller(chan)->id.number.str, "<unknown>"),
- S_COR(ast_channel_caller(chan)->id.name.valid, ast_channel_caller(chan)->id.name.str, "<unknown>"),
- S_COR(ast_channel_connected(chan)->id.number.valid, ast_channel_connected(chan)->id.number.str, "<unknown>"),
- S_COR(ast_channel_connected(chan)->id.name.valid, ast_channel_connected(chan)->id.name.str, "<unknown>"),
- ast_channel_accountcode(chan),
- ast_channel_hangupcause(chan),
- ast_cause2str(ast_channel_hangupcause(chan))
- );
+
+ publish_channel_state(chan);
+ publish_cache_clear(chan);
if (ast_channel_cdr(chan) && !ast_test_flag(ast_channel_cdr(chan), AST_CDR_FLAG_BRIDGED) &&
!ast_test_flag(ast_channel_cdr(chan), AST_CDR_FLAG_POST_DISABLED) &&
@@ -7435,47 +7460,7 @@ int ast_setstate(struct ast_channel *chan, enum ast_channel_state state)
* we override what they are saying the state is and things go amuck. */
ast_devstate_changed_literal(AST_DEVICE_UNKNOWN, (ast_test_flag(ast_channel_flags(chan), AST_FLAG_DISABLE_DEVSTATE_CACHE) ? AST_DEVSTATE_NOT_CACHABLE : AST_DEVSTATE_CACHABLE), name);
- /* setstate used to conditionally report Newchannel; this is no more */
- /*** DOCUMENTATION
- <managerEventInstance>
- <synopsis>Raised when a channel's state changes.</synopsis>
- <syntax>
- <parameter name="ChannelState">
- <para>A numeric code for the channel's current state, related to ChannelStateDesc</para>
- </parameter>
- <parameter name="ChannelStateDesc">
- <enumlist>
- <enum name="Down"/>
- <enum name="Rsrvd"/>
- <enum name="OffHook"/>
- <enum name="Dialing"/>
- <enum name="Ring"/>
- <enum name="Ringing"/>
- <enum name="Up"/>
- <enum name="Busy"/>
- <enum name="Dialing Offhook"/>
- <enum name="Pre-ring"/>
- <enum name="Unknown"/>
- </enumlist>
- </parameter>
- </syntax>
- </managerEventInstance>
- ***/
- ast_manager_event(chan, EVENT_FLAG_CALL, "Newstate",
- "Channel: %s\r\n"
- "ChannelState: %d\r\n"
- "ChannelStateDesc: %s\r\n"
- "CallerIDNum: %s\r\n"
- "CallerIDName: %s\r\n"
- "ConnectedLineNum: %s\r\n"
- "ConnectedLineName: %s\r\n"
- "Uniqueid: %s\r\n",
- ast_channel_name(chan), ast_channel_state(chan), ast_state2str(ast_channel_state(chan)),
- S_COR(ast_channel_caller(chan)->id.number.valid, ast_channel_caller(chan)->id.number.str, ""),
- S_COR(ast_channel_caller(chan)->id.name.valid, ast_channel_caller(chan)->id.name.str, ""),
- S_COR(ast_channel_connected(chan)->id.number.valid, ast_channel_connected(chan)->id.number.str, ""),
- S_COR(ast_channel_connected(chan)->id.name.valid, ast_channel_connected(chan)->id.name.str, ""),
- ast_channel_uniqueid(chan));
+ publish_channel_state(chan);
return 0;
}
@@ -8644,6 +8629,14 @@ static void prnt_channel_key(void *v_obj, void *where, ao2_prnt_fn *prnt)
static void channels_shutdown(void)
{
+ ao2_cleanup(__channel_snapshot);
+ __channel_snapshot = NULL;
+ ao2_cleanup(__channel_varset);
+ __channel_varset = NULL;
+ ao2_cleanup(__channel_topic_all);
+ __channel_topic_all = NULL;
+ stasis_caching_unsubscribe(__channel_topic_all_cached);
+ __channel_topic_all_cached = NULL;
ast_data_unregister(NULL);
ast_cli_unregister_multiple(cli_channel, ARRAY_LEN(cli_channel));
if (channels) {
@@ -8653,6 +8646,16 @@ static void channels_shutdown(void)
}
}
+static const char *channel_snapshot_get_id(struct stasis_message *message)
+{
+ struct ast_channel_snapshot *snapshot;
+ if (ast_channel_snapshot() != stasis_message_type(message)) {
+ return NULL;
+ }
+ snapshot = stasis_message_data(message);
+ return snapshot->uniqueid;
+}
+
void ast_channels_init(void)
{
channels = ao2_container_alloc(NUM_CHANNEL_BUCKETS,
@@ -8661,6 +8664,12 @@ void ast_channels_init(void)
ao2_container_register("channels", channels, prnt_channel_key);
}
+ __channel_snapshot = stasis_message_type_create("ast_channel_snapshot");
+ __channel_varset = stasis_message_type_create("ast_channel_varset");
+
+ __channel_topic_all = stasis_topic_create("ast_channel_topic_all");
+ __channel_topic_all_cached = stasis_caching_topic_create(__channel_topic_all, channel_snapshot_get_id);
+
ast_cli_register_multiple(cli_channel, ARRAY_LEN(cli_channel));
ast_data_register_multiple_core(channel_providers, ARRAY_LEN(channel_providers));
@@ -8668,6 +8677,7 @@ void ast_channels_init(void)
ast_plc_reload();
ast_register_atexit(channels_shutdown);
+
}
/*! \brief Print call group and pickup group ---*/
@@ -11241,6 +11251,79 @@ int ast_channel_get_cc_agent_type(struct ast_channel *chan, char *agent_type, si
return 0;
}
+static void ast_channel_snapshot_dtor(void *obj)
+{
+ struct ast_channel_snapshot *snapshot = obj;
+ ast_string_field_free_memory(snapshot);
+}
+
+struct ast_channel_snapshot *ast_channel_snapshot_create(struct ast_channel *chan)
+{
+ RAII_VAR(struct ast_channel_snapshot *, snapshot, NULL, ao2_cleanup);
+
+ snapshot = ao2_alloc(sizeof(*snapshot), ast_channel_snapshot_dtor);
+ if (ast_string_field_init(snapshot, 1024)) {
+ return NULL;
+ }
+
+ ast_string_field_set(snapshot, name, ast_channel_name(chan));
+ ast_string_field_set(snapshot, accountcode, ast_channel_accountcode(chan));
+ ast_string_field_set(snapshot, peeraccount, ast_channel_peeraccount(chan));
+ ast_string_field_set(snapshot, userfield, ast_channel_userfield(chan));
+ ast_string_field_set(snapshot, uniqueid, ast_channel_uniqueid(chan));
+ ast_string_field_set(snapshot, linkedid, ast_channel_linkedid(chan));
+ ast_string_field_set(snapshot, parkinglot, ast_channel_parkinglot(chan));
+ ast_string_field_set(snapshot, hangupsource, ast_channel_hangupsource(chan));
+ if (ast_channel_appl(chan)) {
+ ast_string_field_set(snapshot, appl, ast_channel_appl(chan));
+ }
+ if (ast_channel_data(chan)) {
+ ast_string_field_set(snapshot, data, ast_channel_data(chan));
+ }
+ ast_string_field_set(snapshot, context, ast_channel_context(chan));
+ ast_string_field_set(snapshot, exten, ast_channel_exten(chan));
+
+ ast_string_field_set(snapshot, caller_name,
+ S_COR(ast_channel_caller(chan)->id.name.valid, ast_channel_caller(chan)->id.name.str, ""));
+ ast_string_field_set(snapshot, caller_number,
+ S_COR(ast_channel_caller(chan)->id.number.valid, ast_channel_caller(chan)->id.number.str, ""));
+
+ ast_string_field_set(snapshot, connected_name,
+ S_COR(ast_channel_connected(chan)->id.name.valid, ast_channel_connected(chan)->id.name.str, ""));
+ ast_string_field_set(snapshot, connected_number,
+ S_COR(ast_channel_connected(chan)->id.number.valid, ast_channel_connected(chan)->id.number.str, ""));
+
+ snapshot->creationtime = ast_channel_creationtime(chan);
+ snapshot->state = ast_channel_state(chan);
+ snapshot->priority = ast_channel_priority(chan);
+ snapshot->amaflags = ast_channel_amaflags(chan);
+ snapshot->hangupcause = ast_channel_hangupcause(chan);
+ snapshot->flags = *ast_channel_flags(chan);
+
+ ao2_ref(snapshot, +1);
+ return snapshot;
+}
+
+struct stasis_message_type *ast_channel_varset(void)
+{
+ return __channel_varset;
+}
+
+struct stasis_message_type *ast_channel_snapshot(void)
+{
+ return __channel_snapshot;
+}
+
+struct stasis_topic *ast_channel_topic_all(void)
+{
+ return __channel_topic_all;
+}
+
+struct stasis_caching_topic *ast_channel_topic_all_cached(void)
+{
+ return __channel_topic_all_cached;
+}
+
/* DO NOT PUT ADDITIONAL FUNCTIONS BELOW THIS BOUNDARY
*
* ONLY FUNCTIONS FOR PROVIDING BACKWARDS ABI COMPATIBILITY BELONG HERE
diff --git a/main/channel_internal_api.c b/main/channel_internal_api.c
index 3f892ddef..8cc2e6c62 100644
--- a/main/channel_internal_api.c
+++ b/main/channel_internal_api.c
@@ -195,6 +195,8 @@ struct ast_channel {
char dtmf_digit_to_emulate; /*!< Digit being emulated */
char sending_dtmf_digit; /*!< Digit this channel is currently sending out. (zero if not sending) */
struct timeval sending_dtmf_tv; /*!< The time this channel started sending the current digit. (Invalid if sending_dtmf_digit is zero.) */
+ struct stasis_topic *topic; /*!< Topic for all channel's events */
+ struct stasis_subscription *forwarder; /*!< Subscription for event forwarding to all topic */
};
/* AST_DATA definitions, which will probably have to be re-thought since the channel will be opaque */
@@ -1364,6 +1366,12 @@ void ast_channel_internal_cleanup(struct ast_channel *chan)
}
ast_string_field_free_memory(chan);
+
+ stasis_unsubscribe(chan->forwarder);
+ chan->forwarder = NULL;
+
+ ao2_cleanup(chan->topic);
+ chan->topic = NULL;
}
void ast_channel_internal_finalize(struct ast_channel *chan)
@@ -1375,3 +1383,16 @@ int ast_channel_internal_is_finalized(struct ast_channel *chan)
{
return chan->finalized;
}
+
+struct stasis_topic *ast_channel_topic(struct ast_channel *chan)
+{
+ return chan->topic;
+}
+
+void ast_channel_internal_setup_topics(struct ast_channel *chan)
+{
+ ast_assert(chan->topic == NULL);
+ ast_assert(chan->forwarder == NULL);
+ chan->topic = stasis_topic_create(chan->uniqueid);
+ chan->forwarder = stasis_forward_all(chan->topic, ast_channel_topic_all());
+}
diff --git a/main/manager.c b/main/manager.c
index fc0ec2631..10a3a3397 100644
--- a/main/manager.c
+++ b/main/manager.c
@@ -91,6 +91,7 @@ ASTERISK_FILE_VERSION(__FILE__, "$Revision$")
#include "asterisk/aoc.h"
#include "asterisk/stringfields.h"
#include "asterisk/presencestate.h"
+#include "asterisk/stasis.h"
/*** DOCUMENTATION
<manager name="Ping" language="en_US">
@@ -963,6 +964,73 @@ ASTERISK_FILE_VERSION(__FILE__, "$Revision$")
manager.conf will be present upon starting a new session.</para>
</description>
</manager>
+ <managerEvent language="en_US" name="Newchannel">
+ <managerEventInstance class="EVENT_FLAG_CALL">
+ <synopsis>Raised when a new channel is created.</synopsis>
+ <syntax>
+ <parameter name="Channel">
+ </parameter>
+ <parameter name="ChannelState">
+ <para>A numeric code for the channel's current state, related to ChannelStateDesc</para>
+ </parameter>
+ <parameter name="ChannelStateDesc">
+ <enumlist>
+ <enum name="Down"/>
+ <enum name="Rsrvd"/>
+ <enum name="OffHook"/>
+ <enum name="Dialing"/>
+ <enum name="Ring"/>
+ <enum name="Ringing"/>
+ <enum name="Up"/>
+ <enum name="Busy"/>
+ <enum name="Dialing Offhook"/>
+ <enum name="Pre-ring"/>
+ <enum name="Unknown"/>
+ </enumlist>
+ </parameter>
+ <parameter name="CallerIDNum">
+ </parameter>
+ <parameter name="CallerIDName">
+ </parameter>
+ <parameter name="ConnectedLineNum">
+ </parameter>
+ <parameter name="ConnectedLineName">
+ </parameter>
+ <parameter name="AccountCode">
+ </parameter>
+ <parameter name="Context">
+ </parameter>
+ <parameter name="Exten">
+ </parameter>
+ <parameter name="Priority">
+ </parameter>
+ <parameter name="Uniqueid">
+ </parameter>
+ <parameter name="Cause">
+ <para>A numeric cause code for why the channel was hung up.</para>
+ </parameter>
+ <parameter name="Cause-txt">
+ <para>A description of why the channel was hung up.</para>
+ </parameter>
+ </syntax>
+ </managerEventInstance>
+ </managerEvent>
+ <managerEvent language="en_US" name="Newstate">
+ <managerEventInstance class="EVENT_FLAG_CALL">
+ <synopsis>Raised when a channel's state changes.</synopsis>
+ <syntax>
+ <xi:include xpointer="xpointer(/docs/managerEvent[@name='Newchannel']/managerEventInstance/syntax/parameter)" />
+ </syntax>
+ </managerEventInstance>
+ </managerEvent>
+ <managerEvent language="en_US" name="Hangup">
+ <managerEventInstance class="EVENT_FLAG_CALL">
+ <synopsis>Raised when a channel is hung up.</synopsis>
+ <syntax>
+ <xi:include xpointer="xpointer(/docs/managerEvent[@name='Newchannel']/managerEventInstance/syntax/parameter)" />
+ </syntax>
+ </managerEventInstance>
+ </managerEvent>
***/
/*! \addtogroup Group_AMI AMI functions
@@ -1060,6 +1128,8 @@ static const struct {
{{ "restart", "gracefully", NULL }},
};
+static struct stasis_subscription *channel_state_sub;
+
static void acl_change_event_cb(const struct ast_event *event, void *userdata);
static void acl_change_event_subscribe(void)
@@ -7376,6 +7446,127 @@ static void load_channelvars(struct ast_variable *var)
AST_RWLIST_UNLOCK(&channelvars);
}
+/*!
+ * \brief Generate the AMI message body from a channel snapshot
+ * \internal
+ *
+ * \param snapshot the channel snapshot for which to generate an AMI message body
+ *
+ * \retval NULL on error
+ * \retval ast_str* on success (must be ast_freed by caller)
+ */
+static struct ast_str *manager_build_channel_state_string(const struct ast_channel_snapshot *snapshot)
+{
+ struct ast_str *out = ast_str_create(1024);
+ int res = 0;
+ if (!out) {
+ return NULL;
+ }
+ res = ast_str_set(&out, 0,
+ "Channel: %s\r\n"
+ "ChannelState: %d\r\n"
+ "ChannelStateDesc: %s\r\n"
+ "CallerIDNum: %s\r\n"
+ "CallerIDName: %s\r\n"
+ "ConnectedLineNum: %s\r\n"
+ "ConnectedLineName: %s\r\n"
+ "AccountCode: %s\r\n"
+ "Context: %s\r\n"
+ "Exten: %s\r\n"
+ "Priority: %d\r\n"
+ "Uniqueid: %s\r\n"
+ "Cause: %d\r\n"
+ "Cause-txt: %s\r\n",
+ snapshot->name,
+ snapshot->state,
+ ast_state2str(snapshot->state),
+ snapshot->caller_number,
+ snapshot->caller_name,
+ snapshot->connected_number,
+ snapshot->connected_name,
+ snapshot->accountcode,
+ snapshot->context,
+ snapshot->exten,
+ snapshot->priority,
+ snapshot->uniqueid,
+ snapshot->hangupcause,
+ ast_cause2str(snapshot->hangupcause));
+
+ if (!res) {
+ return NULL;
+ }
+
+ return out;
+}
+
+static void channel_snapshot_update(struct ast_channel_snapshot *old_snapshot, struct ast_channel_snapshot *new_snapshot)
+{
+ int is_hungup;
+ char *manager_event = NULL;
+
+ if (!new_snapshot) {
+ /* Ignore cache clearing events; we'll see the hangup first */
+ return;
+ }
+
+ is_hungup = ast_test_flag(&new_snapshot->flags, AST_FLAG_ZOMBIE) ? 1 : 0;
+
+ if (!old_snapshot) {
+ manager_event = "Newchannel";
+ }
+
+ if (old_snapshot && old_snapshot->state != new_snapshot->state) {
+ manager_event = "Newstate";
+ }
+
+ if (old_snapshot && is_hungup) {
+ manager_event = "Hangup";
+ }
+
+ if (manager_event) {
+ RAII_VAR(struct ast_str *, channel_event_string, NULL, ast_free);
+
+ channel_event_string = manager_build_channel_state_string(new_snapshot);
+ if (channel_event_string) {
+ manager_event(EVENT_FLAG_CALL, manager_event, "%s", ast_str_buffer(channel_event_string));
+ }
+ }
+}
+
+static void channel_varset(const char *channel_name, const char *uniqueid, const char *name, const char *value)
+{
+ /*** DOCUMENTATION
+ <managerEventInstance>
+ <synopsis>Raised when a variable is set to a particular value.</synopsis>
+ </managerEventInstance>
+ ***/
+ manager_event(EVENT_FLAG_DIALPLAN, "VarSet",
+ "Channel: %s\r\n"
+ "Variable: %s\r\n"
+ "Value: %s\r\n"
+ "Uniqueid: %s\r\n",
+ channel_name, name, value, uniqueid);
+}
+
+static void channel_event_cb(void *data, struct stasis_subscription *sub, struct stasis_topic *topic, struct stasis_message *message)
+{
+ if (stasis_message_type(message) == stasis_cache_update()) {
+ struct stasis_cache_update *update = stasis_message_data(message);
+ if (ast_channel_snapshot() == update->type) {
+ struct ast_channel_snapshot *old_snapshot =
+ stasis_message_data(update->old_snapshot);
+ struct ast_channel_snapshot *new_snapshot =
+ stasis_message_data(update->new_snapshot);
+ channel_snapshot_update(old_snapshot, new_snapshot);
+ }
+ } else if (stasis_message_type(message) == ast_channel_varset()) {
+ struct ast_channel_varset *varset = stasis_message_data(message);
+ const char *name = varset->snapshot ? varset->snapshot->name : "none";
+ const char *uniqueid = varset->snapshot ? varset->snapshot->uniqueid : "none";
+ channel_varset(name, uniqueid, varset->variable, varset->value);
+ }
+}
+
/*! \internal \brief Free a user record. Should already be removed from the list */
static void manager_free_user(struct ast_manager_user *user)
{
@@ -7399,6 +7590,9 @@ static void manager_shutdown(void)
{
struct ast_manager_user *user;
+ stasis_unsubscribe(channel_state_sub);
+ channel_state_sub = NULL;
+
if (registered) {
ast_manager_unregister("Ping");
ast_manager_unregister("Events");
@@ -7490,6 +7684,12 @@ static int __init_manager(int reload, int by_external_config)
manager_enabled = 0;
+ if (!channel_state_sub) {
+ channel_state_sub = stasis_subscribe(
+ stasis_caching_get_topic(ast_channel_topic_all_cached()),
+ channel_event_cb, NULL);
+ }
+
if (!registered) {
/* Register default actions */
ast_manager_register_xml_core("Ping", 0, action_ping);
diff --git a/main/pbx.c b/main/pbx.c
index bf95ccbe2..82bbb5257 100644
--- a/main/pbx.c
+++ b/main/pbx.c
@@ -11453,18 +11453,7 @@ int pbx_builtin_setvar_helper(struct ast_channel *chan, const char *name, const
ast_verb(2, "Setting global variable '%s' to '%s'\n", name, value);
newvariable = ast_var_assign(name, value);
AST_LIST_INSERT_HEAD(headp, newvariable, entries);
- /*** DOCUMENTATION
- <managerEventInstance>
- <synopsis>Raised when a variable is set to a particular value.</synopsis>
- </managerEventInstance>
- ***/
- manager_event(EVENT_FLAG_DIALPLAN, "VarSet",
- "Channel: %s\r\n"
- "Variable: %s\r\n"
- "Value: %s\r\n"
- "Uniqueid: %s\r\n",
- chan ? ast_channel_name(chan) : "none", name, value,
- chan ? ast_channel_uniqueid(chan) : "none");
+ ast_channel_publish_varset(chan, name, value);
}
if (chan)
diff --git a/main/stasis.c b/main/stasis.c
new file mode 100644
index 000000000..f94736bf1
--- /dev/null
+++ b/main/stasis.c
@@ -0,0 +1,514 @@
+/*
+ * Asterisk -- An open source telephony toolkit.
+ *
+ * Copyright (C) 2013, Digium, Inc.
+ *
+ * David M. Lee, II <dlee@digium.com>
+ *
+ * See http://www.asterisk.org for more information about
+ * the Asterisk project. Please do not directly contact
+ * any of the maintainers of this project for assistance;
+ * the project provides a web site, mailing lists and IRC
+ * channels for your use.
+ *
+ * This program is free software, distributed under the terms of
+ * the GNU General Public License Version 2. See the LICENSE file
+ * at the top of the source tree.
+ */
+
+/*! \file
+ *
+ * \brief Stasis Message Bus API.
+ *
+ * \author David M. Lee, II <dlee@digium.com>
+ */
+
+/*** MODULEINFO
+ <support_level>core</support_level>
+ ***/
+
+#include "asterisk.h"
+
+ASTERISK_FILE_VERSION(__FILE__, "$Revision$")
+
+#include "asterisk/astobj2.h"
+#include "asterisk/stasis.h"
+#include "asterisk/threadpool.h"
+#include "asterisk/taskprocessor.h"
+#include "asterisk/utils.h"
+#include "asterisk/uuid.h"
+
+/*! Initial size of the subscribers list. */
+#define INITIAL_SUBSCRIBERS_MAX 4
+
+/*! Threadpool for dispatching notifications to subscribers */
+static struct ast_threadpool *pool;
+
+static struct stasis_message_type *__subscription_change_message_type;
+
+/*! \private */
+struct stasis_topic {
+ char *name;
+ /*! Variable length array of the subscribers (raw pointer to avoid cyclic references) */
+ struct stasis_subscription **subscribers;
+ /*! Allocated length of the subscribers array */
+ size_t num_subscribers_max;
+ /*! Current size of the subscribers array */
+ size_t num_subscribers_current;
+};
+
+/* Forward declarations for the tightly-coupled subscription object */
+struct stasis_subscription;
+static int topic_add_subscription(struct stasis_topic *topic, struct stasis_subscription *sub);
+
+static void topic_dtor(void *obj)
+{
+ struct stasis_topic *topic = obj;
+ ast_free(topic->name);
+ topic->name = NULL;
+ ast_free(topic->subscribers);
+ topic->subscribers = NULL;
+}
+
+struct stasis_topic *stasis_topic_create(const char *name)
+{
+ RAII_VAR(struct stasis_topic *, topic, NULL, ao2_cleanup);
+
+ topic = ao2_alloc(sizeof(*topic), topic_dtor);
+
+ if (!topic) {
+ return NULL;
+ }
+
+ topic->name = ast_strdup(name);
+ if (!topic->name) {
+ return NULL;
+ }
+
+ topic->num_subscribers_max = INITIAL_SUBSCRIBERS_MAX;
+ topic->subscribers = ast_calloc(topic->num_subscribers_max, sizeof(topic->subscribers));
+ if (!topic->subscribers) {
+ return NULL;
+ }
+
+ ao2_ref(topic, +1);
+ return topic;
+}
+
+const char *stasis_topic_name(const struct stasis_topic *topic)
+{
+ return topic->name;
+}
+
+/*! \private */
+struct stasis_subscription {
+ /*! Unique ID for this subscription */
+ char *uniqueid;
+ /*! Topic subscribed to. */
+ struct stasis_topic *topic;
+ /*! Mailbox for processing incoming messages. */
+ struct ast_taskprocessor *mailbox;
+ /*! Callback function for incoming message processing. */
+ stasis_subscription_cb callback;
+ /*! Data pointer to be handed to the callback. */
+ void *data;
+};
+
+static void subscription_dtor(void *obj)
+{
+ struct stasis_subscription *sub = obj;
+ ast_assert(!stasis_subscription_is_subscribed(sub));
+ ast_free(sub->uniqueid);
+ sub->uniqueid = NULL;
+ ao2_cleanup(sub->topic);
+ sub->topic = NULL;
+ ast_taskprocessor_unreference(sub->mailbox);
+ sub->mailbox = NULL;
+}
+
+static void send_subscription_change_message(struct stasis_topic *topic, char *uniqueid, char *description);
+
+static struct stasis_subscription *__stasis_subscribe(struct stasis_topic *topic, stasis_subscription_cb callback, void *data, int needs_mailbox)
+{
+ RAII_VAR(struct stasis_subscription *, sub, NULL, ao2_cleanup);
+ RAII_VAR(struct ast_uuid *, id, NULL, ast_free);
+ char uniqueid[AST_UUID_STR_LEN];
+
+ sub = ao2_alloc(sizeof(*sub), subscription_dtor);
+ if (!sub) {
+ return NULL;
+ }
+
+ id = ast_uuid_generate();
+ if (!id) {
+ ast_log(LOG_ERROR, "UUID generation failed\n");
+ return NULL;
+ }
+ ast_uuid_to_str(id, uniqueid, sizeof(uniqueid));
+ if (needs_mailbox) {
+ sub->mailbox = ast_threadpool_serializer(uniqueid, pool);
+ if (!sub->mailbox) {
+ return NULL;
+ }
+ }
+
+ sub->uniqueid = ast_strdup(uniqueid);
+ ao2_ref(topic, +1);
+ sub->topic = topic;
+ sub->callback = callback;
+ sub->data = data;
+
+ if (topic_add_subscription(topic, sub) != 0) {
+ return NULL;
+ }
+ send_subscription_change_message(topic, uniqueid, "Subscribe");
+
+ ao2_ref(sub, +1);
+ return sub;
+}
+
+struct stasis_subscription *stasis_subscribe(struct stasis_topic *topic, stasis_subscription_cb callback, void *data)
+{
+ return __stasis_subscribe(topic, callback, data, 1);
+}
+
+void stasis_unsubscribe(struct stasis_subscription *sub)
+{
+ if (sub) {
+ size_t i;
+ struct stasis_topic *topic = sub->topic;
+ SCOPED_AO2LOCK(lock_topic, topic);
+
+ for (i = 0; i < topic->num_subscribers_current; ++i) {
+ if (topic->subscribers[i] == sub) {
+ send_subscription_change_message(topic, sub->uniqueid, "Unsubscribe");
+ /* swap [i] with last entry; remove last entry */
+ topic->subscribers[i] = topic->subscribers[--topic->num_subscribers_current];
+ /* Unsubscribing unrefs the subscription */
+ ao2_cleanup(sub);
+ return;
+ }
+ }
+
+ ast_log(LOG_ERROR, "Internal error: subscription has invalid topic\n");
+ }
+}
+
+int stasis_subscription_is_subscribed(const struct stasis_subscription *sub)
+{
+ if (sub) {
+ size_t i;
+ struct stasis_topic *topic = sub->topic;
+ SCOPED_AO2LOCK(lock_topic, topic);
+
+ for (i = 0; i < topic->num_subscribers_current; ++i) {
+ if (topic->subscribers[i] == sub) {
+ return 1;
+ }
+ }
+ }
+
+ return 0;
+}
+
+const char *stasis_subscription_uniqueid(const struct stasis_subscription *sub)
+{
+ return sub->uniqueid;
+}
+
+int stasis_subscription_final_message(struct stasis_subscription *sub, struct stasis_message *msg)
+{
+ struct stasis_subscription_change *change;
+ if (stasis_message_type(msg) != stasis_subscription_change()) {
+ return 0;
+ }
+
+ change = stasis_message_data(msg);
+ if (strcmp("Unsubscribe", change->description)) {
+ return 0;
+ }
+
+ if (strcmp(stasis_subscription_uniqueid(sub), change->uniqueid)) {
+ return 0;
+ }
+
+ return 1;
+}
+
+/*!
+ * \brief Add a subscriber to a topic.
+ * \param topic Topic
+ * \param sub Subscriber
+ * \return 0 on success
+ * \return Non-zero on error
+ */
+static int topic_add_subscription(struct stasis_topic *topic, struct stasis_subscription *sub)
+{
+ struct stasis_subscription **subscribers;
+ SCOPED_AO2LOCK(lock, topic);
+
+ /* Increase list size, if needed */
+ if (topic->num_subscribers_current + 1 > topic->num_subscribers_max) {
+ subscribers = realloc(topic->subscribers, 2 * topic->num_subscribers_max * sizeof(*subscribers));
+ if (!subscribers) {
+ return -1;
+ }
+ topic->subscribers = subscribers;
+ topic->num_subscribers_max *= 2;
+ }
+
+ /* Don't ref sub here or we'll cause a reference cycle. */
+ topic->subscribers[topic->num_subscribers_current++] = sub;
+ return 0;
+}
+
+/*!
+ * \private
+ * \brief Information needed to dispatch a message to a subscription
+ */
+struct dispatch {
+ /*! Topic message was published to */
+ struct stasis_topic *topic;
+ /*! The message itself */
+ struct stasis_message *message;
+ /*! Subscription receiving the message */
+ struct stasis_subscription *sub;
+};
+
+static void dispatch_dtor(void *data)
+{
+ struct dispatch *dispatch = data;
+ ao2_cleanup(dispatch->topic);
+ ao2_cleanup(dispatch->message);
+ ao2_cleanup(dispatch->sub);
+}
+
+static struct dispatch *dispatch_create(struct stasis_topic *topic, struct stasis_message *message, struct stasis_subscription *sub)
+{
+ RAII_VAR(struct dispatch *, dispatch, NULL, ao2_cleanup);
+
+ ast_assert(topic != NULL);
+ ast_assert(message != NULL);
+ ast_assert(sub != NULL);
+
+ dispatch = ao2_alloc(sizeof(*dispatch), dispatch_dtor);
+ if (!dispatch) {
+ return NULL;
+ }
+
+ dispatch->topic = topic;
+ ao2_ref(topic, +1);
+
+ dispatch->message = message;
+ ao2_ref(message, +1);
+
+ dispatch->sub = sub;
+ ao2_ref(sub, +1);
+
+ ao2_ref(dispatch, +1);
+ return dispatch;
+}
+
+/*!
+ * \brief Dispatch a message to a subscriber
+ * \param data \ref dispatch object
+ * \return 0
+ */
+static int dispatch_exec(void *data)
+{
+ RAII_VAR(struct dispatch *, dispatch, data, ao2_cleanup);
+ RAII_VAR(struct stasis_topic *, sub_topic, NULL, ao2_cleanup);
+
+ /* Since sub->topic doesn't change, no need to lock sub */
+ ast_assert(dispatch->sub->topic != NULL);
+ ao2_ref(dispatch->sub->topic, +1);
+ sub_topic = dispatch->sub->topic;
+
+ dispatch->sub->callback(dispatch->sub->data,
+ dispatch->sub,
+ sub_topic,
+ dispatch->message);
+
+ return 0;
+}
+
+void stasis_forward_message(struct stasis_topic *topic, struct stasis_topic *publisher_topic, struct stasis_message *message)
+{
+ struct stasis_subscription **subscribers = NULL;
+ size_t num_subscribers, i;
+
+ /* Copy the subscribers, so we don't have to hold the mutex for long */
+ {
+ SCOPED_AO2LOCK(lock, topic);
+ num_subscribers = topic->num_subscribers_current;
+ subscribers = ast_malloc(num_subscribers * sizeof(*subscribers));
+ if (subscribers) {
+ for (i = 0; i < num_subscribers; ++i) {
+ ao2_ref(topic->subscribers[i], +1);
+ subscribers[i] = topic->subscribers[i];
+ }
+ }
+ }
+
+ if (!subscribers) {
+ ast_log(LOG_ERROR, "Dropping message\n");
+ return;
+ }
+
+ for (i = 0; i < num_subscribers; ++i) {
+ struct stasis_subscription *sub = subscribers[i];
+
+ ast_assert(sub != NULL);
+
+ if (sub->mailbox) {
+ RAII_VAR(struct dispatch *, dispatch, NULL, ao2_cleanup);
+
+ dispatch = dispatch_create(publisher_topic, message, sub);
+ if (!dispatch) {
+ ast_log(LOG_DEBUG, "Dropping dispatch\n");
+ break;
+ }
+
+ if (ast_taskprocessor_push(sub->mailbox, dispatch_exec, dispatch) == 0) {
+ dispatch = NULL; /* Ownership transferred to mailbox */
+ }
+ } else {
+ /* No mailbox; dispatch directly */
+ sub->callback(sub->data, sub, sub->topic, message);
+ }
+ }
+
+ for (i = 0; i < num_subscribers; ++i) {
+ ao2_cleanup(subscribers[i]);
+ }
+ ast_free(subscribers);
+}
+
+void stasis_publish(struct stasis_topic *topic, struct stasis_message *message)
+{
+ stasis_forward_message(topic, topic, message);
+}
+
+/*! \brief Forwarding subscriber */
+static void stasis_forward_cb(void *data, struct stasis_subscription *sub, struct stasis_topic *topic, struct stasis_message *message)
+{
+ struct stasis_topic *to_topic = data;
+ stasis_forward_message(to_topic, topic, message);
+
+ if (stasis_subscription_final_message(sub, message)) {
+ ao2_cleanup(to_topic);
+ }
+}
+
+struct stasis_subscription *stasis_forward_all(struct stasis_topic *from_topic, struct stasis_topic *to_topic)
+{
+ struct stasis_subscription *sub;
+ if (!from_topic || !to_topic) {
+ return NULL;
+ }
+ /* Subscribe without a mailbox, since we're just forwarding messages */
+ sub = __stasis_subscribe(from_topic, stasis_forward_cb, to_topic, 0);
+ if (sub) {
+ /* hold a ref to to_topic for this forwarding subscription */
+ ao2_ref(to_topic, +1);
+ }
+ return sub;
+}
+
+static void subscription_change_dtor(void *obj)
+{
+ struct stasis_subscription_change *change = obj;
+ ast_string_field_free_memory(change);
+ ao2_cleanup(change->topic);
+}
+
+static struct stasis_subscription_change *subscription_change_alloc(struct stasis_topic *topic, char *uniqueid, char *description)
+{
+ RAII_VAR(struct stasis_subscription_change *, change, NULL, ao2_cleanup);
+
+ change = ao2_alloc(sizeof(struct stasis_subscription_change), subscription_change_dtor);
+ if (ast_string_field_init(change, 128)) {
+ return NULL;
+ }
+
+ ast_string_field_set(change, uniqueid, uniqueid);
+ ast_string_field_set(change, description, description);
+ ao2_ref(topic, +1);
+ change->topic = topic;
+
+ ao2_ref(change, +1);
+ return change;
+}
+
+struct stasis_message_type *stasis_subscription_change(void)
+{
+ return __subscription_change_message_type;
+}
+
+static void send_subscription_change_message(struct stasis_topic *topic, char *uniqueid, char *description)
+{
+ RAII_VAR(struct stasis_subscription_change *, change, NULL, ao2_cleanup);
+ RAII_VAR(struct stasis_message *, msg, NULL, ao2_cleanup);
+
+ change = subscription_change_alloc(topic, uniqueid, description);
+
+ if (!change) {
+ return;
+ }
+
+ msg = stasis_message_create(stasis_subscription_change(), change);
+
+ if (!msg) {
+ return;
+ }
+
+ stasis_publish(topic, msg);
+}
+
+/*! \brief Cleanup function */
+static void stasis_exit(void)
+{
+ ao2_cleanup(__subscription_change_message_type);
+ __subscription_change_message_type = NULL;
+ ast_threadpool_shutdown(pool);
+ pool = NULL;
+}
+
+int stasis_init(void)
+{
+ int cache_init;
+
+ /* XXX Should this be configurable? */
+ struct ast_threadpool_options opts = {
+ .version = AST_THREADPOOL_OPTIONS_VERSION,
+ .idle_timeout = 20,
+ .auto_increment = 1,
+ .initial_size = 0,
+ .max_size = 200
+ };
+
+ ast_register_atexit(stasis_exit);
+
+ if (pool) {
+ ast_log(LOG_ERROR, "Stasis double-initialized\n");
+ return -1;
+ }
+
+ pool = ast_threadpool_create("stasis-core", NULL, &opts);
+ if (!pool) {
+ ast_log(LOG_ERROR, "Stasis threadpool allocation failed\n");
+ return -1;
+ }
+
+ cache_init = stasis_cache_init();
+ if (cache_init != 0) {
+ return -1;
+ }
+
+ __subscription_change_message_type = stasis_message_type_create("stasis_subscription_change");
+ if (!__subscription_change_message_type) {
+ return -1;
+ }
+
+ return 0;
+}
diff --git a/main/stasis_cache.c b/main/stasis_cache.c
new file mode 100644
index 000000000..2f4cf52fd
--- /dev/null
+++ b/main/stasis_cache.c
@@ -0,0 +1,443 @@
+/*
+ * Asterisk -- An open source telephony toolkit.
+ *
+ * Copyright (C) 2013, Digium, Inc.
+ *
+ * David M. Lee, II <dlee@digium.com>
+ *
+ * See http://www.asterisk.org for more information about
+ * the Asterisk project. Please do not directly contact
+ * any of the maintainers of this project for assistance;
+ * the project provides a web site, mailing lists and IRC
+ * channels for your use.
+ *
+ * This program is free software, distributed under the terms of
+ * the GNU General Public License Version 2. See the LICENSE file
+ * at the top of the source tree.
+ */
+
+/*! \file
+ *
+ * \brief Stasis Message API.
+ *
+ * \author David M. Lee, II <dlee@digium.com>
+ */
+
+/*** MODULEINFO
+ <support_level>core</support_level>
+ ***/
+
+#include "asterisk.h"
+
+ASTERISK_FILE_VERSION(__FILE__, "$Revision$")
+
+#include "asterisk/astobj2.h"
+#include "asterisk/hashtab.h"
+#include "asterisk/stasis.h"
+#include "asterisk/utils.h"
+
+#ifdef LOW_MEMORY
+#define NUM_CACHE_BUCKETS 17
+#else
+#define NUM_CACHE_BUCKETS 563
+#endif
+
+struct stasis_caching_topic {
+ struct ao2_container *cache;
+ struct stasis_topic *topic;
+ struct stasis_subscription *sub;
+ snapshot_get_id id_fn;
+};
+
+static void stasis_caching_topic_dtor(void *obj) {
+ struct stasis_caching_topic *caching_topic = obj;
+ ast_assert(!stasis_subscription_is_subscribed(caching_topic->sub));
+ caching_topic->sub = NULL;
+ ao2_cleanup(caching_topic->cache);
+ caching_topic->cache = NULL;
+ ao2_cleanup(caching_topic->topic);
+ caching_topic->topic = NULL;
+}
+
+struct stasis_topic *stasis_caching_get_topic(struct stasis_caching_topic *caching_topic)
+{
+ return caching_topic->topic;
+}
+
+void stasis_caching_unsubscribe(struct stasis_caching_topic *caching_topic)
+{
+ if (caching_topic) {
+ if (stasis_subscription_is_subscribed(caching_topic->sub)) {
+ stasis_unsubscribe(caching_topic->sub);
+ } else {
+ ast_log(LOG_ERROR, "stasis_caching_topic unsubscribed multiple times\n");
+ }
+ }
+}
+
+struct cache_entry {
+ struct stasis_message_type *type;
+ char *id;
+ struct stasis_message *snapshot;
+};
+
+static void cache_entry_dtor(void *obj)
+{
+ struct cache_entry *entry = obj;
+ ao2_cleanup(entry->type);
+ entry->type = NULL;
+ ast_free(entry->id);
+ entry->id = NULL;
+ ao2_cleanup(entry->snapshot);
+ entry->snapshot = NULL;
+}
+
+static struct cache_entry *cache_entry_create(struct stasis_message_type *type, const char *id, struct stasis_message *snapshot)
+{
+ RAII_VAR(struct cache_entry *, entry, NULL, ao2_cleanup);
+
+ ast_assert(type != NULL);
+ ast_assert(id != NULL);
+
+ entry = ao2_alloc(sizeof(*entry), cache_entry_dtor);
+ if (!entry) {
+ return NULL;
+ }
+
+ entry->id = ast_strdup(id);
+ if (!entry->id) {
+ return NULL;
+ }
+
+ ao2_ref(type, +1);
+ entry->type = type;
+ if (snapshot != NULL) {
+ ao2_ref(snapshot, +1);
+ entry->snapshot = snapshot;
+ }
+
+ ao2_ref(entry, +1);
+ return entry;
+}
+
+static int cache_entry_hash(const void *obj, int flags)
+{
+ const struct cache_entry *entry = obj;
+ int hash = 0;
+
+ ast_assert(!(flags & OBJ_KEY));
+
+ hash += ast_hashtab_hash_string(stasis_message_type_name(entry->type));
+ hash += ast_hashtab_hash_string(entry->id);
+ return hash;
+}
+
+static int cache_entry_cmp(void *obj, void *arg, int flags)
+{
+ const struct cache_entry *left = obj;
+ const struct cache_entry *right = arg;
+
+ ast_assert(!(flags & OBJ_KEY));
+
+ if (left->type == right->type && strcmp(left->id, right->id) == 0) {
+ return CMP_MATCH | CMP_STOP;
+ }
+
+ return 0;
+}
+
+static struct stasis_message *cache_put(struct stasis_caching_topic *caching_topic, struct stasis_message_type *type, const char *id, struct stasis_message *new_snapshot)
+{
+ RAII_VAR(struct cache_entry *, new_entry, NULL, ao2_cleanup);
+ RAII_VAR(struct cache_entry *, cached_entry, NULL, ao2_cleanup);
+ struct stasis_message *old_snapshot = NULL;
+
+ ast_assert(caching_topic->cache != NULL);
+
+ new_entry = cache_entry_create(type, id, new_snapshot);
+
+ if (new_snapshot == NULL) {
+ /* Remove entry from cache */
+ cached_entry = ao2_find(caching_topic->cache, new_entry, OBJ_POINTER | OBJ_UNLINK);
+ if (cached_entry) {
+ old_snapshot = cached_entry->snapshot;
+ cached_entry->snapshot = NULL;
+ }
+ } else {
+ /* Insert/update cache */
+ SCOPED_AO2LOCK(lock, caching_topic->cache);
+
+ cached_entry = ao2_find(caching_topic->cache, new_entry, OBJ_POINTER | OBJ_NOLOCK);
+ if (cached_entry) {
+ /* Update cache. Because objects are moving, no need to update refcounts. */
+ old_snapshot = cached_entry->snapshot;
+ cached_entry->snapshot = new_entry->snapshot;
+ new_entry->snapshot = NULL;
+ } else {
+ /* Insert into the cache */
+ ao2_link_flags(caching_topic->cache, new_entry, OBJ_NOLOCK);
+ }
+
+ }
+
+ return old_snapshot;
+}
+
+struct stasis_message *stasis_cache_get(struct stasis_caching_topic *caching_topic, struct stasis_message_type *type, const char *id)
+{
+ RAII_VAR(struct cache_entry *, search_entry, NULL, ao2_cleanup);
+ RAII_VAR(struct cache_entry *, cached_entry, NULL, ao2_cleanup);
+
+ ast_assert(caching_topic->cache != NULL);
+
+ search_entry = cache_entry_create(type, id, NULL);
+ if (search_entry == NULL) {
+ return NULL;
+ }
+
+ cached_entry = ao2_find(caching_topic->cache, search_entry, OBJ_POINTER);
+ if (cached_entry == NULL) {
+ return NULL;
+ }
+
+ ast_assert(cached_entry->snapshot != NULL);
+ ao2_ref(cached_entry->snapshot, +1);
+ return cached_entry->snapshot;
+}
+
+static struct stasis_message_type *__cache_clear_data;
+
+static struct stasis_message_type *cache_clear_data(void)
+{
+ ast_assert(__cache_clear_data != NULL);
+ return __cache_clear_data;
+}
+
+static struct stasis_message_type *__cache_update;
+
+struct stasis_message_type *stasis_cache_update(void)
+{
+ ast_assert(__cache_update != NULL);
+ return __cache_update;
+}
+
+struct cache_clear_data {
+ struct stasis_message_type *type;
+ char *id;
+};
+
+static void cache_clear_data_dtor(void *obj)
+{
+ struct cache_clear_data *ev = obj;
+ ast_free(ev->id);
+ ev->id = NULL;
+ ao2_cleanup(ev->type);
+ ev->type = NULL;
+}
+
+struct stasis_message *stasis_cache_clear_create(struct stasis_message_type *type, const char *id)
+{
+ RAII_VAR(struct cache_clear_data *, ev, NULL, ao2_cleanup);
+ RAII_VAR(struct stasis_message *, msg, NULL, ao2_cleanup);
+
+ ev = ao2_alloc(sizeof(*ev), cache_clear_data_dtor);
+ if (!ev) {
+ return NULL;
+ }
+
+ ev->id = ast_strdup(id);
+ if (!ev->id) {
+ return NULL;
+ }
+ ao2_ref(type, +1);
+ ev->type = type;
+
+ msg = stasis_message_create(cache_clear_data(), ev);
+
+ if (!msg) {
+ return NULL;
+ }
+
+ ao2_ref(msg, +1);
+ return msg;
+}
+
+static void stasis_cache_update_dtor(void *obj)
+{
+ struct stasis_cache_update *update = obj;
+ ao2_cleanup(update->topic);
+ update->topic = NULL;
+ ao2_cleanup(update->old_snapshot);
+ update->old_snapshot = NULL;
+ ao2_cleanup(update->new_snapshot);
+ update->new_snapshot = NULL;
+ ao2_cleanup(update->type);
+ update->type = NULL;
+}
+
+static struct stasis_message *update_create(struct stasis_topic *topic, struct stasis_message *old_snapshot, struct stasis_message *new_snapshot)
+{
+ RAII_VAR(struct stasis_cache_update *, update, NULL, ao2_cleanup);
+ RAII_VAR(struct stasis_message *, msg, NULL, ao2_cleanup);
+
+ ast_assert(topic != NULL);
+ ast_assert(old_snapshot != NULL || new_snapshot != NULL);
+
+ update = ao2_alloc(sizeof(*update), stasis_cache_update_dtor);
+ if (!update) {
+ return NULL;
+ }
+
+ ao2_ref(topic, +1);
+ update->topic = topic;
+ if (old_snapshot) {
+ ao2_ref(old_snapshot, +1);
+ update->old_snapshot = old_snapshot;
+ if (!new_snapshot) {
+ ao2_ref(stasis_message_type(old_snapshot), +1);
+ update->type = stasis_message_type(old_snapshot);
+ }
+ }
+ if (new_snapshot) {
+ ao2_ref(new_snapshot, +1);
+ update->new_snapshot = new_snapshot;
+ ao2_ref(stasis_message_type(new_snapshot), +1);
+ update->type = stasis_message_type(new_snapshot);
+ }
+
+ msg = stasis_message_create(stasis_cache_update(), update);
+ if (!msg) {
+ return NULL;
+ }
+
+ ao2_ref(msg, +1);
+ return msg;
+}
+
+static void caching_topic_exec(void *data, struct stasis_subscription *sub, struct stasis_topic *topic, struct stasis_message *message)
+{
+ RAII_VAR(struct stasis_caching_topic *, caching_topic_needs_unref, NULL, ao2_cleanup);
+ struct stasis_caching_topic *caching_topic = data;
+ const char *id = NULL;
+
+ ast_assert(caching_topic->topic != NULL);
+ ast_assert(caching_topic->id_fn != NULL);
+
+ if (stasis_subscription_final_message(sub, message)) {
+ caching_topic_needs_unref = caching_topic;
+ }
+
+ /* Handle cache clear event */
+ if (cache_clear_data() == stasis_message_type(message)) {
+ RAII_VAR(struct stasis_message *, old_snapshot, NULL, ao2_cleanup);
+ RAII_VAR(struct stasis_message *, update, NULL, ao2_cleanup);
+ struct cache_clear_data *clear = stasis_message_data(message);
+ ast_assert(clear->type != NULL);
+ ast_assert(clear->id != NULL);
+ old_snapshot = cache_put(caching_topic, clear->type, clear->id, NULL);
+ if (old_snapshot) {
+ update = update_create(topic, old_snapshot, NULL);
+ stasis_publish(caching_topic->topic, update);
+ } else {
+ ast_log(LOG_ERROR,
+ "Attempting to remove an item from the cache that isn't there: %s %s\n",
+ stasis_message_type_name(clear->type), clear->id);
+ }
+ return;
+ }
+
+ id = caching_topic->id_fn(message);
+ if (id == NULL) {
+ /* Object isn't cached; forward */
+ stasis_forward_message(caching_topic->topic, topic, message);
+ } else {
+ /* Update the cache */
+ RAII_VAR(struct stasis_message *, old_snapshot, NULL, ao2_cleanup);
+ RAII_VAR(struct stasis_message *, update, NULL, ao2_cleanup);
+
+ old_snapshot = cache_put(caching_topic, stasis_message_type(message), id, message);
+
+ update = update_create(topic, old_snapshot, message);
+ if (update == NULL) {
+ return;
+ }
+
+ stasis_publish(caching_topic->topic, update);
+ }
+
+ if (stasis_subscription_final_message(sub, message)) {
+ ao2_cleanup(caching_topic);
+ }
+}
+
+struct stasis_caching_topic *stasis_caching_topic_create(struct stasis_topic *original_topic, snapshot_get_id id_fn)
+{
+ RAII_VAR(struct stasis_caching_topic *, caching_topic, NULL, ao2_cleanup);
+ struct stasis_subscription *sub;
+ RAII_VAR(char *, new_name, NULL, free);
+ int ret;
+
+ ret = asprintf(&new_name, "%s-cached", stasis_topic_name(original_topic));
+ if (ret < 0) {
+ return NULL;
+ }
+
+ caching_topic = ao2_alloc(sizeof(*caching_topic), stasis_caching_topic_dtor);
+ if (caching_topic == NULL) {
+ return NULL;
+ }
+
+ caching_topic->cache = ao2_container_alloc(NUM_CACHE_BUCKETS, cache_entry_hash, cache_entry_cmp);
+ if (!caching_topic->cache) {
+ ast_log(LOG_ERROR, "Stasis cache allocation failed\n");
+ return NULL;
+ }
+
+ caching_topic->topic = stasis_topic_create(new_name);
+ if (caching_topic->topic == NULL) {
+ return NULL;
+ }
+
+ caching_topic->id_fn = id_fn;
+
+ sub = stasis_subscribe(original_topic, caching_topic_exec, caching_topic);
+ if (sub == NULL) {
+ return NULL;
+ }
+ /* This is for the reference contained in the subscription above */
+ ao2_ref(caching_topic, +1);
+ caching_topic->sub = sub;
+
+ ao2_ref(caching_topic, +1);
+ return caching_topic;
+}
+
+static void stasis_cache_exit(void)
+{
+ ao2_cleanup(__cache_clear_data);
+ __cache_clear_data = NULL;
+ ao2_cleanup(__cache_update);
+ __cache_update = NULL;
+}
+
+int stasis_cache_init(void)
+{
+ ast_register_atexit(stasis_cache_exit);
+
+ if (__cache_clear_data || __cache_update) {
+ ast_log(LOG_ERROR, "Stasis cache double initialized\n");
+ return -1;
+ }
+
+ __cache_update = stasis_message_type_create("stasis_cache_update");
+ if (!__cache_update) {
+ return -1;
+ }
+
+ __cache_clear_data = stasis_message_type_create("StasisCacheClear");
+ if (!__cache_clear_data) {
+ return -1;
+ }
+ return 0;
+}
+
diff --git a/main/stasis_message.c b/main/stasis_message.c
new file mode 100644
index 000000000..8d397b935
--- /dev/null
+++ b/main/stasis_message.c
@@ -0,0 +1,135 @@
+/*
+ * Asterisk -- An open source telephony toolkit.
+ *
+ * Copyright (C) 2013, Digium, Inc.
+ *
+ * David M. Lee, II <dlee@digium.com>
+ *
+ * See http://www.asterisk.org for more information about
+ * the Asterisk project. Please do not directly contact
+ * any of the maintainers of this project for assistance;
+ * the project provides a web site, mailing lists and IRC
+ * channels for your use.
+ *
+ * This program is free software, distributed under the terms of
+ * the GNU General Public License Version 2. See the LICENSE file
+ * at the top of the source tree.
+ */
+
+/*! \file
+ *
+ * \brief Stasis Message API.
+ *
+ * \author David M. Lee, II <dlee@digium.com>
+ */
+
+/*** MODULEINFO
+ <support_level>core</support_level>
+ ***/
+
+#include "asterisk.h"
+
+ASTERISK_FILE_VERSION(__FILE__, "$Revision$")
+
+#include "asterisk/astobj2.h"
+#include "asterisk/stasis.h"
+#include "asterisk/utils.h"
+
+/*! \private */
+struct stasis_message_type {
+ char *name;
+};
+
+static void message_type_dtor(void *obj)
+{
+ struct stasis_message_type *type = obj;
+ ast_free(type->name);
+ type->name = NULL;
+}
+
+struct stasis_message_type *stasis_message_type_create(const char *name)
+{
+ RAII_VAR(struct stasis_message_type *, type, NULL, ao2_cleanup);
+
+ type = ao2_alloc(sizeof(*type), message_type_dtor);
+ if (!type) {
+ return NULL;
+ }
+
+ type->name = ast_strdup(name);
+ if (!type->name) {
+ return NULL;
+ }
+
+ ao2_ref(type, +1);
+ return type;
+}
+
+const char *stasis_message_type_name(const struct stasis_message_type *type)
+{
+ return type->name;
+}
+
+/*! \private */
+struct stasis_message {
+ /*! Time the message was created */
+ struct timeval timestamp;
+ /*! Type of the message */
+ struct stasis_message_type *type;
+ /*! Message content */
+ void *data;
+};
+
+static void stasis_message_dtor(void *obj)
+{
+ struct stasis_message *message = obj;
+ ao2_cleanup(message->type);
+ ao2_cleanup(message->data);
+}
+
+struct stasis_message *stasis_message_create(struct stasis_message_type *type, void *data)
+{
+ RAII_VAR(struct stasis_message *, message, NULL, ao2_cleanup);
+
+ if (type == NULL || data == NULL) {
+ return NULL;
+ }
+
+ message = ao2_alloc(sizeof(*message), stasis_message_dtor);
+ if (message == NULL) {
+ return NULL;
+ }
+
+ message->timestamp = ast_tvnow();
+ ao2_ref(type, +1);
+ message->type = type;
+ ao2_ref(data, +1);
+ message->data = data;
+
+ ao2_ref(message, +1);
+ return message;
+}
+
+struct stasis_message_type *stasis_message_type(const struct stasis_message *msg)
+{
+ if (msg == NULL) {
+ return NULL;
+ }
+ return msg->type;
+}
+
+void *stasis_message_data(const struct stasis_message *msg)
+{
+ if (msg == NULL) {
+ return NULL;
+ }
+ return msg->data;
+}
+
+const struct timeval *stasis_message_timestamp(const struct stasis_message *msg)
+{
+ if (msg == NULL) {
+ return NULL;
+ }
+ return &msg->timestamp;
+}