summaryrefslogtreecommitdiff
path: root/pjmedia
diff options
context:
space:
mode:
authorBenny Prijono <bennylp@teluu.com>2006-03-22 11:59:11 +0000
committerBenny Prijono <bennylp@teluu.com>2006-03-22 11:59:11 +0000
commitfde4e788d7e947a2f91583041d3ce0de97ad2d6a (patch)
tree70e8e0907d2a24cb73292fea08311a5e9202fa50 /pjmedia
parentd494bf04e608a604eaa96bd65b9daa72a1d98252 (diff)
Redesign RTP/RTCP stuffs so that stream does not create thread implicitly. Changed pjmedia_endpt_create() API.
git-svn-id: http://svn.pjsip.org/repos/pjproject/trunk@350 74dad513-b988-da41-8d7b-12977e46ad98
Diffstat (limited to 'pjmedia')
-rw-r--r--pjmedia/include/pjmedia/endpoint.h20
-rw-r--r--pjmedia/src/pjmedia/endpoint.c130
-rw-r--r--pjmedia/src/pjmedia/stream.c268
3 files changed, 273 insertions, 145 deletions
diff --git a/pjmedia/include/pjmedia/endpoint.h b/pjmedia/include/pjmedia/endpoint.h
index 351d3e7f..d4dca351 100644
--- a/pjmedia/include/pjmedia/endpoint.h
+++ b/pjmedia/include/pjmedia/endpoint.h
@@ -44,17 +44,24 @@
PJ_BEGIN_DECL
-
/**
* Create an instance of media endpoint.
*
* @param pf Pool factory, which will be used by the media endpoint
* throughout its lifetime.
+ * @param ioqueue Optional ioqueue instance to be registered to the
+ * endpoint. The ioqueue instance is used to poll all RTP
+ * and RTCP sockets. If this argument is NULL, the
+ * endpoint will create an internal ioqueue instance.
+ * @param worker_cnt Specify the number of worker threads to be created
+ * to poll the ioqueue.
* @param p_endpt Pointer to receive the endpoint instance.
*
* @return PJ_SUCCESS on success.
*/
PJ_DECL(pj_status_t) pjmedia_endpt_create( pj_pool_factory *pf,
+ pj_ioqueue_t *ioqueue,
+ unsigned worker_cnt,
pjmedia_endpt **p_endpt);
/**
@@ -67,6 +74,17 @@ PJ_DECL(pj_status_t) pjmedia_endpt_create( pj_pool_factory *pf,
PJ_DECL(pj_status_t) pjmedia_endpt_destroy(pjmedia_endpt *endpt);
+
+/**
+ * Get the ioqueue instance of the media endpoint.
+ *
+ * @param endpt The media endpoint instance.
+ *
+ * @return The ioqueue instance of the media endpoint.
+ */
+PJ_DECL(pj_ioqueue_t*) pjmedia_endpt_get_ioqueue(pjmedia_endpt *endpt);
+
+
/**
* Request the media endpoint to create pool.
*
diff --git a/pjmedia/src/pjmedia/endpoint.c b/pjmedia/src/pjmedia/endpoint.c
index 71217b61..f4c22756 100644
--- a/pjmedia/src/pjmedia/endpoint.c
+++ b/pjmedia/src/pjmedia/endpoint.c
@@ -19,12 +19,13 @@
#include <pjmedia/endpoint.h>
#include <pjmedia/errno.h>
#include <pjmedia/sdp.h>
-#include <pj/sock.h>
-#include <pj/pool.h>
-#include <pj/string.h>
#include <pj/assert.h>
-#include <pj/os.h>
+#include <pj/ioqueue.h>
#include <pj/log.h>
+#include <pj/os.h>
+#include <pj/pool.h>
+#include <pj/sock.h>
+#include <pj/string.h>
#define THIS_FILE "endpoint.c"
@@ -65,6 +66,12 @@ PJ_DECL(pj_str_t) pjmedia_strerror( pj_status_t status, char *buffer,
pj_size_t bufsize);
+/* Worker thread proc. */
+static int PJ_THREAD_FUNC worker_proc(void*);
+
+
+#define MAX_THREADS 16
+
/** Concrete declaration of media endpoint. */
struct pjmedia_endpt
@@ -77,16 +84,34 @@ struct pjmedia_endpt
/** Codec manager. */
pjmedia_codec_mgr codec_mgr;
+
+ /** IOqueue instance. */
+ pj_ioqueue_t *ioqueue;
+
+ /** Do we own the ioqueue? */
+ pj_bool_t own_ioqueue;
+
+ /** Number of threads. */
+ unsigned thread_cnt;
+
+ /** IOqueue polling thread, if any. */
+ pj_thread_t *thread[MAX_THREADS];
+
+ /** To signal polling thread to quit. */
+ pj_bool_t quit_flag;
};
/**
* Initialize and get the instance of media endpoint.
*/
PJ_DEF(pj_status_t) pjmedia_endpt_create(pj_pool_factory *pf,
+ pj_ioqueue_t *ioqueue,
+ unsigned worker_cnt,
pjmedia_endpt **p_endpt)
{
pj_pool_t *pool;
pjmedia_endpt *endpt;
+ unsigned i;
pj_status_t status;
if (!error_subsys_registered) {
@@ -104,39 +129,59 @@ PJ_DEF(pj_status_t) pjmedia_endpt_create(pj_pool_factory *pf,
endpt = pj_pool_zalloc(pool, sizeof(struct pjmedia_endpt));
endpt->pool = pool;
endpt->pf = pf;
+ endpt->ioqueue = ioqueue;
+ endpt->thread_cnt = worker_cnt;
/* Sound */
pjmedia_snd_init(pf);
/* Init codec manager. */
status = pjmedia_codec_mgr_init(&endpt->codec_mgr);
- if (status != PJ_SUCCESS) {
- pjmedia_snd_deinit();
+ if (status != PJ_SUCCESS)
goto on_error;
- }
- /* Init and register G.711 codec. */
-#if 0
- // Starting from 0.5.4, codec factory is registered by applications.
- factory = pj_pool_alloc (endpt->pool, sizeof(pjmedia_codec_factory));
+ /* Create ioqueue if none is specified. */
+ if (endpt->ioqueue == NULL) {
+
+ endpt->own_ioqueue = PJ_TRUE;
- status = g711_init_factory (factory, endpt->pool);
- if (status != PJ_SUCCESS) {
- pjmedia_snd_deinit();
- goto on_error;
+ status = pj_ioqueue_create( endpt->pool, PJ_IOQUEUE_MAX_HANDLES,
+ &endpt->ioqueue);
+ if (status != PJ_SUCCESS)
+ goto on_error;
+
+ if (worker_cnt == 0) {
+ PJ_LOG(4,(THIS_FILE, "Warning: no worker thread is created in"
+ "media endpoint for internal ioqueue"));
+ }
}
- status = pjmedia_codec_mgr_register_factory (&endpt->codec_mgr, factory);
- if (status != PJ_SUCCESS) {
- pjmedia_snd_deinit();
- goto on_error;
+ /* Create worker threads if asked. */
+ for (i=0; i<worker_cnt; ++i) {
+ status = pj_thread_create( endpt->pool, "media", &worker_proc,
+ endpt, 0, 0, &endpt->thread[i]);
+ if (status != PJ_SUCCESS)
+ goto on_error;
}
-#endif
+
*p_endpt = endpt;
return PJ_SUCCESS;
on_error:
+
+ /* Destroy threads */
+ for (i=0; i<endpt->thread_cnt; ++i) {
+ if (endpt->thread[i]) {
+ pj_thread_destroy(endpt->thread[i]);
+ }
+ }
+
+ /* Destroy internal ioqueue */
+ if (endpt->ioqueue && endpt->own_ioqueue)
+ pj_ioqueue_destroy(endpt->ioqueue);
+
+ pjmedia_snd_deinit();
pj_pool_release(pool);
return status;
}
@@ -154,8 +199,27 @@ PJ_DEF(pjmedia_codec_mgr*) pjmedia_endpt_get_codec_mgr(pjmedia_endpt *endpt)
*/
PJ_DEF(pj_status_t) pjmedia_endpt_destroy (pjmedia_endpt *endpt)
{
+ unsigned i;
+
PJ_ASSERT_RETURN(endpt, PJ_EINVAL);
+ endpt->quit_flag = 1;
+
+ /* Destroy threads */
+ for (i=0; i<endpt->thread_cnt; ++i) {
+ if (endpt->thread[i]) {
+ pj_thread_join(endpt->thread[i]);
+ pj_thread_destroy(endpt->thread[i]);
+ endpt->thread[i] = NULL;
+ }
+ }
+
+ /* Destroy internal ioqueue */
+ if (endpt->ioqueue && endpt->own_ioqueue) {
+ pj_ioqueue_destroy(endpt->ioqueue);
+ endpt->ioqueue = NULL;
+ }
+
endpt->pf = NULL;
pjmedia_snd_deinit();
@@ -164,6 +228,32 @@ PJ_DEF(pj_status_t) pjmedia_endpt_destroy (pjmedia_endpt *endpt)
return PJ_SUCCESS;
}
+
+/**
+ * Get the ioqueue instance of the media endpoint.
+ */
+PJ_DEF(pj_ioqueue_t*) pjmedia_endpt_get_ioqueue(pjmedia_endpt *endpt)
+{
+ PJ_ASSERT_RETURN(endpt, NULL);
+ return endpt->ioqueue;
+}
+
+
+/**
+ * Worker thread proc.
+ */
+static int PJ_THREAD_FUNC worker_proc(void *arg)
+{
+ pjmedia_endpt *endpt = arg;
+
+ while (!endpt->quit_flag) {
+ pj_time_val timeout = { 0, 500 };
+ pj_ioqueue_poll(endpt->ioqueue, &timeout);
+ }
+
+ return 0;
+}
+
/**
* Create pool.
*/
diff --git a/pjmedia/src/pjmedia/stream.c b/pjmedia/src/pjmedia/stream.c
index 671e7e59..95d9a7c9 100644
--- a/pjmedia/src/pjmedia/stream.c
+++ b/pjmedia/src/pjmedia/stream.c
@@ -26,6 +26,7 @@
#include <pj/ctype.h>
#include <pj/compat/socket.h>
#include <pj/errno.h>
+#include <pj/ioqueue.h>
#include <pj/log.h>
#include <pj/os.h>
#include <pj/pool.h>
@@ -78,6 +79,9 @@ struct dtmf
*/
struct pjmedia_stream
{
+ pjmedia_endpt *endpt; /**< Media endpoint. */
+ pjmedia_codec_mgr *codec_mgr; /**< Codec manager instance. */
+
pjmedia_port port; /**< Port interface. */
pjmedia_channel *enc; /**< Encoding channel. */
pjmedia_channel *dec; /**< Decoding channel. */
@@ -86,7 +90,6 @@ struct pjmedia_stream
pjmedia_stream_stat stat; /**< Stream statistics. */
void *user_data; /**< User data. */
- pjmedia_codec_mgr *codec_mgr; /**< Codec manager instance. */
pjmedia_codec *codec; /**< Codec instance being used. */
pj_size_t frame_size; /**< Size of encoded frame. */
pj_mutex_t *jb_mutex;
@@ -96,13 +99,18 @@ struct pjmedia_stream
pj_sockaddr_in rem_rtp_addr; /**< Remote RTP address. */
pj_sockaddr_in rem_rtcp_addr; /**< Remote RTCP address. */
- pj_sockaddr_in rem_src_rtp; /**< addr of src pkt from remote*/
- unsigned rem_src_cnt; /**< if different, # of pkt rcv */
pjmedia_rtcp_session rtcp; /**< RTCP for incoming RTP. */
- pj_bool_t quit_flag; /**< To signal thread exit. */
- pj_thread_t *thread; /**< Jitter buffer's thread. */
+ pj_ioqueue_key_t *rtp_key; /**< RTP ioqueue key. */
+ pj_ioqueue_op_key_t rtp_op_key; /**< The pending read op key. */
+ pj_sockaddr_in rtp_src_addr; /**< addr of src pkt from remote*/
+ unsigned rtp_src_cnt; /**< if different, # of pkt rcv */
+ int rtp_addrlen; /**< Address length. */
+
+ pj_ioqueue_key_t *rtcp_key; /**< RTCP ioqueue key. */
+ pj_ioqueue_op_key_t rtcp_op_key; /**< The pending read op key. */
+
/* RFC 2833 DTMF transmission queue: */
int tx_event_pt; /**< Outgoing pt for dtmf. */
@@ -152,12 +160,6 @@ static pj_status_t get_frame( pjmedia_port *port, pjmedia_frame *frame)
pj_status_t status;
struct pjmedia_frame frame_in, frame_out;
- /* Do nothing if we're quitting. */
- if (stream->quit_flag) {
- frame->type = PJMEDIA_FRAME_TYPE_NONE;
- return PJ_SUCCESS;
- }
-
/* Lock jitter buffer mutex */
pj_mutex_lock( stream->jb_mutex );
@@ -271,10 +273,6 @@ static pj_status_t put_frame( pjmedia_port *port,
int rtphdrlen;
pj_ssize_t sent;
- /* Check if stream is quitting. */
- if (stream->quit_flag)
- return -1;
-
/* Number of samples in the frame */
ts_len = frame->size / 2;
@@ -438,77 +436,56 @@ static void handle_incoming_dtmf( pjmedia_stream *stream,
/*
- * This thread will poll the socket for incoming packets, and put
- * the packets to jitter buffer.
+ * This callback is called by ioqueue framework on receipt of packets
+ * in the RTP socket.
*/
-static int PJ_THREAD_FUNC jitter_buffer_thread (void*arg)
+static void on_rx_rtp( pj_ioqueue_key_t *key,
+ pj_ioqueue_op_key_t *op_key,
+ pj_ssize_t bytes_read)
+
{
- pjmedia_stream *stream = arg;
+ pjmedia_stream *stream = pj_ioqueue_get_user_data(key);
pjmedia_channel *channel = stream->dec;
+ pj_status_t status;
+
+
+ PJ_UNUSED_ARG(op_key);
- while (!stream->quit_flag) {
- pj_ssize_t len;
+ /*
+ * Loop while we have packet.
+ */
+ do {
const pjmedia_rtp_hdr *hdr;
const void *payload;
unsigned payloadlen;
- int addrlen;
- int status;
-
- /* Wait for packet. */
- pj_fd_set_t fds;
- pj_time_val timeout;
-
- PJ_FD_ZERO (&fds);
- PJ_FD_SET (stream->skinfo.rtp_sock, &fds);
- timeout.sec = 0;
- timeout.msec = 1;
-
- /* Wait with timeout. */
- status = pj_sock_select(FD_SETSIZE, &fds, NULL, NULL, &timeout);
- if (status < 0) {
- TRACE_((THIS_FILE, "Jitter buffer select() error",
- pj_get_netos_error()));
- pj_thread_sleep(500);
- continue;
- } else if (status == 0)
- continue;
-
- /* Get packet from socket. */
- len = channel->in_pkt_size;
- addrlen = sizeof(stream->rem_src_rtp);
- status = pj_sock_recvfrom(stream->skinfo.rtp_sock,
- channel->in_pkt, &len, 0,
- &stream->rem_src_rtp, &addrlen);
- if (len < 1 || status != PJ_SUCCESS) {
- if (pj_get_netos_error() == PJ_STATUS_FROM_OS(OSERR_ECONNRESET)) {
- /* On Win2K SP2 (or above) and WinXP, recv() will get
- * WSAECONNRESET when the sending side receives ICMP port
- * unreachable.
- */
- continue;
- }
- pj_thread_sleep(1);
- continue;
- }
- if (channel->paused)
- continue;
+ /* Go straight to read next packet if bytes_read == 0.
+ */
+ if (bytes_read == 0)
+ goto read_next_packet;
+
/* Update RTP and RTCP session. */
- status = pjmedia_rtp_decode_rtp(&channel->rtp, channel->in_pkt, len,
- &hdr, &payload, &payloadlen);
+ status = pjmedia_rtp_decode_rtp(&channel->rtp,
+ channel->in_pkt, bytes_read,
+ &hdr, &payload, &payloadlen);
if (status != PJ_SUCCESS) {
TRACE_((THIS_FILE, "RTP decode error", status));
- continue;
+ goto read_next_packet;
}
+
/* Handle incoming DTMF. */
if (hdr->pt == stream->rx_event_pt) {
handle_incoming_dtmf(stream, payload, payloadlen);
- continue;
+ goto read_next_packet;
}
+
+ /* Update RTP session (also checks if RTP session can accept
+ * the incoming packet.
+ */
status = pjmedia_rtp_session_update(&channel->rtp, hdr);
if (status != 0 &&
status != PJMEDIA_RTP_ESESSPROBATION &&
@@ -518,46 +495,85 @@ static int PJ_THREAD_FUNC jitter_buffer_thread (void*arg)
status));
PJ_LOG(4,(THIS_FILE,"RTP packet detail: pt=%d, seq=%d",
hdr->pt, pj_ntohs(hdr->seq)));
- continue;
+ goto read_next_packet;
}
- pjmedia_rtcp_rx_rtp(&stream->rtcp, pj_ntohs(hdr->seq), pj_ntohl(hdr->ts));
+
+
+ /* Update the RTCP session. */
+ pjmedia_rtcp_rx_rtp(&stream->rtcp, pj_ntohs(hdr->seq),
+ pj_ntohl(hdr->ts));
+
/* Update stat */
stream->stat.dec.pkt++;
- stream->stat.dec.bytes += len;
+ stream->stat.dec.bytes += bytes_read;
+
/* See if source address of RTP packet is different than the
* configured address.
*/
if ((stream->rem_rtp_addr.sin_addr.s_addr !=
- stream->rem_src_rtp.sin_addr.s_addr) ||
- (stream->rem_rtp_addr.sin_port != stream->rem_src_rtp.sin_port))
+ stream->rtp_src_addr.sin_addr.s_addr) ||
+ (stream->rem_rtp_addr.sin_port != stream->rtp_src_addr.sin_port))
{
- stream->rem_src_cnt++;
+ stream->rtp_src_cnt++;
- if (stream->rem_src_cnt >= PJMEDIA_RTP_NAT_PROBATION_CNT) {
+ if (stream->rtp_src_cnt >= PJMEDIA_RTP_NAT_PROBATION_CNT) {
- stream->rem_rtp_addr = stream->rem_src_rtp;
- stream->rem_src_cnt = 0;
+ stream->rem_rtp_addr = stream->rtp_src_addr;
+ stream->rtp_src_cnt = 0;
PJ_LOG(4,(THIS_FILE,"Remote RTP address switched to %s:%d",
- pj_inet_ntoa(stream->rem_src_rtp.sin_addr),
- pj_ntohs(stream->rem_src_rtp.sin_port)));
+ pj_inet_ntoa(stream->rtp_src_addr.sin_addr),
+ pj_ntohs(stream->rtp_src_addr.sin_port)));
}
}
+
/* Put to jitter buffer. */
pj_mutex_lock( stream->jb_mutex );
- status = pjmedia_jbuf_put_frame(stream->jb, payload, payloadlen, pj_ntohs(hdr->seq));
+ status = pjmedia_jbuf_put_frame(stream->jb, payload, payloadlen,
+ pj_ntohs(hdr->seq));
pj_mutex_unlock( stream->jb_mutex );
if (status != 0) {
TRACE_((THIS_FILE, "Jitter buffer put() error", status));
- continue;
+ goto read_next_packet;
}
+
+read_next_packet:
+ bytes_read = channel->in_pkt_size;
+ stream->rtp_addrlen = sizeof(stream->rtp_src_addr);
+ status = pj_ioqueue_recvfrom( stream->rtp_key,
+ &stream->rtp_op_key,
+ channel->in_pkt,
+ &bytes_read, 0,
+ &stream->rtp_src_addr,
+ &stream->rtp_addrlen);
+
+ } while (status == PJ_SUCCESS);
+
+ if (status != PJ_SUCCESS && status != PJ_EPENDING) {
+ char errmsg[PJ_ERR_MSG_SIZE];
+
+ pj_strerror(status, errmsg, sizeof(errmsg));
+ PJ_LOG(4,(THIS_FILE, "Error reading RTP packet: %s [status=%d]",
+ errmsg, status));
}
+}
+
- return 0;
+/*
+ * This callback is called by ioqueue framework on receipt of packets
+ * in the RTCP socket.
+ */
+static void on_rx_rtcp( pj_ioqueue_key_t *key,
+ pj_ioqueue_op_key_t *op_key,
+ pj_ssize_t bytes_read)
+{
+ PJ_UNUSED_ARG(key);
+ PJ_UNUSED_ARG(op_key);
+ PJ_UNUSED_ARG(bytes_read);
}
@@ -641,6 +657,7 @@ PJ_DEF(pj_status_t) pjmedia_stream_create( pjmedia_endpt *endpt,
{
pjmedia_stream *stream;
pjmedia_codec_param codec_param;
+ pj_ioqueue_callback ioqueue_cb;
pj_status_t status;
PJ_ASSERT_RETURN(pool && info && p_stream, PJ_EINVAL);
@@ -667,10 +684,10 @@ PJ_DEF(pj_status_t) pjmedia_stream_create( pjmedia_endpt *endpt,
/* Init stream: */
-
+ stream->endpt = endpt;
+ stream->codec_mgr = pjmedia_endpt_get_codec_mgr(endpt);
stream->dir = info->dir;
stream->user_data = user_data;
- stream->codec_mgr = pjmedia_endpt_get_codec_mgr(endpt);
stream->skinfo = info->sock_info;
stream->rem_rtp_addr = info->rem_addr;
stream->tx_event_pt = info->tx_event_pt;
@@ -732,15 +749,6 @@ PJ_DEF(pj_status_t) pjmedia_stream_create( pjmedia_endpt *endpt,
goto err_cleanup;
- /* Create jitter buffer thread: */
-
- status = pj_thread_create(pool, "decode",
- &jitter_buffer_thread, stream,
- 0, PJ_THREAD_SUSPENDED, &stream->thread);
- if (status != PJ_SUCCESS)
- goto err_cleanup;
-
-
/* Create decoder channel: */
status = create_channel( pool, stream, PJMEDIA_DIR_DECODING,
@@ -756,11 +764,44 @@ PJ_DEF(pj_status_t) pjmedia_stream_create( pjmedia_endpt *endpt,
if (status != PJ_SUCCESS)
goto err_cleanup;
- /* Resume jitter buffer thread. */
- status = pj_thread_resume( stream->thread );
+ /* Register RTP socket to ioqueue */
+ pj_memset(&ioqueue_cb, 0, sizeof(ioqueue_cb));
+ ioqueue_cb.on_read_complete = &on_rx_rtp;
+
+ status = pj_ioqueue_register_sock( pool,
+ pjmedia_endpt_get_ioqueue(endpt),
+ stream->skinfo.rtp_sock,
+ stream, &ioqueue_cb, &stream->rtp_key);
if (status != PJ_SUCCESS)
goto err_cleanup;
+ /* Init pending operation key. */
+ pj_ioqueue_op_key_init(&stream->rtp_op_key, sizeof(stream->rtp_op_key));
+
+ /* Bootstrap the first recvfrom() operation. */
+ on_rx_rtp( stream->rtp_key, &stream->rtp_op_key, 0);
+
+
+ /* Register RTCP socket to ioqueue. */
+ if (stream->skinfo.rtcp_sock != PJ_INVALID_SOCKET) {
+ pj_memset(&ioqueue_cb, 0, sizeof(ioqueue_cb));
+ ioqueue_cb.on_read_complete = &on_rx_rtcp;
+
+ status = pj_ioqueue_register_sock( pool,
+ pjmedia_endpt_get_ioqueue(endpt),
+ stream->skinfo.rtcp_sock,
+ stream, &ioqueue_cb,
+ &stream->rtcp_key);
+ if (status != PJ_SUCCESS)
+ goto err_cleanup;
+ }
+
+ /* Init pending operation key. */
+ pj_ioqueue_op_key_init(&stream->rtcp_op_key, sizeof(stream->rtcp_op_key));
+
+ /* Bootstrap the first recvfrom() operation. */
+ on_rx_rtcp( stream->rtcp_key, &stream->rtcp_op_key, 0);
+
/* Success! */
*p_stream = stream;
return PJ_SUCCESS;
@@ -780,41 +821,20 @@ PJ_DEF(pj_status_t) pjmedia_stream_destroy( pjmedia_stream *stream )
PJ_ASSERT_RETURN(stream != NULL, PJ_EINVAL);
- /* Signal threads to quit. */
-
- stream->quit_flag = 1;
-
-
- /* Close encoding sound stream. */
-
- /*
- if (stream->enc && stream->enc->snd_stream) {
-
- pjmedia_snd_stream_stop(stream->enc->snd_stream);
- pjmedia_snd_stream_close(stream->enc->snd_stream);
- stream->enc->snd_stream = NULL;
-
- }
- */
-
- /* Close decoding sound stream. */
- /*
- if (stream->dec && stream->dec->snd_stream) {
+ /* This function may be called when stream is partly initialized. */
+ if (stream->jb_mutex)
+ pj_mutex_lock(stream->jb_mutex);
- pjmedia_snd_stream_stop(stream->dec->snd_stream);
- pjmedia_snd_stream_close(stream->dec->snd_stream);
- stream->dec->snd_stream = NULL;
+ /* Unregister from ioqueue. */
+ if (stream->rtp_key) {
+ pj_ioqueue_unregister(stream->rtp_key);
+ stream->rtp_key = NULL;
}
- */
-
- /* Wait for jitter buffer thread to quit: */
-
- if (stream->thread) {
- pj_thread_join(stream->thread);
- pj_thread_destroy(stream->thread);
- stream->thread = NULL;
+ if (stream->rtcp_key) {
+ pj_ioqueue_unregister(stream->rtcp_key);
+ stream->rtcp_key = NULL;
}
/* Free codec. */