summaryrefslogtreecommitdiff
path: root/res
diff options
context:
space:
mode:
Diffstat (limited to 'res')
-rw-r--r--res/ari/ari_model_validators.c228
-rw-r--r--res/ari/ari_model_validators.h68
-rw-r--r--res/ari/resource_channels.c34
-rw-r--r--res/ari/resource_endpoints.c105
-rw-r--r--res/ari/resource_endpoints.h60
-rw-r--r--res/res_ari_endpoints.c234
-rw-r--r--res/res_pjsip_messaging.c129
-rw-r--r--res/res_stasis.c8
-rw-r--r--res/res_xmpp.c11
-rw-r--r--res/stasis/app.c46
-rw-r--r--res/stasis/messaging.c531
-rw-r--r--res/stasis/messaging.h83
12 files changed, 1429 insertions, 108 deletions
diff --git a/res/ari/ari_model_validators.c b/res/ari/ari_model_validators.c
index 0302db327..10fd3bd83 100644
--- a/res/ari/ari_model_validators.c
+++ b/res/ari/ari_model_validators.c
@@ -588,6 +588,140 @@ ari_validator ast_ari_validate_endpoint_fn(void)
return ast_ari_validate_endpoint;
}
+int ast_ari_validate_text_message(struct ast_json *json)
+{
+ int res = 1;
+ struct ast_json_iter *iter;
+ int has_body = 0;
+ int has_from = 0;
+ int has_to = 0;
+
+ for (iter = ast_json_object_iter(json); iter; iter = ast_json_object_iter_next(json, iter)) {
+ if (strcmp("body", ast_json_object_iter_key(iter)) == 0) {
+ int prop_is_valid;
+ has_body = 1;
+ prop_is_valid = ast_ari_validate_string(
+ ast_json_object_iter_value(iter));
+ if (!prop_is_valid) {
+ ast_log(LOG_ERROR, "ARI TextMessage field body failed validation\n");
+ res = 0;
+ }
+ } else
+ if (strcmp("from", ast_json_object_iter_key(iter)) == 0) {
+ int prop_is_valid;
+ has_from = 1;
+ prop_is_valid = ast_ari_validate_string(
+ ast_json_object_iter_value(iter));
+ if (!prop_is_valid) {
+ ast_log(LOG_ERROR, "ARI TextMessage field from failed validation\n");
+ res = 0;
+ }
+ } else
+ if (strcmp("to", ast_json_object_iter_key(iter)) == 0) {
+ int prop_is_valid;
+ has_to = 1;
+ prop_is_valid = ast_ari_validate_string(
+ ast_json_object_iter_value(iter));
+ if (!prop_is_valid) {
+ ast_log(LOG_ERROR, "ARI TextMessage field to failed validation\n");
+ res = 0;
+ }
+ } else
+ if (strcmp("variables", ast_json_object_iter_key(iter)) == 0) {
+ int prop_is_valid;
+ prop_is_valid = ast_ari_validate_list(
+ ast_json_object_iter_value(iter),
+ ast_ari_validate_text_message_variable);
+ if (!prop_is_valid) {
+ ast_log(LOG_ERROR, "ARI TextMessage field variables failed validation\n");
+ res = 0;
+ }
+ } else
+ {
+ ast_log(LOG_ERROR,
+ "ARI TextMessage has undocumented field %s\n",
+ ast_json_object_iter_key(iter));
+ res = 0;
+ }
+ }
+
+ if (!has_body) {
+ ast_log(LOG_ERROR, "ARI TextMessage missing required field body\n");
+ res = 0;
+ }
+
+ if (!has_from) {
+ ast_log(LOG_ERROR, "ARI TextMessage missing required field from\n");
+ res = 0;
+ }
+
+ if (!has_to) {
+ ast_log(LOG_ERROR, "ARI TextMessage missing required field to\n");
+ res = 0;
+ }
+
+ return res;
+}
+
+ari_validator ast_ari_validate_text_message_fn(void)
+{
+ return ast_ari_validate_text_message;
+}
+
+int ast_ari_validate_text_message_variable(struct ast_json *json)
+{
+ int res = 1;
+ struct ast_json_iter *iter;
+ int has_key = 0;
+ int has_value = 0;
+
+ for (iter = ast_json_object_iter(json); iter; iter = ast_json_object_iter_next(json, iter)) {
+ if (strcmp("key", ast_json_object_iter_key(iter)) == 0) {
+ int prop_is_valid;
+ has_key = 1;
+ prop_is_valid = ast_ari_validate_string(
+ ast_json_object_iter_value(iter));
+ if (!prop_is_valid) {
+ ast_log(LOG_ERROR, "ARI TextMessageVariable field key failed validation\n");
+ res = 0;
+ }
+ } else
+ if (strcmp("value", ast_json_object_iter_key(iter)) == 0) {
+ int prop_is_valid;
+ has_value = 1;
+ prop_is_valid = ast_ari_validate_string(
+ ast_json_object_iter_value(iter));
+ if (!prop_is_valid) {
+ ast_log(LOG_ERROR, "ARI TextMessageVariable field value failed validation\n");
+ res = 0;
+ }
+ } else
+ {
+ ast_log(LOG_ERROR,
+ "ARI TextMessageVariable has undocumented field %s\n",
+ ast_json_object_iter_key(iter));
+ res = 0;
+ }
+ }
+
+ if (!has_key) {
+ ast_log(LOG_ERROR, "ARI TextMessageVariable missing required field key\n");
+ res = 0;
+ }
+
+ if (!has_value) {
+ ast_log(LOG_ERROR, "ARI TextMessageVariable missing required field value\n");
+ res = 0;
+ }
+
+ return res;
+}
+
+ari_validator ast_ari_validate_text_message_variable_fn(void)
+{
+ return ast_ari_validate_text_message_variable;
+}
+
int ast_ari_validate_caller_id(struct ast_json *json)
{
int res = 1;
@@ -3890,6 +4024,9 @@ int ast_ari_validate_event(struct ast_json *json)
if (strcmp("StasisStart", discriminator) == 0) {
return ast_ari_validate_stasis_start(json);
} else
+ if (strcmp("TextMessageReceived", discriminator) == 0) {
+ return ast_ari_validate_text_message_received(json);
+ } else
{
ast_log(LOG_ERROR, "ARI Event has undocumented subtype %s\n",
discriminator);
@@ -4061,6 +4198,9 @@ int ast_ari_validate_message(struct ast_json *json)
if (strcmp("StasisStart", discriminator) == 0) {
return ast_ari_validate_stasis_start(json);
} else
+ if (strcmp("TextMessageReceived", discriminator) == 0) {
+ return ast_ari_validate_text_message_received(json);
+ } else
{
ast_log(LOG_ERROR, "ARI Message has undocumented subtype %s\n",
discriminator);
@@ -4724,6 +4864,94 @@ ari_validator ast_ari_validate_stasis_start_fn(void)
return ast_ari_validate_stasis_start;
}
+int ast_ari_validate_text_message_received(struct ast_json *json)
+{
+ int res = 1;
+ struct ast_json_iter *iter;
+ int has_type = 0;
+ int has_application = 0;
+ int has_message = 0;
+
+ for (iter = ast_json_object_iter(json); iter; iter = ast_json_object_iter_next(json, iter)) {
+ if (strcmp("type", ast_json_object_iter_key(iter)) == 0) {
+ int prop_is_valid;
+ has_type = 1;
+ prop_is_valid = ast_ari_validate_string(
+ ast_json_object_iter_value(iter));
+ if (!prop_is_valid) {
+ ast_log(LOG_ERROR, "ARI TextMessageReceived field type failed validation\n");
+ res = 0;
+ }
+ } else
+ if (strcmp("application", ast_json_object_iter_key(iter)) == 0) {
+ int prop_is_valid;
+ has_application = 1;
+ prop_is_valid = ast_ari_validate_string(
+ ast_json_object_iter_value(iter));
+ if (!prop_is_valid) {
+ ast_log(LOG_ERROR, "ARI TextMessageReceived field application failed validation\n");
+ res = 0;
+ }
+ } else
+ if (strcmp("timestamp", ast_json_object_iter_key(iter)) == 0) {
+ int prop_is_valid;
+ prop_is_valid = ast_ari_validate_date(
+ ast_json_object_iter_value(iter));
+ if (!prop_is_valid) {
+ ast_log(LOG_ERROR, "ARI TextMessageReceived field timestamp failed validation\n");
+ res = 0;
+ }
+ } else
+ if (strcmp("endpoint", ast_json_object_iter_key(iter)) == 0) {
+ int prop_is_valid;
+ prop_is_valid = ast_ari_validate_endpoint(
+ ast_json_object_iter_value(iter));
+ if (!prop_is_valid) {
+ ast_log(LOG_ERROR, "ARI TextMessageReceived field endpoint failed validation\n");
+ res = 0;
+ }
+ } else
+ if (strcmp("message", ast_json_object_iter_key(iter)) == 0) {
+ int prop_is_valid;
+ has_message = 1;
+ prop_is_valid = ast_ari_validate_text_message(
+ ast_json_object_iter_value(iter));
+ if (!prop_is_valid) {
+ ast_log(LOG_ERROR, "ARI TextMessageReceived field message failed validation\n");
+ res = 0;
+ }
+ } else
+ {
+ ast_log(LOG_ERROR,
+ "ARI TextMessageReceived has undocumented field %s\n",
+ ast_json_object_iter_key(iter));
+ res = 0;
+ }
+ }
+
+ if (!has_type) {
+ ast_log(LOG_ERROR, "ARI TextMessageReceived missing required field type\n");
+ res = 0;
+ }
+
+ if (!has_application) {
+ ast_log(LOG_ERROR, "ARI TextMessageReceived missing required field application\n");
+ res = 0;
+ }
+
+ if (!has_message) {
+ ast_log(LOG_ERROR, "ARI TextMessageReceived missing required field message\n");
+ res = 0;
+ }
+
+ return res;
+}
+
+ari_validator ast_ari_validate_text_message_received_fn(void)
+{
+ return ast_ari_validate_text_message_received;
+}
+
int ast_ari_validate_application(struct ast_json *json)
{
int res = 1;
diff --git a/res/ari/ari_model_validators.h b/res/ari/ari_model_validators.h
index 0186168b2..beace67b2 100644
--- a/res/ari/ari_model_validators.h
+++ b/res/ari/ari_model_validators.h
@@ -299,6 +299,42 @@ int ast_ari_validate_endpoint(struct ast_json *json);
ari_validator ast_ari_validate_endpoint_fn(void);
/*!
+ * \brief Validator for TextMessage.
+ *
+ * A text message.
+ *
+ * \param json JSON object to validate.
+ * \returns True (non-zero) if valid.
+ * \returns False (zero) if invalid.
+ */
+int ast_ari_validate_text_message(struct ast_json *json);
+
+/*!
+ * \brief Function pointer to ast_ari_validate_text_message().
+ *
+ * See \ref ast_ari_model_validators.h for more details.
+ */
+ari_validator ast_ari_validate_text_message_fn(void);
+
+/*!
+ * \brief Validator for TextMessageVariable.
+ *
+ * A key/value pair variable in a text message.
+ *
+ * \param json JSON object to validate.
+ * \returns True (non-zero) if valid.
+ * \returns False (zero) if invalid.
+ */
+int ast_ari_validate_text_message_variable(struct ast_json *json);
+
+/*!
+ * \brief Function pointer to ast_ari_validate_text_message_variable().
+ *
+ * See \ref ast_ari_model_validators.h for more details.
+ */
+ari_validator ast_ari_validate_text_message_variable_fn(void);
+
+/*!
* \brief Validator for CallerID.
*
* Caller identification
@@ -1097,6 +1133,24 @@ int ast_ari_validate_stasis_start(struct ast_json *json);
ari_validator ast_ari_validate_stasis_start_fn(void);
/*!
+ * \brief Validator for TextMessageReceived.
+ *
+ * A text message was received from an endpoint.
+ *
+ * \param json JSON object to validate.
+ * \returns True (non-zero) if valid.
+ * \returns False (zero) if invalid.
+ */
+int ast_ari_validate_text_message_received(struct ast_json *json);
+
+/*!
+ * \brief Function pointer to ast_ari_validate_text_message_received().
+ *
+ * See \ref ast_ari_model_validators.h for more details.
+ */
+ari_validator ast_ari_validate_text_message_received_fn(void);
+
+/*!
* \brief Validator for Application.
*
* Details of a Stasis application
@@ -1152,6 +1206,14 @@ ari_validator ast_ari_validate_application_fn(void);
* - resource: string (required)
* - state: string
* - technology: string (required)
+ * TextMessage
+ * - body: string (required)
+ * - from: string (required)
+ * - to: string (required)
+ * - variables: List[TextMessageVariable]
+ * TextMessageVariable
+ * - key: string (required)
+ * - value: string (required)
* CallerID
* - name: string (required)
* - number: string (required)
@@ -1405,6 +1467,12 @@ ari_validator ast_ari_validate_application_fn(void);
* - timestamp: Date
* - args: List[string] (required)
* - channel: Channel (required)
+ * TextMessageReceived
+ * - type: string (required)
+ * - application: string (required)
+ * - timestamp: Date
+ * - endpoint: Endpoint
+ * - message: TextMessage (required)
* Application
* - bridge_ids: List[string] (required)
* - channel_ids: List[string] (required)
diff --git a/res/ari/resource_channels.c b/res/ari/resource_channels.c
index cef1e7130..d0a1be32e 100644
--- a/res/ari/resource_channels.c
+++ b/res/ari/resource_channels.c
@@ -723,36 +723,6 @@ void ast_ari_channels_list(struct ast_variable *headers,
ast_ari_response_ok(response, ast_json_ref(json));
}
-static int json_to_ast_variables(struct ast_json *json_variables, struct ast_variable **variables)
-{
- struct ast_variable *current = NULL;
- struct ast_json_iter *it_json_var;
-
- for (it_json_var = ast_json_object_iter(json_variables); it_json_var;
- it_json_var = ast_json_object_iter_next(json_variables, it_json_var)) {
- struct ast_variable *new_var;
-
- new_var = ast_variable_new(ast_json_object_iter_key(it_json_var),
- ast_json_string_get(ast_json_object_iter_value(it_json_var)),
- "");
- if (!new_var) {
- ast_variables_destroy(*variables);
- *variables = NULL;
- return 1;
- }
-
- if (!current) {
- *variables = new_var;
- current = *variables;
- } else {
- current->next = new_var;
- current = new_var;
- }
- }
-
- return 0;
-}
-
static void ari_channels_handle_originate_with_id(const char *args_endpoint,
const char *args_extension,
const char *args_context,
@@ -894,7 +864,7 @@ void ast_ari_channels_originate_with_id(struct ast_variable *headers,
ast_ari_channels_originate_with_id_parse_body(args->variables, args);
json_variables = ast_json_object_get(args->variables, "variables");
if (json_variables) {
- if (json_to_ast_variables(json_variables, &variables)) {
+ if (ast_json_to_ast_variables(json_variables, &variables)) {
ast_log(AST_LOG_ERROR, "Unable to convert 'variables' in JSON body to channel variables\n");
ast_ari_response_alloc_failed(response);
return;
@@ -930,7 +900,7 @@ void ast_ari_channels_originate(struct ast_variable *headers,
ast_ari_channels_originate_parse_body(args->variables, args);
json_variables = ast_json_object_get(args->variables, "variables");
if (json_variables) {
- if (json_to_ast_variables(json_variables, &variables)) {
+ if (ast_json_to_ast_variables(json_variables, &variables)) {
ast_log(AST_LOG_ERROR, "Unable to convert 'variables' in JSON body to channel variables\n");
ast_ari_response_alloc_failed(response);
return;
diff --git a/res/ari/resource_endpoints.c b/res/ari/resource_endpoints.c
index ff2b150dd..4f91e781d 100644
--- a/res/ari/resource_endpoints.c
+++ b/res/ari/resource_endpoints.c
@@ -174,3 +174,108 @@ void ast_ari_endpoints_get(struct ast_variable *headers,
ast_ari_response_ok(response, json);
}
+
+static void send_message(const char *to, const char *from, const char *body, struct ast_variable *variables, struct ast_ari_response *response)
+{
+ struct ast_variable *current;
+ struct ast_msg *msg;
+ int res = 0;
+
+ if (ast_strlen_zero(to)) {
+ ast_ari_response_error(response, 400, "Bad Request",
+ "To must be specified");
+ return;
+ }
+
+ msg = ast_msg_alloc();
+ if (!msg) {
+ ast_ari_response_alloc_failed(response);
+ return;
+ }
+
+ res |= ast_msg_set_from(msg, "%s", from);
+ res |= ast_msg_set_to(msg, "%s", to);
+
+ if (!ast_strlen_zero(body)) {
+ res |= ast_msg_set_body(msg, "%s", body);
+ }
+
+ for (current = variables; current; current = current->next) {
+ res |= ast_msg_set_var_outbound(msg, current->name, current->value);
+ }
+
+ if (res) {
+ ast_ari_response_alloc_failed(response);
+ ast_msg_destroy(msg);
+ return;
+ }
+
+ if (ast_msg_send(msg, to, from)) {
+ ast_ari_response_error(response, 404, "Not Found",
+ "Endpoint not found");
+ }
+
+ response->message = ast_json_null();
+ response->response_code = 202;
+ response->response_text = "Accepted";
+}
+
+void ast_ari_endpoints_send_message(struct ast_variable *headers,
+ struct ast_ari_endpoints_send_message_args *args,
+ struct ast_ari_response *response)
+{
+ RAII_VAR(struct ast_variable *, variables, NULL, ast_variables_destroy);
+
+ if (args->variables) {
+ struct ast_json *json_variables;
+
+ ast_ari_endpoints_send_message_parse_body(args->variables, args);
+ json_variables = ast_json_object_get(args->variables, "variables");
+ if (json_variables) {
+ if (ast_json_to_ast_variables(json_variables, &variables)) {
+ ast_log(AST_LOG_ERROR, "Unable to convert 'variables' in JSON body to Asterisk variables\n");
+ ast_ari_response_alloc_failed(response);
+ return;
+ }
+ }
+ }
+
+ send_message(args->to, args->from, args->body, variables, response);
+}
+
+void ast_ari_endpoints_send_message_to_endpoint(struct ast_variable *headers,
+ struct ast_ari_endpoints_send_message_to_endpoint_args *args,
+ struct ast_ari_response *response)
+{
+ RAII_VAR(struct ast_variable *, variables, NULL, ast_variables_destroy);
+ RAII_VAR(struct ast_endpoint_snapshot *, snapshot, NULL, ao2_cleanup);
+ char msg_to[128];
+ char *tech = ast_strdupa(args->tech);
+
+ /* Really, we just want to know if this thing exists */
+ snapshot = ast_endpoint_latest_snapshot(args->tech, args->resource);
+ if (!snapshot) {
+ ast_ari_response_error(response, 404, "Not Found",
+ "Endpoint not found");
+ return;
+ }
+
+ if (args->variables) {
+ struct ast_json *json_variables;
+
+ ast_ari_endpoints_send_message_to_endpoint_parse_body(args->variables, args);
+ json_variables = ast_json_object_get(args->variables, "variables");
+
+ if (json_variables) {
+ if (ast_json_to_ast_variables(json_variables, &variables)) {
+ ast_log(AST_LOG_ERROR, "Unable to convert 'variables' in JSON body to Asterisk variables\n");
+ ast_ari_response_alloc_failed(response);
+ return;
+ }
+ }
+ }
+
+ snprintf(msg_to, sizeof(msg_to), "%s:%s", ast_str_to_lower(tech), args->resource);
+
+ send_message(msg_to, args->from, args->body, variables, response);
+}
diff --git a/res/ari/resource_endpoints.h b/res/ari/resource_endpoints.h
index 3af81a66f..4391b36e6 100644
--- a/res/ari/resource_endpoints.h
+++ b/res/ari/resource_endpoints.h
@@ -50,6 +50,35 @@ struct ast_ari_endpoints_list_args {
* \param[out] response HTTP response
*/
void ast_ari_endpoints_list(struct ast_variable *headers, struct ast_ari_endpoints_list_args *args, struct ast_ari_response *response);
+/*! Argument struct for ast_ari_endpoints_send_message() */
+struct ast_ari_endpoints_send_message_args {
+ /*! The endpoint resource or technology specific URI to send the message to. Valid resources are sip, pjsip, and xmpp. */
+ const char *to;
+ /*! The endpoint resource or technology specific identity to send this message from. Valid resources are sip, pjsip, and xmpp. */
+ const char *from;
+ /*! The body of the message */
+ const char *body;
+ struct ast_json *variables;
+};
+/*!
+ * \brief Body parsing function for /endpoints/sendMessage.
+ * \param body The JSON body from which to parse parameters.
+ * \param[out] args The args structure to parse into.
+ * \retval zero on success
+ * \retval non-zero on failure
+ */
+int ast_ari_endpoints_send_message_parse_body(
+ struct ast_json *body,
+ struct ast_ari_endpoints_send_message_args *args);
+
+/*!
+ * \brief Send a message to some technology URI or endpoint.
+ *
+ * \param headers HTTP headers
+ * \param args Swagger parameters
+ * \param[out] response HTTP response
+ */
+void ast_ari_endpoints_send_message(struct ast_variable *headers, struct ast_ari_endpoints_send_message_args *args, struct ast_ari_response *response);
/*! Argument struct for ast_ari_endpoints_list_by_tech() */
struct ast_ari_endpoints_list_by_tech_args {
/*! Technology of the endpoints (sip,iax2,...) */
@@ -78,5 +107,36 @@ struct ast_ari_endpoints_get_args {
* \param[out] response HTTP response
*/
void ast_ari_endpoints_get(struct ast_variable *headers, struct ast_ari_endpoints_get_args *args, struct ast_ari_response *response);
+/*! Argument struct for ast_ari_endpoints_send_message_to_endpoint() */
+struct ast_ari_endpoints_send_message_to_endpoint_args {
+ /*! Technology of the endpoint */
+ const char *tech;
+ /*! ID of the endpoint */
+ const char *resource;
+ /*! The endpoint resource or technology specific identity to send this message from. Valid resources are sip, pjsip, and xmpp. */
+ const char *from;
+ /*! The body of the message */
+ const char *body;
+ struct ast_json *variables;
+};
+/*!
+ * \brief Body parsing function for /endpoints/{tech}/{resource}/sendMessage.
+ * \param body The JSON body from which to parse parameters.
+ * \param[out] args The args structure to parse into.
+ * \retval zero on success
+ * \retval non-zero on failure
+ */
+int ast_ari_endpoints_send_message_to_endpoint_parse_body(
+ struct ast_json *body,
+ struct ast_ari_endpoints_send_message_to_endpoint_args *args);
+
+/*!
+ * \brief Send a message to some endpoint in a technology.
+ *
+ * \param headers HTTP headers
+ * \param args Swagger parameters
+ * \param[out] response HTTP response
+ */
+void ast_ari_endpoints_send_message_to_endpoint(struct ast_variable *headers, struct ast_ari_endpoints_send_message_to_endpoint_args *args, struct ast_ari_response *response);
#endif /* _ASTERISK_RESOURCE_ENDPOINTS_H */
diff --git a/res/res_ari_endpoints.c b/res/res_ari_endpoints.c
index f973c7a53..071d66bec 100644
--- a/res/res_ari_endpoints.c
+++ b/res/res_ari_endpoints.c
@@ -102,6 +102,108 @@ static void ast_ari_endpoints_list_cb(
fin: __attribute__((unused))
return;
}
+int ast_ari_endpoints_send_message_parse_body(
+ struct ast_json *body,
+ struct ast_ari_endpoints_send_message_args *args)
+{
+ struct ast_json *field;
+ /* Parse query parameters out of it */
+ field = ast_json_object_get(body, "to");
+ if (field) {
+ args->to = ast_json_string_get(field);
+ }
+ field = ast_json_object_get(body, "from");
+ if (field) {
+ args->from = ast_json_string_get(field);
+ }
+ field = ast_json_object_get(body, "body");
+ if (field) {
+ args->body = ast_json_string_get(field);
+ }
+ return 0;
+}
+
+/*!
+ * \brief Parameter parsing callback for /endpoints/sendMessage.
+ * \param get_params GET parameters in the HTTP request.
+ * \param path_vars Path variables extracted from the request.
+ * \param headers HTTP headers.
+ * \param[out] response Response to the HTTP request.
+ */
+static void ast_ari_endpoints_send_message_cb(
+ struct ast_tcptls_session_instance *ser,
+ struct ast_variable *get_params, struct ast_variable *path_vars,
+ struct ast_variable *headers, struct ast_ari_response *response)
+{
+ struct ast_ari_endpoints_send_message_args args = {};
+ struct ast_variable *i;
+ RAII_VAR(struct ast_json *, body, NULL, ast_json_unref);
+#if defined(AST_DEVMODE)
+ int is_valid;
+ int code;
+#endif /* AST_DEVMODE */
+
+ for (i = get_params; i; i = i->next) {
+ if (strcmp(i->name, "to") == 0) {
+ args.to = (i->value);
+ } else
+ if (strcmp(i->name, "from") == 0) {
+ args.from = (i->value);
+ } else
+ if (strcmp(i->name, "body") == 0) {
+ args.body = (i->value);
+ } else
+ {}
+ }
+ /* Look for a JSON request entity */
+ body = ast_http_get_json(ser, headers);
+ if (!body) {
+ switch (errno) {
+ case EFBIG:
+ ast_ari_response_error(response, 413, "Request Entity Too Large", "Request body too large");
+ goto fin;
+ case ENOMEM:
+ ast_ari_response_error(response, 500, "Internal Server Error", "Error processing request");
+ goto fin;
+ case EIO:
+ ast_ari_response_error(response, 400, "Bad Request", "Error parsing request body");
+ goto fin;
+ }
+ }
+ args.variables = ast_json_ref(body);
+ ast_ari_endpoints_send_message(headers, &args, response);
+#if defined(AST_DEVMODE)
+ code = response->response_code;
+
+ switch (code) {
+ case 0: /* Implementation is still a stub, or the code wasn't set */
+ is_valid = response->message == NULL;
+ break;
+ case 500: /* Internal Server Error */
+ case 501: /* Not Implemented */
+ case 404: /* Endpoint not found */
+ is_valid = 1;
+ break;
+ default:
+ if (200 <= code && code <= 299) {
+ is_valid = ast_ari_validate_void(
+ response->message);
+ } else {
+ ast_log(LOG_ERROR, "Invalid error response %d for /endpoints/sendMessage\n", code);
+ is_valid = 0;
+ }
+ }
+
+ if (!is_valid) {
+ ast_log(LOG_ERROR, "Response validation failed for /endpoints/sendMessage\n");
+ ast_ari_response_error(response, 500,
+ "Internal Server Error", "Response validation failed");
+ }
+#endif /* AST_DEVMODE */
+
+fin: __attribute__((unused))
+ return;
+}
/*!
* \brief Parameter parsing callback for /endpoints/{tech}.
* \param get_params GET parameters in the HTTP request.
@@ -200,6 +302,7 @@ static void ast_ari_endpoints_get_cb(
break;
case 500: /* Internal Server Error */
case 501: /* Not Implemented */
+ case 400: /* Invalid parameters for sending a message. */
case 404: /* Endpoints not found */
is_valid = 1;
break;
@@ -223,16 +326,139 @@ static void ast_ari_endpoints_get_cb(
fin: __attribute__((unused))
return;
}
+int ast_ari_endpoints_send_message_to_endpoint_parse_body(
+ struct ast_json *body,
+ struct ast_ari_endpoints_send_message_to_endpoint_args *args)
+{
+ struct ast_json *field;
+ /* Parse query parameters out of it */
+ field = ast_json_object_get(body, "from");
+ if (field) {
+ args->from = ast_json_string_get(field);
+ }
+ field = ast_json_object_get(body, "body");
+ if (field) {
+ args->body = ast_json_string_get(field);
+ }
+ return 0;
+}
+
+/*!
+ * \brief Parameter parsing callback for /endpoints/{tech}/{resource}/sendMessage.
+ * \param get_params GET parameters in the HTTP request.
+ * \param path_vars Path variables extracted from the request.
+ * \param headers HTTP headers.
+ * \param[out] response Response to the HTTP request.
+ */
+static void ast_ari_endpoints_send_message_to_endpoint_cb(
+ struct ast_tcptls_session_instance *ser,
+ struct ast_variable *get_params, struct ast_variable *path_vars,
+ struct ast_variable *headers, struct ast_ari_response *response)
+{
+ struct ast_ari_endpoints_send_message_to_endpoint_args args = {};
+ struct ast_variable *i;
+ RAII_VAR(struct ast_json *, body, NULL, ast_json_unref);
+#if defined(AST_DEVMODE)
+ int is_valid;
+ int code;
+#endif /* AST_DEVMODE */
+
+ for (i = get_params; i; i = i->next) {
+ if (strcmp(i->name, "from") == 0) {
+ args.from = (i->value);
+ } else
+ if (strcmp(i->name, "body") == 0) {
+ args.body = (i->value);
+ } else
+ {}
+ }
+ for (i = path_vars; i; i = i->next) {
+ if (strcmp(i->name, "tech") == 0) {
+ args.tech = (i->value);
+ } else
+ if (strcmp(i->name, "resource") == 0) {
+ args.resource = (i->value);
+ } else
+ {}
+ }
+ /* Look for a JSON request entity */
+ body = ast_http_get_json(ser, headers);
+ if (!body) {
+ switch (errno) {
+ case EFBIG:
+ ast_ari_response_error(response, 413, "Request Entity Too Large", "Request body too large");
+ goto fin;
+ case ENOMEM:
+ ast_ari_response_error(response, 500, "Internal Server Error", "Error processing request");
+ goto fin;
+ case EIO:
+ ast_ari_response_error(response, 400, "Bad Request", "Error parsing request body");
+ goto fin;
+ }
+ }
+ args.variables = ast_json_ref(body);
+ ast_ari_endpoints_send_message_to_endpoint(headers, &args, response);
+#if defined(AST_DEVMODE)
+ code = response->response_code;
+
+ switch (code) {
+ case 0: /* Implementation is still a stub, or the code wasn't set */
+ is_valid = response->message == NULL;
+ break;
+ case 500: /* Internal Server Error */
+ case 501: /* Not Implemented */
+ case 400: /* Invalid parameters for sending a message. */
+ case 404: /* Endpoint not found */
+ is_valid = 1;
+ break;
+ default:
+ if (200 <= code && code <= 299) {
+ is_valid = ast_ari_validate_void(
+ response->message);
+ } else {
+ ast_log(LOG_ERROR, "Invalid error response %d for /endpoints/{tech}/{resource}/sendMessage\n", code);
+ is_valid = 0;
+ }
+ }
+
+ if (!is_valid) {
+ ast_log(LOG_ERROR, "Response validation failed for /endpoints/{tech}/{resource}/sendMessage\n");
+ ast_ari_response_error(response, 500,
+ "Internal Server Error", "Response validation failed");
+ }
+#endif /* AST_DEVMODE */
+
+fin: __attribute__((unused))
+ return;
+}
/*! \brief REST handler for /api-docs/endpoints.{format} */
+static struct stasis_rest_handlers endpoints_sendMessage = {
+ .path_segment = "sendMessage",
+ .callbacks = {
+ [AST_HTTP_PUT] = ast_ari_endpoints_send_message_cb,
+ },
+ .num_children = 0,
+ .children = { }
+};
+/*! \brief REST handler for /api-docs/endpoints.{format} */
+static struct stasis_rest_handlers endpoints_tech_resource_sendMessage = {
+ .path_segment = "sendMessage",
+ .callbacks = {
+ [AST_HTTP_PUT] = ast_ari_endpoints_send_message_to_endpoint_cb,
+ },
+ .num_children = 0,
+ .children = { }
+};
+/*! \brief REST handler for /api-docs/endpoints.{format} */
static struct stasis_rest_handlers endpoints_tech_resource = {
.path_segment = "resource",
.is_wildcard = 1,
.callbacks = {
[AST_HTTP_GET] = ast_ari_endpoints_get_cb,
},
- .num_children = 0,
- .children = { }
+ .num_children = 1,
+ .children = { &endpoints_tech_resource_sendMessage, }
};
/*! \brief REST handler for /api-docs/endpoints.{format} */
static struct stasis_rest_handlers endpoints_tech = {
@@ -250,8 +476,8 @@ static struct stasis_rest_handlers endpoints = {
.callbacks = {
[AST_HTTP_GET] = ast_ari_endpoints_list_cb,
},
- .num_children = 1,
- .children = { &endpoints_tech, }
+ .num_children = 2,
+ .children = { &endpoints_sendMessage,&endpoints_tech, }
};
static int load_module(void)
diff --git a/res/res_pjsip_messaging.c b/res/res_pjsip_messaging.c
index f80261417..db9752553 100644
--- a/res/res_pjsip_messaging.c
+++ b/res/res_pjsip_messaging.c
@@ -47,40 +47,10 @@ const pjsip_method pjsip_message_method = {PJSIP_OTHER_METHOD, {"MESSAGE", 7} };
#define MAX_HDR_SIZE 512
#define MAX_BODY_SIZE 1024
-#define MAX_EXTEN_SIZE 256
#define MAX_USER_SIZE 128
/*!
* \internal
- * \brief Determine where in the dialplan a call should go
- *
- * \details This uses the username in the request URI to try to match
- * an extension in an endpoint's context in order to route the call.
- *
- * \param rdata The SIP request
- * \param context The context to use
- * \param exten The extension to use
- */
-static enum pjsip_status_code get_destination(const pjsip_rx_data *rdata, const char *context, char *exten)
-{
- pjsip_uri *ruri = rdata->msg_info.msg->line.req.uri;
- pjsip_sip_uri *sip_ruri;
-
- if (!PJSIP_URI_SCHEME_IS_SIP(ruri) && !PJSIP_URI_SCHEME_IS_SIPS(ruri)) {
- return PJSIP_SC_UNSUPPORTED_URI_SCHEME;
- }
-
- sip_ruri = pjsip_uri_get_uri(ruri);
- ast_copy_pj_str(exten, &sip_ruri->user, MAX_EXTEN_SIZE);
-
- if (ast_exists_extension(NULL, context, exten, 1, NULL)) {
- return PJSIP_SC_OK;
- }
- return PJSIP_SC_NOT_FOUND;
-}
-
-/*!
- * \internal
* \brief Checks to make sure the request has the correct content type.
*
* \details This module supports the following media types: "text/plain".
@@ -244,7 +214,6 @@ static void update_from(pjsip_tx_data *tdata, char *from)
PJSIP_PARSE_URI_AS_NAMEADDR))) {
pjsip_name_addr *parsed_name_addr = (pjsip_name_addr *)parsed;
pjsip_sip_uri *parsed_uri = pjsip_uri_get_uri(parsed_name_addr->uri);
-
if (pj_strlen(&parsed_name_addr->display)) {
pj_strdup(tdata->pool, &name_addr->display,
&parsed_name_addr->display);
@@ -458,58 +427,62 @@ static char *sip_to_pjsip(char *buf, int size, int capacity)
*/
static enum pjsip_status_code rx_data_to_ast_msg(pjsip_rx_data *rdata, struct ast_msg *msg)
{
-
-#define CHECK_RES(z_) do { if (z_) { ast_msg_destroy(msg); \
- return PJSIP_SC_INTERNAL_SERVER_ERROR; } } while (0)
-
- int size;
- char buf[MAX_BODY_SIZE];
+ struct ast_sip_endpoint *endpt = ast_pjsip_rdata_get_endpoint(rdata);
+ pjsip_uri *ruri = rdata->msg_info.msg->line.req.uri;
+ pjsip_sip_uri *sip_ruri;
pjsip_name_addr *name_addr;
+ char buf[MAX_BODY_SIZE];
const char *field;
- pjsip_status_code code;
- struct ast_sip_endpoint *endpt = ast_pjsip_rdata_get_endpoint(rdata);
const char *context = S_OR(endpt->message_context, endpt->context);
+ char exten[AST_MAX_EXTENSION];
+ int res = 0;
+ int size;
- /* make sure there is an appropriate context and extension*/
- if ((code = get_destination(rdata, context, buf)) != PJSIP_SC_OK) {
- return code;
+ if (!PJSIP_URI_SCHEME_IS_SIP(ruri) && !PJSIP_URI_SCHEME_IS_SIPS(ruri)) {
+ return PJSIP_SC_UNSUPPORTED_URI_SCHEME;
}
- CHECK_RES(ast_msg_set_context(msg, "%s", context));
- CHECK_RES(ast_msg_set_exten(msg, "%s", buf));
+ sip_ruri = pjsip_uri_get_uri(ruri);
+ ast_copy_pj_str(exten, &sip_ruri->user, AST_MAX_EXTENSION);
+
+ res |= ast_msg_set_context(msg, "%s", context);
+ res |= ast_msg_set_exten(msg, "%s", exten);
/* to header */
name_addr = (pjsip_name_addr *)rdata->msg_info.to->uri;
- if ((size = pjsip_uri_print(PJSIP_URI_IN_FROMTO_HDR, name_addr, buf, sizeof(buf)-1)) > 0) {
- buf[size] = '\0';
- /* prepend the tech */
- CHECK_RES(ast_msg_set_to(msg, "%s", sip_to_pjsip(buf, ++size, sizeof(buf)-1)));
+ size = pjsip_uri_print(PJSIP_URI_IN_FROMTO_HDR, name_addr, buf, sizeof(buf) - 1);
+ if (size <= 0) {
+ return PJSIP_SC_INTERNAL_SERVER_ERROR;
}
+ buf[size] = '\0';
+ res |= ast_msg_set_to(msg, "%s", sip_to_pjsip(buf, ++size, sizeof(buf) - 1));
/* from header */
name_addr = (pjsip_name_addr *)rdata->msg_info.from->uri;
- if ((size = pjsip_uri_print(PJSIP_URI_IN_FROMTO_HDR, name_addr, buf, sizeof(buf)-1)) > 0) {
- buf[size] = '\0';
- CHECK_RES(ast_msg_set_from(msg, "%s", buf));
+ size = pjsip_uri_print(PJSIP_URI_IN_FROMTO_HDR, name_addr, buf, sizeof(buf) - 1);
+ if (size <= 0) {
+ return PJSIP_SC_INTERNAL_SERVER_ERROR;
}
+ buf[size] = '\0';
+ res |= ast_msg_set_from(msg, "%s", buf);
- /* receive address */
- field = pj_sockaddr_print(&rdata->pkt_info.src_addr, buf, sizeof(buf)-1, 1);
- CHECK_RES(ast_msg_set_var(msg, "PJSIP_RECVADDR", field));
+ field = pj_sockaddr_print(&rdata->pkt_info.src_addr, buf, sizeof(buf) - 1, 1);
+ res |= ast_msg_set_var(msg, "PJSIP_RECVADDR", field);
- /* body */
if (print_body(rdata, buf, sizeof(buf) - 1) > 0) {
- CHECK_RES(ast_msg_set_body(msg, "%s", buf));
+ res |= ast_msg_set_body(msg, "%s", buf);
}
/* endpoint name */
+ res |= ast_msg_set_tech(msg, "%s", "PJSIP");
+ res |= ast_msg_set_endpoint(msg, "%s", ast_sorcery_object_get_id(endpt));
if (endpt->id.self.name.valid) {
- CHECK_RES(ast_msg_set_var(msg, "PJSIP_PEERNAME", endpt->id.self.name.str));
+ res |= ast_msg_set_var(msg, "PJSIP_ENDPOINT", endpt->id.self.name.str);
}
- CHECK_RES(headers_to_vars(rdata, msg));
+ res |= headers_to_vars(rdata, msg);
- return PJSIP_SC_OK;
+ return !res ? PJSIP_SC_OK : PJSIP_SC_INTERNAL_SERVER_ERROR;
}
struct msg_data {
@@ -547,17 +520,14 @@ static struct msg_data* msg_data_create(const struct ast_msg *msg, const char *t
return NULL;
}
- /* if there is another sip in the uri then we are good,
- otherwise it needs a sip: in front */
- mdata->to = to == skip_sip(to) ? ast_strdup(to - 3) :
- ast_strdup(++to);
+ /* Make sure we start with sip: */
+ mdata->to = ast_begins_with(to, "sip:") ? ast_strdup(++to) : ast_strdup(to - 3);
mdata->from = ast_strdup(from);
/* sometimes from can still contain the tag at this point, so remove it */
if ((tag = strchr(mdata->from, ';'))) {
*tag = '\0';
}
-
return mdata;
}
@@ -577,8 +547,8 @@ static int msg_send(void *data)
mdata->to, &uri), ao2_cleanup);
if (!endpoint) {
- ast_log(LOG_ERROR, "PJSIP MESSAGE - Could not find endpoint and "
- "no default outbound endpoint configured\n");
+ ast_log(LOG_ERROR, "PJSIP MESSAGE - Could not find endpoint '%s' and "
+ "no default outbound endpoint configured\n", mdata->to);
return -1;
}
@@ -598,6 +568,9 @@ static int msg_send(void *data)
vars_to_headers(mdata->msg, tdata);
+ ast_debug(1, "Sending message to '%s' (via endpoint %s) from '%s'\n",
+ mdata->to, ast_sorcery_object_get_id(endpoint), mdata->from);
+
if (ast_sip_send_request(tdata, NULL, endpoint, NULL, NULL)) {
ast_log(LOG_ERROR, "PJSIP MESSAGE - Could not send request\n");
return -1;
@@ -670,24 +643,36 @@ static pj_bool_t module_on_rx_request(pjsip_rx_data *rdata)
return PJ_FALSE;
}
+ code = check_content_type(rdata);
+ if (code != PJSIP_SC_OK) {
+ send_response(rdata, code, NULL, NULL);
+ return PJ_TRUE;
+ }
+
msg = ast_msg_alloc();
if (!msg) {
send_response(rdata, PJSIP_SC_INTERNAL_SERVER_ERROR, NULL, NULL);
return PJ_TRUE;
}
- if ((code = check_content_type(rdata)) != PJSIP_SC_OK) {
+ code = rx_data_to_ast_msg(rdata, msg);
+ if (code != PJSIP_SC_OK) {
send_response(rdata, code, NULL, NULL);
+ ast_msg_destroy(msg);
return PJ_TRUE;
}
- if ((code = rx_data_to_ast_msg(rdata, msg)) == PJSIP_SC_OK) {
- /* send it to the dialplan */
- ast_msg_queue(msg);
- code = PJSIP_SC_ACCEPTED;
+ if (!ast_msg_has_destination(msg)) {
+ ast_debug(1, "MESSAGE request received, but no handler wanted it\n");
+ send_response(rdata, PJSIP_SC_NOT_FOUND, NULL, NULL);
+ ast_msg_destroy(msg);
+ return PJ_TRUE;
}
- send_response(rdata, code, NULL, NULL);
+ /* send it to the messaging core */
+ ast_msg_queue(msg);
+ send_response(rdata, PJSIP_SC_ACCEPTED, NULL, NULL);
+
return PJ_TRUE;
}
diff --git a/res/res_stasis.c b/res/res_stasis.c
index a64feee48..7d5373153 100644
--- a/res/res_stasis.c
+++ b/res/res_stasis.c
@@ -66,6 +66,7 @@ ASTERISK_FILE_VERSION(__FILE__, "$Revision$")
#include "asterisk/strings.h"
#include "stasis/app.h"
#include "stasis/control.h"
+#include "stasis/messaging.h"
#include "stasis/stasis_bridge.h"
#include "asterisk/core_unreal.h"
#include "asterisk/musiconhold.h"
@@ -1433,6 +1434,8 @@ static int unload_module(void)
{
stasis_app_unregister_event_sources();
+ messaging_cleanup();
+
ao2_cleanup(apps_registry);
apps_registry = NULL;
@@ -1495,6 +1498,11 @@ static int load_module(void)
return AST_MODULE_LOAD_FAILURE;
}
+ if (messaging_init()) {
+ unload_module();
+ return AST_MODULE_LOAD_FAILURE;
+ }
+
bridge_stasis_init();
stasis_app_register_event_sources();
diff --git a/res/res_xmpp.c b/res/res_xmpp.c
index f5734ce04..b3c374871 100644
--- a/res/res_xmpp.c
+++ b/res/res_xmpp.c
@@ -3174,16 +3174,27 @@ static int xmpp_pak_message(struct ast_xmpp_client *client, struct ast_xmpp_clie
if (ast_test_flag(&cfg->flags, XMPP_SEND_TO_DIALPLAN)) {
struct ast_msg *msg;
+ struct ast_xmpp_buddy *buddy;
if ((msg = ast_msg_alloc())) {
int res;
ast_xmpp_client_lock(client);
+ buddy = ao2_find(client->buddies, pak->from->partial, OBJ_KEY | OBJ_NOLOCK);
+
res = ast_msg_set_to(msg, "xmpp:%s", cfg->user);
res |= ast_msg_set_from(msg, "xmpp:%s", message->from);
res |= ast_msg_set_body(msg, "%s", message->message);
res |= ast_msg_set_context(msg, "%s", cfg->context);
+ res |= ast_msg_set_tech(msg, "%s", "XMPP");
+ res |= ast_msg_set_endpoint(msg, "%s", client->name);
+
+ if (buddy) {
+ res |= ast_msg_set_var(msg, "XMPP_BUDDY", buddy->id);
+ }
+
+ ao2_cleanup(buddy);
ast_xmpp_client_unlock(client);
diff --git a/res/stasis/app.c b/res/stasis/app.c
index 41f6ccf65..7e7911b9c 100644
--- a/res/stasis/app.c
+++ b/res/stasis/app.c
@@ -28,6 +28,7 @@
ASTERISK_FILE_VERSION(__FILE__, "$Revision$")
#include "app.h"
+#include "messaging.h"
#include "asterisk/callerid.h"
#include "asterisk/stasis_app.h"
@@ -511,6 +512,44 @@ static struct ast_json *simple_endpoint_event(
"endpoint", json_endpoint);
}
+static int message_received_handler(const char *endpoint_id, struct ast_json *json_msg, void *pvt)
+{
+ RAII_VAR(struct ast_endpoint_snapshot *, snapshot, NULL, ao2_cleanup);
+ struct ast_json *json_endpoint;
+ struct stasis_app *app = pvt;
+ char *tech;
+ char *resource;
+
+ tech = ast_strdupa(endpoint_id);
+ resource = strchr(tech, '/');
+ if (resource) {
+ resource[0] = '\0';
+ resource++;
+ }
+
+ if (ast_strlen_zero(tech) || ast_strlen_zero(resource)) {
+ return -1;
+ }
+
+ snapshot = ast_endpoint_latest_snapshot(tech, resource);
+ if (!snapshot) {
+ return -1;
+ }
+
+ json_endpoint = ast_endpoint_snapshot_to_json(snapshot, stasis_app_get_sanitizer());
+ if (!json_endpoint) {
+ return -1;
+ }
+
+ app_send(app, ast_json_pack("{s: s, s: o, s: o, s: O}",
+ "type", "TextMessageReceived",
+ "timestamp", ast_json_timeval(ast_tvnow(), NULL),
+ "endpoint", json_endpoint,
+ "message", json_msg));
+
+ return 0;
+}
+
static void sub_endpoint_update_handler(void *data,
struct stasis_subscription *sub,
struct stasis_message *message)
@@ -1018,6 +1057,10 @@ static int unsubscribe(struct stasis_app *app, const char *kind, const char *id,
ao2_find(app->forwards, forwards,
OBJ_POINTER | OBJ_NOLOCK | OBJ_UNLINK |
OBJ_NODATA);
+
+ if (!strcmp(kind, "endpoint")) {
+ messaging_app_unsubscribe_endpoint(app->name, id);
+ }
}
return 0;
@@ -1148,6 +1191,9 @@ int app_subscribe_endpoint(struct stasis_app *app, struct ast_endpoint *endpoint
return -1;
}
ao2_link_flags(app->forwards, forwards, OBJ_NOLOCK);
+
+ /* Subscribe for messages */
+ messaging_app_subscribe_endpoint(app->name, endpoint, &message_received_handler, app);
}
++forwards->interested;
diff --git a/res/stasis/messaging.c b/res/stasis/messaging.c
new file mode 100644
index 000000000..47730851f
--- /dev/null
+++ b/res/stasis/messaging.c
@@ -0,0 +1,531 @@
+/*
+ * Asterisk -- An open source telephony toolkit.
+ *
+ * Copyright (C) 2014, Digium, Inc.
+ *
+ * Matt Jordan <mjordan@digium.com>
+ *
+ * See http://www.asterisk.org for more information about
+ * the Asterisk project. Please do not directly contact
+ * any of the maintainers of this project for assistance;
+ * the project provides a web site, mailing lists and IRC
+ * channels for your use.
+ *
+ * This program is free software, distributed under the terms of
+ * the GNU General Public License Version 2. See the LICENSE file
+ * at the top of the source tree.
+ */
+
+/*!
+ * \file
+ *
+ * \brief Stasis out-of-call text message support
+ *
+ * \author Matt Jordan <mjordan@digium.com>
+ */
+
+#include "asterisk.h"
+
+ASTERISK_FILE_VERSION(__FILE__, "$Revision$")
+
+#include "asterisk/message.h"
+#include "asterisk/endpoints.h"
+#include "asterisk/astobj2.h"
+#include "asterisk/vector.h"
+#include "asterisk/lock.h"
+#include "asterisk/utils.h"
+#include "asterisk/test.h"
+#include "messaging.h"
+
+/*!
+ * \brief Number of buckets for the \ref endpoint_subscriptions container
+ */
+#define ENDPOINTS_NUM_BUCKETS 127
+
+/*! \brief Storage object for an application */
+struct application_tuple {
+ /*! ao2 ref counted private object to pass to the callback */
+ void *pvt;
+ /*! The callback to call when this application has a message */
+ message_received_cb callback;
+ /*! The name (key) of the application */
+ char app_name[];
+};
+
+/*! \brief A subscription to some endpoint or technology */
+struct message_subscription {
+ /*! The applications that have subscribed to this endpoint or tech */
+ AST_VECTOR(, struct application_tuple *) applications;
+ /*! The name of this endpoint or tech */
+ char token[];
+};
+
+/*! \brief The subscriptions to endpoints */
+static struct ao2_container *endpoint_subscriptions;
+
+/*!
+ * \brief The subscriptions to technologies
+ *
+ * \note These are stored separately from standard endpoints, given how
+ * relatively few of them there are.
+ */
+static AST_VECTOR(,struct message_subscription *) tech_subscriptions;
+
+/*! \brief RWLock for \c tech_subscriptions */
+static ast_rwlock_t tech_subscriptions_lock;
+
+/*! \internal \brief Destructor for \c application_tuple */
+static void application_tuple_dtor(void *obj)
+{
+ struct application_tuple *tuple = obj;
+
+ ao2_cleanup(tuple->pvt);
+}
+
+/*! \internal \brief Constructor for \c application_tuple */
+static struct application_tuple *application_tuple_alloc(const char *app_name, message_received_cb callback, void *pvt)
+{
+ struct application_tuple *tuple;
+ size_t size = sizeof(*tuple) + strlen(app_name) + 1;
+
+ ast_assert(callback != NULL);
+
+ tuple = ao2_t_alloc(size, application_tuple_dtor, AO2_ALLOC_OPT_LOCK_NOLOCK);
+ if (!tuple) {
+ return NULL;
+ }
+
+ strcpy(tuple->app_name, app_name); /* Safe */
+ tuple->pvt = ao2_bump(pvt);
+ tuple->callback = callback;
+
+ return tuple;
+}
+
+/*! \internal \brief Destructor for \ref message_subscription */
+static void message_subscription_dtor(void *obj)
+{
+ struct message_subscription *sub = obj;
+ int i;
+
+ for (i = 0; i < AST_VECTOR_SIZE(&sub->applications); i++) {
+ struct application_tuple *tuple = AST_VECTOR_GET(&sub->applications, i);
+
+ ao2_cleanup(tuple);
+ }
+ AST_VECTOR_FREE(&sub->applications);
+}
+
+/*! \internal \brief Constructor for \ref message_subscription */
+static struct message_subscription *message_subscription_alloc(const char *token)
+{
+ struct message_subscription *sub;
+ size_t size = sizeof(*sub) + strlen(token) + 1;
+
+ sub = ao2_t_alloc(size, message_subscription_dtor, AO2_ALLOC_OPT_LOCK_RWLOCK);
+ if (!sub) {
+ return NULL;
+ }
+ strcpy(sub->token, token); /* Safe */
+
+ return sub;
+}
+
+/*! AO2 hash function for \ref message_subscription */
+static int message_subscription_hash_cb(const void *obj, const int flags)
+{
+ const struct message_subscription *sub;
+ const char *key;
+
+ switch (flags & OBJ_SEARCH_MASK) {
+ case OBJ_SEARCH_KEY:
+ key = obj;
+ break;
+ case OBJ_SEARCH_OBJECT:
+ sub = obj;
+ key = sub->token;
+ break;
+ default:
+ /* Hash can only work on something with a full key. */
+ ast_assert(0);
+ return 0;
+ }
+ return ast_str_hash(key);
+}
+
+/*! AO2 comparison function for \ref message_subscription */
+static int message_subscription_compare_cb(void *obj, void *arg, int flags)
+{
+ const struct message_subscription *object_left = obj;
+ const struct message_subscription *object_right = arg;
+ const char *right_key = arg;
+ int cmp;
+
+ switch (flags & OBJ_SEARCH_MASK) {
+ case OBJ_SEARCH_OBJECT:
+ right_key = object_right->token;
+ /* Fall through */
+ case OBJ_SEARCH_KEY:
+ cmp = strcmp(object_left->token, right_key);
+ break;
+ case OBJ_SEARCH_PARTIAL_KEY:
+ /*
+ * We could also use a partial key struct containing a length
+ * so strlen() does not get called for every comparison instead.
+ */
+ cmp = strncmp(object_left->token, right_key, strlen(right_key));
+ break;
+ default:
+ /*
+ * What arg points to is specific to this traversal callback
+ * and has no special meaning to astobj2.
+ */
+ cmp = 0;
+ break;
+ }
+ if (cmp) {
+ return 0;
+ }
+ /*
+ * At this point the traversal callback is identical to a sorted
+ * container.
+ */
+ return CMP_MATCH;
+}
+
+/*! \internal \brief Convert a \c ast_msg To/From URI to a Stasis endpoint name */
+static void msg_to_endpoint(const struct ast_msg *msg, char *buf, size_t len)
+{
+ const char *endpoint = ast_msg_get_endpoint(msg);
+
+ snprintf(buf, len, "%s%s%s", ast_msg_get_tech(msg),
+ ast_strlen_zero(endpoint) ? "" : "/",
+ S_OR(endpoint, ""));
+}
+
+/*! \internal
+ * \brief Callback from the \c message API that determines if we can handle
+ * this message
+ */
+static int has_destination_cb(const struct ast_msg *msg)
+{
+ struct message_subscription *sub;
+ int i;
+ char buf[256];
+
+ msg_to_endpoint(msg, buf, sizeof(buf));
+
+ ast_rwlock_rdlock(&tech_subscriptions_lock);
+ for (i = 0; i < AST_VECTOR_SIZE(&tech_subscriptions); i++) {
+ sub = AST_VECTOR_GET(&tech_subscriptions, i);
+
+ if (sub && (!strncasecmp(sub->token, buf, strlen(sub->token))
+ || !strncasecmp(sub->token, buf, strlen(sub->token)))) {
+ ast_rwlock_unlock(&tech_subscriptions_lock);
+ sub = NULL; /* No ref bump! */
+ goto match;
+ }
+
+ }
+ ast_rwlock_unlock(&tech_subscriptions_lock);
+
+ sub = ao2_find(endpoint_subscriptions, buf, OBJ_SEARCH_KEY);
+ if (sub) {
+ goto match;
+ }
+
+ ast_debug(1, "No subscription found for %s\n", buf);
+ return 0;
+
+match:
+ ao2_cleanup(sub);
+ return 1;
+}
+
+static struct ast_json *msg_to_json(struct ast_msg *msg)
+{
+ struct ast_json *json_obj;
+ struct ast_json *json_vars;
+ struct ast_msg_var_iterator *it_vars;
+ const char *name;
+ const char *value;
+
+ it_vars = ast_msg_var_iterator_init(msg);
+ if (!it_vars) {
+ return NULL;
+ }
+
+ json_vars = ast_json_array_create();
+ if (!json_vars) {
+ return NULL;
+ }
+
+ while (ast_msg_var_iterator_next(msg, it_vars, &name, &value)) {
+ struct ast_json *json_tuple;
+
+ json_tuple = ast_json_pack("{s: s}", name, value);
+ if (!json_tuple) {
+ ast_json_free(json_vars);
+ return NULL;
+ }
+
+ ast_json_array_append(json_vars, json_tuple);
+ ast_msg_var_unref_current(it_vars);
+ }
+ ast_msg_var_iterator_destroy(it_vars);
+
+ json_obj = ast_json_pack("{s: s, s: s, s: s, s: o}",
+ "from", ast_msg_get_from(msg),
+ "to", ast_msg_get_to(msg),
+ "body", ast_msg_get_body(msg),
+ "variables", json_vars);
+
+ return json_obj;
+}
+
+static int handle_msg_cb(struct ast_msg *msg)
+{
+ struct message_subscription *sub;
+ int i;
+ char buf[256];
+ const char *endpoint_name;
+ struct ast_json *json_msg;
+
+ msg_to_endpoint(msg, buf, sizeof(buf));
+
+ ast_rwlock_rdlock(&tech_subscriptions_lock);
+ for (i = 0; i < AST_VECTOR_SIZE(&tech_subscriptions); i++) {
+ sub = AST_VECTOR_GET(&tech_subscriptions, i);
+
+ if (!sub) {
+ continue;
+ }
+
+ if (!strncasecmp(sub->token, buf, strlen(sub->token))) {
+ ast_rwlock_unlock(&tech_subscriptions_lock);
+ ao2_bump(sub);
+ endpoint_name = buf;
+ goto match;
+ }
+ }
+ ast_rwlock_unlock(&tech_subscriptions_lock);
+
+ sub = ao2_find(endpoint_subscriptions, buf, OBJ_SEARCH_KEY);
+ if (sub) {
+ endpoint_name = buf;
+ goto match;
+ }
+
+ return -1;
+
+match:
+ ast_debug(3, "Dispatching message for %s\n", endpoint_name);
+
+ json_msg = msg_to_json(msg);
+ if (!json_msg) {
+ ao2_ref(sub, -1);
+ return -1;
+ }
+
+ for (i = 0; i < AST_VECTOR_SIZE(&sub->applications); i++) {
+ struct application_tuple *tuple = AST_VECTOR_GET(&sub->applications, i);
+
+ tuple->callback(endpoint_name, json_msg, tuple->pvt);
+ }
+
+ ast_json_unref(json_msg);
+ ao2_ref(sub, -1);
+ return 0;
+}
+
+struct ast_msg_handler ari_msg_handler = {
+ .name = "ari",
+ .handle_msg = handle_msg_cb,
+ .has_destination = has_destination_cb,
+};
+
+static int messaging_subscription_cmp(struct message_subscription *sub, const char *key)
+{
+ return !strcmp(sub->token, key) ? 1 : 0;
+}
+
+static int application_tuple_cmp(struct application_tuple *item, const char *key)
+{
+ return !strcmp(item->app_name, key) ? 1 : 0;
+}
+
+static int is_app_subscribed(struct message_subscription *sub, const char *app_name)
+{
+ int i;
+
+ for (i = 0; i < AST_VECTOR_SIZE(&sub->applications); i++) {
+ struct application_tuple *tuple;
+
+ tuple = AST_VECTOR_GET(&sub->applications, i);
+ if (tuple && !strcmp(tuple->app_name, app_name)) {
+ return 1;
+ }
+ }
+
+ return 0;
+}
+
+static struct message_subscription *get_subscription(struct ast_endpoint *endpoint)
+{
+ struct message_subscription *sub = NULL;
+
+ if (!ast_strlen_zero(ast_endpoint_get_resource(endpoint))) {
+ sub = ao2_find(endpoint_subscriptions, endpoint, OBJ_SEARCH_KEY);
+ } else {
+ int i;
+
+ ast_rwlock_rdlock(&tech_subscriptions_lock);
+ for (i = 0; i < AST_VECTOR_SIZE(&tech_subscriptions); i++) {
+ sub = AST_VECTOR_GET(&tech_subscriptions, i);
+
+ if (sub && !strcmp(sub->token, ast_endpoint_get_tech(endpoint))) {
+ ao2_bump(sub);
+ break;
+ }
+ }
+ ast_rwlock_unlock(&tech_subscriptions_lock);
+ }
+
+ return sub;
+}
+
+void messaging_app_unsubscribe_endpoint(const char *app_name, const char *endpoint_id)
+{
+ RAII_VAR(struct message_subscription *, sub, NULL, ao2_cleanup);
+ RAII_VAR(struct ast_endpoint *, endpoint, NULL, ao2_cleanup);
+
+ endpoint = ast_endpoint_find_by_id(endpoint_id);
+ if (!endpoint) {
+ return;
+ }
+
+ sub = get_subscription(endpoint);
+ if (!sub) {
+ return;
+ }
+
+ ao2_lock(sub);
+ if (!is_app_subscribed(sub, app_name)) {
+ ao2_unlock(sub);
+ return;
+ }
+
+ AST_VECTOR_REMOVE_CMP_UNORDERED(&sub->applications, app_name, application_tuple_cmp, ao2_cleanup);
+ if (AST_VECTOR_SIZE(&sub->applications) == 0) {
+ if (!ast_strlen_zero(ast_endpoint_get_resource(endpoint))) {
+ ao2_unlink(endpoint_subscriptions, sub);
+ } else {
+ ast_rwlock_wrlock(&tech_subscriptions_lock);
+ AST_VECTOR_REMOVE_CMP_UNORDERED(&tech_subscriptions, ast_endpoint_get_id(endpoint),
+ messaging_subscription_cmp, AST_VECTOR_ELEM_CLEANUP_NOOP);
+ ast_rwlock_unlock(&tech_subscriptions_lock);
+ }
+ }
+ ao2_unlock(sub);
+ ao2_ref(sub, -1);
+
+ ast_debug(3, "App '%s' unsubscribed to messages from endpoint '%s'\n", app_name, ast_endpoint_get_id(endpoint));
+ ast_test_suite_event_notify("StasisMessagingSubscription", "SubState: Unsubscribed\r\nAppName: %s\r\nToken: %s\r\n",
+ app_name, ast_endpoint_get_id(endpoint));
+}
+
+static struct message_subscription *get_or_create_subscription(struct ast_endpoint *endpoint)
+{
+ struct message_subscription *sub = get_subscription(endpoint);
+
+ if (sub) {
+ return sub;
+ }
+
+ sub = message_subscription_alloc(ast_endpoint_get_id(endpoint));
+ if (!sub) {
+ return NULL;
+ }
+
+ if (!ast_strlen_zero(ast_endpoint_get_resource(endpoint))) {
+ ao2_link(endpoint_subscriptions, sub);
+ } else {
+ ast_rwlock_wrlock(&tech_subscriptions_lock);
+ AST_VECTOR_APPEND(&tech_subscriptions, ao2_bump(sub));
+ ast_rwlock_unlock(&tech_subscriptions_lock);
+ }
+
+ return sub;
+}
+
+int messaging_app_subscribe_endpoint(const char *app_name, struct ast_endpoint *endpoint, message_received_cb callback, void *pvt)
+{
+ RAII_VAR(struct message_subscription *, sub, NULL, ao2_cleanup);
+ struct application_tuple *tuple;
+
+ sub = get_or_create_subscription(endpoint);
+ if (!sub) {
+ return -1;
+ }
+
+ ao2_lock(sub);
+ if (is_app_subscribed(sub, app_name)) {
+ ao2_unlock(sub);
+ return 0;
+ }
+
+ tuple = application_tuple_alloc(app_name, callback, pvt);
+ if (!tuple) {
+ ao2_unlock(sub);
+ return -1;
+ }
+ AST_VECTOR_APPEND(&sub->applications, tuple);
+ ao2_unlock(sub);
+
+ ast_debug(3, "App '%s' subscribed to messages from endpoint '%s'\n", app_name, ast_endpoint_get_id(endpoint));
+ ast_test_suite_event_notify("StasisMessagingSubscription", "SubState: Subscribed\r\nAppName: %s\r\nToken: %s\r\n",
+ app_name, ast_endpoint_get_id(endpoint));
+
+ return 0;
+}
+
+
+int messaging_cleanup(void)
+{
+ ast_msg_handler_unregister(&ari_msg_handler);
+ ao2_ref(endpoint_subscriptions, -1);
+ AST_VECTOR_FREE(&tech_subscriptions);
+ ast_rwlock_destroy(&tech_subscriptions_lock);\
+
+ return 0;
+}
+
+int messaging_init(void)
+{
+ endpoint_subscriptions = ao2_t_container_alloc_hash(AO2_ALLOC_OPT_LOCK_RWLOCK, 0,
+ ENDPOINTS_NUM_BUCKETS, message_subscription_hash_cb, NULL,
+ message_subscription_compare_cb, "Endpoint messaging subscription container creation");
+ if (!endpoint_subscriptions) {
+ return -1;
+ }
+
+ if (AST_VECTOR_INIT(&tech_subscriptions, 4)) {
+ ao2_ref(endpoint_subscriptions, -1);
+ return -1;
+ }
+
+ if (ast_rwlock_init(&tech_subscriptions_lock)) {
+ ao2_ref(endpoint_subscriptions, -1);
+ AST_VECTOR_FREE(&tech_subscriptions);
+ return -1;
+ }
+
+ if (ast_msg_handler_register(&ari_msg_handler)) {
+ ao2_ref(endpoint_subscriptions, -1);
+ AST_VECTOR_FREE(&tech_subscriptions);
+ ast_rwlock_destroy(&tech_subscriptions_lock);
+ return -1;
+ }
+
+ return 0;
+}
diff --git a/res/stasis/messaging.h b/res/stasis/messaging.h
new file mode 100644
index 000000000..c69d5d570
--- /dev/null
+++ b/res/stasis/messaging.h
@@ -0,0 +1,83 @@
+/*
+ * Asterisk -- An open source telephony toolkit.
+ *
+ * Copyright (C) 2014, Digium, Inc.
+ *
+ * Matt Jordan <mjordan@digium.com>
+ *
+ * See http://www.asterisk.org for more information about
+ * the Asterisk project. Please do not directly contact
+ * any of the maintainers of this project for assistance;
+ * the project provides a web site, mailing lists and IRC
+ * channels for your use.
+ *
+ * This program is free software, distributed under the terms of
+ * the GNU General Public License Version 2. See the LICENSE file
+ * at the top of the source tree.
+ */
+
+#ifndef _ASTERISK_RES_STASIS_MESSAGING_H
+#define _ASTERISK_RES_STASIS_MESSAGING_H
+
+/*!
+ * \file
+ *
+ * \brief Stasis out-of-call text message support
+ *
+ * \author Matt Jordan <mjordan@digium.com>
+ * \since 12.4.0
+ */
+
+/*!
+ * \brief Callback handler for when a message is received from the core
+ *
+ * \param endpoint_id The ID of the endpoint that we received the message from
+ * \param json_msg JSON representation of the text message
+ * \param pvt ao2 ref counted pvt passed during registration
+ *
+ * \retval 0 the message was handled
+ * \retval non-zero the message was not handled
+ */
+typedef int (* message_received_cb)(const char *endpoint_id, struct ast_json *json_msg, void *pvt);
+
+/*!
+ * \brief Subscribe for messages from a particular endpoint
+ *
+ * \param app_name Name of the stasis application to unsubscribe from messaging
+ * \param endpoint_id The ID of the endpoint we no longer care about
+ *
+ * \retval 0 success
+ * \retval -1 error
+ */
+void messaging_app_unsubscribe_endpoint(const char *app_name, const char *endpoint_id);
+
+/*!
+ * \brief Subscribe an application to an endpoint for messages
+ *
+ * \param app_name The name of the \ref stasis application to subscribe to \c endpoint
+ * \param endpoint The endpoint object to subscribe to
+ * \param message_received_cb The callback to call when a message is received
+ * \param pvt An ao2 ref counted object that will be passed to the callback.
+ *
+ * \retval 0 subscription was successful
+ * \retval -1 subscription failed
+ */
+int messaging_app_subscribe_endpoint(const char *app_name, struct ast_endpoint *endpoint, message_received_cb callback, void *pvt);
+
+/*!
+ * \brief Tidy up the messaging layer
+ *
+ * \retval 0 success
+ * \retval -1 failure
+ */
+int messaging_cleanup(void);
+
+/*!
+ * \brief Initialize the messaging layer
+ *
+ * \retval 0 success
+ * \retval -1 failure
+ */
+int messaging_init(void);
+
+#endif /* #define _ASTERISK_RES_STASIS_MESSAGING_H */ \ No newline at end of file