summaryrefslogtreecommitdiffstats
path: root/vendor/github.com/sromku/go-gitter/stream.go
diff options
context:
space:
mode:
Diffstat (limited to 'vendor/github.com/sromku/go-gitter/stream.go')
-rw-r--r--vendor/github.com/sromku/go-gitter/stream.go220
1 files changed, 220 insertions, 0 deletions
diff --git a/vendor/github.com/sromku/go-gitter/stream.go b/vendor/github.com/sromku/go-gitter/stream.go
new file mode 100644
index 00000000..5f1cd78f
--- /dev/null
+++ b/vendor/github.com/sromku/go-gitter/stream.go
@@ -0,0 +1,220 @@
+package gitter
+
+import (
+ "bufio"
+ "encoding/json"
+ "fmt"
+ "net/http"
+ "time"
+
+ "github.com/mreiferson/go-httpclient"
+)
+
+var defaultConnectionWaitTime time.Duration = 3000 // millis
+var defaultConnectionMaxRetries = 5
+
+// Stream initialize stream
+func (gitter *Gitter) Stream(roomID string) *Stream {
+ return &Stream{
+ url: streamBaseURL + "rooms/" + roomID + "/chatMessages",
+ Event: make(chan Event),
+ gitter: gitter,
+ streamConnection: gitter.newStreamConnection(
+ defaultConnectionWaitTime,
+ defaultConnectionMaxRetries),
+ }
+}
+
+// Implemented to conform with https://developer.gitter.im/docs/streaming-api
+func (gitter *Gitter) Listen(stream *Stream) {
+
+ defer stream.destroy()
+
+ var reader *bufio.Reader
+ var gitterMessage Message
+ lastKeepalive := time.Now().Unix()
+
+ // connect
+ stream.connect()
+
+Loop:
+ for {
+
+ // if closed then stop trying
+ if stream.isClosed() {
+ stream.Event <- Event{
+ Data: &GitterConnectionClosed{},
+ }
+ break Loop
+ }
+
+ resp := stream.getResponse()
+ if resp.StatusCode != 200 {
+ gitter.log(fmt.Sprintf("Unexpected response code %v", resp.StatusCode))
+ continue
+ }
+
+ //"The JSON stream returns messages as JSON objects that are delimited by carriage return (\r)" <- Not true crap it's (\n) only
+ reader = bufio.NewReader(resp.Body)
+ line, err := reader.ReadBytes('\n')
+
+ //Check if the line only consists of whitespace
+ onlyWhitespace := true
+ for _, b := range line {
+ if b != ' ' && b != '\t' && b != '\r' && b != '\n' {
+ onlyWhitespace = false
+ }
+ }
+
+ if onlyWhitespace {
+ //"Parsers must be tolerant of occasional extra newline characters placed between messages."
+ currentKeepalive := time.Now().Unix() //interesting behavior of 100+ keepalives per seconds was observed
+ if currentKeepalive-lastKeepalive > 10 {
+ lastKeepalive = currentKeepalive
+ gitter.log("Keepalive was received")
+ }
+ continue
+ } else if stream.isClosed() {
+ gitter.log("Stream closed")
+ continue
+ } else if err != nil {
+ gitter.log("ReadBytes error: " + err.Error())
+ stream.connect()
+ continue
+ }
+
+ // unmarshal the streamed data
+ err = json.Unmarshal(line, &gitterMessage)
+ if err != nil {
+ gitter.log("JSON Unmarshal error: " + err.Error())
+ continue
+ }
+
+ // we are here, then we got the good message. pipe it forward.
+ stream.Event <- Event{
+ Data: &MessageReceived{
+ Message: gitterMessage,
+ },
+ }
+ }
+
+ gitter.log("Listening was completed")
+}
+
+// Stream holds stream data.
+type Stream struct {
+ url string
+ Event chan Event
+ streamConnection *streamConnection
+ gitter *Gitter
+}
+
+func (stream *Stream) destroy() {
+ close(stream.Event)
+}
+
+type Event struct {
+ Data interface{}
+}
+
+type GitterConnectionClosed struct {
+}
+
+type MessageReceived struct {
+ Message Message
+}
+
+// connect and try to reconnect with
+func (stream *Stream) connect() {
+
+ if stream.streamConnection.retries == stream.streamConnection.currentRetries {
+ stream.Close()
+ stream.gitter.log("Number of retries exceeded the max retries number, we are done here")
+ return
+ }
+
+ res, err := stream.gitter.getResponse(stream.url, stream)
+ if stream.streamConnection.canceled {
+ // do nothing
+ } else if err != nil || res.StatusCode != 200 {
+ stream.gitter.log("Failed to get response, trying reconnect ")
+ stream.gitter.log(err)
+
+ // sleep and wait
+ stream.streamConnection.currentRetries++
+ time.Sleep(time.Millisecond * stream.streamConnection.wait * time.Duration(stream.streamConnection.currentRetries))
+
+ // connect again
+ stream.Close()
+ stream.connect()
+ } else {
+ stream.gitter.log("Response was received")
+ stream.streamConnection.currentRetries = 0
+ stream.streamConnection.closed = false
+ stream.streamConnection.response = res
+ }
+}
+
+type streamConnection struct {
+
+ // connection was closed
+ closed bool
+
+ // canceled
+ canceled bool
+
+ // wait time till next try
+ wait time.Duration
+
+ // max tries to recover
+ retries int
+
+ // current streamed response
+ response *http.Response
+
+ // current request
+ request *http.Request
+
+ // current status
+ currentRetries int
+}
+
+// Close the stream connection and stop receiving streamed data
+func (stream *Stream) Close() {
+ conn := stream.streamConnection
+ conn.closed = true
+ if conn.response != nil {
+ stream.gitter.log("Stream connection close response")
+ defer conn.response.Body.Close()
+ }
+ if conn.request != nil {
+ stream.gitter.log("Stream connection close request")
+ switch transport := stream.gitter.config.client.Transport.(type) {
+ case *httpclient.Transport:
+ stream.streamConnection.canceled = true
+ transport.CancelRequest(conn.request)
+ default:
+ }
+
+ }
+ conn.currentRetries = 0
+}
+
+func (stream *Stream) isClosed() bool {
+ return stream.streamConnection.closed
+}
+
+func (stream *Stream) getResponse() *http.Response {
+ return stream.streamConnection.response
+}
+
+// Optional, set stream connection properties
+// wait - time in milliseconds of waiting between reconnections. Will grow exponentially.
+// retries - number of reconnections retries before dropping the stream.
+func (gitter *Gitter) newStreamConnection(wait time.Duration, retries int) *streamConnection {
+ return &streamConnection{
+ closed: true,
+ wait: wait,
+ retries: retries,
+ }
+}