diff options
Diffstat (limited to 'include/asterisk')
-rw-r--r-- | include/asterisk/autoconfig.h.in | 3 | ||||
-rw-r--r-- | include/asterisk/sem.h | 157 | ||||
-rw-r--r-- | include/asterisk/stasis.h | 33 | ||||
-rw-r--r-- | include/asterisk/stasis_internal.h | 2 | ||||
-rw-r--r-- | include/asterisk/stasis_message_router.h | 16 | ||||
-rw-r--r-- | include/asterisk/taskprocessor.h | 39 | ||||
-rw-r--r-- | include/asterisk/vector.h | 193 |
7 files changed, 415 insertions, 28 deletions
diff --git a/include/asterisk/autoconfig.h.in b/include/asterisk/autoconfig.h.in index 87a769ed0..559b69af1 100644 --- a/include/asterisk/autoconfig.h.in +++ b/include/asterisk/autoconfig.h.in @@ -29,6 +29,9 @@ /* Define to 1 if using `alloca.c'. */ #undef C_ALLOCA +/* Define to 1 if anonymous semaphores work. */ +#undef HAS_WORKING_SEMAPHORE + /* Define to 1 if you have the `acos' function. */ #undef HAVE_ACOS diff --git a/include/asterisk/sem.h b/include/asterisk/sem.h new file mode 100644 index 000000000..8f6356c01 --- /dev/null +++ b/include/asterisk/sem.h @@ -0,0 +1,157 @@ +/* + * Asterisk -- An open source telephony toolkit. + * + * Copyright (C) 2013, Digium, Inc. + * + * David M. Lee, II <dlee@digium.com> + * + * See http://www.asterisk.org for more information about + * the Asterisk project. Please do not directly contact + * any of the maintainers of this project for assistance; + * the project provides a web site, mailing lists and IRC + * channels for your use. + * + * This program is free software, distributed under the terms of + * the GNU General Public License Version 2. See the LICENSE file + * at the top of the source tree. + */ + +#ifndef ASTERISK_SEMAPHORE_H +#define ASTERISK_SEMAPHORE_H + +/*! + * \file Asterisk semaphore API + * + * This API is a thin wrapper around the POSIX semaphore API (when available), + * so see the POSIX documentation for further details. + */ + +#ifdef HAS_WORKING_SEMAPHORE +/* Working semaphore implementation detected */ + +#include <semaphore.h> + +struct ast_sem { + sem_t real_sem; +}; + +#define AST_SEM_VALUE_MAX SEM_VALUE_MAX + +/* These are thin wrappers; might as well inline them */ + +static force_inline int ast_sem_init(struct ast_sem *sem, int pshared, unsigned int value) +{ + return sem_init(&sem->real_sem, pshared, value); +} + +static force_inline int ast_sem_destroy(struct ast_sem *sem) +{ + return sem_destroy(&sem->real_sem); +} + +static force_inline int ast_sem_post(struct ast_sem *sem) +{ + return sem_post(&sem->real_sem); +} + +static force_inline int ast_sem_wait(struct ast_sem *sem) +{ + return sem_wait(&sem->real_sem); +} + +static force_inline int ast_sem_getvalue(struct ast_sem *sem, int *sval) +{ + return sem_getvalue(&sem->real_sem, sval); +} + +#else +/* Unnamed semaphores don't work. Rolling our own, I guess... */ + +#include "asterisk/lock.h" + +#include <limits.h> + +struct ast_sem { + /*! Current count of this semaphore */ + int count; + /*! Number of threads currently waiting for this semaphore */ + int waiters; + /*! Mutual exclusion */ + ast_mutex_t mutex; + /*! Condition for singalling waiters */ + ast_cond_t cond; +}; + +#define AST_SEM_VALUE_MAX INT_MAX + +/*! + * \brief Initialize a semaphore. + * + * \param sem Semaphore to initialize. + * \param pshared Pass true (nonzero) to share this thread between processes. + * Not be supported on all platforms, so be wary! + * But leave the parameter, to be compatible with the POSIX ABI + * in case we need to add support in the future. + * \param value Initial value of the semaphore. + * + * \return 0 on success. + * \return -1 on error, errno set to indicate error. + */ +int ast_sem_init(struct ast_sem *sem, int pshared, unsigned int value); + +/*! + * \brief Destroy a semaphore. + * + * Destroying a semaphore that other threads are currently blocked on produces + * undefined behavior. + * + * \param sem Semaphore to destroy. + * + * \return 0 on success. + * \return -1 on error, errno set to indicate error. + */ +int ast_sem_destroy(struct ast_sem *sem); + +/*! + * \brief Increments the semaphore, unblocking a waiter if necessary. + * + * \param sem Semaphore to increment. + * + * \return 0 on success. + * \return -1 on error, errno set to indicate error. + */ +int ast_sem_post(struct ast_sem *sem); + +/*! + * \brief Decrements the semaphore. + * + * If the semaphore's current value is zero, this function blocks until another + * thread posts (ast_sem_post()) to the semaphore (or is interrupted by a signal + * handler, which sets errno to EINTR). + * + * \param sem Semaphore to decrement. + * + * \return 0 on success. + * \return -1 on error, errno set to indicate error. + */ +int ast_sem_wait(struct ast_sem *sem); + +/*! + * \brief Gets the current value of the semaphore. + * + * If threads are blocked on this semaphore, POSIX allows the return value to be + * either 0 or a negative number whose absolute value is the number of threads + * blocked. Don't assume that it will give you one or the other; Asterisk has + * been ported to just about everything. + * + * \param sem Semaphore to query. + * \param[out] sval Output value. + * + * \return 0 on success. + * \return -1 on error, errno set to indicate error. + */ +int ast_sem_getvalue(struct ast_sem *sem, int *sval); + +#endif + +#endif /* ASTERISK_SEMAPHORE_H */ diff --git a/include/asterisk/stasis.h b/include/asterisk/stasis.h index 1a3dae00f..529aa12bb 100644 --- a/include/asterisk/stasis.h +++ b/include/asterisk/stasis.h @@ -348,18 +348,6 @@ const char *stasis_topic_name(const struct stasis_topic *topic); void stasis_publish(struct stasis_topic *topic, struct stasis_message *message); /*! - * \brief Publish a message from a specified topic to all the subscribers of a - * possibly different topic. - * \param topic Topic to publish message to. - * \param topic Original topic message was from. - * \param message Message - * \since 12 - */ -void stasis_forward_message(struct stasis_topic *topic, - struct stasis_topic *publisher_topic, - struct stasis_message *message); - -/*! * \brief Wait for all pending messages on a given topic to be processed. * \param topic Topic to await pending messages on. * \return 0 on success. @@ -381,11 +369,10 @@ struct stasis_subscription; /*! * \brief Callback function type for Stasis subscriptions. * \param data Data field provided with subscription. - * \param topic Topic to which the message was published. * \param message Published message. * \since 12 */ -typedef void (*stasis_subscription_cb)(void *data, struct stasis_subscription *sub, struct stasis_topic *topic, struct stasis_message *message); +typedef void (*stasis_subscription_cb)(void *data, struct stasis_subscription *sub, struct stasis_message *message); /*! * \brief Create a subscription. @@ -464,6 +451,8 @@ int stasis_subscription_is_done(struct stasis_subscription *subscription); struct stasis_subscription *stasis_unsubscribe_and_join( struct stasis_subscription *subscription); +struct stasis_forward; + /*! * \brief Create a subscription which forwards all messages from one topic to * another. @@ -477,9 +466,11 @@ struct stasis_subscription *stasis_unsubscribe_and_join( * \return \c NULL on error. * \since 12 */ -struct stasis_subscription *stasis_forward_all(struct stasis_topic *from_topic, +struct stasis_forward *stasis_forward_all(struct stasis_topic *from_topic, struct stasis_topic *to_topic); +struct stasis_forward *stasis_forward_cancel(struct stasis_forward *forward); + /*! * \brief Get the unique ID for the subscription. * @@ -579,8 +570,6 @@ struct stasis_message_type *stasis_cache_update_type(void); * \since 12 */ struct stasis_cache_update { - /*! \brief Topic that published \c new_snapshot */ - struct stasis_topic *topic; /*! \brief Convenience reference to snapshot type */ struct stasis_message_type *type; /*! \brief Old value from the cache */ @@ -884,16 +873,6 @@ int stasis_config_init(void); */ int stasis_wait_init(void); -struct ast_threadpool_options; - -/*! - * \internal - * \brief Retrieves the Stasis threadpool configuration. - * \param[out] threadpool_options Filled with Stasis threadpool options. - */ -void stasis_config_get_threadpool_options( - struct ast_threadpool_options *threadpool_options); - /*! @} */ /*! diff --git a/include/asterisk/stasis_internal.h b/include/asterisk/stasis_internal.h index 67ab88ff0..01e581242 100644 --- a/include/asterisk/stasis_internal.h +++ b/include/asterisk/stasis_internal.h @@ -62,7 +62,7 @@ struct stasis_message; */ struct stasis_subscription *internal_stasis_subscribe( struct stasis_topic *topic, - void (*stasis_subscription_cb)(void *data, struct stasis_subscription *sub, struct stasis_topic *topic, struct stasis_message *message), + void (*stasis_subscription_cb)(void *data, struct stasis_subscription *sub, struct stasis_message *message), void *data, int needs_mailbox); diff --git a/include/asterisk/stasis_message_router.h b/include/asterisk/stasis_message_router.h index b14868b4a..3209adb16 100644 --- a/include/asterisk/stasis_message_router.h +++ b/include/asterisk/stasis_message_router.h @@ -100,6 +100,9 @@ int stasis_message_router_is_done(struct stasis_message_router *router); * updates for types not handled by routes added with * stasis_message_router_add_cache_update(). * + * Adding multiple routes for the same message type results in undefined + * behavior. + * * \param router Router to add the route to. * \param message_type Type of message to route. * \param callback Callback to forard messages of \a message_type to. @@ -121,6 +124,9 @@ int stasis_message_router_add(struct stasis_message_router *router, * These are distinct from regular routes, so one could have both a regular * route and a cache route for the same \a message_type. * + * Adding multiple routes for the same message type results in undefined + * behavior. + * * \param router Router to add the route to. * \param message_type Subtype of cache update to route. * \param callback Callback to forard messages of \a message_type to. @@ -138,6 +144,11 @@ int stasis_message_router_add_cache_update(struct stasis_message_router *router, /*! * \brief Remove a route from a message router. * + * If a route is removed from another thread, there is no notification that + * all messages using this route have been processed. This typically means that + * the associated \c data pointer for this route must be kept until the + * route itself is disposed of. + * * \param router Router to remove the route from. * \param message_type Type of message to route. * @@ -149,6 +160,11 @@ void stasis_message_router_remove(struct stasis_message_router *router, /*! * \brief Remove a cache route from a message router. * + * If a route is removed from another thread, there is no notification that + * all messages using this route have been processed. This typically means that + * the associated \c data pointer for this route must be kept until the + * route itself is disposed of. + * * \param router Router to remove the route from. * \param message_type Type of message to route. * diff --git a/include/asterisk/taskprocessor.h b/include/asterisk/taskprocessor.h index 219166305..ab523290c 100644 --- a/include/asterisk/taskprocessor.h +++ b/include/asterisk/taskprocessor.h @@ -109,6 +109,7 @@ struct ast_taskprocessor_listener_callbacks { * \param listener The listener */ void (*shutdown)(struct ast_taskprocessor_listener *listener); + void (*dtor)(struct ast_taskprocessor_listener *listener); }; /*! @@ -175,6 +176,18 @@ struct ast_taskprocessor *ast_taskprocessor_get(const char *name, enum ast_tps_o struct ast_taskprocessor *ast_taskprocessor_create_with_listener(const char *name, struct ast_taskprocessor_listener *listener); /*! + * \brief Sets the local data associated with a taskprocessor. + * + * \since 12.0.0 + * + * See ast_taskprocessor_push_local(). + * + * \param tps Task processor. + * \param local_data Local data to associate with \a tps. + */ +void ast_taskprocessor_set_local(struct ast_taskprocessor *tps, void *local_data); + +/*! * \brief Unreference the specified taskprocessor and its reference count will decrement. * * Taskprocessors use astobj2 and will unlink from the taskprocessor singleton container and destroy @@ -196,6 +209,32 @@ void *ast_taskprocessor_unreference(struct ast_taskprocessor *tps); */ int ast_taskprocessor_push(struct ast_taskprocessor *tps, int (*task_exe)(void *datap), void *datap); +/*! \brief Local data parameter */ +struct ast_taskprocessor_local { + /*! Local data, associated with the taskprocessor. */ + void *local_data; + /*! Data pointer passed with this task. */ + void *data; +}; + +/*! + * \brief Push a task into the specified taskprocessor queue and signal the + * taskprocessor thread. + * + * The callback receives a \ref ast_taskprocessor_local struct, which contains + * both the provided \a datap pointer, and any local data set on the + * taskprocessor with ast_taskprocessor_set_local(). + * + * \param tps The taskprocessor structure + * \param task_exe The task handling function to push into the taskprocessor queue + * \param datap The data to be used by the task handling function + * \retval 0 success + * \retval -1 failure + * \since 12.0.0 + */ +int ast_taskprocessor_push_local(struct ast_taskprocessor *tps, + int (*task_exe)(struct ast_taskprocessor_local *local), void *datap); + /*! * \brief Pop a task off the taskprocessor and execute it. * diff --git a/include/asterisk/vector.h b/include/asterisk/vector.h new file mode 100644 index 000000000..f5d3e9a14 --- /dev/null +++ b/include/asterisk/vector.h @@ -0,0 +1,193 @@ +/* + * Asterisk -- An open source telephony toolkit. + * + * Copyright (C) 2013, Digium, Inc. + * + * David M. Lee, II <dlee@digium.com> + * + * See http://www.asterisk.org for more information about + * the Asterisk project. Please do not directly contact + * any of the maintainers of this project for assistance; + * the project provides a web site, mailing lists and IRC + * channels for your use. + * + * This program is free software, distributed under the terms of + * the GNU General Public License Version 2. See the LICENSE file + * at the top of the source tree. + */ + +#ifndef _ASTERISK_VECTOR_H +#define _ASTERISK_VECTOR_H + +/*! \file + * + * \brief Vector container support. + * + * A vector is a variable length array, with properties that can be useful when + * order doesn't matter. + * - Appends are asymptotically constant time. + * - Unordered removes are constant time. + * - Search is linear time + * + * \author David M. Lee, II <dlee@digium.com> + * \since 12 + */ + +/*! \brief Define a vector structure */ +#define ast_vector(type) \ + struct { \ + type *elems; \ + size_t max; \ + size_t current; \ + } + +/*! + * \brief Initialize a vector + * + * If \a size is 0, then no space will be allocated until the vector is + * appended to. + * + * \param vec Vector to initialize. + * \param size Initial size of the vector. + * + * \return 0 on success. + * \return Non-zero on failure. + */ +#define ast_vector_init(vec, size) ({ \ + size_t __size = (size); \ + size_t alloc_size = __size * sizeof(*(vec).elems); \ + (vec).elems = alloc_size ? ast_malloc(alloc_size) : NULL; \ + (vec).current = 0; \ + if ((vec).elems) { \ + (vec).max = __size; \ + } else { \ + (vec).max = 0; \ + } \ + alloc_size == 0 || (vec).elems != NULL ? 0 : -1; \ +}) + +/*! + * \brief Deallocates this vector. + * + * If any code to free the elements of this vector need to be run, that should + * be done prior to this call. + * + * \param vec Vector to deallocate. + */ +#define ast_vector_free(vec) do { \ + ast_free((vec).elems); \ + (vec).elems = NULL; \ + (vec).max = 0; \ + (vec).current = 0; \ +} while (0) + +/*! + * \brief Append an element to a vector, growing the vector if needed. + * + * \param vec Vector to append to. + * \param elem Element to append. + * + * \return 0 on success. + * \return Non-zero on failure. + */ +#define ast_vector_append(vec, elem) ({ \ + int res = 0; \ + \ + if ((vec).current + 1 > (vec).max) { \ + size_t new_max = (vec).max ? 2 * (vec).max : 1; \ + typeof((vec).elems) new_elems = ast_realloc( \ + (vec).elems, new_max * sizeof(*new_elems)); \ + if (new_elems) { \ + (vec).elems = new_elems; \ + (vec).max = new_max; \ + } else { \ + res = -1; \ + } \ + } \ + \ + if (res == 0) { \ + (vec).elems[(vec).current++] = (elem); \ + } \ + res; \ +}) + +/*! + * \brief Remove an element from a vector by index. + * + * Note that elements in the vector may be reordered, so that the remove can + * happen in constant time. + * + * \param vec Vector to remove from. + * \param idx Index of the element to remove. + * \return The element that was removed. + */ +#define ast_vector_remove_unordered(vec, idx) ({ \ + typeof((vec).elems[0]) res; \ + size_t __idx = (idx); \ + ast_assert(__idx < (vec).current); \ + res = (vec).elems[__idx]; \ + (vec).elems[__idx] = (vec).elems[--(vec).current]; \ + res; \ +}) + + +/*! + * \brief Remove an element from a vector that matches the given comparison + * + * \param vec Vector to remove from. + * \param value Value to pass into comparator. + * \param cmp Comparator function/macros (called as \c cmp(elem, value)) + * \return 0 if element was removed. + * \return Non-zero if element was not in the vector. + */ +#define ast_vector_remove_cmp_unordered(vec, value, cmp) ({ \ + int res = -1; \ + size_t idx; \ + typeof(value) __value = (value); \ + for (idx = 0; idx < (vec).current; ++idx) { \ + if (cmp((vec).elems[idx], __value)) { \ + ast_vector_remove_unordered((vec), idx); \ + res = 0; \ + break; \ + } \ + } \ + res; \ +}) + +/*! \brief Default comparator for ast_vector_remove_elem_unordered() */ +#define AST_VECTOR_DEFAULT_CMP(a, b) ((a) == (b)) + +/*! + * \brief Remove an element from a vector. + * + * \param vec Vector to remove from. + * \param elem Element to remove + * \return 0 if element was removed. + * \return Non-zero if element was not in the vector. + */ +#define ast_vector_remove_elem_unordered(vec, elem) ({ \ + ast_vector_remove_cmp_unordered((vec), (elem), \ + AST_VECTOR_DEFAULT_CMP); \ +}) + +/*! + * \brief Get the number of elements in a vector. + * + * \param vec Vector to query. + * \return Number of elements in the vector. + */ +#define ast_vector_size(vec) (vec).current + +/*! + * \brief Get an element from a vector. + * + * \param vec Vector to query. + * \param idx Index of the element to get. + */ +#define ast_vector_get(vec, idx) ({ \ + size_t __idx = (idx); \ + ast_assert(__idx < (vec).current); \ + (vec).elems[__idx]; \ +}) + +#endif /* _ASTERISK_VECTOR_H */ |