summaryrefslogtreecommitdiffstats
path: root/vendor/github.com/modern-go/concurrent/unbounded_executor.go
diff options
context:
space:
mode:
Diffstat (limited to 'vendor/github.com/modern-go/concurrent/unbounded_executor.go')
-rw-r--r--vendor/github.com/modern-go/concurrent/unbounded_executor.go119
1 files changed, 119 insertions, 0 deletions
diff --git a/vendor/github.com/modern-go/concurrent/unbounded_executor.go b/vendor/github.com/modern-go/concurrent/unbounded_executor.go
new file mode 100644
index 00000000..05a77dce
--- /dev/null
+++ b/vendor/github.com/modern-go/concurrent/unbounded_executor.go
@@ -0,0 +1,119 @@
+package concurrent
+
+import (
+ "context"
+ "fmt"
+ "runtime"
+ "runtime/debug"
+ "sync"
+ "time"
+ "reflect"
+)
+
+// HandlePanic logs goroutine panic by default
+var HandlePanic = func(recovered interface{}, funcName string) {
+ ErrorLogger.Println(fmt.Sprintf("%s panic: %v", funcName, recovered))
+ ErrorLogger.Println(string(debug.Stack()))
+}
+
+// UnboundedExecutor is a executor without limits on counts of alive goroutines
+// it tracks the goroutine started by it, and can cancel them when shutdown
+type UnboundedExecutor struct {
+ ctx context.Context
+ cancel context.CancelFunc
+ activeGoroutinesMutex *sync.Mutex
+ activeGoroutines map[string]int
+ HandlePanic func(recovered interface{}, funcName string)
+}
+
+// GlobalUnboundedExecutor has the life cycle of the program itself
+// any goroutine want to be shutdown before main exit can be started from this executor
+// GlobalUnboundedExecutor expects the main function to call stop
+// it does not magically knows the main function exits
+var GlobalUnboundedExecutor = NewUnboundedExecutor()
+
+// NewUnboundedExecutor creates a new UnboundedExecutor,
+// UnboundedExecutor can not be created by &UnboundedExecutor{}
+// HandlePanic can be set with a callback to override global HandlePanic
+func NewUnboundedExecutor() *UnboundedExecutor {
+ ctx, cancel := context.WithCancel(context.TODO())
+ return &UnboundedExecutor{
+ ctx: ctx,
+ cancel: cancel,
+ activeGoroutinesMutex: &sync.Mutex{},
+ activeGoroutines: map[string]int{},
+ }
+}
+
+// Go starts a new goroutine and tracks its lifecycle.
+// Panic will be recovered and logged automatically, except for StopSignal
+func (executor *UnboundedExecutor) Go(handler func(ctx context.Context)) {
+ pc := reflect.ValueOf(handler).Pointer()
+ f := runtime.FuncForPC(pc)
+ funcName := f.Name()
+ file, line := f.FileLine(pc)
+ executor.activeGoroutinesMutex.Lock()
+ defer executor.activeGoroutinesMutex.Unlock()
+ startFrom := fmt.Sprintf("%s:%d", file, line)
+ executor.activeGoroutines[startFrom] += 1
+ go func() {
+ defer func() {
+ recovered := recover()
+ // if you want to quit a goroutine without trigger HandlePanic
+ // use runtime.Goexit() to quit
+ if recovered != nil {
+ if executor.HandlePanic == nil {
+ HandlePanic(recovered, funcName)
+ } else {
+ executor.HandlePanic(recovered, funcName)
+ }
+ }
+ executor.activeGoroutinesMutex.Lock()
+ executor.activeGoroutines[startFrom] -= 1
+ executor.activeGoroutinesMutex.Unlock()
+ }()
+ handler(executor.ctx)
+ }()
+}
+
+// Stop cancel all goroutines started by this executor without wait
+func (executor *UnboundedExecutor) Stop() {
+ executor.cancel()
+}
+
+// StopAndWaitForever cancel all goroutines started by this executor and
+// wait until all goroutines exited
+func (executor *UnboundedExecutor) StopAndWaitForever() {
+ executor.StopAndWait(context.Background())
+}
+
+// StopAndWait cancel all goroutines started by this executor and wait.
+// Wait can be cancelled by the context passed in.
+func (executor *UnboundedExecutor) StopAndWait(ctx context.Context) {
+ executor.cancel()
+ for {
+ oneHundredMilliseconds := time.NewTimer(time.Millisecond * 100)
+ select {
+ case <-oneHundredMilliseconds.C:
+ if executor.checkNoActiveGoroutines() {
+ return
+ }
+ case <-ctx.Done():
+ return
+ }
+ }
+}
+
+func (executor *UnboundedExecutor) checkNoActiveGoroutines() bool {
+ executor.activeGoroutinesMutex.Lock()
+ defer executor.activeGoroutinesMutex.Unlock()
+ for startFrom, count := range executor.activeGoroutines {
+ if count > 0 {
+ InfoLogger.Println("UnboundedExecutor is still waiting goroutines to quit",
+ "startFrom", startFrom,
+ "count", count)
+ return false
+ }
+ }
+ return true
+}