diff options
Diffstat (limited to 'vendor/github.com/klauspost/compress/s2/decode.go')
-rw-r--r-- | vendor/github.com/klauspost/compress/s2/decode.go | 229 |
1 files changed, 213 insertions, 16 deletions
diff --git a/vendor/github.com/klauspost/compress/s2/decode.go b/vendor/github.com/klauspost/compress/s2/decode.go index d0ae5304..9e7fce88 100644 --- a/vendor/github.com/klauspost/compress/s2/decode.go +++ b/vendor/github.com/klauspost/compress/s2/decode.go @@ -8,7 +8,9 @@ package s2 import ( "encoding/binary" "errors" + "fmt" "io" + "io/ioutil" ) var ( @@ -22,6 +24,16 @@ var ( ErrUnsupported = errors.New("s2: unsupported input") ) +// ErrCantSeek is returned if the stream cannot be seeked. +type ErrCantSeek struct { + Reason string +} + +// Error returns the error as string. +func (e ErrCantSeek) Error() string { + return fmt.Sprintf("s2: Can't seek because %s", e.Reason) +} + // DecodedLen returns the length of the decoded block. func DecodedLen(src []byte) (int, error) { v, _, err := decodedLen(src) @@ -88,6 +100,7 @@ func NewReader(r io.Reader, opts ...ReaderOption) *Reader { } else { nr.buf = make([]byte, MaxEncodedLen(defaultBlockSize)+checksumSize) } + nr.readHeader = nr.ignoreStreamID nr.paramsOK = true return &nr } @@ -131,12 +144,41 @@ func ReaderAllocBlock(blockSize int) ReaderOption { } } +// ReaderIgnoreStreamIdentifier will make the reader skip the expected +// stream identifier at the beginning of the stream. +// This can be used when serving a stream that has been forwarded to a specific point. +func ReaderIgnoreStreamIdentifier() ReaderOption { + return func(r *Reader) error { + r.ignoreStreamID = true + return nil + } +} + +// ReaderSkippableCB will register a callback for chuncks with the specified ID. +// ID must be a Reserved skippable chunks ID, 0x80-0xfd (inclusive). +// For each chunk with the ID, the callback is called with the content. +// Any returned non-nil error will abort decompression. +// Only one callback per ID is supported, latest sent will be used. +func ReaderSkippableCB(id uint8, fn func(r io.Reader) error) ReaderOption { + return func(r *Reader) error { + if id < 0x80 || id > 0xfd { + return fmt.Errorf("ReaderSkippableCB: Invalid id provided, must be 0x80-0xfd (inclusive)") + } + r.skippableCB[id] = fn + return nil + } +} + // Reader is an io.Reader that can read Snappy-compressed bytes. type Reader struct { - r io.Reader - err error - decoded []byte - buf []byte + r io.Reader + err error + decoded []byte + buf []byte + skippableCB [0x80]func(r io.Reader) error + blockStart int64 // Uncompressed offset at start of current. + index *Index + // decoded[i:j] contains decoded bytes that have not yet been passed on. i, j int // maximum block size allowed. @@ -144,10 +186,11 @@ type Reader struct { // maximum expected buffer size. maxBufSize int // alloc a buffer this size if > 0. - lazyBuf int - readHeader bool - paramsOK bool - snappyFrame bool + lazyBuf int + readHeader bool + paramsOK bool + snappyFrame bool + ignoreStreamID bool } // ensureBufferSize will ensure that the buffer can take at least n bytes. @@ -172,11 +215,12 @@ func (r *Reader) Reset(reader io.Reader) { if !r.paramsOK { return } + r.index = nil r.r = reader r.err = nil r.i = 0 r.j = 0 - r.readHeader = false + r.readHeader = r.ignoreStreamID } func (r *Reader) readFull(p []byte, allowEOF bool) (ok bool) { @@ -189,11 +233,24 @@ func (r *Reader) readFull(p []byte, allowEOF bool) (ok bool) { return true } -// skipN will skip n bytes. +// skippable will skip n bytes. // If the supplied reader supports seeking that is used. // tmp is used as a temporary buffer for reading. // The supplied slice does not need to be the size of the read. -func (r *Reader) skipN(tmp []byte, n int, allowEOF bool) (ok bool) { +func (r *Reader) skippable(tmp []byte, n int, allowEOF bool, id uint8) (ok bool) { + if id < 0x80 { + r.err = fmt.Errorf("interbal error: skippable id < 0x80") + return false + } + if fn := r.skippableCB[id-0x80]; fn != nil { + rd := io.LimitReader(r.r, int64(n)) + r.err = fn(rd) + if r.err != nil { + return false + } + _, r.err = io.CopyBuffer(ioutil.Discard, rd, tmp) + return r.err == nil + } if rs, ok := r.r.(io.ReadSeeker); ok { _, err := rs.Seek(int64(n), io.SeekCurrent) if err == nil { @@ -247,6 +304,7 @@ func (r *Reader) Read(p []byte) (int, error) { // https://github.com/google/snappy/blob/master/framing_format.txt switch chunkType { case chunkTypeCompressedData: + r.blockStart += int64(r.j) // Section 4.2. Compressed data (chunk type 0x00). if chunkLen < checksumSize { r.err = ErrCorrupt @@ -294,6 +352,7 @@ func (r *Reader) Read(p []byte) (int, error) { continue case chunkTypeUncompressedData: + r.blockStart += int64(r.j) // Section 4.3. Uncompressed data (chunk type 0x01). if chunkLen < checksumSize { r.err = ErrCorrupt @@ -357,17 +416,20 @@ func (r *Reader) Read(p []byte) (int, error) { if chunkType <= 0x7f { // Section 4.5. Reserved unskippable chunks (chunk types 0x02-0x7f). + // fmt.Printf("ERR chunktype: 0x%x\n", chunkType) r.err = ErrUnsupported return 0, r.err } // Section 4.4 Padding (chunk type 0xfe). // Section 4.6. Reserved skippable chunks (chunk types 0x80-0xfd). - if chunkLen > maxBlockSize { + if chunkLen > maxChunkSize { + // fmt.Printf("ERR chunkLen: 0x%x\n", chunkLen) r.err = ErrUnsupported return 0, r.err } - if !r.skipN(r.buf, chunkLen, false) { + // fmt.Printf("skippable: ID: 0x%x, len: 0x%x\n", chunkType, chunkLen) + if !r.skippable(r.buf, chunkLen, false, chunkType) { return 0, r.err } } @@ -396,7 +458,7 @@ func (r *Reader) Skip(n int64) error { return nil } n -= int64(r.j - r.i) - r.i, r.j = 0, 0 + r.i = r.j } // Buffer empty; read blocks until we have content. @@ -420,6 +482,7 @@ func (r *Reader) Skip(n int64) error { // https://github.com/google/snappy/blob/master/framing_format.txt switch chunkType { case chunkTypeCompressedData: + r.blockStart += int64(r.j) // Section 4.2. Compressed data (chunk type 0x00). if chunkLen < checksumSize { r.err = ErrCorrupt @@ -468,6 +531,7 @@ func (r *Reader) Skip(n int64) error { r.i, r.j = 0, dLen continue case chunkTypeUncompressedData: + r.blockStart += int64(r.j) // Section 4.3. Uncompressed data (chunk type 0x01). if chunkLen < checksumSize { r.err = ErrCorrupt @@ -528,19 +592,138 @@ func (r *Reader) Skip(n int64) error { r.err = ErrUnsupported return r.err } - if chunkLen > maxBlockSize { + if chunkLen > maxChunkSize { r.err = ErrUnsupported return r.err } // Section 4.4 Padding (chunk type 0xfe). // Section 4.6. Reserved skippable chunks (chunk types 0x80-0xfd). - if !r.skipN(r.buf, chunkLen, false) { + if !r.skippable(r.buf, chunkLen, false, chunkType) { return r.err } } return nil } +// ReadSeeker provides random or forward seeking in compressed content. +// See Reader.ReadSeeker +type ReadSeeker struct { + *Reader +} + +// ReadSeeker will return an io.ReadSeeker compatible version of the reader. +// If 'random' is specified the returned io.Seeker can be used for +// random seeking, otherwise only forward seeking is supported. +// Enabling random seeking requires the original input to support +// the io.Seeker interface. +// A custom index can be specified which will be used if supplied. +// When using a custom index, it will not be read from the input stream. +// The returned ReadSeeker contains a shallow reference to the existing Reader, +// meaning changes performed to one is reflected in the other. +func (r *Reader) ReadSeeker(random bool, index []byte) (*ReadSeeker, error) { + // Read index if provided. + if len(index) != 0 { + if r.index == nil { + r.index = &Index{} + } + if _, err := r.index.Load(index); err != nil { + return nil, ErrCantSeek{Reason: "loading index returned: " + err.Error()} + } + } + + // Check if input is seekable + rs, ok := r.r.(io.ReadSeeker) + if !ok { + if !random { + return &ReadSeeker{Reader: r}, nil + } + return nil, ErrCantSeek{Reason: "input stream isn't seekable"} + } + + if r.index != nil { + // Seekable and index, ok... + return &ReadSeeker{Reader: r}, nil + } + + // Load from stream. + r.index = &Index{} + + // Read current position. + pos, err := rs.Seek(0, io.SeekCurrent) + if err != nil { + return nil, ErrCantSeek{Reason: "seeking input returned: " + err.Error()} + } + err = r.index.LoadStream(rs) + if err != nil { + if err == ErrUnsupported { + return nil, ErrCantSeek{Reason: "input stream does not contain an index"} + } + return nil, ErrCantSeek{Reason: "reading index returned: " + err.Error()} + } + + // reset position. + _, err = rs.Seek(pos, io.SeekStart) + if err != nil { + return nil, ErrCantSeek{Reason: "seeking input returned: " + err.Error()} + } + return &ReadSeeker{Reader: r}, nil +} + +// Seek allows seeking in compressed data. +func (r *ReadSeeker) Seek(offset int64, whence int) (int64, error) { + if r.err != nil { + return 0, r.err + } + if offset == 0 && whence == io.SeekCurrent { + return r.blockStart + int64(r.i), nil + } + if !r.readHeader { + // Make sure we read the header. + _, r.err = r.Read([]byte{}) + } + rs, ok := r.r.(io.ReadSeeker) + if r.index == nil || !ok { + if whence == io.SeekCurrent && offset >= 0 { + err := r.Skip(offset) + return r.blockStart + int64(r.i), err + } + if whence == io.SeekStart && offset >= r.blockStart+int64(r.i) { + err := r.Skip(offset - r.blockStart - int64(r.i)) + return r.blockStart + int64(r.i), err + } + return 0, ErrUnsupported + + } + + switch whence { + case io.SeekCurrent: + offset += r.blockStart + int64(r.i) + case io.SeekEnd: + offset = -offset + } + c, u, err := r.index.Find(offset) + if err != nil { + return r.blockStart + int64(r.i), err + } + + // Seek to next block + _, err = rs.Seek(c, io.SeekStart) + if err != nil { + return 0, err + } + + if offset < 0 { + offset = r.index.TotalUncompressed + offset + } + + r.i = r.j // Remove rest of current block. + if u < offset { + // Forward inside block + return offset, r.Skip(offset - u) + } + return offset, nil +} + // ReadByte satisfies the io.ByteReader interface. func (r *Reader) ReadByte() (byte, error) { if r.err != nil { @@ -563,3 +746,17 @@ func (r *Reader) ReadByte() (byte, error) { } return 0, io.ErrNoProgress } + +// SkippableCB will register a callback for chunks with the specified ID. +// ID must be a Reserved skippable chunks ID, 0x80-0xfe (inclusive). +// For each chunk with the ID, the callback is called with the content. +// Any returned non-nil error will abort decompression. +// Only one callback per ID is supported, latest sent will be used. +// Sending a nil function will disable previous callbacks. +func (r *Reader) SkippableCB(id uint8, fn func(r io.Reader) error) error { + if id < 0x80 || id > chunkTypePadding { + return fmt.Errorf("ReaderSkippableCB: Invalid id provided, must be 0x80-0xfe (inclusive)") + } + r.skippableCB[id] = fn + return nil +} |