summaryrefslogtreecommitdiff
path: root/main/stasis_bridges.c
diff options
context:
space:
mode:
authorDavid M. Lee <dlee@digium.com>2013-08-27 19:19:36 +0000
committerDavid M. Lee <dlee@digium.com>2013-08-27 19:19:36 +0000
commit451993f4f57331fae84a9c04b31fd5663ccfc593 (patch)
treeb9baca6011939f78f161e64e1f8382a6acaa2c82 /main/stasis_bridges.c
parent3540c7ac6eca33609c294baa1fbcf56722536f6a (diff)
ARI: WebSocket event cleanup
Stasis events (which get distributed over the ARI WebSocket) are created by subscribing to the channel_all_cached and bridge_all_cached topics, filtering out events for channels/bridges currently subscribed to. There are two issues with that. First was a race condition, where messages in-flight to the master subscribe-to-all-things topic would get sent out, even though the events happened before the channel was put into Stasis. Secondly, as the number of channels and bridges grow in the system, the work spent filtering messages becomes excessive. Since r395954, individual channels and bridges have caching topics, and can be subscribed to individually. This patch takes advantage, so that channels and bridges are subscribed to on demand, instead of filtering the global topics. The one case where filtering is still required is handling BridgeMerge messages, which are published directly to the bridge_all topic. Other than the change to how subscriptions work, this patch mostly just moves code around. Most of the work generating JSON objects from messages was moved to .to_json handlers on the message types. The callback functions handling app subscriptions were moved from res_stasis (b/c they were global to the model) to stasis/app.c (b/c they are local to the app now). (closes issue ASTERISK-21969) Reported by: Matt Jordan Review: https://reviewboard.asterisk.org/r/2754/ ........ Merged revisions 397816 from http://svn.asterisk.org/svn/asterisk/branches/12 git-svn-id: https://origsvn.digium.com/svn/asterisk/trunk@397820 65c4cc65-6c06-0410-ace0-fbb531ad65f3
Diffstat (limited to 'main/stasis_bridges.c')
-rw-r--r--main/stasis_bridges.c54
1 files changed, 51 insertions, 3 deletions
diff --git a/main/stasis_bridges.c b/main/stasis_bridges.c
index 2a79056d1..be1294ad0 100644
--- a/main/stasis_bridges.c
+++ b/main/stasis_bridges.c
@@ -132,6 +132,9 @@ ASTERISK_FILE_VERSION(__FILE__, "$Revision$")
static struct ast_manager_event_blob *attended_transfer_to_ami(struct stasis_message *message);
static struct ast_manager_event_blob *blind_transfer_to_ami(struct stasis_message *message);
+static struct ast_json *ast_channel_entered_bridge_to_json(struct stasis_message *msg);
+static struct ast_json *ast_channel_left_bridge_to_json(struct stasis_message *msg);
+static struct ast_json *ast_bridge_merge_message_to_json(struct stasis_message *msg);
static struct stasis_cp_all *bridge_cache_all;
@@ -139,9 +142,12 @@ static struct stasis_cp_all *bridge_cache_all;
* @{ \brief Define bridge message types.
*/
STASIS_MESSAGE_TYPE_DEFN(ast_bridge_snapshot_type);
-STASIS_MESSAGE_TYPE_DEFN(ast_bridge_merge_message_type);
-STASIS_MESSAGE_TYPE_DEFN(ast_channel_entered_bridge_type);
-STASIS_MESSAGE_TYPE_DEFN(ast_channel_left_bridge_type);
+STASIS_MESSAGE_TYPE_DEFN(ast_bridge_merge_message_type,
+ .to_json = ast_bridge_merge_message_to_json);
+STASIS_MESSAGE_TYPE_DEFN(ast_channel_entered_bridge_type,
+ .to_json = ast_channel_entered_bridge_to_json);
+STASIS_MESSAGE_TYPE_DEFN(ast_channel_left_bridge_type,
+ .to_json = ast_channel_left_bridge_to_json);
STASIS_MESSAGE_TYPE_DEFN(ast_blind_transfer_type, .to_ami = blind_transfer_to_ami);
STASIS_MESSAGE_TYPE_DEFN(ast_attended_transfer_type, .to_ami = attended_transfer_to_ami);
/*! @} */
@@ -307,6 +313,19 @@ static struct ast_bridge_merge_message *bridge_merge_message_create(struct ast_b
return msg;
}
+static struct ast_json *ast_bridge_merge_message_to_json(struct stasis_message *msg)
+{
+ struct ast_bridge_merge_message *merge;
+
+ merge = stasis_message_data(msg);
+
+ return ast_json_pack("{s: s, s: o, s: o, s: o}",
+ "type", "BridgeMerged",
+ "timestamp", ast_json_timeval(*stasis_message_timestamp(msg), NULL),
+ "bridge", ast_bridge_snapshot_to_json(merge->to),
+ "bridge_from", ast_bridge_snapshot_to_json(merge->from));
+}
+
void ast_bridge_publish_merge(struct ast_bridge *to, struct ast_bridge *from)
{
RAII_VAR(struct ast_bridge_merge_message *, merge_msg, NULL, ao2_cleanup);
@@ -417,6 +436,35 @@ void ast_bridge_publish_leave(struct ast_bridge *bridge, struct ast_channel *cha
stasis_publish(ast_bridge_topic(bridge), msg);
}
+static struct ast_json *simple_bridge_channel_event(
+ const char *type,
+ struct ast_bridge_snapshot *bridge_snapshot,
+ struct ast_channel_snapshot *channel_snapshot,
+ const struct timeval *tv)
+{
+ return ast_json_pack("{s: s, s: o, s: o, s: o}",
+ "type", type,
+ "timestamp", ast_json_timeval(*tv, NULL),
+ "bridge", ast_bridge_snapshot_to_json(bridge_snapshot),
+ "channel", ast_channel_snapshot_to_json(channel_snapshot));
+}
+
+struct ast_json *ast_channel_entered_bridge_to_json(struct stasis_message *msg)
+{
+ struct ast_bridge_blob *obj = stasis_message_data(msg);
+
+ return simple_bridge_channel_event("ChannelEnteredBridge", obj->bridge,
+ obj->channel, stasis_message_timestamp(msg));
+}
+
+struct ast_json *ast_channel_left_bridge_to_json(struct stasis_message *msg)
+{
+ struct ast_bridge_blob *obj = stasis_message_data(msg);
+
+ return simple_bridge_channel_event("ChannelLeftBridge", obj->bridge,
+ obj->channel, stasis_message_timestamp(msg));
+}
+
typedef struct ast_json *(*json_item_serializer_cb)(void *obj);
static struct ast_json *container_to_json_array(struct ao2_container *items, json_item_serializer_cb item_cb)