diff options
Diffstat (limited to 'vendor/github.com/gopackage/ddp')
-rw-r--r-- | vendor/github.com/gopackage/ddp/.gitignore | 4 | ||||
-rw-r--r-- | vendor/github.com/gopackage/ddp/LICENSE | 2 | ||||
-rw-r--r-- | vendor/github.com/gopackage/ddp/README.md | 6 | ||||
-rw-r--r-- | vendor/github.com/gopackage/ddp/client.go (renamed from vendor/github.com/gopackage/ddp/ddp_client.go) | 118 | ||||
-rw-r--r-- | vendor/github.com/gopackage/ddp/collection.go (renamed from vendor/github.com/gopackage/ddp/ddp_collection.go) | 0 | ||||
-rw-r--r-- | vendor/github.com/gopackage/ddp/ddp_ejson.go | 217 | ||||
-rw-r--r-- | vendor/github.com/gopackage/ddp/ddp_stats.go | 321 | ||||
-rw-r--r-- | vendor/github.com/gopackage/ddp/doc.go | 6 | ||||
-rw-r--r-- | vendor/github.com/gopackage/ddp/messages.go (renamed from vendor/github.com/gopackage/ddp/ddp_messages.go) | 48 | ||||
-rw-r--r-- | vendor/github.com/gopackage/ddp/stats.go | 170 | ||||
-rw-r--r-- | vendor/github.com/gopackage/ddp/time.go | 49 | ||||
-rw-r--r-- | vendor/github.com/gopackage/ddp/utils.go (renamed from vendor/github.com/gopackage/ddp/ddp.go) | 42 |
12 files changed, 350 insertions, 633 deletions
diff --git a/vendor/github.com/gopackage/ddp/.gitignore b/vendor/github.com/gopackage/ddp/.gitignore index daf913b1..e71cbee4 100644 --- a/vendor/github.com/gopackage/ddp/.gitignore +++ b/vendor/github.com/gopackage/ddp/.gitignore @@ -22,3 +22,7 @@ _testmain.go *.exe *.test *.prof + +# Editors +.idea/ +.vscode/ diff --git a/vendor/github.com/gopackage/ddp/LICENSE b/vendor/github.com/gopackage/ddp/LICENSE index 03d77e8a..98a0fe76 100644 --- a/vendor/github.com/gopackage/ddp/LICENSE +++ b/vendor/github.com/gopackage/ddp/LICENSE @@ -1,4 +1,4 @@ -Copyright (c) 2015, Metamech LLC. +Copyright (c) 2021, Metamech LLC. Permission to use, copy, modify, and/or distribute this software for any purpose with or without fee is hereby granted, provided that the above diff --git a/vendor/github.com/gopackage/ddp/README.md b/vendor/github.com/gopackage/ddp/README.md index fd62c718..7ae713b9 100644 --- a/vendor/github.com/gopackage/ddp/README.md +++ b/vendor/github.com/gopackage/ddp/README.md @@ -1,3 +1,5 @@ -# ddp +# DDP in Go -MeteorJS DDP library for Golang +[Meteor](https://meteor.com) DDP library for Go. This library allows Go applications to connect to Meteor applications, subscribe to Meteor publications, read from a cached Collection (similar to minimongo), and call Meteor methods on the server. + +See `ddp/_examples` for some tips and an example app that walks through all the features of the library.
\ No newline at end of file diff --git a/vendor/github.com/gopackage/ddp/ddp_client.go b/vendor/github.com/gopackage/ddp/client.go index 8d6323b7..205dc433 100644 --- a/vendor/github.com/gopackage/ddp/ddp_client.go +++ b/vendor/github.com/gopackage/ddp/client.go @@ -2,14 +2,14 @@ package ddp import ( "encoding/json" + "errors" "fmt" "io" - "log" "sync" "time" + "github.com/apex/log" "golang.org/x/net/websocket" - "errors" ) const ( @@ -45,18 +45,14 @@ type Client struct { // ReconnectInterval is the time between reconnections on bad connections ReconnectInterval time.Duration - // writeStats controls statistics gathering for current websocket writes. + // writeSocketStats controls statistics gathering for current websocket writes. writeSocketStats *WriterStats // writeStats controls statistics gathering for overall client writes. writeStats *WriterStats - // writeLog controls logging for client writes. - writeLog *WriterLogger - // readStats controls statistics gathering for current websocket reads. + // readSocketStats controls statistics gathering for current websocket reads. readSocketStats *ReaderStats // readStats controls statistics gathering for overall client reads. readStats *ReaderStats - // readLog control logging for clietn reads. - readLog *ReaderLogger // reconnects in the number of reconnections the client has made reconnects int64 // pingsIn is the number of pings received from the server @@ -74,7 +70,7 @@ type Client struct { ws *websocket.Conn // encoder is a JSON encoder to send outgoing packets to the websocket. encoder *json.Encoder - // url the URL the websocket is connected to + // url the websocket is connected to url string // origin is the origin for the websocket connection origin string @@ -85,7 +81,7 @@ type Client struct { // pingTimer is a timer for sending regular pings to the server pingTimer *time.Timer // pings tracks inflight pings based on each ping ID. - pings map[string][]*pingTracker + pings map[string][]*PingTracker // calls tracks method invocations that are still in flight calls map[string]*Call // subs tracks active subscriptions. Map contains name->args @@ -104,8 +100,8 @@ type Client struct { // connectionListeners will be informed when a connection to the server is established connectionListeners []ConnectionListener - // idManager tracks IDs for ddp messages - idManager + // KeyManager tracks IDs for ddp messages + KeyManager } // NewClient creates a default client (using an internal websocket) to the @@ -115,7 +111,7 @@ type Client struct { // automatically and internally handle heartbeats and reconnects. // // TBD create an option to use an external websocket (aka htt.Transport) -// TBD create an option to substitute heartbeat and reconnect behavior (aka http.Tranport) +// TBD create an option to substitute heartbeat and reconnect behavior (aka http.Transport) // TBD create an option to hijack the connection (aka http.Hijacker) // TBD create profiling features (aka net/http/pprof) func NewClient(url, origin string) *Client { @@ -128,7 +124,7 @@ func NewClient(url, origin string) *Client { origin: origin, inbox: make(chan map[string]interface{}, 100), errors: make(chan error, 100), - pings: map[string][]*pingTracker{}, + pings: map[string][]*PingTracker{}, calls: map[string]*Call{}, subs: map[string]*Call{}, connectionStatus: DISCONNECTED, @@ -140,14 +136,9 @@ func NewClient(url, origin string) *Client { readSocketStats: NewReaderStats(nil), readStats: NewReaderStats(nil), - // Loggers - writeLog: NewWriterTextLogger(nil), - readLog: NewReaderTextLogger(nil), - - idManager: *newidManager(), + KeyManager: *NewKeyManager(), } c.encoder = json.NewEncoder(c.writeStats) - c.SetSocketLogActive(false) // We spin off an inbox processing goroutine go c.inboxManager() @@ -192,10 +183,11 @@ func (c *Client) Connect() error { ws, err := websocket.Dial(c.url, "", c.origin) if err != nil { c.Close() - log.Println("Dial error", err) + log.WithError(err).Debug("dial error") c.reconnectLater() return err } + log.Debug("dialed") // Start DDP connection c.start(ws, NewConnect()) return nil @@ -225,7 +217,7 @@ func (c *Client) Reconnect() { ws, err := websocket.Dial(c.url, "", c.origin) if err != nil { c.Close() - log.Println("Dial error", err) + log.WithError(err).Debug("Dial error") c.reconnectLater() return } @@ -240,23 +232,23 @@ func (c *Client) Reconnect() { // Send calls that haven't been confirmed - may not have been sent // and effects should be idempotent for _, call := range c.calls { - c.Send(NewMethod(call.ID, call.ServiceMethod, call.Args.([]interface{}))) + IgnoreErr(c.Send(NewMethod(call.ID, call.ServiceMethod, call.Args.([]interface{}))), "resend method") } // Resend subscriptions and patch up collections for _, sub := range c.subs { - c.Send(NewSub(sub.ID, sub.ServiceMethod, sub.Args.([]interface{}))) + IgnoreErr(c.Send(NewSub(sub.ID, sub.ServiceMethod, sub.Args.([]interface{}))), "resend sub") } } -// Subscribe subscribes to data updates. +// Subscribe to data updates. func (c *Client) Subscribe(subName string, done chan *Call, args ...interface{}) *Call { if args == nil { args = []interface{}{} } call := new(Call) - call.ID = c.newID() + call.ID = c.Next() call.ServiceMethod = subName call.Args = args call.Owner = c @@ -269,7 +261,7 @@ func (c *Client) Subscribe(subName string, done chan *Call, args ...interface{}) // RPCs that will be using that channel. If the channel // is totally unbuffered, it's best not to run at all. if cap(done) == 0 { - log.Panic("ddp.rpc: done channel is unbuffered") + log.Fatal("ddp.rpc: done channel is unbuffered") } } call.Done = done @@ -279,7 +271,7 @@ func (c *Client) Subscribe(subName string, done chan *Call, args ...interface{}) subArgs := make([]interface{}, len(args)) copy(subArgs, args) - c.Send(NewSub(call.ID, subName, args)) + IgnoreErr(c.Send(NewSub(call.ID, subName, args)), "send sub") return call } @@ -302,7 +294,7 @@ func (c *Client) Go(serviceMethod string, done chan *Call, args ...interface{}) args = []interface{}{} } call := new(Call) - call.ID = c.newID() + call.ID = c.Next() call.ServiceMethod = serviceMethod call.Args = args call.Owner = c @@ -314,13 +306,13 @@ func (c *Client) Go(serviceMethod string, done chan *Call, args ...interface{}) // RPCs that will be using that channel. If the channel // is totally unbuffered, it's best not to run at all. if cap(done) == 0 { - log.Panic("ddp.rpc: done channel is unbuffered") + log.Fatal("ddp.rpc: done channel is unbuffered") } } call.Done = done c.calls[call.ID] = call - c.Send(NewMethod(call.ID, serviceMethod, args)) + IgnoreErr(c.Send(NewMethod(call.ID, serviceMethod, args)), "send method") return call } @@ -332,10 +324,10 @@ func (c *Client) Call(serviceMethod string, args ...interface{}) (interface{}, e } // Ping sends a heartbeat signal to the server. The Ping doesn't look for -// a response but may trigger the connection to reconnect if the ping timesout. +// a response but may trigger the connection to reconnect if the ping times out. // This is primarily useful for reviving an unresponsive Client connection. func (c *Client) Ping() { - c.PingPong(c.newID(), c.HeartbeatTimeout, func(err error) { + c.PingPong(c.Next(), c.HeartbeatTimeout, func(err error) { if err != nil { // Is there anything else we should or can do? c.reconnectLater() @@ -356,9 +348,9 @@ func (c *Client) PingPong(id string, timeout time.Duration, handler func(error)) c.pingsOut++ pings, ok := c.pings[id] if !ok { - pings = make([]*pingTracker, 0, 5) + pings = make([]*PingTracker, 0, 5) } - tracker := &pingTracker{handler: handler, timeout: timeout, timer: time.AfterFunc(timeout, func() { + tracker := &PingTracker{handler: handler, timeout: timeout, timer: time.AfterFunc(timeout, func() { handler(fmt.Errorf("ping timeout")) })} c.pings[id] = append(pings, tracker) @@ -380,7 +372,7 @@ func (c *Client) Close() { // Close websocket if c.ws != nil { - c.ws.Close() + IgnoreErr(c.ws.Close(), "close ws") c.ws = nil } for _, collection := range c.collections { @@ -413,18 +405,7 @@ func (c *Client) Stats() *ClientStats { } } -// SocketLogActive returns the current logging status for the socket. -func (c *Client) SocketLogActive() bool { - return c.writeLog.Active -} - -// SetSocketLogActive to true to enable logging of raw socket data. -func (c *Client) SetSocketLogActive(active bool) { - c.writeLog.Active = active - c.readLog.Active = active -} - -// CollectionByName retrieves a collection by it's name. +// CollectionByName retrieves a collection by its name. func (c *Client) CollectionByName(name string) Collection { collection, ok := c.collections[name] if !ok { @@ -443,23 +424,21 @@ func (c *Client) CollectionStats() []CollectionStats { return stats } -// start starts a new client connection on the provided websocket +// start a new client connection on the provided websocket func (c *Client) start(ws *websocket.Conn, connect *Connect) { c.status(CONNECTING) c.ws = ws - c.writeLog.SetWriter(ws) - c.writeSocketStats = NewWriterStats(c.writeLog) - c.writeStats.SetWriter(c.writeSocketStats) - c.readLog.SetReader(ws) - c.readSocketStats = NewReaderStats(c.readLog) - c.readStats.SetReader(c.readSocketStats) + c.writeSocketStats = NewWriterStats(c.ws) + c.writeStats.Writer = c.writeSocketStats + c.readSocketStats = NewReaderStats(c.ws) + c.readStats.Reader = c.readSocketStats // We spin off an inbox stuffing goroutine go c.inboxWorker(c.readStats) - c.Send(connect) + IgnoreErr(c.Send(connect), "send connect") } // inboxManager pulls messages from the inbox and routes them to appropriate @@ -470,16 +449,17 @@ func (c *Client) inboxManager() { case msg := <-c.inbox: // Message! //log.Println("Got message", msg) - mtype, ok := msg["msg"] + msgType, ok := msg["msg"] if ok { - switch mtype.(string) { + log.WithField("msg", msgType).Debug("recv") + switch msgType.(string) { // Connection management case "connected": c.status(CONNECTED) for _, collection := range c.collections { collection.init() } - c.version = "1" // Currently the only version we support + c.version = "1" // "1" is the only version we support c.session = msg["session"].(string) // Start automatic heartbeats c.pingTimer = time.AfterFunc(c.HeartbeatInterval, func() { @@ -498,9 +478,9 @@ func (c *Client) inboxManager() { // We received a ping - need to respond with a pong id, ok := msg["id"] if ok { - c.Send(NewPong(id.(string))) + IgnoreErr(c.Send(NewPong(id.(string))), "send id ping") } else { - c.Send(NewPong("")) + IgnoreErr(c.Send(NewPong("")), "send empty ping") } c.pingsIn++ case "pong": @@ -523,7 +503,7 @@ func (c *Client) inboxManager() { // Live Data case "nosub": - log.Println("Subscription returned a nosub error", msg) + log.WithField("msg", msg).Debug("sub returned a nosub error") // Clear related subscriptions sub, ok := msg["id"] if ok { @@ -531,7 +511,7 @@ func (c *Client) inboxManager() { runningSub := c.subs[id] if runningSub != nil { - runningSub.Error = errors.New("Subscription returned a nosub error") + runningSub.Error = errors.New("sub returned a nosub error") runningSub.done() delete(c.subs, id) } @@ -579,7 +559,7 @@ func (c *Client) inboxManager() { default: // Ignore? - log.Println("Server sent unexpected message", msg) + log.WithField("msg", msg).Debug("Server sent unexpected message") } } else { // Current Meteor server sends an undocumented DDP message @@ -591,14 +571,14 @@ func (c *Client) inboxManager() { case string: c.serverID = ID default: - log.Println("Server cluster node", serverID) + log.WithField("id", serverID).Debug("Server cluster node") } } else { - log.Println("Server sent message with no `msg` field", msg) + log.WithField("msg", msg).Debug("Server sent message with no `msg` field") } } case err := <-c.errors: - log.Println("Websocket error", err) + log.WithError(err).Warn("Websocket error") } } } @@ -632,7 +612,7 @@ func (c *Client) inboxWorker(ws io.Reader) { c.pingTimer.Reset(c.HeartbeatInterval) } if event == nil { - log.Println("Inbox worker found nil event. May be due to broken websocket. Reconnecting.") + log.Debug("Inbox worker found nil event. May be due to broken websocket. Reconnecting.") break } else { c.inbox <- event.(map[string]interface{}) @@ -642,7 +622,7 @@ func (c *Client) inboxWorker(ws io.Reader) { c.reconnectLater() } -// reconnectLater schedules a reconnect for later. We need to make sure that we don't +// reconnectLater schedules a reconnect action for later. We need to make sure that we don't // block, and that we don't reconnect more frequently than once every c.ReconnectInterval func (c *Client) reconnectLater() { c.Close() diff --git a/vendor/github.com/gopackage/ddp/ddp_collection.go b/vendor/github.com/gopackage/ddp/collection.go index f417e68a..f417e68a 100644 --- a/vendor/github.com/gopackage/ddp/ddp_collection.go +++ b/vendor/github.com/gopackage/ddp/collection.go diff --git a/vendor/github.com/gopackage/ddp/ddp_ejson.go b/vendor/github.com/gopackage/ddp/ddp_ejson.go deleted file mode 100644 index a3e1fec0..00000000 --- a/vendor/github.com/gopackage/ddp/ddp_ejson.go +++ /dev/null @@ -1,217 +0,0 @@ -package ddp - -import ( - "crypto/sha256" - "encoding/hex" - "io" - "strings" - "time" -) - -// ---------------------------------------------------------------------- -// EJSON document interface -// ---------------------------------------------------------------------- -// https://github.com/meteor/meteor/blob/devel/packages/ddp/DDP.md#appendix-ejson - -// Doc provides hides the complexity of ejson documents. -type Doc struct { - root interface{} -} - -// NewDoc creates a new document from a generic json parsed document. -func NewDoc(in interface{}) *Doc { - doc := &Doc{in} - return doc -} - -// Map locates a map[string]interface{} - json object - at a path -// or returns nil if not found. -func (d *Doc) Map(path string) map[string]interface{} { - item := d.Item(path) - if item != nil { - switch m := item.(type) { - case map[string]interface{}: - return m - default: - return nil - } - } - return nil -} - -// Array locates an []interface{} - json array - at a path -// or returns nil if not found. -func (d *Doc) Array(path string) []interface{} { - item := d.Item(path) - if item != nil { - switch m := item.(type) { - case []interface{}: - return m - default: - return nil - } - } - return nil -} - -// StringArray locates an []string - json array of strings - at a path -// or returns nil if not found. The string array will contain all string values -// in the array and skip any non-string entries. -func (d *Doc) StringArray(path string) []string { - item := d.Item(path) - if item != nil { - switch m := item.(type) { - case []interface{}: - items := []string{} - for _, item := range m { - switch val := item.(type) { - case string: - items = append(items, val) - } - } - return items - case []string: - return m - default: - return nil - } - } - return nil -} - -// String returns a string value located at the path or an empty string if not found. -func (d *Doc) String(path string) string { - item := d.Item(path) - if item != nil { - switch m := item.(type) { - case string: - return m - default: - return "" - } - } - return "" -} - -// Bool returns a boolean value located at the path or false if not found. -func (d *Doc) Bool(path string) bool { - item := d.Item(path) - if item != nil { - switch m := item.(type) { - case bool: - return m - default: - return false - } - } - return false -} - -// Float returns a float64 value located at the path or zero if not found. -func (d *Doc) Float(path string) float64 { - item := d.Item(path) - if item != nil { - switch m := item.(type) { - case float64: - return m - default: - return 0 - } - } - return 0 -} - -// Time returns a time value located at the path or nil if not found. -func (d *Doc) Time(path string) time.Time { - ticks := d.Float(path + ".$date") - var t time.Time - if ticks > 0 { - sec := int64(ticks / 1000) - t = time.Unix(int64(sec), 0) - } - return t -} - -// Item locates a "raw" item at the provided path, returning -// the item found or nil if not found. -func (d *Doc) Item(path string) interface{} { - item := d.root - steps := strings.Split(path, ".") - for _, step := range steps { - // This is an intermediate step - we must be in a map - switch m := item.(type) { - case map[string]interface{}: - item = m[step] - case Update: - item = m[step] - default: - return nil - } - } - return item -} - -// Set a value for a path. Intermediate items are created as necessary. -func (d *Doc) Set(path string, value interface{}) { - item := d.root - steps := strings.Split(path, ".") - last := steps[len(steps)-1] - steps = steps[:len(steps)-1] - for _, step := range steps { - // This is an intermediate step - we must be in a map - switch m := item.(type) { - case map[string]interface{}: - item = m[step] - if item == nil { - item = map[string]interface{}{} - m[step] = item - } - default: - return - } - } - // Item is now the last map so we just set the value - switch m := item.(type) { - case map[string]interface{}: - m[last] = value - } -} - -// Accounts password login support -type Login struct { - User *User `json:"user"` - Password *Password `json:"password"` -} - -func NewEmailLogin(email, pass string) *Login { - return &Login{User: &User{Email: email}, Password: NewPassword(pass)} -} - -func NewUsernameLogin(user, pass string) *Login { - return &Login{User: &User{Username: user}, Password: NewPassword(pass)} -} - -type LoginResume struct { - Token string `json:"resume"` -} - -func NewLoginResume(token string) *LoginResume { - return &LoginResume{Token: token} -} - -type User struct { - Email string `json:"email,omitempty"` - Username string `json:"username,omitempty"` -} - -type Password struct { - Digest string `json:"digest"` - Algorithm string `json:"algorithm"` -} - -func NewPassword(pass string) *Password { - sha := sha256.New() - io.WriteString(sha, pass) - digest := sha.Sum(nil) - return &Password{Digest: hex.EncodeToString(digest), Algorithm: "sha-256"} -} diff --git a/vendor/github.com/gopackage/ddp/ddp_stats.go b/vendor/github.com/gopackage/ddp/ddp_stats.go deleted file mode 100644 index 1546b547..00000000 --- a/vendor/github.com/gopackage/ddp/ddp_stats.go +++ /dev/null @@ -1,321 +0,0 @@ -package ddp - -import ( - "encoding/hex" - "fmt" - "io" - "log" - "os" - "sync" - "time" -) - -// Gather statistics about a DDP connection. - -// --------------------------------------------------------- -// io utilities -// -// This is generic - should be moved into a stand alone lib -// --------------------------------------------------------- - -// ReaderProxy provides common tooling for structs that manage an io.Reader. -type ReaderProxy struct { - reader io.Reader -} - -// NewReaderProxy creates a new proxy for the provided reader. -func NewReaderProxy(reader io.Reader) *ReaderProxy { - return &ReaderProxy{reader} -} - -// SetReader sets the reader on the proxy. -func (r *ReaderProxy) SetReader(reader io.Reader) { - r.reader = reader -} - -// WriterProxy provides common tooling for structs that manage an io.Writer. -type WriterProxy struct { - writer io.Writer -} - -// NewWriterProxy creates a new proxy for the provided writer. -func NewWriterProxy(writer io.Writer) *WriterProxy { - return &WriterProxy{writer} -} - -// SetWriter sets the writer on the proxy. -func (w *WriterProxy) SetWriter(writer io.Writer) { - w.writer = writer -} - -// Logging data types -const ( - DataByte = iota // data is raw []byte - DataText // data is string data -) - -// Logger logs data from i/o sources. -type Logger struct { - // Active is true if the logger should be logging reads - Active bool - // Truncate is >0 to indicate the number of characters to truncate output - Truncate int - - logger *log.Logger - dtype int -} - -// NewLogger creates a new i/o logger. -func NewLogger(logger *log.Logger, active bool, dataType int, truncate int) Logger { - return Logger{logger: logger, Active: active, dtype: dataType, Truncate: truncate} -} - -// Log logs the current i/o operation and returns the read and error for -// easy call chaining. -func (l *Logger) Log(p []byte, n int, err error) (int, error) { - if l.Active && err == nil { - limit := n - truncated := false - if l.Truncate > 0 && l.Truncate < limit { - limit = l.Truncate - truncated = true - } - switch l.dtype { - case DataText: - if truncated { - l.logger.Printf("[%d] %s...", n, string(p[:limit])) - } else { - l.logger.Printf("[%d] %s", n, string(p[:limit])) - } - case DataByte: - fallthrough - default: - l.logger.Println(hex.Dump(p[:limit])) - } - } - return n, err -} - -// ReaderLogger logs data from any io.Reader. -// ReaderLogger wraps a Reader and passes data to the actual data consumer. -type ReaderLogger struct { - Logger - ReaderProxy -} - -// NewReaderDataLogger creates an active binary data logger with a default -// log.Logger and a '->' prefix. -func NewReaderDataLogger(reader io.Reader) *ReaderLogger { - logger := log.New(os.Stdout, "<- ", log.LstdFlags) - return NewReaderLogger(reader, logger, true, DataByte, 0) -} - -// NewReaderTextLogger creates an active binary data logger with a default -// log.Logger and a '->' prefix. -func NewReaderTextLogger(reader io.Reader) *ReaderLogger { - logger := log.New(os.Stdout, "<- ", log.LstdFlags) - return NewReaderLogger(reader, logger, true, DataText, 80) -} - -// NewReaderLogger creates a Reader logger for the provided parameters. -func NewReaderLogger(reader io.Reader, logger *log.Logger, active bool, dataType int, truncate int) *ReaderLogger { - return &ReaderLogger{ReaderProxy: *NewReaderProxy(reader), Logger: NewLogger(logger, active, dataType, truncate)} -} - -// Read logs the read bytes and passes them to the wrapped reader. -func (r *ReaderLogger) Read(p []byte) (int, error) { - n, err := r.reader.Read(p) - return r.Log(p, n, err) -} - -// WriterLogger logs data from any io.Writer. -// WriterLogger wraps a Writer and passes data to the actual data producer. -type WriterLogger struct { - Logger - WriterProxy -} - -// NewWriterDataLogger creates an active binary data logger with a default -// log.Logger and a '->' prefix. -func NewWriterDataLogger(writer io.Writer) *WriterLogger { - logger := log.New(os.Stdout, "-> ", log.LstdFlags) - return NewWriterLogger(writer, logger, true, DataByte, 0) -} - -// NewWriterTextLogger creates an active binary data logger with a default -// log.Logger and a '->' prefix. -func NewWriterTextLogger(writer io.Writer) *WriterLogger { - logger := log.New(os.Stdout, "-> ", log.LstdFlags) - return NewWriterLogger(writer, logger, true, DataText, 80) -} - -// NewWriterLogger creates a Reader logger for the provided parameters. -func NewWriterLogger(writer io.Writer, logger *log.Logger, active bool, dataType int, truncate int) *WriterLogger { - return &WriterLogger{WriterProxy: *NewWriterProxy(writer), Logger: NewLogger(logger, active, dataType, truncate)} -} - -// Write logs the written bytes and passes them to the wrapped writer. -func (w *WriterLogger) Write(p []byte) (int, error) { - if w.writer != nil { - n, err := w.writer.Write(p) - return w.Log(p, n, err) - } - return 0, nil -} - -// Stats tracks statistics for i/o operations. Stats are produced from a -// of a running stats agent. -type Stats struct { - // Bytes is the total number of bytes transferred. - Bytes int64 - // Ops is the total number of i/o operations performed. - Ops int64 - // Errors is the total number of i/o errors encountered. - Errors int64 - // Runtime is the duration that stats have been gathered. - Runtime time.Duration -} - -// ClientStats displays combined statistics for the Client. -type ClientStats struct { - // Reads provides statistics on the raw i/o network reads for the current connection. - Reads *Stats - // Reads provides statistics on the raw i/o network reads for the all client connections. - TotalReads *Stats - // Writes provides statistics on the raw i/o network writes for the current connection. - Writes *Stats - // Writes provides statistics on the raw i/o network writes for all the client connections. - TotalWrites *Stats - // Reconnects is the number of reconnections the client has made. - Reconnects int64 - // PingsSent is the number of pings sent by the client - PingsSent int64 - // PingsRecv is the number of pings received by the client - PingsRecv int64 -} - -// String produces a compact string representation of the client stats. -func (stats *ClientStats) String() string { - i := stats.Reads - ti := stats.TotalReads - o := stats.Writes - to := stats.TotalWrites - totalRun := (ti.Runtime * 1000000) / 1000000 - run := (i.Runtime * 1000000) / 1000000 - return fmt.Sprintf("bytes: %d/%d##%d/%d ops: %d/%d##%d/%d err: %d/%d##%d/%d reconnects: %d pings: %d/%d uptime: %v##%v", - i.Bytes, o.Bytes, - ti.Bytes, to.Bytes, - i.Ops, o.Ops, - ti.Ops, to.Ops, - i.Errors, o.Errors, - ti.Errors, to.Errors, - stats.Reconnects, - stats.PingsRecv, stats.PingsSent, - run, totalRun) -} - -// CollectionStats combines statistics about a collection. -type CollectionStats struct { - Name string // Name of the collection - Count int // Count is the total number of documents in the collection -} - -// String produces a compact string representation of the collection stat. -func (s *CollectionStats) String() string { - return fmt.Sprintf("%s[%d]", s.Name, s.Count) -} - -// StatsTracker provides the basic tooling for tracking i/o stats. -type StatsTracker struct { - bytes int64 - ops int64 - errors int64 - start time.Time - lock *sync.Mutex -} - -// NewStatsTracker create a new stats tracker with start time set to now. -func NewStatsTracker() *StatsTracker { - return &StatsTracker{start: time.Now(), lock: new(sync.Mutex)} -} - -// Op records an i/o operation. The parameters are passed through to -// allow easy chaining. -func (t *StatsTracker) Op(n int, err error) (int, error) { - t.lock.Lock() - defer t.lock.Unlock() - t.ops++ - if err == nil { - t.bytes += int64(n) - } else { - if err == io.EOF { - // I don't think we should log EOF stats as an error - } else { - t.errors++ - } - } - - return n, err -} - -// Snapshot takes a snapshot of the current reader statistics. -func (t *StatsTracker) Snapshot() *Stats { - t.lock.Lock() - defer t.lock.Unlock() - return t.snap() -} - -// Reset sets all of the stats to initial values. -func (t *StatsTracker) Reset() *Stats { - t.lock.Lock() - defer t.lock.Unlock() - - stats := t.snap() - t.bytes = 0 - t.ops = 0 - t.errors = 0 - t.start = time.Now() - - return stats -} - -func (t *StatsTracker) snap() *Stats { - return &Stats{Bytes: t.bytes, Ops: t.ops, Errors: t.errors, Runtime: time.Since(t.start)} -} - -// ReaderStats tracks statistics on any io.Reader. -// ReaderStats wraps a Reader and passes data to the actual data consumer. -type ReaderStats struct { - StatsTracker - ReaderProxy -} - -// NewReaderStats creates a ReaderStats object for the provided reader. -func NewReaderStats(reader io.Reader) *ReaderStats { - return &ReaderStats{ReaderProxy: *NewReaderProxy(reader), StatsTracker: *NewStatsTracker()} -} - -// Read passes through a read collecting statistics and logging activity. -func (r *ReaderStats) Read(p []byte) (int, error) { - return r.Op(r.reader.Read(p)) -} - -// WriterStats tracks statistics on any io.Writer. -// WriterStats wraps a Writer and passes data to the actual data producer. -type WriterStats struct { - StatsTracker - WriterProxy -} - -// NewWriterStats creates a WriterStats object for the provided writer. -func NewWriterStats(writer io.Writer) *WriterStats { - return &WriterStats{WriterProxy: *NewWriterProxy(writer), StatsTracker: *NewStatsTracker()} -} - -// Write passes through a write collecting statistics. -func (w *WriterStats) Write(p []byte) (int, error) { - if w.writer != nil { - return w.Op(w.writer.Write(p)) - } - return 0, nil -} diff --git a/vendor/github.com/gopackage/ddp/doc.go b/vendor/github.com/gopackage/ddp/doc.go new file mode 100644 index 00000000..97f1b63e --- /dev/null +++ b/vendor/github.com/gopackage/ddp/doc.go @@ -0,0 +1,6 @@ +// Package ddp implements the MeteorJS DDP protocol over websockets. Fallback +// to long polling is NOT supported (and is not planned on ever being supported +// by this library). We will try to model the library after `net/http` - right +// now the library is bare bones and doesn't provide the plug-ability of http. +// However, that's the goal for the package eventually. +package ddp diff --git a/vendor/github.com/gopackage/ddp/ddp_messages.go b/vendor/github.com/gopackage/ddp/messages.go index 68c9eab4..fc127cee 100644 --- a/vendor/github.com/gopackage/ddp/ddp_messages.go +++ b/vendor/github.com/gopackage/ddp/messages.go @@ -1,9 +1,15 @@ package ddp +import ( + "crypto/sha256" + "encoding/hex" + "io" +) + // ------------------------------------------------------------ // DDP Messages // -// Go structs representing DDP raw messages ready for JSON +// Go structs representing common DDP raw messages ready for JSON // encoding. // ------------------------------------------------------------ @@ -80,3 +86,43 @@ func NewSub(id, subName string, args []interface{}) *Sub { Args: args, } } + + +// Login provides a Meteor.Accounts password login support +type Login struct { + User *User `json:"user"` + Password *Password `json:"password"` +} + +func NewEmailLogin(email, pass string) *Login { + return &Login{User: &User{Email: email}, Password: NewPassword(pass)} +} + +func NewUsernameLogin(user, pass string) *Login { + return &Login{User: &User{Username: user}, Password: NewPassword(pass)} +} + +type LoginResume struct { + Token string `json:"resume"` +} + +func NewLoginResume(token string) *LoginResume { + return &LoginResume{Token: token} +} + +type User struct { + Email string `json:"email,omitempty"` + Username string `json:"username,omitempty"` +} + +type Password struct { + Digest string `json:"digest"` + Algorithm string `json:"algorithm"` +} + +func NewPassword(pass string) *Password { + sha := sha256.New() + io.WriteString(sha, pass) + digest := sha.Sum(nil) + return &Password{Digest: hex.EncodeToString(digest), Algorithm: "sha-256"} +} diff --git a/vendor/github.com/gopackage/ddp/stats.go b/vendor/github.com/gopackage/ddp/stats.go new file mode 100644 index 00000000..b2548098 --- /dev/null +++ b/vendor/github.com/gopackage/ddp/stats.go @@ -0,0 +1,170 @@ +package ddp + +import ( + "fmt" + "io" + "sync" + "time" +) + +// Gather statistics about a DDP connection. + +// Stats tracks statistics for i/o operations. +type Stats struct { + // Bytes is the total number of bytes transferred. + Bytes int64 + // Ops is the total number of i/o operations performed. + Ops int64 + // Errors is the total number of i/o errors encountered. + Errors int64 + // Runtime is the duration that stats have been gathered. + Runtime time.Duration +} + +// ClientStats displays combined statistics for the Client. +type ClientStats struct { + // Reads provides statistics on the raw i/o network reads for the current connection. + Reads *Stats + // Reads provides statistics on the raw i/o network reads for the all client connections. + TotalReads *Stats + // Writes provides statistics on the raw i/o network writes for the current connection. + Writes *Stats + // Writes provides statistics on the raw i/o network writes for all the client connections. + TotalWrites *Stats + // Reconnects is the number of reconnections the client has made. + Reconnects int64 + // PingsSent is the number of pings sent by the client + PingsSent int64 + // PingsRecv is the number of pings received by the client + PingsRecv int64 +} + +// String produces a compact string representation of the client stats. +func (stats *ClientStats) String() string { + i := stats.Reads + ti := stats.TotalReads + o := stats.Writes + to := stats.TotalWrites + totalRun := (ti.Runtime * 1000000) / 1000000 + run := (i.Runtime * 1000000) / 1000000 + return fmt.Sprintf("bytes: %d/%d##%d/%d ops: %d/%d##%d/%d err: %d/%d##%d/%d reconnects: %d pings: %d/%d uptime: %v##%v", + i.Bytes, o.Bytes, + ti.Bytes, to.Bytes, + i.Ops, o.Ops, + ti.Ops, to.Ops, + i.Errors, o.Errors, + ti.Errors, to.Errors, + stats.Reconnects, + stats.PingsRecv, stats.PingsSent, + run, totalRun) +} + +// CollectionStats combines statistics about a collection. +type CollectionStats struct { + Name string // Name of the collection + Count int // Count is the total number of documents in the collection +} + +// String produces a compact string representation of the collection stat. +func (s *CollectionStats) String() string { + return fmt.Sprintf("%s[%d]", s.Name, s.Count) +} + +// StatsTracker provides the basic tooling for tracking i/o stats. +type StatsTracker struct { + bytes int64 + ops int64 + errors int64 + start time.Time + lock sync.Mutex +} + +// NewStatsTracker create a new tracker with start time set to now. +func NewStatsTracker() *StatsTracker { + return &StatsTracker{start: time.Now()} +} + +// Op records an i/o operation. The parameters are passed through to +// allow easy chaining. +func (t *StatsTracker) Op(n int, err error) (int, error) { + t.lock.Lock() + defer t.lock.Unlock() + t.ops++ + if err == nil { + t.bytes += int64(n) + } else { + if err == io.EOF { + // I don't think we should log EOF stats as an error + } else { + t.errors++ + } + } + + return n, err +} + +// Snapshot takes a snapshot of the current Reader statistics. +func (t *StatsTracker) Snapshot() *Stats { + t.lock.Lock() + defer t.lock.Unlock() + return t.snap() +} + +// Reset all stats to initial values. +func (t *StatsTracker) Reset() *Stats { + t.lock.Lock() + defer t.lock.Unlock() + + stats := t.snap() + t.bytes = 0 + t.ops = 0 + t.errors = 0 + t.start = time.Now() + + return stats +} + +func (t *StatsTracker) snap() *Stats { + return &Stats{Bytes: t.bytes, Ops: t.ops, Errors: t.errors, Runtime: time.Since(t.start)} +} + +// ReaderStats tracks statistics on any io.Reader. +// ReaderStats wraps a Reader and passes data to the actual data consumer. +type ReaderStats struct { + StatsTracker + Reader io.Reader +} + +// NewReaderStats creates a ReaderStats object for the provided Reader. +func NewReaderStats(reader io.Reader) *ReaderStats { + r := &ReaderStats{Reader: reader} + r.Reset() + return r +} + +// Read passes through a read collecting statistics and logging activity. +func (r *ReaderStats) Read(p []byte) (int, error) { + return r.Op(r.Reader.Read(p)) +} + +// WriterStats tracks statistics on any io.Writer. +// WriterStats wraps a Writer and passes data to the actual data producer. +type WriterStats struct { + StatsTracker + Writer io.Writer +} + +// NewWriterStats creates a WriterStats object for the provided Writer. +func NewWriterStats(writer io.Writer) *WriterStats { + w := &WriterStats{Writer: writer} + w.Reset() + return w +} + +// Write collects Writer statistics. +func (w *WriterStats) Write(p []byte) (int, error) { + if w.Writer != nil { + return w.Op(w.Writer.Write(p)) + } + return 0, nil +} diff --git a/vendor/github.com/gopackage/ddp/time.go b/vendor/github.com/gopackage/ddp/time.go new file mode 100644 index 00000000..584f9ce0 --- /dev/null +++ b/vendor/github.com/gopackage/ddp/time.go @@ -0,0 +1,49 @@ +package ddp + +import ( + "encoding/json" + "io" + "strconv" + "time" +) + +// utcOffset in milliseconds for the current local time (east of UTC). +var utcOffset int64 + +func init() { + _, offsetSeconds := time.Now().Zone() + utcOffset = int64(offsetSeconds * 1000) +} + +// Time is an alias for time.Time with custom json marshalling implementations to support ejson. +type Time struct { + time.Time +} + +// UnixMilli creates a new Time from the given unix millis but in UTC (as opposed to time.UnixMilli which returns +// time in the local time zone). This supports the proper loading of times from EJSON $date objects. +func UnixMilli(i int64) Time { + return Time{Time: time.UnixMilli(i - utcOffset)} +} + +func (t *Time) UnmarshalJSON(b []byte) error { + var data map[string]float64 + err := json.Unmarshal(b, &data) + if err != nil { + return err + } + val, ok := data["$date"] + if !ok { + return io.ErrUnexpectedEOF + } + // The time MUST be UTC but time.UnixMilli uses local time. + // We see what time it is in local time and calculate the offset to UTC + *t = UnixMilli(int64(val)) + + return nil +} + +func (t Time) MarshalJSON() ([]byte, error) { + return []byte("{\"$date\":" + strconv.FormatInt(t.UnixMilli(), 10) + "}"), nil +} + diff --git a/vendor/github.com/gopackage/ddp/ddp.go b/vendor/github.com/gopackage/ddp/utils.go index 910adafd..7ff16a20 100644 --- a/vendor/github.com/gopackage/ddp/ddp.go +++ b/vendor/github.com/gopackage/ddp/utils.go @@ -1,39 +1,32 @@ -// Package ddp implements the MeteorJS DDP protocol over websockets. Fallback -// to longpolling is NOT supported (and is not planned on ever being supported -// by this library). We will try to model the library after `net/http` - right -// now the library is barebones and doesn't provide the pluggability of http. -// However, that's the goal for the package eventually. package ddp import ( "fmt" - "log" "sync" "time" -) -// debugLog is true if we should log debugging information about the connection -var debugLog = true + "github.com/apex/log" +) -// The main file contains common utility types. +// Contains common utility types. // ------------------------------------------------------------------- -// idManager provides simple incrementing IDs for ddp messages. -type idManager struct { +// KeyManager provides simple incrementing IDs for ddp messages. +type KeyManager struct { // nextID is the next ID for API calls nextID uint64 // idMutex is a mutex to protect ID updates idMutex *sync.Mutex } -// newidManager creates a new instance and sets up resources. -func newidManager() *idManager { - return &idManager{idMutex: new(sync.Mutex)} +// NewKeyManager creates a new instance and sets up resources. +func NewKeyManager() *KeyManager { + return &KeyManager{idMutex: new(sync.Mutex)} } -// newID issues a new ID for use in calls. -func (id *idManager) newID() string { +// Next issues a new ID for use in calls. +func (id *KeyManager) Next() string { id.idMutex.Lock() next := id.nextID id.nextID++ @@ -43,8 +36,8 @@ func (id *idManager) newID() string { // ------------------------------------------------------------------- -// pingTracker tracks in-flight pings. -type pingTracker struct { +// PingTracker tracks in-flight pings. +type PingTracker struct { handler func(error) timeout time.Duration timer *time.Timer @@ -72,8 +65,13 @@ func (call *Call) done() { default: // We don't want to block here. It is the caller's responsibility to make // sure the channel has enough buffer space. See comment in Go(). - if debugLog { - log.Println("rpc: discarding Call reply due to insufficient Done chan capacity") - } + log.Debug("rpc: discarding Call reply due to insufficient Done chan capacity") } } + +// IgnoreErr logs an error if it occurs and ignores it. +func IgnoreErr(err error, msg string) { + if err != nil { + log.WithError(err).Debug(msg) + } +}
\ No newline at end of file |