summaryrefslogblamecommitdiffstats
path: root/vendor/github.com/mrexodia/wray/go_faye.go
blob: ab78eed7f5acdc334e08dd944856d3f036fe2bfe (plain) (tree)










































































































































                                                                                                                                                         
package wray

import (
	"fmt"
	"path/filepath"
	"strings"
	"time"
)

// Constants used by the Faye client.
const (
	// Connection states stored in FayeClient.state. NewFayeClient starts a
	// client in UNCONNECTED; handshake moves it to CONNECTING and then
	// CONNECTED (or back to UNCONNECTED when the handshake request fails).
	// DISCONNECTED is not assigned anywhere in the visible part of this file.
	UNCONNECTED  = 1
	CONNECTING   = 2
	CONNECTED    = 3
	DISCONNECTED = 4

	// NOTE(review): presumably the Bayeux "reconnect" advice values; they are
	// not referenced in the visible part of this file — confirm against callers.
	HANDSHAKE = "handshake"
	RETRY     = "retry"
	NONE      = "none"

	// NOTE(review): tuning values that are not referenced in the visible part
	// of this file. The two float values are presumably seconds and
	// MAX_REQUEST_SIZE presumably bytes — confirm before relying on them.
	CONNECTION_TIMEOUT = 60.0
	DEFAULT_RETRY      = 5.0
	MAX_REQUEST_SIZE   = 2048
)

var (
	// MANDATORY_CONNECTION_TYPES is the list of connection types passed to
	// SelectTransport when handshake picks the transport for the initial
	// /meta/handshake request.
	MANDATORY_CONNECTION_TYPES = []string{"long-polling"}
	// registeredTransports holds the transports installed through
	// RegisterTransports. It starts out empty.
	// NOTE(review): presumably consulted by SelectTransport, which is defined
	// outside this file — confirm.
	registeredTransports       = []Transport{}
)

// FayeClient holds the state of one client session against a Faye server.
// Create instances with NewFayeClient.
type FayeClient struct {
	// state is one of UNCONNECTED, CONNECTING, CONNECTED or DISCONNECTED.
	state         int
	// url is the server endpoint; handshake hands it to the transport.
	url           string
	// subscriptions is appended to by Subscribe and scanned by handleResponse
	// for every incoming message.
	subscriptions []Subscription
	// transport is chosen during handshake and used for every request.
	transport     Transport
	// clientId is taken from the handshake response and sent along with
	// subscribe, connect and publish requests.
	clientId      string
	// schedular is used by handshake to schedule a retry after a failure.
	schedular     Schedular
}

// Subscription pairs a channel pattern with the function to call for
// messages whose channel matches it.
type Subscription struct {
	// channel is the pattern that handleResponse matches message channels
	// against.
	channel  string
	// callback is started in its own goroutine for every matching message.
	callback func(Message)
}

// SubscriptionPromise is the value returned by FayeClient.Subscribe. It wraps
// the Subscription that was registered.
type SubscriptionPromise struct {
	subscription Subscription
}

// NewFayeClient returns a client for the Faye server at url. The client
// starts in the UNCONNECTED state with a ChannelSchedular; no network
// traffic happens until Subscribe or Publish is called.
func NewFayeClient(url string) *FayeClient {
	return &FayeClient{
		url:       url,
		state:     UNCONNECTED,
		schedular: ChannelSchedular{},
	}
}

// handshake selects a transport, sends the /meta/handshake request and stores
// the client ID from the response.
//
// When the request fails, the state is reset to UNCONNECTED, a retry is handed
// to the schedular with a 10 second delay, and the method returns without a
// client ID.
//
// It panics when no transport can be selected, either before the request or
// for the connection types announced by the server. In the latter case the
// previously selected transport is kept and the state is reset to UNCONNECTED,
// so a caller that recovers is not left with an unusable client that claims to
// be connected.
func (c *FayeClient) handshake() {
	transport, err := SelectTransport(c, MANDATORY_CONNECTION_TYPES, []string{})
	if err != nil {
		panic("No usable transports available")
	}
	c.transport = transport
	c.transport.setUrl(c.url)
	c.state = CONNECTING

	params := map[string]interface{}{
		"channel":                  "/meta/handshake",
		"version":                  "1.0",
		"supportedConnectionTypes": []string{"long-polling"},
	}
	response, err := c.transport.send(params)
	if err != nil {
		fmt.Println("Handshake failed. Retry in 10 seconds")
		c.state = UNCONNECTED
		c.schedular.wait(10*time.Second, func() {
			fmt.Println("retrying handshake")
			c.handshake()
		})
		return
	}
	c.clientId = response.clientId

	// Re-select using the connection types the server actually supports.
	// Only commit the result (and the CONNECTED state) once selection has
	// succeeded.
	negotiated, err := SelectTransport(c, response.supportedConnectionTypes, []string{})
	if err != nil {
		c.state = UNCONNECTED
		panic("Server does not support any available transports. Supported transports: " + strings.Join(response.supportedConnectionTypes, ","))
	}
	c.transport = negotiated
	c.state = CONNECTED
}

