diff options
Diffstat (limited to 'main')
-rw-r--r-- | main/asterisk.c | 6 | ||||
-rw-r--r-- | main/asterisk.exports.in | 1 | ||||
-rw-r--r-- | main/channel.c | 287 | ||||
-rw-r--r-- | main/channel_internal_api.c | 21 | ||||
-rw-r--r-- | main/manager.c | 200 | ||||
-rw-r--r-- | main/pbx.c | 13 | ||||
-rw-r--r-- | main/stasis.c | 514 | ||||
-rw-r--r-- | main/stasis_cache.c | 443 | ||||
-rw-r--r-- | main/stasis_message.c | 135 |
9 files changed, 1506 insertions, 114 deletions
diff --git a/main/asterisk.c b/main/asterisk.c index 1d5371938..4e5e58c05 100644 --- a/main/asterisk.c +++ b/main/asterisk.c @@ -240,6 +240,7 @@ int daemon(int, int); /* defined in libresolv of all places */ #include "asterisk/aoc.h" #include "asterisk/uuid.h" #include "asterisk/sorcery.h" +#include "asterisk/stasis.h" #include "../defaults.h" @@ -4120,6 +4121,11 @@ int main(int argc, char *argv[]) exit(1); } + if (stasis_init()) { + printf("Stasis initialization failed.\n%s", term_quit()); + exit(1); + } + ast_makesocket(); sigemptyset(&sigs); sigaddset(&sigs, SIGHUP); diff --git a/main/asterisk.exports.in b/main/asterisk.exports.in index 49d3a44a4..3157b8319 100644 --- a/main/asterisk.exports.in +++ b/main/asterisk.exports.in @@ -32,6 +32,7 @@ LINKER_SYMBOL_PREFIXdialed_interface_info; LINKER_SYMBOL_PREFIXstrsep; LINKER_SYMBOL_PREFIXsetenv; + LINKER_SYMBOL_PREFIXstasis_*; LINKER_SYMBOL_PREFIXunsetenv; LINKER_SYMBOL_PREFIXstrcasestr; LINKER_SYMBOL_PREFIXstrnlen; diff --git a/main/channel.c b/main/channel.c index 9c8f32cb5..df0a67b3b 100644 --- a/main/channel.c +++ b/main/channel.c @@ -152,6 +152,15 @@ static AST_RWLIST_HEAD_STATIC(backends, chanlist); /*! \brief All active channels on the system */ static struct ao2_container *channels; +/*! \brief Message type for channel snapshot events */ +static struct stasis_message_type *__channel_snapshot; + +static struct stasis_message_type *__channel_varset; + +struct stasis_topic *__channel_topic_all; + +struct stasis_caching_topic *__channel_topic_all_cached; + /*! \brief map AST_CAUSE's to readable string representations * * \ref causes.h @@ -214,6 +223,77 @@ static const struct causes_map causes[] = { { AST_CAUSE_INTERWORKING, "INTERWORKING", "Interworking, unspecified" }, }; +static void publish_channel_state(struct ast_channel *chan) +{ + RAII_VAR(struct ast_channel_snapshot *, snapshot, NULL, ao2_cleanup); + RAII_VAR(struct stasis_message *, message, NULL, ao2_cleanup); + + snapshot = ast_channel_snapshot_create(chan); + if (!snapshot) { + ast_log(LOG_ERROR, "Allocation error\n"); + return; + } + + message = stasis_message_create(ast_channel_snapshot(), snapshot); + if (!message) { + return; + } + + ast_assert(ast_channel_topic(chan) != NULL); + stasis_publish(ast_channel_topic(chan), message); +} + +static void channel_varset_dtor(void *obj) +{ + struct ast_channel_varset *event = obj; + ao2_cleanup(event->snapshot); + ast_free(event->variable); + ast_free(event->value); +} + +void ast_channel_publish_varset(struct ast_channel *chan, const char *name, const char *value) +{ + RAII_VAR(struct ast_channel_varset *, event, NULL, ao2_cleanup); + RAII_VAR(struct stasis_message *, msg, NULL, ao2_cleanup); + + event = ao2_alloc(sizeof(*event), channel_varset_dtor); + if (!event) { + return; + } + + if (chan) { + event->snapshot = ast_channel_snapshot_create(chan); + if (event->snapshot == NULL) { + return; + } + } + event->variable = ast_strdup(name); + event->value = ast_strdup(value); + if (event->variable == NULL || event->value == NULL) { + return; + } + + msg = stasis_message_create(ast_channel_varset(), event); + if (!msg) { + return; + } + + if (chan) { + stasis_publish(ast_channel_topic(chan), msg); + } else { + stasis_publish(ast_channel_topic_all(), msg); + } +} + + +static void publish_cache_clear(struct ast_channel *chan) +{ + RAII_VAR(struct stasis_message *, message, NULL, ao2_cleanup); + + message = stasis_cache_clear_create(ast_channel_snapshot(), ast_channel_uniqueid(chan)); + stasis_publish(ast_channel_topic(chan), message); +} + struct ast_variable *ast_channeltype_list(void) { struct chanlist *cl; @@ -1073,6 +1153,8 @@ __ast_channel_alloc_ap(int needqueue, int state, const char *cid_num, const char ast_channel_linkedid_set(tmp, ast_channel_uniqueid(tmp)); } + ast_channel_internal_setup_topics(tmp); + if (!ast_strlen_zero(name_fmt)) { char *slash, *slash2; /* Almost every channel is calling this function, and setting the name via the ast_string_field_build() call. @@ -1145,34 +1227,7 @@ __ast_channel_alloc_ap(int needqueue, int state, const char *cid_num, const char * a lot of data into this func to do it here! */ if (ast_get_channel_tech(tech) || (tech2 && ast_get_channel_tech(tech2))) { - /*** DOCUMENTATION - <managerEventInstance> - <synopsis>Raised when a new channel is created.</synopsis> - <syntax> - <xi:include xpointer="xpointer(/docs/managerEvent[@name='Newstate']/managerEventInstance/syntax/parameter[@name='ChannelState'])" /> - <xi:include xpointer="xpointer(/docs/managerEvent[@name='Newstate']/managerEventInstance/syntax/parameter[@name='ChannelStateDesc'])" /> - </syntax> - </managerEventInstance> - ***/ - ast_manager_event(tmp, EVENT_FLAG_CALL, "Newchannel", - "Channel: %s\r\n" - "ChannelState: %d\r\n" - "ChannelStateDesc: %s\r\n" - "CallerIDNum: %s\r\n" - "CallerIDName: %s\r\n" - "AccountCode: %s\r\n" - "Exten: %s\r\n" - "Context: %s\r\n" - "Uniqueid: %s\r\n", - ast_channel_name(tmp), - state, - ast_state2str(state), - S_OR(cid_num, ""), - S_OR(cid_name, ""), - ast_channel_accountcode(tmp), - S_OR(exten, ""), - S_OR(context, ""), - ast_channel_uniqueid(tmp)); + publish_channel_state(tmp); } ast_channel_internal_finalize(tmp); @@ -2893,39 +2948,9 @@ int ast_hangup(struct ast_channel *chan) ast_channel_unlock(chan); ast_cc_offer(chan); - /*** DOCUMENTATION - <managerEventInstance> - <synopsis>Raised when a channel is hung up.</synopsis> - <syntax> - <parameter name="Cause"> - <para>A numeric cause code for why the channel was hung up.</para> - </parameter> - <parameter name="Cause-txt"> - <para>A description of why the channel was hung up.</para> - </parameter> - </syntax> - </managerEventInstance> - ***/ - ast_manager_event(chan, EVENT_FLAG_CALL, "Hangup", - "Channel: %s\r\n" - "Uniqueid: %s\r\n" - "CallerIDNum: %s\r\n" - "CallerIDName: %s\r\n" - "ConnectedLineNum: %s\r\n" - "ConnectedLineName: %s\r\n" - "AccountCode: %s\r\n" - "Cause: %d\r\n" - "Cause-txt: %s\r\n", - ast_channel_name(chan), - ast_channel_uniqueid(chan), - S_COR(ast_channel_caller(chan)->id.number.valid, ast_channel_caller(chan)->id.number.str, "<unknown>"), - S_COR(ast_channel_caller(chan)->id.name.valid, ast_channel_caller(chan)->id.name.str, "<unknown>"), - S_COR(ast_channel_connected(chan)->id.number.valid, ast_channel_connected(chan)->id.number.str, "<unknown>"), - S_COR(ast_channel_connected(chan)->id.name.valid, ast_channel_connected(chan)->id.name.str, "<unknown>"), - ast_channel_accountcode(chan), - ast_channel_hangupcause(chan), - ast_cause2str(ast_channel_hangupcause(chan)) - ); + + publish_channel_state(chan); + publish_cache_clear(chan); if (ast_channel_cdr(chan) && !ast_test_flag(ast_channel_cdr(chan), AST_CDR_FLAG_BRIDGED) && !ast_test_flag(ast_channel_cdr(chan), AST_CDR_FLAG_POST_DISABLED) && @@ -7435,47 +7460,7 @@ int ast_setstate(struct ast_channel *chan, enum ast_channel_state state) * we override what they are saying the state is and things go amuck. */ ast_devstate_changed_literal(AST_DEVICE_UNKNOWN, (ast_test_flag(ast_channel_flags(chan), AST_FLAG_DISABLE_DEVSTATE_CACHE) ? AST_DEVSTATE_NOT_CACHABLE : AST_DEVSTATE_CACHABLE), name); - /* setstate used to conditionally report Newchannel; this is no more */ - /*** DOCUMENTATION - <managerEventInstance> - <synopsis>Raised when a channel's state changes.</synopsis> - <syntax> - <parameter name="ChannelState"> - <para>A numeric code for the channel's current state, related to ChannelStateDesc</para> - </parameter> - <parameter name="ChannelStateDesc"> - <enumlist> - <enum name="Down"/> - <enum name="Rsrvd"/> - <enum name="OffHook"/> - <enum name="Dialing"/> - <enum name="Ring"/> - <enum name="Ringing"/> - <enum name="Up"/> - <enum name="Busy"/> - <enum name="Dialing Offhook"/> - <enum name="Pre-ring"/> - <enum name="Unknown"/> - </enumlist> - </parameter> - </syntax> - </managerEventInstance> - ***/ - ast_manager_event(chan, EVENT_FLAG_CALL, "Newstate", - "Channel: %s\r\n" - "ChannelState: %d\r\n" - "ChannelStateDesc: %s\r\n" - "CallerIDNum: %s\r\n" - "CallerIDName: %s\r\n" - "ConnectedLineNum: %s\r\n" - "ConnectedLineName: %s\r\n" - "Uniqueid: %s\r\n", - ast_channel_name(chan), ast_channel_state(chan), ast_state2str(ast_channel_state(chan)), - S_COR(ast_channel_caller(chan)->id.number.valid, ast_channel_caller(chan)->id.number.str, ""), - S_COR(ast_channel_caller(chan)->id.name.valid, ast_channel_caller(chan)->id.name.str, ""), - S_COR(ast_channel_connected(chan)->id.number.valid, ast_channel_connected(chan)->id.number.str, ""), - S_COR(ast_channel_connected(chan)->id.name.valid, ast_channel_connected(chan)->id.name.str, ""), - ast_channel_uniqueid(chan)); + publish_channel_state(chan); return 0; } @@ -8644,6 +8629,14 @@ static void prnt_channel_key(void *v_obj, void *where, ao2_prnt_fn *prnt) static void channels_shutdown(void) { + ao2_cleanup(__channel_snapshot); + __channel_snapshot = NULL; + ao2_cleanup(__channel_varset); + __channel_varset = NULL; + ao2_cleanup(__channel_topic_all); + __channel_topic_all = NULL; + stasis_caching_unsubscribe(__channel_topic_all_cached); + __channel_topic_all_cached = NULL; ast_data_unregister(NULL); ast_cli_unregister_multiple(cli_channel, ARRAY_LEN(cli_channel)); if (channels) { @@ -8653,6 +8646,16 @@ static void channels_shutdown(void) } } +static const char *channel_snapshot_get_id(struct stasis_message *message) +{ + struct ast_channel_snapshot *snapshot; + if (ast_channel_snapshot() != stasis_message_type(message)) { + return NULL; + } + snapshot = stasis_message_data(message); + return snapshot->uniqueid; +} + void ast_channels_init(void) { channels = ao2_container_alloc(NUM_CHANNEL_BUCKETS, @@ -8661,6 +8664,12 @@ void ast_channels_init(void) ao2_container_register("channels", channels, prnt_channel_key); } + __channel_snapshot = stasis_message_type_create("ast_channel_snapshot"); + __channel_varset = stasis_message_type_create("ast_channel_varset"); + + __channel_topic_all = stasis_topic_create("ast_channel_topic_all"); + __channel_topic_all_cached = stasis_caching_topic_create(__channel_topic_all, channel_snapshot_get_id); + ast_cli_register_multiple(cli_channel, ARRAY_LEN(cli_channel)); ast_data_register_multiple_core(channel_providers, ARRAY_LEN(channel_providers)); @@ -8668,6 +8677,7 @@ void ast_channels_init(void) ast_plc_reload(); ast_register_atexit(channels_shutdown); + } /*! \brief Print call group and pickup group ---*/ @@ -11241,6 +11251,79 @@ int ast_channel_get_cc_agent_type(struct ast_channel *chan, char *agent_type, si return 0; } +static void ast_channel_snapshot_dtor(void *obj) +{ + struct ast_channel_snapshot *snapshot = obj; + ast_string_field_free_memory(snapshot); +} + +struct ast_channel_snapshot *ast_channel_snapshot_create(struct ast_channel *chan) +{ + RAII_VAR(struct ast_channel_snapshot *, snapshot, NULL, ao2_cleanup); + + snapshot = ao2_alloc(sizeof(*snapshot), ast_channel_snapshot_dtor); + if (ast_string_field_init(snapshot, 1024)) { + return NULL; + } + + ast_string_field_set(snapshot, name, ast_channel_name(chan)); + ast_string_field_set(snapshot, accountcode, ast_channel_accountcode(chan)); + ast_string_field_set(snapshot, peeraccount, ast_channel_peeraccount(chan)); + ast_string_field_set(snapshot, userfield, ast_channel_userfield(chan)); + ast_string_field_set(snapshot, uniqueid, ast_channel_uniqueid(chan)); + ast_string_field_set(snapshot, linkedid, ast_channel_linkedid(chan)); + ast_string_field_set(snapshot, parkinglot, ast_channel_parkinglot(chan)); + ast_string_field_set(snapshot, hangupsource, ast_channel_hangupsource(chan)); + if (ast_channel_appl(chan)) { + ast_string_field_set(snapshot, appl, ast_channel_appl(chan)); + } + if (ast_channel_data(chan)) { + ast_string_field_set(snapshot, data, ast_channel_data(chan)); + } + ast_string_field_set(snapshot, context, ast_channel_context(chan)); + ast_string_field_set(snapshot, exten, ast_channel_exten(chan)); + + ast_string_field_set(snapshot, caller_name, + S_COR(ast_channel_caller(chan)->id.name.valid, ast_channel_caller(chan)->id.name.str, "")); + ast_string_field_set(snapshot, caller_number, + S_COR(ast_channel_caller(chan)->id.number.valid, ast_channel_caller(chan)->id.number.str, "")); + + ast_string_field_set(snapshot, connected_name, + S_COR(ast_channel_connected(chan)->id.name.valid, ast_channel_connected(chan)->id.name.str, "")); + ast_string_field_set(snapshot, connected_number, + S_COR(ast_channel_connected(chan)->id.number.valid, ast_channel_connected(chan)->id.number.str, "")); + + snapshot->creationtime = ast_channel_creationtime(chan); + snapshot->state = ast_channel_state(chan); + snapshot->priority = ast_channel_priority(chan); + snapshot->amaflags = ast_channel_amaflags(chan); + snapshot->hangupcause = ast_channel_hangupcause(chan); + snapshot->flags = *ast_channel_flags(chan); + + ao2_ref(snapshot, +1); + return snapshot; +} + +struct stasis_message_type *ast_channel_varset(void) +{ + return __channel_varset; +} + +struct stasis_message_type *ast_channel_snapshot(void) +{ + return __channel_snapshot; +} + +struct stasis_topic *ast_channel_topic_all(void) +{ + return __channel_topic_all; +} + +struct stasis_caching_topic *ast_channel_topic_all_cached(void) +{ + return __channel_topic_all_cached; +} + /* DO NOT PUT ADDITIONAL FUNCTIONS BELOW THIS BOUNDARY * * ONLY FUNCTIONS FOR PROVIDING BACKWARDS ABI COMPATIBILITY BELONG HERE diff --git a/main/channel_internal_api.c b/main/channel_internal_api.c index 3f892ddef..8cc2e6c62 100644 --- a/main/channel_internal_api.c +++ b/main/channel_internal_api.c @@ -195,6 +195,8 @@ struct ast_channel { char dtmf_digit_to_emulate; /*!< Digit being emulated */ char sending_dtmf_digit; /*!< Digit this channel is currently sending out. (zero if not sending) */ struct timeval sending_dtmf_tv; /*!< The time this channel started sending the current digit. (Invalid if sending_dtmf_digit is zero.) */ + struct stasis_topic *topic; /*!< Topic for all channel's events */ + struct stasis_subscription *forwarder; /*!< Subscription for event forwarding to all topic */ }; /* AST_DATA definitions, which will probably have to be re-thought since the channel will be opaque */ @@ -1364,6 +1366,12 @@ void ast_channel_internal_cleanup(struct ast_channel *chan) } ast_string_field_free_memory(chan); + + stasis_unsubscribe(chan->forwarder); + chan->forwarder = NULL; + + ao2_cleanup(chan->topic); + chan->topic = NULL; } void ast_channel_internal_finalize(struct ast_channel *chan) @@ -1375,3 +1383,16 @@ int ast_channel_internal_is_finalized(struct ast_channel *chan) { return chan->finalized; } + +struct stasis_topic *ast_channel_topic(struct ast_channel *chan) +{ + return chan->topic; +} + +void ast_channel_internal_setup_topics(struct ast_channel *chan) +{ + ast_assert(chan->topic == NULL); + ast_assert(chan->forwarder == NULL); + chan->topic = stasis_topic_create(chan->uniqueid); + chan->forwarder = stasis_forward_all(chan->topic, ast_channel_topic_all()); +} diff --git a/main/manager.c b/main/manager.c index fc0ec2631..10a3a3397 100644 --- a/main/manager.c +++ b/main/manager.c @@ -91,6 +91,7 @@ ASTERISK_FILE_VERSION(__FILE__, "$Revision$") #include "asterisk/aoc.h" #include "asterisk/stringfields.h" #include "asterisk/presencestate.h" +#include "asterisk/stasis.h" /*** DOCUMENTATION <manager name="Ping" language="en_US"> @@ -963,6 +964,73 @@ ASTERISK_FILE_VERSION(__FILE__, "$Revision$") manager.conf will be present upon starting a new session.</para> </description> </manager> + <managerEvent language="en_US" name="Newchannel"> + <managerEventInstance class="EVENT_FLAG_CALL"> + <synopsis>Raised when a new channel is created.</synopsis> + <syntax> + <parameter name="Channel"> + </parameter> + <parameter name="ChannelState"> + <para>A numeric code for the channel's current state, related to ChannelStateDesc</para> + </parameter> + <parameter name="ChannelStateDesc"> + <enumlist> + <enum name="Down"/> + <enum name="Rsrvd"/> + <enum name="OffHook"/> + <enum name="Dialing"/> + <enum name="Ring"/> + <enum name="Ringing"/> + <enum name="Up"/> + <enum name="Busy"/> + <enum name="Dialing Offhook"/> + <enum name="Pre-ring"/> + <enum name="Unknown"/> + </enumlist> + </parameter> + <parameter name="CallerIDNum"> + </parameter> + <parameter name="CallerIDName"> + </parameter> + <parameter name="ConnectedLineNum"> + </parameter> + <parameter name="ConnectedLineName"> + </parameter> + <parameter name="AccountCode"> + </parameter> + <parameter name="Context"> + </parameter> + <parameter name="Exten"> + </parameter> + <parameter name="Priority"> + </parameter> + <parameter name="Uniqueid"> + </parameter> + <parameter name="Cause"> + <para>A numeric cause code for why the channel was hung up.</para> + </parameter> + <parameter name="Cause-txt"> + <para>A description of why the channel was hung up.</para> + </parameter> + </syntax> + </managerEventInstance> + </managerEvent> + <managerEvent language="en_US" name="Newstate"> + <managerEventInstance class="EVENT_FLAG_CALL"> + <synopsis>Raised when a channel's state changes.</synopsis> + <syntax> + <xi:include xpointer="xpointer(/docs/managerEvent[@name='Newchannel']/managerEventInstance/syntax/parameter)" /> + </syntax> + </managerEventInstance> + </managerEvent> + <managerEvent language="en_US" name="Hangup"> + <managerEventInstance class="EVENT_FLAG_CALL"> + <synopsis>Raised when a channel is hung up.</synopsis> + <syntax> + <xi:include xpointer="xpointer(/docs/managerEvent[@name='Newchannel']/managerEventInstance/syntax/parameter)" /> + </syntax> + </managerEventInstance> + </managerEvent> ***/ /*! \addtogroup Group_AMI AMI functions @@ -1060,6 +1128,8 @@ static const struct { {{ "restart", "gracefully", NULL }}, }; +static struct stasis_subscription *channel_state_sub; + static void acl_change_event_cb(const struct ast_event *event, void *userdata); static void acl_change_event_subscribe(void) @@ -7376,6 +7446,127 @@ static void load_channelvars(struct ast_variable *var) AST_RWLIST_UNLOCK(&channelvars); } +/*! + * \brief Generate the AMI message body from a channel snapshot + * \internal + * + * \param snapshot the channel snapshot for which to generate an AMI message body + * + * \retval NULL on error + * \retval ast_str* on success (must be ast_freed by caller) + */ +static struct ast_str *manager_build_channel_state_string(const struct ast_channel_snapshot *snapshot) +{ + struct ast_str *out = ast_str_create(1024); + int res = 0; + if (!out) { + return NULL; + } + res = ast_str_set(&out, 0, + "Channel: %s\r\n" + "ChannelState: %d\r\n" + "ChannelStateDesc: %s\r\n" + "CallerIDNum: %s\r\n" + "CallerIDName: %s\r\n" + "ConnectedLineNum: %s\r\n" + "ConnectedLineName: %s\r\n" + "AccountCode: %s\r\n" + "Context: %s\r\n" + "Exten: %s\r\n" + "Priority: %d\r\n" + "Uniqueid: %s\r\n" + "Cause: %d\r\n" + "Cause-txt: %s\r\n", + snapshot->name, + snapshot->state, + ast_state2str(snapshot->state), + snapshot->caller_number, + snapshot->caller_name, + snapshot->connected_number, + snapshot->connected_name, + snapshot->accountcode, + snapshot->context, + snapshot->exten, + snapshot->priority, + snapshot->uniqueid, + snapshot->hangupcause, + ast_cause2str(snapshot->hangupcause)); + + if (!res) { + return NULL; + } + + return out; +} + +static void channel_snapshot_update(struct ast_channel_snapshot *old_snapshot, struct ast_channel_snapshot *new_snapshot) +{ + int is_hungup; + char *manager_event = NULL; + + if (!new_snapshot) { + /* Ignore cache clearing events; we'll see the hangup first */ + return; + } + + is_hungup = ast_test_flag(&new_snapshot->flags, AST_FLAG_ZOMBIE) ? 1 : 0; + + if (!old_snapshot) { + manager_event = "Newchannel"; + } + + if (old_snapshot && old_snapshot->state != new_snapshot->state) { + manager_event = "Newstate"; + } + + if (old_snapshot && is_hungup) { + manager_event = "Hangup"; + } + + if (manager_event) { + RAII_VAR(struct ast_str *, channel_event_string, NULL, ast_free); + + channel_event_string = manager_build_channel_state_string(new_snapshot); + if (channel_event_string) { + manager_event(EVENT_FLAG_CALL, manager_event, "%s", ast_str_buffer(channel_event_string)); + } + } +} + +static void channel_varset(const char *channel_name, const char *uniqueid, const char *name, const char *value) +{ + /*** DOCUMENTATION + <managerEventInstance> + <synopsis>Raised when a variable is set to a particular value.</synopsis> + </managerEventInstance> + ***/ + manager_event(EVENT_FLAG_DIALPLAN, "VarSet", + "Channel: %s\r\n" + "Variable: %s\r\n" + "Value: %s\r\n" + "Uniqueid: %s\r\n", + channel_name, name, value, uniqueid); +} + +static void channel_event_cb(void *data, struct stasis_subscription *sub, struct stasis_topic *topic, struct stasis_message *message) +{ + if (stasis_message_type(message) == stasis_cache_update()) { + struct stasis_cache_update *update = stasis_message_data(message); + if (ast_channel_snapshot() == update->type) { + struct ast_channel_snapshot *old_snapshot = + stasis_message_data(update->old_snapshot); + struct ast_channel_snapshot *new_snapshot = + stasis_message_data(update->new_snapshot); + channel_snapshot_update(old_snapshot, new_snapshot); + } + } else if (stasis_message_type(message) == ast_channel_varset()) { + struct ast_channel_varset *varset = stasis_message_data(message); + const char *name = varset->snapshot ? varset->snapshot->name : "none"; + const char *uniqueid = varset->snapshot ? varset->snapshot->uniqueid : "none"; + channel_varset(name, uniqueid, varset->variable, varset->value); + } +} + /*! \internal \brief Free a user record. Should already be removed from the list */ static void manager_free_user(struct ast_manager_user *user) { @@ -7399,6 +7590,9 @@ static void manager_shutdown(void) { struct ast_manager_user *user; + stasis_unsubscribe(channel_state_sub); + channel_state_sub = NULL; + if (registered) { ast_manager_unregister("Ping"); ast_manager_unregister("Events"); @@ -7490,6 +7684,12 @@ static int __init_manager(int reload, int by_external_config) manager_enabled = 0; + if (!channel_state_sub) { + channel_state_sub = stasis_subscribe( + stasis_caching_get_topic(ast_channel_topic_all_cached()), + channel_event_cb, NULL); + } + if (!registered) { /* Register default actions */ ast_manager_register_xml_core("Ping", 0, action_ping); diff --git a/main/pbx.c b/main/pbx.c index bf95ccbe2..82bbb5257 100644 --- a/main/pbx.c +++ b/main/pbx.c @@ -11453,18 +11453,7 @@ int pbx_builtin_setvar_helper(struct ast_channel *chan, const char *name, const ast_verb(2, "Setting global variable '%s' to '%s'\n", name, value); newvariable = ast_var_assign(name, value); AST_LIST_INSERT_HEAD(headp, newvariable, entries); - /*** DOCUMENTATION - <managerEventInstance> - <synopsis>Raised when a variable is set to a particular value.</synopsis> - </managerEventInstance> - ***/ - manager_event(EVENT_FLAG_DIALPLAN, "VarSet", - "Channel: %s\r\n" - "Variable: %s\r\n" - "Value: %s\r\n" - "Uniqueid: %s\r\n", - chan ? ast_channel_name(chan) : "none", name, value, - chan ? ast_channel_uniqueid(chan) : "none"); + ast_channel_publish_varset(chan, name, value); } if (chan) diff --git a/main/stasis.c b/main/stasis.c new file mode 100644 index 000000000..f94736bf1 --- /dev/null +++ b/main/stasis.c @@ -0,0 +1,514 @@ +/* + * Asterisk -- An open source telephony toolkit. + * + * Copyright (C) 2013, Digium, Inc. + * + * David M. Lee, II <dlee@digium.com> + * + * See http://www.asterisk.org for more information about + * the Asterisk project. Please do not directly contact + * any of the maintainers of this project for assistance; + * the project provides a web site, mailing lists and IRC + * channels for your use. + * + * This program is free software, distributed under the terms of + * the GNU General Public License Version 2. See the LICENSE file + * at the top of the source tree. + */ + +/*! \file + * + * \brief Stasis Message Bus API. + * + * \author David M. Lee, II <dlee@digium.com> + */ + +/*** MODULEINFO + <support_level>core</support_level> + ***/ + +#include "asterisk.h" + +ASTERISK_FILE_VERSION(__FILE__, "$Revision$") + +#include "asterisk/astobj2.h" +#include "asterisk/stasis.h" +#include "asterisk/threadpool.h" +#include "asterisk/taskprocessor.h" +#include "asterisk/utils.h" +#include "asterisk/uuid.h" + +/*! Initial size of the subscribers list. */ +#define INITIAL_SUBSCRIBERS_MAX 4 + +/*! Threadpool for dispatching notifications to subscribers */ +static struct ast_threadpool *pool; + +static struct stasis_message_type *__subscription_change_message_type; + +/*! \private */ +struct stasis_topic { + char *name; + /*! Variable length array of the subscribers (raw pointer to avoid cyclic references) */ + struct stasis_subscription **subscribers; + /*! Allocated length of the subscribers array */ + size_t num_subscribers_max; + /*! Current size of the subscribers array */ + size_t num_subscribers_current; +}; + +/* Forward declarations for the tightly-coupled subscription object */ +struct stasis_subscription; +static int topic_add_subscription(struct stasis_topic *topic, struct stasis_subscription *sub); + +static void topic_dtor(void *obj) +{ + struct stasis_topic *topic = obj; + ast_free(topic->name); + topic->name = NULL; + ast_free(topic->subscribers); + topic->subscribers = NULL; +} + +struct stasis_topic *stasis_topic_create(const char *name) +{ + RAII_VAR(struct stasis_topic *, topic, NULL, ao2_cleanup); + + topic = ao2_alloc(sizeof(*topic), topic_dtor); + + if (!topic) { + return NULL; + } + + topic->name = ast_strdup(name); + if (!topic->name) { + return NULL; + } + + topic->num_subscribers_max = INITIAL_SUBSCRIBERS_MAX; + topic->subscribers = ast_calloc(topic->num_subscribers_max, sizeof(topic->subscribers)); + if (!topic->subscribers) { + return NULL; + } + + ao2_ref(topic, +1); + return topic; +} + +const char *stasis_topic_name(const struct stasis_topic *topic) +{ + return topic->name; +} + +/*! \private */ +struct stasis_subscription { + /*! Unique ID for this subscription */ + char *uniqueid; + /*! Topic subscribed to. */ + struct stasis_topic *topic; + /*! Mailbox for processing incoming messages. */ + struct ast_taskprocessor *mailbox; + /*! Callback function for incoming message processing. */ + stasis_subscription_cb callback; + /*! Data pointer to be handed to the callback. */ + void *data; +}; + +static void subscription_dtor(void *obj) +{ + struct stasis_subscription *sub = obj; + ast_assert(!stasis_subscription_is_subscribed(sub)); + ast_free(sub->uniqueid); + sub->uniqueid = NULL; + ao2_cleanup(sub->topic); + sub->topic = NULL; + ast_taskprocessor_unreference(sub->mailbox); + sub->mailbox = NULL; +} + +static void send_subscription_change_message(struct stasis_topic *topic, char *uniqueid, char *description); + +static struct stasis_subscription *__stasis_subscribe(struct stasis_topic *topic, stasis_subscription_cb callback, void *data, int needs_mailbox) +{ + RAII_VAR(struct stasis_subscription *, sub, NULL, ao2_cleanup); + RAII_VAR(struct ast_uuid *, id, NULL, ast_free); + char uniqueid[AST_UUID_STR_LEN]; + + sub = ao2_alloc(sizeof(*sub), subscription_dtor); + if (!sub) { + return NULL; + } + + id = ast_uuid_generate(); + if (!id) { + ast_log(LOG_ERROR, "UUID generation failed\n"); + return NULL; + } + ast_uuid_to_str(id, uniqueid, sizeof(uniqueid)); + if (needs_mailbox) { + sub->mailbox = ast_threadpool_serializer(uniqueid, pool); + if (!sub->mailbox) { + return NULL; + } + } + + sub->uniqueid = ast_strdup(uniqueid); + ao2_ref(topic, +1); + sub->topic = topic; + sub->callback = callback; + sub->data = data; + + if (topic_add_subscription(topic, sub) != 0) { + return NULL; + } + send_subscription_change_message(topic, uniqueid, "Subscribe"); + + ao2_ref(sub, +1); + return sub; +} + +struct stasis_subscription *stasis_subscribe(struct stasis_topic *topic, stasis_subscription_cb callback, void *data) +{ + return __stasis_subscribe(topic, callback, data, 1); +} + +void stasis_unsubscribe(struct stasis_subscription *sub) +{ + if (sub) { + size_t i; + struct stasis_topic *topic = sub->topic; + SCOPED_AO2LOCK(lock_topic, topic); + + for (i = 0; i < topic->num_subscribers_current; ++i) { + if (topic->subscribers[i] == sub) { + send_subscription_change_message(topic, sub->uniqueid, "Unsubscribe"); + /* swap [i] with last entry; remove last entry */ + topic->subscribers[i] = topic->subscribers[--topic->num_subscribers_current]; + /* Unsubscribing unrefs the subscription */ + ao2_cleanup(sub); + return; + } + } + + ast_log(LOG_ERROR, "Internal error: subscription has invalid topic\n"); + } +} + +int stasis_subscription_is_subscribed(const struct stasis_subscription *sub) +{ + if (sub) { + size_t i; + struct stasis_topic *topic = sub->topic; + SCOPED_AO2LOCK(lock_topic, topic); + + for (i = 0; i < topic->num_subscribers_current; ++i) { + if (topic->subscribers[i] == sub) { + return 1; + } + } + } + + return 0; +} + +const char *stasis_subscription_uniqueid(const struct stasis_subscription *sub) +{ + return sub->uniqueid; +} + +int stasis_subscription_final_message(struct stasis_subscription *sub, struct stasis_message *msg) +{ + struct stasis_subscription_change *change; + if (stasis_message_type(msg) != stasis_subscription_change()) { + return 0; + } + + change = stasis_message_data(msg); + if (strcmp("Unsubscribe", change->description)) { + return 0; + } + + if (strcmp(stasis_subscription_uniqueid(sub), change->uniqueid)) { + return 0; + } + + return 1; +} + +/*! + * \brief Add a subscriber to a topic. + * \param topic Topic + * \param sub Subscriber + * \return 0 on success + * \return Non-zero on error + */ +static int topic_add_subscription(struct stasis_topic *topic, struct stasis_subscription *sub) +{ + struct stasis_subscription **subscribers; + SCOPED_AO2LOCK(lock, topic); + + /* Increase list size, if needed */ + if (topic->num_subscribers_current + 1 > topic->num_subscribers_max) { + subscribers = realloc(topic->subscribers, 2 * topic->num_subscribers_max * sizeof(*subscribers)); + if (!subscribers) { + return -1; + } + topic->subscribers = subscribers; + topic->num_subscribers_max *= 2; + } + + /* Don't ref sub here or we'll cause a reference cycle. */ + topic->subscribers[topic->num_subscribers_current++] = sub; + return 0; +} + +/*! + * \private + * \brief Information needed to dispatch a message to a subscription + */ +struct dispatch { + /*! Topic message was published to */ + struct stasis_topic *topic; + /*! The message itself */ + struct stasis_message *message; + /*! Subscription receiving the message */ + struct stasis_subscription *sub; +}; + +static void dispatch_dtor(void *data) +{ + struct dispatch *dispatch = data; + ao2_cleanup(dispatch->topic); + ao2_cleanup(dispatch->message); + ao2_cleanup(dispatch->sub); +} + +static struct dispatch *dispatch_create(struct stasis_topic *topic, struct stasis_message *message, struct stasis_subscription *sub) +{ + RAII_VAR(struct dispatch *, dispatch, NULL, ao2_cleanup); + + ast_assert(topic != NULL); + ast_assert(message != NULL); + ast_assert(sub != NULL); + + dispatch = ao2_alloc(sizeof(*dispatch), dispatch_dtor); + if (!dispatch) { + return NULL; + } + + dispatch->topic = topic; + ao2_ref(topic, +1); + + dispatch->message = message; + ao2_ref(message, +1); + + dispatch->sub = sub; + ao2_ref(sub, +1); + + ao2_ref(dispatch, +1); + return dispatch; +} + +/*! + * \brief Dispatch a message to a subscriber + * \param data \ref dispatch object + * \return 0 + */ +static int dispatch_exec(void *data) +{ + RAII_VAR(struct dispatch *, dispatch, data, ao2_cleanup); + RAII_VAR(struct stasis_topic *, sub_topic, NULL, ao2_cleanup); + + /* Since sub->topic doesn't change, no need to lock sub */ + ast_assert(dispatch->sub->topic != NULL); + ao2_ref(dispatch->sub->topic, +1); + sub_topic = dispatch->sub->topic; + + dispatch->sub->callback(dispatch->sub->data, + dispatch->sub, + sub_topic, + dispatch->message); + + return 0; +} + +void stasis_forward_message(struct stasis_topic *topic, struct stasis_topic *publisher_topic, struct stasis_message *message) +{ + struct stasis_subscription **subscribers = NULL; + size_t num_subscribers, i; + + /* Copy the subscribers, so we don't have to hold the mutex for long */ + { + SCOPED_AO2LOCK(lock, topic); + num_subscribers = topic->num_subscribers_current; + subscribers = ast_malloc(num_subscribers * sizeof(*subscribers)); + if (subscribers) { + for (i = 0; i < num_subscribers; ++i) { + ao2_ref(topic->subscribers[i], +1); + subscribers[i] = topic->subscribers[i]; + } + } + } + + if (!subscribers) { + ast_log(LOG_ERROR, "Dropping message\n"); + return; + } + + for (i = 0; i < num_subscribers; ++i) { + struct stasis_subscription *sub = subscribers[i]; + + ast_assert(sub != NULL); + + if (sub->mailbox) { + RAII_VAR(struct dispatch *, dispatch, NULL, ao2_cleanup); + + dispatch = dispatch_create(publisher_topic, message, sub); + if (!dispatch) { + ast_log(LOG_DEBUG, "Dropping dispatch\n"); + break; + } + + if (ast_taskprocessor_push(sub->mailbox, dispatch_exec, dispatch) == 0) { + dispatch = NULL; /* Ownership transferred to mailbox */ + } + } else { + /* No mailbox; dispatch directly */ + sub->callback(sub->data, sub, sub->topic, message); + } + } + + for (i = 0; i < num_subscribers; ++i) { + ao2_cleanup(subscribers[i]); + } + ast_free(subscribers); +} + +void stasis_publish(struct stasis_topic *topic, struct stasis_message *message) +{ + stasis_forward_message(topic, topic, message); +} + +/*! \brief Forwarding subscriber */ +static void stasis_forward_cb(void *data, struct stasis_subscription *sub, struct stasis_topic *topic, struct stasis_message *message) +{ + struct stasis_topic *to_topic = data; + stasis_forward_message(to_topic, topic, message); + + if (stasis_subscription_final_message(sub, message)) { + ao2_cleanup(to_topic); + } +} + +struct stasis_subscription *stasis_forward_all(struct stasis_topic *from_topic, struct stasis_topic *to_topic) +{ + struct stasis_subscription *sub; + if (!from_topic || !to_topic) { + return NULL; + } + /* Subscribe without a mailbox, since we're just forwarding messages */ + sub = __stasis_subscribe(from_topic, stasis_forward_cb, to_topic, 0); + if (sub) { + /* hold a ref to to_topic for this forwarding subscription */ + ao2_ref(to_topic, +1); + } + return sub; +} + +static void subscription_change_dtor(void *obj) +{ + struct stasis_subscription_change *change = obj; + ast_string_field_free_memory(change); + ao2_cleanup(change->topic); +} + +static struct stasis_subscription_change *subscription_change_alloc(struct stasis_topic *topic, char *uniqueid, char *description) +{ + RAII_VAR(struct stasis_subscription_change *, change, NULL, ao2_cleanup); + + change = ao2_alloc(sizeof(struct stasis_subscription_change), subscription_change_dtor); + if (ast_string_field_init(change, 128)) { + return NULL; + } + + ast_string_field_set(change, uniqueid, uniqueid); + ast_string_field_set(change, description, description); + ao2_ref(topic, +1); + change->topic = topic; + + ao2_ref(change, +1); + return change; +} + +struct stasis_message_type *stasis_subscription_change(void) +{ + return __subscription_change_message_type; +} + +static void send_subscription_change_message(struct stasis_topic *topic, char *uniqueid, char *description) +{ + RAII_VAR(struct stasis_subscription_change *, change, NULL, ao2_cleanup); + RAII_VAR(struct stasis_message *, msg, NULL, ao2_cleanup); + + change = subscription_change_alloc(topic, uniqueid, description); + + if (!change) { + return; + } + + msg = stasis_message_create(stasis_subscription_change(), change); + + if (!msg) { + return; + } + + stasis_publish(topic, msg); +} + +/*! \brief Cleanup function */ +static void stasis_exit(void) +{ + ao2_cleanup(__subscription_change_message_type); + __subscription_change_message_type = NULL; + ast_threadpool_shutdown(pool); + pool = NULL; +} + +int stasis_init(void) +{ + int cache_init; + + /* XXX Should this be configurable? */ + struct ast_threadpool_options opts = { + .version = AST_THREADPOOL_OPTIONS_VERSION, + .idle_timeout = 20, + .auto_increment = 1, + .initial_size = 0, + .max_size = 200 + }; + + ast_register_atexit(stasis_exit); + + if (pool) { + ast_log(LOG_ERROR, "Stasis double-initialized\n"); + return -1; + } + + pool = ast_threadpool_create("stasis-core", NULL, &opts); + if (!pool) { + ast_log(LOG_ERROR, "Stasis threadpool allocation failed\n"); + return -1; + } + + cache_init = stasis_cache_init(); + if (cache_init != 0) { + return -1; + } + + __subscription_change_message_type = stasis_message_type_create("stasis_subscription_change"); + if (!__subscription_change_message_type) { + return -1; + } + + return 0; +} diff --git a/main/stasis_cache.c b/main/stasis_cache.c new file mode 100644 index 000000000..2f4cf52fd --- /dev/null +++ b/main/stasis_cache.c @@ -0,0 +1,443 @@ +/* + * Asterisk -- An open source telephony toolkit. + * + * Copyright (C) 2013, Digium, Inc. + * + * David M. Lee, II <dlee@digium.com> + * + * See http://www.asterisk.org for more information about + * the Asterisk project. Please do not directly contact + * any of the maintainers of this project for assistance; + * the project provides a web site, mailing lists and IRC + * channels for your use. + * + * This program is free software, distributed under the terms of + * the GNU General Public License Version 2. See the LICENSE file + * at the top of the source tree. + */ + +/*! \file + * + * \brief Stasis Message API. + * + * \author David M. Lee, II <dlee@digium.com> + */ + +/*** MODULEINFO + <support_level>core</support_level> + ***/ + +#include "asterisk.h" + +ASTERISK_FILE_VERSION(__FILE__, "$Revision$") + +#include "asterisk/astobj2.h" +#include "asterisk/hashtab.h" +#include "asterisk/stasis.h" +#include "asterisk/utils.h" + +#ifdef LOW_MEMORY +#define NUM_CACHE_BUCKETS 17 +#else +#define NUM_CACHE_BUCKETS 563 +#endif + +struct stasis_caching_topic { + struct ao2_container *cache; + struct stasis_topic *topic; + struct stasis_subscription *sub; + snapshot_get_id id_fn; +}; + +static void stasis_caching_topic_dtor(void *obj) { + struct stasis_caching_topic *caching_topic = obj; + ast_assert(!stasis_subscription_is_subscribed(caching_topic->sub)); + caching_topic->sub = NULL; + ao2_cleanup(caching_topic->cache); + caching_topic->cache = NULL; + ao2_cleanup(caching_topic->topic); + caching_topic->topic = NULL; +} + +struct stasis_topic *stasis_caching_get_topic(struct stasis_caching_topic *caching_topic) +{ + return caching_topic->topic; +} + +void stasis_caching_unsubscribe(struct stasis_caching_topic *caching_topic) +{ + if (caching_topic) { + if (stasis_subscription_is_subscribed(caching_topic->sub)) { + stasis_unsubscribe(caching_topic->sub); + } else { + ast_log(LOG_ERROR, "stasis_caching_topic unsubscribed multiple times\n"); + } + } +} + +struct cache_entry { + struct stasis_message_type *type; + char *id; + struct stasis_message *snapshot; +}; + +static void cache_entry_dtor(void *obj) +{ + struct cache_entry *entry = obj; + ao2_cleanup(entry->type); + entry->type = NULL; + ast_free(entry->id); + entry->id = NULL; + ao2_cleanup(entry->snapshot); + entry->snapshot = NULL; +} + +static struct cache_entry *cache_entry_create(struct stasis_message_type *type, const char *id, struct stasis_message *snapshot) +{ + RAII_VAR(struct cache_entry *, entry, NULL, ao2_cleanup); + + ast_assert(type != NULL); + ast_assert(id != NULL); + + entry = ao2_alloc(sizeof(*entry), cache_entry_dtor); + if (!entry) { + return NULL; + } + + entry->id = ast_strdup(id); + if (!entry->id) { + return NULL; + } + + ao2_ref(type, +1); + entry->type = type; + if (snapshot != NULL) { + ao2_ref(snapshot, +1); + entry->snapshot = snapshot; + } + + ao2_ref(entry, +1); + return entry; +} + +static int cache_entry_hash(const void *obj, int flags) +{ + const struct cache_entry *entry = obj; + int hash = 0; + + ast_assert(!(flags & OBJ_KEY)); + + hash += ast_hashtab_hash_string(stasis_message_type_name(entry->type)); + hash += ast_hashtab_hash_string(entry->id); + return hash; +} + +static int cache_entry_cmp(void *obj, void *arg, int flags) +{ + const struct cache_entry *left = obj; + const struct cache_entry *right = arg; + + ast_assert(!(flags & OBJ_KEY)); + + if (left->type == right->type && strcmp(left->id, right->id) == 0) { + return CMP_MATCH | CMP_STOP; + } + + return 0; +} + +static struct stasis_message *cache_put(struct stasis_caching_topic *caching_topic, struct stasis_message_type *type, const char *id, struct stasis_message *new_snapshot) +{ + RAII_VAR(struct cache_entry *, new_entry, NULL, ao2_cleanup); + RAII_VAR(struct cache_entry *, cached_entry, NULL, ao2_cleanup); + struct stasis_message *old_snapshot = NULL; + + ast_assert(caching_topic->cache != NULL); + + new_entry = cache_entry_create(type, id, new_snapshot); + + if (new_snapshot == NULL) { + /* Remove entry from cache */ + cached_entry = ao2_find(caching_topic->cache, new_entry, OBJ_POINTER | OBJ_UNLINK); + if (cached_entry) { + old_snapshot = cached_entry->snapshot; + cached_entry->snapshot = NULL; + } + } else { + /* Insert/update cache */ + SCOPED_AO2LOCK(lock, caching_topic->cache); + + cached_entry = ao2_find(caching_topic->cache, new_entry, OBJ_POINTER | OBJ_NOLOCK); + if (cached_entry) { + /* Update cache. Because objects are moving, no need to update refcounts. */ + old_snapshot = cached_entry->snapshot; + cached_entry->snapshot = new_entry->snapshot; + new_entry->snapshot = NULL; + } else { + /* Insert into the cache */ + ao2_link_flags(caching_topic->cache, new_entry, OBJ_NOLOCK); + } + + } + + return old_snapshot; +} + +struct stasis_message *stasis_cache_get(struct stasis_caching_topic *caching_topic, struct stasis_message_type *type, const char *id) +{ + RAII_VAR(struct cache_entry *, search_entry, NULL, ao2_cleanup); + RAII_VAR(struct cache_entry *, cached_entry, NULL, ao2_cleanup); + + ast_assert(caching_topic->cache != NULL); + + search_entry = cache_entry_create(type, id, NULL); + if (search_entry == NULL) { + return NULL; + } + + cached_entry = ao2_find(caching_topic->cache, search_entry, OBJ_POINTER); + if (cached_entry == NULL) { + return NULL; + } + + ast_assert(cached_entry->snapshot != NULL); + ao2_ref(cached_entry->snapshot, +1); + return cached_entry->snapshot; +} + +static struct stasis_message_type *__cache_clear_data; + +static struct stasis_message_type *cache_clear_data(void) +{ + ast_assert(__cache_clear_data != NULL); + return __cache_clear_data; +} + +static struct stasis_message_type *__cache_update; + +struct stasis_message_type *stasis_cache_update(void) +{ + ast_assert(__cache_update != NULL); + return __cache_update; +} + +struct cache_clear_data { + struct stasis_message_type *type; + char *id; +}; + +static void cache_clear_data_dtor(void *obj) +{ + struct cache_clear_data *ev = obj; + ast_free(ev->id); + ev->id = NULL; + ao2_cleanup(ev->type); + ev->type = NULL; +} + +struct stasis_message *stasis_cache_clear_create(struct stasis_message_type *type, const char *id) +{ + RAII_VAR(struct cache_clear_data *, ev, NULL, ao2_cleanup); + RAII_VAR(struct stasis_message *, msg, NULL, ao2_cleanup); + + ev = ao2_alloc(sizeof(*ev), cache_clear_data_dtor); + if (!ev) { + return NULL; + } + + ev->id = ast_strdup(id); + if (!ev->id) { + return NULL; + } + ao2_ref(type, +1); + ev->type = type; + + msg = stasis_message_create(cache_clear_data(), ev); + + if (!msg) { + return NULL; + } + + ao2_ref(msg, +1); + return msg; +} + +static void stasis_cache_update_dtor(void *obj) +{ + struct stasis_cache_update *update = obj; + ao2_cleanup(update->topic); + update->topic = NULL; + ao2_cleanup(update->old_snapshot); + update->old_snapshot = NULL; + ao2_cleanup(update->new_snapshot); + update->new_snapshot = NULL; + ao2_cleanup(update->type); + update->type = NULL; +} + +static struct stasis_message *update_create(struct stasis_topic *topic, struct stasis_message *old_snapshot, struct stasis_message *new_snapshot) +{ + RAII_VAR(struct stasis_cache_update *, update, NULL, ao2_cleanup); + RAII_VAR(struct stasis_message *, msg, NULL, ao2_cleanup); + + ast_assert(topic != NULL); + ast_assert(old_snapshot != NULL || new_snapshot != NULL); + + update = ao2_alloc(sizeof(*update), stasis_cache_update_dtor); + if (!update) { + return NULL; + } + + ao2_ref(topic, +1); + update->topic = topic; + if (old_snapshot) { + ao2_ref(old_snapshot, +1); + update->old_snapshot = old_snapshot; + if (!new_snapshot) { + ao2_ref(stasis_message_type(old_snapshot), +1); + update->type = stasis_message_type(old_snapshot); + } + } + if (new_snapshot) { + ao2_ref(new_snapshot, +1); + update->new_snapshot = new_snapshot; + ao2_ref(stasis_message_type(new_snapshot), +1); + update->type = stasis_message_type(new_snapshot); + } + + msg = stasis_message_create(stasis_cache_update(), update); + if (!msg) { + return NULL; + } + + ao2_ref(msg, +1); + return msg; +} + +static void caching_topic_exec(void *data, struct stasis_subscription *sub, struct stasis_topic *topic, struct stasis_message *message) +{ + RAII_VAR(struct stasis_caching_topic *, caching_topic_needs_unref, NULL, ao2_cleanup); + struct stasis_caching_topic *caching_topic = data; + const char *id = NULL; + + ast_assert(caching_topic->topic != NULL); + ast_assert(caching_topic->id_fn != NULL); + + if (stasis_subscription_final_message(sub, message)) { + caching_topic_needs_unref = caching_topic; + } + + /* Handle cache clear event */ + if (cache_clear_data() == stasis_message_type(message)) { + RAII_VAR(struct stasis_message *, old_snapshot, NULL, ao2_cleanup); + RAII_VAR(struct stasis_message *, update, NULL, ao2_cleanup); + struct cache_clear_data *clear = stasis_message_data(message); + ast_assert(clear->type != NULL); + ast_assert(clear->id != NULL); + old_snapshot = cache_put(caching_topic, clear->type, clear->id, NULL); + if (old_snapshot) { + update = update_create(topic, old_snapshot, NULL); + stasis_publish(caching_topic->topic, update); + } else { + ast_log(LOG_ERROR, + "Attempting to remove an item from the cache that isn't there: %s %s\n", + stasis_message_type_name(clear->type), clear->id); + } + return; + } + + id = caching_topic->id_fn(message); + if (id == NULL) { + /* Object isn't cached; forward */ + stasis_forward_message(caching_topic->topic, topic, message); + } else { + /* Update the cache */ + RAII_VAR(struct stasis_message *, old_snapshot, NULL, ao2_cleanup); + RAII_VAR(struct stasis_message *, update, NULL, ao2_cleanup); + + old_snapshot = cache_put(caching_topic, stasis_message_type(message), id, message); + + update = update_create(topic, old_snapshot, message); + if (update == NULL) { + return; + } + + stasis_publish(caching_topic->topic, update); + } + + if (stasis_subscription_final_message(sub, message)) { + ao2_cleanup(caching_topic); + } +} + +struct stasis_caching_topic *stasis_caching_topic_create(struct stasis_topic *original_topic, snapshot_get_id id_fn) +{ + RAII_VAR(struct stasis_caching_topic *, caching_topic, NULL, ao2_cleanup); + struct stasis_subscription *sub; + RAII_VAR(char *, new_name, NULL, free); + int ret; + + ret = asprintf(&new_name, "%s-cached", stasis_topic_name(original_topic)); + if (ret < 0) { + return NULL; + } + + caching_topic = ao2_alloc(sizeof(*caching_topic), stasis_caching_topic_dtor); + if (caching_topic == NULL) { + return NULL; + } + + caching_topic->cache = ao2_container_alloc(NUM_CACHE_BUCKETS, cache_entry_hash, cache_entry_cmp); + if (!caching_topic->cache) { + ast_log(LOG_ERROR, "Stasis cache allocation failed\n"); + return NULL; + } + + caching_topic->topic = stasis_topic_create(new_name); + if (caching_topic->topic == NULL) { + return NULL; + } + + caching_topic->id_fn = id_fn; + + sub = stasis_subscribe(original_topic, caching_topic_exec, caching_topic); + if (sub == NULL) { + return NULL; + } + /* This is for the reference contained in the subscription above */ + ao2_ref(caching_topic, +1); + caching_topic->sub = sub; + + ao2_ref(caching_topic, +1); + return caching_topic; +} + +static void stasis_cache_exit(void) +{ + ao2_cleanup(__cache_clear_data); + __cache_clear_data = NULL; + ao2_cleanup(__cache_update); + __cache_update = NULL; +} + +int stasis_cache_init(void) +{ + ast_register_atexit(stasis_cache_exit); + + if (__cache_clear_data || __cache_update) { + ast_log(LOG_ERROR, "Stasis cache double initialized\n"); + return -1; + } + + __cache_update = stasis_message_type_create("stasis_cache_update"); + if (!__cache_update) { + return -1; + } + + __cache_clear_data = stasis_message_type_create("StasisCacheClear"); + if (!__cache_clear_data) { + return -1; + } + return 0; +} + diff --git a/main/stasis_message.c b/main/stasis_message.c new file mode 100644 index 000000000..8d397b935 --- /dev/null +++ b/main/stasis_message.c @@ -0,0 +1,135 @@ +/* + * Asterisk -- An open source telephony toolkit. + * + * Copyright (C) 2013, Digium, Inc. + * + * David M. Lee, II <dlee@digium.com> + * + * See http://www.asterisk.org for more information about + * the Asterisk project. Please do not directly contact + * any of the maintainers of this project for assistance; + * the project provides a web site, mailing lists and IRC + * channels for your use. + * + * This program is free software, distributed under the terms of + * the GNU General Public License Version 2. See the LICENSE file + * at the top of the source tree. + */ + +/*! \file + * + * \brief Stasis Message API. + * + * \author David M. Lee, II <dlee@digium.com> + */ + +/*** MODULEINFO + <support_level>core</support_level> + ***/ + +#include "asterisk.h" + +ASTERISK_FILE_VERSION(__FILE__, "$Revision$") + +#include "asterisk/astobj2.h" +#include "asterisk/stasis.h" +#include "asterisk/utils.h" + +/*! \private */ +struct stasis_message_type { + char *name; +}; + +static void message_type_dtor(void *obj) +{ + struct stasis_message_type *type = obj; + ast_free(type->name); + type->name = NULL; +} + +struct stasis_message_type *stasis_message_type_create(const char *name) +{ + RAII_VAR(struct stasis_message_type *, type, NULL, ao2_cleanup); + + type = ao2_alloc(sizeof(*type), message_type_dtor); + if (!type) { + return NULL; + } + + type->name = ast_strdup(name); + if (!type->name) { + return NULL; + } + + ao2_ref(type, +1); + return type; +} + +const char *stasis_message_type_name(const struct stasis_message_type *type) +{ + return type->name; +} + +/*! \private */ +struct stasis_message { + /*! Time the message was created */ + struct timeval timestamp; + /*! Type of the message */ + struct stasis_message_type *type; + /*! Message content */ + void *data; +}; + +static void stasis_message_dtor(void *obj) +{ + struct stasis_message *message = obj; + ao2_cleanup(message->type); + ao2_cleanup(message->data); +} + +struct stasis_message *stasis_message_create(struct stasis_message_type *type, void *data) +{ + RAII_VAR(struct stasis_message *, message, NULL, ao2_cleanup); + + if (type == NULL || data == NULL) { + return NULL; + } + + message = ao2_alloc(sizeof(*message), stasis_message_dtor); + if (message == NULL) { + return NULL; + } + + message->timestamp = ast_tvnow(); + ao2_ref(type, +1); + message->type = type; + ao2_ref(data, +1); + message->data = data; + + ao2_ref(message, +1); + return message; +} + +struct stasis_message_type *stasis_message_type(const struct stasis_message *msg) +{ + if (msg == NULL) { + return NULL; + } + return msg->type; +} + +void *stasis_message_data(const struct stasis_message *msg) +{ + if (msg == NULL) { + return NULL; + } + return msg->data; +} + +const struct timeval *stasis_message_timestamp(const struct stasis_message *msg) +{ + if (msg == NULL) { + return NULL; + } + return &msg->timestamp; +} |