1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
|
package ddp
import (
"encoding/hex"
"fmt"
"io"
"log"
"os"
"sync"
"time"
)
// Gather statistics about a DDP connection.
// ---------------------------------------------------------
// io utilities
//
// This is generic - it should be moved into a standalone library
// ---------------------------------------------------------
// ReaderProxy holds an io.Reader on behalf of the struct that embeds it, so
// that the underlying reader can be swapped after construction.
type ReaderProxy struct {
	// reader is the current source; it is replaced through SetReader.
	reader io.Reader
}

// NewReaderProxy returns a proxy that initially holds the supplied reader.
func NewReaderProxy(reader io.Reader) *ReaderProxy {
	proxy := new(ReaderProxy)
	proxy.reader = reader
	return proxy
}

// SetReader replaces the reader held by the proxy. No locking is done here.
func (r *ReaderProxy) SetReader(reader io.Reader) {
	r.reader = reader
}
// WriterProxy holds an io.Writer on behalf of the struct that embeds it, so
// that the underlying writer can be swapped after construction.
type WriterProxy struct {
	// writer is the current destination; it is replaced through SetWriter.
	writer io.Writer
}

// NewWriterProxy returns a proxy that initially holds the supplied writer.
func NewWriterProxy(writer io.Writer) *WriterProxy {
	proxy := new(WriterProxy)
	proxy.writer = writer
	return proxy
}

// SetWriter replaces the writer held by the proxy. No locking is done here.
func (w *WriterProxy) SetWriter(writer io.Writer) {
	w.writer = writer
}
// Logging data types select how a Logger renders the bytes it is given.
const (
	DataByte = iota // data is raw []byte, rendered as a hex dump
	DataText        // data is string data, rendered as text
)

// Logger logs data from i/o sources.
type Logger struct {
	// Active is true if the logger should be logging i/o operations.
	Active bool
	// Truncate, when >0, is the maximum number of bytes of each operation
	// that are written to the log. Zero or a negative value disables
	// truncation.
	Truncate int

	logger *log.Logger // destination for the output; nil disables logging
	dtype  int         // DataByte or DataText; unknown values behave like DataByte
}

// NewLogger creates a new i/o logger that writes to logger. dataType is one
// of DataByte or DataText and truncate is the initial Truncate value.
func NewLogger(logger *log.Logger, active bool, dataType int, truncate int) Logger {
	return Logger{logger: logger, Active: active, dtype: dataType, Truncate: truncate}
}

