Skip to content

Commit

Permalink
test: add binlog replication tests for log file based positioning
Browse files Browse the repository at this point in the history
  • Loading branch information
VWagen1989 committed Nov 15, 2024
1 parent e84ae85 commit 9bdf0e7
Show file tree
Hide file tree
Showing 4 changed files with 119 additions and 22 deletions.
10 changes: 8 additions & 2 deletions .github/workflows/go.yml
Original file line number Diff line number Diff line change
Expand Up @@ -54,8 +54,14 @@ jobs:
cat query.log | grep -e "^--- " | sed 's/--- //g' | awk 'BEGIN {count=1} {printf "%d. %s\n", count++, $0}'
cat query.log | grep -q "FAIL" && exit 1 || exit 0
- name: Test Binlog Replication
- name: Test Binlog Replication With GTID Enabled
run: |
go test -v -p 1 --timeout 600s ./binlogreplication | tee replication.log
GTID_ENABLED=true go test -v -p 1 --timeout 600s ./binlogreplication | tee replication.log
cat replication.log | grep -e "^--- " | sed 's/--- //g' | awk 'BEGIN {count=1} {printf "%d. %s\n", count++, $0}'
cat replication.log | grep -q "FAIL" && exit 1 || exit 0
- name: Test Binlog Replication With GTID Disabled
run: |
GTID_ENABLED=false go test -v -p 1 --timeout 600s ./binlogreplication | tee replication.log
cat replication.log | grep -e "^--- " | sed 's/--- //g' | awk 'BEGIN {count=1} {printf "%d. %s\n", count++, $0}'
cat replication.log | grep -q "FAIL" && exit 1 || exit 0
13 changes: 11 additions & 2 deletions .github/workflows/mysql-replication.yml
Original file line number Diff line number Diff line change
Expand Up @@ -39,5 +39,14 @@ jobs:
- name: Build
run: go build -v

