this patch consists of two bits of work submitted as one patch.

the first bit fixed a "pacing" problem, where a tcp connection rate-limited by the reading process would experience 10% of the expected throughput, and could even get into livelock. it was noticed at the time of this initial work that the stack often sent tinygrams. some good bits from nix' original tcp were merged in. the test program /n/sources/contrib/quanstro/tcptest.c will verify that under most conditions, a reader-paced connection now gets the expected throughput. expected arguments would be

	tcptest -s1 -n 5000 -l

the second bit is a first step in preparing tcp to handle modest (1-2MB) bandwidth-delay products. the strategy was to completely implement NewReno. the testing network was a 7/35/70ms by 100Mbit wan emulator with 0/.01/.05/.1% loss. here are the performance comparisons from the changes after the first round "old" to the submitted patch "new". the smallest improvement was 80%, the largest was 18x.

	loss%	rtt	old	new
	0.10	7	4.40	7.85
	0.10	35	0.88	1.79
	0.10	70	0.47	0.84
	0.05	7	4.80	9.38
	0.05	35	1.00	2.02
	0.05	70	0.52	1.77
	0.01	7	5.33	11.87
	0.01	35	1.14	10.97
	0.01	70	0.54	4.75
	0.00	7	4.49	11.92
	0.00	35	1.04	11.35
	0.00	70	0.58	10.56

since the diff is not very easy to read, i wrote a small paper detailing the changes: http://www.quanstro.net/plan9/tcp/tcp.pdf

- erik

Notes: Mon Jan 21 15:13:13 EST 2013 geoff still being exercised and tested.
Reference: /n/sources/patch/applied/tcp-bdp
Date: Thu Nov 29 23:00:01 CET 2012
Signed-off-by: quanstro@quanstro.net
Reviewed-by: geoff

