package ddp

import (
	"encoding/json"
	"fmt"
	"io"
	"log"
	"sync"
	"time"

	"golang.org/x/net/websocket"
	"errors"
)

const (
	DISCONNECTED = iota
	DIALING
	CONNECTING
	CONNECTED
)

type ConnectionListener interface {
	Connected()
}

type ConnectionNotifier interface {
	AddConnectionListener(listener ConnectionListener)
}

type StatusListener interface {
	Status(status int)
}

type StatusNotifier interface {
	AddStatusListener(listener StatusListener)
}

// Client represents a DDP client connection. The DDP client establish a DDP
// session and acts as a message pump for other tools.
type Client struct {
	// HeartbeatInterval is the time between heartbeats to send
	HeartbeatInterval time.Duration
	// HeartbeatTimeout is the time for a heartbeat ping to timeout
	HeartbeatTimeout time.Duration
	// ReconnectInterval is the time between reconnections on bad connections
	ReconnectInterval time.Duration

	// writeStats controls statistics gathering for current websocket writes.
	writeSocketStats *WriterStats
	// writeStats controls statistics gathering for overall client writes.
	writeStats *WriterStats
	// writeLog controls logging for client writes.
	writeLog *WriterLogger
	// readStats controls statistics gathering for current websocket reads.
	readSocketStats *ReaderStats
	// readStats controls statistics gathering for overall client reads.
	readStats *ReaderStats
	// readLog control logging for clietn reads.
	readLog *ReaderLogger
	// reconnects in the number of reconnections the client has made
	reconnects int64
	// pingsIn is the number of pings received from the server
	pingsIn int64
	// pingsOut is te number of pings sent by the client
	pingsOut int64

	// session contains the DDP session token (can be used for reconnects and debugging).
	session string
	// version contains the negotiated DDP protocol version in use.
	version string
	// serverID the cluster node ID for the server we connected to
	serverID string
	// ws is the underlying websocket being used.
	ws *websocket.Conn
	// encoder is a JSON encoder to send outgoing packets to the websocket.
	encoder *json.Encoder
	// url the URL the websocket is connected to
	url string
	// origin is the origin for the websocket connection
	origin string
	// inbox is an incoming message channel
	inbox chan map[string]interface{}
	// errors is an incoming errors channel
	errors chan error
	// pingTimer is a timer for sending regular pings to the server
	pingTimer *time.Timer
	// pings tracks inflight pings based on each ping ID.
	pings map[string][]*pingTracker
	// calls tracks method invocations that are still in flight
	calls map[string]*Call
	// subs tracks active subscriptions. Map contains name->args
	subs map[string]*Call
	// collections contains all the collections currently subscribed
	collections map[string]Collection
	// connectionStatus is the current connection status of the client
	connectionStatus int
	// reconnectTimer is the timer tracking reconnections
	reconnectTimer *time.Timer
	// reconnectLock protects access to reconnection
	reconnectLock *sync.Mutex

	// statusListeners will be informed when the connection status of the client changes
	statusListeners []StatusListener
	// connectionListeners will be informed when a connection to the server is established
	connectionListeners []ConnectionListener

	// idManager tracks IDs for ddp messages
	idManager
}

