summaryrefslogtreecommitdiffstats
path: root/vendor/github.com/mattermost/logr/v2/target.go
diff options
context:
space:
mode:
Diffstat (limited to 'vendor/github.com/mattermost/logr/v2/target.go')
-rw-r--r--vendor/github.com/mattermost/logr/v2/target.go304
1 files changed, 304 insertions, 0 deletions
diff --git a/vendor/github.com/mattermost/logr/v2/target.go b/vendor/github.com/mattermost/logr/v2/target.go
new file mode 100644
index 00000000..fa0a9320
--- /dev/null
+++ b/vendor/github.com/mattermost/logr/v2/target.go
@@ -0,0 +1,304 @@
+package logr
+
+import (
+ "context"
+ "errors"
+ "fmt"
+ "os"
+ "sync/atomic"
+ "time"
+)
+
+// Target represents a destination for log records such as file,
+// database, TCP socket, etc.
+type Target interface {
+ // Init is called once to initialize the target.
+ Init() error
+
+ // Write outputs to this target's destination.
+ Write(p []byte, rec *LogRec) (int, error)
+
+ // Shutdown is called once to free/close any resources.
+ // Target queue is already drained when this is called.
+ Shutdown() error
+}
+
+type targetMetrics struct {
+ queueSizeGauge Gauge
+ loggedCounter Counter
+ errorCounter Counter
+ droppedCounter Counter
+ blockedCounter Counter
+}
+
+type targetHostOptions struct {
+ name string
+ filter Filter
+ formatter Formatter
+ maxQueueSize int
+ metrics *metrics
+}
+
+// TargetHost hosts and manages the lifecycle of a target.
+// Incoming log records are queued and formatted before
+// being passed to the target.
+type TargetHost struct {
+ target Target
+ name string
+
+ filter Filter
+ formatter Formatter
+
+ in chan *LogRec
+ quit chan struct{} // closed by Shutdown to exit read loop
+ done chan struct{} // closed when read loop exited
+ targetMetrics *targetMetrics
+
+ shutdown int32
+}
+
+func newTargetHost(target Target, options targetHostOptions) (*TargetHost, error) {
+ host := &TargetHost{
+ target: target,
+ name: options.name,
+ filter: options.filter,
+ formatter: options.formatter,
+ in: make(chan *LogRec, options.maxQueueSize),
+ quit: make(chan struct{}),
+ done: make(chan struct{}),
+ }
+
+ if host.name == "" {
+ host.name = fmt.Sprintf("%T", target)
+ }
+
+ if host.filter == nil {
+ host.filter = &StdFilter{Lvl: Fatal}
+ }
+ if host.formatter == nil {
+ host.formatter = &DefaultFormatter{}
+ }
+
+ err := host.initMetrics(options.metrics)
+ if err != nil {
+ return nil, err
+ }
+
+ err = target.Init()
+ if err != nil {
+ return nil, err
+ }
+
+ go host.start()
+
+ return host, nil
+}
+
+func (h *TargetHost) initMetrics(metrics *metrics) error {
+ if metrics == nil {
+ return nil
+ }
+
+ var err error
+ tmetrics := &targetMetrics{}
+
+ if tmetrics.queueSizeGauge, err = metrics.collector.QueueSizeGauge(h.name); err != nil {
+ return err
+ }
+ if tmetrics.loggedCounter, err = metrics.collector.LoggedCounter(h.name); err != nil {
+ return err
+ }
+ if tmetrics.errorCounter, err = metrics.collector.ErrorCounter(h.name); err != nil {
+ return err
+ }
+ if tmetrics.droppedCounter, err = metrics.collector.DroppedCounter(h.name); err != nil {
+ return err
+ }
+ if tmetrics.blockedCounter, err = metrics.collector.BlockedCounter(h.name); err != nil {
+ return err
+ }
+ h.targetMetrics = tmetrics
+
+ updateFreqMillis := metrics.updateFreqMillis
+ if updateFreqMillis == 0 {
+ updateFreqMillis = DefMetricsUpdateFreqMillis
+ }
+ if updateFreqMillis < 250 {
+ updateFreqMillis = 250 // don't peg the CPU
+ }
+
+ go h.startMetricsUpdater(updateFreqMillis)
+ return nil
+}
+
+// IsLevelEnabled returns true if this target should emit logs for the specified level.
+func (h *TargetHost) IsLevelEnabled(lvl Level) (enabled bool, level Level) {
+ level, enabled = h.filter.GetEnabledLevel(lvl)
+ return enabled, level
+}
+
+// Shutdown stops processing log records after making best
+// effort to flush queue.
+func (h *TargetHost) Shutdown(ctx context.Context) error {
+ if atomic.SwapInt32(&h.shutdown, 1) != 0 {
+ return errors.New("targetHost shutdown called more than once")
+ }
+
+ close(h.quit)
+
+ // No more records can be accepted; now wait for read loop to exit.
+ select {
+ case <-ctx.Done():
+ case <-h.done:
+ }
+
+ // b.in channel should now be drained.
+ return h.target.Shutdown()
+}
+
+// Log queues a log record to be output to this target's destination.
+func (h *TargetHost) Log(rec *LogRec) {
+ if atomic.LoadInt32(&h.shutdown) != 0 {
+ return
+ }
+
+ lgr := rec.Logger().Logr()
+ select {
+ case h.in <- rec:
+ default:
+ handler := lgr.options.onTargetQueueFull
+ if handler != nil && handler(h.target, rec, cap(h.in)) {
+ h.incDroppedCounter()
+ return // drop the record
+ }
+ h.incBlockedCounter()
+
+ select {
+ case <-time.After(lgr.options.enqueueTimeout):
+ lgr.ReportError(fmt.Errorf("target enqueue timeout for log rec [%v]", rec))
+ case h.in <- rec: // block until success or timeout
+ }
+ }
+}
+
+func (h *TargetHost) setQueueSizeGauge(val float64) {
+ if h.targetMetrics != nil {
+ h.targetMetrics.queueSizeGauge.Set(val)
+ }
+}
+
+func (h *TargetHost) incLoggedCounter() {
+ if h.targetMetrics != nil {
+ h.targetMetrics.loggedCounter.Inc()
+ }
+}
+
+func (h *TargetHost) incErrorCounter() {
+ if h.targetMetrics != nil {
+ h.targetMetrics.errorCounter.Inc()
+ }
+}
+
+func (h *TargetHost) incDroppedCounter() {
+ if h.targetMetrics != nil {
+ h.targetMetrics.droppedCounter.Inc()
+ }
+}
+
+func (h *TargetHost) incBlockedCounter() {
+ if h.targetMetrics != nil {
+ h.targetMetrics.blockedCounter.Inc()
+ }
+}
+
+// String returns a name for this target.
+func (h *TargetHost) String() string {
+ return h.name
+}
+
+// start accepts log records via In channel and writes to the
+// supplied target, until Done channel signaled.
+func (h *TargetHost) start() {
+ defer func() {
+ if r := recover(); r != nil {
+ fmt.Fprintln(os.Stderr, "TargetHost.start -- ", r)
+ go h.start()
+ } else {
+ close(h.done)
+ }
+ }()
+
+ for {
+ var rec *LogRec
+ select {
+ case rec = <-h.in:
+ if rec.flush != nil {
+ h.flush(rec.flush)
+ } else {
+ err := h.writeRec(rec)
+ if err != nil {
+ h.incErrorCounter()
+ rec.Logger().Logr().ReportError(err)
+ } else {
+ h.incLoggedCounter()
+ }
+ }
+ case <-h.quit:
+ return
+ }
+ }
+}
+
+func (h *TargetHost) writeRec(rec *LogRec) error {
+ level, enabled := h.filter.GetEnabledLevel(rec.Level())
+ if !enabled {
+ // how did we get here?
+ return fmt.Errorf("level %s not enabled for target %s", rec.Level().Name, h.name)
+ }
+
+ buf := rec.logger.lgr.BorrowBuffer()
+ defer rec.logger.lgr.ReleaseBuffer(buf)
+
+ buf, err := h.formatter.Format(rec, level, buf)
+ if err != nil {
+ return err
+ }
+
+ _, err = h.target.Write(buf.Bytes(), rec)
+ return err
+}
+
+// startMetricsUpdater updates the metrics for any polled values every `updateFreqMillis` seconds until
+// target is shut down.
+func (h *TargetHost) startMetricsUpdater(updateFreqMillis int64) {
+ for {
+ select {
+ case <-h.done:
+ return
+ case <-time.After(time.Duration(updateFreqMillis) * time.Millisecond):
+ h.setQueueSizeGauge(float64(len(h.in)))
+ }
+ }
+}
+
+// flush drains the queue and notifies when done.
+func (h *TargetHost) flush(done chan<- struct{}) {
+ for {
+ var rec *LogRec
+ var err error
+ select {
+ case rec = <-h.in:
+ // ignore any redundant flush records.
+ if rec.flush == nil {
+ err = h.writeRec(rec)
+ if err != nil {
+ h.incErrorCounter()
+ rec.Logger().Logr().ReportError(err)
+ }
+ }
+ default:
+ done <- struct{}{}
+ return
+ }
+ }
+}