summaryrefslogtreecommitdiff
path: root/main
diff options
context:
space:
mode:
Diffstat (limited to 'main')
-rw-r--r--main/asterisk.c6
-rw-r--r--main/astobj2.c38
-rw-r--r--main/channel_internal_api.c39
-rw-r--r--main/endpoints.c338
-rw-r--r--main/stasis_cache.c63
-rw-r--r--main/stasis_endpoints.c189
6 files changed, 636 insertions, 37 deletions
diff --git a/main/asterisk.c b/main/asterisk.c
index b8deab1d4..885797f36 100644
--- a/main/asterisk.c
+++ b/main/asterisk.c
@@ -242,6 +242,7 @@ int daemon(int, int); /* defined in libresolv of all places */
#include "asterisk/sorcery.h"
#include "asterisk/stasis.h"
#include "asterisk/json.h"
+#include "asterisk/stasis_endpoints.h"
#include "../defaults.h"
@@ -4174,6 +4175,11 @@ int main(int argc, char *argv[])
exit(1);
}
+ if (ast_endpoint_stasis_init()) {
+ printf("Endpoint initialization failed.\n%s", term_quit());
+ exit(1);
+ }
+
ast_makesocket();
sigemptyset(&sigs);
sigaddset(&sigs, SIGHUP);
diff --git a/main/astobj2.c b/main/astobj2.c
index a980ec379..8ccc36cb4 100644
--- a/main/astobj2.c
+++ b/main/astobj2.c
@@ -5780,3 +5780,41 @@ int astobj2_init(void)
return 0;
}
+
+/* XXX TODO BUGBUG and all the other things...
+ * These functions should eventually be moved elsewhere, but the utils folder
+ * won't compile with them in strings.h
+ */
+static int str_hash(const void *obj, const int flags)
+{
+ return ast_str_hash(obj);
+}
+
+static int str_cmp(void *lhs, void *rhs, int flags)
+{
+ return strcmp(lhs, rhs) ? 0 : CMP_MATCH;
+}
+
+struct ao2_container *ast_str_container_alloc_options(enum ao2_container_opts opts, int buckets)
+{
+ return ao2_container_alloc_options(opts, buckets, str_hash, str_cmp);
+}
+
+int ast_str_container_add(struct ao2_container *str_container, const char *add)
+{
+ RAII_VAR(char *, ao2_add, ao2_alloc(strlen(add) + 1, NULL), ao2_cleanup);
+
+ if (!ao2_add) {
+ return -1;
+ }
+
+ /* safe strcpy */
+ strcpy(ao2_add, add);
+ ao2_link(str_container, ao2_add);
+ return 0;
+}
+
+void ast_str_container_remove(struct ao2_container *str_container, const char *remove)
+{
+ ao2_find(str_container, remove, OBJ_KEY | OBJ_NODATA | OBJ_UNLINK);
+}
diff --git a/main/channel_internal_api.c b/main/channel_internal_api.c
index cb9ed6744..f3293d59b 100644
--- a/main/channel_internal_api.c
+++ b/main/channel_internal_api.c
@@ -39,11 +39,13 @@ ASTERISK_FILE_VERSION(__FILE__, "$Revision$")
#include <fcntl.h>
#include "asterisk/channel.h"
-#include "asterisk/stringfields.h"
+#include "asterisk/channel_internal.h"
#include "asterisk/data.h"
+#include "asterisk/endpoints.h"
#include "asterisk/indications.h"
#include "asterisk/stasis_channels.h"
-#include "asterisk/channel_internal.h"
+#include "asterisk/stasis_endpoints.h"
+#include "asterisk/stringfields.h"
#include "asterisk/test.h"
/*!
@@ -198,6 +200,7 @@ struct ast_channel {
struct timeval sending_dtmf_tv; /*!< The time this channel started sending the current digit. (Invalid if sending_dtmf_digit is zero.) */
struct stasis_topic *topic; /*!< Topic for all channel's events */
struct stasis_subscription *forwarder; /*!< Subscription for event forwarding to all topic */
+ struct stasis_subscription *endpoint_forward; /*!< Subscription for event forwarding to endpoint's topic */
};
/* AST_DATA definitions, which will probably have to be re-thought since the channel will be opaque */
@@ -1369,6 +1372,7 @@ void ast_channel_internal_cleanup(struct ast_channel *chan)
ast_string_field_free_memory(chan);
chan->forwarder = stasis_unsubscribe(chan->forwarder);
+ chan->endpoint_forward = stasis_unsubscribe(chan->endpoint_forward);
ao2_cleanup(chan->topic);
chan->topic = NULL;
@@ -1389,6 +1393,37 @@ struct stasis_topic *ast_channel_topic(struct ast_channel *chan)
return chan ? chan->topic : ast_channel_topic_all();
}
+int ast_endpoint_add_channel(struct ast_endpoint *endpoint,
+ struct ast_channel *chan)
+{
+ RAII_VAR(struct ast_channel_snapshot *, snapshot, NULL, ao2_cleanup);
+ RAII_VAR(struct stasis_message *, msg, NULL, ao2_cleanup);
+
+ ast_assert(chan != NULL);
+ ast_assert(endpoint != NULL);
+
+ snapshot = ast_channel_snapshot_create(chan);
+ if (!snapshot) {
+ return -1;
+ }
+
+ msg = stasis_message_create(ast_channel_snapshot_type(), snapshot);
+ if (!msg) {
+ return -1;
+ }
+
+ chan->endpoint_forward =
+ stasis_forward_all(chan->topic, ast_endpoint_topic(endpoint));
+
+ if (chan->endpoint_forward == NULL) {
+ return -1;
+ }
+
+ stasis_publish(ast_endpoint_topic(endpoint), msg);
+
+ return 0;
+}
+
void ast_channel_internal_setup_topics(struct ast_channel *chan)
{
const char *topic_name = chan->uniqueid;
diff --git a/main/endpoints.c b/main/endpoints.c
new file mode 100644
index 000000000..95397f960
--- /dev/null
+++ b/main/endpoints.c
@@ -0,0 +1,338 @@
+/*
+ * Asterisk -- An open source telephony toolkit.
+ *
+ * Copyright (C) 2013, Digium, Inc.
+ *
+ * David M. Lee, II <dlee@digium.com>
+ *
+ * See http://www.asterisk.org for more information about
+ * the Asterisk project. Please do not directly contact
+ * any of the maintainers of this project for assistance;
+ * the project provides a web site, mailing lists and IRC
+ * channels for your use.
+ *
+ * This program is free software, distributed under the terms of
+ * the GNU General Public License Version 2. See the LICENSE file
+ * at the top of the source tree.
+ */
+
+/*! \file
+ *
+ * \brief Asterisk endpoint API.
+ *
+ * \author David M. Lee, II <dlee@digium.com>
+ */
+
+/*** MODULEINFO
+ <support_level>core</support_level>
+ ***/
+
+#include "asterisk.h"
+
+ASTERISK_FILE_VERSION(__FILE__, "$Revision$")
+
+#include "asterisk/astobj2.h"
+#include "asterisk/endpoints.h"
+#include "asterisk/stasis.h"
+#include "asterisk/stasis_channels.h"
+#include "asterisk/stasis_endpoints.h"
+#include "asterisk/stasis_message_router.h"
+#include "asterisk/stringfields.h"
+
+/*! Buckets for endpoint->channel mappings. Keep it prime! */
+#define ENDPOINT_BUCKETS 127
+
+struct ast_endpoint {
+ AST_DECLARE_STRING_FIELDS(
+ AST_STRING_FIELD(tech); /*!< Technology (SIP, IAX2, etc.). */
+ AST_STRING_FIELD(resource); /*!< Name, unique to the tech. */
+ AST_STRING_FIELD(id); /*!< tech/resource id */
+ );
+ /*! Endpoint's current state */
+ enum ast_endpoint_state state;
+ /*!
+ * \brief Max channels for this endpoint. -1 means unlimited or unknown.
+ *
+ * Note that this simply documents the limits of an endpoint, and does
+ * nothing to try to enforce the limit.
+ */
+ int max_channels;
+ /*! Topic for this endpoint's messages */
+ struct stasis_topic *topic;
+ /*!
+ * Forwarding subscription sending messages to ast_endpoint_topic_all()
+ */
+ struct stasis_subscription *forward;
+ /*! Router for handling this endpoint's messages */
+ struct stasis_message_router *router;
+ /*! ast_str_container of channels associated with this endpoint */
+ struct ao2_container *channel_ids;
+};
+
+const char *ast_endpoint_state_to_string(enum ast_endpoint_state state)
+{
+ switch (state) {
+ case AST_ENDPOINT_UNKNOWN:
+ return "unknown";
+ case AST_ENDPOINT_OFFLINE:
+ return "offline";
+ case AST_ENDPOINT_ONLINE:
+ return "online";
+ }
+ return "?";
+}
+
+static void endpoint_publish_snapshot(struct ast_endpoint *endpoint)
+{
+ RAII_VAR(struct ast_endpoint_snapshot *, snapshot, NULL, ao2_cleanup);
+ RAII_VAR(struct stasis_message *, message, NULL, ao2_cleanup);
+
+ ast_assert(endpoint != NULL);
+ ast_assert(endpoint->topic != NULL);
+
+ snapshot = ast_endpoint_snapshot_create(endpoint);
+ if (!snapshot) {
+ return;
+ }
+ message = stasis_message_create(ast_endpoint_snapshot_type(), snapshot);
+ if (!message) {
+ return;
+ }
+ stasis_publish(endpoint->topic, message);
+}
+
+static void endpoint_dtor(void *obj)
+{
+ struct ast_endpoint *endpoint = obj;
+
+ /* The router should be shut down already */
+ ast_assert(endpoint->router == NULL);
+
+ stasis_unsubscribe(endpoint->forward);
+ endpoint->forward = NULL;
+
+ ao2_cleanup(endpoint->topic);
+ endpoint->topic = NULL;
+
+ ast_string_field_free_memory(endpoint);
+}
+
+static void endpoint_channel_snapshot(void *data,
+ struct stasis_subscription *sub, struct stasis_topic *topic,
+ struct stasis_message *message)
+{
+ struct ast_endpoint *endpoint = data;
+ struct ast_channel_snapshot *snapshot = stasis_message_data(message);
+ RAII_VAR(char *, existing_id, NULL, ao2_cleanup);
+ int publish = 0;
+
+ ast_assert(endpoint != NULL);
+ ast_assert(snapshot != NULL);
+
+ ao2_lock(endpoint);
+ existing_id = ao2_find(endpoint->channel_ids, snapshot->uniqueid,
+ OBJ_POINTER);
+ if (!existing_id) {
+ ast_str_container_add(endpoint->channel_ids,
+ snapshot->uniqueid);
+ publish = 1;
+ }
+ ao2_unlock(endpoint);
+ if (publish) {
+ endpoint_publish_snapshot(endpoint);
+ }
+}
+
+static void endpoint_cache_clear(void *data,
+ struct stasis_subscription *sub, struct stasis_topic *topic,
+ struct stasis_message *message)
+{
+ struct ast_endpoint *endpoint = data;
+ struct stasis_cache_clear *clear = stasis_message_data(message);
+
+ ast_assert(endpoint != NULL);
+ ast_assert(clear != NULL);
+
+ ao2_lock(endpoint);
+ ao2_find(endpoint->channel_ids, clear->id, OBJ_POINTER | OBJ_NODATA | OBJ_UNLINK);
+ ao2_unlock(endpoint);
+ endpoint_publish_snapshot(endpoint);
+}
+
+static void endpoint_default(void *data,
+ struct stasis_subscription *sub, struct stasis_topic *topic,
+ struct stasis_message *message)
+{
+ struct stasis_endpoint *endpoint = data;
+
+ if (stasis_subscription_final_message(sub, message)) {
+ ao2_cleanup(endpoint);
+ }
+}
+
+struct ast_endpoint *ast_endpoint_create(const char *tech, const char *resource)
+{
+ RAII_VAR(struct ast_endpoint *, endpoint, NULL, ao2_cleanup);
+ int r = 0;
+
+ if (ast_strlen_zero(tech)) {
+ ast_log(LOG_ERROR, "Endpoint tech cannot be empty\n");
+ return NULL;
+ }
+
+ if (ast_strlen_zero(resource)) {
+ ast_log(LOG_ERROR, "Endpoint resource cannot be empty\n");
+ return NULL;
+ }
+
+ endpoint = ao2_alloc(sizeof(*endpoint), endpoint_dtor);
+ if (!endpoint) {
+ return NULL;
+ }
+
+ endpoint->max_channels = -1;
+ endpoint->state = AST_ENDPOINT_UNKNOWN;
+
+ if (ast_string_field_init(endpoint, 80) != 0) {
+ return NULL;
+ }
+
+ ast_string_field_set(endpoint, tech, tech);
+ ast_string_field_set(endpoint, resource, resource);
+ ast_string_field_build(endpoint, id, "%s/%s", tech, resource);
+
+ /* All access to channel_ids should be covered by the endpoint's
+ * lock; no extra lock needed. */
+ endpoint->channel_ids = ast_str_container_alloc_options(
+ AO2_ALLOC_OPT_LOCK_NOLOCK, ENDPOINT_BUCKETS);
+ if (!endpoint->channel_ids) {
+ return NULL;
+ }
+
+ endpoint->topic = stasis_topic_create(endpoint->id);
+ if (!endpoint->topic) {
+ return NULL;
+ }
+
+ endpoint->forward =
+ stasis_forward_all(endpoint->topic, ast_endpoint_topic_all());
+ if (!endpoint->forward) {
+ return NULL;
+ }
+
+ endpoint->router = stasis_message_router_create(endpoint->topic);
+ if (!endpoint->router) {
+ return NULL;
+ }
+ r |= stasis_message_router_add(endpoint->router,
+ ast_channel_snapshot_type(), endpoint_channel_snapshot,
+ endpoint);
+ r |= stasis_message_router_add(endpoint->router,
+ stasis_cache_clear_type(), endpoint_cache_clear,
+ endpoint);
+ r |= stasis_message_router_set_default(endpoint->router,
+ endpoint_default, endpoint);
+
+ endpoint_publish_snapshot(endpoint);
+
+ ao2_ref(endpoint, +1);
+ return endpoint;
+}
+
+const char *ast_endpoint_get_tech(const struct ast_endpoint *endpoint)
+{
+ ast_assert(endpoint != NULL);
+ return endpoint->tech;
+}
+
+void ast_endpoint_shutdown(struct ast_endpoint *endpoint)
+{
+ RAII_VAR(struct stasis_message *, message, NULL, ao2_cleanup);
+
+ if (endpoint == NULL) {
+ return;
+ }
+
+ message = stasis_cache_clear_create(ast_endpoint_snapshot_type(), endpoint->id);
+ if (message) {
+ stasis_publish(endpoint->topic, message);
+ }
+
+ stasis_message_router_unsubscribe(endpoint->router);
+ endpoint->router = NULL;
+}
+
+const char *ast_endpoint_get_resource(const struct ast_endpoint *endpoint)
+{
+ return endpoint->resource;
+}
+
+struct stasis_topic *ast_endpoint_topic(struct ast_endpoint *endpoint)
+{
+ return endpoint ? endpoint->topic : ast_endpoint_topic_all();
+}
+
+void ast_endpoint_set_state(struct ast_endpoint *endpoint,
+ enum ast_endpoint_state state)
+{
+ ast_assert(endpoint != NULL);
+ ao2_lock(endpoint);
+ endpoint->state = state;
+ ao2_unlock(endpoint);
+ endpoint_publish_snapshot(endpoint);
+}
+
+void ast_endpoint_set_max_channels(struct ast_endpoint *endpoint,
+ int max_channels)
+{
+ ast_assert(endpoint != NULL);
+ ao2_lock(endpoint);
+ endpoint->max_channels = max_channels;
+ ao2_unlock(endpoint);
+ endpoint_publish_snapshot(endpoint);
+}
+
+static void endpoint_snapshot_dtor(void *obj)
+{
+ struct ast_endpoint_snapshot *snapshot = obj;
+
+ ast_assert(snapshot != NULL);
+ ast_string_field_free_memory(snapshot);
+}
+
+struct ast_endpoint_snapshot *ast_endpoint_snapshot_create(
+ struct ast_endpoint *endpoint)
+{
+ RAII_VAR(struct ast_endpoint_snapshot *, snapshot, NULL, ao2_cleanup);
+ int channel_count;
+ struct ao2_iterator i;
+ void *obj;
+ SCOPED_AO2LOCK(lock, endpoint);
+
+ channel_count = ao2_container_count(endpoint->channel_ids);
+
+ snapshot = ao2_alloc(
+ sizeof(*snapshot) + channel_count * sizeof(char *),
+ endpoint_snapshot_dtor);
+
+ if (ast_string_field_init(snapshot, 80) != 0) {
+ return NULL;
+ }
+
+ ast_string_field_build(snapshot, id, "%s/%s", endpoint->tech,
+ endpoint->resource);
+ ast_string_field_set(snapshot, tech, endpoint->tech);
+ ast_string_field_set(snapshot, resource, endpoint->resource);
+
+ snapshot->state = endpoint->state;
+ snapshot->max_channels = endpoint->max_channels;
+
+ i = ao2_iterator_init(endpoint->channel_ids, 0);
+ while ((obj = ao2_iterator_next(&i))) {
+ RAII_VAR(char *, channel_id, obj, ao2_cleanup);
+ snapshot->channel_ids[snapshot->num_channels++] = channel_id;
+ }
+
+ ao2_ref(snapshot, +1);
+ return snapshot;
+}
diff --git a/main/stasis_cache.c b/main/stasis_cache.c
index 7b6dc82c9..6d07dc3c5 100644
--- a/main/stasis_cache.c
+++ b/main/stasis_cache.c
@@ -240,54 +240,45 @@ struct ao2_container *stasis_cache_dump(struct stasis_caching_topic *caching_top
return cache_dump.cached;
}
-static struct stasis_message_type *__cache_clear_data;
+static struct stasis_message_type *cache_clear_type;
-static struct stasis_message_type *cache_clear_data(void)
+struct stasis_message_type *stasis_cache_clear_type(void)
{
- ast_assert(__cache_clear_data != NULL);
- return __cache_clear_data;
+ ast_assert(cache_clear_type != NULL);
+ return cache_clear_type;
}
-static struct stasis_message_type *__cache_update;
+static struct stasis_message_type *cache_update_type;
struct stasis_message_type *stasis_cache_update_type(void)
{
- ast_assert(__cache_update != NULL);
- return __cache_update;
+ ast_assert(cache_update_type != NULL);
+ return cache_update_type;
}
-struct cache_clear_data {
- struct stasis_message_type *type;
- char *id;
-};
-
-static void cache_clear_data_dtor(void *obj)
+static void cache_clear_dtor(void *obj)
{
- struct cache_clear_data *ev = obj;
- ast_free(ev->id);
- ev->id = NULL;
+ struct stasis_cache_clear *ev = obj;
ao2_cleanup(ev->type);
ev->type = NULL;
}
struct stasis_message *stasis_cache_clear_create(struct stasis_message_type *type, const char *id)
{
- RAII_VAR(struct cache_clear_data *, ev, NULL, ao2_cleanup);
+ RAII_VAR(struct stasis_cache_clear *, ev, NULL, ao2_cleanup);
RAII_VAR(struct stasis_message *, msg, NULL, ao2_cleanup);
- ev = ao2_alloc(sizeof(*ev), cache_clear_data_dtor);
+ ev = ao2_alloc(sizeof(*ev) + strlen(id) + 1, cache_clear_dtor);
if (!ev) {
return NULL;
}
- ev->id = ast_strdup(id);
- if (!ev->id) {
- return NULL;
- }
+ /* strcpy safe */
+ strcpy(ev->id, id);
ao2_ref(type, +1);
ev->type = type;
- msg = stasis_message_create(cache_clear_data(), ev);
+ msg = stasis_message_create(stasis_cache_clear_type(), ev);
if (!msg) {
return NULL;
@@ -363,10 +354,10 @@ static void caching_topic_exec(void *data, struct stasis_subscription *sub, stru
}
/* Handle cache clear event */
- if (cache_clear_data() == stasis_message_type(message)) {
+ if (stasis_cache_clear_type() == stasis_message_type(message)) {
RAII_VAR(struct stasis_message *, old_snapshot, NULL, ao2_cleanup);
RAII_VAR(struct stasis_message *, update, NULL, ao2_cleanup);
- struct cache_clear_data *clear = stasis_message_data(message);
+ struct stasis_cache_clear *clear = stasis_message_data(message);
ast_assert(clear->type != NULL);
ast_assert(clear->id != NULL);
old_snapshot = cache_put(caching_topic, clear->type, clear->id, NULL);
@@ -374,7 +365,9 @@ static void caching_topic_exec(void *data, struct stasis_subscription *sub, stru
update = update_create(topic, old_snapshot, NULL);
stasis_publish(caching_topic->topic, update);
} else {
- ast_log(LOG_ERROR,
+ /* While this could be a problem, it's very likely to
+ * happen with message forwarding */
+ ast_debug(1,
"Attempting to remove an item from the cache that isn't there: %s %s\n",
stasis_message_type_name(clear->type), clear->id);
}
@@ -449,28 +442,28 @@ struct stasis_caching_topic *stasis_caching_topic_create(struct stasis_topic *or
static void stasis_cache_exit(void)
{
- ao2_cleanup(__cache_clear_data);
- __cache_clear_data = NULL;
- ao2_cleanup(__cache_update);
- __cache_update = NULL;
+ ao2_cleanup(cache_clear_type);
+ cache_clear_type = NULL;
+ ao2_cleanup(cache_update_type);
+ cache_update_type = NULL;
}
int stasis_cache_init(void)
{
ast_register_atexit(stasis_cache_exit);
- if (__cache_clear_data || __cache_update) {
+ if (cache_clear_type || cache_update_type) {
ast_log(LOG_ERROR, "Stasis cache double initialized\n");
return -1;
}
- __cache_update = stasis_message_type_create("stasis_cache_update");
- if (!__cache_update) {
+ cache_update_type = stasis_message_type_create("stasis_cache_update");
+ if (!cache_update_type) {
return -1;
}
- __cache_clear_data = stasis_message_type_create("StasisCacheClear");
- if (!__cache_clear_data) {
+ cache_clear_type = stasis_message_type_create("StasisCacheClear");
+ if (!cache_clear_type) {
return -1;
}
return 0;
diff --git a/main/stasis_endpoints.c b/main/stasis_endpoints.c
new file mode 100644
index 000000000..424df66c5
--- /dev/null
+++ b/main/stasis_endpoints.c
@@ -0,0 +1,189 @@
+/*
+ * Asterisk -- An open source telephony toolkit.
+ *
+ * Copyright (C) 2013, Digium, Inc.
+ *
+ * David M. Lee, II <dlee@digium.com>
+ *
+ * See http://www.asterisk.org for more information about
+ * the Asterisk project. Please do not directly contact
+ * any of the maintainers of this project for assistance;
+ * the project provides a web site, mailing lists and IRC
+ * channels for your use.
+ *
+ * This program is free software, distributed under the terms of
+ * the GNU General Public License Version 2. See the LICENSE file
+ * at the top of the source tree.
+ */
+
+/*! \file
+ *
+ * \brief Stasis endpoint API.
+ *
+ * \author David M. Lee, II <dlee@digium.com>
+ */
+
+/*** MODULEINFO
+ <support_level>core</support_level>
+ ***/
+
+#include "asterisk.h"
+
+ASTERISK_FILE_VERSION(__FILE__, "$Revision$")
+
+#include "asterisk/astobj2.h"
+#include "asterisk/stasis.h"
+#include "asterisk/stasis_endpoints.h"
+
+static struct stasis_message_type *endpoint_snapshot_type;
+
+static struct stasis_topic *endpoint_topic_all;
+
+static struct stasis_caching_topic *endpoint_topic_all_cached;
+
+struct stasis_message_type *ast_endpoint_snapshot_type(void)
+{
+ return endpoint_snapshot_type;
+}
+
+struct stasis_topic *ast_endpoint_topic_all(void)
+{
+ return endpoint_topic_all;
+}
+
+struct stasis_caching_topic *ast_endpoint_topic_all_cached(void)
+{
+ return endpoint_topic_all_cached;
+}
+
+struct ast_endpoint_snapshot *ast_endpoint_latest_snapshot(const char *tech,
+ const char *name)
+{
+ RAII_VAR(char *, id, NULL, ast_free);
+ RAII_VAR(struct stasis_message *, msg, NULL, ao2_cleanup);
+ struct ast_endpoint_snapshot *snapshot;
+
+ ast_asprintf(&id, "%s/%s", tech, name);
+ if (!id) {
+ return NULL;
+ }
+
+ msg = stasis_cache_get(ast_endpoint_topic_all_cached(),
+ ast_endpoint_snapshot_type(), id);
+ if (!msg) {
+ return NULL;
+ }
+
+ snapshot = stasis_message_data(msg);
+ ast_assert(snapshot != NULL);
+
+ ao2_ref(snapshot, +1);
+ return snapshot;
+}
+
+/*!
+ * \brief Callback extract a unique identity from a snapshot message.
+ *
+ * This identity is unique to the underlying object of the snapshot, such as the
+ * UniqueId field of a channel.
+ *
+ * \param message Message to extract id from.
+ * \return String representing the snapshot's id.
+ * \return \c NULL if the message_type of the message isn't a handled snapshot.
+ * \since 12
+ */
+static const char *endpoint_snapshot_get_id(struct stasis_message *message)
+{
+ struct ast_endpoint_snapshot *snapshot;
+
+ if (ast_endpoint_snapshot_type() != stasis_message_type(message)) {
+ return NULL;
+ }
+
+ snapshot = stasis_message_data(message);
+
+ return snapshot->id;
+}
+
+
+static void endpoints_stasis_shutdown(void)
+{
+ ao2_cleanup(endpoint_topic_all);
+ endpoint_topic_all = NULL;
+
+ stasis_caching_unsubscribe(endpoint_topic_all_cached);
+ endpoint_topic_all_cached = NULL;
+}
+
+struct ast_json *ast_endpoint_snapshot_to_json(
+ const struct ast_endpoint_snapshot *snapshot)
+{
+ RAII_VAR(struct ast_json *, json, NULL, ast_json_unref);
+ struct ast_json *channel_array;
+ int i;
+
+ json = ast_json_pack("{s: s, s: s, s: s, s: []}",
+ "technology", snapshot->tech,
+ "resource", snapshot->resource,
+ "state", ast_endpoint_state_to_string(snapshot->state),
+ "channels");
+
+ if (json == NULL) {
+ return NULL;
+ }
+
+ if (snapshot->max_channels != -1) {
+ int res = ast_json_object_set(json, "max_channels",
+ ast_json_integer_create(snapshot->max_channels));
+ if (res != 0) {
+ return NULL;
+ }
+ }
+
+ channel_array = ast_json_object_get(json, "channels");
+ ast_assert(channel_array != NULL);
+ for (i = 0; i < snapshot->num_channels; ++i) {
+ int res = ast_json_array_append(channel_array,
+ ast_json_stringf("channel:%s",
+ snapshot->channel_ids[i]));
+ if (res != 0) {
+ return NULL;
+ }
+ }
+
+ return ast_json_ref(json);
+}
+
+int ast_endpoint_stasis_init(void)
+{
+ ast_register_atexit(endpoints_stasis_shutdown);
+
+ if (!endpoint_topic_all) {
+ endpoint_topic_all = stasis_topic_create("endpoint_topic_all");
+ }
+
+ if (!endpoint_topic_all) {
+ return -1;
+ }
+
+ if (!endpoint_topic_all_cached) {
+ endpoint_topic_all_cached =
+ stasis_caching_topic_create(
+ endpoint_topic_all, endpoint_snapshot_get_id);
+ }
+
+ if (!endpoint_topic_all_cached) {
+ return -1;
+ }
+
+ if (!endpoint_snapshot_type) {
+ endpoint_snapshot_type = stasis_message_type_create(
+ "ast_endpoint_snapshot");
+ }
+
+ if (!endpoint_snapshot_type) {
+ return -1;
+ }
+
+ return 0;
+}