From 84697f0def74bfc01e0a0e6d0f904adc2db58276 Mon Sep 17 00:00:00 2001 From: Benny Prijono Date: Thu, 22 Jun 2006 18:51:03 +0000 Subject: Changed siprtp to use media transport framework to handle NAT git-svn-id: http://svn.pjsip.org/repos/pjproject/trunk@540 74dad513-b988-da41-8d7b-12977e46ad98 --- pjsip-apps/src/samples/siprtp.c | 446 +++++++++++++++++++--------------------- 1 file changed, 209 insertions(+), 237 deletions(-) (limited to 'pjsip-apps') diff --git a/pjsip-apps/src/samples/siprtp.c b/pjsip-apps/src/samples/siprtp.c index 4ffe0123..29447291 100644 --- a/pjsip-apps/src/samples/siprtp.c +++ b/pjsip-apps/src/samples/siprtp.c @@ -96,11 +96,16 @@ struct codec }; -/* A bidirectional media stream */ +/* A bidirectional media stream created when the call is active. */ struct media_stream { /* Static: */ - pj_uint16_t port; /* RTP port (RTCP is +1) */ + unsigned call_index; /* Call owner. */ + unsigned media_index; /* Media index in call. */ + pjmedia_transport *transport; /* To send/recv RTP/RTCP */ + + /* Active? */ + pj_bool_t active; /* Non-zero if is in call. */ /* Current stream info: */ pjmedia_stream_info si; /* Current stream info. */ @@ -110,10 +115,6 @@ struct media_stream unsigned samples_per_frame; /* samples per frame */ unsigned bytes_per_frame; /* frame size. */ - /* Sockets: */ - pj_sock_t rtp_sock; /* RTP socket. */ - pj_sock_t rtcp_sock; /* RTCP socket. */ - /* RTP session: */ pjmedia_rtp_session out_sess; /* outgoing RTP session */ pjmedia_rtp_session in_sess; /* incoming RTP session */ @@ -122,17 +123,20 @@ struct media_stream pjmedia_rtcp_session rtcp; /* incoming RTCP session. */ /* Thread: */ - pj_bool_t thread_quit_flag; /* worker thread quit flag */ - pj_thread_t *thread; /* RTP/RTCP worker thread */ + pj_bool_t thread_quit_flag; /* Stop media thread. */ + pj_thread_t *thread; /* Media thread. */ }; +/* This is a call structure that is created when the application starts + * and only destroyed when the application quits. + */ struct call { unsigned index; pjsip_inv_session *inv; unsigned media_count; - struct media_stream media[2]; + struct media_stream media[1]; pj_time_val start_time; pj_time_val response_time; pj_time_val connect_time; @@ -141,6 +145,7 @@ struct call }; +/* Application's global variables */ static struct app { unsigned max_calls; @@ -150,7 +155,7 @@ static struct app unsigned thread_count; int sip_port; int rtp_start_port; - char *local_addr; + pj_str_t local_addr; pj_str_t local_uri; pj_str_t local_contact; @@ -168,7 +173,7 @@ static struct app pjsip_endpoint *sip_endpt; pj_bool_t thread_quit; - pj_thread_t *thread[1]; + pj_thread_t *sip_thread[1]; pjmedia_endpt *med_endpt; struct call call[MAX_CALLS]; @@ -195,7 +200,7 @@ static void call_on_forked(pjsip_inv_session *inv, pjsip_event *e); static pj_bool_t on_rx_request( pjsip_rx_data *rdata ); /* Worker thread prototype */ -static int worker_thread(void *arg); +static int sip_worker_thread(void *arg); /* Create SDP for call */ static pj_status_t create_sdp( pj_pool_t *pool, @@ -208,6 +213,15 @@ static void hangup_call(unsigned index); /* Destroy the call's media */ static void destroy_call_media(unsigned call_index); +/* Destroy media. */ +static void destroy_media(); + +/* This callback is called by media transport on receipt of RTP packet. */ +static void on_rx_rtp(void *user_data, const void *pkt, pj_ssize_t size); + +/* This callback is called by media transport on receipt of RTCP packet. */ +static void on_rx_rtcp(void *user_data, const void *pkt, pj_ssize_t size); + /* Display error */ static void app_perror(const char *sender, const char *title, pj_status_t status); @@ -246,7 +260,7 @@ struct codec audio_codecs[] = { 3, "GSM", 8000, 13200, 20, "GSM" }, { 4, "G723", 8000, 6400, 30, "G.723.1" }, { 8, "PCMA", 8000, 64000, 20, "G.711 ALaw" }, - { 18, "G729", 8000, 8000, 20, "G.729" }, + { 18, "G729", 8000, 8000, 20, "G.729" }, }; @@ -255,6 +269,7 @@ struct codec audio_codecs[] = */ static pj_status_t init_sip() { + unsigned i; pj_status_t status; /* init PJLIB-UTIL: */ @@ -267,25 +282,10 @@ static pj_status_t init_sip() /* Create application pool for misc. */ app.pool = pj_pool_create(&app.cp.factory, "app", 1000, 1000, NULL); - /* Create global endpoint: */ - { - const pj_str_t *hostname; - const char *endpt_name; - - /* Endpoint MUST be assigned a globally unique name. - * The name will be used as the hostname in Warning header. - */ - - /* For this implementation, we'll use hostname for simplicity */ - hostname = pj_gethostname(); - endpt_name = hostname->ptr; - - /* Create the endpoint: */ - - status = pjsip_endpt_create(&app.cp.factory, endpt_name, - &app.sip_endpt); - PJ_ASSERT_RETURN(status == PJ_SUCCESS, status); - } + /* Create the endpoint: */ + status = pjsip_endpt_create(&app.cp.factory, pj_gethostname()->ptr, + &app.sip_endpt); + PJ_ASSERT_RETURN(status == PJ_SUCCESS, status); /* Add UDP transport. */ @@ -298,13 +298,13 @@ static pj_status_t init_sip() addr.sin_addr.s_addr = 0; addr.sin_port = pj_htons((pj_uint16_t)app.sip_port); - if (app.local_addr) { - addrname.host = pj_str(app.local_addr); + if (app.local_addr.slen) { + addrname.host = app.local_addr; addrname.port = app.sip_port; } status = pjsip_udp_transport_start( app.sip_endpt, &addr, - (app.local_addr ? &addrname:NULL), + (app.local_addr.slen ? &addrname:NULL), 1, NULL); if (status != PJ_SUCCESS) { app_perror(THIS_FILE, "Unable to start UDP transport", status); @@ -342,6 +342,9 @@ static pj_status_t init_sip() status = pjsip_endpt_register_module( app.sip_endpt, &mod_siprtp); PJ_ASSERT_RETURN(status == PJ_SUCCESS, status); + /* Init calls */ + for (i=0; iport = rtp_port; + unsigned j; - /* Create and bind RTP socket */ - status = pj_sock_socket(PJ_AF_INET, PJ_SOCK_DGRAM, 0, - &m->rtp_sock); - if (status != PJ_SUCCESS) - goto on_error; - - addr.sin_port = pj_htons(rtp_port); - status = pj_sock_bind(m->rtp_sock, &addr, sizeof(addr)); - if (status != PJ_SUCCESS) { - pj_sock_close(m->rtp_sock), m->rtp_sock=0; - continue; - } - - - /* Create and bind RTCP socket */ - status = pj_sock_socket(PJ_AF_INET, PJ_SOCK_DGRAM, 0, - &m->rtcp_sock); - if (status != PJ_SUCCESS) - goto on_error; - - addr.sin_port = pj_htons((pj_uint16_t)(rtp_port+1)); - status = pj_sock_bind(m->rtcp_sock, &addr, sizeof(addr)); - if (status != PJ_SUCCESS) { - pj_sock_close(m->rtp_sock), m->rtp_sock=0; - pj_sock_close(m->rtcp_sock), m->rtcp_sock=0; - continue; + /* Create transport for each media in the call */ + for (j=0; jtransport); + if (status == PJ_SUCCESS) { + rtp_port += 2; + break; + } } - } if (status != PJ_SUCCESS) goto on_error; - } /* Done */ return PJ_SUCCESS; on_error: - for (i=0; irtp_sock), m->rtp_sock=0; - pj_sock_close(m->rtcp_sock), m->rtcp_sock=0; - } - + destroy_media(); return status; } @@ -489,13 +449,15 @@ static void destroy_media() unsigned i; for (i=0; irtp_sock) - pj_sock_close(m->rtp_sock), m->rtp_sock = 0; - - if (m->rtcp_sock) - pj_sock_close(m->rtcp_sock), m->rtcp_sock = 0; + if (m->transport) { + pjmedia_transport_close(m->transport); + m->transport = NULL; + } + } } if (app.med_endpt) { @@ -794,8 +756,8 @@ static void app_perror(const char *sender, const char *title, } -/* Worker thread */ -static int worker_thread(void *arg) +/* Worker thread for SIP */ +static int sip_worker_thread(void *arg) { PJ_UNUSED_ARG(arg); @@ -863,7 +825,7 @@ static pj_status_t init_options(int argc, char *argv[]) app.thread_count = 1; app.sip_port = 5060; app.rtp_start_port = RTP_START_PORT; - app.local_addr = ip_addr; + app.local_addr = pj_str(ip_addr); app.log_level = 5; app.app_log_level = 3; app.log_filename = NULL; @@ -898,7 +860,7 @@ static pj_status_t init_options(int argc, char *argv[]) app.rtp_start_port = atoi(pj_optarg); break; case 'i': - app.local_addr = pj_optarg; + app.local_addr = pj_str(pj_optarg); break; case 'l': @@ -941,7 +903,7 @@ static pj_status_t init_options(int argc, char *argv[]) app.uri_to_call = pj_str(argv[pj_optind]); /* Build local URI and contact */ - pj_ansi_sprintf( local_uri, "sip:%s:%d", app.local_addr, app.sip_port); + pj_ansi_sprintf( local_uri, "sip:%s:%d", app.local_addr.ptr, app.sip_port); app.local_uri = pj_str(local_uri); app.local_contact = app.local_uri; @@ -965,11 +927,15 @@ static pj_status_t create_sdp( pj_pool_t *pool, pjmedia_sdp_session *sdp; pjmedia_sdp_media *m; pjmedia_sdp_attr *attr; + pjmedia_transport_udp_info tpinfo; struct media_stream *audio = &call->media[0]; PJ_ASSERT_RETURN(pool && p_sdp, PJ_EINVAL); + /* Get transport info */ + pjmedia_transport_udp_get_info(audio->transport, &tpinfo); + /* Create and initialize basic SDP session */ sdp = pj_pool_zalloc (pool, sizeof(pjmedia_sdp_session)); @@ -987,7 +953,7 @@ static pj_status_t create_sdp( pj_pool_t *pool, sdp->conn = pj_pool_zalloc (pool, sizeof(pjmedia_sdp_conn)); sdp->conn->net_type = pj_str("IN"); sdp->conn->addr_type = pj_str("IP4"); - sdp->conn->addr = pj_str(app.local_addr); + sdp->conn->addr = app.local_addr; /* SDP time and attributes. */ @@ -1002,7 +968,7 @@ static pj_status_t create_sdp( pj_pool_t *pool, /* Standard media info: */ m->desc.media = pj_str("audio"); - m->desc.port = audio->port; + m->desc.port = pj_ntohs(tpinfo.skinfo.rtp_addr_name.sin_port); m->desc.port_count = 1; m->desc.transport = pj_str("RTP/AVP"); @@ -1067,6 +1033,74 @@ static void boost_priority(void) # define boost_priority() #endif + +/* + * This callback is called by media transport on receipt of RTP packet. + */ +static void on_rx_rtp(void *user_data, const void *pkt, pj_ssize_t size) +{ + struct media_stream *strm; + pj_status_t status; + const pjmedia_rtp_hdr *hdr; + const void *payload; + unsigned payload_len; + + strm = user_data; + + /* Discard packet if media is inactive */ + if (!strm->active) + return; + + /* Check for errors */ + if (size < 0) { + app_perror(THIS_FILE, "RTP recv() error", -size); + return; + } + + /* Decode RTP packet. */ + status = pjmedia_rtp_decode_rtp(&strm->in_sess, + pkt, size, + &hdr, &payload, &payload_len); + if (status != PJ_SUCCESS) { + app_perror(THIS_FILE, "RTP decode error", status); + return; + } + + //PJ_LOG(4,(THIS_FILE, "Rx seq=%d", pj_ntohs(hdr->seq))); + + /* Update the RTCP session. */ + pjmedia_rtcp_rx_rtp(&strm->rtcp, pj_ntohs(hdr->seq), + pj_ntohl(hdr->ts), payload_len); + + /* Update RTP session */ + pjmedia_rtp_session_update(&strm->in_sess, hdr, NULL); + +} + +/* + * This callback is called by media transport on receipt of RTCP packet. + */ +static void on_rx_rtcp(void *user_data, const void *pkt, pj_ssize_t size) +{ + struct media_stream *strm; + + strm = user_data; + + /* Discard packet if media is inactive */ + if (!strm->active) + return; + + /* Check for errors */ + if (size < 0) { + app_perror(THIS_FILE, "Error receiving RTCP packet", -size); + return; + } + + /* Update RTCP session */ + pjmedia_rtcp_rx_rtcp(&strm->rtcp, pkt, size); +} + + /* * Media thread * @@ -1084,6 +1118,8 @@ static int media_thread(void *arg) /* Boost thread priority if necessary */ boost_priority(); + /* Let things settle */ + pj_thread_sleep(1000); msec_interval = strm->samples_per_frame * 1000 / strm->clock_rate; pj_get_timestamp_freq(&freq); @@ -1096,16 +1132,20 @@ static int media_thread(void *arg) while (!strm->thread_quit_flag) { - pj_fd_set_t set; pj_timestamp now, lesser; pj_time_val timeout; - int rc; + pj_bool_t send_rtp, send_rtcp; + + send_rtp = send_rtcp = PJ_FALSE; /* Determine how long to sleep */ - if (next_rtp.u64 < next_rtcp.u64) + if (next_rtp.u64 < next_rtcp.u64) { lesser = next_rtp; - else + send_rtp = PJ_TRUE; + } else { lesser = next_rtcp; + send_rtcp = PJ_TRUE; + } pj_get_timestamp(&now); if (lesser.u64 <= now.u64) { @@ -1121,76 +1161,16 @@ static int media_thread(void *arg) //printf("%d:%03d ", timeout.sec, timeout.msec); fflush(stdout); } - PJ_FD_ZERO(&set); - PJ_FD_SET(strm->rtp_sock, &set); - PJ_FD_SET(strm->rtcp_sock, &set); - - rc = pj_sock_select(FD_SETSIZE, &set, NULL, NULL, &timeout); - - if (rc < 0) { - pj_thread_sleep(10); - continue; - } - - if (rc > 0 && PJ_FD_ISSET(strm->rtp_sock, &set)) { - - /* - * Process incoming RTP packet. - */ - pj_status_t status; - pj_ssize_t size; - const pjmedia_rtp_hdr *hdr; - const void *payload; - unsigned payload_len; - - size = sizeof(packet); - status = pj_sock_recv(strm->rtp_sock, packet, &size, 0); - if (status != PJ_SUCCESS) { - app_perror(THIS_FILE, "RTP recv() error", status); - pj_thread_sleep(10); - continue; - } - - - /* Decode RTP packet. */ - status = pjmedia_rtp_decode_rtp(&strm->in_sess, - packet, size, - &hdr, - &payload, &payload_len); - if (status != PJ_SUCCESS) { - app_perror(THIS_FILE, "RTP decode error", status); - continue; - } - - /* Update the RTCP session. */ - pjmedia_rtcp_rx_rtp(&strm->rtcp, pj_ntohs(hdr->seq), - pj_ntohl(hdr->ts), payload_len); - - /* Update RTP session */ - pjmedia_rtp_session_update(&strm->in_sess, hdr, NULL); - } - - if (rc > 0 && PJ_FD_ISSET(strm->rtcp_sock, &set)) { - - /* - * Process incoming RTCP - */ - pj_status_t status; - pj_ssize_t size; - - size = sizeof(packet); - status = pj_sock_recv( strm->rtcp_sock, packet, &size, 0); - if (status != PJ_SUCCESS) { - app_perror(THIS_FILE, "Error receiving RTCP packet", status); - pj_thread_sleep(10); - } else - pjmedia_rtcp_rx_rtcp(&strm->rtcp, packet, size); - } - + /* Wait for next interval */ + //if (timeout.sec!=0 && timeout.msec!=0) { + pj_thread_sleep(PJ_TIME_VAL_MSEC(timeout)); + if (strm->thread_quit_flag) + break; + //} pj_get_timestamp(&now); - if (next_rtp.u64 <= now.u64) { + if (send_rtp || next_rtp.u64 <= now.u64) { /* * Time to send RTP packet. */ @@ -1207,6 +1187,8 @@ static int media_thread(void *arg) (const void**)&hdr, &hdrlen); if (status == PJ_SUCCESS) { + //PJ_LOG(4,(THIS_FILE, "\t\tTx seq=%d", pj_ntohs(hdr->seq))); + /* Copy RTP header to packet */ pj_memcpy(packet, hdr, hdrlen); @@ -1215,13 +1197,13 @@ static int media_thread(void *arg) /* Send RTP packet */ size = hdrlen + strm->bytes_per_frame; - status = pj_sock_sendto( strm->rtp_sock, packet, &size, 0, - &strm->si.rem_addr, - sizeof(strm->si.rem_addr)); - + status = pjmedia_transport_send_rtp(strm->transport, + packet, size); if (status != PJ_SUCCESS) app_perror(THIS_FILE, "Error sending RTP packet", status); + } else { + pj_assert(!"RTP encode() error"); } /* Update RTCP SR */ @@ -1232,30 +1214,23 @@ static int media_thread(void *arg) } - if (next_rtcp.u64 <= now.u64) { + if (send_rtcp || next_rtcp.u64 <= now.u64) { /* * Time to send RTCP packet. */ pjmedia_rtcp_pkt *rtcp_pkt; int rtcp_len; - pj_sockaddr_in rem_addr; pj_ssize_t size; - int port; pj_status_t status; /* Build RTCP packet */ pjmedia_rtcp_build_rtcp(&strm->rtcp, &rtcp_pkt, &rtcp_len); - /* Calculate address based on RTP address */ - rem_addr = strm->si.rem_addr; - port = pj_ntohs(strm->si.rem_addr.sin_port) + 1; - rem_addr.sin_port = pj_htons((pj_uint16_t)port); - /* Send packet */ size = rtcp_len; - status = pj_sock_sendto(strm->rtcp_sock, rtcp_pkt, &size, 0, - &rem_addr, sizeof(rem_addr)); + status = pjmedia_transport_send_rtcp(strm->transport, + rtcp_pkt, size); if (status != PJ_SUCCESS) { app_perror(THIS_FILE, "Error sending RTCP packet", status); } @@ -1338,6 +1313,16 @@ static void call_on_media_update( pjsip_inv_session *inv, audio->samples_per_frame, 0); + /* Attach media to transport */ + status = pjmedia_transport_attach(audio->transport, audio, + &audio->si.rem_addr, + sizeof(pj_sockaddr_in), + &on_rx_rtp, + &on_rx_rtcp); + if (status != PJ_SUCCESS) { + app_perror(THIS_FILE, "Error on pjmedia_transport_attach()", status); + return; + } /* Start media thread. */ audio->thread_quit_flag = 0; @@ -1345,7 +1330,11 @@ static void call_on_media_update( pjsip_inv_session *inv, 0, 0, &audio->thread); if (status != PJ_SUCCESS) { app_perror(THIS_FILE, "Error creating media thread", status); + return; } + + /* Set the media as active */ + audio->active = PJ_TRUE; } @@ -1356,39 +1345,16 @@ static void destroy_call_media(unsigned call_index) struct media_stream *audio = &app.call[call_index].media[0]; if (audio->thread) { + + audio->active = PJ_FALSE; + audio->thread_quit_flag = 1; pj_thread_join(audio->thread); pj_thread_destroy(audio->thread); audio->thread = NULL; audio->thread_quit_flag = 0; - /* Flush RTP/RTCP packets */ - { - pj_fd_set_t set; - pj_time_val timeout = {0, 0}; - char packet[1500]; - pj_ssize_t size; - pj_status_t status; - int rc; - - do { - PJ_FD_ZERO(&set); - PJ_FD_SET(audio->rtp_sock, &set); - PJ_FD_SET(audio->rtcp_sock, &set); - - rc = pj_sock_select(FD_SETSIZE, &set, NULL, NULL, &timeout); - if (rc > 0 && PJ_FD_ISSET(audio->rtp_sock, &set)) { - size = sizeof(packet); - status = pj_sock_recv(audio->rtp_sock, packet, &size, 0); - - } - if (rc > 0 && PJ_FD_ISSET(audio->rtcp_sock, &set)) { - size = sizeof(packet); - status = pj_sock_recv(audio->rtcp_sock, packet, &size, 0); - } - - } while (rc > 0); - } + pjmedia_transport_detach(audio->transport, audio); } } @@ -1431,6 +1397,12 @@ static void hangup_all_calls() continue; hangup_call(i); } + + /* Wait until all calls are terminated */ + for (i=0; i