summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorBenny Prijono <bennylp@teluu.com>2006-03-22 11:59:11 +0000
committerBenny Prijono <bennylp@teluu.com>2006-03-22 11:59:11 +0000
commitfde4e788d7e947a2f91583041d3ce0de97ad2d6a (patch)
tree70e8e0907d2a24cb73292fea08311a5e9202fa50
parentd494bf04e608a604eaa96bd65b9daa72a1d98252 (diff)
Redesign RTP/RTCP stuffs so that stream does not create thread implicitly. Changed pjmedia_endpt_create() API.
git-svn-id: http://svn.pjsip.org/repos/pjproject/trunk@350 74dad513-b988-da41-8d7b-12977e46ad98
-rw-r--r--pjmedia/include/pjmedia/endpoint.h20
-rw-r--r--pjmedia/src/pjmedia/endpoint.c130
-rw-r--r--pjmedia/src/pjmedia/stream.c268
-rw-r--r--pjsip-apps/build/samples.dsp6
-rw-r--r--pjsip-apps/src/pjsip-perf/main.c4
-rw-r--r--pjsip-apps/src/samples/confsample.c2
-rw-r--r--pjsip-apps/src/samples/level.c2
-rw-r--r--pjsip-apps/src/samples/playfile.c2
-rw-r--r--pjsip-apps/src/samples/playsine.c2
-rw-r--r--pjsip-apps/src/samples/simpleua.c2
-rw-r--r--pjsip-apps/src/samples/sndinfo.c2
-rw-r--r--pjsip/include/pjsua-lib/pjsua.h1
-rw-r--r--pjsip/src/pjsua-lib/pjsua_call.c86
-rw-r--r--pjsip/src/pjsua-lib/pjsua_core.c5
14 files changed, 361 insertions, 171 deletions
diff --git a/pjmedia/include/pjmedia/endpoint.h b/pjmedia/include/pjmedia/endpoint.h
index 351d3e7f..d4dca351 100644
--- a/pjmedia/include/pjmedia/endpoint.h
+++ b/pjmedia/include/pjmedia/endpoint.h
@@ -44,17 +44,24 @@
PJ_BEGIN_DECL
-
/**
* Create an instance of media endpoint.
*
* @param pf Pool factory, which will be used by the media endpoint
* throughout its lifetime.
+ * @param ioqueue Optional ioqueue instance to be registered to the
+ * endpoint. The ioqueue instance is used to poll all RTP
+ * and RTCP sockets. If this argument is NULL, the
+ * endpoint will create an internal ioqueue instance.
+ * @param worker_cnt Specify the number of worker threads to be created
+ * to poll the ioqueue.
* @param p_endpt Pointer to receive the endpoint instance.
*
* @return PJ_SUCCESS on success.
*/
PJ_DECL(pj_status_t) pjmedia_endpt_create( pj_pool_factory *pf,
+ pj_ioqueue_t *ioqueue,
+ unsigned worker_cnt,
pjmedia_endpt **p_endpt);
/**
@@ -67,6 +74,17 @@ PJ_DECL(pj_status_t) pjmedia_endpt_create( pj_pool_factory *pf,
PJ_DECL(pj_status_t) pjmedia_endpt_destroy(pjmedia_endpt *endpt);
+
+/**
+ * Get the ioqueue instance of the media endpoint.
+ *
+ * @param endpt The media endpoint instance.
+ *
+ * @return The ioqueue instance of the media endpoint.
+ */
+PJ_DECL(pj_ioqueue_t*) pjmedia_endpt_get_ioqueue(pjmedia_endpt *endpt);
+
+
/**
* Request the media endpoint to create pool.
*
diff --git a/pjmedia/src/pjmedia/endpoint.c b/pjmedia/src/pjmedia/endpoint.c
index 71217b61..f4c22756 100644
--- a/pjmedia/src/pjmedia/endpoint.c
+++ b/pjmedia/src/pjmedia/endpoint.c
@@ -19,12 +19,13 @@
#include <pjmedia/endpoint.h>
#include <pjmedia/errno.h>
#include <pjmedia/sdp.h>
-#include <pj/sock.h>
-#include <pj/pool.h>
-#include <pj/string.h>
#include <pj/assert.h>
-#include <pj/os.h>
+#include <pj/ioqueue.h>
#include <pj/log.h>
+#include <pj/os.h>
+#include <pj/pool.h>
+#include <pj/sock.h>
+#include <pj/string.h>
#define THIS_FILE "endpoint.c"
@@ -65,6 +66,12 @@ PJ_DECL(pj_str_t) pjmedia_strerror( pj_status_t status, char *buffer,
pj_size_t bufsize);
+/* Worker thread proc. */
+static int PJ_THREAD_FUNC worker_proc(void*);
+
+
+#define MAX_THREADS 16
+
/** Concrete declaration of media endpoint. */
struct pjmedia_endpt
@@ -77,16 +84,34 @@ struct pjmedia_endpt
/** Codec manager. */
pjmedia_codec_mgr codec_mgr;
+
+ /** IOqueue instance. */
+ pj_ioqueue_t *ioqueue;
+
+ /** Do we own the ioqueue? */
+ pj_bool_t own_ioqueue;
+
+ /** Number of threads. */
+ unsigned thread_cnt;
+
+ /** IOqueue polling thread, if any. */
+ pj_thread_t *thread[MAX_THREADS];
+
+ /** To signal polling thread to quit. */
+ pj_bool_t quit_flag;
};
/**
* Initialize and get the instance of media endpoint.
*/
PJ_DEF(pj_status_t) pjmedia_endpt_create(pj_pool_factory *pf,
+ pj_ioqueue_t *ioqueue,
+ unsigned worker_cnt,
pjmedia_endpt **p_endpt)
{
pj_pool_t *pool;
pjmedia_endpt *endpt;
+ unsigned i;
pj_status_t status;
if (!error_subsys_registered) {
@@ -104,39 +129,59 @@ PJ_DEF(pj_status_t) pjmedia_endpt_create(pj_pool_factory *pf,
endpt = pj_pool_zalloc(pool, sizeof(struct pjmedia_endpt));
endpt->pool = pool;
endpt->pf = pf;
+ endpt->ioqueue = ioqueue;
+ endpt->thread_cnt = worker_cnt;
/* Sound */
pjmedia_snd_init(pf);
/* Init codec manager. */
status = pjmedia_codec_mgr_init(&endpt->codec_mgr);
- if (status != PJ_SUCCESS) {
- pjmedia_snd_deinit();
+ if (status != PJ_SUCCESS)
goto on_error;
- }
- /* Init and register G.711 codec. */
-#if 0
- // Starting from 0.5.4, codec factory is registered by applications.
- factory = pj_pool_alloc (endpt->pool, sizeof(pjmedia_codec_factory));
+ /* Create ioqueue if none is specified. */
+ if (endpt->ioqueue == NULL) {
+
+ endpt->own_ioqueue = PJ_TRUE;
- status = g711_init_factory (factory, endpt->pool);
- if (status != PJ_SUCCESS) {
- pjmedia_snd_deinit();
- goto on_error;
+ status = pj_ioqueue_create( endpt->pool, PJ_IOQUEUE_MAX_HANDLES,
+ &endpt->ioqueue);
+ if (status != PJ_SUCCESS)
+ goto on_error;
+
+ if (worker_cnt == 0) {
+ PJ_LOG(4,(THIS_FILE, "Warning: no worker thread is created in"
+ "media endpoint for internal ioqueue"));
+ }
}
- status = pjmedia_codec_mgr_register_factory (&endpt->codec_mgr, factory);
- if (status != PJ_SUCCESS) {
- pjmedia_snd_deinit();
- goto on_error;
+ /* Create worker threads if asked. */
+ for (i=0; i<worker_cnt; ++i) {
+ status = pj_thread_create( endpt->pool, "media", &worker_proc,
+ endpt, 0, 0, &endpt->thread[i]);
+ if (status != PJ_SUCCESS)
+ goto on_error;
}
-#endif
+
*p_endpt = endpt;
return PJ_SUCCESS;
on_error:
+
+ /* Destroy threads */
+ for (i=0; i<endpt->thread_cnt; ++i) {
+ if (endpt->thread[i]) {
+ pj_thread_destroy(endpt->thread[i]);
+ }
+ }
+
+ /* Destroy internal ioqueue */
+ if (endpt->ioqueue && endpt->own_ioqueue)
+ pj_ioqueue_destroy(endpt->ioqueue);
+
+ pjmedia_snd_deinit();
pj_pool_release(pool);
return status;
}
@@ -154,8 +199,27 @@ PJ_DEF(pjmedia_codec_mgr*) pjmedia_endpt_get_codec_mgr(pjmedia_endpt *endpt)
*/
PJ_DEF(pj_status_t) pjmedia_endpt_destroy (pjmedia_endpt *endpt)
{
+ unsigned i;
+
PJ_ASSERT_RETURN(endpt, PJ_EINVAL);
+ endpt->quit_flag = 1;
+
+ /* Destroy threads */
+ for (i=0; i<endpt->thread_cnt; ++i) {
+ if (endpt->thread[i]) {
+ pj_thread_join(endpt->thread[i]);
+ pj_thread_destroy(endpt->thread[i]);
+ endpt->thread[i] = NULL;
+ }
+ }
+
+ /* Destroy internal ioqueue */
+ if (endpt->ioqueue && endpt->own_ioqueue) {
+ pj_ioqueue_destroy(endpt->ioqueue);
+ endpt->ioqueue = NULL;
+ }
+
endpt->pf = NULL;
pjmedia_snd_deinit();
@@ -164,6 +228,32 @@ PJ_DEF(pj_status_t) pjmedia_endpt_destroy (pjmedia_endpt *endpt)
return PJ_SUCCESS;
}
+
+/**
+ * Get the ioqueue instance of the media endpoint.
+ */
+PJ_DEF(pj_ioqueue_t*) pjmedia_endpt_get_ioqueue(pjmedia_endpt *endpt)
+{
+ PJ_ASSERT_RETURN(endpt, NULL);
+ return endpt->ioqueue;
+}
+
+
+/**
+ * Worker thread proc.
+ */
+static int PJ_THREAD_FUNC worker_proc(void *arg)
+{
+ pjmedia_endpt *endpt = arg;
+
+ while (!endpt->quit_flag) {
+ pj_time_val timeout = { 0, 500 };
+ pj_ioqueue_poll(endpt->ioqueue, &timeout);
+ }
+
+ return 0;
+}
+
/**
* Create pool.
*/
diff --git a/pjmedia/src/pjmedia/stream.c b/pjmedia/src/pjmedia/stream.c
index 671e7e59..95d9a7c9 100644
--- a/pjmedia/src/pjmedia/stream.c
+++ b/pjmedia/src/pjmedia/stream.c
@@ -26,6 +26,7 @@
#include <pj/ctype.h>
#include <pj/compat/socket.h>
#include <pj/errno.h>
+#include <pj/ioqueue.h>
#include <pj/log.h>
#include <pj/os.h>
#include <pj/pool.h>
@@ -78,6 +79,9 @@ struct dtmf
*/
struct pjmedia_stream
{
+ pjmedia_endpt *endpt; /**< Media endpoint. */
+ pjmedia_codec_mgr *codec_mgr; /**< Codec manager instance. */
+
pjmedia_port port; /**< Port interface. */
pjmedia_channel *enc; /**< Encoding channel. */
pjmedia_channel *dec; /**< Decoding channel. */
@@ -86,7 +90,6 @@ struct pjmedia_stream
pjmedia_stream_stat stat; /**< Stream statistics. */
void *user_data; /**< User data. */
- pjmedia_codec_mgr *codec_mgr; /**< Codec manager instance. */
pjmedia_codec *codec; /**< Codec instance being used. */
pj_size_t frame_size; /**< Size of encoded frame. */
pj_mutex_t *jb_mutex;
@@ -96,13 +99,18 @@ struct pjmedia_stream
pj_sockaddr_in rem_rtp_addr; /**< Remote RTP address. */
pj_sockaddr_in rem_rtcp_addr; /**< Remote RTCP address. */
- pj_sockaddr_in rem_src_rtp; /**< addr of src pkt from remote*/
- unsigned rem_src_cnt; /**< if different, # of pkt rcv */
pjmedia_rtcp_session rtcp; /**< RTCP for incoming RTP. */
- pj_bool_t quit_flag; /**< To signal thread exit. */
- pj_thread_t *thread; /**< Jitter buffer's thread. */
+ pj_ioqueue_key_t *rtp_key; /**< RTP ioqueue key. */
+ pj_ioqueue_op_key_t rtp_op_key; /**< The pending read op key. */
+ pj_sockaddr_in rtp_src_addr; /**< addr of src pkt from remote*/
+ unsigned rtp_src_cnt; /**< if different, # of pkt rcv */
+ int rtp_addrlen; /**< Address length. */
+
+ pj_ioqueue_key_t *rtcp_key; /**< RTCP ioqueue key. */
+ pj_ioqueue_op_key_t rtcp_op_key; /**< The pending read op key. */
+
/* RFC 2833 DTMF transmission queue: */
int tx_event_pt; /**< Outgoing pt for dtmf. */
@@ -152,12 +160,6 @@ static pj_status_t get_frame( pjmedia_port *port, pjmedia_frame *frame)
pj_status_t status;
struct pjmedia_frame frame_in, frame_out;
- /* Do nothing if we're quitting. */
- if (stream->quit_flag) {
- frame->type = PJMEDIA_FRAME_TYPE_NONE;
- return PJ_SUCCESS;
- }
-
/* Lock jitter buffer mutex */
pj_mutex_lock( stream->jb_mutex );
@@ -271,10 +273,6 @@ static pj_status_t put_frame( pjmedia_port *port,
int rtphdrlen;
pj_ssize_t sent;
- /* Check if stream is quitting. */
- if (stream->quit_flag)
- return -1;
-
/* Number of samples in the frame */
ts_len = frame->size / 2;
@@ -438,77 +436,56 @@ static void handle_incoming_dtmf( pjmedia_stream *stream,
/*
- * This thread will poll the socket for incoming packets, and put
- * the packets to jitter buffer.
+ * This callback is called by ioqueue framework on receipt of packets
+ * in the RTP socket.
*/
-static int PJ_THREAD_FUNC jitter_buffer_thread (void*arg)
+static void on_rx_rtp( pj_ioqueue_key_t *key,
+ pj_ioqueue_op_key_t *op_key,
+ pj_ssize_t bytes_read)
+
{
- pjmedia_stream *stream = arg;
+ pjmedia_stream *stream = pj_ioqueue_get_user_data(key);
pjmedia_channel *channel = stream->dec;
+ pj_status_t status;
+
+
+ PJ_UNUSED_ARG(op_key);
- while (!stream->quit_flag) {
- pj_ssize_t len;
+ /*
+ * Loop while we have packet.
+ */
+ do {
const pjmedia_rtp_hdr *hdr;
const void *payload;
unsigned payloadlen;
- int addrlen;
- int status;
-
- /* Wait for packet. */
- pj_fd_set_t fds;
- pj_time_val timeout;
-
- PJ_FD_ZERO (&fds);
- PJ_FD_SET (stream->skinfo.rtp_sock, &fds);
- timeout.sec = 0;
- timeout.msec = 1;
-
- /* Wait with timeout. */
- status = pj_sock_select(FD_SETSIZE, &fds, NULL, NULL, &timeout);
- if (status < 0) {
- TRACE_((THIS_FILE, "Jitter buffer select() error",
- pj_get_netos_error()));
- pj_thread_sleep(500);
- continue;
- } else if (status == 0)
- continue;
-
- /* Get packet from socket. */
- len = channel->in_pkt_size;
- addrlen = sizeof(stream->rem_src_rtp);
- status = pj_sock_recvfrom(stream->skinfo.rtp_sock,
- channel->in_pkt, &len, 0,
- &stream->rem_src_rtp, &addrlen);
- if (len < 1 || status != PJ_SUCCESS) {
- if (pj_get_netos_error() == PJ_STATUS_FROM_OS(OSERR_ECONNRESET)) {
- /* On Win2K SP2 (or above) and WinXP, recv() will get
- * WSAECONNRESET when the sending side receives ICMP port
- * unreachable.
- */
- continue;
- }
- pj_thread_sleep(1);
- continue;
- }
- if (channel->paused)
- continue;
+ /* Go straight to read next packet if bytes_read == 0.
+ */
+ if (bytes_read == 0)
+ goto read_next_packet;
+
/* Update RTP and RTCP session. */
- status = pjmedia_rtp_decode_rtp(&channel->rtp, channel->in_pkt, len,
- &hdr, &payload, &payloadlen);
+ status = pjmedia_rtp_decode_rtp(&channel->rtp,
+ channel->in_pkt, bytes_read,
+ &hdr, &payload, &payloadlen);
if (status != PJ_SUCCESS) {
TRACE_((THIS_FILE, "RTP decode error", status));
- continue;
+ goto read_next_packet;
}
+
/* Handle incoming DTMF. */
if (hdr->pt == stream->rx_event_pt) {
handle_incoming_dtmf(stream, payload, payloadlen);
- continue;
+ goto read_next_packet;
}
+
+ /* Update RTP session (also checks if RTP session can accept
+ * the incoming packet.
+ */
status = pjmedia_rtp_session_update(&channel->rtp, hdr);
if (status != 0 &&
status != PJMEDIA_RTP_ESESSPROBATION &&
@@ -518,46 +495,85 @@ static int PJ_THREAD_FUNC jitter_buffer_thread (void*arg)
status));
PJ_LOG(4,(THIS_FILE,"RTP packet detail: pt=%d, seq=%d",
hdr->pt, pj_ntohs(hdr->seq)));
- continue;
+ goto read_next_packet;
}
- pjmedia_rtcp_rx_rtp(&stream->rtcp, pj_ntohs(hdr->seq), pj_ntohl(hdr->ts));
+
+
+ /* Update the RTCP session. */
+ pjmedia_rtcp_rx_rtp(&stream->rtcp, pj_ntohs(hdr->seq),
+ pj_ntohl(hdr->ts));
+
/* Update stat */
stream->stat.dec.pkt++;
- stream->stat.dec.bytes += len;
+ stream->stat.dec.bytes += bytes_read;
+
/* See if source address of RTP packet is different than the
* configured address.
*/
if ((stream->rem_rtp_addr.sin_addr.s_addr !=
- stream->rem_src_rtp.sin_addr.s_addr) ||
- (stream->rem_rtp_addr.sin_port != stream->rem_src_rtp.sin_port))
+ stream->rtp_src_addr.sin_addr.s_addr) ||
+ (stream->rem_rtp_addr.sin_port != stream->rtp_src_addr.sin_port))
{
- stream->rem_src_cnt++;
+ stream->rtp_src_cnt++;
- if (stream->rem_src_cnt >= PJMEDIA_RTP_NAT_PROBATION_CNT) {
+ if (stream->rtp_src_cnt >= PJMEDIA_RTP_NAT_PROBATION_CNT) {
- stream->rem_rtp_addr = stream->rem_src_rtp;
- stream->rem_src_cnt = 0;
+ stream->rem_rtp_addr = stream->rtp_src_addr;
+ stream->rtp_src_cnt = 0;
PJ_LOG(4,(THIS_FILE,"Remote RTP address switched to %s:%d",
- pj_inet_ntoa(stream->rem_src_rtp.sin_addr),
- pj_ntohs(stream->rem_src_rtp.sin_port)));
+ pj_inet_ntoa(stream->rtp_src_addr.sin_addr),
+ pj_ntohs(stream->rtp_src_addr.sin_port)));
}
}
+
/* Put to jitter buffer. */
pj_mutex_lock( stream->jb_mutex );
- status = pjmedia_jbuf_put_frame(stream->jb, payload, payloadlen, pj_ntohs(hdr->seq));
+ status = pjmedia_jbuf_put_frame(stream->jb, payload, payloadlen,
+ pj_ntohs(hdr->seq));
pj_mutex_unlock( stream->jb_mutex );
if (status != 0) {
TRACE_((THIS_FILE, "Jitter buffer put() error", status));
- continue;
+ goto read_next_packet;
}
+
+read_next_packet:
+ bytes_read = channel->in_pkt_size;
+ stream->rtp_addrlen = sizeof(stream->rtp_src_addr);
+ status = pj_ioqueue_recvfrom( stream->rtp_key,
+ &stream->rtp_op_key,
+ channel->in_pkt,
+ &bytes_read, 0,
+ &stream->rtp_src_addr,
+ &stream->rtp_addrlen);
+
+ } while (status == PJ_SUCCESS);
+
+ if (status != PJ_SUCCESS && status != PJ_EPENDING) {
+ char errmsg[PJ_ERR_MSG_SIZE];
+
+ pj_strerror(status, errmsg, sizeof(errmsg));
+ PJ_LOG(4,(THIS_FILE, "Error reading RTP packet: %s [status=%d]",
+ errmsg, status));
}
+}
+
- return 0;
+/*
+ * This callback is called by ioqueue framework on receipt of packets
+ * in the RTCP socket.
+ */
+static void on_rx_rtcp( pj_ioqueue_key_t *key,
+ pj_ioqueue_op_key_t *op_key,
+ pj_ssize_t bytes_read)
+{
+ PJ_UNUSED_ARG(key);
+ PJ_UNUSED_ARG(op_key);
+ PJ_UNUSED_ARG(bytes_read);
}
@@ -641,6 +657,7 @@ PJ_DEF(pj_status_t) pjmedia_stream_create( pjmedia_endpt *endpt,
{
pjmedia_stream *stream;
pjmedia_codec_param codec_param;
+ pj_ioqueue_callback ioqueue_cb;
pj_status_t status;
PJ_ASSERT_RETURN(pool && info && p_stream, PJ_EINVAL);
@@ -667,10 +684,10 @@ PJ_DEF(pj_status_t) pjmedia_stream_create( pjmedia_endpt *endpt,
/* Init stream: */
-
+ stream->endpt = endpt;
+ stream->codec_mgr = pjmedia_endpt_get_codec_mgr(endpt);
stream->dir = info->dir;
stream->user_data = user_data;
- stream->codec_mgr = pjmedia_endpt_get_codec_mgr(endpt);
stream->skinfo = info->sock_info;
stream->rem_rtp_addr = info->rem_addr;
stream->tx_event_pt = info->tx_event_pt;
@@ -732,15 +749,6 @@ PJ_DEF(pj_status_t) pjmedia_stream_create( pjmedia_endpt *endpt,
goto err_cleanup;
- /* Create jitter buffer thread: */
-
- status = pj_thread_create(pool, "decode",
- &jitter_buffer_thread, stream,
- 0, PJ_THREAD_SUSPENDED, &stream->thread);
- if (status != PJ_SUCCESS)
- goto err_cleanup;
-
-
/* Create decoder channel: */
status = create_channel( pool, stream, PJMEDIA_DIR_DECODING,
@@ -756,11 +764,44 @@ PJ_DEF(pj_status_t) pjmedia_stream_create( pjmedia_endpt *endpt,
if (status != PJ_SUCCESS)
goto err_cleanup;
- /* Resume jitter buffer thread. */
- status = pj_thread_resume( stream->thread );
+ /* Register RTP socket to ioqueue */
+ pj_memset(&ioqueue_cb, 0, sizeof(ioqueue_cb));
+ ioqueue_cb.on_read_complete = &on_rx_rtp;
+
+ status = pj_ioqueue_register_sock( pool,
+ pjmedia_endpt_get_ioqueue(endpt),
+ stream->skinfo.rtp_sock,
+ stream, &ioqueue_cb, &stream->rtp_key);
if (status != PJ_SUCCESS)
goto err_cleanup;
+ /* Init pending operation key. */
+ pj_ioqueue_op_key_init(&stream->rtp_op_key, sizeof(stream->rtp_op_key));
+
+ /* Bootstrap the first recvfrom() operation. */
+ on_rx_rtp( stream->rtp_key, &stream->rtp_op_key, 0);
+
+
+ /* Register RTCP socket to ioqueue. */
+ if (stream->skinfo.rtcp_sock != PJ_INVALID_SOCKET) {
+ pj_memset(&ioqueue_cb, 0, sizeof(ioqueue_cb));
+ ioqueue_cb.on_read_complete = &on_rx_rtcp;
+
+ status = pj_ioqueue_register_sock( pool,
+ pjmedia_endpt_get_ioqueue(endpt),
+ stream->skinfo.rtcp_sock,
+ stream, &ioqueue_cb,
+ &stream->rtcp_key);
+ if (status != PJ_SUCCESS)
+ goto err_cleanup;
+ }
+
+ /* Init pending operation key. */
+ pj_ioqueue_op_key_init(&stream->rtcp_op_key, sizeof(stream->rtcp_op_key));
+
+ /* Bootstrap the first recvfrom() operation. */
+ on_rx_rtcp( stream->rtcp_key, &stream->rtcp_op_key, 0);
+
/* Success! */
*p_stream = stream;
return PJ_SUCCESS;
@@ -780,41 +821,20 @@ PJ_DEF(pj_status_t) pjmedia_stream_destroy( pjmedia_stream *stream )
PJ_ASSERT_RETURN(stream != NULL, PJ_EINVAL);
- /* Signal threads to quit. */
-
- stream->quit_flag = 1;
-
-
- /* Close encoding sound stream. */
-
- /*
- if (stream->enc && stream->enc->snd_stream) {
-
- pjmedia_snd_stream_stop(stream->enc->snd_stream);
- pjmedia_snd_stream_close(stream->enc->snd_stream);
- stream->enc->snd_stream = NULL;
-
- }
- */
-
- /* Close decoding sound stream. */
- /*
- if (stream->dec && stream->dec->snd_stream) {
+ /* This function may be called when stream is partly initialized. */
+ if (stream->jb_mutex)
+ pj_mutex_lock(stream->jb_mutex);
- pjmedia_snd_stream_stop(stream->dec->snd_stream);
- pjmedia_snd_stream_close(stream->dec->snd_stream);
- stream->dec->snd_stream = NULL;
+ /* Unregister from ioqueue. */
+ if (stream->rtp_key) {
+ pj_ioqueue_unregister(stream->rtp_key);
+ stream->rtp_key = NULL;
}
- */
-
- /* Wait for jitter buffer thread to quit: */
-
- if (stream->thread) {
- pj_thread_join(stream->thread);
- pj_thread_destroy(stream->thread);
- stream->thread = NULL;
+ if (stream->rtcp_key) {
+ pj_ioqueue_unregister(stream->rtcp_key);
+ stream->rtcp_key = NULL;
}
/* Free codec. */
diff --git a/pjsip-apps/build/samples.dsp b/pjsip-apps/build/samples.dsp
index acc4663f..662ca0ad 100644
--- a/pjsip-apps/build/samples.dsp
+++ b/pjsip-apps/build/samples.dsp
@@ -41,9 +41,9 @@ CFG=samples - Win32 Debug
# PROP Use_Debug_Libraries 0
# PROP Output_Dir "./output/samples-i386-win32-vc6-release"
# PROP Intermediate_Dir "./output/samples-i386-win32-vc6-release"
-# PROP Cmd_Line "nmake /f Samples-vc.mak BUILD_MODE=release"
+# PROP Cmd_Line "nmake /NOLOGO /S /f Samples-vc.mak BUILD_MODE=release"
# PROP Rebuild_Opt "/a"
-# PROP Target_File "All samples built successfully"
+# PROP Target_File "All samples"
# PROP Bsc_Name ""
# PROP Target_Dir ""
@@ -64,7 +64,7 @@ CFG=samples - Win32 Debug
# PROP Intermediate_Dir "./output/samples-i386-win32-vc6-debug"
# PROP Cmd_Line "nmake /NOLOGO /S /f Samples-vc.mak BUILD_MODE=debug"
# PROP Rebuild_Opt "/a"
-# PROP Target_File "All samples built successfully"
+# PROP Target_File "All samples"
# PROP Bsc_Name ""
# PROP Target_Dir ""
diff --git a/pjsip-apps/src/pjsip-perf/main.c b/pjsip-apps/src/pjsip-perf/main.c
index 0bf0d3a8..f28e6b25 100644
--- a/pjsip-apps/src/pjsip-perf/main.c
+++ b/pjsip-apps/src/pjsip-perf/main.c
@@ -258,7 +258,9 @@ static pj_status_t initialize(void)
pjsip_xfer_init_module( settings.endpt );
/* Init multimedia endpoint. */
- status = pjmedia_endpt_create(&settings.cp.factory, &settings.med_endpt);
+ status = pjmedia_endpt_create(&settings.cp.factory,
+ pjsip_endpt_get_ioqueue(settings.endpt), 0,
+ &settings.med_endpt);
if (status != PJ_SUCCESS) {
app_perror(THIS_FILE, "Unable to create media endpoint",
status);
diff --git a/pjsip-apps/src/samples/confsample.c b/pjsip-apps/src/samples/confsample.c
index 514bd028..553f5701 100644
--- a/pjsip-apps/src/samples/confsample.c
+++ b/pjsip-apps/src/samples/confsample.c
@@ -158,7 +158,7 @@ int main(int argc, char *argv[])
* Initialize media endpoint.
* This will implicitly initialize PJMEDIA too.
*/
- status = pjmedia_endpt_create(&cp.factory, &med_endpt);
+ status = pjmedia_endpt_create(&cp.factory, NULL, 1, &med_endpt);
PJ_ASSERT_RETURN(status == PJ_SUCCESS, 1);
/* Create memory pool to allocate memory */
diff --git a/pjsip-apps/src/samples/level.c b/pjsip-apps/src/samples/level.c
index 034c695e..8723d62f 100644
--- a/pjsip-apps/src/samples/level.c
+++ b/pjsip-apps/src/samples/level.c
@@ -84,7 +84,7 @@ int main(int argc, char *argv[])
* Initialize media endpoint.
* This will implicitly initialize PJMEDIA too.
*/
- status = pjmedia_endpt_create(&cp.factory, &med_endpt);
+ status = pjmedia_endpt_create(&cp.factory, NULL, 1, &med_endpt);
PJ_ASSERT_RETURN(status == PJ_SUCCESS, 1);
/* Create memory pool for our file player */
diff --git a/pjsip-apps/src/samples/playfile.c b/pjsip-apps/src/samples/playfile.c
index 912feea5..a703bc57 100644
--- a/pjsip-apps/src/samples/playfile.c
+++ b/pjsip-apps/src/samples/playfile.c
@@ -91,7 +91,7 @@ int main(int argc, char *argv[])
* Initialize media endpoint.
* This will implicitly initialize PJMEDIA too.
*/
- status = pjmedia_endpt_create(&cp.factory, &med_endpt);
+ status = pjmedia_endpt_create(&cp.factory, NULL, 1, &med_endpt);
PJ_ASSERT_RETURN(status == PJ_SUCCESS, 1);
/* Create memory pool for our file player */
diff --git a/pjsip-apps/src/samples/playsine.c b/pjsip-apps/src/samples/playsine.c
index 1040d8fd..2e83221d 100644
--- a/pjsip-apps/src/samples/playsine.c
+++ b/pjsip-apps/src/samples/playsine.c
@@ -203,7 +203,7 @@ int main(int argc, char *argv[])
* Initialize media endpoint.
* This will implicitly initialize PJMEDIA too.
*/
- status = pjmedia_endpt_create(&cp.factory, &med_endpt);
+ status = pjmedia_endpt_create(&cp.factory, NULL, 1, &med_endpt);
PJ_ASSERT_RETURN(status == PJ_SUCCESS, 1);
/* Create memory pool for our sine generator */
diff --git a/pjsip-apps/src/samples/simpleua.c b/pjsip-apps/src/samples/simpleua.c
index df4337d9..bed26711 100644
--- a/pjsip-apps/src/samples/simpleua.c
+++ b/pjsip-apps/src/samples/simpleua.c
@@ -259,7 +259,7 @@ int main(int argc, char *argv[])
* Initialize media endpoint.
* This will implicitly initialize PJMEDIA too.
*/
- status = pjmedia_endpt_create(&cp.factory, &g_med_endpt);
+ status = pjmedia_endpt_create(&cp.factory, NULL, 1, &g_med_endpt);
PJ_ASSERT_RETURN(status == PJ_SUCCESS, 1);
/*
diff --git a/pjsip-apps/src/samples/sndinfo.c b/pjsip-apps/src/samples/sndinfo.c
index efd761c2..862c19e8 100644
--- a/pjsip-apps/src/samples/sndinfo.c
+++ b/pjsip-apps/src/samples/sndinfo.c
@@ -142,7 +142,7 @@ int main(int argc, char *argv[])
* Initialize media endpoint.
* This will implicitly initialize PJMEDIA too.
*/
- status = pjmedia_endpt_create(&cp.factory, &med_endpt);
+ status = pjmedia_endpt_create(&cp.factory, NULL, 1, &med_endpt);
PJ_ASSERT_RETURN(status == PJ_SUCCESS, 1);
diff --git a/pjsip/include/pjsua-lib/pjsua.h b/pjsip/include/pjsua-lib/pjsua.h
index c5594f4c..d5f447ae 100644
--- a/pjsip/include/pjsua-lib/pjsua.h
+++ b/pjsip/include/pjsua-lib/pjsua.h
@@ -98,7 +98,6 @@ struct pjsua_call
pjsip_evsub *xfer_sub; /**< Xfer server subscription, if this
call was triggered by xfer. */
pjmedia_sock_info skinfo; /**< Preallocated media sockets. */
-
void *app_data; /**< Application data. */
pj_timer_entry refresh_tm;/**< Timer to send re-INVITE. */
pj_timer_entry hangup_tm; /**< Timer to hangup call. */
diff --git a/pjsip/src/pjsua-lib/pjsua_call.c b/pjsip/src/pjsua-lib/pjsua_call.c
index ea847464..b01b968d 100644
--- a/pjsip/src/pjsua-lib/pjsua_call.c
+++ b/pjsip/src/pjsua-lib/pjsua_call.c
@@ -97,6 +97,73 @@ static void schedule_call_timer( pjsua_call *call, pj_timer_entry *e,
}
+/* Close and reopen socket. */
+static pj_status_t reopen_sock( pj_sock_t *sock)
+{
+ pj_sockaddr_in addr;
+ int addrlen;
+ pj_status_t status;
+
+ addrlen = sizeof(pj_sockaddr_in);
+ status = pj_sock_getsockname(*sock, &addr, &addrlen);
+ if (status != PJ_SUCCESS) {
+ pjsua_perror(THIS_FILE, "Error getting RTP/RTCP socket name", status);
+ return status;
+ }
+
+ pj_sock_close(*sock);
+
+ status = pj_sock_socket(PJ_AF_INET, PJ_SOCK_DGRAM, 0, sock);
+ if (status != PJ_SUCCESS) {
+ pjsua_perror(THIS_FILE, "Unable to create socket", status);
+ return status;
+ }
+
+ status = pj_sock_bind(*sock, &addr, sizeof(pj_sockaddr_in));
+ if (status != PJ_SUCCESS) {
+ pjsua_perror(THIS_FILE, "Unable to re-bind RTP/RTCP socket", status);
+ return status;
+ }
+
+ return PJ_SUCCESS;
+}
+
+/*
+ * Destroy the call's media
+ */
+static pj_status_t call_destroy_media(int call_index)
+{
+ pjsua_call *call = &pjsua.calls[call_index];
+
+ if (call->conf_slot > 0) {
+ pjmedia_conf_remove_port(pjsua.mconf, call->conf_slot);
+ call->conf_slot = 0;
+ }
+
+ if (call->session) {
+
+ /* Close and reopen RTP socket.
+ * This is necessary to get the socket unregistered from ioqueue,
+ * when IOCompletionPort is used.
+ */
+ reopen_sock(&call->skinfo.rtp_sock);
+
+ /* Close and reopen RTCP socket too. */
+ reopen_sock(&call->skinfo.rtcp_sock);
+
+ /* Must destroy session after socket is closed. */
+ pjmedia_session_destroy(call->session);
+ call->session = NULL;
+
+ }
+
+ PJ_LOG(3,(THIS_FILE, "Media session for call %d is destroyed",
+ call_index));
+
+ return PJ_SUCCESS;
+}
+
+
/**
* Make outgoing call.
*/
@@ -494,13 +561,8 @@ static void pjsua_call_on_state_changed(pjsip_inv_session *inv,
pj_assert(call != NULL);
- if (call && call->session) {
- pjmedia_conf_remove_port(pjsua.mconf, call->conf_slot);
- pjmedia_session_destroy(call->session);
- call->session = NULL;
-
- PJ_LOG(3,(THIS_FILE,"Media session is destroyed"));
- }
+ if (call)
+ call_destroy_media(call->index);
/* Remove timers. */
schedule_call_timer(call, &call->refresh_tm, REFRESH_CALL_TIMER, 0);
@@ -878,11 +940,8 @@ static void pjsua_call_on_media_update(pjsip_inv_session *inv,
/* Destroy existing media session, if any. */
- if (call && call->session) {
- pjmedia_conf_remove_port(pjsua.mconf, call->conf_slot);
- pjmedia_session_destroy(call->session);
- call->session = NULL;
- }
+ if (call)
+ call_destroy_media(call->index);
/* Get local and remote SDP */
@@ -947,8 +1006,7 @@ static void pjsua_call_on_media_update(pjsip_inv_session *inv,
if (status != PJ_SUCCESS) {
pjsua_perror(THIS_FILE, "Unable to create conference slot",
status);
- pjmedia_session_destroy(call->session);
- call->session = NULL;
+ call_destroy_media(call->index);
//call_disconnect(inv, PJSIP_SC_INTERNAL_SERVER_ERROR);
return;
}
diff --git a/pjsip/src/pjsua-lib/pjsua_core.c b/pjsip/src/pjsua-lib/pjsua_core.c
index fdf66c8f..1ff4fac2 100644
--- a/pjsip/src/pjsua-lib/pjsua_core.c
+++ b/pjsip/src/pjsua-lib/pjsua_core.c
@@ -104,6 +104,7 @@ void pjsua_default(void)
pjsua.calls[i].index = i;
pjsua.calls[i].refresh_tm._timer_id = -1;
pjsua.calls[i].hangup_tm._timer_id = -1;
+ pjsua.calls[i].conf_slot = 0;
}
/* Default max nb of calls. */
@@ -536,7 +537,9 @@ pj_status_t pjsua_init(void)
/* Init media endpoint: */
- status = pjmedia_endpt_create(&pjsua.cp.factory, &pjsua.med_endpt);
+ status = pjmedia_endpt_create(&pjsua.cp.factory,
+ pjsip_endpt_get_ioqueue(pjsua.endpt), 0,
+ &pjsua.med_endpt);
if (status != PJ_SUCCESS) {
pj_caching_pool_destroy(&pjsua.cp);
pjsua_perror(THIS_FILE,