summaryrefslogblamecommitdiffstats
path: root/vendor/github.com/gopackage/ddp/ddp_stats.go
blob: 1546b5479492fb5ccc0abf703fe8f7f9ac02d73e (plain) (tree)































































































































































































































































































































                                                                                                                                
package ddp

import (
	"encoding/hex"
	"fmt"
	"io"
	"log"
	"os"
	"sync"
	"time"
)

// Gather statistics about a DDP connection.

// ---------------------------------------------------------
// io utilities
//
// This is generic - should be moved into a stand alone lib
// ---------------------------------------------------------

// ReaderProxy holds the io.Reader that a wrapping struct delegates to and
// lets that reader be replaced after construction.
type ReaderProxy struct {
	reader io.Reader
}

// NewReaderProxy returns a new proxy whose wrapped reader is reader.
func NewReaderProxy(reader io.Reader) *ReaderProxy {
	proxy := new(ReaderProxy)
	proxy.reader = reader
	return proxy
}

// SetReader replaces the reader wrapped by the proxy.
func (r *ReaderProxy) SetReader(reader io.Reader) {
	r.reader = reader
}

// WriterProxy holds the io.Writer that a wrapping struct delegates to and
// lets that writer be replaced after construction.
type WriterProxy struct {
	writer io.Writer
}

// NewWriterProxy returns a new proxy whose wrapped writer is writer.
func NewWriterProxy(writer io.Writer) *WriterProxy {
	proxy := new(WriterProxy)
	proxy.writer = writer
	return proxy
}

// SetWriter replaces the writer wrapped by the proxy.
func (w *WriterProxy) SetWriter(writer io.Writer) {
	w.writer = writer
}

// Logging data types select how Logger renders the bytes it is given.
const (
	DataByte = iota // data is raw []byte, rendered as a hex dump
	DataText        // data is string data, rendered as text
)

// Logger logs data from i/o sources.
type Logger struct {
	// Active is true if the logger should be logging i/o operations.
	Active bool
	// Truncate, when >0, is the maximum number of bytes of a single
	// operation that are written to the log.
	Truncate int

	logger *log.Logger
	dtype  int
}

// NewLogger creates a new i/o logger that writes to logger. dataType is one
// of DataByte or DataText; truncate is stored in Truncate.
func NewLogger(logger *log.Logger, active bool, dataType int, truncate int) Logger {
	return Logger{logger: logger, Active: active, dtype: dataType, Truncate: truncate}
}

// Log logs the current i/o operation and returns n and err unchanged for
// easy call chaining.
//
// p is the buffer of the operation and n the number of bytes it reported.
// Nothing is logged when the logger is inactive, has no *log.Logger, or the
// operation failed without transferring any data.
func (l *Logger) Log(p []byte, n int, err error) (int, error) {
	if !l.Active || l.logger == nil {
		return n, err
	}
	// An operation may report n > 0 together with an error (for example a
	// final read that also returns io.EOF); those bytes were transferred and
	// are still logged. Only failures that moved no data are skipped.
	if err != nil && n <= 0 {
		return n, err
	}

	// Clamp to the buffer so an out-of-range n cannot cause a slice panic.
	limit := n
	if limit > len(p) {
		limit = len(p)
	}
	if limit < 0 {
		limit = 0
	}
	truncated := false
	if l.Truncate > 0 && l.Truncate < limit {
		limit = l.Truncate
		truncated = true
	}

	switch l.dtype {
	case DataText:
		if truncated {
			l.logger.Printf("[%d] %s...", n, string(p[:limit]))
		} else {
			l.logger.Printf("[%d] %s", n, string(p[:limit]))
		}
	default:
		// DataByte and any unknown data type are rendered as a hex dump.
		l.logger.Println(hex.Dump(p[:limit]))
	}
	return n, err
}

// ReaderLogger logs the data read from any io.Reader.
// ReaderLogger wraps a Reader: Read is forwarded to the wrapped reader and
// the result is handed to the embedded Logger before it is returned.
type ReaderLogger struct {
	Logger
	ReaderProxy
}

// NewReaderDataLogger creates an active binary (hex dump) data logger with a
// default log.Logger that writes to stdout with a '<- ' prefix and no
// truncation.
func NewReaderDataLogger(reader io.Reader) *ReaderLogger {
	logger := log.New(os.Stdout, "<- ", log.LstdFlags)
	return NewReaderLogger(reader, logger, true, DataByte, 0)
}

// NewReaderTextLogger creates an active text data logger with a default
// log.Logger that writes to stdout with a '<- ' prefix. Truncate is set to 80.
func NewReaderTextLogger(reader io.Reader) *ReaderLogger {
	logger := log.New(os.Stdout, "<- ", log.LstdFlags)
	return NewReaderLogger(reader, logger, true, DataText, 80)
}