// NewClient creates a default client (using an internal websocket) to the
// provided URL using the origin for the connection. The client will
// automatically connect, upgrade to a websocket, and establish a DDP
// connection session before returning the client. The client will
// automatically and internally handle heartbeats and reconnects.
//
// TBD create an option to use an external websocket (aka htt.Transport)
// TBD create an option to substitute heartbeat and reconnect behavior (aka http.Tranport)
// TBD create an option to hijack the connection (aka http.Hijacker)
// TBD create profiling features (aka net/http/pprof)
func NewClient(url, origin string) *Client {
	c := &Client{
		HeartbeatInterval: time.Minute,      // Meteor impl default + 10 (we ping last)
		HeartbeatTimeout:  15 * time.Second, // Meteor impl default
		ReconnectInterval: 5 * time.Second,
		collections:       map[string]Collection{},
		url:               url,
		origin:            origin,
		inbox:             make(chan map[string]interface{}, 100),
		errors:            make(chan error, 100),
		pings:             map[string][]*pingTracker{},
		calls:             map[string]*Call{},
		subs:              map[string]*Call{},
		connectionStatus:  DISCONNECTED,
		reconnectLock:     &sync.Mutex{},

		// Stats
		writeSocketStats: NewWriterStats(nil),
		writeStats:       NewWriterStats(nil),
		readSocketStats:  NewReaderStats(nil),
		readStats:        NewReaderStats(nil),

		// Loggers
		writeLog: NewWriterTextLogger(nil),
		readLog:  NewReaderTextLogger(nil),

		idManager: *newidManager(),
	}
	c.encoder = json.NewEncoder(c.writeStats)
	c.SetSocketLogActive(false)

	// We spin off an inbox processing goroutine
	go c.inboxManager()

	return c
}

// Session returns the negotiated session token for the connection.
func (c *Client) Session() string {
	return c.session
}

// Version returns the negotiated protocol version in use by the client.
func (c *Client) Version() string {
	return c.version
}

// AddStatusListener in order to receive status change updates.
func (c *Client) AddStatusListener(listener StatusListener) {
	c.statusListeners = append(c.statusListeners, listener)
}

// AddConnectionListener in order to receive connection updates.
func (c *Client) AddConnectionListener(listener ConnectionListener) {
	c.connectionListeners = append(c.connectionListeners, listener)
}

// status updates all status listeners with the new client status.
func (c *Client) status(status int) {
	if c.connectionStatus == status {
		return
	}
	c.connectionStatus = status
	for _, listener := range c.statusListeners {
		listener.Status(status)
	}
}

// Connect attempts to connect the client to the server.
func (c *Client) Connect() error {
	c.status(DIALING)
	ws, err := websocket.Dial(c.url, "", c.origin)
	if err != nil {
		c.Close()
		log.Println("Dial error", err)
		c.reconnectLater()
		return err
	}
	// Start DDP connection
	c.start(ws, NewConnect())
	return nil
}

// Reconnect attempts to reconnect the client to the server on the existing
// DDP session.
//
// TODO needs a reconnect backoff so we don't trash a down server
// TODO reconnect should not allow more reconnects while a reconnection is already in progress.
func (c *Client) Reconnect() {
	func() {
		c.reconnectLock.Lock()
		defer c.reconnectLock.Unlock()
		if c.reconnectTimer != nil {
			c.reconnectTimer.Stop()
			c.reconnectTimer = nil
		}
	}()

	c.Close()

	c.reconnects++

	// Reconnect
	c.status(DIALING)
	ws, err := websocket.Dial(c.url, "", c.origin)
	if err != nil {
		c.Close()
		log.Println("Dial error", err)
		c.reconnectLater()
		return
	}

	c.start(ws, NewReconnect(c.session))

	// --------------------------------------------------------------------
	// We resume inflight or ongoing subscriptions - we don't have to wait
	// for connection confirmation (messages can be pipelined).
	// --------------------------------------------------------------------

	// Send calls that haven't been confirmed - may not have been sent
	// and effects should be idempotent
	for _, call := range c.calls {
		c.Send(NewMethod(call.ID, call.ServiceMethod, call.Args.([]interface{})))
	}

	// Resend subscriptions and patch up collections
	for _, sub := range c.subs {
		c.Send(NewSub(sub.ID, sub.ServiceMethod, sub.Args.([]interface{})))
	}
}

