summaryrefslogtreecommitdiff
path: root/main
diff options
context:
space:
mode:
authorDavid M. Lee <dlee@digium.com>2013-08-01 13:49:34 +0000
committerDavid M. Lee <dlee@digium.com>2013-08-01 13:49:34 +0000
commite1b959ccbb4e47421b37a0f75a2bf89ccd34dcb1 (patch)
tree3026c96da713bafcf1126c77bde6994f348280bb /main
parent5c1396946929ab19e94c117f8ad3db5f78a450bc (diff)
Split caching out from the stasis_caching_topic.
In working with res_stasis, I discovered a significant limitation to the current structure of stasis_caching_topics: you cannot subscribe to cache updates for a single channel/bridge/endpoint/etc. To address this, this patch splits the cache away from the stasis_caching_topic, making it a first class object. The stasis_cache object is shared amongst individual stasis_caching_topics that are created per channel/endpoint/etc. These are still forwarded to global whatever_all_cached topics, so their use from most of the code does not change. In making these changes, I noticed that we frequently used a similar pattern for bridges, endpoints and channels: single_topic ----------------> all_topic ^ | single_topic_cached ----+----> all_topic_cached | +----> cache This pattern was extracted as the 'Stasis Caching Pattern', defined in stasis_caching_pattern.h. This avoids a lot of duplicate code between the different domain objects. Since the cache is now disassociated from its upstream caching topics, this also necessitated a change to how the 'guaranteed' flag worked for retrieving from a cache. The code for handling the caching guarantee was extracted into a 'stasis_topic_wait' function, which works for any stasis_topic. (closes issue ASTERISK-22002) Review: https://reviewboard.asterisk.org/r/2672/ git-svn-id: https://origsvn.digium.com/svn/asterisk/trunk@395954 65c4cc65-6c06-0410-ace0-fbb531ad65f3
Diffstat (limited to 'main')
-rw-r--r--main/app.c18
-rw-r--r--main/bridge.c18
-rw-r--r--main/cdr.c4
-rw-r--r--main/cel.c4
-rw-r--r--main/channel_internal_api.c44
-rw-r--r--main/cli.c6
-rw-r--r--main/devicestate.c26
-rw-r--r--main/endpoints.c53
-rw-r--r--main/manager.c6
-rw-r--r--main/manager_bridges.c8
-rw-r--r--main/manager_channels.c2
-rw-r--r--main/manager_endpoints.c8
-rw-r--r--main/pbx.c2
-rw-r--r--main/presencestate.c21
-rw-r--r--main/stasis.c5
-rw-r--r--main/stasis_bridges.c143
-rw-r--r--main/stasis_cache.c173
-rw-r--r--main/stasis_cache_pattern.c189
-rw-r--r--main/stasis_channels.c109
-rw-r--r--main/stasis_endpoints.c86
-rw-r--r--main/stasis_wait.c133
21 files changed, 734 insertions, 324 deletions
diff --git a/main/app.c b/main/app.c
index 031f6f28f..8d081fe8c 100644
--- a/main/app.c
+++ b/main/app.c
@@ -88,6 +88,7 @@ static AST_LIST_HEAD_STATIC(zombies, zombie);
* @{ \brief Define \ref stasis topic objects for MWI
*/
static struct stasis_topic *mwi_topic_all;
+static struct stasis_cache *mwi_state_cache;
static struct stasis_caching_topic *mwi_topic_cached;
static struct stasis_topic_pool *mwi_topic_pool;
/* @} */
@@ -2696,9 +2697,14 @@ struct stasis_topic *ast_mwi_topic_all(void)
return mwi_topic_all;
}
-struct stasis_caching_topic *ast_mwi_topic_cached(void)
+struct stasis_cache *ast_mwi_state_cache(void)
{
- return mwi_topic_cached;
+ return mwi_state_cache;
+}
+
+struct stasis_topic *ast_mwi_topic_cached(void)
+{
+ return stasis_caching_get_topic(mwi_topic_cached);
}
struct stasis_topic *ast_mwi_topic(const char *uniqueid)
@@ -2754,7 +2760,7 @@ int ast_publish_mwi_state_full(
if (!ast_strlen_zero(channel_id)) {
RAII_VAR(struct stasis_message *, chan_message,
- stasis_cache_get(ast_channel_topic_all_cached(),
+ stasis_cache_get(ast_channel_cache(),
ast_channel_snapshot_type(),
channel_id),
ao2_cleanup);
@@ -2855,7 +2861,11 @@ int app_init(void)
if (!mwi_topic_all) {
return -1;
}
- mwi_topic_cached = stasis_caching_topic_create(mwi_topic_all, mwi_state_get_id);
+ mwi_state_cache = stasis_cache_create(mwi_state_get_id);
+ if (!mwi_state_cache) {
+ return -1;
+ }
+ mwi_topic_cached = stasis_caching_topic_create(mwi_topic_all, mwi_state_cache);
if (!mwi_topic_cached) {
return -1;
}
diff --git a/main/bridge.c b/main/bridge.c
index 0b4d95d5e..9e9e65c17 100644
--- a/main/bridge.c
+++ b/main/bridge.c
@@ -46,6 +46,7 @@ ASTERISK_FILE_VERSION(__FILE__, "$Revision$")
#include "asterisk/bridge_after.h"
#include "asterisk/stasis_bridges.h"
#include "asterisk/stasis_channels.h"
+#include "asterisk/stasis_cache_pattern.h"
#include "asterisk/app.h"
#include "asterisk/file.h"
#include "asterisk/module.h"
@@ -634,6 +635,8 @@ static void destroy_bridge(void *obj)
}
cleanup_video_mode(bridge);
+
+ stasis_cp_single_unsubscribe(bridge->topics);
}
struct ast_bridge *bridge_register(struct ast_bridge *bridge)
@@ -685,6 +688,13 @@ struct ast_bridge *bridge_base_init(struct ast_bridge *self, uint32_t capabiliti
ast_set_flag(&self->feature_flags, flags);
self->allowed_capabilities = capabilities;
+ if (bridge_topics_init(self) != 0) {
+ ast_log(LOG_WARNING, "Bridge %s: Could not initialize topics\n",
+ self->uniqueid);
+ ao2_ref(self, -1);
+ return NULL;
+ }
+
/* Use our helper function to find the "best" bridge technology. */
self->technology = find_best_technology(capabilities, self);
if (!self->technology) {
@@ -4397,7 +4407,7 @@ static char *complete_bridge(const char *word, int state)
struct ao2_iterator iter;
struct stasis_message *msg;
- if (!(cached_bridges = stasis_cache_dump(ast_bridge_topic_all_cached(), ast_bridge_snapshot_type()))) {
+ if (!(cached_bridges = stasis_cache_dump(ast_bridge_cache(), ast_bridge_snapshot_type()))) {
return NULL;
}
@@ -4435,7 +4445,7 @@ static char *handle_bridge_show_all(struct ast_cli_entry *e, int cmd, struct ast
return NULL;
}
- if (!(cached_bridges = stasis_cache_dump(ast_bridge_topic_all_cached(), ast_bridge_snapshot_type()))) {
+ if (!(cached_bridges = stasis_cache_dump(ast_bridge_cache(), ast_bridge_snapshot_type()))) {
ast_cli(a->fd, "Failed to retrieve cached bridges\n");
return CLI_SUCCESS;
}
@@ -4467,7 +4477,7 @@ static int bridge_show_specific_print_channel(void *obj, void *arg, int flags)
RAII_VAR(struct stasis_message *, msg, NULL, ao2_cleanup);
struct ast_channel_snapshot *snapshot;
- if (!(msg = stasis_cache_get(ast_channel_topic_all_cached(), ast_channel_snapshot_type(), uniqueid))) {
+ if (!(msg = stasis_cache_get(ast_channel_cache(), ast_channel_snapshot_type(), uniqueid))) {
return 0;
}
snapshot = stasis_message_data(msg);
@@ -4500,7 +4510,7 @@ static char *handle_bridge_show_specific(struct ast_cli_entry *e, int cmd, struc
return CLI_SHOWUSAGE;
}
- msg = stasis_cache_get(ast_bridge_topic_all_cached(), ast_bridge_snapshot_type(), a->argv[2]);
+ msg = stasis_cache_get(ast_bridge_cache(), ast_bridge_snapshot_type(), a->argv[2]);
if (!msg) {
ast_cli(a->fd, "Bridge '%s' not found\n", a->argv[2]);
return CLI_SUCCESS;
diff --git a/main/cdr.c b/main/cdr.c
index 5129cce03..f3608f0a9 100644
--- a/main/cdr.c
+++ b/main/cdr.c
@@ -4005,11 +4005,11 @@ int ast_cdr_engine_init(void)
return -1;
}
- channel_subscription = stasis_forward_all(stasis_caching_get_topic(ast_channel_topic_all_cached()), cdr_topic);
+ channel_subscription = stasis_forward_all(ast_channel_topic_all_cached(), cdr_topic);
if (!channel_subscription) {
return -1;
}
- bridge_subscription = stasis_forward_all(stasis_caching_get_topic(ast_bridge_topic_all_cached()), cdr_topic);
+ bridge_subscription = stasis_forward_all(ast_bridge_topic_all_cached(), cdr_topic);
if (!bridge_subscription) {
return -1;
}
diff --git a/main/cel.c b/main/cel.c
index f66fbdcc0..a03d08115 100644
--- a/main/cel.c
+++ b/main/cel.c
@@ -1551,14 +1551,14 @@ int ast_cel_engine_init(void)
}
cel_channel_forwarder = stasis_forward_all(
- stasis_caching_get_topic(ast_channel_topic_all_cached()),
+ ast_channel_topic_all_cached(),
cel_aggregation_topic);
if (!cel_channel_forwarder) {
return -1;
}
cel_bridge_forwarder = stasis_forward_all(
- stasis_caching_get_topic(ast_bridge_topic_all_cached()),
+ ast_bridge_topic_all_cached(),
cel_aggregation_topic);
if (!cel_bridge_forwarder) {
return -1;
diff --git a/main/channel_internal_api.c b/main/channel_internal_api.c
index a1d20871d..35bcb187d 100644
--- a/main/channel_internal_api.c
+++ b/main/channel_internal_api.c
@@ -44,6 +44,7 @@ ASTERISK_FILE_VERSION(__FILE__, "$Revision$")
#include "asterisk/data.h"
#include "asterisk/endpoints.h"
#include "asterisk/indications.h"
+#include "asterisk/stasis_cache_pattern.h"
#include "asterisk/stasis_channels.h"
#include "asterisk/stasis_endpoints.h"
#include "asterisk/stringfields.h"
@@ -208,7 +209,7 @@ struct ast_channel {
char dtmf_digit_to_emulate; /*!< Digit being emulated */
char sending_dtmf_digit; /*!< Digit this channel is currently sending out. (zero if not sending) */
struct timeval sending_dtmf_tv; /*!< The time this channel started sending the current digit. (Invalid if sending_dtmf_digit is zero.) */
- struct stasis_topic *topic; /*!< Topic for all channel's events */
+ struct stasis_cp_single *topics; /*!< Topic for all channel's events */
struct stasis_subscription *forwarder; /*!< Subscription for event forwarding to all topic */
struct stasis_subscription *endpoint_forward; /*!< Subscription for event forwarding to endpoint's topic */
};
@@ -1434,8 +1435,8 @@ void ast_channel_internal_cleanup(struct ast_channel *chan)
chan->forwarder = stasis_unsubscribe(chan->forwarder);
chan->endpoint_forward = stasis_unsubscribe(chan->endpoint_forward);
- ao2_cleanup(chan->topic);
- chan->topic = NULL;
+ stasis_cp_single_unsubscribe(chan->topics);
+ chan->topics = NULL;
}
void ast_channel_internal_finalize(struct ast_channel *chan)
@@ -1450,16 +1451,31 @@ int ast_channel_internal_is_finalized(struct ast_channel *chan)
struct stasis_topic *ast_channel_topic(struct ast_channel *chan)
{
- return chan ? chan->topic : ast_channel_topic_all();
+ if (!chan) {
+ return ast_channel_topic_all();
+ }
+
+ return stasis_cp_single_topic(chan->topics);
}
-int ast_channel_forward_endpoint(struct ast_channel *chan, struct ast_endpoint *endpoint)
+struct stasis_topic *ast_channel_topic_cached(struct ast_channel *chan)
+{
+ if (!chan) {
+ return ast_channel_topic_all_cached();
+ }
+
+ return stasis_cp_single_topic_cached(chan->topics);
+}
+
+int ast_channel_forward_endpoint(struct ast_channel *chan,
+ struct ast_endpoint *endpoint)
{
ast_assert(chan != NULL);
ast_assert(endpoint != NULL);
chan->endpoint_forward =
- stasis_forward_all(chan->topic, ast_endpoint_topic(endpoint));
+ stasis_forward_all(ast_channel_topic(chan),
+ ast_endpoint_topic(endpoint));
if (chan->endpoint_forward == NULL) {
return -1;
@@ -1468,19 +1484,21 @@ int ast_channel_forward_endpoint(struct ast_channel *chan, struct ast_endpoint *
return 0;
}
-void ast_channel_internal_setup_topics(struct ast_channel *chan)
+int ast_channel_internal_setup_topics(struct ast_channel *chan)
{
const char *topic_name = chan->uniqueid;
- ast_assert(chan->topic == NULL);
- ast_assert(chan->forwarder == NULL);
+ ast_assert(chan->topics == NULL);
if (ast_strlen_zero(topic_name)) {
topic_name = "<dummy-channel>";
}
- chan->topic = stasis_topic_create(topic_name);
- chan->forwarder = stasis_forward_all(chan->topic, ast_channel_topic_all());
+ chan->topics = stasis_cp_single_create(
+ ast_channel_cache_all(), topic_name);
- ast_assert(chan->topic != NULL);
- ast_assert(chan->forwarder != NULL);
+ if (!chan->topics) {
+ return -1;
+ }
+
+ return 0;
}
diff --git a/main/cli.c b/main/cli.c
index 6474f3e72..6ca0737ab 100644
--- a/main/cli.c
+++ b/main/cli.c
@@ -915,7 +915,7 @@ static char *handle_chanlist(struct ast_cli_entry *e, int cmd, struct ast_cli_ar
return CLI_SHOWUSAGE;
- if (!(channels = stasis_cache_dump(ast_channel_topic_all_cached_by_name(), ast_channel_snapshot_type()))) {
+ if (!(channels = stasis_cache_dump(ast_channel_cache_by_name(), ast_channel_snapshot_type()))) {
ast_cli(a->fd, "Failed to retrieve cached channels\n");
return CLI_SUCCESS;
}
@@ -1438,7 +1438,7 @@ static char *handle_showchan(struct ast_cli_entry *e, int cmd, struct ast_cli_ar
now = ast_tvnow();
- if (!(msg = stasis_cache_get(ast_channel_topic_all_cached_by_name(), ast_channel_snapshot_type(), a->argv[3]))) {
+ if (!(msg = stasis_cache_get(ast_channel_cache_by_name(), ast_channel_snapshot_type(), a->argv[3]))) {
ast_cli(a->fd, "%s is not a known channel\n", a->argv[3]);
return CLI_SUCCESS;
}
@@ -1571,7 +1571,7 @@ char *ast_complete_channels(const char *line, const char *word, int pos, int sta
return NULL;
}
- if (!(cached_channels = stasis_cache_dump(ast_channel_topic_all_cached(), ast_channel_snapshot_type()))) {
+ if (!(cached_channels = stasis_cache_dump(ast_channel_cache(), ast_channel_snapshot_type()))) {
return NULL;
}
diff --git a/main/devicestate.c b/main/devicestate.c
index b2c70f764..c16a0628b 100644
--- a/main/devicestate.c
+++ b/main/devicestate.c
@@ -196,6 +196,7 @@ static ast_cond_t change_pending;
struct stasis_subscription *devstate_message_sub;
static struct stasis_topic *device_state_topic_all;
+static struct stasis_cache *device_state_cache;
static struct stasis_caching_topic *device_state_topic_cached;
static struct stasis_topic_pool *device_state_topic_pool;
@@ -285,7 +286,7 @@ static enum ast_device_state devstate_cached(const char *device)
RAII_VAR(struct stasis_message *, cached_msg, NULL, ao2_cleanup);
struct ast_device_state_message *device_state;
- cached_msg = stasis_cache_get(ast_device_state_topic_cached(), ast_device_state_message_type(), device);
+ cached_msg = stasis_cache_get(ast_device_state_cache(), ast_device_state_message_type(), device);
if (!cached_msg) {
return AST_DEVICE_UNKNOWN;
}
@@ -586,7 +587,7 @@ static enum ast_device_state get_aggregate_state(char *device)
ast_devstate_aggregate_init(&aggregate);
- cached = stasis_cache_dump(ast_device_state_topic_cached(), NULL);
+ cached = stasis_cache_dump(ast_device_state_cache(), NULL);
ao2_callback_data(cached, OBJ_NODATA, devstate_change_aggregator_cb, &aggregate, device);
@@ -598,7 +599,7 @@ static int aggregate_state_changed(char *device, enum ast_device_state new_aggre
RAII_VAR(struct stasis_message *, cached_aggregate_msg, NULL, ao2_cleanup);
struct ast_device_state_message *cached_aggregate_device_state;
- cached_aggregate_msg = stasis_cache_get(ast_device_state_topic_cached(), ast_device_state_message_type(), device);
+ cached_aggregate_msg = stasis_cache_get(ast_device_state_cache(), ast_device_state_message_type(), device);
if (!cached_aggregate_msg) {
return 1;
}
@@ -719,9 +720,14 @@ struct stasis_topic *ast_device_state_topic_all(void)
return device_state_topic_all;
}
-struct stasis_caching_topic *ast_device_state_topic_cached(void)
+struct stasis_cache *ast_device_state_cache(void)
{
- return device_state_topic_cached;
+ return device_state_cache;
+}
+
+struct stasis_topic *ast_device_state_topic_cached(void)
+{
+ return stasis_caching_get_topic(device_state_topic_cached);
}
struct stasis_topic *ast_device_state_topic(const char *device)
@@ -777,6 +783,8 @@ static void devstate_cleanup(void)
devstate_message_sub = stasis_unsubscribe_and_join(devstate_message_sub);
ao2_cleanup(device_state_topic_all);
device_state_topic_all = NULL;
+ ao2_cleanup(device_state_cache);
+ device_state_cache = NULL;
device_state_topic_cached = stasis_caching_unsubscribe_and_join(device_state_topic_cached);
STASIS_MESSAGE_TYPE_CLEANUP(ast_device_state_message_type);
ao2_cleanup(device_state_topic_pool);
@@ -794,7 +802,11 @@ int devstate_init(void)
if (!device_state_topic_all) {
return -1;
}
- device_state_topic_cached = stasis_caching_topic_create(device_state_topic_all, device_state_get_id);
+ device_state_cache = stasis_cache_create(device_state_get_id);
+ if (!device_state_cache) {
+ return -1;
+ }
+ device_state_topic_cached = stasis_caching_topic_create(device_state_topic_all, device_state_cache);
if (!device_state_topic_cached) {
return -1;
}
@@ -803,7 +815,7 @@ int devstate_init(void)
return -1;
}
- devstate_message_sub = stasis_subscribe(stasis_caching_get_topic(ast_device_state_topic_cached()), devstate_change_collector_cb, NULL);
+ devstate_message_sub = stasis_subscribe(ast_device_state_topic_cached(), devstate_change_collector_cb, NULL);
if (!devstate_message_sub) {
ast_log(LOG_ERROR, "Failed to create subscription for the device state change collector\n");
diff --git a/main/endpoints.c b/main/endpoints.c
index d689f2e6e..b33e33f1a 100644
--- a/main/endpoints.c
+++ b/main/endpoints.c
@@ -58,17 +58,29 @@ struct ast_endpoint {
*/
int max_channels;
/*! Topic for this endpoint's messages */
- struct stasis_topic *topic;
- /*!
- * Forwarding subscription sending messages to ast_endpoint_topic_all()
- */
- struct stasis_subscription *forward;
+ struct stasis_cp_single *topics;
/*! Router for handling this endpoint's messages */
struct stasis_message_router *router;
/*! ast_str_container of channels associated with this endpoint */
struct ao2_container *channel_ids;
};
+struct stasis_topic *ast_endpoint_topic(struct ast_endpoint *endpoint)
+{
+ if (!endpoint) {
+ return ast_endpoint_topic_all();
+ }
+ return stasis_cp_single_topic(endpoint->topics);
+}
+
+struct stasis_topic *ast_endpoint_topic_cached(struct ast_endpoint *endpoint)
+{
+ if (!endpoint) {
+ return ast_endpoint_topic_all_cached();
+ }
+ return stasis_cp_single_topic_cached(endpoint->topics);
+}
+
const char *ast_endpoint_state_to_string(enum ast_endpoint_state state)
{
switch (state) {
@@ -88,7 +100,7 @@ static void endpoint_publish_snapshot(struct ast_endpoint *endpoint)
RAII_VAR(struct stasis_message *, message, NULL, ao2_cleanup);
ast_assert(endpoint != NULL);
- ast_assert(endpoint->topic != NULL);
+ ast_assert(endpoint->topics != NULL);
snapshot = ast_endpoint_snapshot_create(endpoint);
if (!snapshot) {
@@ -98,7 +110,7 @@ static void endpoint_publish_snapshot(struct ast_endpoint *endpoint)
if (!message) {
return;
}
- stasis_publish(endpoint->topic, message);
+ stasis_publish(ast_endpoint_topic(endpoint), message);
}
static void endpoint_dtor(void *obj)
@@ -110,11 +122,8 @@ static void endpoint_dtor(void *obj)
ao2_cleanup(endpoint->router);
endpoint->router = NULL;
- stasis_unsubscribe(endpoint->forward);
- endpoint->forward = NULL;
-
- ao2_cleanup(endpoint->topic);
- endpoint->topic = NULL;
+ stasis_cp_single_unsubscribe(endpoint->topics);
+ endpoint->topics = NULL;
ao2_cleanup(endpoint->channel_ids);
endpoint->channel_ids = NULL;
@@ -214,18 +223,13 @@ struct ast_endpoint *ast_endpoint_create(const char *tech, const char *resource)
return NULL;
}
- endpoint->topic = stasis_topic_create(endpoint->id);
- if (!endpoint->topic) {
- return NULL;
- }
-
- endpoint->forward =
- stasis_forward_all(endpoint->topic, ast_endpoint_topic_all());
- if (!endpoint->forward) {
+ endpoint->topics = stasis_cp_single_create(ast_endpoint_cache_all(),
+ endpoint->id);
+ if (!endpoint->topics) {
return NULL;
}
- endpoint->router = stasis_message_router_create(endpoint->topic);
+ endpoint->router = stasis_message_router_create(ast_endpoint_topic(endpoint));
if (!endpoint->router) {
return NULL;
}
@@ -271,7 +275,7 @@ void ast_endpoint_shutdown(struct ast_endpoint *endpoint)
RAII_VAR(struct stasis_message *, message, NULL, ao2_cleanup);
message = stasis_cache_clear_create(clear_msg);
if (message) {
- stasis_publish(endpoint->topic, message);
+ stasis_publish(ast_endpoint_topic(endpoint), message);
}
}
@@ -285,11 +289,6 @@ const char *ast_endpoint_get_resource(const struct ast_endpoint *endpoint)
return endpoint->resource;
}
-struct stasis_topic *ast_endpoint_topic(struct ast_endpoint *endpoint)
-{
- return endpoint ? endpoint->topic : ast_endpoint_topic_all();
-}
-
void ast_endpoint_set_state(struct ast_endpoint *endpoint,
enum ast_endpoint_state state)
{
diff --git a/main/manager.c b/main/manager.c
index 33bf26977..8ea7f4202 100644
--- a/main/manager.c
+++ b/main/manager.c
@@ -3874,7 +3874,7 @@ static int action_status(struct mansession *s, const struct message *m)
}
if (all) {
- if (!(cached_channels = stasis_cache_dump(ast_channel_topic_all_cached_by_name(), ast_channel_snapshot_type()))) {
+ if (!(cached_channels = stasis_cache_dump(ast_channel_cache_by_name(), ast_channel_snapshot_type()))) {
ast_free(str);
astman_send_error(s, m, "Memory Allocation Failure");
return 1;
@@ -3882,7 +3882,7 @@ static int action_status(struct mansession *s, const struct message *m)
it_chans = ao2_iterator_init(cached_channels, 0);
msg = ao2_iterator_next(&it_chans);
} else {
- if (!(msg = stasis_cache_get(ast_channel_topic_all_cached_by_name(), ast_channel_snapshot_type(), name))) {
+ if (!(msg = stasis_cache_get(ast_channel_cache_by_name(), ast_channel_snapshot_type(), name))) {
astman_send_error(s, m, "No such channel");
ast_free(str);
return 0;
@@ -5356,7 +5356,7 @@ static int action_coreshowchannels(struct mansession *s, const struct message *m
idText[0] = '\0';
}
- if (!(channels = stasis_cache_dump(ast_channel_topic_all_cached_by_name(), ast_channel_snapshot_type()))) {
+ if (!(channels = stasis_cache_dump(ast_channel_cache_by_name(), ast_channel_snapshot_type()))) {
astman_send_error(s, m, "Could not get cached channels");
return 0;
}
diff --git a/main/manager_bridges.c b/main/manager_bridges.c
index 7f0ae6b01..c791e63f3 100644
--- a/main/manager_bridges.c
+++ b/main/manager_bridges.c
@@ -350,7 +350,7 @@ static int manager_bridges_list(struct mansession *s, const struct message *m)
ast_str_set(&id_text, 0, "ActionID: %s\r\n", id);
}
- bridges = stasis_cache_dump(ast_bridge_topic_all_cached(), ast_bridge_snapshot_type());
+ bridges = stasis_cache_dump(ast_bridge_cache(), ast_bridge_snapshot_type());
if (!bridges) {
astman_send_error(s, m, "Internal error");
return -1;
@@ -382,7 +382,7 @@ static int send_bridge_info_item_cb(void *obj, void *arg, void *data, int flags)
RAII_VAR(struct stasis_message *, msg, NULL, ao2_cleanup);
struct ast_channel_snapshot *snapshot;
RAII_VAR(struct ast_str *, channel_text, NULL, ast_free);
- msg = stasis_cache_get(ast_channel_topic_all_cached(),
+ msg = stasis_cache_get(ast_channel_cache(),
ast_channel_snapshot_type(), uniqueid);
if (!msg) {
@@ -432,7 +432,7 @@ static int manager_bridge_info(struct mansession *s, const struct message *m)
ast_str_set(&id_text, 0, "ActionID: %s\r\n", id);
}
- msg = stasis_cache_get(ast_bridge_topic_all_cached(), ast_bridge_snapshot_type(), bridge_uniqueid);
+ msg = stasis_cache_get(ast_bridge_cache(), ast_bridge_snapshot_type(), bridge_uniqueid);
if (!msg) {
astman_send_error(s, m, "Specified BridgeUniqueid not found");
return -1;
@@ -489,7 +489,7 @@ int manager_bridging_init(void)
return -1;
}
- bridge_topic = stasis_caching_get_topic(ast_bridge_topic_all_cached());
+ bridge_topic = ast_bridge_topic_all_cached();
if (!bridge_topic) {
return -1;
}
diff --git a/main/manager_channels.c b/main/manager_channels.c
index d26f0be06..cab4aa38d 100644
--- a/main/manager_channels.c
+++ b/main/manager_channels.c
@@ -1269,7 +1269,7 @@ int manager_channels_init(void)
if (!message_router) {
return -1;
}
- channel_topic = stasis_caching_get_topic(ast_channel_topic_all_cached());
+ channel_topic = ast_channel_topic_all_cached();
if (!channel_topic) {
return -1;
}
diff --git a/main/manager_endpoints.c b/main/manager_endpoints.c
index f0ed28a2c..634283728 100644
--- a/main/manager_endpoints.c
+++ b/main/manager_endpoints.c
@@ -49,7 +49,11 @@ static void endpoint_state_cb(void *data, struct stasis_subscription *sub,
struct stasis_topic *topic,
struct stasis_message *message)
{
- stasis_forward_message(ast_manager_get_topic(), stasis_caching_get_topic(ast_endpoint_topic_all_cached()), message);
+ /* XXX This looks wrong. Nothing should post or forward to a caching
+ * topic directly. Maybe ast_endpoint_topic() would be correct? I'd have
+ * to dig to make sure I don't break anything, though.
+ */
+ stasis_forward_message(ast_manager_get_topic(), ast_endpoint_topic_all_cached(), message);
}
int manager_endpoints_init(void)
@@ -64,7 +68,7 @@ int manager_endpoints_init(void)
ast_register_atexit(manager_endpoints_shutdown);
- endpoint_topic = stasis_caching_get_topic(ast_endpoint_topic_all_cached());
+ endpoint_topic = ast_endpoint_topic_all_cached();
if (!endpoint_topic) {
return -1;
}
diff --git a/main/pbx.c b/main/pbx.c
index 0b8024a1f..27f774ac3 100644
--- a/main/pbx.c
+++ b/main/pbx.c
@@ -8042,7 +8042,7 @@ static char *handle_show_chanvar(struct ast_cli_entry *e, int cmd, struct ast_cl
if (a->argc != e->args + 1)
return CLI_SHOWUSAGE;
- if (!(msg = stasis_cache_get(ast_channel_topic_all_cached_by_name(), ast_channel_snapshot_type(), a->argv[3]))) {
+ if (!(msg = stasis_cache_get(ast_channel_cache_by_name(), ast_channel_snapshot_type(), a->argv[3]))) {
ast_cli(a->fd, "Channel '%s' not found\n", a->argv[e->args]);
return CLI_FAILURE;
}
diff --git a/main/presencestate.c b/main/presencestate.c
index 45174d5a1..3f394b635 100644
--- a/main/presencestate.c
+++ b/main/presencestate.c
@@ -55,6 +55,7 @@ static const struct {
STASIS_MESSAGE_TYPE_DEFN(ast_presence_state_message_type);
struct stasis_topic *presence_state_topic_all;
+struct stasis_cache *presence_state_cache;
struct stasis_caching_topic *presence_state_topic_cached;
/*! \brief A presence state provider */
@@ -95,7 +96,7 @@ static enum ast_presence_state presence_state_cached(const char *presence_provid
RAII_VAR(struct stasis_message *, msg, NULL, ao2_cleanup);
struct ast_presence_state_message *presence_state;
- msg = stasis_cache_get(ast_presence_state_topic_cached(), ast_presence_state_message_type(), presence_provider);
+ msg = stasis_cache_get(ast_presence_state_cache(), ast_presence_state_message_type(), presence_provider);
if (!msg) {
return res;
@@ -294,9 +295,14 @@ struct stasis_topic *ast_presence_state_topic_all(void)
return presence_state_topic_all;
}
-struct stasis_caching_topic *ast_presence_state_topic_cached(void)
+struct stasis_cache *ast_presence_state_cache(void)
{
- return presence_state_topic_cached;
+ return presence_state_cache;
+}
+
+struct stasis_topic *ast_presence_state_topic_cached(void)
+{
+ return stasis_caching_get_topic(presence_state_topic_cached);
}
static const char *presence_state_get_id(struct stasis_message *msg)
@@ -314,6 +320,8 @@ static void presence_state_engine_cleanup(void)
{
ao2_cleanup(presence_state_topic_all);
presence_state_topic_all = NULL;
+ ao2_cleanup(presence_state_cache);
+ presence_state_cache = NULL;
presence_state_topic_cached = stasis_caching_unsubscribe_and_join(presence_state_topic_cached);
STASIS_MESSAGE_TYPE_CLEANUP(ast_presence_state_message_type);
}
@@ -331,7 +339,12 @@ int ast_presence_state_engine_init(void)
return -1;
}
- presence_state_topic_cached = stasis_caching_topic_create(presence_state_topic_all, presence_state_get_id);
+ presence_state_cache = stasis_cache_create(presence_state_get_id);
+ if (!presence_state_cache) {
+ return -1;
+ }
+
+ presence_state_topic_cached = stasis_caching_topic_create(presence_state_topic_all, presence_state_cache);
if (!presence_state_topic_cached) {
return -1;
}
diff --git a/main/stasis.c b/main/stasis.c
index 64f77e309..b1af7b7f6 100644
--- a/main/stasis.c
+++ b/main/stasis.c
@@ -653,6 +653,11 @@ int stasis_init(void)
return -1;
}
+ if (stasis_wait_init() != 0) {
+ ast_log(LOG_ERROR, "Stasis initialization failed\n");
+ return -1;
+ }
+
if (pool) {
ast_log(LOG_ERROR, "Stasis double-initialized\n");
return -1;
diff --git a/main/stasis_bridges.c b/main/stasis_bridges.c
index 858875ad9..72f4d5055 100644
--- a/main/stasis_bridges.c
+++ b/main/stasis_bridges.c
@@ -33,6 +33,7 @@ ASTERISK_FILE_VERSION(__FILE__, "$Revision$")
#include "asterisk/astobj2.h"
#include "asterisk/stasis.h"
+#include "asterisk/stasis_cache_pattern.h"
#include "asterisk/channel.h"
#include "asterisk/stasis_bridges.h"
#include "asterisk/stasis_channels.h"
@@ -361,6 +362,8 @@ ASTERISK_FILE_VERSION(__FILE__, "$Revision$")
static struct ast_manager_event_blob *attended_transfer_to_ami(struct stasis_message *message);
static struct ast_manager_event_blob *blind_transfer_to_ami(struct stasis_message *message);
+static struct stasis_cp_all *bridge_cache_all;
+
/*!
* @{ \brief Define bridge message types.
*/
@@ -372,14 +375,52 @@ STASIS_MESSAGE_TYPE_DEFN(ast_blind_transfer_type, .to_ami = blind_transfer_to_am
STASIS_MESSAGE_TYPE_DEFN(ast_attended_transfer_type, .to_ami = attended_transfer_to_ami);
/*! @} */
-/*! \brief Aggregate topic for bridge messages */
-static struct stasis_topic *bridge_topic_all;
+struct stasis_cache *ast_bridge_cache(void)
+{
+ return stasis_cp_all_cache(bridge_cache_all);
+}
-/*! \brief Caching aggregate topic for bridge snapshots */
-static struct stasis_caching_topic *bridge_topic_all_cached;
+struct stasis_topic *ast_bridge_topic_all(void)
+{
+ return stasis_cp_all_topic(bridge_cache_all);
+}
+
+struct stasis_topic *ast_bridge_topic_all_cached(void)
+{
+ return stasis_cp_all_topic_cached(bridge_cache_all);
+}
-/*! \brief Topic pool for individual bridge topics */
-static struct stasis_topic_pool *bridge_topic_pool;
+int bridge_topics_init(struct ast_bridge *bridge)
+{
+ if (ast_strlen_zero(bridge->uniqueid)) {
+ ast_log(LOG_ERROR, "Bridge id initialization required\n");
+ return -1;
+ }
+ bridge->topics = stasis_cp_single_create(bridge_cache_all,
+ bridge->uniqueid);
+ if (!bridge->topics) {
+ return -1;
+ }
+ return 0;
+}
+
+struct stasis_topic *ast_bridge_topic(struct ast_bridge *bridge)
+{
+ if (!bridge) {
+ return ast_bridge_topic_all();
+ }
+
+ return stasis_cp_single_topic(bridge->topics);
+}
+
+struct stasis_topic *ast_bridge_topic_cached(struct ast_bridge *bridge)
+{
+ if (!bridge) {
+ return ast_bridge_topic_all_cached();
+ }
+
+ return stasis_cp_single_topic_cached(bridge->topics);
+}
/*! \brief Destructor for bridge snapshots */
static void bridge_snapshot_dtor(void *obj)
@@ -425,25 +466,6 @@ struct ast_bridge_snapshot *ast_bridge_snapshot_create(struct ast_bridge *bridge
return snapshot;
}
-struct stasis_topic *ast_bridge_topic(struct ast_bridge *bridge)
-{
- struct stasis_topic *bridge_topic = stasis_topic_pool_get_topic(bridge_topic_pool, bridge->uniqueid);
- if (!bridge_topic) {
- return ast_bridge_topic_all();
- }
- return bridge_topic;
-}
-
-struct stasis_topic *ast_bridge_topic_all(void)
-{
- return bridge_topic_all;
-}
-
-struct stasis_caching_topic *ast_bridge_topic_all_cached(void)
-{
- return bridge_topic_all_cached;
-}
-
void ast_bridge_publish_state(struct ast_bridge *bridge)
{
RAII_VAR(struct ast_bridge_snapshot *, snapshot, NULL, ao2_cleanup);
@@ -464,7 +486,8 @@ void ast_bridge_publish_state(struct ast_bridge *bridge)
stasis_publish(ast_bridge_topic(bridge), msg);
}
-static void bridge_publish_state_from_blob(struct ast_bridge_blob *obj)
+static void bridge_publish_state_from_blob(struct ast_bridge *bridge,
+ struct ast_bridge_blob *obj)
{
RAII_VAR(struct stasis_message *, msg, NULL, ao2_cleanup);
@@ -475,7 +498,7 @@ static void bridge_publish_state_from_blob(struct ast_bridge_blob *obj)
return;
}
- stasis_publish(stasis_topic_pool_get_topic(bridge_topic_pool, obj->bridge->uniqueid), msg);
+ stasis_publish(ast_bridge_topic(bridge), msg);
}
/*! \brief Destructor for bridge merge messages */
@@ -597,7 +620,7 @@ void ast_bridge_publish_enter(struct ast_bridge *bridge, struct ast_channel *cha
/* enter blob first, then state */
stasis_publish(ast_bridge_topic(bridge), msg);
- bridge_publish_state_from_blob(stasis_message_data(msg));
+ bridge_publish_state_from_blob(bridge, stasis_message_data(msg));
}
void ast_bridge_publish_leave(struct ast_bridge *bridge, struct ast_channel *chan)
@@ -610,7 +633,7 @@ void ast_bridge_publish_leave(struct ast_bridge *bridge, struct ast_channel *cha
}
/* state first, then leave blob (opposite of enter, preserves nesting of events) */
- bridge_publish_state_from_blob(stasis_message_data(msg));
+ bridge_publish_state_from_blob(bridge, stasis_message_data(msg));
stasis_publish(ast_bridge_topic(bridge), msg);
}
@@ -1043,7 +1066,7 @@ struct ast_bridge_snapshot *ast_bridge_snapshot_get_latest(const char *uniqueid)
ast_assert(!ast_strlen_zero(uniqueid));
- message = stasis_cache_get(ast_bridge_topic_all_cached(),
+ message = stasis_cache_get(ast_bridge_cache(),
ast_bridge_snapshot_type(),
uniqueid);
if (!message) {
@@ -1058,23 +1081,6 @@ struct ast_bridge_snapshot *ast_bridge_snapshot_get_latest(const char *uniqueid)
return snapshot;
}
-static void stasis_bridging_cleanup(void)
-{
- ao2_cleanup(bridge_topic_all);
- bridge_topic_all = NULL;
- bridge_topic_all_cached = stasis_caching_unsubscribe_and_join(
- bridge_topic_all_cached);
- ao2_cleanup(bridge_topic_pool);
- bridge_topic_pool = NULL;
-
- STASIS_MESSAGE_TYPE_CLEANUP(ast_bridge_snapshot_type);
- STASIS_MESSAGE_TYPE_CLEANUP(ast_bridge_merge_message_type);
- STASIS_MESSAGE_TYPE_CLEANUP(ast_channel_entered_bridge_type);
- STASIS_MESSAGE_TYPE_CLEANUP(ast_channel_left_bridge_type);
- STASIS_MESSAGE_TYPE_CLEANUP(ast_blind_transfer_type);
- STASIS_MESSAGE_TYPE_CLEANUP(ast_attended_transfer_type);
-}
-
/*! \brief snapshot ID getter for caching topic */
static const char *bridge_snapshot_get_id(struct stasis_message *msg)
{
@@ -1086,21 +1092,38 @@ static const char *bridge_snapshot_get_id(struct stasis_message *msg)
return snapshot->uniqueid;
}
+static void stasis_bridging_cleanup(void)
+{
+ STASIS_MESSAGE_TYPE_CLEANUP(ast_bridge_snapshot_type);
+ STASIS_MESSAGE_TYPE_CLEANUP(ast_bridge_merge_message_type);
+ STASIS_MESSAGE_TYPE_CLEANUP(ast_channel_entered_bridge_type);
+ STASIS_MESSAGE_TYPE_CLEANUP(ast_channel_left_bridge_type);
+ STASIS_MESSAGE_TYPE_CLEANUP(ast_blind_transfer_type);
+ STASIS_MESSAGE_TYPE_CLEANUP(ast_attended_transfer_type);
+
+ ao2_cleanup(bridge_cache_all);
+ bridge_cache_all = NULL;
+}
+
int ast_stasis_bridging_init(void)
{
+ int res = 0;
+
ast_register_cleanup(stasis_bridging_cleanup);
- STASIS_MESSAGE_TYPE_INIT(ast_bridge_snapshot_type);
- STASIS_MESSAGE_TYPE_INIT(ast_bridge_merge_message_type);
- STASIS_MESSAGE_TYPE_INIT(ast_channel_entered_bridge_type);
- STASIS_MESSAGE_TYPE_INIT(ast_channel_left_bridge_type);
- STASIS_MESSAGE_TYPE_INIT(ast_blind_transfer_type);
- STASIS_MESSAGE_TYPE_INIT(ast_attended_transfer_type);
- bridge_topic_all = stasis_topic_create("ast_bridge_topic_all");
- bridge_topic_all_cached = stasis_caching_topic_create(bridge_topic_all, bridge_snapshot_get_id);
- bridge_topic_pool = stasis_topic_pool_create(bridge_topic_all);
-
- return !bridge_topic_all
- || !bridge_topic_all_cached
- || !bridge_topic_pool ? -1 : 0;
+ bridge_cache_all = stasis_cp_all_create("ast_bridge_topic_all",
+ bridge_snapshot_get_id);
+
+ if (!bridge_cache_all) {
+ return -1;
+ }
+
+ res |= STASIS_MESSAGE_TYPE_INIT(ast_bridge_snapshot_type);
+ res |= STASIS_MESSAGE_TYPE_INIT(ast_bridge_merge_message_type);
+ res |= STASIS_MESSAGE_TYPE_INIT(ast_channel_entered_bridge_type);
+ res |= STASIS_MESSAGE_TYPE_INIT(ast_channel_left_bridge_type);
+ res |= STASIS_MESSAGE_TYPE_INIT(ast_blind_transfer_type);
+ res |= STASIS_MESSAGE_TYPE_INIT(ast_attended_transfer_type);
+
+ return res;
}
diff --git a/main/stasis_cache.c b/main/stasis_cache.c
index 17be90111..3d5065665 100644
--- a/main/stasis_cache.c
+++ b/main/stasis_cache.c
@@ -44,16 +44,19 @@ ASTERISK_FILE_VERSION(__FILE__, "$Revision$")
#endif
/*! \internal */
+struct stasis_cache {
+ struct ao2_container *entries;
+ snapshot_get_id id_fn;
+};
+
+/*! \internal */
struct stasis_caching_topic {
- struct ao2_container *cache;
+ struct stasis_cache *cache;
struct stasis_topic *topic;
struct stasis_topic *original_topic;
struct stasis_subscription *sub;
- snapshot_get_id id_fn;
};
-static struct stasis_message_type *cache_guarantee_type(void);
-
static void stasis_caching_topic_dtor(void *obj) {
struct stasis_caching_topic *caching_topic = obj;
ast_assert(!stasis_subscription_is_subscribed(caching_topic->sub));
@@ -136,7 +139,8 @@ static struct cache_entry *cache_entry_create(struct stasis_message_type *type,
ast_assert(type != NULL);
ast_assert(id != NULL);
- entry = ao2_alloc(sizeof(*entry), cache_entry_dtor);
+ entry = ao2_alloc_options(sizeof(*entry), cache_entry_dtor,
+ AO2_ALLOC_OPT_LOCK_NOLOCK);
if (!entry) {
return NULL;
}
@@ -183,28 +187,62 @@ static int cache_entry_cmp(void *obj, void *arg, int flags)
return 0;
}
-static struct stasis_message *cache_put(struct stasis_caching_topic *caching_topic, struct stasis_message_type *type, const char *id, struct stasis_message *new_snapshot)
+static void cache_dtor(void *obj)
+{
+ struct stasis_cache *cache = obj;
+
+ ao2_cleanup(cache->entries);
+ cache->entries = NULL;
+}
+
+struct stasis_cache *stasis_cache_create(snapshot_get_id id_fn)
+{
+ RAII_VAR(struct stasis_cache *, cache, NULL, ao2_cleanup);
+
+ cache = ao2_alloc_options(sizeof(*cache), cache_dtor,
+ AO2_ALLOC_OPT_LOCK_NOLOCK);
+ if (!cache) {
+ return NULL;
+ }
+
+ cache->entries = ao2_container_alloc(NUM_CACHE_BUCKETS, cache_entry_hash,
+ cache_entry_cmp);
+ if (!cache->entries) {
+ return NULL;
+ }
+
+ cache->id_fn = id_fn;
+
+ ao2_ref(cache, +1);
+ return cache;
+}
+
+static struct stasis_message *cache_put(struct stasis_cache *cache,
+ struct stasis_message_type *type, const char *id,
+ struct stasis_message *new_snapshot)
{
RAII_VAR(struct cache_entry *, new_entry, NULL, ao2_cleanup);
RAII_VAR(struct cache_entry *, cached_entry, NULL, ao2_cleanup);
struct stasis_message *old_snapshot = NULL;
- ast_assert(caching_topic->cache != NULL);
+ ast_assert(cache->entries != NULL);
+ ast_assert(new_snapshot == NULL ||
+ type == stasis_message_type(new_snapshot));
new_entry = cache_entry_create(type, id, new_snapshot);
if (new_snapshot == NULL) {
/* Remove entry from cache */
- cached_entry = ao2_find(caching_topic->cache, new_entry, OBJ_POINTER | OBJ_UNLINK);
+ cached_entry = ao2_find(cache->entries, new_entry, OBJ_POINTER | OBJ_UNLINK);
if (cached_entry) {
old_snapshot = cached_entry->snapshot;
cached_entry->snapshot = NULL;
}
} else {
/* Insert/update cache */
- SCOPED_AO2LOCK(lock, caching_topic->cache);
+ SCOPED_AO2LOCK(lock, cache->entries);
- cached_entry = ao2_find(caching_topic->cache, new_entry, OBJ_POINTER | OBJ_NOLOCK);
+ cached_entry = ao2_find(cache->entries, new_entry, OBJ_POINTER | OBJ_NOLOCK);
if (cached_entry) {
/* Update cache. Because objects are moving, no need to update refcounts. */
old_snapshot = cached_entry->snapshot;
@@ -212,7 +250,7 @@ static struct stasis_message *cache_put(struct stasis_caching_topic *caching_top
new_entry->snapshot = NULL;
} else {
/* Insert into the cache */
- ao2_link_flags(caching_topic->cache, new_entry, OBJ_NOLOCK);
+ ao2_link_flags(cache->entries, new_entry, OBJ_NOLOCK);
}
}
@@ -220,68 +258,19 @@ static struct stasis_message *cache_put(struct stasis_caching_topic *caching_top
return old_snapshot;
}
-/*! \internal */
-struct caching_guarantee {
- ast_mutex_t lock;
- ast_cond_t cond;
- unsigned int done:1;
-};
-
-static void caching_guarantee_dtor(void *obj)
-{
- struct caching_guarantee *guarantee = obj;
-
- ast_assert(guarantee->done == 1);
-
- ast_mutex_destroy(&guarantee->lock);
- ast_cond_destroy(&guarantee->cond);
-}
-
-static struct stasis_message *caching_guarantee_create(void)
-{
- RAII_VAR(struct caching_guarantee *, guarantee, NULL, ao2_cleanup);
- RAII_VAR(struct stasis_message *, msg, NULL, ao2_cleanup);
-
- if (!(guarantee = ao2_alloc(sizeof(*guarantee), caching_guarantee_dtor))) {
- return NULL;
- }
-
- ast_mutex_init(&guarantee->lock);
- ast_cond_init(&guarantee->cond, NULL);
-
- if (!(msg = stasis_message_create(cache_guarantee_type(), guarantee))) {
- return NULL;
- }
-
- ao2_ref(msg, +1);
- return msg;
-}
-
-struct stasis_message *stasis_cache_get_extended(struct stasis_caching_topic *caching_topic, struct stasis_message_type *type, const char *id, unsigned int guaranteed)
+struct stasis_message *stasis_cache_get(struct stasis_cache *cache, struct stasis_message_type *type, const char *id)
{
RAII_VAR(struct cache_entry *, search_entry, NULL, ao2_cleanup);
RAII_VAR(struct cache_entry *, cached_entry, NULL, ao2_cleanup);
- ast_assert(caching_topic->cache != NULL);
-
- if (guaranteed) {
- RAII_VAR(struct stasis_message *, msg, caching_guarantee_create(), ao2_cleanup);
- struct caching_guarantee *guarantee = stasis_message_data(msg);
-
- ast_mutex_lock(&guarantee->lock);
- stasis_publish(caching_topic->original_topic, msg);
- while (!guarantee->done) {
- ast_cond_wait(&guarantee->cond, &guarantee->lock);
- }
- ast_mutex_unlock(&guarantee->lock);
- }
+ ast_assert(cache->entries != NULL);
search_entry = cache_entry_create(type, id, NULL);
if (search_entry == NULL) {
return NULL;
}
- cached_entry = ao2_find(caching_topic->cache, search_entry, OBJ_POINTER);
+ cached_entry = ao2_find(cache->entries, search_entry, OBJ_POINTER);
if (cached_entry == NULL) {
return NULL;
}
@@ -308,25 +297,25 @@ static int cache_dump_cb(void *obj, void *arg, int flags)
return 0;
}
-struct ao2_container *stasis_cache_dump(struct stasis_caching_topic *caching_topic, struct stasis_message_type *type)
+struct ao2_container *stasis_cache_dump(struct stasis_cache *cache, struct stasis_message_type *type)
{
struct cache_dump_data cache_dump;
- ast_assert(caching_topic->cache != NULL);
+ ast_assert(cache->entries != NULL);
cache_dump.type = type;
- cache_dump.cached = ao2_container_alloc(1, NULL, NULL);
+ cache_dump.cached = ao2_container_alloc_options(
+ AO2_ALLOC_OPT_LOCK_NOLOCK, 1, NULL, NULL);
if (!cache_dump.cached) {
return NULL;
}
- ao2_callback(caching_topic->cache, OBJ_MULTIPLE | OBJ_NODATA, cache_dump_cb, &cache_dump);
+ ao2_callback(cache->entries, OBJ_MULTIPLE | OBJ_NODATA, cache_dump_cb, &cache_dump);
return cache_dump.cached;
}
STASIS_MESSAGE_TYPE_DEFN(stasis_cache_clear_type);
STASIS_MESSAGE_TYPE_DEFN(stasis_cache_update_type);
-STASIS_MESSAGE_TYPE_DEFN(cache_guarantee_type);
struct stasis_message *stasis_cache_clear_create(struct stasis_message *id_message)
{
@@ -362,7 +351,8 @@ static struct stasis_message *update_create(struct stasis_topic *topic, struct s
ast_assert(topic != NULL);
ast_assert(old_snapshot != NULL || new_snapshot != NULL);
- update = ao2_alloc(sizeof(*update), stasis_cache_update_dtor);
+ update = ao2_alloc_options(sizeof(*update), stasis_cache_update_dtor,
+ AO2_ALLOC_OPT_LOCK_NOLOCK);
if (!update) {
return NULL;
}
@@ -393,7 +383,8 @@ static struct stasis_message *update_create(struct stasis_topic *topic, struct s
return msg;
}
-static void caching_topic_exec(void *data, struct stasis_subscription *sub, struct stasis_topic *topic, struct stasis_message *message)
+static void caching_topic_exec(void *data, struct stasis_subscription *sub,
+ struct stasis_topic *topic, struct stasis_message *message)
{
RAII_VAR(struct stasis_caching_topic *, caching_topic_needs_unref, NULL, ao2_cleanup);
struct stasis_caching_topic *caching_topic = data;
@@ -401,36 +392,25 @@ static void caching_topic_exec(void *data, struct stasis_subscription *sub, stru
ast_assert(caching_topic != NULL);
ast_assert(caching_topic->topic != NULL);
- ast_assert(caching_topic->id_fn != NULL);
+ ast_assert(caching_topic->cache != NULL);
+ ast_assert(caching_topic->cache->id_fn != NULL);
if (stasis_subscription_final_message(sub, message)) {
caching_topic_needs_unref = caching_topic;
}
- /* Handle cache guarantee event */
- if (cache_guarantee_type() == stasis_message_type(message)) {
- struct caching_guarantee *guarantee = stasis_message_data(message);
-
- ast_mutex_lock(&guarantee->lock);
- guarantee->done = 1;
- ast_cond_signal(&guarantee->cond);
- ast_mutex_unlock(&guarantee->lock);
-
- return;
- }
-
/* Handle cache clear event */
if (stasis_cache_clear_type() == stasis_message_type(message)) {
RAII_VAR(struct stasis_message *, old_snapshot, NULL, ao2_cleanup);
RAII_VAR(struct stasis_message *, update, NULL, ao2_cleanup);
struct stasis_message *clear_msg = stasis_message_data(message);
- const char *clear_id = caching_topic->id_fn(clear_msg);
+ const char *clear_id = caching_topic->cache->id_fn(clear_msg);
struct stasis_message_type *clear_type = stasis_message_type(clear_msg);
ast_assert(clear_type != NULL);
if (clear_id) {
- old_snapshot = cache_put(caching_topic, clear_type, clear_id, NULL);
+ old_snapshot = cache_put(caching_topic->cache, clear_type, clear_id, NULL);
if (old_snapshot) {
update = update_create(topic, old_snapshot, NULL);
stasis_publish(caching_topic->topic, update);
@@ -444,7 +424,7 @@ static void caching_topic_exec(void *data, struct stasis_subscription *sub, stru
}
}
- id = caching_topic->id_fn(message);
+ id = caching_topic->cache->id_fn(message);
if (id == NULL) {
/* Object isn't cached; forward */
stasis_forward_message(caching_topic->topic, topic, message);
@@ -453,7 +433,7 @@ static void caching_topic_exec(void *data, struct stasis_subscription *sub, stru
RAII_VAR(struct stasis_message *, old_snapshot, NULL, ao2_cleanup);
RAII_VAR(struct stasis_message *, update, NULL, ao2_cleanup);
- old_snapshot = cache_put(caching_topic, stasis_message_type(message), id, message);
+ old_snapshot = cache_put(caching_topic->cache, stasis_message_type(message), id, message);
update = update_create(topic, old_snapshot, message);
if (update == NULL) {
@@ -464,7 +444,7 @@ static void caching_topic_exec(void *data, struct stasis_subscription *sub, stru
}
}
-struct stasis_caching_topic *stasis_caching_topic_create(struct stasis_topic *original_topic, snapshot_get_id id_fn)
+struct stasis_caching_topic *stasis_caching_topic_create(struct stasis_topic *original_topic, struct stasis_cache *cache)
{
RAII_VAR(struct stasis_caching_topic *, caching_topic, NULL, ao2_cleanup);
struct stasis_subscription *sub;
@@ -476,23 +456,19 @@ struct stasis_caching_topic *stasis_caching_topic_create(struct stasis_topic *or
return NULL;
}
- caching_topic = ao2_alloc(sizeof(*caching_topic), stasis_caching_topic_dtor);
+ caching_topic = ao2_alloc_options(sizeof(*caching_topic),
+ stasis_caching_topic_dtor, AO2_ALLOC_OPT_LOCK_NOLOCK);
if (caching_topic == NULL) {
return NULL;
}
- caching_topic->cache = ao2_container_alloc(NUM_CACHE_BUCKETS, cache_entry_hash, cache_entry_cmp);
- if (!caching_topic->cache) {
- ast_log(LOG_ERROR, "Stasis cache allocation failed\n");
- return NULL;
- }
-
caching_topic->topic = stasis_topic_create(new_name);
if (caching_topic->topic == NULL) {
return NULL;
}
- caching_topic->id_fn = id_fn;
+ ao2_ref(cache, +1);
+ caching_topic->cache = cache;
sub = internal_stasis_subscribe(original_topic, caching_topic_exec, caching_topic, 0);
if (sub == NULL) {
@@ -514,7 +490,6 @@ static void stasis_cache_cleanup(void)
{
STASIS_MESSAGE_TYPE_CLEANUP(stasis_cache_clear_type);
STASIS_MESSAGE_TYPE_CLEANUP(stasis_cache_update_type);
- STASIS_MESSAGE_TYPE_CLEANUP(cache_guarantee_type);
}
int stasis_cache_init(void)
@@ -529,10 +504,6 @@ int stasis_cache_init(void)
return -1;
}
- if (STASIS_MESSAGE_TYPE_INIT(cache_guarantee_type) != 0) {
- return -1;
- }
-
return 0;
}
diff --git a/main/stasis_cache_pattern.c b/main/stasis_cache_pattern.c
new file mode 100644
index 000000000..18ae8617e
--- /dev/null
+++ b/main/stasis_cache_pattern.c
@@ -0,0 +1,189 @@
+/*
+ * Asterisk -- An open source telephony toolkit.
+ *
+ * Copyright (C) 2013, Digium, Inc.
+ *
+ * David M. Lee, II <dlee@digium.com>
+ *
+ * See http://www.asterisk.org for more information about
+ * the Asterisk project. Please do not directly contact
+ * any of the maintainers of this project for assistance;
+ * the project provides a web site, mailing lists and IRC
+ * channels for your use.
+ *
+ * This program is free software, distributed under the terms of
+ * the GNU General Public License Version 2. See the LICENSE file
+ * at the top of the source tree.
+ */
+
+/*! \file
+ *
+ * \brief Typical cache pattern for Stasis topics.
+ *
+ * \author David M. Lee, II <dlee@digium.com>
+ */
+
+/*** MODULEINFO
+ <support_level>core</support_level>
+ ***/
+
+#include "asterisk.h"
+
+ASTERISK_FILE_VERSION(__FILE__, "$Revision$")
+
+#include "asterisk/astobj2.h"
+#include "asterisk/stasis_cache_pattern.h"
+
+struct stasis_cp_all {
+ struct stasis_topic *topic;
+ struct stasis_topic *topic_cached;
+ struct stasis_cache *cache;
+};
+
+struct stasis_cp_single {
+ struct stasis_topic *topic;
+ struct stasis_caching_topic *topic_cached;
+
+ struct stasis_subscription *forward;
+ struct stasis_subscription *forward_cached;
+};
+
+static void all_dtor(void *obj)
+{
+ struct stasis_cp_all *all = obj;
+
+ ao2_cleanup(all->topic);
+ ao2_cleanup(all->topic_cached);
+ ao2_cleanup(all->cache);
+}
+
+struct stasis_cp_all *stasis_cp_all_create(const char *name,
+ snapshot_get_id id_fn)
+{
+ RAII_VAR(char *, cached_name, NULL, ast_free);
+ RAII_VAR(struct stasis_cp_all *, all, NULL, ao2_cleanup);
+
+ all = ao2_alloc(sizeof(*all), all_dtor);
+ if (!all) {
+ return NULL;
+ }
+
+ ast_asprintf(&cached_name, "%s-cached", name);
+ if (!cached_name) {
+ return NULL;
+ }
+
+ all->topic = stasis_topic_create(name);
+ all->topic_cached = stasis_topic_create(cached_name);
+ all->cache = stasis_cache_create(id_fn);
+
+ if (!all->topic || !all->topic_cached || !all->cache) {
+ return NULL;
+ }
+
+ ao2_ref(all, +1);
+ return all;
+}
+
+struct stasis_topic *stasis_cp_all_topic(struct stasis_cp_all *all)
+{
+ if (!all) {
+ return NULL;
+ }
+ return all->topic;
+}
+
+struct stasis_topic *stasis_cp_all_topic_cached(
+ struct stasis_cp_all *all)
+{
+ if (!all) {
+ return NULL;
+ }
+ return all->topic_cached;
+}
+
+struct stasis_cache *stasis_cp_all_cache(struct stasis_cp_all *all)
+{
+ if (!all) {
+ return NULL;
+ }
+ return all->cache;
+}
+
+static void one_dtor(void *obj)
+{
+ struct stasis_cp_single *one = obj;
+
+ /* Should already be unsubscribed */
+ ast_assert(one->topic_cached == NULL);
+ ast_assert(one->forward == NULL);
+ ast_assert(one->forward_cached == NULL);
+
+ ao2_cleanup(one->topic);
+ one->topic = NULL;
+}
+
+struct stasis_cp_single *stasis_cp_single_create(struct stasis_cp_all *all,
+ const char *name)
+{
+ RAII_VAR(struct stasis_cp_single *, one, NULL, ao2_cleanup);
+
+ one = ao2_alloc(sizeof(*one), one_dtor);
+ if (!one) {
+ return NULL;
+ }
+
+ one->topic = stasis_topic_create(name);
+ if (!one->topic) {
+ return NULL;
+ }
+ one->topic_cached = stasis_caching_topic_create(one->topic, all->cache);
+ if (!one->topic_cached) {
+ return NULL;
+ }
+
+ one->forward = stasis_forward_all(one->topic, all->topic);
+ if (!one->forward) {
+ return NULL;
+ }
+ one->forward_cached = stasis_forward_all(
+ stasis_caching_get_topic(one->topic_cached), all->topic_cached);
+ if (!one->forward_cached) {
+ return NULL;
+ }
+
+ ao2_ref(one, +1);
+ return one;
+}
+
+void stasis_cp_single_unsubscribe(struct stasis_cp_single *one)
+{
+ if (!one) {
+ return;
+ }
+
+ stasis_caching_unsubscribe(one->topic_cached);
+ one->topic_cached = NULL;
+ stasis_unsubscribe(one->forward);
+ one->forward = NULL;
+ stasis_unsubscribe(one->forward_cached);
+ one->forward_cached = NULL;
+}
+
+struct stasis_topic *stasis_cp_single_topic(struct stasis_cp_single *one)
+{
+ if (!one) {
+ return NULL;
+ }
+ return one->topic;
+}
+
+struct stasis_topic *stasis_cp_single_topic_cached(
+ struct stasis_cp_single *one)
+{
+ if (!one) {
+ return NULL;
+ }
+ return stasis_caching_get_topic(one->topic_cached);
+}
+
diff --git a/main/stasis_channels.c b/main/stasis_channels.c
index c71bbbd14..6729a1072 100644
--- a/main/stasis_channels.c
+++ b/main/stasis_channels.c
@@ -38,6 +38,7 @@ ASTERISK_FILE_VERSION(__FILE__, "$Revision$")
#include "asterisk/bridge.h"
#include "asterisk/translate.h"
#include "asterisk/stasis.h"
+#include "asterisk/stasis_cache_pattern.h"
#include "asterisk/stasis_channels.h"
/*** DOCUMENTATION
@@ -88,23 +89,33 @@ ASTERISK_FILE_VERSION(__FILE__, "$Revision$")
#define NUM_MULTI_CHANNEL_BLOB_BUCKETS 7
-/*! \brief Topic for all channels */
-struct stasis_topic *channel_topic_all;
+static struct stasis_cp_all *channel_cache_all;
+static struct stasis_cache *channel_cache_by_name;
+static struct stasis_caching_topic *channel_by_name_topic;
-/*! \brief Caching topic for all channels */
-struct stasis_caching_topic *channel_topic_all_cached;
+struct stasis_cp_all *ast_channel_cache_all(void)
+{
+ return channel_cache_all;
+}
-/*! \brief Caching topic for all channels indexed by name */
-struct stasis_caching_topic *channel_topic_all_cached_by_name;
+struct stasis_cache *ast_channel_cache(void)
+{
+ return stasis_cp_all_cache(channel_cache_all);
+}
struct stasis_topic *ast_channel_topic_all(void)
{
- return channel_topic_all;
+ return stasis_cp_all_topic(channel_cache_all);
+}
+
+struct stasis_topic *ast_channel_topic_all_cached(void)
+{
+ return stasis_cp_all_topic_cached(channel_cache_all);
}
-struct stasis_caching_topic *ast_channel_topic_all_cached(void)
+struct stasis_cache *ast_channel_cache_by_name(void)
{
- return channel_topic_all_cached;
+ return channel_cache_by_name;
}
static const char *channel_snapshot_get_id(struct stasis_message *message)
@@ -117,11 +128,6 @@ static const char *channel_snapshot_get_id(struct stasis_message *message)
return snapshot->uniqueid;
}
-struct stasis_caching_topic *ast_channel_topic_all_cached_by_name(void)
-{
- return channel_topic_all_cached_by_name;
-}
-
static const char *channel_snapshot_get_name(struct stasis_message *message)
{
struct ast_channel_snapshot *snapshot;
@@ -461,7 +467,7 @@ struct ast_channel_snapshot *ast_channel_snapshot_get_latest(const char *uniquei
ast_assert(!ast_strlen_zero(uniqueid));
- message = stasis_cache_get(ast_channel_topic_all_cached(),
+ message = stasis_cache_get(ast_channel_cache(),
ast_channel_snapshot_type(),
uniqueid);
if (!message) {
@@ -483,7 +489,7 @@ struct ast_channel_snapshot *ast_channel_snapshot_get_latest_by_name(const char
ast_assert(!ast_strlen_zero(name));
- message = stasis_cache_get(ast_channel_topic_all_cached_by_name(),
+ message = stasis_cache_get(ast_channel_cache_by_name(),
ast_channel_snapshot_type(),
name);
if (!message) {
@@ -906,10 +912,6 @@ STASIS_MESSAGE_TYPE_DEFN(ast_channel_agent_logoff_type,
static void stasis_channels_cleanup(void)
{
- channel_topic_all_cached = stasis_caching_unsubscribe_and_join(channel_topic_all_cached);
- channel_topic_all_cached_by_name = stasis_caching_unsubscribe_and_join(channel_topic_all_cached_by_name);
- ao2_cleanup(channel_topic_all);
- channel_topic_all = NULL;
STASIS_MESSAGE_TYPE_CLEANUP(ast_channel_snapshot_type);
STASIS_MESSAGE_TYPE_CLEANUP(ast_channel_dial_type);
STASIS_MESSAGE_TYPE_CLEANUP(ast_channel_varset_type);
@@ -929,33 +931,58 @@ static void stasis_channels_cleanup(void)
STASIS_MESSAGE_TYPE_CLEANUP(ast_channel_monitor_stop_type);
STASIS_MESSAGE_TYPE_CLEANUP(ast_channel_agent_login_type);
STASIS_MESSAGE_TYPE_CLEANUP(ast_channel_agent_logoff_type);
+
+ stasis_caching_unsubscribe_and_join(channel_by_name_topic);
+ channel_by_name_topic = NULL;
+ ao2_cleanup(channel_cache_by_name);
+ channel_cache_by_name = NULL;
+ ao2_cleanup(channel_cache_all);
+ channel_cache_all = NULL;
}
-void ast_stasis_channels_init(void)
+int ast_stasis_channels_init(void)
{
+ int res = 0;
+
ast_register_cleanup(stasis_channels_cleanup);
- STASIS_MESSAGE_TYPE_INIT(ast_channel_snapshot_type);
- STASIS_MESSAGE_TYPE_INIT(ast_channel_dial_type);
- STASIS_MESSAGE_TYPE_INIT(ast_channel_varset_type);
- STASIS_MESSAGE_TYPE_INIT(ast_channel_user_event_type);
- STASIS_MESSAGE_TYPE_INIT(ast_channel_hangup_request_type);
- STASIS_MESSAGE_TYPE_INIT(ast_channel_dtmf_begin_type);
- STASIS_MESSAGE_TYPE_INIT(ast_channel_dtmf_end_type);
- STASIS_MESSAGE_TYPE_INIT(ast_channel_hold_type);
- STASIS_MESSAGE_TYPE_INIT(ast_channel_unhold_type);
- STASIS_MESSAGE_TYPE_INIT(ast_channel_chanspy_start_type);
- STASIS_MESSAGE_TYPE_INIT(ast_channel_chanspy_stop_type);
- STASIS_MESSAGE_TYPE_INIT(ast_channel_fax_type);
- STASIS_MESSAGE_TYPE_INIT(ast_channel_hangup_handler_type);
- STASIS_MESSAGE_TYPE_INIT(ast_channel_moh_start_type);
- STASIS_MESSAGE_TYPE_INIT(ast_channel_moh_stop_type);
- STASIS_MESSAGE_TYPE_INIT(ast_channel_monitor_start_type);
- STASIS_MESSAGE_TYPE_INIT(ast_channel_monitor_stop_type);
+ channel_cache_all = stasis_cp_all_create("ast_channel_topic_all",
+ channel_snapshot_get_id);
+ if (!channel_cache_all) {
+ return -1;
+ }
STASIS_MESSAGE_TYPE_INIT(ast_channel_agent_login_type);
STASIS_MESSAGE_TYPE_INIT(ast_channel_agent_logoff_type);
- channel_topic_all = stasis_topic_create("ast_channel_topic_all");
- channel_topic_all_cached = stasis_caching_topic_create(channel_topic_all, channel_snapshot_get_id);
- channel_topic_all_cached_by_name = stasis_caching_topic_create(channel_topic_all, channel_snapshot_get_name);
+ channel_cache_by_name = stasis_cache_create(channel_snapshot_get_name);
+ if (!channel_cache_by_name) {
+ return -1;
+ }
+
+ channel_by_name_topic = stasis_caching_topic_create(
+ stasis_cp_all_topic(channel_cache_all),
+ channel_cache_by_name);
+ if (!channel_by_name_topic) {
+ return -1;
+ }
+
+ res |= STASIS_MESSAGE_TYPE_INIT(ast_channel_snapshot_type);
+ res |= STASIS_MESSAGE_TYPE_INIT(ast_channel_dial_type);
+ res |= STASIS_MESSAGE_TYPE_INIT(ast_channel_varset_type);
+ res |= STASIS_MESSAGE_TYPE_INIT(ast_channel_user_event_type);
+ res |= STASIS_MESSAGE_TYPE_INIT(ast_channel_hangup_request_type);
+ res |= STASIS_MESSAGE_TYPE_INIT(ast_channel_dtmf_begin_type);
+ res |= STASIS_MESSAGE_TYPE_INIT(ast_channel_dtmf_end_type);
+ res |= STASIS_MESSAGE_TYPE_INIT(ast_channel_hold_type);
+ res |= STASIS_MESSAGE_TYPE_INIT(ast_channel_unhold_type);
+ res |= STASIS_MESSAGE_TYPE_INIT(ast_channel_chanspy_start_type);
+ res |= STASIS_MESSAGE_TYPE_INIT(ast_channel_chanspy_stop_type);
+ res |= STASIS_MESSAGE_TYPE_INIT(ast_channel_fax_type);
+ res |= STASIS_MESSAGE_TYPE_INIT(ast_channel_hangup_handler_type);
+ res |= STASIS_MESSAGE_TYPE_INIT(ast_channel_moh_start_type);
+ res |= STASIS_MESSAGE_TYPE_INIT(ast_channel_moh_stop_type);
+ res |= STASIS_MESSAGE_TYPE_INIT(ast_channel_monitor_start_type);
+ res |= STASIS_MESSAGE_TYPE_INIT(ast_channel_monitor_stop_type);
+
+ return res;
}
diff --git a/main/stasis_endpoints.c b/main/stasis_endpoints.c
index a6756182c..831b4aee0 100644
--- a/main/stasis_endpoints.c
+++ b/main/stasis_endpoints.c
@@ -73,6 +73,28 @@ ASTERISK_FILE_VERSION(__FILE__, "$Revision$")
</managerEvent>
***/
+static struct stasis_cp_all *endpoint_cache_all;
+
+struct stasis_cp_all *ast_endpoint_cache_all(void)
+{
+ return endpoint_cache_all;
+}
+
+struct stasis_cache *ast_endpoint_cache(void)
+{
+ return stasis_cp_all_cache(endpoint_cache_all);
+}
+
+struct stasis_topic *ast_endpoint_topic_all(void)
+{
+ return stasis_cp_all_topic(endpoint_cache_all);
+}
+
+struct stasis_topic *ast_endpoint_topic_all_cached(void)
+{
+ return stasis_cp_all_topic_cached(endpoint_cache_all);
+}
+
static struct ast_manager_event_blob *peerstatus_to_ami(struct stasis_message *msg);
STASIS_MESSAGE_TYPE_DEFN(ast_endpoint_snapshot_type);
@@ -80,10 +102,6 @@ STASIS_MESSAGE_TYPE_DEFN(ast_endpoint_state_type,
.to_ami = peerstatus_to_ami,
);
-static struct stasis_topic *endpoint_topic_all;
-
-static struct stasis_caching_topic *endpoint_topic_all_cached;
-
static struct ast_manager_event_blob *peerstatus_to_ami(struct stasis_message *msg)
{
struct ast_endpoint_blob *obj = stasis_message_data(msg);
@@ -168,16 +186,6 @@ void ast_endpoint_blob_publish(struct ast_endpoint *endpoint, struct stasis_mess
}
}
-struct stasis_topic *ast_endpoint_topic_all(void)
-{
- return endpoint_topic_all;
-}
-
-struct stasis_caching_topic *ast_endpoint_topic_all_cached(void)
-{
- return endpoint_topic_all_cached;
-}
-
struct ast_endpoint_snapshot *ast_endpoint_latest_snapshot(const char *tech,
const char *name, unsigned int guaranteed)
{
@@ -190,8 +198,12 @@ struct ast_endpoint_snapshot *ast_endpoint_latest_snapshot(const char *tech,
return NULL;
}
- msg = stasis_cache_get_extended(ast_endpoint_topic_all_cached(),
- ast_endpoint_snapshot_type(), id, guaranteed);
+ if (guaranteed) {
+ stasis_topic_wait(ast_endpoint_topic_all_cached());
+ }
+
+ msg = stasis_cache_get(ast_endpoint_cache(),
+ ast_endpoint_snapshot_type(), id);
if (!msg) {
return NULL;
}
@@ -267,44 +279,28 @@ struct ast_json *ast_endpoint_snapshot_to_json(
return ast_json_ref(json);
}
-static void endpoints_stasis_shutdown(void)
+static void endpoints_stasis_cleanup(void)
{
- stasis_caching_unsubscribe_and_join(endpoint_topic_all_cached);
- endpoint_topic_all_cached = NULL;
+ STASIS_MESSAGE_TYPE_CLEANUP(ast_endpoint_snapshot_type);
+ STASIS_MESSAGE_TYPE_CLEANUP(ast_endpoint_state_type);
- ao2_cleanup(endpoint_topic_all);
- endpoint_topic_all = NULL;
+ ao2_cleanup(endpoint_cache_all);
+ endpoint_cache_all = NULL;
}
int ast_endpoint_stasis_init(void)
{
- ast_register_atexit(endpoints_stasis_shutdown);
-
- if (STASIS_MESSAGE_TYPE_INIT(ast_endpoint_snapshot_type) != 0) {
- return -1;
- }
+ int res = 0;
+ ast_register_cleanup(endpoints_stasis_cleanup);
- if (!endpoint_topic_all) {
- endpoint_topic_all = stasis_topic_create("endpoint_topic_all");
- }
-
- if (!endpoint_topic_all) {
+ endpoint_cache_all = stasis_cp_all_create("endpoint_topic_all",
+ endpoint_snapshot_get_id);
+ if (!endpoint_cache_all) {
return -1;
}
- if (!endpoint_topic_all_cached) {
- endpoint_topic_all_cached =
- stasis_caching_topic_create(
- endpoint_topic_all, endpoint_snapshot_get_id);
- }
-
- if (!endpoint_topic_all_cached) {
- return -1;
- }
-
- if (STASIS_MESSAGE_TYPE_INIT(ast_endpoint_state_type) != 0) {
- return -1;
- }
+ res |= STASIS_MESSAGE_TYPE_INIT(ast_endpoint_snapshot_type);
+ res |= STASIS_MESSAGE_TYPE_INIT(ast_endpoint_state_type);
- return 0;
+ return res;
}
diff --git a/main/stasis_wait.c b/main/stasis_wait.c
new file mode 100644
index 000000000..e94c686e1
--- /dev/null
+++ b/main/stasis_wait.c
@@ -0,0 +1,133 @@
+/*
+ * Asterisk -- An open source telephony toolkit.
+ *
+ * Copyright (C) 2013, Digium, Inc.
+ *
+ * Joshua Colp <jcolp@digium.com>
+ *
+ * See http://www.asterisk.org for more information about
+ * the Asterisk project. Please do not directly contact
+ * any of the maintainers of this project for assistance;
+ * the project provides a web site, mailing lists and IRC
+ * channels for your use.
+ *
+ * This program is free software, distributed under the terms of
+ * the GNU General Public License Version 2. See the LICENSE file
+ * at the top of the source tree.
+ */
+
+/*! \file
+ *
+ * \brief Wait support for Stasis topics.
+ *
+ * \author Joshua Colp <jcolp@digium.com>
+ */
+
+/*** MODULEINFO
+ <support_level>core</support_level>
+ ***/
+
+#include "asterisk.h"
+
+ASTERISK_FILE_VERSION(__FILE__, "$Revision$")
+
+#include "asterisk/astobj2.h"
+#include "asterisk/stasis.h"
+
+static struct stasis_message_type *cache_guarantee_type(void);
+STASIS_MESSAGE_TYPE_DEFN(cache_guarantee_type);
+
+/*! \internal */
+struct caching_guarantee {
+ ast_mutex_t lock;
+ ast_cond_t cond;
+ unsigned int done:1;
+};
+
+static void caching_guarantee_dtor(void *obj)
+{
+ struct caching_guarantee *guarantee = obj;
+
+ ast_assert(guarantee->done == 1);
+
+ ast_mutex_destroy(&guarantee->lock);
+ ast_cond_destroy(&guarantee->cond);
+}
+
+static void guarantee_handler(void *data, struct stasis_subscription *sub,
+ struct stasis_topic *topic, struct stasis_message *message)
+{
+ /* Wait for our particular message */
+ if (data == message) {
+ struct caching_guarantee *guarantee;
+ ast_assert(cache_guarantee_type() == stasis_message_type(message));
+ guarantee = stasis_message_data(message);
+
+ ast_mutex_lock(&guarantee->lock);
+ guarantee->done = 1;
+ ast_cond_signal(&guarantee->cond);
+ ast_mutex_unlock(&guarantee->lock);
+ }
+}
+
+static struct stasis_message *caching_guarantee_create(void)
+{
+ RAII_VAR(struct caching_guarantee *, guarantee, NULL, ao2_cleanup);
+ RAII_VAR(struct stasis_message *, msg, NULL, ao2_cleanup);
+
+ if (!(guarantee = ao2_alloc(sizeof(*guarantee), caching_guarantee_dtor))) {
+ return NULL;
+ }
+
+ ast_mutex_init(&guarantee->lock);
+ ast_cond_init(&guarantee->cond, NULL);
+
+ if (!(msg = stasis_message_create(cache_guarantee_type(), guarantee))) {
+ return NULL;
+ }
+
+ ao2_ref(msg, +1);
+ return msg;
+}
+
+int stasis_topic_wait(struct stasis_topic *topic)
+{
+ RAII_VAR(struct stasis_message *, msg, NULL, ao2_cleanup);
+ RAII_VAR(struct stasis_subscription *, sub, NULL, stasis_unsubscribe);
+ struct caching_guarantee *guarantee;
+
+ msg = caching_guarantee_create();
+ if (!msg) {
+ return -1;
+ }
+
+ sub = stasis_subscribe(topic, guarantee_handler, msg);
+ if (!sub) {
+ return -1;
+ }
+
+ guarantee = stasis_message_data(msg);
+
+ ast_mutex_lock(&guarantee->lock);
+ stasis_publish(topic, msg);
+ while (!guarantee->done) {
+ ast_cond_wait(&guarantee->cond, &guarantee->lock);
+ }
+ ast_mutex_unlock(&guarantee->lock);
+ return 0;
+}
+
+static void wait_cleanup(void)
+{
+ STASIS_MESSAGE_TYPE_CLEANUP(cache_guarantee_type);
+}
+
+int stasis_wait_init(void)
+{
+ ast_register_cleanup(wait_cleanup);
+
+ if (STASIS_MESSAGE_TYPE_INIT(cache_guarantee_type) != 0) {
+ return -1;
+ }
+ return 0;
+}