summaryrefslogtreecommitdiffstats
path: root/vendor/github.com/klauspost/compress/s2/encode.go
diff options
context:
space:
mode:
Diffstat (limited to 'vendor/github.com/klauspost/compress/s2/encode.go')
-rw-r--r--vendor/github.com/klauspost/compress/s2/encode.go241
1 files changed, 208 insertions, 33 deletions
diff --git a/vendor/github.com/klauspost/compress/s2/encode.go b/vendor/github.com/klauspost/compress/s2/encode.go
index aa8b108d..59f992ca 100644
--- a/vendor/github.com/klauspost/compress/s2/encode.go
+++ b/vendor/github.com/klauspost/compress/s2/encode.go
@@ -395,23 +395,26 @@ type Writer struct {
// ibuf is a buffer for the incoming (uncompressed) bytes.
ibuf []byte
- blockSize int
- obufLen int
- concurrency int
- written int64
- output chan chan result
- buffers sync.Pool
- pad int
+ blockSize int
+ obufLen int
+ concurrency int
+ written int64
+ uncompWritten int64 // Bytes sent to compression
+ output chan chan result
+ buffers sync.Pool
+ pad int
writer io.Writer
randSrc io.Reader
writerWg sync.WaitGroup
+ index Index
// wroteStreamHeader is whether we have written the stream header.
wroteStreamHeader bool
paramsOK bool
snappy bool
flushOnWrite bool
+ appendIndex bool
level uint8
}
@@ -422,7 +425,11 @@ const (
levelBest
)
-type result []byte
+type result struct {
+ b []byte
+ // Uncompressed start offset
+ startOffset int64
+}
// err returns the previously set error.
// If no error has been set it is set to err if not nil.
@@ -454,6 +461,9 @@ func (w *Writer) Reset(writer io.Writer) {
w.wroteStreamHeader = false
w.written = 0
w.writer = writer
+ w.uncompWritten = 0
+ w.index.reset(w.blockSize)
+
// If we didn't get a writer, stop here.
if writer == nil {
return
@@ -474,7 +484,8 @@ func (w *Writer) Reset(writer io.Writer) {
// Get a queued write.
for write := range toWrite {
// Wait for the data to be available.
- in := <-write
+ input := <-write
+ in := input.b
if len(in) > 0 {
if w.err(nil) == nil {
// Don't expose data from previous buffers.
@@ -485,11 +496,12 @@ func (w *Writer) Reset(writer io.Writer) {
err = io.ErrShortBuffer
}
_ = w.err(err)
+ w.err(w.index.add(w.written, input.startOffset))
w.written += int64(n)
}
}
if cap(in) >= w.obufLen {
- w.buffers.Put([]byte(in))
+ w.buffers.Put(in)
}
// close the incoming write request.
// This can be used for synchronizing flushes.
@@ -500,6 +512,9 @@ func (w *Writer) Reset(writer io.Writer) {
// Write satisfies the io.Writer interface.
func (w *Writer) Write(p []byte) (nRet int, errRet error) {
+ if err := w.err(nil); err != nil {
+ return 0, err
+ }
if w.flushOnWrite {
return w.write(p)
}
@@ -535,6 +550,9 @@ func (w *Writer) Write(p []byte) (nRet int, errRet error) {
// The return value n is the number of bytes read.
// Any error except io.EOF encountered during the read is also returned.
func (w *Writer) ReadFrom(r io.Reader) (n int64, err error) {
+ if err := w.err(nil); err != nil {
+ return 0, err
+ }
if len(w.ibuf) > 0 {
err := w.Flush()
if err != nil {
@@ -577,6 +595,85 @@ func (w *Writer) ReadFrom(r io.Reader) (n int64, err error) {
return n, w.err(nil)
}
+// AddSkippableBlock will add a skippable block to the stream.
+// The ID must be 0x80-0xfe (inclusive).
+// Length of the skippable block must be <= 16777215 bytes.
+func (w *Writer) AddSkippableBlock(id uint8, data []byte) (err error) {
+ if err := w.err(nil); err != nil {
+ return err
+ }
+ if len(data) == 0 {
+ return nil
+ }
+ if id < 0x80 || id > chunkTypePadding {
+ return fmt.Errorf("invalid skippable block id %x", id)
+ }
+ if len(data) > maxChunkSize {
+ return fmt.Errorf("skippable block excessed maximum size")
+ }
+ var header [4]byte
+ chunkLen := 4 + len(data)
+ header[0] = id
+ header[1] = uint8(chunkLen >> 0)
+ header[2] = uint8(chunkLen >> 8)
+ header[3] = uint8(chunkLen >> 16)
+ if w.concurrency == 1 {
+ write := func(b []byte) error {
+ n, err := w.writer.Write(b)
+ if err = w.err(err); err != nil {
+ return err
+ }
+ if n != len(data) {
+ return w.err(io.ErrShortWrite)
+ }
+ w.written += int64(n)
+ return w.err(nil)
+ }
+ if !w.wroteStreamHeader {
+ w.wroteStreamHeader = true
+ if w.snappy {
+ if err := write([]byte(magicChunkSnappy)); err != nil {
+ return err
+ }
+ } else {
+ if err := write([]byte(magicChunk)); err != nil {
+ return err
+ }
+ }
+ }
+ if err := write(header[:]); err != nil {
+ return err
+ }
+ if err := write(data); err != nil {
+ return err
+ }
+ }
+
+ // Create output...
+ if !w.wroteStreamHeader {
+ w.wroteStreamHeader = true
+ hWriter := make(chan result)
+ w.output <- hWriter
+ if w.snappy {
+ hWriter <- result{startOffset: w.uncompWritten, b: []byte(magicChunkSnappy)}
+ } else {
+ hWriter <- result{startOffset: w.uncompWritten, b: []byte(magicChunk)}
+ }
+ }
+
+ // Copy input.
+ inbuf := w.buffers.Get().([]byte)[:4]
+ copy(inbuf, header[:])
+ inbuf = append(inbuf, data...)
+
+ output := make(chan result, 1)
+ // Queue output.
+ w.output <- output
+ output <- result{startOffset: w.uncompWritten, b: inbuf}
+
+ return nil
+}
+
// EncodeBuffer will add a buffer to the stream.
// This is the fastest way to encode a stream,
// but the input buffer cannot be written to by the caller
@@ -614,9 +711,9 @@ func (w *Writer) EncodeBuffer(buf []byte) (err error) {
hWriter := make(chan result)
w.output <- hWriter
if w.snappy {
- hWriter <- []byte(magicChunkSnappy)
+ hWriter <- result{startOffset: w.uncompWritten, b: []byte(magicChunkSnappy)}
} else {
- hWriter <- []byte(magicChunk)
+ hWriter <- result{startOffset: w.uncompWritten, b: []byte(magicChunk)}
}
}
@@ -632,6 +729,10 @@ func (w *Writer) EncodeBuffer(buf []byte) (err error) {
output := make(chan result)
// Queue output now, so we keep order.
w.output <- output
+ res := result{
+ startOffset: w.uncompWritten,
+ }
+ w.uncompWritten += int64(len(uncompressed))
go func() {
checksum := crc(uncompressed)
@@ -664,7 +765,8 @@ func (w *Writer) EncodeBuffer(buf []byte) (err error) {
obuf[7] = uint8(checksum >> 24)
// Queue final output.
- output <- obuf
+ res.b = obuf
+ output <- res
}()
}
return nil
@@ -708,9 +810,9 @@ func (w *Writer) write(p []byte) (nRet int, errRet error) {
hWriter := make(chan result)
w.output <- hWriter
if w.snappy {
- hWriter <- []byte(magicChunkSnappy)
+ hWriter <- result{startOffset: w.uncompWritten, b: []byte(magicChunkSnappy)}
} else {
- hWriter <- []byte(magicChunk)
+ hWriter <- result{startOffset: w.uncompWritten, b: []byte(magicChunk)}
}
}
@@ -731,6 +833,11 @@ func (w *Writer) write(p []byte) (nRet int, errRet error) {
output := make(chan result)
// Queue output now, so we keep order.
w.output <- output
+ res := result{
+ startOffset: w.uncompWritten,
+ }
+ w.uncompWritten += int64(len(uncompressed))
+
go func() {
checksum := crc(uncompressed)
@@ -763,7 +870,8 @@ func (w *Writer) write(p []byte) (nRet int, errRet error) {
obuf[7] = uint8(checksum >> 24)
// Queue final output.
- output <- obuf
+ res.b = obuf
+ output <- res
// Put unused buffer back in pool.
w.buffers.Put(inbuf)
@@ -793,9 +901,9 @@ func (w *Writer) writeFull(inbuf []byte) (errRet error) {
hWriter := make(chan result)
w.output <- hWriter
if w.snappy {
- hWriter <- []byte(magicChunkSnappy)
+ hWriter <- result{startOffset: w.uncompWritten, b: []byte(magicChunkSnappy)}
} else {
- hWriter <- []byte(magicChunk)
+ hWriter <- result{startOffset: w.uncompWritten, b: []byte(magicChunk)}
}
}
@@ -806,6 +914,11 @@ func (w *Writer) writeFull(inbuf []byte) (errRet error) {
output := make(chan result)
// Queue output now, so we keep order.
w.output <- output
+ res := result{
+ startOffset: w.uncompWritten,
+ }
+ w.uncompWritten += int64(len(uncompressed))
+
go func() {
checksum := crc(uncompressed)
@@ -838,7 +951,8 @@ func (w *Writer) writeFull(inbuf []byte) (errRet error) {
obuf[7] = uint8(checksum >> 24)
// Queue final output.
- output <- obuf
+ res.b = obuf
+ output <- res
// Put unused buffer back in pool.
w.buffers.Put(inbuf)
@@ -912,7 +1026,10 @@ func (w *Writer) writeSync(p []byte) (nRet int, errRet error) {
if n != len(obuf) {
return 0, w.err(io.ErrShortWrite)
}
+ w.err(w.index.add(w.written, w.uncompWritten))
w.written += int64(n)
+ w.uncompWritten += int64(len(uncompressed))
+
if chunkType == chunkTypeUncompressedData {
// Write uncompressed data.
n, err := w.writer.Write(uncompressed)
@@ -961,39 +1078,88 @@ func (w *Writer) Flush() error {
res := make(chan result)
w.output <- res
// Block until this has been picked up.
- res <- nil
+ res <- result{b: nil, startOffset: w.uncompWritten}
// When it is closed, we have flushed.
<-res
return w.err(nil)
}
// Close calls Flush and then closes the Writer.
-// Calling Close multiple times is ok.
+// Calling Close multiple times is ok,
+// but calling CloseIndex after this will make it not return the index.
func (w *Writer) Close() error {
+ _, err := w.closeIndex(w.appendIndex)
+ return err
+}
+
+// CloseIndex calls Close and returns an index on first call.
+// This is not required if you are only adding index to a stream.
+func (w *Writer) CloseIndex() ([]byte, error) {
+ return w.closeIndex(true)
+}
+
+func (w *Writer) closeIndex(idx bool) ([]byte, error) {
err := w.Flush()
if w.output != nil {
close(w.output)
w.writerWg.Wait()
w.output = nil
}
- if w.err(nil) == nil && w.writer != nil && w.pad > 0 {
- add := calcSkippableFrame(w.written, int64(w.pad))
- frame, err := skippableFrame(w.ibuf[:0], add, w.randSrc)
- if err = w.err(err); err != nil {
- return err
+
+ var index []byte
+ if w.err(nil) == nil && w.writer != nil {
+ // Create index.
+ if idx {
+ compSize := int64(-1)
+ if w.pad <= 1 {
+ compSize = w.written
+ }
+ index = w.index.appendTo(w.ibuf[:0], w.uncompWritten, compSize)
+ // Count as written for padding.
+ if w.appendIndex {
+ w.written += int64(len(index))
+ }
+ if true {
+ _, err := w.index.Load(index)
+ if err != nil {
+ panic(err)
+ }
+ }
+ }
+
+ if w.pad > 1 {
+ tmp := w.ibuf[:0]
+ if len(index) > 0 {
+ // Allocate another buffer.
+ tmp = w.buffers.Get().([]byte)[:0]
+ defer w.buffers.Put(tmp)
+ }
+ add := calcSkippableFrame(w.written, int64(w.pad))
+ frame, err := skippableFrame(tmp, add, w.randSrc)
+ if err = w.err(err); err != nil {
+ return nil, err
+ }
+ n, err2 := w.writer.Write(frame)
+ if err2 == nil && n != len(frame) {
+ err2 = io.ErrShortWrite
+ }
+ _ = w.err(err2)
+ }
+ if len(index) > 0 && w.appendIndex {
+ n, err2 := w.writer.Write(index)
+ if err2 == nil && n != len(index) {
+ err2 = io.ErrShortWrite
+ }
+ _ = w.err(err2)
}
- _, err2 := w.writer.Write(frame)
- _ = w.err(err2)
}
- _ = w.err(errClosed)
+ err = w.err(errClosed)
if err == errClosed {
- return nil
+ return index, nil
}
- return err
+ return nil, err
}
-const skippableFrameHeader = 4
-
// calcSkippableFrame will return a total size to be added for written
// to be divisible by multiple.
// The value will always be > skippableFrameHeader.
@@ -1057,6 +1223,15 @@ func WriterConcurrency(n int) WriterOption {
}
}
+// WriterAddIndex will append an index to the end of a stream
+// when it is closed.
+func WriterAddIndex() WriterOption {
+ return func(w *Writer) error {
+ w.appendIndex = true
+ return nil
+ }
+}
+
// WriterBetterCompression will enable better compression.
// EncodeBetter compresses better than Encode but typically with a
// 10-40% speed decrease on both compression and decompression.