// Subscribe subscribes to data updates.
func (c *Client) Subscribe(subName string, done chan *Call, args ...interface{}) *Call {

	if args == nil {
		args = []interface{}{}
	}
	call := new(Call)
	call.ID = c.newID()
	call.ServiceMethod = subName
	call.Args = args
	call.Owner = c

	if done == nil {
		done = make(chan *Call, 10) // buffered.
	} else {
		// If caller passes done != nil, it must arrange that
		// done has enough buffer for the number of simultaneous
		// RPCs that will be using that channel.  If the channel
		// is totally unbuffered, it's best not to run at all.
		if cap(done) == 0 {
			log.Panic("ddp.rpc: done channel is unbuffered")
		}
	}
	call.Done = done
	c.subs[call.ID] = call

	// Save this subscription to the client so we can reconnect
	subArgs := make([]interface{}, len(args))
	copy(subArgs, args)

	c.Send(NewSub(call.ID, subName, args))

	return call
}

// Sub sends a synchronous subscription request to the server.
func (c *Client) Sub(subName string, args ...interface{}) error {
	call := <-c.Subscribe(subName, make(chan *Call, 1), args...).Done
	return call.Error
}

// Go invokes the function asynchronously.  It returns the Call structure representing
// the invocation.  The done channel will signal when the call is complete by returning
// the same Call object.  If done is nil, Go will allocate a new channel.
// If non-nil, done must be buffered or Go will deliberately crash.
//
// Go and Call are modeled after the standard `net/rpc` package versions.
func (c *Client) Go(serviceMethod string, done chan *Call, args ...interface{}) *Call {

	if args == nil {
		args = []interface{}{}
	}
	call := new(Call)
	call.ID = c.newID()
	call.ServiceMethod = serviceMethod
	call.Args = args
	call.Owner = c
	if done == nil {
		done = make(chan *Call, 10) // buffered.
	} else {
		// If caller passes done != nil, it must arrange that
		// done has enough buffer for the number of simultaneous
		// RPCs that will be using that channel.  If the channel
		// is totally unbuffered, it's best not to run at all.
		if cap(done) == 0 {
			log.Panic("ddp.rpc: done channel is unbuffered")
		}
	}
	call.Done = done
	c.calls[call.ID] = call

	c.Send(NewMethod(call.ID, serviceMethod, args))

	return call
}

// Call invokes the named function, waits for it to complete, and returns its error status.
func (c *Client) Call(serviceMethod string, args ...interface{}) (interface{}, error) {
	call := <-c.Go(serviceMethod, make(chan *Call, 1), args...).Done
	return call.Reply, call.Error
}

// Ping sends a heartbeat signal to the server. The Ping doesn't look for
// a response but may trigger the connection to reconnect if the ping timesout.
// This is primarily useful for reviving an unresponsive Client connection.
func (c *Client) Ping() {
	c.PingPong(c.newID(), c.HeartbeatTimeout, func(err error) {
		if err != nil {
			// Is there anything else we should or can do?
			c.reconnectLater()
		}
	})
}

// PingPong sends a heartbeat signal to the server and calls the provided
// function when a pong is received. An optional id can be sent to help
// track the responses - or an empty string can be used. It is the
// responsibility of the caller to respond to any errors that may occur.
func (c *Client) PingPong(id string, timeout time.Duration, handler func(error)) {
	err := c.Send(NewPing(id))
	if err != nil {
		handler(err)
		return
	}
	c.pingsOut++
	pings, ok := c.pings[id]
	if !ok {
		pings = make([]*pingTracker, 0, 5)
	}
	tracker := &pingTracker{handler: handler, timeout: timeout, timer: time.AfterFunc(timeout, func() {
		handler(fmt.Errorf("ping timeout"))
	})}
	c.pings[id] = append(pings, tracker)
}

// Send transmits messages to the server. The msg parameter must be json
// encoder compatible.
func (c *Client) Send(msg interface{}) error {
	return c.encoder.Encode(msg)
}

