diff options
Diffstat (limited to 'vendor/github.com/mattermost/logr/target.go')
-rw-r--r-- | vendor/github.com/mattermost/logr/target.go | 299 |
1 files changed, 299 insertions, 0 deletions
diff --git a/vendor/github.com/mattermost/logr/target.go b/vendor/github.com/mattermost/logr/target.go new file mode 100644 index 00000000..f8e7bf75 --- /dev/null +++ b/vendor/github.com/mattermost/logr/target.go @@ -0,0 +1,299 @@ +package logr + +import ( + "context" + "fmt" + "os" + "sync" + "time" +) + +// Target represents a destination for log records such as file, +// database, TCP socket, etc. +type Target interface { + // SetName provides an optional name for the target. + SetName(name string) + + // IsLevelEnabled returns true if this target should emit + // logs for the specified level. Also determines if + // a stack trace is required. + IsLevelEnabled(Level) (enabled bool, stacktrace bool) + + // Formatter returns the Formatter associated with this Target. + Formatter() Formatter + + // Log outputs the log record to this target's destination. + Log(rec *LogRec) + + // Shutdown makes best effort to flush target queue and + // frees/closes all resources. + Shutdown(ctx context.Context) error +} + +// RecordWriter can convert a LogRecord to bytes and output to some data sink. +type RecordWriter interface { + Write(rec *LogRec) error +} + +// Basic provides the basic functionality of a Target that can be used +// to more easily compose your own Targets. To use, just embed Basic +// in your target type, implement `RecordWriter`, and call `(*Basic).Start`. +type Basic struct { + target Target + + filter Filter + formatter Formatter + + in chan *LogRec + done chan struct{} + w RecordWriter + + mux sync.RWMutex + name string + + metrics bool + queueSizeGauge Gauge + loggedCounter Counter + errorCounter Counter + droppedCounter Counter + blockedCounter Counter + + metricsUpdateFreqMillis int64 +} + +// Start initializes this target helper and starts accepting log records for processing. +func (b *Basic) Start(target Target, rw RecordWriter, filter Filter, formatter Formatter, maxQueued int) { + if filter == nil { + filter = &StdFilter{Lvl: Fatal} + } + if formatter == nil { + formatter = &DefaultFormatter{} + } + + b.target = target + b.filter = filter + b.formatter = formatter + b.in = make(chan *LogRec, maxQueued) + b.done = make(chan struct{}, 1) + b.w = rw + go b.start() + + if b.hasMetrics() { + go b.startMetricsUpdater() + } +} + +func (b *Basic) SetName(name string) { + b.mux.Lock() + defer b.mux.Unlock() + b.name = name +} + +// IsLevelEnabled returns true if this target should emit +// logs for the specified level. Also determines if +// a stack trace is required. +func (b *Basic) IsLevelEnabled(lvl Level) (enabled bool, stacktrace bool) { + return b.filter.IsEnabled(lvl), b.filter.IsStacktraceEnabled(lvl) +} + +// Formatter returns the Formatter associated with this Target. +func (b *Basic) Formatter() Formatter { + return b.formatter +} + +// Shutdown stops processing log records after making best +// effort to flush queue. +func (b *Basic) Shutdown(ctx context.Context) error { + // close the incoming channel and wait for read loop to exit. + close(b.in) + select { + case <-ctx.Done(): + case <-b.done: + } + + // b.in channel should now be drained. + return nil +} + +// Log outputs the log record to this targets destination. +func (b *Basic) Log(rec *LogRec) { + lgr := rec.Logger().Logr() + select { + case b.in <- rec: + default: + handler := lgr.OnTargetQueueFull + if handler != nil && handler(b.target, rec, cap(b.in)) { + b.incDroppedCounter() + return // drop the record + } + b.incBlockedCounter() + + select { + case <-time.After(lgr.enqueueTimeout()): + lgr.ReportError(fmt.Errorf("target enqueue timeout for log rec [%v]", rec)) + case b.in <- rec: // block until success or timeout + } + } +} + +// Metrics enables metrics collection using the provided MetricsCollector. +func (b *Basic) EnableMetrics(collector MetricsCollector, updateFreqMillis int64) error { + name := fmt.Sprintf("%v", b) + + b.mux.Lock() + defer b.mux.Unlock() + + b.metrics = true + b.metricsUpdateFreqMillis = updateFreqMillis + + var err error + + if b.queueSizeGauge, err = collector.QueueSizeGauge(name); err != nil { + return err + } + if b.loggedCounter, err = collector.LoggedCounter(name); err != nil { + return err + } + if b.errorCounter, err = collector.ErrorCounter(name); err != nil { + return err + } + if b.droppedCounter, err = collector.DroppedCounter(name); err != nil { + return err + } + if b.blockedCounter, err = collector.BlockedCounter(name); err != nil { + return err + } + return nil +} + +func (b *Basic) hasMetrics() bool { + b.mux.RLock() + defer b.mux.RUnlock() + return b.metrics +} + +func (b *Basic) setQueueSizeGauge(val float64) { + b.mux.RLock() + defer b.mux.RUnlock() + if b.queueSizeGauge != nil { + b.queueSizeGauge.Set(val) + } +} + +func (b *Basic) incLoggedCounter() { + b.mux.RLock() + defer b.mux.RUnlock() + if b.loggedCounter != nil { + b.loggedCounter.Inc() + } +} + +func (b *Basic) incErrorCounter() { + b.mux.RLock() + defer b.mux.RUnlock() + if b.errorCounter != nil { + b.errorCounter.Inc() + } +} + +func (b *Basic) incDroppedCounter() { + b.mux.RLock() + defer b.mux.RUnlock() + if b.droppedCounter != nil { + b.droppedCounter.Inc() + } +} + +func (b *Basic) incBlockedCounter() { + b.mux.RLock() + defer b.mux.RUnlock() + if b.blockedCounter != nil { + b.blockedCounter.Inc() + } +} + +// String returns a name for this target. Use `SetName` to specify a name. +func (b *Basic) String() string { + b.mux.RLock() + defer b.mux.RUnlock() + + if b.name != "" { + return b.name + } + return fmt.Sprintf("%T", b.target) +} + +// Start accepts log records via In channel and writes to the +// supplied writer, until Done channel signaled. +func (b *Basic) start() { + defer func() { + if r := recover(); r != nil { + fmt.Fprintln(os.Stderr, "Basic.start -- ", r) + go b.start() + } + }() + + for rec := range b.in { + if rec.flush != nil { + b.flush(rec.flush) + } else { + err := b.w.Write(rec) + if err != nil { + b.incErrorCounter() + rec.Logger().Logr().ReportError(err) + } else { + b.incLoggedCounter() + } + } + } + close(b.done) +} + +// startMetricsUpdater updates the metrics for any polled values every `MetricsUpdateFreqSecs` seconds until +// target is closed. +func (b *Basic) startMetricsUpdater() { + for { + updateFreq := b.getMetricsUpdateFreqMillis() + if updateFreq == 0 { + updateFreq = DefMetricsUpdateFreqMillis + } + if updateFreq < 250 { + updateFreq = 250 // don't peg the CPU + } + + select { + case <-b.done: + return + case <-time.After(time.Duration(updateFreq) * time.Millisecond): + b.setQueueSizeGauge(float64(len(b.in))) + } + } +} + +func (b *Basic) getMetricsUpdateFreqMillis() int64 { + b.mux.RLock() + defer b.mux.RUnlock() + return b.metricsUpdateFreqMillis +} + +// flush drains the queue and notifies when done. +func (b *Basic) flush(done chan<- struct{}) { + for { + var rec *LogRec + var err error + select { + case rec = <-b.in: + // ignore any redundant flush records. + if rec.flush == nil { + err = b.w.Write(rec) + if err != nil { + b.incErrorCounter() + rec.Logger().Logr().ReportError(err) + } + } + default: + done <- struct{}{} + return + } + } +} |