summaryrefslogtreecommitdiff
path: root/main
diff options
context:
space:
mode:
authorKinsey Moore <kmoore@digium.com>2013-03-20 16:01:30 +0000
committerKinsey Moore <kmoore@digium.com>2013-03-20 16:01:30 +0000
commit7ed0b80d94100626ccdd4ddb19207486bf59710a (patch)
tree14892acaae0ac2b3e42477759e8ed6c8dbfdc216 /main
parent07d01e1c4185bf044f3ebb8bf49caf386c331351 (diff)
Resolve a race condition in Stasis
Because of the way that topics were handled when publishing, it was possible to dispatch a message to a subscription after that subscription had been unsubscribed such that the dispatched message arrived at the callback after the callback had received its final message. In callbacks that cleaned up user data, this would often cause a segfault. This has been resolved by locking the topic during the entirety of dispatch. To prevent long publishing and topic locking times, forwarding subscriptions have been made to be standard subscriptions instead of mailboxless subscriptions which were dispatched at publishing time. git-svn-id: https://origsvn.digium.com/svn/asterisk/trunk@383422 65c4cc65-6c06-0410-ace0-fbb531ad65f3
Diffstat (limited to 'main')
-rw-r--r--main/stasis.c73
1 files changed, 19 insertions, 54 deletions
diff --git a/main/stasis.c b/main/stasis.c
index 2ad0caf93..d7769790c 100644
--- a/main/stasis.c
+++ b/main/stasis.c
@@ -131,7 +131,7 @@ static void subscription_dtor(void *obj)
static void send_subscription_change_message(struct stasis_topic *topic, char *uniqueid, char *description);
-static struct stasis_subscription *__stasis_subscribe(struct stasis_topic *topic, stasis_subscription_cb callback, void *data, int needs_mailbox)
+struct stasis_subscription *stasis_subscribe(struct stasis_topic *topic, stasis_subscription_cb callback, void *data)
{
RAII_VAR(struct stasis_subscription *, sub, NULL, ao2_cleanup);
RAII_VAR(struct ast_uuid *, id, NULL, ast_free);
@@ -148,11 +148,10 @@ static struct stasis_subscription *__stasis_subscribe(struct stasis_topic *topic
return NULL;
}
ast_uuid_to_str(id, uniqueid, sizeof(uniqueid));
- if (needs_mailbox) {
- sub->mailbox = ast_threadpool_serializer(uniqueid, pool);
- if (!sub->mailbox) {
- return NULL;
- }
+
+ sub->mailbox = ast_threadpool_serializer(uniqueid, pool);
+ if (!sub->mailbox) {
+ return NULL;
}
sub->uniqueid = ast_strdup(uniqueid);
@@ -170,11 +169,6 @@ static struct stasis_subscription *__stasis_subscribe(struct stasis_topic *topic
return sub;
}
-struct stasis_subscription *stasis_subscribe(struct stasis_topic *topic, stasis_subscription_cb callback, void *data)
-{
- return __stasis_subscribe(topic, callback, data, 1);
-}
-
struct stasis_subscription *stasis_unsubscribe(struct stasis_subscription *sub)
{
if (sub) {
@@ -338,58 +332,29 @@ static int dispatch_exec(void *data)
void stasis_forward_message(struct stasis_topic *topic, struct stasis_topic *publisher_topic, struct stasis_message *message)
{
- struct stasis_subscription **subscribers = NULL;
- size_t num_subscribers, i;
+ size_t i;
+ SCOPED_AO2LOCK(lock, topic);
ast_assert(topic != NULL);
ast_assert(publisher_topic != NULL);
ast_assert(message != NULL);
- /* Copy the subscribers, so we don't have to hold the mutex for long */
- {
- SCOPED_AO2LOCK(lock, topic);
- num_subscribers = topic->num_subscribers_current;
- subscribers = ast_malloc(num_subscribers * sizeof(*subscribers));
- if (subscribers) {
- for (i = 0; i < num_subscribers; ++i) {
- ao2_ref(topic->subscribers[i], +1);
- subscribers[i] = topic->subscribers[i];
- }
- }
- }
-
- if (!subscribers) {
- ast_log(LOG_ERROR, "Dropping message\n");
- return;
- }
-
- for (i = 0; i < num_subscribers; ++i) {
- struct stasis_subscription *sub = subscribers[i];
+ for (i = 0; i < topic->num_subscribers_current; ++i) {
+ struct stasis_subscription *sub = topic->subscribers[i];
+ RAII_VAR(struct dispatch *, dispatch, NULL, ao2_cleanup);
ast_assert(sub != NULL);
- if (sub->mailbox) {
- RAII_VAR(struct dispatch *, dispatch, NULL, ao2_cleanup);
-
- dispatch = dispatch_create(publisher_topic, message, sub);
- if (!dispatch) {
- ast_log(LOG_DEBUG, "Dropping dispatch\n");
- break;
- }
-
- if (ast_taskprocessor_push(sub->mailbox, dispatch_exec, dispatch) == 0) {
- dispatch = NULL; /* Ownership transferred to mailbox */
- }
- } else {
- /* No mailbox; dispatch directly */
- sub->callback(sub->data, sub, sub->topic, message);
+ dispatch = dispatch_create(publisher_topic, message, sub);
+ if (!dispatch) {
+ ast_log(LOG_DEBUG, "Dropping dispatch\n");
+ break;
}
- }
- for (i = 0; i < num_subscribers; ++i) {
- ao2_cleanup(subscribers[i]);
+ if (ast_taskprocessor_push(sub->mailbox, dispatch_exec, dispatch) == 0) {
+ dispatch = NULL; /* Ownership transferred to mailbox */
+ }
}
- ast_free(subscribers);
}
void stasis_publish(struct stasis_topic *topic, struct stasis_message *message)
@@ -414,8 +379,8 @@ struct stasis_subscription *stasis_forward_all(struct stasis_topic *from_topic,
if (!from_topic || !to_topic) {
return NULL;
}
- /* Subscribe without a mailbox, since we're just forwarding messages */
- sub = __stasis_subscribe(from_topic, stasis_forward_cb, to_topic, 0);
+
+ sub = stasis_subscribe(from_topic, stasis_forward_cb, to_topic);
if (sub) {
/* hold a ref to to_topic for this forwarding subscription */
ao2_ref(to_topic, +1);