summaryrefslogtreecommitdiff
path: root/main
diff options
context:
space:
mode:
authorDavid M. Lee <dlee@digium.com>2013-09-30 18:55:27 +0000
committerDavid M. Lee <dlee@digium.com>2013-09-30 18:55:27 +0000
commit2de42c2a257947d0a376d861dbd1ca76923e7930 (patch)
treea01a063af4857ba99e8fa3f7892dd41c6edf826a /main
parentdb7c8691a06bd896070e1a9c4f90309104509190 (diff)
Multiple revisions 399887,400138,400178,400180-400181
........ r399887 | dlee | 2013-09-26 10:41:47 -0500 (Thu, 26 Sep 2013) | 1 line Minor performance bump by not allocate manager variable struct if we don't need it ........ r400138 | dlee | 2013-09-30 10:24:00 -0500 (Mon, 30 Sep 2013) | 23 lines Stasis performance improvements This patch addresses several performance problems that were found in the initial performance testing of Asterisk 12. The Stasis dispatch object was allocated as an AO2 object, even though it has a very confined lifecycle. This was replaced with a straight ast_malloc(). The Stasis message router was spending an inordinate amount of time searching hash tables. In this case, most of our routers had 6 or fewer routes in them to begin with. This was replaced with an array that's searched linearly for the route. We more heavily rely on AO2 objects in Asterisk 12, and the memset() in ao2_ref() actually became noticeable on the profile. This was #ifdef'ed to only run when AO2_DEBUG was enabled. After being misled by an erroneous comment in taskprocessor.c during profiling, the wrong comment was removed. Review: https://reviewboard.asterisk.org/r/2873/ ........ r400178 | dlee | 2013-09-30 13:26:27 -0500 (Mon, 30 Sep 2013) | 24 lines Taskprocessor optimization; switch Stasis to use taskprocessors This patch optimizes taskprocessor to use a semaphore for signaling, which the OS can do a better job at managing contention and waiting that we can with a mutex and condition. The taskprocessor execution was also slightly optimized to reduce the number of locks taken. The only observable difference in the taskprocessor implementation is that when the final reference to the taskprocessor goes away, it will execute all tasks to completion instead of discarding the unexecuted tasks. For systems where unnamed semaphores are not supported, a really simple semaphore implementation is provided. (Which gives identical performance as the original taskprocessor implementation). The way we ended up implementing Stasis caused the threadpool to be a burden instead of a boost to performance. This was switched to just use taskprocessors directly for subscriptions. Review: https://reviewboard.asterisk.org/r/2881/ ........ r400180 | dlee | 2013-09-30 13:39:34 -0500 (Mon, 30 Sep 2013) | 28 lines Optimize how Stasis forwards are dispatched This patch optimizes how forwards are dispatched in Stasis. Originally, forwards were dispatched as subscriptions that are invoked on the publishing thread. This did not account for the vast number of forwards we would end up having in the system, and the amount of work it would take to walk though the forward subscriptions. This patch modifies Stasis so that rather than walking the tree of forwards on every dispatch, when forwards and subscriptions are changed, the subscriber list for every topic in the tree is changed. This has a couple of benefits. First, this reduces the workload of dispatching messages. It also reduces contention when dispatching to different topics that happen to forward to the same aggregation topic (as happens with all of the channel, bridge and endpoint topics). Since forwards are no longer subscriptions, the bulk of this patch is simply changing stasis_subscription objects to stasis_forward objects (which, admittedly, I should have done in the first place.) Since this required me to yet again put in a growing array, I finally abstracted that out into a set of ast_vector macros in asterisk/vector.h. Review: https://reviewboard.asterisk.org/r/2883/ ........ r400181 | dlee | 2013-09-30 13:48:57 -0500 (Mon, 30 Sep 2013) | 28 lines Remove dispatch object allocation from Stasis publishing While looking for areas for performance improvement, I realized that an unused feature in Stasis was negatively impacting performance. When a message is sent to a subscriber, a dispatch object is allocated for the dispatch, containing the topic the message was published to, the subscriber the message is being sent to, and the message itself. The topic is actually unused by any subscriber in Asterisk today. And the subscriber is associated with the taskprocessor the message is being dispatched to. First, this patch removes the unused topic parameter from Stasis subscription callbacks. Second, this patch introduces the concept of taskprocessor local data, data that may be set on a taskprocessor and provided along with the data pointer when a task is pushed using the ast_taskprocessor_push_local() call. This allows the task to have both data specific to that taskprocessor, in addition to data specific to that invocation. With those two changes, the dispatch object can be removed completely, and the message is simply refcounted and sent directly to the taskprocessor. Review: https://reviewboard.asterisk.org/r/2884/ ........ Merged revisions 399887,400138,400178,400180-400181 from http://svn.asterisk.org/svn/asterisk/branches/12 git-svn-id: https://origsvn.digium.com/svn/asterisk/trunk@400186 65c4cc65-6c06-0410-ace0-fbb531ad65f3
Diffstat (limited to 'main')
-rw-r--r--main/astobj2.c29
-rw-r--r--main/ccss.c6
-rw-r--r--main/cdr.c22
-rw-r--r--main/cel.c26
-rw-r--r--main/channel.c11
-rw-r--r--main/channel_internal_api.c6
-rw-r--r--main/devicestate.c2
-rw-r--r--main/endpoints.c4
-rw-r--r--main/manager.c11
-rw-r--r--main/manager_bridges.c8
-rw-r--r--main/manager_channels.c35
-rw-r--r--main/manager_endpoints.c7
-rw-r--r--main/manager_mwi.c6
-rw-r--r--main/manager_system.c4
-rw-r--r--main/pbx.c4
-rw-r--r--main/sem.c116
-rw-r--r--main/sounds_index.c2
-rw-r--r--main/stasis.c389
-rw-r--r--main/stasis_cache.c13
-rw-r--r--main/stasis_cache_pattern.c12
-rw-r--r--main/stasis_config.c201
-rw-r--r--main/stasis_message_router.c233
-rw-r--r--main/stasis_wait.c2
-rw-r--r--main/taskprocessor.c234
24 files changed, 674 insertions, 709 deletions
diff --git a/main/astobj2.c b/main/astobj2.c
index 88a6a6a23..88801bd2f 100644
--- a/main/astobj2.c
+++ b/main/astobj2.c
@@ -478,38 +478,23 @@ static int internal_ao2_ref(void *user_data, int delta, const char *file, int li
ast_atomic_fetchadd_int(&ao2.total_objects, -1);
#endif
+ /* In case someone uses an object after it's been freed */
+ obj->priv_data.magic = 0;
+
switch (obj->priv_data.options & AO2_ALLOC_OPT_LOCK_MASK) {
case AO2_ALLOC_OPT_LOCK_MUTEX:
obj_mutex = INTERNAL_OBJ_MUTEX(user_data);
ast_mutex_destroy(&obj_mutex->mutex.lock);
- /*
- * For safety, zero-out the astobj2_lock header and also the
- * first word of the user-data, which we make sure is always
- * allocated.
- */
- memset(obj_mutex, '\0', sizeof(*obj_mutex) + sizeof(void *) );
ast_free(obj_mutex);
break;
case AO2_ALLOC_OPT_LOCK_RWLOCK:
obj_rwlock = INTERNAL_OBJ_RWLOCK(user_data);
ast_rwlock_destroy(&obj_rwlock->rwlock.lock);
- /*
- * For safety, zero-out the astobj2_rwlock header and also the
- * first word of the user-data, which we make sure is always
- * allocated.
- */
- memset(obj_rwlock, '\0', sizeof(*obj_rwlock) + sizeof(void *) );
ast_free(obj_rwlock);
break;
case AO2_ALLOC_OPT_LOCK_NOLOCK:
- /*
- * For safety, zero-out the astobj2 header and also the first
- * word of the user-data, which we make sure is always
- * allocated.
- */
- memset(obj, '\0', sizeof(*obj) + sizeof(void *) );
ast_free(obj);
break;
default:
@@ -575,14 +560,6 @@ static void *internal_ao2_alloc(size_t data_size, ao2_destructor_fn destructor_f
struct astobj2_lock *obj_mutex;
struct astobj2_rwlock *obj_rwlock;
- if (data_size < sizeof(void *)) {
- /*
- * We always alloc at least the size of a void *,
- * for debugging purposes.
- */
- data_size = sizeof(void *);
- }
-
switch (options & AO2_ALLOC_OPT_LOCK_MASK) {
case AO2_ALLOC_OPT_LOCK_MUTEX:
#if defined(__AST_DEBUG_MALLOC)
diff --git a/main/ccss.c b/main/ccss.c
index 061c45a7c..3068c6ffa 100644
--- a/main/ccss.c
+++ b/main/ccss.c
@@ -1397,7 +1397,7 @@ static void generic_monitor_instance_list_destructor(void *obj)
ast_free((char *)generic_list->device_name);
}
-static void generic_monitor_devstate_cb(void *userdata, struct stasis_subscription *sub, struct stasis_topic *topic, struct stasis_message *msg);
+static void generic_monitor_devstate_cb(void *userdata, struct stasis_subscription *sub, struct stasis_message *msg);
static struct generic_monitor_instance_list *create_new_generic_list(struct ast_cc_monitor *monitor)
{
struct generic_monitor_instance_list *generic_list = ao2_t_alloc(sizeof(*generic_list),
@@ -1471,7 +1471,7 @@ static int generic_monitor_devstate_tp_cb(void *data)
return 0;
}
-static void generic_monitor_devstate_cb(void *userdata, struct stasis_subscription *sub, struct stasis_topic *topic, struct stasis_message *msg)
+static void generic_monitor_devstate_cb(void *userdata, struct stasis_subscription *sub, struct stasis_message *msg)
{
/* Wow, it's cool that we've picked up on a state change, but we really want
* the actual work to be done in the core's taskprocessor execution thread
@@ -2750,7 +2750,7 @@ static int cc_generic_agent_stop_ringing(struct ast_cc_agent *agent)
return 0;
}
-static void generic_agent_devstate_cb(void *userdata, struct stasis_subscription *sub, struct stasis_topic *topic, struct stasis_message *msg)
+static void generic_agent_devstate_cb(void *userdata, struct stasis_subscription *sub, struct stasis_message *msg)
{
struct ast_cc_agent *agent = userdata;
enum ast_device_state new_state;
diff --git a/main/cdr.c b/main/cdr.c
index fb02d3350..ea0f9c0e4 100644
--- a/main/cdr.c
+++ b/main/cdr.c
@@ -334,13 +334,13 @@ static struct ao2_container *active_cdrs_by_channel;
static struct stasis_message_router *stasis_router;
/*! \brief Our subscription for bridges */
-static struct stasis_subscription *bridge_subscription;
+static struct stasis_forward *bridge_subscription;
/*! \brief Our subscription for channels */
-static struct stasis_subscription *channel_subscription;
+static struct stasis_forward *channel_subscription;
/*! \brief Our subscription for parking */
-static struct stasis_subscription *parking_subscription;
+static struct stasis_forward *parking_subscription;
/*! \brief The parent topic for all topics we want to aggregate for CDRs */
static struct stasis_topic *cdr_topic;
@@ -1839,7 +1839,7 @@ static int finalized_state_process_party_a(struct cdr_object *cdr, struct ast_ch
* \param topic The topic this message was published for
* \param message The message
*/
-static void handle_dial_message(void *data, struct stasis_subscription *sub, struct stasis_topic *topic, struct stasis_message *message)
+static void handle_dial_message(void *data, struct stasis_subscription *sub, struct stasis_message *message)
{
RAII_VAR(struct module_config *, mod_cfg, ao2_global_obj_ref(module_configs), ao2_cleanup);
RAII_VAR(struct cdr_object *, cdr, NULL, ao2_cleanup);
@@ -2020,7 +2020,7 @@ static int check_new_cdr_needed(struct ast_channel_snapshot *old_snapshot,
* \param topic The topic this message was published for
* \param message The message
*/
-static void handle_channel_cache_message(void *data, struct stasis_subscription *sub, struct stasis_topic *topic, struct stasis_message *message)
+static void handle_channel_cache_message(void *data, struct stasis_subscription *sub, struct stasis_message *message)
{
RAII_VAR(struct cdr_object *, cdr, NULL, ao2_cleanup);
RAII_VAR(struct module_config *, mod_cfg, ao2_global_obj_ref(module_configs), ao2_cleanup);
@@ -2150,7 +2150,7 @@ static int filter_bridge_messages(struct ast_bridge_snapshot *bridge)
* \param message The message - hopefully a bridge one!
*/
static void handle_bridge_leave_message(void *data, struct stasis_subscription *sub,
- struct stasis_topic *topic, struct stasis_message *message)
+ struct stasis_message *message)
{
struct ast_bridge_blob *update = stasis_message_data(message);
struct ast_bridge_snapshot *bridge = update->bridge;
@@ -2450,7 +2450,7 @@ static void handle_standard_bridge_enter_message(struct cdr_object *cdr,
* \param message The message - hopefully a bridge one!
*/
static void handle_bridge_enter_message(void *data, struct stasis_subscription *sub,
- struct stasis_topic *topic, struct stasis_message *message)
+ struct stasis_message *message)
{
struct ast_bridge_blob *update = stasis_message_data(message);
struct ast_bridge_snapshot *bridge = update->bridge;
@@ -2494,7 +2494,7 @@ static void handle_bridge_enter_message(void *data, struct stasis_subscription *
* \param message The message about who got parked
* */
static void handle_parked_call_message(void *data, struct stasis_subscription *sub,
- struct stasis_topic *topic, struct stasis_message *message)
+ struct stasis_message *message)
{
struct ast_parked_call_payload *payload = stasis_message_data(message);
struct ast_channel_snapshot *channel = payload->parkee;
@@ -3884,9 +3884,9 @@ static int process_config(int reload)
static void cdr_engine_cleanup(void)
{
- channel_subscription = stasis_unsubscribe_and_join(channel_subscription);
- bridge_subscription = stasis_unsubscribe_and_join(bridge_subscription);
- parking_subscription = stasis_unsubscribe_and_join(parking_subscription);
+ channel_subscription = stasis_forward_cancel(channel_subscription);
+ bridge_subscription = stasis_forward_cancel(bridge_subscription);
+ parking_subscription = stasis_forward_cancel(parking_subscription);
stasis_message_router_unsubscribe_and_join(stasis_router);
ao2_cleanup(cdr_topic);
cdr_topic = NULL;
diff --git a/main/cel.c b/main/cel.c
index 6050fac75..0d78b5cce 100644
--- a/main/cel.c
+++ b/main/cel.c
@@ -121,16 +121,16 @@ static struct stasis_topic *cel_topic;
static struct stasis_topic *cel_aggregation_topic;
/*! Subscription for forwarding the channel caching topic */
-static struct stasis_subscription *cel_channel_forwarder;
+static struct stasis_forward *cel_channel_forwarder;
/*! Subscription for forwarding the channel caching topic */
-static struct stasis_subscription *cel_bridge_forwarder;
+static struct stasis_forward *cel_bridge_forwarder;
/*! Subscription for forwarding the parking topic */
-static struct stasis_subscription *cel_parking_forwarder;
+static struct stasis_forward *cel_parking_forwarder;
/*! Subscription for forwarding the CEL-specific topic */
-static struct stasis_subscription *cel_cel_forwarder;
+static struct stasis_forward *cel_cel_forwarder;
struct stasis_message_type *cel_generic_type(void);
STASIS_MESSAGE_TYPE_DEFN(cel_generic_type);
@@ -1019,7 +1019,6 @@ static int cel_filter_channel_snapshot(struct ast_channel_snapshot *snapshot)
}
static void cel_snapshot_update_cb(void *data, struct stasis_subscription *sub,
- struct stasis_topic *topic,
struct stasis_message *message)
{
struct stasis_cache_update *update = stasis_message_data(message);
@@ -1082,7 +1081,6 @@ static struct ast_str *cel_generate_peer_str(
static void cel_bridge_enter_cb(
void *data, struct stasis_subscription *sub,
- struct stasis_topic *topic,
struct stasis_message *message)
{
struct ast_bridge_blob *blob = stasis_message_data(message);
@@ -1110,7 +1108,6 @@ static void cel_bridge_enter_cb(
static void cel_bridge_leave_cb(
void *data, struct stasis_subscription *sub,
- struct stasis_topic *topic,
struct stasis_message *message)
{
struct ast_bridge_blob *blob = stasis_message_data(message);
@@ -1138,7 +1135,6 @@ static void cel_bridge_leave_cb(
static void cel_parking_cb(
void *data, struct stasis_subscription *sub,
- struct stasis_topic *topic,
struct stasis_message *message)
{
struct ast_parked_call_payload *parked_payload = stasis_message_data(message);
@@ -1183,7 +1179,6 @@ static void save_dialstatus(struct ast_multi_channel_blob *blob)
}
static void cel_dial_cb(void *data, struct stasis_subscription *sub,
- struct stasis_topic *topic,
struct stasis_message *message)
{
struct ast_multi_channel_blob *blob = stasis_message_data(message);
@@ -1218,7 +1213,6 @@ static void cel_dial_cb(void *data, struct stasis_subscription *sub,
static void cel_generic_cb(
void *data, struct stasis_subscription *sub,
- struct stasis_topic *topic,
struct stasis_message *message)
{
struct ast_channel_blob *obj = stasis_message_data(message);
@@ -1241,7 +1235,6 @@ static void cel_generic_cb(
static void cel_blind_transfer_cb(
void *data, struct stasis_subscription *sub,
- struct stasis_topic *topic,
struct stasis_message *message)
{
struct ast_bridge_blob *obj = stasis_message_data(message);
@@ -1289,7 +1282,6 @@ static void cel_blind_transfer_cb(
static void cel_attended_transfer_cb(
void *data, struct stasis_subscription *sub,
- struct stasis_topic *topic,
struct stasis_message *message)
{
struct ast_attended_transfer_message *xfer = stasis_message_data(message);
@@ -1342,7 +1334,6 @@ static void cel_attended_transfer_cb(
static void cel_pickup_cb(
void *data, struct stasis_subscription *sub,
- struct stasis_topic *topic,
struct stasis_message *message)
{
struct ast_multi_channel_blob *obj = stasis_message_data(message);
@@ -1364,7 +1355,6 @@ static void cel_pickup_cb(
static void cel_local_cb(
void *data, struct stasis_subscription *sub,
- struct stasis_topic *topic,
struct stasis_message *message)
{
struct ast_multi_channel_blob *obj = stasis_message_data(message);
@@ -1394,10 +1384,10 @@ static void ast_cel_engine_term(void)
cel_aggregation_topic = NULL;
ao2_cleanup(cel_topic);
cel_topic = NULL;
- cel_channel_forwarder = stasis_unsubscribe_and_join(cel_channel_forwarder);
- cel_bridge_forwarder = stasis_unsubscribe_and_join(cel_bridge_forwarder);
- cel_parking_forwarder = stasis_unsubscribe_and_join(cel_parking_forwarder);
- cel_cel_forwarder = stasis_unsubscribe_and_join(cel_cel_forwarder);
+ cel_channel_forwarder = stasis_forward_cancel(cel_channel_forwarder);
+ cel_bridge_forwarder = stasis_forward_cancel(cel_bridge_forwarder);
+ cel_parking_forwarder = stasis_forward_cancel(cel_parking_forwarder);
+ cel_cel_forwarder = stasis_forward_cancel(cel_cel_forwarder);
ast_cli_unregister(&cli_status);
ao2_cleanup(cel_dialstatus_store);
cel_dialstatus_store = NULL;
diff --git a/main/channel.c b/main/channel.c
index 7ba3e6c39..e8ce5e0c5 100644
--- a/main/channel.c
+++ b/main/channel.c
@@ -7549,14 +7549,19 @@ struct varshead *ast_channel_get_manager_vars(struct ast_channel *chan)
RAII_VAR(struct ast_str *, tmp, NULL, ast_free);
struct manager_channel_variable *mcv;
- ret = ao2_alloc(sizeof(*ret), varshead_dtor);
- tmp = ast_str_create(16);
-
if (!ret || !tmp) {
return NULL;
}
AST_RWLIST_RDLOCK(&channelvars);
+
+ if (AST_LIST_EMPTY(&channelvars)) {
+ return NULL;
+ }
+
+ ret = ao2_alloc(sizeof(*ret), varshead_dtor);
+ tmp = ast_str_create(16);
+
AST_LIST_TRAVERSE(&channelvars, mcv, entry) {
const char *val = NULL;
struct ast_var_t *var;
diff --git a/main/channel_internal_api.c b/main/channel_internal_api.c
index 956816d76..de2cc9c71 100644
--- a/main/channel_internal_api.c
+++ b/main/channel_internal_api.c
@@ -207,8 +207,7 @@ struct ast_channel {
char sending_dtmf_digit; /*!< Digit this channel is currently sending out. (zero if not sending) */
struct timeval sending_dtmf_tv; /*!< The time this channel started sending the current digit. (Invalid if sending_dtmf_digit is zero.) */
struct stasis_cp_single *topics; /*!< Topic for all channel's events */
- struct stasis_subscription *forwarder; /*!< Subscription for event forwarding to all topic */
- struct stasis_subscription *endpoint_forward; /*!< Subscription for event forwarding to endpoint's topic */
+ struct stasis_forward *endpoint_forward; /*!< Subscription for event forwarding to endpoint's topic */
};
/*! \brief The monotonically increasing integer counter for channel uniqueids */
@@ -1429,8 +1428,7 @@ void ast_channel_internal_cleanup(struct ast_channel *chan)
ast_string_field_free_memory(chan);
- chan->forwarder = stasis_unsubscribe(chan->forwarder);
- chan->endpoint_forward = stasis_unsubscribe(chan->endpoint_forward);
+ chan->endpoint_forward = stasis_forward_cancel(chan->endpoint_forward);
stasis_cp_single_unsubscribe(chan->topics);
chan->topics = NULL;
diff --git a/main/devicestate.c b/main/devicestate.c
index bcf07ff0b..158d1f817 100644
--- a/main/devicestate.c
+++ b/main/devicestate.c
@@ -610,7 +610,7 @@ static int aggregate_state_changed(char *device, enum ast_device_state new_aggre
return 1;
}
-static void devstate_change_collector_cb(void *data, struct stasis_subscription *sub, struct stasis_topic *topic, struct stasis_message *msg)
+static void devstate_change_collector_cb(void *data, struct stasis_subscription *sub, struct stasis_message *msg)
{
enum ast_device_state aggregate_state;
char *device;
diff --git a/main/endpoints.c b/main/endpoints.c
index b33e33f1a..bdcf401ba 100644
--- a/main/endpoints.c
+++ b/main/endpoints.c
@@ -152,7 +152,7 @@ int ast_endpoint_add_channel(struct ast_endpoint *endpoint,
/*! \brief Handler for channel snapshot cache clears */
static void endpoint_cache_clear(void *data,
- struct stasis_subscription *sub, struct stasis_topic *topic,
+ struct stasis_subscription *sub,
struct stasis_message *message)
{
struct ast_endpoint *endpoint = data;
@@ -174,7 +174,7 @@ static void endpoint_cache_clear(void *data,
}
static void endpoint_default(void *data,
- struct stasis_subscription *sub, struct stasis_topic *topic,
+ struct stasis_subscription *sub,
struct stasis_message *message)
{
struct stasis_endpoint *endpoint = data;
diff --git a/main/manager.c b/main/manager.c
index 00649dafa..69def4b1f 100644
--- a/main/manager.c
+++ b/main/manager.c
@@ -1126,7 +1126,7 @@ static struct stasis_topic *manager_topic;
static struct stasis_message_router *stasis_router;
/*! \brief The \ref stasis_subscription for forwarding the RTP topic to the AMI topic */
-static struct stasis_subscription *rtp_topic_forwarder;
+static struct stasis_forward *rtp_topic_forwarder;
#define MGR_SHOW_TERMINAL_WIDTH 80
@@ -1151,7 +1151,7 @@ static const struct {
{{ "restart", "gracefully", NULL }},
};
-static void acl_change_stasis_cb(void *data, struct stasis_subscription *sub, struct stasis_topic *topic, struct stasis_message *message);
+static void acl_change_stasis_cb(void *data, struct stasis_subscription *sub, struct stasis_message *message);
static void acl_change_stasis_subscribe(void)
{
@@ -1427,7 +1427,6 @@ struct ast_str *ast_manager_str_from_json_object(struct ast_json *blob, key_excl
}
static void manager_default_msg_cb(void *data, struct stasis_subscription *sub,
- struct stasis_topic *topic,
struct stasis_message *message)
{
RAII_VAR(struct ast_manager_event_blob *, ev, NULL, ao2_cleanup);
@@ -1444,7 +1443,6 @@ static void manager_default_msg_cb(void *data, struct stasis_subscription *sub,
}
static void manager_generic_msg_cb(void *data, struct stasis_subscription *sub,
- struct stasis_topic *topic,
struct stasis_message *message)
{
struct ast_json_payload *payload = stasis_message_data(message);
@@ -7640,7 +7638,6 @@ static void load_channelvars(struct ast_variable *var)
#ifdef TEST_FRAMEWORK
static void test_suite_event_cb(void *data, struct stasis_subscription *sub,
- struct stasis_topic *topic,
struct stasis_message *message)
{
struct ast_test_suite_message_payload *payload;
@@ -7759,7 +7756,7 @@ static void manager_shutdown(void)
stasis_message_router_unsubscribe_and_join(stasis_router);
stasis_router = NULL;
}
- stasis_unsubscribe_and_join(rtp_topic_forwarder);
+ stasis_forward_cancel(rtp_topic_forwarder);
rtp_topic_forwarder = NULL;
ao2_cleanup(manager_topic);
manager_topic = NULL;
@@ -8344,7 +8341,7 @@ static int __init_manager(int reload, int by_external_config)
}
static void acl_change_stasis_cb(void *data, struct stasis_subscription *sub,
- struct stasis_topic *topic, struct stasis_message *message)
+ struct stasis_message *message)
{
if (stasis_message_type(message) != ast_named_acl_change_type()) {
return;
diff --git a/main/manager_bridges.c b/main/manager_bridges.c
index 77d9ff05e..fad676b56 100644
--- a/main/manager_bridges.c
+++ b/main/manager_bridges.c
@@ -106,7 +106,7 @@ static struct stasis_message_router *bridge_state_router;
/*! \brief The \ref stasis subscription returned by the forwarding of the channel topic
* to the manager topic
*/
-static struct stasis_subscription *topic_forwarder;
+static struct stasis_forward *topic_forwarder;
struct ast_str *ast_manager_build_bridge_state_string_prefix(
const struct ast_bridge_snapshot *snapshot,
@@ -180,7 +180,6 @@ bridge_snapshot_monitor bridge_monitors[] = {
};
static void bridge_snapshot_update(void *data, struct stasis_subscription *sub,
- struct stasis_topic *topic,
struct stasis_message *message)
{
RAII_VAR(struct ast_str *, bridge_event_string, NULL, ast_free);
@@ -221,7 +220,6 @@ static void bridge_snapshot_update(void *data, struct stasis_subscription *sub,
}
static void bridge_merge_cb(void *data, struct stasis_subscription *sub,
- struct stasis_topic *topic,
struct stasis_message *message)
{
struct ast_bridge_merge_message *merge_msg = stasis_message_data(message);
@@ -254,7 +252,6 @@ static void bridge_merge_cb(void *data, struct stasis_subscription *sub,
}
static void channel_enter_cb(void *data, struct stasis_subscription *sub,
- struct stasis_topic *topic,
struct stasis_message *message)
{
static const char *swap_name = "SwapUniqueid: ";
@@ -283,7 +280,6 @@ static void channel_enter_cb(void *data, struct stasis_subscription *sub,
}
static void channel_leave_cb(void *data, struct stasis_subscription *sub,
- struct stasis_topic *topic,
struct stasis_message *message)
{
struct ast_bridge_blob *blob = stasis_message_data(message);
@@ -456,7 +452,7 @@ static int manager_bridge_info(struct mansession *s, const struct message *m)
static void manager_bridging_cleanup(void)
{
- stasis_unsubscribe(topic_forwarder);
+ stasis_forward_cancel(topic_forwarder);
topic_forwarder = NULL;
}
diff --git a/main/manager_channels.c b/main/manager_channels.c
index 485841b69..0bebb216c 100644
--- a/main/manager_channels.c
+++ b/main/manager_channels.c
@@ -370,7 +370,7 @@ ASTERISK_FILE_VERSION(__FILE__, "$Revision$")
/*! \brief The \ref stasis subscription returned by the forwarding of the channel topic
* to the manager topic
*/
-static struct stasis_subscription *topic_forwarder;
+static struct stasis_forward *topic_forwarder;
struct ast_str *ast_manager_build_channel_state_string_prefix(
const struct ast_channel_snapshot *snapshot,
@@ -565,7 +565,6 @@ channel_snapshot_monitor channel_monitors[] = {
};
static void channel_snapshot_update(void *data, struct stasis_subscription *sub,
- struct stasis_topic *topic,
struct stasis_message *message)
{
RAII_VAR(struct ast_str *, channel_event_string, NULL, ast_free);
@@ -616,7 +615,7 @@ static int userevent_exclusion_cb(const char *key)
}
static void channel_user_event_cb(void *data, struct stasis_subscription *sub,
- struct stasis_topic *topic, struct stasis_message *message)
+ struct stasis_message *message)
{
struct ast_channel_blob *obj = stasis_message_data(message);
RAII_VAR(struct ast_str *, channel_event_string, NULL, ast_free);
@@ -667,7 +666,7 @@ static void publish_basic_channel_event(const char *event, int class, struct ast
}
static void channel_hangup_request_cb(void *data,
- struct stasis_subscription *sub, struct stasis_topic *topic,
+ struct stasis_subscription *sub,
struct stasis_message *message)
{
struct ast_channel_blob *obj = stasis_message_data(message);
@@ -707,7 +706,7 @@ static void channel_hangup_request_cb(void *data,
}
static void channel_chanspy_stop_cb(void *data, struct stasis_subscription *sub,
- struct stasis_topic *topic, struct stasis_message *message)
+ struct stasis_message *message)
{
RAII_VAR(struct ast_str *, spyer_channel_string, NULL, ast_free);
struct ast_channel_snapshot *spyer;
@@ -730,7 +729,7 @@ static void channel_chanspy_stop_cb(void *data, struct stasis_subscription *sub,
}
static void channel_chanspy_start_cb(void *data, struct stasis_subscription *sub,
- struct stasis_topic *topic, struct stasis_message *message)
+ struct stasis_message *message)
{
RAII_VAR(struct ast_str *, spyer_channel_string, NULL, ast_free);
RAII_VAR(struct ast_str *, spyee_channel_string, NULL, ast_free);
@@ -765,7 +764,7 @@ static void channel_chanspy_start_cb(void *data, struct stasis_subscription *sub
}
static void channel_dtmf_begin_cb(void *data, struct stasis_subscription *sub,
- struct stasis_topic *topic, struct stasis_message *message)
+ struct stasis_message *message)
{
struct ast_channel_blob *obj = stasis_message_data(message);
RAII_VAR(struct ast_str *, channel_event_string, NULL, ast_free);
@@ -806,7 +805,7 @@ static void channel_dtmf_begin_cb(void *data, struct stasis_subscription *sub,
}
static void channel_dtmf_end_cb(void *data, struct stasis_subscription *sub,
- struct stasis_topic *topic, struct stasis_message *message)
+ struct stasis_message *message)
{
struct ast_channel_blob *obj = stasis_message_data(message);
RAII_VAR(struct ast_str *, channel_event_string, NULL, ast_free);
@@ -853,7 +852,7 @@ static void channel_dtmf_end_cb(void *data, struct stasis_subscription *sub,
}
static void channel_hangup_handler_cb(void *data, struct stasis_subscription *sub,
- struct stasis_topic *topic, struct stasis_message *message)
+ struct stasis_message *message)
{
RAII_VAR(struct ast_str *, channel_event_string, NULL, ast_free);
struct ast_channel_blob *payload = stasis_message_data(message);
@@ -884,7 +883,7 @@ static void channel_hangup_handler_cb(void *data, struct stasis_subscription *su
}
static void channel_fax_cb(void *data, struct stasis_subscription *sub,
- struct stasis_topic *topic, struct stasis_message *message)
+ struct stasis_message *message)
{
RAII_VAR(struct ast_str *, channel_event_string, NULL, ast_free);
RAII_VAR(struct ast_str *, event_buffer, ast_str_create(256), ast_free);
@@ -957,7 +956,7 @@ static void channel_fax_cb(void *data, struct stasis_subscription *sub,
}
static void channel_moh_start_cb(void *data, struct stasis_subscription *sub,
- struct stasis_topic *topic, struct stasis_message *message)
+ struct stasis_message *message)
{
struct ast_channel_blob *payload = stasis_message_data(message);
struct ast_json *blob = payload->blob;
@@ -977,7 +976,7 @@ static void channel_moh_start_cb(void *data, struct stasis_subscription *sub,
}
static void channel_moh_stop_cb(void *data, struct stasis_subscription *sub,
- struct stasis_topic *topic, struct stasis_message *message)
+ struct stasis_message *message)
{
struct ast_channel_blob *payload = stasis_message_data(message);
@@ -985,7 +984,7 @@ static void channel_moh_stop_cb(void *data, struct stasis_subscription *sub,
}
static void channel_monitor_start_cb(void *data, struct stasis_subscription *sub,
- struct stasis_topic *topic, struct stasis_message *message)
+ struct stasis_message *message)
{
struct ast_channel_blob *payload = stasis_message_data(message);
@@ -993,7 +992,7 @@ static void channel_monitor_start_cb(void *data, struct stasis_subscription *sub
}
static void channel_monitor_stop_cb(void *data, struct stasis_subscription *sub,
- struct stasis_topic *topic, struct stasis_message *message)
+ struct stasis_message *message)
{
struct ast_channel_blob *payload = stasis_message_data(message);
@@ -1004,7 +1003,7 @@ static void channel_monitor_stop_cb(void *data, struct stasis_subscription *sub,
* \brief Callback processing messages for channel dialing
*/
static void channel_dial_cb(void *data, struct stasis_subscription *sub,
- struct stasis_topic *topic, struct stasis_message *message)
+ struct stasis_message *message)
{
struct ast_multi_channel_blob *obj = stasis_message_data(message);
const char *dialstatus;
@@ -1051,7 +1050,7 @@ static void channel_dial_cb(void *data, struct stasis_subscription *sub,
}
static void channel_hold_cb(void *data, struct stasis_subscription *sub,
- struct stasis_topic *topic, struct stasis_message *message)
+ struct stasis_message *message)
{
struct ast_channel_blob *obj = stasis_message_data(message);
const char *musicclass;
@@ -1083,7 +1082,7 @@ static void channel_hold_cb(void *data, struct stasis_subscription *sub,
}
static void channel_unhold_cb(void *data, struct stasis_subscription *sub,
- struct stasis_topic *topic, struct stasis_message *message)
+ struct stasis_message *message)
{
struct ast_channel_blob *obj = stasis_message_data(message);
RAII_VAR(struct ast_str *, channel_event_string, NULL, ast_free);
@@ -1100,7 +1099,7 @@ static void channel_unhold_cb(void *data, struct stasis_subscription *sub,
static void manager_channels_shutdown(void)
{
- stasis_unsubscribe(topic_forwarder);
+ stasis_forward_cancel(topic_forwarder);
topic_forwarder = NULL;
}
diff --git a/main/manager_endpoints.c b/main/manager_endpoints.c
index 634283728..b5f5b31c2 100644
--- a/main/manager_endpoints.c
+++ b/main/manager_endpoints.c
@@ -46,14 +46,9 @@ static void manager_endpoints_shutdown(void)
}
static void endpoint_state_cb(void *data, struct stasis_subscription *sub,
- struct stasis_topic *topic,
struct stasis_message *message)
{
- /* XXX This looks wrong. Nothing should post or forward to a caching
- * topic directly. Maybe ast_endpoint_topic() would be correct? I'd have
- * to dig to make sure I don't break anything, though.
- */
- stasis_forward_message(ast_manager_get_topic(), ast_endpoint_topic_all_cached(), message);
+ stasis_publish(ast_manager_get_topic(), message);
}
int manager_endpoints_init(void)
diff --git a/main/manager_mwi.c b/main/manager_mwi.c
index 12a3de361..849c315e1 100644
--- a/main/manager_mwi.c
+++ b/main/manager_mwi.c
@@ -41,7 +41,7 @@ struct stasis_message_router *mwi_state_router;
/*! \brief The \ref stasis subscription returned by the forwarding of the MWI topic
* to the manager topic
*/
-static struct stasis_subscription *topic_forwarder;
+static struct stasis_forward *topic_forwarder;
/*! \brief Callback function used by \ref mwi_app_event_cb to weed out "Event" keys */
static int exclude_event_cb(const char *key)
@@ -54,7 +54,6 @@ static int exclude_event_cb(const char *key)
/*! \brief Generic MWI event callback used for one-off events from voicemail modules */
static void mwi_app_event_cb(void *data, struct stasis_subscription *sub,
- struct stasis_topic *topic,
struct stasis_message *message)
{
struct ast_mwi_blob *payload = stasis_message_data(message);
@@ -86,7 +85,6 @@ static void mwi_app_event_cb(void *data, struct stasis_subscription *sub,
}
static void mwi_update_cb(void *data, struct stasis_subscription *sub,
- struct stasis_topic *topic,
struct stasis_message *message)
{
struct ast_mwi_state *mwi_state;
@@ -149,7 +147,7 @@ static void mwi_update_cb(void *data, struct stasis_subscription *sub,
static void manager_mwi_shutdown(void)
{
- stasis_unsubscribe(topic_forwarder);
+ stasis_forward_cancel(topic_forwarder);
topic_forwarder = NULL;
}
diff --git a/main/manager_system.c b/main/manager_system.c
index 4fef11da4..f4e7e9e0b 100644
--- a/main/manager_system.c
+++ b/main/manager_system.c
@@ -34,11 +34,11 @@ ASTERISK_FILE_VERSION(__FILE__, "$Revision$")
/*! \brief The \ref stasis subscription returned by the forwarding of the system topic
* to the manager topic
*/
-static struct stasis_subscription *topic_forwarder;
+static struct stasis_forward *topic_forwarder;
static void manager_system_shutdown(void)
{
- stasis_unsubscribe(topic_forwarder);
+ stasis_forward_cancel(topic_forwarder);
topic_forwarder = NULL;
}
diff --git a/main/pbx.c b/main/pbx.c
index 09f3d95ec..2a415401f 100644
--- a/main/pbx.c
+++ b/main/pbx.c
@@ -5111,7 +5111,7 @@ static void get_device_state_causing_channels(struct ao2_container *c)
ao2_iterator_destroy(&iter);
}
-static void device_state_cb(void *unused, struct stasis_subscription *sub, struct stasis_topic *topic, struct stasis_message *msg)
+static void device_state_cb(void *unused, struct stasis_subscription *sub, struct stasis_message *msg)
{
struct ast_device_state_message *dev_state;
struct ast_hint *hint;
@@ -11369,7 +11369,7 @@ static int pbx_builtin_sayphonetic(struct ast_channel *chan, const char *data)
return res;
}
-static void presence_state_cb(void *unused, struct stasis_subscription *sub, struct stasis_topic *topic, struct stasis_message *msg)
+static void presence_state_cb(void *unused, struct stasis_subscription *sub, struct stasis_message *msg)
{
struct ast_presence_state_message *presence_state = stasis_message_data(msg);
struct ast_hint *hint;
diff --git a/main/sem.c b/main/sem.c
new file mode 100644
index 000000000..e67d9c72e
--- /dev/null
+++ b/main/sem.c
@@ -0,0 +1,116 @@
+/*
+ * Asterisk -- An open source telephony toolkit.
+ *
+ * Copyright (C) 2013, Digium, Inc.
+ *
+ * David M. Lee, II <dlee@digium.com>
+ *
+ * See http://www.asterisk.org for more information about
+ * the Asterisk project. Please do not directly contact
+ * any of the maintainers of this project for assistance;
+ * the project provides a web site, mailing lists and IRC
+ * channels for your use.
+ *
+ * This program is free software, distributed under the terms of
+ * the GNU General Public License Version 2. See the LICENSE file
+ * at the top of the source tree.
+ */
+
+/*! \file
+ *
+ * \brief Asterisk semaphore support.
+ */
+
+#include "asterisk.h"
+
+ASTERISK_FILE_VERSION(__FILE__, "$Revision$")
+
+#include "asterisk/sem.h"
+#include "asterisk/utils.h"
+
+#ifndef HAS_WORKING_SEMAPHORE
+
+/* DIY semaphores! */
+
+int ast_sem_init(struct ast_sem *sem, int pshared, unsigned int value)
+{
+ if (pshared) {
+ /* Don't need it... yet */
+ errno = ENOSYS;
+ return -1;
+ }
+
+ /* Since value is unsigned, this will also catch attempts to init with
+ * a negative value */
+ if (value > AST_SEM_VALUE_MAX) {
+ errno = EINVAL;
+ return -1;
+ }
+
+ sem->count = value;
+ sem->waiters = 0;
+ ast_mutex_init(&sem->mutex);
+ ast_cond_init(&sem->cond, NULL);
+ return 0;
+}
+
+int ast_sem_destroy(struct ast_sem *sem)
+{
+ ast_mutex_destroy(&sem->mutex);
+ ast_cond_destroy(&sem->cond);
+ return 0;
+}
+
+int ast_sem_post(struct ast_sem *sem)
+{
+ SCOPED_MUTEX(lock, &sem->mutex);
+
+ ast_assert(sem->count >= 0);
+
+ if (sem->count == AST_SEM_VALUE_MAX) {
+ errno = EOVERFLOW;
+ return -1;
+ }
+
+ /* Give it up! */
+ ++sem->count;
+
+ /* Release a waiter, if needed */
+ if (sem->waiters) {
+ ast_cond_signal(&sem->cond);
+ }
+
+ return 0;
+}
+
+int ast_sem_wait(struct ast_sem *sem)
+{
+ SCOPED_MUTEX(lock, &sem->mutex);
+
+ ast_assert(sem->count >= 0);
+
+ /* Wait for a non-zero count */
+ ++sem->waiters;
+ while (sem->count == 0) {
+ ast_cond_wait(&sem->cond, &sem->mutex);
+ }
+ --sem->waiters;
+
+ /* Take it! */
+ --sem->count;
+
+ return 0;
+}
+
+int ast_sem_getvalue(struct ast_sem *sem, int *sval)
+{
+ SCOPED_MUTEX(lock, &sem->mutex);
+
+ ast_assert(sem->count >= 0);
+
+ *sval = sem->count;
+
+ return 0;
+}
+
+#endif
diff --git a/main/sounds_index.c b/main/sounds_index.c
index 9f70ef6cc..2fcd23908 100644
--- a/main/sounds_index.c
+++ b/main/sounds_index.c
@@ -281,7 +281,7 @@ static void sounds_cleanup(void)
}
static void format_update_cb(void *data, struct stasis_subscription *sub,
- struct stasis_topic *topic, struct stasis_message *message)
+ struct stasis_message *message)
{
ast_sounds_reindex();
}
diff --git a/main/stasis.c b/main/stasis.c
index 1a03bb3d4..42c901769 100644
--- a/main/stasis.c
+++ b/main/stasis.c
@@ -29,15 +29,15 @@
#include "asterisk.h"
-ASTERISK_FILE_VERSION(__FILE__, "$Revision$")
+ASTERISK_FILE_VERSION(__FILE__, "$Revision$");
#include "asterisk/astobj2.h"
#include "asterisk/stasis_internal.h"
#include "asterisk/stasis.h"
-#include "asterisk/threadpool.h"
#include "asterisk/taskprocessor.h"
#include "asterisk/utils.h"
#include "asterisk/uuid.h"
+#include "asterisk/vector.h"
/*!
* \page stasis-impl Stasis Implementation Notes
@@ -134,24 +134,23 @@ ASTERISK_FILE_VERSION(__FILE__, "$Revision$")
/*! The number of buckets to use for topic pools */
#define TOPIC_POOL_BUCKETS 57
-/*! Threadpool for dispatching notifications to subscribers */
-static struct ast_threadpool *pool;
-
STASIS_MESSAGE_TYPE_DEFN(stasis_subscription_change_type);
/*! \internal */
struct stasis_topic {
char *name;
/*! Variable length array of the subscribers */
- struct stasis_subscription **subscribers;
- /*! Allocated length of the subscribers array */
- size_t num_subscribers_max;
- /*! Current size of the subscribers array */
- size_t num_subscribers_current;
+ ast_vector(struct stasis_subscription *) subscribers;
+
+ /*! Topics forwarding into this topic */
+ ast_vector(struct stasis_topic *) upstream_topics;
};
/* Forward declarations for the tightly-coupled subscription object */
-static int topic_add_subscription(struct stasis_topic *topic, struct stasis_subscription *sub);
+static int topic_add_subscription(struct stasis_topic *topic,
+ struct stasis_subscription *sub);
+
+static int topic_remove_subscription(struct stasis_topic *topic, struct stasis_subscription *sub);
static void topic_dtor(void *obj)
{
@@ -159,16 +158,18 @@ static void topic_dtor(void *obj)
/* Subscribers hold a reference to topics, so they should all be
* unsubscribed before we get here. */
- ast_assert(topic->num_subscribers_current == 0);
+ ast_assert(ast_vector_size(topic->subscribers) == 0);
ast_free(topic->name);
topic->name = NULL;
- ast_free(topic->subscribers);
- topic->subscribers = NULL;
+
+ ast_vector_free(topic->subscribers);
+ ast_vector_free(topic->upstream_topics);
}
struct stasis_topic *stasis_topic_create(const char *name)
{
RAII_VAR(struct stasis_topic *, topic, NULL, ao2_cleanup);
+ int res = 0;
topic = ao2_alloc(sizeof(*topic), topic_dtor);
@@ -181,9 +182,10 @@ struct stasis_topic *stasis_topic_create(const char *name)
return NULL;
}
- topic->num_subscribers_max = INITIAL_SUBSCRIBERS_MAX;
- topic->subscribers = ast_calloc(topic->num_subscribers_max, sizeof(*topic->subscribers));
- if (!topic->subscribers) {
+ res |= ast_vector_init(topic->subscribers, INITIAL_SUBSCRIBERS_MAX);
+ res |= ast_vector_init(topic->upstream_topics, 0);
+
+ if (res != 0) {
return NULL;
}
@@ -247,7 +249,6 @@ static void subscription_dtor(void *obj)
* \param message Message to send.
*/
static void subscription_invoke(struct stasis_subscription *sub,
- struct stasis_topic *topic,
struct stasis_message *message)
{
/* Notify that the final message has been received */
@@ -258,7 +259,7 @@ static void subscription_invoke(struct stasis_subscription *sub,
}
/* Since sub is mostly immutable, no need to lock sub */
- sub->callback(sub->data, sub, topic, message);
+ sub->callback(sub->data, sub, message);
/* Notify that the final message has been processed */
if (stasis_subscription_final_message(sub, message)) {
@@ -268,7 +269,8 @@ static void subscription_invoke(struct stasis_subscription *sub,
}
}
-static void send_subscription_change_message(struct stasis_topic *topic, char *uniqueid, char *description);
+static void send_subscription_subscribe(struct stasis_topic *topic, struct stasis_subscription *sub);
+static void send_subscription_unsubscribe(struct stasis_topic *topic, struct stasis_subscription *sub);
struct stasis_subscription *internal_stasis_subscribe(
struct stasis_topic *topic,
@@ -286,10 +288,21 @@ struct stasis_subscription *internal_stasis_subscribe(
ast_uuid_generate_str(sub->uniqueid, sizeof(sub->uniqueid));
if (needs_mailbox) {
- sub->mailbox = ast_threadpool_serializer(sub->uniqueid, pool);
+ /* With a small number of subscribers, a thread-per-sub is
+ * acceptable. If our usage changes so that we have larger
+ * numbers of subscribers, we'll probably want to consider
+ * a threadpool. We had that originally, but with so few
+ * subscribers it was actually a performance loss instead of
+ * a gain.
+ */
+ sub->mailbox = ast_taskprocessor_get(sub->uniqueid,
+ TPS_REF_DEFAULT);
if (!sub->mailbox) {
return NULL;
}
+ ast_taskprocessor_set_local(sub->mailbox, sub);
+ /* Taskprocessor has a reference */
+ ao2_ref(sub, +1);
}
ao2_ref(topic, +1);
@@ -302,7 +315,7 @@ struct stasis_subscription *internal_stasis_subscribe(
if (topic_add_subscription(topic, sub) != 0) {
return NULL;
}
- send_subscription_change_message(topic, sub->uniqueid, "Subscribe");
+ send_subscription_subscribe(topic, sub);
ao2_ref(sub, +1);
return sub;
@@ -316,29 +329,42 @@ struct stasis_subscription *stasis_subscribe(
return internal_stasis_subscribe(topic, callback, data, 1);
}
+static int sub_cleanup(void *data)
+{
+ struct stasis_subscription *sub = data;
+ ao2_cleanup(sub);
+ return 0;
+}
+
struct stasis_subscription *stasis_unsubscribe(struct stasis_subscription *sub)
{
- if (sub) {
- size_t i;
- /* The subscription may be the last ref to this topic. Hold
- * the topic ref open until after the unlock. */
- RAII_VAR(struct stasis_topic *, topic, ao2_bump(sub->topic),
- ao2_cleanup);
- SCOPED_AO2LOCK(lock_topic, topic);
+ /* The subscription may be the last ref to this topic. Hold
+ * the topic ref open until after the unlock. */
+ RAII_VAR(struct stasis_topic *, topic,
+ ao2_bump(sub ? sub->topic : NULL), ao2_cleanup);
- for (i = 0; i < topic->num_subscribers_current; ++i) {
- if (topic->subscribers[i] == sub) {
- send_subscription_change_message(topic, sub->uniqueid, "Unsubscribe");
- /* swap [i] with last entry; remove last entry */
- topic->subscribers[i] = topic->subscribers[--topic->num_subscribers_current];
- /* Unsubscribing unrefs the subscription */
- ao2_cleanup(sub);
- return NULL;
- }
- }
+ if (!sub) {
+ return NULL;
+ }
+
+ /* We have to remove the subscription first, to ensure the unsubscribe
+ * is the final message */
+ if (topic_remove_subscription(sub->topic, sub) != 0) {
+ ast_log(LOG_ERROR,
+ "Internal error: subscription has invalid topic\n");
+ return NULL;
+ }
+
+ /* Now let everyone know about the unsubscribe */
+ send_subscription_unsubscribe(topic, sub);
- ast_log(LOG_ERROR, "Internal error: subscription has invalid topic\n");
+ /* When all that's done, remove the ref the mailbox has on the sub */
+ if (sub->mailbox) {
+ ast_taskprocessor_push(sub->mailbox, sub_cleanup, sub);
}
+
+ /* Unsubscribing unrefs the subscription */
+ ao2_cleanup(sub);
return NULL;
}
@@ -388,8 +414,8 @@ int stasis_subscription_is_subscribed(const struct stasis_subscription *sub)
struct stasis_topic *topic = sub->topic;
SCOPED_AO2LOCK(lock_topic, topic);
- for (i = 0; i < topic->num_subscribers_current; ++i) {
- if (topic->subscribers[i] == sub) {
+ for (i = 0; i < ast_vector_size(topic->subscribers); ++i) {
+ if (ast_vector_get(topic->subscribers, i) == sub) {
return 1;
}
}
@@ -431,74 +457,36 @@ int stasis_subscription_final_message(struct stasis_subscription *sub, struct st
*/
static int topic_add_subscription(struct stasis_topic *topic, struct stasis_subscription *sub)
{
- struct stasis_subscription **subscribers;
+ size_t idx;
SCOPED_AO2LOCK(lock, topic);
- /* Increase list size, if needed */
- if (topic->num_subscribers_current + 1 > topic->num_subscribers_max) {
- subscribers = realloc(topic->subscribers, 2 * topic->num_subscribers_max * sizeof(*subscribers));
- if (!subscribers) {
- return -1;
- }
- topic->subscribers = subscribers;
- topic->num_subscribers_max *= 2;
- }
-
/* The reference from the topic to the subscription is shared with
* the owner of the subscription, which will explicitly unsubscribe
* to release it.
*
* If we bumped the refcount here, the owner would have to unsubscribe
* and cleanup, which is a bit awkward. */
- topic->subscribers[topic->num_subscribers_current++] = sub;
- return 0;
-}
+ ast_vector_append(topic->subscribers, sub);
-/*!
- * \internal
- * \brief Information needed to dispatch a message to a subscription
- */
-struct dispatch {
- /*! Topic message was published to */
- struct stasis_topic *topic;
- /*! The message itself */
- struct stasis_message *message;
- /*! Subscription receiving the message */
- struct stasis_subscription *sub;
-};
+ for (idx = 0; idx < ast_vector_size(topic->upstream_topics); ++idx) {
+ topic_add_subscription(
+ ast_vector_get(topic->upstream_topics, idx), sub);
+ }
-static void dispatch_dtor(void *data)
-{
- struct dispatch *dispatch = data;
- ao2_cleanup(dispatch->topic);
- ao2_cleanup(dispatch->message);
- ao2_cleanup(dispatch->sub);
+ return 0;
}
-static struct dispatch *dispatch_create(struct stasis_topic *topic, struct stasis_message *message, struct stasis_subscription *sub)
+static int topic_remove_subscription(struct stasis_topic *topic, struct stasis_subscription *sub)
{
- RAII_VAR(struct dispatch *, dispatch, NULL, ao2_cleanup);
+ size_t idx;
+ SCOPED_AO2LOCK(lock_topic, topic);
- ast_assert(topic != NULL);
- ast_assert(message != NULL);
- ast_assert(sub != NULL);
-
- dispatch = ao2_alloc(sizeof(*dispatch), dispatch_dtor);
- if (!dispatch) {
- return NULL;
+ for (idx = 0; idx < ast_vector_size(topic->upstream_topics); ++idx) {
+ topic_remove_subscription(
+ ast_vector_get(topic->upstream_topics, idx), sub);
}
- dispatch->topic = topic;
- ao2_ref(topic, +1);
-
- dispatch->message = message;
- ao2_ref(message, +1);
-
- dispatch->sub = sub;
- ao2_ref(sub, +1);
-
- ao2_ref(dispatch, +1);
- return dispatch;
+ return ast_vector_remove_elem_unordered(topic->subscribers, sub);
}
/*!
@@ -506,16 +494,34 @@ static struct dispatch *dispatch_create(struct stasis_topic *topic, struct stasi
* \param data \ref dispatch object
* \return 0
*/
-static int dispatch_exec(void *data)
+static int dispatch_exec(struct ast_taskprocessor_local *local)
{
- RAII_VAR(struct dispatch *, dispatch, data, ao2_cleanup);
+ struct stasis_subscription *sub = local->local_data;
+ struct stasis_message *message = local->data;
- subscription_invoke(dispatch->sub, dispatch->topic, dispatch->message);
+ subscription_invoke(sub, message);
+ ao2_cleanup(message);
return 0;
}
-void stasis_forward_message(struct stasis_topic *_topic, struct stasis_topic *publisher_topic, struct stasis_message *message)
+static void dispatch_message(struct stasis_subscription *sub,
+ struct stasis_message *message)
+{
+ if (sub->mailbox) {
+ ao2_bump(message);
+ if (ast_taskprocessor_push_local(sub->mailbox, dispatch_exec, message) != 0) {
+ /* Push failed; ugh. */
+ ast_log(LOG_DEBUG, "Dropping dispatch\n");
+ ao2_cleanup(message);
+ }
+ } else {
+ /* Dispatch directly */
+ subscription_invoke(sub, message);
+ }
+}
+
+void stasis_publish(struct stasis_topic *_topic, struct stasis_message *message)
{
size_t i;
/* The topic may be unref'ed by the subscription invocation.
@@ -525,70 +531,104 @@ void stasis_forward_message(struct stasis_topic *_topic, struct stasis_topic *pu
SCOPED_AO2LOCK(lock, topic);
ast_assert(topic != NULL);
- ast_assert(publisher_topic != NULL);
ast_assert(message != NULL);
- for (i = 0; i < topic->num_subscribers_current; ++i) {
- struct stasis_subscription *sub = topic->subscribers[i];
+ for (i = 0; i < ast_vector_size(topic->subscribers); ++i) {
+ struct stasis_subscription *sub =
+ ast_vector_get(topic->subscribers, i);
ast_assert(sub != NULL);
- if (sub->mailbox) {
- RAII_VAR(struct dispatch *, dispatch, NULL, ao2_cleanup);
-
- dispatch = dispatch_create(publisher_topic, message, sub);
- if (!dispatch) {
- ast_log(LOG_DEBUG, "Dropping dispatch\n");
- break;
- }
-
- if (ast_taskprocessor_push(sub->mailbox, dispatch_exec, dispatch) == 0) {
- /* Ownership transferred to mailbox.
- * Don't increment ref, b/c the task processor
- * may have already gotten rid of the object.
- */
- dispatch = NULL;
- }
- } else {
- /* Dispatch directly */
- subscription_invoke(sub, publisher_topic, message);
- }
+ dispatch_message(sub, message);
}
}
-void stasis_publish(struct stasis_topic *topic, struct stasis_message *message)
+/*!
+ * \brief Forwarding information
+ *
+ * Any message posted to \a from_topic is forwarded to \a to_topic.
+ *
+ * In cases where both the \a from_topic and \a to_topic need to be locked,
+ * always lock the \a to_topic first, then the \a from_topic. Lest you deadlock.
+ */
+struct stasis_forward {
+ /*! Originating topic */
+ struct stasis_topic *from_topic;
+ /*! Destination topic */
+ struct stasis_topic *to_topic;
+};
+
+static void forward_dtor(void *obj)
{
- stasis_forward_message(topic, topic, message);
+ struct stasis_forward *forward = obj;
+
+ ao2_cleanup(forward->from_topic);
+ forward->from_topic = NULL;
+ ao2_cleanup(forward->to_topic);
+ forward->to_topic = NULL;
}
-/*! \brief Forwarding subscriber */
-static void stasis_forward_cb(void *data, struct stasis_subscription *sub, struct stasis_topic *topic, struct stasis_message *message)
+struct stasis_forward *stasis_forward_cancel(struct stasis_forward *forward)
{
- struct stasis_topic *to_topic = data;
- stasis_forward_message(to_topic, topic, message);
+ if (forward) {
+ int idx;
- if (stasis_subscription_final_message(sub, message)) {
- ao2_cleanup(to_topic);
+ struct stasis_topic *from = forward->from_topic;
+ struct stasis_topic *to = forward->to_topic;
+
+ SCOPED_AO2LOCK(to_lock, to);
+
+ ast_vector_remove_elem_unordered(to->upstream_topics, from);
+
+ ao2_lock(from);
+ for (idx = 0; idx < ast_vector_size(to->subscribers); ++idx) {
+ topic_remove_subscription(
+ from, ast_vector_get(to->subscribers, idx));
+ }
+ ao2_unlock(from);
}
+
+ ao2_cleanup(forward);
+
+ return NULL;
}
-struct stasis_subscription *stasis_forward_all(struct stasis_topic *from_topic, struct stasis_topic *to_topic)
+struct stasis_forward *stasis_forward_all(struct stasis_topic *from_topic,
+ struct stasis_topic *to_topic)
{
- struct stasis_subscription *sub;
+ RAII_VAR(struct stasis_forward *, forward, NULL, ao2_cleanup);
+
if (!from_topic || !to_topic) {
return NULL;
}
- /* Forwarding subscriptions should dispatch directly instead of having a
- * mailbox. Otherwise, messages forwarded to the same topic from
- * different topics may get reordered. Which is bad.
- */
- sub = internal_stasis_subscribe(from_topic, stasis_forward_cb, to_topic, 0);
- if (sub) {
- /* hold a ref to to_topic for this forwarding subscription */
- ao2_ref(to_topic, +1);
+ forward = ao2_alloc(sizeof(*forward), forward_dtor);
+ if (!forward) {
+ return NULL;
}
- return sub;
+
+ forward->from_topic = ao2_bump(from_topic);
+ forward->to_topic = ao2_bump(to_topic);
+
+ {
+ SCOPED_AO2LOCK(lock, to_topic);
+ int res;
+
+ res = ast_vector_append(to_topic->upstream_topics, from_topic);
+ if (res != 0) {
+ return NULL;
+ }
+
+ {
+ SCOPED_AO2LOCK(lock, from_topic);
+ size_t idx;
+ for (idx = 0; idx < ast_vector_size(to_topic->subscribers); ++idx) {
+ topic_add_subscription(from_topic, ast_vector_get(to_topic->subscribers, idx));
+ }
+ }
+ }
+
+ return ao2_bump(forward);
}
static void subscription_change_dtor(void *obj)
@@ -598,7 +638,7 @@ static void subscription_change_dtor(void *obj)
ao2_cleanup(change->topic);
}
-static struct stasis_subscription_change *subscription_change_alloc(struct stasis_topic *topic, char *uniqueid, char *description)
+static struct stasis_subscription_change *subscription_change_alloc(struct stasis_topic *topic, const char *uniqueid, const char *description)
{
RAII_VAR(struct stasis_subscription_change *, change, NULL, ao2_cleanup);
@@ -616,12 +656,15 @@ static struct stasis_subscription_change *subscription_change_alloc(struct stasi
return change;
}
-static void send_subscription_change_message(struct stasis_topic *topic, char *uniqueid, char *description)
+static void send_subscription_subscribe(struct stasis_topic *topic, struct stasis_subscription *sub)
{
RAII_VAR(struct stasis_subscription_change *, change, NULL, ao2_cleanup);
RAII_VAR(struct stasis_message *, msg, NULL, ao2_cleanup);
- change = subscription_change_alloc(topic, uniqueid, description);
+ /* This assumes that we have already unsubscribed */
+ ast_assert(stasis_subscription_is_subscribed(sub));
+
+ change = subscription_change_alloc(topic, sub->uniqueid, "Subscribe");
if (!change) {
return;
@@ -636,15 +679,42 @@ static void send_subscription_change_message(struct stasis_topic *topic, char *u
stasis_publish(topic, msg);
}
+static void send_subscription_unsubscribe(struct stasis_topic *topic,
+ struct stasis_subscription *sub)
+{
+ RAII_VAR(struct stasis_subscription_change *, change, NULL, ao2_cleanup);
+ RAII_VAR(struct stasis_message *, msg, NULL, ao2_cleanup);
+
+ /* This assumes that we have already unsubscribed */
+ ast_assert(!stasis_subscription_is_subscribed(sub));
+
+ change = subscription_change_alloc(topic, sub->uniqueid, "Unsubscribe");
+
+ if (!change) {
+ return;
+ }
+
+ msg = stasis_message_create(stasis_subscription_change_type(), change);
+
+ if (!msg) {
+ return;
+ }
+
+ stasis_publish(topic, msg);
+
+ /* Now we have to dispatch to the subscription itself */
+ dispatch_message(sub, msg);
+}
+
struct topic_pool_entry {
- struct stasis_subscription *forward;
+ struct stasis_forward *forward;
struct stasis_topic *topic;
};
static void topic_pool_entry_dtor(void *obj)
{
struct topic_pool_entry *entry = obj;
- entry->forward = stasis_unsubscribe(entry->forward);
+ entry->forward = stasis_forward_cancel(entry->forward);
ao2_cleanup(entry->topic);
entry->topic = NULL;
}
@@ -731,13 +801,6 @@ void stasis_log_bad_type_access(const char *name)
ast_log(LOG_ERROR, "Use of %s() before init/after destruction\n", name);
}
-/*! \brief Shutdown function */
-static void stasis_exit(void)
-{
- ast_threadpool_shutdown(pool);
- pool = NULL;
-}
-
/*! \brief Cleanup function for graceful shutdowns */
static void stasis_cleanup(void)
{
@@ -748,36 +811,14 @@ int stasis_init(void)
{
int cache_init;
- struct ast_threadpool_options opts;
-
/* Be sure the types are cleaned up after the message bus */
ast_register_cleanup(stasis_cleanup);
- ast_register_atexit(stasis_exit);
-
- if (stasis_config_init() != 0) {
- ast_log(LOG_ERROR, "Stasis configuration failed\n");
- return -1;
- }
if (stasis_wait_init() != 0) {
ast_log(LOG_ERROR, "Stasis initialization failed\n");
return -1;
}
- if (pool) {
- ast_log(LOG_ERROR, "Stasis double-initialized\n");
- return -1;
- }
-
- stasis_config_get_threadpool_options(&opts);
- ast_debug(3, "Creating Stasis threadpool: initial_size = %d, max_size = %d, idle_timeout_secs = %d\n",
- opts.initial_size, opts.max_size, opts.idle_timeout);
- pool = ast_threadpool_create("stasis-core", NULL, &opts);
- if (!pool) {
- ast_log(LOG_ERROR, "Stasis threadpool allocation failed\n");
- return -1;
- }
-
cache_init = stasis_cache_init();
if (cache_init != 0) {
return -1;
diff --git a/main/stasis_cache.c b/main/stasis_cache.c
index d4375520d..279210d5b 100644
--- a/main/stasis_cache.c
+++ b/main/stasis_cache.c
@@ -339,8 +339,6 @@ struct stasis_message *stasis_cache_clear_create(struct stasis_message *id_messa
static void stasis_cache_update_dtor(void *obj)
{
struct stasis_cache_update *update = obj;
- ao2_cleanup(update->topic);
- update->topic = NULL;
ao2_cleanup(update->old_snapshot);
update->old_snapshot = NULL;
ao2_cleanup(update->new_snapshot);
@@ -349,12 +347,11 @@ static void stasis_cache_update_dtor(void *obj)
update->type = NULL;
}
-static struct stasis_message *update_create(struct stasis_topic *topic, struct stasis_message *old_snapshot, struct stasis_message *new_snapshot)
+static struct stasis_message *update_create(struct stasis_message *old_snapshot, struct stasis_message *new_snapshot)
{
RAII_VAR(struct stasis_cache_update *, update, NULL, ao2_cleanup);
RAII_VAR(struct stasis_message *, msg, NULL, ao2_cleanup);
- ast_assert(topic != NULL);
ast_assert(old_snapshot != NULL || new_snapshot != NULL);
update = ao2_alloc_options(sizeof(*update), stasis_cache_update_dtor,
@@ -363,8 +360,6 @@ static struct stasis_message *update_create(struct stasis_topic *topic, struct s
return NULL;
}
- ao2_ref(topic, +1);
- update->topic = topic;
if (old_snapshot) {
ao2_ref(old_snapshot, +1);
update->old_snapshot = old_snapshot;
@@ -390,7 +385,7 @@ static struct stasis_message *update_create(struct stasis_topic *topic, struct s
}
static void caching_topic_exec(void *data, struct stasis_subscription *sub,
- struct stasis_topic *topic, struct stasis_message *message)
+ struct stasis_message *message)
{
RAII_VAR(struct stasis_caching_topic *, caching_topic_needs_unref, NULL, ao2_cleanup);
struct stasis_caching_topic *caching_topic = data;
@@ -418,7 +413,7 @@ static void caching_topic_exec(void *data, struct stasis_subscription *sub,
if (clear_id) {
old_snapshot = cache_put(caching_topic->cache, clear_type, clear_id, NULL);
if (old_snapshot) {
- update = update_create(topic, old_snapshot, NULL);
+ update = update_create(old_snapshot, NULL);
stasis_publish(caching_topic->topic, update);
return;
}
@@ -440,7 +435,7 @@ static void caching_topic_exec(void *data, struct stasis_subscription *sub,
old_snapshot = cache_put(caching_topic->cache, stasis_message_type(message), id, message);
- update = update_create(topic, old_snapshot, message);
+ update = update_create(old_snapshot, message);
if (update == NULL) {
return;
}
diff --git a/main/stasis_cache_pattern.c b/main/stasis_cache_pattern.c
index 381fdd989..9644028c3 100644
--- a/main/stasis_cache_pattern.c
+++ b/main/stasis_cache_pattern.c
@@ -39,15 +39,15 @@ struct stasis_cp_all {
struct stasis_topic *topic_cached;
struct stasis_cache *cache;
- struct stasis_subscription *forward_all_to_cached;
+ struct stasis_forward *forward_all_to_cached;
};
struct stasis_cp_single {
struct stasis_topic *topic;
struct stasis_caching_topic *topic_cached;
- struct stasis_subscription *forward_topic_to_all;
- struct stasis_subscription *forward_cached_to_all;
+ struct stasis_forward *forward_topic_to_all;
+ struct stasis_forward *forward_cached_to_all;
};
static void all_dtor(void *obj)
@@ -60,7 +60,7 @@ static void all_dtor(void *obj)
all->topic_cached = NULL;
ao2_cleanup(all->cache);
all->cache = NULL;
- stasis_unsubscribe_and_join(all->forward_all_to_cached);
+ stasis_forward_cancel(all->forward_all_to_cached);
all->forward_all_to_cached = NULL;
}
@@ -172,9 +172,9 @@ void stasis_cp_single_unsubscribe(struct stasis_cp_single *one)
return;
}
- stasis_unsubscribe(one->forward_topic_to_all);
+ stasis_forward_cancel(one->forward_topic_to_all);
one->forward_topic_to_all = NULL;
- stasis_unsubscribe(one->forward_cached_to_all);
+ stasis_forward_cancel(one->forward_cached_to_all);
one->forward_cached_to_all = NULL;
stasis_caching_unsubscribe(one->topic_cached);
one->topic_cached = NULL;
diff --git a/main/stasis_config.c b/main/stasis_config.c
deleted file mode 100644
index 006df51dd..000000000
--- a/main/stasis_config.c
+++ /dev/null
@@ -1,201 +0,0 @@
-/*
- * Asterisk -- An open source telephony toolkit.
- *
- * Copyright (C) 2013, Digium, Inc.
- *
- * David M. Lee, II <dlee@digium.com>
- *
- * See http://www.asterisk.org for more information about
- * the Asterisk project. Please do not directly contact
- * any of the maintainers of this project for assistance;
- * the project provides a web site, mailing lists and IRC
- * channels for your use.
- *
- * This program is free software, distributed under the terms of
- * the GNU General Public License Version 2. See the LICENSE file
- * at the top of the source tree.
- */
-
-/*! \file
- *
- * \brief Stasis Message Bus configuration API.
- *
- * \author David M. Lee, II <dlee@digium.com>
- */
-
-/*** MODULEINFO
- <support_level>core</support_level>
- ***/
-
-#include "asterisk.h"
-
-ASTERISK_FILE_VERSION(__FILE__, "$Revision$")
-
-#include "asterisk/config_options.h"
-#include "asterisk/stasis.h"
-#include "asterisk/threadpool.h"
-
-#include <limits.h>
-
-/*** DOCUMENTATION
- <configInfo name="stasis" language="en_US">
- <synopsis>Stasis message bus configuration.</synopsis>
- <configFile name="stasis.conf">
- <configObject name="threadpool">
- <synopsis>Threadpool configuration.</synopsis>
- <configOption name="initial_size" default="0">
- <synopsis>Initial number of threads in the message bus threadpool.</synopsis>
- </configOption>
- <configOption name="idle_timeout_sec" default="20">
- <synopsis>Number of seconds for an idle thread to be disposed of.</synopsis>
- </configOption>
- <configOption name="max_size" default="200">
- <synopsis>Maximum number of threads in the threadpool.</synopsis>
- </configOption>
- </configObject>
- </configFile>
- </configInfo>
- ***/
-
-/*! \brief Locking container for safe configuration access. */
-static AO2_GLOBAL_OBJ_STATIC(confs);
-
-struct stasis_threadpool_conf {
- int initial_size;
- int idle_timeout_sec;
- int max_size;
-};
-
-struct stasis_conf {
- struct stasis_threadpool_conf *threadpool;
-};
-
-/*! \brief Mapping of the stasis conf struct's globals to the
- * threadpool context in the config file. */
-static struct aco_type threadpool_option = {
- .type = ACO_GLOBAL,
- .name = "threadpool",
- .item_offset = offsetof(struct stasis_conf, threadpool),
- .category = "^threadpool$",
- .category_match = ACO_WHITELIST,
-};
-
-static struct aco_type *threadpool_options[] = ACO_TYPES(&threadpool_option);
-
-#define CONF_FILENAME "stasis.conf"
-
-/*! \brief The conf file that's processed for the module. */
-static struct aco_file conf_file = {
- /*! The config file name. */
- .filename = CONF_FILENAME,
- /*! The mapping object types to be processed. */
- .types = ACO_TYPES(&threadpool_option),
-};
-
-static void conf_dtor(void *obj)
-{
- struct stasis_conf *conf = obj;
-
- ao2_cleanup(conf->threadpool);
- conf->threadpool = NULL;
-}
-
-static void *conf_alloc(void)
-{
- RAII_VAR(struct stasis_conf *, conf, NULL, ao2_cleanup);
-
- conf = ao2_alloc_options(sizeof(*conf), conf_dtor,
- AO2_ALLOC_OPT_LOCK_NOLOCK);
- if (!conf) {
- return NULL;
- }
-
- conf->threadpool = ao2_alloc_options(sizeof(*conf->threadpool), NULL,
- AO2_ALLOC_OPT_LOCK_NOLOCK);
- if (!conf->threadpool) {
- return NULL;
- }
-
- aco_set_defaults(&threadpool_option, "threadpool", conf->threadpool);
-
- ao2_ref(conf, +1);
- return conf;
-}
-
-CONFIG_INFO_CORE("stasis", cfg_info, confs, conf_alloc,
- .files = ACO_FILES(&conf_file));
-
-void stasis_config_get_threadpool_options(
- struct ast_threadpool_options *threadpool_options)
-{
- RAII_VAR(struct stasis_conf *, conf, NULL, ao2_cleanup);
-
- conf = ao2_global_obj_ref(confs);
-
- ast_assert(conf && conf->threadpool);
-
- {
- struct ast_threadpool_options newopts = {
- .version = AST_THREADPOOL_OPTIONS_VERSION,
- .initial_size = conf->threadpool->initial_size,
- .auto_increment = 1,
- .idle_timeout = conf->threadpool->idle_timeout_sec,
- .max_size = conf->threadpool->max_size,
- };
-
- *threadpool_options = newopts;
- }
-}
-
-/*! \brief Load (or reload) configuration. */
-static int process_config(int reload)
-{
- RAII_VAR(struct stasis_conf *, conf, conf_alloc(), ao2_cleanup);
-
- switch (aco_process_config(&cfg_info, reload)) {
- case ACO_PROCESS_ERROR:
- if (conf && !reload
- && !aco_set_defaults(&threadpool_option, "threadpool", conf->threadpool)) {
- ast_log(AST_LOG_NOTICE, "Failed to process Stasis configuration; using defaults\n");
- ao2_global_obj_replace_unref(confs, conf);
- return 0;
- }
- return -1;
- case ACO_PROCESS_OK:
- case ACO_PROCESS_UNCHANGED:
- break;
- }
-
- return 0;
-}
-
-static void config_exit(void)
-{
- aco_info_destroy(&cfg_info);
- ao2_global_obj_release(confs);
-}
-
-int stasis_config_init(void)
-{
- if (aco_info_init(&cfg_info)) {
- aco_info_destroy(&cfg_info);
- return -1;
- }
-
- ast_register_atexit(config_exit);
-
- /* threadpool section */
- aco_option_register(&cfg_info, "initial_size", ACO_EXACT,
- threadpool_options, "0", OPT_INT_T, PARSE_IN_RANGE,
- FLDSET(struct stasis_threadpool_conf, initial_size), 0,
- INT_MAX);
- aco_option_register(&cfg_info, "idle_timeout_sec", ACO_EXACT,
- threadpool_options, "20", OPT_INT_T, PARSE_IN_RANGE,
- FLDSET(struct stasis_threadpool_conf, idle_timeout_sec), 0,
- INT_MAX);
- aco_option_register(&cfg_info, "max_size", ACO_EXACT,
- threadpool_options, "200", OPT_INT_T, PARSE_IN_RANGE,
- FLDSET(struct stasis_threadpool_conf, max_size), 0, INT_MAX);
-
- return process_config(0);
-}
diff --git a/main/stasis_message_router.c b/main/stasis_message_router.c
index 26d2f2c0c..8c82decfe 100644
--- a/main/stasis_message_router.c
+++ b/main/stasis_message_router.c
@@ -34,9 +34,6 @@ ASTERISK_FILE_VERSION(__FILE__, "$Revision$")
#include "asterisk/astobj2.h"
#include "asterisk/stasis_message_router.h"
-/*! Number of hash buckets for the route table. Keep it prime! */
-#define ROUTE_TABLE_BUCKETS 7
-
/*! \internal */
struct stasis_message_route {
/*! Message type handle by this route. */
@@ -47,29 +44,79 @@ struct stasis_message_route {
void *data;
};
-static void route_dtor(void *obj)
+struct route_table {
+ /*! Current number of entries in the route table */
+ size_t current_size;
+ /*! Allocated number of entires in the route table */
+ size_t max_size;
+ /*! The route table itself */
+ struct stasis_message_route routes[];
+};
+
+static struct stasis_message_route *table_find_route(struct route_table *table,
+ struct stasis_message_type *message_type)
{
- struct stasis_message_route *route = obj;
+ size_t idx;
+
+ /* While a linear search for routes may seem very inefficient, most
+ * route tables have six routes or less. For such small data, it's
+ * hard to beat a linear search. If we start having larger route
+ * tables, then we can look into containers with more efficient
+ * lookups.
+ */
+ for (idx = 0; idx < table->current_size; ++idx) {
+ if (table->routes[idx].message_type == message_type) {
+ return &table->routes[idx];
+ }
+ }
- ao2_cleanup(route->message_type);
- route->message_type = NULL;
+ return NULL;
}
-static int route_hash(const void *obj, const int flags)
+static int table_add_route(struct route_table **table_ptr,
+ struct stasis_message_type *message_type,
+ stasis_subscription_cb callback, void *data)
{
- const struct stasis_message_route *route = obj;
- const struct stasis_message_type *message_type = (flags & OBJ_KEY) ? obj : route->message_type;
+ struct route_table *table = *table_ptr;
+ struct stasis_message_route *route;
+
+ ast_assert(table_find_route(table, message_type) == NULL);
+
+ if (table->current_size + 1 > table->max_size) {
+ size_t new_max_size = table->max_size ? table->max_size * 2 : 1;
+ struct route_table *new_table = ast_realloc(table,
+ sizeof(*new_table) +
+ sizeof(new_table->routes[0]) * new_max_size);
+ if (!new_table) {
+ return -1;
+ }
+ *table_ptr = table = new_table;
+ table->max_size = new_max_size;
+ }
- return ast_str_hash(stasis_message_type_name(message_type));
+ route = &table->routes[table->current_size++];
+
+ route->message_type = ao2_bump(message_type);
+ route->callback = callback;
+ route->data = data;
+
+ return 0;
}
-static int route_cmp(void *obj, void *arg, int flags)
+static int table_remove_route(struct route_table *table,
+ struct stasis_message_type *message_type)
{
- const struct stasis_message_route *left = obj;
- const struct stasis_message_route *right = arg;
- const struct stasis_message_type *message_type = (flags & OBJ_KEY) ? arg : right->message_type;
-
- return (left->message_type == message_type) ? CMP_MATCH | CMP_STOP : 0;
+ size_t idx;
+
+ for (idx = 0; idx < table->current_size; ++idx) {
+ if (table->routes[idx].message_type == message_type) {
+ ao2_cleanup(message_type);
+ table->routes[idx] =
+ table->routes[--table->current_size];
+ return 0;
+ }
+ }
+ return -1;
}
/*! \internal */
@@ -77,11 +124,11 @@ struct stasis_message_router {
/*! Subscription to the upstream topic */
struct stasis_subscription *subscription;
/*! Subscribed routes */
- struct ao2_container *routes;
- /*! Subscribed routes for \ref stasi_cache_update messages */
- struct ao2_container *cache_routes;
+ struct route_table *routes;
+ /*! Subscribed routes for \ref stasis_cache_update messages */
+ struct route_table *cache_routes;
/*! Route of last resort */
- struct stasis_message_route *default_route;
+ struct stasis_message_route default_route;
};
static void router_dtor(void *obj)
@@ -92,66 +139,60 @@ static void router_dtor(void *obj)
ast_assert(stasis_subscription_is_done(router->subscription));
router->subscription = NULL;
- ao2_cleanup(router->routes);
+ ast_free(router->routes);
router->routes = NULL;
- ao2_cleanup(router->cache_routes);
+ ast_free(router->cache_routes);
router->cache_routes = NULL;
-
- ao2_cleanup(router->default_route);
- router->default_route = NULL;
}
-static struct stasis_message_route *find_route(
+static int find_route(
struct stasis_message_router *router,
- struct stasis_message *message)
+ struct stasis_message *message,
+ struct stasis_message_route *route_out)
{
- RAII_VAR(struct stasis_message_route *, route, NULL, ao2_cleanup);
+ struct stasis_message_route *route = NULL;
struct stasis_message_type *type = stasis_message_type(message);
SCOPED_AO2LOCK(lock, router);
+ ast_assert(route_out != NULL);
+
if (type == stasis_cache_update_type()) {
/* Find a cache route */
struct stasis_cache_update *update =
stasis_message_data(message);
- route = ao2_find(router->cache_routes, update->type, OBJ_KEY);
+ route = table_find_route(router->cache_routes, update->type);
}
if (route == NULL) {
/* Find a regular route */
- route = ao2_find(router->routes, type, OBJ_KEY);
+ route = table_find_route(router->routes, type);
}
- if (route == NULL) {
+ if (route == NULL && router->default_route.callback) {
/* Maybe the default route, then? */
- if ((route = router->default_route)) {
- ao2_ref(route, +1);
- }
+ route = &router->default_route;
}
- if (route == NULL) {
- return NULL;
+ if (!route) {
+ return -1;
}
- ao2_ref(route, +1);
- return route;
+ *route_out = *route;
+ return 0;
}
static void router_dispatch(void *data,
struct stasis_subscription *sub,
- struct stasis_topic *topic,
struct stasis_message *message)
{
struct stasis_message_router *router = data;
- RAII_VAR(struct stasis_message_route *, route, NULL, ao2_cleanup);
+ struct stasis_message_route route;
- route = find_route(router, message);
-
- if (route) {
- route->callback(route->data, sub, topic, message);
+ if (find_route(router, message, &route) == 0) {
+ route.callback(route.data, sub, message);
}
-
if (stasis_subscription_final_message(sub, message)) {
ao2_cleanup(router);
}
@@ -167,14 +208,12 @@ struct stasis_message_router *stasis_message_router_create(
return NULL;
}
- router->routes = ao2_container_alloc(ROUTE_TABLE_BUCKETS, route_hash,
- route_cmp);
+ router->routes = ast_calloc(1, sizeof(*router->routes));
if (!router->routes) {
return NULL;
}
- router->cache_routes = ao2_container_alloc(ROUTE_TABLE_BUCKETS,
- route_hash, route_cmp);
+ router->cache_routes = ast_calloc(1, sizeof(*router->cache_routes));
if (!router->cache_routes) {
return NULL;
}
@@ -216,100 +255,27 @@ int stasis_message_router_is_done(struct stasis_message_router *router)
return stasis_subscription_is_done(router->subscription);
}
-
-static struct stasis_message_route *route_create(
- struct stasis_message_type *message_type,
- stasis_subscription_cb callback,
- void *data)
-{
- RAII_VAR(struct stasis_message_route *, route, NULL, ao2_cleanup);
-
- route = ao2_alloc(sizeof(*route), route_dtor);
- if (!route) {
- return NULL;
- }
-
- if (message_type) {
- ao2_ref(message_type, +1);
- }
- route->message_type = message_type;
- route->callback = callback;
- route->data = data;
-
- ao2_ref(route, +1);
- return route;
-}
-
-static int add_route(struct stasis_message_router *router,
- struct stasis_message_route *route)
-{
- RAII_VAR(struct stasis_message_route *, existing_route, NULL, ao2_cleanup);
- SCOPED_AO2LOCK(lock, router);
-
- existing_route = ao2_find(router->routes, route->message_type, OBJ_KEY);
-
- if (existing_route) {
- ast_log(LOG_ERROR, "Cannot add route; route exists\n");
- return -1;
- }
-
- ao2_link(router->routes, route);
- return 0;
-}
-
-static int add_cache_route(struct stasis_message_router *router,
- struct stasis_message_route *route)
-{
- RAII_VAR(struct stasis_message_route *, existing_route, NULL, ao2_cleanup);
- SCOPED_AO2LOCK(lock, router);
-
- existing_route = ao2_find(router->cache_routes, route->message_type,
- OBJ_KEY);
-
- if (existing_route) {
- ast_log(LOG_ERROR, "Cannot add route; route exists\n");
- return -1;
- }
-
- ao2_link(router->cache_routes, route);
- return 0;
-}
-
int stasis_message_router_add(struct stasis_message_router *router,
struct stasis_message_type *message_type,
stasis_subscription_cb callback, void *data)
{
- RAII_VAR(struct stasis_message_route *, route, NULL, ao2_cleanup);
-
- route = route_create(message_type, callback, data);
- if (!route) {
- return -1;
- }
-
- return add_route(router, route);
+ SCOPED_AO2LOCK(lock, router);
+ return table_add_route(&router->routes, message_type, callback, data);
}
int stasis_message_router_add_cache_update(struct stasis_message_router *router,
struct stasis_message_type *message_type,
stasis_subscription_cb callback, void *data)
{
- RAII_VAR(struct stasis_message_route *, route, NULL, ao2_cleanup);
-
- route = route_create(message_type, callback, data);
- if (!route) {
- return -1;
- }
-
- return add_cache_route(router, route);
+ SCOPED_AO2LOCK(lock, router);
+ return table_add_route(&router->cache_routes, message_type, callback, data);
}
void stasis_message_router_remove(struct stasis_message_router *router,
struct stasis_message_type *message_type)
{
SCOPED_AO2LOCK(lock, router);
-
- ao2_find(router->routes, message_type,
- OBJ_UNLINK | OBJ_NODATA | OBJ_KEY);
+ table_remove_route(router->routes, message_type);
}
void stasis_message_router_remove_cache_update(
@@ -317,9 +283,7 @@ void stasis_message_router_remove_cache_update(
struct stasis_message_type *message_type)
{
SCOPED_AO2LOCK(lock, router);
-
- ao2_find(router->cache_routes, message_type,
- OBJ_UNLINK | OBJ_NODATA | OBJ_KEY);
+ table_remove_route(router->cache_routes, message_type);
}
int stasis_message_router_set_default(struct stasis_message_router *router,
@@ -327,7 +291,8 @@ int stasis_message_router_set_default(struct stasis_message_router *router,
void *data)
{
SCOPED_AO2LOCK(lock, router);
- ao2_cleanup(router->default_route);
- router->default_route = route_create(NULL, callback, data);
- return router->default_route ? 0 : -1;
+ router->default_route.callback = callback;
+ router->default_route.data = data;
+ /* While this implementation can never fail, it used to be able to */
+ return 0;
}
diff --git a/main/stasis_wait.c b/main/stasis_wait.c
index e94c686e1..32b59718c 100644
--- a/main/stasis_wait.c
+++ b/main/stasis_wait.c
@@ -55,7 +55,7 @@ static void caching_guarantee_dtor(void *obj)
}
static void guarantee_handler(void *data, struct stasis_subscription *sub,
- struct stasis_topic *topic, struct stasis_message *message)
+ struct stasis_message *message)
{
/* Wait for our particular message */
if (data == message) {
diff --git a/main/taskprocessor.c b/main/taskprocessor.c
index a8d1c80f9..189219d66 100644
--- a/main/taskprocessor.c
+++ b/main/taskprocessor.c
@@ -37,6 +37,7 @@ ASTERISK_FILE_VERSION(__FILE__, "$Revision$")
#include "asterisk/astobj2.h"
#include "asterisk/cli.h"
#include "asterisk/taskprocessor.h"
+#include "asterisk/sem.h"
/*!
* \brief tps_task structure is queued to a taskprocessor
@@ -47,11 +48,15 @@ ASTERISK_FILE_VERSION(__FILE__, "$Revision$")
*/
struct tps_task {
/*! \brief The execute() task callback function pointer */
- int (*execute)(void *datap);
+ union {
+ int (*execute)(void *datap);
+ int (*execute_local)(struct ast_taskprocessor_local *local);
+ } callback;
/*! \brief The data pointer for the task execute() function */
void *datap;
/*! \brief AST_LIST_ENTRY overhead */
AST_LIST_ENTRY(tps_task) list;
+ unsigned int wants_local:1;
};
/*! \brief tps_taskprocessor_stats maintain statistics for a taskprocessor. */
@@ -68,6 +73,7 @@ struct ast_taskprocessor {
const char *name;
/*! \brief Taskprocessor statistics */
struct tps_taskprocessor_stats *stats;
+ void *local_data;
/*! \brief Taskprocessor current queue size */
long tps_queue_size;
/*! \brief Taskprocessor queue */
@@ -113,9 +119,6 @@ static int tps_hash_cb(const void *obj, const int flags);
/*! \brief The astobj2 compare callback for taskprocessors */
static int tps_cmp_cb(void *obj, void *arg, int flags);
-/*! \brief The task processing function executed by a taskprocessor */
-static void *tps_processing_function(void *data);
-
/*! \brief Destroy the taskprocessor when its refcount reaches zero */
static void tps_taskprocessor_destroy(void *tps);
@@ -138,47 +141,56 @@ static struct ast_cli_entry taskprocessor_clis[] = {
struct default_taskprocessor_listener_pvt {
pthread_t poll_thread;
- ast_mutex_t lock;
- ast_cond_t cond;
- int wake_up;
int dead;
+ struct ast_sem sem;
};
-
-static void default_tps_wake_up(struct default_taskprocessor_listener_pvt *pvt, int should_die)
+static void default_listener_pvt_destroy(struct default_taskprocessor_listener_pvt *pvt)
{
- SCOPED_MUTEX(lock, &pvt->lock);
- pvt->wake_up = 1;
- pvt->dead = should_die;
- ast_cond_signal(&pvt->cond);
+ ast_assert(pvt->dead);
+ ast_sem_destroy(&pvt->sem);
+ ast_free(pvt);
}
-static int default_tps_idle(struct default_taskprocessor_listener_pvt *pvt)
+static void default_listener_pvt_dtor(struct ast_taskprocessor_listener *listener)
{
- SCOPED_MUTEX(lock, &pvt->lock);
- while (!pvt->wake_up) {
- ast_cond_wait(&pvt->cond, lock);
- }
- pvt->wake_up = 0;
- return pvt->dead;
+ struct default_taskprocessor_listener_pvt *pvt = listener->user_data;
+
+ default_listener_pvt_destroy(pvt);
+
+ listener->user_data = NULL;
}
/*!
* \brief Function that processes tasks in the taskprocessor
* \internal
*/
-static void *tps_processing_function(void *data)
+static void *default_tps_processing_function(void *data)
{
struct ast_taskprocessor_listener *listener = data;
struct ast_taskprocessor *tps = listener->tps;
struct default_taskprocessor_listener_pvt *pvt = listener->user_data;
- int dead = 0;
-
- while (!dead) {
- if (!ast_taskprocessor_execute(tps)) {
- dead = default_tps_idle(pvt);
+ int sem_value;
+ int res;
+
+ while (!pvt->dead) {
+ res = ast_sem_wait(&pvt->sem);
+ if (res != 0 && errno != EINTR) {
+ ast_log(LOG_ERROR, "ast_sem_wait(): %s\n",
+ strerror(errno));
+ /* Just give up */
+ break;
}
+ ast_taskprocessor_execute(tps);
}
+
+ /* No posting to a dead taskprocessor! */
+ res = ast_sem_getvalue(&pvt->sem, &sem_value);
+ ast_assert(res == 0 && sem_value == 0);
+
+ /* Free the shutdown reference (see default_listener_shutdown) */
+ ao2_t_ref(listener->tps, -1, "tps-shutdown");
+
return NULL;
}
@@ -186,7 +198,7 @@ static int default_listener_start(struct ast_taskprocessor_listener *listener)
{
struct default_taskprocessor_listener_pvt *pvt = listener->user_data;
- if (ast_pthread_create(&pvt->poll_thread, NULL, tps_processing_function, listener)) {
+ if (ast_pthread_create(&pvt->poll_thread, NULL, default_tps_processing_function, listener)) {
return -1;
}
@@ -197,33 +209,50 @@ static void default_task_pushed(struct ast_taskprocessor_listener *listener, int
{
struct default_taskprocessor_listener_pvt *pvt = listener->user_data;
- ast_assert(!pvt->dead);
-
- if (was_empty) {
- default_tps_wake_up(pvt, 0);
+ if (ast_sem_post(&pvt->sem) != 0) {
+ ast_log(LOG_ERROR, "Failed to notify of enqueued task: %s\n",
+ strerror(errno));
}
}
-static void default_listener_pvt_destroy(struct default_taskprocessor_listener_pvt *pvt)
+static int default_listener_die(void *data)
{
- ast_mutex_destroy(&pvt->lock);
- ast_cond_destroy(&pvt->cond);
- ast_free(pvt);
+ struct default_taskprocessor_listener_pvt *pvt = data;
+ pvt->dead = 1;
+ return 0;
}
static void default_listener_shutdown(struct ast_taskprocessor_listener *listener)
{
struct default_taskprocessor_listener_pvt *pvt = listener->user_data;
- default_tps_wake_up(pvt, 1);
- pthread_join(pvt->poll_thread, NULL);
+ int res;
+
+ /* Hold a reference during shutdown */
+ ao2_t_ref(listener->tps, +1, "tps-shutdown");
+
+ ast_taskprocessor_push(listener->tps, default_listener_die, pvt);
+
+ if (pthread_self() == pvt->poll_thread) {
+ res = pthread_detach(pvt->poll_thread);
+ if (res != 0) {
+ ast_log(LOG_ERROR, "pthread_detach(): %s\n",
+ strerror(errno));
+ }
+ } else {
+ res = pthread_join(pvt->poll_thread, NULL);
+ if (res != 0) {
+ ast_log(LOG_ERROR, "pthread_join(): %s\n",
+ strerror(errno));
+ }
+ }
pvt->poll_thread = AST_PTHREADT_NULL;
- default_listener_pvt_destroy(pvt);
}
static const struct ast_taskprocessor_listener_callbacks default_listener_callbacks = {
.start = default_listener_start,
.task_pushed = default_task_pushed,
.shutdown = default_listener_shutdown,
+ .dtor = default_listener_pvt_dtor,
};
/*!
@@ -258,19 +287,48 @@ int ast_tps_init(void)
static struct tps_task *tps_task_alloc(int (*task_exe)(void *datap), void *datap)
{
struct tps_task *t;
- if ((t = ast_calloc(1, sizeof(*t)))) {
- t->execute = task_exe;
- t->datap = datap;
+ if (!task_exe) {
+ ast_log(LOG_ERROR, "task_exe is NULL!\n");
+ return NULL;
+ }
+
+ t = ast_calloc(1, sizeof(*t));
+ if (!t) {
+ ast_log(LOG_ERROR, "failed to allocate task!\n");
+ return NULL;
+ }
+
+ t->callback.execute = task_exe;
+ t->datap = datap;
+
+ return t;
+}
+
+static struct tps_task *tps_task_alloc_local(int (*task_exe)(struct ast_taskprocessor_local *local), void *datap)
+{
+ struct tps_task *t;
+ if (!task_exe) {
+ ast_log(LOG_ERROR, "task_exe is NULL!\n");
+ return NULL;
+ }
+
+ t = ast_calloc(1, sizeof(*t));
+ if (!t) {
+ ast_log(LOG_ERROR, "failed to allocate task!\n");
+ return NULL;
}
+
+ t->callback.execute_local = task_exe;
+ t->datap = datap;
+ t->wants_local = 1;
+
return t;
}
/* release task resources */
static void *tps_task_free(struct tps_task *task)
{
- if (task) {
- ast_free(task);
- }
+ ast_free(task);
return NULL;
}
@@ -425,16 +483,10 @@ static void tps_taskprocessor_destroy(void *tps)
}
ast_debug(1, "destroying taskprocessor '%s'\n", t->name);
/* free it */
- if (t->stats) {
- ast_free(t->stats);
- t->stats = NULL;
- }
+ ast_free(t->stats);
+ t->stats = NULL;
ast_free((char *) t->name);
if (t->listener) {
- /* This code should not be reached since the listener
- * should have been destroyed before the taskprocessor could
- * be destroyed
- */
ao2_ref(t->listener, -1);
t->listener = NULL;
}
@@ -447,7 +499,6 @@ static void tps_taskprocessor_destroy(void *tps)
static struct tps_task *tps_taskprocessor_pop(struct ast_taskprocessor *tps)
{
struct tps_task *task;
- SCOPED_AO2LOCK(lock, tps);
if ((task = AST_LIST_REMOVE_HEAD(&tps->tps_queue, list))) {
tps->tps_queue_size--;
@@ -476,10 +527,21 @@ static void listener_shutdown(struct ast_taskprocessor_listener *listener)
ao2_ref(listener->tps, -1);
}
+static void taskprocessor_listener_dtor(void *obj)
+{
+ struct ast_taskprocessor_listener *listener = obj;
+
+ if (listener->callbacks->dtor) {
+ listener->callbacks->dtor(listener);
+ }
+}
+
struct ast_taskprocessor_listener *ast_taskprocessor_listener_alloc(const struct ast_taskprocessor_listener_callbacks *callbacks, void *user_data)
{
RAII_VAR(struct ast_taskprocessor_listener *, listener,
- ao2_alloc(sizeof(*listener), NULL), ao2_cleanup);
+ NULL, ao2_cleanup);
+
+ listener = ao2_alloc(sizeof(*listener), taskprocessor_listener_dtor);
if (!listener) {
return NULL;
@@ -510,9 +572,12 @@ static void *default_listener_pvt_alloc(void)
if (!pvt) {
return NULL;
}
- ast_cond_init(&pvt->cond, NULL);
- ast_mutex_init(&pvt->lock);
pvt->poll_thread = AST_PTHREADT_NULL;
+ if (ast_sem_init(&pvt->sem, 0, 0) != 0) {
+ ast_log(LOG_ERROR, "ast_sem_init(): %s\n", strerror(errno));
+ ast_free(pvt);
+ return NULL;
+ }
return pvt;
}
@@ -594,7 +659,6 @@ struct ast_taskprocessor *ast_taskprocessor_get(const char *name, enum ast_tps_o
p = __allocate_taskprocessor(name, listener);
if (!p) {
- default_listener_pvt_destroy(pvt);
ao2_ref(listener, -1);
return NULL;
}
@@ -615,6 +679,13 @@ struct ast_taskprocessor *ast_taskprocessor_create_with_listener(const char *nam
return __allocate_taskprocessor(name, listener);
}
+void ast_taskprocessor_set_local(struct ast_taskprocessor *tps,
+ void *local_data)
+{
+ SCOPED_AO2LOCK(lock, tps);
+ tps->local_data = local_data;
+}
+
/* decrement the taskprocessor reference count and unlink from the container if necessary */
void *ast_taskprocessor_unreference(struct ast_taskprocessor *tps)
{
@@ -636,20 +707,21 @@ void *ast_taskprocessor_unreference(struct ast_taskprocessor *tps)
}
/* push the task into the taskprocessor queue */
-int ast_taskprocessor_push(struct ast_taskprocessor *tps, int (*task_exe)(void *datap), void *datap)
+static int taskprocessor_push(struct ast_taskprocessor *tps, struct tps_task *t)
{
- struct tps_task *t;
int previous_size;
int was_empty;
- if (!tps || !task_exe) {
- ast_log(LOG_ERROR, "%s is missing!!\n", (tps) ? "task callback" : "taskprocessor");
+ if (!tps) {
+ ast_log(LOG_ERROR, "tps is NULL!\n");
return -1;
}
- if (!(t = tps_task_alloc(task_exe, datap))) {
- ast_log(LOG_ERROR, "failed to allocate task! Can't push to '%s'\n", tps->name);
+
+ if (!t) {
+ ast_log(LOG_ERROR, "t is NULL!\n");
return -1;
}
+
ao2_lock(tps);
AST_LIST_INSERT_TAIL(&tps->tps_queue, t, list);
previous_size = tps->tps_queue_size++;
@@ -660,21 +732,43 @@ int ast_taskprocessor_push(struct ast_taskprocessor *tps, int (*task_exe)(void *
return 0;
}
+int ast_taskprocessor_push(struct ast_taskprocessor *tps, int (*task_exe)(void *datap), void *datap)
+{
+ return taskprocessor_push(tps, tps_task_alloc(task_exe, datap));
+}
+
+int ast_taskprocessor_push_local(struct ast_taskprocessor *tps, int (*task_exe)(struct ast_taskprocessor_local *datap), void *datap)
+{
+ return taskprocessor_push(tps, tps_task_alloc_local(task_exe, datap));
+}
+
int ast_taskprocessor_execute(struct ast_taskprocessor *tps)
{
+ struct ast_taskprocessor_local local;
struct tps_task *t;
int size;
ao2_lock(tps);
+ t = tps_taskprocessor_pop(tps);
+ if (!t) {
+ ao2_unlock(tps);
+ return 0;
+ }
+
tps->executing = 1;
- ao2_unlock(tps);
- t = tps_taskprocessor_pop(tps);
+ if (t->wants_local) {
+ local.local_data = tps->local_data;
+ local.data = t->datap;
+ }
+ ao2_unlock(tps);
- if (t) {
- t->execute(t->datap);
- tps_task_free(t);
+ if (t->wants_local) {
+ t->callback.execute_local(&local);
+ } else {
+ t->callback.execute(t->datap);
}
+ tps_task_free(t);
ao2_lock(tps);
/* We need to check size in the same critical section where we reset the
@@ -684,7 +778,7 @@ int ast_taskprocessor_execute(struct ast_taskprocessor *tps)
tps->executing = 0;
size = tps_taskprocessor_depth(tps);
/* If we executed a task, bump the stats */
- if (t && tps->stats) {
+ if (tps->stats) {
tps->stats->_tasks_processed_count++;
if (size > tps->stats->max_qsize) {
tps->stats->max_qsize = size;
@@ -693,7 +787,7 @@ int ast_taskprocessor_execute(struct ast_taskprocessor *tps)
ao2_unlock(tps);
/* If we executed a task, check for the transition to empty */
- if (t && size == 0 && tps->listener->callbacks->emptied) {
+ if (size == 0 && tps->listener->callbacks->emptied) {
tps->listener->callbacks->emptied(tps->listener);
}
return size > 0;