Skip to content

Commit

Permalink
Implement filesystem-level cloning to reduce disk usage and improve p…
Browse files Browse the repository at this point in the history
…erformance (#55)

* Add seed/reflink support

* Reimplement progress bar using a 3rd-party library

* Add basic seed test

* Break out the seedfile tests into a dedicated section and add more

* Auto-detect FS blocksize used for reflinking

* Implement self-seed to allow reflinking within the same file as it's being written

* When cloning from the nullchunk seed isn't available and the target file is blank, no need to copy 0 byte ranges

* First documentation for reflink/seed feature

* Build ioctl Linux-only code only on Linux

* Add -seed-dir option for the extract command

* Add -stats option to extract command to show details about the operation

* Use a fixed buffer when copying from seeds to avoid io.Copy running out of memory

* Add test for self-seed

* Tests should clean up temp files

* Use less memory when recording write progress in the self-seed

* Save CPU time, less looping, when writing self-seed

* Fix chunk alignment issue. Only clone from self-seed if source and target chunks are aligned

* Don't write stats to STDOUT
  • Loading branch information
folbricht authored Aug 22, 2018
1 parent 637756e commit d9ce6d3
Show file tree
Hide file tree
Showing 28 changed files with 1,283 additions and 234 deletions.
29 changes: 28 additions & 1 deletion README.md
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ Among the distinguishing factors:
- Allows FUSE mounting of blob indexes
- S3 protocol support to access chunk stores for read operations and some some commands that write chunks
- Stores and retrieves index files from remote index stores such as HTTP, SFTP and S3
- Reflinking matching blocks (rather than copying) from seed files if supported by the filesystem (currently only Btrfs and XFS)

## Parallel chunking
One of the significant differences to casync is that desync attempts to make chunking faster by utilizing more CPU resources, chunking data in parallel. Depending on the chosen degree of concurrency, the file is split into N equal parts and each part is chunked independently. While the chunking of each part is ongoing, part1 is trying to align with part2, and part3 is trying to align with part4 and so on. Alignment is achieved once a common split point is found in the overlapping area. If a common split point is found, the process chunking the previous part stops, eg. part1 chunker stops, part2 chunker keeps going until it aligns with part3 and so on until all split points have been found. Once all split points have been determined, the file is opened again (N times) to read, compress and store the chunks. While in most cases this process achieves significantly reduced chunking times at the cost of CPU, there are edge cases where chunking is only about as fast as upstream casync (with more CPU usage). This is the case if no split points can be found in the data between min and max chunk size as is the case if most or all of the file consists of 0-bytes. In this situation, the concurrent chunking processes for each part will not align with each other and a lot of effort is wasted. The table below shows how the type of data that is being chunked can influence runtime of each operation. `make` refers to the process of chunking, while `extract` refers to re-assembly of blobs from chunks.
Expand All @@ -31,6 +32,17 @@ Command | Mostly/All 0-bytes | Typical data
make | Slow (worst-case) - Likely comparable to casync | Fast - Parallel chunking
extract | Extremely fast - Effectively the speed of a truncate() syscall | Fast - Done in parallel, usually limited by I/O

## Seeds and reflinks

Copy-on-write filesystems such as Btrfs and XFS support cloning of blocks between files in order to save disk space as well as improve extraction performance. To utilize this feature, desync uses several seeds to clone sections of files rather than reading the data from chunk-stores and copying it in place:
- A built-in seed for Null-chunks (a chunk of Max chunk site containing only 0 bytes). This can significantly reduce the disk usage of files with large 0-byte ranges, such as VM images. This will effectively turn an eager-zeroed VM disk into a sparse disk while retaining all the advantages of eager-zeroed disk images.
- A build-in Self-seed. As chunks are being written to the destination file, the file itself becomes a seed. If one chunk, or a series of chunks is used again later in the file, it'll be cloned from the position written previously. This saves storage when the file contains several repetitive sections.
- Seed files and their indexes can be provided when extracting a file. For this feature, it's necessary to already have the index plus its blob on disk. So for example `image-v1.vmdk` and `image-v1.vmdk.caibx` can be used as seed for the extract operation of `image-v2.vmdk`. The amount of additional disk space required to store `image-v2.vmdk` will be the delta between it and `image-v1.vmdk`.

![](doc/seed.png)

Even if cloning is not available, seeds are still useful. `desync` automatically determines if reflinks are available (and the block size used in the filesystem). If cloning is not supported, sections are copied instead of cloned. Copying still improves performance and reduces the load created by retrieving chunks over the network and decompressing them.

## Tool

The tool is provided for convenience. It uses the desync library and makes most features of it available in a consistent fashion. It does not match upsteam casync's syntax exactly, but tries to be similar at least.
Expand All @@ -43,7 +55,7 @@ go get -u github.com/folbricht/desync/cmd/desync
```

### Subcommands
- `extract` - build a blob from an index file
- `extract` - build a blob from an index file, optionally using seed indexes+blobs
- `verify` - verify the integrity of a local store
- `list-chunks` - list all chunk IDs contained in an index file
- `cache` - populate a cache from index files without extracting a blob or archive
Expand All @@ -60,6 +72,8 @@ go get -u github.com/folbricht/desync/cmd/desync

### Options (not all apply to all commands)
- `-s <store>` Location of the chunk store, can be local directory or a URL like ssh://hostname/path/to/store. Multiple stores can be specified, they'll be queried for chunks in the same order. The `chop`, `make`, `tar` and `prune` commands support updating chunk stores in S3, while `verify` only operates on a local store.
- `-seed <indexfile>` Specifies a seed file and index for the `extract` command. The tool expects the matching file to be present and have the same name as the index file, without the `.caibx` extension.
- `-seed-dir <dir>` Specifies a directory containing seed files and their indexes for the `extract` command. For each index file in the directory (`*.caibx`) there needs to be a matching blob without the extension.
- `-c <store>` Location of a chunk store to be used as cache. Needs to be writable.
- `-n <int>` Number of concurrent download jobs and ssh sessions to the chunk store.
- `-r` Repair a local cache by removing invalid chunks. Only valid for the `verify` command.
Expand Down Expand Up @@ -189,6 +203,19 @@ Use multiple stores, specify the local one first to improve performance.
desync extract -s /some/local/store -s ssh://192.168.1.1/path/to/casync.store/ somefile.tar.caibx somefile.tar
```

Extract version 3 of a disk image using the previous 2 versions as seed for cloning (if supported), or copying. Note, when providing a seed like `-seed <file>.ext.caibx`, it is assumed that `<file>.ext` is available next to the index file, and matches the index.
```
desync extract -s /local/store \
-seed image-v1.qcow2.caibx \
-seed image-v2.qcow2.caibx \
image-v3.qcow2.caibx image-v3.qcow2
```

Extract an image using several seeds present in a directory. Each of the `.caibx` files in the directory needs to have a matching blob of the same name. It is possible for the source index file to be in the same directory also (it'll be skipped automatically).
```
desync extract -s /local/store -seed-dir /path/to/images image-v3.qcow2.caibx image-v3.qcow2
```

Mix and match remote stores and use a local cache store to improve performance.
```
desync extract \
Expand Down
171 changes: 94 additions & 77 deletions assemble.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ import (
"fmt"
"os"
"sync"
"syscall"

"github.com/pkg/errors"
)
Expand All @@ -18,18 +19,34 @@ import (
// confirm if the data matches what is expected and only populate areas that
// differ from the expected content. This can be used to complete partly
// written files.
func AssembleFile(ctx context.Context, name string, idx Index, s Store, n int, progress func()) error {
func AssembleFile(ctx context.Context, name string, idx Index, s Store, seeds []Seed, n int, pb ProgressBar) (*ExtractStats, error) {
type Job struct {
segment indexSegment
source SeedSegment
}
var (
wg sync.WaitGroup
mu sync.Mutex
pErr error
in = make(chan IndexChunk)
nullChunk = NewNullChunk(idx.Index.ChunkSizeMax)
isBlank bool
wg sync.WaitGroup
mu sync.Mutex
pErr error
in = make(chan Job)
isBlank bool
)
ctx, cancel := context.WithCancel(ctx)
defer cancel()

// Setup and start the progressbar if any
if pb != nil {
pb.SetTotal(len(idx.Chunks))
pb.Start()
defer pb.Finish()
}

// Initialize stats to be gathered during extraction
stats := &ExtractStats{
BytesTotal: idx.Length(),
ChunksTotal: len(idx.Chunks),
}

// Helper function to record and deal with any errors in the goroutines
recordError := func(err error) {
mu.Lock()
Expand All @@ -46,7 +63,7 @@ func AssembleFile(ctx context.Context, name string, idx Index, s Store, n int, p
case os.IsNotExist(err):
f, err := os.Create(name)
if err != nil {
return err
return stats, err
}
f.Close()
isBlank = true
Expand All @@ -58,33 +75,62 @@ func AssembleFile(ctx context.Context, name string, idx Index, s Store, n int, p
// confirm there's enough disk space, but it allows for an optimization
// when dealing with the Null Chunk
if err := os.Truncate(name, idx.Length()); err != nil {
return err
return stats, err
}

// Keep a record of what's already been written to the file and can be
// re-used if there are duplicate chunks
var written fileChunks
// Determine the blocksize of the target file which is required for reflinking
blocksize := blocksizeOfFile(name)

// Prepend a nullchunk seed to the list of seeds to make sure we read that
// before any large null sections in other seed files
ns, err := newNullChunkSeed(name, blocksize, idx.Index.ChunkSizeMax)
if err != nil {
return stats, err
}
defer ns.close()

// Start a self-seed which will become usable once chunks are written contigously
// beginning at position 0.
ss, err := newSelfSeed(name, idx)
if err != nil {
return stats, err
}
seeds = append([]Seed{ns, ss}, seeds...)

// Record the total number of seeds and blocksize in the stats
stats.Seeds = len(seeds)
stats.Blocksize = blocksize

// Start the workers, each having its own filehandle to write concurrently
for i := 0; i < n; i++ {
wg.Add(1)
f, err := os.OpenFile(name, os.O_RDWR, 0666)
if err != nil {
return fmt.Errorf("unable to open file %s, %s", name, err)
return stats, fmt.Errorf("unable to open file %s, %s", name, err)
}
defer f.Close()
go func() {
for c := range in {
if progress != nil {
progress()
for job := range in {
if pb != nil {
pb.Add(job.segment.lengthChunks())
}
// See if we can skip the chunk retrieval and decompression if the
// null chunk is being requested. If a new file is truncated to the
// right size beforehand, there's nothing to do since everything
// defaults to 0 bytes.
if isBlank && c.ID == nullChunk.ID {
if job.source != nil {
stats.addChunksFromSeed(uint64(job.segment.lengthChunks()))
offset := job.segment.start()
length := job.segment.lengthBytes()
copied, cloned, err := job.source.WriteInto(f, offset, length, blocksize, isBlank)
if err != nil {
recordError(err)
continue
}
stats.addBytesCopied(copied)
stats.addBytesCloned(cloned)
// Record this segment's been written in the self-seed to make it
// available going forward
ss.add(job.segment)
continue
}
c := job.segment.chunks()[0]
// If we operate on an existing file there's a good chance we already
// have the data written for this chunk. Let's read it from disk and
// compare to what is expected.
Expand All @@ -96,19 +142,15 @@ func AssembleFile(ctx context.Context, name string, idx Index, s Store, n int, p
}
sum := sha512.Sum512_256(b)
if sum == c.ID {
written.add(c)
// Record this chunk's been written in the self-seed
ss.add(job.segment)
// Record we kept this chunk in the file (when using in-place extract)
stats.incChunksInPlace()
continue
}
}
// Before pulling a chunk from the store, let's see if that same chunk's
// been written to the file already. If so, we can simply clone it from
// that location.
if cw, ok := written.get(c.ID); ok {
if err := cloneInFile(f, c, cw); err != nil {
recordError(err)
}
continue
}
// Record this chunk having been pulled from the store
stats.incChunksFromStore()
// Pull the (compressed) chunk from the store
b, err := s.GetChunk(c.ID)
if err != nil {
Expand Down Expand Up @@ -141,70 +183,45 @@ func AssembleFile(ctx context.Context, name string, idx Index, s Store, n int, p
recordError(err)
continue
}
// Make a record of this chunk being available in the file now
written.add(c)
// Record this chunk's been written in the self-seed
ss.add(job.segment)
}
wg.Done()
}()
}

// Feed the workers, stop if there are any errors
// Let the sequencer break up the index into segments, feed the workers, and
// stop if there are any errors
seq := NewSeedSequencer(idx, seeds...)
loop:
for _, c := range idx.Chunks {
for {
// See if we're meant to stop
select {
case <-ctx.Done():
break loop
default:
}
in <- c
chunks, from, done := seq.Next()
in <- Job{chunks, from}
if done {
break
}
}
close(in)

wg.Wait()
return pErr
}

// fileChunks acts as a kind of in-file cache for chunks already written to
// the file being assembled. Every chunk ref that has been successfully written
// into the file is added to it. If another write operation requires the same
// (duplicate) chunk again, it can just copied out of the file to the new
// position, rather than requesting it from a (possibly remote) store again
// and decompressing it.
type fileChunks struct {
mu sync.RWMutex
chunks map[ChunkID]IndexChunk
}

func (f *fileChunks) add(c IndexChunk) {
f.mu.Lock()
defer f.mu.Unlock()
if len(f.chunks) == 0 {
f.chunks = make(map[ChunkID]IndexChunk)
}
f.chunks[c.ID] = c
}

func (f *fileChunks) get(id ChunkID) (IndexChunk, bool) {
f.mu.RLock()
defer f.mu.RUnlock()
c, ok := f.chunks[id]
return c, ok
return stats, pErr
}

// cloneInFile copies a chunk from one position to another in the same file.
// Used when duplicate chunks are used in a file. TODO: The current implementation
// uses just the one given filehandle, copies into memory, then writes to disk.
// It may be more efficient to open a 2nd filehandle, seek, and copy directly
// with a io.LimitReader.
func cloneInFile(f *os.File, dst, src IndexChunk) error {
if src.ID != dst.ID || src.Size != dst.Size {
return errors.New("internal error: different chunks requested for in-file copy")
func blocksizeOfFile(name string) uint64 {
stat, err := os.Stat(name)
if err != nil {
return DefaultBlockSize
}
b := make([]byte, int64(src.Size))
if _, err := f.ReadAt(b, int64(src.Start)); err != nil {
return err
switch sys := stat.Sys().(type) {
case *syscall.Stat_t:
return uint64(sys.Blksize)
default:
return DefaultBlockSize
}
_, err := f.WriteAt(b, int64(dst.Start))
return err
}
Loading

0 comments on commit d9ce6d3

Please sign in to comment.