diff options
author | Wim <wim@42.be> | 2023-01-28 22:57:53 +0100 |
---|---|---|
committer | GitHub <noreply@github.com> | 2023-01-28 22:57:53 +0100 |
commit | 880586bac42817ffcfea5d9f746f503fa29915b8 (patch) | |
tree | a89374cba6f88975f12316ec8d1b8aa1d4c6ba79 /vendor/github.com/keybase/go-keybase-chat-bot/kbchat/kbchat.go | |
parent | eac2a8c8dc831f946970d327e2a80b26b0684255 (diff) | |
download | matterbridge-msglm-880586bac42817ffcfea5d9f746f503fa29915b8.tar.gz matterbridge-msglm-880586bac42817ffcfea5d9f746f503fa29915b8.tar.bz2 matterbridge-msglm-880586bac42817ffcfea5d9f746f503fa29915b8.zip |
Update dependencies (#1951)
Diffstat (limited to 'vendor/github.com/keybase/go-keybase-chat-bot/kbchat/kbchat.go')
-rw-r--r-- | vendor/github.com/keybase/go-keybase-chat-bot/kbchat/kbchat.go | 140 |
1 files changed, 89 insertions, 51 deletions
diff --git a/vendor/github.com/keybase/go-keybase-chat-bot/kbchat/kbchat.go b/vendor/github.com/keybase/go-keybase-chat-bot/kbchat/kbchat.go index b8d9eb3c..b5248106 100644 --- a/vendor/github.com/keybase/go-keybase-chat-bot/kbchat/kbchat.go +++ b/vendor/github.com/keybase/go-keybase-chat-bot/kbchat/kbchat.go @@ -6,6 +6,7 @@ import ( "errors" "fmt" "io" + "math" "os" "os/exec" "runtime" @@ -45,10 +46,10 @@ type Subscription struct { } func NewSubscription() *Subscription { - newMsgsCh := make(chan SubscriptionMessage, 100) - newConvsCh := make(chan SubscriptionConversation, 100) - newWalletCh := make(chan SubscriptionWalletEvent, 100) - errorCh := make(chan error, 100) + newMsgsCh := make(chan SubscriptionMessage, 250) + newConvsCh := make(chan SubscriptionConversation, 250) + newWalletCh := make(chan SubscriptionWalletEvent, 250) + errorCh := make(chan error, 250) shutdownCh := make(chan struct{}) return &Subscription{ DebugOutput: NewDebugOutput("Subscription"), @@ -137,6 +138,8 @@ type RunOptions struct { EnableTyping bool // Disable bot lite mode DisableBotLiteMode bool + // Number of processes to spin up to connect to the keybase service + NumPipes int } func (r RunOptions) Location() string { @@ -164,13 +167,20 @@ func Start(runOpts RunOptions, opts ...func(*API)) (*API, error) { return api, nil } +type apiPipe struct { + sync.Mutex + input io.Writer + output *bufio.Reader + cmd *exec.Cmd +} + // API is the main object used for communicating with the Keybase JSON API type API struct { sync.Mutex *DebugOutput - apiInput io.Writer - apiOutput *bufio.Reader - apiCmd *exec.Cmd + // Round robin hand out API pipes to allow concurrent API requests. + pipeIdx int + pipes []*apiPipe username string runOpts RunOptions subscriptions []*Subscription @@ -282,12 +292,15 @@ func (a *API) auth() (string, error) { func (a *API) startPipes() (err error) { a.Lock() defer a.Unlock() - if a.apiCmd != nil { - if err := a.apiCmd.Process.Kill(); err != nil { - return fmt.Errorf("unable to kill previous API command %v", err) + for _, pipe := range a.pipes { + if pipe.cmd != nil { + if err := pipe.cmd.Process.Kill(); err != nil { + return fmt.Errorf("unable to kill previous API command %v", err) + } } + pipe.cmd = nil } - a.apiCmd = nil + a.pipes = nil if a.runOpts.StartService { args := []string{fmt.Sprintf("-enable-bot-lite-mode=%v", a.runOpts.DisableBotLiteMode), "service"} @@ -306,30 +319,39 @@ func (a *API) startPipes() (err error) { a.Debug("unable to set notifiation settings %v", err) } - a.apiCmd = a.runOpts.Command("chat", "api") - if a.apiInput, err = a.apiCmd.StdinPipe(); err != nil { - return fmt.Errorf("unable to get api stdin: %v", err) - } - output, err := a.apiCmd.StdoutPipe() - if err != nil { - return fmt.Errorf("unable to get api stdout: %v", err) - } - if runtime.GOOS != "windows" { - a.apiCmd.ExtraFiles = []*os.File{output.(*os.File)} - } - if err := a.apiCmd.Start(); err != nil { - return fmt.Errorf("unable to run chat api cmd: %v", err) + // Startup NumPipes processes to the keybase chat api + for i := 0; i < int(math.Max(float64(a.runOpts.NumPipes), 1)); i++ { + pipe := apiPipe{} + pipe.cmd = a.runOpts.Command("chat", "api") + if pipe.input, err = pipe.cmd.StdinPipe(); err != nil { + return fmt.Errorf("unable to get api stdin: %v", err) + } + output, err := pipe.cmd.StdoutPipe() + if err != nil { + return fmt.Errorf("unable to get api stdout: %v", err) + } + if runtime.GOOS != "windows" { + pipe.cmd.ExtraFiles = []*os.File{output.(*os.File)} + } + if err := pipe.cmd.Start(); err != nil { + return fmt.Errorf("unable to run chat api cmd: %v", err) + } + pipe.output = bufio.NewReader(output) + a.pipes = append(a.pipes, &pipe) } - a.apiOutput = bufio.NewReader(output) return nil } -func (a *API) getAPIPipesLocked() (io.Writer, *bufio.Reader, error) { - // this should only be called inside a lock - if a.apiCmd == nil { - return nil, nil, errAPIDisconnected +func (a *API) getAPIPipes() (*apiPipe, error) { + a.Lock() + defer a.Unlock() + idx := a.pipeIdx % len(a.pipes) + a.pipeIdx++ + pipe := a.pipes[idx] + if pipe.cmd == nil { + return nil, errAPIDisconnected } - return a.apiInput, a.apiOutput, nil + return pipe, nil } func (a *API) GetUsername() string { @@ -337,21 +359,21 @@ func (a *API) GetUsername() string { } func (a *API) doSend(arg interface{}) (resp SendResponse, err error) { - a.Lock() - defer a.Unlock() - bArg, err := json.Marshal(arg) if err != nil { return SendResponse{}, fmt.Errorf("unable to send arg: %+v: %v", arg, err) } - input, output, err := a.getAPIPipesLocked() + pipe, err := a.getAPIPipes() if err != nil { return SendResponse{}, err } - if _, err := io.WriteString(input, string(bArg)); err != nil { + pipe.Lock() + defer pipe.Unlock() + + if _, err := io.WriteString(pipe.input, string(bArg)); err != nil { return SendResponse{}, err } - responseRaw, err := output.ReadBytes('\n') + responseRaw, err := pipe.output.ReadBytes('\n') if err != nil { return SendResponse{}, err } @@ -364,17 +386,17 @@ func (a *API) doSend(arg interface{}) (resp SendResponse, err error) { } func (a *API) doFetch(apiInput string) ([]byte, error) { - a.Lock() - defer a.Unlock() - - input, output, err := a.getAPIPipesLocked() + pipe, err := a.getAPIPipes() if err != nil { return nil, err } - if _, err := io.WriteString(input, apiInput); err != nil { + pipe.Lock() + defer pipe.Unlock() + + if _, err := io.WriteString(pipe.input, apiInput); err != nil { return nil, err } - byteOutput, err := output.ReadBytes('\n') + byteOutput, err := pipe.output.ReadBytes('\n') if err != nil { return nil, err } @@ -412,16 +434,22 @@ func (a *API) Listen(opts ListenOptions) (*Subscription, error) { } boutput.Scan() t := boutput.Text() + submitErr := func(err error) { + if len(sub.errorCh)*2 > cap(sub.errorCh) { + a.Debug("large errorCh queue: len: %d cap: %d ", len(sub.errorCh), cap(sub.errorCh)) + } + sub.errorCh <- err + } var typeHolder TypeHolder if err := json.Unmarshal([]byte(t), &typeHolder); err != nil { - sub.errorCh <- fmt.Errorf("err: %v, data: %v", err, t) + submitErr(fmt.Errorf("err: %v, data: %v", err, t)) break } switch typeHolder.Type { case "chat": var notification chat1.MsgNotification if err := json.Unmarshal([]byte(t), ¬ification); err != nil { - sub.errorCh <- fmt.Errorf("err: %v, data: %v", err, t) + submitErr(fmt.Errorf("err: %v, data: %v", err, t)) break } if notification.Error != nil { @@ -434,12 +462,15 @@ func (a *API) Listen(opts ListenOptions) (*Subscription, error) { Channel: notification.Msg.Channel, }, } + if len(sub.newMsgsCh)*2 > cap(sub.newMsgsCh) { + a.Debug("large newMsgsCh queue: len: %d cap: %d ", len(sub.newMsgsCh), cap(sub.newMsgsCh)) + } sub.newMsgsCh <- subscriptionMessage } case "chat_conv": var notification chat1.ConvNotification if err := json.Unmarshal([]byte(t), ¬ification); err != nil { - sub.errorCh <- fmt.Errorf("err: %v, data: %v", err, t) + submitErr(fmt.Errorf("err: %v, data: %v", err, t)) break } if notification.Error != nil { @@ -448,15 +479,21 @@ func (a *API) Listen(opts ListenOptions) (*Subscription, error) { subscriptionConv := SubscriptionConversation{ Conversation: *notification.Conv, } + if len(sub.newConvsCh)*2 > cap(sub.newConvsCh) { + a.Debug("large newConvsCh queue: len: %d cap: %d ", len(sub.newConvsCh), cap(sub.newConvsCh)) + } sub.newConvsCh <- subscriptionConv } case "wallet": var holder PaymentHolder if err := json.Unmarshal([]byte(t), &holder); err != nil { - sub.errorCh <- fmt.Errorf("err: %v, data: %v", err, t) + submitErr(fmt.Errorf("err: %v, data: %v", err, t)) break } subscriptionPayment := SubscriptionWalletEvent(holder) + if len(sub.newWalletCh)*2 > cap(sub.newWalletCh) { + a.Debug("large newWalletCh queue: len: %d cap: %d ", len(sub.newWalletCh), cap(sub.newWalletCh)) + } sub.newWalletCh <- subscriptionPayment default: continue @@ -518,7 +555,6 @@ func (a *API) Listen(opts ListenOptions) (*Subscription, error) { } boutput := bufio.NewScanner(output) if err := p.Start(); err != nil { - a.Debug("Listen: failed to make listen scanner: %s", err) time.Sleep(pause) continue @@ -568,10 +604,12 @@ func (a *API) Shutdown() (err error) { for _, sub := range a.subscriptions { sub.Shutdown() } - if a.apiCmd != nil { - a.Debug("waiting for API command") - if err := a.apiCmd.Wait(); err != nil { - return err + for _, pipe := range a.pipes { + if pipe.cmd != nil { + a.Debug("waiting for API command") + if err := pipe.cmd.Wait(); err != nil { + return err + } } } |