diff options
author | Wim <wim@42.be> | 2020-12-06 17:18:25 +0100 |
---|---|---|
committer | GitHub <noreply@github.com> | 2020-12-06 17:18:25 +0100 |
commit | 2d3c26a4b218279d169da106032a70d3d80c4a53 (patch) | |
tree | a43acf615af032f486808e6d3a2374af09c02e4e /bridge/matrix | |
parent | 8eba2d3e50353b9cfe240f8c5b167a6c26bbd3c4 (diff) | |
download | matterbridge-msglm-2d3c26a4b218279d169da106032a70d3d80c4a53.tar.gz matterbridge-msglm-2d3c26a4b218279d169da106032a70d3d80c4a53.tar.bz2 matterbridge-msglm-2d3c26a4b218279d169da106032a70d3d80c4a53.zip |
Implement ratelimiting (matrix). Fixes #1238 (#1326)
Diffstat (limited to 'bridge/matrix')
-rw-r--r-- | bridge/matrix/helpers.go | 32 | ||||
-rw-r--r-- | bridge/matrix/matrix.go | 188 |
2 files changed, 169 insertions, 51 deletions
diff --git a/bridge/matrix/helpers.go b/bridge/matrix/helpers.go index 8256dc7d..03e448da 100644 --- a/bridge/matrix/helpers.go +++ b/bridge/matrix/helpers.go @@ -181,3 +181,35 @@ func (b *Bmatrix) getAvatarURL(sender string) string { return url } + +// handleRatelimit handles the ratelimit errors and return if we're ratelimited and the amount of time to sleep +func (b *Bmatrix) handleRatelimit(err error) (time.Duration, bool) { + httpErr := handleError(err) + if httpErr.Errcode != "M_LIMIT_EXCEEDED" { + return 0, false + } + + b.Log.Debugf("ratelimited: %s", httpErr.Err) + b.Log.Infof("getting ratelimited by matrix, sleeping approx %d seconds before retrying", httpErr.RetryAfterMs/1000) + + return time.Duration(httpErr.RetryAfterMs) * time.Millisecond, true +} + +// retry function will check if we're ratelimited and retries again when backoff time expired +// returns original error if not 429 ratelimit +func (b *Bmatrix) retry(f func() error) error { + b.rateMutex.Lock() + defer b.rateMutex.Unlock() + + for { + if err := f(); err != nil { + if backoff, ok := b.handleRatelimit(err); ok { + time.Sleep(backoff) + } else { + return err + } + } else { + return nil + } + } +} diff --git a/bridge/matrix/matrix.go b/bridge/matrix/matrix.go index 725f49a6..acb20261 100644 --- a/bridge/matrix/matrix.go +++ b/bridge/matrix/matrix.go @@ -30,6 +30,7 @@ type Bmatrix struct { UserID string NicknameMap map[string]NicknameCacheEntry RoomMap map[string]string + rateMutex sync.RWMutex sync.RWMutex *bridge.Config } @@ -99,25 +100,18 @@ func (b *Bmatrix) Disconnect() error { } func (b *Bmatrix) JoinChannel(channel config.ChannelInfo) error { -retry: - resp, err := b.mc.JoinRoom(channel.Name, "", nil) - if err != nil { - httpErr := handleError(err) - if httpErr.Errcode == "M_LIMIT_EXCEEDED" { - b.Log.Infof("getting ratelimited by matrix, sleeping approx %d seconds before joining %s", httpErr.RetryAfterMs/1000, channel.Name) - time.Sleep((time.Duration(httpErr.RetryAfterMs) * time.Millisecond)) - - goto retry + return b.retry(func() error { + resp, err := b.mc.JoinRoom(channel.Name, "", nil) + if err != nil { + return err } - return err - } + b.Lock() + b.RoomMap[resp.RoomID] = channel.Name + b.Unlock() - b.Lock() - b.RoomMap[resp.RoomID] = channel.Name - b.Unlock() - - return nil + return nil + }) } func (b *Bmatrix) Send(msg config.Message) (string, error) { @@ -135,11 +129,21 @@ func (b *Bmatrix) Send(msg config.Message) (string, error) { Body: username.plain + msg.Text, FormattedBody: username.formatted + msg.Text, } - resp, err := b.mc.SendMessageEvent(channel, "m.room.message", m) - if err != nil { - return "", err - } - return resp.EventID, err + + msgID := "" + + err := b.retry(func() error { + resp, err := b.mc.SendMessageEvent(channel, "m.room.message", m) + if err != nil { + return err + } + + msgID = resp.EventID + + return err + }) + + return msgID, err } // Delete message @@ -147,17 +151,34 @@ func (b *Bmatrix) Send(msg config.Message) (string, error) { if msg.ID == "" { return "", nil } - resp, err := b.mc.RedactEvent(channel, msg.ID, &matrix.ReqRedact{}) - if err != nil { - return "", err - } - return resp.EventID, err + + msgID := "" + + err := b.retry(func() error { + resp, err := b.mc.RedactEvent(channel, msg.ID, &matrix.ReqRedact{}) + if err != nil { + return err + } + + msgID = resp.EventID + + return err + }) + + return msgID, err } // Upload a file if it exists if msg.Extra != nil { for _, rmsg := range helper.HandleExtra(&msg, b.General) { - if _, err := b.mc.SendText(channel, rmsg.Username+rmsg.Text); err != nil { + rmsg := rmsg + + err := b.retry(func() error { + _, err := b.mc.SendText(channel, rmsg.Username+rmsg.Text) + + return err + }) + if err != nil { b.Log.Errorf("sendText failed: %s", err) } } @@ -187,7 +208,12 @@ func (b *Bmatrix) Send(msg config.Message) (string, error) { EventID: msg.ID, Type: "m.replace", } - _, err := b.mc.SendMessageEvent(channel, "m.room.message", rmsg) + + err := b.retry(func() error { + _, err := b.mc.SendMessageEvent(channel, "m.room.message", rmsg) + + return err + }) if err != nil { return "", err } @@ -202,26 +228,58 @@ func (b *Bmatrix) Send(msg config.Message) (string, error) { Body: username.plain + msg.Text, FormattedBody: username.formatted + msg.Text, } - resp, err := b.mc.SendMessageEvent(channel, "m.room.message", m) + + var ( + resp *matrix.RespSendEvent + err error + ) + + err = b.retry(func() error { + resp, err = b.mc.SendMessageEvent(channel, "m.room.message", m) + + return err + }) if err != nil { return "", err } + return resp.EventID, err } if b.GetBool("HTMLDisable") { - resp, err := b.mc.SendText(channel, username.plain+msg.Text) + var ( + resp *matrix.RespSendEvent + err error + ) + + err = b.retry(func() error { + resp, err = b.mc.SendText(channel, username.plain+msg.Text) + + return err + }) if err != nil { return "", err } + return resp.EventID, err } // Post normal message with HTML support (eg riot.im) - resp, err := b.mc.SendFormattedText(channel, username.plain+msg.Text, username.formatted+helper.ParseMarkdown(msg.Text)) + var ( + resp *matrix.RespSendEvent + err error + ) + + err = b.retry(func() error { + resp, err = b.mc.SendFormattedText(channel, username.plain+msg.Text, + username.formatted+helper.ParseMarkdown(msg.Text)) + + return err + }) if err != nil { return "", err } + return resp.EventID, err } @@ -420,13 +478,25 @@ func (b *Bmatrix) handleUploadFile(msg *config.Message, channel string, fi *conf sp := strings.Split(fi.Name, ".") mtype := mime.TypeByExtension("." + sp[len(sp)-1]) // image and video uploads send no username, we have to do this ourself here #715 - _, err := b.mc.SendFormattedText(channel, username.plain+fi.Comment, username.formatted+fi.Comment) + err := b.retry(func() error { + _, err := b.mc.SendFormattedText(channel, username.plain+fi.Comment, username.formatted+fi.Comment) + + return err + }) if err != nil { b.Log.Errorf("file comment failed: %#v", err) } b.Log.Debugf("uploading file: %s %s", fi.Name, mtype) - res, err := b.mc.UploadToContentRepo(content, mtype, int64(len(*fi.Data))) + + var res *matrix.RespMediaUpload + + err = b.retry(func() error { + res, err = b.mc.UploadToContentRepo(content, mtype, int64(len(*fi.Data))) + + return err + }) + if err != nil { b.Log.Errorf("file upload failed: %#v", err) return @@ -435,40 +505,56 @@ func (b *Bmatrix) handleUploadFile(msg *config.Message, channel string, fi *conf switch { case strings.Contains(mtype, "video"): b.Log.Debugf("sendVideo %s", res.ContentURI) - _, err = b.mc.SendVideo(channel, fi.Name, res.ContentURI) + err = b.retry(func() error { + _, err = b.mc.SendVideo(channel, fi.Name, res.ContentURI) + + return err + }) if err != nil { b.Log.Errorf("sendVideo failed: %#v", err) } case strings.Contains(mtype, "image"): b.Log.Debugf("sendImage %s", res.ContentURI) - _, err = b.mc.SendImage(channel, fi.Name, res.ContentURI) + err = b.retry(func() error { + _, err = b.mc.SendImage(channel, fi.Name, res.ContentURI) + + return err + }) if err != nil { b.Log.Errorf("sendImage failed: %#v", err) } case strings.Contains(mtype, "audio"): b.Log.Debugf("sendAudio %s", res.ContentURI) - _, err = b.mc.SendMessageEvent(channel, "m.room.message", matrix.AudioMessage{ - MsgType: "m.audio", - Body: fi.Name, - URL: res.ContentURI, - Info: matrix.AudioInfo{ - Mimetype: mtype, - Size: uint(len(*fi.Data)), - }, + err = b.retry(func() error { + _, err = b.mc.SendMessageEvent(channel, "m.room.message", matrix.AudioMessage{ + MsgType: "m.audio", + Body: fi.Name, + URL: res.ContentURI, + Info: matrix.AudioInfo{ + Mimetype: mtype, + Size: uint(len(*fi.Data)), + }, + }) + + return err }) if err != nil { b.Log.Errorf("sendAudio failed: %#v", err) } default: b.Log.Debugf("sendFile %s", res.ContentURI) - _, err = b.mc.SendMessageEvent(channel, "m.room.message", matrix.FileMessage{ - MsgType: "m.file", - Body: fi.Name, - URL: res.ContentURI, - Info: matrix.FileInfo{ - Mimetype: mtype, - Size: uint(len(*fi.Data)), - }, + err = b.retry(func() error { + _, err = b.mc.SendMessageEvent(channel, "m.room.message", matrix.FileMessage{ + MsgType: "m.file", + Body: fi.Name, + URL: res.ContentURI, + Info: matrix.FileInfo{ + Mimetype: mtype, + Size: uint(len(*fi.Data)), + }, + }) + + return err }) if err != nil { b.Log.Errorf("sendFile failed: %#v", err) |