summaryrefslogtreecommitdiff
path: root/main
diff options
context:
space:
mode:
Diffstat (limited to 'main')
-rw-r--r--main/stasis.c73
1 files changed, 19 insertions, 54 deletions
diff --git a/main/stasis.c b/main/stasis.c
index 2ad0caf93..d7769790c 100644
--- a/main/stasis.c
+++ b/main/stasis.c
@@ -131,7 +131,7 @@ static void subscription_dtor(void *obj)
static void send_subscription_change_message(struct stasis_topic *topic, char *uniqueid, char *description);
-static struct stasis_subscription *__stasis_subscribe(struct stasis_topic *topic, stasis_subscription_cb callback, void *data, int needs_mailbox)
+struct stasis_subscription *stasis_subscribe(struct stasis_topic *topic, stasis_subscription_cb callback, void *data)
{
RAII_VAR(struct stasis_subscription *, sub, NULL, ao2_cleanup);
RAII_VAR(struct ast_uuid *, id, NULL, ast_free);
@@ -148,11 +148,10 @@ static struct stasis_subscription *__stasis_subscribe(struct stasis_topic *topic
return NULL;
}
ast_uuid_to_str(id, uniqueid, sizeof(uniqueid));
- if (needs_mailbox) {
- sub->mailbox = ast_threadpool_serializer(uniqueid, pool);
- if (!sub->mailbox) {
- return NULL;
- }
+
+ sub->mailbox = ast_threadpool_serializer(uniqueid, pool);
+ if (!sub->mailbox) {
+ return NULL;
}
sub->uniqueid = ast_strdup(uniqueid);
@@ -170,11 +169,6 @@ static struct stasis_subscription *__stasis_subscribe(struct stasis_topic *topic
return sub;
}
-struct stasis_subscription *stasis_subscribe(struct stasis_topic *topic, stasis_subscription_cb callback, void *data)
-{
- return __stasis_subscribe(topic, callback, data, 1);
-}
-
struct stasis_subscription *stasis_unsubscribe(struct stasis_subscription *sub)
{
if (sub) {
@@ -338,58 +332,29 @@ static int dispatch_exec(void *data)
void stasis_forward_message(struct stasis_topic *topic, struct stasis_topic *publisher_topic, struct stasis_message *message)
{
- struct stasis_subscription **subscribers = NULL;
- size_t num_subscribers, i;
+ size_t i;
+ SCOPED_AO2LOCK(lock, topic);
ast_assert(topic != NULL);
ast_assert(publisher_topic != NULL);
ast_assert(message != NULL);
- /* Copy the subscribers, so we don't have to hold the mutex for long */
- {
- SCOPED_AO2LOCK(lock, topic);
- num_subscribers = topic->num_subscribers_current;
- subscribers = ast_malloc(num_subscribers * sizeof(*subscribers));
- if (subscribers) {
- for (i = 0; i < num_subscribers; ++i) {
- ao2_ref(topic->subscribers[i], +1);
- subscribers[i] = topic->subscribers[i];
- }
- }
- }
-
- if (!subscribers) {
- ast_log(LOG_ERROR, "Dropping message\n");
- return;
- }
-
- for (i = 0; i < num_subscribers; ++i) {
- struct stasis_subscription *sub = subscribers[i];
+ for (i = 0; i < topic->num_subscribers_current; ++i) {
+ struct stasis_subscription *sub = topic->subscribers[i];
+ RAII_VAR(struct dispatch *, dispatch, NULL, ao2_cleanup);
ast_assert(sub != NULL);
- if (sub->mailbox) {
- RAII_VAR(struct dispatch *, dispatch, NULL, ao2_cleanup);
-
- dispatch = dispatch_create(publisher_topic, message, sub);
- if (!dispatch) {
- ast_log(LOG_DEBUG, "Dropping dispatch\n");
- break;
- }
-
- if (ast_taskprocessor_push(sub->mailbox, dispatch_exec, dispatch) == 0) {
- dispatch = NULL; /* Ownership transferred to mailbox */
- }
- } else {
- /* No mailbox; dispatch directly */
- sub->callback(sub->data, sub, sub->topic, message);
+ dispatch = dispatch_create(publisher_topic, message, sub);
+ if (!dispatch) {
+ ast_log(LOG_DEBUG, "Dropping dispatch\n");
+ break;
}
- }
- for (i = 0; i < num_subscribers; ++i) {
- ao2_cleanup(subscribers[i]);
+ if (ast_taskprocessor_push(sub->mailbox, dispatch_exec, dispatch) == 0) {
+ dispatch = NULL; /* Ownership transferred to mailbox */
+ }
}
- ast_free(subscribers);
}
void stasis_publish(struct stasis_topic *topic, struct stasis_message *message)
@@ -414,8 +379,8 @@ struct stasis_subscription *stasis_forward_all(struct stasis_topic *from_topic,
if (!from_topic || !to_topic) {
return NULL;
}
- /* Subscribe without a mailbox, since we're just forwarding messages */
- sub = __stasis_subscribe(from_topic, stasis_forward_cb, to_topic, 0);
+
+ sub = stasis_subscribe(from_topic, stasis_forward_cb, to_topic);
if (sub) {
/* hold a ref to to_topic for this forwarding subscription */
ao2_ref(to_topic, +1);