From 3724cc3a15c76a0a0dd1cac15736aebb0410d8ca Mon Sep 17 00:00:00 2001 From: Duco van Amstel Date: Thu, 30 May 2019 11:31:54 +0100 Subject: Clean-up XMPP handling code (#831) --- bridge/xmpp/xmpp.go | 206 ++++++++++++++++++++++++++++++++-------------------- 1 file changed, 129 insertions(+), 77 deletions(-) (limited to 'bridge') diff --git a/bridge/xmpp/xmpp.go b/bridge/xmpp/xmpp.go index 6fc573bf..66368087 100644 --- a/bridge/xmpp/xmpp.go +++ b/bridge/xmpp/xmpp.go @@ -14,50 +14,29 @@ import ( ) type Bxmpp struct { - xc *xmpp.Client - xmppMap map[string]string *bridge.Config + startTime time.Time + xc *xmpp.Client + xmppMap map[string]string } func New(cfg *bridge.Config) bridge.Bridger { - b := &Bxmpp{Config: cfg} - b.xmppMap = make(map[string]string) - return b + return &Bxmpp{ + Config: cfg, + xmppMap: make(map[string]string), + } } func (b *Bxmpp) Connect() error { - var err error b.Log.Infof("Connecting %s", b.GetString("Server")) - b.xc, err = b.createXMPP() - if err != nil { + if err := b.createXMPP(); err != nil { b.Log.Debugf("%#v", err) return err } + b.Log.Info("Connection succeeded") - go func() { - initial := true - bf := &backoff.Backoff{ - Min: time.Second, - Max: 5 * time.Minute, - Jitter: true, - } - for { - if initial { - b.handleXMPP() - initial = false - } - d := bf.Duration() - b.Log.Infof("Disconnected. Reconnecting in %s", d) - time.Sleep(d) - b.xc, err = b.createXMPP() - if err == nil { - b.Remote <- config.Message{Username: "system", Text: "rejoin", Channel: "", Account: b.Account, Event: config.EventRejoinChannels} - b.handleXMPP() - bf.Reset() - } - } - }() + go b.manageConnection() return nil } @@ -82,34 +61,48 @@ func (b *Bxmpp) Send(msg config.Message) (string, error) { } b.Log.Debugf("=> Receiving %#v", msg) - // Upload a file (in xmpp case send the upload URL because xmpp has no native upload support) + // Upload a file (in XMPP case send the upload URL because XMPP has no native upload support). if msg.Extra != nil { for _, rmsg := range helper.HandleExtra(&msg, b.General) { - b.xc.Send(xmpp.Chat{Type: "groupchat", Remote: rmsg.Channel + "@" + b.GetString("Muc"), Text: rmsg.Username + rmsg.Text}) + b.Log.Debugf("=> Sending attachement message %#v", rmsg) + if _, err := b.xc.Send(xmpp.Chat{ + Type: "groupchat", + Remote: rmsg.Channel + "@" + b.GetString("Muc"), + Text: rmsg.Username + rmsg.Text, + }); err != nil { + b.Log.WithError(err).Error("Unable to send message with share URL.") + } } if len(msg.Extra["file"]) > 0 { - return b.handleUploadFile(&msg) + return "", b.handleUploadFile(&msg) } } - var msgreplaceid string - msgid := xid.New().String() + var msgReplaceID string + msgID := xid.New().String() if msg.ID != "" { - msgid = msg.ID - msgreplaceid = msg.ID + msgID = msg.ID + msgReplaceID = msg.ID } - // Post normal message - _, err := b.xc.Send(xmpp.Chat{Type: "groupchat", Remote: msg.Channel + "@" + b.GetString("Muc"), Text: msg.Username + msg.Text, ID: msgid, ReplaceID: msgreplaceid}) - if err != nil { + // Post normal message. + b.Log.Debugf("=> Sending message %#v", msg) + if _, err := b.xc.Send(xmpp.Chat{ + Type: "groupchat", + Remote: msg.Channel + "@" + b.GetString("Muc"), + Text: msg.Username + msg.Text, + ID: msgID, + ReplaceID: msgReplaceID, + }); err != nil { return "", err } - return msgid, nil + return msgID, nil } -func (b *Bxmpp) createXMPP() (*xmpp.Client, error) { - tc := new(tls.Config) - tc.InsecureSkipVerify = b.GetBool("SkipTLSVerify") - tc.ServerName = strings.Split(b.GetString("Server"), ":")[0] +func (b *Bxmpp) createXMPP() error { + tc := &tls.Config{ + ServerName: strings.Split(b.GetString("Server"), ":")[0], + InsecureSkipVerify: b.GetBool("SkipTLSVerify"), // nolint: gosec + } options := xmpp.Options{ Host: b.GetString("Server"), User: b.GetString("Jid"), @@ -127,7 +120,51 @@ func (b *Bxmpp) createXMPP() (*xmpp.Client, error) { } var err error b.xc, err = options.NewClient() - return b.xc, err + return err +} + +func (b *Bxmpp) manageConnection() { + initial := true + bf := &backoff.Backoff{ + Min: time.Second, + Max: 5 * time.Minute, + Jitter: true, + } + + // Main connection loop. Each iteration corresponds to a successful + // connection attempt and the subsequent handling of the connection. + for { + if initial { + initial = false + } else { + b.Remote <- config.Message{ + Username: "system", + Text: "rejoin", + Channel: "", + Account: b.Account, + Event: config.EventRejoinChannels, + } + } + + if err := b.handleXMPP(); err != nil { + b.Log.WithError(err).Error("Disconnected.") + } + + // Reconnection loop using an exponential back-off strategy. We + // only break out of the loop if we have successfully reconnected. + for { + d := bf.Duration() + b.Log.Infof("Reconnecting in %s.", d) + time.Sleep(d) + + b.Log.Infof("Reconnecting now.") + if err := b.createXMPP(); err == nil { + bf.Reset() + break + } + b.Log.Warn("Failed to reconnect.") + } + } } func (b *Bxmpp) xmppKeepAlive() chan bool { @@ -139,8 +176,7 @@ func (b *Bxmpp) xmppKeepAlive() chan bool { select { case <-ticker.C: b.Log.Debugf("PING") - err := b.xc.PingC2S("", "") - if err != nil { + if err := b.xc.PingC2S("", ""); err != nil { b.Log.Debugf("PING failed %#v", err) } case <-done: @@ -152,31 +188,35 @@ func (b *Bxmpp) xmppKeepAlive() chan bool { } func (b *Bxmpp) handleXMPP() error { - var ok bool - var msgid string b.startTime = time.Now() + done := b.xmppKeepAlive() defer close(done) + for { m, err := b.xc.Recv() if err != nil { return err } + switch v := m.(type) { case xmpp.Chat: if v.Type == "groupchat" { b.Log.Debugf("== Receiving %#v", v) - event := "" - // skip invalid messages + + // Skip invalid messages. if b.skipMessage(v) { continue } + + var event string if strings.Contains(v.Text, "has set the subject to:") { event = config.EventTopicChange } - msgid = v.ID + + msgID := v.ID if v.ReplaceID != "" { - msgid = v.ReplaceID + msgID = v.ReplaceID } rmsg := config.Message{ Username: b.parseNick(v.Remote), @@ -184,21 +224,23 @@ func (b *Bxmpp) handleXMPP() error { Channel: b.parseChannel(v.Remote), Account: b.Account, UserID: v.Remote, - ID: msgid, + ID: msgID, Event: event, } - // check if we have an action event + // Check if we have an action event. + var ok bool rmsg.Text, ok = b.replaceAction(rmsg.Text) if ok { rmsg.Event = config.EventUserAction } + b.Log.Debugf("<= Sending message from %s on %s to gateway", rmsg.Username, b.Account) b.Log.Debugf("<= Message is %#v", rmsg) b.Remote <- rmsg } case xmpp.Presence: - // do nothing + // Do nothing. } } } @@ -211,30 +253,41 @@ func (b *Bxmpp) replaceAction(text string) (string, bool) { } // handleUploadFile handles native upload of files -func (b *Bxmpp) handleUploadFile(msg *config.Message) (string, error) { - var urldesc = "" +func (b *Bxmpp) handleUploadFile(msg *config.Message) error { + var urlDesc string - for _, f := range msg.Extra["file"] { - fi := f.(config.FileInfo) - if fi.Comment != "" { - msg.Text += fi.Comment + ": " + for _, file := range msg.Extra["file"] { + fileInfo := file.(config.FileInfo) + if fileInfo.Comment != "" { + msg.Text += fileInfo.Comment + ": " } - if fi.URL != "" { - msg.Text = fi.URL - if fi.Comment != "" { - msg.Text = fi.Comment + ": " + fi.URL - urldesc = fi.Comment + if fileInfo.URL != "" { + msg.Text = fileInfo.URL + if fileInfo.Comment != "" { + msg.Text = fileInfo.Comment + ": " + fileInfo.URL + urlDesc = fileInfo.Comment } } - _, err := b.xc.Send(xmpp.Chat{Type: "groupchat", Remote: msg.Channel + "@" + b.GetString("Muc"), Text: msg.Username + msg.Text}) - if err != nil { - return "", err + if _, err := b.xc.Send(xmpp.Chat{ + Type: "groupchat", + Remote: msg.Channel + "@" + b.GetString("Muc"), + Text: msg.Username + msg.Text, + }); err != nil { + return err } - if fi.URL != "" { - b.xc.SendOOB(xmpp.Chat{Type: "groupchat", Remote: msg.Channel + "@" + b.GetString("Muc"), Ooburl: fi.URL, Oobdesc: urldesc}) + + if fileInfo.URL != "" { + if _, err := b.xc.SendOOB(xmpp.Chat{ + Type: "groupchat", + Remote: msg.Channel + "@" + b.GetString("Muc"), + Ooburl: fileInfo.URL, + Oobdesc: urlDesc, + }); err != nil { + b.Log.WithError(err).Warn("Failed to send share URL.") + } } } - return "", nil + return nil } func (b *Bxmpp) parseNick(remote string) string { @@ -279,6 +332,5 @@ func (b *Bxmpp) skipMessage(message xmpp.Chat) bool { } // skip delayed messages - t := time.Time{} - return message.Stamp != t + return message.Stamp.IsZero() } -- cgit v1.2.3