diff options
Diffstat (limited to 'vendor/github.com/mattermost/logr/logr.go')
-rw-r--r-- | vendor/github.com/mattermost/logr/logr.go | 664 |
1 files changed, 664 insertions, 0 deletions
diff --git a/vendor/github.com/mattermost/logr/logr.go b/vendor/github.com/mattermost/logr/logr.go new file mode 100644 index 00000000..631366a5 --- /dev/null +++ b/vendor/github.com/mattermost/logr/logr.go @@ -0,0 +1,664 @@ +package logr + +import ( + "bytes" + "context" + "errors" + "fmt" + "os" + "sync" + "time" + + "github.com/wiggin77/cfg" + "github.com/wiggin77/merror" +) + +// Logr maintains a list of log targets and accepts incoming +// log records. +type Logr struct { + tmux sync.RWMutex // target mutex + targets []Target + + mux sync.RWMutex + maxQueueSizeActual int + in chan *LogRec + done chan struct{} + once sync.Once + shutdown bool + lvlCache levelCache + + metricsInitOnce sync.Once + metricsCloseOnce sync.Once + metricsDone chan struct{} + metrics MetricsCollector + queueSizeGauge Gauge + loggedCounter Counter + errorCounter Counter + + bufferPool sync.Pool + + // MaxQueueSize is the maximum number of log records that can be queued. + // If exceeded, `OnQueueFull` is called which determines if the log + // record will be dropped or block until add is successful. + // If this is modified, it must be done before `Configure` or + // `AddTarget`. Defaults to DefaultMaxQueueSize. + MaxQueueSize int + + // OnLoggerError, when not nil, is called any time an internal + // logging error occurs. For example, this can happen when a + // target cannot connect to its data sink. + OnLoggerError func(error) + + // OnQueueFull, when not nil, is called on an attempt to add + // a log record to a full Logr queue. + // `MaxQueueSize` can be used to modify the maximum queue size. + // This function should return quickly, with a bool indicating whether + // the log record should be dropped (true) or block until the log record + // is successfully added (false). If nil then blocking (false) is assumed. + OnQueueFull func(rec *LogRec, maxQueueSize int) bool + + // OnTargetQueueFull, when not nil, is called on an attempt to add + // a log record to a full target queue provided the target supports reporting + // this condition. + // This function should return quickly, with a bool indicating whether + // the log record should be dropped (true) or block until the log record + // is successfully added (false). If nil then blocking (false) is assumed. + OnTargetQueueFull func(target Target, rec *LogRec, maxQueueSize int) bool + + // OnExit, when not nil, is called when a FatalXXX style log API is called. + // When nil, then the default behavior is to cleanly shut down this Logr and + // call `os.Exit(code)`. + OnExit func(code int) + + // OnPanic, when not nil, is called when a PanicXXX style log API is called. + // When nil, then the default behavior is to cleanly shut down this Logr and + // call `panic(err)`. + OnPanic func(err interface{}) + + // EnqueueTimeout is the amount of time a log record can take to be queued. + // This only applies to blocking enqueue which happen after `logr.OnQueueFull` + // is called and returns false. + EnqueueTimeout time.Duration + + // ShutdownTimeout is the amount of time `logr.Shutdown` can execute before + // timing out. + ShutdownTimeout time.Duration + + // FlushTimeout is the amount of time `logr.Flush` can execute before + // timing out. + FlushTimeout time.Duration + + // UseSyncMapLevelCache can be set to true before the first target is added + // when high concurrency (e.g. >32 cores) is expected. This may improve + // performance with large numbers of cores - benchmark for your use case. + UseSyncMapLevelCache bool + + // MaxPooledFormatBuffer determines the maximum size of a buffer that can be + // pooled. To reduce allocations, the buffers needed during formatting (etc) + // are pooled. A very large log item will grow a buffer that could stay in + // memory indefinitely. This settings lets you control how big a pooled buffer + // can be - anything larger will be garbage collected after use. + // Defaults to 1MB. + MaxPooledBuffer int + + // DisableBufferPool when true disables the buffer pool. See MaxPooledBuffer. + DisableBufferPool bool + + // MetricsUpdateFreqMillis determines how often polled metrics are updated + // when metrics are enabled. + MetricsUpdateFreqMillis int64 +} + +// Configure adds/removes targets via the supplied `Config`. +func (logr *Logr) Configure(config *cfg.Config) error { + // TODO + return fmt.Errorf("not implemented yet") +} + +func (logr *Logr) ensureInit() { + logr.once.Do(func() { + defer func() { + go logr.start() + }() + + logr.mux.Lock() + defer logr.mux.Unlock() + + logr.maxQueueSizeActual = logr.MaxQueueSize + if logr.maxQueueSizeActual == 0 { + logr.maxQueueSizeActual = DefaultMaxQueueSize + } + + if logr.maxQueueSizeActual < 0 { + logr.maxQueueSizeActual = 0 + } + + logr.in = make(chan *LogRec, logr.maxQueueSizeActual) + logr.done = make(chan struct{}) + + if logr.UseSyncMapLevelCache { + logr.lvlCache = &syncMapLevelCache{} + } else { + logr.lvlCache = &arrayLevelCache{} + } + + if logr.MaxPooledBuffer == 0 { + logr.MaxPooledBuffer = DefaultMaxPooledBuffer + } + logr.bufferPool = sync.Pool{ + New: func() interface{} { + return new(bytes.Buffer) + }, + } + + logr.lvlCache.setup() + }) +} + +// AddTarget adds one or more targets to the logger which will receive +// log records for outputting. +func (logr *Logr) AddTarget(targets ...Target) error { + if logr.IsShutdown() { + return fmt.Errorf("AddTarget called after Logr shut down") + } + + logr.ensureInit() + metrics := logr.getMetricsCollector() + defer logr.ResetLevelCache() // call this after tmux is released + + logr.tmux.Lock() + defer logr.tmux.Unlock() + + errs := merror.New() + for _, t := range targets { + if t == nil { + continue + } + + logr.targets = append(logr.targets, t) + if metrics != nil { + if tm, ok := t.(TargetWithMetrics); ok { + if err := tm.EnableMetrics(metrics, logr.MetricsUpdateFreqMillis); err != nil { + errs.Append(err) + } + } + } + } + return errs.ErrorOrNil() +} + +// NewLogger creates a Logger using defaults. A `Logger` is light-weight +// enough to create on-demand, but typically one or more Loggers are +// created and re-used. +func (logr *Logr) NewLogger() Logger { + logger := Logger{logr: logr} + return logger +} + +var levelStatusDisabled = LevelStatus{} + +// IsLevelEnabled returns true if at least one target has the specified +// level enabled. The result is cached so that subsequent checks are fast. +func (logr *Logr) IsLevelEnabled(lvl Level) LevelStatus { + status, ok := logr.isLevelEnabledFromCache(lvl) + if ok { + return status + } + + // Check each target. + logr.tmux.RLock() + for _, t := range logr.targets { + e, s := t.IsLevelEnabled(lvl) + if e { + status.Enabled = true + if s { + status.Stacktrace = true + break // if both enabled then no sense checking more targets + } + } + } + logr.tmux.RUnlock() + + // Cache and return the result. + if err := logr.updateLevelCache(lvl.ID, status); err != nil { + logr.ReportError(err) + return LevelStatus{} + } + return status +} + +func (logr *Logr) isLevelEnabledFromCache(lvl Level) (LevelStatus, bool) { + logr.mux.RLock() + defer logr.mux.RUnlock() + + // Don't accept new log records after shutdown. + if logr.shutdown { + return levelStatusDisabled, true + } + + // Check cache. lvlCache may still be nil if no targets added. + if logr.lvlCache == nil { + return levelStatusDisabled, true + } + status, ok := logr.lvlCache.get(lvl.ID) + if ok { + return status, true + } + return LevelStatus{}, false +} + +func (logr *Logr) updateLevelCache(id LevelID, status LevelStatus) error { + logr.mux.RLock() + defer logr.mux.RUnlock() + if logr.lvlCache != nil { + return logr.lvlCache.put(id, status) + } + return nil +} + +// HasTargets returns true only if at least one target exists within the Logr. +func (logr *Logr) HasTargets() bool { + logr.tmux.RLock() + defer logr.tmux.RUnlock() + return len(logr.targets) > 0 +} + +// TargetInfo provides name and type for a Target. +type TargetInfo struct { + Name string + Type string +} + +// TargetInfos enumerates all the targets added to this Logr. +// The resulting slice represents a snapshot at time of calling. +func (logr *Logr) TargetInfos() []TargetInfo { + logr.tmux.RLock() + defer logr.tmux.RUnlock() + + infos := make([]TargetInfo, 0) + + for _, t := range logr.targets { + inf := TargetInfo{ + Name: fmt.Sprintf("%v", t), + Type: fmt.Sprintf("%T", t), + } + infos = append(infos, inf) + } + return infos +} + +// RemoveTargets safely removes one or more targets based on the filtering method. +// f should return true to delete the target, false to keep it. +// When removing a target, best effort is made to write any queued log records before +// closing, with cxt determining how much time can be spent in total. +// Note, keep the timeout short since this method blocks certain logging operations. +func (logr *Logr) RemoveTargets(cxt context.Context, f func(ti TargetInfo) bool) error { + var removed bool + defer func() { + if removed { + // call this after tmux is released since + // it will lock mux and we don't want to + // introduce possible deadlock. + logr.ResetLevelCache() + } + }() + + errs := merror.New() + + logr.tmux.Lock() + defer logr.tmux.Unlock() + + cp := make([]Target, 0) + + for _, t := range logr.targets { + inf := TargetInfo{ + Name: fmt.Sprintf("%v", t), + Type: fmt.Sprintf("%T", t), + } + if f(inf) { + if err := t.Shutdown(cxt); err != nil { + errs.Append(err) + } + removed = true + } else { + cp = append(cp, t) + } + } + logr.targets = cp + return errs.ErrorOrNil() +} + +// ResetLevelCache resets the cached results of `IsLevelEnabled`. This is +// called any time a Target is added or a target's level is changed. +func (logr *Logr) ResetLevelCache() { + // Write lock so that new cache entries cannot be stored while we + // clear the cache. + logr.mux.Lock() + defer logr.mux.Unlock() + logr.resetLevelCache() +} + +// resetLevelCache empties the level cache without locking. +// mux.Lock must be held before calling this function. +func (logr *Logr) resetLevelCache() { + // lvlCache may still be nil if no targets added. + if logr.lvlCache != nil { + logr.lvlCache.clear() + } +} + +// enqueue adds a log record to the logr queue. If the queue is full then +// this function either blocks or the log record is dropped, depending on +// the result of calling `OnQueueFull`. +func (logr *Logr) enqueue(rec *LogRec) { + if logr.in == nil { + logr.ReportError(fmt.Errorf("AddTarget or Configure must be called before enqueue")) + } + + select { + case logr.in <- rec: + default: + if logr.OnQueueFull != nil && logr.OnQueueFull(rec, logr.maxQueueSizeActual) { + return // drop the record + } + select { + case <-time.After(logr.enqueueTimeout()): + logr.ReportError(fmt.Errorf("enqueue timed out for log rec [%v]", rec)) + case logr.in <- rec: // block until success or timeout + } + } +} + +// exit is called by one of the FatalXXX style APIS. If `logr.OnExit` is not nil +// then that method is called, otherwise the default behavior is to shut down this +// Logr cleanly then call `os.Exit(code)`. +func (logr *Logr) exit(code int) { + if logr.OnExit != nil { + logr.OnExit(code) + return + } + + if err := logr.Shutdown(); err != nil { + logr.ReportError(err) + } + os.Exit(code) +} + +// panic is called by one of the PanicXXX style APIS. If `logr.OnPanic` is not nil +// then that method is called, otherwise the default behavior is to shut down this +// Logr cleanly then call `panic(err)`. +func (logr *Logr) panic(err interface{}) { + if logr.OnPanic != nil { + logr.OnPanic(err) + return + } + + if err := logr.Shutdown(); err != nil { + logr.ReportError(err) + } + panic(err) +} + +// Flush blocks while flushing the logr queue and all target queues, by +// writing existing log records to valid targets. +// Any attempts to add new log records will block until flush is complete. +// `logr.FlushTimeout` determines how long flush can execute before +// timing out. Use `IsTimeoutError` to determine if the returned error is +// due to a timeout. +func (logr *Logr) Flush() error { + ctx, cancel := context.WithTimeout(context.Background(), logr.flushTimeout()) + defer cancel() + return logr.FlushWithTimeout(ctx) +} + +// Flush blocks while flushing the logr queue and all target queues, by +// writing existing log records to valid targets. +// Any attempts to add new log records will block until flush is complete. +// Use `IsTimeoutError` to determine if the returned error is +// due to a timeout. +func (logr *Logr) FlushWithTimeout(ctx context.Context) error { + if !logr.HasTargets() { + return nil + } + + if logr.IsShutdown() { + return errors.New("Flush called on shut down Logr") + } + + rec := newFlushLogRec(logr.NewLogger()) + logr.enqueue(rec) + + select { + case <-ctx.Done(): + return newTimeoutError("logr queue shutdown timeout") + case <-rec.flush: + } + return nil +} + +// IsShutdown returns true if this Logr instance has been shut down. +// No further log records can be enqueued and no targets added after +// shutdown. +func (logr *Logr) IsShutdown() bool { + logr.mux.Lock() + defer logr.mux.Unlock() + return logr.shutdown +} + +// Shutdown cleanly stops the logging engine after making best efforts +// to flush all targets. Call this function right before application +// exit - logr cannot be restarted once shut down. +// `logr.ShutdownTimeout` determines how long shutdown can execute before +// timing out. Use `IsTimeoutError` to determine if the returned error is +// due to a timeout. +func (logr *Logr) Shutdown() error { + ctx, cancel := context.WithTimeout(context.Background(), logr.shutdownTimeout()) + defer cancel() + return logr.ShutdownWithTimeout(ctx) +} + +// Shutdown cleanly stops the logging engine after making best efforts +// to flush all targets. Call this function right before application +// exit - logr cannot be restarted once shut down. +// Use `IsTimeoutError` to determine if the returned error is due to a +// timeout. +func (logr *Logr) ShutdownWithTimeout(ctx context.Context) error { + logr.mux.Lock() + if logr.shutdown { + logr.mux.Unlock() + return errors.New("Shutdown called again after shut down") + } + logr.shutdown = true + logr.resetLevelCache() + logr.mux.Unlock() + + logr.metricsCloseOnce.Do(func() { + if logr.metricsDone != nil { + close(logr.metricsDone) + } + }) + + errs := merror.New() + + // close the incoming channel and wait for read loop to exit. + if logr.in != nil { + close(logr.in) + select { + case <-ctx.Done(): + errs.Append(newTimeoutError("logr queue shutdown timeout")) + case <-logr.done: + } + } + + // logr.in channel should now be drained to targets and no more log records + // can be added. + logr.tmux.RLock() + defer logr.tmux.RUnlock() + for _, t := range logr.targets { + err := t.Shutdown(ctx) + if err != nil { + errs.Append(err) + } + } + return errs.ErrorOrNil() +} + +// ReportError is used to notify the host application of any internal logging errors. +// If `OnLoggerError` is not nil, it is called with the error, otherwise the error is +// output to `os.Stderr`. +func (logr *Logr) ReportError(err interface{}) { + logr.incErrorCounter() + + if logr.OnLoggerError == nil { + fmt.Fprintln(os.Stderr, err) + return + } + logr.OnLoggerError(fmt.Errorf("%v", err)) +} + +// BorrowBuffer borrows a buffer from the pool. Release the buffer to reduce garbage collection. +func (logr *Logr) BorrowBuffer() *bytes.Buffer { + if logr.DisableBufferPool { + return &bytes.Buffer{} + } + return logr.bufferPool.Get().(*bytes.Buffer) +} + +// ReleaseBuffer returns a buffer to the pool to reduce garbage collection. The buffer is only +// retained if less than MaxPooledBuffer. +func (logr *Logr) ReleaseBuffer(buf *bytes.Buffer) { + if !logr.DisableBufferPool && buf.Cap() < logr.MaxPooledBuffer { + buf.Reset() + logr.bufferPool.Put(buf) + } +} + +// enqueueTimeout returns amount of time a log record can take to be queued. +// This only applies to blocking enqueue which happen after `logr.OnQueueFull` is called +// and returns false. +func (logr *Logr) enqueueTimeout() time.Duration { + if logr.EnqueueTimeout == 0 { + return DefaultEnqueueTimeout + } + return logr.EnqueueTimeout +} + +// shutdownTimeout returns the timeout duration for `logr.Shutdown`. +func (logr *Logr) shutdownTimeout() time.Duration { + if logr.ShutdownTimeout == 0 { + return DefaultShutdownTimeout + } + return logr.ShutdownTimeout +} + +// flushTimeout returns the timeout duration for `logr.Flush`. +func (logr *Logr) flushTimeout() time.Duration { + if logr.FlushTimeout == 0 { + return DefaultFlushTimeout + } + return logr.FlushTimeout +} + +// start selects on incoming log records until done channel signals. +// Incoming log records are fanned out to all log targets. +func (logr *Logr) start() { + defer func() { + if r := recover(); r != nil { + logr.ReportError(r) + go logr.start() + } + }() + + for rec := range logr.in { + if rec.flush != nil { + logr.flush(rec.flush) + } else { + rec.prep() + logr.fanout(rec) + } + } + close(logr.done) +} + +// startMetricsUpdater updates the metrics for any polled values every `MetricsUpdateFreqSecs` seconds until +// logr is closed. +func (logr *Logr) startMetricsUpdater() { + for { + updateFreq := logr.getMetricsUpdateFreqMillis() + if updateFreq == 0 { + updateFreq = DefMetricsUpdateFreqMillis + } + if updateFreq < 250 { + updateFreq = 250 // don't peg the CPU + } + + select { + case <-logr.metricsDone: + return + case <-time.After(time.Duration(updateFreq) * time.Millisecond): + logr.setQueueSizeGauge(float64(len(logr.in))) + } + } +} + +func (logr *Logr) getMetricsUpdateFreqMillis() int64 { + logr.mux.RLock() + defer logr.mux.RUnlock() + return logr.MetricsUpdateFreqMillis +} + +// fanout pushes a LogRec to all targets. +func (logr *Logr) fanout(rec *LogRec) { + var target Target + defer func() { + if r := recover(); r != nil { + logr.ReportError(fmt.Errorf("fanout failed for target %s, %v", target, r)) + } + }() + + var logged bool + defer func() { + if logged { + logr.incLoggedCounter() // call this after tmux is released + } + }() + + logr.tmux.RLock() + defer logr.tmux.RUnlock() + for _, target = range logr.targets { + if enabled, _ := target.IsLevelEnabled(rec.Level()); enabled { + target.Log(rec) + logged = true + } + } +} + +// flush drains the queue and notifies when done. +func (logr *Logr) flush(done chan<- struct{}) { + // first drain the logr queue. +loop: + for { + var rec *LogRec + select { + case rec = <-logr.in: + if rec.flush == nil { + rec.prep() + logr.fanout(rec) + } + default: + break loop + } + } + + logger := logr.NewLogger() + + // drain all the targets; block until finished. + logr.tmux.RLock() + defer logr.tmux.RUnlock() + for _, target := range logr.targets { + rec := newFlushLogRec(logger) + target.Log(rec) + <-rec.flush + } + done <- struct{}{} +} |