diff --git a/.github/workflows/cluster_endtoend_onlineddl_vrepl_bench.yml b/.github/workflows/cluster_endtoend_onlineddl_vrepl_bench.yml new file mode 100644 index 00000000000..4d0d1cb24a8 --- /dev/null +++ b/.github/workflows/cluster_endtoend_onlineddl_vrepl_bench.yml @@ -0,0 +1,155 @@ +# DO NOT MODIFY: THIS FILE IS GENERATED USING "make generate_ci_workflows" + +name: Cluster (onlineddl_vrepl_bench) +on: + push: + branches: + - "main" + - "release-[0-9]+.[0-9]" + tags: '**' + pull_request: + branches: '**' +concurrency: + group: format('{0}-{1}', ${{ github.ref }}, 'Cluster (onlineddl_vrepl_bench)') + cancel-in-progress: true + +permissions: read-all + +env: + LAUNCHABLE_ORGANIZATION: "vitess" + LAUNCHABLE_WORKSPACE: "vitess-app" + GITHUB_PR_HEAD_SHA: "${{ github.event.pull_request.head.sha }}" + +jobs: + build: + timeout-minutes: 60 + name: Run endtoend tests on Cluster (onlineddl_vrepl_bench) + runs-on: oracle-vm-16cpu-64gb-x86-64 + + steps: + - name: Skip CI + run: | + if [[ "${{contains( github.event.pull_request.labels.*.name, 'Skip CI')}}" == "true" ]]; then + echo "skipping CI due to the 'Skip CI' label" + exit 1 + fi + + - name: Check out code + uses: actions/checkout@11bd71901bbe5b1630ceea73d27597364c9af683 # v4.2.2 + with: + persist-credentials: 'false' + + - name: Check for changes in relevant files + uses: dorny/paths-filter@ebc4d7e9ebcb0b1eb21480bb8f43113e996ac77a # v3.0.1 + id: changes + with: + token: '' + filters: | + end_to_end: + - 'test/config.json' + - 'go/**/*.go' + - 'go/vt/sidecardb/**/*.sql' + - 'go/test/endtoend/onlineddl/vrepl_suite/**' + - 'test.go' + - 'Makefile' + - 'build.env' + - 'go.sum' + - 'go.mod' + - 'proto/*.proto' + - 'tools/**' + - 'config/**' + - 'bootstrap.sh' + - '.github/workflows/cluster_endtoend_onlineddl_vrepl_bench.yml' + - 'go/test/endtoend/onlineddl/vrepl_suite/testdata' + + - name: Set up Go + if: steps.changes.outputs.end_to_end == 'true' + uses: actions/setup-go@d35c59abb061a4a6fb18e82ac0862c26744d6ab5 # v5.5.0 + with: + go-version-file: go.mod + + - name: Set up python + if: steps.changes.outputs.end_to_end == 'true' + uses: actions/setup-python@39cd14951b08e74b54015e9e001cdefcf80e669f # v5.1.1 + + - name: Tune the OS + if: steps.changes.outputs.end_to_end == 'true' + uses: ./.github/actions/tune-os + + - name: Setup MySQL + if: steps.changes.outputs.end_to_end == 'true' + uses: ./.github/actions/setup-mysql + with: + flavor: mysql-8.4 + + - name: Get dependencies + if: steps.changes.outputs.end_to_end == 'true' + timeout-minutes: 10 + run: | + + sudo apt-get -qq install -y mysql-shell + + # Install everything else we need, and configure + sudo apt-get -qq install -y make unzip g++ etcd-client etcd-server curl git wget xz-utils libncurses6 + + sudo service etcd stop + + go mod download + + # install JUnit report formatter + go install github.com/vitessio/go-junit-report@99fa7f0daf16db969f54a49139a14471e633e6e8 # HEAD + + - name: Setup launchable dependencies + if: github.event_name == 'pull_request' && github.event.pull_request.draft == 'false' && steps.changes.outputs.end_to_end == 'true' && github.base_ref == 'main' + run: | + # Get Launchable CLI installed. If you can, make it a part of the builder image to speed things up + pip3 install --user launchable~=1.0 > /dev/null + + # verify that launchable setup is all correct. + launchable verify || true + + # Tell Launchable about the build you are producing and testing + launchable record build --name "$GITHUB_RUN_ID" --no-commit-collection --source . 
+ + - name: Run cluster endtoend test + if: steps.changes.outputs.end_to_end == 'true' + timeout-minutes: 45 + run: | + # We set the VTDATAROOT to the /tmp folder to reduce the file path of mysql.sock file + # which musn't be more than 107 characters long. + export VTDATAROOT="/tmp/" + source build.env + + set -exo pipefail + + cat <<-EOF>>./config/mycnf/mysql84.cnf + binlog-transaction-compression=ON + EOF + + cat <<-EOF>>./config/mycnf/mysql84.cnf + binlog-row-value-options=PARTIAL_JSON + EOF + + # Some of these tests require specific locales to be installed. + # See https://github.com/cncf/automation/commit/49f2ad7a791a62ff7d038002bbb2b1f074eed5d5 + # run the tests however you normally do, then produce a JUnit XML file + go run test.go -docker=false -follow -shard onlineddl_vrepl_bench | tee -a output.txt | go-junit-report -set-exit-code > report.xml + + - name: Record test results in launchable if PR is not a draft + if: github.event_name == 'pull_request' && github.event.pull_request.draft == 'false' && steps.changes.outputs.end_to_end == 'true' && github.base_ref == 'main' && !cancelled() + run: | + # send recorded tests to launchable + launchable record tests --build "$GITHUB_RUN_ID" go-test . || true + + - name: Print test output + if: steps.changes.outputs.end_to_end == 'true' && !cancelled() + run: | + # print test output + cat output.txt + + - name: Test Summary + if: steps.changes.outputs.end_to_end == 'true' && !cancelled() + uses: test-summary/action@31493c76ec9e7aa675f1585d3ed6f1da69269a86 # v2.4 + with: + paths: "report.xml" + show: "fail" diff --git a/config/mycnf/mysql8026.cnf b/config/mycnf/mysql8026.cnf index 05e2fbc586e..8f63d122014 100644 --- a/config/mycnf/mysql8026.cnf +++ b/config/mycnf/mysql8026.cnf @@ -37,3 +37,9 @@ super-read-only # Replication parameters to ensure reparents are fast. replica_net_timeout = 8 + +binlog_transaction_dependency_tracking=WRITESET +slave_preserve_commit_order=ON +slave_parallel_type=LOGICAL_CLOCK +transaction_write_set_extraction=XXHASH64 + diff --git a/examples/common/scripts/vttablet-up.sh b/examples/common/scripts/vttablet-up.sh index 22938c5cecd..0ca95726ee2 100755 --- a/examples/common/scripts/vttablet-up.sh +++ b/examples/common/scripts/vttablet-up.sh @@ -55,6 +55,7 @@ vttablet \ --pid-file $VTDATAROOT/$tablet_dir/vttablet.pid \ --heartbeat-on-demand-duration=5s \ --pprof-http \ + --vreplication_experimental_flags=15 \ > $VTDATAROOT/$tablet_dir/vttablet.out 2>&1 & # Block waiting for the tablet to be listening diff --git a/go/test/endtoend/onlineddl/vrepl_bench/onlineddl_vrepl_mini_bench_test.go b/go/test/endtoend/onlineddl/vrepl_bench/onlineddl_vrepl_mini_bench_test.go new file mode 100644 index 00000000000..ad00dd90ff4 --- /dev/null +++ b/go/test/endtoend/onlineddl/vrepl_bench/onlineddl_vrepl_mini_bench_test.go @@ -0,0 +1,896 @@ +/* +Copyright 2019 The Vitess Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. 
+*/ + +package vreplstress + +import ( + "context" + "flag" + "fmt" + "math/rand/v2" + "os" + "path" + "runtime" + "strings" + "sync" + "sync/atomic" + "testing" + "time" + + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" + + "vitess.io/vitess/go/mysql" + "vitess.io/vitess/go/protoutil" + "vitess.io/vitess/go/test/endtoend/cluster" + "vitess.io/vitess/go/test/endtoend/onlineddl" + "vitess.io/vitess/go/test/endtoend/throttler" + "vitess.io/vitess/go/vt/log" + tabletmanagerdatapb "vitess.io/vitess/go/vt/proto/tabletmanagerdata" + topodatapb "vitess.io/vitess/go/vt/proto/topodata" + vtctldatapb "vitess.io/vitess/go/vt/proto/vtctldata" + "vitess.io/vitess/go/vt/schema" + vttablet "vitess.io/vitess/go/vt/vttablet/common" + "vitess.io/vitess/go/vt/vttablet/tabletserver/throttle" + "vitess.io/vitess/go/vt/vttablet/tabletserver/throttle/throttlerapp" +) + +type WriteMetrics struct { + mu sync.Mutex + insertsAttempts, insertsFailures, insertsNoops, inserts int64 + updatesAttempts, updatesFailures, updatesNoops, updates int64 + deletesAttempts, deletesFailures, deletesNoops, deletes int64 +} + +func (w *WriteMetrics) Clear() { + w.mu.Lock() + defer w.mu.Unlock() + + w.inserts = 0 + w.updates = 0 + w.deletes = 0 + + w.insertsAttempts = 0 + w.insertsFailures = 0 + w.insertsNoops = 0 + + w.updatesAttempts = 0 + w.updatesFailures = 0 + w.updatesNoops = 0 + + w.deletesAttempts = 0 + w.deletesFailures = 0 + w.deletesNoops = 0 +} + +func (w *WriteMetrics) String() string { + return fmt.Sprintf(`WriteMetrics: inserts-deletes=%d, updates-deletes=%d, +insertsAttempts=%d, insertsFailures=%d, insertsNoops=%d, inserts=%d, +updatesAttempts=%d, updatesFailures=%d, updatesNoops=%d, updates=%d, +deletesAttempts=%d, deletesFailures=%d, deletesNoops=%d, deletes=%d, +`, + w.inserts-w.deletes, w.updates-w.deletes, + w.insertsAttempts, w.insertsFailures, w.insertsNoops, w.inserts, + w.updatesAttempts, w.updatesFailures, w.updatesNoops, w.updates, + w.deletesAttempts, w.deletesFailures, w.deletesNoops, w.deletes, + ) +} + +var ( + clusterInstance *cluster.LocalProcessCluster + shards []cluster.Shard + vtParams mysql.ConnParams + primaryTablet *cluster.Vttablet + replicaTablet *cluster.Vttablet + throttlerCheckOK atomic.Bool + + opOrder int64 + opOrderMutex sync.Mutex + idSequence atomic.Int32 + hostname = "localhost" + keyspaceName = "ks" + cell = "zone1" + schemaChangeDirectory = "" + tableName = `stress_test` + cleanupStatements = []string{ + `DROP TABLE IF EXISTS stress_test`, + `DROP TABLE IF EXISTS t1`, + } + createStatement = ` + CREATE TABLE stress_test ( + id bigint not null, + rand_val varchar(32) null default '', + op_order bigint unsigned not null default 0, + hint_col varchar(64) not null default '', + created_timestamp timestamp not null default current_timestamp, + updates int unsigned not null default 0, + PRIMARY KEY (id), + key created_idx(created_timestamp), + key updates_idx(updates) + ) ENGINE=InnoDB; + CREATE TABLE t1 ( + id bigint not null, + i int not null default 0, + PRIMARY KEY (id) + ) ENGINE=InnoDB; + ` + alterHintStatement = ` + ALTER TABLE stress_test modify hint_col varchar(64) not null default '%s' + ` + insertRowStatement = ` + INSERT IGNORE INTO stress_test (id, rand_val, op_order) VALUES (%d, left(md5(rand()), 8), %d) + ` + updateRowStatement = ` + UPDATE stress_test SET op_order=%d, updates=updates+1 WHERE id=%d + ` + deleteRowStatement = ` + DELETE FROM stress_test WHERE id=%d AND updates=1 + ` + selectMaxOpOrder = ` + SELECT MAX(op_order) as m FROM 
stress_test + ` + // We use CAST(SUM(updates) AS SIGNED) because SUM() returns a DECIMAL datatype, and we want to read a SIGNED INTEGER type + selectCountRowsStatement = ` + SELECT COUNT(*) AS num_rows, CAST(SUM(updates) AS SIGNED) AS sum_updates FROM stress_test + ` + truncateStatement = ` + TRUNCATE TABLE stress_test + ` + writeMetrics WriteMetrics +) + +var ( + countIterations = 1 +) + +const ( + maxTableRows = 4096 + workloadDuration = 45 * time.Second + migrationWaitTimeout = 60 * time.Second + // wlType = insertBatchWorkloadType + wlType = mixedWorkloadType + throttleWorkload = true +) + +type workloadType int + +const ( + mixedWorkloadType workloadType = iota + insertWorkloadType + insertBatchWorkloadType +) + +func resetOpOrder() { + opOrderMutex.Lock() + defer opOrderMutex.Unlock() + opOrder = 0 +} + +func nextOpOrder() int64 { + opOrderMutex.Lock() + defer opOrderMutex.Unlock() + opOrder++ + return opOrder +} + +func TestMain(m *testing.M) { + flag.Parse() + + exitcode, err := func() (int, error) { + clusterInstance = cluster.NewCluster(cell, hostname) + schemaChangeDirectory = path.Join("/tmp", fmt.Sprintf("schema_change_dir_%d", clusterInstance.GetAndReserveTabletUID())) + defer os.RemoveAll(schemaChangeDirectory) + defer clusterInstance.Teardown() + + if _, err := os.Stat(schemaChangeDirectory); os.IsNotExist(err) { + _ = os.Mkdir(schemaChangeDirectory, 0700) + } + + clusterInstance.VtctldExtraArgs = []string{ + "--schema_change_dir", schemaChangeDirectory, + "--schema_change_controller", "local", + "--schema_change_check_interval", "1s", + } + + clusterInstance.VtTabletExtraArgs = []string{ + // "--relay_log_max_items", "10000000", + // "--relay_log_max_size", "1000000000", + "--heartbeat_interval", "250ms", + "--heartbeat_on_demand_duration", fmt.Sprintf("%v", migrationWaitTimeout*2), + "--migration_check_interval", "1.1s", + "--watch_replication_stream", + "--pprof-http", + // Test VPlayer batching mode. 
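+			// Beyond batching, this also enables the parallel VPlayer path: the value combines
+			// OptimizeInserts (1), AllowNoBlobBinlogRowImage (2), VPlayerBatching (4) and the new
+			// VPlayerParallel (8) flags, i.e. 1|2|4|8 == 15, matching the
+			// --vreplication_experimental_flags=15 setting added to examples/common/scripts/vttablet-up.sh.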
+ fmt.Sprintf("--vreplication_experimental_flags=%d", + // vttablet.VReplicationExperimentalFlagAllowNoBlobBinlogRowImage|vttablet.VReplicationExperimentalFlagOptimizeInserts), + // vttablet.VReplicationExperimentalFlagAllowNoBlobBinlogRowImage|vttablet.VReplicationExperimentalFlagOptimizeInserts|vttablet.VReplicationExperimentalFlagVPlayerBatching), + vttablet.VReplicationExperimentalFlagAllowNoBlobBinlogRowImage|vttablet.VReplicationExperimentalFlagOptimizeInserts|vttablet.VReplicationExperimentalFlagVPlayerBatching|vttablet.VReplicationExperimentalFlagVPlayerParallel), + } + clusterInstance.VtGateExtraArgs = []string{ + "--ddl_strategy", "online", + } + + if err := clusterInstance.StartTopo(); err != nil { + return 1, err + } + + // Start keyspace + keyspace := &cluster.Keyspace{ + Name: keyspaceName, + } + + // No need for replicas in this stress test + if err := clusterInstance.StartKeyspace(*keyspace, []string{"1"}, 0, false); err != nil { + return 1, err + } + + vtgateInstance := clusterInstance.NewVtgateInstance() + // Start vtgate + if err := vtgateInstance.Setup(); err != nil { + return 1, err + } + // ensure it is torn down during cluster TearDown + clusterInstance.VtgateProcess = *vtgateInstance + vtParams = mysql.ConnParams{ + Host: clusterInstance.Hostname, + Port: clusterInstance.VtgateMySQLPort, + } + + primaryTablet = clusterInstance.Keyspaces[0].Shards[0].Vttablets[0] + if len(clusterInstance.Keyspaces[0].Shards[0].Vttablets) > 1 { + replicaTablet = clusterInstance.Keyspaces[0].Shards[0].Vttablets[1] + } + return m.Run(), nil + }() + if err != nil { + fmt.Printf("%v\n", err) + os.Exit(1) + } else { + os.Exit(exitcode) + } +} + +// trackVreplicationLag is used as a helper function to track vreplication lag and print progress to standard output. 
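+// Once per second it reads time_updated and transaction_timestamp from the workflow's
+// _vt.vreplication row and reports the larger of |now - time_updated| and
+// |now - transaction_timestamp|, in whole seconds, as the current lag.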
+func trackVreplicationLag(t *testing.T, ctx context.Context, workloadCtx context.Context, uuid string) { + reportTicker := time.NewTicker(1 * time.Second) + defer reportTicker.Stop() + + for { + select { + case <-ctx.Done(): + return + case <-workloadCtx.Done(): + return + case <-reportTicker.C: + } + func() { + query := fmt.Sprintf(`select time_updated, transaction_timestamp from _vt.vreplication where workflow='%s'`, uuid) + rs, err := primaryTablet.VttabletProcess.QueryTablet(query, keyspaceName, true) + require.NoError(t, err) + row := rs.Named().Row() + if row == nil { + return + } + + durationDiff := func(t1, t2 time.Time) time.Duration { + return t1.Sub(t2).Abs() + } + timeNow := time.Now() + timeUpdated := time.Unix(row.AsInt64("time_updated", 0), 0) + transactionTimestamp := time.Unix(row.AsInt64("transaction_timestamp", 0), 0) + vreplicationLag := max(durationDiff(timeNow, timeUpdated), durationDiff(timeNow, transactionTimestamp)) + fmt.Printf("vreplication lag: %ds\n", int64(vreplicationLag.Seconds())) + }() + } +} + +func TestVreplMiniStressSchemaChanges(t *testing.T) { + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + + shards = clusterInstance.Keyspaces[0].Shards + require.Equal(t, 1, len(shards)) + + throttler.EnableLagThrottlerAndWaitForStatus(t, clusterInstance) + + t.Run("validate config on primary", func(t *testing.T) { + // Validate the config + conn, err := primaryTablet.VttabletProcess.TabletConn(keyspaceName, true) + require.NoError(t, err) + defer conn.Close() + + _, err = conn.ExecuteFetch("set @@global.binlog_transaction_dependency_tracking='WRITESET'", 1, true) + require.NoError(t, err) + + { + rs, err := conn.ExecuteFetch("select @@global.binlog_transaction_dependency_tracking as v", 1, true) + require.NoError(t, err) + row := rs.Named().Row() + require.NotNil(t, row) + t.Logf("binlog_transaction_dependency_tracking: %v", row.AsString("v", "")) + } + _, err = conn.ExecuteFetch("set @@global.replica_preserve_commit_order=1", 1, true) + require.NoError(t, err) + }) + t.Run("validate config on replica", func(t *testing.T) { + if replicaTablet == nil { + t.SkipNow() + } + // Validate the config + conn, err := replicaTablet.VttabletProcess.TabletConn(keyspaceName, true) + require.NoError(t, err) + defer conn.Close() + { + rs, err := conn.ExecuteFetch("select @@global.replica_parallel_workers as val", 1, true) + require.NoError(t, err) + row := rs.Named().Row() + require.NotNil(t, row) + parallelWorkers := row.AsInt64("val", 0) + require.Positive(t, parallelWorkers) + t.Logf("replica_parallel_workers: %v", parallelWorkers) + } + { + rs, err := conn.ExecuteFetch("select @@global.replica_preserve_commit_order as val", 1, true) + require.NoError(t, err) + row := rs.Named().Row() + require.NotNil(t, row) + preserveCommitOrder := row.AsInt64("val", 0) + require.Positive(t, preserveCommitOrder) + t.Logf("replica_preserve_commit_order: %v", preserveCommitOrder) + } + _, err = conn.ExecuteFetch("set @@global.binlog_transaction_dependency_tracking='WRITESET'", 1, true) + require.NoError(t, err) + }) + + throttlerCheckOK.Store(true) + + results := []time.Duration{} + for i := 0; i < countIterations; i++ { + // Finally, this is the real test: + // We populate a table, and begin a concurrent workload (this is the "mini stress") + // We then ALTER TABLE via vreplication. + // Once convinced ALTER TABLE is complete, we stop the workload. + // We then compare expected metrics with table metrics. 
If they agree, then + // the vreplication/ALTER TABLE did not corrupt our data and we are happy. + testName := fmt.Sprintf("ALTER TABLE with workload %d/%d", (i + 1), countIterations) + t.Run(testName, func(t *testing.T) { + t.Run("create schema", func(t *testing.T) { + testWithInitialSchema(t) + }) + t.Run("init table", func(t *testing.T) { + initTable(t) + }) + + var uuid string + t.Run("start migration", func(t *testing.T) { + hint := fmt.Sprintf("hint-alter-with-workload-%d", i) + uuid = testOnlineDDLStatement(t, fmt.Sprintf(alterHintStatement, hint), "online --postpone-completion --force-cut-over-after=1ns", "", true) + }) + t.Run("wait for ready_to_complete", func(t *testing.T) { + waitForReadyToComplete(t, uuid, true) + }) + t.Run("throttle online-ddl", func(t *testing.T) { + if !throttleWorkload { + return + } + onlineddl.CheckThrottledApps(t, &vtParams, throttlerapp.VPlayerName, false) + // onlineddl.ThrottleAllMigrations(t, &vtParams) + + appRule := &topodatapb.ThrottledAppRule{ + Name: throttlerapp.VPlayerName.String(), + Ratio: throttle.DefaultThrottleRatio, + ExpiresAt: protoutil.TimeToProto(time.Now().Add(time.Hour)), + } + req := &vtctldatapb.UpdateThrottlerConfigRequest{Threshold: throttler.DefaultThreshold.Seconds()} + _, err := throttler.UpdateThrottlerTopoConfig(clusterInstance, req, appRule, nil) + assert.NoError(t, err) + + onlineddl.CheckThrottledApps(t, &vtParams, throttlerapp.VPlayerName, true) + throttler.WaitForCheckThrottlerResult(t, &clusterInstance.VtctldClientProcess, primaryTablet, throttlerapp.VPlayerName, nil, tabletmanagerdatapb.CheckThrottlerResponseCode_APP_DENIED, time.Minute) + }) + readPos := func(t *testing.T) { + { + rs, err := primaryTablet.VttabletProcess.QueryTablet("select @@global.gtid_executed", keyspaceName, true) + require.NoError(t, err) + t.Logf("gtid executed: %v", rs.Rows[0][0].ToString()) + } + { + query := fmt.Sprintf("select pos from _vt.vreplication where workflow='%s'", uuid) + rs, err := primaryTablet.VttabletProcess.QueryTablet(query, keyspaceName, true) + require.NoError(t, err) + require.NotEmpty(t, rs.Rows) + t.Logf("vreplication pos: %v", rs.Rows[0][0].ToString()) + } + } + t.Run("read pos", func(t *testing.T) { + readPos(t) + }) + t.Run(fmt.Sprintf("start workload: %v", workloadDuration), func(t *testing.T) { + onlineddl.CheckThrottledApps(t, &vtParams, throttlerapp.VPlayerName, throttleWorkload) + ctx, cancel := context.WithTimeout(ctx, workloadDuration) + defer cancel() + // runMultipleConnections(ctx, t, mixedWorkloadType) + // runMultipleConnections(ctx, t, insertWorkloadType) + runMultipleConnections(ctx, t) + }) + t.Run("read pos", func(t *testing.T) { + readPos(t) + }) + t.Run("mark for completion", func(t *testing.T) { + onlineddl.CheckCompleteAllMigrations(t, &vtParams, 1) + }) + t.Run("validate throttler at end of workload", func(t *testing.T) { + if !throttleWorkload { + return + } + onlineddl.CheckThrottledApps(t, &vtParams, throttlerapp.VPlayerName, true) + throttler.WaitForCheckThrottlerResult(t, &clusterInstance.VtctldClientProcess, primaryTablet, throttlerapp.VPlayerName, nil, tabletmanagerdatapb.CheckThrottlerResponseCode_APP_DENIED, time.Second) + }) + var startTime = time.Now() + t.Run("unthrottle online-ddl", func(t *testing.T) { + if !throttleWorkload { + return + } + // onlineddl.UnthrottleAllMigrations(t, &vtParams) + + appRule := &topodatapb.ThrottledAppRule{ + Name: throttlerapp.VPlayerName.String(), + ExpiresAt: protoutil.TimeToProto(time.Now()), + } + req := 
&vtctldatapb.UpdateThrottlerConfigRequest{Threshold: throttler.DefaultThreshold.Seconds()} + _, err := throttler.UpdateThrottlerTopoConfig(clusterInstance, req, appRule, nil) + assert.NoError(t, err) + + if !onlineddl.CheckThrottledApps(t, &vtParams, throttlerapp.VPlayerName, false) { + status, err := throttler.GetThrottlerStatus(&clusterInstance.VtctldClientProcess, primaryTablet) + assert.NoError(t, err) + + t.Logf("Throttler status: %+v", status) + } + }) + t.Run("read pos", func(t *testing.T) { + readPos(t) + }) + t.Run("wait for migration to complete", func(t *testing.T) { + ctx, cancel := context.WithCancel(ctx) + defer cancel() + + go trackVreplicationLag(t, ctx, ctx, uuid) + + status := onlineddl.WaitForMigrationStatus(t, &vtParams, shards, uuid, migrationWaitTimeout, schema.OnlineDDLStatusComplete, schema.OnlineDDLStatusFailed) + t.Logf("# Migration status (for debug purposes): <%s>", status) + if !onlineddl.CheckMigrationStatus(t, &vtParams, shards, uuid, schema.OnlineDDLStatusComplete) { + query := fmt.Sprintf("select * from _vt.vreplication where workflow='%s'", uuid) + rs, err := primaryTablet.VttabletProcess.QueryTablet(query, keyspaceName, false) + require.NoError(t, err) + require.NotEmpty(t, rs.Rows) + t.Logf("vreplication: %v", rs.Rows[0]) + } + + onlineddl.CheckCancelAllMigrations(t, &vtParams, -1) + }) + t.Run("read pos", func(t *testing.T) { + readPos(t) + }) + endTime := time.Now() + results = append(results, endTime.Sub(startTime)) + t.Logf(":::::::::::::::::::: Workload catchup took %v ::::::::::::::::::::", endTime.Sub(startTime)) + t.Run("cleanup", func(t *testing.T) { + throttler.WaitForCheckThrottlerResult(t, &clusterInstance.VtctldClientProcess, primaryTablet, throttlerapp.VPlayerName, nil, tabletmanagerdatapb.CheckThrottlerResponseCode_OK, time.Minute) + }) + t.Run("validate metrics", func(t *testing.T) { + testSelectTableMetrics(t) + }) + t.Run("sleep", func(t *testing.T) { + time.Sleep(time.Minute * 3) + }) + }) + } + + t.Run("summary", func(t *testing.T) { + t.Logf(":::::::::::::::::::: Workload catchup took: %+v ::::::::::::::::::::", results) + }) +} + +func testWithInitialSchema(t *testing.T) { + for _, statement := range cleanupStatements { + err := clusterInstance.VtctldClientProcess.ApplySchema(keyspaceName, statement) + require.NoError(t, err) + } + // Create the stress table + err := clusterInstance.VtctldClientProcess.ApplySchema(keyspaceName, createStatement) + require.NoError(t, err) + + // Check if table is created + checkTable(t, tableName) +} + +func waitForReadyToComplete(t *testing.T, uuid string, expected bool) bool { + ctx, cancel := context.WithTimeout(context.Background(), migrationWaitTimeout) + defer cancel() + + ticker := time.NewTicker(time.Second) + defer ticker.Stop() + for { + rs := onlineddl.ReadMigrations(t, &vtParams, uuid) + require.NotNil(t, rs) + for _, row := range rs.Named().Rows { + readyToComplete := row.AsInt64("ready_to_complete", 0) + if expected == (readyToComplete > 0) { + // all good. 
This is what we waited for + if expected { + // if migration is ready to complete, the timestamp should be non-null + assert.False(t, row["ready_to_complete_timestamp"].IsNull()) + } else { + assert.True(t, row["ready_to_complete_timestamp"].IsNull()) + } + return true + } + } + select { + case <-ticker.C: + case <-ctx.Done(): + assert.NoError(t, ctx.Err(), "timeout waiting for ready_to_complete") + return false + } + } +} + +// testOnlineDDLStatement runs an online DDL, ALTER statement +func testOnlineDDLStatement(t *testing.T, alterStatement string, ddlStrategy string, expectHint string, skipWait bool) (uuid string) { + row := onlineddl.VtgateExecDDL(t, &vtParams, ddlStrategy, alterStatement, "").Named().Row() + require.NotNil(t, row) + uuid = row.AsString("uuid", "") + uuid = strings.TrimSpace(uuid) + require.NotEmpty(t, uuid) + t.Logf("# Generated UUID (for debug purposes):") + t.Logf("<%s>", uuid) + + strategySetting, err := schema.ParseDDLStrategy(ddlStrategy) + assert.NoError(t, err) + + if !strategySetting.Strategy.IsDirect() && !skipWait && uuid != "" { + status := onlineddl.WaitForMigrationStatus(t, &vtParams, shards, uuid, migrationWaitTimeout, schema.OnlineDDLStatusComplete, schema.OnlineDDLStatusFailed) + t.Logf("# Migration status (for debug purposes): <%s>", status) + } + + if expectHint != "" { + checkMigratedTable(t, tableName, expectHint) + } + return uuid +} + +// checkTable checks the number of tables in the first two shards. +func checkTable(t *testing.T, showTableName string) { + for i := range clusterInstance.Keyspaces[0].Shards { + checkTablesCount(t, clusterInstance.Keyspaces[0].Shards[i].Vttablets[0], showTableName, 1) + } +} + +// checkTablesCount checks the number of tables in the given tablet +func checkTablesCount(t *testing.T, tablet *cluster.Vttablet, showTableName string, expectCount int) { + query := fmt.Sprintf(`show tables like '%%%s%%';`, showTableName) + ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second) + defer cancel() + ticker := time.NewTicker(time.Second) + defer ticker.Stop() + + rowcount := 0 + + for { + queryResult, err := tablet.VttabletProcess.QueryTablet(query, keyspaceName, true) + require.NoError(t, err) + rowcount = len(queryResult.Rows) + if rowcount > 0 { + break + } + + select { + case <-ticker.C: + continue // Keep looping + case <-ctx.Done(): + // Break below to the assertion + } + + break + } + + assert.Equal(t, expectCount, rowcount) +} + +// checkMigratedTables checks the CREATE STATEMENT of a table after migration +func checkMigratedTable(t *testing.T, tableName, expectHint string) { + for i := range clusterInstance.Keyspaces[0].Shards { + createStatement := getCreateTableStatement(t, clusterInstance.Keyspaces[0].Shards[i].Vttablets[0], tableName) + assert.Contains(t, createStatement, expectHint) + } +} + +// getCreateTableStatement returns the CREATE TABLE statement for a given table +func getCreateTableStatement(t *testing.T, tablet *cluster.Vttablet, tableName string) (statement string) { + queryResult, err := tablet.VttabletProcess.QueryTablet(fmt.Sprintf("show create table %s;", tableName), keyspaceName, true) + require.NoError(t, err) + + assert.Equal(t, len(queryResult.Rows), 1) + assert.Equal(t, len(queryResult.Rows[0]), 2) // table name, create statement + statement = queryResult.Rows[0][1].ToString() + return statement +} + +func generateInsert(t *testing.T, conn *mysql.Conn) error { + id := idSequence.Add(1) + if wlType != insertBatchWorkloadType { + id = rand.Int32N(int32(maxTableRows)) + } + 
query := fmt.Sprintf(insertRowStatement, id, nextOpOrder()) + qr, err := conn.ExecuteFetch(query, 1000, true) + + func() { + writeMetrics.mu.Lock() + defer writeMetrics.mu.Unlock() + + writeMetrics.insertsAttempts++ + if err != nil { + writeMetrics.insertsFailures++ + return + } + assert.Less(t, qr.RowsAffected, uint64(2)) + if qr.RowsAffected == 0 { + writeMetrics.insertsNoops++ + return + } + writeMetrics.inserts++ + }() + { + id := rand.Int32N(int32(maxTableRows)) + query := fmt.Sprintf("insert into t1 values (%d, %d)", id, id) + _, _ = conn.ExecuteFetch(query, 1000, true) + } + return err +} + +func generateUpdate(t *testing.T, conn *mysql.Conn) error { + id := rand.Int32N(int32(maxTableRows)) + query := fmt.Sprintf(updateRowStatement, nextOpOrder(), id) + qr, err := conn.ExecuteFetch(query, 1000, true) + + func() { + writeMetrics.mu.Lock() + defer writeMetrics.mu.Unlock() + + writeMetrics.updatesAttempts++ + if err != nil { + writeMetrics.updatesFailures++ + return + } + assert.Less(t, qr.RowsAffected, uint64(2)) + if qr.RowsAffected == 0 { + writeMetrics.updatesNoops++ + return + } + writeMetrics.updates++ + }() + { + id := rand.Int32N(int32(maxTableRows)) + query := fmt.Sprintf("update t1 set i=i+1 where id=%d", id) + _, _ = conn.ExecuteFetch(query, 1000, true) + } + return err +} + +func generateDelete(t *testing.T, conn *mysql.Conn) error { + id := rand.Int32N(int32(maxTableRows)) + query := fmt.Sprintf(deleteRowStatement, id) + qr, err := conn.ExecuteFetch(query, 1000, true) + + func() { + writeMetrics.mu.Lock() + defer writeMetrics.mu.Unlock() + + writeMetrics.deletesAttempts++ + if err != nil { + writeMetrics.deletesFailures++ + return + } + assert.Less(t, qr.RowsAffected, uint64(2)) + if qr.RowsAffected == 0 { + writeMetrics.deletesNoops++ + return + } + writeMetrics.deletes++ + }() + { + id := rand.Int32N(int32(maxTableRows)) + query := fmt.Sprintf("delete from t1 where id=%d", id) + _, _ = conn.ExecuteFetch(query, 1000, true) + } + return err +} + +func runSingleConnection(ctx context.Context, t *testing.T, sleepInterval time.Duration) { + log.Infof("Running single connection") + + // conn, err := mysql.Connect(ctx, &vtParams) + conn, err := primaryTablet.VttabletProcess.TabletConn(keyspaceName, true) + require.NoError(t, err) + defer conn.Close() + + _, err = conn.ExecuteFetch("set innodb_lock_wait_timeout=5", 1000, true) + require.NoError(t, err) + _, err = conn.ExecuteFetch("set autocommit=1", 1000, true) + require.NoError(t, err) + _, err = conn.ExecuteFetch("set transaction isolation level read committed", 1000, true) + require.NoError(t, err) + + ticker := time.NewTicker(sleepInterval) + defer ticker.Stop() + + log.Infof("+- Starting single connection") + defer log.Infof("+- Terminating single connection") + for { + select { + case <-ctx.Done(): + log.Infof("runSingleConnection context timeout") + return + case <-ticker.C: + } + if !throttlerCheckOK.Load() { + continue + } + switch wlType { + case mixedWorkloadType: + switch rand.Int32N(3) { + case 0: + err = generateInsert(t, conn) + case 1: + err = generateUpdate(t, conn) + case 2: + err = generateDelete(t, conn) + } + assert.Nil(t, err) + case insertWorkloadType: + err = generateInsert(t, conn) + assert.Nil(t, err) + case insertBatchWorkloadType: + _, err := conn.ExecuteFetch("begin", 1, false) + assert.Nil(t, err) + for range 50 { + err = generateInsert(t, conn) + assert.Nil(t, err) + } + _, err = conn.ExecuteFetch("commit", 1, false) + assert.Nil(t, err) + } + } +} + +func runMultipleConnections(ctx 
context.Context, t *testing.T) { + // The workload for a 16 vCPU machine is: + // - Concurrency of 16 + // - 2ms interval between queries for each connection + // As the number of vCPUs decreases, so do we decrease concurrency, and increase intervals. For example, on a 8 vCPU machine + // we run concurrency of 8 and interval of 4ms. On a 4 vCPU machine we run concurrency of 4 and interval of 8ms. + maxConcurrency := runtime.NumCPU() + + { + // Unlike similar stress tests, here we're not looking to be any mercyful. We're not looking to have + // replication capacity. We want to hammer as much as possible, then wait for catchup time. We therefore + // use a minimal sleep interval. + // + // sleepModifier := 16.0 / float64(maxConcurrency) + // baseSleepInterval := 2 * time.Millisecond + // singleConnectionSleepIntervalNanoseconds := float64(baseSleepInterval.Nanoseconds()) * sleepModifier + // sleepInterval := time.Duration(int64(singleConnectionSleepIntervalNanoseconds)) + } + sleepInterval := time.Microsecond + + log.Infof("Running multiple connections: maxConcurrency=%v, sleep interval=%v", maxConcurrency, sleepInterval) + var wg sync.WaitGroup + for i := 0; i < maxConcurrency; i++ { + wg.Add(1) + go func() { + defer wg.Done() + runSingleConnection(ctx, t, sleepInterval) + }() + } + flushBinlogs := func() { + // Flushing binlogs on primary restarts imposes more challenges to parallel vplayer + // because a binlog rotation is a parallellism barrier: all events from previous binlog + // must be consumed before starting to apply events in new binlog. + conn, err := primaryTablet.VttabletProcess.TabletConn(keyspaceName, true) + require.NoError(t, err) + defer conn.Close() + + _, err = conn.ExecuteFetch("flush binary logs", 1000, true) + require.NoError(t, err) + } + time.AfterFunc(200*time.Millisecond, flushBinlogs) + time.AfterFunc(400*time.Millisecond, flushBinlogs) + wg.Wait() + log.Infof("Running multiple connections: done") +} + +func initTable(t *testing.T) { + log.Infof("initTable begin") + defer log.Infof("initTable complete") + + t.Run("cancel pending migrations", func(t *testing.T) { + cancelQuery := "alter vitess_migration cancel all" + r := onlineddl.VtgateExecQuery(t, &vtParams, cancelQuery, "") + if r.RowsAffected > 0 { + fmt.Printf("# Cancelled migrations (for debug purposes): %d\n", r.RowsAffected) + } + }) + + ctx := context.Background() + conn, err := mysql.Connect(ctx, &vtParams) + require.NoError(t, err) + defer conn.Close() + + resetOpOrder() + writeMetrics.Clear() + _, err = conn.ExecuteFetch(truncateStatement, 1000, true) + require.NoError(t, err) + + for i := 0; i < maxTableRows/2; i++ { + generateInsert(t, conn) + } + if wlType != insertBatchWorkloadType { + for i := 0; i < maxTableRows/4; i++ { + generateUpdate(t, conn) + } + for i := 0; i < maxTableRows/4; i++ { + generateDelete(t, conn) + } + } +} + +func testSelectTableMetrics(t *testing.T) { + writeMetrics.mu.Lock() + defer writeMetrics.mu.Unlock() + + { + rs := onlineddl.VtgateExecQuery(t, &vtParams, selectMaxOpOrder, "") + row := rs.Named().Row() + require.NotNil(t, row) + + maxOpOrder := row.AsInt64("m", 0) + fmt.Printf("# max op_order in table: %d\n", maxOpOrder) + } + + log.Infof("%s", writeMetrics.String()) + + ctx := context.Background() + conn, err := mysql.Connect(ctx, &vtParams) + require.NoError(t, err) + defer conn.Close() + + rs, err := conn.ExecuteFetch(selectCountRowsStatement, 1000, true) + require.NoError(t, err) + + row := rs.Named().Row() + require.NotNil(t, row) + 
log.Infof("testSelectTableMetrics, row: %v", row) + numRows := row.AsInt64("num_rows", 0) + sumUpdates := row.AsInt64("sum_updates", 0) + assert.NotZero(t, numRows) + assert.NotZero(t, sumUpdates) + assert.NotZero(t, writeMetrics.inserts) + assert.NotZero(t, writeMetrics.deletes) + assert.NotZero(t, writeMetrics.updates) + assert.Equal(t, writeMetrics.inserts-writeMetrics.deletes, numRows) + assert.Equal(t, writeMetrics.updates-writeMetrics.deletes, sumUpdates) // because we DELETE WHERE updates=1 +} diff --git a/go/test/endtoend/onlineddl/vrepl_suite/testdata/datetime-to-timestamp/create.sql b/go/test/endtoend/onlineddl/vrepl_suite/testdata/datetime-to-timestamp/create.sql index 99018d8c798..79cab9748bc 100644 --- a/go/test/endtoend/onlineddl/vrepl_suite/testdata/datetime-to-timestamp/create.sql +++ b/go/test/endtoend/onlineddl/vrepl_suite/testdata/datetime-to-timestamp/create.sql @@ -11,6 +11,8 @@ create table onlineddl_test ( key i_idx(i) ) auto_increment=1; +insert into onlineddl_test values (null, 7, null, now(), now(), '2010-10-20 07:20:30', 0); + drop event if exists onlineddl_test; delimiter ;; create event onlineddl_test diff --git a/go/vt/binlog/binlogplayer/binlog_player.go b/go/vt/binlog/binlogplayer/binlog_player.go index 4c12c89e58d..029851f4e38 100644 --- a/go/vt/binlog/binlogplayer/binlog_player.go +++ b/go/vt/binlog/binlogplayer/binlog_player.go @@ -672,6 +672,31 @@ func GenerateUpdatePos(uid int32, pos replication.Position, timeUpdated int64, t "update _vt.vreplication set pos=%v, time_updated=%v, rows_copied=%v, message='' where id=%v", strGTID, timeUpdated, rowsCopied, uid) } +// GenerateInitWorkerPos returns a statement to initialize a worker's entry in vreplication_worker_pos +func GenerateInitWorkerPos(uid int32, worker int) string { + // Preserve existing row values. Otherwise just ensure the row is there. + return fmt.Sprintf("insert ignore into _vt.vreplication_worker_pos (id, worker, gtid, transaction_timestamp) values (%v, %v, '', 0)", uid, worker) +} + +// GenerateUpdateWorkerPos returns a statement to record the latest processed gtid of a worker in the _vt.vreplication_worker_pos table. +// TODO: +// - compress? +// - txTimestamp? +// - rowsCopied? +// - timeUpdated? +func GenerateUpdateWorkerPos(uid int32, worker int, pos string, transactionTimestamp int64) string { + strGTID := encodeString(pos) + // Append GTID value to already existing value in `gtid` column + if transactionTimestamp == 0 { + return fmt.Sprintf( + "update _vt.vreplication_worker_pos set gtid=GTID_SUBTRACT(concat(gtid, ',', %v), '') where id=%v and worker=%v", + strGTID, uid, worker) + } + return fmt.Sprintf( + "update _vt.vreplication_worker_pos set gtid=GTID_SUBTRACT(concat(gtid, ',', %v), ''), transaction_timestamp=%v where id=%v and worker=%v", + strGTID, transactionTimestamp, uid, worker) +} + // GenerateUpdateRowsCopied returns a statement to update the rows_copied value in the _vt.vreplication table. func GenerateUpdateRowsCopied(uid int32, rowsCopied int64) string { return fmt.Sprintf("update _vt.vreplication set rows_copied=%v where id=%v", rowsCopied, uid) @@ -712,6 +737,11 @@ func DeleteVReplication(uid int32) string { return fmt.Sprintf("delete from _vt.vreplication where id=%v", uid) } +// DeleteVReplication returns a statement to delete the replication. +func DeleteVReplicationWorkerPos(uid int32) string { + return fmt.Sprintf("delete from _vt.vreplication_worker_pos where id=%v", uid) +} + // MessageTruncate truncates the message string to a safe length. 
func MessageTruncate(msg string) string { // message length is 1000 bytes. @@ -728,6 +758,26 @@ func ReadVReplicationPos(index int32) string { return fmt.Sprintf("select pos from _vt.vreplication where id=%v", index) } +// ReadVReplicationCombinedWorkersGTIDs returns a statement to query the combined GTID +// and the last transaction timestamp from the _vt.vreplication_worker_pos table, for +// a given workflow. +func ReadVReplicationCombinedWorkersGTIDs(index int32) string { + return fmt.Sprintf(` + select + max(@aggregated_gtid) as gtid, + max(transaction_timestamp) as transaction_timestamp + from ( + select + @aggregated_gtid:=gtid_subtract(concat(@aggregated_gtid,',',gtid),'') as running_total, + transaction_timestamp + from + _vt.vreplication_worker_pos, + (select @aggregated_gtid:='') as sel_init + where + id=%v + ) sel_inner`, index) +} + // ReadVReplicationStatus returns a statement to query the status fields for a // given stream from the _vt.vreplication table. func ReadVReplicationStatus(index int32) string { @@ -779,6 +829,16 @@ func DecodePosition(gtid string) (replication.Position, error) { return replication.DecodePosition(gtid) } +// DecodePosition attempts to uncompress the passed value first and if it fails tries to decode it as a valid GTID +func DecodeMySQL56Position(gtid string) (replication.Position, error) { + b := MysqlUncompress(gtid) + if b != nil { + gtid = string(b) + } + pos, _, err := replication.DecodePositionMySQL56(gtid) + return pos, err +} + // StatsHistoryRecord is used to store a Message with timestamp type StatsHistoryRecord struct { Time time.Time diff --git a/go/vt/proto/binlogdata/binlogdata.pb.go b/go/vt/proto/binlogdata/binlogdata.pb.go index a35f5439698..2bc721ad80c 100644 --- a/go/vt/proto/binlogdata/binlogdata.pb.go +++ b/go/vt/proto/binlogdata/binlogdata.pb.go @@ -1903,7 +1903,13 @@ type VEvent struct { // For GTID events, the sequence number (logical clock) value of this transaction. SequenceNumber int64 `protobuf:"varint,27,opt,name=sequence_number,json=sequenceNumber,proto3" json:"sequence_number,omitempty"` // EventGTID is decorated by VPlayer. It is the specific GTID (not the GTID set) for this event. 
- EventGtid string `protobuf:"bytes,28,opt,name=event_gtid,json=eventGtid,proto3" json:"event_gtid,omitempty"` + EventGtid string `protobuf:"bytes,28,opt,name=event_gtid,json=eventGtid,proto3" json:"event_gtid,omitempty"` + // MustSave is a decoration by VPlayer + MustSave bool `protobuf:"varint,29,opt,name=must_save,json=mustSave,proto3" json:"must_save,omitempty"` + // PinWorker is a decoration by parallel VPlayer + PinWorker bool `protobuf:"varint,30,opt,name=pin_worker,json=pinWorker,proto3" json:"pin_worker,omitempty"` + // Skippable is a decoration by parallel VPlayer + Skippable bool `protobuf:"varint,31,opt,name=skippable,proto3" json:"skippable,omitempty"` unknownFields protoimpl.UnknownFields sizeCache protoimpl.SizeCache } @@ -2064,6 +2070,27 @@ func (x *VEvent) GetEventGtid() string { return "" } +func (x *VEvent) GetMustSave() bool { + if x != nil { + return x.MustSave + } + return false +} + +func (x *VEvent) GetPinWorker() bool { + if x != nil { + return x.PinWorker + } + return false +} + +func (x *VEvent) GetSkippable() bool { + if x != nil { + return x.Skippable + } + return false +} + type MinimalTable struct { state protoimpl.MessageState `protogen:"open.v1"` Name string `protobuf:"bytes,1,opt,name=name,proto3" json:"name,omitempty"` @@ -3213,7 +3240,7 @@ const file_binlogdata_proto_rawDesc = "" + "\vshard_gtids\x18\x05 \x03(\v2\x15.binlogdata.ShardGtidR\n" + "shardGtids\x12=\n" + "\fparticipants\x18\x06 \x03(\v2\x19.binlogdata.KeyspaceShardR\fparticipants\x12)\n" + - "\x10source_workflows\x18\a \x03(\tR\x0fsourceWorkflows\"\xa3\x05\n" + + "\x10source_workflows\x18\a \x03(\tR\x0fsourceWorkflows\"\xfd\x05\n" + "\x06VEvent\x12*\n" + "\x04type\x18\x01 \x01(\x0e2\x16.binlogdata.VEventTypeR\x04type\x12\x1c\n" + "\ttimestamp\x18\x02 \x01(\x03R\ttimestamp\x12\x12\n" + @@ -3234,7 +3261,11 @@ const file_binlogdata_proto_rawDesc = "" + "\rcommit_parent\x18\x1a \x01(\x03R\fcommitParent\x12'\n" + "\x0fsequence_number\x18\x1b \x01(\x03R\x0esequenceNumber\x12\x1d\n" + "\n" + - "event_gtid\x18\x1c \x01(\tR\teventGtid\"\x8d\x01\n" + + "event_gtid\x18\x1c \x01(\tR\teventGtid\x12\x1b\n" + + "\tmust_save\x18\x1d \x01(\bR\bmustSave\x12\x1d\n" + + "\n" + + "pin_worker\x18\x1e \x01(\bR\tpinWorker\x12\x1c\n" + + "\tskippable\x18\x1f \x01(\bR\tskippable\"\x8d\x01\n" + "\fMinimalTable\x12\x12\n" + "\x04name\x18\x01 \x01(\tR\x04name\x12$\n" + "\x06fields\x18\x02 \x03(\v2\f.query.FieldR\x06fields\x12\x1e\n" + diff --git a/go/vt/proto/binlogdata/binlogdata_vtproto.pb.go b/go/vt/proto/binlogdata/binlogdata_vtproto.pb.go index 7657e849b33..6c38baee9f9 100644 --- a/go/vt/proto/binlogdata/binlogdata_vtproto.pb.go +++ b/go/vt/proto/binlogdata/binlogdata_vtproto.pb.go @@ -515,6 +515,9 @@ func (m *VEvent) CloneVT() *VEvent { r.CommitParent = m.CommitParent r.SequenceNumber = m.SequenceNumber r.EventGtid = m.EventGtid + r.MustSave = m.MustSave + r.PinWorker = m.PinWorker + r.Skippable = m.Skippable if len(m.unknownFields) > 0 { r.unknownFields = make([]byte, len(m.unknownFields)) copy(r.unknownFields, m.unknownFields) @@ -2178,6 +2181,42 @@ func (m *VEvent) MarshalToSizedBufferVT(dAtA []byte) (int, error) { i -= len(m.unknownFields) copy(dAtA[i:], m.unknownFields) } + if m.Skippable { + i-- + if m.Skippable { + dAtA[i] = 1 + } else { + dAtA[i] = 0 + } + i-- + dAtA[i] = 0x1 + i-- + dAtA[i] = 0xf8 + } + if m.PinWorker { + i-- + if m.PinWorker { + dAtA[i] = 1 + } else { + dAtA[i] = 0 + } + i-- + dAtA[i] = 0x1 + i-- + dAtA[i] = 0xf0 + } + if m.MustSave { + i-- + if m.MustSave { + dAtA[i] = 1 + } else { + 
dAtA[i] = 0 + } + i-- + dAtA[i] = 0x1 + i-- + dAtA[i] = 0xe8 + } if len(m.EventGtid) > 0 { i -= len(m.EventGtid) copy(dAtA[i:], m.EventGtid) @@ -3941,6 +3980,15 @@ func (m *VEvent) SizeVT() (n int) { if l > 0 { n += 2 + l + protohelpers.SizeOfVarint(uint64(l)) } + if m.MustSave { + n += 3 + } + if m.PinWorker { + n += 3 + } + if m.Skippable { + n += 3 + } n += len(m.unknownFields) return n } @@ -8371,6 +8419,66 @@ func (m *VEvent) UnmarshalVT(dAtA []byte) error { } m.EventGtid = string(dAtA[iNdEx:postIndex]) iNdEx = postIndex + case 29: + if wireType != 0 { + return fmt.Errorf("proto: wrong wireType = %d for field MustSave", wireType) + } + var v int + for shift := uint(0); ; shift += 7 { + if shift >= 64 { + return protohelpers.ErrIntOverflow + } + if iNdEx >= l { + return io.ErrUnexpectedEOF + } + b := dAtA[iNdEx] + iNdEx++ + v |= int(b&0x7F) << shift + if b < 0x80 { + break + } + } + m.MustSave = bool(v != 0) + case 30: + if wireType != 0 { + return fmt.Errorf("proto: wrong wireType = %d for field PinWorker", wireType) + } + var v int + for shift := uint(0); ; shift += 7 { + if shift >= 64 { + return protohelpers.ErrIntOverflow + } + if iNdEx >= l { + return io.ErrUnexpectedEOF + } + b := dAtA[iNdEx] + iNdEx++ + v |= int(b&0x7F) << shift + if b < 0x80 { + break + } + } + m.PinWorker = bool(v != 0) + case 31: + if wireType != 0 { + return fmt.Errorf("proto: wrong wireType = %d for field Skippable", wireType) + } + var v int + for shift := uint(0); ; shift += 7 { + if shift >= 64 { + return protohelpers.ErrIntOverflow + } + if iNdEx >= l { + return io.ErrUnexpectedEOF + } + b := dAtA[iNdEx] + iNdEx++ + v |= int(b&0x7F) << shift + if b < 0x80 { + break + } + } + m.Skippable = bool(v != 0) default: iNdEx = preIndex skippy, err := protohelpers.Skip(dAtA[iNdEx:]) diff --git a/go/vt/sidecardb/schema/vreplication/vreplication.sql b/go/vt/sidecardb/schema/vreplication/vreplication.sql index 6670b671f4f..4926598f380 100644 --- a/go/vt/sidecardb/schema/vreplication/vreplication.sql +++ b/go/vt/sidecardb/schema/vreplication/vreplication.sql @@ -19,7 +19,7 @@ CREATE TABLE IF NOT EXISTS vreplication `id` int NOT NULL AUTO_INCREMENT, `workflow` varbinary(1000) DEFAULT NULL, `source` mediumblob NOT NULL, - `pos` varbinary(10000) NOT NULL, + `pos` varbinary(45535) NOT NULL, `stop_pos` varbinary(10000) DEFAULT NULL, `max_tps` bigint NOT NULL, `max_replication_lag` bigint NOT NULL, diff --git a/go/vt/sidecardb/schema/vreplication/vreplication_worker_pos.sql b/go/vt/sidecardb/schema/vreplication/vreplication_worker_pos.sql new file mode 100644 index 00000000000..9ddfe38da0c --- /dev/null +++ b/go/vt/sidecardb/schema/vreplication/vreplication_worker_pos.sql @@ -0,0 +1,24 @@ +/* +Copyright 2025 The Vitess Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. 
+*/ + +CREATE TABLE IF NOT EXISTS vreplication_worker_pos +( + `id` int NOT NULL AUTO_INCREMENT, + `worker` int unsigned NOT NULL, + `gtid` longblob NOT NULL, + `transaction_timestamp` bigint NOT NULL, + PRIMARY KEY (`id`, `worker`) +) ENGINE = InnoDB CHARSET = utf8mb4 diff --git a/go/vt/vttablet/common/flags.go b/go/vt/vttablet/common/flags.go index fa6380c0144..eb911cdcb64 100644 --- a/go/vt/vttablet/common/flags.go +++ b/go/vt/vttablet/common/flags.go @@ -31,6 +31,7 @@ const ( VReplicationExperimentalFlagOptimizeInserts = int64(1) VReplicationExperimentalFlagAllowNoBlobBinlogRowImage = int64(2) VReplicationExperimentalFlagVPlayerBatching = int64(4) + VReplicationExperimentalFlagVPlayerParallel = int64(8) ) var ( diff --git a/go/vt/vttablet/tabletmanager/vreplication/controller.go b/go/vt/vttablet/tabletmanager/vreplication/controller.go index e6f4f2a2720..841ed18e776 100644 --- a/go/vt/vttablet/tabletmanager/vreplication/controller.go +++ b/go/vt/vttablet/tabletmanager/vreplication/controller.go @@ -78,6 +78,8 @@ type controller struct { WorkflowConfig *vttablet.VReplicationConfig } +type dbClientGenerator func() (binlogplayer.DBClient, error) + func processWorkflowOptions(params map[string]string) (*vttablet.VReplicationConfig, error) { options, ok := params["options"] if !ok { @@ -297,7 +299,27 @@ func (ct *controller) runBlp(ctx context.Context) (err error) { } defer vsClient.Close(ctx) - vr := newVReplicator(ct.id, ct.source, vsClient, ct.blpStats, dbClient, ct.mysqld, ct.vre, ct.WorkflowConfig) + var dbClients []binlogplayer.DBClient + dbClientGen := func() (binlogplayer.DBClient, error) { + dbClient := ct.dbClientFactory() + if err := dbClient.Connect(); err != nil { + return nil, err + } + if ct.source.Filter != nil { + if err := setDBClientSettings(dbClient, ct.WorkflowConfig); err != nil { + return nil, err + } + } + dbClients = append(dbClients, dbClient) + return dbClient, nil + } + defer func() { + for _, dbClient := range dbClients { + dbClient.Close() + } + }() + vr := newVReplicator(ct.id, ct.source, vsClient, ct.blpStats, dbClient, dbClientGen, ct.mysqld, ct.vre, ct.WorkflowConfig) + err = vr.Replicate(ctx) ct.lastWorkflowError.Record(err) diff --git a/go/vt/vttablet/tabletmanager/vreplication/engine.go b/go/vt/vttablet/tabletmanager/vreplication/engine.go index fae0ba0796f..f139c70b649 100644 --- a/go/vt/vttablet/tabletmanager/vreplication/engine.go +++ b/go/vt/vttablet/tabletmanager/vreplication/engine.go @@ -711,8 +711,11 @@ func (vre *Engine) transitionJournal(je *journalEvent) { } for _, ks := range participants { id := je.participants[ks] - _, err := dbClient.ExecuteFetch(binlogplayer.DeleteVReplication(id), maxRows) - if err != nil { + if _, err := dbClient.ExecuteFetch(binlogplayer.DeleteVReplication(id), maxRows); err != nil { + log.Errorf("transitionJournal: %v", err) + return + } + if _, err := dbClient.ExecuteFetch(binlogplayer.DeleteVReplicationWorkerPos(id), maxRows); err != nil { log.Errorf("transitionJournal: %v", err) return } diff --git a/go/vt/vttablet/tabletmanager/vreplication/relaylog.go b/go/vt/vttablet/tabletmanager/vreplication/relaylog.go index 058ca29ff78..27297a7e6ad 100644 --- a/go/vt/vttablet/tabletmanager/vreplication/relaylog.go +++ b/go/vt/vttablet/tabletmanager/vreplication/relaylog.go @@ -29,6 +29,10 @@ import ( const relayLogIOStalledMsg = "relay log I/O stalled" +// maxItemsPrealloc is some reasonable initial allocation for the relay log buffer, +// to avoid incrementally re-allocating what would end up fully allocated anyhow. 
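+// The buffer is allocated lazily in Send and its capacity is capped at the relay log's
+// configured maxItems, so smaller relay logs never over-allocate.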
+const maxItemsPrealloc = 5000 + type relayLog struct { ctx context.Context maxItems int @@ -87,6 +91,9 @@ func (rl *relayLog) Send(events []*binlogdatapb.VEvent) error { } } rl.timedout = false + if rl.items == nil { + rl.items = make([][]*binlogdatapb.VEvent, 0, min(maxItemsPrealloc, rl.maxItems)) + } rl.items = append(rl.items, events) rl.curSize += eventsSize(events) rl.hasItems.Broadcast() diff --git a/go/vt/vttablet/tabletmanager/vreplication/vplayer.go b/go/vt/vttablet/tabletmanager/vreplication/vplayer.go index 61512e0e5a6..4c816d30f92 100644 --- a/go/vt/vttablet/tabletmanager/vreplication/vplayer.go +++ b/go/vt/vttablet/tabletmanager/vreplication/vplayer.go @@ -24,6 +24,7 @@ import ( "math" "strconv" "strings" + "sync" "time" "vitess.io/vitess/go/mysql/replication" @@ -59,6 +60,7 @@ type vplayer struct { replicatorPlan *ReplicatorPlan tablePlans map[string]*TablePlan + planMu sync.RWMutex // These are set when creating the VPlayer based on whether the VPlayer // is in batch (stmt and trx) execution mode or not. @@ -66,7 +68,8 @@ type vplayer struct { commit func() error // If the VPlayer is in batch mode, we accumulate each transaction's statements // that are then sent as a single multi-statement protocol request to the database. - batchMode bool + batchMode bool + parallelMode bool pos replication.Position // unsavedEvent is set any time we skip an event without @@ -166,6 +169,7 @@ func newVPlayer(vr *vreplicator, settings binlogplayer.VRSettings, copyState map } vr.dbClient.maxBatchSize = maxAllowedPacket } + parallelMode := len(copyState) == 0 && vr.workflowConfig.ExperimentalFlags&vttablet.VReplicationExperimentalFlagVPlayerParallel != 0 return &vplayer{ vr: vr, @@ -181,6 +185,7 @@ func newVPlayer(vr *vreplicator, settings binlogplayer.VRSettings, copyState map query: queryFunc, commit: commitFunc, batchMode: batchMode, + parallelMode: parallelMode, } } @@ -281,9 +286,65 @@ func (vp *vplayer) fetchAndApply(ctx context.Context) (err error) { }() applyErr := make(chan error, 1) - go func() { - applyErr <- vp.applyEvents(ctx, relay) - }() + + if vp.parallelMode { + defer log.Errorf("========== QQQ DONE fetchAndApply") + log.Errorf("========== QQQ fetchAndApply startPos: %v", vp.startPos) + + producer, err := newParallelProducer(ctx, vp.vr.dbClientGen, vp) + if err != nil { + return err + } + _, combinedPos, err := producer.aggregateWorkersPos(ctx, vp.vr.dbClient, false) + if err != nil { + return err + } + producer.startPos = combinedPos + log.Errorf("========== QQQ fetchAndApply producer.startPos = %v", producer.startPos) + + go func() { + defer log.Errorf("========== QQQ DONE fetchAndApply/parallel goroutine") + // ctx, cancel := context.WithCancel(ctx) + // defer cancel() + + err := func() error { + defer func() { + log.Errorf("========== QQQ DONE fetchAndApply/parallel inner. max_concurrency=%v, num commits=%v", producer.maxConcurrency.Load(), producer.numCommits.Load()) + + _, combinedPos, err := producer.aggregateWorkersPos(ctx, vp.vr.dbClient, false) + if err != nil { + log.Errorf("========== QQQ fetchAndApply producer.aggregateWorkersPos err=%v", err) + } + log.Errorf("========== QQQ fetchAndApply good. 
combinedPos=%v", combinedPos) + }() + log.Errorf("========== QQQ fetchAndApply applyEvents call") + if err := producer.applyEvents(ctx, relay); err != nil && err != io.EOF { + log.Errorf("========== QQQ fetchAndApply applyEvents err=%v", err) + return err + } + // log.Errorf("========== QQQ fetchAndApply producer.commitAll call") + // if err := producer.commitAll(ctx, nil); err != nil && err != io.EOF { + // log.Errorf("========== QQQ producer.commitAll err=%v", err) + // return err + // } + // TODO(shlomi): DeleteVReplicationWorkerPos + if producer.posReached.Load() { + log.Infof("Stopped at position: %v", vp.stopPos) + if vp.saveStop { + if err := vp.vr.setState(binlogdatapb.VReplicationWorkflowState_Stopped, fmt.Sprintf("Stopped at position %v", vp.stopPos)); err != nil { + return err + } + } + } + return nil + }() + applyErr <- err + }() + } else { + go func() { + applyErr <- vp.applyEvents(ctx, relay) + }() + } select { case err := <-applyErr: @@ -347,15 +408,7 @@ func (vp *vplayer) applyRowEvent(ctx context.Context, rowEvent *binlogdatapb.Row return fmt.Errorf("unexpected event on table %s", rowEvent.TableName) } applyFunc := func(sql string) (*sqltypes.Result, error) { - start := time.Now() - qr, err := vp.query(ctx, sql) - vp.vr.stats.QueryCount.Add(vp.phase, 1) - vp.vr.stats.QueryTimings.Record(vp.phase, start) - if vp.vr.workflowConfig.EnableHttpLog { - stats := NewVrLogStats("ROWCHANGE", start) - stats.Send(sql) - } - return qr, err + return vp.query(ctx, sql) } if vp.batchMode && len(rowEvent.RowChanges) > 1 { @@ -644,10 +697,10 @@ func getNextPosition(items [][]*binlogdatapb.VEvent, i, j int) string { } func (vp *vplayer) applyEvent(ctx context.Context, event *binlogdatapb.VEvent, mustSave bool) error { - var stats *VrLogStats - if vp.vr.workflowConfig.EnableHttpLog { - stats = NewVrLogStats(event.Type.String(), time.Now()) - } + // var stats *VrLogStats + // if vp.vr.workflowConfig.EnableHttpLog { + // stats = NewVrLogStats(event.Type.String(), time.Now()) + // } switch event.Type { case binlogdatapb.VEventType_GTID: pos, err := binlogplayer.DecodePosition(event.Gtid) @@ -693,9 +746,9 @@ func (vp *vplayer) applyEvent(ctx context.Context, event *binlogdatapb.VEvent, m return err } vp.tablePlans[event.FieldEvent.TableName] = tplan - if stats != nil { - stats.Send(fmt.Sprintf("%v", event.FieldEvent)) - } + // if stats != nil { + // stats.Send(fmt.Sprintf("%v", event.FieldEvent)) + // } case binlogdatapb.VEventType_INSERT, binlogdatapb.VEventType_DELETE, binlogdatapb.VEventType_UPDATE, binlogdatapb.VEventType_REPLACE, binlogdatapb.VEventType_SAVEPOINT: @@ -713,9 +766,9 @@ func (vp *vplayer) applyEvent(ctx context.Context, event *binlogdatapb.VEvent, m if err := vp.applyStmtEvent(ctx, event); err != nil { return err } - if stats != nil { - stats.Send(sql) - } + // if stats != nil { + // stats.Send(sql) + // } } case binlogdatapb.VEventType_ROW: // This player is configured for row based replication @@ -728,9 +781,9 @@ func (vp *vplayer) applyEvent(ctx context.Context, event *binlogdatapb.VEvent, m } // Row event is logged AFTER RowChanges are applied so as to calculate the total elapsed // time for the Row event. 
- if stats != nil { - stats.Send(fmt.Sprintf("%v", event.RowEvent)) - } + // if stats != nil { + // stats.Send(fmt.Sprintf("%v", event.RowEvent)) + // } case binlogdatapb.VEventType_OTHER: if vp.vr.dbClient.InTransaction { // Unreachable @@ -784,9 +837,9 @@ func (vp *vplayer) applyEvent(ctx context.Context, event *binlogdatapb.VEvent, m if _, err := vp.query(ctx, event.Statement); err != nil { return err } - if stats != nil { - stats.Send(event.Statement) - } + // if stats != nil { + // stats.Send(fmt.Sprintf("%v", event.Statement)) + // } posReached, err := vp.updatePos(ctx, event.Timestamp) if err != nil { return err @@ -798,9 +851,9 @@ func (vp *vplayer) applyEvent(ctx context.Context, event *binlogdatapb.VEvent, m if _, err := vp.query(ctx, event.Statement); err != nil { log.Infof("Ignoring error: %v for DDL: %s", err, event.Statement) } - if stats != nil { - stats.Send(event.Statement) - } + // if stats != nil { + // stats.Send(fmt.Sprintf("%v", event.Statement)) + // } posReached, err := vp.updatePos(ctx, event.Timestamp) if err != nil { return err @@ -854,9 +907,9 @@ func (vp *vplayer) applyEvent(ctx context.Context, event *binlogdatapb.VEvent, m } return io.EOF } - if stats != nil { - stats.Send(fmt.Sprintf("%v", event.Journal)) - } + // if stats != nil { + // stats.Send(fmt.Sprintf("%v", event.Journal)) + // } return io.EOF case binlogdatapb.VEventType_HEARTBEAT: if event.Throttled { diff --git a/go/vt/vttablet/tabletmanager/vreplication/vplayer_parallel_events.go b/go/vt/vttablet/tabletmanager/vreplication/vplayer_parallel_events.go new file mode 100644 index 00000000000..4e64fb026cf --- /dev/null +++ b/go/vt/vttablet/tabletmanager/vreplication/vplayer_parallel_events.go @@ -0,0 +1,46 @@ +/* +Copyright 2025 The Vitess Authors. +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + http://www.apache.org/licenses/LICENSE-2.0 +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. 
+*/ + +package vreplication + +import ( + "math" + + binlogdatapb "vitess.io/vitess/go/vt/proto/binlogdata" +) + +var ( + considerCommitWorkerEvent = &binlogdatapb.VEvent{ + Type: binlogdatapb.VEventType_UNKNOWN, + SequenceNumber: math.MinInt64, + } +) + +func (p *parallelProducer) generateConsiderCommitWorkerEvent() *binlogdatapb.VEvent { + return considerCommitWorkerEvent +} + +func (p *parallelProducer) generateCommitWorkerEvent() *binlogdatapb.VEvent { + return &binlogdatapb.VEvent{ + Type: binlogdatapb.VEventType_UNKNOWN, + SequenceNumber: p.commitWorkerEventSequence.Add(-1), + } +} + +func isConsiderCommitWorkerEvent(event *binlogdatapb.VEvent) bool { + return event.Type == binlogdatapb.VEventType_UNKNOWN && event.SequenceNumber == math.MinInt64 +} + +func isCommitWorkerEvent(event *binlogdatapb.VEvent) bool { + return event.Type == binlogdatapb.VEventType_UNKNOWN && event.SequenceNumber < 0 +} diff --git a/go/vt/vttablet/tabletmanager/vreplication/vplayer_parallel_producer.go b/go/vt/vttablet/tabletmanager/vreplication/vplayer_parallel_producer.go new file mode 100644 index 00000000000..6b779ac7fea --- /dev/null +++ b/go/vt/vttablet/tabletmanager/vreplication/vplayer_parallel_producer.go @@ -0,0 +1,699 @@ +/* +Copyright 2025 The Vitess Authors. +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + http://www.apache.org/licenses/LICENSE-2.0 +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package vreplication + +import ( + "context" + "fmt" + "io" + "math" + "sync" + "sync/atomic" + "time" + + "golang.org/x/sync/errgroup" + + "vitess.io/vitess/go/mysql/replication" + "vitess.io/vitess/go/sqltypes" + "vitess.io/vitess/go/vt/binlog/binlogplayer" + "vitess.io/vitess/go/vt/log" + "vitess.io/vitess/go/vt/vterrors" + vttablet "vitess.io/vitess/go/vt/vttablet/common" + "vitess.io/vitess/go/vt/vttablet/tabletserver/throttle/throttlerapp" + + binlogdatapb "vitess.io/vitess/go/vt/proto/binlogdata" +) + +const ( + countWorkers = 6 + maxWorkerEvents = 5000 + maxCountWorkersEvents = countWorkers * maxWorkerEvents + maxIdleWorkerDuration = 250 * time.Millisecond + + aggregatePosInterval = 250 * time.Millisecond + + noUncommittedSequence = math.MaxInt64 +) + +type parallelProducer struct { + vp *vplayer + + workers []*parallelWorker + startPos replication.Position + + posReached atomic.Bool + workerErrors chan error + + sequenceToWorkersMap map[int64]int // sequence number => worker index + lowestUncommittedSequence atomic.Int64 + lowestSequenceListeners map[int64]chan int64 + sequenceToWorkersMapMu sync.RWMutex + + commitWorkerEventSequence atomic.Int64 + assignSequence int64 + + countAssignedToCurrentWorker int64 + + newDBClient func() (*vdbClient, error) + aggregateWorkersPosQuery string + lastAggregatedWorkersPosStr string + + numCommits atomic.Int64 // temporary. TODO: remove + currentConcurrency atomic.Int64 // temporary. TODO: remove + maxConcurrency atomic.Int64 // temporary. 
TODO: remove
+}
+
+func newParallelProducer(ctx context.Context, dbClientGen dbClientGenerator, vp *vplayer) (*parallelProducer, error) {
+ p := &parallelProducer{
+ vp: vp,
+ workers: make([]*parallelWorker, countWorkers),
+ workerErrors: make(chan error, countWorkers+1),
+ sequenceToWorkersMap: make(map[int64]int),
+ lowestSequenceListeners: make(map[int64]chan int64),
+ aggregateWorkersPosQuery: binlogplayer.ReadVReplicationCombinedWorkersGTIDs(vp.vr.id),
+ }
+ p.lowestUncommittedSequence.Store(noUncommittedSequence)
+
+ useBatchCommits := (vp.vr.workflowConfig.ExperimentalFlags&vttablet.VReplicationExperimentalFlagVPlayerBatching != 0)
+
+ p.newDBClient = func() (*vdbClient, error) {
+ dbClient, err := dbClientGen()
+ if err != nil {
+ return nil, err
+ }
+ vdbClient := newVDBClient(dbClient, vp.vr.stats, 0)
+ _, err = vp.vr.setSQLMode(ctx, vdbClient)
+ if err != nil {
+ return nil, err
+ }
+ if useBatchCommits {
+ vdbClient.maxBatchSize = vp.vr.dbClient.maxBatchSize
+ } else {
+ vdbClient.maxBatchSize = 0
+ }
+
+ return vdbClient, nil
+ }
+ for i := range p.workers {
+ w := newParallelWorker(i, p, maxWorkerEvents)
+ var err error
+ if w.dbClient, err = p.newDBClient(); err != nil {
+ return nil, err
+ }
+ w.queryFunc = func(ctx context.Context, sql string) (*sqltypes.Result, error) {
+ if useBatchCommits {
+ if !w.dbClient.InTransaction { // Should be sent down the wire immediately
+ return w.dbClient.Execute(sql)
+ }
+ return nil, w.dbClient.AddQueryToTrxBatch(sql) // Should become part of the trx batch
+ } else {
+ return w.dbClient.Execute(sql)
+ }
+ }
+ w.commitFunc = func() error {
+ // REMOVE for batched commit
+ // return w.dbClient.CommitTrxQueryBatch() // Commit the current trx batch
+ return w.dbClient.Commit()
+ }
+ // INSERT a row into _vt.vreplication_worker_pos with an empty position
+ query := binlogplayer.GenerateInitWorkerPos(vp.vr.id, w.index)
+ log.Errorf("========== QQQ initWorkersPos query: %v", query)
+ if _, err := w.dbClient.ExecuteFetch(query, -1); err != nil {
+ return nil, err
+ }
+ p.workers[i] = w
+ }
+
+ return p, nil
+}
+
+// commitAll commits all workers and waits for them to complete.
+func (p *parallelProducer) commitAll(ctx context.Context, except *parallelWorker) error {
+ var eg errgroup.Group
+ for _, w := range p.workers {
+ if except != nil && w.index == except.index {
+ continue
+ }
+ eg.Go(func() error {
+ return <-w.commitEvents(ctx)
+ })
+ }
+ return eg.Wait()
+}
+
+// updatePos updates _vt.vreplication with the given position and timestamp.
+// This producer updates said position based on the aggregation of all committed workers positions.
+func (p *parallelProducer) updatePos(ctx context.Context, pos replication.Position, ts int64, dbClient *vdbClient) (posReached bool, err error) {
+ update := binlogplayer.GenerateUpdatePos(p.vp.vr.id, pos, time.Now().Unix(), ts, p.vp.vr.stats.CopyRowCount.Get(), p.vp.vr.workflowConfig.StoreCompressedGTID)
+ if _, err := dbClient.ExecuteWithRetry(ctx, update); err != nil {
+ return false, fmt.Errorf("error %v updating position", err)
+ }
+ // p.vp.numAccumulatedHeartbeats = 0
+ // p.vp.unsavedEvent = nil
+ // p.vp.timeLastSaved = time.Now()
+ // p.vp.vr.stats.SetLastPosition(p.vp.pos)
+ return posReached, nil
+}
+
+// updateTimeThrottled updates the time_throttled field in the _vt.vreplication record
+// with a rate limit so that it's only saved in the database at most once per
+// throttleUpdatesRateLimiter.tickerTime.
+// It also increments the throttled count in the stats to keep track of how many +// times a VReplication workflow, and the specific sub-component, is throttled by the +// tablet throttler over time. It also increments the global throttled count to keep +// track of how many times in total vreplication has been throttled across all workflows +// (both ones that currently exist and ones that no longer do). +func (p *parallelProducer) updateTimeThrottled(appThrottled throttlerapp.Name, reasonThrottled string, dbClient *vdbClient) error { + appName := appThrottled.String() + p.vp.vr.stats.ThrottledCounts.Add([]string{"tablet", appName}, 1) + globalStats.ThrottledCount.Add(1) + err := p.vp.vr.throttleUpdatesRateLimiter.Do(func() error { + tm := time.Now().Unix() + update, err := binlogplayer.GenerateUpdateTimeThrottled(p.vp.vr.id, tm, appName, reasonThrottled) + if err != nil { + return err + } + if _, err := dbClient.ExecuteFetch(update, maxRows); err != nil { + return fmt.Errorf("error %v updating time throttled", err) + } + return nil + }) + return err +} + +// aggregateWorkersPos aggregates the committed GTID positions of all workers, along with their transaction timestamps. +// If any change since last call is detected, the combined position is written to _vt.vreplication. +func (p *parallelProducer) aggregateWorkersPos(ctx context.Context, dbClient *vdbClient, onlyFirstContiguous bool) (aggregatedWorkersPos replication.Position, combinedPos replication.Position, err error) { + qr, err := dbClient.ExecuteFetch(p.aggregateWorkersPosQuery, -1) + if err != nil { + log.Errorf("Error fetching vreplication worker positions: %v. isclosed? %v", err, dbClient.IsClosed()) + return aggregatedWorkersPos, combinedPos, err + } + var aggregatedWorkersPosStr string + var lastEventTimestamp int64 + for _, row := range qr.Rows { // there's actually exactly one row in this result set + aggregatedWorkersPosStr = row[0].ToString() + lastEventTimestamp, err = row[1].ToInt64() + if err != nil { + return aggregatedWorkersPos, combinedPos, err + } + } + if aggregatedWorkersPosStr == p.lastAggregatedWorkersPosStr { + // Nothing changed since last visit. Skip all the parsing and updates. + return aggregatedWorkersPos, combinedPos, nil + } + aggregatedWorkersPos, err = binlogplayer.DecodeMySQL56Position(aggregatedWorkersPosStr) + if err != nil { + return aggregatedWorkersPos, combinedPos, err + } + + if onlyFirstContiguous && aggregatedWorkersPos.GTIDSet != nil { + // This is a performance optimization which, for the running duration, only + // considers the first contiguous part of the aggregated GTID set. + // The idea is that with out-of-order workers, a worker's GTID set + // is punctured, and that means its representation is very long. + // Consider something like: + // 9ee2b9ca-e848-11ef-b80c-091a65af3e28:4697:4699:4702:4704:4706:4708:4710:4713:4717:4719:4723:4726:4728:4730:4732:4734:4737:4739:4742:4744:4746:4748:4751:4753:4756:4758:4760:4762:4765:4768-4769:4772-4774:... + // Even when combined with multiple workers, it's enough that one worker is behind, + // that the same amount of punctures exists in the aggregated GTID set. + // What this leads to is: + // - Longer and more complex parsing of GTID sets. + // - More data to be sent over the wire. + // - More data to be stored in the database. Easily surpassing VARCHAR(10000) limitation. + // And the observation is that we don't really need all of this data _at this time_. + // Consider: when do we stop the workflow? 
+ // - Either startPos is defined (and in all likelihood it is contiguous, as in 9ee2b9ca-e848-11ef-b80c-091a65af3e28:1-100000)
+ // - Or something is running VReplicationWaitForPos (e.g. Online DDL), and again, it's contiguous.
+ // So the conditions for which we wait don't care about the punctures. These are as good as "not applied"
+ // when it comes to comparing to the expected terminal position.
+ // However, when the workflow is stopped, we do need to know the full GTID set, so that we
+ // have accurate information about what was applied and what wasn't (consider Online DDL reverts
+ // that need to start at that precise position). And so when the workflow is stopped, we will
+ // have onlyFirstContiguous==false.
+ mysql56gtid := aggregatedWorkersPos.GTIDSet.(replication.Mysql56GTIDSet)
+ for sid := range mysql56gtid {
+ mysql56gtid[sid] = mysql56gtid[sid][:1]
+ }
+ }
+ // The aggregatedWorkersPos only looks at the GTID entries actually processed in the binary logs.
+ // The combinedPos also takes into account the startPos. combinedPos is what we end up storing
+ // in _vt.vreplication, and it's what will be compared to stopPosition or used by VReplicationWaitForPos.
+ combinedPos = replication.AppendGTIDSet(aggregatedWorkersPos, p.vp.startPos.GTIDSet)
+ p.vp.pos = combinedPos // TODO(shlomi) potential for race condition
+
+ if !onlyFirstContiguous {
+ log.Errorf("========== QQQ aggregateWorkersPos aggregatedWorkersPos: %v", aggregatedWorkersPos)
+ log.Errorf("========== QQQ aggregateWorkersPos combinedPos: %v", combinedPos)
+ }
+ // Update _vt.vreplication. This write reflects everything we could read from the workers table,
+ // which means that data was committed by the workers, which means this is a de-facto "what's been
+ // actually applied".
+ if _, err := p.updatePos(ctx, combinedPos, lastEventTimestamp, dbClient); err != nil {
+ log.Errorf("========== QQQ aggregateWorkersPos error: %v", err)
+ return aggregatedWorkersPos, combinedPos, err
+ }
+ p.lastAggregatedWorkersPosStr = aggregatedWorkersPosStr
+ return aggregatedWorkersPos, combinedPos, nil
+}
+
+// watchPos runs in a goroutine and is in charge of periodically aggregating the workers'
+// positions and writing the aggregated value to _vt.vreplication, as well as
+// sending the aggregated value to the workers themselves.
+func (p *parallelProducer) watchPos(ctx context.Context) error {
+ dbClient, err := p.newDBClient()
+ if err != nil {
+ return err
+ }
+ defer dbClient.Close()
+
+ aggregatePosTicker := time.NewTicker(aggregatePosInterval)
+ defer aggregatePosTicker.Stop()
+ // noProgressRateLimiter := timer.NewRateLimiter(time.Second)
+ // defer noProgressRateLimiter.Stop()
+ // var lastCombinedPos replication.Position
+ for {
+ select {
+ case <-ctx.Done():
+ return ctx.Err()
+ case <-aggregatePosTicker.C:
+ aggregatedWorkersPos, combinedPos, err := p.aggregateWorkersPos(ctx, dbClient, true)
+ if err != nil {
+ log.Errorf("Error aggregating vreplication worker positions: %v. isclosed? %v", err, dbClient.IsClosed())
+ continue
+ }
+ if aggregatedWorkersPos.IsZero() {
+ // This happens when there's been no change since last polling. It's a performance
+ // optimization to save the cost of updating the position.
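+ // A zero-valued position here simply means aggregateWorkersPos() returned early because the
+ // workers' combined GTID string has not changed since the previous poll.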
+ log.Errorf("========== QQQ watchPos aggregatedWorkersPos is IDLE: %v", p.lastAggregatedWorkersPosStr) + continue + } + // log.Errorf("========== QQQ watchPos aggregatedWorkersPos: %v, combinedPos: %v, stop: %v", aggregatedWorkersPos, combinedPos, p.vp.stopPos) + + // Write back this combined pos to all workers, so that we condense their otherwise sparse GTID sets. + for _, w := range p.workers { + go func() { w.aggregatedPosChan <- aggregatedWorkersPos.String() }() + } + // log.Errorf("========== QQQ watchPos pushed combined pos") + // if combinedPos.GTIDSet.Equal(lastCombinedPos.GTIDSet) { + // // no progress has been made + // err := noProgressRateLimiter.Do(func() error { + // log.Errorf("========== QQQ watchPos no progress!! committing all") + // return p.commitAll(ctx, nil) + // }) + // if err != nil { + // return err + // } + // } else { + // // progress has been made + // lastCombinedPos.GTIDSet = combinedPos.GTIDSet + // } + if !p.vp.stopPos.IsZero() && combinedPos.AtLeast(p.vp.stopPos) { + if err := p.commitAll(ctx, nil); err != nil { + return err + } + p.posReached.Store(true) + return io.EOF + } + } + } +} + +func (p *parallelProducer) assignTransactionToWorker(sequenceNumber int64, lastCommitted int64, currentWorkerIndex int, preferCurrentWorker bool) (workerIndex int) { + p.sequenceToWorkersMapMu.RLock() + defer p.sequenceToWorkersMapMu.RUnlock() + + if sequenceNumber != 0 { + if lowest := p.lowestUncommittedSequence.Load(); sequenceNumber < lowest { + // log.Errorf("========== QQQ process lowest has CHANGED DOWN to %v", sequenceNumber) + p.lowestUncommittedSequence.Store(sequenceNumber) + } + } + + p.assignSequence++ + if workerIndex, ok := p.sequenceToWorkersMap[sequenceNumber]; ok { + // All events of same sequence should be executed by same worker + return workerIndex + } + if workerIndex, ok := p.sequenceToWorkersMap[lastCommitted]; ok { + // Transaction depends on another transaction, that is still being owned by some worker. + // Use that same worker, so that this transaction is non-blocking. + p.sequenceToWorkersMap[sequenceNumber] = workerIndex + return workerIndex + } + for i := p.lowestUncommittedSequence.Load(); i < lastCommitted; i++ { + if workerIndex, ok := p.sequenceToWorkersMap[i]; ok { + // Transaction depends on another transaction, that is still being owned by some worker. + // Use that same worker, so that this transaction is non-blocking. + p.sequenceToWorkersMap[sequenceNumber] = workerIndex + return workerIndex + } + } + // No specific transaction dependency constraints (any parent transactions were long since + // committed, and no worker longer indicates it owns any such transactions) + if preferCurrentWorker && p.countAssignedToCurrentWorker < maxWorkerEvents { + // Prefer the current worker, if it has capacity. On one hand, we want + // to batch queries as possible. On the other hand, we want to spread the + // the load across workers. + workerIndex = currentWorkerIndex + } else { + // Even if not specifically requested to assign current worker, we still + // want to do some batching. We batch `maxWorkerEvents` events per worker. 
+ workerIndex = int(p.assignSequence/maxWorkerEvents) % len(p.workers)
+ }
+ p.sequenceToWorkersMap[sequenceNumber] = workerIndex
+ return workerIndex
+}
+
+func (p *parallelProducer) registerLowestSequenceListener(notifyOnLowestAbove int64) (lowest int64, ch chan int64) {
+ p.sequenceToWorkersMapMu.Lock()
+ defer p.sequenceToWorkersMapMu.Unlock()
+
+ lowest = p.lowestUncommittedSequence.Load()
+ if lowest > notifyOnLowestAbove {
+ // The lowest uncommitted sequence is already above the requested value, so there is no need to
+ // register a listener. We return a nil channel as an indication.
+ // log.Errorf("========== QQQ registerLowestSequenceListener free pass for notifyOnLowestAbove=%v because lowest=%v", notifyOnLowestAbove, lowest)
+ return lowest, nil
+ }
+ // log.Errorf("========== QQQ registerLowestSequenceListener worker %v registered for notifyOnLowestAbove=%v assigned to worker %v", worker, notifyOnLowestAbove, p.sequenceToWorkersMap[notifyOnLowestAbove])
+ if ch, ok := p.lowestSequenceListeners[notifyOnLowestAbove]; ok {
+ // listener already exists
+ return lowest, ch
+ }
+ ch = make(chan int64, countWorkers)
+ p.lowestSequenceListeners[notifyOnLowestAbove] = ch
+ return lowest, ch
+}
+
+func (p *parallelProducer) evaluateLowestUncommittedSequence(completedSequence int64) (lowest int64, changed bool) {
+ // assumed to be protected by sequenceToWorkersMapMu lock
+ lowest = p.lowestUncommittedSequence.Load()
+ if completedSequence != lowest {
+ // means the entry for p.lowestUncommittedSequence is still in the map, i.e. not yet deleted
+ return lowest, false
+ }
+ if len(p.sequenceToWorkersMap) == 0 {
+ return noUncommittedSequence, true
+ }
+ // Find the lowest sequence number that is not yet committed.
+ for {
+ lowest++
+ if _, ok := p.sequenceToWorkersMap[lowest]; ok {
+ return lowest, true
+ }
+ }
+}
+
+// completeSequenceNumbers is called by a worker once it has committed or applied transactions.
+// The function removes the sequence numbers from the map and updates the lowest uncommitted sequence number.
+// It also notifies any listeners that may have been waiting for the lowest uncommitted sequence number to
+// cross a certain value.
+func (p *parallelProducer) completeSequenceNumbers(sequenceNumbers []int64) {
+ p.sequenceToWorkersMapMu.Lock()
+ defer p.sequenceToWorkersMapMu.Unlock()
+
+ for _, sequenceNumber := range sequenceNumbers {
+ delete(p.sequenceToWorkersMap, sequenceNumber)
+ if lowest, changed := p.evaluateLowestUncommittedSequence(sequenceNumber); changed {
+ // log.Errorf("========== QQQ process lowest has CHANGED to %v", lowest)
+ p.lowestUncommittedSequence.Store(lowest)
+ for notifyOnLowestAbove, ch := range p.lowestSequenceListeners {
+ if lowest > notifyOnLowestAbove {
+ // log.Errorf("========== QQQ process notifying listener for %v on new lowest %v", notifyOnLowestAbove, lowest)
+ for range countWorkers {
+ ch <- lowest // nonblocking. The channel has enough capacity.
+ }
+ delete(p.lowestSequenceListeners, notifyOnLowestAbove)
+ }
+ }
+ }
+ }
+}
+
+// process is a goroutine that reads events from the input channel and assigns them to workers.
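+// Assignment follows the transaction dependency metadata emitted by the vstreamer: events that
+// share a SequenceNumber, or whose CommitParent is still owned by some worker, are routed to that
+// same worker so they never block one another, while events that cannot be parallelized (such as a
+// DDL) first force a commitAll(). As a purely hypothetical illustration (the numbers below are not
+// taken from a real stream):
+//
+//	seq=17 commitParent=15 -> routed to the worker that still owns transaction 15
+//	seq=18 commitParent=12 -> 12 is already committed, so any worker may be chosen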
+func (p *parallelProducer) process(ctx context.Context, events chan *binlogdatapb.VEvent) error {
+ currentWorker := p.workers[0]
+ hasFieldEvent := false
+ workerWithFieldEvent := -1
+ for {
+ if p.posReached.Load() {
+ return io.EOF
+ }
+ select {
+ case <-ctx.Done():
+ return ctx.Err()
+ case event := <-events:
+ canApplyInParallel := false
+ switch event.Type {
+ case binlogdatapb.VEventType_BEGIN,
+ binlogdatapb.VEventType_ROW,
+ binlogdatapb.VEventType_COMMIT,
+ binlogdatapb.VEventType_GTID:
+ // We can parallelize these events.
+ canApplyInParallel = true
+ case binlogdatapb.VEventType_FIELD:
+ canApplyInParallel = true
+ hasFieldEvent = true
+ workerWithFieldEvent = currentWorker.index
+ case binlogdatapb.VEventType_PREVIOUS_GTIDS:
+ // This `case` is not required, but let's make this very explicit:
+ // The transaction dependency graph is scoped to a single binary log.
+ // When rotating into a new binary log, we must wait until all
+ // existing workers have completed, as there is no information
+ // about dependencies across binlogs.
+ canApplyInParallel = false
+ }
+ if !canApplyInParallel {
+ // As an example, this could be a DDL.
+ // Wait for all existing workers to complete, including the one we are about to assign to.
+ if err := p.commitAll(ctx, nil); err != nil {
+ return err
+ }
+ }
+ workerIndex := p.assignTransactionToWorker(event.SequenceNumber, event.CommitParent, currentWorker.index, event.PinWorker)
+ if workerIndex == currentWorker.index {
+ // Measure how many events we have assigned to the current worker. We will
+ // cap at some value so as to distribute events to other workers and to avoid
+ // one worker taking all the load.
+ // See logic in assignTransactionToWorker()
+ p.countAssignedToCurrentWorker++
+ } else {
+ currentWorker.events <- p.generateConsiderCommitWorkerEvent()
+ p.countAssignedToCurrentWorker = 1
+ if hasFieldEvent {
+ log.Errorf("========== QQQ process FIELD issue: %v event seq=%v assigned to worker %v even though there was field event in worker %v",
+ event.Type, event.SequenceNumber, workerIndex, workerWithFieldEvent)
+ }
+ }
+
+ if hasFieldEvent && event.Type == binlogdatapb.VEventType_COMMIT {
+ event.Skippable = false
+ }
+ currentWorker = p.workers[workerIndex]
+ select {
+ case <-ctx.Done():
+ return ctx.Err()
+ case currentWorker.events <- event:
+ }
+ if hasFieldEvent && event.Type == binlogdatapb.VEventType_COMMIT {
+ // We wait until the field event is applied.
+ if err := <-currentWorker.commitEvents(ctx); err != nil {
+ return err
+ }
+ hasFieldEvent = false
+ }
+ if !canApplyInParallel {
+ // Say this was a DDL. Then we need to wait until it is absolutely complete, before we allow the next event to be processed.
+ if err := <-currentWorker.commitEvents(ctx); err != nil {
+ return err
+ }
+ }
+ }
+ }
+}
+
+// applyEvents is a parallel variation on VPlayer's applyEvents() function. It spawns the necessary
+// goroutines, starts the workers, processes the events, and looks for errors.
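+// The flow, as implemented below: batches are fetched from the relay log, events already covered by
+// startPos or beyond stopPos are skipped, COMMIT events are marked Skippable when another commit
+// follows in the same batch (so transactions can be grouped), and the remaining events are pushed
+// onto eventQueue, which is drained by process(). Worker errors surface through the workerErrors
+// channel.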
+func (p *parallelProducer) applyEvents(ctx context.Context, relay *relayLog) error { + defer log.Errorf("========== QQQ applyEvents defer") + + dbClient, err := p.newDBClient() + if err != nil { + return err + } + defer dbClient.Close() + + go func() { + if err := p.watchPos(ctx); err != nil { + p.workerErrors <- err + } + }() + + var wg sync.WaitGroup + defer wg.Wait() + workersCtx, workersCancel := context.WithCancel(ctx) + defer workersCancel() + + for _, w := range p.workers { + wg.Add(1) + go func() { + defer wg.Done() + p.workerErrors <- w.applyQueuedEvents(workersCtx) + }() + } + + estimateLag := func() { + behind := time.Now().UnixNano() - p.vp.lastTimestampNs - p.vp.timeOffsetNs + p.vp.vr.stats.ReplicationLagSeconds.Store(behind / 1e9) + //p.vp.vr.stats.VReplicationLags.Add(strconv.Itoa(int(p.vp.vr.id)), time.Duration(behind/1e9)*time.Second) + } + + eventQueue := make(chan *binlogdatapb.VEvent, 2*maxCountWorkersEvents) + go p.process(ctx, eventQueue) + + // If we're not running, set ReplicationLagSeconds to be very high. + // TODO(sougou): if we also stored the time of the last event, we + // can estimate this value more accurately. + defer p.vp.vr.stats.ReplicationLagSeconds.Store(math.MaxInt64) + //defer p.vp.vr.stats.VReplicationLags.Add(strconv.Itoa(int(p.vp.vr.id)), math.MaxInt64) + var lagSecs int64 + for { + if p.posReached.Load() { + return io.EOF + } + if ctx.Err() != nil { + return ctx.Err() + } + // Check throttler. + if checkResult, ok := p.vp.vr.vre.throttlerClient.ThrottleCheckOKOrWaitAppName(ctx, throttlerapp.Name(p.vp.throttlerAppName)); !ok { + go func() { + _ = p.updateTimeThrottled(throttlerapp.VPlayerName, checkResult.Summary(), dbClient) + estimateLag() + }() + continue + } + + items, err := relay.Fetch() + if err != nil { + return err + } + var pinWorker bool + countPinnedWorkerEvents := 0 + lagSecs = -1 + for i, events := range items { + for j, event := range events { + if !p.vp.stopPos.IsZero() { + // event.GTID is a GTIDSet of combined parsed events + _, streamedGTID, err := replication.DecodePositionMySQL56(event.Gtid) + if err != nil { + return err + } + if !p.vp.stopPos.GTIDSet.Contains(streamedGTID) { + // This event goes beyond the stop position. We skip it. + continue + } + } + if event.EventGtid != "" && !p.startPos.IsZero() { + // eventGTID is a singular GTID entry + eventGTID, err := replication.ParseMysql56GTID(event.EventGtid) + if err != nil { + return err + } + if p.startPos.GTIDSet.ContainsGTID(eventGTID) { + // This event was already processed. + log.Errorf("========== QQQ applyEvents skipping GTID entry %v", eventGTID) + continue + } + } + + if event.Timestamp != 0 { + // If the event is a heartbeat sent while throttled then do not update + // the lag based on it. + // If the batch consists only of throttled heartbeat events then we cannot + // determine the actual lag, as the vstreamer is fully throttled, and we + // will estimate it after processing the batch. + if !(event.Type == binlogdatapb.VEventType_HEARTBEAT && event.Throttled) { + p.vp.lastTimestampNs = event.Timestamp * 1e9 + p.vp.timeOffsetNs = time.Now().UnixNano() - event.CurrentTime + lagSecs = event.CurrentTime/1e9 - event.Timestamp + } + } + switch event.Type { + case binlogdatapb.VEventType_COMMIT: + // If we've reached the stop position, we must save the current commit + // even if it's empty. So, the next applyEvent is invoked with the + // mustSave flag. 
+ if !p.vp.stopPos.IsZero() && p.vp.pos.AtLeast(p.vp.stopPos) { + // We break early, so we never set `event.Skippable = true`. + // This is the equivalent of `mustSave` in sequential VPlayer + break + } + // In order to group multiple commits into a single one, we look ahead for + // the next commit. If there is one, we skip the current commit, which ends up + // applying the next set of events as part of the current transaction. This approach + // also handles the case where the last transaction is partial. In that case, + // we only group the transactions with commits we've seen so far. + // if countPinnedWorkerEvents < 2*maxCountWorkersEvents && false { + if countPinnedWorkerEvents < maxCountWorkersEvents && hasAnotherCommit(items, i, j+1) { + pinWorker = true + event.Skippable = true + } else { + pinWorker = false + } + case binlogdatapb.VEventType_DDL, binlogdatapb.VEventType_OTHER, binlogdatapb.VEventType_JOURNAL: + pinWorker = false + } + if pinWorker { + countPinnedWorkerEvents++ + } else { + countPinnedWorkerEvents = 0 + } + event.PinWorker = pinWorker + select { + case eventQueue <- event: // to be consumed by p.Process() + case <-ctx.Done(): + return ctx.Err() + case err := <-p.workerErrors: + if err != io.EOF { + p.vp.vr.stats.ErrorCounts.Add([]string{"Apply"}, 1) + var table, tableLogMsg, gtidLogMsg string + switch { + case event.GetFieldEvent() != nil: + table = event.GetFieldEvent().TableName + case event.GetRowEvent() != nil: + table = event.GetRowEvent().TableName + } + if table != "" { + tableLogMsg = " for table " + table + } + pos := getNextPosition(items, i, j+1) + if pos != "" { + gtidLogMsg = " while processing position " + pos + } + log.Errorf("Error applying event%s%s: %s", tableLogMsg, gtidLogMsg, err.Error()) + err = vterrors.Wrapf(err, "error applying event%s%s", tableLogMsg, gtidLogMsg) + } + return err + } + } + } + + if lagSecs >= 0 { + p.vp.vr.stats.ReplicationLagSeconds.Store(lagSecs) + //p.vp.vr.stats.VReplicationLags.Add(strconv.Itoa(int(p.vp.vr.id)), time.Duration(lagSecs)*time.Second) + } else { // We couldn't determine the lag, so we need to estimate it + estimateLag() + } + } +} diff --git a/go/vt/vttablet/tabletmanager/vreplication/vplayer_parallel_worker.go b/go/vt/vttablet/tabletmanager/vreplication/vplayer_parallel_worker.go new file mode 100644 index 00000000000..c893ef6da7a --- /dev/null +++ b/go/vt/vttablet/tabletmanager/vreplication/vplayer_parallel_worker.go @@ -0,0 +1,659 @@ +/* +Copyright 2025 The Vitess Authors. +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + http://www.apache.org/licenses/LICENSE-2.0 +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. 
+*/
+
+package vreplication
+
+import (
+ "context"
+ "fmt"
+ "io"
+ "strconv"
+ "strings"
+ "sync"
+ "time"
+
+ "vitess.io/vitess/go/mysql/replication"
+ "vitess.io/vitess/go/sqltypes"
+ "vitess.io/vitess/go/vt/binlog/binlogplayer"
+ "vitess.io/vitess/go/vt/log"
+ binlogdatapb "vitess.io/vitess/go/vt/proto/binlogdata"
+ "vitess.io/vitess/go/vt/proto/vtrpc"
+ "vitess.io/vitess/go/vt/vterrors"
+ "vitess.io/vitess/go/vt/vttablet/tabletserver/throttle/throttlerapp"
+)
+
+type parallelWorker struct {
+ index int
+ dbClient *vdbClient
+ queryFunc func(ctx context.Context, sql string) (*sqltypes.Result, error)
+ commitFunc func() error
+ vp *vplayer
+ aggregatedPosChan chan string
+
+ producer *parallelProducer
+
+ events chan *binlogdatapb.VEvent
+ stats *VrLogStats
+ sequenceNumbers []int64
+ sequenceNumbersMap map[int64]int64
+ localLowestUncommittedSequence int64
+ commitSubscribers map[int64]chan error // subscribing to commit events
+ commitSubscribersMu sync.RWMutex
+
+ // foreignKeyChecksEnabled is the current state of the foreign key checks for the current session.
+ // It reflects what we have set the @@session.foreign_key_checks session variable to.
+ foreignKeyChecksEnabled bool
+ // foreignKeyChecksStateInitialized is set to true once we have initialized the foreignKeyChecksEnabled.
+ // The initialization is done on the first row event that this vplayer sees.
+ foreignKeyChecksStateInitialized bool
+
+ numCommits int // TODO(shlomi): remove this (only used as benchmark info)
+ numEvents int // TODO(shlomi): remove this (only used as benchmark info)
+ numSubscribes int // TODO(shlomi): remove this (only used as benchmark info)
+
+ updatedPos replication.Position
+ updatedPosTimestamp int64
+}
+
+func newParallelWorker(index int, producer *parallelProducer, capacity int) *parallelWorker {
+ log.Errorf("======= QQQ newParallelWorker index: %v", index)
+ return &parallelWorker{
+ index: index,
+ producer: producer,
+ events: make(chan *binlogdatapb.VEvent, capacity),
+ aggregatedPosChan: make(chan string),
+ sequenceNumbers: make([]int64, maxWorkerEvents),
+ sequenceNumbersMap: make(map[int64]int64),
+ commitSubscribers: make(map[int64]chan error),
+ vp: producer.vp,
+ }
+}
+
+func (w *parallelWorker) subscribeCommitWorkerEvent(sequenceNumber int64) chan error {
+ w.commitSubscribersMu.Lock()
+ defer w.commitSubscribersMu.Unlock()
+
+ w.numSubscribes++
+ c := make(chan error, 1)
+ w.commitSubscribers[sequenceNumber] = c
+ return c
+}
+
+// updatePos should get called at a minimum of vreplicationMinimumHeartbeatUpdateInterval.
+func (w *parallelWorker) updatePos(ctx context.Context, posStr string, transactionTimestamp int64, singleGTID bool) (posReached bool, err error) {
+ if w.dbClient.InTransaction {
+ // We're assuming there are multiple calls to updatePos within this
+ // transaction. We don't write them at this time. Instead, we
+ // aggregate the given positions and write them in the commit.
+ if singleGTID { + // Faster to ParseMysql56GTID than DecodeMySQL56Position when it's just the one entry + gtid, err := replication.ParseMysql56GTID(posStr) + if err != nil { + return false, err + } + w.updatedPos = replication.AppendGTIDInPlace(w.updatedPos, gtid) + } else { + pos, err := binlogplayer.DecodeMySQL56Position(posStr) + if err != nil { + return false, err + } + w.updatedPos = replication.AppendGTIDSetInPlace(w.updatedPos, pos.GTIDSet) + } + w.updatedPosTimestamp = max(w.updatedPosTimestamp, transactionTimestamp) + return false, nil + } + update := binlogplayer.GenerateUpdateWorkerPos(w.vp.vr.id, w.index, posStr, transactionTimestamp) + if _, err := w.queryFunc(ctx, update); err != nil { + return false, fmt.Errorf("error updating position: %v", err) + } + // TODO (shlomi): handle these + // vp.numAccumulatedHeartbeats = 0 + // vp.unsavedEvent = nil + // vp.timeLastSaved = time.Now() + // vp.vr.stats.SetLastPosition(vp.pos) + + return posReached, nil +} + +func (w *parallelWorker) updatePosByEvent(ctx context.Context, event *binlogdatapb.VEvent) error { + if _, err := w.updatePos(ctx, event.EventGtid, event.Timestamp, true); err != nil { + return err + } + return nil +} + +func (w *parallelWorker) commitEvents(ctx context.Context) chan error { + event := w.producer.generateCommitWorkerEvent() + // log.Errorf("========== QQQ commitEvents: %v", event) + c := w.subscribeCommitWorkerEvent(event.SequenceNumber) + // log.Errorf("========== QQQ commitEvents: subscribed to %v in worker %v", event.SequenceNumber, w.index) + select { + case w.events <- event: + case <-ctx.Done(): + c := make(chan error, 1) + c <- ctx.Err() + return c + } + // log.Errorf("========== QQQ commitEvents: pushed event") + return c +} + +func (w *parallelWorker) applyQueuedEvents(ctx context.Context) (err error) { + log.Errorf("========== QQQ applyQueuedEvents") + + defer func() { + // Anything that's not committed should be rolled back + w.dbClient.Rollback() + log.Errorf("========== QQQ applyQueuedEvents worker %v num commits=%v, num events=%v, numSubscribes=%v", w.index, w.numCommits, w.numEvents, w.numSubscribes) + }() + + ticker := time.NewTicker(maxIdleWorkerDuration / 2) + defer ticker.Stop() + + var lastTickerTime time.Time + var lastAppliedTickerTime time.Time + var lastEventWasSkippedCommit bool + + applyEvent := func(event *binlogdatapb.VEvent) error { + lastEventWasSkippedCommit = false + if err := w.applyQueuedEvent(ctx, event); err != nil { + return err + } + lastAppliedTickerTime = lastTickerTime + return nil + } + for { + select { + case <-ctx.Done(): + return ctx.Err() + case lastTickerTime = <-ticker.C: + if lastEventWasSkippedCommit && lastTickerTime.Sub(lastAppliedTickerTime) >= maxIdleWorkerDuration { + // The last event was a commit, which we did nto actually apply, as we figured we'd + // follow up with more statements. But it's been a while and there's been no statement since. + // So we're just sitting idly with a bunch of uncommitted statements. Better to commit now. + log.Errorf("========== QQQ applyQueuedEvents worker %v idle with skipped commit and %v events. 
COMMITTING", w.index, len(w.sequenceNumbers)) + if err := applyEvent(w.producer.generateCommitWorkerEvent()); err != nil { + return vterrors.Wrapf(err, "failed to commit idle worker %v", w.index) + } + } + case pos := <-w.aggregatedPosChan: + if _, err := w.updatePos(ctx, pos, 0, false); err != nil { + return err + } + case event := <-w.events: + if _, ch := w.applyQueuedEventBlocker(ctx, event); ch != nil { + if lastEventWasSkippedCommit { + // log.Errorf("========== QQQ applyQueuedEvent worker %v event %v (seq=%v) is going to block. lowest=%v. committing %v sequence numbers", w.index, event.Type, event.SequenceNumber, lowest, len(w.sequenceNumbersMap)) + // log.Errorf("========== QQQ applyQueuedEvent worker %v event %v (seq=%v) is COMMITTING due to commit parent %v because lowest is %v. sequence numbers: %v", w.index, event.Type, event.SequenceNumber, event.CommitParent, lowest, w.sequenceNumbersMap) + if err := applyEvent(w.producer.generateCommitWorkerEvent()); err != nil { + return vterrors.Wrapf(err, "failed to commit idle worker %v", w.index) + } + // log.Errorf("========== QQQ applyQueuedEvent worker %v event %v (seq=%v) is going to block. committed", w.index, event.Type, event.SequenceNumber) + } + // log.Errorf("========== QQQ applyQueuedEvent worker %v event %v (seq=%v) committing everyone else", w.index, event.Type, event.SequenceNumber) + // w.producer.commitAll(ctx, w) + // log.Errorf("========== QQQ applyQueuedEvent worker %v event %v (seq=%v) is WAITING on commit parent %v because lowest is %v. sequence numbers: %v", w.index, event.Type, event.SequenceNumber, event.CommitParent, lowest, w.sequenceNumbers) + select { + case <-ch: // unblock + // log.Errorf("========== QQQ applyQueuedEvent worker %v event %v (seq=%v) is RELEASED on commit parent %v and newLowest=%v", w.index, event.Type, event.SequenceNumber, event.CommitParent, newLowest) + case <-ctx.Done(): + return ctx.Err() + } + } + if event.SequenceNumber >= 0 { + // Negative values are happen in commitWorkerEvent(). These are not real events. + w.sequenceNumbers = append(w.sequenceNumbers, event.SequenceNumber) + w.sequenceNumbersMap[event.SequenceNumber] = event.CommitParent + } + + if isConsiderCommitWorkerEvent(event) { + if !lastEventWasSkippedCommit { + continue + } + } + + skippable := func() bool { + if event.Type != binlogdatapb.VEventType_COMMIT { + return false + } + if event.Skippable { + return true + } + // if len(w.sequenceNumbers) < maxWorkerEvents { + // // We don't want to commit yet. We're waiting for more events. + // return true + // } + return false + } + + if skippable() { + // At this time only COMMIT events are Skippable, so checking for the type is not + // strictly necessary. But it's safer to add that check. + lastEventWasSkippedCommit = true + continue + } + // log.Errorf("========== QQQ applyQueuedEvents worker %v applying %v event at %v. in transaction=%v", w.index, event.Type, event.EventGtid, w.dbClient.InTransaction) + // if event.EventGtid == "" { + // log.Errorf("========== QQQ applyQueuedEvents worker %v event.EventGtid is empty. worker position=%v", w.index, w.updatedPos) + // } + if err := applyEvent(event); err != nil { + return vterrors.Wrapf(err, "worker %v failed to apply %v event at position %v", w.index, event.Type, event.EventGtid) + } + } + } +} + +// applyQueuedEventBlocker checks if the event can be applied immediately or if it needs to wait for other +// events to be applied first. It returns a channel that will be populated once the event can be applied. 
+// If the event can be applied immediately, the function returns nil. +func (w *parallelWorker) applyQueuedEventBlocker(ctx context.Context, event *binlogdatapb.VEvent) (int64, chan int64) { + lowest := w.producer.lowestUncommittedSequence.Load() + if event.CommitParent < lowest { + // Means the parent is already committed. No need to wait. + return lowest, nil + } + allDependenciesAreInCurrentWorker := func() bool { + if len(w.sequenceNumbersMap) < int(event.SequenceNumber-lowest) { + return false + } + for i := lowest; i <= event.CommitParent; i++ { + if _, ok := w.sequenceNumbersMap[i]; !ok { + return false + } + } + // log.Errorf("========== QQQ applyQueuedEvent worker %v allDependenciesAreInCurrentWorker lowest=%v, commitparent=%v, sequenceNumbers=%v", w.index, lowest, event.CommitParent, w.sequenceNumbers) + return true + } + if allDependenciesAreInCurrentWorker() { + return lowest, nil + } + return w.producer.registerLowestSequenceListener(event.CommitParent) +} + +// applyQueuedEvent applies an event from the queue. +func (w *parallelWorker) applyQueuedEvent(ctx context.Context, event *binlogdatapb.VEvent) error { + switch event.Type { + case binlogdatapb.VEventType_GTID: + if err := w.updatePosByEvent(ctx, event); err != nil { + return err + } + return nil + case binlogdatapb.VEventType_BEGIN: + // No-op: begin is called as needed. + case binlogdatapb.VEventType_COMMIT, binlogdatapb.VEventType_UNKNOWN: + return w.applyQueuedCommit(ctx, event) + case binlogdatapb.VEventType_FIELD: + if err := w.dbClient.Begin(); err != nil { + return err + } + onField := func() error { + w.vp.planMu.Lock() + defer w.vp.planMu.Unlock() + + tplan, err := w.vp.replicatorPlan.buildExecutionPlan(event.FieldEvent) + if err != nil { + return err + } + + w.vp.tablePlans[event.FieldEvent.TableName] = tplan + return nil + } + if err := onField(); err != nil { + return err + } + case binlogdatapb.VEventType_INSERT, binlogdatapb.VEventType_DELETE, binlogdatapb.VEventType_UPDATE, + binlogdatapb.VEventType_REPLACE, binlogdatapb.VEventType_SAVEPOINT: + // use event.Statement if available, preparing for deprecation in 8.0 + sql := event.Statement + if sql == "" { + sql = event.Dml + } + // If the event is for one of the AWS RDS "special" or pt-table-checksum tables, we skip + if !strings.Contains(sql, " mysql.rds_") && !strings.Contains(sql, " percona.checksums") { + // This is a player using statement based replication + if err := w.dbClient.Begin(); err != nil { + return err + } + if err := w.applyQueuedStmtEvent(ctx, event); err != nil { + return err + } + } + case binlogdatapb.VEventType_ROW: + if err := w.dbClient.Begin(); err != nil { + return err + } + if err := w.applyQueuedRowEvent(ctx, event); err != nil { + return err + } + // Row event is logged AFTER RowChanges are applied so as to calculate the total elapsed + // time for the Row event. + case binlogdatapb.VEventType_OTHER: + if w.dbClient.InTransaction { + // Unreachable + log.Errorf("internal error: vplayer is in a transaction on event: %v", event) + return fmt.Errorf("internal error: vplayer is in a transaction on event: %v", event) + } + // Just update the position. 
+ if err := w.updatePosByEvent(ctx, event); err != nil { + return err + } + w.flushSequenceNumbers() + case binlogdatapb.VEventType_DDL: + if w.dbClient.InTransaction { + // Unreachable + log.Errorf("internal error: vplayer is in a transaction on event: %v", event) + return fmt.Errorf("internal error: vplayer is in a transaction on event: %v", event) + } + w.vp.vr.stats.DDLEventActions.Add(w.vp.vr.source.OnDdl.String(), 1) // Record the DDL handling + switch w.vp.vr.source.OnDdl { + case binlogdatapb.OnDDLAction_IGNORE: + // We still have to update the position. + if err := w.updatePosByEvent(ctx, event); err != nil { + return err + } + w.flushSequenceNumbers() + case binlogdatapb.OnDDLAction_STOP: + if err := w.dbClient.Begin(); err != nil { + return err + } + if err := w.updatePosByEvent(ctx, event); err != nil { + return err + } + w.flushSequenceNumbers() + if err := w.setVRState(binlogdatapb.VReplicationWorkflowState_Stopped, "Stopped at DDL "+event.Statement); err != nil { + return err + } + if err := w.commitFunc(); err != nil { + return err + } + return io.EOF + case binlogdatapb.OnDDLAction_EXEC: + // It's impossible to save the position transactionally with the statement. + // So, we apply the DDL first, and then save the position. + // Manual intervention may be needed if there is a partial + // failure here. + if _, err := w.queryFunc(ctx, event.Statement); err != nil { + return err + } + if err := w.updatePosByEvent(ctx, event); err != nil { + return err + } + w.flushSequenceNumbers() + case binlogdatapb.OnDDLAction_EXEC_IGNORE: + if _, err := w.queryFunc(ctx, event.Statement); err != nil { + log.Infof("Ignoring error: %v for DDL: %s", err, event.Statement) + } + if err := w.updatePosByEvent(ctx, event); err != nil { + return err + } + w.flushSequenceNumbers() + } + case binlogdatapb.VEventType_JOURNAL: + if w.dbClient.InTransaction { + // Unreachable + log.Errorf("internal error: vplayer is in a transaction on event: %v", event) + return fmt.Errorf("internal error: vplayer is in a transaction on event: %v", event) + } + // Ensure that we don't have a partial set of table matches in the journal. + switch event.Journal.MigrationType { + case binlogdatapb.MigrationType_SHARDS: + // All tables of the source were migrated. So, no validation needed. + case binlogdatapb.MigrationType_TABLES: + // Validate that all or none of the tables are in the journal. + jtables := make(map[string]bool) + for _, table := range event.Journal.Tables { + jtables[table] = true + } + found := false + notFound := false + for tableName := range w.vp.replicatorPlan.TablePlans { + if _, ok := jtables[tableName]; ok { + found = true + } else { + notFound = true + } + } + switch { + case found && notFound: + // Some were found and some were not found. We can't handle this. + if err := w.setVRState(binlogdatapb.VReplicationWorkflowState_Stopped, "unable to handle journal event: tables were partially matched"); err != nil { + return err + } + return io.EOF + case notFound: + // None were found. Ignore journal. + return nil + } + // All were found. We must register journal. 
+ } + log.Infof("Binlog event registering journal event %+v", event.Journal) + if err := w.vp.vr.vre.registerJournal(event.Journal, w.vp.vr.id); err != nil { + if err := w.setVRState(binlogdatapb.VReplicationWorkflowState_Stopped, err.Error()); err != nil { + return err + } + return io.EOF + } + return io.EOF + case binlogdatapb.VEventType_HEARTBEAT: + if event.Throttled { + if err := w.vp.vr.updateTimeThrottled(throttlerapp.VStreamerName, event.ThrottledReason); err != nil { + return err + } + } + if !w.dbClient.InTransaction { + w.vp.numAccumulatedHeartbeats++ + if err := w.vp.recordHeartbeat(); err != nil { + return err + } + } + } + return nil +} + +func (w *parallelWorker) setVRState(state binlogdatapb.VReplicationWorkflowState, message string) error { + // TODO (shlomi): handle race conditions in vr.state + if message != "" { + w.vp.vr.stats.History.Add(&binlogplayer.StatsHistoryRecord{ + Time: time.Now(), + Message: message, + }) + } + w.vp.vr.stats.State.Store(state.String()) + query := fmt.Sprintf("update _vt.vreplication set state='%v', message=%v where id=%v", state, encodeString(binlogplayer.MessageTruncate(message)), w.vp.vr.id) + // If we're batching a transaction, then include the state update + // in the current transaction batch. + dbClient := w.dbClient + if dbClient.InTransaction && dbClient.maxBatchSize > 0 { + dbClient.AddQueryToTrxBatch(query) + } else { // Otherwise, send it down the wire + if _, err := dbClient.ExecuteFetch(query, 1); err != nil { + return fmt.Errorf("could not set state: %v: %v", query, err) + } + } + if state == w.vp.vr.state { + return nil + } + insertLog(dbClient, LogStateChange, w.vp.vr.id, state.String(), message) + w.vp.vr.state = state + + return nil +} + +// applyQueuedStmtEvent applies an actual DML statement received from the source, directly onto the backend database +func (w *parallelWorker) applyQueuedStmtEvent(ctx context.Context, event *binlogdatapb.VEvent) error { + vp := w.vp + sql := event.Statement + if sql == "" { + sql = event.Dml + } + if event.Type == binlogdatapb.VEventType_SAVEPOINT || vp.canAcceptStmtEvents { + start := time.Now() + _, err := w.queryFunc(ctx, sql) + vp.vr.stats.QueryTimings.Record(vp.phase, start) + vp.vr.stats.QueryCount.Add(vp.phase, 1) + return err + } + return fmt.Errorf("filter rules are not supported for SBR replication: %v", vp.vr.source.Filter.GetRules()) +} + +// updateFKCheck updates the @@session.foreign_key_checks variable based on the binlog row event flags. +// The function only does it if it has changed to avoid redundant updates, using the cached vplayer.foreignKeyChecksEnabled +// The foreign_key_checks value for a transaction is determined by the 2nd bit (least significant) of the flags: +// - If set (1), foreign key checks are disabled. +// - If unset (0), foreign key checks are enabled. +// updateFKCheck also updates the state for the first row event that this vplayer, and hence the db connection, sees. +func (w *parallelWorker) updateFKCheck(ctx context.Context, flags2 uint32) error { + mustUpdate := false + if w.vp.vr.WorkflowSubType == int32(binlogdatapb.VReplicationWorkflowSubType_AtomicCopy) { + // If this is an atomic copy, we must update the foreign_key_checks state even when the vplayer runs during + // the copy phase, i.e., for catchup and fastforward. + mustUpdate = true + } else if w.vp.vr.state == binlogdatapb.VReplicationWorkflowState_Running { + // If the vreplication workflow is in Running state, we must update the foreign_key_checks + // state for all workflow types. 
+ mustUpdate = true + } + if !mustUpdate { + return nil + } + dbForeignKeyChecksEnabled := flags2&NoForeignKeyCheckFlagBitmask != NoForeignKeyCheckFlagBitmask + + if w.foreignKeyChecksStateInitialized /* already set earlier */ && + dbForeignKeyChecksEnabled == w.foreignKeyChecksEnabled /* no change in the state, no need to update */ { + return nil + } + log.Infof("Setting this session's foreign_key_checks to %s", strconv.FormatBool(dbForeignKeyChecksEnabled)) + if _, err := w.queryFunc(ctx, "set @@session.foreign_key_checks="+strconv.FormatBool(dbForeignKeyChecksEnabled)); err != nil { + return fmt.Errorf("failed to set session foreign_key_checks: %w", err) + } + w.foreignKeyChecksEnabled = dbForeignKeyChecksEnabled + if !w.foreignKeyChecksStateInitialized { + log.Infof("First foreign_key_checks update to: %s", strconv.FormatBool(dbForeignKeyChecksEnabled)) + w.foreignKeyChecksStateInitialized = true + } + return nil +} + +func (w *parallelWorker) applyQueuedRowEvent(ctx context.Context, vevent *binlogdatapb.VEvent) error { + if err := w.updateFKCheck(ctx, vevent.RowEvent.Flags); err != nil { + return err + } + var tplan *TablePlan + func() { + w.vp.planMu.RLock() + defer w.vp.planMu.RUnlock() + tplan = w.vp.tablePlans[vevent.RowEvent.TableName] + }() + if tplan == nil { + return vterrors.Errorf(vtrpc.Code_FAILED_PRECONDITION, "unexpected event on table %s that has no plan yet", vevent.RowEvent.TableName) + } + applyFunc := func(sql string) (*sqltypes.Result, error) { + return w.queryFunc(ctx, sql) + } + + rowEvent := vevent.RowEvent + if w.vp.batchMode && len(rowEvent.RowChanges) > 1 { + // If we have multiple delete row events for a table with a single PK column + // then we can perform a simple bulk DELETE using an IN clause. + if (rowEvent.RowChanges[0].Before != nil && rowEvent.RowChanges[0].After == nil) && + tplan.MultiDelete != nil { + _, err := tplan.applyBulkDeleteChanges(rowEvent.RowChanges, applyFunc, w.dbClient.maxBatchSize) + return err + } + // If we're done with the copy phase then we will be replicating all INSERTS + // regardless of the PK value and can use a single INSERT statment with + // multiple VALUES clauses. + // TODO(shlomi): race condition over w.vp.copyState + if len(w.vp.copyState) == 0 && (rowEvent.RowChanges[0].Before == nil && rowEvent.RowChanges[0].After != nil) { + _, err := tplan.applyBulkInsertChanges(rowEvent.RowChanges, applyFunc, w.dbClient.maxBatchSize) + return err + } + } + { + // Measure parallel vplayer concurrency (TODO(shlomi): remove) + currentConcurrency := w.producer.currentConcurrency.Add(1) + defer w.producer.currentConcurrency.Add(-1) + if currentConcurrency > w.producer.maxConcurrency.Load() { + w.producer.maxConcurrency.Store(currentConcurrency) + } + } + + for _, change := range vevent.RowEvent.RowChanges { + if _, err := tplan.applyChange(change, applyFunc); err != nil { + return err + } + } + return nil +} + +func (w *parallelWorker) applyQueuedCommit(ctx context.Context, event *binlogdatapb.VEvent) error { + switch { + case event.Type == binlogdatapb.VEventType_COMMIT: + case isCommitWorkerEvent(event): + default: + // Not a commit + return nil + } + // As a very simple optimization, we will only commit if we have any events at all to commit. 
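+ // Note that the worker position accumulated in w.updatedPos is flushed to this worker's row in
+ // _vt.vreplication_worker_pos right before the worker's COMMIT, keeping the applied changes and
+ // the recorded position consistent.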
+ shouldActuallyCommit := len(w.sequenceNumbers) > 0
+ var err error
+ if shouldActuallyCommit {
+ if !w.updatedPos.IsZero() {
+ update := binlogplayer.GenerateUpdateWorkerPos(w.vp.vr.id, w.index, w.updatedPos.String(), w.updatedPosTimestamp)
+ // log.Errorf("========== QQQ applyQueuedCommit worker %v actual commit with updatedPos=%v", w.index, w.updatedPos)
+ if _, err := w.queryFunc(ctx, update); err != nil {
+ // log.Errorf("========== QQQ applyQueuedCommit worker %v actual commit FAILED", w.index)
+ return err
+ }
+ // log.Errorf("========== QQQ applyQueuedCommit worker %v actual commit SUCCESS", w.index)
+ w.updatedPos = replication.Position{}
+ }
+ err = w.commitFunc()
+ }
+ func() {
+ // Notify subscribers of the commit event
+ if event.SequenceNumber >= 0 {
+ // Not a subscribed event
+ return
+ }
+ w.commitSubscribersMu.Lock()
+ defer w.commitSubscribersMu.Unlock()
+ if subs, ok := w.commitSubscribers[event.SequenceNumber]; ok {
+ subs <- err
+ delete(w.commitSubscribers, event.SequenceNumber)
+ }
+ }()
+ if err != nil {
+ return err
+ }
+ // Commit successful
+ if shouldActuallyCommit {
+ // Parallel VPlayer metrics. TODO(shlomi): remove
+ w.producer.numCommits.Add(1)
+ w.numCommits++
+ w.numEvents += len(w.sequenceNumbers)
+ }
+ w.flushSequenceNumbers()
+ if w.producer.posReached.Load() {
+ return io.EOF
+ }
+ return nil
+}
+
+func (w *parallelWorker) flushSequenceNumbers() {
+ // We now let the producer know that we've completed the sequence numbers.
+ // It will unassign these sequence numbers from the worker.
+ w.producer.completeSequenceNumbers(w.sequenceNumbers)
+ w.sequenceNumbers = w.sequenceNumbers[:0]
+ clear(w.sequenceNumbersMap)
+}
diff --git a/go/vt/vttablet/tabletmanager/vreplication/vreplicator.go b/go/vt/vttablet/tabletmanager/vreplication/vreplicator.go
index 5723c09ebcc..63f31b07d39 100644
--- a/go/vt/vttablet/tabletmanager/vreplication/vreplicator.go
+++ b/go/vt/vttablet/tabletmanager/vreplication/vreplicator.go
@@ -95,9 +95,10 @@ const (
 // vreplicator provides the core logic to start vreplication streams
 type vreplicator struct {
- vre *Engine
- id int32
- dbClient *vdbClient
+ vre *Engine
+ id int32
+ dbClient *vdbClient
+ dbClientGen dbClientGenerator
 // source
 source *binlogdatapb.BinlogSource
 sourceVStreamer VStreamerClient
@@ -142,7 +143,7 @@ type vreplicator struct {
 // More advanced constructs can be used. Please see the table plan builder
 // documentation for more info.
func newVReplicator(id int32, source *binlogdatapb.BinlogSource, sourceVStreamer VStreamerClient, stats *binlogplayer.Stats, - dbClient binlogplayer.DBClient, mysqld mysqlctl.MysqlDaemon, vre *Engine, workflowConfig *vttablet.VReplicationConfig) *vreplicator { + dbClient binlogplayer.DBClient, dbClientGen dbClientGenerator, mysqld mysqlctl.MysqlDaemon, vre *Engine, workflowConfig *vttablet.VReplicationConfig) *vreplicator { if workflowConfig == nil { workflowConfig = vttablet.DefaultVReplicationConfig } @@ -158,6 +159,7 @@ func newVReplicator(id int32, source *binlogdatapb.BinlogSource, sourceVStreamer sourceVStreamer: sourceVStreamer, stats: stats, dbClient: newVDBClient(dbClient, stats, workflowConfig.RelayLogMaxItems), + dbClientGen: dbClientGen, mysqld: mysqld, workflowConfig: workflowConfig, } diff --git a/go/vt/vttablet/tabletmanager/vreplication/vreplicator_test.go b/go/vt/vttablet/tabletmanager/vreplication/vreplicator_test.go index d2854327be6..3375c676961 100644 --- a/go/vt/vttablet/tabletmanager/vreplication/vreplicator_test.go +++ b/go/vt/vttablet/tabletmanager/vreplication/vreplicator_test.go @@ -233,7 +233,7 @@ func TestDeferSecondaryKeys(t *testing.T) { _, err = dbClient.ExecuteFetch(fmt.Sprintf("delete from _vt.vreplication where id = %d", id), 1) require.NoError(t, err) }() - vr := newVReplicator(id, bls, vsclient, stats, dbClient, env.Mysqld, playerEngine, vttablet.DefaultVReplicationConfig) + vr := newVReplicator(id, bls, vsclient, stats, dbClient, nil, env.Mysqld, playerEngine, vttablet.DefaultVReplicationConfig) getActionsSQLf := "select action from _vt.post_copy_action where table_name='%s'" getCurrentDDL := func(tableName string) string { req := &tabletmanagerdatapb.GetSchemaRequest{Tables: []string{tableName}} @@ -387,7 +387,7 @@ func TestDeferSecondaryKeys(t *testing.T) { if err != nil { return err } - myvr := newVReplicator(myid, bls, vsclient, stats, dbClient, env.Mysqld, playerEngine, vttablet.DefaultVReplicationConfig) + myvr := newVReplicator(myid, bls, vsclient, stats, dbClient, nil, env.Mysqld, playerEngine, vttablet.DefaultVReplicationConfig) myvr.WorkflowType = int32(binlogdatapb.VReplicationWorkflowType_Reshard) // Insert second post copy action record to simulate a shard merge where you // have N controllers/replicators running for the same table on the tablet. 
diff --git a/go/vt/vttablet/tabletmanager/vreplication/vreplicator_test.go b/go/vt/vttablet/tabletmanager/vreplication/vreplicator_test.go
index d2854327be6..3375c676961 100644
--- a/go/vt/vttablet/tabletmanager/vreplication/vreplicator_test.go
+++ b/go/vt/vttablet/tabletmanager/vreplication/vreplicator_test.go
@@ -233,7 +233,7 @@ func TestDeferSecondaryKeys(t *testing.T) {
         _, err = dbClient.ExecuteFetch(fmt.Sprintf("delete from _vt.vreplication where id = %d", id), 1)
         require.NoError(t, err)
     }()
-    vr := newVReplicator(id, bls, vsclient, stats, dbClient, env.Mysqld, playerEngine, vttablet.DefaultVReplicationConfig)
+    vr := newVReplicator(id, bls, vsclient, stats, dbClient, nil, env.Mysqld, playerEngine, vttablet.DefaultVReplicationConfig)
     getActionsSQLf := "select action from _vt.post_copy_action where table_name='%s'"
     getCurrentDDL := func(tableName string) string {
         req := &tabletmanagerdatapb.GetSchemaRequest{Tables: []string{tableName}}
@@ -387,7 +387,7 @@ func TestDeferSecondaryKeys(t *testing.T) {
             if err != nil {
                 return err
             }
-            myvr := newVReplicator(myid, bls, vsclient, stats, dbClient, env.Mysqld, playerEngine, vttablet.DefaultVReplicationConfig)
+            myvr := newVReplicator(myid, bls, vsclient, stats, dbClient, nil, env.Mysqld, playerEngine, vttablet.DefaultVReplicationConfig)
             myvr.WorkflowType = int32(binlogdatapb.VReplicationWorkflowType_Reshard)
             // Insert second post copy action record to simulate a shard merge where you
             // have N controllers/replicators running for the same table on the tablet.
@@ -631,7 +631,7 @@ func TestCancelledDeferSecondaryKeys(t *testing.T) {
         _, err = dbClient.ExecuteFetch(fmt.Sprintf("delete from _vt.vreplication where id = %d", id), 1)
         require.NoError(t, err)
     }()
-    vr := newVReplicator(id, bls, vsclient, stats, dbClient, env.Mysqld, playerEngine, vttablet.DefaultVReplicationConfig)
+    vr := newVReplicator(id, bls, vsclient, stats, dbClient, nil, env.Mysqld, playerEngine, vttablet.DefaultVReplicationConfig)
     vr.WorkflowType = int32(binlogdatapb.VReplicationWorkflowType_MoveTables)
     getCurrentDDL := func(tableName string) string {
         req := &tabletmanagerdatapb.GetSchemaRequest{Tables: []string{tableName}}
@@ -750,7 +750,7 @@ func TestResumingFromPreviousWorkflowKeepingRowsCopied(t *testing.T) {
         _, err = dbClient.ExecuteFetch(fmt.Sprintf("delete from _vt.vreplication where id = %d", id), 1)
         require.NoError(t, err)
     }()
-    vr := newVReplicator(id, bls, vsclient, stats, dbClient, env.Mysqld, playerEngine, vttablet.DefaultVReplicationConfig)
+    vr := newVReplicator(id, bls, vsclient, stats, dbClient, nil, env.Mysqld, playerEngine, vttablet.DefaultVReplicationConfig)
 
     assert.Equal(t, rowsCopied, vr.stats.CopyRowCount.Get())
 }
@@ -851,7 +851,7 @@ func TestThrottlerAppNames(t *testing.T) {
         _, err = dbClient.ExecuteFetch(fmt.Sprintf("delete from _vt.vreplication where id = %d", id), 1)
         require.NoError(t, err)
     }()
-    vr := newVReplicator(id, bls, vsclient, stats, dbClient, env.Mysqld, playerEngine, vttablet.DefaultVReplicationConfig)
+    vr := newVReplicator(id, bls, vsclient, stats, dbClient, nil, env.Mysqld, playerEngine, vttablet.DefaultVReplicationConfig)
     settings, _, err := vr.loadSettings(ctx, newVDBClient(dbClient, stats, vttablet.DefaultVReplicationConfig.RelayLogMaxItems))
     require.NoError(t, err)
 
diff --git a/go/vt/vttablet/tabletserver/vstreamer/vstreamer.go b/go/vt/vttablet/tabletserver/vstreamer/vstreamer.go
index 2cbb59acb02..eeb094b6c0a 100644
--- a/go/vt/vttablet/tabletserver/vstreamer/vstreamer.go
+++ b/go/vt/vttablet/tabletserver/vstreamer/vstreamer.go
@@ -412,7 +412,7 @@ func (vs *vstreamer) parseEvents(ctx context.Context, events <-chan mysql.Binlog
             }
             for _, vevent := range vevents {
                 if err := bufferAndTransmit(vevent); err != nil {
-                    if err == io.EOF {
+                    if errors.Is(vterrors.UnwrapAll(err), io.EOF) {
                         return nil
                     }
                     vs.vse.errorCounts.Add("BufferAndTransmit", 1)
@@ -440,7 +440,7 @@ func (vs *vstreamer) parseEvents(ctx context.Context, events <-chan mysql.Binlog
         case <-hbTimer.C:
             checkResult, ok := vs.vse.throttlerClient.ThrottleCheckOK(ctx, vs.throttlerApp)
             if err := injectHeartbeat(!ok, checkResult.Summary()); err != nil {
-                if err == io.EOF {
+                if errors.Is(vterrors.UnwrapAll(err), io.EOF) {
                     return nil
                 }
                 vs.vse.errorCounts.Add("Send", 1)
@@ -466,6 +466,7 @@ func (vs *vstreamer) parseEvent(ev mysql.BinlogEvent, bufferAndTransmit func(vev
     if ev.IsFormatDescription() {
         var err error
         vs.format, err = ev.Format()
+        vs.eventGTID = nil
         if err != nil {
             return nil, fmt.Errorf("can't parse FORMAT_DESCRIPTION_EVENT: %v, event data: %#v", err, ev)
         }
@@ -513,7 +514,7 @@ func (vs *vstreamer) parseEvent(ev mysql.BinlogEvent, bufferAndTransmit func(vev
                 SequenceNumber: sequenceNumber,
             })
         }
-        vs.pos = replication.AppendGTID(vs.pos, gtid)
+        vs.pos = replication.AppendGTID(vs.pos, gtid) // Ideally we would use AppendGTIDInPlace, but there is a race condition here; see https://github.com/vitessio/vitess/pull/18611
         vs.commitParent = commitParent
         vs.sequenceNumber = sequenceNumber
         vs.eventGTID = gtid
@@ -721,7 +722,7 @@ func (vs *vstreamer) parseEvent(ev mysql.BinlogEvent, bufferAndTransmit func(vev
         for {
             tpevent, err := tp.GetNextEvent()
             if err != nil {
-                if err == io.EOF {
+                if errors.Is(vterrors.UnwrapAll(err), io.EOF) {
                     break
                 }
                 return nil, err
diff --git a/proto/binlogdata.proto b/proto/binlogdata.proto
index d4b0abecd94..080810950a5 100644
--- a/proto/binlogdata.proto
+++ b/proto/binlogdata.proto
@@ -486,6 +486,12 @@ message VEvent {
   int64 sequence_number = 27;
   // EventGTID is decorated by VPlayer. It is the specific GTID (not the GTID set) for this event.
   string event_gtid = 28;
+  // MustSave is a decoration added by VPlayer.
+  bool must_save = 29;
+  // PinWorker is a decoration added by the parallel VPlayer.
+  bool pin_worker = 30;
+  // Skippable is a decoration added by the parallel VPlayer.
+  bool skippable = 31;
 }
 
 message MinimalTable {
diff --git a/test/ci_workflow_gen.go b/test/ci_workflow_gen.go
index da6c4423497..6e8337a5ed7 100644
--- a/test/ci_workflow_gen.go
+++ b/test/ci_workflow_gen.go
@@ -103,6 +103,7 @@ var (
         "mysql_server_vault",
         "vstream",
         "onlineddl_vrepl",
+        "onlineddl_vrepl_bench",
         "onlineddl_vrepl_stress",
         "onlineddl_vrepl_stress_suite",
         "onlineddl_vrepl_suite",
@@ -175,6 +176,7 @@ var (
     }
     clusterRequiring16CoresMachines = []string{
         "onlineddl_vrepl",
+        "onlineddl_vrepl_bench",
         "onlineddl_vrepl_stress",
         "onlineddl_vrepl_stress_suite",
         "onlineddl_vrepl_suite",
diff --git a/test/config.json b/test/config.json
index ab1fa3d00ac..64c886aaf50 100644
--- a/test/config.json
+++ b/test/config.json
@@ -295,6 +295,15 @@
         "RetryMax": 1,
         "Tags": []
     },
+    "onlineddl_vrepl_bench": {
+        "File": "unused.go",
+        "Args": ["vitess.io/vitess/go/test/endtoend/onlineddl/vrepl_bench", "-timeout", "30m"],
+        "Command": [],
+        "Manual": false,
+        "Shard": "onlineddl_vrepl_bench",
+        "RetryMax": 1,
+        "Tags": []
+    },
     "onlineddl_vrepl_suite": {
         "File": "unused.go",
         "Args": ["vitess.io/vitess/go/test/endtoend/onlineddl/vrepl_suite", "-timeout", "30m"],
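The three booleans added to VEvent in binlogdata.proto above (must_save, pin_worker, skippable) are decorations applied on the player side rather than by the source vstreamer. The sketch below only illustrates the intent: the Go accessor names assume the bindings have been regenerated with the new fields, and the decoration rules shown are hypothetical, not the actual parallel VPlayer policy.

    package main

    import (
        "fmt"

        binlogdatapb "vitess.io/vitess/go/vt/proto/binlogdata"
    )

    // decorate stamps player-side hints onto an event before it is queued.
    func decorate(ev *binlogdatapb.VEvent) {
        switch ev.Type {
        case binlogdatapb.VEventType_GTID:
            // Position-only events could be skipped by an individual worker (illustrative rule).
            ev.Skippable = true
        case binlogdatapb.VEventType_DDL, binlogdatapb.VEventType_JOURNAL:
            // Events that change state outside a transaction must flush the saved position
            // and should not be interleaved across workers (illustrative rule).
            ev.MustSave = true
            ev.PinWorker = true
        }
    }

    func main() {
        ev := &binlogdatapb.VEvent{Type: binlogdatapb.VEventType_DDL}
        decorate(ev)
        fmt.Printf("must_save=%v pin_worker=%v skippable=%v\n", ev.MustSave, ev.PinWorker, ev.Skippable)
    }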
diff --git a/web/vtadmin/src/proto/vtadmin.d.ts b/web/vtadmin/src/proto/vtadmin.d.ts
index 419e558161f..5e81904d471 100644
--- a/web/vtadmin/src/proto/vtadmin.d.ts
+++ b/web/vtadmin/src/proto/vtadmin.d.ts
@@ -39821,6 +39821,15 @@ export namespace binlogdata {
 
         /** VEvent event_gtid */
         event_gtid?: (string|null);
+
+        /** VEvent must_save */
+        must_save?: (boolean|null);
+
+        /** VEvent pin_worker */
+        pin_worker?: (boolean|null);
+
+        /** VEvent skippable */
+        skippable?: (boolean|null);
     }
 
     /** Represents a VEvent. */
@@ -39886,6 +39895,15 @@ export namespace binlogdata {
 
         /** VEvent event_gtid. */
         public event_gtid: string;
+
+        /** VEvent must_save. */
+        public must_save: boolean;
+
+        /** VEvent pin_worker. */
+        public pin_worker: boolean;
+
+        /** VEvent skippable. */
+        public skippable: boolean;
 
         /**
          * Creates a new VEvent instance using the specified properties.
          * @param [properties] Properties to set
diff --git a/web/vtadmin/src/proto/vtadmin.js b/web/vtadmin/src/proto/vtadmin.js
index 854d7430112..279145e849a 100644
--- a/web/vtadmin/src/proto/vtadmin.js
+++ b/web/vtadmin/src/proto/vtadmin.js
@@ -93798,6 +93798,9 @@ export const binlogdata = $root.binlogdata = (() => {
         * @property {number|Long|null} [commit_parent] VEvent commit_parent
         * @property {number|Long|null} [sequence_number] VEvent sequence_number
         * @property {string|null} [event_gtid] VEvent event_gtid
+        * @property {boolean|null} [must_save] VEvent must_save
+        * @property {boolean|null} [pin_worker] VEvent pin_worker
+        * @property {boolean|null} [skippable] VEvent skippable
         */
 
        /**
@@ -93959,6 +93962,30 @@ export const binlogdata = $root.binlogdata = (() => {
         */
        VEvent.prototype.event_gtid = "";
 
+       /**
+        * VEvent must_save.
+        * @member {boolean} must_save
+        * @memberof binlogdata.VEvent
+        * @instance
+        */
+       VEvent.prototype.must_save = false;
+
+       /**
+        * VEvent pin_worker.
+        * @member {boolean} pin_worker
+        * @memberof binlogdata.VEvent
+        * @instance
+        */
+       VEvent.prototype.pin_worker = false;
+
+       /**
+        * VEvent skippable.
+        * @member {boolean} skippable
+        * @memberof binlogdata.VEvent
+        * @instance
+        */
+       VEvent.prototype.skippable = false;
+
        /**
         * Creates a new VEvent instance using the specified properties.
         * @function create
@@ -94019,6 +94046,12 @@ export const binlogdata = $root.binlogdata = (() => {
                writer.uint32(/* id 27, wireType 0 =*/216).int64(message.sequence_number);
            if (message.event_gtid != null && Object.hasOwnProperty.call(message, "event_gtid"))
                writer.uint32(/* id 28, wireType 2 =*/226).string(message.event_gtid);
+           if (message.must_save != null && Object.hasOwnProperty.call(message, "must_save"))
+               writer.uint32(/* id 29, wireType 0 =*/232).bool(message.must_save);
+           if (message.pin_worker != null && Object.hasOwnProperty.call(message, "pin_worker"))
+               writer.uint32(/* id 30, wireType 0 =*/240).bool(message.pin_worker);
+           if (message.skippable != null && Object.hasOwnProperty.call(message, "skippable"))
+               writer.uint32(/* id 31, wireType 0 =*/248).bool(message.skippable);
            return writer;
        };
 
@@ -94125,6 +94158,18 @@ export const binlogdata = $root.binlogdata = (() => {
                    message.event_gtid = reader.string();
                    break;
                }
+               case 29: {
+                   message.must_save = reader.bool();
+                   break;
+               }
+               case 30: {
+                   message.pin_worker = reader.bool();
+                   break;
+               }
+               case 31: {
+                   message.skippable = reader.bool();
+                   break;
+               }
                default:
                    reader.skipType(tag & 7);
                    break;
@@ -94249,6 +94294,15 @@ export const binlogdata = $root.binlogdata = (() => {
            if (message.event_gtid != null && message.hasOwnProperty("event_gtid"))
                if (!$util.isString(message.event_gtid))
                    return "event_gtid: string expected";
+           if (message.must_save != null && message.hasOwnProperty("must_save"))
+               if (typeof message.must_save !== "boolean")
+                   return "must_save: boolean expected";
+           if (message.pin_worker != null && message.hasOwnProperty("pin_worker"))
+               if (typeof message.pin_worker !== "boolean")
+                   return "pin_worker: boolean expected";
+           if (message.skippable != null && message.hasOwnProperty("skippable"))
+               if (typeof message.skippable !== "boolean")
+                   return "skippable: boolean expected";
            return null;
        };
 
@@ -94437,6 +94491,12 @@ export const binlogdata = $root.binlogdata = (() => {
                message.sequence_number = new $util.LongBits(object.sequence_number.low >>> 0, object.sequence_number.high >>> 0).toNumber();
            if (object.event_gtid != null)
                message.event_gtid = String(object.event_gtid);
+           if (object.must_save != null)
+               message.must_save = Boolean(object.must_save);
+           if (object.pin_worker != null)
+               message.pin_worker = Boolean(object.pin_worker);
+           if (object.skippable != null)
+               message.skippable = Boolean(object.skippable);
            return message;
        };
 
@@ -94488,6 +94548,9 @@ export const binlogdata = $root.binlogdata = (() => {
                } else
                    object.sequence_number = options.longs === String ? "0" : 0;
                object.event_gtid = "";
+               object.must_save = false;
+               object.pin_worker = false;
+               object.skippable = false;
            }
            if (message.type != null && message.hasOwnProperty("type"))
                object.type = options.enums === String ? $root.binlogdata.VEventType[message.type] === undefined ? message.type : $root.binlogdata.VEventType[message.type] : message.type;
@@ -94537,6 +94600,12 @@ export const binlogdata = $root.binlogdata = (() => {
                object.sequence_number = options.longs === String ? $util.Long.prototype.toString.call(message.sequence_number) : options.longs === Number ? new $util.LongBits(message.sequence_number.low >>> 0, message.sequence_number.high >>> 0).toNumber() : message.sequence_number;
            if (message.event_gtid != null && message.hasOwnProperty("event_gtid"))
                object.event_gtid = message.event_gtid;
+           if (message.must_save != null && message.hasOwnProperty("must_save"))
+               object.must_save = message.must_save;
+           if (message.pin_worker != null && message.hasOwnProperty("pin_worker"))
+               object.pin_worker = message.pin_worker;
+           if (message.skippable != null && message.hasOwnProperty("skippable"))
+               object.skippable = message.skippable;
            return object;
        };
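The generated encoders above only write must_save, pin_worker and skippable when they are set, and toObject defaults them to false. That is standard proto3 behavior, and it is what keeps the new fields backward compatible: a false boolean is simply absent from the wire, so peers built before this change decode the message unchanged. A small sketch, assuming the Go bindings in vitess.io/vitess/go/vt/proto/binlogdata have been regenerated with the new fields:

    package main

    import (
        "fmt"

        "google.golang.org/protobuf/proto"

        binlogdatapb "vitess.io/vitess/go/vt/proto/binlogdata"
    )

    func main() {
        plain := &binlogdatapb.VEvent{Type: binlogdatapb.VEventType_COMMIT}
        flagged := &binlogdatapb.VEvent{Type: binlogdatapb.VEventType_COMMIT, Skippable: true}

        a, _ := proto.Marshal(plain)
        b, _ := proto.Marshal(flagged)

        // The flagged event is only a few bytes larger: the varint tag for field 31 plus the value.
        fmt.Printf("plain=%d bytes, flagged=%d bytes\n", len(a), len(b))
    }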