summaryrefslogtreecommitdiffstats
path: root/bridge/matrix
diff options
context:
space:
mode:
authormsglm <msglm@techchud.xyz>2023-10-27 07:08:25 -0500
committermsglm <msglm@techchud.xyz>2023-10-27 07:08:25 -0500
commit032a7e0c1188d3507b8d9a9571f2446a43cf775b (patch)
tree2bd38c01bc7761a6195e426082ce7191ebc765a1 /bridge/matrix
parent56e7bd01ca09ad52b0c4f48f146a20a4f1b78696 (diff)
downloadmatterbridge-msglm-032a7e0c1188d3507b8d9a9571f2446a43cf775b.tar.gz
matterbridge-msglm-032a7e0c1188d3507b8d9a9571f2446a43cf775b.tar.bz2
matterbridge-msglm-032a7e0c1188d3507b8d9a9571f2446a43cf775b.zip
apply https://github.com/42wim/matterbridge/pull/1864v1.26.0+0.1.0
Diffstat (limited to 'bridge/matrix')
-rw-r--r--bridge/matrix/appservice.go147
-rw-r--r--bridge/matrix/handlers.go344
-rw-r--r--bridge/matrix/helpers.go367
-rw-r--r--bridge/matrix/matrix.go806
-rw-r--r--bridge/matrix/zerolog_abstraction.go43
5 files changed, 1057 insertions, 650 deletions
diff --git a/bridge/matrix/appservice.go b/bridge/matrix/appservice.go
new file mode 100644
index 00000000..ef172bff
--- /dev/null
+++ b/bridge/matrix/appservice.go
@@ -0,0 +1,147 @@
+package bmatrix
+
+import (
+ "fmt"
+ "regexp"
+
+ "github.com/sirupsen/logrus"
+
+ "maunium.net/go/mautrix/appservice"
+ "maunium.net/go/mautrix/event"
+ "maunium.net/go/mautrix/id"
+)
+
+type AppServiceNamespaces struct {
+ rooms []*regexp.Regexp
+ usernames []*regexp.Regexp
+ prefixes []string
+}
+
+type AppServiceWrapper struct {
+ appService *appservice.AppService
+ namespaces AppServiceNamespaces
+ stop chan struct{}
+ stopAck chan struct{}
+}
+
+func (w *AppServiceWrapper) ParseNamespaces(logger *logrus.Entry) error {
+ if w.appService.Registration != nil {
+ // TODO: handle non-exclusive registrations
+ for _, v := range w.appService.Registration.Namespaces.RoomIDs {
+ re, err := regexp.Compile(v.Regex)
+ if err != nil {
+ logger.Warnf("couldn't parse the appservice regex '%s'", v.Regex)
+ continue
+ }
+
+ w.namespaces.rooms = append(w.namespaces.rooms, re)
+ }
+
+ for _, v := range w.appService.Registration.Namespaces.UserIDs {
+ re, err := regexp.Compile(v.Regex)
+ if err != nil {
+ logger.Warnf("couldn't parse the appservice regex '%s'", v.Regex)
+ continue
+ }
+
+ // we assume that the user regexes will be of the form '@<some prefix>.*'
+ // where '.*' will be replaced by the username we spoof
+ prefix, _ := re.LiteralPrefix()
+ if prefix == "" || prefix == "@" {
+ logger.Warnf("couldn't find an acceptable prefix in the appservice regex '%s'", v.Regex)
+ continue
+ }
+
+ if v.Regex != fmt.Sprintf("%s.*", prefix) {
+ logger.Warnf("complex regexpes are not supported for appServices, the regexp '%s' does not match the format '@<prefix>.*'", v.Regex)
+ continue
+ }
+
+ w.namespaces.usernames = append(w.namespaces.usernames, re)
+ // drop the '@' in the prefix
+ w.namespaces.prefixes = append(w.namespaces.prefixes, prefix[1:])
+ }
+ }
+
+ return nil
+}
+
+func (b *Bmatrix) NewAppService() (*AppServiceWrapper, error) {
+ w := &AppServiceWrapper{
+ appService: appservice.Create(),
+ namespaces: AppServiceNamespaces{
+ rooms: []*regexp.Regexp{},
+ usernames: []*regexp.Regexp{},
+ prefixes: []string{},
+ },
+ stop: make(chan struct{}, 1),
+ stopAck: make(chan struct{}, 1),
+ }
+
+ err := w.appService.SetHomeserverURL(b.mc.HomeserverURL.String())
+ if err != nil {
+ return nil, err
+ }
+
+ _, homeServerDomain, _ := b.mc.UserID.Parse()
+ w.appService.HomeserverDomain = homeServerDomain
+ //nolint:exhaustruct
+ w.appService.Host = appservice.HostConfig{
+ Hostname: b.GetString("AppServiceHost"),
+ Port: uint16(b.GetInt("AppServicePort")),
+ }
+ w.appService.Registration, err = appservice.LoadRegistration(b.GetString("AppServiceConfigPath"))
+ if err != nil {
+ return nil, err
+ }
+
+ // forward logs from the appService to the matterbridge logger
+ w.appService.Log = NewZerologWrapper(b.Log)
+
+ if err = w.ParseNamespaces(b.Log); err != nil {
+ return nil, err
+ }
+
+ return w, nil
+}
+
+func (a *AppServiceNamespaces) containsRoom(roomID id.RoomID) bool {
+ // no room specified: we check all the rooms
+ if len(a.rooms) == 0 {
+ return true
+ }
+
+ for _, room := range a.rooms {
+ if room.MatchString(roomID.String()) {
+ return true
+ }
+ }
+
+ return false
+}
+
+// nolint: wrapcheck
+func (b *Bmatrix) startAppService() error {
+ wrapper := b.appService
+ // TODO: detect service completion and rerun automatically
+ go wrapper.appService.Start()
+ b.Log.Debug("appservice launched")
+
+ processor := appservice.NewEventProcessor(wrapper.appService)
+ processor.On(event.EventMessage, func(ev *event.Event) {
+ b.handleEvent(originAppService, ev)
+ })
+ go processor.Start()
+ b.Log.Debug("appservice event dispatcher launched")
+
+ // handle service stopping/restarting
+ go func(appService *appservice.AppService, processor *appservice.EventProcessor) {
+ <-wrapper.stop
+
+ appService.Stop()
+ processor.Stop()
+ wrapper.stopAck <- struct{}{}
+ }(wrapper.appService, processor)
+
+ return nil
+}
diff --git a/bridge/matrix/handlers.go b/bridge/matrix/handlers.go
new file mode 100644
index 00000000..8833b0ed
--- /dev/null
+++ b/bridge/matrix/handlers.go
@@ -0,0 +1,344 @@
+package bmatrix
+
+import (
+ "bytes"
+ "fmt"
+ "mime"
+ "regexp"
+ "strings"
+
+ matrix "maunium.net/go/mautrix"
+ "maunium.net/go/mautrix/event"
+ "maunium.net/go/mautrix/id"
+
+ "github.com/42wim/matterbridge/bridge/config"
+ "github.com/42wim/matterbridge/bridge/helper"
+)
+
+// Determines if the event comes from ourselves, in which case we want to ignore it
+func (b *Bmatrix) ignoreBridgingEvents(ev *event.Event) bool {
+ if ev.Sender == b.UserID {
+ return true
+ }
+
+ // ignore messages we may have sent via the appservice
+ if b.appService != nil {
+ if ev.Sender == b.appService.appService.BotClient().UserID {
+ return true
+ }
+
+ // ignore virtual users messages (we ignore the 'exclusive' field of Namespace for now)
+ for _, username := range b.appService.namespaces.usernames {
+ if username.MatchString(ev.Sender.String()) {
+ return true
+ }
+ }
+ }
+
+ return false
+}
+
+//nolint: funlen
+func (b *Bmatrix) handleEvent(origin EventOrigin, ev *event.Event) {
+ if b.ignoreBridgingEvents(ev) {
+ return
+ }
+
+ b.RLock()
+ channel, ok := b.RoomMap[ev.RoomID]
+ b.RUnlock()
+ if !ok {
+ // we don't know that room yet, that could be a room returned by an
+ // application service, but matterbridge doesn't handle those just yet
+ b.Log.Debugf("Received event for room %s, not joined yet/not handled", ev.RoomID)
+
+ return
+ }
+
+ if ev.Type == event.EphemeralEventReceipt {
+ // we do not support read receipts across servers, considering that
+ // multiple services (e.g. Discord) doesn't expose that information)
+ return
+ }
+
+ if ev.Type == event.StateMember {
+ b.handleMemberChange(ev)
+
+ return
+ }
+
+ // if we receive appservice events for this room, there is no need to check them with the classical syncer
+ if !channel.appService && origin == originAppService {
+ channel.appService = true
+ b.Lock()
+ b.RoomMap[ev.RoomID] = channel
+ b.Unlock()
+ }
+
+ // if we receive messages both via the classical matrix syncer and appserver, prefer appservice and throw away this duplicate event
+ if channel.appService && origin != originAppService {
+ b.Log.Debugf("Dropping event, should receive it via appservice: %s", ev.ID)
+
+ return
+ }
+
+ b.Log.Debugf("== Receiving event: %#v (appService=%t)", ev, origin == originAppService)
+
+ if ev.Type == event.EphemeralEventTyping {
+ typing := ev.Content.AsTyping()
+ if len(typing.UserIDs) > 0 {
+ //nolint:exhaustruct
+ b.Remote <- config.Message{
+ Event: config.EventUserTyping,
+ Channel: channel.name,
+ Account: b.Account,
+ }
+ }
+
+ return
+ }
+
+ defer (func(ev *event.Event) {
+ // not crucial, so no ratelimit check here
+ if err := b.mc.MarkRead(ev.RoomID, ev.ID); err != nil {
+ b.Log.Errorf("couldn't mark message as read %s", err.Error())
+ }
+ })(ev)
+
+ // Create our message
+ //nolint:exhaustruct
+ rmsg := config.Message{
+ Username: b.getDisplayName(ev.RoomID, ev.Sender),
+ Channel: channel.name,
+ Account: b.Account,
+ UserID: string(ev.Sender),
+ ID: string(ev.ID),
+ }
+
+ // Remove homeserver suffix if configured
+ if b.GetBool("NoHomeServerSuffix") {
+ re := regexp.MustCompile("(.*?):.*")
+ rmsg.Username = re.ReplaceAllString(rmsg.Username, `$1`)
+ }
+
+ // Delete event
+ if ev.Type == event.EventRedaction {
+ rmsg.Event = config.EventMsgDelete
+ rmsg.ID = string(ev.Redacts)
+ rmsg.Text = config.EventMsgDelete
+ b.Remote <- rmsg
+
+ return
+ }
+
+ b.handleMessage(rmsg, ev)
+}
+
+func (b *Bmatrix) handleMemberChange(ev *event.Event) {
+ member := ev.Content.AsMember()
+ if member == nil {
+ b.Log.Errorf("Couldn't process a member event:\n%#v", ev)
+
+ return
+ }
+
+ // Update the displayname on join messages, according to https://spec.matrix.org/v1.3/client-server-api/#events-on-change-of-profile-information
+ if member.Membership == event.MembershipJoin {
+ b.cacheDisplayName(ev.RoomID, ev.Sender, member.Displayname)
+ } else if member.Membership == event.MembershipLeave || member.Membership == event.MembershipBan {
+ b.removeDisplayNameFromCache(ev.Sender)
+ }
+}
+
+func (b *Bmatrix) handleMessage(rmsg config.Message, ev *event.Event) {
+ msg := ev.Content.AsMessage()
+ if msg == nil {
+ b.Log.Errorf("matterbridge don't support this event type: %s", ev.Type.Type)
+ b.Log.Debugf("Full event: %#v", ev)
+
+ return
+ }
+
+ rmsg.Text = msg.Body
+
+ // TODO: cache the avatars
+ avatarURL := b.getAvatarURL(ev.Sender)
+ contentURI, err := id.ParseContentURI(avatarURL)
+ if err == nil {
+ avatarURL = b.mc.GetDownloadURL(contentURI)
+ rmsg.Avatar = avatarURL
+ }
+
+ // Do we have a /me action
+ //nolint: exhaustive
+ switch msg.MsgType {
+ case event.MsgEmote:
+ rmsg.Event = config.EventUserAction
+ case event.MsgImage, event.MsgVideo, event.MsgFile:
+ // Do we have attachments? (we only allow images, videos or files msgtypes)
+ err := b.handleDownloadFile(&rmsg, *msg)
+ if err != nil {
+ b.Log.Errorf("download failed: %#v", err)
+ }
+ default:
+ if msg.RelatesTo == nil {
+ break
+ }
+
+ if msg.RelatesTo.Type == event.RelReplace && msg.NewContent != nil {
+ // Is it an edit?
+ rmsg.ID = string(msg.RelatesTo.EventID)
+ rmsg.Text = msg.NewContent.Body
+ } else if msg.RelatesTo.Type == event.RelReference && msg.RelatesTo.InReplyTo != nil {
+ // Is it a reply?
+ body := msg.Body
+ if !b.GetBool("keepquotedreply") {
+ for strings.HasPrefix(body, "> ") {
+ lineIdx := strings.Index(body, "\n\n")
+ if lineIdx == -1 {
+ break
+ }
+
+ body = body[(lineIdx + 2):]
+ }
+ }
+
+ rmsg.ParentID = string(msg.RelatesTo.EventID)
+ rmsg.Text = body
+ }
+ }
+
+ b.Log.Debugf("<= Sending message from %s on %s to gateway", ev.Sender, b.Account)
+ b.Remote <- rmsg
+}
+
+// handleDownloadFile handles file download
+func (b *Bmatrix) handleDownloadFile(rmsg *config.Message, msg event.MessageEventContent) error {
+ rmsg.Extra = make(map[string][]interface{})
+ if msg.URL == "" || msg.Info == nil {
+ b.Log.Error("couldn't download a file with no URL or no file informations (invalid event ?)")
+ b.Log.Debugf("Full Message content:\n%#v", msg)
+ }
+
+ url := strings.ReplaceAll(string(msg.URL), "mxc://", b.GetString("Server")+"/_matrix/media/v1/download/")
+ filename := msg.Body
+
+ // check if we have an image uploaded without extension
+ if !strings.Contains(filename, ".") {
+ mext, _ := mime.ExtensionsByType(msg.Info.MimeType)
+ if len(mext) > 0 {
+ filename += mext[0]
+ } else if msg.MsgType == event.MsgImage {
+ // just a default .png extension if we don't have mime info
+ filename += ".png"
+ }
+ }
+
+ // check if the size is ok
+ err := helper.HandleDownloadSize(b.Log, rmsg, filename, int64(msg.Info.Size), b.General)
+ if err != nil {
+ return err
+ }
+ // actually download the file
+ data, err := helper.DownloadFile(url)
+ if err != nil {
+ return fmt.Errorf("download %s failed %#v", url, err)
+ }
+ // add the downloaded data to the message
+ helper.HandleDownloadData(b.Log, rmsg, filename, "", url, data, b.General)
+ return nil
+}
+
+// handleUploadFiles handles native upload of files.
+func (b *Bmatrix) handleUploadFiles(msg *config.Message, channel id.RoomID) (string, error) {
+ for _, f := range msg.Extra["file"] {
+ if fi, ok := f.(config.FileInfo); ok {
+ b.handleUploadFile(msg, channel, &fi)
+ }
+ }
+ return "", nil
+}
+
+// handleUploadFile handles native upload of a file.
+//nolint: funlen
+func (b *Bmatrix) handleUploadFile(msg *config.Message, channel id.RoomID, fi *config.FileInfo) {
+ content := bytes.NewReader(*fi.Data)
+ sp := strings.Split(fi.Name, ".")
+ mtype := mime.TypeByExtension("." + sp[len(sp)-1])
+
+ // image and video uploads send no username, we have to do this ourself here #715
+ //nolint:exhaustruct
+ m := event.MessageEventContent{
+ MsgType: event.MsgText,
+ Body: fi.Comment,
+ FormattedBody: fi.Comment,
+ }
+
+ _, err := b.sendMessageEventWithRetries(channel, m, msg.Username)
+ if err != nil {
+ b.Log.Errorf("file comment failed: %#v", err)
+ }
+
+ b.Log.Debugf("uploading file: %s %s", fi.Name, mtype)
+
+ var res *matrix.RespMediaUpload
+ //nolint:exhaustruct
+ req := matrix.ReqUploadMedia{
+ Content: content,
+ ContentType: mtype,
+ ContentLength: fi.Size,
+ }
+
+ err = b.retry(func() error {
+ res, err = b.mc.UploadMedia(req)
+
+ return err
+ })
+
+ if err != nil {
+ b.Log.Errorf("file upload failed: %#v", err)
+ return
+ }
+
+ b.Log.Debugf("result: %#v", res)
+
+ //nolint:exhaustruct
+ m = event.MessageEventContent{
+ Body: fi.Name,
+ URL: res.ContentURI.CUString(),
+ }
+
+ switch {
+ case strings.Contains(mtype, "video"):
+ b.Log.Debugf("sendVideo %s", res.ContentURI)
+
+ m.MsgType = event.MsgVideo
+ case strings.Contains(mtype, "image"):
+ b.Log.Debugf("sendImage %s", res.ContentURI)
+
+ m.MsgType = event.MsgImage
+ case strings.Contains(mtype, "audio"):
+ b.Log.Debugf("sendAudio %s", res.ContentURI)
+
+ m.MsgType = event.MsgAudio
+ //nolint:exhaustruct
+ m.Info = &event.FileInfo{
+ MimeType: mtype,
+ Size: len(*fi.Data),
+ }
+ default:
+ b.Log.Debugf("sendFile %s", res.ContentURI)
+
+ m.MsgType = event.MsgFile
+ //nolint:exhaustruct
+ m.Info = &event.FileInfo{
+ MimeType: mtype,
+ Size: len(*fi.Data),
+ }
+ }
+
+ _, err = b.sendMessageEventWithRetries(channel, m, msg.Username)
+ if err != nil {
+ b.Log.Errorf("sending the message referencing the uploaded file failed: %#v", err)
+ }
+}
diff --git a/bridge/matrix/helpers.go b/bridge/matrix/helpers.go
index 5a91f748..93031e1d 100644
--- a/bridge/matrix/helpers.go
+++ b/bridge/matrix/helpers.go
@@ -1,16 +1,21 @@
package bmatrix
import (
- "encoding/json"
"errors"
"fmt"
"html"
- "strings"
+ "sort"
+ "sync"
"time"
- matrix "github.com/matterbridge/gomatrix"
+ matrix "maunium.net/go/mautrix"
+ "maunium.net/go/mautrix/event"
+ "maunium.net/go/mautrix/id"
)
+// arbitrary limit to determine when to cleanup nickname cache entries
+const MaxNumberOfUsersInCache = 50_000
+
func newMatrixUsername(username string) *matrixUsername {
mUsername := new(matrixUsername)
@@ -28,11 +33,11 @@ func newMatrixUsername(username string) *matrixUsername {
}
// getRoomID retrieves a matching room ID from the channel name.
-func (b *Bmatrix) getRoomID(channel string) string {
+func (b *Bmatrix) getRoomID(channelName string) id.RoomID {
b.RLock()
defer b.RUnlock()
- for ID, name := range b.RoomMap {
- if name == channel {
+ for ID, channel := range b.RoomMap {
+ if channelName == channel.name {
return ID
}
}
@@ -40,31 +45,59 @@ func (b *Bmatrix) getRoomID(channel string) string {
return ""
}
-// interface2Struct marshals and immediately unmarshals an interface.
-// Useful for converting map[string]interface{} to a struct.
-func interface2Struct(in interface{}, out interface{}) error {
- jsonObj, err := json.Marshal(in)
- if err != nil {
- return err //nolint:wrapcheck
- }
+type NicknameCacheEntry struct {
+ displayName string
+ lastUpdated time.Time
+ conflictWithOtherUsername bool
+}
- return json.Unmarshal(jsonObj, out)
+type NicknameUserEntry struct {
+ globalEntry *NicknameCacheEntry
+ perChannel map[id.RoomID]NicknameCacheEntry
}
-// getDisplayName retrieves the displayName for mxid, querying the homeserver if the mxid is not in the cache.
-func (b *Bmatrix) getDisplayName(mxid string) string {
- if b.GetBool("UseUserName") {
- return mxid[1:]
+type NicknameCache struct {
+ users map[id.UserID]NicknameUserEntry
+ sync.RWMutex
+}
+
+func NewNicknameCache() *NicknameCache {
+ return &NicknameCache{
+ users: make(map[id.UserID]NicknameUserEntry),
+ RWMutex: sync.RWMutex{},
}
+}
- b.RLock()
- if val, present := b.NicknameMap[mxid]; present {
- b.RUnlock()
+// note: cache is not locked here
+func (c *NicknameCache) retrieveDisplaynameFromCache(channelID id.RoomID, mxid id.UserID) string {
+ var cachedEntry *NicknameCacheEntry = nil
+
+ c.RLock()
+ if user, userPresent := c.users[mxid]; userPresent {
+ // try first the name of the user in the room, then globally
+ if roomCachedEntry, roomPresent := user.perChannel[channelID]; roomPresent {
+ cachedEntry = &roomCachedEntry
+ } else if user.globalEntry != nil {
+ cachedEntry = user.globalEntry
+ }
+ }
+ c.RUnlock()
- return val.displayName
+ if cachedEntry == nil {
+ return ""
+ }
+
+ if cachedEntry.conflictWithOtherUsername {
+ // TODO: the current behavior is that only users with clashing usernames and *that have
+ // spoken since the bridge started* will get their mxids shown, and this doesn't
+ // feel right
+ return fmt.Sprintf("%s (%s)", cachedEntry.displayName, mxid)
}
- b.RUnlock()
+ return cachedEntry.displayName
+}
+
+func (b *Bmatrix) retrieveGlobalDisplayname(mxid id.UserID) string {
displayName, err := b.mc.GetDisplayName(mxid)
var httpError *matrix.HTTPError
if errors.As(err, &httpError) {
@@ -72,127 +105,198 @@ func (b *Bmatrix) getDisplayName(mxid string) string {
}
if err != nil {
- return b.cacheDisplayName(mxid, mxid[1:])
+ return string(mxid)[1:]
}
- return b.cacheDisplayName(mxid, displayName.DisplayName)
+ return displayName.DisplayName
}
-// cacheDisplayName stores the mapping between a mxid and a display name, to be reused later without performing a query to the homserver.
-// Note that old entries are cleaned when this function is called.
-func (b *Bmatrix) cacheDisplayName(mxid string, displayName string) string {
- now := time.Now()
+// getDisplayName retrieves the displayName for mxid, querying the homeserver if the mxid is not in the cache.
+func (b *Bmatrix) getDisplayName(channelID id.RoomID, mxid id.UserID) string {
+ if b.GetBool("UseUserName") {
+ return string(mxid)[1:]
+ }
- // scan to delete old entries, to stop memory usage from becoming too high with old entries.
- // In addition, we also detect if another user have the same username, and if so, we append their mxids to their usernames to differentiate them.
- toDelete := []string{}
- conflict := false
+ displayname := b.NicknameCache.retrieveDisplaynameFromCache(channelID, mxid)
+ if displayname != "" {
+ return displayname
+ }
- b.Lock()
- for mxid, v := range b.NicknameMap {
- // to prevent username reuse across matrix servers - or even on the same server, append
- // the mxid to the username when there is a conflict
- if v.displayName == displayName {
- conflict = true
- // TODO: it would be nice to be able to rename previous messages from this user.
- // The current behavior is that only users with clashing usernames and *that have spoken since the bridge last started* will get their mxids shown, and I don't know if that's the expected behavior.
- v.displayName = fmt.Sprintf("%s (%s)", displayName, mxid)
- b.NicknameMap[mxid] = v
+ // retrieve the global display name
+ return b.cacheDisplayName("", mxid, b.retrieveGlobalDisplayname(mxid))
+}
+
+// scan to delete old entries, to stop memory usage from becoming high with obsolete entries.
+// note: assume the cache is already write-locked
+// TODO: should we update the timestamp when the entry is used?
+func (c *NicknameCache) clearObsoleteEntries(mxid id.UserID) {
+ // we have a "off-by-one" to account for when the user being added to the
+ // cache already have obsolete cache entries, as we want to keep it because
+ // we will be refreshing it in a minute
+ if len(c.users) <= MaxNumberOfUsersInCache+1 {
+ return
+ }
+
+ usersLastTimestamp := make(map[id.UserID]int64, len(c.users))
+ // compute the last updated timestamp entry for each user
+ for mxidIter, NicknameCacheIter := range c.users {
+ userLastTimestamp := time.Unix(0, 0)
+ for _, userInChannelCacheEntry := range NicknameCacheIter.perChannel {
+ if userInChannelCacheEntry.lastUpdated.After(userLastTimestamp) {
+ userLastTimestamp = userInChannelCacheEntry.lastUpdated
+ }
}
- if now.Sub(v.lastUpdated) > 10*time.Minute {
- toDelete = append(toDelete, mxid)
+ if NicknameCacheIter.globalEntry != nil {
+ if NicknameCacheIter.globalEntry.lastUpdated.After(userLastTimestamp) {
+ userLastTimestamp = NicknameCacheIter.globalEntry.lastUpdated
+ }
}
- }
- if conflict {
- displayName = fmt.Sprintf("%s (%s)", displayName, mxid)
+ usersLastTimestamp[mxidIter] = userLastTimestamp.UnixNano()
}
- for _, v := range toDelete {
- delete(b.NicknameMap, v)
+ // get the limit timestamp before which we must clear entries as obsolete
+ sortedTimestamps := make([]int64, 0, len(usersLastTimestamp))
+ for _, value := range usersLastTimestamp {
+ sortedTimestamps = append(sortedTimestamps, value)
}
-
- b.NicknameMap[mxid] = NicknameCacheEntry{
- displayName: displayName,
- lastUpdated: now,
+ sort.Slice(sortedTimestamps, func(i, j int) bool { return sortedTimestamps[i] < sortedTimestamps[j] })
+ limitTimestamp := sortedTimestamps[len(sortedTimestamps)-MaxNumberOfUsersInCache]
+
+ // delete entries older than the limit
+ for mxidIter, timestamp := range usersLastTimestamp {
+ // do not clear the user that we are adding to the cache
+ if timestamp <= limitTimestamp && mxidIter != mxid {
+ delete(c.users, mxidIter)
+ }
}
- b.Unlock()
-
- return displayName
}
-// handleError converts errors into httpError.
-//nolint:exhaustivestruct
-func handleError(err error) *httpError {
- var mErr matrix.HTTPError
- if !errors.As(err, &mErr) {
- return &httpError{
- Err: "not a HTTPError",
+// to prevent username reuse across matrix rooms - or even inside the same room, if a user uses multiple servers -
+// identify users with naming conflicts
+func (c *NicknameCache) detectConflict(mxid id.UserID, displayName string) bool {
+ conflict := false
+
+ for mxidIter, NicknameCacheIter := range c.users {
+ // skip conflict detection against ourselves, obviously
+ if mxidIter == mxid {
+ continue
}
- }
- var httpErr httpError
+ for channelID, userInChannelCacheEntry := range NicknameCacheIter.perChannel {
+ if userInChannelCacheEntry.displayName == displayName {
+ userInChannelCacheEntry.conflictWithOtherUsername = true
+ c.users[mxidIter].perChannel[channelID] = userInChannelCacheEntry
+ conflict = true
+ }
+ }
- if err := json.Unmarshal(mErr.Contents, &httpErr); err != nil {
- return &httpError{
- Err: "unmarshal failed",
+ if NicknameCacheIter.globalEntry != nil && NicknameCacheIter.globalEntry.displayName == displayName {
+ c.users[mxidIter].globalEntry.conflictWithOtherUsername = true
+ conflict = true
}
}
- return &httpErr
+ return conflict
}
-func (b *Bmatrix) containsAttachment(content map[string]interface{}) bool {
- // Skip empty messages
- if content["msgtype"] == nil {
- return false
+// cacheDisplayName stores the mapping between a mxid and a display name, to be reused
+// later without performing a query to the homeserver.
+// Note that old entries are cleaned when this function is called.
+func (b *Bmatrix) cacheDisplayName(channelID id.RoomID, mxid id.UserID, displayName string) string {
+ now := time.Now()
+
+ cache := b.NicknameCache
+
+ cache.Lock()
+ defer cache.Unlock()
+
+ conflict := cache.detectConflict(mxid, displayName)
+
+ cache.clearObsoleteEntries(mxid)
+
+ var newEntry NicknameUserEntry
+ if user, userPresent := cache.users[mxid]; userPresent {
+ newEntry = user
+ } else {
+ newEntry = NicknameUserEntry{
+ globalEntry: nil,
+ perChannel: make(map[id.RoomID]NicknameCacheEntry),
+ }
}
- // Only allow image,video or file msgtypes
- if !(content["msgtype"].(string) == "m.image" ||
- content["msgtype"].(string) == "m.video" ||
- content["msgtype"].(string) == "m.file") {
- return false
+ cacheEntry := NicknameCacheEntry{
+ displayName: displayName,
+ lastUpdated: now,
+ conflictWithOtherUsername: conflict,
}
- return true
+ // this is a local (room-specific) display name, let's cache it as such
+ if channelID == "" {
+ newEntry.globalEntry = &cacheEntry
+ } else {
+ globalDisplayName := b.retrieveGlobalDisplayname(mxid)
+ // updating the global display name or resetting the room name to the global name
+ if globalDisplayName == displayName {
+ delete(newEntry.perChannel, channelID)
+ newEntry.globalEntry = &cacheEntry
+ } else {
+ newEntry.perChannel[channelID] = cacheEntry
+ }
+ }
+
+ cache.users[mxid] = newEntry
+
+ return displayName
}
-// getAvatarURL returns the avatar URL of the specified sender.
-func (b *Bmatrix) getAvatarURL(sender string) string {
- urlPath := b.mc.BuildURL("profile", sender, "avatar_url")
+func (b *Bmatrix) removeDisplayNameFromCache(mxid id.UserID) {
+ cache := b.NicknameCache
- s := struct {
- AvatarURL string `json:"avatar_url"`
- }{}
+ cache.Lock()
+ defer cache.Unlock()
- err := b.mc.MakeRequest("GET", urlPath, nil, &s)
- if err != nil {
- b.Log.Errorf("getAvatarURL failed: %s", err)
+ delete(cache.users, mxid)
+}
+// getAvatarURL returns the avatar URL of the specified sender.
+func (b *Bmatrix) getAvatarURL(sender id.UserID) string {
+ url, err := b.mc.GetAvatarURL(sender)
+ if err != nil {
+ b.Log.Errorf("Couldn't retrieve the URL of the avatar for MXID %s", sender)
return ""
}
- url := strings.ReplaceAll(s.AvatarURL, "mxc://", b.GetString("Server")+"/_matrix/media/r0/thumbnail/")
- if url != "" {
- url += "?width=37&height=37&method=crop"
- }
-
- return url
+ return url.String()
}
// handleRatelimit handles the ratelimit errors and return if we're ratelimited and the amount of time to sleep
func (b *Bmatrix) handleRatelimit(err error) (time.Duration, bool) {
- httpErr := handleError(err)
- if httpErr.Errcode != "M_LIMIT_EXCEEDED" {
+ var mErr matrix.HTTPError
+ if !errors.As(err, &mErr) {
+ b.Log.Errorf("Received a non-HTTPError, don't know what to make of it:\n%#v", err)
+ return 0, false
+ }
+
+ if mErr.RespError.ErrCode != "M_LIMIT_EXCEEDED" {
return 0, false
}
- b.Log.Debugf("ratelimited: %s", httpErr.Err)
- b.Log.Infof("getting ratelimited by matrix, sleeping approx %d seconds before retrying", httpErr.RetryAfterMs/1000)
+ b.Log.Debugf("ratelimited: %s", mErr.RespError.Err)
+
+ // fallback to a one-second delay
+ retryDelayMs := 1000
+
+ if retryDelayString, present := mErr.RespError.ExtraData["retry_after_ms"]; present {
+ if retryDelayInt, correct := retryDelayString.(int); correct && retryDelayInt > retryDelayMs {
+ retryDelayMs = retryDelayInt
+ }
+ }
+
+ b.Log.Infof("getting ratelimited by matrix, sleeping approx %d seconds before retrying", retryDelayMs/1000)
- return time.Duration(httpErr.RetryAfterMs) * time.Millisecond, true
+ return time.Duration(retryDelayMs) * time.Millisecond, true
}
// retry function will check if we're ratelimited and retries again when backoff time expired
@@ -213,3 +317,66 @@ func (b *Bmatrix) retry(f func() error) error {
}
}
}
+
+type SendMessageEventWrapper struct {
+ inner *matrix.Client
+}
+
+//nolint: wrapcheck
+func (w SendMessageEventWrapper) SendMessageEvent(roomID id.RoomID, eventType event.Type, contentJSON interface{}) (*matrix.RespSendEvent, error) {
+ return w.inner.SendMessageEvent(roomID, eventType, contentJSON)
+}
+
+//nolint: wrapcheck
+func (b *Bmatrix) sendMessageEventWithRetries(channel id.RoomID, message event.MessageEventContent, username string) (string, error) {
+ var (
+ resp *matrix.RespSendEvent
+ client interface {
+ SendMessageEvent(roomID id.RoomID, eventType event.Type, contentJSON interface{}) (resp *matrix.RespSendEvent, err error)
+ }
+ err error
+ )
+
+ b.RLock()
+ appservice := b.RoomMap[channel].appService
+ b.RUnlock()
+
+ client = SendMessageEventWrapper{inner: b.mc}
+
+ // only try to send messages through the app Service *once* we have received
+ // events through it (otherwise we don't really know if the appservice works)
+ // Additionally, even if we're receiving messages in that room via the appService listener,
+ // let's check that the appservice "covers" that room
+ if appservice && b.appService.namespaces.containsRoom(channel) && len(b.appService.namespaces.prefixes) > 0 {
+ b.Log.Debugf("Sending with appService")
+ // we take the first prefix
+ bridgeUserID := fmt.Sprintf("@%s%s:%s", b.appService.namespaces.prefixes[0], id.EncodeUserLocalpart(username), b.appService.appService.HomeserverDomain)
+ intent := b.appService.appService.Intent(id.UserID(bridgeUserID))
+ // if we can't change the display name it's not great but not the end of the world either, ignore it
+ // TODO: do not perform this action on every message, with an in-memory cache or something
+ _ = intent.SetDisplayName(username)
+ client = intent
+ } else {
+ applyUsernametoMessage(&message, username)
+ }
+
+ err = b.retry(func() error {
+ resp, err = client.SendMessageEvent(channel, event.EventMessage, message)
+
+ return err
+ })
+ if err != nil {
+ return "", err
+ }
+
+ return string(resp.EventID), err
+}
+
+func applyUsernametoMessage(newMsg *event.MessageEventContent, username string) {
+ matrixUsername := newMatrixUsername(username)
+
+ newMsg.Body = matrixUsername.plain + newMsg.Body
+ if newMsg.FormattedBody != "" {
+ newMsg.FormattedBody = matrixUsername.formatted + newMsg.FormattedBody
+ }
+}
diff --git a/bridge/matrix/matrix.go b/bridge/matrix/matrix.go
index 49fc33b3..ee2081ff 100644
--- a/bridge/matrix/matrix.go
+++ b/bridge/matrix/matrix.go
@@ -1,18 +1,18 @@
+// nolint: exhaustivestruct
package bmatrix
import (
- "bytes"
"fmt"
- "mime"
"regexp"
- "strings"
"sync"
- "time"
+
+ matrix "maunium.net/go/mautrix"
+ "maunium.net/go/mautrix/event"
+ "maunium.net/go/mautrix/id"
"github.com/42wim/matterbridge/bridge"
"github.com/42wim/matterbridge/bridge/config"
"github.com/42wim/matterbridge/bridge/helper"
- matrix "github.com/matterbridge/gomatrix"
)
var (
@@ -20,25 +20,30 @@ var (
htmlReplacementTag = regexp.MustCompile("<[^>]*>")
)
-type NicknameCacheEntry struct {
- displayName string
- lastUpdated time.Time
+type EventOrigin int
+
+const (
+ originClassicSyncer EventOrigin = iota
+ originAppService
+)
+
+type RoomInfo struct {
+ name string
+ appService bool
}
type Bmatrix struct {
- mc *matrix.Client
- UserID string
- NicknameMap map[string]NicknameCacheEntry
- RoomMap map[string]string
- rateMutex sync.RWMutex
+ mc *matrix.Client
+ UserID id.UserID
+ appService *AppServiceWrapper
+ NicknameCache *NicknameCache
+ RoomMap map[id.RoomID]RoomInfo
+ rateMutex sync.RWMutex
+ joinedRooms []id.RoomID
sync.RWMutex
*bridge.Config
-}
-
-type httpError struct {
- Errcode string `json:"errcode"`
- Err string `json:"error"`
- RetryAfterMs int `json:"retry_after_ms"`
+ stopNormalSync chan struct{}
+ stopNormalSyncAck chan struct{}
}
type matrixUsername struct {
@@ -46,44 +51,12 @@ type matrixUsername struct {
formatted string
}
-// SubTextMessage represents the new content of the message in edit messages.
-type SubTextMessage struct {
- MsgType string `json:"msgtype"`
- Body string `json:"body"`
- FormattedBody string `json:"formatted_body,omitempty"`
- Format string `json:"format,omitempty"`
-}
-
-// MessageRelation explains how the current message relates to a previous message.
-// Notably used for message edits.
-type MessageRelation struct {
- EventID string `json:"event_id"`
- Type string `json:"rel_type"`
-}
-
-type EditedMessage struct {
- NewContent SubTextMessage `json:"m.new_content"`
- RelatedTo MessageRelation `json:"m.relates_to"`
- matrix.TextMessage
-}
-
-type InReplyToRelationContent struct {
- EventID string `json:"event_id"`
-}
-
-type InReplyToRelation struct {
- InReplyTo InReplyToRelationContent `json:"m.in_reply_to"`
-}
-
-type ReplyMessage struct {
- RelatedTo InReplyToRelation `json:"m.relates_to"`
- matrix.TextMessage
-}
-
func New(cfg *bridge.Config) bridge.Bridger {
b := &Bmatrix{Config: cfg}
- b.RoomMap = make(map[string]string)
- b.NicknameMap = make(map[string]NicknameCacheEntry)
+ b.RoomMap = make(map[id.RoomID]RoomInfo)
+ b.NicknameCache = NewNicknameCache()
+ b.stopNormalSync = make(chan struct{}, 1)
+ b.stopNormalSyncAck = make(chan struct{}, 1)
return b
}
@@ -91,13 +64,13 @@ func (b *Bmatrix) Connect() error {
var err error
b.Log.Infof("Connecting %s", b.GetString("Server"))
if b.GetString("MxID") != "" && b.GetString("Token") != "" {
+ b.UserID = id.UserID(b.GetString("MxID"))
b.mc, err = matrix.NewClient(
- b.GetString("Server"), b.GetString("MxID"), b.GetString("Token"),
+ b.GetString("Server"), b.UserID, b.GetString("Token"),
)
if err != nil {
return err
}
- b.UserID = b.GetString("MxID")
b.Log.Info("Using existing Matrix credentials")
} else {
b.mc, err = matrix.NewClient(b.GetString("Server"), "", "")
@@ -105,102 +78,150 @@ func (b *Bmatrix) Connect() error {
return err
}
resp, err := b.mc.Login(&matrix.ReqLogin{
- Type: "m.login.password",
- User: b.GetString("Login"),
- Password: b.GetString("Password"),
- Identifier: matrix.NewUserIdentifier(b.GetString("Login")),
+ Type: matrix.AuthTypePassword,
+ Password: b.GetString("Password"),
+ Identifier: matrix.UserIdentifier{Type: matrix.IdentifierTypeUser, User: b.GetString("Login")}, //nolint: exhaustruct
+ StoreCredentials: true,
})
if err != nil {
return err
}
- b.mc.SetCredentials(resp.UserID, resp.AccessToken)
b.UserID = resp.UserID
b.Log.Info("Connection succeeded")
}
- go b.handlematrix()
+
+ b.Log.Debug("Retrieving the list of rooms we have already joined")
+ joinedRooms, err := b.mc.JoinedRooms()
+ if err != nil {
+ b.Log.Errorf("couldn't list the joined rooms")
+
+ return err
+ }
+ b.joinedRooms = joinedRooms.JoinedRooms
+ for _, roomID := range joinedRooms.JoinedRooms {
+ // leave the channel name (usually a channel alias - in the matrix sense)
+ // unresolved for now, it will be completed when JoinChannel() is called
+ b.RoomMap[roomID] = RoomInfo{name: "", appService: false}
+ }
+
return nil
}
func (b *Bmatrix) Disconnect() error {
+ // tell the Sync() loop to exit
+ b.stopNormalSync <- struct{}{}
+ b.mc.StopSync()
+
+ // wait for both the syncer and the appservice to terminate
+ <-b.stopNormalSyncAck
+ if b.appService != nil {
+ b.appService.stop <- struct{}{}
+ <-b.appService.stopAck
+ }
+
return nil
}
func (b *Bmatrix) JoinChannel(channel config.ChannelInfo) error {
- return b.retry(func() error {
- resp, err := b.mc.JoinRoom(channel.Name, "", nil)
+ resolvedAlias, err := b.mc.ResolveAlias(id.RoomAlias(channel.Name))
+ if err != nil {
+ b.Log.Errorf("couldn't retrieve the room ID for the alias '%s'", channel.Name)
+
+ return err
+ }
+
+ roomInfo := RoomInfo{name: channel.Name, appService: false}
+ alreadyJoined := false
+ for _, roomID := range b.joinedRooms {
+ // we have already joined this room (e.g. in a previous execution of matterbridge)
+ // => we only update the room alias, but do not attempt to join it again
+ if roomID == resolvedAlias.RoomID {
+ alreadyJoined = true
+ break
+ }
+ }
+
+ if !alreadyJoined {
+ err = b.retry(func() error {
+ _, innerErr := b.mc.JoinRoom(channel.Name, "", nil)
+ return innerErr
+ })
+
if err != nil {
return err
}
+ }
- b.Lock()
- b.RoomMap[resp.RoomID] = channel.Name
- b.Unlock()
+ b.Lock()
+ b.RoomMap[resolvedAlias.RoomID] = roomInfo
+ b.Unlock()
- return nil
- })
+ return nil
}
-func (b *Bmatrix) Send(msg config.Message) (string, error) {
- b.Log.Debugf("=> Receiving %#v", msg)
-
- channel := b.getRoomID(msg.Channel)
- b.Log.Debugf("Channel %s maps to channel id %s", msg.Channel, channel)
+func (b *Bmatrix) Start() error {
+ // at this point, JoinChannel() has been called on all the channels
+ // declared in the configuration, so we can exit every other joined room
+ // in order to stop receiving events from rooms we no longer follow
+ b.RLock()
+ for _, roomID := range b.joinedRooms {
+ if _, present := b.RoomMap[roomID]; !present {
+ // we deliberately ignore the return value,
+ // because the bridge will still work even if we couln't exit the room
+ _, _ = b.mc.LeaveRoom(roomID, &matrix.ReqLeave{Reason: "No longer bridged"})
+ }
+ }
+ b.RUnlock()
- username := newMatrixUsername(msg.Username)
+ go b.handlematrix()
- body := username.plain + msg.Text
- formattedBody := username.formatted + helper.ParseMarkdown(msg.Text)
+ if b.GetBool("UseAppService") {
+ appService, err := b.NewAppService()
+ if err != nil {
+ b.Log.Errorf("couldn't load the app service configuration: %#v", err)
- if b.GetBool("SpoofUsername") {
- // https://spec.matrix.org/v1.3/client-server-api/#mroommember
- type stateMember struct {
- AvatarURL string `json:"avatar_url,omitempty"`
- DisplayName string `json:"displayname"`
- Membership string `json:"membership"`
+ return err
}
- // TODO: reset username afterwards with DisplayName: null ?
- m := stateMember{
- AvatarURL: "",
- DisplayName: username.plain,
- Membership: "join",
- }
+ b.appService = appService
+ err = b.startAppService()
+ if err != nil {
+ b.Log.Errorf("couldn't start the application service: %#v", err)
- _, err := b.mc.SendStateEvent(channel, "m.room.member", b.UserID, m)
- if err == nil {
- body = msg.Text
- formattedBody = helper.ParseMarkdown(msg.Text)
+ return err
}
}
- // Make a action /me of the message
- if msg.Event == config.EventUserAction {
- m := matrix.TextMessage{
- MsgType: "m.emote",
- Body: body,
- FormattedBody: formattedBody,
- Format: "org.matrix.custom.html",
- }
+ return nil
+}
- if b.GetBool("HTMLDisable") {
- m.Format = ""
- m.FormattedBody = ""
- }
+func (b *Bmatrix) Send(msg config.Message) (string, error) {
+ b.Log.Debugf("=> Sending %#v", msg)
- msgID := ""
+ channel := b.getRoomID(msg.Channel)
+ if channel == "" {
+ return "", fmt.Errorf("got message for unknown channel '%s'", msg.Channel)
+ }
- err := b.retry(func() error {
- resp, err := b.mc.SendMessageEvent(channel, "m.room.message", m)
- if err != nil {
- return err
- }
+ if msg.Event == config.EventUserTyping && b.GetBool("ShowUserTyping") {
+ _, err := b.mc.UserTyping(channel, true, 15000)
+ return "", err
+ }
- msgID = resp.EventID
+ // Make a action /me of the message
+ if msg.Event == config.EventUserAction {
+ //nolint:exhaustruct
+ m := event.MessageEventContent{
+ MsgType: event.MsgEmote,
+ Body: msg.Text,
+ }
- return err
- })
+ if !b.GetBool("HTMLDisable") {
+ m.FormattedBody = helper.ParseMarkdown(msg.Text)
+ m.Format = event.FormatHTML
+ }
- return msgID, err
+ return b.sendMessageEventWithRetries(channel, m, msg.Username)
}
// Delete message
@@ -212,12 +233,10 @@ func (b *Bmatrix) Send(msg config.Message) (string, error) {
msgID := ""
err := b.retry(func() error {
- resp, err := b.mc.RedactEvent(channel, msg.ID, &matrix.ReqRedact{})
- if err != nil {
- return err
- }
+ //nolint:exhaustruct
+ resp, err := b.mc.RedactEvent(channel, id.EventID(msg.ID), matrix.ReqRedact{})
- msgID = resp.EventID
+ msgID = string(resp.EventID)
return err
})
@@ -228,13 +247,13 @@ func (b *Bmatrix) Send(msg config.Message) (string, error) {
// Upload a file if it exists
if msg.Extra != nil {
for _, rmsg := range helper.HandleExtra(&msg, b.General) {
- rmsg := rmsg
-
- err := b.retry(func() error {
- _, err := b.mc.SendText(channel, rmsg.Username+rmsg.Text)
+ //nolint:exhaustruct
+ m := event.MessageEventContent{
+ MsgType: event.MsgText,
+ Body: rmsg.Text,
+ }
- return err
- })
+ _, err := b.sendMessageEventWithRetries(channel, m, msg.Username)
if err != nil {
b.Log.Errorf("sendText failed: %s", err)
}
@@ -247,472 +266,159 @@ func (b *Bmatrix) Send(msg config.Message) (string, error) {
// Edit message if we have an ID
if msg.ID != "" {
- rmsg := EditedMessage{
- TextMessage: matrix.TextMessage{
- Body: body,
- MsgType: "m.text",
- Format: "org.matrix.custom.html",
- FormattedBody: formattedBody,
- },
+ //nolint:exhaustruct
+ rmsg := event.MessageEventContent{
+ MsgType: event.MsgText,
+ Body: msg.Text,
}
-
- rmsg.NewContent = SubTextMessage{
- Body: rmsg.TextMessage.Body,
- FormattedBody: rmsg.TextMessage.FormattedBody,
- Format: rmsg.TextMessage.Format,
- MsgType: "m.text",
+ //nolint:exhaustruct
+ rmsg.NewContent = &event.MessageEventContent{
+ Body: rmsg.Body,
+ MsgType: event.MsgText,
}
-
if b.GetBool("HTMLDisable") {
- rmsg.TextMessage.Format = ""
- rmsg.TextMessage.FormattedBody = ""
- rmsg.NewContent.Format = ""
- rmsg.NewContent.FormattedBody = ""
+ rmsg.FormattedBody = "* " + msg.Text
+ } else {
+ rmsg.Format = event.FormatHTML
+ rmsg.FormattedBody = "* " + helper.ParseMarkdown(msg.Text)
+ rmsg.NewContent.Format = rmsg.Format
+ rmsg.NewContent.FormattedBody = rmsg.FormattedBody
}
- rmsg.RelatedTo = MessageRelation{
- EventID: msg.ID,
- Type: "m.replace",
+ //nolint:exhaustruct
+ rmsg.RelatesTo = &event.RelatesTo{
+ EventID: id.EventID(msg.ID),
+ Type: event.RelReplace,
}
- err := b.retry(func() error {
- _, err := b.mc.SendMessageEvent(channel, "m.room.message", rmsg)
+ return b.sendMessageEventWithRetries(channel, rmsg, msg.Username)
+ }
- return err
- })
- if err != nil {
- return "", err
- }
+ //nolint:exhaustruct
+ m := event.MessageEventContent{
+ Body: msg.Text,
+ }
- return msg.ID, nil
+ if !b.GetBool("HTMLDisable") {
+ m.Format = event.FormatHTML
+ m.FormattedBody = msg.Text
}
// Use notices to send join/leave events
if msg.Event == config.EventJoinLeave {
- m := matrix.TextMessage{
- MsgType: "m.notice",
- Body: body,
- FormattedBody: formattedBody,
- Format: "org.matrix.custom.html",
- }
-
+ m.MsgType = event.MsgNotice
+ } else {
+ m.MsgType = event.MsgText
if b.GetBool("HTMLDisable") {
- m.Format = ""
m.FormattedBody = ""
+ } else {
+ m.FormattedBody = helper.ParseMarkdown(msg.Text)
}
- var (
- resp *matrix.RespSendEvent
- err error
- )
-
- err = b.retry(func() error {
- resp, err = b.mc.SendMessageEvent(channel, "m.room.message", m)
-
- return err
- })
- if err != nil {
- return "", err
- }
-
- return resp.EventID, err
- }
-
- if msg.ParentValid() {
- m := ReplyMessage{
- TextMessage: matrix.TextMessage{
- MsgType: "m.text",
- Body: body,
- FormattedBody: formattedBody,
- Format: "org.matrix.custom.html",
- },
- }
-
- if b.GetBool("HTMLDisable") {
- m.TextMessage.Format = ""
- m.TextMessage.FormattedBody = ""
- }
-
- m.RelatedTo = InReplyToRelation{
- InReplyTo: InReplyToRelationContent{
- EventID: msg.ParentID,
- },
- }
-
- var (
- resp *matrix.RespSendEvent
- err error
- )
-
- err = b.retry(func() error {
- resp, err = b.mc.SendMessageEvent(channel, "m.room.message", m)
-
- return err
- })
- if err != nil {
- return "", err
- }
-
- return resp.EventID, err
- }
-
- if b.GetBool("HTMLDisable") {
- var (
- resp *matrix.RespSendEvent
- err error
- )
-
- err = b.retry(func() error {
- resp, err = b.mc.SendText(channel, body)
-
- return err
- })
- if err != nil {
- return "", err
- }
-
- return resp.EventID, err
- }
-
- // Post normal message with HTML support (eg riot.im)
- var (
- resp *matrix.RespSendEvent
- err error
- )
-
- err = b.retry(func() error {
- resp, err = b.mc.SendFormattedText(channel, body, formattedBody)
-
- return err
- })
- if err != nil {
- return "", err
- }
-
- return resp.EventID, err
-}
-
-func (b *Bmatrix) handlematrix() {
- syncer := b.mc.Syncer.(*matrix.DefaultSyncer)
- syncer.OnEventType("m.room.redaction", b.handleEvent)
- syncer.OnEventType("m.room.message", b.handleEvent)
- syncer.OnEventType("m.room.member", b.handleMemberChange)
- go func() {
- for {
- if b == nil {
- return
- }
- if err := b.mc.Sync(); err != nil {
- b.Log.Println("Sync() returned ", err)
+ if msg.ParentValid() {
+ m.RelatesTo = &event.RelatesTo{
+ EventID: "",
+ Type: event.RelReference,
+ InReplyTo: &event.InReplyTo{
+ EventID: id.EventID(msg.ParentID),
+ },
+ Key: "",
}
}
- }()
-}
-
-func (b *Bmatrix) handleEdit(ev *matrix.Event, rmsg config.Message) bool {
- relationInterface, present := ev.Content["m.relates_to"]
- newContentInterface, present2 := ev.Content["m.new_content"]
- if !(present && present2) {
- return false
}
- var relation MessageRelation
- if err := interface2Struct(relationInterface, &relation); err != nil {
- b.Log.Warnf("Couldn't parse 'm.relates_to' object with value %#v", relationInterface)
- return false
- }
-
- var newContent SubTextMessage
- if err := interface2Struct(newContentInterface, &newContent); err != nil {
- b.Log.Warnf("Couldn't parse 'm.new_content' object with value %#v", newContentInterface)
- return false
- }
-
- if relation.Type != "m.replace" {
- return false
- }
-
- rmsg.ID = relation.EventID
- rmsg.Text = newContent.Body
- b.Remote <- rmsg
-
- return true
+ return b.sendMessageEventWithRetries(channel, m, msg.Username)
}
-func (b *Bmatrix) handleReply(ev *matrix.Event, rmsg config.Message) bool {
- relationInterface, present := ev.Content["m.relates_to"]
- if !present {
- return false
- }
-
- var relation InReplyToRelation
- if err := interface2Struct(relationInterface, &relation); err != nil {
- // probably fine
- return false
- }
-
- body := rmsg.Text
-
- if !b.GetBool("keepquotedreply") {
- for strings.HasPrefix(body, "> ") {
- lineIdx := strings.IndexRune(body, '\n')
- if lineIdx == -1 {
- body = ""
- } else {
- body = body[(lineIdx + 1):]
+// DontProcessOldEvents returns true if a sync event should be considered for further processing.
+// We use that function to filter out events we have already read.
+func (b *Bmatrix) DontProcessOldEvents(resp *matrix.RespSync, since string) bool {
+ // we only filter old events in the initial sync(), because subsequent sync()
+ // (where since != "") should only return new events
+ if since != "" {
+ return true
+ }
+
+ for joinedRoom, roomData := range resp.Rooms.Join {
+ var readTimestamp int64 = 0
+ // retrieve the timestamp of the last read receipt
+ // note: we're not sure some events will not be thrown away in this
+ // initial sync, as the server may not have received some events yet when
+ // the read receipt was sent: there is a mix of timestamps between
+ // the read receipt on the target homeserver and the timestamps when
+ // events were *created* on the homeserver peers
+ for _, evt := range roomData.Ephemeral.Events {
+ if evt.Type != event.EphemeralEventReceipt {
+ continue
}
- }
- }
-
- rmsg.Text = body
- rmsg.ParentID = relation.InReplyTo.EventID
- b.Remote <- rmsg
-
- return true
-}
-
-func (b *Bmatrix) handleMemberChange(ev *matrix.Event) {
- // Update the displayname on join messages, according to https://matrix.org/docs/spec/client_server/r0.6.1#events-on-change-of-profile-information
- if ev.Content["membership"] == "join" {
- if dn, ok := ev.Content["displayname"].(string); ok {
- b.cacheDisplayName(ev.Sender, dn)
- }
- }
-}
-
-func (b *Bmatrix) handleEvent(ev *matrix.Event) {
- b.Log.Debugf("== Receiving event: %#v", ev)
- if ev.Sender != b.UserID {
- b.RLock()
- channel, ok := b.RoomMap[ev.RoomID]
- b.RUnlock()
- if !ok {
- b.Log.Debugf("Unknown room %s", ev.RoomID)
- return
- }
-
- // Create our message
- rmsg := config.Message{
- Username: b.getDisplayName(ev.Sender),
- Channel: channel,
- Account: b.Account,
- UserID: ev.Sender,
- ID: ev.ID,
- Avatar: b.getAvatarURL(ev.Sender),
- }
-
- // Remove homeserver suffix if configured
- if b.GetBool("NoHomeServerSuffix") {
- re := regexp.MustCompile("(.*?):.*")
- rmsg.Username = re.ReplaceAllString(rmsg.Username, `$1`)
- }
-
- // Delete event
- if ev.Type == "m.room.redaction" {
- rmsg.Event = config.EventMsgDelete
- rmsg.ID = ev.Redacts
- rmsg.Text = config.EventMsgDelete
- b.Remote <- rmsg
- return
- }
-
- // Text must be a string
- if rmsg.Text, ok = ev.Content["body"].(string); !ok {
- b.Log.Errorf("Content[body] is not a string: %T\n%#v",
- ev.Content["body"], ev.Content)
- return
- }
-
- // Do we have a /me action
- if ev.Content["msgtype"].(string) == "m.emote" {
- rmsg.Event = config.EventUserAction
- }
-
- // Is it an edit?
- if b.handleEdit(ev, rmsg) {
- return
- }
-
- // Is it a reply?
- if b.handleReply(ev, rmsg) {
- return
- }
- // Do we have attachments
- if b.containsAttachment(ev.Content) {
- err := b.handleDownloadFile(&rmsg, ev.Content)
+ err := evt.Content.ParseRaw(evt.Type)
if err != nil {
- b.Log.Errorf("download failed: %#v", err)
+ b.Log.Warnf("couldn't parse receipt event %#v", evt.Content)
+ }
+ receipts := *evt.Content.AsReceipt()
+ for _, receiptByType := range receipts {
+ for _, receiptsByUser := range receiptByType {
+ for userID, userReceipt := range receiptsByUser {
+ // ignore read receipts of other users
+ if userID != b.UserID {
+ continue
+ }
+
+ readTimestamp = userReceipt.Timestamp.UnixNano()
+ }
+ }
}
}
- b.Log.Debugf("<= Sending message from %s on %s to gateway", ev.Sender, b.Account)
- b.Remote <- rmsg
-
- // not crucial, so no ratelimit check here
- if err := b.mc.MarkRead(ev.RoomID, ev.ID); err != nil {
- b.Log.Errorf("couldn't mark message as read %s", err.Error())
- }
- }
-}
-
-// handleDownloadFile handles file download
-func (b *Bmatrix) handleDownloadFile(rmsg *config.Message, content map[string]interface{}) error {
- var (
- ok bool
- url, name, msgtype, mtype string
- info map[string]interface{}
- size float64
- )
-
- rmsg.Extra = make(map[string][]interface{})
- if url, ok = content["url"].(string); !ok {
- return fmt.Errorf("url isn't a %T", url)
- }
- url = strings.Replace(url, "mxc://", b.GetString("Server")+"/_matrix/media/v1/download/", -1)
-
- if info, ok = content["info"].(map[string]interface{}); !ok {
- return fmt.Errorf("info isn't a %T", info)
- }
- if size, ok = info["size"].(float64); !ok {
- return fmt.Errorf("size isn't a %T", size)
- }
- if name, ok = content["body"].(string); !ok {
- return fmt.Errorf("name isn't a %T", name)
- }
- if msgtype, ok = content["msgtype"].(string); !ok {
- return fmt.Errorf("msgtype isn't a %T", msgtype)
- }
- if mtype, ok = info["mimetype"].(string); !ok {
- return fmt.Errorf("mtype isn't a %T", mtype)
- }
-
- // check if we have an image uploaded without extension
- if !strings.Contains(name, ".") {
- if msgtype == "m.image" {
- mext, _ := mime.ExtensionsByType(mtype)
- if len(mext) > 0 {
- name += mext[0]
+ newEventList := make([]*event.Event, 0, len(roomData.Timeline.Events))
+ for _, evt := range roomData.Timeline.Events {
+ // remove old event, except for state changes
+ if evt.Timestamp > readTimestamp || evt.Type.Class == event.StateEventType {
+ newEventList = append(newEventList, evt)
}
- } else {
- // just a default .png extension if we don't have mime info
- name += ".png"
}
- }
-
- // check if the size is ok
- err := helper.HandleDownloadSize(b.Log, rmsg, name, int64(size), b.General)
- if err != nil {
- return err
- }
- // actually download the file
- data, err := helper.DownloadFile(url)
- if err != nil {
- return fmt.Errorf("download %s failed %#v", url, err)
- }
- // add the downloaded data to the message
- helper.HandleDownloadData(b.Log, rmsg, name, "", url, data, b.General)
- return nil
-}
-// handleUploadFiles handles native upload of files.
-func (b *Bmatrix) handleUploadFiles(msg *config.Message, channel string) (string, error) {
- for _, f := range msg.Extra["file"] {
- if fi, ok := f.(config.FileInfo); ok {
- b.handleUploadFile(msg, channel, &fi)
- }
+ roomData.Timeline.Events = newEventList
+ resp.Rooms.Join[joinedRoom] = roomData
}
- return "", nil
+ return true
}
-// handleUploadFile handles native upload of a file.
-func (b *Bmatrix) handleUploadFile(msg *config.Message, channel string, fi *config.FileInfo) {
- username := newMatrixUsername(msg.Username)
- content := bytes.NewReader(*fi.Data)
- sp := strings.Split(fi.Name, ".")
- mtype := mime.TypeByExtension("." + sp[len(sp)-1])
- // image and video uploads send no username, we have to do this ourself here #715
- err := b.retry(func() error {
- _, err := b.mc.SendFormattedText(channel, username.plain+fi.Comment, username.formatted+fi.Comment)
-
- return err
- })
- if err != nil {
- b.Log.Errorf("file comment failed: %#v", err)
- }
-
- b.Log.Debugf("uploading file: %s %s", fi.Name, mtype)
-
- var res *matrix.RespMediaUpload
-
- err = b.retry(func() error {
- res, err = b.mc.UploadToContentRepo(content, mtype, int64(len(*fi.Data)))
-
- return err
- })
+func (b *Bmatrix) handlematrix() {
+ syncer, ok := b.mc.Syncer.(*matrix.DefaultSyncer)
+ if !ok {
+ b.Log.Errorf("couldn't convert the Syncer object to a DefaultSyncer structure, the matrix bridge won't work")
- if err != nil {
- b.Log.Errorf("file upload failed: %#v", err)
return
}
- switch {
- case strings.Contains(mtype, "video"):
- b.Log.Debugf("sendVideo %s", res.ContentURI)
- err = b.retry(func() error {
- _, err = b.mc.SendVideo(channel, fi.Name, res.ContentURI)
+ // register our custom filtering function
+ syncer.OnSync(b.DontProcessOldEvents)
- return err
+ eventsTypes := []event.Type{event.EventRedaction, event.EventMessage, event.StateMember, event.EphemeralEventReceipt}
+ if b.GetBool("ShowUserTyping") {
+ eventsTypes = append(eventsTypes, event.EphemeralEventTyping)
+ }
+ for _, evType := range eventsTypes {
+ syncer.OnEventType(evType, func(source matrix.EventSource, ev *event.Event) {
+ b.handleEvent(originClassicSyncer, ev)
})
- if err != nil {
- b.Log.Errorf("sendVideo failed: %#v", err)
- }
- case strings.Contains(mtype, "image"):
- b.Log.Debugf("sendImage %s", res.ContentURI)
- err = b.retry(func() error {
- _, err = b.mc.SendImage(channel, fi.Name, res.ContentURI)
+ }
- return err
- })
- if err != nil {
- b.Log.Errorf("sendImage failed: %#v", err)
- }
- case strings.Contains(mtype, "audio"):
- b.Log.Debugf("sendAudio %s", res.ContentURI)
- err = b.retry(func() error {
- _, err = b.mc.SendMessageEvent(channel, "m.room.message", matrix.AudioMessage{
- MsgType: "m.audio",
- Body: fi.Name,
- URL: res.ContentURI,
- Info: matrix.AudioInfo{
- Mimetype: mtype,
- Size: uint(len(*fi.Data)),
- },
- })
+ go func() {
+ for {
+ select {
+ case <-b.stopNormalSync:
+ b.stopNormalSyncAck <- struct{}{}
- return err
- })
- if err != nil {
- b.Log.Errorf("sendAudio failed: %#v", err)
- }
- default:
- b.Log.Debugf("sendFile %s", res.ContentURI)
- err = b.retry(func() error {
- _, err = b.mc.SendMessageEvent(channel, "m.room.message", matrix.FileMessage{
- MsgType: "m.file",
- Body: fi.Name,
- URL: res.ContentURI,
- Info: matrix.FileInfo{
- Mimetype: mtype,
- Size: uint(len(*fi.Data)),
- },
- })
+ return
+ default:
- return err
- })
- if err != nil {
- b.Log.Errorf("sendFile failed: %#v", err)
+ if err := b.mc.Sync(); err != nil {
+ b.Log.Warningf("Sync() returned %#v", err)
+ }
+ }
}
- }
- b.Log.Debugf("result: %#v", res)
+ }()
}
diff --git a/bridge/matrix/zerolog_abstraction.go b/bridge/matrix/zerolog_abstraction.go
new file mode 100644
index 00000000..3d74a4e0
--- /dev/null
+++ b/bridge/matrix/zerolog_abstraction.go
@@ -0,0 +1,43 @@
+package bmatrix
+
+import (
+ "errors"
+
+ "github.com/sirupsen/logrus"
+
+ "github.com/rs/zerolog"
+)
+
+var levels_zerolog2logrus = map[zerolog.Level]logrus.Level{
+ zerolog.DebugLevel: logrus.DebugLevel,
+ zerolog.InfoLevel: logrus.InfoLevel,
+ zerolog.WarnLevel: logrus.WarnLevel,
+ zerolog.FatalLevel: logrus.FatalLevel,
+ zerolog.PanicLevel: logrus.PanicLevel,
+ zerolog.ErrorLevel: logrus.ErrorLevel,
+ zerolog.TraceLevel: logrus.TraceLevel,
+}
+
+// an abstraction for zerolog so we can pipe its output to logrus.Entry, that is used in matterbridge
+type zerologWrapper struct {
+ inner *logrus.Entry
+}
+
+func (w zerologWrapper) Write(p []byte) (n int, err error) {
+ return w.inner.Logger.Writer().Write(p)
+}
+
+func (w zerologWrapper) WriteLevel(level zerolog.Level, p []byte) (n int, err error) {
+ if logrus_level, present := levels_zerolog2logrus[level]; present {
+ return w.inner.Logger.WriterLevel(logrus_level).Write(p)
+ }
+ // drop the message if we haven't a matching level
+ return 0, errors.New("Unsupported logging level")
+}
+
+func NewZerologWrapper(entry *logrus.Entry) zerolog.Logger {
+ wrapper := zerologWrapper{inner: entry}
+ log := zerolog.New(wrapper)
+
+ return log
+}