diff options
Diffstat (limited to 'main/devicestate.c')
-rw-r--r-- | main/devicestate.c | 417 |
1 files changed, 212 insertions, 205 deletions
diff --git a/main/devicestate.c b/main/devicestate.c index 4ed51f9e3..40b9e55e0 100644 --- a/main/devicestate.c +++ b/main/devicestate.c @@ -129,7 +129,12 @@ ASTERISK_FILE_VERSION(__FILE__, "$Revision$") #include "asterisk/devicestate.h" #include "asterisk/pbx.h" #include "asterisk/app.h" +#include "asterisk/astobj2.h" +#include "asterisk/stasis.h" #include "asterisk/event.h" +#include "asterisk/devicestate.h" + +#define DEVSTATE_TOPIC_BUCKETS 57 /*! \brief Device state strings for printing */ static const char * const devstatestring[][2] = { @@ -188,25 +193,12 @@ static pthread_t change_thread = AST_PTHREADT_NULL; /*! \brief Flag for the queue */ static ast_cond_t change_pending; -struct devstate_change { - AST_LIST_ENTRY(devstate_change) entry; - uint32_t state; - struct ast_eid eid; - enum ast_devstate_cache cachable; - char device[1]; -}; +struct stasis_subscription *devstate_message_sub; -static struct { - pthread_t thread; - struct ast_event_sub *event_sub; - ast_cond_t cond; - ast_mutex_t lock; - AST_LIST_HEAD_NOLOCK(, devstate_change) devstate_change_q; - unsigned int enabled:1; -} devstate_collector = { - .thread = AST_PTHREADT_NULL, - .enabled = 0, -}; +static struct stasis_topic *device_state_topic_all; +static struct stasis_caching_topic *device_state_topic_cached; +static struct stasis_message_type *device_state_message_type; +static struct stasis_topic_pool *device_state_topic_pool; /* Forward declarations */ static int getproviderstate(const char *provider, const char *address); @@ -289,21 +281,16 @@ enum ast_device_state ast_parse_device_state(const char *device) static enum ast_device_state devstate_cached(const char *device) { - enum ast_device_state res = AST_DEVICE_UNKNOWN; - struct ast_event *event; - - event = ast_event_get_cached(AST_EVENT_DEVICE_STATE, - AST_EVENT_IE_DEVICE, AST_EVENT_IE_PLTYPE_STR, device, - AST_EVENT_IE_END); - - if (!event) - return res; + RAII_VAR(struct stasis_message *, cached_msg, NULL, ao2_cleanup); + struct ast_device_state_message *device_state; - res = ast_event_get_ie_uint(event, AST_EVENT_IE_STATE); - - ast_event_destroy(event); + cached_msg = stasis_cache_get(ast_device_state_topic_cached(), ast_device_state_message_type(), device); + if (!cached_msg) { + return AST_DEVICE_UNKNOWN; + } + device_state = stasis_message_data(cached_msg); - return res; + return device_state->state; } /*! \brief Check device state through channel specific function or generic function */ @@ -426,39 +413,9 @@ static int getproviderstate(const char *provider, const char *address) return res; } -static void devstate_event(const char *device, enum ast_device_state state, int cachable) -{ - struct ast_event *event; - enum ast_event_type event_type; - - if (devstate_collector.enabled) { - /* Distributed device state is enabled, so this state change is a change - * for a single server, not the real state. */ - event_type = AST_EVENT_DEVICE_STATE_CHANGE; - } else { - event_type = AST_EVENT_DEVICE_STATE; - } - - ast_debug(3, "device '%s' state '%d'\n", device, state); - - if (!(event = ast_event_new(event_type, - AST_EVENT_IE_DEVICE, AST_EVENT_IE_PLTYPE_STR, device, - AST_EVENT_IE_STATE, AST_EVENT_IE_PLTYPE_UINT, state, - AST_EVENT_IE_CACHABLE, AST_EVENT_IE_PLTYPE_UINT, cachable, - AST_EVENT_IE_END))) { - return; - } - - if (cachable) { - ast_event_queue_and_cache(event); - } else { - ast_event_queue(event); - } -} - /*! Called by the state change thread to find out what the state is, and then * to queue up the state change event */ -static void do_state_change(const char *device, int cachable) +static void do_state_change(const char *device, enum ast_devstate_cache cachable) { enum ast_device_state state; @@ -466,7 +423,7 @@ static void do_state_change(const char *device, int cachable) ast_debug(3, "Changing state for %s - state %d (%s)\n", device, state, ast_devstate2str(state)); - devstate_event(device, state, cachable); + ast_publish_device_state(device, state, cachable); } int ast_devstate_changed_literal(enum ast_device_state state, enum ast_devstate_cache cachable, const char *device) @@ -490,7 +447,7 @@ int ast_devstate_changed_literal(enum ast_device_state state, enum ast_devstate_ */ if (state != AST_DEVICE_UNKNOWN) { - devstate_event(device, state, cachable); + ast_publish_device_state(device, state, cachable); } else if (change_thread == AST_PTHREADT_NULL || !(change = ast_calloc(1, sizeof(*change) + strlen(device)))) { /* we could not allocate a change struct, or */ /* there is no background thread, so process the change now */ @@ -562,176 +519,148 @@ static void *do_devstate_changes(void *data) return NULL; } -static void destroy_devstate_change(struct devstate_change *sc) -{ - ast_free(sc); -} - #define MAX_SERVERS 64 -struct change_collection { - struct devstate_change states[MAX_SERVERS]; - size_t num_states; -}; - -static void devstate_cache_cb(const struct ast_event *event, void *data) +static int devstate_change_aggregator_cb(void *obj, void *arg, void *data, int flags) { - struct change_collection *collection = data; - int i; - const struct ast_eid *eid; + struct stasis_message *msg = obj; + struct ast_devstate_aggregate *aggregate = arg; + char *device = data; + struct ast_device_state_message *device_state = stasis_message_data(msg); - if (collection->num_states == ARRAY_LEN(collection->states)) { - ast_log(LOG_ERROR, "More per-server state values than we have room for (MAX_SERVERS is %d)\n", - MAX_SERVERS); - return; - } - - if (!(eid = ast_event_get_ie_raw(event, AST_EVENT_IE_EID))) { - ast_log(LOG_ERROR, "Device state change event with no EID\n"); - return; + if (!device_state->eid || strcmp(device, device_state->device)) { + /* ignore aggregate states and devices that don't match */ + return 0; } - - i = collection->num_states; - - collection->states[i].state = ast_event_get_ie_uint(event, AST_EVENT_IE_STATE); - collection->states[i].eid = *eid; - - collection->num_states++; + ast_debug(1, "Adding per-server state of '%s' for '%s'\n", + ast_devstate2str(device_state->state), device); + ast_devstate_aggregate_add(aggregate, device_state->state); + return 0; } -static void process_collection(const char *device, enum ast_devstate_cache cachable, struct change_collection *collection) +static void device_state_dtor(void *obj) { - int i; - struct ast_devstate_aggregate agg; - enum ast_device_state state; - struct ast_event *event; + struct ast_device_state_message *device_state = obj; + ast_string_field_free_memory(device_state); + ast_free(device_state->eid); +} - ast_devstate_aggregate_init(&agg); +static struct ast_device_state_message *device_state_alloc(const char *device, enum ast_device_state state, enum ast_devstate_cache cachable, const struct ast_eid *eid) +{ + RAII_VAR(struct ast_device_state_message *, new_device_state, ao2_alloc(sizeof(*new_device_state), device_state_dtor), ao2_cleanup); - for (i = 0; i < collection->num_states; i++) { - ast_debug(1, "Adding per-server state of '%s' for '%s'\n", - ast_devstate2str(collection->states[i].state), device); - ast_devstate_aggregate_add(&agg, collection->states[i].state); + if (!new_device_state || ast_string_field_init(new_device_state, 256)) { + return NULL; } - state = ast_devstate_aggregate_result(&agg); + ast_string_field_set(new_device_state, device, device); + new_device_state->state = state; + new_device_state->cachable = cachable; - ast_debug(1, "Aggregate devstate result is '%s' for '%s'\n", - ast_devstate2str(state), device); + if (eid) { + char eid_str[20]; + struct ast_str *cache_id = ast_str_alloca(256); - event = ast_event_get_cached(AST_EVENT_DEVICE_STATE, - AST_EVENT_IE_DEVICE, AST_EVENT_IE_PLTYPE_STR, device, - AST_EVENT_IE_END); - - if (event) { - enum ast_device_state old_state; + new_device_state->eid = ast_malloc(sizeof(*eid)); + if (!new_device_state->eid) { + return NULL; + } - old_state = ast_event_get_ie_uint(event, AST_EVENT_IE_STATE); + *new_device_state->eid = *eid; + ast_eid_to_str(eid_str, sizeof(eid_str), new_device_state->eid); + ast_str_set(&cache_id, 0, "%s%s", eid_str, device); + ast_string_field_set(new_device_state, cache_id, ast_str_buffer(cache_id)); + } else { + /* no EID makes this an aggregate state */ + ast_string_field_set(new_device_state, cache_id, device); + } - ast_event_destroy(event); + ao2_ref(new_device_state, +1); + return new_device_state; +} - if (state == old_state) { - /* No change since last reported device state */ - ast_debug(1, "Aggregate state for device '%s' has not changed from '%s'\n", - device, ast_devstate2str(state)); - return; - } - } +static enum ast_device_state get_aggregate_state(char *device) +{ + RAII_VAR(struct ao2_container *, cached, NULL, ao2_cleanup); + struct ast_devstate_aggregate aggregate; - ast_debug(1, "Aggregate state for device '%s' has changed to '%s'\n", - device, ast_devstate2str(state)); + ast_devstate_aggregate_init(&aggregate); - event = ast_event_new(AST_EVENT_DEVICE_STATE, - AST_EVENT_IE_DEVICE, AST_EVENT_IE_PLTYPE_STR, device, - AST_EVENT_IE_STATE, AST_EVENT_IE_PLTYPE_UINT, state, - AST_EVENT_IE_END); + cached = stasis_cache_dump(ast_device_state_topic_cached(), NULL); - if (!event) { - return; - } + ao2_callback_data(cached, OBJ_NODATA, devstate_change_aggregator_cb, &aggregate, device); - if (cachable) { - ast_event_queue_and_cache(event); - } else { - ast_event_queue(event); - } + return ast_devstate_aggregate_result(&aggregate); } -static void handle_devstate_change(struct devstate_change *sc) +static int aggregate_state_changed(char *device, enum ast_device_state new_aggregate_state) { - struct ast_event_sub *tmp_sub; - struct change_collection collection = { - .num_states = 0, - }; - - ast_debug(1, "Processing device state change for '%s'\n", sc->device); + RAII_VAR(struct stasis_message *, cached_aggregate_msg, NULL, ao2_cleanup); + struct ast_device_state_message *cached_aggregate_device_state; - if (!(tmp_sub = ast_event_subscribe_new(AST_EVENT_DEVICE_STATE_CHANGE, devstate_cache_cb, &collection))) { - ast_log(LOG_ERROR, "Failed to create subscription\n"); - return; + cached_aggregate_msg = stasis_cache_get(ast_device_state_topic_cached(), ast_device_state_message_type(), device); + if (!cached_aggregate_msg) { + return 1; } - if (ast_event_sub_append_ie_str(tmp_sub, AST_EVENT_IE_DEVICE, sc->device)) { - ast_log(LOG_ERROR, "Failed to append device IE\n"); - ast_event_sub_destroy(tmp_sub); - return; + cached_aggregate_device_state = stasis_message_data(cached_aggregate_msg); + if (cached_aggregate_device_state->state == new_aggregate_state) { + return 0; } - - /* Populate the collection of device states from the cache */ - ast_event_dump_cache(tmp_sub); - - process_collection(sc->device, sc->cachable, &collection); - - ast_event_sub_destroy(tmp_sub); + return 1; } -static void *run_devstate_collector(void *data) +static void devstate_change_collector_cb(void *data, struct stasis_subscription *sub, struct stasis_topic *topic, struct stasis_message *msg) { - for (;;) { - struct devstate_change *sc; + enum ast_device_state aggregate_state; + char *device; + struct ast_device_state_message *device_state; + RAII_VAR(struct stasis_message *, new_aggregate_msg, NULL, ao2_cleanup); + RAII_VAR(struct ast_device_state_message *, new_aggregate_state, NULL, ao2_cleanup); + + if (stasis_cache_update_type() == stasis_message_type(msg)) { + struct stasis_cache_update *update = stasis_message_data(msg); + if (!update->new_snapshot) { + return; + } + msg = update->new_snapshot; + } - ast_mutex_lock(&devstate_collector.lock); - while (!(sc = AST_LIST_REMOVE_HEAD(&devstate_collector.devstate_change_q, entry))) - ast_cond_wait(&devstate_collector.cond, &devstate_collector.lock); - ast_mutex_unlock(&devstate_collector.lock); + if (ast_device_state_message_type() != stasis_message_type(msg)) { + return; + } - handle_devstate_change(sc); + device_state = stasis_message_data(msg); - destroy_devstate_change(sc); + if (!device_state->eid) { + /* ignore aggregate messages */ + return; } - return NULL; -} + device = ast_strdupa(device_state->device); + ast_debug(1, "Processing device state change for '%s'\n", device); -static void devstate_change_collector_cb(const struct ast_event *event, void *data) -{ - struct devstate_change *sc; - const char *device; - const struct ast_eid *eid; - uint32_t state; - enum ast_devstate_cache cachable = AST_DEVSTATE_CACHABLE; + if (device_state->cachable == AST_DEVSTATE_NOT_CACHABLE) { + /* if it's not cachable, there will be no aggregate state to get + * and this should be passed through */ + aggregate_state = device_state->state; + } else { - device = ast_event_get_ie_str(event, AST_EVENT_IE_DEVICE); - eid = ast_event_get_ie_raw(event, AST_EVENT_IE_EID); - state = ast_event_get_ie_uint(event, AST_EVENT_IE_STATE); - cachable = ast_event_get_ie_uint(event, AST_EVENT_IE_CACHABLE); + aggregate_state = get_aggregate_state(device); + ast_debug(1, "Aggregate devstate result is '%s' for '%s'\n", + ast_devstate2str(aggregate_state), device); - if (ast_strlen_zero(device) || !eid) { - ast_log(LOG_ERROR, "Invalid device state change event received\n"); - return; + if (!aggregate_state_changed(device, aggregate_state)) { + /* No change since last reported device state */ + ast_debug(1, "Aggregate state for device '%s' has not changed from '%s'\n", + device, ast_devstate2str(aggregate_state)); + return; + } } - if (!(sc = ast_calloc(1, sizeof(*sc) + strlen(device)))) - return; - - strcpy(sc->device, device); - sc->eid = *eid; - sc->state = state; - sc->cachable = cachable; + ast_debug(1, "Aggregate state for device '%s' has changed to '%s'\n", + device, ast_devstate2str(aggregate_state)); - ast_mutex_lock(&devstate_collector.lock); - AST_LIST_INSERT_TAIL(&devstate_collector.devstate_change_q, sc, entry); - ast_cond_signal(&devstate_collector.cond); - ast_mutex_unlock(&devstate_collector.lock); + ast_publish_device_state_full(device, aggregate_state, device_state->cachable, NULL); } /*! \brief Initialize the device state engine in separate thread */ @@ -784,28 +713,106 @@ enum ast_device_state ast_devstate_aggregate_result(struct ast_devstate_aggregat return agg->state; } -int ast_enable_distributed_devstate(void) +struct stasis_topic *ast_device_state_topic_all(void) { - if (devstate_collector.enabled) { - return 0; + return device_state_topic_all; +} + +struct stasis_caching_topic *ast_device_state_topic_cached(void) +{ + return device_state_topic_cached; +} + +struct stasis_message_type *ast_device_state_message_type(void) +{ + return device_state_message_type; +} + +struct stasis_topic *ast_device_state_topic(const char *device) +{ + return stasis_topic_pool_get_topic(device_state_topic_pool, device); +} + +int ast_publish_device_state_full( + const char *device, + enum ast_device_state state, + enum ast_devstate_cache cachable, + struct ast_eid *eid) +{ + RAII_VAR(struct ast_device_state_message *, device_state, NULL, ao2_cleanup); + RAII_VAR(struct stasis_message *, message, NULL, ao2_cleanup); + struct stasis_topic *device_specific_topic; + + ast_assert(!ast_strlen_zero(device)); + + device_state = device_state_alloc(device, state, cachable, eid); + if (!device_state) { + return -1; } - devstate_collector.event_sub = ast_event_subscribe(AST_EVENT_DEVICE_STATE_CHANGE, - devstate_change_collector_cb, "devicestate_engine_enable_distributed", NULL, AST_EVENT_IE_END); + message = stasis_message_create(ast_device_state_message_type(), device_state); - if (!devstate_collector.event_sub) { - ast_log(LOG_ERROR, "Failed to create subscription for the device state change collector\n"); + device_specific_topic = ast_device_state_topic(device); + if (!device_specific_topic) { return -1; } - ast_mutex_init(&devstate_collector.lock); - ast_cond_init(&devstate_collector.cond, NULL); - if (ast_pthread_create_background(&devstate_collector.thread, NULL, run_devstate_collector, NULL) < 0) { - ast_log(LOG_ERROR, "Unable to start device state collector thread.\n"); + stasis_publish(device_specific_topic, message); + return 0; +} + +static const char *device_state_get_id(struct stasis_message *message) +{ + struct ast_device_state_message *device_state; + if (ast_device_state_message_type() != stasis_message_type(message)) { + return NULL; + } + + device_state = stasis_message_data(message); + if (device_state->cachable == AST_DEVSTATE_NOT_CACHABLE) { + return NULL; + } + + return device_state->cache_id; +} + +static void devstate_exit(void) +{ + ao2_cleanup(device_state_topic_all); + device_state_topic_all = NULL; + device_state_topic_cached = stasis_caching_unsubscribe(device_state_topic_cached); + ao2_cleanup(device_state_message_type); + device_state_message_type = NULL; + ao2_cleanup(device_state_topic_pool); + device_state_topic_pool = NULL; +} + +int devstate_init(void) +{ + device_state_topic_all = stasis_topic_create("ast_device_state_topic"); + if (!device_state_topic_all) { + return -1; + } + device_state_topic_cached = stasis_caching_topic_create(device_state_topic_all, device_state_get_id); + if (!device_state_topic_cached) { + return -1; + } + device_state_message_type = stasis_message_type_create("ast_device_state_message"); + if (!device_state_message_type) { + return -1; + } + device_state_topic_pool = stasis_topic_pool_create(ast_device_state_topic_all()); + if (!device_state_topic_pool) { return -1; } - devstate_collector.enabled = 1; + devstate_message_sub = stasis_subscribe(stasis_caching_get_topic(ast_device_state_topic_cached()), devstate_change_collector_cb, NULL); + + if (!devstate_message_sub) { + ast_log(LOG_ERROR, "Failed to create subscription for the device state change collector\n"); + return -1; + } + ast_register_atexit(devstate_exit); return 0; } |