diff options
Diffstat (limited to 'vendor/github.com/mattermost/mattermost-server/v5/mlog/tcp.go')
-rw-r--r-- | vendor/github.com/mattermost/mattermost-server/v5/mlog/tcp.go | 274 |
1 files changed, 0 insertions, 274 deletions
diff --git a/vendor/github.com/mattermost/mattermost-server/v5/mlog/tcp.go b/vendor/github.com/mattermost/mattermost-server/v5/mlog/tcp.go deleted file mode 100644 index dad20474..00000000 --- a/vendor/github.com/mattermost/mattermost-server/v5/mlog/tcp.go +++ /dev/null @@ -1,274 +0,0 @@ -// Copyright (c) 2015-present Mattermost, Inc. All Rights Reserved. -// See LICENSE.txt for license information. - -package mlog - -import ( - "context" - "crypto/tls" - "errors" - "fmt" - "net" - "sync" - "time" - - "github.com/hashicorp/go-multierror" - "github.com/mattermost/logr" - - _ "net/http/pprof" -) - -const ( - DialTimeoutSecs = 30 - WriteTimeoutSecs = 30 - RetryBackoffMillis int64 = 100 - MaxRetryBackoffMillis int64 = 30 * 1000 // 30 seconds -) - -// Tcp outputs log records to raw socket server. -type Tcp struct { - logr.Basic - - params *TcpParams - addy string - - mutex sync.Mutex - conn net.Conn - monitor chan struct{} - shutdown chan struct{} -} - -// TcpParams provides parameters for dialing a socket server. -type TcpParams struct { - IP string `json:"IP"` - Port int `json:"Port"` - TLS bool `json:"TLS"` - Cert string `json:"Cert"` - Insecure bool `json:"Insecure"` -} - -// NewTcpTarget creates a target capable of outputting log records to a raw socket, with or without TLS. -func NewTcpTarget(filter logr.Filter, formatter logr.Formatter, params *TcpParams, maxQueue int) (*Tcp, error) { - tcp := &Tcp{ - params: params, - addy: fmt.Sprintf("%s:%d", params.IP, params.Port), - monitor: make(chan struct{}), - shutdown: make(chan struct{}), - } - tcp.Basic.Start(tcp, tcp, filter, formatter, maxQueue) - - return tcp, nil -} - -// getConn provides a net.Conn. If a connection already exists, it is returned immediately, -// otherwise this method blocks until a new connection is created, timeout or shutdown. -func (tcp *Tcp) getConn() (net.Conn, error) { - tcp.mutex.Lock() - defer tcp.mutex.Unlock() - - Log(LvlTcpLogTarget, "getConn enter", String("addy", tcp.addy)) - defer Log(LvlTcpLogTarget, "getConn exit", String("addy", tcp.addy)) - - if tcp.conn != nil { - Log(LvlTcpLogTarget, "reusing existing conn", String("addy", tcp.addy)) // use "With" once Zap is removed - return tcp.conn, nil - } - - type result struct { - conn net.Conn - err error - } - - connChan := make(chan result) - ctx, cancel := context.WithTimeout(context.Background(), time.Second*DialTimeoutSecs) - defer cancel() - - go func(ctx context.Context, ch chan result) { - Log(LvlTcpLogTarget, "dailing", String("addy", tcp.addy)) - conn, err := tcp.dial(ctx) - if err == nil { - tcp.conn = conn - tcp.monitor = make(chan struct{}) - go monitor(tcp.conn, tcp.monitor) - } - connChan <- result{conn: conn, err: err} - }(ctx, connChan) - - select { - case <-tcp.shutdown: - return nil, errors.New("shutdown") - case res := <-connChan: - return res.conn, res.err - } -} - -// dial connects to a TCP socket, and optionally performs a TLS handshake. -// A non-nil context must be provided which can cancel the dial. -func (tcp *Tcp) dial(ctx context.Context) (net.Conn, error) { - var dialer net.Dialer - dialer.Timeout = time.Second * DialTimeoutSecs - conn, err := dialer.DialContext(ctx, "tcp", fmt.Sprintf("%s:%d", tcp.params.IP, tcp.params.Port)) - if err != nil { - return nil, err - } - - if !tcp.params.TLS { - return conn, nil - } - - Log(LvlTcpLogTarget, "TLS handshake", String("addy", tcp.addy)) - - tlsconfig := &tls.Config{ - ServerName: tcp.params.IP, - InsecureSkipVerify: tcp.params.Insecure, - } - if tcp.params.Cert != "" { - pool, err := getCertPool(tcp.params.Cert) - if err != nil { - return nil, err - } - tlsconfig.RootCAs = pool - } - - tlsConn := tls.Client(conn, tlsconfig) - if err := tlsConn.Handshake(); err != nil { - return nil, err - } - return tlsConn, nil -} - -func (tcp *Tcp) close() error { - tcp.mutex.Lock() - defer tcp.mutex.Unlock() - - var err error - if tcp.conn != nil { - Log(LvlTcpLogTarget, "closing connection", String("addy", tcp.addy)) - close(tcp.monitor) - err = tcp.conn.Close() - tcp.conn = nil - } - return err -} - -// Shutdown stops processing log records after making best effort to flush queue. -func (tcp *Tcp) Shutdown(ctx context.Context) error { - errs := &multierror.Error{} - - Log(LvlTcpLogTarget, "shutting down", String("addy", tcp.addy)) - - if err := tcp.Basic.Shutdown(ctx); err != nil { - errs = multierror.Append(errs, err) - } - - if err := tcp.close(); err != nil { - errs = multierror.Append(errs, err) - } - - close(tcp.shutdown) - return errs.ErrorOrNil() -} - -// Write converts the log record to bytes, via the Formatter, and outputs to the socket. -// Called by dedicated target goroutine and will block until success or shutdown. -func (tcp *Tcp) Write(rec *logr.LogRec) error { - _, stacktrace := tcp.IsLevelEnabled(rec.Level()) - - buf := rec.Logger().Logr().BorrowBuffer() - defer rec.Logger().Logr().ReleaseBuffer(buf) - - buf, err := tcp.Formatter().Format(rec, stacktrace, buf) - if err != nil { - return err - } - - try := 1 - backoff := RetryBackoffMillis - for { - select { - case <-tcp.shutdown: - return err - default: - } - - conn, err := tcp.getConn() - if err != nil { - Log(LvlTcpLogTarget, "failed getting connection", String("addy", tcp.addy), Err(err)) - reporter := rec.Logger().Logr().ReportError - reporter(fmt.Errorf("log target %s connection error: %w", tcp.String(), err)) - backoff = tcp.sleep(backoff) - continue - } - - conn.SetWriteDeadline(time.Now().Add(time.Second * WriteTimeoutSecs)) - _, err = buf.WriteTo(conn) - if err == nil { - return nil - } - - Log(LvlTcpLogTarget, "write error", String("addy", tcp.addy), Err(err)) - reporter := rec.Logger().Logr().ReportError - reporter(fmt.Errorf("log target %s write error: %w", tcp.String(), err)) - - _ = tcp.close() - - backoff = tcp.sleep(backoff) - try++ - Log(LvlTcpLogTarget, "retrying write", String("addy", tcp.addy), Int("try", try)) - } -} - -// monitor continuously tries to read from the connection to detect socket close. -// This is needed because TCP target uses a write only socket and Linux systems -// take a long time to detect a loss of connectivity on a socket when only writing; -// the writes simply fail without an error returned. -func monitor(conn net.Conn, done <-chan struct{}) { - addy := conn.RemoteAddr().String() - defer Log(LvlTcpLogTarget, "monitor exiting", String("addy", addy)) - - buf := make([]byte, 1) - for { - Log(LvlTcpLogTarget, "monitor loop", String("addy", addy)) - - select { - case <-done: - return - case <-time.After(1 * time.Second): - } - - err := conn.SetReadDeadline(time.Now().Add(time.Second * 30)) - if err != nil { - continue - } - - _, err = conn.Read(buf) - - if errt, ok := err.(net.Error); ok && errt.Timeout() { - // read timeout is expected, keep looping. - continue - } - - // Any other error closes the connection, forcing a reconnect. - Log(LvlTcpLogTarget, "monitor closing connection", Err(err)) - conn.Close() - return - } -} - -// String returns a string representation of this target. -func (tcp *Tcp) String() string { - return fmt.Sprintf("TcpTarget[%s:%d]", tcp.params.IP, tcp.params.Port) -} - -func (tcp *Tcp) sleep(backoff int64) int64 { - select { - case <-tcp.shutdown: - case <-time.After(time.Millisecond * time.Duration(backoff)): - } - - nextBackoff := backoff + (backoff >> 1) - if nextBackoff > MaxRetryBackoffMillis { - nextBackoff = MaxRetryBackoffMillis - } - return nextBackoff -} |