package slack
import (
"encoding/json"
"fmt"
"io"
"net/http"
stdurl "net/url"
"reflect"
"time"
"github.com/gorilla/websocket"
"github.com/nlopes/slack/internal/errorsx"
"github.com/nlopes/slack/internal/timex"
)
// ManageConnection can be called on a Slack RTM instance returned by the
// NewRTM method. It will connect to the slack RTM API and handle all incoming
// and outgoing events. If a connection fails then it will attempt to reconnect
// and will notify any listeners through an error event on the IncomingEvents
// channel.
//
// If the connection ends and the disconnect was unintentional then this will
// attempt to reconnect.
//
// This should only be called once per slack API! Otherwise expect undefined
// behavior.
//
// The defined error events are located in websocket_internals.go.
func (rtm *RTM) ManageConnection() {
var (
err error
info *Info
conn *websocket.Conn
)
for connectionCount := 0; ; connectionCount++ {
// start trying to connect
// the returned err is already passed onto the IncomingEvents channel
if info, conn, err = rtm.connect(connectionCount, rtm.useRTMStart); err != nil {
// when the connection is unsuccessful its fatal, and we need to bail out.
rtm.Debugf("Failed to connect with RTM on try %d: %s", connectionCount, err)
rtm.disconnect()
return
}
// lock to prevent data races with Disconnect particularly around isConnected
// and conn.
rtm.mu.Lock()
rtm.conn = conn
rtm.info = info
rtm.mu.Unlock()
rtm.IncomingEvents <- RTMEvent{"connected", &ConnectedEvent{
ConnectionCount: connectionCount,
Info: info,
}}
rtm.Debugf("RTM connection succeeded on try %d", connectionCount)
rawEvents := make(chan json.RawMessage)
// we're now connected so we can set up listeners
go rtm.handleIncomingEvents(rawEvents)
// this should be a blocking call until the connection has ended
rtm.handleEvents(rawEvents)
select {
case <-rtm.disconnected:
// after handle events returns we need to check if we're disconnected
// when this happens we need to cleanup the newly created connection.
if err = conn.Close(); err != nil {
rtm.Debugln("failed to close conn on disconnected RTM", err)
}
return
default:
// otherwise continue and run the loop again to reconnect
}
}
}
// connect attempts to connect to the slack websocket API. It handles any
// errors that occur while connecting and will return once a connection
// has been successfully opened.
// If useRTMStart is false then it uses rtm.connect to create the connection,
// otherwise it uses rtm.start.
func (rtm *RTM) connect(connectionCount int, useRTMStart bool) (*Info, *websocket.Conn, error) {
const (
errInvalidAuth = "invalid_auth"
errInactiveAccount = "account_inactive"
errMissingAuthToken = "not_authed"
)
// used to provide exponential backoff wait time with jitter before trying
// to connect to slack again
boff := &backoff{
Max: 5 * time.Minute,
}
for {
var (
backoff time.Duration
)
// send connecting event
rtm.IncomingEvents <- RTMEvent{"connecting", &ConnectingEvent{
Attempt: boff.attempts + 1,
ConnectionCount: connectionCount,
}}
// attempt to start the connection
info, conn, err := rtm.startRTMAndDial(useRTMStart)
if err == nil {
return info, conn, nil
}
// check for fatal errors
switch err.Error() {
case errInvalidAuth, errInactiveAccount, errMissingAuthToken:
rtm.Debugf("invalid auth when connecting with RTM: %s", err)
rtm.IncomingEvents <- RTMEvent{"invalid_auth", &InvalidAuthEvent{}}
return nil, nil, err
default:
}
switch actual := err.(type) {
case statusCodeError:
if actual.Code == http.StatusNotFound {
rtm.Debugf("invalid auth when connecting with RTM: %s", err)
rtm.IncomingEvents <- RTMEvent{"invalid_auth", &InvalidAuthEvent{}}
return nil, nil, err
}
case *RateLimitedError:
backoff = actual.RetryAfter
default:
}
backoff = timex.Max(backoff, boff.Duration())
// any other errors are treated as recoverable and we try again after
// sending the event along the IncomingEvents channel
rtm.IncomingEvents <- RTMEvent{"connection_error", &ConnectionErrorEvent{
Attempt: boff.attempts,
Backoff: backoff,
ErrorObj: err,
}}
// get time we should wait before attempting to connect again
rtm.Debugf("reconnection %d failed: %s reconnecting in %v\n", boff.attempts, err, backoff)
// wait for one of the following to occur,
// backoff duration has elapsed, killChannel is signalled, or
// the rtm finishes disconnecting.
select {
case <-time.After(backoff): // retry after the backoff.
case intentional := <-rtm.killChannel:
if intentional {
rtm.killConnection(intentional, ErrRTMDisconnected)
return nil, nil, ErrRTMDisconnected
}
case <-rtm.disconnected:
return nil, nil, ErrRTMDisconnected
}
}
}
// startRTMAndDial attempts to connect to the slack websocket. If useRTMStart is true,
// then it returns the full information returned by the "rtm.start" method on the
// slack API. Else it uses the "rtm.connect" method to connect
func (rtm *RTM) startRTMAndDial(useRTMStart bool) (info *Info, _ *websocket.Conn, err error) {
var (
url string
)
if useRTMStart {
rtm.Debugf("Starting RTM")
info, url, err = rtm.StartRTM()
} else {
rtm.Debugf("Connecting to RTM")
info, url, err = rtm.ConnectRTM()
}
if err != nil {
rtm.Debugf("Failed to start or connect to RTM: %s", err)
return nil, nil, err
}
// install connection parameters
u, err := stdurl.Parse(url)
if err != nil {
return nil, nil, err
}
u.RawQuery = rtm.connParams.Encode()
url = u.String()
rtm.Debugf("Dialing to websocket on url %s", url)
// Only use HTTPS for connections to prevent MITM attacks on the connection.
upgradeHeader := http.Header{}
upgradeHeader.Add("Origin", "https://api.slack.com")
dialer := websocket.DefaultDialer
if rtm.dialer != nil {
dialer = rtm.dialer
}
conn, _, err := dialer.Dial(url, upgradeHeader)
if err != nil {
rtm.Debugf("Failed to dial to the websocket: %s", err)
return nil, nil, err
}
return info, conn, err
}
// killConnection stops the websocket connection and signals to all goroutines
// that they should cease listening to the connection for events.
//
// This should not be called directly! Instead a boolean value (true for
// intentional, false otherwise) should be sent to the killChannel on the RTM.
func (rtm *RTM) killConnection(intentional bool, cause error) (err error) {
rtm.Debugln("killing connection", cause)
if rtm.conn != nil {
err = rtm.conn.Close()
}
rtm.IncomingEvents <- RTMEvent{"disconnected", &DisconnectedEvent{Intentional: intentional, Cause: cause}}
if intentional {
rtm.disconnect()
}
return err
}
// handleEvents is a blocking function that handles all events. This sends
// pings when asked to (on rtm.forcePing) and upon every given elapsed
// interval. This also sends outgoing messages that are received from the RTM's
// outgoingMessages channel. This also handles incoming raw events from the RTM
// rawEvents channel.
func (rtm *RTM) handleEvents(events chan json.RawMessage) {
ticker := time.NewTicker(rtm.pingInterval)
defer ticker.Stop()
for {
select {
// catch "stop" signal on channel close
case intentional := <-rtm.killChannel:
_ = rtm.killConnection(intentional, errorsx.String("signaled"))
return
// detect when the connection is dead.
case <-rtm.pingDeadman.C:
_ = rtm.killConnection(false, ErrRTMDeadman)
return
// send pings on ticker interval
case <-ticker.C:
if err := rtm.ping(); err != nil {
_ = rtm.killConnection(false, err)
return
}
case <-rtm.forcePing:
if err := rtm.ping(); err != nil {
_ = rtm.killConnection(false, err)
return
}
// listen for messages that need to be sent
case msg := <-rtm.outgoingMessages:
rtm.sendOutgoingMessage(msg)
// listen for incoming messages that need to be parsed
case rawEvent := <-events:
switch rtm.handleRawEvent(rawEvent) {
case rtmEventTypeGoodbye:
// kill the connection, but DO NOT RETURN, a follow up kill signal will
// be sent that still needs to be processed. this duplication is because
// the event reader restarts once it emits the goodbye event.
// unlike the other cases in this function a final read will be triggered
// against the connection which will emit a kill signal. if we return early
// this kill signal will be processed by the next connection.
_ = rtm.killConnection(false, ErrRTMGoodbye)
default:
}
}
}
}
// handleIncomingEvents monitors the RTM's opened websocket for any incoming
// events. It pushes the raw events into the channel.
//
// This will stop executing once the RTM's when a fatal error is detected, or
// a disconnect occurs.
func (rtm *RTM) handleIncomingEvents(events chan json.RawMessage) {
for {
if err := rtm.receiveIncomingEvent(events); err != nil {
select {
case rtm.killChannel <- false:
case <-rtm.disconnected:
}
return
}
}
}
func (rtm *RTM) sendWithDeadline(msg interface{}) error {
// set a write deadline on the connection
if err := rtm.conn.SetWriteDeadline(time.Now().Add(10 * time.Second)); err != nil {
return err
}
if err := rtm.conn.WriteJSON(msg); err != nil {
return err
}
// remove write deadline
return rtm.conn.SetWriteDeadline(time.Time{})
}
// sendOutgoingMessage sends the given OutgoingMessage to the slack websocket.
//
// It does not currently detect if a outgoing message fails due to a disconnect
// and instead lets a future failed 'PING' detect the failed connection.
func (rtm *RTM) sendOutgoingMessage(msg OutgoingMessage) {
rtm.Debugln("Sending message:", msg)
if len([]rune(msg.Text)) > MaxMessageTextLength {
rtm.IncomingEvents <- RTMEvent{"outgoing_error", &MessageTooLongEvent{
Message: msg,
MaxLength: MaxMessageTextLength,
}}
return
}
if err := rtm.sendWithDeadline(msg); err != nil {
rtm.IncomingEvents <- RTMEvent{"outgoing_error", &OutgoingErrorEvent{
Message: msg,
ErrorObj: err,
}}
}
}
// ping sends a 'PING' message to the RTM's websocket. If the 'PING' message
// fails to send then this returns an error signifying that the connection
// should be considered disconnected.
//
// This does not handle incoming 'PONG' responses but does store the time of
// each successful 'PING' send so latency can be detected upon a 'PONG'
// response.
func (rtm *RTM) ping() error {
id := rtm.idGen.Next()
rtm.Debugln("Sending PING ", id)
msg := &Ping{ID: id, Type: "ping", Timestamp: time.Now().Unix()}
if err := rtm.sendWithDeadline(msg); err != nil {
rtm.Debugf("RTM Error sending 'PING %d': %s", id, err.Error())
return err
}
return nil
}
// receiveIncomingEvent attempts to receive an event from the RTM's websocket.
// This will block until a frame is available from the websocket.
// If the read from the websocket results in a fatal error, this function will return non-nil.
func (rtm *RTM) receiveIncomingEvent(events chan json.RawMessage) error {
event := json.RawMessage{}
err := rtm.conn.ReadJSON(&event)
// check if the connection was closed.
if websocket.IsUnexpectedCloseError(err) {
return err
}
switch {
case err == io.ErrUnexpectedEOF:
// EOF's don't seem to signify a failed connection so instead we ignore
// them here and detect a failed connection upon attempting to send a
// 'PING' message
// trigger a 'PING' to detect potential websocket disconnect
select {
case rtm.forcePing <- true:
case <-rtm.disconnected:
}
case err != nil:
// All other errors from ReadJSON come from NextReader, and should
// kill the read loop and force a reconnect.
rtm.IncomingEvents <- RTMEvent{"incoming_error", &IncomingEventError{
ErrorObj: err,
}}
return err
case len(event) == 0:
rtm.Debugln("Received empty event")
default:
rtm.Debugln("Incoming Event:", string(event))
select {
case events <- event:
case <-rtm.disconnected:
rtm.Debugln("disonnected while attempting to send raw event")
}
}
return nil
}
// handleRawEvent takes a raw JSON message received from the slack websocket
// and handles the encoded event.
// returns the event type of the message.
func (rtm *RTM) handleRawEvent(rawEvent json.RawMessage) string {
event := &Event{}
err := json.Unmarshal(rawEvent, event)
if err != nil {
rtm.IncomingEvents <- RTMEvent{"unmarshalling_error", &UnmarshallingErrorEvent{err}}
return ""
}
switch event.Type {
case rtmEventTypeAck:
rtm.handleAck(rawEvent)
case rtmEventTypeHello:
rtm.IncomingEvents <- RTMEvent{"hello", &HelloEvent{}}
case rtmEventTypePong:
rtm.handlePong(rawEvent)
case rtmEventTypeGoodbye:
// just return the event type up for goodbye, will be handled by caller.
default:
rtm.handleEvent(event.Type, rawEvent)
}
return event.Type
}
// handleAck handles an incoming 'ACK' message.
func (rtm *RTM) handleAck(event json.RawMessage) {
ack := &AckMessage{}
if err := json.Unmarshal(event, ack); err != nil {
rtm.Debugln("RTM Error unmarshalling 'ack' event:", err)
rtm.Debugln(" -> Erroneous 'ack' event:", string(event))
return
}
if ack.Ok {
rtm.IncomingEvents <- RTMEvent{"ack", ack}
} else if ack.RTMResponse.Error != nil {
// As there is no documentation for RTM error-codes, this
// identification of a rate-limit warning is very brittle.
if ack.RTMResponse.Error.Code == -1 && ack.RTMResponse.Error.Msg == "slow down, too many messages..." {
rtm.IncomingEvents <- RTMEvent{"ack_error", &RateLimitEvent{}}
} else {
rtm.IncomingEvents <- RTMEvent{"ack_error", &AckErrorEvent{ack.Error}}
}
} else {
rtm.IncomingEvents <- RTMEvent{"ack_error", &AckErrorEvent{fmt.Errorf("ack decode failure")}}
}
}
// handlePong handles an incoming 'PONG' message which should be in response to
// a previously sent 'PING' message. This is then used to compute the
// connection's latency.
func (rtm *RTM) handlePong(event json.RawMessage) {
var (
p Pong
)
rtm.resetDeadman()
if err := json.Unmarshal(event, &p); err != nil {
rtm.Client.log.Println("RTM Error unmarshalling 'pong' event:", err)
return
}
latency := time.Since(time.Unix(p.Timestamp, 0))
rtm.IncomingEvents <- RTMEvent{"latency_report", &LatencyReport{Value: latency}}
}
// handleEvent is the "default" response to an event that does not have a
// special case. It matches the command's name to a mapping of defined events
// and then sends the corresponding event struct to the IncomingEvents channel.
// If the event type is not found or the event cannot be unmarshalled into the
// correct struct then this sends an UnmarshallingErrorEvent to the
// IncomingEvents channel.
func (rtm *RTM) handleEvent(typeStr string, event json.RawMessage) {
v, exists := EventMapping[typeStr]
if !exists {
rtm.Debugf("RTM Error - received unmapped event %q: %s\n", typeStr, string(event))
err := fmt.Errorf("RTM Error: Received unmapped event %q: %s", typeStr, string(event))
rtm.IncomingEvents <- RTMEvent{"unmarshalling_error", &UnmarshallingErrorEvent{err}}
return
}
t := reflect.TypeOf(v)
recvEvent := reflect.New(t).Interface()
err := json.Unmarshal(event, recvEvent)
if err != nil {
rtm.Debugf("RTM Error, could not unmarshall event %q: %s\n", typeStr, string(event))
err := fmt.Errorf("RTM Error: Could not unmarshall event %q: %s", typeStr, string(event))
rtm.IncomingEvents <- RTMEvent{"unmarshalling_error", &UnmarshallingErrorEvent{err}}
return
}
rtm.IncomingEvents <- RTMEvent{typeStr, recvEvent}
}
// EventMapping holds a mapping of event names to their corresponding struct
// implementations. The structs should be instances of the unmarshalling
// target for the matching event type.
var EventMapping = map[string]interface{}{
"message": MessageEvent{},
"presence_change": PresenceChangeEvent{},
"user_typing": UserTypingEvent{},
"channel_marked": ChannelMarkedEvent{},
"channel_created": ChannelCreatedEvent{},
"channel_joined": ChannelJoinedEvent{},
"channel_left": ChannelLeftEvent{},
"channel_deleted": ChannelDeletedEvent{},
"channel_rename": ChannelRenameEvent{},
"channel_archive": ChannelArchiveEvent{},
"channel_unarchive": ChannelUnarchiveEvent{},
"channel_history_changed": ChannelHistoryChangedEvent{},
"dnd_updated": DNDUpdatedEvent{},
"dnd_updated_user": DNDUpdatedEvent{},
"im_created": IMCreatedEvent{},
"im_open": IMOpenEvent{},
"im_close": IMCloseEvent{},
"im_marked": IMMarkedEvent{},
"im_history_changed": IMHistoryChangedEvent{},
"group_marked": GroupMarkedEvent{},
"group_open": GroupOpenEvent{},
"group_joined": GroupJoinedEvent{},
"group_left": GroupLeftEvent{},
"group_close": GroupCloseEvent{},
"group_rename": GroupRenameEvent{},
"group_archive": GroupArchiveEvent{},
"group_unarchive": GroupUnarchiveEvent{},
"group_history_changed": GroupHistoryChangedEvent{},
"file_created": FileCreatedEvent{},
"file_shared": FileSharedEvent{},
"file_unshared": FileUnsharedEvent{},
"file_public": FilePublicEvent{},
"file_private": FilePrivateEvent{},
"file_change": FileChangeEvent{},
"file_deleted": FileDeletedEvent{},
"file_comment_added": FileCommentAddedEvent{},
"file_comment_edited": FileCommentEditedEvent{},
"file_comment_deleted": FileCommentDeletedEvent{},
"pin_added": PinAddedEvent{},
"pin_removed": PinRemovedEvent{},
"star_added": StarAddedEvent{},
"star_removed": StarRemovedEvent{},
"reaction_added": ReactionAddedEvent{},
"reaction_removed": ReactionRemovedEvent{},
"pref_change": PrefChangeEvent{},
"team_join": TeamJoinEvent{},
"team_rename": TeamRenameEvent{},
"team_pref_change": TeamPrefChangeEvent{},
"team_domain_change": TeamDomainChangeEvent{},
"team_migration_started": TeamMigrationStartedEvent{},
"manual_presence_change": ManualPresenceChangeEvent{},
"user_change": UserChangeEvent{},
"emoji_changed": EmojiChangedEvent{},
"commands_changed": CommandsChangedEvent{},
"email_domain_changed": EmailDomainChangedEvent{},
"bot_added": BotAddedEvent{},
"bot_changed": BotChangedEvent{},
"accounts_changed": AccountsChangedEvent{},
"reconnect_url": ReconnectUrlEvent{},
"member_joined_channel": MemberJoinedChannelEvent{},
"member_left_channel": MemberLeftChannelEvent{},
"subteam_created": SubteamCreatedEvent{},
"subteam_self_added": SubteamSelfAddedEvent{},
"subteam_self_removed": SubteamSelfRemovedEvent{},
"subteam_updated": SubteamUpdatedEvent{},
"desktop_notification": DesktopNotificationEvent{},
}