summaryrefslogtreecommitdiff
path: root/main/stasis_bridges.c
diff options
context:
space:
mode:
Diffstat (limited to 'main/stasis_bridges.c')
-rw-r--r--main/stasis_bridges.c143
1 files changed, 83 insertions, 60 deletions
diff --git a/main/stasis_bridges.c b/main/stasis_bridges.c
index 858875ad9..72f4d5055 100644
--- a/main/stasis_bridges.c
+++ b/main/stasis_bridges.c
@@ -33,6 +33,7 @@ ASTERISK_FILE_VERSION(__FILE__, "$Revision$")
#include "asterisk/astobj2.h"
#include "asterisk/stasis.h"
+#include "asterisk/stasis_cache_pattern.h"
#include "asterisk/channel.h"
#include "asterisk/stasis_bridges.h"
#include "asterisk/stasis_channels.h"
@@ -361,6 +362,8 @@ ASTERISK_FILE_VERSION(__FILE__, "$Revision$")
static struct ast_manager_event_blob *attended_transfer_to_ami(struct stasis_message *message);
static struct ast_manager_event_blob *blind_transfer_to_ami(struct stasis_message *message);
+static struct stasis_cp_all *bridge_cache_all;
+
/*!
* @{ \brief Define bridge message types.
*/
@@ -372,14 +375,52 @@ STASIS_MESSAGE_TYPE_DEFN(ast_blind_transfer_type, .to_ami = blind_transfer_to_am
STASIS_MESSAGE_TYPE_DEFN(ast_attended_transfer_type, .to_ami = attended_transfer_to_ami);
/*! @} */
-/*! \brief Aggregate topic for bridge messages */
-static struct stasis_topic *bridge_topic_all;
+struct stasis_cache *ast_bridge_cache(void)
+{
+ return stasis_cp_all_cache(bridge_cache_all);
+}
-/*! \brief Caching aggregate topic for bridge snapshots */
-static struct stasis_caching_topic *bridge_topic_all_cached;
+struct stasis_topic *ast_bridge_topic_all(void)
+{
+ return stasis_cp_all_topic(bridge_cache_all);
+}
+
+struct stasis_topic *ast_bridge_topic_all_cached(void)
+{
+ return stasis_cp_all_topic_cached(bridge_cache_all);
+}
-/*! \brief Topic pool for individual bridge topics */
-static struct stasis_topic_pool *bridge_topic_pool;
+int bridge_topics_init(struct ast_bridge *bridge)
+{
+ if (ast_strlen_zero(bridge->uniqueid)) {
+ ast_log(LOG_ERROR, "Bridge id initialization required\n");
+ return -1;
+ }
+ bridge->topics = stasis_cp_single_create(bridge_cache_all,
+ bridge->uniqueid);
+ if (!bridge->topics) {
+ return -1;
+ }
+ return 0;
+}
+
+struct stasis_topic *ast_bridge_topic(struct ast_bridge *bridge)
+{
+ if (!bridge) {
+ return ast_bridge_topic_all();
+ }
+
+ return stasis_cp_single_topic(bridge->topics);
+}
+
+struct stasis_topic *ast_bridge_topic_cached(struct ast_bridge *bridge)
+{
+ if (!bridge) {
+ return ast_bridge_topic_all_cached();
+ }
+
+ return stasis_cp_single_topic_cached(bridge->topics);
+}
/*! \brief Destructor for bridge snapshots */
static void bridge_snapshot_dtor(void *obj)
@@ -425,25 +466,6 @@ struct ast_bridge_snapshot *ast_bridge_snapshot_create(struct ast_bridge *bridge
return snapshot;
}
-struct stasis_topic *ast_bridge_topic(struct ast_bridge *bridge)
-{
- struct stasis_topic *bridge_topic = stasis_topic_pool_get_topic(bridge_topic_pool, bridge->uniqueid);
- if (!bridge_topic) {
- return ast_bridge_topic_all();
- }
- return bridge_topic;
-}
-
-struct stasis_topic *ast_bridge_topic_all(void)
-{
- return bridge_topic_all;
-}
-
-struct stasis_caching_topic *ast_bridge_topic_all_cached(void)
-{
- return bridge_topic_all_cached;
-}
-
void ast_bridge_publish_state(struct ast_bridge *bridge)
{
RAII_VAR(struct ast_bridge_snapshot *, snapshot, NULL, ao2_cleanup);
@@ -464,7 +486,8 @@ void ast_bridge_publish_state(struct ast_bridge *bridge)
stasis_publish(ast_bridge_topic(bridge), msg);
}
-static void bridge_publish_state_from_blob(struct ast_bridge_blob *obj)
+static void bridge_publish_state_from_blob(struct ast_bridge *bridge,
+ struct ast_bridge_blob *obj)
{
RAII_VAR(struct stasis_message *, msg, NULL, ao2_cleanup);
@@ -475,7 +498,7 @@ static void bridge_publish_state_from_blob(struct ast_bridge_blob *obj)
return;
}
- stasis_publish(stasis_topic_pool_get_topic(bridge_topic_pool, obj->bridge->uniqueid), msg);
+ stasis_publish(ast_bridge_topic(bridge), msg);
}
/*! \brief Destructor for bridge merge messages */
@@ -597,7 +620,7 @@ void ast_bridge_publish_enter(struct ast_bridge *bridge, struct ast_channel *cha
/* enter blob first, then state */
stasis_publish(ast_bridge_topic(bridge), msg);
- bridge_publish_state_from_blob(stasis_message_data(msg));
+ bridge_publish_state_from_blob(bridge, stasis_message_data(msg));
}
void ast_bridge_publish_leave(struct ast_bridge *bridge, struct ast_channel *chan)
@@ -610,7 +633,7 @@ void ast_bridge_publish_leave(struct ast_bridge *bridge, struct ast_channel *cha
}
/* state first, then leave blob (opposite of enter, preserves nesting of events) */
- bridge_publish_state_from_blob(stasis_message_data(msg));
+ bridge_publish_state_from_blob(bridge, stasis_message_data(msg));
stasis_publish(ast_bridge_topic(bridge), msg);
}
@@ -1043,7 +1066,7 @@ struct ast_bridge_snapshot *ast_bridge_snapshot_get_latest(const char *uniqueid)
ast_assert(!ast_strlen_zero(uniqueid));
- message = stasis_cache_get(ast_bridge_topic_all_cached(),
+ message = stasis_cache_get(ast_bridge_cache(),
ast_bridge_snapshot_type(),
uniqueid);
if (!message) {
@@ -1058,23 +1081,6 @@ struct ast_bridge_snapshot *ast_bridge_snapshot_get_latest(const char *uniqueid)
return snapshot;
}
-static void stasis_bridging_cleanup(void)
-{
- ao2_cleanup(bridge_topic_all);
- bridge_topic_all = NULL;
- bridge_topic_all_cached = stasis_caching_unsubscribe_and_join(
- bridge_topic_all_cached);
- ao2_cleanup(bridge_topic_pool);
- bridge_topic_pool = NULL;
-
- STASIS_MESSAGE_TYPE_CLEANUP(ast_bridge_snapshot_type);
- STASIS_MESSAGE_TYPE_CLEANUP(ast_bridge_merge_message_type);
- STASIS_MESSAGE_TYPE_CLEANUP(ast_channel_entered_bridge_type);
- STASIS_MESSAGE_TYPE_CLEANUP(ast_channel_left_bridge_type);
- STASIS_MESSAGE_TYPE_CLEANUP(ast_blind_transfer_type);
- STASIS_MESSAGE_TYPE_CLEANUP(ast_attended_transfer_type);
-}
-
/*! \brief snapshot ID getter for caching topic */
static const char *bridge_snapshot_get_id(struct stasis_message *msg)
{
@@ -1086,21 +1092,38 @@ static const char *bridge_snapshot_get_id(struct stasis_message *msg)
return snapshot->uniqueid;
}
+static void stasis_bridging_cleanup(void)
+{
+ STASIS_MESSAGE_TYPE_CLEANUP(ast_bridge_snapshot_type);
+ STASIS_MESSAGE_TYPE_CLEANUP(ast_bridge_merge_message_type);
+ STASIS_MESSAGE_TYPE_CLEANUP(ast_channel_entered_bridge_type);
+ STASIS_MESSAGE_TYPE_CLEANUP(ast_channel_left_bridge_type);
+ STASIS_MESSAGE_TYPE_CLEANUP(ast_blind_transfer_type);
+ STASIS_MESSAGE_TYPE_CLEANUP(ast_attended_transfer_type);
+
+ ao2_cleanup(bridge_cache_all);
+ bridge_cache_all = NULL;
+}
+
int ast_stasis_bridging_init(void)
{
+ int res = 0;
+
ast_register_cleanup(stasis_bridging_cleanup);
- STASIS_MESSAGE_TYPE_INIT(ast_bridge_snapshot_type);
- STASIS_MESSAGE_TYPE_INIT(ast_bridge_merge_message_type);
- STASIS_MESSAGE_TYPE_INIT(ast_channel_entered_bridge_type);
- STASIS_MESSAGE_TYPE_INIT(ast_channel_left_bridge_type);
- STASIS_MESSAGE_TYPE_INIT(ast_blind_transfer_type);
- STASIS_MESSAGE_TYPE_INIT(ast_attended_transfer_type);
- bridge_topic_all = stasis_topic_create("ast_bridge_topic_all");
- bridge_topic_all_cached = stasis_caching_topic_create(bridge_topic_all, bridge_snapshot_get_id);
- bridge_topic_pool = stasis_topic_pool_create(bridge_topic_all);
-
- return !bridge_topic_all
- || !bridge_topic_all_cached
- || !bridge_topic_pool ? -1 : 0;
+ bridge_cache_all = stasis_cp_all_create("ast_bridge_topic_all",
+ bridge_snapshot_get_id);
+
+ if (!bridge_cache_all) {
+ return -1;
+ }
+
+ res |= STASIS_MESSAGE_TYPE_INIT(ast_bridge_snapshot_type);
+ res |= STASIS_MESSAGE_TYPE_INIT(ast_bridge_merge_message_type);
+ res |= STASIS_MESSAGE_TYPE_INIT(ast_channel_entered_bridge_type);
+ res |= STASIS_MESSAGE_TYPE_INIT(ast_channel_left_bridge_type);
+ res |= STASIS_MESSAGE_TYPE_INIT(ast_blind_transfer_type);
+ res |= STASIS_MESSAGE_TYPE_INIT(ast_attended_transfer_type);
+
+ return res;
}