summaryrefslogtreecommitdiffstats
path: root/vendor/github.com/gopackage/ddp/ddp_client.go
blob: 8d6323b75955a3b41c41b5f8cb0357895a70d7d7 (plain) (blame)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
548
549
550
551
552
553
554
555
556
557
558
559
560
561
562
563
564
565
566
567
568
569
570
571
572
573
574
575
576
577
578
579
580
581
582
583
584
585
586
587
588
589
590
591
592
593
594
595
596
597
598
599
600
601
602
603
604
605
606
607
608
609
610
611
612
613
614
615
616
617
618
619
620
621
622
623
624
625
626
627
628
629
630
631
632
633
634
635
636
637
638
639
640
641
642
643
644
645
646
647
648
649
650
651
652
653
654
package ddp

import (
	"encoding/json"
	"fmt"
	"io"
	"log"
	"sync"
	"time"

	"golang.org/x/net/websocket"
	"errors"
)

// Connection states reported through Client.status and StatusListener.Status.
const (
	// DISCONNECTED is the initial state set by NewClient and the state set by Close.
	DISCONNECTED = iota
	// DIALING is set just before the websocket is dialed.
	DIALING
	// CONNECTING is set once a websocket exists and the DDP connect message is about to be sent.
	CONNECTING
	// CONNECTED is set when the server's "connected" message is processed.
	CONNECTED
)

// ConnectionListener is notified when a DDP connection is established.
// The client invokes Connected in a new goroutine for each registered
// listener after it processes the server's "connected" message.
type ConnectionListener interface {
	Connected()
}

// ConnectionNotifier is implemented by types that accept ConnectionListener
// registrations. *Client satisfies it through AddConnectionListener.
type ConnectionNotifier interface {
	AddConnectionListener(listener ConnectionListener)
}

// StatusListener is notified when the client's connection status changes.
// Status receives the new value, one of DISCONNECTED, DIALING, CONNECTING
// or CONNECTED.
type StatusListener interface {
	Status(status int)
}

// StatusNotifier is implemented by types that accept StatusListener
// registrations. *Client satisfies it through AddStatusListener.
type StatusNotifier interface {
	AddStatusListener(listener StatusListener)
}

// Client represents a DDP client connection. The DDP client establishes a DDP
// session and acts as a message pump for other tools.
type Client struct {
	// HeartbeatInterval is the time between heartbeats to send
	HeartbeatInterval time.Duration
	// HeartbeatTimeout is the time for a heartbeat ping to timeout
	HeartbeatTimeout time.Duration
	// ReconnectInterval is the time between reconnections on bad connections
	ReconnectInterval time.Duration

	// writeSocketStats controls statistics gathering for current websocket writes.
	// It is replaced with a fresh instance each time start wires a new websocket.
	writeSocketStats *WriterStats
	// writeStats controls statistics gathering for overall client writes.
	writeStats *WriterStats
	// writeLog controls logging for client writes.
	writeLog *WriterLogger
	// readSocketStats controls statistics gathering for current websocket reads.
	// It is replaced with a fresh instance each time start wires a new websocket.
	readSocketStats *ReaderStats
	// readStats controls statistics gathering for overall client reads.
	readStats *ReaderStats
	// readLog controls logging for client reads.
	readLog *ReaderLogger
	// reconnects is the number of reconnections the client has made
	reconnects int64
	// pingsIn is the number of pings received from the server
	pingsIn int64
	// pingsOut is the number of pings sent by the client
	pingsOut int64

	// session contains the DDP session token (can be used for reconnects and debugging).
	session string
	// version contains the negotiated DDP protocol version in use.
	version string
	// serverID the cluster node ID for the server we connected to
	serverID string
	// ws is the underlying websocket being used. It is nil while disconnected.
	ws *websocket.Conn
	// encoder is a JSON encoder to send outgoing packets to the websocket.
	encoder *json.Encoder
	// url the URL the websocket is connected to
	url string
	// origin is the origin for the websocket connection
	origin string
	// inbox is an incoming message channel
	inbox chan map[string]interface{}
	// errors is an incoming errors channel
	errors chan error
	// pingTimer is a timer for sending regular pings to the server.
	// It is nil until a connection is confirmed and after Close.
	pingTimer *time.Timer
	// pings tracks inflight pings, keyed by ping ID. Pings sharing an ID are
	// queued in the order they were sent.
	pings map[string][]*pingTracker
	// calls tracks method invocations that are still in flight, keyed by call ID
	calls map[string]*Call
	// subs tracks active subscriptions, keyed by the subscription's call ID
	subs map[string]*Call
	// collections contains all the collections currently subscribed, keyed by name
	collections map[string]Collection
	// connectionStatus is the current connection status of the client
	connectionStatus int
	// reconnectTimer is the timer tracking reconnections; nil when none is scheduled
	reconnectTimer *time.Timer
	// reconnectLock protects access to reconnectTimer
	reconnectLock *sync.Mutex

	// statusListeners will be informed when the connection status of the client changes
	statusListeners []StatusListener
	// connectionListeners will be informed when a connection to the server is established
	connectionListeners []ConnectionListener

	// idManager tracks IDs for ddp messages
	idManager
}

