summaryrefslogtreecommitdiffstats
path: root/vendor/github.com/klauspost/compress/s2/decode.go
diff options
context:
space:
mode:
Diffstat (limited to 'vendor/github.com/klauspost/compress/s2/decode.go')
-rw-r--r--vendor/github.com/klauspost/compress/s2/decode.go229
1 files changed, 213 insertions, 16 deletions
diff --git a/vendor/github.com/klauspost/compress/s2/decode.go b/vendor/github.com/klauspost/compress/s2/decode.go
index d0ae5304..9e7fce88 100644
--- a/vendor/github.com/klauspost/compress/s2/decode.go
+++ b/vendor/github.com/klauspost/compress/s2/decode.go
@@ -8,7 +8,9 @@ package s2
import (
"encoding/binary"
"errors"
+ "fmt"
"io"
+ "io/ioutil"
)
var (
@@ -22,6 +24,16 @@ var (
ErrUnsupported = errors.New("s2: unsupported input")
)
+// ErrCantSeek is returned if the stream cannot be seeked.
+type ErrCantSeek struct {
+ Reason string
+}
+
+// Error returns the error as string.
+func (e ErrCantSeek) Error() string {
+ return fmt.Sprintf("s2: Can't seek because %s", e.Reason)
+}
+
// DecodedLen returns the length of the decoded block.
func DecodedLen(src []byte) (int, error) {
v, _, err := decodedLen(src)
@@ -88,6 +100,7 @@ func NewReader(r io.Reader, opts ...ReaderOption) *Reader {
} else {
nr.buf = make([]byte, MaxEncodedLen(defaultBlockSize)+checksumSize)
}
+ nr.readHeader = nr.ignoreStreamID
nr.paramsOK = true
return &nr
}
@@ -131,12 +144,41 @@ func ReaderAllocBlock(blockSize int) ReaderOption {
}
}
+// ReaderIgnoreStreamIdentifier will make the reader skip the expected
+// stream identifier at the beginning of the stream.
+// This can be used when serving a stream that has been forwarded to a specific point.
+func ReaderIgnoreStreamIdentifier() ReaderOption {
+ return func(r *Reader) error {
+ r.ignoreStreamID = true
+ return nil
+ }
+}
+
+// ReaderSkippableCB will register a callback for chuncks with the specified ID.
+// ID must be a Reserved skippable chunks ID, 0x80-0xfd (inclusive).
+// For each chunk with the ID, the callback is called with the content.
+// Any returned non-nil error will abort decompression.
+// Only one callback per ID is supported, latest sent will be used.
+func ReaderSkippableCB(id uint8, fn func(r io.Reader) error) ReaderOption {
+ return func(r *Reader) error {
+ if id < 0x80 || id > 0xfd {
+ return fmt.Errorf("ReaderSkippableCB: Invalid id provided, must be 0x80-0xfd (inclusive)")
+ }
+ r.skippableCB[id] = fn
+ return nil
+ }
+}
+
// Reader is an io.Reader that can read Snappy-compressed bytes.
type Reader struct {
- r io.Reader
- err error
- decoded []byte
- buf []byte
+ r io.Reader
+ err error
+ decoded []byte
+ buf []byte
+ skippableCB [0x80]func(r io.Reader) error
+ blockStart int64 // Uncompressed offset at start of current.
+ index *Index
+
// decoded[i:j] contains decoded bytes that have not yet been passed on.
i, j int
// maximum block size allowed.
@@ -144,10 +186,11 @@ type Reader struct {
// maximum expected buffer size.
maxBufSize int
// alloc a buffer this size if > 0.
- lazyBuf int
- readHeader bool
- paramsOK bool
- snappyFrame bool
+ lazyBuf int
+ readHeader bool
+ paramsOK bool
+ snappyFrame bool
+ ignoreStreamID bool
}
// ensureBufferSize will ensure that the buffer can take at least n bytes.
@@ -172,11 +215,12 @@ func (r *Reader) Reset(reader io.Reader) {
if !r.paramsOK {
return
}
+ r.index = nil
r.r = reader
r.err = nil
r.i = 0
r.j = 0
- r.readHeader = false
+ r.readHeader = r.ignoreStreamID
}
func (r *Reader) readFull(p []byte, allowEOF bool) (ok bool) {
@@ -189,11 +233,24 @@ func (r *Reader) readFull(p []byte, allowEOF bool) (ok bool) {
return true
}
-// skipN will skip n bytes.
+// skippable will skip n bytes.
// If the supplied reader supports seeking that is used.
// tmp is used as a temporary buffer for reading.
// The supplied slice does not need to be the size of the read.
-func (r *Reader) skipN(tmp []byte, n int, allowEOF bool) (ok bool) {
+func (r *Reader) skippable(tmp []byte, n int, allowEOF bool, id uint8) (ok bool) {
+ if id < 0x80 {
+ r.err = fmt.Errorf("interbal error: skippable id < 0x80")
+ return false
+ }
+ if fn := r.skippableCB[id-0x80]; fn != nil {
+ rd := io.LimitReader(r.r, int64(n))
+ r.err = fn(rd)
+ if r.err != nil {
+ return false
+ }
+ _, r.err = io.CopyBuffer(ioutil.Discard, rd, tmp)
+ return r.err == nil
+ }
if rs, ok := r.r.(io.ReadSeeker); ok {
_, err := rs.Seek(int64(n), io.SeekCurrent)
if err == nil {
@@ -247,6 +304,7 @@ func (r *Reader) Read(p []byte) (int, error) {
// https://github.com/google/snappy/blob/master/framing_format.txt
switch chunkType {
case chunkTypeCompressedData:
+ r.blockStart += int64(r.j)
// Section 4.2. Compressed data (chunk type 0x00).
if chunkLen < checksumSize {
r.err = ErrCorrupt
@@ -294,6 +352,7 @@ func (r *Reader) Read(p []byte) (int, error) {
continue
case chunkTypeUncompressedData:
+ r.blockStart += int64(r.j)
// Section 4.3. Uncompressed data (chunk type 0x01).
if chunkLen < checksumSize {
r.err = ErrCorrupt
@@ -357,17 +416,20 @@ func (r *Reader) Read(p []byte) (int, error) {
if chunkType <= 0x7f {
// Section 4.5. Reserved unskippable chunks (chunk types 0x02-0x7f).
+ // fmt.Printf("ERR chunktype: 0x%x\n", chunkType)
r.err = ErrUnsupported
return 0, r.err
}
// Section 4.4 Padding (chunk type 0xfe).
// Section 4.6. Reserved skippable chunks (chunk types 0x80-0xfd).
- if chunkLen > maxBlockSize {
+ if chunkLen > maxChunkSize {
+ // fmt.Printf("ERR chunkLen: 0x%x\n", chunkLen)
r.err = ErrUnsupported
return 0, r.err
}
- if !r.skipN(r.buf, chunkLen, false) {
+ // fmt.Printf("skippable: ID: 0x%x, len: 0x%x\n", chunkType, chunkLen)
+ if !r.skippable(r.buf, chunkLen, false, chunkType) {
return 0, r.err
}
}
@@ -396,7 +458,7 @@ func (r *Reader) Skip(n int64) error {
return nil
}
n -= int64(r.j - r.i)
- r.i, r.j = 0, 0
+ r.i = r.j
}
// Buffer empty; read blocks until we have content.
@@ -420,6 +482,7 @@ func (r *Reader) Skip(n int64) error {
// https://github.com/google/snappy/blob/master/framing_format.txt
switch chunkType {
case chunkTypeCompressedData:
+ r.blockStart += int64(r.j)
// Section 4.2. Compressed data (chunk type 0x00).
if chunkLen < checksumSize {
r.err = ErrCorrupt
@@ -468,6 +531,7 @@ func (r *Reader) Skip(n int64) error {
r.i, r.j = 0, dLen
continue
case chunkTypeUncompressedData:
+ r.blockStart += int64(r.j)
// Section 4.3. Uncompressed data (chunk type 0x01).
if chunkLen < checksumSize {
r.err = ErrCorrupt
@@ -528,19 +592,138 @@ func (r *Reader) Skip(n int64) error {
r.err = ErrUnsupported
return r.err
}
- if chunkLen > maxBlockSize {
+ if chunkLen > maxChunkSize {
r.err = ErrUnsupported
return r.err
}
// Section 4.4 Padding (chunk type 0xfe).
// Section 4.6. Reserved skippable chunks (chunk types 0x80-0xfd).
- if !r.skipN(r.buf, chunkLen, false) {
+ if !r.skippable(r.buf, chunkLen, false, chunkType) {
return r.err
}
}
return nil
}
+// ReadSeeker provides random or forward seeking in compressed content.
+// See Reader.ReadSeeker
+type ReadSeeker struct {
+ *Reader
+}
+
+// ReadSeeker will return an io.ReadSeeker compatible version of the reader.
+// If 'random' is specified the returned io.Seeker can be used for
+// random seeking, otherwise only forward seeking is supported.
+// Enabling random seeking requires the original input to support
+// the io.Seeker interface.
+// A custom index can be specified which will be used if supplied.
+// When using a custom index, it will not be read from the input stream.
+// The returned ReadSeeker contains a shallow reference to the existing Reader,
+// meaning changes performed to one is reflected in the other.
+func (r *Reader) ReadSeeker(random bool, index []byte) (*ReadSeeker, error) {
+ // Read index if provided.
+ if len(index) != 0 {
+ if r.index == nil {
+ r.index = &Index{}
+ }
+ if _, err := r.index.Load(index); err != nil {
+ return nil, ErrCantSeek{Reason: "loading index returned: " + err.Error()}
+ }
+ }
+
+ // Check if input is seekable
+ rs, ok := r.r.(io.ReadSeeker)
+ if !ok {
+ if !random {
+ return &ReadSeeker{Reader: r}, nil
+ }
+ return nil, ErrCantSeek{Reason: "input stream isn't seekable"}
+ }
+
+ if r.index != nil {
+ // Seekable and index, ok...
+ return &ReadSeeker{Reader: r}, nil
+ }
+
+ // Load from stream.
+ r.index = &Index{}
+
+ // Read current position.
+ pos, err := rs.Seek(0, io.SeekCurrent)
+ if err != nil {
+ return nil, ErrCantSeek{Reason: "seeking input returned: " + err.Error()}
+ }
+ err = r.index.LoadStream(rs)
+ if err != nil {
+ if err == ErrUnsupported {
+ return nil, ErrCantSeek{Reason: "input stream does not contain an index"}
+ }
+ return nil, ErrCantSeek{Reason: "reading index returned: " + err.Error()}
+ }
+
+ // reset position.
+ _, err = rs.Seek(pos, io.SeekStart)
+ if err != nil {
+ return nil, ErrCantSeek{Reason: "seeking input returned: " + err.Error()}
+ }
+ return &ReadSeeker{Reader: r}, nil
+}
+
+// Seek allows seeking in compressed data.
+func (r *ReadSeeker) Seek(offset int64, whence int) (int64, error) {
+ if r.err != nil {
+ return 0, r.err
+ }
+ if offset == 0 && whence == io.SeekCurrent {
+ return r.blockStart + int64(r.i), nil
+ }
+ if !r.readHeader {
+ // Make sure we read the header.
+ _, r.err = r.Read([]byte{})
+ }
+ rs, ok := r.r.(io.ReadSeeker)
+ if r.index == nil || !ok {
+ if whence == io.SeekCurrent && offset >= 0 {
+ err := r.Skip(offset)
+ return r.blockStart + int64(r.i), err
+ }
+ if whence == io.SeekStart && offset >= r.blockStart+int64(r.i) {
+ err := r.Skip(offset - r.blockStart - int64(r.i))
+ return r.blockStart + int64(r.i), err
+ }
+ return 0, ErrUnsupported
+
+ }
+
+ switch whence {
+ case io.SeekCurrent:
+ offset += r.blockStart + int64(r.i)
+ case io.SeekEnd:
+ offset = -offset
+ }
+ c, u, err := r.index.Find(offset)
+ if err != nil {
+ return r.blockStart + int64(r.i), err
+ }
+
+ // Seek to next block
+ _, err = rs.Seek(c, io.SeekStart)
+ if err != nil {
+ return 0, err
+ }
+
+ if offset < 0 {
+ offset = r.index.TotalUncompressed + offset
+ }
+
+ r.i = r.j // Remove rest of current block.
+ if u < offset {
+ // Forward inside block
+ return offset, r.Skip(offset - u)
+ }
+ return offset, nil
+}
+
// ReadByte satisfies the io.ByteReader interface.
func (r *Reader) ReadByte() (byte, error) {
if r.err != nil {
@@ -563,3 +746,17 @@ func (r *Reader) ReadByte() (byte, error) {
}
return 0, io.ErrNoProgress
}
+
+// SkippableCB will register a callback for chunks with the specified ID.
+// ID must be a Reserved skippable chunks ID, 0x80-0xfe (inclusive).
+// For each chunk with the ID, the callback is called with the content.
+// Any returned non-nil error will abort decompression.
+// Only one callback per ID is supported, latest sent will be used.
+// Sending a nil function will disable previous callbacks.
+func (r *Reader) SkippableCB(id uint8, fn func(r io.Reader) error) error {
+ if id < 0x80 || id > chunkTypePadding {
+ return fmt.Errorf("ReaderSkippableCB: Invalid id provided, must be 0x80-0xfe (inclusive)")
+ }
+ r.skippableCB[id] = fn
+ return nil
+}