summaryrefslogtreecommitdiff
path: root/main
diff options
context:
space:
mode:
Diffstat (limited to 'main')
-rw-r--r--main/astobj2.c29
-rw-r--r--main/ccss.c6
-rw-r--r--main/cdr.c22
-rw-r--r--main/cel.c26
-rw-r--r--main/channel.c11
-rw-r--r--main/channel_internal_api.c6
-rw-r--r--main/devicestate.c2
-rw-r--r--main/endpoints.c4
-rw-r--r--main/manager.c11
-rw-r--r--main/manager_bridges.c8
-rw-r--r--main/manager_channels.c35
-rw-r--r--main/manager_endpoints.c7
-rw-r--r--main/manager_mwi.c6
-rw-r--r--main/manager_system.c4
-rw-r--r--main/pbx.c4
-rw-r--r--main/sem.c116
-rw-r--r--main/sounds_index.c2
-rw-r--r--main/stasis.c389
-rw-r--r--main/stasis_cache.c13
-rw-r--r--main/stasis_cache_pattern.c12
-rw-r--r--main/stasis_config.c201
-rw-r--r--main/stasis_message_router.c233
-rw-r--r--main/stasis_wait.c2
-rw-r--r--main/taskprocessor.c234
24 files changed, 674 insertions, 709 deletions
diff --git a/main/astobj2.c b/main/astobj2.c
index 88a6a6a23..88801bd2f 100644
--- a/main/astobj2.c
+++ b/main/astobj2.c
@@ -478,38 +478,23 @@ static int internal_ao2_ref(void *user_data, int delta, const char *file, int li
ast_atomic_fetchadd_int(&ao2.total_objects, -1);
#endif
+ /* In case someone uses an object after it's been freed */
+ obj->priv_data.magic = 0;
+
switch (obj->priv_data.options & AO2_ALLOC_OPT_LOCK_MASK) {
case AO2_ALLOC_OPT_LOCK_MUTEX:
obj_mutex = INTERNAL_OBJ_MUTEX(user_data);
ast_mutex_destroy(&obj_mutex->mutex.lock);
- /*
- * For safety, zero-out the astobj2_lock header and also the
- * first word of the user-data, which we make sure is always
- * allocated.
- */
- memset(obj_mutex, '\0', sizeof(*obj_mutex) + sizeof(void *) );
ast_free(obj_mutex);
break;
case AO2_ALLOC_OPT_LOCK_RWLOCK:
obj_rwlock = INTERNAL_OBJ_RWLOCK(user_data);
ast_rwlock_destroy(&obj_rwlock->rwlock.lock);
- /*
- * For safety, zero-out the astobj2_rwlock header and also the
- * first word of the user-data, which we make sure is always
- * allocated.
- */
- memset(obj_rwlock, '\0', sizeof(*obj_rwlock) + sizeof(void *) );
ast_free(obj_rwlock);
break;
case AO2_ALLOC_OPT_LOCK_NOLOCK:
- /*
- * For safety, zero-out the astobj2 header and also the first
- * word of the user-data, which we make sure is always
- * allocated.
- */
- memset(obj, '\0', sizeof(*obj) + sizeof(void *) );
ast_free(obj);
break;
default:
@@ -575,14 +560,6 @@ static void *internal_ao2_alloc(size_t data_size, ao2_destructor_fn destructor_f
struct astobj2_lock *obj_mutex;
struct astobj2_rwlock *obj_rwlock;
- if (data_size < sizeof(void *)) {
- /*
- * We always alloc at least the size of a void *,
- * for debugging purposes.
- */
- data_size = sizeof(void *);
- }
-
switch (options & AO2_ALLOC_OPT_LOCK_MASK) {
case AO2_ALLOC_OPT_LOCK_MUTEX:
#if defined(__AST_DEBUG_MALLOC)
diff --git a/main/ccss.c b/main/ccss.c
index 061c45a7c..3068c6ffa 100644
--- a/main/ccss.c
+++ b/main/ccss.c
@@ -1397,7 +1397,7 @@ static void generic_monitor_instance_list_destructor(void *obj)
ast_free((char *)generic_list->device_name);
}
-static void generic_monitor_devstate_cb(void *userdata, struct stasis_subscription *sub, struct stasis_topic *topic, struct stasis_message *msg);
+static void generic_monitor_devstate_cb(void *userdata, struct stasis_subscription *sub, struct stasis_message *msg);
static struct generic_monitor_instance_list *create_new_generic_list(struct ast_cc_monitor *monitor)
{
struct generic_monitor_instance_list *generic_list = ao2_t_alloc(sizeof(*generic_list),
@@ -1471,7 +1471,7 @@ static int generic_monitor_devstate_tp_cb(void *data)
return 0;
}
-static void generic_monitor_devstate_cb(void *userdata, struct stasis_subscription *sub, struct stasis_topic *topic, struct stasis_message *msg)
+static void generic_monitor_devstate_cb(void *userdata, struct stasis_subscription *sub, struct stasis_message *msg)
{
/* Wow, it's cool that we've picked up on a state change, but we really want
* the actual work to be done in the core's taskprocessor execution thread
@@ -2750,7 +2750,7 @@ static int cc_generic_agent_stop_ringing(struct ast_cc_agent *agent)
return 0;
}
-static void generic_agent_devstate_cb(void *userdata, struct stasis_subscription *sub, struct stasis_topic *topic, struct stasis_message *msg)
+static void generic_agent_devstate_cb(void *userdata, struct stasis_subscription *sub, struct stasis_message *msg)
{
struct ast_cc_agent *agent = userdata;
enum ast_device_state new_state;
diff --git a/main/cdr.c b/main/cdr.c
index fb02d3350..ea0f9c0e4 100644
--- a/main/cdr.c
+++ b/main/cdr.c
@@ -334,13 +334,13 @@ static struct ao2_container *active_cdrs_by_channel;
static struct stasis_message_router *stasis_router;
/*! \brief Our subscription for bridges */
-static struct stasis_subscription *bridge_subscription;
+static struct stasis_forward *bridge_subscription;
/*! \brief Our subscription for channels */
-static struct stasis_subscription *channel_subscription;
+static struct stasis_forward *channel_subscription;
/*! \brief Our subscription for parking */
-static struct stasis_subscription *parking_subscription;
+static struct stasis_forward *parking_subscription;
/*! \brief The parent topic for all topics we want to aggregate for CDRs */
static struct stasis_topic *cdr_topic;
@@ -1839,7 +1839,7 @@ static int finalized_state_process_party_a(struct cdr_object *cdr, struct ast_ch
* \param topic The topic this message was published for
* \param message The message
*/
-static void handle_dial_message(void *data, struct stasis_subscription *sub, struct stasis_topic *topic, struct stasis_message *message)
+static void handle_dial_message(void *data, struct stasis_subscription *sub, struct stasis_message *message)
{
RAII_VAR(struct module_config *, mod_cfg, ao2_global_obj_ref(module_configs), ao2_cleanup);
RAII_VAR(struct cdr_object *, cdr, NULL, ao2_cleanup);
@@ -2020,7 +2020,7 @@ static int check_new_cdr_needed(struct ast_channel_snapshot *old_snapshot,
* \param topic The topic this message was published for
* \param message The message
*/
-static void handle_channel_cache_message(void *data, struct stasis_subscription *sub, struct stasis_topic *topic, struct stasis_message *message)
+static void handle_channel_cache_message(void *data, struct stasis_subscription *sub, struct stasis_message *message)
{
RAII_VAR(struct cdr_object *, cdr, NULL, ao2_cleanup);
RAII_VAR(struct module_config *, mod_cfg, ao2_global_obj_ref(module_configs), ao2_cleanup);
@@ -2150,7 +2150,7 @@ static int filter_bridge_messages(struct ast_bridge_snapshot *bridge)
* \param message The message - hopefully a bridge one!
*/
static void handle_bridge_leave_message(void *data, struct stasis_subscription *sub,
- struct stasis_topic *topic, struct stasis_message *message)
+ struct stasis_message *message)
{
struct ast_bridge_blob *update = stasis_message_data(message);
struct ast_bridge_snapshot *bridge = update->bridge;
@@ -2450,7 +2450,7 @@ static void handle_standard_bridge_enter_message(struct cdr_object *cdr,
* \param message The message - hopefully a bridge one!
*/
static void handle_bridge_enter_message(void *data, struct stasis_subscription *sub,
- struct stasis_topic *topic, struct stasis_message *message)
+ struct stasis_message *message)
{
struct ast_bridge_blob *update = stasis_message_data(message);
struct ast_bridge_snapshot *bridge = update->bridge;
@@ -2494,7 +2494,7 @@ static void handle_bridge_enter_message(void *data, struct stasis_subscription *
* \param message The message about who got parked
* */
static void handle_parked_call_message(void *data, struct stasis_subscription *sub,
- struct stasis_topic *topic, struct stasis_message *message)
+ struct stasis_message *message)
{
struct ast_parked_call_payload *payload = stasis_message_data(message);
struct ast_channel_snapshot *channel = payload->parkee;
@@ -3884,9 +3884,9 @@ static int process_config(int reload)
static void cdr_engine_cleanup(void)
{
- channel_subscription = stasis_unsubscribe_and_join(channel_subscription);
- bridge_subscription = stasis_unsubscribe_and_join(bridge_subscription);
- parking_subscription = stasis_unsubscribe_and_join(parking_subscription);
+ channel_subscription = stasis_forward_cancel(channel_subscription);
+ bridge_subscription = stasis_forward_cancel(bridge_subscription);
+ parking_subscription = stasis_forward_cancel(parking_subscription);
stasis_message_router_unsubscribe_and_join(stasis_router);
ao2_cleanup(cdr_topic);
cdr_topic = NULL;
diff --git a/main/cel.c b/main/cel.c
index 6050fac75..0d78b5cce 100644
--- a/main/cel.c
+++ b/main/cel.c
@@ -121,16 +121,16 @@ static struct stasis_topic *cel_topic;
static struct stasis_topic *cel_aggregation_topic;
/*! Subscription for forwarding the channel caching topic */
-static struct stasis_subscription *cel_channel_forwarder;
+static struct stasis_forward *cel_channel_forwarder;
/*! Subscription for forwarding the channel caching topic */
-static struct stasis_subscription *cel_bridge_forwarder;
+static struct stasis_forward *cel_bridge_forwarder;
/*! Subscription for forwarding the parking topic */
-static struct stasis_subscription *cel_parking_forwarder;
+static struct stasis_forward *cel_parking_forwarder;
/*! Subscription for forwarding the CEL-specific topic */
-static struct stasis_subscription *cel_cel_forwarder;
+static struct stasis_forward *cel_cel_forwarder;
struct stasis_message_type *cel_generic_type(void);
STASIS_MESSAGE_TYPE_DEFN(cel_generic_type);
@@ -1019,7 +1019,6 @@ static int cel_filter_channel_snapshot(struct ast_channel_snapshot *snapshot)
}
static void cel_snapshot_update_cb(void *data, struct stasis_subscription *sub,
- struct stasis_topic *topic,
struct stasis_message *message)
{
struct stasis_cache_update *update = stasis_message_data(message);
@@ -1082,7 +1081,6 @@ static struct ast_str *cel_generate_peer_str(
static void cel_bridge_enter_cb(
void *data, struct stasis_subscription *sub,
- struct stasis_topic *topic,
struct stasis_message *message)
{
struct ast_bridge_blob *blob = stasis_message_data(message);
@@ -1110,7 +1108,6 @@ static void cel_bridge_enter_cb(
static void cel_bridge_leave_cb(
void *data, struct stasis_subscription *sub,
- struct stasis_topic *topic,
struct stasis_message *message)
{
struct ast_bridge_blob *blob = stasis_message_data(message);
@@ -1138,7 +1135,6 @@ static void cel_bridge_leave_cb(
static void cel_parking_cb(
void *data, struct stasis_subscription *sub,
- struct stasis_topic *topic,
struct stasis_message *message)
{
struct ast_parked_call_payload *parked_payload = stasis_message_data(message);
@@ -1183,7 +1179,6 @@ static void save_dialstatus(struct ast_multi_channel_blob *blob)
}
static void cel_dial_cb(void *data, struct stasis_subscription *sub,
- struct stasis_topic *topic,
struct stasis_message *message)
{
struct ast_multi_channel_blob *blob = stasis_message_data(message);
@@ -1218,7 +1213,6 @@ static void cel_dial_cb(void *data, struct stasis_subscription *sub,
static void cel_generic_cb(
void *data, struct stasis_subscription *sub,
- struct stasis_topic *topic,
struct stasis_message *message)
{
struct ast_channel_blob *obj = stasis_message_data(message);
@@ -1241,7 +1235,6 @@ static void cel_generic_cb(
static void cel_blind_transfer_cb(
void *data, struct stasis_subscription *sub,
- struct stasis_topic *topic,
struct stasis_message *message)
{
struct ast_bridge_blob *obj = stasis_message_data(message);
@@ -1289,7 +1282,6 @@ static void cel_blind_transfer_cb(
static void cel_attended_transfer_cb(
void *data, struct stasis_subscription *sub,
- struct stasis_topic *topic,
struct stasis_message *message)
{
struct ast_attended_transfer_message *xfer = stasis_message_data(message);
@@ -1342,7 +1334,6 @@ static void cel_attended_transfer_cb(
static void cel_pickup_cb(
void *data, struct stasis_subscription *sub,
- struct stasis_topic *topic,
struct stasis_message *message)
{
struct ast_multi_channel_blob *obj = stasis_message_data(message);
@@ -1364,7 +1355,6 @@ static void cel_pickup_cb(
static void cel_local_cb(
void *data, struct stasis_subscription *sub,
- struct stasis_topic *topic,
struct stasis_message *message)
{
struct ast_multi_channel_blob *obj = stasis_message_data(message);
@@ -1394,10 +1384,10 @@ static void ast_cel_engine_term(void)
cel_aggregation_topic = NULL;
ao2_cleanup(cel_topic);
cel_topic = NULL;
- cel_channel_forwarder = stasis_unsubscribe_and_join(cel_channel_forwarder);
- cel_bridge_forwarder = stasis_unsubscribe_and_join(cel_bridge_forwarder);
- cel_parking_forwarder = stasis_unsubscribe_and_join(cel_parking_forwarder);
- cel_cel_forwarder = stasis_unsubscribe_and_join(cel_cel_forwarder);
+ cel_channel_forwarder = stasis_forward_cancel(cel_channel_forwarder);
+ cel_bridge_forwarder = stasis_forward_cancel(cel_bridge_forwarder);
+ cel_parking_forwarder = stasis_forward_cancel(cel_parking_forwarder);
+ cel_cel_forwarder = stasis_forward_cancel(cel_cel_forwarder);
ast_cli_unregister(&cli_status);
ao2_cleanup(cel_dialstatus_store);
cel_dialstatus_store = NULL;
diff --git a/main/channel.c b/main/channel.c
index 7ba3e6c39..e8ce5e0c5 100644
--- a/main/channel.c
+++ b/main/channel.c
@@ -7549,14 +7549,19 @@ struct varshead *ast_channel_get_manager_vars(struct ast_channel *chan)
RAII_VAR(struct ast_str *, tmp, NULL, ast_free);
struct manager_channel_variable *mcv;
- ret = ao2_alloc(sizeof(*ret), varshead_dtor);
- tmp = ast_str_create(16);
-
if (!ret || !tmp) {
return NULL;
}
AST_RWLIST_RDLOCK(&channelvars);
+
+ if (AST_LIST_EMPTY(&channelvars)) {
+ return NULL;
+ }
+
+ ret = ao2_alloc(sizeof(*ret), varshead_dtor);
+ tmp = ast_str_create(16);
+
AST_LIST_TRAVERSE(&channelvars, mcv, entry) {
const char *val = NULL;
struct ast_var_t *var;
diff --git a/main/channel_internal_api.c b/main/channel_internal_api.c
index 956816d76..de2cc9c71 100644
--- a/main/channel_internal_api.c
+++ b/main/channel_internal_api.c
@@ -207,8 +207,7 @@ struct ast_channel {
char sending_dtmf_digit; /*!< Digit this channel is currently sending out. (zero if not sending) */
struct timeval sending_dtmf_tv; /*!< The time this channel started sending the current digit. (Invalid if sending_dtmf_digit is zero.) */
struct stasis_cp_single *topics; /*!< Topic for all channel's events */
- struct stasis_subscription *forwarder; /*!< Subscription for event forwarding to all topic */
- struct stasis_subscription *endpoint_forward; /*!< Subscription for event forwarding to endpoint's topic */
+ struct stasis_forward *endpoint_forward; /*!< Subscription for event forwarding to endpoint's topic */
};
/*! \brief The monotonically increasing integer counter for channel uniqueids */
@@ -1429,8 +1428,7 @@ void ast_channel_internal_cleanup(struct ast_channel *chan)
ast_string_field_free_memory(chan);
- chan->forwarder = stasis_unsubscribe(chan->forwarder);
- chan->endpoint_forward = stasis_unsubscribe(chan->endpoint_forward);
+ chan->endpoint_forward = stasis_forward_cancel(chan->endpoint_forward);
stasis_cp_single_unsubscribe(chan->topics);
chan->topics = NULL;
diff --git a/main/devicestate.c b/main/devicestate.c
index bcf07ff0b..158d1f817 100644
--- a/main/devicestate.c
+++ b/main/devicestate.c
@@ -610,7 +610,7 @@ static int aggregate_state_changed(char *device, enum ast_device_state new_aggre
return 1;
}
-static void devstate_change_collector_cb(void *data, struct stasis_subscription *sub, struct stasis_topic *topic, struct stasis_message *msg)
+static void devstate_change_collector_cb(void *data, struct stasis_subscription *sub, struct stasis_message *msg)
{
enum ast_device_state aggregate_state;
char *device;
diff --git a/main/endpoints.c b/main/endpoints.c
index b33e33f1a..bdcf401ba 100644
--- a/main/endpoints.c
+++ b/main/endpoints.c
@@ -152,7 +152,7 @@ int ast_endpoint_add_channel(struct ast_endpoint *endpoint,
/*! \brief Handler for channel snapshot cache clears */
static void endpoint_cache_clear(void *data,
- struct stasis_subscription *sub, struct stasis_topic *topic,
+ struct stasis_subscription *sub,
struct stasis_message *message)
{
struct ast_endpoint *endpoint = data;
@@ -174,7 +174,7 @@ static void endpoint_cache_clear(void *data,
}
static void endpoint_default(void *data,
- struct stasis_subscription *sub, struct stasis_topic *topic,
+ struct stasis_subscription *sub,
struct stasis_message *message)
{
struct stasis_endpoint *endpoint = data;
diff --git a/main/manager.c b/main/manager.c
index 00649dafa..69def4b1f 100644
--- a/main/manager.c
+++ b/main/manager.c
@@ -1126,7 +1126,7 @@ static struct stasis_topic *manager_topic;
static struct stasis_message_router *stasis_router;
/*! \brief The \ref stasis_subscription for forwarding the RTP topic to the AMI topic */
-static struct stasis_subscription *rtp_topic_forwarder;
+static struct stasis_forward *rtp_topic_forwarder;
#define MGR_SHOW_TERMINAL_WIDTH 80
@@ -1151,7 +1151,7 @@ static const struct {
{{ "restart", "gracefully", NULL }},
};
-static void acl_change_stasis_cb(void *data, struct stasis_subscription *sub, struct stasis_topic *topic, struct stasis_message *message);
+static void acl_change_stasis_cb(void *data, struct stasis_subscription *sub, struct stasis_message *message);
static void acl_change_stasis_subscribe(void)
{
@@ -1427,7 +1427,6 @@ struct ast_str *ast_manager_str_from_json_object(struct ast_json *blob, key_excl
}
static void manager_default_msg_cb(void *data, struct stasis_subscription *sub,
- struct stasis_topic *topic,
struct stasis_message *message)
{
RAII_VAR(struct ast_manager_event_blob *, ev, NULL, ao2_cleanup);
@@ -1444,7 +1443,6 @@ static void manager_default_msg_cb(void *data, struct stasis_subscription *sub,
}
static void manager_generic_msg_cb(void *data, struct stasis_subscription *sub,
- struct stasis_topic *topic,
struct stasis_message *message)
{
struct ast_json_payload *payload = stasis_message_data(message);
@@ -7640,7 +7638,6 @@ static void load_channelvars(struct ast_variable *var)
#ifdef TEST_FRAMEWORK
static void test_suite_event_cb(void *data, struct stasis_subscription *sub,
- struct stasis_topic *topic,
struct stasis_message *message)
{
struct ast_test_suite_message_payload *payload;
@@ -7759,7 +7756,7 @@ static void manager_shutdown(void)
stasis_message_router_unsubscribe_and_join(stasis_router);
stasis_router = NULL;
}
- stasis_unsubscribe_and_join(rtp_topic_forwarder);
+ stasis_forward_cancel(rtp_topic_forwarder);
rtp_topic_forwarder = NULL;
ao2_cleanup(manager_topic);
manager_topic = NULL;
@@ -8344,7 +8341,7 @@ static int __init_manager(int reload, int by_external_config)
}
static void acl_change_stasis_cb(void *data, struct stasis_subscription *sub,
- struct stasis_topic *topic, struct stasis_message *message)
+ struct stasis_message *message)
{
if (stasis_message_type(message) != ast_named_acl_change_type()) {
return;
diff --git a/main/manager_bridges.c b/main/manager_bridges.c
index 77d9ff05e..fad676b56 100644
--- a/main/manager_bridges.c
+++ b/main/manager_bridges.c
@@ -106,7 +106,7 @@ static struct stasis_message_router *bridge_state_router;
/*! \brief The \ref stasis subscription returned by the forwarding of the channel topic
* to the manager topic
*/
-static struct stasis_subscription *topic_forwarder;
+static struct stasis_forward *topic_forwarder;
struct ast_str *ast_manager_build_bridge_state_string_prefix(
const struct ast_bridge_snapshot *snapshot,
@@ -180,7 +180,6 @@ bridge_snapshot_monitor bridge_monitors[] = {
};
static void bridge_snapshot_update(void *data, struct stasis_subscription *sub,
- struct stasis_topic *topic,
struct stasis_message *message)
{
RAII_VAR(struct ast_str *, bridge_event_string, NULL, ast_free);
@@ -221,7 +220,6 @@ static void bridge_snapshot_update(void *data, struct stasis_subscription *sub,
}
static void bridge_merge_cb(void *data, struct stasis_subscription *sub,
- struct stasis_topic *topic,
struct stasis_message *message)
{
struct ast_bridge_merge_message *merge_msg = stasis_message_data(message);
@@ -254,7 +252,6 @@ static void bridge_merge_cb(void *data, struct stasis_subscription *sub,
}
static void channel_enter_cb(void *data, struct stasis_subscription *sub,
- struct stasis_topic *topic,
struct stasis_message *message)
{
static const char *swap_name = "SwapUniqueid: ";
@@ -283,7 +280,6 @@ static void channel_enter_cb(void *data, struct stasis_subscription *sub,
}
static void channel_leave_cb(void *data, struct stasis_subscription *sub,
- struct stasis_topic *topic,
struct stasis_message *message)
{
struct ast_bridge_blob *blob = stasis_message_data(message);
@@ -456,7 +452,7 @@ static int manager_bridge_info(struct mansession *s, const struct message *m)
static void manager_bridging_cleanup(void)
{
- stasis_unsubscribe(topic_forwarder);
+ stasis_forward_cancel(topic_forwarder);
topic_forwarder = NULL;
}
diff --git a/main/manager_channels.c b/main/manager_channels.c
index 485841b69..0bebb216c 100644
--- a/main/manager_channels.c
+++ b/main/manager_channels.c
@@ -370,7 +370,7 @@ ASTERISK_FILE_VERSION(__FILE__, "$Revision$")
/*! \brief The \ref stasis subscription returned by the forwarding of the channel topic
* to the manager topic
*/
-static struct stasis_subscription *topic_forwarder;
+static struct stasis_forward *topic_forwarder;
struct ast_str *ast_manager_build_channel_state_string_prefix(
const struct ast_channel_snapshot *snapshot,
@@ -565,7 +565,6 @@ channel_snapshot_monitor channel_monitors[] = {
};
static void channel_snapshot_update(void *data, struct stasis_subscription *sub,
- struct stasis_topic *topic,
struct stasis_message *message)
{
RAII_VAR(struct ast_str *, channel_event_string, NULL, ast_free);
@@ -616,7 +615,7 @@ static int userevent_exclusion_cb(const char *key)
}
static void channel_user_event_cb(void *data, struct stasis_subscription *sub,
- struct stasis_topic *topic, struct stasis_message *message)
+ struct stasis_message *message)
{
struct ast_channel_blob *obj = stasis_message_data(message);
RAII_VAR(struct ast_str *, channel_event_string, NULL, ast_free);
@@ -667,7 +666,7 @@ static void publish_basic_channel_event(const char *event, int class, struct ast
}
static void channel_hangup_request_cb(void *data,
- struct stasis_subscription *sub, struct stasis_topic *topic,
+ struct stasis_subscription *sub,
struct stasis_message *message)
{
struct ast_channel_blob *obj = stasis_message_data(message);
@@ -707,7 +706,7 @@ static void channel_hangup_request_cb(void *data,
}
static void channel_chanspy_stop_cb(void *data, struct stasis_subscription *sub,
- struct stasis_topic *topic, struct stasis_message *message)
+ struct stasis_message *message)
{
RAII_VAR(struct ast_str *, spyer_channel_string, NULL, ast_free);
struct ast_channel_snapshot *spyer;
@@ -730,7 +729,7 @@ static void channel_chanspy_stop_cb(void *data, struct stasis_subscription *sub,
}
static void channel_chanspy_start_cb(void *data, struct stasis_subscription *sub,
- struct stasis_topic *topic, struct stasis_message *message)
+ struct stasis_message *message)
{
RAII_VAR(struct ast_str *, spyer_channel_string, NULL, ast_free);
RAII_VAR(struct ast_str *, spyee_channel_string, NULL, ast_free);
@@ -765,7 +764,7 @@ static void channel_chanspy_start_cb(void *data, struct stasis_subscription *sub
}
static void channel_dtmf_begin_cb(void *data, struct stasis_subscription *sub,
- struct stasis_topic *topic, struct stasis_message *message)
+ struct stasis_message *message)
{
struct ast_channel_blob *obj = stasis_message_data(message);
RAII_VAR(struct ast_str *, channel_event_string, NULL, ast_free);
@@ -806,7 +805,7 @@ static void channel_dtmf_begin_cb(void *data, struct stasis_subscription *sub,
}
static void channel_dtmf_end_cb(void *data, struct stasis_subscription *sub,
- struct stasis_topic *topic, struct stasis_message *message)
+ struct stasis_message *message)
{
struct ast_channel_blob *obj = stasis_message_data(message);
RAII_VAR(struct ast_str *, channel_event_string, NULL, ast_free);
@@ -853,7 +852,7 @@ static void channel_dtmf_end_cb(void *data, struct stasis_subscription *sub,
}
static void channel_hangup_handler_cb(void *data, struct stasis_subscription *sub,
- struct stasis_topic *topic, struct stasis_message *message)
+ struct stasis_message *message)
{
RAII_VAR(struct ast_str *, channel_event_string, NULL, ast_free);
struct ast_channel_blob *payload = stasis_message_data(message);
@@ -884,7 +883,7 @@ static void channel_hangup_handler_cb(void *data, struct stasis_subscription *su
}
static void channel_fax_cb(void *data, struct stasis_subscription *sub,
- struct stasis_topic *topic, struct stasis_message *message)
+ struct stasis_message *message)
{
RAII_VAR(struct ast_str *, channel_event_string, NULL, ast_free);
RAII_VAR(struct ast_str *, event_buffer, ast_str_create(256), ast_free);
@@ -957,7 +956,7 @@ static void channel_fax_cb(void *data, struct stasis_subscription *sub,
}
static void channel_moh_start_cb(void *data, struct stasis_subscription *sub,
- struct stasis_topic *topic, struct stasis_message *message)
+ struct stasis_message *message)
{
struct ast_channel_blob *payload = stasis_message_data(message);
struct ast_json *blob = payload->blob;
@@ -977,7 +976,7 @@ static void channel_moh_start_cb(void *data, struct stasis_subscription *sub,
}
static void channel_moh_stop_cb(void *data, struct stasis_subscription *sub,
- struct stasis_topic *topic, struct stasis_message *message)
+ struct stasis_message *message)
{
struct ast_channel_blob *payload = stasis_message_data(message);
@@ -985,7 +984,7 @@ static void channel_moh_stop_cb(void *data, struct stasis_subscription *sub,
}
static void channel_monitor_start_cb(void *data, struct stasis_subscription *sub,
- struct stasis_topic *topic, struct stasis_message *message)
+ struct stasis_message *message)
{
struct ast_channel_blob *payload = stasis_message_data(message);
@@ -993,7 +992,7 @@ static void channel_monitor_start_cb(void *data, struct stasis_subscription *sub
}
static void channel_monitor_stop_cb(void *data, struct stasis_subscription *sub,
- struct stasis_topic *topic, struct stasis_message *message)
+ struct stasis_message *message)
{
struct ast_channel_blob *payload = stasis_message_data(message);
@@ -1004,7 +1003,7 @@ static void channel_monitor_stop_cb(void *data, struct stasis_subscription *sub,
* \brief Callback processing messages for channel dialing
*/
static void channel_dial_cb(void *data, struct stasis_subscription *sub,
- struct stasis_topic *topic, struct stasis_message *message)
+ struct stasis_message *message)
{
struct ast_multi_channel_blob *obj = stasis_message_data(message);
const char *dialstatus;
@@ -1051,7 +1050,7 @@ static void channel_dial_cb(void *data, struct stasis_subscription *sub,
}
static void channel_hold_cb(void *data, struct stasis_subscription *sub,
- struct stasis_topic *topic, struct stasis_message *message)
+ struct stasis_message *message)
{
struct ast_channel_blob *obj = stasis_message_data(message);
const char *musicclass;
@@ -1083,7 +1082,7 @@ static void channel_hold_cb(void *data, struct stasis_subscription *sub,
}
static void channel_unhold_cb(void *data, struct stasis_subscription *sub,
- struct stasis_topic *topic, struct stasis_message *message)
+ struct stasis_message *message)
{
struct ast_channel_blob *obj = stasis_message_data(message);
RAII_VAR(struct ast_str *, channel_event_string, NULL, ast_free);
@@ -1100,7 +1099,7 @@ static void channel_unhold_cb(void *data, struct stasis_subscription *sub,
static void manager_channels_shutdown(void)
{
- stasis_unsubscribe(topic_forwarder);
+ stasis_forward_cancel(topic_forwarder);
topic_forwarder = NULL;
}
diff --git a/main/manager_endpoints.c b/main/manager_endpoints.c
index 634283728..b5f5b31c2 100644
--- a/main/manager_endpoints.c
+++ b/main/manager_endpoints.c
@@ -46,14 +46,9 @@ static void manager_endpoints_shutdown(void)
}
static void endpoint_state_cb(void *data, struct stasis_subscription *sub,
- struct stasis_topic *topic,
struct stasis_message *message)
{
- /* XXX This looks wrong. Nothing should post or forward to a caching
- * topic directly. Maybe ast_endpoint_topic() would be correct? I'd have
- * to dig to make sure I don't break anything, though.
- */
- stasis_forward_message(ast_manager_get_topic(), ast_endpoint_topic_all_cached(), message);
+ stasis_publish(ast_manager_get_topic(), message);
}
int manager_endpoints_init(void)
diff --git a/main/manager_mwi.c b/main/manager_mwi.c
index 12a3de361..849c315e1 100644
--- a/main/manager_mwi.c
+++ b/main/manager_mwi.c
@@ -41,7 +41,7 @@ struct stasis_message_router *mwi_state_router;
/*! \brief The \ref stasis subscription returned by the forwarding of the MWI topic
* to the manager topic
*/
-static struct stasis_subscription *topic_forwarder;
+static struct stasis_forward *topic_forwarder;
/*! \brief Callback function used by \ref mwi_app_event_cb to weed out "Event" keys */
static int exclude_event_cb(const char *key)
@@ -54,7 +54,6 @@ static int exclude_event_cb(const char *key)
/*! \brief Generic MWI event callback used for one-off events from voicemail modules */
static void mwi_app_event_cb(void *data, struct stasis_subscription *sub,
- struct stasis_topic *topic,
struct stasis_message *message)
{
struct ast_mwi_blob *payload = stasis_message_data(message);
@@ -86,7 +85,6 @@ static void mwi_app_event_cb(void *data, struct stasis_subscription *sub,
}
static void mwi_update_cb(void *data, struct stasis_subscription *sub,
- struct stasis_topic *topic,
struct stasis_message *message)
{
struct ast_mwi_state *mwi_state;
@@ -149,7 +147,7 @@ static void mwi_update_cb(void *data, struct stasis_subscription *sub,
static void manager_mwi_shutdown(void)
{
- stasis_unsubscribe(topic_forwarder);
+ stasis_forward_cancel(topic_forwarder);
topic_forwarder = NULL;
}
diff --git a/main/manager_system.c b/main/manager_system.c
index 4fef11da4..f4e7e9e0b 100644
--- a/main/manager_system.c
+++ b/main/manager_system.c
@@ -34,11 +34,11 @@ ASTERISK_FILE_VERSION(__FILE__, "$Revision$")
/*! \brief The \ref stasis subscription returned by the forwarding of the system topic
* to the manager topic
*/
-static struct stasis_subscription *topic_forwarder;
+static struct stasis_forward *topic_forwarder;
static void manager_system_shutdown(void)
{
- stasis_unsubscribe(topic_forwarder);
+ stasis_forward_cancel(topic_forwarder);
topic_forwarder = NULL;
}
diff --git a/main/pbx.c b/main/pbx.c
index 09f3d95ec..2a415401f 100644
--- a/main/pbx.c
+++ b/main/pbx.c
@@ -5111,7 +5111,7 @@ static void get_device_state_causing_channels(struct ao2_container *c)
ao2_iterator_destroy(&iter);
}
-static void device_state_cb(void *unused, struct stasis_subscription *sub, struct stasis_topic *topic, struct stasis_message *msg)
+static void device_state_cb(void *unused, struct stasis_subscription *sub, struct stasis_message *msg)
{
struct ast_device_state_message *dev_state;
struct ast_hint *hint;
@@ -11369,7 +11369,7 @@ static int pbx_builtin_sayphonetic(struct ast_channel *chan, const char *data)
return res;
}
-static void presence_state_cb(void *unused, struct stasis_subscription *sub, struct stasis_topic *topic, struct stasis_message *msg)
+static void presence_state_cb(void *unused, struct stasis_subscription *sub, struct stasis_message *msg)
{
struct ast_presence_state_message *presence_state = stasis_message_data(msg);
struct ast_hint *hint;
diff --git a/main/sem.c b/main/sem.c
new file mode 100644
index 000000000..e67d9c72e
--- /dev/null
+++ b/main/sem.c
@@ -0,0 +1,116 @@
+/*
+ * Asterisk -- An open source telephony toolkit.
+ *
+ * Copyright (C) 2013, Digium, Inc.
+ *
+ * David M. Lee, II <dlee@digium.com>
+ *
+ * See http://www.asterisk.org for more information about
+ * the Asterisk project. Please do not directly contact
+ * any of the maintainers of this project for assistance;
+ * the project provides a web site, mailing lists and IRC
+ * channels for your use.
+ *
+ * This program is free software, distributed under the terms of
+ * the GNU General Public License Version 2. See the LICENSE file
+ * at the top of the source tree.
+ */
+
+/*! \file
+ *
+ * \brief Asterisk semaphore support.
+ */
+
+#include "asterisk.h"
+
+ASTERISK_FILE_VERSION(__FILE__, "$Revision$")
+
+#include "asterisk/sem.h"
+#include "asterisk/utils.h"
+
+#ifndef HAS_WORKING_SEMAPHORE
+
+/* DIY semaphores! */
+
+int ast_sem_init(struct ast_sem *sem, int pshared, unsigned int value)
+{
+ if (pshared) {
+ /* Don't need it... yet */
+ errno = ENOSYS;
+ return -1;
+ }
+
+ /* Since value is unsigned, this will also catch attempts to init with
+ * a negative value */
+ if (value > AST_SEM_VALUE_MAX) {
+ errno = EINVAL;
+ return -1;
+ }
+
+ sem->count = value;
+ sem->waiters = 0;
+ ast_mutex_init(&sem->mutex);
+ ast_cond_init(&sem->cond, NULL);
+ return 0;
+}
+
+int ast_sem_destroy(struct ast_sem *sem)
+{
+ ast_mutex_destroy(&sem->mutex);
+ ast_cond_destroy(&sem->cond);
+ return 0;
+}
+
+int ast_sem_post(struct ast_sem *sem)
+{
+ SCOPED_MUTEX(lock, &sem->mutex);
+
+ ast_assert(sem->count >= 0);
+
+ if (sem->count == AST_SEM_VALUE_MAX) {
+ errno = EOVERFLOW;
+ return -1;
+ }
+
+ /* Give it up! */
+ ++sem->count;
+
+ /* Release a waiter, if needed */
+ if (sem->waiters) {
+ ast_cond_signal(&sem->cond);
+ }
+
+ return 0;
+}
+
+int ast_sem_wait(struct ast_sem *sem)
+{
+ SCOPED_MUTEX(lock, &sem->mutex);
+
+ ast_assert(sem->count >= 0);
+
+ /* Wait for a non-zero count */
+ ++sem->waiters;
+ while (sem->count == 0) {
+ ast_cond_wait(&sem->cond, &sem->mutex);
+ }
+ --sem->waiters;
+
+ /* Take it! */
+ --sem->count;
+
+ return 0;
+}
+
+int ast_sem_getvalue(struct ast_sem *sem, int *sval)
+{
+ SCOPED_MUTEX(lock, &sem->mutex);
+
+ ast_assert(sem->count >= 0);
+
+ *sval = sem->count;
+
+ return 0;
+}
+
+#endif
diff --git a/main/sounds_index.c b/main/sounds_index.c
index 9f70ef6cc..2fcd23908 100644
--- a/main/sounds_index.c
+++ b/main/sounds_index.c
@@ -281,7 +281,7 @@ static void sounds_cleanup(void)
}
static void format_update_cb(void *data, struct stasis_subscription *sub,
- struct stasis_topic *topic, struct stasis_message *message)
+ struct stasis_message *message)
{
ast_sounds_reindex();
}
diff --git a/main/stasis.c b/main/stasis.c
index 1a03bb3d4..42c901769 100644
--- a/main/stasis.c
+++ b/main/stasis.c
@@ -29,15 +29,15 @@
#include "asterisk.h"
-ASTERISK_FILE_VERSION(__FILE__, "$Revision$")
+ASTERISK_FILE_VERSION(__FILE__, "$Revision$");
#include "asterisk/astobj2.h"
#include "asterisk/stasis_internal.h"
#include "asterisk/stasis.h"
-#include "asterisk/threadpool.h"
#include "asterisk/taskprocessor.h"
#include "asterisk/utils.h"
#include "asterisk/uuid.h"
+#include "asterisk/vector.h"
/*!
* \page stasis-impl Stasis Implementation Notes
@@ -134,24 +134,23 @@ ASTERISK_FILE_VERSION(__FILE__, "$Revision$")
/*! The number of buckets to use for topic pools */
#define TOPIC_POOL_BUCKETS 57
-/*! Threadpool for dispatching notifications to subscribers */
-static struct ast_threadpool *pool;
-
STASIS_MESSAGE_TYPE_DEFN(stasis_subscription_change_type);
/*! \internal */
struct stasis_topic {
char *name;
/*! Variable length array of the subscribers */
- struct stasis_subscription **subscribers;
- /*! Allocated length of the subscribers array */
- size_t num_subscribers_max;
- /*! Current size of the subscribers array */
- size_t num_subscribers_current;
+ ast_vector(struct stasis_subscription *) subscribers;
+
+ /*! Topics forwarding into this topic */
+ ast_vector(struct stasis_topic *) upstream_topics;
};
/* Forward declarations for the tightly-coupled subscription object */
-static int topic_add_subscription(struct stasis_topic *topic, struct stasis_subscription *sub);
+static int topic_add_subscription(struct stasis_topic *topic,
+ struct stasis_subscription *sub);
+
+static int topic_remove_subscription(struct stasis_topic *topic, struct stasis_subscription *sub);
static void topic_dtor(void *obj)
{
@@ -159,16 +158,18 @@ static void topic_dtor(void *obj)
/* Subscribers hold a reference to topics, so they should all be
* unsubscribed before we get here. */
- ast_assert(topic->num_subscribers_current == 0);
+ ast_assert(ast_vector_size(topic->subscribers) == 0);
ast_free(topic->name);
topic->name = NULL;
- ast_free(topic->subscribers);
- topic->subscribers = NULL;
+
+ ast_vector_free(topic->subscribers);
+ ast_vector_free(topic->upstream_topics);
}
struct stasis_topic *stasis_topic_create(const char *name)
{
RAII_VAR(struct stasis_topic *, topic, NULL, ao2_cleanup);
+ int res = 0;
topic = ao2_alloc(sizeof(*topic), topic_dtor);
@@ -181,9 +182,10 @@ struct stasis_topic *stasis_topic_create(const char *name)
return NULL;
}
- topic->num_subscribers_max = INITIAL_SUBSCRIBERS_MAX;
- topic->subscribers = ast_calloc(topic->num_subscribers_max, sizeof(*topic->subscribers));
- if (!topic->subscribers) {
+ res |= ast_vector_init(topic->subscribers, INITIAL_SUBSCRIBERS_MAX);
+ res |= ast_vector_init(topic->upstream_topics, 0);
+
+ if (res != 0) {
return NULL;
}
@@ -247,7 +249,6 @@ static void subscription_dtor(void *obj)
* \param message Message to send.
*/
static void subscription_invoke(struct stasis_subscription *sub,
- struct stasis_topic *topic,
struct stasis_message *message)
{
/* Notify that the final message has been received */
@@ -258,7 +259,7 @@ static void subscription_invoke(struct stasis_subscription *sub,
}
/* Since sub is mostly immutable, no need to lock sub */
- sub->callback(sub->data, sub, topic, message);
+ sub->callback(sub->data, sub, message);
/* Notify that the final message has been processed */
if (stasis_subscription_final_message(sub, message)) {
@@ -268,7 +269,8 @@ static void subscription_invoke(struct stasis_subscription *sub,
}
}
-static void send_subscription_change_message(struct stasis_topic *topic, char *uniqueid, char *description);
+static void send_subscription_subscribe(struct stasis_topic *topic, struct stasis_subscription *sub);
+static void send_subscription_unsubscribe(struct stasis_topic *topic, struct stasis_subscription *sub);
struct stasis_subscription *internal_stasis_subscribe(
struct stasis_topic *topic,
@@ -286,10 +288,21 @@ struct stasis_subscription *internal_stasis_subscribe(
ast_uuid_generate_str(sub->uniqueid, sizeof(sub->uniqueid));
if (needs_mailbox) {
- sub->mailbox = ast_threadpool_serializer(sub->uniqueid, pool);
+ /* With a small number of subscribers, a thread-per-sub is
+ * acceptable. If our usage changes so that we have larger
+ * numbers of subscribers, we'll probably want to consider
+ * a threadpool. We had that originally, but with so few
+ * subscribers it was actually a performance loss instead of
+ * a gain.
+ */
+ sub->mailbox = ast_taskprocessor_get(sub->uniqueid,
+ TPS_REF_DEFAULT);
if (!sub->mailbox) {
return NULL;
}
+ ast_taskprocessor_set_local(sub->mailbox, sub);
+ /* Taskprocessor has a reference */
+ ao2_ref(sub, +1);
}
ao2_ref(topic, +1);
@@ -302,7 +315,7 @@ struct stasis_subscription *internal_stasis_subscribe(
if (topic_add_subscription(topic, sub) != 0) {
return NULL;
}
- send_subscription_change_message(topic, sub->uniqueid, "Subscribe");
+ send_subscription_subscribe(topic, sub);
ao2_ref(sub, +1);
return sub;
@@ -316,29 +329,42 @@ struct stasis_subscription *stasis_subscribe(
return internal_stasis_subscribe(topic, callback, data, 1);
}
+static int sub_cleanup(void *data)
+{
+ struct stasis_subscription *sub = data;
+ ao2_cleanup(sub);
+ return 0;
+}
+
struct stasis_subscription *stasis_unsubscribe(struct stasis_subscription *sub)
{
- if (sub) {
- size_t i;
- /* The subscription may be the last ref to this topic. Hold
- * the topic ref open until after the unlock. */
- RAII_VAR(struct stasis_topic *, topic, ao2_bump(sub->topic),
- ao2_cleanup);
- SCOPED_AO2LOCK(lock_topic, topic);
+ /* The subscription may be the last ref to this topic. Hold
+ * the topic ref open until after the unlock. */
+ RAII_VAR(struct stasis_topic *, topic,
+ ao2_bump(sub ? sub->topic : NULL), ao2_cleanup);
- for (i = 0; i < topic->num_subscribers_current; ++i) {
- if (topic->subscribers[i] == sub) {
- send_subscription_change_message(topic, sub->uniqueid, "Unsubscribe");
- /* swap [i] with last entry; remove last entry */
- topic->subscribers[i] = topic->subscribers[--topic->num_subscribers_current];
- /* Unsubscribing unrefs the subscription */
- ao2_cleanup(sub);
- return NULL;
- }
- }
+ if (!sub) {
+ return NULL;
+ }
+
+ /* We have to remove the subscription first, to ensure the unsubscribe
+ * is the final message */
+ if (topic_remove_subscription(sub->topic, sub) != 0) {
+ ast_log(LOG_ERROR,
+ "Internal error: subscription has invalid topic\n");
+ return NULL;
+ }
+
+ /* Now let everyone know about the unsubscribe */
+ send_subscription_unsubscribe(topic, sub);
- ast_log(LOG_ERROR, "Internal error: subscription has invalid topic\n");
+ /* When all that's done, remove the ref the mailbox has on the sub */
+ if (sub->mailbox) {
+ ast_taskprocessor_push(sub->mailbox, sub_cleanup, sub);
}
+
+ /* Unsubscribing unrefs the subscription */
+ ao2_cleanup(sub);
return NULL;
}
@@ -388,8 +414,8 @@ int stasis_subscription_is_subscribed(const struct stasis_subscription *sub)
struct stasis_topic *topic = sub->topic;
SCOPED_AO2LOCK(lock_topic, topic);
- for (i = 0; i < topic->num_subscribers_current; ++i) {
- if (topic->subscribers[i] == sub) {
+ for (i = 0; i < ast_vector_size(topic->subscribers); ++i) {
+ if (ast_vector_get(topic->subscribers, i) == sub) {
return 1;
}
}
@@ -431,74 +457,36 @@ int stasis_subscription_final_message(struct stasis_subscription *sub, struct st
*/
static int topic_add_subscription(struct stasis_topic *topic, struct stasis_subscription *sub)
{
- struct stasis_subscription **subscribers;
+ size_t idx;
SCOPED_AO2LOCK(lock, topic);
- /* Increase list size, if needed */
- if (topic->num_subscribers_current + 1 > topic->num_subscribers_max) {
- subscribers = realloc(topic->subscribers, 2 * topic->num_subscribers_max * sizeof(*subscribers));
- if (!subscribers) {
- return -1;
- }
- topic->subscribers = subscribers;
- topic->num_subscribers_max *= 2;
- }
-
/* The reference from the topic to the subscription is shared with
* the owner of the subscription, which will explicitly unsubscribe
* to release it.
*
* If we bumped the refcount here, the owner would have to unsubscribe
* and cleanup, which is a bit awkward. */
- topic->subscribers[topic->num_subscribers_current++] = sub;
- return 0;
-}
+ ast_vector_append(topic->subscribers, sub);
-/*!
- * \internal
- * \brief Information needed to dispatch a message to a subscription
- */
-struct dispatch {
- /*! Topic message was published to */
- struct stasis_topic *topic;
- /*! The message itself */
- struct stasis_message *message;
- /*! Subscription receiving the message */
- struct stasis_subscription *sub;
-};
+ for (idx = 0; idx < ast_vector_size(topic->upstream_topics); ++idx) {
+ topic_add_subscription(
+ ast_vector_get(topic->upstream_topics, idx), sub);
+ }
-static void dispatch_dtor(void *data)
-{
- struct dispatch *dispatch = data;
- ao2_cleanup(dispatch->topic);
- ao2_cleanup(dispatch->message);
- ao2_cleanup(dispatch->sub);
+ return 0;
}
-static struct dispatch *dispatch_create(struct stasis_topic *topic, struct stasis_message *message, struct stasis_subscription *sub)
+static int topic_remove_subscription(struct stasis_topic *topic, struct stasis_subscription *sub)
{
- RAII_VAR(struct dispatch *, dispatch, NULL, ao2_cleanup);
+ size_t idx;
+ SCOPED_AO2LOCK(lock_topic, topic);
- ast_assert(topic != NULL);
- ast_assert(message != NULL);
- ast_assert(sub != NULL);
-
- dispatch = ao2_alloc(sizeof(*dispatch), dispatch_dtor);
- if (!dispatch) {
- return NULL;
+ for (idx = 0; idx < ast_vector_size(topic->upstream_topics); ++idx) {
+ topic_remove_subscription(
+ ast_vector_get(topic->upstream_topics, idx), sub);
}
- dispatch->topic = topic;
- ao2_ref(topic, +1);
-
- dispatch->message = message;
- ao2_ref(message, +1);
-
- dispatch->sub = sub;
- ao2_ref(sub, +1);
-
- ao2_ref(dispatch, +1);
- return dispatch;
+ return ast_vector_remove_elem_unordered(topic->subscribers, sub);
}
/*!
@@ -506,16 +494,34 @@ static struct dispatch *dispatch_create(struct stasis_topic *topic, struct stasi
* \param data \ref dispatch object
* \return 0
*/
-static int dispatch_exec(void *data)
+static int dispatch_exec(struct ast_taskprocessor_local *local)
{
- RAII_VAR(struct dispatch *, dispatch, data, ao2_cleanup);
+ struct stasis_subscription *sub = local->local_data;
+ struct stasis_message *message = local->data;
- subscription_invoke(dispatch->sub, dispatch->topic, dispatch->message);
+ subscription_invoke(sub, message);
+ ao2_cleanup(message);
return 0;
}
-void stasis_forward_message(struct stasis_topic *_topic, struct stasis_topic *publisher_topic, struct stasis_message *message)
+static void dispatch_message(struct stasis_subscription *sub,
+ struct stasis_message *message)
+{
+ if (sub->mailbox) {
+ ao2_bump(message);
+ if (ast_taskprocessor_push_local(sub->mailbox, dispatch_exec, message) != 0) {
+ /* Push failed; ugh. */
+ ast_log(LOG_DEBUG, "Dropping dispatch\n");
+ ao2_cleanup(message);
+ }
+ } else {
+ /* Dispatch directly */
+ subscription_invoke(sub, message);
+ }
+}
+
+void stasis_publish(struct stasis_topic *_topic, struct stasis_message *message)
{
size_t i;
/* The topic may be unref'ed by the subscription invocation.
@@ -525,70 +531,104 @@ void stasis_forward_message(struct stasis_topic *_topic, struct stasis_topic *pu
SCOPED_AO2LOCK(lock, topic);
ast_assert(topic != NULL);
- ast_assert(publisher_topic != NULL);
ast_assert(message != NULL);
- for (i = 0; i < topic->num_subscribers_current; ++i) {
- struct stasis_subscription *sub = topic->subscribers[i];
+ for (i = 0; i < ast_vector_size(topic->subscribers); ++i) {
+ struct stasis_subscription *sub =
+ ast_vector_get(topic->subscribers, i);
ast_assert(sub != NULL);
- if (sub->mailbox) {
- RAII_VAR(struct dispatch *, dispatch, NULL, ao2_cleanup);
-
- dispatch = dispatch_create(publisher_topic, message, sub);
- if (!dispatch) {
- ast_log(LOG_DEBUG, "Dropping dispatch\n");
- break;
- }
-
- if (ast_taskprocessor_push(sub->mailbox, dispatch_exec, dispatch) == 0) {
- /* Ownership transferred to mailbox.
- * Don't increment ref, b/c the task processor
- * may have already gotten rid of the object.
- */
- dispatch = NULL;
- }
- } else {
- /* Dispatch directly */
- subscription_invoke(sub, publisher_topic, message);
- }
+ dispatch_message(sub, message);
}
}
-void stasis_publish(struct stasis_topic *topic, struct stasis_message *message)
+/*!
+ * \brief Forwarding information
+ *
+ * Any message posted to \a from_topic is forwarded to \a to_topic.
+ *
+ * In cases where both the \a from_topic and \a to_topic need to be locked,
+ * always lock the \a to_topic first, then the \a from_topic. Lest you deadlock.
+ */
+struct stasis_forward {
+ /*! Originating topic */
+ struct stasis_topic *from_topic;
+ /*! Destination topic */
+ struct stasis_topic *to_topic;
+};
+
+static void forward_dtor(void *obj)
{
- stasis_forward_message(topic, topic, message);
+ struct stasis_forward *forward = obj;
+
+ ao2_cleanup(forward->from_topic);
+ forward->from_topic = NULL;
+ ao2_cleanup(forward->to_topic);
+ forward->to_topic = NULL;
}
-/*! \brief Forwarding subscriber */
-static void stasis_forward_cb(void *data, struct stasis_subscription *sub, struct stasis_topic *topic, struct stasis_message *message)
+struct stasis_forward *stasis_forward_cancel(struct stasis_forward *forward)
{
- struct stasis_topic *to_topic = data;
- stasis_forward_message(to_topic, topic, message);
+ if (forward) {
+ int idx;
- if (stasis_subscription_final_message(sub, message)) {
- ao2_cleanup(to_topic);
+ struct stasis_topic *from = forward->from_topic;
+ struct stasis_topic *to = forward->to_topic;
+
+ SCOPED_AO2LOCK(to_lock, to);
+
+ ast_vector_remove_elem_unordered(to->upstream_topics, from);
+
+ ao2_lock(from);
+ for (idx = 0; idx < ast_vector_size(to->subscribers); ++idx) {
+ topic_remove_subscription(
+ from, ast_vector_get(to->subscribers, idx));
+ }
+ ao2_unlock(from);
}
+
+ ao2_cleanup(forward);
+
+ return NULL;
}
-struct stasis_subscription *stasis_forward_all(struct stasis_topic *from_topic, struct stasis_topic *to_topic)
+struct stasis_forward *stasis_forward_all(struct stasis_topic *from_topic,
+ struct stasis_topic *to_topic)
{
- struct stasis_subscription *sub;
+ RAII_VAR(struct stasis_forward *, forward, NULL, ao2_cleanup);
+
if (!from_topic || !to_topic) {
return NULL;
}
- /* Forwarding subscriptions should dispatch directly instead of having a
- * mailbox. Otherwise, messages forwarded to the same topic from
- * different topics may get reordered. Which is bad.
- */
- sub = internal_stasis_subscribe(from_topic, stasis_forward_cb, to_topic, 0);
- if (sub) {
- /* hold a ref to to_topic for this forwarding subscription */
- ao2_ref(to_topic, +1);
+ forward = ao2_alloc(sizeof(*forward), forward_dtor);
+ if (!forward) {
+ return NULL;
}
- return sub;
+
+ forward->from_topic = ao2_bump(from_topic);
+ forward->to_topic = ao2_bump(to_topic);
+
+ {
+ SCOPED_AO2LOCK(lock, to_topic);
+ int res;
+
+ res = ast_vector_append(to_topic->upstream_topics, from_topic);
+ if (res != 0) {
+ return NULL;
+ }
+
+ {
+ SCOPED_AO2LOCK(lock, from_topic);
+ size_t idx;
+ for (idx = 0; idx < ast_vector_size(to_topic->subscribers); ++idx) {
+ topic_add_subscription(from_topic, ast_vector_get(to_topic->subscribers, idx));
+ }
+ }
+ }
+
+ return ao2_bump(forward);
}
static void subscription_change_dtor(void *obj)
@@ -598,7 +638,7 @@ static void subscription_change_dtor(void *obj)
ao2_cleanup(change->topic);
}
-static struct stasis_subscription_change *subscription_change_alloc(struct stasis_topic *topic, char *uniqueid, char *description)
+static struct stasis_subscription_change *subscription_change_alloc(struct stasis_topic *topic, const char *uniqueid, const char *description)
{
RAII_VAR(struct stasis_subscription_change *, change, NULL, ao2_cleanup);
@@ -616,12 +656,15 @@ static struct stasis_subscription_change *subscription_change_alloc(struct stasi
return change;
}
-static void send_subscription_change_message(struct stasis_topic *topic, char *uniqueid, char *description)
+static void send_subscription_subscribe(struct stasis_topic *topic, struct stasis_subscription *sub)
{
RAII_VAR(struct stasis_subscription_change *, change, NULL, ao2_cleanup);
RAII_VAR(struct stasis_message *, msg, NULL, ao2_cleanup);
- change = subscription_change_alloc(topic, uniqueid, description);
+ /* This assumes that we have already unsubscribed */
+ ast_assert(stasis_subscription_is_subscribed(sub));
+
+ change = subscription_change_alloc(topic, sub->uniqueid, "Subscribe");
if (!change) {
return;
@@ -636,15 +679,42 @@ static void send_subscription_change_message(struct stasis_topic *topic, char *u
stasis_publish(topic, msg);
}
+static void send_subscription_unsubscribe(struct stasis_topic *topic,
+ struct stasis_subscription *sub)
+{
+ RAII_VAR(struct stasis_subscription_change *, change, NULL, ao2_cleanup);
+ RAII_VAR(struct stasis_message *, msg, NULL, ao2_cleanup);
+
+ /* This assumes that we have already unsubscribed */
+ ast_assert(!stasis_subscription_is_subscribed(sub));
+
+ change = subscription_change_alloc(topic, sub->uniqueid, "Unsubscribe");
+
+ if (!change) {
+ return;
+ }
+
+ msg = stasis_message_create(stasis_subscription_change_type(), change);
+
+ if (!msg) {
+ return;
+ }
+
+ stasis_publish(topic, msg);
+
+ /* Now we have to dispatch to the subscription itself */
+ dispatch_message(sub, msg);
+}
+
struct topic_pool_entry {
- struct stasis_subscription *forward;
+ struct stasis_forward *forward;
struct stasis_topic *topic;
};
static void topic_pool_entry_dtor(void *obj)
{
struct topic_pool_entry *entry = obj;
- entry->forward = stasis_unsubscribe(entry->forward);
+ entry->forward = stasis_forward_cancel(entry->forward);
ao2_cleanup(entry->topic);
entry->topic = NULL;
}
@@ -731,13 +801,6 @@ void stasis_log_bad_type_access(const char *name)
ast_log(LOG_ERROR, "Use of %s() before init/after destruction\n", name);
}
-/*! \brief Shutdown function */
-static void stasis_exit(void)
-{
- ast_threadpool_shutdown(pool);
- pool = NULL;
-}
-
/*! \brief Cleanup function for graceful shutdowns */
static void stasis_cleanup(void)
{
@@ -748,36 +811,14 @@ int stasis_init(void)
{
int cache_init;
- struct ast_threadpool_options opts;
-
/* Be sure the types are cleaned up after the message bus */
ast_register_cleanup(stasis_cleanup);
- ast_register_atexit(stasis_exit);
-
- if (stasis_config_init() != 0) {
- ast_log(LOG_ERROR, "Stasis configuration failed\n");
- return -1;
- }
if (stasis_wait_init() != 0) {
ast_log(LOG_ERROR, "Stasis initialization failed\n");
return -1;
}
- if (pool) {
- ast_log(LOG_ERROR, "Stasis double-initialized\n");
- return -1;
- }
-
- stasis_config_get_threadpool_options(&opts);
- ast_debug(3, "Creating Stasis threadpool: initial_size = %d, max_size = %d, idle_timeout_secs = %d\n",
- opts.initial_size, opts.max_size, opts.idle_timeout);
- pool = ast_threadpool_create("stasis-core", NULL, &opts);
- if (!pool) {
- ast_log(LOG_ERROR, "Stasis threadpool allocation failed\n");
- return -1;
- }
-
cache_init = stasis_cache_init();
if (cache_init != 0) {
return -1;
diff --git a/main/stasis_cache.c b/main/stasis_cache.c
index d4375520d..279210d5b 100644
--- a/main/stasis_cache.c
+++ b/main/stasis_cache.c
@@ -339,8 +339,6 @@ struct stasis_message *stasis_cache_clear_create(struct stasis_message *id_messa
static void stasis_cache_update_dtor(void *obj)
{
struct stasis_cache_update *update = obj;
- ao2_cleanup(update->topic);
- update->topic = NULL;
ao2_cleanup(update->old_snapshot);
update->old_snapshot = NULL;
ao2_cleanup(update->new_snapshot);
@@ -349,12 +347,11 @@ static void stasis_cache_update_dtor(void *obj)
update->type = NULL;
}
-static struct stasis_message *update_create(struct stasis_topic *topic, struct stasis_message *old_snapshot, struct stasis_message *new_snapshot)
+static struct stasis_message *update_create(struct stasis_message *old_snapshot, struct stasis_message *new_snapshot)
{
RAII_VAR(struct stasis_cache_update *, update, NULL, ao2_cleanup);
RAII_VAR(struct stasis_message *, msg, NULL, ao2_cleanup);
- ast_assert(topic != NULL);
ast_assert(old_snapshot != NULL || new_snapshot != NULL);
update = ao2_alloc_options(sizeof(*update), stasis_cache_update_dtor,
@@ -363,8 +360,6 @@ static struct stasis_message *update_create(struct stasis_topic *topic, struct s
return NULL;
}
- ao2_ref(topic, +1);
- update->topic = topic;
if (old_snapshot) {
ao2_ref(old_snapshot, +1);
update->old_snapshot = old_snapshot;
@@ -390,7 +385,7 @@ static struct stasis_message *update_create(struct stasis_topic *topic, struct s
}
static void caching_topic_exec(void *data, struct stasis_subscription *sub,
- struct stasis_topic *topic, struct stasis_message *message)
+ struct stasis_message *message)
{
RAII_VAR(struct stasis_caching_topic *, caching_topic_needs_unref, NULL, ao2_cleanup);
struct stasis_caching_topic *caching_topic = data;
@@ -418,7 +413,7 @@ static void caching_topic_exec(void *data, struct stasis_subscription *sub,
if (clear_id) {
old_snapshot = cache_put(caching_topic->cache, clear_type, clear_id, NULL);
if (old_snapshot) {
- update = update_create(topic, old_snapshot, NULL);
+ update = update_create(old_snapshot, NULL);
stasis_publish(caching_topic->topic, update);
return;
}
@@ -440,7 +435,7 @@ static void caching_topic_exec(void *data, struct stasis_subscription *sub,
old_snapshot = cache_put(caching_topic->cache, stasis_message_type(message), id, message);
- update = update_create(topic, old_snapshot, message);
+ update = update_create(old_snapshot, message);
if (update == NULL) {
return;
}
diff --git a/main/stasis_cache_pattern.c b/main/stasis_cache_pattern.c
index 381fdd989..9644028c3 100644
--- a/main/stasis_cache_pattern.c
+++ b/main/stasis_cache_pattern.c
@@ -39,15 +39,15 @@ struct stasis_cp_all {
struct stasis_topic *topic_cached;
struct stasis_cache *cache;
- struct stasis_subscription *forward_all_to_cached;
+ struct stasis_forward *forward_all_to_cached;
};
struct stasis_cp_single {
struct stasis_topic *topic;
struct stasis_caching_topic *topic_cached;
- struct stasis_subscription *forward_topic_to_all;
- struct stasis_subscription *forward_cached_to_all;
+ struct stasis_forward *forward_topic_to_all;
+ struct stasis_forward *forward_cached_to_all;
};
static void all_dtor(void *obj)
@@ -60,7 +60,7 @@ static void all_dtor(void *obj)
all->topic_cached = NULL;
ao2_cleanup(all->cache);
all->cache = NULL;
- stasis_unsubscribe_and_join(all->forward_all_to_cached);
+ stasis_forward_cancel(all->forward_all_to_cached);
all->forward_all_to_cached = NULL;
}
@@ -172,9 +172,9 @@ void stasis_cp_single_unsubscribe(struct stasis_cp_single *one)
return;
}
- stasis_unsubscribe(one->forward_topic_to_all);
+ stasis_forward_cancel(one->forward_topic_to_all);
one->forward_topic_to_all = NULL;
- stasis_unsubscribe(one->forward_cached_to_all);
+ stasis_forward_cancel(one->forward_cached_to_all);
one->forward_cached_to_all = NULL;
stasis_caching_unsubscribe(one->topic_cached);
one->topic_cached = NULL;
diff --git a/main/stasis_config.c b/main/stasis_config.c
deleted file mode 100644
index 006df51dd..000000000
--- a/main/stasis_config.c
+++ /dev/null
@@ -1,201 +0,0 @@
-/*
- * Asterisk -- An open source telephony toolkit.
- *
- * Copyright (C) 2013, Digium, Inc.
- *
- * David M. Lee, II <dlee@digium.com>
- *
- * See http://www.asterisk.org for more information about
- * the Asterisk project. Please do not directly contact
- * any of the maintainers of this project for assistance;
- * the project provides a web site, mailing lists and IRC
- * channels for your use.
- *
- * This program is free software, distributed under the terms of
- * the GNU General Public License Version 2. See the LICENSE file
- * at the top of the source tree.
- */
-
-/*! \file
- *
- * \brief Stasis Message Bus configuration API.
- *
- * \author David M. Lee, II <dlee@digium.com>
- */
-
-/*** MODULEINFO
- <support_level>core</support_level>
- ***/
-
-#include "asterisk.h"
-
-ASTERISK_FILE_VERSION(__FILE__, "$Revision$")
-
-#include "asterisk/config_options.h"
-#include "asterisk/stasis.h"
-#include "asterisk/threadpool.h"
-
-#include <limits.h>
-
-/*** DOCUMENTATION
- <configInfo name="stasis" language="en_US">
- <synopsis>Stasis message bus configuration.</synopsis>
- <configFile name="stasis.conf">
- <configObject name="threadpool">
- <synopsis>Threadpool configuration.</synopsis>
- <configOption name="initial_size" default="0">
- <synopsis>Initial number of threads in the message bus threadpool.</synopsis>
- </configOption>
- <configOption name="idle_timeout_sec" default="20">
- <synopsis>Number of seconds for an idle thread to be disposed of.</synopsis>
- </configOption>
- <configOption name="max_size" default="200">
- <synopsis>Maximum number of threads in the threadpool.</synopsis>
- </configOption>
- </configObject>
- </configFile>
- </configInfo>
- ***/
-
-/*! \brief Locking container for safe configuration access. */
-static AO2_GLOBAL_OBJ_STATIC(confs);
-
-struct stasis_threadpool_conf {
- int initial_size;
- int idle_timeout_sec;
- int max_size;
-};
-
-struct stasis_conf {
- struct stasis_threadpool_conf *threadpool;
-};
-
-/*! \brief Mapping of the stasis conf struct's globals to the
- * threadpool context in the config file. */
-static struct aco_type threadpool_option = {
- .type = ACO_GLOBAL,
- .name = "threadpool",
- .item_offset = offsetof(struct stasis_conf, threadpool),
- .category = "^threadpool$",
- .category_match = ACO_WHITELIST,
-};
-
-static struct aco_type *threadpool_options[] = ACO_TYPES(&threadpool_option);
-
-#define CONF_FILENAME "stasis.conf"
-
-/*! \brief The conf file that's processed for the module. */
-static struct aco_file conf_file = {
- /*! The config file name. */
- .filename = CONF_FILENAME,
- /*! The mapping object types to be processed. */
- .types = ACO_TYPES(&threadpool_option),
-};
-
-static void conf_dtor(void *obj)
-{
- struct stasis_conf *conf = obj;
-
- ao2_cleanup(conf->threadpool);
- conf->threadpool = NULL;
-}
-
-static void *conf_alloc(void)
-{
- RAII_VAR(struct stasis_conf *, conf, NULL, ao2_cleanup);
-
- conf = ao2_alloc_options(sizeof(*conf), conf_dtor,
- AO2_ALLOC_OPT_LOCK_NOLOCK);
- if (!conf) {
- return NULL;
- }
-
- conf->threadpool = ao2_alloc_options(sizeof(*conf->threadpool), NULL,
- AO2_ALLOC_OPT_LOCK_NOLOCK);
- if (!conf->threadpool) {
- return NULL;
- }
-
- aco_set_defaults(&threadpool_option, "threadpool", conf->threadpool);
-
- ao2_ref(conf, +1);
- return conf;
-}
-
-CONFIG_INFO_CORE("stasis", cfg_info, confs, conf_alloc,
- .files = ACO_FILES(&conf_file));
-
-void stasis_config_get_threadpool_options(
- struct ast_threadpool_options *threadpool_options)
-{
- RAII_VAR(struct stasis_conf *, conf, NULL, ao2_cleanup);
-
- conf = ao2_global_obj_ref(confs);
-
- ast_assert(conf && conf->threadpool);
-
- {
- struct ast_threadpool_options newopts = {
- .version = AST_THREADPOOL_OPTIONS_VERSION,
- .initial_size = conf->threadpool->initial_size,
- .auto_increment = 1,
- .idle_timeout = conf->threadpool->idle_timeout_sec,
- .max_size = conf->threadpool->max_size,
- };
-
- *threadpool_options = newopts;
- }
-}
-
-/*! \brief Load (or reload) configuration. */
-static int process_config(int reload)
-{
- RAII_VAR(struct stasis_conf *, conf, conf_alloc(), ao2_cleanup);
-
- switch (aco_process_config(&cfg_info, reload)) {
- case ACO_PROCESS_ERROR:
- if (conf && !reload
- && !aco_set_defaults(&threadpool_option, "threadpool", conf->threadpool)) {
- ast_log(AST_LOG_NOTICE, "Failed to process Stasis configuration; using defaults\n");
- ao2_global_obj_replace_unref(confs, conf);
- return 0;
- }
- return -1;
- case ACO_PROCESS_OK:
- case ACO_PROCESS_UNCHANGED:
- break;
- }
-
- return 0;
-}
-
-static void config_exit(void)
-{
- aco_info_destroy(&cfg_info);
- ao2_global_obj_release(confs);
-}
-
-int stasis_config_init(void)
-{
- if (aco_info_init(&cfg_info)) {
- aco_info_destroy(&cfg_info);
- return -1;
- }
-
- ast_register_atexit(config_exit);
-
- /* threadpool section */
- aco_option_register(&cfg_info, "initial_size", ACO_EXACT,
- threadpool_options, "0", OPT_INT_T, PARSE_IN_RANGE,
- FLDSET(struct stasis_threadpool_conf, initial_size), 0,
- INT_MAX);
- aco_option_register(&cfg_info, "idle_timeout_sec", ACO_EXACT,
- threadpool_options, "20", OPT_INT_T, PARSE_IN_RANGE,
- FLDSET(struct stasis_threadpool_conf, idle_timeout_sec), 0,
- INT_MAX);
- aco_option_register(&cfg_info, "max_size", ACO_EXACT,
- threadpool_options, "200", OPT_INT_T, PARSE_IN_RANGE,
- FLDSET(struct stasis_threadpool_conf, max_size), 0, INT_MAX);
-
- return process_config(0);
-}
diff --git a/main/stasis_message_router.c b/main/stasis_message_router.c
index 26d2f2c0c..8c82decfe 100644
--- a/main/stasis_message_router.c
+++ b/main/stasis_message_router.c
@@ -34,9 +34,6 @@ ASTERISK_FILE_VERSION(__FILE__, "$Revision$")
#include "asterisk/astobj2.h"
#include "asterisk/stasis_message_router.h"
-/*! Number of hash buckets for the route table. Keep it prime! */
-#define ROUTE_TABLE_BUCKETS 7
-
/*! \internal */
struct stasis_message_route {
/*! Message type handle by this route. */
@@ -47,29 +44,79 @@ struct stasis_message_route {
void *data;
};
-static void route_dtor(void *obj)
+struct route_table {
+ /*! Current number of entries in the route table */
+ size_t current_size;
+ /*! Allocated number of entires in the route table */
+ size_t max_size;
+ /*! The route table itself */
+ struct stasis_message_route routes[];
+};
+
+static struct stasis_message_route *table_find_route(struct route_table *table,
+ struct stasis_message_type *message_type)
{
- struct stasis_message_route *route = obj;
+ size_t idx;
+
+ /* While a linear search for routes may seem very inefficient, most
+ * route tables have six routes or less. For such small data, it's
+ * hard to beat a linear search. If we start having larger route
+ * tables, then we can look into containers with more efficient
+ * lookups.
+ */
+ for (idx = 0; idx < table->current_size; ++idx) {
+ if (table->routes[idx].message_type == message_type) {
+ return &table->routes[idx];
+ }
+ }
- ao2_cleanup(route->message_type);
- route->message_type = NULL;
+ return NULL;
}
-static int route_hash(const void *obj, const int flags)
+static int table_add_route(struct route_table **table_ptr,
+ struct stasis_message_type *message_type,
+ stasis_subscription_cb callback, void *data)
{
- const struct stasis_message_route *route = obj;
- const struct stasis_message_type *message_type = (flags & OBJ_KEY) ? obj : route->message_type;
+ struct route_table *table = *table_ptr;
+ struct stasis_message_route *route;
+
+ ast_assert(table_find_route(table, message_type) == NULL);
+
+ if (table->current_size + 1 > table->max_size) {
+ size_t new_max_size = table->max_size ? table->max_size * 2 : 1;
+ struct route_table *new_table = ast_realloc(table,
+ sizeof(*new_table) +
+ sizeof(new_table->routes[0]) * new_max_size);
+ if (!new_table) {
+ return -1;
+ }
+ *table_ptr = table = new_table;
+ table->max_size = new_max_size;
+ }
- return ast_str_hash(stasis_message_type_name(message_type));
+ route = &table->routes[table->current_size++];
+
+ route->message_type = ao2_bump(message_type);
+ route->callback = callback;
+ route->data = data;
+
+ return 0;
}
-static int route_cmp(void *obj, void *arg, int flags)
+static int table_remove_route(struct route_table *table,
+ struct stasis_message_type *message_type)
{
- const struct stasis_message_route *left = obj;
- const struct stasis_message_route *right = arg;
- const struct stasis_message_type *message_type = (flags & OBJ_KEY) ? arg : right->message_type;
-
- return (left->message_type == message_type) ? CMP_MATCH | CMP_STOP : 0;
+ size_t idx;
+
+ for (idx = 0; idx < table->current_size; ++idx) {
+ if (table->routes[idx].message_type == message_type) {
+ ao2_cleanup(message_type);
+ table->routes[idx] =
+ table->routes[--table->current_size];
+ return 0;
+ }
+ }
+ return -1;
}
/*! \internal */
@@ -77,11 +124,11 @@ struct stasis_message_router {
/*! Subscription to the upstream topic */
struct stasis_subscription *subscription;
/*! Subscribed routes */
- struct ao2_container *routes;
- /*! Subscribed routes for \ref stasi_cache_update messages */
- struct ao2_container *cache_routes;
+ struct route_table *routes;
+ /*! Subscribed routes for \ref stasis_cache_update messages */
+ struct route_table *cache_routes;
/*! Route of last resort */
- struct stasis_message_route *default_route;
+ struct stasis_message_route default_route;
};
static void router_dtor(void *obj)
@@ -92,66 +139,60 @@ static void router_dtor(void *obj)
ast_assert(stasis_subscription_is_done(router->subscription));
router->subscription = NULL;
- ao2_cleanup(router->routes);
+ ast_free(router->routes);
router->routes = NULL;
- ao2_cleanup(router->cache_routes);
+ ast_free(router->cache_routes);
router->cache_routes = NULL;
-
- ao2_cleanup(router->default_route);
- router->default_route = NULL;
}
-static struct stasis_message_route *find_route(
+static int find_route(
struct stasis_message_router *router,
- struct stasis_message *message)
+ struct stasis_message *message,
+ struct stasis_message_route *route_out)
{
- RAII_VAR(struct stasis_message_route *, route, NULL, ao2_cleanup);
+ struct stasis_message_route *route = NULL;
struct stasis_message_type *type = stasis_message_type(message);
SCOPED_AO2LOCK(lock, router);
+ ast_assert(route_out != NULL);
+
if (type == stasis_cache_update_type()) {
/* Find a cache route */
struct stasis_cache_update *update =
stasis_message_data(message);
- route = ao2_find(router->cache_routes, update->type, OBJ_KEY);
+ route = table_find_route(router->cache_routes, update->type);
}
if (route == NULL) {
/* Find a regular route */
- route = ao2_find(router->routes, type, OBJ_KEY);
+ route = table_find_route(router->routes, type);
}
- if (route == NULL) {
+ if (route == NULL && router->default_route.callback) {
/* Maybe the default route, then? */
- if ((route = router->default_route)) {
- ao2_ref(route, +1);
- }
+ route = &router->default_route;
}
- if (route == NULL) {
- return NULL;
+ if (!route) {
+ return -1;
}
- ao2_ref(route, +1);
- return route;
+ *route_out = *route;
+ return 0;
}
static void router_dispatch(void *data,
struct stasis_subscription *sub,
- struct stasis_topic *topic,
struct stasis_message *message)
{
struct stasis_message_router *router = data;
- RAII_VAR(struct stasis_message_route *, route, NULL, ao2_cleanup);
+ struct stasis_message_route route;
- route = find_route(router, message);
-
- if (route) {
- route->callback(route->data, sub, topic, message);
+ if (find_route(router, message, &route) == 0) {
+ route.callback(route.data, sub, message);
}
-
if (stasis_subscription_final_message(sub, message)) {
ao2_cleanup(router);
}
@@ -167,14 +208,12 @@ struct stasis_message_router *stasis_message_router_create(
return NULL;
}
- router->routes = ao2_container_alloc(ROUTE_TABLE_BUCKETS, route_hash,
- route_cmp);
+ router->routes = ast_calloc(1, sizeof(*router->routes));
if (!router->routes) {
return NULL;
}
- router->cache_routes = ao2_container_alloc(ROUTE_TABLE_BUCKETS,
- route_hash, route_cmp);
+ router->cache_routes = ast_calloc(1, sizeof(*router->cache_routes));
if (!router->cache_routes) {
return NULL;
}
@@ -216,100 +255,27 @@ int stasis_message_router_is_done(struct stasis_message_router *router)
return stasis_subscription_is_done(router->subscription);
}
-
-static struct stasis_message_route *route_create(
- struct stasis_message_type *message_type,
- stasis_subscription_cb callback,
- void *data)
-{
- RAII_VAR(struct stasis_message_route *, route, NULL, ao2_cleanup);
-
- route = ao2_alloc(sizeof(*route), route_dtor);
- if (!route) {
- return NULL;
- }
-
- if (message_type) {
- ao2_ref(message_type, +1);
- }
- route->message_type = message_type;
- route->callback = callback;
- route->data = data;
-
- ao2_ref(route, +1);
- return route;
-}
-
-static int add_route(struct stasis_message_router *router,
- struct stasis_message_route *route)
-{
- RAII_VAR(struct stasis_message_route *, existing_route, NULL, ao2_cleanup);
- SCOPED_AO2LOCK(lock, router);
-
- existing_route = ao2_find(router->routes, route->message_type, OBJ_KEY);
-
- if (existing_route) {
- ast_log(LOG_ERROR, "Cannot add route; route exists\n");
- return -1;
- }
-
- ao2_link(router->routes, route);
- return 0;
-}
-
-static int add_cache_route(struct stasis_message_router *router,
- struct stasis_message_route *route)
-{
- RAII_VAR(struct stasis_message_route *, existing_route, NULL, ao2_cleanup);
- SCOPED_AO2LOCK(lock, router);
-
- existing_route = ao2_find(router->cache_routes, route->message_type,
- OBJ_KEY);
-
- if (existing_route) {
- ast_log(LOG_ERROR, "Cannot add route; route exists\n");
- return -1;
- }
-
- ao2_link(router->cache_routes, route);
- return 0;
-}
-
int stasis_message_router_add(struct stasis_message_router *router,
struct stasis_message_type *message_type,
stasis_subscription_cb callback, void *data)
{
- RAII_VAR(struct stasis_message_route *, route, NULL, ao2_cleanup);
-
- route = route_create(message_type, callback, data);
- if (!route) {
- return -1;
- }
-
- return add_route(router, route);
+ SCOPED_AO2LOCK(lock, router);
+ return table_add_route(&router->routes, message_type, callback, data);
}
int stasis_message_router_add_cache_update(struct stasis_message_router *router,
struct stasis_message_type *message_type,
stasis_subscription_cb callback, void *data)
{
- RAII_VAR(struct stasis_message_route *, route, NULL, ao2_cleanup);
-
- route = route_create(message_type, callback, data);
- if (!route) {
- return -1;
- }
-
- return add_cache_route(router, route);
+ SCOPED_AO2LOCK(lock, router);
+ return table_add_route(&router->cache_routes, message_type, callback, data);
}
void stasis_message_router_remove(struct stasis_message_router *router,
struct stasis_message_type *message_type)
{
SCOPED_AO2LOCK(lock, router);
-
- ao2_find(router->routes, message_type,
- OBJ_UNLINK | OBJ_NODATA | OBJ_KEY);
+ table_remove_route(router->routes, message_type);
}
void stasis_message_router_remove_cache_update(
@@ -317,9 +283,7 @@ void stasis_message_router_remove_cache_update(
struct stasis_message_type *message_type)
{
SCOPED_AO2LOCK(lock, router);
-
- ao2_find(router->cache_routes, message_type,
- OBJ_UNLINK | OBJ_NODATA | OBJ_KEY);
+ table_remove_route(router->cache_routes, message_type);
}
int stasis_message_router_set_default(struct stasis_message_router *router,
@@ -327,7 +291,8 @@ int stasis_message_router_set_default(struct stasis_message_router *router,
void *data)
{
SCOPED_AO2LOCK(lock, router);
- ao2_cleanup(router->default_route);
- router->default_route = route_create(NULL, callback, data);
- return router->default_route ? 0 : -1;
+ router->default_route.callback = callback;
+ router->default_route.data = data;
+ /* While this implementation can never fail, it used to be able to */
+ return 0;
}
diff --git a/main/stasis_wait.c b/main/stasis_wait.c
index e94c686e1..32b59718c 100644
--- a/main/stasis_wait.c
+++ b/main/stasis_wait.c
@@ -55,7 +55,7 @@ static void caching_guarantee_dtor(void *obj)
}
static void guarantee_handler(void *data, struct stasis_subscription *sub,
- struct stasis_topic *topic, struct stasis_message *message)
+ struct stasis_message *message)
{
/* Wait for our particular message */
if (data == message) {
diff --git a/main/taskprocessor.c b/main/taskprocessor.c
index a8d1c80f9..189219d66 100644
--- a/main/taskprocessor.c
+++ b/main/taskprocessor.c
@@ -37,6 +37,7 @@ ASTERISK_FILE_VERSION(__FILE__, "$Revision$")
#include "asterisk/astobj2.h"
#include "asterisk/cli.h"
#include "asterisk/taskprocessor.h"
+#include "asterisk/sem.h"
/*!
* \brief tps_task structure is queued to a taskprocessor
@@ -47,11 +48,15 @@ ASTERISK_FILE_VERSION(__FILE__, "$Revision$")
*/
struct tps_task {
/*! \brief The execute() task callback function pointer */
- int (*execute)(void *datap);
+ union {
+ int (*execute)(void *datap);
+ int (*execute_local)(struct ast_taskprocessor_local *local);
+ } callback;
/*! \brief The data pointer for the task execute() function */
void *datap;
/*! \brief AST_LIST_ENTRY overhead */
AST_LIST_ENTRY(tps_task) list;
+ unsigned int wants_local:1;
};
/*! \brief tps_taskprocessor_stats maintain statistics for a taskprocessor. */
@@ -68,6 +73,7 @@ struct ast_taskprocessor {
const char *name;
/*! \brief Taskprocessor statistics */
struct tps_taskprocessor_stats *stats;
+ void *local_data;
/*! \brief Taskprocessor current queue size */
long tps_queue_size;
/*! \brief Taskprocessor queue */
@@ -113,9 +119,6 @@ static int tps_hash_cb(const void *obj, const int flags);
/*! \brief The astobj2 compare callback for taskprocessors */
static int tps_cmp_cb(void *obj, void *arg, int flags);
-/*! \brief The task processing function executed by a taskprocessor */
-static void *tps_processing_function(void *data);
-
/*! \brief Destroy the taskprocessor when its refcount reaches zero */
static void tps_taskprocessor_destroy(void *tps);
@@ -138,47 +141,56 @@ static struct ast_cli_entry taskprocessor_clis[] = {
struct default_taskprocessor_listener_pvt {
pthread_t poll_thread;
- ast_mutex_t lock;
- ast_cond_t cond;
- int wake_up;
int dead;
+ struct ast_sem sem;
};
-
-static void default_tps_wake_up(struct default_taskprocessor_listener_pvt *pvt, int should_die)
+static void default_listener_pvt_destroy(struct default_taskprocessor_listener_pvt *pvt)
{
- SCOPED_MUTEX(lock, &pvt->lock);
- pvt->wake_up = 1;
- pvt->dead = should_die;
- ast_cond_signal(&pvt->cond);
+ ast_assert(pvt->dead);
+ ast_sem_destroy(&pvt->sem);
+ ast_free(pvt);
}
-static int default_tps_idle(struct default_taskprocessor_listener_pvt *pvt)
+static void default_listener_pvt_dtor(struct ast_taskprocessor_listener *listener)
{
- SCOPED_MUTEX(lock, &pvt->lock);
- while (!pvt->wake_up) {
- ast_cond_wait(&pvt->cond, lock);
- }
- pvt->wake_up = 0;
- return pvt->dead;
+ struct default_taskprocessor_listener_pvt *pvt = listener->user_data;
+
+ default_listener_pvt_destroy(pvt);
+
+ listener->user_data = NULL;
}
/*!
* \brief Function that processes tasks in the taskprocessor
* \internal
*/
-static void *tps_processing_function(void *data)
+static void *default_tps_processing_function(void *data)
{
struct ast_taskprocessor_listener *listener = data;
struct ast_taskprocessor *tps = listener->tps;
struct default_taskprocessor_listener_pvt *pvt = listener->user_data;
- int dead = 0;
-
- while (!dead) {
- if (!ast_taskprocessor_execute(tps)) {
- dead = default_tps_idle(pvt);
+ int sem_value;
+ int res;
+
+ while (!pvt->dead) {
+ res = ast_sem_wait(&pvt->sem);
+ if (res != 0 && errno != EINTR) {
+ ast_log(LOG_ERROR, "ast_sem_wait(): %s\n",
+ strerror(errno));
+ /* Just give up */
+ break;
}
+ ast_taskprocessor_execute(tps);
}
+
+ /* No posting to a dead taskprocessor! */
+ res = ast_sem_getvalue(&pvt->sem, &sem_value);
+ ast_assert(res == 0 && sem_value == 0);
+
+ /* Free the shutdown reference (see default_listener_shutdown) */
+ ao2_t_ref(listener->tps, -1, "tps-shutdown");
+
return NULL;
}
@@ -186,7 +198,7 @@ static int default_listener_start(struct ast_taskprocessor_listener *listener)
{
struct default_taskprocessor_listener_pvt *pvt = listener->user_data;
- if (ast_pthread_create(&pvt->poll_thread, NULL, tps_processing_function, listener)) {
+ if (ast_pthread_create(&pvt->poll_thread, NULL, default_tps_processing_function, listener)) {
return -1;
}
@@ -197,33 +209,50 @@ static void default_task_pushed(struct ast_taskprocessor_listener *listener, int
{
struct default_taskprocessor_listener_pvt *pvt = listener->user_data;
- ast_assert(!pvt->dead);
-
- if (was_empty) {
- default_tps_wake_up(pvt, 0);
+ if (ast_sem_post(&pvt->sem) != 0) {
+ ast_log(LOG_ERROR, "Failed to notify of enqueued task: %s\n",
+ strerror(errno));
}
}
-static void default_listener_pvt_destroy(struct default_taskprocessor_listener_pvt *pvt)
+static int default_listener_die(void *data)
{
- ast_mutex_destroy(&pvt->lock);
- ast_cond_destroy(&pvt->cond);
- ast_free(pvt);
+ struct default_taskprocessor_listener_pvt *pvt = data;
+ pvt->dead = 1;
+ return 0;
}
static void default_listener_shutdown(struct ast_taskprocessor_listener *listener)
{
struct default_taskprocessor_listener_pvt *pvt = listener->user_data;
- default_tps_wake_up(pvt, 1);
- pthread_join(pvt->poll_thread, NULL);
+ int res;
+
+ /* Hold a reference during shutdown */
+ ao2_t_ref(listener->tps, +1, "tps-shutdown");
+
+ ast_taskprocessor_push(listener->tps, default_listener_die, pvt);
+
+ if (pthread_self() == pvt->poll_thread) {
+ res = pthread_detach(pvt->poll_thread);
+ if (res != 0) {
+ ast_log(LOG_ERROR, "pthread_detach(): %s\n",
+ strerror(errno));
+ }
+ } else {
+ res = pthread_join(pvt->poll_thread, NULL);
+ if (res != 0) {
+ ast_log(LOG_ERROR, "pthread_join(): %s\n",
+ strerror(errno));
+ }
+ }
pvt->poll_thread = AST_PTHREADT_NULL;
- default_listener_pvt_destroy(pvt);
}
static const struct ast_taskprocessor_listener_callbacks default_listener_callbacks = {
.start = default_listener_start,
.task_pushed = default_task_pushed,
.shutdown = default_listener_shutdown,
+ .dtor = default_listener_pvt_dtor,
};
/*!
@@ -258,19 +287,48 @@ int ast_tps_init(void)
static struct tps_task *tps_task_alloc(int (*task_exe)(void *datap), void *datap)
{
struct tps_task *t;
- if ((t = ast_calloc(1, sizeof(*t)))) {
- t->execute = task_exe;
- t->datap = datap;
+ if (!task_exe) {
+ ast_log(LOG_ERROR, "task_exe is NULL!\n");
+ return NULL;
+ }
+
+ t = ast_calloc(1, sizeof(*t));
+ if (!t) {
+ ast_log(LOG_ERROR, "failed to allocate task!\n");
+ return NULL;
+ }
+
+ t->callback.execute = task_exe;
+ t->datap = datap;
+
+ return t;
+}
+
+static struct tps_task *tps_task_alloc_local(int (*task_exe)(struct ast_taskprocessor_local *local), void *datap)
+{
+ struct tps_task *t;
+ if (!task_exe) {
+ ast_log(LOG_ERROR, "task_exe is NULL!\n");
+ return NULL;
+ }
+
+ t = ast_calloc(1, sizeof(*t));
+ if (!t) {
+ ast_log(LOG_ERROR, "failed to allocate task!\n");
+ return NULL;
}
+
+ t->callback.execute_local = task_exe;
+ t->datap = datap;
+ t->wants_local = 1;
+
return t;
}
/* release task resources */
static void *tps_task_free(struct tps_task *task)
{
- if (task) {
- ast_free(task);
- }
+ ast_free(task);
return NULL;
}
@@ -425,16 +483,10 @@ static void tps_taskprocessor_destroy(void *tps)
}
ast_debug(1, "destroying taskprocessor '%s'\n", t->name);
/* free it */
- if (t->stats) {
- ast_free(t->stats);
- t->stats = NULL;
- }
+ ast_free(t->stats);
+ t->stats = NULL;
ast_free((char *) t->name);
if (t->listener) {
- /* This code should not be reached since the listener
- * should have been destroyed before the taskprocessor could
- * be destroyed
- */
ao2_ref(t->listener, -1);
t->listener = NULL;
}
@@ -447,7 +499,6 @@ static void tps_taskprocessor_destroy(void *tps)
static struct tps_task *tps_taskprocessor_pop(struct ast_taskprocessor *tps)
{
struct tps_task *task;
- SCOPED_AO2LOCK(lock, tps);
if ((task = AST_LIST_REMOVE_HEAD(&tps->tps_queue, list))) {
tps->tps_queue_size--;
@@ -476,10 +527,21 @@ static void listener_shutdown(struct ast_taskprocessor_listener *listener)
ao2_ref(listener->tps, -1);
}
+static void taskprocessor_listener_dtor(void *obj)
+{
+ struct ast_taskprocessor_listener *listener = obj;
+
+ if (listener->callbacks->dtor) {
+ listener->callbacks->dtor(listener);
+ }
+}
+
struct ast_taskprocessor_listener *ast_taskprocessor_listener_alloc(const struct ast_taskprocessor_listener_callbacks *callbacks, void *user_data)
{
RAII_VAR(struct ast_taskprocessor_listener *, listener,
- ao2_alloc(sizeof(*listener), NULL), ao2_cleanup);
+ NULL, ao2_cleanup);
+
+ listener = ao2_alloc(sizeof(*listener), taskprocessor_listener_dtor);
if (!listener) {
return NULL;
@@ -510,9 +572,12 @@ static void *default_listener_pvt_alloc(void)
if (!pvt) {
return NULL;
}
- ast_cond_init(&pvt->cond, NULL);
- ast_mutex_init(&pvt->lock);
pvt->poll_thread = AST_PTHREADT_NULL;
+ if (ast_sem_init(&pvt->sem, 0, 0) != 0) {
+ ast_log(LOG_ERROR, "ast_sem_init(): %s\n", strerror(errno));
+ ast_free(pvt);
+ return NULL;
+ }
return pvt;
}
@@ -594,7 +659,6 @@ struct ast_taskprocessor *ast_taskprocessor_get(const char *name, enum ast_tps_o
p = __allocate_taskprocessor(name, listener);
if (!p) {
- default_listener_pvt_destroy(pvt);
ao2_ref(listener, -1);
return NULL;
}
@@ -615,6 +679,13 @@ struct ast_taskprocessor *ast_taskprocessor_create_with_listener(const char *nam
return __allocate_taskprocessor(name, listener);
}
+void ast_taskprocessor_set_local(struct ast_taskprocessor *tps,
+ void *local_data)
+{
+ SCOPED_AO2LOCK(lock, tps);
+ tps->local_data = local_data;
+}
+
/* decrement the taskprocessor reference count and unlink from the container if necessary */
void *ast_taskprocessor_unreference(struct ast_taskprocessor *tps)
{
@@ -636,20 +707,21 @@ void *ast_taskprocessor_unreference(struct ast_taskprocessor *tps)
}
/* push the task into the taskprocessor queue */
-int ast_taskprocessor_push(struct ast_taskprocessor *tps, int (*task_exe)(void *datap), void *datap)
+static int taskprocessor_push(struct ast_taskprocessor *tps, struct tps_task *t)
{
- struct tps_task *t;
int previous_size;
int was_empty;
- if (!tps || !task_exe) {
- ast_log(LOG_ERROR, "%s is missing!!\n", (tps) ? "task callback" : "taskprocessor");
+ if (!tps) {
+ ast_log(LOG_ERROR, "tps is NULL!\n");
return -1;
}
- if (!(t = tps_task_alloc(task_exe, datap))) {
- ast_log(LOG_ERROR, "failed to allocate task! Can't push to '%s'\n", tps->name);
+
+ if (!t) {
+ ast_log(LOG_ERROR, "t is NULL!\n");
return -1;
}
+
ao2_lock(tps);
AST_LIST_INSERT_TAIL(&tps->tps_queue, t, list);
previous_size = tps->tps_queue_size++;
@@ -660,21 +732,43 @@ int ast_taskprocessor_push(struct ast_taskprocessor *tps, int (*task_exe)(void *
return 0;
}
+int ast_taskprocessor_push(struct ast_taskprocessor *tps, int (*task_exe)(void *datap), void *datap)
+{
+ return taskprocessor_push(tps, tps_task_alloc(task_exe, datap));
+}
+
+int ast_taskprocessor_push_local(struct ast_taskprocessor *tps, int (*task_exe)(struct ast_taskprocessor_local *datap), void *datap)
+{
+ return taskprocessor_push(tps, tps_task_alloc_local(task_exe, datap));
+}
+
int ast_taskprocessor_execute(struct ast_taskprocessor *tps)
{
+ struct ast_taskprocessor_local local;
struct tps_task *t;
int size;
ao2_lock(tps);
+ t = tps_taskprocessor_pop(tps);
+ if (!t) {
+ ao2_unlock(tps);
+ return 0;
+ }
+
tps->executing = 1;
- ao2_unlock(tps);
- t = tps_taskprocessor_pop(tps);
+ if (t->wants_local) {
+ local.local_data = tps->local_data;
+ local.data = t->datap;
+ }
+ ao2_unlock(tps);
- if (t) {
- t->execute(t->datap);
- tps_task_free(t);
+ if (t->wants_local) {
+ t->callback.execute_local(&local);
+ } else {
+ t->callback.execute(t->datap);
}
+ tps_task_free(t);
ao2_lock(tps);
/* We need to check size in the same critical section where we reset the
@@ -684,7 +778,7 @@ int ast_taskprocessor_execute(struct ast_taskprocessor *tps)
tps->executing = 0;
size = tps_taskprocessor_depth(tps);
/* If we executed a task, bump the stats */
- if (t && tps->stats) {
+ if (tps->stats) {
tps->stats->_tasks_processed_count++;
if (size > tps->stats->max_qsize) {
tps->stats->max_qsize = size;
@@ -693,7 +787,7 @@ int ast_taskprocessor_execute(struct ast_taskprocessor *tps)
ao2_unlock(tps);
/* If we executed a task, check for the transition to empty */
- if (t && size == 0 && tps->listener->callbacks->emptied) {
+ if (size == 0 && tps->listener->callbacks->emptied) {
tps->listener->callbacks->emptied(tps->listener);
}
return size > 0;