From f8b8acc185f3520fc0f8f8a7de9189f93b32970b Mon Sep 17 00:00:00 2001 From: folbricht Date: Sat, 29 Sep 2018 14:16:57 -0600 Subject: [PATCH] Issue 50 - Allow configuration of individual stores and support uncompressed stores (#62) * Introduce new Chunk object in preparation for uncompressed store support * Make store features more customizable by passing StoreOptions to all constructors and allowing the options to be read from config or command line * Add skip-verify option to allow for efficient chaining of chunk stores * Support for uncompressed stores and chunk-servers * Add documentation * Fix failing tests and simplify chunk object usage * Fix out-of-bounds error dealing with index stores without path or URL component --- README.md | 46 ++++++--- assemble.go | 23 +---- assemble_test.go | 6 +- cache.go | 18 ++-- chop.go | 11 +-- chunk.go | 86 +++++++++++++++++ chunkstorage.go | 42 +-------- cmd/desync/cache.go | 2 +- cmd/desync/cat.go | 2 +- cmd/desync/chop.go | 2 +- cmd/desync/config.go | 20 ++-- cmd/desync/extract.go | 6 +- cmd/desync/info.go | 4 +- cmd/desync/list.go | 2 +- cmd/desync/main.go | 2 - cmd/desync/make.go | 2 +- cmd/desync/mount-index.go | 2 +- cmd/desync/prune.go | 4 +- cmd/desync/pull.go | 8 +- cmd/desync/server.go | 35 ++++--- cmd/desync/store.go | 111 ++++++++++++++++------ cmd/desync/tar.go | 2 +- cmd/desync/untar.go | 2 +- cmd/desync/upgrade-s3.go | 54 ----------- cmd/desync/verify.go | 4 +- cmd/desync/verifyindex.go | 2 +- const.go | 6 ++ copy.go | 4 +- go.sum | 46 +++++++++ httphandler.go | 103 ++++++++++---------- httphandler_test.go | 132 ++++++++++++++++++++++++++ httphandlerbase.go | 3 +- index.go | 11 +-- index_test.go | 4 +- local.go | 130 +++++++++++++++----------- local_test.go | 191 ++++++++++++++++++++++++++++++++++++++ protocol.go | 11 +-- protocol_test.go | 11 ++- protocolserver.go | 8 +- protocolserver_test.go | 13 ++- readseeker.go | 20 ++-- remotehttp.go | 98 +++++++++++-------- remotehttpindex.go | 4 +- remotessh.go | 12 
+-- s3.go | 111 ++++++++++------------ s3index.go | 4 +- selfseed_test.go | 13 +-- sftp.go | 62 ++++++++++--- sftpindex.go | 4 +- store.go | 36 ++++++- store_test.go | 4 +- storerouter.go | 6 +- untar.go | 24 ++--- 53 files changed, 1048 insertions(+), 521 deletions(-) create mode 100644 chunk.go delete mode 100644 cmd/desync/upgrade-s3.go create mode 100644 go.sum create mode 100644 httphandler_test.go create mode 100644 local_test.go diff --git a/README.md b/README.md index d7031f4..5e8bbe0 100644 --- a/README.md +++ b/README.md @@ -12,7 +12,7 @@ Among the distinguishing factors: - Where the upstream command has chosen to optimize for storage efficiency (f/e, being able to use local files as "seeds", building temporary indexes into them), this command chooses to optimize for runtime performance (maintaining a local explicit chunk store, avoiding the need to reindex) at cost to storage efficiency. - Where the upstream command has chosen to take full advantage of Linux platform features, this client chooses to implement a minimum featureset and, while high-value platform-specific features (such as support for btrfs reflinks into a decompressed local chunk cache) might be added in the future, the ability to build without them on other platforms will be maintained. - SHA512/256 is currently the only supported hash function. -- Only chunk store using zstd compression are supported at this point. +- Only chunk stores using zstd compression as well uncompressed are supported at this point. - Supports local stores as well as remote stores (as client) over SSH, SFTP and HTTP - Built-in HTTP(S) chunk server that can proxy multiple local or remote stores and also supports caching. 
- Drop-in replacement for casync on SSH servers when serving chunks read-only @@ -22,6 +22,7 @@ Among the distinguishing factors: - Allows FUSE mounting of blob indexes - S3 protocol support to access chunk stores for read operations and some some commands that write chunks - Stores and retrieves index files from remote index stores such as HTTP, SFTP and S3 +- Built-in HTTP(S) index server to read/write indexes - Reflinking matching blocks (rather than copying) from seed files if supported by the filesystem (currently only Btrfs and XFS) ## Parallel chunking @@ -136,24 +137,37 @@ This is a store running on the local machine on port 9000 without SSL. s3+http://127.0.0.1:9000/store ``` -#### Previous S3 storage layout -Before April 2018, chunks in S3 stores were kept in a flat layout, with the name being the checksum of the chunk. Since then, the layout was modified to match that of local stores: `<4-checksum-chars>/.cacnk` This change allows the use of other tools to convert or copy stores between local and S3 stores. To convert an existing s3 store from the old format, a command `upgrade-s3` is available in the tool. +### Compressed vs Uncompressed chunk stores +By default, desync reads and writes chunks in compressed form to all supported stores. This is in line with upstream casync's goal of storing in the most efficient way. It is however possible to change this behavior by providing desync with a config file (see Configuration section below). Disabling compression and store chunks uncompressed may reduce latency in some use-cases and improve performance. desync supports reading and writing uncompressed chunks to SFTP, S3, HTTP and local stores and caches. If more than one store is used, each of those can be configured independently, for example it's possible to read compressed chunks from S3 while using a local uncompressed cache for best performance. 
However, care needs to be taken when using the `chunk-server` command and building chains of chunk store proxies to avoid shifting the decompression load onto the server (it's possible this is actually desirable). + +In the setup below, a client reads chunks from an HTTP chunk server which itself gets chunks from S3. +``` + ---> ---> +``` +If the client configures the HTTP chunk server to be uncompressed (`chunk-server` needs to be started with the `-u` option), and the chunk server reads compressed chunks from S3, then the chunk server will have to decompress every chunk that's requested before responding to the client. If the chunk server was reading uncompressed chunks from S3, there would be no overhead. + +Compressed and uncompressed chunks can live in the same store and don't interfere with each other. A store that's configured for compressed chunks by configuring it client-side will not see the uncompressed chunks that may be present. `prune` and `verify` too will ignore any chunks written in the other format. Both kinds of chunks can be accessed by multiple clients concurrently and independently. ### Configuration -For most use cases, it is sufficient to use the tool's default configuration not requiring a config file. Having a config file `$HOME/.config/desync/config.json` allows for further customization of timeouts, error retry behaviour or credentials that can't be set via command-line options or environment variables. To view the current configuration, use `desync config`. If no config file is present, this will show the defaults. To create a config file allowing custom values, use `desync config -w` which will write the current configuration to the file, then edit the file. +For most use cases, it is sufficient to use the tool's default configuration not requiring a config file. 
Having a config file `$HOME/.config/desync/config.json` allows for further customization of timeouts, error retry behaviour or credentials that can't be set via command-line options or environment variables. All values have sensible defaults if unconfigured. Only add configuration for values that differ from the defaults. To view the current configuration, use `desync config`. If no config file is present, this will show the defaults. To create a config file allowing custom values, use `desync config -w` which will write the current configuration to the file, then edit the file. Available configuration values: -- `http-timeout` - HTTP request timeout used in HTTP stores (not S3) in nanoseconds -- `http-error-retry` - Number of times to retry failed chunk requests from HTTP stores +- `http-timeout` *DEPRECATED, see `store-options..timeout`* - HTTP request timeout used in HTTP stores (not S3) in nanoseconds +- `http-error-retry` *DEPRECATED, see `store-options..error-retry` - Number of times to retry failed chunk requests from HTTP stores - `s3-credentials` - Defines credentials for use with S3 stores. Especially useful if more than one S3 store is used. The key in the config needs to be the URL scheme and host used for the store, excluding the path, but including the port number if used in the store URL. It is also possible to use a [standard aws credentials file](https://docs.aws.amazon.com/cli/latest/userguide/cli-config-files.html) in order to store s3 credentials. +- `store-options` - Allows customization of chunk and index stores, for example comression settings, timeouts, retry behavior and keys. Not all options are applicable to every store, some of these like `timeout` are ignored for local stores. Some of these options, such as the client certificates are overwritten with any values set in the command line. Note that the store location used in the command line needs to match the key under `store-options` exactly for these options to be used. 
Watch out for trailing `/` in URLs. + - `timeout` - Time limit for chunk read or write operation in nanoseconds. Default: 1 minute. + - `error-retry` - Number of times to retry failed chunk requests. Default: 0. + - `client-cert` - Cerificate file to be used for stores where the server requires mutual SSL. + - `client-key` - Key file to be used for stores where the server requires mutual SSL. + - `skip-verify` - Disables data integrity verification when reading chunks to improve performance. Only recommended when chaining chunk stores with the `chunk-server` command using compressed stores. + - `uncompressed` - Reads and writes uncompressed chunks from/to this store. This can improve performance, especially for local stores or caches. Compressed and uncompressed chunks can coexist in the same store, but only one kind is read or written by one client. **Example config** -``` +```json { - "http-timeout": 60000000000, - "http-error-retry": 0, "s3-credentials": { "http://localhost": { "access-key": "MYACCESSKEY", @@ -171,6 +185,16 @@ Available configuration values: "aws-region": "us-west-2", "aws-profile": "profile_refreshable" } + }, + "store-options": { + "https://192.168.1.1/store": { + "client-cert": "/path/to/crt", + "client-key": "/path/to/key", + "error-retry": 1 + }, + "/path/to/local/cache": { + "uncompressed": true + } } } ``` @@ -337,7 +361,3 @@ desync info -j -s /tmp/store -s s3+http://127.0.0.1:9000/store /path/to/index ## Links - casync - https://github.com/systemd/casync - GoDoc for desync library - https://godoc.org/github.com/folbricht/desync - -## TODOs -- Pre-allocate the output file to avoid fragmentation when using extract command -- Allow on-disk chunk cache to optionally be stored uncompressed, such that blocks can be directly reflinked (rather than copied) into files, when on a platform and filesystem where reflink support is available. 
diff --git a/assemble.go b/assemble.go index e3ef4cb..4391406 100644 --- a/assemble.go +++ b/assemble.go @@ -6,8 +6,6 @@ import ( "fmt" "os" "sync" - - "github.com/pkg/errors" ) // AssembleFile re-assembles a file based on a list of index chunks. It runs n @@ -151,34 +149,23 @@ func AssembleFile(ctx context.Context, name string, idx Index, s Store, seeds [] // Record this chunk having been pulled from the store stats.incChunksFromStore() // Pull the (compressed) chunk from the store - b, err := s.GetChunk(c.ID) + chunk, err := s.GetChunk(c.ID) if err != nil { recordError(err) continue } - // Since we know how big the chunk is supposed to be, pre-allocate a - // slice to decompress into - var db []byte - db = make([]byte, c.Size) - // The the chunk is compressed. Decompress it here - db, err = Decompress(db, b) + b, err := chunk.Uncompressed() if err != nil { - recordError(errors.Wrap(err, c.ID.String())) - continue - } - // Verify the checksum of the chunk matches the ID - sum := sha512.Sum512_256(db) - if sum != c.ID { - recordError(fmt.Errorf("unexpected sha512/256 %s for chunk id %s", sum, c.ID)) + recordError(err) continue } // Might as well verify the chunk size while we're at it - if c.Size != uint64(len(db)) { + if c.Size != uint64(len(b)) { recordError(fmt.Errorf("unexpected size for chunk %s", c.ID)) continue } // Write the decompressed chunk into the file at the right position - if _, err = f.WriteAt(db, int64(c.Start)); err != nil { + if _, err = f.WriteAt(b, int64(c.Start)); err != nil { recordError(err) continue } diff --git a/assemble_test.go b/assemble_test.go index 9bfdc35..dd6b21d 100644 --- a/assemble_test.go +++ b/assemble_test.go @@ -53,7 +53,7 @@ func TestExtract(t *testing.T) { } defer os.RemoveAll(store) - s, err := NewLocalStore(store) + s, err := NewLocalStore(store, StoreOptions{}) if err != nil { t.Fatal(err) } @@ -67,7 +67,7 @@ func TestExtract(t *testing.T) { t.Fatal(err) } defer os.RemoveAll(blankstore) - bs, err := 
NewLocalStore(blankstore) + bs, err := NewLocalStore(blankstore, StoreOptions{}) if err != nil { t.Fatal(err) } @@ -162,7 +162,7 @@ func TestSeed(t *testing.T) { } defer os.RemoveAll(store) - s, err := NewLocalStore(store) + s, err := NewLocalStore(store, StoreOptions{}) if err != nil { t.Fatal(err) } diff --git a/cache.go b/cache.go index 65815d4..ee53b43 100644 --- a/cache.go +++ b/cache.go @@ -23,25 +23,25 @@ func NewCache(s Store, l WriteStore) Cache { // GetChunk first asks the local store for the chunk and then the remote one. // If we get a chunk from the remote, it's stored locally too. -func (c Cache) GetChunk(id ChunkID) ([]byte, error) { - b, err := c.l.GetChunk(id) +func (c Cache) GetChunk(id ChunkID) (*Chunk, error) { + chunk, err := c.l.GetChunk(id) switch err.(type) { case nil: - return b, nil + return chunk, nil case ChunkMissing: default: - return nil, err + return chunk, err } // At this point we failed to find it in the local cache. Ask the remote - b, err = c.s.GetChunk(id) + chunk, err = c.s.GetChunk(id) if err != nil { - return nil, err + return chunk, err } // Got the chunk. Store it in the local cache for next time - if err = c.l.StoreChunk(id, b); err != nil { - return nil, errors.Wrap(err, "failed to store in local cache") + if err = c.l.StoreChunk(chunk); err != nil { + return chunk, errors.Wrap(err, "failed to store in local cache") } - return b, nil + return chunk, nil } // HasChunk first checks the cache for the chunk, then the store. 
diff --git a/chop.go b/chop.go index d580c5a..f36efd3 100644 --- a/chop.go +++ b/chop.go @@ -2,7 +2,6 @@ package desync import ( "context" - "crypto/sha512" "fmt" "io" "os" @@ -66,15 +65,13 @@ func ChopFile(ctx context.Context, name string, chunks []IndexChunk, ws WriteSto continue } - // Calculate this chunks checksum and compare to what it's supposed to be - // according to the index - sum := sha512.Sum512_256(b) - if sum != c.ID { - recordError(fmt.Errorf("chunk %s checksum does not match", c.ID)) + chunk, err := NewChunkWithID(c.ID, b, nil, false) + if err != nil { + recordError(err) continue } - if err := s.StoreChunk(c.ID, b); err != nil { + if err := s.StoreChunk(chunk); err != nil { recordError(err) continue } diff --git a/chunk.go b/chunk.go new file mode 100644 index 0000000..40723cf --- /dev/null +++ b/chunk.go @@ -0,0 +1,86 @@ +package desync + +import ( + "crypto/sha512" + "errors" +) + +// Chunk holds chunk data compressed, uncompressed, or both. If a chunk is created +// from compressed data, such as read from a compressed chunk store, and later the +// application requires the uncompressed data, it'll be decompressed on demand and +// also stored in Chunk. The same happens when the Chunk is made from uncompressed +// bytes and then to be stored in a compressed form. +type Chunk struct { + compressed, uncompressed []byte + id ChunkID + idCalculated bool +} + +// NewChunkFromUncompressed creates a new chunk from uncompressed data. +func NewChunkFromUncompressed(b []byte) *Chunk { + return &Chunk{uncompressed: b} +} + +// NewChunkWithID creates a new chunk from either compressed or uncompressed data +// (or both if available). It also expects an ID and validates that it matches +// the uncompressed data unless skipVerify is true. If called with just compressed +// data, it'll decompress it for the ID validation. 
+func NewChunkWithID(id ChunkID, uncompressed, compressed []byte, skipVerify bool) (*Chunk, error) { + c := &Chunk{id: id, uncompressed: uncompressed, compressed: compressed} + if skipVerify { + c.idCalculated = true // Pretend this was calculated. No need to re-calc later + return c, nil + } + sum := c.ID() + if sum != id { + return nil, ChunkInvalid{ID: id, Sum: sum} + } + return c, nil +} + +// Compressed returns the chunk data in compressed form. If the chunk was created +// with uncompressed data only, it'll be compressed, stored and returned. The +// caller must not modify the data in the returned slice. +func (c *Chunk) Compressed() ([]byte, error) { + if len(c.compressed) > 0 { + return c.compressed, nil + } + if len(c.uncompressed) > 0 { + var err error + c.compressed, err = Compress(c.uncompressed) + return c.compressed, err + } + return nil, errors.New("no data in chunk") +} + +// Uncompressed returns the chunk data in uncompressed form. If the chunk was created +// with compressed data only, it'll be decompressed, stored and returned. The +// caller must not modify the data in the returned slice. +func (c *Chunk) Uncompressed() ([]byte, error) { + if len(c.uncompressed) > 0 { + return c.uncompressed, nil + } + if len(c.compressed) > 0 { + var err error + c.uncompressed, err = Decompress(nil, c.compressed) + return c.uncompressed, err + } + return nil, errors.New("no data in chunk") +} + +// ID returns the checksum/ID of the uncompressed chunk data. The ID is stored +// after the first call and doesn't need to be re-calculated. Note that calculating +// the ID may mean decompressing the data first. 
+func (c *Chunk) ID() ChunkID { + + if c.idCalculated { + return c.id + } + b, err := c.Uncompressed() + if err != nil { + return ChunkID{} + } + c.id = sha512.Sum512_256(b) + c.idCalculated = true + return c.id +} diff --git a/chunkstorage.go b/chunkstorage.go index 58658bc..a81d01c 100644 --- a/chunkstorage.go +++ b/chunkstorage.go @@ -1,12 +1,7 @@ package desync import ( - "bytes" - "fmt" - "os" "sync" - - "github.com/pkg/errors" ) // ChunkStorage stores chunks in a writable store. It can be safely used by multiple goroutines and @@ -46,17 +41,17 @@ func (s *ChunkStorage) unmarkProcessed(id ChunkID) { } // StoreChunk stores a single chunk in a synchronous manner. -func (s *ChunkStorage) StoreChunk(id ChunkID, b []byte) (err error) { +func (s *ChunkStorage) StoreChunk(chunk *Chunk) (err error) { // Mark this chunk as done so no other goroutine will attempt to store it // at the same time. If this is the first time this chunk is marked, it'll // return false and we need to continue processing/storing the chunk below. - if s.markProcessed(id) { + if s.markProcessed(chunk.ID()) { return nil } // Skip this chunk if the store already has it - if s.ws.HasChunk(id) { + if s.ws.HasChunk(chunk.ID()) { return nil } @@ -64,37 +59,10 @@ func (s *ChunkStorage) StoreChunk(id ChunkID, b []byte) (err error) { // store it, we need to unmark it again. defer func() { if err != nil { - s.unmarkProcessed(id) + s.unmarkProcessed(chunk.ID()) } }() - var retried bool -retry: - // Compress the chunk - cb, err := Compress(b) - if err != nil { - return err - } - - // The zstd library appears to fail to compress correctly in some cases, to - // avoid storing invalid chunks, verify the chunk again by decompressing - // and comparing. See https://github.com/folbricht/desync/issues/37. - // Ideally the code below should be removed once zstd library can be trusted - // again. 
- db, err := Decompress(nil, cb) - if err != nil { - return errors.Wrap(err, id.String()) - } - - if !bytes.Equal(b, db) { - if !retried { - fmt.Fprintln(os.Stderr, "zstd compression error detected, retrying") - retried = true - goto retry - } - return errors.New("too many zstd compression errors, aborting") - } - // Store the compressed chunk - return s.ws.StoreChunk(id, cb) + return s.ws.StoreChunk(chunk) } diff --git a/cmd/desync/cache.go b/cmd/desync/cache.go index bbd3d52..034c9f8 100644 --- a/cmd/desync/cache.go +++ b/cmd/desync/cache.go @@ -56,7 +56,7 @@ func cache(ctx context.Context, args []string) error { } // Parse the store locations, open the stores and add a cache is requested - opts := storeOptions{ + opts := cmdStoreOptions{ n: n, clientCert: clientCert, clientKey: clientKey, diff --git a/cmd/desync/cat.go b/cmd/desync/cat.go index b9325c6..fa72f13 100644 --- a/cmd/desync/cat.go +++ b/cmd/desync/cat.go @@ -84,7 +84,7 @@ func cat(ctx context.Context, args []string) error { } // Parse the store locations, open the stores and add a cache is requested - opts := storeOptions{ + opts := cmdStoreOptions{ n: n, clientCert: clientCert, clientKey: clientKey, diff --git a/cmd/desync/chop.go b/cmd/desync/chop.go index 1705c5a..74d981e 100644 --- a/cmd/desync/chop.go +++ b/cmd/desync/chop.go @@ -48,7 +48,7 @@ func chop(ctx context.Context, args []string) error { dataFile := flags.Arg(1) // Parse the store locations, open the stores and add a cache is requested - opts := storeOptions{ + opts := cmdStoreOptions{ n: n, clientCert: clientCert, clientKey: clientKey, diff --git a/cmd/desync/config.go b/cmd/desync/config.go index 2c9ce84..37b1597 100644 --- a/cmd/desync/config.go +++ b/cmd/desync/config.go @@ -13,6 +13,7 @@ import ( "strings" "time" + "github.com/folbricht/desync" "github.com/minio/minio-go/pkg/credentials" "github.com/pkg/errors" ) @@ -27,9 +28,10 @@ type S3Creds struct { } type Config struct { - HTTPTimeout time.Duration `json:"http-timeout"` - 
HTTPErrorRetry int `json:"http-error-retry"` - S3Credentials map[string]S3Creds `json:"s3-credentials"` + HTTPTimeout time.Duration `json:"http-timeout,omitempty"` + HTTPErrorRetry int `json:"http-error-retry,omitempty"` + S3Credentials map[string]S3Creds `json:"s3-credentials"` + StoreOptions map[string]desync.StoreOptions `json:"store-options"` } // GetS3CredentialsFor attempts to find creds and region for an S3 location in the @@ -63,12 +65,16 @@ func (c Config) GetS3CredentialsFor(u *url.URL) (*credentials.Credentials, strin return creds, region } -// Global config in the main packe defining the defaults. Those can be -// overridden by loading a config file. -var cfg = Config{ - HTTPTimeout: time.Minute, +// GetStoreOptionsFor returns optional config options for a specific store. Note that +// the location string in the config file needs to match exactly (watch for trailing /). +func (c Config) GetStoreOptionsFor(location string) desync.StoreOptions { + return c.StoreOptions[location] } +// Global config in the main packe defining the defaults. Those can be +// overridden by loading a config file or in the command line. 
+var cfg Config + const configUsage = `desync config Shows the current internal config settings, either the defaults or the values diff --git a/cmd/desync/extract.go b/cmd/desync/extract.go index 0766fca..f9f49e2 100644 --- a/cmd/desync/extract.go +++ b/cmd/desync/extract.go @@ -80,7 +80,7 @@ func extract(ctx context.Context, args []string) error { // Parse the store locations, open the stores and add a cache is requested var s desync.Store - opts := storeOptions{ + opts := cmdStoreOptions{ n: n, clientCert: clientCert, clientKey: clientKey, @@ -153,7 +153,7 @@ func writeInplace(ctx context.Context, name string, idx desync.Index, s desync.S return desync.AssembleFile(ctx, name, idx, s, seeds, n, pb) } -func readSeeds(dstFile string, locations []string, opts storeOptions) ([]desync.Seed, error) { +func readSeeds(dstFile string, locations []string, opts cmdStoreOptions) ([]desync.Seed, error) { var seeds []desync.Seed for _, srcIndexFile := range locations { srcIndex, err := readCaibxFile(srcIndexFile, opts) @@ -171,7 +171,7 @@ func readSeeds(dstFile string, locations []string, opts storeOptions) ([]desync. 
return seeds, nil } -func readSeedDirs(dstFile, dstIdxFile string, dirs []string, opts storeOptions) ([]desync.Seed, error) { +func readSeedDirs(dstFile, dstIdxFile string, dirs []string, opts cmdStoreOptions) ([]desync.Seed, error) { var seeds []desync.Seed absIn, err := filepath.Abs(dstIdxFile) if err != nil { diff --git a/cmd/desync/info.go b/cmd/desync/info.go index 5739a6d..eaacfe3 100644 --- a/cmd/desync/info.go +++ b/cmd/desync/info.go @@ -55,7 +55,7 @@ func info(ctx context.Context, args []string) error { return errors.New("-clientKey and -clientCert options need to be provided together.") } - opts := storeOptions{ + opts := cmdStoreOptions{ n: n, clientCert: clientCert, clientKey: clientKey, @@ -87,7 +87,7 @@ func info(ctx context.Context, args []string) error { results.Unique = len(deduped) if len(storeLocations.list) > 0 { - store, err := multiStore(storeOptions{n: n}, storeLocations.list...) + store, err := multiStore(cmdStoreOptions{n: n}, storeLocations.list...) if err != nil { return err } diff --git a/cmd/desync/list.go b/cmd/desync/list.go index b6f9243..4d1b163 100644 --- a/cmd/desync/list.go +++ b/cmd/desync/list.go @@ -40,7 +40,7 @@ func list(ctx context.Context, args []string) error { } // Parse the store locations, open the stores and add a cache is requested - opts := storeOptions{ + opts := cmdStoreOptions{ clientCert: clientCert, clientKey: clientKey, } diff --git a/cmd/desync/main.go b/cmd/desync/main.go index 1cf7b73..1a24050 100644 --- a/cmd/desync/main.go +++ b/cmd/desync/main.go @@ -31,7 +31,6 @@ prune - remove all unreferenced chunks from a local store verify-index - verify that an index file matches a given blob chunk-server - start a HTTP chunk server mount-index - FUSE mount an index -upgrade-s3 - convert an s3 store from the old to the new storage layout info - show information about an index file ` @@ -81,7 +80,6 @@ func main() { "chunk": chunkCmd, "make": makeCmd, "mount-index": mountIdx, - "upgrade-s3": upgradeS3, "config": 
config, "info": info, "verify-index": verifyIndex, diff --git a/cmd/desync/make.go b/cmd/desync/make.go index bae650a..d064f3e 100644 --- a/cmd/desync/make.go +++ b/cmd/desync/make.go @@ -57,7 +57,7 @@ func makeCmd(ctx context.Context, args []string) error { indexFile := flags.Arg(0) dataFile := flags.Arg(1) - sOpts := storeOptions{ + sOpts := cmdStoreOptions{ n: n, clientCert: clientCert, clientKey: clientKey, diff --git a/cmd/desync/mount-index.go b/cmd/desync/mount-index.go index 2a7c6a8..f792ce4 100644 --- a/cmd/desync/mount-index.go +++ b/cmd/desync/mount-index.go @@ -66,7 +66,7 @@ func mountIdx(ctx context.Context, args []string) error { } // Parse the store locations, open the stores and add a cache is requested - opts := storeOptions{ + opts := cmdStoreOptions{ n: n, clientCert: clientCert, clientKey: clientKey, diff --git a/cmd/desync/prune.go b/cmd/desync/prune.go index 30c1b3d..e0e3596 100644 --- a/cmd/desync/prune.go +++ b/cmd/desync/prune.go @@ -48,13 +48,13 @@ func prune(ctx context.Context, args []string) error { } // Parse the store locations, open the stores and add a cache is requested - opts := storeOptions{ + opts := cmdStoreOptions{ clientCert: clientCert, clientKey: clientKey, } // Open the target store - sr, err := storeFromLocation(storeLocation, storeOptions{}) + sr, err := storeFromLocation(storeLocation, cmdStoreOptions{}) if err != nil { return err } diff --git a/cmd/desync/pull.go b/cmd/desync/pull.go index 463974f..ec9428b 100644 --- a/cmd/desync/pull.go +++ b/cmd/desync/pull.go @@ -30,8 +30,14 @@ func pull(ctx context.Context, args []string) error { storeLocation := flags.Arg(3) + // SSH only supports serving compressed chunks currently. And we really + // don't want to have to decompress every chunk to verify its checksum. + // Clients will do that anyway, so disable verification here. 
+ opt := cfg.GetStoreOptionsFor(storeLocation) + opt.SkipVerify = true + // Open the local store to serve chunks from - s, err := desync.NewLocalStore(storeLocation) + s, err := desync.NewLocalStore(storeLocation, opt) if err != nil { return err } diff --git a/cmd/desync/server.go b/cmd/desync/server.go index 8ac593d..c539a7d 100644 --- a/cmd/desync/server.go +++ b/cmd/desync/server.go @@ -27,7 +27,11 @@ Starts an HTTP chunk server that can be used as remote store. It supports reading from multiple local or remote stores as well as a local cache. If -cert and -key are provided, the server will serve over HTTPS. The -w option enables writing to this store, but this is only allowed when just one upstream -chunk store is provided.` +chunk store is provided. The option -skip-verify-write disables validation of +chunks written to this server which bypasses checksum validation as well as +the necessary decompression step to calculate it. If -u is used, only +uncompressed chunks are being served (and accepted). 
If the upstream store +serves compressed chunks, everything will have to be decompressed server-side.` indexServerUsage = `desync index-server [options] @@ -47,6 +51,8 @@ func server(ctx context.Context, serverType ServerType, args []string) error { clientCert string clientKey string writable bool + skipVerifyWrite bool + uncompressed bool ) flags := flag.NewFlagSet("server", flag.ExitOnError) flags.Usage = func() { @@ -77,6 +83,8 @@ func server(ctx context.Context, serverType ServerType, args []string) error { flags.StringVar(&clientCert, "clientCert", "", "Path to Client Certificate for TLS authentication") flags.StringVar(&clientKey, "clientKey", "", "Path to Client Key for TLS authentication") flags.BoolVar(&writable, "w", false, "support writing") + flags.BoolVar(&skipVerifyWrite, "skip-verify-write", false, "Don't verify chunk data written to this server (faster)") + flags.BoolVar(&uncompressed, "u", false, "Serve uncompressed chunks") flags.Parse(args) if flags.NArg() > 0 { @@ -105,17 +113,18 @@ func server(ctx context.Context, serverType ServerType, args []string) error { return errors.New("Only one upstream store supported for writing") } - // Parse the store locations, open the stores and add a cache is requested - var ( - opts = storeOptions{ - n: n, - clientCert: clientCert, - clientKey: clientKey, - } - ) + // Parse the store locations, open the stores and add a cache if requested. Since we're + // just proxying chunks, disable verification so we don't need to decompress everything + // to verify the checksum. Clients will be checking the data anyway. 
+ var opts = cmdStoreOptions{ + n: n, + clientCert: clientCert, + clientKey: clientKey, + skipVerify: true, + } if serverType == ChunkServer { - s, err := handleChunkStore(writable, storeLocations, opts, cacheLocation) + s, err := handleChunkStore(writable, skipVerifyWrite, uncompressed, storeLocations, opts, cacheLocation) if err != nil { return err } @@ -153,7 +162,7 @@ func server(ctx context.Context, serverType ServerType, args []string) error { return nil } -func handleChunkStore(writable bool, storeLocations *multiArg, opts storeOptions, cacheLocation string) (desync.Store, error) { +func handleChunkStore(writable, skipVerifyWrite, uncompressed bool, storeLocations *multiArg, opts cmdStoreOptions, cacheLocation string) (desync.Store, error) { var ( s desync.Store err error @@ -167,11 +176,11 @@ func handleChunkStore(writable bool, storeLocations *multiArg, opts storeOptions return nil, err } - http.Handle("/", desync.NewHTTPHandler(s, writable)) + http.Handle("/", desync.NewHTTPHandler(s, writable, skipVerifyWrite, uncompressed)) return s, err } -func handleIndexStore(writable bool, storeLocations *multiArg, opts storeOptions, cacheLocation string) (desync.IndexStore, error) { +func handleIndexStore(writable bool, storeLocations *multiArg, opts cmdStoreOptions, cacheLocation string) (desync.IndexStore, error) { var ( s desync.IndexStore err error diff --git a/cmd/desync/store.go b/cmd/desync/store.go index 0610843..7cd808c 100644 --- a/cmd/desync/store.go +++ b/cmd/desync/store.go @@ -3,6 +3,7 @@ package main import ( "fmt" "net/url" + "strings" "path" "path/filepath" @@ -11,24 +12,42 @@ import ( "github.com/pkg/errors" ) -// storeOptions are used to pass additional options to store initalization -type storeOptions struct { +// cmdStoreOptions are used to pass additional options to store initalization from the +// commandline. These generally override settings from the config file. 
+type cmdStoreOptions struct { n int clientCert string clientKey string + skipVerify bool +} + +// MergedWith takes store options as read from the config, and applies command-line +// provided options on top of them and returns the merged result. +func (o cmdStoreOptions) MergedWith(opt desync.StoreOptions) desync.StoreOptions { + opt.N = o.n + if o.clientCert != "" { + opt.ClientCert = o.clientCert + } + if o.clientKey != "" { + opt.ClientKey = o.clientKey + } + if o.skipVerify { + opt.SkipVerify = true + } + return opt } // MultiStoreWithCache is used to parse store and cache locations given in the // command line. // cacheLocation - Place of the local store used for caching, can be blank // storeLocation - URLs or paths to remote or local stores that should be queried in order -func MultiStoreWithCache(opts storeOptions, cacheLocation string, storeLocations ...string) (desync.Store, error) { +func MultiStoreWithCache(cmdOpt cmdStoreOptions, cacheLocation string, storeLocations ...string) (desync.Store, error) { var ( store desync.Store stores []desync.Store ) for _, location := range storeLocations { - s, err := storeFromLocation(location, opts) + s, err := storeFromLocation(location, cmdOpt) if err != nil { return store, err } @@ -41,7 +60,7 @@ func MultiStoreWithCache(opts storeOptions, cacheLocation string, storeLocations // See if we want to use a writable store as cache, if so, attach a cache to // the router if cacheLocation != "" { - cache, err := WritableStore(cacheLocation, opts) + cache, err := WritableStore(cacheLocation, cmdOpt) if err != nil { return store, err } @@ -56,10 +75,10 @@ func MultiStoreWithCache(opts storeOptions, cacheLocation string, storeLocations // multiStoreWithCache is used to parse store locations, and return a store // router instance containing them all for reading, in the order they're given -func multiStore(opts storeOptions, storeLocations ...string) (desync.Store, error) { +func multiStore(cmdOpt cmdStoreOptions, 
storeLocations ...string) (desync.Store, error) { var stores []desync.Store for _, location := range storeLocations { - s, err := storeFromLocation(location, opts) + s, err := storeFromLocation(location, cmdOpt) if err != nil { return nil, err } @@ -73,8 +92,8 @@ func multiStore(opts storeOptions, storeLocations ...string) (desync.Store, erro // commands that expect to write chunks, such as make or tar. It determines // which type of writable store is needed, instantiates and returns a // single desync.WriteStore. -func WritableStore(location string, opts storeOptions) (desync.WriteStore, error) { - s, err := storeFromLocation(location, opts) +func WritableStore(location string, cmdOpt cmdStoreOptions) (desync.WriteStore, error) { + s, err := storeFromLocation(location, cmdOpt) if err != nil { return nil, err } @@ -86,34 +105,46 @@ func WritableStore(location string, opts storeOptions) (desync.WriteStore, error } // Parse a single store URL or path and return an initialized instance of it -func storeFromLocation(location string, opts storeOptions) (desync.Store, error) { +func storeFromLocation(location string, cmdOpt cmdStoreOptions) (desync.Store, error) { loc, err := url.Parse(location) if err != nil { return nil, fmt.Errorf("Unable to parse store location %s : %s", location, err) } + + // Get any store options from the config if present and overwrite with settings from + // the command line + opt := cmdOpt.MergedWith(cfg.GetStoreOptionsFor(location)) + var s desync.Store switch loc.Scheme { case "ssh": - s, err = desync.NewRemoteSSHStore(loc, opts.n) + s, err = desync.NewRemoteSSHStore(loc, opt) if err != nil { return nil, err } case "sftp": - s, err = desync.NewSFTPStore(loc) + s, err = desync.NewSFTPStore(loc, opt) if err != nil { return nil, err } case "http", "https": - h, err := desync.NewRemoteHTTPStore(loc, opts.n, opts.clientCert, opts.clientKey) + // This is for backwards compatibility only, to support http-timeout and + // http-error-retry in the 
config file for a bit longer. If those are + // set, and the options for the store aren't, then use the old values. + // TODO: Remove this code and drop these config options in the future. + if opt.Timeout == 0 && cfg.HTTPTimeout > 0 { + opt.Timeout = cfg.HTTPTimeout + } + if opt.ErrorRetry == 0 && cfg.HTTPErrorRetry > 0 { + opt.ErrorRetry = cfg.HTTPErrorRetry + } + s, err = desync.NewRemoteHTTPStore(loc, opt) if err != nil { return nil, err } - h.SetTimeout(cfg.HTTPTimeout) - h.SetErrorRetry(cfg.HTTPErrorRetry) - s = h case "s3+http", "s3+https": s3Creds, region := cfg.GetS3CredentialsFor(loc) - s, err = desync.NewS3Store(loc, s3Creds, region) + s, err = desync.NewS3Store(loc, s3Creds, region, opt) if err != nil { return nil, err } @@ -126,8 +157,8 @@ func storeFromLocation(location string, opts storeOptions) (desync.Store, error) return s, nil } -func readCaibxFile(location string, opts storeOptions) (c desync.Index, err error) { - is, indexName, err := indexStoreFromLocation(location, opts) +func readCaibxFile(location string, cmdOpt cmdStoreOptions) (c desync.Index, err error) { + is, indexName, err := indexStoreFromLocation(location, cmdOpt) if err != nil { return c, err } @@ -136,8 +167,8 @@ func readCaibxFile(location string, opts storeOptions) (c desync.Index, err erro return is.GetIndex(indexName) } -func storeCaibxFile(idx desync.Index, location string, opts storeOptions) error { - is, indexName, err := writableIndexStore(location, opts) +func storeCaibxFile(idx desync.Index, location string, cmdOpt cmdStoreOptions) error { + is, indexName, err := writableIndexStore(location, cmdOpt) if err != nil { return err } @@ -149,8 +180,8 @@ func storeCaibxFile(idx desync.Index, location string, opts storeOptions) error // commands that expect to write indexes, such as make or tar. It determines // which type of writable store is needed, instantiates and returns a // single desync.IndexWriteStore. 
-func writableIndexStore(location string, opts storeOptions) (desync.IndexWriteStore, string, error) { - s, indexName, err := indexStoreFromLocation(location, opts) +func writableIndexStore(location string, cmdOpt cmdStoreOptions) (desync.IndexWriteStore, string, error) { + s, indexName, err := indexStoreFromLocation(location, cmdOpt) if err != nil { return nil, indexName, err } @@ -162,7 +193,7 @@ func writableIndexStore(location string, opts storeOptions) (desync.IndexWriteSt } // Parse a single store URL or path and return an initialized instance of it -func indexStoreFromLocation(location string, opts storeOptions) (desync.IndexStore, string, error) { +func indexStoreFromLocation(location string, cmdOpt cmdStoreOptions) (desync.IndexStore, string, error) { loc, err := url.Parse(location) if err != nil { return nil, "", fmt.Errorf("Unable to parse store location %s : %s", location, err) @@ -173,26 +204,46 @@ func indexStoreFromLocation(location string, opts storeOptions) (desync.IndexSto p := *loc p.Path = path.Dir(p.Path) + // Get any store options from the config if present and overwrite with settings from + // the command line. To do that it's necessary to get the base string so it can be looked + // up in the config. We could be dealing with Unix-style paths or URLs that use / or with + // Windows paths that could be using \. 
+ var base string + switch { + case strings.Contains(location, "/"): + base = location[:strings.LastIndex(location, "/")] + case strings.Contains(location, "\\"): + base = location[:strings.LastIndex(location, "\\")] + } + opt := cmdOpt.MergedWith(cfg.GetStoreOptionsFor(base)) + var s desync.IndexStore switch loc.Scheme { case "ssh": return nil, "", errors.New("Index storage is not supported by ssh remote stores") case "sftp": - s, err = desync.NewSFTPIndexStore(loc) + s, err = desync.NewSFTPIndexStore(loc, opt) if err != nil { return nil, "", err } case "http", "https": - h, err := desync.NewRemoteHTTPIndexStore(&p, opts.n, opts.clientCert, opts.clientKey) + // This is for backwards compatibility only, to support http-timeout and + // http-error-retry in the config file for a bit longer. If those are + // set, and the options for the store aren't, then use the old values. + // TODO: Remove this code and drop these config options in the future. + if opt.Timeout == 0 && cfg.HTTPTimeout > 0 { + opt.Timeout = cfg.HTTPTimeout + } + if opt.ErrorRetry == 0 && cfg.HTTPErrorRetry > 0 { + opt.ErrorRetry = cfg.HTTPErrorRetry + } + s, err = desync.NewRemoteHTTPIndexStore(&p, opt) if err != nil { return nil, "", err } - h.SetTimeout(cfg.HTTPTimeout) - h.SetErrorRetry(cfg.HTTPErrorRetry) - s = h case "s3+http", "s3+https": s3Creds, region := cfg.GetS3CredentialsFor(&p) - s, err = desync.NewS3IndexStore(&p, s3Creds, region) + s, err = desync.NewS3IndexStore(&p, s3Creds, region, opt) if err != nil { return nil, "", err } diff --git a/cmd/desync/tar.go b/cmd/desync/tar.go index aef4091..1046349 100644 --- a/cmd/desync/tar.go +++ b/cmd/desync/tar.go @@ -73,7 +73,7 @@ func tar(ctx context.Context, args []string) error { return desync.Tar(ctx, w, sourceDir) } - sOpts := storeOptions{ + sOpts := cmdStoreOptions{ n: n, clientCert: clientCert, clientKey: clientKey, diff --git a/cmd/desync/untar.go b/cmd/desync/untar.go index 1091052..b6ceb87 100644 --- a/cmd/desync/untar.go +++ 
b/cmd/desync/untar.go @@ -69,7 +69,7 @@ func untar(ctx context.Context, args []string) error { return desync.UnTar(ctx, f, targetDir, opts) } - sOpts := storeOptions{ + sOpts := cmdStoreOptions{ n: n, clientCert: clientCert, clientKey: clientKey, diff --git a/cmd/desync/upgrade-s3.go b/cmd/desync/upgrade-s3.go deleted file mode 100644 index 204113e..0000000 --- a/cmd/desync/upgrade-s3.go +++ /dev/null @@ -1,54 +0,0 @@ -package main - -import ( - "context" - "errors" - "flag" - "fmt" - "net/url" - "os" - - "github.com/folbricht/desync" -) - -const upgradeS3Usage = `desync upgrade-s3 -s - -Upgrades an S3 store using the deprecated layout (flat structure) to the new -layout which mirrors local stores. In the new format, each chunk is prefixed -with the 4 first characters of the checksum and prefixed with .cacnk` - -func upgradeS3(ctx context.Context, args []string) error { - var ( - storeLocation string - ) - flags := flag.NewFlagSet("upgrade-s3", flag.ExitOnError) - flags.Usage = func() { - fmt.Fprintln(os.Stderr, pruneUsage) - flags.PrintDefaults() - } - - flags.StringVar(&storeLocation, "s", "", "local store directory") - flags.Parse(args) - - if flags.NArg() > 0 { - return errors.New("Too many arguments. 
See -h for help.") - } - - if storeLocation == "" { - return errors.New("No store provided.") - } - - // Open the target store - loc, err := url.Parse(storeLocation) - if err != nil { - return err - } - s3Creds, region := cfg.GetS3CredentialsFor(loc) - s, err := desync.NewS3Store(loc, s3Creds, region) - if err != nil { - return err - } - defer s.Close() - - return s.Upgrade(ctx) -} diff --git a/cmd/desync/verify.go b/cmd/desync/verify.go index fc8d303..009bd21 100644 --- a/cmd/desync/verify.go +++ b/cmd/desync/verify.go @@ -40,9 +40,9 @@ func verify(ctx context.Context, args []string) error { return errors.New("No store provided.") } - s, err := desync.NewLocalStore(storeLocation) + s, err := desync.NewLocalStore(storeLocation, cfg.GetStoreOptionsFor(storeLocation)) if err != nil { return err } - return s.Verify(ctx, n, repair) + return s.Verify(ctx, n, repair, os.Stderr) } diff --git a/cmd/desync/verifyindex.go b/cmd/desync/verifyindex.go index dfa1bf4..72f72e4 100644 --- a/cmd/desync/verifyindex.go +++ b/cmd/desync/verifyindex.go @@ -42,7 +42,7 @@ func verifyIndex(ctx context.Context, args []string) error { dataFile := flags.Arg(1) // Parse the store locations, open the stores and add a cache is requested - opts := storeOptions{ + opts := cmdStoreOptions{ n: n, clientCert: clientCert, clientKey: clientKey, diff --git a/const.go b/const.go index 5fc8d1f..9e7c52f 100644 --- a/const.go +++ b/const.go @@ -136,3 +136,9 @@ var ( CaFormatTableTailMarker: "CaFormatTableTailMarker", } ) + +// CompressedChunkExt is the file extension used for compressed chunks +const CompressedChunkExt = ".cacnk" + +// UncompressedChunkExt is the file extension of uncompressed chunks +const UncompressedChunkExt = "" diff --git a/copy.go b/copy.go index 38d0427..1086dcc 100644 --- a/copy.go +++ b/copy.go @@ -42,12 +42,12 @@ func Copy(ctx context.Context, ids []ChunkID, src Store, dst WriteStore, n int, go func() { for id := range in { if !dst.HasChunk(id) { - b, err := src.GetChunk(id) + 
chunk, err := src.GetChunk(id) if err != nil { recordError(err) continue } - if err := dst.StoreChunk(id, b); err != nil { + if err := dst.StoreChunk(chunk); err != nil { recordError(err) } } diff --git a/go.sum b/go.sum new file mode 100644 index 0000000..640737a --- /dev/null +++ b/go.sum @@ -0,0 +1,46 @@ +github.com/datadog/zstd v1.3.5-0.20180806225715-e46e47635dc0 h1:F123N5GB1d+0BHDy5JiIVgRekD5uGcUcnSC/YNYiKKQ= +github.com/datadog/zstd v1.3.5-0.20180806225715-e46e47635dc0/go.mod h1:inRp+etsHuvVqMPNTXaFlpf/Tj7wqviBtdJoPVrPEFQ= +github.com/davecgh/go-spew v1.1.1 h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c= +github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= +github.com/dchest/siphash v1.2.0 h1:YWOShuhvg0GqbQpMa60QlCGtEyf7O7HC1Jf0VjdQ60M= +github.com/dchest/siphash v1.2.0/go.mod h1:q+IRvb2gOSrUnYoPqHiyHXS0FOBBOdl6tONBlVnOnt4= +github.com/fatih/color v1.7.0/go.mod h1:Zm6kSWBoL9eyXnKyktHP6abPY2pDugNf5KwzbycvMj4= +github.com/folbricht/tempfile v0.0.1 h1:kB3DubP2Fm5e3W7TrWFNZBfzFEHBoKL7Pjn0HvqKxSQ= +github.com/folbricht/tempfile v0.0.1/go.mod h1:/Flpxx/6U+clQJ61jQ3y6Z7L2l6j1/ZSiU4B9EDPgWw= +github.com/go-ini/ini v1.38.2 h1:6Hl/z3p3iFkA0dlDfzYxuFuUGD+kaweypF6btsR2/Q4= +github.com/go-ini/ini v1.38.2/go.mod h1:ByCAeIL28uOIIG0E3PJtZPDL8WnHpFKFOtgjp+3Ies8= +github.com/gopherjs/gopherjs v0.0.0-20180825215210-0210a2f0f73c/go.mod h1:wJfORRmW1u3UXTncJ5qlYoELFm8eSnnEO6hX4iZ3EWY= +github.com/hanwen/go-fuse v0.0.0-20180831115513-1d35017e9701 h1:EdQovFP5MjnhrasVy4rTG9x+twZ63LTsxeqxmtomylQ= +github.com/hanwen/go-fuse v0.0.0-20180831115513-1d35017e9701/go.mod h1:unqXarDXqzAk0rt98O2tVndEPIpUgLD9+rwFisZH3Ok= +github.com/jtolds/gls v4.2.1+incompatible/go.mod h1:QJZ7F/aHp+rZTRtaJ1ow/lLfFfVYBRgL+9YlvaHOwJU= +github.com/kr/fs v0.1.0 h1:Jskdu9ieNAYnjxsi0LbQp1ulIKZV1LAFgK1tWhpZgl8= +github.com/kr/fs v0.1.0/go.mod h1:FFnZGqtBN9Gxj7eW1uZ42v5BccTP0vu6NEaFoC2HwRg= +github.com/mattn/go-colorable v0.0.9/go.mod 
h1:9vuHe8Xs5qXnSaW/c/ABM9alt+Vo+STaOChaDxuIBZU= +github.com/mattn/go-isatty v0.0.4/go.mod h1:M+lRXTBqGeGNdLjl/ufCoiOlB5xdOkqRJdNxMWT7Zi4= +github.com/mattn/go-runewidth v0.0.3 h1:a+kO+98RDGEfo6asOGMmpodZq4FNtnGP54yps8BzLR4= +github.com/mattn/go-runewidth v0.0.3/go.mod h1:LwmH8dsx7+W8Uxz3IHJYH5QSwggIsqBzpuz5H//U1FU= +github.com/minio/minio-go v6.0.6+incompatible h1:AOPYom8W/kjdsjlsCVYwfb5BELGmkMP7EXhocAm5iME= +github.com/minio/minio-go v6.0.6+incompatible/go.mod h1:7guKYtitv8dktvNUGrhzmNlA5wrAABTQXCoesZdFQO8= +github.com/mitchellh/go-homedir v1.0.0 h1:vKb8ShqSby24Yrqr/yDYkuFz8d0WUjys40rvnGC8aR0= +github.com/mitchellh/go-homedir v1.0.0/go.mod h1:SfyaCUpYCn1Vlf4IUYiD9fPX4A5wJrkLzIz1N1q0pr0= +github.com/pkg/errors v0.8.0 h1:WdK/asTD0HN+q6hsWO3/vpuAkAr+tw6aNJNDFFf0+qw= +github.com/pkg/errors v0.8.0/go.mod h1:bwawxfHBFNV+L2hUp1rHADufV3IMtnDRdf1r5NINEl0= +github.com/pkg/sftp v1.8.2 h1:3upwlsK5/USEeM5gzIe9eWdzU4sV+kG3gKKg3RLBuWE= +github.com/pkg/sftp v1.8.2/go.mod h1:NxmoDg/QLVWluQDUYG7XBZTLUpKeFa8e3aMf1BfjyHk= +github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZbAQM= +github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4= +github.com/smartystreets/assertions v0.0.0-20180820201707-7c9eb446e3cf/go.mod h1:OnSkiWE9lh6wB0YB77sQom3nweQdgAjqCqsofrRNTgc= +github.com/smartystreets/goconvey v0.0.0-20180222194500-ef6db91d284a/go.mod h1:XDJAKZRPZ1CvBcN2aX5YOUTYGHki24fSF0Iv48Ibg0s= +github.com/stretchr/testify v1.2.2 h1:bSDNvY7ZPG5RlJ8otE/7V6gMiyenm9RtJ7IUVIAoJ1w= +github.com/stretchr/testify v1.2.2/go.mod h1:a8OnRcib4nhh0OaRAV+Yts87kKdq0PP7pXfy6kDkUVs= +golang.org/x/crypto v0.0.0-20180904163835-0709b304e793 h1:u+LnwYTOOW7Ukr/fppxEb1Nwz0AtPflrblfvUudpo+I= +golang.org/x/crypto v0.0.0-20180904163835-0709b304e793/go.mod h1:6SG95UA2DQfeDnfUPMdvaQW0Q7yPrPDi9nlGo2tz2b4= +golang.org/x/net v0.0.0-20180906233101-161cd47e91fd h1:nTDtHvHSdCn1m6ITfMRqtOd/9+7a3s8RBNOZ3eYZzJA= +golang.org/x/net 
v0.0.0-20180906233101-161cd47e91fd/go.mod h1:mL1N/T3taQHkDXs73rZJwtUhF3w3ftmwwsq0BUmARs4= +golang.org/x/sys v0.0.0-20180830151530-49385e6e1522 h1:Ve1ORMCxvRmSXBwJK+t3Oy+V2vRW2OetUQBq4rJIkZE= +golang.org/x/sys v0.0.0-20180830151530-49385e6e1522/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY= +golang.org/x/text v0.3.0 h1:g61tztE5qeGQ89tm6NTjjM9VPIm088od1l6aSorWRWg= +golang.org/x/text v0.3.0/go.mod h1:NqM8EUOU14njkJ3fqMW+pc6Ldnwhi/IjpwHt7yyuwOQ= +gopkg.in/cheggaaa/pb.v1 v1.0.25 h1:Ev7yu1/f6+d+b3pi5vPdRPc6nNtP1umSfcWiEfRqv6I= +gopkg.in/cheggaaa/pb.v1 v1.0.25/go.mod h1:V/YB90LKu/1FcN3WVnfiiE5oMCibMjukxqG/qStrOgw= +gopkg.in/ini.v1 v1.38.2/go.mod h1:pNLf8WUiyNEtQjuu5G5vTm06TEv9tsIgeAvK8hOrP4k= diff --git a/httphandler.go b/httphandler.go index 6e9bf81..1864e74 100644 --- a/httphandler.go +++ b/httphandler.go @@ -5,7 +5,6 @@ import ( "fmt" "io" "net/http" - "os" "path/filepath" "strings" ) @@ -13,82 +12,62 @@ import ( // HTTPHandler is the server-side handler for a HTTP chunk store. type HTTPHandler struct { HTTPHandlerBase - s Store + s Store + SkipVerifyWrite bool + Uncompressed bool } // NewHTTPHandler initializes and returns a new HTTP handler for a chunks erver. -func NewHTTPHandler(s Store, writable bool) http.Handler { - return HTTPHandler{HTTPHandlerBase{"chunk", writable}, s} +func NewHTTPHandler(s Store, writable, skipVerifyWrite, uncompressed bool) http.Handler { + return HTTPHandler{HTTPHandlerBase{"chunk", writable}, s, skipVerifyWrite, uncompressed} } func (h HTTPHandler) ServeHTTP(w http.ResponseWriter, r *http.Request) { - sid := strings.TrimSuffix(filepath.Base(r.URL.Path), ".cacnk") - - // We only really need the ID, but to maintain compatibility with stores - // that are simply shared with HTTP, we expect /prefix/chunkID. Make sure - // the prefix does match the first characters of the ID. 
- if r.URL.Path != filepath.Join(string(os.PathSeparator), sid[0:4], sid+".cacnk") { - w.WriteHeader(http.StatusBadRequest) - w.Write([]byte("expected /prefix/chunkid.cacnk")) + id, err := h.idFromPath(r.URL.Path) + if err != nil { + http.Error(w, err.Error(), http.StatusBadRequest) return } - switch r.Method { case "GET": - h.get(sid, w) + h.get(id, w) case "HEAD": - h.head(sid, w) + h.head(id, w) case "PUT": - h.put(sid, w, r) + h.put(id, w, r) default: w.WriteHeader(http.StatusMethodNotAllowed) w.Write([]byte("only GET, PUT and HEAD are supported")) } } -func (h HTTPHandler) parseChunkID(sid string, w http.ResponseWriter) (ChunkID, error) { - // Parse the ID and verify the format - cid, err := ChunkIDFromString(sid) - if err != nil { - w.WriteHeader(http.StatusBadRequest) - w.Write([]byte("invalid chunk id")) - return ChunkID{}, err - } - return cid, err -} - -func (h HTTPHandler) get(sid string, w http.ResponseWriter) { - cid, err := h.parseChunkID(sid, w) - if err != nil { - return +func (h HTTPHandler) get(id ChunkID, w http.ResponseWriter) { + var b []byte + chunk, err := h.s.GetChunk(id) + if err == nil { + if h.Uncompressed { + b, err = chunk.Uncompressed() + } else { + b, err = chunk.Compressed() + } } - b, err := h.s.GetChunk(cid) - h.HTTPHandlerBase.get(sid, b, err, w) + h.HTTPHandlerBase.get(id.String(), b, err, w) } -func (h HTTPHandler) head(sid string, w http.ResponseWriter) { - cid, err := h.parseChunkID(sid, w) - if err != nil { - return - } - if h.s.HasChunk(cid) { +func (h HTTPHandler) head(id ChunkID, w http.ResponseWriter) { + if h.s.HasChunk(id) { w.WriteHeader(http.StatusOK) return } w.WriteHeader(http.StatusNotFound) } -func (h HTTPHandler) put(sid string, w http.ResponseWriter, r *http.Request) { +func (h HTTPHandler) put(id ChunkID, w http.ResponseWriter, r *http.Request) { err := h.HTTPHandlerBase.validateWritable(h.s.String(), w, r) if err != nil { return } - cid, err := h.parseChunkID(sid, w) - if err != nil { - return - } - // The 
upstream store needs to support writing as well s, ok := h.s.(WriteStore) if !ok { @@ -105,11 +84,39 @@ func (h HTTPHandler) put(sid string, w http.ResponseWriter, r *http.Request) { return } + // Turn it into a chunk, and validate the ID unless verification is disabled + var chunk *Chunk + if h.Uncompressed { + chunk, err = NewChunkWithID(id, b.Bytes(), nil, h.SkipVerifyWrite) + } else { + chunk, err = NewChunkWithID(id, nil, b.Bytes(), h.SkipVerifyWrite) + } + if err != nil { + http.Error(w, err.Error(), http.StatusBadRequest) + return + } + // Store it upstream - if err := s.StoreChunk(cid, b.Bytes()); err != nil { - w.WriteHeader(http.StatusInternalServerError) - fmt.Fprintln(w, err) + if err := s.StoreChunk(chunk); err != nil { + http.Error(w, err.Error(), http.StatusInternalServerError) return } w.WriteHeader(http.StatusOK) } + +func (h HTTPHandler) idFromPath(path string) (ChunkID, error) { + ext := CompressedChunkExt + if h.Uncompressed { + ext = UncompressedChunkExt + } + sID := strings.TrimSuffix(filepath.Base(path), ext) + if len(sID) < 4 { + return ChunkID{}, fmt.Errorf("expected format '//%s", ext) + } + + // Make sure the prefix does match the first characters of the ID. 
+ if path != filepath.Join("/", sID[0:4], sID+ext) { + return ChunkID{}, fmt.Errorf("expected format '//%s", ext) + } + return ChunkIDFromString(sID) +} diff --git a/httphandler_test.go b/httphandler_test.go new file mode 100644 index 0000000..75d2783 --- /dev/null +++ b/httphandler_test.go @@ -0,0 +1,132 @@ +package desync + +import ( + "io/ioutil" + "net/http/httptest" + "net/url" + "os" + "testing" +) + +func TestHTTPHandlerReadWrite(t *testing.T) { + // Setup a temporary store used as upstream store that the HTTP server + // pulls from + store, err := ioutil.TempDir("", "store") + if err != nil { + t.Fatal(err) + } + defer os.RemoveAll(store) + + upstream, err := NewLocalStore(store, StoreOptions{}) + if err != nil { + t.Fatal(err) + } + + // Start a read-write capable server and a read-only server + rw := httptest.NewServer(NewHTTPHandler(upstream, true, false, false)) + defer rw.Close() + ro := httptest.NewServer(NewHTTPHandler(upstream, false, false, false)) + defer ro.Close() + + // Initialize HTTP chunks stores, one RW and the other RO + rwStoreURL, _ := url.Parse(rw.URL) + rwStore, err := NewRemoteHTTPStore(rwStoreURL, StoreOptions{}) + if err != nil { + t.Fatal(err) + } + roStoreURL, _ := url.Parse(ro.URL) + roStore, err := NewRemoteHTTPStore(roStoreURL, StoreOptions{}) + if err != nil { + t.Fatal(err) + } + + // Make up some data and store it in the RW store + dataIn := []byte("some data") + chunkIn := NewChunkFromUncompressed(dataIn) + id := chunkIn.ID() + if err := rwStore.StoreChunk(chunkIn); err != nil { + t.Fatal(err) + } + + // Check it's in the store + if !rwStore.HasChunk(id) { + t.Fatal("chunk not found in store") + } + + // Let's try to send some data to the RO store, that should fail + if err := roStore.StoreChunk(chunkIn); err == nil { + t.Fatal("expected error writing to read-only chunkstore") + } +} + +func TestHTTPHandlerCompression(t *testing.T) { + // Setup a temporary store used as upstream store that the HTTP server + // pulls from + 
store, err := ioutil.TempDir("", "store") + if err != nil { + t.Fatal(err) + } + defer os.RemoveAll(store) + + upstream, err := NewLocalStore(store, StoreOptions{}) + if err != nil { + t.Fatal(err) + } + + // Start a server that uses compression, and one that serves uncompressed chunks + co := httptest.NewServer(NewHTTPHandler(upstream, true, false, false)) + defer co.Close() + un := httptest.NewServer(NewHTTPHandler(upstream, true, false, true)) + defer un.Close() + + // Initialize HTTP chunks stores, one RW and the other RO. Also make one that's + // trying to get compressed data from a HTTP store that serves only uncompressed. + coStoreURL, _ := url.Parse(co.URL) + coStore, err := NewRemoteHTTPStore(coStoreURL, StoreOptions{}) + if err != nil { + t.Fatal(err) + } + unStoreURL, _ := url.Parse(un.URL) + unStore, err := NewRemoteHTTPStore(unStoreURL, StoreOptions{Uncompressed: true}) + if err != nil { + t.Fatal(err) + } + invalidStore, err := NewRemoteHTTPStore(unStoreURL, StoreOptions{}) + if err != nil { + t.Fatal(err) + } + + // Make up some data and store it in the RW store + dataIn := []byte("some data") + chunkIn := NewChunkFromUncompressed(dataIn) + id := chunkIn.ID() + + // Try to get compressed chunks from a store that only serves uncompressed chunks + if _, err := invalidStore.GetChunk(id); err == nil { + t.Fatal("expected failure trying to get compressed chunks from uncompressed http store") + } + + if err := coStore.StoreChunk(chunkIn); err != nil { + t.Fatal(err) + } + + // Check it's in the store when looking for compressed chunks + if !coStore.HasChunk(id) { + t.Fatal("chunk not found in store") + } + + // It's also visible when looking for uncompressed data + if !unStore.HasChunk(id) { + t.Fatal("chunk not found in store") + } + + // Send it uncompressed + if err := unStore.StoreChunk(chunkIn); err != nil { + t.Fatal(err) + } + + // Try to get the uncompressed chunk + if _, err := unStore.GetChunk(id); err != nil { + t.Fatal(err) + } +} diff --git 
a/httphandlerbase.go b/httphandlerbase.go index edad732..1ca2b05 100644 --- a/httphandlerbase.go +++ b/httphandlerbase.go @@ -19,8 +19,7 @@ func (h HTTPHandlerBase) get(id string, b []byte, err error, w http.ResponseWrit case nil: w.WriteHeader(http.StatusOK) w.Write(b) - case ChunkMissing: - case NoSuchObject: + case ChunkMissing, NoSuchObject: w.WriteHeader(http.StatusNotFound) fmt.Fprintf(w, "%s %s not found", h.handlerType, id) default: diff --git a/index.go b/index.go index 5859247..6defddf 100644 --- a/index.go +++ b/index.go @@ -3,7 +3,6 @@ package desync import ( "bufio" "context" - "crypto/sha512" "fmt" "math" "sync" @@ -168,14 +167,14 @@ func ChunkStream(ctx context.Context, c Chunker, ws WriteStore, n int) (Index, e wg.Add(1) go func() { for c := range in { - // Calculate the chunk ID - id := sha512.Sum512_256(c.b) + // Create a chunk object, needed to calculate the checksum + chunk := NewChunkFromUncompressed(c.b) // Record the index row - chunk := IndexChunk{Start: c.start, Size: uint64(len(c.b)), ID: id} - recordResult(c.num, chunk) + idxChunk := IndexChunk{Start: c.start, Size: uint64(len(c.b)), ID: chunk.ID()} + recordResult(c.num, idxChunk) - if err := s.StoreChunk(id, c.b); err != nil { + if err := s.StoreChunk(chunk); err != nil { recordError(err) continue } diff --git a/index_test.go b/index_test.go index d2da156..1a7461e 100644 --- a/index_test.go +++ b/index_test.go @@ -87,7 +87,7 @@ func TestIndexChunking(t *testing.T) { t.Fatal(err) } defer os.RemoveAll(dir) // clean up - s, err := NewLocalStore(dir) + s, err := NewLocalStore(dir, StoreOptions{}) if err != nil { t.Fatal(err) } @@ -175,7 +175,7 @@ func splitBlob(b *testing.B) { b.Fatal(err) } defer os.RemoveAll(dir) // clean up - s, err := NewLocalStore(dir) + s, err := NewLocalStore(dir, StoreOptions{}) if err != nil { b.Fatal(err) } diff --git a/local.go b/local.go index a087f61..cfd6143 100644 --- a/local.go +++ b/local.go @@ -2,8 +2,8 @@ package desync import ( "context" - "crypto/sha512" 
"fmt" + "io" "io/ioutil" "os" "path/filepath" @@ -11,11 +11,8 @@ import ( "sync" "github.com/folbricht/tempfile" - "github.com/pkg/errors" ) -const chunkFileExt = ".cacnk" - // LocalStore casync store type LocalStore struct { Base string @@ -23,11 +20,13 @@ type LocalStore struct { // When accessing chunks, should mtime be updated? Useful when this is // a cache. Old chunks can be identified and removed from the store that way UpdateTimes bool + + opt StoreOptions } // NewLocalStore creates an instance of a local castore, it only checks presence // of the store -func NewLocalStore(dir string) (LocalStore, error) { +func NewLocalStore(dir string, opt StoreOptions) (LocalStore, error) { info, err := os.Stat(dir) if err != nil { return LocalStore{}, err @@ -35,26 +34,26 @@ func NewLocalStore(dir string) (LocalStore, error) { if !info.IsDir() { return LocalStore{}, fmt.Errorf("%s is not a directory", dir) } - return LocalStore{Base: dir}, nil + return LocalStore{Base: dir, opt: opt}, nil } // GetChunk reads and returns one (compressed!) chunk from the store -func (s LocalStore) GetChunk(id ChunkID) ([]byte, error) { - sID := id.String() - p := filepath.Join(s.Base, sID[0:4], sID) + chunkFileExt - +func (s LocalStore) GetChunk(id ChunkID) (*Chunk, error) { + _, p := s.nameFromID(id) b, err := ioutil.ReadFile(p) if os.IsNotExist(err) { - err = ChunkMissing{id} + return nil, ChunkMissing{id} } - return b, err + if s.opt.Uncompressed { + return NewChunkWithID(id, b, nil, s.opt.SkipVerify) + } + return NewChunkWithID(id, nil, b, s.opt.SkipVerify) } // RemoveChunk deletes a chunk, typically an invalid one, from the filesystem. // Used when verifying and repairing caches. 
func (s LocalStore) RemoveChunk(id ChunkID) error { - sID := id.String() - p := filepath.Join(s.Base, sID[0:4], sID) + chunkFileExt + _, p := s.nameFromID(id) if _, err := os.Stat(p); err != nil { return ChunkMissing{id} } @@ -62,9 +61,20 @@ func (s LocalStore) RemoveChunk(id ChunkID) error { } // StoreChunk adds a new chunk to the store -func (s LocalStore) StoreChunk(id ChunkID, b []byte) error { - sID := id.String() - d := filepath.Join(s.Base, sID[0:4]) +func (s LocalStore) StoreChunk(chunk *Chunk) error { + d, p := s.nameFromID(chunk.ID()) + var ( + b []byte + err error + ) + if s.opt.Uncompressed { + b, err = chunk.Uncompressed() + } else { + b, err = chunk.Compressed() + } + if err != nil { + return err + } if err := os.MkdirAll(d, 0755); err != nil { return err } @@ -83,8 +93,9 @@ func (s LocalStore) StoreChunk(id ChunkID, b []byte) error { } // Verify all chunks in the store. If repair is set true, bad chunks are deleted. -// n determines the number of concurrent operations. -func (s LocalStore) Verify(ctx context.Context, n int, repair bool) error { +// n determines the number of concurrent operations. w is used to write any messages +// intended for the user, typically os.Stderr. 
+func (s LocalStore) Verify(ctx context.Context, n int, repair bool, w io.Writer) error { var wg sync.WaitGroup ids := make(chan ChunkID) @@ -93,7 +104,7 @@ func (s LocalStore) Verify(ctx context.Context, n int, repair bool) error { wg.Add(1) go func() { for id := range ids { - err := s.verifyChunk(id) + _, err := s.GetChunk(id) switch err.(type) { case ChunkInvalid: // bad chunk, report and delete (if repair=true) msg := err.Error() @@ -104,10 +115,10 @@ func (s LocalStore) Verify(ctx context.Context, n int, repair bool) error { msg = msg + ": removed" } } - fmt.Fprintln(os.Stderr, msg) - case nil: // all good, move to the next + fmt.Fprintln(w, msg) + case nil: default: // unexpected, print the error and carry on - fmt.Fprintln(os.Stderr, err) + fmt.Fprintln(w, err) } } wg.Done() @@ -129,12 +140,22 @@ func (s LocalStore) Verify(ctx context.Context, n int, repair bool) error { if info.IsDir() { // Skip dirs return nil } - if !strings.HasSuffix(path, chunkFileExt) { // Skip files without chunk extension - return nil + // Skip compressed chunks if this is running in uncompressed mode and vice-versa + var sID string + if s.opt.Uncompressed { + if !strings.HasSuffix(path, UncompressedChunkExt) { + return nil + } + sID = strings.TrimSuffix(filepath.Base(path), UncompressedChunkExt) + } else { + if !strings.HasSuffix(path, CompressedChunkExt) { + return nil + } + sID = strings.TrimSuffix(filepath.Base(path), CompressedChunkExt) } // Convert the name into a checksum, if that fails we're probably not looking // at a chunk file and should skip it. 
- id, err := ChunkIDFromString(strings.TrimSuffix(filepath.Base(path), ".cacnk")) + id, err := ChunkIDFromString(sID) if err != nil { return nil } @@ -164,12 +185,22 @@ func (s LocalStore) Prune(ctx context.Context, ids map[ChunkID]struct{}) error { if info.IsDir() { // Skip dirs return nil } - if !strings.HasSuffix(path, chunkFileExt) { // Skip files without chunk extension - return nil + // Skip compressed chunks if this is running in uncompressed mode and vice-versa + var sID string + if s.opt.Uncompressed { + if !strings.HasSuffix(path, UncompressedChunkExt) { + return nil + } + sID = strings.TrimSuffix(filepath.Base(path), UncompressedChunkExt) + } else { + if !strings.HasSuffix(path, CompressedChunkExt) { + return nil + } + sID = strings.TrimSuffix(filepath.Base(path), CompressedChunkExt) } // Convert the name into a checksum, if that fails we're probably not looking // at a chunk file and should skip it. - id, err := ChunkIDFromString(strings.TrimSuffix(filepath.Base(path), ".cacnk")) + id, err := ChunkIDFromString(sID) if err != nil { return nil } @@ -185,34 +216,11 @@ func (s LocalStore) Prune(ctx context.Context, ids map[ChunkID]struct{}) error { return err } -// Unpack a chunk, calculate the checksum of its content and return nil if -// they match. -func (s LocalStore) verifyChunk(id ChunkID) error { - b, err := s.GetChunk(id) - if err != nil { - return err - } - // The the chunk is compressed. 
Decompress it here - db, err := Decompress(nil, b) - if err != nil { - return errors.Wrap(err, id.String()) - } - // Verify the checksum of the chunk matches the ID - sum := sha512.Sum512_256(db) - if sum != id { - return ChunkInvalid{ID: id, Sum: sum} - } - return nil -} - // HasChunk returns true if the chunk is in the store func (s LocalStore) HasChunk(id ChunkID) bool { - sID := id.String() - p := filepath.Join(s.Base, sID[0:4], sID) + chunkFileExt - if _, err := os.Stat(p); err == nil { - return true - } - return false + _, p := s.nameFromID(id) + _, err := os.Stat(p) + return err == nil } func (s LocalStore) String() string { @@ -221,3 +229,15 @@ func (s LocalStore) String() string { // Close the store. NOP opertation, needed to implement Store interface. func (s LocalStore) Close() error { return nil } + +func (s LocalStore) nameFromID(id ChunkID) (dir, name string) { + sID := id.String() + dir = filepath.Join(s.Base, sID[0:4]) + name = filepath.Join(dir, sID) + if s.opt.Uncompressed { + name += UncompressedChunkExt + } else { + name += CompressedChunkExt + } + return +} diff --git a/local_test.go b/local_test.go new file mode 100644 index 0000000..2445c47 --- /dev/null +++ b/local_test.go @@ -0,0 +1,191 @@ +package desync + +import ( + "bytes" + "context" + "io/ioutil" + "os" + "testing" +) + +func TestLocalStoreCompressed(t *testing.T) { + // Setup a temporary store + store, err := ioutil.TempDir("", "store") + if err != nil { + t.Fatal(err) + } + defer os.RemoveAll(store) + + s, err := NewLocalStore(store, StoreOptions{}) + if err != nil { + t.Fatal(err) + } + + // Make up some data and store it + dataIn := []byte("some data") + + chunkIn := NewChunkFromUncompressed(dataIn) + id := chunkIn.ID() + if err := s.StoreChunk(chunkIn); err != nil { + t.Fatal(err) + } + + // Check it's in the store + if !s.HasChunk(id) { + t.Fatal("chunk not found in store") + } + + // Pull the data the "official" way + chunkOut, err := s.GetChunk(id) + if err != nil { + 
t.Fatal(err) + } + dataOut, err := chunkOut.Uncompressed() + if err != nil { + t.Fatal(err) + } + + // Compare the data that went in with what came out + if !bytes.Equal(dataIn, dataOut) { + t.Fatal("input and output data doesn't match after store/retrieve") + } + + // Now let's look at the file in the store directly to make sure it's compressed + _, name := s.nameFromID(id) + b, err := ioutil.ReadFile(name) + if err != nil { + t.Fatal(err) + } + if bytes.Equal(dataIn, b) { + t.Fatal("chunk is not compressed") + } +} + +func TestLocalStoreUncompressed(t *testing.T) { + // Setup a temporary store + store, err := ioutil.TempDir("", "store") + if err != nil { + t.Fatal(err) + } + defer os.RemoveAll(store) + + s, err := NewLocalStore(store, StoreOptions{Uncompressed: true}) + if err != nil { + t.Fatal(err) + } + + // Make up some data and store it + dataIn := []byte("some data") + + chunkIn := NewChunkFromUncompressed(dataIn) + id := chunkIn.ID() + if err := s.StoreChunk(chunkIn); err != nil { + t.Fatal(err) + } + + // Check it's in the store + if !s.HasChunk(id) { + t.Fatal("chunk not found in store") + } + + // Pull the data the "official" way + chunkOut, err := s.GetChunk(id) + if err != nil { + t.Fatal(err) + } + dataOut, err := chunkOut.Uncompressed() + if err != nil { + t.Fatal(err) + } + + // Compare the data that went in with what came out + if !bytes.Equal(dataIn, dataOut) { + t.Fatal("input and output data doesn't match after store/retrieve") + } + + // Now let's look at the file in the store directly to make sure it's uncompressed + _, name := s.nameFromID(id) + b, err := ioutil.ReadFile(name) + if err != nil { + t.Fatal(err) + } + if !bytes.Equal(dataIn, b) { + t.Fatal("chunk is not compressed") + } +} + +func TestLocalStoreErrorHandling(t *testing.T) { + // Setup a temporary store + store, err := ioutil.TempDir("", "store") + if err != nil { + t.Fatal(err) + } + defer os.RemoveAll(store) + + s, err := NewLocalStore(store, StoreOptions{}) + if err != nil { 
+ t.Fatal(err) + } + + // Make up some data and store it + dataIn := []byte("some data") + + chunkIn := NewChunkFromUncompressed(dataIn) + id := chunkIn.ID() + if err := s.StoreChunk(chunkIn); err != nil { + t.Fatal(err) + } + + // Now put an invalid chunk into the store + idInvalid, err := ChunkIDFromString("1000000000000000000000000000000000000000000000000000000000000000") + if err != nil { + t.Fatal(err) + } + dirInvalid, nameInvalid := s.nameFromID(idInvalid) + os.Mkdir(dirInvalid, 0755) + if err := ioutil.WriteFile(nameInvalid, []byte("invalid data"), 0644); err != nil { + t.Fatal(err) + } + + // Also add a blank chunk + idBlank, err := ChunkIDFromString("2000000000000000000000000000000000000000000000000000000000000000") + if err != nil { + t.Fatal(err) + } + dirBlank, nameBlank := s.nameFromID(idBlank) + os.Mkdir(dirBlank, 0755) + if err := ioutil.WriteFile(nameBlank, nil, 0644); err != nil { + t.Fatal(err) + } + + // Let's see if we can retrieve the good chunk and get errors from the bad ones + if _, err := s.GetChunk(id); err != nil { + t.Fatal(err) + } + _, err = s.GetChunk(idInvalid) + if _, ok := err.(ChunkInvalid); !ok { + t.Fatal(err) + } + _, err = s.GetChunk(idBlank) + if _, ok := err.(ChunkInvalid); !ok { + t.Fatal(err) + } + + // Run the verify with repair enabled which should get rid of the invalid and blank chunks + if err := s.Verify(context.Background(), 1, true, ioutil.Discard); err != nil { + t.Fatal(err) + } + + // Let's see if we can still retrieve the good chunk and get Not Found for the others + if _, err := s.GetChunk(id); err != nil { + t.Fatal(err) + } + _, err = s.GetChunk(idInvalid) + if _, ok := err.(ChunkMissing); !ok { + t.Fatal(err) + } + _, err = s.GetChunk(idBlank) + if _, ok := err.(ChunkMissing); !ok { + t.Fatal(err) + } +} diff --git a/protocol.go b/protocol.go index 6ebeb6a..d32581a 100644 --- a/protocol.go +++ b/protocol.go @@ -134,7 +134,7 @@ func (p *Protocol) SendGoodbye() error { // RequestChunk sends a request for a 
specific chunk to the server, waits for // the response and returns the bytes in the chunk. Returns an error if the // server reports the chunk as missing -func (p *Protocol) RequestChunk(id ChunkID) ([]byte, error) { +func (p *Protocol) RequestChunk(id ChunkID) (*Chunk, error) { if !p.initialized { return nil, errors.New("protocol not initialized") } @@ -153,15 +153,8 @@ func (p *Protocol) RequestChunk(id ChunkID) ([]byte, error) { if len(m.Body) < 40 { return nil, errors.New("received chunk too small") } - cid, err := ChunkIDFromSlice(m.Body[8:40]) - if err != nil { - return nil, err - } - if cid != id { - return nil, fmt.Errorf("requested chunk %s from server, but got %s", id, cid) - } // The rest should be the chunk data - return m.Body[40:], nil + return NewChunkWithID(id, nil, m.Body[40:], false) default: return nil, fmt.Errorf("unexpected protocol message type %x", m.Type) } diff --git a/protocol_test.go b/protocol_test.go index 556b313..e5e2b76 100644 --- a/protocol_test.go +++ b/protocol_test.go @@ -14,8 +14,10 @@ func TestProtocol(t *testing.T) { client := NewProtocol(r2, w1) // Test data - cID := ChunkID{1, 2, 3, 4} - cData := []byte{0, 0, 1, 1, 2, 2} + uncompressed := []byte{0, 0, 1, 1, 2, 2} + inChunk := NewChunkFromUncompressed(uncompressed) + compressed, _ := inChunk.Compressed() + cID := inChunk.ID() // Server go func() { @@ -37,7 +39,7 @@ func TestProtocol(t *testing.T) { if err != nil { t.Fatal(err) } - if err := client.SendProtocolChunk(id, 0, cData); err != nil { + if err := client.SendProtocolChunk(id, 0, compressed); err != nil { t.Fatal(err) } default: @@ -60,7 +62,8 @@ func TestProtocol(t *testing.T) { if err != nil { t.Fatal(err) } - if !bytes.Equal(chunk, cData) { + b, _ := chunk.Uncompressed() + if !bytes.Equal(b, uncompressed) { t.Fatal("chunk data doesn't match expected") } } diff --git a/protocolserver.go b/protocolserver.go index 4a8b264..3d5f5c6 100644 --- a/protocolserver.go +++ b/protocolserver.go @@ -52,7 +52,7 @@ func (s 
*ProtocolServer) Serve(ctx context.Context) error { if err != nil { return errors.Wrap(err, "unable to decode requested chunk id") } - b, err := s.store.GetChunk(id) + chunk, err := s.store.GetChunk(id) if err != nil { if _, ok := err.(ChunkMissing); ok { if err = s.p.SendMissing(id); err != nil { @@ -61,7 +61,11 @@ func (s *ProtocolServer) Serve(ctx context.Context) error { } return errors.Wrap(err, "unable to read chunk from store") } - if err := s.p.SendProtocolChunk(id, CaProtocolChunkCompressed, b); err != nil { + b, err := chunk.Compressed() + if err != nil { + return err + } + if err := s.p.SendProtocolChunk(chunk.ID(), CaProtocolChunkCompressed, b); err != nil { return errors.Wrap(err, "failed to send chunk data") } case CaProtocolAbort: diff --git a/protocolserver_test.go b/protocolserver_test.go index 115e195..2e83e1a 100644 --- a/protocolserver_test.go +++ b/protocolserver_test.go @@ -12,10 +12,14 @@ func TestProtocolServer(t *testing.T) { r2, w2 := io.Pipe() server := NewProtocol(r1, w2) - // client := NewProtocol(r2, w1) + // Test data + uncompressed := []byte{4, 3, 2, 1} + chunkIn := NewChunkFromUncompressed(uncompressed) + compressed, _ := chunkIn.Compressed() + id := chunkIn.ID() store := TestStore{ - ChunkID{1}: []byte{4, 3, 2, 1}, + id: compressed, } ps := NewProtocolServer(r2, w1, store) @@ -31,11 +35,12 @@ func TestProtocolServer(t *testing.T) { } // Should find this chunk - chunk, err := server.RequestChunk(ChunkID{1}) + chunk, err := server.RequestChunk(id) if err != nil { t.Fatal(err) } - if !bytes.Equal(chunk, store[ChunkID{1}]) { + b, _ := chunk.Uncompressed() + if !bytes.Equal(b, uncompressed) { t.Fatal("chunk data doesn't match expected") } diff --git a/readseeker.go b/readseeker.go index 35db2c1..bf62a90 100644 --- a/readseeker.go +++ b/readseeker.go @@ -4,8 +4,6 @@ import ( "fmt" "io" "sort" - - "github.com/pkg/errors" ) // TODO: Implement WriterTo interface @@ -91,28 +89,22 @@ func (ip *IndexPos) findOffset(newPos int64) (int64, error) 
{ return newPos, err } -func (ip *IndexPos) loadChunk() (err error) { +func (ip *IndexPos) loadChunk() error { // See if we can simply read a blank slice from memory if the null chunk // is being loaded if ip.curChunkID == ip.nullChunk.ID { ip.curChunk = ip.nullChunk.Data - return + return nil } - - var compressedChunk []byte - var decompressedChunk []byte - - compressedChunk, err = ip.Store.GetChunk(ip.curChunkID) + chunk, err := ip.Store.GetChunk(ip.curChunkID) if err != nil { return err } - - decompressedChunk, err = Decompress(nil, compressedChunk) + b, err := chunk.Uncompressed() if err != nil { - return errors.Wrap(err, ip.curChunkID.String()) + return err } - - ip.curChunk = decompressedChunk + ip.curChunk = b return nil } diff --git a/remotehttp.go b/remotehttp.go index 4e48937..7b64f29 100644 --- a/remotehttp.go +++ b/remotehttp.go @@ -23,9 +23,9 @@ var TrustInsecure bool // RemoteHTTPBase is the base object for a remote, HTTP-based chunk or index stores. type RemoteHTTPBase struct { - location *url.URL - client *http.Client - errorRetry int + location *url.URL + client *http.Client + opt StoreOptions } // RemoteHTTP is a remote casync store accessed via HTTP. @@ -34,7 +34,7 @@ type RemoteHTTP struct { } // NewRemoteHTTPStoreBase initializes a base object for HTTP index or chunk stores. 
-func NewRemoteHTTPStoreBase(location *url.URL, n int, cert string, key string) (*RemoteHTTPBase, error) { +func NewRemoteHTTPStoreBase(location *url.URL, opt StoreOptions) (*RemoteHTTPBase, error) { if location.Scheme != "http" && location.Scheme != "https" { return nil, fmt.Errorf("unsupported scheme %s, expected http or https", location.Scheme) } @@ -46,11 +46,11 @@ func NewRemoteHTTPStoreBase(location *url.URL, n int, cert string, key string) ( var tr *http.Transport - if cert != "" && key != "" { + if opt.ClientCert != "" && opt.ClientKey != "" { // Load client cert - certificate, err := tls.LoadX509KeyPair(cert, key) + certificate, err := tls.LoadX509KeyPair(opt.ClientCert, opt.ClientKey) if err != nil { - return nil, fmt.Errorf("failed to load client certificate from %s", cert) + return nil, fmt.Errorf("failed to load client certificate from %s", opt.ClientCert) } caCertPool, err := x509.SystemCertPool() if err != nil { @@ -59,7 +59,7 @@ func NewRemoteHTTPStoreBase(location *url.URL, n int, cert string, key string) ( tr = &http.Transport{ Proxy: http.ProxyFromEnvironment, DisableCompression: true, - MaxIdleConnsPerHost: n, + MaxIdleConnsPerHost: opt.N, IdleConnTimeout: 60 * time.Second, TLSClientConfig: &tls.Config{ InsecureSkipVerify: TrustInsecure, @@ -74,27 +74,19 @@ func NewRemoteHTTPStoreBase(location *url.URL, n int, cert string, key string) ( tr = &http.Transport{ Proxy: http.ProxyFromEnvironment, DisableCompression: true, - MaxIdleConnsPerHost: n, + MaxIdleConnsPerHost: opt.N, IdleConnTimeout: 60 * time.Second, TLSClientConfig: &tls.Config{InsecureSkipVerify: TrustInsecure}, } } - client := &http.Client{Transport: tr} - - return &RemoteHTTPBase{location: location, client: client}, nil -} - -// SetTimeout configures the timeout on the HTTP client for all requests -func (r *RemoteHTTPBase) SetTimeout(timeout time.Duration) { - r.client.Timeout = timeout -} + timeout := opt.Timeout + if timeout == 0 { + timeout = time.Minute + } + client := 
&http.Client{Transport: tr, Timeout: timeout} -// SetErrorRetry defines how many HTTP errors are retried. This can be useful -// when dealing with unreliable networks that can timeout or where errors are -// transient. -func (r *RemoteHTTPBase) SetErrorRetry(n int) { - r.errorRetry = n + return &RemoteHTTPBase{location: location, client: client, opt: opt}, nil } func (r *RemoteHTTPBase) String() string { @@ -117,7 +109,7 @@ func (r *RemoteHTTPBase) GetObject(name string) ([]byte, error) { attempt++ resp, err = r.client.Get(u.String()) if err != nil { - if attempt >= r.errorRetry { + if attempt >= r.opt.ErrorRetry { return nil, errors.Wrap(err, u.String()) } continue @@ -132,7 +124,7 @@ func (r *RemoteHTTPBase) GetObject(name string) ([]byte, error) { } b, err = ioutil.ReadAll(resp.Body) if err != nil { - if attempt >= r.errorRetry { + if attempt >= r.opt.ErrorRetry { return nil, errors.Wrap(err, u.String()) } continue @@ -159,7 +151,7 @@ retry: } resp, err = r.client.Do(req) if err != nil { - if attempt >= r.errorRetry { + if attempt >= r.opt.ErrorRetry { return err } goto retry @@ -174,26 +166,30 @@ retry: // NewRemoteHTTPStore initializes a new store that pulls chunks via HTTP(S) from // a remote web server. n defines the size of idle connections allowed. -func NewRemoteHTTPStore(location *url.URL, n int, cert string, key string) (*RemoteHTTP, error) { - b, err := NewRemoteHTTPStoreBase(location, n, cert, key) +func NewRemoteHTTPStore(location *url.URL, opt StoreOptions) (*RemoteHTTP, error) { + b, err := NewRemoteHTTPStoreBase(location, opt) if err != nil { return nil, err } return &RemoteHTTP{b}, nil } -// GetChunk reads and returns one (compressed!) 
chunk from the store -func (r *RemoteHTTP) GetChunk(id ChunkID) ([]byte, error) { - sID := id.String() - p := filepath.Join(sID[0:4], sID) + chunkFileExt - return r.GetObject(p) +// GetChunk reads and returns one chunk from the store +func (r *RemoteHTTP) GetChunk(id ChunkID) (*Chunk, error) { + p := r.nameFromID(id) + b, err := r.GetObject(p) + if err != nil { + return nil, err + } + if r.opt.Uncompressed { + return NewChunkWithID(id, b, nil, r.opt.SkipVerify) + } + return NewChunkWithID(id, nil, b, r.opt.SkipVerify) } // HasChunk returns true if the chunk is in the store func (r *RemoteHTTP) HasChunk(id ChunkID) bool { - sID := id.String() - p := filepath.Join(sID[0:4], sID) + chunkFileExt - + p := r.nameFromID(id) u, _ := r.location.Parse(p) var ( resp *http.Response @@ -204,7 +200,7 @@ retry: attempt++ resp, err = r.client.Head(u.String()) if err != nil { - if attempt >= r.errorRetry { + if attempt >= r.opt.ErrorRetry { return false } goto retry @@ -220,8 +216,30 @@ retry: } // StoreChunk adds a new chunk to the store -func (r *RemoteHTTP) StoreChunk(id ChunkID, b []byte) error { - sID := id.String() - p := filepath.Join(sID[0:4], sID) + chunkFileExt +func (r *RemoteHTTP) StoreChunk(chunk *Chunk) error { + p := r.nameFromID(chunk.ID()) + var ( + b []byte + err error + ) + if r.opt.Uncompressed { + b, err = chunk.Uncompressed() + } else { + b, err = chunk.Compressed() + } + if err != nil { + return err + } return r.StoreObject(p, bytes.NewReader(b)) } + +func (r *RemoteHTTP) nameFromID(id ChunkID) string { + sID := id.String() + name := filepath.Join(sID[0:4], sID) + if r.opt.Uncompressed { + name += UncompressedChunkExt + } else { + name += CompressedChunkExt + } + return name +} diff --git a/remotehttpindex.go b/remotehttpindex.go index 9a036f5..7a8006e 100644 --- a/remotehttpindex.go +++ b/remotehttpindex.go @@ -14,8 +14,8 @@ type RemoteHTTPIndex struct { // NewRemoteHTTPIndexStore initializes a new store that pulls the specified index file via HTTP(S) from 
// a remote web server. -func NewRemoteHTTPIndexStore(location *url.URL, n int, cert string, key string) (*RemoteHTTPIndex, error) { - b, err := NewRemoteHTTPStoreBase(location, n, cert, key) +func NewRemoteHTTPIndexStore(location *url.URL, opt StoreOptions) (*RemoteHTTPIndex, error) { + b, err := NewRemoteHTTPStoreBase(location, opt) if err != nil { return nil, err } diff --git a/remotessh.go b/remotessh.go index f1fd4d0..b39a098 100644 --- a/remotessh.go +++ b/remotessh.go @@ -18,10 +18,10 @@ type RemoteSSH struct { } // NewRemoteSSHStore establishes up to n connections with a casync chunk server -func NewRemoteSSHStore(location *url.URL, n int) (*RemoteSSH, error) { - remote := RemoteSSH{location: location, pool: make(chan *Protocol, n), n: n} +func NewRemoteSSHStore(location *url.URL, opt StoreOptions) (*RemoteSSH, error) { + remote := RemoteSSH{location: location, pool: make(chan *Protocol, opt.N), n: opt.N} // Start n sessions and put them into the pool (buffered channel) - for i := 0; i < n; i++ { + for i := 0; i < remote.n; i++ { s, err := StartProtocol(location) if err != nil { return &remote, errors.Wrap(err, "failed to start chunk server command") @@ -34,11 +34,11 @@ func NewRemoteSSHStore(location *url.URL, n int) (*RemoteSSH, error) { // GetChunk requests a chunk from the server and returns a (compressed) one. // It uses any of the n sessions this store maintains in its pool. 
Blocks until // one session becomes available -func (r *RemoteSSH) GetChunk(id ChunkID) ([]byte, error) { +func (r *RemoteSSH) GetChunk(id ChunkID) (*Chunk, error) { client := <-r.pool - b, err := client.RequestChunk(id) + chunk, err := client.RequestChunk(id) r.pool <- client - return b, err + return chunk, err } // Close terminates all client connections diff --git a/s3.go b/s3.go index 2bf1226..2891ecd 100644 --- a/s3.go +++ b/s3.go @@ -19,6 +19,7 @@ type S3StoreBase struct { client *minio.Client bucket string prefix string + opt StoreOptions } // S3Store is a read-write store with S3 backing @@ -27,9 +28,9 @@ type S3Store struct { } // NewS3StoreBase initializes a base object used for chunk or index stores backed by S3. -func NewS3StoreBase(u *url.URL, s3Creds *credentials.Credentials, region string) (S3StoreBase, error) { +func NewS3StoreBase(u *url.URL, s3Creds *credentials.Credentials, region string, opt StoreOptions) (S3StoreBase, error) { var err error - s := S3StoreBase{Location: u.String()} + s := S3StoreBase{Location: u.String(), opt: opt} if !strings.HasPrefix(u.Scheme, "s3+http") { return s, fmt.Errorf("invalid scheme '%s', expected 's3+http' or 's3+https'", u.Scheme) } @@ -69,16 +70,16 @@ func (s S3StoreBase) Close() error { return nil } // should be provided like this: s3+http://host:port/bucket // Credentials are passed in via the environment variables S3_ACCESS_KEY // and S3S3_SECRET_KEY, or via the desync config file. -func NewS3Store(location *url.URL, s3Creds *credentials.Credentials, region string) (s S3Store, e error) { - b, err := NewS3StoreBase(location, s3Creds, region) +func NewS3Store(location *url.URL, s3Creds *credentials.Credentials, region string, opt StoreOptions) (s S3Store, e error) { + b, err := NewS3StoreBase(location, s3Creds, region, opt) if err != nil { return s, err } return S3Store{b}, nil } -// GetChunk reads and returns one (compressed!) 
chunk from the store -func (s S3Store) GetChunk(id ChunkID) ([]byte, error) { +// GetChunk reads and returns one chunk from the store +func (s S3Store) GetChunk(id ChunkID) (*Chunk, error) { name := s.nameFromID(id) obj, err := s.client.GetObject(s.bucket, name, minio.GetObjectOptions{}) if err != nil { @@ -97,14 +98,32 @@ func (s S3Store) GetChunk(id ChunkID) ([]byte, error) { err = errors.Wrap(err, fmt.Sprintf("chunk %s could not be retrieved from s3 store", id)) } } - return b, err + if err != nil { + return nil, err + } + if s.opt.Uncompressed { + return NewChunkWithID(id, b, nil, s.opt.SkipVerify) + } + return NewChunkWithID(id, nil, b, s.opt.SkipVerify) } // StoreChunk adds a new chunk to the store -func (s S3Store) StoreChunk(id ChunkID, b []byte) error { +func (s S3Store) StoreChunk(chunk *Chunk) error { contentType := "application/zstd" - name := s.nameFromID(id) - _, err := s.client.PutObject(s.bucket, name, bytes.NewReader(b), int64(len(b)), minio.PutObjectOptions{ContentType: contentType}) + name := s.nameFromID(chunk.ID()) + var ( + b []byte + err error + ) + if s.opt.Uncompressed { + b, err = chunk.Uncompressed() + } else { + b, err = chunk.Compressed() + } + if err != nil { + return err + } + _, err = s.client.PutObject(s.bucket, name, bytes.NewReader(b), int64(len(b)), minio.PutObjectOptions{ContentType: contentType}) return errors.Wrap(err, s.String()) } @@ -153,66 +172,30 @@ func (s S3Store) Prune(ctx context.Context, ids map[ChunkID]struct{}) error { return nil } -// Upgrade converts the storage layout in S3 from the old format (just a flat -// layout) to the current layout which prefixes every chunk with the first 4 -// characters of the checksum as well as a .cacnk extension. This aligns the -// layout with that of local stores and allows the used of sync tools outside -// of this tool, local stores could be copied into S3 for example. 
-func (s S3Store) Upgrade(ctx context.Context) error { - doneCh := make(chan struct{}) - defer close(doneCh) - objectCh := s.client.ListObjectsV2(s.bucket, s.prefix, false, doneCh) - for object := range objectCh { - if object.Err != nil { - return object.Err - } - // See if we're meant to stop - select { - case <-ctx.Done(): - return Interrupted{} - default: - } - - // Skip if this one's already in the new format - if strings.HasSuffix(object.Key, chunkFileExt) { - continue - } - - // Skip if we can't parse this checksum, must be an unrelated file - id, err := ChunkIDFromString(strings.TrimPrefix(object.Key, s.prefix)) - if err != nil { - continue - } - - // Copy the chunk with the new name - newName := s.nameFromID(id) - src := minio.NewSourceInfo(s.bucket, object.Key, nil) - dst, err := minio.NewDestinationInfo(s.bucket, newName, nil, nil) - if err != nil { - return err - } - if err = s.client.CopyObject(dst, src); err != nil { - return err - } - - // Once copied, drop the old chunk - if err = s.client.RemoveObject(s.bucket, object.Key); err != nil { - return err - } - } - return nil -} - func (s S3Store) nameFromID(id ChunkID) string { sID := id.String() - return s.prefix + sID[0:4] + "/" + sID + chunkFileExt + name := s.prefix + sID[0:4] + "/" + sID + if s.opt.Uncompressed { + name += UncompressedChunkExt + } else { + name += CompressedChunkExt + } + return name } func (s S3Store) idFromName(name string) (ChunkID, error) { - if !strings.HasSuffix(name, chunkFileExt) { - return ChunkID{}, fmt.Errorf("object %s is not a chunk", name) + var n string + if s.opt.Uncompressed { + if !strings.HasSuffix(name, UncompressedChunkExt) { + return ChunkID{}, fmt.Errorf("object %s is not a chunk", name) + } + n = strings.TrimSuffix(strings.TrimPrefix(name, s.prefix), UncompressedChunkExt) + } else { + if !strings.HasSuffix(name, CompressedChunkExt) { + return ChunkID{}, fmt.Errorf("object %s is not a chunk", name) + } + n = strings.TrimSuffix(strings.TrimPrefix(name, 
s.prefix), CompressedChunkExt) } - n := strings.TrimSuffix(strings.TrimPrefix(name, s.prefix), chunkFileExt) fragments := strings.Split(n, "/") if len(fragments) != 2 { return ChunkID{}, fmt.Errorf("incorrect chunk name for object %s", name) diff --git a/s3index.go b/s3index.go index f1b5981..d55e039 100644 --- a/s3index.go +++ b/s3index.go @@ -21,8 +21,8 @@ type S3IndexStore struct { // should be provided like this: s3+http://host:port/bucket // Credentials are passed in via the environment variables S3_ACCESS_KEY // and S3S3_SECRET_KEY, or via the desync config file. -func NewS3IndexStore(location *url.URL, s3Creds *credentials.Credentials, region string) (s S3IndexStore, e error) { - b, err := NewS3StoreBase(location, s3Creds, region) +func NewS3IndexStore(location *url.URL, s3Creds *credentials.Credentials, region string, opt StoreOptions) (s S3IndexStore, e error) { + b, err := NewS3StoreBase(location, s3Creds, region, opt) if err != nil { return s, err } diff --git a/selfseed_test.go b/selfseed_test.go index 6965bad..d644c4c 100644 --- a/selfseed_test.go +++ b/selfseed_test.go @@ -4,7 +4,6 @@ import ( "context" "crypto/md5" "crypto/rand" - "crypto/sha512" "io/ioutil" "os" "testing" @@ -18,7 +17,7 @@ func TestSelfSeed(t *testing.T) { } defer os.RemoveAll(store) - s, err := NewLocalStore(store) + s, err := NewLocalStore(store, StoreOptions{}) if err != nil { t.Fatal(err) } @@ -35,15 +34,11 @@ func TestSelfSeed(t *testing.T) { for i := 0; i < numChunks; i++ { b := make([]byte, size) rand.Read(b) - id := sha512.Sum512_256(b) - chunks[i] = rawChunk{id, b} - b, err := Compress(b) - if err != nil { - t.Fatal(err) - } - if err = s.StoreChunk(id, b); err != nil { + chunk := NewChunkFromUncompressed(b) + if err = s.StoreChunk(chunk); err != nil { t.Fatal(err) } + chunks[i] = rawChunk{chunk.ID(), b} } // Define tests with files with different content, by building files out diff --git a/sftp.go b/sftp.go index 1ecee1e..abe758a 100644 --- a/sftp.go +++ b/sftp.go @@ -25,6 
+25,7 @@ type SFTPStoreBase struct { path string client *sftp.Client cancel context.CancelFunc + opt StoreOptions } // SFTPStore is a chunk store that uses SFTP over SSH. @@ -33,7 +34,7 @@ type SFTPStore struct { } // Creates a base sftp client -func newSFTPStoreBase(location *url.URL) (*SFTPStoreBase, error) { +func newSFTPStoreBase(location *url.URL, opt StoreOptions) (*SFTPStoreBase, error) { sshCmd := os.Getenv("CASYNC_SSH_PATH") if sshCmd == "" { sshCmd = "ssh" @@ -69,7 +70,7 @@ func newSFTPStoreBase(location *url.URL) (*SFTPStoreBase, error) { cancel() return nil, err } - return &SFTPStoreBase{location, path, client, cancel}, nil + return &SFTPStoreBase{location, path, client, cancel, opt}, nil } // StoreObject adds a new object to a writable index or chunk store. @@ -117,8 +118,8 @@ func (s *SFTPStoreBase) String() string { } // NewSFTPStore initializes a chunk store using SFTP over SSH. -func NewSFTPStore(location *url.URL) (*SFTPStore, error) { - b, err := newSFTPStoreBase(location) +func NewSFTPStore(location *url.URL, opt StoreOptions) (*SFTPStore, error) { + b, err := newSFTPStoreBase(location, opt) if err != nil { return nil, err } @@ -126,7 +127,7 @@ func NewSFTPStore(location *url.URL) (*SFTPStore, error) { } // GetChunk returns a chunk from an SFTP store, returns ChunkMissing if the file does not exist -func (s *SFTPStore) GetChunk(id ChunkID) ([]byte, error) { +func (s *SFTPStore) GetChunk(id ChunkID) (*Chunk, error) { name := s.nameFromID(id) f, err := s.client.Open(name) if err != nil { @@ -136,7 +137,14 @@ func (s *SFTPStore) GetChunk(id ChunkID) ([]byte, error) { return nil, err } defer f.Close() - return ioutil.ReadAll(f) + b, err := ioutil.ReadAll(f) + if err != nil { + return nil, errors.Wrapf(err, "unable to read from %s", name) + } + if s.opt.Uncompressed { + return NewChunkWithID(id, b, nil, s.opt.SkipVerify) + } + return NewChunkWithID(id, nil, b, s.opt.SkipVerify) } // RemoveChunk deletes a chunk, typically an invalid one, from the 
filesystem. @@ -150,8 +158,21 @@ func (s *SFTPStore) RemoveChunk(id ChunkID) error { } // StoreChunk adds a new chunk to the store -func (s *SFTPStore) StoreChunk(id ChunkID, b []byte) error { - return s.StoreObject(s.nameFromID(id), bytes.NewReader(b)) +func (s *SFTPStore) StoreChunk(chunk *Chunk) error { + name := s.nameFromID(chunk.ID()) + var ( + b []byte + err error + ) + if s.opt.Uncompressed { + b, err = chunk.Uncompressed() + } else { + b, err = chunk.Compressed() + } + if err != nil { + return err + } + return s.StoreObject(name, bytes.NewReader(b)) } // HasChunk returns true if the chunk is in the store @@ -181,12 +202,25 @@ func (s *SFTPStore) Prune(ctx context.Context, ids map[ChunkID]struct{}) error { continue } path := walker.Path() - if !strings.HasSuffix(path, chunkFileExt) { // Skip files without chunk extension + if !s.opt.Uncompressed && !strings.HasSuffix(path, CompressedChunkExt) { // Skip files without chunk extension continue } + // Skip compressed chunks if this is running in uncompressed mode and vice-versa + var sID string + if s.opt.Uncompressed { + if !strings.HasSuffix(path, UncompressedChunkExt) { + continue + } + sID = strings.TrimSuffix(filepath.Base(path), UncompressedChunkExt) + } else { + if !strings.HasSuffix(path, CompressedChunkExt) { + continue + } + sID = strings.TrimSuffix(filepath.Base(path), CompressedChunkExt) + } // Convert the name into a checksum, if that fails we're probably not looking // at a chunk file and should skip it. 
- id, err := ChunkIDFromString(strings.TrimSuffix(filepath.Base(path), ".cacnk")) + id, err := ChunkIDFromString(sID) if err != nil { continue } @@ -203,5 +237,11 @@ func (s *SFTPStore) Prune(ctx context.Context, ids map[ChunkID]struct{}) error { func (s *SFTPStore) nameFromID(id ChunkID) string { sID := id.String() - return s.path + sID[0:4] + "/" + sID + chunkFileExt + name := s.path + sID[0:4] + "/" + sID + if s.opt.Uncompressed { + name += UncompressedChunkExt + } else { + name += CompressedChunkExt + } + return name } diff --git a/sftpindex.go b/sftpindex.go index f7d3212..d2c486b 100644 --- a/sftpindex.go +++ b/sftpindex.go @@ -15,8 +15,8 @@ type SFTPIndexStore struct { } // NewSFTPIndexStore initializes and index store backed by SFTP over SSH. -func NewSFTPIndexStore(location *url.URL) (*SFTPIndexStore, error) { - b, err := newSFTPStoreBase(location) +func NewSFTPIndexStore(location *url.URL, opt StoreOptions) (*SFTPIndexStore, error) { + b, err := newSFTPStoreBase(location, opt) if err != nil { return nil, err } diff --git a/store.go b/store.go index 6b96d15..a4f17e8 100644 --- a/store.go +++ b/store.go @@ -4,12 +4,13 @@ import ( "context" "fmt" "io" + "time" ) // Store is a generic interface implemented by read-only stores, like SSH or // HTTP remote stores currently. type Store interface { - GetChunk(id ChunkID) ([]byte, error) + GetChunk(id ChunkID) (*Chunk, error) HasChunk(id ChunkID) bool io.Closer fmt.Stringer @@ -19,7 +20,7 @@ type Store interface { // such as a local store or an S3 store. type WriteStore interface { Store - StoreChunk(id ChunkID, b []byte) error + StoreChunk(c *Chunk) error } // PruneStore is a store that supports pruning of chunks @@ -41,3 +42,34 @@ type IndexWriteStore interface { IndexStore StoreIndex(name string, idx Index) error } + +// StoreOptions provide additional common settings used in chunk stores, such as compression +// error retry or timeouts. Not all options available are applicable to all types of stores. 
+type StoreOptions struct { + // Concurrency used in the store. Depending on store type, it's used for + // the number of goroutines, processes, or connection pool size. + N int `json:"n,omitempty"` + + // Cert file name for HTTP SSL connections that require mutual SSL. + ClientCert string `json:"client-cert,omitempty"` + // Key file name for HTTP SSL connections that require mutual SSL. + ClientKey string `json:"client-key,omitempty"` + + // Timeout for waiting for objects to be retrieved. Default: 1 minute + Timeout time.Duration `json:"timeout,omitempty"` + + // Number of times object retrieval should be attempted on error. Useful when dealing + // with unreliable connections. Default: 0 + ErrorRetry int `json:"error-retry,omitempty"` + + // If SkipVerify is true, this store will not verify the data it reads and serves up. This is + // helpful when a store is merely a proxy and the data will pass through additional stores + // before being used. Verifying the checksum of a chunk requires it be uncompressed, so if + // a compressed chunkstore is being proxied, all chunks would have to be decompressed first. + // This setting avoids the extra overhead. While this could be used in other cases, it's not + // recommended as a damaged chunk might be processed further leading to unpredictable results. 
+ SkipVerify bool `json:"skip-verify,omitempty"` + + // Store and read chunks uncompressed, without chunk file extension + Uncompressed bool `json:"uncompressed"` +} diff --git a/store_test.go b/store_test.go index 3a7a442..c89ff0a 100644 --- a/store_test.go +++ b/store_test.go @@ -2,12 +2,12 @@ package desync type TestStore map[ChunkID][]byte -func (s TestStore) GetChunk(id ChunkID) ([]byte, error) { +func (s TestStore) GetChunk(id ChunkID) (*Chunk, error) { b, ok := s[id] if !ok { return nil, ChunkMissing{id} } - return b, nil + return &Chunk{compressed: b}, nil } func (s TestStore) HasChunk(id ChunkID) bool { diff --git a/storerouter.go b/storerouter.go index a17336d..50cbf5a 100644 --- a/storerouter.go +++ b/storerouter.go @@ -24,12 +24,12 @@ func NewStoreRouter(stores ...Store) StoreRouter { // GetChunk queries the available stores in order and moves to the next if // it gets a ChunkMissing. Fails if any store returns a different error. -func (r StoreRouter) GetChunk(id ChunkID) ([]byte, error) { +func (r StoreRouter) GetChunk(id ChunkID) (*Chunk, error) { for _, s := range r.Stores { - b, err := s.GetChunk(id) + chunk, err := s.GetChunk(id) switch err.(type) { case nil: - return b, nil + return chunk, nil case ChunkMissing: continue default: diff --git a/untar.go b/untar.go index 960b51f..b44f78c 100644 --- a/untar.go +++ b/untar.go @@ -5,7 +5,6 @@ package desync import ( "bytes" "context" - "crypto/sha512" "fmt" "io" "os" @@ -201,37 +200,26 @@ func UnTarIndex(ctx context.Context, dst string, index Index, s Store, n int, op wg.Add(1) go func() { for r := range req { - // Pull the (compressed) chunk from the store - b, err := s.GetChunk(r.chunk.ID) + // Pull the chunk from the store + chunk, err := s.GetChunk(r.chunk.ID) if err != nil { recordError(err) close(r.data) continue } - // Since we know how big the chunk is supposed to be, pre-allocate a - // slice to decompress into - db := make([]byte, r.chunk.Size) - // The the chunk is compressed. 
Decompress it here - db, err = Decompress(db, b) + b, err := chunk.Uncompressed() if err != nil { - recordError(errors.Wrap(err, r.chunk.ID.String())) - close(r.data) - continue - } - // Verify the checksum of the chunk matches the ID - sum := sha512.Sum512_256(db) - if sum != r.chunk.ID { - recordError(fmt.Errorf("unexpected sha512/256 %s for chunk id %s", sum, r.chunk.ID)) + recordError(err) close(r.data) continue } // Might as well verify the chunk size while we're at it - if r.chunk.Size != uint64(len(db)) { + if r.chunk.Size != uint64(len(b)) { recordError(fmt.Errorf("unexpected size for chunk %s", r.chunk.ID)) close(r.data) continue } - r.data <- db + r.data <- b close(r.data) } wg.Done()