Skip to content

Commit

Permalink
br/restore/log_client: use input ts as filter (pingcap#58734)
Browse files Browse the repository at this point in the history
  • Loading branch information
YuJuncen authored Jan 13, 2025
1 parent 91706ec commit 1f74ac3
Show file tree
Hide file tree
Showing 10 changed files with 148 additions and 44 deletions.
12 changes: 6 additions & 6 deletions DEPS.bzl
Original file line number Diff line number Diff line change
Expand Up @@ -5867,13 +5867,13 @@ def go_deps():
name = "com_github_pingcap_kvproto",
build_file_proto_mode = "disable_global",
importpath = "github.com/pingcap/kvproto",
sha256 = "92a67bcc499c06fd3d76cc153362540b22eaf1b09c4bda62a1599ce876b8ed78",
strip_prefix = "github.com/pingcap/[email protected]20241120071417-b5b7843d9037",
sha256 = "db34e3f94e5ac8fc5465b5440583c9e037a7f16aea8d0d8a200cdff210b12038",
strip_prefix = "github.com/pingcap/[email protected]20250102071301-c35d2b410115",
urls = [
"http://bazel-cache.pingcap.net:8080/gomod/github.com/pingcap/kvproto/com_github_pingcap_kvproto-v0.0.0-20241120071417-b5b7843d9037.zip",
"http://ats.apps.svc/gomod/github.com/pingcap/kvproto/com_github_pingcap_kvproto-v0.0.0-20241120071417-b5b7843d9037.zip",
"https://cache.hawkingrei.com/gomod/github.com/pingcap/kvproto/com_github_pingcap_kvproto-v0.0.0-20241120071417-b5b7843d9037.zip",
"https://storage.googleapis.com/pingcapmirror/gomod/github.com/pingcap/kvproto/com_github_pingcap_kvproto-v0.0.0-20241120071417-b5b7843d9037.zip",
"http://bazel-cache.pingcap.net:8080/gomod/github.com/pingcap/kvproto/com_github_pingcap_kvproto-v0.0.0-20250102071301-c35d2b410115.zip",
"http://ats.apps.svc/gomod/github.com/pingcap/kvproto/com_github_pingcap_kvproto-v0.0.0-20250102071301-c35d2b410115.zip",
"https://cache.hawkingrei.com/gomod/github.com/pingcap/kvproto/com_github_pingcap_kvproto-v0.0.0-20250102071301-c35d2b410115.zip",
"https://storage.googleapis.com/pingcapmirror/gomod/github.com/pingcap/kvproto/com_github_pingcap_kvproto-v0.0.0-20250102071301-c35d2b410115.zip",
],
)
go_repository(
Expand Down
2 changes: 1 addition & 1 deletion br/pkg/restore/log_client/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -90,7 +90,7 @@ go_test(
],
embed = [":log_client"],
flaky = True,
shard_count = 45,
shard_count = 46,
deps = [
"//br/pkg/errors",
"//br/pkg/glue",
Expand Down
4 changes: 4 additions & 0 deletions br/pkg/restore/log_client/export_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -127,3 +127,7 @@ func (helper *FakeStreamMetadataHelper) ReadFile(
) ([]byte, error) {
return helper.Data[offset : offset+length], nil
}

func (m WithMigrations) CompactionDirs() []string {
return m.compactionDirs
}
6 changes: 5 additions & 1 deletion br/pkg/restore/log_client/migration.go
Original file line number Diff line number Diff line change
Expand Up @@ -133,7 +133,11 @@ func (builder *WithMigrationsBuilder) coarseGrainedFilter(mig *backuppb.Migratio
// log file [ .. .. .. .. ]
//
for _, compaction := range mig.Compactions {
if compaction.CompactionUntilTs < builder.shiftStartTS || compaction.CompactionFromTs > builder.restoredTS {
// Some old compaction may not contain input min / max ts.
// In that case, we should never filter it out.
rangeValid := compaction.InputMinTs != 0 && compaction.InputMaxTs != 0
outOfRange := compaction.InputMaxTs < builder.shiftStartTS || compaction.InputMinTs > builder.restoredTS
if rangeValid && outOfRange {
return true
}
}
Expand Down
146 changes: 118 additions & 28 deletions br/pkg/restore/log_client/migration_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -215,8 +215,8 @@ func TestMigrations(t *testing.T) {
},
Compactions: []*backuppb.LogFileCompaction{
{
CompactionFromTs: 0,
CompactionUntilTs: 9,
InputMinTs: 1,
InputMaxTs: 9,
},
},
},
Expand All @@ -240,8 +240,8 @@ func TestMigrations(t *testing.T) {
},
Compactions: []*backuppb.LogFileCompaction{
{
CompactionFromTs: 50,
CompactionUntilTs: 52,
InputMinTs: 50,
InputMaxTs: 52,
},
},
},
Expand All @@ -264,8 +264,8 @@ func TestMigrations(t *testing.T) {
},
Compactions: []*backuppb.LogFileCompaction{
{
CompactionFromTs: 50,
CompactionUntilTs: 52,
InputMinTs: 50,
InputMaxTs: 52,
},
},
},
Expand All @@ -275,8 +275,8 @@ func TestMigrations(t *testing.T) {
},
Compactions: []*backuppb.LogFileCompaction{
{
CompactionFromTs: 120,
CompactionUntilTs: 140,
InputMinTs: 120,
InputMaxTs: 140,
},
},
},
Expand All @@ -299,8 +299,8 @@ func TestMigrations(t *testing.T) {
},
Compactions: []*backuppb.LogFileCompaction{
{
CompactionFromTs: 50,
CompactionUntilTs: 52,
InputMinTs: 50,
InputMaxTs: 52,
},
},
},
Expand All @@ -310,8 +310,8 @@ func TestMigrations(t *testing.T) {
},
Compactions: []*backuppb.LogFileCompaction{
{
CompactionFromTs: 1200,
CompactionUntilTs: 1400,
InputMinTs: 1200,
InputMaxTs: 1400,
},
},
},
Expand All @@ -329,24 +329,114 @@ func TestMigrations(t *testing.T) {
}

