summaryrefslogtreecommitdiff
path: root/res
diff options
context:
space:
mode:
Diffstat (limited to 'res')
-rw-r--r--res/ari/resource_channels.h4
-rw-r--r--res/res_pjsip.c51
-rw-r--r--res/res_pjsip/config_global.c68
-rw-r--r--res/res_pjsip_mwi.c138
-rw-r--r--res/res_pjsip_outbound_publish.c131
-rw-r--r--res/res_sorcery_config.c4
-rw-r--r--res/res_sorcery_memory.c4
7 files changed, 319 insertions, 81 deletions
diff --git a/res/ari/resource_channels.h b/res/ari/resource_channels.h
index 5bb6f7f1e..dd20415ff 100644
--- a/res/ari/resource_channels.h
+++ b/res/ari/resource_channels.h
@@ -78,7 +78,7 @@ struct ast_ari_channels_originate_args {
const char *other_channel_id;
/*! The unique id of the channel which is originating this one. */
const char *originator;
- /*! The format name capability list to use if originator is not specified. Ex. "ulaw,slin16". Format names an be found with "core show codecs". */
+ /*! The format name capability list to use if originator is not specified. Ex. "ulaw,slin16". Format names can be found with "core show codecs". */
const char *formats;
};
/*!
@@ -143,7 +143,7 @@ struct ast_ari_channels_originate_with_id_args {
const char *other_channel_id;
/*! The unique id of the channel which is originating this one. */
const char *originator;
- /*! The format name capability list to use if originator is not specified. Ex. "ulaw,slin16". Format names an be found with "core show codecs". */
+ /*! The format name capability list to use if originator is not specified. Ex. "ulaw,slin16". Format names can be found with "core show codecs". */
const char *formats;
};
/*!
diff --git a/res/res_pjsip.c b/res/res_pjsip.c
index 3870e9f8d..aafb3a211 100644
--- a/res/res_pjsip.c
+++ b/res/res_pjsip.c
@@ -1496,6 +1496,48 @@
set to this value if there is no better option (such as auth/realm) to be
used.</synopsis>
</configOption>
+ <configOption name="mwi_tps_queue_high" default="500">
+ <synopsis>MWI taskprocessor high water alert trigger level.</synopsis>
+ <description>
+ <para>On a heavily loaded system you may need to adjust the
+ taskprocessor queue limits. If any taskprocessor queue size
+ reaches its high water level then pjsip will stop processing
+ new requests until the alert is cleared. The alert clears
+ when all alerting taskprocessor queues have dropped to their
+ low water clear level.
+ </para>
+ </description>
+ </configOption>
+ <configOption name="mwi_tps_queue_low" default="-1">
+ <synopsis>MWI taskprocessor low water clear alert level.</synopsis>
+ <description>
+ <para>On a heavily loaded system you may need to adjust the
+ taskprocessor queue limits. If any taskprocessor queue size
+ reaches its high water level then pjsip will stop processing
+ new requests until the alert is cleared. The alert clears
+ when all alerting taskprocessor queues have dropped to their
+ low water clear level.
+ </para>
+ <note><para>Set to -1 for the low water level to be 90% of
+ the high water level.</para></note>
+ </description>
+ </configOption>
+ <configOption name="mwi_disable_initial_unsolicited" default="no">
+ <synopsis>Enable/Disable sending unsolicited MWI to all endpoints on startup.</synopsis>
+ <description>
+ <para>When the initial unsolicited MWI notification are
+ enabled on startup then the initial notifications
+ get sent at startup. If you have a lot of endpoints
+ (thousands) that use unsolicited MWI then you may
+ want to consider disabling the initial startup
+ notifications.
+ </para>
+ <para>When the initial unsolicited MWI notifications are
+ disabled on startup then the notifications will start
+ on the endpoint's next contact update.
+ </para>
+ </description>
+ </configOption>
</configObject>
</configFile>
</configInfo>
@@ -2138,13 +2180,13 @@ static struct ast_threadpool *sip_threadpool;
static pj_sockaddr host_ip_ipv4;
/*! Local host address for IPv4 (string form) */
-static char host_ip_ipv4_string[PJ_INET6_ADDRSTRLEN + 2];
+static char host_ip_ipv4_string[PJ_INET6_ADDRSTRLEN];
/*! Local host address for IPv6 */
static pj_sockaddr host_ip_ipv6;
/*! Local host address for IPv6 (string form) */
-static char host_ip_ipv6_string[PJ_INET6_ADDRSTRLEN + 2];
+static char host_ip_ipv6_string[PJ_INET6_ADDRSTRLEN];
static int register_service_noref(void *data)
{
@@ -4229,6 +4271,7 @@ static int unload_pjsip(void *data)
static int load_pjsip(void)
{
+ const unsigned int flags = 0; /* no port, no brackets */
pj_status_t status;
/* The third parameter is just copied from
@@ -4253,12 +4296,12 @@ static int load_pjsip(void)
}
if (!pj_gethostip(pj_AF_INET(), &host_ip_ipv4)) {
- pj_sockaddr_print(&host_ip_ipv4, host_ip_ipv4_string, sizeof(host_ip_ipv4_string), 2);
+ pj_sockaddr_print(&host_ip_ipv4, host_ip_ipv4_string, sizeof(host_ip_ipv4_string), flags);
ast_verb(3, "Local IPv4 address determined to be: %s\n", host_ip_ipv4_string);
}
if (!pj_gethostip(pj_AF_INET6(), &host_ip_ipv6)) {
- pj_sockaddr_print(&host_ip_ipv6, host_ip_ipv6_string, sizeof(host_ip_ipv6_string), 2);
+ pj_sockaddr_print(&host_ip_ipv6, host_ip_ipv6_string, sizeof(host_ip_ipv6_string), flags);
ast_verb(3, "Local IPv6 address determined to be: %s\n", host_ip_ipv6_string);
}
diff --git a/res/res_pjsip/config_global.c b/res/res_pjsip/config_global.c
index 6bb688804..8a1b0d449 100644
--- a/res/res_pjsip/config_global.c
+++ b/res/res_pjsip/config_global.c
@@ -24,6 +24,7 @@
#include "asterisk/res_pjsip.h"
#include "include/res_pjsip_private.h"
#include "asterisk/sorcery.h"
+#include "asterisk/taskprocessor.h"
#include "asterisk/ast_version.h"
#include "asterisk/res_pjsip_cli.h"
@@ -43,6 +44,9 @@
#define DEFAULT_UNIDENTIFIED_REQUEST_COUNT 5
#define DEFAULT_UNIDENTIFIED_REQUEST_PERIOD 5
#define DEFAULT_UNIDENTIFIED_REQUEST_PRUNE_INTERVAL 30
+#define DEFAULT_MWI_TPS_QUEUE_HIGH AST_TASKPROCESSOR_HIGH_WATER_LEVEL
+#define DEFAULT_MWI_TPS_QUEUE_LOW -1
+#define DEFAULT_MWI_DISABLE_INITIAL_UNSOLICITED 0
static char default_useragent[256];
@@ -79,6 +83,14 @@ struct global_config {
unsigned int unidentified_request_period;
/* Interval at which expired unidentifed requests will be pruned */
unsigned int unidentified_request_prune_interval;
+ struct {
+ /*! Taskprocessor high water alert trigger level */
+ unsigned int tps_queue_high;
+ /*! Taskprocessor low water clear alert level. */
+ int tps_queue_low;
+ /*! Nonzero to disable sending unsolicited mwi to all endpoints on startup */
+ unsigned int disable_initial_unsolicited;
+ } mwi;
};
static void global_destructor(void *obj)
@@ -314,6 +326,53 @@ void ast_sip_get_default_from_user(char *from_user, size_t size)
}
}
+
+unsigned int ast_sip_get_mwi_tps_queue_high(void)
+{
+ unsigned int tps_queue_high;
+ struct global_config *cfg;
+
+ cfg = get_global_cfg();
+ if (!cfg) {
+ return DEFAULT_MWI_TPS_QUEUE_HIGH;
+ }
+
+ tps_queue_high = cfg->mwi.tps_queue_high;
+ ao2_ref(cfg, -1);
+ return tps_queue_high;
+}
+
+int ast_sip_get_mwi_tps_queue_low(void)
+{
+ int tps_queue_low;
+ struct global_config *cfg;
+
+ cfg = get_global_cfg();
+ if (!cfg) {
+ return DEFAULT_MWI_TPS_QUEUE_LOW;
+ }
+
+ tps_queue_low = cfg->mwi.tps_queue_low;
+ ao2_ref(cfg, -1);
+ return tps_queue_low;
+}
+
+unsigned int ast_sip_get_mwi_disable_initial_unsolicited(void)
+{
+ unsigned int disable_initial_unsolicited;
+ struct global_config *cfg;
+
+ cfg = get_global_cfg();
+ if (!cfg) {
+ return DEFAULT_MWI_DISABLE_INITIAL_UNSOLICITED;
+ }
+
+ disable_initial_unsolicited = cfg->mwi.disable_initial_unsolicited;
+ ao2_ref(cfg, -1);
+ return disable_initial_unsolicited;
+}
+
+
/*!
* \internal
* \brief Observer to set default global object if none exist.
@@ -450,6 +509,15 @@ int ast_sip_initialize_sorcery_global(void)
OPT_UINT_T, 0, FLDSET(struct global_config, unidentified_request_prune_interval));
ast_sorcery_object_field_register(sorcery, "global", "default_realm", DEFAULT_REALM,
OPT_STRINGFIELD_T, 0, STRFLDSET(struct global_config, default_realm));
+ ast_sorcery_object_field_register(sorcery, "global", "mwi_tps_queue_high",
+ __stringify(DEFAULT_MWI_TPS_QUEUE_HIGH),
+ OPT_UINT_T, 0, FLDSET(struct global_config, mwi.tps_queue_high));
+ ast_sorcery_object_field_register(sorcery, "global", "mwi_tps_queue_low",
+ __stringify(DEFAULT_MWI_TPS_QUEUE_LOW),
+ OPT_INT_T, 0, FLDSET(struct global_config, mwi.tps_queue_low));
+ ast_sorcery_object_field_register(sorcery, "global", "mwi_disable_initial_unsolicited",
+ DEFAULT_MWI_DISABLE_INITIAL_UNSOLICITED ? "yes" : "no",
+ OPT_BOOL_T, 1, FLDSET(struct global_config, mwi.disable_initial_unsolicited));
if (ast_sorcery_instance_observer_add(sorcery, &observer_callbacks_global)) {
return -1;
diff --git a/res/res_pjsip_mwi.c b/res/res_pjsip_mwi.c
index d86c96c74..9553cd732 100644
--- a/res/res_pjsip_mwi.c
+++ b/res/res_pjsip_mwi.c
@@ -35,6 +35,7 @@
#include "asterisk/module.h"
#include "asterisk/logger.h"
#include "asterisk/astobj2.h"
+#include "asterisk/taskprocessor.h"
#include "asterisk/sorcery.h"
#include "asterisk/stasis.h"
#include "asterisk/app.h"
@@ -52,6 +53,12 @@ static char *default_voicemail_extension;
#define MWI_DATASTORE "MWI datastore"
+/*! Number of serializers in pool if one not supplied. */
+#define MWI_SERIALIZER_POOL_SIZE 8
+
+/*! Pool of serializers to use if not supplied. */
+static struct ast_taskprocessor *mwi_serializer_pool[MWI_SERIALIZER_POOL_SIZE];
+
static void mwi_subscription_shutdown(struct ast_sip_subscription *sub);
static void mwi_to_ami(struct ast_sip_subscription *sub, struct ast_str **buf);
static int mwi_new_subscribe(struct ast_sip_endpoint *endpoint,
@@ -119,6 +126,117 @@ struct mwi_subscription {
char id[1];
};
+/*!
+ * \internal
+ * \brief Shutdown the serializers in the mwi pool.
+ * \since 13.12.0
+ *
+ * \return Nothing
+ */
+static void mwi_serializer_pool_shutdown(void)
+{
+ int idx;
+
+ for (idx = 0; idx < MWI_SERIALIZER_POOL_SIZE; ++idx) {
+ ast_taskprocessor_unreference(mwi_serializer_pool[idx]);
+ mwi_serializer_pool[idx] = NULL;
+ }
+}
+
+/*!
+ * \internal
+ * \brief Setup the serializers in the mwi pool.
+ * \since 13.12.0
+ *
+ * \retval 0 on success.
+ * \retval -1 on error.
+ */
+static int mwi_serializer_pool_setup(void)
+{
+ char tps_name[AST_TASKPROCESSOR_MAX_NAME + 1];
+ int idx;
+
+ for (idx = 0; idx < MWI_SERIALIZER_POOL_SIZE; ++idx) {
+ /* Create name with seq number appended. */
+ ast_taskprocessor_build_name(tps_name, sizeof(tps_name), "pjsip/mwi");
+
+ mwi_serializer_pool[idx] = ast_sip_create_serializer_named(tps_name);
+ if (!mwi_serializer_pool[idx]) {
+ mwi_serializer_pool_shutdown();
+ return -1;
+ }
+ }
+ return 0;
+}
+
+/*!
+ * \internal
+ * \brief Pick a mwi serializer from the pool.
+ * \since 13.12.0
+ *
+ * \retval least queue size task processor.
+ */
+static struct ast_taskprocessor *get_mwi_serializer(void)
+{
+ int idx;
+ int pos;
+
+ if (!mwi_serializer_pool[0]) {
+ return NULL;
+ }
+
+ for (pos = idx = 0; idx < MWI_SERIALIZER_POOL_SIZE; ++idx) {
+ if (ast_taskprocessor_size(mwi_serializer_pool[idx]) < ast_taskprocessor_size(mwi_serializer_pool[pos])) {
+ pos = idx;
+ }
+ }
+
+ return mwi_serializer_pool[pos];
+}
+
+/*!
+ * \internal
+ * \brief Set taskprocessor alert levels for the serializers in the mwi pool.
+ * \since 13.12.0
+ *
+ * \retval 0 on success.
+ * \retval -1 on error.
+ */
+static int mwi_serializer_set_alert_levels(void)
+{
+ int idx;
+ long tps_queue_high;
+ long tps_queue_low;
+
+ if (!mwi_serializer_pool[0]) {
+ return -1;
+ }
+
+ tps_queue_high = ast_sip_get_mwi_tps_queue_high();
+ if (tps_queue_high <= 0) {
+ ast_log(AST_LOG_WARNING, "Invalid taskprocessor high water alert trigger level '%ld'\n",
+ tps_queue_high);
+ tps_queue_high = AST_TASKPROCESSOR_HIGH_WATER_LEVEL;
+ }
+
+ tps_queue_low = ast_sip_get_mwi_tps_queue_low();
+ if (tps_queue_low < -1 || tps_queue_high < tps_queue_low) {
+ ast_log(AST_LOG_WARNING, "Invalid taskprocessor low water clear alert level '%ld'\n",
+ tps_queue_low);
+ tps_queue_low = -1;
+ }
+
+ for (idx = 0; idx < MWI_SERIALIZER_POOL_SIZE; ++idx) {
+ if (ast_taskprocessor_alert_set_levels(mwi_serializer_pool[idx], tps_queue_low, tps_queue_high)) {
+ ast_log(AST_LOG_WARNING, "Failed to set alert levels for MWI serializer pool #%d.\n",
+ idx);
+ }
+ }
+
+ return 0;
+}
+
+
static void mwi_stasis_cb(void *userdata, struct stasis_subscription *sub,
struct stasis_message *msg);
@@ -945,7 +1063,7 @@ static int send_notify(void *obj, void *arg, int flags)
struct mwi_subscription *mwi_sub = obj;
struct ast_taskprocessor *serializer = mwi_sub->is_solicited
? ast_sip_subscription_get_serializer(mwi_sub->sip_sub)
- : NULL;
+ : get_mwi_serializer();
if (ast_sip_push_task(serializer, serialized_notify, ao2_bump(mwi_sub))) {
ao2_ref(mwi_sub, -1);
@@ -1063,7 +1181,7 @@ static int send_contact_notify(void *obj, void *arg, int flags)
return 0;
}
- if (ast_sip_push_task(NULL, serialized_notify, ao2_bump(mwi_sub))) {
+ if (ast_sip_push_task(get_mwi_serializer(), serialized_notify, ao2_bump(mwi_sub))) {
ao2_ref(mwi_sub, -1);
}
@@ -1149,6 +1267,7 @@ static void global_loaded(const char *object_type)
{
ast_free(default_voicemail_extension);
default_voicemail_extension = ast_sip_get_default_voicemail_extension();
+ mwi_serializer_set_alert_levels();
}
static struct ast_sorcery_observer global_observer = {
@@ -1175,15 +1294,21 @@ static int load_module(void)
return AST_MODULE_LOAD_DECLINE;
}
+ if (mwi_serializer_pool_setup()) {
+ ast_log(AST_LOG_WARNING, "Failed to create MWI serializer pool. The default SIP pool will be used for MWI\n");
+ }
+
create_mwi_subscriptions();
ast_sorcery_observer_add(ast_sip_get_sorcery(), "contact", &mwi_contact_observer);
ast_sorcery_observer_add(ast_sip_get_sorcery(), "global", &global_observer);
ast_sorcery_reload_object(ast_sip_get_sorcery(), "global");
- if (ast_test_flag(&ast_options, AST_OPT_FLAG_FULLY_BOOTED)) {
- ast_sip_push_task(NULL, send_initial_notify_all, NULL);
- } else {
- stasis_subscribe_pool(ast_manager_get_topic(), mwi_startup_event_cb, NULL);
+ if (!ast_sip_get_mwi_disable_initial_unsolicited()) {
+ if (ast_test_flag(&ast_options, AST_OPT_FLAG_FULLY_BOOTED)) {
+ ast_sip_push_task(NULL, send_initial_notify_all, NULL);
+ } else {
+ stasis_subscribe_pool(ast_manager_get_topic(), mwi_startup_event_cb, NULL);
+ }
}
return AST_MODULE_LOAD_SUCCESS;
@@ -1193,6 +1318,7 @@ static int unload_module(void)
{
ao2_callback(unsolicited_mwi, OBJ_UNLINK | OBJ_NODATA | OBJ_MULTIPLE, unsubscribe, NULL);
ao2_ref(unsolicited_mwi, -1);
+ mwi_serializer_pool_shutdown();
ast_sorcery_observer_remove(ast_sip_get_sorcery(), "global", &global_observer);
ast_sorcery_observer_remove(ast_sip_get_sorcery(), "contact", &mwi_contact_observer);
ast_sip_unregister_subscription_handler(&mwi_handler);
diff --git a/res/res_pjsip_outbound_publish.c b/res/res_pjsip_outbound_publish.c
index ab8a6c9bd..35eedf0d2 100644
--- a/res/res_pjsip_outbound_publish.c
+++ b/res/res_pjsip_outbound_publish.c
@@ -31,6 +31,7 @@
#include "asterisk/res_pjsip_outbound_publish.h"
#include "asterisk/module.h"
#include "asterisk/taskprocessor.h"
+#include "asterisk/threadpool.h"
#include "asterisk/datastore.h"
/*** DOCUMENTATION
@@ -161,6 +162,8 @@ struct ast_sip_outbound_publish_client {
struct ast_sip_outbound_publish *publish;
/*! \brief The name of the transport to be used for the publish */
char *transport_name;
+ /*! \brief Serializer for stuff and things */
+ struct ast_taskprocessor *serializer;
};
/*! \brief Outbound publish state information (persists for lifetime of a publish) */
@@ -171,13 +174,11 @@ struct ast_sip_outbound_publish_state {
char id[0];
};
-/*! \brief Unloading data */
-struct unloading_data {
- int is_unloading;
- int count;
- ast_mutex_t lock;
- ast_cond_t cond;
-} unloading;
+/*! Time needs to be long enough for a transaction to timeout if nothing replies. */
+#define MAX_UNLOAD_TIMEOUT_TIME 35 /* Seconds */
+
+/*! Shutdown group to monitor sip_outbound_registration_client_state serializers. */
+static struct ast_serializer_shutdown_group *shutdown_group;
/*! \brief Default number of client state container buckets */
#define DEFAULT_STATE_BUCKETS 31
@@ -393,7 +394,7 @@ static void sip_outbound_publish_synchronize(struct ast_sip_event_publisher_hand
/* If the publisher client has been started but it is going away stop it */
removed->stop_publishing(state->client);
state->client->started = 0;
- if (ast_sip_push_task(NULL, cancel_refresh_timer_task, ao2_bump(state->client))) {
+ if (ast_sip_push_task(state->client->serializer, cancel_refresh_timer_task, ao2_bump(state->client))) {
ast_log(LOG_WARNING, "Could not stop refresh timer on client '%s'\n",
ast_sorcery_object_get_id(publish));
ao2_ref(state->client, -1);
@@ -614,7 +615,7 @@ fatal:
ast_free(message);
service:
- if (ast_sip_push_task(NULL, sip_publish_client_service_queue, ao2_bump(client))) {
+ if (ast_sip_push_task(client->serializer, sip_publish_client_service_queue, ao2_bump(client))) {
ao2_ref(client, -1);
}
return -1;
@@ -656,7 +657,7 @@ int ast_sip_publish_client_send(struct ast_sip_outbound_publish_client *client,
AST_LIST_INSERT_TAIL(&client->queue, message, entry);
- res = ast_sip_push_task(NULL, sip_publish_client_service_queue, ao2_bump(client));
+ res = ast_sip_push_task(client->serializer, sip_publish_client_service_queue, ao2_bump(client));
if (res) {
ao2_ref(client, -1);
}
@@ -679,16 +680,7 @@ static void sip_outbound_publish_client_destroy(void *obj)
ao2_cleanup(client->datastores);
ao2_cleanup(client->publish);
ast_free(client->transport_name);
-
- /* if unloading the module and all objects have been unpublished
- send the signal to finish unloading */
- if (unloading.is_unloading) {
- ast_mutex_lock(&unloading.lock);
- if (--unloading.count == 0) {
- ast_cond_signal(&unloading.cond);
- }
- ast_mutex_unlock(&unloading.lock);
- }
+ ast_taskprocessor_unreference(client->serializer);
}
static int explicit_publish_destroy(void *data)
@@ -718,7 +710,7 @@ static int cancel_and_unpublish(struct ast_sip_outbound_publish_client *client)
/* If the client was never started, there's nothing to unpublish, so just
* destroy the publication and remove its reference to the client.
*/
- ast_sip_push_task(NULL, explicit_publish_destroy, client);
+ ast_sip_push_task(client->serializer, explicit_publish_destroy, client);
return 0;
}
@@ -728,7 +720,7 @@ static int cancel_and_unpublish(struct ast_sip_outbound_publish_client *client)
}
client->started = 0;
- if (ast_sip_push_task(NULL, cancel_refresh_timer_task, ao2_bump(client))) {
+ if (ast_sip_push_task(client->serializer, cancel_refresh_timer_task, ao2_bump(client))) {
ast_log(LOG_WARNING, "Could not stop refresh timer on outbound publish '%s'\n",
ast_sorcery_object_get_id(client->publish));
ao2_ref(client, -1);
@@ -736,7 +728,7 @@ static int cancel_and_unpublish(struct ast_sip_outbound_publish_client *client)
/* If nothing is being sent right now send the unpublish - the destroy will happen in the subsequent callback */
if (!client->sending) {
- if (ast_sip_push_task(NULL, send_unpublish_task, ao2_bump(client))) {
+ if (ast_sip_push_task(client->serializer, send_unpublish_task, ao2_bump(client))) {
ast_log(LOG_WARNING, "Could not send unpublish message on outbound publish '%s'\n",
ast_sorcery_object_get_id(client->publish));
ao2_ref(client, -1);
@@ -897,7 +889,7 @@ static void sip_outbound_publish_callback(struct pjsip_publishc_cbparam *param)
if (client->sending) {
client->sending = NULL;
- if (!ast_sip_push_task(NULL, send_unpublish_task, ao2_bump(client))) {
+ if (!ast_sip_push_task(client->serializer, send_unpublish_task, ao2_bump(client))) {
return;
}
ast_log(LOG_WARNING, "Could not send unpublish message on outbound publish '%s'\n",
@@ -981,7 +973,7 @@ end:
ast_free(message);
}
} else {
- if (ast_sip_push_task(NULL, sip_publish_client_service_queue, ao2_bump(client))) {
+ if (ast_sip_push_task(client->serializer, sip_publish_client_service_queue, ao2_bump(client))) {
ao2_ref(client, -1);
}
}
@@ -1053,6 +1045,7 @@ static struct ast_sip_outbound_publish_state *sip_outbound_publish_state_alloc(
const char *id = ast_sorcery_object_get_id(publish);
struct ast_sip_outbound_publish_state *state =
ao2_alloc(sizeof(*state) + strlen(id) + 1, sip_outbound_publish_state_destroy);
+ char tps_name[AST_TASKPROCESSOR_MAX_NAME + 1];
if (!state) {
return NULL;
@@ -1070,6 +1063,17 @@ static struct ast_sip_outbound_publish_state *sip_outbound_publish_state_alloc(
return NULL;
}
+ /* Create name with seq number appended. */
+ ast_taskprocessor_build_name(tps_name, sizeof(tps_name), "pjsip/outpub/%s",
+ ast_sorcery_object_get_id(publish));
+
+ state->client->serializer = ast_sip_create_serializer_group_named(tps_name,
+ shutdown_group);
+ if (!state->client->serializer) {
+ ao2_ref(state, -1);
+ return NULL;
+ }
+
state->client->timer.user_data = state->client;
state->client->timer.cb = sip_outbound_publish_timer_cb;
state->client->transport_name = ast_strdup(publish->transport);
@@ -1082,7 +1086,7 @@ static struct ast_sip_outbound_publish_state *sip_outbound_publish_state_alloc(
static int initialize_publish_client(struct ast_sip_outbound_publish *publish,
struct ast_sip_outbound_publish_state *state)
{
- if (ast_sip_push_task_synchronous(NULL, sip_outbound_publish_client_alloc, state->client)) {
+ if (ast_sip_push_task_synchronous(state->client->serializer, sip_outbound_publish_client_alloc, state->client)) {
ast_log(LOG_ERROR, "Unable to create client for outbound publish '%s'\n",
ast_sorcery_object_get_id(publish));
return -1;
@@ -1219,16 +1223,48 @@ static int outbound_auth_handler(const struct aco_option *opt, struct ast_variab
return ast_sip_auth_vector_init(&publish->outbound_auths, var->value);
}
+
+static int unload_module(void)
+{
+ int remaining;
+
+ ast_sorcery_object_unregister(ast_sip_get_sorcery(), "outbound-publish");
+
+ ao2_global_obj_release(current_states);
+
+ /* Wait for publication serializers to get destroyed. */
+ ast_debug(2, "Waiting for publication to complete for unload.\n");
+ remaining = ast_serializer_shutdown_group_join(shutdown_group, MAX_UNLOAD_TIMEOUT_TIME);
+ if (remaining) {
+ ast_log(LOG_WARNING, "Unload incomplete. Could not stop %d outbound publications. Try again later.\n",
+ remaining);
+ return -1;
+ }
+
+ ast_debug(2, "Successful shutdown.\n");
+
+ ao2_cleanup(shutdown_group);
+ shutdown_group = NULL;
+
+ return 0;
+}
+
static int load_module(void)
{
CHECK_PJSIP_MODULE_LOADED();
+ shutdown_group = ast_serializer_shutdown_group_alloc();
+ if (!shutdown_group) {
+ return AST_MODULE_LOAD_FAILURE;
+ }
+
ast_sorcery_apply_config(ast_sip_get_sorcery(), "res_pjsip_outbound_publish");
ast_sorcery_apply_default(ast_sip_get_sorcery(), "outbound-publish", "config", "pjsip.conf,criteria=type=outbound-publish");
if (ast_sorcery_object_register(ast_sip_get_sorcery(), "outbound-publish", sip_outbound_publish_alloc, NULL,
sip_outbound_publish_apply)) {
ast_log(LOG_ERROR, "Unable to register 'outbound-publish' type with sorcery\n");
+ unload_module();
return AST_MODULE_LOAD_DECLINE;
}
@@ -1264,49 +1300,6 @@ static int reload_module(void)
return 0;
}
-static int unload_module(void)
-{
- struct timeval start = ast_tvnow();
- struct timespec end = {
- .tv_sec = start.tv_sec + 10,
- .tv_nsec = start.tv_usec * 1000
- };
- int res = 0;
- struct ao2_container *states = ao2_global_obj_ref(current_states);
-
- if (!states || !(unloading.count = ao2_container_count(states))) {
- return 0;
- }
- ao2_ref(states, -1);
-
- ast_mutex_init(&unloading.lock);
- ast_cond_init(&unloading.cond, NULL);
- ast_mutex_lock(&unloading.lock);
-
- unloading.is_unloading = 1;
- ao2_global_obj_release(current_states);
-
- /* wait for items to unpublish */
- ast_verb(5, "Waiting to complete unpublishing task(s)\n");
- while (unloading.count && !res) {
- res = ast_cond_timedwait(&unloading.cond, &unloading.lock, &end);
- }
- ast_mutex_unlock(&unloading.lock);
-
- ast_mutex_destroy(&unloading.lock);
- ast_cond_destroy(&unloading.cond);
-
- if (res) {
- ast_verb(5, "At least %d items were unable to unpublish "
- "in the allowed time\n", unloading.count);
- } else {
- ast_verb(5, "All items successfully unpublished\n");
- ast_sorcery_object_unregister(ast_sip_get_sorcery(), "outbound-publish");
- }
-
- return res;
-}
-
AST_MODULE_INFO(ASTERISK_GPL_KEY, AST_MODFLAG_GLOBAL_SYMBOLS | AST_MODFLAG_LOAD_ORDER, "PJSIP Outbound Publish Support",
.load = load_module,
.reload = reload_module,
diff --git a/res/res_sorcery_config.c b/res/res_sorcery_config.c
index 220b5875f..056f63eaa 100644
--- a/res/res_sorcery_config.c
+++ b/res/res_sorcery_config.c
@@ -210,6 +210,10 @@ static void sorcery_config_retrieve_regex(const struct ast_sorcery *sorcery, voi
.regex = &expression,
};
+ if (ast_strlen_zero(regex)) {
+ regex = ".";
+ }
+
if (!config_objects || regcomp(&expression, regex, REG_EXTENDED | REG_NOSUB)) {
return;
}
diff --git a/res/res_sorcery_memory.c b/res/res_sorcery_memory.c
index e8f603038..b2f05591b 100644
--- a/res/res_sorcery_memory.c
+++ b/res/res_sorcery_memory.c
@@ -188,6 +188,10 @@ static void sorcery_memory_retrieve_regex(const struct ast_sorcery *sorcery, voi
.regex = &expression,
};
+ if (ast_strlen_zero(regex)) {
+ regex = ".";
+ }
+
if (regcomp(&expression, regex, REG_EXTENDED | REG_NOSUB)) {
return;
}