summaryrefslogtreecommitdiffstats
path: root/bridge/slack
diff options
context:
space:
mode:
authorDuco van Amstel <duco.vanamstel@gmail.com>2018-11-10 21:09:41 +0000
committerWim <wim@42.be>2018-11-10 22:09:41 +0100
commit2f042ad9153a5116abed82cf74688af4579908ad (patch)
tree5e3651cca10768ad9a4c1c54c3eb03b71a5e9a6a /bridge/slack
parentba706918778d228f4c069f47968cb56578d06051 (diff)
downloadmatterbridge-msglm-2f042ad9153a5116abed82cf74688af4579908ad.tar.gz
matterbridge-msglm-2f042ad9153a5116abed82cf74688af4579908ad.tar.bz2
matterbridge-msglm-2f042ad9153a5116abed82cf74688af4579908ad.zip
Add more rate-limit handling (slack) (#581)
Diffstat (limited to 'bridge/slack')
-rw-r--r--bridge/slack/helpers.go93
-rw-r--r--bridge/slack/slack.go100
2 files changed, 138 insertions, 55 deletions
diff --git a/bridge/slack/helpers.go b/bridge/slack/helpers.go
index fdcef2e7..861a8758 100644
--- a/bridge/slack/helpers.go
+++ b/bridge/slack/helpers.go
@@ -1,6 +1,7 @@
package bslack
import (
+ "context"
"fmt"
"regexp"
"strings"
@@ -76,17 +77,26 @@ func (b *Bslack) populateUsers() {
b.refreshInProgress = true
b.refreshMutex.Unlock()
- users, err := b.sc.GetUsers()
- if err != nil {
- b.Log.Errorf("Could not reload users: %#v", err)
- return
- }
-
newUsers := map[string]*slack.User{}
- for i := range users {
- // Use array index for pointer, not the copy
- // See: https://stackoverflow.com/a/29498133/504018
- newUsers[users[i].ID] = &users[i]
+ pagination := b.sc.GetUsersPaginated(slack.GetUsersOptionLimit(200))
+ for {
+ var err error
+ pagination, err = pagination.Next(context.Background())
+ if err != nil {
+ if pagination.Done(err) {
+ break
+ }
+
+ if err = b.handleRateLimit(err); err != nil {
+ b.Log.Errorf("Could not retrieve users: %#v", err)
+ return
+ }
+ continue
+ }
+
+ for i := range pagination.Users {
+ newUsers[pagination.Users[i].ID] = &pagination.Users[i]
+ }
}
b.usersMutex.Lock()
@@ -122,10 +132,14 @@ func (b *Bslack) populateChannels() {
for {
channels, nextCursor, err := b.sc.GetConversations(queryParams)
if err != nil {
- b.Log.Errorf("Could not reload channels: %#v", err)
- return
+ if err = b.handleRateLimit(err); err != nil {
+ b.Log.Errorf("Could not retrieve channels: %#v", err)
+ return
+ }
+ continue
}
- for i := 0; i < len(channels); i++ {
+
+ for i := range channels {
newChannelsByID[channels[i].ID] = &channels[i]
newChannelsByName[channels[i].Name] = &channels[i]
}
@@ -189,18 +203,8 @@ func (b *Bslack) populateMessageWithUserInfo(ev *slack.MessageEvent, rmsg *confi
// First, deal with bot-originating messages but only do so when not using webhooks: we
// would not be able to distinguish which bot would be sending them.
- if ev.BotID != "" && b.GetString(outgoingWebhookConfig) == "" {
- bot, err := b.rtm.GetBotInfo(ev.BotID)
- if err != nil {
- return err
- }
- if bot.Name != "" && bot.Name != "Slack API Tester" {
- rmsg.Username = bot.Name
- if ev.Username != "" {
- rmsg.Username = ev.Username
- }
- rmsg.UserID = bot.ID
- }
+ if err := b.populateMessageWithBotInfo(ev, rmsg); err != nil {
+ return err
}
// Second, deal with "real" users if we have the necessary information.
@@ -227,6 +231,35 @@ func (b *Bslack) populateMessageWithUserInfo(ev *slack.MessageEvent, rmsg *confi
return nil
}
+func (b *Bslack) populateMessageWithBotInfo(ev *slack.MessageEvent, rmsg *config.Message) error {
+ if ev.BotID == "" || b.GetString(outgoingWebhookConfig) != "" {
+ return nil
+ }
+
+ var err error
+ var bot *slack.Bot
+ for {
+ bot, err = b.rtm.GetBotInfo(ev.BotID)
+ if err == nil {
+ break
+ }
+
+ if err = b.handleRateLimit(err); err != nil {
+ b.Log.Errorf("Could not retrieve bot information: %#v", err)
+ return err
+ }
+ }
+
+ if bot.Name != "" && bot.Name != "Slack API Tester" {
+ rmsg.Username = bot.Name
+ if ev.Username != "" {
+ rmsg.Username = ev.Username
+ }
+ rmsg.UserID = bot.ID
+ }
+ return nil
+}
+
var (
mentionRE = regexp.MustCompile(`<@([a-zA-Z0-9]+)>`)
channelRE = regexp.MustCompile(`<#[a-zA-Z0-9]+\|(.+?)>`)
@@ -277,3 +310,13 @@ func (b *Bslack) replaceURL(text string) string {
}
return text
}
+
+func (b *Bslack) handleRateLimit(err error) error {
+ rateLimit, ok := err.(*slack.RateLimitedError)
+ if !ok {
+ return err
+ }
+ b.Log.Infof("Rate-limited by Slack. Sleeping for %v", rateLimit.RetryAfter)
+ time.Sleep(rateLimit.RetryAfter)
+ return nil
+}
diff --git a/bridge/slack/slack.go b/bridge/slack/slack.go
index fd8e3791..038e0d90 100644
--- a/bridge/slack/slack.go
+++ b/bridge/slack/slack.go
@@ -278,34 +278,20 @@ func (b *Bslack) sendRTM(msg config.Message) (string, error) {
return "", nil
}
- // Delete message
- if msg.Event == config.EVENT_MSG_DELETE {
- // some protocols echo deletes, but with empty ID
- if msg.ID == "" {
- return "", nil
- }
- // we get a "slack <ID>", split it
- ts := strings.Fields(msg.ID)
- _, _, err = b.rtm.DeleteMessage(channelInfo.ID, ts[1])
- if err != nil {
- return msg.ID, err
- }
- return msg.ID, nil
+ // Handle message deletions.
+ var handled bool
+ if handled, err = b.deleteMessage(&msg, channelInfo); handled {
+ return msg.ID, err
}
- // Prepend nick if configured
+ // Prepend nickname if configured.
if b.GetBool(useNickPrefixConfig) {
msg.Text = msg.Username + msg.Text
}
- // Edit message if we have an ID
- if msg.ID != "" {
- ts := strings.Fields(msg.ID)
- _, _, _, err = b.rtm.UpdateMessage(channelInfo.ID, ts[1], msg.Text)
- if err != nil {
- return msg.ID, err
- }
- return msg.ID, nil
+ // Handle message edits.
+ if handled, err = b.editMessage(&msg, channelInfo); handled {
+ return msg.ID, err
}
messageParameters := b.prepareMessageParameters(&msg)
@@ -319,19 +305,73 @@ func (b *Bslack) sendRTM(msg config.Message) (string, error) {
}
}
// Upload files if necessary (from Slack, Telegram or Mattermost).
- b.handleUploadFile(&msg, channelInfo.ID)
+ b.uploadFile(&msg, channelInfo.ID)
}
- // Post normal message
- _, id, err := b.rtm.PostMessage(channelInfo.ID, msg.Text, *messageParameters)
- if err != nil {
- return "", err
+ // Post message.
+ return b.postMessage(&msg, messageParameters, channelInfo)
+}
+
+func (b *Bslack) deleteMessage(msg *config.Message, channelInfo *slack.Channel) (bool, error) {
+ if msg.Event != config.EVENT_MSG_DELETE {
+ return false, nil
+ }
+
+ // Some protocols echo deletes, but with an empty ID.
+ if msg.ID == "" {
+ return true, nil
+ }
+
+ // If we get a "slack <ID>", split it.
+ ts := strings.Fields(msg.ID)
+ for {
+ _, _, err := b.rtm.DeleteMessage(channelInfo.ID, ts[1])
+ if err == nil {
+ return true, nil
+ }
+
+ if err = b.handleRateLimit(err); err != nil {
+ b.Log.Errorf("Failed to delete user message from Slack: %#v", err)
+ return true, err
+ }
+ }
+}
+
+func (b *Bslack) editMessage(msg *config.Message, channelInfo *slack.Channel) (bool, error) {
+ if msg.ID == "" {
+ return false, nil
+ }
+
+ ts := strings.Fields(msg.ID)
+ for {
+ _, _, _, err := b.rtm.UpdateMessage(channelInfo.ID, ts[1], msg.Text)
+ if err == nil {
+ return true, nil
+ }
+
+ if err = b.handleRateLimit(err); err != nil {
+ b.Log.Errorf("Failed to edit user message on Slack: %#v", err)
+ return true, err
+ }
+ }
+}
+
+func (b *Bslack) postMessage(msg *config.Message, messageParameters *slack.PostMessageParameters, channelInfo *slack.Channel) (string, error) {
+ for {
+ _, id, err := b.rtm.PostMessage(channelInfo.ID, msg.Text, *messageParameters)
+ if err == nil {
+ return "slack " + id, nil
+ }
+
+ if err = b.handleRateLimit(err); err != nil {
+ b.Log.Errorf("Failed to sent user message to Slack: %#v", err)
+ return "", err
+ }
}
- return "slack " + id, nil
}
-// handleUploadFile handles native upload of files
-func (b *Bslack) handleUploadFile(msg *config.Message, channelID string) {
+// uploadFile handles native upload of files
+func (b *Bslack) uploadFile(msg *config.Message, channelID string) {
for _, f := range msg.Extra["file"] {
fi := f.(config.FileInfo)
if msg.Text == fi.Comment {