summaryrefslogtreecommitdiff
path: root/main
diff options
context:
space:
mode:
authorDavid M. Lee <dlee@digium.com>2013-03-08 15:15:13 +0000
committerDavid M. Lee <dlee@digium.com>2013-03-08 15:15:13 +0000
commit4edd8be35cdef3b212355c48b68319f2304bc9f2 (patch)
tree19eb1e097c68641f443584f6585a71bb4a4d6f77 /main
parentf6f6bc7b5932808d570e41869c47c3fc54ab6bf8 (diff)
This patch adds a new message bus API to Asterisk.
For the initial use of this bus, I took some work kmoore did creating channel snapshots. So rather than create AMI events directly in the channel code, this patch generates Stasis events, which manager.c uses to then publish the AMI event. This message bus provides a generic publish/subscribe mechanism within Asterisk. This message bus is: - Loosely coupled; new message types can be added in seperate modules. - Easy to use; publishing and subscribing are straightforward operations. In addition to basic publish/subscribe, the patch also provides mechanisms for message forwarding, and for message caching. (issue ASTERISK-20887) (closes issue ASTERISK-20959) Review: https://reviewboard.asterisk.org/r/2339/ git-svn-id: https://origsvn.digium.com/svn/asterisk/trunk@382685 65c4cc65-6c06-0410-ace0-fbb531ad65f3
Diffstat (limited to 'main')
-rw-r--r--main/asterisk.c6
-rw-r--r--main/asterisk.exports.in1
-rw-r--r--main/channel.c287
-rw-r--r--main/channel_internal_api.c21
-rw-r--r--main/manager.c200
-rw-r--r--main/pbx.c13
-rw-r--r--main/stasis.c514
-rw-r--r--main/stasis_cache.c443
-rw-r--r--main/stasis_message.c135
9 files changed, 1506 insertions, 114 deletions
diff --git a/main/asterisk.c b/main/asterisk.c
index 1d5371938..4e5e58c05 100644
--- a/main/asterisk.c
+++ b/main/asterisk.c
@@ -240,6 +240,7 @@ int daemon(int, int); /* defined in libresolv of all places */
#include "asterisk/aoc.h"
#include "asterisk/uuid.h"
#include "asterisk/sorcery.h"
+#include "asterisk/stasis.h"
#include "../defaults.h"
@@ -4120,6 +4121,11 @@ int main(int argc, char *argv[])
exit(1);
}
+ if (stasis_init()) {
+ printf("Stasis initialization failed.\n%s", term_quit());
+ exit(1);
+ }
+
ast_makesocket();
sigemptyset(&sigs);
sigaddset(&sigs, SIGHUP);
diff --git a/main/asterisk.exports.in b/main/asterisk.exports.in
index 49d3a44a4..3157b8319 100644
--- a/main/asterisk.exports.in
+++ b/main/asterisk.exports.in
@@ -32,6 +32,7 @@
LINKER_SYMBOL_PREFIXdialed_interface_info;
LINKER_SYMBOL_PREFIXstrsep;
LINKER_SYMBOL_PREFIXsetenv;
+ LINKER_SYMBOL_PREFIXstasis_*;
LINKER_SYMBOL_PREFIXunsetenv;
LINKER_SYMBOL_PREFIXstrcasestr;
LINKER_SYMBOL_PREFIXstrnlen;
diff --git a/main/channel.c b/main/channel.c
index 9c8f32cb5..df0a67b3b 100644
--- a/main/channel.c
+++ b/main/channel.c
@@ -152,6 +152,15 @@ static AST_RWLIST_HEAD_STATIC(backends, chanlist);
/*! \brief All active channels on the system */
static struct ao2_container *channels;
+/*! \brief Message type for channel snapshot events */
+static struct stasis_message_type *__channel_snapshot;
+
+static struct stasis_message_type *__channel_varset;
+
+struct stasis_topic *__channel_topic_all;
+
+struct stasis_caching_topic *__channel_topic_all_cached;
+
/*! \brief map AST_CAUSE's to readable string representations
*
* \ref causes.h
@@ -214,6 +223,77 @@ static const struct causes_map causes[] = {
{ AST_CAUSE_INTERWORKING, "INTERWORKING", "Interworking, unspecified" },
};
+static void publish_channel_state(struct ast_channel *chan)
+{
+ RAII_VAR(struct ast_channel_snapshot *, snapshot, NULL, ao2_cleanup);
+ RAII_VAR(struct stasis_message *, message, NULL, ao2_cleanup);
+
+ snapshot = ast_channel_snapshot_create(chan);
+ if (!snapshot) {
+ ast_log(LOG_ERROR, "Allocation error\n");
+ return;
+ }
+
+ message = stasis_message_create(ast_channel_snapshot(), snapshot);
+ if (!message) {
+ return;
+ }
+
+ ast_assert(ast_channel_topic(chan) != NULL);
+ stasis_publish(ast_channel_topic(chan), message);
+}
+
+static void channel_varset_dtor(void *obj)
+{
+ struct ast_channel_varset *event = obj;
+ ao2_cleanup(event->snapshot);
+ ast_free(event->variable);
+ ast_free(event->value);
+}
+
+void ast_channel_publish_varset(struct ast_channel *chan, const char *name, const char *value)
+{
+ RAII_VAR(struct ast_channel_varset *, event, NULL, ao2_cleanup);
+ RAII_VAR(struct stasis_message *, msg, NULL, ao2_cleanup);
+
+ event = ao2_alloc(sizeof(*event), channel_varset_dtor);
+ if (!event) {
+ return;
+ }
+
+ if (chan) {
+ event->snapshot = ast_channel_snapshot_create(chan);
+ if (event->snapshot == NULL) {
+ return;
+ }
+ }
+ event->variable = ast_strdup(name);
+ event->value = ast_strdup(value);
+ if (event->variable == NULL || event->value == NULL) {
+ return;
+ }
+
+ msg = stasis_message_create(ast_channel_varset(), event);
+ if (!msg) {
+ return;
+ }
+
+ if (chan) {
+ stasis_publish(ast_channel_topic(chan), msg);
+ } else {
+ stasis_publish(ast_channel_topic_all(), msg);
+ }
+}
+
+
+static void publish_cache_clear(struct ast_channel *chan)
+{
+ RAII_VAR(struct stasis_message *, message, NULL, ao2_cleanup);
+
+ message = stasis_cache_clear_create(ast_channel_snapshot(), ast_channel_uniqueid(chan));
+ stasis_publish(ast_channel_topic(chan), message);
+}
+
struct ast_variable *ast_channeltype_list(void)
{
struct chanlist *cl;
@@ -1073,6 +1153,8 @@ __ast_channel_alloc_ap(int needqueue, int state, const char *cid_num, const char
ast_channel_linkedid_set(tmp, ast_channel_uniqueid(tmp));
}
+ ast_channel_internal_setup_topics(tmp);
+
if (!ast_strlen_zero(name_fmt)) {
char *slash, *slash2;
/* Almost every channel is calling this function, and setting the name via the ast_string_field_build() call.
@@ -1145,34 +1227,7 @@ __ast_channel_alloc_ap(int needqueue, int state, const char *cid_num, const char
* a lot of data into this func to do it here!
*/
if (ast_get_channel_tech(tech) || (tech2 && ast_get_channel_tech(tech2))) {
- /*** DOCUMENTATION
- <managerEventInstance>
- <synopsis>Raised when a new channel is created.</synopsis>
- <syntax>
- <xi:include xpointer="xpointer(/docs/managerEvent[@name='Newstate']/managerEventInstance/syntax/parameter[@name='ChannelState'])" />
- <xi:include xpointer="xpointer(/docs/managerEvent[@name='Newstate']/managerEventInstance/syntax/parameter[@name='ChannelStateDesc'])" />
- </syntax>
- </managerEventInstance>
- ***/
- ast_manager_event(tmp, EVENT_FLAG_CALL, "Newchannel",
- "Channel: %s\r\n"
- "ChannelState: %d\r\n"
- "ChannelStateDesc: %s\r\n"
- "CallerIDNum: %s\r\n"
- "CallerIDName: %s\r\n"
- "AccountCode: %s\r\n"
- "Exten: %s\r\n"
- "Context: %s\r\n"
- "Uniqueid: %s\r\n",
- ast_channel_name(tmp),
- state,
- ast_state2str(state),
- S_OR(cid_num, ""),
- S_OR(cid_name, ""),
- ast_channel_accountcode(tmp),
- S_OR(exten, ""),
- S_OR(context, ""),
- ast_channel_uniqueid(tmp));
+ publish_channel_state(tmp);
}
ast_channel_internal_finalize(tmp);
@@ -2893,39 +2948,9 @@ int ast_hangup(struct ast_channel *chan)
ast_channel_unlock(chan);
ast_cc_offer(chan);
- /*** DOCUMENTATION
- <managerEventInstance>
- <synopsis>Raised when a channel is hung up.</synopsis>
- <syntax>
- <parameter name="Cause">
- <para>A numeric cause code for why the channel was hung up.</para>
- </parameter>
- <parameter name="Cause-txt">
- <para>A description of why the channel was hung up.</para>
- </parameter>
- </syntax>
- </managerEventInstance>
- ***/
- ast_manager_event(chan, EVENT_FLAG_CALL, "Hangup",
- "Channel: %s\r\n"
- "Uniqueid: %s\r\n"
- "CallerIDNum: %s\r\n"
- "CallerIDName: %s\r\n"
- "ConnectedLineNum: %s\r\n"
- "ConnectedLineName: %s\r\n"
- "AccountCode: %s\r\n"
- "Cause: %d\r\n"
- "Cause-txt: %s\r\n",
- ast_channel_name(chan),
- ast_channel_uniqueid(chan),
- S_COR(ast_channel_caller(chan)->id.number.valid, ast_channel_caller(chan)->id.number.str, "<unknown>"),
- S_COR(ast_channel_caller(chan)->id.name.valid, ast_channel_caller(chan)->id.name.str, "<unknown>"),
- S_COR(ast_channel_connected(chan)->id.number.valid, ast_channel_connected(chan)->id.number.str, "<unknown>"),
- S_COR(ast_channel_connected(chan)->id.name.valid, ast_channel_connected(chan)->id.name.str, "<unknown>"),
- ast_channel_accountcode(chan),
- ast_channel_hangupcause(chan),
- ast_cause2str(ast_channel_hangupcause(chan))
- );
+
+ publish_channel_state(chan);
+ publish_cache_clear(chan);
if (ast_channel_cdr(chan) && !ast_test_flag(ast_channel_cdr(chan), AST_CDR_FLAG_BRIDGED) &&
!ast_test_flag(ast_channel_cdr(chan), AST_CDR_FLAG_POST_DISABLED) &&
@@ -7435,47 +7460,7 @@ int ast_setstate(struct ast_channel *chan, enum ast_channel_state state)
* we override what they are saying the state is and things go amuck. */
ast_devstate_changed_literal(AST_DEVICE_UNKNOWN, (ast_test_flag(ast_channel_flags(chan), AST_FLAG_DISABLE_DEVSTATE_CACHE) ? AST_DEVSTATE_NOT_CACHABLE : AST_DEVSTATE_CACHABLE), name);
- /* setstate used to conditionally report Newchannel; this is no more */
- /*** DOCUMENTATION
- <managerEventInstance>
- <synopsis>Raised when a channel's state changes.</synopsis>
- <syntax>
- <parameter name="ChannelState">
- <para>A numeric code for the channel's current state, related to ChannelStateDesc</para>
- </parameter>
- <parameter name="ChannelStateDesc">
- <enumlist>
- <enum name="Down"/>
- <enum name="Rsrvd"/>
- <enum name="OffHook"/>
- <enum name="Dialing"/>
- <enum name="Ring"/>
- <enum name="Ringing"/>
- <enum name="Up"/>
- <enum name="Busy"/>
- <enum name="Dialing Offhook"/>
- <enum name="Pre-ring"/>
- <enum name="Unknown"/>
- </enumlist>
- </parameter>
- </syntax>
- </managerEventInstance>
- ***/
- ast_manager_event(chan, EVENT_FLAG_CALL, "Newstate",
- "Channel: %s\r\n"
- "ChannelState: %d\r\n"
- "ChannelStateDesc: %s\r\n"
- "CallerIDNum: %s\r\n"
- "CallerIDName: %s\r\n"
- "ConnectedLineNum: %s\r\n"
- "ConnectedLineName: %s\r\n"
- "Uniqueid: %s\r\n",
- ast_channel_name(chan), ast_channel_state(chan), ast_state2str(ast_channel_state(chan)),
- S_COR(ast_channel_caller(chan)->id.number.valid, ast_channel_caller(chan)->id.number.str, ""),
- S_COR(ast_channel_caller(chan)->id.name.valid, ast_channel_caller(chan)->id.name.str, ""),
- S_COR(ast_channel_connected(chan)->id.number.valid, ast_channel_connected(chan)->id.number.str, ""),
- S_COR(ast_channel_connected(chan)->id.name.valid, ast_channel_connected(chan)->id.name.str, ""),
- ast_channel_uniqueid(chan));
+ publish_channel_state(chan);
return 0;
}
@@ -8644,6 +8629,14 @@ static void prnt_channel_key(void *v_obj, void *where, ao2_prnt_fn *prnt)
static void channels_shutdown(void)
{
+ ao2_cleanup(__channel_snapshot);
+ __channel_snapshot = NULL;
+ ao2_cleanup(__channel_varset);
+ __channel_varset = NULL;
+ ao2_cleanup(__channel_topic_all);
+ __channel_topic_all = NULL;
+ stasis_caching_unsubscribe(__channel_topic_all_cached);
+ __channel_topic_all_cached = NULL;
ast_data_unregister(NULL);
ast_cli_unregister_multiple(cli_channel, ARRAY_LEN(cli_channel));
if (channels) {
@@ -8653,6 +8646,16 @@ static void channels_shutdown(void)
}
}
+static const char *channel_snapshot_get_id(struct stasis_message *message)
+{
+ struct ast_channel_snapshot *snapshot;
+ if (ast_channel_snapshot() != stasis_message_type(message)) {
+ return NULL;
+ }
+ snapshot = stasis_message_data(message);
+ return snapshot->uniqueid;
+}
+
void ast_channels_init(void)
{
channels = ao2_container_alloc(NUM_CHANNEL_BUCKETS,
@@ -8661,6 +8664,12 @@ void ast_channels_init(void)
ao2_container_register("channels", channels, prnt_channel_key);
}
+ __channel_snapshot = stasis_message_type_create("ast_channel_snapshot");
+ __channel_varset = stasis_message_type_create("ast_channel_varset");
+
+ __channel_topic_all = stasis_topic_create("ast_channel_topic_all");
+ __channel_topic_all_cached = stasis_caching_topic_create(__channel_topic_all, channel_snapshot_get_id);
+
ast_cli_register_multiple(cli_channel, ARRAY_LEN(cli_channel));
ast_data_register_multiple_core(channel_providers, ARRAY_LEN(channel_providers));
@@ -8668,6 +8677,7 @@ void ast_channels_init(void)
ast_plc_reload();
ast_register_atexit(channels_shutdown);
+
}
/*! \brief Print call group and pickup group ---*/
@@ -11241,6 +11251,79 @@ int ast_channel_get_cc_agent_type(struct ast_channel *chan, char *agent_type, si
return 0;
}
+static void ast_channel_snapshot_dtor(void *obj)
+{
+ struct ast_channel_snapshot *snapshot = obj;
+ ast_string_field_free_memory(snapshot);
+}
+
+struct ast_channel_snapshot *ast_channel_snapshot_create(struct ast_channel *chan)
+{
+ RAII_VAR(struct ast_channel_snapshot *, snapshot, NULL, ao2_cleanup);
+
+ snapshot = ao2_alloc(sizeof(*snapshot), ast_channel_snapshot_dtor);
+ if (ast_string_field_init(snapshot, 1024)) {
+ return NULL;
+ }
+
+ ast_string_field_set(snapshot, name, ast_channel_name(chan));
+ ast_string_field_set(snapshot, accountcode, ast_channel_accountcode(chan));
+ ast_string_field_set(snapshot, peeraccount, ast_channel_peeraccount(chan));
+ ast_string_field_set(snapshot, userfield, ast_channel_userfield(chan));
+ ast_string_field_set(snapshot, uniqueid, ast_channel_uniqueid(chan));
+ ast_string_field_set(snapshot, linkedid, ast_channel_linkedid(chan));
+ ast_string_field_set(snapshot, parkinglot, ast_channel_parkinglot(chan));
+ ast_string_field_set(snapshot, hangupsource, ast_channel_hangupsource(chan));
+ if (ast_channel_appl(chan)) {
+ ast_string_field_set(snapshot, appl, ast_channel_appl(chan));
+ }
+ if (ast_channel_data(chan)) {
+ ast_string_field_set(snapshot, data, ast_channel_data(chan));
+ }
+ ast_string_field_set(snapshot, context, ast_channel_context(chan));
+ ast_string_field_set(snapshot, exten, ast_channel_exten(chan));
+
+ ast_string_field_set(snapshot, caller_name,
+ S_COR(ast_channel_caller(chan)->id.name.valid, ast_channel_caller(chan)->id.name.str, ""));
+ ast_string_field_set(snapshot, caller_number,
+ S_COR(ast_channel_caller(chan)->id.number.valid, ast_channel_caller(chan)->id.number.str, ""));
+
+ ast_string_field_set(snapshot, connected_name,
+ S_COR(ast_channel_connected(chan)->id.name.valid, ast_channel_connected(chan)->id.name.str, ""));
+ ast_string_field_set(snapshot, connected_number,
+ S_COR(ast_channel_connected(chan)->id.number.valid, ast_channel_connected(chan)->id.number.str, ""));
+
+ snapshot->creationtime = ast_channel_creationtime(chan);
+ snapshot->state = ast_channel_state(chan);
+ snapshot->priority = ast_channel_priority(chan);
+ snapshot->amaflags = ast_channel_amaflags(chan);
+ snapshot->hangupcause = ast_channel_hangupcause(chan);
+ snapshot->flags = *ast_channel_flags(chan);
+
+ ao2_ref(snapshot, +1);
+ return snapshot;
+}
+
+struct stasis_message_type *ast_channel_varset(void)
+{
+ return __channel_varset;
+}
+
+struct stasis_message_type *ast_channel_snapshot(void)
+{
+ return __channel_snapshot;
+}
+
+struct stasis_topic *ast_channel_topic_all(void)
+{
+ return __channel_topic_all;
+}
+
+struct stasis_caching_topic *ast_channel_topic_all_cached(void)
+{
+ return __channel_topic_all_cached;
+}
+
/* DO NOT PUT ADDITIONAL FUNCTIONS BELOW THIS BOUNDARY
*
* ONLY FUNCTIONS FOR PROVIDING BACKWARDS ABI COMPATIBILITY BELONG HERE
diff --git a/main/channel_internal_api.c b/main/channel_internal_api.c
index 3f892ddef..8cc2e6c62 100644
--- a/main/channel_internal_api.c
+++ b/main/channel_internal_api.c
@@ -195,6 +195,8 @@ struct ast_channel {
char dtmf_digit_to_emulate; /*!< Digit being emulated */
char sending_dtmf_digit; /*!< Digit this channel is currently sending out. (zero if not sending) */
struct timeval sending_dtmf_tv; /*!< The time this channel started sending the current digit. (Invalid if sending_dtmf_digit is zero.) */
+ struct stasis_topic *topic; /*!< Topic for all channel's events */
+ struct stasis_subscription *forwarder; /*!< Subscription for event forwarding to all topic */
};
/* AST_DATA definitions, which will probably have to be re-thought since the channel will be opaque */
@@ -1364,6 +1366,12 @@ void ast_channel_internal_cleanup(struct ast_channel *chan)
}
ast_string_field_free_memory(chan);
+
+ stasis_unsubscribe(chan->forwarder);
+ chan->forwarder = NULL;
+
+ ao2_cleanup(chan->topic);
+ chan->topic = NULL;
}
void ast_channel_internal_finalize(struct ast_channel *chan)
@@ -1375,3 +1383,16 @@ int ast_channel_internal_is_finalized(struct ast_channel *chan)
{
return chan->finalized;
}
+
+struct stasis_topic *ast_channel_topic(struct ast_channel *chan)
+{
+ return chan->topic;
+}
+
+void ast_channel_internal_setup_topics(struct ast_channel *chan)
+{
+ ast_assert(chan->topic == NULL);
+ ast_assert(chan->forwarder == NULL);
+ chan->topic = stasis_topic_create(chan->uniqueid);
+ chan->forwarder = stasis_forward_all(chan->topic, ast_channel_topic_all());
+}
diff --git a/main/manager.c b/main/manager.c
index fc0ec2631..10a3a3397 100644
--- a/main/manager.c
+++ b/main/manager.c
@@ -91,6 +91,7 @@ ASTERISK_FILE_VERSION(__FILE__, "$Revision$")
#include "asterisk/aoc.h"
#include "asterisk/stringfields.h"
#include "asterisk/presencestate.h"
+#include "asterisk/stasis.h"
/*** DOCUMENTATION
<manager name="Ping" language="en_US">
@@ -963,6 +964,73 @@ ASTERISK_FILE_VERSION(__FILE__, "$Revision$")
manager.conf will be present upon starting a new session.</para>
</description>
</manager>
+ <managerEvent language="en_US" name="Newchannel">
+ <managerEventInstance class="EVENT_FLAG_CALL">
+ <synopsis>Raised when a new channel is created.</synopsis>
+ <syntax>
+ <parameter name="Channel">
+ </parameter>
+ <parameter name="ChannelState">
+ <para>A numeric code for the channel's current state, related to ChannelStateDesc</para>
+ </parameter>
+ <parameter name="ChannelStateDesc">
+ <enumlist>
+ <enum name="Down"/>
+ <enum name="Rsrvd"/>
+ <enum name="OffHook"/>
+ <enum name="Dialing"/>
+ <enum name="Ring"/>
+ <enum name="Ringing"/>
+ <enum name="Up"/>
+ <enum name="Busy"/>
+ <enum name="Dialing Offhook"/>
+ <enum name="Pre-ring"/>
+ <enum name="Unknown"/>
+ </enumlist>
+ </parameter>
+ <parameter name="CallerIDNum">
+ </parameter>
+ <parameter name="CallerIDName">
+ </parameter>
+ <parameter name="ConnectedLineNum">
+ </parameter>
+ <parameter name="ConnectedLineName">
+ </parameter>
+ <parameter name="AccountCode">
+ </parameter>
+ <parameter name="Context">
+ </parameter>
+ <parameter name="Exten">
+ </parameter>
+ <parameter name="Priority">
+ </parameter>
+ <parameter name="Uniqueid">
+ </parameter>
+ <parameter name="Cause">
+ <para>A numeric cause code for why the channel was hung up.</para>
+ </parameter>
+ <parameter name="Cause-txt">
+ <para>A description of why the channel was hung up.</para>
+ </parameter>
+ </syntax>
+ </managerEventInstance>
+ </managerEvent>
+ <managerEvent language="en_US" name="Newstate">
+ <managerEventInstance class="EVENT_FLAG_CALL">
+ <synopsis>Raised when a channel's state changes.</synopsis>
+ <syntax>
+ <xi:include xpointer="xpointer(/docs/managerEvent[@name='Newchannel']/managerEventInstance/syntax/parameter)" />
+ </syntax>
+ </managerEventInstance>
+ </managerEvent>
+ <managerEvent language="en_US" name="Hangup">
+ <managerEventInstance class="EVENT_FLAG_CALL">
+ <synopsis>Raised when a channel is hung up.</synopsis>
+ <syntax>
+ <xi:include xpointer="xpointer(/docs/managerEvent[@name='Newchannel']/managerEventInstance/syntax/parameter)" />
+ </syntax>
+ </managerEventInstance>
+ </managerEvent>
***/
/*! \addtogroup Group_AMI AMI functions
@@ -1060,6 +1128,8 @@ static const struct {
{{ "restart", "gracefully", NULL }},
};
+static struct stasis_subscription *channel_state_sub;
+
static void acl_change_event_cb(const struct ast_event *event, void *userdata);
static void acl_change_event_subscribe(void)
@@ -7376,6 +7446,127 @@ static void load_channelvars(struct ast_variable *var)
AST_RWLIST_UNLOCK(&channelvars);
}
+/*!
+ * \brief Generate the AMI message body from a channel snapshot
+ * \internal
+ *
+ * \param snapshot the channel snapshot for which to generate an AMI message body
+ *
+ * \retval NULL on error
+ * \retval ast_str* on success (must be ast_freed by caller)
+ */
+static struct ast_str *manager_build_channel_state_string(const struct ast_channel_snapshot *snapshot)
+{
+ struct ast_str *out = ast_str_create(1024);
+ int res = 0;
+ if (!out) {
+ return NULL;
+ }
+ res = ast_str_set(&out, 0,
+ "Channel: %s\r\n"
+ "ChannelState: %d\r\n"
+ "ChannelStateDesc: %s\r\n"
+ "CallerIDNum: %s\r\n"
+ "CallerIDName: %s\r\n"
+ "ConnectedLineNum: %s\r\n"
+ "ConnectedLineName: %s\r\n"
+ "AccountCode: %s\r\n"
+ "Context: %s\r\n"
+ "Exten: %s\r\n"
+ "Priority: %d\r\n"
+ "Uniqueid: %s\r\n"
+ "Cause: %d\r\n"
+ "Cause-txt: %s\r\n",
+ snapshot->name,
+ snapshot->state,
+ ast_state2str(snapshot->state),
+ snapshot->caller_number,
+ snapshot->caller_name,
+ snapshot->connected_number,
+ snapshot->connected_name,
+ snapshot->accountcode,
+ snapshot->context,
+ snapshot->exten,
+ snapshot->priority,
+ snapshot->uniqueid,
+ snapshot->hangupcause,
+ ast_cause2str(snapshot->hangupcause));
+
+ if (!res) {
+ return NULL;
+ }
+
+ return out;
+}
+
+static void channel_snapshot_update(struct ast_channel_snapshot *old_snapshot, struct ast_channel_snapshot *new_snapshot)
+{
+ int is_hungup;
+ char *manager_event = NULL;
+
+ if (!new_snapshot) {
+ /* Ignore cache clearing events; we'll see the hangup first */
+ return;
+ }
+
+ is_hungup = ast_test_flag(&new_snapshot->flags, AST_FLAG_ZOMBIE) ? 1 : 0;
+
+ if (!old_snapshot) {
+ manager_event = "Newchannel";
+ }
+
+ if (old_snapshot && old_snapshot->state != new_snapshot->state) {
+ manager_event = "Newstate";
+ }
+
+ if (old_snapshot && is_hungup) {
+ manager_event = "Hangup";
+ }
+
+ if (manager_event) {
+ RAII_VAR(struct ast_str *, channel_event_string, NULL, ast_free);
+
+ channel_event_string = manager_build_channel_state_string(new_snapshot);
+ if (channel_event_string) {
+ manager_event(EVENT_FLAG_CALL, manager_event, "%s", ast_str_buffer(channel_event_string));
+ }
+ }
+}
+
+static void channel_varset(const char *channel_name, const char *uniqueid, const char *name, const char *value)
+{
+ /*** DOCUMENTATION
+ <managerEventInstance>
+ <synopsis>Raised when a variable is set to a particular value.</synopsis>
+ </managerEventInstance>
+ ***/
+ manager_event(EVENT_FLAG_DIALPLAN, "VarSet",
+ "Channel: %s\r\n"
+ "Variable: %s\r\n"
+ "Value: %s\r\n"
+ "Uniqueid: %s\r\n",
+ channel_name, name, value, uniqueid);
+}
+
+static void channel_event_cb(void *data, struct stasis_subscription *sub, struct stasis_topic *topic, struct stasis_message *message)
+{
+ if (stasis_message_type(message) == stasis_cache_update()) {
+ struct stasis_cache_update *update = stasis_message_data(message);
+ if (ast_channel_snapshot() == update->type) {
+ struct ast_channel_snapshot *old_snapshot =
+ stasis_message_data(update->old_snapshot);
+ struct ast_channel_snapshot *new_snapshot =
+ stasis_message_data(update->new_snapshot);
+ channel_snapshot_update(old_snapshot, new_snapshot);
+ }
+ } else if (stasis_message_type(message) == ast_channel_varset()) {
+ struct ast_channel_varset *varset = stasis_message_data(message);
+ const char *name = varset->snapshot ? varset->snapshot->name : "none";
+ const char *uniqueid = varset->snapshot ? varset->snapshot->uniqueid : "none";
+ channel_varset(name, uniqueid, varset->variable, varset->value);
+ }
+}
+
/*! \internal \brief Free a user record. Should already be removed from the list */
static void manager_free_user(struct ast_manager_user *user)
{
@@ -7399,6 +7590,9 @@ static void manager_shutdown(void)
{
struct ast_manager_user *user;
+ stasis_unsubscribe(channel_state_sub);
+ channel_state_sub = NULL;
+
if (registered) {
ast_manager_unregister("Ping");
ast_manager_unregister("Events");
@@ -7490,6 +7684,12 @@ static int __init_manager(int reload, int by_external_config)
manager_enabled = 0;
+ if (!channel_state_sub) {
+ channel_state_sub = stasis_subscribe(
+ stasis_caching_get_topic(ast_channel_topic_all_cached()),
+ channel_event_cb, NULL);
+ }
+
if (!registered) {
/* Register default actions */
ast_manager_register_xml_core("Ping", 0, action_ping);
diff --git a/main/pbx.c b/main/pbx.c
index bf95ccbe2..82bbb5257 100644
--- a/main/pbx.c
+++ b/main/pbx.c
@@ -11453,18 +11453,7 @@ int pbx_builtin_setvar_helper(struct ast_channel *chan, const char *name, const
ast_verb(2, "Setting global variable '%s' to '%s'\n", name, value);
newvariable = ast_var_assign(name, value);
AST_LIST_INSERT_HEAD(headp, newvariable, entries);
- /*** DOCUMENTATION
- <managerEventInstance>
- <synopsis>Raised when a variable is set to a particular value.</synopsis>
- </managerEventInstance>
- ***/
- manager_event(EVENT_FLAG_DIALPLAN, "VarSet",
- "Channel: %s\r\n"
- "Variable: %s\r\n"
- "Value: %s\r\n"
- "Uniqueid: %s\r\n",
- chan ? ast_channel_name(chan) : "none", name, value,
- chan ? ast_channel_uniqueid(chan) : "none");
+ ast_channel_publish_varset(chan, name, value);
}
if (chan)
diff --git a/main/stasis.c b/main/stasis.c
new file mode 100644
index 000000000..f94736bf1
--- /dev/null
+++ b/main/stasis.c
@@ -0,0 +1,514 @@
+/*
+ * Asterisk -- An open source telephony toolkit.
+ *
+ * Copyright (C) 2013, Digium, Inc.
+ *
+ * David M. Lee, II <dlee@digium.com>
+ *
+ * See http://www.asterisk.org for more information about
+ * the Asterisk project. Please do not directly contact
+ * any of the maintainers of this project for assistance;
+ * the project provides a web site, mailing lists and IRC
+ * channels for your use.
+ *
+ * This program is free software, distributed under the terms of
+ * the GNU General Public License Version 2. See the LICENSE file
+ * at the top of the source tree.
+ */
+
+/*! \file
+ *
+ * \brief Stasis Message Bus API.
+ *
+ * \author David M. Lee, II <dlee@digium.com>
+ */
+
+/*** MODULEINFO
+ <support_level>core</support_level>
+ ***/
+
+#include "asterisk.h"
+
+ASTERISK_FILE_VERSION(__FILE__, "$Revision$")
+
+#include "asterisk/astobj2.h"
+#include "asterisk/stasis.h"
+#include "asterisk/threadpool.h"
+#include "asterisk/taskprocessor.h"
+#include "asterisk/utils.h"
+#include "asterisk/uuid.h"
+
+/*! Initial size of the subscribers list. */
+#define INITIAL_SUBSCRIBERS_MAX 4
+
+/*! Threadpool for dispatching notifications to subscribers */
+static struct ast_threadpool *pool;
+
+static struct stasis_message_type *__subscription_change_message_type;
+
+/*! \private */
+struct stasis_topic {
+ char *name;
+ /*! Variable length array of the subscribers (raw pointer to avoid cyclic references) */
+ struct stasis_subscription **subscribers;
+ /*! Allocated length of the subscribers array */
+ size_t num_subscribers_max;
+ /*! Current size of the subscribers array */
+ size_t num_subscribers_current;
+};
+
+/* Forward declarations for the tightly-coupled subscription object */
+struct stasis_subscription;
+static int topic_add_subscription(struct stasis_topic *topic, struct stasis_subscription *sub);
+
+static void topic_dtor(void *obj)
+{
+ struct stasis_topic *topic = obj;
+ ast_free(topic->name);
+ topic->name = NULL;
+ ast_free(topic->subscribers);
+ topic->subscribers = NULL;
+}
+
+struct stasis_topic *stasis_topic_create(const char *name)
+{
+ RAII_VAR(struct stasis_topic *, topic, NULL, ao2_cleanup);
+
+ topic = ao2_alloc(sizeof(*topic), topic_dtor);
+
+ if (!topic) {
+ return NULL;
+ }
+
+ topic->name = ast_strdup(name);
+ if (!topic->name) {
+ return NULL;
+ }
+
+ topic->num_subscribers_max = INITIAL_SUBSCRIBERS_MAX;
+ topic->subscribers = ast_calloc(topic->num_subscribers_max, sizeof(topic->subscribers));
+ if (!topic->subscribers) {
+ return NULL;
+ }
+
+ ao2_ref(topic, +1);
+ return topic;
+}
+
+const char *stasis_topic_name(const struct stasis_topic *topic)
+{
+ return topic->name;
+}
+
+/*! \private */
+struct stasis_subscription {
+ /*! Unique ID for this subscription */
+ char *uniqueid;
+ /*! Topic subscribed to. */
+ struct stasis_topic *topic;
+ /*! Mailbox for processing incoming messages. */
+ struct ast_taskprocessor *mailbox;
+ /*! Callback function for incoming message processing. */
+ stasis_subscription_cb callback;
+ /*! Data pointer to be handed to the callback. */
+ void *data;
+};
+
+static void subscription_dtor(void *obj)
+{
+ struct stasis_subscription *sub = obj;
+ ast_assert(!stasis_subscription_is_subscribed(sub));
+ ast_free(sub->uniqueid);
+ sub->uniqueid = NULL;
+ ao2_cleanup(sub->topic);
+ sub->topic = NULL;
+ ast_taskprocessor_unreference(sub->mailbox);
+ sub->mailbox = NULL;
+}
+
+static void send_subscription_change_message(struct stasis_topic *topic, char *uniqueid, char *description);
+
+static struct stasis_subscription *__stasis_subscribe(struct stasis_topic *topic, stasis_subscription_cb callback, void *data, int needs_mailbox)
+{
+ RAII_VAR(struct stasis_subscription *, sub, NULL, ao2_cleanup);
+ RAII_VAR(struct ast_uuid *, id, NULL, ast_free);
+ char uniqueid[AST_UUID_STR_LEN];
+
+ sub = ao2_alloc(sizeof(*sub), subscription_dtor);
+ if (!sub) {
+ return NULL;
+ }
+
+ id = ast_uuid_generate();
+ if (!id) {
+ ast_log(LOG_ERROR, "UUID generation failed\n");
+ return NULL;
+ }
+ ast_uuid_to_str(id, uniqueid, sizeof(uniqueid));
+ if (needs_mailbox) {
+ sub->mailbox = ast_threadpool_serializer(uniqueid, pool);
+ if (!sub->mailbox) {
+ return NULL;
+ }
+ }
+
+ sub->uniqueid = ast_strdup(uniqueid);
+ ao2_ref(topic, +1);
+ sub->topic = topic;
+ sub->callback = callback;
+ sub->data = data;
+
+ if (topic_add_subscription(topic, sub) != 0) {
+ return NULL;
+ }
+ send_subscription_change_message(topic, uniqueid, "Subscribe");
+
+ ao2_ref(sub, +1);
+ return sub;
+}
+
+struct stasis_subscription *stasis_subscribe(struct stasis_topic *topic, stasis_subscription_cb callback, void *data)
+{
+ return __stasis_subscribe(topic, callback, data, 1);
+}
+
+void stasis_unsubscribe(struct stasis_subscription *sub)
+{
+ if (sub) {
+ size_t i;
+ struct stasis_topic *topic = sub->topic;
+ SCOPED_AO2LOCK(lock_topic, topic);
+
+ for (i = 0; i < topic->num_subscribers_current; ++i) {
+ if (topic->subscribers[i] == sub) {
+ send_subscription_change_message(topic, sub->uniqueid, "Unsubscribe");
+ /* swap [i] with last entry; remove last entry */
+ topic->subscribers[i] = topic->subscribers[--topic->num_subscribers_current];
+ /* Unsubscribing unrefs the subscription */
+ ao2_cleanup(sub);
+ return;
+ }
+ }
+
+ ast_log(LOG_ERROR, "Internal error: subscription has invalid topic\n");
+ }
+}
+
+int stasis_subscription_is_subscribed(const struct stasis_subscription *sub)
+{
+ if (sub) {
+ size_t i;
+ struct stasis_topic *topic = sub->topic;
+ SCOPED_AO2LOCK(lock_topic, topic);
+
+ for (i = 0; i < topic->num_subscribers_current; ++i) {
+ if (topic->subscribers[i] == sub) {
+ return 1;
+ }
+ }
+ }
+
+ return 0;
+}
+
+const char *stasis_subscription_uniqueid(const struct stasis_subscription *sub)
+{
+ return sub->uniqueid;
+}
+
+int stasis_subscription_final_message(struct stasis_subscription *sub, struct stasis_message *msg)
+{
+ struct stasis_subscription_change *change;
+ if (stasis_message_type(msg) != stasis_subscription_change()) {
+ return 0;
+ }
+
+ change = stasis_message_data(msg);
+ if (strcmp("Unsubscribe", change->description)) {
+ return 0;
+ }
+
+ if (strcmp(stasis_subscription_uniqueid(sub), change->uniqueid)) {
+ return 0;
+ }
+
+ return 1;
+}
+
+/*!
+ * \brief Add a subscriber to a topic.
+ * \param topic Topic
+ * \param sub Subscriber
+ * \return 0 on success
+ * \return Non-zero on error
+ */
+static int topic_add_subscription(struct stasis_topic *topic, struct stasis_subscription *sub)
+{
+ struct stasis_subscription **subscribers;
+ SCOPED_AO2LOCK(lock, topic);
+
+ /* Increase list size, if needed */
+ if (topic->num_subscribers_current + 1 > topic->num_subscribers_max) {
+ subscribers = realloc(topic->subscribers, 2 * topic->num_subscribers_max * sizeof(*subscribers));
+ if (!subscribers) {
+ return -1;
+ }
+ topic->subscribers = subscribers;
+ topic->num_subscribers_max *= 2;
+ }
+
+ /* Don't ref sub here or we'll cause a reference cycle. */
+ topic->subscribers[topic->num_subscribers_current++] = sub;
+ return 0;
+}
+
+/*!
+ * \private
+ * \brief Information needed to dispatch a message to a subscription
+ */
+struct dispatch {
+ /*! Topic message was published to */
+ struct stasis_topic *topic;
+ /*! The message itself */
+ struct stasis_message *message;
+ /*! Subscription receiving the message */
+ struct stasis_subscription *sub;
+};
+
+static void dispatch_dtor(void *data)
+{
+ struct dispatch *dispatch = data;
+ ao2_cleanup(dispatch->topic);
+ ao2_cleanup(dispatch->message);
+ ao2_cleanup(dispatch->sub);
+}
+
+static struct dispatch *dispatch_create(struct stasis_topic *topic, struct stasis_message *message, struct stasis_subscription *sub)
+{
+ RAII_VAR(struct dispatch *, dispatch, NULL, ao2_cleanup);
+
+ ast_assert(topic != NULL);
+ ast_assert(message != NULL);
+ ast_assert(sub != NULL);
+
+ dispatch = ao2_alloc(sizeof(*dispatch), dispatch_dtor);
+ if (!dispatch) {
+ return NULL;
+ }
+
+ dispatch->topic = topic;
+ ao2_ref(topic, +1);
+
+ dispatch->message = message;
+ ao2_ref(message, +1);
+
+ dispatch->sub = sub;
+ ao2_ref(sub, +1);
+
+ ao2_ref(dispatch, +1);
+ return dispatch;
+}
+
+/*!
+ * \brief Dispatch a message to a subscriber
+ * \param data \ref dispatch object
+ * \return 0
+ */
+static int dispatch_exec(void *data)
+{
+ RAII_VAR(struct dispatch *, dispatch, data, ao2_cleanup);
+ RAII_VAR(struct stasis_topic *, sub_topic, NULL, ao2_cleanup);
+
+ /* Since sub->topic doesn't change, no need to lock sub */
+ ast_assert(dispatch->sub->topic != NULL);
+ ao2_ref(dispatch->sub->topic, +1);
+ sub_topic = dispatch->sub->topic;
+
+ dispatch->sub->callback(dispatch->sub->data,
+ dispatch->sub,
+ sub_topic,
+ dispatch->message);
+
+ return 0;
+}
+
+void stasis_forward_message(struct stasis_topic *topic, struct stasis_topic *publisher_topic, struct stasis_message *message)
+{
+ struct stasis_subscription **subscribers = NULL;
+ size_t num_subscribers, i;
+
+ /* Copy the subscribers, so we don't have to hold the mutex for long */
+ {
+ SCOPED_AO2LOCK(lock, topic);
+ num_subscribers = topic->num_subscribers_current;
+ subscribers = ast_malloc(num_subscribers * sizeof(*subscribers));
+ if (subscribers) {
+ for (i = 0; i < num_subscribers; ++i) {
+ ao2_ref(topic->subscribers[i], +1);
+ subscribers[i] = topic->subscribers[i];
+ }
+ }
+ }
+
+ if (!subscribers) {
+ ast_log(LOG_ERROR, "Dropping message\n");
+ return;
+ }
+
+ for (i = 0; i < num_subscribers; ++i) {
+ struct stasis_subscription *sub = subscribers[i];
+
+ ast_assert(sub != NULL);
+
+ if (sub->mailbox) {
+ RAII_VAR(struct dispatch *, dispatch, NULL, ao2_cleanup);
+
+ dispatch = dispatch_create(publisher_topic, message, sub);
+ if (!dispatch) {
+ ast_log(LOG_DEBUG, "Dropping dispatch\n");
+ break;
+ }
+
+ if (ast_taskprocessor_push(sub->mailbox, dispatch_exec, dispatch) == 0) {
+ dispatch = NULL; /* Ownership transferred to mailbox */
+ }
+ } else {
+ /* No mailbox; dispatch directly */
+ sub->callback(sub->data, sub, sub->topic, message);
+ }
+ }
+
+ for (i = 0; i < num_subscribers; ++i) {
+ ao2_cleanup(subscribers[i]);
+ }
+ ast_free(subscribers);
+}
+
+void stasis_publish(struct stasis_topic *topic, struct stasis_message *message)
+{
+ stasis_forward_message(topic, topic, message);
+}
+
+/*! \brief Forwarding subscriber */
+static void stasis_forward_cb(void *data, struct stasis_subscription *sub, struct stasis_topic *topic, struct stasis_message *message)
+{
+ struct stasis_topic *to_topic = data;
+ stasis_forward_message(to_topic, topic, message);
+
+ if (stasis_subscription_final_message(sub, message)) {
+ ao2_cleanup(to_topic);
+ }
+}
+
+struct stasis_subscription *stasis_forward_all(struct stasis_topic *from_topic, struct stasis_topic *to_topic)
+{
+ struct stasis_subscription *sub;
+ if (!from_topic || !to_topic) {
+ return NULL;
+ }
+ /* Subscribe without a mailbox, since we're just forwarding messages */
+ sub = __stasis_subscribe(from_topic, stasis_forward_cb, to_topic, 0);
+ if (sub) {
+ /* hold a ref to to_topic for this forwarding subscription */
+ ao2_ref(to_topic, +1);
+ }
+ return sub;
+}
+
+static void subscription_change_dtor(void *obj)
+{
+ struct stasis_subscription_change *change = obj;
+ ast_string_field_free_memory(change);
+ ao2_cleanup(change->topic);
+}
+
+static struct stasis_subscription_change *subscription_change_alloc(struct stasis_topic *topic, char *uniqueid, char *description)
+{
+ RAII_VAR(struct stasis_subscription_change *, change, NULL, ao2_cleanup);
+
+ change = ao2_alloc(sizeof(struct stasis_subscription_change), subscription_change_dtor);
+ if (ast_string_field_init(change, 128)) {
+ return NULL;
+ }
+
+ ast_string_field_set(change, uniqueid, uniqueid);
+ ast_string_field_set(change, description, description);
+ ao2_ref(topic, +1);
+ change->topic = topic;
+
+ ao2_ref(change, +1);
+ return change;
+}
+
+struct stasis_message_type *stasis_subscription_change(void)
+{
+ return __subscription_change_message_type;
+}
+
+static void send_subscription_change_message(struct stasis_topic *topic, char *uniqueid, char *description)
+{
+ RAII_VAR(struct stasis_subscription_change *, change, NULL, ao2_cleanup);
+ RAII_VAR(struct stasis_message *, msg, NULL, ao2_cleanup);
+
+ change = subscription_change_alloc(topic, uniqueid, description);
+
+ if (!change) {
+ return;
+ }
+
+ msg = stasis_message_create(stasis_subscription_change(), change);
+
+ if (!msg) {
+ return;
+ }
+
+ stasis_publish(topic, msg);
+}
+
+/*! \brief Cleanup function */
+static void stasis_exit(void)
+{
+ ao2_cleanup(__subscription_change_message_type);
+ __subscription_change_message_type = NULL;
+ ast_threadpool_shutdown(pool);
+ pool = NULL;
+}
+
+int stasis_init(void)
+{
+ int cache_init;
+
+ /* XXX Should this be configurable? */
+ struct ast_threadpool_options opts = {
+ .version = AST_THREADPOOL_OPTIONS_VERSION,
+ .idle_timeout = 20,
+ .auto_increment = 1,
+ .initial_size = 0,
+ .max_size = 200
+ };
+
+ ast_register_atexit(stasis_exit);
+
+ if (pool) {
+ ast_log(LOG_ERROR, "Stasis double-initialized\n");
+ return -1;
+ }
+
+ pool = ast_threadpool_create("stasis-core", NULL, &opts);
+ if (!pool) {
+ ast_log(LOG_ERROR, "Stasis threadpool allocation failed\n");
+ return -1;
+ }
+
+ cache_init = stasis_cache_init();
+ if (cache_init != 0) {
+ return -1;
+ }
+
+ __subscription_change_message_type = stasis_message_type_create("stasis_subscription_change");
+ if (!__subscription_change_message_type) {
+ return -1;
+ }
+
+ return 0;
+}
diff --git a/main/stasis_cache.c b/main/stasis_cache.c
new file mode 100644
index 000000000..2f4cf52fd
--- /dev/null
+++ b/main/stasis_cache.c
@@ -0,0 +1,443 @@
+/*
+ * Asterisk -- An open source telephony toolkit.
+ *
+ * Copyright (C) 2013, Digium, Inc.
+ *
+ * David M. Lee, II <dlee@digium.com>
+ *
+ * See http://www.asterisk.org for more information about
+ * the Asterisk project. Please do not directly contact
+ * any of the maintainers of this project for assistance;
+ * the project provides a web site, mailing lists and IRC
+ * channels for your use.
+ *
+ * This program is free software, distributed under the terms of
+ * the GNU General Public License Version 2. See the LICENSE file
+ * at the top of the source tree.
+ */
+
+/*! \file
+ *
+ * \brief Stasis Message API.
+ *
+ * \author David M. Lee, II <dlee@digium.com>
+ */
+
+/*** MODULEINFO
+ <support_level>core</support_level>
+ ***/
+
+#include "asterisk.h"
+
+ASTERISK_FILE_VERSION(__FILE__, "$Revision$")
+
+#include "asterisk/astobj2.h"
+#include "asterisk/hashtab.h"
+#include "asterisk/stasis.h"
+#include "asterisk/utils.h"
+
+#ifdef LOW_MEMORY
+#define NUM_CACHE_BUCKETS 17
+#else
+#define NUM_CACHE_BUCKETS 563
+#endif
+
+struct stasis_caching_topic {
+ struct ao2_container *cache;
+ struct stasis_topic *topic;
+ struct stasis_subscription *sub;
+ snapshot_get_id id_fn;
+};
+
+static void stasis_caching_topic_dtor(void *obj) {
+ struct stasis_caching_topic *caching_topic = obj;
+ ast_assert(!stasis_subscription_is_subscribed(caching_topic->sub));
+ caching_topic->sub = NULL;
+ ao2_cleanup(caching_topic->cache);
+ caching_topic->cache = NULL;
+ ao2_cleanup(caching_topic->topic);
+ caching_topic->topic = NULL;
+}
+
+struct stasis_topic *stasis_caching_get_topic(struct stasis_caching_topic *caching_topic)
+{
+ return caching_topic->topic;
+}
+
+void stasis_caching_unsubscribe(struct stasis_caching_topic *caching_topic)
+{
+ if (caching_topic) {
+ if (stasis_subscription_is_subscribed(caching_topic->sub)) {
+ stasis_unsubscribe(caching_topic->sub);
+ } else {
+ ast_log(LOG_ERROR, "stasis_caching_topic unsubscribed multiple times\n");
+ }
+ }
+}
+
+struct cache_entry {
+ struct stasis_message_type *type;
+ char *id;
+ struct stasis_message *snapshot;
+};
+
+static void cache_entry_dtor(void *obj)
+{
+ struct cache_entry *entry = obj;
+ ao2_cleanup(entry->type);
+ entry->type = NULL;
+ ast_free(entry->id);
+ entry->id = NULL;
+ ao2_cleanup(entry->snapshot);
+ entry->snapshot = NULL;
+}
+
+static struct cache_entry *cache_entry_create(struct stasis_message_type *type, const char *id, struct stasis_message *snapshot)
+{
+ RAII_VAR(struct cache_entry *, entry, NULL, ao2_cleanup);
+
+ ast_assert(type != NULL);
+ ast_assert(id != NULL);
+
+ entry = ao2_alloc(sizeof(*entry), cache_entry_dtor);
+ if (!entry) {
+ return NULL;
+ }
+
+ entry->id = ast_strdup(id);
+ if (!entry->id) {
+ return NULL;
+ }
+
+ ao2_ref(type, +1);
+ entry->type = type;
+ if (snapshot != NULL) {
+ ao2_ref(snapshot, +1);
+ entry->snapshot = snapshot;
+ }
+
+ ao2_ref(entry, +1);
+ return entry;
+}
+
+static int cache_entry_hash(const void *obj, int flags)
+{
+ const struct cache_entry *entry = obj;
+ int hash = 0;
+
+ ast_assert(!(flags & OBJ_KEY));
+
+ hash += ast_hashtab_hash_string(stasis_message_type_name(entry->type));
+ hash += ast_hashtab_hash_string(entry->id);
+ return hash;
+}
+
+static int cache_entry_cmp(void *obj, void *arg, int flags)
+{
+ const struct cache_entry *left = obj;
+ const struct cache_entry *right = arg;
+
+ ast_assert(!(flags & OBJ_KEY));
+
+ if (left->type == right->type && strcmp(left->id, right->id) == 0) {
+ return CMP_MATCH | CMP_STOP;
+ }
+
+ return 0;
+}
+
+static struct stasis_message *cache_put(struct stasis_caching_topic *caching_topic, struct stasis_message_type *type, const char *id, struct stasis_message *new_snapshot)
+{
+ RAII_VAR(struct cache_entry *, new_entry, NULL, ao2_cleanup);
+ RAII_VAR(struct cache_entry *, cached_entry, NULL, ao2_cleanup);
+ struct stasis_message *old_snapshot = NULL;
+
+ ast_assert(caching_topic->cache != NULL);
+
+ new_entry = cache_entry_create(type, id, new_snapshot);
+
+ if (new_snapshot == NULL) {
+ /* Remove entry from cache */
+ cached_entry = ao2_find(caching_topic->cache, new_entry, OBJ_POINTER | OBJ_UNLINK);
+ if (cached_entry) {
+ old_snapshot = cached_entry->snapshot;
+ cached_entry->snapshot = NULL;
+ }
+ } else {
+ /* Insert/update cache */
+ SCOPED_AO2LOCK(lock, caching_topic->cache);
+
+ cached_entry = ao2_find(caching_topic->cache, new_entry, OBJ_POINTER | OBJ_NOLOCK);
+ if (cached_entry) {
+ /* Update cache. Because objects are moving, no need to update refcounts. */
+ old_snapshot = cached_entry->snapshot;
+ cached_entry->snapshot = new_entry->snapshot;
+ new_entry->snapshot = NULL;
+ } else {
+ /* Insert into the cache */
+ ao2_link_flags(caching_topic->cache, new_entry, OBJ_NOLOCK);
+ }
+
+ }
+
+ return old_snapshot;
+}
+
+struct stasis_message *stasis_cache_get(struct stasis_caching_topic *caching_topic, struct stasis_message_type *type, const char *id)
+{
+ RAII_VAR(struct cache_entry *, search_entry, NULL, ao2_cleanup);
+ RAII_VAR(struct cache_entry *, cached_entry, NULL, ao2_cleanup);
+
+ ast_assert(caching_topic->cache != NULL);
+
+ search_entry = cache_entry_create(type, id, NULL);
+ if (search_entry == NULL) {
+ return NULL;
+ }
+
+ cached_entry = ao2_find(caching_topic->cache, search_entry, OBJ_POINTER);
+ if (cached_entry == NULL) {
+ return NULL;
+ }
+
+ ast_assert(cached_entry->snapshot != NULL);
+ ao2_ref(cached_entry->snapshot, +1);
+ return cached_entry->snapshot;
+}
+
+static struct stasis_message_type *__cache_clear_data;
+
+static struct stasis_message_type *cache_clear_data(void)
+{
+ ast_assert(__cache_clear_data != NULL);
+ return __cache_clear_data;
+}
+
+static struct stasis_message_type *__cache_update;
+
+struct stasis_message_type *stasis_cache_update(void)
+{
+ ast_assert(__cache_update != NULL);
+ return __cache_update;
+}
+
+struct cache_clear_data {
+ struct stasis_message_type *type;
+ char *id;
+};
+
+static void cache_clear_data_dtor(void *obj)
+{
+ struct cache_clear_data *ev = obj;
+ ast_free(ev->id);
+ ev->id = NULL;
+ ao2_cleanup(ev->type);
+ ev->type = NULL;
+}
+
+struct stasis_message *stasis_cache_clear_create(struct stasis_message_type *type, const char *id)
+{
+ RAII_VAR(struct cache_clear_data *, ev, NULL, ao2_cleanup);
+ RAII_VAR(struct stasis_message *, msg, NULL, ao2_cleanup);
+
+ ev = ao2_alloc(sizeof(*ev), cache_clear_data_dtor);
+ if (!ev) {
+ return NULL;
+ }
+
+ ev->id = ast_strdup(id);
+ if (!ev->id) {
+ return NULL;
+ }
+ ao2_ref(type, +1);
+ ev->type = type;
+
+ msg = stasis_message_create(cache_clear_data(), ev);
+
+ if (!msg) {
+ return NULL;
+ }
+
+ ao2_ref(msg, +1);
+ return msg;
+}
+
+static void stasis_cache_update_dtor(void *obj)
+{
+ struct stasis_cache_update *update = obj;
+ ao2_cleanup(update->topic);
+ update->topic = NULL;
+ ao2_cleanup(update->old_snapshot);
+ update->old_snapshot = NULL;
+ ao2_cleanup(update->new_snapshot);
+ update->new_snapshot = NULL;
+ ao2_cleanup(update->type);
+ update->type = NULL;
+}
+
+static struct stasis_message *update_create(struct stasis_topic *topic, struct stasis_message *old_snapshot, struct stasis_message *new_snapshot)
+{
+ RAII_VAR(struct stasis_cache_update *, update, NULL, ao2_cleanup);
+ RAII_VAR(struct stasis_message *, msg, NULL, ao2_cleanup);
+
+ ast_assert(topic != NULL);
+ ast_assert(old_snapshot != NULL || new_snapshot != NULL);
+
+ update = ao2_alloc(sizeof(*update), stasis_cache_update_dtor);
+ if (!update) {
+ return NULL;
+ }
+
+ ao2_ref(topic, +1);
+ update->topic = topic;
+ if (old_snapshot) {
+ ao2_ref(old_snapshot, +1);
+ update->old_snapshot = old_snapshot;
+ if (!new_snapshot) {
+ ao2_ref(stasis_message_type(old_snapshot), +1);
+ update->type = stasis_message_type(old_snapshot);
+ }
+ }
+ if (new_snapshot) {
+ ao2_ref(new_snapshot, +1);
+ update->new_snapshot = new_snapshot;
+ ao2_ref(stasis_message_type(new_snapshot), +1);
+ update->type = stasis_message_type(new_snapshot);
+ }
+
+ msg = stasis_message_create(stasis_cache_update(), update);
+ if (!msg) {
+ return NULL;
+ }
+
+ ao2_ref(msg, +1);
+ return msg;
+}
+
+static void caching_topic_exec(void *data, struct stasis_subscription *sub, struct stasis_topic *topic, struct stasis_message *message)
+{
+ RAII_VAR(struct stasis_caching_topic *, caching_topic_needs_unref, NULL, ao2_cleanup);
+ struct stasis_caching_topic *caching_topic = data;
+ const char *id = NULL;
+
+ ast_assert(caching_topic->topic != NULL);
+ ast_assert(caching_topic->id_fn != NULL);
+
+ if (stasis_subscription_final_message(sub, message)) {
+ caching_topic_needs_unref = caching_topic;
+ }
+
+ /* Handle cache clear event */
+ if (cache_clear_data() == stasis_message_type(message)) {
+ RAII_VAR(struct stasis_message *, old_snapshot, NULL, ao2_cleanup);
+ RAII_VAR(struct stasis_message *, update, NULL, ao2_cleanup);
+ struct cache_clear_data *clear = stasis_message_data(message);
+ ast_assert(clear->type != NULL);
+ ast_assert(clear->id != NULL);
+ old_snapshot = cache_put(caching_topic, clear->type, clear->id, NULL);
+ if (old_snapshot) {
+ update = update_create(topic, old_snapshot, NULL);
+ stasis_publish(caching_topic->topic, update);
+ } else {
+ ast_log(LOG_ERROR,
+ "Attempting to remove an item from the cache that isn't there: %s %s\n",
+ stasis_message_type_name(clear->type), clear->id);
+ }
+ return;
+ }
+
+ id = caching_topic->id_fn(message);
+ if (id == NULL) {
+ /* Object isn't cached; forward */
+ stasis_forward_message(caching_topic->topic, topic, message);
+ } else {
+ /* Update the cache */
+ RAII_VAR(struct stasis_message *, old_snapshot, NULL, ao2_cleanup);
+ RAII_VAR(struct stasis_message *, update, NULL, ao2_cleanup);
+
+ old_snapshot = cache_put(caching_topic, stasis_message_type(message), id, message);
+
+ update = update_create(topic, old_snapshot, message);
+ if (update == NULL) {
+ return;
+ }
+
+ stasis_publish(caching_topic->topic, update);
+ }
+
+ if (stasis_subscription_final_message(sub, message)) {
+ ao2_cleanup(caching_topic);
+ }
+}
+
+struct stasis_caching_topic *stasis_caching_topic_create(struct stasis_topic *original_topic, snapshot_get_id id_fn)
+{
+ RAII_VAR(struct stasis_caching_topic *, caching_topic, NULL, ao2_cleanup);
+ struct stasis_subscription *sub;
+ RAII_VAR(char *, new_name, NULL, free);
+ int ret;
+
+ ret = asprintf(&new_name, "%s-cached", stasis_topic_name(original_topic));
+ if (ret < 0) {
+ return NULL;
+ }
+
+ caching_topic = ao2_alloc(sizeof(*caching_topic), stasis_caching_topic_dtor);
+ if (caching_topic == NULL) {
+ return NULL;
+ }
+
+ caching_topic->cache = ao2_container_alloc(NUM_CACHE_BUCKETS, cache_entry_hash, cache_entry_cmp);
+ if (!caching_topic->cache) {
+ ast_log(LOG_ERROR, "Stasis cache allocation failed\n");
+ return NULL;
+ }
+
+ caching_topic->topic = stasis_topic_create(new_name);
+ if (caching_topic->topic == NULL) {
+ return NULL;
+ }
+
+ caching_topic->id_fn = id_fn;
+
+ sub = stasis_subscribe(original_topic, caching_topic_exec, caching_topic);
+ if (sub == NULL) {
+ return NULL;
+ }
+ /* This is for the reference contained in the subscription above */
+ ao2_ref(caching_topic, +1);
+ caching_topic->sub = sub;
+
+ ao2_ref(caching_topic, +1);
+ return caching_topic;
+}
+
+static void stasis_cache_exit(void)
+{
+ ao2_cleanup(__cache_clear_data);
+ __cache_clear_data = NULL;
+ ao2_cleanup(__cache_update);
+ __cache_update = NULL;
+}
+
+int stasis_cache_init(void)
+{
+ ast_register_atexit(stasis_cache_exit);
+
+ if (__cache_clear_data || __cache_update) {
+ ast_log(LOG_ERROR, "Stasis cache double initialized\n");
+ return -1;
+ }
+
+ __cache_update = stasis_message_type_create("stasis_cache_update");
+ if (!__cache_update) {
+ return -1;
+ }
+
+ __cache_clear_data = stasis_message_type_create("StasisCacheClear");
+ if (!__cache_clear_data) {
+ return -1;
+ }
+ return 0;
+}
+
diff --git a/main/stasis_message.c b/main/stasis_message.c
new file mode 100644
index 000000000..8d397b935
--- /dev/null
+++ b/main/stasis_message.c
@@ -0,0 +1,135 @@
+/*
+ * Asterisk -- An open source telephony toolkit.
+ *
+ * Copyright (C) 2013, Digium, Inc.
+ *
+ * David M. Lee, II <dlee@digium.com>
+ *
+ * See http://www.asterisk.org for more information about
+ * the Asterisk project. Please do not directly contact
+ * any of the maintainers of this project for assistance;
+ * the project provides a web site, mailing lists and IRC
+ * channels for your use.
+ *
+ * This program is free software, distributed under the terms of
+ * the GNU General Public License Version 2. See the LICENSE file
+ * at the top of the source tree.
+ */
+
+/*! \file
+ *
+ * \brief Stasis Message API.
+ *
+ * \author David M. Lee, II <dlee@digium.com>
+ */
+
+/*** MODULEINFO
+ <support_level>core</support_level>
+ ***/
+
+#include "asterisk.h"
+
+ASTERISK_FILE_VERSION(__FILE__, "$Revision$")
+
+#include "asterisk/astobj2.h"
+#include "asterisk/stasis.h"
+#include "asterisk/utils.h"
+
+/*! \private */
+struct stasis_message_type {
+ char *name;
+};
+
+static void message_type_dtor(void *obj)
+{
+ struct stasis_message_type *type = obj;
+ ast_free(type->name);
+ type->name = NULL;
+}
+
+struct stasis_message_type *stasis_message_type_create(const char *name)
+{
+ RAII_VAR(struct stasis_message_type *, type, NULL, ao2_cleanup);
+
+ type = ao2_alloc(sizeof(*type), message_type_dtor);
+ if (!type) {
+ return NULL;
+ }
+
+ type->name = ast_strdup(name);
+ if (!type->name) {
+ return NULL;
+ }
+
+ ao2_ref(type, +1);
+ return type;
+}
+
+const char *stasis_message_type_name(const struct stasis_message_type *type)
+{
+ return type->name;
+}
+
+/*! \private */
+struct stasis_message {
+ /*! Time the message was created */
+ struct timeval timestamp;
+ /*! Type of the message */
+ struct stasis_message_type *type;
+ /*! Message content */
+ void *data;
+};
+
+static void stasis_message_dtor(void *obj)
+{
+ struct stasis_message *message = obj;
+ ao2_cleanup(message->type);
+ ao2_cleanup(message->data);
+}
+
+struct stasis_message *stasis_message_create(struct stasis_message_type *type, void *data)
+{
+ RAII_VAR(struct stasis_message *, message, NULL, ao2_cleanup);
+
+ if (type == NULL || data == NULL) {
+ return NULL;
+ }
+
+ message = ao2_alloc(sizeof(*message), stasis_message_dtor);
+ if (message == NULL) {
+ return NULL;
+ }
+
+ message->timestamp = ast_tvnow();
+ ao2_ref(type, +1);
+ message->type = type;
+ ao2_ref(data, +1);
+ message->data = data;
+
+ ao2_ref(message, +1);
+ return message;
+}
+
+struct stasis_message_type *stasis_message_type(const struct stasis_message *msg)
+{
+ if (msg == NULL) {
+ return NULL;
+ }
+ return msg->type;
+}
+
+void *stasis_message_data(const struct stasis_message *msg)
+{
+ if (msg == NULL) {
+ return NULL;
+ }
+ return msg->data;
+}
+
+const struct timeval *stasis_message_timestamp(const struct stasis_message *msg)
+{
+ if (msg == NULL) {
+ return NULL;
+ }
+ return &msg->timestamp;
+}