From ada7346792452f021911063668997f79fdabc1f1 Mon Sep 17 00:00:00 2001 From: Richard Mudgett Date: Fri, 5 Jun 2015 15:37:33 -0500 Subject: res_pjsip: Need to use the same serializer for a pjproject SIP transaction. All send/receive processing for a SIP transaction needs to be done under the same threadpool serializer to prevent reentrancy problems inside pjproject and res_pjsip. * Add threadpool API call to get the current serializer associated with the worker thread. * Pick a serializer from a pool of default serializers if the caller of res_pjsip.c:ast_sip_push_task() does not provide one. This is a simple way to ensure that all outgoing SIP request messages are processed under a serializer. Otherwise, any place where a pushed task is done that would result in an outgoing out-of-dialog request would need to be modified to supply a serializer. Serializers from the default serializer pool are picked in a round robin sequence for simplicity. A side effect is that the default serializer pool will limit the growth of the thread pool from random tasks. This is not necessarily a bad thing. * Made pjsip_distributor.c save the thread's serializer name on the outgoing request tdata struct so the response can be processed under the same serializer. This is a cherry-pick from master. **** ASTERISK-25115 Change-Id: Iea71c16ce1132017b5791635e198b8c27973f40a NOTE: session_inv_on_state_changed() is disassociating the dialog from the session when the invite dialog becomes PJSIP_INV_STATE_DISCONNECTED. Unfortunately this is a tad too soon because our BYE request transaction has not completed yet. ASTERISK-25183 #close Reported by: Matt Jordan Change-Id: I8bad0ae1daf18d75b8c9e55874244b7962df2d0a --- res/res_pjsip.c | 92 +++++++++++++++++++++++++++++++++++++++++++++++++-------- 1 file changed, 80 insertions(+), 12 deletions(-) (limited to 'res/res_pjsip.c') diff --git a/res/res_pjsip.c b/res/res_pjsip.c index e92de51bb..658a55e88 100644 --- a/res/res_pjsip.c +++ b/res/res_pjsip.c @@ -1867,6 +1867,15 @@ #define MOD_DATA_CONTACT "contact" +/*! Number of serializers in pool if one not supplied. */ +#define SERIALIZER_POOL_SIZE 8 + +/*! Next serializer pool index to use. */ +static int serializer_pool_pos; + +/*! Pool of serializers to use if not supplied. */ +static struct ast_taskprocessor *serializer_pool[SERIALIZER_POOL_SIZE]; + static pjsip_endpoint *ast_pjsip_endpoint; static struct ast_threadpool *sip_threadpool; @@ -3341,8 +3350,62 @@ struct ast_taskprocessor *ast_sip_create_serializer(void) return ast_sip_create_serializer_group(NULL); } +/*! + * \internal + * \brief Shutdown the serializers in the default pool. + * \since 14.0.0 + * + * \return Nothing + */ +static void serializer_pool_shutdown(void) +{ + int idx; + + for (idx = 0; idx < SERIALIZER_POOL_SIZE; ++idx) { + ast_taskprocessor_unreference(serializer_pool[idx]); + serializer_pool[idx] = NULL; + } +} + +/*! + * \internal + * \brief Setup the serializers in the default pool. + * \since 14.0.0 + * + * \retval 0 on success. + * \retval -1 on error. + */ +static int serializer_pool_setup(void) +{ + int idx; + + for (idx = 0; idx < SERIALIZER_POOL_SIZE; ++idx) { + serializer_pool[idx] = ast_sip_create_serializer(); + if (!serializer_pool[idx]) { + serializer_pool_shutdown(); + return -1; + } + } + return 0; +} + int ast_sip_push_task(struct ast_taskprocessor *serializer, int (*sip_task)(void *), void *task_data) { + if (!serializer) { + unsigned int pos; + + /* + * Pick a serializer to use from the pool. + * + * Note: We don't care about any reentrancy behavior + * when incrementing serializer_pool_pos. If it gets + * incorrectly incremented it doesn't matter. + */ + pos = serializer_pool_pos++; + pos %= SERIALIZER_POOL_SIZE; + serializer = serializer_pool[pos]; + } + if (serializer) { return ast_taskprocessor_push(serializer, sip_task, task_data); } else { @@ -3395,18 +3458,10 @@ int ast_sip_push_task_synchronous(struct ast_taskprocessor *serializer, int (*si std.task = sip_task; std.task_data = task_data; - if (serializer) { - if (ast_taskprocessor_push(serializer, sync_task, &std)) { - ast_mutex_destroy(&std.lock); - ast_cond_destroy(&std.cond); - return -1; - } - } else { - if (ast_threadpool_push(sip_threadpool, sync_task, &std)) { - ast_mutex_destroy(&std.lock); - ast_cond_destroy(&std.cond); - return -1; - } + if (ast_sip_push_task(serializer, sync_task, &std)) { + ast_mutex_destroy(&std.lock); + ast_cond_destroy(&std.cond); + return -1; } ast_mutex_lock(&std.lock); @@ -3697,6 +3752,18 @@ static int load_module(void) return AST_MODULE_LOAD_DECLINE; } + if (serializer_pool_setup()) { + ast_log(LOG_ERROR, "Failed to create SIP serializer pool. Aborting load\n"); + ast_threadpool_shutdown(sip_threadpool); + ast_sip_destroy_system(); + pj_pool_release(memory_pool); + memory_pool = NULL; + pjsip_endpt_destroy(ast_pjsip_endpoint); + ast_pjsip_endpoint = NULL; + pj_caching_pool_destroy(&caching_pool); + return AST_MODULE_LOAD_DECLINE; + } + ast_sip_initialize_dns(); pjsip_tsx_layer_init_module(ast_pjsip_endpoint); @@ -3826,6 +3893,7 @@ static int unload_module(void) */ ast_sip_push_task_synchronous(NULL, unload_pjsip, NULL); + serializer_pool_shutdown(); ast_threadpool_shutdown(sip_threadpool); ast_sip_destroy_cli(); -- cgit v1.2.3