diff options
author | Duco van Amstel <duco.vanamstel@gmail.com> | 2018-11-10 21:09:41 +0000 |
---|---|---|
committer | Wim <wim@42.be> | 2018-11-10 22:09:41 +0100 |
commit | 2f042ad9153a5116abed82cf74688af4579908ad (patch) | |
tree | 5e3651cca10768ad9a4c1c54c3eb03b71a5e9a6a /bridge/slack | |
parent | ba706918778d228f4c069f47968cb56578d06051 (diff) | |
download | matterbridge-msglm-2f042ad9153a5116abed82cf74688af4579908ad.tar.gz matterbridge-msglm-2f042ad9153a5116abed82cf74688af4579908ad.tar.bz2 matterbridge-msglm-2f042ad9153a5116abed82cf74688af4579908ad.zip |
Add more rate-limit handling (slack) (#581)
Diffstat (limited to 'bridge/slack')
-rw-r--r-- | bridge/slack/helpers.go | 93 | ||||
-rw-r--r-- | bridge/slack/slack.go | 100 |
2 files changed, 138 insertions, 55 deletions
diff --git a/bridge/slack/helpers.go b/bridge/slack/helpers.go index fdcef2e7..861a8758 100644 --- a/bridge/slack/helpers.go +++ b/bridge/slack/helpers.go @@ -1,6 +1,7 @@ package bslack import ( + "context" "fmt" "regexp" "strings" @@ -76,17 +77,26 @@ func (b *Bslack) populateUsers() { b.refreshInProgress = true b.refreshMutex.Unlock() - users, err := b.sc.GetUsers() - if err != nil { - b.Log.Errorf("Could not reload users: %#v", err) - return - } - newUsers := map[string]*slack.User{} - for i := range users { - // Use array index for pointer, not the copy - // See: https://stackoverflow.com/a/29498133/504018 - newUsers[users[i].ID] = &users[i] + pagination := b.sc.GetUsersPaginated(slack.GetUsersOptionLimit(200)) + for { + var err error + pagination, err = pagination.Next(context.Background()) + if err != nil { + if pagination.Done(err) { + break + } + + if err = b.handleRateLimit(err); err != nil { + b.Log.Errorf("Could not retrieve users: %#v", err) + return + } + continue + } + + for i := range pagination.Users { + newUsers[pagination.Users[i].ID] = &pagination.Users[i] + } } b.usersMutex.Lock() @@ -122,10 +132,14 @@ func (b *Bslack) populateChannels() { for { channels, nextCursor, err := b.sc.GetConversations(queryParams) if err != nil { - b.Log.Errorf("Could not reload channels: %#v", err) - return + if err = b.handleRateLimit(err); err != nil { + b.Log.Errorf("Could not retrieve channels: %#v", err) + return + } + continue } - for i := 0; i < len(channels); i++ { + + for i := range channels { newChannelsByID[channels[i].ID] = &channels[i] newChannelsByName[channels[i].Name] = &channels[i] } @@ -189,18 +203,8 @@ func (b *Bslack) populateMessageWithUserInfo(ev *slack.MessageEvent, rmsg *confi // First, deal with bot-originating messages but only do so when not using webhooks: we // would not be able to distinguish which bot would be sending them. - if ev.BotID != "" && b.GetString(outgoingWebhookConfig) == "" { - bot, err := b.rtm.GetBotInfo(ev.BotID) - if err != nil { - return err - } - if bot.Name != "" && bot.Name != "Slack API Tester" { - rmsg.Username = bot.Name - if ev.Username != "" { - rmsg.Username = ev.Username - } - rmsg.UserID = bot.ID - } + if err := b.populateMessageWithBotInfo(ev, rmsg); err != nil { + return err } // Second, deal with "real" users if we have the necessary information. @@ -227,6 +231,35 @@ func (b *Bslack) populateMessageWithUserInfo(ev *slack.MessageEvent, rmsg *confi return nil } +func (b *Bslack) populateMessageWithBotInfo(ev *slack.MessageEvent, rmsg *config.Message) error { + if ev.BotID == "" || b.GetString(outgoingWebhookConfig) != "" { + return nil + } + + var err error + var bot *slack.Bot + for { + bot, err = b.rtm.GetBotInfo(ev.BotID) + if err == nil { + break + } + + if err = b.handleRateLimit(err); err != nil { + b.Log.Errorf("Could not retrieve bot information: %#v", err) + return err + } + } + + if bot.Name != "" && bot.Name != "Slack API Tester" { + rmsg.Username = bot.Name + if ev.Username != "" { + rmsg.Username = ev.Username + } + rmsg.UserID = bot.ID + } + return nil +} + var ( mentionRE = regexp.MustCompile(`<@([a-zA-Z0-9]+)>`) channelRE = regexp.MustCompile(`<#[a-zA-Z0-9]+\|(.+?)>`) @@ -277,3 +310,13 @@ func (b *Bslack) replaceURL(text string) string { } return text } + +func (b *Bslack) handleRateLimit(err error) error { + rateLimit, ok := err.(*slack.RateLimitedError) + if !ok { + return err + } + b.Log.Infof("Rate-limited by Slack. Sleeping for %v", rateLimit.RetryAfter) + time.Sleep(rateLimit.RetryAfter) + return nil +} diff --git a/bridge/slack/slack.go b/bridge/slack/slack.go index fd8e3791..038e0d90 100644 --- a/bridge/slack/slack.go +++ b/bridge/slack/slack.go @@ -278,34 +278,20 @@ func (b *Bslack) sendRTM(msg config.Message) (string, error) { return "", nil } - // Delete message - if msg.Event == config.EVENT_MSG_DELETE { - // some protocols echo deletes, but with empty ID - if msg.ID == "" { - return "", nil - } - // we get a "slack <ID>", split it - ts := strings.Fields(msg.ID) - _, _, err = b.rtm.DeleteMessage(channelInfo.ID, ts[1]) - if err != nil { - return msg.ID, err - } - return msg.ID, nil + // Handle message deletions. + var handled bool + if handled, err = b.deleteMessage(&msg, channelInfo); handled { + return msg.ID, err } - // Prepend nick if configured + // Prepend nickname if configured. if b.GetBool(useNickPrefixConfig) { msg.Text = msg.Username + msg.Text } - // Edit message if we have an ID - if msg.ID != "" { - ts := strings.Fields(msg.ID) - _, _, _, err = b.rtm.UpdateMessage(channelInfo.ID, ts[1], msg.Text) - if err != nil { - return msg.ID, err - } - return msg.ID, nil + // Handle message edits. + if handled, err = b.editMessage(&msg, channelInfo); handled { + return msg.ID, err } messageParameters := b.prepareMessageParameters(&msg) @@ -319,19 +305,73 @@ func (b *Bslack) sendRTM(msg config.Message) (string, error) { } } // Upload files if necessary (from Slack, Telegram or Mattermost). - b.handleUploadFile(&msg, channelInfo.ID) + b.uploadFile(&msg, channelInfo.ID) } - // Post normal message - _, id, err := b.rtm.PostMessage(channelInfo.ID, msg.Text, *messageParameters) - if err != nil { - return "", err + // Post message. + return b.postMessage(&msg, messageParameters, channelInfo) +} + +func (b *Bslack) deleteMessage(msg *config.Message, channelInfo *slack.Channel) (bool, error) { + if msg.Event != config.EVENT_MSG_DELETE { + return false, nil + } + + // Some protocols echo deletes, but with an empty ID. + if msg.ID == "" { + return true, nil + } + + // If we get a "slack <ID>", split it. + ts := strings.Fields(msg.ID) + for { + _, _, err := b.rtm.DeleteMessage(channelInfo.ID, ts[1]) + if err == nil { + return true, nil + } + + if err = b.handleRateLimit(err); err != nil { + b.Log.Errorf("Failed to delete user message from Slack: %#v", err) + return true, err + } + } +} + +func (b *Bslack) editMessage(msg *config.Message, channelInfo *slack.Channel) (bool, error) { + if msg.ID == "" { + return false, nil + } + + ts := strings.Fields(msg.ID) + for { + _, _, _, err := b.rtm.UpdateMessage(channelInfo.ID, ts[1], msg.Text) + if err == nil { + return true, nil + } + + if err = b.handleRateLimit(err); err != nil { + b.Log.Errorf("Failed to edit user message on Slack: %#v", err) + return true, err + } + } +} + +func (b *Bslack) postMessage(msg *config.Message, messageParameters *slack.PostMessageParameters, channelInfo *slack.Channel) (string, error) { + for { + _, id, err := b.rtm.PostMessage(channelInfo.ID, msg.Text, *messageParameters) + if err == nil { + return "slack " + id, nil + } + + if err = b.handleRateLimit(err); err != nil { + b.Log.Errorf("Failed to sent user message to Slack: %#v", err) + return "", err + } } - return "slack " + id, nil } -// handleUploadFile handles native upload of files -func (b *Bslack) handleUploadFile(msg *config.Message, channelID string) { +// uploadFile handles native upload of files +func (b *Bslack) uploadFile(msg *config.Message, channelID string) { for _, f := range msg.Extra["file"] { fi := f.(config.FileInfo) if msg.Text == fi.Comment { |