diff options
Diffstat (limited to 'main/stasis_bridges.c')
-rw-r--r-- | main/stasis_bridges.c | 143 |
1 files changed, 83 insertions, 60 deletions
diff --git a/main/stasis_bridges.c b/main/stasis_bridges.c index 858875ad9..72f4d5055 100644 --- a/main/stasis_bridges.c +++ b/main/stasis_bridges.c @@ -33,6 +33,7 @@ ASTERISK_FILE_VERSION(__FILE__, "$Revision$") #include "asterisk/astobj2.h" #include "asterisk/stasis.h" +#include "asterisk/stasis_cache_pattern.h" #include "asterisk/channel.h" #include "asterisk/stasis_bridges.h" #include "asterisk/stasis_channels.h" @@ -361,6 +362,8 @@ ASTERISK_FILE_VERSION(__FILE__, "$Revision$") static struct ast_manager_event_blob *attended_transfer_to_ami(struct stasis_message *message); static struct ast_manager_event_blob *blind_transfer_to_ami(struct stasis_message *message); +static struct stasis_cp_all *bridge_cache_all; + /*! * @{ \brief Define bridge message types. */ @@ -372,14 +375,52 @@ STASIS_MESSAGE_TYPE_DEFN(ast_blind_transfer_type, .to_ami = blind_transfer_to_am STASIS_MESSAGE_TYPE_DEFN(ast_attended_transfer_type, .to_ami = attended_transfer_to_ami); /*! @} */ -/*! \brief Aggregate topic for bridge messages */ -static struct stasis_topic *bridge_topic_all; +struct stasis_cache *ast_bridge_cache(void) +{ + return stasis_cp_all_cache(bridge_cache_all); +} -/*! \brief Caching aggregate topic for bridge snapshots */ -static struct stasis_caching_topic *bridge_topic_all_cached; +struct stasis_topic *ast_bridge_topic_all(void) +{ + return stasis_cp_all_topic(bridge_cache_all); +} + +struct stasis_topic *ast_bridge_topic_all_cached(void) +{ + return stasis_cp_all_topic_cached(bridge_cache_all); +} -/*! \brief Topic pool for individual bridge topics */ -static struct stasis_topic_pool *bridge_topic_pool; +int bridge_topics_init(struct ast_bridge *bridge) +{ + if (ast_strlen_zero(bridge->uniqueid)) { + ast_log(LOG_ERROR, "Bridge id initialization required\n"); + return -1; + } + bridge->topics = stasis_cp_single_create(bridge_cache_all, + bridge->uniqueid); + if (!bridge->topics) { + return -1; + } + return 0; +} + +struct stasis_topic *ast_bridge_topic(struct ast_bridge *bridge) +{ + if (!bridge) { + return ast_bridge_topic_all(); + } + + return stasis_cp_single_topic(bridge->topics); +} + +struct stasis_topic *ast_bridge_topic_cached(struct ast_bridge *bridge) +{ + if (!bridge) { + return ast_bridge_topic_all_cached(); + } + + return stasis_cp_single_topic_cached(bridge->topics); +} /*! \brief Destructor for bridge snapshots */ static void bridge_snapshot_dtor(void *obj) @@ -425,25 +466,6 @@ struct ast_bridge_snapshot *ast_bridge_snapshot_create(struct ast_bridge *bridge return snapshot; } -struct stasis_topic *ast_bridge_topic(struct ast_bridge *bridge) -{ - struct stasis_topic *bridge_topic = stasis_topic_pool_get_topic(bridge_topic_pool, bridge->uniqueid); - if (!bridge_topic) { - return ast_bridge_topic_all(); - } - return bridge_topic; -} - -struct stasis_topic *ast_bridge_topic_all(void) -{ - return bridge_topic_all; -} - -struct stasis_caching_topic *ast_bridge_topic_all_cached(void) -{ - return bridge_topic_all_cached; -} - void ast_bridge_publish_state(struct ast_bridge *bridge) { RAII_VAR(struct ast_bridge_snapshot *, snapshot, NULL, ao2_cleanup); @@ -464,7 +486,8 @@ void ast_bridge_publish_state(struct ast_bridge *bridge) stasis_publish(ast_bridge_topic(bridge), msg); } -static void bridge_publish_state_from_blob(struct ast_bridge_blob *obj) +static void bridge_publish_state_from_blob(struct ast_bridge *bridge, + struct ast_bridge_blob *obj) { RAII_VAR(struct stasis_message *, msg, NULL, ao2_cleanup); @@ -475,7 +498,7 @@ static void bridge_publish_state_from_blob(struct ast_bridge_blob *obj) return; } - stasis_publish(stasis_topic_pool_get_topic(bridge_topic_pool, obj->bridge->uniqueid), msg); + stasis_publish(ast_bridge_topic(bridge), msg); } /*! \brief Destructor for bridge merge messages */ @@ -597,7 +620,7 @@ void ast_bridge_publish_enter(struct ast_bridge *bridge, struct ast_channel *cha /* enter blob first, then state */ stasis_publish(ast_bridge_topic(bridge), msg); - bridge_publish_state_from_blob(stasis_message_data(msg)); + bridge_publish_state_from_blob(bridge, stasis_message_data(msg)); } void ast_bridge_publish_leave(struct ast_bridge *bridge, struct ast_channel *chan) @@ -610,7 +633,7 @@ void ast_bridge_publish_leave(struct ast_bridge *bridge, struct ast_channel *cha } /* state first, then leave blob (opposite of enter, preserves nesting of events) */ - bridge_publish_state_from_blob(stasis_message_data(msg)); + bridge_publish_state_from_blob(bridge, stasis_message_data(msg)); stasis_publish(ast_bridge_topic(bridge), msg); } @@ -1043,7 +1066,7 @@ struct ast_bridge_snapshot *ast_bridge_snapshot_get_latest(const char *uniqueid) ast_assert(!ast_strlen_zero(uniqueid)); - message = stasis_cache_get(ast_bridge_topic_all_cached(), + message = stasis_cache_get(ast_bridge_cache(), ast_bridge_snapshot_type(), uniqueid); if (!message) { @@ -1058,23 +1081,6 @@ struct ast_bridge_snapshot *ast_bridge_snapshot_get_latest(const char *uniqueid) return snapshot; } -static void stasis_bridging_cleanup(void) -{ - ao2_cleanup(bridge_topic_all); - bridge_topic_all = NULL; - bridge_topic_all_cached = stasis_caching_unsubscribe_and_join( - bridge_topic_all_cached); - ao2_cleanup(bridge_topic_pool); - bridge_topic_pool = NULL; - - STASIS_MESSAGE_TYPE_CLEANUP(ast_bridge_snapshot_type); - STASIS_MESSAGE_TYPE_CLEANUP(ast_bridge_merge_message_type); - STASIS_MESSAGE_TYPE_CLEANUP(ast_channel_entered_bridge_type); - STASIS_MESSAGE_TYPE_CLEANUP(ast_channel_left_bridge_type); - STASIS_MESSAGE_TYPE_CLEANUP(ast_blind_transfer_type); - STASIS_MESSAGE_TYPE_CLEANUP(ast_attended_transfer_type); -} - /*! \brief snapshot ID getter for caching topic */ static const char *bridge_snapshot_get_id(struct stasis_message *msg) { @@ -1086,21 +1092,38 @@ static const char *bridge_snapshot_get_id(struct stasis_message *msg) return snapshot->uniqueid; } +static void stasis_bridging_cleanup(void) +{ + STASIS_MESSAGE_TYPE_CLEANUP(ast_bridge_snapshot_type); + STASIS_MESSAGE_TYPE_CLEANUP(ast_bridge_merge_message_type); + STASIS_MESSAGE_TYPE_CLEANUP(ast_channel_entered_bridge_type); + STASIS_MESSAGE_TYPE_CLEANUP(ast_channel_left_bridge_type); + STASIS_MESSAGE_TYPE_CLEANUP(ast_blind_transfer_type); + STASIS_MESSAGE_TYPE_CLEANUP(ast_attended_transfer_type); + + ao2_cleanup(bridge_cache_all); + bridge_cache_all = NULL; +} + int ast_stasis_bridging_init(void) { + int res = 0; + ast_register_cleanup(stasis_bridging_cleanup); - STASIS_MESSAGE_TYPE_INIT(ast_bridge_snapshot_type); - STASIS_MESSAGE_TYPE_INIT(ast_bridge_merge_message_type); - STASIS_MESSAGE_TYPE_INIT(ast_channel_entered_bridge_type); - STASIS_MESSAGE_TYPE_INIT(ast_channel_left_bridge_type); - STASIS_MESSAGE_TYPE_INIT(ast_blind_transfer_type); - STASIS_MESSAGE_TYPE_INIT(ast_attended_transfer_type); - bridge_topic_all = stasis_topic_create("ast_bridge_topic_all"); - bridge_topic_all_cached = stasis_caching_topic_create(bridge_topic_all, bridge_snapshot_get_id); - bridge_topic_pool = stasis_topic_pool_create(bridge_topic_all); - - return !bridge_topic_all - || !bridge_topic_all_cached - || !bridge_topic_pool ? -1 : 0; + bridge_cache_all = stasis_cp_all_create("ast_bridge_topic_all", + bridge_snapshot_get_id); + + if (!bridge_cache_all) { + return -1; + } + + res |= STASIS_MESSAGE_TYPE_INIT(ast_bridge_snapshot_type); + res |= STASIS_MESSAGE_TYPE_INIT(ast_bridge_merge_message_type); + res |= STASIS_MESSAGE_TYPE_INIT(ast_channel_entered_bridge_type); + res |= STASIS_MESSAGE_TYPE_INIT(ast_channel_left_bridge_type); + res |= STASIS_MESSAGE_TYPE_INIT(ast_blind_transfer_type); + res |= STASIS_MESSAGE_TYPE_INIT(ast_attended_transfer_type); + + return res; } |