summaryrefslogtreecommitdiffstats
path: root/bridge/api/api.go
diff options
context:
space:
mode:
Diffstat (limited to 'bridge/api/api.go')
-rw-r--r--bridge/api/api.go76
1 files changed, 42 insertions, 34 deletions
diff --git a/bridge/api/api.go b/bridge/api/api.go
index 62336881..9c937565 100644
--- a/bridge/api/api.go
+++ b/bridge/api/api.go
@@ -6,9 +6,10 @@ import (
"sync"
"time"
+ "gopkg.in/olahol/melody.v1"
+
"github.com/42wim/matterbridge/bridge"
"github.com/42wim/matterbridge/bridge/config"
- "github.com/gorilla/websocket"
"github.com/labstack/echo/v4"
"github.com/labstack/echo/v4/middleware"
ring "github.com/zfjagann/golang-ring"
@@ -18,6 +19,7 @@ type API struct {
Messages ring.Ring
sync.RWMutex
*bridge.Config
+ mrouter *melody.Melody
}
type Message struct {
@@ -33,6 +35,32 @@ func New(cfg *bridge.Config) bridge.Bridger {
e := echo.New()
e.HideBanner = true
e.HidePort = true
+
+ b.mrouter = melody.New()
+ b.mrouter.HandleMessage(func(s *melody.Session, msg []byte) {
+ message := config.Message{}
+ err := json.Unmarshal(msg, &message)
+ if err != nil {
+ b.Log.Errorf("failed to decode message from byte[] '%s'", string(msg))
+ return
+ }
+ b.handleWebsocketMessage(message)
+ })
+ b.mrouter.HandleConnect(func(session *melody.Session) {
+ greet := b.getGreeting()
+ data, err := json.Marshal(greet)
+ if err != nil {
+ b.Log.Errorf("failed to encode message '%v'", greet)
+ return
+ }
+ err = session.Write(data)
+ if err != nil {
+ b.Log.Errorf("failed to write message '%s'", string(data))
+ return
+ }
+ // TODO: send message history buffer from `b.Messages` here
+ })
+
b.Messages = ring.Ring{}
if b.GetInt("Buffer") != 0 {
b.Messages.SetCapacity(b.GetInt("Buffer"))
@@ -67,13 +95,13 @@ func New(cfg *bridge.Config) bridge.Bridger {
func (b *API) Connect() error {
return nil
}
+
func (b *API) Disconnect() error {
return nil
-
}
+
func (b *API) JoinChannel(channel config.ChannelInfo) error {
return nil
-
}
func (b *API) Send(msg config.Message) (string, error) {
@@ -83,7 +111,14 @@ func (b *API) Send(msg config.Message) (string, error) {
if msg.Event == config.EventMsgDelete {
return "", nil
}
- b.Messages.Enqueue(&msg)
+ b.Log.Debugf("enqueueing message from %s on ring buffer", msg.Username)
+ b.Messages.Enqueue(msg)
+
+ data, err := json.Marshal(msg)
+ if err != nil {
+ b.Log.Errorf("failed to encode message '%s'", msg)
+ }
+ _ = b.mrouter.Broadcast(data)
return "", nil
}
@@ -131,6 +166,7 @@ func (b *API) handleStream(c echo.Context) error {
}
c.Response().Flush()
for {
+ // TODO: this causes issues, messages should be broadcasted to all connected clients
msg := b.Messages.Dequeue()
if msg != nil {
if err := json.NewEncoder(c.Response()).Encode(msg); err != nil {
@@ -153,40 +189,12 @@ func (b *API) handleWebsocketMessage(message config.Message) {
b.Remote <- message
}
-func (b *API) writePump(conn *websocket.Conn) {
- for {
- msg := b.Messages.Dequeue()
- if msg != nil {
- err := conn.WriteJSON(msg)
- if err != nil {
- break
- }
- }
- }
-}
-
-func (b *API) readPump(conn *websocket.Conn) {
- for {
- message := config.Message{}
- err := conn.ReadJSON(&message)
- if err != nil {
- break
- }
- b.handleWebsocketMessage(message)
- }
-}
-
func (b *API) handleWebsocket(c echo.Context) error {
- conn, err := websocket.Upgrade(c.Response().Writer, c.Request(), nil, 1024, 1024)
+ err := b.mrouter.HandleRequest(c.Response(), c.Request())
if err != nil {
+ b.Log.Errorf("error in websocket handling '%v'", err)
return err
}
- greet := b.getGreeting()
- _ = conn.WriteJSON(greet)
-
- go b.writePump(conn)
- go b.readPump(conn)
-
return nil
}