summaryrefslogblamecommitdiffstats
path: root/vendor/github.com/mattermost/mattermost-server/v5/model/websocket_client.go
blob: cd89e2d8c24f48da3b32ed78d7637cf849e13c16 (plain) (tree)



































                                                                                          
                                           

























































































































































































































































































                                                                                                                                                                                              
// Copyright (c) 2015-present Mattermost, Inc. All Rights Reserved.
// See LICENSE.txt for license information.

package model

import (
	"bytes"
	"encoding/json"
	"net/http"
	"sync/atomic"
	"time"

	"github.com/gorilla/websocket"
)

const (
	SOCKET_MAX_MESSAGE_SIZE_KB  = 8 * 1024 // 8KB
	PING_TIMEOUT_BUFFER_SECONDS = 5
)

type msgType int

const (
	msgTypeJSON msgType = iota + 1
	msgTypePong
)

type writeMessage struct {
	msgType msgType
	data    interface{}
}

const avgReadMsgSizeBytes = 1024

// WebSocketClient stores the necessary information required to
// communicate with a WebSocket endpoint.
// A client must read from PingTimeoutChannel, EventChannel and ResponseChannel to prevent
// deadlocks from occurring in the program.
type WebSocketClient struct {
	Url                string                  // The location of the server like "ws://localhost:8065"
	ApiUrl             string                  // The API location of the server like "ws://localhost:8065/api/v3"
	ConnectUrl         string                  // The WebSocket URL to connect to like "ws://localhost:8065/api/v3/path/to/websocket"
	Conn               *websocket.Conn         // The WebSocket connection
	AuthToken          string                  // The token used to open the WebSocket connection
	Sequence           int64                   // The ever-incrementing sequence attached to each WebSocket action
	PingTimeoutChannel chan bool               // The channel used to signal ping timeouts
	EventChannel       chan *WebSocketEvent    // The channel used to receive various events pushed from the server. For example: typing, posted
	ResponseChannel    chan *WebSocketResponse // The channel used to receive responses for requests made to the server
	ListenError        *AppError               // A field that is set if there was an abnormal closure of the WebSocket connection
	writeChan          chan writeMessage

	pingTimeoutTimer *time.Timer
	quitPingWatchdog chan struct{}

	quitWriterChan chan struct{}
	resetTimerChan chan struct{}
	closed         int32
}

// NewWebSocketClient constructs a new WebSocket client with convenience
// methods for talking to the server.
func NewWebSocketClient(url, authToken string) (*WebSocketClient, *AppError) {
	return NewWebSocketClientWithDialer(websocket.DefaultDialer, url, authToken)
}

// NewWebSocketClientWithDialer constructs a new WebSocket client with convenience
// methods for talking to the server using a custom dialer.
func NewWebSocketClientWithDialer(dialer *websocket.Dialer, url, authToken string) (*WebSocketClient, *AppError) {
	conn, _, err := dialer.Dial(url+API_URL_SUFFIX+"/websocket", nil)
	if err != nil {
		return nil, NewAppError("NewWebSocketClient", "model.websocket_client.connect_fail.app_error", nil, err.Error(), http.StatusInternalServerError)
	}

	client := &WebSocketClient{
		Url:                url,
		ApiUrl:             url + API_URL_SUFFIX,
		ConnectUrl:         url + API_URL_SUFFIX + "/websocket",
		Conn:               conn,
		AuthToken:          authToken,
		Sequence:           1,
		PingTimeoutChannel: make(chan bool, 1),
		EventChannel:       make(chan *WebSocketEvent, 100),
		ResponseChannel:    make(chan *WebSocketResponse, 100),
		writeChan:          make(chan writeMessage),
		quitPingWatchdog:   make(chan struct{}),
		quitWriterChan:     make(chan struct{}),
		resetTimerChan:     make(chan struct{}),
	}

	client.configurePingHandling()
	go client.writer()

	client.SendMessage(WEBSOCKET_AUTHENTICATION_CHALLENGE, map[string]interface{}{"token": authToken})

	return client, nil
}

// NewWebSocketClient4 constructs a new WebSocket client with convenience
// methods for talking to the server. Uses the v4 endpoint.
func NewWebSocketClient4(url, authToken string) (*WebSocketClient, *AppError) {
	return NewWebSocketClient4WithDialer(websocket.DefaultDialer, url, authToken)
}

