package gitter import ( "bufio" "encoding/json" "fmt" "net/http" "time" "github.com/mreiferson/go-httpclient" ) var defaultConnectionWaitTime time.Duration = 3000 // millis var defaultConnectionMaxRetries = 5 // Stream initialize stream func (gitter *Gitter) Stream(roomID string) *Stream { return &Stream{ url: streamBaseURL + "rooms/" + roomID + "/chatMessages", Event: make(chan Event), gitter: gitter, streamConnection: gitter.newStreamConnection( defaultConnectionWaitTime, defaultConnectionMaxRetries), } } // Implemented to conform with https://developer.gitter.im/docs/streaming-api func (gitter *Gitter) Listen(stream *Stream) { defer stream.destroy() var reader *bufio.Reader var gitterMessage Message lastKeepalive := time.Now().Unix() // connect stream.connect() Loop: for { // if closed then stop trying if stream.isClosed() { stream.Event <- Event{ Data: &GitterConnectionClosed{}, } break Loop } resp := stream.getResponse() if resp.StatusCode != 200 { gitter.log(fmt.Sprintf("Unexpected response code %v", resp.StatusCode)) continue } //"The JSON stream returns messages as JSON objects that are delimited by carriage return (\r)" <- Not true crap it's (\n) only reader = bufio.NewReader(resp.Body) line, err := reader.ReadBytes('\n') if err != nil { gitter.log("ReadBytes error: " + err.Error()) stream.connect() continue } //Check if the line only consists of whitespace onlyWhitespace := true for _, b := range line { if b != ' ' && b != '\t' && b != '\r' && b != '\n' { onlyWhitespace = false } } if onlyWhitespace { //"Parsers must be tolerant of occasional extra newline characters placed between messages." currentKeepalive := time.Now().Unix() //interesting behavior of 100+ keepalives per seconds was observed if currentKeepalive-lastKeepalive > 10 { lastKeepalive = currentKeepalive gitter.log("Keepalive was received") } continue } else if stream.isClosed() { gitter.log("Stream closed") continue } // unmarshal the streamed data err = json.Unmarshal(line, &gitterMessage) if err != nil { gitter.log("JSON Unmarshal error: " + err.Error()) continue } // we are here, then we got the good message. pipe it forward. stream.Event <- Event{ Data: &MessageReceived{ Message: gitterMessage, }, } } gitter.log("Listening was completed") } // Stream holds stream data. type Stream struct { url string Event chan Event streamConnection *streamConnection gitter *Gitter } func (stream *Stream) destroy() { close(stream.Event) } type Event struct { Data interface{} } type GitterConnectionClosed struct { } type MessageReceived struct { Message Message } // connect and try to reconnect with func (stream *Stream) connect() { if stream.streamConnection.retries == stream.streamConnection.currentRetries { stream.Close() stream.gitter.log("Number of retries exceeded the max retries number, we are done here") return } res, err := stream.gitter.getResponse(stream.url, stream) if stream.streamConnection.canceled { // do nothing } else if err != nil || res.StatusCode != 200 { stream.gitter.log("Failed to get response, trying reconnect ") stream.gitter.log(err) // sleep and wait stream.streamConnection.currentRetries++ time.Sleep(time.Millisecond * stream.streamConnection.wait * time.Duration(stream.streamConnection.currentRetries)) // connect again stream.Close() stream.connect() } else { stream.gitter.log("Response was received") stream.streamConnection.currentRetries = 0 stream.streamConnection.closed = false stream.streamConnection.response = res } } type streamConnection struct { // connection was closed closed bool // canceled canceled bool // wait time till next try wait time.Duration // max tries to recover retries int // current streamed response response *http.Response // current request request *http.Request // current status currentRetries int } // Close the stream connection and stop receiving streamed data func (stream *Stream) Close() { conn := stream.streamConnection conn.closed = true if conn.response != nil { stream.gitter.log("Stream connection close response") defer conn.response.Body.Close() } if conn.request != nil { stream.gitter.log("Stream connection close request") switch transport := stream.gitter.config.client.Transport.(type) { case *httpclient.Transport: stream.streamConnection.canceled = true transport.CancelRequest(conn.request) default: } } conn.currentRetries = 0 } func (stream *Stream) isClosed() bool { return stream.streamConnection.closed } func (stream *Stream) getResponse() *http.Response { return stream.streamConnection.response } // Optional, set stream connection properties // wait - time in milliseconds of waiting between reconnections. Will grow exponentially. // retries - number of reconnections retries before dropping the stream. func (gitter *Gitter) newStreamConnection(wait time.Duration, retries int) *streamConnection { return &streamConnection{ closed: true, wait: wait, retries: retries, } }