From 74699a8262ef9bcc44046238ba43267620fd8a8e Mon Sep 17 00:00:00 2001 From: Duco van Amstel Date: Tue, 12 Mar 2019 21:52:36 +0000 Subject: Split-out Slack user and channel management (#762) --- .golangci.yaml | 1 - bridge/slack/helpers.go | 238 +++-------------------------------------- bridge/slack/slack.go | 8 +- bridge/slack/users_channels.go | 222 ++++++++++++++++++++++++++++++++++++++ 4 files changed, 239 insertions(+), 230 deletions(-) create mode 100644 bridge/slack/users_channels.go diff --git a/.golangci.yaml b/.golangci.yaml index 64c7f4c2..d6222775 100644 --- a/.golangci.yaml +++ b/.golangci.yaml @@ -158,7 +158,6 @@ linters-settings: - regexpMust - singleCaseSwitch - sloppyLen - - sloppyReassign - switchTrue - typeSwitchVar - typeUnparen diff --git a/bridge/slack/helpers.go b/bridge/slack/helpers.go index 4d191c0d..ddbc770f 100644 --- a/bridge/slack/helpers.go +++ b/bridge/slack/helpers.go @@ -1,7 +1,6 @@ package bslack import ( - "context" "fmt" "regexp" "strings" @@ -9,220 +8,9 @@ import ( "github.com/42wim/matterbridge/bridge/config" "github.com/nlopes/slack" + "github.com/sirupsen/logrus" ) -func (b *Bslack) getUser(id string) *slack.User { - b.usersMutex.RLock() - user, ok := b.users[id] - b.usersMutex.RUnlock() - if ok { - return user - } - b.populateUser(id) - b.usersMutex.RLock() - defer b.usersMutex.RUnlock() - - return b.users[id] -} - -func (b *Bslack) getUsername(id string) string { - if user := b.getUser(id); user != nil { - if user.Profile.DisplayName != "" { - return user.Profile.DisplayName - } - return user.Name - } - b.Log.Warnf("Could not find user with ID '%s'", id) - return "" -} - -func (b *Bslack) getAvatar(id string) string { - if user := b.getUser(id); user != nil { - return user.Profile.Image48 - } - return "" -} - -func (b *Bslack) getChannel(channel string) (*slack.Channel, error) { - if strings.HasPrefix(channel, "ID:") { - return b.getChannelByID(strings.TrimPrefix(channel, "ID:")) - } - return b.getChannelByName(channel) -} - -func (b *Bslack) getChannelByName(name string) (*slack.Channel, error) { - return b.getChannelBy(name, b.channelsByName) -} - -func (b *Bslack) getChannelByID(ID string) (*slack.Channel, error) { - return b.getChannelBy(ID, b.channelsByID) -} - -func (b *Bslack) getChannelBy(lookupKey string, lookupMap map[string]*slack.Channel) (*slack.Channel, error) { - b.channelsMutex.RLock() - defer b.channelsMutex.RUnlock() - - if channel, ok := lookupMap[lookupKey]; ok { - return channel, nil - } - return nil, fmt.Errorf("%s: channel %s not found", b.Account, lookupKey) -} - -const minimumRefreshInterval = 10 * time.Second - -func (b *Bslack) populateUser(userID string) { - b.usersMutex.RLock() - _, exists := b.users[userID] - b.usersMutex.RUnlock() - if exists { - // already in cache - return - } - - user, err := b.sc.GetUserInfo(userID) - if err != nil { - b.Log.Debugf("GetUserInfo failed for %v: %v", userID, err) - return - } - - b.usersMutex.Lock() - b.users[userID] = user - b.usersMutex.Unlock() -} - -func (b *Bslack) populateUsers(wait bool) { - b.refreshMutex.Lock() - if !wait && (time.Now().Before(b.earliestUserRefresh) || b.refreshInProgress) { - b.Log.Debugf("Not refreshing user list as it was done less than %v ago.", - minimumRefreshInterval) - b.refreshMutex.Unlock() - - return - } - for b.refreshInProgress { - b.refreshMutex.Unlock() - time.Sleep(time.Second) - b.refreshMutex.Lock() - } - b.refreshInProgress = true - b.refreshMutex.Unlock() - - newUsers := map[string]*slack.User{} - pagination := b.sc.GetUsersPaginated(slack.GetUsersOptionLimit(200)) - count := 0 - for { - var err error - pagination, err = pagination.Next(context.Background()) - time.Sleep(time.Second) - if err != nil { - if pagination.Done(err) { - break - } - - if err = b.handleRateLimit(err); err != nil { - b.Log.Errorf("Could not retrieve users: %#v", err) - return - } - continue - } - - for i := range pagination.Users { - newUsers[pagination.Users[i].ID] = &pagination.Users[i] - } - b.Log.Debugf("getting %d users", len(pagination.Users)) - count++ - // more > 2000 users, slack will complain and ratelimit. break - if count > 10 { - b.Log.Info("Large slack detected > 2000 users, skipping loading complete userlist.") - break - } - } - - b.usersMutex.Lock() - defer b.usersMutex.Unlock() - b.users = newUsers - - b.refreshMutex.Lock() - defer b.refreshMutex.Unlock() - b.earliestUserRefresh = time.Now().Add(minimumRefreshInterval) - b.refreshInProgress = false -} - -func (b *Bslack) populateChannels(wait bool) { - b.refreshMutex.Lock() - if !wait && (time.Now().Before(b.earliestChannelRefresh) || b.refreshInProgress) { - b.Log.Debugf("Not refreshing channel list as it was done less than %v seconds ago.", - minimumRefreshInterval) - b.refreshMutex.Unlock() - return - } - for b.refreshInProgress { - b.refreshMutex.Unlock() - time.Sleep(time.Second) - b.refreshMutex.Lock() - } - b.refreshInProgress = true - b.refreshMutex.Unlock() - - newChannelsByID := map[string]*slack.Channel{} - newChannelsByName := map[string]*slack.Channel{} - newChannelMembers := make(map[string][]string) - - // We only retrieve public and private channels, not IMs - // and MPIMs as those do not have a channel name. - queryParams := &slack.GetConversationsParameters{ - ExcludeArchived: "true", - Types: []string{"public_channel,private_channel"}, - } - for { - channels, nextCursor, err := b.sc.GetConversations(queryParams) - if err != nil { - if err = b.handleRateLimit(err); err != nil { - b.Log.Errorf("Could not retrieve channels: %#v", err) - return - } - continue - } - - for i := range channels { - newChannelsByID[channels[i].ID] = &channels[i] - newChannelsByName[channels[i].Name] = &channels[i] - // also find all the members in every channel - // comment for now, issues on big slacks - /* - members, err := b.getUsersInConversation(channels[i].ID) - if err != nil { - if err = b.handleRateLimit(err); err != nil { - b.Log.Errorf("Could not retrieve channel members: %#v", err) - return - } - continue - } - newChannelMembers[channels[i].ID] = members - */ - } - - if nextCursor == "" { - break - } - queryParams.Cursor = nextCursor - } - - b.channelsMutex.Lock() - defer b.channelsMutex.Unlock() - b.channelsByID = newChannelsByID - b.channelsByName = newChannelsByName - - b.channelMembersMutex.Lock() - defer b.channelMembersMutex.Unlock() - b.channelMembers = newChannelMembers - - b.refreshMutex.Lock() - defer b.refreshMutex.Unlock() - b.earliestChannelRefresh = time.Now().Add(minimumRefreshInterval) - b.refreshInProgress = false -} - // populateReceivedMessage shapes the initial Matterbridge message that we will forward to the // router before we apply message-dependent modifications. func (b *Bslack) populateReceivedMessage(ev *slack.MessageEvent) (*config.Message, error) { @@ -315,7 +103,7 @@ func (b *Bslack) populateMessageWithBotInfo(ev *slack.MessageEvent, rmsg *config break } - if err = b.handleRateLimit(err); err != nil { + if err = handleRateLimit(b.Log, err); err != nil { b.Log.Errorf("Could not retrieve bot information: %#v", err) return err } @@ -404,16 +192,6 @@ func (b *Bslack) replaceCodeFence(text string) string { return codeFenceRE.ReplaceAllString(text, "```") } -func (b *Bslack) handleRateLimit(err error) error { - rateLimit, ok := err.(*slack.RateLimitedError) - if !ok { - return err - } - b.Log.Infof("Rate-limited by Slack. Sleeping for %v", rateLimit.RetryAfter) - time.Sleep(rateLimit.RetryAfter) - return nil -} - // getUsersInConversation returns an array of userIDs that are members of channelID func (b *Bslack) getUsersInConversation(channelID string) ([]string, error) { channelMembers := []string{} @@ -424,7 +202,7 @@ func (b *Bslack) getUsersInConversation(channelID string) ([]string, error) { members, nextCursor, err := b.sc.GetUsersInConversation(queryParams) if err != nil { - if err = b.handleRateLimit(err); err != nil { + if err = handleRateLimit(b.Log, err); err != nil { return channelMembers, fmt.Errorf("Could not retrieve users in channels: %#v", err) } continue @@ -439,3 +217,13 @@ func (b *Bslack) getUsersInConversation(channelID string) ([]string, error) { } return channelMembers, nil } + +func handleRateLimit(log *logrus.Entry, err error) error { + rateLimit, ok := err.(*slack.RateLimitedError) + if !ok { + return err + } + log.Infof("Rate-limited by Slack. Sleeping for %v", rateLimit.RetryAfter) + time.Sleep(rateLimit.RetryAfter) + return nil +} diff --git a/bridge/slack/slack.go b/bridge/slack/slack.go index 6ab70a1d..860f4942 100644 --- a/bridge/slack/slack.go +++ b/bridge/slack/slack.go @@ -351,7 +351,7 @@ func (b *Bslack) updateTopicOrPurpose(msg *config.Message, channelInfo *slack.Ch if err == nil { return nil } - if err = b.handleRateLimit(err); err != nil { + if err = handleRateLimit(b.Log, err); err != nil { return err } } @@ -392,7 +392,7 @@ func (b *Bslack) deleteMessage(msg *config.Message, channelInfo *slack.Channel) return true, nil } - if err = b.handleRateLimit(err); err != nil { + if err = handleRateLimit(b.Log, err); err != nil { b.Log.Errorf("Failed to delete user message from Slack: %#v", err) return true, err } @@ -411,7 +411,7 @@ func (b *Bslack) editMessage(msg *config.Message, channelInfo *slack.Channel) (b return true, nil } - if err = b.handleRateLimit(err); err != nil { + if err = handleRateLimit(b.Log, err); err != nil { b.Log.Errorf("Failed to edit user message on Slack: %#v", err) return true, err } @@ -431,7 +431,7 @@ func (b *Bslack) postMessage(msg *config.Message, channelInfo *slack.Channel) (s return id, nil } - if err = b.handleRateLimit(err); err != nil { + if err = handleRateLimit(b.Log, err); err != nil { b.Log.Errorf("Failed to sent user message to Slack: %#v", err) return "", err } diff --git a/bridge/slack/users_channels.go b/bridge/slack/users_channels.go new file mode 100644 index 00000000..7eadd034 --- /dev/null +++ b/bridge/slack/users_channels.go @@ -0,0 +1,222 @@ +package bslack + +import ( + "context" + "fmt" + "strings" + "time" + + "github.com/nlopes/slack" +) + +const minimumRefreshInterval = 10 * time.Second + +func (b *Bslack) getUser(id string) *slack.User { + b.usersMutex.RLock() + user, ok := b.users[id] + b.usersMutex.RUnlock() + if ok { + return user + } + b.populateUser(id) + b.usersMutex.RLock() + defer b.usersMutex.RUnlock() + + return b.users[id] +} + +func (b *Bslack) getUsername(id string) string { + if user := b.getUser(id); user != nil { + if user.Profile.DisplayName != "" { + return user.Profile.DisplayName + } + return user.Name + } + b.Log.Warnf("Could not find user with ID '%s'", id) + return "" +} + +func (b *Bslack) getAvatar(id string) string { + if user := b.getUser(id); user != nil { + return user.Profile.Image48 + } + return "" +} + +func (b *Bslack) populateUser(userID string) { + b.usersMutex.RLock() + _, exists := b.users[userID] + b.usersMutex.RUnlock() + if exists { + // already in cache + return + } + + user, err := b.sc.GetUserInfo(userID) + if err != nil { + b.Log.Debugf("GetUserInfo failed for %v: %v", userID, err) + return + } + + b.usersMutex.Lock() + b.users[userID] = user + b.usersMutex.Unlock() +} + +func (b *Bslack) populateUsers(wait bool) { + b.refreshMutex.Lock() + if !wait && (time.Now().Before(b.earliestUserRefresh) || b.refreshInProgress) { + b.Log.Debugf("Not refreshing user list as it was done less than %v ago.", + minimumRefreshInterval) + b.refreshMutex.Unlock() + + return + } + for b.refreshInProgress { + b.refreshMutex.Unlock() + time.Sleep(time.Second) + b.refreshMutex.Lock() + } + b.refreshInProgress = true + b.refreshMutex.Unlock() + + newUsers := map[string]*slack.User{} + pagination := b.sc.GetUsersPaginated(slack.GetUsersOptionLimit(200)) + count := 0 + for { + var err error + pagination, err = pagination.Next(context.Background()) + time.Sleep(time.Second) + if err != nil { + if pagination.Done(err) { + break + } + + if err = handleRateLimit(b.Log, err); err != nil { + b.Log.Errorf("Could not retrieve users: %#v", err) + return + } + continue + } + + for i := range pagination.Users { + newUsers[pagination.Users[i].ID] = &pagination.Users[i] + } + b.Log.Debugf("getting %d users", len(pagination.Users)) + count++ + // more > 2000 users, slack will complain and ratelimit. break + if count > 10 { + b.Log.Info("Large slack detected > 2000 users, skipping loading complete userlist.") + break + } + } + + b.usersMutex.Lock() + defer b.usersMutex.Unlock() + b.users = newUsers + + b.refreshMutex.Lock() + defer b.refreshMutex.Unlock() + b.earliestUserRefresh = time.Now().Add(minimumRefreshInterval) + b.refreshInProgress = false +} + +func (b *Bslack) getChannel(channel string) (*slack.Channel, error) { + if strings.HasPrefix(channel, "ID:") { + return b.getChannelByID(strings.TrimPrefix(channel, "ID:")) + } + return b.getChannelByName(channel) +} + +func (b *Bslack) getChannelByName(name string) (*slack.Channel, error) { + return b.getChannelBy(name, b.channelsByName) +} + +func (b *Bslack) getChannelByID(id string) (*slack.Channel, error) { + return b.getChannelBy(id, b.channelsByID) +} + +func (b *Bslack) getChannelBy(lookupKey string, lookupMap map[string]*slack.Channel) (*slack.Channel, error) { + b.channelsMutex.RLock() + defer b.channelsMutex.RUnlock() + + if channel, ok := lookupMap[lookupKey]; ok { + return channel, nil + } + return nil, fmt.Errorf("%s: channel %s not found", b.Account, lookupKey) +} + +func (b *Bslack) populateChannels(wait bool) { + b.refreshMutex.Lock() + if !wait && (time.Now().Before(b.earliestChannelRefresh) || b.refreshInProgress) { + b.Log.Debugf("Not refreshing channel list as it was done less than %v seconds ago.", + minimumRefreshInterval) + b.refreshMutex.Unlock() + return + } + for b.refreshInProgress { + b.refreshMutex.Unlock() + time.Sleep(time.Second) + b.refreshMutex.Lock() + } + b.refreshInProgress = true + b.refreshMutex.Unlock() + + newChannelsByID := map[string]*slack.Channel{} + newChannelsByName := map[string]*slack.Channel{} + newChannelMembers := make(map[string][]string) + + // We only retrieve public and private channels, not IMs + // and MPIMs as those do not have a channel name. + queryParams := &slack.GetConversationsParameters{ + ExcludeArchived: "true", + Types: []string{"public_channel,private_channel"}, + } + for { + channels, nextCursor, err := b.sc.GetConversations(queryParams) + if err != nil { + if err = handleRateLimit(b.Log, err); err != nil { + b.Log.Errorf("Could not retrieve channels: %#v", err) + return + } + continue + } + + for i := range channels { + newChannelsByID[channels[i].ID] = &channels[i] + newChannelsByName[channels[i].Name] = &channels[i] + // also find all the members in every channel + // comment for now, issues on big slacks + /* + members, err := b.getUsersInConversation(channels[i].ID) + if err != nil { + if err = b.handleRateLimit(err); err != nil { + b.Log.Errorf("Could not retrieve channel members: %#v", err) + return + } + continue + } + newChannelMembers[channels[i].ID] = members + */ + } + + if nextCursor == "" { + break + } + queryParams.Cursor = nextCursor + } + + b.channelsMutex.Lock() + defer b.channelsMutex.Unlock() + b.channelsByID = newChannelsByID + b.channelsByName = newChannelsByName + + b.channelMembersMutex.Lock() + defer b.channelMembersMutex.Unlock() + b.channelMembers = newChannelMembers + + b.refreshMutex.Lock() + defer b.refreshMutex.Unlock() + b.earliestChannelRefresh = time.Now().Add(minimumRefreshInterval) + b.refreshInProgress = false +} -- cgit v1.2.3