summaryrefslogtreecommitdiff
path: root/res/res_stasis.c
diff options
context:
space:
mode:
Diffstat (limited to 'res/res_stasis.c')
-rw-r--r--res/res_stasis.c294
1 files changed, 137 insertions, 157 deletions
diff --git a/res/res_stasis.c b/res/res_stasis.c
index de432e409..ed3823051 100644
--- a/res/res_stasis.c
+++ b/res/res_stasis.c
@@ -48,7 +48,6 @@
*/
/*** MODULEINFO
- <depend>res_stasis_json_events</depend>
<support_level>core</support_level>
***/
@@ -66,7 +65,6 @@ ASTERISK_FILE_VERSION(__FILE__, "$Revision$")
#include "asterisk/strings.h"
#include "stasis/app.h"
#include "stasis/control.h"
-#include "stasis_json/resource_events.h"
/*! Time to wait for a frame in the application */
#define MAX_WAIT_MS 200
@@ -233,28 +231,60 @@ static struct ao2_container *get_apps_watching_channel(const char *uniqueid)
/*! \brief Typedef for callbacks that get called on channel snapshot updates */
typedef struct ast_json *(*channel_snapshot_monitor)(
struct ast_channel_snapshot *old_snapshot,
- struct ast_channel_snapshot *new_snapshot);
+ struct ast_channel_snapshot *new_snapshot,
+ const struct timeval *tv);
+
+static struct ast_json *simple_channel_event(
+ const char *type,
+ struct ast_channel_snapshot *snapshot,
+ const struct timeval *tv)
+{
+ return ast_json_pack("{s: s, s: o, s: o}",
+ "type", type,
+ "timestamp", ast_json_timeval(*tv, NULL),
+ "channel", ast_channel_snapshot_to_json(snapshot));
+}
+
+static struct ast_json *channel_created_event(
+ struct ast_channel_snapshot *snapshot,
+ const struct timeval *tv)
+{
+ return simple_channel_event("ChannelCreated", snapshot, tv);
+}
+
+static struct ast_json *channel_destroyed_event(
+ struct ast_channel_snapshot *snapshot,
+ const struct timeval *tv)
+{
+ return ast_json_pack("{s: s, s: o, s: i, s: s, s: o}",
+ "type", "ChannelDestroyed",
+ "timestamp", ast_json_timeval(*tv, NULL),
+ "cause", snapshot->hangupcause,
+ "cause_txt", ast_cause2str(snapshot->hangupcause),
+ "channel", ast_channel_snapshot_to_json(snapshot));
+}
+
+static struct ast_json *channel_state_change_event(
+ struct ast_channel_snapshot *snapshot,
+ const struct timeval *tv)
+{
+ return simple_channel_event("ChannelStateChange", snapshot, tv);
+}
/*! \brief Handle channel state changes */
static struct ast_json *channel_state(
struct ast_channel_snapshot *old_snapshot,
- struct ast_channel_snapshot *new_snapshot)
+ struct ast_channel_snapshot *new_snapshot,
+ const struct timeval *tv)
{
- RAII_VAR(struct ast_json *, json, NULL, ast_json_unref);
struct ast_channel_snapshot *snapshot = new_snapshot ? new_snapshot : old_snapshot;
if (!old_snapshot) {
- return stasis_json_event_channel_created_create(snapshot);
+ return channel_created_event(snapshot, tv);
} else if (!new_snapshot) {
- json = ast_json_pack("{s: i, s: s}",
- "cause", snapshot->hangupcause,
- "cause_txt", ast_cause2str(snapshot->hangupcause));
- if (!json) {
- return NULL;
- }
- return stasis_json_event_channel_destroyed_create(snapshot, json);
+ return channel_destroyed_event(snapshot, tv);
} else if (old_snapshot->state != new_snapshot->state) {
- return stasis_json_event_channel_state_change_create(snapshot);
+ return channel_state_change_event(snapshot, tv);
}
return NULL;
@@ -262,7 +292,8 @@ static struct ast_json *channel_state(
static struct ast_json *channel_dialplan(
struct ast_channel_snapshot *old_snapshot,
- struct ast_channel_snapshot *new_snapshot)
+ struct ast_channel_snapshot *new_snapshot,
+ const struct timeval *tv)
{
RAII_VAR(struct ast_json *, json, NULL, ast_json_unref);
@@ -280,19 +311,18 @@ static struct ast_json *channel_dialplan(
return NULL;
}
- json = ast_json_pack("{s: s, s: s}",
- "application", new_snapshot->appl,
- "application_data", new_snapshot->data);
- if (!json) {
- return NULL;
- }
-
- return stasis_json_event_channel_dialplan_create(new_snapshot, json);
+ return ast_json_pack("{s: s, s: o, s: s, s: s, s: o}",
+ "type", "ChannelDialplan",
+ "timestamp", ast_json_timeval(*tv, NULL),
+ "dialplan_app", new_snapshot->appl,
+ "dialplan_app_data", new_snapshot->data,
+ "channel", ast_channel_snapshot_to_json(new_snapshot));
}
static struct ast_json *channel_callerid(
struct ast_channel_snapshot *old_snapshot,
- struct ast_channel_snapshot *new_snapshot)
+ struct ast_channel_snapshot *new_snapshot,
+ const struct timeval *tv)
{
RAII_VAR(struct ast_json *, json, NULL, ast_json_unref);
@@ -305,29 +335,16 @@ static struct ast_json *channel_callerid(
return NULL;
}
- json = ast_json_pack("{s: i, s: s}",
+ return ast_json_pack("{s: s, s: o, s: i, s: s, s: o}",
+ "type", "ChannelCallerId",
+ "timestamp", ast_json_timeval(*tv, NULL),
"caller_presentation", new_snapshot->caller_pres,
- "caller_presentation_txt", ast_describe_caller_presentation(new_snapshot->caller_pres));
- if (!json) {
- return NULL;
- }
-
- return stasis_json_event_channel_caller_id_create(new_snapshot, json);
-}
-
-static struct ast_json *channel_snapshot(
- struct ast_channel_snapshot *old_snapshot,
- struct ast_channel_snapshot *new_snapshot)
-{
- if (!new_snapshot) {
- return NULL;
- }
-
- return stasis_json_event_channel_snapshot_create(new_snapshot);
+ "caller_presentation_txt", ast_describe_caller_presentation(
+ new_snapshot->caller_pres),
+ "channel", ast_channel_snapshot_to_json(new_snapshot));
}
channel_snapshot_monitor channel_monitors[] = {
- channel_snapshot,
channel_state,
channel_dialplan,
channel_callerid
@@ -351,6 +368,9 @@ static void sub_channel_snapshot_handler(void *data,
struct stasis_cache_update *update = stasis_message_data(message);
struct ast_channel_snapshot *new_snapshot = stasis_message_data(update->new_snapshot);
struct ast_channel_snapshot *old_snapshot = stasis_message_data(update->old_snapshot);
+ /* Pull timestamp from the new snapshot, or from the update message
+ * when there isn't one. */
+ const struct timeval *tv = update->new_snapshot ? stasis_message_timestamp(update->new_snapshot) : stasis_message_timestamp(message);
int i;
watching_apps = get_apps_watching_channel(new_snapshot ? new_snapshot->uniqueid : old_snapshot->uniqueid);
@@ -361,7 +381,7 @@ static void sub_channel_snapshot_handler(void *data,
for (i = 0; i < ARRAY_LEN(channel_monitors); ++i) {
RAII_VAR(struct ast_json *, msg, NULL, ast_json_unref);
- msg = channel_monitors[i](old_snapshot, new_snapshot);
+ msg = channel_monitors[i](old_snapshot, new_snapshot, tv);
if (msg) {
ao2_callback(watching_apps, OBJ_NODATA, app_send_cb, msg);
}
@@ -373,22 +393,26 @@ static void distribute_message(struct ao2_container *apps, struct ast_json *msg)
ao2_callback(apps, OBJ_NODATA, app_send_cb, msg);
}
-static void generic_blob_handler(struct ast_channel_blob *obj, channel_blob_handler_cb handler_cb)
+static void sub_channel_blob_handler(void *data,
+ struct stasis_subscription *sub,
+ struct stasis_topic *topic,
+ struct stasis_message *message)
{
RAII_VAR(struct ast_json *, msg, NULL, ast_json_unref);
RAII_VAR(struct ao2_container *, watching_apps, NULL, ao2_cleanup);
+ struct ast_channel_blob *obj = stasis_message_data(message);
if (!obj->snapshot) {
return;
}
- watching_apps = get_apps_watching_channel(obj->snapshot->uniqueid);
- if (!watching_apps) {
+ msg = stasis_message_to_json(message);
+ if (!msg) {
return;
}
- msg = handler_cb(obj);
- if (!msg) {
+ watching_apps = get_apps_watching_channel(obj->snapshot->uniqueid);
+ if (!watching_apps) {
return;
}
@@ -446,7 +470,6 @@ int app_send_start_msg(struct app *app, struct ast_channel *chan,
int argc, char *argv[])
{
RAII_VAR(struct ast_json *, msg, NULL, ast_json_unref);
- RAII_VAR(struct ast_json *, blob, NULL, ast_json_unref);
RAII_VAR(struct ast_channel_snapshot *, snapshot, NULL, ao2_cleanup);
struct ast_json *json_args;
@@ -460,13 +483,16 @@ int app_send_start_msg(struct app *app, struct ast_channel *chan,
return -1;
}
- blob = ast_json_pack("{s: []}", "args");
- if (!blob) {
+ msg = ast_json_pack("{s: s, s: [], s: o}",
+ "type", "StasisStart",
+ "args",
+ "channel", ast_channel_snapshot_to_json(snapshot));
+ if (!msg) {
return -1;
}
/* Append arguments to args array */
- json_args = ast_json_object_get(blob, "args");
+ json_args = ast_json_object_get(msg, "args");
ast_assert(json_args != NULL);
for (i = 0; i < argc; ++i) {
int r = ast_json_array_append(json_args,
@@ -477,11 +503,6 @@ int app_send_start_msg(struct app *app, struct ast_channel *chan,
}
}
- msg = stasis_json_event_stasis_start_create(snapshot, blob);
- if (!msg) {
- return -1;
- }
-
app_send(app, msg);
return 0;
}
@@ -499,7 +520,9 @@ int app_send_end_msg(struct app *app, struct ast_channel *chan)
return -1;
}
- msg = stasis_json_event_stasis_end_create(snapshot);
+ msg = ast_json_pack("{s: s, s: o}",
+ "type", "StasisEnd",
+ "channel", ast_channel_snapshot_to_json(snapshot));
if (!msg) {
return -1;
}
@@ -633,15 +656,13 @@ int stasis_app_register(const char *app_name, stasis_app_cb handler, void *data)
app = ao2_find(apps_registry, app_name, OBJ_KEY | OBJ_NOLOCK);
if (app) {
- RAII_VAR(struct ast_json *, blob, NULL, ast_json_unref);
RAII_VAR(struct ast_json *, msg, NULL, ast_json_unref);
- blob = ast_json_pack("{s: s}", "application", app_name);
- if (blob) {
- msg = stasis_json_event_application_replaced_create(blob);
- if (msg) {
- app_send(app, msg);
- }
+ msg = ast_json_pack("{s: s, s: s}",
+ "type", "ApplicationReplaced",
+ "application", app_name);
+ if (msg) {
+ app_send(app, msg);
}
app_update(app, handler, data);
@@ -665,82 +686,6 @@ void stasis_app_unregister(const char *app_name)
}
}
-static struct ast_json *handle_blob_dtmf(struct ast_channel_blob *obj)
-{
- RAII_VAR(struct ast_json *, extra, NULL, ast_json_unref);
- RAII_VAR(struct ast_json *, msg, NULL, ast_json_unref);
- const char *direction;
-
- /* To simplify events, we'll only generate on receive */
- direction = ast_json_string_get(
- ast_json_object_get(obj->blob, "direction"));
-
- if (strcmp("Received", direction) != 0) {
- return NULL;
- }
-
- extra = ast_json_pack(
- "{s: o}",
- "digit", ast_json_ref(ast_json_object_get(obj->blob, "digit")));
- if (!extra) {
- return NULL;
- }
-
- return stasis_json_event_channel_dtmf_received_create(obj->snapshot, extra);
-}
-
-/* To simplify events, we'll only generate on DTMF end (dtmf_end type) */
-static void sub_dtmf_handler(void *data,
- struct stasis_subscription *sub,
- struct stasis_topic *topic,
- struct stasis_message *message)
-{
- struct ast_channel_blob *obj = stasis_message_data(message);
- generic_blob_handler(obj, handle_blob_dtmf);
-}
-
-static struct ast_json *handle_blob_userevent(struct ast_channel_blob *obj)
-{
- return stasis_json_event_channel_userevent_create(obj->snapshot, obj->blob);
-}
-
-static void sub_userevent_handler(void *data,
- struct stasis_subscription *sub,
- struct stasis_topic *topic,
- struct stasis_message *message)
-{
- struct ast_channel_blob *obj = stasis_message_data(message);
- generic_blob_handler(obj, handle_blob_userevent);
-}
-
-static struct ast_json *handle_blob_hangup_request(struct ast_channel_blob *obj)
-{
- return stasis_json_event_channel_hangup_request_create(obj->snapshot, obj->blob);
-}
-
-static void sub_hangup_request_handler(void *data,
- struct stasis_subscription *sub,
- struct stasis_topic *topic,
- struct stasis_message *message)
-{
- struct ast_channel_blob *obj = stasis_message_data(message);
- generic_blob_handler(obj, handle_blob_hangup_request);
-}
-
-static struct ast_json *handle_blob_varset(struct ast_channel_blob *obj)
-{
- return stasis_json_event_channel_varset_create(obj->snapshot, obj->blob);
-}
-
-static void sub_varset_handler(void *data,
- struct stasis_subscription *sub,
- struct stasis_topic *topic,
- struct stasis_message *message)
-{
- struct ast_channel_blob *obj = stasis_message_data(message);
- generic_blob_handler(obj, handle_blob_varset);
-}
-
void stasis_app_ref(void)
{
ast_module_ref(ast_module_info->self);
@@ -788,6 +733,30 @@ static int remove_bridge_cb(void *obj, void *arg, int flags)
return 0;
}
+static struct ast_json *simple_bridge_event(
+ const char *type,
+ struct ast_bridge_snapshot *snapshot,
+ const struct timeval *tv)
+{
+ return ast_json_pack("{s: s, s: o, s: o}",
+ "type", type,
+ "timestamp", ast_json_timeval(*tv, NULL),
+ "bridge", ast_bridge_snapshot_to_json(snapshot));
+}
+
+static struct ast_json *simple_bridge_channel_event(
+ const char *type,
+ struct ast_bridge_snapshot *bridge_snapshot,
+ struct ast_channel_snapshot *channel_snapshot,
+ const struct timeval *tv)
+{
+ return ast_json_pack("{s: s, s: o, s: o}",
+ "type", type,
+ "timestamp", ast_json_timeval(*tv, NULL),
+ "bridge", ast_bridge_snapshot_to_json(bridge_snapshot),
+ "channel", ast_channel_snapshot_to_json(channel_snapshot));
+}
+
static void sub_bridge_snapshot_handler(void *data,
struct stasis_subscription *sub,
struct stasis_topic *topic,
@@ -797,6 +766,8 @@ static void sub_bridge_snapshot_handler(void *data,
struct stasis_cache_update *update = stasis_message_data(message);
struct ast_bridge_snapshot *new_snapshot = stasis_message_data(update->new_snapshot);
struct ast_bridge_snapshot *old_snapshot = stasis_message_data(update->old_snapshot);
+ const struct timeval *tv = update->new_snapshot ? stasis_message_timestamp(update->new_snapshot) : stasis_message_timestamp(message);
+
RAII_VAR(struct ast_json *, msg, NULL, ast_json_unref);
watching_apps = get_apps_watching_bridge(new_snapshot ? new_snapshot->uniqueid : old_snapshot->uniqueid);
@@ -810,11 +781,11 @@ static void sub_bridge_snapshot_handler(void *data,
/* The bridge has gone away. Create the message, make sure no apps are
* watching this bridge anymore, and destroy the bridge's control
* structure */
- msg = stasis_json_event_bridge_destroyed_create(old_snapshot);
+ msg = simple_bridge_event("BridgeDestroyed", old_snapshot, tv);
ao2_callback(watching_apps, OBJ_NODATA, remove_bridge_cb, bridge_id);
stasis_app_bridge_destroy(old_snapshot->uniqueid);
} else if (!old_snapshot) {
- msg = stasis_json_event_bridge_created_create(old_snapshot);
+ msg = simple_bridge_event("BridgeCreated", old_snapshot, tv);
}
if (!msg) {
@@ -865,6 +836,7 @@ static void sub_bridge_merge_handler(void *data,
struct ast_bridge_merge_message *merge = stasis_message_data(message);
RAII_VAR(struct ast_json *, msg, NULL, ast_json_unref);
RAII_VAR(struct ast_json *, blob, NULL, ast_json_unref);
+ const struct timeval *tv = stasis_message_timestamp(message);
watching_apps_to = get_apps_watching_bridge(merge->to->uniqueid);
if (watching_apps_to) {
@@ -881,16 +853,16 @@ static void sub_bridge_merge_handler(void *data,
return;
}
- /* The secondary bridge has to be packed into JSON by hand because the auto-generated
- * JSON event generator can only handle one instance of a given snapshot type in an
- * elegant way */
- blob = ast_json_pack("{s: o}", "bridge_from", ast_bridge_snapshot_to_json(merge->from));
- if (!blob) {
+ msg = ast_json_pack("{s: s, s: o, s: o, s: o}",
+ "type", "BridgeMerged",
+ "timestamp", ast_json_timeval(*tv, NULL),
+ "bridge", ast_bridge_snapshot_to_json(merge->to),
+ "bridge_from", ast_bridge_snapshot_to_json(merge->from));
+
+ if (!msg) {
return;
}
- msg = stasis_json_event_bridge_merged_create(merge->to, blob);
-
distribute_message(watching_apps_all, msg);
}
@@ -920,7 +892,8 @@ static void sub_bridge_enter_handler(void *data,
return;
}
- msg = stasis_json_event_channel_entered_bridge_create(obj->bridge, obj->channel);
+ msg = simple_bridge_channel_event("ChannelEnteredBridge", obj->bridge,
+ obj->channel, stasis_message_timestamp(message));
distribute_message(watching_apps_all, msg);
}
@@ -939,7 +912,8 @@ static void sub_bridge_leave_handler(void *data,
return;
}
- msg = stasis_json_event_channel_left_bridge_create(obj->bridge, obj->channel);
+ msg = simple_bridge_channel_event("ChannelLeftBridge", obj->bridge,
+ obj->channel, stasis_message_timestamp(message));
distribute_message(watching_apps_bridge, msg);
}
@@ -972,10 +946,16 @@ static int load_module(void)
}
r |= stasis_message_router_add(channel_router, stasis_cache_update_type(), sub_channel_snapshot_handler, NULL);
- r |= stasis_message_router_add(channel_router, ast_channel_user_event_type(), sub_userevent_handler, NULL);
- r |= stasis_message_router_add(channel_router, ast_channel_varset_type(), sub_varset_handler, NULL);
- r |= stasis_message_router_add(channel_router, ast_channel_dtmf_begin_type(), sub_dtmf_handler, NULL);
- r |= stasis_message_router_add(channel_router, ast_channel_hangup_request_type(), sub_hangup_request_handler, NULL);
+ /* TODO: This could be handled a lot better. Instead of subscribing to
+ * the one caching topic and filtering out messages by channel id, we
+ * should have individual caching topics per-channel, with a shared
+ * back-end cache. That would simplify a lot of what's going on right
+ * here.
+ */
+ r |= stasis_message_router_add(channel_router, ast_channel_user_event_type(), sub_channel_blob_handler, NULL);
+ r |= stasis_message_router_add(channel_router, ast_channel_varset_type(), sub_channel_blob_handler, NULL);
+ r |= stasis_message_router_add(channel_router, ast_channel_dtmf_end_type(), sub_channel_blob_handler, NULL);
+ r |= stasis_message_router_add(channel_router, ast_channel_hangup_request_type(), sub_channel_blob_handler, NULL);
if (r) {
return AST_MODULE_LOAD_FAILURE;
}