From 71a01725b850505a26d800288dc00e376e885dc1 Mon Sep 17 00:00:00 2001 From: Kinsey Moore Date: Tue, 16 Apr 2013 15:48:16 +0000 Subject: Move presence state distribution to Stasis-core Convert presence state events to Stasis-core messages and remove redundant serializers where possible. Review: https://reviewboard.asterisk.org/r/2410/ (closes issue ASTERISK-21102) Patch-by: Kinsey Moore git-svn-id: https://origsvn.digium.com/svn/asterisk/trunk@385862 65c4cc65-6c06-0410-ace0-fbb531ad65f3 --- main/pbx.c | 266 +++++++++++++++++++-------------------------------- main/presencestate.c | 163 +++++++++++++++++-------------- 2 files changed, 190 insertions(+), 239 deletions(-) (limited to 'main') diff --git a/main/pbx.c b/main/pbx.c index 017896979..43805ce7c 100644 --- a/main/pbx.c +++ b/main/pbx.c @@ -819,8 +819,6 @@ AST_APP_OPTIONS(waitexten_opts, { struct ast_context; struct ast_app; -static struct ast_taskprocessor *extension_state_tps; - AST_THREADSTORAGE(switch_data); AST_THREADSTORAGE(extensionstate_buf); @@ -1123,13 +1121,6 @@ static const struct cfextension_states { { AST_EXTENSION_INUSE | AST_EXTENSION_ONHOLD, "InUse&Hold" } }; -struct presencechange { - char *provider; - int state; - char *subtype; - char *message; -}; - struct pbx_exception { AST_DECLARE_STRING_FIELDS( AST_STRING_FIELD(context); /*!< Context associated with this exception */ @@ -1297,7 +1288,7 @@ static char *overrideswitch = NULL; /*! \brief Subscription for device state change events */ static struct stasis_subscription *device_state_sub; /*! \brief Subscription for presence state change events */ -static struct ast_event_sub *presence_state_sub; +static struct stasis_subscription *presence_state_sub; AST_MUTEX_DEFINE_STATIC(maxcalllock); static int countcalls; @@ -5042,125 +5033,6 @@ static int execute_state_callback(ast_state_cb_type cb, return res; } -static int handle_presencechange(void *datap) -{ - struct ast_hint *hint; - struct ast_str *hint_app = NULL; - struct presencechange *pc = datap; - struct ao2_iterator i; - struct ao2_iterator cb_iter; - char context_name[AST_MAX_CONTEXT]; - char exten_name[AST_MAX_EXTENSION]; - int res = -1; - - hint_app = ast_str_create(1024); - if (!hint_app) { - goto presencechange_cleanup; - } - - ast_mutex_lock(&context_merge_lock);/* Hold off ast_merge_contexts_and_delete */ - i = ao2_iterator_init(hints, 0); - for (; (hint = ao2_iterator_next(&i)); ao2_ref(hint, -1)) { - struct ast_state_cb *state_cb; - const char *app; - char *parse; - - ao2_lock(hint); - - if (!hint->exten) { - /* The extension has already been destroyed */ - ao2_unlock(hint); - continue; - } - - /* Does this hint monitor the device that changed state? */ - app = ast_get_extension_app(hint->exten); - if (ast_strlen_zero(app)) { - /* The hint does not monitor presence at all. */ - ao2_unlock(hint); - continue; - } - - ast_str_set(&hint_app, 0, "%s", app); - parse = parse_hint_presence(hint_app); - if (ast_strlen_zero(parse)) { - ao2_unlock(hint); - continue; - } - if (strcasecmp(parse, pc->provider)) { - /* The hint does not monitor the presence provider. */ - ao2_unlock(hint); - continue; - } - - /* - * Save off strings in case the hint extension gets destroyed - * while we are notifying the watchers. - */ - ast_copy_string(context_name, - ast_get_context_name(ast_get_extension_context(hint->exten)), - sizeof(context_name)); - ast_copy_string(exten_name, ast_get_extension_name(hint->exten), - sizeof(exten_name)); - ast_str_set(&hint_app, 0, "%s", ast_get_extension_app(hint->exten)); - - /* Check to see if update is necessary */ - if ((hint->last_presence_state == pc->state) && - ((hint->last_presence_subtype && pc->subtype && !strcmp(hint->last_presence_subtype, pc->subtype)) || (!hint->last_presence_subtype && !pc->subtype)) && - ((hint->last_presence_message && pc->message && !strcmp(hint->last_presence_message, pc->message)) || (!hint->last_presence_message && !pc->message))) { - - /* this update is the same as the last, do nothing */ - ao2_unlock(hint); - continue; - } - - /* update new values */ - ast_free(hint->last_presence_subtype); - ast_free(hint->last_presence_message); - hint->last_presence_state = pc->state; - hint->last_presence_subtype = pc->subtype ? ast_strdup(pc->subtype) : NULL; - hint->last_presence_message = pc->message ? ast_strdup(pc->message) : NULL; - - ao2_unlock(hint); - - /* For general callbacks */ - cb_iter = ao2_iterator_init(statecbs, 0); - for (; (state_cb = ao2_iterator_next(&cb_iter)); ao2_ref(state_cb, -1)) { - execute_state_callback(state_cb->change_cb, - context_name, - exten_name, - state_cb->data, - AST_HINT_UPDATE_PRESENCE, - hint, - NULL); - } - ao2_iterator_destroy(&cb_iter); - - /* For extension callbacks */ - cb_iter = ao2_iterator_init(hint->callbacks, 0); - for (; (state_cb = ao2_iterator_next(&cb_iter)); ao2_ref(state_cb, -1)) { - execute_state_callback(state_cb->change_cb, - context_name, - exten_name, - state_cb->data, - AST_HINT_UPDATE_PRESENCE, - hint, - NULL); - } - ao2_iterator_destroy(&cb_iter); - } - ao2_iterator_destroy(&i); - ast_mutex_unlock(&context_merge_lock); - - res = 0; - -presencechange_cleanup: - ast_free(hint_app); - ao2_ref(pc, -1); - - return res; -} - /*! * /internal * /brief Identify a channel for every device which is supposedly responsible for the device state. @@ -11663,49 +11535,112 @@ static int pbx_builtin_sayphonetic(struct ast_channel *chan, const char *data) return res; } -static void presencechange_destroy(void *data) -{ - struct presencechange *pc = data; - ast_free(pc->provider); - ast_free(pc->subtype); - ast_free(pc->message); -} - -static void presence_state_cb(const struct ast_event *event, void *unused) +static void presence_state_cb(void *unused, struct stasis_subscription *sub, struct stasis_topic *topic, struct stasis_message *msg) { - struct presencechange *pc; - const char *tmp; + struct ast_presence_state_message *presence_state = stasis_message_data(msg); + struct ast_hint *hint; + struct ast_str *hint_app = NULL; + struct ao2_iterator hint_iter; + struct ao2_iterator cb_iter; + char context_name[AST_MAX_CONTEXT]; + char exten_name[AST_MAX_EXTENSION]; - if (!(pc = ao2_alloc(sizeof(*pc), presencechange_destroy))) { + if (stasis_message_type(msg) != ast_presence_state_message_type()) { return; } - tmp = ast_event_get_ie_str(event, AST_EVENT_IE_PRESENCE_PROVIDER); - if (ast_strlen_zero(tmp)) { - ast_log(LOG_ERROR, "Received invalid event that had no presence provider IE\n"); - ao2_ref(pc, -1); + hint_app = ast_str_create(1024); + if (!hint_app) { return; } - pc->provider = ast_strdup(tmp); - pc->state = ast_event_get_ie_uint(event, AST_EVENT_IE_PRESENCE_STATE); - if (pc->state < 0) { - ao2_ref(pc, -1); - return; - } + ast_mutex_lock(&context_merge_lock);/* Hold off ast_merge_contexts_and_delete */ + hint_iter = ao2_iterator_init(hints, 0); + for (; (hint = ao2_iterator_next(&hint_iter)); ao2_cleanup(hint)) { + struct ast_state_cb *state_cb; + const char *app; + char *parse; + SCOPED_AO2LOCK(lock, hint); - if ((tmp = ast_event_get_ie_str(event, AST_EVENT_IE_PRESENCE_SUBTYPE))) { - pc->subtype = ast_strdup(tmp); - } + if (!hint->exten) { + /* The extension has already been destroyed */ + continue; + } - if ((tmp = ast_event_get_ie_str(event, AST_EVENT_IE_PRESENCE_MESSAGE))) { - pc->message = ast_strdup(tmp); - } + /* Does this hint monitor the device that changed state? */ + app = ast_get_extension_app(hint->exten); + if (ast_strlen_zero(app)) { + /* The hint does not monitor presence at all. */ + continue; + } + + ast_str_set(&hint_app, 0, "%s", app); + parse = parse_hint_presence(hint_app); + if (ast_strlen_zero(parse)) { + continue; + } + if (strcasecmp(parse, presence_state->provider)) { + /* The hint does not monitor the presence provider. */ + continue; + } + + /* + * Save off strings in case the hint extension gets destroyed + * while we are notifying the watchers. + */ + ast_copy_string(context_name, + ast_get_context_name(ast_get_extension_context(hint->exten)), + sizeof(context_name)); + ast_copy_string(exten_name, ast_get_extension_name(hint->exten), + sizeof(exten_name)); + ast_str_set(&hint_app, 0, "%s", ast_get_extension_app(hint->exten)); - /* The task processor thread is taking our reference to the presencechange object. */ - if (ast_taskprocessor_push(extension_state_tps, handle_presencechange, pc) < 0) { - ao2_ref(pc, -1); + /* Check to see if update is necessary */ + if ((hint->last_presence_state == presence_state->state) && + ((hint->last_presence_subtype && presence_state->subtype && !strcmp(hint->last_presence_subtype, presence_state->subtype)) || (!hint->last_presence_subtype && !presence_state->subtype)) && + ((hint->last_presence_message && presence_state->message && !strcmp(hint->last_presence_message, presence_state->message)) || (!hint->last_presence_message && !presence_state->message))) { + + /* this update is the same as the last, do nothing */ + continue; + } + + /* update new values */ + ast_free(hint->last_presence_subtype); + ast_free(hint->last_presence_message); + hint->last_presence_state = presence_state->state; + hint->last_presence_subtype = presence_state->subtype ? ast_strdup(presence_state->subtype) : NULL; + hint->last_presence_message = presence_state->message ? ast_strdup(presence_state->message) : NULL; + + /* For general callbacks */ + cb_iter = ao2_iterator_init(statecbs, 0); + for (; (state_cb = ao2_iterator_next(&cb_iter)); ao2_ref(state_cb, -1)) { + execute_state_callback(state_cb->change_cb, + context_name, + exten_name, + state_cb->data, + AST_HINT_UPDATE_PRESENCE, + hint, + NULL); + } + ao2_iterator_destroy(&cb_iter); + + /* For extension callbacks */ + cb_iter = ao2_iterator_init(hint->callbacks, 0); + for (; (state_cb = ao2_iterator_next(&cb_iter)); ao2_cleanup(state_cb)) { + execute_state_callback(state_cb->change_cb, + context_name, + exten_name, + state_cb->data, + AST_HINT_UPDATE_PRESENCE, + hint, + NULL); + } + ao2_iterator_destroy(&cb_iter); } + ao2_iterator_destroy(&hint_iter); + ast_mutex_unlock(&context_merge_lock); + + ast_free(hint_app); } /*! @@ -11765,7 +11700,7 @@ static void unload_pbx(void) int x; if (presence_state_sub) { - presence_state_sub = ast_event_unsubscribe(presence_state_sub); + presence_state_sub = stasis_unsubscribe(presence_state_sub); } if (device_state_sub) { device_state_sub = stasis_unsubscribe(device_state_sub); @@ -11780,9 +11715,6 @@ static void unload_pbx(void) ast_custom_function_unregister(&exception_function); ast_custom_function_unregister(&testtime_function); ast_data_unregister(NULL); - if (extension_state_tps) { - extension_state_tps = ast_taskprocessor_unreference(extension_state_tps); - } } int load_pbx(void) @@ -11793,9 +11725,6 @@ int load_pbx(void) /* Initialize the PBX */ ast_verb(1, "Asterisk PBX Core Initializing\n"); - if (!(extension_state_tps = ast_taskprocessor_get("pbx-core", 0))) { - ast_log(LOG_WARNING, "failed to create pbx-core taskprocessor\n"); - } ast_verb(1, "Registering builtin applications:\n"); ast_cli_register_multiple(pbx_cli, ARRAY_LEN(pbx_cli)); @@ -11819,8 +11748,7 @@ int load_pbx(void) return -1; } - if (!(presence_state_sub = ast_event_subscribe(AST_EVENT_PRESENCE_STATE, presence_state_cb, "pbx Presence State Change", NULL, - AST_EVENT_IE_END))) { + if (!(presence_state_sub = stasis_subscribe(ast_presence_state_topic_all(), presence_state_cb, NULL))) { return -1; } diff --git a/main/presencestate.c b/main/presencestate.c index b87b09242..ae85f01d4 100644 --- a/main/presencestate.c +++ b/main/presencestate.c @@ -53,13 +53,9 @@ static const struct { { "dnd", AST_PRESENCE_DND}, }; -/*! \brief Flag for the queue */ -static ast_cond_t change_pending; - -struct state_change { - AST_LIST_ENTRY(state_change) list; - char provider[1]; -}; +struct stasis_message_type *presence_state_type; +struct stasis_topic *presence_state_topic_all; +struct stasis_caching_topic *presence_state_topic_cached; /*! \brief A presence state provider */ struct presence_state_provider { @@ -71,13 +67,6 @@ struct presence_state_provider { /*! \brief A list of providers */ static AST_RWLIST_HEAD_STATIC(presence_state_providers, presence_state_provider); -/*! \brief The state change queue. State changes are queued - for processing by a separate thread */ -static AST_LIST_HEAD_STATIC(state_changes, state_change); - -/*! \brief The presence state change notification thread */ -static pthread_t change_thread = AST_PTHREADT_NULL; - const char *ast_presence_state2str(enum ast_presence_state state) { int i; @@ -103,25 +92,20 @@ enum ast_presence_state ast_presence_state_val(const char *val) static enum ast_presence_state presence_state_cached(const char *presence_provider, char **subtype, char **message) { enum ast_presence_state res = AST_PRESENCE_INVALID; - struct ast_event *event; - const char *_subtype; - const char *_message; + RAII_VAR(struct stasis_message *, msg, NULL, ao2_cleanup); + struct ast_presence_state_message *presence_state; - event = ast_event_get_cached(AST_EVENT_PRESENCE_STATE, - AST_EVENT_IE_PRESENCE_PROVIDER, AST_EVENT_IE_PLTYPE_STR, presence_provider, - AST_EVENT_IE_END); + msg = stasis_cache_get(ast_presence_state_topic_cached(), ast_presence_state_message_type(), presence_provider); - if (!event) { + if (!msg) { return res; } - res = ast_event_get_ie_uint(event, AST_EVENT_IE_PRESENCE_STATE); - _subtype = ast_event_get_ie_str(event, AST_EVENT_IE_PRESENCE_SUBTYPE); - _message = ast_event_get_ie_str(event, AST_EVENT_IE_PRESENCE_MESSAGE); + presence_state = stasis_message_data(msg); + res = presence_state->state; - *subtype = !ast_strlen_zero(_subtype) ? ast_strdup(_subtype) : NULL; - *message = !ast_strlen_zero(_message) ? ast_strdup(_message) : NULL; - ast_event_destroy(event); + *subtype = !ast_strlen_zero(presence_state->subtype) ? ast_strdup(presence_state->subtype) : NULL; + *message = !ast_strlen_zero(presence_state->message) ? ast_strdup(presence_state->message) : NULL; return res; } @@ -213,23 +197,50 @@ int ast_presence_state_prov_del(const char *label) return res; } +static void presence_state_dtor(void *obj) +{ + struct ast_presence_state_message *presence_state = obj; + ast_string_field_free_memory(presence_state); +} + +static struct ast_presence_state_message *presence_state_alloc(const char *provider, + enum ast_presence_state state, + const char *subtype, + const char *message) +{ + RAII_VAR(struct ast_presence_state_message *, presence_state, ao2_alloc(sizeof(*presence_state), presence_state_dtor), ao2_cleanup); + + if (!presence_state || ast_string_field_init(presence_state, 256)) { + return NULL; + } + + presence_state->state = state; + ast_string_field_set(presence_state, provider, provider); + ast_string_field_set(presence_state, subtype, S_OR(subtype, "")); + ast_string_field_set(presence_state, message, S_OR(message, "")); + + ao2_ref(presence_state, +1); + return presence_state; +} + static void presence_state_event(const char *provider, enum ast_presence_state state, const char *subtype, const char *message) { - struct ast_event *event; - - if (!(event = ast_event_new(AST_EVENT_PRESENCE_STATE, - AST_EVENT_IE_PRESENCE_PROVIDER, AST_EVENT_IE_PLTYPE_STR, provider, - AST_EVENT_IE_PRESENCE_STATE, AST_EVENT_IE_PLTYPE_UINT, state, - AST_EVENT_IE_PRESENCE_SUBTYPE, AST_EVENT_IE_PLTYPE_STR, S_OR(subtype, ""), - AST_EVENT_IE_PRESENCE_MESSAGE, AST_EVENT_IE_PLTYPE_STR, S_OR(message, ""), - AST_EVENT_IE_END))) { + RAII_VAR(struct stasis_message *, msg, NULL, ao2_cleanup); + RAII_VAR(struct ast_presence_state_message *, presence_state, presence_state_alloc(provider, state, subtype, message), ao2_cleanup); + + if (!presence_state) { return; } - ast_event_queue_and_cache(event); + msg = stasis_message_create(ast_presence_state_message_type(), presence_state); + if (!msg) { + return; + } + + stasis_publish(ast_presence_state_topic_all(), msg); } static void do_presence_state_change(const char *provider) @@ -254,19 +265,10 @@ int ast_presence_state_changed_literal(enum ast_presence_state state, const char *message, const char *presence_provider) { - struct state_change *change; - - if (state != AST_PRESENCE_NOT_SET) { - presence_state_event(presence_provider, state, subtype, message); - } else if ((change_thread == AST_PTHREADT_NULL) || - !(change = ast_calloc(1, sizeof(*change) + strlen(presence_provider)))) { + if (state == AST_PRESENCE_NOT_SET) { do_presence_state_change(presence_provider); } else { - strcpy(change->provider, presence_provider); - AST_LIST_LOCK(&state_changes); - AST_LIST_INSERT_TAIL(&state_changes, change, list); - ast_cond_signal(&change_pending); - AST_LIST_UNLOCK(&state_changes); + presence_state_event(presence_provider, state, subtype, message); } return 0; @@ -287,38 +289,59 @@ int ast_presence_state_changed(enum ast_presence_state state, return ast_presence_state_changed_literal(state, subtype, message, buf); } -/*! \brief Go through the presence state change queue and update changes in the presence state thread */ -static void *do_presence_changes(void *data) +struct stasis_message_type *ast_presence_state_message_type(void) { - struct state_change *next, *current; - - for (;;) { - /* This basically pops off any state change entries, resets the list back to NULL, unlocks, and processes each state change */ - AST_LIST_LOCK(&state_changes); - if (AST_LIST_EMPTY(&state_changes)) - ast_cond_wait(&change_pending, &state_changes.lock); - next = AST_LIST_FIRST(&state_changes); - AST_LIST_HEAD_INIT_NOLOCK(&state_changes); - AST_LIST_UNLOCK(&state_changes); - - /* Process each state change */ - while ((current = next)) { - next = AST_LIST_NEXT(current, list); - do_presence_state_change(current->provider); - ast_free(current); - } + return presence_state_type; +} + +struct stasis_topic *ast_presence_state_topic_all(void) +{ + return presence_state_topic_all; +} + +struct stasis_caching_topic *ast_presence_state_topic_cached(void) +{ + return presence_state_topic_cached; +} + +static const char *presence_state_get_id(struct stasis_message *msg) +{ + struct ast_presence_state_message *presence_state = stasis_message_data(msg); + + if (stasis_message_type(msg) != ast_presence_state_message_type()) { + return NULL; } - return NULL; + return presence_state->provider; +} + +static void presence_state_engine_cleanup(void) +{ + ao2_cleanup(presence_state_topic_all); + presence_state_topic_all = NULL; + ao2_cleanup(presence_state_topic_cached); + presence_state_topic_cached = NULL; + ao2_cleanup(presence_state_type); + presence_state_type = NULL; } int ast_presence_state_engine_init(void) { - ast_cond_init(&change_pending, NULL); - if (ast_pthread_create_background(&change_thread, NULL, do_presence_changes, NULL) < 0) { - ast_log(LOG_ERROR, "Unable to start presence state change thread.\n"); + presence_state_type = stasis_message_type_create("ast_presence_state_message"); + if (!presence_state_type) { + return -1; + } + + presence_state_topic_all = stasis_topic_create("ast_presence_state_topic_all"); + if (!presence_state_topic_all) { + return -1; + } + + presence_state_topic_cached = stasis_caching_topic_create(presence_state_topic_all, presence_state_get_id); + if (!presence_state_topic_cached) { return -1; } + ast_register_atexit(presence_state_engine_cleanup); return 0; } -- cgit v1.2.3