summaryrefslogtreecommitdiffstats
path: root/vendor/github.com/francoispqt/gojay/decode_stream.go
blob: 74beee4d755f07c16d2f13f3de48c22da59da451 (plain) (blame)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
package gojay

import (
	"sync"
	"time"
)

// UnmarshalerStream is the interface to implement for a type (ideally a channel)
// that line delimited JSON is decoded to.
//
// StreamDecoder.DecodeStream calls UnmarshalStream with the stream decoder itself;
// a non-nil error returned by UnmarshalStream stops the decoding.
type UnmarshalerStream interface {
	UnmarshalStream(*StreamDecoder) error
}

// Stream is the package-level value of the unexported stream type. It is the
// entry point of the Stream api.
// NOTE(review): the methods of stream are not declared in this part of the file;
// presumably they are the stream decoder/encoder constructors - confirm.
var Stream = stream{}

// stream is an empty struct; it carries no state of its own.
type stream struct{}

// A StreamDecoder reads and decodes JSON values from an input stream.
//
// It implements context.Context (Done, Deadline, Err and Value) and provides
// a channel to notify that decoding has stopped.
type StreamDecoder struct {
	// mux is held when the decoder's error is raised at the end of DecodeStream
	// and when Err reads it.
	mux sync.RWMutex
	// Decoder is the embedded decoder holding the reader, the buffer, the cursor
	// and the error used by DecodeStream.
	*Decoder
	// done is closed by DecodeStream when it stops decoding; Done exposes it.
	done     chan struct{}
	// deadline is nil until SetDeadline is called; Deadline reports ok==false
	// while it is nil.
	deadline *time.Time
}

// DecodeStream reads line delimited JSON-encoded values from the decoder's input (io.Reader)
// and hands them to c: once the first non-separator character is found, c.UnmarshalStream
// is called repeatedly for as long as dec.nextChar reports a non-zero character.
//
// c must implement UnmarshalerStream. Ideally c is a channel. See example for implementation.
//
// DecodeStream returns:
//   - a NoReaderError if the decoder has no reader,
//   - the error returned by c.UnmarshalStream, if any,
//   - the error raised by dec.raiseInvalidJSONErr if the input holds nothing but separators,
//   - nil otherwise.
//
// The done channel is closed on every return path. On every failing path the error is
// recorded before the channel is closed, so a goroutine released by Done cannot observe
// a nil Err for a failed decoding.
//
// It panics with InvalidUsagePooledDecoderError if dec.isPooled is 1.
//
// See the documentation for Unmarshal for details about the conversion of JSON into a Go value.
func (dec *StreamDecoder) DecodeStream(c UnmarshalerStream) error {
	if dec.isPooled == 1 {
		panic(InvalidUsagePooledDecoderError("Invalid usage of pooled decoder"))
	}
	if dec.r == nil {
		// dec.err is written before done is closed: the close publishes it to
		// whoever waits on Done.
		dec.err = NoReaderError("No reader given to decode stream")
		close(dec.done)
		return dec.err
	}
	// skip separators until the first meaningful character, reading more input
	// when the buffer is exhausted
	for ; dec.cursor < dec.length || dec.read(); dec.cursor++ {
		switch dec.data[dec.cursor] {
		case ' ', '\n', '\t', '\r', ',':
			continue
		default:
			// char is not space start reading
			for dec.nextChar() != 0 {
				// calling unmarshal stream
				err := c.UnmarshalStream(dec)
				if err != nil {
					dec.err = err
					close(dec.done)
					return err
				}
				// garbage collects buffer
				// we don't want the buffer to grow extensively
				dec.data = dec.data[dec.cursor:]
				dec.length = dec.length - dec.cursor
				dec.cursor = 0
			}
			// close the done channel to signal the end of the job
			close(dec.done)
			return nil
		}
	}
	// Nothing but separators (or nothing at all) could be read.
	// Raise the error first and close done afterwards: closing done first would
	// let a goroutine waiting on Done call Err and take the read lock before the
	// error is raised, and thus see nil for a failed decoding.
	// NOTE(review): raiseInvalidJSONErr is declared elsewhere; presumably it
	// stores the error in dec.err, which is why the write lock is held - confirm.
	dec.mux.Lock()
	err := dec.raiseInvalidJSONErr(dec.cursor)
	dec.mux.Unlock()
	close(dec.done)
	return err
}

// context.Context implementation

// Done returns the decoder's done channel, which DecodeStream closes when it
// stops decoding, on success as well as on error.
// It implements context.Context.
func (dec *StreamDecoder) Done() <-chan struct{} {
	return dec.done
}

// Deadline reports the deadline stored by SetDeadline.
// When no deadline has been set, it returns the zero time.Time and ok==false;
// otherwise it returns the stored time and ok==true.
// It implements context.Context.
func (dec *StreamDecoder) Deadline() (time.Time, bool) {
	if dec.deadline == nil {
		// no deadline was ever set
		return time.Time{}, false
	}
	return *dec.deadline, true
}

// SetDeadline stores t as the deadline later reported by Deadline.
// The decoder keeps a pointer to its own copy of t.
func (dec *StreamDecoder) SetDeadline(t time.Time) {
	deadline := t
	dec.deadline = &deadline
}

// Err returns nil as long as the done channel is not closed.
// Once it is closed, Err returns the decoder's error, read under the read lock;
// that error is nil when the decoding stopped without failure.
// It implements context.Context.
func (dec *StreamDecoder) Err() error {
	// non-blocking check: bail out while the job is still running
	select {
	case <-dec.done:
	default:
		return nil
	}
	dec.mux.RLock()
	err := dec.err
	dec.mux.RUnlock()
	return err
}

// Value implements context.Context.
// It always returns nil, whatever key is given.
func (dec *StreamDecoder) Value(key interface{}) interface{} {
	return nil
}