diff options
Diffstat (limited to 'vendor/github.com/matterbridge/matterclient/matterclient.go')
-rw-r--r-- | vendor/github.com/matterbridge/matterclient/matterclient.go | 743 |
1 files changed, 743 insertions, 0 deletions
diff --git a/vendor/github.com/matterbridge/matterclient/matterclient.go b/vendor/github.com/matterbridge/matterclient/matterclient.go new file mode 100644 index 00000000..a32219fe --- /dev/null +++ b/vendor/github.com/matterbridge/matterclient/matterclient.go @@ -0,0 +1,743 @@ +package matterclient + +import ( + "context" + "crypto/tls" + "errors" + "fmt" + "net/http" + "net/http/cookiejar" + "net/url" + "strconv" + "strings" + "sync" + "time" + + "github.com/gorilla/websocket" + lru "github.com/hashicorp/golang-lru" + "github.com/jpillora/backoff" + prefixed "github.com/matterbridge/logrus-prefixed-formatter" + "github.com/mattermost/mattermost-server/v6/model" + "github.com/sirupsen/logrus" +) + +type Credentials struct { + Login string + Team string + Pass string + Token string + CookieToken bool + Server string + NoTLS bool + SkipTLSVerify bool + SkipVersionCheck bool + MFAToken string +} + +type Team struct { + Team *model.Team + ID string + Channels []*model.Channel + MoreChannels []*model.Channel + Users map[string]*model.User +} + +type Message struct { + Raw *model.WebSocketEvent + Post *model.Post + Team string + Channel string + Username string + Text string + Type string + UserID string +} + +type Client struct { + sync.RWMutex + *Credentials + + Team *Team + OtherTeams []*Team + Client *model.Client4 + User *model.User + Users map[string]*model.User + MessageChan chan *Message + WsClient *model.WebSocketClient + AntiIdle bool + AntiIdleChan string + AntiIdleIntvl int + WsQuit bool + WsConnected bool + OnWsConnect func() + reconnectBusy bool + + logger *logrus.Entry + rootLogger *logrus.Logger + lruCache *lru.Cache + aliveChan chan bool + loginCancel context.CancelFunc + lastPong time.Time +} + +func New(login string, pass string, team string, server string, mfatoken string) *Client { + rootLogger := logrus.New() + rootLogger.SetFormatter(&prefixed.TextFormatter{ + PrefixPadding: 13, + DisableColors: true, + FullTimestamp: true, + }) + + cred := &Credentials{ + Login: login, + Pass: pass, + Team: team, + Server: server, + MFAToken: mfatoken, + } + + cache, _ := lru.New(500) + + return &Client{ + Credentials: cred, + MessageChan: make(chan *Message, 100), + Users: make(map[string]*model.User), + rootLogger: rootLogger, + lruCache: cache, + logger: rootLogger.WithFields(logrus.Fields{"prefix": "matterclient"}), + aliveChan: make(chan bool), + } +} + +// Login tries to connect the client with the loging details with which it was initialized. +func (m *Client) Login() error { + // check if this is a first connect or a reconnection + firstConnection := true + if m.WsConnected { + firstConnection = false + } + + m.WsConnected = false + if m.WsQuit { + return nil + } + + b := &backoff.Backoff{ + Min: time.Second, + Max: 5 * time.Minute, + Jitter: true, + } + + // do initialization setup + if err := m.initClient(b); err != nil { + return err + } + + if err := m.doLogin(firstConnection, b); err != nil { + return err + } + + if err := m.initUser(); err != nil { + return err + } + + if m.Team == nil { + validTeamNames := make([]string, len(m.OtherTeams)) + for i, t := range m.OtherTeams { + validTeamNames[i] = t.Team.Name + } + + return fmt.Errorf("Team '%s' not found in %v", m.Credentials.Team, validTeamNames) + } + + // connect websocket + m.wsConnect() + + ctx, loginCancel := context.WithCancel(context.Background()) + m.loginCancel = loginCancel + + m.logger.Debug("starting wsreceiver") + + go m.WsReceiver(ctx) + + if m.OnWsConnect != nil { + m.logger.Debug("executing OnWsConnect()") + + go m.OnWsConnect() + } + + go m.checkConnection(ctx) + + if m.AntiIdle { + if m.AntiIdleChan == "" { + // do anti idle on town-square, every installation should have this channel + m.AntiIdleChan = "town-square" + } + + channels := m.GetChannels() + for _, channel := range channels { + if channel.Name == m.AntiIdleChan { + go m.antiIdle(ctx, channel.Id, m.AntiIdleIntvl) + + continue + } + } + } + + return nil +} + +func (m *Client) Reconnect() { + if m.reconnectBusy { + return + } + + m.reconnectBusy = true + + m.logger.Info("reconnect: logout") + m.reconnectLogout() + + for { + m.logger.Info("reconnect: login") + + err := m.Login() + if err != nil { + m.logger.Errorf("reconnect: login failed: %s, retrying in 10 seconds", err) + time.Sleep(time.Second * 10) + + continue + } + + break + } + + m.logger.Info("reconnect successful") + + m.reconnectBusy = false +} + +func (m *Client) initClient(b *backoff.Backoff) error { + uriScheme := "https://" + if m.NoTLS { + uriScheme = "http://" + } + // login to mattermost + m.Client = model.NewAPIv4Client(uriScheme + m.Credentials.Server) + m.Client.HTTPClient.Transport = &http.Transport{ + TLSClientConfig: &tls.Config{ + InsecureSkipVerify: m.SkipTLSVerify, //nolint:gosec + }, + Proxy: http.ProxyFromEnvironment, + } + m.Client.HTTPClient.Timeout = time.Second * 10 + + // handle MMAUTHTOKEN and personal token + if err := m.handleLoginToken(); err != nil { + return err + } + + // check if server alive, retry until + if err := m.serverAlive(b); err != nil { + return err + } + + return nil +} + +func (m *Client) handleLoginToken() error { + switch { + case strings.Contains(m.Credentials.Pass, model.SessionCookieToken): + token := strings.Split(m.Credentials.Pass, model.SessionCookieToken+"=") + if len(token) != 2 { + return errors.New("incorrect MMAUTHTOKEN. valid input is MMAUTHTOKEN=yourtoken") + } + + m.Credentials.Token = token[1] + m.Credentials.CookieToken = true + case strings.Contains(m.Credentials.Pass, "token="): + token := strings.Split(m.Credentials.Pass, "token=") + if len(token) != 2 { + return errors.New("incorrect personal token. valid input is token=yourtoken") + } + + m.Credentials.Token = token[1] + } + + return nil +} + +func (m *Client) serverAlive(b *backoff.Backoff) error { + defer b.Reset() + + for { + d := b.Duration() + // bogus call to get the serverversion + resp, err := m.Client.Logout() + if err != nil { + return err + } + + if resp.ServerVersion == "" { + m.logger.Debugf("Server not up yet, reconnecting in %s", d) + time.Sleep(d) + } else { + m.logger.Infof("Found version %s", resp.ServerVersion) + + return nil + } + } +} + +// initialize user and teams +// nolint:funlen +func (m *Client) initUser() error { + m.Lock() + defer m.Unlock() + // we only load all team data on initial login. + // all other updates are for channels from our (primary) team only. + teams, _, err := m.Client.GetTeamsForUser(m.User.Id, "") + if err != nil { + return err + } + + for _, team := range teams { + idx := 0 + max := 200 + usermap := make(map[string]*model.User) + + mmusers, _, err := m.Client.GetUsersInTeam(team.Id, idx, max, "") + if err != nil { + return err + } + + for len(mmusers) > 0 { + for _, user := range mmusers { + usermap[user.Id] = user + } + + mmusers, _, err = m.Client.GetUsersInTeam(team.Id, idx, max, "") + if err != nil { + return err + } + + idx++ + + time.Sleep(time.Millisecond * 200) + } + + m.logger.Infof("found %d users in team %s", len(usermap), team.Name) + + t := &Team{ + Team: team, + Users: usermap, + ID: team.Id, + } + + mmchannels, _, err := m.Client.GetChannelsForTeamForUser(team.Id, m.User.Id, false, "") + if err != nil { + return err + } + + t.Channels = mmchannels + + mmchannels, _, err = m.Client.GetPublicChannelsForTeam(team.Id, 0, 5000, "") + if err != nil { + return err + } + + t.MoreChannels = mmchannels + m.OtherTeams = append(m.OtherTeams, t) + + if team.Name == m.Credentials.Team { + m.Team = t + m.logger.Debugf("initUser(): found our team %s (id: %s)", team.Name, team.Id) + } + // add all users + for k, v := range t.Users { + m.Users[k] = v + } + } + + return nil +} + +func (m *Client) doLogin(firstConnection bool, b *backoff.Backoff) error { + var ( + logmsg = "trying login" + err error + user *model.User + ) + + for { + m.logger.Debugf("%s %s %s %s", logmsg, m.Credentials.Team, m.Credentials.Login, m.Credentials.Server) + + switch { + case m.Credentials.Token != "": + user, _, err = m.doLoginToken() + if err != nil { + return err + } + case m.Credentials.MFAToken != "": + user, _, err = m.Client.LoginWithMFA(m.Credentials.Login, m.Credentials.Pass, m.Credentials.MFAToken) + default: + user, _, err = m.Client.Login(m.Credentials.Login, m.Credentials.Pass) + } + + if err != nil { + d := b.Duration() + + m.logger.Debug(err) + + if firstConnection { + return err + } + + m.logger.Debugf("LOGIN: %s, reconnecting in %s", err, d) + + time.Sleep(d) + + logmsg = "retrying login" + + continue + } + + m.User = user + + break + } + // reset timer + b.Reset() + + return nil +} + +func (m *Client) doLoginToken() (*model.User, *model.Response, error) { + var ( + resp *model.Response + logmsg = "trying login" + user *model.User + err error + ) + + m.Client.AuthType = model.HeaderBearer + m.Client.AuthToken = m.Credentials.Token + + if m.Credentials.CookieToken { + m.logger.Debugf(logmsg + " with cookie (MMAUTH) token") + m.Client.HTTPClient.Jar = m.createCookieJar(m.Credentials.Token) + } else { + m.logger.Debugf(logmsg + " with personal token") + } + + user, resp, err = m.Client.GetMe("") + if err != nil { + return user, resp, err + } + + if user == nil { + m.logger.Errorf("LOGIN TOKEN: %s is invalid", m.Credentials.Pass) + + return user, resp, errors.New("invalid token") + } + + return user, resp, nil +} + +func (m *Client) createCookieJar(token string) *cookiejar.Jar { + var cookies []*http.Cookie + + jar, _ := cookiejar.New(nil) + + firstCookie := &http.Cookie{ + Name: "MMAUTHTOKEN", + Value: token, + Path: "/", + Domain: m.Credentials.Server, + } + + cookies = append(cookies, firstCookie) + cookieURL, _ := url.Parse("https://" + m.Credentials.Server) + + jar.SetCookies(cookieURL, cookies) + + return jar +} + +func (m *Client) wsConnect() { + b := &backoff.Backoff{ + Min: time.Second, + Max: 5 * time.Minute, + Jitter: true, + } + + m.WsConnected = false + wsScheme := "wss://" + + if m.NoTLS { + wsScheme = "ws://" + } + + // setup websocket connection + wsurl := wsScheme + m.Credentials.Server + // + model.API_URL_SUFFIX_V4 + // + "/websocket" + header := http.Header{} + header.Set(model.HeaderAuth, "BEARER "+m.Client.AuthToken) + + m.logger.Debugf("WsClient: making connection: %s", wsurl) + + for { + wsDialer := &websocket.Dialer{ + TLSClientConfig: &tls.Config{ + InsecureSkipVerify: m.SkipTLSVerify, //nolint:gosec + }, + Proxy: http.ProxyFromEnvironment, + } + + var err error + + m.WsClient, err = model.NewWebSocketClientWithDialer(wsDialer, wsurl, m.Client.AuthToken) + if err != nil { + d := b.Duration() + + m.logger.Debugf("WSS: %s, reconnecting in %s", err, d) + + time.Sleep(d) + + continue + } + + break + } + + m.WsClient.Listen() + + m.lastPong = time.Now() + + m.logger.Debug("WsClient: connected") + + // only start to parse WS messages when login is completely done + m.WsConnected = true +} + +func (m *Client) doCheckAlive() error { + if _, _, err := m.Client.GetMe(""); err != nil { + return err + } + + if m.reconnectBusy { + return nil + } + + if m.WsClient.ListenError == nil { + m.WsClient.SendMessage("ping", nil) + } else { + m.logger.Errorf("got a listen error: %#v", m.WsClient.ListenError) + + return m.WsClient.ListenError + } + + if time.Since(m.lastPong) > 90*time.Second { + return errors.New("no pong received in 90 seconds") + } + + return nil +} + +func (m *Client) checkAlive(ctx context.Context) { + ticker := time.NewTicker(time.Second * 45) + + for { + select { + case <-ctx.Done(): + m.logger.Debugf("checkAlive: ctx.Done() triggered") + + return + case <-ticker.C: + // check if session still is valid + err := m.doCheckAlive() + if err != nil { + m.logger.Errorf("connection not alive: %s", err) + m.aliveChan <- false + } + + m.aliveChan <- true + } + } +} + +func (m *Client) checkConnection(ctx context.Context) { + go m.checkAlive(ctx) + + for { + select { + case alive := <-m.aliveChan: + if !alive { + time.Sleep(time.Second * 10) + + if m.doCheckAlive() != nil { + m.Reconnect() + } + } + case <-ctx.Done(): + m.logger.Debug("checkConnection: ctx.Done() triggered, exiting") + + return + } + } +} + +// WsReceiver implements the core loop that manages the connection to the chat server. In +// case of a disconnect it will try to reconnect. A call to this method is blocking until +// the 'WsQuite' field of the MMClient object is set to 'true'. +func (m *Client) WsReceiver(ctx context.Context) { + m.logger.Debug("starting WsReceiver") + + ticker := time.NewTicker(time.Second * 10) + + for { + select { + case event := <-m.WsClient.EventChannel: + if event == nil { + return + } + + if !event.IsValid() { + continue + } + + m.logger.Debugf("WsReceiver event: %#v", event) + + msg := &Message{ + Raw: event, + Team: m.Credentials.Team, + } + + m.parseMessage(msg) + + m.MessageChan <- msg + case response := <-m.WsClient.ResponseChannel: + if response == nil || !response.IsValid() { + continue + } + + m.logger.Debugf("WsReceiver response: %#v", response) + + if text, ok := response.Data["text"].(string); ok { + if text == "pong" { + m.lastPong = time.Now() + } + } + + m.parseResponse(response) + case <-m.WsClient.PingTimeoutChannel: + m.logger.Error("got a ping timeout") + m.Reconnect() + + return + case <-ticker.C: + if m.WsClient.ListenError != nil { + m.logger.Errorf("%#v", m.WsClient.ListenError) + m.Reconnect() + + return + } + case <-ctx.Done(): + m.logger.Debugf("wsReceiver: ctx.Done() triggered") + + return + } + } +} + +// Logout disconnects the client from the chat server. +func (m *Client) reconnectLogout() error { + err := m.Logout() + m.WsQuit = false + + if err != nil { + return err + } + + return nil +} + +// Logout disconnects the client from the chat server. +func (m *Client) Logout() error { + m.logger.Debug("logout running loginCancel to exit goroutines") + m.loginCancel() + + m.logger.Debugf("logout as %s (team: %s) on %s", m.Credentials.Login, m.Credentials.Team, m.Credentials.Server) + m.WsQuit = true + // close the websocket + m.logger.Debug("closing websocket") + m.WsClient.Close() + + if strings.Contains(m.Credentials.Pass, model.SessionCookieToken) { + m.logger.Debug("Not invalidating session in logout, credential is a token") + + return nil + } + + // actually log out + m.logger.Debug("running m.Client.Logout") + + if _, err := m.Client.Logout(); err != nil { + return err + } + + m.logger.Debug("exiting Logout()") + + return nil +} + +// SetLogLevel tries to parse the specified level and if successful sets +// the log level accordingly. Accepted levels are: 'debug', 'info', 'warn', +// 'error', 'fatal' and 'panic'. +func (m *Client) SetLogLevel(level string) { + l, err := logrus.ParseLevel(level) + if err != nil { + m.logger.Warnf("Failed to parse specified log-level '%s': %#v", level, err) + } else { + m.rootLogger.SetLevel(l) + } +} + +func (m *Client) HandleRatelimit(name string, resp *model.Response) error { + if resp.StatusCode != 429 { + return fmt.Errorf("StatusCode error: %d", resp.StatusCode) + } + + waitTime, err := strconv.Atoi(resp.Header.Get("X-RateLimit-Reset")) + if err != nil { + return err + } + + m.logger.Warnf("Ratelimited on %s for %d", name, waitTime) + + time.Sleep(time.Duration(waitTime) * time.Second) + + return nil +} + +func (m *Client) antiIdle(ctx context.Context, channelID string, interval int) { + if interval == 0 { + interval = 60 + } + + m.logger.Debugf("starting antiIdle for %s every %d secs", channelID, interval) + ticker := time.NewTicker(time.Second * time.Duration(interval)) + + for { + select { + case <-ctx.Done(): + m.logger.Debugf("antiIlde: ctx.Done() triggered, exiting for %s", channelID) + + return + case <-ticker.C: + m.logger.Tracef("antiIdle %s", channelID) + + m.UpdateLastViewed(channelID) + } + } +} |