// NewClient builds a client for the provided websocket URL and origin with
// default heartbeat and reconnect timings. The returned client is in the
// DISCONNECTED state: no network activity happens here, and Connect must be
// called to dial the server. A background goroutine that routes incoming
// messages is started before the client is returned.
//
// TBD create an option to use an external websocket (aka http.Transport)
// TBD create an option to substitute heartbeat and reconnect behavior (aka http.Transport)
// TBD create an option to hijack the connection (aka http.Hijacker)
// TBD create profiling features (aka net/http/pprof)
func NewClient(url, origin string) *Client {
	c := &Client{
		url:    url,
		origin: origin,

		HeartbeatInterval: time.Minute,      // Meteor impl default + 10 (we ping last)
		HeartbeatTimeout:  15 * time.Second, // Meteor impl default
		ReconnectInterval: 5 * time.Second,

		connectionStatus: DISCONNECTED,
		reconnectLock:    &sync.Mutex{},

		inbox:  make(chan map[string]interface{}, 100),
		errors: make(chan error, 100),

		collections: map[string]Collection{},
		pings:       map[string][]*pingTracker{},
		calls:       map[string]*Call{},
		subs:        map[string]*Call{},
	}

	// Statistics collectors; they have no underlying stream until start
	// attaches a websocket.
	c.writeSocketStats = NewWriterStats(nil)
	c.writeStats = NewWriterStats(nil)
	c.readSocketStats = NewReaderStats(nil)
	c.readStats = NewReaderStats(nil)

	// Raw socket loggers, switched off below.
	c.writeLog = NewWriterTextLogger(nil)
	c.readLog = NewReaderTextLogger(nil)

	c.idManager = *newidManager()

	// Every outgoing message is JSON-encoded into the write statistics wrapper.
	c.encoder = json.NewEncoder(c.writeStats)
	c.SetSocketLogActive(false)

	// The inbox manager runs for the lifetime of the client.
	go c.inboxManager()

	return c
}

// Session returns the DDP session token for the connection. The token is
// recorded when the server's "connected" message is processed and is the
// empty string before that.
func (c *Client) Session() string {
	return c.session
}

// Version returns the DDP protocol version in use by the client. It is set
// to "1" when the server's "connected" message is processed and is the empty
// string before that.
func (c *Client) Version() string {
	return c.version
}

// AddStatusListener registers listener to receive status change updates.
// Listeners are called in registration order each time the connection status
// changes.
//
// NOTE(review): the listener slice is appended to without locking —
// presumably listeners should be registered before Connect; confirm.
func (c *Client) AddStatusListener(listener StatusListener) {
	c.statusListeners = append(c.statusListeners, listener)
}

