summaryrefslogtreecommitdiff
path: root/pjmedia/src/pjmedia/stream.c
diff options
context:
space:
mode:
Diffstat (limited to 'pjmedia/src/pjmedia/stream.c')
-rw-r--r--pjmedia/src/pjmedia/stream.c1288
1 files changed, 644 insertions, 644 deletions
diff --git a/pjmedia/src/pjmedia/stream.c b/pjmedia/src/pjmedia/stream.c
index 1a7080a4..718cace1 100644
--- a/pjmedia/src/pjmedia/stream.c
+++ b/pjmedia/src/pjmedia/stream.c
@@ -1,644 +1,644 @@
-/* $Id$ */
-/*
- * Copyright (C) 2003-2006 Benny Prijono <benny@prijono.org>
- *
- * This program is free software; you can redistribute it and/or modify
- * it under the terms of the GNU General Public License as published by
- * the Free Software Foundation; either version 2 of the License, or
- * (at your option) any later version.
- *
- * This program is distributed in the hope that it will be useful,
- * but WITHOUT ANY WARRANTY; without even the implied warranty of
- * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
- * GNU General Public License for more details.
- *
- * You should have received a copy of the GNU General Public License
- * along with this program; if not, write to the Free Software
- * Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
- */
-#include <pjmedia/stream.h>
-#include <pjmedia/rtp.h>
-#include <pjmedia/rtcp.h>
-#include <pjmedia/jbuf.h>
-#include <pj/os.h>
-#include <pj/log.h>
-#include <pj/string.h> /* memcpy() */
-#include <pj/pool.h>
-#include <stdlib.h>
-
-#define THISFILE "stream.c"
-#define ERRLEVEL 1
-
-#define PJ_MAX_FRAME_DURATION_MS 200
-#define PJ_MAX_BUFFER_SIZE_MS 2000
-#define PJ_MAX_MTU 1500
-
-struct jb_frame
-{
- unsigned size;
- void *buf;
-};
-
-#define pj_fifobuf_alloc(fifo,size) malloc(size)
-#define pj_fifobuf_unalloc(fifo,buf) free(buf)
-#define pj_fifobuf_free(fifo, buf) free(buf)
-
-enum stream_state
-{
- STREAM_STOPPED,
- STREAM_STARTED,
-};
-
-struct pj_media_stream_t
-{
- pj_media_dir_t dir;
- int pt;
- int state;
- pj_media_stream_stat stat;
- pj_media_stream_t *peer;
- pj_snd_stream_info snd_info;
- pj_snd_stream *snd_stream;
- pj_mutex_t *mutex;
- unsigned in_pkt_size;
- void *in_pkt;
- unsigned out_pkt_size;
- void *out_pkt;
- unsigned pcm_buf_size;
- void *pcm_buf;
- //pj_fifobuf_t fifobuf;
- pj_codec_mgr *codec_mgr;
- pj_codec *codec;
- pj_rtp_session rtp;
- pj_rtcp_session *rtcp;
- pj_jitter_buffer *jb;
- pj_sock_t rtp_sock;
- pj_sock_t rtcp_sock;
- pj_sockaddr_in dst_addr;
- pj_thread_t *transport_thread;
- int thread_quit_flag;
-};
-
-
-static pj_status_t play_callback(/* in */ void *user_data,
- /* in */ pj_uint32_t timestamp,
- /* out */ void *frame,
- /*inout*/ unsigned size)
-{
- pj_media_stream_t *channel = user_data;
- struct jb_frame *jb_frame;
- void *p;
- pj_uint32_t extseq;
- pj_status_t status;
- struct pj_audio_frame frame_in, frame_out;
-
- PJ_UNUSED_ARG(timestamp)
-
- /* Lock mutex */
- pj_mutex_lock (channel->mutex);
-
- if (!channel->codec) {
- pj_mutex_unlock (channel->mutex);
- return -1;
- }
-
- /* Get frame from jitter buffer. */
- status = pj_jb_get (channel->jb, &extseq, &p);
- jb_frame = p;
- if (status != 0 || jb_frame == NULL) {
- pj_memset(frame, 0, size);
- pj_mutex_unlock(channel->mutex);
- return 0;
- }
-
- /* Decode */
- frame_in.buf = jb_frame->buf;
- frame_in.size = jb_frame->size;
- frame_in.type = PJ_AUDIO_FRAME_AUDIO; /* ignored */
- frame_out.buf = channel->pcm_buf;
- status = channel->codec->op->decode (channel->codec, &frame_in,
- channel->pcm_buf_size, &frame_out);
- if (status != 0) {
- PJ_LOG(3, (THISFILE, "decode() has return error status %d",
- status));
-
- pj_memset(frame, 0, size);
- pj_fifobuf_free (&channel->fifobuf, jb_frame);
- pj_mutex_unlock(channel->mutex);
- return 0;
- }
-
- /* Put in sound buffer. */
- if (frame_out.size > size) {
- PJ_LOG(3, (THISFILE, "Sound playout buffer truncated %d bytes",
- frame_out.size - size));
- frame_out.size = size;
- }
-
- pj_memcpy(frame, frame_out.buf, size);
-
- pj_fifobuf_free (&channel->fifobuf, jb_frame);
- pj_mutex_unlock(channel->mutex);
- return 0;
-}
-
-static pj_status_t rec_callback( /* in */ void *user_data,
- /* in */ pj_uint32_t timestamp,
- /* in */ const void *frame,
- /* in */ unsigned size)
-{
- pj_media_stream_t *channel = user_data;
- pj_status_t status = 0;
- struct pj_audio_frame frame_in, frame_out;
- int ts_len;
- void *rtphdr;
- int rtphdrlen;
- int sent;
-#if 0
- static FILE *fhnd = NULL;
-#endif
-
- PJ_UNUSED_ARG(timestamp)
-
- /* Start locking channel mutex */
- pj_mutex_lock (channel->mutex);
-
- if (!channel->codec) {
- status = -1;
- goto on_return;
- }
-
- /* Encode. */
- frame_in.type = PJ_MEDIA_TYPE_AUDIO;
- frame_in.buf = (void*)frame;
- frame_in.size = size;
- frame_out.buf = ((char*)channel->out_pkt) + sizeof(pj_rtp_hdr);
- status = channel->codec->op->encode (channel->codec, &frame_in,
- channel->out_pkt_size - sizeof(pj_rtp_hdr),
- &frame_out);
- if (status != 0) {
- PJ_LOG(3,(THISFILE, "Codec encode() has returned error status %d",
- status));
- goto on_return;
- }
-
- /* Encapsulate. */
- ts_len = size / (channel->snd_info.bits_per_sample / 8);
- status = pj_rtp_encode_rtp (&channel->rtp, channel->pt, 0,
- frame_out.size, ts_len,
- (const void**)&rtphdr, &rtphdrlen);
- if (status != 0) {
- PJ_LOG(3,(THISFILE, "RTP encode_rtp() has returned error status %d",
- status));
- goto on_return;
- }
-
- if (rtphdrlen != sizeof(pj_rtp_hdr)) {
- /* We don't support RTP with extended header yet. */
- PJ_TODO(SUPPORT_SENDING_RTP_WITH_EXTENDED_HEADER);
- PJ_LOG(3,(THISFILE, "Unsupported extended RTP header for transmission"));
- goto on_return;
- }
-
- pj_memcpy(channel->out_pkt, rtphdr, sizeof(pj_rtp_hdr));
-
- /* Send. */
- sent = pj_sock_sendto (channel->rtp_sock, channel->out_pkt, frame_out.size+sizeof(pj_rtp_hdr), 0,
- &channel->dst_addr, sizeof(channel->dst_addr));
- if (sent != (int)frame_out.size + (int)sizeof(pj_rtp_hdr)) {
- pj_perror(THISFILE, "Error sending RTP packet to %s:%d",
- pj_sockaddr_get_str_addr(&channel->dst_addr),
- pj_sockaddr_get_port(&channel->dst_addr));
- goto on_return;
- }
-
- /* Update stat */
- channel->stat.pkt_tx++;
- channel->stat.oct_tx += frame_out.size+sizeof(pj_rtp_hdr);
-
-#if 0
- if (fhnd == NULL) {
- fhnd = fopen("RTP.DAT", "wb");
- if (fhnd) {
- fwrite (channel->out_pkt, frame_out.size+sizeof(pj_rtp_hdr), 1, fhnd);
- fclose(fhnd);
- }
- }
-#endif
-
-on_return:
- pj_mutex_unlock (channel->mutex);
- return status;
-}
-
-
-static void* PJ_THREAD_FUNC stream_decoder_transport_thread (void*arg)
-{
- pj_media_stream_t *channel = arg;
-
- while (!channel->thread_quit_flag) {
- int len, size;
- const pj_rtp_hdr *hdr;
- const void *payload;
- unsigned payloadlen;
- int status;
- struct jb_frame *jb_frame;
-
- /* Wait for packet. */
- fd_set fds;
- pj_time_val timeout;
-
- PJ_FD_ZERO (&fds);
- PJ_FD_SET (channel->rtp_sock, &fds);
- timeout.sec = 0;
- timeout.msec = 100;
-
- /* Wait with timeout. */
- status = pj_sock_select(channel->rtp_sock, &fds, NULL, NULL, &timeout);
- if (status != 1)
- continue;
-
- /* Get packet from socket. */
- len = pj_sock_recv (channel->rtp_sock, channel->in_pkt, channel->in_pkt_size, 0);
- if (len < 1) {
- if (pj_getlasterror() == PJ_ECONNRESET) {
- /* On Win2K SP2 (or above) and WinXP, recv() will get WSAECONNRESET
- when the sending side receives ICMP port unreachable.
- */
- continue;
- }
- pj_perror(THISFILE, "Error receiving packet from socket (len=%d)", len);
- pj_thread_sleep(1);
- continue;
- }
-
- if (channel->state != STREAM_STARTED)
- continue;
-
- if (channel->thread_quit_flag)
- break;
-
- /* Start locking the channel. */
- pj_mutex_lock (channel->mutex);
-
- /* Update RTP and RTCP session. */
- status = pj_rtp_decode_rtp (&channel->rtp, channel->in_pkt, len, &hdr, &payload, &payloadlen);
- if (status != 0) {
- pj_mutex_unlock (channel->mutex);
- PJ_LOG(4,(THISFILE, "RTP decode_rtp() has returned error status %d", status));
- continue;
- }
- status = pj_rtp_session_update (&channel->rtp, hdr);
- if (status != 0 && status != PJ_RTP_ERR_SESSION_PROBATION && status != PJ_RTP_ERR_SESSION_RESTARTED) {
- pj_mutex_unlock (channel->mutex);
- PJ_LOG(4,(THISFILE, "RTP session_update() has returned error status %d", status));
- continue;
- }
- pj_rtcp_rx_rtp (channel->rtcp, pj_ntohs(hdr->seq), pj_ntohl(hdr->ts));
-
- /* Update stat */
- channel->stat.pkt_rx++;
- channel->stat.oct_rx += len;
-
- /* Copy to FIFO buffer. */
- size = payloadlen+sizeof(struct jb_frame);
- jb_frame = pj_fifobuf_alloc (&channel->fifobuf, size);
- if (jb_frame == NULL) {
- pj_mutex_unlock (channel->mutex);
- PJ_LOG(4,(THISFILE, "Unable to allocate %d bytes FIFO buffer", size));
- continue;
- }
-
- /* Copy the payload */
- jb_frame->size = payloadlen;
- jb_frame->buf = ((char*)jb_frame) + sizeof(struct jb_frame);
- pj_memcpy (jb_frame->buf, payload, payloadlen);
-
- /* Put to jitter buffer. */
- status = pj_jb_put (channel->jb, pj_ntohs(hdr->seq), jb_frame);
- if (status != 0) {
- pj_fifobuf_unalloc (&channel->fifobuf, jb_frame);
- pj_mutex_unlock (channel->mutex);
- PJ_LOG(4,(THISFILE, "Jitter buffer put() has returned error status %d", status));
- continue;
- }
-
- pj_mutex_unlock (channel->mutex);
- }
-
- return NULL;
-}
-
-static void init_snd_param_from_codec_attr (pj_snd_stream_info *param,
- const pj_codec_attr *attr)
-{
- param->bits_per_sample = attr->pcm_bits_per_sample;
- param->bytes_per_frame = 2;
- param->frames_per_packet = attr->sample_rate * attr->ptime / 1000;
- param->samples_per_frame = 1;
- param->samples_per_sec = attr->sample_rate;
-}
-
-static pj_media_stream_t *create_channel ( pj_pool_t *pool,
- pj_media_dir_t dir,
- pj_media_stream_t *peer,
- pj_codec_id *codec_id,
- pj_media_stream_create_param *param)
-{
- pj_media_stream_t *channel;
- pj_codec_attr codec_attr;
- void *ptr;
- unsigned size;
- int status;
-
- /* Allocate memory for channel descriptor */
- size = sizeof(pj_media_stream_t);
- channel = pj_pool_calloc(pool, 1, size);
- if (!channel) {
- PJ_LOG(1,(THISFILE, "Unable to allocate %u bytes channel descriptor",
- size));
- return NULL;
- }
-
- channel->dir = dir;
- channel->pt = codec_id->pt;
- channel->peer = peer;
- channel->codec_mgr = pj_med_mgr_get_codec_mgr (param->mediamgr);
- channel->rtp_sock = param->rtp_sock;
- channel->rtcp_sock = param->rtcp_sock;
- channel->dst_addr = *param->remote_addr;
- channel->state = STREAM_STOPPED;
-
- /* Create mutex for the channel. */
- channel->mutex = pj_mutex_create(pool, NULL, PJ_MUTEX_SIMPLE);
- if (channel->mutex == NULL)
- goto err_cleanup;
-
- /* Create and initialize codec, only if peer is not present.
- We only use one codec instance for both encoder and decoder.
- */
- if (peer && peer->codec) {
- channel->codec = peer->codec;
- status = channel->codec->factory->op->default_attr(channel->codec->factory, codec_id,
- &codec_attr);
- if (status != 0) {
- goto err_cleanup;
- }
-
- } else {
- channel->codec = pj_codec_mgr_alloc_codec(channel->codec_mgr, codec_id);
- if (channel->codec == NULL) {
- goto err_cleanup;
- }
-
- status = channel->codec->factory->op->default_attr(channel->codec->factory, codec_id,
- &codec_attr);
- if (status != 0) {
- goto err_cleanup;
- }
-
- codec_attr.pt = codec_id->pt;
- status = channel->codec->op->open(channel->codec, &codec_attr);
- if (status != 0) {
- goto err_cleanup;
- }
- }
-
- /* Allocate buffer for incoming packet. */
- channel->in_pkt_size = PJ_MAX_MTU;
- channel->in_pkt = pj_pool_alloc(pool, channel->in_pkt_size);
- if (!channel->in_pkt) {
- PJ_LOG(1, (THISFILE, "Unable to allocate %u bytes incoming packet buffer",
- channel->in_pkt_size));
- goto err_cleanup;
- }
-
- /* Allocate buffer for outgoing packet. */
- channel->out_pkt_size = sizeof(pj_rtp_hdr) +
- codec_attr.avg_bps / 8 * PJ_MAX_FRAME_DURATION_MS / 1000;
- if (channel->out_pkt_size > PJ_MAX_MTU)
- channel->out_pkt_size = PJ_MAX_MTU;
- channel->out_pkt = pj_pool_alloc(pool, channel->out_pkt_size);
- if (!channel->out_pkt) {
- PJ_LOG(1, (THISFILE, "Unable to allocate %u bytes encoding buffer",
- channel->out_pkt_size));
- goto err_cleanup;
- }
-
- /* Allocate buffer for decoding to PCM */
- channel->pcm_buf_size = codec_attr.sample_rate *
- codec_attr.pcm_bits_per_sample / 8 *
- PJ_MAX_FRAME_DURATION_MS / 1000;
- channel->pcm_buf = pj_pool_alloc (pool, channel->pcm_buf_size);
- if (!channel->pcm_buf) {
- PJ_LOG(1, (THISFILE, "Unable to allocate %u bytes PCM buffer",
- channel->pcm_buf_size));
- goto err_cleanup;
- }
-
- /* Allocate buffer for frames put in jitter buffer. */
- size = codec_attr.avg_bps / 8 * PJ_MAX_BUFFER_SIZE_MS / 1000;
- ptr = pj_pool_alloc(pool, size);
- if (!ptr) {
- PJ_LOG(1, (THISFILE, "Unable to allocate %u bytes jitter buffer",
- channel->pcm_buf_size));
- goto err_cleanup;
- }
- //pj_fifobuf_init (&channel->fifobuf, ptr, size);
-
- /* Create and initialize sound device */
- init_snd_param_from_codec_attr (&channel->snd_info, &codec_attr);
-
- if (dir == PJ_MEDIA_DIR_ENCODING)
- channel->snd_stream = pj_snd_open_recorder(-1, &channel->snd_info,
- &rec_callback, channel);
- else
- channel->snd_stream = pj_snd_open_player(-1, &channel->snd_info,
- &play_callback, channel);
-
- if (!channel->snd_stream)
- goto err_cleanup;
-
- /* Create RTP and RTCP sessions. */
- if (pj_rtp_session_init(&channel->rtp, codec_id->pt, param->ssrc) != 0) {
- PJ_LOG(1, (THISFILE, "RTP session initialization error"));
- goto err_cleanup;
- }
-
- /* For decoder, create RTCP session, jitter buffer, and transport thread. */
- if (dir == PJ_MEDIA_DIR_DECODING) {
- channel->rtcp = pj_pool_calloc(pool, 1, sizeof(pj_rtcp_session));
- if (!channel->rtcp) {
- PJ_LOG(1, (THISFILE, "Unable to allocate RTCP session"));
- goto err_cleanup;
- }
-
- pj_rtcp_init(channel->rtcp, param->ssrc);
-
- channel->jb = pj_pool_calloc(pool, 1, sizeof(pj_jitter_buffer));
- if (!channel->jb) {
- PJ_LOG(1, (THISFILE, "Unable to allocate jitter buffer descriptor"));
- goto err_cleanup;
- }
- if (pj_jb_init(channel->jb, pool, param->jb_min, param->jb_max, param->jb_maxcnt)) {
- PJ_LOG(1, (THISFILE, "Unable to allocate jitter buffer"));
- goto err_cleanup;
- }
-
- channel->transport_thread = pj_thread_create(pool, "decode",
- &stream_decoder_transport_thread, channel,
- 0, NULL, 0);
- if (!channel->transport_thread) {
- pj_perror(THISFILE, "Unable to create transport thread");
- goto err_cleanup;
- }
- }
-
- /* Done. */
- return channel;
-
-err_cleanup:
- pj_media_stream_destroy(channel);
- return NULL;
-}
-
-
-PJ_DEF(pj_status_t) pj_media_stream_create (pj_pool_t *pool,
- pj_media_stream_t **enc_stream,
- pj_media_stream_t **dec_stream,
- pj_media_stream_create_param *param)
-{
- *dec_stream = *enc_stream = NULL;
-
- if (param->dir & PJ_MEDIA_DIR_DECODING) {
- *dec_stream =
- create_channel(pool, PJ_MEDIA_DIR_DECODING, NULL, param->codec_id, param);
- if (!*dec_stream)
- return -1;
- }
-
- if (param->dir & PJ_MEDIA_DIR_ENCODING) {
- *enc_stream =
- create_channel(pool, PJ_MEDIA_DIR_ENCODING, *dec_stream, param->codec_id, param);
- if (!*enc_stream) {
- if (*dec_stream) {
- pj_media_stream_destroy(*dec_stream);
- *dec_stream = NULL;
- }
- return -1;
- }
-
- if (*dec_stream) {
- (*dec_stream)->peer = *enc_stream;
- }
- }
-
- return 0;
-}
-
-PJ_DEF(pj_status_t) pj_media_stream_start (pj_media_stream_t *channel)
-{
- pj_status_t status;
-
- status = pj_snd_stream_start(channel->snd_stream);
-
- if (status == 0)
- channel->state = STREAM_STARTED;
- return status;
-}
-
-PJ_DEF(pj_status_t) pj_media_stream_get_stat (const pj_media_stream_t *stream,
- pj_media_stream_stat *stat)
-{
- if (stream->dir == PJ_MEDIA_DIR_ENCODING) {
- pj_memcpy (stat, &stream->stat, sizeof(*stat));
- } else {
- pj_rtcp_pkt *rtcp_pkt;
- int len;
-
- pj_memset (stat, 0, sizeof(*stat));
- pj_assert (stream->rtcp != 0);
- pj_rtcp_build_rtcp (stream->rtcp, &rtcp_pkt, &len);
-
- stat->pkt_rx = stream->stat.pkt_rx;
- stat->oct_rx = stream->stat.oct_rx;
-
- PJ_TODO(SUPPORT_JITTER_CALCULATION_FOR_NON_8KHZ_SAMPLE_RATE)
- stat->jitter = pj_ntohl(rtcp_pkt->rr.jitter) / 8;
- stat->pkt_lost = (rtcp_pkt->rr.total_lost_2 << 16) +
- (rtcp_pkt->rr.total_lost_1 << 8) +
- rtcp_pkt->rr.total_lost_0;
- }
- return 0;
-}
-
-PJ_DEF(pj_status_t) pj_media_stream_pause (pj_media_stream_t *channel)
-{
- PJ_UNUSED_ARG(channel)
- return -1;
-}
-
-PJ_DEF(pj_status_t) pj_media_stream_resume (pj_media_stream_t *channel)
-{
- PJ_UNUSED_ARG(channel)
- return -1;
-}
-
-PJ_DEF(pj_status_t) pj_media_stream_destroy (pj_media_stream_t *channel)
-{
- channel->thread_quit_flag = 1;
-
- pj_mutex_lock (channel->mutex);
- if (channel->peer)
- pj_mutex_lock (channel->peer->mutex);
-
- if (channel->jb) {
- /* No need to deinitialize jitter buffer. */
- }
- if (channel->transport_thread) {
- pj_thread_join(channel->transport_thread);
- pj_thread_destroy(channel->transport_thread);
- channel->transport_thread = NULL;
- }
- if (channel->snd_stream != NULL) {
- pj_mutex_unlock (channel->mutex);
- pj_snd_stream_stop(channel->snd_stream);
- pj_mutex_lock (channel->mutex);
- pj_snd_stream_close(channel->snd_stream);
- channel->snd_stream = NULL;
- }
- if (channel->codec) {
- channel->codec->op->close(channel->codec);
- pj_codec_mgr_dealloc_codec(channel->codec_mgr, channel->codec);
- channel->codec = NULL;
- }
- if (channel->peer) {
- pj_media_stream_t *peer = channel->peer;
- peer->peer = NULL;
- peer->codec = NULL;
- peer->thread_quit_flag = 1;
- if (peer->transport_thread) {
- pj_mutex_unlock (peer->mutex);
- pj_thread_join(peer->transport_thread);
- pj_mutex_lock (peer->mutex);
- pj_thread_destroy(peer->transport_thread);
- peer->transport_thread = NULL;
- }
- if (peer->snd_stream) {
- pj_mutex_unlock (peer->mutex);
- pj_snd_stream_stop(peer->snd_stream);
- pj_mutex_lock (peer->mutex);
- pj_snd_stream_close(peer->snd_stream);
- peer->snd_stream = NULL;
- }
- }
-
- channel->state = STREAM_STOPPED;
-
- if (channel->peer)
- pj_mutex_unlock (channel->peer->mutex);
- pj_mutex_unlock(channel->mutex);
- pj_mutex_destroy(channel->mutex);
-
- return 0;
-}
-
+/* $Id$ */
+/*
+ * Copyright (C) 2003-2006 Benny Prijono <benny@prijono.org>
+ *
+ * This program is free software; you can redistribute it and/or modify
+ * it under the terms of the GNU General Public License as published by
+ * the Free Software Foundation; either version 2 of the License, or
+ * (at your option) any later version.
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ * GNU General Public License for more details.
+ *
+ * You should have received a copy of the GNU General Public License
+ * along with this program; if not, write to the Free Software
+ * Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
+ */
+#include <pjmedia/stream.h>
+#include <pjmedia/rtp.h>
+#include <pjmedia/rtcp.h>
+#include <pjmedia/jbuf.h>
+#include <pj/os.h>
+#include <pj/log.h>
+#include <pj/string.h> /* memcpy() */
+#include <pj/pool.h>
+#include <stdlib.h>
+
+#define THISFILE "stream.c"
+#define ERRLEVEL 1
+
+#define PJ_MAX_FRAME_DURATION_MS 200
+#define PJ_MAX_BUFFER_SIZE_MS 2000
+#define PJ_MAX_MTU 1500
+
+struct jb_frame
+{
+ unsigned size;
+ void *buf;
+};
+
+#define pj_fifobuf_alloc(fifo,size) malloc(size)
+#define pj_fifobuf_unalloc(fifo,buf) free(buf)
+#define pj_fifobuf_free(fifo, buf) free(buf)
+
+enum stream_state
+{
+ STREAM_STOPPED,
+ STREAM_STARTED,
+};
+
+struct pj_media_stream_t
+{
+ pj_media_dir_t dir;
+ int pt;
+ int state;
+ pj_media_stream_stat stat;
+ pj_media_stream_t *peer;
+ pj_snd_stream_info snd_info;
+ pj_snd_stream *snd_stream;
+ pj_mutex_t *mutex;
+ unsigned in_pkt_size;
+ void *in_pkt;
+ unsigned out_pkt_size;
+ void *out_pkt;
+ unsigned pcm_buf_size;
+ void *pcm_buf;
+ //pj_fifobuf_t fifobuf;
+ pj_codec_mgr *codec_mgr;
+ pj_codec *codec;
+ pj_rtp_session rtp;
+ pj_rtcp_session *rtcp;
+ pj_jitter_buffer *jb;
+ pj_sock_t rtp_sock;
+ pj_sock_t rtcp_sock;
+ pj_sockaddr_in dst_addr;
+ pj_thread_t *transport_thread;
+ int thread_quit_flag;
+};
+
+
+static pj_status_t play_callback(/* in */ void *user_data,
+ /* in */ pj_uint32_t timestamp,
+ /* out */ void *frame,
+ /*inout*/ unsigned size)
+{
+ pj_media_stream_t *channel = user_data;
+ struct jb_frame *jb_frame;
+ void *p;
+ pj_uint32_t extseq;
+ pj_status_t status;
+ struct pj_audio_frame frame_in, frame_out;
+
+ PJ_UNUSED_ARG(timestamp)
+
+ /* Lock mutex */
+ pj_mutex_lock (channel->mutex);
+
+ if (!channel->codec) {
+ pj_mutex_unlock (channel->mutex);
+ return -1;
+ }
+
+ /* Get frame from jitter buffer. */
+ status = pj_jb_get (channel->jb, &extseq, &p);
+ jb_frame = p;
+ if (status != 0 || jb_frame == NULL) {
+ pj_memset(frame, 0, size);
+ pj_mutex_unlock(channel->mutex);
+ return 0;
+ }
+
+ /* Decode */
+ frame_in.buf = jb_frame->buf;
+ frame_in.size = jb_frame->size;
+ frame_in.type = PJ_AUDIO_FRAME_AUDIO; /* ignored */
+ frame_out.buf = channel->pcm_buf;
+ status = channel->codec->op->decode (channel->codec, &frame_in,
+ channel->pcm_buf_size, &frame_out);
+ if (status != 0) {
+ PJ_LOG(3, (THISFILE, "decode() has return error status %d",
+ status));
+
+ pj_memset(frame, 0, size);
+ pj_fifobuf_free (&channel->fifobuf, jb_frame);
+ pj_mutex_unlock(channel->mutex);
+ return 0;
+ }
+
+ /* Put in sound buffer. */
+ if (frame_out.size > size) {
+ PJ_LOG(3, (THISFILE, "Sound playout buffer truncated %d bytes",
+ frame_out.size - size));
+ frame_out.size = size;
+ }
+
+ pj_memcpy(frame, frame_out.buf, size);
+
+ pj_fifobuf_free (&channel->fifobuf, jb_frame);
+ pj_mutex_unlock(channel->mutex);
+ return 0;
+}
+
+static pj_status_t rec_callback( /* in */ void *user_data,
+ /* in */ pj_uint32_t timestamp,
+ /* in */ const void *frame,
+ /* in */ unsigned size)
+{
+ pj_media_stream_t *channel = user_data;
+ pj_status_t status = 0;
+ struct pj_audio_frame frame_in, frame_out;
+ int ts_len;
+ void *rtphdr;
+ int rtphdrlen;
+ int sent;
+#if 0
+ static FILE *fhnd = NULL;
+#endif
+
+ PJ_UNUSED_ARG(timestamp)
+
+ /* Start locking channel mutex */
+ pj_mutex_lock (channel->mutex);
+
+ if (!channel->codec) {
+ status = -1;
+ goto on_return;
+ }
+
+ /* Encode. */
+ frame_in.type = PJ_MEDIA_TYPE_AUDIO;
+ frame_in.buf = (void*)frame;
+ frame_in.size = size;
+ frame_out.buf = ((char*)channel->out_pkt) + sizeof(pj_rtp_hdr);
+ status = channel->codec->op->encode (channel->codec, &frame_in,
+ channel->out_pkt_size - sizeof(pj_rtp_hdr),
+ &frame_out);
+ if (status != 0) {
+ PJ_LOG(3,(THISFILE, "Codec encode() has returned error status %d",
+ status));
+ goto on_return;
+ }
+
+ /* Encapsulate. */
+ ts_len = size / (channel->snd_info.bits_per_sample / 8);
+ status = pj_rtp_encode_rtp (&channel->rtp, channel->pt, 0,
+ frame_out.size, ts_len,
+ (const void**)&rtphdr, &rtphdrlen);
+ if (status != 0) {
+ PJ_LOG(3,(THISFILE, "RTP encode_rtp() has returned error status %d",
+ status));
+ goto on_return;
+ }
+
+ if (rtphdrlen != sizeof(pj_rtp_hdr)) {
+ /* We don't support RTP with extended header yet. */
+ PJ_TODO(SUPPORT_SENDING_RTP_WITH_EXTENDED_HEADER);
+ PJ_LOG(3,(THISFILE, "Unsupported extended RTP header for transmission"));
+ goto on_return;
+ }
+
+ pj_memcpy(channel->out_pkt, rtphdr, sizeof(pj_rtp_hdr));
+
+ /* Send. */
+ sent = pj_sock_sendto (channel->rtp_sock, channel->out_pkt, frame_out.size+sizeof(pj_rtp_hdr), 0,
+ &channel->dst_addr, sizeof(channel->dst_addr));
+ if (sent != (int)frame_out.size + (int)sizeof(pj_rtp_hdr)) {
+ pj_perror(THISFILE, "Error sending RTP packet to %s:%d",
+ pj_sockaddr_get_str_addr(&channel->dst_addr),
+ pj_sockaddr_get_port(&channel->dst_addr));
+ goto on_return;
+ }
+
+ /* Update stat */
+ channel->stat.pkt_tx++;
+ channel->stat.oct_tx += frame_out.size+sizeof(pj_rtp_hdr);
+
+#if 0
+ if (fhnd == NULL) {
+ fhnd = fopen("RTP.DAT", "wb");
+ if (fhnd) {
+ fwrite (channel->out_pkt, frame_out.size+sizeof(pj_rtp_hdr), 1, fhnd);
+ fclose(fhnd);
+ }
+ }
+#endif
+
+on_return:
+ pj_mutex_unlock (channel->mutex);
+ return status;
+}
+
+
+static void* PJ_THREAD_FUNC stream_decoder_transport_thread (void*arg)
+{
+ pj_media_stream_t *channel = arg;
+
+ while (!channel->thread_quit_flag) {
+ int len, size;
+ const pj_rtp_hdr *hdr;
+ const void *payload;
+ unsigned payloadlen;
+ int status;
+ struct jb_frame *jb_frame;
+
+ /* Wait for packet. */
+ fd_set fds;
+ pj_time_val timeout;
+
+ PJ_FD_ZERO (&fds);
+ PJ_FD_SET (channel->rtp_sock, &fds);
+ timeout.sec = 0;
+ timeout.msec = 100;
+
+ /* Wait with timeout. */
+ status = pj_sock_select(channel->rtp_sock, &fds, NULL, NULL, &timeout);
+ if (status != 1)
+ continue;
+
+ /* Get packet from socket. */
+ len = pj_sock_recv (channel->rtp_sock, channel->in_pkt, channel->in_pkt_size, 0);
+ if (len < 1) {
+ if (pj_getlasterror() == PJ_ECONNRESET) {
+ /* On Win2K SP2 (or above) and WinXP, recv() will get WSAECONNRESET
+ when the sending side receives ICMP port unreachable.
+ */
+ continue;
+ }
+ pj_perror(THISFILE, "Error receiving packet from socket (len=%d)", len);
+ pj_thread_sleep(1);
+ continue;
+ }
+
+ if (channel->state != STREAM_STARTED)
+ continue;
+
+ if (channel->thread_quit_flag)
+ break;
+
+ /* Start locking the channel. */
+ pj_mutex_lock (channel->mutex);
+
+ /* Update RTP and RTCP session. */
+ status = pj_rtp_decode_rtp (&channel->rtp, channel->in_pkt, len, &hdr, &payload, &payloadlen);
+ if (status != 0) {
+ pj_mutex_unlock (channel->mutex);
+ PJ_LOG(4,(THISFILE, "RTP decode_rtp() has returned error status %d", status));
+ continue;
+ }
+ status = pj_rtp_session_update (&channel->rtp, hdr);
+ if (status != 0 && status != PJ_RTP_ERR_SESSION_PROBATION && status != PJ_RTP_ERR_SESSION_RESTARTED) {
+ pj_mutex_unlock (channel->mutex);
+ PJ_LOG(4,(THISFILE, "RTP session_update() has returned error status %d", status));
+ continue;
+ }
+ pj_rtcp_rx_rtp (channel->rtcp, pj_ntohs(hdr->seq), pj_ntohl(hdr->ts));
+
+ /* Update stat */
+ channel->stat.pkt_rx++;
+ channel->stat.oct_rx += len;
+
+ /* Copy to FIFO buffer. */
+ size = payloadlen+sizeof(struct jb_frame);
+ jb_frame = pj_fifobuf_alloc (&channel->fifobuf, size);
+ if (jb_frame == NULL) {
+ pj_mutex_unlock (channel->mutex);
+ PJ_LOG(4,(THISFILE, "Unable to allocate %d bytes FIFO buffer", size));
+ continue;
+ }
+
+ /* Copy the payload */
+ jb_frame->size = payloadlen;
+ jb_frame->buf = ((char*)jb_frame) + sizeof(struct jb_frame);
+ pj_memcpy (jb_frame->buf, payload, payloadlen);
+
+ /* Put to jitter buffer. */
+ status = pj_jb_put (channel->jb, pj_ntohs(hdr->seq), jb_frame);
+ if (status != 0) {
+ pj_fifobuf_unalloc (&channel->fifobuf, jb_frame);
+ pj_mutex_unlock (channel->mutex);
+ PJ_LOG(4,(THISFILE, "Jitter buffer put() has returned error status %d", status));
+ continue;
+ }
+
+ pj_mutex_unlock (channel->mutex);
+ }
+
+ return NULL;
+}
+
+static void init_snd_param_from_codec_attr (pj_snd_stream_info *param,
+ const pj_codec_attr *attr)
+{
+ param->bits_per_sample = attr->pcm_bits_per_sample;
+ param->bytes_per_frame = 2;
+ param->frames_per_packet = attr->sample_rate * attr->ptime / 1000;
+ param->samples_per_frame = 1;
+ param->samples_per_sec = attr->sample_rate;
+}
+
+static pj_media_stream_t *create_channel ( pj_pool_t *pool,
+ pj_media_dir_t dir,
+ pj_media_stream_t *peer,
+ pj_codec_id *codec_id,
+ pj_media_stream_create_param *param)
+{
+ pj_media_stream_t *channel;
+ pj_codec_attr codec_attr;
+ void *ptr;
+ unsigned size;
+ int status;
+
+ /* Allocate memory for channel descriptor */
+ size = sizeof(pj_media_stream_t);
+ channel = pj_pool_calloc(pool, 1, size);
+ if (!channel) {
+ PJ_LOG(1,(THISFILE, "Unable to allocate %u bytes channel descriptor",
+ size));
+ return NULL;
+ }
+
+ channel->dir = dir;
+ channel->pt = codec_id->pt;
+ channel->peer = peer;
+ channel->codec_mgr = pj_med_mgr_get_codec_mgr (param->mediamgr);
+ channel->rtp_sock = param->rtp_sock;
+ channel->rtcp_sock = param->rtcp_sock;
+ channel->dst_addr = *param->remote_addr;
+ channel->state = STREAM_STOPPED;
+
+ /* Create mutex for the channel. */
+ channel->mutex = pj_mutex_create(pool, NULL, PJ_MUTEX_SIMPLE);
+ if (channel->mutex == NULL)
+ goto err_cleanup;
+
+ /* Create and initialize codec, only if peer is not present.
+ We only use one codec instance for both encoder and decoder.
+ */
+ if (peer && peer->codec) {
+ channel->codec = peer->codec;
+ status = channel->codec->factory->op->default_attr(channel->codec->factory, codec_id,
+ &codec_attr);
+ if (status != 0) {
+ goto err_cleanup;
+ }
+
+ } else {
+ channel->codec = pj_codec_mgr_alloc_codec(channel->codec_mgr, codec_id);
+ if (channel->codec == NULL) {
+ goto err_cleanup;
+ }
+
+ status = channel->codec->factory->op->default_attr(channel->codec->factory, codec_id,
+ &codec_attr);
+ if (status != 0) {
+ goto err_cleanup;
+ }
+
+ codec_attr.pt = codec_id->pt;
+ status = channel->codec->op->open(channel->codec, &codec_attr);
+ if (status != 0) {
+ goto err_cleanup;
+ }
+ }
+
+ /* Allocate buffer for incoming packet. */
+ channel->in_pkt_size = PJ_MAX_MTU;
+ channel->in_pkt = pj_pool_alloc(pool, channel->in_pkt_size);
+ if (!channel->in_pkt) {
+ PJ_LOG(1, (THISFILE, "Unable to allocate %u bytes incoming packet buffer",
+ channel->in_pkt_size));
+ goto err_cleanup;
+ }
+
+ /* Allocate buffer for outgoing packet. */
+ channel->out_pkt_size = sizeof(pj_rtp_hdr) +
+ codec_attr.avg_bps / 8 * PJ_MAX_FRAME_DURATION_MS / 1000;
+ if (channel->out_pkt_size > PJ_MAX_MTU)
+ channel->out_pkt_size = PJ_MAX_MTU;
+ channel->out_pkt = pj_pool_alloc(pool, channel->out_pkt_size);
+ if (!channel->out_pkt) {
+ PJ_LOG(1, (THISFILE, "Unable to allocate %u bytes encoding buffer",
+ channel->out_pkt_size));
+ goto err_cleanup;
+ }
+
+ /* Allocate buffer for decoding to PCM */
+ channel->pcm_buf_size = codec_attr.sample_rate *
+ codec_attr.pcm_bits_per_sample / 8 *
+ PJ_MAX_FRAME_DURATION_MS / 1000;
+ channel->pcm_buf = pj_pool_alloc (pool, channel->pcm_buf_size);
+ if (!channel->pcm_buf) {
+ PJ_LOG(1, (THISFILE, "Unable to allocate %u bytes PCM buffer",
+ channel->pcm_buf_size));
+ goto err_cleanup;
+ }
+
+ /* Allocate buffer for frames put in jitter buffer. */
+ size = codec_attr.avg_bps / 8 * PJ_MAX_BUFFER_SIZE_MS / 1000;
+ ptr = pj_pool_alloc(pool, size);
+ if (!ptr) {
+ PJ_LOG(1, (THISFILE, "Unable to allocate %u bytes jitter buffer",
+ channel->pcm_buf_size));
+ goto err_cleanup;
+ }
+ //pj_fifobuf_init (&channel->fifobuf, ptr, size);
+
+ /* Create and initialize sound device */
+ init_snd_param_from_codec_attr (&channel->snd_info, &codec_attr);
+
+ if (dir == PJ_MEDIA_DIR_ENCODING)
+ channel->snd_stream = pj_snd_open_recorder(-1, &channel->snd_info,
+ &rec_callback, channel);
+ else
+ channel->snd_stream = pj_snd_open_player(-1, &channel->snd_info,
+ &play_callback, channel);
+
+ if (!channel->snd_stream)
+ goto err_cleanup;
+
+ /* Create RTP and RTCP sessions. */
+ if (pj_rtp_session_init(&channel->rtp, codec_id->pt, param->ssrc) != 0) {
+ PJ_LOG(1, (THISFILE, "RTP session initialization error"));
+ goto err_cleanup;
+ }
+
+ /* For decoder, create RTCP session, jitter buffer, and transport thread. */
+ if (dir == PJ_MEDIA_DIR_DECODING) {
+ channel->rtcp = pj_pool_calloc(pool, 1, sizeof(pj_rtcp_session));
+ if (!channel->rtcp) {
+ PJ_LOG(1, (THISFILE, "Unable to allocate RTCP session"));
+ goto err_cleanup;
+ }
+
+ pj_rtcp_init(channel->rtcp, param->ssrc);
+
+ channel->jb = pj_pool_calloc(pool, 1, sizeof(pj_jitter_buffer));
+ if (!channel->jb) {
+ PJ_LOG(1, (THISFILE, "Unable to allocate jitter buffer descriptor"));
+ goto err_cleanup;
+ }
+ if (pj_jb_init(channel->jb, pool, param->jb_min, param->jb_max, param->jb_maxcnt)) {
+ PJ_LOG(1, (THISFILE, "Unable to allocate jitter buffer"));
+ goto err_cleanup;
+ }
+
+ channel->transport_thread = pj_thread_create(pool, "decode",
+ &stream_decoder_transport_thread, channel,
+ 0, NULL, 0);
+ if (!channel->transport_thread) {
+ pj_perror(THISFILE, "Unable to create transport thread");
+ goto err_cleanup;
+ }
+ }
+
+ /* Done. */
+ return channel;
+
+err_cleanup:
+ pj_media_stream_destroy(channel);
+ return NULL;
+}
+
+
+PJ_DEF(pj_status_t) pj_media_stream_create (pj_pool_t *pool,
+ pj_media_stream_t **enc_stream,
+ pj_media_stream_t **dec_stream,
+ pj_media_stream_create_param *param)
+{
+ *dec_stream = *enc_stream = NULL;
+
+ if (param->dir & PJ_MEDIA_DIR_DECODING) {
+ *dec_stream =
+ create_channel(pool, PJ_MEDIA_DIR_DECODING, NULL, param->codec_id, param);
+ if (!*dec_stream)
+ return -1;
+ }
+
+ if (param->dir & PJ_MEDIA_DIR_ENCODING) {
+ *enc_stream =
+ create_channel(pool, PJ_MEDIA_DIR_ENCODING, *dec_stream, param->codec_id, param);
+ if (!*enc_stream) {
+ if (*dec_stream) {
+ pj_media_stream_destroy(*dec_stream);
+ *dec_stream = NULL;
+ }
+ return -1;
+ }
+
+ if (*dec_stream) {
+ (*dec_stream)->peer = *enc_stream;
+ }
+ }
+
+ return 0;
+}
+
+PJ_DEF(pj_status_t) pj_media_stream_start (pj_media_stream_t *channel)
+{
+ pj_status_t status;
+
+ status = pj_snd_stream_start(channel->snd_stream);
+
+ if (status == 0)
+ channel->state = STREAM_STARTED;
+ return status;
+}
+
+PJ_DEF(pj_status_t) pj_media_stream_get_stat (const pj_media_stream_t *stream,
+ pj_media_stream_stat *stat)
+{
+ if (stream->dir == PJ_MEDIA_DIR_ENCODING) {
+ pj_memcpy (stat, &stream->stat, sizeof(*stat));
+ } else {
+ pj_rtcp_pkt *rtcp_pkt;
+ int len;
+
+ pj_memset (stat, 0, sizeof(*stat));
+ pj_assert (stream->rtcp != 0);
+ pj_rtcp_build_rtcp (stream->rtcp, &rtcp_pkt, &len);
+
+ stat->pkt_rx = stream->stat.pkt_rx;
+ stat->oct_rx = stream->stat.oct_rx;
+
+ PJ_TODO(SUPPORT_JITTER_CALCULATION_FOR_NON_8KHZ_SAMPLE_RATE)
+ stat->jitter = pj_ntohl(rtcp_pkt->rr.jitter) / 8;
+ stat->pkt_lost = (rtcp_pkt->rr.total_lost_2 << 16) +
+ (rtcp_pkt->rr.total_lost_1 << 8) +
+ rtcp_pkt->rr.total_lost_0;
+ }
+ return 0;
+}
+
+PJ_DEF(pj_status_t) pj_media_stream_pause (pj_media_stream_t *channel)
+{
+ PJ_UNUSED_ARG(channel)
+ return -1;
+}
+
+PJ_DEF(pj_status_t) pj_media_stream_resume (pj_media_stream_t *channel)
+{
+ PJ_UNUSED_ARG(channel)
+ return -1;
+}
+
+PJ_DEF(pj_status_t) pj_media_stream_destroy (pj_media_stream_t *channel)
+{
+ channel->thread_quit_flag = 1;
+
+ pj_mutex_lock (channel->mutex);
+ if (channel->peer)
+ pj_mutex_lock (channel->peer->mutex);
+
+ if (channel->jb) {
+ /* No need to deinitialize jitter buffer. */
+ }
+ if (channel->transport_thread) {
+ pj_thread_join(channel->transport_thread);
+ pj_thread_destroy(channel->transport_thread);
+ channel->transport_thread = NULL;
+ }
+ if (channel->snd_stream != NULL) {
+ pj_mutex_unlock (channel->mutex);
+ pj_snd_stream_stop(channel->snd_stream);
+ pj_mutex_lock (channel->mutex);
+ pj_snd_stream_close(channel->snd_stream);
+ channel->snd_stream = NULL;
+ }
+ if (channel->codec) {
+ channel->codec->op->close(channel->codec);
+ pj_codec_mgr_dealloc_codec(channel->codec_mgr, channel->codec);
+ channel->codec = NULL;
+ }
+ if (channel->peer) {
+ pj_media_stream_t *peer = channel->peer;
+ peer->peer = NULL;
+ peer->codec = NULL;
+ peer->thread_quit_flag = 1;
+ if (peer->transport_thread) {
+ pj_mutex_unlock (peer->mutex);
+ pj_thread_join(peer->transport_thread);
+ pj_mutex_lock (peer->mutex);
+ pj_thread_destroy(peer->transport_thread);
+ peer->transport_thread = NULL;
+ }
+ if (peer->snd_stream) {
+ pj_mutex_unlock (peer->mutex);
+ pj_snd_stream_stop(peer->snd_stream);
+ pj_mutex_lock (peer->mutex);
+ pj_snd_stream_close(peer->snd_stream);
+ peer->snd_stream = NULL;
+ }
+ }
+
+ channel->state = STREAM_STOPPED;
+
+ if (channel->peer)
+ pj_mutex_unlock (channel->peer->mutex);
+ pj_mutex_unlock(channel->mutex);
+ pj_mutex_destroy(channel->mutex);
+
+ return 0;
+}
+