Skip to content
Open
Show file tree
Hide file tree
Changes from 13 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 0 additions & 2 deletions go/test/endtoend/vreplication/vreplication_test_env.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,6 @@ const (
var dryRunResultsSwitchWritesCustomerShard = []string{
"Lock keyspace " + defaultSourceKs,
"Lock keyspace " + defaultTargetKs,
fmt.Sprintf("Mirroring 0.00 percent of traffic from keyspace %s to keyspace %s for tablet types [PRIMARY]", defaultSourceKs, defaultTargetKs),
fmt.Sprintf("/Stop writes on keyspace %s for tables [Lead,Lead-1,blüb_tbl,customer,db_order_test,geom_tbl,json_tbl,loadtest,reftable,vdiff_order]: [keyspace:%s;shard:0;position:", defaultSourceKs, defaultSourceKs),
"Wait for vreplication on stopped streams to catchup for up to 30s",
"Create reverse vreplication workflow p2c_reverse",
Expand All @@ -49,7 +48,6 @@ var dryRunResultsSwitchWritesCustomerShard = []string{

var dryRunResultsReadCustomerShard = []string{
"Lock keyspace " + defaultSourceKs,
fmt.Sprintf("Mirroring 0.00 percent of traffic from keyspace %s to keyspace %s for tablet types [RDONLY,REPLICA]", defaultSourceKs, defaultTargetKs),
fmt.Sprintf("Switch reads for tables [Lead,Lead-1,blüb_tbl,customer,db_order_test,geom_tbl,json_tbl,loadtest,reftable,vdiff_order] to keyspace %s for tablet types [RDONLY,REPLICA]", defaultTargetKs),
"Routing rules for tables [Lead,Lead-1,blüb_tbl,customer,db_order_test,geom_tbl,json_tbl,loadtest,reftable,vdiff_order] will be updated",
fmt.Sprintf("Serving VSchema will be rebuilt for the %s keyspace", defaultTargetKs),
Expand Down
19 changes: 15 additions & 4 deletions go/vt/proto/topodata/topodata.pb.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

34 changes: 34 additions & 0 deletions go/vt/proto/topodata/topodata_vtproto.pb.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

9 changes: 6 additions & 3 deletions go/vt/topo/shard.go
Original file line number Diff line number Diff line change
Expand Up @@ -402,7 +402,7 @@ func (si *ShardInfo) GetTabletControl(tabletType topodatapb.TabletType) *topodat
// because it's not used in the same context (vertical vs horizontal sharding)
//
// This function should be called while holding the keyspace lock.
func (si *ShardInfo) UpdateDeniedTables(ctx context.Context, tabletType topodatapb.TabletType, cells []string, remove bool, tables []string) error {
func (si *ShardInfo) UpdateDeniedTables(ctx context.Context, tabletType topodatapb.TabletType, cells []string, remove bool, tables []string, allowReads bool) error {
if err := CheckKeyspaceLocked(ctx, si.keyspace); err != nil {
return err
}
Expand All @@ -424,12 +424,13 @@ func (si *ShardInfo) UpdateDeniedTables(ctx context.Context, tabletType topodata
TabletType: tabletType,
Cells: cells,
DeniedTables: tables,
AllowReads: allowReads,
})
return nil
}

if tabletType == topodatapb.TabletType_PRIMARY {
if err := si.updatePrimaryTabletControl(tc, remove, tables); err != nil {
if err := si.updatePrimaryTabletControl(tc, remove, tables, allowReads); err != nil {
return err
}
return nil
Expand All @@ -448,7 +449,9 @@ func (si *ShardInfo) UpdateDeniedTables(ctx context.Context, tabletType topodata
return nil
}

func (si *ShardInfo) updatePrimaryTabletControl(tc *topodatapb.Shard_TabletControl, remove bool, tables []string) error {
func (si *ShardInfo) updatePrimaryTabletControl(tc *topodatapb.Shard_TabletControl, remove bool, tables []string, allowReads bool) error {
tc.AllowReads = allowReads

var newTables []string
for _, table := range tables {
exists := false
Expand Down
6 changes: 3 additions & 3 deletions go/vt/topo/shard_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -106,14 +106,14 @@ func lockedKeyspaceContext(keyspace string) context.Context {
}

func addToDenyList(ctx context.Context, si *ShardInfo, tabletType topodatapb.TabletType, cells, tables []string) error {
if err := si.UpdateDeniedTables(ctx, tabletType, cells, false, tables); err != nil {
if err := si.UpdateDeniedTables(ctx, tabletType, cells, false, tables, false); err != nil {
return err
}
return nil
}

func removeFromDenyList(ctx context.Context, si *ShardInfo, tabletType topodatapb.TabletType, cells, tables []string) error {
if err := si.UpdateDeniedTables(ctx, tabletType, cells, true, tables); err != nil {
if err := si.UpdateDeniedTables(ctx, tabletType, cells, true, tables, false); err != nil {
return err
}
return nil
Expand Down Expand Up @@ -285,7 +285,7 @@ func TestUpdateSourceDeniedTables(t *testing.T) {
}
var err error
if tcase.tables != nil || tcase.cells != nil {
err = si.UpdateDeniedTables(tcase.ctx, tcase.tabletType, tcase.cells, tcase.remove, tcase.tables)
err = si.UpdateDeniedTables(tcase.ctx, tcase.tabletType, tcase.cells, tcase.remove, tcase.tables, false)
}
if tcase.wantError != "" {
require.Error(t, err)
Expand Down
2 changes: 1 addition & 1 deletion go/vt/vtctl/grpcvtctldserver/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -3957,7 +3957,7 @@ func (s *VtctldServer) SetShardTabletControl(ctx context.Context, req *vtctldata
defer unlock(&err)

si, err := s.ts.UpdateShardFields(ctx, req.Keyspace, req.Shard, func(si *topo.ShardInfo) error {
return si.UpdateDeniedTables(ctx, req.TabletType, req.Cells, req.Remove, req.DeniedTables)
return si.UpdateDeniedTables(ctx, req.TabletType, req.Cells, req.Remove, req.DeniedTables, false)
})

switch {
Expand Down
7 changes: 5 additions & 2 deletions go/vt/vtctl/workflow/framework_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -946,7 +946,7 @@ func checkCellRouting(t *testing.T, ws *Server, cell string, want map[string][]s
require.EqualValues(t, got, want, "routing rules don't match for cell %s: got: %v, want: %v", cell, got, want)
}

func checkDenyList(t *testing.T, ts *topo.Server, keyspace, shard string, want []string) {
func checkDenyList(t *testing.T, ts *topo.Server, keyspace, shard string, wantDeniedTables []string, wantAllowReadsFromDeniedTables bool) {
t.Helper()
ctx := context.Background()
si, err := ts.GetShard(ctx, keyspace, shard)
Expand All @@ -956,7 +956,10 @@ func checkDenyList(t *testing.T, ts *topo.Server, keyspace, shard string, want [
if tc != nil {
got = tc.DeniedTables
}
require.EqualValues(t, got, want, "denied tables for %s/%s: got: %v, want: %v", keyspace, shard, got, want)
require.EqualValues(t, got, wantDeniedTables, "denied tables for %s/%s: got: %v, want: %v", keyspace, shard, got, wantDeniedTables)
if tc != nil {
require.Equal(t, tc.AllowReads, wantAllowReadsFromDeniedTables, "allow reads for %s/%s: got: %v, want: %v", keyspace, shard, tc.AllowReads, wantAllowReadsFromDeniedTables)
}
}

func checkServedTypes(t *testing.T, ts *topo.Server, keyspace, shard string, want int) {
Expand Down
64 changes: 55 additions & 9 deletions go/vt/vtctl/workflow/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -506,6 +506,34 @@ func (s *Server) getWorkflowState(ctx context.Context, targetKeyspace, workflowN
}
}
}
mirrorRules, err := topotools.GetMirrorRules(ctx, ts.TopoServer())
if err != nil {
return nil, nil, vterrors.Errorf(vtrpcpb.Code_INTERNAL, "failed to get mirror rules: %v", err)
}
for _, table := range ts.Tables() {
// If a rule for the primary tablet type exists (= no @primary
// qualifier) for any table and points to the target keyspace,
// then write traffic is mirrored.
fromTable := fmt.Sprintf("%s.%s", sourceKeyspace, table)
toTable := fmt.Sprintf("%s.%s", targetKeyspace, table)
if mr, ok := mirrorRules[fromTable]; ok {
if _, ok := mr[toTable]; ok {
state.WritesMirrored = true
}
}

// If a rule for either @replica or @rdonly tablet type exists for
// any table and points to the target keyspace, then read traffic is
// mirrored.
for _, tabletType := range []topodatapb.TabletType{topodatapb.TabletType_REPLICA, topodatapb.TabletType_RDONLY} {
fromTable = fmt.Sprintf("%s@%s", fromTable, topoproto.TabletTypeLString(tabletType))
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think we want to make a new variable here, this means that we'll end up with source.table@replica@rdonly doesn't it, since we concatenate in both loops?

if mr, ok := mirrorRules[fromTable]; ok {
if _, ok := mr[toTable]; ok {
state.ReadsMirrored = true
}
}
}
}
} else {
state.WorkflowType = TypeReshard

Expand Down Expand Up @@ -1333,7 +1361,7 @@ func setupInitialDeniedTables(ctx context.Context, ts *trafficSwitcher) error {
}
return ts.ForAllTargets(func(target *MigrationTarget) error {
if _, err := ts.TopoServer().UpdateShardFields(ctx, ts.TargetKeyspaceName(), target.GetShard().ShardName(), func(si *topo.ShardInfo) error {
return si.UpdateDeniedTables(ctx, topodatapb.TabletType_PRIMARY, nil, false, ts.Tables())
return si.UpdateDeniedTables(ctx, topodatapb.TabletType_PRIMARY, nil, false, ts.Tables(), false)
}); err != nil {
return err
}
Expand Down Expand Up @@ -2826,7 +2854,7 @@ func (s *Server) WorkflowSwitchTraffic(ctx context.Context, req *vtctldatapb.Wor
}

if switchPrimary {
if _, wrDryRunResults, err = s.switchWrites(ctx, req, ts, timeout, false); err != nil {
if _, wrDryRunResults, err = s.switchWrites(ctx, req, ts, startState, timeout, false); err != nil {
return nil, err
}
s.Logger().Infof("Switch Writes done for workflow %s.%s", req.Keyspace, req.Workflow)
Expand Down Expand Up @@ -3007,9 +3035,11 @@ func (s *Server) switchReads(ctx context.Context, req *vtctldatapb.WorkflowSwitc
}

// Remove mirror rules for the specified tablet types.
if err := sw.mirrorTableTraffic(ctx, roTabletTypes, 0); err != nil {
return defaultErrorHandler(ts.Logger(), fmt.Sprintf("failed to remove mirror rules from source keyspace %s to target keyspace %s, workflow %s, for read-only tablet types",
ts.SourceKeyspaceName(), ts.TargetKeyspaceName(), ts.WorkflowName()), err)
if state.ReadsMirrored {
if err := sw.mirrorTableTraffic(ctx, roTabletTypes, 0); err != nil {
return defaultErrorHandler(ts.Logger(), fmt.Sprintf("failed to remove mirror rules from source keyspace %s to target keyspace %s, workflow %s, for read-only tablet types",
ts.SourceKeyspaceName(), ts.TargetKeyspaceName(), ts.WorkflowName()), err)
}
}

if ts.MigrationType() == binlogdatapb.MigrationType_TABLES {
Expand Down Expand Up @@ -3052,7 +3082,7 @@ func (s *Server) switchReads(ctx context.Context, req *vtctldatapb.WorkflowSwitc
}

// switchWrites is a generic way of migrating write traffic for a workflow.
func (s *Server) switchWrites(ctx context.Context, req *vtctldatapb.WorkflowSwitchTrafficRequest, ts *trafficSwitcher, waitTimeout time.Duration,
func (s *Server) switchWrites(ctx context.Context, req *vtctldatapb.WorkflowSwitchTrafficRequest, ts *trafficSwitcher, state *State, waitTimeout time.Duration,
cancel bool,
) (journalID int64, dryRunResults *[]string, err error) {
var sw iswitcher
Expand Down Expand Up @@ -3127,9 +3157,11 @@ func (s *Server) switchWrites(ctx context.Context, req *vtctldatapb.WorkflowSwit
}

// Remove mirror rules for the primary tablet type.
if err := sw.mirrorTableTraffic(ctx, []topodatapb.TabletType{topodatapb.TabletType_PRIMARY}, 0); err != nil {
return handleError(fmt.Sprintf("failed to remove mirror rules from source keyspace %s to target keyspace %s, workflow %s, for primary tablet type",
ts.SourceKeyspaceName(), ts.TargetKeyspaceName(), ts.WorkflowName()), err)
if state.WritesMirrored {
if err := sw.mirrorTableTraffic(ctx, []topodatapb.TabletType{topodatapb.TabletType_PRIMARY}, 0); err != nil {
return handleError(fmt.Sprintf("failed to remove mirror rules from source keyspace %s to target keyspace %s, workflow %s, for primary tablet type",
ts.SourceKeyspaceName(), ts.TargetKeyspaceName(), ts.WorkflowName()), err)
}
}

// Find out if the target is using any sequence tables for auto_increment
Expand Down Expand Up @@ -3616,6 +3648,20 @@ func (s *Server) WorkflowMirrorTraffic(ctx context.Context, req *vtctldatapb.Wor
strings.Join(cannotSwitchTabletTypes, ","), startState.Workflow)
}

// Lock the workflow for mirror traffic.
lockName := fmt.Sprintf("%s/%s", req.Keyspace, req.Workflow)
ctx, workflowUnlock, lockErr := s.ts.LockName(ctx, lockName, "MirrorTraffic")
if lockErr != nil {
return nil, vterrors.Wrapf(lockErr, "failed to lock the %s workflow", lockName)
}
defer workflowUnlock(&err)

ctx, targetUnlock, lockErr := s.ts.LockKeyspace(ctx, req.Keyspace, "MirrorTraffic")
if lockErr != nil {
return nil, vterrors.Wrapf(lockErr, "failed to lock the %s keyspace", req.Keyspace)
}
defer targetUnlock(&err)

if err := s.mirrorTraffic(ctx, req, ts, startState); err != nil {
return nil, err
}
Expand Down
Loading
Loading