diff options
author | Mark Spencer <markster@digium.com> | 2004-04-27 15:18:55 +0000 |
---|---|---|
committer | Mark Spencer <markster@digium.com> | 2004-04-27 15:18:55 +0000 |
commit | 7596de9ac8176bb00a5f65d1cd24b29877c75683 (patch) | |
tree | 5ce43cdb4a3e94eef384e872f54099972d91d2e1 /channels | |
parent | 5385ca0a0e995904852027e5bc460c483c3263db (diff) |
Extreme IAX2 trunking performance improvements
git-svn-id: https://origsvn.digium.com/svn/asterisk/trunk@2783 65c4cc65-6c06-0410-ace0-fbb531ad65f3
Diffstat (limited to 'channels')
-rwxr-xr-x | channels/chan_iax2.c | 297 |
1 files changed, 187 insertions, 110 deletions
diff --git a/channels/chan_iax2.c b/channels/chan_iax2.c index 30ebb5e30..9b9748196 100755 --- a/channels/chan_iax2.c +++ b/channels/chan_iax2.c @@ -230,10 +230,6 @@ struct iax2_peer { int delme; /* I need to be deleted */ int temponly; /* I'm only a temp */ int trunk; /* Treat as an IAX trunking */ - struct timeval txtrunktime; /* Transmit trunktime */ - struct timeval rxtrunktime; /* Receive trunktime */ - struct timeval lasttxtime; /* Last transmitted trunktime */ - unsigned int lastsent; /* Last sent time */ /* Qualification */ int callno; /* Call number of POKE request */ @@ -246,6 +242,28 @@ struct iax2_peer { int notransfer; }; +#define IAX2_TRUNK_PREFACE (sizeof(struct iax_frame) + sizeof(struct ast_iax2_meta_hdr) + sizeof(struct ast_iax2_meta_trunk_hdr)) + +static struct iax2_trunk_peer { + ast_mutex_t lock; + struct sockaddr_in addr; + struct timeval txtrunktime; /* Transmit trunktime */ + struct timeval rxtrunktime; /* Receive trunktime */ + struct timeval lasttxtime; /* Last transmitted trunktime */ + struct timeval trunkact; /* Last trunk activity */ + unsigned int lastsent; /* Last sent time */ + /* Trunk data and length */ + unsigned char *trunkdata; + unsigned int trunkdatalen; + unsigned int trunkdataalloc; + struct iax2_trunk_peer *next; + int trunkerror; + int calls; + int firstcallno; +} *tpeers = NULL; + +static ast_mutex_t tpeerlock = AST_MUTEX_INITIALIZER; + struct iax_firmware { struct iax_firmware *next; int fd; @@ -290,7 +308,8 @@ static struct iax2_registry *registrations; #define MAX_RETRY_TIME 10000 #define MAX_JITTER_BUFFER 50 -#define MAX_TRUNKDATA 640 /* 40ms, uncompressed linear */ +#define DEFAULT_TRUNKDATA 640 * 10 /* 40ms, uncompressed linear * 10 channels */ +#define MAX_TRUNKDATA 640 * 200 /* 40ms, uncompressed linear * 200 channels */ /* If we have more than this much excess real jitter buffer, srhink it. */ static int max_jitter_buffer = MAX_JITTER_BUFFER; @@ -426,10 +445,6 @@ struct chan_iax2_pvt { int amaflags; /* This is part of a trunk interface */ int trunk; - /* Trunk data and length */ - unsigned char trunkdata[MAX_TRUNKDATA]; - unsigned int trunkdatalen; - int trunkerror; struct iax2_dpcache *dpentries; int notransfer; /* do we want native bridging */ }; @@ -2530,39 +2545,38 @@ static struct ast_channel *ast_iax2_new(struct chan_iax2_pvt *i, int state, int return tmp; } -static unsigned int calc_txpeerstamp(struct iax2_peer *peer, int sampms) +static unsigned int calc_txpeerstamp(struct iax2_trunk_peer *tpeer, int sampms, struct timeval *tv) { - struct timeval tv; long int mssincetx; long int ms, pred; - gettimeofday(&tv, NULL); - mssincetx = (tv.tv_sec - peer->lasttxtime.tv_sec) * 1000 + (tv.tv_usec - peer->lasttxtime.tv_usec) / 1000; + tpeer->trunkact = *tv; + mssincetx = (tv->tv_sec - tpeer->lasttxtime.tv_sec) * 1000 + (tv->tv_usec - tpeer->lasttxtime.tv_usec) / 1000; if (mssincetx > 5000) { /* If it's been at least 5 seconds since the last time we transmitted on this trunk, reset our timers */ - peer->txtrunktime.tv_sec = tv.tv_sec; - peer->txtrunktime.tv_usec = tv.tv_usec; - peer->lastsent = 999999; + tpeer->txtrunktime.tv_sec = tv->tv_sec; + tpeer->txtrunktime.tv_usec = tv->tv_usec; + tpeer->lastsent = 999999; } /* Update last transmit time now */ - peer->lasttxtime.tv_sec = tv.tv_sec; - peer->lasttxtime.tv_usec = tv.tv_usec; + tpeer->lasttxtime.tv_sec = tv->tv_sec; + tpeer->lasttxtime.tv_usec = tv->tv_usec; /* Calculate ms offset */ - ms = (tv.tv_sec - peer->txtrunktime.tv_sec) * 1000 + (tv.tv_usec - peer->txtrunktime.tv_usec) / 1000; + ms = (tv->tv_sec - tpeer->txtrunktime.tv_sec) * 1000 + (tv->tv_usec - tpeer->txtrunktime.tv_usec) / 1000; /* Predict from last value */ - pred = peer->lastsent + sampms; + pred = tpeer->lastsent + sampms; if (abs(ms - pred) < 640) ms = pred; /* We never send the same timestamp twice, so fudge a little if we must */ - if (ms == peer->lastsent) - ms = peer->lastsent + 1; - peer->lastsent = ms; + if (ms == tpeer->lastsent) + ms = tpeer->lastsent + 1; + tpeer->lastsent = ms; return ms; } -static unsigned int fix_peerts(struct iax2_peer *peer, int callno, unsigned int ts) +static unsigned int fix_peerts(struct iax2_trunk_peer *peer, int callno, unsigned int ts) { long ms; /* NOT unsigned */ if (!iaxs[callno]->rxcore.tv_sec && !iaxs[callno]->rxcore.tv_usec) { @@ -2677,6 +2691,85 @@ static unsigned int calc_rxstamp(struct chan_iax2_pvt *p) return ms; } +struct iax2_trunk_peer *find_tpeer(struct sockaddr_in *sin) +{ + struct iax2_trunk_peer *tpeer; + /* Finds and locks trunk peer */ + ast_mutex_lock(&tpeerlock); + tpeer = tpeers; + while(tpeer) { + /* We don't lock here because tpeer->addr *never* changes */ + if (!inaddrcmp(&tpeer->addr, sin)) { + ast_mutex_lock(&tpeer->lock); + break; + } + tpeer = tpeer->next; + } + if (!tpeer) { + tpeer = malloc(sizeof(struct iax2_trunk_peer)); + if (tpeer) { + memset(tpeer, 0, sizeof(struct iax2_trunk_peer)); + ast_mutex_init(&tpeer->lock); + tpeer->lastsent = 9999; + memcpy(&tpeer->addr, sin, sizeof(tpeer->addr)); + gettimeofday(&tpeer->trunkact, NULL); + ast_mutex_lock(&tpeer->lock); + tpeer->next = tpeers; + tpeers = tpeer; + ast_log(LOG_DEBUG, "Created trunk peer for '%s:%d'\n", inet_ntoa(tpeer->addr.sin_addr), ntohs(tpeer->addr.sin_port)); + } + } + ast_mutex_unlock(&tpeerlock); + return tpeer; +} + +static int iax2_trunk_queue(struct chan_iax2_pvt *pvt, struct ast_frame *f) +{ + struct iax2_trunk_peer *tpeer; + void *tmp, *ptr; + struct ast_iax2_meta_trunk_entry *met; + tpeer = find_tpeer(&pvt->addr); + if (tpeer) { + if (tpeer->trunkdatalen + f->datalen + 4 >= tpeer->trunkdataalloc) { + /* Need to reallocate space */ + if (tpeer->trunkdataalloc < MAX_TRUNKDATA) { + tmp = realloc(tpeer->trunkdata, tpeer->trunkdataalloc + DEFAULT_TRUNKDATA + IAX2_TRUNK_PREFACE); + if (tmp) { + tpeer->trunkdataalloc += DEFAULT_TRUNKDATA; + tpeer->trunkdata = tmp; + ast_log(LOG_DEBUG, "Expanded trunk '%s:%d' to %d bytes\n", inet_ntoa(tpeer->addr.sin_addr), ntohs(tpeer->addr.sin_port), tpeer->trunkdataalloc); + } else { + ast_log(LOG_WARNING, "Insufficient memory to expand trunk data to %s:%d\n", inet_ntoa(tpeer->addr.sin_addr), ntohs(tpeer->addr.sin_port)); + ast_mutex_unlock(&tpeer->lock); + return -1; + } + } else { + ast_log(LOG_WARNING, "Maximum trunk data space exceeded to %s:%d\n", inet_ntoa(tpeer->addr.sin_addr), ntohs(tpeer->addr.sin_port)); + ast_mutex_unlock(&tpeer->lock); + return -1; + } + } + + /* Append to meta frame */ + ptr = tpeer->trunkdata + IAX2_TRUNK_PREFACE + tpeer->trunkdatalen; + met = (struct ast_iax2_meta_trunk_entry *)ptr; + /* Store call number and length in meta header */ + met->callno = htons(pvt->callno); + met->len = htons(f->datalen); + /* Advance pointers/decrease length past trunk entry header */ + ptr += sizeof(struct ast_iax2_meta_trunk_entry); + tpeer->trunkdatalen += sizeof(struct ast_iax2_meta_trunk_entry); + /* Copy actual trunk data */ + memcpy(ptr, f->data, f->datalen); + tpeer->trunkdatalen += f->datalen; + if (!tpeer->firstcallno) + tpeer->firstcallno = pvt->callno; + tpeer->calls++; + ast_mutex_unlock(&tpeer->lock); + } + return 0; +} + static int iax2_send(struct chan_iax2_pvt *pvt, struct ast_frame *f, unsigned int ts, int seqno, int now, int transfer, int final) { /* Queue a packet for delivery on a given private structure. Use "ts" for @@ -2794,17 +2887,7 @@ static int iax2_send(struct chan_iax2_pvt *pvt, struct ast_frame *f, unsigned in res = iax2_transmit(fr); } else { if (pvt->trunk) { - /* Queue for transmission in a meta frame */ - if ((sizeof(pvt->trunkdata) - pvt->trunkdatalen) >= fr->af.datalen) { - memcpy(pvt->trunkdata + pvt->trunkdatalen, fr->af.data, fr->af.datalen); - pvt->trunkdatalen += fr->af.datalen; - res = 0; - pvt->trunkerror = 0; - } else { - if (!pvt->trunkerror) - ast_log(LOG_WARNING, "Out of trunk data space on call number %d, dropping\n", pvt->callno); - pvt->trunkerror = 1; - } + iax2_trunk_queue(pvt, &fr->af); res = 0; } else if (fr->af.frametype == AST_FRAME_VIDEO) { /* Video frame have no sequence number */ @@ -4192,96 +4275,70 @@ static int iax2_poke_peer_s(void *data) return 0; } -static int send_trunk(struct iax2_peer *peer) +static int send_trunk(struct iax2_trunk_peer *tpeer, struct timeval *now) { - int x; - int calls = 0; int res = 0; - int firstcall = 0; - unsigned char buf[65536 + sizeof(struct iax_frame)], *ptr; - int len = 65536; struct iax_frame *fr; struct ast_iax2_meta_hdr *meta; struct ast_iax2_meta_trunk_hdr *mth; - struct ast_iax2_meta_trunk_entry *met; + int calls = 0; /* Point to frame */ - fr = (struct iax_frame *)buf; + fr = (struct iax_frame *)tpeer->trunkdata; /* Point to meta data */ meta = (struct ast_iax2_meta_hdr *)fr->afdata; mth = (struct ast_iax2_meta_trunk_hdr *)meta->data; - /* Point past meta data for first meta trunk entry */ - ptr = fr->afdata + sizeof(struct ast_iax2_meta_hdr) + sizeof(struct ast_iax2_meta_trunk_hdr); - len -= sizeof(struct ast_iax2_meta_hdr) + sizeof(struct ast_iax2_meta_trunk_hdr); - - /* Search through trunked calls for a match with this peer */ - for (x=TRUNK_CALL_START;x<maxtrunkcall; x++) { - ast_mutex_lock(&iaxsl[x]); -#if 0 - if (iaxtrunkdebug) - ast_verbose("Call %d is at %s:%d (%d)\n", x, inet_ntoa(iaxs[x]->addr.sin_addr), ntohs(iaxs[x]->addr.sin_port), iaxs[x]->addr.sin_family); -#endif - if (iaxs[x] && iaxs[x]->trunk && iaxs[x]->trunkdatalen && !inaddrcmp(&iaxs[x]->addr, &peer->addr)) { - if (iaxtrunkdebug) - ast_verbose(" -- Sending call %d via trunk to %s:%d\n", x, inet_ntoa(iaxs[x]->addr.sin_addr), ntohs(iaxs[x]->addr.sin_port)); - if (len >= iaxs[x]->trunkdatalen + sizeof(struct ast_iax2_meta_trunk_entry)) { - met = (struct ast_iax2_meta_trunk_entry *)ptr; - /* Store call number and length in meta header */ - met->callno = htons(x); - met->len = htons(iaxs[x]->trunkdatalen); - /* Advance pointers/decrease length past trunk entry header */ - ptr += sizeof(struct ast_iax2_meta_trunk_entry); - len -= sizeof(struct ast_iax2_meta_trunk_entry); - /* Copy actual trunk data */ - memcpy(ptr, iaxs[x]->trunkdata, iaxs[x]->trunkdatalen); - /* Advance pointeres/decrease length for actual data */ - ptr += iaxs[x]->trunkdatalen; - len -= iaxs[x]->trunkdatalen; - } else - ast_log(LOG_WARNING, "Out of space in frame for trunking call %d\n", x); - iaxs[x]->trunkdatalen = 0; - calls++; - if (!firstcall) - firstcall = x; - } - ast_mutex_unlock(&iaxsl[x]); - } - if (calls) { + if (tpeer->trunkdatalen) { /* We're actually sending a frame, so fill the meta trunk header and meta header */ meta->zeros = 0; meta->metacmd = IAX_META_TRUNK; meta->cmddata = 0; - mth->ts = htonl(calc_txpeerstamp(peer, trunkfreq)); + mth->ts = htonl(calc_txpeerstamp(tpeer, trunkfreq, now)); /* And the rest of the ast_iax2 header */ fr->direction = DIRECTION_OUTGRESS; fr->retrans = -1; fr->transfer = 0; /* Any appropriate call will do */ - fr->callno = firstcall; + fr->callno = tpeer->firstcallno; fr->data = fr->afdata; - fr->datalen = 65536 - len; + fr->datalen = tpeer->trunkdatalen + sizeof(struct ast_iax2_meta_hdr) + sizeof(struct ast_iax2_meta_trunk_hdr); #if 0 ast_log(LOG_DEBUG, "Trunking %d calls in %d bytes, ts=%d\n", calls, fr->datalen, ntohl(mth->ts)); #endif res = send_packet(fr); + calls = tpeer->calls; + /* Reset transmit trunk side data */ + tpeer->trunkdatalen = 0; + tpeer->calls = 0; + tpeer->firstcallno = 0; } if (res < 0) return res; return calls; } +static inline int iax2_trunk_expired(struct iax2_trunk_peer *tpeer, struct timeval *now) +{ + /* Drop when trunk is about 5 seconds idle */ + if (now->tv_sec > tpeer->trunkact.tv_sec + 5) + return 1; + return 0; +} + static int timing_read(int *id, int fd, short events, void *cbdata) { char buf[1024]; int res; - struct iax2_peer *peer; + struct iax2_trunk_peer *tpeer, *prev = NULL, *drop=NULL; int processed = 0; int totalcalls = 0; #ifdef ZT_TIMERACK int x = 1; #endif + struct timeval now; if (iaxtrunkdebug) ast_verbose("Beginning trunk processing\n"); + gettimeofday(&now, NULL); if (events & AST_IO_PRI) { #ifdef ZT_TIMERACK /* Great, this is a timing interface, just call the ioctl */ @@ -4299,20 +4356,43 @@ static int timing_read(int *id, int fd, short events, void *cbdata) } } /* For each peer that supports trunking... */ - ast_mutex_lock(&peerl.lock); - peer = peerl.peers; - while(peer) { - if (peer->trunk) { - processed++; - res = send_trunk(peer); + ast_mutex_lock(&tpeerlock); + tpeer = tpeers; + while(tpeer) { + processed++; + ast_mutex_lock(&tpeer->lock); + /* We can drop a single tpeer per pass. That makes all this logic + substantially easier */ + if (!drop && iax2_trunk_expired(tpeer, &now)) { + /* Take it out of the list, but don't free it yet, because it + could be in use */ + if (prev) + prev->next = tpeer->next; + else + tpeers = tpeer->next; + drop = tpeer; + } else { + res = send_trunk(tpeer, &now); if (iaxtrunkdebug) - ast_verbose("Processed trunk peer '%s' (%s:%d) with %d call(s)\n", peer->name, inet_ntoa(peer->addr.sin_addr), ntohs(peer->addr.sin_port), res); - totalcalls += res; - res = 0; - } - peer = peer->next; + ast_verbose("Processed trunk peer (%s:%d) with %d call(s)\n", inet_ntoa(tpeer->addr.sin_addr), ntohs(tpeer->addr.sin_port), res); + } + totalcalls += res; + res = 0; + ast_mutex_unlock(&tpeer->lock); + prev = tpeer; + tpeer = tpeer->next; + } + ast_mutex_unlock(&tpeerlock); + if (drop) { + ast_mutex_lock(&drop->lock); + /* Once we have this lock, we're sure nobody else is using it or could use it once we release it, + because by the time they could get tpeerlock, we've already grabbed it */ + ast_log(LOG_DEBUG, "Dropping unused iax2 trunk peer '%s:%d'\n", inet_ntoa(drop->addr.sin_addr), ntohs(drop->addr.sin_port)); + free(drop->trunkdata); + ast_mutex_unlock(&drop->lock); + free(drop); + } - ast_mutex_unlock(&peerl.lock); if (iaxtrunkdebug) ast_verbose("Ending trunk processing with %d peers and %d calls processed\n", processed, totalcalls); iaxtrunkdebug =0; @@ -4466,6 +4546,7 @@ static int iax_park(struct ast_channel *chan1, struct ast_channel *chan2) return -1; } + static int socket_read(int *id, int fd, short events, void *cbdata) { struct sockaddr_in sin; @@ -4488,6 +4569,7 @@ static int socket_read(int *id, int fd, short events, void *cbdata) struct ast_channel *c; struct iax2_dpcache *dp; struct iax2_peer *peer; + struct iax2_trunk_peer *tpeer; struct iax_ies ies; struct iax_ie_data ied0, ied1; int format; @@ -4526,21 +4608,16 @@ static int socket_read(int *id, int fd, short events, void *cbdata) ts = ntohl(mth->ts); res -= (sizeof(struct ast_iax2_meta_hdr) + sizeof(struct ast_iax2_meta_trunk_hdr)); ptr = mth->data; - ast_mutex_lock(&peerl.lock); - peer = peerl.peers; - while(peer) { - if (!inaddrcmp(&peer->addr, &sin)) - break; - peer = peer->next; - } - ast_mutex_unlock(&peerl.lock); - if (!peer) { + tpeer = find_tpeer(&sin); + if (!tpeer) { ast_log(LOG_WARNING, "Unable to accept trunked packet from '%s:%d': No matching peer\n", inet_ntoa(sin.sin_addr), ntohs(sin.sin_port)); return 1; } - if (!ts || (!peer->rxtrunktime.tv_sec && !peer->rxtrunktime.tv_usec)) { - gettimeofday(&peer->rxtrunktime, NULL); - } + if (!ts || (!tpeer->rxtrunktime.tv_sec && !tpeer->rxtrunktime.tv_usec)) { + gettimeofday(&tpeer->rxtrunktime, NULL); + tpeer->trunkact = tpeer->rxtrunktime; + } else + gettimeofday(&tpeer->trunkact, NULL); while(res >= sizeof(struct ast_iax2_meta_trunk_entry)) { /* Process channels */ mte = (struct ast_iax2_meta_trunk_entry *)ptr; @@ -4566,7 +4643,7 @@ static int socket_read(int *id, int fd, short events, void *cbdata) f.data = ptr; else f.data = NULL; - fr.ts = fix_peerts(peer, fr.callno, ts); + fr.ts = fix_peerts(tpeer, fr.callno, ts); /* Don't pass any packets until we're started */ if ((iaxs[fr.callno]->state & IAX_STATE_STARTED)) { /* Common things */ @@ -4602,6 +4679,7 @@ static int socket_read(int *id, int fd, short events, void *cbdata) ptr += len; res -= len; } + ast_mutex_unlock(&tpeer->lock); } return 1; @@ -5746,7 +5824,6 @@ static struct iax2_peer *build_peer(char *name, struct ast_variable *v) memset(peer, 0, sizeof(struct iax2_peer)); peer->expire = -1; peer->pokeexpire = -1; - peer->lastsent = 999999; } if (peer) { if (!found) { |