summaryrefslogtreecommitdiff
path: root/main/devicestate.c
diff options
context:
space:
mode:
Diffstat (limited to 'main/devicestate.c')
-rw-r--r--main/devicestate.c327
1 files changed, 183 insertions, 144 deletions
diff --git a/main/devicestate.c b/main/devicestate.c
index 1f0c968aa..5c2340863 100644
--- a/main/devicestate.c
+++ b/main/devicestate.c
@@ -282,16 +282,20 @@ enum ast_device_state ast_parse_device_state(const char *device)
static enum ast_device_state devstate_cached(const char *device)
{
- RAII_VAR(struct stasis_message *, cached_msg, NULL, ao2_cleanup);
+ struct stasis_message *cached_msg;
struct ast_device_state_message *device_state;
+ enum ast_device_state state;
- cached_msg = stasis_cache_get(ast_device_state_cache(), ast_device_state_message_type(), device);
+ cached_msg = stasis_cache_get_by_eid(ast_device_state_cache(),
+ ast_device_state_message_type(), device, NULL);
if (!cached_msg) {
return AST_DEVICE_UNKNOWN;
}
device_state = stasis_message_data(cached_msg);
+ state = device_state->state;
+ ao2_cleanup(cached_msg);
- return device_state->state;
+ return state;
}
/*! \brief Check device state through channel specific function or generic function */
@@ -522,148 +526,62 @@ static void *do_devstate_changes(void *data)
return NULL;
}
-#define MAX_SERVERS 64
-static int devstate_change_aggregator_cb(void *obj, void *arg, void *data, int flags)
-{
- struct stasis_message *msg = obj;
- struct ast_devstate_aggregate *aggregate = arg;
- char *device = data;
- struct ast_device_state_message *device_state = stasis_message_data(msg);
-
- if (!device_state->eid || strcmp(device, device_state->device)) {
- /* ignore aggregate states and devices that don't match */
- return 0;
- }
- ast_debug(1, "Adding per-server state of '%s' for '%s'\n",
- ast_devstate2str(device_state->state), device);
- ast_devstate_aggregate_add(aggregate, device_state->state);
- return 0;
-}
-
-static void device_state_dtor(void *obj)
-{
- struct ast_device_state_message *device_state = obj;
- ast_string_field_free_memory(device_state);
- ast_free(device_state->eid);
-}
-
static struct ast_device_state_message *device_state_alloc(const char *device, enum ast_device_state state, enum ast_devstate_cache cachable, const struct ast_eid *eid)
{
- RAII_VAR(struct ast_device_state_message *, new_device_state, ao2_alloc(sizeof(*new_device_state), device_state_dtor), ao2_cleanup);
+ struct ast_device_state_message *new_device_state;
+ char *pos;
+ size_t stuff_len;
+
+ ast_assert(!ast_strlen_zero(device));
- if (!new_device_state || ast_string_field_init(new_device_state, 256)) {
+ stuff_len = strlen(device) + 1;
+ if (eid) {
+ stuff_len += sizeof(*eid);
+ }
+ new_device_state = ao2_alloc_options(sizeof(*new_device_state) + stuff_len, NULL,
+ AO2_ALLOC_OPT_LOCK_NOLOCK);
+ if (!new_device_state) {
return NULL;
}
- ast_string_field_set(new_device_state, device, device);
- new_device_state->state = state;
- new_device_state->cachable = cachable;
-
if (eid) {
- char eid_str[20];
- struct ast_str *cache_id = ast_str_alloca(256);
-
- new_device_state->eid = ast_malloc(sizeof(*eid));
- if (!new_device_state->eid) {
- return NULL;
- }
-
- *new_device_state->eid = *eid;
- ast_eid_to_str(eid_str, sizeof(eid_str), new_device_state->eid);
- ast_str_set(&cache_id, 0, "%s%s", eid_str, device);
- ast_string_field_set(new_device_state, cache_id, ast_str_buffer(cache_id));
+ /* non-aggregate device state. */
+ new_device_state->stuff[0] = *eid;
+ new_device_state->eid = &new_device_state->stuff[0];
+ pos = (char *) &new_device_state->stuff[1];
} else {
- /* no EID makes this an aggregate state */
- ast_string_field_set(new_device_state, cache_id, device);
+ pos = (char *) &new_device_state->stuff[0];
}
- ao2_ref(new_device_state, +1);
- return new_device_state;
-}
-
-static enum ast_device_state get_aggregate_state(char *device)
-{
- RAII_VAR(struct ao2_container *, cached, NULL, ao2_cleanup);
- struct ast_devstate_aggregate aggregate;
-
- ast_devstate_aggregate_init(&aggregate);
-
- cached = stasis_cache_dump(ast_device_state_cache(), NULL);
-
- ao2_callback_data(cached, OBJ_NODATA, devstate_change_aggregator_cb, &aggregate, device);
-
- return ast_devstate_aggregate_result(&aggregate);
-}
-
-static int aggregate_state_changed(char *device, enum ast_device_state new_aggregate_state)
-{
- RAII_VAR(struct stasis_message *, cached_aggregate_msg, NULL, ao2_cleanup);
- struct ast_device_state_message *cached_aggregate_device_state;
+ strcpy(pos, device);/* Safe */
+ new_device_state->device = pos;
- cached_aggregate_msg = stasis_cache_get(ast_device_state_cache(), ast_device_state_message_type(), device);
- if (!cached_aggregate_msg) {
- return 1;
- }
+ new_device_state->state = state;
+ new_device_state->cachable = cachable;
- cached_aggregate_device_state = stasis_message_data(cached_aggregate_msg);
- if (cached_aggregate_device_state->state == new_aggregate_state) {
- return 0;
- }
- return 1;
+ return new_device_state;
}
-static void devstate_change_collector_cb(void *data, struct stasis_subscription *sub, struct stasis_message *msg)
+static void devstate_change_cb(void *data, struct stasis_subscription *sub, struct stasis_message *msg)
{
- enum ast_device_state aggregate_state;
- char *device;
struct ast_device_state_message *device_state;
- RAII_VAR(struct stasis_message *, new_aggregate_msg, NULL, ao2_cleanup);
- RAII_VAR(struct ast_device_state_message *, new_aggregate_state, NULL, ao2_cleanup);
-
- if (stasis_cache_update_type() == stasis_message_type(msg)) {
- struct stasis_cache_update *update = stasis_message_data(msg);
- if (!update->new_snapshot) {
- return;
- }
- msg = update->new_snapshot;
- }
if (ast_device_state_message_type() != stasis_message_type(msg)) {
return;
}
device_state = stasis_message_data(msg);
-
- if (!device_state->eid) {
- /* ignore aggregate messages */
+ if (device_state->cachable == AST_DEVSTATE_CACHABLE || !device_state->eid) {
+ /* Ignore cacheable and aggregate messages. */
return;
}
- device = ast_strdupa(device_state->device);
- ast_debug(1, "Processing device state change for '%s'\n", device);
-
- if (device_state->cachable == AST_DEVSTATE_NOT_CACHABLE) {
- /* if it's not cachable, there will be no aggregate state to get
- * and this should be passed through */
- aggregate_state = device_state->state;
- } else {
-
- aggregate_state = get_aggregate_state(device);
- ast_debug(1, "Aggregate devstate result is '%s' for '%s'\n",
- ast_devstate2str(aggregate_state), device);
-
- if (!aggregate_state_changed(device, aggregate_state)) {
- /* No change since last reported device state */
- ast_debug(1, "Aggregate state for device '%s' has not changed from '%s'\n",
- device, ast_devstate2str(aggregate_state));
- return;
- }
- }
-
- ast_debug(1, "Aggregate state for device '%s' has changed to '%s'\n",
- device, ast_devstate2str(aggregate_state));
-
- ast_publish_device_state_full(device, aggregate_state, device_state->cachable, NULL);
+ /*
+ * Non-cacheable device state aggregates are just the
+ * device state republished as the aggregate.
+ */
+ ast_publish_device_state_full(device_state->device, device_state->state,
+ device_state->cachable, NULL);
}
/*! \brief Initialize the device state engine in separate thread */
@@ -738,25 +656,30 @@ struct stasis_topic *ast_device_state_topic(const char *device)
int ast_device_state_clear_cache(const char *device)
{
- RAII_VAR(struct stasis_message *, cached_msg, NULL, ao2_cleanup);
- RAII_VAR(struct stasis_message *, msg, NULL, ao2_cleanup);
+ struct stasis_message *cached_msg;
+ struct stasis_message *msg;
- if (!(cached_msg = stasis_cache_get(ast_device_state_cache(),
- ast_device_state_message_type(), device))) {
+ cached_msg = stasis_cache_get_by_eid(ast_device_state_cache(),
+ ast_device_state_message_type(), device, &ast_eid_default);
+ if (!cached_msg) {
/* nothing to clear */
return -1;
}
msg = stasis_cache_clear_create(cached_msg);
- stasis_publish(ast_device_state_topic(device), msg);
+ if (msg) {
+ stasis_publish(ast_device_state_topic(device), msg);
+ }
+ ao2_cleanup(msg);
+ ao2_cleanup(cached_msg);
return 0;
}
int ast_publish_device_state_full(
- const char *device,
- enum ast_device_state state,
- enum ast_devstate_cache cachable,
- struct ast_eid *eid)
+ const char *device,
+ enum ast_device_state state,
+ enum ast_devstate_cache cachable,
+ struct ast_eid *eid)
{
RAII_VAR(struct ast_device_state_message *, device_state, NULL, ao2_cleanup);
RAII_VAR(struct stasis_message *, message, NULL, ao2_cleanup);
@@ -769,7 +692,11 @@ int ast_publish_device_state_full(
return -1;
}
- message = stasis_message_create(ast_device_state_message_type(), device_state);
+ message = stasis_message_create_full(ast_device_state_message_type(), device_state,
+ eid);
+ if (!message) {
+ return -1;
+ }
device_specific_topic = ast_device_state_topic(device);
if (!device_specific_topic) {
@@ -783,6 +710,7 @@ int ast_publish_device_state_full(
static const char *device_state_get_id(struct stasis_message *message)
{
struct ast_device_state_message *device_state;
+
if (ast_device_state_message_type() != stasis_message_type(message)) {
return NULL;
}
@@ -792,20 +720,124 @@ static const char *device_state_get_id(struct stasis_message *message)
return NULL;
}
- return device_state->cache_id;
+ return device_state->device;
+}
+
+/*!
+ * \internal
+ * \brief Callback to publish the aggregate device state cache entry message.
+ * \since 12.2.0
+ *
+ * \param cache_topic Caching topic the aggregate message may be published over.
+ * \param aggregate The aggregate shapshot message to publish.
+ *
+ * \return Nothing
+ */
+static void device_state_aggregate_publish(struct stasis_topic *cache_topic, struct stasis_message *aggregate)
+{
+ const char *device;
+ struct stasis_topic *device_specific_topic;
+
+ device = device_state_get_id(aggregate);
+ if (!device) {
+ return;
+ }
+ device_specific_topic = ast_device_state_topic(device);
+ if (!device_specific_topic) {
+ return;
+ }
+
+ stasis_publish(device_specific_topic, aggregate);
+}
+
+/*!
+ * \internal
+ * \brief Callback to calculate the aggregate device state cache entry.
+ * \since 12.2.0
+ *
+ * \param entry Cache entry to calculate a new aggregate snapshot.
+ * \param new_snapshot The shapshot that is being updated.
+ *
+ * \note Return a ref bumped pointer from stasis_cache_entry_get_aggregate()
+ * if a new aggregate could not be calculated because of error.
+ *
+ * \return New aggregate-snapshot calculated on success.
+ * Caller has a reference on return.
+ */
+static struct stasis_message *device_state_aggregate_calc(struct stasis_cache_entry *entry, struct stasis_message *new_snapshot)
+{
+ struct stasis_message *aggregate_snapshot;
+ struct stasis_message *snapshot;
+ struct ast_device_state_message *device_state;
+ const char *device = NULL;
+ struct ast_devstate_aggregate aggregate;
+ int idx;
+
+ /* Determine the new aggregate device state. */
+ ast_devstate_aggregate_init(&aggregate);
+ snapshot = stasis_cache_entry_get_local(entry);
+ if (snapshot) {
+ device_state = stasis_message_data(snapshot);
+ device = device_state->device;
+ ast_devstate_aggregate_add(&aggregate, device_state->state);
+ }
+ for (idx = 0; ; ++idx) {
+ snapshot = stasis_cache_entry_get_remote(entry, idx);
+ if (!snapshot) {
+ break;
+ }
+
+ device_state = stasis_message_data(snapshot);
+ device = device_state->device;
+ ast_devstate_aggregate_add(&aggregate, device_state->state);
+ }
+
+ if (!device) {
+ /* There are no device states cached. Delete the aggregate. */
+ return NULL;
+ }
+
+ snapshot = stasis_cache_entry_get_aggregate(entry);
+ if (snapshot) {
+ device_state = stasis_message_data(snapshot);
+ if (device_state->state == ast_devstate_aggregate_result(&aggregate)) {
+ /* Aggregate device state did not change. */
+ return ao2_bump(snapshot);
+ }
+ }
+
+ device_state = device_state_alloc(device, ast_devstate_aggregate_result(&aggregate),
+ AST_DEVSTATE_CACHABLE, NULL);
+ if (!device_state) {
+ /* Bummer. We have to keep the old aggregate snapshot. */
+ return ao2_bump(snapshot);
+ }
+ aggregate_snapshot = stasis_message_create_full(ast_device_state_message_type(),
+ device_state, NULL);
+ ao2_cleanup(device_state);
+ if (!aggregate_snapshot) {
+ /* Bummer. We have to keep the old aggregate snapshot. */
+ return ao2_bump(snapshot);
+ }
+
+ return aggregate_snapshot;
}
static void devstate_cleanup(void)
{
devstate_message_sub = stasis_unsubscribe_and_join(devstate_message_sub);
- ao2_cleanup(device_state_topic_all);
- device_state_topic_all = NULL;
+ device_state_topic_cached = stasis_caching_unsubscribe_and_join(device_state_topic_cached);
+
ao2_cleanup(device_state_cache);
device_state_cache = NULL;
- device_state_topic_cached = stasis_caching_unsubscribe_and_join(device_state_topic_cached);
- STASIS_MESSAGE_TYPE_CLEANUP(ast_device_state_message_type);
+
ao2_cleanup(device_state_topic_pool);
device_state_topic_pool = NULL;
+
+ ao2_cleanup(device_state_topic_all);
+ device_state_topic_all = NULL;
+
+ STASIS_MESSAGE_TYPE_CLEANUP(ast_device_state_message_type);
}
int devstate_init(void)
@@ -817,25 +849,32 @@ int devstate_init(void)
}
device_state_topic_all = stasis_topic_create("ast_device_state_topic");
if (!device_state_topic_all) {
+ devstate_cleanup();
return -1;
}
- device_state_cache = stasis_cache_create(device_state_get_id);
- if (!device_state_cache) {
+ device_state_topic_pool = stasis_topic_pool_create(ast_device_state_topic_all());
+ if (!device_state_topic_pool) {
+ devstate_cleanup();
return -1;
}
- device_state_topic_cached = stasis_caching_topic_create(device_state_topic_all, device_state_cache);
- if (!device_state_topic_cached) {
+ device_state_cache = stasis_cache_create_full(device_state_get_id,
+ device_state_aggregate_calc, device_state_aggregate_publish);
+ if (!device_state_cache) {
+ devstate_cleanup();
return -1;
}
- device_state_topic_pool = stasis_topic_pool_create(ast_device_state_topic_all());
- if (!device_state_topic_pool) {
+ device_state_topic_cached = stasis_caching_topic_create(ast_device_state_topic_all(),
+ device_state_cache);
+ if (!device_state_topic_cached) {
+ devstate_cleanup();
return -1;
}
- devstate_message_sub = stasis_subscribe(ast_device_state_topic_cached(), devstate_change_collector_cb, NULL);
-
+ devstate_message_sub = stasis_subscribe(ast_device_state_topic_all(),
+ devstate_change_cb, NULL);
if (!devstate_message_sub) {
- ast_log(LOG_ERROR, "Failed to create subscription for the device state change collector\n");
+ ast_log(LOG_ERROR, "Failed to create subscription creating uncached device state aggregate events.\n");
+ devstate_cleanup();
return -1;
}