// Log logs the current i/o operation and returns n and err unchanged for
// easy call chaining, e.g. `return l.Log(p, n, err)`.
//
// Nothing is logged when the logger is inactive or has no *log.Logger.
// Otherwise the first n bytes of p are logged when the operation succeeded,
// and also when it failed but still transferred data (n > 0): an io.Reader
// may return data together with an error such as io.EOF, and that data must
// not silently vanish from the log.
//
// In DataText mode the line is "[n] text", with "..." appended when the
// text was cut to Truncate bytes. Any other mode produces a hex dump of the
// (possibly truncated) bytes.
func (l *Logger) Log(p []byte, n int, err error) (int, error) {
	if !l.Active || l.logger == nil {
		return n, err
	}
	// A failed operation that moved no data has nothing worth logging.
	if err != nil && n <= 0 {
		return n, err
	}

	// Clamp to the bytes that really exist so that a misbehaving source
	// reporting n outside [0, len(p)] cannot make the slice below panic.
	limit := n
	if limit < 0 {
		limit = 0
	}
	if limit > len(p) {
		limit = len(p)
	}

	truncated := false
	if l.Truncate > 0 && l.Truncate < limit {
		limit = l.Truncate
		truncated = true
	}

	switch l.dtype {
	case DataText:
		if truncated {
			l.logger.Printf("[%d] %s...", n, p[:limit])
		} else {
			l.logger.Printf("[%d] %s", n, p[:limit])
		}
	default: // DataByte and anything unrecognised
		l.logger.Println(hex.Dump(p[:limit]))
	}
	return n, err
}
// ReaderLogger is an io.Reader wrapper that logs the data read through it.
// It embeds Logger for the logging behaviour and ReaderProxy for the wrapped
// reader; its Read method reads from that reader and then logs the result.
type ReaderLogger struct {
// Logger formats and emits the log output; its exported Active and Truncate
// fields are promoted onto ReaderLogger.
Logger
// ReaderProxy holds the wrapped reader and promotes SetReader.
ReaderProxy
}
// NewReaderDataLogger creates an active binary (hex dump) data logger for
// reader. Output goes to a new log.Logger on os.Stdout with a "<- " prefix
// and the standard log flags; truncation is disabled.
func NewReaderDataLogger(reader io.Reader) *ReaderLogger {
	const noTruncation = 0
	stdout := log.New(os.Stdout, "<- ", log.LstdFlags)
	return NewReaderLogger(reader, stdout, true, DataByte, noTruncation)
}
// NewReaderTextLogger creates an active text data logger for reader. Output
// goes to a new log.Logger on os.Stdout with a "<- " prefix and the standard
// log flags; each logged read is truncated to 80 bytes.
func NewReaderTextLogger(reader io.Reader) *ReaderLogger {
	const maxLogged = 80
	stdout := log.New(os.Stdout, "<- ", log.LstdFlags)
	return NewReaderLogger(reader, stdout, true, DataText, maxLogged)
}
// NewReaderLogger creates a ReaderLogger that reads from reader and reports
// to logger. active, dataType and truncate are handed to NewLogger unchanged.
func NewReaderLogger(reader io.Reader, logger *log.Logger, active bool, dataType int, truncate int) *ReaderLogger {
	rl := new(ReaderLogger)
	rl.ReaderProxy = *NewReaderProxy(reader)
	rl.Logger = NewLogger(logger, active, dataType, truncate)
	return rl
}
// Read reads from the wrapped reader into p, hands the outcome to the
// embedded Logger and returns the reader's byte count and error unchanged.
//
// NOTE(review): unlike WriterLogger.Write there is no nil check here, so a
// nil wrapped reader panics - confirm callers always set a reader first.
func (r *ReaderLogger) Read(p []byte) (int, error) {
	count, readErr := r.ReaderProxy.reader.Read(p)
	return r.Logger.Log(p, count, readErr)
}
// WriterLogger is an io.Writer wrapper that logs the data written through it.
// It embeds Logger for the logging behaviour and WriterProxy for the wrapped
// writer; its Write method writes to that writer and then logs the result.
type WriterLogger struct {
// Logger formats and emits the log output; its exported Active and Truncate
// fields are promoted onto WriterLogger.
Logger
// WriterProxy holds the wrapped writer and promotes SetWriter.
WriterProxy
}
// NewWriterDataLogger creates an active binary (hex dump) data logger for
// writer. Output goes to a new log.Logger on os.Stdout with a "-> " prefix
// and the standard log flags; truncation is disabled.
func NewWriterDataLogger(writer io.Writer) *WriterLogger {
	const noTruncation = 0
	stdout := log.New(os.Stdout, "-> ", log.LstdFlags)
	return NewWriterLogger(writer, stdout, true, DataByte, noTruncation)
}
// NewWriterTextLogger creates an active text data logger for writer. Output
// goes to a new log.Logger on os.Stdout with a "-> " prefix and the standard
// log flags; each logged write is truncated to 80 bytes.
func NewWriterTextLogger(writer io.Writer) *WriterLogger {
	const maxLogged = 80
	stdout := log.New(os.Stdout, "-> ", log.LstdFlags)
	return NewWriterLogger(writer, stdout, true, DataText, maxLogged)
}
// NewWriterLogger creates a WriterLogger that writes to writer and reports
// to logger. active, dataType and truncate are handed to NewLogger unchanged.
func NewWriterLogger(writer io.Writer, logger *log.Logger, active bool, dataType int, truncate int) *WriterLogger {
	wl := new(WriterLogger)
	wl.WriterProxy = *NewWriterProxy(writer)
	wl.Logger = NewLogger(logger, active, dataType, truncate)
	return wl
}
// Write writes p to the wrapped writer, hands the outcome to the embedded
// Logger and returns the writer's byte count and error unchanged.
//
// When no writer is set, nothing is written or logged and (0, nil) is
// returned.
// NOTE(review): io.Writer asks for a non-nil error whenever n < len(p), so
// the nil-writer result silently drops data - confirm this is intended.
func (w *WriterLogger) Write(p []byte) (int, error) {
	if w.writer == nil {
		return 0, nil
	}
	written, writeErr := w.writer.Write(p)
	return w.Log(p, written, writeErr)
}
// Stats tracks statistics for i/o operations. A Stats value is a
// point-in-time snapshot of the counters kept by a running StatsTracker.
type Stats struct {
	// Bytes is the total number of bytes transferred.
	Bytes int64
	// Ops is the total number of i/o operations performed.
	Ops int64
	// Errors is the total number of i/o errors encountered.
	Errors int64
	// Runtime is the duration that stats have been gathered.
	Runtime time.Duration
}

