diff options
Diffstat (limited to 'vendor/github.com/nlopes/slack/websocket_managed_conn.go')
-rw-r--r-- | vendor/github.com/nlopes/slack/websocket_managed_conn.go | 63 |
1 files changed, 39 insertions, 24 deletions
diff --git a/vendor/github.com/nlopes/slack/websocket_managed_conn.go b/vendor/github.com/nlopes/slack/websocket_managed_conn.go index 8b3b3833..dbbf682d 100644 --- a/vendor/github.com/nlopes/slack/websocket_managed_conn.go +++ b/vendor/github.com/nlopes/slack/websocket_managed_conn.go @@ -58,15 +58,19 @@ func (rtm *RTM) ManageConnection() { rtm.Debugf("RTM connection succeeded on try %d", connectionCount) + rawEvents := make(chan json.RawMessage) // we're now connected so we can set up listeners - go rtm.handleIncomingEvents() - + go rtm.handleIncomingEvents(rawEvents) // this should be a blocking call until the connection has ended - rtm.handleEvents() + rtm.handleEvents(rawEvents) select { case <-rtm.disconnected: // after handle events returns we need to check if we're disconnected + // when this happens we need to cleanup the newly created connection. + if err = conn.Close(); err != nil { + rtm.Debugln("failed to close conn on disconnected RTM", err) + } return default: // otherwise continue and run the loop again to reconnect @@ -208,7 +212,7 @@ func (rtm *RTM) startRTMAndDial(useRTMStart bool) (info *Info, _ *websocket.Conn // This should not be called directly! Instead a boolean value (true for // intentional, false otherwise) should be sent to the killChannel on the RTM. func (rtm *RTM) killConnection(intentional bool, cause error) (err error) { - rtm.Debugln("killing connection") + rtm.Debugln("killing connection", cause) if rtm.conn != nil { err = rtm.conn.Close() @@ -228,7 +232,7 @@ func (rtm *RTM) killConnection(intentional bool, cause error) (err error) { // interval. This also sends outgoing messages that are received from the RTM's // outgoingMessages channel. This also handles incoming raw events from the RTM // rawEvents channel. -func (rtm *RTM) handleEvents() { +func (rtm *RTM) handleEvents(events chan json.RawMessage) { ticker := time.NewTicker(rtm.pingInterval) defer ticker.Stop() for { @@ -239,7 +243,7 @@ func (rtm *RTM) handleEvents() { return // detect when the connection is dead. case <-rtm.pingDeadman.C: - _ = rtm.killConnection(false, errorsx.String("deadman switch triggered")) + _ = rtm.killConnection(false, ErrRTMDeadman) return // send pings on ticker interval case <-ticker.C: @@ -255,12 +259,17 @@ func (rtm *RTM) handleEvents() { // listen for messages that need to be sent case msg := <-rtm.outgoingMessages: rtm.sendOutgoingMessage(msg) - // listen for incoming messages that need to be parsed - case rawEvent := <-rtm.rawEvents: + // listen for incoming messages that need to be parsed + case rawEvent := <-events: switch rtm.handleRawEvent(rawEvent) { case rtmEventTypeGoodbye: - _ = rtm.killConnection(false, errorsx.String("goodbye detected")) - return + // kill the connection, but DO NOT RETURN, a follow up kill signal will + // be sent that still needs to be processed. this duplication is because + // the event reader restarts once it emits the goodbye event. + // unlike the other cases in this function a final read will be triggered + // against the connection which will emit a kill signal. if we return early + // this kill signal will be processed by the next connection. + _ = rtm.killConnection(false, ErrRTMGoodbye) default: } } @@ -268,13 +277,17 @@ func (rtm *RTM) handleEvents() { } // handleIncomingEvents monitors the RTM's opened websocket for any incoming -// events. It pushes the raw events onto the RTM channel rawEvents. +// events. It pushes the raw events into the channel. // -// This will stop executing once the RTM's keepRunning channel has been closed -// or has anything sent to it. -func (rtm *RTM) handleIncomingEvents() { +// This will stop executing once the RTM's when a fatal error is detected, or +// a disconnect occurs. +func (rtm *RTM) handleIncomingEvents(events chan json.RawMessage) { for { - if err := rtm.receiveIncomingEvent(); err != nil { + if err := rtm.receiveIncomingEvent(events); err != nil { + select { + case rtm.killChannel <- false: + case <-rtm.disconnected: + } return } } @@ -336,9 +349,15 @@ func (rtm *RTM) ping() error { // receiveIncomingEvent attempts to receive an event from the RTM's websocket. // This will block until a frame is available from the websocket. // If the read from the websocket results in a fatal error, this function will return non-nil. -func (rtm *RTM) receiveIncomingEvent() error { +func (rtm *RTM) receiveIncomingEvent(events chan json.RawMessage) error { event := json.RawMessage{} err := rtm.conn.ReadJSON(&event) + + // check if the connection was closed. + if websocket.IsUnexpectedCloseError(err) { + return err + } + switch { case err == io.ErrUnexpectedEOF: // EOF's don't seem to signify a failed connection so instead we ignore @@ -357,22 +376,18 @@ func (rtm *RTM) receiveIncomingEvent() error { ErrorObj: err, }} - select { - case rtm.killChannel <- false: - case <-rtm.disconnected: - } - return err case len(event) == 0: rtm.Debugln("Received empty event") default: rtm.Debugln("Incoming Event:", string(event)) select { - case rtm.rawEvents <- event: + case events <- event: case <-rtm.disconnected: rtm.Debugln("disonnected while attempting to send raw event") } } + return nil } @@ -396,8 +411,6 @@ func (rtm *RTM) handleRawEvent(rawEvent json.RawMessage) string { rtm.handlePong(rawEvent) case rtmEventTypeGoodbye: // just return the event type up for goodbye, will be handled by caller. - case rtmEventTypeDesktopNotification: - rtm.Debugln("Received desktop notification, ignoring") default: rtm.handleEvent(event.Type, rawEvent) } @@ -563,4 +576,6 @@ var EventMapping = map[string]interface{}{ "subteam_self_added": SubteamSelfAddedEvent{}, "subteam_self_removed": SubteamSelfRemovedEvent{}, "subteam_updated": SubteamUpdatedEvent{}, + + "desktop_notification": DesktopNotificationEvent{}, } |