diff options
Diffstat (limited to 'vendor/maunium.net/go/mautrix/sync.go')
-rw-r--r-- | vendor/maunium.net/go/mautrix/sync.go | 310 |
1 files changed, 310 insertions, 0 deletions
diff --git a/vendor/maunium.net/go/mautrix/sync.go b/vendor/maunium.net/go/mautrix/sync.go new file mode 100644 index 00000000..6d3a1756 --- /dev/null +++ b/vendor/maunium.net/go/mautrix/sync.go @@ -0,0 +1,310 @@ +// Copyright (c) 2020 Tulir Asokan +// +// This Source Code Form is subject to the terms of the Mozilla Public +// License, v. 2.0. If a copy of the MPL was not distributed with this +// file, You can obtain one at http://mozilla.org/MPL/2.0/. + +package mautrix + +import ( + "errors" + "fmt" + "runtime/debug" + "time" + + "maunium.net/go/mautrix/event" + "maunium.net/go/mautrix/id" +) + +// EventSource represents the part of the sync response that an event came from. +type EventSource int + +const ( + EventSourcePresence EventSource = 1 << iota + EventSourceJoin + EventSourceInvite + EventSourceLeave + EventSourceAccountData + EventSourceTimeline + EventSourceState + EventSourceEphemeral + EventSourceToDevice + EventSourceDecrypted +) + +const primaryTypes = EventSourcePresence | EventSourceAccountData | EventSourceToDevice | EventSourceTimeline | EventSourceState +const roomSections = EventSourceJoin | EventSourceInvite | EventSourceLeave +const roomableTypes = EventSourceAccountData | EventSourceTimeline | EventSourceState +const encryptableTypes = roomableTypes | EventSourceToDevice + +func (es EventSource) String() string { + var typeName string + switch es & primaryTypes { + case EventSourcePresence: + typeName = "presence" + case EventSourceAccountData: + typeName = "account data" + case EventSourceToDevice: + typeName = "to-device" + case EventSourceTimeline: + typeName = "timeline" + case EventSourceState: + typeName = "state" + default: + return fmt.Sprintf("unknown (%d)", es) + } + if es&roomableTypes != 0 { + switch es & roomSections { + case EventSourceJoin: + typeName = "joined room " + typeName + case EventSourceInvite: + typeName = "invited room " + typeName + case EventSourceLeave: + typeName = "left room " + typeName + default: + return fmt.Sprintf("unknown (%d)", es) + } + es &^= roomableTypes + } + if es&encryptableTypes != 0 && es&EventSourceDecrypted != 0 { + typeName += " (decrypted)" + es &^= EventSourceDecrypted + } + es &^= primaryTypes + if es != 0 { + return fmt.Sprintf("unknown (%d)", es) + } + return typeName +} + +// EventHandler handles a single event from a sync response. +type EventHandler func(source EventSource, evt *event.Event) + +// SyncHandler handles a whole sync response. If the return value is false, handling will be stopped completely. +type SyncHandler func(resp *RespSync, since string) bool + +// Syncer is an interface that must be satisfied in order to do /sync requests on a client. +type Syncer interface { + // ProcessResponse processes the /sync response. The since parameter is the since= value that was used to produce the response. + // This is useful for detecting the very first sync (since=""). If an error is return, Syncing will be stopped permanently. + ProcessResponse(resp *RespSync, since string) error + // OnFailedSync returns either the time to wait before retrying or an error to stop syncing permanently. + OnFailedSync(res *RespSync, err error) (time.Duration, error) + // GetFilterJSON for the given user ID. NOT the filter ID. + GetFilterJSON(userID id.UserID) *Filter +} + +type ExtensibleSyncer interface { + OnSync(callback SyncHandler) + OnEvent(callback EventHandler) + OnEventType(eventType event.Type, callback EventHandler) +} + +type DispatchableSyncer interface { + Dispatch(source EventSource, evt *event.Event) +} + +// DefaultSyncer is the default syncing implementation. You can either write your own syncer, or selectively +// replace parts of this default syncer (e.g. the ProcessResponse method). The default syncer uses the observer +// pattern to notify callers about incoming events. See DefaultSyncer.OnEventType for more information. +type DefaultSyncer struct { + // syncListeners want the whole sync response, e.g. the crypto machine + syncListeners []SyncHandler + // globalListeners want all events + globalListeners []EventHandler + // listeners want a specific event type + listeners map[event.Type][]EventHandler + // ParseEventContent determines whether or not event content should be parsed before passing to handlers. + ParseEventContent bool + // ParseErrorHandler is called when event.Content.ParseRaw returns an error. + // If it returns false, the event will not be forwarded to listeners. + ParseErrorHandler func(evt *event.Event, err error) bool + // FilterJSON is used when the client starts syncing and doesn't get an existing filter ID from SyncStore's LoadFilterID. + FilterJSON *Filter +} + +var _ Syncer = (*DefaultSyncer)(nil) +var _ ExtensibleSyncer = (*DefaultSyncer)(nil) + +// NewDefaultSyncer returns an instantiated DefaultSyncer +func NewDefaultSyncer() *DefaultSyncer { + return &DefaultSyncer{ + listeners: make(map[event.Type][]EventHandler), + syncListeners: []SyncHandler{}, + globalListeners: []EventHandler{}, + ParseEventContent: true, + ParseErrorHandler: func(evt *event.Event, err error) bool { + return false + }, + } +} + +// ProcessResponse processes the /sync response in a way suitable for bots. "Suitable for bots" means a stream of +// unrepeating events. Returns a fatal error if a listener panics. +func (s *DefaultSyncer) ProcessResponse(res *RespSync, since string) (err error) { + defer func() { + if r := recover(); r != nil { + err = fmt.Errorf("ProcessResponse panicked! since=%s panic=%s\n%s", since, r, debug.Stack()) + } + }() + + for _, listener := range s.syncListeners { + if !listener(res, since) { + return + } + } + + s.processSyncEvents("", res.ToDevice.Events, EventSourceToDevice) + s.processSyncEvents("", res.Presence.Events, EventSourcePresence) + s.processSyncEvents("", res.AccountData.Events, EventSourceAccountData) + + for roomID, roomData := range res.Rooms.Join { + s.processSyncEvents(roomID, roomData.State.Events, EventSourceJoin|EventSourceState) + s.processSyncEvents(roomID, roomData.Timeline.Events, EventSourceJoin|EventSourceTimeline) + s.processSyncEvents(roomID, roomData.Ephemeral.Events, EventSourceJoin|EventSourceEphemeral) + s.processSyncEvents(roomID, roomData.AccountData.Events, EventSourceJoin|EventSourceAccountData) + } + for roomID, roomData := range res.Rooms.Invite { + s.processSyncEvents(roomID, roomData.State.Events, EventSourceInvite|EventSourceState) + } + for roomID, roomData := range res.Rooms.Leave { + s.processSyncEvents(roomID, roomData.State.Events, EventSourceLeave|EventSourceState) + s.processSyncEvents(roomID, roomData.Timeline.Events, EventSourceLeave|EventSourceTimeline) + } + return +} + +func (s *DefaultSyncer) processSyncEvents(roomID id.RoomID, events []*event.Event, source EventSource) { + for _, evt := range events { + s.processSyncEvent(roomID, evt, source) + } +} + +func (s *DefaultSyncer) processSyncEvent(roomID id.RoomID, evt *event.Event, source EventSource) { + evt.RoomID = roomID + + // Ensure the type class is correct. It's safe to mutate the class since the event type is not a pointer. + // Listeners are keyed by type structs, which means only the correct class will pass. + switch { + case evt.StateKey != nil: + evt.Type.Class = event.StateEventType + case source == EventSourcePresence, source&EventSourceEphemeral != 0: + evt.Type.Class = event.EphemeralEventType + case source&EventSourceAccountData != 0: + evt.Type.Class = event.AccountDataEventType + case source == EventSourceToDevice: + evt.Type.Class = event.ToDeviceEventType + default: + evt.Type.Class = event.MessageEventType + } + + if s.ParseEventContent { + err := evt.Content.ParseRaw(evt.Type) + if err != nil && !s.ParseErrorHandler(evt, err) { + return + } + } + + s.Dispatch(source, evt) +} + +func (s *DefaultSyncer) Dispatch(source EventSource, evt *event.Event) { + for _, fn := range s.globalListeners { + fn(source, evt) + } + listeners, exists := s.listeners[evt.Type] + if exists { + for _, fn := range listeners { + fn(source, evt) + } + } +} + +// OnEventType allows callers to be notified when there are new events for the given event type. +// There are no duplicate checks. +func (s *DefaultSyncer) OnEventType(eventType event.Type, callback EventHandler) { + _, exists := s.listeners[eventType] + if !exists { + s.listeners[eventType] = []EventHandler{} + } + s.listeners[eventType] = append(s.listeners[eventType], callback) +} + +func (s *DefaultSyncer) OnSync(callback SyncHandler) { + s.syncListeners = append(s.syncListeners, callback) +} + +func (s *DefaultSyncer) OnEvent(callback EventHandler) { + s.globalListeners = append(s.globalListeners, callback) +} + +// OnFailedSync always returns a 10 second wait period between failed /syncs, never a fatal error. +func (s *DefaultSyncer) OnFailedSync(res *RespSync, err error) (time.Duration, error) { + if errors.Is(err, MUnknownToken) { + return 0, err + } + return 10 * time.Second, nil +} + +var defaultFilter = Filter{ + Room: RoomFilter{ + Timeline: FilterPart{ + Limit: 50, + }, + }, +} + +// GetFilterJSON returns a filter with a timeline limit of 50. +func (s *DefaultSyncer) GetFilterJSON(userID id.UserID) *Filter { + if s.FilterJSON == nil { + defaultFilterCopy := defaultFilter + s.FilterJSON = &defaultFilterCopy + } + return s.FilterJSON +} + +// OldEventIgnorer is an utility struct for bots to ignore events from before the bot joined the room. +// +// Create a struct and call Register with your DefaultSyncer to register the sync handler, e.g.: +// +// (&OldEventIgnorer{UserID: cli.UserID}).Register(cli.Syncer.(mautrix.ExtensibleSyncer)) +type OldEventIgnorer struct { + UserID id.UserID +} + +func (oei *OldEventIgnorer) Register(syncer ExtensibleSyncer) { + syncer.OnSync(oei.DontProcessOldEvents) +} + +// DontProcessOldEvents returns true if a sync response should be processed. May modify the response to remove +// stuff that shouldn't be processed. +func (oei *OldEventIgnorer) DontProcessOldEvents(resp *RespSync, since string) bool { + if since == "" { + return false + } + // This is a horrible hack because /sync will return the most recent messages for a room + // as soon as you /join it. We do NOT want to process those events in that particular room + // because they may have already been processed (if you toggle the bot in/out of the room). + // + // Work around this by inspecting each room's timeline and seeing if an m.room.member event for us + // exists and is "join" and then discard processing that room entirely if so. + // TODO: We probably want to process messages from after the last join event in the timeline. + for roomID, roomData := range resp.Rooms.Join { + for i := len(roomData.Timeline.Events) - 1; i >= 0; i-- { + evt := roomData.Timeline.Events[i] + if evt.Type == event.StateMember && evt.GetStateKey() == string(oei.UserID) { + membership, _ := evt.Content.Raw["membership"].(string) + if membership == "join" { + _, ok := resp.Rooms.Join[roomID] + if !ok { + continue + } + delete(resp.Rooms.Join, roomID) // don't re-process messages + delete(resp.Rooms.Invite, roomID) // don't re-process invites + break + } + } + } + } + return true +} |