// ClientStats displays combined statistics for the Client.
type ClientStats struct {
	// Reads provides statistics on the raw i/o network reads for the current connection.
	Reads *Stats
	// TotalReads provides statistics on the raw i/o network reads for all the client connections.
	TotalReads *Stats
	// Writes provides statistics on the raw i/o network writes for the current connection.
	Writes *Stats
	// TotalWrites provides statistics on the raw i/o network writes for all the client connections.
	TotalWrites *Stats
	// Reconnects is the number of reconnections the client has made.
	Reconnects int64
	// PingsSent is the number of pings sent by the client.
	PingsSent int64
	// PingsRecv is the number of pings received by the client.
	PingsRecv int64
}

// String produces a compact string representation of the client stats.
//
// Every group shows the current-connection values, then "##", then the
// totals over all connections, with reads before writes:
//
//	bytes: R/W##TR/TW ops: R/W##TR/TW err: R/W##TR/TW reconnects: N pings: recv/sent uptime: current##total
//
// Both uptimes are taken from the read-side stats (Reads and TotalReads).
// A nil *Stats field is rendered as all zeroes instead of panicking.
func (stats *ClientStats) String() string {
	var zero Stats
	orZero := func(s *Stats) *Stats {
		if s == nil {
			return &zero
		}
		return s
	}
	reads, totalReads := orZero(stats.Reads), orZero(stats.TotalReads)
	writes, totalWrites := orZero(stats.Writes), orZero(stats.TotalWrites)

	// The durations are formatted as they are. This used to compute
	// (Runtime * 1000000) / 1000000, which changes nothing for short
	// runtimes but overflows the int64 nanosecond count once Runtime exceeds
	// roughly 2h33m, after which a garbage uptime was printed.
	return fmt.Sprintf("bytes: %d/%d##%d/%d ops: %d/%d##%d/%d err: %d/%d##%d/%d reconnects: %d pings: %d/%d uptime: %v##%v",
		reads.Bytes, writes.Bytes,
		totalReads.Bytes, totalWrites.Bytes,
		reads.Ops, writes.Ops,
		totalReads.Ops, totalWrites.Ops,
		reads.Errors, writes.Errors,
		totalReads.Errors, totalWrites.Errors,
		stats.Reconnects,
		stats.PingsRecv, stats.PingsSent,
		reads.Runtime, totalReads.Runtime)
}
// CollectionStats combines statistics about a collection.
type CollectionStats struct {
	// Name is the name of the collection.
	Name string
	// Count is the total number of documents in the collection.
	Count int
}

// String renders the collection stat compactly as "name[count]".
func (s *CollectionStats) String() string {
	name, count := s.Name, s.Count
	return fmt.Sprintf("%s[%d]", name, count)
}
// StatsTracker provides the basic tooling for tracking i/o stats.
//
// Create trackers with NewStatsTracker: the zero value has a nil lock and
// panics on first use. The lock is held by pointer, so a StatsTracker that
// is copied by value (as the ReaderStats and WriterStats constructors do)
// still has a usable mutex.
type StatsTracker struct {
	bytes  int64       // bytes transferred since start
	ops    int64       // operations recorded since start
	errors int64       // failed operations since start, io.EOF excluded
	start  time.Time   // when counting began or was last reset
	lock   *sync.Mutex // guards all of the fields above
}

// NewStatsTracker create a new stats tracker with start time set to now.
func NewStatsTracker() *StatsTracker {
	return &StatsTracker{start: time.Now(), lock: new(sync.Mutex)}
}

