summaryrefslogtreecommitdiff
path: root/res/ais/lck.c
diff options
context:
space:
mode:
authorRussell Bryant <russell@russellbryant.com>2008-06-10 15:12:17 +0000
committerRussell Bryant <russell@russellbryant.com>2008-06-10 15:12:17 +0000
commite9d72e0cb22838d180a31d98deb40d4d951df0c7 (patch)
treed87634ac06f4e43877c9790c9f70d61d70b246f1 /res/ais/lck.c
parent823d1c7ea97e33646f38274ffa4ce585021eae3e (diff)
Merge another big set of changes from team/russell/events
This commit merges in the rest of the code needed to support distributed device state. There are two main parts to this commit. Core changes: - The device state handling in the core has been updated to understand device state across a cluster of Asterisk servers. Every time the state of a device changes, it looks at all of the device states on each node, and determines the aggregate device state. That resulting device state is what is provided to modules in Asterisk that take actions based on the state of a device. New module, res_ais: - A module has been written to facilitate the communication of events between nodes in a cluster of Asterisk servers. This module uses the SAForum AIS (Service Availability Forum Application Interface Specification) CLM and EVT services (Cluster Management and Event) to handle this task. This module currently supports sharing Voicemail MWI (Message Waiting Indication) and device state events between servers. It has been tested with openais, though other implementations of the spec do exist. For more information on testing distributed device state, see the following doc: - doc/distributed_devstate.txt git-svn-id: https://origsvn.digium.com/svn/asterisk/trunk@121559 65c4cc65-6c06-0410-ace0-fbb531ad65f3
Diffstat (limited to 'res/ais/lck.c')
-rw-r--r--res/ais/lck.c551
1 files changed, 551 insertions, 0 deletions
diff --git a/res/ais/lck.c b/res/ais/lck.c
new file mode 100644
index 000000000..7e7533dc6
--- /dev/null
+++ b/res/ais/lck.c
@@ -0,0 +1,551 @@
+/*
+ * Asterisk -- An open source telephony toolkit.
+ *
+ * Copyright (C) 2007, Digium, Inc.
+ *
+ * Russell Bryant <russell@digium.com>
+ *
+ * See http://www.asterisk.org for more information about
+ * the Asterisk project. Please do not directly contact
+ * any of the maintainers of this project for assistance;
+ * the project provides a web site, mailing lists and IRC
+ * channels for your use.
+ *
+ * This program is free software, distributed under the terms of
+ * the GNU General Public License Version 2. See the LICENSE file
+ * at the top of the source tree.
+ */
+
+/*!
+ * \file
+ * \author Russell Bryant <russell@digium.com>
+ *
+ * \brief Usage of the SAForum AIS (Application Interface Specification)
+ *
+ * \arg http://www.openais.org/
+ *
+ * This file contains the code specific to the use of the LCK
+ * (Distributed Locks) Service.
+ */
+
+#include "asterisk.h"
+
+ASTERISK_FILE_VERSION(__FILE__, "$Revision$");
+
+#include <stdlib.h>
+#include <stdio.h>
+#include <string.h>
+#include <unistd.h>
+#include <errno.h>
+
+#include "ais.h"
+
+#include "asterisk/module.h"
+#include "asterisk/utils.h"
+#include "asterisk/cli.h"
+#include "asterisk/logger.h"
+#include "asterisk/pbx.h"
+#include "asterisk/app.h"
+#include "asterisk/astobj2.h"
+#include "asterisk/strings.h"
+#include "asterisk/channel.h"
+
+SaLckHandleT lck_handle;
+
+/*!
+ * \brief Callbacks available in the Lock Service
+ *
+ * None of these are actually required if only synchronous locking is used.
+ * However, some of them must be implemented should the asynchronous locks
+ * be used.
+ */
+static SaLckCallbacksT lck_callbacks = {
+ /*! Get notified when a cluster-wide lock gets created */
+ .saLckResourceOpenCallback = NULL,
+ /*! Get notified when an asynchronous lock request gets granted */
+ .saLckLockGrantCallback = NULL,
+ /*! Be informed when a currently held lock is blocking another node */
+ .saLckLockWaiterCallback = NULL,
+ /*! Get notified when an asynchronous unlock request is done */
+ .saLckResourceUnlockCallback = NULL,
+};
+
+enum lock_type {
+ RDLOCK,
+ WRLOCK,
+ TRY_RDLOCK,
+ TRY_WRLOCK,
+};
+
+#define LOCK_BUCKETS 101
+
+/*!
+ * Every thread that wants to use a distributed lock must open its own handle
+ * to the lock. So, a thread-local container of opened locks is used to keep
+ * track of what locks have been opened.
+ *
+ * \todo It would be nice to be able to have a thread-local container, instead
+ * of using a thread-local wrapper like this.
+ */
+struct lock_resources {
+ struct ao2_container *locks;
+};
+
+static int lock_resources_init(void *);
+static void lock_resources_destroy(void *);
+
+AST_THREADSTORAGE_CUSTOM(locks_ts_key,
+ lock_resources_init, lock_resources_destroy);
+
+struct lock_resource {
+ SaLckResourceHandleT handle;
+ SaLckLockIdT id;
+ SaNameT ais_name;
+ const char *name;
+};
+
+static int lock_hash_cb(const void *obj, int flags)
+{
+ const struct lock_resource *lock = obj;
+
+ return ast_str_hash(lock->name);
+}
+
+static int lock_cmp_cb(void *obj, void *arg, int flags)
+{
+ struct lock_resource *lock1 = obj, *lock2 = arg;
+
+ return !strcasecmp(lock1->name, lock2->name) ? CMP_MATCH : 0;
+}
+
+static int lock_resources_init(void *data)
+{
+ struct lock_resources *lock_resources = data;
+
+ if (!(lock_resources->locks = ao2_container_alloc(LOCK_BUCKETS,
+ lock_hash_cb, lock_cmp_cb))) {
+ return -1;
+ }
+
+ return 0;
+
+}
+
+static void lock_resources_destroy(void *data)
+{
+ struct lock_resources *lock_resources = data;
+
+ ao2_ref(lock_resources->locks, -1);
+
+ ast_free(lock_resources);
+}
+
+static void lock_destructor(void *obj)
+{
+ struct lock_resource *lock = obj;
+
+ if (lock->name)
+ ast_free((void *) lock->name);
+}
+
+static inline struct lock_resource *lock_ref(struct lock_resource *lock)
+{
+ ao2_ref(lock, +1);
+ return lock;
+}
+
+static inline struct lock_resource *lock_unref(struct lock_resource *lock)
+{
+ ao2_ref(lock, -1);
+ return NULL;
+}
+
+static void lock_datastore_destroy(void *data)
+{
+ struct lock_resource *lock = data;
+ SaAisErrorT ais_res;
+
+ ais_res = saLckResourceUnlock(lock->id, SA_TIME_ONE_SECOND * 3);
+ if (ais_res != SA_AIS_OK) {
+ ast_log(LOG_ERROR, "Error unlocking '%s': %s\n", lock->name,
+ ais_err2str(ais_res));
+ }
+
+ lock_unref(lock);
+}
+
+static struct lock_resource *find_lock(const char *name)
+{
+ struct lock_resource *lock, tmp_lock = {
+ .name = name,
+ };
+ SaAisErrorT ais_res;
+ struct lock_resources *lock_resources;
+
+ if (!(lock_resources = ast_threadstorage_get(&locks_ts_key,
+ sizeof(*lock_resources)))) {
+ return NULL;
+ }
+
+ /* Return the lock if it has already been opened by this thread */
+ if ((lock = ao2_find(lock_resources->locks, &tmp_lock, OBJ_POINTER)))
+ return lock;
+
+ /* Allocate and open the lock */
+ if (!(lock = ao2_alloc(sizeof(*lock), lock_destructor)))
+ return NULL;
+
+ if (!(lock->name = ast_strdup(name)))
+ return lock_unref(lock);
+
+ /* Map the name into the SaNameT for convenience */
+ ast_copy_string((char *) lock->ais_name.value, lock->name,
+ sizeof(lock->ais_name.value));
+ lock->ais_name.length = strlen(lock->name);
+
+ ais_res = saLckResourceOpen(lck_handle, &lock->ais_name,
+ SA_LCK_RESOURCE_CREATE, SA_TIME_ONE_SECOND * 3, &lock->handle);
+ if (ais_res != SA_AIS_OK) {
+ ast_log(LOG_ERROR, "Failed to open lock: %s\n", ais_err2str(ais_res));
+ return lock_unref(lock);
+ }
+
+ return lock;
+}
+
+const struct ast_datastore_info dlock_datastore_info = {
+ .type = "DLOCK",
+ .destroy = lock_datastore_destroy,
+};
+
+static void add_lock_to_chan(struct ast_channel *chan, struct lock_resource *lock,
+ enum lock_type lock_type, double timeout, char *buf, size_t len)
+{
+ struct ast_datastore *datastore;
+ SaAisErrorT ais_res;
+ SaLckLockModeT mode = 0;
+ SaLckLockFlagsT flags = 0;
+ SaLckLockStatusT status;
+
+ ast_channel_lock(chan);
+ datastore = ast_channel_datastore_find(chan, &dlock_datastore_info, lock->name);
+
+ if (datastore) {
+ ast_log(LOG_ERROR, "The DLOCK '%s' is already locked by channel '%s'\n",
+ lock->name, chan->name);
+ ast_channel_unlock(chan);
+ ast_copy_string(buf, "FAILURE", len);
+ return;
+ }
+ ast_channel_unlock(chan);
+
+ switch (lock_type) {
+ case TRY_RDLOCK:
+ flags = SA_LCK_LOCK_NO_QUEUE;
+ mode = SA_LCK_PR_LOCK_MODE;
+ break;
+ case RDLOCK:
+ flags = SA_LCK_LOCK_NO_QUEUE;
+ mode = SA_LCK_PR_LOCK_MODE;
+ break;
+ case TRY_WRLOCK:
+ flags = SA_LCK_LOCK_NO_QUEUE;
+ mode = SA_LCK_EX_LOCK_MODE;
+ break;
+ case WRLOCK:
+ flags = SA_LCK_LOCK_NO_QUEUE;
+ mode = SA_LCK_EX_LOCK_MODE;
+ break;
+ }
+
+ /* Actually acquire the lock now */
+ ais_res = saLckResourceLock(lock->handle, &lock->id, mode, flags, 0,
+ (SaTimeT) timeout * SA_TIME_ONE_SECOND, &status);
+ if (ais_res != SA_AIS_OK) {
+ ast_log(LOG_ERROR, "Problem acquiring lock '%s': %s\n",
+ lock->name, ais_err2str(ais_res));
+ ast_copy_string(buf, (ais_res == SA_AIS_ERR_TIMEOUT) ? "TIMEOUT" :
+ "FAILURE", len);
+ return;
+ }
+
+ switch (status) {
+ case SA_LCK_LOCK_GRANTED:
+ ast_copy_string(buf, "SUCCESS", len);
+ break;
+ case SA_LCK_LOCK_DEADLOCK:
+ ast_copy_string(buf, "DEADLOCK", len);
+ return;
+ /*! XXX \todo Need to look at handling these other cases in a different way */
+ case SA_LCK_LOCK_NOT_QUEUED:
+ case SA_LCK_LOCK_ORPHANED:
+ case SA_LCK_LOCK_NO_MORE:
+ case SA_LCK_LOCK_DUPLICATE_EX:
+ ast_copy_string(buf, "FAILURE", len);
+ return;
+ }
+
+ if (!(datastore = ast_channel_datastore_alloc(&dlock_datastore_info,
+ lock->name))) {
+ ast_copy_string(buf, "FAILURE", len);
+ return;
+ }
+
+ datastore->data = lock_ref(lock);
+
+ ast_channel_lock(chan);
+ ast_channel_datastore_add(chan, datastore);
+ ast_channel_unlock(chan);
+}
+
+static int handle_lock(struct ast_channel *chan, enum lock_type lock_type,
+ char *data, char *buf, size_t len)
+{
+ AST_DECLARE_APP_ARGS(args,
+ AST_APP_ARG(name);
+ AST_APP_ARG(timeout);
+ );
+ int res = 0;
+ double timeout = 3;
+ struct lock_resource *lock = NULL;
+
+ ast_autoservice_start(chan);
+
+ AST_STANDARD_APP_ARGS(args, data);
+ if (ast_strlen_zero(args.name)) {
+ ast_log(LOG_ERROR, "The DLOCK functions require a lock name\n");
+ res = -1;
+ goto return_cleanup;
+ }
+ switch (lock_type) {
+ case RDLOCK:
+ case WRLOCK:
+ if (!ast_strlen_zero(args.timeout) && ((timeout = atof(args.timeout)) < 0)) {
+ ast_log(LOG_ERROR, "Timeout value '%s' not valid\n", args.timeout);
+ res = -1;
+ goto return_cleanup;
+ }
+ break;
+ case TRY_RDLOCK:
+ case TRY_WRLOCK:
+ if (!ast_strlen_zero(args.timeout)) {
+ ast_log(LOG_ERROR, "The trylock functions only take one argument\n");
+ res = -1;
+ goto return_cleanup;
+ }
+ }
+
+ if (!(lock = find_lock(args.name))) {
+ ast_copy_string(buf, "FAILURE", len);
+ res = -1;
+ goto return_cleanup;
+ }
+
+ add_lock_to_chan(chan, lock, lock_type, timeout, buf, len);
+
+ lock = lock_unref(lock);
+
+return_cleanup:
+ ast_autoservice_stop(chan);
+
+ return res;
+}
+
+static int handle_rdlock(struct ast_channel *chan, const char *cmd, char *data,
+ char *buf, size_t len)
+{
+ return handle_lock(chan, RDLOCK, data, buf, len);
+}
+
+static int handle_wrlock(struct ast_channel *chan, const char *cmd, char *data,
+ char *buf, size_t len)
+{
+ return handle_lock(chan, WRLOCK, data, buf, len);
+}
+
+static int handle_tryrdlock(struct ast_channel *chan, const char *cmd, char *data,
+ char *buf, size_t len)
+{
+ return handle_lock(chan, TRY_RDLOCK, data, buf, len);
+}
+
+static int handle_trywrlock(struct ast_channel *chan, const char *cmd, char *data,
+ char *buf, size_t len)
+{
+ return handle_lock(chan, TRY_WRLOCK, data, buf, len);
+}
+
+static int handle_unlock(struct ast_channel *chan, const char *cmd, char *data,
+ char *buf, size_t len)
+{
+ struct ast_datastore *datastore;
+ struct lock_resource *lock;
+ SaAisErrorT ais_res;
+ int res = 0;
+
+ if (ast_strlen_zero(data)) {
+ ast_log(LOG_ERROR, "DLOCK_UNLOCK requires a lock name\n");
+ ast_copy_string(buf, "FAILURE", len);
+ return -1;
+ }
+
+ ast_autoservice_start(chan);
+
+ ast_channel_lock(chan);
+ datastore = ast_channel_datastore_find(chan, &dlock_datastore_info, data);
+ if (!datastore) {
+ ast_log(LOG_ERROR, "The DLOCK '%s' is not locked by channel '%s'\n",
+ data, chan->name);
+ ast_channel_unlock(chan);
+ ast_copy_string(buf, "FAILURE", len);
+ return -1;
+ }
+ ast_channel_datastore_remove(chan, datastore);
+ ast_channel_unlock(chan);
+
+ lock = datastore->data;
+ ais_res = saLckResourceUnlock(lock->id, SA_TIME_ONE_SECOND * 3);
+ if (ais_res != SA_AIS_OK) {
+ ast_log(LOG_ERROR, "Error unlocking '%s': %s\n", lock->name,
+ ais_err2str(ais_res));
+ res = -1;
+ ast_copy_string(buf, (ais_res == SA_AIS_ERR_TIMEOUT) ? "TIMEOUT" :
+ "FAILURE", len);
+ } else {
+ ast_copy_string(buf, "SUCCESS", len);
+ }
+
+ datastore->data = lock_unref(lock);
+ ast_channel_datastore_free(datastore);
+
+ ast_autoservice_stop(chan);
+
+ return res;
+}
+
+#define LOCK_DESC_COMMON \
+" The name of the lock can be anything. The first time a named lock gets\n" \
+"used, it will be automatically created and maintained amongst the cluster.\n" \
+" The result of this function will be one of the following:\n" \
+" SUCCESS | TIMEOUT | FAILURE | DEADLOCK\n" DEADLOCK_DESC
+
+#define DEADLOCK_DESC \
+" The result, DEADLOCK, can only be provided if the AIS implementation in\n" \
+"use provides the optional feature of deadlock detection. If the lock fails\n" \
+"with the result of DEADLOCK, it means that the AIS implementation has\n" \
+"determined that if this lock were acquired, it would cause a deadlock.\n"
+
+static struct ast_custom_function dlock_rdlock = {
+ .name = "DLOCK_RDLOCK",
+ .synopsis = "Read-lock a distributed lock",
+ .desc =
+" This function will read-lock a distributed lock provided by the locking\n"
+"service of AIS. This is a blocking operation. However, a timeout can be\n"
+"specified to avoid deadlocks. The default timeout used if one is not\n"
+"provided as an argument is 3 seconds.\n"
+LOCK_DESC_COMMON
+"",
+ .syntax = "DLOCK_RDLOCK(<lock_name>,[timeout])",
+ .read = handle_rdlock,
+};
+
+static struct ast_custom_function dlock_wrlock = {
+ .name = "DLOCK_WRLOCK",
+ .synopsis = "Write-lock a distributed lock",
+ .desc =
+" This function will write-lock a distributed lock provided by the locking\n"
+"service of AIS. This is a blocking operation. However, a timeout can be\n"
+"specified to avoid deadlocks. The default timeout used if one is not\n"
+"provided as an argument is 3 seconds.\n"
+LOCK_DESC_COMMON
+"",
+ .syntax = "DLOCK_WRLOCK(<lock_name>,[timeout])",
+ .read = handle_wrlock,
+};
+
+static struct ast_custom_function dlock_tryrdlock = {
+ .name = "DLOCK_TRYRDLOCK",
+ .synopsis = "Try to read-lock a distributed lock",
+ .desc =
+" This function will attempt to read-lock a distributed lock provided by the\n"
+"locking service of AIS. This is a non-blocking operation.\n"
+" The name of the lock can be anything. The first time a named lock gets\n"
+"used, it will be automatically created and maintained amongst the cluster.\n"
+" The result of this function will be one of the following:\n"
+" SUCCESS | FAILURE | DEADLOCK\n"
+DEADLOCK_DESC
+"",
+ .syntax = "DLOCK_TRYRDLOCK(<lock_name>)",
+ .read = handle_tryrdlock,
+};
+
+static struct ast_custom_function dlock_trywrlock = {
+ .name = "DLOCK_TRYWRLOCK",
+ .synopsis = "Try to write-lock a distributed lock",
+ .desc =
+" This function will attempt to write-lock a distributed lock provided by\n"
+"the locking service of AIS. This is a non-blocking operation.\n"
+" The name of the lock can be anything. The first time a named lock gets\n"
+"used, it will be automatically created and maintained amongst the cluster.\n"
+" The result of this function will be one of the following:\n"
+" SUCCESS | FAILURE | DEADLOCK\n"
+DEADLOCK_DESC
+"",
+ .syntax = "DLOCK_TRYWRLOCK(<lock_name>)",
+ .read = handle_trywrlock,
+};
+
+static struct ast_custom_function dlock_unlock = {
+ .name = "DLOCK_UNLOCK",
+ .synopsis = "Unlock a distributed lock",
+ .desc =
+" This function will unlock a currently held distributed lock. This should\n"
+"be used regardless of the lock was read or write locked. The result of\n"
+"this funtion will be one of the following:\n"
+" SUCCESS | TIMEOUT | FAILURE\n"
+"",
+ .syntax = "DLOCK_UNLOCK(<lock_name>)",
+ .read = handle_unlock,
+};
+
+int ast_ais_lck_load_module(void)
+{
+ SaAisErrorT ais_res;
+ int res;
+
+ ais_res = saLckInitialize(&lck_handle, &lck_callbacks, &ais_version);
+ if (ais_res != SA_AIS_OK) {
+ ast_log(LOG_ERROR, "Could not initialize distributed locking service: %s\n",
+ ais_err2str(ais_res));
+ return -1;
+ }
+
+ res = ast_custom_function_register(&dlock_rdlock);
+ res |= ast_custom_function_register(&dlock_wrlock);
+ res |= ast_custom_function_register(&dlock_tryrdlock);
+ res |= ast_custom_function_register(&dlock_trywrlock);
+ res |= ast_custom_function_register(&dlock_unlock);
+
+ return res;
+}
+
+int ast_ais_lck_unload_module(void)
+{
+ SaAisErrorT ais_res;
+ int res = 0;
+
+ ast_custom_function_unregister(&dlock_rdlock);
+ ast_custom_function_unregister(&dlock_wrlock);
+ ast_custom_function_unregister(&dlock_tryrdlock);
+ ast_custom_function_unregister(&dlock_trywrlock);
+ ast_custom_function_unregister(&dlock_unlock);
+
+ ais_res = saLckFinalize(lck_handle);
+ if (ais_res != SA_AIS_OK) {
+ ast_log(LOG_ERROR, "Problem stopping distributed locking service: %s\n",
+ ais_err2str(ais_res));
+ res = -1;
+ }
+
+ return res;
+}