diff options
58 files changed, 923 insertions, 444 deletions
@@ -397,6 +397,9 @@ res_pjsip into the "reg_server" field in the ps_contacts table to facilitate multi-server setups. + * When starting Asterisk, received traffic will now be ignored until Asterisk + has loaded all modules and is fully booted. + res_hep ------------------ * Added a new option, 'uuid_type', that sets the preferred source of the Homer diff --git a/addons/chan_ooh323.h b/addons/chan_ooh323.h index 89caaff63..1279a2246 100644 --- a/addons/chan_ooh323.h +++ b/addons/chan_ooh323.h @@ -38,7 +38,6 @@ #include <errno.h> #include <fcntl.h> #include <signal.h> -#include <sys/signal.h> #include "asterisk/lock.h" #include "asterisk/channel.h" diff --git a/addons/ooh323c/src/Makefile.in b/addons/ooh323c/src/Makefile.in index d3a96024b..15b14f7df 100644 --- a/addons/ooh323c/src/Makefile.in +++ b/addons/ooh323c/src/Makefile.in @@ -104,7 +104,7 @@ CONFIG_HEADER = $(top_builddir)/config.h CONFIG_CLEAN_FILES = LIBRARIES = $(noinst_LIBRARIES) -libooh323c_a_AR = $(AR) cru +libooh323c_a_AR = $(AR) cr libooh323c_a_LIBADD = am_libooh323c_a_OBJECTS = ooLogChan.$(OBJEXT) ooUtils.$(OBJEXT) \ ooGkClient.$(OBJEXT) context.$(OBJEXT) ooDateTime.$(OBJEXT) \ diff --git a/apps/app_dahdiras.c b/apps/app_dahdiras.c index e8bdad01e..51921a98e 100644 --- a/apps/app_dahdiras.c +++ b/apps/app_dahdiras.c @@ -36,12 +36,7 @@ ASTERISK_REGISTER_FILE() #include <sys/ioctl.h> #include <sys/wait.h> -#ifdef __linux__ -#include <sys/signal.h> -#else #include <signal.h> -#endif /* __linux__ */ - #include <fcntl.h> #include <dahdi/user.h> diff --git a/apps/app_dial.c b/apps/app_dial.c index 1f019d6c8..c4d527303 100644 --- a/apps/app_dial.c +++ b/apps/app_dial.c @@ -35,7 +35,7 @@ ASTERISK_REGISTER_FILE() #include <sys/time.h> -#include <sys/signal.h> +#include <signal.h> #include <sys/stat.h> #include <netinet/in.h> diff --git a/apps/app_queue.c b/apps/app_queue.c index 45297f5f2..8f949635d 100644 --- a/apps/app_queue.c +++ b/apps/app_queue.c @@ -72,7 +72,7 @@ ASTERISK_REGISTER_FILE() #include <sys/time.h> -#include <sys/signal.h> +#include <signal.h> #include <netinet/in.h> #include <ctype.h> diff --git a/channels/chan_console.c b/channels/chan_console.c index 9fdecd7d4..bd849ad53 100644 --- a/channels/chan_console.c +++ b/channels/chan_console.c @@ -64,7 +64,7 @@ ASTERISK_REGISTER_FILE() -#include <sys/signal.h> /* SIGURG */ +#include <signal.h> /* SIGURG */ #include <portaudio.h> diff --git a/channels/chan_dahdi.c b/channels/chan_dahdi.c index 2d923433c..e4b7c0ee8 100644 --- a/channels/chan_dahdi.c +++ b/channels/chan_dahdi.c @@ -58,10 +58,8 @@ ASTERISK_REGISTER_FILE() #if defined(__NetBSD__) || defined(__FreeBSD__) #include <pthread.h> -#include <signal.h> -#else -#include <sys/signal.h> #endif +#include <signal.h> #include <sys/stat.h> #include <math.h> diff --git a/channels/chan_iax2.c b/channels/chan_iax2.c index af44de96c..721da9a04 100644 --- a/channels/chan_iax2.c +++ b/channels/chan_iax2.c @@ -68,7 +68,6 @@ ASTERISK_REGISTER_FILE() #include <netinet/in_systm.h> #include <netinet/ip.h> #include <sys/time.h> -#include <sys/signal.h> #include <signal.h> #include <strings.h> #include <netdb.h> diff --git a/channels/chan_mgcp.c b/channels/chan_mgcp.c index 8714ddbbc..6df5d3fd0 100644 --- a/channels/chan_mgcp.c +++ b/channels/chan_mgcp.c @@ -47,7 +47,6 @@ ASTERISK_REGISTER_FILE() #include <net/if.h> #include <fcntl.h> #include <netdb.h> -#include <sys/signal.h> #include <signal.h> #include <netinet/in.h> #include <netinet/in_systm.h> diff --git a/channels/chan_motif.c b/channels/chan_motif.c index 118d1d9fb..0c710923f 100644 --- a/channels/chan_motif.c +++ b/channels/chan_motif.c @@ -51,7 +51,7 @@ ASTERISK_REGISTER_FILE() #include <netdb.h> #include <netinet/in.h> #include <arpa/inet.h> -#include <sys/signal.h> +#include <signal.h> #include <iksemel.h> #include <pthread.h> diff --git a/channels/chan_pjsip.c b/channels/chan_pjsip.c index 370075d2b..22f834d37 100644 --- a/channels/chan_pjsip.c +++ b/channels/chan_pjsip.c @@ -269,6 +269,9 @@ static int direct_media_mitigate_glare(struct ast_sip_session *session) return 0; } +/*! + * \pre chan is locked + */ static int check_for_rtp_changes(struct ast_channel *chan, struct ast_rtp_instance *rtp, struct ast_sip_session_media *media, int rtcp_fd) { @@ -338,6 +341,11 @@ static int send_direct_media_request(void *data) int changed = 0; int res = 0; + /* The channel needs to be locked when checking for RTP changes. + * Otherwise, we could end up destroying an underlying RTCP structure + * at the same time that the channel thread is attempting to read RTCP + */ + ast_channel_lock(cdata->chan); if (pvt->media[SIP_MEDIA_AUDIO]) { changed |= check_for_rtp_changes( cdata->chan, cdata->rtp, pvt->media[SIP_MEDIA_AUDIO], 1); @@ -346,6 +354,7 @@ static int send_direct_media_request(void *data) changed |= check_for_rtp_changes( cdata->chan, cdata->vrtp, pvt->media[SIP_MEDIA_VIDEO], 3); } + ast_channel_unlock(cdata->chan); if (direct_media_mitigate_glare(cdata->session)) { ast_debug(4, "Disregarding setting RTP on %s: mitigating re-INVITE glare\n", ast_channel_name(cdata->chan)); diff --git a/channels/chan_sip.c b/channels/chan_sip.c index 19f8aa308..8f547bf1a 100644 --- a/channels/chan_sip.c +++ b/channels/chan_sip.c @@ -224,7 +224,6 @@ ASTERISK_REGISTER_FILE() #include <signal.h> -#include <sys/signal.h> #include <regex.h> #include <inttypes.h> @@ -12996,7 +12995,7 @@ static void add_codec_to_sdp(const struct sip_pvt *p, /* Opus mandates 2 channels in rtpmap */ if (ast_format_cmp(format, ast_format_opus) == AST_FORMAT_CMP_EQUAL) { ast_str_append(a_buf, 0, "a=rtpmap:%d %s/%u/2\r\n", rtp_code, mime, rate); - } else { + } else if ((35 <= rtp_code) || !(sip_cfg.compactheaders)) { ast_str_append(a_buf, 0, "a=rtpmap:%d %s/%u\r\n", rtp_code, mime, rate); } @@ -15791,11 +15790,12 @@ static void start_register_timeout(struct sip_registry *reg) static const char *sip_sanitized_host(const char *host) { - struct ast_sockaddr addr = { { 0, 0, }, }; + struct ast_sockaddr addr; /* peer/sip_pvt->tohost and sip_registry->hostname should never have a port * in them, so we use PARSE_PORT_FORBID here. If this lookup fails, we return * the original host which is most likely a host name and not an IP. */ + memset(&addr, 0, sizeof(addr)); if (!ast_sockaddr_parse(&addr, host, PARSE_PORT_FORBID)) { return host; } @@ -31811,7 +31811,7 @@ static struct sip_peer *build_peer(const char *name, struct ast_variable *v, str olddirectmediaacl = ast_free_acl_list(olddirectmediaacl); if (!ast_strlen_zero(peer->callback)) { /* build string from peer info */ char *reg_string; - if (ast_asprintf(®_string, "%s?%s:%s@%s/%s", peer->name, peer->username, !ast_strlen_zero(peer->remotesecret) ? peer->remotesecret : peer->secret, peer->tohost, peer->callback) >= 0) { + if (ast_asprintf(®_string, "%s?%s:%s:%s@%s/%s", peer->name, S_OR(peer->fromuser, peer->username), S_OR(peer->remotesecret, peer->secret), peer->username, peer->tohost, peer->callback) >= 0) { sip_register(reg_string, 0); /* XXX TODO: count in registry_count */ ast_free(reg_string); } diff --git a/channels/chan_skinny.c b/channels/chan_skinny.c index 76990d175..fb6e619a1 100644 --- a/channels/chan_skinny.c +++ b/channels/chan_skinny.c @@ -49,7 +49,6 @@ ASTERISK_REGISTER_FILE() #include <fcntl.h> #include <netdb.h> #include <arpa/inet.h> -#include <sys/signal.h> #include <signal.h> #include <ctype.h> diff --git a/configs/samples/sip.conf.sample b/configs/samples/sip.conf.sample index 8f28e2680..a7b74df69 100644 --- a/configs/samples/sip.conf.sample +++ b/configs/samples/sip.conf.sample @@ -786,7 +786,7 @@ srvlookup=yes ; Enable DNS SRV lookups on outbound calls ; A similar effect can be achieved by adding a "callbackextension" option in a peer section. ; this is equivalent to having the following line in the general section: ; -; register => username:secret@host/callbackextension +; register => fromuser:secret:username@host/callbackextension ; ; and more readable because you don't have to write the parameters in two places ; (note that the "port" is ignored - this is a bug that should be fixed). @@ -4866,6 +4866,9 @@ case "${host_os}" in linux-gnueabi* | linux-gnuspe) OSARCH=linux-gnu ;; + linux-musl*) + OSARCH=linux-musl + ;; kfreebsd*-gnu) OSARCH=kfreebsd-gnu ;; @@ -13772,7 +13775,7 @@ else We can't simply define LARGE_OFF_T to be 9223372036854775807, since some C++ compilers masquerading as C compilers incorrectly reject 9223372036854775807. */ -#define LARGE_OFF_T ((((off_t) 1 << 31) << 31) - 1 + (((off_t) 1 << 31) << 31)) +#define LARGE_OFF_T (((off_t) 1 << 62) - 1 + ((off_t) 1 << 62)) int off_t_is_large[(LARGE_OFF_T % 2147483629 == 721 && LARGE_OFF_T % 2147483647 == 1) ? 1 : -1]; @@ -13818,7 +13821,7 @@ else We can't simply define LARGE_OFF_T to be 9223372036854775807, since some C++ compilers masquerading as C compilers incorrectly reject 9223372036854775807. */ -#define LARGE_OFF_T ((((off_t) 1 << 31) << 31) - 1 + (((off_t) 1 << 31) << 31)) +#define LARGE_OFF_T (((off_t) 1 << 62) - 1 + ((off_t) 1 << 62)) int off_t_is_large[(LARGE_OFF_T % 2147483629 == 721 && LARGE_OFF_T % 2147483647 == 1) ? 1 : -1]; @@ -13842,7 +13845,7 @@ rm -f core conftest.err conftest.$ac_objext conftest.$ac_ext We can't simply define LARGE_OFF_T to be 9223372036854775807, since some C++ compilers masquerading as C compilers incorrectly reject 9223372036854775807. */ -#define LARGE_OFF_T ((((off_t) 1 << 31) << 31) - 1 + (((off_t) 1 << 31) << 31)) +#define LARGE_OFF_T (((off_t) 1 << 62) - 1 + ((off_t) 1 << 62)) int off_t_is_large[(LARGE_OFF_T % 2147483629 == 721 && LARGE_OFF_T % 2147483647 == 1) ? 1 : -1]; @@ -13887,7 +13890,7 @@ else We can't simply define LARGE_OFF_T to be 9223372036854775807, since some C++ compilers masquerading as C compilers incorrectly reject 9223372036854775807. */ -#define LARGE_OFF_T ((((off_t) 1 << 31) << 31) - 1 + (((off_t) 1 << 31) << 31)) +#define LARGE_OFF_T (((off_t) 1 << 62) - 1 + ((off_t) 1 << 62)) int off_t_is_large[(LARGE_OFF_T % 2147483629 == 721 && LARGE_OFF_T % 2147483647 == 1) ? 1 : -1]; @@ -13911,7 +13914,7 @@ rm -f core conftest.err conftest.$ac_objext conftest.$ac_ext We can't simply define LARGE_OFF_T to be 9223372036854775807, since some C++ compilers masquerading as C compilers incorrectly reject 9223372036854775807. */ -#define LARGE_OFF_T ((((off_t) 1 << 31) << 31) - 1 + (((off_t) 1 << 31) << 31)) +#define LARGE_OFF_T (((off_t) 1 << 62) - 1 + ((off_t) 1 << 62)) int off_t_is_large[(LARGE_OFF_T % 2147483629 == 721 && LARGE_OFF_T % 2147483647 == 1) ? 1 : -1]; @@ -19328,7 +19331,8 @@ fi fi -if test "x${OSARCH}" = "xlinux-gnu" ; then +case "${OSARCH}" in +linux*) if test "x${PBX_CAP}" != "x1" -a "${USE_CAP}" != "no"; then pbxlibdir="" @@ -19433,7 +19437,8 @@ _ACEOF fi -fi + ;; +esac if test "x${PBX_DAHDI}" != "x1"; then diff --git a/configure.ac b/configure.ac index 5852d3fa2..3cc8588fe 100644 --- a/configure.ac +++ b/configure.ac @@ -181,6 +181,9 @@ case "${host_os}" in linux-gnueabi* | linux-gnuspe) OSARCH=linux-gnu ;; + linux-musl*) + OSARCH=linux-musl + ;; kfreebsd*-gnu) OSARCH=kfreebsd-gnu ;; @@ -1388,9 +1391,11 @@ if test "${PBX_BFD}" = "0"; then AST_EXT_LIB_CHECK([BFD], [bfd], [bfd_check_format], [bfd.h], [-ldl -liberty -lz]) fi -if test "x${OSARCH}" = "xlinux-gnu" ; then +case "${OSARCH}" in +linux*) AST_EXT_LIB_CHECK([CAP], [cap], [cap_from_text], [sys/capability.h]) -fi + ;; +esac AST_C_DEFINE_CHECK([DAHDI], [DAHDI_RESET_COUNTERS], [dahdi/user.h], [230]) AST_C_DEFINE_CHECK([DAHDI], [DAHDI_DEFAULT_MTU_MRU], [dahdi/user.h], [220]) diff --git a/include/asterisk/compat.h b/include/asterisk/compat.h index 3eb6c96a2..252ce914a 100644 --- a/include/asterisk/compat.h +++ b/include/asterisk/compat.h @@ -70,7 +70,7 @@ #endif #ifndef AST_POLL_COMPAT -#include <sys/poll.h> +#include <poll.h> #else #include "asterisk/poll-compat.h" #endif diff --git a/include/asterisk/poll-compat.h b/include/asterisk/poll-compat.h index cbb610925..72ac2c3e2 100644 --- a/include/asterisk/poll-compat.h +++ b/include/asterisk/poll-compat.h @@ -83,7 +83,7 @@ #ifndef AST_POLL_COMPAT -#include <sys/poll.h> +#include <poll.h> #define ast_poll(a, b, c) poll(a, b, c) diff --git a/include/asterisk/res_hep.h b/include/asterisk/res_hep.h index bd0129eea..cfd213ad7 100644 --- a/include/asterisk/res_hep.h +++ b/include/asterisk/res_hep.h @@ -118,6 +118,14 @@ int hepv3_send_packet(struct hepv3_capture_info *capture_info); */ enum hep_uuid_type hepv3_get_uuid_type(void); +/*! + * \brief Return whether or not we're currently loaded and active + * + * \retval 0 The module is not loaded + * \retval 1 The module is loaded + */ +int hepv3_is_loaded(void); + #if defined(__cplusplus) || defined(c_plusplus) } #endif diff --git a/include/asterisk/res_pjsip.h b/include/asterisk/res_pjsip.h index 50d02d980..d1f0c9825 100644 --- a/include/asterisk/res_pjsip.h +++ b/include/asterisk/res_pjsip.h @@ -1301,6 +1301,17 @@ struct ast_serializer_shutdown_group; struct ast_taskprocessor *ast_sip_create_serializer_group(const char *name, struct ast_serializer_shutdown_group *shutdown_group); /*! + * \brief Determine the distributor serializer for the SIP message. + * \since 13.10.0 + * + * \param rdata The incoming message. + * + * \retval Calculated distributor serializer on success. + * \retval NULL on error. + */ +struct ast_taskprocessor *ast_sip_get_distributor_serializer(pjsip_rx_data *rdata); + +/*! * \brief Set a serializer on a SIP dialog so requests and responses are automatically serialized * * Passing a NULL serializer is a way to remove a serializer from a dialog. diff --git a/include/asterisk/res_pjsip_session.h b/include/asterisk/res_pjsip_session.h index 55401e7c7..5ca2c99a5 100644 --- a/include/asterisk/res_pjsip_session.h +++ b/include/asterisk/res_pjsip_session.h @@ -408,9 +408,10 @@ struct ast_sip_channel_pvt *ast_sip_channel_pvt_alloc(void *pvt, struct ast_sip_ * \param endpoint The endpoint that this session communicates with * \param contact The contact associated with this session * \param inv_session The PJSIP INVITE session data + * \param rdata INVITE request received (NULL if for outgoing allocation) */ struct ast_sip_session *ast_sip_session_alloc(struct ast_sip_endpoint *endpoint, - struct ast_sip_contact *contact, pjsip_inv_session *inv); + struct ast_sip_contact *contact, pjsip_inv_session *inv, pjsip_rx_data *rdata); /*! * \brief Request and wait for the session serializer to be suspended. diff --git a/include/asterisk/sorcery.h b/include/asterisk/sorcery.h index 42ecc133f..23219ec41 100644 --- a/include/asterisk/sorcery.h +++ b/include/asterisk/sorcery.h @@ -695,6 +695,20 @@ int __ast_sorcery_object_register(struct ast_sorcery *sorcery, const char *type, __ast_sorcery_object_register((sorcery), (type), 1, 1, (alloc), (transform), (apply)) /*! + * \brief Set the high and low alert water marks of the sorcery object type. + * \since 13.10.0 + * + * \param sorcery Pointer to a sorcery structure + * \param type Type of object + * \param low_water New queue low water mark. (-1 to set as 90% of high_water) + * \param high_water New queue high water mark. + * + * \retval 0 on success. + * \retval -1 on error (water marks not changed). + */ +int ast_sorcery_object_set_congestion_levels(struct ast_sorcery *sorcery, const char *type, long low_water, long high_water); + +/*! * \brief Set the copy handler for an object type * * \param sorcery Pointer to a sorcery structure diff --git a/include/asterisk/stasis.h b/include/asterisk/stasis.h index 14ab7d93b..62ed1ed1a 100644 --- a/include/asterisk/stasis.h +++ b/include/asterisk/stasis.h @@ -591,6 +591,20 @@ struct stasis_subscription *stasis_unsubscribe( struct stasis_subscription *subscription); /*! + * \brief Set the high and low alert water marks of the stasis subscription. + * \since 13.10.0 + * + * \param subscription Pointer to a stasis subscription + * \param low_water New queue low water mark. (-1 to set as 90% of high_water) + * \param high_water New queue high water mark. + * + * \retval 0 on success. + * \retval -1 on error (water marks not changed). + */ +int stasis_subscription_set_congestion_limits(struct stasis_subscription *subscription, + long low_water, long high_water); + +/*! * \brief Block until the last message is processed on a subscription. * * This function will not return until the \a subscription's callback for the diff --git a/include/asterisk/stasis_message_router.h b/include/asterisk/stasis_message_router.h index 89657a5ee..50270a788 100644 --- a/include/asterisk/stasis_message_router.h +++ b/include/asterisk/stasis_message_router.h @@ -127,6 +127,20 @@ void stasis_message_router_publish_sync(struct stasis_message_router *router, struct stasis_message *message); /*! + * \brief Set the high and low alert water marks of the stasis message router. + * \since 13.10.0 + * + * \param router Pointer to a stasis message router + * \param low_water New queue low water mark. (-1 to set as 90% of high_water) + * \param high_water New queue high water mark. + * + * \retval 0 on success. + * \retval -1 on error (water marks not changed). + */ +int stasis_message_router_set_congestion_limits(struct stasis_message_router *router, + long low_water, long high_water); + +/*! * \brief Add a route to a message router. * * A particular \a message_type may have at most one route per \a router. If diff --git a/include/asterisk/taskprocessor.h b/include/asterisk/taskprocessor.h index af3ce747f..e51122269 100644 --- a/include/asterisk/taskprocessor.h +++ b/include/asterisk/taskprocessor.h @@ -59,6 +59,7 @@ struct ast_taskprocessor; /*! \brief Suggested maximum taskprocessor name length (less null terminator). */ #define AST_TASKPROCESSOR_MAX_NAME 45 +/*! Default taskprocessor high water level alert trigger */ #define AST_TASKPROCESSOR_HIGH_WATER_LEVEL 500 /*! @@ -297,4 +298,26 @@ const char *ast_taskprocessor_name(struct ast_taskprocessor *tps); */ long ast_taskprocessor_size(struct ast_taskprocessor *tps); +/*! + * \brief Get the current taskprocessor high water alert count. + * \since 13.10.0 + * + * \retval 0 if no taskprocessors are in high water alert. + * \retval non-zero if some task processors are in high water alert. + */ +unsigned int ast_taskprocessor_alert_get(void); + +/*! + * \brief Set the high and low alert water marks of the given taskprocessor queue. + * \since 13.10.0 + * + * \param tps Taskprocessor to update queue water marks. + * \param low_water New queue low water mark. (-1 to set as 90% of high_water) + * \param high_water New queue high water mark. + * + * \retval 0 on success. + * \retval -1 on error (water marks not changed). + */ +int ast_taskprocessor_alert_set_levels(struct ast_taskprocessor *tps, long low_water, long high_water); + #endif /* __AST_TASKPROCESSOR_H__ */ diff --git a/main/Makefile b/main/Makefile index 386397801..729ae9c01 100644 --- a/main/Makefile +++ b/main/Makefile @@ -46,7 +46,7 @@ AST_LIBS+=$(CRYPT_LIB) AST_LIBS+=$(AST_CLANG_BLOCKS_LIBS) AST_LIBS+=$(RT_LIB) -ifneq ($(findstring $(OSARCH), linux-gnu uclinux linux-uclibc kfreebsd-gnu),) +ifneq ($(findstring $(OSARCH), linux-gnu uclinux linux-uclibc linux-musl kfreebsd-gnu),) ifneq ($(findstring LOADABLE_MODULES,$(MENUSELECT_CFLAGS)),) AST_LIBS+=-ldl endif diff --git a/main/ast_expr2.c b/main/ast_expr2.c index d41072d6e..c700b01d7 100644 --- a/main/ast_expr2.c +++ b/main/ast_expr2.c @@ -94,6 +94,7 @@ #define ASTMM_LIBC ASTMM_REDIRECT #include "asterisk.h" +#include <sys/cdefs.h> #include <sys/types.h> #include <stdio.h> diff --git a/main/ast_expr2.y b/main/ast_expr2.y index 762e83d84..df87bcc7f 100644 --- a/main/ast_expr2.y +++ b/main/ast_expr2.y @@ -15,6 +15,7 @@ #define ASTMM_LIBC ASTMM_REDIRECT #include "asterisk.h" +#include <sys/cdefs.h> #include <sys/types.h> #include <stdio.h> diff --git a/main/cdr.c b/main/cdr.c index b43e3610c..586a10684 100644 --- a/main/cdr.c +++ b/main/cdr.c @@ -71,6 +71,7 @@ ASTERISK_REGISTER_FILE() #include "asterisk/stasis_bridges.h" #include "asterisk/stasis_message_router.h" #include "asterisk/astobj2.h" +#include "asterisk/taskprocessor.h" /*** DOCUMENTATION <configInfo name="cdr" language="en_US"> @@ -1415,8 +1416,6 @@ static int base_process_bridge_leave(struct cdr_object *cdr, struct ast_bridge_s static int base_process_dial_end(struct cdr_object *cdr, struct ast_channel_snapshot *caller, struct ast_channel_snapshot *peer, const char *dial_status) { - /* In general, most things shouldn't get a dial end. */ - ast_assert(0); return 0; } @@ -4221,6 +4220,8 @@ int ast_cdr_engine_init(void) if (!stasis_router) { return -1; } + stasis_message_router_set_congestion_limits(stasis_router, -1, + 10 * AST_TASKPROCESSOR_HIGH_WATER_LEVEL); if (STASIS_MESSAGE_TYPE_INIT(cdr_sync_message_type)) { return -1; diff --git a/main/cel.c b/main/cel.c index a0d0ad723..887a9e6a5 100644 --- a/main/cel.c +++ b/main/cel.c @@ -59,6 +59,7 @@ ASTERISK_REGISTER_FILE() #include "asterisk/parking.h" #include "asterisk/pickup.h" #include "asterisk/core_local.h" +#include "asterisk/taskprocessor.h" /*** DOCUMENTATION <configInfo name="cel" language="en_US"> @@ -1575,6 +1576,8 @@ static int create_routes(void) if (!cel_state_router) { return -1; } + stasis_message_router_set_congestion_limits(cel_state_router, -1, + 6 * AST_TASKPROCESSOR_HIGH_WATER_LEVEL); ret |= stasis_message_router_add(cel_state_router, stasis_cache_update_type(), diff --git a/main/cli.c b/main/cli.c index f2bedc91a..8af20b61b 100644 --- a/main/cli.c +++ b/main/cli.c @@ -42,7 +42,6 @@ ASTERISK_REGISTER_FILE() #include "asterisk/_private.h" #include "asterisk/paths.h" /* use ast_config_AST_MODULE_DIR */ -#include <sys/signal.h> #include <signal.h> #include <ctype.h> #include <regex.h> diff --git a/main/dns.c b/main/dns.c index 96227949e..fa94089e7 100644 --- a/main/dns.c +++ b/main/dns.c @@ -254,12 +254,10 @@ static int dns_search_res(const char *dname, int rr_class, int rr_type, { int ret = AST_DNS_SEARCH_FAILURE; - struct __res_state dns_state; ast_mutex_lock(&res_lock); res_init(); - ret = res_search(&dns_state, - dname, + ret = res_search(dname, rr_class, rr_type, dns_response, diff --git a/main/editline/Makefile.in b/main/editline/Makefile.in index 112b68b64..2be4333d5 100644 --- a/main/editline/Makefile.in +++ b/main/editline/Makefile.in @@ -187,7 +187,7 @@ distclean : clean # $(LIB_A) : $(BGCSRCS:.c=.o_a) $(CCSRCS:.c=.o_a) - $(AR) cru $@ $? + $(AR) cr $@ $? $(RANLIB) $@ $(LIB_S) : $(BGCSRCS:.c=.o_s) $(CCSRCS:.c=.o_s) diff --git a/main/features.c b/main/features.c index 7dfe4cde5..6806fe749 100644 --- a/main/features.c +++ b/main/features.c @@ -46,7 +46,7 @@ ASTERISK_REGISTER_FILE() #include <pthread.h> #include <signal.h> #include <sys/time.h> -#include <sys/signal.h> +#include <signal.h> #include <netinet/in.h> #include "asterisk/lock.h" diff --git a/main/http.c b/main/http.c index 5ec94a7e1..a8362829e 100644 --- a/main/http.c +++ b/main/http.c @@ -49,7 +49,7 @@ ASTERISK_REGISTER_FILE() #include <time.h> #include <sys/time.h> #include <sys/stat.h> -#include <sys/signal.h> +#include <signal.h> #include <fcntl.h> #include "asterisk/paths.h" /* use ast_config_AST_DATA_DIR */ diff --git a/main/manager.c b/main/manager.c index 94415b7a0..029da70f7 100644 --- a/main/manager.c +++ b/main/manager.c @@ -100,6 +100,7 @@ ASTERISK_REGISTER_FILE() #include "asterisk/rtp_engine.h" #include "asterisk/format_cache.h" #include "asterisk/translate.h" +#include "asterisk/taskprocessor.h" /*** DOCUMENTATION <manager name="Ping" language="en_US"> @@ -8692,6 +8693,8 @@ static int manager_subscriptions_init(void) if (!stasis_router) { return -1; } + stasis_message_router_set_congestion_limits(stasis_router, -1, + 6 * AST_TASKPROCESSOR_HIGH_WATER_LEVEL); res |= stasis_message_router_set_default(stasis_router, manager_default_msg_cb, NULL); diff --git a/main/sorcery.c b/main/sorcery.c index ec340e827..6a7b54bbf 100644 --- a/main/sorcery.c +++ b/main/sorcery.c @@ -1164,6 +1164,20 @@ int __ast_sorcery_object_register(struct ast_sorcery *sorcery, const char *type, return 0; } +int ast_sorcery_object_set_congestion_levels(struct ast_sorcery *sorcery, const char *type, long low_water, long high_water) +{ + struct ast_sorcery_object_type *object_type; + int res = -1; + + object_type = ao2_find(sorcery->types, type, OBJ_SEARCH_KEY); + if (object_type) { + res = ast_taskprocessor_alert_set_levels(object_type->serializer, + low_water, high_water); + ao2_ref(object_type, -1); + } + return res; +} + void ast_sorcery_object_set_copy_handler(struct ast_sorcery *sorcery, const char *type, sorcery_copy_handler copy) { RAII_VAR(struct ast_sorcery_object_type *, object_type, ao2_find(sorcery->types, type, OBJ_KEY), ao2_cleanup); diff --git a/main/stasis.c b/main/stasis.c index 9fe3a2aae..91ad94e76 100644 --- a/main/stasis.c +++ b/main/stasis.c @@ -564,6 +564,18 @@ struct stasis_subscription *stasis_unsubscribe(struct stasis_subscription *sub) return NULL; } +int stasis_subscription_set_congestion_limits(struct stasis_subscription *subscription, + long low_water, long high_water) +{ + int res = -1; + + if (subscription) { + res = ast_taskprocessor_alert_set_levels(subscription->mailbox, + low_water, high_water); + } + return res; +} + void stasis_subscription_join(struct stasis_subscription *subscription) { if (subscription) { diff --git a/main/stasis_message_router.c b/main/stasis_message_router.c index f60180d68..85034bcf9 100644 --- a/main/stasis_message_router.c +++ b/main/stasis_message_router.c @@ -289,6 +289,18 @@ void stasis_message_router_publish_sync(struct stasis_message_router *router, ao2_cleanup(router); } +int stasis_message_router_set_congestion_limits(struct stasis_message_router *router, + long low_water, long high_water) +{ + int res = -1; + + if (router) { + res = stasis_subscription_set_congestion_limits(router->subscription, + low_water, high_water); + } + return res; +} + int stasis_message_router_add(struct stasis_message_router *router, struct stasis_message_type *message_type, stasis_subscription_cb callback, void *data) diff --git a/main/taskprocessor.c b/main/taskprocessor.c index 5b8ff08f1..2f0124045 100644 --- a/main/taskprocessor.c +++ b/main/taskprocessor.c @@ -76,6 +76,10 @@ struct ast_taskprocessor { void *local_data; /*! \brief Taskprocessor current queue size */ long tps_queue_size; + /*! \brief Taskprocessor low water clear alert level */ + long tps_queue_low; + /*! \brief Taskprocessor high water alert trigger level */ + long tps_queue_high; /*! \brief Taskprocessor queue */ AST_LIST_HEAD_NOLOCK(tps_queue, tps_task) tps_queue; struct ast_taskprocessor_listener *listener; @@ -85,6 +89,8 @@ struct ast_taskprocessor { unsigned int executing:1; /*! Indicates that a high water warning has been issued on this task processor */ unsigned int high_water_warned:1; + /*! Indicates that a high water alert is active on this taskprocessor */ + unsigned int high_water_alert:1; }; /*! @@ -121,15 +127,9 @@ static int tps_hash_cb(const void *obj, const int flags); /*! \brief The astobj2 compare callback for taskprocessors */ static int tps_cmp_cb(void *obj, void *arg, int flags); -/*! \brief Destroy the taskprocessor when its refcount reaches zero */ -static void tps_taskprocessor_destroy(void *tps); - /*! \brief CLI <example>taskprocessor ping <blah></example> handler function */ static int tps_ping_handler(void *datap); -/*! \brief Remove the front task off the taskprocessor queue */ -static struct tps_task *tps_taskprocessor_pop(struct ast_taskprocessor *tps); - static char *cli_tps_ping(struct ast_cli_entry *e, int cmd, struct ast_cli_args *a); static char *cli_tps_report(struct ast_cli_entry *e, int cmd, struct ast_cli_args *a); @@ -472,8 +472,8 @@ static char *cli_tps_report(struct ast_cli_entry *e, int cmd, struct ast_cli_arg struct ao2_container *sorted_tps; struct ast_taskprocessor *tps; struct ao2_iterator iter; -#define FMT_HEADERS "%-45s %10s %10s %10s\n" -#define FMT_FIELDS "%-45s %10lu %10lu %10lu\n" +#define FMT_HEADERS "%-45s %10s %10s %10s %10s %10s\n" +#define FMT_FIELDS "%-45s %10lu %10lu %10lu %10lu %10lu\n" switch (cmd) { case CLI_INIT: @@ -498,7 +498,7 @@ static char *cli_tps_report(struct ast_cli_entry *e, int cmd, struct ast_cli_arg return CLI_FAILURE; } - ast_cli(a->fd, "\n" FMT_HEADERS, "Processor", "Processed", "In Queue", "Max Depth"); + ast_cli(a->fd, "\n" FMT_HEADERS, "Processor", "Processed", "In Queue", "Max Depth", "Low water", "High water"); tcount = 0; iter = ao2_iterator_init(sorted_tps, AO2_ITERATOR_UNLINK); while ((tps = ao2_iterator_next(&iter))) { @@ -511,7 +511,8 @@ static char *cli_tps_report(struct ast_cli_entry *e, int cmd, struct ast_cli_arg maxqsize = 0; processed = 0; } - ast_cli(a->fd, FMT_FIELDS, name, processed, qsize, maxqsize); + ast_cli(a->fd, FMT_FIELDS, name, processed, qsize, maxqsize, + tps->tps_queue_low, tps->tps_queue_high); ast_taskprocessor_unreference(tps); ++tcount; } @@ -539,28 +540,106 @@ static int tps_cmp_cb(void *obj, void *arg, int flags) return !strcasecmp(lhs->name, rhsname) ? CMP_MATCH | CMP_STOP : 0; } +/*! Count of the number of taskprocessors in high water alert. */ +static unsigned int tps_alert_count; + +/*! Access protection for tps_alert_count */ +AST_RWLOCK_DEFINE_STATIC(tps_alert_lock); + +/*! + * \internal + * \brief Add a delta to tps_alert_count with protection. + * \since 13.10.0 + * + * \param tps Taskprocessor updating queue water mark alert trigger. + * \param delta The amount to add to tps_alert_count. + * + * \return Nothing + */ +static void tps_alert_add(struct ast_taskprocessor *tps, int delta) +{ + unsigned int old; + + ast_rwlock_wrlock(&tps_alert_lock); + old = tps_alert_count; + tps_alert_count += delta; + if (DEBUG_ATLEAST(3) + /* and tps_alert_count becomes zero or non-zero */ + && !old != !tps_alert_count) { + ast_log(LOG_DEBUG, "Taskprocessor '%s' %s the high water alert.\n", + tps->name, tps_alert_count ? "triggered" : "cleared"); + } + ast_rwlock_unlock(&tps_alert_lock); +} + +unsigned int ast_taskprocessor_alert_get(void) +{ + unsigned int count; + + ast_rwlock_rdlock(&tps_alert_lock); + count = tps_alert_count; + ast_rwlock_unlock(&tps_alert_lock); + + return count; +} + +int ast_taskprocessor_alert_set_levels(struct ast_taskprocessor *tps, long low_water, long high_water) +{ + if (!tps || high_water < 0 || high_water < low_water) { + return -1; + } + + if (low_water < 0) { + /* Set low water level to 90% of high water level */ + low_water = (high_water * 9) / 10; + } + + ao2_lock(tps); + + tps->tps_queue_low = low_water; + tps->tps_queue_high = high_water; + + if (tps->high_water_alert) { + if (!tps->tps_queue_size || tps->tps_queue_size < low_water) { + /* Update water mark alert immediately */ + tps->high_water_alert = 0; + tps_alert_add(tps, -1); + } + } else { + if (high_water <= tps->tps_queue_size) { + /* Update water mark alert immediately */ + tps->high_water_alert = 1; + tps_alert_add(tps, +1); + } + } + + ao2_unlock(tps); + + return 0; +} + /* destroy the taskprocessor */ -static void tps_taskprocessor_destroy(void *tps) +static void tps_taskprocessor_dtor(void *tps) { struct ast_taskprocessor *t = tps; struct tps_task *task; - if (!tps) { - ast_log(LOG_ERROR, "missing taskprocessor\n"); - return; + while ((task = AST_LIST_REMOVE_HEAD(&t->tps_queue, list))) { + tps_task_free(task); } - ast_debug(1, "destroying taskprocessor '%s'\n", t->name); - /* free it */ + t->tps_queue_size = 0; + + if (t->high_water_alert) { + t->high_water_alert = 0; + tps_alert_add(t, -1); + } + ast_free(t->stats); t->stats = NULL; ast_free((char *) t->name); - if (t->listener) { - ao2_ref(t->listener, -1); - t->listener = NULL; - } - while ((task = AST_LIST_REMOVE_HEAD(&t->tps_queue, list))) { - tps_task_free(task); - } + t->name = NULL; + ao2_cleanup(t->listener); + t->listener = NULL; } /* pop the front task and return it */ @@ -569,7 +648,11 @@ static struct tps_task *tps_taskprocessor_pop(struct ast_taskprocessor *tps) struct tps_task *task; if ((task = AST_LIST_REMOVE_HEAD(&tps->tps_queue, list))) { - tps->tps_queue_size--; + --tps->tps_queue_size; + if (tps->high_water_alert && tps->tps_queue_size <= tps->tps_queue_low) { + tps->high_water_alert = 0; + tps_alert_add(tps, -1); + } } return task; } @@ -648,19 +731,22 @@ static void *default_listener_pvt_alloc(void) static struct ast_taskprocessor *__allocate_taskprocessor(const char *name, struct ast_taskprocessor_listener *listener) { - RAII_VAR(struct ast_taskprocessor *, p, - ao2_alloc(sizeof(*p), tps_taskprocessor_destroy), ao2_cleanup); + struct ast_taskprocessor *p; + p = ao2_alloc(sizeof(*p), tps_taskprocessor_dtor); if (!p) { ast_log(LOG_WARNING, "failed to create taskprocessor '%s'\n", name); return NULL; } - if (!(p->stats = ast_calloc(1, sizeof(*p->stats)))) { - ast_log(LOG_WARNING, "failed to create taskprocessor stats for '%s'\n", name); - return NULL; - } - if (!(p->name = ast_strdup(name))) { + /* Set default congestion water level alert triggers. */ + p->tps_queue_low = (AST_TASKPROCESSOR_HIGH_WATER_LEVEL * 9) / 10; + p->tps_queue_high = AST_TASKPROCESSOR_HIGH_WATER_LEVEL; + + p->stats = ast_calloc(1, sizeof(*p->stats)); + p->name = ast_strdup(name); + if (!p->stats || !p->name) { + ao2_ref(p, -1); return NULL; } @@ -675,22 +761,18 @@ static struct ast_taskprocessor *__allocate_taskprocessor(const char *name, stru if (!(ao2_link(tps_singletons, p))) { ast_log(LOG_ERROR, "Failed to add taskprocessor '%s' to container\n", p->name); listener->tps = NULL; - ao2_ref(p, -1); + ao2_ref(p, -2); return NULL; } if (p->listener->callbacks->start(p->listener)) { - ast_log(LOG_ERROR, "Unable to start taskprocessor listener for taskprocessor %s\n", p->name); + ast_log(LOG_ERROR, "Unable to start taskprocessor listener for taskprocessor %s\n", + p->name); ast_taskprocessor_unreference(p); return NULL; } - /* RAII_VAR will decrement the refcount at the end of the function. - * Since we want to pass back a reference to p, we bump the refcount - */ - ao2_ref(p, +1); return p; - } /* Provide a reference to a taskprocessor. Create the taskprocessor if necessary, but don't @@ -799,10 +881,16 @@ static int taskprocessor_push(struct ast_taskprocessor *tps, struct tps_task *t) AST_LIST_INSERT_TAIL(&tps->tps_queue, t, list); previous_size = tps->tps_queue_size++; - if (previous_size >= AST_TASKPROCESSOR_HIGH_WATER_LEVEL && !tps->high_water_warned) { - ast_log(LOG_WARNING, "The '%s' task processor queue reached %d scheduled tasks.\n", - tps->name, previous_size); - tps->high_water_warned = 1; + if (previous_size >= tps->tps_queue_high) { + if (!tps->high_water_warned) { + tps->high_water_warned = 1; + ast_log(LOG_WARNING, "The '%s' task processor queue reached %d scheduled tasks.\n", + tps->name, previous_size); + } + if (!tps->high_water_alert) { + tps->high_water_alert = 1; + tps_alert_add(tps, +1); + } } /* The currently executing task counts as still in queue */ diff --git a/main/tcptls.c b/main/tcptls.c index f56e0aa70..046501b77 100644 --- a/main/tcptls.c +++ b/main/tcptls.c @@ -38,7 +38,6 @@ ASTERISK_REGISTER_FILE() #endif #include <signal.h> -#include <sys/signal.h> #include "asterisk/compat.h" #include "asterisk/tcptls.h" diff --git a/main/translate.c b/main/translate.c index 8d37e3724..b7443b81b 100644 --- a/main/translate.c +++ b/main/translate.c @@ -358,6 +358,7 @@ static struct ast_trans_pvt *newpvt(struct ast_translator *t, struct ast_format pvt->f.offset = AST_FRIENDLY_OFFSET; pvt->f.src = pvt->t->name; pvt->f.data.ptr = pvt->outbuf.c; + pvt->f.seqno = 0x10000; /* * If the translator has not provided a format @@ -524,13 +525,46 @@ struct ast_trans_pvt *ast_translator_build_path(struct ast_format *dst, struct a /*! \brief do the actual translation */ struct ast_frame *ast_translate(struct ast_trans_pvt *path, struct ast_frame *f, int consume) { - struct ast_trans_pvt *p = path; - struct ast_frame *out; + const unsigned int rtp_seqno_max_value = 0xffff; + struct ast_frame *out_last, *out = NULL; + struct ast_trans_pvt *step; struct timeval delivery; int has_timing_info; long ts; long len; - int seqno; + int seqno, frames_missing; + + /* Determine the amount of lost packets for PLC */ + /* But not at start with first frame = path->f.seqno is still 0x10000 */ + /* But not when there is no sequence number = frame created internally */ + if ((path->f.seqno <= rtp_seqno_max_value) && (path->f.seqno != f->seqno)) { + if (f->seqno < path->f.seqno) { /* seqno overrun situation */ + frames_missing = rtp_seqno_max_value + f->seqno - path->f.seqno - 1; + } else { + frames_missing = f->seqno - path->f.seqno - 1; + } + /* Out-of-order packet - more precise: late packet */ + if ((rtp_seqno_max_value + 1) / 2 < frames_missing) { + if (consume) { + ast_frfree(f); + } + /* + * Do not pass late packets to any transcoding module, because that + * confuses the state of any library (packets inter-depend). With + * the next packet, this one is going to be treated as lost packet. + */ + return NULL; + } + + if (frames_missing > 96) { + struct ast_str *str = ast_str_alloca(256); + + /* not DEBUG but NOTICE because of WARNING in main/cannel.c:__ast_queue_frame */ + ast_log(LOG_NOTICE, "%d lost frame(s) %d/%d %s\n", frames_missing, f->seqno, path->f.seqno, ast_translate_path_to_str(path, &str)); + } + } else { + frames_missing = 0; + } has_timing_info = ast_test_flag(f, AST_FRFLAG_HAS_TIMING_INFO); ts = f->ts; @@ -560,18 +594,93 @@ struct ast_frame *ast_translate(struct ast_trans_pvt *path, struct ast_frame *f, f->samples, ast_format_get_sample_rate(f->subclass.format))); } delivery = f->delivery; - for (out = f; out && p ; p = p->next) { - struct ast_frame *current = out; - - do { - framein(p, current); - current = AST_LIST_NEXT(current, frame_list); - } while (current); - if (out != f) { - ast_frfree(out); + + for (out_last = NULL; frames_missing + 1; frames_missing--) { + struct ast_frame *frame_to_translate, *inner_head; + struct ast_frame missed = { + .frametype = AST_FRAME_VOICE, + .subclass.format = f->subclass.format, + .datalen = 0, + /* In RTP, the amount of samples might change anytime */ + /* If that happened while frames got lost, what to do? */ + .samples = f->samples, /* FIXME */ + .src = __FUNCTION__, + .data.uint32 = 0, + .delivery.tv_sec = 0, + .delivery.tv_usec = 0, + .flags = 0, + /* RTP sequence number is between 0x0001 and 0xffff */ + .seqno = (rtp_seqno_max_value + 1 + f->seqno - frames_missing) & rtp_seqno_max_value, + }; + + if (frames_missing) { + frame_to_translate = &missed; + } else { + frame_to_translate = f; + } + + /* The translation path from one format to another might contain several steps */ + /* out* collects the result for missed frame(s) and input frame(s) */ + /* out is the result of the conversion of all frames, translated into the destination format */ + /* out_last is the last frame in that list, to add frames faster */ + for (step = path, inner_head = frame_to_translate; inner_head && step; step = step->next) { + struct ast_frame *current, *inner_last, *inner_prev = frame_to_translate; + + /* inner* collects the result of each conversion step, the input for the next step */ + /* inner_head is a list of frames created by each conversion step */ + /* inner_last is the last frame in that list, to add frames faster */ + for (inner_last = NULL, current = inner_head; current; current = AST_LIST_NEXT(current, frame_list)) { + struct ast_frame *tmp; + + framein(step, current); + tmp = step->t->frameout(step); + + if (!tmp) { + continue; + } else if (inner_last) { + struct ast_frame *t; + + /* Determine the last frame of the list before appending to it */ + while ((t = AST_LIST_NEXT(inner_last, frame_list))) { + inner_last = t; + } + AST_LIST_NEXT(inner_last, frame_list) = tmp; + } else { + inner_prev = inner_head; + inner_head = tmp; + inner_last = tmp; + } + } + + /* The current step did not create any frames = no frames for the next step */ + /* The steps are not lost because framein buffered those for the next input frame */ + if (!inner_last) { + inner_prev = inner_head; + inner_head = NULL; + } + if (inner_prev != frame_to_translate) { + ast_frfree(inner_prev); /* Frees just the intermediate lists */ + } + } + + /* This frame created no frames after translation = continue with next frame */ + /* The frame is not lost because framein buffered it to be combined with the next frame */ + if (!inner_head) { + continue; + } else if (out_last) { + struct ast_frame *t; + + /* Determine the last frame of the list before appending to it */ + while ((t = AST_LIST_NEXT(out_last, frame_list))) { + out_last = t; + } + AST_LIST_NEXT(out_last, frame_list) = inner_head; + } else { + out = inner_head; + out_last = inner_head; } - out = p->t->frameout(p); } + if (out) { /* we have a frame, play with times */ if (!ast_tvzero(delivery)) { diff --git a/pbx/pbx_dundi.c b/pbx/pbx_dundi.c index 10495acb6..94b71a002 100644 --- a/pbx/pbx_dundi.c +++ b/pbx/pbx_dundi.c @@ -43,7 +43,7 @@ ASTERISK_REGISTER_FILE() #include "asterisk/network.h" #include <sys/ioctl.h> #include <zlib.h> -#include <sys/signal.h> +#include <signal.h> #include <pthread.h> #include <net/if.h> diff --git a/res/ael/ael.flex b/res/ael/ael.flex index b1b2bd76d..4e87f3a40 100644 --- a/res/ael/ael.flex +++ b/res/ael/ael.flex @@ -80,6 +80,12 @@ ASTERISK_REGISTER_FILE() #if !defined(GLOB_ABORTED) #define GLOB_ABORTED GLOB_ABEND #endif +#if !defined(GLOB_BRACE) +#define GLOB_BRACE 0 +#endif +#if !defined(GLOB_NOMAGIC) +#define GLOB_NOMAGIC 0 +#endif #include "asterisk/logger.h" #include "asterisk/utils.h" diff --git a/res/ael/ael_lex.c b/res/ael/ael_lex.c index a7a20aa60..9fbd66429 100644 --- a/res/ael/ael_lex.c +++ b/res/ael/ael_lex.c @@ -839,6 +839,12 @@ ASTERISK_REGISTER_FILE() #if !defined(GLOB_ABORTED) #define GLOB_ABORTED GLOB_ABEND #endif +#if !defined(GLOB_BRACE) +#define GLOB_BRACE 0 +#endif +#if !defined(GLOB_NOMAGIC) +#define GLOB_NOMAGIC 0 +#endif #include "asterisk/logger.h" #include "asterisk/utils.h" diff --git a/res/res_hep.c b/res/res_hep.c index 45201359d..e79f2b67a 100644 --- a/res/res_hep.c +++ b/res/res_hep.c @@ -409,9 +409,21 @@ enum hep_uuid_type hepv3_get_uuid_type(void) { RAII_VAR(struct module_config *, config, ao2_global_obj_ref(global_config), ao2_cleanup); + if (!config) { + /* Well, that's unfortunate. Return something. */ + return HEP_UUID_TYPE_CALL_ID; + } + return config->general->uuid_type; } +int hepv3_is_loaded(void) +{ + RAII_VAR(struct module_config *, config, ao2_global_obj_ref(global_config), ao2_cleanup); + + return (config != NULL) ? 1 : 0; +} + struct hepv3_capture_info *hepv3_create_capture_info(const void *payload, size_t len) { struct hepv3_capture_info *info; diff --git a/res/res_hep.exports.in b/res/res_hep.exports.in index df0f2b4f7..e318ac97f 100644 --- a/res/res_hep.exports.in +++ b/res/res_hep.exports.in @@ -3,6 +3,7 @@ LINKER_SYMBOL_PREFIX*hepv3_send_packet; LINKER_SYMBOL_PREFIX*hepv3_create_capture_info; LINKER_SYMBOL_PREFIX*hepv3_get_uuid_type; + LINKER_SYMBOL_PREFIX*hepv3_is_loaded; local: *; }; diff --git a/res/res_hep_pjsip.c b/res/res_hep_pjsip.c index 0cc54c237..a3a93e9b2 100644 --- a/res/res_hep_pjsip.c +++ b/res/res_hep_pjsip.c @@ -210,6 +210,11 @@ static int load_module(void) { CHECK_PJSIP_MODULE_LOADED(); + if (!ast_module_check("res_hep.so") || !hepv3_is_loaded()) { + ast_log(AST_LOG_WARNING, "res_hep is not loaded or running; declining module load\n"); + return AST_MODULE_LOAD_DECLINE; + } + ast_sip_register_service(&logging_module); return AST_MODULE_LOAD_SUCCESS; } diff --git a/res/res_hep_rtcp.c b/res/res_hep_rtcp.c index 8643d4db6..03db18159 100644 --- a/res/res_hep_rtcp.c +++ b/res/res_hep_rtcp.c @@ -149,6 +149,10 @@ static void rtp_topic_handler(void *data, struct stasis_subscription *sub, struc static int load_module(void) { + if (!ast_module_check("res_hep.so") || !hepv3_is_loaded()) { + ast_log(AST_LOG_WARNING, "res_hep is not loaded or running; declining module load\n"); + return AST_MODULE_LOAD_DECLINE; + } stasis_rtp_subscription = stasis_subscribe(ast_rtp_topic(), rtp_topic_handler, NULL); diff --git a/res/res_musiconhold.c b/res/res_musiconhold.c index f124d58f2..3c7199ef4 100644 --- a/res/res_musiconhold.c +++ b/res/res_musiconhold.c @@ -44,7 +44,7 @@ ASTERISK_REGISTER_FILE() #include <ctype.h> #include <signal.h> #include <sys/time.h> -#include <sys/signal.h> +#include <signal.h> #include <netinet/in.h> #include <sys/stat.h> #include <dirent.h> diff --git a/res/res_pjsip/location.c b/res/res_pjsip/location.c index 43e6ea40f..8f8c030c3 100644 --- a/res/res_pjsip/location.c +++ b/res/res_pjsip/location.c @@ -25,6 +25,7 @@ #include "asterisk/astobj2.h" #include "asterisk/paths.h" #include "asterisk/sorcery.h" +#include "asterisk/taskprocessor.h" #include "include/res_pjsip_private.h" #include "asterisk/res_pjsip_cli.h" #include "asterisk/statsd.h" @@ -1114,6 +1115,8 @@ int ast_sip_initialize_sorcery_location(void) ast_pjproject_get_buildopt("PJSIP_MAX_URL_SIZE", "%d", &pjsip_max_url_size); ast_sorcery_apply_default(sorcery, "contact", "astdb", "registrar"); + ast_sorcery_object_set_congestion_levels(sorcery, "contact", -1, + 3 * AST_TASKPROCESSOR_HIGH_WATER_LEVEL); ast_sorcery_apply_default(sorcery, "aor", "config", "pjsip.conf,criteria=type=aor"); if (ast_sorcery_object_register(sorcery, "contact", contact_alloc, NULL, contact_apply_handler) || diff --git a/res/res_pjsip/pjsip_distributor.c b/res/res_pjsip/pjsip_distributor.c index 3867eaea0..715ecb263 100644 --- a/res/res_pjsip/pjsip_distributor.c +++ b/res/res_pjsip/pjsip_distributor.c @@ -59,6 +59,12 @@ struct unidentified_request{ char src_name[]; }; +/*! Number of serializers in pool if one not otherwise known. (Best if prime number) */ +#define DISTRIBUTOR_POOL_SIZE 31 + +/*! Pool of serializers to use if not supplied. */ +static struct ast_taskprocessor *distributor_pool[DISTRIBUTOR_POOL_SIZE]; + /*! * \internal * \brief Record the task's serializer name on the tdata structure. @@ -278,6 +284,83 @@ static pjsip_dialog *find_dialog(pjsip_rx_data *rdata) return dlg; } +/*! + * \internal + * \brief Compute a hash value on a pjlib string + * \since 13.10.0 + * + * \param[in] str The pjlib string to add to the hash + * \param[in] hash The hash value to add to + * + * \details + * This version of the function is for when you need to compute a + * string hash of more than one string. + * + * This famous hash algorithm was written by Dan Bernstein and is + * commonly used. + * + * \sa http://www.cse.yorku.ca/~oz/hash.html + */ +static int pjstr_hash_add(pj_str_t *str, int hash) +{ + size_t len; + const char *pos; + + len = pj_strlen(str); + pos = pj_strbuf(str); + while (len--) { + hash = hash * 33 ^ *pos++; + } + + return hash; +} + +/*! + * \internal + * \brief Compute a hash value on a pjlib string + * \since 13.10.0 + * + * \param[in] str The pjlib string to hash + * + * This famous hash algorithm was written by Dan Bernstein and is + * commonly used. + * + * http://www.cse.yorku.ca/~oz/hash.html + */ +static int pjstr_hash(pj_str_t *str) +{ + return pjstr_hash_add(str, 5381); +} + +struct ast_taskprocessor *ast_sip_get_distributor_serializer(pjsip_rx_data *rdata) +{ + int hash; + pj_str_t *remote_tag; + struct ast_taskprocessor *serializer; + + if (!rdata->msg_info.msg) { + return NULL; + } + + if (rdata->msg_info.msg->type == PJSIP_REQUEST_MSG) { + remote_tag = &rdata->msg_info.from->tag; + } else { + remote_tag = &rdata->msg_info.to->tag; + } + + /* Compute the hash from the SIP message call-id and remote-tag */ + hash = pjstr_hash(&rdata->msg_info.cid->id); + hash = pjstr_hash_add(remote_tag, hash); + hash = abs(hash); + + serializer = ao2_bump(distributor_pool[hash % ARRAY_LEN(distributor_pool)]); + if (serializer) { + ast_debug(3, "Calculated serializer %s to use for %s\n", + ast_taskprocessor_name(serializer), pjsip_rx_data_get_info(rdata)); + } + return serializer; +} + static pj_bool_t endpoint_lookup(pjsip_rx_data *rdata); static pjsip_module endpoint_mod = { @@ -286,15 +369,22 @@ static pjsip_module endpoint_mod = { .on_rx_request = endpoint_lookup, }; -#define SIP_MAX_QUEUE (AST_TASKPROCESSOR_HIGH_WATER_LEVEL * 3) - static pj_bool_t distributor(pjsip_rx_data *rdata) { - pjsip_dialog *dlg = find_dialog(rdata); + pjsip_dialog *dlg; struct distributor_dialog_data *dist = NULL; struct ast_taskprocessor *serializer = NULL; pjsip_rx_data *clone; + if (!ast_test_flag(&ast_options, AST_OPT_FLAG_FULLY_BOOTED)) { + /* + * Ignore everything until we are fully booted. Let the + * peer retransmit messages until we are ready. + */ + return PJ_TRUE; + } + + dlg = find_dialog(rdata); if (dlg) { ast_debug(3, "Searching for serializer on dialog %s for %s\n", dlg->obj_name, pjsip_rx_data_get_info(rdata)); @@ -315,12 +405,45 @@ static pj_bool_t distributor(pjsip_rx_data *rdata) ast_debug(3, "No dialog serializer for response %s. Using request transaction as basis\n", pjsip_rx_data_get_info(rdata)); serializer = find_request_serializer(rdata); + if (!serializer) { + if (ast_taskprocessor_alert_get()) { + /* We're overloaded, ignore the unmatched response. */ + ast_debug(3, "Taskprocessor overload alert: Ignoring unmatched '%s'.\n", + pjsip_rx_data_get_info(rdata)); + return PJ_TRUE; + } + + /* + * Pick a serializer for the unmatched response. Maybe + * the stack can figure out what it is for, or we really + * should just toss it regardless. + */ + serializer = ast_sip_get_distributor_serializer(rdata); + } } else if (!pjsip_method_cmp(&rdata->msg_info.msg->line.req.method, &pjsip_cancel_method) || !pjsip_method_cmp(&rdata->msg_info.msg->line.req.method, &pjsip_bye_method)) { /* We have a BYE or CANCEL request without a serializer. */ pjsip_endpt_respond_stateless(ast_sip_get_pjsip_endpoint(), rdata, PJSIP_SC_CALL_TSX_DOES_NOT_EXIST, NULL, NULL, NULL); return PJ_TRUE; + } else { + if (ast_taskprocessor_alert_get()) { + /* + * When taskprocessors get backed up, there is a good chance that + * we are being overloaded and need to defer adding new work to + * the system. To defer the work we will ignore the request and + * rely on the peer's transport layer to retransmit the message. + * We usually work off the overload within a few seconds. The + * alternative is to send back a 503 response to these requests + * and be done with it. + */ + ast_debug(3, "Taskprocessor overload alert: Ignoring '%s'.\n", + pjsip_rx_data_get_info(rdata)); + return PJ_TRUE; + } + + /* Pick a serializer for the out-of-dialog request. */ + serializer = ast_sip_get_distributor_serializer(rdata); } pjsip_rx_data_clone(rdata, 0, &clone); @@ -329,18 +452,9 @@ static pj_bool_t distributor(pjsip_rx_data *rdata) clone->endpt_info.mod_data[endpoint_mod.id] = ao2_bump(dist->endpoint); } - if (ast_sip_threadpool_queue_size() > SIP_MAX_QUEUE) { - /* When the threadpool is backed up this much, there is a good chance that we have encountered - * some sort of terrible condition and don't need to be adding more work to the threadpool. - * It's in our best interest to send back a 503 response and be done with it. - */ - if (rdata->msg_info.msg->type == PJSIP_REQUEST_MSG) { - pjsip_endpt_respond_stateless(ast_sip_get_pjsip_endpoint(), rdata, 503, NULL, NULL, NULL); - } + if (ast_sip_push_task(serializer, distribute, clone)) { ao2_cleanup(clone->endpt_info.mod_data[endpoint_mod.id]); pjsip_rx_data_free_cloned(clone); - } else { - ast_sip_push_task(serializer, distribute, clone); } ast_taskprocessor_unreference(serializer); @@ -787,6 +901,7 @@ static int cli_unid_print_header(void *obj, void *arg, int flags) return 0; } + static int cli_unid_print_body(void *obj, void *arg, int flags) { struct unidentified_request *unid = obj; @@ -877,6 +992,47 @@ static struct ast_sorcery_observer global_observer = { .loaded = global_loaded, }; +/*! + * \internal + * \brief Shutdown the serializers in the distributor pool. + * \since 13.10.0 + * + * \return Nothing + */ +static void distributor_pool_shutdown(void) +{ + int idx; + + for (idx = 0; idx < ARRAY_LEN(distributor_pool); ++idx) { + ast_taskprocessor_unreference(distributor_pool[idx]); + distributor_pool[idx] = NULL; + } +} + +/*! + * \internal + * \brief Setup the serializers in the distributor pool. + * \since 13.10.0 + * + * \retval 0 on success. + * \retval -1 on error. + */ +static int distributor_pool_setup(void) +{ + char tps_name[AST_TASKPROCESSOR_MAX_NAME + 1]; + int idx; + + for (idx = 0; idx < ARRAY_LEN(distributor_pool); ++idx) { + /* Create name with seq number appended. */ + ast_taskprocessor_build_name(tps_name, sizeof(tps_name), "pjsip/distributor"); + + distributor_pool[idx] = ast_sip_create_serializer(tps_name); + if (!distributor_pool[idx]) { + return -1; + } + } + return 0; +} int ast_sip_initialize_distributor(void) { @@ -886,6 +1042,11 @@ int ast_sip_initialize_distributor(void) return -1; } + if (distributor_pool_setup()) { + ast_sip_destroy_distributor(); + return -1; + } + prune_context = ast_sched_context_create(); if (!prune_context) { ast_sip_destroy_distributor(); @@ -918,8 +1079,10 @@ int ast_sip_initialize_distributor(void) return -1; } - unid_formatter = ao2_alloc(sizeof(struct ast_sip_cli_formatter_entry), NULL); + unid_formatter = ao2_alloc_options(sizeof(struct ast_sip_cli_formatter_entry), NULL, + AO2_ALLOC_OPT_LOCK_NOLOCK); if (!unid_formatter) { + ast_sip_destroy_distributor(); ast_log(LOG_ERROR, "Unable to allocate memory for unid_formatter\n"); return -1; } @@ -931,6 +1094,7 @@ int ast_sip_initialize_distributor(void) unid_formatter->get_id = cli_unid_get_id; unid_formatter->retrieve_by_id = cli_unid_retrieve_by_id; ast_sip_register_cli_formatter(unid_formatter); + ast_cli_register_multiple(cli_commands, ARRAY_LEN(cli_commands)); return 0; @@ -941,17 +1105,20 @@ void ast_sip_destroy_distributor(void) ast_cli_unregister_multiple(cli_commands, ARRAY_LEN(cli_commands)); ast_sip_unregister_cli_formatter(unid_formatter); - internal_sip_unregister_service(&distributor_mod); - internal_sip_unregister_service(&endpoint_mod); internal_sip_unregister_service(&auth_mod); + internal_sip_unregister_service(&endpoint_mod); + internal_sip_unregister_service(&distributor_mod); ao2_cleanup(artificial_auth); ao2_cleanup(artificial_endpoint); - ao2_cleanup(unidentified_requests); ast_sorcery_observer_remove(ast_sip_get_sorcery(), "global", &global_observer); if (prune_context) { ast_sched_context_destroy(prune_context); } + + distributor_pool_shutdown(); + + ao2_cleanup(unidentified_requests); } diff --git a/res/res_pjsip/pjsip_options.c b/res/res_pjsip/pjsip_options.c index 1114336bd..381de3754 100644 --- a/res/res_pjsip/pjsip_options.c +++ b/res/res_pjsip/pjsip_options.c @@ -31,6 +31,7 @@ #include "asterisk/test.h" #include "asterisk/statsd.h" #include "include/res_pjsip_private.h" +#include "asterisk/taskprocessor.h" #define DEFAULT_LANGUAGE "en" #define DEFAULT_ENCODING "text/plain" @@ -1004,6 +1005,8 @@ int ast_sip_initialize_sorcery_qualify(void) /* initialize sorcery ast_sip_contact_status resource */ ast_sorcery_apply_default(sorcery, CONTACT_STATUS, "memory", NULL); + ast_sorcery_object_set_congestion_levels(sorcery, CONTACT_STATUS, -1, + 3 * AST_TASKPROCESSOR_HIGH_WATER_LEVEL); if (ast_sorcery_internal_object_register(sorcery, CONTACT_STATUS, contact_status_alloc, NULL, NULL)) { diff --git a/res/res_pjsip_pubsub.c b/res/res_pjsip_pubsub.c index 10ffb19fc..06a1b52b1 100644 --- a/res/res_pjsip_pubsub.c +++ b/res/res_pjsip_pubsub.c @@ -355,7 +355,7 @@ struct ast_sip_publication { struct subscription_persistence { /*! Sorcery object details */ SORCERY_OBJECT(details); - /*! The name of the endpoint involved in the subscrption */ + /*! The name of the endpoint involved in the subscription */ char *endpoint; /*! SIP message that creates the subscription */ char packet[PJSIP_MAX_PKT_LEN]; @@ -1207,10 +1207,9 @@ static void subscription_setup_dialog(struct sip_subscription_tree *sub_tree, pj pjsip_dlg_inc_session(dlg, &pubsub_module); } -static struct sip_subscription_tree *allocate_subscription_tree(struct ast_sip_endpoint *endpoint) +static struct sip_subscription_tree *allocate_subscription_tree(struct ast_sip_endpoint *endpoint, pjsip_rx_data *rdata) { struct sip_subscription_tree *sub_tree; - char tps_name[AST_TASKPROCESSOR_MAX_NAME + 1]; sub_tree = ao2_alloc(sizeof *sub_tree, subscription_tree_destructor); if (!sub_tree) { @@ -1219,11 +1218,24 @@ static struct sip_subscription_tree *allocate_subscription_tree(struct ast_sip_e ast_module_ref(ast_module_info->self); - /* Create name with seq number appended. */ - ast_taskprocessor_build_name(tps_name, sizeof(tps_name), "pjsip/pubsub/%s", - ast_sorcery_object_get_id(endpoint)); + if (rdata) { + /* + * We must continue using the serializer that the original + * SUBSCRIBE came in on for the dialog. There may be + * retransmissions already enqueued in the original + * serializer that can result in reentrancy and message + * sequencing problems. + */ + sub_tree->serializer = ast_sip_get_distributor_serializer(rdata); + } else { + char tps_name[AST_TASKPROCESSOR_MAX_NAME + 1]; - sub_tree->serializer = ast_sip_create_serializer(tps_name); + /* Create name with seq number appended. */ + ast_taskprocessor_build_name(tps_name, sizeof(tps_name), "pjsip/pubsub/%s", + ast_sorcery_object_get_id(endpoint)); + + sub_tree->serializer = ast_sip_create_serializer(tps_name); + } if (!sub_tree->serializer) { ao2_ref(sub_tree, -1); return NULL; @@ -1264,7 +1276,7 @@ static struct sip_subscription_tree *create_subscription_tree(const struct ast_s pjsip_dialog *dlg; struct subscription_persistence *persistence; - sub_tree = allocate_subscription_tree(endpoint); + sub_tree = allocate_subscription_tree(endpoint, rdata); if (!sub_tree) { *dlg_status = PJ_ENOMEM; return NULL; @@ -1313,109 +1325,176 @@ static struct sip_subscription_tree *create_subscription_tree(const struct ast_s static int initial_notify_task(void *obj); static int send_notify(struct sip_subscription_tree *sub_tree, unsigned int force_full_state); -/*! \brief Callback function to perform the actual recreation of a subscription */ -static int subscription_persistence_recreate(void *obj, void *arg, int flags) +/*! Persistent subscription recreation continuation under distributor serializer data */ +struct persistence_recreate_data { + struct subscription_persistence *persistence; + pjsip_rx_data *rdata; +}; + +/*! + * \internal + * \brief subscription_persistence_recreate continuation under distributor serializer. + * \since 13.10.0 + * + * \retval 0 on success. + * \retval -1 on error. + */ +static int sub_persistence_recreate(void *obj) { - struct subscription_persistence *persistence = obj; - pj_pool_t *pool = arg; - pjsip_rx_data rdata = { { 0, }, }; - RAII_VAR(struct ast_sip_endpoint *, endpoint, NULL, ao2_cleanup); + struct persistence_recreate_data *recreate_data = obj; + struct subscription_persistence *persistence = recreate_data->persistence; + pjsip_rx_data *rdata = recreate_data->rdata; + struct ast_sip_endpoint *endpoint; struct sip_subscription_tree *sub_tree; struct ast_sip_pubsub_body_generator *generator; - int resp; + struct ast_sip_subscription_handler *handler; char *resource; - size_t resource_size; pjsip_sip_uri *request_uri; + size_t resource_size; + int resp; struct resource_tree tree; pjsip_expires_hdr *expires_header; - struct ast_sip_subscription_handler *handler; - /* If this subscription has already expired remove it */ - if (ast_tvdiff_ms(persistence->expires, ast_tvnow()) <= 0) { - ast_sorcery_delete(ast_sip_get_sorcery(), persistence); - return 0; - } + request_uri = pjsip_uri_get_uri(rdata->msg_info.msg->line.req.uri); + resource_size = pj_strlen(&request_uri->user) + 1; + resource = ast_alloca(resource_size); + ast_copy_pj_str(resource, &request_uri->user, resource_size); - endpoint = ast_sorcery_retrieve_by_id(ast_sip_get_sorcery(), "endpoint", persistence->endpoint); - if (!endpoint) { - ast_log(LOG_WARNING, "A subscription for '%s' could not be recreated as the endpoint was not found\n", + handler = subscription_get_handler_from_rdata(rdata); + if (!handler || !handler->notifier) { + ast_log(LOG_WARNING, "Failed recreating '%s' subscription: Could not get subscription handler.\n", persistence->endpoint); ast_sorcery_delete(ast_sip_get_sorcery(), persistence); return 0; } - pj_pool_reset(pool); - rdata.tp_info.pool = pool; - - if (ast_sip_create_rdata(&rdata, persistence->packet, persistence->src_name, persistence->src_port, - persistence->transport_key, persistence->local_name, persistence->local_port)) { - ast_log(LOG_WARNING, "A subscription for '%s' could not be recreated as the message could not be parsed\n", + generator = subscription_get_generator_from_rdata(rdata, handler); + if (!generator) { + ast_log(LOG_WARNING, "Failed recreating '%s' subscription: Body generator not available.\n", persistence->endpoint); ast_sorcery_delete(ast_sip_get_sorcery(), persistence); return 0; } - if (rdata.msg_info.msg->type != PJSIP_REQUEST_MSG) { - ast_log(LOG_NOTICE, "Endpoint %s persisted a SIP response instead of a subscribe request. Unable to reload subscription.\n", - ast_sorcery_object_get_id(endpoint)); + ast_sip_mod_data_set(rdata->tp_info.pool, rdata->endpt_info.mod_data, + pubsub_module.id, MOD_DATA_PERSISTENCE, persistence); + + /* Getting the endpoint may take some time that can affect the expiration. */ + endpoint = ast_sorcery_retrieve_by_id(ast_sip_get_sorcery(), "endpoint", + persistence->endpoint); + if (!endpoint) { + ast_log(LOG_WARNING, "Failed recreating '%s' subscription: The endpoint was not found\n", + persistence->endpoint); ast_sorcery_delete(ast_sip_get_sorcery(), persistence); + ao2_ref(endpoint, -1); return 0; } - request_uri = pjsip_uri_get_uri(rdata.msg_info.msg->line.req.uri); - resource_size = pj_strlen(&request_uri->user) + 1; - resource = ast_alloca(resource_size); - ast_copy_pj_str(resource, &request_uri->user, resource_size); - /* Update the expiration header with the new expiration */ - expires_header = pjsip_msg_find_hdr(rdata.msg_info.msg, PJSIP_H_EXPIRES, rdata.msg_info.msg->hdr.next); + expires_header = pjsip_msg_find_hdr(rdata->msg_info.msg, PJSIP_H_EXPIRES, + rdata->msg_info.msg->hdr.next); if (!expires_header) { - expires_header = pjsip_expires_hdr_create(pool, 0); + expires_header = pjsip_expires_hdr_create(rdata->tp_info.pool, 0); if (!expires_header) { + ast_log(LOG_WARNING, "Failed recreating '%s' subscription: Could not update expires header.\n", + persistence->endpoint); ast_sorcery_delete(ast_sip_get_sorcery(), persistence); + ao2_ref(endpoint, -1); return 0; } - pjsip_msg_add_hdr(rdata.msg_info.msg, (pjsip_hdr*)expires_header); + pjsip_msg_add_hdr(rdata->msg_info.msg, (pjsip_hdr *) expires_header); } expires_header->ivalue = (ast_tvdiff_ms(persistence->expires, ast_tvnow()) / 1000); - - handler = subscription_get_handler_from_rdata(&rdata); - if (!handler || !handler->notifier) { - ast_sorcery_delete(ast_sip_get_sorcery(), persistence); - return 0; - } - - generator = subscription_get_generator_from_rdata(&rdata, handler); - if (!generator) { + if (expires_header->ivalue <= 0) { + /* The subscription expired since we started recreating the subscription. */ ast_sorcery_delete(ast_sip_get_sorcery(), persistence); + ao2_ref(endpoint, -1); return 0; } - ast_sip_mod_data_set(rdata.tp_info.pool, rdata.endpt_info.mod_data, - pubsub_module.id, MOD_DATA_PERSISTENCE, persistence); - memset(&tree, 0, sizeof(tree)); resp = build_resource_tree(endpoint, handler, resource, &tree, - ast_sip_pubsub_has_eventlist_support(&rdata)); + ast_sip_pubsub_has_eventlist_support(rdata)); if (PJSIP_IS_STATUS_IN_CLASS(resp, 200)) { pj_status_t dlg_status; - sub_tree = create_subscription_tree(handler, endpoint, &rdata, resource, generator, &tree, &dlg_status); + sub_tree = create_subscription_tree(handler, endpoint, rdata, resource, generator, + &tree, &dlg_status); if (!sub_tree) { - ast_sorcery_delete(ast_sip_get_sorcery(), persistence); - ast_log(LOG_WARNING, "Failed to re-create subscription for %s\n", persistence->endpoint); - return 0; - } - sub_tree->persistence = ao2_bump(persistence); - subscription_persistence_update(sub_tree, &rdata); - if (ast_sip_push_task(sub_tree->serializer, initial_notify_task, ao2_bump(sub_tree))) { - pjsip_evsub_terminate(sub_tree->evsub, PJ_TRUE); - ao2_ref(sub_tree, -1); + if (dlg_status != PJ_EEXISTS) { + ast_log(LOG_WARNING, "Failed recreating '%s' subscription: Could not create subscription tree.\n", + persistence->endpoint); + ast_sorcery_delete(ast_sip_get_sorcery(), persistence); + } + } else { + sub_tree->persistence = ao2_bump(persistence); + subscription_persistence_update(sub_tree, rdata); + if (ast_sip_push_task(sub_tree->serializer, initial_notify_task, + ao2_bump(sub_tree))) { + /* Could not send initial subscribe NOTIFY */ + pjsip_evsub_terminate(sub_tree->evsub, PJ_TRUE); + ao2_ref(sub_tree, -1); + } } } else { ast_sorcery_delete(ast_sip_get_sorcery(), persistence); } resource_tree_destroy(&tree); + ao2_ref(endpoint, -1); + + return 0; +} + +/*! \brief Callback function to perform the actual recreation of a subscription */ +static int subscription_persistence_recreate(void *obj, void *arg, int flags) +{ + struct subscription_persistence *persistence = obj; + pj_pool_t *pool = arg; + struct ast_taskprocessor *serializer; + pjsip_rx_data rdata; + struct persistence_recreate_data recreate_data; + + /* If this subscription has already expired remove it */ + if (ast_tvdiff_ms(persistence->expires, ast_tvnow()) <= 0) { + ast_sorcery_delete(ast_sip_get_sorcery(), persistence); + return 0; + } + + memset(&rdata, 0, sizeof(rdata)); + pj_pool_reset(pool); + rdata.tp_info.pool = pool; + + if (ast_sip_create_rdata(&rdata, persistence->packet, persistence->src_name, persistence->src_port, + persistence->transport_key, persistence->local_name, persistence->local_port)) { + ast_log(LOG_WARNING, "Failed recreating '%s' subscription: The message could not be parsed\n", + persistence->endpoint); + ast_sorcery_delete(ast_sip_get_sorcery(), persistence); + return 0; + } + + if (rdata.msg_info.msg->type != PJSIP_REQUEST_MSG) { + ast_log(LOG_NOTICE, "Failed recreating '%s' subscription: Stored a SIP response instead of a request.\n", + persistence->endpoint); + ast_sorcery_delete(ast_sip_get_sorcery(), persistence); + return 0; + } + + /* Continue the remainder in the distributor serializer */ + serializer = ast_sip_get_distributor_serializer(&rdata); + if (!serializer) { + ast_log(LOG_WARNING, "Failed recreating '%s' subscription: Could not get distributor serializer.\n", + persistence->endpoint); + ast_sorcery_delete(ast_sip_get_sorcery(), persistence); + return 0; + } + recreate_data.persistence = persistence; + recreate_data.rdata = &rdata; + if (ast_sip_push_task_synchronous(serializer, sub_persistence_recreate, &recreate_data)) { + ast_log(LOG_WARNING, "Failed recreating '%s' subscription: Could not continue under distributor serializer.\n", + persistence->endpoint); + ast_sorcery_delete(ast_sip_get_sorcery(), persistence); + } + ast_taskprocessor_unreference(serializer); return 0; } @@ -1571,7 +1650,7 @@ struct ast_sip_subscription *ast_sip_create_subscription(const struct ast_sip_su pjsip_evsub *evsub; struct sip_subscription_tree *sub_tree = NULL; - sub_tree = allocate_subscription_tree(endpoint); + sub_tree = allocate_subscription_tree(endpoint, NULL); if (!sub_tree) { return NULL; } diff --git a/res/res_pjsip_registrar.c b/res/res_pjsip_registrar.c index 0e14ab786..a39dac676 100644 --- a/res/res_pjsip_registrar.c +++ b/res/res_pjsip_registrar.c @@ -231,155 +231,11 @@ static void registrar_add_date_header(pjsip_tx_data *tdata) ast_sip_add_header(tdata, "Date", date); } -#define SERIALIZER_BUCKETS 59 - -static struct ao2_container *serializers; - -/*! \brief Serializer with associated aor key */ -struct serializer { - /* Serializer to distribute tasks to */ - struct ast_taskprocessor *serializer; - /* The name of the aor to associate with the serializer */ - char aor_name[0]; -}; - -static void serializer_destroy(void *obj) -{ - struct serializer *ser = obj; - - ast_taskprocessor_unreference(ser->serializer); -} - -static struct serializer *serializer_create(const char *aor_name) -{ - char tps_name[AST_TASKPROCESSOR_MAX_NAME + 1]; - size_t size = strlen(aor_name) + 1; - struct serializer *ser = ao2_alloc( - sizeof(*ser) + size, serializer_destroy); - - if (!ser) { - return NULL; - } - - /* Create name with seq number appended. */ - ast_taskprocessor_build_name(tps_name, sizeof(tps_name), "pjsip/aor/%s", - aor_name); - - if (!(ser->serializer = ast_sip_create_serializer(tps_name))) { - ao2_ref(ser, -1); - return NULL; - } - - strcpy(ser->aor_name, aor_name); - return ser; -} - -static struct serializer *serializer_find_or_create(const char *aor_name) -{ - struct serializer *ser = ao2_find(serializers, aor_name, OBJ_SEARCH_KEY); - - if (ser) { - return ser; - } - - if (!(ser = serializer_create(aor_name))) { - return NULL; - } - - ao2_link(serializers, ser); - return ser; -} - -static int serializer_hash(const void *obj, const int flags) -{ - const struct serializer *object; - const char *key; - - switch (flags & OBJ_SEARCH_MASK) { - case OBJ_SEARCH_KEY: - key = obj; - return ast_str_hash(key); - case OBJ_SEARCH_OBJECT: - object = obj; - return ast_str_hash(object->aor_name); - default: - /* Hash can only work on something with a full key. */ - ast_assert(0); - return 0; - } -} - -static int serializer_cmp(void *obj_left, void *obj_right, int flags) -{ - const struct serializer *object_left = obj_left; - const struct serializer *object_right = obj_right; - const char *right_key = obj_right; - int cmp; - - switch (flags & OBJ_SEARCH_MASK) { - case OBJ_SEARCH_OBJECT: - right_key = object_right->aor_name; - /* Fall through */ - case OBJ_SEARCH_KEY: - cmp = strcmp(object_left->aor_name, right_key); - break; - case OBJ_SEARCH_PARTIAL_KEY: - /* - * We could also use a partial key struct containing a length - * so strlen() does not get called for every comparison instead. - */ - cmp = strncmp(object_left->aor_name, right_key, strlen(right_key)); - break; - default: - cmp = 0; - break; - } - - return cmp ? 0 : CMP_MATCH; -} - -struct rx_task_data { - pjsip_rx_data *rdata; - struct ast_sip_endpoint *endpoint; - struct ast_sip_aor *aor; -}; - -static void rx_task_data_destroy(void *obj) -{ - struct rx_task_data *task_data = obj; - - pjsip_rx_data_free_cloned(task_data->rdata); - ao2_cleanup(task_data->endpoint); - ao2_cleanup(task_data->aor); -} - -static struct rx_task_data *rx_task_data_create(pjsip_rx_data *rdata, - struct ast_sip_endpoint *endpoint, - struct ast_sip_aor *aor) -{ - struct rx_task_data *task_data = ao2_alloc( - sizeof(*task_data), rx_task_data_destroy); - - if (!task_data) { - return NULL; - } - - pjsip_rx_data_clone(rdata, 0, &task_data->rdata); - - task_data->endpoint = endpoint; - ao2_ref(task_data->endpoint, +1); - - task_data->aor = aor; - ao2_ref(task_data->aor, +1); - - return task_data; -} - static const pj_str_t path_hdr_name = { "Path", 4 }; -static int build_path_data(struct rx_task_data *task_data, struct ast_str **path_str) +static int build_path_data(pjsip_rx_data *rdata, struct ast_str **path_str) { - pjsip_generic_string_hdr *path_hdr = pjsip_msg_find_hdr_by_name(task_data->rdata->msg_info.msg, &path_hdr_name, NULL); + pjsip_generic_string_hdr *path_hdr = pjsip_msg_find_hdr_by_name(rdata->msg_info.msg, &path_hdr_name, NULL); if (!path_hdr) { return 0; @@ -392,24 +248,24 @@ static int build_path_data(struct rx_task_data *task_data, struct ast_str **path ast_str_set(path_str, 0, "%.*s", (int)path_hdr->hvalue.slen, path_hdr->hvalue.ptr); - while ((path_hdr = (pjsip_generic_string_hdr *) pjsip_msg_find_hdr_by_name(task_data->rdata->msg_info.msg, &path_hdr_name, path_hdr->next))) { + while ((path_hdr = (pjsip_generic_string_hdr *) pjsip_msg_find_hdr_by_name(rdata->msg_info.msg, &path_hdr_name, path_hdr->next))) { ast_str_append(path_str, 0, ",%.*s", (int)path_hdr->hvalue.slen, path_hdr->hvalue.ptr); } return 0; } -static int registrar_validate_path(struct rx_task_data *task_data, struct ast_str **path_str) +static int registrar_validate_path(pjsip_rx_data *rdata, struct ast_sip_aor *aor, struct ast_str **path_str) { const pj_str_t path_supported_name = { "path", 4 }; pjsip_supported_hdr *supported_hdr; int i; - if (!task_data->aor->support_path) { + if (!aor->support_path) { return 0; } - if (build_path_data(task_data, path_str)) { + if (build_path_data(rdata, path_str)) { return -1; } @@ -417,7 +273,7 @@ static int registrar_validate_path(struct rx_task_data *task_data, struct ast_st return 0; } - supported_hdr = pjsip_msg_find_hdr(task_data->rdata->msg_info.msg, PJSIP_H_SUPPORTED, NULL); + supported_hdr = pjsip_msg_find_hdr(rdata->msg_info.msg, PJSIP_H_SUPPORTED, NULL); if (!supported_hdr) { return -1; } @@ -433,8 +289,11 @@ static int registrar_validate_path(struct rx_task_data *task_data, struct ast_st return -1; } -static int rx_task_core(struct rx_task_data *task_data, struct ao2_container *contacts, - const char *aor_name) +static int register_aor_core(pjsip_rx_data *rdata, + struct ast_sip_endpoint *endpoint, + struct ast_sip_aor *aor, + const char *aor_name, + struct ao2_container *contacts) { static const pj_str_t USER_AGENT = { "User-Agent", 10 }; @@ -458,38 +317,38 @@ static int rx_task_core(struct rx_task_data *task_data, struct ao2_container *co /* So we don't count static contacts against max_contacts we prune them out from the container */ ao2_callback(contacts, OBJ_NODATA | OBJ_UNLINK | OBJ_MULTIPLE, registrar_prune_static, NULL); - if (registrar_validate_contacts(task_data->rdata, contacts, task_data->aor, &added, &updated, &deleted)) { + if (registrar_validate_contacts(rdata, contacts, aor, &added, &updated, &deleted)) { /* The provided Contact headers do not conform to the specification */ - pjsip_endpt_respond_stateless(ast_sip_get_pjsip_endpoint(), task_data->rdata, 400, NULL, NULL, NULL); - ast_sip_report_failed_acl(task_data->endpoint, task_data->rdata, "registrar_invalid_contacts_provided"); + pjsip_endpt_respond_stateless(ast_sip_get_pjsip_endpoint(), rdata, 400, NULL, NULL, NULL); + ast_sip_report_failed_acl(endpoint, rdata, "registrar_invalid_contacts_provided"); ast_log(LOG_WARNING, "Failed to validate contacts in REGISTER request from '%s'\n", - ast_sorcery_object_get_id(task_data->endpoint)); + ast_sorcery_object_get_id(endpoint)); return PJ_TRUE; } - if (registrar_validate_path(task_data, &path_str)) { + if (registrar_validate_path(rdata, aor, &path_str)) { /* Ensure that intervening proxies did not make invalid modifications to the request */ - pjsip_endpt_respond_stateless(ast_sip_get_pjsip_endpoint(), task_data->rdata, 420, NULL, NULL, NULL); + pjsip_endpt_respond_stateless(ast_sip_get_pjsip_endpoint(), rdata, 420, NULL, NULL, NULL); ast_log(LOG_WARNING, "Invalid modifications made to REGISTER request from '%s' by intervening proxy\n", - ast_sorcery_object_get_id(task_data->endpoint)); + ast_sorcery_object_get_id(endpoint)); return PJ_TRUE; } - if ((MAX(added - deleted, 0) + (!task_data->aor->remove_existing ? ao2_container_count(contacts) : 0)) > task_data->aor->max_contacts) { + if ((MAX(added - deleted, 0) + (!aor->remove_existing ? ao2_container_count(contacts) : 0)) > aor->max_contacts) { /* Enforce the maximum number of contacts */ - pjsip_endpt_respond_stateless(ast_sip_get_pjsip_endpoint(), task_data->rdata, 403, NULL, NULL, NULL); - ast_sip_report_failed_acl(task_data->endpoint, task_data->rdata, "registrar_attempt_exceeds_maximum_configured_contacts"); + pjsip_endpt_respond_stateless(ast_sip_get_pjsip_endpoint(), rdata, 403, NULL, NULL, NULL); + ast_sip_report_failed_acl(endpoint, rdata, "registrar_attempt_exceeds_maximum_configured_contacts"); ast_log(LOG_WARNING, "Registration attempt from endpoint '%s' to AOR '%s' will exceed max contacts of %u\n", - ast_sorcery_object_get_id(task_data->endpoint), ast_sorcery_object_get_id(task_data->aor), task_data->aor->max_contacts); + ast_sorcery_object_get_id(endpoint), aor_name, aor->max_contacts); return PJ_TRUE; } if (!(details.pool = pjsip_endpt_create_pool(ast_sip_get_pjsip_endpoint(), "Contact Comparison", 256, 256))) { - pjsip_endpt_respond_stateless(ast_sip_get_pjsip_endpoint(), task_data->rdata, 500, NULL, NULL, NULL); + pjsip_endpt_respond_stateless(ast_sip_get_pjsip_endpoint(), rdata, 500, NULL, NULL, NULL); return PJ_TRUE; } - user_agent_hdr = pjsip_msg_find_hdr_by_name(task_data->rdata->msg_info.msg, &USER_AGENT, NULL); + user_agent_hdr = pjsip_msg_find_hdr_by_name(rdata->msg_info.msg, &USER_AGENT, NULL); if (user_agent_hdr) { alloc_size = pj_strlen(&user_agent_hdr->hvalue) + 1; user_agent = ast_alloca(alloc_size); @@ -497,10 +356,10 @@ static int rx_task_core(struct rx_task_data *task_data, struct ao2_container *co } /* Find the first Via header */ - via_hdr = via_hdr_last = (pjsip_via_hdr*) pjsip_msg_find_hdr(task_data->rdata->msg_info.msg, PJSIP_H_VIA, NULL); + via_hdr = via_hdr_last = (pjsip_via_hdr*) pjsip_msg_find_hdr(rdata->msg_info.msg, PJSIP_H_VIA, NULL); if (via_hdr) { /* Find the last Via header */ - while ( (via_hdr = (pjsip_via_hdr*) pjsip_msg_find_hdr(task_data->rdata->msg_info.msg, + while ( (via_hdr = (pjsip_via_hdr*) pjsip_msg_find_hdr(rdata->msg_info.msg, PJSIP_H_VIA, via_hdr->next)) != NULL) { via_hdr_last = via_hdr; } @@ -510,7 +369,7 @@ static int rx_task_core(struct rx_task_data *task_data, struct ao2_container *co via_port=via_hdr_last->sent_by.port; } - call_id_hdr = (pjsip_cid_hdr*) pjsip_msg_find_hdr(task_data->rdata->msg_info.msg, PJSIP_H_CALL_ID, NULL); + call_id_hdr = (pjsip_cid_hdr*) pjsip_msg_find_hdr(rdata->msg_info.msg, PJSIP_H_CALL_ID, NULL); if (call_id_hdr) { alloc_size = pj_strlen(&call_id_hdr->id) + 1; call_id = ast_alloca(alloc_size); @@ -518,7 +377,7 @@ static int rx_task_core(struct rx_task_data *task_data, struct ao2_container *co } /* Iterate each provided Contact header and add, update, or delete */ - while ((contact_hdr = pjsip_msg_find_hdr(task_data->rdata->msg_info.msg, PJSIP_H_CONTACT, contact_hdr ? contact_hdr->next : NULL))) { + while ((contact_hdr = pjsip_msg_find_hdr(rdata->msg_info.msg, PJSIP_H_CONTACT, contact_hdr ? contact_hdr->next : NULL))) { int expiration; char contact_uri[pjsip_max_url_size]; RAII_VAR(struct ast_sip_contact *, contact, NULL, ao2_cleanup); @@ -534,7 +393,7 @@ static int rx_task_core(struct rx_task_data *task_data, struct ao2_container *co continue; } - expiration = registrar_get_expiration(task_data->aor, contact_hdr, task_data->rdata); + expiration = registrar_get_expiration(aor, contact_hdr, rdata); details.uri = pjsip_uri_get_uri(contact_hdr->uri); pjsip_uri_print(PJSIP_URI_IN_CONTACT_HDR, details.uri, contact_uri, sizeof(contact_uri)); @@ -546,9 +405,9 @@ static int rx_task_core(struct rx_task_data *task_data, struct ao2_container *co continue; } - if (ast_sip_location_add_contact_nolock(task_data->aor, contact_uri, ast_tvadd(ast_tvnow(), + if (ast_sip_location_add_contact_nolock(aor, contact_uri, ast_tvadd(ast_tvnow(), ast_samp2tv(expiration, 1)), path_str ? ast_str_buffer(path_str) : NULL, - user_agent, via_addr, via_port, call_id, task_data->endpoint)) { + user_agent, via_addr, via_port, call_id, endpoint)) { ast_log(LOG_ERROR, "Unable to bind contact '%s' to AOR '%s'\n", contact_uri, aor_name); continue; @@ -576,8 +435,8 @@ static int rx_task_core(struct rx_task_data *task_data, struct ao2_container *co } contact_update->expiration_time = ast_tvadd(ast_tvnow(), ast_samp2tv(expiration, 1)); - contact_update->qualify_frequency = task_data->aor->qualify_frequency; - contact_update->authenticate_qualify = task_data->aor->authenticate_qualify; + contact_update->qualify_frequency = aor->qualify_frequency; + contact_update->authenticate_qualify = aor->authenticate_qualify; if (path_str) { ast_string_field_set(contact_update, path, ast_str_buffer(path_str)); } @@ -625,16 +484,16 @@ static int rx_task_core(struct rx_task_data *task_data, struct ao2_container *co /* If the AOR is configured to remove any existing contacts that have not been updated/added as a result of this REGISTER * do so */ - if (task_data->aor->remove_existing) { + if (aor->remove_existing) { ao2_callback(contacts, OBJ_NODATA | OBJ_MULTIPLE, registrar_delete_contact, NULL); } /* Re-retrieve contacts. Caller will clean up the original container. */ - contacts = ast_sip_location_retrieve_aor_contacts_nolock(task_data->aor); + contacts = ast_sip_location_retrieve_aor_contacts_nolock(aor); response_contact = ao2_callback(contacts, 0, NULL, NULL); /* Send a response containing all of the contacts (including static) that are present on this AOR */ - if (ast_sip_create_response(task_data->rdata, 200, response_contact, &tdata) != PJ_SUCCESS) { + if (ast_sip_create_response(rdata, 200, response_contact, &tdata) != PJ_SUCCESS) { ao2_cleanup(response_contact); ao2_cleanup(contacts); return PJ_TRUE; @@ -647,44 +506,42 @@ static int rx_task_core(struct rx_task_data *task_data, struct ao2_container *co ao2_callback(contacts, 0, registrar_add_contact, tdata); ao2_cleanup(contacts); - if ((expires_hdr = pjsip_msg_find_hdr(task_data->rdata->msg_info.msg, PJSIP_H_EXPIRES, NULL))) { - expires_hdr = pjsip_expires_hdr_create(tdata->pool, registrar_get_expiration(task_data->aor, NULL, task_data->rdata)); + if ((expires_hdr = pjsip_msg_find_hdr(rdata->msg_info.msg, PJSIP_H_EXPIRES, NULL))) { + expires_hdr = pjsip_expires_hdr_create(tdata->pool, registrar_get_expiration(aor, NULL, rdata)); pjsip_msg_add_hdr(tdata->msg, (pjsip_hdr*)expires_hdr); } - ast_sip_send_stateful_response(task_data->rdata, tdata, task_data->endpoint); + ast_sip_send_stateful_response(rdata, tdata, endpoint); return PJ_TRUE; } -static int rx_task(void *data) +static int register_aor(pjsip_rx_data *rdata, + struct ast_sip_endpoint *endpoint, + struct ast_sip_aor *aor, + const char *aor_name) { int res; - struct rx_task_data *task_data = data; struct ao2_container *contacts = NULL; struct ast_named_lock *lock; - const char *aor_name = ast_sorcery_object_get_id(task_data->aor); lock = ast_named_lock_get(AST_NAMED_LOCK_TYPE_RWLOCK, "aor", aor_name); if (!lock) { - ao2_cleanup(task_data); return PJ_TRUE; } ao2_wrlock(lock); - contacts = ast_sip_location_retrieve_aor_contacts_nolock(task_data->aor); + contacts = ast_sip_location_retrieve_aor_contacts_nolock(aor); if (!contacts) { ao2_unlock(lock); ast_named_lock_put(lock); - ao2_cleanup(task_data); return PJ_TRUE; } - res = rx_task_core(task_data, contacts, aor_name); + res = register_aor_core(rdata, endpoint, aor, aor_name, contacts); ao2_cleanup(contacts); ao2_unlock(lock); ast_named_lock_put(lock); - ao2_cleanup(task_data); return res; } @@ -748,44 +605,20 @@ static char *find_aor_name(const char *username, const char *domain, const char return NULL; } -static pj_bool_t registrar_on_rx_request(struct pjsip_rx_data *rdata) +static struct ast_sip_aor *find_registrar_aor(struct pjsip_rx_data *rdata, struct ast_sip_endpoint *endpoint) { - RAII_VAR(struct serializer *, ser, NULL, ao2_cleanup); - struct rx_task_data *task_data; - - RAII_VAR(struct ast_sip_endpoint *, endpoint, - ast_pjsip_rdata_get_endpoint(rdata), ao2_cleanup); - RAII_VAR(struct ast_sip_aor *, aor, NULL, ao2_cleanup); - char *domain_name = NULL; + struct ast_sip_aor *aor = NULL; + char *aor_name = NULL; + char *domain_name; char *username = NULL; - RAII_VAR(char *, aor_name, NULL, ast_free); int i; - if (pjsip_method_cmp(&rdata->msg_info.msg->line.req.method, &pjsip_register_method) || !endpoint) { - return PJ_FALSE; - } - - if (ast_strlen_zero(endpoint->aors)) { - /* Short circuit early if the endpoint has no AORs configured on it, which means no registration possible */ - pjsip_endpt_respond_stateless(ast_sip_get_pjsip_endpoint(), rdata, 403, NULL, NULL, NULL); - ast_sip_report_failed_acl(endpoint, rdata, "registrar_attempt_without_configured_aors"); - ast_log(LOG_WARNING, "Endpoint '%s' has no configured AORs\n", ast_sorcery_object_get_id(endpoint)); - return PJ_TRUE; - } - - if (!PJSIP_URI_SCHEME_IS_SIP(rdata->msg_info.to->uri) && !PJSIP_URI_SCHEME_IS_SIPS(rdata->msg_info.to->uri)) { - pjsip_endpt_respond_stateless(ast_sip_get_pjsip_endpoint(), rdata, 416, NULL, NULL, NULL); - ast_sip_report_failed_acl(endpoint, rdata, "registrar_invalid_uri_in_to_received"); - ast_log(LOG_WARNING, "Endpoint '%s' attempted to register to an AOR with a non-SIP URI\n", ast_sorcery_object_get_id(endpoint)); - return PJ_TRUE; - } - - for (i = 0; i < AST_VECTOR_SIZE(&endpoint->ident_method_order); i++) { + for (i = 0; i < AST_VECTOR_SIZE(&endpoint->ident_method_order); ++i) { pjsip_sip_uri *uri; pjsip_authorization_hdr *header = NULL; switch (AST_VECTOR_GET(&endpoint->ident_method_order, i)) { - case AST_SIP_ENDPOINT_IDENTIFY_BY_USERNAME : + case AST_SIP_ENDPOINT_IDENTIFY_BY_USERNAME: uri = pjsip_uri_get_uri(rdata->msg_info.to->uri); domain_name = ast_alloca(uri->host.slen + 1); @@ -798,7 +631,7 @@ static pj_bool_t registrar_on_rx_request(struct pjsip_rx_data *rdata) ast_debug(3, "Matched aor '%s' by To username\n", aor_name); } break; - case AST_SIP_ENDPOINT_IDENTIFY_BY_AUTH_USERNAME : + case AST_SIP_ENDPOINT_IDENTIFY_BY_AUTH_USERNAME: while ((header = pjsip_msg_find_hdr(rdata->msg_info.msg, PJSIP_H_AUTHORIZATION, header ? header->next : NULL))) { if (header && !pj_stricmp2(&header->scheme, "digest")) { @@ -828,42 +661,57 @@ static pj_bool_t registrar_on_rx_request(struct pjsip_rx_data *rdata) /* The provided AOR name was not found (be it within the configuration or sorcery itself) */ pjsip_endpt_respond_stateless(ast_sip_get_pjsip_endpoint(), rdata, 404, NULL, NULL, NULL); ast_sip_report_req_no_support(endpoint, rdata, "registrar_requested_aor_not_found"); - ast_log(LOG_WARNING, "AOR '%s' not found for endpoint '%s'\n", username, ast_sorcery_object_get_id(endpoint)); - return PJ_TRUE; + ast_log(LOG_WARNING, "AOR '%s' not found for endpoint '%s'\n", + username ?: "", ast_sorcery_object_get_id(endpoint)); } + ast_free(aor_name); + return aor; +} - if (!aor->max_contacts) { - /* Registration is not permitted for this AOR */ +static pj_bool_t registrar_on_rx_request(struct pjsip_rx_data *rdata) +{ + RAII_VAR(struct ast_sip_endpoint *, endpoint, + ast_pjsip_rdata_get_endpoint(rdata), ao2_cleanup); + struct ast_sip_aor *aor; + const char *aor_name; + + if (pjsip_method_cmp(&rdata->msg_info.msg->line.req.method, &pjsip_register_method) || !endpoint) { + return PJ_FALSE; + } + + if (ast_strlen_zero(endpoint->aors)) { + /* Short circuit early if the endpoint has no AORs configured on it, which means no registration possible */ pjsip_endpt_respond_stateless(ast_sip_get_pjsip_endpoint(), rdata, 403, NULL, NULL, NULL); - ast_sip_report_req_no_support(endpoint, rdata, "registrar_attempt_without_registration_permitted"); - ast_log(LOG_WARNING, "AOR '%s' has no configured max_contacts. Endpoint '%s' unable to register\n", - ast_sorcery_object_get_id(aor), ast_sorcery_object_get_id(endpoint)); + ast_sip_report_failed_acl(endpoint, rdata, "registrar_attempt_without_configured_aors"); + ast_log(LOG_WARNING, "Endpoint '%s' has no configured AORs\n", ast_sorcery_object_get_id(endpoint)); return PJ_TRUE; } - if (!(ser = serializer_find_or_create(aor_name))) { - pjsip_endpt_respond_stateless(ast_sip_get_pjsip_endpoint(), rdata, 403, NULL, NULL, NULL); - ast_sip_report_mem_limit(endpoint, rdata); - ast_log(LOG_WARNING, "Endpoint '%s' unable to register on AOR '%s' - could not get serializer\n", - ast_sorcery_object_get_id(endpoint), ast_sorcery_object_get_id(aor)); + if (!PJSIP_URI_SCHEME_IS_SIP(rdata->msg_info.to->uri) && !PJSIP_URI_SCHEME_IS_SIPS(rdata->msg_info.to->uri)) { + pjsip_endpt_respond_stateless(ast_sip_get_pjsip_endpoint(), rdata, 416, NULL, NULL, NULL); + ast_sip_report_failed_acl(endpoint, rdata, "registrar_invalid_uri_in_to_received"); + ast_log(LOG_WARNING, "Endpoint '%s' attempted to register to an AOR with a non-SIP URI\n", ast_sorcery_object_get_id(endpoint)); return PJ_TRUE; } - if (!(task_data = rx_task_data_create(rdata, endpoint, aor))) { - pjsip_endpt_respond_stateless(ast_sip_get_pjsip_endpoint(), rdata, 403, NULL, NULL, NULL); - ast_sip_report_mem_limit(endpoint, rdata); - ast_log(LOG_WARNING, "Endpoint '%s' unable to register on AOR '%s' - could not create rx_task_data\n", - ast_sorcery_object_get_id(endpoint), ast_sorcery_object_get_id(aor)); + aor = find_registrar_aor(rdata, endpoint); + if (!aor) { + /* We've already responded about not finding an AOR. */ return PJ_TRUE; } - if (ast_sip_push_task(ser->serializer, rx_task, task_data)) { + aor_name = ast_sorcery_object_get_id(aor); + + if (!aor->max_contacts) { + /* Registration is not permitted for this AOR */ pjsip_endpt_respond_stateless(ast_sip_get_pjsip_endpoint(), rdata, 403, NULL, NULL, NULL); - ast_sip_report_mem_limit(endpoint, rdata); - ast_log(LOG_WARNING, "Endpoint '%s' unable to register on AOR '%s' - could not serialize task\n", - ast_sorcery_object_get_id(endpoint), ast_sorcery_object_get_id(aor)); - ao2_ref(task_data, -1); + ast_sip_report_req_no_support(endpoint, rdata, "registrar_attempt_without_registration_permitted"); + ast_log(LOG_WARNING, "AOR '%s' has no configured max_contacts. Endpoint '%s' unable to register\n", + aor_name, ast_sorcery_object_get_id(endpoint)); + } else { + register_aor(rdata, endpoint, aor, aor_name); } + ao2_ref(aor, -1); return PJ_TRUE; } @@ -952,11 +800,6 @@ static int load_module(void) CHECK_PJSIP_MODULE_LOADED(); - if (!(serializers = ao2_container_alloc( - SERIALIZER_BUCKETS, serializer_hash, serializer_cmp))) { - return AST_MODULE_LOAD_DECLINE; - } - if (ast_sip_register_service(®istrar_module)) { return AST_MODULE_LOAD_DECLINE; } @@ -976,8 +819,6 @@ static int unload_module(void) { ast_manager_unregister(AMI_SHOW_REGISTRATIONS); ast_sip_unregister_service(®istrar_module); - - ao2_cleanup(serializers); return 0; } diff --git a/res/res_pjsip_session.c b/res/res_pjsip_session.c index 59d759fdc..67cd09ddf 100644 --- a/res/res_pjsip_session.c +++ b/res/res_pjsip_session.c @@ -1403,12 +1403,11 @@ struct ast_sip_channel_pvt *ast_sip_channel_pvt_alloc(void *pvt, struct ast_sip_ } struct ast_sip_session *ast_sip_session_alloc(struct ast_sip_endpoint *endpoint, - struct ast_sip_contact *contact, pjsip_inv_session *inv_session) + struct ast_sip_contact *contact, pjsip_inv_session *inv_session, pjsip_rx_data *rdata) { RAII_VAR(struct ast_sip_session *, session, NULL, ao2_cleanup); struct ast_sip_session_supplement *iter; int dsp_features = 0; - char tps_name[AST_TASKPROCESSOR_MAX_NAME + 1]; session = ao2_alloc(sizeof(*session), session_destructor); if (!session) { @@ -1429,11 +1428,24 @@ struct ast_sip_session *ast_sip_session_alloc(struct ast_sip_endpoint *endpoint, /* fill session->media with available types */ ao2_callback(sdp_handlers, OBJ_NODATA, add_session_media, session); - /* Create name with seq number appended. */ - ast_taskprocessor_build_name(tps_name, sizeof(tps_name), "pjsip/session/%s", - ast_sorcery_object_get_id(endpoint)); + if (rdata) { + /* + * We must continue using the serializer that the original + * INVITE came in on for the dialog. There may be + * retransmissions already enqueued in the original + * serializer that can result in reentrancy and message + * sequencing problems. + */ + session->serializer = ast_sip_get_distributor_serializer(rdata); + } else { + char tps_name[AST_TASKPROCESSOR_MAX_NAME + 1]; - session->serializer = ast_sip_create_serializer(tps_name); + /* Create name with seq number appended. */ + ast_taskprocessor_build_name(tps_name, sizeof(tps_name), "pjsip/outsess/%s", + ast_sorcery_object_get_id(endpoint)); + + session->serializer = ast_sip_create_serializer(tps_name); + } if (!session->serializer) { return NULL; } @@ -1731,7 +1743,9 @@ struct ast_sip_session *ast_sip_session_create_outgoing(struct ast_sip_endpoint timer.sess_expires = endpoint->extensions.timer.sess_expires; pjsip_timer_init_session(inv_session, &timer); - if (!(session = ast_sip_session_alloc(endpoint, found_contact ? found_contact : contact, inv_session))) { + session = ast_sip_session_alloc(endpoint, found_contact ? found_contact : contact, + inv_session, NULL); + if (!session) { pjsip_inv_terminate(inv_session, 500, PJ_FALSE); return NULL; } @@ -2142,7 +2156,7 @@ static void handle_new_invite_request(pjsip_rx_data *rdata) return; } - session = ast_sip_session_alloc(endpoint, NULL, inv_session); + session = ast_sip_session_alloc(endpoint, NULL, inv_session, rdata); if (!session) { if (pjsip_inv_initial_answer(inv_session, rdata, 500, NULL, NULL, &tdata) == PJ_SUCCESS) { pjsip_inv_terminate(inv_session, 500, PJ_FALSE); diff --git a/tests/test_netsock2.c b/tests/test_netsock2.c index 638ff37cc..780b0b06f 100644 --- a/tests/test_netsock2.c +++ b/tests/test_netsock2.c @@ -75,7 +75,7 @@ AST_TEST_DEFINE(parsing) }; size_t x; - struct ast_sockaddr addr = { { 0, 0, } }; + struct ast_sockaddr addr; int parse_result; switch (cmd) { @@ -91,15 +91,17 @@ AST_TEST_DEFINE(parsing) } for (x = 0; x < ARRAY_LEN(test_vals); x++) { + memset(&addr, 0, sizeof(addr)); if ((parse_result = ast_sockaddr_parse(&addr, test_vals[x].address, 0)) != test_vals[x].expected_result) { ast_test_status_update(test, "On '%s' expected %d but got %d\n", test_vals[x].address, test_vals[x].expected_result, parse_result); res = AST_TEST_FAIL; } if (parse_result) { - struct ast_sockaddr tmp_addr = { { 0, 0, } }; + struct ast_sockaddr tmp_addr; const char *tmp; tmp = ast_sockaddr_stringify(&addr); + memset(&tmp_addr, 0, sizeof(tmp_addr)); ast_sockaddr_parse(&tmp_addr, tmp, 0); if (ast_sockaddr_cmp_addr(&addr, &tmp_addr)) { char buf[64]; |