diff options
Diffstat (limited to 'res')
-rw-r--r-- | res/ari/ari_model_validators.c | 228 | ||||
-rw-r--r-- | res/ari/ari_model_validators.h | 68 | ||||
-rw-r--r-- | res/ari/resource_channels.c | 34 | ||||
-rw-r--r-- | res/ari/resource_endpoints.c | 105 | ||||
-rw-r--r-- | res/ari/resource_endpoints.h | 60 | ||||
-rw-r--r-- | res/res_ari_endpoints.c | 234 | ||||
-rw-r--r-- | res/res_pjsip_messaging.c | 129 | ||||
-rw-r--r-- | res/res_stasis.c | 8 | ||||
-rw-r--r-- | res/res_xmpp.c | 11 | ||||
-rw-r--r-- | res/stasis/app.c | 46 | ||||
-rw-r--r-- | res/stasis/messaging.c | 531 | ||||
-rw-r--r-- | res/stasis/messaging.h | 83 |
12 files changed, 1429 insertions, 108 deletions
diff --git a/res/ari/ari_model_validators.c b/res/ari/ari_model_validators.c index 0302db327..10fd3bd83 100644 --- a/res/ari/ari_model_validators.c +++ b/res/ari/ari_model_validators.c @@ -588,6 +588,140 @@ ari_validator ast_ari_validate_endpoint_fn(void) return ast_ari_validate_endpoint; } +int ast_ari_validate_text_message(struct ast_json *json) +{ + int res = 1; + struct ast_json_iter *iter; + int has_body = 0; + int has_from = 0; + int has_to = 0; + + for (iter = ast_json_object_iter(json); iter; iter = ast_json_object_iter_next(json, iter)) { + if (strcmp("body", ast_json_object_iter_key(iter)) == 0) { + int prop_is_valid; + has_body = 1; + prop_is_valid = ast_ari_validate_string( + ast_json_object_iter_value(iter)); + if (!prop_is_valid) { + ast_log(LOG_ERROR, "ARI TextMessage field body failed validation\n"); + res = 0; + } + } else + if (strcmp("from", ast_json_object_iter_key(iter)) == 0) { + int prop_is_valid; + has_from = 1; + prop_is_valid = ast_ari_validate_string( + ast_json_object_iter_value(iter)); + if (!prop_is_valid) { + ast_log(LOG_ERROR, "ARI TextMessage field from failed validation\n"); + res = 0; + } + } else + if (strcmp("to", ast_json_object_iter_key(iter)) == 0) { + int prop_is_valid; + has_to = 1; + prop_is_valid = ast_ari_validate_string( + ast_json_object_iter_value(iter)); + if (!prop_is_valid) { + ast_log(LOG_ERROR, "ARI TextMessage field to failed validation\n"); + res = 0; + } + } else + if (strcmp("variables", ast_json_object_iter_key(iter)) == 0) { + int prop_is_valid; + prop_is_valid = ast_ari_validate_list( + ast_json_object_iter_value(iter), + ast_ari_validate_text_message_variable); + if (!prop_is_valid) { + ast_log(LOG_ERROR, "ARI TextMessage field variables failed validation\n"); + res = 0; + } + } else + { + ast_log(LOG_ERROR, + "ARI TextMessage has undocumented field %s\n", + ast_json_object_iter_key(iter)); + res = 0; + } + } + + if (!has_body) { + ast_log(LOG_ERROR, "ARI TextMessage missing required field body\n"); + res = 0; + } + + if (!has_from) { + ast_log(LOG_ERROR, "ARI TextMessage missing required field from\n"); + res = 0; + } + + if (!has_to) { + ast_log(LOG_ERROR, "ARI TextMessage missing required field to\n"); + res = 0; + } + + return res; +} + +ari_validator ast_ari_validate_text_message_fn(void) +{ + return ast_ari_validate_text_message; +} + +int ast_ari_validate_text_message_variable(struct ast_json *json) +{ + int res = 1; + struct ast_json_iter *iter; + int has_key = 0; + int has_value = 0; + + for (iter = ast_json_object_iter(json); iter; iter = ast_json_object_iter_next(json, iter)) { + if (strcmp("key", ast_json_object_iter_key(iter)) == 0) { + int prop_is_valid; + has_key = 1; + prop_is_valid = ast_ari_validate_string( + ast_json_object_iter_value(iter)); + if (!prop_is_valid) { + ast_log(LOG_ERROR, "ARI TextMessageVariable field key failed validation\n"); + res = 0; + } + } else + if (strcmp("value", ast_json_object_iter_key(iter)) == 0) { + int prop_is_valid; + has_value = 1; + prop_is_valid = ast_ari_validate_string( + ast_json_object_iter_value(iter)); + if (!prop_is_valid) { + ast_log(LOG_ERROR, "ARI TextMessageVariable field value failed validation\n"); + res = 0; + } + } else + { + ast_log(LOG_ERROR, + "ARI TextMessageVariable has undocumented field %s\n", + ast_json_object_iter_key(iter)); + res = 0; + } + } + + if (!has_key) { + ast_log(LOG_ERROR, "ARI TextMessageVariable missing required field key\n"); + res = 0; + } + + if (!has_value) { + ast_log(LOG_ERROR, "ARI TextMessageVariable missing required field value\n"); + res = 0; + } + + return res; +} + +ari_validator ast_ari_validate_text_message_variable_fn(void) +{ + return ast_ari_validate_text_message_variable; +} + int ast_ari_validate_caller_id(struct ast_json *json) { int res = 1; @@ -3890,6 +4024,9 @@ int ast_ari_validate_event(struct ast_json *json) if (strcmp("StasisStart", discriminator) == 0) { return ast_ari_validate_stasis_start(json); } else + if (strcmp("TextMessageReceived", discriminator) == 0) { + return ast_ari_validate_text_message_received(json); + } else { ast_log(LOG_ERROR, "ARI Event has undocumented subtype %s\n", discriminator); @@ -4061,6 +4198,9 @@ int ast_ari_validate_message(struct ast_json *json) if (strcmp("StasisStart", discriminator) == 0) { return ast_ari_validate_stasis_start(json); } else + if (strcmp("TextMessageReceived", discriminator) == 0) { + return ast_ari_validate_text_message_received(json); + } else { ast_log(LOG_ERROR, "ARI Message has undocumented subtype %s\n", discriminator); @@ -4724,6 +4864,94 @@ ari_validator ast_ari_validate_stasis_start_fn(void) return ast_ari_validate_stasis_start; } +int ast_ari_validate_text_message_received(struct ast_json *json) +{ + int res = 1; + struct ast_json_iter *iter; + int has_type = 0; + int has_application = 0; + int has_message = 0; + + for (iter = ast_json_object_iter(json); iter; iter = ast_json_object_iter_next(json, iter)) { + if (strcmp("type", ast_json_object_iter_key(iter)) == 0) { + int prop_is_valid; + has_type = 1; + prop_is_valid = ast_ari_validate_string( + ast_json_object_iter_value(iter)); + if (!prop_is_valid) { + ast_log(LOG_ERROR, "ARI TextMessageReceived field type failed validation\n"); + res = 0; + } + } else + if (strcmp("application", ast_json_object_iter_key(iter)) == 0) { + int prop_is_valid; + has_application = 1; + prop_is_valid = ast_ari_validate_string( + ast_json_object_iter_value(iter)); + if (!prop_is_valid) { + ast_log(LOG_ERROR, "ARI TextMessageReceived field application failed validation\n"); + res = 0; + } + } else + if (strcmp("timestamp", ast_json_object_iter_key(iter)) == 0) { + int prop_is_valid; + prop_is_valid = ast_ari_validate_date( + ast_json_object_iter_value(iter)); + if (!prop_is_valid) { + ast_log(LOG_ERROR, "ARI TextMessageReceived field timestamp failed validation\n"); + res = 0; + } + } else + if (strcmp("endpoint", ast_json_object_iter_key(iter)) == 0) { + int prop_is_valid; + prop_is_valid = ast_ari_validate_endpoint( + ast_json_object_iter_value(iter)); + if (!prop_is_valid) { + ast_log(LOG_ERROR, "ARI TextMessageReceived field endpoint failed validation\n"); + res = 0; + } + } else + if (strcmp("message", ast_json_object_iter_key(iter)) == 0) { + int prop_is_valid; + has_message = 1; + prop_is_valid = ast_ari_validate_text_message( + ast_json_object_iter_value(iter)); + if (!prop_is_valid) { + ast_log(LOG_ERROR, "ARI TextMessageReceived field message failed validation\n"); + res = 0; + } + } else + { + ast_log(LOG_ERROR, + "ARI TextMessageReceived has undocumented field %s\n", + ast_json_object_iter_key(iter)); + res = 0; + } + } + + if (!has_type) { + ast_log(LOG_ERROR, "ARI TextMessageReceived missing required field type\n"); + res = 0; + } + + if (!has_application) { + ast_log(LOG_ERROR, "ARI TextMessageReceived missing required field application\n"); + res = 0; + } + + if (!has_message) { + ast_log(LOG_ERROR, "ARI TextMessageReceived missing required field message\n"); + res = 0; + } + + return res; +} + +ari_validator ast_ari_validate_text_message_received_fn(void) +{ + return ast_ari_validate_text_message_received; +} + int ast_ari_validate_application(struct ast_json *json) { int res = 1; diff --git a/res/ari/ari_model_validators.h b/res/ari/ari_model_validators.h index 0186168b2..beace67b2 100644 --- a/res/ari/ari_model_validators.h +++ b/res/ari/ari_model_validators.h @@ -299,6 +299,42 @@ int ast_ari_validate_endpoint(struct ast_json *json); ari_validator ast_ari_validate_endpoint_fn(void); /*! + * \brief Validator for TextMessage. + * + * A text message. + * + * \param json JSON object to validate. + * \returns True (non-zero) if valid. + * \returns False (zero) if invalid. + */ +int ast_ari_validate_text_message(struct ast_json *json); + +/*! + * \brief Function pointer to ast_ari_validate_text_message(). + * + * See \ref ast_ari_model_validators.h for more details. + */ +ari_validator ast_ari_validate_text_message_fn(void); + +/*! + * \brief Validator for TextMessageVariable. + * + * A key/value pair variable in a text message. + * + * \param json JSON object to validate. + * \returns True (non-zero) if valid. + * \returns False (zero) if invalid. + */ +int ast_ari_validate_text_message_variable(struct ast_json *json); + +/*! + * \brief Function pointer to ast_ari_validate_text_message_variable(). + * + * See \ref ast_ari_model_validators.h for more details. + */ +ari_validator ast_ari_validate_text_message_variable_fn(void); + +/*! * \brief Validator for CallerID. * * Caller identification @@ -1097,6 +1133,24 @@ int ast_ari_validate_stasis_start(struct ast_json *json); ari_validator ast_ari_validate_stasis_start_fn(void); /*! + * \brief Validator for TextMessageReceived. + * + * A text message was received from an endpoint. + * + * \param json JSON object to validate. + * \returns True (non-zero) if valid. + * \returns False (zero) if invalid. + */ +int ast_ari_validate_text_message_received(struct ast_json *json); + +/*! + * \brief Function pointer to ast_ari_validate_text_message_received(). + * + * See \ref ast_ari_model_validators.h for more details. + */ +ari_validator ast_ari_validate_text_message_received_fn(void); + +/*! * \brief Validator for Application. * * Details of a Stasis application @@ -1152,6 +1206,14 @@ ari_validator ast_ari_validate_application_fn(void); * - resource: string (required) * - state: string * - technology: string (required) + * TextMessage + * - body: string (required) + * - from: string (required) + * - to: string (required) + * - variables: List[TextMessageVariable] + * TextMessageVariable + * - key: string (required) + * - value: string (required) * CallerID * - name: string (required) * - number: string (required) @@ -1405,6 +1467,12 @@ ari_validator ast_ari_validate_application_fn(void); * - timestamp: Date * - args: List[string] (required) * - channel: Channel (required) + * TextMessageReceived + * - type: string (required) + * - application: string (required) + * - timestamp: Date + * - endpoint: Endpoint + * - message: TextMessage (required) * Application * - bridge_ids: List[string] (required) * - channel_ids: List[string] (required) diff --git a/res/ari/resource_channels.c b/res/ari/resource_channels.c index cef1e7130..d0a1be32e 100644 --- a/res/ari/resource_channels.c +++ b/res/ari/resource_channels.c @@ -723,36 +723,6 @@ void ast_ari_channels_list(struct ast_variable *headers, ast_ari_response_ok(response, ast_json_ref(json)); } -static int json_to_ast_variables(struct ast_json *json_variables, struct ast_variable **variables) -{ - struct ast_variable *current = NULL; - struct ast_json_iter *it_json_var; - - for (it_json_var = ast_json_object_iter(json_variables); it_json_var; - it_json_var = ast_json_object_iter_next(json_variables, it_json_var)) { - struct ast_variable *new_var; - - new_var = ast_variable_new(ast_json_object_iter_key(it_json_var), - ast_json_string_get(ast_json_object_iter_value(it_json_var)), - ""); - if (!new_var) { - ast_variables_destroy(*variables); - *variables = NULL; - return 1; - } - - if (!current) { - *variables = new_var; - current = *variables; - } else { - current->next = new_var; - current = new_var; - } - } - - return 0; -} - static void ari_channels_handle_originate_with_id(const char *args_endpoint, const char *args_extension, const char *args_context, @@ -894,7 +864,7 @@ void ast_ari_channels_originate_with_id(struct ast_variable *headers, ast_ari_channels_originate_with_id_parse_body(args->variables, args); json_variables = ast_json_object_get(args->variables, "variables"); if (json_variables) { - if (json_to_ast_variables(json_variables, &variables)) { + if (ast_json_to_ast_variables(json_variables, &variables)) { ast_log(AST_LOG_ERROR, "Unable to convert 'variables' in JSON body to channel variables\n"); ast_ari_response_alloc_failed(response); return; @@ -930,7 +900,7 @@ void ast_ari_channels_originate(struct ast_variable *headers, ast_ari_channels_originate_parse_body(args->variables, args); json_variables = ast_json_object_get(args->variables, "variables"); if (json_variables) { - if (json_to_ast_variables(json_variables, &variables)) { + if (ast_json_to_ast_variables(json_variables, &variables)) { ast_log(AST_LOG_ERROR, "Unable to convert 'variables' in JSON body to channel variables\n"); ast_ari_response_alloc_failed(response); return; diff --git a/res/ari/resource_endpoints.c b/res/ari/resource_endpoints.c index ff2b150dd..4f91e781d 100644 --- a/res/ari/resource_endpoints.c +++ b/res/ari/resource_endpoints.c @@ -174,3 +174,108 @@ void ast_ari_endpoints_get(struct ast_variable *headers, ast_ari_response_ok(response, json); } + +static void send_message(const char *to, const char *from, const char *body, struct ast_variable *variables, struct ast_ari_response *response) +{ + struct ast_variable *current; + struct ast_msg *msg; + int res = 0; + + if (ast_strlen_zero(to)) { + ast_ari_response_error(response, 400, "Bad Request", + "To must be specified"); + return; + } + + msg = ast_msg_alloc(); + if (!msg) { + ast_ari_response_alloc_failed(response); + return; + } + + res |= ast_msg_set_from(msg, "%s", from); + res |= ast_msg_set_to(msg, "%s", to); + + if (!ast_strlen_zero(body)) { + res |= ast_msg_set_body(msg, "%s", body); + } + + for (current = variables; current; current = current->next) { + res |= ast_msg_set_var_outbound(msg, current->name, current->value); + } + + if (res) { + ast_ari_response_alloc_failed(response); + ast_msg_destroy(msg); + return; + } + + if (ast_msg_send(msg, to, from)) { + ast_ari_response_error(response, 404, "Not Found", + "Endpoint not found"); + } + + response->message = ast_json_null(); + response->response_code = 202; + response->response_text = "Accepted"; +} + +void ast_ari_endpoints_send_message(struct ast_variable *headers, + struct ast_ari_endpoints_send_message_args *args, + struct ast_ari_response *response) +{ + RAII_VAR(struct ast_variable *, variables, NULL, ast_variables_destroy); + + if (args->variables) { + struct ast_json *json_variables; + + ast_ari_endpoints_send_message_parse_body(args->variables, args); + json_variables = ast_json_object_get(args->variables, "variables"); + if (json_variables) { + if (ast_json_to_ast_variables(json_variables, &variables)) { + ast_log(AST_LOG_ERROR, "Unable to convert 'variables' in JSON body to Asterisk variables\n"); + ast_ari_response_alloc_failed(response); + return; + } + } + } + + send_message(args->to, args->from, args->body, variables, response); +} + +void ast_ari_endpoints_send_message_to_endpoint(struct ast_variable *headers, + struct ast_ari_endpoints_send_message_to_endpoint_args *args, + struct ast_ari_response *response) +{ + RAII_VAR(struct ast_variable *, variables, NULL, ast_variables_destroy); + RAII_VAR(struct ast_endpoint_snapshot *, snapshot, NULL, ao2_cleanup); + char msg_to[128]; + char *tech = ast_strdupa(args->tech); + + /* Really, we just want to know if this thing exists */ + snapshot = ast_endpoint_latest_snapshot(args->tech, args->resource); + if (!snapshot) { + ast_ari_response_error(response, 404, "Not Found", + "Endpoint not found"); + return; + } + + if (args->variables) { + struct ast_json *json_variables; + + ast_ari_endpoints_send_message_to_endpoint_parse_body(args->variables, args); + json_variables = ast_json_object_get(args->variables, "variables"); + + if (json_variables) { + if (ast_json_to_ast_variables(json_variables, &variables)) { + ast_log(AST_LOG_ERROR, "Unable to convert 'variables' in JSON body to Asterisk variables\n"); + ast_ari_response_alloc_failed(response); + return; + } + } + } + + snprintf(msg_to, sizeof(msg_to), "%s:%s", ast_str_to_lower(tech), args->resource); + + send_message(msg_to, args->from, args->body, variables, response); +} diff --git a/res/ari/resource_endpoints.h b/res/ari/resource_endpoints.h index 3af81a66f..4391b36e6 100644 --- a/res/ari/resource_endpoints.h +++ b/res/ari/resource_endpoints.h @@ -50,6 +50,35 @@ struct ast_ari_endpoints_list_args { * \param[out] response HTTP response */ void ast_ari_endpoints_list(struct ast_variable *headers, struct ast_ari_endpoints_list_args *args, struct ast_ari_response *response); +/*! Argument struct for ast_ari_endpoints_send_message() */ +struct ast_ari_endpoints_send_message_args { + /*! The endpoint resource or technology specific URI to send the message to. Valid resources are sip, pjsip, and xmpp. */ + const char *to; + /*! The endpoint resource or technology specific identity to send this message from. Valid resources are sip, pjsip, and xmpp. */ + const char *from; + /*! The body of the message */ + const char *body; + struct ast_json *variables; +}; +/*! + * \brief Body parsing function for /endpoints/sendMessage. + * \param body The JSON body from which to parse parameters. + * \param[out] args The args structure to parse into. + * \retval zero on success + * \retval non-zero on failure + */ +int ast_ari_endpoints_send_message_parse_body( + struct ast_json *body, + struct ast_ari_endpoints_send_message_args *args); + +/*! + * \brief Send a message to some technology URI or endpoint. + * + * \param headers HTTP headers + * \param args Swagger parameters + * \param[out] response HTTP response + */ +void ast_ari_endpoints_send_message(struct ast_variable *headers, struct ast_ari_endpoints_send_message_args *args, struct ast_ari_response *response); /*! Argument struct for ast_ari_endpoints_list_by_tech() */ struct ast_ari_endpoints_list_by_tech_args { /*! Technology of the endpoints (sip,iax2,...) */ @@ -78,5 +107,36 @@ struct ast_ari_endpoints_get_args { * \param[out] response HTTP response */ void ast_ari_endpoints_get(struct ast_variable *headers, struct ast_ari_endpoints_get_args *args, struct ast_ari_response *response); +/*! Argument struct for ast_ari_endpoints_send_message_to_endpoint() */ +struct ast_ari_endpoints_send_message_to_endpoint_args { + /*! Technology of the endpoint */ + const char *tech; + /*! ID of the endpoint */ + const char *resource; + /*! The endpoint resource or technology specific identity to send this message from. Valid resources are sip, pjsip, and xmpp. */ + const char *from; + /*! The body of the message */ + const char *body; + struct ast_json *variables; +}; +/*! + * \brief Body parsing function for /endpoints/{tech}/{resource}/sendMessage. + * \param body The JSON body from which to parse parameters. + * \param[out] args The args structure to parse into. + * \retval zero on success + * \retval non-zero on failure + */ +int ast_ari_endpoints_send_message_to_endpoint_parse_body( + struct ast_json *body, + struct ast_ari_endpoints_send_message_to_endpoint_args *args); + +/*! + * \brief Send a message to some endpoint in a technology. + * + * \param headers HTTP headers + * \param args Swagger parameters + * \param[out] response HTTP response + */ +void ast_ari_endpoints_send_message_to_endpoint(struct ast_variable *headers, struct ast_ari_endpoints_send_message_to_endpoint_args *args, struct ast_ari_response *response); #endif /* _ASTERISK_RESOURCE_ENDPOINTS_H */ diff --git a/res/res_ari_endpoints.c b/res/res_ari_endpoints.c index f973c7a53..071d66bec 100644 --- a/res/res_ari_endpoints.c +++ b/res/res_ari_endpoints.c @@ -102,6 +102,108 @@ static void ast_ari_endpoints_list_cb( fin: __attribute__((unused)) return; } +int ast_ari_endpoints_send_message_parse_body( + struct ast_json *body, + struct ast_ari_endpoints_send_message_args *args) +{ + struct ast_json *field; + /* Parse query parameters out of it */ + field = ast_json_object_get(body, "to"); + if (field) { + args->to = ast_json_string_get(field); + } + field = ast_json_object_get(body, "from"); + if (field) { + args->from = ast_json_string_get(field); + } + field = ast_json_object_get(body, "body"); + if (field) { + args->body = ast_json_string_get(field); + } + return 0; +} + +/*! + * \brief Parameter parsing callback for /endpoints/sendMessage. + * \param get_params GET parameters in the HTTP request. + * \param path_vars Path variables extracted from the request. + * \param headers HTTP headers. + * \param[out] response Response to the HTTP request. + */ +static void ast_ari_endpoints_send_message_cb( + struct ast_tcptls_session_instance *ser, + struct ast_variable *get_params, struct ast_variable *path_vars, + struct ast_variable *headers, struct ast_ari_response *response) +{ + struct ast_ari_endpoints_send_message_args args = {}; + struct ast_variable *i; + RAII_VAR(struct ast_json *, body, NULL, ast_json_unref); +#if defined(AST_DEVMODE) + int is_valid; + int code; +#endif /* AST_DEVMODE */ + + for (i = get_params; i; i = i->next) { + if (strcmp(i->name, "to") == 0) { + args.to = (i->value); + } else + if (strcmp(i->name, "from") == 0) { + args.from = (i->value); + } else + if (strcmp(i->name, "body") == 0) { + args.body = (i->value); + } else + {} + } + /* Look for a JSON request entity */ + body = ast_http_get_json(ser, headers); + if (!body) { + switch (errno) { + case EFBIG: + ast_ari_response_error(response, 413, "Request Entity Too Large", "Request body too large"); + goto fin; + case ENOMEM: + ast_ari_response_error(response, 500, "Internal Server Error", "Error processing request"); + goto fin; + case EIO: + ast_ari_response_error(response, 400, "Bad Request", "Error parsing request body"); + goto fin; + } + } + args.variables = ast_json_ref(body); + ast_ari_endpoints_send_message(headers, &args, response); +#if defined(AST_DEVMODE) + code = response->response_code; + + switch (code) { + case 0: /* Implementation is still a stub, or the code wasn't set */ + is_valid = response->message == NULL; + break; + case 500: /* Internal Server Error */ + case 501: /* Not Implemented */ + case 404: /* Endpoint not found */ + is_valid = 1; + break; + default: + if (200 <= code && code <= 299) { + is_valid = ast_ari_validate_void( + response->message); + } else { + ast_log(LOG_ERROR, "Invalid error response %d for /endpoints/sendMessage\n", code); + is_valid = 0; + } + } + + if (!is_valid) { + ast_log(LOG_ERROR, "Response validation failed for /endpoints/sendMessage\n"); + ast_ari_response_error(response, 500, + "Internal Server Error", "Response validation failed"); + } +#endif /* AST_DEVMODE */ + +fin: __attribute__((unused)) + return; +} /*! * \brief Parameter parsing callback for /endpoints/{tech}. * \param get_params GET parameters in the HTTP request. @@ -200,6 +302,7 @@ static void ast_ari_endpoints_get_cb( break; case 500: /* Internal Server Error */ case 501: /* Not Implemented */ + case 400: /* Invalid parameters for sending a message. */ case 404: /* Endpoints not found */ is_valid = 1; break; @@ -223,16 +326,139 @@ static void ast_ari_endpoints_get_cb( fin: __attribute__((unused)) return; } +int ast_ari_endpoints_send_message_to_endpoint_parse_body( + struct ast_json *body, + struct ast_ari_endpoints_send_message_to_endpoint_args *args) +{ + struct ast_json *field; + /* Parse query parameters out of it */ + field = ast_json_object_get(body, "from"); + if (field) { + args->from = ast_json_string_get(field); + } + field = ast_json_object_get(body, "body"); + if (field) { + args->body = ast_json_string_get(field); + } + return 0; +} + +/*! + * \brief Parameter parsing callback for /endpoints/{tech}/{resource}/sendMessage. + * \param get_params GET parameters in the HTTP request. + * \param path_vars Path variables extracted from the request. + * \param headers HTTP headers. + * \param[out] response Response to the HTTP request. + */ +static void ast_ari_endpoints_send_message_to_endpoint_cb( + struct ast_tcptls_session_instance *ser, + struct ast_variable *get_params, struct ast_variable *path_vars, + struct ast_variable *headers, struct ast_ari_response *response) +{ + struct ast_ari_endpoints_send_message_to_endpoint_args args = {}; + struct ast_variable *i; + RAII_VAR(struct ast_json *, body, NULL, ast_json_unref); +#if defined(AST_DEVMODE) + int is_valid; + int code; +#endif /* AST_DEVMODE */ + + for (i = get_params; i; i = i->next) { + if (strcmp(i->name, "from") == 0) { + args.from = (i->value); + } else + if (strcmp(i->name, "body") == 0) { + args.body = (i->value); + } else + {} + } + for (i = path_vars; i; i = i->next) { + if (strcmp(i->name, "tech") == 0) { + args.tech = (i->value); + } else + if (strcmp(i->name, "resource") == 0) { + args.resource = (i->value); + } else + {} + } + /* Look for a JSON request entity */ + body = ast_http_get_json(ser, headers); + if (!body) { + switch (errno) { + case EFBIG: + ast_ari_response_error(response, 413, "Request Entity Too Large", "Request body too large"); + goto fin; + case ENOMEM: + ast_ari_response_error(response, 500, "Internal Server Error", "Error processing request"); + goto fin; + case EIO: + ast_ari_response_error(response, 400, "Bad Request", "Error parsing request body"); + goto fin; + } + } + args.variables = ast_json_ref(body); + ast_ari_endpoints_send_message_to_endpoint(headers, &args, response); +#if defined(AST_DEVMODE) + code = response->response_code; + + switch (code) { + case 0: /* Implementation is still a stub, or the code wasn't set */ + is_valid = response->message == NULL; + break; + case 500: /* Internal Server Error */ + case 501: /* Not Implemented */ + case 400: /* Invalid parameters for sending a message. */ + case 404: /* Endpoint not found */ + is_valid = 1; + break; + default: + if (200 <= code && code <= 299) { + is_valid = ast_ari_validate_void( + response->message); + } else { + ast_log(LOG_ERROR, "Invalid error response %d for /endpoints/{tech}/{resource}/sendMessage\n", code); + is_valid = 0; + } + } + + if (!is_valid) { + ast_log(LOG_ERROR, "Response validation failed for /endpoints/{tech}/{resource}/sendMessage\n"); + ast_ari_response_error(response, 500, + "Internal Server Error", "Response validation failed"); + } +#endif /* AST_DEVMODE */ + +fin: __attribute__((unused)) + return; +} /*! \brief REST handler for /api-docs/endpoints.{format} */ +static struct stasis_rest_handlers endpoints_sendMessage = { + .path_segment = "sendMessage", + .callbacks = { + [AST_HTTP_PUT] = ast_ari_endpoints_send_message_cb, + }, + .num_children = 0, + .children = { } +}; +/*! \brief REST handler for /api-docs/endpoints.{format} */ +static struct stasis_rest_handlers endpoints_tech_resource_sendMessage = { + .path_segment = "sendMessage", + .callbacks = { + [AST_HTTP_PUT] = ast_ari_endpoints_send_message_to_endpoint_cb, + }, + .num_children = 0, + .children = { } +}; +/*! \brief REST handler for /api-docs/endpoints.{format} */ static struct stasis_rest_handlers endpoints_tech_resource = { .path_segment = "resource", .is_wildcard = 1, .callbacks = { [AST_HTTP_GET] = ast_ari_endpoints_get_cb, }, - .num_children = 0, - .children = { } + .num_children = 1, + .children = { &endpoints_tech_resource_sendMessage, } }; /*! \brief REST handler for /api-docs/endpoints.{format} */ static struct stasis_rest_handlers endpoints_tech = { @@ -250,8 +476,8 @@ static struct stasis_rest_handlers endpoints = { .callbacks = { [AST_HTTP_GET] = ast_ari_endpoints_list_cb, }, - .num_children = 1, - .children = { &endpoints_tech, } + .num_children = 2, + .children = { &endpoints_sendMessage,&endpoints_tech, } }; static int load_module(void) diff --git a/res/res_pjsip_messaging.c b/res/res_pjsip_messaging.c index f80261417..db9752553 100644 --- a/res/res_pjsip_messaging.c +++ b/res/res_pjsip_messaging.c @@ -47,40 +47,10 @@ const pjsip_method pjsip_message_method = {PJSIP_OTHER_METHOD, {"MESSAGE", 7} }; #define MAX_HDR_SIZE 512 #define MAX_BODY_SIZE 1024 -#define MAX_EXTEN_SIZE 256 #define MAX_USER_SIZE 128 /*! * \internal - * \brief Determine where in the dialplan a call should go - * - * \details This uses the username in the request URI to try to match - * an extension in an endpoint's context in order to route the call. - * - * \param rdata The SIP request - * \param context The context to use - * \param exten The extension to use - */ -static enum pjsip_status_code get_destination(const pjsip_rx_data *rdata, const char *context, char *exten) -{ - pjsip_uri *ruri = rdata->msg_info.msg->line.req.uri; - pjsip_sip_uri *sip_ruri; - - if (!PJSIP_URI_SCHEME_IS_SIP(ruri) && !PJSIP_URI_SCHEME_IS_SIPS(ruri)) { - return PJSIP_SC_UNSUPPORTED_URI_SCHEME; - } - - sip_ruri = pjsip_uri_get_uri(ruri); - ast_copy_pj_str(exten, &sip_ruri->user, MAX_EXTEN_SIZE); - - if (ast_exists_extension(NULL, context, exten, 1, NULL)) { - return PJSIP_SC_OK; - } - return PJSIP_SC_NOT_FOUND; -} - -/*! - * \internal * \brief Checks to make sure the request has the correct content type. * * \details This module supports the following media types: "text/plain". @@ -244,7 +214,6 @@ static void update_from(pjsip_tx_data *tdata, char *from) PJSIP_PARSE_URI_AS_NAMEADDR))) { pjsip_name_addr *parsed_name_addr = (pjsip_name_addr *)parsed; pjsip_sip_uri *parsed_uri = pjsip_uri_get_uri(parsed_name_addr->uri); - if (pj_strlen(&parsed_name_addr->display)) { pj_strdup(tdata->pool, &name_addr->display, &parsed_name_addr->display); @@ -458,58 +427,62 @@ static char *sip_to_pjsip(char *buf, int size, int capacity) */ static enum pjsip_status_code rx_data_to_ast_msg(pjsip_rx_data *rdata, struct ast_msg *msg) { - -#define CHECK_RES(z_) do { if (z_) { ast_msg_destroy(msg); \ - return PJSIP_SC_INTERNAL_SERVER_ERROR; } } while (0) - - int size; - char buf[MAX_BODY_SIZE]; + struct ast_sip_endpoint *endpt = ast_pjsip_rdata_get_endpoint(rdata); + pjsip_uri *ruri = rdata->msg_info.msg->line.req.uri; + pjsip_sip_uri *sip_ruri; pjsip_name_addr *name_addr; + char buf[MAX_BODY_SIZE]; const char *field; - pjsip_status_code code; - struct ast_sip_endpoint *endpt = ast_pjsip_rdata_get_endpoint(rdata); const char *context = S_OR(endpt->message_context, endpt->context); + char exten[AST_MAX_EXTENSION]; + int res = 0; + int size; - /* make sure there is an appropriate context and extension*/ - if ((code = get_destination(rdata, context, buf)) != PJSIP_SC_OK) { - return code; + if (!PJSIP_URI_SCHEME_IS_SIP(ruri) && !PJSIP_URI_SCHEME_IS_SIPS(ruri)) { + return PJSIP_SC_UNSUPPORTED_URI_SCHEME; } - CHECK_RES(ast_msg_set_context(msg, "%s", context)); - CHECK_RES(ast_msg_set_exten(msg, "%s", buf)); + sip_ruri = pjsip_uri_get_uri(ruri); + ast_copy_pj_str(exten, &sip_ruri->user, AST_MAX_EXTENSION); + + res |= ast_msg_set_context(msg, "%s", context); + res |= ast_msg_set_exten(msg, "%s", exten); /* to header */ name_addr = (pjsip_name_addr *)rdata->msg_info.to->uri; - if ((size = pjsip_uri_print(PJSIP_URI_IN_FROMTO_HDR, name_addr, buf, sizeof(buf)-1)) > 0) { - buf[size] = '\0'; - /* prepend the tech */ - CHECK_RES(ast_msg_set_to(msg, "%s", sip_to_pjsip(buf, ++size, sizeof(buf)-1))); + size = pjsip_uri_print(PJSIP_URI_IN_FROMTO_HDR, name_addr, buf, sizeof(buf) - 1); + if (size <= 0) { + return PJSIP_SC_INTERNAL_SERVER_ERROR; } + buf[size] = '\0'; + res |= ast_msg_set_to(msg, "%s", sip_to_pjsip(buf, ++size, sizeof(buf) - 1)); /* from header */ name_addr = (pjsip_name_addr *)rdata->msg_info.from->uri; - if ((size = pjsip_uri_print(PJSIP_URI_IN_FROMTO_HDR, name_addr, buf, sizeof(buf)-1)) > 0) { - buf[size] = '\0'; - CHECK_RES(ast_msg_set_from(msg, "%s", buf)); + size = pjsip_uri_print(PJSIP_URI_IN_FROMTO_HDR, name_addr, buf, sizeof(buf) - 1); + if (size <= 0) { + return PJSIP_SC_INTERNAL_SERVER_ERROR; } + buf[size] = '\0'; + res |= ast_msg_set_from(msg, "%s", buf); - /* receive address */ - field = pj_sockaddr_print(&rdata->pkt_info.src_addr, buf, sizeof(buf)-1, 1); - CHECK_RES(ast_msg_set_var(msg, "PJSIP_RECVADDR", field)); + field = pj_sockaddr_print(&rdata->pkt_info.src_addr, buf, sizeof(buf) - 1, 1); + res |= ast_msg_set_var(msg, "PJSIP_RECVADDR", field); - /* body */ if (print_body(rdata, buf, sizeof(buf) - 1) > 0) { - CHECK_RES(ast_msg_set_body(msg, "%s", buf)); + res |= ast_msg_set_body(msg, "%s", buf); } /* endpoint name */ + res |= ast_msg_set_tech(msg, "%s", "PJSIP"); + res |= ast_msg_set_endpoint(msg, "%s", ast_sorcery_object_get_id(endpt)); if (endpt->id.self.name.valid) { - CHECK_RES(ast_msg_set_var(msg, "PJSIP_PEERNAME", endpt->id.self.name.str)); + res |= ast_msg_set_var(msg, "PJSIP_ENDPOINT", endpt->id.self.name.str); } - CHECK_RES(headers_to_vars(rdata, msg)); + res |= headers_to_vars(rdata, msg); - return PJSIP_SC_OK; + return !res ? PJSIP_SC_OK : PJSIP_SC_INTERNAL_SERVER_ERROR; } struct msg_data { @@ -547,17 +520,14 @@ static struct msg_data* msg_data_create(const struct ast_msg *msg, const char *t return NULL; } - /* if there is another sip in the uri then we are good, - otherwise it needs a sip: in front */ - mdata->to = to == skip_sip(to) ? ast_strdup(to - 3) : - ast_strdup(++to); + /* Make sure we start with sip: */ + mdata->to = ast_begins_with(to, "sip:") ? ast_strdup(++to) : ast_strdup(to - 3); mdata->from = ast_strdup(from); /* sometimes from can still contain the tag at this point, so remove it */ if ((tag = strchr(mdata->from, ';'))) { *tag = '\0'; } - return mdata; } @@ -577,8 +547,8 @@ static int msg_send(void *data) mdata->to, &uri), ao2_cleanup); if (!endpoint) { - ast_log(LOG_ERROR, "PJSIP MESSAGE - Could not find endpoint and " - "no default outbound endpoint configured\n"); + ast_log(LOG_ERROR, "PJSIP MESSAGE - Could not find endpoint '%s' and " + "no default outbound endpoint configured\n", mdata->to); return -1; } @@ -598,6 +568,9 @@ static int msg_send(void *data) vars_to_headers(mdata->msg, tdata); + ast_debug(1, "Sending message to '%s' (via endpoint %s) from '%s'\n", + mdata->to, ast_sorcery_object_get_id(endpoint), mdata->from); + if (ast_sip_send_request(tdata, NULL, endpoint, NULL, NULL)) { ast_log(LOG_ERROR, "PJSIP MESSAGE - Could not send request\n"); return -1; @@ -670,24 +643,36 @@ static pj_bool_t module_on_rx_request(pjsip_rx_data *rdata) return PJ_FALSE; } + code = check_content_type(rdata); + if (code != PJSIP_SC_OK) { + send_response(rdata, code, NULL, NULL); + return PJ_TRUE; + } + msg = ast_msg_alloc(); if (!msg) { send_response(rdata, PJSIP_SC_INTERNAL_SERVER_ERROR, NULL, NULL); return PJ_TRUE; } - if ((code = check_content_type(rdata)) != PJSIP_SC_OK) { + code = rx_data_to_ast_msg(rdata, msg); + if (code != PJSIP_SC_OK) { send_response(rdata, code, NULL, NULL); + ast_msg_destroy(msg); return PJ_TRUE; } - if ((code = rx_data_to_ast_msg(rdata, msg)) == PJSIP_SC_OK) { - /* send it to the dialplan */ - ast_msg_queue(msg); - code = PJSIP_SC_ACCEPTED; + if (!ast_msg_has_destination(msg)) { + ast_debug(1, "MESSAGE request received, but no handler wanted it\n"); + send_response(rdata, PJSIP_SC_NOT_FOUND, NULL, NULL); + ast_msg_destroy(msg); + return PJ_TRUE; } - send_response(rdata, code, NULL, NULL); + /* send it to the messaging core */ + ast_msg_queue(msg); + send_response(rdata, PJSIP_SC_ACCEPTED, NULL, NULL); + return PJ_TRUE; } diff --git a/res/res_stasis.c b/res/res_stasis.c index a64feee48..7d5373153 100644 --- a/res/res_stasis.c +++ b/res/res_stasis.c @@ -66,6 +66,7 @@ ASTERISK_FILE_VERSION(__FILE__, "$Revision$") #include "asterisk/strings.h" #include "stasis/app.h" #include "stasis/control.h" +#include "stasis/messaging.h" #include "stasis/stasis_bridge.h" #include "asterisk/core_unreal.h" #include "asterisk/musiconhold.h" @@ -1433,6 +1434,8 @@ static int unload_module(void) { stasis_app_unregister_event_sources(); + messaging_cleanup(); + ao2_cleanup(apps_registry); apps_registry = NULL; @@ -1495,6 +1498,11 @@ static int load_module(void) return AST_MODULE_LOAD_FAILURE; } + if (messaging_init()) { + unload_module(); + return AST_MODULE_LOAD_FAILURE; + } + bridge_stasis_init(); stasis_app_register_event_sources(); diff --git a/res/res_xmpp.c b/res/res_xmpp.c index f5734ce04..b3c374871 100644 --- a/res/res_xmpp.c +++ b/res/res_xmpp.c @@ -3174,16 +3174,27 @@ static int xmpp_pak_message(struct ast_xmpp_client *client, struct ast_xmpp_clie if (ast_test_flag(&cfg->flags, XMPP_SEND_TO_DIALPLAN)) { struct ast_msg *msg; + struct ast_xmpp_buddy *buddy; if ((msg = ast_msg_alloc())) { int res; ast_xmpp_client_lock(client); + buddy = ao2_find(client->buddies, pak->from->partial, OBJ_KEY | OBJ_NOLOCK); + res = ast_msg_set_to(msg, "xmpp:%s", cfg->user); res |= ast_msg_set_from(msg, "xmpp:%s", message->from); res |= ast_msg_set_body(msg, "%s", message->message); res |= ast_msg_set_context(msg, "%s", cfg->context); + res |= ast_msg_set_tech(msg, "%s", "XMPP"); + res |= ast_msg_set_endpoint(msg, "%s", client->name); + + if (buddy) { + res |= ast_msg_set_var(msg, "XMPP_BUDDY", buddy->id); + } + + ao2_cleanup(buddy); ast_xmpp_client_unlock(client); diff --git a/res/stasis/app.c b/res/stasis/app.c index 41f6ccf65..7e7911b9c 100644 --- a/res/stasis/app.c +++ b/res/stasis/app.c @@ -28,6 +28,7 @@ ASTERISK_FILE_VERSION(__FILE__, "$Revision$") #include "app.h" +#include "messaging.h" #include "asterisk/callerid.h" #include "asterisk/stasis_app.h" @@ -511,6 +512,44 @@ static struct ast_json *simple_endpoint_event( "endpoint", json_endpoint); } +static int message_received_handler(const char *endpoint_id, struct ast_json *json_msg, void *pvt) +{ + RAII_VAR(struct ast_endpoint_snapshot *, snapshot, NULL, ao2_cleanup); + struct ast_json *json_endpoint; + struct stasis_app *app = pvt; + char *tech; + char *resource; + + tech = ast_strdupa(endpoint_id); + resource = strchr(tech, '/'); + if (resource) { + resource[0] = '\0'; + resource++; + } + + if (ast_strlen_zero(tech) || ast_strlen_zero(resource)) { + return -1; + } + + snapshot = ast_endpoint_latest_snapshot(tech, resource); + if (!snapshot) { + return -1; + } + + json_endpoint = ast_endpoint_snapshot_to_json(snapshot, stasis_app_get_sanitizer()); + if (!json_endpoint) { + return -1; + } + + app_send(app, ast_json_pack("{s: s, s: o, s: o, s: O}", + "type", "TextMessageReceived", + "timestamp", ast_json_timeval(ast_tvnow(), NULL), + "endpoint", json_endpoint, + "message", json_msg)); + + return 0; +} + static void sub_endpoint_update_handler(void *data, struct stasis_subscription *sub, struct stasis_message *message) @@ -1018,6 +1057,10 @@ static int unsubscribe(struct stasis_app *app, const char *kind, const char *id, ao2_find(app->forwards, forwards, OBJ_POINTER | OBJ_NOLOCK | OBJ_UNLINK | OBJ_NODATA); + + if (!strcmp(kind, "endpoint")) { + messaging_app_unsubscribe_endpoint(app->name, id); + } } return 0; @@ -1148,6 +1191,9 @@ int app_subscribe_endpoint(struct stasis_app *app, struct ast_endpoint *endpoint return -1; } ao2_link_flags(app->forwards, forwards, OBJ_NOLOCK); + + /* Subscribe for messages */ + messaging_app_subscribe_endpoint(app->name, endpoint, &message_received_handler, app); } ++forwards->interested; diff --git a/res/stasis/messaging.c b/res/stasis/messaging.c new file mode 100644 index 000000000..47730851f --- /dev/null +++ b/res/stasis/messaging.c @@ -0,0 +1,531 @@ +/* + * Asterisk -- An open source telephony toolkit. + * + * Copyright (C) 2014, Digium, Inc. + * + * Matt Jordan <mjordan@digium.com> + * + * See http://www.asterisk.org for more information about + * the Asterisk project. Please do not directly contact + * any of the maintainers of this project for assistance; + * the project provides a web site, mailing lists and IRC + * channels for your use. + * + * This program is free software, distributed under the terms of + * the GNU General Public License Version 2. See the LICENSE file + * at the top of the source tree. + */ + +/*! + * \file + * + * \brief Stasis out-of-call text message support + * + * \author Matt Jordan <mjordan@digium.com> + */ + +#include "asterisk.h" + +ASTERISK_FILE_VERSION(__FILE__, "$Revision$") + +#include "asterisk/message.h" +#include "asterisk/endpoints.h" +#include "asterisk/astobj2.h" +#include "asterisk/vector.h" +#include "asterisk/lock.h" +#include "asterisk/utils.h" +#include "asterisk/test.h" +#include "messaging.h" + +/*! + * \brief Number of buckets for the \ref endpoint_subscriptions container + */ +#define ENDPOINTS_NUM_BUCKETS 127 + +/*! \brief Storage object for an application */ +struct application_tuple { + /*! ao2 ref counted private object to pass to the callback */ + void *pvt; + /*! The callback to call when this application has a message */ + message_received_cb callback; + /*! The name (key) of the application */ + char app_name[]; +}; + +/*! \brief A subscription to some endpoint or technology */ +struct message_subscription { + /*! The applications that have subscribed to this endpoint or tech */ + AST_VECTOR(, struct application_tuple *) applications; + /*! The name of this endpoint or tech */ + char token[]; +}; + +/*! \brief The subscriptions to endpoints */ +static struct ao2_container *endpoint_subscriptions; + +/*! + * \brief The subscriptions to technologies + * + * \note These are stored separately from standard endpoints, given how + * relatively few of them there are. + */ +static AST_VECTOR(,struct message_subscription *) tech_subscriptions; + +/*! \brief RWLock for \c tech_subscriptions */ +static ast_rwlock_t tech_subscriptions_lock; + +/*! \internal \brief Destructor for \c application_tuple */ +static void application_tuple_dtor(void *obj) +{ + struct application_tuple *tuple = obj; + + ao2_cleanup(tuple->pvt); +} + +/*! \internal \brief Constructor for \c application_tuple */ +static struct application_tuple *application_tuple_alloc(const char *app_name, message_received_cb callback, void *pvt) +{ + struct application_tuple *tuple; + size_t size = sizeof(*tuple) + strlen(app_name) + 1; + + ast_assert(callback != NULL); + + tuple = ao2_t_alloc(size, application_tuple_dtor, AO2_ALLOC_OPT_LOCK_NOLOCK); + if (!tuple) { + return NULL; + } + + strcpy(tuple->app_name, app_name); /* Safe */ + tuple->pvt = ao2_bump(pvt); + tuple->callback = callback; + + return tuple; +} + +/*! \internal \brief Destructor for \ref message_subscription */ +static void message_subscription_dtor(void *obj) +{ + struct message_subscription *sub = obj; + int i; + + for (i = 0; i < AST_VECTOR_SIZE(&sub->applications); i++) { + struct application_tuple *tuple = AST_VECTOR_GET(&sub->applications, i); + + ao2_cleanup(tuple); + } + AST_VECTOR_FREE(&sub->applications); +} + +/*! \internal \brief Constructor for \ref message_subscription */ +static struct message_subscription *message_subscription_alloc(const char *token) +{ + struct message_subscription *sub; + size_t size = sizeof(*sub) + strlen(token) + 1; + + sub = ao2_t_alloc(size, message_subscription_dtor, AO2_ALLOC_OPT_LOCK_RWLOCK); + if (!sub) { + return NULL; + } + strcpy(sub->token, token); /* Safe */ + + return sub; +} + +/*! AO2 hash function for \ref message_subscription */ +static int message_subscription_hash_cb(const void *obj, const int flags) +{ + const struct message_subscription *sub; + const char *key; + + switch (flags & OBJ_SEARCH_MASK) { + case OBJ_SEARCH_KEY: + key = obj; + break; + case OBJ_SEARCH_OBJECT: + sub = obj; + key = sub->token; + break; + default: + /* Hash can only work on something with a full key. */ + ast_assert(0); + return 0; + } + return ast_str_hash(key); +} + +/*! AO2 comparison function for \ref message_subscription */ +static int message_subscription_compare_cb(void *obj, void *arg, int flags) +{ + const struct message_subscription *object_left = obj; + const struct message_subscription *object_right = arg; + const char *right_key = arg; + int cmp; + + switch (flags & OBJ_SEARCH_MASK) { + case OBJ_SEARCH_OBJECT: + right_key = object_right->token; + /* Fall through */ + case OBJ_SEARCH_KEY: + cmp = strcmp(object_left->token, right_key); + break; + case OBJ_SEARCH_PARTIAL_KEY: + /* + * We could also use a partial key struct containing a length + * so strlen() does not get called for every comparison instead. + */ + cmp = strncmp(object_left->token, right_key, strlen(right_key)); + break; + default: + /* + * What arg points to is specific to this traversal callback + * and has no special meaning to astobj2. + */ + cmp = 0; + break; + } + if (cmp) { + return 0; + } + /* + * At this point the traversal callback is identical to a sorted + * container. + */ + return CMP_MATCH; +} + +/*! \internal \brief Convert a \c ast_msg To/From URI to a Stasis endpoint name */ +static void msg_to_endpoint(const struct ast_msg *msg, char *buf, size_t len) +{ + const char *endpoint = ast_msg_get_endpoint(msg); + + snprintf(buf, len, "%s%s%s", ast_msg_get_tech(msg), + ast_strlen_zero(endpoint) ? "" : "/", + S_OR(endpoint, "")); +} + +/*! \internal + * \brief Callback from the \c message API that determines if we can handle + * this message + */ +static int has_destination_cb(const struct ast_msg *msg) +{ + struct message_subscription *sub; + int i; + char buf[256]; + + msg_to_endpoint(msg, buf, sizeof(buf)); + + ast_rwlock_rdlock(&tech_subscriptions_lock); + for (i = 0; i < AST_VECTOR_SIZE(&tech_subscriptions); i++) { + sub = AST_VECTOR_GET(&tech_subscriptions, i); + + if (sub && (!strncasecmp(sub->token, buf, strlen(sub->token)) + || !strncasecmp(sub->token, buf, strlen(sub->token)))) { + ast_rwlock_unlock(&tech_subscriptions_lock); + sub = NULL; /* No ref bump! */ + goto match; + } + + } + ast_rwlock_unlock(&tech_subscriptions_lock); + + sub = ao2_find(endpoint_subscriptions, buf, OBJ_SEARCH_KEY); + if (sub) { + goto match; + } + + ast_debug(1, "No subscription found for %s\n", buf); + return 0; + +match: + ao2_cleanup(sub); + return 1; +} + +static struct ast_json *msg_to_json(struct ast_msg *msg) +{ + struct ast_json *json_obj; + struct ast_json *json_vars; + struct ast_msg_var_iterator *it_vars; + const char *name; + const char *value; + + it_vars = ast_msg_var_iterator_init(msg); + if (!it_vars) { + return NULL; + } + + json_vars = ast_json_array_create(); + if (!json_vars) { + return NULL; + } + + while (ast_msg_var_iterator_next(msg, it_vars, &name, &value)) { + struct ast_json *json_tuple; + + json_tuple = ast_json_pack("{s: s}", name, value); + if (!json_tuple) { + ast_json_free(json_vars); + return NULL; + } + + ast_json_array_append(json_vars, json_tuple); + ast_msg_var_unref_current(it_vars); + } + ast_msg_var_iterator_destroy(it_vars); + + json_obj = ast_json_pack("{s: s, s: s, s: s, s: o}", + "from", ast_msg_get_from(msg), + "to", ast_msg_get_to(msg), + "body", ast_msg_get_body(msg), + "variables", json_vars); + + return json_obj; +} + +static int handle_msg_cb(struct ast_msg *msg) +{ + struct message_subscription *sub; + int i; + char buf[256]; + const char *endpoint_name; + struct ast_json *json_msg; + + msg_to_endpoint(msg, buf, sizeof(buf)); + + ast_rwlock_rdlock(&tech_subscriptions_lock); + for (i = 0; i < AST_VECTOR_SIZE(&tech_subscriptions); i++) { + sub = AST_VECTOR_GET(&tech_subscriptions, i); + + if (!sub) { + continue; + } + + if (!strncasecmp(sub->token, buf, strlen(sub->token))) { + ast_rwlock_unlock(&tech_subscriptions_lock); + ao2_bump(sub); + endpoint_name = buf; + goto match; + } + } + ast_rwlock_unlock(&tech_subscriptions_lock); + + sub = ao2_find(endpoint_subscriptions, buf, OBJ_SEARCH_KEY); + if (sub) { + endpoint_name = buf; + goto match; + } + + return -1; + +match: + ast_debug(3, "Dispatching message for %s\n", endpoint_name); + + json_msg = msg_to_json(msg); + if (!json_msg) { + ao2_ref(sub, -1); + return -1; + } + + for (i = 0; i < AST_VECTOR_SIZE(&sub->applications); i++) { + struct application_tuple *tuple = AST_VECTOR_GET(&sub->applications, i); + + tuple->callback(endpoint_name, json_msg, tuple->pvt); + } + + ast_json_unref(json_msg); + ao2_ref(sub, -1); + return 0; +} + +struct ast_msg_handler ari_msg_handler = { + .name = "ari", + .handle_msg = handle_msg_cb, + .has_destination = has_destination_cb, +}; + +static int messaging_subscription_cmp(struct message_subscription *sub, const char *key) +{ + return !strcmp(sub->token, key) ? 1 : 0; +} + +static int application_tuple_cmp(struct application_tuple *item, const char *key) +{ + return !strcmp(item->app_name, key) ? 1 : 0; +} + +static int is_app_subscribed(struct message_subscription *sub, const char *app_name) +{ + int i; + + for (i = 0; i < AST_VECTOR_SIZE(&sub->applications); i++) { + struct application_tuple *tuple; + + tuple = AST_VECTOR_GET(&sub->applications, i); + if (tuple && !strcmp(tuple->app_name, app_name)) { + return 1; + } + } + + return 0; +} + +static struct message_subscription *get_subscription(struct ast_endpoint *endpoint) +{ + struct message_subscription *sub = NULL; + + if (!ast_strlen_zero(ast_endpoint_get_resource(endpoint))) { + sub = ao2_find(endpoint_subscriptions, endpoint, OBJ_SEARCH_KEY); + } else { + int i; + + ast_rwlock_rdlock(&tech_subscriptions_lock); + for (i = 0; i < AST_VECTOR_SIZE(&tech_subscriptions); i++) { + sub = AST_VECTOR_GET(&tech_subscriptions, i); + + if (sub && !strcmp(sub->token, ast_endpoint_get_tech(endpoint))) { + ao2_bump(sub); + break; + } + } + ast_rwlock_unlock(&tech_subscriptions_lock); + } + + return sub; +} + +void messaging_app_unsubscribe_endpoint(const char *app_name, const char *endpoint_id) +{ + RAII_VAR(struct message_subscription *, sub, NULL, ao2_cleanup); + RAII_VAR(struct ast_endpoint *, endpoint, NULL, ao2_cleanup); + + endpoint = ast_endpoint_find_by_id(endpoint_id); + if (!endpoint) { + return; + } + + sub = get_subscription(endpoint); + if (!sub) { + return; + } + + ao2_lock(sub); + if (!is_app_subscribed(sub, app_name)) { + ao2_unlock(sub); + return; + } + + AST_VECTOR_REMOVE_CMP_UNORDERED(&sub->applications, app_name, application_tuple_cmp, ao2_cleanup); + if (AST_VECTOR_SIZE(&sub->applications) == 0) { + if (!ast_strlen_zero(ast_endpoint_get_resource(endpoint))) { + ao2_unlink(endpoint_subscriptions, sub); + } else { + ast_rwlock_wrlock(&tech_subscriptions_lock); + AST_VECTOR_REMOVE_CMP_UNORDERED(&tech_subscriptions, ast_endpoint_get_id(endpoint), + messaging_subscription_cmp, AST_VECTOR_ELEM_CLEANUP_NOOP); + ast_rwlock_unlock(&tech_subscriptions_lock); + } + } + ao2_unlock(sub); + ao2_ref(sub, -1); + + ast_debug(3, "App '%s' unsubscribed to messages from endpoint '%s'\n", app_name, ast_endpoint_get_id(endpoint)); + ast_test_suite_event_notify("StasisMessagingSubscription", "SubState: Unsubscribed\r\nAppName: %s\r\nToken: %s\r\n", + app_name, ast_endpoint_get_id(endpoint)); +} + +static struct message_subscription *get_or_create_subscription(struct ast_endpoint *endpoint) +{ + struct message_subscription *sub = get_subscription(endpoint); + + if (sub) { + return sub; + } + + sub = message_subscription_alloc(ast_endpoint_get_id(endpoint)); + if (!sub) { + return NULL; + } + + if (!ast_strlen_zero(ast_endpoint_get_resource(endpoint))) { + ao2_link(endpoint_subscriptions, sub); + } else { + ast_rwlock_wrlock(&tech_subscriptions_lock); + AST_VECTOR_APPEND(&tech_subscriptions, ao2_bump(sub)); + ast_rwlock_unlock(&tech_subscriptions_lock); + } + + return sub; +} + +int messaging_app_subscribe_endpoint(const char *app_name, struct ast_endpoint *endpoint, message_received_cb callback, void *pvt) +{ + RAII_VAR(struct message_subscription *, sub, NULL, ao2_cleanup); + struct application_tuple *tuple; + + sub = get_or_create_subscription(endpoint); + if (!sub) { + return -1; + } + + ao2_lock(sub); + if (is_app_subscribed(sub, app_name)) { + ao2_unlock(sub); + return 0; + } + + tuple = application_tuple_alloc(app_name, callback, pvt); + if (!tuple) { + ao2_unlock(sub); + return -1; + } + AST_VECTOR_APPEND(&sub->applications, tuple); + ao2_unlock(sub); + + ast_debug(3, "App '%s' subscribed to messages from endpoint '%s'\n", app_name, ast_endpoint_get_id(endpoint)); + ast_test_suite_event_notify("StasisMessagingSubscription", "SubState: Subscribed\r\nAppName: %s\r\nToken: %s\r\n", + app_name, ast_endpoint_get_id(endpoint)); + + return 0; +} + + +int messaging_cleanup(void) +{ + ast_msg_handler_unregister(&ari_msg_handler); + ao2_ref(endpoint_subscriptions, -1); + AST_VECTOR_FREE(&tech_subscriptions); + ast_rwlock_destroy(&tech_subscriptions_lock);\ + + return 0; +} + +int messaging_init(void) +{ + endpoint_subscriptions = ao2_t_container_alloc_hash(AO2_ALLOC_OPT_LOCK_RWLOCK, 0, + ENDPOINTS_NUM_BUCKETS, message_subscription_hash_cb, NULL, + message_subscription_compare_cb, "Endpoint messaging subscription container creation"); + if (!endpoint_subscriptions) { + return -1; + } + + if (AST_VECTOR_INIT(&tech_subscriptions, 4)) { + ao2_ref(endpoint_subscriptions, -1); + return -1; + } + + if (ast_rwlock_init(&tech_subscriptions_lock)) { + ao2_ref(endpoint_subscriptions, -1); + AST_VECTOR_FREE(&tech_subscriptions); + return -1; + } + + if (ast_msg_handler_register(&ari_msg_handler)) { + ao2_ref(endpoint_subscriptions, -1); + AST_VECTOR_FREE(&tech_subscriptions); + ast_rwlock_destroy(&tech_subscriptions_lock); + return -1; + } + + return 0; +} diff --git a/res/stasis/messaging.h b/res/stasis/messaging.h new file mode 100644 index 000000000..c69d5d570 --- /dev/null +++ b/res/stasis/messaging.h @@ -0,0 +1,83 @@ +/* + * Asterisk -- An open source telephony toolkit. + * + * Copyright (C) 2014, Digium, Inc. + * + * Matt Jordan <mjordan@digium.com> + * + * See http://www.asterisk.org for more information about + * the Asterisk project. Please do not directly contact + * any of the maintainers of this project for assistance; + * the project provides a web site, mailing lists and IRC + * channels for your use. + * + * This program is free software, distributed under the terms of + * the GNU General Public License Version 2. See the LICENSE file + * at the top of the source tree. + */ + +#ifndef _ASTERISK_RES_STASIS_MESSAGING_H +#define _ASTERISK_RES_STASIS_MESSAGING_H + +/*! + * \file + * + * \brief Stasis out-of-call text message support + * + * \author Matt Jordan <mjordan@digium.com> + * \since 12.4.0 + */ + +/*! + * \brief Callback handler for when a message is received from the core + * + * \param endpoint_id The ID of the endpoint that we received the message from + * \param json_msg JSON representation of the text message + * \param pvt ao2 ref counted pvt passed during registration + * + * \retval 0 the message was handled + * \retval non-zero the message was not handled + */ +typedef int (* message_received_cb)(const char *endpoint_id, struct ast_json *json_msg, void *pvt); + +/*! + * \brief Subscribe for messages from a particular endpoint + * + * \param app_name Name of the stasis application to unsubscribe from messaging + * \param endpoint_id The ID of the endpoint we no longer care about + * + * \retval 0 success + * \retval -1 error + */ +void messaging_app_unsubscribe_endpoint(const char *app_name, const char *endpoint_id); + +/*! + * \brief Subscribe an application to an endpoint for messages + * + * \param app_name The name of the \ref stasis application to subscribe to \c endpoint + * \param endpoint The endpoint object to subscribe to + * \param message_received_cb The callback to call when a message is received + * \param pvt An ao2 ref counted object that will be passed to the callback. + * + * \retval 0 subscription was successful + * \retval -1 subscription failed + */ +int messaging_app_subscribe_endpoint(const char *app_name, struct ast_endpoint *endpoint, message_received_cb callback, void *pvt); + +/*! + * \brief Tidy up the messaging layer + * + * \retval 0 success + * \retval -1 failure + */ +int messaging_cleanup(void); + +/*! + * \brief Initialize the messaging layer + * + * \retval 0 success + * \retval -1 failure + */ +int messaging_init(void); + +#endif /* #define _ASTERISK_RES_STASIS_MESSAGING_H */
\ No newline at end of file |