Merge pull request #53 from folbricht/issue-52
Allow indexes to be read/written to STDIN/STDOUT. Implements #52
folbricht authored Aug 19, 2018
2 parents e2702a3 + a13af84 commit 637756e
Showing 17 changed files with 128 additions and 45 deletions.
28 changes: 27 additions & 1 deletion README.md
@@ -21,6 +21,7 @@ Among the distinguishing factors:
- While casync supports very small min chunk sizes, optimizations in desync require min chunk sizes larger than the window size of the rolling hash used (currently 48 bytes). The tool's default chunk sizes match the defaults used in casync, min 16k, avg 64k, max 256k.
- Allows FUSE mounting of blob indexes
- S3 protocol support to access chunk stores for read operations and some commands that write chunks
- Stores and retrieves index files from remote index stores such as HTTP, SFTP and S3

## Parallel chunking
One of the significant differences to casync is that desync attempts to make chunking faster by utilizing more CPU resources, chunking data in parallel. Depending on the chosen degree of concurrency, the file is split into N equal parts and each part is chunked independently. While the chunking of each part is ongoing, part1 tries to align with part2, part3 tries to align with part4, and so on. Alignment is achieved once a common split point is found in the overlapping area. When a common split point is found, the process chunking the earlier part stops, e.g. the part1 chunker stops while the part2 chunker keeps going until it aligns with part3, and so on until all split points have been found. Once all split points have been determined, the file is opened again (N times) to read, compress and store the chunks. While in most cases this process achieves significantly reduced chunking times at the cost of CPU, there are edge cases where chunking is only about as fast as upstream casync (with more CPU usage). This happens when no split points can be found in the data between the min and max chunk sizes, for example when most or all of the file consists of 0-bytes. In this situation, the concurrent chunking processes for each part never align with each other and a lot of effort is wasted. The table below shows how the type of data being chunked can influence the runtime of each operation. `make` refers to the process of chunking, while `extract` refers to re-assembly of blobs from chunks.
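The alignment described above works because split decisions in content-defined chunking depend only on a small window of bytes preceding each position, never on where the chunker started. The sketch below illustrates this with a toy boundary function (a simple 4-byte sum, standing in for desync's 48-byte rolling hash; it is not the actual implementation): two chunkers starting at different offsets agree on every split point once both have passed the later start.

```go
package main

import "fmt"

// boundary reports whether position i is a split point. The decision depends
// only on the 4 bytes before i -- a toy stand-in for desync's 48-byte rolling
// hash window. Illustrative only; not desync's actual boundary function.
func boundary(data []byte, i int) bool {
	if i < 4 || i >= len(data) {
		return false
	}
	sum := 0
	for _, b := range data[i-4 : i] {
		sum += int(b)
	}
	return sum%7 == 0
}

// splits lists all split points found when scanning from offset start.
func splits(data []byte, start int) []int {
	var out []int
	for i := start + 4; i < len(data); i++ {
		if boundary(data, i) {
			out = append(out, i)
		}
	}
	return out
}

func main() {
	data := make([]byte, 64)
	for i := range data {
		data[i] = byte(i*31 + 7)
	}
	// Two chunkers start at different offsets; because split decisions are
	// purely local, they agree on every boundary past the later start --
	// the alignment the parallel chunker relies on.
	fmt.Println("chunker from 0: ", splits(data, 0))
	fmt.Println("chunker from 20:", splits(data, 20))
}
```

With uniform data (such as all 0-bytes) the boundary condition either fires everywhere or nowhere, which is exactly the degenerate case described above where the parallel chunkers cannot align.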
@@ -93,6 +94,15 @@ Not all types of stores support all operations. The table below lists the suppor
| Prune | yes | yes | no | yes | no |
| Verify | yes | yes | no | no | no |

### Remote indexes
Indexes can be stored and retrieved from remote locations via SFTP, S3, and HTTP. Storing indexes remotely is optional and deliberately separate from chunk storage. While it's possible to store indexes in the same location as chunks in the case of SFTP and S3, this should only be done in secured environments. The built-in HTTP chunk store (`chunk-server` command) cannot be used as an index server. Use the `index-server` command instead to start an index server that serves indexes and can optionally store them as well (with `-w`).

Using remote indexes, desync can operate completely file-less. For example, to share a large file with `mount-index`, the index can be read from an index store like this:
```
desync mount-index -s http://chunk.store/store http://index.store/myindex.caibx /mnt/image
```
No file would need to be stored on disk in this case.

### S3 chunk stores
desync supports reading from and writing to chunk stores that offer an S3 API, for example hosted in AWS or running on a local server. When using such a store, credentials are passed into the tool either via the environment variables `S3_ACCESS_KEY` and `S3_SECRET_KEY` or, if multiples are required, in the config file. Care is required when building those URLs. Below are a few examples:

@@ -194,6 +204,11 @@ Extract a file in-place (`-k` option). If this operation fails, the file will re
desync extract -k -s sftp://192.168.1.1/path/to/store file.caibx file.tar
```

Extract a file using a remote index from an HTTP index store
```
desync extract -k -s sftp://192.168.1.1/path/to/store http://192.168.1.2/file.caibx file.tar
```

Verify a local cache. Errors will be reported to STDOUT; since `-r` is not given, nothing invalid will be removed.
```
desync verify -s /some/local/store
@@ -250,6 +265,13 @@ Start a chunk server on port 8080 acting as proxy for other remote HTTP and SSH
desync chunk-server -s http://192.168.1.1/ -s ssh://192.168.1.2/store -c cache -l :8080
```

Start a writable index server, chunk a file and store the index.
```
server# desync index-server -s /mnt/indexes -w -l :8080
client# desync make -s /some/store http://192.168.1.1:8080/file.vmdk.caibx file.vmdk
```

Copy all chunks referenced in an index file from a remote HTTP store to a remote SFTP store.
```
desync cache -s ssh://192.168.1.2/store -c sftp://192.168.1.3/path/to/store /path/to/index.caibx
@@ -275,6 +297,11 @@ FUSE mount an index file. This will make the indexed blob available as file unde
desync mount-index -s /some/local/store index.caibx /some/mnt
```

FUSE mount a remote, chunked index file. First, a (small) index is read from the index store and used to re-assemble a larger index file, which is then piped into the second command to be mounted.
```
desync cat -s http://192.168.1.1/store http://192.168.1.2/small.caibx | desync mount-index -s http://192.168.1.1/store - /mnt/point
```

Show information about an index file to see how many of its chunks are present in a local store or an S3 store. The local store is queried first, S3 is only queried if the chunk is not present in the local store. The output will be in JSON format (`-j`) for easier processing in scripts.
```
desync info -j -s /tmp/store -s s3+http://127.0.0.1:9000/store /path/to/index
@@ -286,5 +313,4 @@ desync info -j -s /tmp/store -s s3+http://127.0.0.1:9000/store /path/to/index

## TODOs
- Pre-allocate the output file to avoid fragmentation when using extract command
- Support retrieval of index files from the chunk store
- Allow on-disk chunk cache to optionally be stored uncompressed, such that blocks can be directly reflinked (rather than copied) into files, when on a platform and filesystem where reflink support is available.
2 changes: 1 addition & 1 deletion cmd/desync/cache.go
@@ -15,7 +15,7 @@ const cacheUsage = `desync cache [options] <index> [<index>...]
Read chunk IDs from caibx or caidx files from one or more stores without
writing to disk. Can be used (with -c) to populate a store with desired chunks
either to be used as cache, or to populate a store with chunks referenced in an
index file.`
index file. Use '-' to read a single index from STDIN.`

func cache(ctx context.Context, args []string) error {
var (
8 changes: 5 additions & 3 deletions cmd/desync/cat.go
@@ -12,16 +12,18 @@ import (
"github.com/folbricht/desync"
)

const catUsage = `desync cat [options] <caibx> [<outputfile>]
const catUsage = `desync cat [options] <index> [<outputfile>]
Stream a caibx to stdout or a file-like object, optionally seeking and limiting
Stream a blob to stdout or a file-like object, optionally seeking and limiting
the read length.
Unlike extract, this supports output to FIFOs, named pipes, and other
non-seekable destinations.
This is inherently slower than extract as while multiple chunks can be
retrieved concurrently, writing to stdout cannot be parallelized.`
retrieved concurrently, writing to stdout cannot be parallelized.
Use '-' to read the index from STDIN.`

func cat(ctx context.Context, args []string) error {
var (
6 changes: 3 additions & 3 deletions cmd/desync/chop.go
@@ -10,10 +10,10 @@ import (
"github.com/folbricht/desync"
)

const chopUsage = `desync chop [options] <caibx> <file>
const chopUsage = `desync chop [options] <index> <file>
Reads the index file and extracts all referenced chunks from the file
into a local or S3 store.`
Reads the index and extracts all referenced chunks from the file into a store,
local or remote. Use '-' to read the index from STDIN.`

func chop(ctx context.Context, args []string) error {
var (
7 changes: 4 additions & 3 deletions cmd/desync/extract.go
@@ -12,12 +12,13 @@ import (
"github.com/folbricht/tempfile"
)

const extractUsage = `desync extract [options] <caibx> <output>
const extractUsage = `desync extract [options] <index> <output>
Read a caibx and build a blob reading chunks from one or more casync stores.
Reads an index and builds a blob reading chunks from one or more chunk stores.
When using -k, the blob will be extracted in-place utilizing existing data and
the target file will not be deleted on error. This can be used to restart a
failed prior extraction without having to retrieve completed chunks again.
failed prior extraction without having to retrieve completed chunks again. Use
'-' to read the index from STDIN.
`

func extract(ctx context.Context, args []string) error {
2 changes: 1 addition & 1 deletion cmd/desync/info.go
@@ -17,7 +17,7 @@ const infoUsage = `desync info [-s <store>] <index>
Displays information about the provided index, such as number of chunks. If a
store is provided, it'll also show how many of the chunks are present in the
store.`
store. Use '-' to read the index from STDIN.`

func info(ctx context.Context, args []string) error {
var (
5 changes: 3 additions & 2 deletions cmd/desync/list.go
@@ -8,9 +8,10 @@ import (
"os"
)

const listUsage = `desync list-chunks <caibx>
const listUsage = `desync list-chunks <index>
Reads the index file and prints the list of chunk IDs in it.`
Reads the index file and prints the list of chunk IDs in it. Use '-' to read
the index from STDIN.`

func list(ctx context.Context, args []string) error {
var (
9 changes: 5 additions & 4 deletions cmd/desync/make.go
@@ -15,8 +15,9 @@ import (
const makeUsage = `desync make [options] <index> <file>
Creates chunks from the input file and builds an index. If a chunk store is
provided with -s, such as a local directory or S3 store, it split the input
file according to the index and stores the chunks.`
provided with -s, such as a local directory or S3 store, it splits the input
file according to the index and stores the chunks. Use '-' to write the index
to STDOUT.`

func makeCmd(ctx context.Context, args []string) error {
var (
@@ -97,8 +98,8 @@ func makeCmd(ctx context.Context, args []string) error {
ps.Stop()
}

fmt.Println("Chunks produced:", stats.ChunksAccepted)
fmt.Println("Overhead:", stats.ChunksProduced-stats.ChunksAccepted)
fmt.Fprintln(os.Stderr, "Chunks produced:", stats.ChunksAccepted)
fmt.Fprintln(os.Stderr, "Overhead:", stats.ChunksProduced-stats.ChunksAccepted)

return storeCaibxFile(index, indexFile, sOpts)
}
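The switch from `fmt.Println` to `fmt.Fprintln(os.Stderr, ...)` above matters because the index may now be streamed to STDOUT (index file `-`): any diagnostics printed to STDOUT would corrupt the piped index data. A minimal sketch of the pattern (the `statsLines` helper is hypothetical, introduced here for illustration):

```go
package main

import (
	"fmt"
	"os"
)

// statsLines formats post-run chunking statistics. Writing them to STDERR
// (as this commit changes make.go to do) keeps STDOUT clean, which matters
// when the index itself is streamed to STDOUT via '-'.
func statsLines(produced, accepted int) string {
	return fmt.Sprintf("Chunks produced: %d\nOverhead: %d\n", accepted, produced-accepted)
}

func main() {
	// Diagnostics to STDERR; STDOUT stays free for piped index data.
	os.Stderr.WriteString(statsLines(120, 100))
	fmt.Println("<index bytes would stream on STDOUT>")
}
```

The same reasoning applies to the progress bar change in progressbar.go below.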
3 changes: 2 additions & 1 deletion cmd/desync/mount-index.go
@@ -16,7 +16,8 @@ const mountIdxUsage = `desync mount-index [options] <index> <mountpoint>
FUSE mount of the blob in the index file. It makes the (single) file in
the index available for read access. Use 'extract' if the goal is to
assemble the whole blob locally as that is more efficient.
assemble the whole blob locally as that is more efficient. Use '-' to read
the index from STDIN.
`

func mountIdx(ctx context.Context, args []string) error {
2 changes: 1 addition & 1 deletion cmd/desync/progressbar.go
@@ -89,7 +89,7 @@ func (p *ConsoleProgressBar) draw() {
if progress < 0 || blank < 0 { // No need to panic if anything's off
return
}
fmt.Printf("\r%s|%s%s|", p.prefix, strings.Repeat("=", progress), strings.Repeat(" ", blank))
fmt.Fprintf(os.Stderr, "\r%s|%s%s|", p.prefix, strings.Repeat("=", progress), strings.Repeat(" ", blank))
}

type NullProgressBar struct{}
3 changes: 2 additions & 1 deletion cmd/desync/prune.go
@@ -13,7 +13,8 @@ import (
const pruneUsage = `desync prune [options] <index> [<index>..]
Read chunk IDs in from index files and delete any chunks from a local (or s3)
store that are not referenced in the index files.`
store that are not referenced in the index files. Use '-' to read a single index
from STDIN.`

func prune(ctx context.Context, args []string) error {
var (
13 changes: 8 additions & 5 deletions cmd/desync/store.go
@@ -157,7 +157,7 @@ func writableIndexStore(location string, opts storeOptions) (desync.IndexWriteSt
}
store, ok := s.(desync.IndexWriteStore)
if !ok {
return nil, indexName, fmt.Errorf("store '%s' does not support writing", location)
return nil, indexName, fmt.Errorf("index store '%s' does not support writing", location)
}
return store, indexName, nil
}
@@ -198,10 +198,13 @@ func indexStoreFromLocation(location string, opts storeOptions) (desync.IndexSto
return nil, "", err
}
case "":

s, err = desync.NewLocaIndexlStore(p.String())
if err != nil {
return nil, "", err
if location == "-" {
s, _ = desync.NewConsoleIndexStore()
} else {
s, err = desync.NewLocaIndexStore(p.String())
if err != nil {
return nil, "", err
}
}
default:
return nil, "", fmt.Errorf("Unsupported store access scheme %s", loc.Scheme)
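The dispatch in `indexStoreFromLocation` above selects a store implementation from the location string's URL scheme, with the new special case `-` mapping to the console store. A simplified, self-contained sketch of that dispatch (returning labels instead of the real store types, which are not shown here):

```go
package main

import (
	"fmt"
	"net/url"
)

// storeKind sketches the scheme dispatch performed by indexStoreFromLocation:
// the location's URL scheme selects the store implementation, and the special
// name "-" selects the console (STDIN/STDOUT) store.
func storeKind(location string) string {
	if location == "-" {
		return "console"
	}
	u, err := url.Parse(location)
	if err != nil {
		return "invalid"
	}
	switch u.Scheme {
	case "ssh", "sftp":
		return "sftp"
	case "http", "https":
		return "http"
	case "s3+http", "s3+https":
		return "s3"
	case "": // no scheme: a plain local path
		return "local"
	default:
		return "unsupported"
	}
}

func main() {
	for _, l := range []string{"-", "/mnt/indexes", "http://192.168.1.1/idx", "s3+http://127.0.0.1:9000/store"} {
		fmt.Printf("%-30s -> %s\n", l, storeKind(l))
	}
}
```

Note that `"-"` must be checked before `url.Parse`, since a bare dash has no scheme and would otherwise fall through to the local-path case.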
21 changes: 14 additions & 7 deletions cmd/desync/tar.go
@@ -13,10 +13,11 @@ import (
"github.com/folbricht/desync"
)

const tarUsage = `desync tar <catar|caidx> <source>
const tarUsage = `desync tar <catar|index> <source>
Encodes a directory tree into a catar archive or alternatively an index file
with the archive chunked in a local or S3 store.`
with the archive chunked in a local or S3 store. Use '-' to write the output,
catar or index, to STDOUT.`

func tar(ctx context.Context, args []string) error {
var (
@@ -58,12 +59,18 @@ func tar(ctx context.Context, args []string) error {

// Just make the catar and stop if that's all that was required
if !makeIndex {
f, err := os.Create(output)
if err != nil {
return err
var w io.Writer
if output == "-" {
w = os.Stdout
} else {
f, err := os.Create(output)
if err != nil {
return err
}
defer f.Close()
w = f
}
defer f.Close()
return desync.Tar(ctx, f, sourceDir)
return desync.Tar(ctx, w, sourceDir)
}

sOpts := storeOptions{
5 changes: 3 additions & 2 deletions cmd/desync/untar.go
@@ -12,9 +12,10 @@ import (
"github.com/folbricht/desync"
)

const untarUsage = `desync untar <catar|caidx> <target>
const untarUsage = `desync untar <catar|index> <target>
Extracts a directory tree from a catar file or an index file.`
Extracts a directory tree from a catar file or an index. Use '-' to read the
index from STDIN.`

func untar(ctx context.Context, args []string) error {
var (
5 changes: 3 additions & 2 deletions cmd/desync/verifyindex.go
@@ -10,9 +10,10 @@ import (
"github.com/folbricht/desync"
)

const verifyIndexUsage = `desync verify-index [options] <caibx> <file>
const verifyIndexUsage = `desync verify-index [options] <index> <file>
Verifies an index file matches the content of a blob.
Verifies an index file matches the content of a blob. Use '-' to read the index
from STDIN.
`

func verifyIndex(ctx context.Context, args []string) error {
39 changes: 39 additions & 0 deletions consoleindex.go
@@ -0,0 +1,39 @@
package desync

import (
	"io"
	"io/ioutil"
	"os"
)

// ConsoleIndexStore is used for writing/reading indexes from STDOUT/STDIN
type ConsoleIndexStore struct{}

// NewConsoleIndexStore creates an instance of an index store that reads
// indexes from STDIN and writes them to STDOUT
func NewConsoleIndexStore() (ConsoleIndexStore, error) {
return ConsoleIndexStore{}, nil
}

// GetIndexReader returns a reader from STDIN
func (s ConsoleIndexStore) GetIndexReader(string) (io.ReadCloser, error) {
return ioutil.NopCloser(os.Stdin), nil
}

// GetIndex reads an index from STDIN and returns it.
func (s ConsoleIndexStore) GetIndex(string) (i Index, e error) {
return IndexFromReader(os.Stdin)
}

// StoreIndex writes the provided index to STDOUT. The name is ignored.
func (s ConsoleIndexStore) StoreIndex(name string, idx Index) error {
_, err := idx.WriteTo(os.Stdout)
return err
}

func (s ConsoleIndexStore) String() string {
	return "-"
}

func (s ConsoleIndexStore) Close() error { return nil }
15 changes: 7 additions & 8 deletions localindex.go
@@ -1,24 +1,22 @@
package desync

import (
	"fmt"
	"io"
	"os"
	"strings"

	"github.com/pkg/errors"
)

// LocalStore index store
// LocalIndexStore is used to read/write index files on local disk
type LocalIndexStore struct {
Path string
}

// NewLocaIndexStore creates an instance of a local index store; it only
// checks for the presence of the store directory
func NewLocaIndexlStore(path string) (LocalIndexStore, error) {
func NewLocaIndexStore(path string) (LocalIndexStore, error) {
info, err := os.Stat(path)
if err != nil {
return LocalIndexStore{}, err
@@ -32,7 +30,8 @@ func NewLocaIndexlStore(path string) (LocalIndexStore, error) {
return LocalIndexStore{Path: path}, nil
}

// Get and Index Reader from a local store, returns an error if the specified index file does not exist.
// GetIndexReader returns a reader of an index file in the store or an error if
// the specified index file does not exist.
func (s LocalIndexStore) GetIndexReader(name string) (rdr io.ReadCloser, e error) {
return os.Open(s.Path + name)
}
@@ -51,7 +50,7 @@ func (s LocalIndexStore) GetIndex(name string) (i Index, e error) {
return idx, err
}

// GetIndex returns an Index structure from the store
// StoreIndex stores an index in the index store with the given name.
func (s LocalIndexStore) StoreIndex(name string, idx Index) error {
// Write the index to file
i, err := os.Create(s.Path + name)
