diff --git a/client.go b/client.go index 2474482..119cfb5 100644 --- a/client.go +++ b/client.go @@ -80,6 +80,7 @@ type Client struct { password string uuid string conn net.Conn + done chan struct{} wg sync.WaitGroup clientOptions } @@ -167,11 +168,15 @@ func (c *Client) Stop() error { return ErrNotConnected } + // Close the connection to stop the blocking read call. err := c.conn.Close() + // In case the goroutine handling the events was trying to send an event + // on the channel, we tell it to stop so we can close the event channel safely. + close(c.done) + c.wg.Wait() - c.conn = nil return err } @@ -213,6 +218,7 @@ func (c *Client) register() error { // See https://mariadb.com/kb/en/mariadb-maxscale-6-change-data-capture-cdc-protocol/#request-data func (c *Client) requestData(database, table, version, gtid string) (<-chan Event, error) { events := make(chan Event, 1) + c.done = make(chan struct{}, 1) cmd, err := c.formatRequestDataCommand(database, table, version, gtid) if err != nil { @@ -268,7 +274,14 @@ func (c *Client) handleEvents(data chan<- Event) error { if err != nil { return err } - data <- event + + // If the client is stopped, we stop sending events on the channel. + // Otherwise it would block forever if no one is receiving from it. + select { + case data <- event: + case <-c.done: + return nil + } } return scanner.Err() }