--- /sys/src/9/ip/tcp.c Thu Nov 29 22:48:40 2012 +++ /sys/src/9/ip/tcp.c Fri Nov 30 05:38:36 2012 @@ -81,7 +81,13 @@ NLHT = 256, /* hash table size, must be a power of 2 */ LHTMASK = NLHT-1, - HaveWS = 1<<8, + /* + * window is 64kb · 2ⁿ + * these factors determine the ultimate bandwidth-delay product. + * 64kb · 2⁵ = 2mb, or 2x overkill for 100mbps · 70ms. 
+ */ + Maxqscale = 4, /* maximum queuing scale */ + Defadvscale = 4, /* default advertisement */ }; /* Must correspond to the enumeration above */ @@ -169,8 +175,9 @@ ulong seq; ulong ack; uchar flags; - ushort ws; /* window scale option (if not zero) */ - ulong wnd; + uchar update; + ushort ws; /* window scale option */ + ulong wnd; /* prescaled window*/ ushort urg; ushort mss; /* max segment size option (if not zero) */ ushort len; /* size of data */ @@ -205,44 +212,53 @@ ulong wnd; /* Tcp send window */ ulong urg; /* Urgent data pointer */ ulong wl2; - int scale; /* how much to right shift window in xmitted packets */ + uint scale; /* how much to right shift window in xmitted packets */ /* to implement tahoe and reno TCP */ ulong dupacks; /* number of duplicate acks rcvd */ + ulong partialack; int recovery; /* loss recovery flag */ - ulong rxt; /* right window marker for recovery */ + int retransmit; /* retransmit 1 packet @ una flag */ + int rto; + ulong rxt; /* right window marker for recovery "recover" rfc3782 */ } snd; struct { ulong nxt; /* Receive pointer to next uchar slot */ ulong wnd; /* Receive window incoming */ + ulong wsnt; /* Last wptr sent. 
important to track for large bdp */ + ulong wptr; ulong urg; /* Urgent pointer */ + ulong ackptr; /* last acked sequence */ int blocked; - int una; /* unacked data segs */ - int scale; /* how much to left shift window in rcved packets */ + uint scale; /* how much to left shift window in rcv'd packets */ } rcv; ulong iss; /* Initial sequence number */ - int sawwsopt; /* true if we saw a wsopt on the incoming SYN */ ulong cwind; /* Congestion window */ - int scale; /* desired snd.scale */ - ushort ssthresh; /* Slow start threshold */ + ulong abcbytes; /* appropriate byte counting rfc 3465 */ + uint scale; /* desired snd.scale */ + ulong ssthresh; /* Slow start threshold */ int resent; /* Bytes just resent */ int irs; /* Initial received squence */ ushort mss; /* Maximum segment size */ int rerecv; /* Overlap of data rerecevived */ - ulong window; /* Receive window */ + ulong window; /* Our receive window (queue) */ + uint qscale; /* Log2 of our receive window (queue) */ uchar backoff; /* Exponential backoff counter */ int backedoff; /* ms we've backed off for rexmits */ uchar flags; /* State flags */ Reseq *reseq; /* Resequencing queue */ + int nreseq; + int reseqlen; Tcptimer timer; /* Activity timer */ Tcptimer acktimer; /* Acknowledge timer */ Tcptimer rtt_timer; /* Round trip timer */ Tcptimer katimer; /* keep alive timer */ ulong rttseq; /* Round trip sequence */ - int srtt; /* Shortened round trip */ + int srtt; /* Smoothed round trip */ int mdev; /* Mean deviation of round trip */ int kacounter; /* count down for keep alive */ uint sndsyntime; /* time syn sent */ ulong time; /* time Finwait2 or Syn_received was sent */ + ulong timeuna; /* snd.una when time was set */ int nochecksum; /* non-zero means don't send checksums */ int flgcnt; /* number of flags in the sequence (FIN,SEQ) */ @@ -285,7 +301,6 @@ }; int tcp_irtt = DEF_RTT; /* Initial guess at round trip time */ -ushort tcp_mss = DEF_MSS; /* Maximum segment size to be sent */ enum { /* MIB stats */ @@ 
-298,6 +313,7 @@ InSegs, OutSegs, RetransSegs, + RetransSegsSent, RetransTimeouts, InErrs, OutRsts, @@ -306,12 +322,24 @@ CsumErrs, HlenErrs, LenErrs, + Resequenced, OutOfOrder, + ReseqBytelim, + ReseqPktlim, + Delayack, + Wopenack, + + Recovery, + RecoveryDone, + RecoveryRTO, + RecoveryNoSeq, + RecoveryCwind, + RecoveryPA, Nstats }; -static char *statnames[] = +static char *statnames[Nstats] = { [MaxConn] "MaxConn", [Mss] "MaxSegment", @@ -322,6 +350,7 @@ [InSegs] "InSegs", [OutSegs] "OutSegs", [RetransSegs] "RetransSegs", +[RetransSegsSent] "RetransSegsSent", [RetransTimeouts] "RetransTimeouts", [InErrs] "InErrs", [OutRsts] "OutRsts", @@ -329,6 +358,19 @@ [HlenErrs] "HlenErrs", [LenErrs] "LenErrs", [OutOfOrder] "OutOfOrder", +[Resequenced] "Resequenced", +[ReseqBytelim] "ReseqBytelim", +[ReseqPktlim] "ReseqPktlim", +[Delayack] "Delayack", +[Wopenack] "Wopenack", + +[Recovery] "Recovery", +[RecoveryDone] "RecoveryDone", +[RecoveryRTO] "RecoveryRTO", + +[RecoveryNoSeq] "RecoveryNoSeq", +[RecoveryCwind] "RecoveryCwind", +[RecoveryPA] "RecoveryPA", }; typedef struct Tcppriv Tcppriv; @@ -363,29 +405,29 @@ */ int tcpporthogdefense = 0; -int addreseq(Tcpctl*, Tcppriv*, Tcp*, Block*, ushort); -void getreseq(Tcpctl*, Tcp*, Block**, ushort*); -void localclose(Conv*, char*); -void procsyn(Conv*, Tcp*); -void tcpiput(Proto*, Ipifc*, Block*); -void tcpoutput(Conv*); -int tcptrim(Tcpctl*, Tcp*, Block**, ushort*); -void tcpstart(Conv*, int); -void tcptimeout(void*); -void tcpsndsyn(Conv*, Tcpctl*); -void tcprcvwin(Conv*); -void tcpacktimer(void*); -void tcpkeepalive(void*); -void tcpsetkacounter(Tcpctl*); -void tcprxmit(Conv*); -void tcpsettimer(Tcpctl*); -void tcpsynackrtt(Conv*); -void tcpsetscale(Conv*, Tcpctl*, ushort, ushort); +static int addreseq(Fs*, Tcpctl*, Tcppriv*, Tcp*, Block*, ushort); +static int dumpreseq(Tcpctl*); +static void getreseq(Tcpctl*, Tcp*, Block**, ushort*); +static void limbo(Conv*, uchar*, uchar*, Tcp*, int); +static void limborexmit(Proto*); 
+static void localclose(Conv*, char*); +static void procsyn(Conv*, Tcp*); +static void tcpacktimer(void*); +static void tcpiput(Proto*, Ipifc*, Block*); +static void tcpkeepalive(void*); +static void tcpoutput(Conv*); +static void tcprcvwin(Conv*); +static void tcprxmit(Conv*); +static void tcpsetkacounter(Tcpctl*); +static void tcpsetscale(Conv*, Tcpctl*, ushort, ushort); +static void tcpsettimer(Tcpctl*); +static void tcpsndsyn(Conv*, Tcpctl*); +static void tcpstart(Conv*, int); +static void tcpsynackrtt(Conv*); +static void tcptimeout(void*); +static int tcptrim(Tcpctl*, Tcp*, Block**, ushort*); -static void limborexmit(Proto*); -static void limbo(Conv*, uchar*, uchar*, Tcp*, int); - -void +static void tcpsetstate(Conv *s, uchar newstate) { Tcpctl *tcb; @@ -405,11 +447,6 @@ if(newstate == Established) tpriv->stats[CurrEstab]++; - /** - print( "%d/%d %s->%s CurrEstab=%d\n", s->lport, s->rport, - tcpstates[oldstate], tcpstates[newstate], tpriv->tstats.tcpCurrEstab ); - **/ - switch(newstate) { case Closed: qclose(s->rq); @@ -454,12 +491,14 @@ s = (Tcpctl*)(c->ptcl); return snprint(state, n, - "%s qin %d qout %d srtt %d mdev %d cwin %lud swin %lud>>%d rwin %lud>>%d timer.start %d timer.count %d rerecv %d katimer.start %d katimer.count %d\n", + "%s qin %d qout %d rq %d.%d srtt %d mdev %d sst %lud cwin %lud swin %lud>>%d rwin %lud>>%d qscale %d timer.start %d timer.count %d rerecv %d katimer.start %d katimer.count %d\n", tcpstates[s->state], c->rq ? qlen(c->rq) : 0, c->wq ? 
qlen(c->wq) : 0, - s->srtt, s->mdev, + s->nreseq, s->reseqlen, + s->srtt, s->mdev, s->ssthresh, s->cwind, s->snd.wnd, s->rcv.scale, s->rcv.wnd, s->snd.scale, + s->qscale, s->timer.start, s->timer.count, s->rerecv, s->katimer.start, s->katimer.count); } @@ -536,7 +575,7 @@ } } -void +static void tcpkick(void *x) { Conv *s = x; @@ -558,7 +597,6 @@ /* * Push data */ - tcprcvwin(s); tcpoutput(s); break; default: @@ -570,7 +608,9 @@ poperror(); } -void +static int seq_lt(ulong, ulong); + +static void tcprcvwin(Conv *s) /* Call with tcb locked */ { int w; @@ -580,14 +620,20 @@ w = tcb->window - qlen(s->rq); if(w < 0) w = 0; - if(w == 0) - netlog(s->p->f, Logtcp, "tcprcvwim: window %lud qlen %d\n", tcb->window, qlen(s->rq)); - tcb->rcv.wnd = w; - if(w == 0) + /* RFC 1122 § 4.2.2.17 do not move right edge of window left */ + if(seq_lt(tcb->rcv.nxt + w, tcb->rcv.wptr)) + w = tcb->rcv.wptr - tcb->rcv.nxt; + if(w != tcb->rcv.wnd) + if(w>>tcb->rcv.scale == 0 || tcb->window > 4*tcb->mss && w < tcb->mss/4){ tcb->rcv.blocked = 1; + netlog(s->p->f, Logtcp, "tcprcvwin: window %lud qlen %d ws %ud lport %d\n", + tcb->window, qlen(s->rq), tcb->rcv.scale, s->lport); + } + tcb->rcv.wnd = w; + tcb->rcv.wptr = tcb->rcv.nxt + w; } -void +static void tcpacktimer(void *v) { Tcpctl *tcb; @@ -603,7 +649,6 @@ qlock(s); if(tcb->state != Closed){ tcb->flags |= FORCE; - tcprcvwin(s); tcpoutput(s); } qunlock(s); @@ -611,10 +656,52 @@ } static void +tcpcongestion(Tcpctl *tcb) +{ + ulong inflight; + + inflight = tcb->snd.nxt - tcb->snd.una; + if(inflight > tcb->cwind) + inflight = tcb->cwind; + tcb->ssthresh = inflight / 2; + if(tcb->ssthresh < 2*tcb->mss) + tcb->ssthresh = 2*tcb->mss; +} + +enum { + L = 2, /* aggressive slow start; legal values ∈ (1.0, 2.0) */ +}; + +static void +tcpabcincr(Tcpctl *tcb, uint acked) +{ + uint limit; + + tcb->abcbytes += acked; + if(tcb->cwind < tcb->ssthresh){ + /* slow start */ + if(tcb->snd.rto) + limit = 1*tcb->mss; + else + limit = L*tcb->mss; + tcb->cwind += 
MIN(tcb->abcbytes, limit); + tcb->abcbytes = 0; + } + else{ + tcb->snd.rto = 0; + /* avoidance */ + if(tcb->abcbytes >= tcb->cwind){ + tcb->abcbytes -= tcb->cwind; + tcb->cwind += tcb->mss; + } + } +} + +static void tcpcreate(Conv *c) { c->rq = qopen(QMAX, Qcoalesce, tcpacktimer, c); - c->wq = qopen((3*QMAX)/2, Qkick, tcpkick, c); + c->wq = qopen(QMAX, Qkick, tcpkick, c); } static void @@ -649,7 +736,7 @@ t->state = newstate; } -void +static void tcpackproc(void *a) { Tcptimer *t, *tp, *timeo; @@ -695,7 +782,7 @@ } } -void +static void tcpgo(Tcppriv *priv, Tcptimer *t) { if(t == nil || t->start == 0) @@ -707,7 +794,7 @@ qunlock(&priv->tl); } -void +static void tcphalt(Tcppriv *priv, Tcptimer *t) { if(t == nil) @@ -718,17 +805,16 @@ qunlock(&priv->tl); } -int +static int backoff(int n) { return 1 << n; } -void +static void localclose(Conv *s, char *reason) /* called with tcb locked */ { Tcpctl *tcb; - Reseq *rp,*rp1; Tcppriv *tpriv; tpriv = s->p->priv; @@ -742,12 +828,7 @@ tcphalt(tpriv, &tcb->katimer); /* Flush reassembly queue; nothing more can arrive */ - for(rp = tcb->reseq; rp != nil; rp = rp1) { - rp1 = rp->next; - freeblist(rp->bp); - free(rp); - } - tcb->reseq = nil; + dumpreseq(tcb); if(tcb->state == Syn_sent) Fsconnected(s, reason); @@ -761,8 +842,8 @@ } /* mtu (- TCP + IP hdr len) of 1st hop */ -int -tcpmtu(Proto *tcp, uchar *addr, int version, int *scale) +static int +tcpmtu(Proto *tcp, uchar *addr, int version, uint *scale) { Ipifc *ifc; int mtu; @@ -781,22 +862,16 @@ mtu = ifc->maxtu - ifc->m->hsize - (TCP6_PKT + TCP6_HDRSIZE); break; } - if(ifc != nil){ - if(ifc->mbps > 1000) - *scale = HaveWS | 4; - else if(ifc->mbps > 100) - *scale = HaveWS | 3; - else if(ifc->mbps > 10) - *scale = HaveWS | 1; - else - *scale = HaveWS | 0; - } else - *scale = HaveWS | 0; + /* + * set the ws. it doesn't commit us to anything. + * ws is the ultimate limit to the bandwidth-delay product. 
+ */ + *scale = Defadvscale; return mtu; } -void +static void inittcpctl(Conv *s, int mode) { Tcpctl *tcb; @@ -809,7 +884,7 @@ memset(tcb, 0, sizeof(Tcpctl)); - tcb->ssthresh = 65535; + tcb->ssthresh = QMAX; /* reset by tcpsetscale() */ tcb->srtt = tcp_irtt<mdev = 0; @@ -858,21 +933,18 @@ } tcb->mss = tcb->cwind = mss; + tcb->abcbytes = 0; tpriv = s->p->priv; tpriv->stats[Mss] = tcb->mss; /* default is no window scaling */ - tcb->window = QMAX; - tcb->rcv.wnd = QMAX; - tcb->rcv.scale = 0; - tcb->snd.scale = 0; - qsetlimit(s->rq, QMAX); + tcpsetscale(s, tcb, 0, 0); } /* * called with s qlocked */ -void +static void tcpstart(Conv *s, int mode) { Tcpctl *tcb; @@ -884,7 +956,7 @@ if(tpriv->ackprocstarted == 0){ qlock(&tpriv->apl); if(tpriv->ackprocstarted == 0){ - sprint(kpname, "#I%dtcpack", s->p->f->dev); + snprint(kpname, sizeof(kpname), "#I%dtcpack", s->p->f->dev); kproc(kpname, tcpackproc, s->p); tpriv->ackprocstarted = 1; } @@ -914,28 +986,28 @@ } static char* -tcpflag(ushort flag) +tcpflag(char *buf, char *e, ushort flag) { - static char buf[128]; + char *p; - sprint(buf, "%d", flag>>10); /* Head len */ + p = seprint(buf, e, "%d", flag>>10); /* Head len */ if(flag & URG) - strcat(buf, " URG"); + p = seprint(p, e, " URG"); if(flag & ACK) - strcat(buf, " ACK"); + p = seprint(p, e, " ACK"); if(flag & PSH) - strcat(buf, " PSH"); + p = seprint(p, e, " PSH"); if(flag & RST) - strcat(buf, " RST"); + p = seprint(p, e, " RST"); if(flag & SYN) - strcat(buf, " SYN"); + p = seprint(p, e, " SYN"); if(flag & FIN) - strcat(buf, " FIN"); - + p = seprint(p, e, " FIN"); + USED(p); return buf; } -Block * +static Block* htontcp6(Tcp *tcph, Block *data, Tcp6hdr *ph, Tcpctl *tcb) { int dlen; @@ -992,7 +1064,6 @@ *opt++ = MSSOPT; *opt++ = MSS_LENGTH; hnputs(opt, tcph->mss); -// print("our outgoing mss %d\n", tcph->mss); opt += 2; } if(tcph->ws != 0){ @@ -1020,7 +1091,7 @@ return data; } -Block * +static Block* htontcp4(Tcp *tcph, Block *data, Tcp4hdr *ph, Tcpctl *tcb) { int dlen; @@ 
-1033,7 +1104,7 @@ if(tcph->flags & SYN){ if(tcph->mss) hdrlen += MSS_LENGTH; - if(tcph->ws) + if(1) hdrlen += WS_LENGTH; optpad = hdrlen & 3; if(optpad) @@ -1075,7 +1146,8 @@ hnputs(opt, tcph->mss); opt += 2; } - if(tcph->ws != 0){ + /* always offer. rfc1323 §2.2 */ + if(1){ *opt++ = WSOPT; *opt++ = WS_LENGTH; *opt++ = tcph->ws; @@ -1094,7 +1166,7 @@ return data; } -int +static int ntohtcp6(Tcp *tcph, Block **bpp) { Tcp6hdr *h; @@ -1123,6 +1195,7 @@ tcph->urg = nhgets(h->tcpurg); tcph->mss = 0; tcph->ws = 0; + tcph->update = 0; tcph->len = nhgets(h->ploadlen) - hdrlen; *bpp = pullupblock(*bpp, hdrlen+TCP6_PKT); @@ -1147,7 +1220,7 @@ break; case WSOPT: if(optlen == WS_LENGTH && *(optr+2) <= 14) - tcph->ws = HaveWS | *(optr+2); + tcph->ws = *(optr+2); break; } n -= optlen; @@ -1156,7 +1229,7 @@ return hdrlen; } -int +static int ntohtcp4(Tcp *tcph, Block **bpp) { Tcp4hdr *h; @@ -1186,6 +1259,7 @@ tcph->urg = nhgets(h->tcpurg); tcph->mss = 0; tcph->ws = 0; + tcph->update = 0; tcph->len = nhgets(h->length) - (hdrlen + TCP4_PKT); *bpp = pullupblock(*bpp, hdrlen+TCP4_PKT); @@ -1205,14 +1279,12 @@ break; switch(*optr) { case MSSOPT: - if(optlen == MSS_LENGTH) { + if(optlen == MSS_LENGTH) tcph->mss = nhgets(optr+2); -// print("new incoming mss %d\n", tcph->mss); - } break; case WSOPT: if(optlen == WS_LENGTH && *(optr+2) <= 14) - tcph->ws = HaveWS | *(optr+2); + tcph->ws = *(optr+2); break; } n -= optlen; @@ -1222,10 +1294,10 @@ } /* - * For outgiing calls, generate an initial sequence + * For outgoing calls, generate an initial sequence * number and put a SYN on the send queue */ -void +static void tcpsndsyn(Conv *s, Tcpctl *tcb) { Tcppriv *tpriv; @@ -1234,6 +1306,7 @@ tcb->rttseq = tcb->iss; tcb->snd.wl2 = tcb->iss; tcb->snd.una = tcb->iss; + tcb->snd.rxt = tcb->iss; tcb->snd.ptr = tcb->rttseq; tcb->snd.nxt = tcb->rttseq; tcb->flgcnt++; @@ -1333,7 +1406,7 @@ * send a reset to the remote side and close the conversation * called with s qlocked */ -char* +static char* 
tcphangup(Conv *s) { Tcp seg; @@ -1348,7 +1421,7 @@ memset(&seg, 0, sizeof seg); seg.flags = RST | ACK; seg.ack = tcb->rcv.nxt; - tcb->rcv.una = 0; + tcb->rcv.ackptr = seg.ack; seg.seq = tcb->snd.ptr; seg.wnd = 0; seg.urg = 0; @@ -1379,14 +1452,14 @@ /* * (re)send a SYN ACK */ -int +static int sndsynack(Proto *tcp, Limbo *lp) { Block *hbp; Tcp4hdr ph4; Tcp6hdr ph6; Tcp seg; - int scale; + uint scale; /* make pseudo header */ switch(lp->version) { @@ -1420,8 +1493,6 @@ seg.flags = SYN|ACK; seg.urg = 0; seg.mss = tcpmtu(tcp, lp->laddr, lp->version, &scale); -// if (seg.mss > lp->mss && lp->mss >= 512) -// seg.mss = lp->mss; seg.wnd = QMAX; /* if the other side set scale, we should too */ @@ -1599,6 +1670,18 @@ } } +static void +initialwindow(Tcpctl *tcb) +{ + /* RFC 3390 initial window */ + if(tcb->mss < 1095) + tcb->cwind = 4*tcb->mss; + else if(tcb->mss < 2190) + tcb->cwind = 4380; + else + tcb->cwind = 2*tcb->mss; +} + /* * come here when we finally get an ACK to our SYN-ACK. * lookup call in limbo. 
if found, create a new conversation @@ -1670,6 +1753,8 @@ tcb->irs = lp->irs; tcb->rcv.nxt = tcb->irs+1; + tcb->rcv.wptr = tcb->rcv.nxt; + tcb->rcv.wsnt = 0; tcb->rcv.urg = tcb->rcv.nxt; tcb->iss = lp->iss; @@ -1678,6 +1763,7 @@ tcb->snd.una = tcb->iss+1; tcb->snd.ptr = tcb->iss+1; tcb->snd.nxt = tcb->iss+1; + tcb->snd.rxt = tcb->iss+1; tcb->flgcnt = 0; tcb->flags |= SYNACK; @@ -1690,9 +1776,9 @@ /* window scaling */ tcpsetscale(new, tcb, lp->rcvscale, lp->sndscale); - /* the congestion window always starts out as a single segment */ + /* congestion window */ tcb->snd.wnd = segp->wnd; - tcb->cwind = tcb->mss; + initialwindow(tcb); /* set initial round trip time */ tcb->sndsyntime = lp->lastsend+lp->rexmits*SYNACK_RXTIMER; @@ -1731,7 +1817,7 @@ return new; } -int +static int seq_within(ulong x, ulong low, ulong high) { if(low <= high){ @@ -1745,25 +1831,25 @@ return 0; } -int +static int seq_lt(ulong x, ulong y) { return (int)(x-y) < 0; } -int +static int seq_le(ulong x, ulong y) { return (int)(x-y) <= 0; } -int +static int seq_gt(ulong x, ulong y) { return (int)(x-y) > 0; } -int +static int seq_ge(ulong x, ulong y) { return (int)(x-y) >= 0; @@ -1773,7 +1859,7 @@ * use the time between the first SYN and it's ack as the * initial round trip time */ -void +static void tcpsynackrtt(Conv *s) { Tcpctl *tcb; @@ -1791,46 +1877,59 @@ tcphalt(tpriv, &tcb->rtt_timer); } -void +static void update(Conv *s, Tcp *seg) { int rtt, delta; Tcpctl *tcb; ulong acked; - ulong expand; Tcppriv *tpriv; + if(seg->update) + return; + seg->update = 1; + tpriv = s->p->priv; tcb = (Tcpctl*)s->ptcl; - /* if everything has been acked, force output(?) 
*/ - if(seq_gt(seg->ack, tcb->snd.nxt)) { - tcb->flags |= FORCE; - return; + /* catch zero-window updates, update window & recover */ + if(tcb->snd.wnd == 0 && seg->wnd > 0) + if(seq_lt(seg->ack, tcb->snd.ptr)){ + netlog(s->p->f, Logtcp, "tcp: zwu ack %lud una %lud ptr %lud win %lud\n", + seg->ack, tcb->snd.una, tcb->snd.ptr, seg->wnd); + tcb->snd.wnd = seg->wnd; + goto recovery; } - /* added by Dong Lin for fast retransmission */ - if(seg->ack == tcb->snd.una - && tcb->snd.una != tcb->snd.nxt - && seg->len == 0 - && seg->wnd == tcb->snd.wnd) { - - /* this is a pure ack w/o window update */ - netlog(s->p->f, Logtcprxmt, "dupack %lud ack %lud sndwnd %lud advwin %lud\n", - tcb->snd.dupacks, seg->ack, tcb->snd.wnd, seg->wnd); - - if(++tcb->snd.dupacks == TCPREXMTTHRESH) { - /* - * tahoe tcp rxt the packet, half sshthresh, - * and set cwnd to one packet - */ + /* newreno fast retransmit */ + if(seg->ack == tcb->snd.una) + if(tcb->snd.una != tcb->snd.nxt) + if(++tcb->snd.dupacks == 3){ +recovery: + if(tcb->snd.recovery){ + tpriv->stats[RecoveryCwind]++; + tcb->cwind += tcb->mss; + }else if(seq_le(tcb->snd.rxt, seg->ack)){ + tpriv->stats[Recovery]++; + tcb->abcbytes = 0; tcb->snd.recovery = 1; + tcb->snd.partialack = 0; tcb->snd.rxt = tcb->snd.nxt; - netlog(s->p->f, Logtcprxmt, "fast rxt %lud, nxt %lud\n", tcb->snd.una, tcb->snd.nxt); + tcpcongestion(tcb); + tcb->cwind = tcb->ssthresh + 3*tcb->mss; + netlog(s->p->f, Logtcpwin, "recovery inflate %ld ss %ld @%lud\n", + tcb->cwind, tcb->ssthresh, tcb->snd.rxt); tcprxmit(s); - } else { - /* do reno tcp here. 
*/ - } + }else{ + tpriv->stats[RecoveryNoSeq]++; + netlog(s->p->f, Logtcpwin, "!recov %lud not ≤ %lud %ld\n", + tcb->snd.rxt, seg->ack, tcb->snd.rxt - seg->ack); + /* do not enter fast retransmit */ + /* do not change ssthresh */ + } + }else if(tcb->snd.recovery){ + tpriv->stats[RecoveryCwind]++; + tcb->cwind += tcb->mss; } /* @@ -1838,6 +1937,9 @@ */ if(seq_gt(seg->ack, tcb->snd.wl2) || (tcb->snd.wl2 == seg->ack && seg->wnd > tcb->snd.wnd)){ + /* clear dupack if we advance wl2 */ + if(tcb->snd.wl2 != seg->ack) + tcb->snd.dupacks = 0; tcb->snd.wnd = seg->wnd; tcb->snd.wl2 = seg->ack; } @@ -1847,22 +1949,11 @@ * don't let us hangup if sending into a closed window and * we're still getting acks */ - if((tcb->flags&RETRAN) && tcb->snd.wnd == 0){ + if((tcb->flags&RETRAN) && tcb->snd.wnd == 0) tcb->backedoff = MAXBACKMS/4; - } return; } - /* - * any positive ack turns off fast rxt, - * (should we do new-reno on partial acks?) - */ - if(!tcb->snd.recovery || seq_ge(seg->ack, tcb->snd.rxt)) { - tcb->snd.dupacks = 0; - tcb->snd.recovery = 0; - } else - netlog(s->p->f, Logtcp, "rxt next %lud, cwin %lud\n", seg->ack, tcb->cwind); - /* Compute the new send window size */ acked = seg->ack - tcb->snd.una; @@ -1874,24 +1965,41 @@ goto done; } - /* slow start as long as we're not recovering from lost packets */ - if(tcb->cwind < tcb->snd.wnd && !tcb->snd.recovery) { - if(tcb->cwind < tcb->ssthresh) { - expand = tcb->mss; - if(acked < expand) - expand = acked; + /* + * congestion control + */ + if(tcb->snd.recovery){ + if(seq_ge(seg->ack, tcb->snd.rxt)){ + /* recovery finished; deflate window */ + tpriv->stats[RecoveryDone]++; + tcb->snd.dupacks = 0; + tcb->snd.recovery = 0; + tcb->cwind = (tcb->snd.nxt - tcb->snd.una) + tcb->mss; + if(tcb->ssthresh < tcb->cwind) + tcb->cwind = tcb->ssthresh; + netlog(s->p->f, Logtcpwin, "recovery deflate %ld %ld\n", + tcb->cwind, tcb->ssthresh); + } else { + /* partial ack; we lost more than one segment */ + tpriv->stats[RecoveryPA]++; + 
if(tcb->cwind > acked) + tcb->cwind -= acked; + else{ + netlog(s->p->f, Logtcpwin, "partial ack neg\n"); + tcb->cwind = tcb->mss; + } + netlog(s->p->f, Logtcpwin, "partial ack %ld left %ld cwind %ld\n", + acked, tcb->snd.rxt - seg->ack, tcb->cwind); + + if(acked >= tcb->mss) + tcb->cwind += tcb->mss; + tcb->snd.partialack++; } - else - expand = ((int)tcb->mss * tcb->mss) / tcb->cwind; - - if(tcb->cwind + expand < tcb->cwind) - expand = tcb->snd.wnd - tcb->cwind; - if(tcb->cwind + expand > tcb->snd.wnd) - expand = tcb->snd.wnd - tcb->cwind; - tcb->cwind += expand; - } + } else + tcpabcincr(tcb, acked); /* Adjust the timers according to the round trip time */ + /* todo: fix sloppy treatment of overflow cases here. */ if(tcb->rtt_timer.state == TcptimerON && seq_ge(seg->ack, tcb->rttseq)) { tcphalt(tpriv, &tcb->rtt_timer); if((tcb->flags&RETRAN) == 0) { @@ -1922,25 +2030,36 @@ done: if(qdiscard(s->wq, acked) < acked) tcb->flgcnt--; - tcb->snd.una = seg->ack; + + /* newreno fast recovery */ + if(tcb->snd.recovery) + tcprxmit(s); + if(seq_gt(seg->ack, tcb->snd.urg)) tcb->snd.urg = seg->ack; - if(tcb->snd.una != tcb->snd.nxt) - tcpgo(tpriv, &tcb->timer); + if(tcb->snd.una != tcb->snd.nxt){ + /* “impatient” variant */ + if(!tcb->snd.recovery || tcb->snd.partialack == 1){ + tcb->time = NOW; + tcb->timeuna = tcb->snd.una; + tcpgo(tpriv, &tcb->timer); + } + } else tcphalt(tpriv, &tcb->timer); if(seq_lt(tcb->snd.ptr, tcb->snd.una)) tcb->snd.ptr = tcb->snd.una; - tcb->flags &= ~RETRAN; + if(!tcb->snd.recovery) + tcb->flags &= ~RETRAN; tcb->backoff = 0; tcb->backedoff = 0; } -void +static void tcpiput(Proto *tcp, Ipifc*, Block *bp) { Tcp seg; @@ -1962,7 +2081,6 @@ h4 = (Tcp4hdr*)(bp->rp); h6 = (Tcp6hdr*)(bp->rp); - memset(&seg, 0, sizeof seg); if((h4->vihl&0xF0)==IP_VER4) { version = V4; @@ -2171,11 +2289,12 @@ } /* Cut the data to fit the receive window */ + tcprcvwin(s); if(tcptrim(tcb, &seg, &bp, &length) == -1) { - netlog(f, Logtcp, "tcptrim, not accept, seq %lud-%lud win 
%lud-%lud from %I\n", + if(seg.seq+1 != tcb->rcv.nxt || length != 1) + netlog(f, Logtcp, "tcp: trim: !inwind: seq %lud-%lud win %lud-%lud l %d from %I\n", seg.seq, seg.seq + length - 1, - tcb->rcv.nxt, tcb->rcv.nxt + tcb->rcv.wnd-1, s->raddr); - netlog(f, Logtcp, "tcp len < 0, %lud %d\n", seg.seq, length); + tcb->rcv.nxt, tcb->rcv.nxt + tcb->rcv.wnd-1, length, s->raddr); update(s, &seg); if(qlen(s->wq)+tcb->flgcnt == 0 && tcb->state == Closing) { tcphalt(tpriv, &tcb->rtt_timer); @@ -2206,12 +2325,15 @@ if(seg.seq != tcb->rcv.nxt) if(length != 0 || (seg.flags & (SYN|FIN))) { update(s, &seg); - if(addreseq(tcb, tpriv, &seg, bp, length) < 0) + if(addreseq(f, tcb, tpriv, &seg, bp, length) < 0) print("reseq %I.%d -> %I.%d\n", s->raddr, s->rport, s->laddr, s->lport); - tcb->flags |= FORCE; + tcb->flags |= FORCE; /* force duplicate ack; RFC 5681 §3.2 */ goto output; } + if(tcb->nreseq > 0) + tcb->flags |= FORCE; /* filled hole in sequence space; RFC 5681 §3.2 */ + /* * keep looping till we've processed this packet plus any * adjacent packets in the resequence queue @@ -2315,29 +2437,10 @@ panic("tcp packblock"); qpassnolim(s->rq, bp); bp = nil; - - /* - * Force an ack every 2 data messages. This is - * a hack for rob to make his home system run - * faster. - * - * this also keeps the standard TCP congestion - * control working since it needs an ack every - * 2 max segs worth. This is not quite that, - * but under a real stream is equivalent since - * every packet has a max seg in it. 
- */ - if(++(tcb->rcv.una) >= 2) - tcb->flags |= FORCE; } tcb->rcv.nxt += length; /* - * update our rcv window - */ - tcprcvwin(s); - - /* * turn on the acktimer if there's something * to ack */ @@ -2411,8 +2514,11 @@ getreseq(tcb, &seg, &bp, &length); - if(tcptrim(tcb, &seg, &bp, &length) == 0) + tcprcvwin(s); + if(tcptrim(tcb, &seg, &bp, &length) == 0){ + tcb->flags |= FORCE; break; + } } } output: @@ -2432,15 +2538,15 @@ * the lock to ipoput the packet so some care has to be * taken by callers. */ -void +static void tcpoutput(Conv *s) { Tcp seg; - int msgs; + uint msgs; Tcpctl *tcb; Block *hbp, *bp; - int sndcnt, n; - ulong ssize, dsize, usable, sent; + int sndcnt; + ulong ssize, dsize, sent; Fs *f; Tcppriv *tpriv; uchar version; @@ -2448,11 +2554,27 @@ f = s->p->f; tpriv = s->p->priv; version = s->ipversion; - memset(&seg, 0, sizeof seg); - for(msgs = 0; msgs < 100; msgs++) { - tcb = (Tcpctl*)s->ptcl; + tcb = (Tcpctl*)s->ptcl; + + /* force ack every 2*mss */ + if((tcb->flags & FORCE) == 0) + if(tcb->rcv.nxt - tcb->rcv.ackptr >= 2*tcb->mss){ + tpriv->stats[Delayack]++; + tcb->flags |= FORCE; + } + + /* force ack if window opening */ + if(0) + if((tcb->flags & FORCE) == 0){ + tcprcvwin(s); + if((int)(tcb->rcv.wptr - tcb->rcv.wsnt) >= 2*tcb->mss){ + tpriv->stats[Wopenack]++; + tcb->flags |= FORCE; + } + } + for(msgs = 0; msgs < 100; msgs++) { switch(tcb->state) { case Listen: case Closed: @@ -2460,7 +2582,12 @@ return; } + /* Don't send anything else until our SYN has been acked */ + if(tcb->snd.ptr != tcb->iss && (tcb->flags & SYNACK) == 0) + break; + /* force an ack when a window has opened up */ + tcprcvwin(s); if(tcb->rcv.blocked && tcb->rcv.wnd > 0){ tcb->rcv.blocked = 0; tcb->flags |= FORCE; @@ -2468,55 +2595,57 @@ sndcnt = qlen(s->wq)+tcb->flgcnt; sent = tcb->snd.ptr - tcb->snd.una; - - /* Don't send anything else until our SYN has been acked */ - if(tcb->snd.ptr != tcb->iss && (tcb->flags & SYNACK) == 0) - break; - - /* Compute usable segment based on 
offered window and limit - * window probes to one - */ + ssize = sndcnt; if(tcb->snd.wnd == 0){ - if(sent != 0) { - if((tcb->flags&FORCE) == 0) - break; -// tcb->snd.ptr = tcb->snd.una; + /* zero window probe */ + if(sent > 0) + if(!(tcb->flags & FORCE)) + break; /* already probing, rto re-probes */ + if(ssize < sent) + ssize = 0; + else{ + ssize -= sent; + if(ssize > 0) + ssize = 1; + } + } else { + /* calculate usable segment size */ + if(ssize > tcb->cwind) + ssize = tcb->cwind; + if(ssize > tcb->snd.wnd) + ssize = tcb->snd.wnd; + + if(ssize < sent) + ssize = 0; + else { + ssize -= sent; + if(ssize > tcb->mss) + ssize = tcb->mss; } - usable = 1; } - else { - usable = tcb->cwind; - if(tcb->snd.wnd < usable) - usable = tcb->snd.wnd; -// usable -= sent; - usable = usable >= sent? usable - sent: 0; - } - ssize = sndcnt-sent; - if(ssize && usable < 2) - netlog(s->p->f, Logtcp, "throttled snd.wnd %lud cwind %lud\n", - tcb->snd.wnd, tcb->cwind); - if(usable < ssize) - ssize = usable; - if(tcb->mss < ssize) - ssize = tcb->mss; + dsize = ssize; seg.urg = 0; - if(ssize == 0) - if((tcb->flags&FORCE) == 0) - break; + if(!(tcb->flags & FORCE)){ + if(ssize == 0) + break; + if(ssize < tcb->mss) + if(tcb->snd.nxt == tcb->snd.ptr) + if(sent > TCPREXMTTHRESH*tcb->mss) + break; + } tcb->flags &= ~FORCE; - tcprcvwin(s); /* By default we will generate an ack */ tcphalt(tpriv, &tcb->acktimer); - tcb->rcv.una = 0; seg.source = s->lport; seg.dest = s->rport; seg.flags = ACK; seg.mss = 0; seg.ws = 0; + seg.update = 0; switch(tcb->state){ case Syn_sent: seg.flags = 0; @@ -2556,20 +2685,9 @@ } } - if(sent+dsize == sndcnt) + if(sent+dsize == sndcnt && dsize) seg.flags |= PSH; - /* keep track of balance of resent data */ - if(seq_lt(tcb->snd.ptr, tcb->snd.nxt)) { - n = tcb->snd.nxt - tcb->snd.ptr; - if(ssize < n) - n = ssize; - tcb->resent += n; - netlog(f, Logtcp, "rexmit: %I!%d -> %I!%d ptr %lux nxt %lux\n", - s->raddr, s->rport, s->laddr, s->lport, tcb->snd.ptr, tcb->snd.nxt); - 
tpriv->stats[RetransSegs]++; - } - tcb->snd.ptr += ssize; /* Pull up the send pointer so we can accept acks @@ -2605,13 +2723,17 @@ * expect acknowledges */ if(ssize != 0){ - if(tcb->timer.state != TcptimerON) + if(tcb->timer.state != TcptimerON){ + tcb->time = NOW; + tcb->timeuna = tcb->snd.una; tcpgo(tpriv, &tcb->timer); + } /* If round trip timer isn't running, start it. * measure the longest packet only in case the * transmission time dominates RTT */ + if(tcb->snd.retransmit == 0) if(tcb->rtt_timer.state != TcptimerON) if(ssize == tcb->mss) { tcpgo(tpriv, &tcb->rtt_timer); @@ -2620,6 +2742,10 @@ } tpriv->stats[OutSegs]++; + if(tcb->snd.retransmit) + tpriv->stats[RetransSegsSent]++; + tcb->rcv.ackptr = seg.ack; + tcb->rcv.wsnt = tcb->rcv.wptr; /* put off the next keep alive */ tcpgo(tpriv, &tcb->katimer); @@ -2640,9 +2766,8 @@ default: panic("tcpoutput2: version %d", version); } - if((msgs%4) == 1){ + if((msgs%4) == 3){ qunlock(s); - sched(); qlock(s); } } @@ -2651,7 +2776,7 @@ /* * the BSD convention (hack?) for keep alives. resend last uchar acked. 
*/ -void +static void tcpsendka(Conv *s) { Tcp seg; @@ -2673,7 +2798,8 @@ else seg.seq = tcb->snd.una-1; seg.ack = tcb->rcv.nxt; - tcb->rcv.una = 0; + tcb->rcv.ackptr = seg.ack; + tcprcvwin(s); seg.wnd = tcb->rcv.wnd; if(tcb->state == Finwait2){ seg.flags |= FIN; @@ -2707,7 +2833,7 @@ /* * set connection to time out after 12 minutes */ -void +static void tcpsetkacounter(Tcpctl *tcb) { tcb->kacounter = (12 * 60 * 1000) / (tcb->katimer.start*MSPTICK); @@ -2719,7 +2845,7 @@ * if we've timed out, close the connection * otherwise, send a keepalive and restart the timer */ -void +static void tcpkeepalive(void *v) { Tcpctl *tcb; @@ -2747,7 +2873,7 @@ /* * start keepalive timer */ -char* +static char* tcpstartka(Conv *s, char **f, int n) { Tcpctl *tcb; @@ -2770,7 +2896,7 @@ /* * turn checksums on/off */ -char* +static char* tcpsetchecksum(Conv *s, char **f, int) { Tcpctl *tcb; @@ -2781,30 +2907,38 @@ return nil; } -void +/* + * retransmit (at most) one segment at snd.una. + * preserve cwind & snd.ptr + */ +static void tcprxmit(Conv *s) { Tcpctl *tcb; + Tcppriv *tpriv; + ulong tcwind, tptr; tcb = (Tcpctl*)s->ptcl; - tcb->flags |= RETRAN|FORCE; - tcb->snd.ptr = tcb->snd.una; - /* - * We should be halving the slow start threshhold (down to one - * mss) but leaving it at mss seems to work well enough - */ - tcb->ssthresh = tcb->mss; - - /* - * pull window down to a single packet - */ + tptr = tcb->snd.ptr; + tcwind = tcb->cwind; + tcb->snd.ptr = tcb->snd.una; tcb->cwind = tcb->mss; + tcb->snd.retransmit = 1; tcpoutput(s); + tcb->snd.retransmit = 0; + tcb->cwind = tcwind; + tcb->snd.ptr = tptr; + + tpriv = s->p->priv; + tpriv->stats[RetransSegs]++; } -void +/* + * todo: RFC 4138 F-RTO + */ +static void tcptimeout(void *arg) { Conv *s; @@ -2833,11 +2967,29 @@ localclose(s, Etimedout); break; } - netlog(s->p->f, Logtcprxmt, "timeout rexmit %#lux %d/%lud\n", tcb->snd.una, tcb->timer.start, NOW); + netlog(s->p->f, Logtcprxmt, "rxm %d/%d %ldms %lud rto %d %lud %s\n", + tcb->srtt, 
tcb->mdev, NOW-tcb->time, + tcb->snd.una-tcb->timeuna, tcb->snd.rto, tcb->snd.ptr, + tcpstates[s->state]); tcpsettimer(tcb); + if(tcb->snd.rto == 0) + tcpcongestion(tcb); tcprxmit(s); + tcb->snd.ptr = tcb->snd.una; + tcb->cwind = tcb->mss; + tcb->snd.rto = 1; tpriv->stats[RetransTimeouts]++; - tcb->snd.dupacks = 0; + + if(tcb->snd.recovery){ + tcb->snd.dupacks = 0; /* reno rto */ + tcb->snd.recovery = 0; + tpriv->stats[RecoveryRTO]++; + tcb->snd.rxt = tcb->snd.nxt; + netlog(s->p->f, Logtcpwin, + "rto recovery rxt @%lud\n", tcb->snd.nxt); + } + + tcb->abcbytes = 0; break; case Time_wait: localclose(s, nil); @@ -2849,7 +3001,7 @@ poperror(); } -int +static int inwindow(Tcpctl *tcb, int seq) { return seq_within(seq, tcb->rcv.nxt, tcb->rcv.nxt+tcb->rcv.wnd-1); @@ -2858,7 +3010,7 @@ /* * set up state for a received SYN (or SYN ACK) packet */ -void +static void procsyn(Conv *s, Tcp *seg) { Tcpctl *tcb; @@ -2868,6 +3020,8 @@ tcb->flags |= FORCE; tcb->rcv.nxt = seg->seq + 1; + tcb->rcv.wptr = tcb->rcv.nxt; + tcb->rcv.wsnt = 0; tcb->rcv.urg = tcb->rcv.nxt; tcb->irs = seg->seq; @@ -2878,20 +3032,55 @@ tpriv->stats[Mss] = tcb->mss; } - /* the congestion window always starts out as a single segment */ tcb->snd.wnd = seg->wnd; - tcb->cwind = tcb->mss; + initialwindow(tcb); } -int -addreseq(Tcpctl *tcb, Tcppriv *tpriv, Tcp *seg, Block *bp, ushort length) +static int +dumpreseq(Tcpctl *tcb) { - Reseq *rp, *rp1; - int i, rqlen, qmax; + Reseq *r, *next; + + for(r = tcb->reseq; r != nil; r = next){ + next = r->next; + freeblist(r->bp); + free(r); + } + tcb->reseq = nil; + tcb->nreseq = 0; + tcb->reseqlen = 0; + return -1; +} + +static void +logreseq(Fs *f, Reseq *r, ulong n) +{ + char *s; + + for(; r != nil; r = r->next){ + s = nil; + if(r->next == nil && r->seg.seq != n) + s = "hole/end"; + else if(r->next == nil) + s = "end"; + else if(r->seg.seq != n) + s = "hole"; + if(s != nil) + netlog(f, Logtcp, "%s %lud-%lud (%ld) %#ux\n", s, + n, r->seg.seq, r->seg.seq-n, r->seg.flags); + n 
= r->seg.seq + r->seg.len; + } +} + +static int +addreseq(Fs *f, Tcpctl *tcb, Tcppriv *tpriv, Tcp *seg, Block *bp, ushort length) +{ + Reseq *rp, **rr; + int qmax; rp = malloc(sizeof(Reseq)); if(rp == nil){ - freeblist(bp); /* bp always consumed by add_reseq */ + freeblist(bp); /* bp always consumed by addreseq */ return 0; } @@ -2899,58 +3088,39 @@ rp->bp = bp; rp->length = length; - /* Place on reassembly list sorting by starting seq number */ - rp1 = tcb->reseq; - if(rp1 == nil || seq_lt(seg->seq, rp1->seg.seq)) { - rp->next = rp1; - tcb->reseq = rp; - if(rp->next != nil) - tpriv->stats[OutOfOrder]++; - return 0; - } + tcb->reseqlen += length; + tcb->nreseq++; - rqlen = 0; - for(i = 0;; i++) { - rqlen += rp1->length; - if(rp1->next == nil || seq_lt(seg->seq, rp1->next->seg.seq)) { - rp->next = rp1->next; - rp1->next = rp; + /* Place on reassembly list sorting by starting seq number */ + for(rr = &tcb->reseq;; rr = &(*rr)->next) + if(*rr == nil || seq_lt(seg->seq, (*rr)->seg.seq)){ + rp->next = *rr; + *rr = rp; + tpriv->stats[Resequenced]++; if(rp->next != nil) tpriv->stats[OutOfOrder]++; break; } - rp1 = rp1->next; - } - qmax = QMAX<<tcb->rcv.scale; - if(rqlen > qmax){ - print("resequence queue > window: %d > %d\n", rqlen, qmax); - i = 0; - for(rp1 = tcb->reseq; rp1 != nil; rp1 = rp1->next){ - print("%#lux %#lux %#ux\n", rp1->seg.seq, - rp1->seg.ack, rp1->seg.flags); - if(i++ > 10){ - print("...\n"); - break; - } - } - /* - * delete entire reassembly queue; wait for retransmit. - * - should we be smarter and only delete the tail?
- */ - for(rp = tcb->reseq; rp != nil; rp = rp1){ - rp1 = rp->next; - freeblist(rp->bp); - free(rp); - } - tcb->reseq = nil; - - return -1; + qmax = tcb->window; + if(tcb->reseqlen > qmax){ + netlog(f, Logtcp, "tcp: reseq: queue > window: %d > %d; %d packets\n", tcb->reseqlen, qmax, tcb->nreseq); + logreseq(f, tcb->reseq, tcb->rcv.nxt); + tpriv->stats[ReseqBytelim]++; + return dumpreseq(tcb); + } + qmax = tcb->window / tcb->mss; /* ~190 for qscale==2, 390 for qscale=3 */ + if(tcb->nreseq > qmax){ + netlog(f, Logtcp, "resequence queue > packets: %d %d; %d bytes\n", tcb->nreseq, qmax, tcb->reseqlen); + logreseq(f, tcb->reseq, tcb->rcv.nxt); + tpriv->stats[ReseqPktlim]++; + return dumpreseq(tcb); } + return 0; } -void +static void getreseq(Tcpctl *tcb, Tcp *seg, Block **bp, ushort *length) { Reseq *rp; @@ -2965,10 +3135,13 @@ *bp = rp->bp; *length = rp->length; + tcb->nreseq--; + tcb->reseqlen -= rp->length; + free(rp); } -int +static int tcptrim(Tcpctl *tcb, Tcp *seg, Block **bp, ushort *length) { ushort len; @@ -3039,7 +3212,7 @@ return 0; } -void +static void tcpadvise(Proto *tcp, Block *bp, char *msg) { Tcp4hdr *h4; @@ -3105,7 +3278,7 @@ } /* called with c qlocked */ -char* +static char* tcpctl(Conv* c, char** f, int n) { if(n == 1 && strcmp(f[0], "hangup") == 0) @@ -3119,7 +3292,7 @@ return "unknown control request"; } -int +static int tcpstats(Proto *tcp, char *buf, int len) { Tcppriv *priv; @@ -3143,7 +3316,7 @@ * of questionable validity so we try to use them only when we're * up against the wall. 
*/ -int +static int tcpgc(Proto *tcp) { Conv *c, **pp, **ep; @@ -3163,13 +3336,13 @@ switch(tcb->state){ case Syn_received: if(NOW - tcb->time > 5000){ - localclose(c, "timed out"); + localclose(c, Etimedout); n++; } break; case Finwait2: if(NOW - tcb->time > 5*60*1000){ - localclose(c, "timed out"); + localclose(c, Etimedout); n++; } break; @@ -3179,7 +3352,7 @@ return n; } -void +static void tcpsettimer(Tcpctl *tcb) { int x; @@ -3188,9 +3361,9 @@ x = backoff(tcb->backoff) * (tcb->mdev + (tcb->srtt>>LOGAGAIN) + MSPTICK) / MSPTICK; - /* bounded twixt 1/2 and 64 seconds */ - if(x < 500/MSPTICK) - x = 500/MSPTICK; + /* bounded twixt 0.3 and 64 seconds */ + if(x < 300/MSPTICK) + x = 300/MSPTICK; else if(x > (64000/MSPTICK)) x = 64000/MSPTICK; tcb->timer.start = x; @@ -3224,18 +3397,37 @@ Fsproto(fs, tcp); } -void +static void tcpsetscale(Conv *s, Tcpctl *tcb, ushort rcvscale, ushort sndscale) { - if(rcvscale){ - tcb->rcv.scale = rcvscale & 0xff; - tcb->snd.scale = sndscale & 0xff; - tcb->window = QMAX<<tcb->snd.scale; - qsetlimit(s->rq, tcb->window); - } else { - tcb->rcv.scale = 0; - tcb->snd.scale = 0; - tcb->window = QMAX; - qsetlimit(s->rq, tcb->window); - } + /* + * guess at reasonable queue sizes. there's no current way + * to know how many nic receive buffers we can safely tie up in the + * tcp stack, and we don't adjust our queues to maximize throughput + * and minimize bufferbloat. n.b. the offer (rcvscale) needs to be + * respected, but we still control our own buffer commitment by + * keeping a separate qscale.
+ */ + tcb->rcv.scale = rcvscale & 0xff; + tcb->snd.scale = sndscale & 0xff; + tcb->qscale = rcvscale & 0xff; + if(rcvscale > Maxqscale) + tcb->qscale = Maxqscale; + + if(rcvscale != tcb->rcv.scale) + netlog(s->p->f, Logtcp, "tcpsetscale: window %lud qlen %d >> window %ud lport %d\n", + tcb->window, qlen(s->rq), QMAX<<tcb->qscale, s->lport); + tcb->window = QMAX<<tcb->qscale; + tcb->ssthresh = tcb->window; + + /* + * it's important to set wq large enough to cover the full + * bandwidth-delay product. it's possible to be in loss + * recovery with a big window, and we need to keep sending + * into the inflated window. the difference can be huge + * for even modest (70ms) ping times. + */ + qsetlimit(s->rq, QMAX<<tcb->qscale); + qsetlimit(s->wq, QMAX<<tcb->qscale); + tcprcvwin(s); }