diff options
Diffstat (limited to 'vendor/github.com/nlopes/slack/websocket_managed_conn.go')
-rw-r--r-- | vendor/github.com/nlopes/slack/websocket_managed_conn.go | 183 |
1 files changed, 122 insertions, 61 deletions
diff --git a/vendor/github.com/nlopes/slack/websocket_managed_conn.go b/vendor/github.com/nlopes/slack/websocket_managed_conn.go index fec07fd7..b6d1bfc8 100644 --- a/vendor/github.com/nlopes/slack/websocket_managed_conn.go +++ b/vendor/github.com/nlopes/slack/websocket_managed_conn.go @@ -4,10 +4,11 @@ import ( "encoding/json" "fmt" "io" + "net/http" "reflect" "time" - "golang.org/x/net/websocket" + "github.com/gorilla/websocket" ) // ManageConnection can be called on a Slack RTM instance returned by the @@ -24,25 +25,35 @@ import ( // // The defined error events are located in websocket_internals.go. func (rtm *RTM) ManageConnection() { - var connectionCount int - for { - connectionCount++ + var ( + err error + info *Info + conn *websocket.Conn + ) + + for connectionCount := 0; ; connectionCount++ { // start trying to connect // the returned err is already passed onto the IncomingEvents channel - info, conn, err := rtm.connect(connectionCount, rtm.useRTMStart) - // if err != nil then the connection is sucessful - otherwise it is - // fatal - if err != nil { + if info, conn, err = rtm.connect(connectionCount, rtm.useRTMStart); err != nil { + // when the connection is unsuccessful its fatal, and we need to bail out. + rtm.Debugf("Failed to connect with RTM on try %d: %s", connectionCount, err) return } + + // lock to prevent data races with Disconnect particularly around isConnected + // and conn. + rtm.mu.Lock() + rtm.conn = conn + rtm.isConnected = true rtm.info = info + rtm.mu.Unlock() + rtm.IncomingEvents <- RTMEvent{"connected", &ConnectedEvent{ ConnectionCount: connectionCount, Info: info, }} - rtm.conn = conn - rtm.isConnected = true + rtm.Debugf("RTM connection succeeded on try %d", connectionCount) keepRunning := make(chan bool) // we're now connected (or have failed fatally) so we can set up @@ -50,7 +61,7 @@ func (rtm *RTM) ManageConnection() { go rtm.handleIncomingEvents(keepRunning) // this should be a blocking call until the connection has ended - rtm.handleEvents(keepRunning, 30*time.Second) + rtm.handleEvents(keepRunning) // after being disconnected we need to check if it was intentional // if not then we should try to reconnect @@ -67,6 +78,12 @@ func (rtm *RTM) ManageConnection() { // If useRTMStart is false then it uses rtm.connect to create the connection, // otherwise it uses rtm.start. func (rtm *RTM) connect(connectionCount int, useRTMStart bool) (*Info, *websocket.Conn, error) { + const ( + errInvalidAuth = "invalid_auth" + errInactiveAccount = "account_inactive" + errMissingAuthToken = "not_authed" + ) + // used to provide exponential backoff wait time with jitter before trying // to connect to slack again boff := &backoff{ @@ -87,10 +104,14 @@ func (rtm *RTM) connect(connectionCount int, useRTMStart bool) (*Info, *websocke if err == nil { return info, conn, nil } - // check for fatal errors - currently only invalid_auth - if sErr, ok := err.(*WebError); ok && (sErr.Error() == "invalid_auth" || sErr.Error() == "account_inactive") { + + // check for fatal errors + switch err.Error() { + case errInvalidAuth, errInactiveAccount, errMissingAuthToken: + rtm.Debugf("Invalid auth when connecting with RTM: %s", err) rtm.IncomingEvents <- RTMEvent{"invalid_auth", &InvalidAuthEvent{}} - return nil, nil, sErr + return nil, nil, err + default: } // any other errors are treated as recoverable and we try again after @@ -102,7 +123,7 @@ func (rtm *RTM) connect(connectionCount int, useRTMStart bool) (*Info, *websocke // check if Disconnect() has been invoked. select { - case _ = <-rtm.disconnected: + case <-rtm.disconnected: rtm.IncomingEvents <- RTMEvent{"disconnected", &DisconnectedEvent{Intentional: true}} return nil, nil, fmt.Errorf("disconnect received while trying to connect") default: @@ -119,23 +140,34 @@ func (rtm *RTM) connect(connectionCount int, useRTMStart bool) (*Info, *websocke // startRTMAndDial attempts to connect to the slack websocket. If useRTMStart is true, // then it returns the full information returned by the "rtm.start" method on the // slack API. Else it uses the "rtm.connect" method to connect -func (rtm *RTM) startRTMAndDial(useRTMStart bool) (*Info, *websocket.Conn, error) { - var info *Info - var url string - var err error +func (rtm *RTM) startRTMAndDial(useRTMStart bool) (info *Info, _ *websocket.Conn, err error) { + var ( + url string + ) if useRTMStart { + rtm.Debugf("Starting RTM") info, url, err = rtm.StartRTM() } else { + rtm.Debugf("Connecting to RTM") info, url, err = rtm.ConnectRTM() } if err != nil { + rtm.Debugf("Failed to start or connect to RTM: %s", err) return nil, nil, err } + rtm.Debugf("Dialing to websocket on url %s", url) // Only use HTTPS for connections to prevent MITM attacks on the connection. - conn, err := websocketProxyDial(url, "https://api.slack.com") + upgradeHeader := http.Header{} + upgradeHeader.Add("Origin", "https://api.slack.com") + dialer := websocket.DefaultDialer + if rtm.dialer != nil { + dialer = rtm.dialer + } + conn, _, err := dialer.Dial(url, upgradeHeader) if err != nil { + rtm.Debugf("Failed to dial to the websocket: %s", err) return nil, nil, err } return info, conn, err @@ -163,8 +195,8 @@ func (rtm *RTM) killConnection(keepRunning chan bool, intentional bool) error { // interval. This also sends outgoing messages that are received from the RTM's // outgoingMessages channel. This also handles incoming raw events from the RTM // rawEvents channel. -func (rtm *RTM) handleEvents(keepRunning chan bool, interval time.Duration) { - ticker := time.NewTicker(interval) +func (rtm *RTM) handleEvents(keepRunning chan bool) { + ticker := time.NewTicker(rtm.pingInterval) defer ticker.Stop() for { select { @@ -172,7 +204,12 @@ func (rtm *RTM) handleEvents(keepRunning chan bool, interval time.Duration) { case intentional := <-rtm.killChannel: _ = rtm.killConnection(keepRunning, intentional) return - // send pings on ticker interval + + // detect when the connection is dead. + case <-rtm.pingDeadman.C: + rtm.Debugln("deadman switch trigger disconnecting") + _ = rtm.killConnection(keepRunning, false) + // send pings on ticker interval case <-ticker.C: err := rtm.ping() if err != nil { @@ -190,7 +227,11 @@ func (rtm *RTM) handleEvents(keepRunning chan bool, interval time.Duration) { rtm.sendOutgoingMessage(msg) // listen for incoming messages that need to be parsed case rawEvent := <-rtm.rawEvents: - rtm.handleRawEvent(rawEvent) + switch rtm.handleRawEvent(rawEvent) { + case rtmEventTypeGoodbye: + _ = rtm.killConnection(keepRunning, false) + default: + } } } } @@ -208,7 +249,9 @@ func (rtm *RTM) handleIncomingEvents(keepRunning <-chan bool) { case <-keepRunning: return default: - rtm.receiveIncomingEvent() + if err := rtm.receiveIncomingEvent(); err != nil { + return + } } } } @@ -218,7 +261,7 @@ func (rtm *RTM) sendWithDeadline(msg interface{}) error { if err := rtm.conn.SetWriteDeadline(time.Now().Add(10 * time.Second)); err != nil { return err } - if err := websocket.JSON.Send(rtm.conn, msg); err != nil { + if err := rtm.conn.WriteJSON(msg); err != nil { return err } // remove write deadline @@ -258,9 +301,7 @@ func (rtm *RTM) sendOutgoingMessage(msg OutgoingMessage) { func (rtm *RTM) ping() error { id := rtm.idGen.Next() rtm.Debugln("Sending PING ", id) - rtm.pings[id] = time.Now() - - msg := &Ping{ID: id, Type: "ping"} + msg := &Ping{ID: id, Type: "ping", Timestamp: time.Now().Unix()} if err := rtm.sendWithDeadline(msg); err != nil { rtm.Debugf("RTM Error sending 'PING %d': %s", id, err.Error()) @@ -271,52 +312,62 @@ func (rtm *RTM) ping() error { // receiveIncomingEvent attempts to receive an event from the RTM's websocket. // This will block until a frame is available from the websocket. -func (rtm *RTM) receiveIncomingEvent() { +// If the read from the websocket results in a fatal error, this function will return non-nil. +func (rtm *RTM) receiveIncomingEvent() error { event := json.RawMessage{} - err := websocket.JSON.Receive(rtm.conn, &event) - if err == io.EOF { + err := rtm.conn.ReadJSON(&event) + switch { + case err == io.ErrUnexpectedEOF: // EOF's don't seem to signify a failed connection so instead we ignore // them here and detect a failed connection upon attempting to send a // 'PING' message - // trigger a 'PING' to detect pontential websocket disconnect + // trigger a 'PING' to detect potential websocket disconnect rtm.forcePing <- true - return - } else if err != nil { + case err != nil: + // All other errors from ReadJSON come from NextReader, and should + // kill the read loop and force a reconnect. rtm.IncomingEvents <- RTMEvent{"incoming_error", &IncomingEventError{ ErrorObj: err, }} - // force a ping here too? - return - } else if len(event) == 0 { + rtm.killChannel <- false + return err + case len(event) == 0: rtm.Debugln("Received empty event") - return + default: + rtm.Debugln("Incoming Event:", string(event[:])) + rtm.rawEvents <- event } - rtm.Debugln("Incoming Event:", string(event[:])) - rtm.rawEvents <- event + return nil } // handleRawEvent takes a raw JSON message received from the slack websocket // and handles the encoded event. -func (rtm *RTM) handleRawEvent(rawEvent json.RawMessage) { +// returns the event type of the message. +func (rtm *RTM) handleRawEvent(rawEvent json.RawMessage) string { event := &Event{} err := json.Unmarshal(rawEvent, event) if err != nil { rtm.IncomingEvents <- RTMEvent{"unmarshalling_error", &UnmarshallingErrorEvent{err}} - return + return "" } + switch event.Type { - case "": + case rtmEventTypeAck: rtm.handleAck(rawEvent) - case "hello": + case rtmEventTypeHello: rtm.IncomingEvents <- RTMEvent{"hello", &HelloEvent{}} - case "pong": + case rtmEventTypePong: rtm.handlePong(rawEvent) - case "desktop_notification": + case rtmEventTypeGoodbye: + // just return the event type up for goodbye, will be handled by caller. + case rtmEventTypeDesktopNotification: rtm.Debugln("Received desktop notification, ignoring") default: rtm.handleEvent(event.Type, rawEvent) } + + return event.Type } // handleAck handles an incoming 'ACK' message. @@ -331,7 +382,13 @@ func (rtm *RTM) handleAck(event json.RawMessage) { if ack.Ok { rtm.IncomingEvents <- RTMEvent{"ack", ack} } else if ack.RTMResponse.Error != nil { - rtm.IncomingEvents <- RTMEvent{"ack_error", &AckErrorEvent{ack.Error}} + // As there is no documentation for RTM error-codes, this + // identification of a rate-limit warning is very brittle. + if ack.RTMResponse.Error.Code == -1 && ack.RTMResponse.Error.Msg == "slow down, too many messages..." { + rtm.IncomingEvents <- RTMEvent{"ack_error", &RateLimitEvent{}} + } else { + rtm.IncomingEvents <- RTMEvent{"ack_error", &AckErrorEvent{ack.Error}} + } } else { rtm.IncomingEvents <- RTMEvent{"ack_error", &AckErrorEvent{fmt.Errorf("ack decode failure")}} } @@ -341,19 +398,20 @@ func (rtm *RTM) handleAck(event json.RawMessage) { // a previously sent 'PING' message. This is then used to compute the // connection's latency. func (rtm *RTM) handlePong(event json.RawMessage) { - pong := &Pong{} - if err := json.Unmarshal(event, pong); err != nil { - rtm.Debugln("RTM Error unmarshalling 'pong' event:", err) + var ( + p Pong + ) + + rtm.resetDeadman() + + if err := json.Unmarshal(event, &p); err != nil { + logger.Println("RTM Error unmarshalling 'pong' event:", err) rtm.Debugln(" -> Erroneous 'ping' event:", string(event)) return } - if pingTime, exists := rtm.pings[pong.ReplyTo]; exists { - latency := time.Since(pingTime) - rtm.IncomingEvents <- RTMEvent{"latency_report", &LatencyReport{Value: latency}} - delete(rtm.pings, pong.ReplyTo) - } else { - rtm.Debugln("RTM Error - unmatched 'pong' event:", string(event)) - } + + latency := time.Since(time.Unix(p.Timestamp, 0)) + rtm.IncomingEvents <- RTMEvent{"latency_report", &LatencyReport{Value: latency}} } // handleEvent is the "default" response to an event that does not have a @@ -363,7 +421,7 @@ func (rtm *RTM) handlePong(event json.RawMessage) { // correct struct then this sends an UnmarshallingErrorEvent to the // IncomingEvents channel. func (rtm *RTM) handleEvent(typeStr string, event json.RawMessage) { - v, exists := eventMapping[typeStr] + v, exists := EventMapping[typeStr] if !exists { rtm.Debugf("RTM Error, received unmapped event %q: %s\n", typeStr, string(event)) err := fmt.Errorf("RTM Error: Received unmapped event %q: %s\n", typeStr, string(event)) @@ -382,10 +440,10 @@ func (rtm *RTM) handleEvent(typeStr string, event json.RawMessage) { rtm.IncomingEvents <- RTMEvent{typeStr, recvEvent} } -// eventMapping holds a mapping of event names to their corresponding struct +// EventMapping holds a mapping of event names to their corresponding struct // implementations. The structs should be instances of the unmarshalling // target for the matching event type. -var eventMapping = map[string]interface{}{ +var EventMapping = map[string]interface{}{ "message": MessageEvent{}, "presence_change": PresenceChangeEvent{}, "user_typing": UserTypingEvent{}, @@ -463,4 +521,7 @@ var eventMapping = map[string]interface{}{ "accounts_changed": AccountsChangedEvent{}, "reconnect_url": ReconnectUrlEvent{}, + + "member_joined_channel": MemberJoinedChannelEvent{}, + "member_left_channel": MemberLeftChannelEvent{}, } |