1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
|
package logr
import "time"
const (
DefMetricsUpdateFreqMillis = 15000 // 15 seconds
)
// Counter is a simple metrics sink that can only increment a value.
// Implementations are external to Logr and provided via `MetricsCollector`.
type Counter interface {
// Inc increments the counter by 1. Use Add to increment it by arbitrary non-negative values.
Inc()
// Add adds the given value to the counter. It panics if the value is < 0.
Add(float64)
}
// Gauge is a simple metrics sink that can receive values and increase or decrease.
// Implementations are external to Logr and provided via `MetricsCollector`.
type Gauge interface {
// Set sets the Gauge to an arbitrary value.
Set(float64)
// Add adds the given value to the Gauge. (The value can be negative, resulting in a decrease of the Gauge.)
Add(float64)
// Sub subtracts the given value from the Gauge. (The value can be negative, resulting in an increase of the Gauge.)
Sub(float64)
}
// MetricsCollector provides a way for users of this Logr package to have metrics pushed
// in an efficient way to any backend, e.g. Prometheus.
// For each target added to Logr, the supplied MetricsCollector will provide a Gauge
// and Counters that will be called frequently as logging occurs.
type MetricsCollector interface {
// QueueSizeGauge returns a Gauge that will be updated by the named target.
QueueSizeGauge(target string) (Gauge, error)
// LoggedCounter returns a Counter that will be incremented by the named target.
LoggedCounter(target string) (Counter, error)
// ErrorCounter returns a Counter that will be incremented by the named target.
ErrorCounter(target string) (Counter, error)
// DroppedCounter returns a Counter that will be incremented by the named target.
DroppedCounter(target string) (Counter, error)
// BlockedCounter returns a Counter that will be incremented by the named target.
BlockedCounter(target string) (Counter, error)
}
// TargetWithMetrics is a target that provides metrics.
type TargetWithMetrics interface {
EnableMetrics(collector MetricsCollector, updateFreqMillis int64) error
}
type metrics struct {
collector MetricsCollector
updateFreqMillis int64
queueSizeGauge Gauge
loggedCounter Counter
errorCounter Counter
done chan struct{}
}
// initMetrics initializes metrics collection.
func (lgr *Logr) initMetrics(collector MetricsCollector, updatefreq int64) {
lgr.stopMetricsUpdater()
if collector == nil {
lgr.metricsMux.Lock()
lgr.metrics = nil
lgr.metricsMux.Unlock()
return
}
metrics := &metrics{
collector: collector,
updateFreqMillis: updatefreq,
done: make(chan struct{}),
}
metrics.queueSizeGauge, _ = collector.QueueSizeGauge("_logr")
metrics.loggedCounter, _ = collector.LoggedCounter("_logr")
metrics.errorCounter, _ = collector.ErrorCounter("_logr")
lgr.metricsMux.Lock()
lgr.metrics = metrics
lgr.metricsMux.Unlock()
go lgr.startMetricsUpdater()
}
func (lgr *Logr) setQueueSizeGauge(val float64) {
lgr.metricsMux.RLock()
defer lgr.metricsMux.RUnlock()
if lgr.metrics != nil {
lgr.metrics.queueSizeGauge.Set(val)
}
}
func (lgr *Logr) incLoggedCounter() {
lgr.metricsMux.RLock()
defer lgr.metricsMux.RUnlock()
if lgr.metrics != nil {
lgr.metrics.loggedCounter.Inc()
}
}
func (lgr *Logr) incErrorCounter() {
lgr.metricsMux.RLock()
defer lgr.metricsMux.RUnlock()
if lgr.metrics != nil {
lgr.metrics.errorCounter.Inc()
}
}
// startMetricsUpdater updates the metrics for any polled values every `metricsUpdateFreqSecs` seconds until
// logr is closed.
func (lgr *Logr) startMetricsUpdater() {
for {
lgr.metricsMux.RLock()
metrics := lgr.metrics
c := metrics.done
lgr.metricsMux.RUnlock()
select {
case <-c:
return
case <-time.After(time.Duration(metrics.updateFreqMillis) * time.Millisecond):
lgr.setQueueSizeGauge(float64(len(lgr.in)))
}
}
}
func (lgr *Logr) stopMetricsUpdater() {
lgr.metricsMux.Lock()
defer lgr.metricsMux.Unlock()
if lgr.metrics != nil && lgr.metrics.done != nil {
close(lgr.metrics.done)
lgr.metrics.done = nil
}
}
|