diff options
Diffstat (limited to 'res/res_stasis.c')
-rw-r--r-- | res/res_stasis.c | 294 |
1 files changed, 137 insertions, 157 deletions
diff --git a/res/res_stasis.c b/res/res_stasis.c index de432e409..ed3823051 100644 --- a/res/res_stasis.c +++ b/res/res_stasis.c @@ -48,7 +48,6 @@ */ /*** MODULEINFO - <depend>res_stasis_json_events</depend> <support_level>core</support_level> ***/ @@ -66,7 +65,6 @@ ASTERISK_FILE_VERSION(__FILE__, "$Revision$") #include "asterisk/strings.h" #include "stasis/app.h" #include "stasis/control.h" -#include "stasis_json/resource_events.h" /*! Time to wait for a frame in the application */ #define MAX_WAIT_MS 200 @@ -233,28 +231,60 @@ static struct ao2_container *get_apps_watching_channel(const char *uniqueid) /*! \brief Typedef for callbacks that get called on channel snapshot updates */ typedef struct ast_json *(*channel_snapshot_monitor)( struct ast_channel_snapshot *old_snapshot, - struct ast_channel_snapshot *new_snapshot); + struct ast_channel_snapshot *new_snapshot, + const struct timeval *tv); + +static struct ast_json *simple_channel_event( + const char *type, + struct ast_channel_snapshot *snapshot, + const struct timeval *tv) +{ + return ast_json_pack("{s: s, s: o, s: o}", + "type", type, + "timestamp", ast_json_timeval(*tv, NULL), + "channel", ast_channel_snapshot_to_json(snapshot)); +} + +static struct ast_json *channel_created_event( + struct ast_channel_snapshot *snapshot, + const struct timeval *tv) +{ + return simple_channel_event("ChannelCreated", snapshot, tv); +} + +static struct ast_json *channel_destroyed_event( + struct ast_channel_snapshot *snapshot, + const struct timeval *tv) +{ + return ast_json_pack("{s: s, s: o, s: i, s: s, s: o}", + "type", "ChannelDestroyed", + "timestamp", ast_json_timeval(*tv, NULL), + "cause", snapshot->hangupcause, + "cause_txt", ast_cause2str(snapshot->hangupcause), + "channel", ast_channel_snapshot_to_json(snapshot)); +} + +static struct ast_json *channel_state_change_event( + struct ast_channel_snapshot *snapshot, + const struct timeval *tv) +{ + return simple_channel_event("ChannelStateChange", snapshot, tv); +} /*! \brief Handle channel state changes */ static struct ast_json *channel_state( struct ast_channel_snapshot *old_snapshot, - struct ast_channel_snapshot *new_snapshot) + struct ast_channel_snapshot *new_snapshot, + const struct timeval *tv) { - RAII_VAR(struct ast_json *, json, NULL, ast_json_unref); struct ast_channel_snapshot *snapshot = new_snapshot ? new_snapshot : old_snapshot; if (!old_snapshot) { - return stasis_json_event_channel_created_create(snapshot); + return channel_created_event(snapshot, tv); } else if (!new_snapshot) { - json = ast_json_pack("{s: i, s: s}", - "cause", snapshot->hangupcause, - "cause_txt", ast_cause2str(snapshot->hangupcause)); - if (!json) { - return NULL; - } - return stasis_json_event_channel_destroyed_create(snapshot, json); + return channel_destroyed_event(snapshot, tv); } else if (old_snapshot->state != new_snapshot->state) { - return stasis_json_event_channel_state_change_create(snapshot); + return channel_state_change_event(snapshot, tv); } return NULL; @@ -262,7 +292,8 @@ static struct ast_json *channel_state( static struct ast_json *channel_dialplan( struct ast_channel_snapshot *old_snapshot, - struct ast_channel_snapshot *new_snapshot) + struct ast_channel_snapshot *new_snapshot, + const struct timeval *tv) { RAII_VAR(struct ast_json *, json, NULL, ast_json_unref); @@ -280,19 +311,18 @@ static struct ast_json *channel_dialplan( return NULL; } - json = ast_json_pack("{s: s, s: s}", - "application", new_snapshot->appl, - "application_data", new_snapshot->data); - if (!json) { - return NULL; - } - - return stasis_json_event_channel_dialplan_create(new_snapshot, json); + return ast_json_pack("{s: s, s: o, s: s, s: s, s: o}", + "type", "ChannelDialplan", + "timestamp", ast_json_timeval(*tv, NULL), + "dialplan_app", new_snapshot->appl, + "dialplan_app_data", new_snapshot->data, + "channel", ast_channel_snapshot_to_json(new_snapshot)); } static struct ast_json *channel_callerid( struct ast_channel_snapshot *old_snapshot, - struct ast_channel_snapshot *new_snapshot) + struct ast_channel_snapshot *new_snapshot, + const struct timeval *tv) { RAII_VAR(struct ast_json *, json, NULL, ast_json_unref); @@ -305,29 +335,16 @@ static struct ast_json *channel_callerid( return NULL; } - json = ast_json_pack("{s: i, s: s}", + return ast_json_pack("{s: s, s: o, s: i, s: s, s: o}", + "type", "ChannelCallerId", + "timestamp", ast_json_timeval(*tv, NULL), "caller_presentation", new_snapshot->caller_pres, - "caller_presentation_txt", ast_describe_caller_presentation(new_snapshot->caller_pres)); - if (!json) { - return NULL; - } - - return stasis_json_event_channel_caller_id_create(new_snapshot, json); -} - -static struct ast_json *channel_snapshot( - struct ast_channel_snapshot *old_snapshot, - struct ast_channel_snapshot *new_snapshot) -{ - if (!new_snapshot) { - return NULL; - } - - return stasis_json_event_channel_snapshot_create(new_snapshot); + "caller_presentation_txt", ast_describe_caller_presentation( + new_snapshot->caller_pres), + "channel", ast_channel_snapshot_to_json(new_snapshot)); } channel_snapshot_monitor channel_monitors[] = { - channel_snapshot, channel_state, channel_dialplan, channel_callerid @@ -351,6 +368,9 @@ static void sub_channel_snapshot_handler(void *data, struct stasis_cache_update *update = stasis_message_data(message); struct ast_channel_snapshot *new_snapshot = stasis_message_data(update->new_snapshot); struct ast_channel_snapshot *old_snapshot = stasis_message_data(update->old_snapshot); + /* Pull timestamp from the new snapshot, or from the update message + * when there isn't one. */ + const struct timeval *tv = update->new_snapshot ? stasis_message_timestamp(update->new_snapshot) : stasis_message_timestamp(message); int i; watching_apps = get_apps_watching_channel(new_snapshot ? new_snapshot->uniqueid : old_snapshot->uniqueid); @@ -361,7 +381,7 @@ static void sub_channel_snapshot_handler(void *data, for (i = 0; i < ARRAY_LEN(channel_monitors); ++i) { RAII_VAR(struct ast_json *, msg, NULL, ast_json_unref); - msg = channel_monitors[i](old_snapshot, new_snapshot); + msg = channel_monitors[i](old_snapshot, new_snapshot, tv); if (msg) { ao2_callback(watching_apps, OBJ_NODATA, app_send_cb, msg); } @@ -373,22 +393,26 @@ static void distribute_message(struct ao2_container *apps, struct ast_json *msg) ao2_callback(apps, OBJ_NODATA, app_send_cb, msg); } -static void generic_blob_handler(struct ast_channel_blob *obj, channel_blob_handler_cb handler_cb) +static void sub_channel_blob_handler(void *data, + struct stasis_subscription *sub, + struct stasis_topic *topic, + struct stasis_message *message) { RAII_VAR(struct ast_json *, msg, NULL, ast_json_unref); RAII_VAR(struct ao2_container *, watching_apps, NULL, ao2_cleanup); + struct ast_channel_blob *obj = stasis_message_data(message); if (!obj->snapshot) { return; } - watching_apps = get_apps_watching_channel(obj->snapshot->uniqueid); - if (!watching_apps) { + msg = stasis_message_to_json(message); + if (!msg) { return; } - msg = handler_cb(obj); - if (!msg) { + watching_apps = get_apps_watching_channel(obj->snapshot->uniqueid); + if (!watching_apps) { return; } @@ -446,7 +470,6 @@ int app_send_start_msg(struct app *app, struct ast_channel *chan, int argc, char *argv[]) { RAII_VAR(struct ast_json *, msg, NULL, ast_json_unref); - RAII_VAR(struct ast_json *, blob, NULL, ast_json_unref); RAII_VAR(struct ast_channel_snapshot *, snapshot, NULL, ao2_cleanup); struct ast_json *json_args; @@ -460,13 +483,16 @@ int app_send_start_msg(struct app *app, struct ast_channel *chan, return -1; } - blob = ast_json_pack("{s: []}", "args"); - if (!blob) { + msg = ast_json_pack("{s: s, s: [], s: o}", + "type", "StasisStart", + "args", + "channel", ast_channel_snapshot_to_json(snapshot)); + if (!msg) { return -1; } /* Append arguments to args array */ - json_args = ast_json_object_get(blob, "args"); + json_args = ast_json_object_get(msg, "args"); ast_assert(json_args != NULL); for (i = 0; i < argc; ++i) { int r = ast_json_array_append(json_args, @@ -477,11 +503,6 @@ int app_send_start_msg(struct app *app, struct ast_channel *chan, } } - msg = stasis_json_event_stasis_start_create(snapshot, blob); - if (!msg) { - return -1; - } - app_send(app, msg); return 0; } @@ -499,7 +520,9 @@ int app_send_end_msg(struct app *app, struct ast_channel *chan) return -1; } - msg = stasis_json_event_stasis_end_create(snapshot); + msg = ast_json_pack("{s: s, s: o}", + "type", "StasisEnd", + "channel", ast_channel_snapshot_to_json(snapshot)); if (!msg) { return -1; } @@ -633,15 +656,13 @@ int stasis_app_register(const char *app_name, stasis_app_cb handler, void *data) app = ao2_find(apps_registry, app_name, OBJ_KEY | OBJ_NOLOCK); if (app) { - RAII_VAR(struct ast_json *, blob, NULL, ast_json_unref); RAII_VAR(struct ast_json *, msg, NULL, ast_json_unref); - blob = ast_json_pack("{s: s}", "application", app_name); - if (blob) { - msg = stasis_json_event_application_replaced_create(blob); - if (msg) { - app_send(app, msg); - } + msg = ast_json_pack("{s: s, s: s}", + "type", "ApplicationReplaced", + "application", app_name); + if (msg) { + app_send(app, msg); } app_update(app, handler, data); @@ -665,82 +686,6 @@ void stasis_app_unregister(const char *app_name) } } -static struct ast_json *handle_blob_dtmf(struct ast_channel_blob *obj) -{ - RAII_VAR(struct ast_json *, extra, NULL, ast_json_unref); - RAII_VAR(struct ast_json *, msg, NULL, ast_json_unref); - const char *direction; - - /* To simplify events, we'll only generate on receive */ - direction = ast_json_string_get( - ast_json_object_get(obj->blob, "direction")); - - if (strcmp("Received", direction) != 0) { - return NULL; - } - - extra = ast_json_pack( - "{s: o}", - "digit", ast_json_ref(ast_json_object_get(obj->blob, "digit"))); - if (!extra) { - return NULL; - } - - return stasis_json_event_channel_dtmf_received_create(obj->snapshot, extra); -} - -/* To simplify events, we'll only generate on DTMF end (dtmf_end type) */ -static void sub_dtmf_handler(void *data, - struct stasis_subscription *sub, - struct stasis_topic *topic, - struct stasis_message *message) -{ - struct ast_channel_blob *obj = stasis_message_data(message); - generic_blob_handler(obj, handle_blob_dtmf); -} - -static struct ast_json *handle_blob_userevent(struct ast_channel_blob *obj) -{ - return stasis_json_event_channel_userevent_create(obj->snapshot, obj->blob); -} - -static void sub_userevent_handler(void *data, - struct stasis_subscription *sub, - struct stasis_topic *topic, - struct stasis_message *message) -{ - struct ast_channel_blob *obj = stasis_message_data(message); - generic_blob_handler(obj, handle_blob_userevent); -} - -static struct ast_json *handle_blob_hangup_request(struct ast_channel_blob *obj) -{ - return stasis_json_event_channel_hangup_request_create(obj->snapshot, obj->blob); -} - -static void sub_hangup_request_handler(void *data, - struct stasis_subscription *sub, - struct stasis_topic *topic, - struct stasis_message *message) -{ - struct ast_channel_blob *obj = stasis_message_data(message); - generic_blob_handler(obj, handle_blob_hangup_request); -} - -static struct ast_json *handle_blob_varset(struct ast_channel_blob *obj) -{ - return stasis_json_event_channel_varset_create(obj->snapshot, obj->blob); -} - -static void sub_varset_handler(void *data, - struct stasis_subscription *sub, - struct stasis_topic *topic, - struct stasis_message *message) -{ - struct ast_channel_blob *obj = stasis_message_data(message); - generic_blob_handler(obj, handle_blob_varset); -} - void stasis_app_ref(void) { ast_module_ref(ast_module_info->self); @@ -788,6 +733,30 @@ static int remove_bridge_cb(void *obj, void *arg, int flags) return 0; } +static struct ast_json *simple_bridge_event( + const char *type, + struct ast_bridge_snapshot *snapshot, + const struct timeval *tv) +{ + return ast_json_pack("{s: s, s: o, s: o}", + "type", type, + "timestamp", ast_json_timeval(*tv, NULL), + "bridge", ast_bridge_snapshot_to_json(snapshot)); +} + +static struct ast_json *simple_bridge_channel_event( + const char *type, + struct ast_bridge_snapshot *bridge_snapshot, + struct ast_channel_snapshot *channel_snapshot, + const struct timeval *tv) +{ + return ast_json_pack("{s: s, s: o, s: o}", + "type", type, + "timestamp", ast_json_timeval(*tv, NULL), + "bridge", ast_bridge_snapshot_to_json(bridge_snapshot), + "channel", ast_channel_snapshot_to_json(channel_snapshot)); +} + static void sub_bridge_snapshot_handler(void *data, struct stasis_subscription *sub, struct stasis_topic *topic, @@ -797,6 +766,8 @@ static void sub_bridge_snapshot_handler(void *data, struct stasis_cache_update *update = stasis_message_data(message); struct ast_bridge_snapshot *new_snapshot = stasis_message_data(update->new_snapshot); struct ast_bridge_snapshot *old_snapshot = stasis_message_data(update->old_snapshot); + const struct timeval *tv = update->new_snapshot ? stasis_message_timestamp(update->new_snapshot) : stasis_message_timestamp(message); + RAII_VAR(struct ast_json *, msg, NULL, ast_json_unref); watching_apps = get_apps_watching_bridge(new_snapshot ? new_snapshot->uniqueid : old_snapshot->uniqueid); @@ -810,11 +781,11 @@ static void sub_bridge_snapshot_handler(void *data, /* The bridge has gone away. Create the message, make sure no apps are * watching this bridge anymore, and destroy the bridge's control * structure */ - msg = stasis_json_event_bridge_destroyed_create(old_snapshot); + msg = simple_bridge_event("BridgeDestroyed", old_snapshot, tv); ao2_callback(watching_apps, OBJ_NODATA, remove_bridge_cb, bridge_id); stasis_app_bridge_destroy(old_snapshot->uniqueid); } else if (!old_snapshot) { - msg = stasis_json_event_bridge_created_create(old_snapshot); + msg = simple_bridge_event("BridgeCreated", old_snapshot, tv); } if (!msg) { @@ -865,6 +836,7 @@ static void sub_bridge_merge_handler(void *data, struct ast_bridge_merge_message *merge = stasis_message_data(message); RAII_VAR(struct ast_json *, msg, NULL, ast_json_unref); RAII_VAR(struct ast_json *, blob, NULL, ast_json_unref); + const struct timeval *tv = stasis_message_timestamp(message); watching_apps_to = get_apps_watching_bridge(merge->to->uniqueid); if (watching_apps_to) { @@ -881,16 +853,16 @@ static void sub_bridge_merge_handler(void *data, return; } - /* The secondary bridge has to be packed into JSON by hand because the auto-generated - * JSON event generator can only handle one instance of a given snapshot type in an - * elegant way */ - blob = ast_json_pack("{s: o}", "bridge_from", ast_bridge_snapshot_to_json(merge->from)); - if (!blob) { + msg = ast_json_pack("{s: s, s: o, s: o, s: o}", + "type", "BridgeMerged", + "timestamp", ast_json_timeval(*tv, NULL), + "bridge", ast_bridge_snapshot_to_json(merge->to), + "bridge_from", ast_bridge_snapshot_to_json(merge->from)); + + if (!msg) { return; } - msg = stasis_json_event_bridge_merged_create(merge->to, blob); - distribute_message(watching_apps_all, msg); } @@ -920,7 +892,8 @@ static void sub_bridge_enter_handler(void *data, return; } - msg = stasis_json_event_channel_entered_bridge_create(obj->bridge, obj->channel); + msg = simple_bridge_channel_event("ChannelEnteredBridge", obj->bridge, + obj->channel, stasis_message_timestamp(message)); distribute_message(watching_apps_all, msg); } @@ -939,7 +912,8 @@ static void sub_bridge_leave_handler(void *data, return; } - msg = stasis_json_event_channel_left_bridge_create(obj->bridge, obj->channel); + msg = simple_bridge_channel_event("ChannelLeftBridge", obj->bridge, + obj->channel, stasis_message_timestamp(message)); distribute_message(watching_apps_bridge, msg); } @@ -972,10 +946,16 @@ static int load_module(void) } r |= stasis_message_router_add(channel_router, stasis_cache_update_type(), sub_channel_snapshot_handler, NULL); - r |= stasis_message_router_add(channel_router, ast_channel_user_event_type(), sub_userevent_handler, NULL); - r |= stasis_message_router_add(channel_router, ast_channel_varset_type(), sub_varset_handler, NULL); - r |= stasis_message_router_add(channel_router, ast_channel_dtmf_begin_type(), sub_dtmf_handler, NULL); - r |= stasis_message_router_add(channel_router, ast_channel_hangup_request_type(), sub_hangup_request_handler, NULL); + /* TODO: This could be handled a lot better. Instead of subscribing to + * the one caching topic and filtering out messages by channel id, we + * should have individual caching topics per-channel, with a shared + * back-end cache. That would simplify a lot of what's going on right + * here. + */ + r |= stasis_message_router_add(channel_router, ast_channel_user_event_type(), sub_channel_blob_handler, NULL); + r |= stasis_message_router_add(channel_router, ast_channel_varset_type(), sub_channel_blob_handler, NULL); + r |= stasis_message_router_add(channel_router, ast_channel_dtmf_end_type(), sub_channel_blob_handler, NULL); + r |= stasis_message_router_add(channel_router, ast_channel_hangup_request_type(), sub_channel_blob_handler, NULL); if (r) { return AST_MODULE_LOAD_FAILURE; } |