summaryrefslogtreecommitdiffstats
path: root/vendor/golang.org/x/net/http2/transport.go
diff options
context:
space:
mode:
Diffstat (limited to 'vendor/golang.org/x/net/http2/transport.go')
-rw-r--r--vendor/golang.org/x/net/http2/transport.go133
1 files changed, 91 insertions, 42 deletions
diff --git a/vendor/golang.org/x/net/http2/transport.go b/vendor/golang.org/x/net/http2/transport.go
index b97adff7..74c76da5 100644
--- a/vendor/golang.org/x/net/http2/transport.go
+++ b/vendor/golang.org/x/net/http2/transport.go
@@ -51,6 +51,15 @@ const (
transportDefaultStreamMinRefresh = 4 << 10
defaultUserAgent = "Go-http-client/2.0"
+
+ // initialMaxConcurrentStreams is a connections maxConcurrentStreams until
+ // it's received servers initial SETTINGS frame, which corresponds with the
+ // spec's minimum recommended value.
+ initialMaxConcurrentStreams = 100
+
+ // defaultMaxConcurrentStreams is a connections default maxConcurrentStreams
+ // if the server doesn't include one in its initial SETTINGS frame.
+ defaultMaxConcurrentStreams = 1000
)
// Transport is an HTTP/2 Transport.
@@ -244,8 +253,10 @@ type ClientConn struct {
cond *sync.Cond // hold mu; broadcast on flow/closed changes
flow flow // our conn-level flow control quota (cs.flow is per stream)
inflow flow // peer's conn-level flow control
+ doNotReuse bool // whether conn is marked to not be reused for any future requests
closing bool
closed bool
+ seenSettings bool // true if we've seen a settings frame, false otherwise
wantSettingsAck bool // we sent a SETTINGS frame and haven't heard back
goAway *GoAwayFrame // if non-nil, the GoAwayFrame we received
goAwayDebug string // goAway frame's debug data, retained as a string
@@ -385,8 +396,13 @@ func (cs *clientStream) abortRequestBodyWrite(err error) {
}
cc := cs.cc
cc.mu.Lock()
- cs.stopReqBody = err
- cc.cond.Broadcast()
+ if cs.stopReqBody == nil {
+ cs.stopReqBody = err
+ if cs.req.Body != nil {
+ cs.req.Body.Close()
+ }
+ cc.cond.Broadcast()
+ }
cc.mu.Unlock()
}
@@ -474,6 +490,7 @@ func (t *Transport) RoundTripOpt(req *http.Request, opt RoundTripOpt) (*http.Res
}
reused := !atomic.CompareAndSwapUint32(&cc.reused, 0, 1)
traceGotConn(req, cc, reused)
+ body := req.Body
res, gotErrAfterReqBodyWrite, err := cc.roundTrip(req)
if err != nil && retry <= 6 {
if req, err = shouldRetryRequest(req, err, gotErrAfterReqBodyWrite); err == nil {
@@ -487,12 +504,17 @@ func (t *Transport) RoundTripOpt(req *http.Request, opt RoundTripOpt) (*http.Res
case <-time.After(time.Second * time.Duration(backoff)):
continue
case <-req.Context().Done():
- return nil, req.Context().Err()
+ err = req.Context().Err()
}
}
}
if err != nil {
t.vlogf("RoundTrip failure: %v", err)
+ // If the error occurred after the body write started,
+ // the body writer will close the body. Otherwise, do so here.
+ if body != nil && !gotErrAfterReqBodyWrite {
+ body.Close()
+ }
return nil, err
}
return res, nil
@@ -531,7 +553,7 @@ func shouldRetryRequest(req *http.Request, err error, afterBodyWrite bool) (*htt
// If the request body can be reset back to its original
// state via the optional req.GetBody, do that.
if req.GetBody != nil {
- // TODO: consider a req.Body.Close here? or audit that all caller paths do?
+ req.Body.Close()
body, err := req.GetBody()
if err != nil {
return nil, err
@@ -558,6 +580,10 @@ func canRetryError(err error) bool {
return true
}
if se, ok := err.(StreamError); ok {
+ if se.Code == ErrCodeProtocol && se.Cause == errFromPeer {
+ // See golang/go#47635, golang/go#42777
+ return true
+ }
return se.Code == ErrCodeRefusedStream
}
return false
@@ -632,10 +658,10 @@ func (t *Transport) newClientConn(c net.Conn, singleUse bool) (*ClientConn, erro
tconn: c,
readerDone: make(chan struct{}),
nextStreamID: 1,
- maxFrameSize: 16 << 10, // spec default
- initialWindowSize: 65535, // spec default
- maxConcurrentStreams: 1000, // "infinite", per spec. 1000 seems good enough.
- peerMaxHeaderListSize: 0xffffffffffffffff, // "infinite", per spec. Use 2^64-1 instead.
+ maxFrameSize: 16 << 10, // spec default
+ initialWindowSize: 65535, // spec default
+ maxConcurrentStreams: initialMaxConcurrentStreams, // "infinite", per spec. Use a smaller value until we have received server settings.
+ peerMaxHeaderListSize: 0xffffffffffffffff, // "infinite", per spec. Use 2^64-1 instead.
streams: make(map[uint32]*clientStream),
singleUse: singleUse,
wantSettingsAck: true,
@@ -709,6 +735,13 @@ func (cc *ClientConn) healthCheck() {
}
}
+// SetDoNotReuse marks cc as not reusable for future HTTP requests.
+func (cc *ClientConn) SetDoNotReuse() {
+ cc.mu.Lock()
+ defer cc.mu.Unlock()
+ cc.doNotReuse = true
+}
+
func (cc *ClientConn) setGoAway(f *GoAwayFrame) {
cc.mu.Lock()
defer cc.mu.Unlock()
@@ -767,10 +800,11 @@ func (cc *ClientConn) idleStateLocked() (st clientConnIdleState) {
// writing it.
maxConcurrentOkay = true
} else {
- maxConcurrentOkay = int64(len(cc.streams)+1) < int64(cc.maxConcurrentStreams)
+ maxConcurrentOkay = int64(len(cc.streams)+1) <= int64(cc.maxConcurrentStreams)
}
st.canTakeNewRequest = cc.goAway == nil && !cc.closed && !cc.closing && maxConcurrentOkay &&
+ !cc.doNotReuse &&
int64(cc.nextStreamID)+2*int64(cc.pendingRequests) < math.MaxInt32 &&
!cc.tooIdleLocked()
st.freshConn = cc.nextStreamID == 1 && st.canTakeNewRequest
@@ -1057,13 +1091,14 @@ func (cc *ClientConn) roundTrip(req *http.Request) (res *http.Response, gotErrAf
if werr != nil {
if hasBody {
- req.Body.Close() // per RoundTripper contract
bodyWriter.cancel()
}
cc.forgetStreamID(cs.ID)
// Don't bother sending a RST_STREAM (our write already failed;
// no need to keep writing)
traceWroteRequest(cs.trace, werr)
+ // TODO(dneil): An error occurred while writing the headers.
+ // Should we return an error indicating that this request can be retried?
return nil, false, werr
}
@@ -1110,40 +1145,28 @@ func (cc *ClientConn) roundTrip(req *http.Request) (res *http.Response, gotErrAf
return res, false, nil
}
+ handleError := func(err error) (*http.Response, bool, error) {
+ if !hasBody || bodyWritten {
+ cc.writeStreamReset(cs.ID, ErrCodeCancel, nil)
+ } else {
+ bodyWriter.cancel()
+ cs.abortRequestBodyWrite(errStopReqBodyWriteAndCancel)
+ <-bodyWriter.resc
+ }
+ cc.forgetStreamID(cs.ID)
+ return nil, cs.getStartedWrite(), err
+ }
+
for {
select {
case re := <-readLoopResCh:
return handleReadLoopResponse(re)
case <-respHeaderTimer:
- if !hasBody || bodyWritten {
- cc.writeStreamReset(cs.ID, ErrCodeCancel, nil)
- } else {
- bodyWriter.cancel()
- cs.abortRequestBodyWrite(errStopReqBodyWriteAndCancel)
- <-bodyWriter.resc
- }
- cc.forgetStreamID(cs.ID)
- return nil, cs.getStartedWrite(), errTimeout
+ return handleError(errTimeout)
case <-ctx.Done():
- if !hasBody || bodyWritten {
- cc.writeStreamReset(cs.ID, ErrCodeCancel, nil)
- } else {
- bodyWriter.cancel()
- cs.abortRequestBodyWrite(errStopReqBodyWriteAndCancel)
- <-bodyWriter.resc
- }
- cc.forgetStreamID(cs.ID)
- return nil, cs.getStartedWrite(), ctx.Err()
+ return handleError(ctx.Err())
case <-req.Cancel:
- if !hasBody || bodyWritten {
- cc.writeStreamReset(cs.ID, ErrCodeCancel, nil)
- } else {
- bodyWriter.cancel()
- cs.abortRequestBodyWrite(errStopReqBodyWriteAndCancel)
- <-bodyWriter.resc
- }
- cc.forgetStreamID(cs.ID)
- return nil, cs.getStartedWrite(), errRequestCanceled
+ return handleError(errRequestCanceled)
case <-cs.peerReset:
// processResetStream already removed the
// stream from the streams map; no need for
@@ -1184,7 +1207,7 @@ func (cc *ClientConn) awaitOpenSlotForRequest(req *http.Request) error {
return errClientConnUnusable
}
cc.lastIdle = time.Time{}
- if int64(len(cc.streams))+1 <= int64(cc.maxConcurrentStreams) {
+ if int64(len(cc.streams)) < int64(cc.maxConcurrentStreams) {
if waitingForConn != nil {
close(waitingForConn)
}
@@ -1290,7 +1313,13 @@ func (cs *clientStream) writeRequestBody(body io.Reader, bodyCloser io.Closer) (
// Request.Body is closed by the Transport,
// and in multiple cases: server replies <=299 and >299
// while still writing request body
- cerr := bodyCloser.Close()
+ var cerr error
+ cc.mu.Lock()
+ if cs.stopReqBody == nil {
+ cs.stopReqBody = errStopReqBodyWrite
+ cerr = bodyCloser.Close()
+ }
+ cc.mu.Unlock()
if err == nil {
err = cerr
}
@@ -2341,12 +2370,14 @@ func (rl *clientConnReadLoop) processSettings(f *SettingsFrame) error {
return ConnectionError(ErrCodeProtocol)
}
+ var seenMaxConcurrentStreams bool
err := f.ForeachSetting(func(s Setting) error {
switch s.ID {
case SettingMaxFrameSize:
cc.maxFrameSize = s.Val
case SettingMaxConcurrentStreams:
cc.maxConcurrentStreams = s.Val
+ seenMaxConcurrentStreams = true
case SettingMaxHeaderListSize:
cc.peerMaxHeaderListSize = uint64(s.Val)
case SettingInitialWindowSize:
@@ -2378,6 +2409,17 @@ func (rl *clientConnReadLoop) processSettings(f *SettingsFrame) error {
return err
}
+ if !cc.seenSettings {
+ if !seenMaxConcurrentStreams {
+ // This was the servers initial SETTINGS frame and it
+ // didn't contain a MAX_CONCURRENT_STREAMS field so
+ // increase the number of concurrent streams this
+ // connection can establish to our default.
+ cc.maxConcurrentStreams = defaultMaxConcurrentStreams
+ }
+ cc.seenSettings = true
+ }
+
cc.wmu.Lock()
defer cc.wmu.Unlock()
@@ -2420,10 +2462,17 @@ func (rl *clientConnReadLoop) processResetStream(f *RSTStreamFrame) error {
// which closes this, so there
// isn't a race.
default:
- err := streamError(cs.ID, f.ErrCode)
- cs.resetErr = err
+ serr := streamError(cs.ID, f.ErrCode)
+ if f.ErrCode == ErrCodeProtocol {
+ rl.cc.SetDoNotReuse()
+ serr.Cause = errFromPeer
+ // TODO(bradfitz): increment a varz here, once Transport
+ // takes an optional interface-typed field that expvar.Map.Add
+ // implements.
+ }
+ cs.resetErr = serr
close(cs.peerReset)
- cs.bufPipe.CloseWithError(err)
+ cs.bufPipe.CloseWithError(serr)
cs.cc.cond.Broadcast() // wake up checkResetOrDone via clientStream.awaitFlowControl
}
return nil