diff --git a/.github/workflows/replication-test.yml b/.github/workflows/replication-test.yml index 0c9f01b7..3ffa99e4 100644 --- a/.github/workflows/replication-test.yml +++ b/.github/workflows/replication-test.yml @@ -90,15 +90,21 @@ jobs: docker exec source-db dolt sql -q " CREATE DATABASE test; CREATE TABLE test.items (id INT PRIMARY KEY, name VARCHAR(50)); - INSERT INTO test.items VALUES (1, 'test1'), (2, 'test2');" + INSERT INTO test.items VALUES (1, 'test1'), (2, 'test2'); + CREATE TABLE test.skip (id INT PRIMARY KEY, name VARCHAR(50)); + INSERT INTO test.skip VALUES (1, 'abc'), (2, 'def');" elif [ "${{ matrix.source }}" = "mariadb" ]; then docker exec source-db mariadb -uroot -proot test -e " CREATE TABLE items (id INT PRIMARY KEY, name VARCHAR(50)); - INSERT INTO items VALUES (1, 'test1'), (2, 'test2');" + INSERT INTO items VALUES (1, 'test1'), (2, 'test2'); + CREATE TABLE skip (id INT PRIMARY KEY, name VARCHAR(50)); + INSERT INTO skip VALUES (1, 'abc'), (2, 'def');" else docker exec source-db mysql -uroot -proot test -e " CREATE TABLE items (id INT PRIMARY KEY, name VARCHAR(50)); - INSERT INTO items VALUES (1, 'test1'), (2, 'test2');" + INSERT INTO items VALUES (1, 'test1'), (2, 'test2'); + CREATE TABLE skip (id INT PRIMARY KEY, name VARCHAR(50)); + INSERT INTO skip VALUES (1, 'abc'), (2, 'def');" fi - name: Start MyDuck Server in replica mode @@ -106,7 +112,7 @@ jobs: if [ "${{ matrix.source }}" = "postgres" ]; then SOURCE_DSN="postgres://postgres:postgres@host.docker.internal:5432/test" else - SOURCE_DSN="mysql://root:root@host.docker.internal:3306" + SOURCE_DSN="mysql://root:root@host.docker.internal:3306/test?skip-tables=skip" fi docker run -d --name myduck \ @@ -203,6 +209,9 @@ jobs: exit 1 fi + # Print the logs + docker logs myduck + - name: Cleanup if: always() run: | diff --git a/backend/loaddata.go b/backend/loaddata.go index b9b9a91c..d0b50783 100644 --- a/backend/loaddata.go +++ b/backend/loaddata.go @@ -123,7 +123,7 @@ func (db *DuckBuilder) executeLoadData(ctx *sql.Context, insert *plan.InsertInto // Replicated tables do not have physical primary keys. // Their logical primary keys are fake and should not be used in INSERT INTO statements. // https://github.com/apecloud/myduckserver/issues/272 - keyless = t.ExtraTableInfo().Replicated + keyless = t.ExtraTableInfo().Replicated || !t.HasPrimaryKey() } } diff --git a/binlogreplication/binlog_replica_controller.go b/binlogreplication/binlog_replica_controller.go index fdb5b621..3aafc258 100644 --- a/binlogreplication/binlog_replica_controller.go +++ b/binlogreplication/binlog_replica_controller.go @@ -313,6 +313,18 @@ func (d *myBinlogReplicaController) SetReplicationSourceOptions(ctx *sql.Context func (d *myBinlogReplicaController) SetReplicationFilterOptions(_ *sql.Context, options []binlogreplication.ReplicationOption) error { for _, option := range options { switch strings.ToUpper(option.Name) { + case "REPLICATE_DO_DB": + value, err := getOptionValueAsDatabaseNames(option) + if err != nil { + return err + } + d.filters.setDoDatabases(value) + case "REPLICATE_IGNORE_DB": + value, err := getOptionValueAsDatabaseNames(option) + if err != nil { + return err + } + d.filters.setIgnoreDatabases(value) case "REPLICATE_DO_TABLE": value, err := getOptionValueAsTableNames(option) if err != nil { @@ -378,6 +390,8 @@ func (d *myBinlogReplicaController) GetReplicaStatus(ctx *sql.Context) (*binlogr copy.SourceServerUuid = replicaSourceInfo.Uuid copy.ConnectRetry = replicaSourceInfo.ConnectRetryInterval copy.SourceRetryCount = replicaSourceInfo.ConnectRetryCount + // copy.ReplicateDoDBs = d.filters.getDoDatabases() + // copy.ReplicateIgnoreDBs = d.filters.getIgnoreDatabases() copy.ReplicateDoTables = d.filters.getDoTables() copy.ReplicateIgnoreTables = d.filters.getIgnoreTables() @@ -523,6 +537,24 @@ func getOptionValueAsTableNames(option binlogreplication.ReplicationOption) ([]s "but expected a list of tables", option.Name, option.Value.GetValue()) } +func getOptionValueAsDatabaseNames(option binlogreplication.ReplicationOption) ([]string, error) { + // The value of the option should be a list of database names. + // But since the parser doesn't have a database name list type, + // we reuse the table name list type to represent a list of database names. + ov, ok := option.Value.(binlogreplication.TableNamesReplicationOptionValue) + if ok { + list := ov.GetValueAsTableList() + names := make([]string, len(list)) + for i, t := range list { + names[i] = t.Name() + } + return names, nil + } + + return nil, fmt.Errorf("unsupported value type for option %q; found %T, "+ + "but expected a list of databases", option.Name, option.Value.GetValue()) +} + func verifyAllTablesAreQualified(urts []sql.UnresolvedTable) error { for _, urt := range urts { if urt.Database().Name() == "" { diff --git a/binlogreplication/binlog_replica_filtering.go b/binlogreplication/binlog_replica_filtering.go index 46dd56bb..110b2fec 100644 --- a/binlogreplication/binlog_replica_filtering.go +++ b/binlogreplication/binlog_replica_filtering.go @@ -25,6 +25,10 @@ import ( // filterConfiguration defines the binlog filtering rules applied on the replica. type filterConfiguration struct { + // doDatabases holds a map of database names that SHOULD be replicated. + doDatabases map[string]struct{} + // ignoreDatabases holds a map of database names that should NOT be replicated. + ignoreDatabases map[string]struct{} // doTables holds a map of database name to map of table names, indicating tables that SHOULD be replicated. doTables map[string]map[string]struct{} // ignoreTables holds a map of database name to map of table names, indicating tables that should NOT be replicated. @@ -36,9 +40,39 @@ type filterConfiguration struct { // newFilterConfiguration creates a new filterConfiguration instance and initializes members. func newFilterConfiguration() *filterConfiguration { return &filterConfiguration{ - doTables: make(map[string]map[string]struct{}), - ignoreTables: make(map[string]map[string]struct{}), - mu: &sync.Mutex{}, + doDatabases: make(map[string]struct{}), + ignoreDatabases: make(map[string]struct{}), + doTables: make(map[string]map[string]struct{}), + ignoreTables: make(map[string]map[string]struct{}), + mu: &sync.Mutex{}, + } +} + +// setDoDatabases sets the databases that are allowed to replicate. If any DoDatabases were previously configured, +// they are cleared out before the new databases are set. +func (fc *filterConfiguration) setDoDatabases(databases []string) { + fc.mu.Lock() + defer fc.mu.Unlock() + + // Setting new replication filters clears out any existing filters + fc.doDatabases = make(map[string]struct{}) + + for _, db := range databases { + fc.doDatabases[strings.ToLower(db)] = struct{}{} + } +} + +// setIgnoreDatabases sets the databases that are NOT allowed to replicate. If any IgnoreDatabases were previously configured, +// they are cleared out before the new databases are set. +func (fc *filterConfiguration) setIgnoreDatabases(databases []string) { + fc.mu.Lock() + defer fc.mu.Unlock() + + // Setting new replication filters clears out any existing filters + fc.ignoreDatabases = make(map[string]struct{}) + + for _, db := range databases { + fc.ignoreDatabases[strings.ToLower(db)] = struct{}{} } } @@ -96,6 +130,38 @@ func (fc *filterConfiguration) setIgnoreTables(urts []sql.UnresolvedTable) error return nil } +// getDoDatabases returns a slice of database names that are configured to be replicated. +func (fc *filterConfiguration) getDoDatabases() []string { + fc.mu.Lock() + defer fc.mu.Unlock() + + if len(fc.doDatabases) == 0 { + return nil + } + + databases := make([]string, 0, len(fc.doDatabases)) + for db := range fc.doDatabases { + databases = append(databases, db) + } + return databases +} + +// getIgnoreDatabases returns a slice of database names that are configured to be filtered out of replication. +func (fc *filterConfiguration) getIgnoreDatabases() []string { + fc.mu.Lock() + defer fc.mu.Unlock() + + if len(fc.ignoreDatabases) == 0 { + return nil + } + + databases := make([]string, 0, len(fc.ignoreDatabases)) + for db := range fc.ignoreDatabases { + databases = append(databases, db) + } + return databases +} + // isTableFilteredOut returns true if the table identified by |tableMap| has been filtered out on this replica and // should not have any updates applied from binlog messages. func (fc *filterConfiguration) isTableFilteredOut(ctx *sql.Context, tableMap *mysql.TableMap) bool { @@ -109,6 +175,21 @@ func (fc *filterConfiguration) isTableFilteredOut(ctx *sql.Context, tableMap *my fc.mu.Lock() defer fc.mu.Unlock() + // If any filter doDatabase options are specified, then a database MUST be listed in the set + // for it to be replicated. doDatabase options are processed BEFORE ignoreDatabase options. + // https://dev.mysql.com/doc/refman/8.4/en/replication-rules-db-options.html + if len(fc.doDatabases) > 0 { + if _, ok := fc.doDatabases[db]; !ok { + ctx.GetLogger().Tracef("skipping database %s (not in doDatabases)", db) + return true + } + } else if len(fc.ignoreDatabases) > 0 { + if _, ok := fc.ignoreDatabases[db]; ok { + ctx.GetLogger().Tracef("skipping database %s (in ignoreDatabases)", db) + return true + } + } + // If any filter doTable options are specified, then a table MUST be listed in the set // for it to be replicated. doTables options are processed BEFORE ignoreTables options. // If a table appears in both doTable and ignoreTables, it is ignored. @@ -160,7 +241,7 @@ func convertFilterMapToStringSlice(filterMap map[string]map[string]struct{}) []s tableNames := make([]string, 0, len(filterMap)) for dbName, tableMap := range filterMap { - for tableName, _ := range tableMap { + for tableName := range tableMap { tableNames = append(tableNames, fmt.Sprintf("%s.%s", dbName, tableName)) } } diff --git a/binlogreplication/binlog_replication_filters_test.go b/binlogreplication/binlog_replication_filters_test.go index 299be4ef..76f5c14a 100644 --- a/binlogreplication/binlog_replication_filters_test.go +++ b/binlogreplication/binlog_replication_filters_test.go @@ -21,7 +21,7 @@ import ( "github.com/stretchr/testify/require" ) -// TestBinlogReplicationFilters_ignoreTablesOnly tests that the ignoreTables replication +// TestReplicationFilters_ignoreTablesOnly tests that the ignoreTables replication // filtering option is correctly applied and honored. func TestBinlogReplicationFilters_ignoreTablesOnly(t *testing.T) { defer teardown(t) @@ -189,3 +189,96 @@ func TestBinlogReplicationFilters_errorCases(t *testing.T) { require.Error(t, err) require.ErrorContains(t, err, "no database specified for table") } + +// TestReplicationFilters_ignoreDatabasesOnly tests that the ignoreDatabases replication +// filtering option is correctly applied and honored. +func TestReplicationFilters_ignoreDatabasesOnly(t *testing.T) { + defer teardown(t) + startSqlServersWithSystemVars(t, duckReplicaSystemVars) + startReplicationAndCreateTestDb(t, mySqlPort) + + // Ignore replication events for db01. Also tests that the first filter setting is overwritten by + // the second and that db names are case-insensitive. + replicaDatabase.MustExec("CHANGE REPLICATION FILTER REPLICATE_IGNORE_DB=(db02);") + replicaDatabase.MustExec("CHANGE REPLICATION FILTER REPLICATE_IGNORE_DB=(DB01);") + + // TODO(fan): Not implemented yet + // Assert that status shows replication filters + // status := showReplicaStatus(t) + // require.Equal(t, "db01", status["Replicate_Ignore_DB"]) + // require.Equal(t, "", status["Replicate_Do_DB"]) + + // Make changes on the primary + primaryDatabase.MustExec("CREATE DATABASE db02;") + primaryDatabase.MustExec("CREATE TABLE db01.t1 (pk INT PRIMARY KEY);") + primaryDatabase.MustExec("CREATE TABLE db02.t1 (pk INT PRIMARY KEY);") + for i := 1; i < 12; i++ { + primaryDatabase.MustExec(fmt.Sprintf("INSERT INTO db01.t1 VALUES (%d);", i)) + primaryDatabase.MustExec(fmt.Sprintf("INSERT INTO db02.t1 VALUES (%d);", i)) + } + + // Pause to let the replica catch up + waitForReplicaToCatchUp(t) + + // Although the database is ignored, it is still created on the replica + // because the DDL statements are not filtered out. + + // Verify that no changes from db01 were applied on the replica + rows, err := replicaDatabase.Queryx("SELECT COUNT(pk) as count FROM db01.t1;") + require.NoError(t, err) + row := convertMapScanResultToStrings(readNextRow(t, rows)) + require.Equal(t, "0", row["count"]) + require.NoError(t, rows.Close()) + + // Verify that all changes from db02 were applied on the replica + rows, err = replicaDatabase.Queryx("SELECT COUNT(pk) as count FROM db02.t1;") + require.NoError(t, err) + row = convertMapScanResultToStrings(readNextRow(t, rows)) + require.Equal(t, "11", row["count"]) + require.NoError(t, rows.Close()) +} + +// TestReplicationFilters_doDatabasesOnly tests that the doDatabases replication +// filtering option is correctly applied and honored. +func TestReplicationFilters_doDatabasesOnly(t *testing.T) { + defer teardown(t) + startSqlServersWithSystemVars(t, duckReplicaSystemVars) + startReplicationAndCreateTestDb(t, mySqlPort) + + // Do replication events for db01. Also tests that the first filter setting is overwritten by + // the second and that db names are case-insensitive. + replicaDatabase.MustExec("CHANGE REPLICATION FILTER REPLICATE_DO_DB=(db02);") + replicaDatabase.MustExec("CHANGE REPLICATION FILTER REPLICATE_DO_DB=(DB01);") + + // TODO(fan): Not implemented yet + // Assert that status shows replication filters + // status := showReplicaStatus(t) + // require.Equal(t, "db01", status["Replicate_Do_DB"]) + // require.Equal(t, "", status["Replicate_Ignore_DB"]) + + // Make changes on the primary + primaryDatabase.MustExec("CREATE DATABASE db02;") + primaryDatabase.MustExec("CREATE TABLE db01.t1 (pk INT PRIMARY KEY);") + primaryDatabase.MustExec("CREATE TABLE db02.t1 (pk INT PRIMARY KEY);") + for i := 1; i < 12; i++ { + primaryDatabase.MustExec(fmt.Sprintf("INSERT INTO db01.t1 VALUES (%d);", i)) + primaryDatabase.MustExec(fmt.Sprintf("INSERT INTO db02.t1 VALUES (%d);", i)) + } + + // Pause to let the replica catch up + waitForReplicaToCatchUp(t) + + // Verify that all changes from db01 were applied on the replica + rows, err := replicaDatabase.Queryx("SELECT COUNT(pk) as count FROM db01.t1;") + require.NoError(t, err) + row := convertMapScanResultToStrings(readNextRow(t, rows)) + require.Equal(t, "11", row["count"]) + require.NoError(t, rows.Close()) + + // Verify that no changes from db02 were applied on the replica + rows, err = replicaDatabase.Queryx("SELECT COUNT(pk) as count FROM db02.t1;") + require.NoError(t, err) + row = convertMapScanResultToStrings(readNextRow(t, rows)) + require.Equal(t, "0", row["count"]) + require.NoError(t, rows.Close()) +} diff --git a/catalog/database.go b/catalog/database.go index 13ee6541..76d214f3 100644 --- a/catalog/database.go +++ b/catalog/database.go @@ -95,7 +95,7 @@ func (d *Database) tablesInsensitive(ctx *sql.Context, pattern string) ([]*Table } func (d *Database) findTables(ctx *sql.Context, pattern string) ([]*Table, error) { - rows, err := adapter.QueryCatalog(ctx, "SELECT DISTINCT table_name, comment FROM duckdb_tables() WHERE (database_name = ? AND schema_name = ? AND table_name ILIKE ?) OR (temporary IS TRUE AND table_name ILIKE ?)", d.catalog, d.name, pattern, pattern) + rows, err := adapter.QueryCatalog(ctx, "SELECT table_name, has_primary_key, comment FROM duckdb_tables() WHERE (database_name = ? AND schema_name = ? AND table_name ILIKE ?) OR (temporary IS TRUE AND table_name ILIKE ?)", d.catalog, d.name, pattern, pattern) if err != nil { return nil, ErrDuckDB.New(err) } @@ -104,11 +104,12 @@ func (d *Database) findTables(ctx *sql.Context, pattern string) ([]*Table, error var tbls []*Table for rows.Next() { var tblName string + var hasPrimaryKey bool var comment stdsql.NullString - if err := rows.Scan(&tblName, &comment); err != nil { + if err := rows.Scan(&tblName, &hasPrimaryKey, &comment); err != nil { return nil, ErrDuckDB.New(err) } - t := NewTable(tblName, d).withComment(DecodeComment[ExtraTableInfo](comment.String)) + t := NewTable(d, tblName, hasPrimaryKey).withComment(DecodeComment[ExtraTableInfo](comment.String)) tbls = append(tbls, t) } if err := rows.Err(); err != nil { diff --git a/catalog/inserter.go b/catalog/inserter.go index b3ddeec0..46c49474 100644 --- a/catalog/inserter.go +++ b/catalog/inserter.go @@ -15,6 +15,7 @@ type rowInserter struct { db string table string schema sql.Schema + hasPK bool replace bool once sync.Once @@ -69,7 +70,7 @@ func (ri *rowInserter) init(ctx *sql.Context) { insert.Reset() insert.WriteString("INSERT ") - if ri.replace { + if ri.replace && ri.hasPK { insert.WriteString(" OR REPLACE") } insert.WriteString(" INTO ") diff --git a/catalog/table.go b/catalog/table.go index f92c8ca3..fa5d6e2a 100644 --- a/catalog/table.go +++ b/catalog/table.go @@ -16,11 +16,14 @@ import ( ) type Table struct { - mu *sync.RWMutex - name string + mu sync.RWMutex db *Database - comment *Comment[ExtraTableInfo] // save the comment to avoid querying duckdb everytime + name string + comment *Comment[ExtraTableInfo] // save the comment to avoid querying duckdb every time schema sql.PrimaryKeySchema + + // Whether the table has a physical primary key. + hasPrimaryKey bool } type ExtraTableInfo struct { @@ -59,11 +62,12 @@ var _ sql.AutoIncrementTable = (*Table)(nil) var _ sql.CheckTable = (*Table)(nil) var _ sql.CheckAlterableTable = (*Table)(nil) -func NewTable(name string, db *Database) *Table { +func NewTable(db *Database, name string, hasPrimaryKey bool) *Table { return &Table{ - mu: &sync.RWMutex{}, - name: name, db: db, + name: name, + + hasPrimaryKey: hasPrimaryKey, } } @@ -96,6 +100,10 @@ func (t *Table) ExtraTableInfo() ExtraTableInfo { return t.comment.Meta } +func (t *Table) HasPrimaryKey() bool { + return t.hasPrimaryKey +} + // Collation implements sql.Table. func (t *Table) Collation() sql.CollationID { return sql.Collation_Default @@ -293,6 +301,7 @@ func (t *Table) AddColumn(ctx *sql.Context, column *sql.Column, order *sql.Colum } // Update the PK ordinals only after the column is successfully added. if column.PrimaryKey { + t.hasPrimaryKey = true t.comment.Meta.PkOrdinals = tableInfo.PkOrdinals } return t.withSchema(ctx) @@ -457,6 +466,7 @@ func (t *Table) ModifyColumn(ctx *sql.Context, columnName string, column *sql.Co // Update table metadata if column.PrimaryKey { + t.hasPrimaryKey = true t.comment.Meta.PkOrdinals = []int{oldColumnIndex} } if !oldColumn.AutoIncrement && column.AutoIncrement { @@ -510,6 +520,7 @@ func (t *Table) Inserter(*sql.Context) sql.RowInserter { db: t.db.Name(), table: t.name, schema: t.schema.Schema, + hasPK: t.hasPrimaryKey, } } @@ -535,6 +546,7 @@ func (t *Table) Replacer(*sql.Context) sql.RowReplacer { db: t.db.Name(), table: t.name, schema: t.schema.Schema, + hasPK: t.hasPrimaryKey, replace: hasKey, } } diff --git a/devtools/replica-setup-mysql/snapshot.sh b/devtools/replica-setup-mysql/snapshot.sh index 401565ed..164a5504 100644 --- a/devtools/replica-setup-mysql/snapshot.sh +++ b/devtools/replica-setup-mysql/snapshot.sh @@ -34,6 +34,21 @@ THREAD_COUNT=$(( 2 * CORE_COUNT )) echo "Detected core count: $CORE_COUNT" echo "Thread count set to: $THREAD_COUNT" +# Prepare filter options +FILTER_OPTIONS="" +if [ -n "$INCLUDE_SCHEMAS" ]; then + FILTER_OPTIONS="$FILTER_OPTIONS --include-schemas $INCLUDE_SCHEMAS" +fi +if [ -n "$EXCLUDE_SCHEMAS" ]; then + FILTER_OPTIONS="$FILTER_OPTIONS --exclude-schemas $EXCLUDE_SCHEMAS" +fi +if [ -n "$INCLUDE_TABLES" ]; then + FILTER_OPTIONS="$FILTER_OPTIONS --include-tables $INCLUDE_TABLES" +fi +if [ -n "$EXCLUDE_TABLES" ]; then + FILTER_OPTIONS="$FILTER_OPTIONS --exclude-tables $EXCLUDE_TABLES" +fi + echo "Copying data from MySQL to MyDuck..." # Run mysqlsh command and capture the output output=$(mysqlsh --uri "$SOURCE_DSN" $SOURCE_PASSWORD_OPTION -- util copy-instance "mysql://${MYDUCK_USER}:${MYDUCK_PASSWORD}@${MYDUCK_HOST}:${MYDUCK_PORT}" \ @@ -46,6 +61,7 @@ output=$(mysqlsh --uri "$SOURCE_DSN" $SOURCE_PASSWORD_OPTION -- util copy-instan --ignore-version true \ --load-indexes false \ --defer-table-indexes all \ + ${FILTER_OPTIONS} \ ) if [[ $GTID_MODE == "ON" ]]; then diff --git a/devtools/replica-setup-mysql/start_replication.sh b/devtools/replica-setup-mysql/start_replication.sh index 58d19ae2..4453ba21 100644 --- a/devtools/replica-setup-mysql/start_replication.sh +++ b/devtools/replica-setup-mysql/start_replication.sh @@ -23,8 +23,30 @@ if [ $GTID_MODE == "OFF" ]; then SOURCE_LOG_POS=${BINLOG_POS}" fi +# Prepare replication filters +REPLICATION_FILTERS="" +if [ -n "$INCLUDE_SCHEMAS" ]; then + REPLICATION_FILTERS="$REPLICATION_FILTERS REPLICATE_DO_DB=(${INCLUDE_SCHEMAS})" +fi +if [ -n "$EXCLUDE_SCHEMAS" ]; then + REPLICATION_FILTERS="$REPLICATION_FILTERS REPLICATE_IGNORE_DB=(${EXCLUDE_SCHEMAS})" +fi +if [ -n "$INCLUDE_TABLES" ]; then + REPLICATION_FILTERS="$REPLICATION_FILTERS REPLICATE_DO_TABLE=(${INCLUDE_TABLES})" +fi +if [ -n "$EXCLUDE_TABLES" ]; then + REPLICATION_FILTERS="$REPLICATION_FILTERS REPLICATE_IGNORE_TABLE=(${EXCLUDE_TABLES})" +fi + +# Add filter command if any filters are set +CHANGE_REPLICATION_CMD=${CHANGE_REPLICATION_CMD:-} +if [ -n "$REPLICATION_FILTERS" ]; then + CHANGE_REPLICATION_CMD="CHANGE REPLICATION FILTER ${REPLICATION_FILTERS};" +fi + mysqlsh --sql --host=${MYDUCK_HOST} --port=${MYDUCK_PORT} --user=${MYDUCK_USER} ${MYDUCK_PASSWORD_OPTION} <