summaryrefslogtreecommitdiff
path: root/res
diff options
context:
space:
mode:
Diffstat (limited to 'res')
-rw-r--r--res/res_stasis_http_endpoints.c47
-rw-r--r--res/res_stasis_test.c291
-rw-r--r--res/res_stasis_test.exports.in6
-rw-r--r--res/stasis_http/resource_endpoints.c134
-rw-r--r--res/stasis_http/resource_endpoints.h21
5 files changed, 475 insertions, 24 deletions
diff --git a/res/res_stasis_http_endpoints.c b/res/res_stasis_http_endpoints.c
index a420d4ede..a09be1b6e 100644
--- a/res/res_stasis_http_endpoints.c
+++ b/res/res_stasis_http_endpoints.c
@@ -55,18 +55,32 @@ static void stasis_http_get_endpoints_cb(
struct ast_variable *headers, struct stasis_http_response *response)
{
struct ast_get_endpoints_args args = {};
+ stasis_http_get_endpoints(headers, &args, response);
+}
+/*!
+ * \brief Parameter parsing callback for /endpoints/{tech}.
+ * \param get_params GET parameters in the HTTP request.
+ * \param path_vars Path variables extracted from the request.
+ * \param headers HTTP headers.
+ * \param[out] response Response to the HTTP request.
+ */
+static void stasis_http_get_endpoints_by_tech_cb(
+ struct ast_variable *get_params, struct ast_variable *path_vars,
+ struct ast_variable *headers, struct stasis_http_response *response)
+{
+ struct ast_get_endpoints_by_tech_args args = {};
struct ast_variable *i;
- for (i = get_params; i; i = i->next) {
- if (strcmp(i->name, "withType") == 0) {
- args.with_type = (i->value);
+ for (i = path_vars; i; i = i->next) {
+ if (strcmp(i->name, "tech") == 0) {
+ args.tech = (i->value);
} else
{}
}
- stasis_http_get_endpoints(headers, &args, response);
+ stasis_http_get_endpoints_by_tech(headers, &args, response);
}
/*!
- * \brief Parameter parsing callback for /endpoints/{endpointId}.
+ * \brief Parameter parsing callback for /endpoints/{tech}/{resource}.
* \param get_params GET parameters in the HTTP request.
* \param path_vars Path variables extracted from the request.
* \param headers HTTP headers.
@@ -80,8 +94,11 @@ static void stasis_http_get_endpoint_cb(
struct ast_variable *i;
for (i = path_vars; i; i = i->next) {
- if (strcmp(i->name, "endpointId") == 0) {
- args.endpoint_id = (i->value);
+ if (strcmp(i->name, "tech") == 0) {
+ args.tech = (i->value);
+ } else
+ if (strcmp(i->name, "resource") == 0) {
+ args.resource = (i->value);
} else
{}
}
@@ -89,8 +106,8 @@ static void stasis_http_get_endpoint_cb(
}
/*! \brief REST handler for /api-docs/endpoints.{format} */
-static struct stasis_rest_handlers endpoints_endpointId = {
- .path_segment = "endpointId",
+static struct stasis_rest_handlers endpoints_tech_resource = {
+ .path_segment = "resource",
.is_wildcard = 1,
.callbacks = {
[AST_HTTP_GET] = stasis_http_get_endpoint_cb,
@@ -99,13 +116,23 @@ static struct stasis_rest_handlers endpoints_endpointId = {
.children = { }
};
/*! \brief REST handler for /api-docs/endpoints.{format} */
+static struct stasis_rest_handlers endpoints_tech = {
+ .path_segment = "tech",
+ .is_wildcard = 1,
+ .callbacks = {
+ [AST_HTTP_GET] = stasis_http_get_endpoints_by_tech_cb,
+ },
+ .num_children = 1,
+ .children = { &endpoints_tech_resource, }
+};
+/*! \brief REST handler for /api-docs/endpoints.{format} */
static struct stasis_rest_handlers endpoints = {
.path_segment = "endpoints",
.callbacks = {
[AST_HTTP_GET] = stasis_http_get_endpoints_cb,
},
.num_children = 1,
- .children = { &endpoints_endpointId, }
+ .children = { &endpoints_tech, }
};
static int load_module(void)
diff --git a/res/res_stasis_test.c b/res/res_stasis_test.c
new file mode 100644
index 000000000..7b5aece4f
--- /dev/null
+++ b/res/res_stasis_test.c
@@ -0,0 +1,291 @@
+/*
+ * Asterisk -- An open source telephony toolkit.
+ *
+ * Copyright (C) 2013, Digium, Inc.
+ *
+ * David M. Lee, II <dlee@digium.com>
+ *
+ * See http://www.asterisk.org for more information about
+ * the Asterisk project. Please do not directly contact
+ * any of the maintainers of this project for assistance;
+ * the project provides a web site, mailing lists and IRC
+ * channels for your use.
+ *
+ * This program is free software, distributed under the terms of
+ * the GNU General Public License Version 2. See the LICENSE file
+ * at the top of the source tree.
+ */
+
+/*!
+ * \file \brief Test infrastructure for dealing with Stasis.
+ *
+ * \author David M. Lee, II <dlee@digium.com>
+ */
+
+/*** MODULEINFO
+ <depend>TEST_FRAMEWORK</depend>
+ <support_level>core</support_level>
+ ***/
+
+#include "asterisk.h"
+
+ASTERISK_FILE_VERSION(__FILE__, "$Revision$");
+
+#include "asterisk/astobj2.h"
+#include "asterisk/module.h"
+#include "asterisk/stasis_test.h"
+
+static struct stasis_message_type *test_message_type;
+
+static void stasis_message_sink_dtor(void *obj)
+{
+ struct stasis_message_sink *sink = obj;
+
+ {
+ SCOPED_MUTEX(lock, &sink->lock);
+ while (!sink->is_done) {
+ /* Normally waiting forever is bad, but if we're not
+ * done, we're not done. */
+ ast_cond_wait(&sink->cond, &sink->lock);
+ }
+ }
+
+ ast_mutex_destroy(&sink->lock);
+ ast_cond_destroy(&sink->cond);
+
+ while (sink->num_messages > 0) {
+ ao2_cleanup(sink->messages[--sink->num_messages]);
+ }
+ ast_free(sink->messages);
+ sink->messages = NULL;
+ sink->max_messages = 0;
+}
+
+static struct timespec make_deadline(int timeout_millis)
+{
+ struct timeval start = ast_tvnow();
+ struct timeval delta = {
+ .tv_sec = timeout_millis / 1000,
+ .tv_usec = (timeout_millis % 1000) * 1000,
+ };
+ struct timeval deadline_tv = ast_tvadd(start, delta);
+ struct timespec deadline = {
+ .tv_sec = deadline_tv.tv_sec,
+ .tv_nsec = 1000 * deadline_tv.tv_usec,
+ };
+
+ return deadline;
+}
+
+struct stasis_message_sink *stasis_message_sink_create(void)
+{
+ RAII_VAR(struct stasis_message_sink *, sink, NULL, ao2_cleanup);
+
+ sink = ao2_alloc(sizeof(*sink), stasis_message_sink_dtor);
+ if (!sink) {
+ return NULL;
+ }
+ ast_mutex_init(&sink->lock);
+ ast_cond_init(&sink->cond, NULL);
+ sink->max_messages = 4;
+ sink->messages =
+ ast_malloc(sizeof(*sink->messages) * sink->max_messages);
+ if (!sink->messages) {
+ return NULL;
+ }
+ ao2_ref(sink, +1);
+ return sink;
+}
+
+/*!
+ * \brief Implementation of the stasis_message_sink_cb() callback.
+ *
+ * Why the roundabout way of exposing this via stasis_message_sink_cb()? Well,
+ * it has to do with how we load modules.
+ *
+ * Modules have their own metadata compiled into them in the AST_MODULE_INFO()
+ * block. This includes dependency information in the \c nonoptreq field.
+ *
+ * Asterisk loads the module, inspects the field, then loads any needed
+ * dependencies. This works because Asterisk passes \c RTLD_LAZY to the initial
+ * dlopen(), which defers binding function references until they are called.
+ *
+ * But when you take the address of a function, that function needs to be
+ * available at load time. So if some module used the address of
+ * message_sink_cb() directly, and \c res_stasis_test.so wasn't loaded yet, then
+ * that module would fail to load.
+ *
+ * The stasis_message_sink_cb() function gives us a layer of indirection so that
+ * the initial lazy binding will still work as expected.
+ */
+static void message_sink_cb(void *data, struct stasis_subscription *sub,
+ struct stasis_topic *topic, struct stasis_message *message)
+{
+ struct stasis_message_sink *sink = data;
+
+ SCOPED_MUTEX(lock, &sink->lock);
+
+ if (stasis_subscription_final_message(sub, message)) {
+ sink->is_done = 1;
+ ast_cond_signal(&sink->cond);
+ return;
+ }
+
+ if (stasis_subscription_change_type() == stasis_message_type(message)) {
+ /* Ignore subscription changes */
+ return;
+ }
+
+ if (sink->num_messages == sink->max_messages) {
+ size_t new_max_messages = sink->max_messages * 2;
+ struct stasis_message **new_messages = ast_realloc(
+ sink->messages,
+ sizeof(*new_messages) * new_max_messages);
+ if (!new_messages) {
+ return;
+ }
+ sink->max_messages = new_max_messages;
+ sink->messages = new_messages;
+ }
+
+ ao2_ref(message, +1);
+ sink->messages[sink->num_messages++] = message;
+ ast_cond_signal(&sink->cond);
+}
+
+stasis_subscription_cb stasis_message_sink_cb(void)
+{
+ return message_sink_cb;
+}
+
+
+int stasis_message_sink_wait_for_count(struct stasis_message_sink *sink,
+ int num_messages, int timeout_millis)
+{
+ struct timespec deadline = make_deadline(timeout_millis);
+
+ SCOPED_MUTEX(lock, &sink->lock);
+ while (sink->num_messages < num_messages) {
+ int r = ast_cond_timedwait(&sink->cond, &sink->lock, &deadline);
+
+ if (r == ETIMEDOUT) {
+ break;
+ }
+ if (r != 0) {
+ ast_log(LOG_ERROR, "Unexpected condition error: %s\n",
+ strerror(r));
+ break;
+ }
+ }
+ return sink->num_messages;
+}
+
+int stasis_message_sink_should_stay(struct stasis_message_sink *sink,
+ int num_messages, int timeout_millis)
+{
+ struct timespec deadline = make_deadline(timeout_millis);
+
+ SCOPED_MUTEX(lock, &sink->lock);
+ while (sink->num_messages == num_messages) {
+ int r = ast_cond_timedwait(&sink->cond, &sink->lock, &deadline);
+
+ if (r == ETIMEDOUT) {
+ break;
+ }
+ if (r != 0) {
+ ast_log(LOG_ERROR, "Unexpected condition error: %s\n",
+ strerror(r));
+ break;
+ }
+ }
+ return sink->num_messages;
+}
+
+int stasis_message_sink_wait_for(struct stasis_message_sink *sink, int start,
+ stasis_wait_cb cmp_cb, const void *data, int timeout_millis)
+{
+ struct timespec deadline = make_deadline(timeout_millis);
+
+ SCOPED_MUTEX(lock, &sink->lock);
+
+ /* wait for the start */
+ while (sink->num_messages < start + 1) {
+ int r = ast_cond_timedwait(&sink->cond, &sink->lock, &deadline);
+
+ if (r == ETIMEDOUT) {
+ /* Timed out waiting for the start */
+ return -1;
+ }
+ if (r != 0) {
+ ast_log(LOG_ERROR, "Unexpected condition error: %s\n",
+ strerror(r));
+ return -2;
+ }
+ }
+
+
+ while (!cmp_cb(sink->messages[start], data)) {
+ ++start;
+
+ while (sink->num_messages < start + 1) {
+ int r = ast_cond_timedwait(&sink->cond,
+ &sink->lock, &deadline);
+
+ if (r == ETIMEDOUT) {
+ return -1;
+ }
+ if (r != 0) {
+ ast_log(LOG_ERROR,
+ "Unexpected condition error: %s\n",
+ strerror(r));
+ return -2;
+ }
+ }
+ }
+
+ return start;
+}
+
+struct stasis_message *stasis_test_message_create(void)
+{
+ RAII_VAR(void *, data, NULL, ao2_cleanup);
+
+ /* We just need the unique pointer; don't care what's in it */
+ data = ao2_alloc(1, NULL);
+ if (!data) {
+ return NULL;
+ }
+
+ return stasis_message_create(stasis_test_message_type(), data);
+}
+
+struct stasis_message_type *stasis_test_message_type(void)
+{
+ return test_message_type;
+}
+
+static int unload_module(void)
+{
+ ao2_cleanup(test_message_type);
+ test_message_type = NULL;
+ return 0;
+}
+
+static int load_module(void)
+{
+ test_message_type = stasis_message_type_create(
+ "stasis_test_message");
+ if (!test_message_type) {
+ return AST_MODULE_LOAD_FAILURE;
+ }
+
+ return AST_MODULE_LOAD_SUCCESS;
+}
+
+AST_MODULE_INFO(ASTERISK_GPL_KEY,
+ AST_MODFLAG_GLOBAL_SYMBOLS | AST_MODFLAG_LOAD_ORDER,
+ "Stasis test utilities",
+ .load = load_module,
+ .unload = unload_module,
+ .load_pri = AST_MODPRI_APP_DEPEND,
+ );
diff --git a/res/res_stasis_test.exports.in b/res/res_stasis_test.exports.in
new file mode 100644
index 000000000..961600323
--- /dev/null
+++ b/res/res_stasis_test.exports.in
@@ -0,0 +1,6 @@
+{
+ global:
+ LINKER_SYMBOL_PREFIXstasis_*;
+ local:
+ *;
+};
diff --git a/res/stasis_http/resource_endpoints.c b/res/stasis_http/resource_endpoints.c
index b2611bad8..52d05c093 100644
--- a/res/stasis_http/resource_endpoints.c
+++ b/res/stasis_http/resource_endpoints.c
@@ -1,4 +1,4 @@
-/* -*- C -*-
+/*
* Asterisk -- An open source telephony toolkit.
*
* Copyright (C) 2012 - 2013, Digium, Inc.
@@ -18,26 +18,140 @@
/*! \file
*
- * \brief Implementation for stasis-http stubs.
+ * \brief /api-docs/endpoints.{format} implementation- Endpoint resources
*
* \author David M. Lee, II <dlee@digium.com>
*/
-/*** MODULEINFO
- <support_level>core</support_level>
- ***/
-
#include "asterisk.h"
ASTERISK_FILE_VERSION(__FILE__, "$Revision$")
#include "resource_endpoints.h"
-void stasis_http_get_endpoint(struct ast_variable *headers, struct ast_get_endpoint_args *args, struct stasis_http_response *response)
+#include "asterisk/astobj2.h"
+#include "asterisk/stasis.h"
+#include "asterisk/stasis_endpoints.h"
+
+void stasis_http_get_endpoints(struct ast_variable *headers,
+ struct ast_get_endpoints_args *args,
+ struct stasis_http_response *response)
+{
+ RAII_VAR(struct stasis_caching_topic *, caching_topic, NULL, ao2_cleanup);
+ RAII_VAR(struct ao2_container *, snapshots, NULL, ao2_cleanup);
+ RAII_VAR(struct ast_json *, json, NULL, ast_json_unref);
+ struct ao2_iterator i;
+ void *obj;
+
+ caching_topic = ast_endpoint_topic_all_cached();
+ if (!caching_topic) {
+ stasis_http_response_error(
+ response, 500, "Internal Server Error",
+ "Message bus not initialized");
+ return;
+ }
+ ao2_ref(caching_topic, +1);
+
+ snapshots = stasis_cache_dump(caching_topic, ast_endpoint_snapshot_type());
+ if (!snapshots) {
+ stasis_http_response_alloc_failed(response);
+ return;
+ }
+
+ json = ast_json_array_create();
+ if (!json) {
+ stasis_http_response_alloc_failed(response);
+ return;
+ }
+
+ i = ao2_iterator_init(snapshots, 0);
+ while ((obj = ao2_iterator_next(&i))) {
+ RAII_VAR(struct stasis_message *, msg, obj, ao2_cleanup);
+ struct ast_endpoint_snapshot *snapshot = stasis_message_data(msg);
+ int r = ast_json_array_append(
+ json, ast_endpoint_snapshot_to_json(snapshot));
+ if (r != 0) {
+ stasis_http_response_alloc_failed(response);
+ return;
+ }
+ }
+ ao2_iterator_destroy(&i);
+
+ stasis_http_response_ok(response, ast_json_ref(json));
+}
+void stasis_http_get_endpoints_by_tech(struct ast_variable *headers,
+ struct ast_get_endpoints_by_tech_args *args,
+ struct stasis_http_response *response)
{
- ast_log(LOG_ERROR, "TODO: stasis_http_get_endpoint\n");
+ RAII_VAR(struct stasis_caching_topic *, caching_topic, NULL, ao2_cleanup);
+ RAII_VAR(struct ao2_container *, snapshots, NULL, ao2_cleanup);
+ RAII_VAR(struct ast_json *, json, NULL, ast_json_unref);
+ struct ao2_iterator i;
+ void *obj;
+
+ /* TODO - if tech isn't a recognized type of endpoint, it should 404 */
+
+ caching_topic = ast_endpoint_topic_all_cached();
+ if (!caching_topic) {
+ stasis_http_response_error(
+ response, 500, "Internal Server Error",
+ "Message bus not initialized");
+ return;
+ }
+ ao2_ref(caching_topic, +1);
+
+ snapshots = stasis_cache_dump(caching_topic, ast_endpoint_snapshot_type());
+ if (!snapshots) {
+ stasis_http_response_alloc_failed(response);
+ return;
+ }
+
+ json = ast_json_array_create();
+ if (!json) {
+ stasis_http_response_alloc_failed(response);
+ return;
+ }
+
+ i = ao2_iterator_init(snapshots, 0);
+ while ((obj = ao2_iterator_next(&i))) {
+ RAII_VAR(struct stasis_message *, msg, obj, ao2_cleanup);
+ struct ast_endpoint_snapshot *snapshot = stasis_message_data(msg);
+ int r;
+
+ if (strcmp(args->tech, snapshot->tech) != 0) {
+ continue;
+ }
+
+ r = ast_json_array_append(
+ json, ast_endpoint_snapshot_to_json(snapshot));
+ if (r != 0) {
+ stasis_http_response_alloc_failed(response);
+ return;
+ }
+ }
+ ao2_iterator_destroy(&i);
+
+ stasis_http_response_ok(response, ast_json_ref(json));
}
-void stasis_http_get_endpoints(struct ast_variable *headers, struct ast_get_endpoints_args *args, struct stasis_http_response *response)
+void stasis_http_get_endpoint(struct ast_variable *headers,
+ struct ast_get_endpoint_args *args,
+ struct stasis_http_response *response)
{
- ast_log(LOG_ERROR, "TODO: stasis_http_get_endpoints\n");
+ RAII_VAR(struct ast_json *, json, NULL, ast_json_unref);
+ RAII_VAR(struct ast_endpoint_snapshot *, snapshot, NULL, ao2_cleanup);
+
+ snapshot = ast_endpoint_latest_snapshot(args->tech, args->resource);
+ if (!snapshot) {
+ stasis_http_response_error(response, 404, "Not Found",
+ "Endpoint not found");
+ return;
+ }
+
+ json = ast_endpoint_snapshot_to_json(snapshot);
+ if (!json) {
+ stasis_http_response_alloc_failed(response);
+ return;
+ }
+
+ stasis_http_response_ok(response, ast_json_ref(json));
}
diff --git a/res/stasis_http/resource_endpoints.h b/res/stasis_http/resource_endpoints.h
index 9f5a96e50..b534fb047 100644
--- a/res/stasis_http/resource_endpoints.h
+++ b/res/stasis_http/resource_endpoints.h
@@ -41,21 +41,34 @@
/*! \brief Argument struct for stasis_http_get_endpoints() */
struct ast_get_endpoints_args {
- /*! \brief Filter endpoints by type (sip,iax2,dhadi,...) */
- const char *with_type;
};
/*!
- * \brief List available endoints.
+ * \brief List all endoints.
*
* \param headers HTTP headers
* \param args Swagger parameters
* \param[out] response HTTP response
*/
void stasis_http_get_endpoints(struct ast_variable *headers, struct ast_get_endpoints_args *args, struct stasis_http_response *response);
+/*! \brief Argument struct for stasis_http_get_endpoints_by_tech() */
+struct ast_get_endpoints_by_tech_args {
+ /*! \brief Technology of the endpoints (sip,iax2,...) */
+ const char *tech;
+};
+/*!
+ * \brief List available endoints for a given endpoint technology.
+ *
+ * \param headers HTTP headers
+ * \param args Swagger parameters
+ * \param[out] response HTTP response
+ */
+void stasis_http_get_endpoints_by_tech(struct ast_variable *headers, struct ast_get_endpoints_by_tech_args *args, struct stasis_http_response *response);
/*! \brief Argument struct for stasis_http_get_endpoint() */
struct ast_get_endpoint_args {
+ /*! \brief Technology of the endpoint */
+ const char *tech;
/*! \brief ID of the endpoint */
- const char *endpoint_id;
+ const char *resource;
};
/*!
* \brief Details for an endpoint.