diff options
author | Kinsey Moore <kmoore@digium.com> | 2013-03-16 15:45:58 +0000 |
---|---|---|
committer | Kinsey Moore <kmoore@digium.com> | 2013-03-16 15:45:58 +0000 |
commit | 99aa02d17f7f1815f9a2abf75282f815a975cd67 (patch) | |
tree | 5ebb9cf3ddecd7153afb9e30f767d170c44b2142 /main | |
parent | 5d45596f6257b86189bef2dfaf5d9cc0b001fa46 (diff) |
Transition MWI to Stasis-core
Remove MWI's dependency on the event system by moving it to
Stasis-core. This also introduces forwarding topic pools in Stasis-core
which aggregate many dynamically allocated topics into a single primary
topic.
Review: https://reviewboard.asterisk.org/r/2368/
(closes issue ASTERISK-21097)
Patch-by: Kinsey Moore
git-svn-id: https://origsvn.digium.com/svn/asterisk/trunk@383284 65c4cc65-6c06-0410-ace0-fbb531ad65f3
Diffstat (limited to 'main')
-rw-r--r-- | main/app.c | 129 | ||||
-rw-r--r-- | main/asterisk.c | 5 | ||||
-rw-r--r-- | main/channel.c | 3 | ||||
-rw-r--r-- | main/stasis.c | 93 |
4 files changed, 228 insertions, 2 deletions
diff --git a/main/app.c b/main/app.c index 6db65f371..dca74849f 100644 --- a/main/app.c +++ b/main/app.c @@ -66,6 +66,10 @@ ASTERISK_FILE_VERSION(__FILE__, "$Revision$") #include "asterisk/threadstorage.h" #include "asterisk/test.h" #include "asterisk/module.h" +#include "asterisk/astobj2.h" +#include "asterisk/stasis.h" + +#define MWI_TOPIC_BUCKETS 57 AST_THREADSTORAGE_PUBLIC(ast_str_thread_global_buf); @@ -78,6 +82,11 @@ struct zombie { static AST_LIST_HEAD_STATIC(zombies, zombie); +static struct stasis_topic *mwi_topic_all; +static struct stasis_caching_topic *mwi_topic_cached; +static struct stasis_message_type *mwi_message_type; +static struct stasis_topic_pool *mwi_topic_pool; + static void *shaun_of_the_dead(void *data) { struct zombie *cur; @@ -2632,3 +2641,123 @@ int ast_app_parse_timelen(const char *timestr, int *result, enum ast_timelen uni return 0; } + + +static void mwi_state_dtor(void *obj) +{ + struct stasis_mwi_state *mwi_state = obj; + ast_string_field_free_memory(mwi_state); +} + +struct stasis_topic *stasis_mwi_topic_all(void) +{ + return mwi_topic_all; +} + +struct stasis_caching_topic *stasis_mwi_topic_cached(void) +{ + return mwi_topic_cached; +} + +struct stasis_message_type *stasis_mwi_state_message(void) +{ + return mwi_message_type; +} + +struct stasis_topic *stasis_mwi_topic(const char *uniqueid) +{ + return stasis_topic_pool_get_topic(mwi_topic_pool, uniqueid); +} + +int stasis_publish_mwi_state_full( + const char *mailbox, + const char *context, + int new_msgs, + int old_msgs, + struct ast_eid *eid) +{ + RAII_VAR(struct stasis_mwi_state *, mwi_state, NULL, ao2_cleanup); + RAII_VAR(struct stasis_message *, message, NULL, ao2_cleanup); + struct ast_str *uniqueid = ast_str_alloca(AST_MAX_MAILBOX_UNIQUEID); + struct stasis_topic *mailbox_specific_topic; + + ast_assert(!ast_strlen_zero(mailbox)); + ast_assert(!ast_strlen_zero(context)); + + ast_str_set(&uniqueid, 0, "%s@%s", mailbox, context); + + mwi_state = ao2_alloc(sizeof(*mwi_state), mwi_state_dtor); + if (ast_string_field_init(mwi_state, 256)) { + return -1; + } + + ast_string_field_set(mwi_state, uniqueid, ast_str_buffer(uniqueid)); + ast_string_field_set(mwi_state, mailbox, mailbox); + ast_string_field_set(mwi_state, context, context); + mwi_state->new_msgs = new_msgs; + mwi_state->old_msgs = old_msgs; + if (eid) { + mwi_state->eid = *eid; + } else { + ast_set_default_eid(&mwi_state->eid); + } + + message = stasis_message_create(stasis_mwi_state_message(), mwi_state); + + mailbox_specific_topic = stasis_mwi_topic(ast_str_buffer(uniqueid)); + if (!mailbox_specific_topic) { + return -1; + } + + stasis_publish(mailbox_specific_topic, message); + + return 0; +} + +static const char *mwi_state_get_id(struct stasis_message *message) +{ + if (stasis_mwi_state_message() == stasis_message_type(message)) { + struct stasis_mwi_state *mwi_state = stasis_message_data(message); + return mwi_state->uniqueid; + } else if (stasis_subscription_change() == stasis_message_type(message)) { + struct stasis_subscription_change *change = stasis_message_data(message); + return change->uniqueid; + } + + return NULL; +} + +static void app_exit(void) +{ + ao2_cleanup(mwi_topic_all); + mwi_topic_all = NULL; + mwi_topic_cached = stasis_caching_unsubscribe(mwi_topic_cached); + ao2_cleanup(mwi_message_type); + mwi_message_type = NULL; + ao2_cleanup(mwi_topic_pool); + mwi_topic_pool = NULL; +} + +int app_init(void) +{ + mwi_topic_all = stasis_topic_create("stasis_mwi_topic"); + if (!mwi_topic_all) { + return -1; + } + mwi_topic_cached = stasis_caching_topic_create(mwi_topic_all, mwi_state_get_id); + if (!mwi_topic_cached) { + return -1; + } + mwi_message_type = stasis_message_type_create("stasis_mwi_state"); + if (!mwi_message_type) { + return -1; + } + mwi_topic_pool = stasis_topic_pool_create(mwi_topic_all); + if (!mwi_topic_pool) { + return -1; + } + + ast_register_atexit(app_exit); + return 0; +} + diff --git a/main/asterisk.c b/main/asterisk.c index 4e5e58c05..3a0e87c41 100644 --- a/main/asterisk.c +++ b/main/asterisk.c @@ -4178,6 +4178,11 @@ int main(int argc, char *argv[]) aco_init(); + if (app_init()) { + printf("App core initialization failed.\n%s", term_quit()); + exit(1); + } + if (astdb_init()) { printf("%s", term_quit()); exit(1); diff --git a/main/channel.c b/main/channel.c index 3289edaa4..3f8319b34 100644 --- a/main/channel.c +++ b/main/channel.c @@ -8637,8 +8637,7 @@ static void channels_shutdown(void) __channel_varset = NULL; ao2_cleanup(__channel_topic_all); __channel_topic_all = NULL; - stasis_caching_unsubscribe(__channel_topic_all_cached); - __channel_topic_all_cached = NULL; + __channel_topic_all_cached = stasis_caching_unsubscribe(__channel_topic_all_cached); ast_data_unregister(NULL); ast_cli_unregister_multiple(cli_channel, ARRAY_LEN(cli_channel)); if (channels) { diff --git a/main/stasis.c b/main/stasis.c index a4d44b819..2ad0caf93 100644 --- a/main/stasis.c +++ b/main/stasis.c @@ -41,6 +41,9 @@ ASTERISK_FILE_VERSION(__FILE__, "$Revision$") /*! Initial size of the subscribers list. */ #define INITIAL_SUBSCRIBERS_MAX 4 +/*! The number of buckets to use for topic pools */ +#define TOPIC_POOL_BUCKETS 57 + /*! Threadpool for dispatching notifications to subscribers */ static struct ast_threadpool *pool; @@ -470,6 +473,96 @@ static void send_subscription_change_message(struct stasis_topic *topic, char *u stasis_publish(topic, msg); } +struct topic_pool_entry { + struct stasis_subscription *forward; + struct stasis_topic *topic; +}; + +static void topic_pool_entry_dtor(void *obj) +{ + struct topic_pool_entry *entry = obj; + entry->forward = stasis_unsubscribe(entry->forward); + ao2_cleanup(entry->topic); + entry->topic = NULL; +} + +static struct topic_pool_entry *topic_pool_entry_alloc(void) +{ + return ao2_alloc(sizeof(struct topic_pool_entry), topic_pool_entry_dtor); +} + +struct stasis_topic_pool { + struct ao2_container *pool_container; + struct stasis_topic *pool_topic; +}; + +static void topic_pool_dtor(void *obj) +{ + struct stasis_topic_pool *pool = obj; + ao2_cleanup(pool->pool_container); + pool->pool_container = NULL; + ao2_cleanup(pool->pool_topic); + pool->pool_topic = NULL; +} + +static int topic_pool_entry_hash(const void *obj, const int flags) +{ + const char *topic_name= (flags & OBJ_KEY) ? obj : stasis_topic_name(((struct topic_pool_entry*) obj)->topic); + return ast_str_case_hash(topic_name); +} + +static int topic_pool_entry_cmp(void *obj, void *arg, int flags) +{ + struct topic_pool_entry *opt1 = obj, *opt2 = arg; + const char *topic_name = (flags & OBJ_KEY) ? arg : stasis_topic_name(opt2->topic); + return strcasecmp(stasis_topic_name(opt1->topic), topic_name) ? 0 : CMP_MATCH | CMP_STOP; +} + +struct stasis_topic_pool *stasis_topic_pool_create(struct stasis_topic *pooled_topic) +{ + RAII_VAR(struct stasis_topic_pool *, pool, ao2_alloc(sizeof(*pool), topic_pool_dtor), ao2_cleanup); + if (!pool) { + return NULL; + } + pool->pool_container = ao2_container_alloc(TOPIC_POOL_BUCKETS, topic_pool_entry_hash, topic_pool_entry_cmp); + ao2_ref(pooled_topic, +1); + pool->pool_topic = pooled_topic; + + ao2_ref(pool, +1); + return pool; +} + +struct stasis_topic *stasis_topic_pool_get_topic(struct stasis_topic_pool *pool, const char *topic_name) +{ + RAII_VAR(struct topic_pool_entry *, topic_pool_entry, NULL, ao2_cleanup); + SCOPED_AO2LOCK(topic_container_lock, pool->pool_container); + topic_pool_entry = ao2_find(pool->pool_container, topic_name, OBJ_KEY | OBJ_NOLOCK); + + if (topic_pool_entry) { + return topic_pool_entry->topic; + } + + topic_pool_entry = topic_pool_entry_alloc(); + + if (!topic_pool_entry) { + return NULL; + } + + topic_pool_entry->topic = stasis_topic_create(topic_name); + if (!topic_pool_entry->topic) { + return NULL; + } + + topic_pool_entry->forward = stasis_forward_all(topic_pool_entry->topic, pool->pool_topic); + if (!topic_pool_entry->forward) { + return NULL; + } + + ao2_link_flags(pool->pool_container, topic_pool_entry, OBJ_NOLOCK); + + return topic_pool_entry->topic; +} + /*! \brief Cleanup function */ static void stasis_exit(void) { |