diff --git a/http2/http2_test.go b/http2/http2_test.go
index b7c946b98..b1e71f153 100644
--- a/http2/http2_test.go
+++ b/http2/http2_test.go
@@ -283,3 +283,11 @@ func TestNoUnicodeStrings(t *testing.T) {
 		t.Fatal(err)
 	}
 }
+
+// must returns v if err is nil, or panics otherwise.
+func must[T any](v T, err error) T {
+	if err != nil {
+		panic(err)
+	}
+	return v
+}
diff --git a/http2/transport.go b/http2/transport.go
index e989bd19e..5d198baa5 100644
--- a/http2/transport.go
+++ b/http2/transport.go
@@ -364,6 +364,14 @@ type ClientConn struct {
 	readIdleTimeout time.Duration
 	pingTimeout     time.Duration
 
+	// pendingResets is the number of RST_STREAM frames we have sent to the peer,
+	// without confirming that the peer has received them. When we send a RST_STREAM,
+	// we bundle it with a PING frame, unless a PING is already in flight. We count
+	// the reset stream against the connection's concurrency limit until we get
+	// a PING response. This limits the number of requests we'll try to send to a
+	// completely unresponsive connection.
+	pendingResets int
+
 	// reqHeaderMu is a 1-element semaphore channel controlling access to sending new requests.
 	// Write to reqHeaderMu to lock it, read from it to unlock.
 	// Lock reqmu BEFORE mu or wmu.
@@ -960,7 +968,7 @@ func (cc *ClientConn) State() ClientConnState {
 	return ClientConnState{
 		Closed:          cc.closed,
 		Closing:         cc.closing || cc.singleUse || cc.doNotReuse || cc.goAway != nil,
-		StreamsActive:   len(cc.streams),
+		StreamsActive:   len(cc.streams) + cc.pendingResets,
 		StreamsReserved: cc.streamsReserved,
 		StreamsPending:  cc.pendingRequests,
 		LastIdle:        cc.lastIdle,
@@ -992,7 +1000,13 @@ func (cc *ClientConn) idleStateLocked() (st clientConnIdleState) {
 		// writing it.
 		maxConcurrentOkay = true
 	} else {
-		maxConcurrentOkay = int64(len(cc.streams)+cc.streamsReserved+1) <= int64(cc.maxConcurrentStreams)
+		// We can take a new request if the total of
+		//   - active streams;
+		//   - reservation slots for new streams; and
+		//   - streams for which we have sent a RST_STREAM and a PING,
+		//     but received no subsequent frame
+		// is less than the concurrency limit.
+		maxConcurrentOkay = cc.currentRequestCountLocked() < int(cc.maxConcurrentStreams)
 	}
 
 	st.canTakeNewRequest = cc.goAway == nil && !cc.closed && !cc.closing && maxConcurrentOkay &&
@@ -1002,6 +1016,12 @@ func (cc *ClientConn) idleStateLocked() (st clientConnIdleState) {
 	return
 }
 
+// currentRequestCountLocked reports the number of concurrency slots currently in use,
+// including active streams, reserved slots, and reset streams waiting for acknowledgement.
+func (cc *ClientConn) currentRequestCountLocked() int {
+	return len(cc.streams) + cc.streamsReserved + cc.pendingResets
+}
+
 func (cc *ClientConn) canTakeNewRequestLocked() bool {
 	st := cc.idleStateLocked()
 	return st.canTakeNewRequest
@@ -1578,6 +1598,7 @@ func (cs *clientStream) cleanupWriteRequest(err error) {
 			cs.reqBodyClosed = make(chan struct{})
 		}
 		bodyClosed := cs.reqBodyClosed
+		closeOnIdle := cc.singleUse || cc.doNotReuse || cc.t.disableKeepAlives() || cc.goAway != nil
 		cc.mu.Unlock()
 		if mustCloseBody {
 			cs.reqBody.Close()
@@ -1602,16 +1623,40 @@ func (cs *clientStream) cleanupWriteRequest(err error) {
 		if cs.sentHeaders {
 			if se, ok := err.(StreamError); ok {
 				if se.Cause != errFromPeer {
-					cc.writeStreamReset(cs.ID, se.Code, err)
+					cc.writeStreamReset(cs.ID, se.Code, false, err)
 				}
 			} else {
-				cc.writeStreamReset(cs.ID, ErrCodeCancel, err)
+				// We're cancelling an in-flight request.
+				//
+				// This could be due to the server becoming unresponsive.
+				// To avoid sending too many requests on a dead connection,
+				// we let the request continue to consume a concurrency slot
+				// until we can confirm the server is still responding.
+				// We do this by sending a PING frame along with the RST_STREAM
+				// (unless a ping is already in flight).
+				//
+				// For simplicity, we don't bother tracking the PING payload:
+				// We reset cc.pendingResets any time we receive a PING ACK.
+				//
+				// We skip this if the conn is going to be closed on idle,
+				// because it's short lived and will probably be closed before
+				// we get the ping response.
+				ping := false
+				if !closeOnIdle {
+					cc.mu.Lock()
+					if cc.pendingResets == 0 {
+						ping = true
+					}
+					cc.pendingResets++
+					cc.mu.Unlock()
+				}
+				cc.writeStreamReset(cs.ID, ErrCodeCancel, ping, err)
 			}
 		}
 		cs.bufPipe.CloseWithError(err) // no-op if already closed
 	} else {
 		if cs.sentHeaders && !cs.sentEndStream {
-			cc.writeStreamReset(cs.ID, ErrCodeNo, nil)
+			cc.writeStreamReset(cs.ID, ErrCodeNo, false, nil)
 		}
 		cs.bufPipe.CloseWithError(errRequestCanceled)
 	}
@@ -1638,7 +1683,7 @@ func (cc *ClientConn) awaitOpenSlotForStreamLocked(cs *clientStream) error {
 			return errClientConnUnusable
 		}
 		cc.lastIdle = time.Time{}
-		if int64(len(cc.streams)) < int64(cc.maxConcurrentStreams) {
+		if cc.currentRequestCountLocked() < int(cc.maxConcurrentStreams) {
 			return nil
 		}
 		cc.pendingRequests++
@@ -3065,6 +3110,11 @@ func (rl *clientConnReadLoop) processPing(f *PingFrame) error {
 			close(c)
 			delete(cc.pings, f.Data)
 		}
+		if cc.pendingResets > 0 {
+			// See clientStream.cleanupWriteRequest.
+			cc.pendingResets = 0
+			cc.cond.Broadcast()
+		}
 		return nil
 	}
 	cc := rl.cc
@@ -3087,13 +3137,20 @@ func (rl *clientConnReadLoop) processPushPromise(f *PushPromiseFrame) error {
 	return ConnectionError(ErrCodeProtocol)
 }
 
-func (cc *ClientConn) writeStreamReset(streamID uint32, code ErrCode, err error) {
+// writeStreamReset sends a RST_STREAM frame.
+// When ping is true, it also sends a PING frame with a random payload.
+func (cc *ClientConn) writeStreamReset(streamID uint32, code ErrCode, ping bool, err error) {
 	// TODO: map err to more interesting error codes, once the
 	// HTTP community comes up with some. But currently for
 	// RST_STREAM there's no equivalent to GOAWAY frame's debug
 	// data, and the error codes are all pretty vague ("cancel").
 	cc.wmu.Lock()
 	cc.fr.WriteRSTStream(streamID, code)
+	if ping {
+		var payload [8]byte
+		rand.Read(payload[:])
+		cc.fr.WritePing(false, payload)
+	}
 	cc.bw.Flush()
 	cc.wmu.Unlock()
 }
diff --git a/http2/transport_test.go b/http2/transport_test.go
index 757a45a7a..f6ef295a4 100644
--- a/http2/transport_test.go
+++ b/http2/transport_test.go
@@ -2559,6 +2559,9 @@ func testTransportReturnsUnusedFlowControl(t *testing.T, oneDataFrame bool) {
 			}
 			return true
 		},
+		func(f *PingFrame) bool {
+			return true
+		},
 		func(f *WindowUpdateFrame) bool {
 			if !oneDataFrame && !sentAdditionalData {
 				t.Fatalf("Got WindowUpdateFrame, don't expect one yet")
@@ -5512,3 +5515,126 @@ func TestTransport1xxLimits(t *testing.T) {
 		})
 	}
 }
+
+func TestTransportSendPingWithReset(t *testing.T) {
+	tc := newTestClientConn(t, func(tr *Transport) {
+		tr.StrictMaxConcurrentStreams = true
+	})
+
+	const maxConcurrent = 3
+	tc.greet(Setting{SettingMaxConcurrentStreams, maxConcurrent})
+
+	// Start several requests.
+	var rts []*testRoundTrip
+	for i := 0; i < maxConcurrent+1; i++ {
+		req := must(http.NewRequest("GET", "https://dummy.tld/", nil))
+		rt := tc.roundTrip(req)
+		if i >= maxConcurrent {
+			tc.wantIdle()
+			continue
+		}
+		tc.wantFrameType(FrameHeaders)
+		tc.writeHeaders(HeadersFrameParam{
+			StreamID:   rt.streamID(),
+			EndHeaders: true,
+			BlockFragment: tc.makeHeaderBlockFragment(
+				":status", "200",
+			),
+		})
+		rt.wantStatus(200)
+		rts = append(rts, rt)
+	}
+
+	// Cancel one request. We send a PING frame along with the RST_STREAM.
+	rts[0].response().Body.Close()
+	tc.wantRSTStream(rts[0].streamID(), ErrCodeCancel)
+	pf := readFrame[*PingFrame](t, tc)
+	tc.wantIdle()
+
+	// Cancel another request. No PING frame, since one is in flight.
+	rts[1].response().Body.Close()
+	tc.wantRSTStream(rts[1].streamID(), ErrCodeCancel)
+	tc.wantIdle()
+
+	// Respond to the PING.
+	// This finalizes the previous resets, and allows the pending request to be sent.
+	tc.writePing(true, pf.Data)
+	tc.wantFrameType(FrameHeaders)
+	tc.wantIdle()
+
+	// Cancel the last request. We send another PING, since none are in flight.
+	rts[2].response().Body.Close()
+	tc.wantRSTStream(rts[2].streamID(), ErrCodeCancel)
+	tc.wantFrameType(FramePing)
+	tc.wantIdle()
+}
+
+func TestTransportConnBecomesUnresponsive(t *testing.T) {
+	// We send a number of requests in series to an unresponsive connection.
+	// Each request is canceled or times out without a response.
+	// Eventually, we open a new connection rather than trying to use the old one.
+	tt := newTestTransport(t)
+
+	const maxConcurrent = 3
+
+	t.Logf("first request opens a new connection and succeeds")
+	req1 := must(http.NewRequest("GET", "https://dummy.tld/", nil))
+	rt1 := tt.roundTrip(req1)
+	tc1 := tt.getConn()
+	tc1.wantFrameType(FrameSettings)
+	tc1.wantFrameType(FrameWindowUpdate)
+	hf1 := readFrame[*HeadersFrame](t, tc1)
+	tc1.writeSettings(Setting{SettingMaxConcurrentStreams, maxConcurrent})
+	tc1.wantFrameType(FrameSettings) // ack
+	tc1.writeHeaders(HeadersFrameParam{
+		StreamID:   hf1.StreamID,
+		EndHeaders: true,
+		EndStream:  true,
+		BlockFragment: tc1.makeHeaderBlockFragment(
+			":status", "200",
+		),
+	})
+	rt1.wantStatus(200)
+	rt1.response().Body.Close()
+
+	// Send more requests.
+	// None receive a response.
+	// Each is canceled.
+	for i := 0; i < maxConcurrent; i++ {
+		t.Logf("request %v receives no response and is canceled", i)
+		ctx, cancel := context.WithCancel(context.Background())
+		req := must(http.NewRequestWithContext(ctx, "GET", "https://dummy.tld/", nil))
+		tt.roundTrip(req)
+		if tt.hasConn() {
+			t.Fatalf("new connection created; expect existing conn to be reused")
+		}
+		tc1.wantFrameType(FrameHeaders)
+		cancel()
+		tc1.wantFrameType(FrameRSTStream)
+		if i == 0 {
+			tc1.wantFrameType(FramePing)
+		}
+		tc1.wantIdle()
+	}
+
+	// The conn has hit its concurrency limit.
+	// The next request is sent on a new conn.
+	req2 := must(http.NewRequest("GET", "https://dummy.tld/", nil))
+	rt2 := tt.roundTrip(req2)
+	tc2 := tt.getConn()
+	tc2.wantFrameType(FrameSettings)
+	tc2.wantFrameType(FrameWindowUpdate)
+	hf := readFrame[*HeadersFrame](t, tc2)
+	tc2.writeSettings(Setting{SettingMaxConcurrentStreams, maxConcurrent})
+	tc2.wantFrameType(FrameSettings) // ack
+	tc2.writeHeaders(HeadersFrameParam{
+		StreamID:   hf.StreamID,
+		EndHeaders: true,
+		EndStream:  true,
+		BlockFragment: tc2.makeHeaderBlockFragment(
+			":status", "200",
+		),
+	})
+	rt2.wantStatus(200)
+	rt2.response().Body.Close()
+}
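
The slot accounting this patch introduces is easiest to see in isolation. The
standalone sketch below is illustrative only: the conn type and its fields are
simplified stand-ins for ClientConn, not part of the patch.

package main

import "fmt"

// conn keeps only the fields that participate in the concurrency-limit
// accounting (simplified stand-in for ClientConn).
type conn struct {
	activeStreams   int // len(cc.streams) in the real code
	streamsReserved int // slots reserved for requests not yet sent
	pendingResets   int // RST_STREAMs sent, with no PING ACK received yet
	maxConcurrent   int // the peer's SETTINGS_MAX_CONCURRENT_STREAMS
}

// currentRequestCount mirrors currentRequestCountLocked: a canceled stream
// still occupies a slot until the PING bundled with its RST_STREAM is acked.
func (c *conn) currentRequestCount() int {
	return c.activeStreams + c.streamsReserved + c.pendingResets
}

func (c *conn) canTakeNewRequest() bool {
	return c.currentRequestCount() < c.maxConcurrent
}

func main() {
	c := &conn{maxConcurrent: 3}

	// Three requests are sent and all are canceled without a response;
	// each cancellation leaves a pending reset behind.
	c.pendingResets = 3
	fmt.Println(c.canTakeNewRequest()) // false: the unresponsive conn is saturated

	// A PING ACK arrives; processPing zeroes pendingResets and wakes waiters.
	c.pendingResets = 0
	fmt.Println(c.canTakeNewRequest()) // true: the conn is usable again
}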