diff options
author | Wim <wim@42.be> | 2018-08-06 21:10:54 +0200 |
---|---|---|
committer | Wim <wim@42.be> | 2018-08-06 21:11:13 +0200 |
commit | 7f3cbcedc0d748108d5421cf4a8e28876e7b4c29 (patch) | |
tree | c90cd45de6cf870ba3913a49115e993c7d7068f0 /vendor/github.com/matterbridge/discordgo/wsapi.go | |
parent | 6ef09def81f037ca81f4b8ff03069bad071a3d38 (diff) | |
download | matterbridge-msglm-7f3cbcedc0d748108d5421cf4a8e28876e7b4c29.tar.gz matterbridge-msglm-7f3cbcedc0d748108d5421cf4a8e28876e7b4c29.tar.bz2 matterbridge-msglm-7f3cbcedc0d748108d5421cf4a8e28876e7b4c29.zip |
Use own forks for logrus-prefixed-formatter and discordgo
Diffstat (limited to 'vendor/github.com/matterbridge/discordgo/wsapi.go')
-rw-r--r-- | vendor/github.com/matterbridge/discordgo/wsapi.go | 845 |
1 files changed, 845 insertions, 0 deletions
diff --git a/vendor/github.com/matterbridge/discordgo/wsapi.go b/vendor/github.com/matterbridge/discordgo/wsapi.go new file mode 100644 index 00000000..de66f693 --- /dev/null +++ b/vendor/github.com/matterbridge/discordgo/wsapi.go @@ -0,0 +1,845 @@ +// Discordgo - Discord bindings for Go +// Available at https://github.com/bwmarrin/discordgo + +// Copyright 2015-2016 Bruce Marriner <bruce@sqls.net>. All rights reserved. +// Use of this source code is governed by a BSD-style +// license that can be found in the LICENSE file. + +// This file contains low level functions for interacting with the Discord +// data websocket interface. + +package discordgo + +import ( + "bytes" + "compress/zlib" + "encoding/json" + "errors" + "fmt" + "io" + "net/http" + "runtime" + "sync/atomic" + "time" + + "github.com/gorilla/websocket" +) + +// ErrWSAlreadyOpen is thrown when you attempt to open +// a websocket that already is open. +var ErrWSAlreadyOpen = errors.New("web socket already opened") + +// ErrWSNotFound is thrown when you attempt to use a websocket +// that doesn't exist +var ErrWSNotFound = errors.New("no websocket connection exists") + +// ErrWSShardBounds is thrown when you try to use a shard ID that is +// less than the total shard count +var ErrWSShardBounds = errors.New("ShardID must be less than ShardCount") + +type resumePacket struct { + Op int `json:"op"` + Data struct { + Token string `json:"token"` + SessionID string `json:"session_id"` + Sequence int64 `json:"seq"` + } `json:"d"` +} + +// Open creates a websocket connection to Discord. +// See: https://discordapp.com/developers/docs/topics/gateway#connecting +func (s *Session) Open() error { + s.log(LogInformational, "called") + + var err error + + // Prevent Open or other major Session functions from + // being called while Open is still running. + s.Lock() + defer s.Unlock() + + // If the websock is already open, bail out here. + if s.wsConn != nil { + return ErrWSAlreadyOpen + } + + // Get the gateway to use for the Websocket connection + if s.gateway == "" { + s.gateway, err = s.Gateway() + if err != nil { + return err + } + + // Add the version and encoding to the URL + s.gateway = s.gateway + "?v=" + APIVersion + "&encoding=json" + } + + // Connect to the Gateway + s.log(LogInformational, "connecting to gateway %s", s.gateway) + header := http.Header{} + header.Add("accept-encoding", "zlib") + s.wsConn, _, err = websocket.DefaultDialer.Dial(s.gateway, header) + if err != nil { + s.log(LogWarning, "error connecting to gateway %s, %s", s.gateway, err) + s.gateway = "" // clear cached gateway + s.wsConn = nil // Just to be safe. + return err + } + + defer func() { + // because of this, all code below must set err to the error + // when exiting with an error :) Maybe someone has a better + // way :) + if err != nil { + s.wsConn.Close() + s.wsConn = nil + } + }() + + // The first response from Discord should be an Op 10 (Hello) Packet. + // When processed by onEvent the heartbeat goroutine will be started. + mt, m, err := s.wsConn.ReadMessage() + if err != nil { + return err + } + e, err := s.onEvent(mt, m) + if err != nil { + return err + } + if e.Operation != 10 { + err = fmt.Errorf("expecting Op 10, got Op %d instead", e.Operation) + return err + } + s.log(LogInformational, "Op 10 Hello Packet received from Discord") + s.LastHeartbeatAck = time.Now().UTC() + var h helloOp + if err = json.Unmarshal(e.RawData, &h); err != nil { + err = fmt.Errorf("error unmarshalling helloOp, %s", err) + return err + } + + // Now we send either an Op 2 Identity if this is a brand new + // connection or Op 6 Resume if we are resuming an existing connection. + sequence := atomic.LoadInt64(s.sequence) + if s.sessionID == "" && sequence == 0 { + + // Send Op 2 Identity Packet + err = s.identify() + if err != nil { + err = fmt.Errorf("error sending identify packet to gateway, %s, %s", s.gateway, err) + return err + } + + } else { + + // Send Op 6 Resume Packet + p := resumePacket{} + p.Op = 6 + p.Data.Token = s.Token + p.Data.SessionID = s.sessionID + p.Data.Sequence = sequence + + s.log(LogInformational, "sending resume packet to gateway") + s.wsMutex.Lock() + err = s.wsConn.WriteJSON(p) + s.wsMutex.Unlock() + if err != nil { + err = fmt.Errorf("error sending gateway resume packet, %s, %s", s.gateway, err) + return err + } + + } + + // A basic state is a hard requirement for Voice. + // We create it here so the below READY/RESUMED packet can populate + // the state :) + // XXX: Move to New() func? + if s.State == nil { + state := NewState() + state.TrackChannels = false + state.TrackEmojis = false + state.TrackMembers = false + state.TrackRoles = false + state.TrackVoice = false + s.State = state + } + + // Now Discord should send us a READY or RESUMED packet. + mt, m, err = s.wsConn.ReadMessage() + if err != nil { + return err + } + e, err = s.onEvent(mt, m) + if err != nil { + return err + } + if e.Type != `READY` && e.Type != `RESUMED` { + // This is not fatal, but it does not follow their API documentation. + s.log(LogWarning, "Expected READY/RESUMED, instead got:\n%#v\n", e) + } + s.log(LogInformational, "First Packet:\n%#v\n", e) + + s.log(LogInformational, "We are now connected to Discord, emitting connect event") + s.handleEvent(connectEventType, &Connect{}) + + // A VoiceConnections map is a hard requirement for Voice. + // XXX: can this be moved to when opening a voice connection? + if s.VoiceConnections == nil { + s.log(LogInformational, "creating new VoiceConnections map") + s.VoiceConnections = make(map[string]*VoiceConnection) + } + + // Create listening chan outside of listen, as it needs to happen inside the + // mutex lock and needs to exist before calling heartbeat and listen + // go rountines. + s.listening = make(chan interface{}) + + // Start sending heartbeats and reading messages from Discord. + go s.heartbeat(s.wsConn, s.listening, h.HeartbeatInterval) + go s.listen(s.wsConn, s.listening) + + s.log(LogInformational, "exiting") + return nil +} + +// listen polls the websocket connection for events, it will stop when the +// listening channel is closed, or an error occurs. +func (s *Session) listen(wsConn *websocket.Conn, listening <-chan interface{}) { + + s.log(LogInformational, "called") + + for { + + messageType, message, err := wsConn.ReadMessage() + + if err != nil { + + // Detect if we have been closed manually. If a Close() has already + // happened, the websocket we are listening on will be different to + // the current session. + s.RLock() + sameConnection := s.wsConn == wsConn + s.RUnlock() + + if sameConnection { + + s.log(LogWarning, "error reading from gateway %s websocket, %s", s.gateway, err) + // There has been an error reading, close the websocket so that + // OnDisconnect event is emitted. + err := s.Close() + if err != nil { + s.log(LogWarning, "error closing session connection, %s", err) + } + + s.log(LogInformational, "calling reconnect() now") + s.reconnect() + } + + return + } + + select { + + case <-listening: + return + + default: + s.onEvent(messageType, message) + + } + } +} + +type heartbeatOp struct { + Op int `json:"op"` + Data int64 `json:"d"` +} + +type helloOp struct { + HeartbeatInterval time.Duration `json:"heartbeat_interval"` + Trace []string `json:"_trace"` +} + +// FailedHeartbeatAcks is the Number of heartbeat intervals to wait until forcing a connection restart. +const FailedHeartbeatAcks time.Duration = 5 * time.Millisecond + +// heartbeat sends regular heartbeats to Discord so it knows the client +// is still connected. If you do not send these heartbeats Discord will +// disconnect the websocket connection after a few seconds. +func (s *Session) heartbeat(wsConn *websocket.Conn, listening <-chan interface{}, heartbeatIntervalMsec time.Duration) { + + s.log(LogInformational, "called") + + if listening == nil || wsConn == nil { + return + } + + var err error + ticker := time.NewTicker(heartbeatIntervalMsec * time.Millisecond) + defer ticker.Stop() + + for { + s.RLock() + last := s.LastHeartbeatAck + s.RUnlock() + sequence := atomic.LoadInt64(s.sequence) + s.log(LogInformational, "sending gateway websocket heartbeat seq %d", sequence) + s.wsMutex.Lock() + err = wsConn.WriteJSON(heartbeatOp{1, sequence}) + s.wsMutex.Unlock() + if err != nil || time.Now().UTC().Sub(last) > (heartbeatIntervalMsec*FailedHeartbeatAcks) { + if err != nil { + s.log(LogError, "error sending heartbeat to gateway %s, %s", s.gateway, err) + } else { + s.log(LogError, "haven't gotten a heartbeat ACK in %v, triggering a reconnection", time.Now().UTC().Sub(last)) + } + s.Close() + s.reconnect() + return + } + s.Lock() + s.DataReady = true + s.Unlock() + + select { + case <-ticker.C: + // continue loop and send heartbeat + case <-listening: + return + } + } +} + +// UpdateStatusData ia provided to UpdateStatusComplex() +type UpdateStatusData struct { + IdleSince *int `json:"since"` + Game *Game `json:"game"` + AFK bool `json:"afk"` + Status string `json:"status"` +} + +type updateStatusOp struct { + Op int `json:"op"` + Data UpdateStatusData `json:"d"` +} + +// UpdateStreamingStatus is used to update the user's streaming status. +// If idle>0 then set status to idle. +// If game!="" then set game. +// If game!="" and url!="" then set the status type to streaming with the URL set. +// if otherwise, set status to active, and no game. +func (s *Session) UpdateStreamingStatus(idle int, game string, url string) (err error) { + + s.log(LogInformational, "called") + + usd := UpdateStatusData{ + Status: "online", + } + + if idle > 0 { + usd.IdleSince = &idle + } + + if game != "" { + gameType := GameTypeGame + if url != "" { + gameType = GameTypeStreaming + } + usd.Game = &Game{ + Name: game, + Type: gameType, + URL: url, + } + } + + return s.UpdateStatusComplex(usd) +} + +// UpdateStatusComplex allows for sending the raw status update data untouched by discordgo. +func (s *Session) UpdateStatusComplex(usd UpdateStatusData) (err error) { + + s.RLock() + defer s.RUnlock() + if s.wsConn == nil { + return ErrWSNotFound + } + + s.wsMutex.Lock() + err = s.wsConn.WriteJSON(updateStatusOp{3, usd}) + s.wsMutex.Unlock() + + return +} + +// UpdateStatus is used to update the user's status. +// If idle>0 then set status to idle. +// If game!="" then set game. +// if otherwise, set status to active, and no game. +func (s *Session) UpdateStatus(idle int, game string) (err error) { + return s.UpdateStreamingStatus(idle, game, "") +} + +type requestGuildMembersData struct { + GuildID string `json:"guild_id"` + Query string `json:"query"` + Limit int `json:"limit"` +} + +type requestGuildMembersOp struct { + Op int `json:"op"` + Data requestGuildMembersData `json:"d"` +} + +// RequestGuildMembers requests guild members from the gateway +// The gateway responds with GuildMembersChunk events +// guildID : The ID of the guild to request members of +// query : String that username starts with, leave empty to return all members +// limit : Max number of items to return, or 0 to request all members matched +func (s *Session) RequestGuildMembers(guildID, query string, limit int) (err error) { + s.log(LogInformational, "called") + + s.RLock() + defer s.RUnlock() + if s.wsConn == nil { + return ErrWSNotFound + } + + data := requestGuildMembersData{ + GuildID: guildID, + Query: query, + Limit: limit, + } + + s.wsMutex.Lock() + err = s.wsConn.WriteJSON(requestGuildMembersOp{8, data}) + s.wsMutex.Unlock() + + return +} + +// onEvent is the "event handler" for all messages received on the +// Discord Gateway API websocket connection. +// +// If you use the AddHandler() function to register a handler for a +// specific event this function will pass the event along to that handler. +// +// If you use the AddHandler() function to register a handler for the +// "OnEvent" event then all events will be passed to that handler. +func (s *Session) onEvent(messageType int, message []byte) (*Event, error) { + + var err error + var reader io.Reader + reader = bytes.NewBuffer(message) + + // If this is a compressed message, uncompress it. + if messageType == websocket.BinaryMessage { + + z, err2 := zlib.NewReader(reader) + if err2 != nil { + s.log(LogError, "error uncompressing websocket message, %s", err) + return nil, err2 + } + + defer func() { + err3 := z.Close() + if err3 != nil { + s.log(LogWarning, "error closing zlib, %s", err) + } + }() + + reader = z + } + + // Decode the event into an Event struct. + var e *Event + decoder := json.NewDecoder(reader) + if err = decoder.Decode(&e); err != nil { + s.log(LogError, "error decoding websocket message, %s", err) + return e, err + } + + s.log(LogDebug, "Op: %d, Seq: %d, Type: %s, Data: %s\n\n", e.Operation, e.Sequence, e.Type, string(e.RawData)) + + // Ping request. + // Must respond with a heartbeat packet within 5 seconds + if e.Operation == 1 { + s.log(LogInformational, "sending heartbeat in response to Op1") + s.wsMutex.Lock() + err = s.wsConn.WriteJSON(heartbeatOp{1, atomic.LoadInt64(s.sequence)}) + s.wsMutex.Unlock() + if err != nil { + s.log(LogError, "error sending heartbeat in response to Op1") + return e, err + } + + return e, nil + } + + // Reconnect + // Must immediately disconnect from gateway and reconnect to new gateway. + if e.Operation == 7 { + s.log(LogInformational, "Closing and reconnecting in response to Op7") + s.Close() + s.reconnect() + return e, nil + } + + // Invalid Session + // Must respond with a Identify packet. + if e.Operation == 9 { + + s.log(LogInformational, "sending identify packet to gateway in response to Op9") + + err = s.identify() + if err != nil { + s.log(LogWarning, "error sending gateway identify packet, %s, %s", s.gateway, err) + return e, err + } + + return e, nil + } + + if e.Operation == 10 { + // Op10 is handled by Open() + return e, nil + } + + if e.Operation == 11 { + s.Lock() + s.LastHeartbeatAck = time.Now().UTC() + s.Unlock() + s.log(LogInformational, "got heartbeat ACK") + return e, nil + } + + // Do not try to Dispatch a non-Dispatch Message + if e.Operation != 0 { + // But we probably should be doing something with them. + // TEMP + s.log(LogWarning, "unknown Op: %d, Seq: %d, Type: %s, Data: %s, message: %s", e.Operation, e.Sequence, e.Type, string(e.RawData), string(message)) + return e, nil + } + + // Store the message sequence + atomic.StoreInt64(s.sequence, e.Sequence) + + // Map event to registered event handlers and pass it along to any registered handlers. + if eh, ok := registeredInterfaceProviders[e.Type]; ok { + e.Struct = eh.New() + + // Attempt to unmarshal our event. + if err = json.Unmarshal(e.RawData, e.Struct); err != nil { + s.log(LogError, "error unmarshalling %s event, %s", e.Type, err) + } + + // Send event to any registered event handlers for it's type. + // Because the above doesn't cancel this, in case of an error + // the struct could be partially populated or at default values. + // However, most errors are due to a single field and I feel + // it's better to pass along what we received than nothing at all. + // TODO: Think about that decision :) + // Either way, READY events must fire, even with errors. + s.handleEvent(e.Type, e.Struct) + } else { + s.log(LogWarning, "unknown event: Op: %d, Seq: %d, Type: %s, Data: %s", e.Operation, e.Sequence, e.Type, string(e.RawData)) + } + + // For legacy reasons, we send the raw event also, this could be useful for handling unknown events. + s.handleEvent(eventEventType, e) + + return e, nil +} + +// ------------------------------------------------------------------------------------------------ +// Code related to voice connections that initiate over the data websocket +// ------------------------------------------------------------------------------------------------ + +type voiceChannelJoinData struct { + GuildID *string `json:"guild_id"` + ChannelID *string `json:"channel_id"` + SelfMute bool `json:"self_mute"` + SelfDeaf bool `json:"self_deaf"` +} + +type voiceChannelJoinOp struct { + Op int `json:"op"` + Data voiceChannelJoinData `json:"d"` +} + +// ChannelVoiceJoin joins the session user to a voice channel. +// +// gID : Guild ID of the channel to join. +// cID : Channel ID of the channel to join. +// mute : If true, you will be set to muted upon joining. +// deaf : If true, you will be set to deafened upon joining. +func (s *Session) ChannelVoiceJoin(gID, cID string, mute, deaf bool) (voice *VoiceConnection, err error) { + + s.log(LogInformational, "called") + + s.RLock() + voice, _ = s.VoiceConnections[gID] + s.RUnlock() + + if voice == nil { + voice = &VoiceConnection{} + s.Lock() + s.VoiceConnections[gID] = voice + s.Unlock() + } + + voice.Lock() + voice.GuildID = gID + voice.ChannelID = cID + voice.deaf = deaf + voice.mute = mute + voice.session = s + voice.Unlock() + + // Send the request to Discord that we want to join the voice channel + data := voiceChannelJoinOp{4, voiceChannelJoinData{&gID, &cID, mute, deaf}} + s.wsMutex.Lock() + err = s.wsConn.WriteJSON(data) + s.wsMutex.Unlock() + if err != nil { + return + } + + // doesn't exactly work perfect yet.. TODO + err = voice.waitUntilConnected() + if err != nil { + s.log(LogWarning, "error waiting for voice to connect, %s", err) + voice.Close() + return + } + + return +} + +// onVoiceStateUpdate handles Voice State Update events on the data websocket. +func (s *Session) onVoiceStateUpdate(st *VoiceStateUpdate) { + + // If we don't have a connection for the channel, don't bother + if st.ChannelID == "" { + return + } + + // Check if we have a voice connection to update + s.RLock() + voice, exists := s.VoiceConnections[st.GuildID] + s.RUnlock() + if !exists { + return + } + + // We only care about events that are about us. + if s.State.User.ID != st.UserID { + return + } + + // Store the SessionID for later use. + voice.Lock() + voice.UserID = st.UserID + voice.sessionID = st.SessionID + voice.ChannelID = st.ChannelID + voice.Unlock() +} + +// onVoiceServerUpdate handles the Voice Server Update data websocket event. +// +// This is also fired if the Guild's voice region changes while connected +// to a voice channel. In that case, need to re-establish connection to +// the new region endpoint. +func (s *Session) onVoiceServerUpdate(st *VoiceServerUpdate) { + + s.log(LogInformational, "called") + + s.RLock() + voice, exists := s.VoiceConnections[st.GuildID] + s.RUnlock() + + // If no VoiceConnection exists, just skip this + if !exists { + return + } + + // If currently connected to voice ws/udp, then disconnect. + // Has no effect if not connected. + voice.Close() + + // Store values for later use + voice.Lock() + voice.token = st.Token + voice.endpoint = st.Endpoint + voice.GuildID = st.GuildID + voice.Unlock() + + // Open a connection to the voice server + err := voice.open() + if err != nil { + s.log(LogError, "onVoiceServerUpdate voice.open, %s", err) + } +} + +type identifyProperties struct { + OS string `json:"$os"` + Browser string `json:"$browser"` + Device string `json:"$device"` + Referer string `json:"$referer"` + ReferringDomain string `json:"$referring_domain"` +} + +type identifyData struct { + Token string `json:"token"` + Properties identifyProperties `json:"properties"` + LargeThreshold int `json:"large_threshold"` + Compress bool `json:"compress"` + Shard *[2]int `json:"shard,omitempty"` +} + +type identifyOp struct { + Op int `json:"op"` + Data identifyData `json:"d"` +} + +// identify sends the identify packet to the gateway +func (s *Session) identify() error { + + properties := identifyProperties{runtime.GOOS, + "Discordgo v" + VERSION, + "", + "", + "", + } + + data := identifyData{s.Token, + properties, + 250, + s.Compress, + nil, + } + + if s.ShardCount > 1 { + + if s.ShardID >= s.ShardCount { + return ErrWSShardBounds + } + + data.Shard = &[2]int{s.ShardID, s.ShardCount} + } + + op := identifyOp{2, data} + + s.wsMutex.Lock() + err := s.wsConn.WriteJSON(op) + s.wsMutex.Unlock() + if err != nil { + return err + } + + return nil +} + +func (s *Session) reconnect() { + + s.log(LogInformational, "called") + + var err error + + if s.ShouldReconnectOnError { + + wait := time.Duration(1) + + for { + s.log(LogInformational, "trying to reconnect to gateway") + + err = s.Open() + if err == nil { + s.log(LogInformational, "successfully reconnected to gateway") + + // I'm not sure if this is actually needed. + // if the gw reconnect works properly, voice should stay alive + // However, there seems to be cases where something "weird" + // happens. So we're doing this for now just to improve + // stability in those edge cases. + s.RLock() + defer s.RUnlock() + for _, v := range s.VoiceConnections { + + s.log(LogInformational, "reconnecting voice connection to guild %s", v.GuildID) + go v.reconnect() + + // This is here just to prevent violently spamming the + // voice reconnects + time.Sleep(1 * time.Second) + + } + return + } + + // Certain race conditions can call reconnect() twice. If this happens, we + // just break out of the reconnect loop + if err == ErrWSAlreadyOpen { + s.log(LogInformational, "Websocket already exists, no need to reconnect") + return + } + + s.log(LogError, "error reconnecting to gateway, %s", err) + + <-time.After(wait * time.Second) + wait *= 2 + if wait > 600 { + wait = 600 + } + } + } +} + +// Close closes a websocket and stops all listening/heartbeat goroutines. +// TODO: Add support for Voice WS/UDP connections +func (s *Session) Close() (err error) { + + s.log(LogInformational, "called") + s.Lock() + + s.DataReady = false + + if s.listening != nil { + s.log(LogInformational, "closing listening channel") + close(s.listening) + s.listening = nil + } + + // TODO: Close all active Voice Connections too + // this should force stop any reconnecting voice channels too + + if s.wsConn != nil { + + s.log(LogInformational, "sending close frame") + // To cleanly close a connection, a client should send a close + // frame and wait for the server to close the connection. + s.wsMutex.Lock() + err := s.wsConn.WriteMessage(websocket.CloseMessage, websocket.FormatCloseMessage(websocket.CloseNormalClosure, "")) + s.wsMutex.Unlock() + if err != nil { + s.log(LogInformational, "error closing websocket, %s", err) + } + + // TODO: Wait for Discord to actually close the connection. + time.Sleep(1 * time.Second) + + s.log(LogInformational, "closing gateway websocket") + err = s.wsConn.Close() + if err != nil { + s.log(LogInformational, "error closing websocket, %s", err) + } + + s.wsConn = nil + } + + s.Unlock() + + s.log(LogInformational, "emit disconnect event") + s.handleEvent(disconnectEventType, &Disconnect{}) + + return +} |