From 4dd8bae5c91fa4aef09d865d8fef1acd84f90925 Mon Sep 17 00:00:00 2001 From: Wim Date: Sun, 17 Oct 2021 00:47:22 +0200 Subject: Update dependencies (#1610) * Update dependencies * Update module to go 1.17 --- vendor/github.com/gopackage/ddp/.gitignore | 4 + vendor/github.com/gopackage/ddp/LICENSE | 2 +- vendor/github.com/gopackage/ddp/README.md | 6 +- vendor/github.com/gopackage/ddp/client.go | 634 +++++++++++++++++++++ vendor/github.com/gopackage/ddp/collection.go | 245 ++++++++ vendor/github.com/gopackage/ddp/ddp.go | 79 --- vendor/github.com/gopackage/ddp/ddp_client.go | 654 ---------------------- vendor/github.com/gopackage/ddp/ddp_collection.go | 245 -------- vendor/github.com/gopackage/ddp/ddp_ejson.go | 217 ------- vendor/github.com/gopackage/ddp/ddp_messages.go | 82 --- vendor/github.com/gopackage/ddp/ddp_stats.go | 321 ----------- vendor/github.com/gopackage/ddp/doc.go | 6 + vendor/github.com/gopackage/ddp/messages.go | 128 +++++ vendor/github.com/gopackage/ddp/stats.go | 170 ++++++ vendor/github.com/gopackage/ddp/time.go | 49 ++ vendor/github.com/gopackage/ddp/utils.go | 77 +++ 16 files changed, 1318 insertions(+), 1601 deletions(-) create mode 100644 vendor/github.com/gopackage/ddp/client.go create mode 100644 vendor/github.com/gopackage/ddp/collection.go delete mode 100644 vendor/github.com/gopackage/ddp/ddp.go delete mode 100644 vendor/github.com/gopackage/ddp/ddp_client.go delete mode 100644 vendor/github.com/gopackage/ddp/ddp_collection.go delete mode 100644 vendor/github.com/gopackage/ddp/ddp_ejson.go delete mode 100644 vendor/github.com/gopackage/ddp/ddp_messages.go delete mode 100644 vendor/github.com/gopackage/ddp/ddp_stats.go create mode 100644 vendor/github.com/gopackage/ddp/doc.go create mode 100644 vendor/github.com/gopackage/ddp/messages.go create mode 100644 vendor/github.com/gopackage/ddp/stats.go create mode 100644 vendor/github.com/gopackage/ddp/time.go create mode 100644 vendor/github.com/gopackage/ddp/utils.go (limited to 'vendor/github.com/gopackage/ddp') diff --git a/vendor/github.com/gopackage/ddp/.gitignore b/vendor/github.com/gopackage/ddp/.gitignore index daf913b1..e71cbee4 100644 --- a/vendor/github.com/gopackage/ddp/.gitignore +++ b/vendor/github.com/gopackage/ddp/.gitignore @@ -22,3 +22,7 @@ _testmain.go *.exe *.test *.prof + +# Editors +.idea/ +.vscode/ diff --git a/vendor/github.com/gopackage/ddp/LICENSE b/vendor/github.com/gopackage/ddp/LICENSE index 03d77e8a..98a0fe76 100644 --- a/vendor/github.com/gopackage/ddp/LICENSE +++ b/vendor/github.com/gopackage/ddp/LICENSE @@ -1,4 +1,4 @@ -Copyright (c) 2015, Metamech LLC. +Copyright (c) 2021, Metamech LLC. Permission to use, copy, modify, and/or distribute this software for any purpose with or without fee is hereby granted, provided that the above diff --git a/vendor/github.com/gopackage/ddp/README.md b/vendor/github.com/gopackage/ddp/README.md index fd62c718..7ae713b9 100644 --- a/vendor/github.com/gopackage/ddp/README.md +++ b/vendor/github.com/gopackage/ddp/README.md @@ -1,3 +1,5 @@ -# ddp +# DDP in Go -MeteorJS DDP library for Golang +[Meteor](https://meteor.com) DDP library for Go. This library allows Go applications to connect to Meteor applications, subscribe to Meteor publications, read from a cached Collection (similar to minimongo), and call Meteor methods on the server. + +See `ddp/_examples` for some tips and an example app that walks through all the features of the library. \ No newline at end of file diff --git a/vendor/github.com/gopackage/ddp/client.go b/vendor/github.com/gopackage/ddp/client.go new file mode 100644 index 00000000..205dc433 --- /dev/null +++ b/vendor/github.com/gopackage/ddp/client.go @@ -0,0 +1,634 @@ +package ddp + +import ( + "encoding/json" + "errors" + "fmt" + "io" + "sync" + "time" + + "github.com/apex/log" + "golang.org/x/net/websocket" +) + +const ( + DISCONNECTED = iota + DIALING + CONNECTING + CONNECTED +) + +type ConnectionListener interface { + Connected() +} + +type ConnectionNotifier interface { + AddConnectionListener(listener ConnectionListener) +} + +type StatusListener interface { + Status(status int) +} + +type StatusNotifier interface { + AddStatusListener(listener StatusListener) +} + +// Client represents a DDP client connection. The DDP client establish a DDP +// session and acts as a message pump for other tools. +type Client struct { + // HeartbeatInterval is the time between heartbeats to send + HeartbeatInterval time.Duration + // HeartbeatTimeout is the time for a heartbeat ping to timeout + HeartbeatTimeout time.Duration + // ReconnectInterval is the time between reconnections on bad connections + ReconnectInterval time.Duration + + // writeSocketStats controls statistics gathering for current websocket writes. + writeSocketStats *WriterStats + // writeStats controls statistics gathering for overall client writes. + writeStats *WriterStats + // readSocketStats controls statistics gathering for current websocket reads. + readSocketStats *ReaderStats + // readStats controls statistics gathering for overall client reads. + readStats *ReaderStats + // reconnects in the number of reconnections the client has made + reconnects int64 + // pingsIn is the number of pings received from the server + pingsIn int64 + // pingsOut is te number of pings sent by the client + pingsOut int64 + + // session contains the DDP session token (can be used for reconnects and debugging). + session string + // version contains the negotiated DDP protocol version in use. + version string + // serverID the cluster node ID for the server we connected to + serverID string + // ws is the underlying websocket being used. + ws *websocket.Conn + // encoder is a JSON encoder to send outgoing packets to the websocket. + encoder *json.Encoder + // url the websocket is connected to + url string + // origin is the origin for the websocket connection + origin string + // inbox is an incoming message channel + inbox chan map[string]interface{} + // errors is an incoming errors channel + errors chan error + // pingTimer is a timer for sending regular pings to the server + pingTimer *time.Timer + // pings tracks inflight pings based on each ping ID. + pings map[string][]*PingTracker + // calls tracks method invocations that are still in flight + calls map[string]*Call + // subs tracks active subscriptions. Map contains name->args + subs map[string]*Call + // collections contains all the collections currently subscribed + collections map[string]Collection + // connectionStatus is the current connection status of the client + connectionStatus int + // reconnectTimer is the timer tracking reconnections + reconnectTimer *time.Timer + // reconnectLock protects access to reconnection + reconnectLock *sync.Mutex + + // statusListeners will be informed when the connection status of the client changes + statusListeners []StatusListener + // connectionListeners will be informed when a connection to the server is established + connectionListeners []ConnectionListener + + // KeyManager tracks IDs for ddp messages + KeyManager +} + +// NewClient creates a default client (using an internal websocket) to the +// provided URL using the origin for the connection. The client will +// automatically connect, upgrade to a websocket, and establish a DDP +// connection session before returning the client. The client will +// automatically and internally handle heartbeats and reconnects. +// +// TBD create an option to use an external websocket (aka htt.Transport) +// TBD create an option to substitute heartbeat and reconnect behavior (aka http.Transport) +// TBD create an option to hijack the connection (aka http.Hijacker) +// TBD create profiling features (aka net/http/pprof) +func NewClient(url, origin string) *Client { + c := &Client{ + HeartbeatInterval: time.Minute, // Meteor impl default + 10 (we ping last) + HeartbeatTimeout: 15 * time.Second, // Meteor impl default + ReconnectInterval: 5 * time.Second, + collections: map[string]Collection{}, + url: url, + origin: origin, + inbox: make(chan map[string]interface{}, 100), + errors: make(chan error, 100), + pings: map[string][]*PingTracker{}, + calls: map[string]*Call{}, + subs: map[string]*Call{}, + connectionStatus: DISCONNECTED, + reconnectLock: &sync.Mutex{}, + + // Stats + writeSocketStats: NewWriterStats(nil), + writeStats: NewWriterStats(nil), + readSocketStats: NewReaderStats(nil), + readStats: NewReaderStats(nil), + + KeyManager: *NewKeyManager(), + } + c.encoder = json.NewEncoder(c.writeStats) + + // We spin off an inbox processing goroutine + go c.inboxManager() + + return c +} + +// Session returns the negotiated session token for the connection. +func (c *Client) Session() string { + return c.session +} + +// Version returns the negotiated protocol version in use by the client. +func (c *Client) Version() string { + return c.version +} + +// AddStatusListener in order to receive status change updates. +func (c *Client) AddStatusListener(listener StatusListener) { + c.statusListeners = append(c.statusListeners, listener) +} + +// AddConnectionListener in order to receive connection updates. +func (c *Client) AddConnectionListener(listener ConnectionListener) { + c.connectionListeners = append(c.connectionListeners, listener) +} + +// status updates all status listeners with the new client status. +func (c *Client) status(status int) { + if c.connectionStatus == status { + return + } + c.connectionStatus = status + for _, listener := range c.statusListeners { + listener.Status(status) + } +} + +// Connect attempts to connect the client to the server. +func (c *Client) Connect() error { + c.status(DIALING) + ws, err := websocket.Dial(c.url, "", c.origin) + if err != nil { + c.Close() + log.WithError(err).Debug("dial error") + c.reconnectLater() + return err + } + log.Debug("dialed") + // Start DDP connection + c.start(ws, NewConnect()) + return nil +} + +// Reconnect attempts to reconnect the client to the server on the existing +// DDP session. +// +// TODO needs a reconnect backoff so we don't trash a down server +// TODO reconnect should not allow more reconnects while a reconnection is already in progress. +func (c *Client) Reconnect() { + func() { + c.reconnectLock.Lock() + defer c.reconnectLock.Unlock() + if c.reconnectTimer != nil { + c.reconnectTimer.Stop() + c.reconnectTimer = nil + } + }() + + c.Close() + + c.reconnects++ + + // Reconnect + c.status(DIALING) + ws, err := websocket.Dial(c.url, "", c.origin) + if err != nil { + c.Close() + log.WithError(err).Debug("Dial error") + c.reconnectLater() + return + } + + c.start(ws, NewReconnect(c.session)) + + // -------------------------------------------------------------------- + // We resume inflight or ongoing subscriptions - we don't have to wait + // for connection confirmation (messages can be pipelined). + // -------------------------------------------------------------------- + + // Send calls that haven't been confirmed - may not have been sent + // and effects should be idempotent + for _, call := range c.calls { + IgnoreErr(c.Send(NewMethod(call.ID, call.ServiceMethod, call.Args.([]interface{}))), "resend method") + } + + // Resend subscriptions and patch up collections + for _, sub := range c.subs { + IgnoreErr(c.Send(NewSub(sub.ID, sub.ServiceMethod, sub.Args.([]interface{}))), "resend sub") + } +} + +// Subscribe to data updates. +func (c *Client) Subscribe(subName string, done chan *Call, args ...interface{}) *Call { + + if args == nil { + args = []interface{}{} + } + call := new(Call) + call.ID = c.Next() + call.ServiceMethod = subName + call.Args = args + call.Owner = c + + if done == nil { + done = make(chan *Call, 10) // buffered. + } else { + // If caller passes done != nil, it must arrange that + // done has enough buffer for the number of simultaneous + // RPCs that will be using that channel. If the channel + // is totally unbuffered, it's best not to run at all. + if cap(done) == 0 { + log.Fatal("ddp.rpc: done channel is unbuffered") + } + } + call.Done = done + c.subs[call.ID] = call + + // Save this subscription to the client so we can reconnect + subArgs := make([]interface{}, len(args)) + copy(subArgs, args) + + IgnoreErr(c.Send(NewSub(call.ID, subName, args)), "send sub") + + return call +} + +// Sub sends a synchronous subscription request to the server. +func (c *Client) Sub(subName string, args ...interface{}) error { + call := <-c.Subscribe(subName, make(chan *Call, 1), args...).Done + return call.Error +} + +// Go invokes the function asynchronously. It returns the Call structure representing +// the invocation. The done channel will signal when the call is complete by returning +// the same Call object. If done is nil, Go will allocate a new channel. +// If non-nil, done must be buffered or Go will deliberately crash. +// +// Go and Call are modeled after the standard `net/rpc` package versions. +func (c *Client) Go(serviceMethod string, done chan *Call, args ...interface{}) *Call { + + if args == nil { + args = []interface{}{} + } + call := new(Call) + call.ID = c.Next() + call.ServiceMethod = serviceMethod + call.Args = args + call.Owner = c + if done == nil { + done = make(chan *Call, 10) // buffered. + } else { + // If caller passes done != nil, it must arrange that + // done has enough buffer for the number of simultaneous + // RPCs that will be using that channel. If the channel + // is totally unbuffered, it's best not to run at all. + if cap(done) == 0 { + log.Fatal("ddp.rpc: done channel is unbuffered") + } + } + call.Done = done + c.calls[call.ID] = call + + IgnoreErr(c.Send(NewMethod(call.ID, serviceMethod, args)), "send method") + + return call +} + +// Call invokes the named function, waits for it to complete, and returns its error status. +func (c *Client) Call(serviceMethod string, args ...interface{}) (interface{}, error) { + call := <-c.Go(serviceMethod, make(chan *Call, 1), args...).Done + return call.Reply, call.Error +} + +// Ping sends a heartbeat signal to the server. The Ping doesn't look for +// a response but may trigger the connection to reconnect if the ping times out. +// This is primarily useful for reviving an unresponsive Client connection. +func (c *Client) Ping() { + c.PingPong(c.Next(), c.HeartbeatTimeout, func(err error) { + if err != nil { + // Is there anything else we should or can do? + c.reconnectLater() + } + }) +} + +// PingPong sends a heartbeat signal to the server and calls the provided +// function when a pong is received. An optional id can be sent to help +// track the responses - or an empty string can be used. It is the +// responsibility of the caller to respond to any errors that may occur. +func (c *Client) PingPong(id string, timeout time.Duration, handler func(error)) { + err := c.Send(NewPing(id)) + if err != nil { + handler(err) + return + } + c.pingsOut++ + pings, ok := c.pings[id] + if !ok { + pings = make([]*PingTracker, 0, 5) + } + tracker := &PingTracker{handler: handler, timeout: timeout, timer: time.AfterFunc(timeout, func() { + handler(fmt.Errorf("ping timeout")) + })} + c.pings[id] = append(pings, tracker) +} + +// Send transmits messages to the server. The msg parameter must be json +// encoder compatible. +func (c *Client) Send(msg interface{}) error { + return c.encoder.Encode(msg) +} + +// Close implements the io.Closer interface. +func (c *Client) Close() { + // Shutdown out all outstanding pings + if c.pingTimer != nil { + c.pingTimer.Stop() + c.pingTimer = nil + } + + // Close websocket + if c.ws != nil { + IgnoreErr(c.ws.Close(), "close ws") + c.ws = nil + } + for _, collection := range c.collections { + collection.reset() + } + c.status(DISCONNECTED) +} + +// ResetStats resets the statistics for the client. +func (c *Client) ResetStats() { + c.readSocketStats.Reset() + c.readStats.Reset() + c.writeSocketStats.Reset() + c.writeStats.Reset() + c.reconnects = 0 + c.pingsIn = 0 + c.pingsOut = 0 +} + +// Stats returns the read and write statistics of the client. +func (c *Client) Stats() *ClientStats { + return &ClientStats{ + Reads: c.readSocketStats.Snapshot(), + TotalReads: c.readStats.Snapshot(), + Writes: c.writeSocketStats.Snapshot(), + TotalWrites: c.writeStats.Snapshot(), + Reconnects: c.reconnects, + PingsSent: c.pingsOut, + PingsRecv: c.pingsIn, + } +} + +// CollectionByName retrieves a collection by its name. +func (c *Client) CollectionByName(name string) Collection { + collection, ok := c.collections[name] + if !ok { + collection = NewCollection(name) + c.collections[name] = collection + } + return collection +} + +// CollectionStats returns a snapshot of statistics for the currently known collections. +func (c *Client) CollectionStats() []CollectionStats { + stats := make([]CollectionStats, 0, len(c.collections)) + for name, collection := range c.collections { + stats = append(stats, CollectionStats{Name: name, Count: len(collection.FindAll())}) + } + return stats +} + +// start a new client connection on the provided websocket +func (c *Client) start(ws *websocket.Conn, connect *Connect) { + + c.status(CONNECTING) + + c.ws = ws + c.writeSocketStats = NewWriterStats(c.ws) + c.writeStats.Writer = c.writeSocketStats + c.readSocketStats = NewReaderStats(c.ws) + c.readStats.Reader = c.readSocketStats + + // We spin off an inbox stuffing goroutine + go c.inboxWorker(c.readStats) + + IgnoreErr(c.Send(connect), "send connect") +} + +// inboxManager pulls messages from the inbox and routes them to appropriate +// handlers. +func (c *Client) inboxManager() { + for { + select { + case msg := <-c.inbox: + // Message! + //log.Println("Got message", msg) + msgType, ok := msg["msg"] + if ok { + log.WithField("msg", msgType).Debug("recv") + switch msgType.(string) { + // Connection management + case "connected": + c.status(CONNECTED) + for _, collection := range c.collections { + collection.init() + } + c.version = "1" // "1" is the only version we support + c.session = msg["session"].(string) + // Start automatic heartbeats + c.pingTimer = time.AfterFunc(c.HeartbeatInterval, func() { + c.Ping() + c.pingTimer.Reset(c.HeartbeatInterval) + }) + // Notify connection listeners + for _, listener := range c.connectionListeners { + go listener.Connected() + } + case "failed": + log.Fatalf("IM Failed to connect, we support version 1 but server supports %s", msg["version"]) + + // Heartbeats + case "ping": + // We received a ping - need to respond with a pong + id, ok := msg["id"] + if ok { + IgnoreErr(c.Send(NewPong(id.(string))), "send id ping") + } else { + IgnoreErr(c.Send(NewPong("")), "send empty ping") + } + c.pingsIn++ + case "pong": + // We received a pong - we can clear the ping tracker and call its handler + id, ok := msg["id"] + var key string + if ok { + key = id.(string) + } + pings, ok := c.pings[key] + if ok && len(pings) > 0 { + ping := pings[0] + pings = pings[1:] + if len(key) == 0 || len(pings) > 0 { + c.pings[key] = pings + } + ping.timer.Stop() + ping.handler(nil) + } + + // Live Data + case "nosub": + log.WithField("msg", msg).Debug("sub returned a nosub error") + // Clear related subscriptions + sub, ok := msg["id"] + if ok { + id := sub.(string) + runningSub := c.subs[id] + + if runningSub != nil { + runningSub.Error = errors.New("sub returned a nosub error") + runningSub.done() + delete(c.subs, id) + } + } + case "ready": + // Run 'done' callbacks on all ready subscriptions + subs, ok := msg["subs"] + if ok { + for _, sub := range subs.([]interface{}) { + call, ok := c.subs[sub.(string)] + if ok { + call.done() + } + } + } + case "added": + c.collectionBy(msg).added(msg) + case "changed": + c.collectionBy(msg).changed(msg) + case "removed": + c.collectionBy(msg).removed(msg) + case "addedBefore": + c.collectionBy(msg).addedBefore(msg) + case "movedBefore": + c.collectionBy(msg).movedBefore(msg) + + // RPC + case "result": + id, ok := msg["id"] + if ok { + call := c.calls[id.(string)] + delete(c.calls, id.(string)) + e, ok := msg["error"] + if ok { + txt, _ := json.Marshal(e) + call.Error = fmt.Errorf(string(txt)) + call.Reply = e + } else { + call.Reply = msg["result"] + } + call.done() + } + case "updated": + // We currently don't do anything with updated status + + default: + // Ignore? + log.WithField("msg", msg).Debug("Server sent unexpected message") + } + } else { + // Current Meteor server sends an undocumented DDP message + // (looks like clustering "hint"). We will register and + // ignore rather than log an error. + serverID, ok := msg["server_id"] + if ok { + switch ID := serverID.(type) { + case string: + c.serverID = ID + default: + log.WithField("id", serverID).Debug("Server cluster node") + } + } else { + log.WithField("msg", msg).Debug("Server sent message with no `msg` field") + } + } + case err := <-c.errors: + log.WithError(err).Warn("Websocket error") + } + } +} + +func (c *Client) collectionBy(msg map[string]interface{}) Collection { + n, ok := msg["collection"] + if !ok { + return NewMockCollection() + } + switch name := n.(type) { + case string: + return c.CollectionByName(name) + default: + return NewMockCollection() + } +} + +// inboxWorker pulls messages from a websocket, decodes JSON packets, and +// stuffs them into a message channel. +func (c *Client) inboxWorker(ws io.Reader) { + dec := json.NewDecoder(ws) + for { + var event interface{} + + if err := dec.Decode(&event); err == io.EOF { + break + } else if err != nil { + c.errors <- err + } + if c.pingTimer != nil { + c.pingTimer.Reset(c.HeartbeatInterval) + } + if event == nil { + log.Debug("Inbox worker found nil event. May be due to broken websocket. Reconnecting.") + break + } else { + c.inbox <- event.(map[string]interface{}) + } + } + + c.reconnectLater() +} + +// reconnectLater schedules a reconnect action for later. We need to make sure that we don't +// block, and that we don't reconnect more frequently than once every c.ReconnectInterval +func (c *Client) reconnectLater() { + c.Close() + c.reconnectLock.Lock() + defer c.reconnectLock.Unlock() + if c.reconnectTimer == nil { + c.reconnectTimer = time.AfterFunc(c.ReconnectInterval, c.Reconnect) + } +} diff --git a/vendor/github.com/gopackage/ddp/collection.go b/vendor/github.com/gopackage/ddp/collection.go new file mode 100644 index 00000000..f417e68a --- /dev/null +++ b/vendor/github.com/gopackage/ddp/collection.go @@ -0,0 +1,245 @@ +package ddp + +// ---------------------------------------------------------------------- +// Collection +// ---------------------------------------------------------------------- + +type Update map[string]interface{} +type UpdateListener interface { + CollectionUpdate(collection, operation, id string, doc Update) +} + +// Collection managed cached collection data sent from the server in a +// livedata subscription. +// +// It would be great to build an entire mongo compatible local store (minimongo) +type Collection interface { + + // FindOne queries objects and returns the first match. + FindOne(id string) Update + // FindAll returns a map of all items in the cache - this is a hack + // until we have time to build out a real minimongo interface. + FindAll() map[string]Update + // AddUpdateListener adds a channel that receives update messages. + AddUpdateListener(listener UpdateListener) + + // livedata updates + added(msg Update) + changed(msg Update) + removed(msg Update) + addedBefore(msg Update) + movedBefore(msg Update) + init() // init informs the collection that the connection to the server has begun/resumed + reset() // reset informs the collection that the connection to the server has been lost +} + +// NewMockCollection creates an empty collection that does nothing. +func NewMockCollection() Collection { + return &MockCache{} +} + +// NewCollection creates a new collection - always KeyCache. +func NewCollection(name string) Collection { + return &KeyCache{name, make(map[string]Update), nil} +} + +// KeyCache caches items keyed on unique ID. +type KeyCache struct { + // The name of the collection + Name string + // items contains collection items by ID + items map[string]Update + // listeners contains all the listeners that should be notified of collection updates. + listeners []UpdateListener + // TODO(badslug): do we need to protect from multiple threads +} + +func (c *KeyCache) added(msg Update) { + id, fields := parseUpdate(msg) + if fields != nil { + c.items[id] = fields + c.notify("create", id, fields) + } +} + +func (c *KeyCache) changed(msg Update) { + id, fields := parseUpdate(msg) + if fields != nil { + item, ok := c.items[id] + if ok { + for key, value := range fields { + item[key] = value + } + c.items[id] = item + c.notify("update", id, item) + } + } +} + +func (c *KeyCache) removed(msg Update) { + id, _ := parseUpdate(msg) + if len(id) > 0 { + delete(c.items, id) + c.notify("remove", id, nil) + } +} + +func (c *KeyCache) addedBefore(msg Update) { + // for keyed cache, ordered commands are a noop +} + +func (c *KeyCache) movedBefore(msg Update) { + // for keyed cache, ordered commands are a noop +} + +// init prepares the collection for data updates (called when a new connection is +// made or a connection/session is resumed). +func (c *KeyCache) init() { + // TODO start to patch up the current data with fresh server state +} + +func (c *KeyCache) reset() { + // TODO we should mark the collection but maintain it's contents and then + // patch up the current contents with the new contents when we receive them. + //c.items = nil + c.notify("reset", "", nil) +} + +// notify sends a Update to all UpdateListener's which should never block. +func (c *KeyCache) notify(operation, id string, doc Update) { + for _, listener := range c.listeners { + listener.CollectionUpdate(c.Name, operation, id, doc) + } +} + +// FindOne returns the item with matching id. +func (c *KeyCache) FindOne(id string) Update { + return c.items[id] +} + +// FindAll returns a dump of all items in the collection +func (c *KeyCache) FindAll() map[string]Update { + return c.items +} + +// AddUpdateListener adds a listener for changes on a collection. +func (c *KeyCache) AddUpdateListener(listener UpdateListener) { + c.listeners = append(c.listeners, listener) +} + +// OrderedCache caches items based on list order. +// This is a placeholder, currently not implemented as the Meteor server +// does not transmit ordered collections over DDP yet. +type OrderedCache struct { + // ranks contains ordered collection items for ordered collections + items []interface{} +} + +func (c *OrderedCache) added(msg Update) { + // for ordered cache, key commands are a noop +} + +func (c *OrderedCache) changed(msg Update) { + +} + +func (c *OrderedCache) removed(msg Update) { + +} + +func (c *OrderedCache) addedBefore(msg Update) { + +} + +func (c *OrderedCache) movedBefore(msg Update) { + +} + +func (c *OrderedCache) init() { + +} + +func (c *OrderedCache) reset() { + +} + +// FindOne returns the item with matching id. +func (c *OrderedCache) FindOne(id string) Update { + return nil +} + +// FindAll returns a dump of all items in the collection +func (c *OrderedCache) FindAll() map[string]Update { + return map[string]Update{} +} + +// AddUpdateListener does nothing. +func (c *OrderedCache) AddUpdateListener(ch UpdateListener) { +} + +// MockCache implements the Collection interface but does nothing with the data. +type MockCache struct { +} + +func (c *MockCache) added(msg Update) { + +} + +func (c *MockCache) changed(msg Update) { + +} + +func (c *MockCache) removed(msg Update) { + +} + +func (c *MockCache) addedBefore(msg Update) { + +} + +func (c *MockCache) movedBefore(msg Update) { + +} + +func (c *MockCache) init() { + +} + +func (c *MockCache) reset() { + +} + +// FindOne returns the item with matching id. +func (c *MockCache) FindOne(id string) Update { + return nil +} + +// FindAll returns a dump of all items in the collection +func (c *MockCache) FindAll() map[string]Update { + return map[string]Update{} +} + +// AddUpdateListener does nothing. +func (c *MockCache) AddUpdateListener(ch UpdateListener) { +} + +// parseUpdate returns the ID and fields from a DDP Update document. +func parseUpdate(up Update) (ID string, Fields Update) { + key, ok := up["id"] + if ok { + switch id := key.(type) { + case string: + updates, ok := up["fields"] + if ok { + switch fields := updates.(type) { + case map[string]interface{}: + return id, Update(fields) + default: + // Don't know what to do... + } + } + return id, nil + } + } + return "", nil +} diff --git a/vendor/github.com/gopackage/ddp/ddp.go b/vendor/github.com/gopackage/ddp/ddp.go deleted file mode 100644 index 910adafd..00000000 --- a/vendor/github.com/gopackage/ddp/ddp.go +++ /dev/null @@ -1,79 +0,0 @@ -// Package ddp implements the MeteorJS DDP protocol over websockets. Fallback -// to longpolling is NOT supported (and is not planned on ever being supported -// by this library). We will try to model the library after `net/http` - right -// now the library is barebones and doesn't provide the pluggability of http. -// However, that's the goal for the package eventually. -package ddp - -import ( - "fmt" - "log" - "sync" - "time" -) - -// debugLog is true if we should log debugging information about the connection -var debugLog = true - -// The main file contains common utility types. - -// ------------------------------------------------------------------- - -// idManager provides simple incrementing IDs for ddp messages. -type idManager struct { - // nextID is the next ID for API calls - nextID uint64 - // idMutex is a mutex to protect ID updates - idMutex *sync.Mutex -} - -// newidManager creates a new instance and sets up resources. -func newidManager() *idManager { - return &idManager{idMutex: new(sync.Mutex)} -} - -// newID issues a new ID for use in calls. -func (id *idManager) newID() string { - id.idMutex.Lock() - next := id.nextID - id.nextID++ - id.idMutex.Unlock() - return fmt.Sprintf("%x", next) -} - -// ------------------------------------------------------------------- - -// pingTracker tracks in-flight pings. -type pingTracker struct { - handler func(error) - timeout time.Duration - timer *time.Timer -} - -// ------------------------------------------------------------------- - -// Call represents an active RPC call. -type Call struct { - ID string // The uuid for this method call - ServiceMethod string // The name of the service and method to call. - Args interface{} // The argument to the function (*struct). - Reply interface{} // The reply from the function (*struct). - Error error // After completion, the error status. - Done chan *Call // Strobes when call is complete. - Owner *Client // Client that owns the method call -} - -// done removes the call from any owners and strobes the done channel with itself. -func (call *Call) done() { - delete(call.Owner.calls, call.ID) - select { - case call.Done <- call: - // ok - default: - // We don't want to block here. It is the caller's responsibility to make - // sure the channel has enough buffer space. See comment in Go(). - if debugLog { - log.Println("rpc: discarding Call reply due to insufficient Done chan capacity") - } - } -} diff --git a/vendor/github.com/gopackage/ddp/ddp_client.go b/vendor/github.com/gopackage/ddp/ddp_client.go deleted file mode 100644 index 8d6323b7..00000000 --- a/vendor/github.com/gopackage/ddp/ddp_client.go +++ /dev/null @@ -1,654 +0,0 @@ -package ddp - -import ( - "encoding/json" - "fmt" - "io" - "log" - "sync" - "time" - - "golang.org/x/net/websocket" - "errors" -) - -const ( - DISCONNECTED = iota - DIALING - CONNECTING - CONNECTED -) - -type ConnectionListener interface { - Connected() -} - -type ConnectionNotifier interface { - AddConnectionListener(listener ConnectionListener) -} - -type StatusListener interface { - Status(status int) -} - -type StatusNotifier interface { - AddStatusListener(listener StatusListener) -} - -// Client represents a DDP client connection. The DDP client establish a DDP -// session and acts as a message pump for other tools. -type Client struct { - // HeartbeatInterval is the time between heartbeats to send - HeartbeatInterval time.Duration - // HeartbeatTimeout is the time for a heartbeat ping to timeout - HeartbeatTimeout time.Duration - // ReconnectInterval is the time between reconnections on bad connections - ReconnectInterval time.Duration - - // writeStats controls statistics gathering for current websocket writes. - writeSocketStats *WriterStats - // writeStats controls statistics gathering for overall client writes. - writeStats *WriterStats - // writeLog controls logging for client writes. - writeLog *WriterLogger - // readStats controls statistics gathering for current websocket reads. - readSocketStats *ReaderStats - // readStats controls statistics gathering for overall client reads. - readStats *ReaderStats - // readLog control logging for clietn reads. - readLog *ReaderLogger - // reconnects in the number of reconnections the client has made - reconnects int64 - // pingsIn is the number of pings received from the server - pingsIn int64 - // pingsOut is te number of pings sent by the client - pingsOut int64 - - // session contains the DDP session token (can be used for reconnects and debugging). - session string - // version contains the negotiated DDP protocol version in use. - version string - // serverID the cluster node ID for the server we connected to - serverID string - // ws is the underlying websocket being used. - ws *websocket.Conn - // encoder is a JSON encoder to send outgoing packets to the websocket. - encoder *json.Encoder - // url the URL the websocket is connected to - url string - // origin is the origin for the websocket connection - origin string - // inbox is an incoming message channel - inbox chan map[string]interface{} - // errors is an incoming errors channel - errors chan error - // pingTimer is a timer for sending regular pings to the server - pingTimer *time.Timer - // pings tracks inflight pings based on each ping ID. - pings map[string][]*pingTracker - // calls tracks method invocations that are still in flight - calls map[string]*Call - // subs tracks active subscriptions. Map contains name->args - subs map[string]*Call - // collections contains all the collections currently subscribed - collections map[string]Collection - // connectionStatus is the current connection status of the client - connectionStatus int - // reconnectTimer is the timer tracking reconnections - reconnectTimer *time.Timer - // reconnectLock protects access to reconnection - reconnectLock *sync.Mutex - - // statusListeners will be informed when the connection status of the client changes - statusListeners []StatusListener - // connectionListeners will be informed when a connection to the server is established - connectionListeners []ConnectionListener - - // idManager tracks IDs for ddp messages - idManager -} - -// NewClient creates a default client (using an internal websocket) to the -// provided URL using the origin for the connection. The client will -// automatically connect, upgrade to a websocket, and establish a DDP -// connection session before returning the client. The client will -// automatically and internally handle heartbeats and reconnects. -// -// TBD create an option to use an external websocket (aka htt.Transport) -// TBD create an option to substitute heartbeat and reconnect behavior (aka http.Tranport) -// TBD create an option to hijack the connection (aka http.Hijacker) -// TBD create profiling features (aka net/http/pprof) -func NewClient(url, origin string) *Client { - c := &Client{ - HeartbeatInterval: time.Minute, // Meteor impl default + 10 (we ping last) - HeartbeatTimeout: 15 * time.Second, // Meteor impl default - ReconnectInterval: 5 * time.Second, - collections: map[string]Collection{}, - url: url, - origin: origin, - inbox: make(chan map[string]interface{}, 100), - errors: make(chan error, 100), - pings: map[string][]*pingTracker{}, - calls: map[string]*Call{}, - subs: map[string]*Call{}, - connectionStatus: DISCONNECTED, - reconnectLock: &sync.Mutex{}, - - // Stats - writeSocketStats: NewWriterStats(nil), - writeStats: NewWriterStats(nil), - readSocketStats: NewReaderStats(nil), - readStats: NewReaderStats(nil), - - // Loggers - writeLog: NewWriterTextLogger(nil), - readLog: NewReaderTextLogger(nil), - - idManager: *newidManager(), - } - c.encoder = json.NewEncoder(c.writeStats) - c.SetSocketLogActive(false) - - // We spin off an inbox processing goroutine - go c.inboxManager() - - return c -} - -// Session returns the negotiated session token for the connection. -func (c *Client) Session() string { - return c.session -} - -// Version returns the negotiated protocol version in use by the client. -func (c *Client) Version() string { - return c.version -} - -// AddStatusListener in order to receive status change updates. -func (c *Client) AddStatusListener(listener StatusListener) { - c.statusListeners = append(c.statusListeners, listener) -} - -// AddConnectionListener in order to receive connection updates. -func (c *Client) AddConnectionListener(listener ConnectionListener) { - c.connectionListeners = append(c.connectionListeners, listener) -} - -// status updates all status listeners with the new client status. -func (c *Client) status(status int) { - if c.connectionStatus == status { - return - } - c.connectionStatus = status - for _, listener := range c.statusListeners { - listener.Status(status) - } -} - -// Connect attempts to connect the client to the server. -func (c *Client) Connect() error { - c.status(DIALING) - ws, err := websocket.Dial(c.url, "", c.origin) - if err != nil { - c.Close() - log.Println("Dial error", err) - c.reconnectLater() - return err - } - // Start DDP connection - c.start(ws, NewConnect()) - return nil -} - -// Reconnect attempts to reconnect the client to the server on the existing -// DDP session. -// -// TODO needs a reconnect backoff so we don't trash a down server -// TODO reconnect should not allow more reconnects while a reconnection is already in progress. -func (c *Client) Reconnect() { - func() { - c.reconnectLock.Lock() - defer c.reconnectLock.Unlock() - if c.reconnectTimer != nil { - c.reconnectTimer.Stop() - c.reconnectTimer = nil - } - }() - - c.Close() - - c.reconnects++ - - // Reconnect - c.status(DIALING) - ws, err := websocket.Dial(c.url, "", c.origin) - if err != nil { - c.Close() - log.Println("Dial error", err) - c.reconnectLater() - return - } - - c.start(ws, NewReconnect(c.session)) - - // -------------------------------------------------------------------- - // We resume inflight or ongoing subscriptions - we don't have to wait - // for connection confirmation (messages can be pipelined). - // -------------------------------------------------------------------- - - // Send calls that haven't been confirmed - may not have been sent - // and effects should be idempotent - for _, call := range c.calls { - c.Send(NewMethod(call.ID, call.ServiceMethod, call.Args.([]interface{}))) - } - - // Resend subscriptions and patch up collections - for _, sub := range c.subs { - c.Send(NewSub(sub.ID, sub.ServiceMethod, sub.Args.([]interface{}))) - } -} - -// Subscribe subscribes to data updates. -func (c *Client) Subscribe(subName string, done chan *Call, args ...interface{}) *Call { - - if args == nil { - args = []interface{}{} - } - call := new(Call) - call.ID = c.newID() - call.ServiceMethod = subName - call.Args = args - call.Owner = c - - if done == nil { - done = make(chan *Call, 10) // buffered. - } else { - // If caller passes done != nil, it must arrange that - // done has enough buffer for the number of simultaneous - // RPCs that will be using that channel. If the channel - // is totally unbuffered, it's best not to run at all. - if cap(done) == 0 { - log.Panic("ddp.rpc: done channel is unbuffered") - } - } - call.Done = done - c.subs[call.ID] = call - - // Save this subscription to the client so we can reconnect - subArgs := make([]interface{}, len(args)) - copy(subArgs, args) - - c.Send(NewSub(call.ID, subName, args)) - - return call -} - -// Sub sends a synchronous subscription request to the server. -func (c *Client) Sub(subName string, args ...interface{}) error { - call := <-c.Subscribe(subName, make(chan *Call, 1), args...).Done - return call.Error -} - -// Go invokes the function asynchronously. It returns the Call structure representing -// the invocation. The done channel will signal when the call is complete by returning -// the same Call object. If done is nil, Go will allocate a new channel. -// If non-nil, done must be buffered or Go will deliberately crash. -// -// Go and Call are modeled after the standard `net/rpc` package versions. -func (c *Client) Go(serviceMethod string, done chan *Call, args ...interface{}) *Call { - - if args == nil { - args = []interface{}{} - } - call := new(Call) - call.ID = c.newID() - call.ServiceMethod = serviceMethod - call.Args = args - call.Owner = c - if done == nil { - done = make(chan *Call, 10) // buffered. - } else { - // If caller passes done != nil, it must arrange that - // done has enough buffer for the number of simultaneous - // RPCs that will be using that channel. If the channel - // is totally unbuffered, it's best not to run at all. - if cap(done) == 0 { - log.Panic("ddp.rpc: done channel is unbuffered") - } - } - call.Done = done - c.calls[call.ID] = call - - c.Send(NewMethod(call.ID, serviceMethod, args)) - - return call -} - -// Call invokes the named function, waits for it to complete, and returns its error status. -func (c *Client) Call(serviceMethod string, args ...interface{}) (interface{}, error) { - call := <-c.Go(serviceMethod, make(chan *Call, 1), args...).Done - return call.Reply, call.Error -} - -// Ping sends a heartbeat signal to the server. The Ping doesn't look for -// a response but may trigger the connection to reconnect if the ping timesout. -// This is primarily useful for reviving an unresponsive Client connection. -func (c *Client) Ping() { - c.PingPong(c.newID(), c.HeartbeatTimeout, func(err error) { - if err != nil { - // Is there anything else we should or can do? - c.reconnectLater() - } - }) -} - -// PingPong sends a heartbeat signal to the server and calls the provided -// function when a pong is received. An optional id can be sent to help -// track the responses - or an empty string can be used. It is the -// responsibility of the caller to respond to any errors that may occur. -func (c *Client) PingPong(id string, timeout time.Duration, handler func(error)) { - err := c.Send(NewPing(id)) - if err != nil { - handler(err) - return - } - c.pingsOut++ - pings, ok := c.pings[id] - if !ok { - pings = make([]*pingTracker, 0, 5) - } - tracker := &pingTracker{handler: handler, timeout: timeout, timer: time.AfterFunc(timeout, func() { - handler(fmt.Errorf("ping timeout")) - })} - c.pings[id] = append(pings, tracker) -} - -// Send transmits messages to the server. The msg parameter must be json -// encoder compatible. -func (c *Client) Send(msg interface{}) error { - return c.encoder.Encode(msg) -} - -// Close implements the io.Closer interface. -func (c *Client) Close() { - // Shutdown out all outstanding pings - if c.pingTimer != nil { - c.pingTimer.Stop() - c.pingTimer = nil - } - - // Close websocket - if c.ws != nil { - c.ws.Close() - c.ws = nil - } - for _, collection := range c.collections { - collection.reset() - } - c.status(DISCONNECTED) -} - -// ResetStats resets the statistics for the client. -func (c *Client) ResetStats() { - c.readSocketStats.Reset() - c.readStats.Reset() - c.writeSocketStats.Reset() - c.writeStats.Reset() - c.reconnects = 0 - c.pingsIn = 0 - c.pingsOut = 0 -} - -// Stats returns the read and write statistics of the client. -func (c *Client) Stats() *ClientStats { - return &ClientStats{ - Reads: c.readSocketStats.Snapshot(), - TotalReads: c.readStats.Snapshot(), - Writes: c.writeSocketStats.Snapshot(), - TotalWrites: c.writeStats.Snapshot(), - Reconnects: c.reconnects, - PingsSent: c.pingsOut, - PingsRecv: c.pingsIn, - } -} - -// SocketLogActive returns the current logging status for the socket. -func (c *Client) SocketLogActive() bool { - return c.writeLog.Active -} - -// SetSocketLogActive to true to enable logging of raw socket data. -func (c *Client) SetSocketLogActive(active bool) { - c.writeLog.Active = active - c.readLog.Active = active -} - -// CollectionByName retrieves a collection by it's name. -func (c *Client) CollectionByName(name string) Collection { - collection, ok := c.collections[name] - if !ok { - collection = NewCollection(name) - c.collections[name] = collection - } - return collection -} - -// CollectionStats returns a snapshot of statistics for the currently known collections. -func (c *Client) CollectionStats() []CollectionStats { - stats := make([]CollectionStats, 0, len(c.collections)) - for name, collection := range c.collections { - stats = append(stats, CollectionStats{Name: name, Count: len(collection.FindAll())}) - } - return stats -} - -// start starts a new client connection on the provided websocket -func (c *Client) start(ws *websocket.Conn, connect *Connect) { - - c.status(CONNECTING) - - c.ws = ws - c.writeLog.SetWriter(ws) - c.writeSocketStats = NewWriterStats(c.writeLog) - c.writeStats.SetWriter(c.writeSocketStats) - c.readLog.SetReader(ws) - c.readSocketStats = NewReaderStats(c.readLog) - c.readStats.SetReader(c.readSocketStats) - - // We spin off an inbox stuffing goroutine - go c.inboxWorker(c.readStats) - - c.Send(connect) -} - -// inboxManager pulls messages from the inbox and routes them to appropriate -// handlers. -func (c *Client) inboxManager() { - for { - select { - case msg := <-c.inbox: - // Message! - //log.Println("Got message", msg) - mtype, ok := msg["msg"] - if ok { - switch mtype.(string) { - // Connection management - case "connected": - c.status(CONNECTED) - for _, collection := range c.collections { - collection.init() - } - c.version = "1" // Currently the only version we support - c.session = msg["session"].(string) - // Start automatic heartbeats - c.pingTimer = time.AfterFunc(c.HeartbeatInterval, func() { - c.Ping() - c.pingTimer.Reset(c.HeartbeatInterval) - }) - // Notify connection listeners - for _, listener := range c.connectionListeners { - go listener.Connected() - } - case "failed": - log.Fatalf("IM Failed to connect, we support version 1 but server supports %s", msg["version"]) - - // Heartbeats - case "ping": - // We received a ping - need to respond with a pong - id, ok := msg["id"] - if ok { - c.Send(NewPong(id.(string))) - } else { - c.Send(NewPong("")) - } - c.pingsIn++ - case "pong": - // We received a pong - we can clear the ping tracker and call its handler - id, ok := msg["id"] - var key string - if ok { - key = id.(string) - } - pings, ok := c.pings[key] - if ok && len(pings) > 0 { - ping := pings[0] - pings = pings[1:] - if len(key) == 0 || len(pings) > 0 { - c.pings[key] = pings - } - ping.timer.Stop() - ping.handler(nil) - } - - // Live Data - case "nosub": - log.Println("Subscription returned a nosub error", msg) - // Clear related subscriptions - sub, ok := msg["id"] - if ok { - id := sub.(string) - runningSub := c.subs[id] - - if runningSub != nil { - runningSub.Error = errors.New("Subscription returned a nosub error") - runningSub.done() - delete(c.subs, id) - } - } - case "ready": - // Run 'done' callbacks on all ready subscriptions - subs, ok := msg["subs"] - if ok { - for _, sub := range subs.([]interface{}) { - call, ok := c.subs[sub.(string)] - if ok { - call.done() - } - } - } - case "added": - c.collectionBy(msg).added(msg) - case "changed": - c.collectionBy(msg).changed(msg) - case "removed": - c.collectionBy(msg).removed(msg) - case "addedBefore": - c.collectionBy(msg).addedBefore(msg) - case "movedBefore": - c.collectionBy(msg).movedBefore(msg) - - // RPC - case "result": - id, ok := msg["id"] - if ok { - call := c.calls[id.(string)] - delete(c.calls, id.(string)) - e, ok := msg["error"] - if ok { - txt, _ := json.Marshal(e) - call.Error = fmt.Errorf(string(txt)) - call.Reply = e - } else { - call.Reply = msg["result"] - } - call.done() - } - case "updated": - // We currently don't do anything with updated status - - default: - // Ignore? - log.Println("Server sent unexpected message", msg) - } - } else { - // Current Meteor server sends an undocumented DDP message - // (looks like clustering "hint"). We will register and - // ignore rather than log an error. - serverID, ok := msg["server_id"] - if ok { - switch ID := serverID.(type) { - case string: - c.serverID = ID - default: - log.Println("Server cluster node", serverID) - } - } else { - log.Println("Server sent message with no `msg` field", msg) - } - } - case err := <-c.errors: - log.Println("Websocket error", err) - } - } -} - -func (c *Client) collectionBy(msg map[string]interface{}) Collection { - n, ok := msg["collection"] - if !ok { - return NewMockCollection() - } - switch name := n.(type) { - case string: - return c.CollectionByName(name) - default: - return NewMockCollection() - } -} - -// inboxWorker pulls messages from a websocket, decodes JSON packets, and -// stuffs them into a message channel. -func (c *Client) inboxWorker(ws io.Reader) { - dec := json.NewDecoder(ws) - for { - var event interface{} - - if err := dec.Decode(&event); err == io.EOF { - break - } else if err != nil { - c.errors <- err - } - if c.pingTimer != nil { - c.pingTimer.Reset(c.HeartbeatInterval) - } - if event == nil { - log.Println("Inbox worker found nil event. May be due to broken websocket. Reconnecting.") - break - } else { - c.inbox <- event.(map[string]interface{}) - } - } - - c.reconnectLater() -} - -// reconnectLater schedules a reconnect for later. We need to make sure that we don't -// block, and that we don't reconnect more frequently than once every c.ReconnectInterval -func (c *Client) reconnectLater() { - c.Close() - c.reconnectLock.Lock() - defer c.reconnectLock.Unlock() - if c.reconnectTimer == nil { - c.reconnectTimer = time.AfterFunc(c.ReconnectInterval, c.Reconnect) - } -} diff --git a/vendor/github.com/gopackage/ddp/ddp_collection.go b/vendor/github.com/gopackage/ddp/ddp_collection.go deleted file mode 100644 index f417e68a..00000000 --- a/vendor/github.com/gopackage/ddp/ddp_collection.go +++ /dev/null @@ -1,245 +0,0 @@ -package ddp - -// ---------------------------------------------------------------------- -// Collection -// ---------------------------------------------------------------------- - -type Update map[string]interface{} -type UpdateListener interface { - CollectionUpdate(collection, operation, id string, doc Update) -} - -// Collection managed cached collection data sent from the server in a -// livedata subscription. -// -// It would be great to build an entire mongo compatible local store (minimongo) -type Collection interface { - - // FindOne queries objects and returns the first match. - FindOne(id string) Update - // FindAll returns a map of all items in the cache - this is a hack - // until we have time to build out a real minimongo interface. - FindAll() map[string]Update - // AddUpdateListener adds a channel that receives update messages. - AddUpdateListener(listener UpdateListener) - - // livedata updates - added(msg Update) - changed(msg Update) - removed(msg Update) - addedBefore(msg Update) - movedBefore(msg Update) - init() // init informs the collection that the connection to the server has begun/resumed - reset() // reset informs the collection that the connection to the server has been lost -} - -// NewMockCollection creates an empty collection that does nothing. -func NewMockCollection() Collection { - return &MockCache{} -} - -// NewCollection creates a new collection - always KeyCache. -func NewCollection(name string) Collection { - return &KeyCache{name, make(map[string]Update), nil} -} - -// KeyCache caches items keyed on unique ID. -type KeyCache struct { - // The name of the collection - Name string - // items contains collection items by ID - items map[string]Update - // listeners contains all the listeners that should be notified of collection updates. - listeners []UpdateListener - // TODO(badslug): do we need to protect from multiple threads -} - -func (c *KeyCache) added(msg Update) { - id, fields := parseUpdate(msg) - if fields != nil { - c.items[id] = fields - c.notify("create", id, fields) - } -} - -func (c *KeyCache) changed(msg Update) { - id, fields := parseUpdate(msg) - if fields != nil { - item, ok := c.items[id] - if ok { - for key, value := range fields { - item[key] = value - } - c.items[id] = item - c.notify("update", id, item) - } - } -} - -func (c *KeyCache) removed(msg Update) { - id, _ := parseUpdate(msg) - if len(id) > 0 { - delete(c.items, id) - c.notify("remove", id, nil) - } -} - -func (c *KeyCache) addedBefore(msg Update) { - // for keyed cache, ordered commands are a noop -} - -func (c *KeyCache) movedBefore(msg Update) { - // for keyed cache, ordered commands are a noop -} - -// init prepares the collection for data updates (called when a new connection is -// made or a connection/session is resumed). -func (c *KeyCache) init() { - // TODO start to patch up the current data with fresh server state -} - -func (c *KeyCache) reset() { - // TODO we should mark the collection but maintain it's contents and then - // patch up the current contents with the new contents when we receive them. - //c.items = nil - c.notify("reset", "", nil) -} - -// notify sends a Update to all UpdateListener's which should never block. -func (c *KeyCache) notify(operation, id string, doc Update) { - for _, listener := range c.listeners { - listener.CollectionUpdate(c.Name, operation, id, doc) - } -} - -// FindOne returns the item with matching id. -func (c *KeyCache) FindOne(id string) Update { - return c.items[id] -} - -// FindAll returns a dump of all items in the collection -func (c *KeyCache) FindAll() map[string]Update { - return c.items -} - -// AddUpdateListener adds a listener for changes on a collection. -func (c *KeyCache) AddUpdateListener(listener UpdateListener) { - c.listeners = append(c.listeners, listener) -} - -// OrderedCache caches items based on list order. -// This is a placeholder, currently not implemented as the Meteor server -// does not transmit ordered collections over DDP yet. -type OrderedCache struct { - // ranks contains ordered collection items for ordered collections - items []interface{} -} - -func (c *OrderedCache) added(msg Update) { - // for ordered cache, key commands are a noop -} - -func (c *OrderedCache) changed(msg Update) { - -} - -func (c *OrderedCache) removed(msg Update) { - -} - -func (c *OrderedCache) addedBefore(msg Update) { - -} - -func (c *OrderedCache) movedBefore(msg Update) { - -} - -func (c *OrderedCache) init() { - -} - -func (c *OrderedCache) reset() { - -} - -// FindOne returns the item with matching id. -func (c *OrderedCache) FindOne(id string) Update { - return nil -} - -// FindAll returns a dump of all items in the collection -func (c *OrderedCache) FindAll() map[string]Update { - return map[string]Update{} -} - -// AddUpdateListener does nothing. -func (c *OrderedCache) AddUpdateListener(ch UpdateListener) { -} - -// MockCache implements the Collection interface but does nothing with the data. -type MockCache struct { -} - -func (c *MockCache) added(msg Update) { - -} - -func (c *MockCache) changed(msg Update) { - -} - -func (c *MockCache) removed(msg Update) { - -} - -func (c *MockCache) addedBefore(msg Update) { - -} - -func (c *MockCache) movedBefore(msg Update) { - -} - -func (c *MockCache) init() { - -} - -func (c *MockCache) reset() { - -} - -// FindOne returns the item with matching id. -func (c *MockCache) FindOne(id string) Update { - return nil -} - -// FindAll returns a dump of all items in the collection -func (c *MockCache) FindAll() map[string]Update { - return map[string]Update{} -} - -// AddUpdateListener does nothing. -func (c *MockCache) AddUpdateListener(ch UpdateListener) { -} - -// parseUpdate returns the ID and fields from a DDP Update document. -func parseUpdate(up Update) (ID string, Fields Update) { - key, ok := up["id"] - if ok { - switch id := key.(type) { - case string: - updates, ok := up["fields"] - if ok { - switch fields := updates.(type) { - case map[string]interface{}: - return id, Update(fields) - default: - // Don't know what to do... - } - } - return id, nil - } - } - return "", nil -} diff --git a/vendor/github.com/gopackage/ddp/ddp_ejson.go b/vendor/github.com/gopackage/ddp/ddp_ejson.go deleted file mode 100644 index a3e1fec0..00000000 --- a/vendor/github.com/gopackage/ddp/ddp_ejson.go +++ /dev/null @@ -1,217 +0,0 @@ -package ddp - -import ( - "crypto/sha256" - "encoding/hex" - "io" - "strings" - "time" -) - -// ---------------------------------------------------------------------- -// EJSON document interface -// ---------------------------------------------------------------------- -// https://github.com/meteor/meteor/blob/devel/packages/ddp/DDP.md#appendix-ejson - -// Doc provides hides the complexity of ejson documents. -type Doc struct { - root interface{} -} - -// NewDoc creates a new document from a generic json parsed document. -func NewDoc(in interface{}) *Doc { - doc := &Doc{in} - return doc -} - -// Map locates a map[string]interface{} - json object - at a path -// or returns nil if not found. -func (d *Doc) Map(path string) map[string]interface{} { - item := d.Item(path) - if item != nil { - switch m := item.(type) { - case map[string]interface{}: - return m - default: - return nil - } - } - return nil -} - -// Array locates an []interface{} - json array - at a path -// or returns nil if not found. -func (d *Doc) Array(path string) []interface{} { - item := d.Item(path) - if item != nil { - switch m := item.(type) { - case []interface{}: - return m - default: - return nil - } - } - return nil -} - -// StringArray locates an []string - json array of strings - at a path -// or returns nil if not found. The string array will contain all string values -// in the array and skip any non-string entries. -func (d *Doc) StringArray(path string) []string { - item := d.Item(path) - if item != nil { - switch m := item.(type) { - case []interface{}: - items := []string{} - for _, item := range m { - switch val := item.(type) { - case string: - items = append(items, val) - } - } - return items - case []string: - return m - default: - return nil - } - } - return nil -} - -// String returns a string value located at the path or an empty string if not found. -func (d *Doc) String(path string) string { - item := d.Item(path) - if item != nil { - switch m := item.(type) { - case string: - return m - default: - return "" - } - } - return "" -} - -// Bool returns a boolean value located at the path or false if not found. -func (d *Doc) Bool(path string) bool { - item := d.Item(path) - if item != nil { - switch m := item.(type) { - case bool: - return m - default: - return false - } - } - return false -} - -// Float returns a float64 value located at the path or zero if not found. -func (d *Doc) Float(path string) float64 { - item := d.Item(path) - if item != nil { - switch m := item.(type) { - case float64: - return m - default: - return 0 - } - } - return 0 -} - -// Time returns a time value located at the path or nil if not found. -func (d *Doc) Time(path string) time.Time { - ticks := d.Float(path + ".$date") - var t time.Time - if ticks > 0 { - sec := int64(ticks / 1000) - t = time.Unix(int64(sec), 0) - } - return t -} - -// Item locates a "raw" item at the provided path, returning -// the item found or nil if not found. -func (d *Doc) Item(path string) interface{} { - item := d.root - steps := strings.Split(path, ".") - for _, step := range steps { - // This is an intermediate step - we must be in a map - switch m := item.(type) { - case map[string]interface{}: - item = m[step] - case Update: - item = m[step] - default: - return nil - } - } - return item -} - -// Set a value for a path. Intermediate items are created as necessary. -func (d *Doc) Set(path string, value interface{}) { - item := d.root - steps := strings.Split(path, ".") - last := steps[len(steps)-1] - steps = steps[:len(steps)-1] - for _, step := range steps { - // This is an intermediate step - we must be in a map - switch m := item.(type) { - case map[string]interface{}: - item = m[step] - if item == nil { - item = map[string]interface{}{} - m[step] = item - } - default: - return - } - } - // Item is now the last map so we just set the value - switch m := item.(type) { - case map[string]interface{}: - m[last] = value - } -} - -// Accounts password login support -type Login struct { - User *User `json:"user"` - Password *Password `json:"password"` -} - -func NewEmailLogin(email, pass string) *Login { - return &Login{User: &User{Email: email}, Password: NewPassword(pass)} -} - -func NewUsernameLogin(user, pass string) *Login { - return &Login{User: &User{Username: user}, Password: NewPassword(pass)} -} - -type LoginResume struct { - Token string `json:"resume"` -} - -func NewLoginResume(token string) *LoginResume { - return &LoginResume{Token: token} -} - -type User struct { - Email string `json:"email,omitempty"` - Username string `json:"username,omitempty"` -} - -type Password struct { - Digest string `json:"digest"` - Algorithm string `json:"algorithm"` -} - -func NewPassword(pass string) *Password { - sha := sha256.New() - io.WriteString(sha, pass) - digest := sha.Sum(nil) - return &Password{Digest: hex.EncodeToString(digest), Algorithm: "sha-256"} -} diff --git a/vendor/github.com/gopackage/ddp/ddp_messages.go b/vendor/github.com/gopackage/ddp/ddp_messages.go deleted file mode 100644 index 68c9eab4..00000000 --- a/vendor/github.com/gopackage/ddp/ddp_messages.go +++ /dev/null @@ -1,82 +0,0 @@ -package ddp - -// ------------------------------------------------------------ -// DDP Messages -// -// Go structs representing DDP raw messages ready for JSON -// encoding. -// ------------------------------------------------------------ - -// Message contains the common fields that all DDP messages use. -type Message struct { - Type string `json:"msg"` - ID string `json:"id,omitempty"` -} - -// Connect represents a DDP connect message. -type Connect struct { - Message - Version string `json:"version"` - Support []string `json:"support"` - Session string `json:"session,omitempty"` -} - -// NewConnect creates a new connect message -func NewConnect() *Connect { - return &Connect{Message: Message{Type: "connect"}, Version: "1", Support: []string{"1"}} -} - -// NewReconnect creates a new connect message with a session ID to resume. -func NewReconnect(session string) *Connect { - c := NewConnect() - c.Session = session - return c -} - -// Ping represents a DDP ping message. -type Ping Message - -// NewPing creates a new ping message with optional ID. -func NewPing(id string) *Ping { - return &Ping{Type: "ping", ID: id} -} - -// Pong represents a DDP pong message. -type Pong Message - -// NewPong creates a new pong message with optional ID. -func NewPong(id string) *Pong { - return &Pong{Type: "pong", ID: id} -} - -// Method is used to send a remote procedure call to the server. -type Method struct { - Message - ServiceMethod string `json:"method"` - Args []interface{} `json:"params"` -} - -// NewMethod creates a new method invocation object. -func NewMethod(id, serviceMethod string, args []interface{}) *Method { - return &Method{ - Message: Message{Type: "method", ID: id}, - ServiceMethod: serviceMethod, - Args: args, - } -} - -// Sub is used to send a subscription request to the server. -type Sub struct { - Message - SubName string `json:"name"` - Args []interface{} `json:"params"` -} - -// NewSub creates a new sub object. -func NewSub(id, subName string, args []interface{}) *Sub { - return &Sub{ - Message: Message{Type: "sub", ID: id}, - SubName: subName, - Args: args, - } -} diff --git a/vendor/github.com/gopackage/ddp/ddp_stats.go b/vendor/github.com/gopackage/ddp/ddp_stats.go deleted file mode 100644 index 1546b547..00000000 --- a/vendor/github.com/gopackage/ddp/ddp_stats.go +++ /dev/null @@ -1,321 +0,0 @@ -package ddp - -import ( - "encoding/hex" - "fmt" - "io" - "log" - "os" - "sync" - "time" -) - -// Gather statistics about a DDP connection. - -// --------------------------------------------------------- -// io utilities -// -// This is generic - should be moved into a stand alone lib -// --------------------------------------------------------- - -// ReaderProxy provides common tooling for structs that manage an io.Reader. -type ReaderProxy struct { - reader io.Reader -} - -// NewReaderProxy creates a new proxy for the provided reader. -func NewReaderProxy(reader io.Reader) *ReaderProxy { - return &ReaderProxy{reader} -} - -// SetReader sets the reader on the proxy. -func (r *ReaderProxy) SetReader(reader io.Reader) { - r.reader = reader -} - -// WriterProxy provides common tooling for structs that manage an io.Writer. -type WriterProxy struct { - writer io.Writer -} - -// NewWriterProxy creates a new proxy for the provided writer. -func NewWriterProxy(writer io.Writer) *WriterProxy { - return &WriterProxy{writer} -} - -// SetWriter sets the writer on the proxy. -func (w *WriterProxy) SetWriter(writer io.Writer) { - w.writer = writer -} - -// Logging data types -const ( - DataByte = iota // data is raw []byte - DataText // data is string data -) - -// Logger logs data from i/o sources. -type Logger struct { - // Active is true if the logger should be logging reads - Active bool - // Truncate is >0 to indicate the number of characters to truncate output - Truncate int - - logger *log.Logger - dtype int -} - -// NewLogger creates a new i/o logger. -func NewLogger(logger *log.Logger, active bool, dataType int, truncate int) Logger { - return Logger{logger: logger, Active: active, dtype: dataType, Truncate: truncate} -} - -// Log logs the current i/o operation and returns the read and error for -// easy call chaining. -func (l *Logger) Log(p []byte, n int, err error) (int, error) { - if l.Active && err == nil { - limit := n - truncated := false - if l.Truncate > 0 && l.Truncate < limit { - limit = l.Truncate - truncated = true - } - switch l.dtype { - case DataText: - if truncated { - l.logger.Printf("[%d] %s...", n, string(p[:limit])) - } else { - l.logger.Printf("[%d] %s", n, string(p[:limit])) - } - case DataByte: - fallthrough - default: - l.logger.Println(hex.Dump(p[:limit])) - } - } - return n, err -} - -// ReaderLogger logs data from any io.Reader. -// ReaderLogger wraps a Reader and passes data to the actual data consumer. -type ReaderLogger struct { - Logger - ReaderProxy -} - -// NewReaderDataLogger creates an active binary data logger with a default -// log.Logger and a '->' prefix. -func NewReaderDataLogger(reader io.Reader) *ReaderLogger { - logger := log.New(os.Stdout, "<- ", log.LstdFlags) - return NewReaderLogger(reader, logger, true, DataByte, 0) -} - -// NewReaderTextLogger creates an active binary data logger with a default -// log.Logger and a '->' prefix. -func NewReaderTextLogger(reader io.Reader) *ReaderLogger { - logger := log.New(os.Stdout, "<- ", log.LstdFlags) - return NewReaderLogger(reader, logger, true, DataText, 80) -} - -// NewReaderLogger creates a Reader logger for the provided parameters. -func NewReaderLogger(reader io.Reader, logger *log.Logger, active bool, dataType int, truncate int) *ReaderLogger { - return &ReaderLogger{ReaderProxy: *NewReaderProxy(reader), Logger: NewLogger(logger, active, dataType, truncate)} -} - -// Read logs the read bytes and passes them to the wrapped reader. -func (r *ReaderLogger) Read(p []byte) (int, error) { - n, err := r.reader.Read(p) - return r.Log(p, n, err) -} - -// WriterLogger logs data from any io.Writer. -// WriterLogger wraps a Writer and passes data to the actual data producer. -type WriterLogger struct { - Logger - WriterProxy -} - -// NewWriterDataLogger creates an active binary data logger with a default -// log.Logger and a '->' prefix. -func NewWriterDataLogger(writer io.Writer) *WriterLogger { - logger := log.New(os.Stdout, "-> ", log.LstdFlags) - return NewWriterLogger(writer, logger, true, DataByte, 0) -} - -// NewWriterTextLogger creates an active binary data logger with a default -// log.Logger and a '->' prefix. -func NewWriterTextLogger(writer io.Writer) *WriterLogger { - logger := log.New(os.Stdout, "-> ", log.LstdFlags) - return NewWriterLogger(writer, logger, true, DataText, 80) -} - -// NewWriterLogger creates a Reader logger for the provided parameters. -func NewWriterLogger(writer io.Writer, logger *log.Logger, active bool, dataType int, truncate int) *WriterLogger { - return &WriterLogger{WriterProxy: *NewWriterProxy(writer), Logger: NewLogger(logger, active, dataType, truncate)} -} - -// Write logs the written bytes and passes them to the wrapped writer. -func (w *WriterLogger) Write(p []byte) (int, error) { - if w.writer != nil { - n, err := w.writer.Write(p) - return w.Log(p, n, err) - } - return 0, nil -} - -// Stats tracks statistics for i/o operations. Stats are produced from a -// of a running stats agent. -type Stats struct { - // Bytes is the total number of bytes transferred. - Bytes int64 - // Ops is the total number of i/o operations performed. - Ops int64 - // Errors is the total number of i/o errors encountered. - Errors int64 - // Runtime is the duration that stats have been gathered. - Runtime time.Duration -} - -// ClientStats displays combined statistics for the Client. -type ClientStats struct { - // Reads provides statistics on the raw i/o network reads for the current connection. - Reads *Stats - // Reads provides statistics on the raw i/o network reads for the all client connections. - TotalReads *Stats - // Writes provides statistics on the raw i/o network writes for the current connection. - Writes *Stats - // Writes provides statistics on the raw i/o network writes for all the client connections. - TotalWrites *Stats - // Reconnects is the number of reconnections the client has made. - Reconnects int64 - // PingsSent is the number of pings sent by the client - PingsSent int64 - // PingsRecv is the number of pings received by the client - PingsRecv int64 -} - -// String produces a compact string representation of the client stats. -func (stats *ClientStats) String() string { - i := stats.Reads - ti := stats.TotalReads - o := stats.Writes - to := stats.TotalWrites - totalRun := (ti.Runtime * 1000000) / 1000000 - run := (i.Runtime * 1000000) / 1000000 - return fmt.Sprintf("bytes: %d/%d##%d/%d ops: %d/%d##%d/%d err: %d/%d##%d/%d reconnects: %d pings: %d/%d uptime: %v##%v", - i.Bytes, o.Bytes, - ti.Bytes, to.Bytes, - i.Ops, o.Ops, - ti.Ops, to.Ops, - i.Errors, o.Errors, - ti.Errors, to.Errors, - stats.Reconnects, - stats.PingsRecv, stats.PingsSent, - run, totalRun) -} - -// CollectionStats combines statistics about a collection. -type CollectionStats struct { - Name string // Name of the collection - Count int // Count is the total number of documents in the collection -} - -// String produces a compact string representation of the collection stat. -func (s *CollectionStats) String() string { - return fmt.Sprintf("%s[%d]", s.Name, s.Count) -} - -// StatsTracker provides the basic tooling for tracking i/o stats. -type StatsTracker struct { - bytes int64 - ops int64 - errors int64 - start time.Time - lock *sync.Mutex -} - -// NewStatsTracker create a new stats tracker with start time set to now. -func NewStatsTracker() *StatsTracker { - return &StatsTracker{start: time.Now(), lock: new(sync.Mutex)} -} - -// Op records an i/o operation. The parameters are passed through to -// allow easy chaining. -func (t *StatsTracker) Op(n int, err error) (int, error) { - t.lock.Lock() - defer t.lock.Unlock() - t.ops++ - if err == nil { - t.bytes += int64(n) - } else { - if err == io.EOF { - // I don't think we should log EOF stats as an error - } else { - t.errors++ - } - } - - return n, err -} - -// Snapshot takes a snapshot of the current reader statistics. -func (t *StatsTracker) Snapshot() *Stats { - t.lock.Lock() - defer t.lock.Unlock() - return t.snap() -} - -// Reset sets all of the stats to initial values. -func (t *StatsTracker) Reset() *Stats { - t.lock.Lock() - defer t.lock.Unlock() - - stats := t.snap() - t.bytes = 0 - t.ops = 0 - t.errors = 0 - t.start = time.Now() - - return stats -} - -func (t *StatsTracker) snap() *Stats { - return &Stats{Bytes: t.bytes, Ops: t.ops, Errors: t.errors, Runtime: time.Since(t.start)} -} - -// ReaderStats tracks statistics on any io.Reader. -// ReaderStats wraps a Reader and passes data to the actual data consumer. -type ReaderStats struct { - StatsTracker - ReaderProxy -} - -// NewReaderStats creates a ReaderStats object for the provided reader. -func NewReaderStats(reader io.Reader) *ReaderStats { - return &ReaderStats{ReaderProxy: *NewReaderProxy(reader), StatsTracker: *NewStatsTracker()} -} - -// Read passes through a read collecting statistics and logging activity. -func (r *ReaderStats) Read(p []byte) (int, error) { - return r.Op(r.reader.Read(p)) -} - -// WriterStats tracks statistics on any io.Writer. -// WriterStats wraps a Writer and passes data to the actual data producer. -type WriterStats struct { - StatsTracker - WriterProxy -} - -// NewWriterStats creates a WriterStats object for the provided writer. -func NewWriterStats(writer io.Writer) *WriterStats { - return &WriterStats{WriterProxy: *NewWriterProxy(writer), StatsTracker: *NewStatsTracker()} -} - -// Write passes through a write collecting statistics. -func (w *WriterStats) Write(p []byte) (int, error) { - if w.writer != nil { - return w.Op(w.writer.Write(p)) - } - return 0, nil -} diff --git a/vendor/github.com/gopackage/ddp/doc.go b/vendor/github.com/gopackage/ddp/doc.go new file mode 100644 index 00000000..97f1b63e --- /dev/null +++ b/vendor/github.com/gopackage/ddp/doc.go @@ -0,0 +1,6 @@ +// Package ddp implements the MeteorJS DDP protocol over websockets. Fallback +// to long polling is NOT supported (and is not planned on ever being supported +// by this library). We will try to model the library after `net/http` - right +// now the library is bare bones and doesn't provide the plug-ability of http. +// However, that's the goal for the package eventually. +package ddp diff --git a/vendor/github.com/gopackage/ddp/messages.go b/vendor/github.com/gopackage/ddp/messages.go new file mode 100644 index 00000000..fc127cee --- /dev/null +++ b/vendor/github.com/gopackage/ddp/messages.go @@ -0,0 +1,128 @@ +package ddp + +import ( + "crypto/sha256" + "encoding/hex" + "io" +) + +// ------------------------------------------------------------ +// DDP Messages +// +// Go structs representing common DDP raw messages ready for JSON +// encoding. +// ------------------------------------------------------------ + +// Message contains the common fields that all DDP messages use. +type Message struct { + Type string `json:"msg"` + ID string `json:"id,omitempty"` +} + +// Connect represents a DDP connect message. +type Connect struct { + Message + Version string `json:"version"` + Support []string `json:"support"` + Session string `json:"session,omitempty"` +} + +// NewConnect creates a new connect message +func NewConnect() *Connect { + return &Connect{Message: Message{Type: "connect"}, Version: "1", Support: []string{"1"}} +} + +// NewReconnect creates a new connect message with a session ID to resume. +func NewReconnect(session string) *Connect { + c := NewConnect() + c.Session = session + return c +} + +// Ping represents a DDP ping message. +type Ping Message + +// NewPing creates a new ping message with optional ID. +func NewPing(id string) *Ping { + return &Ping{Type: "ping", ID: id} +} + +// Pong represents a DDP pong message. +type Pong Message + +// NewPong creates a new pong message with optional ID. +func NewPong(id string) *Pong { + return &Pong{Type: "pong", ID: id} +} + +// Method is used to send a remote procedure call to the server. +type Method struct { + Message + ServiceMethod string `json:"method"` + Args []interface{} `json:"params"` +} + +// NewMethod creates a new method invocation object. +func NewMethod(id, serviceMethod string, args []interface{}) *Method { + return &Method{ + Message: Message{Type: "method", ID: id}, + ServiceMethod: serviceMethod, + Args: args, + } +} + +// Sub is used to send a subscription request to the server. +type Sub struct { + Message + SubName string `json:"name"` + Args []interface{} `json:"params"` +} + +// NewSub creates a new sub object. +func NewSub(id, subName string, args []interface{}) *Sub { + return &Sub{ + Message: Message{Type: "sub", ID: id}, + SubName: subName, + Args: args, + } +} + + +// Login provides a Meteor.Accounts password login support +type Login struct { + User *User `json:"user"` + Password *Password `json:"password"` +} + +func NewEmailLogin(email, pass string) *Login { + return &Login{User: &User{Email: email}, Password: NewPassword(pass)} +} + +func NewUsernameLogin(user, pass string) *Login { + return &Login{User: &User{Username: user}, Password: NewPassword(pass)} +} + +type LoginResume struct { + Token string `json:"resume"` +} + +func NewLoginResume(token string) *LoginResume { + return &LoginResume{Token: token} +} + +type User struct { + Email string `json:"email,omitempty"` + Username string `json:"username,omitempty"` +} + +type Password struct { + Digest string `json:"digest"` + Algorithm string `json:"algorithm"` +} + +func NewPassword(pass string) *Password { + sha := sha256.New() + io.WriteString(sha, pass) + digest := sha.Sum(nil) + return &Password{Digest: hex.EncodeToString(digest), Algorithm: "sha-256"} +} diff --git a/vendor/github.com/gopackage/ddp/stats.go b/vendor/github.com/gopackage/ddp/stats.go new file mode 100644 index 00000000..b2548098 --- /dev/null +++ b/vendor/github.com/gopackage/ddp/stats.go @@ -0,0 +1,170 @@ +package ddp + +import ( + "fmt" + "io" + "sync" + "time" +) + +// Gather statistics about a DDP connection. + +// Stats tracks statistics for i/o operations. +type Stats struct { + // Bytes is the total number of bytes transferred. + Bytes int64 + // Ops is the total number of i/o operations performed. + Ops int64 + // Errors is the total number of i/o errors encountered. + Errors int64 + // Runtime is the duration that stats have been gathered. + Runtime time.Duration +} + +// ClientStats displays combined statistics for the Client. +type ClientStats struct { + // Reads provides statistics on the raw i/o network reads for the current connection. + Reads *Stats + // Reads provides statistics on the raw i/o network reads for the all client connections. + TotalReads *Stats + // Writes provides statistics on the raw i/o network writes for the current connection. + Writes *Stats + // Writes provides statistics on the raw i/o network writes for all the client connections. + TotalWrites *Stats + // Reconnects is the number of reconnections the client has made. + Reconnects int64 + // PingsSent is the number of pings sent by the client + PingsSent int64 + // PingsRecv is the number of pings received by the client + PingsRecv int64 +} + +// String produces a compact string representation of the client stats. +func (stats *ClientStats) String() string { + i := stats.Reads + ti := stats.TotalReads + o := stats.Writes + to := stats.TotalWrites + totalRun := (ti.Runtime * 1000000) / 1000000 + run := (i.Runtime * 1000000) / 1000000 + return fmt.Sprintf("bytes: %d/%d##%d/%d ops: %d/%d##%d/%d err: %d/%d##%d/%d reconnects: %d pings: %d/%d uptime: %v##%v", + i.Bytes, o.Bytes, + ti.Bytes, to.Bytes, + i.Ops, o.Ops, + ti.Ops, to.Ops, + i.Errors, o.Errors, + ti.Errors, to.Errors, + stats.Reconnects, + stats.PingsRecv, stats.PingsSent, + run, totalRun) +} + +// CollectionStats combines statistics about a collection. +type CollectionStats struct { + Name string // Name of the collection + Count int // Count is the total number of documents in the collection +} + +// String produces a compact string representation of the collection stat. +func (s *CollectionStats) String() string { + return fmt.Sprintf("%s[%d]", s.Name, s.Count) +} + +// StatsTracker provides the basic tooling for tracking i/o stats. +type StatsTracker struct { + bytes int64 + ops int64 + errors int64 + start time.Time + lock sync.Mutex +} + +// NewStatsTracker create a new tracker with start time set to now. +func NewStatsTracker() *StatsTracker { + return &StatsTracker{start: time.Now()} +} + +// Op records an i/o operation. The parameters are passed through to +// allow easy chaining. +func (t *StatsTracker) Op(n int, err error) (int, error) { + t.lock.Lock() + defer t.lock.Unlock() + t.ops++ + if err == nil { + t.bytes += int64(n) + } else { + if err == io.EOF { + // I don't think we should log EOF stats as an error + } else { + t.errors++ + } + } + + return n, err +} + +// Snapshot takes a snapshot of the current Reader statistics. +func (t *StatsTracker) Snapshot() *Stats { + t.lock.Lock() + defer t.lock.Unlock() + return t.snap() +} + +// Reset all stats to initial values. +func (t *StatsTracker) Reset() *Stats { + t.lock.Lock() + defer t.lock.Unlock() + + stats := t.snap() + t.bytes = 0 + t.ops = 0 + t.errors = 0 + t.start = time.Now() + + return stats +} + +func (t *StatsTracker) snap() *Stats { + return &Stats{Bytes: t.bytes, Ops: t.ops, Errors: t.errors, Runtime: time.Since(t.start)} +} + +// ReaderStats tracks statistics on any io.Reader. +// ReaderStats wraps a Reader and passes data to the actual data consumer. +type ReaderStats struct { + StatsTracker + Reader io.Reader +} + +// NewReaderStats creates a ReaderStats object for the provided Reader. +func NewReaderStats(reader io.Reader) *ReaderStats { + r := &ReaderStats{Reader: reader} + r.Reset() + return r +} + +// Read passes through a read collecting statistics and logging activity. +func (r *ReaderStats) Read(p []byte) (int, error) { + return r.Op(r.Reader.Read(p)) +} + +// WriterStats tracks statistics on any io.Writer. +// WriterStats wraps a Writer and passes data to the actual data producer. +type WriterStats struct { + StatsTracker + Writer io.Writer +} + +// NewWriterStats creates a WriterStats object for the provided Writer. +func NewWriterStats(writer io.Writer) *WriterStats { + w := &WriterStats{Writer: writer} + w.Reset() + return w +} + +// Write collects Writer statistics. +func (w *WriterStats) Write(p []byte) (int, error) { + if w.Writer != nil { + return w.Op(w.Writer.Write(p)) + } + return 0, nil +} diff --git a/vendor/github.com/gopackage/ddp/time.go b/vendor/github.com/gopackage/ddp/time.go new file mode 100644 index 00000000..584f9ce0 --- /dev/null +++ b/vendor/github.com/gopackage/ddp/time.go @@ -0,0 +1,49 @@ +package ddp + +import ( + "encoding/json" + "io" + "strconv" + "time" +) + +// utcOffset in milliseconds for the current local time (east of UTC). +var utcOffset int64 + +func init() { + _, offsetSeconds := time.Now().Zone() + utcOffset = int64(offsetSeconds * 1000) +} + +// Time is an alias for time.Time with custom json marshalling implementations to support ejson. +type Time struct { + time.Time +} + +// UnixMilli creates a new Time from the given unix millis but in UTC (as opposed to time.UnixMilli which returns +// time in the local time zone). This supports the proper loading of times from EJSON $date objects. +func UnixMilli(i int64) Time { + return Time{Time: time.UnixMilli(i - utcOffset)} +} + +func (t *Time) UnmarshalJSON(b []byte) error { + var data map[string]float64 + err := json.Unmarshal(b, &data) + if err != nil { + return err + } + val, ok := data["$date"] + if !ok { + return io.ErrUnexpectedEOF + } + // The time MUST be UTC but time.UnixMilli uses local time. + // We see what time it is in local time and calculate the offset to UTC + *t = UnixMilli(int64(val)) + + return nil +} + +func (t Time) MarshalJSON() ([]byte, error) { + return []byte("{\"$date\":" + strconv.FormatInt(t.UnixMilli(), 10) + "}"), nil +} + diff --git a/vendor/github.com/gopackage/ddp/utils.go b/vendor/github.com/gopackage/ddp/utils.go new file mode 100644 index 00000000..7ff16a20 --- /dev/null +++ b/vendor/github.com/gopackage/ddp/utils.go @@ -0,0 +1,77 @@ +package ddp + +import ( + "fmt" + "sync" + "time" + + "github.com/apex/log" +) + +// Contains common utility types. + +// ------------------------------------------------------------------- + +// KeyManager provides simple incrementing IDs for ddp messages. +type KeyManager struct { + // nextID is the next ID for API calls + nextID uint64 + // idMutex is a mutex to protect ID updates + idMutex *sync.Mutex +} + +// NewKeyManager creates a new instance and sets up resources. +func NewKeyManager() *KeyManager { + return &KeyManager{idMutex: new(sync.Mutex)} +} + +// Next issues a new ID for use in calls. +func (id *KeyManager) Next() string { + id.idMutex.Lock() + next := id.nextID + id.nextID++ + id.idMutex.Unlock() + return fmt.Sprintf("%x", next) +} + +// ------------------------------------------------------------------- + +// PingTracker tracks in-flight pings. +type PingTracker struct { + handler func(error) + timeout time.Duration + timer *time.Timer +} + +// ------------------------------------------------------------------- + +// Call represents an active RPC call. +type Call struct { + ID string // The uuid for this method call + ServiceMethod string // The name of the service and method to call. + Args interface{} // The argument to the function (*struct). + Reply interface{} // The reply from the function (*struct). + Error error // After completion, the error status. + Done chan *Call // Strobes when call is complete. + Owner *Client // Client that owns the method call +} + +// done removes the call from any owners and strobes the done channel with itself. +func (call *Call) done() { + delete(call.Owner.calls, call.ID) + select { + case call.Done <- call: + // ok + default: + // We don't want to block here. It is the caller's responsibility to make + // sure the channel has enough buffer space. See comment in Go(). + log.Debug("rpc: discarding Call reply due to insufficient Done chan capacity") + } +} + +// IgnoreErr logs an error if it occurs and ignores it. +func IgnoreErr(err error, msg string) { + if err != nil { + log.WithError(err).Debug(msg) + } +} \ No newline at end of file -- cgit v1.2.3