summaryrefslogtreecommitdiff
path: root/include/asterisk
diff options
context:
space:
mode:
Diffstat (limited to 'include/asterisk')
-rw-r--r--include/asterisk/autoconfig.h.in3
-rw-r--r--include/asterisk/sem.h157
-rw-r--r--include/asterisk/stasis.h33
-rw-r--r--include/asterisk/stasis_internal.h2
-rw-r--r--include/asterisk/stasis_message_router.h16
-rw-r--r--include/asterisk/taskprocessor.h39
-rw-r--r--include/asterisk/vector.h193
7 files changed, 415 insertions, 28 deletions
diff --git a/include/asterisk/autoconfig.h.in b/include/asterisk/autoconfig.h.in
index 87a769ed0..559b69af1 100644
--- a/include/asterisk/autoconfig.h.in
+++ b/include/asterisk/autoconfig.h.in
@@ -29,6 +29,9 @@
/* Define to 1 if using `alloca.c'. */
#undef C_ALLOCA
+/* Define to 1 if anonymous semaphores work. */
+#undef HAS_WORKING_SEMAPHORE
+
/* Define to 1 if you have the `acos' function. */
#undef HAVE_ACOS
diff --git a/include/asterisk/sem.h b/include/asterisk/sem.h
new file mode 100644
index 000000000..8f6356c01
--- /dev/null
+++ b/include/asterisk/sem.h
@@ -0,0 +1,157 @@
+/*
+ * Asterisk -- An open source telephony toolkit.
+ *
+ * Copyright (C) 2013, Digium, Inc.
+ *
+ * David M. Lee, II <dlee@digium.com>
+ *
+ * See http://www.asterisk.org for more information about
+ * the Asterisk project. Please do not directly contact
+ * any of the maintainers of this project for assistance;
+ * the project provides a web site, mailing lists and IRC
+ * channels for your use.
+ *
+ * This program is free software, distributed under the terms of
+ * the GNU General Public License Version 2. See the LICENSE file
+ * at the top of the source tree.
+ */
+
+#ifndef ASTERISK_SEMAPHORE_H
+#define ASTERISK_SEMAPHORE_H
+
+/*!
+ * \file Asterisk semaphore API
+ *
+ * This API is a thin wrapper around the POSIX semaphore API (when available),
+ * so see the POSIX documentation for further details.
+ */
+
+#ifdef HAS_WORKING_SEMAPHORE
+/* Working semaphore implementation detected */
+
+#include <semaphore.h>
+
+struct ast_sem {
+ sem_t real_sem;
+};
+
+#define AST_SEM_VALUE_MAX SEM_VALUE_MAX
+
+/* These are thin wrappers; might as well inline them */
+
+static force_inline int ast_sem_init(struct ast_sem *sem, int pshared, unsigned int value)
+{
+ return sem_init(&sem->real_sem, pshared, value);
+}
+
+static force_inline int ast_sem_destroy(struct ast_sem *sem)
+{
+ return sem_destroy(&sem->real_sem);
+}
+
+static force_inline int ast_sem_post(struct ast_sem *sem)
+{
+ return sem_post(&sem->real_sem);
+}
+
+static force_inline int ast_sem_wait(struct ast_sem *sem)
+{
+ return sem_wait(&sem->real_sem);
+}
+
+static force_inline int ast_sem_getvalue(struct ast_sem *sem, int *sval)
+{
+ return sem_getvalue(&sem->real_sem, sval);
+}
+
+#else
+/* Unnamed semaphores don't work. Rolling our own, I guess... */
+
+#include "asterisk/lock.h"
+
+#include <limits.h>
+
+struct ast_sem {
+ /*! Current count of this semaphore */
+ int count;
+ /*! Number of threads currently waiting for this semaphore */
+ int waiters;
+ /*! Mutual exclusion */
+ ast_mutex_t mutex;
+ /*! Condition for singalling waiters */
+ ast_cond_t cond;
+};
+
+#define AST_SEM_VALUE_MAX INT_MAX
+
+/*!
+ * \brief Initialize a semaphore.
+ *
+ * \param sem Semaphore to initialize.
+ * \param pshared Pass true (nonzero) to share this thread between processes.
+ * Not be supported on all platforms, so be wary!
+ * But leave the parameter, to be compatible with the POSIX ABI
+ * in case we need to add support in the future.
+ * \param value Initial value of the semaphore.
+ *
+ * \return 0 on success.
+ * \return -1 on error, errno set to indicate error.
+ */
+int ast_sem_init(struct ast_sem *sem, int pshared, unsigned int value);
+
+/*!
+ * \brief Destroy a semaphore.
+ *
+ * Destroying a semaphore that other threads are currently blocked on produces
+ * undefined behavior.
+ *
+ * \param sem Semaphore to destroy.
+ *
+ * \return 0 on success.
+ * \return -1 on error, errno set to indicate error.
+ */
+int ast_sem_destroy(struct ast_sem *sem);
+
+/*!
+ * \brief Increments the semaphore, unblocking a waiter if necessary.
+ *
+ * \param sem Semaphore to increment.
+ *
+ * \return 0 on success.
+ * \return -1 on error, errno set to indicate error.
+ */
+int ast_sem_post(struct ast_sem *sem);
+
+/*!
+ * \brief Decrements the semaphore.
+ *
+ * If the semaphore's current value is zero, this function blocks until another
+ * thread posts (ast_sem_post()) to the semaphore (or is interrupted by a signal
+ * handler, which sets errno to EINTR).
+ *
+ * \param sem Semaphore to decrement.
+ *
+ * \return 0 on success.
+ * \return -1 on error, errno set to indicate error.
+ */
+int ast_sem_wait(struct ast_sem *sem);
+
+/*!
+ * \brief Gets the current value of the semaphore.
+ *
+ * If threads are blocked on this semaphore, POSIX allows the return value to be
+ * either 0 or a negative number whose absolute value is the number of threads
+ * blocked. Don't assume that it will give you one or the other; Asterisk has
+ * been ported to just about everything.
+ *
+ * \param sem Semaphore to query.
+ * \param[out] sval Output value.
+ *
+ * \return 0 on success.
+ * \return -1 on error, errno set to indicate error.
+ */
+int ast_sem_getvalue(struct ast_sem *sem, int *sval);
+
+#endif
+
+#endif /* ASTERISK_SEMAPHORE_H */
diff --git a/include/asterisk/stasis.h b/include/asterisk/stasis.h
index 1a3dae00f..529aa12bb 100644
--- a/include/asterisk/stasis.h
+++ b/include/asterisk/stasis.h
@@ -348,18 +348,6 @@ const char *stasis_topic_name(const struct stasis_topic *topic);
void stasis_publish(struct stasis_topic *topic, struct stasis_message *message);
/*!
- * \brief Publish a message from a specified topic to all the subscribers of a
- * possibly different topic.
- * \param topic Topic to publish message to.
- * \param topic Original topic message was from.
- * \param message Message
- * \since 12
- */
-void stasis_forward_message(struct stasis_topic *topic,
- struct stasis_topic *publisher_topic,
- struct stasis_message *message);
-
-/*!
* \brief Wait for all pending messages on a given topic to be processed.
* \param topic Topic to await pending messages on.
* \return 0 on success.
@@ -381,11 +369,10 @@ struct stasis_subscription;
/*!
* \brief Callback function type for Stasis subscriptions.
* \param data Data field provided with subscription.
- * \param topic Topic to which the message was published.
* \param message Published message.
* \since 12
*/
-typedef void (*stasis_subscription_cb)(void *data, struct stasis_subscription *sub, struct stasis_topic *topic, struct stasis_message *message);
+typedef void (*stasis_subscription_cb)(void *data, struct stasis_subscription *sub, struct stasis_message *message);
/*!
* \brief Create a subscription.
@@ -464,6 +451,8 @@ int stasis_subscription_is_done(struct stasis_subscription *subscription);
struct stasis_subscription *stasis_unsubscribe_and_join(
struct stasis_subscription *subscription);
+struct stasis_forward;
+
/*!
* \brief Create a subscription which forwards all messages from one topic to
* another.
@@ -477,9 +466,11 @@ struct stasis_subscription *stasis_unsubscribe_and_join(
* \return \c NULL on error.
* \since 12
*/
-struct stasis_subscription *stasis_forward_all(struct stasis_topic *from_topic,
+struct stasis_forward *stasis_forward_all(struct stasis_topic *from_topic,
struct stasis_topic *to_topic);
+struct stasis_forward *stasis_forward_cancel(struct stasis_forward *forward);
+
/*!
* \brief Get the unique ID for the subscription.
*
@@ -579,8 +570,6 @@ struct stasis_message_type *stasis_cache_update_type(void);
* \since 12
*/
struct stasis_cache_update {
- /*! \brief Topic that published \c new_snapshot */
- struct stasis_topic *topic;
/*! \brief Convenience reference to snapshot type */
struct stasis_message_type *type;
/*! \brief Old value from the cache */
@@ -884,16 +873,6 @@ int stasis_config_init(void);
*/
int stasis_wait_init(void);
-struct ast_threadpool_options;
-
-/*!
- * \internal
- * \brief Retrieves the Stasis threadpool configuration.
- * \param[out] threadpool_options Filled with Stasis threadpool options.
- */
-void stasis_config_get_threadpool_options(
- struct ast_threadpool_options *threadpool_options);
-
/*! @} */
/*!
diff --git a/include/asterisk/stasis_internal.h b/include/asterisk/stasis_internal.h
index 67ab88ff0..01e581242 100644
--- a/include/asterisk/stasis_internal.h
+++ b/include/asterisk/stasis_internal.h
@@ -62,7 +62,7 @@ struct stasis_message;
*/
struct stasis_subscription *internal_stasis_subscribe(
struct stasis_topic *topic,
- void (*stasis_subscription_cb)(void *data, struct stasis_subscription *sub, struct stasis_topic *topic, struct stasis_message *message),
+ void (*stasis_subscription_cb)(void *data, struct stasis_subscription *sub, struct stasis_message *message),
void *data,
int needs_mailbox);
diff --git a/include/asterisk/stasis_message_router.h b/include/asterisk/stasis_message_router.h
index b14868b4a..3209adb16 100644
--- a/include/asterisk/stasis_message_router.h
+++ b/include/asterisk/stasis_message_router.h
@@ -100,6 +100,9 @@ int stasis_message_router_is_done(struct stasis_message_router *router);
* updates for types not handled by routes added with
* stasis_message_router_add_cache_update().
*
+ * Adding multiple routes for the same message type results in undefined
+ * behavior.
+ *
* \param router Router to add the route to.
* \param message_type Type of message to route.
* \param callback Callback to forard messages of \a message_type to.
@@ -121,6 +124,9 @@ int stasis_message_router_add(struct stasis_message_router *router,
* These are distinct from regular routes, so one could have both a regular
* route and a cache route for the same \a message_type.
*
+ * Adding multiple routes for the same message type results in undefined
+ * behavior.
+ *
* \param router Router to add the route to.
* \param message_type Subtype of cache update to route.
* \param callback Callback to forard messages of \a message_type to.
@@ -138,6 +144,11 @@ int stasis_message_router_add_cache_update(struct stasis_message_router *router,
/*!
* \brief Remove a route from a message router.
*
+ * If a route is removed from another thread, there is no notification that
+ * all messages using this route have been processed. This typically means that
+ * the associated \c data pointer for this route must be kept until the
+ * route itself is disposed of.
+ *
* \param router Router to remove the route from.
* \param message_type Type of message to route.
*
@@ -149,6 +160,11 @@ void stasis_message_router_remove(struct stasis_message_router *router,
/*!
* \brief Remove a cache route from a message router.
*
+ * If a route is removed from another thread, there is no notification that
+ * all messages using this route have been processed. This typically means that
+ * the associated \c data pointer for this route must be kept until the
+ * route itself is disposed of.
+ *
* \param router Router to remove the route from.
* \param message_type Type of message to route.
*
diff --git a/include/asterisk/taskprocessor.h b/include/asterisk/taskprocessor.h
index 219166305..ab523290c 100644
--- a/include/asterisk/taskprocessor.h
+++ b/include/asterisk/taskprocessor.h
@@ -109,6 +109,7 @@ struct ast_taskprocessor_listener_callbacks {
* \param listener The listener
*/
void (*shutdown)(struct ast_taskprocessor_listener *listener);
+ void (*dtor)(struct ast_taskprocessor_listener *listener);
};
/*!
@@ -175,6 +176,18 @@ struct ast_taskprocessor *ast_taskprocessor_get(const char *name, enum ast_tps_o
struct ast_taskprocessor *ast_taskprocessor_create_with_listener(const char *name, struct ast_taskprocessor_listener *listener);
/*!
+ * \brief Sets the local data associated with a taskprocessor.
+ *
+ * \since 12.0.0
+ *
+ * See ast_taskprocessor_push_local().
+ *
+ * \param tps Task processor.
+ * \param local_data Local data to associate with \a tps.
+ */
+void ast_taskprocessor_set_local(struct ast_taskprocessor *tps, void *local_data);
+
+/*!
* \brief Unreference the specified taskprocessor and its reference count will decrement.
*
* Taskprocessors use astobj2 and will unlink from the taskprocessor singleton container and destroy
@@ -196,6 +209,32 @@ void *ast_taskprocessor_unreference(struct ast_taskprocessor *tps);
*/
int ast_taskprocessor_push(struct ast_taskprocessor *tps, int (*task_exe)(void *datap), void *datap);
+/*! \brief Local data parameter */
+struct ast_taskprocessor_local {
+ /*! Local data, associated with the taskprocessor. */
+ void *local_data;
+ /*! Data pointer passed with this task. */
+ void *data;
+};
+
+/*!
+ * \brief Push a task into the specified taskprocessor queue and signal the
+ * taskprocessor thread.
+ *
+ * The callback receives a \ref ast_taskprocessor_local struct, which contains
+ * both the provided \a datap pointer, and any local data set on the
+ * taskprocessor with ast_taskprocessor_set_local().
+ *
+ * \param tps The taskprocessor structure
+ * \param task_exe The task handling function to push into the taskprocessor queue
+ * \param datap The data to be used by the task handling function
+ * \retval 0 success
+ * \retval -1 failure
+ * \since 12.0.0
+ */
+int ast_taskprocessor_push_local(struct ast_taskprocessor *tps,
+ int (*task_exe)(struct ast_taskprocessor_local *local), void *datap);
+
/*!
* \brief Pop a task off the taskprocessor and execute it.
*
diff --git a/include/asterisk/vector.h b/include/asterisk/vector.h
new file mode 100644
index 000000000..f5d3e9a14
--- /dev/null
+++ b/include/asterisk/vector.h
@@ -0,0 +1,193 @@
+/*
+ * Asterisk -- An open source telephony toolkit.
+ *
+ * Copyright (C) 2013, Digium, Inc.
+ *
+ * David M. Lee, II <dlee@digium.com>
+ *
+ * See http://www.asterisk.org for more information about
+ * the Asterisk project. Please do not directly contact
+ * any of the maintainers of this project for assistance;
+ * the project provides a web site, mailing lists and IRC
+ * channels for your use.
+ *
+ * This program is free software, distributed under the terms of
+ * the GNU General Public License Version 2. See the LICENSE file
+ * at the top of the source tree.
+ */
+
+#ifndef _ASTERISK_VECTOR_H
+#define _ASTERISK_VECTOR_H
+
+/*! \file
+ *
+ * \brief Vector container support.
+ *
+ * A vector is a variable length array, with properties that can be useful when
+ * order doesn't matter.
+ * - Appends are asymptotically constant time.
+ * - Unordered removes are constant time.
+ * - Search is linear time
+ *
+ * \author David M. Lee, II <dlee@digium.com>
+ * \since 12
+ */
+
+/*! \brief Define a vector structure */
+#define ast_vector(type) \
+ struct { \
+ type *elems; \
+ size_t max; \
+ size_t current; \
+ }
+
+/*!
+ * \brief Initialize a vector
+ *
+ * If \a size is 0, then no space will be allocated until the vector is
+ * appended to.
+ *
+ * \param vec Vector to initialize.
+ * \param size Initial size of the vector.
+ *
+ * \return 0 on success.
+ * \return Non-zero on failure.
+ */
+#define ast_vector_init(vec, size) ({ \
+ size_t __size = (size); \
+ size_t alloc_size = __size * sizeof(*(vec).elems); \
+ (vec).elems = alloc_size ? ast_malloc(alloc_size) : NULL; \
+ (vec).current = 0; \
+ if ((vec).elems) { \
+ (vec).max = __size; \
+ } else { \
+ (vec).max = 0; \
+ } \
+ alloc_size == 0 || (vec).elems != NULL ? 0 : -1; \
+})
+
+/*!
+ * \brief Deallocates this vector.
+ *
+ * If any code to free the elements of this vector need to be run, that should
+ * be done prior to this call.
+ *
+ * \param vec Vector to deallocate.
+ */
+#define ast_vector_free(vec) do { \
+ ast_free((vec).elems); \
+ (vec).elems = NULL; \
+ (vec).max = 0; \
+ (vec).current = 0; \
+} while (0)
+
+/*!
+ * \brief Append an element to a vector, growing the vector if needed.
+ *
+ * \param vec Vector to append to.
+ * \param elem Element to append.
+ *
+ * \return 0 on success.
+ * \return Non-zero on failure.
+ */
+#define ast_vector_append(vec, elem) ({ \
+ int res = 0; \
+ \
+ if ((vec).current + 1 > (vec).max) { \
+ size_t new_max = (vec).max ? 2 * (vec).max : 1; \
+ typeof((vec).elems) new_elems = ast_realloc( \
+ (vec).elems, new_max * sizeof(*new_elems)); \
+ if (new_elems) { \
+ (vec).elems = new_elems; \
+ (vec).max = new_max; \
+ } else { \
+ res = -1; \
+ } \
+ } \
+ \
+ if (res == 0) { \
+ (vec).elems[(vec).current++] = (elem); \
+ } \
+ res; \
+})
+
+/*!
+ * \brief Remove an element from a vector by index.
+ *
+ * Note that elements in the vector may be reordered, so that the remove can
+ * happen in constant time.
+ *
+ * \param vec Vector to remove from.
+ * \param idx Index of the element to remove.
+ * \return The element that was removed.
+ */
+#define ast_vector_remove_unordered(vec, idx) ({ \
+ typeof((vec).elems[0]) res; \
+ size_t __idx = (idx); \
+ ast_assert(__idx < (vec).current); \
+ res = (vec).elems[__idx]; \
+ (vec).elems[__idx] = (vec).elems[--(vec).current]; \
+ res; \
+})
+
+
+/*!
+ * \brief Remove an element from a vector that matches the given comparison
+ *
+ * \param vec Vector to remove from.
+ * \param value Value to pass into comparator.
+ * \param cmp Comparator function/macros (called as \c cmp(elem, value))
+ * \return 0 if element was removed.
+ * \return Non-zero if element was not in the vector.
+ */
+#define ast_vector_remove_cmp_unordered(vec, value, cmp) ({ \
+ int res = -1; \
+ size_t idx; \
+ typeof(value) __value = (value); \
+ for (idx = 0; idx < (vec).current; ++idx) { \
+ if (cmp((vec).elems[idx], __value)) { \
+ ast_vector_remove_unordered((vec), idx); \
+ res = 0; \
+ break; \
+ } \
+ } \
+ res; \
+})
+
+/*! \brief Default comparator for ast_vector_remove_elem_unordered() */
+#define AST_VECTOR_DEFAULT_CMP(a, b) ((a) == (b))
+
+/*!
+ * \brief Remove an element from a vector.
+ *
+ * \param vec Vector to remove from.
+ * \param elem Element to remove
+ * \return 0 if element was removed.
+ * \return Non-zero if element was not in the vector.
+ */
+#define ast_vector_remove_elem_unordered(vec, elem) ({ \
+ ast_vector_remove_cmp_unordered((vec), (elem), \
+ AST_VECTOR_DEFAULT_CMP); \
+})
+
+/*!
+ * \brief Get the number of elements in a vector.
+ *
+ * \param vec Vector to query.
+ * \return Number of elements in the vector.
+ */
+#define ast_vector_size(vec) (vec).current
+
+/*!
+ * \brief Get an element from a vector.
+ *
+ * \param vec Vector to query.
+ * \param idx Index of the element to get.
+ */
+#define ast_vector_get(vec, idx) ({ \
+ size_t __idx = (idx); \
+ ast_assert(__idx < (vec).current); \
+ (vec).elems[__idx]; \
+})
+
+#endif /* _ASTERISK_VECTOR_H */