// AddConnectionListener registers listener to receive connection updates.
// Each registered listener's Connected method is invoked in its own goroutine
// when the server confirms a connection.
//
// NOTE(review): the listener slice is appended to without locking —
// presumably listeners should be registered before Connect; confirm.
func (c *Client) AddConnectionListener(listener ConnectionListener) {
	c.connectionListeners = append(c.connectionListeners, listener)
}

// status records a new connection status and notifies every status listener,
// in registration order. Nothing happens when the status is unchanged.
func (c *Client) status(status int) {
	if status == c.connectionStatus {
		return
	}
	c.connectionStatus = status

	listeners := c.statusListeners
	for i := range listeners {
		listeners[i].Status(status)
	}
}

// Connect dials the server and starts a new DDP session on the resulting
// websocket.
//
// When the dial fails the client is closed, the error is logged, a reconnect
// attempt is scheduled and the dial error is returned. A nil return only
// means the websocket was opened and the connect message was sent; the
// session is confirmed later by the server's "connected" message.
func (c *Client) Connect() error {
	c.status(DIALING)

	conn, dialErr := websocket.Dial(c.url, "", c.origin)
	if dialErr != nil {
		c.Close()
		log.Println("Dial error", dialErr)
		c.reconnectLater()
		return dialErr
	}

	// Start DDP connection
	c.start(conn, NewConnect())
	return nil
}

// Reconnect tears down the current connection and dials the server again,
// asking to resume the existing DDP session. Any pending scheduled reconnect
// is cancelled first. When the dial fails, another reconnect is scheduled
// and the method returns.
//
// After the new websocket is started, every unconfirmed method call and every
// tracked subscription is sent again.
//
// TODO needs a reconnect backoff so we don't trash a down server
// TODO reconnect should not allow more reconnects while a reconnection is already in progress.
func (c *Client) Reconnect() {
	// Cancel a scheduled reconnect, if any, so it cannot fire while this
	// attempt is running.
	c.reconnectLock.Lock()
	if scheduled := c.reconnectTimer; scheduled != nil {
		scheduled.Stop()
		c.reconnectTimer = nil
	}
	c.reconnectLock.Unlock()

	c.Close()
	c.reconnects++

	c.status(DIALING)
	conn, dialErr := websocket.Dial(c.url, "", c.origin)
	if dialErr != nil {
		c.Close()
		log.Println("Dial error", dialErr)
		c.reconnectLater()
		return
	}

	c.start(conn, NewReconnect(c.session))

	// The resends below do not wait for the server to confirm the
	// connection; DDP messages can be pipelined.

	// Method calls without a result may never have reached the server, so
	// they are sent again (their effects should be idempotent).
	for _, pending := range c.calls {
		callArgs := pending.Args.([]interface{})
		c.Send(NewMethod(pending.ID, pending.ServiceMethod, callArgs))
	}

	// Subscriptions are re-issued so collections get repopulated.
	for _, active := range c.subs {
		subArgs := active.Args.([]interface{})
		c.Send(NewSub(active.ID, active.ServiceMethod, subArgs))
	}
}

// Subscribe asynchronously subscribes to the named publication and returns
// the Call that tracks it. The Call is delivered on its Done channel when the
// server reports the subscription ready, or with Error set when the server
// answers with nosub.
//
// If done is nil a buffered channel is allocated. A non-nil done must be
// buffered; an unbuffered channel causes a deliberate panic.
//
// The subscription is remembered by the client and re-sent on reconnect. The
// arguments are copied, so later changes to the caller's slice do not alter
// what is re-sent.
func (c *Client) Subscribe(subName string, done chan *Call, args ...interface{}) *Call {
	// Keep a private copy of the arguments: the call outlives this function
	// (it is replayed on reconnect) and must not alias the caller's slice.
	// make never returns nil, so nil args still encode as an empty JSON array.
	subArgs := make([]interface{}, len(args))
	copy(subArgs, args)

	call := new(Call)
	call.ID = c.newID()
	call.ServiceMethod = subName
	call.Args = subArgs
	call.Owner = c

	if done == nil {
		done = make(chan *Call, 10) // buffered.
	} else if cap(done) == 0 {
		// If caller passes done != nil, it must arrange that
		// done has enough buffer for the number of simultaneous
		// RPCs that will be using that channel.  If the channel
		// is totally unbuffered, it's best not to run at all.
		log.Panic("ddp.rpc: done channel is unbuffered")
	}
	call.Done = done

	// Save this subscription to the client so we can reconnect
	c.subs[call.ID] = call

	c.Send(NewSub(call.ID, subName, subArgs))

	return call
}

