summaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorWim <wim@42.be>2020-12-06 17:18:25 +0100
committerGitHub <noreply@github.com>2020-12-06 17:18:25 +0100
commit2d3c26a4b218279d169da106032a70d3d80c4a53 (patch)
treea43acf615af032f486808e6d3a2374af09c02e4e
parent8eba2d3e50353b9cfe240f8c5b167a6c26bbd3c4 (diff)
downloadmatterbridge-msglm-2d3c26a4b218279d169da106032a70d3d80c4a53.tar.gz
matterbridge-msglm-2d3c26a4b218279d169da106032a70d3d80c4a53.tar.bz2
matterbridge-msglm-2d3c26a4b218279d169da106032a70d3d80c4a53.zip
Implement ratelimiting (matrix). Fixes #1238 (#1326)
-rw-r--r--bridge/matrix/helpers.go32
-rw-r--r--bridge/matrix/matrix.go188
2 files changed, 169 insertions, 51 deletions
diff --git a/bridge/matrix/helpers.go b/bridge/matrix/helpers.go
index 8256dc7d..03e448da 100644
--- a/bridge/matrix/helpers.go
+++ b/bridge/matrix/helpers.go
@@ -181,3 +181,35 @@ func (b *Bmatrix) getAvatarURL(sender string) string {
return url
}
+
+// handleRatelimit handles the ratelimit errors and return if we're ratelimited and the amount of time to sleep
+func (b *Bmatrix) handleRatelimit(err error) (time.Duration, bool) {
+ httpErr := handleError(err)
+ if httpErr.Errcode != "M_LIMIT_EXCEEDED" {
+ return 0, false
+ }
+
+ b.Log.Debugf("ratelimited: %s", httpErr.Err)
+ b.Log.Infof("getting ratelimited by matrix, sleeping approx %d seconds before retrying", httpErr.RetryAfterMs/1000)
+
+ return time.Duration(httpErr.RetryAfterMs) * time.Millisecond, true
+}
+
+// retry function will check if we're ratelimited and retries again when backoff time expired
+// returns original error if not 429 ratelimit
+func (b *Bmatrix) retry(f func() error) error {
+ b.rateMutex.Lock()
+ defer b.rateMutex.Unlock()
+
+ for {
+ if err := f(); err != nil {
+ if backoff, ok := b.handleRatelimit(err); ok {
+ time.Sleep(backoff)
+ } else {
+ return err
+ }
+ } else {
+ return nil
+ }
+ }
+}
diff --git a/bridge/matrix/matrix.go b/bridge/matrix/matrix.go
index 725f49a6..acb20261 100644
--- a/bridge/matrix/matrix.go
+++ b/bridge/matrix/matrix.go
@@ -30,6 +30,7 @@ type Bmatrix struct {
UserID string
NicknameMap map[string]NicknameCacheEntry
RoomMap map[string]string
+ rateMutex sync.RWMutex
sync.RWMutex
*bridge.Config
}
@@ -99,25 +100,18 @@ func (b *Bmatrix) Disconnect() error {
}
func (b *Bmatrix) JoinChannel(channel config.ChannelInfo) error {
-retry:
- resp, err := b.mc.JoinRoom(channel.Name, "", nil)
- if err != nil {
- httpErr := handleError(err)
- if httpErr.Errcode == "M_LIMIT_EXCEEDED" {
- b.Log.Infof("getting ratelimited by matrix, sleeping approx %d seconds before joining %s", httpErr.RetryAfterMs/1000, channel.Name)
- time.Sleep((time.Duration(httpErr.RetryAfterMs) * time.Millisecond))
-
- goto retry
+ return b.retry(func() error {
+ resp, err := b.mc.JoinRoom(channel.Name, "", nil)
+ if err != nil {
+ return err
}
- return err
- }
+ b.Lock()
+ b.RoomMap[resp.RoomID] = channel.Name
+ b.Unlock()
- b.Lock()
- b.RoomMap[resp.RoomID] = channel.Name
- b.Unlock()
-
- return nil
+ return nil
+ })
}
func (b *Bmatrix) Send(msg config.Message) (string, error) {
@@ -135,11 +129,21 @@ func (b *Bmatrix) Send(msg config.Message) (string, error) {
Body: username.plain + msg.Text,
FormattedBody: username.formatted + msg.Text,
}
- resp, err := b.mc.SendMessageEvent(channel, "m.room.message", m)
- if err != nil {
- return "", err
- }
- return resp.EventID, err
+
+ msgID := ""
+
+ err := b.retry(func() error {
+ resp, err := b.mc.SendMessageEvent(channel, "m.room.message", m)
+ if err != nil {
+ return err
+ }
+
+ msgID = resp.EventID
+
+ return err
+ })
+
+ return msgID, err
}
// Delete message
@@ -147,17 +151,34 @@ func (b *Bmatrix) Send(msg config.Message) (string, error) {
if msg.ID == "" {
return "", nil
}
- resp, err := b.mc.RedactEvent(channel, msg.ID, &matrix.ReqRedact{})
- if err != nil {
- return "", err
- }
- return resp.EventID, err
+
+ msgID := ""
+
+ err := b.retry(func() error {
+ resp, err := b.mc.RedactEvent(channel, msg.ID, &matrix.ReqRedact{})
+ if err != nil {
+ return err
+ }
+
+ msgID = resp.EventID
+
+ return err
+ })
+
+ return msgID, err
}
// Upload a file if it exists
if msg.Extra != nil {
for _, rmsg := range helper.HandleExtra(&msg, b.General) {
- if _, err := b.mc.SendText(channel, rmsg.Username+rmsg.Text); err != nil {
+ rmsg := rmsg
+
+ err := b.retry(func() error {
+ _, err := b.mc.SendText(channel, rmsg.Username+rmsg.Text)
+
+ return err
+ })
+ if err != nil {
b.Log.Errorf("sendText failed: %s", err)
}
}
@@ -187,7 +208,12 @@ func (b *Bmatrix) Send(msg config.Message) (string, error) {
EventID: msg.ID,
Type: "m.replace",
}
- _, err := b.mc.SendMessageEvent(channel, "m.room.message", rmsg)
+
+ err := b.retry(func() error {
+ _, err := b.mc.SendMessageEvent(channel, "m.room.message", rmsg)
+
+ return err
+ })
if err != nil {
return "", err
}
@@ -202,26 +228,58 @@ func (b *Bmatrix) Send(msg config.Message) (string, error) {
Body: username.plain + msg.Text,
FormattedBody: username.formatted + msg.Text,
}
- resp, err := b.mc.SendMessageEvent(channel, "m.room.message", m)
+
+ var (
+ resp *matrix.RespSendEvent
+ err error
+ )
+
+ err = b.retry(func() error {
+ resp, err = b.mc.SendMessageEvent(channel, "m.room.message", m)
+
+ return err
+ })
if err != nil {
return "", err
}
+
return resp.EventID, err
}
if b.GetBool("HTMLDisable") {
- resp, err := b.mc.SendText(channel, username.plain+msg.Text)
+ var (
+ resp *matrix.RespSendEvent
+ err error
+ )
+
+ err = b.retry(func() error {
+ resp, err = b.mc.SendText(channel, username.plain+msg.Text)
+
+ return err
+ })
if err != nil {
return "", err
}
+
return resp.EventID, err
}
// Post normal message with HTML support (eg riot.im)
- resp, err := b.mc.SendFormattedText(channel, username.plain+msg.Text, username.formatted+helper.ParseMarkdown(msg.Text))
+ var (
+ resp *matrix.RespSendEvent
+ err error
+ )
+
+ err = b.retry(func() error {
+ resp, err = b.mc.SendFormattedText(channel, username.plain+msg.Text,
+ username.formatted+helper.ParseMarkdown(msg.Text))
+
+ return err
+ })
if err != nil {
return "", err
}
+
return resp.EventID, err
}
@@ -420,13 +478,25 @@ func (b *Bmatrix) handleUploadFile(msg *config.Message, channel string, fi *conf
sp := strings.Split(fi.Name, ".")
mtype := mime.TypeByExtension("." + sp[len(sp)-1])
// image and video uploads send no username, we have to do this ourself here #715
- _, err := b.mc.SendFormattedText(channel, username.plain+fi.Comment, username.formatted+fi.Comment)
+ err := b.retry(func() error {
+ _, err := b.mc.SendFormattedText(channel, username.plain+fi.Comment, username.formatted+fi.Comment)
+
+ return err
+ })
if err != nil {
b.Log.Errorf("file comment failed: %#v", err)
}
b.Log.Debugf("uploading file: %s %s", fi.Name, mtype)
- res, err := b.mc.UploadToContentRepo(content, mtype, int64(len(*fi.Data)))
+
+ var res *matrix.RespMediaUpload
+
+ err = b.retry(func() error {
+ res, err = b.mc.UploadToContentRepo(content, mtype, int64(len(*fi.Data)))
+
+ return err
+ })
+
if err != nil {
b.Log.Errorf("file upload failed: %#v", err)
return
@@ -435,40 +505,56 @@ func (b *Bmatrix) handleUploadFile(msg *config.Message, channel string, fi *conf
switch {
case strings.Contains(mtype, "video"):
b.Log.Debugf("sendVideo %s", res.ContentURI)
- _, err = b.mc.SendVideo(channel, fi.Name, res.ContentURI)
+ err = b.retry(func() error {
+ _, err = b.mc.SendVideo(channel, fi.Name, res.ContentURI)
+
+ return err
+ })
if err != nil {
b.Log.Errorf("sendVideo failed: %#v", err)
}
case strings.Contains(mtype, "image"):
b.Log.Debugf("sendImage %s", res.ContentURI)
- _, err = b.mc.SendImage(channel, fi.Name, res.ContentURI)
+ err = b.retry(func() error {
+ _, err = b.mc.SendImage(channel, fi.Name, res.ContentURI)
+
+ return err
+ })
if err != nil {
b.Log.Errorf("sendImage failed: %#v", err)
}
case strings.Contains(mtype, "audio"):
b.Log.Debugf("sendAudio %s", res.ContentURI)
- _, err = b.mc.SendMessageEvent(channel, "m.room.message", matrix.AudioMessage{
- MsgType: "m.audio",
- Body: fi.Name,
- URL: res.ContentURI,
- Info: matrix.AudioInfo{
- Mimetype: mtype,
- Size: uint(len(*fi.Data)),
- },
+ err = b.retry(func() error {
+ _, err = b.mc.SendMessageEvent(channel, "m.room.message", matrix.AudioMessage{
+ MsgType: "m.audio",
+ Body: fi.Name,
+ URL: res.ContentURI,
+ Info: matrix.AudioInfo{
+ Mimetype: mtype,
+ Size: uint(len(*fi.Data)),
+ },
+ })
+
+ return err
})
if err != nil {
b.Log.Errorf("sendAudio failed: %#v", err)
}
default:
b.Log.Debugf("sendFile %s", res.ContentURI)
- _, err = b.mc.SendMessageEvent(channel, "m.room.message", matrix.FileMessage{
- MsgType: "m.file",
- Body: fi.Name,
- URL: res.ContentURI,
- Info: matrix.FileInfo{
- Mimetype: mtype,
- Size: uint(len(*fi.Data)),
- },
+ err = b.retry(func() error {
+ _, err = b.mc.SendMessageEvent(channel, "m.room.message", matrix.FileMessage{
+ MsgType: "m.file",
+ Body: fi.Name,
+ URL: res.ContentURI,
+ Info: matrix.FileInfo{
+ Mimetype: mtype,
+ Size: uint(len(*fi.Data)),
+ },
+ })
+
+ return err
})
if err != nil {
b.Log.Errorf("sendFile failed: %#v", err)