diff options
-rw-r--r-- | CHANGES | 9 | ||||
-rw-r--r-- | apps/app_userevent.c | 2 | ||||
-rw-r--r-- | include/asterisk/stasis.h | 71 | ||||
-rw-r--r-- | include/asterisk/stasis_app.h | 34 | ||||
-rw-r--r-- | include/asterisk/stasis_channels.h | 8 | ||||
-rw-r--r-- | main/manager_channels.c | 51 | ||||
-rw-r--r-- | main/stasis.c | 258 | ||||
-rw-r--r-- | main/stasis_channels.c | 27 | ||||
-rw-r--r-- | main/stasis_endpoints.c | 6 | ||||
-rw-r--r-- | res/ari/ari_model_validators.c | 25 | ||||
-rw-r--r-- | res/ari/ari_model_validators.h | 4 | ||||
-rw-r--r-- | res/ari/resource_events.c | 56 | ||||
-rw-r--r-- | res/ari/resource_events.h | 34 | ||||
-rw-r--r-- | res/res_ari_events.c | 192 | ||||
-rw-r--r-- | res/res_stasis.c | 84 | ||||
-rw-r--r-- | res/stasis/app.c | 4 | ||||
-rw-r--r-- | rest-api/api-docs/events.json | 74 |
17 files changed, 839 insertions, 100 deletions
@@ -33,6 +33,15 @@ ARI a channel's ARI control queue until they are stopped. They also can not be rewound or fastforwarded. + * User events can now be generated from ARI. Events can be signalled with + arbitrary json variables, and include one or more of channel, bridge, or + endpoint snapshots. An application must be specified which will receive + the event message (other applications can subscribe to it). The message + will also be delivered via AMI provided a channel is attached. Dialplan + generated user event messages are still transmitted via the channel, and + will only be received by a stasis application they are attached to or if + the channel is subscribed to. + chan_sip ----------- * SIP peers can now specify 'trust_id_outbound' which affects RPID/PAI diff --git a/apps/app_userevent.c b/apps/app_userevent.c index f5defd49d..8f7219eab 100644 --- a/apps/app_userevent.c +++ b/apps/app_userevent.c @@ -115,7 +115,7 @@ static int userevent_exec(struct ast_channel *chan, const char *data) } ast_channel_lock(chan); - ast_channel_publish_blob(chan, ast_channel_user_event_type(), blob); + ast_multi_object_blob_single_channel_publish(chan, ast_multi_user_event_type(), blob); ast_channel_unlock(chan); return 0; } diff --git a/include/asterisk/stasis.h b/include/asterisk/stasis.h index f5b4a60d9..4c4052c14 100644 --- a/include/asterisk/stasis.h +++ b/include/asterisk/stasis.h @@ -1032,6 +1032,77 @@ struct ao2_container *stasis_cache_dump_by_eid(struct stasis_cache *cache, struc */ struct ao2_container *stasis_cache_dump_all(struct stasis_cache *cache, struct stasis_message_type *type); +/*! + * \brief Object type code for multi user object snapshots + */ +enum stasis_user_multi_object_snapshot_type { + STASIS_UMOS_CHANNEL = 0, /*!< Channel Snapshots */ + STASIS_UMOS_BRIDGE, /*!< Bridge Snapshots */ + STASIS_UMOS_ENDPOINT, /*!< Endpoint Snapshots */ +}; + +/*! \brief Number of snapshot types */ +#define STASIS_UMOS_MAX (STASIS_UMOS_ENDPOINT + 1) + +/*! + * \brief Message type for custom user defined events with multi object blobs + * \return The stasis_message_type for user event + * \since 12.3.0 + */ +struct stasis_message_type *ast_multi_user_event_type(void); + +/*! + * \brief Create a stasis multi object blob + * \since 12.3.0 + * + * \details + * Multi object blob can store a combination of arbitrary json values + * (the blob) and also snapshots of various other system objects (such + * as channels, bridges, etc) for delivery through a stasis message. + * The multi object blob is first created, then optionally objects + * are added to it, before being attached to a message and delivered + * to stasis topic. + * + * \param blob Json blob + * + * \note When used for an ast_multi_user_event_type message, the + * json blob should contain at minimum {eventname: name}. + * + * \retval ast_multi_object_blob* if succeeded + * \retval NULL if creation failed + */ +struct ast_multi_object_blob *ast_multi_object_blob_create(struct ast_json *blob); + +/*! + * \brief Add an object to a multi object blob previously created + * \since 12.3.0 + * + * \param multi The multi object blob previously created + * \param type Type code for the object such as channel, bridge, etc. + * \param object Snapshot object of the type supplied to typename + * + * \return Nothing + */ +void ast_multi_object_blob_add(struct ast_multi_object_blob *multi, enum stasis_user_multi_object_snapshot_type type, void *object); + +/*! + * \brief Create and publish a stasis message blob on a channel with it's snapshot + * \since 12.3.0 + * + * \details + * For compatibility with app_userevent, this creates a multi object + * blob message, attaches the channel snapshot to it, and publishes it + * to the channel's topic. + * + * \param chan The channel to snapshot and publish event to + * \param type The message type + * \param blob A json blob to publish with the snapshot + * + * \return Nothing + */ +void ast_multi_object_blob_single_channel_publish(struct ast_channel *chan, struct stasis_message_type *type, struct ast_json *blob); + + /*! @} */ /*! @{ */ diff --git a/include/asterisk/stasis_app.h b/include/asterisk/stasis_app.h index 02c67fdca..334155a5b 100644 --- a/include/asterisk/stasis_app.h +++ b/include/asterisk/stasis_app.h @@ -228,6 +228,33 @@ void stasis_app_unregister_event_source(struct stasis_app_event_source *obj); */ void stasis_app_unregister_event_sources(void); +/*! \brief Return code for stasis_app_user_event */ +enum stasis_app_user_event_res { + STASIS_APP_USER_OK, + STASIS_APP_USER_APP_NOT_FOUND, + STASIS_APP_USER_EVENT_SOURCE_NOT_FOUND, + STASIS_APP_USER_EVENT_SOURCE_BAD_SCHEME, + STASIS_APP_USER_USEREVENT_INVALID, + STASIS_APP_USER_INTERNAL_ERROR, +}; + +/*! + * \brief Generate a Userevent for stasis app (echo to AMI) + * + * \param app_name Name of the application to generate event for/to. + * \param event_name Name of the Userevent. + * \param source_uris URIs for the source objects to attach to event. + * \param sources_count Array size of source_uris. + * \param userevent_data Custom parameters for the user event + * \param userevents_count Array size of userevent_data + * + * \return \ref stasis_app_user_event_res return code. + */ +enum stasis_app_user_event_res stasis_app_user_event(const char *app_name, + const char *event_name, + const char **source_uris, int sources_count, + struct ast_json *json_variables); + /*! \brief Return code for stasis_app_[un]subscribe */ enum stasis_app_subscribe_res { @@ -591,6 +618,13 @@ void stasis_app_control_publish( struct stasis_app_control *control, struct stasis_message *message); /*! + * \brief Returns the stasis topic for an app + * + * \param app Stasis app to get topic of + */ +struct stasis_topic *ast_app_get_topic(struct stasis_app *app); + +/*! * \brief Queue a control frame without payload. * * \param control Control to publish to. diff --git a/include/asterisk/stasis_channels.h b/include/asterisk/stasis_channels.h index 9c64c1c49..4b1da75ba 100644 --- a/include/asterisk/stasis_channels.h +++ b/include/asterisk/stasis_channels.h @@ -381,14 +381,6 @@ struct stasis_message_type *ast_channel_varset_type(void); /*! * \since 12 - * \brief Message type for when a custom user event is sent on a channel. - * - * \retval A stasis message type - */ -struct stasis_message_type *ast_channel_user_event_type(void); - -/*! - * \since 12 * \brief Message type for when a hangup is requested on a channel. * * \retval A stasis message type diff --git a/main/manager_channels.c b/main/manager_channels.c index 507e2c9b4..9a15353f2 100644 --- a/main/manager_channels.c +++ b/main/manager_channels.c @@ -629,54 +629,6 @@ static void channel_snapshot_update(void *data, struct stasis_subscription *sub, } } -static int userevent_exclusion_cb(const char *key) -{ - if (!strcmp("type", key)) { - return 1; - } - if (!strcmp("eventname", key)) { - return 1; - } - return 0; -} - -static void channel_user_event_cb(void *data, struct stasis_subscription *sub, - struct stasis_message *message) -{ - struct ast_channel_blob *obj = stasis_message_data(message); - RAII_VAR(struct ast_str *, channel_event_string, NULL, ast_free); - RAII_VAR(struct ast_str *, body, NULL, ast_free); - const char *eventname; - - eventname = ast_json_string_get(ast_json_object_get(obj->blob, "eventname")); - body = ast_manager_str_from_json_object(obj->blob, userevent_exclusion_cb); - channel_event_string = ast_manager_build_channel_state_string(obj->snapshot); - - if (!channel_event_string || !body) { - return; - } - - /*** DOCUMENTATION - <managerEventInstance> - <synopsis>A user defined event raised from the dialplan.</synopsis> - <syntax> - <channel_snapshot/> - <parameter name="UserEvent"> - <para>The event name, as specified in the dialplan.</para> - </parameter> - </syntax> - <see-also> - <ref type="application">UserEvent</ref> - </see-also> - </managerEventInstance> - ***/ - manager_event(EVENT_FLAG_USER, "UserEvent", - "%s" - "UserEvent: %s\r\n" - "%s", - ast_str_buffer(channel_event_string), eventname, ast_str_buffer(body)); -} - static void publish_basic_channel_event(const char *event, int class, struct ast_channel_snapshot *snapshot) { RAII_VAR(struct ast_str *, channel_event_string, NULL, ast_free); @@ -1160,9 +1112,6 @@ int manager_channels_init(void) ast_channel_snapshot_type(), channel_snapshot_update, NULL); ret |= stasis_message_router_add(message_router, - ast_channel_user_event_type(), channel_user_event_cb, NULL); - - ret |= stasis_message_router_add(message_router, ast_channel_dtmf_begin_type(), channel_dtmf_begin_cb, NULL); ret |= stasis_message_router_add(message_router, diff --git a/main/stasis.c b/main/stasis.c index 5eca791ef..a451e7b13 100644 --- a/main/stasis.c +++ b/main/stasis.c @@ -38,6 +38,29 @@ ASTERISK_FILE_VERSION(__FILE__, "$Revision$"); #include "asterisk/utils.h" #include "asterisk/uuid.h" #include "asterisk/vector.h" +#include "asterisk/stasis_channels.h" +#include "asterisk/stasis_bridges.h" +#include "asterisk/stasis_endpoints.h" + +/*** DOCUMENTATION + <managerEvent language="en_US" name="UserEvent"> + <managerEventInstance class="EVENT_FLAG_USER"> + <synopsis>A user defined event raised from the dialplan.</synopsis> + <syntax> + <channel_snapshot/> + <parameter name="UserEvent"> + <para>The event name, as specified in the dialplan.</para> + </parameter> + </syntax> + <description> + <para>Event may contain additional arbitrary parameters in addition to optional bridge and endpoint snapshots. Multiple snapshots of the same type are prefixed with a numeric value.</para> + </description> + <see-also> + <ref type="application">UserEvent</ref> + </see-also> + </managerEventInstance> + </managerEvent> +***/ /*! * \page stasis-impl Stasis Implementation Notes @@ -974,10 +997,241 @@ void stasis_log_bad_type_access(const char *name) ast_log(LOG_ERROR, "Use of %s() before init/after destruction\n", name); } +/*! \brief A multi object blob data structure to carry user event stasis messages */ +struct ast_multi_object_blob { + struct ast_json *blob; /*< A blob of JSON data */ + AST_VECTOR(, void *) snapshots[STASIS_UMOS_MAX]; /*< Vector of snapshots for each type */ +}; + +/*! + * \internal + * \brief Destructor for \ref ast_multi_object_blob objects + */ +static void multi_object_blob_dtor(void *obj) +{ + struct ast_multi_object_blob *multi = obj; + int type; + int i; + + for (type = 0; type < STASIS_UMOS_MAX; ++type) { + for (i = 0; i < AST_VECTOR_SIZE(&multi->snapshots[type]); ++i) { + ao2_cleanup(AST_VECTOR_GET(&multi->snapshots[type], i)); + } + AST_VECTOR_FREE(&multi->snapshots[type]); + } + ast_json_unref(multi->blob); +} + +/*! \brief Create a stasis user event multi object blob */ +struct ast_multi_object_blob *ast_multi_object_blob_create(struct ast_json *blob) +{ + int type; + RAII_VAR(struct ast_multi_object_blob *, multi, + ao2_alloc(sizeof(*multi), multi_object_blob_dtor), + ao2_cleanup); + + ast_assert(blob != NULL); + + if (!multi) { + return NULL; + } + + for (type = 0; type < STASIS_UMOS_MAX; ++type) { + if (AST_VECTOR_INIT(&multi->snapshots[type], 0)) { + return NULL; + } + } + + multi->blob = ast_json_ref(blob); + + ao2_ref(multi, +1); + return multi; +} + +/*! \brief Add an object (snapshot) to the blob */ +void ast_multi_object_blob_add(struct ast_multi_object_blob *multi, + enum stasis_user_multi_object_snapshot_type type, void *object) +{ + if (!multi || !object) { + return; + } + AST_VECTOR_APPEND(&multi->snapshots[type],object); +} + +/*! \brief Publish single channel user event (for app_userevent compatibility) */ +void ast_multi_object_blob_single_channel_publish(struct ast_channel *chan, + struct stasis_message_type *type, struct ast_json *blob) +{ + RAII_VAR(struct stasis_message *, message, NULL, ao2_cleanup); + RAII_VAR(struct ast_channel_snapshot *, channel_snapshot, NULL, ao2_cleanup); + RAII_VAR(struct ast_multi_object_blob *, multi, NULL, ao2_cleanup); + + multi = ast_multi_object_blob_create(blob); + if (!multi) { + return; + } + + channel_snapshot = ast_channel_snapshot_create(chan); + ao2_ref(channel_snapshot, +1); + ast_multi_object_blob_add(multi, STASIS_UMOS_CHANNEL, channel_snapshot); + + message = stasis_message_create(type, multi); + if (message) { + /* app_userevent still publishes to channel */ + stasis_publish(ast_channel_topic(chan), message); + } +} + +/*! \internal \brief convert multi object blob to ari json */ +static struct ast_json *multi_user_event_to_json( + struct stasis_message *message, + const struct stasis_message_sanitizer *sanitize) +{ + RAII_VAR(struct ast_json *, out, NULL, ast_json_unref); + struct ast_multi_object_blob *multi = stasis_message_data(message); + struct ast_json *blob = multi->blob; + const struct timeval *tv = stasis_message_timestamp(message); + enum stasis_user_multi_object_snapshot_type type; + int i; + + out = ast_json_object_create(); + if (!out) { + return NULL; + } + + ast_json_object_set(out, "type", ast_json_string_create("ChannelUserevent")); + ast_json_object_set(out, "timestamp", ast_json_timeval(*tv, NULL)); + ast_json_object_set(out, "eventname", ast_json_ref(ast_json_object_get(blob, "eventname"))); + ast_json_object_set(out, "userevent", ast_json_ref(blob)); /* eventname gets duplicated, that's ok */ + + for (type = 0; type < STASIS_UMOS_MAX; ++type) { + for (i = 0; i < AST_VECTOR_SIZE(&multi->snapshots[type]); ++i) { + struct ast_json *json_object = NULL; + char *name = NULL; + void *snapshot = AST_VECTOR_GET(&multi->snapshots[type], i); + + switch (type) { + case STASIS_UMOS_CHANNEL: + json_object = ast_channel_snapshot_to_json(snapshot, sanitize); + name = "channel"; + break; + case STASIS_UMOS_BRIDGE: + json_object = ast_bridge_snapshot_to_json(snapshot, sanitize); + name = "bridge"; + break; + case STASIS_UMOS_ENDPOINT: + json_object = ast_endpoint_snapshot_to_json(snapshot, sanitize); + name = "endpoint"; + break; + } + if (json_object) { + ast_json_object_set(out, name, json_object); + } + } + } + return ast_json_ref(out); +} + +/*! \internal \brief convert multi object blob to ami string */ +static struct ast_str *multi_object_blob_to_ami(void *obj) +{ + struct ast_str *ami_str=ast_str_create(1024); + struct ast_str *ami_snapshot; + const struct ast_multi_object_blob *multi = obj; + enum stasis_user_multi_object_snapshot_type type; + int i; + + if (!ami_str) { + return NULL; + } + if (!multi) { + ast_free(ami_str); + return NULL; + } + + for (type = 0; type < STASIS_UMOS_MAX; ++type) { + for (i = 0; i < AST_VECTOR_SIZE(&multi->snapshots[type]); ++i) { + char *name = ""; + void *snapshot = AST_VECTOR_GET(&multi->snapshots[type], i); + ami_snapshot = NULL; + + if (i > 0) { + ast_asprintf(&name, "%d", i + 1); + } + + switch (type) { + case STASIS_UMOS_CHANNEL: + ami_snapshot = ast_manager_build_channel_state_string_prefix(snapshot, name); + break; + + case STASIS_UMOS_BRIDGE: + ami_snapshot = ast_manager_build_bridge_state_string_prefix(snapshot, name); + break; + + case STASIS_UMOS_ENDPOINT: + /* currently not sending endpoint snapshots to AMI */ + break; + } + if (ami_snapshot) { + ast_str_append(&ami_str, 0, "%s", ast_str_buffer(ami_snapshot)); + ast_free(ami_snapshot); + } + } + } + + return ami_str; +} + +/*! \internal \brief Callback to pass only user defined parameters from blob */ +static int userevent_exclusion_cb(const char *key) +{ + if (!strcmp("eventname", key)) { + return 1; + } + return 0; +} + +static struct ast_manager_event_blob *multi_user_event_to_ami( + struct stasis_message *message) +{ + RAII_VAR(struct ast_str *, object_string, NULL, ast_free); + RAII_VAR(struct ast_str *, body, NULL, ast_free); + struct ast_multi_object_blob *multi = stasis_message_data(message); + const char *eventname; + + eventname = ast_json_string_get(ast_json_object_get(multi->blob, "eventname")); + body = ast_manager_str_from_json_object(multi->blob, userevent_exclusion_cb); + object_string = multi_object_blob_to_ami(multi); + if (!object_string || !body) { + return NULL; + } + + return ast_manager_event_blob_create(EVENT_FLAG_USER, "UserEvent", + "%s" + "UserEvent: %s\r\n" + "%s", + ast_str_buffer(object_string), + eventname, + ast_str_buffer(body)); +} + + +/*! + * @{ \brief Define multi user event message type(s). + */ + +STASIS_MESSAGE_TYPE_DEFN(ast_multi_user_event_type, + .to_json = multi_user_event_to_json, + .to_ami = multi_user_event_to_ami, + ); + +/*! @} */ + /*! \brief Cleanup function for graceful shutdowns */ static void stasis_cleanup(void) { STASIS_MESSAGE_TYPE_CLEANUP(stasis_subscription_change_type); + STASIS_MESSAGE_TYPE_CLEANUP(ast_multi_user_event_type); } int stasis_init(void) @@ -995,6 +1249,10 @@ int stasis_init(void) if (STASIS_MESSAGE_TYPE_INIT(stasis_subscription_change_type) != 0) { return -1; } + if (STASIS_MESSAGE_TYPE_INIT(ast_multi_user_event_type) != 0) { + return -1; + } return 0; } + diff --git a/main/stasis_channels.c b/main/stasis_channels.c index 3522fb291..49fd3f995 100644 --- a/main/stasis_channels.c +++ b/main/stasis_channels.c @@ -916,28 +916,6 @@ static struct ast_json *dtmf_end_to_json( "channel", json_channel); } -static struct ast_json *user_event_to_json( - struct stasis_message *message, - const struct stasis_message_sanitizer *sanitize) -{ - struct ast_channel_blob *channel_blob = stasis_message_data(message); - struct ast_json *blob = channel_blob->blob; - struct ast_channel_snapshot *snapshot = channel_blob->snapshot; - const struct timeval *tv = stasis_message_timestamp(message); - struct ast_json *json_channel = ast_channel_snapshot_to_json(snapshot, sanitize); - - if (!json_channel) { - return NULL; - } - - return ast_json_pack("{s: s, s: o, s: O, s: O, s: o}", - "type", "ChannelUserevent", - "timestamp", ast_json_timeval(*tv, NULL), - "eventname", ast_json_object_get(blob, "eventname"), - "userevent", blob, - "channel", json_channel); -} - static struct ast_json *varset_to_json( struct stasis_message *message, const struct stasis_message_sanitizer *sanitize) @@ -1007,9 +985,6 @@ STASIS_MESSAGE_TYPE_DEFN(ast_channel_varset_type, .to_ami = varset_to_ami, .to_json = varset_to_json, ); -STASIS_MESSAGE_TYPE_DEFN(ast_channel_user_event_type, - .to_json = user_event_to_json, - ); STASIS_MESSAGE_TYPE_DEFN(ast_channel_hangup_request_type, .to_json = hangup_request_to_json, ); @@ -1048,7 +1023,6 @@ static void stasis_channels_cleanup(void) STASIS_MESSAGE_TYPE_CLEANUP(ast_channel_snapshot_type); STASIS_MESSAGE_TYPE_CLEANUP(ast_channel_dial_type); STASIS_MESSAGE_TYPE_CLEANUP(ast_channel_varset_type); - STASIS_MESSAGE_TYPE_CLEANUP(ast_channel_user_event_type); STASIS_MESSAGE_TYPE_CLEANUP(ast_channel_hangup_request_type); STASIS_MESSAGE_TYPE_CLEANUP(ast_channel_dtmf_begin_type); STASIS_MESSAGE_TYPE_CLEANUP(ast_channel_dtmf_end_type); @@ -1097,7 +1071,6 @@ int ast_stasis_channels_init(void) res |= STASIS_MESSAGE_TYPE_INIT(ast_channel_dial_type); res |= STASIS_MESSAGE_TYPE_INIT(ast_channel_varset_type); - res |= STASIS_MESSAGE_TYPE_INIT(ast_channel_user_event_type); res |= STASIS_MESSAGE_TYPE_INIT(ast_channel_hangup_request_type); res |= STASIS_MESSAGE_TYPE_INIT(ast_channel_dtmf_begin_type); res |= STASIS_MESSAGE_TYPE_INIT(ast_channel_dtmf_end_type); diff --git a/main/stasis_endpoints.c b/main/stasis_endpoints.c index 3f8d32419..e3f5a3f15 100644 --- a/main/stasis_endpoints.c +++ b/main/stasis_endpoints.c @@ -193,7 +193,11 @@ struct ast_endpoint_snapshot *ast_endpoint_latest_snapshot(const char *tech, RAII_VAR(struct stasis_message *, msg, NULL, ao2_cleanup); struct ast_endpoint_snapshot *snapshot; - ast_asprintf(&id, "%s/%s", tech, name); + if (ast_strlen_zero(name)) { + ast_asprintf(&id, "%s", tech); + } else { + ast_asprintf(&id, "%s/%s", tech, name); + } if (!id) { return NULL; } diff --git a/res/ari/ari_model_validators.c b/res/ari/ari_model_validators.c index 5c24b9c41..fa38155bd 100644 --- a/res/ari/ari_model_validators.c +++ b/res/ari/ari_model_validators.c @@ -3076,7 +3076,6 @@ int ast_ari_validate_channel_userevent(struct ast_json *json) struct ast_json_iter *iter; int has_type = 0; int has_application = 0; - int has_channel = 0; int has_eventname = 0; int has_userevent = 0; @@ -3110,9 +3109,17 @@ int ast_ari_validate_channel_userevent(struct ast_json *json) res = 0; } } else + if (strcmp("bridge", ast_json_object_iter_key(iter)) == 0) { + int prop_is_valid; + prop_is_valid = ast_ari_validate_bridge( + ast_json_object_iter_value(iter)); + if (!prop_is_valid) { + ast_log(LOG_ERROR, "ARI ChannelUserevent field bridge failed validation\n"); + res = 0; + } + } else if (strcmp("channel", ast_json_object_iter_key(iter)) == 0) { int prop_is_valid; - has_channel = 1; prop_is_valid = ast_ari_validate_channel( ast_json_object_iter_value(iter)); if (!prop_is_valid) { @@ -3120,6 +3127,15 @@ int ast_ari_validate_channel_userevent(struct ast_json *json) res = 0; } } else + if (strcmp("endpoint", ast_json_object_iter_key(iter)) == 0) { + int prop_is_valid; + prop_is_valid = ast_ari_validate_endpoint( + ast_json_object_iter_value(iter)); + if (!prop_is_valid) { + ast_log(LOG_ERROR, "ARI ChannelUserevent field endpoint failed validation\n"); + res = 0; + } + } else if (strcmp("eventname", ast_json_object_iter_key(iter)) == 0) { int prop_is_valid; has_eventname = 1; @@ -3158,11 +3174,6 @@ int ast_ari_validate_channel_userevent(struct ast_json *json) res = 0; } - if (!has_channel) { - ast_log(LOG_ERROR, "ARI ChannelUserevent missing required field channel\n"); - res = 0; - } - if (!has_eventname) { ast_log(LOG_ERROR, "ARI ChannelUserevent missing required field eventname\n"); res = 0; diff --git a/res/ari/ari_model_validators.h b/res/ari/ari_model_validators.h index 7214a5875..a4512a1c7 100644 --- a/res/ari/ari_model_validators.h +++ b/res/ari/ari_model_validators.h @@ -1278,7 +1278,9 @@ ari_validator ast_ari_validate_application_fn(void); * - type: string (required) * - application: string (required) * - timestamp: Date - * - channel: Channel (required) + * - bridge: Bridge + * - channel: Channel + * - endpoint: Endpoint * - eventname: string (required) * - userevent: object (required) * ChannelVarset diff --git a/res/ari/resource_events.c b/res/ari/resource_events.c index 098049fc6..d159741c2 100644 --- a/res/ari/resource_events.c +++ b/res/ari/resource_events.c @@ -217,3 +217,59 @@ void ast_ari_websocket_events_event_websocket(struct ast_ari_websocket_session * ast_json_unref(msg); } } + +void ast_ari_events_user_event(struct ast_variable *headers, + struct ast_ari_events_user_event_args *args, + struct ast_ari_response *response) +{ + enum stasis_app_user_event_res res; + struct ast_json *json_variables = NULL; + + if (args->variables) { + ast_ari_events_user_event_parse_body(args->variables, args); + json_variables = ast_json_object_get(args->variables, "variables"); + } + + if (ast_strlen_zero(args->application)) { + ast_ari_response_error(response, 400, "Bad Request", + "Missing parameter application"); + return; + } + + res = stasis_app_user_event(args->application, + args->event_name, + args->source, args->source_count, + json_variables); + + switch (res) { + case STASIS_APP_USER_OK: + ast_ari_response_no_content(response); + break; + + case STASIS_APP_USER_APP_NOT_FOUND: + ast_ari_response_error(response, 404, "Not Found", + "Application not found"); + break; + + case STASIS_APP_USER_EVENT_SOURCE_NOT_FOUND: + ast_ari_response_error(response, 422, "Unprocessable Entity", + "Event source was not found"); + break; + + case STASIS_APP_USER_EVENT_SOURCE_BAD_SCHEME: + ast_ari_response_error(response, 400, "Bad Request", + "Invalid event source URI scheme"); + break; + + case STASIS_APP_USER_USEREVENT_INVALID: + ast_ari_response_error(response, 400, "Bad Request", + "Invalid userevnet data"); + break; + + case STASIS_APP_USER_INTERNAL_ERROR: + default: + ast_ari_response_error(response, 500, "Internal Server Error", + "Error processing request"); + } +} + diff --git a/res/ari/resource_events.h b/res/ari/resource_events.h index 96ee5b3ac..08077098b 100644 --- a/res/ari/resource_events.h +++ b/res/ari/resource_events.h @@ -56,5 +56,39 @@ struct ast_ari_events_event_websocket_args { * \param args Swagger parameters. */ void ast_ari_websocket_events_event_websocket(struct ast_ari_websocket_session *session, struct ast_variable *headers, struct ast_ari_events_event_websocket_args *args); +/*! Argument struct for ast_ari_events_user_event() */ +struct ast_ari_events_user_event_args { + /*! Event name */ + const char *event_name; + /*! The name of the application that will receive this event */ + const char *application; + /*! Array of URI for event source (channel:{channelId}, bridge:{bridgeId}, endpoint:{tech}/{resource}, deviceState:{deviceName} */ + const char **source; + /*! Length of source array. */ + size_t source_count; + /*! Parsing context for source. */ + char *source_parse; + /*! custom key/value pairs added to the user event */ + struct ast_json *variables; +}; +/*! + * \brief Body parsing function for /events/user/{eventName}. + * \param body The JSON body from which to parse parameters. + * \param[out] args The args structure to parse into. + * \retval zero on success + * \retval non-zero on failure + */ +int ast_ari_events_user_event_parse_body( + struct ast_json *body, + struct ast_ari_events_user_event_args *args); + +/*! + * \brief Generate a user event. + * + * \param headers HTTP headers + * \param args Swagger parameters + * \param[out] response HTTP response + */ +void ast_ari_events_user_event(struct ast_variable *headers, struct ast_ari_events_user_event_args *args, struct ast_ari_response *response); #endif /* _ASTERISK_RESOURCE_EVENTS_H */ diff --git a/res/res_ari_events.c b/res/res_ari_events.c index f5a3facd6..4e56789e0 100644 --- a/res/res_ari_events.c +++ b/res/res_ari_events.c @@ -149,15 +149,203 @@ fin: __attribute__((unused)) ast_free(args.app_parse); ast_free(args.app); } +int ast_ari_events_user_event_parse_body( + struct ast_json *body, + struct ast_ari_events_user_event_args *args) +{ + struct ast_json *field; + /* Parse query parameters out of it */ + field = ast_json_object_get(body, "application"); + if (field) { + args->application = ast_json_string_get(field); + } + field = ast_json_object_get(body, "source"); + if (field) { + /* If they were silly enough to both pass in a query param and a + * JSON body, free up the query value. + */ + ast_free(args->source); + if (ast_json_typeof(field) == AST_JSON_ARRAY) { + /* Multiple param passed as array */ + size_t i; + args->source_count = ast_json_array_size(field); + args->source = ast_malloc(sizeof(*args->source) * args->source_count); + + if (!args->source) { + return -1; + } + + for (i = 0; i < args->source_count; ++i) { + args->source[i] = ast_json_string_get(ast_json_array_get(field, i)); + } + } else { + /* Multiple param passed as single value */ + args->source_count = 1; + args->source = ast_malloc(sizeof(*args->source) * args->source_count); + if (!args->source) { + return -1; + } + args->source[0] = ast_json_string_get(field); + } + } + return 0; +} + +/*! + * \brief Parameter parsing callback for /events/user/{eventName}. + * \param get_params GET parameters in the HTTP request. + * \param path_vars Path variables extracted from the request. + * \param headers HTTP headers. + * \param[out] response Response to the HTTP request. + */ +static void ast_ari_events_user_event_cb( + struct ast_tcptls_session_instance *ser, + struct ast_variable *get_params, struct ast_variable *path_vars, + struct ast_variable *headers, struct ast_ari_response *response) +{ + struct ast_ari_events_user_event_args args = {}; + struct ast_variable *i; + RAII_VAR(struct ast_json *, body, NULL, ast_json_unref); +#if defined(AST_DEVMODE) + int is_valid; + int code; +#endif /* AST_DEVMODE */ + + for (i = get_params; i; i = i->next) { + if (strcmp(i->name, "application") == 0) { + args.application = (i->value); + } else + if (strcmp(i->name, "source") == 0) { + /* Parse comma separated list */ + char *vals[MAX_VALS]; + size_t j; + + args.source_parse = ast_strdup(i->value); + if (!args.source_parse) { + ast_ari_response_alloc_failed(response); + goto fin; + } + + if (strlen(args.source_parse) == 0) { + /* ast_app_separate_args can't handle "" */ + args.source_count = 1; + vals[0] = args.source_parse; + } else { + args.source_count = ast_app_separate_args( + args.source_parse, ',', vals, + ARRAY_LEN(vals)); + } + + if (args.source_count == 0) { + ast_ari_response_alloc_failed(response); + goto fin; + } + + if (args.source_count >= MAX_VALS) { + ast_ari_response_error(response, 400, + "Bad Request", + "Too many values for source"); + goto fin; + } + + args.source = ast_malloc(sizeof(*args.source) * args.source_count); + if (!args.source) { + ast_ari_response_alloc_failed(response); + goto fin; + } + + for (j = 0; j < args.source_count; ++j) { + args.source[j] = (vals[j]); + } + } else + {} + } + for (i = path_vars; i; i = i->next) { + if (strcmp(i->name, "eventName") == 0) { + args.event_name = (i->value); + } else + {} + } + /* Look for a JSON request entity */ + body = ast_http_get_json(ser, headers); + if (!body) { + switch (errno) { + case EFBIG: + ast_ari_response_error(response, 413, "Request Entity Too Large", "Request body too large"); + goto fin; + case ENOMEM: + ast_ari_response_error(response, 500, "Internal Server Error", "Error processing request"); + goto fin; + case EIO: + ast_ari_response_error(response, 400, "Bad Request", "Error parsing request body"); + goto fin; + } + } + args.variables = ast_json_ref(body); + ast_ari_events_user_event(headers, &args, response); +#if defined(AST_DEVMODE) + code = response->response_code; + + switch (code) { + case 0: /* Implementation is still a stub, or the code wasn't set */ + is_valid = response->message == NULL; + break; + case 500: /* Internal Server Error */ + case 501: /* Not Implemented */ + case 404: /* Application does not exist. */ + case 422: /* Event source not found. */ + case 400: /* Invalid even tsource URI or userevent data. */ + is_valid = 1; + break; + default: + if (200 <= code && code <= 299) { + is_valid = ast_ari_validate_void( + response->message); + } else { + ast_log(LOG_ERROR, "Invalid error response %d for /events/user/{eventName}\n", code); + is_valid = 0; + } + } + + if (!is_valid) { + ast_log(LOG_ERROR, "Response validation failed for /events/user/{eventName}\n"); + ast_ari_response_error(response, 500, + "Internal Server Error", "Response validation failed"); + } +#endif /* AST_DEVMODE */ + +fin: __attribute__((unused)) + ast_free(args.source_parse); + ast_free(args.source); + return; +} /*! \brief REST handler for /api-docs/events.{format} */ -static struct stasis_rest_handlers events = { - .path_segment = "events", +static struct stasis_rest_handlers events_user_eventName = { + .path_segment = "eventName", + .is_wildcard = 1, .callbacks = { + [AST_HTTP_POST] = ast_ari_events_user_event_cb, }, .num_children = 0, .children = { } }; +/*! \brief REST handler for /api-docs/events.{format} */ +static struct stasis_rest_handlers events_user = { + .path_segment = "user", + .callbacks = { + }, + .num_children = 1, + .children = { &events_user_eventName, } +}; +/*! \brief REST handler for /api-docs/events.{format} */ +static struct stasis_rest_handlers events = { + .path_segment = "events", + .callbacks = { + }, + .num_children = 1, + .children = { &events_user, } +}; static int load_module(void) { diff --git a/res/res_stasis.c b/res/res_stasis.c index d9542cd21..0184d209c 100644 --- a/res/res_stasis.c +++ b/res/res_stasis.c @@ -61,6 +61,7 @@ ASTERISK_FILE_VERSION(__FILE__, "$Revision$") #include "asterisk/stasis_app_impl.h" #include "asterisk/stasis_channels.h" #include "asterisk/stasis_bridges.h" +#include "asterisk/stasis_endpoints.h" #include "asterisk/stasis_message_router.h" #include "asterisk/strings.h" #include "stasis/app.h" @@ -1310,6 +1311,89 @@ enum stasis_app_subscribe_res stasis_app_unsubscribe(const char *app_name, json, app_unsubscribe); } +enum stasis_app_user_event_res stasis_app_user_event(const char *app_name, + const char *event_name, + const char **source_uris, int sources_count, + struct ast_json *json_variables) +{ + RAII_VAR(struct stasis_app *, app, find_app_by_name(app_name), ao2_cleanup); + RAII_VAR(struct ast_json *, blob, NULL, ast_json_unref); + RAII_VAR(struct ast_multi_object_blob *, multi, NULL, ao2_cleanup); + RAII_VAR(void *, obj, NULL, ao2_cleanup); + RAII_VAR(struct stasis_message *, message, NULL, ao2_cleanup); + enum stasis_app_subscribe_res res = STASIS_APP_USER_INTERNAL_ERROR; + struct ast_json *json_value; + int have_channel = 0; + int i; + + if (!app) { + ast_log(LOG_WARNING, "App %s not found\n", app_name); + return STASIS_APP_USER_APP_NOT_FOUND; + } + + blob = json_variables; + if (!blob) { + blob = ast_json_pack("{}"); + } + json_value = ast_json_string_create(event_name); + if (!json_value) { + ast_log(LOG_ERROR, "unable to create json string\n"); + return res; + } + if (ast_json_object_set(blob, "eventname", json_value)) { + ast_log(LOG_ERROR, "unable to set eventname to blob\n"); + return res; + } + + multi = ast_multi_object_blob_create(blob); + + for (i = 0; i < sources_count; ++i) { + const char *uri = source_uris[i]; + void *snapshot=NULL; + enum stasis_user_multi_object_snapshot_type type; + + if (ast_begins_with(uri, "channel:")) { + type = STASIS_UMOS_CHANNEL; + snapshot = ast_channel_snapshot_get_latest(uri + 8); + have_channel = 1; + } else if (ast_begins_with(uri, "bridge:")) { + type = STASIS_UMOS_BRIDGE; + snapshot = ast_bridge_snapshot_get_latest(uri + 7); + } else if (ast_begins_with(uri, "endpoint:")) { + type = STASIS_UMOS_ENDPOINT; + snapshot = ast_endpoint_latest_snapshot(uri + 9, NULL); + } else { + ast_log(LOG_WARNING, "Invalid scheme: %s\n", uri); + return STASIS_APP_USER_EVENT_SOURCE_BAD_SCHEME; + } + if (!snapshot) { + ast_log(LOG_ERROR, "Unable to get snapshot for %s\n", uri); + return STASIS_APP_USER_EVENT_SOURCE_NOT_FOUND; + } + ast_multi_object_blob_add(multi, type, snapshot); + } + + message = stasis_message_create(ast_multi_user_event_type(), multi); + if (!message) { + ast_log(LOG_ERROR, "Unable to create stasis user event message\n"); + return res; + } + + /* + * Publishing to two different topics is normally to be avoided -- except + * in this case both are final destinations with no forwards (only listeners). + * The message has to be delivered to the application topic for ARI, but a + * copy is also delivered directly to the manager for AMI if there is a channel. + */ + stasis_publish(ast_app_get_topic(app), message); + + if (have_channel) { + stasis_publish(ast_manager_get_topic(), message); + } + + return STASIS_APP_USER_OK; +} + void stasis_app_ref(void) { ast_module_ref(ast_module_info->self); diff --git a/res/stasis/app.c b/res/stasis/app.c index 9fcf848e6..4dcb635ef 100644 --- a/res/stasis/app.c +++ b/res/stasis/app.c @@ -795,6 +795,10 @@ struct stasis_app *app_create(const char *name, stasis_app_cb handler, void *dat return app; } +struct stasis_topic *ast_app_get_topic(struct stasis_app *app) { + return app->topic; +} + /*! * \brief Send a message to the given application. * \param app App to send the message to. diff --git a/rest-api/api-docs/events.json b/rest-api/api-docs/events.json index 20c8423bc..f19b0b189 100644 --- a/rest-api/api-docs/events.json +++ b/rest-api/api-docs/events.json @@ -30,6 +30,66 @@ ] } ] + }, + { + "path": "/events/user/{eventName}", + "description": "Stasis application user events", + "operations": [ + { + "httpMethod": "POST", + "summary": "Generate a user event.", + "nickname": "userEvent", + "responseClass": "void", + "parameters": [ + { + "name": "eventName", + "description": "Event name", + "paramType": "path", + "required": true, + "allowMultiple": false, + "dataType": "string" + }, + { + "name": "application", + "description": "The name of the application that will receive this event", + "paramType": "query", + "required": true, + "allowMultiple": false, + "dataType": "string" + }, + { + "name": "source", + "description": "URI for event source (channel:{channelId}, bridge:{bridgeId}, endpoint:{tech}/{resource}, deviceState:{deviceName}", + "paramType": "query", + "required": false, + "allowMultiple": true, + "dataType": "string" + }, + { + "name": "variables", + "description": "custom key/value pairs added to the user event", + "paramType": "body", + "required": false, + "allowMultiple": false, + "dataType": "containers" + } + ], + "errorResponses": [ + { + "code": 404, + "reason": "Application does not exist." + }, + { + "code": 422, + "reason": "Event source not found." + }, + { + "code": 400, + "reason": "Invalid even tsource URI or userevent data." + } + ] + } + ] } ], "models": { @@ -451,9 +511,19 @@ "description": "The name of the user event." }, "channel": { - "required": true, + "required": false, "type": "Channel", - "description": "The channel that signaled the user event." + "description": "A channel that is signaled with the user event." + }, + "bridge": { + "required": false, + "type": "Bridge", + "description": "A bridge that is signaled with the user event." + }, + "endpoint": { + "required": false, + "type": "Endpoint", + "description": "A endpoint that is signaled with the user event." }, "userevent": { "required": true, |