diff options
Diffstat (limited to 'main')
-rw-r--r-- | main/asterisk.c | 6 | ||||
-rw-r--r-- | main/astobj2.c | 38 | ||||
-rw-r--r-- | main/channel_internal_api.c | 39 | ||||
-rw-r--r-- | main/endpoints.c | 338 | ||||
-rw-r--r-- | main/stasis_cache.c | 63 | ||||
-rw-r--r-- | main/stasis_endpoints.c | 189 |
6 files changed, 636 insertions, 37 deletions
diff --git a/main/asterisk.c b/main/asterisk.c index b8deab1d4..885797f36 100644 --- a/main/asterisk.c +++ b/main/asterisk.c @@ -242,6 +242,7 @@ int daemon(int, int); /* defined in libresolv of all places */ #include "asterisk/sorcery.h" #include "asterisk/stasis.h" #include "asterisk/json.h" +#include "asterisk/stasis_endpoints.h" #include "../defaults.h" @@ -4174,6 +4175,11 @@ int main(int argc, char *argv[]) exit(1); } + if (ast_endpoint_stasis_init()) { + printf("Endpoint initialization failed.\n%s", term_quit()); + exit(1); + } + ast_makesocket(); sigemptyset(&sigs); sigaddset(&sigs, SIGHUP); diff --git a/main/astobj2.c b/main/astobj2.c index a980ec379..8ccc36cb4 100644 --- a/main/astobj2.c +++ b/main/astobj2.c @@ -5780,3 +5780,41 @@ int astobj2_init(void) return 0; } + +/* XXX TODO BUGBUG and all the other things... + * These functions should eventually be moved elsewhere, but the utils folder + * won't compile with them in strings.h + */ +static int str_hash(const void *obj, const int flags) +{ + return ast_str_hash(obj); +} + +static int str_cmp(void *lhs, void *rhs, int flags) +{ + return strcmp(lhs, rhs) ? 0 : CMP_MATCH; +} + +struct ao2_container *ast_str_container_alloc_options(enum ao2_container_opts opts, int buckets) +{ + return ao2_container_alloc_options(opts, buckets, str_hash, str_cmp); +} + +int ast_str_container_add(struct ao2_container *str_container, const char *add) +{ + RAII_VAR(char *, ao2_add, ao2_alloc(strlen(add) + 1, NULL), ao2_cleanup); + + if (!ao2_add) { + return -1; + } + + /* safe strcpy */ + strcpy(ao2_add, add); + ao2_link(str_container, ao2_add); + return 0; +} + +void ast_str_container_remove(struct ao2_container *str_container, const char *remove) +{ + ao2_find(str_container, remove, OBJ_KEY | OBJ_NODATA | OBJ_UNLINK); +} diff --git a/main/channel_internal_api.c b/main/channel_internal_api.c index cb9ed6744..f3293d59b 100644 --- a/main/channel_internal_api.c +++ b/main/channel_internal_api.c @@ -39,11 +39,13 @@ ASTERISK_FILE_VERSION(__FILE__, "$Revision$") #include <fcntl.h> #include "asterisk/channel.h" -#include "asterisk/stringfields.h" +#include "asterisk/channel_internal.h" #include "asterisk/data.h" +#include "asterisk/endpoints.h" #include "asterisk/indications.h" #include "asterisk/stasis_channels.h" -#include "asterisk/channel_internal.h" +#include "asterisk/stasis_endpoints.h" +#include "asterisk/stringfields.h" #include "asterisk/test.h" /*! @@ -198,6 +200,7 @@ struct ast_channel { struct timeval sending_dtmf_tv; /*!< The time this channel started sending the current digit. (Invalid if sending_dtmf_digit is zero.) */ struct stasis_topic *topic; /*!< Topic for all channel's events */ struct stasis_subscription *forwarder; /*!< Subscription for event forwarding to all topic */ + struct stasis_subscription *endpoint_forward; /*!< Subscription for event forwarding to endpoint's topic */ }; /* AST_DATA definitions, which will probably have to be re-thought since the channel will be opaque */ @@ -1369,6 +1372,7 @@ void ast_channel_internal_cleanup(struct ast_channel *chan) ast_string_field_free_memory(chan); chan->forwarder = stasis_unsubscribe(chan->forwarder); + chan->endpoint_forward = stasis_unsubscribe(chan->endpoint_forward); ao2_cleanup(chan->topic); chan->topic = NULL; @@ -1389,6 +1393,37 @@ struct stasis_topic *ast_channel_topic(struct ast_channel *chan) return chan ? chan->topic : ast_channel_topic_all(); } +int ast_endpoint_add_channel(struct ast_endpoint *endpoint, + struct ast_channel *chan) +{ + RAII_VAR(struct ast_channel_snapshot *, snapshot, NULL, ao2_cleanup); + RAII_VAR(struct stasis_message *, msg, NULL, ao2_cleanup); + + ast_assert(chan != NULL); + ast_assert(endpoint != NULL); + + snapshot = ast_channel_snapshot_create(chan); + if (!snapshot) { + return -1; + } + + msg = stasis_message_create(ast_channel_snapshot_type(), snapshot); + if (!msg) { + return -1; + } + + chan->endpoint_forward = + stasis_forward_all(chan->topic, ast_endpoint_topic(endpoint)); + + if (chan->endpoint_forward == NULL) { + return -1; + } + + stasis_publish(ast_endpoint_topic(endpoint), msg); + + return 0; +} + void ast_channel_internal_setup_topics(struct ast_channel *chan) { const char *topic_name = chan->uniqueid; diff --git a/main/endpoints.c b/main/endpoints.c new file mode 100644 index 000000000..95397f960 --- /dev/null +++ b/main/endpoints.c @@ -0,0 +1,338 @@ +/* + * Asterisk -- An open source telephony toolkit. + * + * Copyright (C) 2013, Digium, Inc. + * + * David M. Lee, II <dlee@digium.com> + * + * See http://www.asterisk.org for more information about + * the Asterisk project. Please do not directly contact + * any of the maintainers of this project for assistance; + * the project provides a web site, mailing lists and IRC + * channels for your use. + * + * This program is free software, distributed under the terms of + * the GNU General Public License Version 2. See the LICENSE file + * at the top of the source tree. + */ + +/*! \file + * + * \brief Asterisk endpoint API. + * + * \author David M. Lee, II <dlee@digium.com> + */ + +/*** MODULEINFO + <support_level>core</support_level> + ***/ + +#include "asterisk.h" + +ASTERISK_FILE_VERSION(__FILE__, "$Revision$") + +#include "asterisk/astobj2.h" +#include "asterisk/endpoints.h" +#include "asterisk/stasis.h" +#include "asterisk/stasis_channels.h" +#include "asterisk/stasis_endpoints.h" +#include "asterisk/stasis_message_router.h" +#include "asterisk/stringfields.h" + +/*! Buckets for endpoint->channel mappings. Keep it prime! */ +#define ENDPOINT_BUCKETS 127 + +struct ast_endpoint { + AST_DECLARE_STRING_FIELDS( + AST_STRING_FIELD(tech); /*!< Technology (SIP, IAX2, etc.). */ + AST_STRING_FIELD(resource); /*!< Name, unique to the tech. */ + AST_STRING_FIELD(id); /*!< tech/resource id */ + ); + /*! Endpoint's current state */ + enum ast_endpoint_state state; + /*! + * \brief Max channels for this endpoint. -1 means unlimited or unknown. + * + * Note that this simply documents the limits of an endpoint, and does + * nothing to try to enforce the limit. + */ + int max_channels; + /*! Topic for this endpoint's messages */ + struct stasis_topic *topic; + /*! + * Forwarding subscription sending messages to ast_endpoint_topic_all() + */ + struct stasis_subscription *forward; + /*! Router for handling this endpoint's messages */ + struct stasis_message_router *router; + /*! ast_str_container of channels associated with this endpoint */ + struct ao2_container *channel_ids; +}; + +const char *ast_endpoint_state_to_string(enum ast_endpoint_state state) +{ + switch (state) { + case AST_ENDPOINT_UNKNOWN: + return "unknown"; + case AST_ENDPOINT_OFFLINE: + return "offline"; + case AST_ENDPOINT_ONLINE: + return "online"; + } + return "?"; +} + +static void endpoint_publish_snapshot(struct ast_endpoint *endpoint) +{ + RAII_VAR(struct ast_endpoint_snapshot *, snapshot, NULL, ao2_cleanup); + RAII_VAR(struct stasis_message *, message, NULL, ao2_cleanup); + + ast_assert(endpoint != NULL); + ast_assert(endpoint->topic != NULL); + + snapshot = ast_endpoint_snapshot_create(endpoint); + if (!snapshot) { + return; + } + message = stasis_message_create(ast_endpoint_snapshot_type(), snapshot); + if (!message) { + return; + } + stasis_publish(endpoint->topic, message); +} + +static void endpoint_dtor(void *obj) +{ + struct ast_endpoint *endpoint = obj; + + /* The router should be shut down already */ + ast_assert(endpoint->router == NULL); + + stasis_unsubscribe(endpoint->forward); + endpoint->forward = NULL; + + ao2_cleanup(endpoint->topic); + endpoint->topic = NULL; + + ast_string_field_free_memory(endpoint); +} + +static void endpoint_channel_snapshot(void *data, + struct stasis_subscription *sub, struct stasis_topic *topic, + struct stasis_message *message) +{ + struct ast_endpoint *endpoint = data; + struct ast_channel_snapshot *snapshot = stasis_message_data(message); + RAII_VAR(char *, existing_id, NULL, ao2_cleanup); + int publish = 0; + + ast_assert(endpoint != NULL); + ast_assert(snapshot != NULL); + + ao2_lock(endpoint); + existing_id = ao2_find(endpoint->channel_ids, snapshot->uniqueid, + OBJ_POINTER); + if (!existing_id) { + ast_str_container_add(endpoint->channel_ids, + snapshot->uniqueid); + publish = 1; + } + ao2_unlock(endpoint); + if (publish) { + endpoint_publish_snapshot(endpoint); + } +} + +static void endpoint_cache_clear(void *data, + struct stasis_subscription *sub, struct stasis_topic *topic, + struct stasis_message *message) +{ + struct ast_endpoint *endpoint = data; + struct stasis_cache_clear *clear = stasis_message_data(message); + + ast_assert(endpoint != NULL); + ast_assert(clear != NULL); + + ao2_lock(endpoint); + ao2_find(endpoint->channel_ids, clear->id, OBJ_POINTER | OBJ_NODATA | OBJ_UNLINK); + ao2_unlock(endpoint); + endpoint_publish_snapshot(endpoint); +} + +static void endpoint_default(void *data, + struct stasis_subscription *sub, struct stasis_topic *topic, + struct stasis_message *message) +{ + struct stasis_endpoint *endpoint = data; + + if (stasis_subscription_final_message(sub, message)) { + ao2_cleanup(endpoint); + } +} + +struct ast_endpoint *ast_endpoint_create(const char *tech, const char *resource) +{ + RAII_VAR(struct ast_endpoint *, endpoint, NULL, ao2_cleanup); + int r = 0; + + if (ast_strlen_zero(tech)) { + ast_log(LOG_ERROR, "Endpoint tech cannot be empty\n"); + return NULL; + } + + if (ast_strlen_zero(resource)) { + ast_log(LOG_ERROR, "Endpoint resource cannot be empty\n"); + return NULL; + } + + endpoint = ao2_alloc(sizeof(*endpoint), endpoint_dtor); + if (!endpoint) { + return NULL; + } + + endpoint->max_channels = -1; + endpoint->state = AST_ENDPOINT_UNKNOWN; + + if (ast_string_field_init(endpoint, 80) != 0) { + return NULL; + } + + ast_string_field_set(endpoint, tech, tech); + ast_string_field_set(endpoint, resource, resource); + ast_string_field_build(endpoint, id, "%s/%s", tech, resource); + + /* All access to channel_ids should be covered by the endpoint's + * lock; no extra lock needed. */ + endpoint->channel_ids = ast_str_container_alloc_options( + AO2_ALLOC_OPT_LOCK_NOLOCK, ENDPOINT_BUCKETS); + if (!endpoint->channel_ids) { + return NULL; + } + + endpoint->topic = stasis_topic_create(endpoint->id); + if (!endpoint->topic) { + return NULL; + } + + endpoint->forward = + stasis_forward_all(endpoint->topic, ast_endpoint_topic_all()); + if (!endpoint->forward) { + return NULL; + } + + endpoint->router = stasis_message_router_create(endpoint->topic); + if (!endpoint->router) { + return NULL; + } + r |= stasis_message_router_add(endpoint->router, + ast_channel_snapshot_type(), endpoint_channel_snapshot, + endpoint); + r |= stasis_message_router_add(endpoint->router, + stasis_cache_clear_type(), endpoint_cache_clear, + endpoint); + r |= stasis_message_router_set_default(endpoint->router, + endpoint_default, endpoint); + + endpoint_publish_snapshot(endpoint); + + ao2_ref(endpoint, +1); + return endpoint; +} + +const char *ast_endpoint_get_tech(const struct ast_endpoint *endpoint) +{ + ast_assert(endpoint != NULL); + return endpoint->tech; +} + +void ast_endpoint_shutdown(struct ast_endpoint *endpoint) +{ + RAII_VAR(struct stasis_message *, message, NULL, ao2_cleanup); + + if (endpoint == NULL) { + return; + } + + message = stasis_cache_clear_create(ast_endpoint_snapshot_type(), endpoint->id); + if (message) { + stasis_publish(endpoint->topic, message); + } + + stasis_message_router_unsubscribe(endpoint->router); + endpoint->router = NULL; +} + +const char *ast_endpoint_get_resource(const struct ast_endpoint *endpoint) +{ + return endpoint->resource; +} + +struct stasis_topic *ast_endpoint_topic(struct ast_endpoint *endpoint) +{ + return endpoint ? endpoint->topic : ast_endpoint_topic_all(); +} + +void ast_endpoint_set_state(struct ast_endpoint *endpoint, + enum ast_endpoint_state state) +{ + ast_assert(endpoint != NULL); + ao2_lock(endpoint); + endpoint->state = state; + ao2_unlock(endpoint); + endpoint_publish_snapshot(endpoint); +} + +void ast_endpoint_set_max_channels(struct ast_endpoint *endpoint, + int max_channels) +{ + ast_assert(endpoint != NULL); + ao2_lock(endpoint); + endpoint->max_channels = max_channels; + ao2_unlock(endpoint); + endpoint_publish_snapshot(endpoint); +} + +static void endpoint_snapshot_dtor(void *obj) +{ + struct ast_endpoint_snapshot *snapshot = obj; + + ast_assert(snapshot != NULL); + ast_string_field_free_memory(snapshot); +} + +struct ast_endpoint_snapshot *ast_endpoint_snapshot_create( + struct ast_endpoint *endpoint) +{ + RAII_VAR(struct ast_endpoint_snapshot *, snapshot, NULL, ao2_cleanup); + int channel_count; + struct ao2_iterator i; + void *obj; + SCOPED_AO2LOCK(lock, endpoint); + + channel_count = ao2_container_count(endpoint->channel_ids); + + snapshot = ao2_alloc( + sizeof(*snapshot) + channel_count * sizeof(char *), + endpoint_snapshot_dtor); + + if (ast_string_field_init(snapshot, 80) != 0) { + return NULL; + } + + ast_string_field_build(snapshot, id, "%s/%s", endpoint->tech, + endpoint->resource); + ast_string_field_set(snapshot, tech, endpoint->tech); + ast_string_field_set(snapshot, resource, endpoint->resource); + + snapshot->state = endpoint->state; + snapshot->max_channels = endpoint->max_channels; + + i = ao2_iterator_init(endpoint->channel_ids, 0); + while ((obj = ao2_iterator_next(&i))) { + RAII_VAR(char *, channel_id, obj, ao2_cleanup); + snapshot->channel_ids[snapshot->num_channels++] = channel_id; + } + + ao2_ref(snapshot, +1); + return snapshot; +} diff --git a/main/stasis_cache.c b/main/stasis_cache.c index 7b6dc82c9..6d07dc3c5 100644 --- a/main/stasis_cache.c +++ b/main/stasis_cache.c @@ -240,54 +240,45 @@ struct ao2_container *stasis_cache_dump(struct stasis_caching_topic *caching_top return cache_dump.cached; } -static struct stasis_message_type *__cache_clear_data; +static struct stasis_message_type *cache_clear_type; -static struct stasis_message_type *cache_clear_data(void) +struct stasis_message_type *stasis_cache_clear_type(void) { - ast_assert(__cache_clear_data != NULL); - return __cache_clear_data; + ast_assert(cache_clear_type != NULL); + return cache_clear_type; } -static struct stasis_message_type *__cache_update; +static struct stasis_message_type *cache_update_type; struct stasis_message_type *stasis_cache_update_type(void) { - ast_assert(__cache_update != NULL); - return __cache_update; + ast_assert(cache_update_type != NULL); + return cache_update_type; } -struct cache_clear_data { - struct stasis_message_type *type; - char *id; -}; - -static void cache_clear_data_dtor(void *obj) +static void cache_clear_dtor(void *obj) { - struct cache_clear_data *ev = obj; - ast_free(ev->id); - ev->id = NULL; + struct stasis_cache_clear *ev = obj; ao2_cleanup(ev->type); ev->type = NULL; } struct stasis_message *stasis_cache_clear_create(struct stasis_message_type *type, const char *id) { - RAII_VAR(struct cache_clear_data *, ev, NULL, ao2_cleanup); + RAII_VAR(struct stasis_cache_clear *, ev, NULL, ao2_cleanup); RAII_VAR(struct stasis_message *, msg, NULL, ao2_cleanup); - ev = ao2_alloc(sizeof(*ev), cache_clear_data_dtor); + ev = ao2_alloc(sizeof(*ev) + strlen(id) + 1, cache_clear_dtor); if (!ev) { return NULL; } - ev->id = ast_strdup(id); - if (!ev->id) { - return NULL; - } + /* strcpy safe */ + strcpy(ev->id, id); ao2_ref(type, +1); ev->type = type; - msg = stasis_message_create(cache_clear_data(), ev); + msg = stasis_message_create(stasis_cache_clear_type(), ev); if (!msg) { return NULL; @@ -363,10 +354,10 @@ static void caching_topic_exec(void *data, struct stasis_subscription *sub, stru } /* Handle cache clear event */ - if (cache_clear_data() == stasis_message_type(message)) { + if (stasis_cache_clear_type() == stasis_message_type(message)) { RAII_VAR(struct stasis_message *, old_snapshot, NULL, ao2_cleanup); RAII_VAR(struct stasis_message *, update, NULL, ao2_cleanup); - struct cache_clear_data *clear = stasis_message_data(message); + struct stasis_cache_clear *clear = stasis_message_data(message); ast_assert(clear->type != NULL); ast_assert(clear->id != NULL); old_snapshot = cache_put(caching_topic, clear->type, clear->id, NULL); @@ -374,7 +365,9 @@ static void caching_topic_exec(void *data, struct stasis_subscription *sub, stru update = update_create(topic, old_snapshot, NULL); stasis_publish(caching_topic->topic, update); } else { - ast_log(LOG_ERROR, + /* While this could be a problem, it's very likely to + * happen with message forwarding */ + ast_debug(1, "Attempting to remove an item from the cache that isn't there: %s %s\n", stasis_message_type_name(clear->type), clear->id); } @@ -449,28 +442,28 @@ struct stasis_caching_topic *stasis_caching_topic_create(struct stasis_topic *or static void stasis_cache_exit(void) { - ao2_cleanup(__cache_clear_data); - __cache_clear_data = NULL; - ao2_cleanup(__cache_update); - __cache_update = NULL; + ao2_cleanup(cache_clear_type); + cache_clear_type = NULL; + ao2_cleanup(cache_update_type); + cache_update_type = NULL; } int stasis_cache_init(void) { ast_register_atexit(stasis_cache_exit); - if (__cache_clear_data || __cache_update) { + if (cache_clear_type || cache_update_type) { ast_log(LOG_ERROR, "Stasis cache double initialized\n"); return -1; } - __cache_update = stasis_message_type_create("stasis_cache_update"); - if (!__cache_update) { + cache_update_type = stasis_message_type_create("stasis_cache_update"); + if (!cache_update_type) { return -1; } - __cache_clear_data = stasis_message_type_create("StasisCacheClear"); - if (!__cache_clear_data) { + cache_clear_type = stasis_message_type_create("StasisCacheClear"); + if (!cache_clear_type) { return -1; } return 0; diff --git a/main/stasis_endpoints.c b/main/stasis_endpoints.c new file mode 100644 index 000000000..424df66c5 --- /dev/null +++ b/main/stasis_endpoints.c @@ -0,0 +1,189 @@ +/* + * Asterisk -- An open source telephony toolkit. + * + * Copyright (C) 2013, Digium, Inc. + * + * David M. Lee, II <dlee@digium.com> + * + * See http://www.asterisk.org for more information about + * the Asterisk project. Please do not directly contact + * any of the maintainers of this project for assistance; + * the project provides a web site, mailing lists and IRC + * channels for your use. + * + * This program is free software, distributed under the terms of + * the GNU General Public License Version 2. See the LICENSE file + * at the top of the source tree. + */ + +/*! \file + * + * \brief Stasis endpoint API. + * + * \author David M. Lee, II <dlee@digium.com> + */ + +/*** MODULEINFO + <support_level>core</support_level> + ***/ + +#include "asterisk.h" + +ASTERISK_FILE_VERSION(__FILE__, "$Revision$") + +#include "asterisk/astobj2.h" +#include "asterisk/stasis.h" +#include "asterisk/stasis_endpoints.h" + +static struct stasis_message_type *endpoint_snapshot_type; + +static struct stasis_topic *endpoint_topic_all; + +static struct stasis_caching_topic *endpoint_topic_all_cached; + +struct stasis_message_type *ast_endpoint_snapshot_type(void) +{ + return endpoint_snapshot_type; +} + +struct stasis_topic *ast_endpoint_topic_all(void) +{ + return endpoint_topic_all; +} + +struct stasis_caching_topic *ast_endpoint_topic_all_cached(void) +{ + return endpoint_topic_all_cached; +} + +struct ast_endpoint_snapshot *ast_endpoint_latest_snapshot(const char *tech, + const char *name) +{ + RAII_VAR(char *, id, NULL, ast_free); + RAII_VAR(struct stasis_message *, msg, NULL, ao2_cleanup); + struct ast_endpoint_snapshot *snapshot; + + ast_asprintf(&id, "%s/%s", tech, name); + if (!id) { + return NULL; + } + + msg = stasis_cache_get(ast_endpoint_topic_all_cached(), + ast_endpoint_snapshot_type(), id); + if (!msg) { + return NULL; + } + + snapshot = stasis_message_data(msg); + ast_assert(snapshot != NULL); + + ao2_ref(snapshot, +1); + return snapshot; +} + +/*! + * \brief Callback extract a unique identity from a snapshot message. + * + * This identity is unique to the underlying object of the snapshot, such as the + * UniqueId field of a channel. + * + * \param message Message to extract id from. + * \return String representing the snapshot's id. + * \return \c NULL if the message_type of the message isn't a handled snapshot. + * \since 12 + */ +static const char *endpoint_snapshot_get_id(struct stasis_message *message) +{ + struct ast_endpoint_snapshot *snapshot; + + if (ast_endpoint_snapshot_type() != stasis_message_type(message)) { + return NULL; + } + + snapshot = stasis_message_data(message); + + return snapshot->id; +} + + +static void endpoints_stasis_shutdown(void) +{ + ao2_cleanup(endpoint_topic_all); + endpoint_topic_all = NULL; + + stasis_caching_unsubscribe(endpoint_topic_all_cached); + endpoint_topic_all_cached = NULL; +} + +struct ast_json *ast_endpoint_snapshot_to_json( + const struct ast_endpoint_snapshot *snapshot) +{ + RAII_VAR(struct ast_json *, json, NULL, ast_json_unref); + struct ast_json *channel_array; + int i; + + json = ast_json_pack("{s: s, s: s, s: s, s: []}", + "technology", snapshot->tech, + "resource", snapshot->resource, + "state", ast_endpoint_state_to_string(snapshot->state), + "channels"); + + if (json == NULL) { + return NULL; + } + + if (snapshot->max_channels != -1) { + int res = ast_json_object_set(json, "max_channels", + ast_json_integer_create(snapshot->max_channels)); + if (res != 0) { + return NULL; + } + } + + channel_array = ast_json_object_get(json, "channels"); + ast_assert(channel_array != NULL); + for (i = 0; i < snapshot->num_channels; ++i) { + int res = ast_json_array_append(channel_array, + ast_json_stringf("channel:%s", + snapshot->channel_ids[i])); + if (res != 0) { + return NULL; + } + } + + return ast_json_ref(json); +} + +int ast_endpoint_stasis_init(void) +{ + ast_register_atexit(endpoints_stasis_shutdown); + + if (!endpoint_topic_all) { + endpoint_topic_all = stasis_topic_create("endpoint_topic_all"); + } + + if (!endpoint_topic_all) { + return -1; + } + + if (!endpoint_topic_all_cached) { + endpoint_topic_all_cached = + stasis_caching_topic_create( + endpoint_topic_all, endpoint_snapshot_get_id); + } + + if (!endpoint_topic_all_cached) { + return -1; + } + + if (!endpoint_snapshot_type) { + endpoint_snapshot_type = stasis_message_type_create( + "ast_endpoint_snapshot"); + } + + if (!endpoint_snapshot_type) { + return -1; + } + + return 0; +} |