diff options
Diffstat (limited to 'vendor/github.com/nlopes/slack/websocket_managed_conn.go')
-rw-r--r-- | vendor/github.com/nlopes/slack/websocket_managed_conn.go | 581 |
1 files changed, 0 insertions, 581 deletions
diff --git a/vendor/github.com/nlopes/slack/websocket_managed_conn.go b/vendor/github.com/nlopes/slack/websocket_managed_conn.go deleted file mode 100644 index dbbf682d..00000000 --- a/vendor/github.com/nlopes/slack/websocket_managed_conn.go +++ /dev/null @@ -1,581 +0,0 @@ -package slack - -import ( - "encoding/json" - "fmt" - "io" - "net/http" - stdurl "net/url" - "reflect" - "time" - - "github.com/gorilla/websocket" - "github.com/nlopes/slack/internal/errorsx" - "github.com/nlopes/slack/internal/timex" -) - -// ManageConnection can be called on a Slack RTM instance returned by the -// NewRTM method. It will connect to the slack RTM API and handle all incoming -// and outgoing events. If a connection fails then it will attempt to reconnect -// and will notify any listeners through an error event on the IncomingEvents -// channel. -// -// If the connection ends and the disconnect was unintentional then this will -// attempt to reconnect. -// -// This should only be called once per slack API! Otherwise expect undefined -// behavior. -// -// The defined error events are located in websocket_internals.go. -func (rtm *RTM) ManageConnection() { - var ( - err error - info *Info - conn *websocket.Conn - ) - - for connectionCount := 0; ; connectionCount++ { - // start trying to connect - // the returned err is already passed onto the IncomingEvents channel - if info, conn, err = rtm.connect(connectionCount, rtm.useRTMStart); err != nil { - // when the connection is unsuccessful its fatal, and we need to bail out. - rtm.Debugf("Failed to connect with RTM on try %d: %s", connectionCount, err) - rtm.disconnect() - return - } - - // lock to prevent data races with Disconnect particularly around isConnected - // and conn. - rtm.mu.Lock() - rtm.conn = conn - rtm.info = info - rtm.mu.Unlock() - - rtm.IncomingEvents <- RTMEvent{"connected", &ConnectedEvent{ - ConnectionCount: connectionCount, - Info: info, - }} - - rtm.Debugf("RTM connection succeeded on try %d", connectionCount) - - rawEvents := make(chan json.RawMessage) - // we're now connected so we can set up listeners - go rtm.handleIncomingEvents(rawEvents) - // this should be a blocking call until the connection has ended - rtm.handleEvents(rawEvents) - - select { - case <-rtm.disconnected: - // after handle events returns we need to check if we're disconnected - // when this happens we need to cleanup the newly created connection. - if err = conn.Close(); err != nil { - rtm.Debugln("failed to close conn on disconnected RTM", err) - } - return - default: - // otherwise continue and run the loop again to reconnect - } - } -} - -// connect attempts to connect to the slack websocket API. It handles any -// errors that occur while connecting and will return once a connection -// has been successfully opened. -// If useRTMStart is false then it uses rtm.connect to create the connection, -// otherwise it uses rtm.start. -func (rtm *RTM) connect(connectionCount int, useRTMStart bool) (*Info, *websocket.Conn, error) { - const ( - errInvalidAuth = "invalid_auth" - errInactiveAccount = "account_inactive" - errMissingAuthToken = "not_authed" - ) - - // used to provide exponential backoff wait time with jitter before trying - // to connect to slack again - boff := &backoff{ - Max: 5 * time.Minute, - } - - for { - var ( - backoff time.Duration - ) - - // send connecting event - rtm.IncomingEvents <- RTMEvent{"connecting", &ConnectingEvent{ - Attempt: boff.attempts + 1, - ConnectionCount: connectionCount, - }} - - // attempt to start the connection - info, conn, err := rtm.startRTMAndDial(useRTMStart) - if err == nil { - return info, conn, nil - } - - // check for fatal errors - switch err.Error() { - case errInvalidAuth, errInactiveAccount, errMissingAuthToken: - rtm.Debugf("invalid auth when connecting with RTM: %s", err) - rtm.IncomingEvents <- RTMEvent{"invalid_auth", &InvalidAuthEvent{}} - return nil, nil, err - default: - } - - switch actual := err.(type) { - case statusCodeError: - if actual.Code == http.StatusNotFound { - rtm.Debugf("invalid auth when connecting with RTM: %s", err) - rtm.IncomingEvents <- RTMEvent{"invalid_auth", &InvalidAuthEvent{}} - return nil, nil, err - } - case *RateLimitedError: - backoff = actual.RetryAfter - default: - } - - backoff = timex.Max(backoff, boff.Duration()) - // any other errors are treated as recoverable and we try again after - // sending the event along the IncomingEvents channel - rtm.IncomingEvents <- RTMEvent{"connection_error", &ConnectionErrorEvent{ - Attempt: boff.attempts, - Backoff: backoff, - ErrorObj: err, - }} - - // get time we should wait before attempting to connect again - rtm.Debugf("reconnection %d failed: %s reconnecting in %v\n", boff.attempts, err, backoff) - - // wait for one of the following to occur, - // backoff duration has elapsed, killChannel is signalled, or - // the rtm finishes disconnecting. - select { - case <-time.After(backoff): // retry after the backoff. - case intentional := <-rtm.killChannel: - if intentional { - rtm.killConnection(intentional, ErrRTMDisconnected) - return nil, nil, ErrRTMDisconnected - } - case <-rtm.disconnected: - return nil, nil, ErrRTMDisconnected - } - } -} - -// startRTMAndDial attempts to connect to the slack websocket. If useRTMStart is true, -// then it returns the full information returned by the "rtm.start" method on the -// slack API. Else it uses the "rtm.connect" method to connect -func (rtm *RTM) startRTMAndDial(useRTMStart bool) (info *Info, _ *websocket.Conn, err error) { - var ( - url string - ) - - if useRTMStart { - rtm.Debugf("Starting RTM") - info, url, err = rtm.StartRTM() - } else { - rtm.Debugf("Connecting to RTM") - info, url, err = rtm.ConnectRTM() - } - if err != nil { - rtm.Debugf("Failed to start or connect to RTM: %s", err) - return nil, nil, err - } - - // install connection parameters - u, err := stdurl.Parse(url) - if err != nil { - return nil, nil, err - } - u.RawQuery = rtm.connParams.Encode() - url = u.String() - - rtm.Debugf("Dialing to websocket on url %s", url) - // Only use HTTPS for connections to prevent MITM attacks on the connection. - upgradeHeader := http.Header{} - upgradeHeader.Add("Origin", "https://api.slack.com") - dialer := websocket.DefaultDialer - if rtm.dialer != nil { - dialer = rtm.dialer - } - conn, _, err := dialer.Dial(url, upgradeHeader) - if err != nil { - rtm.Debugf("Failed to dial to the websocket: %s", err) - return nil, nil, err - } - return info, conn, err -} - -// killConnection stops the websocket connection and signals to all goroutines -// that they should cease listening to the connection for events. -// -// This should not be called directly! Instead a boolean value (true for -// intentional, false otherwise) should be sent to the killChannel on the RTM. -func (rtm *RTM) killConnection(intentional bool, cause error) (err error) { - rtm.Debugln("killing connection", cause) - - if rtm.conn != nil { - err = rtm.conn.Close() - } - - rtm.IncomingEvents <- RTMEvent{"disconnected", &DisconnectedEvent{Intentional: intentional, Cause: cause}} - - if intentional { - rtm.disconnect() - } - - return err -} - -// handleEvents is a blocking function that handles all events. This sends -// pings when asked to (on rtm.forcePing) and upon every given elapsed -// interval. This also sends outgoing messages that are received from the RTM's -// outgoingMessages channel. This also handles incoming raw events from the RTM -// rawEvents channel. -func (rtm *RTM) handleEvents(events chan json.RawMessage) { - ticker := time.NewTicker(rtm.pingInterval) - defer ticker.Stop() - for { - select { - // catch "stop" signal on channel close - case intentional := <-rtm.killChannel: - _ = rtm.killConnection(intentional, errorsx.String("signaled")) - return - // detect when the connection is dead. - case <-rtm.pingDeadman.C: - _ = rtm.killConnection(false, ErrRTMDeadman) - return - // send pings on ticker interval - case <-ticker.C: - if err := rtm.ping(); err != nil { - _ = rtm.killConnection(false, err) - return - } - case <-rtm.forcePing: - if err := rtm.ping(); err != nil { - _ = rtm.killConnection(false, err) - return - } - // listen for messages that need to be sent - case msg := <-rtm.outgoingMessages: - rtm.sendOutgoingMessage(msg) - // listen for incoming messages that need to be parsed - case rawEvent := <-events: - switch rtm.handleRawEvent(rawEvent) { - case rtmEventTypeGoodbye: - // kill the connection, but DO NOT RETURN, a follow up kill signal will - // be sent that still needs to be processed. this duplication is because - // the event reader restarts once it emits the goodbye event. - // unlike the other cases in this function a final read will be triggered - // against the connection which will emit a kill signal. if we return early - // this kill signal will be processed by the next connection. - _ = rtm.killConnection(false, ErrRTMGoodbye) - default: - } - } - } -} - -// handleIncomingEvents monitors the RTM's opened websocket for any incoming -// events. It pushes the raw events into the channel. -// -// This will stop executing once the RTM's when a fatal error is detected, or -// a disconnect occurs. -func (rtm *RTM) handleIncomingEvents(events chan json.RawMessage) { - for { - if err := rtm.receiveIncomingEvent(events); err != nil { - select { - case rtm.killChannel <- false: - case <-rtm.disconnected: - } - return - } - } -} - -func (rtm *RTM) sendWithDeadline(msg interface{}) error { - // set a write deadline on the connection - if err := rtm.conn.SetWriteDeadline(time.Now().Add(10 * time.Second)); err != nil { - return err - } - if err := rtm.conn.WriteJSON(msg); err != nil { - return err - } - // remove write deadline - return rtm.conn.SetWriteDeadline(time.Time{}) -} - -// sendOutgoingMessage sends the given OutgoingMessage to the slack websocket. -// -// It does not currently detect if a outgoing message fails due to a disconnect -// and instead lets a future failed 'PING' detect the failed connection. -func (rtm *RTM) sendOutgoingMessage(msg OutgoingMessage) { - rtm.Debugln("Sending message:", msg) - if len([]rune(msg.Text)) > MaxMessageTextLength { - rtm.IncomingEvents <- RTMEvent{"outgoing_error", &MessageTooLongEvent{ - Message: msg, - MaxLength: MaxMessageTextLength, - }} - return - } - - if err := rtm.sendWithDeadline(msg); err != nil { - rtm.IncomingEvents <- RTMEvent{"outgoing_error", &OutgoingErrorEvent{ - Message: msg, - ErrorObj: err, - }} - } -} - -// ping sends a 'PING' message to the RTM's websocket. If the 'PING' message -// fails to send then this returns an error signifying that the connection -// should be considered disconnected. -// -// This does not handle incoming 'PONG' responses but does store the time of -// each successful 'PING' send so latency can be detected upon a 'PONG' -// response. -func (rtm *RTM) ping() error { - id := rtm.idGen.Next() - rtm.Debugln("Sending PING ", id) - msg := &Ping{ID: id, Type: "ping", Timestamp: time.Now().Unix()} - - if err := rtm.sendWithDeadline(msg); err != nil { - rtm.Debugf("RTM Error sending 'PING %d': %s", id, err.Error()) - return err - } - return nil -} - -// receiveIncomingEvent attempts to receive an event from the RTM's websocket. -// This will block until a frame is available from the websocket. -// If the read from the websocket results in a fatal error, this function will return non-nil. -func (rtm *RTM) receiveIncomingEvent(events chan json.RawMessage) error { - event := json.RawMessage{} - err := rtm.conn.ReadJSON(&event) - - // check if the connection was closed. - if websocket.IsUnexpectedCloseError(err) { - return err - } - - switch { - case err == io.ErrUnexpectedEOF: - // EOF's don't seem to signify a failed connection so instead we ignore - // them here and detect a failed connection upon attempting to send a - // 'PING' message - - // trigger a 'PING' to detect potential websocket disconnect - select { - case rtm.forcePing <- true: - case <-rtm.disconnected: - } - case err != nil: - // All other errors from ReadJSON come from NextReader, and should - // kill the read loop and force a reconnect. - rtm.IncomingEvents <- RTMEvent{"incoming_error", &IncomingEventError{ - ErrorObj: err, - }} - - return err - case len(event) == 0: - rtm.Debugln("Received empty event") - default: - rtm.Debugln("Incoming Event:", string(event)) - select { - case events <- event: - case <-rtm.disconnected: - rtm.Debugln("disonnected while attempting to send raw event") - } - } - - return nil -} - -// handleRawEvent takes a raw JSON message received from the slack websocket -// and handles the encoded event. -// returns the event type of the message. -func (rtm *RTM) handleRawEvent(rawEvent json.RawMessage) string { - event := &Event{} - err := json.Unmarshal(rawEvent, event) - if err != nil { - rtm.IncomingEvents <- RTMEvent{"unmarshalling_error", &UnmarshallingErrorEvent{err}} - return "" - } - - switch event.Type { - case rtmEventTypeAck: - rtm.handleAck(rawEvent) - case rtmEventTypeHello: - rtm.IncomingEvents <- RTMEvent{"hello", &HelloEvent{}} - case rtmEventTypePong: - rtm.handlePong(rawEvent) - case rtmEventTypeGoodbye: - // just return the event type up for goodbye, will be handled by caller. - default: - rtm.handleEvent(event.Type, rawEvent) - } - - return event.Type -} - -// handleAck handles an incoming 'ACK' message. -func (rtm *RTM) handleAck(event json.RawMessage) { - ack := &AckMessage{} - if err := json.Unmarshal(event, ack); err != nil { - rtm.Debugln("RTM Error unmarshalling 'ack' event:", err) - rtm.Debugln(" -> Erroneous 'ack' event:", string(event)) - return - } - - if ack.Ok { - rtm.IncomingEvents <- RTMEvent{"ack", ack} - } else if ack.RTMResponse.Error != nil { - // As there is no documentation for RTM error-codes, this - // identification of a rate-limit warning is very brittle. - if ack.RTMResponse.Error.Code == -1 && ack.RTMResponse.Error.Msg == "slow down, too many messages..." { - rtm.IncomingEvents <- RTMEvent{"ack_error", &RateLimitEvent{}} - } else { - rtm.IncomingEvents <- RTMEvent{"ack_error", &AckErrorEvent{ack.Error}} - } - } else { - rtm.IncomingEvents <- RTMEvent{"ack_error", &AckErrorEvent{fmt.Errorf("ack decode failure")}} - } -} - -// handlePong handles an incoming 'PONG' message which should be in response to -// a previously sent 'PING' message. This is then used to compute the -// connection's latency. -func (rtm *RTM) handlePong(event json.RawMessage) { - var ( - p Pong - ) - - rtm.resetDeadman() - - if err := json.Unmarshal(event, &p); err != nil { - rtm.Client.log.Println("RTM Error unmarshalling 'pong' event:", err) - return - } - - latency := time.Since(time.Unix(p.Timestamp, 0)) - rtm.IncomingEvents <- RTMEvent{"latency_report", &LatencyReport{Value: latency}} -} - -// handleEvent is the "default" response to an event that does not have a -// special case. It matches the command's name to a mapping of defined events -// and then sends the corresponding event struct to the IncomingEvents channel. -// If the event type is not found or the event cannot be unmarshalled into the -// correct struct then this sends an UnmarshallingErrorEvent to the -// IncomingEvents channel. -func (rtm *RTM) handleEvent(typeStr string, event json.RawMessage) { - v, exists := EventMapping[typeStr] - if !exists { - rtm.Debugf("RTM Error - received unmapped event %q: %s\n", typeStr, string(event)) - err := fmt.Errorf("RTM Error: Received unmapped event %q: %s", typeStr, string(event)) - rtm.IncomingEvents <- RTMEvent{"unmarshalling_error", &UnmarshallingErrorEvent{err}} - return - } - t := reflect.TypeOf(v) - recvEvent := reflect.New(t).Interface() - err := json.Unmarshal(event, recvEvent) - if err != nil { - rtm.Debugf("RTM Error, could not unmarshall event %q: %s\n", typeStr, string(event)) - err := fmt.Errorf("RTM Error: Could not unmarshall event %q: %s", typeStr, string(event)) - rtm.IncomingEvents <- RTMEvent{"unmarshalling_error", &UnmarshallingErrorEvent{err}} - return - } - rtm.IncomingEvents <- RTMEvent{typeStr, recvEvent} -} - -// EventMapping holds a mapping of event names to their corresponding struct -// implementations. The structs should be instances of the unmarshalling -// target for the matching event type. -var EventMapping = map[string]interface{}{ - "message": MessageEvent{}, - "presence_change": PresenceChangeEvent{}, - "user_typing": UserTypingEvent{}, - - "channel_marked": ChannelMarkedEvent{}, - "channel_created": ChannelCreatedEvent{}, - "channel_joined": ChannelJoinedEvent{}, - "channel_left": ChannelLeftEvent{}, - "channel_deleted": ChannelDeletedEvent{}, - "channel_rename": ChannelRenameEvent{}, - "channel_archive": ChannelArchiveEvent{}, - "channel_unarchive": ChannelUnarchiveEvent{}, - "channel_history_changed": ChannelHistoryChangedEvent{}, - - "dnd_updated": DNDUpdatedEvent{}, - "dnd_updated_user": DNDUpdatedEvent{}, - - "im_created": IMCreatedEvent{}, - "im_open": IMOpenEvent{}, - "im_close": IMCloseEvent{}, - "im_marked": IMMarkedEvent{}, - "im_history_changed": IMHistoryChangedEvent{}, - - "group_marked": GroupMarkedEvent{}, - "group_open": GroupOpenEvent{}, - "group_joined": GroupJoinedEvent{}, - "group_left": GroupLeftEvent{}, - "group_close": GroupCloseEvent{}, - "group_rename": GroupRenameEvent{}, - "group_archive": GroupArchiveEvent{}, - "group_unarchive": GroupUnarchiveEvent{}, - "group_history_changed": GroupHistoryChangedEvent{}, - - "file_created": FileCreatedEvent{}, - "file_shared": FileSharedEvent{}, - "file_unshared": FileUnsharedEvent{}, - "file_public": FilePublicEvent{}, - "file_private": FilePrivateEvent{}, - "file_change": FileChangeEvent{}, - "file_deleted": FileDeletedEvent{}, - "file_comment_added": FileCommentAddedEvent{}, - "file_comment_edited": FileCommentEditedEvent{}, - "file_comment_deleted": FileCommentDeletedEvent{}, - - "pin_added": PinAddedEvent{}, - "pin_removed": PinRemovedEvent{}, - - "star_added": StarAddedEvent{}, - "star_removed": StarRemovedEvent{}, - - "reaction_added": ReactionAddedEvent{}, - "reaction_removed": ReactionRemovedEvent{}, - - "pref_change": PrefChangeEvent{}, - - "team_join": TeamJoinEvent{}, - "team_rename": TeamRenameEvent{}, - "team_pref_change": TeamPrefChangeEvent{}, - "team_domain_change": TeamDomainChangeEvent{}, - "team_migration_started": TeamMigrationStartedEvent{}, - - "manual_presence_change": ManualPresenceChangeEvent{}, - - "user_change": UserChangeEvent{}, - - "emoji_changed": EmojiChangedEvent{}, - - "commands_changed": CommandsChangedEvent{}, - - "email_domain_changed": EmailDomainChangedEvent{}, - - "bot_added": BotAddedEvent{}, - "bot_changed": BotChangedEvent{}, - - "accounts_changed": AccountsChangedEvent{}, - - "reconnect_url": ReconnectUrlEvent{}, - - "member_joined_channel": MemberJoinedChannelEvent{}, - "member_left_channel": MemberLeftChannelEvent{}, - - "subteam_created": SubteamCreatedEvent{}, - "subteam_self_added": SubteamSelfAddedEvent{}, - "subteam_self_removed": SubteamSelfRemovedEvent{}, - "subteam_updated": SubteamUpdatedEvent{}, - - "desktop_notification": DesktopNotificationEvent{}, -} |