diff options
-rw-r--r-- | channels/chan_sip.c | 15 | ||||
-rw-r--r-- | channels/sip/include/sip.h | 2 | ||||
-rw-r--r-- | include/asterisk/astobj2.h | 51 | ||||
-rw-r--r-- | include/asterisk/endpoints.h | 170 | ||||
-rw-r--r-- | include/asterisk/stasis.h | 16 | ||||
-rw-r--r-- | include/asterisk/stasis_endpoints.h | 154 | ||||
-rw-r--r-- | include/asterisk/stasis_test.h | 142 | ||||
-rw-r--r-- | main/asterisk.c | 6 | ||||
-rw-r--r-- | main/astobj2.c | 38 | ||||
-rw-r--r-- | main/channel_internal_api.c | 39 | ||||
-rw-r--r-- | main/endpoints.c | 338 | ||||
-rw-r--r-- | main/stasis_cache.c | 63 | ||||
-rw-r--r-- | main/stasis_endpoints.c | 189 | ||||
-rw-r--r-- | res/res_stasis_http_endpoints.c | 47 | ||||
-rw-r--r-- | res/res_stasis_test.c | 291 | ||||
-rw-r--r-- | res/res_stasis_test.exports.in | 6 | ||||
-rw-r--r-- | res/stasis_http/resource_endpoints.c | 134 | ||||
-rw-r--r-- | res/stasis_http/resource_endpoints.h | 21 | ||||
-rw-r--r-- | rest-api/api-docs/endpoints.json | 37 | ||||
-rw-r--r-- | tests/test_endpoints.c | 158 | ||||
-rw-r--r-- | tests/test_stasis_endpoints.c | 307 |
21 files changed, 2154 insertions, 70 deletions
diff --git a/channels/chan_sip.c b/channels/chan_sip.c index 8cd739e49..7cf5f37dd 100644 --- a/channels/chan_sip.c +++ b/channels/chan_sip.c @@ -295,6 +295,7 @@ ASTERISK_FILE_VERSION(__FILE__, "$Revision$") #include "sip/include/security_events.h" #include "asterisk/sip_api.h" #include "asterisk/app.h" +#include "asterisk/stasis_endpoints.h" /*** DOCUMENTATION <application name="SIPDtmfMode" language="en_US"> @@ -5326,6 +5327,9 @@ static void sip_destroy_peer(struct sip_peer *peer) peer->caps = ast_format_cap_destroy(peer->caps); ast_rtp_dtls_cfg_free(&peer->dtls_cfg); + + ast_endpoint_shutdown(peer->endpoint); + peer->endpoint = NULL; } /*! \brief Update peer data in database (if used) */ @@ -8012,6 +8016,14 @@ static struct ast_channel *sip_new(struct sip_pvt *i, int state, const char *tit return NULL; } + if (i->relatedpeer) { + if (ast_endpoint_add_channel(i->relatedpeer->endpoint, tmp)) { + ast_channel_unref(tmp); + sip_pvt_lock(i); + return NULL; + } + } + /* If we sent in a callid, bind it to the channel. */ if (callid) { ast_channel_callid_set(tmp, callid); @@ -30770,6 +30782,9 @@ static struct sip_peer *build_peer(const char *name, struct ast_variable *v, str if (!(peer = ao2_t_alloc(sizeof(*peer), sip_destroy_peer_fn, "allocate a peer struct"))) { return NULL; } + if (!(peer->endpoint = ast_endpoint_create("SIP", name))) { + return NULL; + } if (!(peer->caps = ast_format_cap_alloc_nolock())) { ao2_t_ref(peer, -1, "failed to allocate format capabilities, drop peer"); return NULL; diff --git a/channels/sip/include/sip.h b/channels/sip/include/sip.h index 090f1bcb2..d852ee945 100644 --- a/channels/sip/include/sip.h +++ b/channels/sip/include/sip.h @@ -1377,6 +1377,8 @@ struct sip_peer { unsigned int disallowed_methods; struct ast_cc_config_params *cc_params; + struct ast_endpoint *endpoint; + struct ast_rtp_dtls_cfg dtls_cfg; }; diff --git a/include/asterisk/astobj2.h b/include/asterisk/astobj2.h index 70a9041c3..502cc744c 100644 --- a/include/asterisk/astobj2.h +++ b/include/asterisk/astobj2.h @@ -1890,4 +1890,55 @@ void __ao2_cleanup_debug(void *obj, const char *file, int line, const char *func #define ao2_cleanup(obj) __ao2_cleanup(obj) #endif void ao2_iterator_cleanup(struct ao2_iterator *iter); + + +/* XXX TODO BUGBUG and all the other things... + * These functions should eventually be moved elsewhere, but the utils folder + * won't compile with them in strings.h + */ + +/*! + * \since 12 + * \brief Allocates a hash container for bare strings + * + * \param buckets The number of buckets to use for the hash container + * + * \retval AO2 container for strings + * \retval NULL if allocation failed + */ +#define ast_str_container_alloc(buckets) ast_str_container_alloc_options(AO2_ALLOC_OPT_LOCK_MUTEX, buckets) + +/*! + * \since 12 + * \brief Allocates a hash container for bare strings + * + * \param opts Options to be provided to the container + * \param buckets The number of buckets to use for the hash container + * + * \retval AO2 container for strings + * \retval NULL if allocation failed + */ +struct ao2_container *ast_str_container_alloc_options(enum ao2_container_opts opts, int buckets); + +/*! + * \since 12 + * \brief Adds a string to a string container allocated by ast_str_container_alloc + * + * \param str_container The container to which to add a string + * \param add The string to add to the container + * + * \retval zero on success + * \retval non-zero if the operation failed + */ +int ast_str_container_add(struct ao2_container *str_container, const char *add); + +/*! + * \since 12 + * \brief Removes a string from a string container allocated by ast_str_container_alloc + * + * \param str_container The container from which to remove a string + * \param remove The string to remove from the container + */ +void ast_str_container_remove(struct ao2_container *str_container, const char *remove); + #endif /* _ASTERISK_ASTOBJ2_H */ diff --git a/include/asterisk/endpoints.h b/include/asterisk/endpoints.h new file mode 100644 index 000000000..b0be1cf38 --- /dev/null +++ b/include/asterisk/endpoints.h @@ -0,0 +1,170 @@ +/* + * Asterisk -- An open source telephony toolkit. + * + * Copyright (C) 2013, Digium, Inc. + * + * David M. Lee, II <dlee@digium.com> + * + * See http://www.asterisk.org for more information about + * the Asterisk project. Please do not directly contact + * any of the maintainers of this project for assistance; + * the project provides a web site, mailing lists and IRC + * channels for your use. + * + * This program is free software, distributed under the terms of + * the GNU General Public License Version 2. See the LICENSE file + * at the top of the source tree. + */ + +#ifndef _ASTERISK_ENDPOINTS_H +#define _ASTERISK_ENDPOINTS_H + +/*! \file + * + * \brief Endpoint abstractions. + * + * \author David M. Lee, II <dlee@digium.com> + * \since 12 + * + * An endpoint is an external device/system that may offer/accept channels + * to/from Asterisk. While this is a very useful concept for end users, it is + * surprisingly \a not a core concept within Asterisk iteself. + * + * This file defines \ref ast_endpoint as a seperate object, which channel + * drivers may use to expose their concept of an endpoint. As the channel driver + * creates channels, it can use ast_endpoint_add_channel() to associate channels + * to the endpoint. This updates the endpoint appropriately, and forwards all of + * the channel's events to the endpoint's topic. + * + * In order to avoid excessive locking on the endpoint object itself, the + * mutable state is not accessible via getters. Instead, you can create a + * snapshot using ast_endpoint_snapshot_create() to get a consistent snapshot of + * the internal state. + */ + +#include "asterisk/json.h" + +/*! + * \brief Valid states for an endpoint. + * \since 12 + */ +enum ast_endpoint_state { + /*! The endpoint state is not known. */ + AST_ENDPOINT_UNKNOWN, + /*! The endpoint is not available. */ + AST_ENDPOINT_OFFLINE, + /*! The endpoint is available. */ + AST_ENDPOINT_ONLINE, +}; + +/*! + * \brief Returns a string representation of the given endpoint state. + * + * \param state Endpoint state. + * \return String representation of \a state. + * \return \c "?" if \a state isn't in \ref ast_endpoint_state. + */ +const char *ast_endpoint_state_to_string(enum ast_endpoint_state state); + +/*! + * \brief Opaque struct representing an endpoint. + * + * An endpoint is an external device/system that may offer/accept channels + * to/from Asterisk. + * + * \since 12 + */ +struct ast_endpoint; + +/*! + * \brief Create an endpoint struct. + * + * The endpoint is created with a state of UNKNOWN and max_channels of -1 + * (unlimited). While \ref ast_endpoint is AO2 managed, you have to + * shut it down with ast_endpoint_shutdown() to clean up references from + * subscriptions. + * + * \param tech Technology for this endpoint. + * \param resource Name of this endpoint. + * \return Newly created endpoint. + * \return \c NULL on error. + * \since 12 + */ +struct ast_endpoint *ast_endpoint_create(const char *tech, const char *resource); + +/*! + * \brief Shutsdown an \ref ast_endpoint. + * + * \param endpoint Endpoint to shut down. + * \since 12 + */ +void ast_endpoint_shutdown(struct ast_endpoint *endpoint); + +/*! + * \brief Gets the technology of the given endpoint. + * + * This is an immutable string describing the channel provider technology + * (SIP, IAX2, etc.). + * + * \param endpoint The endpoint. + * \return Tec of the endpoint. + * \return \c NULL if endpoint is \c NULL. + * \since 12 + */ +const char *ast_endpoint_get_tech(const struct ast_endpoint *endpoint); + +/*! + * \brief Gets the resource name of the given endpoint. + * + * This is unique for the endpoint's technology, and immutable. + * + * \param endpoint The endpoint. + * \return Resource name of the endpoint. + * \return \c NULL if endpoint is \c NULL. + * \since 12 + */ +const char *ast_endpoint_get_resource(const struct ast_endpoint *endpoint); + +/*! + * \brief Updates the state of the given endpoint. + * + * \param endpoint Endpoint to modify. + * \param state New state. + * \since 12 + */ +void ast_endpoint_set_state(struct ast_endpoint *endpoint, + enum ast_endpoint_state state); + +/*! + * \brief Updates the maximum number of channels an endpoint supports. + * + * Set to -1 for unlimited channels. + * + * \param endpoint Endpoint to modify. + * \param max_channels Maximum number of concurrent channels this endpoint + * supports. + */ +void ast_endpoint_set_max_channels(struct ast_endpoint *endpoint, + int max_channels); + + +/*! + * \since 12 + * \brief Adds a channel to the given endpoint. + * + * This updates the endpoint's statistics, as well as forwarding all of the + * channel's messages to the endpoint's topic. + * + * The channel is automagically removed from the endpoint when it is disposed + * of. + * + * \param endpoint + * \param chan Channel. + * \retval 0 on success. + * \retval Non-zero on error. + */ +int ast_endpoint_add_channel(struct ast_endpoint *endpoint, + struct ast_channel *chan); + + +#endif /* _ASTERISK_ENDPOINTS_H */ diff --git a/include/asterisk/stasis.h b/include/asterisk/stasis.h index 07c3c5c6b..48d51e369 100644 --- a/include/asterisk/stasis.h +++ b/include/asterisk/stasis.h @@ -431,6 +431,22 @@ struct stasis_cache_update { }; /*! + * \brief Cache clear message. + */ +struct stasis_cache_clear { + /*! Type of object being cleared from the cache */ + struct stasis_message_type *type; + /*! Id of the object being cleared from the cache */ + char id[]; +}; + +/*! + * \brief Message type for \ref stasis_cache_clear. + * \since 12 + */ +struct stasis_message_type *stasis_cache_clear_type(void); + +/*! * \brief A message which instructs the caching topic to remove an entry from its cache. * \param type Message type. * \param id Unique id of the snapshot to clear. diff --git a/include/asterisk/stasis_endpoints.h b/include/asterisk/stasis_endpoints.h new file mode 100644 index 000000000..2aeb614e6 --- /dev/null +++ b/include/asterisk/stasis_endpoints.h @@ -0,0 +1,154 @@ +/* + * Asterisk -- An open source telephony toolkit. + * + * Copyright (C) 2013, Digium, Inc. + * + * David M. Lee, II <dlee@digium.com> + * + * See http://www.asterisk.org for more information about + * the Asterisk project. Please do not directly contact + * any of the maintainers of this project for assistance; + * the project provides a web site, mailing lists and IRC + * channels for your use. + * + * This program is free software, distributed under the terms of + * the GNU General Public License Version 2. See the LICENSE file + * at the top of the source tree. + */ + +#ifndef _ASTERISK_STASIS_ENDPOINTS_H +#define _ASTERISK_STASIS_ENDPOINTS_H + +/*! \file + * + * \brief Endpoint abstractions. + * + * \author David M. Lee, II <dlee@digium.com> + * \since 12 + */ + +#include "asterisk/endpoints.h" +#include "asterisk/json.h" +#include "asterisk/stasis.h" +#include "asterisk/stringfields.h" + +/*! \addtogroup StasisTopicsAndMessages + * @{ + */ + +/*! + * \brief A snapshot of an endpoint's state. + * + * The id for an endpoint is tech/resource. The duplication is needed because + * there are several cases where any of the three values would be needed, and + * constantly splitting or reassembling would be a pain. + * + * \since 12 + */ +struct ast_endpoint_snapshot { + AST_DECLARE_STRING_FIELDS( + AST_STRING_FIELD(id); /*!< unique id for this endpoint. */ + AST_STRING_FIELD(tech); /*!< Channel technology */ + AST_STRING_FIELD(resource); /*!< Tech-unique name */ + ); + + /*! Endpoint state */ + enum ast_endpoint_state state; + /*! + * Maximum number of channels this endpoint supports. If the upper limit + * for an endpoint is unknown, this field is set to -1. + */ + int max_channels; + /*! Number of channels currently active on this endpoint */ + int num_channels; + /*! Channel ids */ + char *channel_ids[]; +}; + +/*! + * \brief Blob of data associated with an endpoint. + * + * The blob is actually a JSON object of structured data. It has a "type" field + * which contains the type string describing this blob. + * + * \since 12 + */ +struct ast_endpoint_blob { + struct ast_endpoint_snapshot *snapshot; + struct ast_json *blob; +}; + +/*! + * \brief Message type for \ref ast_endpoint_snapshot. + * \since 12 + */ +struct stasis_message_type *ast_endpoint_snapshot_type(void); + +/*! + * \brief Create a snapshot of an endpoint + * \param endpoint Endpoint to snap a shot of. + * \return Snapshot of the endpoint. + * \return \c NULL on error. + * \since 12 + */ +struct ast_endpoint_snapshot *ast_endpoint_snapshot_create( + struct ast_endpoint *endpoint); + +/*! + * \brief Returns the topic for a specific endpoint. + * + * \param endpoint The endpoint. + * \return The topic for the given endpoint. + * \return ast_endpoint_topic_all() if endpoint is \c NULL. + * \since 12 + */ +struct stasis_topic *ast_endpoint_topic(struct ast_endpoint *endpoint); + +/*! + * \brief Topic for all endpoint releated messages. + * \since 12 + */ +struct stasis_topic *ast_endpoint_topic_all(void); + +/*! + * \brief Cached topic for all endpoint related messages. + * \since 12 + */ +struct stasis_caching_topic *ast_endpoint_topic_all_cached(void); + +/*! + * \brief Retrieve the most recent snapshot for the endpoint with the given + * name. + * + * \param tech Name of the endpoint's technology. + * \param resource Resource name of the endpoint. + * \return Snapshot of the endpoint with the given name. + * \return \c NULL if endpoint is not found, or on error. + * \since 12 + */ +struct ast_endpoint_snapshot *ast_endpoint_latest_snapshot(const char *tech, + const char *resource +); + +/*! @} */ + +/*! + * \brief Build a JSON object from a \ref ast_endpoint_snapshot. + * + * \param snapshot Endpoint snapshot. + * \return JSON object representing endpoint snapshot. + * \return \c NULL on error + */ +struct ast_json *ast_endpoint_snapshot_to_json( + const struct ast_endpoint_snapshot *snapshot); + +/*! + * \brief Initialization function for endpoint stasis support. + * + * \return 0 on success. + * \return non-zero on error. + * \since 12 + */ +int ast_endpoint_stasis_init(void); + +#endif /* _ASTERISK_STASIS_ENDPOINTS_H */ diff --git a/include/asterisk/stasis_test.h b/include/asterisk/stasis_test.h new file mode 100644 index 000000000..ad4020a08 --- /dev/null +++ b/include/asterisk/stasis_test.h @@ -0,0 +1,142 @@ +/* + * Asterisk -- An open source telephony toolkit. + * + * Copyright (C) 2013, Digium, Inc. + * + * David M. Lee, II <dlee@digium.com> + * + * See http://www.asterisk.org for more information about + * the Asterisk project. Please do not directly contact + * any of the maintainers of this project for assistance; + * the project provides a web site, mailing lists and IRC + * channels for your use. + * + * This program is free software, distributed under the terms of + * the GNU General Public License Version 2. See the LICENSE file + * at the top of the source tree. + */ + +#ifndef _ASTERISK_STASIS_TEST_H +#define _ASTERISK_STASIS_TEST_H + +/*! + * \file \brief Test infrastructure for dealing with Stasis. + * + * \author David M. Lee, II <dlee@digium.com> + * + * This file contains some helpful utilities for testing Stasis related topics + * and messages. The \ref stasis_message_sink is something you can subscribe to + * a topic which will receive all of the messages from the topic. This messages + * are accumulated in its \c messages field. + * + * There are a set of wait functions (stasis_message_sink_wait_for_count(), + * stasis_message_sink_wait_for(), etc.) which will block waiting for conditions + * to be met in the \ref stasis_message_sink. + */ + +#include "asterisk/lock.h" +#include "asterisk/stasis.h" + +#define STASIS_SINK_DEFAULT_WAIT 5000 + +/*! \brief Structure that collects messages from a topic */ +struct stasis_message_sink { + /*! Condition mutex. */ + ast_mutex_t lock; + /*! Condition to signal state changes */ + ast_cond_t cond; + /*! Maximum number of messages messages field can hold without + * realloc */ + size_t max_messages; + /*! Current number of messages in messages field. */ + size_t num_messages; + /*! Boolean flag to be set when unsubscribe is received */ + int is_done:1; + /*! Ordered array of messages received */ + struct stasis_message **messages; +}; + +/*! + * \brief Create a message sink. + * + * This is an AO2 managed object, which you ao2_cleanup() when done. The + * destructor waits for an unsubscribe message to be received, to ensure the + * object isn't disposed of before the topic is finished. + */ +struct stasis_message_sink *stasis_message_sink_create(void); + +/*! + * \brief Topic callback to receive messages. + * + * We return a function pointer instead of simply exposing the function because + * of the vagaries of dlopen(), \c RTLD_LAZY, and function pointers. See the + * comment on the implementation for details why. + * + * \return Function pointer to \ref stasis_message_sink's message handling + * function + */ +stasis_subscription_cb stasis_message_sink_cb(void); + +/*! + * \brief Wait for a sink's num_messages field to reach a certain level. + * + * The optional timeout prevents complete deadlock in a test. + * + * \param sink Sink to wait on. + * \param num_messages sink->num_messages value to wait for. + * \param timeout_millis Number of milliseconds to wait. -1 to wait forever. + * \return Actual sink->num_messages value at return. + * If this is < \a num_messages, then the timeout expired. + */ +int stasis_message_sink_wait_for_count(struct stasis_message_sink *sink, + int num_messages, int timeout_millis); + +typedef int (*stasis_wait_cb)(struct stasis_message *msg, const void *data); + +/*! + * \brief Wait for a message that matches the given criteria. + * + * \param sink Sink to wait on. + * \param start Index of message to start with. + * \param cmp_cb comparison function. This returns true (non-zero) on match + * and false (zero) on match. + * \param timeout_millis Number of milliseconds to wait. + * \return Index of the matching message. + * \return Negative for no match. + */ +int stasis_message_sink_wait_for(struct stasis_message_sink *sink, int start, + stasis_wait_cb cmp_cb, const void *data, int timeout_millis); + +/*! + * \brief Ensures that no new messages are received. + * + * The optional timeout prevents complete deadlock in a test. + * + * \param sink Sink to wait on. + * \param num_messages expecte \a sink->num_messages. + * \param timeout_millis Number of milliseconds to wait for. + * \return Actual sink->num_messages value at return. + * If this is < \a num_messages, then the timeout expired. + */ +int stasis_message_sink_should_stay(struct stasis_message_sink *sink, + int num_messages, int timeout_millis); + +/*! \addtogroup StasisTopicsAndMessages + * @{ + */ + +/*! + * \brief Creates a test message. + */ +struct stasis_message *stasis_test_message_create(void); + +/*! + * \brief Gets the type of messages created by stasis_test_message_create(). + */ +struct stasis_message_type *stasis_test_message_type(void); + +/*! + * @} + */ + +#endif /* _ASTERISK_STASIS_TEST_H */ diff --git a/main/asterisk.c b/main/asterisk.c index b8deab1d4..885797f36 100644 --- a/main/asterisk.c +++ b/main/asterisk.c @@ -242,6 +242,7 @@ int daemon(int, int); /* defined in libresolv of all places */ #include "asterisk/sorcery.h" #include "asterisk/stasis.h" #include "asterisk/json.h" +#include "asterisk/stasis_endpoints.h" #include "../defaults.h" @@ -4174,6 +4175,11 @@ int main(int argc, char *argv[]) exit(1); } + if (ast_endpoint_stasis_init()) { + printf("Endpoint initialization failed.\n%s", term_quit()); + exit(1); + } + ast_makesocket(); sigemptyset(&sigs); sigaddset(&sigs, SIGHUP); diff --git a/main/astobj2.c b/main/astobj2.c index a980ec379..8ccc36cb4 100644 --- a/main/astobj2.c +++ b/main/astobj2.c @@ -5780,3 +5780,41 @@ int astobj2_init(void) return 0; } + +/* XXX TODO BUGBUG and all the other things... + * These functions should eventually be moved elsewhere, but the utils folder + * won't compile with them in strings.h + */ +static int str_hash(const void *obj, const int flags) +{ + return ast_str_hash(obj); +} + +static int str_cmp(void *lhs, void *rhs, int flags) +{ + return strcmp(lhs, rhs) ? 0 : CMP_MATCH; +} + +struct ao2_container *ast_str_container_alloc_options(enum ao2_container_opts opts, int buckets) +{ + return ao2_container_alloc_options(opts, buckets, str_hash, str_cmp); +} + +int ast_str_container_add(struct ao2_container *str_container, const char *add) +{ + RAII_VAR(char *, ao2_add, ao2_alloc(strlen(add) + 1, NULL), ao2_cleanup); + + if (!ao2_add) { + return -1; + } + + /* safe strcpy */ + strcpy(ao2_add, add); + ao2_link(str_container, ao2_add); + return 0; +} + +void ast_str_container_remove(struct ao2_container *str_container, const char *remove) +{ + ao2_find(str_container, remove, OBJ_KEY | OBJ_NODATA | OBJ_UNLINK); +} diff --git a/main/channel_internal_api.c b/main/channel_internal_api.c index cb9ed6744..f3293d59b 100644 --- a/main/channel_internal_api.c +++ b/main/channel_internal_api.c @@ -39,11 +39,13 @@ ASTERISK_FILE_VERSION(__FILE__, "$Revision$") #include <fcntl.h> #include "asterisk/channel.h" -#include "asterisk/stringfields.h" +#include "asterisk/channel_internal.h" #include "asterisk/data.h" +#include "asterisk/endpoints.h" #include "asterisk/indications.h" #include "asterisk/stasis_channels.h" -#include "asterisk/channel_internal.h" +#include "asterisk/stasis_endpoints.h" +#include "asterisk/stringfields.h" #include "asterisk/test.h" /*! @@ -198,6 +200,7 @@ struct ast_channel { struct timeval sending_dtmf_tv; /*!< The time this channel started sending the current digit. (Invalid if sending_dtmf_digit is zero.) */ struct stasis_topic *topic; /*!< Topic for all channel's events */ struct stasis_subscription *forwarder; /*!< Subscription for event forwarding to all topic */ + struct stasis_subscription *endpoint_forward; /*!< Subscription for event forwarding to endpoint's topic */ }; /* AST_DATA definitions, which will probably have to be re-thought since the channel will be opaque */ @@ -1369,6 +1372,7 @@ void ast_channel_internal_cleanup(struct ast_channel *chan) ast_string_field_free_memory(chan); chan->forwarder = stasis_unsubscribe(chan->forwarder); + chan->endpoint_forward = stasis_unsubscribe(chan->endpoint_forward); ao2_cleanup(chan->topic); chan->topic = NULL; @@ -1389,6 +1393,37 @@ struct stasis_topic *ast_channel_topic(struct ast_channel *chan) return chan ? chan->topic : ast_channel_topic_all(); } +int ast_endpoint_add_channel(struct ast_endpoint *endpoint, + struct ast_channel *chan) +{ + RAII_VAR(struct ast_channel_snapshot *, snapshot, NULL, ao2_cleanup); + RAII_VAR(struct stasis_message *, msg, NULL, ao2_cleanup); + + ast_assert(chan != NULL); + ast_assert(endpoint != NULL); + + snapshot = ast_channel_snapshot_create(chan); + if (!snapshot) { + return -1; + } + + msg = stasis_message_create(ast_channel_snapshot_type(), snapshot); + if (!msg) { + return -1; + } + + chan->endpoint_forward = + stasis_forward_all(chan->topic, ast_endpoint_topic(endpoint)); + + if (chan->endpoint_forward == NULL) { + return -1; + } + + stasis_publish(ast_endpoint_topic(endpoint), msg); + + return 0; +} + void ast_channel_internal_setup_topics(struct ast_channel *chan) { const char *topic_name = chan->uniqueid; diff --git a/main/endpoints.c b/main/endpoints.c new file mode 100644 index 000000000..95397f960 --- /dev/null +++ b/main/endpoints.c @@ -0,0 +1,338 @@ +/* + * Asterisk -- An open source telephony toolkit. + * + * Copyright (C) 2013, Digium, Inc. + * + * David M. Lee, II <dlee@digium.com> + * + * See http://www.asterisk.org for more information about + * the Asterisk project. Please do not directly contact + * any of the maintainers of this project for assistance; + * the project provides a web site, mailing lists and IRC + * channels for your use. + * + * This program is free software, distributed under the terms of + * the GNU General Public License Version 2. See the LICENSE file + * at the top of the source tree. + */ + +/*! \file + * + * \brief Asterisk endpoint API. + * + * \author David M. Lee, II <dlee@digium.com> + */ + +/*** MODULEINFO + <support_level>core</support_level> + ***/ + +#include "asterisk.h" + +ASTERISK_FILE_VERSION(__FILE__, "$Revision$") + +#include "asterisk/astobj2.h" +#include "asterisk/endpoints.h" +#include "asterisk/stasis.h" +#include "asterisk/stasis_channels.h" +#include "asterisk/stasis_endpoints.h" +#include "asterisk/stasis_message_router.h" +#include "asterisk/stringfields.h" + +/*! Buckets for endpoint->channel mappings. Keep it prime! */ +#define ENDPOINT_BUCKETS 127 + +struct ast_endpoint { + AST_DECLARE_STRING_FIELDS( + AST_STRING_FIELD(tech); /*!< Technology (SIP, IAX2, etc.). */ + AST_STRING_FIELD(resource); /*!< Name, unique to the tech. */ + AST_STRING_FIELD(id); /*!< tech/resource id */ + ); + /*! Endpoint's current state */ + enum ast_endpoint_state state; + /*! + * \brief Max channels for this endpoint. -1 means unlimited or unknown. + * + * Note that this simply documents the limits of an endpoint, and does + * nothing to try to enforce the limit. + */ + int max_channels; + /*! Topic for this endpoint's messages */ + struct stasis_topic *topic; + /*! + * Forwarding subscription sending messages to ast_endpoint_topic_all() + */ + struct stasis_subscription *forward; + /*! Router for handling this endpoint's messages */ + struct stasis_message_router *router; + /*! ast_str_container of channels associated with this endpoint */ + struct ao2_container *channel_ids; +}; + +const char *ast_endpoint_state_to_string(enum ast_endpoint_state state) +{ + switch (state) { + case AST_ENDPOINT_UNKNOWN: + return "unknown"; + case AST_ENDPOINT_OFFLINE: + return "offline"; + case AST_ENDPOINT_ONLINE: + return "online"; + } + return "?"; +} + +static void endpoint_publish_snapshot(struct ast_endpoint *endpoint) +{ + RAII_VAR(struct ast_endpoint_snapshot *, snapshot, NULL, ao2_cleanup); + RAII_VAR(struct stasis_message *, message, NULL, ao2_cleanup); + + ast_assert(endpoint != NULL); + ast_assert(endpoint->topic != NULL); + + snapshot = ast_endpoint_snapshot_create(endpoint); + if (!snapshot) { + return; + } + message = stasis_message_create(ast_endpoint_snapshot_type(), snapshot); + if (!message) { + return; + } + stasis_publish(endpoint->topic, message); +} + +static void endpoint_dtor(void *obj) +{ + struct ast_endpoint *endpoint = obj; + + /* The router should be shut down already */ + ast_assert(endpoint->router == NULL); + + stasis_unsubscribe(endpoint->forward); + endpoint->forward = NULL; + + ao2_cleanup(endpoint->topic); + endpoint->topic = NULL; + + ast_string_field_free_memory(endpoint); +} + +static void endpoint_channel_snapshot(void *data, + struct stasis_subscription *sub, struct stasis_topic *topic, + struct stasis_message *message) +{ + struct ast_endpoint *endpoint = data; + struct ast_channel_snapshot *snapshot = stasis_message_data(message); + RAII_VAR(char *, existing_id, NULL, ao2_cleanup); + int publish = 0; + + ast_assert(endpoint != NULL); + ast_assert(snapshot != NULL); + + ao2_lock(endpoint); + existing_id = ao2_find(endpoint->channel_ids, snapshot->uniqueid, + OBJ_POINTER); + if (!existing_id) { + ast_str_container_add(endpoint->channel_ids, + snapshot->uniqueid); + publish = 1; + } + ao2_unlock(endpoint); + if (publish) { + endpoint_publish_snapshot(endpoint); + } +} + +static void endpoint_cache_clear(void *data, + struct stasis_subscription *sub, struct stasis_topic *topic, + struct stasis_message *message) +{ + struct ast_endpoint *endpoint = data; + struct stasis_cache_clear *clear = stasis_message_data(message); + + ast_assert(endpoint != NULL); + ast_assert(clear != NULL); + + ao2_lock(endpoint); + ao2_find(endpoint->channel_ids, clear->id, OBJ_POINTER | OBJ_NODATA | OBJ_UNLINK); + ao2_unlock(endpoint); + endpoint_publish_snapshot(endpoint); +} + +static void endpoint_default(void *data, + struct stasis_subscription *sub, struct stasis_topic *topic, + struct stasis_message *message) +{ + struct stasis_endpoint *endpoint = data; + + if (stasis_subscription_final_message(sub, message)) { + ao2_cleanup(endpoint); + } +} + +struct ast_endpoint *ast_endpoint_create(const char *tech, const char *resource) +{ + RAII_VAR(struct ast_endpoint *, endpoint, NULL, ao2_cleanup); + int r = 0; + + if (ast_strlen_zero(tech)) { + ast_log(LOG_ERROR, "Endpoint tech cannot be empty\n"); + return NULL; + } + + if (ast_strlen_zero(resource)) { + ast_log(LOG_ERROR, "Endpoint resource cannot be empty\n"); + return NULL; + } + + endpoint = ao2_alloc(sizeof(*endpoint), endpoint_dtor); + if (!endpoint) { + return NULL; + } + + endpoint->max_channels = -1; + endpoint->state = AST_ENDPOINT_UNKNOWN; + + if (ast_string_field_init(endpoint, 80) != 0) { + return NULL; + } + + ast_string_field_set(endpoint, tech, tech); + ast_string_field_set(endpoint, resource, resource); + ast_string_field_build(endpoint, id, "%s/%s", tech, resource); + + /* All access to channel_ids should be covered by the endpoint's + * lock; no extra lock needed. */ + endpoint->channel_ids = ast_str_container_alloc_options( + AO2_ALLOC_OPT_LOCK_NOLOCK, ENDPOINT_BUCKETS); + if (!endpoint->channel_ids) { + return NULL; + } + + endpoint->topic = stasis_topic_create(endpoint->id); + if (!endpoint->topic) { + return NULL; + } + + endpoint->forward = + stasis_forward_all(endpoint->topic, ast_endpoint_topic_all()); + if (!endpoint->forward) { + return NULL; + } + + endpoint->router = stasis_message_router_create(endpoint->topic); + if (!endpoint->router) { + return NULL; + } + r |= stasis_message_router_add(endpoint->router, + ast_channel_snapshot_type(), endpoint_channel_snapshot, + endpoint); + r |= stasis_message_router_add(endpoint->router, + stasis_cache_clear_type(), endpoint_cache_clear, + endpoint); + r |= stasis_message_router_set_default(endpoint->router, + endpoint_default, endpoint); + + endpoint_publish_snapshot(endpoint); + + ao2_ref(endpoint, +1); + return endpoint; +} + +const char *ast_endpoint_get_tech(const struct ast_endpoint *endpoint) +{ + ast_assert(endpoint != NULL); + return endpoint->tech; +} + +void ast_endpoint_shutdown(struct ast_endpoint *endpoint) +{ + RAII_VAR(struct stasis_message *, message, NULL, ao2_cleanup); + + if (endpoint == NULL) { + return; + } + + message = stasis_cache_clear_create(ast_endpoint_snapshot_type(), endpoint->id); + if (message) { + stasis_publish(endpoint->topic, message); + } + + stasis_message_router_unsubscribe(endpoint->router); + endpoint->router = NULL; +} + +const char *ast_endpoint_get_resource(const struct ast_endpoint *endpoint) +{ + return endpoint->resource; +} + +struct stasis_topic *ast_endpoint_topic(struct ast_endpoint *endpoint) +{ + return endpoint ? endpoint->topic : ast_endpoint_topic_all(); +} + +void ast_endpoint_set_state(struct ast_endpoint *endpoint, + enum ast_endpoint_state state) +{ + ast_assert(endpoint != NULL); + ao2_lock(endpoint); + endpoint->state = state; + ao2_unlock(endpoint); + endpoint_publish_snapshot(endpoint); +} + +void ast_endpoint_set_max_channels(struct ast_endpoint *endpoint, + int max_channels) +{ + ast_assert(endpoint != NULL); + ao2_lock(endpoint); + endpoint->max_channels = max_channels; + ao2_unlock(endpoint); + endpoint_publish_snapshot(endpoint); +} + +static void endpoint_snapshot_dtor(void *obj) +{ + struct ast_endpoint_snapshot *snapshot = obj; + + ast_assert(snapshot != NULL); + ast_string_field_free_memory(snapshot); +} + +struct ast_endpoint_snapshot *ast_endpoint_snapshot_create( + struct ast_endpoint *endpoint) +{ + RAII_VAR(struct ast_endpoint_snapshot *, snapshot, NULL, ao2_cleanup); + int channel_count; + struct ao2_iterator i; + void *obj; + SCOPED_AO2LOCK(lock, endpoint); + + channel_count = ao2_container_count(endpoint->channel_ids); + + snapshot = ao2_alloc( + sizeof(*snapshot) + channel_count * sizeof(char *), + endpoint_snapshot_dtor); + + if (ast_string_field_init(snapshot, 80) != 0) { + return NULL; + } + + ast_string_field_build(snapshot, id, "%s/%s", endpoint->tech, + endpoint->resource); + ast_string_field_set(snapshot, tech, endpoint->tech); + ast_string_field_set(snapshot, resource, endpoint->resource); + + snapshot->state = endpoint->state; + snapshot->max_channels = endpoint->max_channels; + + i = ao2_iterator_init(endpoint->channel_ids, 0); + while ((obj = ao2_iterator_next(&i))) { + RAII_VAR(char *, channel_id, obj, ao2_cleanup); + snapshot->channel_ids[snapshot->num_channels++] = channel_id; + } + + ao2_ref(snapshot, +1); + return snapshot; +} diff --git a/main/stasis_cache.c b/main/stasis_cache.c index 7b6dc82c9..6d07dc3c5 100644 --- a/main/stasis_cache.c +++ b/main/stasis_cache.c @@ -240,54 +240,45 @@ struct ao2_container *stasis_cache_dump(struct stasis_caching_topic *caching_top return cache_dump.cached; } -static struct stasis_message_type *__cache_clear_data; +static struct stasis_message_type *cache_clear_type; -static struct stasis_message_type *cache_clear_data(void) +struct stasis_message_type *stasis_cache_clear_type(void) { - ast_assert(__cache_clear_data != NULL); - return __cache_clear_data; + ast_assert(cache_clear_type != NULL); + return cache_clear_type; } -static struct stasis_message_type *__cache_update; +static struct stasis_message_type *cache_update_type; struct stasis_message_type *stasis_cache_update_type(void) { - ast_assert(__cache_update != NULL); - return __cache_update; + ast_assert(cache_update_type != NULL); + return cache_update_type; } -struct cache_clear_data { - struct stasis_message_type *type; - char *id; -}; - -static void cache_clear_data_dtor(void *obj) +static void cache_clear_dtor(void *obj) { - struct cache_clear_data *ev = obj; - ast_free(ev->id); - ev->id = NULL; + struct stasis_cache_clear *ev = obj; ao2_cleanup(ev->type); ev->type = NULL; } struct stasis_message *stasis_cache_clear_create(struct stasis_message_type *type, const char *id) { - RAII_VAR(struct cache_clear_data *, ev, NULL, ao2_cleanup); + RAII_VAR(struct stasis_cache_clear *, ev, NULL, ao2_cleanup); RAII_VAR(struct stasis_message *, msg, NULL, ao2_cleanup); - ev = ao2_alloc(sizeof(*ev), cache_clear_data_dtor); + ev = ao2_alloc(sizeof(*ev) + strlen(id) + 1, cache_clear_dtor); if (!ev) { return NULL; } - ev->id = ast_strdup(id); - if (!ev->id) { - return NULL; - } + /* strcpy safe */ + strcpy(ev->id, id); ao2_ref(type, +1); ev->type = type; - msg = stasis_message_create(cache_clear_data(), ev); + msg = stasis_message_create(stasis_cache_clear_type(), ev); if (!msg) { return NULL; @@ -363,10 +354,10 @@ static void caching_topic_exec(void *data, struct stasis_subscription *sub, stru } /* Handle cache clear event */ - if (cache_clear_data() == stasis_message_type(message)) { + if (stasis_cache_clear_type() == stasis_message_type(message)) { RAII_VAR(struct stasis_message *, old_snapshot, NULL, ao2_cleanup); RAII_VAR(struct stasis_message *, update, NULL, ao2_cleanup); - struct cache_clear_data *clear = stasis_message_data(message); + struct stasis_cache_clear *clear = stasis_message_data(message); ast_assert(clear->type != NULL); ast_assert(clear->id != NULL); old_snapshot = cache_put(caching_topic, clear->type, clear->id, NULL); @@ -374,7 +365,9 @@ static void caching_topic_exec(void *data, struct stasis_subscription *sub, stru update = update_create(topic, old_snapshot, NULL); stasis_publish(caching_topic->topic, update); } else { - ast_log(LOG_ERROR, + /* While this could be a problem, it's very likely to + * happen with message forwarding */ + ast_debug(1, "Attempting to remove an item from the cache that isn't there: %s %s\n", stasis_message_type_name(clear->type), clear->id); } @@ -449,28 +442,28 @@ struct stasis_caching_topic *stasis_caching_topic_create(struct stasis_topic *or static void stasis_cache_exit(void) { - ao2_cleanup(__cache_clear_data); - __cache_clear_data = NULL; - ao2_cleanup(__cache_update); - __cache_update = NULL; + ao2_cleanup(cache_clear_type); + cache_clear_type = NULL; + ao2_cleanup(cache_update_type); + cache_update_type = NULL; } int stasis_cache_init(void) { ast_register_atexit(stasis_cache_exit); - if (__cache_clear_data || __cache_update) { + if (cache_clear_type || cache_update_type) { ast_log(LOG_ERROR, "Stasis cache double initialized\n"); return -1; } - __cache_update = stasis_message_type_create("stasis_cache_update"); - if (!__cache_update) { + cache_update_type = stasis_message_type_create("stasis_cache_update"); + if (!cache_update_type) { return -1; } - __cache_clear_data = stasis_message_type_create("StasisCacheClear"); - if (!__cache_clear_data) { + cache_clear_type = stasis_message_type_create("StasisCacheClear"); + if (!cache_clear_type) { return -1; } return 0; diff --git a/main/stasis_endpoints.c b/main/stasis_endpoints.c new file mode 100644 index 000000000..424df66c5 --- /dev/null +++ b/main/stasis_endpoints.c @@ -0,0 +1,189 @@ +/* + * Asterisk -- An open source telephony toolkit. + * + * Copyright (C) 2013, Digium, Inc. + * + * David M. Lee, II <dlee@digium.com> + * + * See http://www.asterisk.org for more information about + * the Asterisk project. Please do not directly contact + * any of the maintainers of this project for assistance; + * the project provides a web site, mailing lists and IRC + * channels for your use. + * + * This program is free software, distributed under the terms of + * the GNU General Public License Version 2. See the LICENSE file + * at the top of the source tree. + */ + +/*! \file + * + * \brief Stasis endpoint API. + * + * \author David M. Lee, II <dlee@digium.com> + */ + +/*** MODULEINFO + <support_level>core</support_level> + ***/ + +#include "asterisk.h" + +ASTERISK_FILE_VERSION(__FILE__, "$Revision$") + +#include "asterisk/astobj2.h" +#include "asterisk/stasis.h" +#include "asterisk/stasis_endpoints.h" + +static struct stasis_message_type *endpoint_snapshot_type; + +static struct stasis_topic *endpoint_topic_all; + +static struct stasis_caching_topic *endpoint_topic_all_cached; + +struct stasis_message_type *ast_endpoint_snapshot_type(void) +{ + return endpoint_snapshot_type; +} + +struct stasis_topic *ast_endpoint_topic_all(void) +{ + return endpoint_topic_all; +} + +struct stasis_caching_topic *ast_endpoint_topic_all_cached(void) +{ + return endpoint_topic_all_cached; +} + +struct ast_endpoint_snapshot *ast_endpoint_latest_snapshot(const char *tech, + const char *name) +{ + RAII_VAR(char *, id, NULL, ast_free); + RAII_VAR(struct stasis_message *, msg, NULL, ao2_cleanup); + struct ast_endpoint_snapshot *snapshot; + + ast_asprintf(&id, "%s/%s", tech, name); + if (!id) { + return NULL; + } + + msg = stasis_cache_get(ast_endpoint_topic_all_cached(), + ast_endpoint_snapshot_type(), id); + if (!msg) { + return NULL; + } + + snapshot = stasis_message_data(msg); + ast_assert(snapshot != NULL); + + ao2_ref(snapshot, +1); + return snapshot; +} + +/*! + * \brief Callback extract a unique identity from a snapshot message. + * + * This identity is unique to the underlying object of the snapshot, such as the + * UniqueId field of a channel. + * + * \param message Message to extract id from. + * \return String representing the snapshot's id. + * \return \c NULL if the message_type of the message isn't a handled snapshot. + * \since 12 + */ +static const char *endpoint_snapshot_get_id(struct stasis_message *message) +{ + struct ast_endpoint_snapshot *snapshot; + + if (ast_endpoint_snapshot_type() != stasis_message_type(message)) { + return NULL; + } + + snapshot = stasis_message_data(message); + + return snapshot->id; +} + + +static void endpoints_stasis_shutdown(void) +{ + ao2_cleanup(endpoint_topic_all); + endpoint_topic_all = NULL; + + stasis_caching_unsubscribe(endpoint_topic_all_cached); + endpoint_topic_all_cached = NULL; +} + +struct ast_json *ast_endpoint_snapshot_to_json( + const struct ast_endpoint_snapshot *snapshot) +{ + RAII_VAR(struct ast_json *, json, NULL, ast_json_unref); + struct ast_json *channel_array; + int i; + + json = ast_json_pack("{s: s, s: s, s: s, s: []}", + "technology", snapshot->tech, + "resource", snapshot->resource, + "state", ast_endpoint_state_to_string(snapshot->state), + "channels"); + + if (json == NULL) { + return NULL; + } + + if (snapshot->max_channels != -1) { + int res = ast_json_object_set(json, "max_channels", + ast_json_integer_create(snapshot->max_channels)); + if (res != 0) { + return NULL; + } + } + + channel_array = ast_json_object_get(json, "channels"); + ast_assert(channel_array != NULL); + for (i = 0; i < snapshot->num_channels; ++i) { + int res = ast_json_array_append(channel_array, + ast_json_stringf("channel:%s", + snapshot->channel_ids[i])); + if (res != 0) { + return NULL; + } + } + + return ast_json_ref(json); +} + +int ast_endpoint_stasis_init(void) +{ + ast_register_atexit(endpoints_stasis_shutdown); + + if (!endpoint_topic_all) { + endpoint_topic_all = stasis_topic_create("endpoint_topic_all"); + } + + if (!endpoint_topic_all) { + return -1; + } + + if (!endpoint_topic_all_cached) { + endpoint_topic_all_cached = + stasis_caching_topic_create( + endpoint_topic_all, endpoint_snapshot_get_id); + } + + if (!endpoint_topic_all_cached) { + return -1; + } + + if (!endpoint_snapshot_type) { + endpoint_snapshot_type = stasis_message_type_create( + "ast_endpoint_snapshot"); + } + + if (!endpoint_snapshot_type) { + return -1; + } + + return 0; +} diff --git a/res/res_stasis_http_endpoints.c b/res/res_stasis_http_endpoints.c index a420d4ede..a09be1b6e 100644 --- a/res/res_stasis_http_endpoints.c +++ b/res/res_stasis_http_endpoints.c @@ -55,18 +55,32 @@ static void stasis_http_get_endpoints_cb( struct ast_variable *headers, struct stasis_http_response *response) { struct ast_get_endpoints_args args = {}; + stasis_http_get_endpoints(headers, &args, response); +} +/*! + * \brief Parameter parsing callback for /endpoints/{tech}. + * \param get_params GET parameters in the HTTP request. + * \param path_vars Path variables extracted from the request. + * \param headers HTTP headers. + * \param[out] response Response to the HTTP request. + */ +static void stasis_http_get_endpoints_by_tech_cb( + struct ast_variable *get_params, struct ast_variable *path_vars, + struct ast_variable *headers, struct stasis_http_response *response) +{ + struct ast_get_endpoints_by_tech_args args = {}; struct ast_variable *i; - for (i = get_params; i; i = i->next) { - if (strcmp(i->name, "withType") == 0) { - args.with_type = (i->value); + for (i = path_vars; i; i = i->next) { + if (strcmp(i->name, "tech") == 0) { + args.tech = (i->value); } else {} } - stasis_http_get_endpoints(headers, &args, response); + stasis_http_get_endpoints_by_tech(headers, &args, response); } /*! - * \brief Parameter parsing callback for /endpoints/{endpointId}. + * \brief Parameter parsing callback for /endpoints/{tech}/{resource}. * \param get_params GET parameters in the HTTP request. * \param path_vars Path variables extracted from the request. * \param headers HTTP headers. @@ -80,8 +94,11 @@ static void stasis_http_get_endpoint_cb( struct ast_variable *i; for (i = path_vars; i; i = i->next) { - if (strcmp(i->name, "endpointId") == 0) { - args.endpoint_id = (i->value); + if (strcmp(i->name, "tech") == 0) { + args.tech = (i->value); + } else + if (strcmp(i->name, "resource") == 0) { + args.resource = (i->value); } else {} } @@ -89,8 +106,8 @@ static void stasis_http_get_endpoint_cb( } /*! \brief REST handler for /api-docs/endpoints.{format} */ -static struct stasis_rest_handlers endpoints_endpointId = { - .path_segment = "endpointId", +static struct stasis_rest_handlers endpoints_tech_resource = { + .path_segment = "resource", .is_wildcard = 1, .callbacks = { [AST_HTTP_GET] = stasis_http_get_endpoint_cb, @@ -99,13 +116,23 @@ static struct stasis_rest_handlers endpoints_endpointId = { .children = { } }; /*! \brief REST handler for /api-docs/endpoints.{format} */ +static struct stasis_rest_handlers endpoints_tech = { + .path_segment = "tech", + .is_wildcard = 1, + .callbacks = { + [AST_HTTP_GET] = stasis_http_get_endpoints_by_tech_cb, + }, + .num_children = 1, + .children = { &endpoints_tech_resource, } +}; +/*! \brief REST handler for /api-docs/endpoints.{format} */ static struct stasis_rest_handlers endpoints = { .path_segment = "endpoints", .callbacks = { [AST_HTTP_GET] = stasis_http_get_endpoints_cb, }, .num_children = 1, - .children = { &endpoints_endpointId, } + .children = { &endpoints_tech, } }; static int load_module(void) diff --git a/res/res_stasis_test.c b/res/res_stasis_test.c new file mode 100644 index 000000000..7b5aece4f --- /dev/null +++ b/res/res_stasis_test.c @@ -0,0 +1,291 @@ +/* + * Asterisk -- An open source telephony toolkit. + * + * Copyright (C) 2013, Digium, Inc. + * + * David M. Lee, II <dlee@digium.com> + * + * See http://www.asterisk.org for more information about + * the Asterisk project. Please do not directly contact + * any of the maintainers of this project for assistance; + * the project provides a web site, mailing lists and IRC + * channels for your use. + * + * This program is free software, distributed under the terms of + * the GNU General Public License Version 2. See the LICENSE file + * at the top of the source tree. + */ + +/*! + * \file \brief Test infrastructure for dealing with Stasis. + * + * \author David M. Lee, II <dlee@digium.com> + */ + +/*** MODULEINFO + <depend>TEST_FRAMEWORK</depend> + <support_level>core</support_level> + ***/ + +#include "asterisk.h" + +ASTERISK_FILE_VERSION(__FILE__, "$Revision$"); + +#include "asterisk/astobj2.h" +#include "asterisk/module.h" +#include "asterisk/stasis_test.h" + +static struct stasis_message_type *test_message_type; + +static void stasis_message_sink_dtor(void *obj) +{ + struct stasis_message_sink *sink = obj; + + { + SCOPED_MUTEX(lock, &sink->lock); + while (!sink->is_done) { + /* Normally waiting forever is bad, but if we're not + * done, we're not done. */ + ast_cond_wait(&sink->cond, &sink->lock); + } + } + + ast_mutex_destroy(&sink->lock); + ast_cond_destroy(&sink->cond); + + while (sink->num_messages > 0) { + ao2_cleanup(sink->messages[--sink->num_messages]); + } + ast_free(sink->messages); + sink->messages = NULL; + sink->max_messages = 0; +} + +static struct timespec make_deadline(int timeout_millis) +{ + struct timeval start = ast_tvnow(); + struct timeval delta = { + .tv_sec = timeout_millis / 1000, + .tv_usec = (timeout_millis % 1000) * 1000, + }; + struct timeval deadline_tv = ast_tvadd(start, delta); + struct timespec deadline = { + .tv_sec = deadline_tv.tv_sec, + .tv_nsec = 1000 * deadline_tv.tv_usec, + }; + + return deadline; +} + +struct stasis_message_sink *stasis_message_sink_create(void) +{ + RAII_VAR(struct stasis_message_sink *, sink, NULL, ao2_cleanup); + + sink = ao2_alloc(sizeof(*sink), stasis_message_sink_dtor); + if (!sink) { + return NULL; + } + ast_mutex_init(&sink->lock); + ast_cond_init(&sink->cond, NULL); + sink->max_messages = 4; + sink->messages = + ast_malloc(sizeof(*sink->messages) * sink->max_messages); + if (!sink->messages) { + return NULL; + } + ao2_ref(sink, +1); + return sink; +} + +/*! + * \brief Implementation of the stasis_message_sink_cb() callback. + * + * Why the roundabout way of exposing this via stasis_message_sink_cb()? Well, + * it has to do with how we load modules. + * + * Modules have their own metadata compiled into them in the AST_MODULE_INFO() + * block. This includes dependency information in the \c nonoptreq field. + * + * Asterisk loads the module, inspects the field, then loads any needed + * dependencies. This works because Asterisk passes \c RTLD_LAZY to the initial + * dlopen(), which defers binding function references until they are called. + * + * But when you take the address of a function, that function needs to be + * available at load time. So if some module used the address of + * message_sink_cb() directly, and \c res_stasis_test.so wasn't loaded yet, then + * that module would fail to load. + * + * The stasis_message_sink_cb() function gives us a layer of indirection so that + * the initial lazy binding will still work as expected. + */ +static void message_sink_cb(void *data, struct stasis_subscription *sub, + struct stasis_topic *topic, struct stasis_message *message) +{ + struct stasis_message_sink *sink = data; + + SCOPED_MUTEX(lock, &sink->lock); + + if (stasis_subscription_final_message(sub, message)) { + sink->is_done = 1; + ast_cond_signal(&sink->cond); + return; + } + + if (stasis_subscription_change_type() == stasis_message_type(message)) { + /* Ignore subscription changes */ + return; + } + + if (sink->num_messages == sink->max_messages) { + size_t new_max_messages = sink->max_messages * 2; + struct stasis_message **new_messages = ast_realloc( + sink->messages, + sizeof(*new_messages) * new_max_messages); + if (!new_messages) { + return; + } + sink->max_messages = new_max_messages; + sink->messages = new_messages; + } + + ao2_ref(message, +1); + sink->messages[sink->num_messages++] = message; + ast_cond_signal(&sink->cond); +} + +stasis_subscription_cb stasis_message_sink_cb(void) +{ + return message_sink_cb; +} + + +int stasis_message_sink_wait_for_count(struct stasis_message_sink *sink, + int num_messages, int timeout_millis) +{ + struct timespec deadline = make_deadline(timeout_millis); + + SCOPED_MUTEX(lock, &sink->lock); + while (sink->num_messages < num_messages) { + int r = ast_cond_timedwait(&sink->cond, &sink->lock, &deadline); + + if (r == ETIMEDOUT) { + break; + } + if (r != 0) { + ast_log(LOG_ERROR, "Unexpected condition error: %s\n", + strerror(r)); + break; + } + } + return sink->num_messages; +} + +int stasis_message_sink_should_stay(struct stasis_message_sink *sink, + int num_messages, int timeout_millis) +{ + struct timespec deadline = make_deadline(timeout_millis); + + SCOPED_MUTEX(lock, &sink->lock); + while (sink->num_messages == num_messages) { + int r = ast_cond_timedwait(&sink->cond, &sink->lock, &deadline); + + if (r == ETIMEDOUT) { + break; + } + if (r != 0) { + ast_log(LOG_ERROR, "Unexpected condition error: %s\n", + strerror(r)); + break; + } + } + return sink->num_messages; +} + +int stasis_message_sink_wait_for(struct stasis_message_sink *sink, int start, + stasis_wait_cb cmp_cb, const void *data, int timeout_millis) +{ + struct timespec deadline = make_deadline(timeout_millis); + + SCOPED_MUTEX(lock, &sink->lock); + + /* wait for the start */ + while (sink->num_messages < start + 1) { + int r = ast_cond_timedwait(&sink->cond, &sink->lock, &deadline); + + if (r == ETIMEDOUT) { + /* Timed out waiting for the start */ + return -1; + } + if (r != 0) { + ast_log(LOG_ERROR, "Unexpected condition error: %s\n", + strerror(r)); + return -2; + } + } + + + while (!cmp_cb(sink->messages[start], data)) { + ++start; + + while (sink->num_messages < start + 1) { + int r = ast_cond_timedwait(&sink->cond, + &sink->lock, &deadline); + + if (r == ETIMEDOUT) { + return -1; + } + if (r != 0) { + ast_log(LOG_ERROR, + "Unexpected condition error: %s\n", + strerror(r)); + return -2; + } + } + } + + return start; +} + +struct stasis_message *stasis_test_message_create(void) +{ + RAII_VAR(void *, data, NULL, ao2_cleanup); + + /* We just need the unique pointer; don't care what's in it */ + data = ao2_alloc(1, NULL); + if (!data) { + return NULL; + } + + return stasis_message_create(stasis_test_message_type(), data); +} + +struct stasis_message_type *stasis_test_message_type(void) +{ + return test_message_type; +} + +static int unload_module(void) +{ + ao2_cleanup(test_message_type); + test_message_type = NULL; + return 0; +} + +static int load_module(void) +{ + test_message_type = stasis_message_type_create( + "stasis_test_message"); + if (!test_message_type) { + return AST_MODULE_LOAD_FAILURE; + } + + return AST_MODULE_LOAD_SUCCESS; +} + +AST_MODULE_INFO(ASTERISK_GPL_KEY, + AST_MODFLAG_GLOBAL_SYMBOLS | AST_MODFLAG_LOAD_ORDER, + "Stasis test utilities", + .load = load_module, + .unload = unload_module, + .load_pri = AST_MODPRI_APP_DEPEND, + ); diff --git a/res/res_stasis_test.exports.in b/res/res_stasis_test.exports.in new file mode 100644 index 000000000..961600323 --- /dev/null +++ b/res/res_stasis_test.exports.in @@ -0,0 +1,6 @@ +{ + global: + LINKER_SYMBOL_PREFIXstasis_*; + local: + *; +}; diff --git a/res/stasis_http/resource_endpoints.c b/res/stasis_http/resource_endpoints.c index b2611bad8..52d05c093 100644 --- a/res/stasis_http/resource_endpoints.c +++ b/res/stasis_http/resource_endpoints.c @@ -1,4 +1,4 @@ -/* -*- C -*- +/* * Asterisk -- An open source telephony toolkit. * * Copyright (C) 2012 - 2013, Digium, Inc. @@ -18,26 +18,140 @@ /*! \file * - * \brief Implementation for stasis-http stubs. + * \brief /api-docs/endpoints.{format} implementation- Endpoint resources * * \author David M. Lee, II <dlee@digium.com> */ -/*** MODULEINFO - <support_level>core</support_level> - ***/ - #include "asterisk.h" ASTERISK_FILE_VERSION(__FILE__, "$Revision$") #include "resource_endpoints.h" -void stasis_http_get_endpoint(struct ast_variable *headers, struct ast_get_endpoint_args *args, struct stasis_http_response *response) +#include "asterisk/astobj2.h" +#include "asterisk/stasis.h" +#include "asterisk/stasis_endpoints.h" + +void stasis_http_get_endpoints(struct ast_variable *headers, + struct ast_get_endpoints_args *args, + struct stasis_http_response *response) +{ + RAII_VAR(struct stasis_caching_topic *, caching_topic, NULL, ao2_cleanup); + RAII_VAR(struct ao2_container *, snapshots, NULL, ao2_cleanup); + RAII_VAR(struct ast_json *, json, NULL, ast_json_unref); + struct ao2_iterator i; + void *obj; + + caching_topic = ast_endpoint_topic_all_cached(); + if (!caching_topic) { + stasis_http_response_error( + response, 500, "Internal Server Error", + "Message bus not initialized"); + return; + } + ao2_ref(caching_topic, +1); + + snapshots = stasis_cache_dump(caching_topic, ast_endpoint_snapshot_type()); + if (!snapshots) { + stasis_http_response_alloc_failed(response); + return; + } + + json = ast_json_array_create(); + if (!json) { + stasis_http_response_alloc_failed(response); + return; + } + + i = ao2_iterator_init(snapshots, 0); + while ((obj = ao2_iterator_next(&i))) { + RAII_VAR(struct stasis_message *, msg, obj, ao2_cleanup); + struct ast_endpoint_snapshot *snapshot = stasis_message_data(msg); + int r = ast_json_array_append( + json, ast_endpoint_snapshot_to_json(snapshot)); + if (r != 0) { + stasis_http_response_alloc_failed(response); + return; + } + } + ao2_iterator_destroy(&i); + + stasis_http_response_ok(response, ast_json_ref(json)); +} +void stasis_http_get_endpoints_by_tech(struct ast_variable *headers, + struct ast_get_endpoints_by_tech_args *args, + struct stasis_http_response *response) { - ast_log(LOG_ERROR, "TODO: stasis_http_get_endpoint\n"); + RAII_VAR(struct stasis_caching_topic *, caching_topic, NULL, ao2_cleanup); + RAII_VAR(struct ao2_container *, snapshots, NULL, ao2_cleanup); + RAII_VAR(struct ast_json *, json, NULL, ast_json_unref); + struct ao2_iterator i; + void *obj; + + /* TODO - if tech isn't a recognized type of endpoint, it should 404 */ + + caching_topic = ast_endpoint_topic_all_cached(); + if (!caching_topic) { + stasis_http_response_error( + response, 500, "Internal Server Error", + "Message bus not initialized"); + return; + } + ao2_ref(caching_topic, +1); + + snapshots = stasis_cache_dump(caching_topic, ast_endpoint_snapshot_type()); + if (!snapshots) { + stasis_http_response_alloc_failed(response); + return; + } + + json = ast_json_array_create(); + if (!json) { + stasis_http_response_alloc_failed(response); + return; + } + + i = ao2_iterator_init(snapshots, 0); + while ((obj = ao2_iterator_next(&i))) { + RAII_VAR(struct stasis_message *, msg, obj, ao2_cleanup); + struct ast_endpoint_snapshot *snapshot = stasis_message_data(msg); + int r; + + if (strcmp(args->tech, snapshot->tech) != 0) { + continue; + } + + r = ast_json_array_append( + json, ast_endpoint_snapshot_to_json(snapshot)); + if (r != 0) { + stasis_http_response_alloc_failed(response); + return; + } + } + ao2_iterator_destroy(&i); + + stasis_http_response_ok(response, ast_json_ref(json)); } -void stasis_http_get_endpoints(struct ast_variable *headers, struct ast_get_endpoints_args *args, struct stasis_http_response *response) +void stasis_http_get_endpoint(struct ast_variable *headers, + struct ast_get_endpoint_args *args, + struct stasis_http_response *response) { - ast_log(LOG_ERROR, "TODO: stasis_http_get_endpoints\n"); + RAII_VAR(struct ast_json *, json, NULL, ast_json_unref); + RAII_VAR(struct ast_endpoint_snapshot *, snapshot, NULL, ao2_cleanup); + + snapshot = ast_endpoint_latest_snapshot(args->tech, args->resource); + if (!snapshot) { + stasis_http_response_error(response, 404, "Not Found", + "Endpoint not found"); + return; + } + + json = ast_endpoint_snapshot_to_json(snapshot); + if (!json) { + stasis_http_response_alloc_failed(response); + return; + } + + stasis_http_response_ok(response, ast_json_ref(json)); } diff --git a/res/stasis_http/resource_endpoints.h b/res/stasis_http/resource_endpoints.h index 9f5a96e50..b534fb047 100644 --- a/res/stasis_http/resource_endpoints.h +++ b/res/stasis_http/resource_endpoints.h @@ -41,21 +41,34 @@ /*! \brief Argument struct for stasis_http_get_endpoints() */ struct ast_get_endpoints_args { - /*! \brief Filter endpoints by type (sip,iax2,dhadi,...) */ - const char *with_type; }; /*! - * \brief List available endoints. + * \brief List all endoints. * * \param headers HTTP headers * \param args Swagger parameters * \param[out] response HTTP response */ void stasis_http_get_endpoints(struct ast_variable *headers, struct ast_get_endpoints_args *args, struct stasis_http_response *response); +/*! \brief Argument struct for stasis_http_get_endpoints_by_tech() */ +struct ast_get_endpoints_by_tech_args { + /*! \brief Technology of the endpoints (sip,iax2,...) */ + const char *tech; +}; +/*! + * \brief List available endoints for a given endpoint technology. + * + * \param headers HTTP headers + * \param args Swagger parameters + * \param[out] response HTTP response + */ +void stasis_http_get_endpoints_by_tech(struct ast_variable *headers, struct ast_get_endpoints_by_tech_args *args, struct stasis_http_response *response); /*! \brief Argument struct for stasis_http_get_endpoint() */ struct ast_get_endpoint_args { + /*! \brief Technology of the endpoint */ + const char *tech; /*! \brief ID of the endpoint */ - const char *endpoint_id; + const char *resource; }; /*! * \brief Details for an endpoint. diff --git a/rest-api/api-docs/endpoints.json b/rest-api/api-docs/endpoints.json index 43b8453d7..d3d77d84a 100644 --- a/rest-api/api-docs/endpoints.json +++ b/rest-api/api-docs/endpoints.json @@ -13,16 +13,26 @@ "operations": [ { "httpMethod": "GET", - "summary": "List available endoints.", + "summary": "List all endoints.", "nickname": "getEndpoints", + "responseClass": "List[Endpoint]" + } + ] + }, + { + "path": "/endpoints/{tech}", + "description": "Asterisk endpoints", + "operations": [ + { + "httpMethod": "GET", + "summary": "List available endoints for a given endpoint technology.", + "nickname": "getEndpointsByTech", "responseClass": "List[Endpoint]", "parameters": [ { - "name": "withType", - "description": "Filter endpoints by type (sip,iax2,dhadi,...)", - "paramType": "query", - "required": false, - "allowMultiple": true, + "name": "tech", + "description": "Technology of the endpoints (sip,iax2,...)", + "paramType": "path", "dataType": "string" } ] @@ -30,7 +40,7 @@ ] }, { - "path": "/endpoints/{endpointId}", + "path": "/endpoints/{tech}/{resource}", "description": "Single endpoint", "operations": [ { @@ -40,7 +50,13 @@ "responseClass": "Endpoint", "parameters": [ { - "name": "endpointId", + "name": "tech", + "description": "Technology of the endpoint", + "paramType": "path", + "dataType": "string" + }, + { + "name": "resource", "description": "ID of the endpoint", "paramType": "path", "dataType": "string" @@ -53,13 +69,16 @@ "models": { "Endpoint": { "id": "Endpoint", + "description": "A snapshot of an endpoint. Unlike most resources, which have a single unique identifier, an endpoint is uniquely identified by the technology/resource pair.", "properties": { "technology": { "type": "string", + "description": "Technology of the endpoint", "required": true }, - "name": { + "resource": { "type": "string", + "description": "Identifier of the endpoint, specific to the given technology.", "required": true } } diff --git a/tests/test_endpoints.c b/tests/test_endpoints.c new file mode 100644 index 000000000..2758e8e78 --- /dev/null +++ b/tests/test_endpoints.c @@ -0,0 +1,158 @@ +/* + * Asterisk -- An open source telephony toolkit. + * + * Copyright (C) 2013, Digium, Inc. + * + * David M. Lee, II <dlee@digium.com> + * + * See http://www.asterisk.org for more information about + * the Asterisk project. Please do not directly contact + * any of the maintainers of this project for assistance; + * the project provides a web site, mailing lists and IRC + * channels for your use. + * + * This program is free software, distributed under the terms of + * the GNU General Public License Version 2. See the LICENSE file + * at the top of the source tree. + */ + +/*! + * \file \brief Test endpoints. + * + * \author\verbatim David M. Lee, II <dlee@digium.com> \endverbatim + * + * \ingroup tests + */ + +/*** MODULEINFO + <depend>TEST_FRAMEWORK</depend> + <support_level>core</support_level> + ***/ + +#include "asterisk.h" + +ASTERISK_FILE_VERSION(__FILE__, "$Revision$") + +#include "asterisk/astobj2.h" +#include "asterisk/endpoints.h" +#include "asterisk/module.h" +#include "asterisk/stasis_endpoints.h" +#include "asterisk/test.h" + +static const char *test_category = "/core/endpoints/"; + +AST_TEST_DEFINE(create) +{ + RAII_VAR(struct ast_endpoint *, uut, NULL, ast_endpoint_shutdown); + + switch (cmd) { + case TEST_INIT: + info->name = __func__; + info->category = test_category; + info->summary = "Test endpoint creation"; + info->description = "Test endpoint creation"; + return AST_TEST_NOT_RUN; + case TEST_EXECUTE: + break; + } + + ast_test_validate(test, NULL == ast_endpoint_create(NULL, NULL)); + ast_test_validate(test, NULL == ast_endpoint_create("", "")); + ast_test_validate(test, NULL == ast_endpoint_create("TEST", "")); + ast_test_validate(test, NULL == ast_endpoint_create("", "test_res")); + + uut = ast_endpoint_create("TEST", "test_res"); + ast_test_validate(test, NULL != uut); + + ast_test_validate(test, + 0 == strcmp("TEST", ast_endpoint_get_tech(uut))); + ast_test_validate(test, + 0 == strcmp("test_res", ast_endpoint_get_resource(uut))); + + return AST_TEST_PASS; +} + + +AST_TEST_DEFINE(defaults) +{ + RAII_VAR(struct ast_endpoint *, uut, NULL, ast_endpoint_shutdown); + RAII_VAR(struct ast_endpoint_snapshot *, snapshot, NULL, ao2_cleanup); + + switch (cmd) { + case TEST_INIT: + info->name = __func__; + info->category = test_category; + info->summary = "Test defaults for new endpoints"; + info->description = "Test defaults for new endpoints"; + return AST_TEST_NOT_RUN; + case TEST_EXECUTE: + break; + } + + uut = ast_endpoint_create("TEST", "test_res"); + ast_test_validate(test, NULL != uut); + snapshot = ast_endpoint_snapshot_create(uut); + ast_test_validate(test, NULL != snapshot); + + ast_test_validate(test, 0 == strcmp("TEST/test_res", snapshot->id)); + ast_test_validate(test, 0 == strcmp("TEST", snapshot->tech)); + ast_test_validate(test, 0 == strcmp("test_res", snapshot->resource)); + ast_test_validate(test, AST_ENDPOINT_UNKNOWN == snapshot->state); + ast_test_validate(test, -1 == snapshot->max_channels); + ast_test_validate(test, 0 == snapshot->num_channels); + + return AST_TEST_PASS; +} + +AST_TEST_DEFINE(setters) +{ + RAII_VAR(struct ast_endpoint *, uut, NULL, ast_endpoint_shutdown); + RAII_VAR(struct ast_endpoint_snapshot *, snapshot, NULL, ao2_cleanup); + + switch (cmd) { + case TEST_INIT: + info->name = __func__; + info->category = test_category; + info->summary = "Test endpoint setters"; + info->description = "Test endpoint setters"; + return AST_TEST_NOT_RUN; + case TEST_EXECUTE: + break; + } + + uut = ast_endpoint_create("TEST", "test_res"); + ast_test_validate(test, NULL != uut); + + ast_endpoint_set_state(uut, AST_ENDPOINT_ONLINE); + ast_endpoint_set_max_channels(uut, 314159); + + snapshot = ast_endpoint_snapshot_create(uut); + ast_test_validate(test, NULL != snapshot); + + ast_test_validate(test, AST_ENDPOINT_ONLINE == snapshot->state); + ast_test_validate(test, 314159 == snapshot->max_channels); + + return AST_TEST_PASS; +} + +static int unload_module(void) +{ + AST_TEST_UNREGISTER(create); + AST_TEST_UNREGISTER(defaults); + AST_TEST_UNREGISTER(setters); + return 0; +} + +static int load_module(void) +{ + AST_TEST_REGISTER(create); + AST_TEST_REGISTER(defaults); + AST_TEST_REGISTER(setters); + return AST_MODULE_LOAD_SUCCESS; +} + +AST_MODULE_INFO(ASTERISK_GPL_KEY, AST_MODFLAG_DEFAULT, + "Endpoint testing", + .load = load_module, + .unload = unload_module, + ); diff --git a/tests/test_stasis_endpoints.c b/tests/test_stasis_endpoints.c new file mode 100644 index 000000000..30d0d09ae --- /dev/null +++ b/tests/test_stasis_endpoints.c @@ -0,0 +1,307 @@ +/* + * Asterisk -- An open source telephony toolkit. + * + * Copyright (C) 2013, Digium, Inc. + * + * David M. Lee, II <dlee@digium.com> + * + * See http://www.asterisk.org for more information about + * the Asterisk project. Please do not directly contact + * any of the maintainers of this project for assistance; + * the project provides a web site, mailing lists and IRC + * channels for your use. + * + * This program is free software, distributed under the terms of + * the GNU General Public License Version 2. See the LICENSE file + * at the top of the source tree. + */ + +/*! + * \file \brief Test endpoints. + * + * \author\verbatim David M. Lee, II <dlee@digium.com> \endverbatim + * + * \ingroup tests + */ + +/*** MODULEINFO + <depend>TEST_FRAMEWORK</depend> + <depend>res_stasis_test</depend> + <support_level>core</support_level> + ***/ + +#include "asterisk.h" + +ASTERISK_FILE_VERSION(__FILE__, "$Revision$") + +#include "asterisk/astobj2.h" +#include "asterisk/channel.h" +#include "asterisk/endpoints.h" +#include "asterisk/module.h" +#include "asterisk/stasis_channels.h" +#include "asterisk/stasis_endpoints.h" +#include "asterisk/stasis_test.h" +#include "asterisk/test.h" + +static const char *test_category = "/stasis/endpoints/"; + +static void safe_channel_hangup(struct ast_channel *chan) +{ + if (!chan) { + return; + } + ast_hangup(chan); +} + +/*! \brief Message matcher looking for cache update messages */ +static int cache_update(struct stasis_message *msg, const void *data) { + struct stasis_cache_update *update; + struct ast_endpoint_snapshot *snapshot; + const char *name = data; + + if (stasis_cache_update_type() != stasis_message_type(msg)) { + return 0; + } + + update = stasis_message_data(msg); + if (ast_endpoint_snapshot_type() != update->type) { + return 0; + } + + snapshot = stasis_message_data(update->old_snapshot); + if (!snapshot) { + snapshot = stasis_message_data(update->new_snapshot); + } + + return 0 == strcmp(name, snapshot->resource); +} + +AST_TEST_DEFINE(state_changes) +{ + RAII_VAR(struct ast_endpoint *, uut, NULL, ast_endpoint_shutdown); + RAII_VAR(struct ast_channel *, chan, NULL, safe_channel_hangup); + RAII_VAR(struct stasis_message_sink *, sink, NULL, ao2_cleanup); + RAII_VAR(struct stasis_subscription *, sub, NULL, stasis_unsubscribe); + struct stasis_message *msg; + struct stasis_message_type *type; + struct ast_endpoint_snapshot *actual_snapshot; + int actual_count; + + switch (cmd) { + case TEST_INIT: + info->name = __func__; + info->category = test_category; + info->summary = "Test endpoint updates as its state changes"; + info->description = + "Test endpoint updates as its state changes"; + return AST_TEST_NOT_RUN; + case TEST_EXECUTE: + break; + } + + uut = ast_endpoint_create("TEST", __func__); + ast_test_validate(test, NULL != uut); + + sink = stasis_message_sink_create(); + ast_test_validate(test, NULL != sink); + + sub = stasis_subscribe(ast_endpoint_topic(uut), + stasis_message_sink_cb(), sink); + ast_test_validate(test, NULL != sub); + + ast_endpoint_set_state(uut, AST_ENDPOINT_OFFLINE); + actual_count = stasis_message_sink_wait_for_count(sink, 1, + STASIS_SINK_DEFAULT_WAIT); + msg = sink->messages[0]; + type = stasis_message_type(msg); + ast_test_validate(test, ast_endpoint_snapshot_type() == type); + actual_snapshot = stasis_message_data(msg); + ast_test_validate(test, AST_ENDPOINT_OFFLINE == actual_snapshot->state); + + ast_endpoint_set_max_channels(uut, 8675309); + actual_count = stasis_message_sink_wait_for_count(sink, 2, + STASIS_SINK_DEFAULT_WAIT); + msg = sink->messages[1]; + type = stasis_message_type(msg); + ast_test_validate(test, ast_endpoint_snapshot_type() == type); + actual_snapshot = stasis_message_data(msg); + ast_test_validate(test, 8675309 == actual_snapshot->max_channels); + + return AST_TEST_PASS; +} + +AST_TEST_DEFINE(cache_clear) +{ + RAII_VAR(struct ast_endpoint *, uut, NULL, ast_endpoint_shutdown); + RAII_VAR(struct ast_channel *, chan, NULL, safe_channel_hangup); + RAII_VAR(struct stasis_message_sink *, sink, NULL, ao2_cleanup); + RAII_VAR(struct stasis_subscription *, sub, NULL, stasis_unsubscribe); + struct stasis_message *msg; + struct stasis_message_type *type; + struct ast_endpoint_snapshot *actual_snapshot; + struct stasis_cache_update *update; + int message_index; + + switch (cmd) { + case TEST_INIT: + info->name = __func__; + info->category = test_category; + info->summary = "Test endpoint state change messages"; + info->description = "Test endpoint state change messages"; + return AST_TEST_NOT_RUN; + case TEST_EXECUTE: + break; + } + + /* Subscribe to the cache topic */ + sink = stasis_message_sink_create(); + ast_test_validate(test, NULL != sink); + + sub = stasis_subscribe( + stasis_caching_get_topic(ast_endpoint_topic_all_cached()), + stasis_message_sink_cb(), sink); + ast_test_validate(test, NULL != sub); + + uut = ast_endpoint_create("TEST", __func__); + ast_test_validate(test, NULL != uut); + + /* Since the cache topic is a singleton (ew), it may have messages from + * elsewheres that it's processing, or maybe even some final messages + * from the prior test. We've got to wait_for our specific message, + * instead of wait_for_count. + */ + message_index = stasis_message_sink_wait_for(sink, 0, + cache_update, __func__, STASIS_SINK_DEFAULT_WAIT); + ast_test_validate(test, 0 <= message_index); + + /* First message should be a cache creation entry for our endpont */ + msg = sink->messages[message_index]; + type = stasis_message_type(msg); + ast_test_validate(test, stasis_cache_update_type() == type); + update = stasis_message_data(msg); + ast_test_validate(test, ast_endpoint_snapshot_type() == update->type); + ast_test_validate(test, NULL == update->old_snapshot); + actual_snapshot = stasis_message_data(update->new_snapshot); + ast_test_validate(test, 0 == strcmp("TEST", actual_snapshot->tech)); + ast_test_validate(test, + 0 == strcmp(__func__, actual_snapshot->resource)); + + ast_endpoint_shutdown(uut); + uut = NULL; + message_index = stasis_message_sink_wait_for(sink, message_index + 1, + cache_update, __func__, STASIS_SINK_DEFAULT_WAIT); + ast_test_validate(test, 0 <= message_index); + /* Now we should have a cache removal entry */ + msg = sink->messages[message_index]; + type = stasis_message_type(msg); + ast_test_validate(test, stasis_cache_update_type() == type); + update = stasis_message_data(msg); + ast_test_validate(test, ast_endpoint_snapshot_type() == update->type); + actual_snapshot = stasis_message_data(update->old_snapshot); + ast_test_validate(test, 0 == strcmp("TEST", actual_snapshot->tech)); + ast_test_validate(test, + 0 == strcmp(__func__, actual_snapshot->resource)); + ast_test_validate(test, NULL == update->new_snapshot); + + return AST_TEST_PASS; +} + +AST_TEST_DEFINE(channel_messages) +{ + RAII_VAR(struct ast_endpoint *, uut, NULL, ast_endpoint_shutdown); + RAII_VAR(struct ast_channel *, chan, NULL, safe_channel_hangup); + RAII_VAR(struct stasis_message_sink *, sink, NULL, ao2_cleanup); + RAII_VAR(struct stasis_subscription *, sub, NULL, stasis_unsubscribe); + struct stasis_message *msg; + struct stasis_message_type *type; + struct ast_endpoint_snapshot *actual_snapshot; + int actual_count; + + switch (cmd) { + case TEST_INIT: + info->name = __func__; + info->category = test_category; + info->summary = "Test channel messages on an endpoint topic"; + info->description = + "Test channel messages on an endpoint topic"; + return AST_TEST_NOT_RUN; + case TEST_EXECUTE: + break; + } + + uut = ast_endpoint_create("TEST", __func__); + ast_test_validate(test, NULL != uut); + + sink = stasis_message_sink_create(); + ast_test_validate(test, NULL != sink); + + sub = stasis_subscribe(ast_endpoint_topic(uut), + stasis_message_sink_cb(), sink); + ast_test_validate(test, NULL != sub); + + chan = ast_channel_alloc(0, AST_STATE_DOWN, "100", __func__, "100", + "100", "default", NULL, 0, "TEST/test_res"); + ast_test_validate(test, NULL != chan); + + ast_endpoint_add_channel(uut, chan); + + actual_count = stasis_message_sink_wait_for_count(sink, 2, + STASIS_SINK_DEFAULT_WAIT); + ast_test_validate(test, 2 == actual_count); + + msg = sink->messages[0]; + type = stasis_message_type(msg); + ast_test_validate(test, ast_channel_snapshot_type() == type); + + msg = sink->messages[1]; + type = stasis_message_type(msg); + ast_test_validate(test, ast_endpoint_snapshot_type() == type); + actual_snapshot = stasis_message_data(msg); + ast_test_validate(test, 1 == actual_snapshot->num_channels); + + safe_channel_hangup(chan); + chan = NULL; + + actual_count = stasis_message_sink_wait_for_count(sink, 5, + STASIS_SINK_DEFAULT_WAIT); + ast_test_validate(test, 5 == actual_count); + + msg = sink->messages[2]; + type = stasis_message_type(msg); + ast_test_validate(test, ast_channel_snapshot_type() == type); + + msg = sink->messages[3]; + type = stasis_message_type(msg); + ast_test_validate(test, stasis_cache_clear_type() == type); + + msg = sink->messages[4]; + type = stasis_message_type(msg); + ast_test_validate(test, ast_endpoint_snapshot_type() == type); + actual_snapshot = stasis_message_data(msg); + ast_test_validate(test, 0 == actual_snapshot->num_channels); + + return AST_TEST_PASS; +} + +static int unload_module(void) +{ + AST_TEST_UNREGISTER(state_changes); + AST_TEST_UNREGISTER(cache_clear); + AST_TEST_UNREGISTER(channel_messages); + return 0; +} + +static int load_module(void) +{ + AST_TEST_REGISTER(state_changes); + AST_TEST_REGISTER(cache_clear); + AST_TEST_REGISTER(channel_messages); + return AST_MODULE_LOAD_SUCCESS; +} + +AST_MODULE_INFO(ASTERISK_GPL_KEY, AST_MODFLAG_DEFAULT, + "Endpoint stasis-related testing", + .load = load_module, + .unload = unload_module, + .nonoptreq = "res_stasis_test", + ); |