Commit 922254f
Support in-place extraction with -k option and re-use chunks already written to the output file in other places (#38)
folbricht authored Jun 9, 2018
1 parent 9830e1f commit 922254f
Showing 4 changed files with 267 additions and 14 deletions.
6 changes: 6 additions & 0 deletions README.md
@@ -68,6 +68,7 @@ go get -u github.com/folbricht/desync/cmd/desync
- `-t` Trust all certificates presented by HTTPS stores. Allows the use of self-signed certs when using a HTTPS chunk server.
- `-key` Key file in PEM format used for HTTPS `chunk-server` command. Also requires a certificate with `-cert`
- `-cert` Certificate file in PEM format used for HTTPS `chunk-server` command. Also requires `-key`.
- `-k` Keep partially assembled files in place when `extract` fails or is interrupted. The command can then be restarted and will not have to retrieve already-completed chunks again.

### Environment variables
- `CASYNC_SSH_PATH` overrides the default "ssh" with a command to run when connecting to a remote SSH or SFTP chunk store
@@ -163,6 +164,11 @@ desync extract \
somefile.tar.caibx somefile.tar
```

Extract a file in-place (`-k` option). If the operation fails or is interrupted, the file remains partially complete, and the extraction can be restarted without re-downloading chunks from the remote SFTP store. Use `-k` when a local cache is not available and the extraction may be interrupted.
```
desync extract -k -s sftp://192.168.1.1/path/to/store file.caibx file.tar
```
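
If the first attempt fails partway through, re-running the same command resumes the extraction and re-uses the chunks already written to `file.tar` (a hypothetical session):
```
desync extract -k -s sftp://192.168.1.1/path/to/store file.caibx file.tar
# ... interrupted or failed ...
desync extract -k -s sftp://192.168.1.1/path/to/store file.caibx file.tar
```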

Verify a local cache. Errors will be reported to STDOUT; since `-r` is not given, nothing invalid will be removed.
```
desync verify -s /some/local/store
102 changes: 98 additions & 4 deletions assemble.go
@@ -3,6 +3,7 @@ package desync
import (
"context"
"crypto/sha512"
"errors"
"fmt"
"os"
"sync"
@@ -12,13 +13,18 @@ import (
// goroutines, creating one filehandle for the file "name" per goroutine
// and writes to the file simultaneously. If progress is provided, it'll be
// called when a chunk has been processed.
// If the input file exists and is not empty, the algorithm first checks
// whether the existing data matches what is expected and only populates
// areas that differ from the expected content. This can be used to complete
// partially written files.
func AssembleFile(ctx context.Context, name string, idx Index, s Store, n int, progress func()) error {
var (
wg sync.WaitGroup
mu sync.Mutex
pErr error
in = make(chan IndexChunk)
nullChunk = NewNullChunk(idx.Index.ChunkSizeMax)
isBlank bool
)
ctx, cancel := context.WithCancel(ctx)
defer cancel()
@@ -33,17 +39,35 @@ func AssembleFile(ctx context.Context, name string, idx Index, s Store, n int, p
cancel()
}

// Determine if the target exists and create it if not
info, err := os.Stat(name)
switch {
case os.IsNotExist(err):
f, err := os.Create(name)
if err != nil {
return err
}
f.Close()
isBlank = true
case err != nil:
// Fail on any other stat error rather than dereferencing a nil info below
return err
case info.Size() == 0:
isBlank = true
}

// Truncate the output file to the full expected size. Not only does this
-// confirm there's enough disk space, but it allow allows for an optimization
+// confirm there's enough disk space, but it allows for an optimization
// when dealing with the Null Chunk
if err := os.Truncate(name, idx.Length()); err != nil {
return err
}

// Keep a record of what's already been written to the file so it can be
// re-used when there are duplicate chunks
var written fileChunks

// Start the workers, each having its own filehandle to write concurrently
for i := 0; i < n; i++ {
wg.Add(1)
-f, err := os.OpenFile(name, os.O_CREATE|os.O_WRONLY, 0666)
+f, err := os.OpenFile(name, os.O_RDWR, 0666)
if err != nil {
return fmt.Errorf("unable to open file %s, %s", name, err)
}
@@ -54,10 +78,34 @@ func AssembleFile(ctx context.Context, name string, idx Index, s Store, n int, p
progress()
}
// See if we can skip the chunk retrieval and decompression if the
-// null chunk is being requested. If the file is truncated to the
+// null chunk is being requested. If a new file is truncated to the
// right size beforehand, there's nothing to do since everything
// defaults to 0 bytes.
-if c.ID == nullChunk.ID {
+if isBlank && c.ID == nullChunk.ID {
continue
}
// If we operate on an existing file there's a good chance we already
// have the data written for this chunk. Let's read it from disk and
// compare to what is expected.
if !isBlank {
b := make([]byte, c.Size)
if _, err := f.ReadAt(b, int64(c.Start)); err != nil {
recordError(err)
continue
}
sum := sha512.Sum512_256(b)
if sum == c.ID {
written.add(c)
continue
}
}
// Before pulling a chunk from the store, let's see if that same chunk's
// been written to the file already. If so, we can simply clone it from
// that location.
if cw, ok := written.get(c.ID); ok {
if err := cloneInFile(f, c, cw); err != nil {
recordError(err)
}
continue
}
// Pull the (compressed) chunk from the store
@@ -92,6 +140,8 @@ func AssembleFile(ctx context.Context, name string, idx Index, s Store, n int, p
recordError(err)
continue
}
// Make a record of this chunk being available in the file now
written.add(c)
}
wg.Done()
}()
@@ -113,3 +163,47 @@ loop:
wg.Wait()
return pErr
}

// fileChunks acts as a kind of in-file cache for chunks already written to
// the file being assembled. Every chunk ref that has been successfully written
// into the file is added to it. If another write operation requires the same
(duplicate) chunk again, it can simply be copied out of the file to the new
// position, rather than requesting it from a (possibly remote) store again
// and decompressing it.
type fileChunks struct {
mu sync.RWMutex
chunks map[ChunkID]IndexChunk
}

func (f *fileChunks) add(c IndexChunk) {
f.mu.Lock()
defer f.mu.Unlock()
if len(f.chunks) == 0 {
f.chunks = make(map[ChunkID]IndexChunk)
}
f.chunks[c.ID] = c
}

func (f *fileChunks) get(id ChunkID) (IndexChunk, bool) {
f.mu.RLock()
defer f.mu.RUnlock()
c, ok := f.chunks[id]
return c, ok
}

// cloneInFile copies a chunk from one position to another in the same file.
// Used when a file contains duplicate chunks. TODO: The current implementation
// uses just the given filehandle, copies the chunk into memory, then writes it
// back to disk. It may be more efficient to open a 2nd filehandle, seek, and
// copy directly with an io.LimitReader.
func cloneInFile(f *os.File, dst, src IndexChunk) error {
if src.ID != dst.ID || src.Size != dst.Size {
return errors.New("internal error: different chunks requested for in-file copy")
}
b := make([]byte, int64(src.Size))
if _, err := f.ReadAt(b, int64(src.Start)); err != nil {
return err
}
_, err := f.WriteAt(b, int64(dst.Start))
return err
}
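
As an illustration of how a caller drives this code path, here is a minimal sketch of in-place assembly using APIs that appear in this commit (`NewLocalStore`, `IndexFromFile`, `AssembleFile`); the store path, file names, and concurrency level are assumptions:
```
package main

import (
	"context"
	"log"

	"github.com/folbricht/desync"
)

func main() {
	// Local chunk store holding the chunks of the blob (path is assumed)
	s, err := desync.NewLocalStore("/path/to/store")
	if err != nil {
		log.Fatal(err)
	}

	// Chunk an existing file to obtain an index, using the package defaults,
	// as the new test in this commit does
	idx, _, err := desync.IndexFromFile(
		context.Background(),
		"somefile.tar",
		10, // concurrency
		desync.ChunkSizeMinDefault, desync.ChunkSizeAvgDefault, desync.ChunkSizeMaxDefault,
		nil, // no progress callback
	)
	if err != nil {
		log.Fatal(err)
	}

	// Assemble in place: if output.tar already holds partial data, chunks
	// that verify against the index are skipped rather than fetched again
	if err := desync.AssembleFile(context.Background(), "output.tar", idx, s, 10, nil); err != nil {
		log.Fatal(err)
	}
}
```
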
140 changes: 140 additions & 0 deletions assemble_test.go
@@ -0,0 +1,140 @@
package desync

import (
"bytes"
"context"
"crypto/md5"
"io"
"io/ioutil"
"os"
"testing"
)

func TestExtract(t *testing.T) {
// Make a test file that's guaranteed to have duplicate chunks.
b, err := ioutil.ReadFile("testdata/chunker.input")
if err != nil {
t.Fatal(err)
}
for i := 0; i < 4; i++ { // Replicate it a few times to make sure we get dupes
b = append(b, b...)
}
b = append(b, make([]byte, 2*ChunkSizeMaxDefault)...) // want to have at least one null-chunk in the input
in, err := ioutil.TempFile("", "in")
if err != nil {
t.Fatal(err)
}
defer os.RemoveAll(in.Name())
if _, err := io.Copy(in, bytes.NewReader(b)); err != nil {
t.Fatal(err)
}
in.Close()

// Record the checksum of the input file, used to compare to the output later
inSum := md5.Sum(b)

// Chunk the file to get an index
index, _, err := IndexFromFile(
context.Background(),
in.Name(),
10,
ChunkSizeMinDefault, ChunkSizeAvgDefault, ChunkSizeMaxDefault,
nil,
)
if err != nil {
t.Fatal(err)
}

// Chop up the input file into a (temporary) local store
store, err := ioutil.TempDir("", "store")
if err != nil {
t.Fatal(err)
}
defer os.RemoveAll(store)

s, err := NewLocalStore(store)
if err != nil {
t.Fatal(err)
}
if err := ChopFile(context.Background(), in.Name(), index.Chunks, s, 10, nil); err != nil {
t.Fatal(err)
}

// Make a blank store - used to test a case where no chunk *should* be requested
blankstore, err := ioutil.TempDir("", "blankstore")
if err != nil {
t.Fatal(err)
}
defer os.RemoveAll(blankstore)
bs, err := NewLocalStore(blankstore)
if err != nil {
t.Fatal(err)
}

// Prepare output files for each test - first a non-existing one
out1, err := ioutil.TempFile("", "out1")
if err != nil {
t.Fatal(err)
}
os.RemoveAll(out1.Name())

// This one is a complete file matching what we expect at the end
out2, err := ioutil.TempFile("", "out2")
if err != nil {
t.Fatal(err)
}
if _, err := io.Copy(out2, bytes.NewReader(b)); err != nil {
t.Fatal(err)
}
out2.Close()
defer os.RemoveAll(out2.Name())

// Incomplete or damaged file that has most but not all data
out3, err := ioutil.TempFile("", "out3")
if err != nil {
t.Fatal(err)
}
b[0] ^= 0xff // flip some bits
b[len(b)-1] ^= 0xff
b = append(b, 0) // make it longer
if _, err := io.Copy(out3, bytes.NewReader(b)); err != nil {
t.Fatal(err)
}
out3.Close()
defer os.RemoveAll(out3.Name())

// At this point we have the data needed for the test setup
// in - Temp file that represents the original input file
// inSum - MD5 of the input file
// index - Index file for the input file
// s - Local store containing the chunks needed to rebuild the input file
// bs - A blank local store, all GetChunk calls fail on it
// out1 - Just a non-existing file that gets assembled
// out2 - The output file already fully complete, no GetChunk should be needed
// out3 - Partial/damaged file with most, but not all data correct

tests := map[string]struct {
outfile string
store Store
}{
"extract to new file": {outfile: out1.Name(), store: s},
"extract to complete file": {outfile: out2.Name(), store: bs},
"extract to incomplete file": {outfile: out3.Name(), store: s},
}

for name, test := range tests {
t.Run(name, func(t *testing.T) {
if err := AssembleFile(context.Background(), test.outfile, index, test.store, 10, nil); err != nil {
t.Fatal(err)
}
b, err := ioutil.ReadFile(test.outfile)
if err != nil {
t.Fatal(err)
}
outSum := md5.Sum(b)
if inSum != outSum {
t.Fatal("checksum of extracted file doesn't match expected")
}
})
}
}
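
To run just the new test with standard Go tooling (an illustrative invocation):
```
go test -run TestExtract github.com/folbricht/desync
```
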
33 changes: 23 additions & 10 deletions cmd/desync/extract.go
@@ -14,7 +14,11 @@ import (

const extractUsage = `desync extract [options] <caibx> <output>
-Read a caibx and build a blob reading chunks from one or more casync stores.`
+Read a caibx and build a blob reading chunks from one or more casync stores.
+When using -k, the blob will be extracted in-place utilizing existing data and
+the target file will not be deleted on error. This can be used to restart a
+failed prior extraction without having to retrieve completed chunks again.
+`

func extract(ctx context.Context, args []string) error {
var (
@@ -24,6 +28,7 @@ func extract(ctx context.Context, args []string) error {
storeLocations = new(multiArg)
clientCert string
clientKey string
inPlace bool
)
flags := flag.NewFlagSet("extract", flag.ExitOnError)
flags.Usage = func() {
@@ -37,6 +42,7 @@ func extract(ctx context.Context, args []string) error {
flags.BoolVar(&desync.TrustInsecure, "t", false, "trust invalid certificates")
flags.StringVar(&clientCert, "clientCert", "", "Path to Client Certificate for TLS authentication")
flags.StringVar(&clientKey, "clientKey", "", "Path to Client Key for TLS authentication")
flags.BoolVar(&inPlace, "k", false, "extract the file in place and keep it in case of error")
flags.Parse(args)

if flags.NArg() < 2 {
@@ -80,11 +86,13 @@ func extract(ctx context.Context, args []string) error {
return err
}

-// Write the output
-return writeOutput(ctx, outFile, idx, s, n)
+if inPlace {
+return writeInplace(ctx, outFile, idx, s, n)
+}
+return writeWithTmpFile(ctx, outFile, idx, s, n)
}

-func writeOutput(ctx context.Context, name string, idx desync.Index, s desync.Store, n int) error {
+func writeWithTmpFile(ctx context.Context, name string, idx desync.Index, s desync.Store, n int) error {
// Prepare a tempfile that'll hold the output during processing. Close it, we
// just need the name here since it'll be opened multiple times during write.
// Also make sure it gets removed regardless of any errors below.
@@ -95,13 +103,8 @@ func writeOutput(ctx context.Context, name string, idx desync.Index, s desync.St
tmpfile.Close()
defer os.Remove(tmpfile.Name())

-// If this is a terminal, we want a progress bar
-p := NewProgressBar(len(idx.Chunks), "")
-p.Start()
-defer p.Stop()

// Build the blob from the chunks, writing everything into the tempfile
-if err = desync.AssembleFile(ctx, tmpfile.Name(), idx, s, n, func() { p.Add(1) }); err != nil {
+if err = writeInplace(ctx, tmpfile.Name(), idx, s, n); err != nil {
return err
}

@@ -115,3 +118,13 @@ func writeOutput(ctx context.Context, name string, idx desync.Index, s desync.St
// generates a tempfile name. Set 0644 perms here after rename (ignoring umask)
return os.Chmod(name, 0644)
}

func writeInplace(ctx context.Context, name string, idx desync.Index, s desync.Store, n int) error {
// If this is a terminal, we want a progress bar
p := NewProgressBar(len(idx.Chunks), "")
p.Start()
defer p.Stop()

// Build the blob from the chunks, writing everything into given filename
return desync.AssembleFile(ctx, name, idx, s, n, func() { p.Add(1) })
}
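
The practical difference on failure: without `-k`, assembly happens in a tempfile that is removed on error, so no partial output remains; with `-k`, the partially assembled target is kept and a restart re-uses its verified chunks. A hypothetical session:
```
# Without -k: an error leaves no file.tar behind (the tempfile is removed)
desync extract -s /some/local/store file.caibx file.tar
# With -k: file.tar is created in place and kept on error for a later restart
desync extract -k -s /some/local/store file.caibx file.tar
```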
