summaryrefslogtreecommitdiffstats
path: root/vendor/github.com/matterbridge/matterclient/matterclient.go
diff options
context:
space:
mode:
Diffstat (limited to 'vendor/github.com/matterbridge/matterclient/matterclient.go')
-rw-r--r--vendor/github.com/matterbridge/matterclient/matterclient.go743
1 files changed, 743 insertions, 0 deletions
diff --git a/vendor/github.com/matterbridge/matterclient/matterclient.go b/vendor/github.com/matterbridge/matterclient/matterclient.go
new file mode 100644
index 00000000..a32219fe
--- /dev/null
+++ b/vendor/github.com/matterbridge/matterclient/matterclient.go
@@ -0,0 +1,743 @@
+package matterclient
+
+import (
+ "context"
+ "crypto/tls"
+ "errors"
+ "fmt"
+ "net/http"
+ "net/http/cookiejar"
+ "net/url"
+ "strconv"
+ "strings"
+ "sync"
+ "time"
+
+ "github.com/gorilla/websocket"
+ lru "github.com/hashicorp/golang-lru"
+ "github.com/jpillora/backoff"
+ prefixed "github.com/matterbridge/logrus-prefixed-formatter"
+ "github.com/mattermost/mattermost-server/v6/model"
+ "github.com/sirupsen/logrus"
+)
+
+type Credentials struct {
+ Login string
+ Team string
+ Pass string
+ Token string
+ CookieToken bool
+ Server string
+ NoTLS bool
+ SkipTLSVerify bool
+ SkipVersionCheck bool
+ MFAToken string
+}
+
+type Team struct {
+ Team *model.Team
+ ID string
+ Channels []*model.Channel
+ MoreChannels []*model.Channel
+ Users map[string]*model.User
+}
+
+type Message struct {
+ Raw *model.WebSocketEvent
+ Post *model.Post
+ Team string
+ Channel string
+ Username string
+ Text string
+ Type string
+ UserID string
+}
+
+type Client struct {
+ sync.RWMutex
+ *Credentials
+
+ Team *Team
+ OtherTeams []*Team
+ Client *model.Client4
+ User *model.User
+ Users map[string]*model.User
+ MessageChan chan *Message
+ WsClient *model.WebSocketClient
+ AntiIdle bool
+ AntiIdleChan string
+ AntiIdleIntvl int
+ WsQuit bool
+ WsConnected bool
+ OnWsConnect func()
+ reconnectBusy bool
+
+ logger *logrus.Entry
+ rootLogger *logrus.Logger
+ lruCache *lru.Cache
+ aliveChan chan bool
+ loginCancel context.CancelFunc
+ lastPong time.Time
+}
+
+func New(login string, pass string, team string, server string, mfatoken string) *Client {
+ rootLogger := logrus.New()
+ rootLogger.SetFormatter(&prefixed.TextFormatter{
+ PrefixPadding: 13,
+ DisableColors: true,
+ FullTimestamp: true,
+ })
+
+ cred := &Credentials{
+ Login: login,
+ Pass: pass,
+ Team: team,
+ Server: server,
+ MFAToken: mfatoken,
+ }
+
+ cache, _ := lru.New(500)
+
+ return &Client{
+ Credentials: cred,
+ MessageChan: make(chan *Message, 100),
+ Users: make(map[string]*model.User),
+ rootLogger: rootLogger,
+ lruCache: cache,
+ logger: rootLogger.WithFields(logrus.Fields{"prefix": "matterclient"}),
+ aliveChan: make(chan bool),
+ }
+}
+
+// Login tries to connect the client with the loging details with which it was initialized.
+func (m *Client) Login() error {
+ // check if this is a first connect or a reconnection
+ firstConnection := true
+ if m.WsConnected {
+ firstConnection = false
+ }
+
+ m.WsConnected = false
+ if m.WsQuit {
+ return nil
+ }
+
+ b := &backoff.Backoff{
+ Min: time.Second,
+ Max: 5 * time.Minute,
+ Jitter: true,
+ }
+
+ // do initialization setup
+ if err := m.initClient(b); err != nil {
+ return err
+ }
+
+ if err := m.doLogin(firstConnection, b); err != nil {
+ return err
+ }
+
+ if err := m.initUser(); err != nil {
+ return err
+ }
+
+ if m.Team == nil {
+ validTeamNames := make([]string, len(m.OtherTeams))
+ for i, t := range m.OtherTeams {
+ validTeamNames[i] = t.Team.Name
+ }
+
+ return fmt.Errorf("Team '%s' not found in %v", m.Credentials.Team, validTeamNames)
+ }
+
+ // connect websocket
+ m.wsConnect()
+
+ ctx, loginCancel := context.WithCancel(context.Background())
+ m.loginCancel = loginCancel
+
+ m.logger.Debug("starting wsreceiver")
+
+ go m.WsReceiver(ctx)
+
+ if m.OnWsConnect != nil {
+ m.logger.Debug("executing OnWsConnect()")
+
+ go m.OnWsConnect()
+ }
+
+ go m.checkConnection(ctx)
+
+ if m.AntiIdle {
+ if m.AntiIdleChan == "" {
+ // do anti idle on town-square, every installation should have this channel
+ m.AntiIdleChan = "town-square"
+ }
+
+ channels := m.GetChannels()
+ for _, channel := range channels {
+ if channel.Name == m.AntiIdleChan {
+ go m.antiIdle(ctx, channel.Id, m.AntiIdleIntvl)
+
+ continue
+ }
+ }
+ }
+
+ return nil
+}
+
+func (m *Client) Reconnect() {
+ if m.reconnectBusy {
+ return
+ }
+
+ m.reconnectBusy = true
+
+ m.logger.Info("reconnect: logout")
+ m.reconnectLogout()
+
+ for {
+ m.logger.Info("reconnect: login")
+
+ err := m.Login()
+ if err != nil {
+ m.logger.Errorf("reconnect: login failed: %s, retrying in 10 seconds", err)
+ time.Sleep(time.Second * 10)
+
+ continue
+ }
+
+ break
+ }
+
+ m.logger.Info("reconnect successful")
+
+ m.reconnectBusy = false
+}
+
+func (m *Client) initClient(b *backoff.Backoff) error {
+ uriScheme := "https://"
+ if m.NoTLS {
+ uriScheme = "http://"
+ }
+ // login to mattermost
+ m.Client = model.NewAPIv4Client(uriScheme + m.Credentials.Server)
+ m.Client.HTTPClient.Transport = &http.Transport{
+ TLSClientConfig: &tls.Config{
+ InsecureSkipVerify: m.SkipTLSVerify, //nolint:gosec
+ },
+ Proxy: http.ProxyFromEnvironment,
+ }
+ m.Client.HTTPClient.Timeout = time.Second * 10
+
+ // handle MMAUTHTOKEN and personal token
+ if err := m.handleLoginToken(); err != nil {
+ return err
+ }
+
+ // check if server alive, retry until
+ if err := m.serverAlive(b); err != nil {
+ return err
+ }
+
+ return nil
+}
+
+func (m *Client) handleLoginToken() error {
+ switch {
+ case strings.Contains(m.Credentials.Pass, model.SessionCookieToken):
+ token := strings.Split(m.Credentials.Pass, model.SessionCookieToken+"=")
+ if len(token) != 2 {
+ return errors.New("incorrect MMAUTHTOKEN. valid input is MMAUTHTOKEN=yourtoken")
+ }
+
+ m.Credentials.Token = token[1]
+ m.Credentials.CookieToken = true
+ case strings.Contains(m.Credentials.Pass, "token="):
+ token := strings.Split(m.Credentials.Pass, "token=")
+ if len(token) != 2 {
+ return errors.New("incorrect personal token. valid input is token=yourtoken")
+ }
+
+ m.Credentials.Token = token[1]
+ }
+
+ return nil
+}
+
+func (m *Client) serverAlive(b *backoff.Backoff) error {
+ defer b.Reset()
+
+ for {
+ d := b.Duration()
+ // bogus call to get the serverversion
+ resp, err := m.Client.Logout()
+ if err != nil {
+ return err
+ }
+
+ if resp.ServerVersion == "" {
+ m.logger.Debugf("Server not up yet, reconnecting in %s", d)
+ time.Sleep(d)
+ } else {
+ m.logger.Infof("Found version %s", resp.ServerVersion)
+
+ return nil
+ }
+ }
+}
+
+// initialize user and teams
+// nolint:funlen
+func (m *Client) initUser() error {
+ m.Lock()
+ defer m.Unlock()
+ // we only load all team data on initial login.
+ // all other updates are for channels from our (primary) team only.
+ teams, _, err := m.Client.GetTeamsForUser(m.User.Id, "")
+ if err != nil {
+ return err
+ }
+
+ for _, team := range teams {
+ idx := 0
+ max := 200
+ usermap := make(map[string]*model.User)
+
+ mmusers, _, err := m.Client.GetUsersInTeam(team.Id, idx, max, "")
+ if err != nil {
+ return err
+ }
+
+ for len(mmusers) > 0 {
+ for _, user := range mmusers {
+ usermap[user.Id] = user
+ }
+
+ mmusers, _, err = m.Client.GetUsersInTeam(team.Id, idx, max, "")
+ if err != nil {
+ return err
+ }
+
+ idx++
+
+ time.Sleep(time.Millisecond * 200)
+ }
+
+ m.logger.Infof("found %d users in team %s", len(usermap), team.Name)
+
+ t := &Team{
+ Team: team,
+ Users: usermap,
+ ID: team.Id,
+ }
+
+ mmchannels, _, err := m.Client.GetChannelsForTeamForUser(team.Id, m.User.Id, false, "")
+ if err != nil {
+ return err
+ }
+
+ t.Channels = mmchannels
+
+ mmchannels, _, err = m.Client.GetPublicChannelsForTeam(team.Id, 0, 5000, "")
+ if err != nil {
+ return err
+ }
+
+ t.MoreChannels = mmchannels
+ m.OtherTeams = append(m.OtherTeams, t)
+
+ if team.Name == m.Credentials.Team {
+ m.Team = t
+ m.logger.Debugf("initUser(): found our team %s (id: %s)", team.Name, team.Id)
+ }
+ // add all users
+ for k, v := range t.Users {
+ m.Users[k] = v
+ }
+ }
+
+ return nil
+}
+
+func (m *Client) doLogin(firstConnection bool, b *backoff.Backoff) error {
+ var (
+ logmsg = "trying login"
+ err error
+ user *model.User
+ )
+
+ for {
+ m.logger.Debugf("%s %s %s %s", logmsg, m.Credentials.Team, m.Credentials.Login, m.Credentials.Server)
+
+ switch {
+ case m.Credentials.Token != "":
+ user, _, err = m.doLoginToken()
+ if err != nil {
+ return err
+ }
+ case m.Credentials.MFAToken != "":
+ user, _, err = m.Client.LoginWithMFA(m.Credentials.Login, m.Credentials.Pass, m.Credentials.MFAToken)
+ default:
+ user, _, err = m.Client.Login(m.Credentials.Login, m.Credentials.Pass)
+ }
+
+ if err != nil {
+ d := b.Duration()
+
+ m.logger.Debug(err)
+
+ if firstConnection {
+ return err
+ }
+
+ m.logger.Debugf("LOGIN: %s, reconnecting in %s", err, d)
+
+ time.Sleep(d)
+
+ logmsg = "retrying login"
+
+ continue
+ }
+
+ m.User = user
+
+ break
+ }
+ // reset timer
+ b.Reset()
+
+ return nil
+}
+
+func (m *Client) doLoginToken() (*model.User, *model.Response, error) {
+ var (
+ resp *model.Response
+ logmsg = "trying login"
+ user *model.User
+ err error
+ )
+
+ m.Client.AuthType = model.HeaderBearer
+ m.Client.AuthToken = m.Credentials.Token
+
+ if m.Credentials.CookieToken {
+ m.logger.Debugf(logmsg + " with cookie (MMAUTH) token")
+ m.Client.HTTPClient.Jar = m.createCookieJar(m.Credentials.Token)
+ } else {
+ m.logger.Debugf(logmsg + " with personal token")
+ }
+
+ user, resp, err = m.Client.GetMe("")
+ if err != nil {
+ return user, resp, err
+ }
+
+ if user == nil {
+ m.logger.Errorf("LOGIN TOKEN: %s is invalid", m.Credentials.Pass)
+
+ return user, resp, errors.New("invalid token")
+ }
+
+ return user, resp, nil
+}
+
+func (m *Client) createCookieJar(token string) *cookiejar.Jar {
+ var cookies []*http.Cookie
+
+ jar, _ := cookiejar.New(nil)
+
+ firstCookie := &http.Cookie{
+ Name: "MMAUTHTOKEN",
+ Value: token,
+ Path: "/",
+ Domain: m.Credentials.Server,
+ }
+
+ cookies = append(cookies, firstCookie)
+ cookieURL, _ := url.Parse("https://" + m.Credentials.Server)
+
+ jar.SetCookies(cookieURL, cookies)
+
+ return jar
+}
+
+func (m *Client) wsConnect() {
+ b := &backoff.Backoff{
+ Min: time.Second,
+ Max: 5 * time.Minute,
+ Jitter: true,
+ }
+
+ m.WsConnected = false
+ wsScheme := "wss://"
+
+ if m.NoTLS {
+ wsScheme = "ws://"
+ }
+
+ // setup websocket connection
+ wsurl := wsScheme + m.Credentials.Server
+ // + model.API_URL_SUFFIX_V4
+ // + "/websocket"
+ header := http.Header{}
+ header.Set(model.HeaderAuth, "BEARER "+m.Client.AuthToken)
+
+ m.logger.Debugf("WsClient: making connection: %s", wsurl)
+
+ for {
+ wsDialer := &websocket.Dialer{
+ TLSClientConfig: &tls.Config{
+ InsecureSkipVerify: m.SkipTLSVerify, //nolint:gosec
+ },
+ Proxy: http.ProxyFromEnvironment,
+ }
+
+ var err error
+
+ m.WsClient, err = model.NewWebSocketClientWithDialer(wsDialer, wsurl, m.Client.AuthToken)
+ if err != nil {
+ d := b.Duration()
+
+ m.logger.Debugf("WSS: %s, reconnecting in %s", err, d)
+
+ time.Sleep(d)
+
+ continue
+ }
+
+ break
+ }
+
+ m.WsClient.Listen()
+
+ m.lastPong = time.Now()
+
+ m.logger.Debug("WsClient: connected")
+
+ // only start to parse WS messages when login is completely done
+ m.WsConnected = true
+}
+
+func (m *Client) doCheckAlive() error {
+ if _, _, err := m.Client.GetMe(""); err != nil {
+ return err
+ }
+
+ if m.reconnectBusy {
+ return nil
+ }
+
+ if m.WsClient.ListenError == nil {
+ m.WsClient.SendMessage("ping", nil)
+ } else {
+ m.logger.Errorf("got a listen error: %#v", m.WsClient.ListenError)
+
+ return m.WsClient.ListenError
+ }
+
+ if time.Since(m.lastPong) > 90*time.Second {
+ return errors.New("no pong received in 90 seconds")
+ }
+
+ return nil
+}
+
+func (m *Client) checkAlive(ctx context.Context) {
+ ticker := time.NewTicker(time.Second * 45)
+
+ for {
+ select {
+ case <-ctx.Done():
+ m.logger.Debugf("checkAlive: ctx.Done() triggered")
+
+ return
+ case <-ticker.C:
+ // check if session still is valid
+ err := m.doCheckAlive()
+ if err != nil {
+ m.logger.Errorf("connection not alive: %s", err)
+ m.aliveChan <- false
+ }
+
+ m.aliveChan <- true
+ }
+ }
+}
+
+func (m *Client) checkConnection(ctx context.Context) {
+ go m.checkAlive(ctx)
+
+ for {
+ select {
+ case alive := <-m.aliveChan:
+ if !alive {
+ time.Sleep(time.Second * 10)
+
+ if m.doCheckAlive() != nil {
+ m.Reconnect()
+ }
+ }
+ case <-ctx.Done():
+ m.logger.Debug("checkConnection: ctx.Done() triggered, exiting")
+
+ return
+ }
+ }
+}
+
+// WsReceiver implements the core loop that manages the connection to the chat server. In
+// case of a disconnect it will try to reconnect. A call to this method is blocking until
+// the 'WsQuite' field of the MMClient object is set to 'true'.
+func (m *Client) WsReceiver(ctx context.Context) {
+ m.logger.Debug("starting WsReceiver")
+
+ ticker := time.NewTicker(time.Second * 10)
+
+ for {
+ select {
+ case event := <-m.WsClient.EventChannel:
+ if event == nil {
+ return
+ }
+
+ if !event.IsValid() {
+ continue
+ }
+
+ m.logger.Debugf("WsReceiver event: %#v", event)
+
+ msg := &Message{
+ Raw: event,
+ Team: m.Credentials.Team,
+ }
+
+ m.parseMessage(msg)
+
+ m.MessageChan <- msg
+ case response := <-m.WsClient.ResponseChannel:
+ if response == nil || !response.IsValid() {
+ continue
+ }
+
+ m.logger.Debugf("WsReceiver response: %#v", response)
+
+ if text, ok := response.Data["text"].(string); ok {
+ if text == "pong" {
+ m.lastPong = time.Now()
+ }
+ }
+
+ m.parseResponse(response)
+ case <-m.WsClient.PingTimeoutChannel:
+ m.logger.Error("got a ping timeout")
+ m.Reconnect()
+
+ return
+ case <-ticker.C:
+ if m.WsClient.ListenError != nil {
+ m.logger.Errorf("%#v", m.WsClient.ListenError)
+ m.Reconnect()
+
+ return
+ }
+ case <-ctx.Done():
+ m.logger.Debugf("wsReceiver: ctx.Done() triggered")
+
+ return
+ }
+ }
+}
+
+// Logout disconnects the client from the chat server.
+func (m *Client) reconnectLogout() error {
+ err := m.Logout()
+ m.WsQuit = false
+
+ if err != nil {
+ return err
+ }
+
+ return nil
+}
+
+// Logout disconnects the client from the chat server.
+func (m *Client) Logout() error {
+ m.logger.Debug("logout running loginCancel to exit goroutines")
+ m.loginCancel()
+
+ m.logger.Debugf("logout as %s (team: %s) on %s", m.Credentials.Login, m.Credentials.Team, m.Credentials.Server)
+ m.WsQuit = true
+ // close the websocket
+ m.logger.Debug("closing websocket")
+ m.WsClient.Close()
+
+ if strings.Contains(m.Credentials.Pass, model.SessionCookieToken) {
+ m.logger.Debug("Not invalidating session in logout, credential is a token")
+
+ return nil
+ }
+
+ // actually log out
+ m.logger.Debug("running m.Client.Logout")
+
+ if _, err := m.Client.Logout(); err != nil {
+ return err
+ }
+
+ m.logger.Debug("exiting Logout()")
+
+ return nil
+}
+
+// SetLogLevel tries to parse the specified level and if successful sets
+// the log level accordingly. Accepted levels are: 'debug', 'info', 'warn',
+// 'error', 'fatal' and 'panic'.
+func (m *Client) SetLogLevel(level string) {
+ l, err := logrus.ParseLevel(level)
+ if err != nil {
+ m.logger.Warnf("Failed to parse specified log-level '%s': %#v", level, err)
+ } else {
+ m.rootLogger.SetLevel(l)
+ }
+}
+
+func (m *Client) HandleRatelimit(name string, resp *model.Response) error {
+ if resp.StatusCode != 429 {
+ return fmt.Errorf("StatusCode error: %d", resp.StatusCode)
+ }
+
+ waitTime, err := strconv.Atoi(resp.Header.Get("X-RateLimit-Reset"))
+ if err != nil {
+ return err
+ }
+
+ m.logger.Warnf("Ratelimited on %s for %d", name, waitTime)
+
+ time.Sleep(time.Duration(waitTime) * time.Second)
+
+ return nil
+}
+
+func (m *Client) antiIdle(ctx context.Context, channelID string, interval int) {
+ if interval == 0 {
+ interval = 60
+ }
+
+ m.logger.Debugf("starting antiIdle for %s every %d secs", channelID, interval)
+ ticker := time.NewTicker(time.Second * time.Duration(interval))
+
+ for {
+ select {
+ case <-ctx.Done():
+ m.logger.Debugf("antiIlde: ctx.Done() triggered, exiting for %s", channelID)
+
+ return
+ case <-ticker.C:
+ m.logger.Tracef("antiIdle %s", channelID)
+
+ m.UpdateLastViewed(channelID)
+ }
+ }
+}