// Sub subscribes to the named publication and blocks until the subscription
// completes, returning the error recorded on the call (nil on success).
func (c *Client) Sub(subName string, args ...interface{}) error {
	pending := c.Subscribe(subName, make(chan *Call, 1), args...)
	finished := <-pending.Done
	return finished.Error
}

// Go invokes the named server method asynchronously and returns the Call
// structure representing the invocation. The same Call is delivered on the
// done channel once a result arrives. If done is nil, Go allocates a buffered
// channel; a non-nil done must be buffered or Go will deliberately panic.
//
// The call is tracked by the client until its result arrives, so it can be
// re-sent on reconnect.
//
// Go and Call are modeled after the standard `net/rpc` package versions.
func (c *Client) Go(serviceMethod string, done chan *Call, args ...interface{}) *Call {
	// A nil argument list is replaced by an empty one.
	if args == nil {
		args = []interface{}{}
	}

	call := &Call{
		ID:            c.newID(),
		ServiceMethod: serviceMethod,
		Args:          args,
		Owner:         c,
	}

	switch {
	case done == nil:
		done = make(chan *Call, 10) // buffered.
	case cap(done) == 0:
		// The caller must provide enough buffer for the number of
		// simultaneous RPCs sharing the channel. A totally unbuffered
		// channel is rejected outright.
		log.Panic("ddp.rpc: done channel is unbuffered")
	}
	call.Done = done

	c.calls[call.ID] = call
	c.Send(NewMethod(call.ID, serviceMethod, args))

	return call
}

// Call invokes the named server method, blocks until its result arrives, and
// returns the reply together with the call's error status.
func (c *Client) Call(serviceMethod string, args ...interface{}) (interface{}, error) {
	pending := c.Go(serviceMethod, make(chan *Call, 1), args...)
	finished := <-pending.Done
	return finished.Reply, finished.Error
}

// Ping sends a heartbeat to the server under a freshly generated ID, using
// HeartbeatTimeout as the deadline. A successful pong is ignored; a send
// failure or a timeout schedules a reconnect. This is primarily useful for
// reviving an unresponsive Client connection.
func (c *Client) Ping() {
	onResult := func(err error) {
		if err == nil {
			return
		}
		// Is there anything else we should or can do?
		c.reconnectLater()
	}
	c.PingPong(c.newID(), c.HeartbeatTimeout, onResult)
}

// PingPong sends a heartbeat to the server and arranges for handler to be
// called with nil when the matching pong arrives, or with an error when the
// ping cannot be sent or no pong arrives within timeout. The id helps track
// responses; an empty string may be used. Pings sharing an id are answered in
// the order they were sent. Responding to errors is the caller's
// responsibility.
func (c *Client) PingPong(id string, timeout time.Duration, handler func(error)) {
	if err := c.Send(NewPing(id)); err != nil {
		handler(err)
		return
	}
	c.pingsOut++

	inflight, found := c.pings[id]
	if !found {
		inflight = make([]*pingTracker, 0, 5)
	}

	// The timer reports a timeout unless the pong handling stops it first.
	expiry := time.AfterFunc(timeout, func() {
		handler(fmt.Errorf("ping timeout"))
	})
	c.pings[id] = append(inflight, &pingTracker{
		handler: handler,
		timeout: timeout,
		timer:   expiry,
	})
}

