summaryrefslogtreecommitdiffstats
path: root/vendor/github.com/gopackage/ddp
diff options
context:
space:
mode:
Diffstat (limited to 'vendor/github.com/gopackage/ddp')
-rw-r--r--vendor/github.com/gopackage/ddp/.gitignore4
-rw-r--r--vendor/github.com/gopackage/ddp/LICENSE2
-rw-r--r--vendor/github.com/gopackage/ddp/README.md6
-rw-r--r--vendor/github.com/gopackage/ddp/client.go (renamed from vendor/github.com/gopackage/ddp/ddp_client.go)118
-rw-r--r--vendor/github.com/gopackage/ddp/collection.go (renamed from vendor/github.com/gopackage/ddp/ddp_collection.go)0
-rw-r--r--vendor/github.com/gopackage/ddp/ddp_ejson.go217
-rw-r--r--vendor/github.com/gopackage/ddp/ddp_stats.go321
-rw-r--r--vendor/github.com/gopackage/ddp/doc.go6
-rw-r--r--vendor/github.com/gopackage/ddp/messages.go (renamed from vendor/github.com/gopackage/ddp/ddp_messages.go)48
-rw-r--r--vendor/github.com/gopackage/ddp/stats.go170
-rw-r--r--vendor/github.com/gopackage/ddp/time.go49
-rw-r--r--vendor/github.com/gopackage/ddp/utils.go (renamed from vendor/github.com/gopackage/ddp/ddp.go)42
12 files changed, 350 insertions, 633 deletions
diff --git a/vendor/github.com/gopackage/ddp/.gitignore b/vendor/github.com/gopackage/ddp/.gitignore
index daf913b1..e71cbee4 100644
--- a/vendor/github.com/gopackage/ddp/.gitignore
+++ b/vendor/github.com/gopackage/ddp/.gitignore
@@ -22,3 +22,7 @@ _testmain.go
*.exe
*.test
*.prof
+
+# Editors
+.idea/
+.vscode/
diff --git a/vendor/github.com/gopackage/ddp/LICENSE b/vendor/github.com/gopackage/ddp/LICENSE
index 03d77e8a..98a0fe76 100644
--- a/vendor/github.com/gopackage/ddp/LICENSE
+++ b/vendor/github.com/gopackage/ddp/LICENSE
@@ -1,4 +1,4 @@
-Copyright (c) 2015, Metamech LLC.
+Copyright (c) 2021, Metamech LLC.
Permission to use, copy, modify, and/or distribute this software for any
purpose with or without fee is hereby granted, provided that the above
diff --git a/vendor/github.com/gopackage/ddp/README.md b/vendor/github.com/gopackage/ddp/README.md
index fd62c718..7ae713b9 100644
--- a/vendor/github.com/gopackage/ddp/README.md
+++ b/vendor/github.com/gopackage/ddp/README.md
@@ -1,3 +1,5 @@
-# ddp
+# DDP in Go
-MeteorJS DDP library for Golang
+[Meteor](https://meteor.com) DDP library for Go. This library allows Go applications to connect to Meteor applications, subscribe to Meteor publications, read from a cached Collection (similar to minimongo), and call Meteor methods on the server.
+
+See `ddp/_examples` for some tips and an example app that walks through all the features of the library. \ No newline at end of file
diff --git a/vendor/github.com/gopackage/ddp/ddp_client.go b/vendor/github.com/gopackage/ddp/client.go
index 8d6323b7..205dc433 100644
--- a/vendor/github.com/gopackage/ddp/ddp_client.go
+++ b/vendor/github.com/gopackage/ddp/client.go
@@ -2,14 +2,14 @@ package ddp
import (
"encoding/json"
+ "errors"
"fmt"
"io"
- "log"
"sync"
"time"
+ "github.com/apex/log"
"golang.org/x/net/websocket"
- "errors"
)
const (
@@ -45,18 +45,14 @@ type Client struct {
// ReconnectInterval is the time between reconnections on bad connections
ReconnectInterval time.Duration
- // writeStats controls statistics gathering for current websocket writes.
+ // writeSocketStats controls statistics gathering for current websocket writes.
writeSocketStats *WriterStats
// writeStats controls statistics gathering for overall client writes.
writeStats *WriterStats
- // writeLog controls logging for client writes.
- writeLog *WriterLogger
- // readStats controls statistics gathering for current websocket reads.
+ // readSocketStats controls statistics gathering for current websocket reads.
readSocketStats *ReaderStats
// readStats controls statistics gathering for overall client reads.
readStats *ReaderStats
- // readLog control logging for clietn reads.
- readLog *ReaderLogger
// reconnects in the number of reconnections the client has made
reconnects int64
// pingsIn is the number of pings received from the server
@@ -74,7 +70,7 @@ type Client struct {
ws *websocket.Conn
// encoder is a JSON encoder to send outgoing packets to the websocket.
encoder *json.Encoder
- // url the URL the websocket is connected to
+ // url the websocket is connected to
url string
// origin is the origin for the websocket connection
origin string
@@ -85,7 +81,7 @@ type Client struct {
// pingTimer is a timer for sending regular pings to the server
pingTimer *time.Timer
// pings tracks inflight pings based on each ping ID.
- pings map[string][]*pingTracker
+ pings map[string][]*PingTracker
// calls tracks method invocations that are still in flight
calls map[string]*Call
// subs tracks active subscriptions. Map contains name->args
@@ -104,8 +100,8 @@ type Client struct {
// connectionListeners will be informed when a connection to the server is established
connectionListeners []ConnectionListener
- // idManager tracks IDs for ddp messages
- idManager
+ // KeyManager tracks IDs for ddp messages
+ KeyManager
}
// NewClient creates a default client (using an internal websocket) to the
@@ -115,7 +111,7 @@ type Client struct {
// automatically and internally handle heartbeats and reconnects.
//
// TBD create an option to use an external websocket (aka htt.Transport)
-// TBD create an option to substitute heartbeat and reconnect behavior (aka http.Tranport)
+// TBD create an option to substitute heartbeat and reconnect behavior (aka http.Transport)
// TBD create an option to hijack the connection (aka http.Hijacker)
// TBD create profiling features (aka net/http/pprof)
func NewClient(url, origin string) *Client {
@@ -128,7 +124,7 @@ func NewClient(url, origin string) *Client {
origin: origin,
inbox: make(chan map[string]interface{}, 100),
errors: make(chan error, 100),
- pings: map[string][]*pingTracker{},
+ pings: map[string][]*PingTracker{},
calls: map[string]*Call{},
subs: map[string]*Call{},
connectionStatus: DISCONNECTED,
@@ -140,14 +136,9 @@ func NewClient(url, origin string) *Client {
readSocketStats: NewReaderStats(nil),
readStats: NewReaderStats(nil),
- // Loggers
- writeLog: NewWriterTextLogger(nil),
- readLog: NewReaderTextLogger(nil),
-
- idManager: *newidManager(),
+ KeyManager: *NewKeyManager(),
}
c.encoder = json.NewEncoder(c.writeStats)
- c.SetSocketLogActive(false)
// We spin off an inbox processing goroutine
go c.inboxManager()
@@ -192,10 +183,11 @@ func (c *Client) Connect() error {
ws, err := websocket.Dial(c.url, "", c.origin)
if err != nil {
c.Close()
- log.Println("Dial error", err)
+ log.WithError(err).Debug("dial error")
c.reconnectLater()
return err
}
+ log.Debug("dialed")
// Start DDP connection
c.start(ws, NewConnect())
return nil
@@ -225,7 +217,7 @@ func (c *Client) Reconnect() {
ws, err := websocket.Dial(c.url, "", c.origin)
if err != nil {
c.Close()
- log.Println("Dial error", err)
+ log.WithError(err).Debug("Dial error")
c.reconnectLater()
return
}
@@ -240,23 +232,23 @@ func (c *Client) Reconnect() {
// Send calls that haven't been confirmed - may not have been sent
// and effects should be idempotent
for _, call := range c.calls {
- c.Send(NewMethod(call.ID, call.ServiceMethod, call.Args.([]interface{})))
+ IgnoreErr(c.Send(NewMethod(call.ID, call.ServiceMethod, call.Args.([]interface{}))), "resend method")
}
// Resend subscriptions and patch up collections
for _, sub := range c.subs {
- c.Send(NewSub(sub.ID, sub.ServiceMethod, sub.Args.([]interface{})))
+ IgnoreErr(c.Send(NewSub(sub.ID, sub.ServiceMethod, sub.Args.([]interface{}))), "resend sub")
}
}
-// Subscribe subscribes to data updates.
+// Subscribe to data updates.
func (c *Client) Subscribe(subName string, done chan *Call, args ...interface{}) *Call {
if args == nil {
args = []interface{}{}
}
call := new(Call)
- call.ID = c.newID()
+ call.ID = c.Next()
call.ServiceMethod = subName
call.Args = args
call.Owner = c
@@ -269,7 +261,7 @@ func (c *Client) Subscribe(subName string, done chan *Call, args ...interface{})
// RPCs that will be using that channel. If the channel
// is totally unbuffered, it's best not to run at all.
if cap(done) == 0 {
- log.Panic("ddp.rpc: done channel is unbuffered")
+ log.Fatal("ddp.rpc: done channel is unbuffered")
}
}
call.Done = done
@@ -279,7 +271,7 @@ func (c *Client) Subscribe(subName string, done chan *Call, args ...interface{})
subArgs := make([]interface{}, len(args))
copy(subArgs, args)
- c.Send(NewSub(call.ID, subName, args))
+ IgnoreErr(c.Send(NewSub(call.ID, subName, args)), "send sub")
return call
}
@@ -302,7 +294,7 @@ func (c *Client) Go(serviceMethod string, done chan *Call, args ...interface{})
args = []interface{}{}
}
call := new(Call)
- call.ID = c.newID()
+ call.ID = c.Next()
call.ServiceMethod = serviceMethod
call.Args = args
call.Owner = c
@@ -314,13 +306,13 @@ func (c *Client) Go(serviceMethod string, done chan *Call, args ...interface{})
// RPCs that will be using that channel. If the channel
// is totally unbuffered, it's best not to run at all.
if cap(done) == 0 {
- log.Panic("ddp.rpc: done channel is unbuffered")
+ log.Fatal("ddp.rpc: done channel is unbuffered")
}
}
call.Done = done
c.calls[call.ID] = call
- c.Send(NewMethod(call.ID, serviceMethod, args))
+ IgnoreErr(c.Send(NewMethod(call.ID, serviceMethod, args)), "send method")
return call
}
@@ -332,10 +324,10 @@ func (c *Client) Call(serviceMethod string, args ...interface{}) (interface{}, e
}
// Ping sends a heartbeat signal to the server. The Ping doesn't look for
-// a response but may trigger the connection to reconnect if the ping timesout.
+// a response but may trigger the connection to reconnect if the ping times out.
// This is primarily useful for reviving an unresponsive Client connection.
func (c *Client) Ping() {
- c.PingPong(c.newID(), c.HeartbeatTimeout, func(err error) {
+ c.PingPong(c.Next(), c.HeartbeatTimeout, func(err error) {
if err != nil {
// Is there anything else we should or can do?
c.reconnectLater()
@@ -356,9 +348,9 @@ func (c *Client) PingPong(id string, timeout time.Duration, handler func(error))
c.pingsOut++
pings, ok := c.pings[id]
if !ok {
- pings = make([]*pingTracker, 0, 5)
+ pings = make([]*PingTracker, 0, 5)
}
- tracker := &pingTracker{handler: handler, timeout: timeout, timer: time.AfterFunc(timeout, func() {
+ tracker := &PingTracker{handler: handler, timeout: timeout, timer: time.AfterFunc(timeout, func() {
handler(fmt.Errorf("ping timeout"))
})}
c.pings[id] = append(pings, tracker)
@@ -380,7 +372,7 @@ func (c *Client) Close() {
// Close websocket
if c.ws != nil {
- c.ws.Close()
+ IgnoreErr(c.ws.Close(), "close ws")
c.ws = nil
}
for _, collection := range c.collections {
@@ -413,18 +405,7 @@ func (c *Client) Stats() *ClientStats {
}
}
-// SocketLogActive returns the current logging status for the socket.
-func (c *Client) SocketLogActive() bool {
- return c.writeLog.Active
-}
-
-// SetSocketLogActive to true to enable logging of raw socket data.
-func (c *Client) SetSocketLogActive(active bool) {
- c.writeLog.Active = active
- c.readLog.Active = active
-}
-
-// CollectionByName retrieves a collection by it's name.
+// CollectionByName retrieves a collection by its name.
func (c *Client) CollectionByName(name string) Collection {
collection, ok := c.collections[name]
if !ok {
@@ -443,23 +424,21 @@ func (c *Client) CollectionStats() []CollectionStats {
return stats
}
-// start starts a new client connection on the provided websocket
+// start a new client connection on the provided websocket
func (c *Client) start(ws *websocket.Conn, connect *Connect) {
c.status(CONNECTING)
c.ws = ws
- c.writeLog.SetWriter(ws)
- c.writeSocketStats = NewWriterStats(c.writeLog)
- c.writeStats.SetWriter(c.writeSocketStats)
- c.readLog.SetReader(ws)
- c.readSocketStats = NewReaderStats(c.readLog)
- c.readStats.SetReader(c.readSocketStats)
+ c.writeSocketStats = NewWriterStats(c.ws)
+ c.writeStats.Writer = c.writeSocketStats
+ c.readSocketStats = NewReaderStats(c.ws)
+ c.readStats.Reader = c.readSocketStats
// We spin off an inbox stuffing goroutine
go c.inboxWorker(c.readStats)
- c.Send(connect)
+ IgnoreErr(c.Send(connect), "send connect")
}
// inboxManager pulls messages from the inbox and routes them to appropriate
@@ -470,16 +449,17 @@ func (c *Client) inboxManager() {
case msg := <-c.inbox:
// Message!
//log.Println("Got message", msg)
- mtype, ok := msg["msg"]
+ msgType, ok := msg["msg"]
if ok {
- switch mtype.(string) {
+ log.WithField("msg", msgType).Debug("recv")
+ switch msgType.(string) {
// Connection management
case "connected":
c.status(CONNECTED)
for _, collection := range c.collections {
collection.init()
}
- c.version = "1" // Currently the only version we support
+ c.version = "1" // "1" is the only version we support
c.session = msg["session"].(string)
// Start automatic heartbeats
c.pingTimer = time.AfterFunc(c.HeartbeatInterval, func() {
@@ -498,9 +478,9 @@ func (c *Client) inboxManager() {
// We received a ping - need to respond with a pong
id, ok := msg["id"]
if ok {
- c.Send(NewPong(id.(string)))
+ IgnoreErr(c.Send(NewPong(id.(string))), "send id ping")
} else {
- c.Send(NewPong(""))
+ IgnoreErr(c.Send(NewPong("")), "send empty ping")
}
c.pingsIn++
case "pong":
@@ -523,7 +503,7 @@ func (c *Client) inboxManager() {
// Live Data
case "nosub":
- log.Println("Subscription returned a nosub error", msg)
+ log.WithField("msg", msg).Debug("sub returned a nosub error")
// Clear related subscriptions
sub, ok := msg["id"]
if ok {
@@ -531,7 +511,7 @@ func (c *Client) inboxManager() {
runningSub := c.subs[id]
if runningSub != nil {
- runningSub.Error = errors.New("Subscription returned a nosub error")
+ runningSub.Error = errors.New("sub returned a nosub error")
runningSub.done()
delete(c.subs, id)
}
@@ -579,7 +559,7 @@ func (c *Client) inboxManager() {
default:
// Ignore?
- log.Println("Server sent unexpected message", msg)
+ log.WithField("msg", msg).Debug("Server sent unexpected message")
}
} else {
// Current Meteor server sends an undocumented DDP message
@@ -591,14 +571,14 @@ func (c *Client) inboxManager() {
case string:
c.serverID = ID
default:
- log.Println("Server cluster node", serverID)
+ log.WithField("id", serverID).Debug("Server cluster node")
}
} else {
- log.Println("Server sent message with no `msg` field", msg)
+ log.WithField("msg", msg).Debug("Server sent message with no `msg` field")
}
}
case err := <-c.errors:
- log.Println("Websocket error", err)
+ log.WithError(err).Warn("Websocket error")
}
}
}
@@ -632,7 +612,7 @@ func (c *Client) inboxWorker(ws io.Reader) {
c.pingTimer.Reset(c.HeartbeatInterval)
}
if event == nil {
- log.Println("Inbox worker found nil event. May be due to broken websocket. Reconnecting.")
+ log.Debug("Inbox worker found nil event. May be due to broken websocket. Reconnecting.")
break
} else {
c.inbox <- event.(map[string]interface{})
@@ -642,7 +622,7 @@ func (c *Client) inboxWorker(ws io.Reader) {
c.reconnectLater()
}
-// reconnectLater schedules a reconnect for later. We need to make sure that we don't
+// reconnectLater schedules a reconnect action for later. We need to make sure that we don't
// block, and that we don't reconnect more frequently than once every c.ReconnectInterval
func (c *Client) reconnectLater() {
c.Close()
diff --git a/vendor/github.com/gopackage/ddp/ddp_collection.go b/vendor/github.com/gopackage/ddp/collection.go
index f417e68a..f417e68a 100644
--- a/vendor/github.com/gopackage/ddp/ddp_collection.go
+++ b/vendor/github.com/gopackage/ddp/collection.go
diff --git a/vendor/github.com/gopackage/ddp/ddp_ejson.go b/vendor/github.com/gopackage/ddp/ddp_ejson.go
deleted file mode 100644
index a3e1fec0..00000000
--- a/vendor/github.com/gopackage/ddp/ddp_ejson.go
+++ /dev/null
@@ -1,217 +0,0 @@
-package ddp
-
-import (
- "crypto/sha256"
- "encoding/hex"
- "io"
- "strings"
- "time"
-)
-
-// ----------------------------------------------------------------------
-// EJSON document interface
-// ----------------------------------------------------------------------
-// https://github.com/meteor/meteor/blob/devel/packages/ddp/DDP.md#appendix-ejson
-
-// Doc provides hides the complexity of ejson documents.
-type Doc struct {
- root interface{}
-}
-
-// NewDoc creates a new document from a generic json parsed document.
-func NewDoc(in interface{}) *Doc {
- doc := &Doc{in}
- return doc
-}
-
-// Map locates a map[string]interface{} - json object - at a path
-// or returns nil if not found.
-func (d *Doc) Map(path string) map[string]interface{} {
- item := d.Item(path)
- if item != nil {
- switch m := item.(type) {
- case map[string]interface{}:
- return m
- default:
- return nil
- }
- }
- return nil
-}
-
-// Array locates an []interface{} - json array - at a path
-// or returns nil if not found.
-func (d *Doc) Array(path string) []interface{} {
- item := d.Item(path)
- if item != nil {
- switch m := item.(type) {
- case []interface{}:
- return m
- default:
- return nil
- }
- }
- return nil
-}
-
-// StringArray locates an []string - json array of strings - at a path
-// or returns nil if not found. The string array will contain all string values
-// in the array and skip any non-string entries.
-func (d *Doc) StringArray(path string) []string {
- item := d.Item(path)
- if item != nil {
- switch m := item.(type) {
- case []interface{}:
- items := []string{}
- for _, item := range m {
- switch val := item.(type) {
- case string:
- items = append(items, val)
- }
- }
- return items
- case []string:
- return m
- default:
- return nil
- }
- }
- return nil
-}
-
-// String returns a string value located at the path or an empty string if not found.
-func (d *Doc) String(path string) string {
- item := d.Item(path)
- if item != nil {
- switch m := item.(type) {
- case string:
- return m
- default:
- return ""
- }
- }
- return ""
-}
-
-// Bool returns a boolean value located at the path or false if not found.
-func (d *Doc) Bool(path string) bool {
- item := d.Item(path)
- if item != nil {
- switch m := item.(type) {
- case bool:
- return m
- default:
- return false
- }
- }
- return false
-}
-
-// Float returns a float64 value located at the path or zero if not found.
-func (d *Doc) Float(path string) float64 {
- item := d.Item(path)
- if item != nil {
- switch m := item.(type) {
- case float64:
- return m
- default:
- return 0
- }
- }
- return 0
-}
-
-// Time returns a time value located at the path or nil if not found.
-func (d *Doc) Time(path string) time.Time {
- ticks := d.Float(path + ".$date")
- var t time.Time
- if ticks > 0 {
- sec := int64(ticks / 1000)
- t = time.Unix(int64(sec), 0)
- }
- return t
-}
-
-// Item locates a "raw" item at the provided path, returning
-// the item found or nil if not found.
-func (d *Doc) Item(path string) interface{} {
- item := d.root
- steps := strings.Split(path, ".")
- for _, step := range steps {
- // This is an intermediate step - we must be in a map
- switch m := item.(type) {
- case map[string]interface{}:
- item = m[step]
- case Update:
- item = m[step]
- default:
- return nil
- }
- }
- return item
-}
-
-// Set a value for a path. Intermediate items are created as necessary.
-func (d *Doc) Set(path string, value interface{}) {
- item := d.root
- steps := strings.Split(path, ".")
- last := steps[len(steps)-1]
- steps = steps[:len(steps)-1]
- for _, step := range steps {
- // This is an intermediate step - we must be in a map
- switch m := item.(type) {
- case map[string]interface{}:
- item = m[step]
- if item == nil {
- item = map[string]interface{}{}
- m[step] = item
- }
- default:
- return
- }
- }
- // Item is now the last map so we just set the value
- switch m := item.(type) {
- case map[string]interface{}:
- m[last] = value
- }
-}
-
-// Accounts password login support
-type Login struct {
- User *User `json:"user"`
- Password *Password `json:"password"`
-}
-
-func NewEmailLogin(email, pass string) *Login {
- return &Login{User: &User{Email: email}, Password: NewPassword(pass)}
-}
-
-func NewUsernameLogin(user, pass string) *Login {
- return &Login{User: &User{Username: user}, Password: NewPassword(pass)}
-}
-
-type LoginResume struct {
- Token string `json:"resume"`
-}
-
-func NewLoginResume(token string) *LoginResume {
- return &LoginResume{Token: token}
-}
-
-type User struct {
- Email string `json:"email,omitempty"`
- Username string `json:"username,omitempty"`
-}
-
-type Password struct {
- Digest string `json:"digest"`
- Algorithm string `json:"algorithm"`
-}
-
-func NewPassword(pass string) *Password {
- sha := sha256.New()
- io.WriteString(sha, pass)
- digest := sha.Sum(nil)
- return &Password{Digest: hex.EncodeToString(digest), Algorithm: "sha-256"}
-}
diff --git a/vendor/github.com/gopackage/ddp/ddp_stats.go b/vendor/github.com/gopackage/ddp/ddp_stats.go
deleted file mode 100644
index 1546b547..00000000
--- a/vendor/github.com/gopackage/ddp/ddp_stats.go
+++ /dev/null
@@ -1,321 +0,0 @@
-package ddp
-
-import (
- "encoding/hex"
- "fmt"
- "io"
- "log"
- "os"
- "sync"
- "time"
-)
-
-// Gather statistics about a DDP connection.
-
-// ---------------------------------------------------------
-// io utilities
-//
-// This is generic - should be moved into a stand alone lib
-// ---------------------------------------------------------
-
-// ReaderProxy provides common tooling for structs that manage an io.Reader.
-type ReaderProxy struct {
- reader io.Reader
-}
-
-// NewReaderProxy creates a new proxy for the provided reader.
-func NewReaderProxy(reader io.Reader) *ReaderProxy {
- return &ReaderProxy{reader}
-}
-
-// SetReader sets the reader on the proxy.
-func (r *ReaderProxy) SetReader(reader io.Reader) {
- r.reader = reader
-}
-
-// WriterProxy provides common tooling for structs that manage an io.Writer.
-type WriterProxy struct {
- writer io.Writer
-}
-
-// NewWriterProxy creates a new proxy for the provided writer.
-func NewWriterProxy(writer io.Writer) *WriterProxy {
- return &WriterProxy{writer}
-}
-
-// SetWriter sets the writer on the proxy.
-func (w *WriterProxy) SetWriter(writer io.Writer) {
- w.writer = writer
-}
-
-// Logging data types
-const (
- DataByte = iota // data is raw []byte
- DataText // data is string data
-)
-
-// Logger logs data from i/o sources.
-type Logger struct {
- // Active is true if the logger should be logging reads
- Active bool
- // Truncate is >0 to indicate the number of characters to truncate output
- Truncate int
-
- logger *log.Logger
- dtype int
-}
-
-// NewLogger creates a new i/o logger.
-func NewLogger(logger *log.Logger, active bool, dataType int, truncate int) Logger {
- return Logger{logger: logger, Active: active, dtype: dataType, Truncate: truncate}
-}
-
-// Log logs the current i/o operation and returns the read and error for
-// easy call chaining.
-func (l *Logger) Log(p []byte, n int, err error) (int, error) {
- if l.Active && err == nil {
- limit := n
- truncated := false
- if l.Truncate > 0 && l.Truncate < limit {
- limit = l.Truncate
- truncated = true
- }
- switch l.dtype {
- case DataText:
- if truncated {
- l.logger.Printf("[%d] %s...", n, string(p[:limit]))
- } else {
- l.logger.Printf("[%d] %s", n, string(p[:limit]))
- }
- case DataByte:
- fallthrough
- default:
- l.logger.Println(hex.Dump(p[:limit]))
- }
- }
- return n, err
-}
-
-// ReaderLogger logs data from any io.Reader.
-// ReaderLogger wraps a Reader and passes data to the actual data consumer.
-type ReaderLogger struct {
- Logger
- ReaderProxy
-}
-
-// NewReaderDataLogger creates an active binary data logger with a default
-// log.Logger and a '->' prefix.
-func NewReaderDataLogger(reader io.Reader) *ReaderLogger {
- logger := log.New(os.Stdout, "<- ", log.LstdFlags)
- return NewReaderLogger(reader, logger, true, DataByte, 0)
-}
-
-// NewReaderTextLogger creates an active binary data logger with a default
-// log.Logger and a '->' prefix.
-func NewReaderTextLogger(reader io.Reader) *ReaderLogger {
- logger := log.New(os.Stdout, "<- ", log.LstdFlags)
- return NewReaderLogger(reader, logger, true, DataText, 80)
-}
-
-// NewReaderLogger creates a Reader logger for the provided parameters.
-func NewReaderLogger(reader io.Reader, logger *log.Logger, active bool, dataType int, truncate int) *ReaderLogger {
- return &ReaderLogger{ReaderProxy: *NewReaderProxy(reader), Logger: NewLogger(logger, active, dataType, truncate)}
-}
-
-// Read logs the read bytes and passes them to the wrapped reader.
-func (r *ReaderLogger) Read(p []byte) (int, error) {
- n, err := r.reader.Read(p)
- return r.Log(p, n, err)
-}
-
-// WriterLogger logs data from any io.Writer.
-// WriterLogger wraps a Writer and passes data to the actual data producer.
-type WriterLogger struct {
- Logger
- WriterProxy
-}
-
-// NewWriterDataLogger creates an active binary data logger with a default
-// log.Logger and a '->' prefix.
-func NewWriterDataLogger(writer io.Writer) *WriterLogger {
- logger := log.New(os.Stdout, "-> ", log.LstdFlags)
- return NewWriterLogger(writer, logger, true, DataByte, 0)
-}
-
-// NewWriterTextLogger creates an active binary data logger with a default
-// log.Logger and a '->' prefix.
-func NewWriterTextLogger(writer io.Writer) *WriterLogger {
- logger := log.New(os.Stdout, "-> ", log.LstdFlags)
- return NewWriterLogger(writer, logger, true, DataText, 80)
-}
-
-// NewWriterLogger creates a Reader logger for the provided parameters.
-func NewWriterLogger(writer io.Writer, logger *log.Logger, active bool, dataType int, truncate int) *WriterLogger {
- return &WriterLogger{WriterProxy: *NewWriterProxy(writer), Logger: NewLogger(logger, active, dataType, truncate)}
-}
-
-// Write logs the written bytes and passes them to the wrapped writer.
-func (w *WriterLogger) Write(p []byte) (int, error) {
- if w.writer != nil {
- n, err := w.writer.Write(p)
- return w.Log(p, n, err)
- }
- return 0, nil
-}
-
-// Stats tracks statistics for i/o operations. Stats are produced from a
-// of a running stats agent.
-type Stats struct {
- // Bytes is the total number of bytes transferred.
- Bytes int64
- // Ops is the total number of i/o operations performed.
- Ops int64
- // Errors is the total number of i/o errors encountered.
- Errors int64
- // Runtime is the duration that stats have been gathered.
- Runtime time.Duration
-}
-
-// ClientStats displays combined statistics for the Client.
-type ClientStats struct {
- // Reads provides statistics on the raw i/o network reads for the current connection.
- Reads *Stats
- // Reads provides statistics on the raw i/o network reads for the all client connections.
- TotalReads *Stats
- // Writes provides statistics on the raw i/o network writes for the current connection.
- Writes *Stats
- // Writes provides statistics on the raw i/o network writes for all the client connections.
- TotalWrites *Stats
- // Reconnects is the number of reconnections the client has made.
- Reconnects int64
- // PingsSent is the number of pings sent by the client
- PingsSent int64
- // PingsRecv is the number of pings received by the client
- PingsRecv int64
-}
-
-// String produces a compact string representation of the client stats.
-func (stats *ClientStats) String() string {
- i := stats.Reads
- ti := stats.TotalReads
- o := stats.Writes
- to := stats.TotalWrites
- totalRun := (ti.Runtime * 1000000) / 1000000
- run := (i.Runtime * 1000000) / 1000000
- return fmt.Sprintf("bytes: %d/%d##%d/%d ops: %d/%d##%d/%d err: %d/%d##%d/%d reconnects: %d pings: %d/%d uptime: %v##%v",
- i.Bytes, o.Bytes,
- ti.Bytes, to.Bytes,
- i.Ops, o.Ops,
- ti.Ops, to.Ops,
- i.Errors, o.Errors,
- ti.Errors, to.Errors,
- stats.Reconnects,
- stats.PingsRecv, stats.PingsSent,
- run, totalRun)
-}
-
-// CollectionStats combines statistics about a collection.
-type CollectionStats struct {
- Name string // Name of the collection
- Count int // Count is the total number of documents in the collection
-}
-
-// String produces a compact string representation of the collection stat.
-func (s *CollectionStats) String() string {
- return fmt.Sprintf("%s[%d]", s.Name, s.Count)
-}
-
-// StatsTracker provides the basic tooling for tracking i/o stats.
-type StatsTracker struct {
- bytes int64
- ops int64
- errors int64
- start time.Time
- lock *sync.Mutex
-}
-
-// NewStatsTracker create a new stats tracker with start time set to now.
-func NewStatsTracker() *StatsTracker {
- return &StatsTracker{start: time.Now(), lock: new(sync.Mutex)}
-}
-
-// Op records an i/o operation. The parameters are passed through to
-// allow easy chaining.
-func (t *StatsTracker) Op(n int, err error) (int, error) {
- t.lock.Lock()
- defer t.lock.Unlock()
- t.ops++
- if err == nil {
- t.bytes += int64(n)
- } else {
- if err == io.EOF {
- // I don't think we should log EOF stats as an error
- } else {
- t.errors++
- }
- }
-
- return n, err
-}
-
-// Snapshot takes a snapshot of the current reader statistics.
-func (t *StatsTracker) Snapshot() *Stats {
- t.lock.Lock()
- defer t.lock.Unlock()
- return t.snap()
-}
-
-// Reset sets all of the stats to initial values.
-func (t *StatsTracker) Reset() *Stats {
- t.lock.Lock()
- defer t.lock.Unlock()
-
- stats := t.snap()
- t.bytes = 0
- t.ops = 0
- t.errors = 0
- t.start = time.Now()
-
- return stats
-}
-
-func (t *StatsTracker) snap() *Stats {
- return &Stats{Bytes: t.bytes, Ops: t.ops, Errors: t.errors, Runtime: time.Since(t.start)}
-}
-
-// ReaderStats tracks statistics on any io.Reader.
-// ReaderStats wraps a Reader and passes data to the actual data consumer.
-type ReaderStats struct {
- StatsTracker
- ReaderProxy
-}
-
-// NewReaderStats creates a ReaderStats object for the provided reader.
-func NewReaderStats(reader io.Reader) *ReaderStats {
- return &ReaderStats{ReaderProxy: *NewReaderProxy(reader), StatsTracker: *NewStatsTracker()}
-}
-
-// Read passes through a read collecting statistics and logging activity.
-func (r *ReaderStats) Read(p []byte) (int, error) {
- return r.Op(r.reader.Read(p))
-}
-
-// WriterStats tracks statistics on any io.Writer.
-// WriterStats wraps a Writer and passes data to the actual data producer.
-type WriterStats struct {
- StatsTracker
- WriterProxy
-}
-
-// NewWriterStats creates a WriterStats object for the provided writer.
-func NewWriterStats(writer io.Writer) *WriterStats {
- return &WriterStats{WriterProxy: *NewWriterProxy(writer), StatsTracker: *NewStatsTracker()}
-}
-
-// Write passes through a write collecting statistics.
-func (w *WriterStats) Write(p []byte) (int, error) {
- if w.writer != nil {
- return w.Op(w.writer.Write(p))
- }
- return 0, nil
-}
diff --git a/vendor/github.com/gopackage/ddp/doc.go b/vendor/github.com/gopackage/ddp/doc.go
new file mode 100644
index 00000000..97f1b63e
--- /dev/null
+++ b/vendor/github.com/gopackage/ddp/doc.go
@@ -0,0 +1,6 @@
+// Package ddp implements the MeteorJS DDP protocol over websockets. Fallback
+// to long polling is NOT supported (and is not planned on ever being supported
+// by this library). We will try to model the library after `net/http` - right
+// now the library is bare bones and doesn't provide the plug-ability of http.
+// However, that's the goal for the package eventually.
+package ddp
diff --git a/vendor/github.com/gopackage/ddp/ddp_messages.go b/vendor/github.com/gopackage/ddp/messages.go
index 68c9eab4..fc127cee 100644
--- a/vendor/github.com/gopackage/ddp/ddp_messages.go
+++ b/vendor/github.com/gopackage/ddp/messages.go
@@ -1,9 +1,15 @@
package ddp
+import (
+ "crypto/sha256"
+ "encoding/hex"
+ "io"
+)
+
// ------------------------------------------------------------
// DDP Messages
//
-// Go structs representing DDP raw messages ready for JSON
+// Go structs representing common DDP raw messages ready for JSON
// encoding.
// ------------------------------------------------------------
@@ -80,3 +86,43 @@ func NewSub(id, subName string, args []interface{}) *Sub {
Args: args,
}
}
+
+
+// Login provides a Meteor.Accounts password login support
+type Login struct {
+ User *User `json:"user"`
+ Password *Password `json:"password"`
+}
+
+func NewEmailLogin(email, pass string) *Login {
+ return &Login{User: &User{Email: email}, Password: NewPassword(pass)}
+}
+
+func NewUsernameLogin(user, pass string) *Login {
+ return &Login{User: &User{Username: user}, Password: NewPassword(pass)}
+}
+
+type LoginResume struct {
+ Token string `json:"resume"`
+}
+
+func NewLoginResume(token string) *LoginResume {
+ return &LoginResume{Token: token}
+}
+
+type User struct {
+ Email string `json:"email,omitempty"`
+ Username string `json:"username,omitempty"`
+}
+
+type Password struct {
+ Digest string `json:"digest"`
+ Algorithm string `json:"algorithm"`
+}
+
+func NewPassword(pass string) *Password {
+ sha := sha256.New()
+ io.WriteString(sha, pass)
+ digest := sha.Sum(nil)
+ return &Password{Digest: hex.EncodeToString(digest), Algorithm: "sha-256"}
+}
diff --git a/vendor/github.com/gopackage/ddp/stats.go b/vendor/github.com/gopackage/ddp/stats.go
new file mode 100644
index 00000000..b2548098
--- /dev/null
+++ b/vendor/github.com/gopackage/ddp/stats.go
@@ -0,0 +1,170 @@
+package ddp
+
+import (
+ "fmt"
+ "io"
+ "sync"
+ "time"
+)
+
+// Gather statistics about a DDP connection.
+
+// Stats tracks statistics for i/o operations.
+type Stats struct {
+ // Bytes is the total number of bytes transferred.
+ Bytes int64
+ // Ops is the total number of i/o operations performed.
+ Ops int64
+ // Errors is the total number of i/o errors encountered.
+ Errors int64
+ // Runtime is the duration that stats have been gathered.
+ Runtime time.Duration
+}
+
+// ClientStats displays combined statistics for the Client.
+type ClientStats struct {
+ // Reads provides statistics on the raw i/o network reads for the current connection.
+ Reads *Stats
+ // Reads provides statistics on the raw i/o network reads for the all client connections.
+ TotalReads *Stats
+ // Writes provides statistics on the raw i/o network writes for the current connection.
+ Writes *Stats
+ // Writes provides statistics on the raw i/o network writes for all the client connections.
+ TotalWrites *Stats
+ // Reconnects is the number of reconnections the client has made.
+ Reconnects int64
+ // PingsSent is the number of pings sent by the client
+ PingsSent int64
+ // PingsRecv is the number of pings received by the client
+ PingsRecv int64
+}
+
+// String produces a compact string representation of the client stats.
+func (stats *ClientStats) String() string {
+ i := stats.Reads
+ ti := stats.TotalReads
+ o := stats.Writes
+ to := stats.TotalWrites
+ totalRun := (ti.Runtime * 1000000) / 1000000
+ run := (i.Runtime * 1000000) / 1000000
+ return fmt.Sprintf("bytes: %d/%d##%d/%d ops: %d/%d##%d/%d err: %d/%d##%d/%d reconnects: %d pings: %d/%d uptime: %v##%v",
+ i.Bytes, o.Bytes,
+ ti.Bytes, to.Bytes,
+ i.Ops, o.Ops,
+ ti.Ops, to.Ops,
+ i.Errors, o.Errors,
+ ti.Errors, to.Errors,
+ stats.Reconnects,
+ stats.PingsRecv, stats.PingsSent,
+ run, totalRun)
+}
+
+// CollectionStats combines statistics about a collection.
+type CollectionStats struct {
+ Name string // Name of the collection
+ Count int // Count is the total number of documents in the collection
+}
+
+// String produces a compact string representation of the collection stat.
+func (s *CollectionStats) String() string {
+ return fmt.Sprintf("%s[%d]", s.Name, s.Count)
+}
+
+// StatsTracker provides the basic tooling for tracking i/o stats.
+type StatsTracker struct {
+ bytes int64
+ ops int64
+ errors int64
+ start time.Time
+ lock sync.Mutex
+}
+
+// NewStatsTracker create a new tracker with start time set to now.
+func NewStatsTracker() *StatsTracker {
+ return &StatsTracker{start: time.Now()}
+}
+
+// Op records an i/o operation. The parameters are passed through to
+// allow easy chaining.
+func (t *StatsTracker) Op(n int, err error) (int, error) {
+ t.lock.Lock()
+ defer t.lock.Unlock()
+ t.ops++
+ if err == nil {
+ t.bytes += int64(n)
+ } else {
+ if err == io.EOF {
+ // I don't think we should log EOF stats as an error
+ } else {
+ t.errors++
+ }
+ }
+
+ return n, err
+}
+
+// Snapshot takes a snapshot of the current Reader statistics.
+func (t *StatsTracker) Snapshot() *Stats {
+ t.lock.Lock()
+ defer t.lock.Unlock()
+ return t.snap()
+}
+
+// Reset all stats to initial values.
+func (t *StatsTracker) Reset() *Stats {
+ t.lock.Lock()
+ defer t.lock.Unlock()
+
+ stats := t.snap()
+ t.bytes = 0
+ t.ops = 0
+ t.errors = 0
+ t.start = time.Now()
+
+ return stats
+}
+
+func (t *StatsTracker) snap() *Stats {
+ return &Stats{Bytes: t.bytes, Ops: t.ops, Errors: t.errors, Runtime: time.Since(t.start)}
+}
+
+// ReaderStats tracks statistics on any io.Reader.
+// ReaderStats wraps a Reader and passes data to the actual data consumer.
+type ReaderStats struct {
+ StatsTracker
+ Reader io.Reader
+}
+
+// NewReaderStats creates a ReaderStats object for the provided Reader.
+func NewReaderStats(reader io.Reader) *ReaderStats {
+ r := &ReaderStats{Reader: reader}
+ r.Reset()
+ return r
+}
+
+// Read passes through a read collecting statistics and logging activity.
+func (r *ReaderStats) Read(p []byte) (int, error) {
+ return r.Op(r.Reader.Read(p))
+}
+
+// WriterStats tracks statistics on any io.Writer.
+// WriterStats wraps a Writer and passes data to the actual data producer.
+type WriterStats struct {
+ StatsTracker
+ Writer io.Writer
+}
+
+// NewWriterStats creates a WriterStats object for the provided Writer.
+func NewWriterStats(writer io.Writer) *WriterStats {
+ w := &WriterStats{Writer: writer}
+ w.Reset()
+ return w
+}
+
+// Write collects Writer statistics.
+func (w *WriterStats) Write(p []byte) (int, error) {
+ if w.Writer != nil {
+ return w.Op(w.Writer.Write(p))
+ }
+ return 0, nil
+}
diff --git a/vendor/github.com/gopackage/ddp/time.go b/vendor/github.com/gopackage/ddp/time.go
new file mode 100644
index 00000000..584f9ce0
--- /dev/null
+++ b/vendor/github.com/gopackage/ddp/time.go
@@ -0,0 +1,49 @@
+package ddp
+
+import (
+ "encoding/json"
+ "io"
+ "strconv"
+ "time"
+)
+
+// utcOffset in milliseconds for the current local time (east of UTC).
+var utcOffset int64
+
+func init() {
+ _, offsetSeconds := time.Now().Zone()
+ utcOffset = int64(offsetSeconds * 1000)
+}
+
+// Time is an alias for time.Time with custom json marshalling implementations to support ejson.
+type Time struct {
+ time.Time
+}
+
+// UnixMilli creates a new Time from the given unix millis but in UTC (as opposed to time.UnixMilli which returns
+// time in the local time zone). This supports the proper loading of times from EJSON $date objects.
+func UnixMilli(i int64) Time {
+ return Time{Time: time.UnixMilli(i - utcOffset)}
+}
+
+func (t *Time) UnmarshalJSON(b []byte) error {
+ var data map[string]float64
+ err := json.Unmarshal(b, &data)
+ if err != nil {
+ return err
+ }
+ val, ok := data["$date"]
+ if !ok {
+ return io.ErrUnexpectedEOF
+ }
+ // The time MUST be UTC but time.UnixMilli uses local time.
+ // We see what time it is in local time and calculate the offset to UTC
+ *t = UnixMilli(int64(val))
+
+ return nil
+}
+
+func (t Time) MarshalJSON() ([]byte, error) {
+ return []byte("{\"$date\":" + strconv.FormatInt(t.UnixMilli(), 10) + "}"), nil
+}
+
diff --git a/vendor/github.com/gopackage/ddp/ddp.go b/vendor/github.com/gopackage/ddp/utils.go
index 910adafd..7ff16a20 100644
--- a/vendor/github.com/gopackage/ddp/ddp.go
+++ b/vendor/github.com/gopackage/ddp/utils.go
@@ -1,39 +1,32 @@
-// Package ddp implements the MeteorJS DDP protocol over websockets. Fallback
-// to longpolling is NOT supported (and is not planned on ever being supported
-// by this library). We will try to model the library after `net/http` - right
-// now the library is barebones and doesn't provide the pluggability of http.
-// However, that's the goal for the package eventually.
package ddp
import (
"fmt"
- "log"
"sync"
"time"
-)
-// debugLog is true if we should log debugging information about the connection
-var debugLog = true
+ "github.com/apex/log"
+)
-// The main file contains common utility types.
+// Contains common utility types.
// -------------------------------------------------------------------
-// idManager provides simple incrementing IDs for ddp messages.
-type idManager struct {
+// KeyManager provides simple incrementing IDs for ddp messages.
+type KeyManager struct {
// nextID is the next ID for API calls
nextID uint64
// idMutex is a mutex to protect ID updates
idMutex *sync.Mutex
}
-// newidManager creates a new instance and sets up resources.
-func newidManager() *idManager {
- return &idManager{idMutex: new(sync.Mutex)}
+// NewKeyManager creates a new instance and sets up resources.
+func NewKeyManager() *KeyManager {
+ return &KeyManager{idMutex: new(sync.Mutex)}
}
-// newID issues a new ID for use in calls.
-func (id *idManager) newID() string {
+// Next issues a new ID for use in calls.
+func (id *KeyManager) Next() string {
id.idMutex.Lock()
next := id.nextID
id.nextID++
@@ -43,8 +36,8 @@ func (id *idManager) newID() string {
// -------------------------------------------------------------------
-// pingTracker tracks in-flight pings.
-type pingTracker struct {
+// PingTracker tracks in-flight pings.
+type PingTracker struct {
handler func(error)
timeout time.Duration
timer *time.Timer
@@ -72,8 +65,13 @@ func (call *Call) done() {
default:
// We don't want to block here. It is the caller's responsibility to make
// sure the channel has enough buffer space. See comment in Go().
- if debugLog {
- log.Println("rpc: discarding Call reply due to insufficient Done chan capacity")
- }
+ log.Debug("rpc: discarding Call reply due to insufficient Done chan capacity")
}
}
+
+// IgnoreErr logs an error if it occurs and ignores it.
+func IgnoreErr(err error, msg string) {
+ if err != nil {
+ log.WithError(err).Debug(msg)
+ }
+} \ No newline at end of file