Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fileservice: more logs for memory cache updating #20889

Merged
merged 2 commits into from
Dec 25, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
38 changes: 36 additions & 2 deletions pkg/fileservice/mem_cache.go
Original file line number Diff line number Diff line change
Expand Up @@ -48,36 +48,68 @@ func NewMemCache(
}

postSetFn := func(ctx context.Context, key fscache.CacheKey, value fscache.Data, size int64) {
// events
LogEvent(ctx, str_memory_cache_post_set_begin)
defer LogEvent(ctx, str_memory_cache_post_set_end)

// retain
value.Retain()

// metrics
LogEvent(ctx, str_update_metrics_begin)
inuseBytes.Add(float64(size))
capacityBytes.Set(float64(capacityFunc()))
value.Retain()
LogEvent(ctx, str_update_metrics_end)

// callbacks
if callbacks != nil {
LogEvent(ctx, str_memory_cache_callbacks_begin)
for _, fn := range callbacks.PostSet {
fn(key, value)
}
LogEvent(ctx, str_memory_cache_callbacks_end)
}
}

postGetFn := func(ctx context.Context, key fscache.CacheKey, value fscache.Data, size int64) {
// events
LogEvent(ctx, str_memory_cache_post_get_begin)
defer LogEvent(ctx, str_memory_cache_post_get_end)

// retain
value.Retain()

// callbacks
if callbacks != nil {
LogEvent(ctx, str_memory_cache_callbacks_begin)
for _, fn := range callbacks.PostGet {
fn(key, value)
}
LogEvent(ctx, str_memory_cache_callbacks_end)
}
}

postEvictFn := func(ctx context.Context, key fscache.CacheKey, value fscache.Data, size int64) {
// events
LogEvent(ctx, str_memory_cache_post_evict_begin)
defer LogEvent(ctx, str_memory_cache_post_evict_end)

// relaese
value.Release()

// metrics
LogEvent(ctx, str_update_metrics_begin)
inuseBytes.Add(float64(-size))
capacityBytes.Set(float64(capacityFunc()))
value.Release()
LogEvent(ctx, str_update_metrics_end)

// callbacks
if callbacks != nil {
LogEvent(ctx, str_memory_cache_callbacks_begin)
for _, fn := range callbacks.PostEvict {
fn(key, value)
}
LogEvent(ctx, str_memory_cache_callbacks_end)
}
}

Expand Down Expand Up @@ -176,7 +208,9 @@ func (m *MemCache) Update(
Sz: entry.Size,
}

LogEvent(ctx, str_set_memory_cache_entry_begin)
m.cache.Set(ctx, key, entry.CachedData)
LogEvent(ctx, str_set_memory_cache_entry_end)
}
return nil
}
Expand Down
64 changes: 37 additions & 27 deletions pkg/fileservice/mem_cache_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -168,22 +168,27 @@ func BenchmarkMemoryCacheUpdate(b *testing.B) {
)
defer cache.Flush(ctx)

for i := range b.N {
vec := &IOVector{
FilePath: fmt.Sprintf("%d", i),
Entries: []IOEntry{
{
Data: []byte("a"),
Size: 1,
CachedData: DefaultCacheDataAllocator().AllocateCacheData(ctx, 1),
b.ResetTimer()

b.RunParallel(func(pb *testing.PB) {
for i := 0; pb.Next(); i++ {
vec := &IOVector{
FilePath: fmt.Sprintf("%d", i),
Entries: []IOEntry{
{
Data: []byte("a"),
Size: 1,
CachedData: DefaultCacheDataAllocator().AllocateCacheData(ctx, 1),
},
},
},
}
if err := cache.Update(ctx, vec, false); err != nil {
b.Fatal(err)
}
if err := cache.Update(ctx, vec, false); err != nil {
b.Fatal(err)
}
vec.Release()
}
vec.Release()
}
})

}

func BenchmarkMemoryCacheRead(b *testing.B) {
Expand Down Expand Up @@ -212,21 +217,26 @@ func BenchmarkMemoryCacheRead(b *testing.B) {
}
vec.Release()

for range b.N {
vec := &IOVector{
FilePath: "foo",
Entries: []IOEntry{
{
Size: 1,
ToCacheData: CacheOriginalData,
b.ResetTimer()

b.RunParallel(func(pb *testing.PB) {
for pb.Next() {
vec := &IOVector{
FilePath: "foo",
Entries: []IOEntry{
{
Size: 1,
ToCacheData: CacheOriginalData,
},
},
},
}
if err := cache.Read(ctx, vec); err != nil {
b.Fatal(err)
}
if err := cache.Read(ctx, vec); err != nil {
b.Fatal(err)
}
vec.Release()
}
vec.Release()
}
})

}

func TestMemoryCacheGlobalSizeHint(t *testing.T) {
Expand Down
4 changes: 2 additions & 2 deletions pkg/fileservice/s3_fs.go
Original file line number Diff line number Diff line change
Expand Up @@ -523,9 +523,9 @@ read_memory_cache:
return
}
t0 := time.Now()
LogEvent(ctx, str_update_memory_cache_Caches_begin)
LogEvent(ctx, str_update_memory_cache_begin)
err = s.memCache.Update(ctx, vector, s.asyncUpdate)
LogEvent(ctx, str_update_memory_cache_Caches_end)
LogEvent(ctx, str_update_memory_cache_end)
metric.FSReadDurationUpdateMemoryCache.Observe(time.Since(t0).Seconds())
}()
}
Expand Down
14 changes: 12 additions & 2 deletions pkg/fileservice/string.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,8 +27,8 @@ var (
str_update_vector_Caches_end = internString("update vector.Caches end")
str_read_memory_cache_Caches_begin = internString("read memory cache begin")
str_read_memory_cache_Caches_end = internString("read memory cache end")
str_update_memory_cache_Caches_begin = internString("update memory cache begin")
str_update_memory_cache_Caches_end = internString("update memory cache end")
str_update_memory_cache_begin = internString("update memory cache begin")
str_update_memory_cache_end = internString("update memory cache end")
str_read_disk_cache_Caches_begin = internString("read disk cache begin")
str_read_disk_cache_Caches_end = internString("read disk cache end")
str_update_disk_cache_Caches_begin = internString("update disk cache begin")
Expand Down Expand Up @@ -83,6 +83,16 @@ var (
str_prepareData_end = internString("prepareData end")
str_WriterForRead_Write_begin = internString("WriterForRead.Write begin")
str_WriterForRead_Write_end = internString("WriterForRead.Write end")
str_set_memory_cache_entry_begin = internString("set memory cache entry begin")
str_set_memory_cache_entry_end = internString("set memory cache entry end")
str_memory_cache_post_set_begin = internString("memory cache postSet begin")
str_memory_cache_post_set_end = internString("memory cache postSet end")
str_memory_cache_post_get_begin = internString("memory cache postGet begin")
str_memory_cache_post_get_end = internString("memory cache postGet end")
str_memory_cache_post_evict_begin = internString("memory cache postEvict begin")
str_memory_cache_post_evict_end = internString("memory cache postEvict end")
str_memory_cache_callbacks_begin = internString("memory cache callbacks begin")
str_memory_cache_callbacks_end = internString("memory cache callbacks end")
)

type stringRef struct {
Expand Down
Loading