summaryrefslogblamecommitdiffstats
path: root/vendor/github.com/francoispqt/gojay/encode_stream.go
blob: fae8a17cf88f2f3ff4308ec1075cb1f43bc8923b (plain) (tree)











































































































































































































                                                                                             
package gojay

import (
	"strconv"
	"sync"
	"time"
)

// MarshalerStream is the interface to implement
// to continuously encode of stream of data.
type MarshalerStream interface {
	MarshalStream(enc *StreamEncoder)
}

// A StreamEncoder reads and encodes values to JSON from an input stream.
//
// It implements conext.Context and provide a channel to notify interruption.
type StreamEncoder struct {
	mux *sync.RWMutex
	*Encoder
	nConsumer int
	delimiter byte
	deadline  *time.Time
	done      chan struct{}
}

// EncodeStream spins up a defined number of non blocking consumers of the MarshalerStream m.
//
// m must implement MarshalerStream. Ideally m is a channel. See example for implementation.
//
// See the documentation for Marshal for details about the conversion of Go value to JSON.
func (s *StreamEncoder) EncodeStream(m MarshalerStream) {
	// if a single consumer, just use this encoder
	if s.nConsumer == 1 {
		go consume(s, s, m)
		return
	}
	// else use this Encoder only for first consumer
	// and use new encoders for other consumers
	// this is to avoid concurrent writing to same buffer
	// resulting in a weird JSON
	go consume(s, s, m)
	for i := 1; i < s.nConsumer; i++ {
		s.mux.RLock()
		select {
		case <-s.done:
		default:
			ss := Stream.borrowEncoder(s.w)
			ss.mux.Lock()
			ss.done = s.done
			ss.buf = make([]byte, 0, 512)
			ss.delimiter = s.delimiter
			go consume(s, ss, m)
			ss.mux.Unlock()
		}
		s.mux.RUnlock()
	}
	return
}

// LineDelimited sets the delimiter to a new line character.
//
// It will add a new line after each JSON marshaled by the MarshalerStream
func (s *StreamEncoder) LineDelimited() *StreamEncoder {
	s.delimiter = '\n'
	return s
}

// CommaDelimited sets the delimiter to a comma.
//
// It will add a new line after each JSON marshaled by the MarshalerStream
func (s *StreamEncoder) CommaDelimited() *StreamEncoder {
	s.delimiter = ','
	return s
}

// NConsumer sets the number of non blocking go routine to consume the stream.
func (s *StreamEncoder) NConsumer(n int) *StreamEncoder {
	s.nConsumer = n
	return s
}

// Release sends back a Decoder to the pool.
// If a decoder is used after calling Release
// a panic will be raised with an InvalidUsagePooledDecoderError error.
func (s *StreamEncoder) Release() {
	s.isPooled = 1
	streamEncPool.Put(s)
}

// Done returns a channel that's closed when work is done.
// It implements context.Context
func (s *StreamEncoder) Done() <-chan struct{} {
	return s.done
}

// Err returns nil if Done is not yet closed.
// If Done is closed, Err returns a non-nil error explaining why.
// It implements context.Context
func (s *StreamEncoder) Err() error {
	return s.err
}

// Deadline returns the time when work done on behalf of this context
// should be canceled. Deadline returns ok==false when no deadline is
// set. Successive calls to Deadline return the same results.
func (s *StreamEncoder) Deadline() (time.Time, bool) {
	if s.deadline != nil {
		return *s.deadline, true
	}
	return time.Time{}, false
}

// SetDeadline sets the deadline
func (s *StreamEncoder) SetDeadline(t time.Time) {
	s.deadline = &t
}

// Value implements context.Context
func (s *StreamEncoder) Value(key interface{}) interface{} {
	return nil
}

// Cancel cancels the consumers of the stream, interrupting the stream encoding.
//
// After calling cancel, Done() will return a closed channel.
func (s *StreamEncoder) Cancel(err error) {
	s.mux.Lock()
	defer s.mux.Unlock()
	
	select {
	case <-s.done:
	default:
		s.err = err
		close(s.done)
	}
}

// AddObject adds an object to be encoded.
// value must implement MarshalerJSONObject.
func (s *StreamEncoder) AddObject(v MarshalerJSONObject) {
	if v.IsNil() {
		return
	}
	s.Encoder.writeByte('{')
	v.MarshalJSONObject(s.Encoder)
	s.Encoder.writeByte('}')
	s.Encoder.writeByte(s.delimiter)
}

// AddString adds a string to be encoded.
func (s *StreamEncoder) AddString(v string) {
	s.Encoder.writeByte('"')
	s.Encoder.writeString(v)
	s.Encoder.writeByte('"')
	s.Encoder.writeByte(s.delimiter)
}

// AddArray adds an implementation of MarshalerJSONArray to be encoded.
func (s *StreamEncoder) AddArray(v MarshalerJSONArray) {
	s.Encoder.writeByte('[')
	v.MarshalJSONArray(s.Encoder)
	s.Encoder.writeByte(']')
	s.Encoder.writeByte(s.delimiter)
}

// AddInt adds an int to be encoded.
func (s *StreamEncoder) AddInt(value int) {
	s.buf = strconv.AppendInt(s.buf, int64(value), 10)
	s.Encoder.writeByte(s.delimiter)
}

// AddFloat64 adds a float64 to be encoded.
func (s *StreamEncoder) AddFloat64(value float64) {
	s.buf = strconv.AppendFloat(s.buf, value, 'f', -1, 64)
	s.Encoder.writeByte(s.delimiter)
}

// AddFloat adds a float64 to be encoded.
func (s *StreamEncoder) AddFloat(value float64) {
	s.AddFloat64(value)
}

// Non exposed

func consume(init *StreamEncoder, s *StreamEncoder, m MarshalerStream) {
	defer s.Release()
	for {
		select {
		case <-init.Done():
			return
		default:
			m.MarshalStream(s)
			if s.Encoder.err != nil {
				init.Cancel(s.Encoder.err)
				return
			}
			i, err := s.Encoder.Write()
			if err != nil || i == 0 {
				init.Cancel(err)
				return
			}
		}
	}
}