diff options
Diffstat (limited to 'main/devicestate.c')
-rw-r--r-- | main/devicestate.c | 327 |
1 files changed, 183 insertions, 144 deletions
diff --git a/main/devicestate.c b/main/devicestate.c index 1f0c968aa..5c2340863 100644 --- a/main/devicestate.c +++ b/main/devicestate.c @@ -282,16 +282,20 @@ enum ast_device_state ast_parse_device_state(const char *device) static enum ast_device_state devstate_cached(const char *device) { - RAII_VAR(struct stasis_message *, cached_msg, NULL, ao2_cleanup); + struct stasis_message *cached_msg; struct ast_device_state_message *device_state; + enum ast_device_state state; - cached_msg = stasis_cache_get(ast_device_state_cache(), ast_device_state_message_type(), device); + cached_msg = stasis_cache_get_by_eid(ast_device_state_cache(), + ast_device_state_message_type(), device, NULL); if (!cached_msg) { return AST_DEVICE_UNKNOWN; } device_state = stasis_message_data(cached_msg); + state = device_state->state; + ao2_cleanup(cached_msg); - return device_state->state; + return state; } /*! \brief Check device state through channel specific function or generic function */ @@ -522,148 +526,62 @@ static void *do_devstate_changes(void *data) return NULL; } -#define MAX_SERVERS 64 -static int devstate_change_aggregator_cb(void *obj, void *arg, void *data, int flags) -{ - struct stasis_message *msg = obj; - struct ast_devstate_aggregate *aggregate = arg; - char *device = data; - struct ast_device_state_message *device_state = stasis_message_data(msg); - - if (!device_state->eid || strcmp(device, device_state->device)) { - /* ignore aggregate states and devices that don't match */ - return 0; - } - ast_debug(1, "Adding per-server state of '%s' for '%s'\n", - ast_devstate2str(device_state->state), device); - ast_devstate_aggregate_add(aggregate, device_state->state); - return 0; -} - -static void device_state_dtor(void *obj) -{ - struct ast_device_state_message *device_state = obj; - ast_string_field_free_memory(device_state); - ast_free(device_state->eid); -} - static struct ast_device_state_message *device_state_alloc(const char *device, enum ast_device_state state, enum ast_devstate_cache cachable, const struct ast_eid *eid) { - RAII_VAR(struct ast_device_state_message *, new_device_state, ao2_alloc(sizeof(*new_device_state), device_state_dtor), ao2_cleanup); + struct ast_device_state_message *new_device_state; + char *pos; + size_t stuff_len; + + ast_assert(!ast_strlen_zero(device)); - if (!new_device_state || ast_string_field_init(new_device_state, 256)) { + stuff_len = strlen(device) + 1; + if (eid) { + stuff_len += sizeof(*eid); + } + new_device_state = ao2_alloc_options(sizeof(*new_device_state) + stuff_len, NULL, + AO2_ALLOC_OPT_LOCK_NOLOCK); + if (!new_device_state) { return NULL; } - ast_string_field_set(new_device_state, device, device); - new_device_state->state = state; - new_device_state->cachable = cachable; - if (eid) { - char eid_str[20]; - struct ast_str *cache_id = ast_str_alloca(256); - - new_device_state->eid = ast_malloc(sizeof(*eid)); - if (!new_device_state->eid) { - return NULL; - } - - *new_device_state->eid = *eid; - ast_eid_to_str(eid_str, sizeof(eid_str), new_device_state->eid); - ast_str_set(&cache_id, 0, "%s%s", eid_str, device); - ast_string_field_set(new_device_state, cache_id, ast_str_buffer(cache_id)); + /* non-aggregate device state. */ + new_device_state->stuff[0] = *eid; + new_device_state->eid = &new_device_state->stuff[0]; + pos = (char *) &new_device_state->stuff[1]; } else { - /* no EID makes this an aggregate state */ - ast_string_field_set(new_device_state, cache_id, device); + pos = (char *) &new_device_state->stuff[0]; } - ao2_ref(new_device_state, +1); - return new_device_state; -} - -static enum ast_device_state get_aggregate_state(char *device) -{ - RAII_VAR(struct ao2_container *, cached, NULL, ao2_cleanup); - struct ast_devstate_aggregate aggregate; - - ast_devstate_aggregate_init(&aggregate); - - cached = stasis_cache_dump(ast_device_state_cache(), NULL); - - ao2_callback_data(cached, OBJ_NODATA, devstate_change_aggregator_cb, &aggregate, device); - - return ast_devstate_aggregate_result(&aggregate); -} - -static int aggregate_state_changed(char *device, enum ast_device_state new_aggregate_state) -{ - RAII_VAR(struct stasis_message *, cached_aggregate_msg, NULL, ao2_cleanup); - struct ast_device_state_message *cached_aggregate_device_state; + strcpy(pos, device);/* Safe */ + new_device_state->device = pos; - cached_aggregate_msg = stasis_cache_get(ast_device_state_cache(), ast_device_state_message_type(), device); - if (!cached_aggregate_msg) { - return 1; - } + new_device_state->state = state; + new_device_state->cachable = cachable; - cached_aggregate_device_state = stasis_message_data(cached_aggregate_msg); - if (cached_aggregate_device_state->state == new_aggregate_state) { - return 0; - } - return 1; + return new_device_state; } -static void devstate_change_collector_cb(void *data, struct stasis_subscription *sub, struct stasis_message *msg) +static void devstate_change_cb(void *data, struct stasis_subscription *sub, struct stasis_message *msg) { - enum ast_device_state aggregate_state; - char *device; struct ast_device_state_message *device_state; - RAII_VAR(struct stasis_message *, new_aggregate_msg, NULL, ao2_cleanup); - RAII_VAR(struct ast_device_state_message *, new_aggregate_state, NULL, ao2_cleanup); - - if (stasis_cache_update_type() == stasis_message_type(msg)) { - struct stasis_cache_update *update = stasis_message_data(msg); - if (!update->new_snapshot) { - return; - } - msg = update->new_snapshot; - } if (ast_device_state_message_type() != stasis_message_type(msg)) { return; } device_state = stasis_message_data(msg); - - if (!device_state->eid) { - /* ignore aggregate messages */ + if (device_state->cachable == AST_DEVSTATE_CACHABLE || !device_state->eid) { + /* Ignore cacheable and aggregate messages. */ return; } - device = ast_strdupa(device_state->device); - ast_debug(1, "Processing device state change for '%s'\n", device); - - if (device_state->cachable == AST_DEVSTATE_NOT_CACHABLE) { - /* if it's not cachable, there will be no aggregate state to get - * and this should be passed through */ - aggregate_state = device_state->state; - } else { - - aggregate_state = get_aggregate_state(device); - ast_debug(1, "Aggregate devstate result is '%s' for '%s'\n", - ast_devstate2str(aggregate_state), device); - - if (!aggregate_state_changed(device, aggregate_state)) { - /* No change since last reported device state */ - ast_debug(1, "Aggregate state for device '%s' has not changed from '%s'\n", - device, ast_devstate2str(aggregate_state)); - return; - } - } - - ast_debug(1, "Aggregate state for device '%s' has changed to '%s'\n", - device, ast_devstate2str(aggregate_state)); - - ast_publish_device_state_full(device, aggregate_state, device_state->cachable, NULL); + /* + * Non-cacheable device state aggregates are just the + * device state republished as the aggregate. + */ + ast_publish_device_state_full(device_state->device, device_state->state, + device_state->cachable, NULL); } /*! \brief Initialize the device state engine in separate thread */ @@ -738,25 +656,30 @@ struct stasis_topic *ast_device_state_topic(const char *device) int ast_device_state_clear_cache(const char *device) { - RAII_VAR(struct stasis_message *, cached_msg, NULL, ao2_cleanup); - RAII_VAR(struct stasis_message *, msg, NULL, ao2_cleanup); + struct stasis_message *cached_msg; + struct stasis_message *msg; - if (!(cached_msg = stasis_cache_get(ast_device_state_cache(), - ast_device_state_message_type(), device))) { + cached_msg = stasis_cache_get_by_eid(ast_device_state_cache(), + ast_device_state_message_type(), device, &ast_eid_default); + if (!cached_msg) { /* nothing to clear */ return -1; } msg = stasis_cache_clear_create(cached_msg); - stasis_publish(ast_device_state_topic(device), msg); + if (msg) { + stasis_publish(ast_device_state_topic(device), msg); + } + ao2_cleanup(msg); + ao2_cleanup(cached_msg); return 0; } int ast_publish_device_state_full( - const char *device, - enum ast_device_state state, - enum ast_devstate_cache cachable, - struct ast_eid *eid) + const char *device, + enum ast_device_state state, + enum ast_devstate_cache cachable, + struct ast_eid *eid) { RAII_VAR(struct ast_device_state_message *, device_state, NULL, ao2_cleanup); RAII_VAR(struct stasis_message *, message, NULL, ao2_cleanup); @@ -769,7 +692,11 @@ int ast_publish_device_state_full( return -1; } - message = stasis_message_create(ast_device_state_message_type(), device_state); + message = stasis_message_create_full(ast_device_state_message_type(), device_state, + eid); + if (!message) { + return -1; + } device_specific_topic = ast_device_state_topic(device); if (!device_specific_topic) { @@ -783,6 +710,7 @@ int ast_publish_device_state_full( static const char *device_state_get_id(struct stasis_message *message) { struct ast_device_state_message *device_state; + if (ast_device_state_message_type() != stasis_message_type(message)) { return NULL; } @@ -792,20 +720,124 @@ static const char *device_state_get_id(struct stasis_message *message) return NULL; } - return device_state->cache_id; + return device_state->device; +} + +/*! + * \internal + * \brief Callback to publish the aggregate device state cache entry message. + * \since 12.2.0 + * + * \param cache_topic Caching topic the aggregate message may be published over. + * \param aggregate The aggregate shapshot message to publish. + * + * \return Nothing + */ +static void device_state_aggregate_publish(struct stasis_topic *cache_topic, struct stasis_message *aggregate) +{ + const char *device; + struct stasis_topic *device_specific_topic; + + device = device_state_get_id(aggregate); + if (!device) { + return; + } + device_specific_topic = ast_device_state_topic(device); + if (!device_specific_topic) { + return; + } + + stasis_publish(device_specific_topic, aggregate); +} + +/*! + * \internal + * \brief Callback to calculate the aggregate device state cache entry. + * \since 12.2.0 + * + * \param entry Cache entry to calculate a new aggregate snapshot. + * \param new_snapshot The shapshot that is being updated. + * + * \note Return a ref bumped pointer from stasis_cache_entry_get_aggregate() + * if a new aggregate could not be calculated because of error. + * + * \return New aggregate-snapshot calculated on success. + * Caller has a reference on return. + */ +static struct stasis_message *device_state_aggregate_calc(struct stasis_cache_entry *entry, struct stasis_message *new_snapshot) +{ + struct stasis_message *aggregate_snapshot; + struct stasis_message *snapshot; + struct ast_device_state_message *device_state; + const char *device = NULL; + struct ast_devstate_aggregate aggregate; + int idx; + + /* Determine the new aggregate device state. */ + ast_devstate_aggregate_init(&aggregate); + snapshot = stasis_cache_entry_get_local(entry); + if (snapshot) { + device_state = stasis_message_data(snapshot); + device = device_state->device; + ast_devstate_aggregate_add(&aggregate, device_state->state); + } + for (idx = 0; ; ++idx) { + snapshot = stasis_cache_entry_get_remote(entry, idx); + if (!snapshot) { + break; + } + + device_state = stasis_message_data(snapshot); + device = device_state->device; + ast_devstate_aggregate_add(&aggregate, device_state->state); + } + + if (!device) { + /* There are no device states cached. Delete the aggregate. */ + return NULL; + } + + snapshot = stasis_cache_entry_get_aggregate(entry); + if (snapshot) { + device_state = stasis_message_data(snapshot); + if (device_state->state == ast_devstate_aggregate_result(&aggregate)) { + /* Aggregate device state did not change. */ + return ao2_bump(snapshot); + } + } + + device_state = device_state_alloc(device, ast_devstate_aggregate_result(&aggregate), + AST_DEVSTATE_CACHABLE, NULL); + if (!device_state) { + /* Bummer. We have to keep the old aggregate snapshot. */ + return ao2_bump(snapshot); + } + aggregate_snapshot = stasis_message_create_full(ast_device_state_message_type(), + device_state, NULL); + ao2_cleanup(device_state); + if (!aggregate_snapshot) { + /* Bummer. We have to keep the old aggregate snapshot. */ + return ao2_bump(snapshot); + } + + return aggregate_snapshot; } static void devstate_cleanup(void) { devstate_message_sub = stasis_unsubscribe_and_join(devstate_message_sub); - ao2_cleanup(device_state_topic_all); - device_state_topic_all = NULL; + device_state_topic_cached = stasis_caching_unsubscribe_and_join(device_state_topic_cached); + ao2_cleanup(device_state_cache); device_state_cache = NULL; - device_state_topic_cached = stasis_caching_unsubscribe_and_join(device_state_topic_cached); - STASIS_MESSAGE_TYPE_CLEANUP(ast_device_state_message_type); + ao2_cleanup(device_state_topic_pool); device_state_topic_pool = NULL; + + ao2_cleanup(device_state_topic_all); + device_state_topic_all = NULL; + + STASIS_MESSAGE_TYPE_CLEANUP(ast_device_state_message_type); } int devstate_init(void) @@ -817,25 +849,32 @@ int devstate_init(void) } device_state_topic_all = stasis_topic_create("ast_device_state_topic"); if (!device_state_topic_all) { + devstate_cleanup(); return -1; } - device_state_cache = stasis_cache_create(device_state_get_id); - if (!device_state_cache) { + device_state_topic_pool = stasis_topic_pool_create(ast_device_state_topic_all()); + if (!device_state_topic_pool) { + devstate_cleanup(); return -1; } - device_state_topic_cached = stasis_caching_topic_create(device_state_topic_all, device_state_cache); - if (!device_state_topic_cached) { + device_state_cache = stasis_cache_create_full(device_state_get_id, + device_state_aggregate_calc, device_state_aggregate_publish); + if (!device_state_cache) { + devstate_cleanup(); return -1; } - device_state_topic_pool = stasis_topic_pool_create(ast_device_state_topic_all()); - if (!device_state_topic_pool) { + device_state_topic_cached = stasis_caching_topic_create(ast_device_state_topic_all(), + device_state_cache); + if (!device_state_topic_cached) { + devstate_cleanup(); return -1; } - devstate_message_sub = stasis_subscribe(ast_device_state_topic_cached(), devstate_change_collector_cb, NULL); - + devstate_message_sub = stasis_subscribe(ast_device_state_topic_all(), + devstate_change_cb, NULL); if (!devstate_message_sub) { - ast_log(LOG_ERROR, "Failed to create subscription for the device state change collector\n"); + ast_log(LOG_ERROR, "Failed to create subscription creating uncached device state aggregate events.\n"); + devstate_cleanup(); return -1; } |