summaryrefslogtreecommitdiffstats
path: root/vendor/github.com/keybase/go-keybase-chat-bot/kbchat/kbchat.go
diff options
context:
space:
mode:
Diffstat (limited to 'vendor/github.com/keybase/go-keybase-chat-bot/kbchat/kbchat.go')
-rw-r--r--vendor/github.com/keybase/go-keybase-chat-bot/kbchat/kbchat.go417
1 files changed, 224 insertions, 193 deletions
diff --git a/vendor/github.com/keybase/go-keybase-chat-bot/kbchat/kbchat.go b/vendor/github.com/keybase/go-keybase-chat-bot/kbchat/kbchat.go
index de76e75a..68c8ca70 100644
--- a/vendor/github.com/keybase/go-keybase-chat-bot/kbchat/kbchat.go
+++ b/vendor/github.com/keybase/go-keybase-chat-bot/kbchat/kbchat.go
@@ -7,7 +7,6 @@ import (
"fmt"
"io"
"io/ioutil"
- "log"
"os"
"os/exec"
"sync"
@@ -18,63 +17,110 @@ import (
"github.com/keybase/go-keybase-chat-bot/kbchat/types/stellar1"
)
-// API is the main object used for communicating with the Keybase JSON API
-type API struct {
+// SubscriptionMessage contains a message and conversation object
+type SubscriptionMessage struct {
+ Message chat1.MsgSummary
+ Conversation chat1.ConvSummary
+}
+
+type SubscriptionConversation struct {
+ Conversation chat1.ConvSummary
+}
+
+type SubscriptionWalletEvent struct {
+ Payment stellar1.PaymentDetailsLocal
+}
+
+// Subscription has methods to control the background message fetcher loop
+type Subscription struct {
+ *DebugOutput
sync.Mutex
- apiInput io.Writer
- apiOutput *bufio.Reader
- apiCmd *exec.Cmd
- username string
- runOpts RunOptions
- subscriptions []*NewSubscription
+
+ newMsgsCh chan SubscriptionMessage
+ newConvsCh chan SubscriptionConversation
+ newWalletCh chan SubscriptionWalletEvent
+ errorCh chan error
+ running bool
+ shutdownCh chan struct{}
}
-func getUsername(runOpts RunOptions) (username string, err error) {
- p := runOpts.Command("whoami", "-json")
- output, err := p.StdoutPipe()
- if err != nil {
- return "", err
+func NewSubscription() *Subscription {
+ newMsgsCh := make(chan SubscriptionMessage, 100)
+ newConvsCh := make(chan SubscriptionConversation, 100)
+ newWalletCh := make(chan SubscriptionWalletEvent, 100)
+ errorCh := make(chan error, 100)
+ shutdownCh := make(chan struct{})
+ return &Subscription{
+ DebugOutput: NewDebugOutput("Subscription"),
+ newMsgsCh: newMsgsCh,
+ newConvsCh: newConvsCh,
+ newWalletCh: newWalletCh,
+ shutdownCh: shutdownCh,
+ errorCh: errorCh,
+ running: true,
}
- p.ExtraFiles = []*os.File{output.(*os.File)}
- if err = p.Start(); err != nil {
- return "", err
+}
+
+// Read blocks until a new message arrives
+func (m *Subscription) Read() (msg SubscriptionMessage, err error) {
+ defer m.Trace(&err, "Read")()
+ select {
+ case msg = <-m.newMsgsCh:
+ return msg, nil
+ case err = <-m.errorCh:
+ return SubscriptionMessage{}, err
+ case <-m.shutdownCh:
+ return SubscriptionMessage{}, errors.New("Subscription shutdown")
}
+}
- doneCh := make(chan error)
- go func() {
- defer func() { close(doneCh) }()
- statusJSON, err := ioutil.ReadAll(output)
- if err != nil {
- doneCh <- fmt.Errorf("error reading whoami output: %v", err)
- return
- }
- var status keybase1.CurrentStatus
- if err := json.Unmarshal(statusJSON, &status); err != nil {
- doneCh <- fmt.Errorf("invalid whoami JSON %q: %v", statusJSON, err)
- return
- }
- if status.LoggedIn && status.User != nil {
- username = status.User.Username
- doneCh <- nil
- } else {
- doneCh <- fmt.Errorf("unable to authenticate to keybase service: logged in: %v user: %+v", status.LoggedIn, status.User)
- }
- // Cleanup the command
- if err := p.Wait(); err != nil {
- log.Printf("unable to wait for cmd: %v", err)
- }
- }()
+func (m *Subscription) ReadNewConvs() (conv SubscriptionConversation, err error) {
+ defer m.Trace(&err, "ReadNewConvs")()
+ select {
+ case conv = <-m.newConvsCh:
+ return conv, nil
+ case err = <-m.errorCh:
+ return SubscriptionConversation{}, err
+ case <-m.shutdownCh:
+ return SubscriptionConversation{}, errors.New("Subscription shutdown")
+ }
+}
+// Read blocks until a new message arrives
+func (m *Subscription) ReadWallet() (msg SubscriptionWalletEvent, err error) {
+ defer m.Trace(&err, "ReadWallet")()
select {
- case err = <-doneCh:
- if err != nil {
- return "", err
- }
- case <-time.After(5 * time.Second):
- return "", errors.New("unable to run Keybase command")
+ case msg = <-m.newWalletCh:
+ return msg, nil
+ case err = <-m.errorCh:
+ return SubscriptionWalletEvent{}, err
+ case <-m.shutdownCh:
+ return SubscriptionWalletEvent{}, errors.New("Subscription shutdown")
}
+}
- return username, nil
+// Shutdown terminates the background process
+func (m *Subscription) Shutdown() {
+ defer m.Trace(nil, "Shutdown")()
+ m.Lock()
+ defer m.Unlock()
+ if m.running {
+ close(m.shutdownCh)
+ m.running = false
+ }
+}
+
+type ListenOptions struct {
+ Wallet bool
+ Convs bool
+}
+
+type PaymentHolder struct {
+ Payment stellar1.PaymentDetailsLocal `json:"notification"`
+}
+
+type TypeHolder struct {
+ Type string `json:"type"`
}
type OneshotOptions struct {
@@ -110,22 +156,101 @@ func (r RunOptions) Command(args ...string) *exec.Cmd {
}
// Start fires up the Keybase JSON API in stdin/stdout mode
-func Start(runOpts RunOptions) (*API, error) {
- api := &API{
- runOpts: runOpts,
- }
+func Start(runOpts RunOptions, opts ...func(*API)) (*API, error) {
+ api := NewAPI(runOpts, opts...)
if err := api.startPipes(); err != nil {
return nil, err
}
return api, nil
}
+// API is the main object used for communicating with the Keybase JSON API
+type API struct {
+ sync.Mutex
+ *DebugOutput
+ apiInput io.Writer
+ apiOutput *bufio.Reader
+ apiCmd *exec.Cmd
+ username string
+ runOpts RunOptions
+ subscriptions []*Subscription
+ Timeout time.Duration
+ LogSendBytes int
+}
+
+func CustomTimeout(timeout time.Duration) func(*API) {
+ return func(a *API) {
+ a.Timeout = timeout
+ }
+}
+
+func NewAPI(runOpts RunOptions, opts ...func(*API)) *API {
+ api := &API{
+ DebugOutput: NewDebugOutput("API"),
+ runOpts: runOpts,
+ Timeout: 5 * time.Second,
+ LogSendBytes: 1024 * 1024 * 5, // request 5MB so we don't get killed
+ }
+ for _, opt := range opts {
+ opt(api)
+ }
+ return api
+}
+
func (a *API) Command(args ...string) *exec.Cmd {
return a.runOpts.Command(args...)
}
+func (a *API) getUsername(runOpts RunOptions) (username string, err error) {
+ p := runOpts.Command("whoami", "-json")
+ output, err := p.StdoutPipe()
+ if err != nil {
+ return "", err
+ }
+ p.ExtraFiles = []*os.File{output.(*os.File)}
+ if err = p.Start(); err != nil {
+ return "", err
+ }
+
+ doneCh := make(chan error)
+ go func() {
+ defer func() { close(doneCh) }()
+ statusJSON, err := ioutil.ReadAll(output)
+ if err != nil {
+ doneCh <- fmt.Errorf("error reading whoami output: %v", err)
+ return
+ }
+ var status keybase1.CurrentStatus
+ if err := json.Unmarshal(statusJSON, &status); err != nil {
+ doneCh <- fmt.Errorf("invalid whoami JSON %q: %v", statusJSON, err)
+ return
+ }
+ if status.LoggedIn && status.User != nil {
+ username = status.User.Username
+ doneCh <- nil
+ } else {
+ doneCh <- fmt.Errorf("unable to authenticate to keybase service: logged in: %v user: %+v", status.LoggedIn, status.User)
+ }
+ // Cleanup the command
+ if err := p.Wait(); err != nil {
+ a.Debug("unable to wait for cmd: %v", err)
+ }
+ }()
+
+ select {
+ case err = <-doneCh:
+ if err != nil {
+ return "", err
+ }
+ case <-time.After(a.Timeout):
+ return "", errors.New("unable to run Keybase command")
+ }
+
+ return username, nil
+}
+
func (a *API) auth() (string, error) {
- username, err := getUsername(a.runOpts)
+ username, err := a.getUsername(a.runOpts)
if err == nil {
return username, nil
}
@@ -194,8 +319,6 @@ func (a *API) startPipes() (err error) {
return nil
}
-var errAPIDisconnected = errors.New("chat API disconnected")
-
func (a *API) getAPIPipesLocked() (io.Writer, *bufio.Reader, error) {
// this should only be called inside a lock
if a.apiCmd == nil {
@@ -214,7 +337,7 @@ func (a *API) doSend(arg interface{}) (resp SendResponse, err error) {
bArg, err := json.Marshal(arg)
if err != nil {
- return SendResponse{}, err
+ return SendResponse{}, fmt.Errorf("unable to send arg: %+v: %v", arg, err)
}
input, output, err := a.getAPIPipesLocked()
if err != nil {
@@ -228,7 +351,7 @@ func (a *API) doSend(arg interface{}) (resp SendResponse, err error) {
return SendResponse{}, err
}
if err := json.Unmarshal(responseRaw, &resp); err != nil {
- return resp, fmt.Errorf("failed to decode API response: %s", err)
+ return resp, fmt.Errorf("failed to decode API response: %v %v", responseRaw, err)
} else if resp.Error != nil {
return resp, errors.New(resp.Error.Message)
}
@@ -254,97 +377,13 @@ func (a *API) doFetch(apiInput string) ([]byte, error) {
return byteOutput, nil
}
-// SubscriptionMessage contains a message and conversation object
-type SubscriptionMessage struct {
- Message chat1.MsgSummary
- Conversation chat1.ConvSummary
-}
-
-type SubscriptionConversation struct {
- Conversation chat1.ConvSummary
-}
-
-type SubscriptionWalletEvent struct {
- Payment stellar1.PaymentDetailsLocal
-}
-
-// NewSubscription has methods to control the background message fetcher loop
-type NewSubscription struct {
- sync.Mutex
-
- newMsgsCh <-chan SubscriptionMessage
- newConvsCh <-chan SubscriptionConversation
- newWalletCh <-chan SubscriptionWalletEvent
- errorCh <-chan error
- running bool
- shutdownCh chan struct{}
-}
-
-// Read blocks until a new message arrives
-func (m *NewSubscription) Read() (SubscriptionMessage, error) {
- select {
- case msg := <-m.newMsgsCh:
- return msg, nil
- case err := <-m.errorCh:
- return SubscriptionMessage{}, err
- case <-m.shutdownCh:
- return SubscriptionMessage{}, errors.New("Subscription shutdown")
- }
-}
-
-func (m *NewSubscription) ReadNewConvs() (SubscriptionConversation, error) {
- select {
- case conv := <-m.newConvsCh:
- return conv, nil
- case err := <-m.errorCh:
- return SubscriptionConversation{}, err
- case <-m.shutdownCh:
- return SubscriptionConversation{}, errors.New("Subscription shutdown")
- }
-}
-
-// Read blocks until a new message arrives
-func (m *NewSubscription) ReadWallet() (SubscriptionWalletEvent, error) {
- select {
- case msg := <-m.newWalletCh:
- return msg, nil
- case err := <-m.errorCh:
- return SubscriptionWalletEvent{}, err
- case <-m.shutdownCh:
- return SubscriptionWalletEvent{}, errors.New("Subscription shutdown")
- }
-}
-
-// Shutdown terminates the background process
-func (m *NewSubscription) Shutdown() {
- m.Lock()
- defer m.Unlock()
- if m.running {
- close(m.shutdownCh)
- m.running = false
- }
-}
-
-type ListenOptions struct {
- Wallet bool
- Convs bool
-}
-
-type PaymentHolder struct {
- Payment stellar1.PaymentDetailsLocal `json:"notification"`
-}
-
-type TypeHolder struct {
- Type string `json:"type"`
-}
-
// ListenForNewTextMessages proxies to Listen without wallet events
-func (a *API) ListenForNewTextMessages() (*NewSubscription, error) {
+func (a *API) ListenForNewTextMessages() (*Subscription, error) {
opts := ListenOptions{Wallet: false}
return a.Listen(opts)
}
-func (a *API) registerSubscription(sub *NewSubscription) {
+func (a *API) registerSubscription(sub *Subscription) {
a.Lock()
defer a.Unlock()
a.subscriptions = append(a.subscriptions, sub)
@@ -352,30 +391,17 @@ func (a *API) registerSubscription(sub *NewSubscription) {
// Listen fires of a background loop and puts chat messages and wallet
// events into channels
-func (a *API) Listen(opts ListenOptions) (*NewSubscription, error) {
- newMsgsCh := make(chan SubscriptionMessage, 100)
- newConvsCh := make(chan SubscriptionConversation, 100)
- newWalletCh := make(chan SubscriptionWalletEvent, 100)
- errorCh := make(chan error, 100)
- shutdownCh := make(chan struct{})
+func (a *API) Listen(opts ListenOptions) (*Subscription, error) {
done := make(chan struct{})
-
- sub := &NewSubscription{
- newMsgsCh: newMsgsCh,
- newConvsCh: newConvsCh,
- newWalletCh: newWalletCh,
- shutdownCh: shutdownCh,
- errorCh: errorCh,
- running: true,
- }
+ sub := NewSubscription()
a.registerSubscription(sub)
pause := 2 * time.Second
readScanner := func(boutput *bufio.Scanner) {
defer func() { done <- struct{}{} }()
for {
select {
- case <-shutdownCh:
- log.Printf("readScanner: received shutdown")
+ case <-sub.shutdownCh:
+ a.Debug("readScanner: received shutdown")
return
default:
}
@@ -383,18 +409,18 @@ func (a *API) Listen(opts ListenOptions) (*NewSubscription, error) {
t := boutput.Text()
var typeHolder TypeHolder
if err := json.Unmarshal([]byte(t), &typeHolder); err != nil {
- errorCh <- err
+ sub.errorCh <- fmt.Errorf("err: %v, data: %v", err, t)
break
}
switch typeHolder.Type {
case "chat":
var notification chat1.MsgNotification
if err := json.Unmarshal([]byte(t), &notification); err != nil {
- errorCh <- err
+ sub.errorCh <- fmt.Errorf("err: %v, data: %v", err, t)
break
}
if notification.Error != nil {
- log.Printf("error message received: %s", *notification.Error)
+ a.Debug("error message received: %s", *notification.Error)
} else if notification.Msg != nil {
subscriptionMessage := SubscriptionMessage{
Message: *notification.Msg,
@@ -403,30 +429,30 @@ func (a *API) Listen(opts ListenOptions) (*NewSubscription, error) {
Channel: notification.Msg.Channel,
},
}
- newMsgsCh <- subscriptionMessage
+ sub.newMsgsCh <- subscriptionMessage
}
case "chat_conv":
var notification chat1.ConvNotification
if err := json.Unmarshal([]byte(t), &notification); err != nil {
- errorCh <- err
+ sub.errorCh <- fmt.Errorf("err: %v, data: %v", err, t)
break
}
if notification.Error != nil {
- log.Printf("error message received: %s", *notification.Error)
+ a.Debug("error message received: %s", *notification.Error)
} else if notification.Conv != nil {
subscriptionConv := SubscriptionConversation{
Conversation: *notification.Conv,
}
- newConvsCh <- subscriptionConv
+ sub.newConvsCh <- subscriptionConv
}
case "wallet":
var holder PaymentHolder
if err := json.Unmarshal([]byte(t), &holder); err != nil {
- errorCh <- err
+ sub.errorCh <- fmt.Errorf("err: %v, data: %v", err, t)
break
}
subscriptionPayment := SubscriptionWalletEvent(holder)
- newWalletCh <- subscriptionPayment
+ sub.newWalletCh <- subscriptionPayment
default:
continue
}
@@ -434,31 +460,31 @@ func (a *API) Listen(opts ListenOptions) (*NewSubscription, error) {
}
attempts := 0
- maxAttempts := 1800
+ maxAttempts := 30
go func() {
defer func() {
- close(newMsgsCh)
- close(newConvsCh)
- close(newWalletCh)
- close(errorCh)
+ close(sub.newMsgsCh)
+ close(sub.newConvsCh)
+ close(sub.newWalletCh)
+ close(sub.errorCh)
}()
for {
select {
- case <-shutdownCh:
- log.Printf("Listen: received shutdown")
+ case <-sub.shutdownCh:
+ a.Debug("Listen: received shutdown")
return
default:
}
if attempts >= maxAttempts {
if err := a.LogSend("Listen: failed to auth, giving up"); err != nil {
- log.Printf("Listen: logsend failed to send: %v", err)
+ a.Debug("Listen: logsend failed to send: %v", err)
}
panic("Listen: failed to auth, giving up")
}
attempts++
if _, err := a.auth(); err != nil {
- log.Printf("Listen: failed to auth: %s", err)
+ a.Debug("Listen: failed to auth: %s", err)
time.Sleep(pause)
continue
}
@@ -472,13 +498,13 @@ func (a *API) Listen(opts ListenOptions) (*NewSubscription, error) {
p := a.runOpts.Command(cmdElements...)
output, err := p.StdoutPipe()
if err != nil {
- log.Printf("Listen: failed to listen: %s", err)
+ a.Debug("Listen: failed to listen: %s", err)
time.Sleep(pause)
continue
}
stderr, err := p.StderrPipe()
if err != nil {
- log.Printf("Listen: failed to listen to stderr: %s", err)
+ a.Debug("Listen: failed to listen to stderr: %s", err)
time.Sleep(pause)
continue
}
@@ -486,19 +512,27 @@ func (a *API) Listen(opts ListenOptions) (*NewSubscription, error) {
boutput := bufio.NewScanner(output)
if err := p.Start(); err != nil {
- log.Printf("Listen: failed to make listen scanner: %s", err)
+ a.Debug("Listen: failed to make listen scanner: %s", err)
time.Sleep(pause)
continue
}
attempts = 0
go readScanner(boutput)
- <-done
+ select {
+ case <-sub.shutdownCh:
+ a.Debug("Listen: received shutdown")
+ return
+ case <-done:
+ }
if err := p.Wait(); err != nil {
stderrBytes, rerr := ioutil.ReadAll(stderr)
if rerr != nil {
- stderrBytes = []byte("failed to get stderr")
+ stderrBytes = []byte(fmt.Sprintf("failed to get stderr: %v", rerr))
+ }
+ a.Debug("Listen: failed to Wait for command, restarting pipes: %s (```%s```)", err, stderrBytes)
+ if err := a.startPipes(); err != nil {
+ a.Debug("Listen: failed to restart pipes: %v", err)
}
- log.Printf("Listen: failed to Wait for command: %s (```%s```)", err, stderrBytes)
}
time.Sleep(pause)
}
@@ -515,31 +549,27 @@ func (a *API) LogSend(feedback string) error {
"log", "send",
"--no-confirm",
"--feedback", feedback,
+ "-n", fmt.Sprintf("%d", a.LogSendBytes),
}
-
- // We're determining whether the service is already running by running status
- // with autofork disabled.
- if err := a.runOpts.Command("--no-auto-fork", "status"); err != nil {
- // Assume that there's no service running, so log send as standalone
- args = append([]string{"--standalone"}, args...)
- }
-
return a.runOpts.Command(args...).Run()
}
-func (a *API) Shutdown() error {
+func (a *API) Shutdown() (err error) {
+ defer a.Trace(&err, "Shutdown")()
a.Lock()
defer a.Unlock()
for _, sub := range a.subscriptions {
sub.Shutdown()
}
if a.apiCmd != nil {
+ a.Debug("waiting for API command")
if err := a.apiCmd.Wait(); err != nil {
return err
}
}
if a.runOpts.Oneshot != nil {
+ a.Debug("logging out")
err := a.runOpts.Command("logout", "--force").Run()
if err != nil {
return err
@@ -547,6 +577,7 @@ func (a *API) Shutdown() error {
}
if a.runOpts.StartService {
+ a.Debug("stopping service")
err := a.runOpts.Command("ctl", "stop", "--shutdown").Run()
if err != nil {
return err