Skip to content

Commit

Permalink
Add I/O statistics printer
Browse files Browse the repository at this point in the history
  • Loading branch information
vitalif committed Nov 22, 2022
1 parent c72c0a7 commit 10a753b
Show file tree
Hide file tree
Showing 4 changed files with 132 additions and 14 deletions.
2 changes: 2 additions & 0 deletions api/common/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -96,6 +96,8 @@ type FlagStorage struct {
DebugS3 bool
Foreground bool
LogFile string

StatsInterval time.Duration
}

func (flags *FlagStorage) GetMimeType(fileName string) (retMime *string) {
Expand Down
27 changes: 15 additions & 12 deletions internal/file.go
Original file line number Diff line number Diff line change
Expand Up @@ -560,7 +560,7 @@ func (inode *Inode) OpenCacheFD() error {
// Load some inode data into memory
// Must be called with inode.mu taken
// Loaded range should be guarded against eviction by adding it into inode.readRanges
func (inode *Inode) LoadRange(offset uint64, size uint64, readAheadSize uint64, ignoreMemoryLimit bool) (requestErr error) {
func (inode *Inode) LoadRange(offset uint64, size uint64, readAheadSize uint64, ignoreMemoryLimit bool) (miss bool, requestErr error) {

end := offset+readAheadSize
if size > readAheadSize {
Expand Down Expand Up @@ -606,7 +606,7 @@ func (inode *Inode) LoadRange(offset uint64, size uint64, readAheadSize uint64,
} else if b.state == BUF_FL_CLEARED {
// Buffer is saved as a part and then removed
// We must complete multipart upload to be able to read it back
return syscall.ESPIPE
return true, syscall.ESPIPE
}
pos = b.offset+b.length
}
Expand Down Expand Up @@ -713,7 +713,7 @@ func (inode *Inode) LoadRange(offset uint64, size uint64, readAheadSize uint64,

if len(diskRequests) > 0 {
if err := inode.OpenCacheFD(); err != nil {
return err
return true, err
}
loadedFromDisk := uint64(0)
for i := 0; i < len(diskRequests); i += 2 {
Expand All @@ -722,7 +722,7 @@ func (inode *Inode) LoadRange(offset uint64, size uint64, readAheadSize uint64,
data := make([]byte, requestSize)
_, err := inode.DiskCacheFD.ReadAt(data, int64(requestOffset))
if err != nil {
return err
return true, err
}
pos := locateBuffer(inode.buffers, requestOffset)
var ib *FileBuffer
Expand Down Expand Up @@ -754,6 +754,7 @@ func (inode *Inode) LoadRange(offset uint64, size uint64, readAheadSize uint64,
return
}

miss = true
end = offset+size
for {
// Check if all buffers are loaded or if there is a read error
Expand Down Expand Up @@ -910,8 +911,8 @@ func (inode *Inode) IsRangeLocked(offset uint64, size uint64, onlyFlushing bool)
return false
}

func (inode *Inode) CheckLoadRange(offset, size, readAheadSize uint64, ignoreMemoryLimit bool) error {
err := inode.LoadRange(offset, size, readAheadSize, ignoreMemoryLimit)
func (inode *Inode) CheckLoadRange(offset, size, readAheadSize uint64, ignoreMemoryLimit bool) (bool, error) {
miss, err := inode.LoadRange(offset, size, readAheadSize, ignoreMemoryLimit)
if err == syscall.ESPIPE {
// Finalize multipart upload to get some flushed data back
// We have to flush all parts that extend the file up until the last flushed part
Expand All @@ -933,15 +934,15 @@ func (inode *Inode) CheckLoadRange(offset, size, readAheadSize uint64, ignoreMem
err = inode.SyncFile()
inode.mu.Lock()
if err == nil {
err = inode.LoadRange(offset, size, readAheadSize, ignoreMemoryLimit)
_, err = inode.LoadRange(offset, size, readAheadSize, ignoreMemoryLimit)
}
}
inode.pauseWriters--
if inode.readCond != nil {
inode.readCond.Broadcast()
}
}
return err
return miss, err
}

func appendZero(data [][]byte, zeroBuf []byte, zeroLen int) [][]byte {
Expand Down Expand Up @@ -1022,7 +1023,6 @@ func (fh *FileHandle) ReadFile(sOffset int64, sLen int64) (data [][]byte, bytesR
defer fh.inode.UnlockRange(offset, end-offset, false)

// Check if anything requires to be loaded from the server
var requestErr error
ra := fh.inode.fs.flags.ReadAheadKB*1024
if fh.seqReadSize >= fh.inode.fs.flags.LargeReadCutoffKB*1024 {
// Use larger readahead with 'pipelining'
Expand All @@ -1038,7 +1038,10 @@ func (fh *FileHandle) ReadFile(sOffset int64, sLen int64) (data [][]byte, bytesR
if ra+end > maxFileSize {
ra = 0
}
requestErr = fh.inode.CheckLoadRange(offset, end-offset, ra, false)
miss, requestErr := fh.inode.CheckLoadRange(offset, end-offset, ra, false)
if !miss {
atomic.AddInt64(&fh.inode.fs.stats.readHits, 1)
}
mappedErr := mapAwsError(requestErr)
if requestErr != nil {
err = requestErr
Expand Down Expand Up @@ -1676,7 +1679,7 @@ func (inode *Inode) FlushSmallObject() {
inode.LockRange(0, sz, true)

if inode.CacheState == ST_MODIFIED {
err := inode.LoadRange(0, sz, 0, true)
_, err := inode.LoadRange(0, sz, 0, true)
mappedErr := mapAwsError(err)
if mappedErr == fuse.ENOENT || mappedErr == syscall.ERANGE {
// Object is deleted or resized remotely (416). Discard local version
Expand Down Expand Up @@ -1873,7 +1876,7 @@ func (inode *Inode) FlushPart(part uint64) {
// Ignore memory limit to not produce a deadlock when we need to free some memory
// by flushing objects, but we can't flush a part without allocating more memory
// for read-modify-write...
err := inode.LoadRange(partOffset, partSize, 0, true)
_, err := inode.LoadRange(partOffset, partSize, 0, true)
if err == syscall.ESPIPE {
// Part is partly evicted, we can't flush it
return
Expand Down
7 changes: 7 additions & 0 deletions internal/flags.go
Original file line number Diff line number Diff line change
Expand Up @@ -519,6 +519,12 @@ func NewApp() (app *cli.App) {
Usage: "Redirect logs to file, 'stderr' (default for foreground) or 'syslog' (default for background).",
Value: "",
},

cli.DurationFlag{
Name: "print-stats",
Value: 30 * time.Second,
Usage: "I/O statistics printing interval. Set to 0 to disable.",
},
}

app = &cli.App{
Expand Down Expand Up @@ -699,6 +705,7 @@ func PopulateFlags(c *cli.Context) (ret *FlagStorage) {
DebugS3: c.Bool("debug_s3"),
Foreground: c.Bool("f"),
LogFile: c.String("log-file"),
StatsInterval: c.Duration("print-stats"),
}

flags.PartSizes = parsePartSizes(c.String("part-sizes"))
Expand Down
Loading

0 comments on commit 10a753b

Please sign in to comment.