diff options
Diffstat (limited to 'res')
-rw-r--r-- | res/ari/ari_model_validators.c | 94 | ||||
-rw-r--r-- | res/ari/ari_model_validators.h | 24 | ||||
-rw-r--r-- | res/ari/resource_bridges.c | 44 | ||||
-rw-r--r-- | res/ari/resource_bridges.h | 20 | ||||
-rw-r--r-- | res/ari/resource_channels.c | 9 | ||||
-rw-r--r-- | res/ari/resource_channels.h | 20 | ||||
-rw-r--r-- | res/res_ari.c | 3 | ||||
-rw-r--r-- | res/res_ari_bridges.c | 142 | ||||
-rw-r--r-- | res/res_ari_channels.c | 142 | ||||
-rw-r--r-- | res/res_hep_pjsip.c | 54 | ||||
-rw-r--r-- | res/res_pjsip/config_transport.c | 6 | ||||
-rw-r--r-- | res/res_pjsip_dialog_info_body_generator.c | 2 | ||||
-rw-r--r-- | res/res_pjsip_exten_state.c | 10 | ||||
-rw-r--r-- | res/res_pjsip_outbound_publish.c | 854 | ||||
-rw-r--r-- | res/res_pjsip_outbound_registration.c | 24 | ||||
-rw-r--r-- | res/res_stasis_playback.c | 210 |
16 files changed, 1285 insertions, 373 deletions
diff --git a/res/ari/ari_model_validators.c b/res/ari/ari_model_validators.c index 623d5b541..8f05db035 100644 --- a/res/ari/ari_model_validators.c +++ b/res/ari/ari_model_validators.c @@ -1744,6 +1744,15 @@ int ast_ari_validate_playback(struct ast_json *json) res = 0; } } else + if (strcmp("next_media_uri", ast_json_object_iter_key(iter)) == 0) { + int prop_is_valid; + prop_is_valid = ast_ari_validate_string( + ast_json_object_iter_value(iter)); + if (!prop_is_valid) { + ast_log(LOG_ERROR, "ARI Playback field next_media_uri failed validation\n"); + res = 0; + } + } else if (strcmp("state", ast_json_object_iter_key(iter)) == 0) { int prop_is_valid; has_state = 1; @@ -4741,6 +4750,9 @@ int ast_ari_validate_event(struct ast_json *json) if (strcmp("PeerStatusChange", discriminator) == 0) { return ast_ari_validate_peer_status_change(json); } else + if (strcmp("PlaybackContinuing", discriminator) == 0) { + return ast_ari_validate_playback_continuing(json); + } else if (strcmp("PlaybackFinished", discriminator) == 0) { return ast_ari_validate_playback_finished(json); } else @@ -4930,6 +4942,9 @@ int ast_ari_validate_message(struct ast_json *json) if (strcmp("PeerStatusChange", discriminator) == 0) { return ast_ari_validate_peer_status_change(json); } else + if (strcmp("PlaybackContinuing", discriminator) == 0) { + return ast_ari_validate_playback_continuing(json); + } else if (strcmp("PlaybackFinished", discriminator) == 0) { return ast_ari_validate_playback_finished(json); } else @@ -5216,6 +5231,85 @@ ari_validator ast_ari_validate_peer_status_change_fn(void) return ast_ari_validate_peer_status_change; } +int ast_ari_validate_playback_continuing(struct ast_json *json) +{ + int res = 1; + struct ast_json_iter *iter; + int has_type = 0; + int has_application = 0; + int has_playback = 0; + + for (iter = ast_json_object_iter(json); iter; iter = ast_json_object_iter_next(json, iter)) { + if (strcmp("type", ast_json_object_iter_key(iter)) == 0) { + int prop_is_valid; + has_type = 1; + prop_is_valid = ast_ari_validate_string( + ast_json_object_iter_value(iter)); + if (!prop_is_valid) { + ast_log(LOG_ERROR, "ARI PlaybackContinuing field type failed validation\n"); + res = 0; + } + } else + if (strcmp("application", ast_json_object_iter_key(iter)) == 0) { + int prop_is_valid; + has_application = 1; + prop_is_valid = ast_ari_validate_string( + ast_json_object_iter_value(iter)); + if (!prop_is_valid) { + ast_log(LOG_ERROR, "ARI PlaybackContinuing field application failed validation\n"); + res = 0; + } + } else + if (strcmp("timestamp", ast_json_object_iter_key(iter)) == 0) { + int prop_is_valid; + prop_is_valid = ast_ari_validate_date( + ast_json_object_iter_value(iter)); + if (!prop_is_valid) { + ast_log(LOG_ERROR, "ARI PlaybackContinuing field timestamp failed validation\n"); + res = 0; + } + } else + if (strcmp("playback", ast_json_object_iter_key(iter)) == 0) { + int prop_is_valid; + has_playback = 1; + prop_is_valid = ast_ari_validate_playback( + ast_json_object_iter_value(iter)); + if (!prop_is_valid) { + ast_log(LOG_ERROR, "ARI PlaybackContinuing field playback failed validation\n"); + res = 0; + } + } else + { + ast_log(LOG_ERROR, + "ARI PlaybackContinuing has undocumented field %s\n", + ast_json_object_iter_key(iter)); + res = 0; + } + } + + if (!has_type) { + ast_log(LOG_ERROR, "ARI PlaybackContinuing missing required field type\n"); + res = 0; + } + + if (!has_application) { + ast_log(LOG_ERROR, "ARI PlaybackContinuing missing required field application\n"); + res = 0; + } + + if (!has_playback) { + ast_log(LOG_ERROR, "ARI PlaybackContinuing missing required field playback\n"); + res = 0; + } + + return res; +} + +ari_validator ast_ari_validate_playback_continuing_fn(void) +{ + return ast_ari_validate_playback_continuing; +} + int ast_ari_validate_playback_finished(struct ast_json *json) { int res = 1; diff --git a/res/ari/ari_model_validators.h b/res/ari/ari_model_validators.h index 0bcdb0fa2..2634528ba 100644 --- a/res/ari/ari_model_validators.h +++ b/res/ari/ari_model_validators.h @@ -1187,6 +1187,24 @@ int ast_ari_validate_peer_status_change(struct ast_json *json); ari_validator ast_ari_validate_peer_status_change_fn(void); /*! + * \brief Validator for PlaybackContinuing. + * + * Event showing the continuation of a media playback operation from one media URI to the next in the list. + * + * \param json JSON object to validate. + * \returns True (non-zero) if valid. + * \returns False (zero) if invalid. + */ +int ast_ari_validate_playback_continuing(struct ast_json *json); + +/*! + * \brief Function pointer to ast_ari_validate_playback_continuing(). + * + * See \ref ast_ari_model_validators.h for more details. + */ +ari_validator ast_ari_validate_playback_continuing_fn(void); + +/*! * \brief Validator for PlaybackFinished. * * Event showing the completion of a media playback operation. @@ -1457,6 +1475,7 @@ ari_validator ast_ari_validate_application_fn(void); * - id: string (required) * - language: string * - media_uri: string (required) + * - next_media_uri: string * - state: string (required) * - target_uri: string (required) * DeviceState @@ -1670,6 +1689,11 @@ ari_validator ast_ari_validate_application_fn(void); * - timestamp: Date * - endpoint: Endpoint (required) * - peer: Peer (required) + * PlaybackContinuing + * - type: string (required) + * - application: string (required) + * - timestamp: Date + * - playback: Playback (required) * PlaybackFinished * - type: string (required) * - application: string (required) diff --git a/res/ari/resource_bridges.c b/res/ari/resource_bridges.c index 6018c43be..cec443dba 100644 --- a/res/ari/resource_bridges.c +++ b/res/ari/resource_bridges.c @@ -332,7 +332,8 @@ static struct ast_channel *prepare_bridge_media_channel(const char *type) * \brief Performs common setup for a bridge playback operation * with both new controls and when existing controls are found. * - * \param args_media media string split from arguments + * \param args_media medias to play + * \param args_media_count number of media items in \c media * \param args_lang language string split from arguments * \param args_offset_ms milliseconds offset split from arguments * \param args_playback_id string to use for playback split from @@ -346,7 +347,8 @@ static struct ast_channel *prepare_bridge_media_channel(const char *type) * \retval -1 operation failed * \retval operation was successful */ -static int ari_bridges_play_helper(const char *args_media, +static int ari_bridges_play_helper(const char **args_media, + size_t args_media_count, const char *args_lang, int args_offset_ms, int args_skipms, @@ -371,8 +373,8 @@ static int ari_bridges_play_helper(const char *args_media, language = S_OR(args_lang, snapshot->language); - playback = stasis_app_control_play_uri(control, args_media, language, - bridge->uniqueid, STASIS_PLAYBACK_TARGET_BRIDGE, args_skipms, + playback = stasis_app_control_play_uri(control, args_media, args_media_count, + language, bridge->uniqueid, STASIS_PLAYBACK_TARGET_BRIDGE, args_skipms, args_offset_ms, args_playback_id); if (!playback) { @@ -380,7 +382,7 @@ static int ari_bridges_play_helper(const char *args_media, return -1; } - if (ast_asprintf(playback_url, "/playback/%s", + if (ast_asprintf(playback_url, "/playbacks/%s", stasis_app_playback_get_id(playback)) == -1) { playback_url = NULL; ast_ari_response_alloc_failed(response); @@ -396,7 +398,8 @@ static int ari_bridges_play_helper(const char *args_media, return 0; } -static void ari_bridges_play_new(const char *args_media, +static void ari_bridges_play_new(const char **args_media, + size_t args_media_count, const char *args_lang, int args_offset_ms, int args_skipms, @@ -449,9 +452,9 @@ static void ari_bridges_play_new(const char *args_media, } ao2_lock(control); - if (ari_bridges_play_helper(args_media, args_lang, args_offset_ms, - args_skipms, args_playback_id, response, bridge, control, - &json, &playback_url)) { + if (ari_bridges_play_helper(args_media, args_media_count, args_lang, + args_offset_ms, args_skipms, args_playback_id, response, bridge, + control, &json, &playback_url)) { ao2_unlock(control); return; } @@ -497,7 +500,8 @@ enum play_found_result { * \brief Performs common setup for a bridge playback operation * with both new controls and when existing controls are found. * - * \param args_media media string split from arguments + * \param args_media medias to play + * \param args_media_count number of media items in \c media * \param args_lang language string split from arguments * \param args_offset_ms milliseconds offset split from arguments * \param args_playback_id string to use for playback split from @@ -511,7 +515,8 @@ enum play_found_result { * \retval PLAY_FOUND_CHANNEL_UNAVAILABLE The operation failed because * the channel requested to playback with is breaking down. */ -static enum play_found_result ari_bridges_play_found(const char *args_media, +static enum play_found_result ari_bridges_play_found(const char **args_media, + size_t args_media_count, const char *args_lang, int args_offset_ms, int args_skipms, @@ -537,9 +542,9 @@ static enum play_found_result ari_bridges_play_found(const char *args_media, return PLAY_FOUND_CHANNEL_UNAVAILABLE; } - if (ari_bridges_play_helper(args_media, args_lang, args_offset_ms, - args_skipms, args_playback_id, response, bridge, control, - &json, &playback_url)) { + if (ari_bridges_play_helper(args_media, args_media_count, + args_lang, args_offset_ms, args_skipms, args_playback_id, + response, bridge, control, &json, &playback_url)) { ao2_unlock(control); return PLAY_FOUND_FAILURE; } @@ -551,7 +556,8 @@ static enum play_found_result ari_bridges_play_found(const char *args_media, static void ari_bridges_handle_play( const char *args_bridge_id, - const char *args_media, + const char **args_media, + size_t args_media_count, const char *args_lang, int args_offset_ms, int args_skipms, @@ -574,15 +580,15 @@ static void ari_bridges_handle_play( * that will work or else there isn't a channel for this bridge anymore, * in which case we'll revert to ari_bridges_play_new. */ - if (ari_bridges_play_found(args_media, args_lang, args_offset_ms, - args_skipms, args_playback_id, response,bridge, + if (ari_bridges_play_found(args_media, args_media_count, args_lang, + args_offset_ms, args_skipms, args_playback_id, response,bridge, play_channel) == PLAY_FOUND_CHANNEL_UNAVAILABLE) { continue; } return; } - ari_bridges_play_new(args_media, args_lang, args_offset_ms, + ari_bridges_play_new(args_media, args_media_count, args_lang, args_offset_ms, args_skipms, args_playback_id, response, bridge); } @@ -593,6 +599,7 @@ void ast_ari_bridges_play(struct ast_variable *headers, { ari_bridges_handle_play(args->bridge_id, args->media, + args->media_count, args->lang, args->offsetms, args->skipms, @@ -606,6 +613,7 @@ void ast_ari_bridges_play_with_id(struct ast_variable *headers, { ari_bridges_handle_play(args->bridge_id, args->media, + args->media_count, args->lang, args->offsetms, args->skipms, diff --git a/res/ari/resource_bridges.h b/res/ari/resource_bridges.h index 36ff6a017..17a3b8365 100644 --- a/res/ari/resource_bridges.h +++ b/res/ari/resource_bridges.h @@ -245,11 +245,15 @@ void ast_ari_bridges_stop_moh(struct ast_variable *headers, struct ast_ari_bridg struct ast_ari_bridges_play_args { /*! Bridge's id */ const char *bridge_id; - /*! Media's URI to play. */ - const char *media; + /*! Array of Media URIs to play. */ + const char **media; + /*! Length of media array. */ + size_t media_count; + /*! Parsing context for media. */ + char *media_parse; /*! For sounds, selects language for sound. */ const char *lang; - /*! Number of media to skip before playing. */ + /*! Number of milliseconds to skip before playing. Only applies to the first URI if multiple media URIs are specified. */ int offsetms; /*! Number of milliseconds to skip for forward/reverse operations. */ int skipms; @@ -283,11 +287,15 @@ struct ast_ari_bridges_play_with_id_args { const char *bridge_id; /*! Playback ID. */ const char *playback_id; - /*! Media's URI to play. */ - const char *media; + /*! Array of Media URIs to play. */ + const char **media; + /*! Length of media array. */ + size_t media_count; + /*! Parsing context for media. */ + char *media_parse; /*! For sounds, selects language for sound. */ const char *lang; - /*! Number of media to skip before playing. */ + /*! Number of milliseconds to skip before playing. Only applies to the first URI if multiple media URIs are specified. */ int offsetms; /*! Number of milliseconds to skip for forward/reverse operations. */ int skipms; diff --git a/res/ari/resource_channels.c b/res/ari/resource_channels.c index c838bc39c..b42581c84 100644 --- a/res/ari/resource_channels.c +++ b/res/ari/resource_channels.c @@ -469,7 +469,8 @@ void ast_ari_channels_stop_silence(struct ast_variable *headers, static void ari_channels_handle_play( const char *args_channel_id, - const char *args_media, + const char **args_media, + size_t args_media_count, const char *args_lang, int args_offsetms, int args_skipms, @@ -515,7 +516,7 @@ static void ari_channels_handle_play( language = S_OR(args_lang, snapshot->language); - playback = stasis_app_control_play_uri(control, args_media, language, + playback = stasis_app_control_play_uri(control, args_media, args_media_count, language, args_channel_id, STASIS_PLAYBACK_TARGET_CHANNEL, args_skipms, args_offsetms, args_playback_id); if (!playback) { ast_ari_response_error( @@ -524,7 +525,7 @@ static void ari_channels_handle_play( return; } - if (ast_asprintf(&playback_url, "/playback/%s", + if (ast_asprintf(&playback_url, "/playbacks/%s", stasis_app_playback_get_id(playback)) == -1) { playback_url = NULL; ast_ari_response_error( @@ -551,6 +552,7 @@ void ast_ari_channels_play(struct ast_variable *headers, ari_channels_handle_play( args->channel_id, args->media, + args->media_count, args->lang, args->offsetms, args->skipms, @@ -565,6 +567,7 @@ void ast_ari_channels_play_with_id(struct ast_variable *headers, ari_channels_handle_play( args->channel_id, args->media, + args->media_count, args->lang, args->offsetms, args->skipms, diff --git a/res/ari/resource_channels.h b/res/ari/resource_channels.h index 89b466d00..c690d70c8 100644 --- a/res/ari/resource_channels.h +++ b/res/ari/resource_channels.h @@ -505,11 +505,15 @@ void ast_ari_channels_stop_silence(struct ast_variable *headers, struct ast_ari_ struct ast_ari_channels_play_args { /*! Channel's id */ const char *channel_id; - /*! Media's URI to play. */ - const char *media; + /*! Array of Media URIs to play. */ + const char **media; + /*! Length of media array. */ + size_t media_count; + /*! Parsing context for media. */ + char *media_parse; /*! For sounds, selects language for sound. */ const char *lang; - /*! Number of media to skip before playing. */ + /*! Number of milliseconds to skip before playing. Only applies to the first URI if multiple media URIs are specified. */ int offsetms; /*! Number of milliseconds to skip for forward/reverse operations. */ int skipms; @@ -543,11 +547,15 @@ struct ast_ari_channels_play_with_id_args { const char *channel_id; /*! Playback ID. */ const char *playback_id; - /*! Media's URI to play. */ - const char *media; + /*! Array of Media URIs to play. */ + const char **media; + /*! Length of media array. */ + size_t media_count; + /*! Parsing context for media. */ + char *media_parse; /*! For sounds, selects language for sound. */ const char *lang; - /*! Number of media to skip before playing. */ + /*! Number of milliseconds to skip before playing. Only applies to the first URI if multiple media URIs are specified. */ int offsetms; /*! Number of milliseconds to skip for forward/reverse operations. */ int skipms; diff --git a/res/res_ari.c b/res/res_ari.c index 62083bfb1..14dece8e4 100644 --- a/res/res_ari.c +++ b/res/res_ari.c @@ -304,10 +304,11 @@ void ast_ari_response_alloc_failed(struct ast_ari_response *response) void ast_ari_response_created(struct ast_ari_response *response, const char *url, struct ast_json *message) { + RAII_VAR(struct stasis_rest_handlers *, root, get_root_handler(), ao2_cleanup); response->message = message; response->response_code = 201; response->response_text = "Created"; - ast_str_append(&response->headers, 0, "Location: %s\r\n", url); + ast_str_append(&response->headers, 0, "Location: /%s%s\r\n", root->path_segment, url); } static void add_allow_header(struct stasis_rest_handlers *handler, diff --git a/res/res_ari_bridges.c b/res/res_ari_bridges.c index 633dc94eb..119687999 100644 --- a/res/res_ari_bridges.c +++ b/res/res_ari_bridges.c @@ -935,7 +935,32 @@ int ast_ari_bridges_play_parse_body( /* Parse query parameters out of it */ field = ast_json_object_get(body, "media"); if (field) { - args->media = ast_json_string_get(field); + /* If they were silly enough to both pass in a query param and a + * JSON body, free up the query value. + */ + ast_free(args->media); + if (ast_json_typeof(field) == AST_JSON_ARRAY) { + /* Multiple param passed as array */ + size_t i; + args->media_count = ast_json_array_size(field); + args->media = ast_malloc(sizeof(*args->media) * args->media_count); + + if (!args->media) { + return -1; + } + + for (i = 0; i < args->media_count; ++i) { + args->media[i] = ast_json_string_get(ast_json_array_get(field, i)); + } + } else { + /* Multiple param passed as single value */ + args->media_count = 1; + args->media = ast_malloc(sizeof(*args->media) * args->media_count); + if (!args->media) { + return -1; + } + args->media[0] = ast_json_string_get(field); + } } field = ast_json_object_get(body, "lang"); if (field) { @@ -978,7 +1003,47 @@ static void ast_ari_bridges_play_cb( for (i = get_params; i; i = i->next) { if (strcmp(i->name, "media") == 0) { - args.media = (i->value); + /* Parse comma separated list */ + char *vals[MAX_VALS]; + size_t j; + + args.media_parse = ast_strdup(i->value); + if (!args.media_parse) { + ast_ari_response_alloc_failed(response); + goto fin; + } + + if (strlen(args.media_parse) == 0) { + /* ast_app_separate_args can't handle "" */ + args.media_count = 1; + vals[0] = args.media_parse; + } else { + args.media_count = ast_app_separate_args( + args.media_parse, ',', vals, + ARRAY_LEN(vals)); + } + + if (args.media_count == 0) { + ast_ari_response_alloc_failed(response); + goto fin; + } + + if (args.media_count >= MAX_VALS) { + ast_ari_response_error(response, 400, + "Bad Request", + "Too many values for media"); + goto fin; + } + + args.media = ast_malloc(sizeof(*args.media) * args.media_count); + if (!args.media) { + ast_ari_response_alloc_failed(response); + goto fin; + } + + for (j = 0; j < args.media_count; ++j) { + args.media[j] = (vals[j]); + } } else if (strcmp(i->name, "lang") == 0) { args.lang = (i->value); @@ -1051,6 +1116,8 @@ static void ast_ari_bridges_play_cb( #endif /* AST_DEVMODE */ fin: __attribute__((unused)) + ast_free(args.media_parse); + ast_free(args.media); return; } int ast_ari_bridges_play_with_id_parse_body( @@ -1061,7 +1128,32 @@ int ast_ari_bridges_play_with_id_parse_body( /* Parse query parameters out of it */ field = ast_json_object_get(body, "media"); if (field) { - args->media = ast_json_string_get(field); + /* If they were silly enough to both pass in a query param and a + * JSON body, free up the query value. + */ + ast_free(args->media); + if (ast_json_typeof(field) == AST_JSON_ARRAY) { + /* Multiple param passed as array */ + size_t i; + args->media_count = ast_json_array_size(field); + args->media = ast_malloc(sizeof(*args->media) * args->media_count); + + if (!args->media) { + return -1; + } + + for (i = 0; i < args->media_count; ++i) { + args->media[i] = ast_json_string_get(ast_json_array_get(field, i)); + } + } else { + /* Multiple param passed as single value */ + args->media_count = 1; + args->media = ast_malloc(sizeof(*args->media) * args->media_count); + if (!args->media) { + return -1; + } + args->media[0] = ast_json_string_get(field); + } } field = ast_json_object_get(body, "lang"); if (field) { @@ -1100,7 +1192,47 @@ static void ast_ari_bridges_play_with_id_cb( for (i = get_params; i; i = i->next) { if (strcmp(i->name, "media") == 0) { - args.media = (i->value); + /* Parse comma separated list */ + char *vals[MAX_VALS]; + size_t j; + + args.media_parse = ast_strdup(i->value); + if (!args.media_parse) { + ast_ari_response_alloc_failed(response); + goto fin; + } + + if (strlen(args.media_parse) == 0) { + /* ast_app_separate_args can't handle "" */ + args.media_count = 1; + vals[0] = args.media_parse; + } else { + args.media_count = ast_app_separate_args( + args.media_parse, ',', vals, + ARRAY_LEN(vals)); + } + + if (args.media_count == 0) { + ast_ari_response_alloc_failed(response); + goto fin; + } + + if (args.media_count >= MAX_VALS) { + ast_ari_response_error(response, 400, + "Bad Request", + "Too many values for media"); + goto fin; + } + + args.media = ast_malloc(sizeof(*args.media) * args.media_count); + if (!args.media) { + ast_ari_response_alloc_failed(response); + goto fin; + } + + for (j = 0; j < args.media_count; ++j) { + args.media[j] = (vals[j]); + } } else if (strcmp(i->name, "lang") == 0) { args.lang = (i->value); @@ -1173,6 +1305,8 @@ static void ast_ari_bridges_play_with_id_cb( #endif /* AST_DEVMODE */ fin: __attribute__((unused)) + ast_free(args.media_parse); + ast_free(args.media); return; } int ast_ari_bridges_record_parse_body( diff --git a/res/res_ari_channels.c b/res/res_ari_channels.c index 1f0818170..951a5475b 100644 --- a/res/res_ari_channels.c +++ b/res/res_ari_channels.c @@ -1842,7 +1842,32 @@ int ast_ari_channels_play_parse_body( /* Parse query parameters out of it */ field = ast_json_object_get(body, "media"); if (field) { - args->media = ast_json_string_get(field); + /* If they were silly enough to both pass in a query param and a + * JSON body, free up the query value. + */ + ast_free(args->media); + if (ast_json_typeof(field) == AST_JSON_ARRAY) { + /* Multiple param passed as array */ + size_t i; + args->media_count = ast_json_array_size(field); + args->media = ast_malloc(sizeof(*args->media) * args->media_count); + + if (!args->media) { + return -1; + } + + for (i = 0; i < args->media_count; ++i) { + args->media[i] = ast_json_string_get(ast_json_array_get(field, i)); + } + } else { + /* Multiple param passed as single value */ + args->media_count = 1; + args->media = ast_malloc(sizeof(*args->media) * args->media_count); + if (!args->media) { + return -1; + } + args->media[0] = ast_json_string_get(field); + } } field = ast_json_object_get(body, "lang"); if (field) { @@ -1885,7 +1910,47 @@ static void ast_ari_channels_play_cb( for (i = get_params; i; i = i->next) { if (strcmp(i->name, "media") == 0) { - args.media = (i->value); + /* Parse comma separated list */ + char *vals[MAX_VALS]; + size_t j; + + args.media_parse = ast_strdup(i->value); + if (!args.media_parse) { + ast_ari_response_alloc_failed(response); + goto fin; + } + + if (strlen(args.media_parse) == 0) { + /* ast_app_separate_args can't handle "" */ + args.media_count = 1; + vals[0] = args.media_parse; + } else { + args.media_count = ast_app_separate_args( + args.media_parse, ',', vals, + ARRAY_LEN(vals)); + } + + if (args.media_count == 0) { + ast_ari_response_alloc_failed(response); + goto fin; + } + + if (args.media_count >= MAX_VALS) { + ast_ari_response_error(response, 400, + "Bad Request", + "Too many values for media"); + goto fin; + } + + args.media = ast_malloc(sizeof(*args.media) * args.media_count); + if (!args.media) { + ast_ari_response_alloc_failed(response); + goto fin; + } + + for (j = 0; j < args.media_count; ++j) { + args.media[j] = (vals[j]); + } } else if (strcmp(i->name, "lang") == 0) { args.lang = (i->value); @@ -1958,6 +2023,8 @@ static void ast_ari_channels_play_cb( #endif /* AST_DEVMODE */ fin: __attribute__((unused)) + ast_free(args.media_parse); + ast_free(args.media); return; } int ast_ari_channels_play_with_id_parse_body( @@ -1968,7 +2035,32 @@ int ast_ari_channels_play_with_id_parse_body( /* Parse query parameters out of it */ field = ast_json_object_get(body, "media"); if (field) { - args->media = ast_json_string_get(field); + /* If they were silly enough to both pass in a query param and a + * JSON body, free up the query value. + */ + ast_free(args->media); + if (ast_json_typeof(field) == AST_JSON_ARRAY) { + /* Multiple param passed as array */ + size_t i; + args->media_count = ast_json_array_size(field); + args->media = ast_malloc(sizeof(*args->media) * args->media_count); + + if (!args->media) { + return -1; + } + + for (i = 0; i < args->media_count; ++i) { + args->media[i] = ast_json_string_get(ast_json_array_get(field, i)); + } + } else { + /* Multiple param passed as single value */ + args->media_count = 1; + args->media = ast_malloc(sizeof(*args->media) * args->media_count); + if (!args->media) { + return -1; + } + args->media[0] = ast_json_string_get(field); + } } field = ast_json_object_get(body, "lang"); if (field) { @@ -2007,7 +2099,47 @@ static void ast_ari_channels_play_with_id_cb( for (i = get_params; i; i = i->next) { if (strcmp(i->name, "media") == 0) { - args.media = (i->value); + /* Parse comma separated list */ + char *vals[MAX_VALS]; + size_t j; + + args.media_parse = ast_strdup(i->value); + if (!args.media_parse) { + ast_ari_response_alloc_failed(response); + goto fin; + } + + if (strlen(args.media_parse) == 0) { + /* ast_app_separate_args can't handle "" */ + args.media_count = 1; + vals[0] = args.media_parse; + } else { + args.media_count = ast_app_separate_args( + args.media_parse, ',', vals, + ARRAY_LEN(vals)); + } + + if (args.media_count == 0) { + ast_ari_response_alloc_failed(response); + goto fin; + } + + if (args.media_count >= MAX_VALS) { + ast_ari_response_error(response, 400, + "Bad Request", + "Too many values for media"); + goto fin; + } + + args.media = ast_malloc(sizeof(*args.media) * args.media_count); + if (!args.media) { + ast_ari_response_alloc_failed(response); + goto fin; + } + + for (j = 0; j < args.media_count; ++j) { + args.media[j] = (vals[j]); + } } else if (strcmp(i->name, "lang") == 0) { args.lang = (i->value); @@ -2080,6 +2212,8 @@ static void ast_ari_channels_play_with_id_cb( #endif /* AST_DEVMODE */ fin: __attribute__((unused)) + ast_free(args.media_parse); + ast_free(args.media); return; } int ast_ari_channels_record_parse_body( diff --git a/res/res_hep_pjsip.c b/res/res_hep_pjsip.c index effcc85dd..0cc54c237 100644 --- a/res/res_hep_pjsip.c +++ b/res/res_hep_pjsip.c @@ -82,13 +82,35 @@ static pj_status_t logging_on_tx_msg(pjsip_tx_data *tdata) pjsip_cid_hdr *cid_hdr; pjsip_from_hdr *from_hdr; pjsip_to_hdr *to_hdr; + pjsip_tpmgr_fla2_param prm; capture_info = hepv3_create_capture_info(tdata->buf.start, (size_t)(tdata->buf.cur - tdata->buf.start)); if (!capture_info) { return PJ_SUCCESS; } - pj_sockaddr_print(&tdata->tp_info.transport->local_addr, local_buf, sizeof(local_buf), 3); + /* Attempt to determine what IP address will we send this packet out of */ + pjsip_tpmgr_fla2_param_default(&prm); + prm.tp_type = tdata->tp_info.transport->key.type; + pj_strset2(&prm.dst_host, tdata->tp_info.dst_name); + prm.local_if = PJ_TRUE; + + /* If we can't get the local address use what we have already */ + if (pjsip_tpmgr_find_local_addr2(pjsip_endpt_get_tpmgr(ast_sip_get_pjsip_endpoint()), tdata->pool, &prm) != PJ_SUCCESS) { + pj_sockaddr_print(&tdata->tp_info.transport->local_addr, local_buf, sizeof(local_buf), 3); + } else { + if (prm.tp_type & PJSIP_TRANSPORT_IPV6) { + snprintf(local_buf, sizeof(local_buf), "[%.*s]:%hu", + (int)pj_strlen(&prm.ret_addr), + pj_strbuf(&prm.ret_addr), + prm.ret_port); + } else { + snprintf(local_buf, sizeof(local_buf), "%.*s:%hu", + (int)pj_strlen(&prm.ret_addr), + pj_strbuf(&prm.ret_addr), + prm.ret_port); + } + } pj_sockaddr_print(&tdata->tp_info.dst_addr, remote_buf, sizeof(remote_buf), 3); cid_hdr = PJSIP_MSG_CID_HDR(tdata->msg); @@ -120,17 +142,39 @@ static pj_bool_t logging_on_rx_msg(pjsip_rx_data *rdata) char remote_buf[256]; char *uuid; struct hepv3_capture_info *capture_info; + pjsip_tpmgr_fla2_param prm; capture_info = hepv3_create_capture_info(&rdata->pkt_info.packet, rdata->pkt_info.len); if (!capture_info) { return PJ_SUCCESS; } - if (rdata->tp_info.transport->addr_len) { - pj_sockaddr_print(&rdata->tp_info.transport->local_addr, local_buf, sizeof(local_buf), 3); + if (!rdata->pkt_info.src_addr_len) { + return PJ_SUCCESS; } - if (rdata->pkt_info.src_addr_len) { - pj_sockaddr_print(&rdata->pkt_info.src_addr, remote_buf, sizeof(remote_buf), 3); + pj_sockaddr_print(&rdata->pkt_info.src_addr, remote_buf, sizeof(remote_buf), 3); + + /* Attempt to determine what IP address we probably received this packet on */ + pjsip_tpmgr_fla2_param_default(&prm); + prm.tp_type = rdata->tp_info.transport->key.type; + pj_strset2(&prm.dst_host, rdata->pkt_info.src_name); + prm.local_if = PJ_TRUE; + + /* If we can't get the local address use what we have already */ + if (pjsip_tpmgr_find_local_addr2(pjsip_endpt_get_tpmgr(ast_sip_get_pjsip_endpoint()), rdata->tp_info.pool, &prm) != PJ_SUCCESS) { + pj_sockaddr_print(&rdata->tp_info.transport->local_addr, local_buf, sizeof(local_buf), 3); + } else { + if (prm.tp_type & PJSIP_TRANSPORT_IPV6) { + snprintf(local_buf, sizeof(local_buf), "[%.*s]:%hu", + (int)pj_strlen(&prm.ret_addr), + pj_strbuf(&prm.ret_addr), + prm.ret_port); + } else { + snprintf(local_buf, sizeof(local_buf), "%.*s:%hu", + (int)pj_strlen(&prm.ret_addr), + pj_strbuf(&prm.ret_addr), + prm.ret_port); + } } uuid = assign_uuid(&rdata->msg_info.cid->id, &rdata->msg_info.to->tag, &rdata->msg_info.from->tag); diff --git a/res/res_pjsip/config_transport.c b/res/res_pjsip/config_transport.c index 3a5afb6bc..b9208976f 100644 --- a/res/res_pjsip/config_transport.c +++ b/res/res_pjsip/config_transport.c @@ -562,11 +562,17 @@ static int transport_apply(const struct ast_sorcery *sorcery, void *obj) } } else if (transport->type == AST_TRANSPORT_TCP) { pjsip_tcp_transport_cfg cfg; + int option = 1; pjsip_tcp_transport_cfg_default(&cfg, temp_state->state->host.addr.sa_family); cfg.bind_addr = temp_state->state->host; cfg.async_cnt = transport->async_operations; set_qos(transport, &cfg.qos_params); + cfg.sockopt_params.options[0].level = pj_SOL_TCP(); + cfg.sockopt_params.options[0].optname = pj_TCP_NODELAY(); + cfg.sockopt_params.options[0].optval = &option; + cfg.sockopt_params.options[0].optlen = sizeof(option); + cfg.sockopt_params.cnt = 1; for (i = 0; i < BIND_TRIES && res != PJ_SUCCESS; i++) { if (perm_state && perm_state->state && perm_state->state->factory diff --git a/res/res_pjsip_dialog_info_body_generator.c b/res/res_pjsip_dialog_info_body_generator.c index d21af2ae6..5006b9efb 100644 --- a/res/res_pjsip_dialog_info_body_generator.c +++ b/res/res_pjsip_dialog_info_body_generator.c @@ -157,7 +157,7 @@ static int dialog_info_generate_body_content(void *body, void *data) /* The maximum number of times the ast_str() for the body text can grow before we declare an XML body * too large to send. */ -#define MAX_STRING_GROWTHS 3 +#define MAX_STRING_GROWTHS 6 static void dialog_info_to_string(void *body, struct ast_str **str) { diff --git a/res/res_pjsip_exten_state.c b/res/res_pjsip_exten_state.c index 22fb69c29..25b9bf1fe 100644 --- a/res/res_pjsip_exten_state.c +++ b/res/res_pjsip_exten_state.c @@ -647,21 +647,21 @@ static int exten_state_publisher_cb(void *data) publisher = AST_VECTOR_GET(&pub_data->pubs, idx); - uri = ast_sip_publish_client_get_from_uri(publisher->client); + uri = ast_sip_publish_client_get_user_from_uri(publisher->client, pub_data->exten_state_data.exten, + pub_data->exten_state_data.local, sizeof(pub_data->exten_state_data.local)); if (ast_strlen_zero(uri)) { ast_log(LOG_WARNING, "PUBLISH client '%s' has no from_uri or server_uri defined.\n", publisher->name); continue; } - ast_copy_string(pub_data->exten_state_data.local, uri, sizeof(pub_data->exten_state_data.local)); - uri = ast_sip_publish_client_get_to_uri(publisher->client); + uri = ast_sip_publish_client_get_user_to_uri(publisher->client, pub_data->exten_state_data.exten, + pub_data->exten_state_data.remote, sizeof(pub_data->exten_state_data.remote)); if (ast_strlen_zero(uri)) { ast_log(LOG_WARNING, "PUBLISH client '%s' has no to_uri or server_uri defined.\n", publisher->name); continue; } - ast_copy_string(pub_data->exten_state_data.remote, uri, sizeof(pub_data->exten_state_data.remote)); pub_data->exten_state_data.datastores = publisher->datastores; @@ -678,7 +678,7 @@ static int exten_state_publisher_cb(void *data) body.type = publisher->body_type; body.subtype = publisher->body_subtype; body.body_text = ast_str_buffer(body_text); - ast_sip_publish_client_send(publisher->client, &body); + ast_sip_publish_client_user_send(publisher->client, pub_data->exten_state_data.exten, &body); } pjsip_endpt_release_pool(ast_sip_get_pjsip_endpoint(), pool); diff --git a/res/res_pjsip_outbound_publish.c b/res/res_pjsip_outbound_publish.c index 51e8a06be..1c3b0c644 100644 --- a/res/res_pjsip_outbound_publish.c +++ b/res/res_pjsip_outbound_publish.c @@ -27,6 +27,7 @@ #include <pjsip.h> #include <pjsip_simple.h> +#include "asterisk/res_pjproject.h" #include "asterisk/res_pjsip.h" #include "asterisk/res_pjsip_outbound_publish.h" #include "asterisk/module.h" @@ -94,6 +95,10 @@ <literal>pjsip.conf</literal>. As with other <literal>res_pjsip</literal> modules, this will use the first available transport of the appropriate type if unconfigured.</para></note> </description> </configOption> + <configOption name="multi_user" default="no"> + <synopsis>Enable multi-user support</synopsis> + <description><para>When enabled the user portion of the server uri is replaced by a dynamically created user</para></description> + </configOption> <configOption name="type"> <synopsis>Must be of type 'outbound-publish'.</synopsis> </configOption> @@ -102,6 +107,8 @@ </configInfo> ***/ +static int pjsip_max_url_size = PJSIP_MAX_URL_SIZE; + /*! \brief Queued outbound publish message */ struct sip_outbound_publish_message { /*! \brief Optional body */ @@ -112,6 +119,39 @@ struct sip_outbound_publish_message { char body_contents[0]; }; +/* + * A note about some of the object types used in this module: + * + * The reason we currently have 4 separate object types that relate to configuration, + * publishing, state, and client information is due to object lifetimes and order of + * destruction dependencies. + * + * Separation of concerns is a good thing and of course it makes sense to have a + * configuration object type as well as an object type wrapper around pjsip's publishing + * client class. There also may be run time state data that needs to be tracked, so + * again having something to handle that is prudent. However, it may be tempting to think + * "why not combine the state and client object types?" Especially seeing as how they have + * a one-to-one relationship. The answer is, it's possible, but it'd make the code a bit + * more awkward. + * + * Currently this module maintains a global container of current state objects. When this + * states container is replaced, or deleted, it un-references all contained objects. Any + * state with a reference left have probably been carried over from a reload/realtime fetch. + * States not carried over are destructed and the associated client (and all its publishers) + * get unpublished. + * + * This "unpublishing" goes through a careful process of unpublishing the client, all its + * publishers, and making sure all the appropriate references are removed in a sane order. + * This process is essentially kicked off with the destruction of the state. If the state + * and client objects were to be merged, where clients became the globally tracked object + * type, this "unpublishing" process would never start because of the multiple references + * held to the client object over it's lifetime. Meaning the global tracking container + * would remove its reference to the client object when done with it, but other sources + * would still be holding a reference to it (namely the datastore and publisher(s)). + * + * Thus at this time it is easier to keep them separate. + */ + /*! \brief Outbound publish information */ struct ast_sip_outbound_publish { /*! \brief Sorcery object details */ @@ -137,30 +177,47 @@ struct ast_sip_outbound_publish { unsigned int max_auth_attempts; /*! \brief Configured authentication credentials */ struct ast_sip_auth_vector outbound_auths; + /*! \brief The publishing client is used for multiple users when true */ + unsigned int multi_user; }; -/*! \brief Outbound publish client state information (persists for lifetime that publish should exist) */ -struct ast_sip_outbound_publish_client { +struct sip_outbound_publisher { + /*! \brief The client object that 'owns' this client + + \note any potential circular reference problems are accounted + for (see publisher alloc for more information) + */ + struct ast_sip_outbound_publish_client *owner; /*! \brief Underlying publish client */ pjsip_publishc *client; + /*! \brief The From URI for this specific publisher */ + char *from_uri; + /*! \brief The To URI for this specific publisher */ + char *to_uri; /*! \brief Timer entry for refreshing publish */ pj_timer_entry timer; - /*! \brief Publisher datastores set up by handlers */ - struct ao2_container *datastores; /*! \brief The number of auth attempts done */ unsigned int auth_attempts; /*! \brief Queue of outgoing publish messages to send*/ AST_LIST_HEAD_NOLOCK(, sip_outbound_publish_message) queue; /*! \brief The message currently being sent */ struct sip_outbound_publish_message *sending; - /*! \brief Publish client has been fully started and event type informed */ - unsigned int started; /*! \brief Publish client should be destroyed */ unsigned int destroy; + /*! \brief User, if any, associated with the publisher */ + char user[0]; +}; + +/*! \brief Outbound publish client state information (persists for lifetime of a publish) */ +struct ast_sip_outbound_publish_client { /*! \brief Outbound publish information */ struct ast_sip_outbound_publish *publish; - /*! \brief The name of the transport to be used for the publish */ - char *transport_name; + /*! \brief Publisher datastores set up by handlers */ + struct ao2_container *datastores; + /*! \brief Container of all the client publishing objects */ + struct ao2_container *publishers; + /*! \brief Publishing has been fully started and event type informed */ + unsigned int started; }; /*! \brief Outbound publish state information (persists for lifetime of a publish) */ @@ -171,6 +228,20 @@ struct ast_sip_outbound_publish_state { char id[0]; }; +/*! + * \brief Used for locking while loading/reloading + * + * Mutli-user configurations make it so publishers can be dynamically added and + * removed. Publishers should not be added or removed during a [re]load since + * it could cause the current_clients container to be out of sync. Thus the + * reason for this lock. + */ +AST_RWLOCK_DEFINE_STATIC(load_lock); + +#define DEFAULT_PUBLISHER_BUCKETS 119 +AO2_STRING_FIELD_HASH_FN(sip_outbound_publisher, user); +AO2_STRING_FIELD_CMP_FN(sip_outbound_publisher, user); + /*! \brief Unloading data */ struct unloading_data { int is_unloading; @@ -238,6 +309,7 @@ static int outbound_publish_state_cmp(void *obj, void *arg, int flags) static struct ao2_container *get_publishes_and_update_state(void) { struct ao2_container *container; + SCOPED_WRLOCK(lock, &load_lock); container = ast_sorcery_retrieve_by_fields( ast_sip_get_sorcery(), "outbound-publish", @@ -274,22 +346,22 @@ static struct ast_sip_event_publisher_handler *find_publisher_handler_for_event_ return iter; } -/*! \brief Helper function which cancels the refresh timer on a client */ -static void cancel_publish_refresh(struct ast_sip_outbound_publish_client *client) +/*! \brief Helper function which cancels the refresh timer on a publisher */ +static void cancel_publish_refresh(struct sip_outbound_publisher *publisher) { - if (pj_timer_heap_cancel(pjsip_endpt_get_timer_heap(ast_sip_get_pjsip_endpoint()), &client->timer)) { - /* The timer was successfully cancelled, drop the refcount of the client */ - ao2_ref(client, -1); + if (pj_timer_heap_cancel(pjsip_endpt_get_timer_heap(ast_sip_get_pjsip_endpoint()), &publisher->timer)) { + /* The timer was successfully cancelled, drop the refcount of the publisher */ + ao2_ref(publisher, -1); } } /*! \brief Helper function which sets up the timer to send publication */ -static void schedule_publish_refresh(struct ast_sip_outbound_publish_client *client, int expiration) +static void schedule_publish_refresh(struct sip_outbound_publisher *publisher, int expiration) { - struct ast_sip_outbound_publish *publish = ao2_bump(client->publish); + struct ast_sip_outbound_publish *publish = ao2_bump(publisher->owner->publish); pj_time_val delay = { .sec = 0, }; - cancel_publish_refresh(client); + cancel_publish_refresh(publisher); if (expiration > 0) { delay.sec = expiration - PJSIP_PUBLISHC_DELAY_BEFORE_REFRESH; @@ -301,61 +373,83 @@ static void schedule_publish_refresh(struct ast_sip_outbound_publish_client *cli delay.sec = PJSIP_PUBLISHC_DELAY_BEFORE_REFRESH; } - ao2_ref(client, +1); - if (pjsip_endpt_schedule_timer(ast_sip_get_pjsip_endpoint(), &client->timer, &delay) != PJ_SUCCESS) { + ao2_ref(publisher, +1); + if (pjsip_endpt_schedule_timer(ast_sip_get_pjsip_endpoint(), &publisher->timer, &delay) != PJ_SUCCESS) { ast_log(LOG_WARNING, "Failed to pass timed publish refresh to scheduler\n"); - ao2_ref(client, -1); + ao2_ref(publisher, -1); } ao2_ref(publish, -1); } +static int publisher_client_send(void *obj, void *arg, void *data, int flags); + /*! \brief Publish client timer callback function */ static void sip_outbound_publish_timer_cb(pj_timer_heap_t *timer_heap, struct pj_timer_entry *entry) { - struct ast_sip_outbound_publish_client *client = entry->user_data; + struct sip_outbound_publisher *publisher = entry->user_data; - ao2_lock(client); - if (AST_LIST_EMPTY(&client->queue)) { + ao2_lock(publisher); + if (AST_LIST_EMPTY(&publisher->queue)) { + int res; /* If there are no outstanding messages send an empty PUBLISH message so our publication doesn't expire */ - ast_sip_publish_client_send(client, NULL); + publisher_client_send(publisher, NULL, &res, 0); } - ao2_unlock(client); + ao2_unlock(publisher); - ao2_ref(client, -1); + ao2_ref(publisher, -1); } /*! \brief Task for cancelling a refresh timer */ static int cancel_refresh_timer_task(void *data) { - struct ast_sip_outbound_publish_client *client = data; + struct sip_outbound_publisher *publisher = data; - cancel_publish_refresh(client); - ao2_ref(client, -1); + cancel_publish_refresh(publisher); + ao2_ref(publisher, -1); return 0; } +static void set_transport(struct sip_outbound_publisher *publisher, pjsip_tx_data *tdata) +{ + if (!ast_strlen_zero(publisher->owner->publish->transport)) { + pjsip_tpselector selector = { .type = PJSIP_TPSELECTOR_NONE, }; + ast_sip_set_tpselector_from_transport_name( + publisher->owner->publish->transport, &selector); + pjsip_tx_data_set_transport(tdata, &selector); + } +} + /*! \brief Task for sending an unpublish */ static int send_unpublish_task(void *data) { - struct ast_sip_outbound_publish_client *client = data; + struct sip_outbound_publisher *publisher = data; pjsip_tx_data *tdata; - if (pjsip_publishc_unpublish(client->client, &tdata) == PJ_SUCCESS) { - if (!ast_strlen_zero(client->transport_name)) { - pjsip_tpselector selector = { .type = PJSIP_TPSELECTOR_NONE, }; - ast_sip_set_tpselector_from_transport_name(client->transport_name, &selector); - pjsip_tx_data_set_transport(tdata, &selector); - } - - pjsip_publishc_send(client->client, tdata); + if (pjsip_publishc_unpublish(publisher->client, &tdata) == PJ_SUCCESS) { + set_transport(publisher, tdata); + pjsip_publishc_send(publisher->client, tdata); } - ao2_ref(client, -1); + ao2_ref(publisher, -1); return 0; } +static void stop_publishing(struct ast_sip_outbound_publish_client *client, + struct ast_sip_event_publisher_handler *handler) +{ + if (!handler) { + handler = find_publisher_handler_for_event_name(client->publish->event); + } + + if (handler) { + handler->stop_publishing(client); + } +} + +static int cancel_and_unpublish(void *obj, void *arg, int flags); + /*! \brief Helper function which starts or stops publish clients when applicable */ static void sip_outbound_publish_synchronize(struct ast_sip_event_publisher_handler *removed) { @@ -389,15 +483,10 @@ static void sip_outbound_publish_synchronize(struct ast_sip_event_publisher_hand } else { state->client->started = 1; } - } else if (!handler && removed && !strcmp(publish->event, removed->event_name)) { - /* If the publisher client has been started but it is going away stop it */ - removed->stop_publishing(state->client); + } else if (state->client->started && !handler && removed && !strcmp(publish->event, removed->event_name)) { + stop_publishing(state->client, removed); + ao2_callback(state->client->publishers, OBJ_NODATA, cancel_and_unpublish, NULL); state->client->started = 0; - if (ast_sip_push_task(NULL, cancel_refresh_timer_task, ao2_bump(state->client))) { - ast_log(LOG_WARNING, "Could not stop refresh timer on client '%s'\n", - ast_sorcery_object_get_id(publish)); - ao2_ref(state->client, -1); - } } ao2_ref(publish, -1); ao2_ref(state, -1); @@ -440,6 +529,49 @@ const char *ast_sip_publish_client_get_from_uri(struct ast_sip_outbound_publish_ return S_OR(publish->from_uri, S_OR(publish->server_uri, "")); } +static struct sip_outbound_publisher *sip_outbound_publish_client_add_publisher( + struct ast_sip_outbound_publish_client *client, const char *user); + +static struct sip_outbound_publisher *sip_outbound_publish_client_get_publisher( + struct ast_sip_outbound_publish_client *client, const char *user) +{ + struct sip_outbound_publisher *publisher; + + /* + * Lock before searching since there could be a race between searching and adding. + * Just use the load_lock since we might need to lock it anyway (if adding) and + * also it simplifies the code (otherwise we'd have to lock the publishers, no- + * lock the search and pass a flag to 'add publisher to no-lock the potential link). + */ + ast_rwlock_wrlock(&load_lock); + publisher = ao2_find(client->publishers, user, OBJ_SEARCH_KEY); + if (!publisher) { + if (!(publisher = sip_outbound_publish_client_add_publisher(client, user))) { + ast_rwlock_unlock(&load_lock); + return NULL; + } + } + ast_rwlock_unlock(&load_lock); + + return publisher; +} + +const char *ast_sip_publish_client_get_user_from_uri(struct ast_sip_outbound_publish_client *client, const char *user, + char *uri, size_t size) +{ + struct sip_outbound_publisher *publisher; + + publisher = sip_outbound_publish_client_get_publisher(client, user); + if (!publisher) { + return NULL; + } + + ast_copy_string(uri, publisher->from_uri, size); + ao2_ref(publisher, -1); + + return uri; +} + const char *ast_sip_publish_client_get_to_uri(struct ast_sip_outbound_publish_client *client) { struct ast_sip_outbound_publish *publish = client->publish; @@ -447,6 +579,22 @@ const char *ast_sip_publish_client_get_to_uri(struct ast_sip_outbound_publish_cl return S_OR(publish->to_uri, S_OR(publish->server_uri, "")); } +const char *ast_sip_publish_client_get_user_to_uri(struct ast_sip_outbound_publish_client *client, const char *user, + char *uri, size_t size) +{ + struct sip_outbound_publisher *publisher; + + publisher = sip_outbound_publish_client_get_publisher(client, user); + if (!publisher) { + return NULL; + } + + ast_copy_string(uri, publisher->to_uri, size); + ao2_ref(publisher, -1); + + return uri; +} + int ast_sip_register_event_publisher_handler(struct ast_sip_event_publisher_handler *handler) { struct ast_sip_event_publisher_handler *existing; @@ -583,19 +731,19 @@ void ast_sip_publish_client_remove_datastore(struct ast_sip_outbound_publish_cli ao2_find(client->datastores, name, OBJ_SEARCH_KEY | OBJ_UNLINK | OBJ_NODATA); } -static int sip_publish_client_service_queue(void *data) +static int sip_publisher_service_queue(void *data) { - RAII_VAR(struct ast_sip_outbound_publish_client *, client, data, ao2_cleanup); - SCOPED_AO2LOCK(lock, client); + RAII_VAR(struct sip_outbound_publisher *, publisher, data, ao2_cleanup); + SCOPED_AO2LOCK(lock, publisher); struct sip_outbound_publish_message *message; pjsip_tx_data *tdata; pj_status_t status; - if (client->destroy || client->sending || !(message = AST_LIST_FIRST(&client->queue))) { + if (publisher->destroy || publisher->sending || !(message = AST_LIST_FIRST(&publisher->queue))) { return 0; } - if (pjsip_publishc_publish(client->client, PJ_FALSE, &tdata) != PJ_SUCCESS) { + if (pjsip_publishc_publish(publisher->client, PJ_FALSE, &tdata) != PJ_SUCCESS) { goto fatal; } @@ -605,13 +753,9 @@ static int sip_publish_client_service_queue(void *data) goto fatal; } - if (!ast_strlen_zero(client->transport_name)) { - pjsip_tpselector selector = { .type = PJSIP_TPSELECTOR_NONE, }; - ast_sip_set_tpselector_from_transport_name(client->transport_name, &selector); - pjsip_tx_data_set_transport(tdata, &selector); - } + set_transport(publisher, tdata); - status = pjsip_publishc_send(client->client, tdata); + status = pjsip_publishc_send(publisher->client, tdata); if (status == PJ_EBUSY) { /* We attempted to send the message but something else got there first */ goto service; @@ -619,30 +763,31 @@ static int sip_publish_client_service_queue(void *data) goto fatal; } - client->sending = message; + publisher->sending = message; return 0; fatal: - AST_LIST_REMOVE_HEAD(&client->queue, entry); + AST_LIST_REMOVE_HEAD(&publisher->queue, entry); ast_free(message); service: - if (ast_sip_push_task(NULL, sip_publish_client_service_queue, ao2_bump(client))) { - ao2_ref(client, -1); + if (ast_sip_push_task(NULL, sip_publisher_service_queue, ao2_bump(publisher))) { + ao2_ref(publisher, -1); } return -1; } -int ast_sip_publish_client_send(struct ast_sip_outbound_publish_client *client, - const struct ast_sip_body *body) +static int publisher_client_send(void *obj, void *arg, void *data, int flags) { - SCOPED_AO2LOCK(lock, client); + struct sip_outbound_publisher *publisher = obj; + const struct ast_sip_body *body = arg; struct sip_outbound_publish_message *message; size_t type_len = 0, subtype_len = 0, body_text_len = 0; - int res; + int *res = data; - if (!client->client) { + *res = -1; + if (!publisher->client) { return -1; } @@ -668,31 +813,204 @@ int ast_sip_publish_client_send(struct ast_sip_outbound_publish_client *client, message->body.body_text = strcpy(dst, body->body_text); } - AST_LIST_INSERT_TAIL(&client->queue, message, entry); + AST_LIST_INSERT_TAIL(&publisher->queue, message, entry); - res = ast_sip_push_task(NULL, sip_publish_client_service_queue, ao2_bump(client)); - if (res) { - ao2_ref(client, -1); + *res = ast_sip_push_task(NULL, sip_publisher_service_queue, ao2_bump(publisher)); + if (*res) { + ao2_ref(publisher, -1); } + return *res; +} + +int ast_sip_publish_client_send(struct ast_sip_outbound_publish_client *client, + const struct ast_sip_body *body) +{ + SCOPED_AO2LOCK(lock, client); + int res = 0; + + ao2_callback_data(client->publishers, OBJ_NODATA, + publisher_client_send, (void *)body, &res); return res; } +static int sip_outbound_publisher_set_uri( + pj_pool_t *pool, const char *uri, const char *user, pj_str_t *res_uri) +{ + pj_str_t tmp; + pjsip_uri *parsed; + pjsip_sip_uri *parsed_uri; + int size; + + pj_strdup2_with_null(pool, &tmp, uri); + if (!(parsed = pjsip_parse_uri(pool, tmp.ptr, tmp.slen, 0))) { + return -1; + } + + if (!(parsed_uri = pjsip_uri_get_uri(parsed))) { + return -1; + } + + if (!ast_strlen_zero(user)) { + pj_strdup2(pool, &parsed_uri->user, user); + } + + res_uri->ptr = (char*) pj_pool_alloc(pool, pjsip_max_url_size); + if (!res_uri->ptr) { + return -1; + } + + if ((size = pjsip_uri_print(PJSIP_URI_IN_OTHER, parsed_uri, res_uri->ptr, + pjsip_max_url_size - 1)) <= 0) { + return -1; + } + res_uri->ptr[size] = '\0'; + res_uri->slen = size; + + return 0; +} + +static int sip_outbound_publisher_set_uris( + pj_pool_t *pool, struct sip_outbound_publisher *publisher, + pj_str_t *server_uri, pj_str_t *to_uri, pj_str_t *from_uri) +{ + struct ast_sip_outbound_publish *publish = publisher->owner->publish; + + if (sip_outbound_publisher_set_uri(pool, publish->server_uri, publisher->user, server_uri)) { + ast_log(LOG_ERROR, "Invalid server URI '%s' specified on outbound publish '%s'\n", + publish->server_uri, ast_sorcery_object_get_id(publish)); + return -1; + } + + if (ast_strlen_zero(publish->to_uri)) { + to_uri->ptr = server_uri->ptr; + to_uri->slen = server_uri->slen; + } else if (sip_outbound_publisher_set_uri(pool, publish->to_uri, publisher->user, to_uri)) { + ast_log(LOG_ERROR, "Invalid to URI '%s' specified on outbound publish '%s'\n", + publish->to_uri, ast_sorcery_object_get_id(publish)); + return -1; + } + + publisher->to_uri = ast_strdup(to_uri->ptr); + if (!publisher->to_uri) { + return -1; + } + + if (ast_strlen_zero(publish->from_uri)) { + from_uri->ptr = server_uri->ptr; + from_uri->slen = server_uri->slen; + } else if (sip_outbound_publisher_set_uri(pool, publish->from_uri, publisher->user, from_uri)) { + ast_log(LOG_ERROR, "Invalid from URI '%s' specified on outbound publish '%s'\n", + publish->from_uri, ast_sorcery_object_get_id(publish)); + return -1; + } + + publisher->from_uri = ast_strdup(from_uri->ptr); + if (!publisher->from_uri) { + return -1; + } + + return 0; +} + +static void sip_outbound_publish_callback(struct pjsip_publishc_cbparam *param); + +/*! \brief Helper function that allocates a pjsip publish client and configures it */ +static int sip_outbound_publisher_init(void *data) +{ + struct sip_outbound_publisher *publisher = data; + RAII_VAR(struct ast_sip_outbound_publish *, publish, NULL, ao2_cleanup); + pjsip_publishc_opt opt = { + .queue_request = PJ_FALSE, + }; + pj_pool_t *pool; + pj_str_t event, server_uri, to_uri, from_uri; + + if (publisher->client) { + return 0; + } + + if (pjsip_publishc_create(ast_sip_get_pjsip_endpoint(), &opt, + ao2_bump(publisher), sip_outbound_publish_callback, + &publisher->client) != PJ_SUCCESS) { + ao2_ref(publisher, -1); + return -1; + } + + publish = ao2_bump(publisher->owner->publish); + + if (!ast_strlen_zero(publish->outbound_proxy)) { + pjsip_route_hdr route_set, *route; + static const pj_str_t ROUTE_HNAME = { "Route", 5 }; + + pj_list_init(&route_set); + + if (!(route = pjsip_parse_hdr(pjsip_publishc_get_pool(publisher->client), &ROUTE_HNAME, + (char*)publish->outbound_proxy, strlen(publish->outbound_proxy), NULL))) { + pjsip_publishc_destroy(publisher->client); + return -1; + } + pj_list_insert_nodes_before(&route_set, route); + + pjsip_publishc_set_route_set(publisher->client, &route_set); + } + + pool = pjsip_endpt_create_pool(ast_sip_get_pjsip_endpoint(), "URI Validation", + pjsip_max_url_size, pjsip_max_url_size); + if (!pool) { + ast_log(LOG_ERROR, "Could not create pool for URI validation on outbound publish '%s'\n", + ast_sorcery_object_get_id(publish)); + pjsip_publishc_destroy(publisher->client); + return -1; + } + + if (sip_outbound_publisher_set_uris(pool, publisher, &server_uri, &from_uri, &to_uri)) { + pjsip_endpt_release_pool(ast_sip_get_pjsip_endpoint(), pool); + pjsip_publishc_destroy(publisher->client); + return -1; + } + + pj_cstr(&event, publish->event); + if (pjsip_publishc_init(publisher->client, &event, &server_uri, &from_uri, &to_uri, + publish->expiration != PJ_SUCCESS)) { + ast_log(LOG_ERROR, "Failed to initialize publishing client on outbound publish '%s'\n", + ast_sorcery_object_get_id(publish)); + pjsip_endpt_release_pool(ast_sip_get_pjsip_endpoint(), pool); + pjsip_publishc_destroy(publisher->client); + return -1; + } + + pjsip_endpt_release_pool(ast_sip_get_pjsip_endpoint(), pool); + return 0; +} + +static int sip_outbound_publisher_reinit(void *obj, void *arg, int flags) +{ + return sip_outbound_publisher_init(obj); +} + +static int sip_outbound_publisher_reinit_all(void *data) +{ + ao2_callback(data, OBJ_NODATA, sip_outbound_publisher_reinit, NULL); + return 0; +} + /*! \brief Destructor function for publish client */ -static void sip_outbound_publish_client_destroy(void *obj) +static void sip_outbound_publisher_destroy(void *obj) { - struct ast_sip_outbound_publish_client *client = obj; + struct sip_outbound_publisher *publisher = obj; struct sip_outbound_publish_message *message; /* You might be tempted to think "the publish client isn't being destroyed" but it actually is - just elsewhere */ - while ((message = AST_LIST_REMOVE_HEAD(&client->queue, entry))) { + while ((message = AST_LIST_REMOVE_HEAD(&publisher->queue, entry))) { ast_free(message); } - ao2_cleanup(client->datastores); - ao2_cleanup(client->publish); - ast_free(client->transport_name); + ao2_cleanup(publisher->owner); + + ast_free(publisher->from_uri); + ast_free(publisher->to_uri); /* if unloading the module and all objects have been unpublished send the signal to finish unloading */ @@ -705,67 +1023,184 @@ static void sip_outbound_publish_client_destroy(void *obj) } } +static struct sip_outbound_publisher *sip_outbound_publisher_alloc( + struct ast_sip_outbound_publish_client *client, const char *user) +{ + struct sip_outbound_publisher *publisher; + + publisher = ao2_alloc(sizeof(*publisher) + (user ? strlen(user) : 0) + 1, + sip_outbound_publisher_destroy); + if (!publisher) { + return NULL; + } + + /* + * Bump the ref to the client. This essentially creates a circular reference, + * but it is needed in order to make sure the client object doesn't get pulled + * out from under us when the publisher stops publishing. + * + * The circular reference is alleviated by calling cancel_and_unpublish for + * each client, from the state's destructor. By calling it there all references + * to the publishers should go to zero, thus calling the publisher's destructor. + * This in turn removes the client reference we added here. The state then removes + * its reference to the client, which should take it to zero. + */ + publisher->owner = ao2_bump(client); + publisher->timer.user_data = publisher; + publisher->timer.cb = sip_outbound_publish_timer_cb; + if (user) { + strcpy(publisher->user, user); + } else { + *publisher->user = '\0'; + } + + if (ast_sip_push_task_synchronous(NULL, sip_outbound_publisher_init, publisher)) { + ast_log(LOG_ERROR, "Unable to create publisher for outbound publish '%s'\n", + ast_sorcery_object_get_id(client->publish)); + ao2_ref(publisher, -1); + return NULL; + } + + return publisher; +} + +static struct sip_outbound_publisher *sip_outbound_publish_client_add_publisher( + struct ast_sip_outbound_publish_client *client, const char *user) +{ + struct sip_outbound_publisher *publisher = + sip_outbound_publisher_alloc(client, user); + + if (!publisher) { + return NULL; + } + + if (!ao2_link(client->publishers, publisher)) { + /* + * No need to bump the reference here. The task will take care of + * removing the reference. + */ + if (ast_sip_push_task(NULL, cancel_refresh_timer_task, publisher)) { + ao2_ref(publisher, -1); + } + return NULL; + } + + return publisher; +} + +int ast_sip_publish_client_user_send(struct ast_sip_outbound_publish_client *client, + const char *user, const struct ast_sip_body *body) +{ + struct sip_outbound_publisher *publisher; + int res; + + publisher = sip_outbound_publish_client_get_publisher(client, user); + if (!publisher) { + return -1; + } + + publisher_client_send(publisher, (void *)body, &res, 0); + ao2_ref(publisher, -1); + return res; +} + +void ast_sip_publish_client_remove(struct ast_sip_outbound_publish_client *client, + const char *user) +{ + SCOPED_WRLOCK(lock, &load_lock); + ao2_find(client->publishers, user, OBJ_SEARCH_KEY | OBJ_UNLINK | OBJ_NODATA); +} + static int explicit_publish_destroy(void *data) { - struct ast_sip_outbound_publish_client *client = data; + struct sip_outbound_publisher *publisher = data; /* * If there is no pjsip publishing client then we obviously don't need * to destroy it. Also, the ref for the Asterisk publishing client that * pjsip had would not exist or should already be gone as well. */ - if (client->client) { - pjsip_publishc_destroy(client->client); - ao2_ref(client, -1); + if (publisher->client) { + pjsip_publishc_destroy(publisher->client); + ao2_ref(publisher, -1); } return 0; } /*! \brief Helper function which cancels and un-publishes a no longer used client */ -static int cancel_and_unpublish(struct ast_sip_outbound_publish_client *client) +static int cancel_and_unpublish(void *obj, void *arg, int flags) { - struct ast_sip_event_publisher_handler *handler; - SCOPED_AO2LOCK(lock, client); + struct sip_outbound_publisher *publisher = obj; + struct ast_sip_outbound_publish_client *client = publisher->owner; + + SCOPED_AO2LOCK(lock, publisher); if (!client->started) { - /* If the client was never started, there's nothing to unpublish, so just - * destroy the publication and remove its reference to the client. + /* If the publisher was never started, there's nothing to unpublish, so just + * destroy the publication and remove its reference to the publisher. */ - ast_sip_push_task(NULL, explicit_publish_destroy, client); + ast_sip_push_task(NULL, explicit_publish_destroy, publisher); return 0; } - handler = find_publisher_handler_for_event_name(client->publish->event); - if (handler) { - handler->stop_publishing(client); - } - - client->started = 0; - if (ast_sip_push_task(NULL, cancel_refresh_timer_task, ao2_bump(client))) { + if (ast_sip_push_task(NULL, cancel_refresh_timer_task, ao2_bump(publisher))) { ast_log(LOG_WARNING, "Could not stop refresh timer on outbound publish '%s'\n", ast_sorcery_object_get_id(client->publish)); - ao2_ref(client, -1); + ao2_ref(publisher, -1); } /* If nothing is being sent right now send the unpublish - the destroy will happen in the subsequent callback */ - if (!client->sending) { - if (ast_sip_push_task(NULL, send_unpublish_task, ao2_bump(client))) { + if (!publisher->sending) { + if (ast_sip_push_task(NULL, send_unpublish_task, ao2_bump(publisher))) { ast_log(LOG_WARNING, "Could not send unpublish message on outbound publish '%s'\n", ast_sorcery_object_get_id(client->publish)); - ao2_ref(client, -1); + ao2_ref(publisher, -1); } } - client->destroy = 1; + publisher->destroy = 1; return 0; } +/*! \brief Destructor function for publish client */ +static void sip_outbound_publish_client_destroy(void *obj) +{ + struct ast_sip_outbound_publish_client *client = obj; + + ao2_cleanup(client->datastores); + + /* + * The client's publishers have already been unpublished and destroyed + * by this point, so it is safe to finally remove the reference to the + * publish object. The client needed to hold a reference to it until + * the publishers were done with it. + */ + ao2_cleanup(client->publish); +} + /*! \brief Destructor function for publish state */ static void sip_outbound_publish_state_destroy(void *obj) { struct ast_sip_outbound_publish_state *state = obj; - cancel_and_unpublish(state->client); + stop_publishing(state->client, NULL); + /* + * Since the state is being destroyed the associated client needs to also + * be destroyed. However simply removing the reference to the client will + * not initiate client destruction since the client's publisher(s) hold a + * reference to the client object as well. So we need to unpublish the + * the client's publishers here, which will remove the publisher's client + * reference during that process. + * + * That being said we don't want to remove the client's reference to the + * publish object just yet. We'll hold off on that until client destruction + * itself. This is because the publishers need access to the client's + * publish object while they are unpublishing. + */ + ao2_callback(state->client->publishers, OBJ_NODATA | OBJ_UNLINK, cancel_and_unpublish, NULL); + ao2_cleanup(state->client->publishers); + + state->client->started = 0; ao2_cleanup(state->client); } @@ -799,126 +1234,31 @@ static int can_reuse_publish(struct ast_sip_outbound_publish *existing, struct a return 1; } -static void sip_outbound_publish_callback(struct pjsip_publishc_cbparam *param); - -/*! \brief Helper function that allocates a pjsip publish client and configures it */ -static int sip_outbound_publish_client_alloc(void *data) -{ - struct ast_sip_outbound_publish_client *client = data; - RAII_VAR(struct ast_sip_outbound_publish *, publish, NULL, ao2_cleanup); - pjsip_publishc_opt opt = { - .queue_request = PJ_FALSE, - }; - pj_str_t event, server_uri, to_uri, from_uri; - pj_status_t status; - - if (client->client) { - return 0; - } else if (pjsip_publishc_create(ast_sip_get_pjsip_endpoint(), &opt, ao2_bump(client), sip_outbound_publish_callback, - &client->client) != PJ_SUCCESS) { - ao2_ref(client, -1); - return -1; - } - - publish = ao2_bump(client->publish); - - if (!ast_strlen_zero(publish->outbound_proxy)) { - pjsip_route_hdr route_set, *route; - static const pj_str_t ROUTE_HNAME = { "Route", 5 }; - - pj_list_init(&route_set); - - if (!(route = pjsip_parse_hdr(pjsip_publishc_get_pool(client->client), &ROUTE_HNAME, - (char*)publish->outbound_proxy, strlen(publish->outbound_proxy), NULL))) { - pjsip_publishc_destroy(client->client); - return -1; - } - pj_list_insert_nodes_before(&route_set, route); - - pjsip_publishc_set_route_set(client->client, &route_set); - } - - pj_cstr(&event, publish->event); - pj_cstr(&server_uri, publish->server_uri); - pj_cstr(&to_uri, S_OR(publish->to_uri, publish->server_uri)); - pj_cstr(&from_uri, S_OR(publish->from_uri, publish->server_uri)); - - status = pjsip_publishc_init(client->client, &event, &server_uri, &from_uri, &to_uri, - publish->expiration); - if (status == PJSIP_EINVALIDURI) { - pj_pool_t *pool; - pj_str_t tmp; - pjsip_uri *uri; - - pool = pjsip_endpt_create_pool(ast_sip_get_pjsip_endpoint(), "URI Validation", 256, 256); - if (!pool) { - ast_log(LOG_ERROR, "Could not create pool for URI validation on outbound publish '%s'\n", - ast_sorcery_object_get_id(publish)); - pjsip_publishc_destroy(client->client); - return -1; - } - - pj_strdup2_with_null(pool, &tmp, publish->server_uri); - uri = pjsip_parse_uri(pool, tmp.ptr, tmp.slen, 0); - if (!uri) { - ast_log(LOG_ERROR, "Invalid server URI '%s' specified on outbound publish '%s'\n", - publish->server_uri, ast_sorcery_object_get_id(publish)); - } - - if (!ast_strlen_zero(publish->to_uri)) { - pj_strdup2_with_null(pool, &tmp, publish->to_uri); - uri = pjsip_parse_uri(pool, tmp.ptr, tmp.slen, 0); - if (!uri) { - ast_log(LOG_ERROR, "Invalid to URI '%s' specified on outbound publish '%s'\n", - publish->to_uri, ast_sorcery_object_get_id(publish)); - } - } - - if (!ast_strlen_zero(publish->from_uri)) { - pj_strdup2_with_null(pool, &tmp, publish->from_uri); - uri = pjsip_parse_uri(pool, tmp.ptr, tmp.slen, 0); - if (!uri) { - ast_log(LOG_ERROR, "Invalid from URI '%s' specified on outbound publish '%s'\n", - publish->from_uri, ast_sorcery_object_get_id(publish)); - } - } - - pjsip_endpt_release_pool(ast_sip_get_pjsip_endpoint(), pool); - pjsip_publishc_destroy(client->client); - return -1; - } else if (status != PJ_SUCCESS) { - pjsip_publishc_destroy(client->client); - return -1; - } - - return 0; -} - /*! \brief Callback function for publish client responses */ static void sip_outbound_publish_callback(struct pjsip_publishc_cbparam *param) { #define DESTROY_CLIENT() do { \ - pjsip_publishc_destroy(client->client); \ - client->client = NULL; \ - ao2_ref(client, -1); } while (0) + pjsip_publishc_destroy(publisher->client); \ + publisher->client = NULL; \ + ao2_ref(publisher, -1); } while (0) - RAII_VAR(struct ast_sip_outbound_publish_client *, client, ao2_bump(param->token), ao2_cleanup); - RAII_VAR(struct ast_sip_outbound_publish *, publish, ao2_bump(client->publish), ao2_cleanup); - SCOPED_AO2LOCK(lock, client); + RAII_VAR(struct sip_outbound_publisher *, publisher, ao2_bump(param->token), ao2_cleanup); + RAII_VAR(struct ast_sip_outbound_publish *, publish, ao2_bump(publisher->owner->publish), ao2_cleanup); + SCOPED_AO2LOCK(lock, publisher); pjsip_tx_data *tdata; - if (client->destroy) { - if (client->sending) { - client->sending = NULL; + if (publisher->destroy) { + if (publisher->sending) { + publisher->sending = NULL; - if (!ast_sip_push_task(NULL, send_unpublish_task, ao2_bump(client))) { + if (!ast_sip_push_task(NULL, send_unpublish_task, ao2_bump(publisher))) { return; } ast_log(LOG_WARNING, "Could not send unpublish message on outbound publish '%s'\n", ast_sorcery_object_get_id(publish)); - ao2_ref(client, -1); + ao2_ref(publisher, -1); } - /* Once the destroy is called this callback will not get called any longer, so drop the client ref */ + /* Once the destroy is called this callback will not get called any longer, so drop the publisher ref */ DESTROY_CLIENT(); return; } @@ -928,16 +1268,12 @@ static void sip_outbound_publish_callback(struct pjsip_publishc_cbparam *param) if (!ast_sip_create_request_with_auth(&publish->outbound_auths, param->rdata, tsx->last_tx, &tdata)) { - if (!ast_strlen_zero(client->transport_name)) { - pjsip_tpselector selector = { .type = PJSIP_TPSELECTOR_NONE, }; - ast_sip_set_tpselector_from_transport_name(client->transport_name, &selector); - pjsip_tx_data_set_transport(tdata, &selector); - } - pjsip_publishc_send(client->client, tdata); + set_transport(publisher, tdata); + pjsip_publishc_send(publisher->client, tdata); } - client->auth_attempts++; + publisher->auth_attempts++; - if (client->auth_attempts == publish->max_auth_attempts) { + if (publisher->auth_attempts == publish->max_auth_attempts) { DESTROY_CLIENT(); ast_log(LOG_ERROR, "Reached maximum number of PUBLISH authentication attempts on outbound publish '%s'\n", ast_sorcery_object_get_id(publish)); @@ -947,18 +1283,18 @@ static void sip_outbound_publish_callback(struct pjsip_publishc_cbparam *param) return; } - client->auth_attempts = 0; + publisher->auth_attempts = 0; if (param->code == 412) { DESTROY_CLIENT(); - if (sip_outbound_publish_client_alloc(client)) { + if (sip_outbound_publisher_init(publisher)) { ast_log(LOG_ERROR, "Failed to create a new outbound publish client for '%s' on 412 response\n", ast_sorcery_object_get_id(publish)); goto end; } /* Setting this to NULL will cause a new PUBLISH to get created and sent for the same underlying body */ - client->sending = NULL; + publisher->sending = NULL; } else if (param->code == 423) { /* Update the expiration with the new expiration time if available */ pjsip_expires_hdr *expires; @@ -971,33 +1307,33 @@ static void sip_outbound_publish_callback(struct pjsip_publishc_cbparam *param) goto end; } - pjsip_publishc_update_expires(client->client, expires->ivalue); - client->sending = NULL; - } else if (client->sending) { + pjsip_publishc_update_expires(publisher->client, expires->ivalue); + publisher->sending = NULL; + } else if (publisher->sending) { /* Remove the message currently being sent so that when the queue is serviced another will get sent */ - AST_LIST_REMOVE_HEAD(&client->queue, entry); - ast_free(client->sending); - client->sending = NULL; + AST_LIST_REMOVE_HEAD(&publisher->queue, entry); + ast_free(publisher->sending); + publisher->sending = NULL; if (!param->rdata) { ast_log(LOG_NOTICE, "No response received for outbound publish '%s'\n", ast_sorcery_object_get_id(publish)); } } - if (AST_LIST_EMPTY(&client->queue)) { - schedule_publish_refresh(client, param->expiration); + if (AST_LIST_EMPTY(&publisher->queue)) { + schedule_publish_refresh(publisher, param->expiration); } end: - if (!client->client) { + if (!publisher->client) { struct sip_outbound_publish_message *message; - while ((message = AST_LIST_REMOVE_HEAD(&client->queue, entry))) { + while ((message = AST_LIST_REMOVE_HEAD(&publisher->queue, entry))) { ast_free(message); } } else { - if (ast_sip_push_task(NULL, sip_publish_client_service_queue, ao2_bump(client))) { - ao2_ref(client, -1); + if (ast_sip_push_task(NULL, sip_publisher_service_queue, ao2_bump(publisher))) { + ao2_ref(publisher, -1); } } } @@ -1085,27 +1421,18 @@ static struct ast_sip_outbound_publish_state *sip_outbound_publish_state_alloc( return NULL; } - state->client->timer.user_data = state->client; - state->client->timer.cb = sip_outbound_publish_timer_cb; - state->client->transport_name = ast_strdup(publish->transport); + state->client->publishers = ao2_container_alloc(DATASTORE_BUCKETS, sip_outbound_publisher_hash_fn, + sip_outbound_publisher_cmp_fn); + if (!state->client->publishers) { + ao2_ref(state, -1); + return NULL; + } state->client->publish = ao2_bump(publish); strcpy(state->id, id); return state; } -static int initialize_publish_client(struct ast_sip_outbound_publish *publish, - struct ast_sip_outbound_publish_state *state) -{ - if (ast_sip_push_task_synchronous(NULL, sip_outbound_publish_client_alloc, state->client)) { - ast_log(LOG_ERROR, "Unable to create client for outbound publish '%s'\n", - ast_sorcery_object_get_id(publish)); - return -1; - } - - return 0; -} - static int validate_publish_config(struct ast_sip_outbound_publish *publish) { if (ast_strlen_zero(publish->server_uri)) { @@ -1125,6 +1452,15 @@ static int current_state_reusable(struct ast_sip_outbound_publish *publish, { struct ast_sip_outbound_publish *old_publish; + /* + * Don't maintain the old state/client objects if the multi_user option changed. + */ + if ((!publish->multi_user && current_state->client->publish->multi_user) || + (publish->multi_user && !current_state->client->publish->multi_user)) { + return 0; + } + + if (!can_reuse_publish(current_state->client->publish, publish)) { /* * Something significant has changed in the configuration, so we are @@ -1140,12 +1476,15 @@ static int current_state_reusable(struct ast_sip_outbound_publish *publish, */ old_publish = current_state->client->publish; current_state->client->publish = publish; - if (initialize_publish_client(publish, current_state)) { + if (ast_sip_push_task_synchronous( + NULL, sip_outbound_publisher_reinit_all, current_state->client->publishers)) { /* * If the state object fails to re-initialize then swap * the old publish info back in. */ current_state->client->publish = publish; + ast_log(LOG_ERROR, "Unable to reinitialize client(s) for outbound publish '%s'\n", + ast_sorcery_object_get_id(current_state->client->publish)); return -1; } @@ -1170,6 +1509,7 @@ static int sip_outbound_publish_apply(const struct ast_sorcery *sorcery, void *o struct ast_sip_outbound_publish *applied = obj; struct ast_sip_outbound_publish_state *current_state, *new_state; + struct sip_outbound_publisher *publisher = NULL; int res; /* @@ -1216,11 +1556,13 @@ static int sip_outbound_publish_apply(const struct ast_sorcery *sorcery, void *o return -1; }; - if (initialize_publish_client(applied, new_state)) { + if (!applied->multi_user && + !(publisher = sip_outbound_publish_client_add_publisher(new_state->client, NULL))) { ADD_TO_NEW_STATES(current_state); ao2_ref(new_state, -1); return -1; } + ao2_cleanup(publisher); ADD_TO_NEW_STATES(new_state); ao2_cleanup(current_state); @@ -1238,6 +1580,9 @@ static int load_module(void) { CHECK_PJSIP_MODULE_LOADED(); + /* As of pjproject 2.4.5, PJSIP_MAX_URL_SIZE isn't exposed yet but we try anyway. */ + ast_pjproject_get_buildopt("PJSIP_MAX_URL_SIZE", "%d", &pjsip_max_url_size); + ast_sorcery_apply_config(ast_sip_get_sorcery(), "res_pjsip_outbound_publish"); ast_sorcery_apply_default(ast_sip_get_sorcery(), "outbound-publish", "config", "pjsip.conf,criteria=type=outbound-publish"); @@ -1257,6 +1602,7 @@ static int load_module(void) ast_sorcery_object_field_register(ast_sip_get_sorcery(), "outbound-publish", "max_auth_attempts", "5", OPT_UINT_T, 0, FLDSET(struct ast_sip_outbound_publish, max_auth_attempts)); ast_sorcery_object_field_register(ast_sip_get_sorcery(), "outbound-publish", "transport", "", OPT_STRINGFIELD_T, 0, STRFLDSET(struct ast_sip_outbound_publish, transport)); ast_sorcery_object_field_register_custom(ast_sip_get_sorcery(), "outbound-publish", "outbound_auth", "", outbound_auth_handler, NULL, NULL, 0, 0); + ast_sorcery_object_field_register(ast_sip_get_sorcery(), "outbound-publish", "multi_user", "no", OPT_BOOL_T, 1, FLDSET(struct ast_sip_outbound_publish, multi_user)); ast_sorcery_reload_object(ast_sip_get_sorcery(), "outbound-publish"); @@ -1279,6 +1625,13 @@ static int reload_module(void) return 0; } +static int current_publishing_count(void *obj, void *arg, int flags) +{ + struct ast_sip_outbound_publish_state *state = obj; + unloading.count += ao2_container_count(state->client->publishers); + return 0; +} + static int unload_module(void) { struct timeval start = ast_tvnow(); @@ -1289,11 +1642,18 @@ static int unload_module(void) int res = 0; struct ao2_container *states = ao2_global_obj_ref(current_states); - if (!states || !(unloading.count = ao2_container_count(states))) { + if (!states) { return 0; } + + unloading.count = 0; + ao2_callback(states, OBJ_NODATA, current_publishing_count, NULL); ao2_ref(states, -1); + if (!unloading.count) { + return 0; + } + ast_mutex_init(&unloading.lock); ast_cond_init(&unloading.cond, NULL); ast_mutex_lock(&unloading.lock); diff --git a/res/res_pjsip_outbound_registration.c b/res/res_pjsip_outbound_registration.c index 815432d10..62fe6dab2 100644 --- a/res/res_pjsip_outbound_registration.c +++ b/res/res_pjsip_outbound_registration.c @@ -1912,6 +1912,26 @@ static const struct ast_sorcery_instance_observer observer_callbacks_registratio .object_type_loaded = registration_loaded_observer, }; +static void registration_deleted_observer(const void *obj) +{ + const struct sip_outbound_registration *registration = obj; + struct ao2_container *states; + + states = ao2_global_obj_ref(current_states); + if (!states) { + /* Global container has gone. Likely shutting down. */ + return; + } + + ao2_find(states, ast_sorcery_object_get_id(registration), OBJ_UNLINK | OBJ_NODATA | OBJ_SEARCH_KEY); + + ao2_ref(states, -1); +} + +static const struct ast_sorcery_observer registration_observer = { + .deleted = registration_deleted_observer, +}; + static int unload_module(void) { int remaining; @@ -2011,7 +2031,9 @@ static int load_module(void) if (ast_sorcery_instance_observer_add(ast_sip_get_sorcery(), &observer_callbacks_registrations) || ast_sorcery_observer_add(ast_sip_get_sorcery(), "auth", - &observer_callbacks_auth)) { + &observer_callbacks_auth) + || ast_sorcery_observer_add(ast_sip_get_sorcery(), "registration", + ®istration_observer)) { ast_log(LOG_ERROR, "Unable to register observers.\n"); unload_module(); return AST_MODULE_LOAD_FAILURE; diff --git a/res/res_stasis_playback.c b/res/res_stasis_playback.c index 97191c26d..a64ecffa7 100644 --- a/res/res_stasis_playback.c +++ b/res/res_stasis_playback.c @@ -70,10 +70,16 @@ static struct ao2_container *playbacks; struct stasis_app_playback { AST_DECLARE_STRING_FIELDS( AST_STRING_FIELD(id); /*!< Playback unique id */ - AST_STRING_FIELD(media); /*!< Playback media uri */ + AST_STRING_FIELD(media); /*!< The current media playing */ AST_STRING_FIELD(language); /*!< Preferred language */ AST_STRING_FIELD(target); /*!< Playback device uri */ - ); + ); + /*! The list of medias to play back */ + AST_VECTOR(, char *) medias; + + /*! The current index in \c medias we're playing */ + size_t media_index; + /*! Control object for the channel we're playing back to */ struct stasis_app_control *control; /*! Number of milliseconds to skip before playing */ @@ -99,6 +105,8 @@ static struct ast_json *playback_to_json(struct stasis_message *message, if (!strcmp(state, "playing")) { type = "PlaybackStarted"; + } else if (!strcmp(state, "continuing")) { + type = "PlaybackContinuing"; } else if (!strcmp(state, "done")) { type = "PlaybackFinished"; } else { @@ -117,6 +125,14 @@ STASIS_MESSAGE_TYPE_DEFN(stasis_app_playback_snapshot_type, static void playback_dtor(void *obj) { struct stasis_app_playback *playback = obj; + int i; + + for (i = 0; i < AST_VECTOR_SIZE(&playback->medias); i++) { + char *media = AST_VECTOR_GET(&playback->medias, i); + + ast_free(media); + } + AST_VECTOR_FREE(&playback->medias); ao2_cleanup(playback->control); ast_string_field_free_memory(playback); @@ -137,6 +153,11 @@ static struct stasis_app_playback *playback_create( return NULL; } + if (AST_VECTOR_INIT(&playback->medias, 8)) { + ao2_ref(playback, -1); + return NULL; + } + if (!ast_strlen_zero(id)) { ast_string_field_set(playback, id, id); } else { @@ -180,6 +201,8 @@ static const char *state_to_string(enum stasis_app_playback_state state) return "playing"; case STASIS_PLAYBACK_STATE_PAUSED: return "paused"; + case STASIS_PLAYBACK_STATE_CONTINUING: + return "continuing"; case STASIS_PLAYBACK_STATE_STOPPED: case STASIS_PLAYBACK_STATE_COMPLETE: case STASIS_PLAYBACK_STATE_CANCELED: @@ -241,7 +264,11 @@ static void playback_final_update(struct stasis_app_playback *playback, playback->playedms = playedms; if (res == 0) { - playback->state = STASIS_PLAYBACK_STATE_COMPLETE; + if (playback->media_index == AST_VECTOR_SIZE(&playback->medias) - 1) { + playback->state = STASIS_PLAYBACK_STATE_COMPLETE; + } else { + playback->state = STASIS_PLAYBACK_STATE_CONTINUING; + } } else { if (playback->state == STASIS_PLAYBACK_STATE_STOPPED) { ast_log(LOG_NOTICE, "%s: Playback stopped for %s\n", @@ -262,7 +289,7 @@ static void play_on_channel(struct stasis_app_playback *playback, int res; long offsetms; - /* Even though these local variables look fairly pointless, the avoid + /* Even though these local variables look fairly pointless, they avoid * having a bunch of NULL's passed directly into * ast_control_streamfile() */ const char *fwd = NULL; @@ -273,73 +300,80 @@ static void play_on_channel(struct stasis_app_playback *playback, ast_assert(playback != NULL); - offsetms = playback->offsetms; - - res = playback_first_update(playback, ast_channel_uniqueid(chan)); - - if (res != 0) { - return; - } - if (ast_channel_state(chan) != AST_STATE_UP) { ast_indicate(chan, AST_CONTROL_PROGRESS); } - if (ast_begins_with(playback->media, SOUND_URI_SCHEME)) { - playback->controllable = 1; - - /* Play sound */ - res = ast_control_streamfile_lang(chan, playback->media + strlen(SOUND_URI_SCHEME), - fwd, rev, stop, pause, restart, playback->skipms, playback->language, - &offsetms); - } else if (ast_begins_with(playback->media, RECORDING_URI_SCHEME)) { - /* Play recording */ - RAII_VAR(struct stasis_app_stored_recording *, recording, NULL, - ao2_cleanup); - const char *relname = - playback->media + strlen(RECORDING_URI_SCHEME); - recording = stasis_app_stored_recording_find_by_name(relname); - - if (!recording) { - ast_log(LOG_ERROR, "Attempted to play recording '%s' on channel '%s' but recording does not exist", - relname, ast_channel_name(chan)); - return; - } + offsetms = playback->offsetms; - playback->controllable = 1; + for (; playback->media_index < AST_VECTOR_SIZE(&playback->medias); playback->media_index++) { - res = ast_control_streamfile_lang(chan, - stasis_app_stored_recording_get_file(recording), fwd, rev, stop, pause, - restart, playback->skipms, playback->language, &offsetms); - } else if (ast_begins_with(playback->media, NUMBER_URI_SCHEME)) { - int number; + /* Set the current media to play */ + ast_string_field_set(playback, media, AST_VECTOR_GET(&playback->medias, playback->media_index)); - if (sscanf(playback->media + strlen(NUMBER_URI_SCHEME), "%30d", &number) != 1) { - ast_log(LOG_ERROR, "Attempted to play number '%s' on channel '%s' but number is invalid", - playback->media + strlen(NUMBER_URI_SCHEME), ast_channel_name(chan)); + res = playback_first_update(playback, ast_channel_uniqueid(chan)); + if (res != 0) { return; } - res = ast_say_number(chan, number, stop, playback->language, NULL); - } else if (ast_begins_with(playback->media, DIGITS_URI_SCHEME)) { - res = ast_say_digit_str(chan, playback->media + strlen(DIGITS_URI_SCHEME), - stop, playback->language); - } else if (ast_begins_with(playback->media, CHARACTERS_URI_SCHEME)) { - res = ast_say_character_str(chan, playback->media + strlen(CHARACTERS_URI_SCHEME), - stop, playback->language, AST_SAY_CASE_NONE); - } else if (ast_begins_with(playback->media, TONE_URI_SCHEME)) { - playback->controllable = 1; - res = ast_control_tone(chan, playback->media + strlen(TONE_URI_SCHEME)); - } else { - /* Play URL */ - ast_log(LOG_ERROR, "Attempted to play URI '%s' on channel '%s' but scheme is unsupported\n", - playback->media, ast_channel_name(chan)); - return; - } + if (ast_begins_with(playback->media, SOUND_URI_SCHEME)) { + playback->controllable = 1; + + /* Play sound */ + res = ast_control_streamfile_lang(chan, playback->media + strlen(SOUND_URI_SCHEME), + fwd, rev, stop, pause, restart, playback->skipms, playback->language, + &offsetms); + } else if (ast_begins_with(playback->media, RECORDING_URI_SCHEME)) { + /* Play recording */ + RAII_VAR(struct stasis_app_stored_recording *, recording, NULL, + ao2_cleanup); + const char *relname = + playback->media + strlen(RECORDING_URI_SCHEME); + recording = stasis_app_stored_recording_find_by_name(relname); + + if (!recording) { + ast_log(LOG_ERROR, "Attempted to play recording '%s' on channel '%s' but recording does not exist", + relname, ast_channel_name(chan)); + continue; + } + + playback->controllable = 1; + + res = ast_control_streamfile_lang(chan, + stasis_app_stored_recording_get_file(recording), fwd, rev, stop, pause, + restart, playback->skipms, playback->language, &offsetms); + } else if (ast_begins_with(playback->media, NUMBER_URI_SCHEME)) { + int number; + + if (sscanf(playback->media + strlen(NUMBER_URI_SCHEME), "%30d", &number) != 1) { + ast_log(LOG_ERROR, "Attempted to play number '%s' on channel '%s' but number is invalid", + playback->media + strlen(NUMBER_URI_SCHEME), ast_channel_name(chan)); + continue; + } + + res = ast_say_number(chan, number, stop, playback->language, NULL); + } else if (ast_begins_with(playback->media, DIGITS_URI_SCHEME)) { + res = ast_say_digit_str(chan, playback->media + strlen(DIGITS_URI_SCHEME), + stop, playback->language); + } else if (ast_begins_with(playback->media, CHARACTERS_URI_SCHEME)) { + res = ast_say_character_str(chan, playback->media + strlen(CHARACTERS_URI_SCHEME), + stop, playback->language, AST_SAY_CASE_NONE); + } else if (ast_begins_with(playback->media, TONE_URI_SCHEME)) { + playback->controllable = 1; + res = ast_control_tone(chan, playback->media + strlen(TONE_URI_SCHEME)); + } else { + /* Play URL */ + ast_log(LOG_ERROR, "Attempted to play URI '%s' on channel '%s' but scheme is unsupported\n", + playback->media, ast_channel_name(chan)); + continue; + } - playback_final_update(playback, offsetms, res, - ast_channel_uniqueid(chan)); + playback_final_update(playback, offsetms, res, + ast_channel_uniqueid(chan)); + /* Reset offset for any subsequent media */ + offsetms = 0; + } return; } @@ -431,30 +465,45 @@ static void set_target_uri( } struct stasis_app_playback *stasis_app_control_play_uri( - struct stasis_app_control *control, const char *uri, - const char *language, const char *target_id, + struct stasis_app_control *control, const char **media, + size_t media_count, const char *language, const char *target_id, enum stasis_app_playback_target_type target_type, int skipms, long offsetms, const char *id) { struct stasis_app_playback *playback; + size_t i; - if (skipms < 0 || offsetms < 0) { + if (skipms < 0 || offsetms < 0 || media_count == 0) { return NULL; } - ast_debug(3, "%s: Sending play(%s) command\n", - stasis_app_control_get_channel_id(control), uri); - playback = playback_create(control, id); if (!playback) { return NULL; } + for (i = 0; i < media_count; i++) { + char *media_uri; + + media_uri = ast_malloc(strlen(media[i]) + 1); + if (!media_uri) { + ao2_ref(playback, -1); + return NULL; + } + + ast_debug(3, "%s: Sending play(%s) command\n", + stasis_app_control_get_channel_id(control), media[i]); + + /* safe */ + strcpy(media_uri, media[i]); + AST_VECTOR_APPEND(&playback->medias, media_uri); + } + if (skipms == 0) { skipms = PLAYBACK_DEFAULT_SKIPMS; } - ast_string_field_set(playback, media, uri); + ast_string_field_set(playback, media, AST_VECTOR_GET(&playback->medias, 0)); ast_string_field_set(playback, language, language); set_target_uri(playback, target_type, target_id); playback->skipms = skipms; @@ -497,12 +546,22 @@ struct ast_json *stasis_app_playback_to_json( return NULL; } - json = ast_json_pack("{s: s, s: s, s: s, s: s, s: s}", - "id", playback->id, - "media_uri", playback->media, - "target_uri", playback->target, - "language", playback->language, - "state", state_to_string(playback->state)); + if (playback->media_index == AST_VECTOR_SIZE(&playback->medias) - 1) { + json = ast_json_pack("{s: s, s: s, s: s, s: s, s: s}", + "id", playback->id, + "media_uri", playback->media, + "target_uri", playback->target, + "language", playback->language, + "state", state_to_string(playback->state)); + } else { + json = ast_json_pack("{s: s, s: s, s: s, s: s, s: s, s: s}", + "id", playback->id, + "media_uri", playback->media, + "next_media_uri", AST_VECTOR_GET(&playback->medias, playback->media_index + 1), + "target_uri", playback->target, + "language", playback->language, + "state", state_to_string(playback->state)); + } return ast_json_ref(json); } @@ -615,6 +674,13 @@ playback_opreation_cb operations[STASIS_PLAYBACK_STATE_MAX][STASIS_PLAYBACK_MEDI [STASIS_PLAYBACK_STATE_PLAYING][STASIS_PLAYBACK_REVERSE] = playback_reverse, [STASIS_PLAYBACK_STATE_PLAYING][STASIS_PLAYBACK_FORWARD] = playback_forward, + [STASIS_PLAYBACK_STATE_CONTINUING][STASIS_PLAYBACK_STOP] = playback_stop, + [STASIS_PLAYBACK_STATE_CONTINUING][STASIS_PLAYBACK_RESTART] = playback_restart, + [STASIS_PLAYBACK_STATE_CONTINUING][STASIS_PLAYBACK_PAUSE] = playback_pause, + [STASIS_PLAYBACK_STATE_CONTINUING][STASIS_PLAYBACK_UNPAUSE] = playback_noop, + [STASIS_PLAYBACK_STATE_CONTINUING][STASIS_PLAYBACK_REVERSE] = playback_reverse, + [STASIS_PLAYBACK_STATE_CONTINUING][STASIS_PLAYBACK_FORWARD] = playback_forward, + [STASIS_PLAYBACK_STATE_PAUSED][STASIS_PLAYBACK_STOP] = playback_stop, [STASIS_PLAYBACK_STATE_PAUSED][STASIS_PLAYBACK_PAUSE] = playback_noop, [STASIS_PLAYBACK_STATE_PAUSED][STASIS_PLAYBACK_UNPAUSE] = playback_unpause, |