// Close implements the io.Closer interface.
func (c *Client) Close() {
	// Shutdown out all outstanding pings
	if c.pingTimer != nil {
		c.pingTimer.Stop()
		c.pingTimer = nil
	}

	// Close websocket
	if c.ws != nil {
		c.ws.Close()
		c.ws = nil
	}
	for _, collection := range c.collections {
		collection.reset()
	}
	c.status(DISCONNECTED)
}

// ResetStats resets the statistics for the client.
func (c *Client) ResetStats() {
	c.readSocketStats.Reset()
	c.readStats.Reset()
	c.writeSocketStats.Reset()
	c.writeStats.Reset()
	c.reconnects = 0
	c.pingsIn = 0
	c.pingsOut = 0
}

// Stats returns the read and write statistics of the client.
func (c *Client) Stats() *ClientStats {
	return &ClientStats{
		Reads:       c.readSocketStats.Snapshot(),
		TotalReads:  c.readStats.Snapshot(),
		Writes:      c.writeSocketStats.Snapshot(),
		TotalWrites: c.writeStats.Snapshot(),
		Reconnects:  c.reconnects,
		PingsSent:   c.pingsOut,
		PingsRecv:   c.pingsIn,
	}
}

// SocketLogActive returns the current logging status for the socket.
func (c *Client) SocketLogActive() bool {
	return c.writeLog.Active
}

// SetSocketLogActive to true to enable logging of raw socket data.
func (c *Client) SetSocketLogActive(active bool) {
	c.writeLog.Active = active
	c.readLog.Active = active
}

// CollectionByName retrieves a collection by it's name.
func (c *Client) CollectionByName(name string) Collection {
	collection, ok := c.collections[name]
	if !ok {
		collection = NewCollection(name)
		c.collections[name] = collection
	}
	return collection
}

// CollectionStats returns a snapshot of statistics for the currently known collections.
func (c *Client) CollectionStats() []CollectionStats {
	stats := make([]CollectionStats, 0, len(c.collections))
	for name, collection := range c.collections {
		stats = append(stats, CollectionStats{Name: name, Count: len(collection.FindAll())})
	}
	return stats
}

// start starts a new client connection on the provided websocket
func (c *Client) start(ws *websocket.Conn, connect *Connect) {

	c.status(CONNECTING)

	c.ws = ws
	c.writeLog.SetWriter(ws)
	c.writeSocketStats = NewWriterStats(c.writeLog)
	c.writeStats.SetWriter(c.writeSocketStats)
	c.readLog.SetReader(ws)
	c.readSocketStats = NewReaderStats(c.readLog)
	c.readStats.SetReader(c.readSocketStats)

	// We spin off an inbox stuffing goroutine
	go c.inboxWorker(c.readStats)

	c.Send(connect)
}