// NewReaderLogger creates a Reader logger for the provided parameters.
// reader is the wrapped reader; the remaining arguments are passed to
// NewLogger.
func NewReaderLogger(reader io.Reader, logger *log.Logger, active bool, dataType int, truncate int) *ReaderLogger {
	return &ReaderLogger{ReaderProxy: *NewReaderProxy(reader), Logger: NewLogger(logger, active, dataType, truncate)}
}

// Read reads from the wrapped reader into p, passes the result to Log and
// returns what Log returns.
//
// When no reader is attached (constructed with, or set to, nil) Read returns
// 0 and io.EOF instead of panicking. Nothing is logged in that case, which
// mirrors how WriterLogger.Write treats a missing writer.
func (r *ReaderLogger) Read(p []byte) (int, error) {
	if r.reader == nil {
		// io.EOF rather than (0, nil): the io.Reader contract discourages
		// returning zero bytes with a nil error.
		return 0, io.EOF
	}
	n, err := r.reader.Read(p)
	return r.Log(p, n, err)
}

// WriterLogger logs the data written to any io.Writer.
// WriterLogger wraps a Writer: Write is forwarded to the wrapped writer and
// the result is handed to the embedded Logger before it is returned.
type WriterLogger struct {
	Logger
	WriterProxy
}

// NewWriterDataLogger creates an active binary (hex dump) data logger with a
// default log.Logger that writes to stdout with a '-> ' prefix and no
// truncation.
func NewWriterDataLogger(writer io.Writer) *WriterLogger {
	stdout := log.New(os.Stdout, "-> ", log.LstdFlags)
	return NewWriterLogger(writer, stdout, true, DataByte, 0)
}

// NewWriterTextLogger creates an active text data logger with a default
// log.Logger that writes to stdout with a '-> ' prefix. Truncate is set to 80.
func NewWriterTextLogger(writer io.Writer) *WriterLogger {
	stdout := log.New(os.Stdout, "-> ", log.LstdFlags)
	return NewWriterLogger(writer, stdout, true, DataText, 80)
}

// NewWriterLogger creates a Writer logger for the provided parameters.
// writer is the wrapped writer; the remaining arguments are passed to
// NewLogger.
func NewWriterLogger(writer io.Writer, logger *log.Logger, active bool, dataType int, truncate int) *WriterLogger {
	proxy := NewWriterProxy(writer)
	return &WriterLogger{
		Logger:      NewLogger(logger, active, dataType, truncate),
		WriterProxy: *proxy,
	}
}

// Write writes p to the wrapped writer, passes the result to Log and returns
// what Log returns. With no writer attached, nothing is written or logged
// and Write returns 0 and a nil error.
func (w *WriterLogger) Write(p []byte) (int, error) {
	if w.writer == nil {
		// NOTE(review): returning 0 with a nil error for a non-empty p is at
		// odds with the io.Writer contract; confirm callers rely on this
		// silent drop before changing it.
		return 0, nil
	}
	written, err := w.writer.Write(p)
	return w.Log(p, written, err)
}

// Stats holds statistics for i/o operations. A Stats value is a snapshot
// taken from a running StatsTracker.
type Stats struct {
	// Bytes is the total number of bytes transferred.
	Bytes int64
	// Ops is the total number of i/o operations performed.
	Ops int64
	// Errors is the total number of i/o errors encountered.
	Errors int64
	// Runtime is the duration that stats have been gathered.
	Runtime time.Duration
}

// ClientStats displays combined statistics for the Client.
type ClientStats struct {
	// Reads provides statistics on the raw i/o network reads for the current connection.
	Reads *Stats
	// TotalReads provides statistics on the raw i/o network reads for all client connections.
	TotalReads *Stats
	// Writes provides statistics on the raw i/o network writes for the current connection.
	Writes *Stats
	// TotalWrites provides statistics on the raw i/o network writes for all client connections.
	TotalWrites *Stats
	// Reconnects is the number of reconnections the client has made.
	Reconnects int64
	// PingsSent is the number of pings sent by the client
	PingsSent int64
	// PingsRecv is the number of pings received by the client
	PingsRecv int64
}

// String produces a compact string representation of the client stats.
//
// The bytes, ops and err groups are formatted as
// "reads/writes##totalReads/totalWrites", pings as "received/sent" and
// uptime as "Reads.Runtime##TotalReads.Runtime". Any nil *Stats field is
// reported as all zeros, and a nil receiver yields "<nil>".
func (stats *ClientStats) String() string {
	if stats == nil {
		return "<nil>"
	}
	// orZero substitutes an empty Stats so a partially populated ClientStats
	// can still be printed instead of panicking on a nil pointer.
	orZero := func(s *Stats) *Stats {
		if s == nil {
			return new(Stats)
		}
		return s
	}
	i, o := orZero(stats.Reads), orZero(stats.Writes)
	ti, to := orZero(stats.TotalReads), orZero(stats.TotalWrites)

	// The runtimes are printed as-is. Scaling a Duration up by 1000000 and
	// back down again changes nothing for short runtimes but overflows the
	// int64 nanosecond count after roughly two and a half hours.
	return fmt.Sprintf("bytes: %d/%d##%d/%d ops: %d/%d##%d/%d err: %d/%d##%d/%d reconnects: %d pings: %d/%d uptime: %v##%v",
		i.Bytes, o.Bytes,
		ti.Bytes, to.Bytes,
		i.Ops, o.Ops,
		ti.Ops, to.Ops,
		i.Errors, o.Errors,
		ti.Errors, to.Errors,
		stats.Reconnects,
		stats.PingsRecv, stats.PingsSent,
		i.Runtime, ti.Runtime)
}

