summaryrefslogtreecommitdiffstats
path: root/vendor/github.com/lrstanley/girc/conn.go
diff options
context:
space:
mode:
Diffstat (limited to 'vendor/github.com/lrstanley/girc/conn.go')
-rw-r--r--vendor/github.com/lrstanley/girc/conn.go208
1 files changed, 119 insertions, 89 deletions
diff --git a/vendor/github.com/lrstanley/girc/conn.go b/vendor/github.com/lrstanley/girc/conn.go
index 626a6dca..c32eca69 100644
--- a/vendor/github.com/lrstanley/girc/conn.go
+++ b/vendor/github.com/lrstanley/girc/conn.go
@@ -12,6 +12,8 @@ import (
"net"
"sync"
"time"
+
+ "github.com/lrstanley/girc/internal/ctxgroup"
)
// Messages are delimited with CR and LF line endings, we're using the last
@@ -142,17 +144,44 @@ type ErrParseEvent struct {
func (e ErrParseEvent) Error() string { return "unable to parse event: " + e.Line }
-func (c *ircConn) decode() (event *Event, err error) {
- line, err := c.io.ReadString(delim)
- if err != nil {
- return nil, err
- }
+type decodedEvent struct {
+ event *Event
+ err error
+}
- if event = ParseEvent(line); event == nil {
- return nil, ErrParseEvent{line}
- }
+func (c *ircConn) decode() <-chan decodedEvent {
+ ch := make(chan decodedEvent)
+
+ go func() {
+ defer close(ch)
+
+ line, err := c.io.ReadString(delim)
+ if err != nil {
+ select {
+ case ch <- decodedEvent{err: err}:
+ default:
+ }
+
+ return
+ }
+
+ event := ParseEvent(line)
+ if event == nil {
+ select {
+ case ch <- decodedEvent{err: ErrParseEvent{Line: line}}:
+ default:
+ }
+
+ return
+ }
- return event, nil
+ select {
+ case ch <- decodedEvent{event: event}:
+ default:
+ }
+ }()
+
+ return ch
}
func (c *ircConn) encode(event *Event) error {
@@ -291,20 +320,17 @@ startConn:
} else {
c.conn = newMockConn(mock)
}
+ c.mu.Unlock()
var ctx context.Context
ctx, c.stop = context.WithCancel(context.Background())
- c.mu.Unlock()
- errs := make(chan error, 4)
- var wg sync.WaitGroup
- // 4 being the number of goroutines we need to finish when this function
- // returns.
- wg.Add(4)
- go c.execLoop(ctx, errs, &wg)
- go c.readLoop(ctx, errs, &wg)
- go c.sendLoop(ctx, errs, &wg)
- go c.pingLoop(ctx, errs, &wg)
+ group := ctxgroup.New(ctx)
+
+ group.Go(c.execLoop)
+ group.Go(c.readLoop)
+ group.Go(c.sendLoop)
+ group.Go(c.pingLoop)
// Passwords first.
@@ -338,16 +364,15 @@ startConn:
c.RunHandlers(&Event{Command: INITIALIZED, Params: []string{addr}})
// Wait for the first error.
- var result error
- select {
- case <-ctx.Done():
+ err := group.Wait()
+ if err != nil {
+ c.debug.Printf("received error, beginning cleanup: %v", err)
+ } else {
if !c.state.sts.beginUpgrade {
c.debug.Print("received request to close, beginning clean up")
}
+
c.RunHandlers(&Event{Command: CLOSED, Params: []string{addr}})
- case err := <-errs:
- c.debug.Printf("received error, beginning cleanup: %v", err)
- result = err
}
// Make sure that the connection is closed if not already.
@@ -363,20 +388,13 @@ startConn:
c.RunHandlers(&Event{Command: DISCONNECTED, Params: []string{addr}})
- // Once we have our error/result, let all other functions know we're done.
- c.debug.Print("waiting for all routines to finish")
-
- // Wait for all goroutines to finish.
- wg.Wait()
- close(errs)
-
// This helps ensure that the end user isn't improperly using the client
// more than once. If they want to do this, they should be using multiple
// clients, not multiple instances of Connect().
c.mu.Lock()
c.conn = nil
- if result == nil {
+ if err == nil {
if c.state.sts.beginUpgrade {
c.state.sts.beginUpgrade = false
c.mu.Unlock()
@@ -389,76 +407,85 @@ startConn:
}
c.mu.Unlock()
- return result
+ return err
}
// readLoop sets a timeout of 300 seconds, and then attempts to read from the
// IRC server. If there is an error, it calls Reconnect.
-func (c *Client) readLoop(ctx context.Context, errs chan error, wg *sync.WaitGroup) {
+func (c *Client) readLoop(ctx context.Context) error {
c.debug.Print("starting readLoop")
defer c.debug.Print("closing readLoop")
- var event *Event
- var err error
+ var de decodedEvent
for {
select {
case <-ctx.Done():
- wg.Done()
- return
+ return nil
default:
_ = c.conn.sock.SetReadDeadline(time.Now().Add(300 * time.Second))
- event, err = c.conn.decode()
- if err != nil {
- errs <- err
- wg.Done()
- return
+
+ select {
+ case <-ctx.Done():
+ return nil
+ case de = <-c.conn.decode():
+ }
+
+ if de.err != nil {
+ return de.err
}
// Check if it's an echo-message.
if !c.Config.disableTracking {
- event.Echo = (event.Command == PRIVMSG || event.Command == NOTICE) &&
- event.Source != nil && event.Source.ID() == c.GetID()
+ de.event.Echo = (de.event.Command == PRIVMSG || de.event.Command == NOTICE) &&
+ de.event.Source != nil && de.event.Source.ID() == c.GetID()
}
- c.rx <- event
+ c.receive(de.event)
}
}
}
-// Send sends an event to the server. Use Client.RunHandlers() if you are
-// simply looking to trigger handlers with an event.
+// Send sends an event to the server. Send will split events if the event is longer
+// than what the server supports, and is an event that supports splitting. Use
+// Client.RunHandlers() if you are simply looking to trigger handlers with an event.
func (c *Client) Send(event *Event) {
var delay time.Duration
- if !c.Config.AllowFlood {
- c.mu.RLock()
-
- // Drop the event early as we're disconnected, this way we don't have to wait
- // the (potentially long) rate limit delay before dropping.
- if c.conn == nil {
- c.debugLogEvent(event, true)
- c.mu.RUnlock()
- return
- }
-
- c.conn.mu.Lock()
- delay = c.conn.rate(event.Len())
- c.conn.mu.Unlock()
- c.mu.RUnlock()
- }
-
if c.Config.GlobalFormat && len(event.Params) > 0 && event.Params[len(event.Params)-1] != "" &&
(event.Command == PRIVMSG || event.Command == TOPIC || event.Command == NOTICE) {
event.Params[len(event.Params)-1] = Fmt(event.Params[len(event.Params)-1])
}
- <-time.After(delay)
- c.write(event)
+ var events []*Event
+ events = event.split(c.MaxEventLength())
+
+ for _, e := range events {
+ if !c.Config.AllowFlood {
+ c.mu.RLock()
+
+ // Drop the event early as we're disconnected, this way we don't have to wait
+ // the (potentially long) rate limit delay before dropping.
+ if c.conn == nil {
+ c.debugLogEvent(e, true)
+ c.mu.RUnlock()
+ return
+ }
+
+ c.conn.mu.Lock()
+ delay = c.conn.rate(e.Len())
+ c.conn.mu.Unlock()
+ c.mu.RUnlock()
+ }
+
+ <-time.After(delay)
+ c.write(e)
+ }
}
// write is the lower level function to write an event. It does not have a
-// write-delay when sending events.
+// write-delay when sending events. write will timeout after 30s if the event
+// can't be sent.
func (c *Client) write(event *Event) {
c.mu.RLock()
defer c.mu.RUnlock()
@@ -468,7 +495,19 @@ func (c *Client) write(event *Event) {
c.debugLogEvent(event, true)
return
}
- c.tx <- event
+
+ t := time.NewTimer(30 * time.Second)
+ defer func() {
+ if !t.Stop() {
+ <-t.C
+ }
+ }()
+
+ select {
+ case c.tx <- event:
+ case <-t.C:
+ c.debugLogEvent(event, true)
+ }
}
// rate allows limiting events based on how frequent the event is being sent,
@@ -487,7 +526,7 @@ func (c *ircConn) rate(chars int) time.Duration {
return 0
}
-func (c *Client) sendLoop(ctx context.Context, errs chan error, wg *sync.WaitGroup) {
+func (c *Client) sendLoop(ctx context.Context) error {
c.debug.Print("starting sendLoop")
defer c.debug.Print("closing sendLoop")
@@ -537,18 +576,14 @@ func (c *Client) sendLoop(ctx context.Context, errs chan error, wg *sync.WaitGro
if event.Command == QUIT {
c.Close()
- wg.Done()
- return
+ return nil
}
if err != nil {
- errs <- err
- wg.Done()
- return
+ return err
}
case <-ctx.Done():
- wg.Done()
- return
+ return nil
}
}
}
@@ -568,11 +603,10 @@ type ErrTimedOut struct {
func (ErrTimedOut) Error() string { return "timed out waiting for a requested PING response" }
-func (c *Client) pingLoop(ctx context.Context, errs chan error, wg *sync.WaitGroup) {
+func (c *Client) pingLoop(ctx context.Context) error {
// Don't run the pingLoop if they want to disable it.
if c.Config.PingDelay <= 0 {
- wg.Done()
- return
+ return nil
}
c.debug.Print("starting pingLoop")
@@ -604,9 +638,8 @@ func (c *Client) pingLoop(ctx context.Context, errs chan error, wg *sync.WaitGro
}
c.conn.mu.RLock()
- if pingSent && time.Since(c.conn.lastPong) > c.Config.PingDelay+(60*time.Second) {
- // It's 60 seconds over what out ping delay is, connection
- // has probably dropped.
+ if pingSent && time.Since(c.conn.lastPong) > c.Config.PingDelay+c.Config.PingTimeout {
+ // PingTimeout exceeded, connection has probably dropped.
err := ErrTimedOut{
TimeSinceSuccess: time.Since(c.conn.lastPong),
LastPong: c.conn.lastPong,
@@ -615,9 +648,7 @@ func (c *Client) pingLoop(ctx context.Context, errs chan error, wg *sync.WaitGro
}
c.conn.mu.RUnlock()
- errs <- err
- wg.Done()
- return
+ return err
}
c.conn.mu.RUnlock()
@@ -628,8 +659,7 @@ func (c *Client) pingLoop(ctx context.Context, errs chan error, wg *sync.WaitGro
c.Cmd.Ping(fmt.Sprintf("%d", time.Now().UnixNano()))
pingSent = true
case <-ctx.Done():
- wg.Done()
- return
+ return nil
}
}
}