diff --git a/binlogreplication/binlog_replica_applier.go b/binlogreplication/binlog_replica_applier.go index 3feca741..407d0e83 100644 --- a/binlogreplication/binlog_replica_applier.go +++ b/binlogreplication/binlog_replica_applier.go @@ -20,7 +20,6 @@ import ( "errors" "fmt" "io" - "regexp" "strconv" "strings" "sync/atomic" @@ -54,9 +53,6 @@ const ( ERFatalReplicaError = 13117 ) -// Match any strings starting with "ON" (case insensitive) -var gtidModeIsOnRegex = regexp.MustCompile(`(?i)^ON$`) - type tableIdentifier struct { dbName, tableName string } @@ -127,27 +123,46 @@ func (a *binlogReplicaApplier) IsRunning() bool { } // This function will connect to the MySQL server and check the GTID_MODE. -func connAndCheckGtidModeEnabled(ctx *sql.Context, params mysql.ConnParams) (bool, error) { +func detectVersionAndGTIDMode(ctx *sql.Context, params mysql.ConnParams) (mariaDB, gtidMode bool, err error) { conn, err := mysql.Connect(ctx, ¶ms) if err != nil { - return false, err + return false, false, err } defer conn.Close() var qr *sqltypes.Result - qr, err = conn.ExecuteFetch("SELECT @@GLOBAL.GTID_MODE", 1, true) + qr, err = conn.ExecuteFetch("SELECT VERSION()", 1, true) if err != nil { - // Maybe it's a MariaDB server, try to get the GTID_STRICT_MODE instead + return false, false, fmt.Errorf("failed to check MySQL version: %w", err) + } + if len(qr.Rows) == 0 { + return false, false, errors.New("no rows returned when checking MySQL version") + } + version := string(qr.Rows[0][0].Raw()) + + mariaDB = strings.Contains(version, "MariaDB") + if mariaDB { qr, err = conn.ExecuteFetch("SELECT @@GLOBAL.GTID_STRICT_MODE", 1, true) if err != nil { - return false, fmt.Errorf("error checking GTID_MODE: %v", err) + return mariaDB, false, fmt.Errorf("failed to check GTID_STRICT_MODE: %w", err) + } + } else { + qr, err = conn.ExecuteFetch("SELECT @@GLOBAL.GTID_MODE", 1, true) + if err != nil { + return mariaDB, false, fmt.Errorf("failed to check GTID_MODE: %w", err) } } if len(qr.Rows) == 0 { - return false, fmt.Errorf("no rows returned when checking GTID_MODE") + return mariaDB, false, errors.New("no rows returned when checking GTID_MODE") } - gtidMode := string(qr.Rows[0][0].Raw()) - return gtidModeIsOnRegex.MatchString(gtidMode), nil + + gtidMode, err = qr.Rows[0][0].ToBool() + if err != nil { + gtidMode = strings.EqualFold(string(qr.Rows[0][0].Raw()), "ON") || + string(qr.Rows[0][0].Raw()) == "1" + } + + return mariaDB, gtidMode, nil } // connectAndStartReplicationEventStream connects to the configured MySQL replication source, including pausing @@ -162,10 +177,13 @@ func (a *binlogReplicaApplier) connectAndStartReplicationEventStream(ctx *sql.Co connectRetryDelay = status.ConnectRetry }) - var conn *mysql.Conn - var err error - gtidModeEnabled := false - flavorName := "" + var ( + conn *mysql.Conn + err error + gtidMode = false + mariaDB = false + flavorName = "" + ) for connectionAttempts := uint64(0); ; connectionAttempts++ { replicaSourceInfo, err := loadReplicationConfiguration(ctx, a.engine.Analyzer.Catalog.MySQLDb) @@ -193,13 +211,15 @@ func (a *binlogReplicaApplier) connectAndStartReplicationEventStream(ctx *sql.Co ConnectTimeoutMs: 4_000, } - gtidModeEnabled, err = connAndCheckGtidModeEnabled(ctx, connParams) + mariaDB, gtidMode, err = detectVersionAndGTIDMode(ctx, connParams) if err != nil && connectionAttempts >= maxConnectionAttempts { return nil, err } - if !gtidModeEnabled { + if !gtidMode { flavorName = replication.FilePosFlavorID + } else if mariaDB { + flavorName = replication.MariadbFlavorID } else { flavorName = replication.Mysql56FlavorID } @@ -232,7 +252,7 @@ func (a *binlogReplicaApplier) connectAndStartReplicationEventStream(ctx *sql.Co // Request binlog events to start // TODO: This should also have retry logic - err = a.startReplicationEventStream(ctx, conn, gtidModeEnabled, flavorName) + err = a.startReplicationEventStream(ctx, conn, gtidMode, flavorName) if err != nil { return nil, err } @@ -316,14 +336,14 @@ func (a *binlogReplicaApplier) loadLogFilePosition(ctx *sql.Context, positionSto // startReplicationEventStream sends a request over |conn|, the connection to the MySQL source server, to begin // sending binlog events. -func (a *binlogReplicaApplier) startReplicationEventStream(ctx *sql.Context, conn *mysql.Conn, gtidModeEnabled bool, flavorName string) error { +func (a *binlogReplicaApplier) startReplicationEventStream(ctx *sql.Context, conn *mysql.Conn, gtidMode bool, flavorName string) error { serverId, err := loadReplicaServerId() if err != nil { return err } var position replication.Position - if gtidModeEnabled { + if gtidMode { position, err = a.loadGtidPosition(ctx, positionStore, flavorName) if err != nil { return err diff --git a/binlogreplication/binlog_replication_test.go b/binlogreplication/binlog_replication_test.go index 66595daf..5d1d02b4 100644 --- a/binlogreplication/binlog_replication_test.go +++ b/binlogreplication/binlog_replication_test.go @@ -320,6 +320,14 @@ func TestResetReplica(t *testing.T) { require.Equal(t, "No", status["Replica_IO_Running"]) require.Equal(t, "No", status["Replica_SQL_Running"]) + // Now try querying the status using the older, deprecated 'show slave status' statement + // and spot check that the data is the same, but the column names have changed + status = querySlaveStatus(t) + require.Equal(t, "", status["Master_Host"]) + require.Equal(t, "", status["Master_User"]) + require.Equal(t, "No", status["Slave_IO_Running"]) + require.Equal(t, "No", status["Slave_SQL_Running"]) + rows, err = replicaDatabase.Queryx("select * from mysql.slave_master_info;") require.NoError(t, err) require.False(t, rows.Next()) @@ -953,6 +961,19 @@ func queryReplicaStatus(t *testing.T) map[string]any { return status } +// querySlaveStatus returns the results of `SHOW SLAVE STATUS` as a map, for the replica +// database. If any errors are encountered, this function will fail the current test. +// The queryReplicaStatus() function should generally be favored over this function for +// getting the status of a replica. This function exists only to help test that the +// deprecated 'show slave status' statement works. +func querySlaveStatus(t *testing.T) map[string]any { + rows, err := replicaDatabase.Queryx("SHOW SLAVE STATUS;") + require.NoError(t, err) + status := convertMapScanResultToStrings(readNextRow(t, rows)) + require.NoError(t, rows.Close()) + return status +} + // mustListDatabases returns a string slice of the databases (i.e. schemas) available on the specified |db|. If // any errors are encountered, this function will fail the current test. func mustListDatabases(t *testing.T, db *sqlx.DB) []string {