summaryrefslogtreecommitdiffstats
path: root/vendor/github.com/klauspost/compress/s2
diff options
context:
space:
mode:
Diffstat (limited to 'vendor/github.com/klauspost/compress/s2')
-rw-r--r--vendor/github.com/klauspost/compress/s2/README.md256
-rw-r--r--vendor/github.com/klauspost/compress/s2/decode.go229
-rw-r--r--vendor/github.com/klauspost/compress/s2/encode.go241
-rw-r--r--vendor/github.com/klauspost/compress/s2/index.go525
-rw-r--r--vendor/github.com/klauspost/compress/s2/s2.go4
5 files changed, 1188 insertions, 67 deletions
diff --git a/vendor/github.com/klauspost/compress/s2/README.md b/vendor/github.com/klauspost/compress/s2/README.md
index 81fad652..e6716aea 100644
--- a/vendor/github.com/klauspost/compress/s2/README.md
+++ b/vendor/github.com/klauspost/compress/s2/README.md
@@ -20,6 +20,7 @@ This is important, so you don't have to worry about spending CPU cycles on alrea
* Concurrent stream compression
* Faster decompression, even for Snappy compatible content
* Ability to quickly skip forward in compressed stream
+* Random seeking with indexes
* Compatible with reading Snappy compressed content
* Smaller block size overhead on incompressible blocks
* Block concatenation
@@ -29,8 +30,8 @@ This is important, so you don't have to worry about spending CPU cycles on alrea
## Drawbacks over Snappy
-* Not optimized for 32 bit systems.
-* Streams use slightly more memory due to larger blocks and concurrency (configurable).
+* Not optimized for 32 bit systems
+* Streams use slightly more memory due to larger blocks and concurrency (configurable)
# Usage
@@ -141,7 +142,7 @@ Binaries can be downloaded on the [Releases Page](https://github.com/klauspost/c
Installing then requires Go to be installed. To install them, use:
-`go install github.com/klauspost/compress/s2/cmd/s2c && go install github.com/klauspost/compress/s2/cmd/s2d`
+`go install github.com/klauspost/compress/s2/cmd/s2c@latest && go install github.com/klauspost/compress/s2/cmd/s2d@latest`
To build binaries to the current folder use:
@@ -176,6 +177,8 @@ Options:
Compress faster, but with a minor compression loss
-help
Display help
+ -index
+ Add seek index (default true)
-o string
Write output to another file. Single input file only
-pad string
@@ -217,11 +220,15 @@ Options:
Display help
-o string
Write output to another file. Single input file only
- -q Don't write any output to terminal, except errors
+ -offset string
+ Start at offset. Examples: 92, 64K, 256K, 1M, 4M. Requires Index
+ -q Don't write any output to terminal, except errors
-rm
- Delete source file(s) after successful decompression
+ Delete source file(s) after successful decompression
-safe
- Do not overwrite output files
+ Do not overwrite output files
+ -tail string
+ Return last of compressed file. Examples: 92, 64K, 256K, 1M, 4M. Requires Index
-verify
Verify files, but do not write output
```
@@ -633,12 +640,12 @@ Compression and speed is typically a bit better `MaxEncodedLen` is also smaller
Comparison of [`webdevdata.org-2015-01-07-subset`](https://files.klauspost.com/compress/webdevdata.org-2015-01-07-4GB-subset.7z),
53927 files, total input size: 4,014,735,833 bytes. amd64, single goroutine used:
-| Encoder | Size | MB/s | Reduction |
-|-----------------------|------------|--------|------------
-| snappy.Encode | 1128706759 | 725.59 | 71.89% |
-| s2.EncodeSnappy | 1093823291 | 899.16 | 72.75% |
-| s2.EncodeSnappyBetter | 1001158548 | 578.49 | 75.06% |
-| s2.EncodeSnappyBest | 944507998 | 66.00 | 76.47% |
+| Encoder | Size | MB/s | Reduction |
+|-----------------------|------------|------------|------------
+| snappy.Encode | 1128706759 | 725.59 | 71.89% |
+| s2.EncodeSnappy | 1093823291 | **899.16** | 72.75% |
+| s2.EncodeSnappyBetter | 1001158548 | 578.49 | 75.06% |
+| s2.EncodeSnappyBest | 944507998 | 66.00 | **76.47%**|
## Streams
@@ -649,11 +656,11 @@ Comparison of different streams, AMD Ryzen 3950x, 16 cores. Size and throughput:
| File | snappy.NewWriter | S2 Snappy | S2 Snappy, Better | S2 Snappy, Best |
|-----------------------------|--------------------------|---------------------------|--------------------------|-------------------------|
-| nyc-taxi-data-10M.csv | 1316042016 - 517.54MB/s | 1307003093 - 8406.29MB/s | 1174534014 - 4984.35MB/s | 1115904679 - 177.81MB/s |
-| enwik10 | 5088294643 - 433.45MB/s | 5175840939 - 8454.52MB/s | 4560784526 - 4403.10MB/s | 4340299103 - 159.71MB/s |
-| 10gb.tar | 6056946612 - 703.25MB/s | 6208571995 - 9035.75MB/s | 5741646126 - 2402.08MB/s | 5548973895 - 171.17MB/s |
-| github-june-2days-2019.json | 1525176492 - 908.11MB/s | 1476519054 - 12625.93MB/s | 1400547532 - 6163.61MB/s | 1321887137 - 200.71MB/s |
-| consensus.db.10gb | 5412897703 - 1054.38MB/s | 5354073487 - 12634.82MB/s | 5335069899 - 2472.23MB/s | 5201000954 - 166.32MB/s |
+| nyc-taxi-data-10M.csv | 1316042016 - 539.47MB/s | 1307003093 - 10132.73MB/s | 1174534014 - 5002.44MB/s | 1115904679 - 177.97MB/s |
+| enwik10 (xml) | 5088294643 - 451.13MB/s | 5175840939 - 9440.69MB/s | 4560784526 - 4487.21MB/s | 4340299103 - 158.92MB/s |
+| 10gb.tar (mixed) | 6056946612 - 729.73MB/s | 6208571995 - 9978.05MB/s | 5741646126 - 4919.98MB/s | 5548973895 - 180.44MB/s |
+| github-june-2days-2019.json | 1525176492 - 933.00MB/s | 1476519054 - 13150.12MB/s | 1400547532 - 5803.40MB/s | 1321887137 - 204.29MB/s |
+| consensus.db.10gb (db) | 5412897703 - 1102.14MB/s | 5354073487 - 13562.91MB/s | 5335069899 - 5294.73MB/s | 5201000954 - 175.72MB/s |
# Decompression
@@ -679,7 +686,220 @@ The 10 byte 'stream identifier' of the second stream can optionally be stripped,
Blocks can be concatenated using the `ConcatBlocks` function.
-Snappy blocks/streams can safely be concatenated with S2 blocks and streams.
+Snappy blocks/streams can safely be concatenated with S2 blocks and streams.
+Streams with indexes (see below) will currently not work on concatenated streams.
+
+# Stream Seek Index
+
+S2 and Snappy streams can have indexes. These indexes will allow random seeking within the compressed data.
+
+The index can either be appended to the stream as a skippable block or returned for separate storage.
+
+When the index is appended to a stream it will be skipped by regular decoders,
+so the output remains compatible with other decoders.
+
+## Creating an Index
+
+To automatically add an index to a stream, add `WriterAddIndex()` option to your writer.
+Then the index will be added to the stream when `Close()` is called.
+
+```
+ // Add Index to stream...
+ enc := s2.NewWriter(w, s2.WriterAddIndex())
+ io.Copy(enc, r)
+ enc.Close()
+```
+
+If you want to store the index separately, you can use `CloseIndex()` instead of the regular `Close()`.
+This will return the index. Note that `CloseIndex()` should only be called once, and you shouldn't call `Close()`.
+
+```
+ // Get index for separate storage...
+ enc := s2.NewWriter(w)
+ io.Copy(enc, r)
+ index, err := enc.CloseIndex()
+```
+
+The `index` can then be used needing to read from the stream.
+This means the index can be used without needing to seek to the end of the stream
+or for manually forwarding streams. See below.
+
+Finally, an existing S2/Snappy stream can be indexed using the `s2.IndexStream(r io.Reader)` function.
+
+## Using Indexes
+
+To use indexes there is a `ReadSeeker(random bool, index []byte) (*ReadSeeker, error)` function available.
+
+Calling ReadSeeker will return an [io.ReadSeeker](https://pkg.go.dev/io#ReadSeeker) compatible version of the reader.
+
+If 'random' is specified the returned io.Seeker can be used for random seeking, otherwise only forward seeking is supported.
+Enabling random seeking requires the original input to support the [io.Seeker](https://pkg.go.dev/io#Seeker) interface.
+
+```
+ dec := s2.NewReader(r)
+ rs, err := dec.ReadSeeker(false, nil)
+ rs.Seek(wantOffset, io.SeekStart)
+```
+
+Get a seeker to seek forward. Since no index is provided, the index is read from the stream.
+This requires that an index was added and that `r` supports the [io.Seeker](https://pkg.go.dev/io#Seeker) interface.
+
+A custom index can be specified which will be used if supplied.
+When using a custom index, it will not be read from the input stream.
+
+```
+ dec := s2.NewReader(r)
+ rs, err := dec.ReadSeeker(false, index)
+ rs.Seek(wantOffset, io.SeekStart)
+```
+
+This will read the index from `index`. Since we specify non-random (forward only) seeking `r` does not have to be an io.Seeker
+
+```
+ dec := s2.NewReader(r)
+ rs, err := dec.ReadSeeker(true, index)
+ rs.Seek(wantOffset, io.SeekStart)
+```
+
+Finally, since we specify that we want to do random seeking `r` must be an io.Seeker.
+
+The returned [ReadSeeker](https://pkg.go.dev/github.com/klauspost/compress/s2#ReadSeeker) contains a shallow reference to the existing Reader,
+meaning changes performed to one is reflected in the other.
+
+To check if a stream contains an index at the end, the `(*Index).LoadStream(rs io.ReadSeeker) error` can be used.
+
+## Manually Forwarding Streams
+
+Indexes can also be read outside the decoder using the [Index](https://pkg.go.dev/github.com/klauspost/compress/s2#Index) type.
+This can be used for parsing indexes, either separate or in streams.
+
+In some cases it may not be possible to serve a seekable stream.
+This can for instance be an HTTP stream, where the Range request
+is sent at the start of the stream.
+
+With a little bit of extra code it is still possible to use indexes
+to forward to specific offset with a single forward skip.
+
+It is possible to load the index manually like this:
+```
+ var index s2.Index
+ _, err = index.Load(idxBytes)
+```
+
+This can be used to figure out how much to offset the compressed stream:
+
+```
+ compressedOffset, uncompressedOffset, err := index.Find(wantOffset)
+```
+
+The `compressedOffset` is the number of bytes that should be skipped
+from the beginning of the compressed file.
+
+The `uncompressedOffset` will then be offset of the uncompressed bytes returned
+when decoding from that position. This will always be <= wantOffset.
+
+When creating a decoder it must be specified that it should *not* expect a stream identifier
+at the beginning of the stream. Assuming the io.Reader `r` has been forwarded to `compressedOffset`
+we create the decoder like this:
+
+```
+ dec := s2.NewReader(r, s2.ReaderIgnoreStreamIdentifier())
+```
+
+We are not completely done. We still need to forward the stream the uncompressed bytes we didn't want.
+This is done using the regular "Skip" function:
+
+```
+ err = dec.Skip(wantOffset - uncompressedOffset)
+```
+
+This will ensure that we are at exactly the offset we want, and reading from `dec` will start at the requested offset.
+
+## Index Format:
+
+Each block is structured as a snappy skippable block, with the chunk ID 0x99.
+
+The block can be read from the front, but contains information so it can be read from the back as well.
+
+Numbers are stored as fixed size little endian values or [zigzag encoded](https://developers.google.com/protocol-buffers/docs/encoding#signed_integers) [base 128 varints](https://developers.google.com/protocol-buffers/docs/encoding),
+with un-encoded value length of 64 bits, unless other limits are specified.
+
+| Content | Format |
+|---------------------------------------------------------------------------|-----------------------------------------------------------------------------------------------------------------------------|
+| ID, `[1]byte` | Always 0x99. |
+| Data Length, `[3]byte` | 3 byte little-endian length of the chunk in bytes, following this. |
+| Header `[6]byte` | Header, must be `[115, 50, 105, 100, 120, 0]` or in text: "s2idx\x00". |
+| UncompressedSize, Varint | Total Uncompressed size. |
+| CompressedSize, Varint | Total Compressed size if known. Should be -1 if unknown. |
+| EstBlockSize, Varint | Block Size, used for guessing uncompressed offsets. Must be >= 0. |
+| Entries, Varint | Number of Entries in index, must be < 65536 and >=0. |
+| HasUncompressedOffsets `byte` | 0 if no uncompressed offsets are present, 1 if present. Other values are invalid. |
+| UncompressedOffsets, [Entries]VarInt | Uncompressed offsets. See below how to decode. |
+| CompressedOffsets, [Entries]VarInt | Compressed offsets. See below how to decode. |
+| Block Size, `[4]byte` | Little Endian total encoded size (including header and trailer). Can be used for searching backwards to start of block. |
+| Trailer `[6]byte` | Trailer, must be `[0, 120, 100, 105, 50, 115]` or in text: "\x00xdi2s". Can be used for identifying block from end of stream. |
+
+For regular streams the uncompressed offsets are fully predictable,
+so `HasUncompressedOffsets` allows to specify that compressed blocks all have
+exactly `EstBlockSize` bytes of uncompressed content.
+
+Entries *must* be in order, starting with the lowest offset,
+and there *must* be no uncompressed offset duplicates.
+Entries *may* point to the start of a skippable block,
+but it is then not allowed to also have an entry for the next block since
+that would give an uncompressed offset duplicate.
+
+There is no requirement for all blocks to be represented in the index.
+In fact there is a maximum of 65536 block entries in an index.
+
+The writer can use any method to reduce the number of entries.
+An implicit block start at 0,0 can be assumed.
+
+### Decoding entries:
+
+```
+// Read Uncompressed entries.
+// Each assumes EstBlockSize delta from previous.
+for each entry {
+ uOff = 0
+ if HasUncompressedOffsets == 1 {
+ uOff = ReadVarInt // Read value from stream
+ }
+
+ // Except for the first entry, use previous values.
+ if entryNum == 0 {
+ entry[entryNum].UncompressedOffset = uOff
+ continue
+ }
+
+ // Uncompressed uses previous offset and adds EstBlockSize
+ entry[entryNum].UncompressedOffset = entry[entryNum-1].UncompressedOffset + EstBlockSize
+}
+
+
+// Guess that the first block will be 50% of uncompressed size.
+// Integer truncating division must be used.
+CompressGuess := EstBlockSize / 2
+
+// Read Compressed entries.
+// Each assumes CompressGuess delta from previous.
+// CompressGuess is adjusted for each value.
+for each entry {
+ cOff = ReadVarInt // Read value from stream
+
+ // Except for the first entry, use previous values.
+ if entryNum == 0 {
+ entry[entryNum].CompressedOffset = cOff
+ continue
+ }
+
+ // Compressed uses previous and our estimate.
+ entry[entryNum].CompressedOffset = entry[entryNum-1].CompressedOffset + CompressGuess
+
+ // Adjust compressed offset for next loop, integer truncating division must be used.
+ CompressGuess += cOff/2
+}
+```
# Format Extensions
diff --git a/vendor/github.com/klauspost/compress/s2/decode.go b/vendor/github.com/klauspost/compress/s2/decode.go
index d0ae5304..9e7fce88 100644
--- a/vendor/github.com/klauspost/compress/s2/decode.go
+++ b/vendor/github.com/klauspost/compress/s2/decode.go
@@ -8,7 +8,9 @@ package s2
import (
"encoding/binary"
"errors"
+ "fmt"
"io"
+ "io/ioutil"
)
var (
@@ -22,6 +24,16 @@ var (
ErrUnsupported = errors.New("s2: unsupported input")
)
+// ErrCantSeek is returned if the stream cannot be seeked.
+type ErrCantSeek struct {
+ Reason string
+}
+
+// Error returns the error as string.
+func (e ErrCantSeek) Error() string {
+ return fmt.Sprintf("s2: Can't seek because %s", e.Reason)
+}
+
// DecodedLen returns the length of the decoded block.
func DecodedLen(src []byte) (int, error) {
v, _, err := decodedLen(src)
@@ -88,6 +100,7 @@ func NewReader(r io.Reader, opts ...ReaderOption) *Reader {
} else {
nr.buf = make([]byte, MaxEncodedLen(defaultBlockSize)+checksumSize)
}
+ nr.readHeader = nr.ignoreStreamID
nr.paramsOK = true
return &nr
}
@@ -131,12 +144,41 @@ func ReaderAllocBlock(blockSize int) ReaderOption {
}
}
+// ReaderIgnoreStreamIdentifier will make the reader skip the expected
+// stream identifier at the beginning of the stream.
+// This can be used when serving a stream that has been forwarded to a specific point.
+func ReaderIgnoreStreamIdentifier() ReaderOption {
+ return func(r *Reader) error {
+ r.ignoreStreamID = true
+ return nil
+ }
+}
+
+// ReaderSkippableCB will register a callback for chuncks with the specified ID.
+// ID must be a Reserved skippable chunks ID, 0x80-0xfd (inclusive).
+// For each chunk with the ID, the callback is called with the content.
+// Any returned non-nil error will abort decompression.
+// Only one callback per ID is supported, latest sent will be used.
+func ReaderSkippableCB(id uint8, fn func(r io.Reader) error) ReaderOption {
+ return func(r *Reader) error {
+ if id < 0x80 || id > 0xfd {
+ return fmt.Errorf("ReaderSkippableCB: Invalid id provided, must be 0x80-0xfd (inclusive)")
+ }
+ r.skippableCB[id] = fn
+ return nil
+ }
+}
+
// Reader is an io.Reader that can read Snappy-compressed bytes.
type Reader struct {
- r io.Reader
- err error
- decoded []byte
- buf []byte
+ r io.Reader
+ err error
+ decoded []byte
+ buf []byte
+ skippableCB [0x80]func(r io.Reader) error
+ blockStart int64 // Uncompressed offset at start of current.
+ index *Index
+
// decoded[i:j] contains decoded bytes that have not yet been passed on.
i, j int
// maximum block size allowed.
@@ -144,10 +186,11 @@ type Reader struct {
// maximum expected buffer size.
maxBufSize int
// alloc a buffer this size if > 0.
- lazyBuf int
- readHeader bool
- paramsOK bool
- snappyFrame bool
+ lazyBuf int
+ readHeader bool
+ paramsOK bool
+ snappyFrame bool
+ ignoreStreamID bool
}
// ensureBufferSize will ensure that the buffer can take at least n bytes.
@@ -172,11 +215,12 @@ func (r *Reader) Reset(reader io.Reader) {
if !r.paramsOK {
return
}
+ r.index = nil
r.r = reader
r.err = nil
r.i = 0
r.j = 0
- r.readHeader = false
+ r.readHeader = r.ignoreStreamID
}
func (r *Reader) readFull(p []byte, allowEOF bool) (ok bool) {
@@ -189,11 +233,24 @@ func (r *Reader) readFull(p []byte, allowEOF bool) (ok bool) {
return true
}
-// skipN will skip n bytes.
+// skippable will skip n bytes.
// If the supplied reader supports seeking that is used.
// tmp is used as a temporary buffer for reading.
// The supplied slice does not need to be the size of the read.
-func (r *Reader) skipN(tmp []byte, n int, allowEOF bool) (ok bool) {
+func (r *Reader) skippable(tmp []byte, n int, allowEOF bool, id uint8) (ok bool) {
+ if id < 0x80 {
+ r.err = fmt.Errorf("interbal error: skippable id < 0x80")
+ return false
+ }
+ if fn := r.skippableCB[id-0x80]; fn != nil {
+ rd := io.LimitReader(r.r, int64(n))
+ r.err = fn(rd)
+ if r.err != nil {
+ return false
+ }
+ _, r.err = io.CopyBuffer(ioutil.Discard, rd, tmp)
+ return r.err == nil
+ }
if rs, ok := r.r.(io.ReadSeeker); ok {
_, err := rs.Seek(int64(n), io.SeekCurrent)
if err == nil {
@@ -247,6 +304,7 @@ func (r *Reader) Read(p []byte) (int, error) {
// https://github.com/google/snappy/blob/master/framing_format.txt
switch chunkType {
case chunkTypeCompressedData:
+ r.blockStart += int64(r.j)
// Section 4.2. Compressed data (chunk type 0x00).
if chunkLen < checksumSize {
r.err = ErrCorrupt
@@ -294,6 +352,7 @@ func (r *Reader) Read(p []byte) (int, error) {
continue
case chunkTypeUncompressedData:
+ r.blockStart += int64(r.j)
// Section 4.3. Uncompressed data (chunk type 0x01).
if chunkLen < checksumSize {
r.err = ErrCorrupt
@@ -357,17 +416,20 @@ func (r *Reader) Read(p []byte) (int, error) {
if chunkType <= 0x7f {
// Section 4.5. Reserved unskippable chunks (chunk types 0x02-0x7f).
+ // fmt.Printf("ERR chunktype: 0x%x\n", chunkType)
r.err = ErrUnsupported
return 0, r.err
}
// Section 4.4 Padding (chunk type 0xfe).
// Section 4.6. Reserved skippable chunks (chunk types 0x80-0xfd).
- if chunkLen > maxBlockSize {
+ if chunkLen > maxChunkSize {
+ // fmt.Printf("ERR chunkLen: 0x%x\n", chunkLen)
r.err = ErrUnsupported
return 0, r.err
}
- if !r.skipN(r.buf, chunkLen, false) {
+ // fmt.Printf("skippable: ID: 0x%x, len: 0x%x\n", chunkType, chunkLen)
+ if !r.skippable(r.buf, chunkLen, false, chunkType) {
return 0, r.err
}
}
@@ -396,7 +458,7 @@ func (r *Reader) Skip(n int64) error {
return nil
}
n -= int64(r.j - r.i)
- r.i, r.j = 0, 0
+ r.i = r.j
}
// Buffer empty; read blocks until we have content.
@@ -420,6 +482,7 @@ func (r *Reader) Skip(n int64) error {
// https://github.com/google/snappy/blob/master/framing_format.txt
switch chunkType {
case chunkTypeCompressedData:
+ r.blockStart += int64(r.j)
// Section 4.2. Compressed data (chunk type 0x00).
if chunkLen < checksumSize {
r.err = ErrCorrupt
@@ -468,6 +531,7 @@ func (r *Reader) Skip(n int64) error {
r.i, r.j = 0, dLen
continue
case chunkTypeUncompressedData:
+ r.blockStart += int64(r.j)
// Section 4.3. Uncompressed data (chunk type 0x01).
if chunkLen < checksumSize {
r.err = ErrCorrupt
@@ -528,19 +592,138 @@ func (r *Reader) Skip(n int64) error {
r.err = ErrUnsupported
return r.err
}
- if chunkLen > maxBlockSize {
+ if chunkLen > maxChunkSize {
r.err = ErrUnsupported
return r.err
}
// Section 4.4 Padding (chunk type 0xfe).
// Section 4.6. Reserved skippable chunks (chunk types 0x80-0xfd).
- if !r.skipN(r.buf, chunkLen, false) {
+ if !r.skippable(r.buf, chunkLen, false, chunkType) {
return r.err
}
}
return nil
}
+// ReadSeeker provides random or forward seeking in compressed content.
+// See Reader.ReadSeeker
+type ReadSeeker struct {
+ *Reader
+}
+
+// ReadSeeker will return an io.ReadSeeker compatible version of the reader.
+// If 'random' is specified the returned io.Seeker can be used for
+// random seeking, otherwise only forward seeking is supported.
+// Enabling random seeking requires the original input to support
+// the io.Seeker interface.
+// A custom index can be specified which will be used if supplied.
+// When using a custom index, it will not be read from the input stream.
+// The returned ReadSeeker contains a shallow reference to the existing Reader,
+// meaning changes performed to one is reflected in the other.
+func (r *Reader) ReadSeeker(random bool, index []byte) (*ReadSeeker, error) {
+ // Read index if provided.
+ if len(index) != 0 {
+ if r.index == nil {
+ r.index = &Index{}
+ }
+ if _, err := r.index.Load(index); err != nil {
+ return nil, ErrCantSeek{Reason: "loading index returned: " + err.Error()}
+ }
+ }
+
+ // Check if input is seekable
+ rs, ok := r.r.(io.ReadSeeker)
+ if !ok {
+ if !random {
+ return &ReadSeeker{Reader: r}, nil
+ }
+ return nil, ErrCantSeek{Reason: "input stream isn't seekable"}
+ }
+
+ if r.index != nil {
+ // Seekable and index, ok...
+ return &ReadSeeker{Reader: r}, nil
+ }
+
+ // Load from stream.
+ r.index = &Index{}
+
+ // Read current position.
+ pos, err := rs.Seek(0, io.SeekCurrent)
+ if err != nil {
+ return nil, ErrCantSeek{Reason: "seeking input returned: " + err.Error()}
+ }
+ err = r.index.LoadStream(rs)
+ if err != nil {
+ if err == ErrUnsupported {
+ return nil, ErrCantSeek{Reason: "input stream does not contain an index"}
+ }
+ return nil, ErrCantSeek{Reason: "reading index returned: " + err.Error()}
+ }
+
+ // reset position.
+ _, err = rs.Seek(pos, io.SeekStart)
+ if err != nil {
+ return nil, ErrCantSeek{Reason: "seeking input returned: " + err.Error()}
+ }
+ return &ReadSeeker{Reader: r}, nil
+}
+
+// Seek allows seeking in compressed data.
+func (r *ReadSeeker) Seek(offset int64, whence int) (int64, error) {
+ if r.err != nil {
+ return 0, r.err
+ }
+ if offset == 0 && whence == io.SeekCurrent {
+ return r.blockStart + int64(r.i), nil
+ }
+ if !r.readHeader {
+ // Make sure we read the header.
+ _, r.err = r.Read([]byte{})
+ }
+ rs, ok := r.r.(io.ReadSeeker)
+ if r.index == nil || !ok {
+ if whence == io.SeekCurrent && offset >= 0 {
+ err := r.Skip(offset)
+ return r.blockStart + int64(r.i), err
+ }
+ if whence == io.SeekStart && offset >= r.blockStart+int64(r.i) {
+ err := r.Skip(offset - r.blockStart - int64(r.i))
+ return r.blockStart + int64(r.i), err
+ }
+ return 0, ErrUnsupported
+
+ }
+
+ switch whence {
+ case io.SeekCurrent:
+ offset += r.blockStart + int64(r.i)
+ case io.SeekEnd:
+ offset = -offset
+ }
+ c, u, err := r.index.Find(offset)
+ if err != nil {
+ return r.blockStart + int64(r.i), err
+ }
+
+ // Seek to next block
+ _, err = rs.Seek(c, io.SeekStart)
+ if err != nil {
+ return 0, err
+ }
+
+ if offset < 0 {
+ offset = r.index.TotalUncompressed + offset
+ }
+
+ r.i = r.j // Remove rest of current block.
+ if u < offset {
+ // Forward inside block
+ return offset, r.Skip(offset - u)
+ }
+ return offset, nil
+}
+
// ReadByte satisfies the io.ByteReader interface.
func (r *Reader) ReadByte() (byte, error) {
if r.err != nil {
@@ -563,3 +746,17 @@ func (r *Reader) ReadByte() (byte, error) {
}
return 0, io.ErrNoProgress
}
+
+// SkippableCB will register a callback for chunks with the specified ID.
+// ID must be a Reserved skippable chunks ID, 0x80-0xfe (inclusive).
+// For each chunk with the ID, the callback is called with the content.
+// Any returned non-nil error will abort decompression.
+// Only one callback per ID is supported, latest sent will be used.
+// Sending a nil function will disable previous callbacks.
+func (r *Reader) SkippableCB(id uint8, fn func(r io.Reader) error) error {
+ if id < 0x80 || id > chunkTypePadding {
+ return fmt.Errorf("ReaderSkippableCB: Invalid id provided, must be 0x80-0xfe (inclusive)")
+ }
+ r.skippableCB[id] = fn
+ return nil
+}
diff --git a/vendor/github.com/klauspost/compress/s2/encode.go b/vendor/github.com/klauspost/compress/s2/encode.go
index aa8b108d..59f992ca 100644
--- a/vendor/github.com/klauspost/compress/s2/encode.go
+++ b/vendor/github.com/klauspost/compress/s2/encode.go
@@ -395,23 +395,26 @@ type Writer struct {
// ibuf is a buffer for the incoming (uncompressed) bytes.
ibuf []byte
- blockSize int
- obufLen int
- concurrency int
- written int64
- output chan chan result
- buffers sync.Pool
- pad int
+ blockSize int
+ obufLen int
+ concurrency int
+ written int64
+ uncompWritten int64 // Bytes sent to compression
+ output chan chan result
+ buffers sync.Pool
+ pad int
writer io.Writer
randSrc io.Reader
writerWg sync.WaitGroup
+ index Index
// wroteStreamHeader is whether we have written the stream header.
wroteStreamHeader bool
paramsOK bool
snappy bool
flushOnWrite bool
+ appendIndex bool
level uint8
}
@@ -422,7 +425,11 @@ const (
levelBest
)
-type result []byte
+type result struct {
+ b []byte
+ // Uncompressed start offset
+ startOffset int64
+}
// err returns the previously set error.
// If no error has been set it is set to err if not nil.
@@ -454,6 +461,9 @@ func (w *Writer) Reset(writer io.Writer) {
w.wroteStreamHeader = false
w.written = 0
w.writer = writer
+ w.uncompWritten = 0
+ w.index.reset(w.blockSize)
+
// If we didn't get a writer, stop here.
if writer == nil {
return
@@ -474,7 +484,8 @@ func (w *Writer) Reset(writer io.Writer) {
// Get a queued write.
for write := range toWrite {
// Wait for the data to be available.
- in := <-write
+ input := <-write
+ in := input.b
if len(in) > 0 {
if w.err(nil) == nil {
// Don't expose data from previous buffers.
@@ -485,11 +496,12 @@ func (w *Writer) Reset(writer io.Writer) {
err = io.ErrShortBuffer
}
_ = w.err(err)
+ w.err(w.index.add(w.written, input.startOffset))
w.written += int64(n)
}
}
if cap(in) >= w.obufLen {
- w.buffers.Put([]byte(in))
+ w.buffers.Put(in)
}
// close the incoming write request.
// This can be used for synchronizing flushes.
@@ -500,6 +512,9 @@ func (w *Writer) Reset(writer io.Writer) {
// Write satisfies the io.Writer interface.
func (w *Writer) Write(p []byte) (nRet int, errRet error) {
+ if err := w.err(nil); err != nil {
+ return 0, err
+ }
if w.flushOnWrite {
return w.write(p)
}
@@ -535,6 +550,9 @@ func (w *Writer) Write(p []byte) (nRet int, errRet error) {
// The return value n is the number of bytes read.
// Any error except io.EOF encountered during the read is also returned.
func (w *Writer) ReadFrom(r io.Reader) (n int64, err error) {
+ if err := w.err(nil); err != nil {
+ return 0, err
+ }
if len(w.ibuf) > 0 {
err := w.Flush()
if err != nil {
@@ -577,6 +595,85 @@ func (w *Writer) ReadFrom(r io.Reader) (n int64, err error) {
return n, w.err(nil)
}
+// AddSkippableBlock will add a skippable block to the stream.
+// The ID must be 0x80-0xfe (inclusive).
+// Length of the skippable block must be <= 16777215 bytes.
+func (w *Writer) AddSkippableBlock(id uint8, data []byte) (err error) {
+ if err := w.err(nil); err != nil {
+ return err
+ }
+ if len(data) == 0 {
+ return nil
+ }
+ if id < 0x80 || id > chunkTypePadding {
+ return fmt.Errorf("invalid skippable block id %x", id)
+ }
+ if len(data) > maxChunkSize {
+ return fmt.Errorf("skippable block excessed maximum size")
+ }
+ var header [4]byte
+ chunkLen := 4 + len(data)
+ header[0] = id
+ header[1] = uint8(chunkLen >> 0)
+ header[2] = uint8(chunkLen >> 8)
+ header[3] = uint8(chunkLen >> 16)
+ if w.concurrency == 1 {
+ write := func(b []byte) error {
+ n, err := w.writer.Write(b)
+ if err = w.err(err); err != nil {
+ return err
+ }
+ if n != len(data) {
+ return w.err(io.ErrShortWrite)
+ }
+ w.written += int64(n)
+ return w.err(nil)
+ }
+ if !w.wroteStreamHeader {
+ w.wroteStreamHeader = true
+ if w.snappy {
+ if err := write([]byte(magicChunkSnappy)); err != nil {
+ return err
+ }
+ } else {
+ if err := write([]byte(magicChunk)); err != nil {
+ return err
+ }
+ }
+ }
+ if err := write(header[:]); err != nil {
+ return err
+ }
+ if err := write(data); err != nil {
+ return err
+ }
+ }
+
+ // Create output...
+ if !w.wroteStreamHeader {
+ w.wroteStreamHeader = true
+ hWriter := make(chan result)
+ w.output <- hWriter
+ if w.snappy {
+ hWriter <- result{startOffset: w.uncompWritten, b: []byte(magicChunkSnappy)}
+ } else {
+ hWriter <- result{startOffset: w.uncompWritten, b: []byte(magicChunk)}
+ }
+ }
+
+ // Copy input.
+ inbuf := w.buffers.Get().([]byte)[:4]
+ copy(inbuf, header[:])
+ inbuf = append(inbuf, data...)
+
+ output := make(chan result, 1)
+ // Queue output.
+ w.output <- output
+ output <- result{startOffset: w.uncompWritten, b: inbuf}
+
+ return nil
+}
+
// EncodeBuffer will add a buffer to the stream.
// This is the fastest way to encode a stream,
// but the input buffer cannot be written to by the caller
@@ -614,9 +711,9 @@ func (w *Writer) EncodeBuffer(buf []byte) (err error) {
hWriter := make(chan result)
w.output <- hWriter
if w.snappy {
- hWriter <- []byte(magicChunkSnappy)
+ hWriter <- result{startOffset: w.uncompWritten, b: []byte(magicChunkSnappy)}
} else {
- hWriter <- []byte(magicChunk)
+ hWriter <- result{startOffset: w.uncompWritten, b: []byte(magicChunk)}
}
}
@@ -632,6 +729,10 @@ func (w *Writer) EncodeBuffer(buf []byte) (err error) {
output := make(chan result)
// Queue output now, so we keep order.
w.output <- output
+ res := result{
+ startOffset: w.uncompWritten,
+ }
+ w.uncompWritten += int64(len(uncompressed))
go func() {
checksum := crc(uncompressed)
@@ -664,7 +765,8 @@ func (w *Writer) EncodeBuffer(buf []byte) (err error) {
obuf[7] = uint8(checksum >> 24)
// Queue final output.
- output <- obuf
+ res.b = obuf
+ output <- res
}()
}
return nil
@@ -708,9 +810,9 @@ func (w *Writer) write(p []byte) (nRet int, errRet error) {
hWriter := make(chan result)
w.output <- hWriter
if w.snappy {
- hWriter <- []byte(magicChunkSnappy)
+ hWriter <- result{startOffset: w.uncompWritten, b: []byte(magicChunkSnappy)}
} else {
- hWriter <- []byte(magicChunk)
+ hWriter <- result{startOffset: w.uncompWritten, b: []byte(magicChunk)}
}
}
@@ -731,6 +833,11 @@ func (w *Writer) write(p []byte) (nRet int, errRet error) {
output := make(chan result)
// Queue output now, so we keep order.
w.output <- output
+ res := result{
+ startOffset: w.uncompWritten,
+ }
+ w.uncompWritten += int64(len(uncompressed))
+
go func() {
checksum := crc(uncompressed)
@@ -763,7 +870,8 @@ func (w *Writer) write(p []byte) (nRet int, errRet error) {
obuf[7] = uint8(checksum >> 24)
// Queue final output.
- output <- obuf
+ res.b = obuf
+ output <- res
// Put unused buffer back in pool.
w.buffers.Put(inbuf)
@@ -793,9 +901,9 @@ func (w *Writer) writeFull(inbuf []byte) (errRet error) {
hWriter := make(chan result)
w.output <- hWriter
if w.snappy {
- hWriter <- []byte(magicChunkSnappy)
+ hWriter <- result{startOffset: w.uncompWritten, b: []byte(magicChunkSnappy)}
} else {
- hWriter <- []byte(magicChunk)
+ hWriter <- result{startOffset: w.uncompWritten, b: []byte(magicChunk)}
}
}
@@ -806,6 +914,11 @@ func (w *Writer) writeFull(inbuf []byte) (errRet error) {
output := make(chan result)
// Queue output now, so we keep order.
w.output <- output
+ res := result{
+ startOffset: w.uncompWritten,
+ }
+ w.uncompWritten += int64(len(uncompressed))
+
go func() {
checksum := crc(uncompressed)
@@ -838,7 +951,8 @@ func (w *Writer) writeFull(inbuf []byte) (errRet error) {
obuf[7] = uint8(checksum >> 24)
// Queue final output.
- output <- obuf
+ res.b = obuf
+ output <- res
// Put unused buffer back in pool.
w.buffers.Put(inbuf)
@@ -912,7 +1026,10 @@ func (w *Writer) writeSync(p []byte) (nRet int, errRet error) {
if n != len(obuf) {
return 0, w.err(io.ErrShortWrite)
}
+ w.err(w.index.add(w.written, w.uncompWritten))
w.written += int64(n)
+ w.uncompWritten += int64(len(uncompressed))
+
if chunkType == chunkTypeUncompressedData {
// Write uncompressed data.
n, err := w.writer.Write(uncompressed)
@@ -961,39 +1078,88 @@ func (w *Writer) Flush() error {
res := make(chan result)
w.output <- res
// Block until this has been picked up.
- res <- nil
+ res <- result{b: nil, startOffset: w.uncompWritten}
// When it is closed, we have flushed.
<-res
return w.err(nil)
}
// Close calls Flush and then closes the Writer.
-// Calling Close multiple times is ok.
+// Calling Close multiple times is ok,
+// but calling CloseIndex after this will make it not return the index.
func (w *Writer) Close() error {
+ _, err := w.closeIndex(w.appendIndex)
+ return err
+}
+
+// CloseIndex calls Close and returns an index on first call.
+// This is not required if you are only adding index to a stream.
+func (w *Writer) CloseIndex() ([]byte, error) {
+ return w.closeIndex(true)
+}
+
+func (w *Writer) closeIndex(idx bool) ([]byte, error) {
err := w.Flush()
if w.output != nil {
close(w.output)
w.writerWg.Wait()
w.output = nil
}
- if w.err(nil) == nil && w.writer != nil && w.pad > 0 {
- add := calcSkippableFrame(w.written, int64(w.pad))
- frame, err := skippableFrame(w.ibuf[:0], add, w.randSrc)
- if err = w.err(err); err != nil {
- return err
+
+ var index []byte
+ if w.err(nil) == nil && w.writer != nil {
+ // Create index.
+ if idx {
+ compSize := int64(-1)
+ if w.pad <= 1 {
+ compSize = w.written
+ }
+ index = w.index.appendTo(w.ibuf[:0], w.uncompWritten, compSize)
+ // Count as written for padding.
+ if w.appendIndex {
+ w.written += int64(len(index))
+ }
+ if true {
+ _, err := w.index.Load(index)
+ if err != nil {
+ panic(err)
+ }
+ }
+ }
+
+ if w.pad > 1 {
+ tmp := w.ibuf[:0]
+ if len(index) > 0 {
+ // Allocate another buffer.
+ tmp = w.buffers.Get().([]byte)[:0]
+ defer w.buffers.Put(tmp)
+ }
+ add := calcSkippableFrame(w.written, int64(w.pad))
+ frame, err := skippableFrame(tmp, add, w.randSrc)
+ if err = w.err(err); err != nil {
+ return nil, err
+ }
+ n, err2 := w.writer.Write(frame)
+ if err2 == nil && n != len(frame) {
+ err2 = io.ErrShortWrite
+ }
+ _ = w.err(err2)
+ }
+ if len(index) > 0 && w.appendIndex {
+ n, err2 := w.writer.Write(index)
+ if err2 == nil && n != len(index) {
+ err2 = io.ErrShortWrite
+ }
+ _ = w.err(err2)
}
- _, err2 := w.writer.Write(frame)
- _ = w.err(err2)
}
- _ = w.err(errClosed)
+ err = w.err(errClosed)
if err == errClosed {
- return nil
+ return index, nil
}
- return err
+ return nil, err
}
-const skippableFrameHeader = 4
-
// calcSkippableFrame will return a total size to be added for written
// to be divisible by multiple.
// The value will always be > skippableFrameHeader.
@@ -1057,6 +1223,15 @@ func WriterConcurrency(n int) WriterOption {
}
}
+// WriterAddIndex will append an index to the end of a stream
+// when it is closed.
+func WriterAddIndex() WriterOption {
+ return func(w *Writer) error {
+ w.appendIndex = true
+ return nil
+ }
+}
+
// WriterBetterCompression will enable better compression.
// EncodeBetter compresses better than Encode but typically with a
// 10-40% speed decrease on both compression and decompression.
diff --git a/vendor/github.com/klauspost/compress/s2/index.go b/vendor/github.com/klauspost/compress/s2/index.go
new file mode 100644
index 00000000..fd857682
--- /dev/null
+++ b/vendor/github.com/klauspost/compress/s2/index.go
@@ -0,0 +1,525 @@
+// Copyright (c) 2022+ Klaus Post. All rights reserved.
+// Use of this source code is governed by a BSD-style
+// license that can be found in the LICENSE file.
+
+package s2
+
+import (
+ "bytes"
+ "encoding/binary"
+ "encoding/json"
+ "fmt"
+ "io"
+)
+
+const (
+ S2IndexHeader = "s2idx\x00"
+ S2IndexTrailer = "\x00xdi2s"
+ maxIndexEntries = 1 << 16
+)
+
+// Index represents an S2/Snappy index.
+type Index struct {
+ TotalUncompressed int64 // Total Uncompressed size if known. Will be -1 if unknown.
+ TotalCompressed int64 // Total Compressed size if known. Will be -1 if unknown.
+ info []struct {
+ compressedOffset int64
+ uncompressedOffset int64
+ }
+ estBlockUncomp int64
+}
+
+func (i *Index) reset(maxBlock int) {
+ i.estBlockUncomp = int64(maxBlock)
+ i.TotalCompressed = -1
+ i.TotalUncompressed = -1
+ if len(i.info) > 0 {
+ i.info = i.info[:0]
+ }
+}
+
+// allocInfos will allocate an empty slice of infos.
+func (i *Index) allocInfos(n int) {
+ if n > maxIndexEntries {
+ panic("n > maxIndexEntries")
+ }
+ i.info = make([]struct {
+ compressedOffset int64
+ uncompressedOffset int64
+ }, 0, n)
+}
+
+// add an uncompressed and compressed pair.
+// Entries must be sent in order.
+func (i *Index) add(compressedOffset, uncompressedOffset int64) error {
+ if i == nil {
+ return nil
+ }
+ lastIdx := len(i.info) - 1
+ if lastIdx >= 0 {
+ latest := i.info[lastIdx]
+ if latest.uncompressedOffset == uncompressedOffset {
+ // Uncompressed didn't change, don't add entry,
+ // but update start index.
+ latest.compressedOffset = compressedOffset
+ i.info[lastIdx] = latest
+ return nil
+ }
+ if latest.uncompressedOffset > uncompressedOffset {
+ return fmt.Errorf("internal error: Earlier uncompressed received (%d > %d)", latest.uncompressedOffset, uncompressedOffset)
+ }
+ if latest.compressedOffset > compressedOffset {
+ return fmt.Errorf("internal error: Earlier compressed received (%d > %d)", latest.uncompressedOffset, uncompressedOffset)
+ }
+ }
+ i.info = append(i.info, struct {
+ compressedOffset int64
+ uncompressedOffset int64
+ }{compressedOffset: compressedOffset, uncompressedOffset: uncompressedOffset})
+ return nil
+}
+
+// Find the offset at or before the wanted (uncompressed) offset.
+// If offset is 0 or positive it is the offset from the beginning of the file.
+// If the uncompressed size is known, the offset must be within the file.
+// If an offset outside the file is requested io.ErrUnexpectedEOF is returned.
+// If the offset is negative, it is interpreted as the distance from the end of the file,
+// where -1 represents the last byte.
+// If offset from the end of the file is requested, but size is unknown,
+// ErrUnsupported will be returned.
+func (i *Index) Find(offset int64) (compressedOff, uncompressedOff int64, err error) {
+ if i.TotalUncompressed < 0 {
+ return 0, 0, ErrCorrupt
+ }
+ if offset < 0 {
+ offset = i.TotalUncompressed + offset
+ if offset < 0 {
+ return 0, 0, io.ErrUnexpectedEOF
+ }
+ }
+ if offset > i.TotalUncompressed {
+ return 0, 0, io.ErrUnexpectedEOF
+ }
+ for _, info := range i.info {
+ if info.uncompressedOffset > offset {
+ break
+ }
+ compressedOff = info.compressedOffset
+ uncompressedOff = info.uncompressedOffset
+ }
+ return compressedOff, uncompressedOff, nil
+}
+
+// reduce to stay below maxIndexEntries
+func (i *Index) reduce() {
+ if len(i.info) < maxIndexEntries && i.estBlockUncomp >= 1<<20 {
+ return
+ }
+
+ // Algorithm, keep 1, remove removeN entries...
+ removeN := (len(i.info) + 1) / maxIndexEntries
+ src := i.info
+ j := 0
+
+ // Each block should be at least 1MB, but don't reduce below 1000 entries.
+ for i.estBlockUncomp*(int64(removeN)+1) < 1<<20 && len(i.info)/(removeN+1) > 1000 {
+ removeN++
+ }
+ for idx := 0; idx < len(src); idx++ {
+ i.info[j] = src[idx]
+ j++
+ idx += removeN
+ }
+ i.info = i.info[:j]
+ // Update maxblock estimate.
+ i.estBlockUncomp += i.estBlockUncomp * int64(removeN)
+}
+
+func (i *Index) appendTo(b []byte, uncompTotal, compTotal int64) []byte {
+ i.reduce()
+ var tmp [binary.MaxVarintLen64]byte
+
+ initSize := len(b)
+ // We make the start a skippable header+size.
+ b = append(b, ChunkTypeIndex, 0, 0, 0)
+ b = append(b, []byte(S2IndexHeader)...)
+ // Total Uncompressed size
+ n := binary.PutVarint(tmp[:], uncompTotal)
+ b = append(b, tmp[:n]...)
+ // Total Compressed size
+ n = binary.PutVarint(tmp[:], compTotal)
+ b = append(b, tmp[:n]...)
+ // Put EstBlockUncomp size
+ n = binary.PutVarint(tmp[:], i.estBlockUncomp)
+ b = append(b, tmp[:n]...)
+ // Put length
+ n = binary.PutVarint(tmp[:], int64(len(i.info)))
+ b = append(b, tmp[:n]...)
+
+ // Check if we should add uncompressed offsets
+ var hasUncompressed byte
+ for idx, info := range i.info {
+ if idx == 0 {
+ if info.uncompressedOffset != 0 {
+ hasUncompressed = 1
+ break
+ }
+ continue
+ }
+ if info.uncompressedOffset != i.info[idx-1].uncompressedOffset+i.estBlockUncomp {
+ hasUncompressed = 1
+ break
+ }
+ }
+ b = append(b, hasUncompressed)
+
+ // Add each entry
+ if hasUncompressed == 1 {
+ for idx, info := range i.info {
+ uOff := info.uncompressedOffset
+ if idx > 0 {
+ prev := i.info[idx-1]
+ uOff -= prev.uncompressedOffset + (i.estBlockUncomp)
+ }
+ n = binary.PutVarint(tmp[:], uOff)
+ b = append(b, tmp[:n]...)
+ }
+ }
+
+ // Initial compressed size estimate.
+ cPredict := i.estBlockUncomp / 2
+
+ for idx, info := range i.info {
+ cOff := info.compressedOffset
+ if idx > 0 {
+ prev := i.info[idx-1]
+ cOff -= prev.compressedOffset + cPredict
+ // Update compressed size prediction, with half the error.
+ cPredict += cOff / 2
+ }
+ n = binary.PutVarint(tmp[:], cOff)
+ b = append(b, tmp[:n]...)
+ }
+
+ // Add Total Size.
+ // Stored as fixed size for easier reading.
+ binary.LittleEndian.PutUint32(tmp[:], uint32(len(b)-initSize+4+len(S2IndexTrailer)))
+ b = append(b, tmp[:4]...)
+ // Trailer
+ b = append(b, []byte(S2IndexTrailer)...)
+
+ // Update size
+ chunkLen := len(b) - initSize - skippableFrameHeader
+ b[initSize+1] = uint8(chunkLen >> 0)
+ b[initSize+2] = uint8(chunkLen >> 8)
+ b[initSize+3] = uint8(chunkLen >> 16)
+ //fmt.Printf("chunklen: 0x%x Uncomp:%d, Comp:%d\n", chunkLen, uncompTotal, compTotal)
+ return b
+}
+
+// Load a binary index.
+// A zero value Index can be used or a previous one can be reused.
+func (i *Index) Load(b []byte) ([]byte, error) {
+ if len(b) <= 4+len(S2IndexHeader)+len(S2IndexTrailer) {
+ return b, io.ErrUnexpectedEOF
+ }
+ if b[0] != ChunkTypeIndex {
+ return b, ErrCorrupt
+ }
+ chunkLen := int(b[1]) | int(b[2])<<8 | int(b[3])<<16
+ b = b[4:]
+
+ // Validate we have enough...
+ if len(b) < chunkLen {
+ return b, io.ErrUnexpectedEOF
+ }
+ if !bytes.Equal(b[:len(S2IndexHeader)], []byte(S2IndexHeader)) {
+ return b, ErrUnsupported
+ }
+ b = b[len(S2IndexHeader):]
+
+ // Total Uncompressed
+ if v, n := binary.Varint(b); n <= 0 || v < 0 {
+ return b, ErrCorrupt
+ } else {
+ i.TotalUncompressed = v
+ b = b[n:]
+ }
+
+ // Total Compressed
+ if v, n := binary.Varint(b); n <= 0 {
+ return b, ErrCorrupt
+ } else {
+ i.TotalCompressed = v
+ b = b[n:]
+ }
+
+ // Read EstBlockUncomp
+ if v, n := binary.Varint(b); n <= 0 {
+ return b, ErrCorrupt
+ } else {
+ if v < 0 {
+ return b, ErrCorrupt
+ }
+ i.estBlockUncomp = v
+ b = b[n:]
+ }
+
+ var entries int
+ if v, n := binary.Varint(b); n <= 0 {
+ return b, ErrCorrupt
+ } else {
+ if v < 0 || v > maxIndexEntries {
+ return b, ErrCorrupt
+ }
+ entries = int(v)
+ b = b[n:]
+ }
+ if cap(i.info) < entries {
+ i.allocInfos(entries)
+ }
+ i.info = i.info[:entries]
+
+ if len(b) < 1 {
+ return b, io.ErrUnexpectedEOF
+ }
+ hasUncompressed := b[0]
+ b = b[1:]
+ if hasUncompressed&1 != hasUncompressed {
+ return b, ErrCorrupt
+ }
+
+ // Add each uncompressed entry
+ for idx := range i.info {
+ var uOff int64
+ if hasUncompressed != 0 {
+ // Load delta
+ if v, n := binary.Varint(b); n <= 0 {
+ return b, ErrCorrupt
+ } else {
+ uOff = v
+ b = b[n:]
+ }
+ }
+
+ if idx > 0 {
+ prev := i.info[idx-1].uncompressedOffset
+ uOff += prev + (i.estBlockUncomp)
+ if uOff <= prev {
+ return b, ErrCorrupt
+ }
+ }
+ if uOff < 0 {
+ return b, ErrCorrupt
+ }
+ i.info[idx].uncompressedOffset = uOff
+ }
+
+ // Initial compressed size estimate.
+ cPredict := i.estBlockUncomp / 2
+
+ // Add each compressed entry
+ for idx := range i.info {
+ var cOff int64
+ if v, n := binary.Varint(b); n <= 0 {
+ return b, ErrCorrupt
+ } else {
+ cOff = v
+ b = b[n:]
+ }
+
+ if idx > 0 {
+ // Update compressed size prediction, with half the error.
+ cPredictNew := cPredict + cOff/2
+
+ prev := i.info[idx-1].compressedOffset
+ cOff += prev + cPredict
+ if cOff <= prev {
+ return b, ErrCorrupt
+ }
+ cPredict = cPredictNew
+ }
+ if cOff < 0 {
+ return b, ErrCorrupt
+ }
+ i.info[idx].compressedOffset = cOff
+ }
+ if len(b) < 4+len(S2IndexTrailer) {
+ return b, io.ErrUnexpectedEOF
+ }
+ // Skip size...
+ b = b[4:]
+
+ // Check trailer...
+ if !bytes.Equal(b[:len(S2IndexTrailer)], []byte(S2IndexTrailer)) {
+ return b, ErrCorrupt
+ }
+ return b[len(S2IndexTrailer):], nil
+}
+
+// LoadStream will load an index from the end of the supplied stream.
+// ErrUnsupported will be returned if the signature cannot be found.
+// ErrCorrupt will be returned if unexpected values are found.
+// io.ErrUnexpectedEOF is returned if there are too few bytes.
+// IO errors are returned as-is.
+func (i *Index) LoadStream(rs io.ReadSeeker) error {
+ // Go to end.
+ _, err := rs.Seek(-10, io.SeekEnd)
+ if err != nil {
+ return err
+ }
+ var tmp [10]byte
+ _, err = io.ReadFull(rs, tmp[:])
+ if err != nil {
+ return err
+ }
+ // Check trailer...
+ if !bytes.Equal(tmp[4:4+len(S2IndexTrailer)], []byte(S2IndexTrailer)) {
+ return ErrUnsupported
+ }
+ sz := binary.LittleEndian.Uint32(tmp[:4])
+ if sz > maxChunkSize+skippableFrameHeader {
+ return ErrCorrupt
+ }
+ _, err = rs.Seek(-int64(sz), io.SeekEnd)
+ if err != nil {
+ return err
+ }
+
+ // Read index.
+ buf := make([]byte, sz)
+ _, err = io.ReadFull(rs, buf)
+ if err != nil {
+ return err
+ }
+ _, err = i.Load(buf)
+ return err
+}
+
+// IndexStream will return an index for a stream.
+// The stream structure will be checked, but
+// data within blocks is not verified.
+// The returned index can either be appended to the end of the stream
+// or stored separately.
+func IndexStream(r io.Reader) ([]byte, error) {
+ var i Index
+ var buf [maxChunkSize]byte
+ var readHeader bool
+ for {
+ _, err := io.ReadFull(r, buf[:4])
+ if err != nil {
+ if err == io.EOF {
+ return i.appendTo(nil, i.TotalUncompressed, i.TotalCompressed), nil
+ }
+ return nil, err
+ }
+ // Start of this chunk.
+ startChunk := i.TotalCompressed
+ i.TotalCompressed += 4
+
+ chunkType := buf[0]
+ if !readHeader {
+ if chunkType != chunkTypeStreamIdentifier {
+ return nil, ErrCorrupt
+ }
+ readHeader = true
+ }
+ chunkLen := int(buf[1]) | int(buf[2])<<8 | int(buf[3])<<16
+ if chunkLen < checksumSize {
+ return nil, ErrCorrupt
+ }
+
+ i.TotalCompressed += int64(chunkLen)
+ _, err = io.ReadFull(r, buf[:chunkLen])
+ if err != nil {
+ return nil, io.ErrUnexpectedEOF
+ }
+ // The chunk types are specified at
+ // https://github.com/google/snappy/blob/master/framing_format.txt
+ switch chunkType {
+ case chunkTypeCompressedData:
+ // Section 4.2. Compressed data (chunk type 0x00).
+ // Skip checksum.
+ dLen, err := DecodedLen(buf[checksumSize:])
+ if err != nil {
+ return nil, err
+ }
+ if dLen > maxBlockSize {
+ return nil, ErrCorrupt
+ }
+ if i.estBlockUncomp == 0 {
+ // Use first block for estimate...
+ i.estBlockUncomp = int64(dLen)
+ }
+ err = i.add(startChunk, i.TotalUncompressed)
+ if err != nil {
+ return nil, err
+ }
+ i.TotalUncompressed += int64(dLen)
+ continue
+ case chunkTypeUncompressedData:
+ n2 := chunkLen - checksumSize
+ if n2 > maxBlockSize {
+ return nil, ErrCorrupt
+ }
+ if i.estBlockUncomp == 0 {
+ // Use first block for estimate...
+ i.estBlockUncomp = int64(n2)
+ }
+ err = i.add(startChunk, i.TotalUncompressed)
+ if err != nil {
+ return nil, err
+ }
+ i.TotalUncompressed += int64(n2)
+ continue
+ case chunkTypeStreamIdentifier:
+ // Section 4.1. Stream identifier (chunk type 0xff).
+ if chunkLen != len(magicBody) {
+ return nil, ErrCorrupt
+ }
+
+ if string(buf[:len(magicBody)]) != magicBody {
+ if string(buf[:len(magicBody)]) != magicBodySnappy {
+ return nil, ErrCorrupt
+ }
+ }
+
+ continue
+ }
+
+ if chunkType <= 0x7f {
+ // Section 4.5. Reserved unskippable chunks (chunk types 0x02-0x7f).
+ return nil, ErrUnsupported
+ }
+ if chunkLen > maxChunkSize {
+ return nil, ErrUnsupported
+ }
+ // Section 4.4 Padding (chunk type 0xfe).
+ // Section 4.6. Reserved skippable chunks (chunk types 0x80-0xfd).
+ }
+}
+
+// JSON returns the index as JSON text.
+func (i *Index) JSON() []byte {
+ x := struct {
+ TotalUncompressed int64 `json:"total_uncompressed"` // Total Uncompressed size if known. Will be -1 if unknown.
+ TotalCompressed int64 `json:"total_compressed"` // Total Compressed size if known. Will be -1 if unknown.
+ Offsets []struct {
+ CompressedOffset int64 `json:"compressed"`
+ UncompressedOffset int64 `json:"uncompressed"`
+ } `json:"offsets"`
+ EstBlockUncomp int64 `json:"est_block_uncompressed"`
+ }{
+ TotalUncompressed: i.TotalUncompressed,
+ TotalCompressed: i.TotalCompressed,
+ EstBlockUncomp: i.estBlockUncomp,
+ }
+ for _, v := range i.info {
+ x.Offsets = append(x.Offsets, struct {
+ CompressedOffset int64 `json:"compressed"`
+ UncompressedOffset int64 `json:"uncompressed"`
+ }{CompressedOffset: v.compressedOffset, UncompressedOffset: v.uncompressedOffset})
+ }
+ b, _ := json.MarshalIndent(x, "", " ")
+ return b
+}
diff --git a/vendor/github.com/klauspost/compress/s2/s2.go b/vendor/github.com/klauspost/compress/s2/s2.go
index 89d69e96..dae3f731 100644
--- a/vendor/github.com/klauspost/compress/s2/s2.go
+++ b/vendor/github.com/klauspost/compress/s2/s2.go
@@ -87,6 +87,9 @@ const (
// minBlockSize is the minimum size of block setting when creating a writer.
minBlockSize = 4 << 10
+ skippableFrameHeader = 4
+ maxChunkSize = 1<<24 - 1 // 16777215
+
// Default block size
defaultBlockSize = 1 << 20
@@ -99,6 +102,7 @@ const (
const (
chunkTypeCompressedData = 0x00
chunkTypeUncompressedData = 0x01
+ ChunkTypeIndex = 0x99
chunkTypePadding = 0xfe
chunkTypeStreamIdentifier = 0xff
)