Skip to content

Commit

Permalink
fileservice: more logs for memory cache updating (#20889)
Browse files Browse the repository at this point in the history
  • Loading branch information
reusee authored Dec 25, 2024
1 parent f48a2aa commit ca798fc
Show file tree
Hide file tree
Showing 4 changed files with 87 additions and 33 deletions.
38 changes: 36 additions & 2 deletions pkg/fileservice/mem_cache.go
Original file line number Diff line number Diff line change
Expand Up @@ -48,36 +48,68 @@ func NewMemCache(
}

postSetFn := func(ctx context.Context, key fscache.CacheKey, value fscache.Data, size int64) {
// events
LogEvent(ctx, str_memory_cache_post_set_begin)
defer LogEvent(ctx, str_memory_cache_post_set_end)

// retain
value.Retain()

// metrics
LogEvent(ctx, str_update_metrics_begin)
inuseBytes.Add(float64(size))
capacityBytes.Set(float64(capacityFunc()))
value.Retain()
LogEvent(ctx, str_update_metrics_end)

// callbacks
if callbacks != nil {
LogEvent(ctx, str_memory_cache_callbacks_begin)
for _, fn := range callbacks.PostSet {
fn(key, value)
}
LogEvent(ctx, str_memory_cache_callbacks_end)
}
}

postGetFn := func(ctx context.Context, key fscache.CacheKey, value fscache.Data, size int64) {
// events
LogEvent(ctx, str_memory_cache_post_get_begin)
defer LogEvent(ctx, str_memory_cache_post_get_end)

// retain
value.Retain()

// callbacks
if callbacks != nil {
LogEvent(ctx, str_memory_cache_callbacks_begin)
for _, fn := range callbacks.PostGet {
fn(key, value)
}
LogEvent(ctx, str_memory_cache_callbacks_end)
}
}

postEvictFn := func(ctx context.Context, key fscache.CacheKey, value fscache.Data, size int64) {
// events
LogEvent(ctx, str_memory_cache_post_evict_begin)
defer LogEvent(ctx, str_memory_cache_post_evict_end)

// relaese
value.Release()

// metrics
LogEvent(ctx, str_update_metrics_begin)
inuseBytes.Add(float64(-size))
capacityBytes.Set(float64(capacityFunc()))
value.Release()
LogEvent(ctx, str_update_metrics_end)

// callbacks
if callbacks != nil {
LogEvent(ctx, str_memory_cache_callbacks_begin)
for _, fn := range callbacks.PostEvict {
fn(key, value)
}
LogEvent(ctx, str_memory_cache_callbacks_end)
}
}

Expand Down Expand Up @@ -176,7 +208,9 @@ func (m *MemCache) Update(
Sz: entry.Size,
}

LogEvent(ctx, str_set_memory_cache_entry_begin)
m.cache.Set(ctx, key, entry.CachedData)
LogEvent(ctx, str_set_memory_cache_entry_end)
}
return nil
}
Expand Down
64 changes: 37 additions & 27 deletions pkg/fileservice/mem_cache_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -168,22 +168,27 @@ func BenchmarkMemoryCacheUpdate(b *testing.B) {
)
defer cache.Flush(ctx)

for i := range b.N {
vec := &IOVector{
FilePath: fmt.Sprintf("%d", i),
Entries: []IOEntry{
{
Data: []byte("a"),
Size: 1,
CachedData: DefaultCacheDataAllocator().AllocateCacheData(ctx, 1),
b.ResetTimer()

b.RunParallel(func(pb *testing.PB) {
for i := 0; pb.Next(); i++ {
vec := &IOVector{
FilePath: fmt.Sprintf("%d", i),
Entries: []IOEntry{
{
Data: []byte("a"),
Size: 1,
CachedData: DefaultCacheDataAllocator().AllocateCacheData(ctx, 1),
},
},
},
}
if err := cache.Update(ctx, vec, false); err != nil {
b.Fatal(err)
}
if err := cache.Update(ctx, vec, false); err != nil {
b.Fatal(err)
}
vec.Release()
}
vec.Release()
}
})

}

func BenchmarkMemoryCacheRead(b *testing.B) {
Expand Down Expand Up @@ -212,21 +217,26 @@ func BenchmarkMemoryCacheRead(b *testing.B) {
}
vec.Release()

for range b.N {
vec := &IOVector{
FilePath: "foo",
Entries: []IOEntry{
{
Size: 1,
ToCacheData: CacheOriginalData,
b.ResetTimer()

b.RunParallel(func(pb *testing.PB) {
for pb.Next() {
vec := &IOVector{
FilePath: "foo",
Entries: []IOEntry{
{
Size: 1,
ToCacheData: CacheOriginalData,
},
},
},
}
if err := cache.Read(ctx, vec); err != nil {
b.Fatal(err)
}
if err := cache.Read(ctx, vec); err != nil {
b.Fatal(err)
}
vec.Release()
}
vec.Release()
}
})

}

func TestMemoryCacheGlobalSizeHint(t *testing.T) {
Expand Down
4 changes: 2 additions & 2 deletions pkg/fileservice/s3_fs.go
Original file line number Diff line number Diff line change
Expand Up @@ -523,9 +523,9 @@ read_memory_cache:
return
}
t0 := time.Now()
LogEvent(ctx, str_update_memory_cache_Caches_begin)
LogEvent(ctx, str_update_memory_cache_begin)
err = s.memCache.Update(ctx, vector, s.asyncUpdate)
LogEvent(ctx, str_update_memory_cache_Caches_end)
LogEvent(ctx, str_update_memory_cache_end)
metric.FSReadDurationUpdateMemoryCache.Observe(time.Since(t0).Seconds())
}()
}
Expand Down
14 changes: 12 additions & 2 deletions pkg/fileservice/string.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,8 +27,8 @@ var (
str_update_vector_Caches_end = internString("update vector.Caches end")
str_read_memory_cache_Caches_begin = internString("read memory cache begin")
str_read_memory_cache_Caches_end = internString("read memory cache end")
str_update_memory_cache_Caches_begin = internString("update memory cache begin")
str_update_memory_cache_Caches_end = internString("update memory cache end")
str_update_memory_cache_begin = internString("update memory cache begin")
str_update_memory_cache_end = internString("update memory cache end")
str_read_disk_cache_Caches_begin = internString("read disk cache begin")
str_read_disk_cache_Caches_end = internString("read disk cache end")
str_update_disk_cache_Caches_begin = internString("update disk cache begin")
Expand Down Expand Up @@ -83,6 +83,16 @@ var (
str_prepareData_end = internString("prepareData end")
str_WriterForRead_Write_begin = internString("WriterForRead.Write begin")
str_WriterForRead_Write_end = internString("WriterForRead.Write end")
str_set_memory_cache_entry_begin = internString("set memory cache entry begin")
str_set_memory_cache_entry_end = internString("set memory cache entry end")
str_memory_cache_post_set_begin = internString("memory cache postSet begin")
str_memory_cache_post_set_end = internString("memory cache postSet end")
str_memory_cache_post_get_begin = internString("memory cache postGet begin")
str_memory_cache_post_get_end = internString("memory cache postGet end")
str_memory_cache_post_evict_begin = internString("memory cache postEvict begin")
str_memory_cache_post_evict_end = internString("memory cache postEvict end")
str_memory_cache_callbacks_begin = internString("memory cache callbacks begin")
str_memory_cache_callbacks_end = internString("memory cache callbacks end")
)

type stringRef struct {
Expand Down

0 comments on commit ca798fc

Please sign in to comment.