summaryrefslogtreecommitdiffstats
path: root/vendor/github.com/slack-go/slack/websocket_managed_conn.go
diff options
context:
space:
mode:
Diffstat (limited to 'vendor/github.com/slack-go/slack/websocket_managed_conn.go')
-rw-r--r--vendor/github.com/slack-go/slack/websocket_managed_conn.go581
1 files changed, 581 insertions, 0 deletions
diff --git a/vendor/github.com/slack-go/slack/websocket_managed_conn.go b/vendor/github.com/slack-go/slack/websocket_managed_conn.go
new file mode 100644
index 00000000..6395938c
--- /dev/null
+++ b/vendor/github.com/slack-go/slack/websocket_managed_conn.go
@@ -0,0 +1,581 @@
+package slack
+
+import (
+ "encoding/json"
+ "fmt"
+ "io"
+ "net/http"
+ stdurl "net/url"
+ "reflect"
+ "time"
+
+ "github.com/gorilla/websocket"
+ "github.com/slack-go/slack/internal/errorsx"
+ "github.com/slack-go/slack/internal/timex"
+)
+
+// ManageConnection can be called on a Slack RTM instance returned by the
+// NewRTM method. It will connect to the slack RTM API and handle all incoming
+// and outgoing events. If a connection fails then it will attempt to reconnect
+// and will notify any listeners through an error event on the IncomingEvents
+// channel.
+//
+// If the connection ends and the disconnect was unintentional then this will
+// attempt to reconnect.
+//
+// This should only be called once per slack API! Otherwise expect undefined
+// behavior.
+//
+// The defined error events are located in websocket_internals.go.
+func (rtm *RTM) ManageConnection() {
+ var (
+ err error
+ info *Info
+ conn *websocket.Conn
+ )
+
+ for connectionCount := 0; ; connectionCount++ {
+ // start trying to connect
+ // the returned err is already passed onto the IncomingEvents channel
+ if info, conn, err = rtm.connect(connectionCount, rtm.useRTMStart); err != nil {
+ // when the connection is unsuccessful its fatal, and we need to bail out.
+ rtm.Debugf("Failed to connect with RTM on try %d: %s", connectionCount, err)
+ rtm.disconnect()
+ return
+ }
+
+ // lock to prevent data races with Disconnect particularly around isConnected
+ // and conn.
+ rtm.mu.Lock()
+ rtm.conn = conn
+ rtm.info = info
+ rtm.mu.Unlock()
+
+ rtm.IncomingEvents <- RTMEvent{"connected", &ConnectedEvent{
+ ConnectionCount: connectionCount,
+ Info: info,
+ }}
+
+ rtm.Debugf("RTM connection succeeded on try %d", connectionCount)
+
+ rawEvents := make(chan json.RawMessage)
+ // we're now connected so we can set up listeners
+ go rtm.handleIncomingEvents(rawEvents)
+ // this should be a blocking call until the connection has ended
+ rtm.handleEvents(rawEvents)
+
+ select {
+ case <-rtm.disconnected:
+ // after handle events returns we need to check if we're disconnected
+ // when this happens we need to cleanup the newly created connection.
+ if err = conn.Close(); err != nil {
+ rtm.Debugln("failed to close conn on disconnected RTM", err)
+ }
+ return
+ default:
+ // otherwise continue and run the loop again to reconnect
+ }
+ }
+}
+
+// connect attempts to connect to the slack websocket API. It handles any
+// errors that occur while connecting and will return once a connection
+// has been successfully opened.
+// If useRTMStart is false then it uses rtm.connect to create the connection,
+// otherwise it uses rtm.start.
+func (rtm *RTM) connect(connectionCount int, useRTMStart bool) (*Info, *websocket.Conn, error) {
+ const (
+ errInvalidAuth = "invalid_auth"
+ errInactiveAccount = "account_inactive"
+ errMissingAuthToken = "not_authed"
+ )
+
+ // used to provide exponential backoff wait time with jitter before trying
+ // to connect to slack again
+ boff := &backoff{
+ Max: 5 * time.Minute,
+ }
+
+ for {
+ var (
+ backoff time.Duration
+ )
+
+ // send connecting event
+ rtm.IncomingEvents <- RTMEvent{"connecting", &ConnectingEvent{
+ Attempt: boff.attempts + 1,
+ ConnectionCount: connectionCount,
+ }}
+
+ // attempt to start the connection
+ info, conn, err := rtm.startRTMAndDial(useRTMStart)
+ if err == nil {
+ return info, conn, nil
+ }
+
+ // check for fatal errors
+ switch err.Error() {
+ case errInvalidAuth, errInactiveAccount, errMissingAuthToken:
+ rtm.Debugf("invalid auth when connecting with RTM: %s", err)
+ rtm.IncomingEvents <- RTMEvent{"invalid_auth", &InvalidAuthEvent{}}
+ return nil, nil, err
+ default:
+ }
+
+ switch actual := err.(type) {
+ case statusCodeError:
+ if actual.Code == http.StatusNotFound {
+ rtm.Debugf("invalid auth when connecting with RTM: %s", err)
+ rtm.IncomingEvents <- RTMEvent{"invalid_auth", &InvalidAuthEvent{}}
+ return nil, nil, err
+ }
+ case *RateLimitedError:
+ backoff = actual.RetryAfter
+ default:
+ }
+
+ backoff = timex.Max(backoff, boff.Duration())
+ // any other errors are treated as recoverable and we try again after
+ // sending the event along the IncomingEvents channel
+ rtm.IncomingEvents <- RTMEvent{"connection_error", &ConnectionErrorEvent{
+ Attempt: boff.attempts,
+ Backoff: backoff,
+ ErrorObj: err,
+ }}
+
+ // get time we should wait before attempting to connect again
+ rtm.Debugf("reconnection %d failed: %s reconnecting in %v\n", boff.attempts, err, backoff)
+
+ // wait for one of the following to occur,
+ // backoff duration has elapsed, killChannel is signalled, or
+ // the rtm finishes disconnecting.
+ select {
+ case <-time.After(backoff): // retry after the backoff.
+ case intentional := <-rtm.killChannel:
+ if intentional {
+ rtm.killConnection(intentional, ErrRTMDisconnected)
+ return nil, nil, ErrRTMDisconnected
+ }
+ case <-rtm.disconnected:
+ return nil, nil, ErrRTMDisconnected
+ }
+ }
+}
+
+// startRTMAndDial attempts to connect to the slack websocket. If useRTMStart is true,
+// then it returns the full information returned by the "rtm.start" method on the
+// slack API. Else it uses the "rtm.connect" method to connect
+func (rtm *RTM) startRTMAndDial(useRTMStart bool) (info *Info, _ *websocket.Conn, err error) {
+ var (
+ url string
+ )
+
+ if useRTMStart {
+ rtm.Debugf("Starting RTM")
+ info, url, err = rtm.StartRTM()
+ } else {
+ rtm.Debugf("Connecting to RTM")
+ info, url, err = rtm.ConnectRTM()
+ }
+ if err != nil {
+ rtm.Debugf("Failed to start or connect to RTM: %s", err)
+ return nil, nil, err
+ }
+
+ // install connection parameters
+ u, err := stdurl.Parse(url)
+ if err != nil {
+ return nil, nil, err
+ }
+ u.RawQuery = rtm.connParams.Encode()
+ url = u.String()
+
+ rtm.Debugf("Dialing to websocket on url %s", url)
+ // Only use HTTPS for connections to prevent MITM attacks on the connection.
+ upgradeHeader := http.Header{}
+ upgradeHeader.Add("Origin", "https://api.slack.com")
+ dialer := websocket.DefaultDialer
+ if rtm.dialer != nil {
+ dialer = rtm.dialer
+ }
+ conn, _, err := dialer.Dial(url, upgradeHeader)
+ if err != nil {
+ rtm.Debugf("Failed to dial to the websocket: %s", err)
+ return nil, nil, err
+ }
+ return info, conn, err
+}
+
+// killConnection stops the websocket connection and signals to all goroutines
+// that they should cease listening to the connection for events.
+//
+// This should not be called directly! Instead a boolean value (true for
+// intentional, false otherwise) should be sent to the killChannel on the RTM.
+func (rtm *RTM) killConnection(intentional bool, cause error) (err error) {
+ rtm.Debugln("killing connection", cause)
+
+ if rtm.conn != nil {
+ err = rtm.conn.Close()
+ }
+
+ rtm.IncomingEvents <- RTMEvent{"disconnected", &DisconnectedEvent{Intentional: intentional, Cause: cause}}
+
+ if intentional {
+ rtm.disconnect()
+ }
+
+ return err
+}
+
+// handleEvents is a blocking function that handles all events. This sends
+// pings when asked to (on rtm.forcePing) and upon every given elapsed
+// interval. This also sends outgoing messages that are received from the RTM's
+// outgoingMessages channel. This also handles incoming raw events from the RTM
+// rawEvents channel.
+func (rtm *RTM) handleEvents(events chan json.RawMessage) {
+ ticker := time.NewTicker(rtm.pingInterval)
+ defer ticker.Stop()
+ for {
+ select {
+ // catch "stop" signal on channel close
+ case intentional := <-rtm.killChannel:
+ _ = rtm.killConnection(intentional, errorsx.String("signaled"))
+ return
+ // detect when the connection is dead.
+ case <-rtm.pingDeadman.C:
+ _ = rtm.killConnection(false, ErrRTMDeadman)
+ return
+ // send pings on ticker interval
+ case <-ticker.C:
+ if err := rtm.ping(); err != nil {
+ _ = rtm.killConnection(false, err)
+ return
+ }
+ case <-rtm.forcePing:
+ if err := rtm.ping(); err != nil {
+ _ = rtm.killConnection(false, err)
+ return
+ }
+ // listen for messages that need to be sent
+ case msg := <-rtm.outgoingMessages:
+ rtm.sendOutgoingMessage(msg)
+ // listen for incoming messages that need to be parsed
+ case rawEvent := <-events:
+ switch rtm.handleRawEvent(rawEvent) {
+ case rtmEventTypeGoodbye:
+ // kill the connection, but DO NOT RETURN, a follow up kill signal will
+ // be sent that still needs to be processed. this duplication is because
+ // the event reader restarts once it emits the goodbye event.
+ // unlike the other cases in this function a final read will be triggered
+ // against the connection which will emit a kill signal. if we return early
+ // this kill signal will be processed by the next connection.
+ _ = rtm.killConnection(false, ErrRTMGoodbye)
+ default:
+ }
+ }
+ }
+}
+
+// handleIncomingEvents monitors the RTM's opened websocket for any incoming
+// events. It pushes the raw events into the channel.
+//
+// This will stop executing once the RTM's when a fatal error is detected, or
+// a disconnect occurs.
+func (rtm *RTM) handleIncomingEvents(events chan json.RawMessage) {
+ for {
+ if err := rtm.receiveIncomingEvent(events); err != nil {
+ select {
+ case rtm.killChannel <- false:
+ case <-rtm.disconnected:
+ }
+ return
+ }
+ }
+}
+
+func (rtm *RTM) sendWithDeadline(msg interface{}) error {
+ // set a write deadline on the connection
+ if err := rtm.conn.SetWriteDeadline(time.Now().Add(10 * time.Second)); err != nil {
+ return err
+ }
+ if err := rtm.conn.WriteJSON(msg); err != nil {
+ return err
+ }
+ // remove write deadline
+ return rtm.conn.SetWriteDeadline(time.Time{})
+}
+
+// sendOutgoingMessage sends the given OutgoingMessage to the slack websocket.
+//
+// It does not currently detect if a outgoing message fails due to a disconnect
+// and instead lets a future failed 'PING' detect the failed connection.
+func (rtm *RTM) sendOutgoingMessage(msg OutgoingMessage) {
+ rtm.Debugln("Sending message:", msg)
+ if len([]rune(msg.Text)) > MaxMessageTextLength {
+ rtm.IncomingEvents <- RTMEvent{"outgoing_error", &MessageTooLongEvent{
+ Message: msg,
+ MaxLength: MaxMessageTextLength,
+ }}
+ return
+ }
+
+ if err := rtm.sendWithDeadline(msg); err != nil {
+ rtm.IncomingEvents <- RTMEvent{"outgoing_error", &OutgoingErrorEvent{
+ Message: msg,
+ ErrorObj: err,
+ }}
+ }
+}
+
+// ping sends a 'PING' message to the RTM's websocket. If the 'PING' message
+// fails to send then this returns an error signifying that the connection
+// should be considered disconnected.
+//
+// This does not handle incoming 'PONG' responses but does store the time of
+// each successful 'PING' send so latency can be detected upon a 'PONG'
+// response.
+func (rtm *RTM) ping() error {
+ id := rtm.idGen.Next()
+ rtm.Debugln("Sending PING ", id)
+ msg := &Ping{ID: id, Type: "ping", Timestamp: time.Now().Unix()}
+
+ if err := rtm.sendWithDeadline(msg); err != nil {
+ rtm.Debugf("RTM Error sending 'PING %d': %s", id, err.Error())
+ return err
+ }
+ return nil
+}
+
+// receiveIncomingEvent attempts to receive an event from the RTM's websocket.
+// This will block until a frame is available from the websocket.
+// If the read from the websocket results in a fatal error, this function will return non-nil.
+func (rtm *RTM) receiveIncomingEvent(events chan json.RawMessage) error {
+ event := json.RawMessage{}
+ err := rtm.conn.ReadJSON(&event)
+
+ // check if the connection was closed.
+ if websocket.IsUnexpectedCloseError(err) {
+ return err
+ }
+
+ switch {
+ case err == io.ErrUnexpectedEOF:
+ // EOF's don't seem to signify a failed connection so instead we ignore
+ // them here and detect a failed connection upon attempting to send a
+ // 'PING' message
+
+ // trigger a 'PING' to detect potential websocket disconnect
+ select {
+ case rtm.forcePing <- true:
+ case <-rtm.disconnected:
+ }
+ case err != nil:
+ // All other errors from ReadJSON come from NextReader, and should
+ // kill the read loop and force a reconnect.
+ rtm.IncomingEvents <- RTMEvent{"incoming_error", &IncomingEventError{
+ ErrorObj: err,
+ }}
+
+ return err
+ case len(event) == 0:
+ rtm.Debugln("Received empty event")
+ default:
+ rtm.Debugln("Incoming Event:", string(event))
+ select {
+ case events <- event:
+ case <-rtm.disconnected:
+ rtm.Debugln("disonnected while attempting to send raw event")
+ }
+ }
+
+ return nil
+}
+
+// handleRawEvent takes a raw JSON message received from the slack websocket
+// and handles the encoded event.
+// returns the event type of the message.
+func (rtm *RTM) handleRawEvent(rawEvent json.RawMessage) string {
+ event := &Event{}
+ err := json.Unmarshal(rawEvent, event)
+ if err != nil {
+ rtm.IncomingEvents <- RTMEvent{"unmarshalling_error", &UnmarshallingErrorEvent{err}}
+ return ""
+ }
+
+ switch event.Type {
+ case rtmEventTypeAck:
+ rtm.handleAck(rawEvent)
+ case rtmEventTypeHello:
+ rtm.IncomingEvents <- RTMEvent{"hello", &HelloEvent{}}
+ case rtmEventTypePong:
+ rtm.handlePong(rawEvent)
+ case rtmEventTypeGoodbye:
+ // just return the event type up for goodbye, will be handled by caller.
+ default:
+ rtm.handleEvent(event.Type, rawEvent)
+ }
+
+ return event.Type
+}
+
+// handleAck handles an incoming 'ACK' message.
+func (rtm *RTM) handleAck(event json.RawMessage) {
+ ack := &AckMessage{}
+ if err := json.Unmarshal(event, ack); err != nil {
+ rtm.Debugln("RTM Error unmarshalling 'ack' event:", err)
+ rtm.Debugln(" -> Erroneous 'ack' event:", string(event))
+ return
+ }
+
+ if ack.Ok {
+ rtm.IncomingEvents <- RTMEvent{"ack", ack}
+ } else if ack.RTMResponse.Error != nil {
+ // As there is no documentation for RTM error-codes, this
+ // identification of a rate-limit warning is very brittle.
+ if ack.RTMResponse.Error.Code == -1 && ack.RTMResponse.Error.Msg == "slow down, too many messages..." {
+ rtm.IncomingEvents <- RTMEvent{"ack_error", &RateLimitEvent{}}
+ } else {
+ rtm.IncomingEvents <- RTMEvent{"ack_error", &AckErrorEvent{ack.Error}}
+ }
+ } else {
+ rtm.IncomingEvents <- RTMEvent{"ack_error", &AckErrorEvent{fmt.Errorf("ack decode failure")}}
+ }
+}
+
+// handlePong handles an incoming 'PONG' message which should be in response to
+// a previously sent 'PING' message. This is then used to compute the
+// connection's latency.
+func (rtm *RTM) handlePong(event json.RawMessage) {
+ var (
+ p Pong
+ )
+
+ rtm.resetDeadman()
+
+ if err := json.Unmarshal(event, &p); err != nil {
+ rtm.Client.log.Println("RTM Error unmarshalling 'pong' event:", err)
+ return
+ }
+
+ latency := time.Since(time.Unix(p.Timestamp, 0))
+ rtm.IncomingEvents <- RTMEvent{"latency_report", &LatencyReport{Value: latency}}
+}
+
+// handleEvent is the "default" response to an event that does not have a
+// special case. It matches the command's name to a mapping of defined events
+// and then sends the corresponding event struct to the IncomingEvents channel.
+// If the event type is not found or the event cannot be unmarshalled into the
+// correct struct then this sends an UnmarshallingErrorEvent to the
+// IncomingEvents channel.
+func (rtm *RTM) handleEvent(typeStr string, event json.RawMessage) {
+ v, exists := EventMapping[typeStr]
+ if !exists {
+ rtm.Debugf("RTM Error - received unmapped event %q: %s\n", typeStr, string(event))
+ err := fmt.Errorf("RTM Error: Received unmapped event %q: %s", typeStr, string(event))
+ rtm.IncomingEvents <- RTMEvent{"unmarshalling_error", &UnmarshallingErrorEvent{err}}
+ return
+ }
+ t := reflect.TypeOf(v)
+ recvEvent := reflect.New(t).Interface()
+ err := json.Unmarshal(event, recvEvent)
+ if err != nil {
+ rtm.Debugf("RTM Error, could not unmarshall event %q: %s\n", typeStr, string(event))
+ err := fmt.Errorf("RTM Error: Could not unmarshall event %q: %s", typeStr, string(event))
+ rtm.IncomingEvents <- RTMEvent{"unmarshalling_error", &UnmarshallingErrorEvent{err}}
+ return
+ }
+ rtm.IncomingEvents <- RTMEvent{typeStr, recvEvent}
+}
+
+// EventMapping holds a mapping of event names to their corresponding struct
+// implementations. The structs should be instances of the unmarshalling
+// target for the matching event type.
+var EventMapping = map[string]interface{}{
+ "message": MessageEvent{},
+ "presence_change": PresenceChangeEvent{},
+ "user_typing": UserTypingEvent{},
+
+ "channel_marked": ChannelMarkedEvent{},
+ "channel_created": ChannelCreatedEvent{},
+ "channel_joined": ChannelJoinedEvent{},
+ "channel_left": ChannelLeftEvent{},
+ "channel_deleted": ChannelDeletedEvent{},
+ "channel_rename": ChannelRenameEvent{},
+ "channel_archive": ChannelArchiveEvent{},
+ "channel_unarchive": ChannelUnarchiveEvent{},
+ "channel_history_changed": ChannelHistoryChangedEvent{},
+
+ "dnd_updated": DNDUpdatedEvent{},
+ "dnd_updated_user": DNDUpdatedEvent{},
+
+ "im_created": IMCreatedEvent{},
+ "im_open": IMOpenEvent{},
+ "im_close": IMCloseEvent{},
+ "im_marked": IMMarkedEvent{},
+ "im_history_changed": IMHistoryChangedEvent{},
+
+ "group_marked": GroupMarkedEvent{},
+ "group_open": GroupOpenEvent{},
+ "group_joined": GroupJoinedEvent{},
+ "group_left": GroupLeftEvent{},
+ "group_close": GroupCloseEvent{},
+ "group_rename": GroupRenameEvent{},
+ "group_archive": GroupArchiveEvent{},
+ "group_unarchive": GroupUnarchiveEvent{},
+ "group_history_changed": GroupHistoryChangedEvent{},
+
+ "file_created": FileCreatedEvent{},
+ "file_shared": FileSharedEvent{},
+ "file_unshared": FileUnsharedEvent{},
+ "file_public": FilePublicEvent{},
+ "file_private": FilePrivateEvent{},
+ "file_change": FileChangeEvent{},
+ "file_deleted": FileDeletedEvent{},
+ "file_comment_added": FileCommentAddedEvent{},
+ "file_comment_edited": FileCommentEditedEvent{},
+ "file_comment_deleted": FileCommentDeletedEvent{},
+
+ "pin_added": PinAddedEvent{},
+ "pin_removed": PinRemovedEvent{},
+
+ "star_added": StarAddedEvent{},
+ "star_removed": StarRemovedEvent{},
+
+ "reaction_added": ReactionAddedEvent{},
+ "reaction_removed": ReactionRemovedEvent{},
+
+ "pref_change": PrefChangeEvent{},
+
+ "team_join": TeamJoinEvent{},
+ "team_rename": TeamRenameEvent{},
+ "team_pref_change": TeamPrefChangeEvent{},
+ "team_domain_change": TeamDomainChangeEvent{},
+ "team_migration_started": TeamMigrationStartedEvent{},
+
+ "manual_presence_change": ManualPresenceChangeEvent{},
+
+ "user_change": UserChangeEvent{},
+
+ "emoji_changed": EmojiChangedEvent{},
+
+ "commands_changed": CommandsChangedEvent{},
+
+ "email_domain_changed": EmailDomainChangedEvent{},
+
+ "bot_added": BotAddedEvent{},
+ "bot_changed": BotChangedEvent{},
+
+ "accounts_changed": AccountsChangedEvent{},
+
+ "reconnect_url": ReconnectUrlEvent{},
+
+ "member_joined_channel": MemberJoinedChannelEvent{},
+ "member_left_channel": MemberLeftChannelEvent{},
+
+ "subteam_created": SubteamCreatedEvent{},
+ "subteam_self_added": SubteamSelfAddedEvent{},
+ "subteam_self_removed": SubteamSelfRemovedEvent{},
+ "subteam_updated": SubteamUpdatedEvent{},
+
+ "desktop_notification": DesktopNotificationEvent{},
+}