Skip to content

Commit

Permalink
fix: support log file based positioning replication (to #155) (#156)
Browse files Browse the repository at this point in the history
* fix: support log file based positioning replication (to #155)

* fix: adopt CR feedback

* fix: adopt CR feedback

* fix: return INVALID if Source_Log_File is empty

* fix: wait for the 'stop_replica' signal if fail to connect to primary

* chore: refine the output of unit test in GitHub workflow action

* test: add binlog replication tests for log file based positioning
  • Loading branch information
VWagen1989 authored Nov 15, 2024
1 parent 321ec8e commit 8c7543c
Show file tree
Hide file tree
Showing 13 changed files with 317 additions and 97 deletions.
23 changes: 19 additions & 4 deletions .github/workflows/go.yml
Original file line number Diff line number Diff line change
Expand Up @@ -43,10 +43,25 @@ jobs:
run: go build -v

- name: Test packages
run: go test -v -cover ./charset ./transpiler ./backend ./harness
run: |
go test -v -cover ./charset ./transpiler ./backend ./harness | tee packages.log
cat packages.log | grep -e "^--- " | sed 's/--- //g' | awk 'BEGIN {count=1} {printf "%d. %s\n", count++, $0}'
cat packages.log | grep -q "FAIL" && exit 1 || exit 0
- name: Test Query Engine
run: go test -v -cover --timeout 600s .
run: |
go test -v -cover --timeout 600s . | tee query.log
cat query.log | grep -e "^--- " | sed 's/--- //g' | awk 'BEGIN {count=1} {printf "%d. %s\n", count++, $0}'
cat query.log | grep -q "FAIL" && exit 1 || exit 0
- name: Test Binlog Replication
run: go test -v -p 1 --timeout 360s ./binlogreplication
- name: Test Binlog Replication With GTID Enabled
run: |
GTID_ENABLED=true go test -v -p 1 --timeout 600s ./binlogreplication | tee replication.log
cat replication.log | grep -e "^--- " | sed 's/--- //g' | awk 'BEGIN {count=1} {printf "%d. %s\n", count++, $0}'
cat replication.log | grep -q "FAIL" && exit 1 || exit 0
- name: Test Binlog Replication With GTID Disabled
run: |
GTID_ENABLED=false go test -v -p 1 --timeout 600s ./binlogreplication | tee replication.log
cat replication.log | grep -e "^--- " | sed 's/--- //g' | awk 'BEGIN {count=1} {printf "%d. %s\n", count++, $0}'
cat replication.log | grep -q "FAIL" && exit 1 || exit 0
13 changes: 11 additions & 2 deletions .github/workflows/mysql-replication.yml
Original file line number Diff line number Diff line change
Expand Up @@ -39,5 +39,14 @@ jobs:
- name: Build
run: go build -v

- name: Test Binlog Replication
run: go test -v -p 1 --timeout 360s ./binlogreplication
- name: Test Binlog Replication With GTID Enabled
run: |
GTID_ENABLED=true go test -v -p 1 --timeout 600s ./binlogreplication | tee replication.log
cat replication.log | grep -e "^--- " | sed 's/--- //g' | awk 'BEGIN {count=1} {printf "%d. %s\n", count++, $0}'
cat replication.log | grep -q "FAIL" && exit 1 || exit 0
- name: Test Binlog Replication With GTID Disabled
run: |
GTID_ENABLED=false go test -v -p 1 --timeout 600s ./binlogreplication | tee replication.log
cat replication.log | grep -e "^--- " | sed 's/--- //g' | awk 'BEGIN {count=1} {printf "%d. %s\n", count++, $0}'
cat replication.log | grep -q "FAIL" && exit 1 || exit 0
5 changes: 2 additions & 3 deletions binlogreplication/binlog_position_store.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,6 @@ import (
)

const binlogPositionDirectory = ".replica"
const mysqlFlavor = "MySQL56"
const defaultChannelName = ""

// binlogPositionStore manages loading and saving data to the binlog position metadata table. This provides
Expand All @@ -47,7 +46,7 @@ type binlogPositionStore struct {
// Currently only the default binlog channel ("") is supported.
// If no position is stored, this method returns a zero mysql.Position and a nil error.
// If any errors are encountered, a nil mysql.Position and an error are returned.
func (store *binlogPositionStore) Load(ctx *sql.Context, engine *gms.Engine) (pos replication.Position, err error) {
func (store *binlogPositionStore) Load(flavor string, ctx *sql.Context, engine *gms.Engine) (pos replication.Position, err error) {
store.mu.Lock()
defer store.mu.Unlock()

Expand All @@ -62,7 +61,7 @@ func (store *binlogPositionStore) Load(ctx *sql.Context, engine *gms.Engine) (po
// Strip off the "MySQL56/" prefix
positionString = strings.TrimPrefix(positionString, "MySQL56/")

return replication.ParsePosition(mysqlFlavor, positionString)
return replication.ParsePosition(flavor, positionString)
}

// Save persists the specified |position| to disk.
Expand Down
113 changes: 97 additions & 16 deletions binlogreplication/binlog_replica_applier.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ import (
"errors"
"fmt"
"io"
"regexp"
"strconv"
"strings"
"sync/atomic"
Expand Down Expand Up @@ -53,6 +54,9 @@ const (
ERFatalReplicaError = 13117
)

// Match any strings starting with "ON" (case insensitive)
var gtidModeIsOnRegex = regexp.MustCompile(`(?i)^ON$`)

// binlogReplicaApplier represents the process that applies updates from a binlog connection.
//
// This type is NOT used concurrently – there is currently only one single applier process running to process binlog
Expand Down Expand Up @@ -116,6 +120,30 @@ func (a *binlogReplicaApplier) IsRunning() bool {
return a.running.Load()
}

// This function will connect to the MySQL server and check the GTID_MODE.
func connAndCheckGtidModeEnabled(ctx *sql.Context, params mysql.ConnParams) (bool, error) {
conn, err := mysql.Connect(ctx, &params)
if err != nil {
return false, err
}
defer conn.Close()

var qr *sqltypes.Result
qr, err = conn.ExecuteFetch("SELECT @@GLOBAL.GTID_MODE", 1, true)
if err != nil {
// Maybe it's a MariaDB server, try to get the GTID_STRICT_MODE instead
qr, err = conn.ExecuteFetch("SELECT @@GLOBAL.GTID_STRICT_MODE", 1, true)
if err != nil {
return false, fmt.Errorf("error checking GTID_MODE: %v", err)
}
}
if len(qr.Rows) == 0 {
return false, fmt.Errorf("no rows returned when checking GTID_MODE")
}
gtidMode := string(qr.Rows[0][0].Raw())
return gtidModeIsOnRegex.MatchString(gtidMode), nil
}

// connectAndStartReplicationEventStream connects to the configured MySQL replication source, including pausing
// and retrying if errors are encountered.
func (a *binlogReplicaApplier) connectAndStartReplicationEventStream(ctx *sql.Context) (*mysql.Conn, error) {
Expand All @@ -130,6 +158,8 @@ func (a *binlogReplicaApplier) connectAndStartReplicationEventStream(ctx *sql.Co

var conn *mysql.Conn
var err error
gtidModeEnabled := false
flavorName := ""
for connectionAttempts := uint64(0); ; connectionAttempts++ {
replicaSourceInfo, err := loadReplicationConfiguration(ctx, a.engine.Analyzer.Catalog.MySQLDb)

Expand Down Expand Up @@ -157,6 +187,18 @@ func (a *binlogReplicaApplier) connectAndStartReplicationEventStream(ctx *sql.Co
ConnectTimeoutMs: 4_000,
}

gtidModeEnabled, err = connAndCheckGtidModeEnabled(ctx, connParams)
if err != nil && connectionAttempts >= maxConnectionAttempts {
return nil, err
}

if !gtidModeEnabled {
flavorName = replication.FilePosFlavorID
} else {
flavorName = replication.Mysql56FlavorID
}
connParams.Flavor = flavorName

conn, err = mysql.Connect(ctx, &connParams)
if err != nil {
logrus.Warnf("failed connection attempt to source (%s): %s",
Expand Down Expand Up @@ -184,7 +226,7 @@ func (a *binlogReplicaApplier) connectAndStartReplicationEventStream(ctx *sql.Co

// Request binlog events to start
// TODO: This should also have retry logic
err = a.startReplicationEventStream(ctx, conn)
err = a.startReplicationEventStream(ctx, conn, gtidModeEnabled, flavorName)
if err != nil {
return nil, err
}
Expand All @@ -196,17 +238,10 @@ func (a *binlogReplicaApplier) connectAndStartReplicationEventStream(ctx *sql.Co
return conn, nil
}

// startReplicationEventStream sends a request over |conn|, the connection to the MySQL source server, to begin
// sending binlog events.
func (a *binlogReplicaApplier) startReplicationEventStream(ctx *sql.Context, conn *mysql.Conn) error {
serverId, err := loadReplicaServerId()
func (a *binlogReplicaApplier) loadGtidPosition(ctx *sql.Context, positionStore *binlogPositionStore, flavorName string) (replication.Position, error) {
position, err := positionStore.Load(flavorName, ctx, a.engine)
if err != nil {
return err
}

position, err := positionStore.Load(ctx, a.engine)
if err != nil {
return err
return replication.Position{}, err
}

if position.IsZero() {
Expand All @@ -227,9 +262,9 @@ func (a *binlogReplicaApplier) startReplicationEventStream(ctx *sql.Context, con
gtidPurged = gtidPurged[1:]
}

purged, err := replication.ParsePosition(mysqlFlavor, gtidPurged)
purged, err := replication.ParsePosition(flavorName, gtidPurged)
if err != nil {
return err
return replication.Position{}, err
}
position = purged
}
Expand All @@ -248,11 +283,57 @@ func (a *binlogReplicaApplier) startReplicationEventStream(ctx *sql.Context, con
position = replication.Position{GTIDSet: gtid.GTIDSet()}
}

return position, nil
}

// another method like "initializedGtidPosition" to get the current log file based position
func (a *binlogReplicaApplier) loadLogFilePosition(ctx *sql.Context, positionStore *binlogPositionStore, flavorName string) (replication.Position, error) {
position, err := positionStore.Load(flavorName, ctx, a.engine)
if err != nil {
return replication.Position{}, err
}

if position.IsZero() {
replicaSourceInfo, err := loadReplicationConfiguration(ctx, a.engine.Analyzer.Catalog.MySQLDb)
if err != nil {
return replication.Position{}, err
}
filePosGtid := replication.FilePosGTID{
File: replicaSourceInfo.SourceLogFile,
Pos: uint32(replicaSourceInfo.SourceLogPos),
}
position = replication.Position{GTIDSet: filePosGtid}
}

return position, nil
}

// startReplicationEventStream sends a request over |conn|, the connection to the MySQL source server, to begin
// sending binlog events.
func (a *binlogReplicaApplier) startReplicationEventStream(ctx *sql.Context, conn *mysql.Conn, gtidModeEnabled bool, flavorName string) error {
serverId, err := loadReplicaServerId()
if err != nil {
return err
}

var position replication.Position
if gtidModeEnabled {
position, err = a.loadGtidPosition(ctx, positionStore, flavorName)
if err != nil {
return err
}
if err := sql.SystemVariables.AssignValues(map[string]interface{}{"gtid_executed": position.GTIDSet.String()}); err != nil {
ctx.GetLogger().Errorf("unable to set @@GLOBAL.gtid_executed: %s", err.Error())
}
} else {
position, err = a.loadLogFilePosition(ctx, positionStore, flavorName)
if err != nil {
return err
}
}

a.currentPosition = position
a.pendingPosition = position
if err := sql.SystemVariables.AssignValues(map[string]interface{}{"gtid_executed": a.currentPosition.GTIDSet.String()}); err != nil {
ctx.GetLogger().Errorf("unable to set @@GLOBAL.gtid_executed: %s", err.Error())
}

// Clear out the format description in case we're reconnecting, so that we don't use the old format description
// to interpret any event messages before we receive the new format description from the new stream.
Expand Down
24 changes: 24 additions & 0 deletions binlogreplication/binlog_replica_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -280,6 +280,18 @@ func (d *myBinlogReplicaController) SetReplicationSourceOptions(ctx *sql.Context
return err
}
replicaSourceInfo.ConnectRetryCount = uint64(intValue)
case "SOURCE_LOG_FILE":
value, err := getOptionValueAsString(option)
if err != nil {
return err
}
replicaSourceInfo.SourceLogFile = value
case "SOURCE_LOG_POS":
intValue, err := getOptionValueAsInt(option)
if err != nil {
return err
}
replicaSourceInfo.SourceLogPos = uint64(intValue)
case "SOURCE_AUTO_POSITION":
intValue, err := getOptionValueAsInt(option)
if err != nil {
Expand Down Expand Up @@ -333,6 +345,14 @@ func (d *myBinlogReplicaController) SetReplicationFilterOptions(_ *sql.Context,
return nil
}

func changeSourceLogFileToInvalidIfEmpty(status *binlogreplication.ReplicaStatus) {
// As the original design of go-mysql-server, the source log file should be "INVALID" if GTID_MODE is ON.
// An empty string of source log file means GTID_MODE is ON, and we should set it to "INVALID" here.
if status.SourceLogFile == "" {
status.SourceLogFile = "INVALID"
}
}

// GetReplicaStatus implements the BinlogReplicaController interface
func (d *myBinlogReplicaController) GetReplicaStatus(ctx *sql.Context) (*binlogreplication.ReplicaStatus, error) {
replicaSourceInfo, err := loadReplicationConfiguration(ctx, d.engine.Analyzer.Catalog.MySQLDb)
Expand All @@ -346,12 +366,15 @@ func (d *myBinlogReplicaController) GetReplicaStatus(ctx *sql.Context) (*binlogr
var copy = d.status

if replicaSourceInfo == nil {
changeSourceLogFileToInvalidIfEmpty(&copy)
return &copy, nil
}

copy.SourceUser = replicaSourceInfo.User
copy.SourceHost = replicaSourceInfo.Host
copy.SourcePort = uint(replicaSourceInfo.Port)
copy.SourceLogFile = replicaSourceInfo.SourceLogFile
copy.SourceLogPos = replicaSourceInfo.SourceLogPos
copy.SourceServerUuid = replicaSourceInfo.Uuid
copy.ConnectRetry = replicaSourceInfo.ConnectRetryInterval
copy.SourceRetryCount = replicaSourceInfo.ConnectRetryCount
Expand All @@ -363,6 +386,7 @@ func (d *myBinlogReplicaController) GetReplicaStatus(ctx *sql.Context) (*binlogr
copy.RetrievedGtidSet = copy.ExecutedGtidSet
}

changeSourceLogFileToInvalidIfEmpty(&copy)
return &copy, nil
}

Expand Down
5 changes: 4 additions & 1 deletion binlogreplication/binlog_replication_reconnect_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -119,7 +119,10 @@ func testInitialReplicaStatus(t *testing.T) {
require.Equal(t, "Yes", status["Replica_SQL_Running"])

// Unsupported fields
require.Equal(t, "INVALID", status["Source_Log_File"])
gtidEnabled := getGtidEnabled()
if gtidEnabled {
require.Equal(t, "INVALID", status["Source_Log_File"])
}
require.Equal(t, "Ignored", status["Source_SSL_Allowed"])
require.Equal(t, "None", status["Until_Condition"])
require.Equal(t, "0", status["SQL_Delay"])
Expand Down
Loading

0 comments on commit 8c7543c

Please sign in to comment.