// CollectionStats combines statistics about a collection.
type CollectionStats struct {
	Name  string // Name of the collection
	Count int    // Count is the total number of documents in the collection
}

// String renders the collection stat compactly as "Name[Count]".
func (s *CollectionStats) String() string {
	name, count := s.Name, s.Count
	return fmt.Sprintf("%s[%d]", name, count)
}

// StatsTracker provides the basic tooling for tracking i/o stats.
//
// Create trackers with NewStatsTracker: the zero value has a nil lock, so
// calling its methods panics.
type StatsTracker struct {
	bytes  int64
	ops    int64
	errors int64
	start  time.Time
	lock   *sync.Mutex
}

// NewStatsTracker create a new stats tracker with start time set to now.
func NewStatsTracker() *StatsTracker {
	return &StatsTracker{start: time.Now(), lock: new(sync.Mutex)}
}

// Op records an i/o operation. The parameters are passed through to
// allow easy chaining.
//
// Every call counts as one operation. Any n > 0 is added to the byte total
// even when err is non-nil, and every non-nil err other than io.EOF counts
// as one error.
func (t *StatsTracker) Op(n int, err error) (int, error) {
	t.lock.Lock()
	defer t.lock.Unlock()

	t.ops++
	// Reads and writes may report transferred bytes together with an error
	// (a final read returning io.EOF, a partial write), so the byte count
	// must not depend on err.
	if n > 0 {
		t.bytes += int64(n)
	}
	// io.EOF is the normal end of a stream and is not counted as an error.
	if err != nil && err != io.EOF {
		t.errors++
	}

	return n, err
}

// Snapshot takes a snapshot of the current statistics.
func (t *StatsTracker) Snapshot() *Stats {
	t.lock.Lock()
	defer t.lock.Unlock()
	return t.snap()
}

// Reset sets all of the stats to initial values and restarts the runtime
// clock. It returns a snapshot of the stats as they were before the reset.
func (t *StatsTracker) Reset() *Stats {
	t.lock.Lock()
	defer t.lock.Unlock()

	stats := t.snap()
	t.bytes = 0
	t.ops = 0
	t.errors = 0
	t.start = time.Now()

	return stats
}

// snap builds a Stats from the current counters. It does not lock; callers
// in this file hold t.lock while calling it.
func (t *StatsTracker) snap() *Stats {
	return &Stats{Bytes: t.bytes, Ops: t.ops, Errors: t.errors, Runtime: time.Since(t.start)}
}

// ReaderStats tracks statistics on any io.Reader.
// ReaderStats wraps a Reader: Read is forwarded to the wrapped reader and
// its result is recorded by the embedded StatsTracker.
type ReaderStats struct {
	StatsTracker
	ReaderProxy
}

// NewReaderStats creates a ReaderStats object for the provided reader with
// a freshly started StatsTracker.
func NewReaderStats(reader io.Reader) *ReaderStats {
	return &ReaderStats{ReaderProxy: *NewReaderProxy(reader), StatsTracker: *NewStatsTracker()}
}

// Read passes through a read to the wrapped reader and records the result
// with Op.
//
// When no reader is attached (constructed with, or set to, nil) Read returns
// 0 and io.EOF instead of panicking. Such calls are not recorded, which
// mirrors how WriterStats.Write treats a missing writer.
func (r *ReaderStats) Read(p []byte) (int, error) {
	if r.reader == nil {
		// io.EOF rather than (0, nil): the io.Reader contract discourages
		// returning zero bytes with a nil error.
		return 0, io.EOF
	}
	return r.Op(r.reader.Read(p))
}

// WriterStats tracks statistics on any io.Writer.
// WriterStats wraps a Writer: Write is forwarded to the wrapped writer and
// its result is recorded by the embedded StatsTracker.
type WriterStats struct {
	StatsTracker
	WriterProxy
}

// NewWriterStats creates a WriterStats object for the provided writer with
// a freshly started StatsTracker.
func NewWriterStats(writer io.Writer) *WriterStats {
	proxy := NewWriterProxy(writer)
	return &WriterStats{
		StatsTracker: *NewStatsTracker(),
		WriterProxy:  *proxy,
	}
}

// Write passes through a write to the wrapped writer and records the result
// with Op. With no writer attached, nothing is written or recorded and Write
// returns 0 and a nil error.
func (w *WriterStats) Write(p []byte) (int, error) {
	if w.writer == nil {
		return 0, nil
	}
	written, err := w.writer.Write(p)
	return w.Op(written, err)
}