diff options
author | Wim <wim@42.be> | 2020-05-24 00:06:21 +0200 |
---|---|---|
committer | GitHub <noreply@github.com> | 2020-05-24 00:06:21 +0200 |
commit | 393f9e998b1b40aa59d3fb8794c3a73da38c3fb7 (patch) | |
tree | 2bc9b6e6abdbdc6d811b155997597bdae62bc7db /vendor/github.com/keybase/go-keybase-chat-bot/kbchat/kbchat.go | |
parent | ba0bfe70a8f07164e1341f4b094841acdad5c3a2 (diff) | |
download | matterbridge-msglm-393f9e998b1b40aa59d3fb8794c3a73da38c3fb7.tar.gz matterbridge-msglm-393f9e998b1b40aa59d3fb8794c3a73da38c3fb7.tar.bz2 matterbridge-msglm-393f9e998b1b40aa59d3fb8794c3a73da38c3fb7.zip |
Update dependencies / vendor (#1146)
Diffstat (limited to 'vendor/github.com/keybase/go-keybase-chat-bot/kbchat/kbchat.go')
-rw-r--r-- | vendor/github.com/keybase/go-keybase-chat-bot/kbchat/kbchat.go | 417 |
1 files changed, 224 insertions, 193 deletions
diff --git a/vendor/github.com/keybase/go-keybase-chat-bot/kbchat/kbchat.go b/vendor/github.com/keybase/go-keybase-chat-bot/kbchat/kbchat.go index de76e75a..68c8ca70 100644 --- a/vendor/github.com/keybase/go-keybase-chat-bot/kbchat/kbchat.go +++ b/vendor/github.com/keybase/go-keybase-chat-bot/kbchat/kbchat.go @@ -7,7 +7,6 @@ import ( "fmt" "io" "io/ioutil" - "log" "os" "os/exec" "sync" @@ -18,63 +17,110 @@ import ( "github.com/keybase/go-keybase-chat-bot/kbchat/types/stellar1" ) -// API is the main object used for communicating with the Keybase JSON API -type API struct { +// SubscriptionMessage contains a message and conversation object +type SubscriptionMessage struct { + Message chat1.MsgSummary + Conversation chat1.ConvSummary +} + +type SubscriptionConversation struct { + Conversation chat1.ConvSummary +} + +type SubscriptionWalletEvent struct { + Payment stellar1.PaymentDetailsLocal +} + +// Subscription has methods to control the background message fetcher loop +type Subscription struct { + *DebugOutput sync.Mutex - apiInput io.Writer - apiOutput *bufio.Reader - apiCmd *exec.Cmd - username string - runOpts RunOptions - subscriptions []*NewSubscription + + newMsgsCh chan SubscriptionMessage + newConvsCh chan SubscriptionConversation + newWalletCh chan SubscriptionWalletEvent + errorCh chan error + running bool + shutdownCh chan struct{} } -func getUsername(runOpts RunOptions) (username string, err error) { - p := runOpts.Command("whoami", "-json") - output, err := p.StdoutPipe() - if err != nil { - return "", err +func NewSubscription() *Subscription { + newMsgsCh := make(chan SubscriptionMessage, 100) + newConvsCh := make(chan SubscriptionConversation, 100) + newWalletCh := make(chan SubscriptionWalletEvent, 100) + errorCh := make(chan error, 100) + shutdownCh := make(chan struct{}) + return &Subscription{ + DebugOutput: NewDebugOutput("Subscription"), + newMsgsCh: newMsgsCh, + newConvsCh: newConvsCh, + newWalletCh: newWalletCh, + shutdownCh: shutdownCh, + errorCh: errorCh, + running: true, } - p.ExtraFiles = []*os.File{output.(*os.File)} - if err = p.Start(); err != nil { - return "", err +} + +// Read blocks until a new message arrives +func (m *Subscription) Read() (msg SubscriptionMessage, err error) { + defer m.Trace(&err, "Read")() + select { + case msg = <-m.newMsgsCh: + return msg, nil + case err = <-m.errorCh: + return SubscriptionMessage{}, err + case <-m.shutdownCh: + return SubscriptionMessage{}, errors.New("Subscription shutdown") } +} - doneCh := make(chan error) - go func() { - defer func() { close(doneCh) }() - statusJSON, err := ioutil.ReadAll(output) - if err != nil { - doneCh <- fmt.Errorf("error reading whoami output: %v", err) - return - } - var status keybase1.CurrentStatus - if err := json.Unmarshal(statusJSON, &status); err != nil { - doneCh <- fmt.Errorf("invalid whoami JSON %q: %v", statusJSON, err) - return - } - if status.LoggedIn && status.User != nil { - username = status.User.Username - doneCh <- nil - } else { - doneCh <- fmt.Errorf("unable to authenticate to keybase service: logged in: %v user: %+v", status.LoggedIn, status.User) - } - // Cleanup the command - if err := p.Wait(); err != nil { - log.Printf("unable to wait for cmd: %v", err) - } - }() +func (m *Subscription) ReadNewConvs() (conv SubscriptionConversation, err error) { + defer m.Trace(&err, "ReadNewConvs")() + select { + case conv = <-m.newConvsCh: + return conv, nil + case err = <-m.errorCh: + return SubscriptionConversation{}, err + case <-m.shutdownCh: + return SubscriptionConversation{}, errors.New("Subscription shutdown") + } +} +// Read blocks until a new message arrives +func (m *Subscription) ReadWallet() (msg SubscriptionWalletEvent, err error) { + defer m.Trace(&err, "ReadWallet")() select { - case err = <-doneCh: - if err != nil { - return "", err - } - case <-time.After(5 * time.Second): - return "", errors.New("unable to run Keybase command") + case msg = <-m.newWalletCh: + return msg, nil + case err = <-m.errorCh: + return SubscriptionWalletEvent{}, err + case <-m.shutdownCh: + return SubscriptionWalletEvent{}, errors.New("Subscription shutdown") } +} - return username, nil +// Shutdown terminates the background process +func (m *Subscription) Shutdown() { + defer m.Trace(nil, "Shutdown")() + m.Lock() + defer m.Unlock() + if m.running { + close(m.shutdownCh) + m.running = false + } +} + +type ListenOptions struct { + Wallet bool + Convs bool +} + +type PaymentHolder struct { + Payment stellar1.PaymentDetailsLocal `json:"notification"` +} + +type TypeHolder struct { + Type string `json:"type"` } type OneshotOptions struct { @@ -110,22 +156,101 @@ func (r RunOptions) Command(args ...string) *exec.Cmd { } // Start fires up the Keybase JSON API in stdin/stdout mode -func Start(runOpts RunOptions) (*API, error) { - api := &API{ - runOpts: runOpts, - } +func Start(runOpts RunOptions, opts ...func(*API)) (*API, error) { + api := NewAPI(runOpts, opts...) if err := api.startPipes(); err != nil { return nil, err } return api, nil } +// API is the main object used for communicating with the Keybase JSON API +type API struct { + sync.Mutex + *DebugOutput + apiInput io.Writer + apiOutput *bufio.Reader + apiCmd *exec.Cmd + username string + runOpts RunOptions + subscriptions []*Subscription + Timeout time.Duration + LogSendBytes int +} + +func CustomTimeout(timeout time.Duration) func(*API) { + return func(a *API) { + a.Timeout = timeout + } +} + +func NewAPI(runOpts RunOptions, opts ...func(*API)) *API { + api := &API{ + DebugOutput: NewDebugOutput("API"), + runOpts: runOpts, + Timeout: 5 * time.Second, + LogSendBytes: 1024 * 1024 * 5, // request 5MB so we don't get killed + } + for _, opt := range opts { + opt(api) + } + return api +} + func (a *API) Command(args ...string) *exec.Cmd { return a.runOpts.Command(args...) } +func (a *API) getUsername(runOpts RunOptions) (username string, err error) { + p := runOpts.Command("whoami", "-json") + output, err := p.StdoutPipe() + if err != nil { + return "", err + } + p.ExtraFiles = []*os.File{output.(*os.File)} + if err = p.Start(); err != nil { + return "", err + } + + doneCh := make(chan error) + go func() { + defer func() { close(doneCh) }() + statusJSON, err := ioutil.ReadAll(output) + if err != nil { + doneCh <- fmt.Errorf("error reading whoami output: %v", err) + return + } + var status keybase1.CurrentStatus + if err := json.Unmarshal(statusJSON, &status); err != nil { + doneCh <- fmt.Errorf("invalid whoami JSON %q: %v", statusJSON, err) + return + } + if status.LoggedIn && status.User != nil { + username = status.User.Username + doneCh <- nil + } else { + doneCh <- fmt.Errorf("unable to authenticate to keybase service: logged in: %v user: %+v", status.LoggedIn, status.User) + } + // Cleanup the command + if err := p.Wait(); err != nil { + a.Debug("unable to wait for cmd: %v", err) + } + }() + + select { + case err = <-doneCh: + if err != nil { + return "", err + } + case <-time.After(a.Timeout): + return "", errors.New("unable to run Keybase command") + } + + return username, nil +} + func (a *API) auth() (string, error) { - username, err := getUsername(a.runOpts) + username, err := a.getUsername(a.runOpts) if err == nil { return username, nil } @@ -194,8 +319,6 @@ func (a *API) startPipes() (err error) { return nil } -var errAPIDisconnected = errors.New("chat API disconnected") - func (a *API) getAPIPipesLocked() (io.Writer, *bufio.Reader, error) { // this should only be called inside a lock if a.apiCmd == nil { @@ -214,7 +337,7 @@ func (a *API) doSend(arg interface{}) (resp SendResponse, err error) { bArg, err := json.Marshal(arg) if err != nil { - return SendResponse{}, err + return SendResponse{}, fmt.Errorf("unable to send arg: %+v: %v", arg, err) } input, output, err := a.getAPIPipesLocked() if err != nil { @@ -228,7 +351,7 @@ func (a *API) doSend(arg interface{}) (resp SendResponse, err error) { return SendResponse{}, err } if err := json.Unmarshal(responseRaw, &resp); err != nil { - return resp, fmt.Errorf("failed to decode API response: %s", err) + return resp, fmt.Errorf("failed to decode API response: %v %v", responseRaw, err) } else if resp.Error != nil { return resp, errors.New(resp.Error.Message) } @@ -254,97 +377,13 @@ func (a *API) doFetch(apiInput string) ([]byte, error) { return byteOutput, nil } -// SubscriptionMessage contains a message and conversation object -type SubscriptionMessage struct { - Message chat1.MsgSummary - Conversation chat1.ConvSummary -} - -type SubscriptionConversation struct { - Conversation chat1.ConvSummary -} - -type SubscriptionWalletEvent struct { - Payment stellar1.PaymentDetailsLocal -} - -// NewSubscription has methods to control the background message fetcher loop -type NewSubscription struct { - sync.Mutex - - newMsgsCh <-chan SubscriptionMessage - newConvsCh <-chan SubscriptionConversation - newWalletCh <-chan SubscriptionWalletEvent - errorCh <-chan error - running bool - shutdownCh chan struct{} -} - -// Read blocks until a new message arrives -func (m *NewSubscription) Read() (SubscriptionMessage, error) { - select { - case msg := <-m.newMsgsCh: - return msg, nil - case err := <-m.errorCh: - return SubscriptionMessage{}, err - case <-m.shutdownCh: - return SubscriptionMessage{}, errors.New("Subscription shutdown") - } -} - -func (m *NewSubscription) ReadNewConvs() (SubscriptionConversation, error) { - select { - case conv := <-m.newConvsCh: - return conv, nil - case err := <-m.errorCh: - return SubscriptionConversation{}, err - case <-m.shutdownCh: - return SubscriptionConversation{}, errors.New("Subscription shutdown") - } -} - -// Read blocks until a new message arrives -func (m *NewSubscription) ReadWallet() (SubscriptionWalletEvent, error) { - select { - case msg := <-m.newWalletCh: - return msg, nil - case err := <-m.errorCh: - return SubscriptionWalletEvent{}, err - case <-m.shutdownCh: - return SubscriptionWalletEvent{}, errors.New("Subscription shutdown") - } -} - -// Shutdown terminates the background process -func (m *NewSubscription) Shutdown() { - m.Lock() - defer m.Unlock() - if m.running { - close(m.shutdownCh) - m.running = false - } -} - -type ListenOptions struct { - Wallet bool - Convs bool -} - -type PaymentHolder struct { - Payment stellar1.PaymentDetailsLocal `json:"notification"` -} - -type TypeHolder struct { - Type string `json:"type"` -} - // ListenForNewTextMessages proxies to Listen without wallet events -func (a *API) ListenForNewTextMessages() (*NewSubscription, error) { +func (a *API) ListenForNewTextMessages() (*Subscription, error) { opts := ListenOptions{Wallet: false} return a.Listen(opts) } -func (a *API) registerSubscription(sub *NewSubscription) { +func (a *API) registerSubscription(sub *Subscription) { a.Lock() defer a.Unlock() a.subscriptions = append(a.subscriptions, sub) @@ -352,30 +391,17 @@ func (a *API) registerSubscription(sub *NewSubscription) { // Listen fires of a background loop and puts chat messages and wallet // events into channels -func (a *API) Listen(opts ListenOptions) (*NewSubscription, error) { - newMsgsCh := make(chan SubscriptionMessage, 100) - newConvsCh := make(chan SubscriptionConversation, 100) - newWalletCh := make(chan SubscriptionWalletEvent, 100) - errorCh := make(chan error, 100) - shutdownCh := make(chan struct{}) +func (a *API) Listen(opts ListenOptions) (*Subscription, error) { done := make(chan struct{}) - - sub := &NewSubscription{ - newMsgsCh: newMsgsCh, - newConvsCh: newConvsCh, - newWalletCh: newWalletCh, - shutdownCh: shutdownCh, - errorCh: errorCh, - running: true, - } + sub := NewSubscription() a.registerSubscription(sub) pause := 2 * time.Second readScanner := func(boutput *bufio.Scanner) { defer func() { done <- struct{}{} }() for { select { - case <-shutdownCh: - log.Printf("readScanner: received shutdown") + case <-sub.shutdownCh: + a.Debug("readScanner: received shutdown") return default: } @@ -383,18 +409,18 @@ func (a *API) Listen(opts ListenOptions) (*NewSubscription, error) { t := boutput.Text() var typeHolder TypeHolder if err := json.Unmarshal([]byte(t), &typeHolder); err != nil { - errorCh <- err + sub.errorCh <- fmt.Errorf("err: %v, data: %v", err, t) break } switch typeHolder.Type { case "chat": var notification chat1.MsgNotification if err := json.Unmarshal([]byte(t), ¬ification); err != nil { - errorCh <- err + sub.errorCh <- fmt.Errorf("err: %v, data: %v", err, t) break } if notification.Error != nil { - log.Printf("error message received: %s", *notification.Error) + a.Debug("error message received: %s", *notification.Error) } else if notification.Msg != nil { subscriptionMessage := SubscriptionMessage{ Message: *notification.Msg, @@ -403,30 +429,30 @@ func (a *API) Listen(opts ListenOptions) (*NewSubscription, error) { Channel: notification.Msg.Channel, }, } - newMsgsCh <- subscriptionMessage + sub.newMsgsCh <- subscriptionMessage } case "chat_conv": var notification chat1.ConvNotification if err := json.Unmarshal([]byte(t), ¬ification); err != nil { - errorCh <- err + sub.errorCh <- fmt.Errorf("err: %v, data: %v", err, t) break } if notification.Error != nil { - log.Printf("error message received: %s", *notification.Error) + a.Debug("error message received: %s", *notification.Error) } else if notification.Conv != nil { subscriptionConv := SubscriptionConversation{ Conversation: *notification.Conv, } - newConvsCh <- subscriptionConv + sub.newConvsCh <- subscriptionConv } case "wallet": var holder PaymentHolder if err := json.Unmarshal([]byte(t), &holder); err != nil { - errorCh <- err + sub.errorCh <- fmt.Errorf("err: %v, data: %v", err, t) break } subscriptionPayment := SubscriptionWalletEvent(holder) - newWalletCh <- subscriptionPayment + sub.newWalletCh <- subscriptionPayment default: continue } @@ -434,31 +460,31 @@ func (a *API) Listen(opts ListenOptions) (*NewSubscription, error) { } attempts := 0 - maxAttempts := 1800 + maxAttempts := 30 go func() { defer func() { - close(newMsgsCh) - close(newConvsCh) - close(newWalletCh) - close(errorCh) + close(sub.newMsgsCh) + close(sub.newConvsCh) + close(sub.newWalletCh) + close(sub.errorCh) }() for { select { - case <-shutdownCh: - log.Printf("Listen: received shutdown") + case <-sub.shutdownCh: + a.Debug("Listen: received shutdown") return default: } if attempts >= maxAttempts { if err := a.LogSend("Listen: failed to auth, giving up"); err != nil { - log.Printf("Listen: logsend failed to send: %v", err) + a.Debug("Listen: logsend failed to send: %v", err) } panic("Listen: failed to auth, giving up") } attempts++ if _, err := a.auth(); err != nil { - log.Printf("Listen: failed to auth: %s", err) + a.Debug("Listen: failed to auth: %s", err) time.Sleep(pause) continue } @@ -472,13 +498,13 @@ func (a *API) Listen(opts ListenOptions) (*NewSubscription, error) { p := a.runOpts.Command(cmdElements...) output, err := p.StdoutPipe() if err != nil { - log.Printf("Listen: failed to listen: %s", err) + a.Debug("Listen: failed to listen: %s", err) time.Sleep(pause) continue } stderr, err := p.StderrPipe() if err != nil { - log.Printf("Listen: failed to listen to stderr: %s", err) + a.Debug("Listen: failed to listen to stderr: %s", err) time.Sleep(pause) continue } @@ -486,19 +512,27 @@ func (a *API) Listen(opts ListenOptions) (*NewSubscription, error) { boutput := bufio.NewScanner(output) if err := p.Start(); err != nil { - log.Printf("Listen: failed to make listen scanner: %s", err) + a.Debug("Listen: failed to make listen scanner: %s", err) time.Sleep(pause) continue } attempts = 0 go readScanner(boutput) - <-done + select { + case <-sub.shutdownCh: + a.Debug("Listen: received shutdown") + return + case <-done: + } if err := p.Wait(); err != nil { stderrBytes, rerr := ioutil.ReadAll(stderr) if rerr != nil { - stderrBytes = []byte("failed to get stderr") + stderrBytes = []byte(fmt.Sprintf("failed to get stderr: %v", rerr)) + } + a.Debug("Listen: failed to Wait for command, restarting pipes: %s (```%s```)", err, stderrBytes) + if err := a.startPipes(); err != nil { + a.Debug("Listen: failed to restart pipes: %v", err) } - log.Printf("Listen: failed to Wait for command: %s (```%s```)", err, stderrBytes) } time.Sleep(pause) } @@ -515,31 +549,27 @@ func (a *API) LogSend(feedback string) error { "log", "send", "--no-confirm", "--feedback", feedback, + "-n", fmt.Sprintf("%d", a.LogSendBytes), } - - // We're determining whether the service is already running by running status - // with autofork disabled. - if err := a.runOpts.Command("--no-auto-fork", "status"); err != nil { - // Assume that there's no service running, so log send as standalone - args = append([]string{"--standalone"}, args...) - } - return a.runOpts.Command(args...).Run() } -func (a *API) Shutdown() error { +func (a *API) Shutdown() (err error) { + defer a.Trace(&err, "Shutdown")() a.Lock() defer a.Unlock() for _, sub := range a.subscriptions { sub.Shutdown() } if a.apiCmd != nil { + a.Debug("waiting for API command") if err := a.apiCmd.Wait(); err != nil { return err } } if a.runOpts.Oneshot != nil { + a.Debug("logging out") err := a.runOpts.Command("logout", "--force").Run() if err != nil { return err @@ -547,6 +577,7 @@ func (a *API) Shutdown() error { } if a.runOpts.StartService { + a.Debug("stopping service") err := a.runOpts.Command("ctl", "stop", "--shutdown").Run() if err != nil { return err |