summaryrefslogblamecommitdiffstats
path: root/vendor/github.com/matterbridge/gozulipbot/queue.go
blob: d6e910eeee98d6667d627c309a45517a26628ac8 (plain) (tree)
1
2
3
4

                  
               










                       



                                                                      
                                                                







                                                   





                                       







































































































                                                                                                                                       



                                              




                                             

                                                       



                                                

























                                                                               


















                                                                          





























                                                                                     
package gozulipbot

import (
	"bytes"
	"encoding/json"
	"fmt"
	"io/ioutil"
	"math"
	"net/http"
	"net/url"
	"strconv"
	"sync/atomic"
	"time"
)

var (
	HeartbeatError     = fmt.Errorf("EventMessage is a heartbeat")
	UnauthorizedError  = fmt.Errorf("Request is unauthorized")
	BackoffError       = fmt.Errorf("Too many requests")
	BadEventQueueError = fmt.Errorf("BAD_EVENT_QUEUE_ID error")
	UnknownError       = fmt.Errorf("Error was unknown")
	NoJSONError        = fmt.Errorf("No JSON in body found")
)

type Queue struct {
	ID           string `json:"queue_id"`
	LastEventID  int    `json:"last_event_id"`
	MaxMessageID int    `json:"max_message_id"`
	Bot          *Bot   `json:"-"`
}

type QueueError struct {
	Code   string `json:"code"`
	Msg    string `json:"msg"`
	ID     string `json:"queue_id"`
	Result string `json:"result"`
}

func (q *Queue) EventsChan() (chan EventMessage, func()) {
	end := false
	endFunc := func() {
		end = true
	}

	out := make(chan EventMessage)
	go func() {
		defer close(out)
		for {
			backoffTime := time.Now().Add(q.Bot.Backoff * time.Duration(math.Pow10(int(atomic.LoadInt64(&q.Bot.Retries)))))
			minTime := time.Now().Add(q.Bot.Backoff)
			if end {
				return
			}
			ems, err := q.GetEvents()
			switch {
			case err == HeartbeatError:
				time.Sleep(time.Until(minTime))
				continue
			case err == BackoffError:
				time.Sleep(time.Until(backoffTime))
				atomic.AddInt64(&q.Bot.Retries, 1)
			case err == UnauthorizedError:
				// TODO? have error channel when ending the continuously running process?
				return
			default:
				atomic.StoreInt64(&q.Bot.Retries, 0)
			}
			if err != nil {
				// TODO: handle unknown error
				// For now, handle this like an UnauthorizedError and end the func.
				return
			}
			for _, em := range ems {
				out <- em
			}
			// Always make sure we wait the minimum time before asking again.
			time.Sleep(time.Until(minTime))
		}
	}()

	return out, endFunc
}

// EventsCallback will repeatedly call provided callback function with
// the output of continual queue.GetEvents calls.
// It returns a function which can be called to end the calls.
//
// It will end early if it receives an UnauthorizedError, or an unknown error.
// Note, it will never return a HeartbeatError.
func (q *Queue) EventsCallback(fn func(EventMessage, error)) func() {
	end := false
	endFunc := func() {
		end = true
	}
	go func() {
		for {
			backoffTime := time.Now().Add(q.Bot.Backoff * time.Duration(math.Pow10(int(atomic.LoadInt64(&q.Bot.Retries)))))
			minTime := time.Now().Add(q.Bot.Backoff)
			if end {
				return
			}
			ems, err := q.GetEvents()
			switch {
			case err == HeartbeatError:
				time.Sleep(time.Until(minTime))
				continue
			case err == BackoffError:
				time.Sleep(time.Until(backoffTime))
				atomic.AddInt64(&q.Bot.Retries, 1)
			case err == UnauthorizedError:
				// TODO? have error channel when ending the continuously running process?
				return
			default:
				atomic.StoreInt64(&q.Bot.Retries, 0)
			}
			if err != nil {
				// TODO: handle unknown error
				// For now, handle this like an UnauthorizedError and end the func.
				return
			}
			for _, em := range ems {
				fn(em, err)
			}
			// Always make sure we wait the minimum time before asking again.
			time.Sleep(time.Until(minTime))
		}
	}()

	return endFunc
}

// GetEvents is a blocking call that waits for and parses a list of EventMessages.
// There will usually only be one EventMessage returned.
// When a heartbeat is returned, GetEvents will return a HeartbeatError.
// When an http status code above 400 is returned, one of a BackoffError,
// UnauthorizedError, or UnknownError will be returned.
func (q *Queue) GetEvents() ([]EventMessage, error) {
	resp, err := q.RawGetEvents()
	if err != nil {
		return nil, err
	}
	defer resp.Body.Close()

	body, err := ioutil.ReadAll(resp.Body)
	if err != nil {
		return nil, err
	}

	switch {
	case resp.StatusCode == 429:
		return nil, BackoffError
	case resp.StatusCode == 403:
		return nil, UnauthorizedError
	case resp.StatusCode >= 400:
		if bytes.HasPrefix(body, []byte("<")) {
			return nil, NoJSONError
		}
		qErr, err := q.ParseError(body)
		if err != nil || qErr == nil {
			return nil, UnknownError
		}
		return nil, BadEventQueueError
	}

	msgs, err := q.ParseEventMessages(body)
	if err != nil {
		return nil, err
	}

	return msgs, nil
}

// RawGetEvents is a blocking call that receives a response containing a list
// of events (a.k.a. received messages) since the last message id in the queue.
func (q *Queue) RawGetEvents() (*http.Response, error) {
	values := url.Values{}
	values.Set("queue_id", q.ID)
	values.Set("last_event_id", strconv.Itoa(q.LastEventID))

	url := "events?" + values.Encode()

	req, err := q.Bot.constructRequest("GET", url, "")
	if err != nil {
		return nil, err
	}

	return q.Bot.Client.Do(req)
}

func (q *Queue) ParseError(rawEventResponse []byte) (*QueueError, error) {
	rawResponse := map[string]json.RawMessage{}
	err := json.Unmarshal(rawEventResponse, &rawResponse)
	if err != nil {
		return nil, err
	}

	if _, ok := rawResponse["code"]; ok {
		var qErr QueueError
		err = json.Unmarshal(rawEventResponse, &qErr)
		if err != nil {
			return nil, err
		}
		if qErr.Code == "BAD_EVENT_QUEUE_ID" {
			return &qErr, nil
		}
	}
	return nil, nil
}

func (q *Queue) ParseEventMessages(rawEventResponse []byte) ([]EventMessage, error) {
	rawResponse := map[string]json.RawMessage{}
	err := json.Unmarshal(rawEventResponse, &rawResponse)
	if err != nil {
		return nil, err
	}

	events := []map[string]json.RawMessage{}
	err = json.Unmarshal(rawResponse["events"], &events)
	if err != nil {
		return nil, err
	}

	messages := []EventMessage{}
	for _, event := range events {
		// if the event is a heartbeat, return a special error
		if string(event["type"]) == `"heartbeat"` {
			return nil, HeartbeatError
		}
		var msg EventMessage
		err = json.Unmarshal(event["message"], &msg)
		// TODO? should this check be here
		if err != nil {
			return nil, err
		}
		msg.Queue = q
		messages = append(messages, msg)
	}

	return messages, nil
}