summaryrefslogtreecommitdiffstats
path: root/vendor/github.com/nlopes/slack/websocket_managed_conn.go
diff options
context:
space:
mode:
Diffstat (limited to 'vendor/github.com/nlopes/slack/websocket_managed_conn.go')
-rw-r--r--vendor/github.com/nlopes/slack/websocket_managed_conn.go142
1 files changed, 84 insertions, 58 deletions
diff --git a/vendor/github.com/nlopes/slack/websocket_managed_conn.go b/vendor/github.com/nlopes/slack/websocket_managed_conn.go
index 62157910..8b3b3833 100644
--- a/vendor/github.com/nlopes/slack/websocket_managed_conn.go
+++ b/vendor/github.com/nlopes/slack/websocket_managed_conn.go
@@ -10,6 +10,8 @@ import (
"time"
"github.com/gorilla/websocket"
+ "github.com/nlopes/slack/internal/errorsx"
+ "github.com/nlopes/slack/internal/timex"
)
// ManageConnection can be called on a Slack RTM instance returned by the
@@ -38,6 +40,7 @@ func (rtm *RTM) ManageConnection() {
if info, conn, err = rtm.connect(connectionCount, rtm.useRTMStart); err != nil {
// when the connection is unsuccessful its fatal, and we need to bail out.
rtm.Debugf("Failed to connect with RTM on try %d: %s", connectionCount, err)
+ rtm.disconnect()
return
}
@@ -45,7 +48,6 @@ func (rtm *RTM) ManageConnection() {
// and conn.
rtm.mu.Lock()
rtm.conn = conn
- rtm.isConnected = true
rtm.info = info
rtm.mu.Unlock()
@@ -56,20 +58,19 @@ func (rtm *RTM) ManageConnection() {
rtm.Debugf("RTM connection succeeded on try %d", connectionCount)
- keepRunning := make(chan bool)
- // we're now connected (or have failed fatally) so we can set up
- // listeners
- go rtm.handleIncomingEvents(keepRunning)
+ // we're now connected so we can set up listeners
+ go rtm.handleIncomingEvents()
// this should be a blocking call until the connection has ended
- rtm.handleEvents(keepRunning)
+ rtm.handleEvents()
- // after being disconnected we need to check if it was intentional
- // if not then we should try to reconnect
- if rtm.wasIntentional {
+ select {
+ case <-rtm.disconnected:
+ // after handle events returns we need to check if we're disconnected
return
+ default:
+ // otherwise continue and run the loop again to reconnect
}
- // else continue and run the loop again to connect
}
}
@@ -88,18 +89,20 @@ func (rtm *RTM) connect(connectionCount int, useRTMStart bool) (*Info, *websocke
// used to provide exponential backoff wait time with jitter before trying
// to connect to slack again
boff := &backoff{
- Min: 100 * time.Millisecond,
- Max: 5 * time.Minute,
- Factor: 2,
- Jitter: true,
+ Max: 5 * time.Minute,
}
for {
+ var (
+ backoff time.Duration
+ )
+
// send connecting event
rtm.IncomingEvents <- RTMEvent{"connecting", &ConnectingEvent{
Attempt: boff.attempts + 1,
ConnectionCount: connectionCount,
}}
+
// attempt to start the connection
info, conn, err := rtm.startRTMAndDial(useRTMStart)
if err == nil {
@@ -109,32 +112,49 @@ func (rtm *RTM) connect(connectionCount int, useRTMStart bool) (*Info, *websocke
// check for fatal errors
switch err.Error() {
case errInvalidAuth, errInactiveAccount, errMissingAuthToken:
- rtm.Debugf("Invalid auth when connecting with RTM: %s", err)
+ rtm.Debugf("invalid auth when connecting with RTM: %s", err)
rtm.IncomingEvents <- RTMEvent{"invalid_auth", &InvalidAuthEvent{}}
return nil, nil, err
default:
}
+ switch actual := err.(type) {
+ case statusCodeError:
+ if actual.Code == http.StatusNotFound {
+ rtm.Debugf("invalid auth when connecting with RTM: %s", err)
+ rtm.IncomingEvents <- RTMEvent{"invalid_auth", &InvalidAuthEvent{}}
+ return nil, nil, err
+ }
+ case *RateLimitedError:
+ backoff = actual.RetryAfter
+ default:
+ }
+
+ backoff = timex.Max(backoff, boff.Duration())
// any other errors are treated as recoverable and we try again after
// sending the event along the IncomingEvents channel
rtm.IncomingEvents <- RTMEvent{"connection_error", &ConnectionErrorEvent{
Attempt: boff.attempts,
+ Backoff: backoff,
ErrorObj: err,
}}
- // check if Disconnect() has been invoked.
+ // get time we should wait before attempting to connect again
+ rtm.Debugf("reconnection %d failed: %s reconnecting in %v\n", boff.attempts, err, backoff)
+
+ // wait for one of the following to occur,
+ // backoff duration has elapsed, killChannel is signalled, or
+ // the rtm finishes disconnecting.
select {
+ case <-time.After(backoff): // retry after the backoff.
+ case intentional := <-rtm.killChannel:
+ if intentional {
+ rtm.killConnection(intentional, ErrRTMDisconnected)
+ return nil, nil, ErrRTMDisconnected
+ }
case <-rtm.disconnected:
- rtm.IncomingEvents <- RTMEvent{"disconnected", &DisconnectedEvent{Intentional: true}}
- return nil, nil, fmt.Errorf("disconnect received while trying to connect")
- default:
+ return nil, nil, ErrRTMDisconnected
}
-
- // get time we should wait before attempting to connect again
- dur := boff.Duration()
- rtm.Debugf("reconnection %d failed: %s", boff.attempts+1, err)
- rtm.Debugln(" -> reconnecting in", dur)
- time.Sleep(dur)
}
}
@@ -187,15 +207,19 @@ func (rtm *RTM) startRTMAndDial(useRTMStart bool) (info *Info, _ *websocket.Conn
//
// This should not be called directly! Instead a boolean value (true for
// intentional, false otherwise) should be sent to the killChannel on the RTM.
-func (rtm *RTM) killConnection(keepRunning chan bool, intentional bool) error {
+func (rtm *RTM) killConnection(intentional bool, cause error) (err error) {
rtm.Debugln("killing connection")
- if rtm.isConnected {
- close(keepRunning)
+
+ if rtm.conn != nil {
+ err = rtm.conn.Close()
+ }
+
+ rtm.IncomingEvents <- RTMEvent{"disconnected", &DisconnectedEvent{Intentional: intentional, Cause: cause}}
+
+ if intentional {
+ rtm.disconnect()
}
- rtm.isConnected = false
- rtm.wasIntentional = intentional
- err := rtm.conn.Close()
- rtm.IncomingEvents <- RTMEvent{"disconnected", &DisconnectedEvent{intentional}}
+
return err
}
@@ -204,31 +228,28 @@ func (rtm *RTM) killConnection(keepRunning chan bool, intentional bool) error {
// interval. This also sends outgoing messages that are received from the RTM's
// outgoingMessages channel. This also handles incoming raw events from the RTM
// rawEvents channel.
-func (rtm *RTM) handleEvents(keepRunning chan bool) {
+func (rtm *RTM) handleEvents() {
ticker := time.NewTicker(rtm.pingInterval)
defer ticker.Stop()
for {
select {
// catch "stop" signal on channel close
case intentional := <-rtm.killChannel:
- _ = rtm.killConnection(keepRunning, intentional)
+ _ = rtm.killConnection(intentional, errorsx.String("signaled"))
return
-
// detect when the connection is dead.
case <-rtm.pingDeadman.C:
- rtm.Debugln("deadman switch trigger disconnecting")
- _ = rtm.killConnection(keepRunning, false)
+ _ = rtm.killConnection(false, errorsx.String("deadman switch triggered"))
+ return
// send pings on ticker interval
case <-ticker.C:
- err := rtm.ping()
- if err != nil {
- _ = rtm.killConnection(keepRunning, false)
+ if err := rtm.ping(); err != nil {
+ _ = rtm.killConnection(false, err)
return
}
case <-rtm.forcePing:
- err := rtm.ping()
- if err != nil {
- _ = rtm.killConnection(keepRunning, false)
+ if err := rtm.ping(); err != nil {
+ _ = rtm.killConnection(false, err)
return
}
// listen for messages that need to be sent
@@ -238,7 +259,8 @@ func (rtm *RTM) handleEvents(keepRunning chan bool) {
case rawEvent := <-rtm.rawEvents:
switch rtm.handleRawEvent(rawEvent) {
case rtmEventTypeGoodbye:
- _ = rtm.killConnection(keepRunning, false)
+ _ = rtm.killConnection(false, errorsx.String("goodbye detected"))
+ return
default:
}
}
@@ -250,17 +272,10 @@ func (rtm *RTM) handleEvents(keepRunning chan bool) {
//
// This will stop executing once the RTM's keepRunning channel has been closed
// or has anything sent to it.
-func (rtm *RTM) handleIncomingEvents(keepRunning <-chan bool) {
+func (rtm *RTM) handleIncomingEvents() {
for {
- // non-blocking listen to see if channel is closed
- select {
- // catch "stop" signal on channel close
- case <-keepRunning:
+ if err := rtm.receiveIncomingEvent(); err != nil {
return
- default:
- if err := rtm.receiveIncomingEvent(); err != nil {
- return
- }
}
}
}
@@ -296,7 +311,6 @@ func (rtm *RTM) sendOutgoingMessage(msg OutgoingMessage) {
Message: msg,
ErrorObj: err,
}}
- // TODO force ping?
}
}
@@ -332,20 +346,32 @@ func (rtm *RTM) receiveIncomingEvent() error {
// 'PING' message
// trigger a 'PING' to detect potential websocket disconnect
- rtm.forcePing <- true
+ select {
+ case rtm.forcePing <- true:
+ case <-rtm.disconnected:
+ }
case err != nil:
// All other errors from ReadJSON come from NextReader, and should
// kill the read loop and force a reconnect.
rtm.IncomingEvents <- RTMEvent{"incoming_error", &IncomingEventError{
ErrorObj: err,
}}
- rtm.killChannel <- false
+
+ select {
+ case rtm.killChannel <- false:
+ case <-rtm.disconnected:
+ }
+
return err
case len(event) == 0:
rtm.Debugln("Received empty event")
default:
- rtm.Debugln("Incoming Event:", string(event[:]))
- rtm.rawEvents <- event
+ rtm.Debugln("Incoming Event:", string(event))
+ select {
+ case rtm.rawEvents <- event:
+ case <-rtm.disconnected:
+ rtm.Debugln("disonnected while attempting to send raw event")
+ }
}
return nil
}