diff options
author | Benny Prijono <bennylp@teluu.com> | 2006-06-22 18:49:45 +0000 |
---|---|---|
committer | Benny Prijono <bennylp@teluu.com> | 2006-06-22 18:49:45 +0000 |
commit | 0935d5a1236d6a78e346c22a0b62442eff95bd41 (patch) | |
tree | 0d9da39e6fcdc35c9986d12083cd891e57c9b4ec /pjmedia | |
parent | dd4c4eb0397a77c84ca6818e2073b374bef79dd1 (diff) |
Added better API for media transport, and fixed bugs with pending RTP write operation in UDP media transport
git-svn-id: http://svn.pjsip.org/repos/pjproject/trunk@539 74dad513-b988-da41-8d7b-12977e46ad98
Diffstat (limited to 'pjmedia')
-rw-r--r-- | pjmedia/include/pjmedia/transport.h | 133 | ||||
-rw-r--r-- | pjmedia/include/pjmedia/transport_udp.h | 45 | ||||
-rw-r--r-- | pjmedia/src/pjmedia/transport_udp.c | 146 |
3 files changed, 266 insertions, 58 deletions
diff --git a/pjmedia/include/pjmedia/transport.h b/pjmedia/include/pjmedia/transport.h index 66678d7f..f1d38a90 100644 --- a/pjmedia/include/pjmedia/transport.h +++ b/pjmedia/include/pjmedia/transport.h @@ -66,28 +66,37 @@ struct pjmedia_transport_op * to be used by the stream for the first time, and it tells the transport * about remote RTP address to send the packet and some callbacks to be * called for incoming packets. + * + * Application should call #pjmedia_transport_attach() instead of + * calling this function directly. */ pj_status_t (*attach)(pjmedia_transport *tp, - pjmedia_stream *strm, + void *user_data, const pj_sockaddr_t *rem_addr, unsigned addr_len, - void (*rtp_cb)(pjmedia_stream*, - const void*, - pj_ssize_t), - void (*rtcp_cb)(pjmedia_stream*, - const void*, - pj_ssize_t)); + void (*rtp_cb)(void *user_data, + const void *pkt, + pj_ssize_t size), + void (*rtcp_cb)(void *user_data, + const void *pkt, + pj_ssize_t size)); /** * This function is called by the stream when the stream is no longer * need the transport (normally when the stream is about to be closed). + * + * Application should call #pjmedia_transport_detach() instead of + * calling this function directly. */ void (*detach)(pjmedia_transport *tp, - pjmedia_stream *strm); + void *user_data); /** * This function is called by the stream to send RTP packet using the * transport. + * + * Application should call #pjmedia_transport_send_rtp() instead of + * calling this function directly. */ pj_status_t (*send_rtp)(pjmedia_transport *tp, const void *pkt, @@ -96,6 +105,9 @@ struct pjmedia_transport_op /** * This function is called by the stream to send RTCP packet using the * transport. + * + * Application should call #pjmedia_transport_send_rtcp() instead of + * calling this function directly. */ pj_status_t (*send_rtcp)(pjmedia_transport *tp, const void *pkt, @@ -103,6 +115,9 @@ struct pjmedia_transport_op /** * This function can be called to destroy this transport. + * + * Application should call #pjmedia_transport_close() instead of + * calling this function directly. */ pj_status_t (*destroy)(pjmedia_transport *tp); }; @@ -129,6 +144,108 @@ struct pjmedia_transport }; + +/** + * Attach callbacks to be called on receipt of incoming RTP/RTCP packets. + * This is just a simple wrapper which calls <tt>attach()</tt> member of + * the transport. + * + * @param tp The media transport. + * @param user_data Arbitrary user data to be set when the callbacks are + * called. + * @param rem_addr Remote RTP address to send RTP packet to. + * @param addr_len Length of the remote address. + * @param rtp_cb Callback to be called when RTP packet is received on + * the transport. + * @param rtcp_cb Callback to be called when RTCP packet is received on + * the transport. + * + * @return PJ_SUCCESS on success, or the appropriate error code. + */ +PJ_INLINE(pj_status_t) pjmedia_transport_attach(pjmedia_transport *tp, + void *user_data, + const pj_sockaddr_t *rem_addr, + unsigned addr_len, + void (*rtp_cb)(void *user_data, + const void *pkt, + pj_ssize_t), + void (*rtcp_cb)(void *usr_data, + const void*pkt, + pj_ssize_t)) +{ + return tp->op->attach(tp, user_data, rem_addr, addr_len, rtp_cb, rtcp_cb); +} + + +/** + * Detach callbacks from the transport. + * This is just a simple wrapper which calls <tt>attach()</tt> member of + * the transport. + * + * @param tp The media transport. + * @param user_data User data which must match the previously set value + * on attachment. + */ +PJ_INLINE(void) pjmedia_transport_detach(pjmedia_transport *tp, + void *user_data) +{ + tp->op->detach(tp, user_data); +} + + +/** + * Send RTP packet with the specified media transport. This is just a simple + * wrapper which calls <tt>send_rtp()</tt> member of the transport. + * + * @param tp The media transport. + * @param pkt The packet to send. + * @param size Size of the packet. + * + * @return PJ_SUCCESS on success, or the appropriate error code. + */ +PJ_INLINE(pj_status_t) pjmedia_transport_send_rtp(pjmedia_transport *tp, + const void *pkt, + pj_size_t size) +{ + return (*tp->op->send_rtp)(tp, pkt, size); +} + + +/** + * Send RTCP packet with the specified media transport. This is just a simple + * wrapper which calls <tt>send_rtcp()</tt> member of the transport. + * + * @param tp The media transport. + * @param pkt The packet to send. + * @param size Size of the packet. + * + * @return PJ_SUCCESS on success, or the appropriate error code. + */ +PJ_INLINE(pj_status_t) pjmedia_transport_send_rtcp(pjmedia_transport *tp, + const void *pkt, + pj_size_t size) +{ + return (*tp->op->send_rtcp)(tp, pkt, size); +} + + +/** + * Close media transport. This is just a simple wrapper which calls + * <tt>destroy()</tt> member of the transport. + * + * @param tp The media transport. + * + * @return PJ_SUCCESS on success, or the appropriate error code. + */ +PJ_INLINE(pj_status_t) pjmedia_transport_close(pjmedia_transport *tp) +{ + if (tp->op->destroy) + return (*tp->op->destroy)(tp); + else + return PJ_SUCCESS; +} + + PJ_END_DECL /** diff --git a/pjmedia/include/pjmedia/transport_udp.h b/pjmedia/include/pjmedia/transport_udp.h index 536aeb7a..bf40f83b 100644 --- a/pjmedia/include/pjmedia/transport_udp.h +++ b/pjmedia/include/pjmedia/transport_udp.h @@ -56,7 +56,20 @@ enum pjmedia_transport_udp_options /** - * Create an RTP and RTCP sockets and bind RTP the socket to the specified + * UDP transport info. + */ +typedef struct pjmedia_transport_udp_info +{ + /** + * Media socket info. + */ + pjmedia_sock_info skinfo; + +} pjmedia_transport_udp_info; + + +/** + * Create an RTP and RTCP sockets and bind the sockets to the specified * port to create media transport. * * @param endpt The media endpoint instance. @@ -76,15 +89,39 @@ PJ_DECL(pj_status_t) pjmedia_transport_udp_create(pjmedia_endpt *endpt, /** + * Create an RTP and RTCP sockets and bind the sockets to the specified + * address and port to create media transport. + * + * @param endpt The media endpoint instance. + * @param name Optional name to be assigned to the transport. + * @param addr Optional local address to bind the sockets to. If this + * argument is NULL or empty, the sockets will be bound + * to all interface. + * @param port UDP port number for the RTP socket. The RTCP port number + * will be set to one above RTP port. + * @param options Options, bitmask of #pjmedia_transport_udp_options. + * @param p_tp Pointer to receive the transport instance. + * + * @return PJ_SUCCESS on success. + */ +PJ_DECL(pj_status_t) pjmedia_transport_udp_create2(pjmedia_endpt *endpt, + const char *name, + const pj_str_t *addr, + int port, + unsigned options, + pjmedia_transport **p_tp); + +/** * Get media socket info from the specified UDP transport. * * @param tp The UDP transport interface. - * @param i Media socket info to be initialized. + * @param info Media socket info to be initialized. * * @return PJ_SUCCESS on success. */ -PJ_DECL(pj_status_t) pjmedia_transport_udp_get_sock_info(pjmedia_transport *tp, - pjmedia_sock_info *i); +PJ_DECL(pj_status_t) +pjmedia_transport_udp_get_info( pjmedia_transport *tp, + pjmedia_transport_udp_info *info); /** diff --git a/pjmedia/src/pjmedia/transport_udp.c b/pjmedia/src/pjmedia/transport_udp.c index 2ba16659..31501e4e 100644 --- a/pjmedia/src/pjmedia/transport_udp.c +++ b/pjmedia/src/pjmedia/transport_udp.c @@ -30,6 +30,16 @@ /* Maximum size of incoming RTCP packet */ #define RTCP_LEN 600 +/* Maximum pending write operations */ +#define MAX_PENDING 4 + +/* Pending write buffer */ +typedef struct pending_write +{ + char buffer[RTP_LEN]; + pj_ioqueue_op_key_t op_key; +} pending_write; + struct transport_udp { @@ -37,7 +47,8 @@ struct transport_udp pj_pool_t *pool; /**< Memory pool */ unsigned options; /**< Transport options. */ - pjmedia_stream *stream; /**< Stream user (may be NULL) */ + void *user_data; /**< Only valid when attached */ + pj_bool_t attached; /**< Has attachment? */ pj_sockaddr_in rem_rtp_addr; /**< Remote RTP address */ pj_sockaddr_in rem_rtcp_addr; /**< Remote RTCP address */ void (*rtp_cb)( pjmedia_stream*,/**< To report incoming RTP. */ @@ -51,7 +62,8 @@ struct transport_udp pj_sockaddr_in rtp_addr_name; /**< Published RTP address. */ pj_ioqueue_key_t *rtp_key; /**< RTP socket key in ioqueue */ pj_ioqueue_op_key_t rtp_read_op; /**< Pending read operation */ - pj_ioqueue_op_key_t rtp_write_op; /**< Pending write operation */ + unsigned rtp_write_op_id;/**< Next write_op to use */ + pending_write rtp_pending_write[MAX_PENDING]; /**< Pending write */ pj_sockaddr_in rtp_src_addr; /**< Actual packet src addr. */ unsigned rtp_src_cnt; /**< How many pkt from this addr. */ int rtp_addrlen; /**< Address length. */ @@ -75,17 +87,17 @@ static void on_rx_rtcp(pj_ioqueue_key_t *key, pj_ssize_t bytes_read); static pj_status_t transport_attach( pjmedia_transport *tp, - pjmedia_stream *strm, + void *user_data, const pj_sockaddr_t *rem_addr, unsigned addr_len, - void (*rtp_cb)(pjmedia_stream*, + void (*rtp_cb)(void*, const void*, pj_ssize_t), - void (*rtcp_cb)(pjmedia_stream*, + void (*rtcp_cb)(void*, const void*, pj_ssize_t)); static void transport_detach( pjmedia_transport *tp, - pjmedia_stream *strm); + void *strm); static pj_status_t transport_send_rtp( pjmedia_transport *tp, const void *pkt, pj_size_t size); @@ -113,6 +125,20 @@ PJ_DEF(pj_status_t) pjmedia_transport_udp_create( pjmedia_endpt *endpt, unsigned options, pjmedia_transport **p_tp) { + return pjmedia_transport_udp_create2(endpt, name, NULL, port, options, + p_tp); +} + +/** + * Create UDP stream transport. + */ +PJ_DEF(pj_status_t) pjmedia_transport_udp_create2(pjmedia_endpt *endpt, + const char *name, + const pj_str_t *addr, + int port, + unsigned options, + pjmedia_transport **p_tp) +{ pjmedia_sock_info si; pj_status_t status; @@ -130,9 +156,7 @@ PJ_DEF(pj_status_t) pjmedia_transport_udp_create( pjmedia_endpt *endpt, goto on_error; /* Bind RTP socket */ - si.rtp_addr_name.sin_family = PJ_AF_INET; - si.rtp_addr_name.sin_port = pj_htons((pj_uint16_t)port); - + pj_sockaddr_in_init(&si.rtp_addr_name, addr, (pj_uint16_t)port); status = pj_sock_bind(si.rtp_sock, &si.rtp_addr_name, sizeof(si.rtp_addr_name)); if (status != PJ_SUCCESS) @@ -145,9 +169,7 @@ PJ_DEF(pj_status_t) pjmedia_transport_udp_create( pjmedia_endpt *endpt, goto on_error; /* Bind RTCP socket */ - si.rtcp_addr_name.sin_family = PJ_AF_INET; - si.rtcp_addr_name.sin_port = pj_htons((pj_uint16_t)(port+1)); - + pj_sockaddr_in_init(&si.rtcp_addr_name, addr, (pj_uint16_t)(port+1)); status = pj_sock_bind(si.rtcp_sock, &si.rtcp_addr_name, sizeof(si.rtcp_addr_name)); if (status != PJ_SUCCESS) @@ -181,6 +203,7 @@ PJ_DEF(pj_status_t) pjmedia_transport_udp_attach( pjmedia_endpt *endpt, pj_ioqueue_t *ioqueue; pj_ioqueue_callback rtp_cb, rtcp_cb; pj_ssize_t size; + unsigned i; pj_status_t status; @@ -223,7 +246,9 @@ PJ_DEF(pj_status_t) pjmedia_transport_udp_attach( pjmedia_endpt *endpt, goto on_error; pj_ioqueue_op_key_init(&tp->rtp_read_op, sizeof(tp->rtp_read_op)); - pj_ioqueue_op_key_init(&tp->rtcp_write_op, sizeof(tp->rtcp_write_op)); + for (i=0; i<PJ_ARRAY_SIZE(tp->rtp_pending_write); ++i) + pj_ioqueue_op_key_init(&tp->rtp_pending_write[i].op_key, + sizeof(tp->rtp_pending_write[i].op_key)); /* Kick of pending RTP read from the ioqueue */ tp->rtp_addrlen = sizeof(tp->rtp_src_addr); @@ -270,16 +295,17 @@ on_error: /* * Get media socket info. */ -PJ_DEF(pj_status_t) pjmedia_transport_udp_get_sock_info(pjmedia_transport *tp, - pjmedia_sock_info *inf) +PJ_DEF(pj_status_t) +pjmedia_transport_udp_get_info( pjmedia_transport *tp, + pjmedia_transport_udp_info *inf) { struct transport_udp *udp = (struct transport_udp*)tp; PJ_ASSERT_RETURN(tp && inf, PJ_EINVAL); - inf->rtp_sock = udp->rtp_sock; - inf->rtp_addr_name = udp->rtp_addr_name; - inf->rtcp_sock = udp->rtcp_sock; - inf->rtcp_addr_name = udp->rtcp_addr_name; + inf->skinfo.rtp_sock = udp->rtp_sock; + inf->skinfo.rtp_addr_name = udp->rtp_addr_name; + inf->skinfo.rtcp_sock = udp->rtcp_sock; + inf->skinfo.rtcp_addr_name = udp->rtcp_addr_name; return PJ_SUCCESS; } @@ -296,7 +322,7 @@ PJ_DEF(pj_status_t) pjmedia_transport_udp_close(pjmedia_transport *tp) PJ_ASSERT_RETURN(tp, PJ_EINVAL); /* Must not close while stream is using this */ - PJ_ASSERT_RETURN(udp->stream == NULL, PJ_EINVALIDOP); + PJ_ASSERT_RETURN(!udp->attached, PJ_EINVALIDOP); if (udp->rtp_key) { @@ -334,14 +360,14 @@ static void on_rx_rtp( pj_ioqueue_key_t *key, udp = pj_ioqueue_get_user_data(key); do { - void (*cb)(pjmedia_stream*,const void*,pj_ssize_t); - pjmedia_stream *stream; + void (*cb)(void*,const void*,pj_ssize_t); + void *user_data; cb = udp->rtp_cb; - stream = udp->stream; + user_data = udp->user_data; - if (bytes_read > 0 && cb && stream) - (*cb)(stream, udp->rtp_pkt, bytes_read); + if (udp->attached && cb) + (*cb)(user_data, udp->rtp_pkt, bytes_read); /* See if source address of RTP packet is different than the * configured address, and switch RTP remote address to @@ -405,14 +431,14 @@ static void on_rx_rtcp(pj_ioqueue_key_t *key, udp = pj_ioqueue_get_user_data(key); do { - void (*cb)(pjmedia_stream*,const void*,pj_ssize_t); - pjmedia_stream *stream; + void (*cb)(void*,const void*,pj_ssize_t); + void *user_data; cb = udp->rtcp_cb; - stream = udp->stream; + user_data = udp->user_data; - if (bytes_read > 0 && cb && stream) - (*cb)(stream, udp->rtcp_pkt, bytes_read); + if (udp->attached && cb) + (*cb)(user_data, udp->rtcp_pkt, bytes_read); bytes_read = sizeof(udp->rtcp_pkt); status = pj_ioqueue_recv(udp->rtcp_key, &udp->rtcp_read_op, @@ -424,23 +450,23 @@ static void on_rx_rtcp(pj_ioqueue_key_t *key, /* Called by stream to initialize the transport */ static pj_status_t transport_attach( pjmedia_transport *tp, - pjmedia_stream *strm, + void *user_data, const pj_sockaddr_t *rem_addr, unsigned addr_len, - void (*rtp_cb)(pjmedia_stream*, + void (*rtp_cb)(void*, const void*, pj_ssize_t), - void (*rtcp_cb)(pjmedia_stream*, + void (*rtcp_cb)(void*, const void*, pj_ssize_t)) { struct transport_udp *udp = (struct transport_udp*) tp; /* Validate arguments */ - PJ_ASSERT_RETURN(tp && strm && rem_addr && addr_len, PJ_EINVAL); + PJ_ASSERT_RETURN(tp && rem_addr && addr_len, PJ_EINVAL); /* Must not be "attached" to existing stream */ - PJ_ASSERT_RETURN(udp->stream == NULL, PJ_EINVALIDOP); + PJ_ASSERT_RETURN(!udp->attached, PJ_EINVALIDOP); /* "Attach" the stream: */ @@ -455,9 +481,10 @@ static pj_status_t transport_attach( pjmedia_transport *tp, /* Save the callbacks */ udp->rtp_cb = rtp_cb; udp->rtcp_cb = rtcp_cb; + udp->user_data = user_data; - /* Last, save the stream to mark that we have a "client" */ - udp->stream = strm; + /* Last, mark transport as attached */ + udp->attached = PJ_TRUE; return PJ_SUCCESS; } @@ -465,18 +492,25 @@ static pj_status_t transport_attach( pjmedia_transport *tp, /* Called by stream when it no longer needs the transport */ static void transport_detach( pjmedia_transport *tp, - pjmedia_stream *strm) + void *user_data) { struct transport_udp *udp = (struct transport_udp*) tp; - pj_assert(tp && strm); + pj_assert(tp); + + /* User data is unreferenced on Release build */ + PJ_UNUSED_ARG(user_data); + + /* As additional checking, check if the same user data is specified */ + pj_assert(user_data == udp->user_data); - PJ_UNUSED_ARG(strm); + /* First, mark stream as unattached */ + udp->attached = PJ_FALSE; /* Clear up stream infos from transport */ - udp->stream = NULL; udp->rtp_cb = NULL; udp->rtcp_cb = NULL; + udp->user_data = NULL; } @@ -487,14 +521,34 @@ static pj_status_t transport_send_rtp( pjmedia_transport *tp, { struct transport_udp *udp = (struct transport_udp*)tp; pj_ssize_t sent; + unsigned id; + struct pending_write *pw; pj_status_t status; - PJ_ASSERT_RETURN(udp->stream, PJ_EINVALIDOP); + /* Must be attached */ + PJ_ASSERT_RETURN(udp->attached, PJ_EINVALIDOP); + + /* Check that the size is supported */ + PJ_ASSERT_RETURN(size <= RTP_LEN, PJ_ETOOBIG); + + id = udp->rtp_write_op_id; + pw = &udp->rtp_pending_write[id]; + + /* We need to copy packet to our buffer because when the + * operation is pending, caller might write something else + * to the original buffer. + */ + pj_memcpy(pw->buffer, pkt, size); sent = size; - status = pj_ioqueue_sendto( udp->rtp_key, &udp->rtp_write_op, - pkt, &sent, 0, - &udp->rem_rtp_addr, sizeof(pj_sockaddr_in)); + status = pj_ioqueue_sendto( udp->rtp_key, + &udp->rtp_pending_write[id].op_key, + pw->buffer, &sent, 0, + &udp->rem_rtp_addr, + sizeof(pj_sockaddr_in)); + + udp->rtp_write_op_id = (udp->rtp_write_op_id + 1) % + PJ_ARRAY_SIZE(udp->rtp_pending_write); if (status==PJ_SUCCESS || status==PJ_EPENDING) return PJ_SUCCESS; @@ -511,7 +565,7 @@ static pj_status_t transport_send_rtcp(pjmedia_transport *tp, pj_ssize_t sent; pj_status_t status; - PJ_ASSERT_RETURN(udp->stream, PJ_EINVALIDOP); + PJ_ASSERT_RETURN(udp->attached, PJ_EINVALIDOP); sent = size; status = pj_ioqueue_sendto( udp->rtcp_key, &udp->rtcp_write_op, |