// Op records an i/o operation. The parameters are passed through unchanged
// to allow easy chaining, e.g. `return t.Op(r.Read(p))`.
//
// Every call counts as one operation. Any bytes reported (n > 0) are added
// to the byte total even when err is non-nil, because a read may deliver
// data together with io.EOF and a write may fail after a partial transfer.
// Any error other than io.EOF counts as a failure; io.EOF is treated as the
// normal end of a stream.
func (t *StatsTracker) Op(n int, err error) (int, error) {
	t.lock.Lock()
	defer t.lock.Unlock()

	t.ops++
	if n > 0 {
		t.bytes += int64(n)
	}
	if err != nil && err != io.EOF {
		t.errors++
	}
	return n, err
}
// Snapshot returns a copy of the current counters together with the time
// elapsed since the tracker was started or last reset. The tracker's lock is
// held while the values are read.
func (t *StatsTracker) Snapshot() *Stats {
	t.lock.Lock()
	current := t.snap()
	t.lock.Unlock()
	return current
}
// Reset zeroes every counter and restarts the clock at the current time. It
// returns a snapshot of the values as they were immediately before the
// reset. Both steps run under the tracker's lock.
func (t *StatsTracker) Reset() *Stats {
	t.lock.Lock()
	defer t.lock.Unlock()

	previous := t.snap()
	t.bytes, t.ops, t.errors = 0, 0, 0
	t.start = time.Now()
	return previous
}
// snap copies the counters into a new Stats value and sets Runtime to the
// time elapsed since start. It does not take the lock itself; Snapshot and
// Reset call it while holding t.lock.
func (t *StatsTracker) snap() *Stats {
	elapsed := time.Since(t.start)
	return &Stats{
		Bytes:   t.bytes,
		Ops:     t.ops,
		Errors:  t.errors,
		Runtime: elapsed,
	}
}
// ReaderStats is an io.Reader wrapper that tracks statistics on the reads
// made through it. It embeds StatsTracker for the counters and ReaderProxy
// for the wrapped reader; its Read method reads from that reader and records
// the outcome.
type ReaderStats struct {
// StatsTracker keeps the counters and promotes Op, Snapshot and Reset.
StatsTracker
// ReaderProxy holds the wrapped reader and promotes SetReader.
ReaderProxy
}
// NewReaderStats creates a ReaderStats that reads from reader, with a fresh
// StatsTracker whose clock starts now.
func NewReaderStats(reader io.Reader) *ReaderStats {
	rs := new(ReaderStats)
	rs.ReaderProxy = *NewReaderProxy(reader)
	rs.StatsTracker = *NewStatsTracker()
	return rs
}
// Read reads from the wrapped reader into p, records the outcome in the
// embedded StatsTracker and returns the reader's byte count and error
// unchanged.
//
// NOTE(review): unlike WriterStats.Write there is no nil check here, so a
// nil wrapped reader panics - confirm callers always set a reader first.
func (r *ReaderStats) Read(p []byte) (int, error) {
	n, err := r.reader.Read(p)
	return r.Op(n, err)
}
// WriterStats is an io.Writer wrapper that tracks statistics on the writes
// made through it. It embeds StatsTracker for the counters and WriterProxy
// for the wrapped writer; its Write method writes to that writer and records
// the outcome.
type WriterStats struct {
// StatsTracker keeps the counters and promotes Op, Snapshot and Reset.
StatsTracker
// WriterProxy holds the wrapped writer and promotes SetWriter.
WriterProxy
}
// NewWriterStats creates a WriterStats that writes to writer, with a fresh
// StatsTracker whose clock starts now.
func NewWriterStats(writer io.Writer) *WriterStats {
	ws := new(WriterStats)
	ws.WriterProxy = *NewWriterProxy(writer)
	ws.StatsTracker = *NewStatsTracker()
	return ws
}
// Write writes p to the wrapped writer, records the outcome in the embedded
// StatsTracker and returns the writer's byte count and error unchanged.
//
// When no writer is set, nothing is written or recorded and (0, nil) is
// returned.
// NOTE(review): io.Writer asks for a non-nil error whenever n < len(p), so
// the nil-writer result silently drops data - confirm this is intended.
func (w *WriterStats) Write(p []byte) (int, error) {
	if w.writer == nil {
		return 0, nil
	}
	n, err := w.writer.Write(p)
	return w.Op(n, err)
}
|