Issue 50 - Allow configuration of individual stores and support uncompressed stores (#62)

* Introduce new Chunk object in preparation for uncompressed store support

* Make store features more customizable by passing StoreOptions to all constructors and allowing the options to be read from config or command line

* Add skip-verify option to allow for efficient chaining of chunk stores

* Support for uncompressed stores and chunk-servers

* Add documentation

* Fix failing tests and simplify chunk object usage

* Fix out-of-bounds error dealing with index stores without path or URL component
folbricht authored Sep 29, 2018
1 parent 7efc19a commit f8b8acc
Showing 53 changed files with 1,048 additions and 521 deletions.
46 changes: 33 additions & 13 deletions README.md
@@ -12,7 +12,7 @@ Among the distinguishing factors:
- Where the upstream command has chosen to optimize for storage efficiency (f/e, being able to use local files as "seeds", building temporary indexes into them), this command chooses to optimize for runtime performance (maintaining a local explicit chunk store, avoiding the need to reindex) at cost to storage efficiency.
- Where the upstream command has chosen to take full advantage of Linux platform features, this client chooses to implement a minimum featureset and, while high-value platform-specific features (such as support for btrfs reflinks into a decompressed local chunk cache) might be added in the future, the ability to build without them on other platforms will be maintained.
- SHA512/256 is currently the only supported hash function.
- Only chunk store using zstd compression are supported at this point.
- Only chunk stores using zstd compression, as well as uncompressed stores, are supported at this point.
- Supports local stores as well as remote stores (as client) over SSH, SFTP and HTTP
- Built-in HTTP(S) chunk server that can proxy multiple local or remote stores and also supports caching.
- Drop-in replacement for casync on SSH servers when serving chunks read-only
@@ -22,6 +22,7 @@ Among the distinguishing factors:
- Allows FUSE mounting of blob indexes
- S3 protocol support to access chunk stores for read operations and some commands that write chunks
- Stores and retrieves index files from remote index stores such as HTTP, SFTP and S3
- Built-in HTTP(S) index server to read/write indexes
- Reflinking matching blocks (rather than copying) from seed files if supported by the filesystem (currently only Btrfs and XFS)

## Parallel chunking
@@ -136,24 +137,37 @@ This is a store running on the local machine on port 9000 without SSL.
s3+http://127.0.0.1:9000/store
```

#### Previous S3 storage layout
Before April 2018, chunks in S3 stores were kept in a flat layout, with the name being the checksum of the chunk. Since then, the layout was modified to match that of local stores: `<4-checksum-chars>/<checksum>.cacnk`. This change allows the use of other tools to convert or copy stores between local and S3 stores. To convert an existing S3 store from the old format, the tool provides an `upgrade-s3` command.
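For example, a chunk whose checksum starts with `0123` is now stored under `0123/<checksum>.cacnk` rather than directly under the root of the store.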
### Compressed vs Uncompressed chunk stores
By default, desync reads and writes chunks in compressed form to all supported stores. This is in line with upstream casync's goal of storing in the most efficient way. It is, however, possible to change this behavior by providing desync with a config file (see Configuration section below). Disabling compression and storing chunks uncompressed may reduce latency in some use cases and improve performance. desync supports reading and writing uncompressed chunks to SFTP, S3, HTTP and local stores and caches. If more than one store is used, each of them can be configured independently; for example, it's possible to read compressed chunks from S3 while using a local uncompressed cache for best performance. However, care needs to be taken when using the `chunk-server` command and building chains of chunk store proxies to avoid shifting the decompression load onto the server (it's possible this is actually desirable).

In the setup below, a client reads chunks from an HTTP chunk server which itself gets chunks from S3.
```
<Client> ---> <HTTP chunk server> ---> <S3 store>
```
If the client configures the HTTP chunk server to be uncompressed (`chunk-server` needs to be started with the `-u` option), and the chunk server reads compressed chunks from S3, then the chunk server has to decompress every requested chunk before responding to the client. If the chunk server reads uncompressed chunks from S3 as well, there is no such overhead.

Compressed and uncompressed chunks can live in the same store and don't interfere with each other. A store that a client has configured for compressed chunks will not see any uncompressed chunks that may be present, and `prune` and `verify` likewise ignore chunks written in the other format. Both kinds of chunks can be accessed by multiple clients concurrently and independently.
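When desync is used as a Go library, the same per-store choice can be made through `StoreOptions`. The sketch below is illustrative only: it assumes the `uncompressed` config key maps to a `StoreOptions` field named `Uncompressed`, and that the store directory already exists.

```go
package main

import (
    "fmt"

    "github.com/folbricht/desync"
)

func main() {
    // Open a local store that keeps its chunks uncompressed. The field name
    // Uncompressed is an assumption based on the "uncompressed" config key.
    s, err := desync.NewLocalStore("/var/tmp/uncompressed-store", desync.StoreOptions{Uncompressed: true})
    if err != nil {
        panic(err)
    }

    // Wrap some data in a Chunk and write it. The chunk ID is derived from
    // the uncompressed data, regardless of how the store keeps it on disk.
    chunk := desync.NewChunkFromUncompressed([]byte("hello desync"))
    if err := s.StoreChunk(chunk); err != nil {
        panic(err)
    }

    // Reading it back returns a Chunk as well; Uncompressed() returns the
    // original bytes without a decompression step for this store.
    c, err := s.GetChunk(chunk.ID())
    if err != nil {
        panic(err)
    }
    b, err := c.Uncompressed()
    if err != nil {
        panic(err)
    }
    fmt.Printf("read %d bytes for chunk %s\n", len(b), chunk.ID())
}
```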

### Configuration

For most use cases, it is sufficient to use the tool's default configuration not requiring a config file. Having a config file `$HOME/.config/desync/config.json` allows for further customization of timeouts, error retry behaviour or credentials that can't be set via command-line options or environment variables. To view the current configuration, use `desync config`. If no config file is present, this will show the defaults. To create a config file allowing custom values, use `desync config -w` which will write the current configuration to the file, then edit the file.
For most use cases, it is sufficient to use the tool's default configuration, which does not require a config file. A config file at `$HOME/.config/desync/config.json` allows further customization of timeouts, error retry behaviour, or credentials that can't be set via command-line options or environment variables. All values have sensible defaults if unconfigured; only add configuration for values that differ from the defaults. To view the current configuration, use `desync config`. If no config file is present, this will show the defaults. To create a config file allowing custom values, use `desync config -w`, which writes the current configuration to the file, then edit the file.

Available configuration values:
- `http-timeout` - HTTP request timeout used in HTTP stores (not S3) in nanoseconds
- `http-error-retry` - Number of times to retry failed chunk requests from HTTP stores
- `http-timeout` *DEPRECATED, see `store-options.<Location>.timeout`* - HTTP request timeout used in HTTP stores (not S3) in nanoseconds
- `http-error-retry` *DEPRECATED, see `store-options.<Location>.error-retry`* - Number of times to retry failed chunk requests from HTTP stores
- `s3-credentials` - Defines credentials for use with S3 stores. Especially useful if more than one S3 store is used. The key in the config needs to be the URL scheme and host used for the store, excluding the path, but including the port number if used in the store URL. It is also possible to use a [standard AWS credentials file](https://docs.aws.amazon.com/cli/latest/userguide/cli-config-files.html) in order to store S3 credentials.
- `store-options` - Allows customization of chunk and index stores, for example compression settings, timeouts, retry behavior and keys. Not all options are applicable to every store; some, like `timeout`, are ignored for local stores. Some of these options, such as the client certificates, are overwritten by any values set on the command line. Note that the store location used in the command line needs to match the key under `store-options` exactly for these options to be used. Watch out for trailing `/` in URLs.
  - `timeout` - Time limit for a chunk read or write operation, in nanoseconds. Default: 1 minute.
  - `error-retry` - Number of times to retry failed chunk requests. Default: 0.
  - `client-cert` - Certificate file to be used for stores where the server requires mutual SSL.
  - `client-key` - Key file to be used for stores where the server requires mutual SSL.
  - `skip-verify` - Disables data integrity verification when reading chunks to improve performance. Only recommended when chaining chunk stores with the `chunk-server` command using compressed stores.
  - `uncompressed` - Reads and writes uncompressed chunks from/to this store. This can improve performance, especially for local stores or caches. Compressed and uncompressed chunks can coexist in the same store, but only one kind is read or written by one client.

**Example config**

```json
{
  "http-timeout": 60000000000,
  "http-error-retry": 0,
  "s3-credentials": {
    "http://localhost": {
      "access-key": "MYACCESSKEY",
@@ -171,6 +185,16 @@ Available configuration values:
"aws-region": "us-west-2",
"aws-profile": "profile_refreshable"
}
},
"store-options": {
"https://192.168.1.1/store": {
"client-cert": "/path/to/crt",
"client-key": "/path/to/key",
"error-retry": 1
},
"/path/to/local/cache": {
"uncompressed": true
}
}
}
```
@@ -337,7 +361,3 @@ desync info -j -s /tmp/store -s s3+http://127.0.0.1:9000/store /path/to/index
## Links
- casync - https://github.com/systemd/casync
- GoDoc for desync library - https://godoc.org/github.com/folbricht/desync

## TODOs
- Pre-allocate the output file to avoid fragmentation when using extract command
- Allow on-disk chunk cache to optionally be stored uncompressed, such that blocks can be directly reflinked (rather than copied) into files, when on a platform and filesystem where reflink support is available.
23 changes: 5 additions & 18 deletions assemble.go
@@ -6,8 +6,6 @@ import (
"fmt"
"os"
"sync"

"github.com/pkg/errors"
)

// AssembleFile re-assembles a file based on a list of index chunks. It runs n
@@ -151,34 +149,23 @@ func AssembleFile(ctx context.Context, name string, idx Index, s Store, seeds []
// Record this chunk having been pulled from the store
stats.incChunksFromStore()
// Pull the (compressed) chunk from the store
b, err := s.GetChunk(c.ID)
chunk, err := s.GetChunk(c.ID)
if err != nil {
recordError(err)
continue
}
// Since we know how big the chunk is supposed to be, pre-allocate a
// slice to decompress into
var db []byte
db = make([]byte, c.Size)
// The chunk is compressed. Decompress it here
db, err = Decompress(db, b)
b, err := chunk.Uncompressed()
if err != nil {
recordError(errors.Wrap(err, c.ID.String()))
continue
}
// Verify the checksum of the chunk matches the ID
sum := sha512.Sum512_256(db)
if sum != c.ID {
recordError(fmt.Errorf("unexpected sha512/256 %s for chunk id %s", sum, c.ID))
recordError(err)
continue
}
// Might as well verify the chunk size while we're at it
if c.Size != uint64(len(db)) {
if c.Size != uint64(len(b)) {
recordError(fmt.Errorf("unexpected size for chunk %s", c.ID))
continue
}
// Write the decompressed chunk into the file at the right position
if _, err = f.WriteAt(db, int64(c.Start)); err != nil {
if _, err = f.WriteAt(b, int64(c.Start)); err != nil {
recordError(err)
continue
}
6 changes: 3 additions & 3 deletions assemble_test.go
@@ -53,7 +53,7 @@ func TestExtract(t *testing.T) {
}
defer os.RemoveAll(store)

s, err := NewLocalStore(store)
s, err := NewLocalStore(store, StoreOptions{})
if err != nil {
t.Fatal(err)
}
@@ -67,7 +67,7 @@ func TestExtract(t *testing.T) {
t.Fatal(err)
}
defer os.RemoveAll(blankstore)
bs, err := NewLocalStore(blankstore)
bs, err := NewLocalStore(blankstore, StoreOptions{})
if err != nil {
t.Fatal(err)
}
@@ -162,7 +162,7 @@ func TestSeed(t *testing.T) {
}
defer os.RemoveAll(store)

s, err := NewLocalStore(store)
s, err := NewLocalStore(store, StoreOptions{})
if err != nil {
t.Fatal(err)
}
18 changes: 9 additions & 9 deletions cache.go
@@ -23,25 +23,25 @@ func NewCache(s Store, l WriteStore) Cache {

// GetChunk first asks the local store for the chunk and then the remote one.
// If we get a chunk from the remote, it's stored locally too.
func (c Cache) GetChunk(id ChunkID) ([]byte, error) {
b, err := c.l.GetChunk(id)
func (c Cache) GetChunk(id ChunkID) (*Chunk, error) {
chunk, err := c.l.GetChunk(id)
switch err.(type) {
case nil:
return b, nil
return chunk, nil
case ChunkMissing:
default:
return nil, err
return chunk, err
}
// At this point we failed to find it in the local cache. Ask the remote
b, err = c.s.GetChunk(id)
chunk, err = c.s.GetChunk(id)
if err != nil {
return nil, err
return chunk, err
}
// Got the chunk. Store it in the local cache for next time
if err = c.l.StoreChunk(id, b); err != nil {
return nil, errors.Wrap(err, "failed to store in local cache")
if err = c.l.StoreChunk(chunk); err != nil {
return chunk, errors.Wrap(err, "failed to store in local cache")
}
return b, nil
return chunk, nil
}

// HasChunk first checks the cache for the chunk, then the store.
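Below is an illustrative caller-side sketch of the behavior implemented in `GetChunk` above, chaining a shared compressed store with a local uncompressed cache. The paths and the `Uncompressed` field name are assumptions; any `Store`/`WriteStore` pair can be combined this way.

```go
package main

import "github.com/folbricht/desync"

// fetchWithCache reads a chunk through a Cache. A miss in the local store
// falls through to the shared store, and the chunk is then written into the
// cache so later requests for the same ID are served locally.
func fetchWithCache(id desync.ChunkID) ([]byte, error) {
    shared, err := desync.NewLocalStore("/mnt/shared/store", desync.StoreOptions{})
    if err != nil {
        return nil, err
    }
    local, err := desync.NewLocalStore("/var/cache/desync", desync.StoreOptions{Uncompressed: true})
    if err != nil {
        return nil, err
    }
    cache := desync.NewCache(shared, local)

    chunk, err := cache.GetChunk(id)
    if err != nil {
        return nil, err
    }
    return chunk.Uncompressed()
}
```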
11 changes: 4 additions & 7 deletions chop.go
@@ -2,7 +2,6 @@ package desync

import (
"context"
"crypto/sha512"
"fmt"
"io"
"os"
@@ -66,15 +65,13 @@ func ChopFile(ctx context.Context, name string, chunks []IndexChunk, ws WriteSto
continue
}

// Calculate this chunks checksum and compare to what it's supposed to be
// according to the index
sum := sha512.Sum512_256(b)
if sum != c.ID {
recordError(fmt.Errorf("chunk %s checksum does not match", c.ID))
chunk, err := NewChunkWithID(c.ID, b, nil, false)
if err != nil {
recordError(err)
continue
}

if err := s.StoreChunk(c.ID, b); err != nil {
if err := s.StoreChunk(chunk); err != nil {
recordError(err)
continue
}
86 changes: 86 additions & 0 deletions chunk.go
@@ -0,0 +1,86 @@
package desync

import (
    "crypto/sha512"
    "errors"
)

// Chunk holds chunk data compressed, uncompressed, or both. If a chunk is created
// from compressed data, such as read from a compressed chunk store, and later the
// application requires the uncompressed data, it'll be decompressed on demand and
// also stored in Chunk. The same happens when the Chunk is made from uncompressed
// bytes and then to be stored in a compressed form.
type Chunk struct {
    compressed, uncompressed []byte
    id                       ChunkID
    idCalculated             bool
}

// NewChunkFromUncompressed creates a new chunk from uncompressed data.
func NewChunkFromUncompressed(b []byte) *Chunk {
    return &Chunk{uncompressed: b}
}

// NewChunkWithID creates a new chunk from either compressed or uncompressed data
// (or both if available). It also expects an ID and validates that it matches
// the uncompressed data unless skipVerify is true. If called with just compressed
// data, it'll decompress it for the ID validation.
func NewChunkWithID(id ChunkID, uncompressed, compressed []byte, skipVerify bool) (*Chunk, error) {
    c := &Chunk{id: id, uncompressed: uncompressed, compressed: compressed}
    if skipVerify {
        c.idCalculated = true // Pretend this was calculated. No need to re-calc later
        return c, nil
    }
    sum := c.ID()
    if sum != id {
        return nil, ChunkInvalid{ID: id, Sum: sum}
    }
    return c, nil
}

// Compressed returns the chunk data in compressed form. If the chunk was created
// with uncompressed data only, it'll be compressed, stored and returned. The
// caller must not modify the data in the returned slice.
func (c *Chunk) Compressed() ([]byte, error) {
    if len(c.compressed) > 0 {
        return c.compressed, nil
    }
    if len(c.uncompressed) > 0 {
        var err error
        c.compressed, err = Compress(c.uncompressed)
        return c.compressed, err
    }
    return nil, errors.New("no data in chunk")
}

// Uncompressed returns the chunk data in uncompressed form. If the chunk was created
// with compressed data only, it'll be decompressed, stored and returned. The
// caller must not modify the data in the returned slice.
func (c *Chunk) Uncompressed() ([]byte, error) {
    if len(c.uncompressed) > 0 {
        return c.uncompressed, nil
    }
    if len(c.compressed) > 0 {
        var err error
        c.uncompressed, err = Decompress(nil, c.compressed)
        return c.uncompressed, err
    }
    return nil, errors.New("no data in chunk")
}

// ID returns the checksum/ID of the uncompressed chunk data. The ID is stored
// after the first call and doesn't need to be re-calculated. Note that calculating
// the ID may mean decompressing the data first.
func (c *Chunk) ID() ChunkID {
    if c.idCalculated {
        return c.id
    }
    b, err := c.Uncompressed()
    if err != nil {
        return ChunkID{}
    }
    c.id = sha512.Sum512_256(b)
    c.idCalculated = true
    return c.id
}
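A short usage sketch for the new `Chunk` type, relying only on the constructors and methods defined above (illustrative only):

```go
package main

import (
    "fmt"

    "github.com/folbricht/desync"
)

func main() {
    // Build a chunk from raw data. The ID is the SHA512/256 of the
    // uncompressed bytes and is calculated lazily on first use.
    chunk := desync.NewChunkFromUncompressed([]byte("some chunk data"))
    fmt.Println("chunk ID:", chunk.ID())

    // Ask for the compressed form; it is generated once and kept in the Chunk.
    compressed, err := chunk.Compressed()
    if err != nil {
        panic(err)
    }

    // Re-create a chunk from only the compressed bytes and verify it against
    // the known ID. With skipVerify=false this decompresses and checks the ID.
    verified, err := desync.NewChunkWithID(chunk.ID(), nil, compressed, false)
    if err != nil {
        panic(err)
    }
    b, err := verified.Uncompressed()
    if err != nil {
        panic(err)
    }
    fmt.Printf("round-tripped %d bytes\n", len(b))
}
```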
42 changes: 5 additions & 37 deletions chunkstorage.go
@@ -1,12 +1,7 @@
package desync

import (
"bytes"
"fmt"
"os"
"sync"

"github.com/pkg/errors"
)

// ChunkStorage stores chunks in a writable store. It can be safely used by multiple goroutines and
@@ -46,55 +41,28 @@ func (s *ChunkStorage) unmarkProcessed(id ChunkID) {
}

// StoreChunk stores a single chunk in a synchronous manner.
func (s *ChunkStorage) StoreChunk(id ChunkID, b []byte) (err error) {
func (s *ChunkStorage) StoreChunk(chunk *Chunk) (err error) {

// Mark this chunk as done so no other goroutine will attempt to store it
// at the same time. If this is the first time this chunk is marked, it'll
// return false and we need to continue processing/storing the chunk below.
if s.markProcessed(id) {
if s.markProcessed(chunk.ID()) {
return nil
}

// Skip this chunk if the store already has it
if s.ws.HasChunk(id) {
if s.ws.HasChunk(chunk.ID()) {
return nil
}

// The chunk was marked as "processed" above. If there's a problem to actually
// store it, we need to unmark it again.
defer func() {
if err != nil {
s.unmarkProcessed(id)
s.unmarkProcessed(chunk.ID())
}
}()

var retried bool
retry:
// Compress the chunk
cb, err := Compress(b)
if err != nil {
return err
}

// The zstd library appears to fail to compress correctly in some cases, to
// avoid storing invalid chunks, verify the chunk again by decompressing
// and comparing. See https://github.com/folbricht/desync/issues/37.
// Ideally the code below should be removed once zstd library can be trusted
// again.
db, err := Decompress(nil, cb)
if err != nil {
return errors.Wrap(err, id.String())
}

if !bytes.Equal(b, db) {
if !retried {
fmt.Fprintln(os.Stderr, "zstd compression error detected, retrying")
retried = true
goto retry
}
return errors.New("too many zstd compression errors, aborting")
}

// Store the compressed chunk
return s.ws.StoreChunk(id, cb)
return s.ws.StoreChunk(chunk)
}
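A sketch of feeding chunks into a `ChunkStorage` with the new `*Chunk`-based signature. The `NewChunkStorage` constructor name is an assumption here, as it is not shown in this diff:

```go
package main

import "github.com/folbricht/desync"

// storeAll wraps a WriteStore in a ChunkStorage and stores a set of blobs.
// ChunkStorage deduplicates work: chunks already processed or already present
// in the underlying store are skipped.
func storeAll(ws desync.WriteStore, blobs [][]byte) error {
    s := desync.NewChunkStorage(ws) // constructor name assumed
    for _, b := range blobs {
        chunk := desync.NewChunkFromUncompressed(b)
        if err := s.StoreChunk(chunk); err != nil {
            return err
        }
    }
    return nil
}
```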
2 changes: 1 addition & 1 deletion cmd/desync/cache.go
@@ -56,7 +56,7 @@ func cache(ctx context.Context, args []string) error {
}

// Parse the store locations, open the stores and add a cache is requested
opts := storeOptions{
opts := cmdStoreOptions{
n: n,
clientCert: clientCert,
clientKey: clientKey,