Skip to content

Commit

Permalink
Sparse mount support (#180)
Browse files Browse the repository at this point in the history
* Support copy-on-read file for better caching of mounted indexes

* Add missing files

* Add null-chunk optimization

* Store sparse file state next to the sparse file and load it on startup

* Don't clear existing sparse file if they're present already

* bitmap and SIGHUP handler for writing state (#173)

* bitmap and SIGHUP handler for writing state

* nicer way to write state file

* file permission

Co-authored-by: jbeguin <[email protected]>

* Contrib (#174)

* bitmap and SIGHUP handler for writing state

* nicer way to write state file

* file permission

* fix size comparison for state file

Co-authored-by: jbeguin <[email protected]>
Co-authored-by: Frank Olbricht <[email protected]>

* Support pre-loading of chunks referenced in an init-state file

* Fix test and windows build

Co-authored-by: jbeguin <[email protected]>
Co-authored-by: jbeguin <[email protected]>
  • Loading branch information
3 people authored Dec 26, 2020
1 parent 73dcbb2 commit 00695e8
Show file tree
Hide file tree
Showing 9 changed files with 608 additions and 8 deletions.
48 changes: 45 additions & 3 deletions cmd/desync/mount-index.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,8 +5,11 @@ package main
import (
"context"
"fmt"
"os"
"os/signal"
"path/filepath"
"strings"
"syscall"

"github.com/folbricht/desync"
"github.com/pkg/errors"
Expand All @@ -18,6 +21,8 @@ type mountIndexOptions struct {
stores []string
cache string
storeFile string
corFile string
desync.SparseFileOptions
}

func newMountIndexCommand(ctx context.Context) *cobra.Command {
Expand All @@ -31,12 +36,21 @@ the index available for read access. Use 'extract' if the goal is to
assemble the whole blob locally as that is more efficient. Use '-' to read
the index from STDIN.
When a Copy-on-Read file is given (with -x), the file is used as a fast cache. All chunks
that are retrieved from the store are written into the file as read operations are
performed. Once all chunks have been accessed once, the COR file is fully populated. On
termination, a <name>.state file is written containing information about which chunks
of the index have or have not been read. A state file is only valid for one cache-file and
one index. When re-using it with a different index, data corruption can occur.
This command supports the --store-file option which can be used to define the stores
and caches in a JSON file. The config can then be reloaded by sending a SIGHUP without
needing to restart the server. This can be done under load as well.
`,
Example: ` desync mount-index -s http://192.168.1.1/ file.caibx /mnt/blob`,
Args: cobra.ExactArgs(2),
Example: ` desync mount-index -s http://192.168.1.1/ file.caibx /mnt/blob
desync mount-index -s /path/to/store -x /var/tmp/blob.cor blob.caibx /mnt/blob
`,
Args: cobra.ExactArgs(2),
RunE: func(cmd *cobra.Command, args []string) error {
return runMountIndex(ctx, opt, args)
},
Expand All @@ -46,6 +60,10 @@ needing to restart the server. This can be done under load as well.
flags.StringSliceVarP(&opt.stores, "store", "s", nil, "source store(s)")
flags.StringVarP(&opt.cache, "cache", "c", "", "store to be used as cache")
flags.StringVar(&opt.storeFile, "store-file", "", "read store arguments from a file, supports reload on SIGHUP")
flags.StringVarP(&opt.corFile, "cor-file", "", "", "use a copy-on-read sparse file as cache")
flags.StringVarP(&opt.StateSaveFile, "cor-state-save", "", "", "file to store the state for copy-on-read")
flags.StringVarP(&opt.StateInitFile, "cor-state-init", "", "", "copy-on-read state init file")
flags.IntVarP(&opt.StateInitConcurrency, "cor-init-n", "", 10, "number of gorooutines to use for initialization (with --cor-state-init)")
addStoreOptions(&opt.cmdStoreOptions, flags)
return cmd
}
Expand Down Expand Up @@ -95,8 +113,32 @@ func runMountIndex(ctx context.Context, opt mountIndexOptions, args []string) er
return err
}

// Pick a filesystem based on the options
var ifs desync.MountFS
if opt.corFile != "" {
fs, err := desync.NewSparseMountFS(idx, mountFName, s, opt.corFile, opt.SparseFileOptions)
if err != nil {
return err
}

// Save state file on SIGHUP
sighup := make(chan os.Signal)
signal.Notify(sighup, syscall.SIGHUP)
go func() {
for range sighup {
if err := fs.WriteState(); err != nil {
fmt.Fprintln(os.Stderr, "failed to save state:", err)
}
}
}()

ifs = fs
} else {
ifs = desync.NewIndexMountFS(idx, mountFName, s)
}

// Mount it
return desync.MountIndex(ctx, idx, mountPoint, mountFName, s, opt.n)
return desync.MountIndex(ctx, idx, ifs, mountPoint, s, opt.n)
}

// Reads the store-related command line options and returns the appropriate store.
Expand Down
2 changes: 1 addition & 1 deletion cmd/desync/options.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@ import (
"github.com/spf13/pflag"
)

// cmdStoreOptions are used to pass additional options to store initalization from the
// cmdStoreOptions are used to pass additional options to store initialization from the
// commandline. These generally override settings from the config file.
type cmdStoreOptions struct {
n int
Expand Down
1 change: 1 addition & 0 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ go 1.13

require (
cloud.google.com/go/storage v1.6.0
github.com/boljen/go-bitmap v0.0.0-20151001105940-23cd2fb0ce7d
github.com/datadog/zstd v1.4.5
github.com/dchest/siphash v1.2.1
github.com/fatih/color v1.7.0 // indirect
Expand Down
2 changes: 2 additions & 0 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,8 @@ github.com/alecthomas/units v0.0.0-20151022065526-2efee857e7cf/go.mod h1:ybxpYRF
github.com/armon/consul-api v0.0.0-20180202201655-eb2c6b5be1b6/go.mod h1:grANhF5doyWs3UAsr3K4I6qtAmlQcZDesFNEHPZAzj8=
github.com/beorn7/perks v0.0.0-20180321164747-3a771d992973/go.mod h1:Dwedo/Wpr24TaqPxmxbtue+5NUziq4I4S80YR8gNf3Q=
github.com/beorn7/perks v1.0.0/go.mod h1:KWe93zE9D1o94FZ5RNwFwVgaQK1VOXiVxmqh+CedLV8=
github.com/boljen/go-bitmap v0.0.0-20151001105940-23cd2fb0ce7d h1:zsO4lp+bjv5XvPTF58Vq+qgmZEYZttJK+CWtSZhKenI=
github.com/boljen/go-bitmap v0.0.0-20151001105940-23cd2fb0ce7d/go.mod h1:f1iKL6ZhUWvbk7PdWVmOaak10o86cqMUYEmn1CZNGEI=
github.com/census-instrumentation/opencensus-proto v0.2.1/go.mod h1:f6KPmirojxKA12rnyqOA5BBL4O983OfeGPqjHWSTneU=
github.com/cespare/xxhash v1.1.0/go.mod h1:XrSqR1VqqWfGrhpAt58auRo0WTKS1nRRg3ghfAqPWnc=
github.com/chzyer/logex v1.1.10/go.mod h1:+Ywpsq7O8HXn0nuIou7OrIPyXbp3wmkHB+jjWRnGsAI=
Expand Down
19 changes: 16 additions & 3 deletions mount-index.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,12 @@ import (
"github.com/hanwen/go-fuse/v2/fuse"
)

// MountFS is the filesystem root handed to MountIndex. It combines the
// go-fuse fs.InodeEmbedder interface with a Close method.
type MountFS interface {
fs.InodeEmbedder

// Close is called by MountIndex once its context is done, right before
// the FUSE server is unmounted.
Close() error
}

// IndexMountFS is used to FUSE mount an index file (as a blob, not an archive).
// It presents a single file underneath the mountpoint.
type IndexMountFS struct {
Expand All @@ -26,6 +32,7 @@ type IndexMountFS struct {
}

var _ fs.NodeOnAdder = &IndexMountFS{}
var _ MountFS = &IndexMountFS{}

// NewIndexMountFS initializes a FUSE filesystem mount based on an index and a chunk store.
func NewIndexMountFS(idx Index, name string, s Store) *IndexMountFS {
Expand All @@ -47,6 +54,10 @@ func (r *IndexMountFS) OnAdd(ctx context.Context) {
r.AddChild(r.FName, ch, false)
}

// Close is a no-op that always returns nil. It is present so that
// IndexMountFS satisfies the MountFS interface.
func (r *IndexMountFS) Close() error {
return nil
}

var _ fs.NodeGetattrer = &indexFile{}
var _ fs.NodeOpener = &indexFile{}

Expand Down Expand Up @@ -109,15 +120,17 @@ func (f *indexFileHandle) read(dest []byte, off int64) (fuse.ReadResult, syscall

// MountIndex mounts an index file under a FUSE mount point. The mount will only expose a single
// blob file as represented by the index.
func MountIndex(ctx context.Context, idx Index, path, name string, s Store, n int) error {
ifs := NewIndexMountFS(idx, name, s)
func MountIndex(ctx context.Context, idx Index, ifs MountFS, path string, s Store, n int) error {
opts := &fs.Options{}
server, err := fs.Mount(path, ifs, opts)
if err != nil {
return err
}
go func() { // Unmount the server when the contex expires
go func() { // Unmount the server when the context expires
<-ctx.Done()
if err := ifs.Close(); err != nil {
fmt.Fprintln(os.Stderr, "error during unmount:", err)
}
server.Unmount()
}()
server.Wait()
Expand Down
3 changes: 2 additions & 1 deletion mount-index_linux_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -56,7 +56,8 @@ func TestMountIndex(t *testing.T) {

// Start the Fuse mount
go func() {
MountIndex(ctx, index, mnt, "blob1", s, 10)
ifs := NewIndexMountFS(index, "blob1", s)
MountIndex(ctx, index, ifs, mnt, s, 10)
wg.Done()
}()

Expand Down
98 changes: 98 additions & 0 deletions mount-sparse.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,98 @@
// +build !windows

package desync

import (
"context"
"io"
"syscall"
"time"

"github.com/hanwen/go-fuse/v2/fs"
"github.com/hanwen/go-fuse/v2/fuse"
)

// SparseMountFS is used to FUSE mount an index file (as a blob, not an archive).
// It uses a (local) sparse file as cache to improve performance. Every chunk that
// is being read is written into the sparse file.
type SparseMountFS struct {
fs.Inode

FName string // File name in the mountpoint
sf *SparseFile // Sparse file backing the mount, created in NewSparseMountFS
}

var _ fs.NodeOnAdder = &SparseMountFS{}
var _ MountFS = &SparseMountFS{}

// NewSparseMountFS builds a FUSE filesystem root for the given index that is
// backed by a sparse file. The sparse file is created with NewSparseFile from
// the path, index, store and options provided; any error from that call is
// returned unchanged together with a nil filesystem. name is the file name
// exposed inside the mountpoint.
func NewSparseMountFS(idx Index, name string, s Store, sparseFile string, opt SparseFileOptions) (*SparseMountFS, error) {
	cache, err := NewSparseFile(sparseFile, idx, s, opt)
	if err != nil {
		return nil, err
	}

	mfs := &SparseMountFS{FName: name, sf: cache}
	return mfs, nil
}

// OnAdd populates the static filesystem layout when the mount starts: a single
// regular file named r.FName whose size is the length of the sparse file and
// whose modification time is the moment of the call.
func (r *SparseMountFS) OnAdd(ctx context.Context) {
	file := new(sparseIndexFile)
	file.sf = r.sf
	file.mtime = time.Now()
	file.size = r.sf.Length()

	attr := fs.StableAttr{Mode: fuse.S_IFREG}
	inode := r.NewPersistentInode(ctx, file, attr)
	r.AddChild(r.FName, inode, false)
}

// WriteState saves the state of the sparse file by delegating to the
// underlying SparseFile's WriteState and returning its error.
func (r *SparseMountFS) WriteState() error {
return r.sf.WriteState()
}

// Close saves the state of the sparse file via its WriteState method and
// returns the resulting error. It makes SparseMountFS satisfy MountFS.
// NOTE(review): despite the name, no close operation on the sparse file
// itself is performed here - confirm whether that is intended.
func (r *SparseMountFS) Close() error {
return r.sf.WriteState()
}

// Compile-time checks that sparseIndexFile implements the go-fuse node
// interfaces it provides methods for. These previously referenced indexFile,
// which left sparseIndexFile itself unchecked.
var _ fs.NodeGetattrer = &sparseIndexFile{}
var _ fs.NodeOpener = &sparseIndexFile{}
var _ fs.NodeReader = &sparseIndexFile{}

// sparseIndexFile is the single file node exposed by SparseMountFS. Reads are
// served through handles opened on the sparse file.
type sparseIndexFile struct {
	fs.Inode
	sf    *SparseFile // sparse file that handles are opened on
	size  int64       // size reported by Getattr
	mtime time.Time   // modification time reported by Getattr
}

// Open obtains a handle from the sparse file and returns it with the
// FOPEN_KEEP_CACHE flag. If the sparse file cannot be opened, the error is
// logged and EIO is reported.
func (n *sparseIndexFile) Open(ctx context.Context, flags uint32) (fs.FileHandle, uint32, syscall.Errno) {
	handle, err := n.sf.Open()
	status := fs.OK
	if err != nil {
		Log.WithError(err).Error("failed to open sparse file")
		status = syscall.EIO
	}
	return handle, fuse.FOPEN_KEEP_CACHE, status
}

// Read fills dest from the sparse file handle starting at offset off and
// returns the bytes that were read. io.EOF is treated as success; any other
// error is logged and reported as EIO along with the partial data.
// fh must be the *SparseFileHandle returned by Open.
func (n *sparseIndexFile) Read(ctx context.Context, fh fs.FileHandle, dest []byte, off int64) (fuse.ReadResult, syscall.Errno) {
	handle := fh.(*SparseFileHandle)
	count, err := handle.ReadAt(dest, off)
	if err != nil && err != io.EOF {
		Log.WithError(err).Error("failed to read sparse file")
		return fuse.ReadResultData(dest[:count]), syscall.EIO
	}
	return fuse.ReadResultData(dest[:count]), fs.OK
}

// Getattr reports the file as a read-only (0444) regular file, using the size
// and modification time stored on the node.
func (n *sparseIndexFile) Getattr(ctx context.Context, fh fs.FileHandle, out *fuse.AttrOut) syscall.Errno {
	modified := n.mtime.Unix()
	out.Mtime = uint64(modified)
	out.Size = uint64(n.size)
	out.Mode = fuse.S_IFREG | 0444
	return fs.OK
}
Loading

0 comments on commit 00695e8

Please sign in to comment.