summaryrefslogtreecommitdiffstats
path: root/vendor/github.com/matterbridge/gozulipbot/queue.go
diff options
context:
space:
mode:
Diffstat (limited to 'vendor/github.com/matterbridge/gozulipbot/queue.go')
-rw-r--r--vendor/github.com/matterbridge/gozulipbot/queue.go203
1 files changed, 203 insertions, 0 deletions
diff --git a/vendor/github.com/matterbridge/gozulipbot/queue.go b/vendor/github.com/matterbridge/gozulipbot/queue.go
new file mode 100644
index 00000000..9a37a8e7
--- /dev/null
+++ b/vendor/github.com/matterbridge/gozulipbot/queue.go
@@ -0,0 +1,203 @@
+package gozulipbot
+
+import (
+ "encoding/json"
+ "fmt"
+ "io/ioutil"
+ "math"
+ "net/http"
+ "net/url"
+ "strconv"
+ "sync/atomic"
+ "time"
+)
+
+var (
+ HeartbeatError = fmt.Errorf("EventMessage is a heartbeat")
+ UnauthorizedError = fmt.Errorf("Request is unauthorized")
+ BackoffError = fmt.Errorf("Too many requests")
+ UnknownError = fmt.Errorf("Error was unknown")
+)
+
+type Queue struct {
+ ID string `json:"queue_id"`
+ LastEventID int `json:"last_event_id"`
+ MaxMessageID int `json:"max_message_id"`
+ Bot *Bot `json:"-"`
+}
+
+func (q *Queue) EventsChan() (chan EventMessage, func()) {
+ end := false
+ endFunc := func() {
+ end = true
+ }
+
+ out := make(chan EventMessage)
+ go func() {
+ defer close(out)
+ for {
+ backoffTime := time.Now().Add(q.Bot.Backoff * time.Duration(math.Pow10(int(atomic.LoadInt64(&q.Bot.Retries)))))
+ minTime := time.Now().Add(q.Bot.Backoff)
+ if end {
+ return
+ }
+ ems, err := q.GetEvents()
+ switch {
+ case err == HeartbeatError:
+ time.Sleep(time.Until(minTime))
+ continue
+ case err == BackoffError:
+ time.Sleep(time.Until(backoffTime))
+ atomic.AddInt64(&q.Bot.Retries, 1)
+ case err == UnauthorizedError:
+ // TODO? have error channel when ending the continuously running process?
+ return
+ default:
+ atomic.StoreInt64(&q.Bot.Retries, 0)
+ }
+ if err != nil {
+ // TODO: handle unknown error
+ // For now, handle this like an UnauthorizedError and end the func.
+ return
+ }
+ for _, em := range ems {
+ out <- em
+ }
+ // Always make sure we wait the minimum time before asking again.
+ time.Sleep(time.Until(minTime))
+ }
+ }()
+
+ return out, endFunc
+}
+
+// EventsCallback will repeatedly call provided callback function with
+// the output of continual queue.GetEvents calls.
+// It returns a function which can be called to end the calls.
+//
+// It will end early if it receives an UnauthorizedError, or an unknown error.
+// Note, it will never return a HeartbeatError.
+func (q *Queue) EventsCallback(fn func(EventMessage, error)) func() {
+ end := false
+ endFunc := func() {
+ end = true
+ }
+ go func() {
+ for {
+ backoffTime := time.Now().Add(q.Bot.Backoff * time.Duration(math.Pow10(int(atomic.LoadInt64(&q.Bot.Retries)))))
+ minTime := time.Now().Add(q.Bot.Backoff)
+ if end {
+ return
+ }
+ ems, err := q.GetEvents()
+ switch {
+ case err == HeartbeatError:
+ time.Sleep(time.Until(minTime))
+ continue
+ case err == BackoffError:
+ time.Sleep(time.Until(backoffTime))
+ atomic.AddInt64(&q.Bot.Retries, 1)
+ case err == UnauthorizedError:
+ // TODO? have error channel when ending the continuously running process?
+ return
+ default:
+ atomic.StoreInt64(&q.Bot.Retries, 0)
+ }
+ if err != nil {
+ // TODO: handle unknown error
+ // For now, handle this like an UnauthorizedError and end the func.
+ return
+ }
+ for _, em := range ems {
+ fn(em, err)
+ }
+ // Always make sure we wait the minimum time before asking again.
+ time.Sleep(time.Until(minTime))
+ }
+ }()
+
+ return endFunc
+}
+
+// GetEvents is a blocking call that waits for and parses a list of EventMessages.
+// There will usually only be one EventMessage returned.
+// When a heartbeat is returned, GetEvents will return a HeartbeatError.
+// When an http status code above 400 is returned, one of a BackoffError,
+// UnauthorizedError, or UnknownError will be returned.
+func (q *Queue) GetEvents() ([]EventMessage, error) {
+ resp, err := q.RawGetEvents()
+ if err != nil {
+ return nil, err
+ }
+ defer resp.Body.Close()
+
+ switch {
+ case resp.StatusCode == 429:
+ return nil, BackoffError
+ case resp.StatusCode == 403:
+ return nil, UnauthorizedError
+ case resp.StatusCode >= 400:
+ return nil, UnknownError
+ }
+
+ body, err := ioutil.ReadAll(resp.Body)
+ if err != nil {
+ return nil, err
+ }
+
+ msgs, err := q.ParseEventMessages(body)
+ if err != nil {
+ return nil, err
+ }
+
+ return msgs, nil
+}
+
+// RawGetEvents is a blocking call that receives a response containing a list
+// of events (a.k.a. received messages) since the last message id in the queue.
+func (q *Queue) RawGetEvents() (*http.Response, error) {
+ values := url.Values{}
+ values.Set("queue_id", q.ID)
+ values.Set("last_event_id", strconv.Itoa(q.LastEventID))
+
+ url := "events?" + values.Encode()
+
+ req, err := q.Bot.constructRequest("GET", url, "")
+ if err != nil {
+ return nil, err
+ }
+
+ return q.Bot.Client.Do(req)
+}
+
+func (q *Queue) ParseEventMessages(rawEventResponse []byte) ([]EventMessage, error) {
+ rawResponse := map[string]json.RawMessage{}
+ err := json.Unmarshal(rawEventResponse, &rawResponse)
+ if err != nil {
+ return nil, err
+ }
+
+ events := []map[string]json.RawMessage{}
+ err = json.Unmarshal(rawResponse["events"], &events)
+ if err != nil {
+ return nil, err
+ }
+
+ messages := []EventMessage{}
+ for _, event := range events {
+ // if the event is a heartbeat, return a special error
+ if string(event["type"]) == `"heartbeat"` {
+ return nil, HeartbeatError
+ }
+ var msg EventMessage
+ err = json.Unmarshal(event["message"], &msg)
+ // TODO? should this check be here
+ if err != nil {
+ return nil, err
+ }
+ msg.Queue = q
+ messages = append(messages, msg)
+ }
+
+ return messages, nil
+}