diff options
Diffstat (limited to 'res')
-rw-r--r-- | res/res_stasis_http_endpoints.c | 47 | ||||
-rw-r--r-- | res/res_stasis_test.c | 291 | ||||
-rw-r--r-- | res/res_stasis_test.exports.in | 6 | ||||
-rw-r--r-- | res/stasis_http/resource_endpoints.c | 134 | ||||
-rw-r--r-- | res/stasis_http/resource_endpoints.h | 21 |
5 files changed, 475 insertions, 24 deletions
diff --git a/res/res_stasis_http_endpoints.c b/res/res_stasis_http_endpoints.c index a420d4ede..a09be1b6e 100644 --- a/res/res_stasis_http_endpoints.c +++ b/res/res_stasis_http_endpoints.c @@ -55,18 +55,32 @@ static void stasis_http_get_endpoints_cb( struct ast_variable *headers, struct stasis_http_response *response) { struct ast_get_endpoints_args args = {}; + stasis_http_get_endpoints(headers, &args, response); +} +/*! + * \brief Parameter parsing callback for /endpoints/{tech}. + * \param get_params GET parameters in the HTTP request. + * \param path_vars Path variables extracted from the request. + * \param headers HTTP headers. + * \param[out] response Response to the HTTP request. + */ +static void stasis_http_get_endpoints_by_tech_cb( + struct ast_variable *get_params, struct ast_variable *path_vars, + struct ast_variable *headers, struct stasis_http_response *response) +{ + struct ast_get_endpoints_by_tech_args args = {}; struct ast_variable *i; - for (i = get_params; i; i = i->next) { - if (strcmp(i->name, "withType") == 0) { - args.with_type = (i->value); + for (i = path_vars; i; i = i->next) { + if (strcmp(i->name, "tech") == 0) { + args.tech = (i->value); } else {} } - stasis_http_get_endpoints(headers, &args, response); + stasis_http_get_endpoints_by_tech(headers, &args, response); } /*! - * \brief Parameter parsing callback for /endpoints/{endpointId}. + * \brief Parameter parsing callback for /endpoints/{tech}/{resource}. * \param get_params GET parameters in the HTTP request. * \param path_vars Path variables extracted from the request. * \param headers HTTP headers. @@ -80,8 +94,11 @@ static void stasis_http_get_endpoint_cb( struct ast_variable *i; for (i = path_vars; i; i = i->next) { - if (strcmp(i->name, "endpointId") == 0) { - args.endpoint_id = (i->value); + if (strcmp(i->name, "tech") == 0) { + args.tech = (i->value); + } else + if (strcmp(i->name, "resource") == 0) { + args.resource = (i->value); } else {} } @@ -89,8 +106,8 @@ static void stasis_http_get_endpoint_cb( } /*! \brief REST handler for /api-docs/endpoints.{format} */ -static struct stasis_rest_handlers endpoints_endpointId = { - .path_segment = "endpointId", +static struct stasis_rest_handlers endpoints_tech_resource = { + .path_segment = "resource", .is_wildcard = 1, .callbacks = { [AST_HTTP_GET] = stasis_http_get_endpoint_cb, @@ -99,13 +116,23 @@ static struct stasis_rest_handlers endpoints_endpointId = { .children = { } }; /*! \brief REST handler for /api-docs/endpoints.{format} */ +static struct stasis_rest_handlers endpoints_tech = { + .path_segment = "tech", + .is_wildcard = 1, + .callbacks = { + [AST_HTTP_GET] = stasis_http_get_endpoints_by_tech_cb, + }, + .num_children = 1, + .children = { &endpoints_tech_resource, } +}; +/*! \brief REST handler for /api-docs/endpoints.{format} */ static struct stasis_rest_handlers endpoints = { .path_segment = "endpoints", .callbacks = { [AST_HTTP_GET] = stasis_http_get_endpoints_cb, }, .num_children = 1, - .children = { &endpoints_endpointId, } + .children = { &endpoints_tech, } }; static int load_module(void) diff --git a/res/res_stasis_test.c b/res/res_stasis_test.c new file mode 100644 index 000000000..7b5aece4f --- /dev/null +++ b/res/res_stasis_test.c @@ -0,0 +1,291 @@ +/* + * Asterisk -- An open source telephony toolkit. + * + * Copyright (C) 2013, Digium, Inc. + * + * David M. Lee, II <dlee@digium.com> + * + * See http://www.asterisk.org for more information about + * the Asterisk project. Please do not directly contact + * any of the maintainers of this project for assistance; + * the project provides a web site, mailing lists and IRC + * channels for your use. + * + * This program is free software, distributed under the terms of + * the GNU General Public License Version 2. See the LICENSE file + * at the top of the source tree. + */ + +/*! + * \file \brief Test infrastructure for dealing with Stasis. + * + * \author David M. Lee, II <dlee@digium.com> + */ + +/*** MODULEINFO + <depend>TEST_FRAMEWORK</depend> + <support_level>core</support_level> + ***/ + +#include "asterisk.h" + +ASTERISK_FILE_VERSION(__FILE__, "$Revision$"); + +#include "asterisk/astobj2.h" +#include "asterisk/module.h" +#include "asterisk/stasis_test.h" + +static struct stasis_message_type *test_message_type; + +static void stasis_message_sink_dtor(void *obj) +{ + struct stasis_message_sink *sink = obj; + + { + SCOPED_MUTEX(lock, &sink->lock); + while (!sink->is_done) { + /* Normally waiting forever is bad, but if we're not + * done, we're not done. */ + ast_cond_wait(&sink->cond, &sink->lock); + } + } + + ast_mutex_destroy(&sink->lock); + ast_cond_destroy(&sink->cond); + + while (sink->num_messages > 0) { + ao2_cleanup(sink->messages[--sink->num_messages]); + } + ast_free(sink->messages); + sink->messages = NULL; + sink->max_messages = 0; +} + +static struct timespec make_deadline(int timeout_millis) +{ + struct timeval start = ast_tvnow(); + struct timeval delta = { + .tv_sec = timeout_millis / 1000, + .tv_usec = (timeout_millis % 1000) * 1000, + }; + struct timeval deadline_tv = ast_tvadd(start, delta); + struct timespec deadline = { + .tv_sec = deadline_tv.tv_sec, + .tv_nsec = 1000 * deadline_tv.tv_usec, + }; + + return deadline; +} + +struct stasis_message_sink *stasis_message_sink_create(void) +{ + RAII_VAR(struct stasis_message_sink *, sink, NULL, ao2_cleanup); + + sink = ao2_alloc(sizeof(*sink), stasis_message_sink_dtor); + if (!sink) { + return NULL; + } + ast_mutex_init(&sink->lock); + ast_cond_init(&sink->cond, NULL); + sink->max_messages = 4; + sink->messages = + ast_malloc(sizeof(*sink->messages) * sink->max_messages); + if (!sink->messages) { + return NULL; + } + ao2_ref(sink, +1); + return sink; +} + +/*! + * \brief Implementation of the stasis_message_sink_cb() callback. + * + * Why the roundabout way of exposing this via stasis_message_sink_cb()? Well, + * it has to do with how we load modules. + * + * Modules have their own metadata compiled into them in the AST_MODULE_INFO() + * block. This includes dependency information in the \c nonoptreq field. + * + * Asterisk loads the module, inspects the field, then loads any needed + * dependencies. This works because Asterisk passes \c RTLD_LAZY to the initial + * dlopen(), which defers binding function references until they are called. + * + * But when you take the address of a function, that function needs to be + * available at load time. So if some module used the address of + * message_sink_cb() directly, and \c res_stasis_test.so wasn't loaded yet, then + * that module would fail to load. + * + * The stasis_message_sink_cb() function gives us a layer of indirection so that + * the initial lazy binding will still work as expected. + */ +static void message_sink_cb(void *data, struct stasis_subscription *sub, + struct stasis_topic *topic, struct stasis_message *message) +{ + struct stasis_message_sink *sink = data; + + SCOPED_MUTEX(lock, &sink->lock); + + if (stasis_subscription_final_message(sub, message)) { + sink->is_done = 1; + ast_cond_signal(&sink->cond); + return; + } + + if (stasis_subscription_change_type() == stasis_message_type(message)) { + /* Ignore subscription changes */ + return; + } + + if (sink->num_messages == sink->max_messages) { + size_t new_max_messages = sink->max_messages * 2; + struct stasis_message **new_messages = ast_realloc( + sink->messages, + sizeof(*new_messages) * new_max_messages); + if (!new_messages) { + return; + } + sink->max_messages = new_max_messages; + sink->messages = new_messages; + } + + ao2_ref(message, +1); + sink->messages[sink->num_messages++] = message; + ast_cond_signal(&sink->cond); +} + +stasis_subscription_cb stasis_message_sink_cb(void) +{ + return message_sink_cb; +} + + +int stasis_message_sink_wait_for_count(struct stasis_message_sink *sink, + int num_messages, int timeout_millis) +{ + struct timespec deadline = make_deadline(timeout_millis); + + SCOPED_MUTEX(lock, &sink->lock); + while (sink->num_messages < num_messages) { + int r = ast_cond_timedwait(&sink->cond, &sink->lock, &deadline); + + if (r == ETIMEDOUT) { + break; + } + if (r != 0) { + ast_log(LOG_ERROR, "Unexpected condition error: %s\n", + strerror(r)); + break; + } + } + return sink->num_messages; +} + +int stasis_message_sink_should_stay(struct stasis_message_sink *sink, + int num_messages, int timeout_millis) +{ + struct timespec deadline = make_deadline(timeout_millis); + + SCOPED_MUTEX(lock, &sink->lock); + while (sink->num_messages == num_messages) { + int r = ast_cond_timedwait(&sink->cond, &sink->lock, &deadline); + + if (r == ETIMEDOUT) { + break; + } + if (r != 0) { + ast_log(LOG_ERROR, "Unexpected condition error: %s\n", + strerror(r)); + break; + } + } + return sink->num_messages; +} + +int stasis_message_sink_wait_for(struct stasis_message_sink *sink, int start, + stasis_wait_cb cmp_cb, const void *data, int timeout_millis) +{ + struct timespec deadline = make_deadline(timeout_millis); + + SCOPED_MUTEX(lock, &sink->lock); + + /* wait for the start */ + while (sink->num_messages < start + 1) { + int r = ast_cond_timedwait(&sink->cond, &sink->lock, &deadline); + + if (r == ETIMEDOUT) { + /* Timed out waiting for the start */ + return -1; + } + if (r != 0) { + ast_log(LOG_ERROR, "Unexpected condition error: %s\n", + strerror(r)); + return -2; + } + } + + + while (!cmp_cb(sink->messages[start], data)) { + ++start; + + while (sink->num_messages < start + 1) { + int r = ast_cond_timedwait(&sink->cond, + &sink->lock, &deadline); + + if (r == ETIMEDOUT) { + return -1; + } + if (r != 0) { + ast_log(LOG_ERROR, + "Unexpected condition error: %s\n", + strerror(r)); + return -2; + } + } + } + + return start; +} + +struct stasis_message *stasis_test_message_create(void) +{ + RAII_VAR(void *, data, NULL, ao2_cleanup); + + /* We just need the unique pointer; don't care what's in it */ + data = ao2_alloc(1, NULL); + if (!data) { + return NULL; + } + + return stasis_message_create(stasis_test_message_type(), data); +} + +struct stasis_message_type *stasis_test_message_type(void) +{ + return test_message_type; +} + +static int unload_module(void) +{ + ao2_cleanup(test_message_type); + test_message_type = NULL; + return 0; +} + +static int load_module(void) +{ + test_message_type = stasis_message_type_create( + "stasis_test_message"); + if (!test_message_type) { + return AST_MODULE_LOAD_FAILURE; + } + + return AST_MODULE_LOAD_SUCCESS; +} + +AST_MODULE_INFO(ASTERISK_GPL_KEY, + AST_MODFLAG_GLOBAL_SYMBOLS | AST_MODFLAG_LOAD_ORDER, + "Stasis test utilities", + .load = load_module, + .unload = unload_module, + .load_pri = AST_MODPRI_APP_DEPEND, + ); diff --git a/res/res_stasis_test.exports.in b/res/res_stasis_test.exports.in new file mode 100644 index 000000000..961600323 --- /dev/null +++ b/res/res_stasis_test.exports.in @@ -0,0 +1,6 @@ +{ + global: + LINKER_SYMBOL_PREFIXstasis_*; + local: + *; +}; diff --git a/res/stasis_http/resource_endpoints.c b/res/stasis_http/resource_endpoints.c index b2611bad8..52d05c093 100644 --- a/res/stasis_http/resource_endpoints.c +++ b/res/stasis_http/resource_endpoints.c @@ -1,4 +1,4 @@ -/* -*- C -*- +/* * Asterisk -- An open source telephony toolkit. * * Copyright (C) 2012 - 2013, Digium, Inc. @@ -18,26 +18,140 @@ /*! \file * - * \brief Implementation for stasis-http stubs. + * \brief /api-docs/endpoints.{format} implementation- Endpoint resources * * \author David M. Lee, II <dlee@digium.com> */ -/*** MODULEINFO - <support_level>core</support_level> - ***/ - #include "asterisk.h" ASTERISK_FILE_VERSION(__FILE__, "$Revision$") #include "resource_endpoints.h" -void stasis_http_get_endpoint(struct ast_variable *headers, struct ast_get_endpoint_args *args, struct stasis_http_response *response) +#include "asterisk/astobj2.h" +#include "asterisk/stasis.h" +#include "asterisk/stasis_endpoints.h" + +void stasis_http_get_endpoints(struct ast_variable *headers, + struct ast_get_endpoints_args *args, + struct stasis_http_response *response) +{ + RAII_VAR(struct stasis_caching_topic *, caching_topic, NULL, ao2_cleanup); + RAII_VAR(struct ao2_container *, snapshots, NULL, ao2_cleanup); + RAII_VAR(struct ast_json *, json, NULL, ast_json_unref); + struct ao2_iterator i; + void *obj; + + caching_topic = ast_endpoint_topic_all_cached(); + if (!caching_topic) { + stasis_http_response_error( + response, 500, "Internal Server Error", + "Message bus not initialized"); + return; + } + ao2_ref(caching_topic, +1); + + snapshots = stasis_cache_dump(caching_topic, ast_endpoint_snapshot_type()); + if (!snapshots) { + stasis_http_response_alloc_failed(response); + return; + } + + json = ast_json_array_create(); + if (!json) { + stasis_http_response_alloc_failed(response); + return; + } + + i = ao2_iterator_init(snapshots, 0); + while ((obj = ao2_iterator_next(&i))) { + RAII_VAR(struct stasis_message *, msg, obj, ao2_cleanup); + struct ast_endpoint_snapshot *snapshot = stasis_message_data(msg); + int r = ast_json_array_append( + json, ast_endpoint_snapshot_to_json(snapshot)); + if (r != 0) { + stasis_http_response_alloc_failed(response); + return; + } + } + ao2_iterator_destroy(&i); + + stasis_http_response_ok(response, ast_json_ref(json)); +} +void stasis_http_get_endpoints_by_tech(struct ast_variable *headers, + struct ast_get_endpoints_by_tech_args *args, + struct stasis_http_response *response) { - ast_log(LOG_ERROR, "TODO: stasis_http_get_endpoint\n"); + RAII_VAR(struct stasis_caching_topic *, caching_topic, NULL, ao2_cleanup); + RAII_VAR(struct ao2_container *, snapshots, NULL, ao2_cleanup); + RAII_VAR(struct ast_json *, json, NULL, ast_json_unref); + struct ao2_iterator i; + void *obj; + + /* TODO - if tech isn't a recognized type of endpoint, it should 404 */ + + caching_topic = ast_endpoint_topic_all_cached(); + if (!caching_topic) { + stasis_http_response_error( + response, 500, "Internal Server Error", + "Message bus not initialized"); + return; + } + ao2_ref(caching_topic, +1); + + snapshots = stasis_cache_dump(caching_topic, ast_endpoint_snapshot_type()); + if (!snapshots) { + stasis_http_response_alloc_failed(response); + return; + } + + json = ast_json_array_create(); + if (!json) { + stasis_http_response_alloc_failed(response); + return; + } + + i = ao2_iterator_init(snapshots, 0); + while ((obj = ao2_iterator_next(&i))) { + RAII_VAR(struct stasis_message *, msg, obj, ao2_cleanup); + struct ast_endpoint_snapshot *snapshot = stasis_message_data(msg); + int r; + + if (strcmp(args->tech, snapshot->tech) != 0) { + continue; + } + + r = ast_json_array_append( + json, ast_endpoint_snapshot_to_json(snapshot)); + if (r != 0) { + stasis_http_response_alloc_failed(response); + return; + } + } + ao2_iterator_destroy(&i); + + stasis_http_response_ok(response, ast_json_ref(json)); } -void stasis_http_get_endpoints(struct ast_variable *headers, struct ast_get_endpoints_args *args, struct stasis_http_response *response) +void stasis_http_get_endpoint(struct ast_variable *headers, + struct ast_get_endpoint_args *args, + struct stasis_http_response *response) { - ast_log(LOG_ERROR, "TODO: stasis_http_get_endpoints\n"); + RAII_VAR(struct ast_json *, json, NULL, ast_json_unref); + RAII_VAR(struct ast_endpoint_snapshot *, snapshot, NULL, ao2_cleanup); + + snapshot = ast_endpoint_latest_snapshot(args->tech, args->resource); + if (!snapshot) { + stasis_http_response_error(response, 404, "Not Found", + "Endpoint not found"); + return; + } + + json = ast_endpoint_snapshot_to_json(snapshot); + if (!json) { + stasis_http_response_alloc_failed(response); + return; + } + + stasis_http_response_ok(response, ast_json_ref(json)); } diff --git a/res/stasis_http/resource_endpoints.h b/res/stasis_http/resource_endpoints.h index 9f5a96e50..b534fb047 100644 --- a/res/stasis_http/resource_endpoints.h +++ b/res/stasis_http/resource_endpoints.h @@ -41,21 +41,34 @@ /*! \brief Argument struct for stasis_http_get_endpoints() */ struct ast_get_endpoints_args { - /*! \brief Filter endpoints by type (sip,iax2,dhadi,...) */ - const char *with_type; }; /*! - * \brief List available endoints. + * \brief List all endoints. * * \param headers HTTP headers * \param args Swagger parameters * \param[out] response HTTP response */ void stasis_http_get_endpoints(struct ast_variable *headers, struct ast_get_endpoints_args *args, struct stasis_http_response *response); +/*! \brief Argument struct for stasis_http_get_endpoints_by_tech() */ +struct ast_get_endpoints_by_tech_args { + /*! \brief Technology of the endpoints (sip,iax2,...) */ + const char *tech; +}; +/*! + * \brief List available endoints for a given endpoint technology. + * + * \param headers HTTP headers + * \param args Swagger parameters + * \param[out] response HTTP response + */ +void stasis_http_get_endpoints_by_tech(struct ast_variable *headers, struct ast_get_endpoints_by_tech_args *args, struct stasis_http_response *response); /*! \brief Argument struct for stasis_http_get_endpoint() */ struct ast_get_endpoint_args { + /*! \brief Technology of the endpoint */ + const char *tech; /*! \brief ID of the endpoint */ - const char *endpoint_id; + const char *resource; }; /*! * \brief Details for an endpoint. |