// Send transmits messages to the server. The msg parameter must be json
// encoder compatible. The message is written through the client's JSON
// encoder and the encoder's error, if any, is returned.
func (c *Client) Send(msg interface{}) error {
	return c.encoder.Encode(msg)
}

// Close shuts the connection down: it stops the heartbeat timer, closes the
// websocket if one is open, resets every known collection and moves the
// client to the DISCONNECTED status. It is safe to call on a client that is
// already closed.
//
// Close returns nothing, so Client does not satisfy io.Closer. Timers of
// pings that are still in flight are left running.
func (c *Client) Close() {
	// Stop automatic heartbeats.
	if heartbeat := c.pingTimer; heartbeat != nil {
		heartbeat.Stop()
		c.pingTimer = nil
	}

	// Drop the websocket; an error from closing it is ignored.
	if conn := c.ws; conn != nil {
		conn.Close()
		c.ws = nil
	}

	for _, known := range c.collections {
		known.reset()
	}

	c.status(DISCONNECTED)
}

// ResetStats clears the read and write statistics (both the per-socket and
// the overall collectors) and zeroes the reconnect and ping counters.
func (c *Client) ResetStats() {
	c.readSocketStats.Reset()
	c.readStats.Reset()

	c.writeSocketStats.Reset()
	c.writeStats.Reset()

	c.reconnects, c.pingsIn, c.pingsOut = 0, 0, 0
}

// Stats returns a newly allocated snapshot of the client's statistics: reads
// and writes for the current socket and in total, plus the reconnect and ping
// counters.
func (c *Client) Stats() *ClientStats {
	snapshot := new(ClientStats)

	snapshot.Reads = c.readSocketStats.Snapshot()
	snapshot.TotalReads = c.readStats.Snapshot()
	snapshot.Writes = c.writeSocketStats.Snapshot()
	snapshot.TotalWrites = c.writeStats.Snapshot()

	snapshot.Reconnects = c.reconnects
	snapshot.PingsSent = c.pingsOut
	snapshot.PingsRecv = c.pingsIn

	return snapshot
}

// SocketLogActive returns the current logging status for the socket, as
// recorded on the write logger. SetSocketLogActive keeps the read and write
// loggers in step.
func (c *Client) SocketLogActive() bool {
	return c.writeLog.Active
}

// SetSocketLogActive enables (true) or disables (false) logging of raw
// socket data for both the write and the read direction.
func (c *Client) SetSocketLogActive(active bool) {
	c.writeLog.Active, c.readLog.Active = active, active
}

// CollectionByName returns the collection registered under name. When no
// such collection exists yet, a new one is created, registered and returned.
func (c *Client) CollectionByName(name string) Collection {
	if existing, found := c.collections[name]; found {
		return existing
	}

	var created Collection = NewCollection(name)
	c.collections[name] = created
	return created
}

// CollectionStats returns one entry per currently known collection, holding
// its name and the number of items FindAll reports. Entries follow map
// iteration order, so their order is not stable between calls.
func (c *Client) CollectionStats() []CollectionStats {
	snapshot := make([]CollectionStats, 0, len(c.collections))
	for name, known := range c.collections {
		entry := CollectionStats{
			Name:  name,
			Count: len(known.FindAll()),
		}
		snapshot = append(snapshot, entry)
	}
	return snapshot
}