ctx := context.Background()
for _, cs := range cases {
builder := logclient.NewMigrationBuilder(10, 100, 200)
withMigrations := builder.Build(cs.migrations)
it := withMigrations.Metas(generateMetaNameIter())
checkMetaNameIter(t, cs.expectStoreIds, it)
it = withMigrations.Metas(generateMetaNameIter())
collect := iter.CollectAll(ctx, it)
require.NoError(t, collect.Err)
for j, meta := range collect.Item {
physicalIter := generatePhysicalIter(meta)
checkPhysicalIter(t, cs.expectPhyLengths[j], physicalIter)
physicalIter = generatePhysicalIter(meta)
collect := iter.CollectAll(ctx, physicalIter)
for i, cs := range cases {
t.Run(fmt.Sprintf("#%d", i), func(t *testing.T) {
builder := logclient.NewMigrationBuilder(10, 100, 200)
withMigrations := builder.Build(cs.migrations)
it := withMigrations.Metas(generateMetaNameIter())
checkMetaNameIter(t, cs.expectStoreIds, it)
it = withMigrations.Metas(generateMetaNameIter())
collect := iter.CollectAll(ctx, it)
require.NoError(t, collect.Err)
for k, phy := range collect.Item {
logicalIter := generateLogicalIter(phy)
checkLogicalIter(t, cs.expectLogLengths[j][k], logicalIter)
for j, meta := range collect.Item {
physicalIter := generatePhysicalIter(meta)
checkPhysicalIter(t, cs.expectPhyLengths[j], physicalIter)
physicalIter = generatePhysicalIter(meta)
collect := iter.CollectAll(ctx, physicalIter)
require.NoError(t, collect.Err)
for k, phy := range collect.Item {
logicalIter := generateLogicalIter(phy)
checkLogicalIter(t, cs.expectLogLengths[j][k], logicalIter)
}
}
})
}
}

func pack[T any](ts ...T) []T {
return ts
}

func TestFilterOut(t *testing.T) {
type Case struct {
ShiftedStartTs uint64
RestoredTs uint64
Migs []*backuppb.Migration

ExceptedCompactionsArtificateDir []string
}
withCompactTsCompaction := func(iMin, iMax, cFrom, cUntil uint64, name string) *backuppb.LogFileCompaction {
return &backuppb.LogFileCompaction{
InputMinTs: iMin,
InputMaxTs: iMax,
CompactionFromTs: cFrom,
CompactionUntilTs: cUntil,
Artifacts: name,
}
}
simpleCompaction := func(iMin, iMax uint64, name string) *backuppb.LogFileCompaction {
return &backuppb.LogFileCompaction{
InputMinTs: iMin,
InputMaxTs: iMax,
Artifacts: name,
}
}
makeMig := func(cs ...*backuppb.LogFileCompaction) *backuppb.Migration {
return &backuppb.Migration{Compactions: cs}
}

cases := []Case{
{
ShiftedStartTs: 50,
RestoredTs: 60,
Migs: pack(
makeMig(simpleCompaction(49, 61, "a")),
makeMig(simpleCompaction(61, 80, "b")),
),

ExceptedCompactionsArtificateDir: pack("a"),
},
{
ShiftedStartTs: 30,
RestoredTs: 50,
Migs: pack(
makeMig(simpleCompaction(40, 60, "1a")),
makeMig(simpleCompaction(10, 20, "1b")),
makeMig(simpleCompaction(31, 50, "2a")),
makeMig(simpleCompaction(50, 80, "2b")),
),

ExceptedCompactionsArtificateDir: pack("1a", "2a", "2b"),
},
{
ShiftedStartTs: 30,
RestoredTs: 50,
Migs: pack(
makeMig(withCompactTsCompaction(49, 100, 50, 99, "a")),
makeMig(withCompactTsCompaction(10, 30, 15, 29, "b")),
makeMig(withCompactTsCompaction(8, 29, 10, 20, "c")),
),

ExceptedCompactionsArtificateDir: pack("a", "b"),
},
{
ShiftedStartTs: 100,
RestoredTs: 120,
Migs: pack(
makeMig(withCompactTsCompaction(49, 100, 50, 99, "a")),
makeMig(withCompactTsCompaction(0, 0, 15, 29, "b")),
makeMig(withCompactTsCompaction(0, 0, 10, 20, "c")),
),

ExceptedCompactionsArtificateDir: pack("a", "b", "c"),
},
}

for i, c := range cases {
t.Run(fmt.Sprintf("#%d", i), func(t *testing.T) {
b := logclient.NewMigrationBuilder(c.ShiftedStartTs, c.ShiftedStartTs, c.RestoredTs)
i := b.Build(c.Migs)
require.ElementsMatch(t, i.CompactionDirs(), c.ExceptedCompactionsArtificateDir)
})
}
}
5 changes: 3 additions & 2 deletions br/pkg/stream/stream_metas.go
Original file line number Diff line number Diff line change
Expand Up @@ -1073,8 +1073,9 @@ func (m MigrationExt) doTruncating(ctx context.Context, mig *pb.Migration, resul
// NOTE: Execution of truncation wasn't implemented here.
// If we are going to truncate some files, for now we still need to use `br log truncate`.
for _, compaction := range mig.Compactions {
// Can we also remove the compaction when `until-ts` is equal to `truncated-to`...?
if compaction.CompactionUntilTs > mig.TruncatedTo {
// We can only clean up a compaction when we are sure all its inputs
// are no more used.
if compaction.InputMaxTs > mig.TruncatedTo {
result.NewBase.Compactions = append(result.NewBase.Compactions, compaction)
} else {
m.tryRemovePrefix(ctx, compaction.Artifacts, result)
Expand Down
6 changes: 3 additions & 3 deletions br/pkg/stream/stream_metas_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -452,13 +452,13 @@ func mDstrPfx(path ...string) migOP {
}
}

func mCompaction(cPath, aPath string, fromTs, untilTs uint64) migOP {
func mCompaction(cPath, aPath string, minTs, maxTs uint64) migOP {
return func(m *backuppb.Migration) {
c := &backuppb.LogFileCompaction{}
c.GeneratedFiles = cPath
c.Artifacts = aPath
c.CompactionFromTs = fromTs
c.CompactionUntilTs = untilTs
c.InputMinTs = minTs
c.InputMaxTs = maxTs
m.Compactions = append(m.Compactions, c)
}
}
Expand Down
5 changes: 5 additions & 0 deletions br/pkg/streamhelper/basic_lib_for_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -179,6 +179,11 @@ func (t trivialFlushStream) RecvMsg(m any) error {
return nil
}

func (f *fakeStore) FlushNow(ctx context.Context, in *logbackup.FlushNowRequest, opts ...grpc.CallOption) (*logbackup.FlushNowResponse, error) {
f.flush()
return &logbackup.FlushNowResponse{Results: []*logbackup.FlushResult{{TaskName: "Universe", Success: true}}}, nil
}

func (f *fakeStore) GetID() uint64 {
return f.id
}
Expand Down
2 changes: 1 addition & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -87,7 +87,7 @@ require (
github.com/pingcap/errors v0.11.5-0.20240318064555-6bd07397691f
github.com/pingcap/failpoint v0.0.0-20240528011301-b51a646c7c86
github.com/pingcap/fn v1.0.0
github.com/pingcap/kvproto v0.0.0-20241120071417-b5b7843d9037
github.com/pingcap/kvproto v0.0.0-20250102071301-c35d2b410115
github.com/pingcap/log v1.1.1-0.20241212030209-7e3ff8601a2a
github.com/pingcap/sysutil v1.0.1-0.20240311050922-ae81ee01f3a5
github.com/pingcap/tidb/pkg/parser v0.0.0-20211011031125-9b13dc409c5e
Expand Down
4 changes: 2 additions & 2 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -676,8 +676,8 @@ github.com/pingcap/fn v1.0.0/go.mod h1:u9WZ1ZiOD1RpNhcI42RucFh/lBuzTu6rw88a+oF2Z
github.com/pingcap/goleveldb v0.0.0-20191226122134-f82aafb29989 h1:surzm05a8C9dN8dIUmo4Be2+pMRb6f55i+UIYrluu2E=
github.com/pingcap/goleveldb v0.0.0-20191226122134-f82aafb29989/go.mod h1:O17XtbryoCJhkKGbT62+L2OlrniwqiGLSqrmdHCMzZw=
github.com/pingcap/kvproto v0.0.0-20191211054548-3c6b38ea5107/go.mod h1:WWLmULLO7l8IOcQG+t+ItJ3fEcrL5FxF0Wu+HrMy26w=
github.com/pingcap/kvproto v0.0.0-20241120071417-b5b7843d9037 h1:xYNSJjYNur4Dr5bV+9BXK9n5E0T1zlcAN25XX68+mOg=
github.com/pingcap/kvproto v0.0.0-20241120071417-b5b7843d9037/go.mod h1:rXxWk2UnwfUhLXha1jxRWPADw9eMZGWEWCg92Tgmb/8=
github.com/pingcap/kvproto v0.0.0-20250102071301-c35d2b410115 h1:tFaBKtuVsTaYgWVa4fJVBHEi3vqdqRtmjMypEK+CN88=
github.com/pingcap/kvproto v0.0.0-20250102071301-c35d2b410115/go.mod h1:rXxWk2UnwfUhLXha1jxRWPADw9eMZGWEWCg92Tgmb/8=
github.com/pingcap/log v0.0.0-20210625125904-98ed8e2eb1c7/go.mod h1:8AanEdAHATuRurdGxZXBz0At+9avep+ub7U1AGYLIMM=
github.com/pingcap/log v1.1.0/go.mod h1:DWQW5jICDR7UJh4HtxXSM20Churx4CQL0fwL/SoOSA4=
github.com/pingcap/log v1.1.1-0.20241212030209-7e3ff8601a2a h1:WIhmJBlNGmnCWH6TLMdZfNEDaiU8cFpZe3iaqDbQ0M8=
Expand Down

0 comments on commit 1f74ac3

Please sign in to comment.