- name: Test Binlog Replication
run: go test -v -p 1 --timeout 600s ./binlogreplication
- name: Test Binlog Replication With GTID Enabled
run: |
GTID_ENABLED=true go test -v -p 1 --timeout 600s ./binlogreplication | tee replication.log
cat replication.log | grep -e "^--- " | sed 's/--- //g' | awk 'BEGIN {count=1} {printf "%d. %s\n", count++, $0}'
cat replication.log | grep -q "FAIL" && exit 1 || exit 0
- name: Test Binlog Replication With GTID Disabled
run: |
GTID_ENABLED=false go test -v -p 1 --timeout 600s ./binlogreplication | tee replication.log
cat replication.log | grep -e "^--- " | sed 's/--- //g' | awk 'BEGIN {count=1} {printf "%d. %s\n", count++, $0}'
cat replication.log | grep -q "FAIL" && exit 1 || exit 0
5 changes: 4 additions & 1 deletion binlogreplication/binlog_replication_reconnect_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -119,7 +119,10 @@ func testInitialReplicaStatus(t *testing.T) {
require.Equal(t, "Yes", status["Replica_SQL_Running"])

// Unsupported fields
require.Equal(t, "INVALID", status["Source_Log_File"])
gtidEnabled := getGtidEnabled()
if gtidEnabled {
require.Equal(t, "INVALID", status["Source_Log_File"])
}
require.Equal(t, "Ignored", status["Source_SSL_Allowed"])
require.Equal(t, "None", status["Until_Condition"])
require.Equal(t, "0", status["SQL_Delay"])
Expand Down
113 changes: 96 additions & 17 deletions binlogreplication/binlog_replication_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -233,7 +233,13 @@ func TestFlushLogs(t *testing.T) {
assertCreateTableStatement(t, replicaDatabase, "t", expectedStatement)

primaryDatabase.MustExec("flush binary logs;")
waitForReplicaToCatchUp(t)
gtidEnabled := getGtidEnabled()
if gtidEnabled {
// The 'FLUSH BINARY LOGS' statement will update the file position, but not the GTID.
// Since the applier will not update the replication progress when it receives a
// FormatDescription event, we should only check the replica status if GTID is enabled.
waitForReplicaToCatchUp(t)
}

primaryDatabase.MustExec("insert into t values (1), (2), (3);")
waitForReplicaToCatchUp(t)
Expand Down Expand Up @@ -549,14 +555,24 @@ func assertWarning(t *testing.T, database *sqlx.DB, code int, message string) {
}

func queryGtid(t *testing.T, database *sqlx.DB) string {
rows, err := database.Queryx("SELECT @@global.gtid_executed as gtid_executed;")
require.NoError(t, err)
defer rows.Close()
row := convertMapScanResultToStrings(readNextRow(t, rows))
if row["gtid_executed"] == nil {
t.Fatal("no value for @@GLOBAL.gtid_executed")
gtidEnabled := getGtidEnabled()
isPrimary := database == primaryDatabase
if gtidEnabled {
rows, err := database.Queryx("SELECT @@global.gtid_executed as gtid_executed;")
require.NoError(t, err)
defer rows.Close()
row := convertMapScanResultToStrings(readNextRow(t, rows))
if row["gtid_executed"] == nil {
t.Fatal("no value for @@GLOBAL.gtid_executed")
}
return row["gtid_executed"].(string)
} else if isPrimary {
sourceLogFile, sourceLogPos := getPrimaryLogPosition(t, gtidEnabled)
return fmt.Sprintf("%s:%s", sourceLogFile, sourceLogPos)
} else {
sourceLogFile, sourceLogPos := getReplicaLogPosition(t)
return fmt.Sprintf("%s:%s", sourceLogFile, sourceLogPos)
}
return row["gtid_executed"].(string)
}

func readNextRow(t *testing.T, rows *sqlx.Rows) map[string]interface{} {
Expand Down Expand Up @@ -659,13 +675,56 @@ func stopDuckSqlServer(t *testing.T) {
time.Sleep(250 * time.Millisecond)
}

func getPrimaryLogPosition(t *testing.T, gtidEnabled bool) (string, string) {
rows, err := primaryDatabase.Queryx("SHOW BINARY LOG STATUS;")
require.NoError(t, err)
primaryStatus := convertMapScanResultToStrings(readNextRow(t, rows))
sourceLogFile := primaryStatus["File"].(string)
sourceLogPos := primaryStatus["Position"].(string)
require.NoError(t, rows.Close())

return sourceLogFile, sourceLogPos
}

func getReplicaLogPosition(t *testing.T) (string, string) {
rows, err := replicaDatabase.Queryx("SHOW REPLICA STATUS;")
require.NoError(t, err)
replicaStatus := convertMapScanResultToStrings(readNextRow(t, rows))
executedGtidSet := replicaStatus["Executed_Gtid_Set"].(string)
require.NoError(t, rows.Close())

// the executedGtidSet is like the format of "binlog.000002:6757", split it into file and pos
parts := strings.Split(executedGtidSet, ":")
require.Equal(t, 2, len(parts))
sourceLogFile := parts[0]
sourceLogPos := parts[1]
return sourceLogFile, sourceLogPos
}

// startReplication configures the replication source on the replica and runs the START REPLICA statement.
func startReplication(_ *testing.T, port int) {
replicaDatabase.MustExec(
fmt.Sprintf("change replication source to SOURCE_HOST='localhost', "+
"SOURCE_USER='replicator', SOURCE_PASSWORD='Zqr8_blrGm1!', "+
"SOURCE_PORT=%v, SOURCE_AUTO_POSITION=1, SOURCE_CONNECT_RETRY=5;", port))
func startReplication(t *testing.T, port int) {
gtidEnabled := getGtidEnabled()

// If GTID is not enabled, we should get the log position by "SHOW MASTER STATUS" first,
// if it failed, try "SHOW BINARY LOG STATUS" instead. Then we extract the log position from
// the result and use it as the source_log_pos when starting the replica.
sourceLogFile := ""
sourceLogPos := ""
if !gtidEnabled {
sourceLogFile, sourceLogPos = getPrimaryLogPosition(t, gtidEnabled)
}

cmdStr := fmt.Sprintf("CHANGE REPLICATION SOURCE TO "+
"SOURCE_HOST='localhost', "+
"SOURCE_USER='replicator', "+
"SOURCE_PASSWORD='Zqr8_blrGm1!', "+
"SOURCE_PORT=%v, "+
"SOURCE_AUTO_POSITION=1, "+
"SOURCE_CONNECT_RETRY=5", port)
if !gtidEnabled {
cmdStr += fmt.Sprintf(", SOURCE_LOG_FILE='%s', SOURCE_LOG_POS=%s", sourceLogFile, sourceLogPos)
}
replicaDatabase.MustExec(cmdStr)
replicaDatabase.MustExec("start replica;")
}

Expand Down Expand Up @@ -735,6 +794,17 @@ func findFreePort() int {
return freePort
}

func getGtidEnabled() bool {
gtidEnabled := strings.ToLower(os.Getenv("GTID_ENABLED"))
if gtidEnabled == "" {
return true
}
if gtidEnabled != "true" && gtidEnabled != "false" {
panic(fmt.Sprintf("GTID_ENABLED environment variable must be 'true' or 'false', got: %s", gtidEnabled))
}
return gtidEnabled == "true"
}

// startMySqlServer configures a starts a fresh MySQL server instance in a Docker container
// and returns the port it is running on. If unable to start up the MySQL server, an error is returned.
func startMySqlServer(dir string) (int, string, error) {
Expand All @@ -743,8 +813,11 @@ func startMySqlServer(dir string) (int, string, error) {
// Use a random name for the container to avoid conflicts
mySqlContainer = "mysql-test-" + strconv.Itoa(rand.Int())

gtidEnabled := getGtidEnabled()

// Build the Docker command to start the MySQL container
cmd := exec.Command("docker", "run",
cmdArgs := []string{
"run",
"--rm", // Remove the container when it stops
"-d", // Run in detached mode
"-p", fmt.Sprintf("%d:3306", mySqlPort), // Map the container's port 3306 to the host's mySqlPort
Expand All @@ -753,9 +826,15 @@ func startMySqlServer(dir string) (int, string, error) {
"--name", mySqlContainer, // Give the container a name
"mysql:latest", // Use the latest MySQL image
"mysqld",
"--gtid_mode=ON",
"--enforce-gtid-consistency=ON",
)
}
if gtidEnabled {
cmdArgs = append(cmdArgs, "--gtid_mode=ON", "--enforce-gtid-consistency=ON")
} else {
cmdArgs = append(cmdArgs, "--gtid_mode=OFF", "--enforce-gtid-consistency=OFF")
}

// Build the Docker command to start the MySQL container
cmd := exec.Command("docker", cmdArgs...)

// Execute the Docker command
output, err := cmd.CombinedOutput()
Expand Down

0 comments on commit 9bdf0e7

Please sign in to comment.