summaryrefslogtreecommitdiff
path: root/res/res_stasis.c
diff options
context:
space:
mode:
authorDavid M. Lee <dlee@digium.com>2013-07-03 16:32:41 +0000
committerDavid M. Lee <dlee@digium.com>2013-07-03 16:32:41 +0000
commitc9a3d4562ddb1ed5b34f7d5530efd6aa695377c2 (patch)
treedd082285fbb5c7714164e26145acc5c966e663be /res/res_stasis.c
parentdcf03554a0b38806bf1fe258acc423b070533d6e (diff)
Update events to use Swagger 1.3 subtyping, and related aftermath
This patch started with the simple idea of changing the /events data model to be more sane. The original model would send out events like: { "stasis_start": { "args": [], "channel": { ... } } } The event discriminator was the field name instead of being a value in the object, due to limitations in how Swagger 1.1 could model objects. While technically sufficient in communicating event information, it was really difficult to deal with in terms of client side JSON handling. This patch takes advantage of a proposed extension[1] to Swagger which allows type variance through the use of a discriminator field. This had a domino effect that made this a surprisingly large patch. [1]: https://groups.google.com/d/msg/wordnik-api/EC3rGajE0os/ey_5dBI_jWcJ In changing the models, I also had to change the swagger_model.py processor so it can handle the type discriminator and subtyping. I took that a big step forward, and using that information to generate an ari_model module, which can validate a JSON object against the Swagger model. The REST and WebSocket generators were changed to take advantage of the validators. If compiled with AST_DEVMODE enabled, JSON objects that don't match their corresponding models will not be sent out. For REST API calls, a 500 Internal Server response is sent. For WebSockets, the invalid JSON message is replaced with an error message. Since this took over about half of the job of the existing JSON generators, and the .to_json virtual function on messages took over the other half, I reluctantly removed the generators. The validators turned up all sorts of errors and inconsistencies in our data models, and the code. These were cleaned up, with checks in the code generator avoid some of the consistency problems in the future. * The model for a channel snapshot was trimmed down to match the information sent via AMI. Many of the field being sent were not useful in the general case. * The model for a bridge snapshot was updated to be more consistent with the other ARI models. Another impact of introducing subtyping was that the swagger-codegen documentation generator was insufficient (at least until it catches up with Swagger 1.2). I wanted it to be easier to generate docs for the API anyways, so I ported the wiki pages to use the Asterisk Swagger generator. In the process, I was able to clean up many of the model links, which would occasionally give inconsistent results on the wiki. I also added error responses to the wiki docs, making the wiki documentation more complete. Finally, since Stasis-HTTP will now be named Asterisk REST Interface (ARI), any new functions and files I created carry the ari_ prefix. I changed a few stasis_http references to ari where it was non-intrusive and made sense. (closes issue ASTERISK-21885) Review: https://reviewboard.asterisk.org/r/2639/ git-svn-id: https://origsvn.digium.com/svn/asterisk/trunk@393529 65c4cc65-6c06-0410-ace0-fbb531ad65f3
Diffstat (limited to 'res/res_stasis.c')
-rw-r--r--res/res_stasis.c294
1 files changed, 137 insertions, 157 deletions
diff --git a/res/res_stasis.c b/res/res_stasis.c
index de432e409..ed3823051 100644
--- a/res/res_stasis.c
+++ b/res/res_stasis.c
@@ -48,7 +48,6 @@
*/
/*** MODULEINFO
- <depend>res_stasis_json_events</depend>
<support_level>core</support_level>
***/
@@ -66,7 +65,6 @@ ASTERISK_FILE_VERSION(__FILE__, "$Revision$")
#include "asterisk/strings.h"
#include "stasis/app.h"
#include "stasis/control.h"
-#include "stasis_json/resource_events.h"
/*! Time to wait for a frame in the application */
#define MAX_WAIT_MS 200
@@ -233,28 +231,60 @@ static struct ao2_container *get_apps_watching_channel(const char *uniqueid)
/*! \brief Typedef for callbacks that get called on channel snapshot updates */
typedef struct ast_json *(*channel_snapshot_monitor)(
struct ast_channel_snapshot *old_snapshot,
- struct ast_channel_snapshot *new_snapshot);
+ struct ast_channel_snapshot *new_snapshot,
+ const struct timeval *tv);
+
+static struct ast_json *simple_channel_event(
+ const char *type,
+ struct ast_channel_snapshot *snapshot,
+ const struct timeval *tv)
+{
+ return ast_json_pack("{s: s, s: o, s: o}",
+ "type", type,
+ "timestamp", ast_json_timeval(*tv, NULL),
+ "channel", ast_channel_snapshot_to_json(snapshot));
+}
+
+static struct ast_json *channel_created_event(
+ struct ast_channel_snapshot *snapshot,
+ const struct timeval *tv)
+{
+ return simple_channel_event("ChannelCreated", snapshot, tv);
+}
+
+static struct ast_json *channel_destroyed_event(
+ struct ast_channel_snapshot *snapshot,
+ const struct timeval *tv)
+{
+ return ast_json_pack("{s: s, s: o, s: i, s: s, s: o}",
+ "type", "ChannelDestroyed",
+ "timestamp", ast_json_timeval(*tv, NULL),
+ "cause", snapshot->hangupcause,
+ "cause_txt", ast_cause2str(snapshot->hangupcause),
+ "channel", ast_channel_snapshot_to_json(snapshot));
+}
+
+static struct ast_json *channel_state_change_event(
+ struct ast_channel_snapshot *snapshot,
+ const struct timeval *tv)
+{
+ return simple_channel_event("ChannelStateChange", snapshot, tv);
+}
/*! \brief Handle channel state changes */
static struct ast_json *channel_state(
struct ast_channel_snapshot *old_snapshot,
- struct ast_channel_snapshot *new_snapshot)
+ struct ast_channel_snapshot *new_snapshot,
+ const struct timeval *tv)
{
- RAII_VAR(struct ast_json *, json, NULL, ast_json_unref);
struct ast_channel_snapshot *snapshot = new_snapshot ? new_snapshot : old_snapshot;
if (!old_snapshot) {
- return stasis_json_event_channel_created_create(snapshot);
+ return channel_created_event(snapshot, tv);
} else if (!new_snapshot) {
- json = ast_json_pack("{s: i, s: s}",
- "cause", snapshot->hangupcause,
- "cause_txt", ast_cause2str(snapshot->hangupcause));
- if (!json) {
- return NULL;
- }
- return stasis_json_event_channel_destroyed_create(snapshot, json);
+ return channel_destroyed_event(snapshot, tv);
} else if (old_snapshot->state != new_snapshot->state) {
- return stasis_json_event_channel_state_change_create(snapshot);
+ return channel_state_change_event(snapshot, tv);
}
return NULL;
@@ -262,7 +292,8 @@ static struct ast_json *channel_state(
static struct ast_json *channel_dialplan(
struct ast_channel_snapshot *old_snapshot,
- struct ast_channel_snapshot *new_snapshot)
+ struct ast_channel_snapshot *new_snapshot,
+ const struct timeval *tv)
{
RAII_VAR(struct ast_json *, json, NULL, ast_json_unref);
@@ -280,19 +311,18 @@ static struct ast_json *channel_dialplan(
return NULL;
}
- json = ast_json_pack("{s: s, s: s}",
- "application", new_snapshot->appl,
- "application_data", new_snapshot->data);
- if (!json) {
- return NULL;
- }
-
- return stasis_json_event_channel_dialplan_create(new_snapshot, json);
+ return ast_json_pack("{s: s, s: o, s: s, s: s, s: o}",
+ "type", "ChannelDialplan",
+ "timestamp", ast_json_timeval(*tv, NULL),
+ "dialplan_app", new_snapshot->appl,
+ "dialplan_app_data", new_snapshot->data,
+ "channel", ast_channel_snapshot_to_json(new_snapshot));
}
static struct ast_json *channel_callerid(
struct ast_channel_snapshot *old_snapshot,
- struct ast_channel_snapshot *new_snapshot)
+ struct ast_channel_snapshot *new_snapshot,
+ const struct timeval *tv)
{
RAII_VAR(struct ast_json *, json, NULL, ast_json_unref);
@@ -305,29 +335,16 @@ static struct ast_json *channel_callerid(
return NULL;
}
- json = ast_json_pack("{s: i, s: s}",
+ return ast_json_pack("{s: s, s: o, s: i, s: s, s: o}",
+ "type", "ChannelCallerId",
+ "timestamp", ast_json_timeval(*tv, NULL),
"caller_presentation", new_snapshot->caller_pres,
- "caller_presentation_txt", ast_describe_caller_presentation(new_snapshot->caller_pres));
- if (!json) {
- return NULL;
- }
-
- return stasis_json_event_channel_caller_id_create(new_snapshot, json);
-}
-
-static struct ast_json *channel_snapshot(
- struct ast_channel_snapshot *old_snapshot,
- struct ast_channel_snapshot *new_snapshot)
-{
- if (!new_snapshot) {
- return NULL;
- }
-
- return stasis_json_event_channel_snapshot_create(new_snapshot);
+ "caller_presentation_txt", ast_describe_caller_presentation(
+ new_snapshot->caller_pres),
+ "channel", ast_channel_snapshot_to_json(new_snapshot));
}
channel_snapshot_monitor channel_monitors[] = {
- channel_snapshot,
channel_state,
channel_dialplan,
channel_callerid
@@ -351,6 +368,9 @@ static void sub_channel_snapshot_handler(void *data,
struct stasis_cache_update *update = stasis_message_data(message);
struct ast_channel_snapshot *new_snapshot = stasis_message_data(update->new_snapshot);
struct ast_channel_snapshot *old_snapshot = stasis_message_data(update->old_snapshot);
+ /* Pull timestamp from the new snapshot, or from the update message
+ * when there isn't one. */
+ const struct timeval *tv = update->new_snapshot ? stasis_message_timestamp(update->new_snapshot) : stasis_message_timestamp(message);
int i;
watching_apps = get_apps_watching_channel(new_snapshot ? new_snapshot->uniqueid : old_snapshot->uniqueid);
@@ -361,7 +381,7 @@ static void sub_channel_snapshot_handler(void *data,
for (i = 0; i < ARRAY_LEN(channel_monitors); ++i) {
RAII_VAR(struct ast_json *, msg, NULL, ast_json_unref);
- msg = channel_monitors[i](old_snapshot, new_snapshot);
+ msg = channel_monitors[i](old_snapshot, new_snapshot, tv);
if (msg) {
ao2_callback(watching_apps, OBJ_NODATA, app_send_cb, msg);
}
@@ -373,22 +393,26 @@ static void distribute_message(struct ao2_container *apps, struct ast_json *msg)
ao2_callback(apps, OBJ_NODATA, app_send_cb, msg);
}
-static void generic_blob_handler(struct ast_channel_blob *obj, channel_blob_handler_cb handler_cb)
+static void sub_channel_blob_handler(void *data,
+ struct stasis_subscription *sub,
+ struct stasis_topic *topic,
+ struct stasis_message *message)
{
RAII_VAR(struct ast_json *, msg, NULL, ast_json_unref);
RAII_VAR(struct ao2_container *, watching_apps, NULL, ao2_cleanup);
+ struct ast_channel_blob *obj = stasis_message_data(message);
if (!obj->snapshot) {
return;
}
- watching_apps = get_apps_watching_channel(obj->snapshot->uniqueid);
- if (!watching_apps) {
+ msg = stasis_message_to_json(message);
+ if (!msg) {
return;
}
- msg = handler_cb(obj);
- if (!msg) {
+ watching_apps = get_apps_watching_channel(obj->snapshot->uniqueid);
+ if (!watching_apps) {
return;
}
@@ -446,7 +470,6 @@ int app_send_start_msg(struct app *app, struct ast_channel *chan,
int argc, char *argv[])
{
RAII_VAR(struct ast_json *, msg, NULL, ast_json_unref);
- RAII_VAR(struct ast_json *, blob, NULL, ast_json_unref);
RAII_VAR(struct ast_channel_snapshot *, snapshot, NULL, ao2_cleanup);
struct ast_json *json_args;
@@ -460,13 +483,16 @@ int app_send_start_msg(struct app *app, struct ast_channel *chan,
return -1;
}
- blob = ast_json_pack("{s: []}", "args");
- if (!blob) {
+ msg = ast_json_pack("{s: s, s: [], s: o}",
+ "type", "StasisStart",
+ "args",
+ "channel", ast_channel_snapshot_to_json(snapshot));
+ if (!msg) {
return -1;
}
/* Append arguments to args array */
- json_args = ast_json_object_get(blob, "args");
+ json_args = ast_json_object_get(msg, "args");
ast_assert(json_args != NULL);
for (i = 0; i < argc; ++i) {
int r = ast_json_array_append(json_args,
@@ -477,11 +503,6 @@ int app_send_start_msg(struct app *app, struct ast_channel *chan,
}
}
- msg = stasis_json_event_stasis_start_create(snapshot, blob);
- if (!msg) {
- return -1;
- }
-
app_send(app, msg);
return 0;
}
@@ -499,7 +520,9 @@ int app_send_end_msg(struct app *app, struct ast_channel *chan)
return -1;
}
- msg = stasis_json_event_stasis_end_create(snapshot);
+ msg = ast_json_pack("{s: s, s: o}",
+ "type", "StasisEnd",
+ "channel", ast_channel_snapshot_to_json(snapshot));
if (!msg) {
return -1;
}
@@ -633,15 +656,13 @@ int stasis_app_register(const char *app_name, stasis_app_cb handler, void *data)
app = ao2_find(apps_registry, app_name, OBJ_KEY | OBJ_NOLOCK);
if (app) {
- RAII_VAR(struct ast_json *, blob, NULL, ast_json_unref);
RAII_VAR(struct ast_json *, msg, NULL, ast_json_unref);
- blob = ast_json_pack("{s: s}", "application", app_name);
- if (blob) {
- msg = stasis_json_event_application_replaced_create(blob);
- if (msg) {
- app_send(app, msg);
- }
+ msg = ast_json_pack("{s: s, s: s}",
+ "type", "ApplicationReplaced",
+ "application", app_name);
+ if (msg) {
+ app_send(app, msg);
}
app_update(app, handler, data);
@@ -665,82 +686,6 @@ void stasis_app_unregister(const char *app_name)
}
}
-static struct ast_json *handle_blob_dtmf(struct ast_channel_blob *obj)
-{
- RAII_VAR(struct ast_json *, extra, NULL, ast_json_unref);
- RAII_VAR(struct ast_json *, msg, NULL, ast_json_unref);
- const char *direction;
-
- /* To simplify events, we'll only generate on receive */
- direction = ast_json_string_get(
- ast_json_object_get(obj->blob, "direction"));
-
- if (strcmp("Received", direction) != 0) {
- return NULL;
- }
-
- extra = ast_json_pack(
- "{s: o}",
- "digit", ast_json_ref(ast_json_object_get(obj->blob, "digit")));
- if (!extra) {
- return NULL;
- }
-
- return stasis_json_event_channel_dtmf_received_create(obj->snapshot, extra);
-}
-
-/* To simplify events, we'll only generate on DTMF end (dtmf_end type) */
-static void sub_dtmf_handler(void *data,
- struct stasis_subscription *sub,
- struct stasis_topic *topic,
- struct stasis_message *message)
-{
- struct ast_channel_blob *obj = stasis_message_data(message);
- generic_blob_handler(obj, handle_blob_dtmf);
-}
-
-static struct ast_json *handle_blob_userevent(struct ast_channel_blob *obj)
-{
- return stasis_json_event_channel_userevent_create(obj->snapshot, obj->blob);
-}
-
-static void sub_userevent_handler(void *data,
- struct stasis_subscription *sub,
- struct stasis_topic *topic,
- struct stasis_message *message)
-{
- struct ast_channel_blob *obj = stasis_message_data(message);
- generic_blob_handler(obj, handle_blob_userevent);
-}
-
-static struct ast_json *handle_blob_hangup_request(struct ast_channel_blob *obj)
-{
- return stasis_json_event_channel_hangup_request_create(obj->snapshot, obj->blob);
-}
-
-static void sub_hangup_request_handler(void *data,
- struct stasis_subscription *sub,
- struct stasis_topic *topic,
- struct stasis_message *message)
-{
- struct ast_channel_blob *obj = stasis_message_data(message);
- generic_blob_handler(obj, handle_blob_hangup_request);
-}
-
-static struct ast_json *handle_blob_varset(struct ast_channel_blob *obj)
-{
- return stasis_json_event_channel_varset_create(obj->snapshot, obj->blob);
-}
-
-static void sub_varset_handler(void *data,
- struct stasis_subscription *sub,
- struct stasis_topic *topic,
- struct stasis_message *message)
-{
- struct ast_channel_blob *obj = stasis_message_data(message);
- generic_blob_handler(obj, handle_blob_varset);
-}
-
void stasis_app_ref(void)
{
ast_module_ref(ast_module_info->self);
@@ -788,6 +733,30 @@ static int remove_bridge_cb(void *obj, void *arg, int flags)
return 0;
}
+static struct ast_json *simple_bridge_event(
+ const char *type,
+ struct ast_bridge_snapshot *snapshot,
+ const struct timeval *tv)
+{
+ return ast_json_pack("{s: s, s: o, s: o}",
+ "type", type,
+ "timestamp", ast_json_timeval(*tv, NULL),
+ "bridge", ast_bridge_snapshot_to_json(snapshot));
+}
+
+static struct ast_json *simple_bridge_channel_event(
+ const char *type,
+ struct ast_bridge_snapshot *bridge_snapshot,
+ struct ast_channel_snapshot *channel_snapshot,
+ const struct timeval *tv)
+{
+ return ast_json_pack("{s: s, s: o, s: o}",
+ "type", type,
+ "timestamp", ast_json_timeval(*tv, NULL),
+ "bridge", ast_bridge_snapshot_to_json(bridge_snapshot),
+ "channel", ast_channel_snapshot_to_json(channel_snapshot));
+}
+
static void sub_bridge_snapshot_handler(void *data,
struct stasis_subscription *sub,
struct stasis_topic *topic,
@@ -797,6 +766,8 @@ static void sub_bridge_snapshot_handler(void *data,
struct stasis_cache_update *update = stasis_message_data(message);
struct ast_bridge_snapshot *new_snapshot = stasis_message_data(update->new_snapshot);
struct ast_bridge_snapshot *old_snapshot = stasis_message_data(update->old_snapshot);
+ const struct timeval *tv = update->new_snapshot ? stasis_message_timestamp(update->new_snapshot) : stasis_message_timestamp(message);
+
RAII_VAR(struct ast_json *, msg, NULL, ast_json_unref);
watching_apps = get_apps_watching_bridge(new_snapshot ? new_snapshot->uniqueid : old_snapshot->uniqueid);
@@ -810,11 +781,11 @@ static void sub_bridge_snapshot_handler(void *data,
/* The bridge has gone away. Create the message, make sure no apps are
* watching this bridge anymore, and destroy the bridge's control
* structure */
- msg = stasis_json_event_bridge_destroyed_create(old_snapshot);
+ msg = simple_bridge_event("BridgeDestroyed", old_snapshot, tv);
ao2_callback(watching_apps, OBJ_NODATA, remove_bridge_cb, bridge_id);
stasis_app_bridge_destroy(old_snapshot->uniqueid);
} else if (!old_snapshot) {
- msg = stasis_json_event_bridge_created_create(old_snapshot);
+ msg = simple_bridge_event("BridgeCreated", old_snapshot, tv);
}
if (!msg) {
@@ -865,6 +836,7 @@ static void sub_bridge_merge_handler(void *data,
struct ast_bridge_merge_message *merge = stasis_message_data(message);
RAII_VAR(struct ast_json *, msg, NULL, ast_json_unref);
RAII_VAR(struct ast_json *, blob, NULL, ast_json_unref);
+ const struct timeval *tv = stasis_message_timestamp(message);
watching_apps_to = get_apps_watching_bridge(merge->to->uniqueid);
if (watching_apps_to) {
@@ -881,16 +853,16 @@ static void sub_bridge_merge_handler(void *data,
return;
}
- /* The secondary bridge has to be packed into JSON by hand because the auto-generated
- * JSON event generator can only handle one instance of a given snapshot type in an
- * elegant way */
- blob = ast_json_pack("{s: o}", "bridge_from", ast_bridge_snapshot_to_json(merge->from));
- if (!blob) {
+ msg = ast_json_pack("{s: s, s: o, s: o, s: o}",
+ "type", "BridgeMerged",
+ "timestamp", ast_json_timeval(*tv, NULL),
+ "bridge", ast_bridge_snapshot_to_json(merge->to),
+ "bridge_from", ast_bridge_snapshot_to_json(merge->from));
+
+ if (!msg) {
return;
}
- msg = stasis_json_event_bridge_merged_create(merge->to, blob);
-
distribute_message(watching_apps_all, msg);
}
@@ -920,7 +892,8 @@ static void sub_bridge_enter_handler(void *data,
return;
}
- msg = stasis_json_event_channel_entered_bridge_create(obj->bridge, obj->channel);
+ msg = simple_bridge_channel_event("ChannelEnteredBridge", obj->bridge,
+ obj->channel, stasis_message_timestamp(message));
distribute_message(watching_apps_all, msg);
}
@@ -939,7 +912,8 @@ static void sub_bridge_leave_handler(void *data,
return;
}
- msg = stasis_json_event_channel_left_bridge_create(obj->bridge, obj->channel);
+ msg = simple_bridge_channel_event("ChannelLeftBridge", obj->bridge,
+ obj->channel, stasis_message_timestamp(message));
distribute_message(watching_apps_bridge, msg);
}
@@ -972,10 +946,16 @@ static int load_module(void)
}
r |= stasis_message_router_add(channel_router, stasis_cache_update_type(), sub_channel_snapshot_handler, NULL);
- r |= stasis_message_router_add(channel_router, ast_channel_user_event_type(), sub_userevent_handler, NULL);
- r |= stasis_message_router_add(channel_router, ast_channel_varset_type(), sub_varset_handler, NULL);
- r |= stasis_message_router_add(channel_router, ast_channel_dtmf_begin_type(), sub_dtmf_handler, NULL);
- r |= stasis_message_router_add(channel_router, ast_channel_hangup_request_type(), sub_hangup_request_handler, NULL);
+ /* TODO: This could be handled a lot better. Instead of subscribing to
+ * the one caching topic and filtering out messages by channel id, we
+ * should have individual caching topics per-channel, with a shared
+ * back-end cache. That would simplify a lot of what's going on right
+ * here.
+ */
+ r |= stasis_message_router_add(channel_router, ast_channel_user_event_type(), sub_channel_blob_handler, NULL);
+ r |= stasis_message_router_add(channel_router, ast_channel_varset_type(), sub_channel_blob_handler, NULL);
+ r |= stasis_message_router_add(channel_router, ast_channel_dtmf_end_type(), sub_channel_blob_handler, NULL);
+ r |= stasis_message_router_add(channel_router, ast_channel_hangup_request_type(), sub_channel_blob_handler, NULL);
if (r) {
return AST_MODULE_LOAD_FAILURE;
}