diff --git a/assemble.go b/assemble.go
index 6a407bd..d9793b4 100644
--- a/assemble.go
+++ b/assemble.go
@@ -3,11 +3,25 @@ package desync
 import (
 	"context"
 	"fmt"
+	"golang.org/x/sync/errgroup"
 	"os"
+)
 
-	"golang.org/x/sync/errgroup"
+// InvalidSeedAction represents the action to take when a seed
+// turns out to be invalid. There are currently two options: either fail with
+// an error or skip the invalid seed and try to continue.
+type InvalidSeedAction int
+
+const (
+	InvalidSeedActionBailOut InvalidSeedAction = iota
+	InvalidSeedActionSkip
 )
 
+type AssembleOptions struct {
+	N                 int
+	InvalidSeedAction InvalidSeedAction
+}
+
 // AssembleFile re-assembles a file based on a list of index chunks. It runs n
 // goroutines, creating one filehandle for the file "name" per goroutine
 // and writes to the file simultaneously. If progress is provided, it'll be
@@ -16,7 +30,7 @@ import (
 // confirm if the data matches what is expected and only populate areas that
 // differ from the expected content. This can be used to complete partly
 // written files.
-func AssembleFile(ctx context.Context, name string, idx Index, s Store, seeds []Seed, n int, pb ProgressBar) (*ExtractStats, error) {
+func AssembleFile(ctx context.Context, name string, idx Index, s Store, seeds []Seed, options AssembleOptions, pb ProgressBar) (*ExtractStats, error) {
 	type Job struct {
 		segment IndexSegment
 		source  SeedSegment
@@ -78,21 +92,22 @@ func AssembleFile(ctx context.Context, name string, idx Index, s Store, seeds []
 		return stats, err
 	}
 	defer ns.close()
+	seeds = append([]Seed{ns}, seeds...)
 
 	// Start a self-seed which will become usable once chunks are written contigously
-	// beginning at position 0.
+	// beginning at position 0. There is no need to add it to the seeds list since
+	// the self-seed is still empty at the time the plan is created.
 	ss, err := newSelfSeed(name, idx)
 	if err != nil {
 		return stats, err
 	}
-	seeds = append([]Seed{ns, ss}, seeds...)
 
 	// Record the total number of seeds and blocksize in the stats
 	stats.Seeds = len(seeds)
 	stats.Blocksize = blocksize
 
 	// Start the workers, each having its own filehandle to write concurrently
-	for i := 0; i < n; i++ {
+	for i := 0; i < options.N; i++ {
 		f, err := os.OpenFile(name, os.O_RDWR, 0666)
 		if err != nil {
 			return stats, fmt.Errorf("unable to open file %s, %s", name, err)
@@ -104,6 +119,8 @@ func AssembleFile(ctx context.Context, name string, idx Index, s Store, seeds []
 				pb.Add(job.segment.lengthChunks())
 			}
 			if job.source != nil {
+				// If we have a seedSegment we expect one or more chunks between
+				// the start and the end of this segment.
 				stats.addChunksFromSeed(uint64(job.segment.lengthChunks()))
 				offset := job.segment.start()
 				length := job.segment.lengthBytes()
@@ -118,7 +135,33 @@ func AssembleFile(ctx context.Context, name string, idx Index, s Store, seeds []
 				ss.add(job.segment)
 				continue
 			}
+
+			// If we don't have a seedSegment we expect an IndexSegment with just
+			// a single chunk that we can take from the selfSeed, from the
+			// destination file, or from the store.
+			if len(job.segment.chunks()) != 1 {
+				panic("Received an unexpected segment that doesn't contain just a single chunk")
+			}
 			c := job.segment.chunks()[0]
+
+			// If we already took this chunk from the store we can reuse it by looking
+			// into the selfSeed.
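+			// Reusing it from the self-seed lets us copy or clone the chunk locally
+			// rather than fetching and decompressing it from the store a second time.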
+			if segment := ss.getChunk(c.ID); segment != nil {
+				copied, cloned, err := segment.WriteInto(f, c.Start, c.Size, blocksize, isBlank)
+				if err != nil {
+					return err
+				}
+				stats.addBytesCopied(copied)
+				stats.addBytesCloned(cloned)
+				// Even if we already confirmed that this chunk is present in the
+				// self-seed, we still need to record it as being written, otherwise
+				// the self-seed position pointer doesn't advance as we expect.
+				ss.add(job.segment)
+				continue
+			}
+
 			// If we operate on an existing file there's a good chance we already
 			// have the data written for this chunk. Let's read it from disk and
 			// compare to what is expected.
@@ -162,19 +205,32 @@ func AssembleFile(ctx context.Context, name string, idx Index, s Store, seeds []
 		})
 	}
 
-	// Let the sequencer break up the index into segments, feed the workers, and
-	// stop if there are any errors
+	// Let the sequencer break up the index into segments, create and validate a plan,
+	// feed the workers, and stop if there are any errors
 	seq := NewSeedSequencer(idx, seeds...)
-loop:
+	plan := seq.Plan()
 	for {
-		chunks, from, done := seq.Next()
+		if err := plan.Validate(ctx, options.N); err != nil {
+			// This plan has at least one invalid seed
+			if options.InvalidSeedAction == InvalidSeedActionBailOut {
+				return stats, err
+			}
+			// Skip the invalid seed and try again
+			Log.WithError(err).Info("Unable to use one of the chosen seeds, skipping it")
+			seq.Rewind()
+			plan = seq.Plan()
+			continue
+		}
+		// Found a valid plan
+		break
+	}
+
+loop:
+	for _, segment := range plan {
 		select {
 		case <-ctx.Done():
 			break loop
-		case in <- Job{chunks, from}:
-		}
-		if done {
-			break
+		case in <- Job{segment.indexSegment, segment.source}:
 		}
 	}
 	close(in)
diff --git a/assemble_test.go b/assemble_test.go
index dd6b21d..dee3f68 100644
--- a/assemble_test.go
+++ b/assemble_test.go
@@ -127,7 +127,9 @@ func TestExtract(t *testing.T) {
 	for name, test := range tests {
 		t.Run(name, func(t *testing.T) {
 			defer os.Remove(test.outfile)
-			if _, err := AssembleFile(context.Background(), test.outfile, index, test.store, nil, 10, nil); err != nil {
+			if _, err := AssembleFile(context.Background(), test.outfile, index, test.store, nil,
+				AssembleOptions{10, InvalidSeedActionBailOut}, nil,
+			); err != nil {
 				t.Fatal(err)
 			}
 			b, err := ioutil.ReadFile(test.outfile)
@@ -268,7 +270,9 @@ func TestSeed(t *testing.T) {
 		seeds = append(seeds, seed)
 	}
 
-	if _, err := AssembleFile(context.Background(), dst.Name(), dstIndex, s, seeds, 10, nil); err != nil {
+	if _, err := AssembleFile(context.Background(), dst.Name(), dstIndex, s, seeds,
+		AssembleOptions{10, InvalidSeedActionBailOut}, nil,
+	); err != nil {
 		t.Fatal(err)
 	}
 	b, err := ioutil.ReadFile(dst.Name())
diff --git a/cmd/desync/extract.go b/cmd/desync/extract.go
index 2e91c84..bf8bff9 100644
--- a/cmd/desync/extract.go
+++ b/cmd/desync/extract.go
@@ -14,12 +14,13 @@ import (
 
 type extractOptions struct {
 	cmdStoreOptions
-	stores     []string
-	cache      string
-	seeds      []string
-	seedDirs   []string
-	inPlace    bool
-	printStats bool
+	stores           []string
+	cache            string
+	seeds            []string
+	seedDirs         []string
+	inPlace          bool
+	printStats       bool
+	skipInvalidSeeds bool
 }
 
 func newExtractCommand(ctx context.Context) *cobra.Command {
@@ -50,6 +51,7 @@ the index from STDIN.`,
 	flags.StringSliceVarP(&opt.stores, "store", "s", nil, "source store(s)")
 	flags.StringSliceVar(&opt.seeds, "seed", nil, "seed indexes")
 	flags.StringSliceVar(&opt.seedDirs, "seed-dir", nil, "directory with seed index files")
+	flags.BoolVar(&opt.skipInvalidSeeds, "skip-invalid-seeds", false, "skip seeds with invalid chunks")
 	flags.StringVarP(&opt.cache, "cache", "c", "", "store to be used as cache")
 	flags.BoolVarP(&opt.inPlace, "in-place", "k", false, "extract the file in place and keep it in case of error")
 	flags.BoolVarP(&opt.printStats, "print-stats", "", false, "print statistics")
@@ -100,11 +102,18 @@ func runExtract(ctx context.Context, opt extractOptions, args []string) error {
 	}
 	seeds = append(seeds, dSeeds...)
 
+	// By default, bail out if we encounter an invalid seed
+	invalidSeedAction := desync.InvalidSeedActionBailOut
+	if opt.skipInvalidSeeds {
+		invalidSeedAction = desync.InvalidSeedActionSkip
+	}
+	assembleOpt := desync.AssembleOptions{N: opt.n, InvalidSeedAction: invalidSeedAction}
+
 	var stats *desync.ExtractStats
 	if opt.inPlace {
-		stats, err = writeInplace(ctx, outFile, idx, s, seeds, opt.n)
+		stats, err = writeInplace(ctx, outFile, idx, s, seeds, assembleOpt)
 	} else {
-		stats, err = writeWithTmpFile(ctx, outFile, idx, s, seeds, opt.n)
+		stats, err = writeWithTmpFile(ctx, outFile, idx, s, seeds, assembleOpt)
 	}
 	if err != nil {
 		return err
@@ -115,7 +124,7 @@ func runExtract(ctx context.Context, opt extractOptions, args []string) error {
 	return nil
 }
 
-func writeWithTmpFile(ctx context.Context, name string, idx desync.Index, s desync.Store, seeds []desync.Seed, n int) (*desync.ExtractStats, error) {
+func writeWithTmpFile(ctx context.Context, name string, idx desync.Index, s desync.Store, seeds []desync.Seed, assembleOpt desync.AssembleOptions) (*desync.ExtractStats, error) {
 	// Prepare a tempfile that'll hold the output during processing. Close it, we
 	// just need the name here since it'll be opened multiple times during write.
 	// Also make sure it gets removed regardless of any errors below.
@@ -128,7 +137,7 @@ func writeWithTmpFile(ctx context.Context, name string, idx desync.Index, s desy
 	defer os.Remove(tmp.Name())
 
 	// Build the blob from the chunks, writing everything into the tempfile
-	if stats, err = writeInplace(ctx, tmp.Name(), idx, s, seeds, n); err != nil {
+	if stats, err = writeInplace(ctx, tmp.Name(), idx, s, seeds, assembleOpt); err != nil {
 		return stats, err
 	}
 
@@ -136,11 +145,11 @@ func writeWithTmpFile(ctx context.Context, name string, idx desync.Index, s desy
 	return stats, os.Rename(tmp.Name(), name)
 }
 
-func writeInplace(ctx context.Context, name string, idx desync.Index, s desync.Store, seeds []desync.Seed, n int) (*desync.ExtractStats, error) {
+func writeInplace(ctx context.Context, name string, idx desync.Index, s desync.Store, seeds []desync.Seed, assembleOpt desync.AssembleOptions) (*desync.ExtractStats, error) {
 	pb := NewProgressBar("")
 
 	// Build the blob from the chunks, writing everything into given filename
-	return desync.AssembleFile(ctx, name, idx, s, seeds, n, pb)
+	return desync.AssembleFile(ctx, name, idx, s, seeds, assembleOpt, pb)
 }
 
 func readSeeds(dstFile string, locations []string, opts cmdStoreOptions) ([]desync.Seed, error) {
diff --git a/cmd/desync/extract_test.go b/cmd/desync/extract_test.go
index 9a60059..20fb902 100644
--- a/cmd/desync/extract_test.go
+++ b/cmd/desync/extract_test.go
@@ -48,13 +48,26 @@ func TestExtractCommand(t *testing.T) {
 		{"extract with multi seed",
 			[]string{"-s", "testdata/blob1.store", "--seed", "testdata/blob2.caibx", "--seed", "testdata/blob1.caibx", "testdata/blob1.caibx"}, out1},
 		{"extract with seed directory",
-			[]string{"-s", "testdata/blob1.store", "--seed-dir", "testdata", "testdata/blob1.caibx"}, out1},
+			[]string{"-s", "testdata/blob1.store", "--seed-dir", "testdata", "--skip-invalid-seeds", "testdata/blob1.caibx"}, out1},
"testdata/blob1.caibx"}, out1}, {"extract with cache", []string{"-s", "testdata/blob1.store", "-c", cacheDir, "testdata/blob1.caibx"}, out1}, {"extract with multiple stores", []string{"-s", "testdata/blob2.store", "-s", "testdata/blob1.store", "testdata/blob1.caibx"}, out1}, {"extract with multiple stores and cache", []string{"-n", "1", "-s", "testdata/blob2.store", "-s", "testdata/blob1.store", "--cache", cacheDir, "testdata/blob1.caibx"}, out1}, + {"extract with corrupted seed", + []string{"--store", "testdata/blob1.store", "--seed", "testdata/blob2_corrupted.caibx", "--skip-invalid-seeds", "testdata/blob1.caibx"}, out1}, + {"extract with multiple corrupted seeds", + []string{"--store", "testdata/empty.store", "--seed", "testdata/blob2_corrupted.caibx", "--seed", "testdata/blob1.caibx", "--skip-invalid-seeds", "testdata/blob1.caibx"}, out1}, + // Here we don't need the `--skip-invalid-seeds` because we expect the blob1 seed to always be the chosen one, being + // a 1:1 match with the index that we want to write. So we never reach the point where we validate the corrupted seed. + {"extract with seed directory without skipping invalid seeds", + []string{"-s", "testdata/blob1.store", "--seed-dir", "testdata", "testdata/blob1.caibx"}, out1}, + // Same as above, no need for `--skip-invalid-seeds` + {"extract with multiple corrupted seeds", + []string{"--store", "testdata/empty.store", "--seed", "testdata/blob2_corrupted.caibx", "--seed", "testdata/blob1.caibx", "testdata/blob1.caibx"}, out1}, + {"extract with single seed that has all the expected chunks", + []string{"--store", "testdata/empty.store", "--seed", "testdata/blob1.caibx", "testdata/blob1.caibx"}, out1}, } { t.Run(test.name, func(t *testing.T) { cmd := newExtractCommand(context.Background()) @@ -96,3 +109,32 @@ func TestExtractWithFailover(t *testing.T) { _, err = cmd.ExecuteC() require.NoError(t, err) } + +func TestExtractWithInvalidSeeds(t *testing.T) { + outDir, err := ioutil.TempDir("", "") + require.NoError(t, err) + defer os.RemoveAll(outDir) + out := filepath.Join(outDir, "out") + + for _, test := range []struct { + name string + args []string + output string + }{ + {"extract with corrupted seed", + []string{"--store", "testdata/blob1.store", "--seed", "testdata/blob2_corrupted.caibx", "testdata/blob1.caibx"}, out}, + {"extract with multiple corrupted seeds", + []string{"--store", "testdata/empty.store", "--seed", "testdata/blob2_corrupted.caibx", "--seed", "testdata/blob1.caibx", "testdata/blob2.caibx"}, out}, + } { + t.Run(test.name, func(t *testing.T) { + cmd := newExtractCommand(context.Background()) + cmd.SetArgs(append(test.args, test.output)) + + // Redirect the command's output and run it + stderr = ioutil.Discard + cmd.SetOutput(ioutil.Discard) + _, err := cmd.ExecuteC() + require.Error(t, err) + }) + } +} diff --git a/cmd/desync/testdata/blob2_corrupted b/cmd/desync/testdata/blob2_corrupted new file mode 100644 index 0000000..4ecd1e0 Binary files /dev/null and b/cmd/desync/testdata/blob2_corrupted differ diff --git a/cmd/desync/testdata/blob2_corrupted.caibx b/cmd/desync/testdata/blob2_corrupted.caibx new file mode 120000 index 0000000..0dc15d1 --- /dev/null +++ b/cmd/desync/testdata/blob2_corrupted.caibx @@ -0,0 +1 @@ +blob2.caibx \ No newline at end of file diff --git a/cmd/desync/testdata/empty.store/.keep b/cmd/desync/testdata/empty.store/.keep new file mode 100644 index 0000000..e69de29 diff --git a/fileseed.go b/fileseed.go index 4fda1d6..57e5e50 100644 --- a/fileseed.go +++ b/fileseed.go @@ -4,6 +4,7 @@ 
 import (
 	"fmt"
 	"io"
 	"os"
+	"sync"
 )
 
 // FileSeed is used to copy or clone blocks from an existing index+blob during
@@ -13,6 +14,8 @@ type FileSeed struct {
 	index      Index
 	pos        map[ChunkID][]int
 	canReflink bool
+	isInvalid  bool
+	mu         sync.RWMutex
 }
 
 // NewIndexSeed initializes a new seed that uses an existing index and its blob
@@ -22,6 +25,7 @@ func NewIndexSeed(dstFile string, srcFile string, index Index) (*FileSeed, error
 		pos:        make(map[ChunkID][]int),
 		index:      index,
 		canReflink: CanClone(dstFile, srcFile),
+		isInvalid:  false,
 	}
 	for i, c := range s.index.Chunks {
 		s.pos[c.ID] = append(s.pos[c.ID], i)
@@ -29,17 +33,21 @@ func NewIndexSeed(dstFile string, srcFile string, index Index) (*FileSeed, error
 	return &s, nil
 }
 
-// LongestMatchWith returns the longest sequence of of chunks anywhere in Source
-// that match b starting at b[0]. If there is no match, it returns nil
+// LongestMatchWith returns the longest sequence of chunks anywhere in Source
+// that match `chunks` starting at chunks[0]. If there is no match, it returns a
+// length of zero and a nil SeedSegment.
 func (s *FileSeed) LongestMatchWith(chunks []IndexChunk) (int, SeedSegment) {
-	if len(chunks) == 0 || len(s.index.Chunks) == 0 {
+	// isInvalid can be read and written concurrently, so hold the read lock here
+	s.mu.RLock()
+	defer s.mu.RUnlock()
+	if len(chunks) == 0 || len(s.index.Chunks) == 0 || s.isInvalid {
 		return 0, nil
 	}
 	pos, ok := s.pos[chunks[0].ID]
 	if !ok {
 		return 0, nil
 	}
-	// From every position of b[0] in the source, find a slice of
+	// From every position of chunks[0] in the source, find a slice of
 	// matching chunks. Then return the longest of those slices.
 	var (
 		match []IndexChunk
@@ -52,7 +60,13 @@ func (s *FileSeed) LongestMatchWith(chunks []IndexChunk) (int, SeedSegment) {
 			max = len(m)
 		}
 	}
-	return max, newFileSeedSegment(s.srcFile, match, s.canReflink, true)
+	return max, newFileSeedSegment(s.srcFile, match, s.canReflink)
+}
+
+func (s *FileSeed) SetInvalid(value bool) {
+	s.mu.Lock()
+	defer s.mu.Unlock()
+	s.isInvalid = value
 }
 
 // Returns a slice of chunks from the seed. Compares chunks from position 0
@@ -85,15 +99,17 @@ type fileSeedSegment struct {
-	needValidation bool
 }
 
-func newFileSeedSegment(file string, chunks []IndexChunk, canReflink, needValidation bool) *fileSeedSegment {
+func newFileSeedSegment(file string, chunks []IndexChunk, canReflink bool) *fileSeedSegment {
 	return &fileSeedSegment{
-		canReflink:     canReflink,
-		needValidation: needValidation,
-		file:           file,
-		chunks:         chunks,
+		canReflink: canReflink,
+		file:       file,
+		chunks:     chunks,
 	}
 }
 
+func (s *fileSeedSegment) FileName() string {
+	return s.file
+}
+
 func (s *fileSeedSegment) Size() uint64 {
 	if len(s.chunks) == 0 {
 		return 0
@@ -112,13 +128,6 @@ func (s *fileSeedSegment) WriteInto(dst *os.File, offset, length, blocksize uint
 	}
 	defer src.Close()
 
-	// Make sure the data we're planning on pulling from the file matches what
-	// the index says it is if that's required.
-	if s.needValidation {
-		if err := s.validate(src); err != nil {
-			return 0, 0, err
-		}
-	}
-
 	// Do a straight copy if reflinks are not supported or blocks aren't aligned
 	if !s.canReflink || s.chunks[0].Start%blocksize != offset%blocksize {
 		return s.copy(dst, src, s.chunks[0].Start, length, offset)
@@ -126,12 +135,12 @@ func (s *fileSeedSegment) WriteInto(dst *os.File, offset, length, blocksize uint
 	return s.clone(dst, src, s.chunks[0].Start, length, offset, blocksize)
 }
 
-// Compares all chunks in this slice of the seed index to the underlying data
+// Validate compares all chunks in this slice of the seed index to the underlying data
 // and fails if they don't match.
-func (s *fileSeedSegment) validate(src *os.File) error {
+func (s *fileSeedSegment) Validate(file *os.File) error {
 	for _, c := range s.chunks {
 		b := make([]byte, c.Size)
-		if _, err := src.ReadAt(b, int64(c.Start)); err != nil {
+		if _, err := file.ReadAt(b, int64(c.Start)); err != nil {
 			return err
 		}
 		sum := Digest.Sum(b)
diff --git a/nullseed.go b/nullseed.go
index 2958500..ba8dc57 100644
--- a/nullseed.go
+++ b/nullseed.go
@@ -64,12 +64,25 @@ func (s *nullChunkSeed) LongestMatchWith(chunks []IndexChunk) (int, SeedSegment)
 	}
 }
 
+func (s *nullChunkSeed) SetInvalid(value bool) {
+	panic("A nullseed is never expected to be invalid")
+}
+
 type nullChunkSection struct {
 	from, to   uint64
 	blockfile  *os.File
 	canReflink bool
 }
 
+func (s *nullChunkSection) Validate(file *os.File) error {
+	// We always assume a nullseed to be valid
+	return nil
+}
+
+func (s *nullChunkSection) FileName() string {
+	return ""
+}
+
 func (s *nullChunkSection) Size() uint64 { return s.to - s.from }
 
 func (s *nullChunkSection) WriteInto(dst *os.File, offset, length, blocksize uint64, isBlank bool) (uint64, uint64, error) {
diff --git a/seed.go b/seed.go
index 47eba76..dceb0de 100644
--- a/seed.go
+++ b/seed.go
@@ -12,13 +12,16 @@ const DefaultBlockSize = 4096
 // existing chunks or blocks into the target from.
 type Seed interface {
 	LongestMatchWith(chunks []IndexChunk) (int, SeedSegment)
+	SetInvalid(value bool)
 }
 
-// SeedSegment represents a matching range between a Seed and a a file being
+// SeedSegment represents a matching range between a Seed and a file being
 // assembled from an Index. It's used to copy or reflink data from seeds into
 // a target file during an extract operation.
 type SeedSegment interface {
+	FileName() string
 	Size() uint64
+	Validate(file *os.File) error
 	WriteInto(dst *os.File, offset, end, blocksize uint64, isBlank bool) (copied uint64, cloned uint64, err error)
 }
diff --git a/selfseed.go b/selfseed.go
index 9c584ba..48bf8f7 100644
--- a/selfseed.go
+++ b/selfseed.go
@@ -65,56 +65,19 @@ func (s *selfSeed) add(segment IndexSegment) {
 	}
 }
 
-// LongestMatchWith returns the longest sequence of of chunks anywhere in Source
-// that match b starting at b[0]. If there is no match, it returns nil
-func (s *selfSeed) LongestMatchWith(chunks []IndexChunk) (int, SeedSegment) {
-	if len(chunks) == 0 || len(s.index.Chunks) == 0 {
-		return 0, nil
-	}
+// getChunk returns a segment with the requested chunk ID. If selfSeed doesn't
+// have the requested chunk, nil will be returned.
+func (s *selfSeed) getChunk(id ChunkID) SeedSegment {
 	s.mu.RLock()
-	pos, ok := s.pos[chunks[0].ID]
+	pos, ok := s.pos[id]
 	s.mu.RUnlock()
 	if !ok {
-		return 0, nil
-	}
-	// From every position of b[0] in the source, find a slice of
-	// matching chunks. Then return the longest of those slices.
-	var (
-		match []IndexChunk
-		max   int
-	)
-	for _, p := range pos {
-		m := s.maxMatchFrom(chunks, p)
-		if len(m) > max {
-			match = m
-			max = len(m)
-		}
+		return nil
 	}
-	return max, newFileSeedSegment(s.file, match, s.canReflink, false)
+	first := pos[0]
+	return newFileSeedSegment(s.file, s.index.Chunks[first:first+1], s.canReflink)
 }
 
-// Returns a slice of chunks from the seed. Compares chunks from position 0
-// with seed chunks starting at p.
-func (s *selfSeed) maxMatchFrom(chunks []IndexChunk, p int) []IndexChunk {
-	if len(chunks) == 0 {
-		return nil
-	}
-	s.mu.RLock()
-	written := s.written
-	s.mu.RUnlock()
-	var (
-		sp int
-		dp = p
-	)
-	for {
-		if dp >= written || sp >= len(chunks) {
-			break
-		}
-		if chunks[sp].ID != s.index.Chunks[dp].ID {
-			break
-		}
-		dp++
-		sp++
-	}
-	return s.index.Chunks[p:dp]
+func (s *selfSeed) SetInvalid(value bool) {
+	panic("A selfSeed is never expected to be invalid")
 }
diff --git a/selfseed_test.go b/selfseed_test.go
index 55eed09..7d57ef2 100644
--- a/selfseed_test.go
+++ b/selfseed_test.go
@@ -110,7 +110,9 @@ func TestSelfSeed(t *testing.T) {
 	defer dst.Close()
 
 	// Extract the file
-	stats, err := AssembleFile(context.Background(), dst.Name(), idx, s, nil, 1, nil)
+	stats, err := AssembleFile(context.Background(), dst.Name(), idx, s, nil,
+		AssembleOptions{1, InvalidSeedActionBailOut}, nil,
+	)
 	if err != nil {
 		t.Fatal(err)
 	}
diff --git a/sequencer.go b/sequencer.go
index 7f6b95f..ebc6d99 100644
--- a/sequencer.go
+++ b/sequencer.go
@@ -1,5 +1,11 @@
 package desync
 
+import (
+	"context"
+	"golang.org/x/sync/errgroup"
+	"os"
+)
+
 // SeedSequencer is used to find sequences of chunks from seed files when assembling
 // a file from an index. Using seeds reduces the need to download and decompress chunks
 // from chunk stores. It also enables the use of reflinking/cloning of sections of
@@ -10,6 +16,16 @@ type SeedSequencer struct {
 	current int
 }
 
+// SeedSegmentCandidate represents a single segment that we expect to use
+// in a Plan.
+type SeedSegmentCandidate struct {
+	seed         Seed
+	source       SeedSegment
+	indexSegment IndexSegment
+}
+
+type Plan []SeedSegmentCandidate
+
 // NewSeedSequencer initializes a new sequencer from a number of seeds.
 func NewSeedSequencer(idx Index, src ...Seed) *SeedSequencer {
 	return &SeedSequencer{
@@ -18,11 +34,25 @@ func NewSeedSequencer(idx Index, src ...Seed) *SeedSequencer {
 	}
 }
 
+// Plan returns a new possible plan, representing an ordered list of
+// segments that can be used to re-assemble the requested file.
+func (r *SeedSequencer) Plan() (plan Plan) {
+	for {
+		seed, segment, source, done := r.Next()
+		plan = append(plan, SeedSegmentCandidate{seed, source, segment})
+		if done {
+			break
+		}
+	}
+	return plan
+}
+
 // Next returns a sequence of index chunks (from the target index) and the
 // longest matching segment from one of the seeds. If source is nil, no
 // match was found in the seeds and the chunk needs to be retrieved from a
 // store. If done is true, the sequencer is complete.
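+// The matching seed itself is also returned so that the caller can mark it as invalid.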
-func (r *SeedSequencer) Next() (segment IndexSegment, source SeedSegment, done bool) {
+func (r *SeedSequencer) Next() (seed Seed, segment IndexSegment, source SeedSegment, done bool) {
 	var (
 		max     uint64
 		advance = 1
 	)
@@ -30,6 +60,7 @@ func (r *SeedSequencer) Next() (segment IndexSegment, source SeedSegment, done b
 	for _, s := range r.seeds {
 		n, m := s.LongestMatchWith(r.index.Chunks[r.current:])
 		if n > 0 && m.Size() > max {
+			seed = s
 			source = m
 			advance = n
 			max = m.Size()
@@ -38,5 +69,73 @@ func (r *SeedSequencer) Next() (segment IndexSegment, source SeedSegment, done b
 	segment = IndexSegment{index: r.index, first: r.current, last: r.current + advance - 1}
 	r.current += advance
 
-	return segment, source, r.current >= len(r.index.Chunks)
+	return seed, segment, source, r.current >= len(r.index.Chunks)
+}
+
+// Rewind resets the sequencer back to the start of the index so that the
+// seeds can be re-evaluated and a new plan can be built.
+func (r *SeedSequencer) Rewind() {
+	r.current = 0
+}
+
+// Validate validates a proposed plan by checking if all the chosen chunks
+// are correctly provided by the seeds. If a seed is found to have invalid
+// chunks, the entire seed is marked as invalid and an error is returned.
+func (p Plan) Validate(ctx context.Context, n int) (err error) {
+	type Job struct {
+		candidate SeedSegmentCandidate
+		file      *os.File
+	}
+	var (
+		in      = make(chan Job)
+		fileMap = make(map[string]*os.File)
+	)
+	// Share a single file descriptor per seed for all the goroutines
+	for _, s := range p {
+		// We expect an empty filename when using nullSeeds
+		if s.source == nil || s.source.FileName() == "" {
+			continue
+		}
+		name := s.source.FileName()
+		if _, present := fileMap[name]; present {
+			continue
+		}
+		file, err := os.Open(name)
+		if err != nil {
+			// We were not able to open the seed. Mark it as invalid and return
+			s.seed.SetInvalid(true)
+			return err
+		}
+		fileMap[name] = file
+		defer file.Close()
+	}
+	g, ctx := errgroup.WithContext(ctx)
+	// Concurrently validate all the chunks in this plan
+	for i := 0; i < n; i++ {
+		g.Go(func() error {
+			for job := range in {
+				if err := job.candidate.source.Validate(job.file); err != nil {
+					job.candidate.seed.SetInvalid(true)
+					return err
+				}
+			}
+			return nil
+		})
+	}
+
+loop:
+	for _, s := range p {
+		if s.source == nil || s.source.FileName() == "" {
+			// This is not a fileSeed, we have nothing to validate
+			continue
+		}
+		select {
+		case <-ctx.Done():
+			break loop
+		case in <- Job{s, fileMap[s.source.FileName()]}:
+		}
+	}
+	close(in)
+
+	return g.Wait()
 }
diff --git a/verifyindex.go b/verifyindex.go
index c01793f..af4cf97 100644
--- a/verifyindex.go
+++ b/verifyindex.go
@@ -39,8 +39,8 @@ func VerifyIndex(ctx context.Context, name string, idx Index, n int, pb Progress
 		g.Go(func() error {
 			for c := range in {
 				// Reuse the fileSeedSegment structure, this is really just a seed segment after all
-				segment := newFileSeedSegment(name, c, false, false)
-				if err := segment.validate(f); err != nil {
+				segment := newFileSeedSegment(name, c, false)
+				if err := segment.Validate(f); err != nil {
 					return err
 				}
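
Reviewer note, not part of the patch: a minimal caller-side sketch of the API introduced above. The helper name is hypothetical; the index, store, and seeds are assumed to be prepared by the caller as in cmd/desync/extract.go, and N: 10 simply mirrors the value used in the tests.

	package example

	import (
		"context"
		"fmt"

		"github.com/folbricht/desync"
	)

	// extractSkippingInvalidSeeds is a hypothetical helper showing the new
	// AssembleOptions struct that replaces the old bare goroutine count.
	func extractSkippingInvalidSeeds(ctx context.Context, name string, idx desync.Index, s desync.Store, seeds []desync.Seed) error {
		opts := desync.AssembleOptions{
			N:                 10,                           // concurrent goroutines, one filehandle each
			InvalidSeedAction: desync.InvalidSeedActionSkip, // or InvalidSeedActionBailOut to fail instead
		}
		stats, err := desync.AssembleFile(ctx, name, idx, s, seeds, opts, nil) // nil progress bar, as in the tests
		if err != nil {
			return err
		}
		fmt.Printf("assembled %s using %d seeds\n", name, stats.Seeds)
		return nil
	}

With InvalidSeedActionSkip, a seed that fails validation is marked invalid, the sequencer is rewound, and a new plan is built without it; with InvalidSeedActionBailOut (the default in the extract command), the first validation error is returned to the caller.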