diff options
author | David M. Lee <dlee@digium.com> | 2013-08-01 13:49:34 +0000 |
---|---|---|
committer | David M. Lee <dlee@digium.com> | 2013-08-01 13:49:34 +0000 |
commit | e1b959ccbb4e47421b37a0f75a2bf89ccd34dcb1 (patch) | |
tree | 3026c96da713bafcf1126c77bde6994f348280bb /main | |
parent | 5c1396946929ab19e94c117f8ad3db5f78a450bc (diff) |
Split caching out from the stasis_caching_topic.
In working with res_stasis, I discovered a significant limitation to
the current structure of stasis_caching_topics: you cannot subscribe
to cache updates for a single channel/bridge/endpoint/etc.
To address this, this patch splits the cache away from the
stasis_caching_topic, making it a first class object. The stasis_cache
object is shared amongst individual stasis_caching_topics that are
created per channel/endpoint/etc. These are still forwarded to global
whatever_all_cached topics, so their use from most of the code does
not change.
In making these changes, I noticed that we frequently used a similar
pattern for bridges, endpoints and channels:
single_topic ----------------> all_topic
^
|
single_topic_cached ----+----> all_topic_cached
|
+----> cache
This pattern was extracted as the 'Stasis Caching Pattern', defined in
stasis_caching_pattern.h. This avoids a lot of duplicate code between
the different domain objects.
Since the cache is now disassociated from its upstream caching topics,
this also necessitated a change to how the 'guaranteed' flag worked
for retrieving from a cache. The code for handling the caching
guarantee was extracted into a 'stasis_topic_wait' function, which
works for any stasis_topic.
(closes issue ASTERISK-22002)
Review: https://reviewboard.asterisk.org/r/2672/
git-svn-id: https://origsvn.digium.com/svn/asterisk/trunk@395954 65c4cc65-6c06-0410-ace0-fbb531ad65f3
Diffstat (limited to 'main')
-rw-r--r-- | main/app.c | 18 | ||||
-rw-r--r-- | main/bridge.c | 18 | ||||
-rw-r--r-- | main/cdr.c | 4 | ||||
-rw-r--r-- | main/cel.c | 4 | ||||
-rw-r--r-- | main/channel_internal_api.c | 44 | ||||
-rw-r--r-- | main/cli.c | 6 | ||||
-rw-r--r-- | main/devicestate.c | 26 | ||||
-rw-r--r-- | main/endpoints.c | 53 | ||||
-rw-r--r-- | main/manager.c | 6 | ||||
-rw-r--r-- | main/manager_bridges.c | 8 | ||||
-rw-r--r-- | main/manager_channels.c | 2 | ||||
-rw-r--r-- | main/manager_endpoints.c | 8 | ||||
-rw-r--r-- | main/pbx.c | 2 | ||||
-rw-r--r-- | main/presencestate.c | 21 | ||||
-rw-r--r-- | main/stasis.c | 5 | ||||
-rw-r--r-- | main/stasis_bridges.c | 143 | ||||
-rw-r--r-- | main/stasis_cache.c | 173 | ||||
-rw-r--r-- | main/stasis_cache_pattern.c | 189 | ||||
-rw-r--r-- | main/stasis_channels.c | 109 | ||||
-rw-r--r-- | main/stasis_endpoints.c | 86 | ||||
-rw-r--r-- | main/stasis_wait.c | 133 |
21 files changed, 734 insertions, 324 deletions
diff --git a/main/app.c b/main/app.c index 031f6f28f..8d081fe8c 100644 --- a/main/app.c +++ b/main/app.c @@ -88,6 +88,7 @@ static AST_LIST_HEAD_STATIC(zombies, zombie); * @{ \brief Define \ref stasis topic objects for MWI */ static struct stasis_topic *mwi_topic_all; +static struct stasis_cache *mwi_state_cache; static struct stasis_caching_topic *mwi_topic_cached; static struct stasis_topic_pool *mwi_topic_pool; /* @} */ @@ -2696,9 +2697,14 @@ struct stasis_topic *ast_mwi_topic_all(void) return mwi_topic_all; } -struct stasis_caching_topic *ast_mwi_topic_cached(void) +struct stasis_cache *ast_mwi_state_cache(void) { - return mwi_topic_cached; + return mwi_state_cache; +} + +struct stasis_topic *ast_mwi_topic_cached(void) +{ + return stasis_caching_get_topic(mwi_topic_cached); } struct stasis_topic *ast_mwi_topic(const char *uniqueid) @@ -2754,7 +2760,7 @@ int ast_publish_mwi_state_full( if (!ast_strlen_zero(channel_id)) { RAII_VAR(struct stasis_message *, chan_message, - stasis_cache_get(ast_channel_topic_all_cached(), + stasis_cache_get(ast_channel_cache(), ast_channel_snapshot_type(), channel_id), ao2_cleanup); @@ -2855,7 +2861,11 @@ int app_init(void) if (!mwi_topic_all) { return -1; } - mwi_topic_cached = stasis_caching_topic_create(mwi_topic_all, mwi_state_get_id); + mwi_state_cache = stasis_cache_create(mwi_state_get_id); + if (!mwi_state_cache) { + return -1; + } + mwi_topic_cached = stasis_caching_topic_create(mwi_topic_all, mwi_state_cache); if (!mwi_topic_cached) { return -1; } diff --git a/main/bridge.c b/main/bridge.c index 0b4d95d5e..9e9e65c17 100644 --- a/main/bridge.c +++ b/main/bridge.c @@ -46,6 +46,7 @@ ASTERISK_FILE_VERSION(__FILE__, "$Revision$") #include "asterisk/bridge_after.h" #include "asterisk/stasis_bridges.h" #include "asterisk/stasis_channels.h" +#include "asterisk/stasis_cache_pattern.h" #include "asterisk/app.h" #include "asterisk/file.h" #include "asterisk/module.h" @@ -634,6 +635,8 @@ static void destroy_bridge(void *obj) } cleanup_video_mode(bridge); + + stasis_cp_single_unsubscribe(bridge->topics); } struct ast_bridge *bridge_register(struct ast_bridge *bridge) @@ -685,6 +688,13 @@ struct ast_bridge *bridge_base_init(struct ast_bridge *self, uint32_t capabiliti ast_set_flag(&self->feature_flags, flags); self->allowed_capabilities = capabilities; + if (bridge_topics_init(self) != 0) { + ast_log(LOG_WARNING, "Bridge %s: Could not initialize topics\n", + self->uniqueid); + ao2_ref(self, -1); + return NULL; + } + /* Use our helper function to find the "best" bridge technology. */ self->technology = find_best_technology(capabilities, self); if (!self->technology) { @@ -4397,7 +4407,7 @@ static char *complete_bridge(const char *word, int state) struct ao2_iterator iter; struct stasis_message *msg; - if (!(cached_bridges = stasis_cache_dump(ast_bridge_topic_all_cached(), ast_bridge_snapshot_type()))) { + if (!(cached_bridges = stasis_cache_dump(ast_bridge_cache(), ast_bridge_snapshot_type()))) { return NULL; } @@ -4435,7 +4445,7 @@ static char *handle_bridge_show_all(struct ast_cli_entry *e, int cmd, struct ast return NULL; } - if (!(cached_bridges = stasis_cache_dump(ast_bridge_topic_all_cached(), ast_bridge_snapshot_type()))) { + if (!(cached_bridges = stasis_cache_dump(ast_bridge_cache(), ast_bridge_snapshot_type()))) { ast_cli(a->fd, "Failed to retrieve cached bridges\n"); return CLI_SUCCESS; } @@ -4467,7 +4477,7 @@ static int bridge_show_specific_print_channel(void *obj, void *arg, int flags) RAII_VAR(struct stasis_message *, msg, NULL, ao2_cleanup); struct ast_channel_snapshot *snapshot; - if (!(msg = stasis_cache_get(ast_channel_topic_all_cached(), ast_channel_snapshot_type(), uniqueid))) { + if (!(msg = stasis_cache_get(ast_channel_cache(), ast_channel_snapshot_type(), uniqueid))) { return 0; } snapshot = stasis_message_data(msg); @@ -4500,7 +4510,7 @@ static char *handle_bridge_show_specific(struct ast_cli_entry *e, int cmd, struc return CLI_SHOWUSAGE; } - msg = stasis_cache_get(ast_bridge_topic_all_cached(), ast_bridge_snapshot_type(), a->argv[2]); + msg = stasis_cache_get(ast_bridge_cache(), ast_bridge_snapshot_type(), a->argv[2]); if (!msg) { ast_cli(a->fd, "Bridge '%s' not found\n", a->argv[2]); return CLI_SUCCESS; diff --git a/main/cdr.c b/main/cdr.c index 5129cce03..f3608f0a9 100644 --- a/main/cdr.c +++ b/main/cdr.c @@ -4005,11 +4005,11 @@ int ast_cdr_engine_init(void) return -1; } - channel_subscription = stasis_forward_all(stasis_caching_get_topic(ast_channel_topic_all_cached()), cdr_topic); + channel_subscription = stasis_forward_all(ast_channel_topic_all_cached(), cdr_topic); if (!channel_subscription) { return -1; } - bridge_subscription = stasis_forward_all(stasis_caching_get_topic(ast_bridge_topic_all_cached()), cdr_topic); + bridge_subscription = stasis_forward_all(ast_bridge_topic_all_cached(), cdr_topic); if (!bridge_subscription) { return -1; } diff --git a/main/cel.c b/main/cel.c index f66fbdcc0..a03d08115 100644 --- a/main/cel.c +++ b/main/cel.c @@ -1551,14 +1551,14 @@ int ast_cel_engine_init(void) } cel_channel_forwarder = stasis_forward_all( - stasis_caching_get_topic(ast_channel_topic_all_cached()), + ast_channel_topic_all_cached(), cel_aggregation_topic); if (!cel_channel_forwarder) { return -1; } cel_bridge_forwarder = stasis_forward_all( - stasis_caching_get_topic(ast_bridge_topic_all_cached()), + ast_bridge_topic_all_cached(), cel_aggregation_topic); if (!cel_bridge_forwarder) { return -1; diff --git a/main/channel_internal_api.c b/main/channel_internal_api.c index a1d20871d..35bcb187d 100644 --- a/main/channel_internal_api.c +++ b/main/channel_internal_api.c @@ -44,6 +44,7 @@ ASTERISK_FILE_VERSION(__FILE__, "$Revision$") #include "asterisk/data.h" #include "asterisk/endpoints.h" #include "asterisk/indications.h" +#include "asterisk/stasis_cache_pattern.h" #include "asterisk/stasis_channels.h" #include "asterisk/stasis_endpoints.h" #include "asterisk/stringfields.h" @@ -208,7 +209,7 @@ struct ast_channel { char dtmf_digit_to_emulate; /*!< Digit being emulated */ char sending_dtmf_digit; /*!< Digit this channel is currently sending out. (zero if not sending) */ struct timeval sending_dtmf_tv; /*!< The time this channel started sending the current digit. (Invalid if sending_dtmf_digit is zero.) */ - struct stasis_topic *topic; /*!< Topic for all channel's events */ + struct stasis_cp_single *topics; /*!< Topic for all channel's events */ struct stasis_subscription *forwarder; /*!< Subscription for event forwarding to all topic */ struct stasis_subscription *endpoint_forward; /*!< Subscription for event forwarding to endpoint's topic */ }; @@ -1434,8 +1435,8 @@ void ast_channel_internal_cleanup(struct ast_channel *chan) chan->forwarder = stasis_unsubscribe(chan->forwarder); chan->endpoint_forward = stasis_unsubscribe(chan->endpoint_forward); - ao2_cleanup(chan->topic); - chan->topic = NULL; + stasis_cp_single_unsubscribe(chan->topics); + chan->topics = NULL; } void ast_channel_internal_finalize(struct ast_channel *chan) @@ -1450,16 +1451,31 @@ int ast_channel_internal_is_finalized(struct ast_channel *chan) struct stasis_topic *ast_channel_topic(struct ast_channel *chan) { - return chan ? chan->topic : ast_channel_topic_all(); + if (!chan) { + return ast_channel_topic_all(); + } + + return stasis_cp_single_topic(chan->topics); } -int ast_channel_forward_endpoint(struct ast_channel *chan, struct ast_endpoint *endpoint) +struct stasis_topic *ast_channel_topic_cached(struct ast_channel *chan) +{ + if (!chan) { + return ast_channel_topic_all_cached(); + } + + return stasis_cp_single_topic_cached(chan->topics); +} + +int ast_channel_forward_endpoint(struct ast_channel *chan, + struct ast_endpoint *endpoint) { ast_assert(chan != NULL); ast_assert(endpoint != NULL); chan->endpoint_forward = - stasis_forward_all(chan->topic, ast_endpoint_topic(endpoint)); + stasis_forward_all(ast_channel_topic(chan), + ast_endpoint_topic(endpoint)); if (chan->endpoint_forward == NULL) { return -1; @@ -1468,19 +1484,21 @@ int ast_channel_forward_endpoint(struct ast_channel *chan, struct ast_endpoint * return 0; } -void ast_channel_internal_setup_topics(struct ast_channel *chan) +int ast_channel_internal_setup_topics(struct ast_channel *chan) { const char *topic_name = chan->uniqueid; - ast_assert(chan->topic == NULL); - ast_assert(chan->forwarder == NULL); + ast_assert(chan->topics == NULL); if (ast_strlen_zero(topic_name)) { topic_name = "<dummy-channel>"; } - chan->topic = stasis_topic_create(topic_name); - chan->forwarder = stasis_forward_all(chan->topic, ast_channel_topic_all()); + chan->topics = stasis_cp_single_create( + ast_channel_cache_all(), topic_name); - ast_assert(chan->topic != NULL); - ast_assert(chan->forwarder != NULL); + if (!chan->topics) { + return -1; + } + + return 0; } diff --git a/main/cli.c b/main/cli.c index 6474f3e72..6ca0737ab 100644 --- a/main/cli.c +++ b/main/cli.c @@ -915,7 +915,7 @@ static char *handle_chanlist(struct ast_cli_entry *e, int cmd, struct ast_cli_ar return CLI_SHOWUSAGE; - if (!(channels = stasis_cache_dump(ast_channel_topic_all_cached_by_name(), ast_channel_snapshot_type()))) { + if (!(channels = stasis_cache_dump(ast_channel_cache_by_name(), ast_channel_snapshot_type()))) { ast_cli(a->fd, "Failed to retrieve cached channels\n"); return CLI_SUCCESS; } @@ -1438,7 +1438,7 @@ static char *handle_showchan(struct ast_cli_entry *e, int cmd, struct ast_cli_ar now = ast_tvnow(); - if (!(msg = stasis_cache_get(ast_channel_topic_all_cached_by_name(), ast_channel_snapshot_type(), a->argv[3]))) { + if (!(msg = stasis_cache_get(ast_channel_cache_by_name(), ast_channel_snapshot_type(), a->argv[3]))) { ast_cli(a->fd, "%s is not a known channel\n", a->argv[3]); return CLI_SUCCESS; } @@ -1571,7 +1571,7 @@ char *ast_complete_channels(const char *line, const char *word, int pos, int sta return NULL; } - if (!(cached_channels = stasis_cache_dump(ast_channel_topic_all_cached(), ast_channel_snapshot_type()))) { + if (!(cached_channels = stasis_cache_dump(ast_channel_cache(), ast_channel_snapshot_type()))) { return NULL; } diff --git a/main/devicestate.c b/main/devicestate.c index b2c70f764..c16a0628b 100644 --- a/main/devicestate.c +++ b/main/devicestate.c @@ -196,6 +196,7 @@ static ast_cond_t change_pending; struct stasis_subscription *devstate_message_sub; static struct stasis_topic *device_state_topic_all; +static struct stasis_cache *device_state_cache; static struct stasis_caching_topic *device_state_topic_cached; static struct stasis_topic_pool *device_state_topic_pool; @@ -285,7 +286,7 @@ static enum ast_device_state devstate_cached(const char *device) RAII_VAR(struct stasis_message *, cached_msg, NULL, ao2_cleanup); struct ast_device_state_message *device_state; - cached_msg = stasis_cache_get(ast_device_state_topic_cached(), ast_device_state_message_type(), device); + cached_msg = stasis_cache_get(ast_device_state_cache(), ast_device_state_message_type(), device); if (!cached_msg) { return AST_DEVICE_UNKNOWN; } @@ -586,7 +587,7 @@ static enum ast_device_state get_aggregate_state(char *device) ast_devstate_aggregate_init(&aggregate); - cached = stasis_cache_dump(ast_device_state_topic_cached(), NULL); + cached = stasis_cache_dump(ast_device_state_cache(), NULL); ao2_callback_data(cached, OBJ_NODATA, devstate_change_aggregator_cb, &aggregate, device); @@ -598,7 +599,7 @@ static int aggregate_state_changed(char *device, enum ast_device_state new_aggre RAII_VAR(struct stasis_message *, cached_aggregate_msg, NULL, ao2_cleanup); struct ast_device_state_message *cached_aggregate_device_state; - cached_aggregate_msg = stasis_cache_get(ast_device_state_topic_cached(), ast_device_state_message_type(), device); + cached_aggregate_msg = stasis_cache_get(ast_device_state_cache(), ast_device_state_message_type(), device); if (!cached_aggregate_msg) { return 1; } @@ -719,9 +720,14 @@ struct stasis_topic *ast_device_state_topic_all(void) return device_state_topic_all; } -struct stasis_caching_topic *ast_device_state_topic_cached(void) +struct stasis_cache *ast_device_state_cache(void) { - return device_state_topic_cached; + return device_state_cache; +} + +struct stasis_topic *ast_device_state_topic_cached(void) +{ + return stasis_caching_get_topic(device_state_topic_cached); } struct stasis_topic *ast_device_state_topic(const char *device) @@ -777,6 +783,8 @@ static void devstate_cleanup(void) devstate_message_sub = stasis_unsubscribe_and_join(devstate_message_sub); ao2_cleanup(device_state_topic_all); device_state_topic_all = NULL; + ao2_cleanup(device_state_cache); + device_state_cache = NULL; device_state_topic_cached = stasis_caching_unsubscribe_and_join(device_state_topic_cached); STASIS_MESSAGE_TYPE_CLEANUP(ast_device_state_message_type); ao2_cleanup(device_state_topic_pool); @@ -794,7 +802,11 @@ int devstate_init(void) if (!device_state_topic_all) { return -1; } - device_state_topic_cached = stasis_caching_topic_create(device_state_topic_all, device_state_get_id); + device_state_cache = stasis_cache_create(device_state_get_id); + if (!device_state_cache) { + return -1; + } + device_state_topic_cached = stasis_caching_topic_create(device_state_topic_all, device_state_cache); if (!device_state_topic_cached) { return -1; } @@ -803,7 +815,7 @@ int devstate_init(void) return -1; } - devstate_message_sub = stasis_subscribe(stasis_caching_get_topic(ast_device_state_topic_cached()), devstate_change_collector_cb, NULL); + devstate_message_sub = stasis_subscribe(ast_device_state_topic_cached(), devstate_change_collector_cb, NULL); if (!devstate_message_sub) { ast_log(LOG_ERROR, "Failed to create subscription for the device state change collector\n"); diff --git a/main/endpoints.c b/main/endpoints.c index d689f2e6e..b33e33f1a 100644 --- a/main/endpoints.c +++ b/main/endpoints.c @@ -58,17 +58,29 @@ struct ast_endpoint { */ int max_channels; /*! Topic for this endpoint's messages */ - struct stasis_topic *topic; - /*! - * Forwarding subscription sending messages to ast_endpoint_topic_all() - */ - struct stasis_subscription *forward; + struct stasis_cp_single *topics; /*! Router for handling this endpoint's messages */ struct stasis_message_router *router; /*! ast_str_container of channels associated with this endpoint */ struct ao2_container *channel_ids; }; +struct stasis_topic *ast_endpoint_topic(struct ast_endpoint *endpoint) +{ + if (!endpoint) { + return ast_endpoint_topic_all(); + } + return stasis_cp_single_topic(endpoint->topics); +} + +struct stasis_topic *ast_endpoint_topic_cached(struct ast_endpoint *endpoint) +{ + if (!endpoint) { + return ast_endpoint_topic_all_cached(); + } + return stasis_cp_single_topic_cached(endpoint->topics); +} + const char *ast_endpoint_state_to_string(enum ast_endpoint_state state) { switch (state) { @@ -88,7 +100,7 @@ static void endpoint_publish_snapshot(struct ast_endpoint *endpoint) RAII_VAR(struct stasis_message *, message, NULL, ao2_cleanup); ast_assert(endpoint != NULL); - ast_assert(endpoint->topic != NULL); + ast_assert(endpoint->topics != NULL); snapshot = ast_endpoint_snapshot_create(endpoint); if (!snapshot) { @@ -98,7 +110,7 @@ static void endpoint_publish_snapshot(struct ast_endpoint *endpoint) if (!message) { return; } - stasis_publish(endpoint->topic, message); + stasis_publish(ast_endpoint_topic(endpoint), message); } static void endpoint_dtor(void *obj) @@ -110,11 +122,8 @@ static void endpoint_dtor(void *obj) ao2_cleanup(endpoint->router); endpoint->router = NULL; - stasis_unsubscribe(endpoint->forward); - endpoint->forward = NULL; - - ao2_cleanup(endpoint->topic); - endpoint->topic = NULL; + stasis_cp_single_unsubscribe(endpoint->topics); + endpoint->topics = NULL; ao2_cleanup(endpoint->channel_ids); endpoint->channel_ids = NULL; @@ -214,18 +223,13 @@ struct ast_endpoint *ast_endpoint_create(const char *tech, const char *resource) return NULL; } - endpoint->topic = stasis_topic_create(endpoint->id); - if (!endpoint->topic) { - return NULL; - } - - endpoint->forward = - stasis_forward_all(endpoint->topic, ast_endpoint_topic_all()); - if (!endpoint->forward) { + endpoint->topics = stasis_cp_single_create(ast_endpoint_cache_all(), + endpoint->id); + if (!endpoint->topics) { return NULL; } - endpoint->router = stasis_message_router_create(endpoint->topic); + endpoint->router = stasis_message_router_create(ast_endpoint_topic(endpoint)); if (!endpoint->router) { return NULL; } @@ -271,7 +275,7 @@ void ast_endpoint_shutdown(struct ast_endpoint *endpoint) RAII_VAR(struct stasis_message *, message, NULL, ao2_cleanup); message = stasis_cache_clear_create(clear_msg); if (message) { - stasis_publish(endpoint->topic, message); + stasis_publish(ast_endpoint_topic(endpoint), message); } } @@ -285,11 +289,6 @@ const char *ast_endpoint_get_resource(const struct ast_endpoint *endpoint) return endpoint->resource; } -struct stasis_topic *ast_endpoint_topic(struct ast_endpoint *endpoint) -{ - return endpoint ? endpoint->topic : ast_endpoint_topic_all(); -} - void ast_endpoint_set_state(struct ast_endpoint *endpoint, enum ast_endpoint_state state) { diff --git a/main/manager.c b/main/manager.c index 33bf26977..8ea7f4202 100644 --- a/main/manager.c +++ b/main/manager.c @@ -3874,7 +3874,7 @@ static int action_status(struct mansession *s, const struct message *m) } if (all) { - if (!(cached_channels = stasis_cache_dump(ast_channel_topic_all_cached_by_name(), ast_channel_snapshot_type()))) { + if (!(cached_channels = stasis_cache_dump(ast_channel_cache_by_name(), ast_channel_snapshot_type()))) { ast_free(str); astman_send_error(s, m, "Memory Allocation Failure"); return 1; @@ -3882,7 +3882,7 @@ static int action_status(struct mansession *s, const struct message *m) it_chans = ao2_iterator_init(cached_channels, 0); msg = ao2_iterator_next(&it_chans); } else { - if (!(msg = stasis_cache_get(ast_channel_topic_all_cached_by_name(), ast_channel_snapshot_type(), name))) { + if (!(msg = stasis_cache_get(ast_channel_cache_by_name(), ast_channel_snapshot_type(), name))) { astman_send_error(s, m, "No such channel"); ast_free(str); return 0; @@ -5356,7 +5356,7 @@ static int action_coreshowchannels(struct mansession *s, const struct message *m idText[0] = '\0'; } - if (!(channels = stasis_cache_dump(ast_channel_topic_all_cached_by_name(), ast_channel_snapshot_type()))) { + if (!(channels = stasis_cache_dump(ast_channel_cache_by_name(), ast_channel_snapshot_type()))) { astman_send_error(s, m, "Could not get cached channels"); return 0; } diff --git a/main/manager_bridges.c b/main/manager_bridges.c index 7f0ae6b01..c791e63f3 100644 --- a/main/manager_bridges.c +++ b/main/manager_bridges.c @@ -350,7 +350,7 @@ static int manager_bridges_list(struct mansession *s, const struct message *m) ast_str_set(&id_text, 0, "ActionID: %s\r\n", id); } - bridges = stasis_cache_dump(ast_bridge_topic_all_cached(), ast_bridge_snapshot_type()); + bridges = stasis_cache_dump(ast_bridge_cache(), ast_bridge_snapshot_type()); if (!bridges) { astman_send_error(s, m, "Internal error"); return -1; @@ -382,7 +382,7 @@ static int send_bridge_info_item_cb(void *obj, void *arg, void *data, int flags) RAII_VAR(struct stasis_message *, msg, NULL, ao2_cleanup); struct ast_channel_snapshot *snapshot; RAII_VAR(struct ast_str *, channel_text, NULL, ast_free); - msg = stasis_cache_get(ast_channel_topic_all_cached(), + msg = stasis_cache_get(ast_channel_cache(), ast_channel_snapshot_type(), uniqueid); if (!msg) { @@ -432,7 +432,7 @@ static int manager_bridge_info(struct mansession *s, const struct message *m) ast_str_set(&id_text, 0, "ActionID: %s\r\n", id); } - msg = stasis_cache_get(ast_bridge_topic_all_cached(), ast_bridge_snapshot_type(), bridge_uniqueid); + msg = stasis_cache_get(ast_bridge_cache(), ast_bridge_snapshot_type(), bridge_uniqueid); if (!msg) { astman_send_error(s, m, "Specified BridgeUniqueid not found"); return -1; @@ -489,7 +489,7 @@ int manager_bridging_init(void) return -1; } - bridge_topic = stasis_caching_get_topic(ast_bridge_topic_all_cached()); + bridge_topic = ast_bridge_topic_all_cached(); if (!bridge_topic) { return -1; } diff --git a/main/manager_channels.c b/main/manager_channels.c index d26f0be06..cab4aa38d 100644 --- a/main/manager_channels.c +++ b/main/manager_channels.c @@ -1269,7 +1269,7 @@ int manager_channels_init(void) if (!message_router) { return -1; } - channel_topic = stasis_caching_get_topic(ast_channel_topic_all_cached()); + channel_topic = ast_channel_topic_all_cached(); if (!channel_topic) { return -1; } diff --git a/main/manager_endpoints.c b/main/manager_endpoints.c index f0ed28a2c..634283728 100644 --- a/main/manager_endpoints.c +++ b/main/manager_endpoints.c @@ -49,7 +49,11 @@ static void endpoint_state_cb(void *data, struct stasis_subscription *sub, struct stasis_topic *topic, struct stasis_message *message) { - stasis_forward_message(ast_manager_get_topic(), stasis_caching_get_topic(ast_endpoint_topic_all_cached()), message); + /* XXX This looks wrong. Nothing should post or forward to a caching + * topic directly. Maybe ast_endpoint_topic() would be correct? I'd have + * to dig to make sure I don't break anything, though. + */ + stasis_forward_message(ast_manager_get_topic(), ast_endpoint_topic_all_cached(), message); } int manager_endpoints_init(void) @@ -64,7 +68,7 @@ int manager_endpoints_init(void) ast_register_atexit(manager_endpoints_shutdown); - endpoint_topic = stasis_caching_get_topic(ast_endpoint_topic_all_cached()); + endpoint_topic = ast_endpoint_topic_all_cached(); if (!endpoint_topic) { return -1; } diff --git a/main/pbx.c b/main/pbx.c index 0b8024a1f..27f774ac3 100644 --- a/main/pbx.c +++ b/main/pbx.c @@ -8042,7 +8042,7 @@ static char *handle_show_chanvar(struct ast_cli_entry *e, int cmd, struct ast_cl if (a->argc != e->args + 1) return CLI_SHOWUSAGE; - if (!(msg = stasis_cache_get(ast_channel_topic_all_cached_by_name(), ast_channel_snapshot_type(), a->argv[3]))) { + if (!(msg = stasis_cache_get(ast_channel_cache_by_name(), ast_channel_snapshot_type(), a->argv[3]))) { ast_cli(a->fd, "Channel '%s' not found\n", a->argv[e->args]); return CLI_FAILURE; } diff --git a/main/presencestate.c b/main/presencestate.c index 45174d5a1..3f394b635 100644 --- a/main/presencestate.c +++ b/main/presencestate.c @@ -55,6 +55,7 @@ static const struct { STASIS_MESSAGE_TYPE_DEFN(ast_presence_state_message_type); struct stasis_topic *presence_state_topic_all; +struct stasis_cache *presence_state_cache; struct stasis_caching_topic *presence_state_topic_cached; /*! \brief A presence state provider */ @@ -95,7 +96,7 @@ static enum ast_presence_state presence_state_cached(const char *presence_provid RAII_VAR(struct stasis_message *, msg, NULL, ao2_cleanup); struct ast_presence_state_message *presence_state; - msg = stasis_cache_get(ast_presence_state_topic_cached(), ast_presence_state_message_type(), presence_provider); + msg = stasis_cache_get(ast_presence_state_cache(), ast_presence_state_message_type(), presence_provider); if (!msg) { return res; @@ -294,9 +295,14 @@ struct stasis_topic *ast_presence_state_topic_all(void) return presence_state_topic_all; } -struct stasis_caching_topic *ast_presence_state_topic_cached(void) +struct stasis_cache *ast_presence_state_cache(void) { - return presence_state_topic_cached; + return presence_state_cache; +} + +struct stasis_topic *ast_presence_state_topic_cached(void) +{ + return stasis_caching_get_topic(presence_state_topic_cached); } static const char *presence_state_get_id(struct stasis_message *msg) @@ -314,6 +320,8 @@ static void presence_state_engine_cleanup(void) { ao2_cleanup(presence_state_topic_all); presence_state_topic_all = NULL; + ao2_cleanup(presence_state_cache); + presence_state_cache = NULL; presence_state_topic_cached = stasis_caching_unsubscribe_and_join(presence_state_topic_cached); STASIS_MESSAGE_TYPE_CLEANUP(ast_presence_state_message_type); } @@ -331,7 +339,12 @@ int ast_presence_state_engine_init(void) return -1; } - presence_state_topic_cached = stasis_caching_topic_create(presence_state_topic_all, presence_state_get_id); + presence_state_cache = stasis_cache_create(presence_state_get_id); + if (!presence_state_cache) { + return -1; + } + + presence_state_topic_cached = stasis_caching_topic_create(presence_state_topic_all, presence_state_cache); if (!presence_state_topic_cached) { return -1; } diff --git a/main/stasis.c b/main/stasis.c index 64f77e309..b1af7b7f6 100644 --- a/main/stasis.c +++ b/main/stasis.c @@ -653,6 +653,11 @@ int stasis_init(void) return -1; } + if (stasis_wait_init() != 0) { + ast_log(LOG_ERROR, "Stasis initialization failed\n"); + return -1; + } + if (pool) { ast_log(LOG_ERROR, "Stasis double-initialized\n"); return -1; diff --git a/main/stasis_bridges.c b/main/stasis_bridges.c index 858875ad9..72f4d5055 100644 --- a/main/stasis_bridges.c +++ b/main/stasis_bridges.c @@ -33,6 +33,7 @@ ASTERISK_FILE_VERSION(__FILE__, "$Revision$") #include "asterisk/astobj2.h" #include "asterisk/stasis.h" +#include "asterisk/stasis_cache_pattern.h" #include "asterisk/channel.h" #include "asterisk/stasis_bridges.h" #include "asterisk/stasis_channels.h" @@ -361,6 +362,8 @@ ASTERISK_FILE_VERSION(__FILE__, "$Revision$") static struct ast_manager_event_blob *attended_transfer_to_ami(struct stasis_message *message); static struct ast_manager_event_blob *blind_transfer_to_ami(struct stasis_message *message); +static struct stasis_cp_all *bridge_cache_all; + /*! * @{ \brief Define bridge message types. */ @@ -372,14 +375,52 @@ STASIS_MESSAGE_TYPE_DEFN(ast_blind_transfer_type, .to_ami = blind_transfer_to_am STASIS_MESSAGE_TYPE_DEFN(ast_attended_transfer_type, .to_ami = attended_transfer_to_ami); /*! @} */ -/*! \brief Aggregate topic for bridge messages */ -static struct stasis_topic *bridge_topic_all; +struct stasis_cache *ast_bridge_cache(void) +{ + return stasis_cp_all_cache(bridge_cache_all); +} -/*! \brief Caching aggregate topic for bridge snapshots */ -static struct stasis_caching_topic *bridge_topic_all_cached; +struct stasis_topic *ast_bridge_topic_all(void) +{ + return stasis_cp_all_topic(bridge_cache_all); +} + +struct stasis_topic *ast_bridge_topic_all_cached(void) +{ + return stasis_cp_all_topic_cached(bridge_cache_all); +} -/*! \brief Topic pool for individual bridge topics */ -static struct stasis_topic_pool *bridge_topic_pool; +int bridge_topics_init(struct ast_bridge *bridge) +{ + if (ast_strlen_zero(bridge->uniqueid)) { + ast_log(LOG_ERROR, "Bridge id initialization required\n"); + return -1; + } + bridge->topics = stasis_cp_single_create(bridge_cache_all, + bridge->uniqueid); + if (!bridge->topics) { + return -1; + } + return 0; +} + +struct stasis_topic *ast_bridge_topic(struct ast_bridge *bridge) +{ + if (!bridge) { + return ast_bridge_topic_all(); + } + + return stasis_cp_single_topic(bridge->topics); +} + +struct stasis_topic *ast_bridge_topic_cached(struct ast_bridge *bridge) +{ + if (!bridge) { + return ast_bridge_topic_all_cached(); + } + + return stasis_cp_single_topic_cached(bridge->topics); +} /*! \brief Destructor for bridge snapshots */ static void bridge_snapshot_dtor(void *obj) @@ -425,25 +466,6 @@ struct ast_bridge_snapshot *ast_bridge_snapshot_create(struct ast_bridge *bridge return snapshot; } -struct stasis_topic *ast_bridge_topic(struct ast_bridge *bridge) -{ - struct stasis_topic *bridge_topic = stasis_topic_pool_get_topic(bridge_topic_pool, bridge->uniqueid); - if (!bridge_topic) { - return ast_bridge_topic_all(); - } - return bridge_topic; -} - -struct stasis_topic *ast_bridge_topic_all(void) -{ - return bridge_topic_all; -} - -struct stasis_caching_topic *ast_bridge_topic_all_cached(void) -{ - return bridge_topic_all_cached; -} - void ast_bridge_publish_state(struct ast_bridge *bridge) { RAII_VAR(struct ast_bridge_snapshot *, snapshot, NULL, ao2_cleanup); @@ -464,7 +486,8 @@ void ast_bridge_publish_state(struct ast_bridge *bridge) stasis_publish(ast_bridge_topic(bridge), msg); } -static void bridge_publish_state_from_blob(struct ast_bridge_blob *obj) +static void bridge_publish_state_from_blob(struct ast_bridge *bridge, + struct ast_bridge_blob *obj) { RAII_VAR(struct stasis_message *, msg, NULL, ao2_cleanup); @@ -475,7 +498,7 @@ static void bridge_publish_state_from_blob(struct ast_bridge_blob *obj) return; } - stasis_publish(stasis_topic_pool_get_topic(bridge_topic_pool, obj->bridge->uniqueid), msg); + stasis_publish(ast_bridge_topic(bridge), msg); } /*! \brief Destructor for bridge merge messages */ @@ -597,7 +620,7 @@ void ast_bridge_publish_enter(struct ast_bridge *bridge, struct ast_channel *cha /* enter blob first, then state */ stasis_publish(ast_bridge_topic(bridge), msg); - bridge_publish_state_from_blob(stasis_message_data(msg)); + bridge_publish_state_from_blob(bridge, stasis_message_data(msg)); } void ast_bridge_publish_leave(struct ast_bridge *bridge, struct ast_channel *chan) @@ -610,7 +633,7 @@ void ast_bridge_publish_leave(struct ast_bridge *bridge, struct ast_channel *cha } /* state first, then leave blob (opposite of enter, preserves nesting of events) */ - bridge_publish_state_from_blob(stasis_message_data(msg)); + bridge_publish_state_from_blob(bridge, stasis_message_data(msg)); stasis_publish(ast_bridge_topic(bridge), msg); } @@ -1043,7 +1066,7 @@ struct ast_bridge_snapshot *ast_bridge_snapshot_get_latest(const char *uniqueid) ast_assert(!ast_strlen_zero(uniqueid)); - message = stasis_cache_get(ast_bridge_topic_all_cached(), + message = stasis_cache_get(ast_bridge_cache(), ast_bridge_snapshot_type(), uniqueid); if (!message) { @@ -1058,23 +1081,6 @@ struct ast_bridge_snapshot *ast_bridge_snapshot_get_latest(const char *uniqueid) return snapshot; } -static void stasis_bridging_cleanup(void) -{ - ao2_cleanup(bridge_topic_all); - bridge_topic_all = NULL; - bridge_topic_all_cached = stasis_caching_unsubscribe_and_join( - bridge_topic_all_cached); - ao2_cleanup(bridge_topic_pool); - bridge_topic_pool = NULL; - - STASIS_MESSAGE_TYPE_CLEANUP(ast_bridge_snapshot_type); - STASIS_MESSAGE_TYPE_CLEANUP(ast_bridge_merge_message_type); - STASIS_MESSAGE_TYPE_CLEANUP(ast_channel_entered_bridge_type); - STASIS_MESSAGE_TYPE_CLEANUP(ast_channel_left_bridge_type); - STASIS_MESSAGE_TYPE_CLEANUP(ast_blind_transfer_type); - STASIS_MESSAGE_TYPE_CLEANUP(ast_attended_transfer_type); -} - /*! \brief snapshot ID getter for caching topic */ static const char *bridge_snapshot_get_id(struct stasis_message *msg) { @@ -1086,21 +1092,38 @@ static const char *bridge_snapshot_get_id(struct stasis_message *msg) return snapshot->uniqueid; } +static void stasis_bridging_cleanup(void) +{ + STASIS_MESSAGE_TYPE_CLEANUP(ast_bridge_snapshot_type); + STASIS_MESSAGE_TYPE_CLEANUP(ast_bridge_merge_message_type); + STASIS_MESSAGE_TYPE_CLEANUP(ast_channel_entered_bridge_type); + STASIS_MESSAGE_TYPE_CLEANUP(ast_channel_left_bridge_type); + STASIS_MESSAGE_TYPE_CLEANUP(ast_blind_transfer_type); + STASIS_MESSAGE_TYPE_CLEANUP(ast_attended_transfer_type); + + ao2_cleanup(bridge_cache_all); + bridge_cache_all = NULL; +} + int ast_stasis_bridging_init(void) { + int res = 0; + ast_register_cleanup(stasis_bridging_cleanup); - STASIS_MESSAGE_TYPE_INIT(ast_bridge_snapshot_type); - STASIS_MESSAGE_TYPE_INIT(ast_bridge_merge_message_type); - STASIS_MESSAGE_TYPE_INIT(ast_channel_entered_bridge_type); - STASIS_MESSAGE_TYPE_INIT(ast_channel_left_bridge_type); - STASIS_MESSAGE_TYPE_INIT(ast_blind_transfer_type); - STASIS_MESSAGE_TYPE_INIT(ast_attended_transfer_type); - bridge_topic_all = stasis_topic_create("ast_bridge_topic_all"); - bridge_topic_all_cached = stasis_caching_topic_create(bridge_topic_all, bridge_snapshot_get_id); - bridge_topic_pool = stasis_topic_pool_create(bridge_topic_all); - - return !bridge_topic_all - || !bridge_topic_all_cached - || !bridge_topic_pool ? -1 : 0; + bridge_cache_all = stasis_cp_all_create("ast_bridge_topic_all", + bridge_snapshot_get_id); + + if (!bridge_cache_all) { + return -1; + } + + res |= STASIS_MESSAGE_TYPE_INIT(ast_bridge_snapshot_type); + res |= STASIS_MESSAGE_TYPE_INIT(ast_bridge_merge_message_type); + res |= STASIS_MESSAGE_TYPE_INIT(ast_channel_entered_bridge_type); + res |= STASIS_MESSAGE_TYPE_INIT(ast_channel_left_bridge_type); + res |= STASIS_MESSAGE_TYPE_INIT(ast_blind_transfer_type); + res |= STASIS_MESSAGE_TYPE_INIT(ast_attended_transfer_type); + + return res; } diff --git a/main/stasis_cache.c b/main/stasis_cache.c index 17be90111..3d5065665 100644 --- a/main/stasis_cache.c +++ b/main/stasis_cache.c @@ -44,16 +44,19 @@ ASTERISK_FILE_VERSION(__FILE__, "$Revision$") #endif /*! \internal */ +struct stasis_cache { + struct ao2_container *entries; + snapshot_get_id id_fn; +}; + +/*! \internal */ struct stasis_caching_topic { - struct ao2_container *cache; + struct stasis_cache *cache; struct stasis_topic *topic; struct stasis_topic *original_topic; struct stasis_subscription *sub; - snapshot_get_id id_fn; }; -static struct stasis_message_type *cache_guarantee_type(void); - static void stasis_caching_topic_dtor(void *obj) { struct stasis_caching_topic *caching_topic = obj; ast_assert(!stasis_subscription_is_subscribed(caching_topic->sub)); @@ -136,7 +139,8 @@ static struct cache_entry *cache_entry_create(struct stasis_message_type *type, ast_assert(type != NULL); ast_assert(id != NULL); - entry = ao2_alloc(sizeof(*entry), cache_entry_dtor); + entry = ao2_alloc_options(sizeof(*entry), cache_entry_dtor, + AO2_ALLOC_OPT_LOCK_NOLOCK); if (!entry) { return NULL; } @@ -183,28 +187,62 @@ static int cache_entry_cmp(void *obj, void *arg, int flags) return 0; } -static struct stasis_message *cache_put(struct stasis_caching_topic *caching_topic, struct stasis_message_type *type, const char *id, struct stasis_message *new_snapshot) +static void cache_dtor(void *obj) +{ + struct stasis_cache *cache = obj; + + ao2_cleanup(cache->entries); + cache->entries = NULL; +} + +struct stasis_cache *stasis_cache_create(snapshot_get_id id_fn) +{ + RAII_VAR(struct stasis_cache *, cache, NULL, ao2_cleanup); + + cache = ao2_alloc_options(sizeof(*cache), cache_dtor, + AO2_ALLOC_OPT_LOCK_NOLOCK); + if (!cache) { + return NULL; + } + + cache->entries = ao2_container_alloc(NUM_CACHE_BUCKETS, cache_entry_hash, + cache_entry_cmp); + if (!cache->entries) { + return NULL; + } + + cache->id_fn = id_fn; + + ao2_ref(cache, +1); + return cache; +} + +static struct stasis_message *cache_put(struct stasis_cache *cache, + struct stasis_message_type *type, const char *id, + struct stasis_message *new_snapshot) { RAII_VAR(struct cache_entry *, new_entry, NULL, ao2_cleanup); RAII_VAR(struct cache_entry *, cached_entry, NULL, ao2_cleanup); struct stasis_message *old_snapshot = NULL; - ast_assert(caching_topic->cache != NULL); + ast_assert(cache->entries != NULL); + ast_assert(new_snapshot == NULL || + type == stasis_message_type(new_snapshot)); new_entry = cache_entry_create(type, id, new_snapshot); if (new_snapshot == NULL) { /* Remove entry from cache */ - cached_entry = ao2_find(caching_topic->cache, new_entry, OBJ_POINTER | OBJ_UNLINK); + cached_entry = ao2_find(cache->entries, new_entry, OBJ_POINTER | OBJ_UNLINK); if (cached_entry) { old_snapshot = cached_entry->snapshot; cached_entry->snapshot = NULL; } } else { /* Insert/update cache */ - SCOPED_AO2LOCK(lock, caching_topic->cache); + SCOPED_AO2LOCK(lock, cache->entries); - cached_entry = ao2_find(caching_topic->cache, new_entry, OBJ_POINTER | OBJ_NOLOCK); + cached_entry = ao2_find(cache->entries, new_entry, OBJ_POINTER | OBJ_NOLOCK); if (cached_entry) { /* Update cache. Because objects are moving, no need to update refcounts. */ old_snapshot = cached_entry->snapshot; @@ -212,7 +250,7 @@ static struct stasis_message *cache_put(struct stasis_caching_topic *caching_top new_entry->snapshot = NULL; } else { /* Insert into the cache */ - ao2_link_flags(caching_topic->cache, new_entry, OBJ_NOLOCK); + ao2_link_flags(cache->entries, new_entry, OBJ_NOLOCK); } } @@ -220,68 +258,19 @@ static struct stasis_message *cache_put(struct stasis_caching_topic *caching_top return old_snapshot; } -/*! \internal */ -struct caching_guarantee { - ast_mutex_t lock; - ast_cond_t cond; - unsigned int done:1; -}; - -static void caching_guarantee_dtor(void *obj) -{ - struct caching_guarantee *guarantee = obj; - - ast_assert(guarantee->done == 1); - - ast_mutex_destroy(&guarantee->lock); - ast_cond_destroy(&guarantee->cond); -} - -static struct stasis_message *caching_guarantee_create(void) -{ - RAII_VAR(struct caching_guarantee *, guarantee, NULL, ao2_cleanup); - RAII_VAR(struct stasis_message *, msg, NULL, ao2_cleanup); - - if (!(guarantee = ao2_alloc(sizeof(*guarantee), caching_guarantee_dtor))) { - return NULL; - } - - ast_mutex_init(&guarantee->lock); - ast_cond_init(&guarantee->cond, NULL); - - if (!(msg = stasis_message_create(cache_guarantee_type(), guarantee))) { - return NULL; - } - - ao2_ref(msg, +1); - return msg; -} - -struct stasis_message *stasis_cache_get_extended(struct stasis_caching_topic *caching_topic, struct stasis_message_type *type, const char *id, unsigned int guaranteed) +struct stasis_message *stasis_cache_get(struct stasis_cache *cache, struct stasis_message_type *type, const char *id) { RAII_VAR(struct cache_entry *, search_entry, NULL, ao2_cleanup); RAII_VAR(struct cache_entry *, cached_entry, NULL, ao2_cleanup); - ast_assert(caching_topic->cache != NULL); - - if (guaranteed) { - RAII_VAR(struct stasis_message *, msg, caching_guarantee_create(), ao2_cleanup); - struct caching_guarantee *guarantee = stasis_message_data(msg); - - ast_mutex_lock(&guarantee->lock); - stasis_publish(caching_topic->original_topic, msg); - while (!guarantee->done) { - ast_cond_wait(&guarantee->cond, &guarantee->lock); - } - ast_mutex_unlock(&guarantee->lock); - } + ast_assert(cache->entries != NULL); search_entry = cache_entry_create(type, id, NULL); if (search_entry == NULL) { return NULL; } - cached_entry = ao2_find(caching_topic->cache, search_entry, OBJ_POINTER); + cached_entry = ao2_find(cache->entries, search_entry, OBJ_POINTER); if (cached_entry == NULL) { return NULL; } @@ -308,25 +297,25 @@ static int cache_dump_cb(void *obj, void *arg, int flags) return 0; } -struct ao2_container *stasis_cache_dump(struct stasis_caching_topic *caching_topic, struct stasis_message_type *type) +struct ao2_container *stasis_cache_dump(struct stasis_cache *cache, struct stasis_message_type *type) { struct cache_dump_data cache_dump; - ast_assert(caching_topic->cache != NULL); + ast_assert(cache->entries != NULL); cache_dump.type = type; - cache_dump.cached = ao2_container_alloc(1, NULL, NULL); + cache_dump.cached = ao2_container_alloc_options( + AO2_ALLOC_OPT_LOCK_NOLOCK, 1, NULL, NULL); if (!cache_dump.cached) { return NULL; } - ao2_callback(caching_topic->cache, OBJ_MULTIPLE | OBJ_NODATA, cache_dump_cb, &cache_dump); + ao2_callback(cache->entries, OBJ_MULTIPLE | OBJ_NODATA, cache_dump_cb, &cache_dump); return cache_dump.cached; } STASIS_MESSAGE_TYPE_DEFN(stasis_cache_clear_type); STASIS_MESSAGE_TYPE_DEFN(stasis_cache_update_type); -STASIS_MESSAGE_TYPE_DEFN(cache_guarantee_type); struct stasis_message *stasis_cache_clear_create(struct stasis_message *id_message) { @@ -362,7 +351,8 @@ static struct stasis_message *update_create(struct stasis_topic *topic, struct s ast_assert(topic != NULL); ast_assert(old_snapshot != NULL || new_snapshot != NULL); - update = ao2_alloc(sizeof(*update), stasis_cache_update_dtor); + update = ao2_alloc_options(sizeof(*update), stasis_cache_update_dtor, + AO2_ALLOC_OPT_LOCK_NOLOCK); if (!update) { return NULL; } @@ -393,7 +383,8 @@ static struct stasis_message *update_create(struct stasis_topic *topic, struct s return msg; } -static void caching_topic_exec(void *data, struct stasis_subscription *sub, struct stasis_topic *topic, struct stasis_message *message) +static void caching_topic_exec(void *data, struct stasis_subscription *sub, + struct stasis_topic *topic, struct stasis_message *message) { RAII_VAR(struct stasis_caching_topic *, caching_topic_needs_unref, NULL, ao2_cleanup); struct stasis_caching_topic *caching_topic = data; @@ -401,36 +392,25 @@ static void caching_topic_exec(void *data, struct stasis_subscription *sub, stru ast_assert(caching_topic != NULL); ast_assert(caching_topic->topic != NULL); - ast_assert(caching_topic->id_fn != NULL); + ast_assert(caching_topic->cache != NULL); + ast_assert(caching_topic->cache->id_fn != NULL); if (stasis_subscription_final_message(sub, message)) { caching_topic_needs_unref = caching_topic; } - /* Handle cache guarantee event */ - if (cache_guarantee_type() == stasis_message_type(message)) { - struct caching_guarantee *guarantee = stasis_message_data(message); - - ast_mutex_lock(&guarantee->lock); - guarantee->done = 1; - ast_cond_signal(&guarantee->cond); - ast_mutex_unlock(&guarantee->lock); - - return; - } - /* Handle cache clear event */ if (stasis_cache_clear_type() == stasis_message_type(message)) { RAII_VAR(struct stasis_message *, old_snapshot, NULL, ao2_cleanup); RAII_VAR(struct stasis_message *, update, NULL, ao2_cleanup); struct stasis_message *clear_msg = stasis_message_data(message); - const char *clear_id = caching_topic->id_fn(clear_msg); + const char *clear_id = caching_topic->cache->id_fn(clear_msg); struct stasis_message_type *clear_type = stasis_message_type(clear_msg); ast_assert(clear_type != NULL); if (clear_id) { - old_snapshot = cache_put(caching_topic, clear_type, clear_id, NULL); + old_snapshot = cache_put(caching_topic->cache, clear_type, clear_id, NULL); if (old_snapshot) { update = update_create(topic, old_snapshot, NULL); stasis_publish(caching_topic->topic, update); @@ -444,7 +424,7 @@ static void caching_topic_exec(void *data, struct stasis_subscription *sub, stru } } - id = caching_topic->id_fn(message); + id = caching_topic->cache->id_fn(message); if (id == NULL) { /* Object isn't cached; forward */ stasis_forward_message(caching_topic->topic, topic, message); @@ -453,7 +433,7 @@ static void caching_topic_exec(void *data, struct stasis_subscription *sub, stru RAII_VAR(struct stasis_message *, old_snapshot, NULL, ao2_cleanup); RAII_VAR(struct stasis_message *, update, NULL, ao2_cleanup); - old_snapshot = cache_put(caching_topic, stasis_message_type(message), id, message); + old_snapshot = cache_put(caching_topic->cache, stasis_message_type(message), id, message); update = update_create(topic, old_snapshot, message); if (update == NULL) { @@ -464,7 +444,7 @@ static void caching_topic_exec(void *data, struct stasis_subscription *sub, stru } } -struct stasis_caching_topic *stasis_caching_topic_create(struct stasis_topic *original_topic, snapshot_get_id id_fn) +struct stasis_caching_topic *stasis_caching_topic_create(struct stasis_topic *original_topic, struct stasis_cache *cache) { RAII_VAR(struct stasis_caching_topic *, caching_topic, NULL, ao2_cleanup); struct stasis_subscription *sub; @@ -476,23 +456,19 @@ struct stasis_caching_topic *stasis_caching_topic_create(struct stasis_topic *or return NULL; } - caching_topic = ao2_alloc(sizeof(*caching_topic), stasis_caching_topic_dtor); + caching_topic = ao2_alloc_options(sizeof(*caching_topic), + stasis_caching_topic_dtor, AO2_ALLOC_OPT_LOCK_NOLOCK); if (caching_topic == NULL) { return NULL; } - caching_topic->cache = ao2_container_alloc(NUM_CACHE_BUCKETS, cache_entry_hash, cache_entry_cmp); - if (!caching_topic->cache) { - ast_log(LOG_ERROR, "Stasis cache allocation failed\n"); - return NULL; - } - caching_topic->topic = stasis_topic_create(new_name); if (caching_topic->topic == NULL) { return NULL; } - caching_topic->id_fn = id_fn; + ao2_ref(cache, +1); + caching_topic->cache = cache; sub = internal_stasis_subscribe(original_topic, caching_topic_exec, caching_topic, 0); if (sub == NULL) { @@ -514,7 +490,6 @@ static void stasis_cache_cleanup(void) { STASIS_MESSAGE_TYPE_CLEANUP(stasis_cache_clear_type); STASIS_MESSAGE_TYPE_CLEANUP(stasis_cache_update_type); - STASIS_MESSAGE_TYPE_CLEANUP(cache_guarantee_type); } int stasis_cache_init(void) @@ -529,10 +504,6 @@ int stasis_cache_init(void) return -1; } - if (STASIS_MESSAGE_TYPE_INIT(cache_guarantee_type) != 0) { - return -1; - } - return 0; } diff --git a/main/stasis_cache_pattern.c b/main/stasis_cache_pattern.c new file mode 100644 index 000000000..18ae8617e --- /dev/null +++ b/main/stasis_cache_pattern.c @@ -0,0 +1,189 @@ +/* + * Asterisk -- An open source telephony toolkit. + * + * Copyright (C) 2013, Digium, Inc. + * + * David M. Lee, II <dlee@digium.com> + * + * See http://www.asterisk.org for more information about + * the Asterisk project. Please do not directly contact + * any of the maintainers of this project for assistance; + * the project provides a web site, mailing lists and IRC + * channels for your use. + * + * This program is free software, distributed under the terms of + * the GNU General Public License Version 2. See the LICENSE file + * at the top of the source tree. + */ + +/*! \file + * + * \brief Typical cache pattern for Stasis topics. + * + * \author David M. Lee, II <dlee@digium.com> + */ + +/*** MODULEINFO + <support_level>core</support_level> + ***/ + +#include "asterisk.h" + +ASTERISK_FILE_VERSION(__FILE__, "$Revision$") + +#include "asterisk/astobj2.h" +#include "asterisk/stasis_cache_pattern.h" + +struct stasis_cp_all { + struct stasis_topic *topic; + struct stasis_topic *topic_cached; + struct stasis_cache *cache; +}; + +struct stasis_cp_single { + struct stasis_topic *topic; + struct stasis_caching_topic *topic_cached; + + struct stasis_subscription *forward; + struct stasis_subscription *forward_cached; +}; + +static void all_dtor(void *obj) +{ + struct stasis_cp_all *all = obj; + + ao2_cleanup(all->topic); + ao2_cleanup(all->topic_cached); + ao2_cleanup(all->cache); +} + +struct stasis_cp_all *stasis_cp_all_create(const char *name, + snapshot_get_id id_fn) +{ + RAII_VAR(char *, cached_name, NULL, ast_free); + RAII_VAR(struct stasis_cp_all *, all, NULL, ao2_cleanup); + + all = ao2_alloc(sizeof(*all), all_dtor); + if (!all) { + return NULL; + } + + ast_asprintf(&cached_name, "%s-cached", name); + if (!cached_name) { + return NULL; + } + + all->topic = stasis_topic_create(name); + all->topic_cached = stasis_topic_create(cached_name); + all->cache = stasis_cache_create(id_fn); + + if (!all->topic || !all->topic_cached || !all->cache) { + return NULL; + } + + ao2_ref(all, +1); + return all; +} + +struct stasis_topic *stasis_cp_all_topic(struct stasis_cp_all *all) +{ + if (!all) { + return NULL; + } + return all->topic; +} + +struct stasis_topic *stasis_cp_all_topic_cached( + struct stasis_cp_all *all) +{ + if (!all) { + return NULL; + } + return all->topic_cached; +} + +struct stasis_cache *stasis_cp_all_cache(struct stasis_cp_all *all) +{ + if (!all) { + return NULL; + } + return all->cache; +} + +static void one_dtor(void *obj) +{ + struct stasis_cp_single *one = obj; + + /* Should already be unsubscribed */ + ast_assert(one->topic_cached == NULL); + ast_assert(one->forward == NULL); + ast_assert(one->forward_cached == NULL); + + ao2_cleanup(one->topic); + one->topic = NULL; +} + +struct stasis_cp_single *stasis_cp_single_create(struct stasis_cp_all *all, + const char *name) +{ + RAII_VAR(struct stasis_cp_single *, one, NULL, ao2_cleanup); + + one = ao2_alloc(sizeof(*one), one_dtor); + if (!one) { + return NULL; + } + + one->topic = stasis_topic_create(name); + if (!one->topic) { + return NULL; + } + one->topic_cached = stasis_caching_topic_create(one->topic, all->cache); + if (!one->topic_cached) { + return NULL; + } + + one->forward = stasis_forward_all(one->topic, all->topic); + if (!one->forward) { + return NULL; + } + one->forward_cached = stasis_forward_all( + stasis_caching_get_topic(one->topic_cached), all->topic_cached); + if (!one->forward_cached) { + return NULL; + } + + ao2_ref(one, +1); + return one; +} + +void stasis_cp_single_unsubscribe(struct stasis_cp_single *one) +{ + if (!one) { + return; + } + + stasis_caching_unsubscribe(one->topic_cached); + one->topic_cached = NULL; + stasis_unsubscribe(one->forward); + one->forward = NULL; + stasis_unsubscribe(one->forward_cached); + one->forward_cached = NULL; +} + +struct stasis_topic *stasis_cp_single_topic(struct stasis_cp_single *one) +{ + if (!one) { + return NULL; + } + return one->topic; +} + +struct stasis_topic *stasis_cp_single_topic_cached( + struct stasis_cp_single *one) +{ + if (!one) { + return NULL; + } + return stasis_caching_get_topic(one->topic_cached); +} + diff --git a/main/stasis_channels.c b/main/stasis_channels.c index c71bbbd14..6729a1072 100644 --- a/main/stasis_channels.c +++ b/main/stasis_channels.c @@ -38,6 +38,7 @@ ASTERISK_FILE_VERSION(__FILE__, "$Revision$") #include "asterisk/bridge.h" #include "asterisk/translate.h" #include "asterisk/stasis.h" +#include "asterisk/stasis_cache_pattern.h" #include "asterisk/stasis_channels.h" /*** DOCUMENTATION @@ -88,23 +89,33 @@ ASTERISK_FILE_VERSION(__FILE__, "$Revision$") #define NUM_MULTI_CHANNEL_BLOB_BUCKETS 7 -/*! \brief Topic for all channels */ -struct stasis_topic *channel_topic_all; +static struct stasis_cp_all *channel_cache_all; +static struct stasis_cache *channel_cache_by_name; +static struct stasis_caching_topic *channel_by_name_topic; -/*! \brief Caching topic for all channels */ -struct stasis_caching_topic *channel_topic_all_cached; +struct stasis_cp_all *ast_channel_cache_all(void) +{ + return channel_cache_all; +} -/*! \brief Caching topic for all channels indexed by name */ -struct stasis_caching_topic *channel_topic_all_cached_by_name; +struct stasis_cache *ast_channel_cache(void) +{ + return stasis_cp_all_cache(channel_cache_all); +} struct stasis_topic *ast_channel_topic_all(void) { - return channel_topic_all; + return stasis_cp_all_topic(channel_cache_all); +} + +struct stasis_topic *ast_channel_topic_all_cached(void) +{ + return stasis_cp_all_topic_cached(channel_cache_all); } -struct stasis_caching_topic *ast_channel_topic_all_cached(void) +struct stasis_cache *ast_channel_cache_by_name(void) { - return channel_topic_all_cached; + return channel_cache_by_name; } static const char *channel_snapshot_get_id(struct stasis_message *message) @@ -117,11 +128,6 @@ static const char *channel_snapshot_get_id(struct stasis_message *message) return snapshot->uniqueid; } -struct stasis_caching_topic *ast_channel_topic_all_cached_by_name(void) -{ - return channel_topic_all_cached_by_name; -} - static const char *channel_snapshot_get_name(struct stasis_message *message) { struct ast_channel_snapshot *snapshot; @@ -461,7 +467,7 @@ struct ast_channel_snapshot *ast_channel_snapshot_get_latest(const char *uniquei ast_assert(!ast_strlen_zero(uniqueid)); - message = stasis_cache_get(ast_channel_topic_all_cached(), + message = stasis_cache_get(ast_channel_cache(), ast_channel_snapshot_type(), uniqueid); if (!message) { @@ -483,7 +489,7 @@ struct ast_channel_snapshot *ast_channel_snapshot_get_latest_by_name(const char ast_assert(!ast_strlen_zero(name)); - message = stasis_cache_get(ast_channel_topic_all_cached_by_name(), + message = stasis_cache_get(ast_channel_cache_by_name(), ast_channel_snapshot_type(), name); if (!message) { @@ -906,10 +912,6 @@ STASIS_MESSAGE_TYPE_DEFN(ast_channel_agent_logoff_type, static void stasis_channels_cleanup(void) { - channel_topic_all_cached = stasis_caching_unsubscribe_and_join(channel_topic_all_cached); - channel_topic_all_cached_by_name = stasis_caching_unsubscribe_and_join(channel_topic_all_cached_by_name); - ao2_cleanup(channel_topic_all); - channel_topic_all = NULL; STASIS_MESSAGE_TYPE_CLEANUP(ast_channel_snapshot_type); STASIS_MESSAGE_TYPE_CLEANUP(ast_channel_dial_type); STASIS_MESSAGE_TYPE_CLEANUP(ast_channel_varset_type); @@ -929,33 +931,58 @@ static void stasis_channels_cleanup(void) STASIS_MESSAGE_TYPE_CLEANUP(ast_channel_monitor_stop_type); STASIS_MESSAGE_TYPE_CLEANUP(ast_channel_agent_login_type); STASIS_MESSAGE_TYPE_CLEANUP(ast_channel_agent_logoff_type); + + stasis_caching_unsubscribe_and_join(channel_by_name_topic); + channel_by_name_topic = NULL; + ao2_cleanup(channel_cache_by_name); + channel_cache_by_name = NULL; + ao2_cleanup(channel_cache_all); + channel_cache_all = NULL; } -void ast_stasis_channels_init(void) +int ast_stasis_channels_init(void) { + int res = 0; + ast_register_cleanup(stasis_channels_cleanup); - STASIS_MESSAGE_TYPE_INIT(ast_channel_snapshot_type); - STASIS_MESSAGE_TYPE_INIT(ast_channel_dial_type); - STASIS_MESSAGE_TYPE_INIT(ast_channel_varset_type); - STASIS_MESSAGE_TYPE_INIT(ast_channel_user_event_type); - STASIS_MESSAGE_TYPE_INIT(ast_channel_hangup_request_type); - STASIS_MESSAGE_TYPE_INIT(ast_channel_dtmf_begin_type); - STASIS_MESSAGE_TYPE_INIT(ast_channel_dtmf_end_type); - STASIS_MESSAGE_TYPE_INIT(ast_channel_hold_type); - STASIS_MESSAGE_TYPE_INIT(ast_channel_unhold_type); - STASIS_MESSAGE_TYPE_INIT(ast_channel_chanspy_start_type); - STASIS_MESSAGE_TYPE_INIT(ast_channel_chanspy_stop_type); - STASIS_MESSAGE_TYPE_INIT(ast_channel_fax_type); - STASIS_MESSAGE_TYPE_INIT(ast_channel_hangup_handler_type); - STASIS_MESSAGE_TYPE_INIT(ast_channel_moh_start_type); - STASIS_MESSAGE_TYPE_INIT(ast_channel_moh_stop_type); - STASIS_MESSAGE_TYPE_INIT(ast_channel_monitor_start_type); - STASIS_MESSAGE_TYPE_INIT(ast_channel_monitor_stop_type); + channel_cache_all = stasis_cp_all_create("ast_channel_topic_all", + channel_snapshot_get_id); + if (!channel_cache_all) { + return -1; + } STASIS_MESSAGE_TYPE_INIT(ast_channel_agent_login_type); STASIS_MESSAGE_TYPE_INIT(ast_channel_agent_logoff_type); - channel_topic_all = stasis_topic_create("ast_channel_topic_all"); - channel_topic_all_cached = stasis_caching_topic_create(channel_topic_all, channel_snapshot_get_id); - channel_topic_all_cached_by_name = stasis_caching_topic_create(channel_topic_all, channel_snapshot_get_name); + channel_cache_by_name = stasis_cache_create(channel_snapshot_get_name); + if (!channel_cache_by_name) { + return -1; + } + + channel_by_name_topic = stasis_caching_topic_create( + stasis_cp_all_topic(channel_cache_all), + channel_cache_by_name); + if (!channel_by_name_topic) { + return -1; + } + + res |= STASIS_MESSAGE_TYPE_INIT(ast_channel_snapshot_type); + res |= STASIS_MESSAGE_TYPE_INIT(ast_channel_dial_type); + res |= STASIS_MESSAGE_TYPE_INIT(ast_channel_varset_type); + res |= STASIS_MESSAGE_TYPE_INIT(ast_channel_user_event_type); + res |= STASIS_MESSAGE_TYPE_INIT(ast_channel_hangup_request_type); + res |= STASIS_MESSAGE_TYPE_INIT(ast_channel_dtmf_begin_type); + res |= STASIS_MESSAGE_TYPE_INIT(ast_channel_dtmf_end_type); + res |= STASIS_MESSAGE_TYPE_INIT(ast_channel_hold_type); + res |= STASIS_MESSAGE_TYPE_INIT(ast_channel_unhold_type); + res |= STASIS_MESSAGE_TYPE_INIT(ast_channel_chanspy_start_type); + res |= STASIS_MESSAGE_TYPE_INIT(ast_channel_chanspy_stop_type); + res |= STASIS_MESSAGE_TYPE_INIT(ast_channel_fax_type); + res |= STASIS_MESSAGE_TYPE_INIT(ast_channel_hangup_handler_type); + res |= STASIS_MESSAGE_TYPE_INIT(ast_channel_moh_start_type); + res |= STASIS_MESSAGE_TYPE_INIT(ast_channel_moh_stop_type); + res |= STASIS_MESSAGE_TYPE_INIT(ast_channel_monitor_start_type); + res |= STASIS_MESSAGE_TYPE_INIT(ast_channel_monitor_stop_type); + + return res; } diff --git a/main/stasis_endpoints.c b/main/stasis_endpoints.c index a6756182c..831b4aee0 100644 --- a/main/stasis_endpoints.c +++ b/main/stasis_endpoints.c @@ -73,6 +73,28 @@ ASTERISK_FILE_VERSION(__FILE__, "$Revision$") </managerEvent> ***/ +static struct stasis_cp_all *endpoint_cache_all; + +struct stasis_cp_all *ast_endpoint_cache_all(void) +{ + return endpoint_cache_all; +} + +struct stasis_cache *ast_endpoint_cache(void) +{ + return stasis_cp_all_cache(endpoint_cache_all); +} + +struct stasis_topic *ast_endpoint_topic_all(void) +{ + return stasis_cp_all_topic(endpoint_cache_all); +} + +struct stasis_topic *ast_endpoint_topic_all_cached(void) +{ + return stasis_cp_all_topic_cached(endpoint_cache_all); +} + static struct ast_manager_event_blob *peerstatus_to_ami(struct stasis_message *msg); STASIS_MESSAGE_TYPE_DEFN(ast_endpoint_snapshot_type); @@ -80,10 +102,6 @@ STASIS_MESSAGE_TYPE_DEFN(ast_endpoint_state_type, .to_ami = peerstatus_to_ami, ); -static struct stasis_topic *endpoint_topic_all; - -static struct stasis_caching_topic *endpoint_topic_all_cached; - static struct ast_manager_event_blob *peerstatus_to_ami(struct stasis_message *msg) { struct ast_endpoint_blob *obj = stasis_message_data(msg); @@ -168,16 +186,6 @@ void ast_endpoint_blob_publish(struct ast_endpoint *endpoint, struct stasis_mess } } -struct stasis_topic *ast_endpoint_topic_all(void) -{ - return endpoint_topic_all; -} - -struct stasis_caching_topic *ast_endpoint_topic_all_cached(void) -{ - return endpoint_topic_all_cached; -} - struct ast_endpoint_snapshot *ast_endpoint_latest_snapshot(const char *tech, const char *name, unsigned int guaranteed) { @@ -190,8 +198,12 @@ struct ast_endpoint_snapshot *ast_endpoint_latest_snapshot(const char *tech, return NULL; } - msg = stasis_cache_get_extended(ast_endpoint_topic_all_cached(), - ast_endpoint_snapshot_type(), id, guaranteed); + if (guaranteed) { + stasis_topic_wait(ast_endpoint_topic_all_cached()); + } + + msg = stasis_cache_get(ast_endpoint_cache(), + ast_endpoint_snapshot_type(), id); if (!msg) { return NULL; } @@ -267,44 +279,28 @@ struct ast_json *ast_endpoint_snapshot_to_json( return ast_json_ref(json); } -static void endpoints_stasis_shutdown(void) +static void endpoints_stasis_cleanup(void) { - stasis_caching_unsubscribe_and_join(endpoint_topic_all_cached); - endpoint_topic_all_cached = NULL; + STASIS_MESSAGE_TYPE_CLEANUP(ast_endpoint_snapshot_type); + STASIS_MESSAGE_TYPE_CLEANUP(ast_endpoint_state_type); - ao2_cleanup(endpoint_topic_all); - endpoint_topic_all = NULL; + ao2_cleanup(endpoint_cache_all); + endpoint_cache_all = NULL; } int ast_endpoint_stasis_init(void) { - ast_register_atexit(endpoints_stasis_shutdown); - - if (STASIS_MESSAGE_TYPE_INIT(ast_endpoint_snapshot_type) != 0) { - return -1; - } + int res = 0; + ast_register_cleanup(endpoints_stasis_cleanup); - if (!endpoint_topic_all) { - endpoint_topic_all = stasis_topic_create("endpoint_topic_all"); - } - - if (!endpoint_topic_all) { + endpoint_cache_all = stasis_cp_all_create("endpoint_topic_all", + endpoint_snapshot_get_id); + if (!endpoint_cache_all) { return -1; } - if (!endpoint_topic_all_cached) { - endpoint_topic_all_cached = - stasis_caching_topic_create( - endpoint_topic_all, endpoint_snapshot_get_id); - } - - if (!endpoint_topic_all_cached) { - return -1; - } - - if (STASIS_MESSAGE_TYPE_INIT(ast_endpoint_state_type) != 0) { - return -1; - } + res |= STASIS_MESSAGE_TYPE_INIT(ast_endpoint_snapshot_type); + res |= STASIS_MESSAGE_TYPE_INIT(ast_endpoint_state_type); - return 0; + return res; } diff --git a/main/stasis_wait.c b/main/stasis_wait.c new file mode 100644 index 000000000..e94c686e1 --- /dev/null +++ b/main/stasis_wait.c @@ -0,0 +1,133 @@ +/* + * Asterisk -- An open source telephony toolkit. + * + * Copyright (C) 2013, Digium, Inc. + * + * Joshua Colp <jcolp@digium.com> + * + * See http://www.asterisk.org for more information about + * the Asterisk project. Please do not directly contact + * any of the maintainers of this project for assistance; + * the project provides a web site, mailing lists and IRC + * channels for your use. + * + * This program is free software, distributed under the terms of + * the GNU General Public License Version 2. See the LICENSE file + * at the top of the source tree. + */ + +/*! \file + * + * \brief Wait support for Stasis topics. + * + * \author Joshua Colp <jcolp@digium.com> + */ + +/*** MODULEINFO + <support_level>core</support_level> + ***/ + +#include "asterisk.h" + +ASTERISK_FILE_VERSION(__FILE__, "$Revision$") + +#include "asterisk/astobj2.h" +#include "asterisk/stasis.h" + +static struct stasis_message_type *cache_guarantee_type(void); +STASIS_MESSAGE_TYPE_DEFN(cache_guarantee_type); + +/*! \internal */ +struct caching_guarantee { + ast_mutex_t lock; + ast_cond_t cond; + unsigned int done:1; +}; + +static void caching_guarantee_dtor(void *obj) +{ + struct caching_guarantee *guarantee = obj; + + ast_assert(guarantee->done == 1); + + ast_mutex_destroy(&guarantee->lock); + ast_cond_destroy(&guarantee->cond); +} + +static void guarantee_handler(void *data, struct stasis_subscription *sub, + struct stasis_topic *topic, struct stasis_message *message) +{ + /* Wait for our particular message */ + if (data == message) { + struct caching_guarantee *guarantee; + ast_assert(cache_guarantee_type() == stasis_message_type(message)); + guarantee = stasis_message_data(message); + + ast_mutex_lock(&guarantee->lock); + guarantee->done = 1; + ast_cond_signal(&guarantee->cond); + ast_mutex_unlock(&guarantee->lock); + } +} + +static struct stasis_message *caching_guarantee_create(void) +{ + RAII_VAR(struct caching_guarantee *, guarantee, NULL, ao2_cleanup); + RAII_VAR(struct stasis_message *, msg, NULL, ao2_cleanup); + + if (!(guarantee = ao2_alloc(sizeof(*guarantee), caching_guarantee_dtor))) { + return NULL; + } + + ast_mutex_init(&guarantee->lock); + ast_cond_init(&guarantee->cond, NULL); + + if (!(msg = stasis_message_create(cache_guarantee_type(), guarantee))) { + return NULL; + } + + ao2_ref(msg, +1); + return msg; +} + +int stasis_topic_wait(struct stasis_topic *topic) +{ + RAII_VAR(struct stasis_message *, msg, NULL, ao2_cleanup); + RAII_VAR(struct stasis_subscription *, sub, NULL, stasis_unsubscribe); + struct caching_guarantee *guarantee; + + msg = caching_guarantee_create(); + if (!msg) { + return -1; + } + + sub = stasis_subscribe(topic, guarantee_handler, msg); + if (!sub) { + return -1; + } + + guarantee = stasis_message_data(msg); + + ast_mutex_lock(&guarantee->lock); + stasis_publish(topic, msg); + while (!guarantee->done) { + ast_cond_wait(&guarantee->cond, &guarantee->lock); + } + ast_mutex_unlock(&guarantee->lock); + return 0; +} + +static void wait_cleanup(void) +{ + STASIS_MESSAGE_TYPE_CLEANUP(cache_guarantee_type); +} + +int stasis_wait_init(void) +{ + ast_register_cleanup(wait_cleanup); + + if (STASIS_MESSAGE_TYPE_INIT(cache_guarantee_type) != 0) { + return -1; + } + return 0; +} |