summaryrefslogtreecommitdiff
path: root/main
diff options
context:
space:
mode:
authorKinsey Moore <kmoore@digium.com>2013-03-16 15:45:58 +0000
committerKinsey Moore <kmoore@digium.com>2013-03-16 15:45:58 +0000
commit99aa02d17f7f1815f9a2abf75282f815a975cd67 (patch)
tree5ebb9cf3ddecd7153afb9e30f767d170c44b2142 /main
parent5d45596f6257b86189bef2dfaf5d9cc0b001fa46 (diff)
Transition MWI to Stasis-core
Remove MWI's dependency on the event system by moving it to Stasis-core. This also introduces forwarding topic pools in Stasis-core which aggregate many dynamically allocated topics into a single primary topic. Review: https://reviewboard.asterisk.org/r/2368/ (closes issue ASTERISK-21097) Patch-by: Kinsey Moore git-svn-id: https://origsvn.digium.com/svn/asterisk/trunk@383284 65c4cc65-6c06-0410-ace0-fbb531ad65f3
Diffstat (limited to 'main')
-rw-r--r--main/app.c129
-rw-r--r--main/asterisk.c5
-rw-r--r--main/channel.c3
-rw-r--r--main/stasis.c93
4 files changed, 228 insertions, 2 deletions
diff --git a/main/app.c b/main/app.c
index 6db65f371..dca74849f 100644
--- a/main/app.c
+++ b/main/app.c
@@ -66,6 +66,10 @@ ASTERISK_FILE_VERSION(__FILE__, "$Revision$")
#include "asterisk/threadstorage.h"
#include "asterisk/test.h"
#include "asterisk/module.h"
+#include "asterisk/astobj2.h"
+#include "asterisk/stasis.h"
+
+#define MWI_TOPIC_BUCKETS 57
AST_THREADSTORAGE_PUBLIC(ast_str_thread_global_buf);
@@ -78,6 +82,11 @@ struct zombie {
static AST_LIST_HEAD_STATIC(zombies, zombie);
+static struct stasis_topic *mwi_topic_all;
+static struct stasis_caching_topic *mwi_topic_cached;
+static struct stasis_message_type *mwi_message_type;
+static struct stasis_topic_pool *mwi_topic_pool;
+
static void *shaun_of_the_dead(void *data)
{
struct zombie *cur;
@@ -2632,3 +2641,123 @@ int ast_app_parse_timelen(const char *timestr, int *result, enum ast_timelen uni
return 0;
}
+
+
+static void mwi_state_dtor(void *obj)
+{
+ struct stasis_mwi_state *mwi_state = obj;
+ ast_string_field_free_memory(mwi_state);
+}
+
+struct stasis_topic *stasis_mwi_topic_all(void)
+{
+ return mwi_topic_all;
+}
+
+struct stasis_caching_topic *stasis_mwi_topic_cached(void)
+{
+ return mwi_topic_cached;
+}
+
+struct stasis_message_type *stasis_mwi_state_message(void)
+{
+ return mwi_message_type;
+}
+
+struct stasis_topic *stasis_mwi_topic(const char *uniqueid)
+{
+ return stasis_topic_pool_get_topic(mwi_topic_pool, uniqueid);
+}
+
+int stasis_publish_mwi_state_full(
+ const char *mailbox,
+ const char *context,
+ int new_msgs,
+ int old_msgs,
+ struct ast_eid *eid)
+{
+ RAII_VAR(struct stasis_mwi_state *, mwi_state, NULL, ao2_cleanup);
+ RAII_VAR(struct stasis_message *, message, NULL, ao2_cleanup);
+ struct ast_str *uniqueid = ast_str_alloca(AST_MAX_MAILBOX_UNIQUEID);
+ struct stasis_topic *mailbox_specific_topic;
+
+ ast_assert(!ast_strlen_zero(mailbox));
+ ast_assert(!ast_strlen_zero(context));
+
+ ast_str_set(&uniqueid, 0, "%s@%s", mailbox, context);
+
+ mwi_state = ao2_alloc(sizeof(*mwi_state), mwi_state_dtor);
+ if (ast_string_field_init(mwi_state, 256)) {
+ return -1;
+ }
+
+ ast_string_field_set(mwi_state, uniqueid, ast_str_buffer(uniqueid));
+ ast_string_field_set(mwi_state, mailbox, mailbox);
+ ast_string_field_set(mwi_state, context, context);
+ mwi_state->new_msgs = new_msgs;
+ mwi_state->old_msgs = old_msgs;
+ if (eid) {
+ mwi_state->eid = *eid;
+ } else {
+ ast_set_default_eid(&mwi_state->eid);
+ }
+
+ message = stasis_message_create(stasis_mwi_state_message(), mwi_state);
+
+ mailbox_specific_topic = stasis_mwi_topic(ast_str_buffer(uniqueid));
+ if (!mailbox_specific_topic) {
+ return -1;
+ }
+
+ stasis_publish(mailbox_specific_topic, message);
+
+ return 0;
+}
+
+static const char *mwi_state_get_id(struct stasis_message *message)
+{
+ if (stasis_mwi_state_message() == stasis_message_type(message)) {
+ struct stasis_mwi_state *mwi_state = stasis_message_data(message);
+ return mwi_state->uniqueid;
+ } else if (stasis_subscription_change() == stasis_message_type(message)) {
+ struct stasis_subscription_change *change = stasis_message_data(message);
+ return change->uniqueid;
+ }
+
+ return NULL;
+}
+
+static void app_exit(void)
+{
+ ao2_cleanup(mwi_topic_all);
+ mwi_topic_all = NULL;
+ mwi_topic_cached = stasis_caching_unsubscribe(mwi_topic_cached);
+ ao2_cleanup(mwi_message_type);
+ mwi_message_type = NULL;
+ ao2_cleanup(mwi_topic_pool);
+ mwi_topic_pool = NULL;
+}
+
+int app_init(void)
+{
+ mwi_topic_all = stasis_topic_create("stasis_mwi_topic");
+ if (!mwi_topic_all) {
+ return -1;
+ }
+ mwi_topic_cached = stasis_caching_topic_create(mwi_topic_all, mwi_state_get_id);
+ if (!mwi_topic_cached) {
+ return -1;
+ }
+ mwi_message_type = stasis_message_type_create("stasis_mwi_state");
+ if (!mwi_message_type) {
+ return -1;
+ }
+ mwi_topic_pool = stasis_topic_pool_create(mwi_topic_all);
+ if (!mwi_topic_pool) {
+ return -1;
+ }
+
+ ast_register_atexit(app_exit);
+ return 0;
+}
+
diff --git a/main/asterisk.c b/main/asterisk.c
index 4e5e58c05..3a0e87c41 100644
--- a/main/asterisk.c
+++ b/main/asterisk.c
@@ -4178,6 +4178,11 @@ int main(int argc, char *argv[])
aco_init();
+ if (app_init()) {
+ printf("App core initialization failed.\n%s", term_quit());
+ exit(1);
+ }
+
if (astdb_init()) {
printf("%s", term_quit());
exit(1);
diff --git a/main/channel.c b/main/channel.c
index 3289edaa4..3f8319b34 100644
--- a/main/channel.c
+++ b/main/channel.c
@@ -8637,8 +8637,7 @@ static void channels_shutdown(void)
__channel_varset = NULL;
ao2_cleanup(__channel_topic_all);
__channel_topic_all = NULL;
- stasis_caching_unsubscribe(__channel_topic_all_cached);
- __channel_topic_all_cached = NULL;
+ __channel_topic_all_cached = stasis_caching_unsubscribe(__channel_topic_all_cached);
ast_data_unregister(NULL);
ast_cli_unregister_multiple(cli_channel, ARRAY_LEN(cli_channel));
if (channels) {
diff --git a/main/stasis.c b/main/stasis.c
index a4d44b819..2ad0caf93 100644
--- a/main/stasis.c
+++ b/main/stasis.c
@@ -41,6 +41,9 @@ ASTERISK_FILE_VERSION(__FILE__, "$Revision$")
/*! Initial size of the subscribers list. */
#define INITIAL_SUBSCRIBERS_MAX 4
+/*! The number of buckets to use for topic pools */
+#define TOPIC_POOL_BUCKETS 57
+
/*! Threadpool for dispatching notifications to subscribers */
static struct ast_threadpool *pool;
@@ -470,6 +473,96 @@ static void send_subscription_change_message(struct stasis_topic *topic, char *u
stasis_publish(topic, msg);
}
+struct topic_pool_entry {
+ struct stasis_subscription *forward;
+ struct stasis_topic *topic;
+};
+
+static void topic_pool_entry_dtor(void *obj)
+{
+ struct topic_pool_entry *entry = obj;
+ entry->forward = stasis_unsubscribe(entry->forward);
+ ao2_cleanup(entry->topic);
+ entry->topic = NULL;
+}
+
+static struct topic_pool_entry *topic_pool_entry_alloc(void)
+{
+ return ao2_alloc(sizeof(struct topic_pool_entry), topic_pool_entry_dtor);
+}
+
+struct stasis_topic_pool {
+ struct ao2_container *pool_container;
+ struct stasis_topic *pool_topic;
+};
+
+static void topic_pool_dtor(void *obj)
+{
+ struct stasis_topic_pool *pool = obj;
+ ao2_cleanup(pool->pool_container);
+ pool->pool_container = NULL;
+ ao2_cleanup(pool->pool_topic);
+ pool->pool_topic = NULL;
+}
+
+static int topic_pool_entry_hash(const void *obj, const int flags)
+{
+ const char *topic_name= (flags & OBJ_KEY) ? obj : stasis_topic_name(((struct topic_pool_entry*) obj)->topic);
+ return ast_str_case_hash(topic_name);
+}
+
+static int topic_pool_entry_cmp(void *obj, void *arg, int flags)
+{
+ struct topic_pool_entry *opt1 = obj, *opt2 = arg;
+ const char *topic_name = (flags & OBJ_KEY) ? arg : stasis_topic_name(opt2->topic);
+ return strcasecmp(stasis_topic_name(opt1->topic), topic_name) ? 0 : CMP_MATCH | CMP_STOP;
+}
+
+struct stasis_topic_pool *stasis_topic_pool_create(struct stasis_topic *pooled_topic)
+{
+ RAII_VAR(struct stasis_topic_pool *, pool, ao2_alloc(sizeof(*pool), topic_pool_dtor), ao2_cleanup);
+ if (!pool) {
+ return NULL;
+ }
+ pool->pool_container = ao2_container_alloc(TOPIC_POOL_BUCKETS, topic_pool_entry_hash, topic_pool_entry_cmp);
+ ao2_ref(pooled_topic, +1);
+ pool->pool_topic = pooled_topic;
+
+ ao2_ref(pool, +1);
+ return pool;
+}
+
+struct stasis_topic *stasis_topic_pool_get_topic(struct stasis_topic_pool *pool, const char *topic_name)
+{
+ RAII_VAR(struct topic_pool_entry *, topic_pool_entry, NULL, ao2_cleanup);
+ SCOPED_AO2LOCK(topic_container_lock, pool->pool_container);
+ topic_pool_entry = ao2_find(pool->pool_container, topic_name, OBJ_KEY | OBJ_NOLOCK);
+
+ if (topic_pool_entry) {
+ return topic_pool_entry->topic;
+ }
+
+ topic_pool_entry = topic_pool_entry_alloc();
+
+ if (!topic_pool_entry) {
+ return NULL;
+ }
+
+ topic_pool_entry->topic = stasis_topic_create(topic_name);
+ if (!topic_pool_entry->topic) {
+ return NULL;
+ }
+
+ topic_pool_entry->forward = stasis_forward_all(topic_pool_entry->topic, pool->pool_topic);
+ if (!topic_pool_entry->forward) {
+ return NULL;
+ }
+
+ ao2_link_flags(pool->pool_container, topic_pool_entry, OBJ_NOLOCK);
+
+ return topic_pool_entry->topic;
+}
+
/*! \brief Cleanup function */
static void stasis_exit(void)
{