diff options
Diffstat (limited to 'vendor/github.com/nlopes/slack/websocket_managed_conn.go')
-rw-r--r-- | vendor/github.com/nlopes/slack/websocket_managed_conn.go | 142 |
1 files changed, 84 insertions, 58 deletions
diff --git a/vendor/github.com/nlopes/slack/websocket_managed_conn.go b/vendor/github.com/nlopes/slack/websocket_managed_conn.go index 62157910..8b3b3833 100644 --- a/vendor/github.com/nlopes/slack/websocket_managed_conn.go +++ b/vendor/github.com/nlopes/slack/websocket_managed_conn.go @@ -10,6 +10,8 @@ import ( "time" "github.com/gorilla/websocket" + "github.com/nlopes/slack/internal/errorsx" + "github.com/nlopes/slack/internal/timex" ) // ManageConnection can be called on a Slack RTM instance returned by the @@ -38,6 +40,7 @@ func (rtm *RTM) ManageConnection() { if info, conn, err = rtm.connect(connectionCount, rtm.useRTMStart); err != nil { // when the connection is unsuccessful its fatal, and we need to bail out. rtm.Debugf("Failed to connect with RTM on try %d: %s", connectionCount, err) + rtm.disconnect() return } @@ -45,7 +48,6 @@ func (rtm *RTM) ManageConnection() { // and conn. rtm.mu.Lock() rtm.conn = conn - rtm.isConnected = true rtm.info = info rtm.mu.Unlock() @@ -56,20 +58,19 @@ func (rtm *RTM) ManageConnection() { rtm.Debugf("RTM connection succeeded on try %d", connectionCount) - keepRunning := make(chan bool) - // we're now connected (or have failed fatally) so we can set up - // listeners - go rtm.handleIncomingEvents(keepRunning) + // we're now connected so we can set up listeners + go rtm.handleIncomingEvents() // this should be a blocking call until the connection has ended - rtm.handleEvents(keepRunning) + rtm.handleEvents() - // after being disconnected we need to check if it was intentional - // if not then we should try to reconnect - if rtm.wasIntentional { + select { + case <-rtm.disconnected: + // after handle events returns we need to check if we're disconnected return + default: + // otherwise continue and run the loop again to reconnect } - // else continue and run the loop again to connect } } @@ -88,18 +89,20 @@ func (rtm *RTM) connect(connectionCount int, useRTMStart bool) (*Info, *websocke // used to provide exponential backoff wait time with jitter before trying // to connect to slack again boff := &backoff{ - Min: 100 * time.Millisecond, - Max: 5 * time.Minute, - Factor: 2, - Jitter: true, + Max: 5 * time.Minute, } for { + var ( + backoff time.Duration + ) + // send connecting event rtm.IncomingEvents <- RTMEvent{"connecting", &ConnectingEvent{ Attempt: boff.attempts + 1, ConnectionCount: connectionCount, }} + // attempt to start the connection info, conn, err := rtm.startRTMAndDial(useRTMStart) if err == nil { @@ -109,32 +112,49 @@ func (rtm *RTM) connect(connectionCount int, useRTMStart bool) (*Info, *websocke // check for fatal errors switch err.Error() { case errInvalidAuth, errInactiveAccount, errMissingAuthToken: - rtm.Debugf("Invalid auth when connecting with RTM: %s", err) + rtm.Debugf("invalid auth when connecting with RTM: %s", err) rtm.IncomingEvents <- RTMEvent{"invalid_auth", &InvalidAuthEvent{}} return nil, nil, err default: } + switch actual := err.(type) { + case statusCodeError: + if actual.Code == http.StatusNotFound { + rtm.Debugf("invalid auth when connecting with RTM: %s", err) + rtm.IncomingEvents <- RTMEvent{"invalid_auth", &InvalidAuthEvent{}} + return nil, nil, err + } + case *RateLimitedError: + backoff = actual.RetryAfter + default: + } + + backoff = timex.Max(backoff, boff.Duration()) // any other errors are treated as recoverable and we try again after // sending the event along the IncomingEvents channel rtm.IncomingEvents <- RTMEvent{"connection_error", &ConnectionErrorEvent{ Attempt: boff.attempts, + Backoff: backoff, ErrorObj: err, }} - // check if Disconnect() has been invoked. + // get time we should wait before attempting to connect again + rtm.Debugf("reconnection %d failed: %s reconnecting in %v\n", boff.attempts, err, backoff) + + // wait for one of the following to occur, + // backoff duration has elapsed, killChannel is signalled, or + // the rtm finishes disconnecting. select { + case <-time.After(backoff): // retry after the backoff. + case intentional := <-rtm.killChannel: + if intentional { + rtm.killConnection(intentional, ErrRTMDisconnected) + return nil, nil, ErrRTMDisconnected + } case <-rtm.disconnected: - rtm.IncomingEvents <- RTMEvent{"disconnected", &DisconnectedEvent{Intentional: true}} - return nil, nil, fmt.Errorf("disconnect received while trying to connect") - default: + return nil, nil, ErrRTMDisconnected } - - // get time we should wait before attempting to connect again - dur := boff.Duration() - rtm.Debugf("reconnection %d failed: %s", boff.attempts+1, err) - rtm.Debugln(" -> reconnecting in", dur) - time.Sleep(dur) } } @@ -187,15 +207,19 @@ func (rtm *RTM) startRTMAndDial(useRTMStart bool) (info *Info, _ *websocket.Conn // // This should not be called directly! Instead a boolean value (true for // intentional, false otherwise) should be sent to the killChannel on the RTM. -func (rtm *RTM) killConnection(keepRunning chan bool, intentional bool) error { +func (rtm *RTM) killConnection(intentional bool, cause error) (err error) { rtm.Debugln("killing connection") - if rtm.isConnected { - close(keepRunning) + + if rtm.conn != nil { + err = rtm.conn.Close() + } + + rtm.IncomingEvents <- RTMEvent{"disconnected", &DisconnectedEvent{Intentional: intentional, Cause: cause}} + + if intentional { + rtm.disconnect() } - rtm.isConnected = false - rtm.wasIntentional = intentional - err := rtm.conn.Close() - rtm.IncomingEvents <- RTMEvent{"disconnected", &DisconnectedEvent{intentional}} + return err } @@ -204,31 +228,28 @@ func (rtm *RTM) killConnection(keepRunning chan bool, intentional bool) error { // interval. This also sends outgoing messages that are received from the RTM's // outgoingMessages channel. This also handles incoming raw events from the RTM // rawEvents channel. -func (rtm *RTM) handleEvents(keepRunning chan bool) { +func (rtm *RTM) handleEvents() { ticker := time.NewTicker(rtm.pingInterval) defer ticker.Stop() for { select { // catch "stop" signal on channel close case intentional := <-rtm.killChannel: - _ = rtm.killConnection(keepRunning, intentional) + _ = rtm.killConnection(intentional, errorsx.String("signaled")) return - // detect when the connection is dead. case <-rtm.pingDeadman.C: - rtm.Debugln("deadman switch trigger disconnecting") - _ = rtm.killConnection(keepRunning, false) + _ = rtm.killConnection(false, errorsx.String("deadman switch triggered")) + return // send pings on ticker interval case <-ticker.C: - err := rtm.ping() - if err != nil { - _ = rtm.killConnection(keepRunning, false) + if err := rtm.ping(); err != nil { + _ = rtm.killConnection(false, err) return } case <-rtm.forcePing: - err := rtm.ping() - if err != nil { - _ = rtm.killConnection(keepRunning, false) + if err := rtm.ping(); err != nil { + _ = rtm.killConnection(false, err) return } // listen for messages that need to be sent @@ -238,7 +259,8 @@ func (rtm *RTM) handleEvents(keepRunning chan bool) { case rawEvent := <-rtm.rawEvents: switch rtm.handleRawEvent(rawEvent) { case rtmEventTypeGoodbye: - _ = rtm.killConnection(keepRunning, false) + _ = rtm.killConnection(false, errorsx.String("goodbye detected")) + return default: } } @@ -250,17 +272,10 @@ func (rtm *RTM) handleEvents(keepRunning chan bool) { // // This will stop executing once the RTM's keepRunning channel has been closed // or has anything sent to it. -func (rtm *RTM) handleIncomingEvents(keepRunning <-chan bool) { +func (rtm *RTM) handleIncomingEvents() { for { - // non-blocking listen to see if channel is closed - select { - // catch "stop" signal on channel close - case <-keepRunning: + if err := rtm.receiveIncomingEvent(); err != nil { return - default: - if err := rtm.receiveIncomingEvent(); err != nil { - return - } } } } @@ -296,7 +311,6 @@ func (rtm *RTM) sendOutgoingMessage(msg OutgoingMessage) { Message: msg, ErrorObj: err, }} - // TODO force ping? } } @@ -332,20 +346,32 @@ func (rtm *RTM) receiveIncomingEvent() error { // 'PING' message // trigger a 'PING' to detect potential websocket disconnect - rtm.forcePing <- true + select { + case rtm.forcePing <- true: + case <-rtm.disconnected: + } case err != nil: // All other errors from ReadJSON come from NextReader, and should // kill the read loop and force a reconnect. rtm.IncomingEvents <- RTMEvent{"incoming_error", &IncomingEventError{ ErrorObj: err, }} - rtm.killChannel <- false + + select { + case rtm.killChannel <- false: + case <-rtm.disconnected: + } + return err case len(event) == 0: rtm.Debugln("Received empty event") default: - rtm.Debugln("Incoming Event:", string(event[:])) - rtm.rawEvents <- event + rtm.Debugln("Incoming Event:", string(event)) + select { + case rtm.rawEvents <- event: + case <-rtm.disconnected: + rtm.Debugln("disonnected while attempting to send raw event") + } } return nil } |