summaryrefslogtreecommitdiff
path: root/include/asterisk
diff options
context:
space:
mode:
authorMatthew Jordan <mjordan@digium.com>2014-12-01 17:59:21 +0000
committerMatthew Jordan <mjordan@digium.com>2014-12-01 17:59:21 +0000
commit1106e8fd0f831c79eb2a7cf079739561da11d6ef (patch)
tree3e841771528c621760a6b0471935bc432e704054 /include/asterisk
parentef9ca8bc32c7cce781c35bca81d9ae544fd22dae (diff)
main/stasis: Allow subscriptions to use a threadpool for message delivery
Prior to this patch, all Stasis subscriptions would receive a dedicated thread for servicing published messages. In contrast, prior to r400178 (see review https://reviewboard.asterisk.org/r/2881/), the subscriptions shared a thread pool. It was discovered during some initial work on Stasis that, for a low subscription count with high message throughput, the threadpool was not as performant as simply having a dedicated thread per subscriber. For situations where a subscriber receives a substantial number of messages and is always present, the model of having a dedicated thread per subscriber makes sense. While we still have plenty of subscriptions that would follow this model, e.g., AMI, CDRs, CEL, etc., there are plenty that also fall into the following two categories: * Large number of subscriptions, specifically those tied to endpoints/peers. * Low number of messages. Some subscriptions exist specifically to coordinate a single message - the subscription is created, a message is published, the delivery is synchronized, and the subscription is destroyed. In both of the latter two cases, creating a dedicated thread is wasteful (and in the case of a large number of peers/endpoints, harmful). In those cases, having shared delivery threads is far more performant. This patch adds the ability of a subscriber to Stasis to choose whether or not their messages are dispatched on a dedicated thread or on a threadpool. The threadpool is configurable through stasis.conf. Review: https://reviewboard.asterisk.org/r/4193 ASTERISK-24533 #close Reported by: xrobau Tested by: xrobau ........ Merged revisions 428681 from http://svn.asterisk.org/svn/asterisk/branches/12 ........ Merged revisions 428687 from http://svn.asterisk.org/svn/asterisk/branches/13 git-svn-id: https://origsvn.digium.com/svn/asterisk/trunk@428688 65c4cc65-6c06-0410-ace0-fbb531ad65f3
Diffstat (limited to 'include/asterisk')
-rw-r--r--include/asterisk/stasis.h25
-rw-r--r--include/asterisk/stasis_internal.h7
-rw-r--r--include/asterisk/stasis_message_router.h16
3 files changed, 46 insertions, 2 deletions
diff --git a/include/asterisk/stasis.h b/include/asterisk/stasis.h
index 4189513ac..0b1b1e83f 100644
--- a/include/asterisk/stasis.h
+++ b/include/asterisk/stasis.h
@@ -542,6 +542,31 @@ struct stasis_subscription *stasis_subscribe(struct stasis_topic *topic,
stasis_subscription_cb callback, void *data);
/*!
+ * \brief Create a subscription whose callbacks occur on a thread pool
+ *
+ * In addition to being AO2 managed memory (requiring an ao2_cleanup() to free
+ * up this reference), the subscription must be explicitly unsubscribed from its
+ * topic using stasis_unsubscribe().
+ *
+ * The invocations of the callback are serialized, but will almost certainly not
+ * always happen on the same thread. The invocation order of different subscriptions
+ * is unspecified.
+ *
+ * Unlike \ref stasis_subscribe, this function will explicitly use a threadpool to
+ * dispatch items to its \c callback. This form of subscription should be used
+ * when many subscriptions may be made to the specified \c topic.
+ *
+ * \param topic Topic to subscribe to.
+ * \param callback Callback function for subscription messages.
+ * \param data Data to be passed to the callback, in addition to the message.
+ * \return New \ref stasis_subscription object.
+ * \return \c NULL on error.
+ * \since 12.8.0
+ */
+struct stasis_subscription *stasis_subscribe_pool(struct stasis_topic *topic,
+ stasis_subscription_cb callback, void *data);
+
+/*!
* \brief Cancel a subscription.
*
* Note that in an asynchronous system, there may still be messages queued or
diff --git a/include/asterisk/stasis_internal.h b/include/asterisk/stasis_internal.h
index bb7b6cc0a..bc6122c2b 100644
--- a/include/asterisk/stasis_internal.h
+++ b/include/asterisk/stasis_internal.h
@@ -52,8 +52,10 @@
* \param callback Callback function for subscription messages.
* \param data Data to be passed to the callback, in addition to the message.
* \param needs_mailbox Determines whether or not the subscription requires a mailbox.
- * Subscriptions with mailboxes will be delivered on a thread in the Stasis threadpool;
+ * Subscriptions with mailboxes will be delivered on some non-publisher thread;
* subscriptions without mailboxes will be delivered on the publisher thread.
+ * \param use_thread_pool Use the thread pool for the subscription. This is only
+ * relevant if \c needs_mailbox is non-zero.
* \return New \ref stasis_subscription object.
* \return \c NULL on error.
* \since 12
@@ -62,6 +64,7 @@ struct stasis_subscription *internal_stasis_subscribe(
struct stasis_topic *topic,
stasis_subscription_cb callback,
void *data,
- int needs_mailbox);
+ int needs_mailbox,
+ int use_thread_pool);
#endif /* STASIS_INTERNAL_H_ */
diff --git a/include/asterisk/stasis_message_router.h b/include/asterisk/stasis_message_router.h
index 613a2bd7f..89657a5ee 100644
--- a/include/asterisk/stasis_message_router.h
+++ b/include/asterisk/stasis_message_router.h
@@ -59,6 +59,22 @@ struct stasis_message_router *stasis_message_router_create(
struct stasis_topic *topic);
/*!
+ * \brief Create a new message router object.
+ *
+ * The subscription created for this message router will dispatch
+ * callbacks on a thread pool.
+ *
+ * \param topic Topic to subscribe route to.
+ *
+ * \return New \ref stasis_message_router.
+ * \return \c NULL on error.
+ *
+ * \since 12.8.0
+ */
+struct stasis_message_router *stasis_message_router_create_pool(
+ struct stasis_topic *topic);
+
+/*!
* \brief Unsubscribe the router from the upstream topic.
*
* \param router Router to unsubscribe.