Skip to content

Commit

Permalink
Fixed the cancellation
Browse files Browse the repository at this point in the history
The client was still trying to send event to a full channel even tho no one was listening on it. So we added a `done` channel to tell the goroutine to stop when the client it stoped.
  • Loading branch information
Matthieu` committed Oct 20, 2023
1 parent 3cad4f1 commit f477ed0
Showing 1 changed file with 15 additions and 2 deletions.
17 changes: 15 additions & 2 deletions client.go
Original file line number Diff line number Diff line change
Expand Up @@ -80,6 +80,7 @@ type Client struct {
password string
uuid string
conn net.Conn
done chan struct{}
wg sync.WaitGroup
clientOptions
}
Expand Down Expand Up @@ -167,11 +168,15 @@ func (c *Client) Stop() error {
return ErrNotConnected
}

// Close the connection to stop the blocking read call.
err := c.conn.Close()

// In case the goroutine handling the events was trying to send an event
// on the channel, we tell it to stop so we can close the event channel safely.
close(c.done)

c.wg.Wait()

c.conn = nil
return err
}

Expand Down Expand Up @@ -213,6 +218,7 @@ func (c *Client) register() error {
// See https://mariadb.com/kb/en/mariadb-maxscale-6-change-data-capture-cdc-protocol/#request-data
func (c *Client) requestData(database, table, version, gtid string) (<-chan Event, error) {
events := make(chan Event, 1)
c.done = make(chan struct{}, 1)

cmd, err := c.formatRequestDataCommand(database, table, version, gtid)
if err != nil {
Expand Down Expand Up @@ -268,7 +274,14 @@ func (c *Client) handleEvents(data chan<- Event) error {
if err != nil {
return err
}
data <- event

// If the client is stopped, we stop sending events on the channel.
// Otherwise it would block forever if no one is receiving from it.
select {
case data <- event:
case <-c.done:
return nil
}
}
return scanner.Err()
}
Expand Down

0 comments on commit f477ed0

Please sign in to comment.