diff options
author | Wim <wim@42.be> | 2016-09-04 20:03:07 +0200 |
---|---|---|
committer | Wim <wim@42.be> | 2016-09-04 20:04:43 +0200 |
commit | 12389d602eecf4adab0e394da19b2b808cb489b1 (patch) | |
tree | 80d6d8c00ed31fdcd60540d7c65afa6e7d2d83d6 /vendor/github.com/mrexodia/wray/go_faye.go | |
parent | 44144587a0314b7e2c719d279116ac86b657657e (diff) | |
download | matterbridge-msglm-12389d602eecf4adab0e394da19b2b808cb489b1.tar.gz matterbridge-msglm-12389d602eecf4adab0e394da19b2b808cb489b1.tar.bz2 matterbridge-msglm-12389d602eecf4adab0e394da19b2b808cb489b1.zip |
Add Gitter support
Diffstat (limited to 'vendor/github.com/mrexodia/wray/go_faye.go')
-rw-r--r-- | vendor/github.com/mrexodia/wray/go_faye.go | 140 |
1 files changed, 140 insertions, 0 deletions
diff --git a/vendor/github.com/mrexodia/wray/go_faye.go b/vendor/github.com/mrexodia/wray/go_faye.go new file mode 100644 index 00000000..ab78eed7 --- /dev/null +++ b/vendor/github.com/mrexodia/wray/go_faye.go @@ -0,0 +1,140 @@ +package wray + +import ( + "fmt" + "path/filepath" + "strings" + "time" +) + +const ( + UNCONNECTED = 1 + CONNECTING = 2 + CONNECTED = 3 + DISCONNECTED = 4 + + HANDSHAKE = "handshake" + RETRY = "retry" + NONE = "none" + + CONNECTION_TIMEOUT = 60.0 + DEFAULT_RETRY = 5.0 + MAX_REQUEST_SIZE = 2048 +) + +var ( + MANDATORY_CONNECTION_TYPES = []string{"long-polling"} + registeredTransports = []Transport{} +) + +type FayeClient struct { + state int + url string + subscriptions []Subscription + transport Transport + clientId string + schedular Schedular +} + +type Subscription struct { + channel string + callback func(Message) +} + +type SubscriptionPromise struct { + subscription Subscription +} + +func NewFayeClient(url string) *FayeClient { + schedular := ChannelSchedular{} + client := &FayeClient{url: url, state: UNCONNECTED, schedular: schedular} + return client +} + +func (self *FayeClient) handshake() { + t, err := SelectTransport(self, MANDATORY_CONNECTION_TYPES, []string{}) + if err != nil { + panic("No usable transports available") + } + self.transport = t + self.transport.setUrl(self.url) + self.state = CONNECTING + handshakeParams := map[string]interface{}{"channel": "/meta/handshake", + "version": "1.0", + "supportedConnectionTypes": []string{"long-polling"}} + response, err := self.transport.send(handshakeParams) + if err != nil { + fmt.Println("Handshake failed. Retry in 10 seconds") + self.state = UNCONNECTED + self.schedular.wait(10*time.Second, func() { + fmt.Println("retying handshake") + self.handshake() + }) + return + } + self.clientId = response.clientId + self.state = CONNECTED + self.transport, err = SelectTransport(self, response.supportedConnectionTypes, []string{}) + if err != nil { + panic("Server does not support any available transports. Supported transports: " + strings.Join(response.supportedConnectionTypes, ",")) + } +} + +func (self *FayeClient) Subscribe(channel string, force bool, callback func(Message)) SubscriptionPromise { + if self.state == UNCONNECTED { + self.handshake() + } + subscriptionParams := map[string]interface{}{"channel": "/meta/subscribe", "clientId": self.clientId, "subscription": channel, "id": "1"} + subscription := Subscription{channel: channel, callback: callback} + //TODO: deal with subscription failures + self.transport.send(subscriptionParams) + self.subscriptions = append(self.subscriptions, subscription) + return SubscriptionPromise{subscription} +} + +func (self *FayeClient) handleResponse(response Response) { + for _, message := range response.messages { + for _, subscription := range self.subscriptions { + matched, _ := filepath.Match(subscription.channel, message.Channel) + if matched { + go subscription.callback(message) + } + } + } +} + +func (self *FayeClient) connect() { + connectParams := map[string]interface{}{"channel": "/meta/connect", "clientId": self.clientId, "connectionType": self.transport.connectionType()} + responseChannel := make(chan Response) + go func() { + response, _ := self.transport.send(connectParams) + responseChannel <- response + }() + self.listen(responseChannel) +} + +func (self *FayeClient) listen(responseChannel chan Response) { + response := <-responseChannel + if response.successful == true { + go self.handleResponse(response) + } else { + } +} + +func (self *FayeClient) Listen() { + for { + self.connect() + } +} + +func (self *FayeClient) Publish(channel string, data map[string]interface{}) { + if self.state != CONNECTED { + self.handshake() + } + publishParams := map[string]interface{}{"channel": channel, "data": data, "clientId": self.clientId} + self.transport.send(publishParams) +} + +func RegisterTransports(transports []Transport) { + registeredTransports = transports +} |