summaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
-rw-r--r--bridge/bridge.go34
-rw-r--r--bridge/config/config.go31
-rw-r--r--bridge/slack/handlers.go52
-rw-r--r--bridge/slack/helpers.go46
-rw-r--r--bridge/slack/slack.go8
-rw-r--r--gateway/handlers.go17
-rw-r--r--gateway/router.go22
7 files changed, 189 insertions, 21 deletions
diff --git a/bridge/bridge.go b/bridge/bridge.go
index 336e2e2c..6b955a9e 100644
--- a/bridge/bridge.go
+++ b/bridge/bridge.go
@@ -5,6 +5,7 @@ import (
"github.com/42wim/matterbridge/bridge/config"
"github.com/sirupsen/logrus"
+ "sync"
)
type Bridger interface {
@@ -16,14 +17,16 @@ type Bridger interface {
type Bridge struct {
Bridger
- Name string
- Account string
- Protocol string
- Channels map[string]config.ChannelInfo
- Joined map[string]bool
- Log *logrus.Entry
- Config config.Config
- General *config.Protocol
+ Name string
+ Account string
+ Protocol string
+ Channels map[string]config.ChannelInfo
+ Joined map[string]bool
+ ChannelMembers *config.ChannelMembers
+ Log *logrus.Entry
+ Config config.Config
+ General *config.Protocol
+ *sync.RWMutex
}
type Config struct {
@@ -37,15 +40,17 @@ type Config struct {
type Factory func(*Config) Bridger
func New(bridge *config.Bridge) *Bridge {
- b := new(Bridge)
- b.Channels = make(map[string]config.ChannelInfo)
+ b := &Bridge{
+ Channels: make(map[string]config.ChannelInfo),
+ RWMutex: new(sync.RWMutex),
+ Joined: make(map[string]bool),
+ }
accInfo := strings.Split(bridge.Account, ".")
protocol := accInfo[0]
name := accInfo[1]
b.Name = name
b.Protocol = protocol
b.Account = bridge.Account
- b.Joined = make(map[string]bool)
return b
}
@@ -54,6 +59,13 @@ func (b *Bridge) JoinChannels() error {
return err
}
+// SetChannelMembers sets the newMembers to the bridge ChannelMembers
+func (b *Bridge) SetChannelMembers(newMembers *config.ChannelMembers) {
+ b.Lock()
+ b.ChannelMembers = newMembers
+ b.Unlock()
+}
+
func (b *Bridge) joinChannels(channels map[string]config.ChannelInfo, exists map[string]bool) error {
for ID, channel := range channels {
if !exists[ID] {
diff --git a/bridge/config/config.go b/bridge/config/config.go
index 932de2eb..3d1206c7 100644
--- a/bridge/config/config.go
+++ b/bridge/config/config.go
@@ -14,16 +14,17 @@ import (
)
const (
- EventJoinLeave = "join_leave"
- EventTopicChange = "topic_change"
- EventFailure = "failure"
- EventFileFailureSize = "file_failure_size"
- EventAvatarDownload = "avatar_download"
- EventRejoinChannels = "rejoin_channels"
- EventUserAction = "user_action"
- EventMsgDelete = "msg_delete"
- EventAPIConnected = "api_connected"
- EventUserTyping = "user_typing"
+ EventJoinLeave = "join_leave"
+ EventTopicChange = "topic_change"
+ EventFailure = "failure"
+ EventFileFailureSize = "file_failure_size"
+ EventAvatarDownload = "avatar_download"
+ EventRejoinChannels = "rejoin_channels"
+ EventUserAction = "user_action"
+ EventMsgDelete = "msg_delete"
+ EventAPIConnected = "api_connected"
+ EventUserTyping = "user_typing"
+ EventGetChannelMembers = "get_channel_members"
)
type Message struct {
@@ -61,6 +62,16 @@ type ChannelInfo struct {
Options ChannelOptions
}
+type ChannelMember struct {
+ Username string
+ Nick string
+ UserID string
+ ChannelID string
+ ChannelName string
+}
+
+type ChannelMembers []ChannelMember
+
type Protocol struct {
AuthCode string // steam
BindAddress string // mattermost, slack // DEPRECATED
diff --git a/bridge/slack/handlers.go b/bridge/slack/handlers.go
index f46eb595..5e601eaa 100644
--- a/bridge/slack/handlers.go
+++ b/bridge/slack/handlers.go
@@ -309,6 +309,58 @@ func (b *Bslack) handleDownloadFile(rmsg *config.Message, file *slack.File, retr
return nil
}
+// handleGetChannelMembers handles messages containing the GetChannelMembers event
+// Sends a message to the router containing *config.ChannelMembers
+func (b *Bslack) handleGetChannelMembers(rmsg *config.Message) bool {
+ if rmsg.Event != config.EventGetChannelMembers {
+ return false
+ }
+
+ cMembers := config.ChannelMembers{}
+
+ b.channelMembersMutex.RLock()
+
+ for channelID, members := range b.channelMembers {
+ for _, member := range members {
+ channelName := ""
+ userName := ""
+ userNick := ""
+ user := b.getUser(member)
+ if user != nil {
+ userName = user.Name
+ userNick = user.Profile.DisplayName
+ }
+ channel, _ := b.getChannelByID(channelID)
+ if channel != nil {
+ channelName = channel.Name
+ }
+ cMember := config.ChannelMember{
+ Username: userName,
+ Nick: userNick,
+ UserID: member,
+ ChannelID: channelID,
+ ChannelName: channelName,
+ }
+ cMembers = append(cMembers, cMember)
+ }
+ }
+
+ b.channelMembersMutex.RUnlock()
+
+ extra := make(map[string][]interface{})
+ extra[config.EventGetChannelMembers] = append(extra[config.EventGetChannelMembers], cMembers)
+ msg := config.Message{
+ Extra: extra,
+ Event: config.EventGetChannelMembers,
+ Account: b.Account,
+ }
+
+ b.Log.Debugf("sending msg to remote %#v", msg)
+ b.Remote <- msg
+
+ return true
+}
+
// fileCached implements Matterbridge's caching logic for files
// shared via Slack.
//
diff --git a/bridge/slack/helpers.go b/bridge/slack/helpers.go
index 4e6e5652..d84353f0 100644
--- a/bridge/slack/helpers.go
+++ b/bridge/slack/helpers.go
@@ -93,7 +93,9 @@ func (b *Bslack) populateUsers(wait bool) {
return
}
for b.refreshInProgress {
+ b.refreshMutex.Unlock()
time.Sleep(time.Second)
+ b.refreshMutex.Lock()
}
b.refreshInProgress = true
b.refreshMutex.Unlock()
@@ -139,13 +141,16 @@ func (b *Bslack) populateChannels(wait bool) {
return
}
for b.refreshInProgress {
+ b.refreshMutex.Unlock()
time.Sleep(time.Second)
+ b.refreshMutex.Lock()
}
b.refreshInProgress = true
b.refreshMutex.Unlock()
newChannelsByID := map[string]*slack.Channel{}
newChannelsByName := map[string]*slack.Channel{}
+ newChannelMembers := make(map[string][]string)
// We only retrieve public and private channels, not IMs
// and MPIMs as those do not have a channel name.
@@ -166,7 +171,18 @@ func (b *Bslack) populateChannels(wait bool) {
for i := range channels {
newChannelsByID[channels[i].ID] = &channels[i]
newChannelsByName[channels[i].Name] = &channels[i]
+ // also find all the members in every channel
+ members, err := b.getUsersInConversation(channels[i].ID)
+ if err != nil {
+ if err = b.handleRateLimit(err); err != nil {
+ b.Log.Errorf("Could not retrieve channel members: %#v", err)
+ return
+ }
+ continue
+ }
+ newChannelMembers[channels[i].ID] = members
}
+
if nextCursor == "" {
break
}
@@ -178,6 +194,10 @@ func (b *Bslack) populateChannels(wait bool) {
b.channelsByID = newChannelsByID
b.channelsByName = newChannelsByName
+ b.channelMembersMutex.Lock()
+ defer b.channelMembersMutex.Unlock()
+ b.channelMembers = newChannelMembers
+
b.refreshMutex.Lock()
defer b.refreshMutex.Unlock()
b.earliestChannelRefresh = time.Now().Add(minimumRefreshInterval)
@@ -367,3 +387,29 @@ func (b *Bslack) handleRateLimit(err error) error {
time.Sleep(rateLimit.RetryAfter)
return nil
}
+
+// getUsersInConversation returns an array of userIDs that are members of channelID
+func (b *Bslack) getUsersInConversation(channelID string) ([]string, error) {
+ channelMembers := []string{}
+ for {
+ queryParams := &slack.GetUsersInConversationParameters{
+ ChannelID: channelID,
+ }
+
+ members, nextCursor, err := b.sc.GetUsersInConversation(queryParams)
+ if err != nil {
+ if err = b.handleRateLimit(err); err != nil {
+ return channelMembers, fmt.Errorf("Could not retrieve users in channels: %#v", err)
+ }
+ continue
+ }
+
+ channelMembers = append(channelMembers, members...)
+
+ if nextCursor == "" {
+ break
+ }
+ queryParams.Cursor = nextCursor
+ }
+ return channelMembers, nil
+}
diff --git a/bridge/slack/slack.go b/bridge/slack/slack.go
index b9430278..001b1268 100644
--- a/bridge/slack/slack.go
+++ b/bridge/slack/slack.go
@@ -37,6 +37,9 @@ type Bslack struct {
channelsByName map[string]*slack.Channel
channelsMutex sync.RWMutex
+ channelMembers map[string][]string
+ channelMembersMutex sync.RWMutex
+
refreshInProgress bool
earliestChannelRefresh time.Time
earliestUserRefresh time.Time
@@ -267,6 +270,11 @@ func (b *Bslack) sendWebhook(msg config.Message) error {
}
func (b *Bslack) sendRTM(msg config.Message) (string, error) {
+ // Handle channelmember messages.
+ if handled := b.handleGetChannelMembers(&msg); handled {
+ return "", nil
+ }
+
channelInfo, err := b.getChannel(msg.Channel)
if err != nil {
return "", fmt.Errorf("could not send message: %v", err)
diff --git a/gateway/handlers.go b/gateway/handlers.go
index 741c312e..5af13c14 100644
--- a/gateway/handlers.go
+++ b/gateway/handlers.go
@@ -30,6 +30,23 @@ func (r *Router) handleEventFailure(msg *config.Message) {
}
}
+// handleEventGetChannelMembers handles channel members
+func (r *Router) handleEventGetChannelMembers(msg *config.Message) {
+ if msg.Event != config.EventGetChannelMembers {
+ return
+ }
+ for _, gw := range r.Gateways {
+ for _, br := range gw.Bridges {
+ if msg.Account == br.Account {
+ cMembers := msg.Extra[config.EventGetChannelMembers][0].(config.ChannelMembers)
+ flog.Debugf("Syncing channelmembers from %s", msg.Account)
+ br.SetChannelMembers(&cMembers)
+ return
+ }
+ }
+ }
+}
+
// handleEventRejoinChannels handles rejoining of channels.
func (r *Router) handleEventRejoinChannels(msg *config.Message) {
if msg.Event != config.EventRejoinChannels {
diff --git a/gateway/router.go b/gateway/router.go
index a7181b96..d3c33b2a 100644
--- a/gateway/router.go
+++ b/gateway/router.go
@@ -2,6 +2,7 @@ package gateway
import (
"fmt"
+ "sync"
"time"
"github.com/42wim/matterbridge/bridge"
@@ -16,6 +17,7 @@ type Router struct {
Gateways map[string]*Gateway
Message chan config.Message
MattermostPlugin chan config.Message
+ sync.RWMutex
}
func NewRouter(cfg config.Config, bridgeMap map[string]bridge.Factory) (*Router, error) {
@@ -81,6 +83,7 @@ func (r *Router) Start() error {
}
}
go r.handleReceive()
+ go r.updateChannelMembers()
return nil
}
@@ -108,6 +111,7 @@ func (r *Router) getBridge(account string) *bridge.Bridge {
func (r *Router) handleReceive() {
for msg := range r.Message {
msg := msg // scopelint
+ r.handleEventGetChannelMembers(&msg)
r.handleEventFailure(&msg)
r.handleEventRejoinChannels(&msg)
for _, gw := range r.Gateways {
@@ -129,3 +133,21 @@ func (r *Router) handleReceive() {
}
}
}
+
+// updateChannelMembers sends every minute an GetChannelMembers event to all bridges.
+func (r *Router) updateChannelMembers() {
+ // TODO sleep a minute because slack can take a while
+ // fix this by having actually connectionDone events send to the router
+ time.Sleep(time.Minute)
+ for {
+ for _, gw := range r.Gateways {
+ for _, br := range gw.Bridges {
+ flog.Debugf("sending %s to %s", config.EventGetChannelMembers, br.Account)
+ if _, err := br.Send(config.Message{Event: config.EventGetChannelMembers}); err != nil {
+ flog.Errorf("updateChannelMembers: %s", err)
+ }
+ }
+ }
+ time.Sleep(time.Minute)
+ }
+}