diff options
author | Benny Prijono <bennylp@teluu.com> | 2006-06-22 18:49:45 +0000 |
---|---|---|
committer | Benny Prijono <bennylp@teluu.com> | 2006-06-22 18:49:45 +0000 |
commit | 0935d5a1236d6a78e346c22a0b62442eff95bd41 (patch) | |
tree | 0d9da39e6fcdc35c9986d12083cd891e57c9b4ec /pjmedia/src | |
parent | dd4c4eb0397a77c84ca6818e2073b374bef79dd1 (diff) |
Added better API for media transport, and fixed bugs with pending RTP write operation in UDP media transport
git-svn-id: http://svn.pjsip.org/repos/pjproject/trunk@539 74dad513-b988-da41-8d7b-12977e46ad98
Diffstat (limited to 'pjmedia/src')
-rw-r--r-- | pjmedia/src/pjmedia/transport_udp.c | 146 |
1 files changed, 100 insertions, 46 deletions
diff --git a/pjmedia/src/pjmedia/transport_udp.c b/pjmedia/src/pjmedia/transport_udp.c index 2ba16659..31501e4e 100644 --- a/pjmedia/src/pjmedia/transport_udp.c +++ b/pjmedia/src/pjmedia/transport_udp.c @@ -30,6 +30,16 @@ /* Maximum size of incoming RTCP packet */ #define RTCP_LEN 600 +/* Maximum pending write operations */ +#define MAX_PENDING 4 + +/* Pending write buffer */ +typedef struct pending_write +{ + char buffer[RTP_LEN]; + pj_ioqueue_op_key_t op_key; +} pending_write; + struct transport_udp { @@ -37,7 +47,8 @@ struct transport_udp pj_pool_t *pool; /**< Memory pool */ unsigned options; /**< Transport options. */ - pjmedia_stream *stream; /**< Stream user (may be NULL) */ + void *user_data; /**< Only valid when attached */ + pj_bool_t attached; /**< Has attachment? */ pj_sockaddr_in rem_rtp_addr; /**< Remote RTP address */ pj_sockaddr_in rem_rtcp_addr; /**< Remote RTCP address */ void (*rtp_cb)( pjmedia_stream*,/**< To report incoming RTP. */ @@ -51,7 +62,8 @@ struct transport_udp pj_sockaddr_in rtp_addr_name; /**< Published RTP address. */ pj_ioqueue_key_t *rtp_key; /**< RTP socket key in ioqueue */ pj_ioqueue_op_key_t rtp_read_op; /**< Pending read operation */ - pj_ioqueue_op_key_t rtp_write_op; /**< Pending write operation */ + unsigned rtp_write_op_id;/**< Next write_op to use */ + pending_write rtp_pending_write[MAX_PENDING]; /**< Pending write */ pj_sockaddr_in rtp_src_addr; /**< Actual packet src addr. */ unsigned rtp_src_cnt; /**< How many pkt from this addr. */ int rtp_addrlen; /**< Address length. */ @@ -75,17 +87,17 @@ static void on_rx_rtcp(pj_ioqueue_key_t *key, pj_ssize_t bytes_read); static pj_status_t transport_attach( pjmedia_transport *tp, - pjmedia_stream *strm, + void *user_data, const pj_sockaddr_t *rem_addr, unsigned addr_len, - void (*rtp_cb)(pjmedia_stream*, + void (*rtp_cb)(void*, const void*, pj_ssize_t), - void (*rtcp_cb)(pjmedia_stream*, + void (*rtcp_cb)(void*, const void*, pj_ssize_t)); static void transport_detach( pjmedia_transport *tp, - pjmedia_stream *strm); + void *strm); static pj_status_t transport_send_rtp( pjmedia_transport *tp, const void *pkt, pj_size_t size); @@ -113,6 +125,20 @@ PJ_DEF(pj_status_t) pjmedia_transport_udp_create( pjmedia_endpt *endpt, unsigned options, pjmedia_transport **p_tp) { + return pjmedia_transport_udp_create2(endpt, name, NULL, port, options, + p_tp); +} + +/** + * Create UDP stream transport. + */ +PJ_DEF(pj_status_t) pjmedia_transport_udp_create2(pjmedia_endpt *endpt, + const char *name, + const pj_str_t *addr, + int port, + unsigned options, + pjmedia_transport **p_tp) +{ pjmedia_sock_info si; pj_status_t status; @@ -130,9 +156,7 @@ PJ_DEF(pj_status_t) pjmedia_transport_udp_create( pjmedia_endpt *endpt, goto on_error; /* Bind RTP socket */ - si.rtp_addr_name.sin_family = PJ_AF_INET; - si.rtp_addr_name.sin_port = pj_htons((pj_uint16_t)port); - + pj_sockaddr_in_init(&si.rtp_addr_name, addr, (pj_uint16_t)port); status = pj_sock_bind(si.rtp_sock, &si.rtp_addr_name, sizeof(si.rtp_addr_name)); if (status != PJ_SUCCESS) @@ -145,9 +169,7 @@ PJ_DEF(pj_status_t) pjmedia_transport_udp_create( pjmedia_endpt *endpt, goto on_error; /* Bind RTCP socket */ - si.rtcp_addr_name.sin_family = PJ_AF_INET; - si.rtcp_addr_name.sin_port = pj_htons((pj_uint16_t)(port+1)); - + pj_sockaddr_in_init(&si.rtcp_addr_name, addr, (pj_uint16_t)(port+1)); status = pj_sock_bind(si.rtcp_sock, &si.rtcp_addr_name, sizeof(si.rtcp_addr_name)); if (status != PJ_SUCCESS) @@ -181,6 +203,7 @@ PJ_DEF(pj_status_t) pjmedia_transport_udp_attach( pjmedia_endpt *endpt, pj_ioqueue_t *ioqueue; pj_ioqueue_callback rtp_cb, rtcp_cb; pj_ssize_t size; + unsigned i; pj_status_t status; @@ -223,7 +246,9 @@ PJ_DEF(pj_status_t) pjmedia_transport_udp_attach( pjmedia_endpt *endpt, goto on_error; pj_ioqueue_op_key_init(&tp->rtp_read_op, sizeof(tp->rtp_read_op)); - pj_ioqueue_op_key_init(&tp->rtcp_write_op, sizeof(tp->rtcp_write_op)); + for (i=0; i<PJ_ARRAY_SIZE(tp->rtp_pending_write); ++i) + pj_ioqueue_op_key_init(&tp->rtp_pending_write[i].op_key, + sizeof(tp->rtp_pending_write[i].op_key)); /* Kick of pending RTP read from the ioqueue */ tp->rtp_addrlen = sizeof(tp->rtp_src_addr); @@ -270,16 +295,17 @@ on_error: /* * Get media socket info. */ -PJ_DEF(pj_status_t) pjmedia_transport_udp_get_sock_info(pjmedia_transport *tp, - pjmedia_sock_info *inf) +PJ_DEF(pj_status_t) +pjmedia_transport_udp_get_info( pjmedia_transport *tp, + pjmedia_transport_udp_info *inf) { struct transport_udp *udp = (struct transport_udp*)tp; PJ_ASSERT_RETURN(tp && inf, PJ_EINVAL); - inf->rtp_sock = udp->rtp_sock; - inf->rtp_addr_name = udp->rtp_addr_name; - inf->rtcp_sock = udp->rtcp_sock; - inf->rtcp_addr_name = udp->rtcp_addr_name; + inf->skinfo.rtp_sock = udp->rtp_sock; + inf->skinfo.rtp_addr_name = udp->rtp_addr_name; + inf->skinfo.rtcp_sock = udp->rtcp_sock; + inf->skinfo.rtcp_addr_name = udp->rtcp_addr_name; return PJ_SUCCESS; } @@ -296,7 +322,7 @@ PJ_DEF(pj_status_t) pjmedia_transport_udp_close(pjmedia_transport *tp) PJ_ASSERT_RETURN(tp, PJ_EINVAL); /* Must not close while stream is using this */ - PJ_ASSERT_RETURN(udp->stream == NULL, PJ_EINVALIDOP); + PJ_ASSERT_RETURN(!udp->attached, PJ_EINVALIDOP); if (udp->rtp_key) { @@ -334,14 +360,14 @@ static void on_rx_rtp( pj_ioqueue_key_t *key, udp = pj_ioqueue_get_user_data(key); do { - void (*cb)(pjmedia_stream*,const void*,pj_ssize_t); - pjmedia_stream *stream; + void (*cb)(void*,const void*,pj_ssize_t); + void *user_data; cb = udp->rtp_cb; - stream = udp->stream; + user_data = udp->user_data; - if (bytes_read > 0 && cb && stream) - (*cb)(stream, udp->rtp_pkt, bytes_read); + if (udp->attached && cb) + (*cb)(user_data, udp->rtp_pkt, bytes_read); /* See if source address of RTP packet is different than the * configured address, and switch RTP remote address to @@ -405,14 +431,14 @@ static void on_rx_rtcp(pj_ioqueue_key_t *key, udp = pj_ioqueue_get_user_data(key); do { - void (*cb)(pjmedia_stream*,const void*,pj_ssize_t); - pjmedia_stream *stream; + void (*cb)(void*,const void*,pj_ssize_t); + void *user_data; cb = udp->rtcp_cb; - stream = udp->stream; + user_data = udp->user_data; - if (bytes_read > 0 && cb && stream) - (*cb)(stream, udp->rtcp_pkt, bytes_read); + if (udp->attached && cb) + (*cb)(user_data, udp->rtcp_pkt, bytes_read); bytes_read = sizeof(udp->rtcp_pkt); status = pj_ioqueue_recv(udp->rtcp_key, &udp->rtcp_read_op, @@ -424,23 +450,23 @@ static void on_rx_rtcp(pj_ioqueue_key_t *key, /* Called by stream to initialize the transport */ static pj_status_t transport_attach( pjmedia_transport *tp, - pjmedia_stream *strm, + void *user_data, const pj_sockaddr_t *rem_addr, unsigned addr_len, - void (*rtp_cb)(pjmedia_stream*, + void (*rtp_cb)(void*, const void*, pj_ssize_t), - void (*rtcp_cb)(pjmedia_stream*, + void (*rtcp_cb)(void*, const void*, pj_ssize_t)) { struct transport_udp *udp = (struct transport_udp*) tp; /* Validate arguments */ - PJ_ASSERT_RETURN(tp && strm && rem_addr && addr_len, PJ_EINVAL); + PJ_ASSERT_RETURN(tp && rem_addr && addr_len, PJ_EINVAL); /* Must not be "attached" to existing stream */ - PJ_ASSERT_RETURN(udp->stream == NULL, PJ_EINVALIDOP); + PJ_ASSERT_RETURN(!udp->attached, PJ_EINVALIDOP); /* "Attach" the stream: */ @@ -455,9 +481,10 @@ static pj_status_t transport_attach( pjmedia_transport *tp, /* Save the callbacks */ udp->rtp_cb = rtp_cb; udp->rtcp_cb = rtcp_cb; + udp->user_data = user_data; - /* Last, save the stream to mark that we have a "client" */ - udp->stream = strm; + /* Last, mark transport as attached */ + udp->attached = PJ_TRUE; return PJ_SUCCESS; } @@ -465,18 +492,25 @@ static pj_status_t transport_attach( pjmedia_transport *tp, /* Called by stream when it no longer needs the transport */ static void transport_detach( pjmedia_transport *tp, - pjmedia_stream *strm) + void *user_data) { struct transport_udp *udp = (struct transport_udp*) tp; - pj_assert(tp && strm); + pj_assert(tp); + + /* User data is unreferenced on Release build */ + PJ_UNUSED_ARG(user_data); + + /* As additional checking, check if the same user data is specified */ + pj_assert(user_data == udp->user_data); - PJ_UNUSED_ARG(strm); + /* First, mark stream as unattached */ + udp->attached = PJ_FALSE; /* Clear up stream infos from transport */ - udp->stream = NULL; udp->rtp_cb = NULL; udp->rtcp_cb = NULL; + udp->user_data = NULL; } @@ -487,14 +521,34 @@ static pj_status_t transport_send_rtp( pjmedia_transport *tp, { struct transport_udp *udp = (struct transport_udp*)tp; pj_ssize_t sent; + unsigned id; + struct pending_write *pw; pj_status_t status; - PJ_ASSERT_RETURN(udp->stream, PJ_EINVALIDOP); + /* Must be attached */ + PJ_ASSERT_RETURN(udp->attached, PJ_EINVALIDOP); + + /* Check that the size is supported */ + PJ_ASSERT_RETURN(size <= RTP_LEN, PJ_ETOOBIG); + + id = udp->rtp_write_op_id; + pw = &udp->rtp_pending_write[id]; + + /* We need to copy packet to our buffer because when the + * operation is pending, caller might write something else + * to the original buffer. + */ + pj_memcpy(pw->buffer, pkt, size); sent = size; - status = pj_ioqueue_sendto( udp->rtp_key, &udp->rtp_write_op, - pkt, &sent, 0, - &udp->rem_rtp_addr, sizeof(pj_sockaddr_in)); + status = pj_ioqueue_sendto( udp->rtp_key, + &udp->rtp_pending_write[id].op_key, + pw->buffer, &sent, 0, + &udp->rem_rtp_addr, + sizeof(pj_sockaddr_in)); + + udp->rtp_write_op_id = (udp->rtp_write_op_id + 1) % + PJ_ARRAY_SIZE(udp->rtp_pending_write); if (status==PJ_SUCCESS || status==PJ_EPENDING) return PJ_SUCCESS; @@ -511,7 +565,7 @@ static pj_status_t transport_send_rtcp(pjmedia_transport *tp, pj_ssize_t sent; pj_status_t status; - PJ_ASSERT_RETURN(udp->stream, PJ_EINVALIDOP); + PJ_ASSERT_RETURN(udp->attached, PJ_EINVALIDOP); sent = size; status = pj_ioqueue_sendto( udp->rtcp_key, &udp->rtcp_write_op, |