summaryrefslogtreecommitdiff
path: root/apps
diff options
context:
space:
mode:
authorKinsey Moore <kmoore@digium.com>2013-03-16 15:45:58 +0000
committerKinsey Moore <kmoore@digium.com>2013-03-16 15:45:58 +0000
commit99aa02d17f7f1815f9a2abf75282f815a975cd67 (patch)
tree5ebb9cf3ddecd7153afb9e30f767d170c44b2142 /apps
parent5d45596f6257b86189bef2dfaf5d9cc0b001fa46 (diff)
Transition MWI to Stasis-core
Remove MWI's dependency on the event system by moving it to Stasis-core. This also introduces forwarding topic pools in Stasis-core which aggregate many dynamically allocated topics into a single primary topic. Review: https://reviewboard.asterisk.org/r/2368/ (closes issue ASTERISK-21097) Patch-by: Kinsey Moore git-svn-id: https://origsvn.digium.com/svn/asterisk/trunk@383284 65c4cc65-6c06-0410-ace0-fbb531ad65f3
Diffstat (limited to 'apps')
-rw-r--r--apps/app_minivm.c12
-rw-r--r--apps/app_voicemail.c141
2 files changed, 72 insertions, 81 deletions
diff --git a/apps/app_minivm.c b/apps/app_minivm.c
index 498c6ea2f..53c5f0937 100644
--- a/apps/app_minivm.c
+++ b/apps/app_minivm.c
@@ -2013,7 +2013,6 @@ static int leave_voicemail(struct ast_channel *chan, char *username, struct leav
* \brief Queue a message waiting event */
static void queue_mwi_event(const char *mbx, const char *ctx, int urgent, int new, int old)
{
- struct ast_event *event;
char *mailbox, *context;
mailbox = ast_strdupa(mbx);
@@ -2022,16 +2021,7 @@ static void queue_mwi_event(const char *mbx, const char *ctx, int urgent, int ne
context = "default";
}
- if (!(event = ast_event_new(AST_EVENT_MWI,
- AST_EVENT_IE_MAILBOX, AST_EVENT_IE_PLTYPE_STR, mailbox,
- AST_EVENT_IE_CONTEXT, AST_EVENT_IE_PLTYPE_STR, context,
- AST_EVENT_IE_NEWMSGS, AST_EVENT_IE_PLTYPE_UINT, (new+urgent),
- AST_EVENT_IE_OLDMSGS, AST_EVENT_IE_PLTYPE_UINT, old,
- AST_EVENT_IE_END))) {
- return;
- }
-
- ast_event_queue_and_cache(event);
+ stasis_publish_mwi_state(mailbox, context, new + urgent, old);
}
/*!\internal
diff --git a/apps/app_voicemail.c b/apps/app_voicemail.c
index d0a8a7820..f1778691c 100644
--- a/apps/app_voicemail.c
+++ b/apps/app_voicemail.c
@@ -974,10 +974,8 @@ static ast_cond_t poll_cond = PTHREAD_COND_INITIALIZER;
static pthread_t poll_thread = AST_PTHREADT_NULL;
static unsigned char poll_thread_run;
-/*! Subscription to ... MWI event subscriptions */
-static struct ast_event_sub *mwi_sub_sub;
-/*! Subscription to ... MWI event un-subscriptions */
-static struct ast_event_sub *mwi_unsub_sub;
+/*! Subscription to MWI event subscription changes */
+static struct stasis_subscription *mwi_sub_sub;
/*!
* \brief An MWI subscription
@@ -991,16 +989,24 @@ struct mwi_sub {
int old_urgent;
int old_new;
int old_old;
- uint32_t uniqueid;
+ char *uniqueid;
char mailbox[1];
};
struct mwi_sub_task {
const char *mailbox;
const char *context;
- uint32_t uniqueid;
+ const char *uniqueid;
};
+static void mwi_sub_task_dtor(struct mwi_sub_task *mwist)
+{
+ ast_free((void *) mwist->mailbox);
+ ast_free((void *) mwist->context);
+ ast_free((void *) mwist->uniqueid);
+ ast_free(mwist);
+}
+
static struct ast_taskprocessor *mwi_subscription_tps;
static AST_RWLIST_HEAD_STATIC(mwi_subs, mwi_sub);
@@ -7721,25 +7727,16 @@ static int vm_forwardoptions(struct ast_channel *chan, struct ast_vm_user *vmu,
static void queue_mwi_event(const char *box, int urgent, int new, int old)
{
- struct ast_event *event;
char *mailbox, *context;
/* Strip off @default */
context = mailbox = ast_strdupa(box);
strsep(&context, "@");
- if (ast_strlen_zero(context))
+ if (ast_strlen_zero(context)) {
context = "default";
-
- if (!(event = ast_event_new(AST_EVENT_MWI,
- AST_EVENT_IE_MAILBOX, AST_EVENT_IE_PLTYPE_STR, mailbox,
- AST_EVENT_IE_CONTEXT, AST_EVENT_IE_PLTYPE_STR, context,
- AST_EVENT_IE_NEWMSGS, AST_EVENT_IE_PLTYPE_UINT, (new+urgent),
- AST_EVENT_IE_OLDMSGS, AST_EVENT_IE_PLTYPE_UINT, old,
- AST_EVENT_IE_END))) {
- return;
}
- ast_event_queue_and_cache(event);
+ stasis_publish_mwi_state(mailbox, context, new + urgent, old);
}
/*!
@@ -12533,28 +12530,28 @@ static void *mb_poll_thread(void *data)
static void mwi_sub_destroy(struct mwi_sub *mwi_sub)
{
+ ast_free(mwi_sub->uniqueid);
ast_free(mwi_sub);
}
static int handle_unsubscribe(void *datap)
{
struct mwi_sub *mwi_sub;
- uint32_t *uniqueid = datap;
-
+ char *uniqueid = datap;
+
AST_RWLIST_WRLOCK(&mwi_subs);
AST_RWLIST_TRAVERSE_SAFE_BEGIN(&mwi_subs, mwi_sub, entry) {
- if (mwi_sub->uniqueid == *uniqueid) {
+ if (!strcmp(mwi_sub->uniqueid, uniqueid)) {
AST_LIST_REMOVE_CURRENT(entry);
- break;
+ /* Don't break here since a duplicate uniqueid
+ * may have been added as a result of a cache dump. */
+ mwi_sub_destroy(mwi_sub);
}
}
AST_RWLIST_TRAVERSE_SAFE_END
AST_RWLIST_UNLOCK(&mwi_subs);
- if (mwi_sub)
- mwi_sub_destroy(mwi_sub);
-
- ast_free(uniqueid);
+ ast_free(uniqueid);
return 0;
}
@@ -12574,7 +12571,7 @@ static int handle_subscribe(void *datap)
if (!(mwi_sub = ast_calloc(1, len)))
return -1;
- mwi_sub->uniqueid = p->uniqueid;
+ mwi_sub->uniqueid = ast_strdup(p->uniqueid);
if (!ast_strlen_zero(p->mailbox))
strcpy(mwi_sub->mailbox, p->mailbox);
@@ -12586,75 +12583,85 @@ static int handle_subscribe(void *datap)
AST_RWLIST_WRLOCK(&mwi_subs);
AST_RWLIST_INSERT_TAIL(&mwi_subs, mwi_sub, entry);
AST_RWLIST_UNLOCK(&mwi_subs);
- ast_free((void *) p->mailbox);
- ast_free((void *) p->context);
- ast_free(p);
+ mwi_sub_task_dtor(p);
poll_subscribed_mailbox(mwi_sub);
return 0;
}
-static void mwi_unsub_event_cb(const struct ast_event *event, void *userdata)
+static void mwi_unsub_event_cb(struct stasis_subscription_change *change)
{
- uint32_t u, *uniqueid = ast_calloc(1, sizeof(*uniqueid));
+ char *uniqueid = ast_strdup(change->uniqueid);
if (!uniqueid) {
ast_log(LOG_ERROR, "Unable to allocate memory for uniqueid\n");
return;
}
- if (ast_event_get_type(event) != AST_EVENT_UNSUB) {
+ if (ast_taskprocessor_push(mwi_subscription_tps, handle_unsubscribe, uniqueid) < 0) {
ast_free(uniqueid);
- return;
}
+}
- if (ast_event_get_ie_uint(event, AST_EVENT_IE_EVENTTYPE) != AST_EVENT_MWI) {
- ast_free(uniqueid);
+static void mwi_sub_event_cb(struct stasis_subscription_change *change)
+{
+ struct mwi_sub_task *mwist;
+ char *context = ast_strdupa(stasis_topic_name(change->topic));
+ char *mailbox;
+
+ if ((mwist = ast_calloc(1, (sizeof(*mwist)))) == NULL) {
return;
}
- u = ast_event_get_ie_uint(event, AST_EVENT_IE_UNIQUEID);
- *uniqueid = u;
- if (ast_taskprocessor_push(mwi_subscription_tps, handle_unsubscribe, uniqueid) < 0) {
- ast_free(uniqueid);
+ mailbox = strsep(&context, "@");
+
+ mwist->mailbox = ast_strdup(mailbox);
+ mwist->context = ast_strdup(context);
+ mwist->uniqueid = ast_strdup(change->uniqueid);
+
+ if (ast_taskprocessor_push(mwi_subscription_tps, handle_subscribe, mwist) < 0) {
+ mwi_sub_task_dtor(mwist);
}
}
-static void mwi_sub_event_cb(const struct ast_event *event, void *userdata)
+static void mwi_event_cb(void *userdata, struct stasis_subscription *sub, struct stasis_topic *topic, struct stasis_message *msg)
{
- struct mwi_sub_task *mwist;
-
- if (ast_event_get_type(event) != AST_EVENT_SUB)
- return;
-
- if (ast_event_get_ie_uint(event, AST_EVENT_IE_EVENTTYPE) != AST_EVENT_MWI)
+ struct stasis_subscription_change *change;
+ /* Only looking for subscription change notices here */
+ if (stasis_message_type(msg) != stasis_subscription_change()) {
return;
+ }
- if ((mwist = ast_calloc(1, (sizeof(*mwist)))) == NULL) {
- ast_log(LOG_ERROR, "could not allocate a mwi_sub_task\n");
+ change = stasis_message_data(msg);
+ if (change->topic == stasis_mwi_topic_all()) {
return;
}
- mwist->mailbox = ast_strdup(ast_event_get_ie_str(event, AST_EVENT_IE_MAILBOX));
- mwist->context = ast_strdup(ast_event_get_ie_str(event, AST_EVENT_IE_CONTEXT));
- mwist->uniqueid = ast_event_get_ie_uint(event, AST_EVENT_IE_UNIQUEID);
-
- if (ast_taskprocessor_push(mwi_subscription_tps, handle_subscribe, mwist) < 0) {
- ast_free(mwist);
+
+ if (!strcmp(change->description, "Subscribe")) {
+ mwi_sub_event_cb(change);
+ } else if (!strcmp(change->description, "Unsubscribe")) {
+ mwi_unsub_event_cb(change);
}
}
+static int dump_cache(void *obj, void *arg, int flags)
+{
+ struct stasis_message *msg = obj;
+ mwi_event_cb(NULL, NULL, NULL, msg);
+ return 0;
+}
+
static void start_poll_thread(void)
{
int errcode;
- mwi_sub_sub = ast_event_subscribe(AST_EVENT_SUB, mwi_sub_event_cb, "Voicemail MWI subscription", NULL,
- AST_EVENT_IE_EVENTTYPE, AST_EVENT_IE_PLTYPE_UINT, AST_EVENT_MWI,
- AST_EVENT_IE_END);
-
- mwi_unsub_sub = ast_event_subscribe(AST_EVENT_UNSUB, mwi_unsub_event_cb, "Voicemail MWI subscription", NULL,
- AST_EVENT_IE_EVENTTYPE, AST_EVENT_IE_PLTYPE_UINT, AST_EVENT_MWI,
- AST_EVENT_IE_END);
+ mwi_sub_sub = stasis_subscribe(stasis_mwi_topic_all(), mwi_event_cb, NULL);
- if (mwi_sub_sub)
- ast_event_report_subs(mwi_sub_sub);
+ if (mwi_sub_sub) {
+ struct ao2_container *cached = stasis_cache_dump(stasis_mwi_topic_cached(), stasis_subscription_change());
+ if (cached) {
+ ao2_callback(cached, OBJ_MULTIPLE | OBJ_NODATA, dump_cache, NULL);
+ }
+ ao2_cleanup(cached);
+ }
poll_thread_run = 1;
@@ -12668,13 +12675,7 @@ static void stop_poll_thread(void)
poll_thread_run = 0;
if (mwi_sub_sub) {
- ast_event_unsubscribe(mwi_sub_sub);
- mwi_sub_sub = NULL;
- }
-
- if (mwi_unsub_sub) {
- ast_event_unsubscribe(mwi_unsub_sub);
- mwi_unsub_sub = NULL;
+ mwi_sub_sub = stasis_unsubscribe(mwi_sub_sub);
}
ast_mutex_lock(&poll_lock);