Skip to content

Commit f00f9df

Browse files
Small improvements (#4)
* mount blobfs on client side * force unmount first * make more fuse settings configurable * update default config * set creation/access times on blobfs nodes * force unmount juicefs * mount blobfs on client side * force unmount first * make more fuse settings configurable * update default config * set creation/access times on blobfs nodes * force unmount juicefs
1 parent be1b3d1 commit f00f9df

File tree

9 files changed

+90
-44
lines changed

9 files changed

+90
-44
lines changed

hack/okteto.yaml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -6,7 +6,7 @@ dev:
66
- ../:/workspace
77
environment:
88
PATH: ${PATH}:/usr/local/go/bin:/workspace/bin
9-
BUILD_COMMAND: "umount -l /cache; go build -o /workspace/bin/blobcache /workspace/cmd/main.go"
9+
BUILD_COMMAND: "go build -o /workspace/bin/blobcache /workspace/cmd/main.go"
1010
BUILD_BINARY_PATH: "/workspace/bin/blobcache"
1111
CONFIG_PATH: /workspace/config.yaml
1212
forward:

pkg/blobfs.go

Lines changed: 39 additions & 19 deletions
Original file line numberDiff line numberDiff line change
@@ -8,6 +8,8 @@ import (
88
"fmt"
99
"log"
1010
"os"
11+
"os/exec"
12+
"strings"
1113
"time"
1214

1315
"github.com/hanwen/go-fuse/v2/fs"
@@ -60,6 +62,7 @@ type BlobFsSystemOpts struct {
6062
Verbose bool
6163
Metadata *BlobCacheMetadata
6264
Config BlobCacheConfig
65+
Client *BlobCacheClient
6366
}
6467

6568
type BlobFs struct {
@@ -82,37 +85,42 @@ func Mount(ctx context.Context, opts BlobFsSystemOpts) (func() error, <-chan err
8285
}
8386

8487
Logger.Info("Mount point directory created.")
88+
} else if isFuseMount(mountPoint) {
89+
if err := forceUnmount(mountPoint); err != nil {
90+
return nil, nil, fmt.Errorf("failed to unmount existing FUSE mount: %v", err)
91+
}
8592
}
8693

8794
blobfs, err := NewFileSystem(ctx, opts)
8895
if err != nil {
8996
return nil, nil, fmt.Errorf("could not create filesystem: %v", err)
9097
}
9198

92-
for _, sourceConfig := range opts.Config.BlobFs.Sources {
93-
_, err := NewSource(sourceConfig)
94-
if err != nil {
95-
Logger.Errorf("Failed to configure content source: %+v\n", err)
96-
continue
97-
}
98-
99-
Logger.Infof("Configured and mounted source: %+v\n", sourceConfig.FilesystemName)
100-
}
101-
10299
root, _ := blobfs.Root()
103100
attrTimeout := time.Second * 60
104101
entryTimeout := time.Second * 60
105102
fsOptions := &fs.Options{
106103
AttrTimeout: &attrTimeout,
107104
EntryTimeout: &entryTimeout,
108105
}
106+
107+
maxReadAheadKB := opts.Config.BlobFs.MaxReadAheadKB
108+
if maxReadAheadKB <= 0 {
109+
maxReadAheadKB = 128
110+
}
111+
112+
maxBackgroundTasks := opts.Config.BlobFs.MaxBackgroundTasks
113+
if maxBackgroundTasks <= 0 {
114+
maxBackgroundTasks = 512
115+
}
116+
109117
server, err := fuse.NewServer(fs.NewNodeFS(root, fsOptions), mountPoint, &fuse.MountOptions{
110-
MaxBackground: 512,
118+
MaxBackground: maxBackgroundTasks,
111119
DisableXAttrs: true,
112120
EnableSymlinkCaching: true,
113121
SyncRead: false,
114122
RememberInodes: true,
115-
MaxReadAhead: 1 << 17,
123+
MaxReadAhead: maxReadAheadKB * 1024,
116124
})
117125
if err != nil {
118126
return nil, nil, fmt.Errorf("could not create server: %v", err)
@@ -142,17 +150,12 @@ func Mount(ctx context.Context, opts BlobFsSystemOpts) (func() error, <-chan err
142150
func NewFileSystem(ctx context.Context, opts BlobFsSystemOpts) (*BlobFs, error) {
143151
metadata := opts.Metadata
144152

145-
client, err := NewBlobCacheClient(ctx, opts.Config)
146-
if err != nil {
147-
return nil, err
148-
}
149-
150153
bfs := &BlobFs{
151-
verbose: opts.Verbose,
152154
ctx: ctx,
155+
verbose: opts.Verbose,
153156
Config: opts.Config,
157+
Client: opts.Client,
154158
Metadata: metadata,
155-
Client: client,
156159
}
157160

158161
rootID := GenerateFsID("/")
@@ -199,3 +202,20 @@ func (bfs *BlobFs) Root() (fs.InodeEmbedder, error) {
199202
}
200203
return bfs.root, nil
201204
}
205+
206+
func isFuseMount(mountPoint string) bool {
207+
cmd := exec.Command("findmnt", "-n", "-o", "FSTYPE", mountPoint)
208+
output, err := cmd.Output()
209+
if err != nil {
210+
return false
211+
}
212+
return strings.Contains(string(output), "fuse")
213+
}
214+
215+
func forceUnmount(mountPoint string) error {
216+
cmd := exec.Command("fusermount", "-uz", mountPoint)
217+
if _, err := cmd.CombinedOutput(); err != nil {
218+
return err
219+
}
220+
return nil
221+
}

pkg/client.go

Lines changed: 17 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -91,6 +91,23 @@ func NewBlobCacheClient(ctx context.Context, cfg BlobCacheConfig) (*BlobCacheCli
9191
// Monitor and cleanup local client cache
9292
go bc.manageLocalClientCache(localClientCacheCleanupInterval, localClientCacheTTL)
9393

94+
// Mount cache as a FUSE filesystem if blobfs is enabled
95+
if cfg.BlobFs.Enabled {
96+
startServer, _, err := Mount(ctx, BlobFsSystemOpts{
97+
Config: cfg,
98+
Metadata: metadata,
99+
Client: bc,
100+
})
101+
if err != nil {
102+
return nil, err
103+
}
104+
105+
err = startServer()
106+
if err != nil {
107+
return nil, err
108+
}
109+
}
110+
94111
return bc, nil
95112
}
96113

pkg/config.default.yaml

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -10,6 +10,8 @@ discoveryIntervalS: 5
1010
blobfs:
1111
enabled: false
1212
mountPoint: /tmp/test
13+
maxBackgroundTasks: 512
14+
maxReadAheadKB: 128
1315
tailscale:
1416
controlUrl:
1517
user:

pkg/discovery.go

Lines changed: 0 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -142,15 +142,12 @@ func (d *DiscoveryClient) GetHostState(ctx context.Context, addr string) (*BlobC
142142
host.CapacityUsagePct = float64(resp.GetCapacityUsagePct())
143143

144144
if resp.PrivateIpAddr != "" {
145-
Logger.Debugf("Host found with private addr: %s", resp.PrivateIpAddr)
146-
147145
privateAddr := fmt.Sprintf("%s:%d", resp.PrivateIpAddr, d.cfg.Port)
148146
privateConn, privateErr := DialWithTimeout(ctx, privateAddr)
149147
if privateErr == nil {
150148
privateConn.Close()
151149
host.PrivateAddr = privateAddr
152150
host.RTT = time.Duration(0)
153-
Logger.Debugf("Connected to host using private addr: %s", privateAddr)
154151
}
155152
}
156153

pkg/metadata.go

Lines changed: 17 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,7 @@ import (
55
"fmt"
66
"path/filepath"
77
"strings"
8+
"time"
89

910
mapset "github.com/deckarep/golang-set/v2"
1011
"github.com/hanwen/go-fuse/v2/fuse"
@@ -134,13 +135,23 @@ func (m *BlobCacheMetadata) StoreContentInBlobFs(ctx context.Context, path strin
134135
return err
135136
}
136137

138+
now := time.Now()
139+
nowSec := uint64(now.Unix())
140+
nowNsec := uint32(now.Nanosecond())
141+
137142
metadata := &BlobFsMetadata{
138-
PID: previousParentId,
139-
ID: currentNodeId,
140-
Name: part,
141-
Path: currentPath,
142-
Ino: inode,
143-
Mode: fuse.S_IFDIR | 0755,
143+
PID: previousParentId,
144+
ID: currentNodeId,
145+
Name: part,
146+
Path: currentPath,
147+
Ino: inode,
148+
Mode: fuse.S_IFDIR | 0755,
149+
Atime: nowSec,
150+
Mtime: nowSec,
151+
Ctime: nowSec,
152+
Atimensec: nowNsec,
153+
Mtimensec: nowNsec,
154+
Ctimensec: nowNsec,
144155
}
145156

146157
// Since this is the last file, store as a file, not a dir

pkg/server.go

Lines changed: 8 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -61,17 +61,14 @@ func NewCacheService(ctx context.Context, cfg BlobCacheConfig) (*CacheService, e
6161

6262
// Mount cache as a FUSE filesystem if blobfs is enabled
6363
if cfg.BlobFs.Enabled {
64-
startServer, _, err := Mount(ctx, BlobFsSystemOpts{
65-
Config: cfg,
66-
Metadata: metadata,
67-
})
68-
if err != nil {
69-
return nil, err
70-
}
71-
72-
err = startServer()
73-
if err != nil {
74-
return nil, err
64+
for _, sourceConfig := range cfg.BlobFs.Sources {
65+
_, err := NewSource(sourceConfig)
66+
if err != nil {
67+
Logger.Errorf("Failed to configure content source: %+v\n", err)
68+
continue
69+
}
70+
71+
Logger.Infof("Configured and mounted source: %+v\n", sourceConfig.FilesystemName)
7572
}
7673
}
7774

pkg/source_juicefs.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -119,7 +119,7 @@ func (s *JuiceFsSource) Format(fsName string) error {
119119
}
120120

121121
func (s *JuiceFsSource) Unmount(localPath string) error {
122-
cmd := exec.Command("juicefs", "umount", localPath)
122+
cmd := exec.Command("juicefs", "umount", "--force", localPath)
123123

124124
output, err := cmd.CombinedOutput()
125125
if err != nil {

pkg/types.go

Lines changed: 5 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -73,9 +73,11 @@ type RedisConfig struct {
7373
}
7474

7575
type BlobFsConfig struct {
76-
Enabled bool `key:"enabled" json:"enabled"`
77-
MountPoint string `key:"mountPoint" json:"mount_point"`
78-
Sources []SourceConfig `key:"sources" json:"sources"`
76+
Enabled bool `key:"enabled" json:"enabled"`
77+
MountPoint string `key:"mountPoint" json:"mount_point"`
78+
Sources []SourceConfig `key:"sources" json:"sources"`
79+
MaxBackgroundTasks int `key:"maxBackgroundTasks" json:"max_background_tasks"`
80+
MaxReadAheadKB int `key:"maxReadAheadKB" json:"max_read_ahead_kb"`
7981
}
8082

8183
type SourceConfig struct {

0 commit comments

Comments
 (0)