From fde4e788d7e947a2f91583041d3ce0de97ad2d6a Mon Sep 17 00:00:00 2001 From: Benny Prijono Date: Wed, 22 Mar 2006 11:59:11 +0000 Subject: Redesign RTP/RTCP stuffs so that stream does not create thread implicitly. Changed pjmedia_endpt_create() API. git-svn-id: http://svn.pjsip.org/repos/pjproject/trunk@350 74dad513-b988-da41-8d7b-12977e46ad98 --- pjmedia/include/pjmedia/endpoint.h | 20 ++- pjmedia/src/pjmedia/endpoint.c | 130 +++++++++++++++--- pjmedia/src/pjmedia/stream.c | 268 ++++++++++++++++++++----------------- 3 files changed, 273 insertions(+), 145 deletions(-) (limited to 'pjmedia') diff --git a/pjmedia/include/pjmedia/endpoint.h b/pjmedia/include/pjmedia/endpoint.h index 351d3e7f..d4dca351 100644 --- a/pjmedia/include/pjmedia/endpoint.h +++ b/pjmedia/include/pjmedia/endpoint.h @@ -44,17 +44,24 @@ PJ_BEGIN_DECL - /** * Create an instance of media endpoint. * * @param pf Pool factory, which will be used by the media endpoint * throughout its lifetime. + * @param ioqueue Optional ioqueue instance to be registered to the + * endpoint. The ioqueue instance is used to poll all RTP + * and RTCP sockets. If this argument is NULL, the + * endpoint will create an internal ioqueue instance. + * @param worker_cnt Specify the number of worker threads to be created + * to poll the ioqueue. * @param p_endpt Pointer to receive the endpoint instance. * * @return PJ_SUCCESS on success. */ PJ_DECL(pj_status_t) pjmedia_endpt_create( pj_pool_factory *pf, + pj_ioqueue_t *ioqueue, + unsigned worker_cnt, pjmedia_endpt **p_endpt); /** @@ -67,6 +74,17 @@ PJ_DECL(pj_status_t) pjmedia_endpt_create( pj_pool_factory *pf, PJ_DECL(pj_status_t) pjmedia_endpt_destroy(pjmedia_endpt *endpt); + +/** + * Get the ioqueue instance of the media endpoint. + * + * @param endpt The media endpoint instance. + * + * @return The ioqueue instance of the media endpoint. + */ +PJ_DECL(pj_ioqueue_t*) pjmedia_endpt_get_ioqueue(pjmedia_endpt *endpt); + + /** * Request the media endpoint to create pool. * diff --git a/pjmedia/src/pjmedia/endpoint.c b/pjmedia/src/pjmedia/endpoint.c index 71217b61..f4c22756 100644 --- a/pjmedia/src/pjmedia/endpoint.c +++ b/pjmedia/src/pjmedia/endpoint.c @@ -19,12 +19,13 @@ #include #include #include -#include -#include -#include #include -#include +#include #include +#include +#include +#include +#include #define THIS_FILE "endpoint.c" @@ -65,6 +66,12 @@ PJ_DECL(pj_str_t) pjmedia_strerror( pj_status_t status, char *buffer, pj_size_t bufsize); +/* Worker thread proc. */ +static int PJ_THREAD_FUNC worker_proc(void*); + + +#define MAX_THREADS 16 + /** Concrete declaration of media endpoint. */ struct pjmedia_endpt @@ -77,16 +84,34 @@ struct pjmedia_endpt /** Codec manager. */ pjmedia_codec_mgr codec_mgr; + + /** IOqueue instance. */ + pj_ioqueue_t *ioqueue; + + /** Do we own the ioqueue? */ + pj_bool_t own_ioqueue; + + /** Number of threads. */ + unsigned thread_cnt; + + /** IOqueue polling thread, if any. */ + pj_thread_t *thread[MAX_THREADS]; + + /** To signal polling thread to quit. */ + pj_bool_t quit_flag; }; /** * Initialize and get the instance of media endpoint. */ PJ_DEF(pj_status_t) pjmedia_endpt_create(pj_pool_factory *pf, + pj_ioqueue_t *ioqueue, + unsigned worker_cnt, pjmedia_endpt **p_endpt) { pj_pool_t *pool; pjmedia_endpt *endpt; + unsigned i; pj_status_t status; if (!error_subsys_registered) { @@ -104,39 +129,59 @@ PJ_DEF(pj_status_t) pjmedia_endpt_create(pj_pool_factory *pf, endpt = pj_pool_zalloc(pool, sizeof(struct pjmedia_endpt)); endpt->pool = pool; endpt->pf = pf; + endpt->ioqueue = ioqueue; + endpt->thread_cnt = worker_cnt; /* Sound */ pjmedia_snd_init(pf); /* Init codec manager. */ status = pjmedia_codec_mgr_init(&endpt->codec_mgr); - if (status != PJ_SUCCESS) { - pjmedia_snd_deinit(); + if (status != PJ_SUCCESS) goto on_error; - } - /* Init and register G.711 codec. */ -#if 0 - // Starting from 0.5.4, codec factory is registered by applications. - factory = pj_pool_alloc (endpt->pool, sizeof(pjmedia_codec_factory)); + /* Create ioqueue if none is specified. */ + if (endpt->ioqueue == NULL) { + + endpt->own_ioqueue = PJ_TRUE; - status = g711_init_factory (factory, endpt->pool); - if (status != PJ_SUCCESS) { - pjmedia_snd_deinit(); - goto on_error; + status = pj_ioqueue_create( endpt->pool, PJ_IOQUEUE_MAX_HANDLES, + &endpt->ioqueue); + if (status != PJ_SUCCESS) + goto on_error; + + if (worker_cnt == 0) { + PJ_LOG(4,(THIS_FILE, "Warning: no worker thread is created in" + "media endpoint for internal ioqueue")); + } } - status = pjmedia_codec_mgr_register_factory (&endpt->codec_mgr, factory); - if (status != PJ_SUCCESS) { - pjmedia_snd_deinit(); - goto on_error; + /* Create worker threads if asked. */ + for (i=0; ipool, "media", &worker_proc, + endpt, 0, 0, &endpt->thread[i]); + if (status != PJ_SUCCESS) + goto on_error; } -#endif + *p_endpt = endpt; return PJ_SUCCESS; on_error: + + /* Destroy threads */ + for (i=0; ithread_cnt; ++i) { + if (endpt->thread[i]) { + pj_thread_destroy(endpt->thread[i]); + } + } + + /* Destroy internal ioqueue */ + if (endpt->ioqueue && endpt->own_ioqueue) + pj_ioqueue_destroy(endpt->ioqueue); + + pjmedia_snd_deinit(); pj_pool_release(pool); return status; } @@ -154,8 +199,27 @@ PJ_DEF(pjmedia_codec_mgr*) pjmedia_endpt_get_codec_mgr(pjmedia_endpt *endpt) */ PJ_DEF(pj_status_t) pjmedia_endpt_destroy (pjmedia_endpt *endpt) { + unsigned i; + PJ_ASSERT_RETURN(endpt, PJ_EINVAL); + endpt->quit_flag = 1; + + /* Destroy threads */ + for (i=0; ithread_cnt; ++i) { + if (endpt->thread[i]) { + pj_thread_join(endpt->thread[i]); + pj_thread_destroy(endpt->thread[i]); + endpt->thread[i] = NULL; + } + } + + /* Destroy internal ioqueue */ + if (endpt->ioqueue && endpt->own_ioqueue) { + pj_ioqueue_destroy(endpt->ioqueue); + endpt->ioqueue = NULL; + } + endpt->pf = NULL; pjmedia_snd_deinit(); @@ -164,6 +228,32 @@ PJ_DEF(pj_status_t) pjmedia_endpt_destroy (pjmedia_endpt *endpt) return PJ_SUCCESS; } + +/** + * Get the ioqueue instance of the media endpoint. + */ +PJ_DEF(pj_ioqueue_t*) pjmedia_endpt_get_ioqueue(pjmedia_endpt *endpt) +{ + PJ_ASSERT_RETURN(endpt, NULL); + return endpt->ioqueue; +} + + +/** + * Worker thread proc. + */ +static int PJ_THREAD_FUNC worker_proc(void *arg) +{ + pjmedia_endpt *endpt = arg; + + while (!endpt->quit_flag) { + pj_time_val timeout = { 0, 500 }; + pj_ioqueue_poll(endpt->ioqueue, &timeout); + } + + return 0; +} + /** * Create pool. */ diff --git a/pjmedia/src/pjmedia/stream.c b/pjmedia/src/pjmedia/stream.c index 671e7e59..95d9a7c9 100644 --- a/pjmedia/src/pjmedia/stream.c +++ b/pjmedia/src/pjmedia/stream.c @@ -26,6 +26,7 @@ #include #include #include +#include #include #include #include @@ -78,6 +79,9 @@ struct dtmf */ struct pjmedia_stream { + pjmedia_endpt *endpt; /**< Media endpoint. */ + pjmedia_codec_mgr *codec_mgr; /**< Codec manager instance. */ + pjmedia_port port; /**< Port interface. */ pjmedia_channel *enc; /**< Encoding channel. */ pjmedia_channel *dec; /**< Decoding channel. */ @@ -86,7 +90,6 @@ struct pjmedia_stream pjmedia_stream_stat stat; /**< Stream statistics. */ void *user_data; /**< User data. */ - pjmedia_codec_mgr *codec_mgr; /**< Codec manager instance. */ pjmedia_codec *codec; /**< Codec instance being used. */ pj_size_t frame_size; /**< Size of encoded frame. */ pj_mutex_t *jb_mutex; @@ -96,13 +99,18 @@ struct pjmedia_stream pj_sockaddr_in rem_rtp_addr; /**< Remote RTP address. */ pj_sockaddr_in rem_rtcp_addr; /**< Remote RTCP address. */ - pj_sockaddr_in rem_src_rtp; /**< addr of src pkt from remote*/ - unsigned rem_src_cnt; /**< if different, # of pkt rcv */ pjmedia_rtcp_session rtcp; /**< RTCP for incoming RTP. */ - pj_bool_t quit_flag; /**< To signal thread exit. */ - pj_thread_t *thread; /**< Jitter buffer's thread. */ + pj_ioqueue_key_t *rtp_key; /**< RTP ioqueue key. */ + pj_ioqueue_op_key_t rtp_op_key; /**< The pending read op key. */ + pj_sockaddr_in rtp_src_addr; /**< addr of src pkt from remote*/ + unsigned rtp_src_cnt; /**< if different, # of pkt rcv */ + int rtp_addrlen; /**< Address length. */ + + pj_ioqueue_key_t *rtcp_key; /**< RTCP ioqueue key. */ + pj_ioqueue_op_key_t rtcp_op_key; /**< The pending read op key. */ + /* RFC 2833 DTMF transmission queue: */ int tx_event_pt; /**< Outgoing pt for dtmf. */ @@ -152,12 +160,6 @@ static pj_status_t get_frame( pjmedia_port *port, pjmedia_frame *frame) pj_status_t status; struct pjmedia_frame frame_in, frame_out; - /* Do nothing if we're quitting. */ - if (stream->quit_flag) { - frame->type = PJMEDIA_FRAME_TYPE_NONE; - return PJ_SUCCESS; - } - /* Lock jitter buffer mutex */ pj_mutex_lock( stream->jb_mutex ); @@ -271,10 +273,6 @@ static pj_status_t put_frame( pjmedia_port *port, int rtphdrlen; pj_ssize_t sent; - /* Check if stream is quitting. */ - if (stream->quit_flag) - return -1; - /* Number of samples in the frame */ ts_len = frame->size / 2; @@ -438,77 +436,56 @@ static void handle_incoming_dtmf( pjmedia_stream *stream, /* - * This thread will poll the socket for incoming packets, and put - * the packets to jitter buffer. + * This callback is called by ioqueue framework on receipt of packets + * in the RTP socket. */ -static int PJ_THREAD_FUNC jitter_buffer_thread (void*arg) +static void on_rx_rtp( pj_ioqueue_key_t *key, + pj_ioqueue_op_key_t *op_key, + pj_ssize_t bytes_read) + { - pjmedia_stream *stream = arg; + pjmedia_stream *stream = pj_ioqueue_get_user_data(key); pjmedia_channel *channel = stream->dec; + pj_status_t status; + + + PJ_UNUSED_ARG(op_key); - while (!stream->quit_flag) { - pj_ssize_t len; + /* + * Loop while we have packet. + */ + do { const pjmedia_rtp_hdr *hdr; const void *payload; unsigned payloadlen; - int addrlen; - int status; - - /* Wait for packet. */ - pj_fd_set_t fds; - pj_time_val timeout; - - PJ_FD_ZERO (&fds); - PJ_FD_SET (stream->skinfo.rtp_sock, &fds); - timeout.sec = 0; - timeout.msec = 1; - - /* Wait with timeout. */ - status = pj_sock_select(FD_SETSIZE, &fds, NULL, NULL, &timeout); - if (status < 0) { - TRACE_((THIS_FILE, "Jitter buffer select() error", - pj_get_netos_error())); - pj_thread_sleep(500); - continue; - } else if (status == 0) - continue; - - /* Get packet from socket. */ - len = channel->in_pkt_size; - addrlen = sizeof(stream->rem_src_rtp); - status = pj_sock_recvfrom(stream->skinfo.rtp_sock, - channel->in_pkt, &len, 0, - &stream->rem_src_rtp, &addrlen); - if (len < 1 || status != PJ_SUCCESS) { - if (pj_get_netos_error() == PJ_STATUS_FROM_OS(OSERR_ECONNRESET)) { - /* On Win2K SP2 (or above) and WinXP, recv() will get - * WSAECONNRESET when the sending side receives ICMP port - * unreachable. - */ - continue; - } - pj_thread_sleep(1); - continue; - } - if (channel->paused) - continue; + /* Go straight to read next packet if bytes_read == 0. + */ + if (bytes_read == 0) + goto read_next_packet; + /* Update RTP and RTCP session. */ - status = pjmedia_rtp_decode_rtp(&channel->rtp, channel->in_pkt, len, - &hdr, &payload, &payloadlen); + status = pjmedia_rtp_decode_rtp(&channel->rtp, + channel->in_pkt, bytes_read, + &hdr, &payload, &payloadlen); if (status != PJ_SUCCESS) { TRACE_((THIS_FILE, "RTP decode error", status)); - continue; + goto read_next_packet; } + /* Handle incoming DTMF. */ if (hdr->pt == stream->rx_event_pt) { handle_incoming_dtmf(stream, payload, payloadlen); - continue; + goto read_next_packet; } + + /* Update RTP session (also checks if RTP session can accept + * the incoming packet. + */ status = pjmedia_rtp_session_update(&channel->rtp, hdr); if (status != 0 && status != PJMEDIA_RTP_ESESSPROBATION && @@ -518,46 +495,85 @@ static int PJ_THREAD_FUNC jitter_buffer_thread (void*arg) status)); PJ_LOG(4,(THIS_FILE,"RTP packet detail: pt=%d, seq=%d", hdr->pt, pj_ntohs(hdr->seq))); - continue; + goto read_next_packet; } - pjmedia_rtcp_rx_rtp(&stream->rtcp, pj_ntohs(hdr->seq), pj_ntohl(hdr->ts)); + + + /* Update the RTCP session. */ + pjmedia_rtcp_rx_rtp(&stream->rtcp, pj_ntohs(hdr->seq), + pj_ntohl(hdr->ts)); + /* Update stat */ stream->stat.dec.pkt++; - stream->stat.dec.bytes += len; + stream->stat.dec.bytes += bytes_read; + /* See if source address of RTP packet is different than the * configured address. */ if ((stream->rem_rtp_addr.sin_addr.s_addr != - stream->rem_src_rtp.sin_addr.s_addr) || - (stream->rem_rtp_addr.sin_port != stream->rem_src_rtp.sin_port)) + stream->rtp_src_addr.sin_addr.s_addr) || + (stream->rem_rtp_addr.sin_port != stream->rtp_src_addr.sin_port)) { - stream->rem_src_cnt++; + stream->rtp_src_cnt++; - if (stream->rem_src_cnt >= PJMEDIA_RTP_NAT_PROBATION_CNT) { + if (stream->rtp_src_cnt >= PJMEDIA_RTP_NAT_PROBATION_CNT) { - stream->rem_rtp_addr = stream->rem_src_rtp; - stream->rem_src_cnt = 0; + stream->rem_rtp_addr = stream->rtp_src_addr; + stream->rtp_src_cnt = 0; PJ_LOG(4,(THIS_FILE,"Remote RTP address switched to %s:%d", - pj_inet_ntoa(stream->rem_src_rtp.sin_addr), - pj_ntohs(stream->rem_src_rtp.sin_port))); + pj_inet_ntoa(stream->rtp_src_addr.sin_addr), + pj_ntohs(stream->rtp_src_addr.sin_port))); } } + /* Put to jitter buffer. */ pj_mutex_lock( stream->jb_mutex ); - status = pjmedia_jbuf_put_frame(stream->jb, payload, payloadlen, pj_ntohs(hdr->seq)); + status = pjmedia_jbuf_put_frame(stream->jb, payload, payloadlen, + pj_ntohs(hdr->seq)); pj_mutex_unlock( stream->jb_mutex ); if (status != 0) { TRACE_((THIS_FILE, "Jitter buffer put() error", status)); - continue; + goto read_next_packet; } + +read_next_packet: + bytes_read = channel->in_pkt_size; + stream->rtp_addrlen = sizeof(stream->rtp_src_addr); + status = pj_ioqueue_recvfrom( stream->rtp_key, + &stream->rtp_op_key, + channel->in_pkt, + &bytes_read, 0, + &stream->rtp_src_addr, + &stream->rtp_addrlen); + + } while (status == PJ_SUCCESS); + + if (status != PJ_SUCCESS && status != PJ_EPENDING) { + char errmsg[PJ_ERR_MSG_SIZE]; + + pj_strerror(status, errmsg, sizeof(errmsg)); + PJ_LOG(4,(THIS_FILE, "Error reading RTP packet: %s [status=%d]", + errmsg, status)); } +} + - return 0; +/* + * This callback is called by ioqueue framework on receipt of packets + * in the RTCP socket. + */ +static void on_rx_rtcp( pj_ioqueue_key_t *key, + pj_ioqueue_op_key_t *op_key, + pj_ssize_t bytes_read) +{ + PJ_UNUSED_ARG(key); + PJ_UNUSED_ARG(op_key); + PJ_UNUSED_ARG(bytes_read); } @@ -641,6 +657,7 @@ PJ_DEF(pj_status_t) pjmedia_stream_create( pjmedia_endpt *endpt, { pjmedia_stream *stream; pjmedia_codec_param codec_param; + pj_ioqueue_callback ioqueue_cb; pj_status_t status; PJ_ASSERT_RETURN(pool && info && p_stream, PJ_EINVAL); @@ -667,10 +684,10 @@ PJ_DEF(pj_status_t) pjmedia_stream_create( pjmedia_endpt *endpt, /* Init stream: */ - + stream->endpt = endpt; + stream->codec_mgr = pjmedia_endpt_get_codec_mgr(endpt); stream->dir = info->dir; stream->user_data = user_data; - stream->codec_mgr = pjmedia_endpt_get_codec_mgr(endpt); stream->skinfo = info->sock_info; stream->rem_rtp_addr = info->rem_addr; stream->tx_event_pt = info->tx_event_pt; @@ -732,15 +749,6 @@ PJ_DEF(pj_status_t) pjmedia_stream_create( pjmedia_endpt *endpt, goto err_cleanup; - /* Create jitter buffer thread: */ - - status = pj_thread_create(pool, "decode", - &jitter_buffer_thread, stream, - 0, PJ_THREAD_SUSPENDED, &stream->thread); - if (status != PJ_SUCCESS) - goto err_cleanup; - - /* Create decoder channel: */ status = create_channel( pool, stream, PJMEDIA_DIR_DECODING, @@ -756,11 +764,44 @@ PJ_DEF(pj_status_t) pjmedia_stream_create( pjmedia_endpt *endpt, if (status != PJ_SUCCESS) goto err_cleanup; - /* Resume jitter buffer thread. */ - status = pj_thread_resume( stream->thread ); + /* Register RTP socket to ioqueue */ + pj_memset(&ioqueue_cb, 0, sizeof(ioqueue_cb)); + ioqueue_cb.on_read_complete = &on_rx_rtp; + + status = pj_ioqueue_register_sock( pool, + pjmedia_endpt_get_ioqueue(endpt), + stream->skinfo.rtp_sock, + stream, &ioqueue_cb, &stream->rtp_key); if (status != PJ_SUCCESS) goto err_cleanup; + /* Init pending operation key. */ + pj_ioqueue_op_key_init(&stream->rtp_op_key, sizeof(stream->rtp_op_key)); + + /* Bootstrap the first recvfrom() operation. */ + on_rx_rtp( stream->rtp_key, &stream->rtp_op_key, 0); + + + /* Register RTCP socket to ioqueue. */ + if (stream->skinfo.rtcp_sock != PJ_INVALID_SOCKET) { + pj_memset(&ioqueue_cb, 0, sizeof(ioqueue_cb)); + ioqueue_cb.on_read_complete = &on_rx_rtcp; + + status = pj_ioqueue_register_sock( pool, + pjmedia_endpt_get_ioqueue(endpt), + stream->skinfo.rtcp_sock, + stream, &ioqueue_cb, + &stream->rtcp_key); + if (status != PJ_SUCCESS) + goto err_cleanup; + } + + /* Init pending operation key. */ + pj_ioqueue_op_key_init(&stream->rtcp_op_key, sizeof(stream->rtcp_op_key)); + + /* Bootstrap the first recvfrom() operation. */ + on_rx_rtcp( stream->rtcp_key, &stream->rtcp_op_key, 0); + /* Success! */ *p_stream = stream; return PJ_SUCCESS; @@ -780,41 +821,20 @@ PJ_DEF(pj_status_t) pjmedia_stream_destroy( pjmedia_stream *stream ) PJ_ASSERT_RETURN(stream != NULL, PJ_EINVAL); - /* Signal threads to quit. */ - - stream->quit_flag = 1; - - - /* Close encoding sound stream. */ - - /* - if (stream->enc && stream->enc->snd_stream) { - - pjmedia_snd_stream_stop(stream->enc->snd_stream); - pjmedia_snd_stream_close(stream->enc->snd_stream); - stream->enc->snd_stream = NULL; - - } - */ - - /* Close decoding sound stream. */ - /* - if (stream->dec && stream->dec->snd_stream) { + /* This function may be called when stream is partly initialized. */ + if (stream->jb_mutex) + pj_mutex_lock(stream->jb_mutex); - pjmedia_snd_stream_stop(stream->dec->snd_stream); - pjmedia_snd_stream_close(stream->dec->snd_stream); - stream->dec->snd_stream = NULL; + /* Unregister from ioqueue. */ + if (stream->rtp_key) { + pj_ioqueue_unregister(stream->rtp_key); + stream->rtp_key = NULL; } - */ - - /* Wait for jitter buffer thread to quit: */ - - if (stream->thread) { - pj_thread_join(stream->thread); - pj_thread_destroy(stream->thread); - stream->thread = NULL; + if (stream->rtcp_key) { + pj_ioqueue_unregister(stream->rtcp_key); + stream->rtcp_key = NULL; } /* Free codec. */ -- cgit v1.2.3