package bslack import ( "context" "fmt" "strings" "sync" "time" "github.com/42wim/matterbridge/bridge/config" "github.com/nlopes/slack" "github.com/sirupsen/logrus" ) const minimumRefreshInterval = 10 * time.Second type users struct { log *logrus.Entry sc *slack.Client users map[string]*slack.User usersMutex sync.RWMutex usersSyncPoints map[string]chan struct{} refreshInProgress bool earliestRefresh time.Time refreshMutex sync.Mutex } func newUserManager(log *logrus.Entry, sc *slack.Client) *users { return &users{ log: log, sc: sc, users: make(map[string]*slack.User), usersSyncPoints: make(map[string]chan struct{}), earliestRefresh: time.Now(), } } func (b *users) getUser(id string) *slack.User { b.usersMutex.RLock() user, ok := b.users[id] b.usersMutex.RUnlock() if ok { return user } b.populateUser(id) b.usersMutex.RLock() defer b.usersMutex.RUnlock() return b.users[id] } func (b *users) getUsername(id string) string { if user := b.getUser(id); user != nil { if user.Profile.DisplayName != "" { return user.Profile.DisplayName } return user.Name } b.log.Warnf("Could not find user with ID '%s'", id) return "" } func (b *users) getAvatar(id string) string { if user := b.getUser(id); user != nil { return user.Profile.Image48 } return "" } func (b *users) populateUser(userID string) { for { b.usersMutex.Lock() _, exists := b.users[userID] if exists { // already in cache b.usersMutex.Unlock() return } if syncPoint, ok := b.usersSyncPoints[userID]; ok { // Another goroutine is already populating this user for us so wait on it to finish. b.usersMutex.Unlock() <-syncPoint // We do not return and iterate again to check that the entry does indeed exist // in case the previous query failed for some reason. } else { b.usersSyncPoints[userID] = make(chan struct{}) break } } // Do not hold the lock while fetching information from Slack // as this might take an unbounded amount of time. b.usersMutex.Unlock() user, err := b.sc.GetUserInfo(userID) if err != nil { b.log.Debugf("GetUserInfo failed for %v: %v", userID, err) return } b.usersMutex.Lock() defer b.usersMutex.Unlock() // Register user information. b.users[userID] = user // Wake up any waiting goroutines and remove the synchronization point. close(b.usersSyncPoints[userID]) delete(b.usersSyncPoints, userID) } func (b *users) populateUsers(wait bool) { b.refreshMutex.Lock() if !wait && (time.Now().Before(b.earliestRefresh) || b.refreshInProgress) { b.log.Debugf("Not refreshing user list as it was done less than %v ago.", minimumRefreshInterval) b.refreshMutex.Unlock() return } for b.refreshInProgress { b.refreshMutex.Unlock() time.Sleep(time.Second) b.refreshMutex.Lock() } b.refreshInProgress = true b.refreshMutex.Unlock() newUsers := map[string]*slack.User{} pagination := b.sc.GetUsersPaginated(slack.GetUsersOptionLimit(200)) count := 0 for { var err error pagination, err = pagination.Next(context.Background()) time.Sleep(time.Second) if err != nil { if pagination.Done(err) { break } if err = handleRateLimit(b.log, err); err != nil { b.log.Errorf("Could not retrieve users: %#v", err) return } continue } for i := range pagination.Users { newUsers[pagination.Users[i].ID] = &pagination.Users[i] } b.log.Debugf("getting %d users", len(pagination.Users)) count++ // more > 2000 users, slack will complain and ratelimit. break if count > 10 { b.log.Info("Large slack detected > 2000 users, skipping loading complete userlist.") break } } b.usersMutex.Lock() defer b.usersMutex.Unlock() b.users = newUsers b.refreshMutex.Lock() defer b.refreshMutex.Unlock() b.earliestRefresh = time.Now().Add(minimumRefreshInterval) b.refreshInProgress = false } type channels struct { log *logrus.Entry sc *slack.Client channelsByID map[string]*slack.Channel channelsByName map[string]*slack.Channel channelsMutex sync.RWMutex channelMembers map[string][]string channelMembersMutex sync.RWMutex refreshInProgress bool earliestRefresh time.Time refreshMutex sync.Mutex } func newChannelManager(log *logrus.Entry, sc *slack.Client) *channels { return &channels{ log: log, sc: sc, channelsByID: make(map[string]*slack.Channel), channelsByName: make(map[string]*slack.Channel), earliestRefresh: time.Now(), } } func (b *channels) getChannel(channel string) (*slack.Channel, error) { if strings.HasPrefix(channel, "ID:") { return b.getChannelByID(strings.TrimPrefix(channel, "ID:")) } return b.getChannelByName(channel) } func (b *channels) getChannelByName(name string) (*slack.Channel, error) { return b.getChannelBy(name, b.channelsByName) } func (b *channels) getChannelByID(id string) (*slack.Channel, error) { return b.getChannelBy(id, b.channelsByID) } func (b *channels) getChannelBy(lookupKey string, lookupMap map[string]*slack.Channel) (*slack.Channel, error) { b.channelsMutex.RLock() defer b.channelsMutex.RUnlock() if channel, ok := lookupMap[lookupKey]; ok { return channel, nil } return nil, fmt.Errorf("channel %s not found", lookupKey) } func (b *channels) getChannelMembers(users *users) config.ChannelMembers { b.channelMembersMutex.RLock() defer b.channelMembersMutex.RUnlock() membersInfo := config.ChannelMembers{} for channelID, members := range b.channelMembers { for _, member := range members { channelName := "" userName := "" userNick := "" user := users.getUser(member) if user != nil { userName = user.Name userNick = user.Profile.DisplayName } channel, _ := b.getChannelByID(channelID) if channel != nil { channelName = channel.Name } memberInfo := config.ChannelMember{ Username: userName, Nick: userNick, UserID: member, ChannelID: channelID, ChannelName: channelName, } membersInfo = append(membersInfo, memberInfo) } } return membersInfo } func (b *channels) registerChannel(channel slack.Channel) { b.channelsMutex.Lock() defer b.channelsMutex.Unlock() b.channelsByID[channel.ID] = &channel b.channelsByName[channel.Name] = &channel } func (b *channels) populateChannels(wait bool) { b.refreshMutex.Lock() if !wait && (time.Now().Before(b.earliestRefresh) || b.refreshInProgress) { b.log.Debugf("Not refreshing channel list as it was done less than %v seconds ago.", minimumRefreshInterval) b.refreshMutex.Unlock() return } for b.refreshInProgress { b.refreshMutex.Unlock() time.Sleep(time.Second) b.refreshMutex.Lock() } b.refreshInProgress = true b.refreshMutex.Unlock() newChannelsByID := map[string]*slack.Channel{} newChannelsByName := map[string]*slack.Channel{} newChannelMembers := make(map[string][]string) // We only retrieve public and private channels, not IMs // and MPIMs as those do not have a channel name. queryParams := &slack.GetConversationsParameters{ ExcludeArchived: "true", Types: []string{"public_channel,private_channel"}, } for { channels, nextCursor, err := b.sc.GetConversations(queryParams) if err != nil { if err = handleRateLimit(b.log, err); err != nil { b.log.Errorf("Could not retrieve channels: %#v", err) return } continue } for i := range channels { newChannelsByID[channels[i].ID] = &channels[i] newChannelsByName[channels[i].Name] = &channels[i] // also find all the members in every channel // comment for now, issues on big slacks /* members, err := b.getUsersInConversation(channels[i].ID) if err != nil { if err = b.handleRateLimit(err); err != nil { b.Log.Errorf("Could not retrieve channel members: %#v", err) return } continue } newChannelMembers[channels[i].ID] = members */ } if nextCursor == "" { break } queryParams.Cursor = nextCursor } b.channelsMutex.Lock() defer b.channelsMutex.Unlock() b.channelsByID = newChannelsByID b.channelsByName = newChannelsByName b.channelMembersMutex.Lock() defer b.channelMembersMutex.Unlock() b.channelMembers = newChannelMembers b.refreshMutex.Lock() defer b.refreshMutex.Unlock() b.earliestRefresh = time.Now().Add(minimumRefreshInterval) b.refreshInProgress = false }