// inboxManager pulls messages from the inbox and routes them to appropriate
// handlers.
func (c *Client) inboxManager() {
	for {
		select {
		case msg := <-c.inbox:
			// Message!
			//log.Println("Got message", msg)
			mtype, ok := msg["msg"]
			if ok {
				switch mtype.(string) {
				// Connection management
				case "connected":
					c.status(CONNECTED)
					for _, collection := range c.collections {
						collection.init()
					}
					c.version = "1" // Currently the only version we support
					c.session = msg["session"].(string)
					// Start automatic heartbeats
					c.pingTimer = time.AfterFunc(c.HeartbeatInterval, func() {
						c.Ping()
						c.pingTimer.Reset(c.HeartbeatInterval)
					})
					// Notify connection listeners
					for _, listener := range c.connectionListeners {
						go listener.Connected()
					}
				case "failed":
					log.Fatalf("IM Failed to connect, we support version 1 but server supports %s", msg["version"])

				// Heartbeats
				case "ping":
					// We received a ping - need to respond with a pong
					id, ok := msg["id"]
					if ok {
						c.Send(NewPong(id.(string)))
					} else {
						c.Send(NewPong(""))
					}
					c.pingsIn++
				case "pong":
					// We received a pong - we can clear the ping tracker and call its handler
					id, ok := msg["id"]
					var key string
					if ok {
						key = id.(string)
					}
					pings, ok := c.pings[key]
					if ok && len(pings) > 0 {
						ping := pings[0]
						pings = pings[1:]
						if len(key) == 0 || len(pings) > 0 {
							c.pings[key] = pings
						}
						ping.timer.Stop()
						ping.handler(nil)
					}

				// Live Data
				case "nosub":
					log.Println("Subscription returned a nosub error", msg)
					// Clear related subscriptions
					sub, ok := msg["id"]
					if ok {
						id := sub.(string)
						runningSub := c.subs[id]

						if runningSub != nil {
							runningSub.Error = errors.New("Subscription returned a nosub error")
							runningSub.done()
							delete(c.subs, id)
						}
					}
				case "ready":
					// Run 'done' callbacks on all ready subscriptions
					subs, ok := msg["subs"]
					if ok {
						for _, sub := range subs.([]interface{}) {
							call, ok := c.subs[sub.(string)]
							if ok {
								call.done()
							}
						}
					}
				case "added":
					c.collectionBy(msg).added(msg)
				case "changed":
					c.collectionBy(msg).changed(msg)
				case "removed":
					c.collectionBy(msg).removed(msg)
				case "addedBefore":
					c.collectionBy(msg).addedBefore(msg)
				case "movedBefore":
					c.collectionBy(msg).movedBefore(msg)

				// RPC
				case "result":
					id, ok := msg["id"]
					if ok {
						call := c.calls[id.(string)]
						delete(c.calls, id.(string))
						e, ok := msg["error"]
						if ok {
							txt, _ := json.Marshal(e)
							call.Error = fmt.Errorf(string(txt))
							call.Reply = e
						} else {
							call.Reply = msg["result"]
						}
						call.done()
					}
				case "updated":
					// We currently don't do anything with updated status

				default:
					// Ignore?
					log.Println("Server sent unexpected message", msg)
				}
			} else {
				// Current Meteor server sends an undocumented DDP message
				// (looks like clustering "hint"). We will register and
				// ignore rather than log an error.
				serverID, ok := msg["server_id"]
				if ok {
					switch ID := serverID.(type) {
					case string:
						c.serverID = ID
					default:
						log.Println("Server cluster node", serverID)
					}
				} else {
					log.Println("Server sent message with no `msg` field", msg)
				}
			}
		case err := <-c.errors:
			log.Println("Websocket error", err)
		}
	}
}

func (c *Client) collectionBy(msg map[string]interface{}) Collection {
	n, ok := msg["collection"]
	if !ok {
		return NewMockCollection()
	}
	switch name := n.(type) {
	case string:
		return c.CollectionByName(name)
	default:
		return NewMockCollection()
	}
}

// inboxWorker pulls messages from a websocket, decodes JSON packets, and
// stuffs them into a message channel.
func (c *Client) inboxWorker(ws io.Reader) {
	dec := json.NewDecoder(ws)
	for {
		var event interface{}

		if err := dec.Decode(&event); err == io.EOF {
			break
		} else if err != nil {
			c.errors <- err
		}
		if c.pingTimer != nil {
			c.pingTimer.Reset(c.HeartbeatInterval)
		}
		if event == nil {
			log.Println("Inbox worker found nil event. May be due to broken websocket. Reconnecting.")
			break
		} else {
			c.inbox <- event.(map[string]interface{})
		}
	}

	c.reconnectLater()
}

// reconnectLater schedules a reconnect for later. We need to make sure that we don't
// block, and that we don't reconnect more frequently than once every c.ReconnectInterval
func (c *Client) reconnectLater() {
	c.Close()
	c.reconnectLock.Lock()
	defer c.reconnectLock.Unlock()
	if c.reconnectTimer == nil {
		c.reconnectTimer = time.AfterFunc(c.ReconnectInterval, c.Reconnect)
	}
}