diff options
Diffstat (limited to 'bridge/api/api.go')
-rw-r--r-- | bridge/api/api.go | 76 |
1 files changed, 42 insertions, 34 deletions
diff --git a/bridge/api/api.go b/bridge/api/api.go index 62336881..9c937565 100644 --- a/bridge/api/api.go +++ b/bridge/api/api.go @@ -6,9 +6,10 @@ import ( "sync" "time" + "gopkg.in/olahol/melody.v1" + "github.com/42wim/matterbridge/bridge" "github.com/42wim/matterbridge/bridge/config" - "github.com/gorilla/websocket" "github.com/labstack/echo/v4" "github.com/labstack/echo/v4/middleware" ring "github.com/zfjagann/golang-ring" @@ -18,6 +19,7 @@ type API struct { Messages ring.Ring sync.RWMutex *bridge.Config + mrouter *melody.Melody } type Message struct { @@ -33,6 +35,32 @@ func New(cfg *bridge.Config) bridge.Bridger { e := echo.New() e.HideBanner = true e.HidePort = true + + b.mrouter = melody.New() + b.mrouter.HandleMessage(func(s *melody.Session, msg []byte) { + message := config.Message{} + err := json.Unmarshal(msg, &message) + if err != nil { + b.Log.Errorf("failed to decode message from byte[] '%s'", string(msg)) + return + } + b.handleWebsocketMessage(message) + }) + b.mrouter.HandleConnect(func(session *melody.Session) { + greet := b.getGreeting() + data, err := json.Marshal(greet) + if err != nil { + b.Log.Errorf("failed to encode message '%v'", greet) + return + } + err = session.Write(data) + if err != nil { + b.Log.Errorf("failed to write message '%s'", string(data)) + return + } + // TODO: send message history buffer from `b.Messages` here + }) + b.Messages = ring.Ring{} if b.GetInt("Buffer") != 0 { b.Messages.SetCapacity(b.GetInt("Buffer")) @@ -67,13 +95,13 @@ func New(cfg *bridge.Config) bridge.Bridger { func (b *API) Connect() error { return nil } + func (b *API) Disconnect() error { return nil - } + func (b *API) JoinChannel(channel config.ChannelInfo) error { return nil - } func (b *API) Send(msg config.Message) (string, error) { @@ -83,7 +111,14 @@ func (b *API) Send(msg config.Message) (string, error) { if msg.Event == config.EventMsgDelete { return "", nil } - b.Messages.Enqueue(&msg) + b.Log.Debugf("enqueueing message from %s on ring buffer", msg.Username) + b.Messages.Enqueue(msg) + + data, err := json.Marshal(msg) + if err != nil { + b.Log.Errorf("failed to encode message '%s'", msg) + } + _ = b.mrouter.Broadcast(data) return "", nil } @@ -131,6 +166,7 @@ func (b *API) handleStream(c echo.Context) error { } c.Response().Flush() for { + // TODO: this causes issues, messages should be broadcasted to all connected clients msg := b.Messages.Dequeue() if msg != nil { if err := json.NewEncoder(c.Response()).Encode(msg); err != nil { @@ -153,40 +189,12 @@ func (b *API) handleWebsocketMessage(message config.Message) { b.Remote <- message } -func (b *API) writePump(conn *websocket.Conn) { - for { - msg := b.Messages.Dequeue() - if msg != nil { - err := conn.WriteJSON(msg) - if err != nil { - break - } - } - } -} - -func (b *API) readPump(conn *websocket.Conn) { - for { - message := config.Message{} - err := conn.ReadJSON(&message) - if err != nil { - break - } - b.handleWebsocketMessage(message) - } -} - func (b *API) handleWebsocket(c echo.Context) error { - conn, err := websocket.Upgrade(c.Response().Writer, c.Request(), nil, 1024, 1024) + err := b.mrouter.HandleRequest(c.Response(), c.Request()) if err != nil { + b.Log.Errorf("error in websocket handling '%v'", err) return err } - greet := b.getGreeting() - _ = conn.WriteJSON(greet) - - go b.writePump(conn) - go b.readPump(conn) - return nil } |