From d9ce6d34c18accbdc47c3bface049fb7d8686fe9 Mon Sep 17 00:00:00 2001 From: folbricht Date: Tue, 21 Aug 2018 21:31:07 -0600 Subject: [PATCH] Implement filesystem-level cloning to reduce disk usage and improve performance (#55) * Add seed/reflink support * Reimplement progress bar using a 3rd-party library * Add basic seed test * Break out the seedfile tests into a dedicated section and add more * Auto-detect FS blocksize used for reflinking * Implement self-seed to allow reflinking within the same file as it's being written * When cloning from the nullchunk seed isn't available and the target file is blank, no need to copy 0 byte ranges * First documentation for reflink/seed feature * Build ioctl Linux-only code only on Linux * Add -seed-dir option for the extract command * Add -stats option to extract command to show details about the operation * Use a fixed buffer when copying from seeds to avoid io.Copy running out of memory * Add test for self-seed * Tests should clean up temp files * Use less memory when recording write progress in the self-seed * Save CPU time, less looping, when writing self-seed * Fix chunk alignment issue. 
Only clone from self-seed if source and target chunks are aligned * Don't write stats to STDOUT --- README.md | 29 +++++- assemble.go | 171 +++++++++++++++++++--------------- assemble_test.go | 163 +++++++++++++++++++++++++++++++- chop.go | 7 ++ cmd/desync/cache.go | 6 +- cmd/desync/chop.go | 3 +- cmd/desync/extract.go | 143 +++++++++++++++++++++++----- cmd/desync/info.go | 5 +- cmd/desync/main.go | 13 ++- cmd/desync/make.go | 18 +--- cmd/desync/progressbar.go | 100 ++++---------------- cmd/desync/verifyindex.go | 6 +- copy.go | 13 ++- doc/seed.odg | Bin 0 -> 12135 bytes doc/seed.png | Bin 0 -> 18905 bytes extractstats.go | 39 ++++++++ fileseed.go | 190 ++++++++++++++++++++++++++++++++++++++ ioctl_linux.go | 65 +++++++++++++ ioctl_nonlinux.go | 16 ++++ make.go | 13 ++- nullseed.go | 136 +++++++++++++++++++++++++++ progress.go | 12 +++ progressbar.go | 8 -- seed.go | 50 ++++++++++ selfseed.go | 120 ++++++++++++++++++++++++ selfseed_test.go | 144 +++++++++++++++++++++++++++++ sequencer.go | 34 +++++++ verifyindex.go | 13 ++- 28 files changed, 1283 insertions(+), 234 deletions(-) create mode 100644 doc/seed.odg create mode 100644 doc/seed.png create mode 100644 extractstats.go create mode 100644 fileseed.go create mode 100644 ioctl_linux.go create mode 100644 ioctl_nonlinux.go create mode 100644 nullseed.go create mode 100644 progress.go delete mode 100644 progressbar.go create mode 100644 seed.go create mode 100644 selfseed.go create mode 100644 selfseed_test.go create mode 100644 sequencer.go diff --git a/README.md b/README.md index 6159556..d7031f4 100644 --- a/README.md +++ b/README.md @@ -22,6 +22,7 @@ Among the distinguishing factors: - Allows FUSE mounting of blob indexes - S3 protocol support to access chunk stores for read operations and some some commands that write chunks - Stores and retrieves index files from remote index stores such as HTTP, SFTP and S3 +- Reflinking matching blocks (rather than copying) from seed files if supported by the filesystem 
(currently only Btrfs and XFS) ## Parallel chunking One of the significant differences to casync is that desync attempts to make chunking faster by utilizing more CPU resources, chunking data in parallel. Depending on the chosen degree of concurrency, the file is split into N equal parts and each part is chunked independently. While the chunking of each part is ongoing, part1 is trying to align with part2, and part3 is trying to align with part4 and so on. Alignment is achieved once a common split point is found in the overlapping area. If a common split point is found, the process chunking the previous part stops, eg. part1 chunker stops, part2 chunker keeps going until it aligns with part3 and so on until all split points have been found. Once all split points have been determined, the file is opened again (N times) to read, compress and store the chunks. While in most cases this process achieves significantly reduced chunking times at the cost of CPU, there are edge cases where chunking is only about as fast as upstream casync (with more CPU usage). This is the case if no split points can be found in the data between min and max chunk size as is the case if most or all of the file consists of 0-bytes. In this situation, the concurrent chunking processes for each part will not align with each other and a lot of effort is wasted. The table below shows how the type of data that is being chunked can influence runtime of each operation. `make` refers to the process of chunking, while `extract` refers to re-assembly of blobs from chunks. 
@@ -31,6 +32,17 @@ Command | Mostly/All 0-bytes | Typical data make | Slow (worst-case) - Likely comparable to casync | Fast - Parallel chunking extract | Extremely fast - Effectively the speed of a truncate() syscall | Fast - Done in parallel, usually limited by I/O +## Seeds and reflinks + +Copy-on-write filesystems such as Btrfs and XFS support cloning of blocks between files in order to save disk space as well as improve extraction performance. To utilize this feature, desync uses several seeds to clone sections of files rather than reading the data from chunk-stores and copying it in place: +- A built-in seed for Null-chunks (a chunk of Max chunk size containing only 0 bytes). This can significantly reduce the disk usage of files with large 0-byte ranges, such as VM images. This will effectively turn an eager-zeroed VM disk into a sparse disk while retaining all the advantages of eager-zeroed disk images. +- A built-in Self-seed. As chunks are being written to the destination file, the file itself becomes a seed. If one chunk, or a series of chunks, is used again later in the file, it'll be cloned from the position written previously. This saves storage when the file contains several repetitive sections. +- Seed files and their indexes can be provided when extracting a file. For this feature, it's necessary to already have the index plus its blob on disk. So for example `image-v1.vmdk` and `image-v1.vmdk.caibx` can be used as seed for the extract operation of `image-v2.vmdk`. The amount of additional disk space required to store `image-v2.vmdk` will be the delta between it and `image-v1.vmdk`. + +![](doc/seed.png) + +Even if cloning is not available, seeds are still useful. `desync` automatically determines if reflinks are available (and the block size used in the filesystem). If cloning is not supported, sections are copied instead of cloned. 
Copying still improves performance and reduces the load created by retrieving chunks over the network and decompressing them. + ## Tool The tool is provided for convenience. It uses the desync library and makes most features of it available in a consistent fashion. It does not match upsteam casync's syntax exactly, but tries to be similar at least. @@ -43,7 +55,7 @@ go get -u github.com/folbricht/desync/cmd/desync ``` ### Subcommands -- `extract` - build a blob from an index file +- `extract` - build a blob from an index file, optionally using seed indexes+blobs - `verify` - verify the integrity of a local store - `list-chunks` - list all chunk IDs contained in an index file - `cache` - populate a cache from index files without extracting a blob or archive @@ -60,6 +72,8 @@ go get -u github.com/folbricht/desync/cmd/desync ### Options (not all apply to all commands) - `-s <store>` Location of the chunk store, can be local directory or a URL like ssh://hostname/path/to/store. Multiple stores can be specified, they'll be queried for chunks in the same order. The `chop`, `make`, `tar` and `prune` commands support updating chunk stores in S3, while `verify` only operates on a local store. +- `-seed <indexfile>` Specifies a seed file and index for the `extract` command. The tool expects the matching file to be present and have the same name as the index file, without the `.caibx` extension. +- `-seed-dir <dir>` Specifies a directory containing seed files and their indexes for the `extract` command. For each index file in the directory (`*.caibx`) there needs to be a matching blob without the extension. - `-c <store>` Location of a chunk store to be used as cache. Needs to be writable. - `-n <int>` Number of concurrent download jobs and ssh sessions to the chunk store. - `-r` Repair a local cache by removing invalid chunks. Only valid for the `verify` command. @@ -189,6 +203,19 @@ Use multiple stores, specify the local one first to improve performance. 
desync extract -s /some/local/store -s ssh://192.168.1.1/path/to/casync.store/ somefile.tar.caibx somefile.tar ``` +Extract version 3 of a disk image using the previous 2 versions as seed for cloning (if supported), or copying. Note, when providing a seed like `-seed <file>.ext.caibx`, it is assumed that `<file>.ext` is available next to the index file, and matches the index. +``` +desync extract -s /local/store \ + -seed image-v1.qcow2.caibx \ + -seed image-v2.qcow2.caibx \ + image-v3.qcow2.caibx image-v3.qcow2 +``` + +Extract an image using several seeds present in a directory. Each of the `.caibx` files in the directory needs to have a matching blob of the same name. It is possible for the source index file to be in the same directory also (it'll be skipped automatically). +``` +desync extract -s /local/store -seed-dir /path/to/images image-v3.qcow2.caibx image-v3.qcow2 +``` + Mix and match remote stores and use a local cache store to improve performance. ``` desync extract \ diff --git a/assemble.go b/assemble.go index b68cecc..2911f05 100644 --- a/assemble.go +++ b/assemble.go @@ -6,6 +6,7 @@ import ( "fmt" "os" "sync" + "syscall" "github.com/pkg/errors" ) @@ -18,18 +19,34 @@ import ( // confirm if the data matches what is expected and only populate areas that // differ from the expected content. This can be used to complete partly // written files. 
-func AssembleFile(ctx context.Context, name string, idx Index, s Store, n int, progress func()) error { +func AssembleFile(ctx context.Context, name string, idx Index, s Store, seeds []Seed, n int, pb ProgressBar) (*ExtractStats, error) { + type Job struct { + segment indexSegment + source SeedSegment + } var ( - wg sync.WaitGroup - mu sync.Mutex - pErr error - in = make(chan IndexChunk) - nullChunk = NewNullChunk(idx.Index.ChunkSizeMax) - isBlank bool + wg sync.WaitGroup + mu sync.Mutex + pErr error + in = make(chan Job) + isBlank bool ) ctx, cancel := context.WithCancel(ctx) defer cancel() + // Setup and start the progressbar if any + if pb != nil { + pb.SetTotal(len(idx.Chunks)) + pb.Start() + defer pb.Finish() + } + + // Initialize stats to be gathered during extraction + stats := &ExtractStats{ + BytesTotal: idx.Length(), + ChunksTotal: len(idx.Chunks), + } + // Helper function to record and deal with any errors in the goroutines recordError := func(err error) { mu.Lock() @@ -46,7 +63,7 @@ func AssembleFile(ctx context.Context, name string, idx Index, s Store, n int, p case os.IsNotExist(err): f, err := os.Create(name) if err != nil { - return err + return stats, err } f.Close() isBlank = true @@ -58,33 +75,62 @@ func AssembleFile(ctx context.Context, name string, idx Index, s Store, n int, p // confirm there's enough disk space, but it allows for an optimization // when dealing with the Null Chunk if err := os.Truncate(name, idx.Length()); err != nil { - return err + return stats, err } - // Keep a record of what's already been written to the file and can be - // re-used if there are duplicate chunks - var written fileChunks + // Determine the blocksize of the target file which is required for reflinking + blocksize := blocksizeOfFile(name) + + // Prepend a nullchunk seed to the list of seeds to make sure we read that + // before any large null sections in other seed files + ns, err := newNullChunkSeed(name, blocksize, idx.Index.ChunkSizeMax) + if err != nil 
{ + return stats, err + } + defer ns.close() + + // Start a self-seed which will become usable once chunks are written contigously + // beginning at position 0. + ss, err := newSelfSeed(name, idx) + if err != nil { + return stats, err + } + seeds = append([]Seed{ns, ss}, seeds...) + + // Record the total number of seeds and blocksize in the stats + stats.Seeds = len(seeds) + stats.Blocksize = blocksize // Start the workers, each having its own filehandle to write concurrently for i := 0; i < n; i++ { wg.Add(1) f, err := os.OpenFile(name, os.O_RDWR, 0666) if err != nil { - return fmt.Errorf("unable to open file %s, %s", name, err) + return stats, fmt.Errorf("unable to open file %s, %s", name, err) } defer f.Close() go func() { - for c := range in { - if progress != nil { - progress() + for job := range in { + if pb != nil { + pb.Add(job.segment.lengthChunks()) } - // See if we can skip the chunk retrieval and decompression if the - // null chunk is being requested. If a new file is truncated to the - // right size beforehand, there's nothing to do since everything - // defaults to 0 bytes. - if isBlank && c.ID == nullChunk.ID { + if job.source != nil { + stats.addChunksFromSeed(uint64(job.segment.lengthChunks())) + offset := job.segment.start() + length := job.segment.lengthBytes() + copied, cloned, err := job.source.WriteInto(f, offset, length, blocksize, isBlank) + if err != nil { + recordError(err) + continue + } + stats.addBytesCopied(copied) + stats.addBytesCloned(cloned) + // Record this segment's been written in the self-seed to make it + // available going forward + ss.add(job.segment) continue } + c := job.segment.chunks()[0] // If we operate on an existing file there's a good chance we already // have the data written for this chunk. Let's read it from disk and // compare to what is expected. 
@@ -96,19 +142,15 @@ func AssembleFile(ctx context.Context, name string, idx Index, s Store, n int, p } sum := sha512.Sum512_256(b) if sum == c.ID { - written.add(c) + // Record this chunk's been written in the self-seed + ss.add(job.segment) + // Record we kept this chunk in the file (when using in-place extract) + stats.incChunksInPlace() continue } } - // Before pulling a chunk from the store, let's see if that same chunk's - // been written to the file already. If so, we can simply clone it from - // that location. - if cw, ok := written.get(c.ID); ok { - if err := cloneInFile(f, c, cw); err != nil { - recordError(err) - } - continue - } + // Record this chunk having been pulled from the store + stats.incChunksFromStore() // Pull the (compressed) chunk from the store b, err := s.GetChunk(c.ID) if err != nil { @@ -141,70 +183,45 @@ func AssembleFile(ctx context.Context, name string, idx Index, s Store, n int, p recordError(err) continue } - // Make a record of this chunk being available in the file now - written.add(c) + // Record this chunk's been written in the self-seed + ss.add(job.segment) } wg.Done() }() } - // Feed the workers, stop if there are any errors + // Let the sequencer break up the index into segments, feed the workers, and + // stop if there are any errors + seq := NewSeedSequencer(idx, seeds...) loop: - for _, c := range idx.Chunks { + for { // See if we're meant to stop select { case <-ctx.Done(): break loop default: } - in <- c + chunks, from, done := seq.Next() + in <- Job{chunks, from} + if done { + break + } } close(in) wg.Wait() - return pErr -} - -// fileChunks acts as a kind of in-file cache for chunks already written to -// the file being assembled. Every chunk ref that has been successfully written -// into the file is added to it. 
If another write operation requires the same -// (duplicate) chunk again, it can just copied out of the file to the new -// position, rather than requesting it from a (possibly remote) store again -// and decompressing it. -type fileChunks struct { - mu sync.RWMutex - chunks map[ChunkID]IndexChunk -} - -func (f *fileChunks) add(c IndexChunk) { - f.mu.Lock() - defer f.mu.Unlock() - if len(f.chunks) == 0 { - f.chunks = make(map[ChunkID]IndexChunk) - } - f.chunks[c.ID] = c -} - -func (f *fileChunks) get(id ChunkID) (IndexChunk, bool) { - f.mu.RLock() - defer f.mu.RUnlock() - c, ok := f.chunks[id] - return c, ok + return stats, pErr } -// cloneInFile copies a chunk from one position to another in the same file. -// Used when duplicate chunks are used in a file. TODO: The current implementation -// uses just the one given filehandle, copies into memory, then writes to disk. -// It may be more efficient to open a 2nd filehandle, seek, and copy directly -// with a io.LimitReader. -func cloneInFile(f *os.File, dst, src IndexChunk) error { - if src.ID != dst.ID || src.Size != dst.Size { - return errors.New("internal error: different chunks requested for in-file copy") +func blocksizeOfFile(name string) uint64 { + stat, err := os.Stat(name) + if err != nil { + return DefaultBlockSize } - b := make([]byte, int64(src.Size)) - if _, err := f.ReadAt(b, int64(src.Start)); err != nil { - return err + switch sys := stat.Sys().(type) { + case *syscall.Stat_t: + return uint64(sys.Blksize) + default: + return DefaultBlockSize } - _, err := f.WriteAt(b, int64(dst.Start)) - return err } diff --git a/assemble_test.go b/assemble_test.go index 8a321a7..9bfdc35 100644 --- a/assemble_test.go +++ b/assemble_test.go @@ -4,6 +4,7 @@ import ( "bytes" "context" "crypto/md5" + "crypto/rand" "io" "io/ioutil" "os" @@ -76,7 +77,7 @@ func TestExtract(t *testing.T) { if err != nil { t.Fatal(err) } - os.RemoveAll(out1.Name()) + os.Remove(out1.Name()) // This one is a complete file matching what we 
exepct at the end out2, err := ioutil.TempFile("", "out2") @@ -87,7 +88,7 @@ func TestExtract(t *testing.T) { t.Fatal(err) } out2.Close() - defer os.RemoveAll(out2.Name()) + defer os.Remove(out2.Name()) // Incomplete or damaged file that has most but not all data out3, err := ioutil.TempFile("", "out3") @@ -101,7 +102,7 @@ func TestExtract(t *testing.T) { t.Fatal(err) } out3.Close() - defer os.RemoveAll(out3.Name()) + defer os.Remove(out3.Name()) // At this point we have the data needed for the test setup // in - Temp file that represents the original input file @@ -112,10 +113,11 @@ func TestExtract(t *testing.T) { // out1 - Just a non-existing file that gets assembled // out2 - The output file already fully complete, no GetChunk should be needed // out3 - Partial/damaged file with most, but not all data correct - + // seedIndex + seedFile - Seed file to help assemble the input tests := map[string]struct { outfile string store Store + seed []Seed }{ "extract to new file": {outfile: out1.Name(), store: s}, "extract to complete file": {outfile: out2.Name(), store: bs}, @@ -124,7 +126,8 @@ func TestExtract(t *testing.T) { for name, test := range tests { t.Run(name, func(t *testing.T) { - if err := AssembleFile(context.Background(), test.outfile, index, test.store, 10, nil); err != nil { + defer os.Remove(test.outfile) + if _, err := AssembleFile(context.Background(), test.outfile, index, test.store, nil, 10, nil); err != nil { t.Fatal(err) } b, err := ioutil.ReadFile(test.outfile) @@ -138,3 +141,153 @@ func TestExtract(t *testing.T) { }) } } + +func TestSeed(t *testing.T) { + // Prepare different types of data slices that'll be used to assemble target + // and seed files with varying amount of duplication + data1, err := ioutil.ReadFile("testdata/chunker.input") + if err != nil { + t.Fatal(err) + } + null := make([]byte, 4*ChunkSizeMaxDefault) + rand1 := make([]byte, 4*ChunkSizeMaxDefault) + rand.Read(rand1) + rand2 := make([]byte, 4*ChunkSizeMaxDefault) + 
rand.Read(rand2) + + // Setup a temporary store + store, err := ioutil.TempDir("", "store") + if err != nil { + t.Fatal(err) + } + defer os.RemoveAll(store) + + s, err := NewLocalStore(store) + if err != nil { + t.Fatal(err) + } + + // Define tests with files with different content, by building files out + // of sets of byte slices to create duplication or not between the target and + // its seeds + tests := map[string]struct { + target [][]byte + seeds [][][]byte + }{ + "extract without seed": { + target: [][]byte{rand1, rand2}, + seeds: nil}, + "extract all null file": { + target: [][]byte{null, null, null, null, null}, + seeds: nil}, + "extract repetitive file": { + target: [][]byte{data1, data1, data1, data1, data1}, + seeds: nil}, + "extract with single file seed": { + target: [][]byte{data1, null, null, rand1, null}, + seeds: [][][]byte{ + {data1, null, rand2, rand2, data1}, + }, + }, + "extract with multiple file seeds": { + target: [][]byte{null, null, rand1, null, data1}, + seeds: [][][]byte{ + {rand2, null, rand2, rand2, data1}, + {data1, null, rand2, rand2, data1}, + {rand2}, + }, + }, + "extract with identical file seed": { + target: [][]byte{data1, null, rand1, null, data1}, + seeds: [][][]byte{ + {data1, null, rand1, null, data1}, + }, + }, + } + + for name, test := range tests { + t.Run(name, func(t *testing.T) { + // Build the destination file so we can chunk it + dst, err := ioutil.TempFile("", "dst") + if err != nil { + t.Fatal(err) + } + dstBytes := join(test.target...) 
+ if _, err := io.Copy(dst, bytes.NewReader(dstBytes)); err != nil { + t.Fatal(err) + } + dst.Close() + defer os.Remove(dst.Name()) + + // Record the checksum of the target file, used to compare to the output later + dstSum := md5.Sum(dstBytes) + + // Chunk the file to get an index + dstIndex, _, err := IndexFromFile( + context.Background(), + dst.Name(), + 10, + ChunkSizeMinDefault, ChunkSizeAvgDefault, ChunkSizeMaxDefault, + nil, + ) + if err != nil { + t.Fatal(err) + } + + // Chop up the input file into the store + if err := ChopFile(context.Background(), dst.Name(), dstIndex.Chunks, s, 10, nil); err != nil { + t.Fatal(err) + } + + // Build the seed files and indexes then populate the array of seeds + var seeds []Seed + for _, f := range test.seeds { + seedFile, err := ioutil.TempFile("", "seed") + if err != nil { + t.Fatal(err) + } + if _, err := io.Copy(seedFile, bytes.NewReader(join(f...))); err != nil { + t.Fatal(err) + } + seedFile.Close() + defer os.Remove(seedFile.Name()) + seedIndex, _, err := IndexFromFile( + context.Background(), + seedFile.Name(), + 10, + ChunkSizeMinDefault, ChunkSizeAvgDefault, ChunkSizeMaxDefault, + nil, + ) + if err != nil { + t.Fatal(err) + } + seed, err := NewIndexSeed(dst.Name(), seedFile.Name(), seedIndex) + if err != nil { + t.Fatal(err) + } + seeds = append(seeds, seed) + } + + if _, err := AssembleFile(context.Background(), dst.Name(), dstIndex, s, seeds, 10, nil); err != nil { + t.Fatal(err) + } + b, err := ioutil.ReadFile(dst.Name()) + if err != nil { + t.Fatal(err) + } + outSum := md5.Sum(b) + if dstSum != outSum { + t.Fatal("checksum of extracted file doesn't match expected") + } + }) + } + +} + +func join(slices ...[]byte) []byte { + var out []byte + for _, b := range slices { + out = append(out, b...) 
+ } + return out +} diff --git a/chop.go b/chop.go index 9752fe1..d580c5a 100644 --- a/chop.go +++ b/chop.go @@ -22,6 +22,13 @@ func ChopFile(ctx context.Context, name string, chunks []IndexChunk, ws WriteSto ctx, cancel := context.WithCancel(ctx) defer cancel() + // Setup and start the progressbar if any + if pb != nil { + pb.SetTotal(len(chunks)) + pb.Start() + defer pb.Finish() + } + // Helper function to record and deal with any errors in the goroutines recordError := func(err error) { mu.Lock() diff --git a/cmd/desync/cache.go b/cmd/desync/cache.go index d1546db..bbd3d52 100644 --- a/cmd/desync/cache.go +++ b/cmd/desync/cache.go @@ -93,10 +93,8 @@ func cache(ctx context.Context, args []string) error { defer dst.Close() // If this is a terminal, we want a progress bar - p := NewProgressBar(len(ids), "") - p.Start() - defer p.Stop() + pb := NewProgressBar("") // Pull all the chunks, and load them into the cache in the process - return desync.Copy(ctx, ids, s, dst, n, func() { p.Add(1) }) + return desync.Copy(ctx, ids, s, dst, n, pb) } diff --git a/cmd/desync/chop.go b/cmd/desync/chop.go index 3ae25d1..1705c5a 100644 --- a/cmd/desync/chop.go +++ b/cmd/desync/chop.go @@ -68,7 +68,8 @@ func chop(ctx context.Context, args []string) error { } // If this is a terminal, we want a progress bar - pb := NewProgressBar(len(c.Chunks), "") + pb := NewProgressBar("") + // Chop up the file into chunks and store them in the target store return desync.ChopFile(ctx, dataFile, c.Chunks, s, n, pb) } diff --git a/cmd/desync/extract.go b/cmd/desync/extract.go index e83f19b..0766fca 100644 --- a/cmd/desync/extract.go +++ b/cmd/desync/extract.go @@ -7,6 +7,7 @@ import ( "fmt" "os" "path/filepath" + "strings" "github.com/folbricht/desync" "github.com/folbricht/tempfile" @@ -17,19 +18,25 @@ const extractUsage = `desync extract [options] Reads an index and builds a blob reading chunks from one or more chunk stores. 
When using -k, the blob will be extracted in-place utilizing existing data and the target file will not be deleted on error. This can be used to restart a -failed prior extraction without having to retrieve completed chunks again. Use -'-' to read the index from STDIN. -` +failed prior extraction without having to retrieve completed chunks again. +Muptiple optional seed indexes can be given with -seed. The matching blob needs +to have the same name as the indexfile without the .caibx extension. If several +seed files and indexes are available, the -seed-dir option can be used to +automatically select call .caibx files in a directory as seeds. Use '-' to read +the index from STDIN.` func extract(ctx context.Context, args []string) error { var ( - cacheLocation string - n int - err error - storeLocations = new(multiArg) - clientCert string - clientKey string - inPlace bool + cacheLocation string + n int + err error + storeLocations = new(multiArg) + seedLocations = new(multiArg) + seedDirLocations = new(multiArg) + clientCert string + clientKey string + inPlace bool + printStats bool ) flags := flag.NewFlagSet("extract", flag.ExitOnError) flags.Usage = func() { @@ -38,12 +45,15 @@ func extract(ctx context.Context, args []string) error { } flags.Var(storeLocations, "s", "casync store location, can be multiples") + flags.Var(seedLocations, "seed", "seed indexes, can be multiples") + flags.Var(seedDirLocations, "seed-dir", "directory with seed index files, can be multiples") flags.StringVar(&cacheLocation, "c", "", "use local store as cache") flags.IntVar(&n, "n", 10, "number of goroutines") flags.BoolVar(&desync.TrustInsecure, "t", false, "trust invalid certificates") flags.StringVar(&clientCert, "clientCert", "", "Path to Client Certificate for TLS authentication") flags.StringVar(&clientKey, "clientKey", "", "Path to Client Key for TLS authentication") flags.BoolVar(&inPlace, "k", false, "extract the file in place and keep it in case of error") + 
flags.BoolVar(&printStats, "stats", false, "Print statistics in JSON format") flags.Parse(args) if flags.NArg() < 2 { @@ -87,38 +97,125 @@ func extract(ctx context.Context, args []string) error { return err } + // Build a list of seeds if any were given in the command line + seeds, err := readSeeds(outFile, seedLocations.list, opts) + if err != nil { + return err + } + + // Expand the list of seeds with all found in provided directories + dSeeds, err := readSeedDirs(outFile, inFile, seedDirLocations.list, opts) + if err != nil { + return err + } + seeds = append(seeds, dSeeds...) + + var stats *desync.ExtractStats if inPlace { - return writeInplace(ctx, outFile, idx, s, n) + stats, err = writeInplace(ctx, outFile, idx, s, seeds, n) + } else { + stats, err = writeWithTmpFile(ctx, outFile, idx, s, seeds, n) + } + if err != nil { + return err } - return writeWithTmpFile(ctx, outFile, idx, s, n) + if printStats { + return printJSON(stats) + } + return nil } -func writeWithTmpFile(ctx context.Context, name string, idx desync.Index, s desync.Store, n int) error { +func writeWithTmpFile(ctx context.Context, name string, idx desync.Index, s desync.Store, seeds []desync.Seed, n int) (*desync.ExtractStats, error) { // Prepare a tempfile that'll hold the output during processing. Close it, we // just need the name here since it'll be opened multiple times during write. // Also make sure it gets removed regardless of any errors below. 
+ var stats *desync.ExtractStats tmp, err := tempfile.NewMode(filepath.Dir(name), "."+filepath.Base(name), 0644) if err != nil { - return err + return stats, err } tmp.Close() defer os.Remove(tmp.Name()) // Build the blob from the chunks, writing everything into the tempfile - if err = writeInplace(ctx, tmp.Name(), idx, s, n); err != nil { - return err + if stats, err = writeInplace(ctx, tmp.Name(), idx, s, seeds, n); err != nil { + return stats, err } // Rename the tempfile to the output file - return os.Rename(tmp.Name(), name) + return stats, os.Rename(tmp.Name(), name) } -func writeInplace(ctx context.Context, name string, idx desync.Index, s desync.Store, n int) error { - // If this is a terminal, we want a progress bar - p := NewProgressBar(len(idx.Chunks), "") - p.Start() - defer p.Stop() +func writeInplace(ctx context.Context, name string, idx desync.Index, s desync.Store, seeds []desync.Seed, n int) (*desync.ExtractStats, error) { + pb := NewProgressBar("") // Build the blob from the chunks, writing everything into given filename - return desync.AssembleFile(ctx, name, idx, s, n, func() { p.Add(1) }) + return desync.AssembleFile(ctx, name, idx, s, seeds, n, pb) +} + +func readSeeds(dstFile string, locations []string, opts storeOptions) ([]desync.Seed, error) { + var seeds []desync.Seed + for _, srcIndexFile := range locations { + srcIndex, err := readCaibxFile(srcIndexFile, opts) + if err != nil { + return nil, err + } + srcFile := strings.TrimSuffix(srcIndexFile, ".caibx") + + seed, err := desync.NewIndexSeed(dstFile, srcFile, srcIndex) + if err != nil { + return nil, err + } + seeds = append(seeds, seed) + } + return seeds, nil +} + +func readSeedDirs(dstFile, dstIdxFile string, dirs []string, opts storeOptions) ([]desync.Seed, error) { + var seeds []desync.Seed + absIn, err := filepath.Abs(dstIdxFile) + if err != nil { + return nil, err + } + for _, dir := range dirs { + err := filepath.Walk(dir, func(path string, info os.FileInfo, err error) error { + 
if err != nil { + return err + } + if info.IsDir() { + return nil + } + if filepath.Ext(path) != ".caibx" { + return nil + } + abs, err := filepath.Abs(path) + if err != nil { + return err + } + // The index we're trying to extract may be in the same dir, skip it + if abs == absIn { + return nil + } + // Expect the blob to be there next to the index file, skip the index if not + srcFile := strings.TrimSuffix(path, ".caibx") + if _, err := os.Stat(srcFile); err != nil { + return nil + } + // Read the index and add it to the list of seeds + srcIndex, err := readCaibxFile(path, opts) + if err != nil { + return err + } + seed, err := desync.NewIndexSeed(dstFile, srcFile, srcIndex) + if err != nil { + return err + } + seeds = append(seeds, seed) + return nil + }) + if err != nil { + return nil, err + } + } + return seeds, nil } diff --git a/cmd/desync/info.go b/cmd/desync/info.go index 9463d56..5739a6d 100644 --- a/cmd/desync/info.go +++ b/cmd/desync/info.go @@ -2,7 +2,6 @@ package main import ( "context" - "encoding/json" "errors" "flag" "fmt" @@ -115,11 +114,9 @@ func info(ctx context.Context, args []string) error { } if showJSON { - b, err := json.MarshalIndent(results, "", " ") - if err != nil { + if err := printJSON(results); err != nil { return err } - fmt.Println(string(b)) } else { fmt.Println("Blob size:", results.Size) fmt.Println("Total chunks:", results.Total) diff --git a/cmd/desync/main.go b/cmd/desync/main.go index 63d6e8f..1cf7b73 100644 --- a/cmd/desync/main.go +++ b/cmd/desync/main.go @@ -2,12 +2,14 @@ package main import ( "context" - "errors" + "encoding/json" "flag" "fmt" "os" "os/signal" "syscall" + + "github.com/pkg/errors" ) const usage = `desync [options] @@ -100,6 +102,15 @@ func help(ctx context.Context, args []string) error { return nil } +func printJSON(v interface{}) error { + b, err := json.MarshalIndent(v, "", " ") + if err != nil { + return err + } + fmt.Fprintln(os.Stderr, string(b)) + return nil +} + func die(err error) { 
fmt.Fprintln(os.Stderr, err) os.Exit(1) diff --git a/cmd/desync/make.go b/cmd/desync/make.go index 6789b4a..bae650a 100644 --- a/cmd/desync/make.go +++ b/cmd/desync/make.go @@ -73,29 +73,19 @@ func makeCmd(ctx context.Context, args []string) error { defer s.Close() } - // Progress bar based on file size for the chunking step - stat, err := os.Stat(dataFile) - if err != nil { - return err - } - pc := NewProgressBar(int(stat.Size()), "Chunking ") - // Split up the file and create and index from it - pc.Start() - index, stats, err := desync.IndexFromFile(ctx, dataFile, n, min, avg, max, func(v uint64) { pc.Set(int(v)) }) + pb := NewProgressBar("Chunking ") + index, stats, err := desync.IndexFromFile(ctx, dataFile, n, min, avg, max, pb) if err != nil { return err } - pc.Stop() // Chop up the file into chunks and store them in the target store if a store was given if s != nil { - ps := NewProgressBar(len(index.Chunks), "Storing ") - ps.Start() - if err := desync.ChopFile(ctx, dataFile, index.Chunks, s, n, ps); err != nil { + pb := NewProgressBar("Storing ") + if err := desync.ChopFile(ctx, dataFile, index.Chunks, s, n, pb); err != nil { return err } - ps.Stop() } fmt.Fprintln(os.Stderr, "Chunks produced:", stats.ChunksAccepted) diff --git a/cmd/desync/progressbar.go b/cmd/desync/progressbar.go index 9fce104..ddeca3a 100644 --- a/cmd/desync/progressbar.go +++ b/cmd/desync/progressbar.go @@ -1,103 +1,37 @@ package main import ( - "fmt" "os" - "strings" - "sync" - "time" "github.com/folbricht/desync" "golang.org/x/crypto/ssh/terminal" + pb "gopkg.in/cheggaaa/pb.v1" ) -type ConsoleProgressBar struct { - prefix string - mu sync.Mutex - done chan (struct{}) - total int - counter int - fd int -} - -func NewProgressBar(total int, prefix string) desync.ProgressBar { +// NewProgressBar initializes a wrapper for a https://github.com/cheggaaa/pb +// progressbar that implements desync.ProgressBar +func NewProgressBar(prefix string) desync.ProgressBar { if 
!terminal.IsTerminal(int(os.Stderr.Fd())) { - return NullProgressBar{} + return nil } - return &ConsoleProgressBar{prefix: prefix, total: total, done: make(chan (struct{}))} + bar := pb.New(0).Prefix(prefix) + bar.ShowCounters = false + bar.Output = os.Stderr + return ProgressBar{bar} } -func (p *ConsoleProgressBar) Add(n int) { - if p == nil { - return - } - p.mu.Lock() - defer p.mu.Unlock() - p.counter += n - if p.counter > p.total { - p.counter = p.total - } +type ProgressBar struct { + *pb.ProgressBar } -func (p *ConsoleProgressBar) Set(n int) { - if p == nil { - return - } - p.mu.Lock() - defer p.mu.Unlock() - p.counter = n - if p.counter > p.total { - p.counter = p.total - } +func (p ProgressBar) SetTotal(total int) { + p.ProgressBar.SetTotal(total) } -func (p *ConsoleProgressBar) Start() { - if p == nil { - return - } - ticker := time.NewTicker(time.Millisecond * 500) - go func() { - loop: - for { - select { - case <-p.done: - break loop - case <-ticker.C: - p.draw() - } - } - }() +func (p ProgressBar) Start() { + p.ProgressBar.Start() } -func (p *ConsoleProgressBar) Stop() { - if p == nil { - return - } - p.draw() - close(p.done) +func (p ProgressBar) Set(current int) { + p.ProgressBar.Set(current) } - -func (p *ConsoleProgressBar) draw() { - p.mu.Lock() - defer p.mu.Unlock() - width, _, err := terminal.GetSize(int(os.Stderr.Fd())) - if err != nil || width <= len(p.prefix)+2 { // Is that a terminal and big enough? 
- return - } - progress := (width - len(p.prefix) - 2) * p.counter / p.total - blank := width - len(p.prefix) - 2 - progress - if progress < 0 || blank < 0 { // No need to panic if anything's off - return - } - fmt.Fprintf(os.Stderr, "\r%s|%s%s|", p.prefix, strings.Repeat("=", progress), strings.Repeat(" ", blank)) -} - -type NullProgressBar struct{} - -func (p NullProgressBar) Add(n int) {} - -func (p NullProgressBar) Set(n int) {} - -func (p NullProgressBar) Start() {} - -func (p NullProgressBar) Stop() {} diff --git a/cmd/desync/verifyindex.go b/cmd/desync/verifyindex.go index 55f91ff..dfa1bf4 100644 --- a/cmd/desync/verifyindex.go +++ b/cmd/desync/verifyindex.go @@ -55,10 +55,8 @@ func verifyIndex(ctx context.Context, args []string) error { } // If this is a terminal, we want a progress bar - p := NewProgressBar(len(idx.Chunks), "") - p.Start() - defer p.Stop() + pb := NewProgressBar("") // Chop up the file into chunks and store them in the target store - return desync.VerifyIndex(ctx, dataFile, idx, n, func() { p.Add(1) }) + return desync.VerifyIndex(ctx, dataFile, idx, n, pb) } diff --git a/copy.go b/copy.go index 4a87c7c..38d0427 100644 --- a/copy.go +++ b/copy.go @@ -9,7 +9,7 @@ import ( // not already present in the dst store. The goal is to load chunks from remote // store to populate a cache. If progress is provided, it'll be called when a // chunk has been processed. Used to draw a progress bar, can be nil. 
-func Copy(ctx context.Context, ids []ChunkID, src Store, dst WriteStore, n int, progress func()) error { +func Copy(ctx context.Context, ids []ChunkID, src Store, dst WriteStore, n int, pb ProgressBar) error { var ( wg sync.WaitGroup in = make(chan ChunkID) @@ -19,6 +19,13 @@ func Copy(ctx context.Context, ids []ChunkID, src Store, dst WriteStore, n int, ctx, cancel := context.WithCancel(ctx) defer cancel() + // Setup and start the progressbar if any + if pb != nil { + pb.SetTotal(len(ids)) + pb.Start() + defer pb.Finish() + } + // Helper function to record and deal with any errors in the goroutines recordError := func(err error) { mu.Lock() @@ -44,8 +51,8 @@ func Copy(ctx context.Context, ids []ChunkID, src Store, dst WriteStore, n int, recordError(err) } } - if progress != nil { - progress() + if pb != nil { + pb.Increment() } } wg.Done() diff --git a/doc/seed.odg b/doc/seed.odg new file mode 100644 index 0000000000000000000000000000000000000000..deab78e564be2c61f2a6c0a8357ef24d013c037e GIT binary patch literal 12135 zcmch7by!@8=5yN5GoD-~F=#2)s*16azCVGc zFfxiL;JsgVIOe|T?tQAZYIUiI0MtNm33hD6(ok$>L$o)nJR&Tr>%v_<(os>$PJG>( zyr*xh=Typn>revGmMHpN9Gm1I5zl~Hkst{^uxz3Sb~MJ2bW;oVL)j5bKsOVF2nLm> z57)TEZVi0=1Z%~C(*_`1c==SSQxIS8iymcgV`^%ZrA)@Q$;iwM;#LE$aU?qD9eRMSG+!F{iD5B;aZD{xgnc1-}}I>PgcmDlmLBUEWOyimRNkGjYbTbb-s9-C*Xfl`#U5eYmZO0CwnRODPpHFR=J zVdr^1>e!IEn{e!dQuvU26f3KAk+z^%b7ey*RU6Q!8kIWb`~8?3C$P_Mleh8hc@xO< zw#s5JpB2_{JXT|)jPgq*s%IYuosgzLAf-^2`QUyKxwO|hCiN}DRUOp1Z%qpa922@v zFbVL6K9+ON3cRSWX5>}-xieH;qkHqFeV9ihd+VbL%D9bG`E<3943L3_DNbQ!Y6S%W zp@skf`R9xyyyo7)*wNA4+SK7^)|WJOY}UD6v#zU{tzj-sEB!gcS&%+g5LOIwNZsdr zyNBV6(_z^W8&kX25y%n}c|5EUyC$Kgzy*GgJ01w zalzCY5_%nvBliW`xBCX9I_2#t(0y1AT8#pQgznL@25EDgGUp%NrS** zOn?D9TQ$ZE4EgQ|?BEGUg)l34gb5K0Ue!vU7^UWiau>OzK82R8?dg;u#1B~pRh|a> z7*;G9CHi7z5N1Ff$s~6(hHjtV`@MHs($C^m!PL=3-2)`44F%1_a&nDM3Qrc;QW6)Lv=QW*1R9Ht~hkIY~!PIJLHMKR#$p_G%ug1+ecrf$-79$$cr z8Y8aHiQJ;3hJ1bp?il6{Od-e5Do|h&&GC1Q3S5|7{S}K$pyYQO%7NI`#~M+gc@+ 
z;71a6BB?F!BlL!=q$xfks)o3na_;8={+tmr~2ikn%shhzr%#alj*hp_d z^(qx`g58c899djlx0p*x=<{m%m9G+F%6(yK`cKIzdV%&Vs= zOq3}!#xh)hO zx@~S4&BOONRv(4N4Hi35NbYg9f|G7ihj^j2YCe4zew)h11U^dSZ*hZu&(*=~sy?oj zeH_!I_6=Ri6fkZWuDBEq>B73Ba@4JLDRmr9|PqD8!PNqDcJq-!6{#U2=zA}^?e zb2x&6T#B%7pb%JX%APz8OVi;gPnzL1TwQDH#T3c{wud1{>Igp^kFXTB$}u0vEmKVEs!Kq99h^K~rE+caNpqXZaVZH={LdRM76s@EQ@yS&rW90t8 z&1svX(+`q)JGYFbrOf^=9Kt;rTG85Vf0oW=16ZoW6id-U>&GA4SpfZf_W(25_C#ws z5L}Q^*^oI>W&FHcY3(&v?|nO}$i8yS&)zj72b_vAZ;)m7d|G5G4=sNd;L({n^blJt z)#?*jNmn;F1{K$h82qAmxKn7a$B7BYS>`V0~!6N&hD%l0o$ z!iz+G+U+j;7dT}Ly`iSCje0BGq?XLo(Rxaa&p{gvYi8+qq`>9m996__+g$j-9xn0Dg`9)lf-s_HCjruAj|BbNqjQR}Ws6~N0 zk^VSe2Ppr5Exc2|py8~X#VHznk_i9olaeLA3|w`HwieMOgt_Z%7Lg%qH}K6nqhRK+ zMvUtGjK!9$hs|{_6t!pQpOX)#5;&9y6bOhC#lKBH@UN4Pp^dfU&&A2>+;gO(5tloF z>UC2|JXcM0xLUNHSm@E+a(B`DX{cV3C1_;*&_<(#IDTZe2a4_?WDcR zs}j1$w@!&xn6a0)w^aNz2I*yFdK!tM9<;t0Vb-;BVA5Of(BM#--qV1hN|IZJuy_=G z3`l%KJs%_!j}X*1c9^Kf2`kMEo5{dDq7Popi}>LF9Y+fE}0f*c=HEbP; z^G&a@n+T8A&F*ac@`n8V!Bfk8MXc>^J&YAN4dof9e_@Jp?V}yOdJbC9$7*G}Y`Rh< zuQ7nYF}b(wBLX=>Q*F{>HF%*gFH(zOx1(Se2ADfBg)Th_nCN^rNPcSziTu)X_RH1t z5~XQ`a0|sXGCI6hyos|4XjhJLf70U{t=KtsJFngK#vh^3Mrhh3~H+1uAj&28Ut-3n7VL3duoGYUMp!Hr)V zf?y(y+?IS*@eH1NP06(ymQp4^s0kh?BA=v;rYEwo z=uJolK%d_jYsWoQtCZmK;;+DD9uaqn$wx1@kKPoPqzsRTF1kGO5ZO3md^nCH&}a1J zjETiy9Npo0M!u$ye`ys-;yl+jG!tel0|fYXR4{~!@K^wVk0YIFo&FiO)AxC6Icsm* zglbT`9AIw1?|YwjoCQn}MnPgGci2SMqGK!-fr_OU6E0GD$v19iB$v5iLe5*U*IS^2 zbQ{dKJ;qyH`B0n-Dd0WvvFhGZ7oPkOngA*2(-#-E&9jf6* z!nYE)#pRa}3(ICien=nok@F6;I%HuRgG||=F39;RJJ6?o3g^i!B|4&! 
zjd*25yf~z$AWGD9Y}JeM9p@5Fw715W2<;HPF`qWy6j1tcoahPd0qJzs#CGgOaRX=3 zWFkgyA^GcF(*xHuRBb5JwTg@iw+1OLQfjOsZ_rE1PQkEnLUL(8pqEdztJ3ndcg*m49SaQWfWYY z3)XcMd#(4+Fho{Y?P;HlTxcOb;O}&%YN`^EIhY1_2v6-B?Y{(Vf!M~eho$CeR4K1S z-oG0>st4hFv(#<6IPd=hNBugw@eoFK2T(L3v`H}#a?o&HWhZ%J(lJ@q)xZ%&8b}{o z0jDK;dI7gUgMd#BL@ zIZWGw!C!^t%EPmV_X=(05+aA04UT5P5f%(k4~>5JR(M;LBu_2~1><|EsALwx+bx8Cnw9O3}X;~LkOPMxgRZN26InN6DutLg{l}WZ3 zUXc{aa5td*n2;0cRuhfHR6o-vVfU~p&HK}Q0Ds-Vj|HDiT(|9%)tPIUOY@NuxTPYe zm-p8Y*Q{<5FfXBv8eXgg-@s82#prFmO>>qY6eUEEy+LJ1Oxzjo@ui6dvIOc~HV@wx zGu6tSVQGn|%t4Z(%gdbzywO7s^Y1a^FmOR&HbarwlFcW8$kSAXd^g6?yG@#B=`mHz zH&@E%3h+oZn<%7osVf|$$mJ88o(?)!iKb=D@>dxM&0%+=?R%p}g`}9ss|D76xEH2edBD zN9iC?DFsSe83N?1@Erye{7Tz~XCs+qW=2R2`4Gz&*!ZhB1m})Oqp*9Yxr1S;CY6sB zEANLOE6KN%#Fkn|AWy;kw9xlCLAP*7SX7~3uZL>pAz{wwl4I8KL1rnH0Qq=HY4 z0B#$g1egklWgn+t&|ab%no`k94Jz7^(HwFrd(u#1pz3q451Qqh}6&S zfH**~PEVcAnh6Fs2guHe=lXc@DZ2f|11(O1kDF(US3q!SZL_2LFZWfi>hyzB9SSIZlBjV~w%J-4%Nfc#c56WAfg<2{3jQAH};$O$22YmJLTy$>W_~ z9-a6#Jj|IxozKQB5Q3xinRgsp`}jPy$fATpFw;z6jy_YHX~+&^jkZ^+@a>Jj`-LP3 zCRW>qLGY3(Fr9cIf=j1-NGLqLZrtxmZ08XIG(1{>ku>#AOxV{Y7R$YH5EyeKg(;qf z0e3q-N94`B#3S>pG4JZ{BRe0VdeV`O={-$<*q zRLhCUrOo;RRA^Iw0E90yOaLQxzK=v&Mq)}I8aE7i zz+AKxd>;bOm_gX#9%t!@!0H1JerYCBCKvIe>Ft~J0;PII>5rlJFnX#<{V3>({cw@a z#C%HdKL!<-)F#cp$Bcy=F_EU+^`>E7W{((5R4zPv^W>T~p|$QSH`}3EgFi%ilRBW~ zrZ5nG>#WTJKemr{gN6PPFVEq(=fz)7PR!Mj?WD7(5L}t(a!!z*TsV%C_n|4x_ zt`3^Py|PY(G$)n0Ioo|d}O^80M>r%pt;mpG!0?{UyJ`06IRc$ysIcS0 zyAobs5Y6Z2f7l3YJ@vE(>qIjcA0u*|`89PHr`rrG%4v_{o{yw4xbSl4mTFGnao%vG z$763-bIxNtKeu!m6C2C=!lFMIh6JnU#xGe+gBs{_q~tR9EMl*(sR|g&^4ZayovOoX zu_$PeJi;+YDxt`tF@FtWty~+iD0u?oN;0zd*LKnbwPn$EWoMAr{c|lI|`K2C?+=CA2-o4RjVdWmu6r9 z5mDpT00}F0p!s0tS7CPf`Je$_<;^6gQ28AKBP}h~i&SoF#7El_D~GBt#I9;gCPKQGe3F|4rGx7I0rr5gTh0b5ke#KL}t4 z7DiheTPNFB*4K&gzqr5O`QiV@Pv6kc7-;+oV`Kj>p1-Tn!BOAQ$-zMXf9vOHV*|9+ zw>Ad;OMTeCiS%dV{=K%}wa?bx#?;=};r|gQ(QoQAx6(H?c3>1XceK*Cb@*Qf`?Hb% z9_;U$Xacm+cQpRb4i(Q$lwx^h>VKX;y;RJctPHI6&4CV#j(-*zY^_a00rC=vaJX=< 
zUqO_T6jge?_JM$ag1~^kmR9`l>|bvh0J17#@Q6qda42}>%(Sn@r^v19EYE2v!)+za zV=MA%;!-j)GUD>;s;a8m+S=m6u9AYzUK$QYMn;Z~j-H;Ld^$eDQhuTeA<|Ml3L25J zn$faWaT=CB+UAkEW+^I8De7)nD(-ojp1Hd2Wd;V(dNzqxz(h}z5J#INFS|q|=X__^ zY%QMxwSZ==!20(=t-7Dut^F#Ud`h*$I`kqt4WfHY69z03dd(9DtdjeHDMNOdV{Tc) z^#bB8d_vWp{0WlUnPO@w5*nXm48y&EMZyYs9#Wk=eB)efA*O*oHnF81;W=J$^{zo- zJ`SG(JeDF^-OMnXBJ0!UsWKt{5y0s;cU!^7j^;?mO6f-{?=;%dXP znlp-j)cd_@c&)3eYj}9LV{~O;WO8b1YH@L~ z=iA!A^ycjR>eSNS!p8CD=H|h{!N%VC?(x;p`Sr!c#nsLI&F#bU^Yd%^US3`(4xoBL zK%h9KM1@pb=a1$;IV`Z@L!U+n)t&vo!<(qpUq`qt*UbA(3_?x~E zD%g+vtY~$IO^2HXDis$7xz){e{gx5pxg5vglnI{nMC8yB^SiF3cdKVZ&qJrrVM|D( z553}sPEAusV=j&A+eeAdbYqFMxJ?oZMJf{m84rFfB)E3bVj@HA<~qnIq)mO~G1asI zr(hIdonV8)eUKM(t4yw#hzby`TW)V}G{0WLI2E6`yEo#%FKLrU2JSo2lqXDAt4a5= zBo@q^3q%E-PlZ^nU3_hwa9xY1KAE2_R;JvSq0UQ^s}aDHp*7`K)6dpebOe(&6D-bo zii?SoPh&CHEmZIZ@|zpA?`~Z?R>r7aIQP(7=f%bx#F9?KQ%~@TZ`@WF5*?ei&dr`C zgm1ItelzQ#i|1Vr6Z0L|R$(NntPJbx3-5D{$`(sG?;|>*tZ;m%&Lg11vL*4*?6@~8 zU$}H!$CI(_>}1YZYON&_TEjKY!&So~Myhg$9*miTYKxp!W!L;8k>hZtBei20>|PxvxVKo^6d^rBZ_!WKx8-ZrC24-_H99v&KEC_ z=<^`6I~A0qiRjya=n^X_FLA{Qlr{Y*w~A3SjtFkHntp}Hi^7SBauYhbRp^pv`9LuElufZ9d7 zLN9^ap@z4*Ahyyaaji`Bl(0FJ zX2V*kQZuubBx;qT;Z{3llssQfvxisOSG-5?;H-~Sd z`3GgDNZ)snPv_EN_iE$4ldFEWG|})zJ>&Jrv=b*N+)g&wXEr^Yg2!5}c^KQ3YUu-V zWRJGYyvy5@`+_jcqVyxY8ikQ=Y(KEKx7UcVIIe3%zB%i8@4QULg!3x}?pFp#(v0vj zo^9(Tog6Bf!J&ck*p5_;FIQ+7Q5&&L_hU^+YVtGw^gG98;RsWwyDRlIq7&~YFb3ae z1=xJQcsgfN;}m20I{g@3EVQR`K#wcF_#T)Hv9rbz6ugdO(Ty~gOwfX1h{wjMvZW84 zVO5!F!VhsQ$2OR%5aRrb;Cw$w@FK^>FWQ zVzRB}aQWQiK$MQpu{E)LJ+Ry!W{hHfTr^rnN@ne@NwoN2bp5d_ji^B3lhHUiTPmw? 
z2#$lyZSVB7dhVcdX}5?-!e-O>n#b`|?R)XkH)O*f!zY2*`)pZSSrf82qv48shWK)X z4OH=+7`a{iA!*F>qq7aY*fehpy9aoT9Nu<$)S z^?nl2=ATgikd=9y7|x6h`TK$MA4k;xpC5#SGQ5B|Im4{i>yTQ!P9`8yV)CLD!g_xH z{FwOXBErGZ4QTvd4~eyO9TqsyyxSGB^$Tyr&!RSaCVTf5*&Q4*iXAwe94ZMyAgIZR zJ3)nphmyYZKX^TnEkR1c#Y?8t81p!>pa;+C=-mV_K{`vdKddoq#A(sUP@L0JCf|<* zY{!IXWPaez@_H)t*n2#EW;_v%tqNfabfD})E*+{WVVWls$YhJ3bP!or z{ag*Jf18#zuinu8W39<9K3w=Yg>oIml{{%cS^~rHTS3w4x;!3@QaaNTo7c4Willy` zB-wm)CpaKMd3TbgEP!gePDz$yElc|0tG8*56Av%*c+qCU1*`vbWkl9uG(H%Dl0`S1 zZ*80XvoxIIvsKhM!n?S!>4B31Ht5o~r}+bt%WG80VZ`jqXoGb&t{bZU)NzWs76mph2DzGnEQTBW{|cEvU=XiOB^gmeGgGbfz(_p#|F*(P#p+ z>ctL~3aSx$HY^7IlX#jCGWY5+rPKho@8k(+#zNnSF_rb-!!pj~o#o^Vk*e;g<%p z$vuarP$hTD4hrq<`~GRxmRe&XQb}YGb9MSM))Ug5i!q*XgG_-owhLwSN)V#IFt(~P z%Nl9hX{|uy{0Z2!(bb0Wr4WX9~Er5t`jxY;-O~MsLEhyn@Dq)eu1Z88{Y0G5^*UaJTYCkJnQwdh7gc3s+SpK4&5Js+b zhB7PA-Fk=oM?XJ8i&s)yKYYZR~2 z#QxZ#xOC{ zN8}a;|8iorf(m1;4s`A6EgKJ>u)6dcIklSbj0i*r9C@G zcc*UoZujRvm{DEx%n#|P*|RBy;4eTFoyR7$da(*x4x6IWck_Hc_1W`M6MD*G)8wLj zb1EoHipu$p{qtb?gfmCJ?Cj^hC*GA3?Dn#pCh+fNu2WqiD!q!7(ft;jLKNDMP0NDi}rNXZaE z8T*KNkc@`0Gc$OiIK)cn(8{O3P_w2JqOb1bOg~%GVHOkC5 z8ioP|tnHB9Nu#Xto` z=6iICiJyb4J+sgrY{>eHRW@ZfT^uoCIQVwFKyV^0s*SGF)CN|^{r))=Qfyp#7p6zo z?&?C=PObIsxz+N5lP*j;u%HRx-6Ea|(07CnC^*E0g;stPL+{3x$Q)O&H)5l9g-wIf zwYl4-^4K(D#xt)w&7Y&Habo98V`0#?{mU8 zH23Qp{UwOnz1ldWICkCYV295N?<6MOmeXr1>RU2{BhpsjDBpitZ)g-Fo%TFHq!*2v^z{?B9Pp zBnM^GQ`~ z`+%s8Ez5{`?K)M5v?7mnpCMSDYXlM=4ZN`=xwUE~8|fRUF`5&q+*l`8oN2GxScjlz zxL)-|uaLYXPq;mDY15&7YsiapLJ!r}NnZ+!L!IC}e002^$OIRBy48LB<9T=L{gM~1 zyWq+mPf)+3;E?_d%%cjNS3vFdMSBvMk{@5a}#5S zKa>#V$}$$+Oejk`7~&o>R%&8BgE`;O^PmA+F|5Q90sNg!o>u6T4CE*x&a5N5p4-_= z*)9$5L}Yi1^-mrnR0Zc7C3U*LM(JKb^Bi*W6v80p(e32mE-N6P|8GD)-3kH44 zOL>VNyAFVSaAuFC7gp~CxSM7=1I#|be9I}hAQ$Xbfx)~YlT0t(nnJvte0P!qM>!oD z%Ki@Ds27?P+4W*oE__4vGps#fjvOQOOy>FS)%P!1nF!A0^3d{HXGU3qyHr+$3 zA|G98j)O7g$_Sl#Ig64(E`YT zf?(A``RW$ii4ExIzUyH9w+drhS|LOHtwaZUIi?pR`P1=tJnm06_Lo6p{yQ7|_ip;-2LkewFa2c#zrp!CJN(Z` ze~-xRH%NbFi~pxM)xW{{D|`IUI6rC9U$*xfoL_A6KcoD;e~7St)6ainm;V{*C%^p5 
zG=78hf3eN~4EFcNCH@BN7yJA_MH%@G$}cwh?VX6edl?fXYXI_iPX}#bA^b32m=G-imHl|4h9D1SNM%hfCc{| zfuH*h{vvczLAzpLkT%2bc1#z8z&Z>J)o4{El(^48Nc1Ef}0m*?jvkwy_IH;Ab?l|_v#l@HxFR-w&3-6^U)JT`p z(2S5G2^6k`ld))MX#8w&Ei5dwvbJ8?Umv5UrY?3bp$=k;Mv5vcDt>wWWqqvjslC0_ zv$ego5x!yc5Gqa?(U@CVnfdzl$onUE@bSZ|t3^v@Y@TLy6XoWVEG;g(pc<6(UsJyp z%NdU|71pD+z?tMN4vA`a($s#L&%-UC*0YwH>L10YELvK@!QOBFr@y46_=uw_!!b5~ zNGFLp5v}vcSmzOEm>!zue&}-e8T@!HiAj>)#sW9+Ut72b3t#WG_Upuj{f{H|`h$7< z%Zh218Dd|XqW4yKIs6pb994Gx(Q>j6txu~#0M?EC zeA^P$LwlZDCf)+7rrUYguM>aW?YoAPp~BRJ^%{j%&CV+`(v@cW@jh7p+euJon#GNL zTcr~eL;IK<+5Nn5^d|Pih&}IjGILiyFX0l?>B*yvoA`dwfq{Vu2|w0GO0~^g+_Hc0 z-~m1^Zn??FTm1ZA$HtP|4G=h}cKIqc;>qQW4H?frzp(L1t!->9tgOPr!bq8A4ww4V zEG^Air77E*Jhlt&rNk#Ea|;N3vm(MlA%^10x^F8fy-7`FmU$jKUQ=D|=HgOSQ=_h- zkrdL+Ea{T>bgK6I_wUG+ysLK*IONzpQTDj_KdCG%EOLxBwe<9;DJUqYsb9Z-onKkG zy}T^p__NvT!_QXV_20i=$Hr!rlt_7O8C8f8q$CR(U;8eifWWaq#8Xdv&%NXA?fr^H z?&s|6?BrxBx-j~r%A(JGbF$`17pb&#Q+<7XM>K6;Z|}EMvAfSlt8IR~z2(b5Nts(( zO8ogxPXf1?m>8?Erlw|2VK9VwsQAP>*OItQketf;7{mX?;%($es7eBoP- zMMXR}Zrng3F_b9XV`5@{3=N4pOcSVG8>vBm)J$~>5{Ia6~^AX3(EKJ|Jd8xi;5z$ zva)h8y>q8a*86~pmX^53Yj1^oV{&{Pv8tn^^Uinq8}QSC@<30$`y z{ry*{XlnKJ^yt%71-$oGK6@v&ww}Y5z_DoYIie6JA|`H#9QS2FWu*{!bRF(O=~*m3 zz&*kvjM_N)w>@9Uj6fh58NaHVf0)pGD+8M%aPzIKcW+P6@87?lt&hfZpY61$dG)tru)(Ftit4@#x#S92r7tes=~O5^KMc| zNQk_QwB=Vi>kfIJM*oZR#Kc5+Xn0m6nRRs@T3WG5Nl7s=>iYUgm6erm-#&74tB&Qz z#>O@@oVs#VobKvXJ3BkanMOVRgIUSF^YatO1zAOP_5Q!h11KNUtAf&=o}N7!TtT9g z$9t+AheUVBB!$+UThGjmy4*?D@>Rk)C*x;kgCl2bs;*RNkSGNe@Z zM9K3~-dPPWTd%b|-)-yej<7X}^^ZFYue4#r(H}6wb+y)_iLf*>Vss>G@7vin@jn}} z_9xH7{gHo{)_M(P?egcBYKbrE9M@`IAy&U59+pYK@OBMtw?EYD;GWaTzr0<@$-v=X?SJoGLo1N966| zZ3wSX$40|Qs}CsrD0E+)-RSj{dm6y@^UqF^z^W?%TeJz=ckgqyf(u{+7vF4 zVVCz>+R@v)HXwIlKgc@Ms%IcFZDT`SnXUnL2Eh zU(_@5amum!eX*>HYIpCJmXti?*WPxEFoi`TTT9Q#D9_1Z)Bb0Tb@>?#+@3wl&(C+9 zZJnt!k^BBC>eVX=Vd3v%W14De(6S!1{2mz|zU6<;&$&PQYreCy6Dli>?)mupCyL>u zBZGs88BXoz`)jYWvrR25@>ety!?5HQ+d_$DWn>uKs4lNy37Z~D4+BRPQU?kQi@5!p z*RQFSYWVca=x+JkOyIST$$ayMj)y(x?9(UtclR2*#SJf4TkJ=xuXlv`RWB-)wcmZo 
zaNVuVO?4j}yOkTxpSEV2zI^!t)z;kH{9tqH-w#c>nFd$~@0Ray` ze~7WxCcTK|&VF`50p7+J5sawMfALP=*VL37@@l_vlzQIkzG)ySDY=t!NPaj;^+)epQ&OyOoOJQesH}b`cjq&%nnVBp+8FVz3aG9mADFTMI&pkcQcD|4;4`e1MC%=k{ z!s>@Job&#@GYS(!OjI|lMP$)rNb#--4&cfNrzlMee=!_Sqf5{^3Jv|#* zTW=}-2Amdq(|MsC;7kmo~qGu$fk(Lql;1?km1xxc>MP)Qe|(E7}C&%Di$=IDh^l zWMyT&n=BX_1N~=weB7wT>)E$-$*lV|FQZ~&p8hDfLVg{P6}>2_o9p@M5#qYvzZnl5 zM!Iy)zBv;uQFiv+jSYJ|tC#Pgha+Zx8Flw_o&6qlD9j|np|F(KArF9q&uGm^60 zGO8Pzn|s{)?GroEHBK28Y4@^<3PF>!Hh;#;gVO&HAk6t`11kJphSWa54>D$%m7)CT zh;aF)x7*TaEzZGNI7pkbA3r(>h7@i8Y*`;JrhHGlTinzn{k}XuzeAvN^A}P;z}VPW z-JV6}`Od#t|8Qf38G&~;;lP;i^GS;K7y%J}OU9XzAUryCN?Hely z4i1iG!M{9vB7^aqqAG{Ue1gTr#fZP}W6jK*jbp!K`+IG5HRa8l z=_U_2QxX!i=f5{gwP>x&72lC)0`SHjM9&8Y;JXH}u*%5zoF49eJU_OZQNxwQHr)p7 zR^OD>=JQnHF<<+CN>aZ(67LVuB&(Vg|EICBvEE*_=Hz!Z&;kJ=gvgUIOd`%QZaQP_ z5u^tOP_^C0yHj$x=jPjRBZ5Bq9AW7Vq)Alzo*e8O_?;Y-L*s!Rl-5srf`sV{+YT2QytmTVr`eschnJCPXa*Bfas^nfuB^xtMLc(M+JR=1 zlbg%hZCvlf%1BBQ zVKvJOxj^D~_ElF`_jxI$ZgK415u+{VXbmsUwyKAoi6ik z>Dy)jio}!bqO$rj11jmLsxo(5mmrsX=GiB|#%7fku+$NS($nA0x8}6_Z257+ zmfH;dl)6Fu&uJwOf$$?=9`Y_sJVlfHahw7hA_U1n1Jt4UY#9Y*q91|_6c#s!6_io8 z@1ym#95hfi%G4LH=GwmNzqVuVx(I)r5V!SRTkmi_L+V+{Nd~^n4K4G~xCakwo%%X# zF(vTs8vQJi~x66_FGY zF2zA1bf$4w1nKn@9WgPEQ|0dCzEK5(RJ5_cHzDM&a9rr?6n-f@zns|F*@YW>*~|mk z49T*Hu{XXRS{k>U%WLTe6Df{vy)bFtdFoDPLQcl7`vYAfhu$zyl2M5jupqgZt*zXBTt?L=N)o_d-%G^gLo%cVs{H*em=!NJ+v zoJwxLMk8qS=FOYu7w2bdYis3(bPAZY&Mlm50b~_aWWW`G{{au&fp>%M0q5wlZ@3Ex2xO$E+dqApkdUBCJJH+QyR+jY zCnx9c?~hm5*WKd%?+Oz0eQtcVE{{JyKoY+U&tiTa)qM5ZwQEuE5eWz|zMDHGm8smlJ5}$@&Qly) zmY$x@_n;!67RV!ARoj;@o?U)Uflow4M6w@6zC{LA6ATXz|7`YDtf3WEUT7gs5;ECZ zT%_#69Cm_!V>ecD*(!^RZM?j^l$Aq2d=RO7mz%o?R3R!lI@Da&YmdMkWQhnjXm8`K zuFGlg0rkr;c4eQfQSpipvZ~y@V{Ci|T@kj!{nVUH{`T@GH$S;6C}|| zmV^G~XI>QZ3qAQ@H-1kt%#*vsSkXWP=ys*a$AU@XR5o-RuDb5RCw0sjH&<6|Tyv*_ z7E`AJ19(XI+uE>e+nZy2K0IpmA-ozI8al1?^N1H|d&~FO&Bw=Q#%8>DVL#WTy}!t^OCx zqQXcd^5)GqY2B6|GG#om{9VSkCO#CVryD(b^ax6Rgpu`kz}qTR@gp^M560CsHS0c{ zNENhJRoxI4j$6@n+L`+djl;Obt45M%qbogmOe54vPNlqr|AAd;rGvGIS+9LRY3H-M 
zey!&Lae-{nDhTMVjC;z|6jT#Hy9Lly92^`t3lqXmI<2j({|^$j99{Sy5{9U@u(!9b zs;ctw_1)jwg9Z_5PESWSJ~`Zo^RVIbMc%XYXxcrVFXNRYba&K@5 zprOP~eet!aN42Z%yr*AhWc0(c3_D4;YY82VUT2slp~|sb7;PEx6vEsqD27OL6WWll zu+)^4O=uCLWoTSnTxczbCy(l#0#B6r`1$Vvy_=n7*`sYU99#hTYb@;&4G9SesjwdR zz}9;!^^6GXmjvVEv@!ezr^vTl$-Xx|alxgtXU3iN`hl=0FB3p6W(!RhX7^+6}na=OwCI+xg zJ|#=$|OMyMKFS>MO@n%Y19t9m-RbFKDwy0*l1 zn}U&Tv2k(m#<)@TKZ4o#NN^`AUBA0Kj1@TOpEr$p_?wg!0!oz%w;SJnjCfvA8)u=~ zR)$C`iOM-hk4sJ-`1Xya(mhZ3QX6au3yRPAdj3PgE#&0kf^GE%YHMHg~jylorNJ{9|6e1k64< zG<11Ea(wHM*U8EEH*uLzapjpnTTTuSPk|b=|AM!s_&zvTU>diWce5j+V-9*L2K*IZ1 zt;~MBh^7^a&{HFKLB&x=Ky}T6qDn$1U&BtIfYt6c16}0WwF`BI2m1Q8&WoME$ol&F zcI-|AJr94eje-W8wpxSg2m6rl$7_ z-u$@E)`p7vJpd{nu+xE)uXkdTNV|C=%Hrb4-(Mvs$NU@ut61}>#!Jim$Z2C-xSj@3 z%jFVwhjf_vo*u?EO=T-aGocd?@z9P0Rcu z7sMrKYJgHMSC9gQLd}Dg^y~+kA8>OWD2S$jnFSgOJj$L0vz@TWU7U&}^z8lllQT(< z6G!0U;-av$siCS0n@^NRq6CbSJn_v#D zeO5`L&@!h7xi&xnxTW?!Cz&*!s&5=B<`jc=K9 zjusXj5~XEj_HQmy;%_)t+~rhG2{GXk|0agRO6g-P51;D=*e4ECwK0w~G6yJA|BN5y z+2Sep3jUEAv5?q2>h9?=<(S$VG&aw?;enV3RZDgYXz+XlCB(;zKb=fW zO1jM-Fh4g3is~R7*UpSSe}1F->%zjj=VvGTNzHDn@fjIR=zTU7S3GvKT9I7*>!hSd zwe}N$MX0H$REjBn4NOdc*53ot62>kRGvJ6!K$PK(_J94VQmo(x9i;&dEms8r&Mb!FsIt8{R(iC&oVX8Xl~dTd-{ zOU<(Z_6#6^H5x4)o#Wju7O2i@8(yBC`GtkEt-gk_pNUADc>!Nf7swVpQc+O>k_OUk zYilcT?q<)wYyDm)2Li*yWY{~PNHnmeA3k`=EL&1h@y+b}S%;_srMZQLaQsuRwK3Y~ zdjba4)?*dM=jZ2HYB7}*YVYZQc|$77wHVJgY-@j;uQQx30p z0)ODvXh)$5j05y}r}n*S31AAcd@|LZT*&?GPSL5-=YsaY2Y zLzd%R?kiK4QUBB3F7`Vw?SD3>NN59VcrXlQgByNzw}BptqJgCj3Jz|Fv==`C?+4Zd zG_H^*fB*ccpUu7HbNI>p2CKG;K5P?H30c_;I(;oP8Z@1U+u@ZTFV3gByNghTuaE)^ zRA@EpH{B(rrNK7=&;Q{vK%+AxT{{N`G|^~Jc>FAKe)hAiEl6T0ONLeEswyf3?H`_W zzJN9#fOrN(7C4KxwszLo&!0b`R0DSq7Z*pP(OSv>H%~&*30Rl@Wx*T3T`*a&zzrG% zaYD`7iT1ms>(c4*dL=zQ{dD5Hcke*M3>XT;@%Hpo&%AZE&=IY`|9?Th&^u@ajgYr& z1K~7_0c%B=L_9^|111?LSJ4^;Pc7T_f1zKs!$^7L#@yWC;16iE#8Ld5!|YXvoKdy7 zq@-4lZDw#CUiI|!H1i^KDXu~Xtq~_EaGu*nL{opwMxjBY0N}o;9{4z=(l_V^DQJJx zI(X1+d8!iSe|*;1-Q5L8?b2p~vaF5|?>?w&u6%R@o+$}@#;oG*pDo@G5|vwi{rWY@ 
z^KNO&=c`0r&I(g-KC-l^Q{sWDc7ZT68^LN;w+@!NvbeOg;b7yV=64Uv+RGDge#0pk z0o->{ac1rnhNQq&PNJ~BR>gA5=en5K(>!umQs6y0r>I!2fPTx|ZgE*r>IqP-|A7`B zcU#LmE-92JuB^G)2Zh4)qvRr&in0gEU0E5|LxKK1xZnmfY^{RzPV;TU#rM-bv82l1 ziEDusN$43K8nVq}l9X)t^T$QUNa2!^L>v5&80#1sg5yM@oTRkEQw0TEUsu<@nF)b5 z!@7=RLmolV1Ks2bgC~^1g$#Wv8xl9F3D~v(+9ZCygxe^OvnLozCs#HvFDsLgkufqd z+WOfNy&e{e#iT3_ayP(;n7Z7oEG@hDH*jpeMf_>RxalNyFVVkzzb1y~HdWCWzRT)Z zV)LV5Vrq&EG+vb> z*X3_0OvLV02?r8DFK{h|pW=ugihun?Nl8dT2pW^RDLM>3OBD*7A1$k^>=+wAe;KHbh@=X`x|g3OltGamNg&2`hDETzX#20@t!oyC`7^`SAZ z%^uV?2hZoI;^|MVIn`*E)>s;xh%%o00@%>v6vIeVH8kAa+;A)yMXh6b?gU-~NeLz5 z-yT7p;|)0rPIh)zFn)n`wBkmz#{Wm}=MNEcn&sr+xG5>=Ox)hlaRwd`tRhY%j=GHB z>CGb6b{%Lg$bMw5kV$KZ$^_7X2M_8Ta#B+0R}QBeT<`1Xl$Dncq)UGAJ#j}_D$5B8 zx>2F)hatW({9M|&@lr0aX=cK)VF0%X9M^RLJR&=DbLgyzadE5*pHw1lXB(3d5p-3c zKZGToL=*2uUD|LY8p+H%0@BiHbpbD5e!KLRn51y9q@u7nKsUM!BM|696CpuE{}d4x zE>5FkWF#5DyG~3*yb7`vt^kd>J_+o3C*Ihq9THK~oebDB0dM3J$8fRnNCLPRsz;U4 zv59ja47cB%0vyb5P~AN>>oDy{Xz}gI9xIF9XA^!O*#!} zNU72%(D3-wJOzU#l(Oasr?YGH>289KGoyrOWh(G9lqk}|5H2{wDOgR3 zKJ0rxVV@&8sFLN2^I!n%$E$Kzu!2M}2g6D|bb`POgC?cLwp+r(!!usz$kHD0?urNf zG&@*Y;$mXzs;Ys5Z?;*TC<7T4Ul2}3F-Ew3SEgB*1qbTsY%5b9y~FK=57ZHhK$gI7 z#$Y`F84uKz<^HrT3BdG2KYyM>zk`ZXTU&d}Yma9a{h|^roR!ht>U*M}SP{gA6(e*w z1+JC=R)jeq!;s{_b^6JUD>X7;2`$v>=?0A2%GbVV+6V`ZhW8-j* zit#2uNZ@?9`1nHTs=RDHY)YkQ^eb=k(z79i9I<&fLjym#kL_=Ai}<0k#mJP1;l5W~GTIWLCs zM1Jxit)Vzdeacf!IRUJKqDcbEpoOJnyu}q1hZ>NBmRYi_3BjLJ50wz`UrtYb$eW0w z6u=4kP*v4k1 zY5m`NdnqgeopG|0*4Av{#UNlX!To>%1UP-fuOAPfj~hDt~y`3nw{Gn zb&GDkziWuF^qa2U$WFh7+T9Udec7FG`|(9f@XR5i+^2sHPcnk~o*; zna%UBS){z~7-4(n^+Z)q@BNX9_R!G1okD|8LTIwtw-NC<_u7OgJKifgM$qFq8GI`U zf62yoypHw!PjDOQJCVgSZ^#$0t?&Q-Hk4^XaE%hPGb`G?lcKUm2`hna;C}1GL98<2XCQWj*&-56S<8tqXr5EaT}Y} zprD|bs}k-TToe;#1=9(;3np8kx6RoaPmXiK9U*JU%>h+nj{O--mJzrgK&w9D;y75O!Q${{pFZ zT^_v)B71|Y6?mUnJ5cwaNHDXokX_mFJ3CfSKit342??<|tR~2LF{&_P)ojl+(e{4Q zyTgB?MhzX4yUyea8{2-gy^hWhaGSY5y#oUew6&d`oCpXARzUY@!MC-sIXXVZ!^5+* zu=vdR{NI*w1V1ugSX>!rE3>^_seW|bQdPnH_3&hGFWjT2EPSA`1&mb+J 
z!Giw`S1#1ZSeOjQMw!l=3|b@51SwP1Hj7J3mR44GsOZG)>8g7@mmkDLM8v*+jceh5 zxMON#!`r}Od=;nRK09m9{a4*Vx{}h;K-TD)zavvYCd}$5oZ?*E#85bb9G3WW>9?Pd~eBu3T?XzBt=01>r~qNz`Anm}v-3?gAzYjANGdB5)zRX3*p= zuySKOFo|)skYDQ8H@WZ#4OuqB&w&!S3s^rbp97n(F3HKs0Rew+fB)?CbdF+Kz|qlB z(`W^8$A0`Brl8ThcqkoJ{C}BQLJ?+jVJXrc6|o{gpOh%rrpjk`h`q5)rd37PHdPv( zm6YZfO80+$(9FEmjwdtbu{GVJ9!?zgSz8A~e6GsesX~wKAHdGRwb?2Q;)omDX52p{ zhsD0(nwf8jkEM>=DaQ4IniZVTqoW|pMob%1SN9L(6R?$8af+sJaSSx!lmv|?Wn>r| z8BGHv0)x=1Kea;uBc^!!;{~%D4VK=5BmZ;(B3Pm7znK30B0v_jY`?)f1X1^oEIR>D z7U%d5ONn2#6y0p(36pZ|vmF4IulXK8t%!RdVJHffz0H%9H=Fs=Cljx}ft13S(9zZo z4FB+7*}?Dm4gZrJ6ea!d>|mC9J6{#TK*5_R?~1|0{Yfe_EfswYjc@S2@`m6vlA!13 z=b@3)xZMVhWCGIu#z5yQC^8FVPh&$Y@>W-Wwn%StSL$O-qi#+|Q><+pzw(l1|E0Z6LdXh9%G7-kPu)N5C@1v zhFY;-MhzoQW|FKUmzS1CPnLh#B|MBzu0*juBe$d6X&t6z+l|#TFu2x>jz)zGw)_xe zREWzbLOmf82iRwAB+nksiU@~vA47$vb<*dL03WFb67vG5a0q#<;(}>u zt$;d;jYZV+L&U1LuMY&8FjA(}jEw$r1*m3rk}@*sjk96eIyyO}rFXNjtCipU$B^UT z|33^l=-prD_*=0a=>4}JHx1w(3^~^ICW1QN_N-X`oRg9oD%&%yP0>MS0-ywtF#O=v15R`^J?+3$AB%pwSvJ`_tTUhE54>v=+k}w zy_N4yGYW#bIO2c*{+%}|O7*}90v&qHh4O;5CLm$t$B*1md@r_p=Go5g&3h>7=a2^> z?y@oo^KFwK{r%O{)O2;Jr~(e+{>!#AGW;)<0v38F{KdhD&+u=)cWMdPBBE{iLQp7D1(_h_jaGL=%#OU5o z!w$fwX{y`01hz{3EdyHfpDZQD|H@Jt5N7kNEFJIf z<2biHB@*W0SO;Mf7B*I4F9CE#sy!PzqE91S$7I7=Wq0m1Ia_w^yEbif60KK6tAMxQ zKFq+5(eJKx+j?D|k4TVgJ{S9tR58HXt@8zaP^XZ*ArZ%6({`)0&*8h47TIkKoSc4c z9D?hYClPLLRh3}=PJjW20d8CJv7C?kB68~@fFkXLcni*1hs;aAXvwp#eU|`YE(kccsrEWSnI1HY?iir^u7l-uu4mA0!rw$#M;*V)5 zSc!?`g?a8(@69`1Z=1V^#JBu>2r?(2f(ZyYgL4C87{7a|K|2A#ywLd_qzoh^UcGwd z56Lzdn}9uJ>xr8@4S8NLBwT)VlgDa!c>IH41Bk(pbZewUo}ReAn|O@b2Ff3L@a%TQ zIpNF5KW73^4W_*DS&1hj`HiX|w(S%d{U=}Vc4|t!-q+uGwJ8^W&;HF?8A%qEmkYcN zU3!tTu#xH0rIB(Y)bccG7`3u|jco3|qMl?V|JQT2CfXv?oC7_wGz`Nm^L{0J4o zd!(&x3L?3U4dDa+20eC=6M&1Zef7w!TWIQiW7y@&Yz+>0P*P%#t$|tZyo#d=q8o$~VbDP1 z$8e4JKZp9DJ1^+I_da3>5EWIo!(a_0R~^+Gkmi4jfY{PvLLCLX)eQSfm#u?Lrf{HsuQpRHwOpeOthiJ z-$2xmkucyo`-XbaFaVgTfBtwciURDrqXl9DwzPN!#Qkkc>5uOD~H zFdzbQr*M-{`=P38WS-Sd8S(IC{t80&4D|FC=H}HacW0-kPeE?rsp`)JObZ1OQYgOg 
zs;ZmQ5ak7{UERbR@JH2$52$P@_YDFmuwBCo{smHja(Q}sT6BO_oC)DWAJ)69u%qy- zWZGu^FVrJJQItIbs^9<}+uq&>Y{e}Sc7bS6w!db|U4YdQv=pb1N(fE^NcVY@#K#Kv z<3_->w3H4d?e6VWpj-d-YZx{gRCfs9YOlO_{)rdTV`T<4YJ=$)DkAf+Q_FJ~9}|Zb zuo1_&MQ|1$1Udiief{avClJ#aA)_oPSP|zMCN3*?ewZ5`9v+eKmrWP4kHanA9>B(h zL|Y{0KL5PO#YjX*m?Gm@m7AMeTPtqu0BT6_K@sM?R8bpnjqKnz;AnUZSiC|)+asm4 z0GVi0qsoelS7EJ-4i-!xa0FyW+J5{x94-Qqe_JzCL0F1lC<9BZG^io+HWSyp6ucBy?GFUg z5xp&CtpKLPhsJIj0(E7Yx0_%PM@B{h6w9AXc6@vz1wjFO2z1t;Lqk<}UNOUSUYn{D z8u26i^5cgqd@4xZ4c>T)5|XlR1E6AN?8Fp)L z`3FS?EF)cSB@ghp07uSCWlxeDD4N8D;yqVYRRw>cFRJ0TGe#Y(NQ>*T256i#(J-*H zwX`G-C8C$x5Q17gy}d3F51DTG3!@w~0tQOua-z!+kOrmirZh>--~)5kNi5G6zcVRO z$Kr$h&jHIynnGGrij028tA(I7!c_>kq3XdD7uIJX&ep)e*(MLNwxwJ7L|k?wcAPmpAlV8O&C{ZYnwaCN?%UB?S)ZS912-U=hU7ivSd9 zhmSO*$BqLOvSXi;H++`%aw!Y96Lj~`)SVaOUNJV2VKL4@6aNOwbHqabkVZGTnln5$%DWUz^(t7#@o zzl13(DQRsg-yclF_V@?K3Qm$0>=%#?JT@n(85j(Vjjz0adUCJ{8_U+tPWt)IV;7UW zpn{8TRz5yEP!qBacl!8X(Cp#Ed%(|OpeWJ5%I-C|!dD=;Y%8=vTCVI4AP^qGPIcjw zH=meT4KHIo?+$yw1{XKCmAN?tOv(*vqWUcrv041+BP2kT-`?6HMOiPFieJAD&JTvY zygi6O=TKN>d3daQ;yJaF;TS{LQhJot98*cax-=WK!~nI0$sn3Px+BQ@YR)wl78V#` zp+npkBArfz86+41fFUY)Z$)pIvCCW`F}Jmq0x2w3K|r9qbC}@$rV;5oV;vorC~RxN zT|9cTHCWL4h6Z&|UX~RRuVCg--Z0x@U(5dW;Mt$ep?qSaXr8KPf%VTY@y%$>a=?*} zodJgi-hN@4#Xvi9-(9*5~GSpnt+cA;!RBvm$MGfWRt5^MTId2Xb2rD%HS9qtSaa@h$a9@gk)sl`14R_Sr0>Z#F5Cyhdc8zWuc^bg@ubxiY4Ve;vi_xHvL7{`}?1O z!I5xmd!VLXpbJCO!^H*fg$4FXs+e83S3f-Z?ODH*E8-A|h0;(5&J;Ka!IK)PqJ(=) zR%+A=?(PQ=TY%P=-O@cdF#+ctMOnYvv|h)MCHXP(3(NsPgYy6_E4JfNioz!@ymw<5mX!q#VB~kSvWN&GRyffD%bAH z8>tovRPQc-%OIAs0WA%A027&P?`deFsk>RZxGVwSf=UGK1G&PX|D6a)BiAYEbvfK* zay1Y32z=a{faV2cxM03-mzpCPp7UJu z^?q=s!(ja4-&H9EHpFe?=I6o!0uYnCvd|B0?XH=QAc#;fB=CYxxNT_21&)VO=5-uq z7cMR?UGG32T;(LtJTNiG;6Ux>B;x4n>mwXt%aU)Jya%j8LYY9sDJZbW z`hYw`XOG&uX21v{=%e>$xR;VHD(>u9Ss7BKVB{lgwKwmU{tjpHb8>TI1cV7O3ooxm z$cU}#;o#v>C|-McCX5wAh^aM*?c_BWBMM(=l=XAQCnjJ9_TS3)F93J| z$;H!kiGh_5L=MW3{NeO;(z1=(`QaiNNVTVHBlj+|M{u9>=XsOSqpy;ZMnN#XG~Y@e zRKg*;n?n>~egT#}WNI#h;kmDTNM0m?8lGc%xM#E{PBUA{ZOc0fvn 
z#epkg3dAZ=gAgAX9qsDu40|UoTB1w2Zq#FHg#ws$x!a8A_!?96;oGOOyV@NRq>Uj0yMIuZT z&-el2_4>E9xxMWV;BP2K!fIi0G5pHa|DISp?*gS*02+mtx~Jh%b-?>Wjq0byL_ zhJ*z7Ki3$TAckBK#QcHw9G#zetG%$b^WV^SvLMx+g=aaq;mWIF;}fTUuJ?jZ27$fwT`X$mYnHO!a1EC#Wx=wgFiF zbA_jB7N%LSB;=yX@(4*tra^O2RV9=N?J$cI^9X@5S6H|VmC2JbJ4PXD*YX>jpx~e& zC?7|7R1=N>cW$|@AllIY#V)V=ePqP=-Q&-oVb=44DLeR$v(Han{u5XMfHdlP-Tt%h zgq}MH7}dwx-idXDjfyv@Uw;AjALDDu*$?wDP!G|%x=I`I`xOD7y1T6x+9Ltoak`^g z0Wqjn&&mWMvSItguapFe!YxdG1ycnUg{Lb1HcT%M3^>C4WV>JkTi;xBTtrk9XIO*-Sw-@#tjg+Jy6J}r zx5G8uI23FLNPj`@v-nCBKMQz_w$|3L^oml=-MKU%kpe>ns`hMkDAa8c4kCoUKo>D9 zrJab6kB6grTUl9qlb{Ws3Vn-?jt+YC*EhoKVq&jtO$;Pq^a3Ug0DOmsD7?$cLR#N} zY1F%I&;NdgECm2T818~iwu7mhBnKzHuBr+Kk^M?P%JIVeNdh%Vigkv0V<;sFPvFHBw=J1qd!4i}+(VXBO_k77RP-y;`2I-wtAFIeUq>sHOG=`Q zPvR?zs}Ppy!S>GD7HQ%eVHaW+y?0B^EtGG?R?BfVHQrc0AZ%$(U_EoSbb(|HWMDd~ zUZ}t%UZA`ihR)Q;oU>_XaocGfvz^}q9lJimIrvJ0CJ{gKFMl?eTiKWol~sR?4c^}{ zz#EU-5hK*NuOzLY)~g~?!6A3}OhZDL@lDg;_ba5+YfG$ZJ&darPgu$pQU$z@Wn@1# zpS4`QvafzyU92>>I0C=5<{fn|(ZVN#kMeEbGhMbj+{8B5Wc>7FyPBbD&U05bKbFZsy(7B{A|kA09*DB(D;Z zr>bqou+=uF4`FuFpHRJS<-X@(C|`b;pr4ob__?zqBKT|{SM8Zz$7a2pn^yz$ah*+C zHpC5O(D~m(!#^7kjfOlwH!W>4Ja<556nt!hLqmV|_pgwUaLn95d3qkL{cdgME&lHP zjHExti!4zqhJcMxyot+wS#1#gmdafZO&%)9y075E0jY~>80$8>s&oycTL}KLvoC^~ zr=t^dM!+5czknab{WLRZ@!DSupA|F&oQIg-UDT#7Kln8oip+5mIj-9XuW6$8drk#5 zle3ZJ`w4}r?rXffl2TGG9#Aq1a&tw1hyu6*^EPw3u|6O%!Qs+EL{1O!>x&KC(~zG? 
zZhMn~nV`R6p|+Stlc(zW@APMk$j4DN^Beh7)gtp5q7*g`Dn`7ZIO1PWFtLL8t)QUr zzq>y`JMK{3)zEkZV;XRfAQT5ZssWrxOl-XCH%rbIU&#UfPvn8gKd>QS9`R{~@8Diu z9tU2b$S3eVz4q6Xqo$x@LKN(u$x=)jZOOy9{XY&ayKbONu@3W1cb*`0|zKrQYg_alF{Cs)-GNYp>{p7ofsIW8a^YgCtZ!!hO&@AtkTOJBy^Eq8=gP#~=4> zJLpQM721B(?-hHzq-rpk|7Ejj&-}VucR48v$NBfC#x^oZA~oAEnltHt4A$1BLhOYY zt7^SfB)7#!ezLY9P0SlaI3biN-+crMAQXs?Y3&FXSnv?V_K;m3h)dW(+GO6KY3 zXDc+hUyzZb5lAE5YPUih=HXR)F3!*l0z*2E=*vM#Pwp-A8!4C3(G@*4HKNobWD}sf zX(Q`ZX!;Nk9dB>X1D-VN`)>w1W|lm3a(q(zR+F9IuXUUDnMf~8TFwJ(O%RQby(dH<@SBys<1W%NdlNSc{9Q%GC(w~Vn8fXHb8=ew-)5gKNFQJP z*EdVF1FeBJ2Gs7x&2`C3GzRhKdT`4l%Kqz{RO-dE>1I7@>}#z>B9QP^8v^m0#&f@a zO%y9i5&5;SG!;eiBFAAHpS%$AIeJ zo!GH=??CQ=pymZ=0x(?2nAVfq{>(q^k60 z#=C=A*j%}m?qnvYE+TaL?ZLpGtqgC*J`pG2~uj;0oCt max { + match = m + max = len(m) + } + } + return max, newFileSeedSegment(s.srcFile, match, s.canReflink, true) +} + +// Returns a slice of chunks from the seed. Compares chunks from position 0 +// with seed chunks starting at p. +func (s *FileSeed) maxMatchFrom(chunks []IndexChunk, p int) []IndexChunk { + if len(chunks) == 0 { + return nil + } + var ( + sp int + dp = p + ) + for { + if dp >= len(s.index.Chunks) || sp >= len(chunks) { + break + } + if chunks[sp].ID != s.index.Chunks[dp].ID { + break + } + dp++ + sp++ + } + return s.index.Chunks[p:dp] +} + +type fileSeedSegment struct { + file string + chunks []IndexChunk + canReflink bool + needValidation bool +} + +func newFileSeedSegment(file string, chunks []IndexChunk, canReflink, needValidation bool) *fileSeedSegment { + return &fileSeedSegment{ + canReflink: canReflink, + needValidation: needValidation, + file: file, + chunks: chunks, + } +} + +func (f *fileSeedSegment) Size() uint64 { + if len(f.chunks) == 0 { + return 0 + } + last := f.chunks[len(f.chunks)-1] + return last.Start + last.Size - f.chunks[0].Start +} + +func (s *fileSeedSegment) WriteInto(dst *os.File, offset, length, blocksize uint64, isBlank bool) (uint64, uint64, error) { + if length != s.Size() { + return 0, 0, 
fmt.Errorf("unable to copy %d bytes from %s to %s : wrong size", length, s.file, dst.Name()) + } + src, err := os.Open(s.file) + if err != nil { + return 0, 0, err + } + defer src.Close() + + // Make sure the data we're planning on pulling from the file matches what + // the index says it is if that's required. + if s.needValidation { + if err := s.validate(src); err != nil { + return 0, 0, err + } + } + // Do a straight copy if reflinks are not supported or blocks aren't aligned + if !s.canReflink || s.chunks[0].Start%blocksize != offset%blocksize { + return s.copy(dst, src, s.chunks[0].Start, length, offset) + } + return s.clone(dst, src, s.chunks[0].Start, length, offset, blocksize) +} + +// Compares all chunks in this slice of the seed index to the underlying data +// and fails if they don't match. +func (s *fileSeedSegment) validate(src *os.File) error { + for _, c := range s.chunks { + b := make([]byte, c.Size) + if _, err := src.ReadAt(b, int64(c.Start)); err != nil { + return err + } + sum := sha512.Sum512_256(b) + if sum != c.ID { + return fmt.Errorf("seed index for %s doesn't match its data", s.file) + } + } + return nil +} + +// Performs a plain copy of everything in the seed to the target, not cloning +// of blocks. +func (s *fileSeedSegment) copy(dst, src *os.File, srcOffset, length, dstOffset uint64) (uint64, uint64, error) { + if _, err := dst.Seek(int64(dstOffset), os.SEEK_SET); err != nil { + return 0, 0, err + } + if _, err := src.Seek(int64(srcOffset), os.SEEK_SET); err != nil { + return 0, 0, err + } + + // Copy using a fixed buffer. Using io.Copy() with a LimitReader will make it + // create a buffer matching N of the LimitReader which can be too large + copied, err := io.CopyBuffer(dst, io.LimitReader(src, int64(length)), make([]byte, 64*1024)) + return uint64(copied), 0, err +} + +// Reflink the overlapping blocks in the two ranges and copy the bit before and +// after the blocks. 
+func (s *fileSeedSegment) clone(dst, src *os.File, srcOffset, srcLength, dstOffset, blocksize uint64) (uint64, uint64, error) { + if srcOffset%blocksize != dstOffset%blocksize { + return 0, 0, fmt.Errorf("reflink ranges not aligned between %s and %s", src.Name(), dst.Name()) + } + + srcAlignStart := (srcOffset/blocksize + 1) * blocksize + srcAlignEnd := (srcOffset + srcLength) / blocksize * blocksize + dstAlignStart := (dstOffset/blocksize + 1) * blocksize + alignLength := srcAlignEnd - srcAlignStart + dstAlignEnd := dstAlignStart + alignLength + + // fill the area before the first aligned block + var copied uint64 + c1, _, err := s.copy(dst, src, srcOffset, srcAlignStart-srcOffset, dstOffset) + if err != nil { + return c1, 0, err + } + copied += c1 + // fill the area after the last aligned block + c2, _, err := s.copy(dst, src, srcAlignEnd, srcOffset+srcLength-srcAlignEnd, dstAlignEnd) + if err != nil { + return copied + c2, 0, err + } + copied += c2 + // close the aligned blocks + return copied, alignLength, CloneRange(dst, src, srcAlignStart, alignLength, dstAlignStart) +} diff --git a/ioctl_linux.go b/ioctl_linux.go new file mode 100644 index 0000000..95f4657 --- /dev/null +++ b/ioctl_linux.go @@ -0,0 +1,65 @@ +// +build linux + +package desync + +import ( + "bytes" + "encoding/binary" + "io/ioutil" + "os" + "path/filepath" + "syscall" + "unsafe" + + "github.com/pkg/errors" +) + +// FICLONERANGE ioctl +const fiCloneRange = 0x4020940d + +// CanClone tries to determine if the filesystem allows cloning of blocks between +// two files. It'll create two tempfiles in the same dirs and attempt to perfom +// a 0-byte long block clone. If that's successful it'll return true. 
+func CanClone(dstFile, srcFile string) bool { + dst, err := ioutil.TempFile(filepath.Dir(dstFile), ".tmp") + if err != nil { + return false + } + defer dst.Close() + defer os.Remove(dst.Name()) + src, err := ioutil.TempFile(filepath.Dir(srcFile), ".tmp") + if err != nil { + return false + } + defer src.Close() + defer os.Remove(src.Name()) + err = CloneRange(dst, src, 0, 0, 0) + return err == nil +} + +// CloneRange uses the FICLONERANGE ioctl to de-dupe blocks between two files +// when using XFS or btrfs. Only works at block-boundaries. +func CloneRange(dst, src *os.File, srcOffset, srcLength, dstOffset uint64) error { + // Build a structure to hold the argument for this IOCTL + // struct file_clone_range { + // __s64 src_fd; + // __u64 src_offset; + // __u64 src_length; + // __u64 dest_offset; + // }; + arg := new(bytes.Buffer) + binary.Write(arg, binary.LittleEndian, uint64(src.Fd())) + binary.Write(arg, binary.LittleEndian, srcOffset) + binary.Write(arg, binary.LittleEndian, srcLength) + binary.Write(arg, binary.LittleEndian, dstOffset) + err := ioctl(dst.Fd(), fiCloneRange, uintptr(unsafe.Pointer(&arg.Bytes()[0]))) + return errors.Wrapf(err, "failure cloning blocks from %s to %s", src.Name(), dst.Name()) +} + +func ioctl(fd, operation, argp uintptr) error { + _, _, e := syscall.Syscall(syscall.SYS_IOCTL, fd, operation, argp) + if e != 0 { + return syscall.Errno(e) + } + return nil +} diff --git a/ioctl_nonlinux.go b/ioctl_nonlinux.go new file mode 100644 index 0000000..bb7200f --- /dev/null +++ b/ioctl_nonlinux.go @@ -0,0 +1,16 @@ +// +build !linux + +package desync + +import ( + "errors" + "os" +) + +func CanClone(dstFile string, srcFile string) bool { + return false +} + +func CloneRange(dst, src *os.File, srcOffset, srcLength, dstOffset uint64) error { + return errors.New("Not available on this platform") +} diff --git a/make.go b/make.go index 95953e1..aeded95 100644 --- a/make.go +++ b/make.go @@ -23,7 +23,7 @@ func IndexFromFile(ctx context.Context, 
name string, n int, min, avg, max uint64, - progress func(uint64), + pb ProgressBar, ) (Index, ChunkingStats, error) { ctx, cancel := context.WithCancel(ctx) @@ -52,6 +52,13 @@ func IndexFromFile(ctx context.Context, size := uint64(info.Size()) span := size / uint64(n) // initial spacing between chunkers + // Setup and start the progressbar if any + if pb != nil { + pb.SetTotal(int(info.Size())) + pb.Start() + defer pb.Finish() + } + // Create/initialize the workers worker := make([]*pChunker, n) for i := 0; i < n; i++ { @@ -102,8 +109,8 @@ func IndexFromFile(ctx context.Context, for chunk := range w.results { // Assemble the list of chunks in the index index.Chunks = append(index.Chunks, chunk) - if progress != nil { - progress(chunk.Start + chunk.Size) + if pb != nil { + pb.Set(int(chunk.Start + chunk.Size)) } stats.incAccepted() } diff --git a/nullseed.go b/nullseed.go new file mode 100644 index 0000000..2958500 --- /dev/null +++ b/nullseed.go @@ -0,0 +1,136 @@ +package desync + +import ( + "fmt" + "io" + "io/ioutil" + "os" + "path/filepath" +) + +type nullChunkSeed struct { + id ChunkID + blockfile *os.File + canReflink bool +} + +func newNullChunkSeed(dstFile string, blocksize uint64, max uint64) (*nullChunkSeed, error) { + blockfile, err := ioutil.TempFile(filepath.Dir(dstFile), ".tmp-block") + if err != nil { + return nil, err + } + var canReflink bool + if CanClone(dstFile, blockfile.Name()) { + canReflink = true + b := make([]byte, blocksize) + if _, err := blockfile.Write(b); err != nil { + return nil, err + } + } + return &nullChunkSeed{ + id: NewNullChunk(max).ID, + canReflink: canReflink, + blockfile: blockfile, + }, nil +} + +func (s *nullChunkSeed) close() error { + if s.blockfile != nil { + s.blockfile.Close() + return os.Remove(s.blockfile.Name()) + } + return nil +} + +func (s *nullChunkSeed) LongestMatchWith(chunks []IndexChunk) (int, SeedSegment) { + if len(chunks) == 0 { + return 0, nil + } + var n int + for _, c := range chunks { + if c.ID != 
s.id { + break + } + n++ + } + if n == 0 { + return 0, nil + } + return n, &nullChunkSection{ + from: chunks[0].Start, + to: chunks[n-1].Start + chunks[n-1].Size, + blockfile: s.blockfile, + canReflink: s.canReflink, + } +} + +type nullChunkSection struct { + from, to uint64 + blockfile *os.File + canReflink bool +} + +func (s *nullChunkSection) Size() uint64 { return s.to - s.from } + +func (s *nullChunkSection) WriteInto(dst *os.File, offset, length, blocksize uint64, isBlank bool) (uint64, uint64, error) { + if length != s.Size() { + return 0, 0, fmt.Errorf("unable to copy %d bytes to %s : wrong size", length, dst.Name()) + } + + // When cloning isn'a available we'd normally have to copy the 0 bytes into + // the target range. But if that's already blank (because it's a new/truncated + // file) there's no need to copy 0 bytes. + if !s.canReflink { + if isBlank { + return 0, 0, nil + } + return s.copy(dst, offset, s.Size()) + } + return s.clone(dst, offset, length, blocksize) +} + +func (s *nullChunkSection) copy(dst *os.File, offset, length uint64) (uint64, uint64, error) { + if _, err := dst.Seek(int64(offset), os.SEEK_SET); err != nil { + return 0, 0, err + } + // Copy using a fixed buffer. 
Using io.Copy() with a LimitReader will make it + // create a buffer matching N of the LimitReader which can be too large + copied, err := io.CopyBuffer(dst, io.LimitReader(nullReader{}, int64(length)), make([]byte, 64*1024)) + return uint64(copied), 0, err +} + +func (s *nullChunkSection) clone(dst *os.File, offset, length, blocksize uint64) (uint64, uint64, error) { + dstAlignStart := (offset/blocksize + 1) * blocksize + dstAlignEnd := (offset + length) / blocksize * blocksize + + // fill the area before the first aligned block + var copied, cloned uint64 + c1, _, err := s.copy(dst, offset, dstAlignStart-offset) + if err != nil { + return c1, 0, err + } + copied += c1 + // fill the area after the last aligned block + c2, _, err := s.copy(dst, dstAlignEnd, offset+length-dstAlignEnd) + if err != nil { + return copied + c2, 0, err + } + copied += c2 + + for blkOffset := dstAlignStart; blkOffset < dstAlignEnd; blkOffset += blocksize { + if err := CloneRange(dst, s.blockfile, 0, blocksize, blkOffset); err != nil { + return copied, cloned, err + } + cloned += blocksize + } + return copied, cloned, nil +} + +type nullReader struct{} + +func (r nullReader) Read(b []byte) (n int, err error) { + for i := range b { + b[i] = 0 + } + return len(b), nil +} diff --git a/progress.go b/progress.go new file mode 100644 index 0000000..ec1519b --- /dev/null +++ b/progress.go @@ -0,0 +1,12 @@ +package desync + +// ProgressBar allows clients to provide their own implementations of graphical +// progress visualizations. Optional, can be nil to disable this feature. 
+type ProgressBar interface { + SetTotal(total int) + Start() + Finish() + Increment() int + Add(add int) int + Set(current int) +} diff --git a/progressbar.go b/progressbar.go deleted file mode 100644 index 0db389a..0000000 --- a/progressbar.go +++ /dev/null @@ -1,8 +0,0 @@ -package desync - -type ProgressBar interface { - Add(n int) - Set(n int) - Start() - Stop() -} diff --git a/seed.go b/seed.go new file mode 100644 index 0000000..d08ca3a --- /dev/null +++ b/seed.go @@ -0,0 +1,50 @@ +package desync + +import ( + "os" +) + +// Filesystem block size +const DefaultBlockSize = 4096 + +// Seed represent a source of chunks other than the store. Typically a seed is +// another index+blob that present on disk already and is used to copy or clone +// existing chunks or blocks into the target from. +type Seed interface { + LongestMatchWith(chunks []IndexChunk) (int, SeedSegment) +} + +// SeedSegment represents a matching range between a Seed and a a file being +// assembled from an Index. It's used to copy or reflink data from seeds into +// a target file during an extract operation. +type SeedSegment interface { + Size() uint64 + WriteInto(dst *os.File, offset, end, blocksize uint64, isBlank bool) (copied uint64, cloned uint64, err error) +} + +// indexSegment represents a contiguous section of an index which is used when +// assembling a file from seeds. first/last are positions in the index. 
+type indexSegment struct { + index Index + first, last int +} + +func (s indexSegment) lengthChunks() int { + return s.last - s.first + 1 +} + +func (s indexSegment) lengthBytes() uint64 { + return s.end() - s.start() +} + +func (s indexSegment) start() uint64 { + return s.index.Chunks[s.first].Start +} + +func (s indexSegment) end() uint64 { + return s.index.Chunks[s.last].Start + s.index.Chunks[s.last].Size +} + +func (s indexSegment) chunks() []IndexChunk { + return s.index.Chunks[s.first : s.last+1] +} diff --git a/selfseed.go b/selfseed.go new file mode 100644 index 0000000..1dadd39 --- /dev/null +++ b/selfseed.go @@ -0,0 +1,120 @@ +package desync + +import ( + "sync" +) + +// FileSeed is used to populate a contiguous seed during extraction in order +// to copy/clone ranges that were written to the output file earlier. This is +// to potentially dedup/reflink duplicate chunks or ranges of chunks within the +// same file. +type selfSeed struct { + file string + index Index + pos map[ChunkID][]int + canReflink bool + written int + mu sync.RWMutex + cache map[int]int +} + +// newSelfSeed initializes a new seed based on the file being extracted +func newSelfSeed(file string, index Index) (*selfSeed, error) { + s := selfSeed{ + file: file, + pos: make(map[ChunkID][]int), + index: index, + canReflink: CanClone(file, file), + cache: make(map[int]int), + } + return &s, nil +} + +// add records a new segment that's been written to the file. Since only contiguous +// ranges of chunks are considered and writing happens concurrently, the segment +// written here will not be usable until all earlier chunks have been written as +// well. 
+func (s *selfSeed) add(segment indexSegment) { + s.mu.Lock() + defer s.mu.Unlock() + + // Make a record of this segment in the cache since those could come in + // out-of-order + s.cache[segment.first] = segment.last + 1 + + // Advance pos until we find a chunk we don't yet have recorded while recording + // the chunk positions we do have in the position map used to find seed matches. + // Since it's guaranteed that the numbers are only increasing, we drop old numbers + // from the cache map to keep it's size to a minimum and only store out-of-sequence + // numbers + for { + // See if we can advance the write pointer in the self-seed which requires + // consecutive chunks. If we don't have the next segment yet, just keep it + // in the cache until we do. + next, ok := s.cache[s.written] + if !ok { + break + } + // Record all chunks in this segment as written by adding them to the position map + for i := s.written; i < next; i++ { + chunk := s.index.Chunks[i] + s.pos[chunk.ID] = append(s.pos[chunk.ID], i) + } + delete(s.cache, s.written) + s.written = next + } +} + +// LongestMatchWith returns the longest sequence of of chunks anywhere in Source +// that match b starting at b[0]. If there is no match, it returns nil +func (s *selfSeed) LongestMatchWith(chunks []IndexChunk) (int, SeedSegment) { + if len(chunks) == 0 || len(s.index.Chunks) == 0 { + return 0, nil + } + s.mu.RLock() + pos, ok := s.pos[chunks[0].ID] + s.mu.RUnlock() + if !ok { + return 0, nil + } + // From every position of b[0] in the source, find a slice of + // matching chunks. Then return the longest of those slices. + var ( + match []IndexChunk + max int + ) + for _, p := range pos { + m := s.maxMatchFrom(chunks, p) + if len(m) > max { + match = m + max = len(m) + } + } + return max, newFileSeedSegment(s.file, match, s.canReflink, false) +} + +// Returns a slice of chunks from the seed. Compares chunks from position 0 +// with seed chunks starting at p. 
+func (s *selfSeed) maxMatchFrom(chunks []IndexChunk, p int) []IndexChunk { + if len(chunks) == 0 { + return nil + } + s.mu.RLock() + written := s.written + s.mu.RUnlock() + var ( + sp int + dp = p + ) + for { + if dp >= written || sp >= len(chunks) { + break + } + if chunks[sp].ID != s.index.Chunks[dp].ID { + break + } + dp++ + sp++ + } + return s.index.Chunks[p:dp] +} diff --git a/selfseed_test.go b/selfseed_test.go new file mode 100644 index 0000000..6965bad --- /dev/null +++ b/selfseed_test.go @@ -0,0 +1,144 @@ +package desync + +import ( + "context" + "crypto/md5" + "crypto/rand" + "crypto/sha512" + "io/ioutil" + "os" + "testing" +) + +func TestSelfSeed(t *testing.T) { + // Setup a temporary store + store, err := ioutil.TempDir("", "store") + if err != nil { + t.Fatal(err) + } + defer os.RemoveAll(store) + + s, err := NewLocalStore(store) + if err != nil { + t.Fatal(err) + } + + // Build a number of fake chunks that can then be used in the test in any order + type rawChunk struct { + id ChunkID + data []byte + } + size := 1024 + numChunks := 10 + chunks := make([]rawChunk, numChunks) + + for i := 0; i < numChunks; i++ { + b := make([]byte, size) + rand.Read(b) + id := sha512.Sum512_256(b) + chunks[i] = rawChunk{id, b} + b, err := Compress(b) + if err != nil { + t.Fatal(err) + } + if err = s.StoreChunk(id, b); err != nil { + t.Fatal(err) + } + } + + // Define tests with files with different content, by building files out + // of sets of byte slices to create duplication or not between the target and + // its seeds. Also define a min/max of bytes that should be cloned (from the + // self-seed). That number can vary since even with 1 worker goroutine there + // another feeder goroutine which can influence timings/results a little. 
+ tests := map[string]struct { + index []int + minCloned int + maxCloned int + }{ + "single chunk": { + index: []int{0}, + minCloned: 0, + maxCloned: 0, + }, + "repeating single chunk": { + index: []int{0, 0, 0, 0, 0}, + minCloned: 3 * size, + maxCloned: 4 * size, + }, + "repeating chunk sequence": { + index: []int{0, 1, 2, 0, 1, 2, 2}, + minCloned: 4 * size, + maxCloned: 4 * size, + }, + "repeating chunk sequence mid file": { + index: []int{1, 2, 3, 0, 1, 2, 3, 0, 1, 2, 3}, + minCloned: 7 * size, + maxCloned: 7 * size, + }, + "repeating chunk sequence reversed": { + index: []int{0, 1, 2, 2, 1, 0}, + minCloned: 2 * size, + maxCloned: 3 * size, + }, + "non-repeating chunks": { + index: []int{0, 1, 2, 3, 4, 5, 6, 7, 8, 9}, + minCloned: 0, + maxCloned: 0, + }, + } + + for name, test := range tests { + t.Run(name, func(t *testing.T) { + // Build an index from the target chunks + var idx Index + var b []byte + for i, p := range test.index { + chunk := IndexChunk{ + ID: chunks[p].id, + Start: uint64(i * size), + Size: uint64(size), + } + b = append(b, chunks[p].data...) 
+ idx.Chunks = append(idx.Chunks, chunk) + } + + // Calculate the expected checksum + sum := md5.Sum(b) + + // Build a temp target file to extract into + dst, err := ioutil.TempFile("", "dst") + if err != nil { + t.Fatal(err) + } + defer dst.Close() + defer os.Remove(dst.Name()) + + // Extract the file + stats, err := AssembleFile(context.Background(), dst.Name(), idx, s, nil, 1, nil) + if err != nil { + t.Fatal(err) + } + + // Compare the checksums to that of the input data + b, err = ioutil.ReadFile(dst.Name()) + if err != nil { + t.Fatal(err) + } + outSum := md5.Sum(b) + if sum != outSum { + t.Fatal("checksum of extracted file doesn't match expected") + } + + // Compare to the expected number of bytes copied or cloned from the seed + fromSeed := int(stats.BytesCopied + stats.BytesCloned) + if fromSeed < test.minCloned { + t.Fatalf("expected min %d bytes copied/cloned from self-seed, got %d", test.minCloned, fromSeed) + } + if fromSeed > test.maxCloned { + t.Fatalf("expected max %d bytes copied/cloned from self-seed, got %d", test.maxCloned, fromSeed) + } + }) + } + +} diff --git a/sequencer.go b/sequencer.go new file mode 100644 index 0000000..fda91a4 --- /dev/null +++ b/sequencer.go @@ -0,0 +1,34 @@ +package desync + +type SeedSequencer struct { + seeds []Seed + index Index + current int +} + +func NewSeedSequencer(idx Index, src ...Seed) *SeedSequencer { + return &SeedSequencer{ + seeds: src, + index: idx, + } +} + +func (r *SeedSequencer) Next() (indexSegment, SeedSegment, bool) { + var ( + max uint64 + advance int = 1 + source SeedSegment + ) + for _, s := range r.seeds { + n, m := s.LongestMatchWith(r.index.Chunks[r.current:]) + if n > 0 && m.Size() > max { + source = m + advance = n + max = m.Size() + } + } + + segment := indexSegment{index: r.index, first: r.current, last: r.current + advance - 1} + r.current += advance + return segment, source, r.current >= len(r.index.Chunks) +} diff --git a/verifyindex.go b/verifyindex.go index 59992f3..1a27ab0 100644 
--- a/verifyindex.go +++ b/verifyindex.go @@ -11,7 +11,7 @@ import ( // VerifyIndex re-calculates the checksums of a blob comparing it to a given index. // Fails if the index does not match the blob. -func VerifyIndex(ctx context.Context, name string, idx Index, n int, progress func()) error { +func VerifyIndex(ctx context.Context, name string, idx Index, n int, pb ProgressBar) error { var ( wg sync.WaitGroup mu sync.Mutex @@ -21,6 +21,13 @@ func VerifyIndex(ctx context.Context, name string, idx Index, n int, progress fu ctx, cancel := context.WithCancel(ctx) defer cancel() + // Setup and start the progressbar if any + if pb != nil { + pb.SetTotal(len(idx.Chunks)) + pb.Start() + defer pb.Finish() + } + stat, err := os.Stat(name) if err != nil { return err @@ -51,8 +58,8 @@ func VerifyIndex(ctx context.Context, name string, idx Index, n int, progress fu var err error for c := range in { // Update progress bar if any - if progress != nil { - progress() + if pb != nil { + pb.Increment() } // Position the filehandle to the place where the chunk is meant to come