summaryrefslogtreecommitdiffstats
path: root/vendor/github.com/matterbridge/gozulipbot/queue.go
diff options
context:
space:
mode:
authorWim <wim@42.be>2020-08-21 00:14:33 +0200
committerGitHub <noreply@github.com>2020-08-21 00:14:33 +0200
commitb451285af70df0651989b43149eb748648ac8c1b (patch)
tree9a3d9f9093cdceb7325bcddfda36983862b5b80b /vendor/github.com/matterbridge/gozulipbot/queue.go
parent63a1847cdc895a3e968c567c20d20359f0785ce2 (diff)
downloadmatterbridge-msglm-b451285af70df0651989b43149eb748648ac8c1b.tar.gz
matterbridge-msglm-b451285af70df0651989b43149eb748648ac8c1b.tar.bz2
matterbridge-msglm-b451285af70df0651989b43149eb748648ac8c1b.zip
Sync with upstream gozulipbot fixes (#1202)
Diffstat (limited to 'vendor/github.com/matterbridge/gozulipbot/queue.go')
-rw-r--r--vendor/github.com/matterbridge/gozulipbot/queue.go18
1 files changed, 16 insertions, 2 deletions
diff --git a/vendor/github.com/matterbridge/gozulipbot/queue.go b/vendor/github.com/matterbridge/gozulipbot/queue.go
index d6e910ee..77aef7fc 100644
--- a/vendor/github.com/matterbridge/gozulipbot/queue.go
+++ b/vendor/github.com/matterbridge/gozulipbot/queue.go
@@ -59,6 +59,7 @@ func (q *Queue) EventsChan() (chan EventMessage, func()) {
case err == BackoffError:
time.Sleep(time.Until(backoffTime))
atomic.AddInt64(&q.Bot.Retries, 1)
+ continue
case err == UnauthorizedError:
// TODO? have error channel when ending the continuously running process?
return
@@ -81,7 +82,7 @@ func (q *Queue) EventsChan() (chan EventMessage, func()) {
return out, endFunc
}
-// EventsCallback will repeatedly call provided callback function with
+// EventsCallback will repeatedly call the provided callback function with
// the output of continual queue.GetEvents calls.
// It returns a function which can be called to end the calls.
//
@@ -107,6 +108,7 @@ func (q *Queue) EventsCallback(fn func(EventMessage, error)) func() {
case err == BackoffError:
time.Sleep(time.Until(backoffTime))
atomic.AddInt64(&q.Bot.Retries, 1)
+ continue
case err == UnauthorizedError:
// TODO? have error channel when ending the continuously running process?
return
@@ -221,9 +223,19 @@ func (q *Queue) ParseEventMessages(rawEventResponse []byte) ([]EventMessage, err
}
messages := []EventMessage{}
+ newLastEventID := 0
for _, event := range events {
- // if the event is a heartbeat, return a special error
+ // Update the lastEventID
+ var id int
+ json.Unmarshal(event["id"], &id)
+ if id > newLastEventID {
+ newLastEventID = id
+ }
+
+ // If the event is a heartbeat, there won't be any more events.
+ // So update the last event id and return a special error.
if string(event["type"]) == `"heartbeat"` {
+ q.LastEventID = newLastEventID
return nil, HeartbeatError
}
var msg EventMessage
@@ -236,5 +248,7 @@ func (q *Queue) ParseEventMessages(rawEventResponse []byte) ([]EventMessage, err
messages = append(messages, msg)
}
+ q.LastEventID = newLastEventID
+
return messages, nil
}