diff options
author | David M. Lee <dlee@digium.com> | 2013-08-27 19:19:36 +0000 |
---|---|---|
committer | David M. Lee <dlee@digium.com> | 2013-08-27 19:19:36 +0000 |
commit | 451993f4f57331fae84a9c04b31fd5663ccfc593 (patch) | |
tree | b9baca6011939f78f161e64e1f8382a6acaa2c82 /main/stasis_bridges.c | |
parent | 3540c7ac6eca33609c294baa1fbcf56722536f6a (diff) |
ARI: WebSocket event cleanup
Stasis events (which get distributed over the ARI WebSocket) are created
by subscribing to the channel_all_cached and bridge_all_cached topics,
filtering out events for channels/bridges currently subscribed to.
There are two issues with that. First was a race condition, where
messages in-flight to the master subscribe-to-all-things topic would get
sent out, even though the events happened before the channel was put
into Stasis. Secondly, as the number of channels and bridges grow in the
system, the work spent filtering messages becomes excessive.
Since r395954, individual channels and bridges have caching topics, and
can be subscribed to individually. This patch takes advantage, so that
channels and bridges are subscribed to on demand, instead of filtering
the global topics.
The one case where filtering is still required is handling BridgeMerge
messages, which are published directly to the bridge_all topic.
Other than the change to how subscriptions work, this patch mostly just
moves code around. Most of the work generating JSON objects from
messages was moved to .to_json handlers on the message types. The
callback functions handling app subscriptions were moved from res_stasis
(b/c they were global to the model) to stasis/app.c (b/c they are local
to the app now).
(closes issue ASTERISK-21969)
Reported by: Matt Jordan
Review: https://reviewboard.asterisk.org/r/2754/
........
Merged revisions 397816 from http://svn.asterisk.org/svn/asterisk/branches/12
git-svn-id: https://origsvn.digium.com/svn/asterisk/trunk@397820 65c4cc65-6c06-0410-ace0-fbb531ad65f3
Diffstat (limited to 'main/stasis_bridges.c')
-rw-r--r-- | main/stasis_bridges.c | 54 |
1 files changed, 51 insertions, 3 deletions
diff --git a/main/stasis_bridges.c b/main/stasis_bridges.c index 2a79056d1..be1294ad0 100644 --- a/main/stasis_bridges.c +++ b/main/stasis_bridges.c @@ -132,6 +132,9 @@ ASTERISK_FILE_VERSION(__FILE__, "$Revision$") static struct ast_manager_event_blob *attended_transfer_to_ami(struct stasis_message *message); static struct ast_manager_event_blob *blind_transfer_to_ami(struct stasis_message *message); +static struct ast_json *ast_channel_entered_bridge_to_json(struct stasis_message *msg); +static struct ast_json *ast_channel_left_bridge_to_json(struct stasis_message *msg); +static struct ast_json *ast_bridge_merge_message_to_json(struct stasis_message *msg); static struct stasis_cp_all *bridge_cache_all; @@ -139,9 +142,12 @@ static struct stasis_cp_all *bridge_cache_all; * @{ \brief Define bridge message types. */ STASIS_MESSAGE_TYPE_DEFN(ast_bridge_snapshot_type); -STASIS_MESSAGE_TYPE_DEFN(ast_bridge_merge_message_type); -STASIS_MESSAGE_TYPE_DEFN(ast_channel_entered_bridge_type); -STASIS_MESSAGE_TYPE_DEFN(ast_channel_left_bridge_type); +STASIS_MESSAGE_TYPE_DEFN(ast_bridge_merge_message_type, + .to_json = ast_bridge_merge_message_to_json); +STASIS_MESSAGE_TYPE_DEFN(ast_channel_entered_bridge_type, + .to_json = ast_channel_entered_bridge_to_json); +STASIS_MESSAGE_TYPE_DEFN(ast_channel_left_bridge_type, + .to_json = ast_channel_left_bridge_to_json); STASIS_MESSAGE_TYPE_DEFN(ast_blind_transfer_type, .to_ami = blind_transfer_to_ami); STASIS_MESSAGE_TYPE_DEFN(ast_attended_transfer_type, .to_ami = attended_transfer_to_ami); /*! @} */ @@ -307,6 +313,19 @@ static struct ast_bridge_merge_message *bridge_merge_message_create(struct ast_b return msg; } +static struct ast_json *ast_bridge_merge_message_to_json(struct stasis_message *msg) +{ + struct ast_bridge_merge_message *merge; + + merge = stasis_message_data(msg); + + return ast_json_pack("{s: s, s: o, s: o, s: o}", + "type", "BridgeMerged", + "timestamp", ast_json_timeval(*stasis_message_timestamp(msg), NULL), + "bridge", ast_bridge_snapshot_to_json(merge->to), + "bridge_from", ast_bridge_snapshot_to_json(merge->from)); +} + void ast_bridge_publish_merge(struct ast_bridge *to, struct ast_bridge *from) { RAII_VAR(struct ast_bridge_merge_message *, merge_msg, NULL, ao2_cleanup); @@ -417,6 +436,35 @@ void ast_bridge_publish_leave(struct ast_bridge *bridge, struct ast_channel *cha stasis_publish(ast_bridge_topic(bridge), msg); } +static struct ast_json *simple_bridge_channel_event( + const char *type, + struct ast_bridge_snapshot *bridge_snapshot, + struct ast_channel_snapshot *channel_snapshot, + const struct timeval *tv) +{ + return ast_json_pack("{s: s, s: o, s: o, s: o}", + "type", type, + "timestamp", ast_json_timeval(*tv, NULL), + "bridge", ast_bridge_snapshot_to_json(bridge_snapshot), + "channel", ast_channel_snapshot_to_json(channel_snapshot)); +} + +struct ast_json *ast_channel_entered_bridge_to_json(struct stasis_message *msg) +{ + struct ast_bridge_blob *obj = stasis_message_data(msg); + + return simple_bridge_channel_event("ChannelEnteredBridge", obj->bridge, + obj->channel, stasis_message_timestamp(msg)); +} + +struct ast_json *ast_channel_left_bridge_to_json(struct stasis_message *msg) +{ + struct ast_bridge_blob *obj = stasis_message_data(msg); + + return simple_bridge_channel_event("ChannelLeftBridge", obj->bridge, + obj->channel, stasis_message_timestamp(msg)); +} + typedef struct ast_json *(*json_item_serializer_cb)(void *obj); static struct ast_json *container_to_json_array(struct ao2_container *items, json_item_serializer_cb item_cb) |