summaryrefslogtreecommitdiffstats
path: root/bridge
diff options
context:
space:
mode:
authorDuco van Amstel <duco.vanamstel@gmail.com>2019-03-12 21:52:36 +0000
committerWim <wim@42.be>2019-03-12 22:52:36 +0100
commit74699a8262ef9bcc44046238ba43267620fd8a8e (patch)
tree8e0611275edd2fbd00ca3f93c27a0915ec01a0eb /bridge
parenteabf2a45820eef497b2a2f19c27d801a29bfaeea (diff)
downloadmatterbridge-msglm-74699a8262ef9bcc44046238ba43267620fd8a8e.tar.gz
matterbridge-msglm-74699a8262ef9bcc44046238ba43267620fd8a8e.tar.bz2
matterbridge-msglm-74699a8262ef9bcc44046238ba43267620fd8a8e.zip
Split-out Slack user and channel management (#762)
Diffstat (limited to 'bridge')
-rw-r--r--bridge/slack/helpers.go238
-rw-r--r--bridge/slack/slack.go8
-rw-r--r--bridge/slack/users_channels.go222
3 files changed, 239 insertions, 229 deletions
diff --git a/bridge/slack/helpers.go b/bridge/slack/helpers.go
index 4d191c0d..ddbc770f 100644
--- a/bridge/slack/helpers.go
+++ b/bridge/slack/helpers.go
@@ -1,7 +1,6 @@
package bslack
import (
- "context"
"fmt"
"regexp"
"strings"
@@ -9,220 +8,9 @@ import (
"github.com/42wim/matterbridge/bridge/config"
"github.com/nlopes/slack"
+ "github.com/sirupsen/logrus"
)
-func (b *Bslack) getUser(id string) *slack.User {
- b.usersMutex.RLock()
- user, ok := b.users[id]
- b.usersMutex.RUnlock()
- if ok {
- return user
- }
- b.populateUser(id)
- b.usersMutex.RLock()
- defer b.usersMutex.RUnlock()
-
- return b.users[id]
-}
-
-func (b *Bslack) getUsername(id string) string {
- if user := b.getUser(id); user != nil {
- if user.Profile.DisplayName != "" {
- return user.Profile.DisplayName
- }
- return user.Name
- }
- b.Log.Warnf("Could not find user with ID '%s'", id)
- return ""
-}
-
-func (b *Bslack) getAvatar(id string) string {
- if user := b.getUser(id); user != nil {
- return user.Profile.Image48
- }
- return ""
-}
-
-func (b *Bslack) getChannel(channel string) (*slack.Channel, error) {
- if strings.HasPrefix(channel, "ID:") {
- return b.getChannelByID(strings.TrimPrefix(channel, "ID:"))
- }
- return b.getChannelByName(channel)
-}
-
-func (b *Bslack) getChannelByName(name string) (*slack.Channel, error) {
- return b.getChannelBy(name, b.channelsByName)
-}
-
-func (b *Bslack) getChannelByID(ID string) (*slack.Channel, error) {
- return b.getChannelBy(ID, b.channelsByID)
-}
-
-func (b *Bslack) getChannelBy(lookupKey string, lookupMap map[string]*slack.Channel) (*slack.Channel, error) {
- b.channelsMutex.RLock()
- defer b.channelsMutex.RUnlock()
-
- if channel, ok := lookupMap[lookupKey]; ok {
- return channel, nil
- }
- return nil, fmt.Errorf("%s: channel %s not found", b.Account, lookupKey)
-}
-
-const minimumRefreshInterval = 10 * time.Second
-
-func (b *Bslack) populateUser(userID string) {
- b.usersMutex.RLock()
- _, exists := b.users[userID]
- b.usersMutex.RUnlock()
- if exists {
- // already in cache
- return
- }
-
- user, err := b.sc.GetUserInfo(userID)
- if err != nil {
- b.Log.Debugf("GetUserInfo failed for %v: %v", userID, err)
- return
- }
-
- b.usersMutex.Lock()
- b.users[userID] = user
- b.usersMutex.Unlock()
-}
-
-func (b *Bslack) populateUsers(wait bool) {
- b.refreshMutex.Lock()
- if !wait && (time.Now().Before(b.earliestUserRefresh) || b.refreshInProgress) {
- b.Log.Debugf("Not refreshing user list as it was done less than %v ago.",
- minimumRefreshInterval)
- b.refreshMutex.Unlock()
-
- return
- }
- for b.refreshInProgress {
- b.refreshMutex.Unlock()
- time.Sleep(time.Second)
- b.refreshMutex.Lock()
- }
- b.refreshInProgress = true
- b.refreshMutex.Unlock()
-
- newUsers := map[string]*slack.User{}
- pagination := b.sc.GetUsersPaginated(slack.GetUsersOptionLimit(200))
- count := 0
- for {
- var err error
- pagination, err = pagination.Next(context.Background())
- time.Sleep(time.Second)
- if err != nil {
- if pagination.Done(err) {
- break
- }
-
- if err = b.handleRateLimit(err); err != nil {
- b.Log.Errorf("Could not retrieve users: %#v", err)
- return
- }
- continue
- }
-
- for i := range pagination.Users {
- newUsers[pagination.Users[i].ID] = &pagination.Users[i]
- }
- b.Log.Debugf("getting %d users", len(pagination.Users))
- count++
- // more > 2000 users, slack will complain and ratelimit. break
- if count > 10 {
- b.Log.Info("Large slack detected > 2000 users, skipping loading complete userlist.")
- break
- }
- }
-
- b.usersMutex.Lock()
- defer b.usersMutex.Unlock()
- b.users = newUsers
-
- b.refreshMutex.Lock()
- defer b.refreshMutex.Unlock()
- b.earliestUserRefresh = time.Now().Add(minimumRefreshInterval)
- b.refreshInProgress = false
-}
-
-func (b *Bslack) populateChannels(wait bool) {
- b.refreshMutex.Lock()
- if !wait && (time.Now().Before(b.earliestChannelRefresh) || b.refreshInProgress) {
- b.Log.Debugf("Not refreshing channel list as it was done less than %v seconds ago.",
- minimumRefreshInterval)
- b.refreshMutex.Unlock()
- return
- }
- for b.refreshInProgress {
- b.refreshMutex.Unlock()
- time.Sleep(time.Second)
- b.refreshMutex.Lock()
- }
- b.refreshInProgress = true
- b.refreshMutex.Unlock()
-
- newChannelsByID := map[string]*slack.Channel{}
- newChannelsByName := map[string]*slack.Channel{}
- newChannelMembers := make(map[string][]string)
-
- // We only retrieve public and private channels, not IMs
- // and MPIMs as those do not have a channel name.
- queryParams := &slack.GetConversationsParameters{
- ExcludeArchived: "true",
- Types: []string{"public_channel,private_channel"},
- }
- for {
- channels, nextCursor, err := b.sc.GetConversations(queryParams)
- if err != nil {
- if err = b.handleRateLimit(err); err != nil {
- b.Log.Errorf("Could not retrieve channels: %#v", err)
- return
- }
- continue
- }
-
- for i := range channels {
- newChannelsByID[channels[i].ID] = &channels[i]
- newChannelsByName[channels[i].Name] = &channels[i]
- // also find all the members in every channel
- // comment for now, issues on big slacks
- /*
- members, err := b.getUsersInConversation(channels[i].ID)
- if err != nil {
- if err = b.handleRateLimit(err); err != nil {
- b.Log.Errorf("Could not retrieve channel members: %#v", err)
- return
- }
- continue
- }
- newChannelMembers[channels[i].ID] = members
- */
- }
-
- if nextCursor == "" {
- break
- }
- queryParams.Cursor = nextCursor
- }
-
- b.channelsMutex.Lock()
- defer b.channelsMutex.Unlock()
- b.channelsByID = newChannelsByID
- b.channelsByName = newChannelsByName
-
- b.channelMembersMutex.Lock()
- defer b.channelMembersMutex.Unlock()
- b.channelMembers = newChannelMembers
-
- b.refreshMutex.Lock()
- defer b.refreshMutex.Unlock()
- b.earliestChannelRefresh = time.Now().Add(minimumRefreshInterval)
- b.refreshInProgress = false
-}
-
// populateReceivedMessage shapes the initial Matterbridge message that we will forward to the
// router before we apply message-dependent modifications.
func (b *Bslack) populateReceivedMessage(ev *slack.MessageEvent) (*config.Message, error) {
@@ -315,7 +103,7 @@ func (b *Bslack) populateMessageWithBotInfo(ev *slack.MessageEvent, rmsg *config
break
}
- if err = b.handleRateLimit(err); err != nil {
+ if err = handleRateLimit(b.Log, err); err != nil {
b.Log.Errorf("Could not retrieve bot information: %#v", err)
return err
}
@@ -404,16 +192,6 @@ func (b *Bslack) replaceCodeFence(text string) string {
return codeFenceRE.ReplaceAllString(text, "```")
}
-func (b *Bslack) handleRateLimit(err error) error {
- rateLimit, ok := err.(*slack.RateLimitedError)
- if !ok {
- return err
- }
- b.Log.Infof("Rate-limited by Slack. Sleeping for %v", rateLimit.RetryAfter)
- time.Sleep(rateLimit.RetryAfter)
- return nil
-}
-
// getUsersInConversation returns an array of userIDs that are members of channelID
func (b *Bslack) getUsersInConversation(channelID string) ([]string, error) {
channelMembers := []string{}
@@ -424,7 +202,7 @@ func (b *Bslack) getUsersInConversation(channelID string) ([]string, error) {
members, nextCursor, err := b.sc.GetUsersInConversation(queryParams)
if err != nil {
- if err = b.handleRateLimit(err); err != nil {
+ if err = handleRateLimit(b.Log, err); err != nil {
return channelMembers, fmt.Errorf("Could not retrieve users in channels: %#v", err)
}
continue
@@ -439,3 +217,13 @@ func (b *Bslack) getUsersInConversation(channelID string) ([]string, error) {
}
return channelMembers, nil
}
+
+func handleRateLimit(log *logrus.Entry, err error) error {
+ rateLimit, ok := err.(*slack.RateLimitedError)
+ if !ok {
+ return err
+ }
+ log.Infof("Rate-limited by Slack. Sleeping for %v", rateLimit.RetryAfter)
+ time.Sleep(rateLimit.RetryAfter)
+ return nil
+}
diff --git a/bridge/slack/slack.go b/bridge/slack/slack.go
index 6ab70a1d..860f4942 100644
--- a/bridge/slack/slack.go
+++ b/bridge/slack/slack.go
@@ -351,7 +351,7 @@ func (b *Bslack) updateTopicOrPurpose(msg *config.Message, channelInfo *slack.Ch
if err == nil {
return nil
}
- if err = b.handleRateLimit(err); err != nil {
+ if err = handleRateLimit(b.Log, err); err != nil {
return err
}
}
@@ -392,7 +392,7 @@ func (b *Bslack) deleteMessage(msg *config.Message, channelInfo *slack.Channel)
return true, nil
}
- if err = b.handleRateLimit(err); err != nil {
+ if err = handleRateLimit(b.Log, err); err != nil {
b.Log.Errorf("Failed to delete user message from Slack: %#v", err)
return true, err
}
@@ -411,7 +411,7 @@ func (b *Bslack) editMessage(msg *config.Message, channelInfo *slack.Channel) (b
return true, nil
}
- if err = b.handleRateLimit(err); err != nil {
+ if err = handleRateLimit(b.Log, err); err != nil {
b.Log.Errorf("Failed to edit user message on Slack: %#v", err)
return true, err
}
@@ -431,7 +431,7 @@ func (b *Bslack) postMessage(msg *config.Message, channelInfo *slack.Channel) (s
return id, nil
}
- if err = b.handleRateLimit(err); err != nil {
+ if err = handleRateLimit(b.Log, err); err != nil {
b.Log.Errorf("Failed to sent user message to Slack: %#v", err)
return "", err
}
diff --git a/bridge/slack/users_channels.go b/bridge/slack/users_channels.go
new file mode 100644
index 00000000..7eadd034
--- /dev/null
+++ b/bridge/slack/users_channels.go
@@ -0,0 +1,222 @@
+package bslack
+
+import (
+ "context"
+ "fmt"
+ "strings"
+ "time"
+
+ "github.com/nlopes/slack"
+)
+
+const minimumRefreshInterval = 10 * time.Second
+
+func (b *Bslack) getUser(id string) *slack.User {
+ b.usersMutex.RLock()
+ user, ok := b.users[id]
+ b.usersMutex.RUnlock()
+ if ok {
+ return user
+ }
+ b.populateUser(id)
+ b.usersMutex.RLock()
+ defer b.usersMutex.RUnlock()
+
+ return b.users[id]
+}
+
+func (b *Bslack) getUsername(id string) string {
+ if user := b.getUser(id); user != nil {
+ if user.Profile.DisplayName != "" {
+ return user.Profile.DisplayName
+ }
+ return user.Name
+ }
+ b.Log.Warnf("Could not find user with ID '%s'", id)
+ return ""
+}
+
+func (b *Bslack) getAvatar(id string) string {
+ if user := b.getUser(id); user != nil {
+ return user.Profile.Image48
+ }
+ return ""
+}
+
+func (b *Bslack) populateUser(userID string) {
+ b.usersMutex.RLock()
+ _, exists := b.users[userID]
+ b.usersMutex.RUnlock()
+ if exists {
+ // already in cache
+ return
+ }
+
+ user, err := b.sc.GetUserInfo(userID)
+ if err != nil {
+ b.Log.Debugf("GetUserInfo failed for %v: %v", userID, err)
+ return
+ }
+
+ b.usersMutex.Lock()
+ b.users[userID] = user
+ b.usersMutex.Unlock()
+}
+
+func (b *Bslack) populateUsers(wait bool) {
+ b.refreshMutex.Lock()
+ if !wait && (time.Now().Before(b.earliestUserRefresh) || b.refreshInProgress) {
+ b.Log.Debugf("Not refreshing user list as it was done less than %v ago.",
+ minimumRefreshInterval)
+ b.refreshMutex.Unlock()
+
+ return
+ }
+ for b.refreshInProgress {
+ b.refreshMutex.Unlock()
+ time.Sleep(time.Second)
+ b.refreshMutex.Lock()
+ }
+ b.refreshInProgress = true
+ b.refreshMutex.Unlock()
+
+ newUsers := map[string]*slack.User{}
+ pagination := b.sc.GetUsersPaginated(slack.GetUsersOptionLimit(200))
+ count := 0
+ for {
+ var err error
+ pagination, err = pagination.Next(context.Background())
+ time.Sleep(time.Second)
+ if err != nil {
+ if pagination.Done(err) {
+ break
+ }
+
+ if err = handleRateLimit(b.Log, err); err != nil {
+ b.Log.Errorf("Could not retrieve users: %#v", err)
+ return
+ }
+ continue
+ }
+
+ for i := range pagination.Users {
+ newUsers[pagination.Users[i].ID] = &pagination.Users[i]
+ }
+ b.Log.Debugf("getting %d users", len(pagination.Users))
+ count++
+ // more > 2000 users, slack will complain and ratelimit. break
+ if count > 10 {
+ b.Log.Info("Large slack detected > 2000 users, skipping loading complete userlist.")
+ break
+ }
+ }
+
+ b.usersMutex.Lock()
+ defer b.usersMutex.Unlock()
+ b.users = newUsers
+
+ b.refreshMutex.Lock()
+ defer b.refreshMutex.Unlock()
+ b.earliestUserRefresh = time.Now().Add(minimumRefreshInterval)
+ b.refreshInProgress = false
+}
+
+func (b *Bslack) getChannel(channel string) (*slack.Channel, error) {
+ if strings.HasPrefix(channel, "ID:") {
+ return b.getChannelByID(strings.TrimPrefix(channel, "ID:"))
+ }
+ return b.getChannelByName(channel)
+}
+
+func (b *Bslack) getChannelByName(name string) (*slack.Channel, error) {
+ return b.getChannelBy(name, b.channelsByName)
+}
+
+func (b *Bslack) getChannelByID(id string) (*slack.Channel, error) {
+ return b.getChannelBy(id, b.channelsByID)
+}
+
+func (b *Bslack) getChannelBy(lookupKey string, lookupMap map[string]*slack.Channel) (*slack.Channel, error) {
+ b.channelsMutex.RLock()
+ defer b.channelsMutex.RUnlock()
+
+ if channel, ok := lookupMap[lookupKey]; ok {
+ return channel, nil
+ }
+ return nil, fmt.Errorf("%s: channel %s not found", b.Account, lookupKey)
+}
+
+func (b *Bslack) populateChannels(wait bool) {
+ b.refreshMutex.Lock()
+ if !wait && (time.Now().Before(b.earliestChannelRefresh) || b.refreshInProgress) {
+ b.Log.Debugf("Not refreshing channel list as it was done less than %v seconds ago.",
+ minimumRefreshInterval)
+ b.refreshMutex.Unlock()
+ return
+ }
+ for b.refreshInProgress {
+ b.refreshMutex.Unlock()
+ time.Sleep(time.Second)
+ b.refreshMutex.Lock()
+ }
+ b.refreshInProgress = true
+ b.refreshMutex.Unlock()
+
+ newChannelsByID := map[string]*slack.Channel{}
+ newChannelsByName := map[string]*slack.Channel{}
+ newChannelMembers := make(map[string][]string)
+
+ // We only retrieve public and private channels, not IMs
+ // and MPIMs as those do not have a channel name.
+ queryParams := &slack.GetConversationsParameters{
+ ExcludeArchived: "true",
+ Types: []string{"public_channel,private_channel"},
+ }
+ for {
+ channels, nextCursor, err := b.sc.GetConversations(queryParams)
+ if err != nil {
+ if err = handleRateLimit(b.Log, err); err != nil {
+ b.Log.Errorf("Could not retrieve channels: %#v", err)
+ return
+ }
+ continue
+ }
+
+ for i := range channels {
+ newChannelsByID[channels[i].ID] = &channels[i]
+ newChannelsByName[channels[i].Name] = &channels[i]
+ // also find all the members in every channel
+ // comment for now, issues on big slacks
+ /*
+ members, err := b.getUsersInConversation(channels[i].ID)
+ if err != nil {
+ if err = b.handleRateLimit(err); err != nil {
+ b.Log.Errorf("Could not retrieve channel members: %#v", err)
+ return
+ }
+ continue
+ }
+ newChannelMembers[channels[i].ID] = members
+ */
+ }
+
+ if nextCursor == "" {
+ break
+ }
+ queryParams.Cursor = nextCursor
+ }
+
+ b.channelsMutex.Lock()
+ defer b.channelsMutex.Unlock()
+ b.channelsByID = newChannelsByID
+ b.channelsByName = newChannelsByName
+
+ b.channelMembersMutex.Lock()
+ defer b.channelMembersMutex.Unlock()
+ b.channelMembers = newChannelMembers
+
+ b.refreshMutex.Lock()
+ defer b.refreshMutex.Unlock()
+ b.earliestChannelRefresh = time.Now().Add(minimumRefreshInterval)
+ b.refreshInProgress = false
+}