package ddp import ( "encoding/json" "fmt" "io" "log" "sync" "time" "golang.org/x/net/websocket" "errors" ) const ( DISCONNECTED = iota DIALING CONNECTING CONNECTED ) type ConnectionListener interface { Connected() } type ConnectionNotifier interface { AddConnectionListener(listener ConnectionListener) } type StatusListener interface { Status(status int) } type StatusNotifier interface { AddStatusListener(listener StatusListener) } // Client represents a DDP client connection. The DDP client establish a DDP // session and acts as a message pump for other tools. type Client struct { // HeartbeatInterval is the time between heartbeats to send HeartbeatInterval time.Duration // HeartbeatTimeout is the time for a heartbeat ping to timeout HeartbeatTimeout time.Duration // ReconnectInterval is the time between reconnections on bad connections ReconnectInterval time.Duration // writeStats controls statistics gathering for current websocket writes. writeSocketStats *WriterStats // writeStats controls statistics gathering for overall client writes. writeStats *WriterStats // writeLog controls logging for client writes. writeLog *WriterLogger // readStats controls statistics gathering for current websocket reads. readSocketStats *ReaderStats // readStats controls statistics gathering for overall client reads. readStats *ReaderStats // readLog control logging for clietn reads. readLog *ReaderLogger // reconnects in the number of reconnections the client has made reconnects int64 // pingsIn is the number of pings received from the server pingsIn int64 // pingsOut is te number of pings sent by the client pingsOut int64 // session contains the DDP session token (can be used for reconnects and debugging). session string // version contains the negotiated DDP protocol version in use. version string // serverID the cluster node ID for the server we connected to serverID string // ws is the underlying websocket being used. ws *websocket.Conn // encoder is a JSON encoder to send outgoing packets to the websocket. encoder *json.Encoder // url the URL the websocket is connected to url string // origin is the origin for the websocket connection origin string // inbox is an incoming message channel inbox chan map[string]interface{} // errors is an incoming errors channel errors chan error // pingTimer is a timer for sending regular pings to the server pingTimer *time.Timer // pings tracks inflight pings based on each ping ID. pings map[string][]*pingTracker // calls tracks method invocations that are still in flight calls map[string]*Call // subs tracks active subscriptions. Map contains name->args subs map[string]*Call // collections contains all the collections currently subscribed collections map[string]Collection // connectionStatus is the current connection status of the client connectionStatus int // reconnectTimer is the timer tracking reconnections reconnectTimer *time.Timer // reconnectLock protects access to reconnection reconnectLock *sync.Mutex // statusListeners will be informed when the connection status of the client changes statusListeners []StatusListener // connectionListeners will be informed when a connection to the server is established connectionListeners []ConnectionListener // idManager tracks IDs for ddp messages idManager } // NewClient creates a default client (using an internal websocket) to the // provided URL using the origin for the connection. The client will // automatically connect, upgrade to a websocket, and establish a DDP // connection session before returning the client. The client will // automatically and internally handle heartbeats and reconnects. // // TBD create an option to use an external websocket (aka htt.Transport) // TBD create an option to substitute heartbeat and reconnect behavior (aka http.Tranport) // TBD create an option to hijack the connection (aka http.Hijacker) // TBD create profiling features (aka net/http/pprof) func NewClient(url, origin string) *Client { c := &Client{ HeartbeatInterval: time.Minute, // Meteor impl default + 10 (we ping last) HeartbeatTimeout: 15 * time.Second, // Meteor impl default ReconnectInterval: 5 * time.Second, collections: map[string]Collection{}, url: url, origin: origin, inbox: make(chan map[string]interface{}, 100), errors: make(chan error, 100), pings: map[string][]*pingTracker{}, calls: map[string]*Call{}, subs: map[string]*Call{}, connectionStatus: DISCONNECTED, reconnectLock: &sync.Mutex{}, // Stats writeSocketStats: NewWriterStats(nil), writeStats: NewWriterStats(nil), readSocketStats: NewReaderStats(nil), readStats: NewReaderStats(nil), // Loggers writeLog: NewWriterTextLogger(nil), readLog: NewReaderTextLogger(nil), idManager: *newidManager(), } c.encoder = json.NewEncoder(c.writeStats) c.SetSocketLogActive(false) // We spin off an inbox processing goroutine go c.inboxManager() return c } // Session returns the negotiated session token for the connection. func (c *Client) Session() string { return c.session } // Version returns the negotiated protocol version in use by the client. func (c *Client) Version() string { return c.version } // AddStatusListener in order to receive status change updates. func (c *Client) AddStatusListener(listener StatusListener) { c.statusListeners = append(c.statusListeners, listener) } // AddConnectionListener in order to receive connection updates. func (c *Client) AddConnectionListener(listener ConnectionListener) { c.connectionListeners = append(c.connectionListeners, listener) } // status updates all status listeners with the new client status. func (c *Client) status(status int) { if c.connectionStatus == status { return } c.connectionStatus = status for _, listener := range c.statusListeners { listener.Status(status) } } // Connect attempts to connect the client to the server. func (c *Client) Connect() error { c.status(DIALING) ws, err := websocket.Dial(c.url, "", c.origin) if err != nil { c.Close() log.Println("Dial error", err) c.reconnectLater() return err } // Start DDP connection c.start(ws, NewConnect()) return nil } // Reconnect attempts to reconnect the client to the server on the existing // DDP session. // // TODO needs a reconnect backoff so we don't trash a down server // TODO reconnect should not allow more reconnects while a reconnection is already in progress. func (c *Client) Reconnect() { func() { c.reconnectLock.Lock() defer c.reconnectLock.Unlock() if c.reconnectTimer != nil { c.reconnectTimer.Stop() c.reconnectTimer = nil } }() c.Close() c.reconnects++ // Reconnect c.status(DIALING) ws, err := websocket.Dial(c.url, "", c.origin) if err != nil { c.Close() log.Println("Dial error", err) c.reconnectLater() return } c.start(ws, NewReconnect(c.session)) // -------------------------------------------------------------------- // We resume inflight or ongoing subscriptions - we don't have to wait // for connection confirmation (messages can be pipelined). // -------------------------------------------------------------------- // Send calls that haven't been confirmed - may not have been sent // and effects should be idempotent for _, call := range c.calls { c.Send(NewMethod(call.ID, call.ServiceMethod, call.Args.([]interface{}))) } // Resend subscriptions and patch up collections for _, sub := range c.subs { c.Send(NewSub(sub.ID, sub.ServiceMethod, sub.Args.([]interface{}))) } } // Subscribe subscribes to data updates. func (c *Client) Subscribe(subName string, done chan *Call, args ...interface{}) *Call { if args == nil { args = []interface{}{} } call := new(Call) call.ID = c.newID() call.ServiceMethod = subName call.Args = args call.Owner = c if done == nil { done = make(chan *Call, 10) // buffered. } else { // If caller passes done != nil, it must arrange that // done has enough buffer for the number of simultaneous // RPCs that will be using that channel. If the channel // is totally unbuffered, it's best not to run at all. if cap(done) == 0 { log.Panic("ddp.rpc: done channel is unbuffered") } } call.Done = done c.subs[call.ID] = call // Save this subscription to the client so we can reconnect subArgs := make([]interface{}, len(args)) copy(subArgs, args) c.Send(NewSub(call.ID, subName, args)) return call } // Sub sends a synchronous subscription request to the server. func (c *Client) Sub(subName string, args ...interface{}) error { call := <-c.Subscribe(subName, make(chan *Call, 1), args...).Done return call.Error } // Go invokes the function asynchronously. It returns the Call structure representing // the invocation. The done channel will signal when the call is complete by returning // the same Call object. If done is nil, Go will allocate a new channel. // If non-nil, done must be buffered or Go will deliberately crash. // // Go and Call are modeled after the standard `net/rpc` package versions. func (c *Client) Go(serviceMethod string, done chan *Call, args ...interface{}) *Call { if args == nil { args = []interface{}{} } call := new(Call) call.ID = c.newID() call.ServiceMethod = serviceMethod call.Args = args call.Owner = c if done == nil { done = make(chan *Call, 10) // buffered. } else { // If caller passes done != nil, it must arrange that // done has enough buffer for the number of simultaneous // RPCs that will be using that channel. If the channel // is totally unbuffered, it's best not to run at all. if cap(done) == 0 { log.Panic("ddp.rpc: done channel is unbuffered") } } call.Done = done c.calls[call.ID] = call c.Send(NewMethod(call.ID, serviceMethod, args)) return call } // Call invokes the named function, waits for it to complete, and returns its error status. func (c *Client) Call(serviceMethod string, args ...interface{}) (interface{}, error) { call := <-c.Go(serviceMethod, make(chan *Call, 1), args...).Done return call.Reply, call.Error } // Ping sends a heartbeat signal to the server. The Ping doesn't look for // a response but may trigger the connection to reconnect if the ping timesout. // This is primarily useful for reviving an unresponsive Client connection. func (c *Client) Ping() { c.PingPong(c.newID(), c.HeartbeatTimeout, func(err error) { if err != nil { // Is there anything else we should or can do? c.reconnectLater() } }) } // PingPong sends a heartbeat signal to the server and calls the provided // function when a pong is received. An optional id can be sent to help // track the responses - or an empty string can be used. It is the // responsibility of the caller to respond to any errors that may occur. func (c *Client) PingPong(id string, timeout time.Duration, handler func(error)) { err := c.Send(NewPing(id)) if err != nil { handler(err) return } c.pingsOut++ pings, ok := c.pings[id] if !ok { pings = make([]*pingTracker, 0, 5) } tracker := &pingTracker{handler: handler, timeout: timeout, timer: time.AfterFunc(timeout, func() { handler(fmt.Errorf("ping timeout")) })} c.pings[id] = append(pings, tracker) } // Send transmits messages to the server. The msg parameter must be json // encoder compatible. func (c *Client) Send(msg interface{}) error { return c.encoder.Encode(msg) } // Close implements the io.Closer interface. func (c *Client) Close() { // Shutdown out all outstanding pings if c.pingTimer != nil { c.pingTimer.Stop() c.pingTimer = nil } // Close websocket if c.ws != nil { c.ws.Close() c.ws = nil } for _, collection := range c.collections { collection.reset() } c.status(DISCONNECTED) } // ResetStats resets the statistics for the client. func (c *Client) ResetStats() { c.readSocketStats.Reset() c.readStats.Reset() c.writeSocketStats.Reset() c.writeStats.Reset() c.reconnects = 0 c.pingsIn = 0 c.pingsOut = 0 } // Stats returns the read and write statistics of the client. func (c *Client) Stats() *ClientStats { return &ClientStats{ Reads: c.readSocketStats.Snapshot(), TotalReads: c.readStats.Snapshot(), Writes: c.writeSocketStats.Snapshot(), TotalWrites: c.writeStats.Snapshot(), Reconnects: c.reconnects, PingsSent: c.pingsOut, PingsRecv: c.pingsIn, } } // SocketLogActive returns the current logging status for the socket. func (c *Client) SocketLogActive() bool { return c.writeLog.Active } // SetSocketLogActive to true to enable logging of raw socket data. func (c *Client) SetSocketLogActive(active bool) { c.writeLog.Active = active c.readLog.Active = active } // CollectionByName retrieves a collection by it's name. func (c *Client) CollectionByName(name string) Collection { collection, ok := c.collections[name] if !ok { collection = NewCollection(name) c.collections[name] = collection } return collection } // CollectionStats returns a snapshot of statistics for the currently known collections. func (c *Client) CollectionStats() []CollectionStats { stats := make([]CollectionStats, 0, len(c.collections)) for name, collection := range c.collections { stats = append(stats, CollectionStats{Name: name, Count: len(collection.FindAll())}) } return stats } // start starts a new client connection on the provided websocket func (c *Client) start(ws *websocket.Conn, connect *Connect) { c.status(CONNECTING) c.ws = ws c.writeLog.SetWriter(ws) c.writeSocketStats = NewWriterStats(c.writeLog) c.writeStats.SetWriter(c.writeSocketStats) c.readLog.SetReader(ws) c.readSocketStats = NewReaderStats(c.readLog) c.readStats.SetReader(c.readSocketStats) // We spin off an inbox stuffing goroutine go c.inboxWorker(c.readStats) c.Send(connect) } // inboxManager pulls messages from the inbox and routes them to appropriate // handlers. func (c *Client) inboxManager() { for { select { case msg := <-c.inbox: // Message! //log.Println("Got message", msg) mtype, ok := msg["msg"] if ok { switch mtype.(string) { // Connection management case "connected": c.status(CONNECTED) for _, collection := range c.collections { collection.init() } c.version = "1" // Currently the only version we support c.session = msg["session"].(string) // Start automatic heartbeats c.pingTimer = time.AfterFunc(c.HeartbeatInterval, func() { c.Ping() c.pingTimer.Reset(c.HeartbeatInterval) }) // Notify connection listeners for _, listener := range c.connectionListeners { go listener.Connected() } case "failed": log.Fatalf("IM Failed to connect, we support version 1 but server supports %s", msg["version"]) // Heartbeats case "ping": // We received a ping - need to respond with a pong id, ok := msg["id"] if ok { c.Send(NewPong(id.(string))) } else { c.Send(NewPong("")) } c.pingsIn++ case "pong": // We received a pong - we can clear the ping tracker and call its handler id, ok := msg["id"] var key string if ok { key = id.(string) } pings, ok := c.pings[key] if ok && len(pings) > 0 { ping := pings[0] pings = pings[1:] if len(key) == 0 || len(pings) > 0 { c.pings[key] = pings } ping.timer.Stop() ping.handler(nil) } // Live Data case "nosub": log.Println("Subscription returned a nosub error", msg) // Clear related subscriptions sub, ok := msg["id"] if ok { id := sub.(string) runningSub := c.subs[id] if runningSub != nil { runningSub.Error = errors.New("Subscription returned a nosub error") runningSub.done() delete(c.subs, id) } } case "ready": // Run 'done' callbacks on all ready subscriptions subs, ok := msg["subs"] if ok { for _, sub := range subs.([]interface{}) { call, ok := c.subs[sub.(string)] if ok { call.done() } } } case "added": c.collectionBy(msg).added(msg) case "changed": c.collectionBy(msg).changed(msg) case "removed": c.collectionBy(msg).removed(msg) case "addedBefore": c.collectionBy(msg).addedBefore(msg) case "movedBefore": c.collectionBy(msg).movedBefore(msg) // RPC case "result": id, ok := msg["id"] if ok { call := c.calls[id.(string)] delete(c.calls, id.(string)) e, ok := msg["error"] if ok { txt, _ := json.Marshal(e) call.Error = fmt.Errorf(string(txt)) call.Reply = e } else { call.Reply = msg["result"] } call.done() } case "updated": // We currently don't do anything with updated status default: // Ignore? log.Println("Server sent unexpected message", msg) } } else { // Current Meteor server sends an undocumented DDP message // (looks like clustering "hint"). We will register and // ignore rather than log an error. serverID, ok := msg["server_id"] if ok { switch ID := serverID.(type) { case string: c.serverID = ID default: log.Println("Server cluster node", serverID) } } else { log.Println("Server sent message with no `msg` field", msg) } } case err := <-c.errors: log.Println("Websocket error", err) } } } func (c *Client) collectionBy(msg map[string]interface{}) Collection { n, ok := msg["collection"] if !ok { return NewMockCollection() } switch name := n.(type) { case string: return c.CollectionByName(name) default: return NewMockCollection() } } // inboxWorker pulls messages from a websocket, decodes JSON packets, and // stuffs them into a message channel. func (c *Client) inboxWorker(ws io.Reader) { dec := json.NewDecoder(ws) for { var event interface{} if err := dec.Decode(&event); err == io.EOF { break } else if err != nil { c.errors <- err } if c.pingTimer != nil { c.pingTimer.Reset(c.HeartbeatInterval) } if event == nil { log.Println("Inbox worker found nil event. May be due to broken websocket. Reconnecting.") break } else { c.inbox <- event.(map[string]interface{}) } } c.reconnectLater() } // reconnectLater schedules a reconnect for later. We need to make sure that we don't // block, and that we don't reconnect more frequently than once every c.ReconnectInterval func (c *Client) reconnectLater() { c.Close() c.reconnectLock.Lock() defer c.reconnectLock.Unlock() if c.reconnectTimer == nil { c.reconnectTimer = time.AfterFunc(c.ReconnectInterval, c.Reconnect) } }