summaryrefslogtreecommitdiffstats
path: root/vendor/github.com/gopackage/ddp/ddp_client.go
diff options
context:
space:
mode:
authorWim <wim@42.be>2021-10-17 00:47:22 +0200
committerGitHub <noreply@github.com>2021-10-17 00:47:22 +0200
commit4dd8bae5c91fa4aef09d865d8fef1acd84f90925 (patch)
treeffad9b242daccaf8c86d1c1fbd59032302bd3be9 /vendor/github.com/gopackage/ddp/ddp_client.go
parent7ae45c42e712bd0e66c101f3f714c05aa1dc2104 (diff)
downloadmatterbridge-msglm-4dd8bae5c91fa4aef09d865d8fef1acd84f90925.tar.gz
matterbridge-msglm-4dd8bae5c91fa4aef09d865d8fef1acd84f90925.tar.bz2
matterbridge-msglm-4dd8bae5c91fa4aef09d865d8fef1acd84f90925.zip
Update dependencies (#1610)
* Update dependencies * Update module to go 1.17
Diffstat (limited to 'vendor/github.com/gopackage/ddp/ddp_client.go')
-rw-r--r--vendor/github.com/gopackage/ddp/ddp_client.go654
1 files changed, 0 insertions, 654 deletions
diff --git a/vendor/github.com/gopackage/ddp/ddp_client.go b/vendor/github.com/gopackage/ddp/ddp_client.go
deleted file mode 100644
index 8d6323b7..00000000
--- a/vendor/github.com/gopackage/ddp/ddp_client.go
+++ /dev/null
@@ -1,654 +0,0 @@
-package ddp
-
-import (
- "encoding/json"
- "fmt"
- "io"
- "log"
- "sync"
- "time"
-
- "golang.org/x/net/websocket"
- "errors"
-)
-
-const (
- DISCONNECTED = iota
- DIALING
- CONNECTING
- CONNECTED
-)
-
-type ConnectionListener interface {
- Connected()
-}
-
-type ConnectionNotifier interface {
- AddConnectionListener(listener ConnectionListener)
-}
-
-type StatusListener interface {
- Status(status int)
-}
-
-type StatusNotifier interface {
- AddStatusListener(listener StatusListener)
-}
-
-// Client represents a DDP client connection. The DDP client establish a DDP
-// session and acts as a message pump for other tools.
-type Client struct {
- // HeartbeatInterval is the time between heartbeats to send
- HeartbeatInterval time.Duration
- // HeartbeatTimeout is the time for a heartbeat ping to timeout
- HeartbeatTimeout time.Duration
- // ReconnectInterval is the time between reconnections on bad connections
- ReconnectInterval time.Duration
-
- // writeStats controls statistics gathering for current websocket writes.
- writeSocketStats *WriterStats
- // writeStats controls statistics gathering for overall client writes.
- writeStats *WriterStats
- // writeLog controls logging for client writes.
- writeLog *WriterLogger
- // readStats controls statistics gathering for current websocket reads.
- readSocketStats *ReaderStats
- // readStats controls statistics gathering for overall client reads.
- readStats *ReaderStats
- // readLog control logging for clietn reads.
- readLog *ReaderLogger
- // reconnects in the number of reconnections the client has made
- reconnects int64
- // pingsIn is the number of pings received from the server
- pingsIn int64
- // pingsOut is te number of pings sent by the client
- pingsOut int64
-
- // session contains the DDP session token (can be used for reconnects and debugging).
- session string
- // version contains the negotiated DDP protocol version in use.
- version string
- // serverID the cluster node ID for the server we connected to
- serverID string
- // ws is the underlying websocket being used.
- ws *websocket.Conn
- // encoder is a JSON encoder to send outgoing packets to the websocket.
- encoder *json.Encoder
- // url the URL the websocket is connected to
- url string
- // origin is the origin for the websocket connection
- origin string
- // inbox is an incoming message channel
- inbox chan map[string]interface{}
- // errors is an incoming errors channel
- errors chan error
- // pingTimer is a timer for sending regular pings to the server
- pingTimer *time.Timer
- // pings tracks inflight pings based on each ping ID.
- pings map[string][]*pingTracker
- // calls tracks method invocations that are still in flight
- calls map[string]*Call
- // subs tracks active subscriptions. Map contains name->args
- subs map[string]*Call
- // collections contains all the collections currently subscribed
- collections map[string]Collection
- // connectionStatus is the current connection status of the client
- connectionStatus int
- // reconnectTimer is the timer tracking reconnections
- reconnectTimer *time.Timer
- // reconnectLock protects access to reconnection
- reconnectLock *sync.Mutex
-
- // statusListeners will be informed when the connection status of the client changes
- statusListeners []StatusListener
- // connectionListeners will be informed when a connection to the server is established
- connectionListeners []ConnectionListener
-
- // idManager tracks IDs for ddp messages
- idManager
-}
-
-// NewClient creates a default client (using an internal websocket) to the
-// provided URL using the origin for the connection. The client will
-// automatically connect, upgrade to a websocket, and establish a DDP
-// connection session before returning the client. The client will
-// automatically and internally handle heartbeats and reconnects.
-//
-// TBD create an option to use an external websocket (aka htt.Transport)
-// TBD create an option to substitute heartbeat and reconnect behavior (aka http.Tranport)
-// TBD create an option to hijack the connection (aka http.Hijacker)
-// TBD create profiling features (aka net/http/pprof)
-func NewClient(url, origin string) *Client {
- c := &Client{
- HeartbeatInterval: time.Minute, // Meteor impl default + 10 (we ping last)
- HeartbeatTimeout: 15 * time.Second, // Meteor impl default
- ReconnectInterval: 5 * time.Second,
- collections: map[string]Collection{},
- url: url,
- origin: origin,
- inbox: make(chan map[string]interface{}, 100),
- errors: make(chan error, 100),
- pings: map[string][]*pingTracker{},
- calls: map[string]*Call{},
- subs: map[string]*Call{},
- connectionStatus: DISCONNECTED,
- reconnectLock: &sync.Mutex{},
-
- // Stats
- writeSocketStats: NewWriterStats(nil),
- writeStats: NewWriterStats(nil),
- readSocketStats: NewReaderStats(nil),
- readStats: NewReaderStats(nil),
-
- // Loggers
- writeLog: NewWriterTextLogger(nil),
- readLog: NewReaderTextLogger(nil),
-
- idManager: *newidManager(),
- }
- c.encoder = json.NewEncoder(c.writeStats)
- c.SetSocketLogActive(false)
-
- // We spin off an inbox processing goroutine
- go c.inboxManager()
-
- return c
-}
-
-// Session returns the negotiated session token for the connection.
-func (c *Client) Session() string {
- return c.session
-}
-
-// Version returns the negotiated protocol version in use by the client.
-func (c *Client) Version() string {
- return c.version
-}
-
-// AddStatusListener in order to receive status change updates.
-func (c *Client) AddStatusListener(listener StatusListener) {
- c.statusListeners = append(c.statusListeners, listener)
-}
-
-// AddConnectionListener in order to receive connection updates.
-func (c *Client) AddConnectionListener(listener ConnectionListener) {
- c.connectionListeners = append(c.connectionListeners, listener)
-}
-
-// status updates all status listeners with the new client status.
-func (c *Client) status(status int) {
- if c.connectionStatus == status {
- return
- }
- c.connectionStatus = status
- for _, listener := range c.statusListeners {
- listener.Status(status)
- }
-}
-
-// Connect attempts to connect the client to the server.
-func (c *Client) Connect() error {
- c.status(DIALING)
- ws, err := websocket.Dial(c.url, "", c.origin)
- if err != nil {
- c.Close()
- log.Println("Dial error", err)
- c.reconnectLater()
- return err
- }
- // Start DDP connection
- c.start(ws, NewConnect())
- return nil
-}
-
-// Reconnect attempts to reconnect the client to the server on the existing
-// DDP session.
-//
-// TODO needs a reconnect backoff so we don't trash a down server
-// TODO reconnect should not allow more reconnects while a reconnection is already in progress.
-func (c *Client) Reconnect() {
- func() {
- c.reconnectLock.Lock()
- defer c.reconnectLock.Unlock()
- if c.reconnectTimer != nil {
- c.reconnectTimer.Stop()
- c.reconnectTimer = nil
- }
- }()
-
- c.Close()
-
- c.reconnects++
-
- // Reconnect
- c.status(DIALING)
- ws, err := websocket.Dial(c.url, "", c.origin)
- if err != nil {
- c.Close()
- log.Println("Dial error", err)
- c.reconnectLater()
- return
- }
-
- c.start(ws, NewReconnect(c.session))
-
- // --------------------------------------------------------------------
- // We resume inflight or ongoing subscriptions - we don't have to wait
- // for connection confirmation (messages can be pipelined).
- // --------------------------------------------------------------------
-
- // Send calls that haven't been confirmed - may not have been sent
- // and effects should be idempotent
- for _, call := range c.calls {
- c.Send(NewMethod(call.ID, call.ServiceMethod, call.Args.([]interface{})))
- }
-
- // Resend subscriptions and patch up collections
- for _, sub := range c.subs {
- c.Send(NewSub(sub.ID, sub.ServiceMethod, sub.Args.([]interface{})))
- }
-}
-
-// Subscribe subscribes to data updates.
-func (c *Client) Subscribe(subName string, done chan *Call, args ...interface{}) *Call {
-
- if args == nil {
- args = []interface{}{}
- }
- call := new(Call)
- call.ID = c.newID()
- call.ServiceMethod = subName
- call.Args = args
- call.Owner = c
-
- if done == nil {
- done = make(chan *Call, 10) // buffered.
- } else {
- // If caller passes done != nil, it must arrange that
- // done has enough buffer for the number of simultaneous
- // RPCs that will be using that channel. If the channel
- // is totally unbuffered, it's best not to run at all.
- if cap(done) == 0 {
- log.Panic("ddp.rpc: done channel is unbuffered")
- }
- }
- call.Done = done
- c.subs[call.ID] = call
-
- // Save this subscription to the client so we can reconnect
- subArgs := make([]interface{}, len(args))
- copy(subArgs, args)
-
- c.Send(NewSub(call.ID, subName, args))
-
- return call
-}
-
-// Sub sends a synchronous subscription request to the server.
-func (c *Client) Sub(subName string, args ...interface{}) error {
- call := <-c.Subscribe(subName, make(chan *Call, 1), args...).Done
- return call.Error
-}
-
-// Go invokes the function asynchronously. It returns the Call structure representing
-// the invocation. The done channel will signal when the call is complete by returning
-// the same Call object. If done is nil, Go will allocate a new channel.
-// If non-nil, done must be buffered or Go will deliberately crash.
-//
-// Go and Call are modeled after the standard `net/rpc` package versions.
-func (c *Client) Go(serviceMethod string, done chan *Call, args ...interface{}) *Call {
-
- if args == nil {
- args = []interface{}{}
- }
- call := new(Call)
- call.ID = c.newID()
- call.ServiceMethod = serviceMethod
- call.Args = args
- call.Owner = c
- if done == nil {
- done = make(chan *Call, 10) // buffered.
- } else {
- // If caller passes done != nil, it must arrange that
- // done has enough buffer for the number of simultaneous
- // RPCs that will be using that channel. If the channel
- // is totally unbuffered, it's best not to run at all.
- if cap(done) == 0 {
- log.Panic("ddp.rpc: done channel is unbuffered")
- }
- }
- call.Done = done
- c.calls[call.ID] = call
-
- c.Send(NewMethod(call.ID, serviceMethod, args))
-
- return call
-}
-
-// Call invokes the named function, waits for it to complete, and returns its error status.
-func (c *Client) Call(serviceMethod string, args ...interface{}) (interface{}, error) {
- call := <-c.Go(serviceMethod, make(chan *Call, 1), args...).Done
- return call.Reply, call.Error
-}
-
-// Ping sends a heartbeat signal to the server. The Ping doesn't look for
-// a response but may trigger the connection to reconnect if the ping timesout.
-// This is primarily useful for reviving an unresponsive Client connection.
-func (c *Client) Ping() {
- c.PingPong(c.newID(), c.HeartbeatTimeout, func(err error) {
- if err != nil {
- // Is there anything else we should or can do?
- c.reconnectLater()
- }
- })
-}
-
-// PingPong sends a heartbeat signal to the server and calls the provided
-// function when a pong is received. An optional id can be sent to help
-// track the responses - or an empty string can be used. It is the
-// responsibility of the caller to respond to any errors that may occur.
-func (c *Client) PingPong(id string, timeout time.Duration, handler func(error)) {
- err := c.Send(NewPing(id))
- if err != nil {
- handler(err)
- return
- }
- c.pingsOut++
- pings, ok := c.pings[id]
- if !ok {
- pings = make([]*pingTracker, 0, 5)
- }
- tracker := &pingTracker{handler: handler, timeout: timeout, timer: time.AfterFunc(timeout, func() {
- handler(fmt.Errorf("ping timeout"))
- })}
- c.pings[id] = append(pings, tracker)
-}
-
-// Send transmits messages to the server. The msg parameter must be json
-// encoder compatible.
-func (c *Client) Send(msg interface{}) error {
- return c.encoder.Encode(msg)
-}
-
-// Close implements the io.Closer interface.
-func (c *Client) Close() {
- // Shutdown out all outstanding pings
- if c.pingTimer != nil {
- c.pingTimer.Stop()
- c.pingTimer = nil
- }
-
- // Close websocket
- if c.ws != nil {
- c.ws.Close()
- c.ws = nil
- }
- for _, collection := range c.collections {
- collection.reset()
- }
- c.status(DISCONNECTED)
-}
-
-// ResetStats resets the statistics for the client.
-func (c *Client) ResetStats() {
- c.readSocketStats.Reset()
- c.readStats.Reset()
- c.writeSocketStats.Reset()
- c.writeStats.Reset()
- c.reconnects = 0
- c.pingsIn = 0
- c.pingsOut = 0
-}
-
-// Stats returns the read and write statistics of the client.
-func (c *Client) Stats() *ClientStats {
- return &ClientStats{
- Reads: c.readSocketStats.Snapshot(),
- TotalReads: c.readStats.Snapshot(),
- Writes: c.writeSocketStats.Snapshot(),
- TotalWrites: c.writeStats.Snapshot(),
- Reconnects: c.reconnects,
- PingsSent: c.pingsOut,
- PingsRecv: c.pingsIn,
- }
-}
-
-// SocketLogActive returns the current logging status for the socket.
-func (c *Client) SocketLogActive() bool {
- return c.writeLog.Active
-}
-
-// SetSocketLogActive to true to enable logging of raw socket data.
-func (c *Client) SetSocketLogActive(active bool) {
- c.writeLog.Active = active
- c.readLog.Active = active
-}
-
-// CollectionByName retrieves a collection by it's name.
-func (c *Client) CollectionByName(name string) Collection {
- collection, ok := c.collections[name]
- if !ok {
- collection = NewCollection(name)
- c.collections[name] = collection
- }
- return collection
-}
-
-// CollectionStats returns a snapshot of statistics for the currently known collections.
-func (c *Client) CollectionStats() []CollectionStats {
- stats := make([]CollectionStats, 0, len(c.collections))
- for name, collection := range c.collections {
- stats = append(stats, CollectionStats{Name: name, Count: len(collection.FindAll())})
- }
- return stats
-}
-
-// start starts a new client connection on the provided websocket
-func (c *Client) start(ws *websocket.Conn, connect *Connect) {
-
- c.status(CONNECTING)
-
- c.ws = ws
- c.writeLog.SetWriter(ws)
- c.writeSocketStats = NewWriterStats(c.writeLog)
- c.writeStats.SetWriter(c.writeSocketStats)
- c.readLog.SetReader(ws)
- c.readSocketStats = NewReaderStats(c.readLog)
- c.readStats.SetReader(c.readSocketStats)
-
- // We spin off an inbox stuffing goroutine
- go c.inboxWorker(c.readStats)
-
- c.Send(connect)
-}
-
-// inboxManager pulls messages from the inbox and routes them to appropriate
-// handlers.
-func (c *Client) inboxManager() {
- for {
- select {
- case msg := <-c.inbox:
- // Message!
- //log.Println("Got message", msg)
- mtype, ok := msg["msg"]
- if ok {
- switch mtype.(string) {
- // Connection management
- case "connected":
- c.status(CONNECTED)
- for _, collection := range c.collections {
- collection.init()
- }
- c.version = "1" // Currently the only version we support
- c.session = msg["session"].(string)
- // Start automatic heartbeats
- c.pingTimer = time.AfterFunc(c.HeartbeatInterval, func() {
- c.Ping()
- c.pingTimer.Reset(c.HeartbeatInterval)
- })
- // Notify connection listeners
- for _, listener := range c.connectionListeners {
- go listener.Connected()
- }
- case "failed":
- log.Fatalf("IM Failed to connect, we support version 1 but server supports %s", msg["version"])
-
- // Heartbeats
- case "ping":
- // We received a ping - need to respond with a pong
- id, ok := msg["id"]
- if ok {
- c.Send(NewPong(id.(string)))
- } else {
- c.Send(NewPong(""))
- }
- c.pingsIn++
- case "pong":
- // We received a pong - we can clear the ping tracker and call its handler
- id, ok := msg["id"]
- var key string
- if ok {
- key = id.(string)
- }
- pings, ok := c.pings[key]
- if ok && len(pings) > 0 {
- ping := pings[0]
- pings = pings[1:]
- if len(key) == 0 || len(pings) > 0 {
- c.pings[key] = pings
- }
- ping.timer.Stop()
- ping.handler(nil)
- }
-
- // Live Data
- case "nosub":
- log.Println("Subscription returned a nosub error", msg)
- // Clear related subscriptions
- sub, ok := msg["id"]
- if ok {
- id := sub.(string)
- runningSub := c.subs[id]
-
- if runningSub != nil {
- runningSub.Error = errors.New("Subscription returned a nosub error")
- runningSub.done()
- delete(c.subs, id)
- }
- }
- case "ready":
- // Run 'done' callbacks on all ready subscriptions
- subs, ok := msg["subs"]
- if ok {
- for _, sub := range subs.([]interface{}) {
- call, ok := c.subs[sub.(string)]
- if ok {
- call.done()
- }
- }
- }
- case "added":
- c.collectionBy(msg).added(msg)
- case "changed":
- c.collectionBy(msg).changed(msg)
- case "removed":
- c.collectionBy(msg).removed(msg)
- case "addedBefore":
- c.collectionBy(msg).addedBefore(msg)
- case "movedBefore":
- c.collectionBy(msg).movedBefore(msg)
-
- // RPC
- case "result":
- id, ok := msg["id"]
- if ok {
- call := c.calls[id.(string)]
- delete(c.calls, id.(string))
- e, ok := msg["error"]
- if ok {
- txt, _ := json.Marshal(e)
- call.Error = fmt.Errorf(string(txt))
- call.Reply = e
- } else {
- call.Reply = msg["result"]
- }
- call.done()
- }
- case "updated":
- // We currently don't do anything with updated status
-
- default:
- // Ignore?
- log.Println("Server sent unexpected message", msg)
- }
- } else {
- // Current Meteor server sends an undocumented DDP message
- // (looks like clustering "hint"). We will register and
- // ignore rather than log an error.
- serverID, ok := msg["server_id"]
- if ok {
- switch ID := serverID.(type) {
- case string:
- c.serverID = ID
- default:
- log.Println("Server cluster node", serverID)
- }
- } else {
- log.Println("Server sent message with no `msg` field", msg)
- }
- }
- case err := <-c.errors:
- log.Println("Websocket error", err)
- }
- }
-}
-
-func (c *Client) collectionBy(msg map[string]interface{}) Collection {
- n, ok := msg["collection"]
- if !ok {
- return NewMockCollection()
- }
- switch name := n.(type) {
- case string:
- return c.CollectionByName(name)
- default:
- return NewMockCollection()
- }
-}
-
-// inboxWorker pulls messages from a websocket, decodes JSON packets, and
-// stuffs them into a message channel.
-func (c *Client) inboxWorker(ws io.Reader) {
- dec := json.NewDecoder(ws)
- for {
- var event interface{}
-
- if err := dec.Decode(&event); err == io.EOF {
- break
- } else if err != nil {
- c.errors <- err
- }
- if c.pingTimer != nil {
- c.pingTimer.Reset(c.HeartbeatInterval)
- }
- if event == nil {
- log.Println("Inbox worker found nil event. May be due to broken websocket. Reconnecting.")
- break
- } else {
- c.inbox <- event.(map[string]interface{})
- }
- }
-
- c.reconnectLater()
-}
-
-// reconnectLater schedules a reconnect for later. We need to make sure that we don't
-// block, and that we don't reconnect more frequently than once every c.ReconnectInterval
-func (c *Client) reconnectLater() {
- c.Close()
- c.reconnectLock.Lock()
- defer c.reconnectLock.Unlock()
- if c.reconnectTimer == nil {
- c.reconnectTimer = time.AfterFunc(c.ReconnectInterval, c.Reconnect)
- }
-}