summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--channels/chan_sip.c15
-rw-r--r--channels/sip/include/sip.h2
-rw-r--r--include/asterisk/astobj2.h51
-rw-r--r--include/asterisk/endpoints.h170
-rw-r--r--include/asterisk/stasis.h16
-rw-r--r--include/asterisk/stasis_endpoints.h154
-rw-r--r--include/asterisk/stasis_test.h142
-rw-r--r--main/asterisk.c6
-rw-r--r--main/astobj2.c38
-rw-r--r--main/channel_internal_api.c39
-rw-r--r--main/endpoints.c338
-rw-r--r--main/stasis_cache.c63
-rw-r--r--main/stasis_endpoints.c189
-rw-r--r--res/res_stasis_http_endpoints.c47
-rw-r--r--res/res_stasis_test.c291
-rw-r--r--res/res_stasis_test.exports.in6
-rw-r--r--res/stasis_http/resource_endpoints.c134
-rw-r--r--res/stasis_http/resource_endpoints.h21
-rw-r--r--rest-api/api-docs/endpoints.json37
-rw-r--r--tests/test_endpoints.c158
-rw-r--r--tests/test_stasis_endpoints.c307
21 files changed, 2154 insertions, 70 deletions
diff --git a/channels/chan_sip.c b/channels/chan_sip.c
index 8cd739e49..7cf5f37dd 100644
--- a/channels/chan_sip.c
+++ b/channels/chan_sip.c
@@ -295,6 +295,7 @@ ASTERISK_FILE_VERSION(__FILE__, "$Revision$")
#include "sip/include/security_events.h"
#include "asterisk/sip_api.h"
#include "asterisk/app.h"
+#include "asterisk/stasis_endpoints.h"
/*** DOCUMENTATION
<application name="SIPDtmfMode" language="en_US">
@@ -5326,6 +5327,9 @@ static void sip_destroy_peer(struct sip_peer *peer)
peer->caps = ast_format_cap_destroy(peer->caps);
ast_rtp_dtls_cfg_free(&peer->dtls_cfg);
+
+ ast_endpoint_shutdown(peer->endpoint);
+ peer->endpoint = NULL;
}
/*! \brief Update peer data in database (if used) */
@@ -8012,6 +8016,14 @@ static struct ast_channel *sip_new(struct sip_pvt *i, int state, const char *tit
return NULL;
}
+ if (i->relatedpeer) {
+ if (ast_endpoint_add_channel(i->relatedpeer->endpoint, tmp)) {
+ ast_channel_unref(tmp);
+ sip_pvt_lock(i);
+ return NULL;
+ }
+ }
+
/* If we sent in a callid, bind it to the channel. */
if (callid) {
ast_channel_callid_set(tmp, callid);
@@ -30770,6 +30782,9 @@ static struct sip_peer *build_peer(const char *name, struct ast_variable *v, str
if (!(peer = ao2_t_alloc(sizeof(*peer), sip_destroy_peer_fn, "allocate a peer struct"))) {
return NULL;
}
+ if (!(peer->endpoint = ast_endpoint_create("SIP", name))) {
+ return NULL;
+ }
if (!(peer->caps = ast_format_cap_alloc_nolock())) {
ao2_t_ref(peer, -1, "failed to allocate format capabilities, drop peer");
return NULL;
diff --git a/channels/sip/include/sip.h b/channels/sip/include/sip.h
index 090f1bcb2..d852ee945 100644
--- a/channels/sip/include/sip.h
+++ b/channels/sip/include/sip.h
@@ -1377,6 +1377,8 @@ struct sip_peer {
unsigned int disallowed_methods;
struct ast_cc_config_params *cc_params;
+ struct ast_endpoint *endpoint;
+
struct ast_rtp_dtls_cfg dtls_cfg;
};
diff --git a/include/asterisk/astobj2.h b/include/asterisk/astobj2.h
index 70a9041c3..502cc744c 100644
--- a/include/asterisk/astobj2.h
+++ b/include/asterisk/astobj2.h
@@ -1890,4 +1890,55 @@ void __ao2_cleanup_debug(void *obj, const char *file, int line, const char *func
#define ao2_cleanup(obj) __ao2_cleanup(obj)
#endif
void ao2_iterator_cleanup(struct ao2_iterator *iter);
+
+
+/* XXX TODO BUGBUG and all the other things...
+ * These functions should eventually be moved elsewhere, but the utils folder
+ * won't compile with them in strings.h
+ */
+
+/*!
+ * \since 12
+ * \brief Allocates a hash container for bare strings
+ *
+ * \param buckets The number of buckets to use for the hash container
+ *
+ * \retval AO2 container for strings
+ * \retval NULL if allocation failed
+ */
+#define ast_str_container_alloc(buckets) ast_str_container_alloc_options(AO2_ALLOC_OPT_LOCK_MUTEX, buckets)
+
+/*!
+ * \since 12
+ * \brief Allocates a hash container for bare strings
+ *
+ * \param opts Options to be provided to the container
+ * \param buckets The number of buckets to use for the hash container
+ *
+ * \retval AO2 container for strings
+ * \retval NULL if allocation failed
+ */
+struct ao2_container *ast_str_container_alloc_options(enum ao2_container_opts opts, int buckets);
+
+/*!
+ * \since 12
+ * \brief Adds a string to a string container allocated by ast_str_container_alloc
+ *
+ * \param str_container The container to which to add a string
+ * \param add The string to add to the container
+ *
+ * \retval zero on success
+ * \retval non-zero if the operation failed
+ */
+int ast_str_container_add(struct ao2_container *str_container, const char *add);
+
+/*!
+ * \since 12
+ * \brief Removes a string from a string container allocated by ast_str_container_alloc
+ *
+ * \param str_container The container from which to remove a string
+ * \param remove The string to remove from the container
+ */
+void ast_str_container_remove(struct ao2_container *str_container, const char *remove);
+
#endif /* _ASTERISK_ASTOBJ2_H */
diff --git a/include/asterisk/endpoints.h b/include/asterisk/endpoints.h
new file mode 100644
index 000000000..b0be1cf38
--- /dev/null
+++ b/include/asterisk/endpoints.h
@@ -0,0 +1,170 @@
+/*
+ * Asterisk -- An open source telephony toolkit.
+ *
+ * Copyright (C) 2013, Digium, Inc.
+ *
+ * David M. Lee, II <dlee@digium.com>
+ *
+ * See http://www.asterisk.org for more information about
+ * the Asterisk project. Please do not directly contact
+ * any of the maintainers of this project for assistance;
+ * the project provides a web site, mailing lists and IRC
+ * channels for your use.
+ *
+ * This program is free software, distributed under the terms of
+ * the GNU General Public License Version 2. See the LICENSE file
+ * at the top of the source tree.
+ */
+
+#ifndef _ASTERISK_ENDPOINTS_H
+#define _ASTERISK_ENDPOINTS_H
+
+/*! \file
+ *
+ * \brief Endpoint abstractions.
+ *
+ * \author David M. Lee, II <dlee@digium.com>
+ * \since 12
+ *
+ * An endpoint is an external device/system that may offer/accept channels
+ * to/from Asterisk. While this is a very useful concept for end users, it is
+ * surprisingly \a not a core concept within Asterisk iteself.
+ *
+ * This file defines \ref ast_endpoint as a seperate object, which channel
+ * drivers may use to expose their concept of an endpoint. As the channel driver
+ * creates channels, it can use ast_endpoint_add_channel() to associate channels
+ * to the endpoint. This updates the endpoint appropriately, and forwards all of
+ * the channel's events to the endpoint's topic.
+ *
+ * In order to avoid excessive locking on the endpoint object itself, the
+ * mutable state is not accessible via getters. Instead, you can create a
+ * snapshot using ast_endpoint_snapshot_create() to get a consistent snapshot of
+ * the internal state.
+ */
+
+#include "asterisk/json.h"
+
+/*!
+ * \brief Valid states for an endpoint.
+ * \since 12
+ */
+enum ast_endpoint_state {
+ /*! The endpoint state is not known. */
+ AST_ENDPOINT_UNKNOWN,
+ /*! The endpoint is not available. */
+ AST_ENDPOINT_OFFLINE,
+ /*! The endpoint is available. */
+ AST_ENDPOINT_ONLINE,
+};
+
+/*!
+ * \brief Returns a string representation of the given endpoint state.
+ *
+ * \param state Endpoint state.
+ * \return String representation of \a state.
+ * \return \c "?" if \a state isn't in \ref ast_endpoint_state.
+ */
+const char *ast_endpoint_state_to_string(enum ast_endpoint_state state);
+
+/*!
+ * \brief Opaque struct representing an endpoint.
+ *
+ * An endpoint is an external device/system that may offer/accept channels
+ * to/from Asterisk.
+ *
+ * \since 12
+ */
+struct ast_endpoint;
+
+/*!
+ * \brief Create an endpoint struct.
+ *
+ * The endpoint is created with a state of UNKNOWN and max_channels of -1
+ * (unlimited). While \ref ast_endpoint is AO2 managed, you have to
+ * shut it down with ast_endpoint_shutdown() to clean up references from
+ * subscriptions.
+ *
+ * \param tech Technology for this endpoint.
+ * \param resource Name of this endpoint.
+ * \return Newly created endpoint.
+ * \return \c NULL on error.
+ * \since 12
+ */
+struct ast_endpoint *ast_endpoint_create(const char *tech, const char *resource);
+
+/*!
+ * \brief Shutsdown an \ref ast_endpoint.
+ *
+ * \param endpoint Endpoint to shut down.
+ * \since 12
+ */
+void ast_endpoint_shutdown(struct ast_endpoint *endpoint);
+
+/*!
+ * \brief Gets the technology of the given endpoint.
+ *
+ * This is an immutable string describing the channel provider technology
+ * (SIP, IAX2, etc.).
+ *
+ * \param endpoint The endpoint.
+ * \return Tec of the endpoint.
+ * \return \c NULL if endpoint is \c NULL.
+ * \since 12
+ */
+const char *ast_endpoint_get_tech(const struct ast_endpoint *endpoint);
+
+/*!
+ * \brief Gets the resource name of the given endpoint.
+ *
+ * This is unique for the endpoint's technology, and immutable.
+ *
+ * \param endpoint The endpoint.
+ * \return Resource name of the endpoint.
+ * \return \c NULL if endpoint is \c NULL.
+ * \since 12
+ */
+const char *ast_endpoint_get_resource(const struct ast_endpoint *endpoint);
+
+/*!
+ * \brief Updates the state of the given endpoint.
+ *
+ * \param endpoint Endpoint to modify.
+ * \param state New state.
+ * \since 12
+ */
+void ast_endpoint_set_state(struct ast_endpoint *endpoint,
+ enum ast_endpoint_state state);
+
+/*!
+ * \brief Updates the maximum number of channels an endpoint supports.
+ *
+ * Set to -1 for unlimited channels.
+ *
+ * \param endpoint Endpoint to modify.
+ * \param max_channels Maximum number of concurrent channels this endpoint
+ * supports.
+ */
+void ast_endpoint_set_max_channels(struct ast_endpoint *endpoint,
+ int max_channels);
+
+
+/*!
+ * \since 12
+ * \brief Adds a channel to the given endpoint.
+ *
+ * This updates the endpoint's statistics, as well as forwarding all of the
+ * channel's messages to the endpoint's topic.
+ *
+ * The channel is automagically removed from the endpoint when it is disposed
+ * of.
+ *
+ * \param endpoint
+ * \param chan Channel.
+ * \retval 0 on success.
+ * \retval Non-zero on error.
+ */
+int ast_endpoint_add_channel(struct ast_endpoint *endpoint,
+ struct ast_channel *chan);
+
+
+#endif /* _ASTERISK_ENDPOINTS_H */
diff --git a/include/asterisk/stasis.h b/include/asterisk/stasis.h
index 07c3c5c6b..48d51e369 100644
--- a/include/asterisk/stasis.h
+++ b/include/asterisk/stasis.h
@@ -431,6 +431,22 @@ struct stasis_cache_update {
};
/*!
+ * \brief Cache clear message.
+ */
+struct stasis_cache_clear {
+ /*! Type of object being cleared from the cache */
+ struct stasis_message_type *type;
+ /*! Id of the object being cleared from the cache */
+ char id[];
+};
+
+/*!
+ * \brief Message type for \ref stasis_cache_clear.
+ * \since 12
+ */
+struct stasis_message_type *stasis_cache_clear_type(void);
+
+/*!
* \brief A message which instructs the caching topic to remove an entry from its cache.
* \param type Message type.
* \param id Unique id of the snapshot to clear.
diff --git a/include/asterisk/stasis_endpoints.h b/include/asterisk/stasis_endpoints.h
new file mode 100644
index 000000000..2aeb614e6
--- /dev/null
+++ b/include/asterisk/stasis_endpoints.h
@@ -0,0 +1,154 @@
+/*
+ * Asterisk -- An open source telephony toolkit.
+ *
+ * Copyright (C) 2013, Digium, Inc.
+ *
+ * David M. Lee, II <dlee@digium.com>
+ *
+ * See http://www.asterisk.org for more information about
+ * the Asterisk project. Please do not directly contact
+ * any of the maintainers of this project for assistance;
+ * the project provides a web site, mailing lists and IRC
+ * channels for your use.
+ *
+ * This program is free software, distributed under the terms of
+ * the GNU General Public License Version 2. See the LICENSE file
+ * at the top of the source tree.
+ */
+
+#ifndef _ASTERISK_STASIS_ENDPOINTS_H
+#define _ASTERISK_STASIS_ENDPOINTS_H
+
+/*! \file
+ *
+ * \brief Endpoint abstractions.
+ *
+ * \author David M. Lee, II <dlee@digium.com>
+ * \since 12
+ */
+
+#include "asterisk/endpoints.h"
+#include "asterisk/json.h"
+#include "asterisk/stasis.h"
+#include "asterisk/stringfields.h"
+
+/*! \addtogroup StasisTopicsAndMessages
+ * @{
+ */
+
+/*!
+ * \brief A snapshot of an endpoint's state.
+ *
+ * The id for an endpoint is tech/resource. The duplication is needed because
+ * there are several cases where any of the three values would be needed, and
+ * constantly splitting or reassembling would be a pain.
+ *
+ * \since 12
+ */
+struct ast_endpoint_snapshot {
+ AST_DECLARE_STRING_FIELDS(
+ AST_STRING_FIELD(id); /*!< unique id for this endpoint. */
+ AST_STRING_FIELD(tech); /*!< Channel technology */
+ AST_STRING_FIELD(resource); /*!< Tech-unique name */
+ );
+
+ /*! Endpoint state */
+ enum ast_endpoint_state state;
+ /*!
+ * Maximum number of channels this endpoint supports. If the upper limit
+ * for an endpoint is unknown, this field is set to -1.
+ */
+ int max_channels;
+ /*! Number of channels currently active on this endpoint */
+ int num_channels;
+ /*! Channel ids */
+ char *channel_ids[];
+};
+
+/*!
+ * \brief Blob of data associated with an endpoint.
+ *
+ * The blob is actually a JSON object of structured data. It has a "type" field
+ * which contains the type string describing this blob.
+ *
+ * \since 12
+ */
+struct ast_endpoint_blob {
+ struct ast_endpoint_snapshot *snapshot;
+ struct ast_json *blob;
+};
+
+/*!
+ * \brief Message type for \ref ast_endpoint_snapshot.
+ * \since 12
+ */
+struct stasis_message_type *ast_endpoint_snapshot_type(void);
+
+/*!
+ * \brief Create a snapshot of an endpoint
+ * \param endpoint Endpoint to snap a shot of.
+ * \return Snapshot of the endpoint.
+ * \return \c NULL on error.
+ * \since 12
+ */
+struct ast_endpoint_snapshot *ast_endpoint_snapshot_create(
+ struct ast_endpoint *endpoint);
+
+/*!
+ * \brief Returns the topic for a specific endpoint.
+ *
+ * \param endpoint The endpoint.
+ * \return The topic for the given endpoint.
+ * \return ast_endpoint_topic_all() if endpoint is \c NULL.
+ * \since 12
+ */
+struct stasis_topic *ast_endpoint_topic(struct ast_endpoint *endpoint);
+
+/*!
+ * \brief Topic for all endpoint releated messages.
+ * \since 12
+ */
+struct stasis_topic *ast_endpoint_topic_all(void);
+
+/*!
+ * \brief Cached topic for all endpoint related messages.
+ * \since 12
+ */
+struct stasis_caching_topic *ast_endpoint_topic_all_cached(void);
+
+/*!
+ * \brief Retrieve the most recent snapshot for the endpoint with the given
+ * name.
+ *
+ * \param tech Name of the endpoint's technology.
+ * \param resource Resource name of the endpoint.
+ * \return Snapshot of the endpoint with the given name.
+ * \return \c NULL if endpoint is not found, or on error.
+ * \since 12
+ */
+struct ast_endpoint_snapshot *ast_endpoint_latest_snapshot(const char *tech,
+ const char *resource
+);
+
+/*! @} */
+
+/*!
+ * \brief Build a JSON object from a \ref ast_endpoint_snapshot.
+ *
+ * \param snapshot Endpoint snapshot.
+ * \return JSON object representing endpoint snapshot.
+ * \return \c NULL on error
+ */
+struct ast_json *ast_endpoint_snapshot_to_json(
+ const struct ast_endpoint_snapshot *snapshot);
+
+/*!
+ * \brief Initialization function for endpoint stasis support.
+ *
+ * \return 0 on success.
+ * \return non-zero on error.
+ * \since 12
+ */
+int ast_endpoint_stasis_init(void);
+
+#endif /* _ASTERISK_STASIS_ENDPOINTS_H */
diff --git a/include/asterisk/stasis_test.h b/include/asterisk/stasis_test.h
new file mode 100644
index 000000000..ad4020a08
--- /dev/null
+++ b/include/asterisk/stasis_test.h
@@ -0,0 +1,142 @@
+/*
+ * Asterisk -- An open source telephony toolkit.
+ *
+ * Copyright (C) 2013, Digium, Inc.
+ *
+ * David M. Lee, II <dlee@digium.com>
+ *
+ * See http://www.asterisk.org for more information about
+ * the Asterisk project. Please do not directly contact
+ * any of the maintainers of this project for assistance;
+ * the project provides a web site, mailing lists and IRC
+ * channels for your use.
+ *
+ * This program is free software, distributed under the terms of
+ * the GNU General Public License Version 2. See the LICENSE file
+ * at the top of the source tree.
+ */
+
+#ifndef _ASTERISK_STASIS_TEST_H
+#define _ASTERISK_STASIS_TEST_H
+
+/*!
+ * \file \brief Test infrastructure for dealing with Stasis.
+ *
+ * \author David M. Lee, II <dlee@digium.com>
+ *
+ * This file contains some helpful utilities for testing Stasis related topics
+ * and messages. The \ref stasis_message_sink is something you can subscribe to
+ * a topic which will receive all of the messages from the topic. This messages
+ * are accumulated in its \c messages field.
+ *
+ * There are a set of wait functions (stasis_message_sink_wait_for_count(),
+ * stasis_message_sink_wait_for(), etc.) which will block waiting for conditions
+ * to be met in the \ref stasis_message_sink.
+ */
+
+#include "asterisk/lock.h"
+#include "asterisk/stasis.h"
+
+#define STASIS_SINK_DEFAULT_WAIT 5000
+
+/*! \brief Structure that collects messages from a topic */
+struct stasis_message_sink {
+ /*! Condition mutex. */
+ ast_mutex_t lock;
+ /*! Condition to signal state changes */
+ ast_cond_t cond;
+ /*! Maximum number of messages messages field can hold without
+ * realloc */
+ size_t max_messages;
+ /*! Current number of messages in messages field. */
+ size_t num_messages;
+ /*! Boolean flag to be set when unsubscribe is received */
+ int is_done:1;
+ /*! Ordered array of messages received */
+ struct stasis_message **messages;
+};
+
+/*!
+ * \brief Create a message sink.
+ *
+ * This is an AO2 managed object, which you ao2_cleanup() when done. The
+ * destructor waits for an unsubscribe message to be received, to ensure the
+ * object isn't disposed of before the topic is finished.
+ */
+struct stasis_message_sink *stasis_message_sink_create(void);
+
+/*!
+ * \brief Topic callback to receive messages.
+ *
+ * We return a function pointer instead of simply exposing the function because
+ * of the vagaries of dlopen(), \c RTLD_LAZY, and function pointers. See the
+ * comment on the implementation for details why.
+ *
+ * \return Function pointer to \ref stasis_message_sink's message handling
+ * function
+ */
+stasis_subscription_cb stasis_message_sink_cb(void);
+
+/*!
+ * \brief Wait for a sink's num_messages field to reach a certain level.
+ *
+ * The optional timeout prevents complete deadlock in a test.
+ *
+ * \param sink Sink to wait on.
+ * \param num_messages sink->num_messages value to wait for.
+ * \param timeout_millis Number of milliseconds to wait. -1 to wait forever.
+ * \return Actual sink->num_messages value at return.
+ * If this is < \a num_messages, then the timeout expired.
+ */
+int stasis_message_sink_wait_for_count(struct stasis_message_sink *sink,
+ int num_messages, int timeout_millis);
+
+typedef int (*stasis_wait_cb)(struct stasis_message *msg, const void *data);
+
+/*!
+ * \brief Wait for a message that matches the given criteria.
+ *
+ * \param sink Sink to wait on.
+ * \param start Index of message to start with.
+ * \param cmp_cb comparison function. This returns true (non-zero) on match
+ * and false (zero) on match.
+ * \param timeout_millis Number of milliseconds to wait.
+ * \return Index of the matching message.
+ * \return Negative for no match.
+ */
+int stasis_message_sink_wait_for(struct stasis_message_sink *sink, int start,
+ stasis_wait_cb cmp_cb, const void *data, int timeout_millis);
+
+/*!
+ * \brief Ensures that no new messages are received.
+ *
+ * The optional timeout prevents complete deadlock in a test.
+ *
+ * \param sink Sink to wait on.
+ * \param num_messages expecte \a sink->num_messages.
+ * \param timeout_millis Number of milliseconds to wait for.
+ * \return Actual sink->num_messages value at return.
+ * If this is < \a num_messages, then the timeout expired.
+ */
+int stasis_message_sink_should_stay(struct stasis_message_sink *sink,
+ int num_messages, int timeout_millis);
+
+/*! \addtogroup StasisTopicsAndMessages
+ * @{
+ */
+
+/*!
+ * \brief Creates a test message.
+ */
+struct stasis_message *stasis_test_message_create(void);
+
+/*!
+ * \brief Gets the type of messages created by stasis_test_message_create().
+ */
+struct stasis_message_type *stasis_test_message_type(void);
+
+/*!
+ * @}
+ */
+
+#endif /* _ASTERISK_STASIS_TEST_H */
diff --git a/main/asterisk.c b/main/asterisk.c
index b8deab1d4..885797f36 100644
--- a/main/asterisk.c
+++ b/main/asterisk.c
@@ -242,6 +242,7 @@ int daemon(int, int); /* defined in libresolv of all places */
#include "asterisk/sorcery.h"
#include "asterisk/stasis.h"
#include "asterisk/json.h"
+#include "asterisk/stasis_endpoints.h"
#include "../defaults.h"
@@ -4174,6 +4175,11 @@ int main(int argc, char *argv[])
exit(1);
}
+ if (ast_endpoint_stasis_init()) {
+ printf("Endpoint initialization failed.\n%s", term_quit());
+ exit(1);
+ }
+
ast_makesocket();
sigemptyset(&sigs);
sigaddset(&sigs, SIGHUP);
diff --git a/main/astobj2.c b/main/astobj2.c
index a980ec379..8ccc36cb4 100644
--- a/main/astobj2.c
+++ b/main/astobj2.c
@@ -5780,3 +5780,41 @@ int astobj2_init(void)
return 0;
}
+
+/* XXX TODO BUGBUG and all the other things...
+ * These functions should eventually be moved elsewhere, but the utils folder
+ * won't compile with them in strings.h
+ */
+static int str_hash(const void *obj, const int flags)
+{
+ return ast_str_hash(obj);
+}
+
+static int str_cmp(void *lhs, void *rhs, int flags)
+{
+ return strcmp(lhs, rhs) ? 0 : CMP_MATCH;
+}
+
+struct ao2_container *ast_str_container_alloc_options(enum ao2_container_opts opts, int buckets)
+{
+ return ao2_container_alloc_options(opts, buckets, str_hash, str_cmp);
+}
+
+int ast_str_container_add(struct ao2_container *str_container, const char *add)
+{
+ RAII_VAR(char *, ao2_add, ao2_alloc(strlen(add) + 1, NULL), ao2_cleanup);
+
+ if (!ao2_add) {
+ return -1;
+ }
+
+ /* safe strcpy */
+ strcpy(ao2_add, add);
+ ao2_link(str_container, ao2_add);
+ return 0;
+}
+
+void ast_str_container_remove(struct ao2_container *str_container, const char *remove)
+{
+ ao2_find(str_container, remove, OBJ_KEY | OBJ_NODATA | OBJ_UNLINK);
+}
diff --git a/main/channel_internal_api.c b/main/channel_internal_api.c
index cb9ed6744..f3293d59b 100644
--- a/main/channel_internal_api.c
+++ b/main/channel_internal_api.c
@@ -39,11 +39,13 @@ ASTERISK_FILE_VERSION(__FILE__, "$Revision$")
#include <fcntl.h>
#include "asterisk/channel.h"
-#include "asterisk/stringfields.h"
+#include "asterisk/channel_internal.h"
#include "asterisk/data.h"
+#include "asterisk/endpoints.h"
#include "asterisk/indications.h"
#include "asterisk/stasis_channels.h"
-#include "asterisk/channel_internal.h"
+#include "asterisk/stasis_endpoints.h"
+#include "asterisk/stringfields.h"
#include "asterisk/test.h"
/*!
@@ -198,6 +200,7 @@ struct ast_channel {
struct timeval sending_dtmf_tv; /*!< The time this channel started sending the current digit. (Invalid if sending_dtmf_digit is zero.) */
struct stasis_topic *topic; /*!< Topic for all channel's events */
struct stasis_subscription *forwarder; /*!< Subscription for event forwarding to all topic */
+ struct stasis_subscription *endpoint_forward; /*!< Subscription for event forwarding to endpoint's topic */
};
/* AST_DATA definitions, which will probably have to be re-thought since the channel will be opaque */
@@ -1369,6 +1372,7 @@ void ast_channel_internal_cleanup(struct ast_channel *chan)
ast_string_field_free_memory(chan);
chan->forwarder = stasis_unsubscribe(chan->forwarder);
+ chan->endpoint_forward = stasis_unsubscribe(chan->endpoint_forward);
ao2_cleanup(chan->topic);
chan->topic = NULL;
@@ -1389,6 +1393,37 @@ struct stasis_topic *ast_channel_topic(struct ast_channel *chan)
return chan ? chan->topic : ast_channel_topic_all();
}
+int ast_endpoint_add_channel(struct ast_endpoint *endpoint,
+ struct ast_channel *chan)
+{
+ RAII_VAR(struct ast_channel_snapshot *, snapshot, NULL, ao2_cleanup);
+ RAII_VAR(struct stasis_message *, msg, NULL, ao2_cleanup);
+
+ ast_assert(chan != NULL);
+ ast_assert(endpoint != NULL);
+
+ snapshot = ast_channel_snapshot_create(chan);
+ if (!snapshot) {
+ return -1;
+ }
+
+ msg = stasis_message_create(ast_channel_snapshot_type(), snapshot);
+ if (!msg) {
+ return -1;
+ }
+
+ chan->endpoint_forward =
+ stasis_forward_all(chan->topic, ast_endpoint_topic(endpoint));
+
+ if (chan->endpoint_forward == NULL) {
+ return -1;
+ }
+
+ stasis_publish(ast_endpoint_topic(endpoint), msg);
+
+ return 0;
+}
+
void ast_channel_internal_setup_topics(struct ast_channel *chan)
{
const char *topic_name = chan->uniqueid;
diff --git a/main/endpoints.c b/main/endpoints.c
new file mode 100644
index 000000000..95397f960
--- /dev/null
+++ b/main/endpoints.c
@@ -0,0 +1,338 @@
+/*
+ * Asterisk -- An open source telephony toolkit.
+ *
+ * Copyright (C) 2013, Digium, Inc.
+ *
+ * David M. Lee, II <dlee@digium.com>
+ *
+ * See http://www.asterisk.org for more information about
+ * the Asterisk project. Please do not directly contact
+ * any of the maintainers of this project for assistance;
+ * the project provides a web site, mailing lists and IRC
+ * channels for your use.
+ *
+ * This program is free software, distributed under the terms of
+ * the GNU General Public License Version 2. See the LICENSE file
+ * at the top of the source tree.
+ */
+
+/*! \file
+ *
+ * \brief Asterisk endpoint API.
+ *
+ * \author David M. Lee, II <dlee@digium.com>
+ */
+
+/*** MODULEINFO
+ <support_level>core</support_level>
+ ***/
+
+#include "asterisk.h"
+
+ASTERISK_FILE_VERSION(__FILE__, "$Revision$")
+
+#include "asterisk/astobj2.h"
+#include "asterisk/endpoints.h"
+#include "asterisk/stasis.h"
+#include "asterisk/stasis_channels.h"
+#include "asterisk/stasis_endpoints.h"
+#include "asterisk/stasis_message_router.h"
+#include "asterisk/stringfields.h"
+
+/*! Buckets for endpoint->channel mappings. Keep it prime! */
+#define ENDPOINT_BUCKETS 127
+
+struct ast_endpoint {
+ AST_DECLARE_STRING_FIELDS(
+ AST_STRING_FIELD(tech); /*!< Technology (SIP, IAX2, etc.). */
+ AST_STRING_FIELD(resource); /*!< Name, unique to the tech. */
+ AST_STRING_FIELD(id); /*!< tech/resource id */
+ );
+ /*! Endpoint's current state */
+ enum ast_endpoint_state state;
+ /*!
+ * \brief Max channels for this endpoint. -1 means unlimited or unknown.
+ *
+ * Note that this simply documents the limits of an endpoint, and does
+ * nothing to try to enforce the limit.
+ */
+ int max_channels;
+ /*! Topic for this endpoint's messages */
+ struct stasis_topic *topic;
+ /*!
+ * Forwarding subscription sending messages to ast_endpoint_topic_all()
+ */
+ struct stasis_subscription *forward;
+ /*! Router for handling this endpoint's messages */
+ struct stasis_message_router *router;
+ /*! ast_str_container of channels associated with this endpoint */
+ struct ao2_container *channel_ids;
+};
+
+const char *ast_endpoint_state_to_string(enum ast_endpoint_state state)
+{
+ switch (state) {
+ case AST_ENDPOINT_UNKNOWN:
+ return "unknown";
+ case AST_ENDPOINT_OFFLINE:
+ return "offline";
+ case AST_ENDPOINT_ONLINE:
+ return "online";
+ }
+ return "?";
+}
+
+static void endpoint_publish_snapshot(struct ast_endpoint *endpoint)
+{
+ RAII_VAR(struct ast_endpoint_snapshot *, snapshot, NULL, ao2_cleanup);
+ RAII_VAR(struct stasis_message *, message, NULL, ao2_cleanup);
+
+ ast_assert(endpoint != NULL);
+ ast_assert(endpoint->topic != NULL);
+
+ snapshot = ast_endpoint_snapshot_create(endpoint);
+ if (!snapshot) {
+ return;
+ }
+ message = stasis_message_create(ast_endpoint_snapshot_type(), snapshot);
+ if (!message) {
+ return;
+ }
+ stasis_publish(endpoint->topic, message);
+}
+
+static void endpoint_dtor(void *obj)
+{
+ struct ast_endpoint *endpoint = obj;
+
+ /* The router should be shut down already */
+ ast_assert(endpoint->router == NULL);
+
+ stasis_unsubscribe(endpoint->forward);
+ endpoint->forward = NULL;
+
+ ao2_cleanup(endpoint->topic);
+ endpoint->topic = NULL;
+
+ ast_string_field_free_memory(endpoint);
+}
+
+static void endpoint_channel_snapshot(void *data,
+ struct stasis_subscription *sub, struct stasis_topic *topic,
+ struct stasis_message *message)
+{
+ struct ast_endpoint *endpoint = data;
+ struct ast_channel_snapshot *snapshot = stasis_message_data(message);
+ RAII_VAR(char *, existing_id, NULL, ao2_cleanup);
+ int publish = 0;
+
+ ast_assert(endpoint != NULL);
+ ast_assert(snapshot != NULL);
+
+ ao2_lock(endpoint);
+ existing_id = ao2_find(endpoint->channel_ids, snapshot->uniqueid,
+ OBJ_POINTER);
+ if (!existing_id) {
+ ast_str_container_add(endpoint->channel_ids,
+ snapshot->uniqueid);
+ publish = 1;
+ }
+ ao2_unlock(endpoint);
+ if (publish) {
+ endpoint_publish_snapshot(endpoint);
+ }
+}
+
+static void endpoint_cache_clear(void *data,
+ struct stasis_subscription *sub, struct stasis_topic *topic,
+ struct stasis_message *message)
+{
+ struct ast_endpoint *endpoint = data;
+ struct stasis_cache_clear *clear = stasis_message_data(message);
+
+ ast_assert(endpoint != NULL);
+ ast_assert(clear != NULL);
+
+ ao2_lock(endpoint);
+ ao2_find(endpoint->channel_ids, clear->id, OBJ_POINTER | OBJ_NODATA | OBJ_UNLINK);
+ ao2_unlock(endpoint);
+ endpoint_publish_snapshot(endpoint);
+}
+
+static void endpoint_default(void *data,
+ struct stasis_subscription *sub, struct stasis_topic *topic,
+ struct stasis_message *message)
+{
+ struct stasis_endpoint *endpoint = data;
+
+ if (stasis_subscription_final_message(sub, message)) {
+ ao2_cleanup(endpoint);
+ }
+}
+
+struct ast_endpoint *ast_endpoint_create(const char *tech, const char *resource)
+{
+ RAII_VAR(struct ast_endpoint *, endpoint, NULL, ao2_cleanup);
+ int r = 0;
+
+ if (ast_strlen_zero(tech)) {
+ ast_log(LOG_ERROR, "Endpoint tech cannot be empty\n");
+ return NULL;
+ }
+
+ if (ast_strlen_zero(resource)) {
+ ast_log(LOG_ERROR, "Endpoint resource cannot be empty\n");
+ return NULL;
+ }
+
+ endpoint = ao2_alloc(sizeof(*endpoint), endpoint_dtor);
+ if (!endpoint) {
+ return NULL;
+ }
+
+ endpoint->max_channels = -1;
+ endpoint->state = AST_ENDPOINT_UNKNOWN;
+
+ if (ast_string_field_init(endpoint, 80) != 0) {
+ return NULL;
+ }
+
+ ast_string_field_set(endpoint, tech, tech);
+ ast_string_field_set(endpoint, resource, resource);
+ ast_string_field_build(endpoint, id, "%s/%s", tech, resource);
+
+ /* All access to channel_ids should be covered by the endpoint's
+ * lock; no extra lock needed. */
+ endpoint->channel_ids = ast_str_container_alloc_options(
+ AO2_ALLOC_OPT_LOCK_NOLOCK, ENDPOINT_BUCKETS);
+ if (!endpoint->channel_ids) {
+ return NULL;
+ }
+
+ endpoint->topic = stasis_topic_create(endpoint->id);
+ if (!endpoint->topic) {
+ return NULL;
+ }
+
+ endpoint->forward =
+ stasis_forward_all(endpoint->topic, ast_endpoint_topic_all());
+ if (!endpoint->forward) {
+ return NULL;
+ }
+
+ endpoint->router = stasis_message_router_create(endpoint->topic);
+ if (!endpoint->router) {
+ return NULL;
+ }
+ r |= stasis_message_router_add(endpoint->router,
+ ast_channel_snapshot_type(), endpoint_channel_snapshot,
+ endpoint);
+ r |= stasis_message_router_add(endpoint->router,
+ stasis_cache_clear_type(), endpoint_cache_clear,
+ endpoint);
+ r |= stasis_message_router_set_default(endpoint->router,
+ endpoint_default, endpoint);
+
+ endpoint_publish_snapshot(endpoint);
+
+ ao2_ref(endpoint, +1);
+ return endpoint;
+}
+
+const char *ast_endpoint_get_tech(const struct ast_endpoint *endpoint)
+{
+ ast_assert(endpoint != NULL);
+ return endpoint->tech;
+}
+
+void ast_endpoint_shutdown(struct ast_endpoint *endpoint)
+{
+ RAII_VAR(struct stasis_message *, message, NULL, ao2_cleanup);
+
+ if (endpoint == NULL) {
+ return;
+ }
+
+ message = stasis_cache_clear_create(ast_endpoint_snapshot_type(), endpoint->id);
+ if (message) {
+ stasis_publish(endpoint->topic, message);
+ }
+
+ stasis_message_router_unsubscribe(endpoint->router);
+ endpoint->router = NULL;
+}
+
+const char *ast_endpoint_get_resource(const struct ast_endpoint *endpoint)
+{
+ return endpoint->resource;
+}
+
+struct stasis_topic *ast_endpoint_topic(struct ast_endpoint *endpoint)
+{
+ return endpoint ? endpoint->topic : ast_endpoint_topic_all();
+}
+
+void ast_endpoint_set_state(struct ast_endpoint *endpoint,
+ enum ast_endpoint_state state)
+{
+ ast_assert(endpoint != NULL);
+ ao2_lock(endpoint);
+ endpoint->state = state;
+ ao2_unlock(endpoint);
+ endpoint_publish_snapshot(endpoint);
+}
+
+void ast_endpoint_set_max_channels(struct ast_endpoint *endpoint,
+ int max_channels)
+{
+ ast_assert(endpoint != NULL);
+ ao2_lock(endpoint);
+ endpoint->max_channels = max_channels;
+ ao2_unlock(endpoint);
+ endpoint_publish_snapshot(endpoint);
+}
+
+static void endpoint_snapshot_dtor(void *obj)
+{
+ struct ast_endpoint_snapshot *snapshot = obj;
+
+ ast_assert(snapshot != NULL);
+ ast_string_field_free_memory(snapshot);
+}
+
+struct ast_endpoint_snapshot *ast_endpoint_snapshot_create(
+ struct ast_endpoint *endpoint)
+{
+ RAII_VAR(struct ast_endpoint_snapshot *, snapshot, NULL, ao2_cleanup);
+ int channel_count;
+ struct ao2_iterator i;
+ void *obj;
+ SCOPED_AO2LOCK(lock, endpoint);
+
+ channel_count = ao2_container_count(endpoint->channel_ids);
+
+ snapshot = ao2_alloc(
+ sizeof(*snapshot) + channel_count * sizeof(char *),
+ endpoint_snapshot_dtor);
+
+ if (ast_string_field_init(snapshot, 80) != 0) {
+ return NULL;
+ }
+
+ ast_string_field_build(snapshot, id, "%s/%s", endpoint->tech,
+ endpoint->resource);
+ ast_string_field_set(snapshot, tech, endpoint->tech);
+ ast_string_field_set(snapshot, resource, endpoint->resource);
+
+ snapshot->state = endpoint->state;
+ snapshot->max_channels = endpoint->max_channels;
+
+ i = ao2_iterator_init(endpoint->channel_ids, 0);
+ while ((obj = ao2_iterator_next(&i))) {
+ RAII_VAR(char *, channel_id, obj, ao2_cleanup);
+ snapshot->channel_ids[snapshot->num_channels++] = channel_id;
+ }
+
+ ao2_ref(snapshot, +1);
+ return snapshot;
+}
diff --git a/main/stasis_cache.c b/main/stasis_cache.c
index 7b6dc82c9..6d07dc3c5 100644
--- a/main/stasis_cache.c
+++ b/main/stasis_cache.c
@@ -240,54 +240,45 @@ struct ao2_container *stasis_cache_dump(struct stasis_caching_topic *caching_top
return cache_dump.cached;
}
-static struct stasis_message_type *__cache_clear_data;
+static struct stasis_message_type *cache_clear_type;
-static struct stasis_message_type *cache_clear_data(void)
+struct stasis_message_type *stasis_cache_clear_type(void)
{
- ast_assert(__cache_clear_data != NULL);
- return __cache_clear_data;
+ ast_assert(cache_clear_type != NULL);
+ return cache_clear_type;
}
-static struct stasis_message_type *__cache_update;
+static struct stasis_message_type *cache_update_type;
struct stasis_message_type *stasis_cache_update_type(void)
{
- ast_assert(__cache_update != NULL);
- return __cache_update;
+ ast_assert(cache_update_type != NULL);
+ return cache_update_type;
}
-struct cache_clear_data {
- struct stasis_message_type *type;
- char *id;
-};
-
-static void cache_clear_data_dtor(void *obj)
+static void cache_clear_dtor(void *obj)
{
- struct cache_clear_data *ev = obj;
- ast_free(ev->id);
- ev->id = NULL;
+ struct stasis_cache_clear *ev = obj;
ao2_cleanup(ev->type);
ev->type = NULL;
}
struct stasis_message *stasis_cache_clear_create(struct stasis_message_type *type, const char *id)
{
- RAII_VAR(struct cache_clear_data *, ev, NULL, ao2_cleanup);
+ RAII_VAR(struct stasis_cache_clear *, ev, NULL, ao2_cleanup);
RAII_VAR(struct stasis_message *, msg, NULL, ao2_cleanup);
- ev = ao2_alloc(sizeof(*ev), cache_clear_data_dtor);
+ ev = ao2_alloc(sizeof(*ev) + strlen(id) + 1, cache_clear_dtor);
if (!ev) {
return NULL;
}
- ev->id = ast_strdup(id);
- if (!ev->id) {
- return NULL;
- }
+ /* strcpy safe */
+ strcpy(ev->id, id);
ao2_ref(type, +1);
ev->type = type;
- msg = stasis_message_create(cache_clear_data(), ev);
+ msg = stasis_message_create(stasis_cache_clear_type(), ev);
if (!msg) {
return NULL;
@@ -363,10 +354,10 @@ static void caching_topic_exec(void *data, struct stasis_subscription *sub, stru
}
/* Handle cache clear event */
- if (cache_clear_data() == stasis_message_type(message)) {
+ if (stasis_cache_clear_type() == stasis_message_type(message)) {
RAII_VAR(struct stasis_message *, old_snapshot, NULL, ao2_cleanup);
RAII_VAR(struct stasis_message *, update, NULL, ao2_cleanup);
- struct cache_clear_data *clear = stasis_message_data(message);
+ struct stasis_cache_clear *clear = stasis_message_data(message);
ast_assert(clear->type != NULL);
ast_assert(clear->id != NULL);
old_snapshot = cache_put(caching_topic, clear->type, clear->id, NULL);
@@ -374,7 +365,9 @@ static void caching_topic_exec(void *data, struct stasis_subscription *sub, stru
update = update_create(topic, old_snapshot, NULL);
stasis_publish(caching_topic->topic, update);
} else {
- ast_log(LOG_ERROR,
+ /* While this could be a problem, it's very likely to
+ * happen with message forwarding */
+ ast_debug(1,
"Attempting to remove an item from the cache that isn't there: %s %s\n",
stasis_message_type_name(clear->type), clear->id);
}
@@ -449,28 +442,28 @@ struct stasis_caching_topic *stasis_caching_topic_create(struct stasis_topic *or
static void stasis_cache_exit(void)
{
- ao2_cleanup(__cache_clear_data);
- __cache_clear_data = NULL;
- ao2_cleanup(__cache_update);
- __cache_update = NULL;
+ ao2_cleanup(cache_clear_type);
+ cache_clear_type = NULL;
+ ao2_cleanup(cache_update_type);
+ cache_update_type = NULL;
}
int stasis_cache_init(void)
{
ast_register_atexit(stasis_cache_exit);
- if (__cache_clear_data || __cache_update) {
+ if (cache_clear_type || cache_update_type) {
ast_log(LOG_ERROR, "Stasis cache double initialized\n");
return -1;
}
- __cache_update = stasis_message_type_create("stasis_cache_update");
- if (!__cache_update) {
+ cache_update_type = stasis_message_type_create("stasis_cache_update");
+ if (!cache_update_type) {
return -1;
}
- __cache_clear_data = stasis_message_type_create("StasisCacheClear");
- if (!__cache_clear_data) {
+ cache_clear_type = stasis_message_type_create("StasisCacheClear");
+ if (!cache_clear_type) {
return -1;
}
return 0;
diff --git a/main/stasis_endpoints.c b/main/stasis_endpoints.c
new file mode 100644
index 000000000..424df66c5
--- /dev/null
+++ b/main/stasis_endpoints.c
@@ -0,0 +1,189 @@
+/*
+ * Asterisk -- An open source telephony toolkit.
+ *
+ * Copyright (C) 2013, Digium, Inc.
+ *
+ * David M. Lee, II <dlee@digium.com>
+ *
+ * See http://www.asterisk.org for more information about
+ * the Asterisk project. Please do not directly contact
+ * any of the maintainers of this project for assistance;
+ * the project provides a web site, mailing lists and IRC
+ * channels for your use.
+ *
+ * This program is free software, distributed under the terms of
+ * the GNU General Public License Version 2. See the LICENSE file
+ * at the top of the source tree.
+ */
+
+/*! \file
+ *
+ * \brief Stasis endpoint API.
+ *
+ * \author David M. Lee, II <dlee@digium.com>
+ */
+
+/*** MODULEINFO
+ <support_level>core</support_level>
+ ***/
+
+#include "asterisk.h"
+
+ASTERISK_FILE_VERSION(__FILE__, "$Revision$")
+
+#include "asterisk/astobj2.h"
+#include "asterisk/stasis.h"
+#include "asterisk/stasis_endpoints.h"
+
+static struct stasis_message_type *endpoint_snapshot_type;
+
+static struct stasis_topic *endpoint_topic_all;
+
+static struct stasis_caching_topic *endpoint_topic_all_cached;
+
+struct stasis_message_type *ast_endpoint_snapshot_type(void)
+{
+ return endpoint_snapshot_type;
+}
+
+struct stasis_topic *ast_endpoint_topic_all(void)
+{
+ return endpoint_topic_all;
+}
+
+struct stasis_caching_topic *ast_endpoint_topic_all_cached(void)
+{
+ return endpoint_topic_all_cached;
+}
+
+struct ast_endpoint_snapshot *ast_endpoint_latest_snapshot(const char *tech,
+ const char *name)
+{
+ RAII_VAR(char *, id, NULL, ast_free);
+ RAII_VAR(struct stasis_message *, msg, NULL, ao2_cleanup);
+ struct ast_endpoint_snapshot *snapshot;
+
+ ast_asprintf(&id, "%s/%s", tech, name);
+ if (!id) {
+ return NULL;
+ }
+
+ msg = stasis_cache_get(ast_endpoint_topic_all_cached(),
+ ast_endpoint_snapshot_type(), id);
+ if (!msg) {
+ return NULL;
+ }
+
+ snapshot = stasis_message_data(msg);
+ ast_assert(snapshot != NULL);
+
+ ao2_ref(snapshot, +1);
+ return snapshot;
+}
+
+/*!
+ * \brief Callback extract a unique identity from a snapshot message.
+ *
+ * This identity is unique to the underlying object of the snapshot, such as the
+ * UniqueId field of a channel.
+ *
+ * \param message Message to extract id from.
+ * \return String representing the snapshot's id.
+ * \return \c NULL if the message_type of the message isn't a handled snapshot.
+ * \since 12
+ */
+static const char *endpoint_snapshot_get_id(struct stasis_message *message)
+{
+ struct ast_endpoint_snapshot *snapshot;
+
+ if (ast_endpoint_snapshot_type() != stasis_message_type(message)) {
+ return NULL;
+ }
+
+ snapshot = stasis_message_data(message);
+
+ return snapshot->id;
+}
+
+
+static void endpoints_stasis_shutdown(void)
+{
+ ao2_cleanup(endpoint_topic_all);
+ endpoint_topic_all = NULL;
+
+ stasis_caching_unsubscribe(endpoint_topic_all_cached);
+ endpoint_topic_all_cached = NULL;
+}
+
+struct ast_json *ast_endpoint_snapshot_to_json(
+ const struct ast_endpoint_snapshot *snapshot)
+{
+ RAII_VAR(struct ast_json *, json, NULL, ast_json_unref);
+ struct ast_json *channel_array;
+ int i;
+
+ json = ast_json_pack("{s: s, s: s, s: s, s: []}",
+ "technology", snapshot->tech,
+ "resource", snapshot->resource,
+ "state", ast_endpoint_state_to_string(snapshot->state),
+ "channels");
+
+ if (json == NULL) {
+ return NULL;
+ }
+
+ if (snapshot->max_channels != -1) {
+ int res = ast_json_object_set(json, "max_channels",
+ ast_json_integer_create(snapshot->max_channels));
+ if (res != 0) {
+ return NULL;
+ }
+ }
+
+ channel_array = ast_json_object_get(json, "channels");
+ ast_assert(channel_array != NULL);
+ for (i = 0; i < snapshot->num_channels; ++i) {
+ int res = ast_json_array_append(channel_array,
+ ast_json_stringf("channel:%s",
+ snapshot->channel_ids[i]));
+ if (res != 0) {
+ return NULL;
+ }
+ }
+
+ return ast_json_ref(json);
+}
+
+int ast_endpoint_stasis_init(void)
+{
+ ast_register_atexit(endpoints_stasis_shutdown);
+
+ if (!endpoint_topic_all) {
+ endpoint_topic_all = stasis_topic_create("endpoint_topic_all");
+ }
+
+ if (!endpoint_topic_all) {
+ return -1;
+ }
+
+ if (!endpoint_topic_all_cached) {
+ endpoint_topic_all_cached =
+ stasis_caching_topic_create(
+ endpoint_topic_all, endpoint_snapshot_get_id);
+ }
+
+ if (!endpoint_topic_all_cached) {
+ return -1;
+ }
+
+ if (!endpoint_snapshot_type) {
+ endpoint_snapshot_type = stasis_message_type_create(
+ "ast_endpoint_snapshot");
+ }
+
+ if (!endpoint_snapshot_type) {
+ return -1;
+ }
+
+ return 0;
+}
diff --git a/res/res_stasis_http_endpoints.c b/res/res_stasis_http_endpoints.c
index a420d4ede..a09be1b6e 100644
--- a/res/res_stasis_http_endpoints.c
+++ b/res/res_stasis_http_endpoints.c
@@ -55,18 +55,32 @@ static void stasis_http_get_endpoints_cb(
struct ast_variable *headers, struct stasis_http_response *response)
{
struct ast_get_endpoints_args args = {};
+ stasis_http_get_endpoints(headers, &args, response);
+}
+/*!
+ * \brief Parameter parsing callback for /endpoints/{tech}.
+ * \param get_params GET parameters in the HTTP request.
+ * \param path_vars Path variables extracted from the request.
+ * \param headers HTTP headers.
+ * \param[out] response Response to the HTTP request.
+ */
+static void stasis_http_get_endpoints_by_tech_cb(
+ struct ast_variable *get_params, struct ast_variable *path_vars,
+ struct ast_variable *headers, struct stasis_http_response *response)
+{
+ struct ast_get_endpoints_by_tech_args args = {};
struct ast_variable *i;
- for (i = get_params; i; i = i->next) {
- if (strcmp(i->name, "withType") == 0) {
- args.with_type = (i->value);
+ for (i = path_vars; i; i = i->next) {
+ if (strcmp(i->name, "tech") == 0) {
+ args.tech = (i->value);
} else
{}
}
- stasis_http_get_endpoints(headers, &args, response);
+ stasis_http_get_endpoints_by_tech(headers, &args, response);
}
/*!
- * \brief Parameter parsing callback for /endpoints/{endpointId}.
+ * \brief Parameter parsing callback for /endpoints/{tech}/{resource}.
* \param get_params GET parameters in the HTTP request.
* \param path_vars Path variables extracted from the request.
* \param headers HTTP headers.
@@ -80,8 +94,11 @@ static void stasis_http_get_endpoint_cb(
struct ast_variable *i;
for (i = path_vars; i; i = i->next) {
- if (strcmp(i->name, "endpointId") == 0) {
- args.endpoint_id = (i->value);
+ if (strcmp(i->name, "tech") == 0) {
+ args.tech = (i->value);
+ } else
+ if (strcmp(i->name, "resource") == 0) {
+ args.resource = (i->value);
} else
{}
}
@@ -89,8 +106,8 @@ static void stasis_http_get_endpoint_cb(
}
/*! \brief REST handler for /api-docs/endpoints.{format} */
-static struct stasis_rest_handlers endpoints_endpointId = {
- .path_segment = "endpointId",
+static struct stasis_rest_handlers endpoints_tech_resource = {
+ .path_segment = "resource",
.is_wildcard = 1,
.callbacks = {
[AST_HTTP_GET] = stasis_http_get_endpoint_cb,
@@ -99,13 +116,23 @@ static struct stasis_rest_handlers endpoints_endpointId = {
.children = { }
};
/*! \brief REST handler for /api-docs/endpoints.{format} */
+static struct stasis_rest_handlers endpoints_tech = {
+ .path_segment = "tech",
+ .is_wildcard = 1,
+ .callbacks = {
+ [AST_HTTP_GET] = stasis_http_get_endpoints_by_tech_cb,
+ },
+ .num_children = 1,
+ .children = { &endpoints_tech_resource, }
+};
+/*! \brief REST handler for /api-docs/endpoints.{format} */
static struct stasis_rest_handlers endpoints = {
.path_segment = "endpoints",
.callbacks = {
[AST_HTTP_GET] = stasis_http_get_endpoints_cb,
},
.num_children = 1,
- .children = { &endpoints_endpointId, }
+ .children = { &endpoints_tech, }
};
static int load_module(void)
diff --git a/res/res_stasis_test.c b/res/res_stasis_test.c
new file mode 100644
index 000000000..7b5aece4f
--- /dev/null
+++ b/res/res_stasis_test.c
@@ -0,0 +1,291 @@
+/*
+ * Asterisk -- An open source telephony toolkit.
+ *
+ * Copyright (C) 2013, Digium, Inc.
+ *
+ * David M. Lee, II <dlee@digium.com>
+ *
+ * See http://www.asterisk.org for more information about
+ * the Asterisk project. Please do not directly contact
+ * any of the maintainers of this project for assistance;
+ * the project provides a web site, mailing lists and IRC
+ * channels for your use.
+ *
+ * This program is free software, distributed under the terms of
+ * the GNU General Public License Version 2. See the LICENSE file
+ * at the top of the source tree.
+ */
+
+/*!
+ * \file \brief Test infrastructure for dealing with Stasis.
+ *
+ * \author David M. Lee, II <dlee@digium.com>
+ */
+
+/*** MODULEINFO
+ <depend>TEST_FRAMEWORK</depend>
+ <support_level>core</support_level>
+ ***/
+
+#include "asterisk.h"
+
+ASTERISK_FILE_VERSION(__FILE__, "$Revision$");
+
+#include "asterisk/astobj2.h"
+#include "asterisk/module.h"
+#include "asterisk/stasis_test.h"
+
+static struct stasis_message_type *test_message_type;
+
+static void stasis_message_sink_dtor(void *obj)
+{
+ struct stasis_message_sink *sink = obj;
+
+ {
+ SCOPED_MUTEX(lock, &sink->lock);
+ while (!sink->is_done) {
+ /* Normally waiting forever is bad, but if we're not
+ * done, we're not done. */
+ ast_cond_wait(&sink->cond, &sink->lock);
+ }
+ }
+
+ ast_mutex_destroy(&sink->lock);
+ ast_cond_destroy(&sink->cond);
+
+ while (sink->num_messages > 0) {
+ ao2_cleanup(sink->messages[--sink->num_messages]);
+ }
+ ast_free(sink->messages);
+ sink->messages = NULL;
+ sink->max_messages = 0;
+}
+
+static struct timespec make_deadline(int timeout_millis)
+{
+ struct timeval start = ast_tvnow();
+ struct timeval delta = {
+ .tv_sec = timeout_millis / 1000,
+ .tv_usec = (timeout_millis % 1000) * 1000,
+ };
+ struct timeval deadline_tv = ast_tvadd(start, delta);
+ struct timespec deadline = {
+ .tv_sec = deadline_tv.tv_sec,
+ .tv_nsec = 1000 * deadline_tv.tv_usec,
+ };
+
+ return deadline;
+}
+
+struct stasis_message_sink *stasis_message_sink_create(void)
+{
+ RAII_VAR(struct stasis_message_sink *, sink, NULL, ao2_cleanup);
+
+ sink = ao2_alloc(sizeof(*sink), stasis_message_sink_dtor);
+ if (!sink) {
+ return NULL;
+ }
+ ast_mutex_init(&sink->lock);
+ ast_cond_init(&sink->cond, NULL);
+ sink->max_messages = 4;
+ sink->messages =
+ ast_malloc(sizeof(*sink->messages) * sink->max_messages);
+ if (!sink->messages) {
+ return NULL;
+ }
+ ao2_ref(sink, +1);
+ return sink;
+}
+
+/*!
+ * \brief Implementation of the stasis_message_sink_cb() callback.
+ *
+ * Why the roundabout way of exposing this via stasis_message_sink_cb()? Well,
+ * it has to do with how we load modules.
+ *
+ * Modules have their own metadata compiled into them in the AST_MODULE_INFO()
+ * block. This includes dependency information in the \c nonoptreq field.
+ *
+ * Asterisk loads the module, inspects the field, then loads any needed
+ * dependencies. This works because Asterisk passes \c RTLD_LAZY to the initial
+ * dlopen(), which defers binding function references until they are called.
+ *
+ * But when you take the address of a function, that function needs to be
+ * available at load time. So if some module used the address of
+ * message_sink_cb() directly, and \c res_stasis_test.so wasn't loaded yet, then
+ * that module would fail to load.
+ *
+ * The stasis_message_sink_cb() function gives us a layer of indirection so that
+ * the initial lazy binding will still work as expected.
+ */
+static void message_sink_cb(void *data, struct stasis_subscription *sub,
+ struct stasis_topic *topic, struct stasis_message *message)
+{
+ struct stasis_message_sink *sink = data;
+
+ SCOPED_MUTEX(lock, &sink->lock);
+
+ if (stasis_subscription_final_message(sub, message)) {
+ sink->is_done = 1;
+ ast_cond_signal(&sink->cond);
+ return;
+ }
+
+ if (stasis_subscription_change_type() == stasis_message_type(message)) {
+ /* Ignore subscription changes */
+ return;
+ }
+
+ if (sink->num_messages == sink->max_messages) {
+ size_t new_max_messages = sink->max_messages * 2;
+ struct stasis_message **new_messages = ast_realloc(
+ sink->messages,
+ sizeof(*new_messages) * new_max_messages);
+ if (!new_messages) {
+ return;
+ }
+ sink->max_messages = new_max_messages;
+ sink->messages = new_messages;
+ }
+
+ ao2_ref(message, +1);
+ sink->messages[sink->num_messages++] = message;
+ ast_cond_signal(&sink->cond);
+}
+
+stasis_subscription_cb stasis_message_sink_cb(void)
+{
+ return message_sink_cb;
+}
+
+
+int stasis_message_sink_wait_for_count(struct stasis_message_sink *sink,
+ int num_messages, int timeout_millis)
+{
+ struct timespec deadline = make_deadline(timeout_millis);
+
+ SCOPED_MUTEX(lock, &sink->lock);
+ while (sink->num_messages < num_messages) {
+ int r = ast_cond_timedwait(&sink->cond, &sink->lock, &deadline);
+
+ if (r == ETIMEDOUT) {
+ break;
+ }
+ if (r != 0) {
+ ast_log(LOG_ERROR, "Unexpected condition error: %s\n",
+ strerror(r));
+ break;
+ }
+ }
+ return sink->num_messages;
+}
+
+int stasis_message_sink_should_stay(struct stasis_message_sink *sink,
+ int num_messages, int timeout_millis)
+{
+ struct timespec deadline = make_deadline(timeout_millis);
+
+ SCOPED_MUTEX(lock, &sink->lock);
+ while (sink->num_messages == num_messages) {
+ int r = ast_cond_timedwait(&sink->cond, &sink->lock, &deadline);
+
+ if (r == ETIMEDOUT) {
+ break;
+ }
+ if (r != 0) {
+ ast_log(LOG_ERROR, "Unexpected condition error: %s\n",
+ strerror(r));
+ break;
+ }
+ }
+ return sink->num_messages;
+}
+
+int stasis_message_sink_wait_for(struct stasis_message_sink *sink, int start,
+ stasis_wait_cb cmp_cb, const void *data, int timeout_millis)
+{
+ struct timespec deadline = make_deadline(timeout_millis);
+
+ SCOPED_MUTEX(lock, &sink->lock);
+
+ /* wait for the start */
+ while (sink->num_messages < start + 1) {
+ int r = ast_cond_timedwait(&sink->cond, &sink->lock, &deadline);
+
+ if (r == ETIMEDOUT) {
+ /* Timed out waiting for the start */
+ return -1;
+ }
+ if (r != 0) {
+ ast_log(LOG_ERROR, "Unexpected condition error: %s\n",
+ strerror(r));
+ return -2;
+ }
+ }
+
+
+ while (!cmp_cb(sink->messages[start], data)) {
+ ++start;
+
+ while (sink->num_messages < start + 1) {
+ int r = ast_cond_timedwait(&sink->cond,
+ &sink->lock, &deadline);
+
+ if (r == ETIMEDOUT) {
+ return -1;
+ }
+ if (r != 0) {
+ ast_log(LOG_ERROR,
+ "Unexpected condition error: %s\n",
+ strerror(r));
+ return -2;
+ }
+ }
+ }
+
+ return start;
+}
+
+struct stasis_message *stasis_test_message_create(void)
+{
+ RAII_VAR(void *, data, NULL, ao2_cleanup);
+
+ /* We just need the unique pointer; don't care what's in it */
+ data = ao2_alloc(1, NULL);
+ if (!data) {
+ return NULL;
+ }
+
+ return stasis_message_create(stasis_test_message_type(), data);
+}
+
+struct stasis_message_type *stasis_test_message_type(void)
+{
+ return test_message_type;
+}
+
+static int unload_module(void)
+{
+ ao2_cleanup(test_message_type);
+ test_message_type = NULL;
+ return 0;
+}
+
+static int load_module(void)
+{
+ test_message_type = stasis_message_type_create(
+ "stasis_test_message");
+ if (!test_message_type) {
+ return AST_MODULE_LOAD_FAILURE;
+ }
+
+ return AST_MODULE_LOAD_SUCCESS;
+}
+
+AST_MODULE_INFO(ASTERISK_GPL_KEY,
+ AST_MODFLAG_GLOBAL_SYMBOLS | AST_MODFLAG_LOAD_ORDER,
+ "Stasis test utilities",
+ .load = load_module,
+ .unload = unload_module,
+ .load_pri = AST_MODPRI_APP_DEPEND,
+ );
diff --git a/res/res_stasis_test.exports.in b/res/res_stasis_test.exports.in
new file mode 100644
index 000000000..961600323
--- /dev/null
+++ b/res/res_stasis_test.exports.in
@@ -0,0 +1,6 @@
+{
+ global:
+ LINKER_SYMBOL_PREFIXstasis_*;
+ local:
+ *;
+};
diff --git a/res/stasis_http/resource_endpoints.c b/res/stasis_http/resource_endpoints.c
index b2611bad8..52d05c093 100644
--- a/res/stasis_http/resource_endpoints.c
+++ b/res/stasis_http/resource_endpoints.c
@@ -1,4 +1,4 @@
-/* -*- C -*-
+/*
* Asterisk -- An open source telephony toolkit.
*
* Copyright (C) 2012 - 2013, Digium, Inc.
@@ -18,26 +18,140 @@
/*! \file
*
- * \brief Implementation for stasis-http stubs.
+ * \brief /api-docs/endpoints.{format} implementation- Endpoint resources
*
* \author David M. Lee, II <dlee@digium.com>
*/
-/*** MODULEINFO
- <support_level>core</support_level>
- ***/
-
#include "asterisk.h"
ASTERISK_FILE_VERSION(__FILE__, "$Revision$")
#include "resource_endpoints.h"
-void stasis_http_get_endpoint(struct ast_variable *headers, struct ast_get_endpoint_args *args, struct stasis_http_response *response)
+#include "asterisk/astobj2.h"
+#include "asterisk/stasis.h"
+#include "asterisk/stasis_endpoints.h"
+
+void stasis_http_get_endpoints(struct ast_variable *headers,
+ struct ast_get_endpoints_args *args,
+ struct stasis_http_response *response)
+{
+ RAII_VAR(struct stasis_caching_topic *, caching_topic, NULL, ao2_cleanup);
+ RAII_VAR(struct ao2_container *, snapshots, NULL, ao2_cleanup);
+ RAII_VAR(struct ast_json *, json, NULL, ast_json_unref);
+ struct ao2_iterator i;
+ void *obj;
+
+ caching_topic = ast_endpoint_topic_all_cached();
+ if (!caching_topic) {
+ stasis_http_response_error(
+ response, 500, "Internal Server Error",
+ "Message bus not initialized");
+ return;
+ }
+ ao2_ref(caching_topic, +1);
+
+ snapshots = stasis_cache_dump(caching_topic, ast_endpoint_snapshot_type());
+ if (!snapshots) {
+ stasis_http_response_alloc_failed(response);
+ return;
+ }
+
+ json = ast_json_array_create();
+ if (!json) {
+ stasis_http_response_alloc_failed(response);
+ return;
+ }
+
+ i = ao2_iterator_init(snapshots, 0);
+ while ((obj = ao2_iterator_next(&i))) {
+ RAII_VAR(struct stasis_message *, msg, obj, ao2_cleanup);
+ struct ast_endpoint_snapshot *snapshot = stasis_message_data(msg);
+ int r = ast_json_array_append(
+ json, ast_endpoint_snapshot_to_json(snapshot));
+ if (r != 0) {
+ stasis_http_response_alloc_failed(response);
+ return;
+ }
+ }
+ ao2_iterator_destroy(&i);
+
+ stasis_http_response_ok(response, ast_json_ref(json));
+}
+void stasis_http_get_endpoints_by_tech(struct ast_variable *headers,
+ struct ast_get_endpoints_by_tech_args *args,
+ struct stasis_http_response *response)
{
- ast_log(LOG_ERROR, "TODO: stasis_http_get_endpoint\n");
+ RAII_VAR(struct stasis_caching_topic *, caching_topic, NULL, ao2_cleanup);
+ RAII_VAR(struct ao2_container *, snapshots, NULL, ao2_cleanup);
+ RAII_VAR(struct ast_json *, json, NULL, ast_json_unref);
+ struct ao2_iterator i;
+ void *obj;
+
+ /* TODO - if tech isn't a recognized type of endpoint, it should 404 */
+
+ caching_topic = ast_endpoint_topic_all_cached();
+ if (!caching_topic) {
+ stasis_http_response_error(
+ response, 500, "Internal Server Error",
+ "Message bus not initialized");
+ return;
+ }
+ ao2_ref(caching_topic, +1);
+
+ snapshots = stasis_cache_dump(caching_topic, ast_endpoint_snapshot_type());
+ if (!snapshots) {
+ stasis_http_response_alloc_failed(response);
+ return;
+ }
+
+ json = ast_json_array_create();
+ if (!json) {
+ stasis_http_response_alloc_failed(response);
+ return;
+ }
+
+ i = ao2_iterator_init(snapshots, 0);
+ while ((obj = ao2_iterator_next(&i))) {
+ RAII_VAR(struct stasis_message *, msg, obj, ao2_cleanup);
+ struct ast_endpoint_snapshot *snapshot = stasis_message_data(msg);
+ int r;
+
+ if (strcmp(args->tech, snapshot->tech) != 0) {
+ continue;
+ }
+
+ r = ast_json_array_append(
+ json, ast_endpoint_snapshot_to_json(snapshot));
+ if (r != 0) {
+ stasis_http_response_alloc_failed(response);
+ return;
+ }
+ }
+ ao2_iterator_destroy(&i);
+
+ stasis_http_response_ok(response, ast_json_ref(json));
}
-void stasis_http_get_endpoints(struct ast_variable *headers, struct ast_get_endpoints_args *args, struct stasis_http_response *response)
+void stasis_http_get_endpoint(struct ast_variable *headers,
+ struct ast_get_endpoint_args *args,
+ struct stasis_http_response *response)
{
- ast_log(LOG_ERROR, "TODO: stasis_http_get_endpoints\n");
+ RAII_VAR(struct ast_json *, json, NULL, ast_json_unref);
+ RAII_VAR(struct ast_endpoint_snapshot *, snapshot, NULL, ao2_cleanup);
+
+ snapshot = ast_endpoint_latest_snapshot(args->tech, args->resource);
+ if (!snapshot) {
+ stasis_http_response_error(response, 404, "Not Found",
+ "Endpoint not found");
+ return;
+ }
+
+ json = ast_endpoint_snapshot_to_json(snapshot);
+ if (!json) {
+ stasis_http_response_alloc_failed(response);
+ return;
+ }
+
+ stasis_http_response_ok(response, ast_json_ref(json));
}
diff --git a/res/stasis_http/resource_endpoints.h b/res/stasis_http/resource_endpoints.h
index 9f5a96e50..b534fb047 100644
--- a/res/stasis_http/resource_endpoints.h
+++ b/res/stasis_http/resource_endpoints.h
@@ -41,21 +41,34 @@
/*! \brief Argument struct for stasis_http_get_endpoints() */
struct ast_get_endpoints_args {
- /*! \brief Filter endpoints by type (sip,iax2,dhadi,...) */
- const char *with_type;
};
/*!
- * \brief List available endoints.
+ * \brief List all endoints.
*
* \param headers HTTP headers
* \param args Swagger parameters
* \param[out] response HTTP response
*/
void stasis_http_get_endpoints(struct ast_variable *headers, struct ast_get_endpoints_args *args, struct stasis_http_response *response);
+/*! \brief Argument struct for stasis_http_get_endpoints_by_tech() */
+struct ast_get_endpoints_by_tech_args {
+ /*! \brief Technology of the endpoints (sip,iax2,...) */
+ const char *tech;
+};
+/*!
+ * \brief List available endoints for a given endpoint technology.
+ *
+ * \param headers HTTP headers
+ * \param args Swagger parameters
+ * \param[out] response HTTP response
+ */
+void stasis_http_get_endpoints_by_tech(struct ast_variable *headers, struct ast_get_endpoints_by_tech_args *args, struct stasis_http_response *response);
/*! \brief Argument struct for stasis_http_get_endpoint() */
struct ast_get_endpoint_args {
+ /*! \brief Technology of the endpoint */
+ const char *tech;
/*! \brief ID of the endpoint */
- const char *endpoint_id;
+ const char *resource;
};
/*!
* \brief Details for an endpoint.
diff --git a/rest-api/api-docs/endpoints.json b/rest-api/api-docs/endpoints.json
index 43b8453d7..d3d77d84a 100644
--- a/rest-api/api-docs/endpoints.json
+++ b/rest-api/api-docs/endpoints.json
@@ -13,16 +13,26 @@
"operations": [
{
"httpMethod": "GET",
- "summary": "List available endoints.",
+ "summary": "List all endoints.",
"nickname": "getEndpoints",
+ "responseClass": "List[Endpoint]"
+ }
+ ]
+ },
+ {
+ "path": "/endpoints/{tech}",
+ "description": "Asterisk endpoints",
+ "operations": [
+ {
+ "httpMethod": "GET",
+ "summary": "List available endoints for a given endpoint technology.",
+ "nickname": "getEndpointsByTech",
"responseClass": "List[Endpoint]",
"parameters": [
{
- "name": "withType",
- "description": "Filter endpoints by type (sip,iax2,dhadi,...)",
- "paramType": "query",
- "required": false,
- "allowMultiple": true,
+ "name": "tech",
+ "description": "Technology of the endpoints (sip,iax2,...)",
+ "paramType": "path",
"dataType": "string"
}
]
@@ -30,7 +40,7 @@
]
},
{
- "path": "/endpoints/{endpointId}",
+ "path": "/endpoints/{tech}/{resource}",
"description": "Single endpoint",
"operations": [
{
@@ -40,7 +50,13 @@
"responseClass": "Endpoint",
"parameters": [
{
- "name": "endpointId",
+ "name": "tech",
+ "description": "Technology of the endpoint",
+ "paramType": "path",
+ "dataType": "string"
+ },
+ {
+ "name": "resource",
"description": "ID of the endpoint",
"paramType": "path",
"dataType": "string"
@@ -53,13 +69,16 @@
"models": {
"Endpoint": {
"id": "Endpoint",
+ "description": "A snapshot of an endpoint. Unlike most resources, which have a single unique identifier, an endpoint is uniquely identified by the technology/resource pair.",
"properties": {
"technology": {
"type": "string",
+ "description": "Technology of the endpoint",
"required": true
},
- "name": {
+ "resource": {
"type": "string",
+ "description": "Identifier of the endpoint, specific to the given technology.",
"required": true
}
}
diff --git a/tests/test_endpoints.c b/tests/test_endpoints.c
new file mode 100644
index 000000000..2758e8e78
--- /dev/null
+++ b/tests/test_endpoints.c
@@ -0,0 +1,158 @@
+/*
+ * Asterisk -- An open source telephony toolkit.
+ *
+ * Copyright (C) 2013, Digium, Inc.
+ *
+ * David M. Lee, II <dlee@digium.com>
+ *
+ * See http://www.asterisk.org for more information about
+ * the Asterisk project. Please do not directly contact
+ * any of the maintainers of this project for assistance;
+ * the project provides a web site, mailing lists and IRC
+ * channels for your use.
+ *
+ * This program is free software, distributed under the terms of
+ * the GNU General Public License Version 2. See the LICENSE file
+ * at the top of the source tree.
+ */
+
+/*!
+ * \file \brief Test endpoints.
+ *
+ * \author\verbatim David M. Lee, II <dlee@digium.com> \endverbatim
+ *
+ * \ingroup tests
+ */
+
+/*** MODULEINFO
+ <depend>TEST_FRAMEWORK</depend>
+ <support_level>core</support_level>
+ ***/
+
+#include "asterisk.h"
+
+ASTERISK_FILE_VERSION(__FILE__, "$Revision$")
+
+#include "asterisk/astobj2.h"
+#include "asterisk/endpoints.h"
+#include "asterisk/module.h"
+#include "asterisk/stasis_endpoints.h"
+#include "asterisk/test.h"
+
+static const char *test_category = "/core/endpoints/";
+
+AST_TEST_DEFINE(create)
+{
+ RAII_VAR(struct ast_endpoint *, uut, NULL, ast_endpoint_shutdown);
+
+ switch (cmd) {
+ case TEST_INIT:
+ info->name = __func__;
+ info->category = test_category;
+ info->summary = "Test endpoint creation";
+ info->description = "Test endpoint creation";
+ return AST_TEST_NOT_RUN;
+ case TEST_EXECUTE:
+ break;
+ }
+
+ ast_test_validate(test, NULL == ast_endpoint_create(NULL, NULL));
+ ast_test_validate(test, NULL == ast_endpoint_create("", ""));
+ ast_test_validate(test, NULL == ast_endpoint_create("TEST", ""));
+ ast_test_validate(test, NULL == ast_endpoint_create("", "test_res"));
+
+ uut = ast_endpoint_create("TEST", "test_res");
+ ast_test_validate(test, NULL != uut);
+
+ ast_test_validate(test,
+ 0 == strcmp("TEST", ast_endpoint_get_tech(uut)));
+ ast_test_validate(test,
+ 0 == strcmp("test_res", ast_endpoint_get_resource(uut)));
+
+ return AST_TEST_PASS;
+}
+
+
+AST_TEST_DEFINE(defaults)
+{
+ RAII_VAR(struct ast_endpoint *, uut, NULL, ast_endpoint_shutdown);
+ RAII_VAR(struct ast_endpoint_snapshot *, snapshot, NULL, ao2_cleanup);
+
+ switch (cmd) {
+ case TEST_INIT:
+ info->name = __func__;
+ info->category = test_category;
+ info->summary = "Test defaults for new endpoints";
+ info->description = "Test defaults for new endpoints";
+ return AST_TEST_NOT_RUN;
+ case TEST_EXECUTE:
+ break;
+ }
+
+ uut = ast_endpoint_create("TEST", "test_res");
+ ast_test_validate(test, NULL != uut);
+ snapshot = ast_endpoint_snapshot_create(uut);
+ ast_test_validate(test, NULL != snapshot);
+
+ ast_test_validate(test, 0 == strcmp("TEST/test_res", snapshot->id));
+ ast_test_validate(test, 0 == strcmp("TEST", snapshot->tech));
+ ast_test_validate(test, 0 == strcmp("test_res", snapshot->resource));
+ ast_test_validate(test, AST_ENDPOINT_UNKNOWN == snapshot->state);
+ ast_test_validate(test, -1 == snapshot->max_channels);
+ ast_test_validate(test, 0 == snapshot->num_channels);
+
+ return AST_TEST_PASS;
+}
+
+AST_TEST_DEFINE(setters)
+{
+ RAII_VAR(struct ast_endpoint *, uut, NULL, ast_endpoint_shutdown);
+ RAII_VAR(struct ast_endpoint_snapshot *, snapshot, NULL, ao2_cleanup);
+
+ switch (cmd) {
+ case TEST_INIT:
+ info->name = __func__;
+ info->category = test_category;
+ info->summary = "Test endpoint setters";
+ info->description = "Test endpoint setters";
+ return AST_TEST_NOT_RUN;
+ case TEST_EXECUTE:
+ break;
+ }
+
+ uut = ast_endpoint_create("TEST", "test_res");
+ ast_test_validate(test, NULL != uut);
+
+ ast_endpoint_set_state(uut, AST_ENDPOINT_ONLINE);
+ ast_endpoint_set_max_channels(uut, 314159);
+
+ snapshot = ast_endpoint_snapshot_create(uut);
+ ast_test_validate(test, NULL != snapshot);
+
+ ast_test_validate(test, AST_ENDPOINT_ONLINE == snapshot->state);
+ ast_test_validate(test, 314159 == snapshot->max_channels);
+
+ return AST_TEST_PASS;
+}
+
+static int unload_module(void)
+{
+ AST_TEST_UNREGISTER(create);
+ AST_TEST_UNREGISTER(defaults);
+ AST_TEST_UNREGISTER(setters);
+ return 0;
+}
+
+static int load_module(void)
+{
+ AST_TEST_REGISTER(create);
+ AST_TEST_REGISTER(defaults);
+ AST_TEST_REGISTER(setters);
+ return AST_MODULE_LOAD_SUCCESS;
+}
+
+AST_MODULE_INFO(ASTERISK_GPL_KEY, AST_MODFLAG_DEFAULT,
+ "Endpoint testing",
+ .load = load_module,
+ .unload = unload_module,
+ );
diff --git a/tests/test_stasis_endpoints.c b/tests/test_stasis_endpoints.c
new file mode 100644
index 000000000..30d0d09ae
--- /dev/null
+++ b/tests/test_stasis_endpoints.c
@@ -0,0 +1,307 @@
+/*
+ * Asterisk -- An open source telephony toolkit.
+ *
+ * Copyright (C) 2013, Digium, Inc.
+ *
+ * David M. Lee, II <dlee@digium.com>
+ *
+ * See http://www.asterisk.org for more information about
+ * the Asterisk project. Please do not directly contact
+ * any of the maintainers of this project for assistance;
+ * the project provides a web site, mailing lists and IRC
+ * channels for your use.
+ *
+ * This program is free software, distributed under the terms of
+ * the GNU General Public License Version 2. See the LICENSE file
+ * at the top of the source tree.
+ */
+
+/*!
+ * \file \brief Test endpoints.
+ *
+ * \author\verbatim David M. Lee, II <dlee@digium.com> \endverbatim
+ *
+ * \ingroup tests
+ */
+
+/*** MODULEINFO
+ <depend>TEST_FRAMEWORK</depend>
+ <depend>res_stasis_test</depend>
+ <support_level>core</support_level>
+ ***/
+
+#include "asterisk.h"
+
+ASTERISK_FILE_VERSION(__FILE__, "$Revision$")
+
+#include "asterisk/astobj2.h"
+#include "asterisk/channel.h"
+#include "asterisk/endpoints.h"
+#include "asterisk/module.h"
+#include "asterisk/stasis_channels.h"
+#include "asterisk/stasis_endpoints.h"
+#include "asterisk/stasis_test.h"
+#include "asterisk/test.h"
+
+static const char *test_category = "/stasis/endpoints/";
+
+static void safe_channel_hangup(struct ast_channel *chan)
+{
+ if (!chan) {
+ return;
+ }
+ ast_hangup(chan);
+}
+
+/*! \brief Message matcher looking for cache update messages */
+static int cache_update(struct stasis_message *msg, const void *data) {
+ struct stasis_cache_update *update;
+ struct ast_endpoint_snapshot *snapshot;
+ const char *name = data;
+
+ if (stasis_cache_update_type() != stasis_message_type(msg)) {
+ return 0;
+ }
+
+ update = stasis_message_data(msg);
+ if (ast_endpoint_snapshot_type() != update->type) {
+ return 0;
+ }
+
+ snapshot = stasis_message_data(update->old_snapshot);
+ if (!snapshot) {
+ snapshot = stasis_message_data(update->new_snapshot);
+ }
+
+ return 0 == strcmp(name, snapshot->resource);
+}
+
+AST_TEST_DEFINE(state_changes)
+{
+ RAII_VAR(struct ast_endpoint *, uut, NULL, ast_endpoint_shutdown);
+ RAII_VAR(struct ast_channel *, chan, NULL, safe_channel_hangup);
+ RAII_VAR(struct stasis_message_sink *, sink, NULL, ao2_cleanup);
+ RAII_VAR(struct stasis_subscription *, sub, NULL, stasis_unsubscribe);
+ struct stasis_message *msg;
+ struct stasis_message_type *type;
+ struct ast_endpoint_snapshot *actual_snapshot;
+ int actual_count;
+
+ switch (cmd) {
+ case TEST_INIT:
+ info->name = __func__;
+ info->category = test_category;
+ info->summary = "Test endpoint updates as its state changes";
+ info->description =
+ "Test endpoint updates as its state changes";
+ return AST_TEST_NOT_RUN;
+ case TEST_EXECUTE:
+ break;
+ }
+
+ uut = ast_endpoint_create("TEST", __func__);
+ ast_test_validate(test, NULL != uut);
+
+ sink = stasis_message_sink_create();
+ ast_test_validate(test, NULL != sink);
+
+ sub = stasis_subscribe(ast_endpoint_topic(uut),
+ stasis_message_sink_cb(), sink);
+ ast_test_validate(test, NULL != sub);
+
+ ast_endpoint_set_state(uut, AST_ENDPOINT_OFFLINE);
+ actual_count = stasis_message_sink_wait_for_count(sink, 1,
+ STASIS_SINK_DEFAULT_WAIT);
+ msg = sink->messages[0];
+ type = stasis_message_type(msg);
+ ast_test_validate(test, ast_endpoint_snapshot_type() == type);
+ actual_snapshot = stasis_message_data(msg);
+ ast_test_validate(test, AST_ENDPOINT_OFFLINE == actual_snapshot->state);
+
+ ast_endpoint_set_max_channels(uut, 8675309);
+ actual_count = stasis_message_sink_wait_for_count(sink, 2,
+ STASIS_SINK_DEFAULT_WAIT);
+ msg = sink->messages[1];
+ type = stasis_message_type(msg);
+ ast_test_validate(test, ast_endpoint_snapshot_type() == type);
+ actual_snapshot = stasis_message_data(msg);
+ ast_test_validate(test, 8675309 == actual_snapshot->max_channels);
+
+ return AST_TEST_PASS;
+}
+
+AST_TEST_DEFINE(cache_clear)
+{
+ RAII_VAR(struct ast_endpoint *, uut, NULL, ast_endpoint_shutdown);
+ RAII_VAR(struct ast_channel *, chan, NULL, safe_channel_hangup);
+ RAII_VAR(struct stasis_message_sink *, sink, NULL, ao2_cleanup);
+ RAII_VAR(struct stasis_subscription *, sub, NULL, stasis_unsubscribe);
+ struct stasis_message *msg;
+ struct stasis_message_type *type;
+ struct ast_endpoint_snapshot *actual_snapshot;
+ struct stasis_cache_update *update;
+ int message_index;
+
+ switch (cmd) {
+ case TEST_INIT:
+ info->name = __func__;
+ info->category = test_category;
+ info->summary = "Test endpoint state change messages";
+ info->description = "Test endpoint state change messages";
+ return AST_TEST_NOT_RUN;
+ case TEST_EXECUTE:
+ break;
+ }
+
+ /* Subscribe to the cache topic */
+ sink = stasis_message_sink_create();
+ ast_test_validate(test, NULL != sink);
+
+ sub = stasis_subscribe(
+ stasis_caching_get_topic(ast_endpoint_topic_all_cached()),
+ stasis_message_sink_cb(), sink);
+ ast_test_validate(test, NULL != sub);
+
+ uut = ast_endpoint_create("TEST", __func__);
+ ast_test_validate(test, NULL != uut);
+
+ /* Since the cache topic is a singleton (ew), it may have messages from
+ * elsewheres that it's processing, or maybe even some final messages
+ * from the prior test. We've got to wait_for our specific message,
+ * instead of wait_for_count.
+ */
+ message_index = stasis_message_sink_wait_for(sink, 0,
+ cache_update, __func__, STASIS_SINK_DEFAULT_WAIT);
+ ast_test_validate(test, 0 <= message_index);
+
+ /* First message should be a cache creation entry for our endpont */
+ msg = sink->messages[message_index];
+ type = stasis_message_type(msg);
+ ast_test_validate(test, stasis_cache_update_type() == type);
+ update = stasis_message_data(msg);
+ ast_test_validate(test, ast_endpoint_snapshot_type() == update->type);
+ ast_test_validate(test, NULL == update->old_snapshot);
+ actual_snapshot = stasis_message_data(update->new_snapshot);
+ ast_test_validate(test, 0 == strcmp("TEST", actual_snapshot->tech));
+ ast_test_validate(test,
+ 0 == strcmp(__func__, actual_snapshot->resource));
+
+ ast_endpoint_shutdown(uut);
+ uut = NULL;
+ message_index = stasis_message_sink_wait_for(sink, message_index + 1,
+ cache_update, __func__, STASIS_SINK_DEFAULT_WAIT);
+ ast_test_validate(test, 0 <= message_index);
+ /* Now we should have a cache removal entry */
+ msg = sink->messages[message_index];
+ type = stasis_message_type(msg);
+ ast_test_validate(test, stasis_cache_update_type() == type);
+ update = stasis_message_data(msg);
+ ast_test_validate(test, ast_endpoint_snapshot_type() == update->type);
+ actual_snapshot = stasis_message_data(update->old_snapshot);
+ ast_test_validate(test, 0 == strcmp("TEST", actual_snapshot->tech));
+ ast_test_validate(test,
+ 0 == strcmp(__func__, actual_snapshot->resource));
+ ast_test_validate(test, NULL == update->new_snapshot);
+
+ return AST_TEST_PASS;
+}
+
+AST_TEST_DEFINE(channel_messages)
+{
+ RAII_VAR(struct ast_endpoint *, uut, NULL, ast_endpoint_shutdown);
+ RAII_VAR(struct ast_channel *, chan, NULL, safe_channel_hangup);
+ RAII_VAR(struct stasis_message_sink *, sink, NULL, ao2_cleanup);
+ RAII_VAR(struct stasis_subscription *, sub, NULL, stasis_unsubscribe);
+ struct stasis_message *msg;
+ struct stasis_message_type *type;
+ struct ast_endpoint_snapshot *actual_snapshot;
+ int actual_count;
+
+ switch (cmd) {
+ case TEST_INIT:
+ info->name = __func__;
+ info->category = test_category;
+ info->summary = "Test channel messages on an endpoint topic";
+ info->description =
+ "Test channel messages on an endpoint topic";
+ return AST_TEST_NOT_RUN;
+ case TEST_EXECUTE:
+ break;
+ }
+
+ uut = ast_endpoint_create("TEST", __func__);
+ ast_test_validate(test, NULL != uut);
+
+ sink = stasis_message_sink_create();
+ ast_test_validate(test, NULL != sink);
+
+ sub = stasis_subscribe(ast_endpoint_topic(uut),
+ stasis_message_sink_cb(), sink);
+ ast_test_validate(test, NULL != sub);
+
+ chan = ast_channel_alloc(0, AST_STATE_DOWN, "100", __func__, "100",
+ "100", "default", NULL, 0, "TEST/test_res");
+ ast_test_validate(test, NULL != chan);
+
+ ast_endpoint_add_channel(uut, chan);
+
+ actual_count = stasis_message_sink_wait_for_count(sink, 2,
+ STASIS_SINK_DEFAULT_WAIT);
+ ast_test_validate(test, 2 == actual_count);
+
+ msg = sink->messages[0];
+ type = stasis_message_type(msg);
+ ast_test_validate(test, ast_channel_snapshot_type() == type);
+
+ msg = sink->messages[1];
+ type = stasis_message_type(msg);
+ ast_test_validate(test, ast_endpoint_snapshot_type() == type);
+ actual_snapshot = stasis_message_data(msg);
+ ast_test_validate(test, 1 == actual_snapshot->num_channels);
+
+ safe_channel_hangup(chan);
+ chan = NULL;
+
+ actual_count = stasis_message_sink_wait_for_count(sink, 5,
+ STASIS_SINK_DEFAULT_WAIT);
+ ast_test_validate(test, 5 == actual_count);
+
+ msg = sink->messages[2];
+ type = stasis_message_type(msg);
+ ast_test_validate(test, ast_channel_snapshot_type() == type);
+
+ msg = sink->messages[3];
+ type = stasis_message_type(msg);
+ ast_test_validate(test, stasis_cache_clear_type() == type);
+
+ msg = sink->messages[4];
+ type = stasis_message_type(msg);
+ ast_test_validate(test, ast_endpoint_snapshot_type() == type);
+ actual_snapshot = stasis_message_data(msg);
+ ast_test_validate(test, 0 == actual_snapshot->num_channels);
+
+ return AST_TEST_PASS;
+}
+
+static int unload_module(void)
+{
+ AST_TEST_UNREGISTER(state_changes);
+ AST_TEST_UNREGISTER(cache_clear);
+ AST_TEST_UNREGISTER(channel_messages);
+ return 0;
+}
+
+static int load_module(void)
+{
+ AST_TEST_REGISTER(state_changes);
+ AST_TEST_REGISTER(cache_clear);
+ AST_TEST_REGISTER(channel_messages);
+ return AST_MODULE_LOAD_SUCCESS;
+}
+
+AST_MODULE_INFO(ASTERISK_GPL_KEY, AST_MODFLAG_DEFAULT,
+ "Endpoint stasis-related testing",
+ .load = load_module,
+ .unload = unload_module,
+ .nonoptreq = "res_stasis_test",
+ );