// NewWebSocketClient4WithDialer constructs a new WebSocket client with convenience
// methods for talking to the server using a custom dialer. Uses the v4 endpoint.
func NewWebSocketClient4WithDialer(dialer *websocket.Dialer, url, authToken string) (*WebSocketClient, *AppError) {
	return NewWebSocketClientWithDialer(dialer, url, authToken)
}

// Connect creates a websocket connection with the given ConnectUrl.
// This is racy and error-prone should not be used. Use any of the New* functions to create a websocket.
func (wsc *WebSocketClient) Connect() *AppError {
	return wsc.ConnectWithDialer(websocket.DefaultDialer)
}

// ConnectWithDialer creates a websocket connection with the given ConnectUrl using the dialer.
// This is racy and error-prone and should not be used. Use any of the New* functions to create a websocket.
func (wsc *WebSocketClient) ConnectWithDialer(dialer *websocket.Dialer) *AppError {
	var err error
	wsc.Conn, _, err = dialer.Dial(wsc.ConnectUrl, nil)
	if err != nil {
		return NewAppError("Connect", "model.websocket_client.connect_fail.app_error", nil, err.Error(), http.StatusInternalServerError)
	}
	// Super racy and should not be done anyways.
	// All of this needs to be redesigned for v6.
	wsc.configurePingHandling()
	// If it has been closed before, we just restart the writer.
	if atomic.CompareAndSwapInt32(&wsc.closed, 1, 0) {
		wsc.writeChan = make(chan writeMessage)
		wsc.quitWriterChan = make(chan struct{})
		go wsc.writer()
		wsc.resetTimerChan = make(chan struct{})
		wsc.quitPingWatchdog = make(chan struct{})
	}

	wsc.EventChannel = make(chan *WebSocketEvent, 100)
	wsc.ResponseChannel = make(chan *WebSocketResponse, 100)

	wsc.SendMessage(WEBSOCKET_AUTHENTICATION_CHALLENGE, map[string]interface{}{"token": wsc.AuthToken})

	return nil
}

// Close closes the websocket client. It is recommended that a closed client should not be
// reused again. Rather a new client should be created anew.
func (wsc *WebSocketClient) Close() {
	// CAS to 1 and proceed. Return if already 1.
	if !atomic.CompareAndSwapInt32(&wsc.closed, 0, 1) {
		return
	}
	wsc.quitWriterChan <- struct{}{}
	close(wsc.writeChan)
	// We close the connection, which breaks the reader loop.
	// Then we let the defer block in the reader do further cleanup.
	wsc.Conn.Close()
}

// TODO: un-export the Conn so that Write methods go through the writer
func (wsc *WebSocketClient) writer() {
	for {
		select {
		case msg := <-wsc.writeChan:
			switch msg.msgType {
			case msgTypeJSON:
				wsc.Conn.WriteJSON(msg.data)
			case msgTypePong:
				wsc.Conn.WriteMessage(websocket.PongMessage, []byte{})
			}
		case <-wsc.quitWriterChan:
			return
		}
	}
}