// start starts a new client connection on the provided websocket. It marks
// the client CONNECTING, attaches the websocket to the logging and statistics
// wrappers, launches a goroutine that reads incoming messages, and finally
// sends the given connect message. The error from sending is not checked.
func (c *Client) start(ws *websocket.Conn, connect *Connect) {

	c.status(CONNECTING)

	c.ws = ws
	// Write side: the logger is pointed at the websocket, a fresh per-socket
	// statistics wrapper is placed over the logger, and the overall
	// statistics wrapper (the encoder's target) is pointed at that.
	// NOTE(review): presumably each wrapper forwards to the one set here —
	// confirm against WriterStats/WriterLogger.
	c.writeLog.SetWriter(ws)
	c.writeSocketStats = NewWriterStats(c.writeLog)
	c.writeStats.SetWriter(c.writeSocketStats)
	// Read side mirrors the write side.
	c.readLog.SetReader(ws)
	c.readSocketStats = NewReaderStats(c.readLog)
	c.readStats.SetReader(c.readSocketStats)

	// We spin off an inbox stuffing goroutine
	go c.inboxWorker(c.readStats)

	c.Send(connect)
}

// inboxManager pulls messages from the inbox and routes them to appropriate
// handlers. It also drains the errors channel, logging each error. The loop
// never returns; NewClient runs it in its own goroutine.
//
// Messages are dispatched on their "msg" field:
//   - connected / failed: connection management
//   - ping / pong: heartbeats
//   - nosub / ready / added / changed / removed / addedBefore / movedBefore:
//     live data
//   - result / updated: method calls
//
// Fields of server messages are read with checked type assertions, so a
// malformed message is ignored or logged rather than panicking this goroutine.
func (c *Client) inboxManager() {
	for {
		select {
		case msg := <-c.inbox:
			mtype, ok := msg["msg"]
			if !ok {
				// Current Meteor server sends an undocumented DDP message
				// (looks like clustering "hint"). We will register and
				// ignore rather than log an error.
				serverID, ok := msg["server_id"]
				if !ok {
					log.Println("Server sent message with no `msg` field", msg)
					continue
				}
				switch ID := serverID.(type) {
				case string:
					c.serverID = ID
				default:
					log.Println("Server cluster node", serverID)
				}
				continue
			}

			// A non-string `msg` value yields "" and is reported by the
			// default case below.
			kind, _ := mtype.(string)
			switch kind {
			// Connection management
			case "connected":
				c.status(CONNECTED)
				for _, collection := range c.collections {
					collection.init()
				}
				c.version = "1" // Currently the only version we support
				if session, ok := msg["session"].(string); ok {
					c.session = session
				}
				// Start automatic heartbeats
				c.pingTimer = time.AfterFunc(c.HeartbeatInterval, func() {
					c.Ping()
					// A ping that fails to send closes the connection, which
					// clears pingTimer; only re-arm a timer that still exists.
					if timer := c.pingTimer; timer != nil {
						timer.Reset(c.HeartbeatInterval)
					}
				})
				// Notify connection listeners
				for _, listener := range c.connectionListeners {
					go listener.Connected()
				}
			case "failed":
				// NOTE: log.Fatalf terminates the whole process.
				log.Fatalf("IM Failed to connect, we support version 1 but server supports %s", msg["version"])

			// Heartbeats
			case "ping":
				// We received a ping - need to respond with a pong that
				// echoes the ping's id (empty when absent).
				id, _ := msg["id"].(string)
				c.Send(NewPong(id))
				c.pingsIn++
			case "pong":
				// We received a pong - we can clear the oldest ping tracker
				// for this id and call its handler.
				key, _ := msg["id"].(string)
				pings := c.pings[key]
				if len(pings) > 0 {
					ping := pings[0]
					if rest := pings[1:]; len(rest) > 0 || len(key) == 0 {
						c.pings[key] = rest
					} else {
						// Last tracker for a non-empty id: drop the entry so
						// it is neither leaked nor answered twice.
						delete(c.pings, key)
					}
					ping.timer.Stop()
					ping.handler(nil)
				}

			// Live Data
			case "nosub":
				log.Println("Subscription returned a nosub error", msg)
				// Clear related subscriptions
				if id, ok := msg["id"].(string); ok {
					if runningSub := c.subs[id]; runningSub != nil {
						runningSub.Error = errors.New("Subscription returned a nosub error")
						runningSub.done()
						delete(c.subs, id)
					}
				}
			case "ready":
				// Run 'done' callbacks on all ready subscriptions
				subs, _ := msg["subs"].([]interface{})
				for _, sub := range subs {
					id, ok := sub.(string)
					if !ok {
						continue
					}
					if call, ok := c.subs[id]; ok {
						call.done()
					}
				}
			case "added":
				c.collectionBy(msg).added(msg)
			case "changed":
				c.collectionBy(msg).changed(msg)
			case "removed":
				c.collectionBy(msg).removed(msg)
			case "addedBefore":
				c.collectionBy(msg).addedBefore(msg)
			case "movedBefore":
				c.collectionBy(msg).movedBefore(msg)

			// RPC
			case "result":
				id, ok := msg["id"].(string)
				if !ok {
					continue
				}
				call := c.calls[id]
				if call == nil {
					// No call is tracked under this id, so there is nothing
					// to complete.
					log.Println("Server sent result for unknown method call", msg)
					continue
				}
				delete(c.calls, id)
				if e, ok := msg["error"]; ok {
					// e was decoded from JSON, so re-encoding it is not
					// expected to fail; the Marshal error is ignored.
					txt, _ := json.Marshal(e)
					// errors.New keeps any '%' in the server's text literal.
					call.Error = errors.New(string(txt))
					call.Reply = e
				} else {
					call.Reply = msg["result"]
				}
				call.done()
			case "updated":
				// We currently don't do anything with updated status

			default:
				// Ignore?
				log.Println("Server sent unexpected message", msg)
			}
		case err := <-c.errors:
			log.Println("Websocket error", err)
		}
	}
}

