diff options
Diffstat (limited to 'vendor/github.com/facebookgo')
-rw-r--r-- | vendor/github.com/facebookgo/clock/LICENSE | 21 | ||||
-rw-r--r-- | vendor/github.com/facebookgo/clock/clock.go | 363 | ||||
-rw-r--r-- | vendor/github.com/facebookgo/grace/gracehttp/http.go | 190 | ||||
-rw-r--r-- | vendor/github.com/facebookgo/grace/gracehttp/license | 30 | ||||
-rw-r--r-- | vendor/github.com/facebookgo/grace/gracenet/license | 30 | ||||
-rw-r--r-- | vendor/github.com/facebookgo/grace/gracenet/net.go | 252 | ||||
-rw-r--r-- | vendor/github.com/facebookgo/httpdown/httpdown.go | 376 | ||||
-rw-r--r-- | vendor/github.com/facebookgo/httpdown/httpdown_example/main.go | 43 | ||||
-rw-r--r-- | vendor/github.com/facebookgo/httpdown/license | 30 | ||||
-rw-r--r-- | vendor/github.com/facebookgo/stats/aggregation.go | 35 | ||||
-rw-r--r-- | vendor/github.com/facebookgo/stats/counter.go | 112 | ||||
-rw-r--r-- | vendor/github.com/facebookgo/stats/license | 30 | ||||
-rw-r--r-- | vendor/github.com/facebookgo/stats/stats.go | 166 | ||||
-rw-r--r-- | vendor/github.com/facebookgo/stats/stopper.go | 17 |
14 files changed, 1695 insertions, 0 deletions
diff --git a/vendor/github.com/facebookgo/clock/LICENSE b/vendor/github.com/facebookgo/clock/LICENSE new file mode 100644 index 00000000..ce212cb1 --- /dev/null +++ b/vendor/github.com/facebookgo/clock/LICENSE @@ -0,0 +1,21 @@ +The MIT License (MIT) + +Copyright (c) 2014 Ben Johnson + +Permission is hereby granted, free of charge, to any person obtaining a copy +of this software and associated documentation files (the "Software"), to deal +in the Software without restriction, including without limitation the rights +to use, copy, modify, merge, publish, distribute, sublicense, and/or sell +copies of the Software, and to permit persons to whom the Software is +furnished to do so, subject to the following conditions: + +The above copyright notice and this permission notice shall be included in all +copies or substantial portions of the Software. + +THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR +IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, +FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE +AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER +LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, +OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE +SOFTWARE. diff --git a/vendor/github.com/facebookgo/clock/clock.go b/vendor/github.com/facebookgo/clock/clock.go new file mode 100644 index 00000000..bca1a7ba --- /dev/null +++ b/vendor/github.com/facebookgo/clock/clock.go @@ -0,0 +1,363 @@ +package clock + +import ( + "runtime" + "sort" + "sync" + "time" +) + +// Clock represents an interface to the functions in the standard library time +// package. Two implementations are available in the clock package. The first +// is a real-time clock which simply wraps the time package's functions. The +// second is a mock clock which will only make forward progress when +// programmatically adjusted. +type Clock interface { + After(d time.Duration) <-chan time.Time + AfterFunc(d time.Duration, f func()) *Timer + Now() time.Time + Sleep(d time.Duration) + Tick(d time.Duration) <-chan time.Time + Ticker(d time.Duration) *Ticker + Timer(d time.Duration) *Timer +} + +// New returns an instance of a real-time clock. +func New() Clock { + return &clock{} +} + +// clock implements a real-time clock by simply wrapping the time package functions. +type clock struct{} + +func (c *clock) After(d time.Duration) <-chan time.Time { return time.After(d) } + +func (c *clock) AfterFunc(d time.Duration, f func()) *Timer { + return &Timer{timer: time.AfterFunc(d, f)} +} + +func (c *clock) Now() time.Time { return time.Now() } + +func (c *clock) Sleep(d time.Duration) { time.Sleep(d) } + +func (c *clock) Tick(d time.Duration) <-chan time.Time { return time.Tick(d) } + +func (c *clock) Ticker(d time.Duration) *Ticker { + t := time.NewTicker(d) + return &Ticker{C: t.C, ticker: t} +} + +func (c *clock) Timer(d time.Duration) *Timer { + t := time.NewTimer(d) + return &Timer{C: t.C, timer: t} +} + +// Mock represents a mock clock that only moves forward programmically. +// It can be preferable to a real-time clock when testing time-based functionality. +type Mock struct { + mu sync.Mutex + now time.Time // current time + timers clockTimers // tickers & timers + + calls Calls + waiting []waiting + callsMutex sync.Mutex +} + +// NewMock returns an instance of a mock clock. +// The current time of the mock clock on initialization is the Unix epoch. +func NewMock() *Mock { + return &Mock{now: time.Unix(0, 0)} +} + +// Add moves the current time of the mock clock forward by the duration. +// This should only be called from a single goroutine at a time. +func (m *Mock) Add(d time.Duration) { + // Calculate the final current time. + t := m.now.Add(d) + + // Continue to execute timers until there are no more before the new time. + for { + if !m.runNextTimer(t) { + break + } + } + + // Ensure that we end with the new time. + m.mu.Lock() + m.now = t + m.mu.Unlock() + + // Give a small buffer to make sure the other goroutines get handled. + gosched() +} + +// runNextTimer executes the next timer in chronological order and moves the +// current time to the timer's next tick time. The next time is not executed if +// it's next time if after the max time. Returns true if a timer is executed. +func (m *Mock) runNextTimer(max time.Time) bool { + m.mu.Lock() + + // Sort timers by time. + sort.Sort(m.timers) + + // If we have no more timers then exit. + if len(m.timers) == 0 { + m.mu.Unlock() + return false + } + + // Retrieve next timer. Exit if next tick is after new time. + t := m.timers[0] + if t.Next().After(max) { + m.mu.Unlock() + return false + } + + // Move "now" forward and unlock clock. + m.now = t.Next() + m.mu.Unlock() + + // Execute timer. + t.Tick(m.now) + return true +} + +// After waits for the duration to elapse and then sends the current time on the returned channel. +func (m *Mock) After(d time.Duration) <-chan time.Time { + defer m.inc(&m.calls.After) + return m.Timer(d).C +} + +// AfterFunc waits for the duration to elapse and then executes a function. +// A Timer is returned that can be stopped. +func (m *Mock) AfterFunc(d time.Duration, f func()) *Timer { + defer m.inc(&m.calls.AfterFunc) + t := m.Timer(d) + t.C = nil + t.fn = f + return t +} + +// Now returns the current wall time on the mock clock. +func (m *Mock) Now() time.Time { + defer m.inc(&m.calls.Now) + m.mu.Lock() + defer m.mu.Unlock() + return m.now +} + +// Sleep pauses the goroutine for the given duration on the mock clock. +// The clock must be moved forward in a separate goroutine. +func (m *Mock) Sleep(d time.Duration) { + defer m.inc(&m.calls.Sleep) + <-m.After(d) +} + +// Tick is a convenience function for Ticker(). +// It will return a ticker channel that cannot be stopped. +func (m *Mock) Tick(d time.Duration) <-chan time.Time { + defer m.inc(&m.calls.Tick) + return m.Ticker(d).C +} + +// Ticker creates a new instance of Ticker. +func (m *Mock) Ticker(d time.Duration) *Ticker { + defer m.inc(&m.calls.Ticker) + m.mu.Lock() + defer m.mu.Unlock() + ch := make(chan time.Time) + t := &Ticker{ + C: ch, + c: ch, + mock: m, + d: d, + next: m.now.Add(d), + } + m.timers = append(m.timers, (*internalTicker)(t)) + return t +} + +// Timer creates a new instance of Timer. +func (m *Mock) Timer(d time.Duration) *Timer { + defer m.inc(&m.calls.Timer) + m.mu.Lock() + defer m.mu.Unlock() + ch := make(chan time.Time) + t := &Timer{ + C: ch, + c: ch, + mock: m, + next: m.now.Add(d), + } + m.timers = append(m.timers, (*internalTimer)(t)) + return t +} + +func (m *Mock) removeClockTimer(t clockTimer) { + m.mu.Lock() + defer m.mu.Unlock() + for i, timer := range m.timers { + if timer == t { + copy(m.timers[i:], m.timers[i+1:]) + m.timers[len(m.timers)-1] = nil + m.timers = m.timers[:len(m.timers)-1] + break + } + } + sort.Sort(m.timers) +} + +func (m *Mock) inc(addr *uint32) { + m.callsMutex.Lock() + defer m.callsMutex.Unlock() + *addr++ + var newWaiting []waiting + for _, w := range m.waiting { + if m.calls.atLeast(w.expected) { + close(w.done) + continue + } + newWaiting = append(newWaiting, w) + } + m.waiting = newWaiting +} + +// Wait waits for at least the relevant calls before returning. The expected +// Calls are always over the lifetime of the Mock. Values in the Calls struct +// are used as the minimum number of calls, this allows you to wait for only +// the calls you care about. +func (m *Mock) Wait(s Calls) { + m.callsMutex.Lock() + if m.calls.atLeast(s) { + m.callsMutex.Unlock() + return + } + done := make(chan struct{}) + m.waiting = append(m.waiting, waiting{expected: s, done: done}) + m.callsMutex.Unlock() + <-done +} + +// clockTimer represents an object with an associated start time. +type clockTimer interface { + Next() time.Time + Tick(time.Time) +} + +// clockTimers represents a list of sortable timers. +type clockTimers []clockTimer + +func (a clockTimers) Len() int { return len(a) } +func (a clockTimers) Swap(i, j int) { a[i], a[j] = a[j], a[i] } +func (a clockTimers) Less(i, j int) bool { return a[i].Next().Before(a[j].Next()) } + +// Timer represents a single event. +// The current time will be sent on C, unless the timer was created by AfterFunc. +type Timer struct { + C <-chan time.Time + c chan time.Time + timer *time.Timer // realtime impl, if set + next time.Time // next tick time + mock *Mock // mock clock, if set + fn func() // AfterFunc function, if set +} + +// Stop turns off the ticker. +func (t *Timer) Stop() { + if t.timer != nil { + t.timer.Stop() + } else { + t.mock.removeClockTimer((*internalTimer)(t)) + } +} + +type internalTimer Timer + +func (t *internalTimer) Next() time.Time { return t.next } +func (t *internalTimer) Tick(now time.Time) { + if t.fn != nil { + t.fn() + } else { + t.c <- now + } + t.mock.removeClockTimer((*internalTimer)(t)) + gosched() +} + +// Ticker holds a channel that receives "ticks" at regular intervals. +type Ticker struct { + C <-chan time.Time + c chan time.Time + ticker *time.Ticker // realtime impl, if set + next time.Time // next tick time + mock *Mock // mock clock, if set + d time.Duration // time between ticks +} + +// Stop turns off the ticker. +func (t *Ticker) Stop() { + if t.ticker != nil { + t.ticker.Stop() + } else { + t.mock.removeClockTimer((*internalTicker)(t)) + } +} + +type internalTicker Ticker + +func (t *internalTicker) Next() time.Time { return t.next } +func (t *internalTicker) Tick(now time.Time) { + select { + case t.c <- now: + case <-time.After(1 * time.Millisecond): + } + t.next = now.Add(t.d) + gosched() +} + +// Sleep momentarily so that other goroutines can process. +func gosched() { runtime.Gosched() } + +// Calls keeps track of the count of calls for each of the methods on the Clock +// interface. +type Calls struct { + After uint32 + AfterFunc uint32 + Now uint32 + Sleep uint32 + Tick uint32 + Ticker uint32 + Timer uint32 +} + +// atLeast returns true if at least the number of calls in o have been made. +func (c Calls) atLeast(o Calls) bool { + if c.After < o.After { + return false + } + if c.AfterFunc < o.AfterFunc { + return false + } + if c.Now < o.Now { + return false + } + if c.Sleep < o.Sleep { + return false + } + if c.Tick < o.Tick { + return false + } + if c.Ticker < o.Ticker { + return false + } + if c.Timer < o.Timer { + return false + } + return true +} + +type waiting struct { + expected Calls + done chan struct{} +} diff --git a/vendor/github.com/facebookgo/grace/gracehttp/http.go b/vendor/github.com/facebookgo/grace/gracehttp/http.go new file mode 100644 index 00000000..fa3ac883 --- /dev/null +++ b/vendor/github.com/facebookgo/grace/gracehttp/http.go @@ -0,0 +1,190 @@ +// Package gracehttp provides easy to use graceful restart +// functionality for HTTP server. +package gracehttp + +import ( + "bytes" + "crypto/tls" + "fmt" + "log" + "net" + "net/http" + "os" + "os/signal" + "sync" + "syscall" + + "github.com/facebookgo/grace/gracenet" + "github.com/facebookgo/httpdown" +) + +var ( + logger *log.Logger + didInherit = os.Getenv("LISTEN_FDS") != "" + ppid = os.Getppid() +) + +// An app contains one or more servers and associated configuration. +type app struct { + servers []*http.Server + http *httpdown.HTTP + net *gracenet.Net + listeners []net.Listener + sds []httpdown.Server + errors chan error +} + +func newApp(servers []*http.Server) *app { + return &app{ + servers: servers, + http: &httpdown.HTTP{}, + net: &gracenet.Net{}, + listeners: make([]net.Listener, 0, len(servers)), + sds: make([]httpdown.Server, 0, len(servers)), + + // 2x num servers for possible Close or Stop errors + 1 for possible + // StartProcess error. + errors: make(chan error, 1+(len(servers)*2)), + } +} + +func (a *app) listen() error { + for _, s := range a.servers { + // TODO: default addresses + l, err := a.net.Listen("tcp", s.Addr) + if err != nil { + return err + } + if s.TLSConfig != nil { + l = tls.NewListener(l, s.TLSConfig) + } + a.listeners = append(a.listeners, l) + } + return nil +} + +func (a *app) serve() { + for i, s := range a.servers { + a.sds = append(a.sds, a.http.Serve(s, a.listeners[i])) + } +} + +func (a *app) wait() { + var wg sync.WaitGroup + wg.Add(len(a.sds) * 2) // Wait & Stop + go a.signalHandler(&wg) + for _, s := range a.sds { + go func(s httpdown.Server) { + defer wg.Done() + if err := s.Wait(); err != nil { + a.errors <- err + } + }(s) + } + wg.Wait() +} + +func (a *app) term(wg *sync.WaitGroup) { + for _, s := range a.sds { + go func(s httpdown.Server) { + defer wg.Done() + if err := s.Stop(); err != nil { + a.errors <- err + } + }(s) + } +} + +func (a *app) signalHandler(wg *sync.WaitGroup) { + ch := make(chan os.Signal, 10) + signal.Notify(ch, syscall.SIGINT, syscall.SIGTERM, syscall.SIGUSR2) + for { + sig := <-ch + switch sig { + case syscall.SIGINT, syscall.SIGTERM: + // this ensures a subsequent INT/TERM will trigger standard go behaviour of + // terminating. + signal.Stop(ch) + a.term(wg) + return + case syscall.SIGUSR2: + // we only return here if there's an error, otherwise the new process + // will send us a TERM when it's ready to trigger the actual shutdown. + if _, err := a.net.StartProcess(); err != nil { + a.errors <- err + } + } + } +} + +// Serve will serve the given http.Servers and will monitor for signals +// allowing for graceful termination (SIGTERM) or restart (SIGUSR2). +func Serve(servers ...*http.Server) error { + a := newApp(servers) + + // Acquire Listeners + if err := a.listen(); err != nil { + return err + } + + // Some useful logging. + if logger != nil { + if didInherit { + if ppid == 1 { + logger.Printf("Listening on init activated %s", pprintAddr(a.listeners)) + } else { + const msg = "Graceful handoff of %s with new pid %d and old pid %d" + logger.Printf(msg, pprintAddr(a.listeners), os.Getpid(), ppid) + } + } else { + const msg = "Serving %s with pid %d" + logger.Printf(msg, pprintAddr(a.listeners), os.Getpid()) + } + } + + // Start serving. + a.serve() + + // Close the parent if we inherited and it wasn't init that started us. + if didInherit && ppid != 1 { + if err := syscall.Kill(ppid, syscall.SIGTERM); err != nil { + return fmt.Errorf("failed to close parent: %s", err) + } + } + + waitdone := make(chan struct{}) + go func() { + defer close(waitdone) + a.wait() + }() + + select { + case err := <-a.errors: + if err == nil { + panic("unexpected nil error") + } + return err + case <-waitdone: + if logger != nil { + logger.Printf("Exiting pid %d.", os.Getpid()) + } + return nil + } +} + +// Used for pretty printing addresses. +func pprintAddr(listeners []net.Listener) []byte { + var out bytes.Buffer + for i, l := range listeners { + if i != 0 { + fmt.Fprint(&out, ", ") + } + fmt.Fprint(&out, l.Addr()) + } + return out.Bytes() +} + +// SetLogger sets logger to be able to grab some useful logs +func SetLogger(l *log.Logger) { + logger = l +} diff --git a/vendor/github.com/facebookgo/grace/gracehttp/license b/vendor/github.com/facebookgo/grace/gracehttp/license new file mode 100644 index 00000000..3aea8753 --- /dev/null +++ b/vendor/github.com/facebookgo/grace/gracehttp/license @@ -0,0 +1,30 @@ +BSD License + +For grace software + +Copyright (c) 2015, Facebook, Inc. All rights reserved. + +Redistribution and use in source and binary forms, with or without modification, +are permitted provided that the following conditions are met: + + * Redistributions of source code must retain the above copyright notice, this + list of conditions and the following disclaimer. + + * Redistributions in binary form must reproduce the above copyright notice, + this list of conditions and the following disclaimer in the documentation + and/or other materials provided with the distribution. + + * Neither the name Facebook nor the names of its contributors may be used to + endorse or promote products derived from this software without specific + prior written permission. + +THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" AND +ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED +WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE +DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDER OR CONTRIBUTORS BE LIABLE FOR +ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES +(INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; +LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON +ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT +(INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS +SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. diff --git a/vendor/github.com/facebookgo/grace/gracenet/license b/vendor/github.com/facebookgo/grace/gracenet/license new file mode 100644 index 00000000..3aea8753 --- /dev/null +++ b/vendor/github.com/facebookgo/grace/gracenet/license @@ -0,0 +1,30 @@ +BSD License + +For grace software + +Copyright (c) 2015, Facebook, Inc. All rights reserved. + +Redistribution and use in source and binary forms, with or without modification, +are permitted provided that the following conditions are met: + + * Redistributions of source code must retain the above copyright notice, this + list of conditions and the following disclaimer. + + * Redistributions in binary form must reproduce the above copyright notice, + this list of conditions and the following disclaimer in the documentation + and/or other materials provided with the distribution. + + * Neither the name Facebook nor the names of its contributors may be used to + endorse or promote products derived from this software without specific + prior written permission. + +THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" AND +ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED +WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE +DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDER OR CONTRIBUTORS BE LIABLE FOR +ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES +(INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; +LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON +ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT +(INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS +SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. diff --git a/vendor/github.com/facebookgo/grace/gracenet/net.go b/vendor/github.com/facebookgo/grace/gracenet/net.go new file mode 100644 index 00000000..a980954a --- /dev/null +++ b/vendor/github.com/facebookgo/grace/gracenet/net.go @@ -0,0 +1,252 @@ +// Package gracenet provides a family of Listen functions that either open a +// fresh connection or provide an inherited connection from when the process +// was started. The behave like their counterparts in the net package, but +// transparently provide support for graceful restarts without dropping +// connections. This is provided in a systemd socket activation compatible form +// to allow using socket activation. +// +// BUG: Doesn't handle closing of listeners. +package gracenet + +import ( + "fmt" + "net" + "os" + "os/exec" + "strconv" + "strings" + "sync" +) + +const ( + // Used to indicate a graceful restart in the new process. + envCountKey = "LISTEN_FDS" + envCountKeyPrefix = envCountKey + "=" +) + +// In order to keep the working directory the same as when we started we record +// it at startup. +var originalWD, _ = os.Getwd() + +// Net provides the family of Listen functions and maintains the associated +// state. Typically you will have only once instance of Net per application. +type Net struct { + inherited []net.Listener + active []net.Listener + mutex sync.Mutex + inheritOnce sync.Once + + // used in tests to override the default behavior of starting from fd 3. + fdStart int +} + +func (n *Net) inherit() error { + var retErr error + n.inheritOnce.Do(func() { + n.mutex.Lock() + defer n.mutex.Unlock() + countStr := os.Getenv(envCountKey) + if countStr == "" { + return + } + count, err := strconv.Atoi(countStr) + if err != nil { + retErr = fmt.Errorf("found invalid count value: %s=%s", envCountKey, countStr) + return + } + + // In tests this may be overridden. + fdStart := n.fdStart + if fdStart == 0 { + // In normal operations if we are inheriting, the listeners will begin at + // fd 3. + fdStart = 3 + } + + for i := fdStart; i < fdStart+count; i++ { + file := os.NewFile(uintptr(i), "listener") + l, err := net.FileListener(file) + if err != nil { + file.Close() + retErr = fmt.Errorf("error inheriting socket fd %d: %s", i, err) + return + } + if err := file.Close(); err != nil { + retErr = fmt.Errorf("error closing inherited socket fd %d: %s", i, err) + return + } + n.inherited = append(n.inherited, l) + } + }) + return retErr +} + +// Listen announces on the local network address laddr. The network net must be +// a stream-oriented network: "tcp", "tcp4", "tcp6", "unix" or "unixpacket". It +// returns an inherited net.Listener for the matching network and address, or +// creates a new one using net.Listen. +func (n *Net) Listen(nett, laddr string) (net.Listener, error) { + switch nett { + default: + return nil, net.UnknownNetworkError(nett) + case "tcp", "tcp4", "tcp6": + addr, err := net.ResolveTCPAddr(nett, laddr) + if err != nil { + return nil, err + } + return n.ListenTCP(nett, addr) + case "unix", "unixpacket", "invalid_unix_net_for_test": + addr, err := net.ResolveUnixAddr(nett, laddr) + if err != nil { + return nil, err + } + return n.ListenUnix(nett, addr) + } +} + +// ListenTCP announces on the local network address laddr. The network net must +// be: "tcp", "tcp4" or "tcp6". It returns an inherited net.Listener for the +// matching network and address, or creates a new one using net.ListenTCP. +func (n *Net) ListenTCP(nett string, laddr *net.TCPAddr) (*net.TCPListener, error) { + if err := n.inherit(); err != nil { + return nil, err + } + + n.mutex.Lock() + defer n.mutex.Unlock() + + // look for an inherited listener + for i, l := range n.inherited { + if l == nil { // we nil used inherited listeners + continue + } + if isSameAddr(l.Addr(), laddr) { + n.inherited[i] = nil + n.active = append(n.active, l) + return l.(*net.TCPListener), nil + } + } + + // make a fresh listener + l, err := net.ListenTCP(nett, laddr) + if err != nil { + return nil, err + } + n.active = append(n.active, l) + return l, nil +} + +// ListenUnix announces on the local network address laddr. The network net +// must be a: "unix" or "unixpacket". It returns an inherited net.Listener for +// the matching network and address, or creates a new one using net.ListenUnix. +func (n *Net) ListenUnix(nett string, laddr *net.UnixAddr) (*net.UnixListener, error) { + if err := n.inherit(); err != nil { + return nil, err + } + + n.mutex.Lock() + defer n.mutex.Unlock() + + // look for an inherited listener + for i, l := range n.inherited { + if l == nil { // we nil used inherited listeners + continue + } + if isSameAddr(l.Addr(), laddr) { + n.inherited[i] = nil + n.active = append(n.active, l) + return l.(*net.UnixListener), nil + } + } + + // make a fresh listener + l, err := net.ListenUnix(nett, laddr) + if err != nil { + return nil, err + } + n.active = append(n.active, l) + return l, nil +} + +// activeListeners returns a snapshot copy of the active listeners. +func (n *Net) activeListeners() ([]net.Listener, error) { + n.mutex.Lock() + defer n.mutex.Unlock() + ls := make([]net.Listener, len(n.active)) + copy(ls, n.active) + return ls, nil +} + +func isSameAddr(a1, a2 net.Addr) bool { + if a1.Network() != a2.Network() { + return false + } + a1s := a1.String() + a2s := a2.String() + if a1s == a2s { + return true + } + + // This allows for ipv6 vs ipv4 local addresses to compare as equal. This + // scenario is common when listening on localhost. + const ipv6prefix = "[::]" + a1s = strings.TrimPrefix(a1s, ipv6prefix) + a2s = strings.TrimPrefix(a2s, ipv6prefix) + const ipv4prefix = "0.0.0.0" + a1s = strings.TrimPrefix(a1s, ipv4prefix) + a2s = strings.TrimPrefix(a2s, ipv4prefix) + return a1s == a2s +} + +// StartProcess starts a new process passing it the active listeners. It +// doesn't fork, but starts a new process using the same environment and +// arguments as when it was originally started. This allows for a newly +// deployed binary to be started. It returns the pid of the newly started +// process when successful. +func (n *Net) StartProcess() (int, error) { + listeners, err := n.activeListeners() + if err != nil { + return 0, err + } + + // Extract the fds from the listeners. + files := make([]*os.File, len(listeners)) + for i, l := range listeners { + files[i], err = l.(filer).File() + if err != nil { + return 0, err + } + defer files[i].Close() + } + + // Use the original binary location. This works with symlinks such that if + // the file it points to has been changed we will use the updated symlink. + argv0, err := exec.LookPath(os.Args[0]) + if err != nil { + return 0, err + } + + // Pass on the environment and replace the old count key with the new one. + var env []string + for _, v := range os.Environ() { + if !strings.HasPrefix(v, envCountKeyPrefix) { + env = append(env, v) + } + } + env = append(env, fmt.Sprintf("%s%d", envCountKeyPrefix, len(listeners))) + + allFiles := append([]*os.File{os.Stdin, os.Stdout, os.Stderr}, files...) + process, err := os.StartProcess(argv0, os.Args, &os.ProcAttr{ + Dir: originalWD, + Env: env, + Files: allFiles, + }) + if err != nil { + return 0, err + } + return process.Pid, nil +} + +type filer interface { + File() (*os.File, error) +} diff --git a/vendor/github.com/facebookgo/httpdown/httpdown.go b/vendor/github.com/facebookgo/httpdown/httpdown.go new file mode 100644 index 00000000..34c5dea9 --- /dev/null +++ b/vendor/github.com/facebookgo/httpdown/httpdown.go @@ -0,0 +1,376 @@ +// Package httpdown provides http.ConnState enabled graceful termination of +// http.Server. +package httpdown + +import ( + "crypto/tls" + "fmt" + "net" + "net/http" + "os" + "os/signal" + "sync" + "syscall" + "time" + + "github.com/facebookgo/clock" + "github.com/facebookgo/stats" +) + +const ( + defaultStopTimeout = time.Minute + defaultKillTimeout = time.Minute +) + +// A Server allows encapsulates the process of accepting new connections and +// serving them, and gracefully shutting down the listener without dropping +// active connections. +type Server interface { + // Wait waits for the serving loop to finish. This will happen when Stop is + // called, at which point it returns no error, or if there is an error in the + // serving loop. You must call Wait after calling Serve or ListenAndServe. + Wait() error + + // Stop stops the listener. It will block until all connections have been + // closed. + Stop() error +} + +// HTTP defines the configuration for serving a http.Server. Multiple calls to +// Serve or ListenAndServe can be made on the same HTTP instance. The default +// timeouts of 1 minute each result in a maximum of 2 minutes before a Stop() +// returns. +type HTTP struct { + // StopTimeout is the duration before we begin force closing connections. + // Defaults to 1 minute. + StopTimeout time.Duration + + // KillTimeout is the duration before which we completely give up and abort + // even though we still have connected clients. This is useful when a large + // number of client connections exist and closing them can take a long time. + // Note, this is in addition to the StopTimeout. Defaults to 1 minute. + KillTimeout time.Duration + + // Stats is optional. If provided, it will be used to record various metrics. + Stats stats.Client + + // Clock allows for testing timing related functionality. Do not specify this + // in production code. + Clock clock.Clock +} + +// Serve provides the low-level API which is useful if you're creating your own +// net.Listener. +func (h HTTP) Serve(s *http.Server, l net.Listener) Server { + stopTimeout := h.StopTimeout + if stopTimeout == 0 { + stopTimeout = defaultStopTimeout + } + killTimeout := h.KillTimeout + if killTimeout == 0 { + killTimeout = defaultKillTimeout + } + klock := h.Clock + if klock == nil { + klock = clock.New() + } + + ss := &server{ + stopTimeout: stopTimeout, + killTimeout: killTimeout, + stats: h.Stats, + clock: klock, + oldConnState: s.ConnState, + listener: l, + server: s, + serveDone: make(chan struct{}), + serveErr: make(chan error, 1), + new: make(chan net.Conn), + active: make(chan net.Conn), + idle: make(chan net.Conn), + closed: make(chan net.Conn), + stop: make(chan chan struct{}), + kill: make(chan chan struct{}), + } + s.ConnState = ss.connState + go ss.manage() + go ss.serve() + return ss +} + +// ListenAndServe returns a Server for the given http.Server. It is equivalent +// to ListenAndServe from the standard library, but returns immediately. +// Requests will be accepted in a background goroutine. If the http.Server has +// a non-nil TLSConfig, a TLS enabled listener will be setup. +func (h HTTP) ListenAndServe(s *http.Server) (Server, error) { + addr := s.Addr + if addr == "" { + if s.TLSConfig == nil { + addr = ":http" + } else { + addr = ":https" + } + } + l, err := net.Listen("tcp", addr) + if err != nil { + stats.BumpSum(h.Stats, "listen.error", 1) + return nil, err + } + if s.TLSConfig != nil { + l = tls.NewListener(l, s.TLSConfig) + } + return h.Serve(s, l), nil +} + +// server manages the serving process and allows for gracefully stopping it. +type server struct { + stopTimeout time.Duration + killTimeout time.Duration + stats stats.Client + clock clock.Clock + + oldConnState func(net.Conn, http.ConnState) + server *http.Server + serveDone chan struct{} + serveErr chan error + listener net.Listener + + new chan net.Conn + active chan net.Conn + idle chan net.Conn + closed chan net.Conn + stop chan chan struct{} + kill chan chan struct{} + + stopOnce sync.Once + stopErr error +} + +func (s *server) connState(c net.Conn, cs http.ConnState) { + if s.oldConnState != nil { + s.oldConnState(c, cs) + } + + switch cs { + case http.StateNew: + s.new <- c + case http.StateActive: + s.active <- c + case http.StateIdle: + s.idle <- c + case http.StateHijacked, http.StateClosed: + s.closed <- c + } +} + +func (s *server) manage() { + defer func() { + close(s.new) + close(s.active) + close(s.idle) + close(s.closed) + close(s.stop) + close(s.kill) + }() + + var stopDone chan struct{} + + conns := map[net.Conn]http.ConnState{} + var countNew, countActive, countIdle float64 + + // decConn decrements the count associated with the current state of the + // given connection. + decConn := func(c net.Conn) { + switch conns[c] { + default: + panic(fmt.Errorf("unknown existing connection: %s", c)) + case http.StateNew: + countNew-- + case http.StateActive: + countActive-- + case http.StateIdle: + countIdle-- + } + } + + // setup a ticker to report various values every minute. if we don't have a + // Stats implementation provided, we Stop it so it never ticks. + statsTicker := s.clock.Ticker(time.Minute) + if s.stats == nil { + statsTicker.Stop() + } + + for { + select { + case <-statsTicker.C: + // we'll only get here when s.stats is not nil + s.stats.BumpAvg("http-state.new", countNew) + s.stats.BumpAvg("http-state.active", countActive) + s.stats.BumpAvg("http-state.idle", countIdle) + s.stats.BumpAvg("http-state.total", countNew+countActive+countIdle) + case c := <-s.new: + conns[c] = http.StateNew + countNew++ + case c := <-s.active: + decConn(c) + countActive++ + + conns[c] = http.StateActive + case c := <-s.idle: + decConn(c) + countIdle++ + + conns[c] = http.StateIdle + + // if we're already stopping, close it + if stopDone != nil { + c.Close() + } + case c := <-s.closed: + stats.BumpSum(s.stats, "conn.closed", 1) + decConn(c) + delete(conns, c) + + // if we're waiting to stop and are all empty, we just closed the last + // connection and we're done. + if stopDone != nil && len(conns) == 0 { + close(stopDone) + return + } + case stopDone = <-s.stop: + // if we're already all empty, we're already done + if len(conns) == 0 { + close(stopDone) + return + } + + // close current idle connections right away + for c, cs := range conns { + if cs == http.StateIdle { + c.Close() + } + } + + // continue the loop and wait for all the ConnState updates which will + // eventually close(stopDone) and return from this goroutine. + + case killDone := <-s.kill: + // force close all connections + stats.BumpSum(s.stats, "kill.conn.count", float64(len(conns))) + for c := range conns { + c.Close() + } + + // don't block the kill. + close(killDone) + + // continue the loop and we wait for all the ConnState updates and will + // return from this goroutine when we're all done. otherwise we'll try to + // send those ConnState updates on closed channels. + + } + } +} + +func (s *server) serve() { + stats.BumpSum(s.stats, "serve", 1) + s.serveErr <- s.server.Serve(s.listener) + close(s.serveDone) + close(s.serveErr) +} + +func (s *server) Wait() error { + if err := <-s.serveErr; !isUseOfClosedError(err) { + return err + } + return nil +} + +func (s *server) Stop() error { + s.stopOnce.Do(func() { + defer stats.BumpTime(s.stats, "stop.time").End() + stats.BumpSum(s.stats, "stop", 1) + + // first disable keep-alive for new connections + s.server.SetKeepAlivesEnabled(false) + + // then close the listener so new connections can't connect come thru + closeErr := s.listener.Close() + <-s.serveDone + + // then trigger the background goroutine to stop and wait for it + stopDone := make(chan struct{}) + s.stop <- stopDone + + // wait for stop + select { + case <-stopDone: + case <-s.clock.After(s.stopTimeout): + defer stats.BumpTime(s.stats, "kill.time").End() + stats.BumpSum(s.stats, "kill", 1) + + // stop timed out, wait for kill + killDone := make(chan struct{}) + s.kill <- killDone + select { + case <-killDone: + case <-s.clock.After(s.killTimeout): + // kill timed out, give up + stats.BumpSum(s.stats, "kill.timeout", 1) + } + } + + if closeErr != nil && !isUseOfClosedError(closeErr) { + stats.BumpSum(s.stats, "listener.close.error", 1) + s.stopErr = closeErr + } + }) + return s.stopErr +} + +func isUseOfClosedError(err error) bool { + if err == nil { + return false + } + if opErr, ok := err.(*net.OpError); ok { + err = opErr.Err + } + return err.Error() == "use of closed network connection" +} + +// ListenAndServe is a convenience function to serve and wait for a SIGTERM +// or SIGINT before shutting down. +func ListenAndServe(s *http.Server, hd *HTTP) error { + if hd == nil { + hd = &HTTP{} + } + hs, err := hd.ListenAndServe(s) + if err != nil { + return err + } + + waiterr := make(chan error, 1) + go func() { + defer close(waiterr) + waiterr <- hs.Wait() + }() + + signals := make(chan os.Signal, 10) + signal.Notify(signals, syscall.SIGTERM, syscall.SIGINT) + + select { + case err := <-waiterr: + if err != nil { + return err + } + case <-signals: + signal.Stop(signals) + if err := hs.Stop(); err != nil { + return err + } + if err := <-waiterr; err != nil { + return err + } + } + return nil +} diff --git a/vendor/github.com/facebookgo/httpdown/httpdown_example/main.go b/vendor/github.com/facebookgo/httpdown/httpdown_example/main.go new file mode 100644 index 00000000..9e3c0bff --- /dev/null +++ b/vendor/github.com/facebookgo/httpdown/httpdown_example/main.go @@ -0,0 +1,43 @@ +package main + +import ( + "flag" + "fmt" + "net/http" + "os" + "time" + + "github.com/facebookgo/httpdown" +) + +func handler(w http.ResponseWriter, r *http.Request) { + duration, err := time.ParseDuration(r.FormValue("duration")) + if err != nil { + http.Error(w, err.Error(), 400) + return + } + fmt.Fprintf(w, "going to sleep %s with pid %d\n", duration, os.Getpid()) + w.(http.Flusher).Flush() + time.Sleep(duration) + fmt.Fprintf(w, "slept %s with pid %d\n", duration, os.Getpid()) +} + +func main() { + server := &http.Server{ + Addr: "127.0.0.1:8080", + Handler: http.HandlerFunc(handler), + } + hd := &httpdown.HTTP{ + StopTimeout: 10 * time.Second, + KillTimeout: 1 * time.Second, + } + + flag.StringVar(&server.Addr, "addr", server.Addr, "http address") + flag.DurationVar(&hd.StopTimeout, "stop-timeout", hd.StopTimeout, "stop timeout") + flag.DurationVar(&hd.KillTimeout, "kill-timeout", hd.KillTimeout, "kill timeout") + flag.Parse() + + if err := httpdown.ListenAndServe(server, hd); err != nil { + panic(err) + } +} diff --git a/vendor/github.com/facebookgo/httpdown/license b/vendor/github.com/facebookgo/httpdown/license new file mode 100644 index 00000000..d849082f --- /dev/null +++ b/vendor/github.com/facebookgo/httpdown/license @@ -0,0 +1,30 @@ +BSD License + +For httpdown software + +Copyright (c) 2015, Facebook, Inc. All rights reserved. + +Redistribution and use in source and binary forms, with or without modification, +are permitted provided that the following conditions are met: + + * Redistributions of source code must retain the above copyright notice, this + list of conditions and the following disclaimer. + + * Redistributions in binary form must reproduce the above copyright notice, + this list of conditions and the following disclaimer in the documentation + and/or other materials provided with the distribution. + + * Neither the name Facebook nor the names of its contributors may be used to + endorse or promote products derived from this software without specific + prior written permission. + +THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" AND +ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED +WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE +DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDER OR CONTRIBUTORS BE LIABLE FOR +ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES +(INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; +LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON +ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT +(INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS +SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. diff --git a/vendor/github.com/facebookgo/stats/aggregation.go b/vendor/github.com/facebookgo/stats/aggregation.go new file mode 100644 index 00000000..8f57fb7f --- /dev/null +++ b/vendor/github.com/facebookgo/stats/aggregation.go @@ -0,0 +1,35 @@ +package stats + +import "sort" + +// Average returns the average value +func Average(values []float64) float64 { + if len(values) == 0 { + return 0 + } + + var val float64 + for _, point := range values { + val += point + } + return val / float64(len(values)) +} + +// Sum returns the sum of all the given values +func Sum(values []float64) float64 { + var val float64 + for _, point := range values { + val += point + } + return val +} + +// Percentiles returns a map containing the asked for percentiles +func Percentiles(values []float64, percentiles map[string]float64) map[string]float64 { + sort.Float64s(values) + results := map[string]float64{} + for label, p := range percentiles { + results[label] = values[int(float64(len(values))*p)] + } + return results +} diff --git a/vendor/github.com/facebookgo/stats/counter.go b/vendor/github.com/facebookgo/stats/counter.go new file mode 100644 index 00000000..59a0ed1e --- /dev/null +++ b/vendor/github.com/facebookgo/stats/counter.go @@ -0,0 +1,112 @@ +package stats + +import "fmt" + +// Type is the type of aggregation of apply +type Type int + +const ( + AggregateAvg Type = iota + AggregateSum + AggregateHistogram +) + +var ( + // HistogramPercentiles is used to determine which percentiles to return for + // SimpleCounter.Aggregate + HistogramPercentiles = map[string]float64{ + "p50": 0.5, + "p95": 0.95, + "p99": 0.99, + } + + // MinSamplesForPercentiles is used by SimpleCounter.Aggregate to determine + // what the minimum number of samples is required for percentile analysis + MinSamplesForPercentiles = 10 +) + +// Aggregates can be used to merge counters together. This is not goroutine safe +type Aggregates map[string]Counter + +// Add adds the counter for aggregation. This is not goroutine safe +func (a Aggregates) Add(c Counter) error { + key := c.FullKey() + if counter, ok := a[key]; ok { + if counter.GetType() != c.GetType() { + return fmt.Errorf("stats: mismatched aggregation type for: %s", key) + } + counter.AddValues(c.GetValues()...) + } else { + a[key] = c + } + return nil +} + +// Counter is the interface used by Aggregates to merge counters together +type Counter interface { + // FullKey is used to uniquely identify the counter + FullKey() string + + // AddValues adds values for aggregation + AddValues(...float64) + + // GetValues returns the values for aggregation + GetValues() []float64 + + // GetType returns the type of aggregation to apply + GetType() Type +} + +// SimpleCounter is a basic implementation of the Counter interface +type SimpleCounter struct { + Key string + Values []float64 + Type Type +} + +// FullKey is part of the Counter interace +func (s *SimpleCounter) FullKey() string { + return s.Key +} + +// GetValues is part of the Counter interface +func (s *SimpleCounter) GetValues() []float64 { + return s.Values +} + +// AddValues is part of the Counter interface +func (s *SimpleCounter) AddValues(vs ...float64) { + s.Values = append(s.Values, vs...) +} + +// GetType is part of the Counter interface +func (s *SimpleCounter) GetType() Type { + return s.Type +} + +// Aggregate aggregates the provided values appropriately, returning a map +// from key to value. If AggregateHistogram is specified, the map will contain +// the relevant percentiles as specified by HistogramPercentiles +func (s *SimpleCounter) Aggregate() map[string]float64 { + switch s.Type { + case AggregateAvg: + return map[string]float64{ + s.Key: Average(s.Values), + } + case AggregateSum: + return map[string]float64{ + s.Key: Sum(s.Values), + } + case AggregateHistogram: + histogram := map[string]float64{ + s.Key: Average(s.Values), + } + if len(s.Values) > MinSamplesForPercentiles { + for k, v := range Percentiles(s.Values, HistogramPercentiles) { + histogram[fmt.Sprintf("%s.%s", s.Key, k)] = v + } + } + return histogram + } + panic("stats: unsupported aggregation type") +} diff --git a/vendor/github.com/facebookgo/stats/license b/vendor/github.com/facebookgo/stats/license new file mode 100644 index 00000000..feae8707 --- /dev/null +++ b/vendor/github.com/facebookgo/stats/license @@ -0,0 +1,30 @@ +BSD License + +For stats software + +Copyright (c) 2015, Facebook, Inc. All rights reserved. + +Redistribution and use in source and binary forms, with or without modification, +are permitted provided that the following conditions are met: + + * Redistributions of source code must retain the above copyright notice, this + list of conditions and the following disclaimer. + + * Redistributions in binary form must reproduce the above copyright notice, + this list of conditions and the following disclaimer in the documentation + and/or other materials provided with the distribution. + + * Neither the name Facebook nor the names of its contributors may be used to + endorse or promote products derived from this software without specific + prior written permission. + +THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" AND +ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED +WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE +DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDER OR CONTRIBUTORS BE LIABLE FOR +ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES +(INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; +LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON +ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT +(INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS +SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. diff --git a/vendor/github.com/facebookgo/stats/stats.go b/vendor/github.com/facebookgo/stats/stats.go new file mode 100644 index 00000000..b833506a --- /dev/null +++ b/vendor/github.com/facebookgo/stats/stats.go @@ -0,0 +1,166 @@ +// Package stats defines a lightweight interface for collecting statistics. It +// doesn't provide an implementation, just the shared interface. +package stats + +// Client provides methods to collection statistics. +type Client interface { + // BumpAvg bumps the average for the given key. + BumpAvg(key string, val float64) + + // BumpSum bumps the sum for the given key. + BumpSum(key string, val float64) + + // BumpHistogram bumps the histogram for the given key. + BumpHistogram(key string, val float64) + + // BumpTime is a special version of BumpHistogram which is specialized for + // timers. Calling it starts the timer, and it returns a value on which End() + // can be called to indicate finishing the timer. A convenient way of + // recording the duration of a function is calling it like such at the top of + // the function: + // + // defer s.BumpTime("my.function").End() + BumpTime(key string) interface { + End() + } +} + +// PrefixClient adds multiple keys for the same value, with each prefix +// added to the key and calls the underlying client. +func PrefixClient(prefixes []string, client Client) Client { + return &prefixClient{ + Prefixes: prefixes, + Client: client, + } +} + +type prefixClient struct { + Prefixes []string + Client Client +} + +func (p *prefixClient) BumpAvg(key string, val float64) { + for _, prefix := range p.Prefixes { + p.Client.BumpAvg(prefix+key, val) + } +} + +func (p *prefixClient) BumpSum(key string, val float64) { + for _, prefix := range p.Prefixes { + p.Client.BumpSum(prefix+key, val) + } +} + +func (p *prefixClient) BumpHistogram(key string, val float64) { + for _, prefix := range p.Prefixes { + p.Client.BumpHistogram(prefix+key, val) + } +} + +func (p *prefixClient) BumpTime(key string) interface { + End() +} { + var m multiEnder + for _, prefix := range p.Prefixes { + m = append(m, p.Client.BumpTime(prefix+key)) + } + return m +} + +// multiEnder combines many enders together. +type multiEnder []interface { + End() +} + +func (m multiEnder) End() { + for _, e := range m { + e.End() + } +} + +// HookClient is useful for testing. It provides optional hooks for each +// expected method in the interface, which if provided will be called. If a +// hook is not provided, it will be ignored. +type HookClient struct { + BumpAvgHook func(key string, val float64) + BumpSumHook func(key string, val float64) + BumpHistogramHook func(key string, val float64) + BumpTimeHook func(key string) interface { + End() + } +} + +// BumpAvg will call BumpAvgHook if defined. +func (c *HookClient) BumpAvg(key string, val float64) { + if c.BumpAvgHook != nil { + c.BumpAvgHook(key, val) + } +} + +// BumpSum will call BumpSumHook if defined. +func (c *HookClient) BumpSum(key string, val float64) { + if c.BumpSumHook != nil { + c.BumpSumHook(key, val) + } +} + +// BumpHistogram will call BumpHistogramHook if defined. +func (c *HookClient) BumpHistogram(key string, val float64) { + if c.BumpHistogramHook != nil { + c.BumpHistogramHook(key, val) + } +} + +// BumpTime will call BumpTimeHook if defined. +func (c *HookClient) BumpTime(key string) interface { + End() +} { + if c.BumpTimeHook != nil { + return c.BumpTimeHook(key) + } + return NoOpEnd +} + +type noOpEnd struct{} + +func (n noOpEnd) End() {} + +// NoOpEnd provides a dummy value for use in tests as valid return value for +// BumpTime(). +var NoOpEnd = noOpEnd{} + +// BumpAvg calls BumpAvg on the Client if it isn't nil. This is useful when a +// component has an optional stats.Client. +func BumpAvg(c Client, key string, val float64) { + if c != nil { + c.BumpAvg(key, val) + } +} + +// BumpSum calls BumpSum on the Client if it isn't nil. This is useful when a +// component has an optional stats.Client. +func BumpSum(c Client, key string, val float64) { + if c != nil { + c.BumpSum(key, val) + } +} + +// BumpHistogram calls BumpHistogram on the Client if it isn't nil. This is +// useful when a component has an optional stats.Client. +func BumpHistogram(c Client, key string, val float64) { + if c != nil { + c.BumpHistogram(key, val) + } +} + +// BumpTime calls BumpTime on the Client if it isn't nil. If the Client is nil +// it still returns a valid return value which will be a no-op. This is useful +// when a component has an optional stats.Client. +func BumpTime(c Client, key string) interface { + End() +} { + if c != nil { + return c.BumpTime(key) + } + return NoOpEnd +} diff --git a/vendor/github.com/facebookgo/stats/stopper.go b/vendor/github.com/facebookgo/stats/stopper.go new file mode 100644 index 00000000..38e8eab8 --- /dev/null +++ b/vendor/github.com/facebookgo/stats/stopper.go @@ -0,0 +1,17 @@ +package stats + +import "time" + +// Stopper calls Client.BumpSum and Client.BumpHistogram when End'ed +type Stopper struct { + Key string + Start time.Time + Client Client +} + +// End the Stopper +func (s *Stopper) End() { + since := time.Since(s.Start).Seconds() * 1000.0 + s.Client.BumpSum(s.Key+".total", since) + s.Client.BumpHistogram(s.Key, since) +} |