diff options
Diffstat (limited to 'vendor/github.com/keybase/go-keybase-chat-bot/kbchat/kbchat.go')
-rw-r--r-- | vendor/github.com/keybase/go-keybase-chat-bot/kbchat/kbchat.go | 485 |
1 files changed, 158 insertions, 327 deletions
diff --git a/vendor/github.com/keybase/go-keybase-chat-bot/kbchat/kbchat.go b/vendor/github.com/keybase/go-keybase-chat-bot/kbchat/kbchat.go index c735052c..0a74dd90 100644 --- a/vendor/github.com/keybase/go-keybase-chat-bot/kbchat/kbchat.go +++ b/vendor/github.com/keybase/go-keybase-chat-bot/kbchat/kbchat.go @@ -6,21 +6,26 @@ import ( "errors" "fmt" "io" + "io/ioutil" "log" "os/exec" "strings" "sync" "time" + + "github.com/keybase/go-keybase-chat-bot/kbchat/types/chat1" + "github.com/keybase/go-keybase-chat-bot/kbchat/types/stellar1" ) // API is the main object used for communicating with the Keybase JSON API type API struct { sync.Mutex - apiInput io.Writer - apiOutput *bufio.Reader - apiCmd *exec.Cmd - username string - runOpts RunOptions + apiInput io.Writer + apiOutput *bufio.Reader + apiCmd *exec.Cmd + username string + runOpts RunOptions + subscriptions []*NewSubscription } func getUsername(runOpts RunOptions) (username string, err error) { @@ -40,9 +45,10 @@ func getUsername(runOpts RunOptions) (username string, err error) { doneCh <- errors.New("unable to find Keybase username") return } - toks := strings.Fields(scanner.Text()) + text := scanner.Text() + toks := strings.Fields(text) if len(toks) != 2 { - doneCh <- errors.New("invalid Keybase username output") + doneCh <- fmt.Errorf("invalid Keybase username output: %q", text) return } username = toks[1] @@ -71,6 +77,10 @@ type RunOptions struct { HomeDir string Oneshot *OneshotOptions StartService bool + // Have the bot send/receive typing notifications + EnableTyping bool + // Disable bot lite mode + DisableBotLiteMode bool } func (r RunOptions) Location() string { @@ -100,6 +110,10 @@ func Start(runOpts RunOptions) (*API, error) { return api, nil } +func (a *API) Command(args ...string) *exec.Cmd { + return a.runOpts.Command(args...) +} + func (a *API) auth() (string, error) { username, err := getUsername(a.runOpts) if err == nil { @@ -132,17 +146,28 @@ func (a *API) startPipes() (err error) { a.Lock() defer a.Unlock() if a.apiCmd != nil { - a.apiCmd.Process.Kill() + if err := a.apiCmd.Process.Kill(); err != nil { + return err + } } a.apiCmd = nil if a.runOpts.StartService { - a.runOpts.Command("service").Start() + args := []string{fmt.Sprintf("-enable-bot-lite-mode=%v", a.runOpts.DisableBotLiteMode), "service"} + if err := a.runOpts.Command(args...).Start(); err != nil { + return err + } } if a.username, err = a.auth(); err != nil { return err } + + cmd := a.runOpts.Command("chat", "notification-settings", fmt.Sprintf("-disable-typing=%v", !a.runOpts.EnableTyping)) + if err = cmd.Run(); err != nil { + return err + } + a.apiCmd = a.runOpts.Command("chat", "api") if a.apiInput, err = a.apiCmd.StdinPipe(); err != nil { return err @@ -168,73 +193,11 @@ func (a *API) getAPIPipesLocked() (io.Writer, *bufio.Reader, error) { return a.apiInput, a.apiOutput, nil } -// GetConversations reads all conversations from the current user's inbox. -func (a *API) GetConversations(unreadOnly bool) ([]Conversation, error) { - apiInput := fmt.Sprintf(`{"method":"list", "params": { "options": { "unread_only": %v}}}`, unreadOnly) - output, err := a.doFetch(apiInput) - if err != nil { - return nil, err - } - - var inbox Inbox - if err := json.Unmarshal(output, &inbox); err != nil { - return nil, err - } - return inbox.Result.Convs, nil -} - -// GetTextMessages fetches all text messages from a given channel. Optionally can filter -// ont unread status. -func (a *API) GetTextMessages(channel Channel, unreadOnly bool) ([]Message, error) { - channelBytes, err := json.Marshal(channel) - if err != nil { - return nil, err - } - apiInput := fmt.Sprintf(`{"method": "read", "params": {"options": {"channel": %s}}}`, string(channelBytes)) - output, err := a.doFetch(apiInput) - if err != nil { - return nil, err - } - - var thread Thread - - if err := json.Unmarshal(output, &thread); err != nil { - return nil, fmt.Errorf("unable to decode thread: %s", err.Error()) - } - - var res []Message - for _, msg := range thread.Result.Messages { - if msg.Msg.Content.Type == "text" { - res = append(res, msg.Msg) - } - } - - return res, nil -} - -type sendMessageBody struct { - Body string -} - -type sendMessageOptions struct { - Channel Channel `json:"channel,omitempty"` - ConversationID string `json:"conversation_id,omitempty"` - Message sendMessageBody `json:",omitempty"` - Filename string `json:"filename,omitempty"` - Title string `json:"title,omitempty"` - MsgID int `json:"message_id,omitempty"` -} - -type sendMessageParams struct { - Options sendMessageOptions -} - -type sendMessageArg struct { - Method string - Params sendMessageParams +func (a *API) GetUsername() string { + return a.username } -func (a *API) doSend(arg interface{}) (response SendResponse, err error) { +func (a *API) doSend(arg interface{}) (resp SendResponse, err error) { a.Lock() defer a.Unlock() @@ -253,10 +216,12 @@ func (a *API) doSend(arg interface{}) (response SendResponse, err error) { if err != nil { return SendResponse{}, err } - if err := json.Unmarshal(responseRaw, &response); err != nil { - return SendResponse{}, fmt.Errorf("failed to decode API response: %s", err) + if err := json.Unmarshal(responseRaw, &resp); err != nil { + return resp, fmt.Errorf("failed to decode API response: %s", err) + } else if resp.Error != nil { + return resp, errors.New(resp.Error.Message) } - return response, nil + return resp, nil } func (a *API) doFetch(apiInput string) ([]byte, error) { @@ -278,234 +243,121 @@ func (a *API) doFetch(apiInput string) ([]byte, error) { return byteOutput, nil } -func (a *API) SendMessage(channel Channel, body string) (SendResponse, error) { - arg := sendMessageArg{ - Method: "send", - Params: sendMessageParams{ - Options: sendMessageOptions{ - Channel: channel, - Message: sendMessageBody{ - Body: body, - }, - }, - }, - } - return a.doSend(arg) -} - -func (a *API) SendMessageByConvID(convID string, body string) (SendResponse, error) { - arg := sendMessageArg{ - Method: "send", - Params: sendMessageParams{ - Options: sendMessageOptions{ - ConversationID: convID, - Message: sendMessageBody{ - Body: body, - }, - }, - }, - } - return a.doSend(arg) -} - -// SendMessageByTlfName sends a message on the given TLF name -func (a *API) SendMessageByTlfName(tlfName string, body string) (SendResponse, error) { - arg := sendMessageArg{ - Method: "send", - Params: sendMessageParams{ - Options: sendMessageOptions{ - Channel: Channel{ - Name: tlfName, - }, - Message: sendMessageBody{ - Body: body, - }, - }, - }, - } - return a.doSend(arg) -} - -func (a *API) SendMessageByTeamName(teamName string, body string, inChannel *string) (SendResponse, error) { - channel := "general" - if inChannel != nil { - channel = *inChannel - } - arg := sendMessageArg{ - Method: "send", - Params: sendMessageParams{ - Options: sendMessageOptions{ - Channel: Channel{ - MembersType: "team", - Name: teamName, - TopicName: channel, - }, - Message: sendMessageBody{ - Body: body, - }, - }, - }, - } - return a.doSend(arg) -} - -func (a *API) SendAttachmentByTeam(teamName string, filename string, title string, inChannel *string) (SendResponse, error) { - channel := "general" - if inChannel != nil { - channel = *inChannel - } - arg := sendMessageArg{ - Method: "attach", - Params: sendMessageParams{ - Options: sendMessageOptions{ - Channel: Channel{ - MembersType: "team", - Name: teamName, - TopicName: channel, - }, - Filename: filename, - Title: title, - }, - }, - } - return a.doSend(arg) -} - -type reactionOptions struct { - ConversationID string `json:"conversation_id"` - Message sendMessageBody - MsgID int `json:"message_id"` - Channel Channel `json:"channel"` -} - -type reactionParams struct { - Options reactionOptions -} - -type reactionArg struct { - Method string - Params reactionParams -} - -func newReactionArg(options reactionOptions) reactionArg { - return reactionArg{ - Method: "reaction", - Params: reactionParams{Options: options}, - } -} - -func (a *API) ReactByChannel(channel Channel, msgID int, reaction string) (SendResponse, error) { - arg := newReactionArg(reactionOptions{ - Message: sendMessageBody{Body: reaction}, - MsgID: msgID, - Channel: channel, - }) - return a.doSend(arg) -} - -func (a *API) ReactByConvID(convID string, msgID int, reaction string) (SendResponse, error) { - arg := newReactionArg(reactionOptions{ - Message: sendMessageBody{Body: reaction}, - MsgID: msgID, - ConversationID: convID, - }) - return a.doSend(arg) -} - -type advertiseParams struct { - Options Advertisement -} - -type advertiseMsgArg struct { - Method string - Params advertiseParams -} - -func newAdvertiseMsgArg(ad Advertisement) advertiseMsgArg { - return advertiseMsgArg{ - Method: "advertisecommands", - Params: advertiseParams{ - Options: ad, - }, - } -} - -func (a *API) AdvertiseCommands(ad Advertisement) (SendResponse, error) { - return a.doSend(newAdvertiseMsgArg(ad)) -} - -func (a *API) Username() string { - return a.username -} - // SubscriptionMessage contains a message and conversation object type SubscriptionMessage struct { - Message Message - Conversation Conversation + Message chat1.MsgSummary + Conversation chat1.ConvSummary +} + +type SubscriptionConversation struct { + Conversation chat1.ConvSummary } type SubscriptionWalletEvent struct { - Payment Payment + Payment stellar1.PaymentDetailsLocal } // NewSubscription has methods to control the background message fetcher loop type NewSubscription struct { + sync.Mutex + newMsgsCh <-chan SubscriptionMessage + newConvsCh <-chan SubscriptionConversation newWalletCh <-chan SubscriptionWalletEvent errorCh <-chan error + running bool shutdownCh chan struct{} } // Read blocks until a new message arrives -func (m NewSubscription) Read() (SubscriptionMessage, error) { +func (m *NewSubscription) Read() (SubscriptionMessage, error) { select { case msg := <-m.newMsgsCh: return msg, nil case err := <-m.errorCh: return SubscriptionMessage{}, err + case <-m.shutdownCh: + return SubscriptionMessage{}, errors.New("Subscription shutdown") + } +} + +func (m *NewSubscription) ReadNewConvs() (SubscriptionConversation, error) { + select { + case conv := <-m.newConvsCh: + return conv, nil + case err := <-m.errorCh: + return SubscriptionConversation{}, err + case <-m.shutdownCh: + return SubscriptionConversation{}, errors.New("Subscription shutdown") } } // Read blocks until a new message arrives -func (m NewSubscription) ReadWallet() (SubscriptionWalletEvent, error) { +func (m *NewSubscription) ReadWallet() (SubscriptionWalletEvent, error) { select { case msg := <-m.newWalletCh: return msg, nil case err := <-m.errorCh: return SubscriptionWalletEvent{}, err + case <-m.shutdownCh: + return SubscriptionWalletEvent{}, errors.New("Subscription shutdown") } } // Shutdown terminates the background process -func (m NewSubscription) Shutdown() { - m.shutdownCh <- struct{}{} +func (m *NewSubscription) Shutdown() { + m.Lock() + defer m.Unlock() + if m.running { + close(m.shutdownCh) + m.running = false + } } type ListenOptions struct { Wallet bool + Convs bool +} + +type PaymentHolder struct { + Payment stellar1.PaymentDetailsLocal `json:"notification"` +} + +type TypeHolder struct { + Type string `json:"type"` } // ListenForNewTextMessages proxies to Listen without wallet events -func (a *API) ListenForNewTextMessages() (NewSubscription, error) { +func (a *API) ListenForNewTextMessages() (*NewSubscription, error) { opts := ListenOptions{Wallet: false} return a.Listen(opts) } +func (a *API) registerSubscription(sub *NewSubscription) { + a.Lock() + defer a.Unlock() + a.subscriptions = append(a.subscriptions, sub) +} + // Listen fires of a background loop and puts chat messages and wallet // events into channels -func (a *API) Listen(opts ListenOptions) (NewSubscription, error) { - newMsgCh := make(chan SubscriptionMessage, 100) +func (a *API) Listen(opts ListenOptions) (*NewSubscription, error) { + newMsgsCh := make(chan SubscriptionMessage, 100) + newConvsCh := make(chan SubscriptionConversation, 100) newWalletCh := make(chan SubscriptionWalletEvent, 100) errorCh := make(chan error, 100) shutdownCh := make(chan struct{}) done := make(chan struct{}) - sub := NewSubscription{ - newMsgsCh: newMsgCh, + sub := &NewSubscription{ + newMsgsCh: newMsgsCh, + newConvsCh: newConvsCh, newWalletCh: newWalletCh, shutdownCh: shutdownCh, errorCh: errorCh, + running: true, } + a.registerSubscription(sub) pause := 2 * time.Second readScanner := func(boutput *bufio.Scanner) { for { @@ -518,28 +370,44 @@ func (a *API) Listen(opts ListenOptions) (NewSubscription, error) { } switch typeHolder.Type { case "chat": - var holder MessageHolder - if err := json.Unmarshal([]byte(t), &holder); err != nil { + var notification chat1.MsgNotification + if err := json.Unmarshal([]byte(t), ¬ification); err != nil { errorCh <- err break } - subscriptionMessage := SubscriptionMessage{ - Message: holder.Msg, - Conversation: Conversation{ - ID: holder.Msg.ConversationID, - Channel: holder.Msg.Channel, - }, + if notification.Error != nil { + log.Printf("error message received: %s", *notification.Error) + } else if notification.Msg != nil { + subscriptionMessage := SubscriptionMessage{ + Message: *notification.Msg, + Conversation: chat1.ConvSummary{ + Id: notification.Msg.ConvID, + Channel: notification.Msg.Channel, + }, + } + newMsgsCh <- subscriptionMessage + } + case "chat_conv": + var notification chat1.ConvNotification + if err := json.Unmarshal([]byte(t), ¬ification); err != nil { + errorCh <- err + break + } + if notification.Error != nil { + log.Printf("error message received: %s", *notification.Error) + } else if notification.Conv != nil { + subscriptionConv := SubscriptionConversation{ + Conversation: *notification.Conv, + } + newConvsCh <- subscriptionConv } - newMsgCh <- subscriptionMessage case "wallet": var holder PaymentHolder if err := json.Unmarshal([]byte(t), &holder); err != nil { errorCh <- err break } - subscriptionPayment := SubscriptionWalletEvent{ - Payment: holder.Payment, - } + subscriptionPayment := SubscriptionWalletEvent(holder) newWalletCh <- subscriptionPayment default: continue @@ -552,6 +420,13 @@ func (a *API) Listen(opts ListenOptions) (NewSubscription, error) { maxAttempts := 1800 go func() { for { + select { + case <-shutdownCh: + log.Printf("Listen: received shutdown") + return + default: + } + if attempts >= maxAttempts { panic("Listen: failed to auth, giving up") } @@ -565,6 +440,9 @@ func (a *API) Listen(opts ListenOptions) (NewSubscription, error) { if opts.Wallet { cmdElements = append(cmdElements, "--wallet") } + if opts.Convs { + cmdElements = append(cmdElements, "--convs") + } p := a.runOpts.Command(cmdElements...) output, err := p.StdoutPipe() if err != nil { @@ -572,6 +450,12 @@ func (a *API) Listen(opts ListenOptions) (NewSubscription, error) { time.Sleep(pause) continue } + stderr, err := p.StderrPipe() + if err != nil { + log.Printf("Listen: failed to listen to stderr: %s", err) + time.Sleep(pause) + continue + } boutput := bufio.NewScanner(output) if err := p.Start(); err != nil { log.Printf("Listen: failed to make listen scanner: %s", err) @@ -581,78 +465,19 @@ func (a *API) Listen(opts ListenOptions) (NewSubscription, error) { attempts = 0 go readScanner(boutput) <-done - p.Wait() + if err := p.Wait(); err != nil { + stderrBytes, rerr := ioutil.ReadAll(stderr) + if rerr != nil { + stderrBytes = []byte("failed to get stderr") + } + log.Printf("Listen: failed to Wait for command: %s (```%s```)", err, stderrBytes) + } time.Sleep(pause) } }() return sub, nil } -func (a *API) GetUsername() string { - return a.username -} - -func (a *API) ListChannels(teamName string) ([]string, error) { - apiInput := fmt.Sprintf(`{"method": "listconvsonname", "params": {"options": {"topic_type": "CHAT", "members_type": "team", "name": "%s"}}}`, teamName) - output, err := a.doFetch(apiInput) - if err != nil { - return nil, err - } - - var channelsList ChannelsList - if err := json.Unmarshal(output, &channelsList); err != nil { - return nil, err - } - - var channels []string - for _, conv := range channelsList.Result.Convs { - channels = append(channels, conv.Channel.TopicName) - } - return channels, nil -} - -func (a *API) JoinChannel(teamName string, channelName string) (JoinChannelResult, error) { - empty := JoinChannelResult{} - - apiInput := fmt.Sprintf(`{"method": "join", "params": {"options": {"channel": {"name": "%s", "members_type": "team", "topic_name": "%s"}}}}`, teamName, channelName) - output, err := a.doFetch(apiInput) - if err != nil { - return empty, err - } - - joinChannel := JoinChannel{} - err = json.Unmarshal(output, &joinChannel) - if err != nil { - return empty, fmt.Errorf("failed to parse output from keybase team api: %v", err) - } - if joinChannel.Error.Message != "" { - return empty, fmt.Errorf("received error from keybase team api: %s", joinChannel.Error.Message) - } - - return joinChannel.Result, nil -} - -func (a *API) LeaveChannel(teamName string, channelName string) (LeaveChannelResult, error) { - empty := LeaveChannelResult{} - - apiInput := fmt.Sprintf(`{"method": "leave", "params": {"options": {"channel": {"name": "%s", "members_type": "team", "topic_name": "%s"}}}}`, teamName, channelName) - output, err := a.doFetch(apiInput) - if err != nil { - return empty, err - } - - leaveChannel := LeaveChannel{} - err = json.Unmarshal(output, &leaveChannel) - if err != nil { - return empty, fmt.Errorf("failed to parse output from keybase team api: %v", err) - } - if leaveChannel.Error.Message != "" { - return empty, fmt.Errorf("received error from keybase team api: %s", leaveChannel.Error.Message) - } - - return leaveChannel.Result, nil -} - func (a *API) LogSend(feedback string) error { feedback = "go-keybase-chat-bot log send\n" + "username: " + a.GetUsername() + "\n" + @@ -675,6 +500,12 @@ func (a *API) LogSend(feedback string) error { } func (a *API) Shutdown() error { + a.Lock() + defer a.Unlock() + for _, sub := range a.subscriptions { + sub.Shutdown() + } + if a.runOpts.Oneshot != nil { err := a.runOpts.Command("logout", "--force").Run() if err != nil { |