diff options
Diffstat (limited to 'vendor/github.com/klauspost/compress/s2/encode.go')
-rw-r--r-- | vendor/github.com/klauspost/compress/s2/encode.go | 241 |
1 files changed, 208 insertions, 33 deletions
diff --git a/vendor/github.com/klauspost/compress/s2/encode.go b/vendor/github.com/klauspost/compress/s2/encode.go index aa8b108d..59f992ca 100644 --- a/vendor/github.com/klauspost/compress/s2/encode.go +++ b/vendor/github.com/klauspost/compress/s2/encode.go @@ -395,23 +395,26 @@ type Writer struct { // ibuf is a buffer for the incoming (uncompressed) bytes. ibuf []byte - blockSize int - obufLen int - concurrency int - written int64 - output chan chan result - buffers sync.Pool - pad int + blockSize int + obufLen int + concurrency int + written int64 + uncompWritten int64 // Bytes sent to compression + output chan chan result + buffers sync.Pool + pad int writer io.Writer randSrc io.Reader writerWg sync.WaitGroup + index Index // wroteStreamHeader is whether we have written the stream header. wroteStreamHeader bool paramsOK bool snappy bool flushOnWrite bool + appendIndex bool level uint8 } @@ -422,7 +425,11 @@ const ( levelBest ) -type result []byte +type result struct { + b []byte + // Uncompressed start offset + startOffset int64 +} // err returns the previously set error. // If no error has been set it is set to err if not nil. @@ -454,6 +461,9 @@ func (w *Writer) Reset(writer io.Writer) { w.wroteStreamHeader = false w.written = 0 w.writer = writer + w.uncompWritten = 0 + w.index.reset(w.blockSize) + // If we didn't get a writer, stop here. if writer == nil { return @@ -474,7 +484,8 @@ func (w *Writer) Reset(writer io.Writer) { // Get a queued write. for write := range toWrite { // Wait for the data to be available. - in := <-write + input := <-write + in := input.b if len(in) > 0 { if w.err(nil) == nil { // Don't expose data from previous buffers. @@ -485,11 +496,12 @@ func (w *Writer) Reset(writer io.Writer) { err = io.ErrShortBuffer } _ = w.err(err) + w.err(w.index.add(w.written, input.startOffset)) w.written += int64(n) } } if cap(in) >= w.obufLen { - w.buffers.Put([]byte(in)) + w.buffers.Put(in) } // close the incoming write request. // This can be used for synchronizing flushes. @@ -500,6 +512,9 @@ func (w *Writer) Reset(writer io.Writer) { // Write satisfies the io.Writer interface. func (w *Writer) Write(p []byte) (nRet int, errRet error) { + if err := w.err(nil); err != nil { + return 0, err + } if w.flushOnWrite { return w.write(p) } @@ -535,6 +550,9 @@ func (w *Writer) Write(p []byte) (nRet int, errRet error) { // The return value n is the number of bytes read. // Any error except io.EOF encountered during the read is also returned. func (w *Writer) ReadFrom(r io.Reader) (n int64, err error) { + if err := w.err(nil); err != nil { + return 0, err + } if len(w.ibuf) > 0 { err := w.Flush() if err != nil { @@ -577,6 +595,85 @@ func (w *Writer) ReadFrom(r io.Reader) (n int64, err error) { return n, w.err(nil) } +// AddSkippableBlock will add a skippable block to the stream. +// The ID must be 0x80-0xfe (inclusive). +// Length of the skippable block must be <= 16777215 bytes. +func (w *Writer) AddSkippableBlock(id uint8, data []byte) (err error) { + if err := w.err(nil); err != nil { + return err + } + if len(data) == 0 { + return nil + } + if id < 0x80 || id > chunkTypePadding { + return fmt.Errorf("invalid skippable block id %x", id) + } + if len(data) > maxChunkSize { + return fmt.Errorf("skippable block excessed maximum size") + } + var header [4]byte + chunkLen := 4 + len(data) + header[0] = id + header[1] = uint8(chunkLen >> 0) + header[2] = uint8(chunkLen >> 8) + header[3] = uint8(chunkLen >> 16) + if w.concurrency == 1 { + write := func(b []byte) error { + n, err := w.writer.Write(b) + if err = w.err(err); err != nil { + return err + } + if n != len(data) { + return w.err(io.ErrShortWrite) + } + w.written += int64(n) + return w.err(nil) + } + if !w.wroteStreamHeader { + w.wroteStreamHeader = true + if w.snappy { + if err := write([]byte(magicChunkSnappy)); err != nil { + return err + } + } else { + if err := write([]byte(magicChunk)); err != nil { + return err + } + } + } + if err := write(header[:]); err != nil { + return err + } + if err := write(data); err != nil { + return err + } + } + + // Create output... + if !w.wroteStreamHeader { + w.wroteStreamHeader = true + hWriter := make(chan result) + w.output <- hWriter + if w.snappy { + hWriter <- result{startOffset: w.uncompWritten, b: []byte(magicChunkSnappy)} + } else { + hWriter <- result{startOffset: w.uncompWritten, b: []byte(magicChunk)} + } + } + + // Copy input. + inbuf := w.buffers.Get().([]byte)[:4] + copy(inbuf, header[:]) + inbuf = append(inbuf, data...) + + output := make(chan result, 1) + // Queue output. + w.output <- output + output <- result{startOffset: w.uncompWritten, b: inbuf} + + return nil +} + // EncodeBuffer will add a buffer to the stream. // This is the fastest way to encode a stream, // but the input buffer cannot be written to by the caller @@ -614,9 +711,9 @@ func (w *Writer) EncodeBuffer(buf []byte) (err error) { hWriter := make(chan result) w.output <- hWriter if w.snappy { - hWriter <- []byte(magicChunkSnappy) + hWriter <- result{startOffset: w.uncompWritten, b: []byte(magicChunkSnappy)} } else { - hWriter <- []byte(magicChunk) + hWriter <- result{startOffset: w.uncompWritten, b: []byte(magicChunk)} } } @@ -632,6 +729,10 @@ func (w *Writer) EncodeBuffer(buf []byte) (err error) { output := make(chan result) // Queue output now, so we keep order. w.output <- output + res := result{ + startOffset: w.uncompWritten, + } + w.uncompWritten += int64(len(uncompressed)) go func() { checksum := crc(uncompressed) @@ -664,7 +765,8 @@ func (w *Writer) EncodeBuffer(buf []byte) (err error) { obuf[7] = uint8(checksum >> 24) // Queue final output. - output <- obuf + res.b = obuf + output <- res }() } return nil @@ -708,9 +810,9 @@ func (w *Writer) write(p []byte) (nRet int, errRet error) { hWriter := make(chan result) w.output <- hWriter if w.snappy { - hWriter <- []byte(magicChunkSnappy) + hWriter <- result{startOffset: w.uncompWritten, b: []byte(magicChunkSnappy)} } else { - hWriter <- []byte(magicChunk) + hWriter <- result{startOffset: w.uncompWritten, b: []byte(magicChunk)} } } @@ -731,6 +833,11 @@ func (w *Writer) write(p []byte) (nRet int, errRet error) { output := make(chan result) // Queue output now, so we keep order. w.output <- output + res := result{ + startOffset: w.uncompWritten, + } + w.uncompWritten += int64(len(uncompressed)) + go func() { checksum := crc(uncompressed) @@ -763,7 +870,8 @@ func (w *Writer) write(p []byte) (nRet int, errRet error) { obuf[7] = uint8(checksum >> 24) // Queue final output. - output <- obuf + res.b = obuf + output <- res // Put unused buffer back in pool. w.buffers.Put(inbuf) @@ -793,9 +901,9 @@ func (w *Writer) writeFull(inbuf []byte) (errRet error) { hWriter := make(chan result) w.output <- hWriter if w.snappy { - hWriter <- []byte(magicChunkSnappy) + hWriter <- result{startOffset: w.uncompWritten, b: []byte(magicChunkSnappy)} } else { - hWriter <- []byte(magicChunk) + hWriter <- result{startOffset: w.uncompWritten, b: []byte(magicChunk)} } } @@ -806,6 +914,11 @@ func (w *Writer) writeFull(inbuf []byte) (errRet error) { output := make(chan result) // Queue output now, so we keep order. w.output <- output + res := result{ + startOffset: w.uncompWritten, + } + w.uncompWritten += int64(len(uncompressed)) + go func() { checksum := crc(uncompressed) @@ -838,7 +951,8 @@ func (w *Writer) writeFull(inbuf []byte) (errRet error) { obuf[7] = uint8(checksum >> 24) // Queue final output. - output <- obuf + res.b = obuf + output <- res // Put unused buffer back in pool. w.buffers.Put(inbuf) @@ -912,7 +1026,10 @@ func (w *Writer) writeSync(p []byte) (nRet int, errRet error) { if n != len(obuf) { return 0, w.err(io.ErrShortWrite) } + w.err(w.index.add(w.written, w.uncompWritten)) w.written += int64(n) + w.uncompWritten += int64(len(uncompressed)) + if chunkType == chunkTypeUncompressedData { // Write uncompressed data. n, err := w.writer.Write(uncompressed) @@ -961,39 +1078,88 @@ func (w *Writer) Flush() error { res := make(chan result) w.output <- res // Block until this has been picked up. - res <- nil + res <- result{b: nil, startOffset: w.uncompWritten} // When it is closed, we have flushed. <-res return w.err(nil) } // Close calls Flush and then closes the Writer. -// Calling Close multiple times is ok. +// Calling Close multiple times is ok, +// but calling CloseIndex after this will make it not return the index. func (w *Writer) Close() error { + _, err := w.closeIndex(w.appendIndex) + return err +} + +// CloseIndex calls Close and returns an index on first call. +// This is not required if you are only adding index to a stream. +func (w *Writer) CloseIndex() ([]byte, error) { + return w.closeIndex(true) +} + +func (w *Writer) closeIndex(idx bool) ([]byte, error) { err := w.Flush() if w.output != nil { close(w.output) w.writerWg.Wait() w.output = nil } - if w.err(nil) == nil && w.writer != nil && w.pad > 0 { - add := calcSkippableFrame(w.written, int64(w.pad)) - frame, err := skippableFrame(w.ibuf[:0], add, w.randSrc) - if err = w.err(err); err != nil { - return err + + var index []byte + if w.err(nil) == nil && w.writer != nil { + // Create index. + if idx { + compSize := int64(-1) + if w.pad <= 1 { + compSize = w.written + } + index = w.index.appendTo(w.ibuf[:0], w.uncompWritten, compSize) + // Count as written for padding. + if w.appendIndex { + w.written += int64(len(index)) + } + if true { + _, err := w.index.Load(index) + if err != nil { + panic(err) + } + } + } + + if w.pad > 1 { + tmp := w.ibuf[:0] + if len(index) > 0 { + // Allocate another buffer. + tmp = w.buffers.Get().([]byte)[:0] + defer w.buffers.Put(tmp) + } + add := calcSkippableFrame(w.written, int64(w.pad)) + frame, err := skippableFrame(tmp, add, w.randSrc) + if err = w.err(err); err != nil { + return nil, err + } + n, err2 := w.writer.Write(frame) + if err2 == nil && n != len(frame) { + err2 = io.ErrShortWrite + } + _ = w.err(err2) + } + if len(index) > 0 && w.appendIndex { + n, err2 := w.writer.Write(index) + if err2 == nil && n != len(index) { + err2 = io.ErrShortWrite + } + _ = w.err(err2) } - _, err2 := w.writer.Write(frame) - _ = w.err(err2) } - _ = w.err(errClosed) + err = w.err(errClosed) if err == errClosed { - return nil + return index, nil } - return err + return nil, err } -const skippableFrameHeader = 4 - // calcSkippableFrame will return a total size to be added for written // to be divisible by multiple. // The value will always be > skippableFrameHeader. @@ -1057,6 +1223,15 @@ func WriterConcurrency(n int) WriterOption { } } +// WriterAddIndex will append an index to the end of a stream +// when it is closed. +func WriterAddIndex() WriterOption { + return func(w *Writer) error { + w.appendIndex = true + return nil + } +} + // WriterBetterCompression will enable better compression. // EncodeBetter compresses better than Encode but typically with a // 10-40% speed decrease on both compression and decompression. |