diff options
Diffstat (limited to 'vendor/github.com/lrstanley/girc/conn.go')
-rw-r--r-- | vendor/github.com/lrstanley/girc/conn.go | 208 |
1 files changed, 119 insertions, 89 deletions
diff --git a/vendor/github.com/lrstanley/girc/conn.go b/vendor/github.com/lrstanley/girc/conn.go index 626a6dca..c32eca69 100644 --- a/vendor/github.com/lrstanley/girc/conn.go +++ b/vendor/github.com/lrstanley/girc/conn.go @@ -12,6 +12,8 @@ import ( "net" "sync" "time" + + "github.com/lrstanley/girc/internal/ctxgroup" ) // Messages are delimited with CR and LF line endings, we're using the last @@ -142,17 +144,44 @@ type ErrParseEvent struct { func (e ErrParseEvent) Error() string { return "unable to parse event: " + e.Line } -func (c *ircConn) decode() (event *Event, err error) { - line, err := c.io.ReadString(delim) - if err != nil { - return nil, err - } +type decodedEvent struct { + event *Event + err error +} - if event = ParseEvent(line); event == nil { - return nil, ErrParseEvent{line} - } +func (c *ircConn) decode() <-chan decodedEvent { + ch := make(chan decodedEvent) + + go func() { + defer close(ch) + + line, err := c.io.ReadString(delim) + if err != nil { + select { + case ch <- decodedEvent{err: err}: + default: + } + + return + } + + event := ParseEvent(line) + if event == nil { + select { + case ch <- decodedEvent{err: ErrParseEvent{Line: line}}: + default: + } + + return + } - return event, nil + select { + case ch <- decodedEvent{event: event}: + default: + } + }() + + return ch } func (c *ircConn) encode(event *Event) error { @@ -291,20 +320,17 @@ startConn: } else { c.conn = newMockConn(mock) } + c.mu.Unlock() var ctx context.Context ctx, c.stop = context.WithCancel(context.Background()) - c.mu.Unlock() - errs := make(chan error, 4) - var wg sync.WaitGroup - // 4 being the number of goroutines we need to finish when this function - // returns. - wg.Add(4) - go c.execLoop(ctx, errs, &wg) - go c.readLoop(ctx, errs, &wg) - go c.sendLoop(ctx, errs, &wg) - go c.pingLoop(ctx, errs, &wg) + group := ctxgroup.New(ctx) + + group.Go(c.execLoop) + group.Go(c.readLoop) + group.Go(c.sendLoop) + group.Go(c.pingLoop) // Passwords first. @@ -338,16 +364,15 @@ startConn: c.RunHandlers(&Event{Command: INITIALIZED, Params: []string{addr}}) // Wait for the first error. - var result error - select { - case <-ctx.Done(): + err := group.Wait() + if err != nil { + c.debug.Printf("received error, beginning cleanup: %v", err) + } else { if !c.state.sts.beginUpgrade { c.debug.Print("received request to close, beginning clean up") } + c.RunHandlers(&Event{Command: CLOSED, Params: []string{addr}}) - case err := <-errs: - c.debug.Printf("received error, beginning cleanup: %v", err) - result = err } // Make sure that the connection is closed if not already. @@ -363,20 +388,13 @@ startConn: c.RunHandlers(&Event{Command: DISCONNECTED, Params: []string{addr}}) - // Once we have our error/result, let all other functions know we're done. - c.debug.Print("waiting for all routines to finish") - - // Wait for all goroutines to finish. - wg.Wait() - close(errs) - // This helps ensure that the end user isn't improperly using the client // more than once. If they want to do this, they should be using multiple // clients, not multiple instances of Connect(). c.mu.Lock() c.conn = nil - if result == nil { + if err == nil { if c.state.sts.beginUpgrade { c.state.sts.beginUpgrade = false c.mu.Unlock() @@ -389,76 +407,85 @@ startConn: } c.mu.Unlock() - return result + return err } // readLoop sets a timeout of 300 seconds, and then attempts to read from the // IRC server. If there is an error, it calls Reconnect. -func (c *Client) readLoop(ctx context.Context, errs chan error, wg *sync.WaitGroup) { +func (c *Client) readLoop(ctx context.Context) error { c.debug.Print("starting readLoop") defer c.debug.Print("closing readLoop") - var event *Event - var err error + var de decodedEvent for { select { case <-ctx.Done(): - wg.Done() - return + return nil default: _ = c.conn.sock.SetReadDeadline(time.Now().Add(300 * time.Second)) - event, err = c.conn.decode() - if err != nil { - errs <- err - wg.Done() - return + + select { + case <-ctx.Done(): + return nil + case de = <-c.conn.decode(): + } + + if de.err != nil { + return de.err } // Check if it's an echo-message. if !c.Config.disableTracking { - event.Echo = (event.Command == PRIVMSG || event.Command == NOTICE) && - event.Source != nil && event.Source.ID() == c.GetID() + de.event.Echo = (de.event.Command == PRIVMSG || de.event.Command == NOTICE) && + de.event.Source != nil && de.event.Source.ID() == c.GetID() } - c.rx <- event + c.receive(de.event) } } } -// Send sends an event to the server. Use Client.RunHandlers() if you are -// simply looking to trigger handlers with an event. +// Send sends an event to the server. Send will split events if the event is longer +// than what the server supports, and is an event that supports splitting. Use +// Client.RunHandlers() if you are simply looking to trigger handlers with an event. func (c *Client) Send(event *Event) { var delay time.Duration - if !c.Config.AllowFlood { - c.mu.RLock() - - // Drop the event early as we're disconnected, this way we don't have to wait - // the (potentially long) rate limit delay before dropping. - if c.conn == nil { - c.debugLogEvent(event, true) - c.mu.RUnlock() - return - } - - c.conn.mu.Lock() - delay = c.conn.rate(event.Len()) - c.conn.mu.Unlock() - c.mu.RUnlock() - } - if c.Config.GlobalFormat && len(event.Params) > 0 && event.Params[len(event.Params)-1] != "" && (event.Command == PRIVMSG || event.Command == TOPIC || event.Command == NOTICE) { event.Params[len(event.Params)-1] = Fmt(event.Params[len(event.Params)-1]) } - <-time.After(delay) - c.write(event) + var events []*Event + events = event.split(c.MaxEventLength()) + + for _, e := range events { + if !c.Config.AllowFlood { + c.mu.RLock() + + // Drop the event early as we're disconnected, this way we don't have to wait + // the (potentially long) rate limit delay before dropping. + if c.conn == nil { + c.debugLogEvent(e, true) + c.mu.RUnlock() + return + } + + c.conn.mu.Lock() + delay = c.conn.rate(e.Len()) + c.conn.mu.Unlock() + c.mu.RUnlock() + } + + <-time.After(delay) + c.write(e) + } } // write is the lower level function to write an event. It does not have a -// write-delay when sending events. +// write-delay when sending events. write will timeout after 30s if the event +// can't be sent. func (c *Client) write(event *Event) { c.mu.RLock() defer c.mu.RUnlock() @@ -468,7 +495,19 @@ func (c *Client) write(event *Event) { c.debugLogEvent(event, true) return } - c.tx <- event + + t := time.NewTimer(30 * time.Second) + defer func() { + if !t.Stop() { + <-t.C + } + }() + + select { + case c.tx <- event: + case <-t.C: + c.debugLogEvent(event, true) + } } // rate allows limiting events based on how frequent the event is being sent, @@ -487,7 +526,7 @@ func (c *ircConn) rate(chars int) time.Duration { return 0 } -func (c *Client) sendLoop(ctx context.Context, errs chan error, wg *sync.WaitGroup) { +func (c *Client) sendLoop(ctx context.Context) error { c.debug.Print("starting sendLoop") defer c.debug.Print("closing sendLoop") @@ -537,18 +576,14 @@ func (c *Client) sendLoop(ctx context.Context, errs chan error, wg *sync.WaitGro if event.Command == QUIT { c.Close() - wg.Done() - return + return nil } if err != nil { - errs <- err - wg.Done() - return + return err } case <-ctx.Done(): - wg.Done() - return + return nil } } } @@ -568,11 +603,10 @@ type ErrTimedOut struct { func (ErrTimedOut) Error() string { return "timed out waiting for a requested PING response" } -func (c *Client) pingLoop(ctx context.Context, errs chan error, wg *sync.WaitGroup) { +func (c *Client) pingLoop(ctx context.Context) error { // Don't run the pingLoop if they want to disable it. if c.Config.PingDelay <= 0 { - wg.Done() - return + return nil } c.debug.Print("starting pingLoop") @@ -604,9 +638,8 @@ func (c *Client) pingLoop(ctx context.Context, errs chan error, wg *sync.WaitGro } c.conn.mu.RLock() - if pingSent && time.Since(c.conn.lastPong) > c.Config.PingDelay+(60*time.Second) { - // It's 60 seconds over what out ping delay is, connection - // has probably dropped. + if pingSent && time.Since(c.conn.lastPong) > c.Config.PingDelay+c.Config.PingTimeout { + // PingTimeout exceeded, connection has probably dropped. err := ErrTimedOut{ TimeSinceSuccess: time.Since(c.conn.lastPong), LastPong: c.conn.lastPong, @@ -615,9 +648,7 @@ func (c *Client) pingLoop(ctx context.Context, errs chan error, wg *sync.WaitGro } c.conn.mu.RUnlock() - errs <- err - wg.Done() - return + return err } c.conn.mu.RUnlock() @@ -628,8 +659,7 @@ func (c *Client) pingLoop(ctx context.Context, errs chan error, wg *sync.WaitGro c.Cmd.Ping(fmt.Sprintf("%d", time.Now().UnixNano())) pingSent = true case <-ctx.Done(): - wg.Done() - return + return nil } } } |