summaryrefslogtreecommitdiffstats
path: root/vendor/github.com/keybase/go-keybase-chat-bot/kbchat/kbchat.go
diff options
context:
space:
mode:
authorWim <wim@42.be>2023-01-28 22:57:53 +0100
committerGitHub <noreply@github.com>2023-01-28 22:57:53 +0100
commit880586bac42817ffcfea5d9f746f503fa29915b8 (patch)
treea89374cba6f88975f12316ec8d1b8aa1d4c6ba79 /vendor/github.com/keybase/go-keybase-chat-bot/kbchat/kbchat.go
parenteac2a8c8dc831f946970d327e2a80b26b0684255 (diff)
downloadmatterbridge-msglm-880586bac42817ffcfea5d9f746f503fa29915b8.tar.gz
matterbridge-msglm-880586bac42817ffcfea5d9f746f503fa29915b8.tar.bz2
matterbridge-msglm-880586bac42817ffcfea5d9f746f503fa29915b8.zip
Update dependencies (#1951)
Diffstat (limited to 'vendor/github.com/keybase/go-keybase-chat-bot/kbchat/kbchat.go')
-rw-r--r--vendor/github.com/keybase/go-keybase-chat-bot/kbchat/kbchat.go140
1 files changed, 89 insertions, 51 deletions
diff --git a/vendor/github.com/keybase/go-keybase-chat-bot/kbchat/kbchat.go b/vendor/github.com/keybase/go-keybase-chat-bot/kbchat/kbchat.go
index b8d9eb3c..b5248106 100644
--- a/vendor/github.com/keybase/go-keybase-chat-bot/kbchat/kbchat.go
+++ b/vendor/github.com/keybase/go-keybase-chat-bot/kbchat/kbchat.go
@@ -6,6 +6,7 @@ import (
"errors"
"fmt"
"io"
+ "math"
"os"
"os/exec"
"runtime"
@@ -45,10 +46,10 @@ type Subscription struct {
}
func NewSubscription() *Subscription {
- newMsgsCh := make(chan SubscriptionMessage, 100)
- newConvsCh := make(chan SubscriptionConversation, 100)
- newWalletCh := make(chan SubscriptionWalletEvent, 100)
- errorCh := make(chan error, 100)
+ newMsgsCh := make(chan SubscriptionMessage, 250)
+ newConvsCh := make(chan SubscriptionConversation, 250)
+ newWalletCh := make(chan SubscriptionWalletEvent, 250)
+ errorCh := make(chan error, 250)
shutdownCh := make(chan struct{})
return &Subscription{
DebugOutput: NewDebugOutput("Subscription"),
@@ -137,6 +138,8 @@ type RunOptions struct {
EnableTyping bool
// Disable bot lite mode
DisableBotLiteMode bool
+ // Number of processes to spin up to connect to the keybase service
+ NumPipes int
}
func (r RunOptions) Location() string {
@@ -164,13 +167,20 @@ func Start(runOpts RunOptions, opts ...func(*API)) (*API, error) {
return api, nil
}
+type apiPipe struct {
+ sync.Mutex
+ input io.Writer
+ output *bufio.Reader
+ cmd *exec.Cmd
+}
+
// API is the main object used for communicating with the Keybase JSON API
type API struct {
sync.Mutex
*DebugOutput
- apiInput io.Writer
- apiOutput *bufio.Reader
- apiCmd *exec.Cmd
+ // Round robin hand out API pipes to allow concurrent API requests.
+ pipeIdx int
+ pipes []*apiPipe
username string
runOpts RunOptions
subscriptions []*Subscription
@@ -282,12 +292,15 @@ func (a *API) auth() (string, error) {
func (a *API) startPipes() (err error) {
a.Lock()
defer a.Unlock()
- if a.apiCmd != nil {
- if err := a.apiCmd.Process.Kill(); err != nil {
- return fmt.Errorf("unable to kill previous API command %v", err)
+ for _, pipe := range a.pipes {
+ if pipe.cmd != nil {
+ if err := pipe.cmd.Process.Kill(); err != nil {
+ return fmt.Errorf("unable to kill previous API command %v", err)
+ }
}
+ pipe.cmd = nil
}
- a.apiCmd = nil
+ a.pipes = nil
if a.runOpts.StartService {
args := []string{fmt.Sprintf("-enable-bot-lite-mode=%v", a.runOpts.DisableBotLiteMode), "service"}
@@ -306,30 +319,39 @@ func (a *API) startPipes() (err error) {
a.Debug("unable to set notifiation settings %v", err)
}
- a.apiCmd = a.runOpts.Command("chat", "api")
- if a.apiInput, err = a.apiCmd.StdinPipe(); err != nil {
- return fmt.Errorf("unable to get api stdin: %v", err)
- }
- output, err := a.apiCmd.StdoutPipe()
- if err != nil {
- return fmt.Errorf("unable to get api stdout: %v", err)
- }
- if runtime.GOOS != "windows" {
- a.apiCmd.ExtraFiles = []*os.File{output.(*os.File)}
- }
- if err := a.apiCmd.Start(); err != nil {
- return fmt.Errorf("unable to run chat api cmd: %v", err)
+ // Startup NumPipes processes to the keybase chat api
+ for i := 0; i < int(math.Max(float64(a.runOpts.NumPipes), 1)); i++ {
+ pipe := apiPipe{}
+ pipe.cmd = a.runOpts.Command("chat", "api")
+ if pipe.input, err = pipe.cmd.StdinPipe(); err != nil {
+ return fmt.Errorf("unable to get api stdin: %v", err)
+ }
+ output, err := pipe.cmd.StdoutPipe()
+ if err != nil {
+ return fmt.Errorf("unable to get api stdout: %v", err)
+ }
+ if runtime.GOOS != "windows" {
+ pipe.cmd.ExtraFiles = []*os.File{output.(*os.File)}
+ }
+ if err := pipe.cmd.Start(); err != nil {
+ return fmt.Errorf("unable to run chat api cmd: %v", err)
+ }
+ pipe.output = bufio.NewReader(output)
+ a.pipes = append(a.pipes, &pipe)
}
- a.apiOutput = bufio.NewReader(output)
return nil
}
-func (a *API) getAPIPipesLocked() (io.Writer, *bufio.Reader, error) {
- // this should only be called inside a lock
- if a.apiCmd == nil {
- return nil, nil, errAPIDisconnected
+func (a *API) getAPIPipes() (*apiPipe, error) {
+ a.Lock()
+ defer a.Unlock()
+ idx := a.pipeIdx % len(a.pipes)
+ a.pipeIdx++
+ pipe := a.pipes[idx]
+ if pipe.cmd == nil {
+ return nil, errAPIDisconnected
}
- return a.apiInput, a.apiOutput, nil
+ return pipe, nil
}
func (a *API) GetUsername() string {
@@ -337,21 +359,21 @@ func (a *API) GetUsername() string {
}
func (a *API) doSend(arg interface{}) (resp SendResponse, err error) {
- a.Lock()
- defer a.Unlock()
-
bArg, err := json.Marshal(arg)
if err != nil {
return SendResponse{}, fmt.Errorf("unable to send arg: %+v: %v", arg, err)
}
- input, output, err := a.getAPIPipesLocked()
+ pipe, err := a.getAPIPipes()
if err != nil {
return SendResponse{}, err
}
- if _, err := io.WriteString(input, string(bArg)); err != nil {
+ pipe.Lock()
+ defer pipe.Unlock()
+
+ if _, err := io.WriteString(pipe.input, string(bArg)); err != nil {
return SendResponse{}, err
}
- responseRaw, err := output.ReadBytes('\n')
+ responseRaw, err := pipe.output.ReadBytes('\n')
if err != nil {
return SendResponse{}, err
}
@@ -364,17 +386,17 @@ func (a *API) doSend(arg interface{}) (resp SendResponse, err error) {
}
func (a *API) doFetch(apiInput string) ([]byte, error) {
- a.Lock()
- defer a.Unlock()
-
- input, output, err := a.getAPIPipesLocked()
+ pipe, err := a.getAPIPipes()
if err != nil {
return nil, err
}
- if _, err := io.WriteString(input, apiInput); err != nil {
+ pipe.Lock()
+ defer pipe.Unlock()
+
+ if _, err := io.WriteString(pipe.input, apiInput); err != nil {
return nil, err
}
- byteOutput, err := output.ReadBytes('\n')
+ byteOutput, err := pipe.output.ReadBytes('\n')
if err != nil {
return nil, err
}
@@ -412,16 +434,22 @@ func (a *API) Listen(opts ListenOptions) (*Subscription, error) {
}
boutput.Scan()
t := boutput.Text()
+ submitErr := func(err error) {
+ if len(sub.errorCh)*2 > cap(sub.errorCh) {
+ a.Debug("large errorCh queue: len: %d cap: %d ", len(sub.errorCh), cap(sub.errorCh))
+ }
+ sub.errorCh <- err
+ }
var typeHolder TypeHolder
if err := json.Unmarshal([]byte(t), &typeHolder); err != nil {
- sub.errorCh <- fmt.Errorf("err: %v, data: %v", err, t)
+ submitErr(fmt.Errorf("err: %v, data: %v", err, t))
break
}
switch typeHolder.Type {
case "chat":
var notification chat1.MsgNotification
if err := json.Unmarshal([]byte(t), &notification); err != nil {
- sub.errorCh <- fmt.Errorf("err: %v, data: %v", err, t)
+ submitErr(fmt.Errorf("err: %v, data: %v", err, t))
break
}
if notification.Error != nil {
@@ -434,12 +462,15 @@ func (a *API) Listen(opts ListenOptions) (*Subscription, error) {
Channel: notification.Msg.Channel,
},
}
+ if len(sub.newMsgsCh)*2 > cap(sub.newMsgsCh) {
+ a.Debug("large newMsgsCh queue: len: %d cap: %d ", len(sub.newMsgsCh), cap(sub.newMsgsCh))
+ }
sub.newMsgsCh <- subscriptionMessage
}
case "chat_conv":
var notification chat1.ConvNotification
if err := json.Unmarshal([]byte(t), &notification); err != nil {
- sub.errorCh <- fmt.Errorf("err: %v, data: %v", err, t)
+ submitErr(fmt.Errorf("err: %v, data: %v", err, t))
break
}
if notification.Error != nil {
@@ -448,15 +479,21 @@ func (a *API) Listen(opts ListenOptions) (*Subscription, error) {
subscriptionConv := SubscriptionConversation{
Conversation: *notification.Conv,
}
+ if len(sub.newConvsCh)*2 > cap(sub.newConvsCh) {
+ a.Debug("large newConvsCh queue: len: %d cap: %d ", len(sub.newConvsCh), cap(sub.newConvsCh))
+ }
sub.newConvsCh <- subscriptionConv
}
case "wallet":
var holder PaymentHolder
if err := json.Unmarshal([]byte(t), &holder); err != nil {
- sub.errorCh <- fmt.Errorf("err: %v, data: %v", err, t)
+ submitErr(fmt.Errorf("err: %v, data: %v", err, t))
break
}
subscriptionPayment := SubscriptionWalletEvent(holder)
+ if len(sub.newWalletCh)*2 > cap(sub.newWalletCh) {
+ a.Debug("large newWalletCh queue: len: %d cap: %d ", len(sub.newWalletCh), cap(sub.newWalletCh))
+ }
sub.newWalletCh <- subscriptionPayment
default:
continue
@@ -518,7 +555,6 @@ func (a *API) Listen(opts ListenOptions) (*Subscription, error) {
}
boutput := bufio.NewScanner(output)
if err := p.Start(); err != nil {
-
a.Debug("Listen: failed to make listen scanner: %s", err)
time.Sleep(pause)
continue
@@ -568,10 +604,12 @@ func (a *API) Shutdown() (err error) {
for _, sub := range a.subscriptions {
sub.Shutdown()
}
- if a.apiCmd != nil {
- a.Debug("waiting for API command")
- if err := a.apiCmd.Wait(); err != nil {
- return err
+ for _, pipe := range a.pipes {
+ if pipe.cmd != nil {
+ a.Debug("waiting for API command")
+ if err := pipe.cmd.Wait(); err != nil {
+ return err
+ }
}
}