// collectionBy resolves the collection a live-data message refers to. When
// the message's "collection" field is a string, the collection of that name
// is returned (and created if needed); when the field is missing or not a
// string, a mock collection is returned instead.
func (c *Client) collectionBy(msg map[string]interface{}) Collection {
	if name, isString := msg["collection"].(string); isString {
		return c.CollectionByName(name)
	}
	return NewMockCollection()
}

// inboxWorker pulls messages from a websocket, decodes JSON packets, and
// stuffs them into a message channel. Every decoded packet also pushes the
// next heartbeat back by HeartbeatInterval.
//
// The loop ends on io.EOF or when a packet decodes to nil (which is what a
// failed decode leaves behind); a reconnect is then scheduled. Decode errors
// other than io.EOF are reported on the errors channel. A packet that is
// valid JSON but not an object is reported the same way and skipped.
func (c *Client) inboxWorker(ws io.Reader) {
	dec := json.NewDecoder(ws)
	for {
		var event interface{}

		if err := dec.Decode(&event); err == io.EOF {
			break
		} else if err != nil {
			c.errors <- err
		}
		// Read the field once so a concurrent Close cannot clear it between
		// the nil check and the Reset call.
		if timer := c.pingTimer; timer != nil {
			timer.Reset(c.HeartbeatInterval)
		}
		if event == nil {
			log.Println("Inbox worker found nil event. May be due to broken websocket. Reconnecting.")
			break
		}

		msg, ok := event.(map[string]interface{})
		if !ok {
			// DDP messages are JSON objects; anything else cannot be routed.
			c.errors <- fmt.Errorf("ddp: ignoring non-object message: %v", event)
			continue
		}
		c.inbox <- msg
	}

	c.reconnectLater()
}

// reconnectLater closes the connection and schedules a single Reconnect to
// run after ReconnectInterval. It does not wait for the reconnect, and while
// a reconnect is already scheduled further calls do not schedule another, so
// reconnects are attempted at most once per ReconnectInterval.
func (c *Client) reconnectLater() {
	c.Close()

	c.reconnectLock.Lock()
	defer c.reconnectLock.Unlock()

	if c.reconnectTimer != nil {
		// A reconnect is already pending.
		return
	}
	c.reconnectTimer = time.AfterFunc(c.ReconnectInterval, c.Reconnect)
}