Skip to content

Commit

Permalink
Merge pull request #9 from connectfit-team/fix-logging-format
Browse files Browse the repository at this point in the history
Fixed logging format
  • Loading branch information
matthieugusmini authored Oct 20, 2023
2 parents 8e43c0d + 3cad4f1 commit e25516e
Show file tree
Hide file tree
Showing 6 changed files with 21 additions and 26 deletions.
2 changes: 1 addition & 1 deletion .github/workflows/golangci-lint.yml
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@ jobs:
steps:
- uses: actions/setup-go@v3
with:
go-version: 1.19
go-version: 1.21
- uses: actions/checkout@v3
- name: golangci-lint
uses: golangci/golangci-lint-action@v3
Expand Down
2 changes: 1 addition & 1 deletion .github/workflows/integration-tests.yml
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@ jobs:
- name: Set up Go
uses: actions/setup-go@v3
with:
go-version: 1.19
go-version: 1.21

- name: Run integration tests
run: make integration-tests
2 changes: 1 addition & 1 deletion Dockerfile.test
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
# syntax=docker/dockerfile:1

FROM golang:1.19
FROM golang:1.21

WORKDIR /app

Expand Down
32 changes: 16 additions & 16 deletions client.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,14 +8,13 @@ import (
"encoding/json"
"errors"
"fmt"
"log/slog"
"net"
"sync"
"time"

"golang.org/x/exp/slog"
)

const MaxScanTokenSize = 1024 * 1024
const maxScanTokenSize = 1024 * 1024

var (
// ErrNotConnected is returned when trying to stop the client from streaming
Expand Down Expand Up @@ -82,7 +81,7 @@ type Client struct {
uuid string
conn net.Conn
wg sync.WaitGroup
options clientOptions
clientOptions
}

// NewClient returns a new MaxScale CDC Client given an address to connect to,
Expand All @@ -93,7 +92,7 @@ func NewClient(address, user, password, uuid string, opts ...ClientOption) *Clie
user: user,
password: password,
uuid: uuid,
options: clientOptions{
clientOptions: clientOptions{
dialTimeout: defaultDialTimeout,
readTimeout: defaultReadTimeout,
writeTimeout: defaultWriteTimeout,
Expand All @@ -102,7 +101,7 @@ func NewClient(address, user, password, uuid string, opts ...ClientOption) *Clie
}

for _, opt := range opts {
opt(&client.options)
opt(&client.clientOptions)
}

return client
Expand Down Expand Up @@ -179,7 +178,7 @@ func (c *Client) Stop() error {
// See https://mariadb.com/kb/en/mariadb-maxscale-6-change-data-capture-cdc-protocol/#connection-and-authentication
func (c *Client) connect() error {
dialer := &net.Dialer{
Timeout: c.options.dialTimeout,
Timeout: c.dialTimeout,
}
conn, err := dialer.Dial("tcp", c.address)
if err != nil {
Expand Down Expand Up @@ -233,9 +232,10 @@ func (c *Client) requestData(database, table, version, gtid string) (<-chan Even
defer c.wg.Done()

if err := c.handleEvents(events); err != nil {
c.options.logger.Error("An error happened while decoding CDC events", err,
"database", database,
"table", table,
c.logger.Error("An error happened while decoding CDC events",
slog.Any("err", err),
slog.String("database", database),
slog.String("table", table),
)
}
close(events)
Expand All @@ -247,15 +247,15 @@ func (c *Client) requestData(database, table, version, gtid string) (<-chan Even
func (c *Client) handleEvents(data chan<- Event) error {
var readSchema bool
scanner := bufio.NewScanner(c.conn)
buf := make([]byte, 0, MaxScanTokenSize)
scanner.Buffer(buf, MaxScanTokenSize)
buf := make([]byte, 0, maxScanTokenSize)
scanner.Buffer(buf, maxScanTokenSize)
for scanner.Scan() {
token := scanner.Bytes()

// If the request for data is rejected, an error will be sent instead of the table schema.
if !readSchema && isErrorResponse(token) {
c.options.logger.Warn("Failed to read the table schema",
"error", string(token),
c.logger.Warn("Failed to read the table schema",
slog.String("error", string(token)),
)
continue
}
Expand Down Expand Up @@ -356,7 +356,7 @@ func (c *Client) formatRequestDataCommand(database, table, version, gtid string)
}

func (c *Client) writeToConnection(b []byte) error {
if err := c.conn.SetWriteDeadline(time.Now().Add(c.options.writeTimeout)); err != nil {
if err := c.conn.SetWriteDeadline(time.Now().Add(c.writeTimeout)); err != nil {
return fmt.Errorf("could not set write deadline to the future write call on the connection: %w", err)
}

Expand All @@ -368,7 +368,7 @@ func (c *Client) writeToConnection(b []byte) error {
}

func (c *Client) readResponse() ([]byte, error) {
if err := c.conn.SetReadDeadline(time.Now().Add(c.options.readTimeout)); err != nil {
if err := c.conn.SetReadDeadline(time.Now().Add(c.readTimeout)); err != nil {
return nil, fmt.Errorf("could not set read deadline to the future read call on the connection: %w", err)
}

Expand Down
7 changes: 2 additions & 5 deletions go.mod
Original file line number Diff line number Diff line change
@@ -1,8 +1,5 @@
module github.com/connectfit-team/maxscale-cdc

go 1.19
go 1.21

require (
github.com/google/go-cmp v0.5.9
golang.org/x/exp v0.0.0-20230129154200-a960b3787bd2
)
require github.com/google/go-cmp v0.5.9
2 changes: 0 additions & 2 deletions go.sum
Original file line number Diff line number Diff line change
@@ -1,4 +1,2 @@
github.com/google/go-cmp v0.5.9 h1:O2Tfq5qg4qc4AmwVlvv0oLiVAGB7enBSJ2x2DqQFi38=
github.com/google/go-cmp v0.5.9/go.mod h1:17dUlkBOakJ0+DkrSSNjCkIjxS6bF9zb3elmeNGIjoY=
golang.org/x/exp v0.0.0-20230129154200-a960b3787bd2 h1:5sPMf9HJXrvBWIamTw+rTST0bZ3Mho2n1p58M0+W99c=
golang.org/x/exp v0.0.0-20230129154200-a960b3787bd2/go.mod h1:CxIveKay+FTh1D0yPZemJVgC/95VzuuOLq5Qi4xnoYc=

0 comments on commit e25516e

Please sign in to comment.