summaryrefslogtreecommitdiff
path: root/main/devicestate.c
diff options
context:
space:
mode:
authorRichard Mudgett <rmudgett@digium.com>2014-03-07 20:41:13 +0000
committerRichard Mudgett <rmudgett@digium.com>2014-03-07 20:41:13 +0000
commit4ad1245cb5fd9ea0f54c866448ad5c18bcb84fa5 (patch)
treecbf0d2bd6a65e63d1d9a37193754d6476770ad54 /main/devicestate.c
parentecbd0527417115936b06b57be1a07b3607e31a85 (diff)
stasis cache: Enhance to keep track of an item from different entities.
A stasis cache entry now contains more than a single message/snapshot. It contains messages/snapshots for the local entity as well as any remote entities that post to the cached item. In addition callbacks can be supplied when the cache is created to compute and post the aggregate message/snapshot representing all entities stored in the cache entry. * All stasis messages now have an eid to indicate what entity posted it. * The stasis cache enhancements allow device state to cache and aggregate the device states from local and remote entities in a single operation. The cached aggregate device state is available immediately after it is posted to the stasis bus. This improves performance by eliminating a cache dump and associated ao2 container traversals to calculate the aggregate state. (closes issue ASTERISK-23204) Reported by: Mark Michelson Review: https://reviewboard.asterisk.org/r/3281/ ........ Merged revisions 410184 from http://svn.asterisk.org/svn/asterisk/branches/12 git-svn-id: https://origsvn.digium.com/svn/asterisk/trunk@410185 65c4cc65-6c06-0410-ace0-fbb531ad65f3
Diffstat (limited to 'main/devicestate.c')
-rw-r--r--main/devicestate.c327
1 files changed, 183 insertions, 144 deletions
diff --git a/main/devicestate.c b/main/devicestate.c
index 1f0c968aa..5c2340863 100644
--- a/main/devicestate.c
+++ b/main/devicestate.c
@@ -282,16 +282,20 @@ enum ast_device_state ast_parse_device_state(const char *device)
static enum ast_device_state devstate_cached(const char *device)
{
- RAII_VAR(struct stasis_message *, cached_msg, NULL, ao2_cleanup);
+ struct stasis_message *cached_msg;
struct ast_device_state_message *device_state;
+ enum ast_device_state state;
- cached_msg = stasis_cache_get(ast_device_state_cache(), ast_device_state_message_type(), device);
+ cached_msg = stasis_cache_get_by_eid(ast_device_state_cache(),
+ ast_device_state_message_type(), device, NULL);
if (!cached_msg) {
return AST_DEVICE_UNKNOWN;
}
device_state = stasis_message_data(cached_msg);
+ state = device_state->state;
+ ao2_cleanup(cached_msg);
- return device_state->state;
+ return state;
}
/*! \brief Check device state through channel specific function or generic function */
@@ -522,148 +526,62 @@ static void *do_devstate_changes(void *data)
return NULL;
}
-#define MAX_SERVERS 64
-static int devstate_change_aggregator_cb(void *obj, void *arg, void *data, int flags)
-{
- struct stasis_message *msg = obj;
- struct ast_devstate_aggregate *aggregate = arg;
- char *device = data;
- struct ast_device_state_message *device_state = stasis_message_data(msg);
-
- if (!device_state->eid || strcmp(device, device_state->device)) {
- /* ignore aggregate states and devices that don't match */
- return 0;
- }
- ast_debug(1, "Adding per-server state of '%s' for '%s'\n",
- ast_devstate2str(device_state->state), device);
- ast_devstate_aggregate_add(aggregate, device_state->state);
- return 0;
-}
-
-static void device_state_dtor(void *obj)
-{
- struct ast_device_state_message *device_state = obj;
- ast_string_field_free_memory(device_state);
- ast_free(device_state->eid);
-}
-
static struct ast_device_state_message *device_state_alloc(const char *device, enum ast_device_state state, enum ast_devstate_cache cachable, const struct ast_eid *eid)
{
- RAII_VAR(struct ast_device_state_message *, new_device_state, ao2_alloc(sizeof(*new_device_state), device_state_dtor), ao2_cleanup);
+ struct ast_device_state_message *new_device_state;
+ char *pos;
+ size_t stuff_len;
+
+ ast_assert(!ast_strlen_zero(device));
- if (!new_device_state || ast_string_field_init(new_device_state, 256)) {
+ stuff_len = strlen(device) + 1;
+ if (eid) {
+ stuff_len += sizeof(*eid);
+ }
+ new_device_state = ao2_alloc_options(sizeof(*new_device_state) + stuff_len, NULL,
+ AO2_ALLOC_OPT_LOCK_NOLOCK);
+ if (!new_device_state) {
return NULL;
}
- ast_string_field_set(new_device_state, device, device);
- new_device_state->state = state;
- new_device_state->cachable = cachable;
-
if (eid) {
- char eid_str[20];
- struct ast_str *cache_id = ast_str_alloca(256);
-
- new_device_state->eid = ast_malloc(sizeof(*eid));
- if (!new_device_state->eid) {
- return NULL;
- }
-
- *new_device_state->eid = *eid;
- ast_eid_to_str(eid_str, sizeof(eid_str), new_device_state->eid);
- ast_str_set(&cache_id, 0, "%s%s", eid_str, device);
- ast_string_field_set(new_device_state, cache_id, ast_str_buffer(cache_id));
+ /* non-aggregate device state. */
+ new_device_state->stuff[0] = *eid;
+ new_device_state->eid = &new_device_state->stuff[0];
+ pos = (char *) &new_device_state->stuff[1];
} else {
- /* no EID makes this an aggregate state */
- ast_string_field_set(new_device_state, cache_id, device);
+ pos = (char *) &new_device_state->stuff[0];
}
- ao2_ref(new_device_state, +1);
- return new_device_state;
-}
-
-static enum ast_device_state get_aggregate_state(char *device)
-{
- RAII_VAR(struct ao2_container *, cached, NULL, ao2_cleanup);
- struct ast_devstate_aggregate aggregate;
-
- ast_devstate_aggregate_init(&aggregate);
-
- cached = stasis_cache_dump(ast_device_state_cache(), NULL);
-
- ao2_callback_data(cached, OBJ_NODATA, devstate_change_aggregator_cb, &aggregate, device);
-
- return ast_devstate_aggregate_result(&aggregate);
-}
-
-static int aggregate_state_changed(char *device, enum ast_device_state new_aggregate_state)
-{
- RAII_VAR(struct stasis_message *, cached_aggregate_msg, NULL, ao2_cleanup);
- struct ast_device_state_message *cached_aggregate_device_state;
+ strcpy(pos, device);/* Safe */
+ new_device_state->device = pos;
- cached_aggregate_msg = stasis_cache_get(ast_device_state_cache(), ast_device_state_message_type(), device);
- if (!cached_aggregate_msg) {
- return 1;
- }
+ new_device_state->state = state;
+ new_device_state->cachable = cachable;
- cached_aggregate_device_state = stasis_message_data(cached_aggregate_msg);
- if (cached_aggregate_device_state->state == new_aggregate_state) {
- return 0;
- }
- return 1;
+ return new_device_state;
}
-static void devstate_change_collector_cb(void *data, struct stasis_subscription *sub, struct stasis_message *msg)
+static void devstate_change_cb(void *data, struct stasis_subscription *sub, struct stasis_message *msg)
{
- enum ast_device_state aggregate_state;
- char *device;
struct ast_device_state_message *device_state;
- RAII_VAR(struct stasis_message *, new_aggregate_msg, NULL, ao2_cleanup);
- RAII_VAR(struct ast_device_state_message *, new_aggregate_state, NULL, ao2_cleanup);
-
- if (stasis_cache_update_type() == stasis_message_type(msg)) {
- struct stasis_cache_update *update = stasis_message_data(msg);
- if (!update->new_snapshot) {
- return;
- }
- msg = update->new_snapshot;
- }
if (ast_device_state_message_type() != stasis_message_type(msg)) {
return;
}
device_state = stasis_message_data(msg);
-
- if (!device_state->eid) {
- /* ignore aggregate messages */
+ if (device_state->cachable == AST_DEVSTATE_CACHABLE || !device_state->eid) {
+ /* Ignore cacheable and aggregate messages. */
return;
}
- device = ast_strdupa(device_state->device);
- ast_debug(1, "Processing device state change for '%s'\n", device);
-
- if (device_state->cachable == AST_DEVSTATE_NOT_CACHABLE) {
- /* if it's not cachable, there will be no aggregate state to get
- * and this should be passed through */
- aggregate_state = device_state->state;
- } else {
-
- aggregate_state = get_aggregate_state(device);
- ast_debug(1, "Aggregate devstate result is '%s' for '%s'\n",
- ast_devstate2str(aggregate_state), device);
-
- if (!aggregate_state_changed(device, aggregate_state)) {
- /* No change since last reported device state */
- ast_debug(1, "Aggregate state for device '%s' has not changed from '%s'\n",
- device, ast_devstate2str(aggregate_state));
- return;
- }
- }
-
- ast_debug(1, "Aggregate state for device '%s' has changed to '%s'\n",
- device, ast_devstate2str(aggregate_state));
-
- ast_publish_device_state_full(device, aggregate_state, device_state->cachable, NULL);
+ /*
+ * Non-cacheable device state aggregates are just the
+ * device state republished as the aggregate.
+ */
+ ast_publish_device_state_full(device_state->device, device_state->state,
+ device_state->cachable, NULL);
}
/*! \brief Initialize the device state engine in separate thread */
@@ -738,25 +656,30 @@ struct stasis_topic *ast_device_state_topic(const char *device)
int ast_device_state_clear_cache(const char *device)
{
- RAII_VAR(struct stasis_message *, cached_msg, NULL, ao2_cleanup);
- RAII_VAR(struct stasis_message *, msg, NULL, ao2_cleanup);
+ struct stasis_message *cached_msg;
+ struct stasis_message *msg;
- if (!(cached_msg = stasis_cache_get(ast_device_state_cache(),
- ast_device_state_message_type(), device))) {
+ cached_msg = stasis_cache_get_by_eid(ast_device_state_cache(),
+ ast_device_state_message_type(), device, &ast_eid_default);
+ if (!cached_msg) {
/* nothing to clear */
return -1;
}
msg = stasis_cache_clear_create(cached_msg);
- stasis_publish(ast_device_state_topic(device), msg);
+ if (msg) {
+ stasis_publish(ast_device_state_topic(device), msg);
+ }
+ ao2_cleanup(msg);
+ ao2_cleanup(cached_msg);
return 0;
}
int ast_publish_device_state_full(
- const char *device,
- enum ast_device_state state,
- enum ast_devstate_cache cachable,
- struct ast_eid *eid)
+ const char *device,
+ enum ast_device_state state,
+ enum ast_devstate_cache cachable,
+ struct ast_eid *eid)
{
RAII_VAR(struct ast_device_state_message *, device_state, NULL, ao2_cleanup);
RAII_VAR(struct stasis_message *, message, NULL, ao2_cleanup);
@@ -769,7 +692,11 @@ int ast_publish_device_state_full(
return -1;
}
- message = stasis_message_create(ast_device_state_message_type(), device_state);
+ message = stasis_message_create_full(ast_device_state_message_type(), device_state,
+ eid);
+ if (!message) {
+ return -1;
+ }
device_specific_topic = ast_device_state_topic(device);
if (!device_specific_topic) {
@@ -783,6 +710,7 @@ int ast_publish_device_state_full(
static const char *device_state_get_id(struct stasis_message *message)
{
struct ast_device_state_message *device_state;
+
if (ast_device_state_message_type() != stasis_message_type(message)) {
return NULL;
}
@@ -792,20 +720,124 @@ static const char *device_state_get_id(struct stasis_message *message)
return NULL;
}
- return device_state->cache_id;
+ return device_state->device;
+}
+
+/*!
+ * \internal
+ * \brief Callback to publish the aggregate device state cache entry message.
+ * \since 12.2.0
+ *
+ * \param cache_topic Caching topic the aggregate message may be published over.
+ * \param aggregate The aggregate shapshot message to publish.
+ *
+ * \return Nothing
+ */
+static void device_state_aggregate_publish(struct stasis_topic *cache_topic, struct stasis_message *aggregate)
+{
+ const char *device;
+ struct stasis_topic *device_specific_topic;
+
+ device = device_state_get_id(aggregate);
+ if (!device) {
+ return;
+ }
+ device_specific_topic = ast_device_state_topic(device);
+ if (!device_specific_topic) {
+ return;
+ }
+
+ stasis_publish(device_specific_topic, aggregate);
+}
+
+/*!
+ * \internal
+ * \brief Callback to calculate the aggregate device state cache entry.
+ * \since 12.2.0
+ *
+ * \param entry Cache entry to calculate a new aggregate snapshot.
+ * \param new_snapshot The shapshot that is being updated.
+ *
+ * \note Return a ref bumped pointer from stasis_cache_entry_get_aggregate()
+ * if a new aggregate could not be calculated because of error.
+ *
+ * \return New aggregate-snapshot calculated on success.
+ * Caller has a reference on return.
+ */
+static struct stasis_message *device_state_aggregate_calc(struct stasis_cache_entry *entry, struct stasis_message *new_snapshot)
+{
+ struct stasis_message *aggregate_snapshot;
+ struct stasis_message *snapshot;
+ struct ast_device_state_message *device_state;
+ const char *device = NULL;
+ struct ast_devstate_aggregate aggregate;
+ int idx;
+
+ /* Determine the new aggregate device state. */
+ ast_devstate_aggregate_init(&aggregate);
+ snapshot = stasis_cache_entry_get_local(entry);
+ if (snapshot) {
+ device_state = stasis_message_data(snapshot);
+ device = device_state->device;
+ ast_devstate_aggregate_add(&aggregate, device_state->state);
+ }
+ for (idx = 0; ; ++idx) {
+ snapshot = stasis_cache_entry_get_remote(entry, idx);
+ if (!snapshot) {
+ break;
+ }
+
+ device_state = stasis_message_data(snapshot);
+ device = device_state->device;
+ ast_devstate_aggregate_add(&aggregate, device_state->state);
+ }
+
+ if (!device) {
+ /* There are no device states cached. Delete the aggregate. */
+ return NULL;
+ }
+
+ snapshot = stasis_cache_entry_get_aggregate(entry);
+ if (snapshot) {
+ device_state = stasis_message_data(snapshot);
+ if (device_state->state == ast_devstate_aggregate_result(&aggregate)) {
+ /* Aggregate device state did not change. */
+ return ao2_bump(snapshot);
+ }
+ }
+
+ device_state = device_state_alloc(device, ast_devstate_aggregate_result(&aggregate),
+ AST_DEVSTATE_CACHABLE, NULL);
+ if (!device_state) {
+ /* Bummer. We have to keep the old aggregate snapshot. */
+ return ao2_bump(snapshot);
+ }
+ aggregate_snapshot = stasis_message_create_full(ast_device_state_message_type(),
+ device_state, NULL);
+ ao2_cleanup(device_state);
+ if (!aggregate_snapshot) {
+ /* Bummer. We have to keep the old aggregate snapshot. */
+ return ao2_bump(snapshot);
+ }
+
+ return aggregate_snapshot;
}
static void devstate_cleanup(void)
{
devstate_message_sub = stasis_unsubscribe_and_join(devstate_message_sub);
- ao2_cleanup(device_state_topic_all);
- device_state_topic_all = NULL;
+ device_state_topic_cached = stasis_caching_unsubscribe_and_join(device_state_topic_cached);
+
ao2_cleanup(device_state_cache);
device_state_cache = NULL;
- device_state_topic_cached = stasis_caching_unsubscribe_and_join(device_state_topic_cached);
- STASIS_MESSAGE_TYPE_CLEANUP(ast_device_state_message_type);
+
ao2_cleanup(device_state_topic_pool);
device_state_topic_pool = NULL;
+
+ ao2_cleanup(device_state_topic_all);
+ device_state_topic_all = NULL;
+
+ STASIS_MESSAGE_TYPE_CLEANUP(ast_device_state_message_type);
}
int devstate_init(void)
@@ -817,25 +849,32 @@ int devstate_init(void)
}
device_state_topic_all = stasis_topic_create("ast_device_state_topic");
if (!device_state_topic_all) {
+ devstate_cleanup();
return -1;
}
- device_state_cache = stasis_cache_create(device_state_get_id);
- if (!device_state_cache) {
+ device_state_topic_pool = stasis_topic_pool_create(ast_device_state_topic_all());
+ if (!device_state_topic_pool) {
+ devstate_cleanup();
return -1;
}
- device_state_topic_cached = stasis_caching_topic_create(device_state_topic_all, device_state_cache);
- if (!device_state_topic_cached) {
+ device_state_cache = stasis_cache_create_full(device_state_get_id,
+ device_state_aggregate_calc, device_state_aggregate_publish);
+ if (!device_state_cache) {
+ devstate_cleanup();
return -1;
}
- device_state_topic_pool = stasis_topic_pool_create(ast_device_state_topic_all());
- if (!device_state_topic_pool) {
+ device_state_topic_cached = stasis_caching_topic_create(ast_device_state_topic_all(),
+ device_state_cache);
+ if (!device_state_topic_cached) {
+ devstate_cleanup();
return -1;
}
- devstate_message_sub = stasis_subscribe(ast_device_state_topic_cached(), devstate_change_collector_cb, NULL);
-
+ devstate_message_sub = stasis_subscribe(ast_device_state_topic_all(),
+ devstate_change_cb, NULL);
if (!devstate_message_sub) {
- ast_log(LOG_ERROR, "Failed to create subscription for the device state change collector\n");
+ ast_log(LOG_ERROR, "Failed to create subscription creating uncached device state aggregate events.\n");
+ devstate_cleanup();
return -1;
}