// Subscribe sends a /meta/subscribe request for channel and registers
// callback to be run for incoming messages whose channel matches it.
//
// If the client is UNCONNECTED a handshake is performed first. The force
// parameter is currently not used. A failed subscribe request is reported on
// standard output; the subscription is still registered locally and returned,
// exactly as before.
func (c *FayeClient) Subscribe(channel string, force bool, callback func(Message)) SubscriptionPromise {
	if c.state == UNCONNECTED {
		c.handshake()
	}
	params := map[string]interface{}{
		"channel":      "/meta/subscribe",
		"clientId":     c.clientId,
		"subscription": channel,
		"id":           "1",
	}
	subscription := Subscription{channel: channel, callback: callback}
	// TODO: surface subscription failures to the caller; SubscriptionPromise
	// has no field to carry an error, so for now the failure is only logged.
	if _, err := c.transport.send(params); err != nil {
		fmt.Println("Subscribe to", channel, "failed:", err)
	}
	c.subscriptions = append(c.subscriptions, subscription)
	return SubscriptionPromise{subscription}
}

// handleResponse dispatches every message in response to the callbacks of all
// subscriptions whose channel pattern matches the message channel. Each
// callback runs in its own goroutine. Subscriptions registered with a nil
// callback are skipped.
func (c *FayeClient) handleResponse(response Response) {
	for _, message := range response.messages {
		// filepath.Match works with the OS path separator. Channels are
		// always '/'-separated, so on Windows '/' would not count as a
		// separator and '*' would match across channel segments. Converting
		// both operands to the native form keeps '*' confined to a single
		// segment on every platform (FromSlash is a no-op where the
		// separator already is '/').
		channel := filepath.FromSlash(message.Channel)
		for _, subscription := range c.subscriptions {
			if subscription.callback == nil {
				// Calling a nil func in a new goroutine would crash the
				// whole process.
				continue
			}
			// The error only signals a malformed pattern, in which case
			// matched is false and the subscription is simply not notified.
			matched, _ := filepath.Match(filepath.FromSlash(subscription.channel), channel)
			if matched {
				go subscription.callback(message)
			}
		}
	}
}

// connect performs one /meta/connect request and passes the response to
// listen. It blocks until the response has been received.
//
// A failed request is reported on standard output and followed by a pause
// before the (unsuccessful) response is delivered. Without the pause the loop
// in Listen would issue requests back to back for as long as the transport
// keeps failing immediately.
func (c *FayeClient) connect() {
	// Pause after a failed request; the value mirrors DEFAULT_RETRY.
	const retryDelay = 5 * time.Second

	params := map[string]interface{}{
		"channel":        "/meta/connect",
		"clientId":       c.clientId,
		"connectionType": c.transport.connectionType(),
	}
	responses := make(chan Response)
	go func() {
		response, err := c.transport.send(params)
		if err != nil {
			fmt.Println("Connect failed:", err)
			time.Sleep(retryDelay)
		}
		responses <- response
	}()
	c.listen(responses)
}

// listen waits for a single response on responseChannel. A successful
// response is handed to handleResponse in a new goroutine; an unsuccessful
// one is dropped.
func (c *FayeClient) listen(responseChannel chan Response) {
	if response := <-responseChannel; response.successful {
		go c.handleResponse(response)
	}
}

// Listen polls the server by calling connect in an endless loop. It never
// returns.
func (c *FayeClient) Listen() {
	for {
		c.connect()
	}
}

// Publish sends data to channel. If the client is not CONNECTED a handshake
// is performed first. A failed request is reported on standard output, since
// the method has no return value to carry the error.
func (c *FayeClient) Publish(channel string, data map[string]interface{}) {
	if c.state != CONNECTED {
		c.handshake()
	}
	params := map[string]interface{}{
		"channel":  channel,
		"data":     data,
		"clientId": c.clientId,
	}
	if _, err := c.transport.send(params); err != nil {
		fmt.Println("Publish to", channel, "failed:", err)
	}
}

// RegisterTransports replaces the package-level list of registered transports
// with transports. The slice is stored as passed, not copied, so later
// modifications by the caller are visible through the registry.
func RegisterTransports(transports []Transport) {
	registeredTransports = transports
}