diff options
author | msglm <msglm@techchud.xyz> | 2023-10-27 07:08:25 -0500 |
---|---|---|
committer | msglm <msglm@techchud.xyz> | 2023-10-27 07:08:25 -0500 |
commit | 032a7e0c1188d3507b8d9a9571f2446a43cf775b (patch) | |
tree | 2bd38c01bc7761a6195e426082ce7191ebc765a1 /bridge/matrix | |
parent | 56e7bd01ca09ad52b0c4f48f146a20a4f1b78696 (diff) | |
download | matterbridge-msglm-032a7e0c1188d3507b8d9a9571f2446a43cf775b.tar.gz matterbridge-msglm-032a7e0c1188d3507b8d9a9571f2446a43cf775b.tar.bz2 matterbridge-msglm-032a7e0c1188d3507b8d9a9571f2446a43cf775b.zip |
apply https://github.com/42wim/matterbridge/pull/1864v1.26.0+0.1.0
Diffstat (limited to 'bridge/matrix')
-rw-r--r-- | bridge/matrix/appservice.go | 147 | ||||
-rw-r--r-- | bridge/matrix/handlers.go | 344 | ||||
-rw-r--r-- | bridge/matrix/helpers.go | 367 | ||||
-rw-r--r-- | bridge/matrix/matrix.go | 806 | ||||
-rw-r--r-- | bridge/matrix/zerolog_abstraction.go | 43 |
5 files changed, 1057 insertions, 650 deletions
diff --git a/bridge/matrix/appservice.go b/bridge/matrix/appservice.go new file mode 100644 index 00000000..ef172bff --- /dev/null +++ b/bridge/matrix/appservice.go @@ -0,0 +1,147 @@ +package bmatrix + +import ( + "fmt" + "regexp" + + "github.com/sirupsen/logrus" + + "maunium.net/go/mautrix/appservice" + "maunium.net/go/mautrix/event" + "maunium.net/go/mautrix/id" +) + +type AppServiceNamespaces struct { + rooms []*regexp.Regexp + usernames []*regexp.Regexp + prefixes []string +} + +type AppServiceWrapper struct { + appService *appservice.AppService + namespaces AppServiceNamespaces + stop chan struct{} + stopAck chan struct{} +} + +func (w *AppServiceWrapper) ParseNamespaces(logger *logrus.Entry) error { + if w.appService.Registration != nil { + // TODO: handle non-exclusive registrations + for _, v := range w.appService.Registration.Namespaces.RoomIDs { + re, err := regexp.Compile(v.Regex) + if err != nil { + logger.Warnf("couldn't parse the appservice regex '%s'", v.Regex) + continue + } + + w.namespaces.rooms = append(w.namespaces.rooms, re) + } + + for _, v := range w.appService.Registration.Namespaces.UserIDs { + re, err := regexp.Compile(v.Regex) + if err != nil { + logger.Warnf("couldn't parse the appservice regex '%s'", v.Regex) + continue + } + + // we assume that the user regexes will be of the form '@<some prefix>.*' + // where '.*' will be replaced by the username we spoof + prefix, _ := re.LiteralPrefix() + if prefix == "" || prefix == "@" { + logger.Warnf("couldn't find an acceptable prefix in the appservice regex '%s'", v.Regex) + continue + } + + if v.Regex != fmt.Sprintf("%s.*", prefix) { + logger.Warnf("complex regexpes are not supported for appServices, the regexp '%s' does not match the format '@<prefix>.*'", v.Regex) + continue + } + + w.namespaces.usernames = append(w.namespaces.usernames, re) + // drop the '@' in the prefix + w.namespaces.prefixes = append(w.namespaces.prefixes, prefix[1:]) + } + } + + return nil +} + +func (b *Bmatrix) NewAppService() (*AppServiceWrapper, error) { + w := &AppServiceWrapper{ + appService: appservice.Create(), + namespaces: AppServiceNamespaces{ + rooms: []*regexp.Regexp{}, + usernames: []*regexp.Regexp{}, + prefixes: []string{}, + }, + stop: make(chan struct{}, 1), + stopAck: make(chan struct{}, 1), + } + + err := w.appService.SetHomeserverURL(b.mc.HomeserverURL.String()) + if err != nil { + return nil, err + } + + _, homeServerDomain, _ := b.mc.UserID.Parse() + w.appService.HomeserverDomain = homeServerDomain + //nolint:exhaustruct + w.appService.Host = appservice.HostConfig{ + Hostname: b.GetString("AppServiceHost"), + Port: uint16(b.GetInt("AppServicePort")), + } + w.appService.Registration, err = appservice.LoadRegistration(b.GetString("AppServiceConfigPath")) + if err != nil { + return nil, err + } + + // forward logs from the appService to the matterbridge logger + w.appService.Log = NewZerologWrapper(b.Log) + + if err = w.ParseNamespaces(b.Log); err != nil { + return nil, err + } + + return w, nil +} + +func (a *AppServiceNamespaces) containsRoom(roomID id.RoomID) bool { + // no room specified: we check all the rooms + if len(a.rooms) == 0 { + return true + } + + for _, room := range a.rooms { + if room.MatchString(roomID.String()) { + return true + } + } + + return false +} + +// nolint: wrapcheck +func (b *Bmatrix) startAppService() error { + wrapper := b.appService + // TODO: detect service completion and rerun automatically + go wrapper.appService.Start() + b.Log.Debug("appservice launched") + + processor := appservice.NewEventProcessor(wrapper.appService) + processor.On(event.EventMessage, func(ev *event.Event) { + b.handleEvent(originAppService, ev) + }) + go processor.Start() + b.Log.Debug("appservice event dispatcher launched") + + // handle service stopping/restarting + go func(appService *appservice.AppService, processor *appservice.EventProcessor) { + <-wrapper.stop + + appService.Stop() + processor.Stop() + wrapper.stopAck <- struct{}{} + }(wrapper.appService, processor) + + return nil +} diff --git a/bridge/matrix/handlers.go b/bridge/matrix/handlers.go new file mode 100644 index 00000000..8833b0ed --- /dev/null +++ b/bridge/matrix/handlers.go @@ -0,0 +1,344 @@ +package bmatrix + +import ( + "bytes" + "fmt" + "mime" + "regexp" + "strings" + + matrix "maunium.net/go/mautrix" + "maunium.net/go/mautrix/event" + "maunium.net/go/mautrix/id" + + "github.com/42wim/matterbridge/bridge/config" + "github.com/42wim/matterbridge/bridge/helper" +) + +// Determines if the event comes from ourselves, in which case we want to ignore it +func (b *Bmatrix) ignoreBridgingEvents(ev *event.Event) bool { + if ev.Sender == b.UserID { + return true + } + + // ignore messages we may have sent via the appservice + if b.appService != nil { + if ev.Sender == b.appService.appService.BotClient().UserID { + return true + } + + // ignore virtual users messages (we ignore the 'exclusive' field of Namespace for now) + for _, username := range b.appService.namespaces.usernames { + if username.MatchString(ev.Sender.String()) { + return true + } + } + } + + return false +} + +//nolint: funlen +func (b *Bmatrix) handleEvent(origin EventOrigin, ev *event.Event) { + if b.ignoreBridgingEvents(ev) { + return + } + + b.RLock() + channel, ok := b.RoomMap[ev.RoomID] + b.RUnlock() + if !ok { + // we don't know that room yet, that could be a room returned by an + // application service, but matterbridge doesn't handle those just yet + b.Log.Debugf("Received event for room %s, not joined yet/not handled", ev.RoomID) + + return + } + + if ev.Type == event.EphemeralEventReceipt { + // we do not support read receipts across servers, considering that + // multiple services (e.g. Discord) doesn't expose that information) + return + } + + if ev.Type == event.StateMember { + b.handleMemberChange(ev) + + return + } + + // if we receive appservice events for this room, there is no need to check them with the classical syncer + if !channel.appService && origin == originAppService { + channel.appService = true + b.Lock() + b.RoomMap[ev.RoomID] = channel + b.Unlock() + } + + // if we receive messages both via the classical matrix syncer and appserver, prefer appservice and throw away this duplicate event + if channel.appService && origin != originAppService { + b.Log.Debugf("Dropping event, should receive it via appservice: %s", ev.ID) + + return + } + + b.Log.Debugf("== Receiving event: %#v (appService=%t)", ev, origin == originAppService) + + if ev.Type == event.EphemeralEventTyping { + typing := ev.Content.AsTyping() + if len(typing.UserIDs) > 0 { + //nolint:exhaustruct + b.Remote <- config.Message{ + Event: config.EventUserTyping, + Channel: channel.name, + Account: b.Account, + } + } + + return + } + + defer (func(ev *event.Event) { + // not crucial, so no ratelimit check here + if err := b.mc.MarkRead(ev.RoomID, ev.ID); err != nil { + b.Log.Errorf("couldn't mark message as read %s", err.Error()) + } + })(ev) + + // Create our message + //nolint:exhaustruct + rmsg := config.Message{ + Username: b.getDisplayName(ev.RoomID, ev.Sender), + Channel: channel.name, + Account: b.Account, + UserID: string(ev.Sender), + ID: string(ev.ID), + } + + // Remove homeserver suffix if configured + if b.GetBool("NoHomeServerSuffix") { + re := regexp.MustCompile("(.*?):.*") + rmsg.Username = re.ReplaceAllString(rmsg.Username, `$1`) + } + + // Delete event + if ev.Type == event.EventRedaction { + rmsg.Event = config.EventMsgDelete + rmsg.ID = string(ev.Redacts) + rmsg.Text = config.EventMsgDelete + b.Remote <- rmsg + + return + } + + b.handleMessage(rmsg, ev) +} + +func (b *Bmatrix) handleMemberChange(ev *event.Event) { + member := ev.Content.AsMember() + if member == nil { + b.Log.Errorf("Couldn't process a member event:\n%#v", ev) + + return + } + + // Update the displayname on join messages, according to https://spec.matrix.org/v1.3/client-server-api/#events-on-change-of-profile-information + if member.Membership == event.MembershipJoin { + b.cacheDisplayName(ev.RoomID, ev.Sender, member.Displayname) + } else if member.Membership == event.MembershipLeave || member.Membership == event.MembershipBan { + b.removeDisplayNameFromCache(ev.Sender) + } +} + +func (b *Bmatrix) handleMessage(rmsg config.Message, ev *event.Event) { + msg := ev.Content.AsMessage() + if msg == nil { + b.Log.Errorf("matterbridge don't support this event type: %s", ev.Type.Type) + b.Log.Debugf("Full event: %#v", ev) + + return + } + + rmsg.Text = msg.Body + + // TODO: cache the avatars + avatarURL := b.getAvatarURL(ev.Sender) + contentURI, err := id.ParseContentURI(avatarURL) + if err == nil { + avatarURL = b.mc.GetDownloadURL(contentURI) + rmsg.Avatar = avatarURL + } + + // Do we have a /me action + //nolint: exhaustive + switch msg.MsgType { + case event.MsgEmote: + rmsg.Event = config.EventUserAction + case event.MsgImage, event.MsgVideo, event.MsgFile: + // Do we have attachments? (we only allow images, videos or files msgtypes) + err := b.handleDownloadFile(&rmsg, *msg) + if err != nil { + b.Log.Errorf("download failed: %#v", err) + } + default: + if msg.RelatesTo == nil { + break + } + + if msg.RelatesTo.Type == event.RelReplace && msg.NewContent != nil { + // Is it an edit? + rmsg.ID = string(msg.RelatesTo.EventID) + rmsg.Text = msg.NewContent.Body + } else if msg.RelatesTo.Type == event.RelReference && msg.RelatesTo.InReplyTo != nil { + // Is it a reply? + body := msg.Body + if !b.GetBool("keepquotedreply") { + for strings.HasPrefix(body, "> ") { + lineIdx := strings.Index(body, "\n\n") + if lineIdx == -1 { + break + } + + body = body[(lineIdx + 2):] + } + } + + rmsg.ParentID = string(msg.RelatesTo.EventID) + rmsg.Text = body + } + } + + b.Log.Debugf("<= Sending message from %s on %s to gateway", ev.Sender, b.Account) + b.Remote <- rmsg +} + +// handleDownloadFile handles file download +func (b *Bmatrix) handleDownloadFile(rmsg *config.Message, msg event.MessageEventContent) error { + rmsg.Extra = make(map[string][]interface{}) + if msg.URL == "" || msg.Info == nil { + b.Log.Error("couldn't download a file with no URL or no file informations (invalid event ?)") + b.Log.Debugf("Full Message content:\n%#v", msg) + } + + url := strings.ReplaceAll(string(msg.URL), "mxc://", b.GetString("Server")+"/_matrix/media/v1/download/") + filename := msg.Body + + // check if we have an image uploaded without extension + if !strings.Contains(filename, ".") { + mext, _ := mime.ExtensionsByType(msg.Info.MimeType) + if len(mext) > 0 { + filename += mext[0] + } else if msg.MsgType == event.MsgImage { + // just a default .png extension if we don't have mime info + filename += ".png" + } + } + + // check if the size is ok + err := helper.HandleDownloadSize(b.Log, rmsg, filename, int64(msg.Info.Size), b.General) + if err != nil { + return err + } + // actually download the file + data, err := helper.DownloadFile(url) + if err != nil { + return fmt.Errorf("download %s failed %#v", url, err) + } + // add the downloaded data to the message + helper.HandleDownloadData(b.Log, rmsg, filename, "", url, data, b.General) + return nil +} + +// handleUploadFiles handles native upload of files. +func (b *Bmatrix) handleUploadFiles(msg *config.Message, channel id.RoomID) (string, error) { + for _, f := range msg.Extra["file"] { + if fi, ok := f.(config.FileInfo); ok { + b.handleUploadFile(msg, channel, &fi) + } + } + return "", nil +} + +// handleUploadFile handles native upload of a file. +//nolint: funlen +func (b *Bmatrix) handleUploadFile(msg *config.Message, channel id.RoomID, fi *config.FileInfo) { + content := bytes.NewReader(*fi.Data) + sp := strings.Split(fi.Name, ".") + mtype := mime.TypeByExtension("." + sp[len(sp)-1]) + + // image and video uploads send no username, we have to do this ourself here #715 + //nolint:exhaustruct + m := event.MessageEventContent{ + MsgType: event.MsgText, + Body: fi.Comment, + FormattedBody: fi.Comment, + } + + _, err := b.sendMessageEventWithRetries(channel, m, msg.Username) + if err != nil { + b.Log.Errorf("file comment failed: %#v", err) + } + + b.Log.Debugf("uploading file: %s %s", fi.Name, mtype) + + var res *matrix.RespMediaUpload + //nolint:exhaustruct + req := matrix.ReqUploadMedia{ + Content: content, + ContentType: mtype, + ContentLength: fi.Size, + } + + err = b.retry(func() error { + res, err = b.mc.UploadMedia(req) + + return err + }) + + if err != nil { + b.Log.Errorf("file upload failed: %#v", err) + return + } + + b.Log.Debugf("result: %#v", res) + + //nolint:exhaustruct + m = event.MessageEventContent{ + Body: fi.Name, + URL: res.ContentURI.CUString(), + } + + switch { + case strings.Contains(mtype, "video"): + b.Log.Debugf("sendVideo %s", res.ContentURI) + + m.MsgType = event.MsgVideo + case strings.Contains(mtype, "image"): + b.Log.Debugf("sendImage %s", res.ContentURI) + + m.MsgType = event.MsgImage + case strings.Contains(mtype, "audio"): + b.Log.Debugf("sendAudio %s", res.ContentURI) + + m.MsgType = event.MsgAudio + //nolint:exhaustruct + m.Info = &event.FileInfo{ + MimeType: mtype, + Size: len(*fi.Data), + } + default: + b.Log.Debugf("sendFile %s", res.ContentURI) + + m.MsgType = event.MsgFile + //nolint:exhaustruct + m.Info = &event.FileInfo{ + MimeType: mtype, + Size: len(*fi.Data), + } + } + + _, err = b.sendMessageEventWithRetries(channel, m, msg.Username) + if err != nil { + b.Log.Errorf("sending the message referencing the uploaded file failed: %#v", err) + } +} diff --git a/bridge/matrix/helpers.go b/bridge/matrix/helpers.go index 5a91f748..93031e1d 100644 --- a/bridge/matrix/helpers.go +++ b/bridge/matrix/helpers.go @@ -1,16 +1,21 @@ package bmatrix import ( - "encoding/json" "errors" "fmt" "html" - "strings" + "sort" + "sync" "time" - matrix "github.com/matterbridge/gomatrix" + matrix "maunium.net/go/mautrix" + "maunium.net/go/mautrix/event" + "maunium.net/go/mautrix/id" ) +// arbitrary limit to determine when to cleanup nickname cache entries +const MaxNumberOfUsersInCache = 50_000 + func newMatrixUsername(username string) *matrixUsername { mUsername := new(matrixUsername) @@ -28,11 +33,11 @@ func newMatrixUsername(username string) *matrixUsername { } // getRoomID retrieves a matching room ID from the channel name. -func (b *Bmatrix) getRoomID(channel string) string { +func (b *Bmatrix) getRoomID(channelName string) id.RoomID { b.RLock() defer b.RUnlock() - for ID, name := range b.RoomMap { - if name == channel { + for ID, channel := range b.RoomMap { + if channelName == channel.name { return ID } } @@ -40,31 +45,59 @@ func (b *Bmatrix) getRoomID(channel string) string { return "" } -// interface2Struct marshals and immediately unmarshals an interface. -// Useful for converting map[string]interface{} to a struct. -func interface2Struct(in interface{}, out interface{}) error { - jsonObj, err := json.Marshal(in) - if err != nil { - return err //nolint:wrapcheck - } +type NicknameCacheEntry struct { + displayName string + lastUpdated time.Time + conflictWithOtherUsername bool +} - return json.Unmarshal(jsonObj, out) +type NicknameUserEntry struct { + globalEntry *NicknameCacheEntry + perChannel map[id.RoomID]NicknameCacheEntry } -// getDisplayName retrieves the displayName for mxid, querying the homeserver if the mxid is not in the cache. -func (b *Bmatrix) getDisplayName(mxid string) string { - if b.GetBool("UseUserName") { - return mxid[1:] +type NicknameCache struct { + users map[id.UserID]NicknameUserEntry + sync.RWMutex +} + +func NewNicknameCache() *NicknameCache { + return &NicknameCache{ + users: make(map[id.UserID]NicknameUserEntry), + RWMutex: sync.RWMutex{}, } +} - b.RLock() - if val, present := b.NicknameMap[mxid]; present { - b.RUnlock() +// note: cache is not locked here +func (c *NicknameCache) retrieveDisplaynameFromCache(channelID id.RoomID, mxid id.UserID) string { + var cachedEntry *NicknameCacheEntry = nil + + c.RLock() + if user, userPresent := c.users[mxid]; userPresent { + // try first the name of the user in the room, then globally + if roomCachedEntry, roomPresent := user.perChannel[channelID]; roomPresent { + cachedEntry = &roomCachedEntry + } else if user.globalEntry != nil { + cachedEntry = user.globalEntry + } + } + c.RUnlock() - return val.displayName + if cachedEntry == nil { + return "" + } + + if cachedEntry.conflictWithOtherUsername { + // TODO: the current behavior is that only users with clashing usernames and *that have + // spoken since the bridge started* will get their mxids shown, and this doesn't + // feel right + return fmt.Sprintf("%s (%s)", cachedEntry.displayName, mxid) } - b.RUnlock() + return cachedEntry.displayName +} + +func (b *Bmatrix) retrieveGlobalDisplayname(mxid id.UserID) string { displayName, err := b.mc.GetDisplayName(mxid) var httpError *matrix.HTTPError if errors.As(err, &httpError) { @@ -72,127 +105,198 @@ func (b *Bmatrix) getDisplayName(mxid string) string { } if err != nil { - return b.cacheDisplayName(mxid, mxid[1:]) + return string(mxid)[1:] } - return b.cacheDisplayName(mxid, displayName.DisplayName) + return displayName.DisplayName } -// cacheDisplayName stores the mapping between a mxid and a display name, to be reused later without performing a query to the homserver. -// Note that old entries are cleaned when this function is called. -func (b *Bmatrix) cacheDisplayName(mxid string, displayName string) string { - now := time.Now() +// getDisplayName retrieves the displayName for mxid, querying the homeserver if the mxid is not in the cache. +func (b *Bmatrix) getDisplayName(channelID id.RoomID, mxid id.UserID) string { + if b.GetBool("UseUserName") { + return string(mxid)[1:] + } - // scan to delete old entries, to stop memory usage from becoming too high with old entries. - // In addition, we also detect if another user have the same username, and if so, we append their mxids to their usernames to differentiate them. - toDelete := []string{} - conflict := false + displayname := b.NicknameCache.retrieveDisplaynameFromCache(channelID, mxid) + if displayname != "" { + return displayname + } - b.Lock() - for mxid, v := range b.NicknameMap { - // to prevent username reuse across matrix servers - or even on the same server, append - // the mxid to the username when there is a conflict - if v.displayName == displayName { - conflict = true - // TODO: it would be nice to be able to rename previous messages from this user. - // The current behavior is that only users with clashing usernames and *that have spoken since the bridge last started* will get their mxids shown, and I don't know if that's the expected behavior. - v.displayName = fmt.Sprintf("%s (%s)", displayName, mxid) - b.NicknameMap[mxid] = v + // retrieve the global display name + return b.cacheDisplayName("", mxid, b.retrieveGlobalDisplayname(mxid)) +} + +// scan to delete old entries, to stop memory usage from becoming high with obsolete entries. +// note: assume the cache is already write-locked +// TODO: should we update the timestamp when the entry is used? +func (c *NicknameCache) clearObsoleteEntries(mxid id.UserID) { + // we have a "off-by-one" to account for when the user being added to the + // cache already have obsolete cache entries, as we want to keep it because + // we will be refreshing it in a minute + if len(c.users) <= MaxNumberOfUsersInCache+1 { + return + } + + usersLastTimestamp := make(map[id.UserID]int64, len(c.users)) + // compute the last updated timestamp entry for each user + for mxidIter, NicknameCacheIter := range c.users { + userLastTimestamp := time.Unix(0, 0) + for _, userInChannelCacheEntry := range NicknameCacheIter.perChannel { + if userInChannelCacheEntry.lastUpdated.After(userLastTimestamp) { + userLastTimestamp = userInChannelCacheEntry.lastUpdated + } } - if now.Sub(v.lastUpdated) > 10*time.Minute { - toDelete = append(toDelete, mxid) + if NicknameCacheIter.globalEntry != nil { + if NicknameCacheIter.globalEntry.lastUpdated.After(userLastTimestamp) { + userLastTimestamp = NicknameCacheIter.globalEntry.lastUpdated + } } - } - if conflict { - displayName = fmt.Sprintf("%s (%s)", displayName, mxid) + usersLastTimestamp[mxidIter] = userLastTimestamp.UnixNano() } - for _, v := range toDelete { - delete(b.NicknameMap, v) + // get the limit timestamp before which we must clear entries as obsolete + sortedTimestamps := make([]int64, 0, len(usersLastTimestamp)) + for _, value := range usersLastTimestamp { + sortedTimestamps = append(sortedTimestamps, value) } - - b.NicknameMap[mxid] = NicknameCacheEntry{ - displayName: displayName, - lastUpdated: now, + sort.Slice(sortedTimestamps, func(i, j int) bool { return sortedTimestamps[i] < sortedTimestamps[j] }) + limitTimestamp := sortedTimestamps[len(sortedTimestamps)-MaxNumberOfUsersInCache] + + // delete entries older than the limit + for mxidIter, timestamp := range usersLastTimestamp { + // do not clear the user that we are adding to the cache + if timestamp <= limitTimestamp && mxidIter != mxid { + delete(c.users, mxidIter) + } } - b.Unlock() - - return displayName } -// handleError converts errors into httpError. -//nolint:exhaustivestruct -func handleError(err error) *httpError { - var mErr matrix.HTTPError - if !errors.As(err, &mErr) { - return &httpError{ - Err: "not a HTTPError", +// to prevent username reuse across matrix rooms - or even inside the same room, if a user uses multiple servers - +// identify users with naming conflicts +func (c *NicknameCache) detectConflict(mxid id.UserID, displayName string) bool { + conflict := false + + for mxidIter, NicknameCacheIter := range c.users { + // skip conflict detection against ourselves, obviously + if mxidIter == mxid { + continue } - } - var httpErr httpError + for channelID, userInChannelCacheEntry := range NicknameCacheIter.perChannel { + if userInChannelCacheEntry.displayName == displayName { + userInChannelCacheEntry.conflictWithOtherUsername = true + c.users[mxidIter].perChannel[channelID] = userInChannelCacheEntry + conflict = true + } + } - if err := json.Unmarshal(mErr.Contents, &httpErr); err != nil { - return &httpError{ - Err: "unmarshal failed", + if NicknameCacheIter.globalEntry != nil && NicknameCacheIter.globalEntry.displayName == displayName { + c.users[mxidIter].globalEntry.conflictWithOtherUsername = true + conflict = true } } - return &httpErr + return conflict } -func (b *Bmatrix) containsAttachment(content map[string]interface{}) bool { - // Skip empty messages - if content["msgtype"] == nil { - return false +// cacheDisplayName stores the mapping between a mxid and a display name, to be reused +// later without performing a query to the homeserver. +// Note that old entries are cleaned when this function is called. +func (b *Bmatrix) cacheDisplayName(channelID id.RoomID, mxid id.UserID, displayName string) string { + now := time.Now() + + cache := b.NicknameCache + + cache.Lock() + defer cache.Unlock() + + conflict := cache.detectConflict(mxid, displayName) + + cache.clearObsoleteEntries(mxid) + + var newEntry NicknameUserEntry + if user, userPresent := cache.users[mxid]; userPresent { + newEntry = user + } else { + newEntry = NicknameUserEntry{ + globalEntry: nil, + perChannel: make(map[id.RoomID]NicknameCacheEntry), + } } - // Only allow image,video or file msgtypes - if !(content["msgtype"].(string) == "m.image" || - content["msgtype"].(string) == "m.video" || - content["msgtype"].(string) == "m.file") { - return false + cacheEntry := NicknameCacheEntry{ + displayName: displayName, + lastUpdated: now, + conflictWithOtherUsername: conflict, } - return true + // this is a local (room-specific) display name, let's cache it as such + if channelID == "" { + newEntry.globalEntry = &cacheEntry + } else { + globalDisplayName := b.retrieveGlobalDisplayname(mxid) + // updating the global display name or resetting the room name to the global name + if globalDisplayName == displayName { + delete(newEntry.perChannel, channelID) + newEntry.globalEntry = &cacheEntry + } else { + newEntry.perChannel[channelID] = cacheEntry + } + } + + cache.users[mxid] = newEntry + + return displayName } -// getAvatarURL returns the avatar URL of the specified sender. -func (b *Bmatrix) getAvatarURL(sender string) string { - urlPath := b.mc.BuildURL("profile", sender, "avatar_url") +func (b *Bmatrix) removeDisplayNameFromCache(mxid id.UserID) { + cache := b.NicknameCache - s := struct { - AvatarURL string `json:"avatar_url"` - }{} + cache.Lock() + defer cache.Unlock() - err := b.mc.MakeRequest("GET", urlPath, nil, &s) - if err != nil { - b.Log.Errorf("getAvatarURL failed: %s", err) + delete(cache.users, mxid) +} +// getAvatarURL returns the avatar URL of the specified sender. +func (b *Bmatrix) getAvatarURL(sender id.UserID) string { + url, err := b.mc.GetAvatarURL(sender) + if err != nil { + b.Log.Errorf("Couldn't retrieve the URL of the avatar for MXID %s", sender) return "" } - url := strings.ReplaceAll(s.AvatarURL, "mxc://", b.GetString("Server")+"/_matrix/media/r0/thumbnail/") - if url != "" { - url += "?width=37&height=37&method=crop" - } - - return url + return url.String() } // handleRatelimit handles the ratelimit errors and return if we're ratelimited and the amount of time to sleep func (b *Bmatrix) handleRatelimit(err error) (time.Duration, bool) { - httpErr := handleError(err) - if httpErr.Errcode != "M_LIMIT_EXCEEDED" { + var mErr matrix.HTTPError + if !errors.As(err, &mErr) { + b.Log.Errorf("Received a non-HTTPError, don't know what to make of it:\n%#v", err) + return 0, false + } + + if mErr.RespError.ErrCode != "M_LIMIT_EXCEEDED" { return 0, false } - b.Log.Debugf("ratelimited: %s", httpErr.Err) - b.Log.Infof("getting ratelimited by matrix, sleeping approx %d seconds before retrying", httpErr.RetryAfterMs/1000) + b.Log.Debugf("ratelimited: %s", mErr.RespError.Err) + + // fallback to a one-second delay + retryDelayMs := 1000 + + if retryDelayString, present := mErr.RespError.ExtraData["retry_after_ms"]; present { + if retryDelayInt, correct := retryDelayString.(int); correct && retryDelayInt > retryDelayMs { + retryDelayMs = retryDelayInt + } + } + + b.Log.Infof("getting ratelimited by matrix, sleeping approx %d seconds before retrying", retryDelayMs/1000) - return time.Duration(httpErr.RetryAfterMs) * time.Millisecond, true + return time.Duration(retryDelayMs) * time.Millisecond, true } // retry function will check if we're ratelimited and retries again when backoff time expired @@ -213,3 +317,66 @@ func (b *Bmatrix) retry(f func() error) error { } } } + +type SendMessageEventWrapper struct { + inner *matrix.Client +} + +//nolint: wrapcheck +func (w SendMessageEventWrapper) SendMessageEvent(roomID id.RoomID, eventType event.Type, contentJSON interface{}) (*matrix.RespSendEvent, error) { + return w.inner.SendMessageEvent(roomID, eventType, contentJSON) +} + +//nolint: wrapcheck +func (b *Bmatrix) sendMessageEventWithRetries(channel id.RoomID, message event.MessageEventContent, username string) (string, error) { + var ( + resp *matrix.RespSendEvent + client interface { + SendMessageEvent(roomID id.RoomID, eventType event.Type, contentJSON interface{}) (resp *matrix.RespSendEvent, err error) + } + err error + ) + + b.RLock() + appservice := b.RoomMap[channel].appService + b.RUnlock() + + client = SendMessageEventWrapper{inner: b.mc} + + // only try to send messages through the app Service *once* we have received + // events through it (otherwise we don't really know if the appservice works) + // Additionally, even if we're receiving messages in that room via the appService listener, + // let's check that the appservice "covers" that room + if appservice && b.appService.namespaces.containsRoom(channel) && len(b.appService.namespaces.prefixes) > 0 { + b.Log.Debugf("Sending with appService") + // we take the first prefix + bridgeUserID := fmt.Sprintf("@%s%s:%s", b.appService.namespaces.prefixes[0], id.EncodeUserLocalpart(username), b.appService.appService.HomeserverDomain) + intent := b.appService.appService.Intent(id.UserID(bridgeUserID)) + // if we can't change the display name it's not great but not the end of the world either, ignore it + // TODO: do not perform this action on every message, with an in-memory cache or something + _ = intent.SetDisplayName(username) + client = intent + } else { + applyUsernametoMessage(&message, username) + } + + err = b.retry(func() error { + resp, err = client.SendMessageEvent(channel, event.EventMessage, message) + + return err + }) + if err != nil { + return "", err + } + + return string(resp.EventID), err +} + +func applyUsernametoMessage(newMsg *event.MessageEventContent, username string) { + matrixUsername := newMatrixUsername(username) + + newMsg.Body = matrixUsername.plain + newMsg.Body + if newMsg.FormattedBody != "" { + newMsg.FormattedBody = matrixUsername.formatted + newMsg.FormattedBody + } +} diff --git a/bridge/matrix/matrix.go b/bridge/matrix/matrix.go index 49fc33b3..ee2081ff 100644 --- a/bridge/matrix/matrix.go +++ b/bridge/matrix/matrix.go @@ -1,18 +1,18 @@ +// nolint: exhaustivestruct package bmatrix import ( - "bytes" "fmt" - "mime" "regexp" - "strings" "sync" - "time" + + matrix "maunium.net/go/mautrix" + "maunium.net/go/mautrix/event" + "maunium.net/go/mautrix/id" "github.com/42wim/matterbridge/bridge" "github.com/42wim/matterbridge/bridge/config" "github.com/42wim/matterbridge/bridge/helper" - matrix "github.com/matterbridge/gomatrix" ) var ( @@ -20,25 +20,30 @@ var ( htmlReplacementTag = regexp.MustCompile("<[^>]*>") ) -type NicknameCacheEntry struct { - displayName string - lastUpdated time.Time +type EventOrigin int + +const ( + originClassicSyncer EventOrigin = iota + originAppService +) + +type RoomInfo struct { + name string + appService bool } type Bmatrix struct { - mc *matrix.Client - UserID string - NicknameMap map[string]NicknameCacheEntry - RoomMap map[string]string - rateMutex sync.RWMutex + mc *matrix.Client + UserID id.UserID + appService *AppServiceWrapper + NicknameCache *NicknameCache + RoomMap map[id.RoomID]RoomInfo + rateMutex sync.RWMutex + joinedRooms []id.RoomID sync.RWMutex *bridge.Config -} - -type httpError struct { - Errcode string `json:"errcode"` - Err string `json:"error"` - RetryAfterMs int `json:"retry_after_ms"` + stopNormalSync chan struct{} + stopNormalSyncAck chan struct{} } type matrixUsername struct { @@ -46,44 +51,12 @@ type matrixUsername struct { formatted string } -// SubTextMessage represents the new content of the message in edit messages. -type SubTextMessage struct { - MsgType string `json:"msgtype"` - Body string `json:"body"` - FormattedBody string `json:"formatted_body,omitempty"` - Format string `json:"format,omitempty"` -} - -// MessageRelation explains how the current message relates to a previous message. -// Notably used for message edits. -type MessageRelation struct { - EventID string `json:"event_id"` - Type string `json:"rel_type"` -} - -type EditedMessage struct { - NewContent SubTextMessage `json:"m.new_content"` - RelatedTo MessageRelation `json:"m.relates_to"` - matrix.TextMessage -} - -type InReplyToRelationContent struct { - EventID string `json:"event_id"` -} - -type InReplyToRelation struct { - InReplyTo InReplyToRelationContent `json:"m.in_reply_to"` -} - -type ReplyMessage struct { - RelatedTo InReplyToRelation `json:"m.relates_to"` - matrix.TextMessage -} - func New(cfg *bridge.Config) bridge.Bridger { b := &Bmatrix{Config: cfg} - b.RoomMap = make(map[string]string) - b.NicknameMap = make(map[string]NicknameCacheEntry) + b.RoomMap = make(map[id.RoomID]RoomInfo) + b.NicknameCache = NewNicknameCache() + b.stopNormalSync = make(chan struct{}, 1) + b.stopNormalSyncAck = make(chan struct{}, 1) return b } @@ -91,13 +64,13 @@ func (b *Bmatrix) Connect() error { var err error b.Log.Infof("Connecting %s", b.GetString("Server")) if b.GetString("MxID") != "" && b.GetString("Token") != "" { + b.UserID = id.UserID(b.GetString("MxID")) b.mc, err = matrix.NewClient( - b.GetString("Server"), b.GetString("MxID"), b.GetString("Token"), + b.GetString("Server"), b.UserID, b.GetString("Token"), ) if err != nil { return err } - b.UserID = b.GetString("MxID") b.Log.Info("Using existing Matrix credentials") } else { b.mc, err = matrix.NewClient(b.GetString("Server"), "", "") @@ -105,102 +78,150 @@ func (b *Bmatrix) Connect() error { return err } resp, err := b.mc.Login(&matrix.ReqLogin{ - Type: "m.login.password", - User: b.GetString("Login"), - Password: b.GetString("Password"), - Identifier: matrix.NewUserIdentifier(b.GetString("Login")), + Type: matrix.AuthTypePassword, + Password: b.GetString("Password"), + Identifier: matrix.UserIdentifier{Type: matrix.IdentifierTypeUser, User: b.GetString("Login")}, //nolint: exhaustruct + StoreCredentials: true, }) if err != nil { return err } - b.mc.SetCredentials(resp.UserID, resp.AccessToken) b.UserID = resp.UserID b.Log.Info("Connection succeeded") } - go b.handlematrix() + + b.Log.Debug("Retrieving the list of rooms we have already joined") + joinedRooms, err := b.mc.JoinedRooms() + if err != nil { + b.Log.Errorf("couldn't list the joined rooms") + + return err + } + b.joinedRooms = joinedRooms.JoinedRooms + for _, roomID := range joinedRooms.JoinedRooms { + // leave the channel name (usually a channel alias - in the matrix sense) + // unresolved for now, it will be completed when JoinChannel() is called + b.RoomMap[roomID] = RoomInfo{name: "", appService: false} + } + return nil } func (b *Bmatrix) Disconnect() error { + // tell the Sync() loop to exit + b.stopNormalSync <- struct{}{} + b.mc.StopSync() + + // wait for both the syncer and the appservice to terminate + <-b.stopNormalSyncAck + if b.appService != nil { + b.appService.stop <- struct{}{} + <-b.appService.stopAck + } + return nil } func (b *Bmatrix) JoinChannel(channel config.ChannelInfo) error { - return b.retry(func() error { - resp, err := b.mc.JoinRoom(channel.Name, "", nil) + resolvedAlias, err := b.mc.ResolveAlias(id.RoomAlias(channel.Name)) + if err != nil { + b.Log.Errorf("couldn't retrieve the room ID for the alias '%s'", channel.Name) + + return err + } + + roomInfo := RoomInfo{name: channel.Name, appService: false} + alreadyJoined := false + for _, roomID := range b.joinedRooms { + // we have already joined this room (e.g. in a previous execution of matterbridge) + // => we only update the room alias, but do not attempt to join it again + if roomID == resolvedAlias.RoomID { + alreadyJoined = true + break + } + } + + if !alreadyJoined { + err = b.retry(func() error { + _, innerErr := b.mc.JoinRoom(channel.Name, "", nil) + return innerErr + }) + if err != nil { return err } + } - b.Lock() - b.RoomMap[resp.RoomID] = channel.Name - b.Unlock() + b.Lock() + b.RoomMap[resolvedAlias.RoomID] = roomInfo + b.Unlock() - return nil - }) + return nil } -func (b *Bmatrix) Send(msg config.Message) (string, error) { - b.Log.Debugf("=> Receiving %#v", msg) - - channel := b.getRoomID(msg.Channel) - b.Log.Debugf("Channel %s maps to channel id %s", msg.Channel, channel) +func (b *Bmatrix) Start() error { + // at this point, JoinChannel() has been called on all the channels + // declared in the configuration, so we can exit every other joined room + // in order to stop receiving events from rooms we no longer follow + b.RLock() + for _, roomID := range b.joinedRooms { + if _, present := b.RoomMap[roomID]; !present { + // we deliberately ignore the return value, + // because the bridge will still work even if we couln't exit the room + _, _ = b.mc.LeaveRoom(roomID, &matrix.ReqLeave{Reason: "No longer bridged"}) + } + } + b.RUnlock() - username := newMatrixUsername(msg.Username) + go b.handlematrix() - body := username.plain + msg.Text - formattedBody := username.formatted + helper.ParseMarkdown(msg.Text) + if b.GetBool("UseAppService") { + appService, err := b.NewAppService() + if err != nil { + b.Log.Errorf("couldn't load the app service configuration: %#v", err) - if b.GetBool("SpoofUsername") { - // https://spec.matrix.org/v1.3/client-server-api/#mroommember - type stateMember struct { - AvatarURL string `json:"avatar_url,omitempty"` - DisplayName string `json:"displayname"` - Membership string `json:"membership"` + return err } - // TODO: reset username afterwards with DisplayName: null ? - m := stateMember{ - AvatarURL: "", - DisplayName: username.plain, - Membership: "join", - } + b.appService = appService + err = b.startAppService() + if err != nil { + b.Log.Errorf("couldn't start the application service: %#v", err) - _, err := b.mc.SendStateEvent(channel, "m.room.member", b.UserID, m) - if err == nil { - body = msg.Text - formattedBody = helper.ParseMarkdown(msg.Text) + return err } } - // Make a action /me of the message - if msg.Event == config.EventUserAction { - m := matrix.TextMessage{ - MsgType: "m.emote", - Body: body, - FormattedBody: formattedBody, - Format: "org.matrix.custom.html", - } + return nil +} - if b.GetBool("HTMLDisable") { - m.Format = "" - m.FormattedBody = "" - } +func (b *Bmatrix) Send(msg config.Message) (string, error) { + b.Log.Debugf("=> Sending %#v", msg) - msgID := "" + channel := b.getRoomID(msg.Channel) + if channel == "" { + return "", fmt.Errorf("got message for unknown channel '%s'", msg.Channel) + } - err := b.retry(func() error { - resp, err := b.mc.SendMessageEvent(channel, "m.room.message", m) - if err != nil { - return err - } + if msg.Event == config.EventUserTyping && b.GetBool("ShowUserTyping") { + _, err := b.mc.UserTyping(channel, true, 15000) + return "", err + } - msgID = resp.EventID + // Make a action /me of the message + if msg.Event == config.EventUserAction { + //nolint:exhaustruct + m := event.MessageEventContent{ + MsgType: event.MsgEmote, + Body: msg.Text, + } - return err - }) + if !b.GetBool("HTMLDisable") { + m.FormattedBody = helper.ParseMarkdown(msg.Text) + m.Format = event.FormatHTML + } - return msgID, err + return b.sendMessageEventWithRetries(channel, m, msg.Username) } // Delete message @@ -212,12 +233,10 @@ func (b *Bmatrix) Send(msg config.Message) (string, error) { msgID := "" err := b.retry(func() error { - resp, err := b.mc.RedactEvent(channel, msg.ID, &matrix.ReqRedact{}) - if err != nil { - return err - } + //nolint:exhaustruct + resp, err := b.mc.RedactEvent(channel, id.EventID(msg.ID), matrix.ReqRedact{}) - msgID = resp.EventID + msgID = string(resp.EventID) return err }) @@ -228,13 +247,13 @@ func (b *Bmatrix) Send(msg config.Message) (string, error) { // Upload a file if it exists if msg.Extra != nil { for _, rmsg := range helper.HandleExtra(&msg, b.General) { - rmsg := rmsg - - err := b.retry(func() error { - _, err := b.mc.SendText(channel, rmsg.Username+rmsg.Text) + //nolint:exhaustruct + m := event.MessageEventContent{ + MsgType: event.MsgText, + Body: rmsg.Text, + } - return err - }) + _, err := b.sendMessageEventWithRetries(channel, m, msg.Username) if err != nil { b.Log.Errorf("sendText failed: %s", err) } @@ -247,472 +266,159 @@ func (b *Bmatrix) Send(msg config.Message) (string, error) { // Edit message if we have an ID if msg.ID != "" { - rmsg := EditedMessage{ - TextMessage: matrix.TextMessage{ - Body: body, - MsgType: "m.text", - Format: "org.matrix.custom.html", - FormattedBody: formattedBody, - }, + //nolint:exhaustruct + rmsg := event.MessageEventContent{ + MsgType: event.MsgText, + Body: msg.Text, } - - rmsg.NewContent = SubTextMessage{ - Body: rmsg.TextMessage.Body, - FormattedBody: rmsg.TextMessage.FormattedBody, - Format: rmsg.TextMessage.Format, - MsgType: "m.text", + //nolint:exhaustruct + rmsg.NewContent = &event.MessageEventContent{ + Body: rmsg.Body, + MsgType: event.MsgText, } - if b.GetBool("HTMLDisable") { - rmsg.TextMessage.Format = "" - rmsg.TextMessage.FormattedBody = "" - rmsg.NewContent.Format = "" - rmsg.NewContent.FormattedBody = "" + rmsg.FormattedBody = "* " + msg.Text + } else { + rmsg.Format = event.FormatHTML + rmsg.FormattedBody = "* " + helper.ParseMarkdown(msg.Text) + rmsg.NewContent.Format = rmsg.Format + rmsg.NewContent.FormattedBody = rmsg.FormattedBody } - rmsg.RelatedTo = MessageRelation{ - EventID: msg.ID, - Type: "m.replace", + //nolint:exhaustruct + rmsg.RelatesTo = &event.RelatesTo{ + EventID: id.EventID(msg.ID), + Type: event.RelReplace, } - err := b.retry(func() error { - _, err := b.mc.SendMessageEvent(channel, "m.room.message", rmsg) + return b.sendMessageEventWithRetries(channel, rmsg, msg.Username) + } - return err - }) - if err != nil { - return "", err - } + //nolint:exhaustruct + m := event.MessageEventContent{ + Body: msg.Text, + } - return msg.ID, nil + if !b.GetBool("HTMLDisable") { + m.Format = event.FormatHTML + m.FormattedBody = msg.Text } // Use notices to send join/leave events if msg.Event == config.EventJoinLeave { - m := matrix.TextMessage{ - MsgType: "m.notice", - Body: body, - FormattedBody: formattedBody, - Format: "org.matrix.custom.html", - } - + m.MsgType = event.MsgNotice + } else { + m.MsgType = event.MsgText if b.GetBool("HTMLDisable") { - m.Format = "" m.FormattedBody = "" + } else { + m.FormattedBody = helper.ParseMarkdown(msg.Text) } - var ( - resp *matrix.RespSendEvent - err error - ) - - err = b.retry(func() error { - resp, err = b.mc.SendMessageEvent(channel, "m.room.message", m) - - return err - }) - if err != nil { - return "", err - } - - return resp.EventID, err - } - - if msg.ParentValid() { - m := ReplyMessage{ - TextMessage: matrix.TextMessage{ - MsgType: "m.text", - Body: body, - FormattedBody: formattedBody, - Format: "org.matrix.custom.html", - }, - } - - if b.GetBool("HTMLDisable") { - m.TextMessage.Format = "" - m.TextMessage.FormattedBody = "" - } - - m.RelatedTo = InReplyToRelation{ - InReplyTo: InReplyToRelationContent{ - EventID: msg.ParentID, - }, - } - - var ( - resp *matrix.RespSendEvent - err error - ) - - err = b.retry(func() error { - resp, err = b.mc.SendMessageEvent(channel, "m.room.message", m) - - return err - }) - if err != nil { - return "", err - } - - return resp.EventID, err - } - - if b.GetBool("HTMLDisable") { - var ( - resp *matrix.RespSendEvent - err error - ) - - err = b.retry(func() error { - resp, err = b.mc.SendText(channel, body) - - return err - }) - if err != nil { - return "", err - } - - return resp.EventID, err - } - - // Post normal message with HTML support (eg riot.im) - var ( - resp *matrix.RespSendEvent - err error - ) - - err = b.retry(func() error { - resp, err = b.mc.SendFormattedText(channel, body, formattedBody) - - return err - }) - if err != nil { - return "", err - } - - return resp.EventID, err -} - -func (b *Bmatrix) handlematrix() { - syncer := b.mc.Syncer.(*matrix.DefaultSyncer) - syncer.OnEventType("m.room.redaction", b.handleEvent) - syncer.OnEventType("m.room.message", b.handleEvent) - syncer.OnEventType("m.room.member", b.handleMemberChange) - go func() { - for { - if b == nil { - return - } - if err := b.mc.Sync(); err != nil { - b.Log.Println("Sync() returned ", err) + if msg.ParentValid() { + m.RelatesTo = &event.RelatesTo{ + EventID: "", + Type: event.RelReference, + InReplyTo: &event.InReplyTo{ + EventID: id.EventID(msg.ParentID), + }, + Key: "", } } - }() -} - -func (b *Bmatrix) handleEdit(ev *matrix.Event, rmsg config.Message) bool { - relationInterface, present := ev.Content["m.relates_to"] - newContentInterface, present2 := ev.Content["m.new_content"] - if !(present && present2) { - return false } - var relation MessageRelation - if err := interface2Struct(relationInterface, &relation); err != nil { - b.Log.Warnf("Couldn't parse 'm.relates_to' object with value %#v", relationInterface) - return false - } - - var newContent SubTextMessage - if err := interface2Struct(newContentInterface, &newContent); err != nil { - b.Log.Warnf("Couldn't parse 'm.new_content' object with value %#v", newContentInterface) - return false - } - - if relation.Type != "m.replace" { - return false - } - - rmsg.ID = relation.EventID - rmsg.Text = newContent.Body - b.Remote <- rmsg - - return true + return b.sendMessageEventWithRetries(channel, m, msg.Username) } -func (b *Bmatrix) handleReply(ev *matrix.Event, rmsg config.Message) bool { - relationInterface, present := ev.Content["m.relates_to"] - if !present { - return false - } - - var relation InReplyToRelation - if err := interface2Struct(relationInterface, &relation); err != nil { - // probably fine - return false - } - - body := rmsg.Text - - if !b.GetBool("keepquotedreply") { - for strings.HasPrefix(body, "> ") { - lineIdx := strings.IndexRune(body, '\n') - if lineIdx == -1 { - body = "" - } else { - body = body[(lineIdx + 1):] +// DontProcessOldEvents returns true if a sync event should be considered for further processing. +// We use that function to filter out events we have already read. +func (b *Bmatrix) DontProcessOldEvents(resp *matrix.RespSync, since string) bool { + // we only filter old events in the initial sync(), because subsequent sync() + // (where since != "") should only return new events + if since != "" { + return true + } + + for joinedRoom, roomData := range resp.Rooms.Join { + var readTimestamp int64 = 0 + // retrieve the timestamp of the last read receipt + // note: we're not sure some events will not be thrown away in this + // initial sync, as the server may not have received some events yet when + // the read receipt was sent: there is a mix of timestamps between + // the read receipt on the target homeserver and the timestamps when + // events were *created* on the homeserver peers + for _, evt := range roomData.Ephemeral.Events { + if evt.Type != event.EphemeralEventReceipt { + continue } - } - } - - rmsg.Text = body - rmsg.ParentID = relation.InReplyTo.EventID - b.Remote <- rmsg - - return true -} - -func (b *Bmatrix) handleMemberChange(ev *matrix.Event) { - // Update the displayname on join messages, according to https://matrix.org/docs/spec/client_server/r0.6.1#events-on-change-of-profile-information - if ev.Content["membership"] == "join" { - if dn, ok := ev.Content["displayname"].(string); ok { - b.cacheDisplayName(ev.Sender, dn) - } - } -} - -func (b *Bmatrix) handleEvent(ev *matrix.Event) { - b.Log.Debugf("== Receiving event: %#v", ev) - if ev.Sender != b.UserID { - b.RLock() - channel, ok := b.RoomMap[ev.RoomID] - b.RUnlock() - if !ok { - b.Log.Debugf("Unknown room %s", ev.RoomID) - return - } - - // Create our message - rmsg := config.Message{ - Username: b.getDisplayName(ev.Sender), - Channel: channel, - Account: b.Account, - UserID: ev.Sender, - ID: ev.ID, - Avatar: b.getAvatarURL(ev.Sender), - } - - // Remove homeserver suffix if configured - if b.GetBool("NoHomeServerSuffix") { - re := regexp.MustCompile("(.*?):.*") - rmsg.Username = re.ReplaceAllString(rmsg.Username, `$1`) - } - - // Delete event - if ev.Type == "m.room.redaction" { - rmsg.Event = config.EventMsgDelete - rmsg.ID = ev.Redacts - rmsg.Text = config.EventMsgDelete - b.Remote <- rmsg - return - } - - // Text must be a string - if rmsg.Text, ok = ev.Content["body"].(string); !ok { - b.Log.Errorf("Content[body] is not a string: %T\n%#v", - ev.Content["body"], ev.Content) - return - } - - // Do we have a /me action - if ev.Content["msgtype"].(string) == "m.emote" { - rmsg.Event = config.EventUserAction - } - - // Is it an edit? - if b.handleEdit(ev, rmsg) { - return - } - - // Is it a reply? - if b.handleReply(ev, rmsg) { - return - } - // Do we have attachments - if b.containsAttachment(ev.Content) { - err := b.handleDownloadFile(&rmsg, ev.Content) + err := evt.Content.ParseRaw(evt.Type) if err != nil { - b.Log.Errorf("download failed: %#v", err) + b.Log.Warnf("couldn't parse receipt event %#v", evt.Content) + } + receipts := *evt.Content.AsReceipt() + for _, receiptByType := range receipts { + for _, receiptsByUser := range receiptByType { + for userID, userReceipt := range receiptsByUser { + // ignore read receipts of other users + if userID != b.UserID { + continue + } + + readTimestamp = userReceipt.Timestamp.UnixNano() + } + } } } - b.Log.Debugf("<= Sending message from %s on %s to gateway", ev.Sender, b.Account) - b.Remote <- rmsg - - // not crucial, so no ratelimit check here - if err := b.mc.MarkRead(ev.RoomID, ev.ID); err != nil { - b.Log.Errorf("couldn't mark message as read %s", err.Error()) - } - } -} - -// handleDownloadFile handles file download -func (b *Bmatrix) handleDownloadFile(rmsg *config.Message, content map[string]interface{}) error { - var ( - ok bool - url, name, msgtype, mtype string - info map[string]interface{} - size float64 - ) - - rmsg.Extra = make(map[string][]interface{}) - if url, ok = content["url"].(string); !ok { - return fmt.Errorf("url isn't a %T", url) - } - url = strings.Replace(url, "mxc://", b.GetString("Server")+"/_matrix/media/v1/download/", -1) - - if info, ok = content["info"].(map[string]interface{}); !ok { - return fmt.Errorf("info isn't a %T", info) - } - if size, ok = info["size"].(float64); !ok { - return fmt.Errorf("size isn't a %T", size) - } - if name, ok = content["body"].(string); !ok { - return fmt.Errorf("name isn't a %T", name) - } - if msgtype, ok = content["msgtype"].(string); !ok { - return fmt.Errorf("msgtype isn't a %T", msgtype) - } - if mtype, ok = info["mimetype"].(string); !ok { - return fmt.Errorf("mtype isn't a %T", mtype) - } - - // check if we have an image uploaded without extension - if !strings.Contains(name, ".") { - if msgtype == "m.image" { - mext, _ := mime.ExtensionsByType(mtype) - if len(mext) > 0 { - name += mext[0] + newEventList := make([]*event.Event, 0, len(roomData.Timeline.Events)) + for _, evt := range roomData.Timeline.Events { + // remove old event, except for state changes + if evt.Timestamp > readTimestamp || evt.Type.Class == event.StateEventType { + newEventList = append(newEventList, evt) } - } else { - // just a default .png extension if we don't have mime info - name += ".png" } - } - - // check if the size is ok - err := helper.HandleDownloadSize(b.Log, rmsg, name, int64(size), b.General) - if err != nil { - return err - } - // actually download the file - data, err := helper.DownloadFile(url) - if err != nil { - return fmt.Errorf("download %s failed %#v", url, err) - } - // add the downloaded data to the message - helper.HandleDownloadData(b.Log, rmsg, name, "", url, data, b.General) - return nil -} -// handleUploadFiles handles native upload of files. -func (b *Bmatrix) handleUploadFiles(msg *config.Message, channel string) (string, error) { - for _, f := range msg.Extra["file"] { - if fi, ok := f.(config.FileInfo); ok { - b.handleUploadFile(msg, channel, &fi) - } + roomData.Timeline.Events = newEventList + resp.Rooms.Join[joinedRoom] = roomData } - return "", nil + return true } -// handleUploadFile handles native upload of a file. -func (b *Bmatrix) handleUploadFile(msg *config.Message, channel string, fi *config.FileInfo) { - username := newMatrixUsername(msg.Username) - content := bytes.NewReader(*fi.Data) - sp := strings.Split(fi.Name, ".") - mtype := mime.TypeByExtension("." + sp[len(sp)-1]) - // image and video uploads send no username, we have to do this ourself here #715 - err := b.retry(func() error { - _, err := b.mc.SendFormattedText(channel, username.plain+fi.Comment, username.formatted+fi.Comment) - - return err - }) - if err != nil { - b.Log.Errorf("file comment failed: %#v", err) - } - - b.Log.Debugf("uploading file: %s %s", fi.Name, mtype) - - var res *matrix.RespMediaUpload - - err = b.retry(func() error { - res, err = b.mc.UploadToContentRepo(content, mtype, int64(len(*fi.Data))) - - return err - }) +func (b *Bmatrix) handlematrix() { + syncer, ok := b.mc.Syncer.(*matrix.DefaultSyncer) + if !ok { + b.Log.Errorf("couldn't convert the Syncer object to a DefaultSyncer structure, the matrix bridge won't work") - if err != nil { - b.Log.Errorf("file upload failed: %#v", err) return } - switch { - case strings.Contains(mtype, "video"): - b.Log.Debugf("sendVideo %s", res.ContentURI) - err = b.retry(func() error { - _, err = b.mc.SendVideo(channel, fi.Name, res.ContentURI) + // register our custom filtering function + syncer.OnSync(b.DontProcessOldEvents) - return err + eventsTypes := []event.Type{event.EventRedaction, event.EventMessage, event.StateMember, event.EphemeralEventReceipt} + if b.GetBool("ShowUserTyping") { + eventsTypes = append(eventsTypes, event.EphemeralEventTyping) + } + for _, evType := range eventsTypes { + syncer.OnEventType(evType, func(source matrix.EventSource, ev *event.Event) { + b.handleEvent(originClassicSyncer, ev) }) - if err != nil { - b.Log.Errorf("sendVideo failed: %#v", err) - } - case strings.Contains(mtype, "image"): - b.Log.Debugf("sendImage %s", res.ContentURI) - err = b.retry(func() error { - _, err = b.mc.SendImage(channel, fi.Name, res.ContentURI) + } - return err - }) - if err != nil { - b.Log.Errorf("sendImage failed: %#v", err) - } - case strings.Contains(mtype, "audio"): - b.Log.Debugf("sendAudio %s", res.ContentURI) - err = b.retry(func() error { - _, err = b.mc.SendMessageEvent(channel, "m.room.message", matrix.AudioMessage{ - MsgType: "m.audio", - Body: fi.Name, - URL: res.ContentURI, - Info: matrix.AudioInfo{ - Mimetype: mtype, - Size: uint(len(*fi.Data)), - }, - }) + go func() { + for { + select { + case <-b.stopNormalSync: + b.stopNormalSyncAck <- struct{}{} - return err - }) - if err != nil { - b.Log.Errorf("sendAudio failed: %#v", err) - } - default: - b.Log.Debugf("sendFile %s", res.ContentURI) - err = b.retry(func() error { - _, err = b.mc.SendMessageEvent(channel, "m.room.message", matrix.FileMessage{ - MsgType: "m.file", - Body: fi.Name, - URL: res.ContentURI, - Info: matrix.FileInfo{ - Mimetype: mtype, - Size: uint(len(*fi.Data)), - }, - }) + return + default: - return err - }) - if err != nil { - b.Log.Errorf("sendFile failed: %#v", err) + if err := b.mc.Sync(); err != nil { + b.Log.Warningf("Sync() returned %#v", err) + } + } } - } - b.Log.Debugf("result: %#v", res) + }() } diff --git a/bridge/matrix/zerolog_abstraction.go b/bridge/matrix/zerolog_abstraction.go new file mode 100644 index 00000000..3d74a4e0 --- /dev/null +++ b/bridge/matrix/zerolog_abstraction.go @@ -0,0 +1,43 @@ +package bmatrix + +import ( + "errors" + + "github.com/sirupsen/logrus" + + "github.com/rs/zerolog" +) + +var levels_zerolog2logrus = map[zerolog.Level]logrus.Level{ + zerolog.DebugLevel: logrus.DebugLevel, + zerolog.InfoLevel: logrus.InfoLevel, + zerolog.WarnLevel: logrus.WarnLevel, + zerolog.FatalLevel: logrus.FatalLevel, + zerolog.PanicLevel: logrus.PanicLevel, + zerolog.ErrorLevel: logrus.ErrorLevel, + zerolog.TraceLevel: logrus.TraceLevel, +} + +// an abstraction for zerolog so we can pipe its output to logrus.Entry, that is used in matterbridge +type zerologWrapper struct { + inner *logrus.Entry +} + +func (w zerologWrapper) Write(p []byte) (n int, err error) { + return w.inner.Logger.Writer().Write(p) +} + +func (w zerologWrapper) WriteLevel(level zerolog.Level, p []byte) (n int, err error) { + if logrus_level, present := levels_zerolog2logrus[level]; present { + return w.inner.Logger.WriterLevel(logrus_level).Write(p) + } + // drop the message if we haven't a matching level + return 0, errors.New("Unsupported logging level") +} + +func NewZerologWrapper(entry *logrus.Entry) zerolog.Logger { + wrapper := zerologWrapper{inner: entry} + log := zerolog.New(wrapper) + + return log +} |