summaryrefslogtreecommitdiff
path: root/pjmedia/src
diff options
context:
space:
mode:
authorBenny Prijono <bennylp@teluu.com>2006-06-22 18:49:45 +0000
committerBenny Prijono <bennylp@teluu.com>2006-06-22 18:49:45 +0000
commit0935d5a1236d6a78e346c22a0b62442eff95bd41 (patch)
tree0d9da39e6fcdc35c9986d12083cd891e57c9b4ec /pjmedia/src
parentdd4c4eb0397a77c84ca6818e2073b374bef79dd1 (diff)
Added better API for media transport, and fixed bugs with pending RTP write operation in UDP media transport
git-svn-id: http://svn.pjsip.org/repos/pjproject/trunk@539 74dad513-b988-da41-8d7b-12977e46ad98
Diffstat (limited to 'pjmedia/src')
-rw-r--r--pjmedia/src/pjmedia/transport_udp.c146
1 files changed, 100 insertions, 46 deletions
diff --git a/pjmedia/src/pjmedia/transport_udp.c b/pjmedia/src/pjmedia/transport_udp.c
index 2ba16659..31501e4e 100644
--- a/pjmedia/src/pjmedia/transport_udp.c
+++ b/pjmedia/src/pjmedia/transport_udp.c
@@ -30,6 +30,16 @@
/* Maximum size of incoming RTCP packet */
#define RTCP_LEN 600
+/* Maximum pending write operations */
+#define MAX_PENDING 4
+
+/* Pending write buffer */
+typedef struct pending_write
+{
+ char buffer[RTP_LEN];
+ pj_ioqueue_op_key_t op_key;
+} pending_write;
+
struct transport_udp
{
@@ -37,7 +47,8 @@ struct transport_udp
pj_pool_t *pool; /**< Memory pool */
unsigned options; /**< Transport options. */
- pjmedia_stream *stream; /**< Stream user (may be NULL) */
+ void *user_data; /**< Only valid when attached */
+ pj_bool_t attached; /**< Has attachment? */
pj_sockaddr_in rem_rtp_addr; /**< Remote RTP address */
pj_sockaddr_in rem_rtcp_addr; /**< Remote RTCP address */
void (*rtp_cb)( pjmedia_stream*,/**< To report incoming RTP. */
@@ -51,7 +62,8 @@ struct transport_udp
pj_sockaddr_in rtp_addr_name; /**< Published RTP address. */
pj_ioqueue_key_t *rtp_key; /**< RTP socket key in ioqueue */
pj_ioqueue_op_key_t rtp_read_op; /**< Pending read operation */
- pj_ioqueue_op_key_t rtp_write_op; /**< Pending write operation */
+ unsigned rtp_write_op_id;/**< Next write_op to use */
+ pending_write rtp_pending_write[MAX_PENDING]; /**< Pending write */
pj_sockaddr_in rtp_src_addr; /**< Actual packet src addr. */
unsigned rtp_src_cnt; /**< How many pkt from this addr. */
int rtp_addrlen; /**< Address length. */
@@ -75,17 +87,17 @@ static void on_rx_rtcp(pj_ioqueue_key_t *key,
pj_ssize_t bytes_read);
static pj_status_t transport_attach( pjmedia_transport *tp,
- pjmedia_stream *strm,
+ void *user_data,
const pj_sockaddr_t *rem_addr,
unsigned addr_len,
- void (*rtp_cb)(pjmedia_stream*,
+ void (*rtp_cb)(void*,
const void*,
pj_ssize_t),
- void (*rtcp_cb)(pjmedia_stream*,
+ void (*rtcp_cb)(void*,
const void*,
pj_ssize_t));
static void transport_detach( pjmedia_transport *tp,
- pjmedia_stream *strm);
+ void *strm);
static pj_status_t transport_send_rtp( pjmedia_transport *tp,
const void *pkt,
pj_size_t size);
@@ -113,6 +125,20 @@ PJ_DEF(pj_status_t) pjmedia_transport_udp_create( pjmedia_endpt *endpt,
unsigned options,
pjmedia_transport **p_tp)
{
+ return pjmedia_transport_udp_create2(endpt, name, NULL, port, options,
+ p_tp);
+}
+
+/**
+ * Create UDP stream transport.
+ */
+PJ_DEF(pj_status_t) pjmedia_transport_udp_create2(pjmedia_endpt *endpt,
+ const char *name,
+ const pj_str_t *addr,
+ int port,
+ unsigned options,
+ pjmedia_transport **p_tp)
+{
pjmedia_sock_info si;
pj_status_t status;
@@ -130,9 +156,7 @@ PJ_DEF(pj_status_t) pjmedia_transport_udp_create( pjmedia_endpt *endpt,
goto on_error;
/* Bind RTP socket */
- si.rtp_addr_name.sin_family = PJ_AF_INET;
- si.rtp_addr_name.sin_port = pj_htons((pj_uint16_t)port);
-
+ pj_sockaddr_in_init(&si.rtp_addr_name, addr, (pj_uint16_t)port);
status = pj_sock_bind(si.rtp_sock, &si.rtp_addr_name,
sizeof(si.rtp_addr_name));
if (status != PJ_SUCCESS)
@@ -145,9 +169,7 @@ PJ_DEF(pj_status_t) pjmedia_transport_udp_create( pjmedia_endpt *endpt,
goto on_error;
/* Bind RTCP socket */
- si.rtcp_addr_name.sin_family = PJ_AF_INET;
- si.rtcp_addr_name.sin_port = pj_htons((pj_uint16_t)(port+1));
-
+ pj_sockaddr_in_init(&si.rtcp_addr_name, addr, (pj_uint16_t)(port+1));
status = pj_sock_bind(si.rtcp_sock, &si.rtcp_addr_name,
sizeof(si.rtcp_addr_name));
if (status != PJ_SUCCESS)
@@ -181,6 +203,7 @@ PJ_DEF(pj_status_t) pjmedia_transport_udp_attach( pjmedia_endpt *endpt,
pj_ioqueue_t *ioqueue;
pj_ioqueue_callback rtp_cb, rtcp_cb;
pj_ssize_t size;
+ unsigned i;
pj_status_t status;
@@ -223,7 +246,9 @@ PJ_DEF(pj_status_t) pjmedia_transport_udp_attach( pjmedia_endpt *endpt,
goto on_error;
pj_ioqueue_op_key_init(&tp->rtp_read_op, sizeof(tp->rtp_read_op));
- pj_ioqueue_op_key_init(&tp->rtcp_write_op, sizeof(tp->rtcp_write_op));
+ for (i=0; i<PJ_ARRAY_SIZE(tp->rtp_pending_write); ++i)
+ pj_ioqueue_op_key_init(&tp->rtp_pending_write[i].op_key,
+ sizeof(tp->rtp_pending_write[i].op_key));
/* Kick of pending RTP read from the ioqueue */
tp->rtp_addrlen = sizeof(tp->rtp_src_addr);
@@ -270,16 +295,17 @@ on_error:
/*
* Get media socket info.
*/
-PJ_DEF(pj_status_t) pjmedia_transport_udp_get_sock_info(pjmedia_transport *tp,
- pjmedia_sock_info *inf)
+PJ_DEF(pj_status_t)
+pjmedia_transport_udp_get_info( pjmedia_transport *tp,
+ pjmedia_transport_udp_info *inf)
{
struct transport_udp *udp = (struct transport_udp*)tp;
PJ_ASSERT_RETURN(tp && inf, PJ_EINVAL);
- inf->rtp_sock = udp->rtp_sock;
- inf->rtp_addr_name = udp->rtp_addr_name;
- inf->rtcp_sock = udp->rtcp_sock;
- inf->rtcp_addr_name = udp->rtcp_addr_name;
+ inf->skinfo.rtp_sock = udp->rtp_sock;
+ inf->skinfo.rtp_addr_name = udp->rtp_addr_name;
+ inf->skinfo.rtcp_sock = udp->rtcp_sock;
+ inf->skinfo.rtcp_addr_name = udp->rtcp_addr_name;
return PJ_SUCCESS;
}
@@ -296,7 +322,7 @@ PJ_DEF(pj_status_t) pjmedia_transport_udp_close(pjmedia_transport *tp)
PJ_ASSERT_RETURN(tp, PJ_EINVAL);
/* Must not close while stream is using this */
- PJ_ASSERT_RETURN(udp->stream == NULL, PJ_EINVALIDOP);
+ PJ_ASSERT_RETURN(!udp->attached, PJ_EINVALIDOP);
if (udp->rtp_key) {
@@ -334,14 +360,14 @@ static void on_rx_rtp( pj_ioqueue_key_t *key,
udp = pj_ioqueue_get_user_data(key);
do {
- void (*cb)(pjmedia_stream*,const void*,pj_ssize_t);
- pjmedia_stream *stream;
+ void (*cb)(void*,const void*,pj_ssize_t);
+ void *user_data;
cb = udp->rtp_cb;
- stream = udp->stream;
+ user_data = udp->user_data;
- if (bytes_read > 0 && cb && stream)
- (*cb)(stream, udp->rtp_pkt, bytes_read);
+ if (udp->attached && cb)
+ (*cb)(user_data, udp->rtp_pkt, bytes_read);
/* See if source address of RTP packet is different than the
* configured address, and switch RTP remote address to
@@ -405,14 +431,14 @@ static void on_rx_rtcp(pj_ioqueue_key_t *key,
udp = pj_ioqueue_get_user_data(key);
do {
- void (*cb)(pjmedia_stream*,const void*,pj_ssize_t);
- pjmedia_stream *stream;
+ void (*cb)(void*,const void*,pj_ssize_t);
+ void *user_data;
cb = udp->rtcp_cb;
- stream = udp->stream;
+ user_data = udp->user_data;
- if (bytes_read > 0 && cb && stream)
- (*cb)(stream, udp->rtcp_pkt, bytes_read);
+ if (udp->attached && cb)
+ (*cb)(user_data, udp->rtcp_pkt, bytes_read);
bytes_read = sizeof(udp->rtcp_pkt);
status = pj_ioqueue_recv(udp->rtcp_key, &udp->rtcp_read_op,
@@ -424,23 +450,23 @@ static void on_rx_rtcp(pj_ioqueue_key_t *key,
/* Called by stream to initialize the transport */
static pj_status_t transport_attach( pjmedia_transport *tp,
- pjmedia_stream *strm,
+ void *user_data,
const pj_sockaddr_t *rem_addr,
unsigned addr_len,
- void (*rtp_cb)(pjmedia_stream*,
+ void (*rtp_cb)(void*,
const void*,
pj_ssize_t),
- void (*rtcp_cb)(pjmedia_stream*,
+ void (*rtcp_cb)(void*,
const void*,
pj_ssize_t))
{
struct transport_udp *udp = (struct transport_udp*) tp;
/* Validate arguments */
- PJ_ASSERT_RETURN(tp && strm && rem_addr && addr_len, PJ_EINVAL);
+ PJ_ASSERT_RETURN(tp && rem_addr && addr_len, PJ_EINVAL);
/* Must not be "attached" to existing stream */
- PJ_ASSERT_RETURN(udp->stream == NULL, PJ_EINVALIDOP);
+ PJ_ASSERT_RETURN(!udp->attached, PJ_EINVALIDOP);
/* "Attach" the stream: */
@@ -455,9 +481,10 @@ static pj_status_t transport_attach( pjmedia_transport *tp,
/* Save the callbacks */
udp->rtp_cb = rtp_cb;
udp->rtcp_cb = rtcp_cb;
+ udp->user_data = user_data;
- /* Last, save the stream to mark that we have a "client" */
- udp->stream = strm;
+ /* Last, mark transport as attached */
+ udp->attached = PJ_TRUE;
return PJ_SUCCESS;
}
@@ -465,18 +492,25 @@ static pj_status_t transport_attach( pjmedia_transport *tp,
/* Called by stream when it no longer needs the transport */
static void transport_detach( pjmedia_transport *tp,
- pjmedia_stream *strm)
+ void *user_data)
{
struct transport_udp *udp = (struct transport_udp*) tp;
- pj_assert(tp && strm);
+ pj_assert(tp);
+
+ /* User data is unreferenced on Release build */
+ PJ_UNUSED_ARG(user_data);
+
+ /* As additional checking, check if the same user data is specified */
+ pj_assert(user_data == udp->user_data);
- PJ_UNUSED_ARG(strm);
+ /* First, mark stream as unattached */
+ udp->attached = PJ_FALSE;
/* Clear up stream infos from transport */
- udp->stream = NULL;
udp->rtp_cb = NULL;
udp->rtcp_cb = NULL;
+ udp->user_data = NULL;
}
@@ -487,14 +521,34 @@ static pj_status_t transport_send_rtp( pjmedia_transport *tp,
{
struct transport_udp *udp = (struct transport_udp*)tp;
pj_ssize_t sent;
+ unsigned id;
+ struct pending_write *pw;
pj_status_t status;
- PJ_ASSERT_RETURN(udp->stream, PJ_EINVALIDOP);
+ /* Must be attached */
+ PJ_ASSERT_RETURN(udp->attached, PJ_EINVALIDOP);
+
+ /* Check that the size is supported */
+ PJ_ASSERT_RETURN(size <= RTP_LEN, PJ_ETOOBIG);
+
+ id = udp->rtp_write_op_id;
+ pw = &udp->rtp_pending_write[id];
+
+ /* We need to copy packet to our buffer because when the
+ * operation is pending, caller might write something else
+ * to the original buffer.
+ */
+ pj_memcpy(pw->buffer, pkt, size);
sent = size;
- status = pj_ioqueue_sendto( udp->rtp_key, &udp->rtp_write_op,
- pkt, &sent, 0,
- &udp->rem_rtp_addr, sizeof(pj_sockaddr_in));
+ status = pj_ioqueue_sendto( udp->rtp_key,
+ &udp->rtp_pending_write[id].op_key,
+ pw->buffer, &sent, 0,
+ &udp->rem_rtp_addr,
+ sizeof(pj_sockaddr_in));
+
+ udp->rtp_write_op_id = (udp->rtp_write_op_id + 1) %
+ PJ_ARRAY_SIZE(udp->rtp_pending_write);
if (status==PJ_SUCCESS || status==PJ_EPENDING)
return PJ_SUCCESS;
@@ -511,7 +565,7 @@ static pj_status_t transport_send_rtcp(pjmedia_transport *tp,
pj_ssize_t sent;
pj_status_t status;
- PJ_ASSERT_RETURN(udp->stream, PJ_EINVALIDOP);
+ PJ_ASSERT_RETURN(udp->attached, PJ_EINVALIDOP);
sent = size;
status = pj_ioqueue_sendto( udp->rtcp_key, &udp->rtcp_write_op,