Skip to content

Commit

Permalink
Merge pull request #10 from connectfit-team/fix-close-method
Browse files Browse the repository at this point in the history
Fixed the cancellation of the event delivery
  • Loading branch information
matthieugusmini authored Oct 20, 2023
2 parents e25516e + f477ed0 commit e39e6e7
Showing 1 changed file with 15 additions and 2 deletions.
17 changes: 15 additions & 2 deletions client.go
Original file line number Diff line number Diff line change
Expand Up @@ -80,6 +80,7 @@ type Client struct {
password string
uuid string
conn net.Conn
done chan struct{}
wg sync.WaitGroup
clientOptions
}
Expand Down Expand Up @@ -167,11 +168,15 @@ func (c *Client) Stop() error {
return ErrNotConnected
}

// Close the connection to stop the blocking read call.
err := c.conn.Close()

// In case the goroutine handling the events was trying to send an event
// on the channel, we tell it to stop so we can close the event channel safely.
close(c.done)

c.wg.Wait()

c.conn = nil
return err
}

Expand Down Expand Up @@ -213,6 +218,7 @@ func (c *Client) register() error {
// See https://mariadb.com/kb/en/mariadb-maxscale-6-change-data-capture-cdc-protocol/#request-data
func (c *Client) requestData(database, table, version, gtid string) (<-chan Event, error) {
events := make(chan Event, 1)
c.done = make(chan struct{}, 1)

cmd, err := c.formatRequestDataCommand(database, table, version, gtid)
if err != nil {
Expand Down Expand Up @@ -268,7 +274,14 @@ func (c *Client) handleEvents(data chan<- Event) error {
if err != nil {
return err
}
data <- event

// If the client is stopped, we stop sending events on the channel.
// Otherwise it would block forever if no one is receiving from it.
select {
case data <- event:
case <-c.done:
return nil
}
}
return scanner.Err()
}
Expand Down

0 comments on commit e39e6e7

Please sign in to comment.