diff options
Diffstat (limited to 'vendor/github.com/mattermost/mattermost-server/v5/mlog/tcp.go')
-rw-r--r-- | vendor/github.com/mattermost/mattermost-server/v5/mlog/tcp.go | 274 |
1 files changed, 274 insertions, 0 deletions
diff --git a/vendor/github.com/mattermost/mattermost-server/v5/mlog/tcp.go b/vendor/github.com/mattermost/mattermost-server/v5/mlog/tcp.go new file mode 100644 index 00000000..dad20474 --- /dev/null +++ b/vendor/github.com/mattermost/mattermost-server/v5/mlog/tcp.go @@ -0,0 +1,274 @@ +// Copyright (c) 2015-present Mattermost, Inc. All Rights Reserved. +// See LICENSE.txt for license information. + +package mlog + +import ( + "context" + "crypto/tls" + "errors" + "fmt" + "net" + "sync" + "time" + + "github.com/hashicorp/go-multierror" + "github.com/mattermost/logr" + + _ "net/http/pprof" +) + +const ( + DialTimeoutSecs = 30 + WriteTimeoutSecs = 30 + RetryBackoffMillis int64 = 100 + MaxRetryBackoffMillis int64 = 30 * 1000 // 30 seconds +) + +// Tcp outputs log records to raw socket server. +type Tcp struct { + logr.Basic + + params *TcpParams + addy string + + mutex sync.Mutex + conn net.Conn + monitor chan struct{} + shutdown chan struct{} +} + +// TcpParams provides parameters for dialing a socket server. +type TcpParams struct { + IP string `json:"IP"` + Port int `json:"Port"` + TLS bool `json:"TLS"` + Cert string `json:"Cert"` + Insecure bool `json:"Insecure"` +} + +// NewTcpTarget creates a target capable of outputting log records to a raw socket, with or without TLS. +func NewTcpTarget(filter logr.Filter, formatter logr.Formatter, params *TcpParams, maxQueue int) (*Tcp, error) { + tcp := &Tcp{ + params: params, + addy: fmt.Sprintf("%s:%d", params.IP, params.Port), + monitor: make(chan struct{}), + shutdown: make(chan struct{}), + } + tcp.Basic.Start(tcp, tcp, filter, formatter, maxQueue) + + return tcp, nil +} + +// getConn provides a net.Conn. If a connection already exists, it is returned immediately, +// otherwise this method blocks until a new connection is created, timeout or shutdown. +func (tcp *Tcp) getConn() (net.Conn, error) { + tcp.mutex.Lock() + defer tcp.mutex.Unlock() + + Log(LvlTcpLogTarget, "getConn enter", String("addy", tcp.addy)) + defer Log(LvlTcpLogTarget, "getConn exit", String("addy", tcp.addy)) + + if tcp.conn != nil { + Log(LvlTcpLogTarget, "reusing existing conn", String("addy", tcp.addy)) // use "With" once Zap is removed + return tcp.conn, nil + } + + type result struct { + conn net.Conn + err error + } + + connChan := make(chan result) + ctx, cancel := context.WithTimeout(context.Background(), time.Second*DialTimeoutSecs) + defer cancel() + + go func(ctx context.Context, ch chan result) { + Log(LvlTcpLogTarget, "dailing", String("addy", tcp.addy)) + conn, err := tcp.dial(ctx) + if err == nil { + tcp.conn = conn + tcp.monitor = make(chan struct{}) + go monitor(tcp.conn, tcp.monitor) + } + connChan <- result{conn: conn, err: err} + }(ctx, connChan) + + select { + case <-tcp.shutdown: + return nil, errors.New("shutdown") + case res := <-connChan: + return res.conn, res.err + } +} + +// dial connects to a TCP socket, and optionally performs a TLS handshake. +// A non-nil context must be provided which can cancel the dial. +func (tcp *Tcp) dial(ctx context.Context) (net.Conn, error) { + var dialer net.Dialer + dialer.Timeout = time.Second * DialTimeoutSecs + conn, err := dialer.DialContext(ctx, "tcp", fmt.Sprintf("%s:%d", tcp.params.IP, tcp.params.Port)) + if err != nil { + return nil, err + } + + if !tcp.params.TLS { + return conn, nil + } + + Log(LvlTcpLogTarget, "TLS handshake", String("addy", tcp.addy)) + + tlsconfig := &tls.Config{ + ServerName: tcp.params.IP, + InsecureSkipVerify: tcp.params.Insecure, + } + if tcp.params.Cert != "" { + pool, err := getCertPool(tcp.params.Cert) + if err != nil { + return nil, err + } + tlsconfig.RootCAs = pool + } + + tlsConn := tls.Client(conn, tlsconfig) + if err := tlsConn.Handshake(); err != nil { + return nil, err + } + return tlsConn, nil +} + +func (tcp *Tcp) close() error { + tcp.mutex.Lock() + defer tcp.mutex.Unlock() + + var err error + if tcp.conn != nil { + Log(LvlTcpLogTarget, "closing connection", String("addy", tcp.addy)) + close(tcp.monitor) + err = tcp.conn.Close() + tcp.conn = nil + } + return err +} + +// Shutdown stops processing log records after making best effort to flush queue. +func (tcp *Tcp) Shutdown(ctx context.Context) error { + errs := &multierror.Error{} + + Log(LvlTcpLogTarget, "shutting down", String("addy", tcp.addy)) + + if err := tcp.Basic.Shutdown(ctx); err != nil { + errs = multierror.Append(errs, err) + } + + if err := tcp.close(); err != nil { + errs = multierror.Append(errs, err) + } + + close(tcp.shutdown) + return errs.ErrorOrNil() +} + +// Write converts the log record to bytes, via the Formatter, and outputs to the socket. +// Called by dedicated target goroutine and will block until success or shutdown. +func (tcp *Tcp) Write(rec *logr.LogRec) error { + _, stacktrace := tcp.IsLevelEnabled(rec.Level()) + + buf := rec.Logger().Logr().BorrowBuffer() + defer rec.Logger().Logr().ReleaseBuffer(buf) + + buf, err := tcp.Formatter().Format(rec, stacktrace, buf) + if err != nil { + return err + } + + try := 1 + backoff := RetryBackoffMillis + for { + select { + case <-tcp.shutdown: + return err + default: + } + + conn, err := tcp.getConn() + if err != nil { + Log(LvlTcpLogTarget, "failed getting connection", String("addy", tcp.addy), Err(err)) + reporter := rec.Logger().Logr().ReportError + reporter(fmt.Errorf("log target %s connection error: %w", tcp.String(), err)) + backoff = tcp.sleep(backoff) + continue + } + + conn.SetWriteDeadline(time.Now().Add(time.Second * WriteTimeoutSecs)) + _, err = buf.WriteTo(conn) + if err == nil { + return nil + } + + Log(LvlTcpLogTarget, "write error", String("addy", tcp.addy), Err(err)) + reporter := rec.Logger().Logr().ReportError + reporter(fmt.Errorf("log target %s write error: %w", tcp.String(), err)) + + _ = tcp.close() + + backoff = tcp.sleep(backoff) + try++ + Log(LvlTcpLogTarget, "retrying write", String("addy", tcp.addy), Int("try", try)) + } +} + +// monitor continuously tries to read from the connection to detect socket close. +// This is needed because TCP target uses a write only socket and Linux systems +// take a long time to detect a loss of connectivity on a socket when only writing; +// the writes simply fail without an error returned. +func monitor(conn net.Conn, done <-chan struct{}) { + addy := conn.RemoteAddr().String() + defer Log(LvlTcpLogTarget, "monitor exiting", String("addy", addy)) + + buf := make([]byte, 1) + for { + Log(LvlTcpLogTarget, "monitor loop", String("addy", addy)) + + select { + case <-done: + return + case <-time.After(1 * time.Second): + } + + err := conn.SetReadDeadline(time.Now().Add(time.Second * 30)) + if err != nil { + continue + } + + _, err = conn.Read(buf) + + if errt, ok := err.(net.Error); ok && errt.Timeout() { + // read timeout is expected, keep looping. + continue + } + + // Any other error closes the connection, forcing a reconnect. + Log(LvlTcpLogTarget, "monitor closing connection", Err(err)) + conn.Close() + return + } +} + +// String returns a string representation of this target. +func (tcp *Tcp) String() string { + return fmt.Sprintf("TcpTarget[%s:%d]", tcp.params.IP, tcp.params.Port) +} + +func (tcp *Tcp) sleep(backoff int64) int64 { + select { + case <-tcp.shutdown: + case <-time.After(time.Millisecond * time.Duration(backoff)): + } + + nextBackoff := backoff + (backoff >> 1) + if nextBackoff > MaxRetryBackoffMillis { + nextBackoff = MaxRetryBackoffMillis + } + return nextBackoff +} |