// Listen starts the read loop of the websocket client.
func (wsc *WebSocketClient) Listen() {
	// This loop can exit in 2 conditions:
	// 1. Either the connection breaks naturally.
	// 2. Close was explicitly called, which closes the connection manually.
	//
	// Due to the way the API is written, there is a requirement that a client may NOT
	// call Listen at all and can still call Close and Connect.
	// Therefore, we let the cleanup of the reader stuff rely on closing the connection
	// and then we do the cleanup in the defer block.
	//
	// First, we close some channels and then CAS to 1 and proceed to close the writer chan also.
	// This is needed because then the defer clause does not double-close the writer when (2) happens.
	// But if (1) happens, we set the closed bit, and close the rest of the stuff.
	go func() {
		defer func() {
			close(wsc.EventChannel)
			close(wsc.ResponseChannel)
			close(wsc.quitPingWatchdog)
			close(wsc.resetTimerChan)
			// We CAS to 1 and proceed.
			if !atomic.CompareAndSwapInt32(&wsc.closed, 0, 1) {
				return
			}
			wsc.quitWriterChan <- struct{}{}
			close(wsc.writeChan)
			wsc.Conn.Close() // This can most likely be removed. Needs to be checked.
		}()

		var buf bytes.Buffer
		buf.Grow(avgReadMsgSizeBytes)

		for {
			// Reset buffer.
			buf.Reset()
			_, r, err := wsc.Conn.NextReader()
			if err != nil {
				if !websocket.IsCloseError(err, websocket.CloseNormalClosure, websocket.CloseNoStatusReceived) {
					wsc.ListenError = NewAppError("NewWebSocketClient", "model.websocket_client.connect_fail.app_error", nil, err.Error(), http.StatusInternalServerError)
				}
				return
			}
			// Use pre-allocated buffer.
			_, err = buf.ReadFrom(r)
			if err != nil {
				// This should use a different error ID, but en.json is not imported anyways.
				// It's a different bug altogether but we let it be for now.
				// See MM-24520.
				wsc.ListenError = NewAppError("NewWebSocketClient", "model.websocket_client.connect_fail.app_error", nil, err.Error(), http.StatusInternalServerError)
				return
			}

			event := WebSocketEventFromJson(bytes.NewReader(buf.Bytes()))
			if event == nil {
				continue
			}
			if event.IsValid() {
				wsc.EventChannel <- event
				continue
			}

			var response WebSocketResponse
			if err := json.Unmarshal(buf.Bytes(), &response); err == nil && response.IsValid() {
				wsc.ResponseChannel <- &response
				continue
			}
		}
	}()
}

func (wsc *WebSocketClient) SendMessage(action string, data map[string]interface{}) {
	req := &WebSocketRequest{}
	req.Seq = wsc.Sequence
	req.Action = action
	req.Data = data

	wsc.Sequence++
	wsc.writeChan <- writeMessage{
		msgType: msgTypeJSON,
		data:    req,
	}
}

// UserTyping will push a user_typing event out to all connected users
// who are in the specified channel
func (wsc *WebSocketClient) UserTyping(channelId, parentId string) {
	data := map[string]interface{}{
		"channel_id": channelId,
		"parent_id":  parentId,
	}

	wsc.SendMessage("user_typing", data)
}

// GetStatuses will return a map of string statuses using user id as the key
func (wsc *WebSocketClient) GetStatuses() {
	wsc.SendMessage("get_statuses", nil)
}

// GetStatusesByIds will fetch certain user statuses based on ids and return
// a map of string statuses using user id as the key
func (wsc *WebSocketClient) GetStatusesByIds(userIds []string) {
	data := map[string]interface{}{
		"user_ids": userIds,
	}
	wsc.SendMessage("get_statuses_by_ids", data)
}

func (wsc *WebSocketClient) configurePingHandling() {
	wsc.Conn.SetPingHandler(wsc.pingHandler)
	wsc.pingTimeoutTimer = time.NewTimer(time.Second * (60 + PING_TIMEOUT_BUFFER_SECONDS))
	go wsc.pingWatchdog()
}

func (wsc *WebSocketClient) pingHandler(appData string) error {
	if atomic.LoadInt32(&wsc.closed) == 1 {
		return nil
	}
	wsc.resetTimerChan <- struct{}{}
	wsc.writeChan <- writeMessage{
		msgType: msgTypePong,
	}
	return nil
}

// pingWatchdog is used to send values to the PingTimeoutChannel whenever a timeout occurs.
// We use the resetTimerChan from the pingHandler to pass the signal, and then reset the timer
// after draining it. And if the timer naturally expires, we also extend it to prevent it from
// being deadlocked when the resetTimerChan case runs. Because timer.Stop would return false,
// and the code would be forever stuck trying to read from C.
func (wsc *WebSocketClient) pingWatchdog() {
	for {
		select {
		case <-wsc.resetTimerChan:
			if !wsc.pingTimeoutTimer.Stop() {
				<-wsc.pingTimeoutTimer.C
			}
			wsc.pingTimeoutTimer.Reset(time.Second * (60 + PING_TIMEOUT_BUFFER_SECONDS))

		case <-wsc.pingTimeoutTimer.C:
			wsc.PingTimeoutChannel <- true
			wsc.pingTimeoutTimer.Reset(time.Second * (60 + PING_TIMEOUT_BUFFER_SECONDS))
		case <-wsc.quitPingWatchdog:
			return
		}
	}
}