summaryrefslogtreecommitdiffstats
path: root/vendor/github.com/klauspost/compress/s2/encode.go
diff options
context:
space:
mode:
Diffstat (limited to 'vendor/github.com/klauspost/compress/s2/encode.go')
-rw-r--r--vendor/github.com/klauspost/compress/s2/encode.go1172
1 files changed, 1172 insertions, 0 deletions
diff --git a/vendor/github.com/klauspost/compress/s2/encode.go b/vendor/github.com/klauspost/compress/s2/encode.go
new file mode 100644
index 00000000..aa8b108d
--- /dev/null
+++ b/vendor/github.com/klauspost/compress/s2/encode.go
@@ -0,0 +1,1172 @@
+// Copyright 2011 The Snappy-Go Authors. All rights reserved.
+// Copyright (c) 2019 Klaus Post. All rights reserved.
+// Use of this source code is governed by a BSD-style
+// license that can be found in the LICENSE file.
+
+package s2
+
+import (
+ "crypto/rand"
+ "encoding/binary"
+ "errors"
+ "fmt"
+ "io"
+ "math"
+ "math/bits"
+ "runtime"
+ "sync"
+)
+
+// Encode returns the encoded form of src. The returned slice may be a sub-
+// slice of dst if dst was large enough to hold the entire encoded block.
+// Otherwise, a newly allocated slice will be returned.
+//
+// The dst and src must not overlap. It is valid to pass a nil dst.
+//
+// The blocks will require the same amount of memory to decode as encoding,
+// and does not make for concurrent decoding.
+// Also note that blocks do not contain CRC information, so corruption may be undetected.
+//
+// If you need to encode larger amounts of data, consider using
+// the streaming interface which gives all of these features.
+func Encode(dst, src []byte) []byte {
+ if n := MaxEncodedLen(len(src)); n < 0 {
+ panic(ErrTooLarge)
+ } else if cap(dst) < n {
+ dst = make([]byte, n)
+ } else {
+ dst = dst[:n]
+ }
+
+ // The block starts with the varint-encoded length of the decompressed bytes.
+ d := binary.PutUvarint(dst, uint64(len(src)))
+
+ if len(src) == 0 {
+ return dst[:d]
+ }
+ if len(src) < minNonLiteralBlockSize {
+ d += emitLiteral(dst[d:], src)
+ return dst[:d]
+ }
+ n := encodeBlock(dst[d:], src)
+ if n > 0 {
+ d += n
+ return dst[:d]
+ }
+ // Not compressible
+ d += emitLiteral(dst[d:], src)
+ return dst[:d]
+}
+
+// EncodeBetter returns the encoded form of src. The returned slice may be a sub-
+// slice of dst if dst was large enough to hold the entire encoded block.
+// Otherwise, a newly allocated slice will be returned.
+//
+// EncodeBetter compresses better than Encode but typically with a
+// 10-40% speed decrease on both compression and decompression.
+//
+// The dst and src must not overlap. It is valid to pass a nil dst.
+//
+// The blocks will require the same amount of memory to decode as encoding,
+// and does not make for concurrent decoding.
+// Also note that blocks do not contain CRC information, so corruption may be undetected.
+//
+// If you need to encode larger amounts of data, consider using
+// the streaming interface which gives all of these features.
+func EncodeBetter(dst, src []byte) []byte {
+ if n := MaxEncodedLen(len(src)); n < 0 {
+ panic(ErrTooLarge)
+ } else if len(dst) < n {
+ dst = make([]byte, n)
+ }
+
+ // The block starts with the varint-encoded length of the decompressed bytes.
+ d := binary.PutUvarint(dst, uint64(len(src)))
+
+ if len(src) == 0 {
+ return dst[:d]
+ }
+ if len(src) < minNonLiteralBlockSize {
+ d += emitLiteral(dst[d:], src)
+ return dst[:d]
+ }
+ n := encodeBlockBetter(dst[d:], src)
+ if n > 0 {
+ d += n
+ return dst[:d]
+ }
+ // Not compressible
+ d += emitLiteral(dst[d:], src)
+ return dst[:d]
+}
+
+// EncodeBest returns the encoded form of src. The returned slice may be a sub-
+// slice of dst if dst was large enough to hold the entire encoded block.
+// Otherwise, a newly allocated slice will be returned.
+//
+// EncodeBest compresses as good as reasonably possible but with a
+// big speed decrease.
+//
+// The dst and src must not overlap. It is valid to pass a nil dst.
+//
+// The blocks will require the same amount of memory to decode as encoding,
+// and does not make for concurrent decoding.
+// Also note that blocks do not contain CRC information, so corruption may be undetected.
+//
+// If you need to encode larger amounts of data, consider using
+// the streaming interface which gives all of these features.
+func EncodeBest(dst, src []byte) []byte {
+ if n := MaxEncodedLen(len(src)); n < 0 {
+ panic(ErrTooLarge)
+ } else if len(dst) < n {
+ dst = make([]byte, n)
+ }
+
+ // The block starts with the varint-encoded length of the decompressed bytes.
+ d := binary.PutUvarint(dst, uint64(len(src)))
+
+ if len(src) == 0 {
+ return dst[:d]
+ }
+ if len(src) < minNonLiteralBlockSize {
+ d += emitLiteral(dst[d:], src)
+ return dst[:d]
+ }
+ n := encodeBlockBest(dst[d:], src)
+ if n > 0 {
+ d += n
+ return dst[:d]
+ }
+ // Not compressible
+ d += emitLiteral(dst[d:], src)
+ return dst[:d]
+}
+
+// EncodeSnappy returns the encoded form of src. The returned slice may be a sub-
+// slice of dst if dst was large enough to hold the entire encoded block.
+// Otherwise, a newly allocated slice will be returned.
+//
+// The output is Snappy compatible and will likely decompress faster.
+//
+// The dst and src must not overlap. It is valid to pass a nil dst.
+//
+// The blocks will require the same amount of memory to decode as encoding,
+// and does not make for concurrent decoding.
+// Also note that blocks do not contain CRC information, so corruption may be undetected.
+//
+// If you need to encode larger amounts of data, consider using
+// the streaming interface which gives all of these features.
+func EncodeSnappy(dst, src []byte) []byte {
+ if n := MaxEncodedLen(len(src)); n < 0 {
+ panic(ErrTooLarge)
+ } else if cap(dst) < n {
+ dst = make([]byte, n)
+ } else {
+ dst = dst[:n]
+ }
+
+ // The block starts with the varint-encoded length of the decompressed bytes.
+ d := binary.PutUvarint(dst, uint64(len(src)))
+
+ if len(src) == 0 {
+ return dst[:d]
+ }
+ if len(src) < minNonLiteralBlockSize {
+ d += emitLiteral(dst[d:], src)
+ return dst[:d]
+ }
+
+ n := encodeBlockSnappy(dst[d:], src)
+ if n > 0 {
+ d += n
+ return dst[:d]
+ }
+ // Not compressible
+ d += emitLiteral(dst[d:], src)
+ return dst[:d]
+}
+
+// EncodeSnappyBetter returns the encoded form of src. The returned slice may be a sub-
+// slice of dst if dst was large enough to hold the entire encoded block.
+// Otherwise, a newly allocated slice will be returned.
+//
+// The output is Snappy compatible and will likely decompress faster.
+//
+// The dst and src must not overlap. It is valid to pass a nil dst.
+//
+// The blocks will require the same amount of memory to decode as encoding,
+// and does not make for concurrent decoding.
+// Also note that blocks do not contain CRC information, so corruption may be undetected.
+//
+// If you need to encode larger amounts of data, consider using
+// the streaming interface which gives all of these features.
+func EncodeSnappyBetter(dst, src []byte) []byte {
+ if n := MaxEncodedLen(len(src)); n < 0 {
+ panic(ErrTooLarge)
+ } else if cap(dst) < n {
+ dst = make([]byte, n)
+ } else {
+ dst = dst[:n]
+ }
+
+ // The block starts with the varint-encoded length of the decompressed bytes.
+ d := binary.PutUvarint(dst, uint64(len(src)))
+
+ if len(src) == 0 {
+ return dst[:d]
+ }
+ if len(src) < minNonLiteralBlockSize {
+ d += emitLiteral(dst[d:], src)
+ return dst[:d]
+ }
+
+ n := encodeBlockBetterSnappy(dst[d:], src)
+ if n > 0 {
+ d += n
+ return dst[:d]
+ }
+ // Not compressible
+ d += emitLiteral(dst[d:], src)
+ return dst[:d]
+}
+
+// EncodeSnappyBest returns the encoded form of src. The returned slice may be a sub-
+// slice of dst if dst was large enough to hold the entire encoded block.
+// Otherwise, a newly allocated slice will be returned.
+//
+// The output is Snappy compatible and will likely decompress faster.
+//
+// The dst and src must not overlap. It is valid to pass a nil dst.
+//
+// The blocks will require the same amount of memory to decode as encoding,
+// and does not make for concurrent decoding.
+// Also note that blocks do not contain CRC information, so corruption may be undetected.
+//
+// If you need to encode larger amounts of data, consider using
+// the streaming interface which gives all of these features.
+func EncodeSnappyBest(dst, src []byte) []byte {
+ if n := MaxEncodedLen(len(src)); n < 0 {
+ panic(ErrTooLarge)
+ } else if cap(dst) < n {
+ dst = make([]byte, n)
+ } else {
+ dst = dst[:n]
+ }
+
+ // The block starts with the varint-encoded length of the decompressed bytes.
+ d := binary.PutUvarint(dst, uint64(len(src)))
+
+ if len(src) == 0 {
+ return dst[:d]
+ }
+ if len(src) < minNonLiteralBlockSize {
+ d += emitLiteral(dst[d:], src)
+ return dst[:d]
+ }
+
+ n := encodeBlockBestSnappy(dst[d:], src)
+ if n > 0 {
+ d += n
+ return dst[:d]
+ }
+ // Not compressible
+ d += emitLiteral(dst[d:], src)
+ return dst[:d]
+}
+
+// ConcatBlocks will concatenate the supplied blocks and append them to the supplied destination.
+// If the destination is nil or too small, a new will be allocated.
+// The blocks are not validated, so garbage in = garbage out.
+// dst may not overlap block data.
+// Any data in dst is preserved as is, so it will not be considered a block.
+func ConcatBlocks(dst []byte, blocks ...[]byte) ([]byte, error) {
+ totalSize := uint64(0)
+ compSize := 0
+ for _, b := range blocks {
+ l, hdr, err := decodedLen(b)
+ if err != nil {
+ return nil, err
+ }
+ totalSize += uint64(l)
+ compSize += len(b) - hdr
+ }
+ if totalSize == 0 {
+ dst = append(dst, 0)
+ return dst, nil
+ }
+ if totalSize > math.MaxUint32 {
+ return nil, ErrTooLarge
+ }
+ var tmp [binary.MaxVarintLen32]byte
+ hdrSize := binary.PutUvarint(tmp[:], totalSize)
+ wantSize := hdrSize + compSize
+
+ if cap(dst)-len(dst) < wantSize {
+ dst = append(make([]byte, 0, wantSize+len(dst)), dst...)
+ }
+ dst = append(dst, tmp[:hdrSize]...)
+ for _, b := range blocks {
+ _, hdr, err := decodedLen(b)
+ if err != nil {
+ return nil, err
+ }
+ dst = append(dst, b[hdr:]...)
+ }
+ return dst, nil
+}
+
+// inputMargin is the minimum number of extra input bytes to keep, inside
+// encodeBlock's inner loop. On some architectures, this margin lets us
+// implement a fast path for emitLiteral, where the copy of short (<= 16 byte)
+// literals can be implemented as a single load to and store from a 16-byte
+// register. That literal's actual length can be as short as 1 byte, so this
+// can copy up to 15 bytes too much, but that's OK as subsequent iterations of
+// the encoding loop will fix up the copy overrun, and this inputMargin ensures
+// that we don't overrun the dst and src buffers.
+const inputMargin = 8
+
+// minNonLiteralBlockSize is the minimum size of the input to encodeBlock that
+// will be accepted by the encoder.
+const minNonLiteralBlockSize = 32
+
+// MaxBlockSize is the maximum value where MaxEncodedLen will return a valid block size.
+// Blocks this big are highly discouraged, though.
+const MaxBlockSize = math.MaxUint32 - binary.MaxVarintLen32 - 5
+
+// MaxEncodedLen returns the maximum length of a snappy block, given its
+// uncompressed length.
+//
+// It will return a negative value if srcLen is too large to encode.
+// 32 bit platforms will have lower thresholds for rejecting big content.
+func MaxEncodedLen(srcLen int) int {
+ n := uint64(srcLen)
+ if n > 0xffffffff {
+ // Also includes negative.
+ return -1
+ }
+ // Size of the varint encoded block size.
+ n = n + uint64((bits.Len64(n)+7)/7)
+
+ // Add maximum size of encoding block as literals.
+ n += uint64(literalExtraSize(int64(srcLen)))
+ if n > 0xffffffff {
+ return -1
+ }
+ return int(n)
+}
+
+var errClosed = errors.New("s2: Writer is closed")
+
+// NewWriter returns a new Writer that compresses to w, using the
+// framing format described at
+// https://github.com/google/snappy/blob/master/framing_format.txt
+//
+// Users must call Close to guarantee all data has been forwarded to
+// the underlying io.Writer and that resources are released.
+// They may also call Flush zero or more times before calling Close.
+func NewWriter(w io.Writer, opts ...WriterOption) *Writer {
+ w2 := Writer{
+ blockSize: defaultBlockSize,
+ concurrency: runtime.GOMAXPROCS(0),
+ randSrc: rand.Reader,
+ level: levelFast,
+ }
+ for _, opt := range opts {
+ if err := opt(&w2); err != nil {
+ w2.errState = err
+ return &w2
+ }
+ }
+ w2.obufLen = obufHeaderLen + MaxEncodedLen(w2.blockSize)
+ w2.paramsOK = true
+ w2.ibuf = make([]byte, 0, w2.blockSize)
+ w2.buffers.New = func() interface{} {
+ return make([]byte, w2.obufLen)
+ }
+ w2.Reset(w)
+ return &w2
+}
+
+// Writer is an io.Writer that can write Snappy-compressed bytes.
+type Writer struct {
+ errMu sync.Mutex
+ errState error
+
+ // ibuf is a buffer for the incoming (uncompressed) bytes.
+ ibuf []byte
+
+ blockSize int
+ obufLen int
+ concurrency int
+ written int64
+ output chan chan result
+ buffers sync.Pool
+ pad int
+
+ writer io.Writer
+ randSrc io.Reader
+ writerWg sync.WaitGroup
+
+ // wroteStreamHeader is whether we have written the stream header.
+ wroteStreamHeader bool
+ paramsOK bool
+ snappy bool
+ flushOnWrite bool
+ level uint8
+}
+
+const (
+ levelUncompressed = iota + 1
+ levelFast
+ levelBetter
+ levelBest
+)
+
+type result []byte
+
+// err returns the previously set error.
+// If no error has been set it is set to err if not nil.
+func (w *Writer) err(err error) error {
+ w.errMu.Lock()
+ errSet := w.errState
+ if errSet == nil && err != nil {
+ w.errState = err
+ errSet = err
+ }
+ w.errMu.Unlock()
+ return errSet
+}
+
+// Reset discards the writer's state and switches the Snappy writer to write to w.
+// This permits reusing a Writer rather than allocating a new one.
+func (w *Writer) Reset(writer io.Writer) {
+ if !w.paramsOK {
+ return
+ }
+ // Close previous writer, if any.
+ if w.output != nil {
+ close(w.output)
+ w.writerWg.Wait()
+ w.output = nil
+ }
+ w.errState = nil
+ w.ibuf = w.ibuf[:0]
+ w.wroteStreamHeader = false
+ w.written = 0
+ w.writer = writer
+ // If we didn't get a writer, stop here.
+ if writer == nil {
+ return
+ }
+ // If no concurrency requested, don't spin up writer goroutine.
+ if w.concurrency == 1 {
+ return
+ }
+
+ toWrite := make(chan chan result, w.concurrency)
+ w.output = toWrite
+ w.writerWg.Add(1)
+
+ // Start a writer goroutine that will write all output in order.
+ go func() {
+ defer w.writerWg.Done()
+
+ // Get a queued write.
+ for write := range toWrite {
+ // Wait for the data to be available.
+ in := <-write
+ if len(in) > 0 {
+ if w.err(nil) == nil {
+ // Don't expose data from previous buffers.
+ toWrite := in[:len(in):len(in)]
+ // Write to output.
+ n, err := writer.Write(toWrite)
+ if err == nil && n != len(toWrite) {
+ err = io.ErrShortBuffer
+ }
+ _ = w.err(err)
+ w.written += int64(n)
+ }
+ }
+ if cap(in) >= w.obufLen {
+ w.buffers.Put([]byte(in))
+ }
+ // close the incoming write request.
+ // This can be used for synchronizing flushes.
+ close(write)
+ }
+ }()
+}
+
+// Write satisfies the io.Writer interface.
+func (w *Writer) Write(p []byte) (nRet int, errRet error) {
+ if w.flushOnWrite {
+ return w.write(p)
+ }
+ // If we exceed the input buffer size, start writing
+ for len(p) > (cap(w.ibuf)-len(w.ibuf)) && w.err(nil) == nil {
+ var n int
+ if len(w.ibuf) == 0 {
+ // Large write, empty buffer.
+ // Write directly from p to avoid copy.
+ n, _ = w.write(p)
+ } else {
+ n = copy(w.ibuf[len(w.ibuf):cap(w.ibuf)], p)
+ w.ibuf = w.ibuf[:len(w.ibuf)+n]
+ w.write(w.ibuf)
+ w.ibuf = w.ibuf[:0]
+ }
+ nRet += n
+ p = p[n:]
+ }
+ if err := w.err(nil); err != nil {
+ return nRet, err
+ }
+ // p should always be able to fit into w.ibuf now.
+ n := copy(w.ibuf[len(w.ibuf):cap(w.ibuf)], p)
+ w.ibuf = w.ibuf[:len(w.ibuf)+n]
+ nRet += n
+ return nRet, nil
+}
+
+// ReadFrom implements the io.ReaderFrom interface.
+// Using this is typically more efficient since it avoids a memory copy.
+// ReadFrom reads data from r until EOF or error.
+// The return value n is the number of bytes read.
+// Any error except io.EOF encountered during the read is also returned.
+func (w *Writer) ReadFrom(r io.Reader) (n int64, err error) {
+ if len(w.ibuf) > 0 {
+ err := w.Flush()
+ if err != nil {
+ return 0, err
+ }
+ }
+ if br, ok := r.(byter); ok {
+ buf := br.Bytes()
+ if err := w.EncodeBuffer(buf); err != nil {
+ return 0, err
+ }
+ return int64(len(buf)), w.Flush()
+ }
+ for {
+ inbuf := w.buffers.Get().([]byte)[:w.blockSize+obufHeaderLen]
+ n2, err := io.ReadFull(r, inbuf[obufHeaderLen:])
+ if err != nil {
+ if err == io.ErrUnexpectedEOF {
+ err = io.EOF
+ }
+ if err != io.EOF {
+ return n, w.err(err)
+ }
+ }
+ if n2 == 0 {
+ break
+ }
+ n += int64(n2)
+ err2 := w.writeFull(inbuf[:n2+obufHeaderLen])
+ if w.err(err2) != nil {
+ break
+ }
+
+ if err != nil {
+ // We got EOF and wrote everything
+ break
+ }
+ }
+
+ return n, w.err(nil)
+}
+
+// EncodeBuffer will add a buffer to the stream.
+// This is the fastest way to encode a stream,
+// but the input buffer cannot be written to by the caller
+// until Flush or Close has been called when concurrency != 1.
+//
+// If you cannot control that, use the regular Write function.
+//
+// Note that input is not buffered.
+// This means that each write will result in discrete blocks being created.
+// For buffered writes, use the regular Write function.
+func (w *Writer) EncodeBuffer(buf []byte) (err error) {
+ if err := w.err(nil); err != nil {
+ return err
+ }
+
+ if w.flushOnWrite {
+ _, err := w.write(buf)
+ return err
+ }
+ // Flush queued data first.
+ if len(w.ibuf) > 0 {
+ err := w.Flush()
+ if err != nil {
+ return err
+ }
+ }
+ if w.concurrency == 1 {
+ _, err := w.writeSync(buf)
+ return err
+ }
+
+ // Spawn goroutine and write block to output channel.
+ if !w.wroteStreamHeader {
+ w.wroteStreamHeader = true
+ hWriter := make(chan result)
+ w.output <- hWriter
+ if w.snappy {
+ hWriter <- []byte(magicChunkSnappy)
+ } else {
+ hWriter <- []byte(magicChunk)
+ }
+ }
+
+ for len(buf) > 0 {
+ // Cut input.
+ uncompressed := buf
+ if len(uncompressed) > w.blockSize {
+ uncompressed = uncompressed[:w.blockSize]
+ }
+ buf = buf[len(uncompressed):]
+ // Get an output buffer.
+ obuf := w.buffers.Get().([]byte)[:len(uncompressed)+obufHeaderLen]
+ output := make(chan result)
+ // Queue output now, so we keep order.
+ w.output <- output
+ go func() {
+ checksum := crc(uncompressed)
+
+ // Set to uncompressed.
+ chunkType := uint8(chunkTypeUncompressedData)
+ chunkLen := 4 + len(uncompressed)
+
+ // Attempt compressing.
+ n := binary.PutUvarint(obuf[obufHeaderLen:], uint64(len(uncompressed)))
+ n2 := w.encodeBlock(obuf[obufHeaderLen+n:], uncompressed)
+
+ // Check if we should use this, or store as uncompressed instead.
+ if n2 > 0 {
+ chunkType = uint8(chunkTypeCompressedData)
+ chunkLen = 4 + n + n2
+ obuf = obuf[:obufHeaderLen+n+n2]
+ } else {
+ // copy uncompressed
+ copy(obuf[obufHeaderLen:], uncompressed)
+ }
+
+ // Fill in the per-chunk header that comes before the body.
+ obuf[0] = chunkType
+ obuf[1] = uint8(chunkLen >> 0)
+ obuf[2] = uint8(chunkLen >> 8)
+ obuf[3] = uint8(chunkLen >> 16)
+ obuf[4] = uint8(checksum >> 0)
+ obuf[5] = uint8(checksum >> 8)
+ obuf[6] = uint8(checksum >> 16)
+ obuf[7] = uint8(checksum >> 24)
+
+ // Queue final output.
+ output <- obuf
+ }()
+ }
+ return nil
+}
+
+func (w *Writer) encodeBlock(obuf, uncompressed []byte) int {
+ if w.snappy {
+ switch w.level {
+ case levelFast:
+ return encodeBlockSnappy(obuf, uncompressed)
+ case levelBetter:
+ return encodeBlockBetterSnappy(obuf, uncompressed)
+ case levelBest:
+ return encodeBlockBestSnappy(obuf, uncompressed)
+ }
+ return 0
+ }
+ switch w.level {
+ case levelFast:
+ return encodeBlock(obuf, uncompressed)
+ case levelBetter:
+ return encodeBlockBetter(obuf, uncompressed)
+ case levelBest:
+ return encodeBlockBest(obuf, uncompressed)
+ }
+ return 0
+}
+
+func (w *Writer) write(p []byte) (nRet int, errRet error) {
+ if err := w.err(nil); err != nil {
+ return 0, err
+ }
+ if w.concurrency == 1 {
+ return w.writeSync(p)
+ }
+
+ // Spawn goroutine and write block to output channel.
+ for len(p) > 0 {
+ if !w.wroteStreamHeader {
+ w.wroteStreamHeader = true
+ hWriter := make(chan result)
+ w.output <- hWriter
+ if w.snappy {
+ hWriter <- []byte(magicChunkSnappy)
+ } else {
+ hWriter <- []byte(magicChunk)
+ }
+ }
+
+ var uncompressed []byte
+ if len(p) > w.blockSize {
+ uncompressed, p = p[:w.blockSize], p[w.blockSize:]
+ } else {
+ uncompressed, p = p, nil
+ }
+
+ // Copy input.
+ // If the block is incompressible, this is used for the result.
+ inbuf := w.buffers.Get().([]byte)[:len(uncompressed)+obufHeaderLen]
+ obuf := w.buffers.Get().([]byte)[:w.obufLen]
+ copy(inbuf[obufHeaderLen:], uncompressed)
+ uncompressed = inbuf[obufHeaderLen:]
+
+ output := make(chan result)
+ // Queue output now, so we keep order.
+ w.output <- output
+ go func() {
+ checksum := crc(uncompressed)
+
+ // Set to uncompressed.
+ chunkType := uint8(chunkTypeUncompressedData)
+ chunkLen := 4 + len(uncompressed)
+
+ // Attempt compressing.
+ n := binary.PutUvarint(obuf[obufHeaderLen:], uint64(len(uncompressed)))
+ n2 := w.encodeBlock(obuf[obufHeaderLen+n:], uncompressed)
+
+ // Check if we should use this, or store as uncompressed instead.
+ if n2 > 0 {
+ chunkType = uint8(chunkTypeCompressedData)
+ chunkLen = 4 + n + n2
+ obuf = obuf[:obufHeaderLen+n+n2]
+ } else {
+ // Use input as output.
+ obuf, inbuf = inbuf, obuf
+ }
+
+ // Fill in the per-chunk header that comes before the body.
+ obuf[0] = chunkType
+ obuf[1] = uint8(chunkLen >> 0)
+ obuf[2] = uint8(chunkLen >> 8)
+ obuf[3] = uint8(chunkLen >> 16)
+ obuf[4] = uint8(checksum >> 0)
+ obuf[5] = uint8(checksum >> 8)
+ obuf[6] = uint8(checksum >> 16)
+ obuf[7] = uint8(checksum >> 24)
+
+ // Queue final output.
+ output <- obuf
+
+ // Put unused buffer back in pool.
+ w.buffers.Put(inbuf)
+ }()
+ nRet += len(uncompressed)
+ }
+ return nRet, nil
+}
+
+// writeFull is a special version of write that will always write the full buffer.
+// Data to be compressed should start at offset obufHeaderLen and fill the remainder of the buffer.
+// The data will be written as a single block.
+// The caller is not allowed to use inbuf after this function has been called.
+func (w *Writer) writeFull(inbuf []byte) (errRet error) {
+ if err := w.err(nil); err != nil {
+ return err
+ }
+
+ if w.concurrency == 1 {
+ _, err := w.writeSync(inbuf[obufHeaderLen:])
+ return err
+ }
+
+ // Spawn goroutine and write block to output channel.
+ if !w.wroteStreamHeader {
+ w.wroteStreamHeader = true
+ hWriter := make(chan result)
+ w.output <- hWriter
+ if w.snappy {
+ hWriter <- []byte(magicChunkSnappy)
+ } else {
+ hWriter <- []byte(magicChunk)
+ }
+ }
+
+ // Get an output buffer.
+ obuf := w.buffers.Get().([]byte)[:w.obufLen]
+ uncompressed := inbuf[obufHeaderLen:]
+
+ output := make(chan result)
+ // Queue output now, so we keep order.
+ w.output <- output
+ go func() {
+ checksum := crc(uncompressed)
+
+ // Set to uncompressed.
+ chunkType := uint8(chunkTypeUncompressedData)
+ chunkLen := 4 + len(uncompressed)
+
+ // Attempt compressing.
+ n := binary.PutUvarint(obuf[obufHeaderLen:], uint64(len(uncompressed)))
+ n2 := w.encodeBlock(obuf[obufHeaderLen+n:], uncompressed)
+
+ // Check if we should use this, or store as uncompressed instead.
+ if n2 > 0 {
+ chunkType = uint8(chunkTypeCompressedData)
+ chunkLen = 4 + n + n2
+ obuf = obuf[:obufHeaderLen+n+n2]
+ } else {
+ // Use input as output.
+ obuf, inbuf = inbuf, obuf
+ }
+
+ // Fill in the per-chunk header that comes before the body.
+ obuf[0] = chunkType
+ obuf[1] = uint8(chunkLen >> 0)
+ obuf[2] = uint8(chunkLen >> 8)
+ obuf[3] = uint8(chunkLen >> 16)
+ obuf[4] = uint8(checksum >> 0)
+ obuf[5] = uint8(checksum >> 8)
+ obuf[6] = uint8(checksum >> 16)
+ obuf[7] = uint8(checksum >> 24)
+
+ // Queue final output.
+ output <- obuf
+
+ // Put unused buffer back in pool.
+ w.buffers.Put(inbuf)
+ }()
+ return nil
+}
+
+func (w *Writer) writeSync(p []byte) (nRet int, errRet error) {
+ if err := w.err(nil); err != nil {
+ return 0, err
+ }
+ if !w.wroteStreamHeader {
+ w.wroteStreamHeader = true
+ var n int
+ var err error
+ if w.snappy {
+ n, err = w.writer.Write([]byte(magicChunkSnappy))
+ } else {
+ n, err = w.writer.Write([]byte(magicChunk))
+ }
+ if err != nil {
+ return 0, w.err(err)
+ }
+ if n != len(magicChunk) {
+ return 0, w.err(io.ErrShortWrite)
+ }
+ w.written += int64(n)
+ }
+
+ for len(p) > 0 {
+ var uncompressed []byte
+ if len(p) > w.blockSize {
+ uncompressed, p = p[:w.blockSize], p[w.blockSize:]
+ } else {
+ uncompressed, p = p, nil
+ }
+
+ obuf := w.buffers.Get().([]byte)[:w.obufLen]
+ checksum := crc(uncompressed)
+
+ // Set to uncompressed.
+ chunkType := uint8(chunkTypeUncompressedData)
+ chunkLen := 4 + len(uncompressed)
+
+ // Attempt compressing.
+ n := binary.PutUvarint(obuf[obufHeaderLen:], uint64(len(uncompressed)))
+ n2 := w.encodeBlock(obuf[obufHeaderLen+n:], uncompressed)
+
+ if n2 > 0 {
+ chunkType = uint8(chunkTypeCompressedData)
+ chunkLen = 4 + n + n2
+ obuf = obuf[:obufHeaderLen+n+n2]
+ } else {
+ obuf = obuf[:8]
+ }
+
+ // Fill in the per-chunk header that comes before the body.
+ obuf[0] = chunkType
+ obuf[1] = uint8(chunkLen >> 0)
+ obuf[2] = uint8(chunkLen >> 8)
+ obuf[3] = uint8(chunkLen >> 16)
+ obuf[4] = uint8(checksum >> 0)
+ obuf[5] = uint8(checksum >> 8)
+ obuf[6] = uint8(checksum >> 16)
+ obuf[7] = uint8(checksum >> 24)
+
+ n, err := w.writer.Write(obuf)
+ if err != nil {
+ return 0, w.err(err)
+ }
+ if n != len(obuf) {
+ return 0, w.err(io.ErrShortWrite)
+ }
+ w.written += int64(n)
+ if chunkType == chunkTypeUncompressedData {
+ // Write uncompressed data.
+ n, err := w.writer.Write(uncompressed)
+ if err != nil {
+ return 0, w.err(err)
+ }
+ if n != len(uncompressed) {
+ return 0, w.err(io.ErrShortWrite)
+ }
+ w.written += int64(n)
+ }
+ w.buffers.Put(obuf)
+ // Queue final output.
+ nRet += len(uncompressed)
+ }
+ return nRet, nil
+}
+
+// Flush flushes the Writer to its underlying io.Writer.
+// This does not apply padding.
+func (w *Writer) Flush() error {
+ if err := w.err(nil); err != nil {
+ return err
+ }
+
+ // Queue any data still in input buffer.
+ if len(w.ibuf) != 0 {
+ if !w.wroteStreamHeader {
+ _, err := w.writeSync(w.ibuf)
+ w.ibuf = w.ibuf[:0]
+ return w.err(err)
+ } else {
+ _, err := w.write(w.ibuf)
+ w.ibuf = w.ibuf[:0]
+ err = w.err(err)
+ if err != nil {
+ return err
+ }
+ }
+ }
+ if w.output == nil {
+ return w.err(nil)
+ }
+
+ // Send empty buffer
+ res := make(chan result)
+ w.output <- res
+ // Block until this has been picked up.
+ res <- nil
+ // When it is closed, we have flushed.
+ <-res
+ return w.err(nil)
+}
+
+// Close calls Flush and then closes the Writer.
+// Calling Close multiple times is ok.
+func (w *Writer) Close() error {
+ err := w.Flush()
+ if w.output != nil {
+ close(w.output)
+ w.writerWg.Wait()
+ w.output = nil
+ }
+ if w.err(nil) == nil && w.writer != nil && w.pad > 0 {
+ add := calcSkippableFrame(w.written, int64(w.pad))
+ frame, err := skippableFrame(w.ibuf[:0], add, w.randSrc)
+ if err = w.err(err); err != nil {
+ return err
+ }
+ _, err2 := w.writer.Write(frame)
+ _ = w.err(err2)
+ }
+ _ = w.err(errClosed)
+ if err == errClosed {
+ return nil
+ }
+ return err
+}
+
+const skippableFrameHeader = 4
+
+// calcSkippableFrame will return a total size to be added for written
+// to be divisible by multiple.
+// The value will always be > skippableFrameHeader.
+// The function will panic if written < 0 or wantMultiple <= 0.
+func calcSkippableFrame(written, wantMultiple int64) int {
+ if wantMultiple <= 0 {
+ panic("wantMultiple <= 0")
+ }
+ if written < 0 {
+ panic("written < 0")
+ }
+ leftOver := written % wantMultiple
+ if leftOver == 0 {
+ return 0
+ }
+ toAdd := wantMultiple - leftOver
+ for toAdd < skippableFrameHeader {
+ toAdd += wantMultiple
+ }
+ return int(toAdd)
+}
+
+// skippableFrame will add a skippable frame with a total size of bytes.
+// total should be >= skippableFrameHeader and < maxBlockSize + skippableFrameHeader
+func skippableFrame(dst []byte, total int, r io.Reader) ([]byte, error) {
+ if total == 0 {
+ return dst, nil
+ }
+ if total < skippableFrameHeader {
+ return dst, fmt.Errorf("s2: requested skippable frame (%d) < 4", total)
+ }
+ if int64(total) >= maxBlockSize+skippableFrameHeader {
+ return dst, fmt.Errorf("s2: requested skippable frame (%d) >= max 1<<24", total)
+ }
+ // Chunk type 0xfe "Section 4.4 Padding (chunk type 0xfe)"
+ dst = append(dst, chunkTypePadding)
+ f := uint32(total - skippableFrameHeader)
+ // Add chunk length.
+ dst = append(dst, uint8(f), uint8(f>>8), uint8(f>>16))
+ // Add data
+ start := len(dst)
+ dst = append(dst, make([]byte, f)...)
+ _, err := io.ReadFull(r, dst[start:])
+ return dst, err
+}
+
+// WriterOption is an option for creating a encoder.
+type WriterOption func(*Writer) error
+
+// WriterConcurrency will set the concurrency,
+// meaning the maximum number of decoders to run concurrently.
+// The value supplied must be at least 1.
+// By default this will be set to GOMAXPROCS.
+func WriterConcurrency(n int) WriterOption {
+ return func(w *Writer) error {
+ if n <= 0 {
+ return errors.New("concurrency must be at least 1")
+ }
+ w.concurrency = n
+ return nil
+ }
+}
+
+// WriterBetterCompression will enable better compression.
+// EncodeBetter compresses better than Encode but typically with a
+// 10-40% speed decrease on both compression and decompression.
+func WriterBetterCompression() WriterOption {
+ return func(w *Writer) error {
+ w.level = levelBetter
+ return nil
+ }
+}
+
+// WriterBestCompression will enable better compression.
+// EncodeBetter compresses better than Encode but typically with a
+// big speed decrease on compression.
+func WriterBestCompression() WriterOption {
+ return func(w *Writer) error {
+ w.level = levelBest
+ return nil
+ }
+}
+
+// WriterUncompressed will bypass compression.
+// The stream will be written as uncompressed blocks only.
+// If concurrency is > 1 CRC and output will still be done async.
+func WriterUncompressed() WriterOption {
+ return func(w *Writer) error {
+ w.level = levelUncompressed
+ return nil
+ }
+}
+
+// WriterBlockSize allows to override the default block size.
+// Blocks will be this size or smaller.
+// Minimum size is 4KB and and maximum size is 4MB.
+//
+// Bigger blocks may give bigger throughput on systems with many cores,
+// and will increase compression slightly, but it will limit the possible
+// concurrency for smaller payloads for both encoding and decoding.
+// Default block size is 1MB.
+//
+// When writing Snappy compatible output using WriterSnappyCompat,
+// the maximum block size is 64KB.
+func WriterBlockSize(n int) WriterOption {
+ return func(w *Writer) error {
+ if w.snappy && n > maxSnappyBlockSize || n < minBlockSize {
+ return errors.New("s2: block size too large. Must be <= 64K and >=4KB on for snappy compatible output")
+ }
+ if n > maxBlockSize || n < minBlockSize {
+ return errors.New("s2: block size too large. Must be <= 4MB and >=4KB")
+ }
+ w.blockSize = n
+ return nil
+ }
+}
+
+// WriterPadding will add padding to all output so the size will be a multiple of n.
+// This can be used to obfuscate the exact output size or make blocks of a certain size.
+// The contents will be a skippable frame, so it will be invisible by the decoder.
+// n must be > 0 and <= 4MB.
+// The padded area will be filled with data from crypto/rand.Reader.
+// The padding will be applied whenever Close is called on the writer.
+func WriterPadding(n int) WriterOption {
+ return func(w *Writer) error {
+ if n <= 0 {
+ return fmt.Errorf("s2: padding must be at least 1")
+ }
+ // No need to waste our time.
+ if n == 1 {
+ w.pad = 0
+ }
+ if n > maxBlockSize {
+ return fmt.Errorf("s2: padding must less than 4MB")
+ }
+ w.pad = n
+ return nil
+ }
+}
+
+// WriterPaddingSrc will get random data for padding from the supplied source.
+// By default crypto/rand is used.
+func WriterPaddingSrc(reader io.Reader) WriterOption {
+ return func(w *Writer) error {
+ w.randSrc = reader
+ return nil
+ }
+}
+
+// WriterSnappyCompat will write snappy compatible output.
+// The output can be decompressed using either snappy or s2.
+// If block size is more than 64KB it is set to that.
+func WriterSnappyCompat() WriterOption {
+ return func(w *Writer) error {
+ w.snappy = true
+ if w.blockSize > 64<<10 {
+ // We choose 8 bytes less than 64K, since that will make literal emits slightly more effective.
+ // And allows us to skip some size checks.
+ w.blockSize = (64 << 10) - 8
+ }
+ return nil
+ }
+}
+
+// WriterFlushOnWrite will compress blocks on each call to the Write function.
+//
+// This is quite inefficient as blocks size will depend on the write size.
+//
+// Use WriterConcurrency(1) to also make sure that output is flushed.
+// When Write calls return, otherwise they will be written when compression is done.
+func WriterFlushOnWrite() WriterOption {
+ return func(w *Writer) error {
+ w.flushOnWrite = true
+ return nil
+ }
+}