Skip to content

Commit

Permalink
Refactoring to support storage modifiers (#181)
Browse files Browse the repository at this point in the history
* Chunk encryption support

* Update dependencies

* Update go version in GH workflow
  • Loading branch information
folbricht authored Dec 27, 2020
1 parent 00695e8 commit e528127
Show file tree
Hide file tree
Showing 31 changed files with 645 additions and 440 deletions.
2 changes: 1 addition & 1 deletion .github/workflows/validate.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@ jobs:
- uses: actions/checkout@v1
- uses: actions/setup-go@v2
with:
go-version: '^1.14.1'
go-version: '^1.15.6'

- uses: actions/cache@v1
with:
Expand Down
2 changes: 1 addition & 1 deletion assemble.go
Original file line number Diff line number Diff line change
Expand Up @@ -143,7 +143,7 @@ func AssembleFile(ctx context.Context, name string, idx Index, s Store, seeds []
if err != nil {
return err
}
b, err := chunk.Uncompressed()
b, err := chunk.Data()
if err != nil {
return err
}
Expand Down
2 changes: 1 addition & 1 deletion chop.go
Original file line number Diff line number Diff line change
Expand Up @@ -81,5 +81,5 @@ func readChunkFromFile(f *os.File, c IndexChunk) (*Chunk, error) {
if _, err = io.ReadFull(f, b); err != nil {
return nil, err
}
return NewChunkWithID(c.ID, b, nil, false)
return NewChunkWithID(c.ID, b, false)
}
69 changes: 36 additions & 33 deletions chunk.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,28 +4,31 @@ import (
"errors"
)

// Chunk holds chunk data compressed, uncompressed, or both. If a chunk is created
// from compressed data, such as read from a compressed chunk store, and later the
// application requires the uncompressed data, it'll be decompressed on demand and
// also stored in Chunk. The same happens when the Chunk is made from uncompressed
// bytes and then to be stored in a compressed form.
// Chunk holds chunk data plain, storage format, or both. If a chunk is created
// from storage data, such as read from a compressed chunk store, and later the
// application requires the plain data, it'll be converted on demand by applying
// the given storage converters in reverse order. The converters can only be used
// to read the plain data, not to convert back to storage format.
type Chunk struct {
compressed, uncompressed []byte
id ChunkID
idCalculated bool
data []byte // Plain data if available
storage []byte // Storage format (compressed, encrypted, etc)
converters Converters // Modifiers to convert from storage format to plain
id ChunkID
idCalculated bool
}

// NewChunkFromUncompressed creates a new chunk from uncompressed data.
func NewChunkFromUncompressed(b []byte) *Chunk {
return &Chunk{uncompressed: b}
// NewChunk creates a new chunk from plain data. The data is trusted and the ID is
// calculated on demand.
func NewChunk(b []byte) *Chunk {
return &Chunk{data: b}
}

// NewChunkWithID creates a new chunk from either compressed or uncompressed data
// (or both if available). It also expects an ID and validates that it matches
// the uncompressed data unless skipVerify is true. If called with just compressed
// data, it'll decompress it for the ID validation.
func NewChunkWithID(id ChunkID, uncompressed, compressed []byte, skipVerify bool) (*Chunk, error) {
c := &Chunk{id: id, uncompressed: uncompressed, compressed: compressed}
func NewChunkWithID(id ChunkID, b []byte, skipVerify bool) (*Chunk, error) {
c := &Chunk{id: id, data: b}
if skipVerify {
c.idCalculated = true // Pretend this was calculated. No need to re-calc later
return c, nil
Expand All @@ -37,32 +40,33 @@ func NewChunkWithID(id ChunkID, uncompressed, compressed []byte, skipVerify bool
return c, nil
}

// Compressed returns the chunk data in compressed form. If the chunk was created
// with uncompressed data only, it'll be compressed, stored and returned. The
// caller must not modify the data in the returned slice.
func (c *Chunk) Compressed() ([]byte, error) {
if len(c.compressed) > 0 {
return c.compressed, nil
// NewChunkFromStorage builds a new chunk from data that is not in plain format.
// It uses the raw storage format from its source and the modifiers are used to convert
// into plain data as needed.
func NewChunkFromStorage(id ChunkID, b []byte, modifiers Converters, skipVerify bool) (*Chunk, error) {
c := &Chunk{id: id, storage: b, converters: modifiers}
if skipVerify {
c.idCalculated = true // Pretend this was calculated. No need to re-calc later
return c, nil
}
if len(c.uncompressed) > 0 {
var err error
c.compressed, err = Compress(c.uncompressed)
return c.compressed, err
sum := c.ID()
if sum != id {
return nil, ChunkInvalid{ID: id, Sum: sum}
}
return nil, errors.New("no data in chunk")
return c, nil
}

// Uncompressed returns the chunk data in uncompressed form. If the chunk was created
// Data returns the chunk data in uncompressed form. If the chunk was created
// with compressed data only, it'll be decompressed, stored and returned. The
// caller must not modify the data in the returned slice.
func (c *Chunk) Uncompressed() ([]byte, error) {
if len(c.uncompressed) > 0 {
return c.uncompressed, nil
func (c *Chunk) Data() ([]byte, error) {
if len(c.data) > 0 {
return c.data, nil
}
if len(c.compressed) > 0 {
if len(c.storage) > 0 {
var err error
c.uncompressed, err = Decompress(nil, c.compressed)
return c.uncompressed, err
c.data, err = c.converters.fromStorage(c.storage)
return c.data, err
}
return nil, errors.New("no data in chunk")
}
Expand All @@ -71,11 +75,10 @@ func (c *Chunk) Uncompressed() ([]byte, error) {
// after the first call and doesn't need to be re-calculated. Note that calculating
// the ID may mean decompressing the data first.
func (c *Chunk) ID() ChunkID {

if c.idCalculated {
return c.id
}
b, err := c.Uncompressed()
b, err := c.Data()
if err != nil {
return ChunkID{}
}
Expand Down
7 changes: 6 additions & 1 deletion cmd/desync/chunkserver.go
Original file line number Diff line number Diff line change
Expand Up @@ -127,7 +127,12 @@ func runChunkServer(ctx context.Context, opt chunkServerOptions, args []string)
}
defer s.Close()

handler := desync.NewHTTPHandler(s, opt.writable, opt.skipVerifyWrite, opt.uncompressed, opt.auth)
var converters desync.Converters
if !opt.uncompressed {
converters = desync.Converters{desync.Compressor{}}
}

handler := desync.NewHTTPHandler(s, opt.writable, opt.skipVerifyWrite, converters, opt.auth)

// Wrap the handler in a logger if requested
switch opt.logFile {
Expand Down
37 changes: 12 additions & 25 deletions cmd/desync/chunkserver_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,6 @@ import (
"io/ioutil"
"net"
"net/http"
"os"
"path/filepath"
"strings"
"testing"
Expand All @@ -16,22 +15,19 @@ import (
)

func TestChunkServerReadCommand(t *testing.T) {
outdir := t.TempDir()

// Start a read-only server
addr, cancel := startChunkServer(t, "-s", "testdata/blob1.store")
defer cancel()
store := fmt.Sprintf("http://%s/", addr)

outdir, err := ioutil.TempDir("", "")
require.NoError(t, err)
defer os.RemoveAll(outdir)
blob := filepath.Join(outdir, "blob")

// Run an "extract" command to confirm the chunk server provides chunks
extractCmd := newExtractCommand(context.Background())
extractCmd.SetArgs([]string{"-s", store, "testdata/blob1.caibx", blob})
extractCmd.SetArgs([]string{"-s", store, "testdata/blob1.caibx", filepath.Join(outdir, "blob")})
stdout = ioutil.Discard
extractCmd.SetOutput(ioutil.Discard)
_, err = extractCmd.ExecuteC()
_, err := extractCmd.ExecuteC()
require.NoError(t, err)

// The server should not be serving up arbitrary files from disk. Expect a 400 error
Expand All @@ -57,10 +53,7 @@ func TestChunkServerReadCommand(t *testing.T) {
}

func TestChunkServerWriteCommand(t *testing.T) {
// Create a blank store
outdir, err := ioutil.TempDir("", "")
require.NoError(t, err)
defer os.RemoveAll(outdir)
outdir := t.TempDir()

// Start a (writable) server
addr, cancel := startChunkServer(t, "-s", outdir, "-w")
Expand All @@ -71,7 +64,7 @@ func TestChunkServerWriteCommand(t *testing.T) {
chopCmd := newChopCommand(context.Background())
chopCmd.SetArgs([]string{"-s", store, "testdata/blob1.caibx", "testdata/blob1"})
chopCmd.SetOutput(ioutil.Discard)
_, err = chopCmd.ExecuteC()
_, err := chopCmd.ExecuteC()
require.NoError(t, err)

// The server should not accept arbitrary (non-chunk) files.
Expand All @@ -82,9 +75,7 @@ func TestChunkServerWriteCommand(t *testing.T) {
require.Equal(t, http.StatusBadRequest, resp.StatusCode)
}
func TestChunkServerVerifiedTLS(t *testing.T) {
outdir, err := ioutil.TempDir("", "")
require.NoError(t, err)
defer os.RemoveAll(outdir)
outdir := t.TempDir()

// Start a (writable) server
addr, cancel := startChunkServer(t, "-s", "testdata/blob1.store", "--key", "testdata/server.key", "--cert", "testdata/server.crt")
Expand All @@ -96,14 +87,12 @@ func TestChunkServerVerifiedTLS(t *testing.T) {
extractCmd := newExtractCommand(context.Background())
extractCmd.SetArgs([]string{"--ca-cert", "testdata/ca.crt", "-s", store, "testdata/blob1.caibx", filepath.Join(outdir, "blob1")})
extractCmd.SetOutput(ioutil.Discard)
_, err = extractCmd.ExecuteC()
_, err := extractCmd.ExecuteC()
require.NoError(t, err)
}

func TestChunkServerInsecureTLS(t *testing.T) {
outdir, err := ioutil.TempDir("", "")
require.NoError(t, err)
defer os.RemoveAll(outdir)
outdir := t.TempDir()

stderr = ioutil.Discard
stdout = ioutil.Discard
Expand All @@ -118,7 +107,7 @@ func TestChunkServerInsecureTLS(t *testing.T) {
extractCmd := newExtractCommand(context.Background())
extractCmd.SetArgs([]string{"-t", "-s", store, "testdata/blob1.caibx", filepath.Join(outdir, "blob1")})
// extractCmd.SetOutput(ioutil.Discard)
_, err = extractCmd.ExecuteC()
_, err := extractCmd.ExecuteC()
require.NoError(t, err)

// Run the "extract" command without accepting any cert. Should fail.
Expand All @@ -132,9 +121,7 @@ func TestChunkServerInsecureTLS(t *testing.T) {
}

func TestChunkServerMutualTLS(t *testing.T) {
outdir, err := ioutil.TempDir("", "")
require.NoError(t, err)
defer os.RemoveAll(outdir)
outdir := t.TempDir()

stderr = ioutil.Discard
stdout = ioutil.Discard
Expand All @@ -158,7 +145,7 @@ func TestChunkServerMutualTLS(t *testing.T) {
"--client-cert", "testdata/client.crt",
"--ca-cert", "testdata/ca.crt",
"-s", store, "testdata/blob1.caibx", filepath.Join(outdir, "blob1")})
_, err = extractCmd.ExecuteC()
_, err := extractCmd.ExecuteC()
require.NoError(t, err)

// Same without client certs, should fail.
Expand Down
97 changes: 97 additions & 0 deletions coverter.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,97 @@
package desync

// Converters are modifiers for chunk data, such as compression or encryption.
// They are used to prepare chunk data for storage, or to read it from storage.
// The order of the conversion layers matters. When plain data is prepared for
// storage, the toStorage method is used in the order the layers are defined.
// To read from storage, the fromStorage method is called for each layer in
// reverse order. An empty (nil) Converters leaves data unchanged in both
// directions.
type Converters []converter

// toStorage applies every conversion layer in the forward (declared) order,
// turning plain data into its storage representation. It returns the first
// error encountered by any layer.
func (s Converters) toStorage(in []byte) ([]byte, error) {
	b := in
	for _, layer := range s {
		var err error
		if b, err = layer.toStorage(b); err != nil {
			return nil, err
		}
	}
	return b, nil
}

// fromStorage applies the conversion layers in reverse order, turning data
// in storage format back into its plain form. It returns the first error
// encountered by any layer.
func (s Converters) fromStorage(in []byte) ([]byte, error) {
	b := in
	for i := len(s); i > 0; i-- {
		out, err := s[i-1].fromStorage(b)
		if err != nil {
			return nil, err
		}
		b = out
	}
	return b, nil
}

// hasCompression reports whether any of the conversion layers performs
// compression. Typically used to determine the correct file extension.
func (s Converters) hasCompression() bool {
	for _, layer := range s {
		switch layer.(type) {
		case Compressor:
			return true
		}
	}
	return false
}

// equal reports whether both converter chains contain the same layers in the
// same order. Used for optimizations, e.g. to skip redundant conversions.
func (s Converters) equal(c Converters) bool {
	if len(s) != len(c) {
		return false
	}
	for i := range s {
		if !s[i].equal(c[i]) {
			return false
		}
	}
	return true
}

// converter is a storage data modifier layer.
type converter interface {
	// Convert data from its original form to storage format.
	// The input could be plain data, or the output of a prior
	// converter.
	toStorage([]byte) ([]byte, error)

	// Convert data from its storage format towards its plain
	// form. The input could be encrypted or compressed, while
	// the output may be used for the next conversion layer.
	fromStorage([]byte) ([]byte, error)

	// equal reports whether the given converter is of the same
	// kind (and configuration) as this one.
	equal(converter) bool
}

// Compressor is a conversion layer that compresses chunk data for storage
// and decompresses it when reading back.
type Compressor struct{}

// Compile-time check that Compressor satisfies the converter interface.
var _ converter = Compressor{}

// toStorage compresses plain data into storage format.
func (d Compressor) toStorage(in []byte) ([]byte, error) {
	return Compress(in)
}

// fromStorage decompresses stored data back into plain form.
func (d Compressor) fromStorage(in []byte) ([]byte, error) {
	return Decompress(nil, in)
}

// equal reports whether c is also a Compressor.
func (d Compressor) equal(c converter) bool {
	switch c.(type) {
	case Compressor:
		return true
	default:
		return false
	}
}
2 changes: 1 addition & 1 deletion dedupqueue_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -61,7 +61,7 @@ func TestDedupQueueParallel(t *testing.T) {
GetChunkFunc: func(ChunkID) (*Chunk, error) {
time.Sleep(time.Millisecond) // make it artificially slow to not complete too early
atomic.AddInt64(&requests, 1)
return NewChunkFromUncompressed([]byte{0}), nil
return NewChunk([]byte{0}), nil
},
}
q := NewDedupQueue(store)
Expand Down
Loading

0 comments on commit e528127

Please sign in to comment.