From 10a753bfb6addfd802d055cade7ca0ed95a0f657 Mon Sep 17 00:00:00 2001
From: Vitaliy Filippov
Date: Wed, 23 Nov 2022 01:24:50 +0300
Subject: [PATCH] Add I/O statistics printer

---
 api/common/config.go |   2 +
 internal/file.go     |  27 ++++++-----
 internal/flags.go    |   7 +++
 internal/goofys.go   | 110 ++++++++++++++++++++++++++++++++++++++++++-
 4 files changed, 132 insertions(+), 14 deletions(-)

diff --git a/api/common/config.go b/api/common/config.go
index 1f828641..6c554b23 100644
--- a/api/common/config.go
+++ b/api/common/config.go
@@ -96,6 +96,8 @@ type FlagStorage struct {
 	DebugS3    bool
 	Foreground bool
 	LogFile    string
+
+	StatsInterval time.Duration
 }
 
 func (flags *FlagStorage) GetMimeType(fileName string) (retMime *string) {
diff --git a/internal/file.go b/internal/file.go
index 8b36ea57..421a0e11 100644
--- a/internal/file.go
+++ b/internal/file.go
@@ -560,7 +560,7 @@ func (inode *Inode) OpenCacheFD() error {
 // Load some inode data into memory
 // Must be called with inode.mu taken
 // Loaded range should be guarded against eviction by adding it into inode.readRanges
-func (inode *Inode) LoadRange(offset uint64, size uint64, readAheadSize uint64, ignoreMemoryLimit bool) (requestErr error) {
+func (inode *Inode) LoadRange(offset uint64, size uint64, readAheadSize uint64, ignoreMemoryLimit bool) (miss bool, requestErr error) {
 
 	end := offset+readAheadSize
 	if size > readAheadSize {
@@ -606,7 +606,7 @@ func (inode *Inode) LoadRange(offset uint64, size uint64, readAheadSize uint64,
 		} else if b.state == BUF_FL_CLEARED {
 			// Buffer is saved as a part and then removed
 			// We must complete multipart upload to be able to read it back
-			return syscall.ESPIPE
+			return true, syscall.ESPIPE
 		}
 		pos = b.offset+b.length
 	}
@@ -713,7 +713,7 @@ func (inode *Inode) LoadRange(offset uint64, size uint64, readAheadSize uint64,
 
 	if len(diskRequests) > 0 {
 		if err := inode.OpenCacheFD(); err != nil {
-			return err
+			return true, err
 		}
 		loadedFromDisk := uint64(0)
 		for i := 0; i < len(diskRequests); i += 2 {
@@ -722,7 +722,7 @@ func (inode *Inode) LoadRange(offset uint64, size uint64, readAheadSize uint64,
 			data := make([]byte, requestSize)
 			_, err := inode.DiskCacheFD.ReadAt(data, int64(requestOffset))
 			if err != nil {
-				return err
+				return true, err
 			}
 			pos := locateBuffer(inode.buffers, requestOffset)
 			var ib *FileBuffer
@@ -754,6 +754,7 @@ func (inode *Inode) LoadRange(offset uint64, size uint64, readAheadSize uint64,
 		return
 	}
 
+	miss = true
 	end = offset+size
 	for {
 		// Check if all buffers are loaded or if there is a read error
@@ -910,8 +911,8 @@ func (inode *Inode) IsRangeLocked(offset uint64, size uint64, onlyFlushing bool)
 	return false
 }
 
-func (inode *Inode) CheckLoadRange(offset, size, readAheadSize uint64, ignoreMemoryLimit bool) error {
-	err := inode.LoadRange(offset, size, readAheadSize, ignoreMemoryLimit)
+func (inode *Inode) CheckLoadRange(offset, size, readAheadSize uint64, ignoreMemoryLimit bool) (bool, error) {
+	miss, err := inode.LoadRange(offset, size, readAheadSize, ignoreMemoryLimit)
 	if err == syscall.ESPIPE {
 		// Finalize multipart upload to get some flushed data back
 		// We have to flush all parts that extend the file up until the last flushed part
@@ -933,7 +934,7 @@ func (inode *Inode) CheckLoadRange(offset, size, readAheadSize uint64, ignoreMem
 		err = inode.SyncFile()
 		inode.mu.Lock()
 		if err == nil {
-			err = inode.LoadRange(offset, size, readAheadSize, ignoreMemoryLimit)
+			_, err = inode.LoadRange(offset, size, readAheadSize, ignoreMemoryLimit)
 		}
 	}
 	inode.pauseWriters--
@@ -941,7 +942,7 @@ func (inode *Inode) CheckLoadRange(offset, size, readAheadSize uint64, ignoreMem
 			inode.readCond.Broadcast()
 		}
 	}
-	return err
+	return miss, err
 }
 
 func appendZero(data [][]byte, zeroBuf []byte, zeroLen int) [][]byte {
@@ -1022,7 +1023,6 @@ func (fh *FileHandle) ReadFile(sOffset int64, sLen int64) (data [][]byte, bytesR
 	defer fh.inode.UnlockRange(offset, end-offset, false)
 
 	// Check if anything requires to be loaded from the server
-	var requestErr error
 	ra := fh.inode.fs.flags.ReadAheadKB*1024
 	if fh.seqReadSize >= fh.inode.fs.flags.LargeReadCutoffKB*1024 {
 		// Use larger readahead with 'pipelining'
@@ -1038,7 +1038,10 @@ func (fh *FileHandle) ReadFile(sOffset int64, sLen int64) (data [][]byte, bytesR
 	if ra+end > maxFileSize {
 		ra = 0
 	}
-	requestErr = fh.inode.CheckLoadRange(offset, end-offset, ra, false)
+	miss, requestErr := fh.inode.CheckLoadRange(offset, end-offset, ra, false)
+	if !miss {
+		atomic.AddInt64(&fh.inode.fs.stats.readHits, 1)
+	}
 	mappedErr := mapAwsError(requestErr)
 	if requestErr != nil {
 		err = requestErr
@@ -1676,7 +1679,7 @@ func (inode *Inode) FlushSmallObject() {
 	inode.LockRange(0, sz, true)
 
 	if inode.CacheState == ST_MODIFIED {
-		err := inode.LoadRange(0, sz, 0, true)
+		_, err := inode.LoadRange(0, sz, 0, true)
 		mappedErr := mapAwsError(err)
 		if mappedErr == fuse.ENOENT || mappedErr == syscall.ERANGE {
 			// Object is deleted or resized remotely (416). Discard local version
@@ -1873,7 +1876,7 @@ func (inode *Inode) FlushPart(part uint64) {
 		// Ignore memory limit to not produce a deadlock when we need to free some memory
 		// by flushing objects, but we can't flush a part without allocating more memory
 		// for read-modify-write...
-		err := inode.LoadRange(partOffset, partSize, 0, true)
+		_, err := inode.LoadRange(partOffset, partSize, 0, true)
 		if err == syscall.ESPIPE {
 			// Part is partly evicted, we can't flush it
 			return
diff --git a/internal/flags.go b/internal/flags.go
index c6ba5a4c..400a0572 100644
--- a/internal/flags.go
+++ b/internal/flags.go
@@ -519,6 +519,12 @@ func NewApp() (app *cli.App) {
 			Usage: "Redirect logs to file, 'stderr' (default for foreground) or 'syslog' (default for background).",
 			Value: "",
 		},
+
+		cli.DurationFlag{
+			Name:  "print-stats",
+			Value: 30 * time.Second,
+			Usage: "I/O statistics printing interval. Set to 0 to disable.",
+		},
 	}
 
 	app = &cli.App{
@@ -699,6 +705,7 @@ func PopulateFlags(c *cli.Context) (ret *FlagStorage) {
 		DebugS3:    c.Bool("debug_s3"),
 		Foreground: c.Bool("f"),
 		LogFile:    c.String("log-file"),
+		StatsInterval: c.Duration("print-stats"),
 	}
 
 	flags.PartSizes = parsePartSizes(c.String("part-sizes"))
diff --git a/internal/goofys.go b/internal/goofys.go
index 545c572a..19204768 100644
--- a/internal/goofys.go
+++ b/internal/goofys.go
@@ -110,6 +110,19 @@ type Goofys struct {
 	diskFdMu sync.Mutex
 	diskFdCond *sync.Cond
 	diskFdCount int64
+
+	stats OpStats
+}
+
+type OpStats struct {
+	reads int64
+	readHits int64
+	writes int64
+	flushes int64
+	metadataReads int64
+	metadataWrites int64
+	noops int64
+	ts time.Time
 }
 
 var s3Log = GetLogger("s3")
@@ -196,6 +209,9 @@ func newGoofys(ctx context.Context, bucket string, flags *FlagStorage,
 		zeroBuf: make([]byte, 1048576),
 		inflightChanges: make(map[string]int),
 		inflightListings: make(map[int]map[string]bool),
+		stats: OpStats{
+			ts: time.Now(),
+		},
 	}
 
 	var prefix string
@@ -264,6 +280,9 @@ func newGoofys(ctx context.Context, bucket string, flags *FlagStorage,
 
 	fs.flusherCond = sync.NewCond(&fs.flusherMu)
 	go fs.Flusher()
+	if fs.flags.StatsInterval > 0 {
+		go fs.StatPrinter()
+	}
 
 	if fs.flags.CachePath != "" && fs.flags.MaxDiskCacheFD > 0 {
 		fs.diskFdCond = sync.NewCond(&fs.diskFdMu)
@@ -320,6 +339,38 @@ func (fs *Goofys) getInodeOrDie(id fuseops.InodeID) (inode *Inode) {
 	return
 }
 
+func (fs *Goofys) StatPrinter() {
+	for {
+		time.Sleep(fs.flags.StatsInterval)
+		now := time.Now()
+		d := now.Sub(fs.stats.ts).Seconds()
+		reads := atomic.SwapInt64(&fs.stats.reads, 0)
+		readHits := atomic.SwapInt64(&fs.stats.readHits, 0)
+		writes := atomic.SwapInt64(&fs.stats.writes, 0)
+		flushes := atomic.SwapInt64(&fs.stats.flushes, 0)
+		metadataReads := atomic.SwapInt64(&fs.stats.metadataReads, 0)
+		metadataWrites := atomic.SwapInt64(&fs.stats.metadataWrites, 0)
+		noops := atomic.SwapInt64(&fs.stats.noops, 0)
+		fs.stats.ts = now
+		readsOr1 := float64(reads)
+		if reads == 0 {
+			readsOr1 = 1
+		}
+		fmt.Fprintf(
+			os.Stderr,
+			"%v I/O: %.2f read/s, %.2f %% hits, %.2f write/s; metadata: %.2f read/s, %.2f write/s; %.2f noop/s; %.2f flush/s\n",
+			now.Format("2006/01/02 15:04:05.000000"),
+			float64(reads) / d,
+			float64(readHits)/readsOr1*100,
+			float64(writes) / d,
+			float64(metadataReads) / d,
+			float64(metadataWrites) / d,
+			float64(noops) / d,
+			float64(flushes) / d,
+		)
+	}
+}
+
 // Close unneeded cache FDs
 func (fs *Goofys) FDCloser() {
 	fs.diskFdMu.Lock()
@@ -525,7 +576,7 @@ func (fs *Goofys) ScheduleRetryFlush() {
 func (fs *Goofys) Flusher() {
 	var inodes []fuseops.InodeID
 	again := false
-	for true {
+	for {
 		if !again {
 			fs.flusherMu.Lock()
 			if fs.flushPending == 0 {
@@ -554,7 +605,10 @@ func (fs *Goofys) Flusher() {
 			inode := fs.inodes[id]
 			fs.mu.RUnlock()
 			if inode != nil {
-				inode.TryFlush()
+				sent := inode.TryFlush()
+				if sent {
+					atomic.AddInt64(&fs.stats.flushes, 1)
+				}
 				if atomic.LoadInt64(&fs.activeFlushers) >= fs.flags.MaxFlushers {
 					break
 				}
@@ -686,6 +740,8 @@ func (fs *Goofys) StatFS(
 	ctx context.Context,
 	op *fuseops.StatFSOp) (err error) {
 
+	atomic.AddInt64(&fs.stats.metadataReads, 1)
+
 	const BLOCK_SIZE = 4096
 	const TOTAL_SPACE = 1 * 1024 * 1024 * 1024 * 1024 * 1024 // 1PB
 	const TOTAL_BLOCKS = TOTAL_SPACE / BLOCK_SIZE
@@ -704,6 +760,8 @@ func (fs *Goofys) GetInodeAttributes(
 	ctx context.Context,
 	op *fuseops.GetInodeAttributesOp) (err error) {
 
+	atomic.AddInt64(&fs.stats.metadataReads, 1)
+
 	fs.mu.RLock()
 	inode := fs.getInodeOrDie(op.Inode)
 	fs.mu.RUnlock()
@@ -729,6 +787,8 @@ func (fs *Goofys) GetXattr(ctx context.Context,
 	inode := fs.getInodeOrDie(op.Inode)
 	fs.mu.RUnlock()
 
+	atomic.AddInt64(&fs.stats.metadataReads, 1)
+
 	if atomic.LoadInt32(&inode.refreshed) == -1 {
 		// Stale inode
 		return syscall.ESTALE
@@ -758,6 +818,8 @@ func (fs *Goofys) ListXattr(ctx context.Context,
 	inode := fs.getInodeOrDie(op.Inode)
 	fs.mu.RUnlock()
 
+	atomic.AddInt64(&fs.stats.metadataReads, 1)
+
 	if atomic.LoadInt32(&inode.refreshed) == -1 {
 		// Stale inode
 		return syscall.ESTALE
@@ -794,6 +856,8 @@ func (fs *Goofys) RemoveXattr(ctx context.Context,
 	inode := fs.getInodeOrDie(op.Inode)
 	fs.mu.RUnlock()
 
+	atomic.AddInt64(&fs.stats.metadataWrites, 1)
+
 	if atomic.LoadInt32(&inode.refreshed) == -1 {
 		// Stale inode
 		return syscall.ESTALE
@@ -815,6 +879,8 @@ func (fs *Goofys) SetXattr(ctx context.Context,
 	inode := fs.getInodeOrDie(op.Inode)
 	fs.mu.RUnlock()
 
+	atomic.AddInt64(&fs.stats.metadataWrites, 1)
+
 	if atomic.LoadInt32(&inode.refreshed) == -1 {
 		// Stale inode
 		return syscall.ESTALE
@@ -835,6 +901,8 @@ func (fs *Goofys) CreateSymlink(ctx context.Context,
 	parent := fs.getInodeOrDie(op.Parent)
 	fs.mu.RUnlock()
 
+	atomic.AddInt64(&fs.stats.metadataWrites, 1)
+
 	if atomic.LoadInt32(&parent.refreshed) == -1 {
 		// Stale inode
 		return syscall.ESTALE
@@ -854,6 +922,8 @@ func (fs *Goofys) ReadSymlink(ctx context.Context,
 	inode := fs.getInodeOrDie(op.Inode)
 	fs.mu.RUnlock()
 
+	atomic.AddInt64(&fs.stats.metadataReads, 1)
+
 	if atomic.LoadInt32(&inode.refreshed) == -1 {
 		// Stale inode
 		return syscall.ESTALE
@@ -952,6 +1022,8 @@ func (fs *Goofys) LookUpInode(
 	ctx context.Context,
 	op *fuseops.LookUpInodeOp) (err error) {
 
+	atomic.AddInt64(&fs.stats.metadataReads, 1)
+
 	var inode *Inode
 	var ok bool
 	defer func() { fuseLog.Debugf("<-- LookUpInode %v %v %v", op.Parent, op.Name, err) }()
@@ -1072,6 +1144,8 @@ func (fs *Goofys) ForgetInode(
 	ctx context.Context,
 	op *fuseops.ForgetInodeOp) (err error) {
 
+	atomic.AddInt64(&fs.stats.metadataReads, 1)
+
 	fs.mu.RLock()
 	inode := fs.getInodeOrDie(op.Inode)
 	fs.mu.RUnlock()
@@ -1087,6 +1161,8 @@ func (fs *Goofys) OpenDir(
 	ctx context.Context,
 	op *fuseops.OpenDirOp) (err error) {
 
+	atomic.AddInt64(&fs.stats.noops, 1)
+
 	fs.mu.Lock()
 	in := fs.getInodeOrDie(op.Inode)
 	if atomic.LoadInt32(&in.refreshed) == -1 {
@@ -1127,6 +1203,8 @@ func (fs *Goofys) ReadDir(
 	ctx context.Context,
 	op *fuseops.ReadDirOp) (err error) {
 
+	atomic.AddInt64(&fs.stats.metadataReads, 1)
+
 	// Find the handle.
 	fs.mu.RLock()
 	dh := fs.dirHandles[op.Handle]
@@ -1252,6 +1330,8 @@ func (fs *Goofys) ReleaseDirHandle(
 	ctx context.Context,
 	op *fuseops.ReleaseDirHandleOp) (err error) {
 
+	atomic.AddInt64(&fs.stats.noops, 1)
+
 	fs.mu.RLock()
 	dh := fs.dirHandles[op.Handle]
 	fs.mu.RUnlock()
@@ -1274,6 +1354,8 @@ func (fs *Goofys) OpenFile(
 	in := fs.getInodeOrDie(op.Inode)
 	fs.mu.RUnlock()
 
+	atomic.AddInt64(&fs.stats.noops, 1)
+
 	if atomic.LoadInt32(&in.refreshed) == -1 {
 		// Stale inode
 		return syscall.ESTALE
@@ -1314,6 +1396,8 @@ func (fs *Goofys) ReadFile(
 	ctx context.Context,
 	op *fuseops.ReadFileOp) (err error) {
 
+	atomic.AddInt64(&fs.stats.reads, 1)
+
 	fs.mu.RLock()
 	fh := fs.fileHandles[op.Handle]
 	fs.mu.RUnlock()
@@ -1328,6 +1412,8 @@ func (fs *Goofys) SyncFile(
 	ctx context.Context,
 	op *fuseops.SyncFileOp) (err error) {
 
+	atomic.AddInt64(&fs.stats.metadataWrites, 1)
+
 	if !fs.flags.IgnoreFsync {
 		fs.mu.RLock()
 		in := fs.getInodeOrDie(op.Inode)
@@ -1353,6 +1439,8 @@ func (fs *Goofys) FlushFile(
 	// FlushFile is a no-op because we flush changes to the server asynchronously
 	// If the user really wants to persist a file to the server he should call fsync()
 
+	atomic.AddInt64(&fs.stats.noops, 1)
+
 	return
 }
 
@@ -1364,6 +1452,8 @@ func (fs *Goofys) ReleaseFileHandle(
 	fh := fs.fileHandles[op.Handle]
 	fh.Release()
 
+	atomic.AddInt64(&fs.stats.noops, 1)
+
 	fuseLog.Debugln("ReleaseFileHandle", fh.inode.FullName(), op.Handle, fh.inode.Id)
 
 	delete(fs.fileHandles, op.Handle)
@@ -1377,6 +1467,8 @@ func (fs *Goofys) CreateFile(
 	ctx context.Context,
 	op *fuseops.CreateFileOp) (err error) {
 
+	atomic.AddInt64(&fs.stats.metadataWrites, 1)
+
 	fs.mu.RLock()
 	parent := fs.getInodeOrDie(op.Parent)
 	fs.mu.RUnlock()
@@ -1419,6 +1511,8 @@ func (fs *Goofys) MkNode(
 	ctx context.Context,
 	op *fuseops.MkNodeOp) (err error) {
 
+	atomic.AddInt64(&fs.stats.metadataWrites, 1)
+
 	if (op.Mode & os.ModeType) != os.ModeDir &&
 		(op.Mode & os.ModeType) != 0 &&
 		!fs.flags.EnableSpecials {
@@ -1461,6 +1555,8 @@ func (fs *Goofys) MkDir(
 	ctx context.Context,
 	op *fuseops.MkDirOp) (err error) {
 
+	atomic.AddInt64(&fs.stats.metadataWrites, 1)
+
 	fs.mu.RLock()
 	parent := fs.getInodeOrDie(op.Parent)
 	fs.mu.RUnlock()
@@ -1494,6 +1590,8 @@ func (fs *Goofys) RmDir(
 	ctx context.Context,
 	op *fuseops.RmDirOp) (err error) {
 
+	atomic.AddInt64(&fs.stats.metadataWrites, 1)
+
 	fs.mu.RLock()
 	parent := fs.getInodeOrDie(op.Parent)
 	fs.mu.RUnlock()
@@ -1513,6 +1611,8 @@ func (fs *Goofys) SetInodeAttributes(
 	ctx context.Context,
 	op *fuseops.SetInodeAttributesOp) (err error) {
 
+	atomic.AddInt64(&fs.stats.metadataWrites, 1)
+
 	fs.mu.RLock()
 	inode := fs.getInodeOrDie(op.Inode)
 	fs.mu.RUnlock()
@@ -1599,6 +1699,8 @@ func (fs *Goofys) WriteFile(
 	ctx context.Context,
 	op *fuseops.WriteFileOp) (err error) {
 
+	atomic.AddInt64(&fs.stats.writes, 1)
+
 	fs.mu.RLock()
 
 	fh, ok := fs.fileHandles[op.Handle]
@@ -1621,6 +1723,8 @@ func (fs *Goofys) Unlink(
 	ctx context.Context,
 	op *fuseops.UnlinkOp) (err error) {
 
+	atomic.AddInt64(&fs.stats.metadataWrites, 1)
+
 	fs.mu.RLock()
 	parent := fs.getInodeOrDie(op.Parent)
 	fs.mu.RUnlock()
@@ -1641,6 +1745,8 @@ func (fs *Goofys) Rename(
 	ctx context.Context,
 	op *fuseops.RenameOp) (err error) {
 
+	atomic.AddInt64(&fs.stats.metadataWrites, 1)
+
 	fs.mu.RLock()
 	parent := fs.getInodeOrDie(op.OldParent)
 	newParent := fs.getInodeOrDie(op.NewParent)
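
Note (not part of the patch): a minimal standalone sketch of the counter pattern the change relies on. Operation handlers bump plain int64 counters with atomic.AddInt64, and a single printer goroutine drains them once per interval with atomic.SwapInt64, so per-second rates are computed without taking any locks. The names opStats and statLoop below are illustrative only; the patch's OpStats additionally tracks read hits, metadata reads/writes, noops and flushes.

package main

import (
	"fmt"
	"os"
	"sync/atomic"
	"time"
)

// opStats mirrors the idea of the patch's OpStats: plain int64 counters
// that are only ever accessed through the sync/atomic package.
type opStats struct {
	reads  int64
	writes int64
	ts     time.Time
}

// statLoop drains the counters every interval and prints per-second rates,
// using the same swap-and-reset approach as StatPrinter in the patch.
func statLoop(s *opStats, interval time.Duration) {
	for {
		time.Sleep(interval)
		now := time.Now()
		d := now.Sub(s.ts).Seconds()
		reads := atomic.SwapInt64(&s.reads, 0) // read and reset in one atomic step
		writes := atomic.SwapInt64(&s.writes, 0)
		s.ts = now // only the printer goroutine touches ts
		fmt.Fprintf(os.Stderr, "%v I/O: %.2f read/s, %.2f write/s\n",
			now.Format("2006/01/02 15:04:05.000000"),
			float64(reads)/d, float64(writes)/d)
	}
}

func main() {
	s := &opStats{ts: time.Now()}
	go statLoop(s, time.Second)

	// Simulated operation handlers: each request just bumps a counter.
	for i := 0; i < 30; i++ {
		atomic.AddInt64(&s.reads, 1)
		atomic.AddInt64(&s.writes, 1)
		time.Sleep(100 * time.Millisecond)
	}
}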