diff options
Diffstat (limited to 'vendor/github.com/klauspost/compress/s2')
-rw-r--r-- | vendor/github.com/klauspost/compress/s2/README.md | 256 | ||||
-rw-r--r-- | vendor/github.com/klauspost/compress/s2/decode.go | 229 | ||||
-rw-r--r-- | vendor/github.com/klauspost/compress/s2/encode.go | 241 | ||||
-rw-r--r-- | vendor/github.com/klauspost/compress/s2/index.go | 525 | ||||
-rw-r--r-- | vendor/github.com/klauspost/compress/s2/s2.go | 4 |
5 files changed, 1188 insertions, 67 deletions
diff --git a/vendor/github.com/klauspost/compress/s2/README.md b/vendor/github.com/klauspost/compress/s2/README.md index 81fad652..e6716aea 100644 --- a/vendor/github.com/klauspost/compress/s2/README.md +++ b/vendor/github.com/klauspost/compress/s2/README.md @@ -20,6 +20,7 @@ This is important, so you don't have to worry about spending CPU cycles on alrea * Concurrent stream compression * Faster decompression, even for Snappy compatible content * Ability to quickly skip forward in compressed stream +* Random seeking with indexes * Compatible with reading Snappy compressed content * Smaller block size overhead on incompressible blocks * Block concatenation @@ -29,8 +30,8 @@ This is important, so you don't have to worry about spending CPU cycles on alrea ## Drawbacks over Snappy -* Not optimized for 32 bit systems. -* Streams use slightly more memory due to larger blocks and concurrency (configurable). +* Not optimized for 32 bit systems +* Streams use slightly more memory due to larger blocks and concurrency (configurable) # Usage @@ -141,7 +142,7 @@ Binaries can be downloaded on the [Releases Page](https://github.com/klauspost/c Installing then requires Go to be installed. To install them, use: -`go install github.com/klauspost/compress/s2/cmd/s2c && go install github.com/klauspost/compress/s2/cmd/s2d` +`go install github.com/klauspost/compress/s2/cmd/s2c@latest && go install github.com/klauspost/compress/s2/cmd/s2d@latest` To build binaries to the current folder use: @@ -176,6 +177,8 @@ Options: Compress faster, but with a minor compression loss -help Display help + -index + Add seek index (default true) -o string Write output to another file. Single input file only -pad string @@ -217,11 +220,15 @@ Options: Display help -o string Write output to another file. Single input file only - -q Don't write any output to terminal, except errors + -offset string + Start at offset. Examples: 92, 64K, 256K, 1M, 4M. Requires Index + -q Don't write any output to terminal, except errors -rm - Delete source file(s) after successful decompression + Delete source file(s) after successful decompression -safe - Do not overwrite output files + Do not overwrite output files + -tail string + Return last of compressed file. Examples: 92, 64K, 256K, 1M, 4M. Requires Index -verify Verify files, but do not write output ``` @@ -633,12 +640,12 @@ Compression and speed is typically a bit better `MaxEncodedLen` is also smaller Comparison of [`webdevdata.org-2015-01-07-subset`](https://files.klauspost.com/compress/webdevdata.org-2015-01-07-4GB-subset.7z), 53927 files, total input size: 4,014,735,833 bytes. amd64, single goroutine used: -| Encoder | Size | MB/s | Reduction | -|-----------------------|------------|--------|------------ -| snappy.Encode | 1128706759 | 725.59 | 71.89% | -| s2.EncodeSnappy | 1093823291 | 899.16 | 72.75% | -| s2.EncodeSnappyBetter | 1001158548 | 578.49 | 75.06% | -| s2.EncodeSnappyBest | 944507998 | 66.00 | 76.47% | +| Encoder | Size | MB/s | Reduction | +|-----------------------|------------|------------|------------ +| snappy.Encode | 1128706759 | 725.59 | 71.89% | +| s2.EncodeSnappy | 1093823291 | **899.16** | 72.75% | +| s2.EncodeSnappyBetter | 1001158548 | 578.49 | 75.06% | +| s2.EncodeSnappyBest | 944507998 | 66.00 | **76.47%**| ## Streams @@ -649,11 +656,11 @@ Comparison of different streams, AMD Ryzen 3950x, 16 cores. Size and throughput: | File | snappy.NewWriter | S2 Snappy | S2 Snappy, Better | S2 Snappy, Best | |-----------------------------|--------------------------|---------------------------|--------------------------|-------------------------| -| nyc-taxi-data-10M.csv | 1316042016 - 517.54MB/s | 1307003093 - 8406.29MB/s | 1174534014 - 4984.35MB/s | 1115904679 - 177.81MB/s | -| enwik10 | 5088294643 - 433.45MB/s | 5175840939 - 8454.52MB/s | 4560784526 - 4403.10MB/s | 4340299103 - 159.71MB/s | -| 10gb.tar | 6056946612 - 703.25MB/s | 6208571995 - 9035.75MB/s | 5741646126 - 2402.08MB/s | 5548973895 - 171.17MB/s | -| github-june-2days-2019.json | 1525176492 - 908.11MB/s | 1476519054 - 12625.93MB/s | 1400547532 - 6163.61MB/s | 1321887137 - 200.71MB/s | -| consensus.db.10gb | 5412897703 - 1054.38MB/s | 5354073487 - 12634.82MB/s | 5335069899 - 2472.23MB/s | 5201000954 - 166.32MB/s | +| nyc-taxi-data-10M.csv | 1316042016 - 539.47MB/s | 1307003093 - 10132.73MB/s | 1174534014 - 5002.44MB/s | 1115904679 - 177.97MB/s | +| enwik10 (xml) | 5088294643 - 451.13MB/s | 5175840939 - 9440.69MB/s | 4560784526 - 4487.21MB/s | 4340299103 - 158.92MB/s | +| 10gb.tar (mixed) | 6056946612 - 729.73MB/s | 6208571995 - 9978.05MB/s | 5741646126 - 4919.98MB/s | 5548973895 - 180.44MB/s | +| github-june-2days-2019.json | 1525176492 - 933.00MB/s | 1476519054 - 13150.12MB/s | 1400547532 - 5803.40MB/s | 1321887137 - 204.29MB/s | +| consensus.db.10gb (db) | 5412897703 - 1102.14MB/s | 5354073487 - 13562.91MB/s | 5335069899 - 5294.73MB/s | 5201000954 - 175.72MB/s | # Decompression @@ -679,7 +686,220 @@ The 10 byte 'stream identifier' of the second stream can optionally be stripped, Blocks can be concatenated using the `ConcatBlocks` function. -Snappy blocks/streams can safely be concatenated with S2 blocks and streams. +Snappy blocks/streams can safely be concatenated with S2 blocks and streams. +Streams with indexes (see below) will currently not work on concatenated streams. + +# Stream Seek Index + +S2 and Snappy streams can have indexes. These indexes will allow random seeking within the compressed data. + +The index can either be appended to the stream as a skippable block or returned for separate storage. + +When the index is appended to a stream it will be skipped by regular decoders, +so the output remains compatible with other decoders. + +## Creating an Index + +To automatically add an index to a stream, add `WriterAddIndex()` option to your writer. +Then the index will be added to the stream when `Close()` is called. + +``` + // Add Index to stream... + enc := s2.NewWriter(w, s2.WriterAddIndex()) + io.Copy(enc, r) + enc.Close() +``` + +If you want to store the index separately, you can use `CloseIndex()` instead of the regular `Close()`. +This will return the index. Note that `CloseIndex()` should only be called once, and you shouldn't call `Close()`. + +``` + // Get index for separate storage... + enc := s2.NewWriter(w) + io.Copy(enc, r) + index, err := enc.CloseIndex() +``` + +The `index` can then be used needing to read from the stream. +This means the index can be used without needing to seek to the end of the stream +or for manually forwarding streams. See below. + +Finally, an existing S2/Snappy stream can be indexed using the `s2.IndexStream(r io.Reader)` function. + +## Using Indexes + +To use indexes there is a `ReadSeeker(random bool, index []byte) (*ReadSeeker, error)` function available. + +Calling ReadSeeker will return an [io.ReadSeeker](https://pkg.go.dev/io#ReadSeeker) compatible version of the reader. + +If 'random' is specified the returned io.Seeker can be used for random seeking, otherwise only forward seeking is supported. +Enabling random seeking requires the original input to support the [io.Seeker](https://pkg.go.dev/io#Seeker) interface. + +``` + dec := s2.NewReader(r) + rs, err := dec.ReadSeeker(false, nil) + rs.Seek(wantOffset, io.SeekStart) +``` + +Get a seeker to seek forward. Since no index is provided, the index is read from the stream. +This requires that an index was added and that `r` supports the [io.Seeker](https://pkg.go.dev/io#Seeker) interface. + +A custom index can be specified which will be used if supplied. +When using a custom index, it will not be read from the input stream. + +``` + dec := s2.NewReader(r) + rs, err := dec.ReadSeeker(false, index) + rs.Seek(wantOffset, io.SeekStart) +``` + +This will read the index from `index`. Since we specify non-random (forward only) seeking `r` does not have to be an io.Seeker + +``` + dec := s2.NewReader(r) + rs, err := dec.ReadSeeker(true, index) + rs.Seek(wantOffset, io.SeekStart) +``` + +Finally, since we specify that we want to do random seeking `r` must be an io.Seeker. + +The returned [ReadSeeker](https://pkg.go.dev/github.com/klauspost/compress/s2#ReadSeeker) contains a shallow reference to the existing Reader, +meaning changes performed to one is reflected in the other. + +To check if a stream contains an index at the end, the `(*Index).LoadStream(rs io.ReadSeeker) error` can be used. + +## Manually Forwarding Streams + +Indexes can also be read outside the decoder using the [Index](https://pkg.go.dev/github.com/klauspost/compress/s2#Index) type. +This can be used for parsing indexes, either separate or in streams. + +In some cases it may not be possible to serve a seekable stream. +This can for instance be an HTTP stream, where the Range request +is sent at the start of the stream. + +With a little bit of extra code it is still possible to use indexes +to forward to specific offset with a single forward skip. + +It is possible to load the index manually like this: +``` + var index s2.Index + _, err = index.Load(idxBytes) +``` + +This can be used to figure out how much to offset the compressed stream: + +``` + compressedOffset, uncompressedOffset, err := index.Find(wantOffset) +``` + +The `compressedOffset` is the number of bytes that should be skipped +from the beginning of the compressed file. + +The `uncompressedOffset` will then be offset of the uncompressed bytes returned +when decoding from that position. This will always be <= wantOffset. + +When creating a decoder it must be specified that it should *not* expect a stream identifier +at the beginning of the stream. Assuming the io.Reader `r` has been forwarded to `compressedOffset` +we create the decoder like this: + +``` + dec := s2.NewReader(r, s2.ReaderIgnoreStreamIdentifier()) +``` + +We are not completely done. We still need to forward the stream the uncompressed bytes we didn't want. +This is done using the regular "Skip" function: + +``` + err = dec.Skip(wantOffset - uncompressedOffset) +``` + +This will ensure that we are at exactly the offset we want, and reading from `dec` will start at the requested offset. + +## Index Format: + +Each block is structured as a snappy skippable block, with the chunk ID 0x99. + +The block can be read from the front, but contains information so it can be read from the back as well. + +Numbers are stored as fixed size little endian values or [zigzag encoded](https://developers.google.com/protocol-buffers/docs/encoding#signed_integers) [base 128 varints](https://developers.google.com/protocol-buffers/docs/encoding), +with un-encoded value length of 64 bits, unless other limits are specified. + +| Content | Format | +|---------------------------------------------------------------------------|-----------------------------------------------------------------------------------------------------------------------------| +| ID, `[1]byte` | Always 0x99. | +| Data Length, `[3]byte` | 3 byte little-endian length of the chunk in bytes, following this. | +| Header `[6]byte` | Header, must be `[115, 50, 105, 100, 120, 0]` or in text: "s2idx\x00". | +| UncompressedSize, Varint | Total Uncompressed size. | +| CompressedSize, Varint | Total Compressed size if known. Should be -1 if unknown. | +| EstBlockSize, Varint | Block Size, used for guessing uncompressed offsets. Must be >= 0. | +| Entries, Varint | Number of Entries in index, must be < 65536 and >=0. | +| HasUncompressedOffsets `byte` | 0 if no uncompressed offsets are present, 1 if present. Other values are invalid. | +| UncompressedOffsets, [Entries]VarInt | Uncompressed offsets. See below how to decode. | +| CompressedOffsets, [Entries]VarInt | Compressed offsets. See below how to decode. | +| Block Size, `[4]byte` | Little Endian total encoded size (including header and trailer). Can be used for searching backwards to start of block. | +| Trailer `[6]byte` | Trailer, must be `[0, 120, 100, 105, 50, 115]` or in text: "\x00xdi2s". Can be used for identifying block from end of stream. | + +For regular streams the uncompressed offsets are fully predictable, +so `HasUncompressedOffsets` allows to specify that compressed blocks all have +exactly `EstBlockSize` bytes of uncompressed content. + +Entries *must* be in order, starting with the lowest offset, +and there *must* be no uncompressed offset duplicates. +Entries *may* point to the start of a skippable block, +but it is then not allowed to also have an entry for the next block since +that would give an uncompressed offset duplicate. + +There is no requirement for all blocks to be represented in the index. +In fact there is a maximum of 65536 block entries in an index. + +The writer can use any method to reduce the number of entries. +An implicit block start at 0,0 can be assumed. + +### Decoding entries: + +``` +// Read Uncompressed entries. +// Each assumes EstBlockSize delta from previous. +for each entry { + uOff = 0 + if HasUncompressedOffsets == 1 { + uOff = ReadVarInt // Read value from stream + } + + // Except for the first entry, use previous values. + if entryNum == 0 { + entry[entryNum].UncompressedOffset = uOff + continue + } + + // Uncompressed uses previous offset and adds EstBlockSize + entry[entryNum].UncompressedOffset = entry[entryNum-1].UncompressedOffset + EstBlockSize +} + + +// Guess that the first block will be 50% of uncompressed size. +// Integer truncating division must be used. +CompressGuess := EstBlockSize / 2 + +// Read Compressed entries. +// Each assumes CompressGuess delta from previous. +// CompressGuess is adjusted for each value. +for each entry { + cOff = ReadVarInt // Read value from stream + + // Except for the first entry, use previous values. + if entryNum == 0 { + entry[entryNum].CompressedOffset = cOff + continue + } + + // Compressed uses previous and our estimate. + entry[entryNum].CompressedOffset = entry[entryNum-1].CompressedOffset + CompressGuess + + // Adjust compressed offset for next loop, integer truncating division must be used. + CompressGuess += cOff/2 +} +``` # Format Extensions diff --git a/vendor/github.com/klauspost/compress/s2/decode.go b/vendor/github.com/klauspost/compress/s2/decode.go index d0ae5304..9e7fce88 100644 --- a/vendor/github.com/klauspost/compress/s2/decode.go +++ b/vendor/github.com/klauspost/compress/s2/decode.go @@ -8,7 +8,9 @@ package s2 import ( "encoding/binary" "errors" + "fmt" "io" + "io/ioutil" ) var ( @@ -22,6 +24,16 @@ var ( ErrUnsupported = errors.New("s2: unsupported input") ) +// ErrCantSeek is returned if the stream cannot be seeked. +type ErrCantSeek struct { + Reason string +} + +// Error returns the error as string. +func (e ErrCantSeek) Error() string { + return fmt.Sprintf("s2: Can't seek because %s", e.Reason) +} + // DecodedLen returns the length of the decoded block. func DecodedLen(src []byte) (int, error) { v, _, err := decodedLen(src) @@ -88,6 +100,7 @@ func NewReader(r io.Reader, opts ...ReaderOption) *Reader { } else { nr.buf = make([]byte, MaxEncodedLen(defaultBlockSize)+checksumSize) } + nr.readHeader = nr.ignoreStreamID nr.paramsOK = true return &nr } @@ -131,12 +144,41 @@ func ReaderAllocBlock(blockSize int) ReaderOption { } } +// ReaderIgnoreStreamIdentifier will make the reader skip the expected +// stream identifier at the beginning of the stream. +// This can be used when serving a stream that has been forwarded to a specific point. +func ReaderIgnoreStreamIdentifier() ReaderOption { + return func(r *Reader) error { + r.ignoreStreamID = true + return nil + } +} + +// ReaderSkippableCB will register a callback for chuncks with the specified ID. +// ID must be a Reserved skippable chunks ID, 0x80-0xfd (inclusive). +// For each chunk with the ID, the callback is called with the content. +// Any returned non-nil error will abort decompression. +// Only one callback per ID is supported, latest sent will be used. +func ReaderSkippableCB(id uint8, fn func(r io.Reader) error) ReaderOption { + return func(r *Reader) error { + if id < 0x80 || id > 0xfd { + return fmt.Errorf("ReaderSkippableCB: Invalid id provided, must be 0x80-0xfd (inclusive)") + } + r.skippableCB[id] = fn + return nil + } +} + // Reader is an io.Reader that can read Snappy-compressed bytes. type Reader struct { - r io.Reader - err error - decoded []byte - buf []byte + r io.Reader + err error + decoded []byte + buf []byte + skippableCB [0x80]func(r io.Reader) error + blockStart int64 // Uncompressed offset at start of current. + index *Index + // decoded[i:j] contains decoded bytes that have not yet been passed on. i, j int // maximum block size allowed. @@ -144,10 +186,11 @@ type Reader struct { // maximum expected buffer size. maxBufSize int // alloc a buffer this size if > 0. - lazyBuf int - readHeader bool - paramsOK bool - snappyFrame bool + lazyBuf int + readHeader bool + paramsOK bool + snappyFrame bool + ignoreStreamID bool } // ensureBufferSize will ensure that the buffer can take at least n bytes. @@ -172,11 +215,12 @@ func (r *Reader) Reset(reader io.Reader) { if !r.paramsOK { return } + r.index = nil r.r = reader r.err = nil r.i = 0 r.j = 0 - r.readHeader = false + r.readHeader = r.ignoreStreamID } func (r *Reader) readFull(p []byte, allowEOF bool) (ok bool) { @@ -189,11 +233,24 @@ func (r *Reader) readFull(p []byte, allowEOF bool) (ok bool) { return true } -// skipN will skip n bytes. +// skippable will skip n bytes. // If the supplied reader supports seeking that is used. // tmp is used as a temporary buffer for reading. // The supplied slice does not need to be the size of the read. -func (r *Reader) skipN(tmp []byte, n int, allowEOF bool) (ok bool) { +func (r *Reader) skippable(tmp []byte, n int, allowEOF bool, id uint8) (ok bool) { + if id < 0x80 { + r.err = fmt.Errorf("interbal error: skippable id < 0x80") + return false + } + if fn := r.skippableCB[id-0x80]; fn != nil { + rd := io.LimitReader(r.r, int64(n)) + r.err = fn(rd) + if r.err != nil { + return false + } + _, r.err = io.CopyBuffer(ioutil.Discard, rd, tmp) + return r.err == nil + } if rs, ok := r.r.(io.ReadSeeker); ok { _, err := rs.Seek(int64(n), io.SeekCurrent) if err == nil { @@ -247,6 +304,7 @@ func (r *Reader) Read(p []byte) (int, error) { // https://github.com/google/snappy/blob/master/framing_format.txt switch chunkType { case chunkTypeCompressedData: + r.blockStart += int64(r.j) // Section 4.2. Compressed data (chunk type 0x00). if chunkLen < checksumSize { r.err = ErrCorrupt @@ -294,6 +352,7 @@ func (r *Reader) Read(p []byte) (int, error) { continue case chunkTypeUncompressedData: + r.blockStart += int64(r.j) // Section 4.3. Uncompressed data (chunk type 0x01). if chunkLen < checksumSize { r.err = ErrCorrupt @@ -357,17 +416,20 @@ func (r *Reader) Read(p []byte) (int, error) { if chunkType <= 0x7f { // Section 4.5. Reserved unskippable chunks (chunk types 0x02-0x7f). + // fmt.Printf("ERR chunktype: 0x%x\n", chunkType) r.err = ErrUnsupported return 0, r.err } // Section 4.4 Padding (chunk type 0xfe). // Section 4.6. Reserved skippable chunks (chunk types 0x80-0xfd). - if chunkLen > maxBlockSize { + if chunkLen > maxChunkSize { + // fmt.Printf("ERR chunkLen: 0x%x\n", chunkLen) r.err = ErrUnsupported return 0, r.err } - if !r.skipN(r.buf, chunkLen, false) { + // fmt.Printf("skippable: ID: 0x%x, len: 0x%x\n", chunkType, chunkLen) + if !r.skippable(r.buf, chunkLen, false, chunkType) { return 0, r.err } } @@ -396,7 +458,7 @@ func (r *Reader) Skip(n int64) error { return nil } n -= int64(r.j - r.i) - r.i, r.j = 0, 0 + r.i = r.j } // Buffer empty; read blocks until we have content. @@ -420,6 +482,7 @@ func (r *Reader) Skip(n int64) error { // https://github.com/google/snappy/blob/master/framing_format.txt switch chunkType { case chunkTypeCompressedData: + r.blockStart += int64(r.j) // Section 4.2. Compressed data (chunk type 0x00). if chunkLen < checksumSize { r.err = ErrCorrupt @@ -468,6 +531,7 @@ func (r *Reader) Skip(n int64) error { r.i, r.j = 0, dLen continue case chunkTypeUncompressedData: + r.blockStart += int64(r.j) // Section 4.3. Uncompressed data (chunk type 0x01). if chunkLen < checksumSize { r.err = ErrCorrupt @@ -528,19 +592,138 @@ func (r *Reader) Skip(n int64) error { r.err = ErrUnsupported return r.err } - if chunkLen > maxBlockSize { + if chunkLen > maxChunkSize { r.err = ErrUnsupported return r.err } // Section 4.4 Padding (chunk type 0xfe). // Section 4.6. Reserved skippable chunks (chunk types 0x80-0xfd). - if !r.skipN(r.buf, chunkLen, false) { + if !r.skippable(r.buf, chunkLen, false, chunkType) { return r.err } } return nil } +// ReadSeeker provides random or forward seeking in compressed content. +// See Reader.ReadSeeker +type ReadSeeker struct { + *Reader +} + +// ReadSeeker will return an io.ReadSeeker compatible version of the reader. +// If 'random' is specified the returned io.Seeker can be used for +// random seeking, otherwise only forward seeking is supported. +// Enabling random seeking requires the original input to support +// the io.Seeker interface. +// A custom index can be specified which will be used if supplied. +// When using a custom index, it will not be read from the input stream. +// The returned ReadSeeker contains a shallow reference to the existing Reader, +// meaning changes performed to one is reflected in the other. +func (r *Reader) ReadSeeker(random bool, index []byte) (*ReadSeeker, error) { + // Read index if provided. + if len(index) != 0 { + if r.index == nil { + r.index = &Index{} + } + if _, err := r.index.Load(index); err != nil { + return nil, ErrCantSeek{Reason: "loading index returned: " + err.Error()} + } + } + + // Check if input is seekable + rs, ok := r.r.(io.ReadSeeker) + if !ok { + if !random { + return &ReadSeeker{Reader: r}, nil + } + return nil, ErrCantSeek{Reason: "input stream isn't seekable"} + } + + if r.index != nil { + // Seekable and index, ok... + return &ReadSeeker{Reader: r}, nil + } + + // Load from stream. + r.index = &Index{} + + // Read current position. + pos, err := rs.Seek(0, io.SeekCurrent) + if err != nil { + return nil, ErrCantSeek{Reason: "seeking input returned: " + err.Error()} + } + err = r.index.LoadStream(rs) + if err != nil { + if err == ErrUnsupported { + return nil, ErrCantSeek{Reason: "input stream does not contain an index"} + } + return nil, ErrCantSeek{Reason: "reading index returned: " + err.Error()} + } + + // reset position. + _, err = rs.Seek(pos, io.SeekStart) + if err != nil { + return nil, ErrCantSeek{Reason: "seeking input returned: " + err.Error()} + } + return &ReadSeeker{Reader: r}, nil +} + +// Seek allows seeking in compressed data. +func (r *ReadSeeker) Seek(offset int64, whence int) (int64, error) { + if r.err != nil { + return 0, r.err + } + if offset == 0 && whence == io.SeekCurrent { + return r.blockStart + int64(r.i), nil + } + if !r.readHeader { + // Make sure we read the header. + _, r.err = r.Read([]byte{}) + } + rs, ok := r.r.(io.ReadSeeker) + if r.index == nil || !ok { + if whence == io.SeekCurrent && offset >= 0 { + err := r.Skip(offset) + return r.blockStart + int64(r.i), err + } + if whence == io.SeekStart && offset >= r.blockStart+int64(r.i) { + err := r.Skip(offset - r.blockStart - int64(r.i)) + return r.blockStart + int64(r.i), err + } + return 0, ErrUnsupported + + } + + switch whence { + case io.SeekCurrent: + offset += r.blockStart + int64(r.i) + case io.SeekEnd: + offset = -offset + } + c, u, err := r.index.Find(offset) + if err != nil { + return r.blockStart + int64(r.i), err + } + + // Seek to next block + _, err = rs.Seek(c, io.SeekStart) + if err != nil { + return 0, err + } + + if offset < 0 { + offset = r.index.TotalUncompressed + offset + } + + r.i = r.j // Remove rest of current block. + if u < offset { + // Forward inside block + return offset, r.Skip(offset - u) + } + return offset, nil +} + // ReadByte satisfies the io.ByteReader interface. func (r *Reader) ReadByte() (byte, error) { if r.err != nil { @@ -563,3 +746,17 @@ func (r *Reader) ReadByte() (byte, error) { } return 0, io.ErrNoProgress } + +// SkippableCB will register a callback for chunks with the specified ID. +// ID must be a Reserved skippable chunks ID, 0x80-0xfe (inclusive). +// For each chunk with the ID, the callback is called with the content. +// Any returned non-nil error will abort decompression. +// Only one callback per ID is supported, latest sent will be used. +// Sending a nil function will disable previous callbacks. +func (r *Reader) SkippableCB(id uint8, fn func(r io.Reader) error) error { + if id < 0x80 || id > chunkTypePadding { + return fmt.Errorf("ReaderSkippableCB: Invalid id provided, must be 0x80-0xfe (inclusive)") + } + r.skippableCB[id] = fn + return nil +} diff --git a/vendor/github.com/klauspost/compress/s2/encode.go b/vendor/github.com/klauspost/compress/s2/encode.go index aa8b108d..59f992ca 100644 --- a/vendor/github.com/klauspost/compress/s2/encode.go +++ b/vendor/github.com/klauspost/compress/s2/encode.go @@ -395,23 +395,26 @@ type Writer struct { // ibuf is a buffer for the incoming (uncompressed) bytes. ibuf []byte - blockSize int - obufLen int - concurrency int - written int64 - output chan chan result - buffers sync.Pool - pad int + blockSize int + obufLen int + concurrency int + written int64 + uncompWritten int64 // Bytes sent to compression + output chan chan result + buffers sync.Pool + pad int writer io.Writer randSrc io.Reader writerWg sync.WaitGroup + index Index // wroteStreamHeader is whether we have written the stream header. wroteStreamHeader bool paramsOK bool snappy bool flushOnWrite bool + appendIndex bool level uint8 } @@ -422,7 +425,11 @@ const ( levelBest ) -type result []byte +type result struct { + b []byte + // Uncompressed start offset + startOffset int64 +} // err returns the previously set error. // If no error has been set it is set to err if not nil. @@ -454,6 +461,9 @@ func (w *Writer) Reset(writer io.Writer) { w.wroteStreamHeader = false w.written = 0 w.writer = writer + w.uncompWritten = 0 + w.index.reset(w.blockSize) + // If we didn't get a writer, stop here. if writer == nil { return @@ -474,7 +484,8 @@ func (w *Writer) Reset(writer io.Writer) { // Get a queued write. for write := range toWrite { // Wait for the data to be available. - in := <-write + input := <-write + in := input.b if len(in) > 0 { if w.err(nil) == nil { // Don't expose data from previous buffers. @@ -485,11 +496,12 @@ func (w *Writer) Reset(writer io.Writer) { err = io.ErrShortBuffer } _ = w.err(err) + w.err(w.index.add(w.written, input.startOffset)) w.written += int64(n) } } if cap(in) >= w.obufLen { - w.buffers.Put([]byte(in)) + w.buffers.Put(in) } // close the incoming write request. // This can be used for synchronizing flushes. @@ -500,6 +512,9 @@ func (w *Writer) Reset(writer io.Writer) { // Write satisfies the io.Writer interface. func (w *Writer) Write(p []byte) (nRet int, errRet error) { + if err := w.err(nil); err != nil { + return 0, err + } if w.flushOnWrite { return w.write(p) } @@ -535,6 +550,9 @@ func (w *Writer) Write(p []byte) (nRet int, errRet error) { // The return value n is the number of bytes read. // Any error except io.EOF encountered during the read is also returned. func (w *Writer) ReadFrom(r io.Reader) (n int64, err error) { + if err := w.err(nil); err != nil { + return 0, err + } if len(w.ibuf) > 0 { err := w.Flush() if err != nil { @@ -577,6 +595,85 @@ func (w *Writer) ReadFrom(r io.Reader) (n int64, err error) { return n, w.err(nil) } +// AddSkippableBlock will add a skippable block to the stream. +// The ID must be 0x80-0xfe (inclusive). +// Length of the skippable block must be <= 16777215 bytes. +func (w *Writer) AddSkippableBlock(id uint8, data []byte) (err error) { + if err := w.err(nil); err != nil { + return err + } + if len(data) == 0 { + return nil + } + if id < 0x80 || id > chunkTypePadding { + return fmt.Errorf("invalid skippable block id %x", id) + } + if len(data) > maxChunkSize { + return fmt.Errorf("skippable block excessed maximum size") + } + var header [4]byte + chunkLen := 4 + len(data) + header[0] = id + header[1] = uint8(chunkLen >> 0) + header[2] = uint8(chunkLen >> 8) + header[3] = uint8(chunkLen >> 16) + if w.concurrency == 1 { + write := func(b []byte) error { + n, err := w.writer.Write(b) + if err = w.err(err); err != nil { + return err + } + if n != len(data) { + return w.err(io.ErrShortWrite) + } + w.written += int64(n) + return w.err(nil) + } + if !w.wroteStreamHeader { + w.wroteStreamHeader = true + if w.snappy { + if err := write([]byte(magicChunkSnappy)); err != nil { + return err + } + } else { + if err := write([]byte(magicChunk)); err != nil { + return err + } + } + } + if err := write(header[:]); err != nil { + return err + } + if err := write(data); err != nil { + return err + } + } + + // Create output... + if !w.wroteStreamHeader { + w.wroteStreamHeader = true + hWriter := make(chan result) + w.output <- hWriter + if w.snappy { + hWriter <- result{startOffset: w.uncompWritten, b: []byte(magicChunkSnappy)} + } else { + hWriter <- result{startOffset: w.uncompWritten, b: []byte(magicChunk)} + } + } + + // Copy input. + inbuf := w.buffers.Get().([]byte)[:4] + copy(inbuf, header[:]) + inbuf = append(inbuf, data...) + + output := make(chan result, 1) + // Queue output. + w.output <- output + output <- result{startOffset: w.uncompWritten, b: inbuf} + + return nil +} + // EncodeBuffer will add a buffer to the stream. // This is the fastest way to encode a stream, // but the input buffer cannot be written to by the caller @@ -614,9 +711,9 @@ func (w *Writer) EncodeBuffer(buf []byte) (err error) { hWriter := make(chan result) w.output <- hWriter if w.snappy { - hWriter <- []byte(magicChunkSnappy) + hWriter <- result{startOffset: w.uncompWritten, b: []byte(magicChunkSnappy)} } else { - hWriter <- []byte(magicChunk) + hWriter <- result{startOffset: w.uncompWritten, b: []byte(magicChunk)} } } @@ -632,6 +729,10 @@ func (w *Writer) EncodeBuffer(buf []byte) (err error) { output := make(chan result) // Queue output now, so we keep order. w.output <- output + res := result{ + startOffset: w.uncompWritten, + } + w.uncompWritten += int64(len(uncompressed)) go func() { checksum := crc(uncompressed) @@ -664,7 +765,8 @@ func (w *Writer) EncodeBuffer(buf []byte) (err error) { obuf[7] = uint8(checksum >> 24) // Queue final output. - output <- obuf + res.b = obuf + output <- res }() } return nil @@ -708,9 +810,9 @@ func (w *Writer) write(p []byte) (nRet int, errRet error) { hWriter := make(chan result) w.output <- hWriter if w.snappy { - hWriter <- []byte(magicChunkSnappy) + hWriter <- result{startOffset: w.uncompWritten, b: []byte(magicChunkSnappy)} } else { - hWriter <- []byte(magicChunk) + hWriter <- result{startOffset: w.uncompWritten, b: []byte(magicChunk)} } } @@ -731,6 +833,11 @@ func (w *Writer) write(p []byte) (nRet int, errRet error) { output := make(chan result) // Queue output now, so we keep order. w.output <- output + res := result{ + startOffset: w.uncompWritten, + } + w.uncompWritten += int64(len(uncompressed)) + go func() { checksum := crc(uncompressed) @@ -763,7 +870,8 @@ func (w *Writer) write(p []byte) (nRet int, errRet error) { obuf[7] = uint8(checksum >> 24) // Queue final output. - output <- obuf + res.b = obuf + output <- res // Put unused buffer back in pool. w.buffers.Put(inbuf) @@ -793,9 +901,9 @@ func (w *Writer) writeFull(inbuf []byte) (errRet error) { hWriter := make(chan result) w.output <- hWriter if w.snappy { - hWriter <- []byte(magicChunkSnappy) + hWriter <- result{startOffset: w.uncompWritten, b: []byte(magicChunkSnappy)} } else { - hWriter <- []byte(magicChunk) + hWriter <- result{startOffset: w.uncompWritten, b: []byte(magicChunk)} } } @@ -806,6 +914,11 @@ func (w *Writer) writeFull(inbuf []byte) (errRet error) { output := make(chan result) // Queue output now, so we keep order. w.output <- output + res := result{ + startOffset: w.uncompWritten, + } + w.uncompWritten += int64(len(uncompressed)) + go func() { checksum := crc(uncompressed) @@ -838,7 +951,8 @@ func (w *Writer) writeFull(inbuf []byte) (errRet error) { obuf[7] = uint8(checksum >> 24) // Queue final output. - output <- obuf + res.b = obuf + output <- res // Put unused buffer back in pool. w.buffers.Put(inbuf) @@ -912,7 +1026,10 @@ func (w *Writer) writeSync(p []byte) (nRet int, errRet error) { if n != len(obuf) { return 0, w.err(io.ErrShortWrite) } + w.err(w.index.add(w.written, w.uncompWritten)) w.written += int64(n) + w.uncompWritten += int64(len(uncompressed)) + if chunkType == chunkTypeUncompressedData { // Write uncompressed data. n, err := w.writer.Write(uncompressed) @@ -961,39 +1078,88 @@ func (w *Writer) Flush() error { res := make(chan result) w.output <- res // Block until this has been picked up. - res <- nil + res <- result{b: nil, startOffset: w.uncompWritten} // When it is closed, we have flushed. <-res return w.err(nil) } // Close calls Flush and then closes the Writer. -// Calling Close multiple times is ok. +// Calling Close multiple times is ok, +// but calling CloseIndex after this will make it not return the index. func (w *Writer) Close() error { + _, err := w.closeIndex(w.appendIndex) + return err +} + +// CloseIndex calls Close and returns an index on first call. +// This is not required if you are only adding index to a stream. +func (w *Writer) CloseIndex() ([]byte, error) { + return w.closeIndex(true) +} + +func (w *Writer) closeIndex(idx bool) ([]byte, error) { err := w.Flush() if w.output != nil { close(w.output) w.writerWg.Wait() w.output = nil } - if w.err(nil) == nil && w.writer != nil && w.pad > 0 { - add := calcSkippableFrame(w.written, int64(w.pad)) - frame, err := skippableFrame(w.ibuf[:0], add, w.randSrc) - if err = w.err(err); err != nil { - return err + + var index []byte + if w.err(nil) == nil && w.writer != nil { + // Create index. + if idx { + compSize := int64(-1) + if w.pad <= 1 { + compSize = w.written + } + index = w.index.appendTo(w.ibuf[:0], w.uncompWritten, compSize) + // Count as written for padding. + if w.appendIndex { + w.written += int64(len(index)) + } + if true { + _, err := w.index.Load(index) + if err != nil { + panic(err) + } + } + } + + if w.pad > 1 { + tmp := w.ibuf[:0] + if len(index) > 0 { + // Allocate another buffer. + tmp = w.buffers.Get().([]byte)[:0] + defer w.buffers.Put(tmp) + } + add := calcSkippableFrame(w.written, int64(w.pad)) + frame, err := skippableFrame(tmp, add, w.randSrc) + if err = w.err(err); err != nil { + return nil, err + } + n, err2 := w.writer.Write(frame) + if err2 == nil && n != len(frame) { + err2 = io.ErrShortWrite + } + _ = w.err(err2) + } + if len(index) > 0 && w.appendIndex { + n, err2 := w.writer.Write(index) + if err2 == nil && n != len(index) { + err2 = io.ErrShortWrite + } + _ = w.err(err2) } - _, err2 := w.writer.Write(frame) - _ = w.err(err2) } - _ = w.err(errClosed) + err = w.err(errClosed) if err == errClosed { - return nil + return index, nil } - return err + return nil, err } -const skippableFrameHeader = 4 - // calcSkippableFrame will return a total size to be added for written // to be divisible by multiple. // The value will always be > skippableFrameHeader. @@ -1057,6 +1223,15 @@ func WriterConcurrency(n int) WriterOption { } } +// WriterAddIndex will append an index to the end of a stream +// when it is closed. +func WriterAddIndex() WriterOption { + return func(w *Writer) error { + w.appendIndex = true + return nil + } +} + // WriterBetterCompression will enable better compression. // EncodeBetter compresses better than Encode but typically with a // 10-40% speed decrease on both compression and decompression. diff --git a/vendor/github.com/klauspost/compress/s2/index.go b/vendor/github.com/klauspost/compress/s2/index.go new file mode 100644 index 00000000..fd857682 --- /dev/null +++ b/vendor/github.com/klauspost/compress/s2/index.go @@ -0,0 +1,525 @@ +// Copyright (c) 2022+ Klaus Post. All rights reserved. +// Use of this source code is governed by a BSD-style +// license that can be found in the LICENSE file. + +package s2 + +import ( + "bytes" + "encoding/binary" + "encoding/json" + "fmt" + "io" +) + +const ( + S2IndexHeader = "s2idx\x00" + S2IndexTrailer = "\x00xdi2s" + maxIndexEntries = 1 << 16 +) + +// Index represents an S2/Snappy index. +type Index struct { + TotalUncompressed int64 // Total Uncompressed size if known. Will be -1 if unknown. + TotalCompressed int64 // Total Compressed size if known. Will be -1 if unknown. + info []struct { + compressedOffset int64 + uncompressedOffset int64 + } + estBlockUncomp int64 +} + +func (i *Index) reset(maxBlock int) { + i.estBlockUncomp = int64(maxBlock) + i.TotalCompressed = -1 + i.TotalUncompressed = -1 + if len(i.info) > 0 { + i.info = i.info[:0] + } +} + +// allocInfos will allocate an empty slice of infos. +func (i *Index) allocInfos(n int) { + if n > maxIndexEntries { + panic("n > maxIndexEntries") + } + i.info = make([]struct { + compressedOffset int64 + uncompressedOffset int64 + }, 0, n) +} + +// add an uncompressed and compressed pair. +// Entries must be sent in order. +func (i *Index) add(compressedOffset, uncompressedOffset int64) error { + if i == nil { + return nil + } + lastIdx := len(i.info) - 1 + if lastIdx >= 0 { + latest := i.info[lastIdx] + if latest.uncompressedOffset == uncompressedOffset { + // Uncompressed didn't change, don't add entry, + // but update start index. + latest.compressedOffset = compressedOffset + i.info[lastIdx] = latest + return nil + } + if latest.uncompressedOffset > uncompressedOffset { + return fmt.Errorf("internal error: Earlier uncompressed received (%d > %d)", latest.uncompressedOffset, uncompressedOffset) + } + if latest.compressedOffset > compressedOffset { + return fmt.Errorf("internal error: Earlier compressed received (%d > %d)", latest.uncompressedOffset, uncompressedOffset) + } + } + i.info = append(i.info, struct { + compressedOffset int64 + uncompressedOffset int64 + }{compressedOffset: compressedOffset, uncompressedOffset: uncompressedOffset}) + return nil +} + +// Find the offset at or before the wanted (uncompressed) offset. +// If offset is 0 or positive it is the offset from the beginning of the file. +// If the uncompressed size is known, the offset must be within the file. +// If an offset outside the file is requested io.ErrUnexpectedEOF is returned. +// If the offset is negative, it is interpreted as the distance from the end of the file, +// where -1 represents the last byte. +// If offset from the end of the file is requested, but size is unknown, +// ErrUnsupported will be returned. +func (i *Index) Find(offset int64) (compressedOff, uncompressedOff int64, err error) { + if i.TotalUncompressed < 0 { + return 0, 0, ErrCorrupt + } + if offset < 0 { + offset = i.TotalUncompressed + offset + if offset < 0 { + return 0, 0, io.ErrUnexpectedEOF + } + } + if offset > i.TotalUncompressed { + return 0, 0, io.ErrUnexpectedEOF + } + for _, info := range i.info { + if info.uncompressedOffset > offset { + break + } + compressedOff = info.compressedOffset + uncompressedOff = info.uncompressedOffset + } + return compressedOff, uncompressedOff, nil +} + +// reduce to stay below maxIndexEntries +func (i *Index) reduce() { + if len(i.info) < maxIndexEntries && i.estBlockUncomp >= 1<<20 { + return + } + + // Algorithm, keep 1, remove removeN entries... + removeN := (len(i.info) + 1) / maxIndexEntries + src := i.info + j := 0 + + // Each block should be at least 1MB, but don't reduce below 1000 entries. + for i.estBlockUncomp*(int64(removeN)+1) < 1<<20 && len(i.info)/(removeN+1) > 1000 { + removeN++ + } + for idx := 0; idx < len(src); idx++ { + i.info[j] = src[idx] + j++ + idx += removeN + } + i.info = i.info[:j] + // Update maxblock estimate. + i.estBlockUncomp += i.estBlockUncomp * int64(removeN) +} + +func (i *Index) appendTo(b []byte, uncompTotal, compTotal int64) []byte { + i.reduce() + var tmp [binary.MaxVarintLen64]byte + + initSize := len(b) + // We make the start a skippable header+size. + b = append(b, ChunkTypeIndex, 0, 0, 0) + b = append(b, []byte(S2IndexHeader)...) + // Total Uncompressed size + n := binary.PutVarint(tmp[:], uncompTotal) + b = append(b, tmp[:n]...) + // Total Compressed size + n = binary.PutVarint(tmp[:], compTotal) + b = append(b, tmp[:n]...) + // Put EstBlockUncomp size + n = binary.PutVarint(tmp[:], i.estBlockUncomp) + b = append(b, tmp[:n]...) + // Put length + n = binary.PutVarint(tmp[:], int64(len(i.info))) + b = append(b, tmp[:n]...) + + // Check if we should add uncompressed offsets + var hasUncompressed byte + for idx, info := range i.info { + if idx == 0 { + if info.uncompressedOffset != 0 { + hasUncompressed = 1 + break + } + continue + } + if info.uncompressedOffset != i.info[idx-1].uncompressedOffset+i.estBlockUncomp { + hasUncompressed = 1 + break + } + } + b = append(b, hasUncompressed) + + // Add each entry + if hasUncompressed == 1 { + for idx, info := range i.info { + uOff := info.uncompressedOffset + if idx > 0 { + prev := i.info[idx-1] + uOff -= prev.uncompressedOffset + (i.estBlockUncomp) + } + n = binary.PutVarint(tmp[:], uOff) + b = append(b, tmp[:n]...) + } + } + + // Initial compressed size estimate. + cPredict := i.estBlockUncomp / 2 + + for idx, info := range i.info { + cOff := info.compressedOffset + if idx > 0 { + prev := i.info[idx-1] + cOff -= prev.compressedOffset + cPredict + // Update compressed size prediction, with half the error. + cPredict += cOff / 2 + } + n = binary.PutVarint(tmp[:], cOff) + b = append(b, tmp[:n]...) + } + + // Add Total Size. + // Stored as fixed size for easier reading. + binary.LittleEndian.PutUint32(tmp[:], uint32(len(b)-initSize+4+len(S2IndexTrailer))) + b = append(b, tmp[:4]...) + // Trailer + b = append(b, []byte(S2IndexTrailer)...) + + // Update size + chunkLen := len(b) - initSize - skippableFrameHeader + b[initSize+1] = uint8(chunkLen >> 0) + b[initSize+2] = uint8(chunkLen >> 8) + b[initSize+3] = uint8(chunkLen >> 16) + //fmt.Printf("chunklen: 0x%x Uncomp:%d, Comp:%d\n", chunkLen, uncompTotal, compTotal) + return b +} + +// Load a binary index. +// A zero value Index can be used or a previous one can be reused. +func (i *Index) Load(b []byte) ([]byte, error) { + if len(b) <= 4+len(S2IndexHeader)+len(S2IndexTrailer) { + return b, io.ErrUnexpectedEOF + } + if b[0] != ChunkTypeIndex { + return b, ErrCorrupt + } + chunkLen := int(b[1]) | int(b[2])<<8 | int(b[3])<<16 + b = b[4:] + + // Validate we have enough... + if len(b) < chunkLen { + return b, io.ErrUnexpectedEOF + } + if !bytes.Equal(b[:len(S2IndexHeader)], []byte(S2IndexHeader)) { + return b, ErrUnsupported + } + b = b[len(S2IndexHeader):] + + // Total Uncompressed + if v, n := binary.Varint(b); n <= 0 || v < 0 { + return b, ErrCorrupt + } else { + i.TotalUncompressed = v + b = b[n:] + } + + // Total Compressed + if v, n := binary.Varint(b); n <= 0 { + return b, ErrCorrupt + } else { + i.TotalCompressed = v + b = b[n:] + } + + // Read EstBlockUncomp + if v, n := binary.Varint(b); n <= 0 { + return b, ErrCorrupt + } else { + if v < 0 { + return b, ErrCorrupt + } + i.estBlockUncomp = v + b = b[n:] + } + + var entries int + if v, n := binary.Varint(b); n <= 0 { + return b, ErrCorrupt + } else { + if v < 0 || v > maxIndexEntries { + return b, ErrCorrupt + } + entries = int(v) + b = b[n:] + } + if cap(i.info) < entries { + i.allocInfos(entries) + } + i.info = i.info[:entries] + + if len(b) < 1 { + return b, io.ErrUnexpectedEOF + } + hasUncompressed := b[0] + b = b[1:] + if hasUncompressed&1 != hasUncompressed { + return b, ErrCorrupt + } + + // Add each uncompressed entry + for idx := range i.info { + var uOff int64 + if hasUncompressed != 0 { + // Load delta + if v, n := binary.Varint(b); n <= 0 { + return b, ErrCorrupt + } else { + uOff = v + b = b[n:] + } + } + + if idx > 0 { + prev := i.info[idx-1].uncompressedOffset + uOff += prev + (i.estBlockUncomp) + if uOff <= prev { + return b, ErrCorrupt + } + } + if uOff < 0 { + return b, ErrCorrupt + } + i.info[idx].uncompressedOffset = uOff + } + + // Initial compressed size estimate. + cPredict := i.estBlockUncomp / 2 + + // Add each compressed entry + for idx := range i.info { + var cOff int64 + if v, n := binary.Varint(b); n <= 0 { + return b, ErrCorrupt + } else { + cOff = v + b = b[n:] + } + + if idx > 0 { + // Update compressed size prediction, with half the error. + cPredictNew := cPredict + cOff/2 + + prev := i.info[idx-1].compressedOffset + cOff += prev + cPredict + if cOff <= prev { + return b, ErrCorrupt + } + cPredict = cPredictNew + } + if cOff < 0 { + return b, ErrCorrupt + } + i.info[idx].compressedOffset = cOff + } + if len(b) < 4+len(S2IndexTrailer) { + return b, io.ErrUnexpectedEOF + } + // Skip size... + b = b[4:] + + // Check trailer... + if !bytes.Equal(b[:len(S2IndexTrailer)], []byte(S2IndexTrailer)) { + return b, ErrCorrupt + } + return b[len(S2IndexTrailer):], nil +} + +// LoadStream will load an index from the end of the supplied stream. +// ErrUnsupported will be returned if the signature cannot be found. +// ErrCorrupt will be returned if unexpected values are found. +// io.ErrUnexpectedEOF is returned if there are too few bytes. +// IO errors are returned as-is. +func (i *Index) LoadStream(rs io.ReadSeeker) error { + // Go to end. + _, err := rs.Seek(-10, io.SeekEnd) + if err != nil { + return err + } + var tmp [10]byte + _, err = io.ReadFull(rs, tmp[:]) + if err != nil { + return err + } + // Check trailer... + if !bytes.Equal(tmp[4:4+len(S2IndexTrailer)], []byte(S2IndexTrailer)) { + return ErrUnsupported + } + sz := binary.LittleEndian.Uint32(tmp[:4]) + if sz > maxChunkSize+skippableFrameHeader { + return ErrCorrupt + } + _, err = rs.Seek(-int64(sz), io.SeekEnd) + if err != nil { + return err + } + + // Read index. + buf := make([]byte, sz) + _, err = io.ReadFull(rs, buf) + if err != nil { + return err + } + _, err = i.Load(buf) + return err +} + +// IndexStream will return an index for a stream. +// The stream structure will be checked, but +// data within blocks is not verified. +// The returned index can either be appended to the end of the stream +// or stored separately. +func IndexStream(r io.Reader) ([]byte, error) { + var i Index + var buf [maxChunkSize]byte + var readHeader bool + for { + _, err := io.ReadFull(r, buf[:4]) + if err != nil { + if err == io.EOF { + return i.appendTo(nil, i.TotalUncompressed, i.TotalCompressed), nil + } + return nil, err + } + // Start of this chunk. + startChunk := i.TotalCompressed + i.TotalCompressed += 4 + + chunkType := buf[0] + if !readHeader { + if chunkType != chunkTypeStreamIdentifier { + return nil, ErrCorrupt + } + readHeader = true + } + chunkLen := int(buf[1]) | int(buf[2])<<8 | int(buf[3])<<16 + if chunkLen < checksumSize { + return nil, ErrCorrupt + } + + i.TotalCompressed += int64(chunkLen) + _, err = io.ReadFull(r, buf[:chunkLen]) + if err != nil { + return nil, io.ErrUnexpectedEOF + } + // The chunk types are specified at + // https://github.com/google/snappy/blob/master/framing_format.txt + switch chunkType { + case chunkTypeCompressedData: + // Section 4.2. Compressed data (chunk type 0x00). + // Skip checksum. + dLen, err := DecodedLen(buf[checksumSize:]) + if err != nil { + return nil, err + } + if dLen > maxBlockSize { + return nil, ErrCorrupt + } + if i.estBlockUncomp == 0 { + // Use first block for estimate... + i.estBlockUncomp = int64(dLen) + } + err = i.add(startChunk, i.TotalUncompressed) + if err != nil { + return nil, err + } + i.TotalUncompressed += int64(dLen) + continue + case chunkTypeUncompressedData: + n2 := chunkLen - checksumSize + if n2 > maxBlockSize { + return nil, ErrCorrupt + } + if i.estBlockUncomp == 0 { + // Use first block for estimate... + i.estBlockUncomp = int64(n2) + } + err = i.add(startChunk, i.TotalUncompressed) + if err != nil { + return nil, err + } + i.TotalUncompressed += int64(n2) + continue + case chunkTypeStreamIdentifier: + // Section 4.1. Stream identifier (chunk type 0xff). + if chunkLen != len(magicBody) { + return nil, ErrCorrupt + } + + if string(buf[:len(magicBody)]) != magicBody { + if string(buf[:len(magicBody)]) != magicBodySnappy { + return nil, ErrCorrupt + } + } + + continue + } + + if chunkType <= 0x7f { + // Section 4.5. Reserved unskippable chunks (chunk types 0x02-0x7f). + return nil, ErrUnsupported + } + if chunkLen > maxChunkSize { + return nil, ErrUnsupported + } + // Section 4.4 Padding (chunk type 0xfe). + // Section 4.6. Reserved skippable chunks (chunk types 0x80-0xfd). + } +} + +// JSON returns the index as JSON text. +func (i *Index) JSON() []byte { + x := struct { + TotalUncompressed int64 `json:"total_uncompressed"` // Total Uncompressed size if known. Will be -1 if unknown. + TotalCompressed int64 `json:"total_compressed"` // Total Compressed size if known. Will be -1 if unknown. + Offsets []struct { + CompressedOffset int64 `json:"compressed"` + UncompressedOffset int64 `json:"uncompressed"` + } `json:"offsets"` + EstBlockUncomp int64 `json:"est_block_uncompressed"` + }{ + TotalUncompressed: i.TotalUncompressed, + TotalCompressed: i.TotalCompressed, + EstBlockUncomp: i.estBlockUncomp, + } + for _, v := range i.info { + x.Offsets = append(x.Offsets, struct { + CompressedOffset int64 `json:"compressed"` + UncompressedOffset int64 `json:"uncompressed"` + }{CompressedOffset: v.compressedOffset, UncompressedOffset: v.uncompressedOffset}) + } + b, _ := json.MarshalIndent(x, "", " ") + return b +} diff --git a/vendor/github.com/klauspost/compress/s2/s2.go b/vendor/github.com/klauspost/compress/s2/s2.go index 89d69e96..dae3f731 100644 --- a/vendor/github.com/klauspost/compress/s2/s2.go +++ b/vendor/github.com/klauspost/compress/s2/s2.go @@ -87,6 +87,9 @@ const ( // minBlockSize is the minimum size of block setting when creating a writer. minBlockSize = 4 << 10 + skippableFrameHeader = 4 + maxChunkSize = 1<<24 - 1 // 16777215 + // Default block size defaultBlockSize = 1 << 20 @@ -99,6 +102,7 @@ const ( const ( chunkTypeCompressedData = 0x00 chunkTypeUncompressedData = 0x01 + ChunkTypeIndex = 0x99 chunkTypePadding = 0xfe chunkTypeStreamIdentifier = 0xff ) |