diff --git a/go/test/endtoend/vreplication/vreplication_test_env.go b/go/test/endtoend/vreplication/vreplication_test_env.go index 9c915b69ba4..e452a2a0c95 100644 --- a/go/test/endtoend/vreplication/vreplication_test_env.go +++ b/go/test/endtoend/vreplication/vreplication_test_env.go @@ -31,7 +31,6 @@ const ( var dryRunResultsSwitchWritesCustomerShard = []string{ "Lock keyspace " + defaultSourceKs, "Lock keyspace " + defaultTargetKs, - fmt.Sprintf("Mirroring 0.00 percent of traffic from keyspace %s to keyspace %s for tablet types [PRIMARY]", defaultSourceKs, defaultTargetKs), fmt.Sprintf("/Stop writes on keyspace %s for tables [Lead,Lead-1,blüb_tbl,customer,db_order_test,geom_tbl,json_tbl,loadtest,reftable,vdiff_order]: [keyspace:%s;shard:0;position:", defaultSourceKs, defaultSourceKs), "Wait for vreplication on stopped streams to catchup for up to 30s", "Create reverse vreplication workflow p2c_reverse", @@ -49,7 +48,6 @@ var dryRunResultsSwitchWritesCustomerShard = []string{ var dryRunResultsReadCustomerShard = []string{ "Lock keyspace " + defaultSourceKs, - fmt.Sprintf("Mirroring 0.00 percent of traffic from keyspace %s to keyspace %s for tablet types [RDONLY,REPLICA]", defaultSourceKs, defaultTargetKs), fmt.Sprintf("Switch reads for tables [Lead,Lead-1,blüb_tbl,customer,db_order_test,geom_tbl,json_tbl,loadtest,reftable,vdiff_order] to keyspace %s for tablet types [RDONLY,REPLICA]", defaultTargetKs), "Routing rules for tables [Lead,Lead-1,blüb_tbl,customer,db_order_test,geom_tbl,json_tbl,loadtest,reftable,vdiff_order] will be updated", fmt.Sprintf("Serving VSchema will be rebuilt for the %s keyspace", defaultTargetKs), diff --git a/go/vt/proto/topodata/topodata.pb.go b/go/vt/proto/topodata/topodata.pb.go index 035d91153e1..3c1710e6ee1 100644 --- a/go/vt/proto/topodata/topodata.pb.go +++ b/go/vt/proto/topodata/topodata.pb.go @@ -1575,7 +1575,9 @@ type Shard_TabletControl struct { DeniedTables []string `protobuf:"bytes,4,rep,name=denied_tables,json=deniedTables,proto3" json:"denied_tables,omitempty"` // frozen is set if we've started failing over traffic for // the primary. If set, this record should not be removed. - Frozen bool `protobuf:"varint,5,opt,name=frozen,proto3" json:"frozen,omitempty"` + Frozen bool `protobuf:"varint,5,opt,name=frozen,proto3" json:"frozen,omitempty"` + // Allow reads to the denied tables. + AllowReads bool `protobuf:"varint,6,opt,name=allow_reads,json=allowReads,proto3" json:"allow_reads,omitempty"` unknownFields protoimpl.UnknownFields sizeCache protoimpl.SizeCache } @@ -1638,6 +1640,13 @@ func (x *Shard_TabletControl) GetFrozen() bool { return false } +func (x *Shard_TabletControl) GetAllowReads() bool { + if x != nil { + return x.AllowReads + } + return false +} + // Node describes a tablet instance within the cell type ShardReplication_Node struct { state protoimpl.MessageState `protogen:"open.v1"` @@ -1823,7 +1832,7 @@ const file_topodata_proto_rawDesc = "" + "\x05value\x18\x02 \x01(\x05R\x05value:\x028\x01\x1a7\n" + "\tTagsEntry\x12\x10\n" + "\x03key\x18\x01 \x01(\tR\x03key\x12\x14\n" + - "\x05value\x18\x02 \x01(\tR\x05value:\x028\x01J\x04\b\x03\x10\x04J\x04\b\v\x10\fJ\x04\b\x0f\x10\x10\"\xef\x05\n" + + "\x05value\x18\x02 \x01(\tR\x05value:\x028\x01J\x04\b\x03\x10\x04J\x04\b\v\x10\fJ\x04\b\x0f\x10\x10\"\x90\x06\n" + "\x05Shard\x12:\n" + "\rprimary_alias\x18\x01 \x01(\v2\x15.topodata.TabletAliasR\fprimaryAlias\x12C\n" + "\x17primary_term_start_time\x18\b \x01(\v2\f.vttime.TimeR\x14primaryTermStartTime\x12/\n" + @@ -1838,13 +1847,15 @@ const file_topodata_proto_rawDesc = "" + "\bkeyspace\x18\x02 \x01(\tR\bkeyspace\x12\x14\n" + "\x05shard\x18\x03 \x01(\tR\x05shard\x12/\n" + "\tkey_range\x18\x04 \x01(\v2\x12.topodata.KeyRangeR\bkeyRange\x12\x16\n" + - "\x06tables\x18\x05 \x03(\tR\x06tables\x1a\x9f\x01\n" + + "\x06tables\x18\x05 \x03(\tR\x06tables\x1a\xc0\x01\n" + "\rTabletControl\x125\n" + "\vtablet_type\x18\x01 \x01(\x0e2\x14.topodata.TabletTypeR\n" + "tabletType\x12\x14\n" + "\x05cells\x18\x02 \x03(\tR\x05cells\x12#\n" + "\rdenied_tables\x18\x04 \x03(\tR\fdeniedTables\x12\x16\n" + - "\x06frozen\x18\x05 \x01(\bR\x06frozenJ\x04\b\x03\x10\x04J\x04\b\x03\x10\x04J\x04\b\x05\x10\x06\"\x88\x03\n" + + "\x06frozen\x18\x05 \x01(\bR\x06frozen\x12\x1f\n" + + "\vallow_reads\x18\x06 \x01(\bR\n" + + "allowReadsJ\x04\b\x03\x10\x04J\x04\b\x03\x10\x04J\x04\b\x05\x10\x06\"\x88\x03\n" + "\bKeyspace\x12;\n" + "\rkeyspace_type\x18\x05 \x01(\x0e2\x16.topodata.KeyspaceTypeR\fkeyspaceType\x12#\n" + "\rbase_keyspace\x18\x06 \x01(\tR\fbaseKeyspace\x121\n" + diff --git a/go/vt/proto/topodata/topodata_vtproto.pb.go b/go/vt/proto/topodata/topodata_vtproto.pb.go index 41c60bf1133..56daf822129 100644 --- a/go/vt/proto/topodata/topodata_vtproto.pb.go +++ b/go/vt/proto/topodata/topodata_vtproto.pb.go @@ -141,6 +141,7 @@ func (m *Shard_TabletControl) CloneVT() *Shard_TabletControl { r := new(Shard_TabletControl) r.TabletType = m.TabletType r.Frozen = m.Frozen + r.AllowReads = m.AllowReads if rhs := m.Cells; rhs != nil { tmpContainer := make([]string, len(rhs)) copy(tmpContainer, rhs) @@ -906,6 +907,16 @@ func (m *Shard_TabletControl) MarshalToSizedBufferVT(dAtA []byte) (int, error) { i -= len(m.unknownFields) copy(dAtA[i:], m.unknownFields) } + if m.AllowReads { + i-- + if m.AllowReads { + dAtA[i] = 1 + } else { + dAtA[i] = 0 + } + i-- + dAtA[i] = 0x30 + } if m.Frozen { i-- if m.Frozen { @@ -2129,6 +2140,9 @@ func (m *Shard_TabletControl) SizeVT() (n int) { if m.Frozen { n += 2 } + if m.AllowReads { + n += 2 + } n += len(m.unknownFields) return n } @@ -3732,6 +3746,26 @@ func (m *Shard_TabletControl) UnmarshalVT(dAtA []byte) error { } } m.Frozen = bool(v != 0) + case 6: + if wireType != 0 { + return fmt.Errorf("proto: wrong wireType = %d for field AllowReads", wireType) + } + var v int + for shift := uint(0); ; shift += 7 { + if shift >= 64 { + return protohelpers.ErrIntOverflow + } + if iNdEx >= l { + return io.ErrUnexpectedEOF + } + b := dAtA[iNdEx] + iNdEx++ + v |= int(b&0x7F) << shift + if b < 0x80 { + break + } + } + m.AllowReads = bool(v != 0) default: iNdEx = preIndex skippy, err := protohelpers.Skip(dAtA[iNdEx:]) diff --git a/go/vt/topo/shard.go b/go/vt/topo/shard.go index dc49ad8578b..540a56b51e9 100644 --- a/go/vt/topo/shard.go +++ b/go/vt/topo/shard.go @@ -402,7 +402,7 @@ func (si *ShardInfo) GetTabletControl(tabletType topodatapb.TabletType) *topodat // because it's not used in the same context (vertical vs horizontal sharding) // // This function should be called while holding the keyspace lock. -func (si *ShardInfo) UpdateDeniedTables(ctx context.Context, tabletType topodatapb.TabletType, cells []string, remove bool, tables []string) error { +func (si *ShardInfo) UpdateDeniedTables(ctx context.Context, tabletType topodatapb.TabletType, cells []string, remove bool, tables []string, allowReads bool) error { if err := CheckKeyspaceLocked(ctx, si.keyspace); err != nil { return err } @@ -424,12 +424,13 @@ func (si *ShardInfo) UpdateDeniedTables(ctx context.Context, tabletType topodata TabletType: tabletType, Cells: cells, DeniedTables: tables, + AllowReads: allowReads, }) return nil } if tabletType == topodatapb.TabletType_PRIMARY { - if err := si.updatePrimaryTabletControl(tc, remove, tables); err != nil { + if err := si.updatePrimaryTabletControl(tc, remove, tables, allowReads); err != nil { return err } return nil @@ -448,7 +449,9 @@ func (si *ShardInfo) UpdateDeniedTables(ctx context.Context, tabletType topodata return nil } -func (si *ShardInfo) updatePrimaryTabletControl(tc *topodatapb.Shard_TabletControl, remove bool, tables []string) error { +func (si *ShardInfo) updatePrimaryTabletControl(tc *topodatapb.Shard_TabletControl, remove bool, tables []string, allowReads bool) error { + tc.AllowReads = allowReads + var newTables []string for _, table := range tables { exists := false diff --git a/go/vt/topo/shard_test.go b/go/vt/topo/shard_test.go index 678b883a463..79fc1eae388 100644 --- a/go/vt/topo/shard_test.go +++ b/go/vt/topo/shard_test.go @@ -106,14 +106,14 @@ func lockedKeyspaceContext(keyspace string) context.Context { } func addToDenyList(ctx context.Context, si *ShardInfo, tabletType topodatapb.TabletType, cells, tables []string) error { - if err := si.UpdateDeniedTables(ctx, tabletType, cells, false, tables); err != nil { + if err := si.UpdateDeniedTables(ctx, tabletType, cells, false, tables, false); err != nil { return err } return nil } func removeFromDenyList(ctx context.Context, si *ShardInfo, tabletType topodatapb.TabletType, cells, tables []string) error { - if err := si.UpdateDeniedTables(ctx, tabletType, cells, true, tables); err != nil { + if err := si.UpdateDeniedTables(ctx, tabletType, cells, true, tables, false); err != nil { return err } return nil @@ -285,7 +285,7 @@ func TestUpdateSourceDeniedTables(t *testing.T) { } var err error if tcase.tables != nil || tcase.cells != nil { - err = si.UpdateDeniedTables(tcase.ctx, tcase.tabletType, tcase.cells, tcase.remove, tcase.tables) + err = si.UpdateDeniedTables(tcase.ctx, tcase.tabletType, tcase.cells, tcase.remove, tcase.tables, false) } if tcase.wantError != "" { require.Error(t, err) diff --git a/go/vt/vtctl/grpcvtctldserver/server.go b/go/vt/vtctl/grpcvtctldserver/server.go index 520d322ba67..e74804f47a3 100644 --- a/go/vt/vtctl/grpcvtctldserver/server.go +++ b/go/vt/vtctl/grpcvtctldserver/server.go @@ -3959,7 +3959,7 @@ func (s *VtctldServer) SetShardTabletControl(ctx context.Context, req *vtctldata defer unlock(&err) si, err := s.ts.UpdateShardFields(ctx, req.Keyspace, req.Shard, func(si *topo.ShardInfo) error { - return si.UpdateDeniedTables(ctx, req.TabletType, req.Cells, req.Remove, req.DeniedTables) + return si.UpdateDeniedTables(ctx, req.TabletType, req.Cells, req.Remove, req.DeniedTables, false) }) switch { diff --git a/go/vt/vtctl/workflow/framework_test.go b/go/vt/vtctl/workflow/framework_test.go index 7febad2359d..c6005837cef 100644 --- a/go/vt/vtctl/workflow/framework_test.go +++ b/go/vt/vtctl/workflow/framework_test.go @@ -946,7 +946,7 @@ func checkCellRouting(t *testing.T, ws *Server, cell string, want map[string][]s require.EqualValues(t, got, want, "routing rules don't match for cell %s: got: %v, want: %v", cell, got, want) } -func checkDenyList(t *testing.T, ts *topo.Server, keyspace, shard string, want []string) { +func checkDenyList(t *testing.T, ts *topo.Server, keyspace, shard string, wantDeniedTables []string, wantAllowReadsFromDeniedTables bool) { t.Helper() ctx := context.Background() si, err := ts.GetShard(ctx, keyspace, shard) @@ -956,7 +956,10 @@ func checkDenyList(t *testing.T, ts *topo.Server, keyspace, shard string, want [ if tc != nil { got = tc.DeniedTables } - require.EqualValues(t, got, want, "denied tables for %s/%s: got: %v, want: %v", keyspace, shard, got, want) + require.EqualValues(t, got, wantDeniedTables, "denied tables for %s/%s: got: %v, want: %v", keyspace, shard, got, wantDeniedTables) + if tc != nil { + require.Equal(t, tc.AllowReads, wantAllowReadsFromDeniedTables, "allow reads for %s/%s: got: %v, want: %v", keyspace, shard, tc.AllowReads, wantAllowReadsFromDeniedTables) + } } func checkServedTypes(t *testing.T, ts *topo.Server, keyspace, shard string, want int) { diff --git a/go/vt/vtctl/workflow/server.go b/go/vt/vtctl/workflow/server.go index 32f77b1c14b..3d9bc0c2e3a 100644 --- a/go/vt/vtctl/workflow/server.go +++ b/go/vt/vtctl/workflow/server.go @@ -506,6 +506,34 @@ func (s *Server) getWorkflowState(ctx context.Context, targetKeyspace, workflowN } } } + mirrorRules, err := topotools.GetMirrorRules(ctx, ts.TopoServer()) + if err != nil { + return nil, nil, vterrors.Errorf(vtrpcpb.Code_INTERNAL, "failed to get mirror rules: %v", err) + } + for _, table := range ts.Tables() { + // If a rule for the primary tablet type exists (= no @primary + // qualifier) for any table and points to the target keyspace, + // then write traffic is mirrored. + fromTable := fmt.Sprintf("%s.%s", sourceKeyspace, table) + toTable := fmt.Sprintf("%s.%s", targetKeyspace, table) + if mr, ok := mirrorRules[fromTable]; ok { + if _, ok := mr[toTable]; ok { + state.WritesMirrored = true + } + } + + // If a rule for either @replica or @rdonly tablet type exists for + // any table and points to the target keyspace, then read traffic is + // mirrored. + for _, tabletType := range []topodatapb.TabletType{topodatapb.TabletType_REPLICA, topodatapb.TabletType_RDONLY} { + fromTable = fmt.Sprintf("%s@%s", fromTable, topoproto.TabletTypeLString(tabletType)) + if mr, ok := mirrorRules[fromTable]; ok { + if _, ok := mr[toTable]; ok { + state.ReadsMirrored = true + } + } + } + } } else { state.WorkflowType = TypeReshard @@ -1338,7 +1366,7 @@ func setupInitialDeniedTables(ctx context.Context, ts *trafficSwitcher) error { } return ts.ForAllTargets(func(target *MigrationTarget) error { if _, err := ts.TopoServer().UpdateShardFields(ctx, ts.TargetKeyspaceName(), target.GetShard().ShardName(), func(si *topo.ShardInfo) error { - return si.UpdateDeniedTables(ctx, topodatapb.TabletType_PRIMARY, nil, false, ts.Tables()) + return si.UpdateDeniedTables(ctx, topodatapb.TabletType_PRIMARY, nil, false, ts.Tables(), false) }); err != nil { return err } @@ -2831,7 +2859,7 @@ func (s *Server) WorkflowSwitchTraffic(ctx context.Context, req *vtctldatapb.Wor } if switchPrimary { - if _, wrDryRunResults, err = s.switchWrites(ctx, req, ts, timeout, false); err != nil { + if _, wrDryRunResults, err = s.switchWrites(ctx, req, ts, startState, timeout, false); err != nil { return nil, err } s.Logger().Infof("Switch Writes done for workflow %s.%s", req.Keyspace, req.Workflow) @@ -3012,9 +3040,11 @@ func (s *Server) switchReads(ctx context.Context, req *vtctldatapb.WorkflowSwitc } // Remove mirror rules for the specified tablet types. - if err := sw.mirrorTableTraffic(ctx, roTabletTypes, 0); err != nil { - return defaultErrorHandler(ts.Logger(), fmt.Sprintf("failed to remove mirror rules from source keyspace %s to target keyspace %s, workflow %s, for read-only tablet types", - ts.SourceKeyspaceName(), ts.TargetKeyspaceName(), ts.WorkflowName()), err) + if state.ReadsMirrored { + if err := sw.mirrorTableTraffic(ctx, roTabletTypes, 0); err != nil { + return defaultErrorHandler(ts.Logger(), fmt.Sprintf("failed to remove mirror rules from source keyspace %s to target keyspace %s, workflow %s, for read-only tablet types", + ts.SourceKeyspaceName(), ts.TargetKeyspaceName(), ts.WorkflowName()), err) + } } if ts.MigrationType() == binlogdatapb.MigrationType_TABLES { @@ -3057,7 +3087,7 @@ func (s *Server) switchReads(ctx context.Context, req *vtctldatapb.WorkflowSwitc } // switchWrites is a generic way of migrating write traffic for a workflow. -func (s *Server) switchWrites(ctx context.Context, req *vtctldatapb.WorkflowSwitchTrafficRequest, ts *trafficSwitcher, waitTimeout time.Duration, +func (s *Server) switchWrites(ctx context.Context, req *vtctldatapb.WorkflowSwitchTrafficRequest, ts *trafficSwitcher, state *State, waitTimeout time.Duration, cancel bool, ) (journalID int64, dryRunResults *[]string, err error) { var sw iswitcher @@ -3132,9 +3162,11 @@ func (s *Server) switchWrites(ctx context.Context, req *vtctldatapb.WorkflowSwit } // Remove mirror rules for the primary tablet type. - if err := sw.mirrorTableTraffic(ctx, []topodatapb.TabletType{topodatapb.TabletType_PRIMARY}, 0); err != nil { - return handleError(fmt.Sprintf("failed to remove mirror rules from source keyspace %s to target keyspace %s, workflow %s, for primary tablet type", - ts.SourceKeyspaceName(), ts.TargetKeyspaceName(), ts.WorkflowName()), err) + if state.WritesMirrored { + if err := sw.mirrorTableTraffic(ctx, []topodatapb.TabletType{topodatapb.TabletType_PRIMARY}, 0); err != nil { + return handleError(fmt.Sprintf("failed to remove mirror rules from source keyspace %s to target keyspace %s, workflow %s, for primary tablet type", + ts.SourceKeyspaceName(), ts.TargetKeyspaceName(), ts.WorkflowName()), err) + } } // Find out if the target is using any sequence tables for auto_increment @@ -3621,6 +3653,20 @@ func (s *Server) WorkflowMirrorTraffic(ctx context.Context, req *vtctldatapb.Wor strings.Join(cannotSwitchTabletTypes, ","), startState.Workflow) } + // Lock the workflow for mirror traffic. + lockName := fmt.Sprintf("%s/%s", req.Keyspace, req.Workflow) + ctx, workflowUnlock, lockErr := s.ts.LockName(ctx, lockName, "MirrorTraffic") + if lockErr != nil { + return nil, vterrors.Wrapf(lockErr, "failed to lock the %s workflow", lockName) + } + defer workflowUnlock(&err) + + ctx, targetUnlock, lockErr := s.ts.LockKeyspace(ctx, req.Keyspace, "MirrorTraffic") + if lockErr != nil { + return nil, vterrors.Wrapf(lockErr, "failed to lock the %s keyspace", req.Keyspace) + } + defer targetUnlock(&err) + if err := s.mirrorTraffic(ctx, req, ts, startState); err != nil { return nil, err } diff --git a/go/vt/vtctl/workflow/server_test.go b/go/vt/vtctl/workflow/server_test.go index 360c6a97360..c2653d73e81 100644 --- a/go/vt/vtctl/workflow/server_test.go +++ b/go/vt/vtctl/workflow/server_test.go @@ -482,7 +482,7 @@ func TestMoveTablesComplete(t *testing.T) { // DeniedTables live. for _, keyspace := range []*testKeyspace{tc.sourceKeyspace, tc.targetKeyspace} { for _, shardName := range keyspace.ShardNames { - checkDenyList(t, env.ts, keyspace.KeyspaceName, shardName, nil) + checkDenyList(t, env.ts, keyspace.KeyspaceName, shardName, nil, false) } } } @@ -1043,7 +1043,7 @@ func TestWorkflowDelete(t *testing.T) { _, err := env.ts.UpdateShardFields(lockCtx, targetKeyspaceName, shard, func(si *topo.ShardInfo) error { // So t1_2 and t1_3 do not exist in the denied table list when we go // to remove t1, t1_2, and t1_3. - err := si.UpdateDeniedTables(lockCtx, topodatapb.TabletType_PRIMARY, nil, false, []string{table1Name, "t2", "t3"}) + err := si.UpdateDeniedTables(lockCtx, topodatapb.TabletType_PRIMARY, nil, false, []string{table1Name, "t2", "t3"}, false) return err }) require.NoError(t, err) @@ -1176,7 +1176,7 @@ func TestWorkflowDelete(t *testing.T) { // DeniedTables live. for _, keyspace := range []*testKeyspace{tc.sourceKeyspace, tc.targetKeyspace} { for _, shardName := range keyspace.ShardNames { - checkDenyList(t, env.ts, keyspace.KeyspaceName, shardName, nil) + checkDenyList(t, env.ts, keyspace.KeyspaceName, shardName, nil, false) } } } @@ -1681,13 +1681,11 @@ func TestMoveTablesTrafficSwitchingDryRun(t *testing.T) { }, want: []string{ "Lock keyspace " + sourceKeyspaceName, - fmt.Sprintf("Mirroring 0.00 percent of traffic from keyspace %s to keyspace %s for tablet types [REPLICA,RDONLY]", sourceKeyspaceName, targetKeyspaceName), fmt.Sprintf("Switch reads for tables [%s] to keyspace %s for tablet types [REPLICA,RDONLY]", tablesStr, targetKeyspaceName), fmt.Sprintf("Routing rules for tables [%s] will be updated", tablesStr), "Unlock keyspace " + sourceKeyspaceName, "Lock keyspace " + sourceKeyspaceName, "Lock keyspace " + targetKeyspaceName, - fmt.Sprintf("Mirroring 0.00 percent of traffic from keyspace %s to keyspace %s for tablet types [PRIMARY]", sourceKeyspaceName, targetKeyspaceName), fmt.Sprintf("Stop writes on keyspace %s for tables [%s]: [keyspace:%s;shard:-80;position:%s,keyspace:%s;shard:80-;position:%s]", sourceKeyspaceName, tablesStr, sourceKeyspaceName, position, sourceKeyspaceName, position), "Wait for vreplication on stopped streams to catchup for up to 30s", @@ -1722,13 +1720,11 @@ func TestMoveTablesTrafficSwitchingDryRun(t *testing.T) { }, want: []string{ "Lock keyspace " + targetKeyspaceName, - fmt.Sprintf("Mirroring 0.00 percent of traffic from keyspace %s to keyspace %s for tablet types [REPLICA,RDONLY]", targetKeyspaceName, sourceKeyspaceName), fmt.Sprintf("Switch reads for tables [%s] to keyspace %s for tablet types [REPLICA,RDONLY]", tablesStr, sourceKeyspaceName), fmt.Sprintf("Routing rules for tables [%s] will be updated", tablesStr), "Unlock keyspace " + targetKeyspaceName, "Lock keyspace " + targetKeyspaceName, "Lock keyspace " + sourceKeyspaceName, - fmt.Sprintf("Mirroring 0.00 percent of traffic from keyspace %s to keyspace %s for tablet types [PRIMARY]", targetKeyspaceName, sourceKeyspaceName), fmt.Sprintf("Stop writes on keyspace %s for tables [%s]: [keyspace:%s;shard:-80;position:%s,keyspace:%s;shard:80-;position:%s]", targetKeyspaceName, tablesStr, targetKeyspaceName, position, targetKeyspaceName, position), "Wait for vreplication on stopped streams to catchup for up to 30s", @@ -1763,7 +1759,6 @@ func TestMoveTablesTrafficSwitchingDryRun(t *testing.T) { }, want: []string{ "Lock keyspace " + sourceKeyspaceName, - fmt.Sprintf("Mirroring 0.00 percent of traffic from keyspace %s to keyspace %s for tablet types [REPLICA,RDONLY]", sourceKeyspaceName, targetKeyspaceName), fmt.Sprintf("Switch reads for tables [%s] to keyspace %s for tablet types [REPLICA,RDONLY]", tablesStr, sourceKeyspaceName), fmt.Sprintf("Routing rules for tables [%s] will be updated", tablesStr), fmt.Sprintf("Serving VSchema will be rebuilt for the %s keyspace", sourceKeyspaceName), @@ -1857,8 +1852,13 @@ func TestMirrorTraffic(t *testing.T) { targetKeyspace string targetShards []string - wantErr string - wantMirrorRules map[string]map[string]float32 + wantMirrorTrafficErr string + wantMirrorRules map[string]map[string]float32 + wantDeniedTables []string + wantAllowReadsFromDeniedTables bool + wantWorkflowStateErr string + wantReadsMirrored bool + wantWritesMirrored bool }{ { name: "no such keyspace", @@ -1868,8 +1868,8 @@ func TestMirrorTraffic(t *testing.T) { TabletTypes: tabletTypes, Percent: 50.0, }, - wantErr: "FindAllShardsInKeyspace(no_ks): List: node doesn't exist: keyspaces/no_ks/shards", - wantMirrorRules: make(map[string]map[string]float32), + wantMirrorTrafficErr: "FindAllShardsInKeyspace(no_ks): List: node doesn't exist: keyspaces/no_ks/shards", + wantMirrorRules: make(map[string]map[string]float32), }, { name: "no such workflow", @@ -1888,8 +1888,9 @@ func TestMirrorTraffic(t *testing.T) { return nil, nil } }, - wantErr: "no streams found in keyspace target for no_workflow", - wantMirrorRules: make(map[string]map[string]float32), + wantMirrorTrafficErr: "no streams found in keyspace target for no_workflow", + wantMirrorRules: make(map[string]map[string]float32), + wantWorkflowStateErr: "no streams found in keyspace target", }, { name: "cannot mirror traffic for migrate workflows", @@ -1902,8 +1903,8 @@ func TestMirrorTraffic(t *testing.T) { setup: func(t *testing.T, ctx context.Context, te *testMaterializerEnv) { te.tmc.readVReplicationWorkflow = createReadVReplicationWorkflowFunc(t, binlogdatapb.VReplicationWorkflowType_Migrate, nil, te.tmc.keyspace, sourceShards, []string{table1, table2}) }, - wantErr: "invalid action for Migrate workflow: MirrorTraffic", - wantMirrorRules: make(map[string]map[string]float32), + wantMirrorTrafficErr: "invalid action for Migrate workflow: MirrorTraffic", + wantMirrorRules: make(map[string]map[string]float32), }, { name: "cannot mirror traffic for reshard workflows", @@ -1920,8 +1921,8 @@ func TestMirrorTraffic(t *testing.T) { setup: func(t *testing.T, ctx context.Context, te *testMaterializerEnv) { te.tmc.readVReplicationWorkflow = createReadVReplicationWorkflowFunc(t, binlogdatapb.VReplicationWorkflowType_Reshard, nil, sourceKs, []string{"-80", "80-"}, []string{table1, table2}) }, - wantErr: "invalid action for Reshard workflow: MirrorTraffic", - wantMirrorRules: make(map[string]map[string]float32), + wantMirrorTrafficErr: "invalid action for Reshard workflow: MirrorTraffic", + wantMirrorRules: make(map[string]map[string]float32), }, { name: "cannot mirror rdonly traffic after switch rdonly traffic", @@ -1935,8 +1936,8 @@ func TestMirrorTraffic(t *testing.T) { fmt.Sprintf("%s.%s@rdonly", sourceKs, table1): {fmt.Sprintf("%s.%s@rdonly", targetKs, table1)}, fmt.Sprintf("%s.%s@rdonly", sourceKs, table2): {fmt.Sprintf("%s.%s@rdonly", targetKs, table2)}, }, - wantErr: "cannot mirror [rdonly] traffic for workflow src2target at this time: traffic for those tablet types is switched", - wantMirrorRules: make(map[string]map[string]float32), + wantMirrorTrafficErr: "cannot mirror [rdonly] traffic for workflow src2target at this time: traffic for those tablet types is switched", + wantMirrorRules: make(map[string]map[string]float32), }, { name: "cannot mirror replica traffic after switch replica traffic", @@ -1950,8 +1951,8 @@ func TestMirrorTraffic(t *testing.T) { fmt.Sprintf("%s.%s@replica", sourceKs, table1): {fmt.Sprintf("%s.%s@replica", targetKs, table1)}, fmt.Sprintf("%s.%s@replica", sourceKs, table2): {fmt.Sprintf("%s.%s@replica", targetKs, table2)}, }, - wantErr: "cannot mirror [replica] traffic for workflow src2target at this time: traffic for those tablet types is switched", - wantMirrorRules: make(map[string]map[string]float32), + wantMirrorTrafficErr: "cannot mirror [replica] traffic for workflow src2target at this time: traffic for those tablet types is switched", + wantMirrorRules: make(map[string]map[string]float32), }, { name: "cannot mirror write traffic after switch traffic", @@ -1965,8 +1966,8 @@ func TestMirrorTraffic(t *testing.T) { fmt.Sprintf("%s.%s", sourceKs, table1): {fmt.Sprintf("%s.%s", targetKs, table1)}, fmt.Sprintf("%s.%s", sourceKs, table2): {fmt.Sprintf("%s.%s", targetKs, table2)}, }, - wantErr: "cannot mirror [primary] traffic for workflow src2target at this time: traffic for those tablet types is switched", - wantMirrorRules: make(map[string]map[string]float32), + wantMirrorTrafficErr: "cannot mirror [primary] traffic for workflow src2target at this time: traffic for those tablet types is switched", + wantMirrorRules: make(map[string]map[string]float32), }, { name: "does not mirror traffic for partial move tables", @@ -2006,10 +2007,10 @@ func TestMirrorTraffic(t *testing.T) { }, nil } }, - sourceShards: []string{"-80", "80-"}, - targetShards: []string{"-80", "80-"}, - wantErr: "invalid action for partial migration: MirrorTraffic", - wantMirrorRules: make(map[string]map[string]float32), + sourceShards: []string{"-80", "80-"}, + targetShards: []string{"-80", "80-"}, + wantMirrorTrafficErr: "invalid action for partial migration: MirrorTraffic", + wantMirrorRules: make(map[string]map[string]float32), }, { name: "does not mirror traffic for multi-tenant move tables", @@ -2022,8 +2023,8 @@ func TestMirrorTraffic(t *testing.T) { setup: func(t *testing.T, ctx context.Context, te *testMaterializerEnv) { te.tmc.readVReplicationWorkflow = createReadVReplicationWorkflowFunc(t, binlogdatapb.VReplicationWorkflowType_MoveTables, &vtctldatapb.WorkflowOptions{TenantId: "123"}, te.tmc.keyspace, sourceShards, []string{table1, table2}) }, - wantErr: "invalid action for multi-tenant migration: MirrorTraffic", - wantMirrorRules: make(map[string]map[string]float32), + wantMirrorTrafficErr: "invalid action for multi-tenant migration: MirrorTraffic", + wantMirrorRules: make(map[string]map[string]float32), }, { name: "does not mirror traffic for reverse move tables", @@ -2033,8 +2034,8 @@ func TestMirrorTraffic(t *testing.T) { TabletTypes: tabletTypes, Percent: 50.0, }, - wantErr: "invalid action for reverse workflow: MirrorTraffic", - wantMirrorRules: make(map[string]map[string]float32), + wantMirrorTrafficErr: "invalid action for reverse workflow: MirrorTraffic", + wantMirrorRules: make(map[string]map[string]float32), }, { name: "ok", @@ -2065,6 +2066,54 @@ func TestMirrorTraffic(t *testing.T) { fmt.Sprintf("%s.%s", targetKs, table2): 50.0, }, }, + wantDeniedTables: []string{table1, table2}, + wantAllowReadsFromDeniedTables: true, + wantReadsMirrored: true, + wantWritesMirrored: true, + }, + { + name: "ok @primary tablet type", + req: &vtctldatapb.WorkflowMirrorTrafficRequest{ + Keyspace: targetKs, + Workflow: workflow, + TabletTypes: []topodatapb.TabletType{topodatapb.TabletType_PRIMARY}, + Percent: 50.0, + }, + routingRules: initialRoutingRules, + wantMirrorRules: map[string]map[string]float32{ + fmt.Sprintf("%s.%s", sourceKs, table1): { + fmt.Sprintf("%s.%s", targetKs, table1): 50.0, + }, + fmt.Sprintf("%s.%s", sourceKs, table2): { + fmt.Sprintf("%s.%s", targetKs, table2): 50.0, + }, + }, + wantDeniedTables: []string{table1, table2}, + wantAllowReadsFromDeniedTables: true, + wantReadsMirrored: false, + wantWritesMirrored: true, + }, + { + name: "ok @replica tablet type", + req: &vtctldatapb.WorkflowMirrorTrafficRequest{ + Keyspace: targetKs, + Workflow: workflow, + TabletTypes: []topodatapb.TabletType{topodatapb.TabletType_REPLICA}, + Percent: 50.0, + }, + routingRules: initialRoutingRules, + wantMirrorRules: map[string]map[string]float32{ + fmt.Sprintf("%s.%s@replica", sourceKs, table1): { + fmt.Sprintf("%s.%s", targetKs, table1): 50.0, + }, + fmt.Sprintf("%s.%s@replica", sourceKs, table2): { + fmt.Sprintf("%s.%s", targetKs, table2): 50.0, + }, + }, + wantDeniedTables: []string{table1, table2}, + wantAllowReadsFromDeniedTables: true, + wantReadsMirrored: true, + wantWritesMirrored: false, }, { name: "does not overwrite unrelated mirror rules", @@ -2103,6 +2152,10 @@ func TestMirrorTraffic(t *testing.T) { targetKs + ".table2": 25.0, }, }, + wantDeniedTables: []string{table1, table2}, + wantAllowReadsFromDeniedTables: true, + wantReadsMirrored: true, + wantWritesMirrored: true, }, { name: "does not overwrite when some but not all mirror rules already exist", @@ -2124,7 +2177,7 @@ func TestMirrorTraffic(t *testing.T) { TabletTypes: tabletTypes, Percent: 50.0, }, - wantErr: "wrong number of pre-existing mirror rules", + wantMirrorTrafficErr: "wrong number of pre-existing mirror rules", wantMirrorRules: map[string]map[string]float32{ fmt.Sprintf("%s.%s", sourceKs, table1): { fmt.Sprintf("%s.%s", targetKs, table1): 25.0, @@ -2136,6 +2189,8 @@ func TestMirrorTraffic(t *testing.T) { fmt.Sprintf("%s.%s", targetKs, table1): 25.0, }, }, + wantReadsMirrored: true, // due to pre-existing mirror rules + wantWritesMirrored: true, // due to pre-existing mirror rules }, } @@ -2185,12 +2240,13 @@ func TestMirrorTraffic(t *testing.T) { } got, err := te.ws.WorkflowMirrorTraffic(ctx, tt.req) - if tt.wantErr != "" { - require.EqualError(t, err, tt.wantErr) + if tt.wantMirrorTrafficErr != "" { + require.EqualError(t, err, tt.wantMirrorTrafficErr) } else { require.NoError(t, err) require.NotNil(t, got) } + mr, err := topotools.GetMirrorRules(ctx, te.topoServ) require.NoError(t, err) wantMirrorRules := tt.mirrorRules @@ -2198,6 +2254,19 @@ func TestMirrorTraffic(t *testing.T) { wantMirrorRules = tt.wantMirrorRules } require.Equal(t, wantMirrorRules, mr) + + for _, shard := range tt.targetShards { + checkDenyList(t, te.topoServ, tt.targetKeyspace, shard, tt.wantDeniedTables, tt.wantAllowReadsFromDeniedTables) + } + + _, ws, err := te.ws.getWorkflowState(ctx, tt.targetKeyspace, workflow) + if tt.wantWorkflowStateErr != "" { + require.ErrorContains(t, err, tt.wantWorkflowStateErr) + } else { + require.NoError(t, err) + require.Equal(t, tt.wantReadsMirrored, ws.ReadsMirrored) + require.Equal(t, tt.wantWritesMirrored, ws.WritesMirrored) + } }) } } diff --git a/go/vt/vtctl/workflow/state.go b/go/vt/vtctl/workflow/state.go index 56513da9b19..df06b5613d8 100644 --- a/go/vt/vtctl/workflow/state.go +++ b/go/vt/vtctl/workflow/state.go @@ -74,6 +74,9 @@ type State struct { IsPartialMigration bool ShardsAlreadySwitched []string ShardsNotYetSwitched []string + + ReadsMirrored bool + WritesMirrored bool } func (s *State) String() string { diff --git a/go/vt/vtctl/workflow/traffic_switcher.go b/go/vt/vtctl/workflow/traffic_switcher.go index c518cda36f4..a00853051f9 100644 --- a/go/vt/vtctl/workflow/traffic_switcher.go +++ b/go/vt/vtctl/workflow/traffic_switcher.go @@ -469,7 +469,7 @@ func (ts *trafficSwitcher) deleteKeyspaceRoutingRules(ctx context.Context) error func (ts *trafficSwitcher) dropSourceDeniedTables(ctx context.Context) error { return ts.ForAllSources(func(source *MigrationSource) error { if _, err := ts.TopoServer().UpdateShardFields(ctx, ts.SourceKeyspaceName(), source.GetShard().ShardName(), func(si *topo.ShardInfo) error { - return si.UpdateDeniedTables(ctx, topodatapb.TabletType_PRIMARY, nil, true, ts.Tables()) + return si.UpdateDeniedTables(ctx, topodatapb.TabletType_PRIMARY, nil, true, ts.Tables(), false) }); err != nil { return err } @@ -493,7 +493,7 @@ func (ts *trafficSwitcher) dropSourceDeniedTables(ctx context.Context) error { func (ts *trafficSwitcher) dropTargetDeniedTables(ctx context.Context) error { return ts.ForAllTargets(func(target *MigrationTarget) error { if _, err := ts.TopoServer().UpdateShardFields(ctx, ts.TargetKeyspaceName(), target.GetShard().ShardName(), func(si *topo.ShardInfo) error { - return si.UpdateDeniedTables(ctx, topodatapb.TabletType_PRIMARY, nil, true, ts.Tables()) + return si.UpdateDeniedTables(ctx, topodatapb.TabletType_PRIMARY, nil, true, ts.Tables(), false) }); err != nil { return err } @@ -1088,7 +1088,7 @@ func (ts *trafficSwitcher) switchDeniedTables(ctx context.Context, backward bool egrp.Go(func() error { return ts.ForAllSources(func(source *MigrationSource) error { if _, err := ts.TopoServer().UpdateShardFields(ctx, ts.SourceKeyspaceName(), source.GetShard().ShardName(), func(si *topo.ShardInfo) error { - return si.UpdateDeniedTables(ectx, topodatapb.TabletType_PRIMARY, nil, rmsource, ts.Tables()) + return si.UpdateDeniedTables(ectx, topodatapb.TabletType_PRIMARY, nil, rmsource, ts.Tables(), false) }); err != nil { return err } @@ -1109,27 +1109,7 @@ func (ts *trafficSwitcher) switchDeniedTables(ctx context.Context, backward bool }) }) egrp.Go(func() error { - return ts.ForAllTargets(func(target *MigrationTarget) error { - if _, err := ts.TopoServer().UpdateShardFields(ectx, ts.TargetKeyspaceName(), target.GetShard().ShardName(), func(si *topo.ShardInfo) error { - return si.UpdateDeniedTables(ctx, topodatapb.TabletType_PRIMARY, nil, rmtarget, ts.Tables()) - }); err != nil { - return err - } - rtbsCtx, cancel := context.WithTimeout(ectx, shardTabletRefreshTimeout) - defer cancel() - isPartial, partialDetails, err := topotools.RefreshTabletsByShard(rtbsCtx, ts.TopoServer(), ts.TabletManagerClient(), target.GetShard(), nil, ts.Logger()) - if isPartial { - msg := fmt.Sprintf("failed to successfully refresh all tablets in the %s/%s target shard (%v):\n %v", - target.GetShard().Keyspace(), target.GetShard().ShardName(), err, partialDetails) - if ts.force { - log.Warning(msg) - return nil - } else { - return errors.New(msg) - } - } - return err - }) + return ts.setTargetDeniedTables(ectx, rmtarget, false /*readable*/) }) if err := egrp.Wait(); err != nil { ts.Logger().Warningf("Error in switchDeniedTables: %s", err) @@ -1548,7 +1528,7 @@ func (ts *trafficSwitcher) mirrorTableTraffic(ctx context.Context, types []topod } var numExisting int - for _, table := range ts.tables { + for _, table := range ts.Tables() { for _, tabletType := range types { fromTable := fmt.Sprintf("%s.%s", ts.SourceKeyspaceName(), table) if tabletType != topodatapb.TabletType_PRIMARY { @@ -1575,13 +1555,44 @@ func (ts *trafficSwitcher) mirrorTableTraffic(ctx context.Context, types []topod } } - if numExisting > 0 && numExisting != (len(types)*len(ts.tables)) { + if numExisting > 0 && numExisting != (len(types)*len(ts.Tables())) { return vterrors.Errorf(vtrpcpb.Code_ALREADY_EXISTS, "wrong number of pre-existing mirror rules") } if err := topotools.SaveMirrorRules(ctx, ts.TopoServer(), mrs); err != nil { - return err + return vterrors.Errorf(vtrpcpb.Code_INTERNAL, "failed to save mirror rules: %v", err) + } + + // Allow read-only queries to execute against the target tablet if there + // are any mirror rules. Otherwise, disallow them. + allowReads := len(mrs) > 0 + if err := ts.setTargetDeniedTables(ctx, false /*remove*/, allowReads); err != nil { + return vterrors.Errorf(vtrpcpb.Code_INTERNAL, "failed to make target denied tables readable=%t: %v", allowReads, err) } return ts.TopoServer().RebuildSrvVSchema(ctx, nil) } + +func (ts *trafficSwitcher) setTargetDeniedTables(ctx context.Context, remove, allowReads bool) error { + return ts.ForAllTargets(func(target *MigrationTarget) error { + if _, err := ts.TopoServer().UpdateShardFields(ctx, ts.TargetKeyspaceName(), target.GetShard().ShardName(), func(si *topo.ShardInfo) error { + return si.UpdateDeniedTables(ctx, topodatapb.TabletType_PRIMARY, nil, remove, ts.Tables(), allowReads) + }); err != nil { + return err + } + rtbsCtx, cancel := context.WithTimeout(ctx, shardTabletRefreshTimeout) + defer cancel() + isPartial, partialDetails, err := topotools.RefreshTabletsByShard(rtbsCtx, ts.TopoServer(), ts.TabletManagerClient(), target.GetShard(), nil, ts.Logger()) + if isPartial { + msg := fmt.Sprintf("failed to successfully refresh all tablets in the %s/%s target shard (%v):\n %v", + target.GetShard().Keyspace(), target.GetShard().ShardName(), err, partialDetails) + if ts.force { + log.Warning(msg) + return nil + } else { + return errors.New(msg) + } + } + return err + }) +} diff --git a/go/vt/vttablet/tabletmanager/tm_state.go b/go/vt/vttablet/tabletmanager/tm_state.go index af7468aba97..221b094455a 100644 --- a/go/vt/vttablet/tabletmanager/tm_state.go +++ b/go/vt/vttablet/tabletmanager/tm_state.go @@ -40,6 +40,7 @@ import ( "vitess.io/vitess/go/vt/utils" "vitess.io/vitess/go/vt/vterrors" "vitess.io/vitess/go/vt/vttablet/tabletserver" + "vitess.io/vitess/go/vt/vttablet/tabletserver/planbuilder" "vitess.io/vitess/go/vt/vttablet/tabletserver/rules" topodatapb "vitess.io/vitess/go/vt/proto/topodata" @@ -85,6 +86,10 @@ type tmState struct { // displayState contains the current snapshot of the internal state // and has its own mutex. displayState displayState + + // allowReadsFromDeniedTables allows readonly operations to execute against + // denied tables. + allowReadsFromDeniedTables map[topodatapb.TabletType]bool } func newTMState(tm *TabletManager, tablet *topodatapb.Tablet) *tmState { @@ -149,9 +154,11 @@ func (ts *tmState) RefreshFromTopoInfo(ctx context.Context, shardInfo *topo.Shar ts.isResharding = len(shardInfo.SourceShards) > 0 ts.deniedTables = make(map[topodatapb.TabletType][]string) + ts.allowReadsFromDeniedTables = make(map[topodatapb.TabletType]bool) for _, tc := range shardInfo.TabletControls { if topo.InCellList(ts.tm.tabletAlias.Cell, tc.Cells) { ts.deniedTables[tc.TabletType] = tc.DeniedTables + ts.allowReadsFromDeniedTables[tc.TabletType] = tc.AllowReads } } } @@ -405,6 +412,41 @@ func (ts *tmState) applyDenyList(ctx context.Context) (err error) { for _, t := range tables { qr.AddTableCond(t) } + // This pathway exists in order to allow traffic to pass to the + // target of a MoveTables workflow after using MirrorTraffic. + if ts.allowReadsFromDeniedTables[ts.tablet.Type] { + // If a plan does not match any of the types below, it will be + // allowed to execute in spite of the table conditions above. + // + // The only plan supported by traffic mirror at present is + // PlanSelect, so add all other plans below. + qr.AddPlanCond(planbuilder.PlanNextval) + qr.AddPlanCond(planbuilder.PlanInsert) + qr.AddPlanCond(planbuilder.PlanInsertMessage) + qr.AddPlanCond(planbuilder.PlanUpdate) + qr.AddPlanCond(planbuilder.PlanUpdateLimit) + qr.AddPlanCond(planbuilder.PlanDelete) + qr.AddPlanCond(planbuilder.PlanDeleteLimit) + qr.AddPlanCond(planbuilder.PlanDDL) + qr.AddPlanCond(planbuilder.PlanSet) + qr.AddPlanCond(planbuilder.PlanOtherRead) + qr.AddPlanCond(planbuilder.PlanOtherAdmin) + qr.AddPlanCond(planbuilder.PlanMessageStream) + qr.AddPlanCond(planbuilder.PlanSavepoint) + qr.AddPlanCond(planbuilder.PlanRelease) + qr.AddPlanCond(planbuilder.PlanSRollback) + qr.AddPlanCond(planbuilder.PlanShow) + qr.AddPlanCond(planbuilder.PlanLoad) + qr.AddPlanCond(planbuilder.PlanFlush) + qr.AddPlanCond(planbuilder.PlanUnlockTables) + qr.AddPlanCond(planbuilder.PlanCallProc) + qr.AddPlanCond(planbuilder.PlanAlterMigration) + qr.AddPlanCond(planbuilder.PlanRevertMigration) + qr.AddPlanCond(planbuilder.PlanShowMigrations) + qr.AddPlanCond(planbuilder.PlanShowMigrationLogs) + qr.AddPlanCond(planbuilder.PlanShowThrottledApps) + qr.AddPlanCond(planbuilder.PlanShowThrottlerStatus) + } denyListRules.Add(qr) } } diff --git a/go/vt/vttablet/tabletmanager/tm_state_test.go b/go/vt/vttablet/tabletmanager/tm_state_test.go index 7a507248e18..ac98fea291d 100644 --- a/go/vt/vttablet/tabletmanager/tm_state_test.go +++ b/go/vt/vttablet/tabletmanager/tm_state_test.go @@ -105,35 +105,72 @@ func TestStateResharding(t *testing.T) { } func TestStateDenyList(t *testing.T) { - ctx, cancel := context.WithCancel(context.Background()) - defer cancel() - ts := memorytopo.NewServer(ctx, "cell1") - tm := newTestTM(t, ts, 1, "ks", "0", nil) - defer tm.Stop() - - fmd := tm.MysqlDaemon.(*mysqlctl.FakeMysqlDaemon) - fmd.Schema = &tabletmanagerdatapb.SchemaDefinition{ - TableDefinitions: []*tabletmanagerdatapb.TableDefinition{{ - Name: "t1", - }}, - } - si := &topo.ShardInfo{ - Shard: &topodatapb.Shard{ - TabletControls: []*topodatapb.Shard_TabletControl{{ + tests := []struct { + name string + tableDefinitions []*tabletmanagerdatapb.TableDefinition + tabletControls []*topodatapb.Shard_TabletControl + wantDeniedTables map[topodatapb.TabletType][]string + wantAllowReadsFromDeniedTables map[topodatapb.TabletType]bool + wantQueryRules string + }{ + { + name: "replica denied table", + tableDefinitions: []*tabletmanagerdatapb.TableDefinition{{ + Name: "t1", + }}, + tabletControls: []*topodatapb.Shard_TabletControl{{ + TabletType: topodatapb.TabletType_REPLICA, + Cells: []string{"cell1"}, + DeniedTables: []string{"t1"}, + }}, + wantDeniedTables: map[topodatapb.TabletType][]string{topodatapb.TabletType_REPLICA: {"t1"}}, + wantAllowReadsFromDeniedTables: map[topodatapb.TabletType]bool{topodatapb.TabletType_REPLICA: false}, + wantQueryRules: `[{"Description":"enforce denied tables","Name":"denied_table","TableNames":["t1"],"Action":"FAIL_RETRY"}]`, + }, + { + name: "replica denied table with allow reads", + tableDefinitions: []*tabletmanagerdatapb.TableDefinition{{ + Name: "t1", + }}, + tabletControls: []*topodatapb.Shard_TabletControl{{ TabletType: topodatapb.TabletType_REPLICA, Cells: []string{"cell1"}, DeniedTables: []string{"t1"}, + AllowReads: true, }}, + wantDeniedTables: map[topodatapb.TabletType][]string{topodatapb.TabletType_REPLICA: {"t1"}}, + wantAllowReadsFromDeniedTables: map[topodatapb.TabletType]bool{topodatapb.TabletType_REPLICA: true}, + wantQueryRules: `[{"Description":"enforce denied tables","Name":"denied_table","Plans":["Nextval","Insert","InsertMessage","Update","UpdateLimit","Delete","DeleteLimit","DDL","Set","OtherRead","OtherAdmin","MessageStream","Savepoint","Release","RollbackSavepoint","Show","Load","Flush","UnlockTables","CallProcedure","AlterMigration","RevertMigration","ShowMigrations","ShowMigrationLogs","ShowThrottledApps","ShowThrottlerStatus"],"TableNames":["t1"],"Action":"FAIL_RETRY"}]`, }, } - tm.tmState.RefreshFromTopoInfo(ctx, si, nil) - tm.tmState.mu.Lock() - assert.Equal(t, map[topodatapb.TabletType][]string{topodatapb.TabletType_REPLICA: {"t1"}}, tm.tmState.deniedTables) - tm.tmState.mu.Unlock() - qsc := tm.QueryServiceControl.(*tabletservermock.Controller) - b, _ := json.Marshal(qsc.GetQueryRules(denyListQueryList)) - assert.Equal(t, `[{"Description":"enforce denied tables","Name":"denied_table","TableNames":["t1"],"Action":"FAIL_RETRY"}]`, string(b)) + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + ts := memorytopo.NewServer(ctx, "cell1") + tm := newTestTM(t, ts, 1, "ks", "0", nil) + + fmd := tm.MysqlDaemon.(*mysqlctl.FakeMysqlDaemon) + fmd.Schema = &tabletmanagerdatapb.SchemaDefinition{ + TableDefinitions: tt.tableDefinitions, + } + si := &topo.ShardInfo{ + Shard: &topodatapb.Shard{ + TabletControls: tt.tabletControls, + }, + } + tm.tmState.RefreshFromTopoInfo(ctx, si, nil) + tm.tmState.mu.Lock() + assert.Equal(t, tt.wantDeniedTables, tm.tmState.deniedTables) + assert.Equal(t, tt.wantAllowReadsFromDeniedTables, tm.tmState.allowReadsFromDeniedTables) + tm.tmState.mu.Unlock() + + qsc := tm.QueryServiceControl.(*tabletservermock.Controller) + b, _ := json.Marshal(qsc.GetQueryRules(denyListQueryList)) + assert.Equal(t, tt.wantQueryRules, string(b)) + }) + } } func TestStateTabletControls(t *testing.T) { diff --git a/go/vt/vttablet/tabletserver/rules/rules.go b/go/vt/vttablet/tabletserver/rules/rules.go index 4a7d128b950..47dc31686d4 100644 --- a/go/vt/vttablet/tabletserver/rules/rules.go +++ b/go/vt/vttablet/tabletserver/rules/rules.go @@ -179,7 +179,8 @@ func (qrs *Rules) GetAction( action Action, cancelCtx context.Context, timeout time.Duration, - desc string) { + desc string, +) { for _, qr := range qrs.rules { if act := qr.GetAction(ip, user, bindVars, marginComments); act != QRContinue { return act, qr.cancelCtx, qr.timeout, qr.Description @@ -1065,3 +1066,7 @@ func safeEncode(b *bytes.Buffer, prefix string, v any) { _ = enc.Encode(err.Error()) } } + +type TableCond struct { + tableName string +} diff --git a/go/vt/wrangler/traffic_switcher.go b/go/vt/wrangler/traffic_switcher.go index dcebe8c16c9..ed38b37b839 100644 --- a/go/vt/wrangler/traffic_switcher.go +++ b/go/vt/wrangler/traffic_switcher.go @@ -1235,7 +1235,7 @@ func (ts *trafficSwitcher) stopSourceWrites(ctx context.Context) error { func (ts *trafficSwitcher) changeTableSourceWrites(ctx context.Context, access accessType) error { err := ts.ForAllSources(func(source *workflow.MigrationSource) error { if _, err := ts.TopoServer().UpdateShardFields(ctx, ts.SourceKeyspaceName(), source.GetShard().ShardName(), func(si *topo.ShardInfo) error { - return si.UpdateDeniedTables(ctx, topodatapb.TabletType_PRIMARY, nil, access == allowWrites /* remove */, ts.Tables()) + return si.UpdateDeniedTables(ctx, topodatapb.TabletType_PRIMARY, nil, access == allowWrites /* remove */, ts.Tables(), false) }); err != nil { return err } @@ -1539,7 +1539,7 @@ func (ts *trafficSwitcher) allowTargetWrites(ctx context.Context) error { func (ts *trafficSwitcher) allowTableTargetWrites(ctx context.Context) error { return ts.ForAllTargets(func(target *workflow.MigrationTarget) error { if _, err := ts.TopoServer().UpdateShardFields(ctx, ts.TargetKeyspaceName(), target.GetShard().ShardName(), func(si *topo.ShardInfo) error { - return si.UpdateDeniedTables(ctx, topodatapb.TabletType_PRIMARY, nil, true, ts.Tables()) + return si.UpdateDeniedTables(ctx, topodatapb.TabletType_PRIMARY, nil, true, ts.Tables(), false) }); err != nil { return err } @@ -1683,7 +1683,7 @@ func (ts *trafficSwitcher) TargetShards() []*topo.ShardInfo { func (ts *trafficSwitcher) dropSourceDeniedTables(ctx context.Context) error { return ts.ForAllSources(func(source *workflow.MigrationSource) error { if _, err := ts.TopoServer().UpdateShardFields(ctx, ts.SourceKeyspaceName(), source.GetShard().ShardName(), func(si *topo.ShardInfo) error { - return si.UpdateDeniedTables(ctx, topodatapb.TabletType_PRIMARY, nil, true, ts.Tables()) + return si.UpdateDeniedTables(ctx, topodatapb.TabletType_PRIMARY, nil, true, ts.Tables(), false) }); err != nil { return err } @@ -1697,7 +1697,7 @@ func (ts *trafficSwitcher) dropSourceDeniedTables(ctx context.Context) error { func (ts *trafficSwitcher) dropTargetDeniedTables(ctx context.Context) error { return ts.ForAllTargets(func(target *workflow.MigrationTarget) error { if _, err := ts.TopoServer().UpdateShardFields(ctx, ts.TargetKeyspaceName(), target.GetShard().ShardName(), func(si *topo.ShardInfo) error { - return si.UpdateDeniedTables(ctx, topodatapb.TabletType_PRIMARY, nil, true, ts.Tables()) + return si.UpdateDeniedTables(ctx, topodatapb.TabletType_PRIMARY, nil, true, ts.Tables(), false) }); err != nil { return err } diff --git a/proto/topodata.proto b/proto/topodata.proto index 97b5f1a8e42..f5e1be43473 100644 --- a/proto/topodata.proto +++ b/proto/topodata.proto @@ -246,6 +246,9 @@ message Shard { // frozen is set if we've started failing over traffic for // the primary. If set, this record should not be removed. bool frozen = 5; + + // Allow reads to the denied tables. + bool allow_reads = 6; } // tablet_controls has at most one entry per TabletType. @@ -286,7 +289,7 @@ message Keyspace { // base_keyspace is the base keyspace from which a snapshot // keyspace is created. empty for normal keyspaces string base_keyspace = 6; - + // snapshot_time (in UTC) is a property of snapshot // keyspaces which tells us what point in time // the snapshot is of @@ -460,7 +463,7 @@ message CellInfo { reserved 3; } -// CellsAlias +// CellsAlias message CellsAlias { // Cells that map to this alias repeated string cells = 2; diff --git a/web/vtadmin/src/proto/vtadmin.d.ts b/web/vtadmin/src/proto/vtadmin.d.ts index 6794bb480c5..2a4d6d77800 100644 --- a/web/vtadmin/src/proto/vtadmin.d.ts +++ b/web/vtadmin/src/proto/vtadmin.d.ts @@ -18824,6 +18824,9 @@ export namespace topodata { /** TabletControl frozen */ frozen?: (boolean|null); + + /** TabletControl allow_reads */ + allow_reads?: (boolean|null); } /** Represents a TabletControl. */ @@ -18847,6 +18850,9 @@ export namespace topodata { /** TabletControl frozen. */ public frozen: boolean; + /** TabletControl allow_reads. */ + public allow_reads: boolean; + /** * Creates a new TabletControl instance using the specified properties. * @param [properties] Properties to set diff --git a/web/vtadmin/src/proto/vtadmin.js b/web/vtadmin/src/proto/vtadmin.js index d84cb496f8f..2cd166e91cd 100644 --- a/web/vtadmin/src/proto/vtadmin.js +++ b/web/vtadmin/src/proto/vtadmin.js @@ -43147,6 +43147,7 @@ export const topodata = $root.topodata = (() => { * @property {Array.|null} [cells] TabletControl cells * @property {Array.|null} [denied_tables] TabletControl denied_tables * @property {boolean|null} [frozen] TabletControl frozen + * @property {boolean|null} [allow_reads] TabletControl allow_reads */ /** @@ -43198,6 +43199,14 @@ export const topodata = $root.topodata = (() => { */ TabletControl.prototype.frozen = false; + /** + * TabletControl allow_reads. + * @member {boolean} allow_reads + * @memberof topodata.Shard.TabletControl + * @instance + */ + TabletControl.prototype.allow_reads = false; + /** * Creates a new TabletControl instance using the specified properties. * @function create @@ -43232,6 +43241,8 @@ export const topodata = $root.topodata = (() => { writer.uint32(/* id 4, wireType 2 =*/34).string(message.denied_tables[i]); if (message.frozen != null && Object.hasOwnProperty.call(message, "frozen")) writer.uint32(/* id 5, wireType 0 =*/40).bool(message.frozen); + if (message.allow_reads != null && Object.hasOwnProperty.call(message, "allow_reads")) + writer.uint32(/* id 6, wireType 0 =*/48).bool(message.allow_reads); return writer; }; @@ -43286,6 +43297,10 @@ export const topodata = $root.topodata = (() => { message.frozen = reader.bool(); break; } + case 6: { + message.allow_reads = reader.bool(); + break; + } default: reader.skipType(tag & 7); break; @@ -43355,6 +43370,9 @@ export const topodata = $root.topodata = (() => { if (message.frozen != null && message.hasOwnProperty("frozen")) if (typeof message.frozen !== "boolean") return "frozen: boolean expected"; + if (message.allow_reads != null && message.hasOwnProperty("allow_reads")) + if (typeof message.allow_reads !== "boolean") + return "allow_reads: boolean expected"; return null; }; @@ -43438,6 +43456,8 @@ export const topodata = $root.topodata = (() => { } if (object.frozen != null) message.frozen = Boolean(object.frozen); + if (object.allow_reads != null) + message.allow_reads = Boolean(object.allow_reads); return message; }; @@ -43461,6 +43481,7 @@ export const topodata = $root.topodata = (() => { if (options.defaults) { object.tablet_type = options.enums === String ? "UNKNOWN" : 0; object.frozen = false; + object.allow_reads = false; } if (message.tablet_type != null && message.hasOwnProperty("tablet_type")) object.tablet_type = options.enums === String ? $root.topodata.TabletType[message.tablet_type] === undefined ? message.tablet_type : $root.topodata.TabletType[message.tablet_type] : message.tablet_type; @@ -43476,6 +43497,8 @@ export const topodata = $root.topodata = (() => { } if (message.frozen != null && message.hasOwnProperty("frozen")) object.frozen = message.frozen; + if (message.allow_reads != null && message.hasOwnProperty("allow_reads")) + object.allow_reads = message.allow_reads; return object; };