summaryrefslogtreecommitdiffstats
path: root/vendor/github.com/matterbridge/gomatrix/sync.go
diff options
context:
space:
mode:
authorWim <wim@42.be>2017-12-25 00:54:39 +0100
committerWim <wim@42.be>2017-12-25 00:54:39 +0100
commit16f3fa6bae77f646c980f616e262ab84a12dddb7 (patch)
treec103acf984bc0e2f11c66852aeef53fb335dea5e /vendor/github.com/matterbridge/gomatrix/sync.go
parent1f706673cf326625b749b42b1102a75835ff1bbb (diff)
downloadmatterbridge-msglm-16f3fa6bae77f646c980f616e262ab84a12dddb7.tar.gz
matterbridge-msglm-16f3fa6bae77f646c980f616e262ab84a12dddb7.tar.bz2
matterbridge-msglm-16f3fa6bae77f646c980f616e262ab84a12dddb7.zip
Vendor github.com/matterbridge/gomatrix
Diffstat (limited to 'vendor/github.com/matterbridge/gomatrix/sync.go')
-rw-r--r--vendor/github.com/matterbridge/gomatrix/sync.go164
1 files changed, 164 insertions, 0 deletions
diff --git a/vendor/github.com/matterbridge/gomatrix/sync.go b/vendor/github.com/matterbridge/gomatrix/sync.go
new file mode 100644
index 00000000..c4bea48c
--- /dev/null
+++ b/vendor/github.com/matterbridge/gomatrix/sync.go
@@ -0,0 +1,164 @@
+package gomatrix
+
+import (
+ "encoding/json"
+ "fmt"
+ "runtime/debug"
+ "time"
+)
+
+// Syncer represents an interface that must be satisfied in order to do /sync requests on a client.
+type Syncer interface {
+ // Process the /sync response. The since parameter is the since= value that was used to produce the response.
+ // This is useful for detecting the very first sync (since=""). If an error is return, Syncing will be stopped
+ // permanently.
+ ProcessResponse(resp *RespSync, since string) error
+ // OnFailedSync returns either the time to wait before retrying or an error to stop syncing permanently.
+ OnFailedSync(res *RespSync, err error) (time.Duration, error)
+ // GetFilterJSON for the given user ID. NOT the filter ID.
+ GetFilterJSON(userID string) json.RawMessage
+}
+
+// DefaultSyncer is the default syncing implementation. You can either write your own syncer, or selectively
+// replace parts of this default syncer (e.g. the ProcessResponse method). The default syncer uses the observer
+// pattern to notify callers about incoming events. See DefaultSyncer.OnEventType for more information.
+type DefaultSyncer struct {
+ UserID string
+ Store Storer
+ listeners map[string][]OnEventListener // event type to listeners array
+}
+
+// OnEventListener can be used with DefaultSyncer.OnEventType to be informed of incoming events.
+type OnEventListener func(*Event)
+
+// NewDefaultSyncer returns an instantiated DefaultSyncer
+func NewDefaultSyncer(userID string, store Storer) *DefaultSyncer {
+ return &DefaultSyncer{
+ UserID: userID,
+ Store: store,
+ listeners: make(map[string][]OnEventListener),
+ }
+}
+
+// ProcessResponse processes the /sync response in a way suitable for bots. "Suitable for bots" means a stream of
+// unrepeating events. Returns a fatal error if a listener panics.
+func (s *DefaultSyncer) ProcessResponse(res *RespSync, since string) (err error) {
+ if !s.shouldProcessResponse(res, since) {
+ return
+ }
+
+ defer func() {
+ if r := recover(); r != nil {
+ err = fmt.Errorf("ProcessResponse panicked! userID=%s since=%s panic=%s\n%s", s.UserID, since, r, debug.Stack())
+ }
+ }()
+
+ for roomID, roomData := range res.Rooms.Join {
+ room := s.getOrCreateRoom(roomID)
+ for _, event := range roomData.State.Events {
+ event.RoomID = roomID
+ room.UpdateState(&event)
+ s.notifyListeners(&event)
+ }
+ for _, event := range roomData.Timeline.Events {
+ event.RoomID = roomID
+ s.notifyListeners(&event)
+ }
+ }
+ for roomID, roomData := range res.Rooms.Invite {
+ room := s.getOrCreateRoom(roomID)
+ for _, event := range roomData.State.Events {
+ event.RoomID = roomID
+ room.UpdateState(&event)
+ s.notifyListeners(&event)
+ }
+ }
+ for roomID, roomData := range res.Rooms.Leave {
+ room := s.getOrCreateRoom(roomID)
+ for _, event := range roomData.Timeline.Events {
+ if event.StateKey != nil {
+ event.RoomID = roomID
+ room.UpdateState(&event)
+ s.notifyListeners(&event)
+ }
+ }
+ }
+ return
+}
+
+// OnEventType allows callers to be notified when there are new events for the given event type.
+// There are no duplicate checks.
+func (s *DefaultSyncer) OnEventType(eventType string, callback OnEventListener) {
+ _, exists := s.listeners[eventType]
+ if !exists {
+ s.listeners[eventType] = []OnEventListener{}
+ }
+ s.listeners[eventType] = append(s.listeners[eventType], callback)
+}
+
+// shouldProcessResponse returns true if the response should be processed. May modify the response to remove
+// stuff that shouldn't be processed.
+func (s *DefaultSyncer) shouldProcessResponse(resp *RespSync, since string) bool {
+ if since == "" {
+ return false
+ }
+ // This is a horrible hack because /sync will return the most recent messages for a room
+ // as soon as you /join it. We do NOT want to process those events in that particular room
+ // because they may have already been processed (if you toggle the bot in/out of the room).
+ //
+ // Work around this by inspecting each room's timeline and seeing if an m.room.member event for us
+ // exists and is "join" and then discard processing that room entirely if so.
+ // TODO: We probably want to process messages from after the last join event in the timeline.
+ for roomID, roomData := range resp.Rooms.Join {
+ for i := len(roomData.Timeline.Events) - 1; i >= 0; i-- {
+ e := roomData.Timeline.Events[i]
+ if e.Type == "m.room.member" && e.StateKey != nil && *e.StateKey == s.UserID {
+ m := e.Content["membership"]
+ mship, ok := m.(string)
+ if !ok {
+ continue
+ }
+ if mship == "join" {
+ _, ok := resp.Rooms.Join[roomID]
+ if !ok {
+ continue
+ }
+ delete(resp.Rooms.Join, roomID) // don't re-process messages
+ delete(resp.Rooms.Invite, roomID) // don't re-process invites
+ break
+ }
+ }
+ }
+ }
+ return true
+}
+
+// getOrCreateRoom must only be called by the Sync() goroutine which calls ProcessResponse()
+func (s *DefaultSyncer) getOrCreateRoom(roomID string) *Room {
+ room := s.Store.LoadRoom(roomID)
+ if room == nil { // create a new Room
+ room = NewRoom(roomID)
+ s.Store.SaveRoom(room)
+ }
+ return room
+}
+
+func (s *DefaultSyncer) notifyListeners(event *Event) {
+ listeners, exists := s.listeners[event.Type]
+ if !exists {
+ return
+ }
+ for _, fn := range listeners {
+ fn(event)
+ }
+}
+
+// OnFailedSync always returns a 10 second wait period between failed /syncs, never a fatal error.
+func (s *DefaultSyncer) OnFailedSync(res *RespSync, err error) (time.Duration, error) {
+ return 10 * time.Second, nil
+}
+
+// GetFilterJSON returns a filter with a timeline limit of 50.
+func (s *DefaultSyncer) GetFilterJSON(userID string) json.RawMessage {
+ return json.RawMessage(`{"room":{"timeline":{"limit":50}}}`)
+}