summaryrefslogtreecommitdiffstats
path: root/vendor/golang.org/x/net/http2
diff options
context:
space:
mode:
authordependabot[bot] <49699333+dependabot[bot]@users.noreply.github.com>2022-01-18 20:24:14 +0100
committerGitHub <noreply@github.com>2022-01-18 20:24:14 +0100
commitaad60c882e16cd2c8769a49e6d9f87a040590d62 (patch)
tree3bfe1f8953b40f9beb39c69db3a7647ea6de54d2 /vendor/golang.org/x/net/http2
parentfecca575078a21dedb0cab213dde7fd97161c0fa (diff)
downloadmatterbridge-msglm-aad60c882e16cd2c8769a49e6d9f87a040590d62.tar.gz
matterbridge-msglm-aad60c882e16cd2c8769a49e6d9f87a040590d62.tar.bz2
matterbridge-msglm-aad60c882e16cd2c8769a49e6d9f87a040590d62.zip
Bump github.com/mattermost/mattermost-server/v6 from 6.1.0 to 6.3.0 (#1686)
Bumps [github.com/mattermost/mattermost-server/v6](https://github.com/mattermost/mattermost-server) from 6.1.0 to 6.3.0. - [Release notes](https://github.com/mattermost/mattermost-server/releases) - [Changelog](https://github.com/mattermost/mattermost-server/blob/master/CHANGELOG.md) - [Commits](https://github.com/mattermost/mattermost-server/compare/v6.1.0...v6.3.0) --- updated-dependencies: - dependency-name: github.com/mattermost/mattermost-server/v6 dependency-type: direct:production update-type: version-update:semver-minor ... Signed-off-by: dependabot[bot] <support@github.com> Co-authored-by: dependabot[bot] <49699333+dependabot[bot]@users.noreply.github.com>
Diffstat (limited to 'vendor/golang.org/x/net/http2')
-rw-r--r--vendor/golang.org/x/net/http2/hpack/huffman.go38
-rw-r--r--vendor/golang.org/x/net/http2/transport.go415
2 files changed, 307 insertions, 146 deletions
diff --git a/vendor/golang.org/x/net/http2/hpack/huffman.go b/vendor/golang.org/x/net/http2/hpack/huffman.go
index a1ab2f05..fe0b84cc 100644
--- a/vendor/golang.org/x/net/http2/hpack/huffman.go
+++ b/vendor/golang.org/x/net/http2/hpack/huffman.go
@@ -140,25 +140,29 @@ func buildRootHuffmanNode() {
panic("unexpected size")
}
lazyRootHuffmanNode = newInternalNode()
- for i, code := range huffmanCodes {
- addDecoderNode(byte(i), code, huffmanCodeLen[i])
- }
-}
+ // allocate a leaf node for each of the 256 symbols
+ leaves := new([256]node)
+
+ for sym, code := range huffmanCodes {
+ codeLen := huffmanCodeLen[sym]
+
+ cur := lazyRootHuffmanNode
+ for codeLen > 8 {
+ codeLen -= 8
+ i := uint8(code >> codeLen)
+ if cur.children[i] == nil {
+ cur.children[i] = newInternalNode()
+ }
+ cur = cur.children[i]
+ }
+ shift := 8 - codeLen
+ start, end := int(uint8(code<<shift)), int(1<<shift)
-func addDecoderNode(sym byte, code uint32, codeLen uint8) {
- cur := lazyRootHuffmanNode
- for codeLen > 8 {
- codeLen -= 8
- i := uint8(code >> codeLen)
- if cur.children[i] == nil {
- cur.children[i] = newInternalNode()
+ leaves[sym].sym = byte(sym)
+ leaves[sym].codeLen = codeLen
+ for i := start; i < start+end; i++ {
+ cur.children[i] = &leaves[sym]
}
- cur = cur.children[i]
- }
- shift := 8 - codeLen
- start, end := int(uint8(code<<shift)), int(1<<shift)
- for i := start; i < start+end; i++ {
- cur.children[i] = &node{sym: sym, codeLen: codeLen}
}
}
diff --git a/vendor/golang.org/x/net/http2/transport.go b/vendor/golang.org/x/net/http2/transport.go
index 653a1a0b..581999be 100644
--- a/vendor/golang.org/x/net/http2/transport.go
+++ b/vendor/golang.org/x/net/http2/transport.go
@@ -24,6 +24,7 @@ import (
"net/http"
"net/http/httptrace"
"net/textproto"
+ "os"
"sort"
"strconv"
"strings"
@@ -130,6 +131,11 @@ type Transport struct {
// Defaults to 15s.
PingTimeout time.Duration
+ // WriteByteTimeout is the timeout after which the connection will be
+ // closed no data can be written to it. The timeout begins when data is
+ // available to write, and is extended whenever any bytes are written.
+ WriteByteTimeout time.Duration
+
// CountError, if non-nil, is called on HTTP/2 transport errors.
// It's intended to increment a metric for monitoring, such
// as an expvar or Prometheus metric.
@@ -300,12 +306,17 @@ type ClientConn struct {
// clientStream is the state for a single HTTP/2 stream. One of these
// is created for each Transport.RoundTrip call.
type clientStream struct {
- cc *ClientConn
- req *http.Request
+ cc *ClientConn
+
+ // Fields of Request that we may access even after the response body is closed.
+ ctx context.Context
+ reqCancel <-chan struct{}
+
trace *httptrace.ClientTrace // or nil
ID uint32
bufPipe pipe // buffered pipe with the flow-controlled response payload
requestedGzip bool
+ isHead bool
abortOnce sync.Once
abort chan struct{} // closed to signal stream should end immediately
@@ -322,7 +333,10 @@ type clientStream struct {
inflow flow // guarded by cc.mu
bytesRemain int64 // -1 means unknown; owned by transportResponseBody.Read
readErr error // sticky read error; owned by transportResponseBody.Read
- stopReqBody error // if non-nil, stop writing req body; guarded by cc.mu
+
+ reqBody io.ReadCloser
+ reqBodyContentLength int64 // -1 means unknown
+ reqBodyClosed bool // body has been closed; guarded by cc.mu
// owned by writeRequest:
sentEndStream bool // sent an END_STREAM flag to the peer
@@ -362,6 +376,10 @@ func (cs *clientStream) abortStreamLocked(err error) {
cs.abortErr = err
close(cs.abort)
})
+ if cs.reqBody != nil && !cs.reqBodyClosed {
+ cs.reqBody.Close()
+ cs.reqBodyClosed = true
+ }
// TODO(dneil): Clean up tests where cs.cc.cond is nil.
if cs.cc.cond != nil {
// Wake up writeRequestBody if it is waiting on flow control.
@@ -369,31 +387,43 @@ func (cs *clientStream) abortStreamLocked(err error) {
}
}
-func (cs *clientStream) abortRequestBodyWrite(err error) {
- if err == nil {
- panic("nil error")
- }
+func (cs *clientStream) abortRequestBodyWrite() {
cc := cs.cc
cc.mu.Lock()
- if cs.stopReqBody == nil {
- cs.stopReqBody = err
+ defer cc.mu.Unlock()
+ if cs.reqBody != nil && !cs.reqBodyClosed {
+ cs.reqBody.Close()
+ cs.reqBodyClosed = true
cc.cond.Broadcast()
}
- cc.mu.Unlock()
}
type stickyErrWriter struct {
- w io.Writer
- err *error
+ conn net.Conn
+ timeout time.Duration
+ err *error
}
func (sew stickyErrWriter) Write(p []byte) (n int, err error) {
if *sew.err != nil {
return 0, *sew.err
}
- n, err = sew.w.Write(p)
- *sew.err = err
- return
+ for {
+ if sew.timeout != 0 {
+ sew.conn.SetWriteDeadline(time.Now().Add(sew.timeout))
+ }
+ nn, err := sew.conn.Write(p[n:])
+ n += nn
+ if n < len(p) && nn > 0 && errors.Is(err, os.ErrDeadlineExceeded) {
+ // Keep extending the deadline so long as we're making progress.
+ continue
+ }
+ if sew.timeout != 0 {
+ sew.conn.SetWriteDeadline(time.Time{})
+ }
+ *sew.err = err
+ return n, err
+ }
}
// noCachedConnError is the concrete type of ErrNoCachedConn, which
@@ -648,7 +678,11 @@ func (t *Transport) newClientConn(c net.Conn, singleUse bool) (*ClientConn, erro
// TODO: adjust this writer size to account for frame size +
// MTU + crypto/tls record padding.
- cc.bw = bufio.NewWriter(stickyErrWriter{c, &cc.werr})
+ cc.bw = bufio.NewWriter(stickyErrWriter{
+ conn: c,
+ timeout: t.WriteByteTimeout,
+ err: &cc.werr,
+ })
cc.br = bufio.NewReader(c)
cc.fr = NewFramer(cc.bw, cc.br)
if t.CountError != nil {
@@ -759,6 +793,61 @@ func (cc *ClientConn) ReserveNewRequest() bool {
return true
}
+// ClientConnState describes the state of a ClientConn.
+type ClientConnState struct {
+ // Closed is whether the connection is closed.
+ Closed bool
+
+ // Closing is whether the connection is in the process of
+ // closing. It may be closing due to shutdown, being a
+ // single-use connection, being marked as DoNotReuse, or
+ // having received a GOAWAY frame.
+ Closing bool
+
+ // StreamsActive is how many streams are active.
+ StreamsActive int
+
+ // StreamsReserved is how many streams have been reserved via
+ // ClientConn.ReserveNewRequest.
+ StreamsReserved int
+
+ // StreamsPending is how many requests have been sent in excess
+ // of the peer's advertised MaxConcurrentStreams setting and
+ // are waiting for other streams to complete.
+ StreamsPending int
+
+ // MaxConcurrentStreams is how many concurrent streams the
+ // peer advertised as acceptable. Zero means no SETTINGS
+ // frame has been received yet.
+ MaxConcurrentStreams uint32
+
+ // LastIdle, if non-zero, is when the connection last
+ // transitioned to idle state.
+ LastIdle time.Time
+}
+
+// State returns a snapshot of cc's state.
+func (cc *ClientConn) State() ClientConnState {
+ cc.wmu.Lock()
+ maxConcurrent := cc.maxConcurrentStreams
+ if !cc.seenSettings {
+ maxConcurrent = 0
+ }
+ cc.wmu.Unlock()
+
+ cc.mu.Lock()
+ defer cc.mu.Unlock()
+ return ClientConnState{
+ Closed: cc.closed,
+ Closing: cc.closing || cc.singleUse || cc.doNotReuse || cc.goAway != nil,
+ StreamsActive: len(cc.streams),
+ StreamsReserved: cc.streamsReserved,
+ StreamsPending: cc.pendingRequests,
+ LastIdle: cc.lastIdle,
+ MaxConcurrentStreams: maxConcurrent,
+ }
+}
+
// clientConnIdleState describes the suitability of a client
// connection to initiate a new RoundTrip request.
type clientConnIdleState struct {
@@ -1010,15 +1099,19 @@ func (cc *ClientConn) decrStreamReservationsLocked() {
func (cc *ClientConn) RoundTrip(req *http.Request) (*http.Response, error) {
ctx := req.Context()
cs := &clientStream{
- cc: cc,
- req: req,
- trace: httptrace.ContextClientTrace(req.Context()),
- peerClosed: make(chan struct{}),
- abort: make(chan struct{}),
- respHeaderRecv: make(chan struct{}),
- donec: make(chan struct{}),
- }
- go cs.doRequest()
+ cc: cc,
+ ctx: ctx,
+ reqCancel: req.Cancel,
+ isHead: req.Method == "HEAD",
+ reqBody: req.Body,
+ reqBodyContentLength: actualContentLength(req),
+ trace: httptrace.ContextClientTrace(ctx),
+ peerClosed: make(chan struct{}),
+ abort: make(chan struct{}),
+ respHeaderRecv: make(chan struct{}),
+ donec: make(chan struct{}),
+ }
+ go cs.doRequest(req)
waitDone := func() error {
select {
@@ -1026,44 +1119,60 @@ func (cc *ClientConn) RoundTrip(req *http.Request) (*http.Response, error) {
return nil
case <-ctx.Done():
return ctx.Err()
- case <-req.Cancel:
+ case <-cs.reqCancel:
return errRequestCanceled
}
}
+ handleResponseHeaders := func() (*http.Response, error) {
+ res := cs.res
+ if res.StatusCode > 299 {
+ // On error or status code 3xx, 4xx, 5xx, etc abort any
+ // ongoing write, assuming that the server doesn't care
+ // about our request body. If the server replied with 1xx or
+ // 2xx, however, then assume the server DOES potentially
+ // want our body (e.g. full-duplex streaming:
+ // golang.org/issue/13444). If it turns out the server
+ // doesn't, they'll RST_STREAM us soon enough. This is a
+ // heuristic to avoid adding knobs to Transport. Hopefully
+ // we can keep it.
+ cs.abortRequestBodyWrite()
+ }
+ res.Request = req
+ res.TLS = cc.tlsState
+ if res.Body == noBody && actualContentLength(req) == 0 {
+ // If there isn't a request or response body still being
+ // written, then wait for the stream to be closed before
+ // RoundTrip returns.
+ if err := waitDone(); err != nil {
+ return nil, err
+ }
+ }
+ return res, nil
+ }
+
for {
select {
case <-cs.respHeaderRecv:
- res := cs.res
- if res.StatusCode > 299 {
- // On error or status code 3xx, 4xx, 5xx, etc abort any
- // ongoing write, assuming that the server doesn't care
- // about our request body. If the server replied with 1xx or
- // 2xx, however, then assume the server DOES potentially
- // want our body (e.g. full-duplex streaming:
- // golang.org/issue/13444). If it turns out the server
- // doesn't, they'll RST_STREAM us soon enough. This is a
- // heuristic to avoid adding knobs to Transport. Hopefully
- // we can keep it.
- cs.abortRequestBodyWrite(errStopReqBodyWrite)
- }
- res.Request = req
- res.TLS = cc.tlsState
- if res.Body == noBody && actualContentLength(req) == 0 {
- // If there isn't a request or response body still being
- // written, then wait for the stream to be closed before
- // RoundTrip returns.
- if err := waitDone(); err != nil {
- return nil, err
- }
- }
- return res, nil
+ return handleResponseHeaders()
case <-cs.abort:
- waitDone()
- return nil, cs.abortErr
+ select {
+ case <-cs.respHeaderRecv:
+ // If both cs.respHeaderRecv and cs.abort are signaling,
+ // pick respHeaderRecv. The server probably wrote the
+ // response and immediately reset the stream.
+ // golang.org/issue/49645
+ return handleResponseHeaders()
+ default:
+ waitDone()
+ return nil, cs.abortErr
+ }
case <-ctx.Done():
- return nil, ctx.Err()
- case <-req.Cancel:
+ err := ctx.Err()
+ cs.abortStream(err)
+ return nil, err
+ case <-cs.reqCancel:
+ cs.abortStream(errRequestCanceled)
return nil, errRequestCanceled
}
}
@@ -1072,8 +1181,8 @@ func (cc *ClientConn) RoundTrip(req *http.Request) (*http.Response, error) {
// doRequest runs for the duration of the request lifetime.
//
// It sends the request and performs post-request cleanup (closing Request.Body, etc.).
-func (cs *clientStream) doRequest() {
- err := cs.writeRequest()
+func (cs *clientStream) doRequest(req *http.Request) {
+ err := cs.writeRequest(req)
cs.cleanupWriteRequest(err)
}
@@ -1084,12 +1193,11 @@ func (cs *clientStream) doRequest() {
//
// It returns non-nil if the request ends otherwise.
// If the returned error is StreamError, the error Code may be used in resetting the stream.
-func (cs *clientStream) writeRequest() (err error) {
+func (cs *clientStream) writeRequest(req *http.Request) (err error) {
cc := cs.cc
- req := cs.req
- ctx := req.Context()
+ ctx := cs.ctx
- if err := checkConnHeaders(cs.req); err != nil {
+ if err := checkConnHeaders(req); err != nil {
return err
}
@@ -1101,7 +1209,7 @@ func (cs *clientStream) writeRequest() (err error) {
}
select {
case cc.reqHeaderMu <- struct{}{}:
- case <-req.Cancel:
+ case <-cs.reqCancel:
return errRequestCanceled
case <-ctx.Done():
return ctx.Err()
@@ -1118,13 +1226,16 @@ func (cs *clientStream) writeRequest() (err error) {
return err
}
cc.addStreamLocked(cs) // assigns stream ID
+ if isConnectionCloseRequest(req) {
+ cc.doNotReuse = true
+ }
cc.mu.Unlock()
// TODO(bradfitz): this is a copy of the logic in net/http. Unify somewhere?
if !cc.t.disableCompression() &&
req.Header.Get("Accept-Encoding") == "" &&
req.Header.Get("Range") == "" &&
- req.Method != "HEAD" {
+ !cs.isHead {
// Request gzip only, not deflate. Deflate is ambiguous and
// not as universally supported anyway.
// See: https://zlib.net/zlib_faq.html#faq39
@@ -1143,19 +1254,23 @@ func (cs *clientStream) writeRequest() (err error) {
continueTimeout := cc.t.expectContinueTimeout()
if continueTimeout != 0 &&
!httpguts.HeaderValuesContainsToken(
- cs.req.Header["Expect"],
+ req.Header["Expect"],
"100-continue") {
continueTimeout = 0
cs.on100 = make(chan struct{}, 1)
}
- err = cs.encodeAndWriteHeaders()
+ // Past this point (where we send request headers), it is possible for
+ // RoundTrip to return successfully. Since the RoundTrip contract permits
+ // the caller to "mutate or reuse" the Request after closing the Response's Body,
+ // we must take care when referencing the Request from here on.
+ err = cs.encodeAndWriteHeaders(req)
<-cc.reqHeaderMu
if err != nil {
return err
}
- hasBody := actualContentLength(cs.req) != 0
+ hasBody := cs.reqBodyContentLength != 0
if !hasBody {
cs.sentEndStream = true
} else {
@@ -1171,7 +1286,7 @@ func (cs *clientStream) writeRequest() (err error) {
err = cs.abortErr
case <-ctx.Done():
err = ctx.Err()
- case <-req.Cancel:
+ case <-cs.reqCancel:
err = errRequestCanceled
}
timer.Stop()
@@ -1181,7 +1296,7 @@ func (cs *clientStream) writeRequest() (err error) {
}
}
- if err = cs.writeRequestBody(req.Body); err != nil {
+ if err = cs.writeRequestBody(req); err != nil {
if err != errStopReqBodyWrite {
traceWroteRequest(cs.trace, err)
return err
@@ -1211,21 +1326,21 @@ func (cs *clientStream) writeRequest() (err error) {
case <-respHeaderTimer:
return errTimeout
case <-respHeaderRecv:
+ respHeaderRecv = nil
respHeaderTimer = nil // keep waiting for END_STREAM
case <-cs.abort:
return cs.abortErr
case <-ctx.Done():
return ctx.Err()
- case <-req.Cancel:
+ case <-cs.reqCancel:
return errRequestCanceled
}
}
}
-func (cs *clientStream) encodeAndWriteHeaders() error {
+func (cs *clientStream) encodeAndWriteHeaders(req *http.Request) error {
cc := cs.cc
- req := cs.req
- ctx := req.Context()
+ ctx := cs.ctx
cc.wmu.Lock()
defer cc.wmu.Unlock()
@@ -1236,7 +1351,7 @@ func (cs *clientStream) encodeAndWriteHeaders() error {
return cs.abortErr
case <-ctx.Done():
return ctx.Err()
- case <-req.Cancel:
+ case <-cs.reqCancel:
return errRequestCanceled
default:
}
@@ -1246,14 +1361,14 @@ func (cs *clientStream) encodeAndWriteHeaders() error {
// we send: HEADERS{1}, CONTINUATION{0,} + DATA{0,} (DATA is
// sent by writeRequestBody below, along with any Trailers,
// again in form HEADERS{1}, CONTINUATION{0,})
- trailers, err := commaSeparatedTrailers(cs.req)
+ trailers, err := commaSeparatedTrailers(req)
if err != nil {
return err
}
hasTrailers := trailers != ""
- contentLen := actualContentLength(cs.req)
+ contentLen := actualContentLength(req)
hasBody := contentLen != 0
- hdrs, err := cc.encodeHeaders(cs.req, cs.requestedGzip, trailers, contentLen)
+ hdrs, err := cc.encodeHeaders(req, cs.requestedGzip, trailers, contentLen)
if err != nil {
return err
}
@@ -1272,7 +1387,6 @@ func (cs *clientStream) encodeAndWriteHeaders() error {
// cleanupWriteRequest will send a reset to the peer.
func (cs *clientStream) cleanupWriteRequest(err error) {
cc := cs.cc
- req := cs.req
if cs.ID == 0 {
// We were canceled before creating the stream, so return our reservation.
@@ -1283,10 +1397,12 @@ func (cs *clientStream) cleanupWriteRequest(err error) {
// Request.Body is closed by the Transport,
// and in multiple cases: server replies <=299 and >299
// while still writing request body
- if req.Body != nil {
- if e := req.Body.Close(); err == nil {
- err = e
- }
+ cc.mu.Lock()
+ bodyClosed := cs.reqBodyClosed
+ cs.reqBodyClosed = true
+ cc.mu.Unlock()
+ if !bodyClosed && cs.reqBody != nil {
+ cs.reqBody.Close()
}
if err != nil && cs.sentEndStream {
@@ -1320,7 +1436,6 @@ func (cs *clientStream) cleanupWriteRequest(err error) {
if cs.ID != 0 {
cc.forgetStreamID(cs.ID)
}
- close(cs.donec)
cc.wmu.Lock()
werr := cc.werr
@@ -1328,6 +1443,8 @@ func (cs *clientStream) cleanupWriteRequest(err error) {
if werr != nil {
cc.Close()
}
+
+ close(cs.donec)
}
// awaitOpenSlotForStream waits until len(streams) < maxConcurrentStreams.
@@ -1401,7 +1518,7 @@ func (cs *clientStream) frameScratchBufferLen(maxFrameSize int) int {
if n > max {
n = max
}
- if cl := actualContentLength(cs.req); cl != -1 && cl+1 < n {
+ if cl := cs.reqBodyContentLength; cl != -1 && cl+1 < n {
// Add an extra byte past the declared content-length to
// give the caller's Request.Body io.Reader a chance to
// give us more bytes than they declared, so we can catch it
@@ -1416,13 +1533,13 @@ func (cs *clientStream) frameScratchBufferLen(maxFrameSize int) int {
var bufPool sync.Pool // of *[]byte
-func (cs *clientStream) writeRequestBody(body io.Reader) (err error) {
+func (cs *clientStream) writeRequestBody(req *http.Request) (err error) {
cc := cs.cc
+ body := cs.reqBody
sentEnd := false // whether we sent the final DATA frame w/ END_STREAM
- req := cs.req
hasTrailers := req.Trailer != nil
- remainLen := actualContentLength(req)
+ remainLen := cs.reqBodyContentLength
hasContentLen := remainLen != -1
cc.mu.Lock()
@@ -1463,23 +1580,26 @@ func (cs *clientStream) writeRequestBody(body io.Reader) (err error) {
return err
}
}
- if err == io.EOF {
- sawEOF = true
- err = nil
- } else if err != nil {
- return err
+ if err != nil {
+ cc.mu.Lock()
+ bodyClosed := cs.reqBodyClosed
+ cc.mu.Unlock()
+ switch {
+ case bodyClosed:
+ return errStopReqBodyWrite
+ case err == io.EOF:
+ sawEOF = true
+ err = nil
+ default:
+ return err
+ }
}
remain := buf[:n]
for len(remain) > 0 && err == nil {
var allowed int32
allowed, err = cs.awaitFlowControl(len(remain))
- switch {
- case err == errStopReqBodyWrite:
- return err
- case err == errStopReqBodyWriteAndCancel:
- return err
- case err != nil:
+ if err != nil {
return err
}
cc.wmu.Lock()
@@ -1510,16 +1630,26 @@ func (cs *clientStream) writeRequestBody(body io.Reader) (err error) {
return nil
}
+ // Since the RoundTrip contract permits the caller to "mutate or reuse"
+ // a request after the Response's Body is closed, verify that this hasn't
+ // happened before accessing the trailers.
+ cc.mu.Lock()
+ trailer := req.Trailer
+ err = cs.abortErr
+ cc.mu.Unlock()
+ if err != nil {
+ return err
+ }
+
cc.wmu.Lock()
+ defer cc.wmu.Unlock()
var trls []byte
- if hasTrailers {
- trls, err = cc.encodeTrailers(req)
+ if len(trailer) > 0 {
+ trls, err = cc.encodeTrailers(trailer)
if err != nil {
- cc.wmu.Unlock()
return err
}
}
- defer cc.wmu.Unlock()
// Two ways to send END_STREAM: either with trailers, or
// with an empty DATA frame.
@@ -1540,23 +1670,22 @@ func (cs *clientStream) writeRequestBody(body io.Reader) (err error) {
// if the stream is dead.
func (cs *clientStream) awaitFlowControl(maxBytes int) (taken int32, err error) {
cc := cs.cc
- req := cs.req
- ctx := req.Context()
+ ctx := cs.ctx
cc.mu.Lock()
defer cc.mu.Unlock()
for {
if cc.closed {
return 0, errClientConnClosed
}
- if cs.stopReqBody != nil {
- return 0, cs.stopReqBody
+ if cs.reqBodyClosed {
+ return 0, errStopReqBodyWrite
}
select {
case <-cs.abort:
return 0, cs.abortErr
case <-ctx.Done():
return 0, ctx.Err()
- case <-req.Cancel:
+ case <-cs.reqCancel:
return 0, errRequestCanceled
default:
}
@@ -1770,11 +1899,11 @@ func shouldSendReqContentLength(method string, contentLength int64) bool {
}
// requires cc.wmu be held.
-func (cc *ClientConn) encodeTrailers(req *http.Request) ([]byte, error) {
+func (cc *ClientConn) encodeTrailers(trailer http.Header) ([]byte, error) {
cc.hbuf.Reset()
hlSize := uint64(0)
- for k, vv := range req.Trailer {
+ for k, vv := range trailer {
for _, v := range vv {
hf := hpack.HeaderField{Name: k, Value: v}
hlSize += uint64(hf.Size())
@@ -1784,7 +1913,7 @@ func (cc *ClientConn) encodeTrailers(req *http.Request) ([]byte, error) {
return nil, errRequestHeaderListSize
}
- for k, vv := range req.Trailer {
+ for k, vv := range trailer {
lowKey, ascii := asciiToLower(k)
if !ascii {
// Skip writing invalid headers. Per RFC 7540, Section 8.1.2, header
@@ -1920,7 +2049,13 @@ func (rl *clientConnReadLoop) cleanup() {
}
cc.closed = true
for _, cs := range cc.streams {
- cs.abortStreamLocked(err)
+ select {
+ case <-cs.peerClosed:
+ // The server closed the stream before closing the conn,
+ // so no need to interrupt it.
+ default:
+ cs.abortStreamLocked(err)
+ }
}
cc.cond.Broadcast()
cc.mu.Unlock()
@@ -2162,33 +2297,40 @@ func (rl *clientConnReadLoop) handleResponse(cs *clientStream, f *MetaHeadersFra
return nil, nil
}
- streamEnded := f.StreamEnded()
- isHead := cs.req.Method == "HEAD"
- if !streamEnded || isHead {
- res.ContentLength = -1
- if clens := res.Header["Content-Length"]; len(clens) == 1 {
- if cl, err := strconv.ParseUint(clens[0], 10, 63); err == nil {
- res.ContentLength = int64(cl)
- } else {
- // TODO: care? unlike http/1, it won't mess up our framing, so it's
- // more safe smuggling-wise to ignore.
- }
- } else if len(clens) > 1 {
+ res.ContentLength = -1
+ if clens := res.Header["Content-Length"]; len(clens) == 1 {
+ if cl, err := strconv.ParseUint(clens[0], 10, 63); err == nil {
+ res.ContentLength = int64(cl)
+ } else {
// TODO: care? unlike http/1, it won't mess up our framing, so it's
// more safe smuggling-wise to ignore.
}
+ } else if len(clens) > 1 {
+ // TODO: care? unlike http/1, it won't mess up our framing, so it's
+ // more safe smuggling-wise to ignore.
+ } else if f.StreamEnded() && !cs.isHead {
+ res.ContentLength = 0
}
- if streamEnded || isHead {
+ if cs.isHead {
res.Body = noBody
return res, nil
}
+ if f.StreamEnded() {
+ if res.ContentLength > 0 {
+ res.Body = missingBody{}
+ } else {
+ res.Body = noBody
+ }
+ return res, nil
+ }
+
cs.bufPipe.setBuffer(&dataBuffer{expected: res.ContentLength})
cs.bytesRemain = res.ContentLength
res.Body = transportResponseBody{cs}
- if cs.requestedGzip && res.Header.Get("Content-Encoding") == "gzip" {
+ if cs.requestedGzip && asciiEqualFold(res.Header.Get("Content-Encoding"), "gzip") {
res.Header.Del("Content-Encoding")
res.Header.Del("Content-Length")
res.ContentLength = -1
@@ -2227,8 +2369,7 @@ func (rl *clientConnReadLoop) processTrailers(cs *clientStream, f *MetaHeadersFr
}
// transportResponseBody is the concrete type of Transport.RoundTrip's
-// Response.Body. It is an io.ReadCloser. On Read, it reads from cs.body.
-// On Close it sends RST_STREAM if EOF wasn't already seen.
+// Response.Body. It is an io.ReadCloser.
type transportResponseBody struct {
cs *clientStream
}
@@ -2311,6 +2452,8 @@ func (b transportResponseBody) Close() error {
}
cc.mu.Unlock()
+ // TODO(dneil): Acquiring this mutex can block indefinitely.
+ // Move flow control return to a goroutine?
cc.wmu.Lock()
// Return connection-level flow control.
if unread > 0 {
@@ -2325,9 +2468,12 @@ func (b transportResponseBody) Close() error {
select {
case <-cs.donec:
- case <-cs.req.Context().Done():
- return cs.req.Context().Err()
- case <-cs.req.Cancel:
+ case <-cs.ctx.Done():
+ // See golang/go#49366: The net/http package can cancel the
+ // request context after the response body is fully read.
+ // Don't treat this as an error.
+ return nil
+ case <-cs.reqCancel:
return errRequestCanceled
}
return nil
@@ -2381,7 +2527,7 @@ func (rl *clientConnReadLoop) processData(f *DataFrame) error {
return nil
}
if f.Length > 0 {
- if cs.req.Method == "HEAD" && len(data) > 0 {
+ if cs.isHead && len(data) > 0 {
cc.logf("protocol error: received DATA on a HEAD request")
rl.endStreamError(cs, StreamError{
StreamID: f.StreamID,
@@ -2450,6 +2596,12 @@ func (rl *clientConnReadLoop) endStream(cs *clientStream) {
// server.go's (*stream).endStream method.
if !cs.readClosed {
cs.readClosed = true
+ // Close cs.bufPipe and cs.peerClosed with cc.mu held to avoid a
+ // race condition: The caller can read io.EOF from Response.Body
+ // and close the body before we close cs.peerClosed, causing
+ // cleanupWriteRequest to send a RST_STREAM.
+ rl.cc.mu.Lock()
+ defer rl.cc.mu.Unlock()
cs.bufPipe.closeWithErrorAndCode(io.EOF, cs.copyTrailers)
close(cs.peerClosed)
}
@@ -2731,6 +2883,11 @@ func (t *Transport) logf(format string, args ...interface{}) {
var noBody io.ReadCloser = ioutil.NopCloser(bytes.NewReader(nil))
+type missingBody struct{}
+
+func (missingBody) Close() error { return nil }
+func (missingBody) Read([]byte) (int, error) { return 0, io.ErrUnexpectedEOF }
+
func strSliceContains(ss []string, s string) bool {
for _, v := range ss {
if v == s {