diff options
Diffstat (limited to 'vendor/github.com/sromku/go-gitter/stream.go')
-rw-r--r-- | vendor/github.com/sromku/go-gitter/stream.go | 220 |
1 files changed, 0 insertions, 220 deletions
diff --git a/vendor/github.com/sromku/go-gitter/stream.go b/vendor/github.com/sromku/go-gitter/stream.go deleted file mode 100644 index 5f1cd78f..00000000 --- a/vendor/github.com/sromku/go-gitter/stream.go +++ /dev/null @@ -1,220 +0,0 @@ -package gitter - -import ( - "bufio" - "encoding/json" - "fmt" - "net/http" - "time" - - "github.com/mreiferson/go-httpclient" -) - -var defaultConnectionWaitTime time.Duration = 3000 // millis -var defaultConnectionMaxRetries = 5 - -// Stream initialize stream -func (gitter *Gitter) Stream(roomID string) *Stream { - return &Stream{ - url: streamBaseURL + "rooms/" + roomID + "/chatMessages", - Event: make(chan Event), - gitter: gitter, - streamConnection: gitter.newStreamConnection( - defaultConnectionWaitTime, - defaultConnectionMaxRetries), - } -} - -// Implemented to conform with https://developer.gitter.im/docs/streaming-api -func (gitter *Gitter) Listen(stream *Stream) { - - defer stream.destroy() - - var reader *bufio.Reader - var gitterMessage Message - lastKeepalive := time.Now().Unix() - - // connect - stream.connect() - -Loop: - for { - - // if closed then stop trying - if stream.isClosed() { - stream.Event <- Event{ - Data: &GitterConnectionClosed{}, - } - break Loop - } - - resp := stream.getResponse() - if resp.StatusCode != 200 { - gitter.log(fmt.Sprintf("Unexpected response code %v", resp.StatusCode)) - continue - } - - //"The JSON stream returns messages as JSON objects that are delimited by carriage return (\r)" <- Not true crap it's (\n) only - reader = bufio.NewReader(resp.Body) - line, err := reader.ReadBytes('\n') - - //Check if the line only consists of whitespace - onlyWhitespace := true - for _, b := range line { - if b != ' ' && b != '\t' && b != '\r' && b != '\n' { - onlyWhitespace = false - } - } - - if onlyWhitespace { - //"Parsers must be tolerant of occasional extra newline characters placed between messages." - currentKeepalive := time.Now().Unix() //interesting behavior of 100+ keepalives per seconds was observed - if currentKeepalive-lastKeepalive > 10 { - lastKeepalive = currentKeepalive - gitter.log("Keepalive was received") - } - continue - } else if stream.isClosed() { - gitter.log("Stream closed") - continue - } else if err != nil { - gitter.log("ReadBytes error: " + err.Error()) - stream.connect() - continue - } - - // unmarshal the streamed data - err = json.Unmarshal(line, &gitterMessage) - if err != nil { - gitter.log("JSON Unmarshal error: " + err.Error()) - continue - } - - // we are here, then we got the good message. pipe it forward. - stream.Event <- Event{ - Data: &MessageReceived{ - Message: gitterMessage, - }, - } - } - - gitter.log("Listening was completed") -} - -// Stream holds stream data. -type Stream struct { - url string - Event chan Event - streamConnection *streamConnection - gitter *Gitter -} - -func (stream *Stream) destroy() { - close(stream.Event) -} - -type Event struct { - Data interface{} -} - -type GitterConnectionClosed struct { -} - -type MessageReceived struct { - Message Message -} - -// connect and try to reconnect with -func (stream *Stream) connect() { - - if stream.streamConnection.retries == stream.streamConnection.currentRetries { - stream.Close() - stream.gitter.log("Number of retries exceeded the max retries number, we are done here") - return - } - - res, err := stream.gitter.getResponse(stream.url, stream) - if stream.streamConnection.canceled { - // do nothing - } else if err != nil || res.StatusCode != 200 { - stream.gitter.log("Failed to get response, trying reconnect ") - stream.gitter.log(err) - - // sleep and wait - stream.streamConnection.currentRetries++ - time.Sleep(time.Millisecond * stream.streamConnection.wait * time.Duration(stream.streamConnection.currentRetries)) - - // connect again - stream.Close() - stream.connect() - } else { - stream.gitter.log("Response was received") - stream.streamConnection.currentRetries = 0 - stream.streamConnection.closed = false - stream.streamConnection.response = res - } -} - -type streamConnection struct { - - // connection was closed - closed bool - - // canceled - canceled bool - - // wait time till next try - wait time.Duration - - // max tries to recover - retries int - - // current streamed response - response *http.Response - - // current request - request *http.Request - - // current status - currentRetries int -} - -// Close the stream connection and stop receiving streamed data -func (stream *Stream) Close() { - conn := stream.streamConnection - conn.closed = true - if conn.response != nil { - stream.gitter.log("Stream connection close response") - defer conn.response.Body.Close() - } - if conn.request != nil { - stream.gitter.log("Stream connection close request") - switch transport := stream.gitter.config.client.Transport.(type) { - case *httpclient.Transport: - stream.streamConnection.canceled = true - transport.CancelRequest(conn.request) - default: - } - - } - conn.currentRetries = 0 -} - -func (stream *Stream) isClosed() bool { - return stream.streamConnection.closed -} - -func (stream *Stream) getResponse() *http.Response { - return stream.streamConnection.response -} - -// Optional, set stream connection properties -// wait - time in milliseconds of waiting between reconnections. Will grow exponentially. -// retries - number of reconnections retries before dropping the stream. -func (gitter *Gitter) newStreamConnection(wait time.Duration, retries int) *streamConnection { - return &streamConnection{ - closed: true, - wait: wait, - retries: retries, - } -} |