summaryrefslogtreecommitdiff
path: root/pjmedia/src/pjmedia/stream.c
diff options
context:
space:
mode:
Diffstat (limited to 'pjmedia/src/pjmedia/stream.c')
-rw-r--r--pjmedia/src/pjmedia/stream.c57
1 files changed, 30 insertions, 27 deletions
diff --git a/pjmedia/src/pjmedia/stream.c b/pjmedia/src/pjmedia/stream.c
index 718cace1..323e0415 100644
--- a/pjmedia/src/pjmedia/stream.c
+++ b/pjmedia/src/pjmedia/stream.c
@@ -24,8 +24,13 @@
#include <pj/log.h>
#include <pj/string.h> /* memcpy() */
#include <pj/pool.h>
+#include <pj/assert.h>
+#include <pj/compat/socket.h>
+#include <pj/sock_select.h>
+#include <pj/errno.h>
#include <stdlib.h>
+
#define THISFILE "stream.c"
#define ERRLEVEL 1
@@ -91,7 +96,7 @@ static pj_status_t play_callback(/* in */ void *user_data,
pj_status_t status;
struct pj_audio_frame frame_in, frame_out;
- PJ_UNUSED_ARG(timestamp)
+ PJ_UNUSED_ARG(timestamp);
/* Lock mutex */
pj_mutex_lock (channel->mutex);
@@ -152,12 +157,12 @@ static pj_status_t rec_callback( /* in */ void *user_data,
int ts_len;
void *rtphdr;
int rtphdrlen;
- int sent;
+ pj_ssize_t sent;
#if 0
static FILE *fhnd = NULL;
#endif
- PJ_UNUSED_ARG(timestamp)
+ PJ_UNUSED_ARG(timestamp);
/* Start locking channel mutex */
pj_mutex_lock (channel->mutex);
@@ -202,14 +207,11 @@ static pj_status_t rec_callback( /* in */ void *user_data,
pj_memcpy(channel->out_pkt, rtphdr, sizeof(pj_rtp_hdr));
/* Send. */
- sent = pj_sock_sendto (channel->rtp_sock, channel->out_pkt, frame_out.size+sizeof(pj_rtp_hdr), 0,
+ sent = frame_out.size+sizeof(pj_rtp_hdr);
+ status = pj_sock_sendto (channel->rtp_sock, channel->out_pkt, &sent, 0,
&channel->dst_addr, sizeof(channel->dst_addr));
- if (sent != (int)frame_out.size + (int)sizeof(pj_rtp_hdr)) {
- pj_perror(THISFILE, "Error sending RTP packet to %s:%d",
- pj_sockaddr_get_str_addr(&channel->dst_addr),
- pj_sockaddr_get_port(&channel->dst_addr));
+ if (status != PJ_SUCCESS)
goto on_return;
- }
/* Update stat */
channel->stat.pkt_tx++;
@@ -231,12 +233,12 @@ on_return:
}
-static void* PJ_THREAD_FUNC stream_decoder_transport_thread (void*arg)
+static int PJ_THREAD_FUNC stream_decoder_transport_thread (void*arg)
{
pj_media_stream_t *channel = arg;
while (!channel->thread_quit_flag) {
- int len, size;
+ pj_ssize_t len, size;
const pj_rtp_hdr *hdr;
const void *payload;
unsigned payloadlen;
@@ -244,7 +246,7 @@ static void* PJ_THREAD_FUNC stream_decoder_transport_thread (void*arg)
struct jb_frame *jb_frame;
/* Wait for packet. */
- fd_set fds;
+ pj_fd_set_t fds;
pj_time_val timeout;
PJ_FD_ZERO (&fds);
@@ -258,15 +260,16 @@ static void* PJ_THREAD_FUNC stream_decoder_transport_thread (void*arg)
continue;
/* Get packet from socket. */
- len = pj_sock_recv (channel->rtp_sock, channel->in_pkt, channel->in_pkt_size, 0);
- if (len < 1) {
- if (pj_getlasterror() == PJ_ECONNRESET) {
+ len = channel->in_pkt_size;
+ status = pj_sock_recv (channel->rtp_sock, channel->in_pkt, &len, 0);
+ if (len < 1 || status != PJ_SUCCESS) {
+ if (pj_get_netos_error() == PJ_STATUS_FROM_OS(OSERR_ECONNRESET)) {
/* On Win2K SP2 (or above) and WinXP, recv() will get WSAECONNRESET
when the sending side receives ICMP port unreachable.
*/
continue;
}
- pj_perror(THISFILE, "Error receiving packet from socket (len=%d)", len);
+ //pj_perror(THISFILE, "Error receiving packet from socket (len=%d)", len);
pj_thread_sleep(1);
continue;
}
@@ -325,7 +328,7 @@ static void* PJ_THREAD_FUNC stream_decoder_transport_thread (void*arg)
pj_mutex_unlock (channel->mutex);
}
- return NULL;
+ return 0;
}
static void init_snd_param_from_codec_attr (pj_snd_stream_info *param,
@@ -348,7 +351,7 @@ static pj_media_stream_t *create_channel ( pj_pool_t *pool,
pj_codec_attr codec_attr;
void *ptr;
unsigned size;
- int status;
+ pj_status_t status;
/* Allocate memory for channel descriptor */
size = sizeof(pj_media_stream_t);
@@ -369,8 +372,8 @@ static pj_media_stream_t *create_channel ( pj_pool_t *pool,
channel->state = STREAM_STOPPED;
/* Create mutex for the channel. */
- channel->mutex = pj_mutex_create(pool, NULL, PJ_MUTEX_SIMPLE);
- if (channel->mutex == NULL)
+ status = pj_mutex_create_simple(pool, NULL, &channel->mutex);
+ if (status != PJ_SUCCESS)
goto err_cleanup;
/* Create and initialize codec, only if peer is not present.
@@ -484,11 +487,11 @@ static pj_media_stream_t *create_channel ( pj_pool_t *pool,
goto err_cleanup;
}
- channel->transport_thread = pj_thread_create(pool, "decode",
- &stream_decoder_transport_thread, channel,
- 0, NULL, 0);
- if (!channel->transport_thread) {
- pj_perror(THISFILE, "Unable to create transport thread");
+ status = pj_thread_create(pool, "decode",
+ &stream_decoder_transport_thread, channel,
+ 0, 0, &channel->transport_thread);
+ if (status != PJ_SUCCESS) {
+ //pj_perror(THISFILE, "Unable to create transport thread");
goto err_cleanup;
}
}
@@ -573,13 +576,13 @@ PJ_DEF(pj_status_t) pj_media_stream_get_stat (const pj_media_stream_t *stream,
PJ_DEF(pj_status_t) pj_media_stream_pause (pj_media_stream_t *channel)
{
- PJ_UNUSED_ARG(channel)
+ PJ_UNUSED_ARG(channel);
return -1;
}
PJ_DEF(pj_status_t) pj_media_stream_resume (pj_media_stream_t *channel)
{
- PJ_UNUSED_ARG(channel)
+ PJ_UNUSED_ARG(channel);
return -1;
}