Skip to content
Merged
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 4 additions & 0 deletions common/headers/caller_info.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,10 @@ var (
)

var (
SystemOperatorCallerInfo = CallerInfo{
CallerName: CallerNameSystem,
CallerType: CallerTypeOperator,
}
SystemBackgroundHighCallerInfo = CallerInfo{
CallerName: CallerNameSystem,
CallerType: CallerTypeBackgroundHigh,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -226,7 +226,12 @@ func (s *TestBase) Setup(clusterMetadataConfig *cluster.Config) {
s.fatalOnError("NewClusterMetadataManager", err)

s.ClusterMetadata = cluster.NewMetadataFromConfig(clusterMetadataConfig, s.ClusterMetadataManager, dynamicconfig.NewNoopCollection(), s.Logger)
s.SearchAttributesManager = searchattribute.NewManager(clock.NewRealTimeSource(), s.ClusterMetadataManager, dynamicconfig.GetBoolPropertyFn(true))
s.SearchAttributesManager = searchattribute.NewManager(
clock.NewRealTimeSource(),
s.ClusterMetadataManager,
s.Logger,
dynamicconfig.GetBoolPropertyFn(true),
)

s.MetadataManager, err = factory.NewMetadataManager()
s.fatalOnError("NewMetadataManager", err)
Expand Down
4 changes: 4 additions & 0 deletions common/resource/fx.go
Original file line number Diff line number Diff line change
Expand Up @@ -158,24 +158,28 @@ func SearchAttributeMapperProviderProvider(
}

func SearchAttributeProviderProvider(
logger log.SnTaggedLogger,
timeSource clock.TimeSource,
cmMgr persistence.ClusterMetadataManager,
dynamicCollection *dynamicconfig.Collection,
) searchattribute.Provider {
return searchattribute.NewManager(
timeSource,
cmMgr,
logger,
dynamicconfig.ForceSearchAttributesCacheRefreshOnRead.Get(dynamicCollection))
}

func SearchAttributeManagerProvider(
logger log.SnTaggedLogger,
timeSource clock.TimeSource,
cmMgr persistence.ClusterMetadataManager,
dynamicCollection *dynamicconfig.Collection,
) searchattribute.Manager {
return searchattribute.NewManager(
timeSource,
cmMgr,
logger,
dynamicconfig.ForceSearchAttributesCacheRefreshOnRead.Get(dynamicCollection))
}

Expand Down
98 changes: 51 additions & 47 deletions common/searchattribute/manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ package searchattribute
import (
"context"
"maps"
"math/rand"
"sync"
"sync/atomic"
"time"
Expand All @@ -13,16 +14,21 @@ import (
"go.temporal.io/server/common/clock"
"go.temporal.io/server/common/dynamicconfig"
"go.temporal.io/server/common/headers"
"go.temporal.io/server/common/log"
"go.temporal.io/server/common/log/tag"
"go.temporal.io/server/common/persistence"
)

const (
cacheRefreshTimeout = 5 * time.Second
cacheRefreshInterval = 60 * time.Second
cacheRefreshIfUnavailableInterval = 20 * time.Second
cacheRefreshColdInterval = 1 * time.Second
)

type (
managerImpl struct {
logger log.Logger
timeSource clock.TimeSource
clusterMetadataManager persistence.ClusterMetadataManager
forceRefresh dynamicconfig.BoolPropertyFn
Expand All @@ -44,9 +50,9 @@ var _ Manager = (*managerImpl)(nil)
func NewManager(
timeSource clock.TimeSource,
clusterMetadataManager persistence.ClusterMetadataManager,
logger log.Logger,
forceRefresh dynamicconfig.BoolPropertyFn,
) *managerImpl {

var saCache atomic.Value
saCache.Store(cache{
searchAttributes: map[string]NameTypeMap{},
Expand All @@ -55,6 +61,7 @@ func NewManager(
})

return &managerImpl{
logger: logger,
timeSource: timeSource,
cache: saCache,
clusterMetadataManager: clusterMetadataManager,
Expand All @@ -68,44 +75,15 @@ func (m *managerImpl) GetSearchAttributes(
indexName string,
forceRefreshCache bool,
) (NameTypeMap, error) {

now := m.timeSource.Now()
saCache := m.cache.Load().(cache)

if m.needRefreshCache(saCache, forceRefreshCache, now) {
m.cacheUpdateMutex.Lock()
saCache = m.cache.Load().(cache)
if m.needRefreshCache(saCache, forceRefreshCache, now) {
var err error
saCache, err = m.refreshCache(saCache, now)
if err != nil {
m.cacheUpdateMutex.Unlock()
return NameTypeMap{}, err
}
}
m.cacheUpdateMutex.Unlock()
}

result := NameTypeMap{}
indexSearchAttributes, ok := saCache.searchAttributes[indexName]
if ok {
result.customSearchAttributes = maps.Clone(indexSearchAttributes.customSearchAttributes)
saCache, err := m.refreshCache(forceRefreshCache, now)
if err != nil {
m.logger.Error("failed to refresh search attributes cache", tag.Error(err))
return result, err
}

// TODO (rodrigozhou): remove following block for v1.21.
// Try to look for the empty string indexName for backward compatibility: up to v1.19,
// empty string was used when Elasticsearch was not configured.
// If there's a value, merging with current index name value. This is to avoid handling
// all code references to GetSearchAttributes.
if indexName != "" {
indexSearchAttributes, ok = saCache.searchAttributes[""]
if ok {
if result.customSearchAttributes == nil {
result.customSearchAttributes = maps.Clone(indexSearchAttributes.customSearchAttributes)
} else {
maps.Copy(result.customSearchAttributes, indexSearchAttributes.customSearchAttributes)
}
}
if indexSearchAttributes, ok := saCache.searchAttributes[indexName]; ok {
result.customSearchAttributes = maps.Clone(indexSearchAttributes.customSearchAttributes)
}
return result, nil
}
Expand All @@ -114,12 +92,31 @@ func (m *managerImpl) needRefreshCache(saCache cache, forceRefreshCache bool, no
return forceRefreshCache || saCache.expireOn.Before(now) || m.forceRefresh()
}

func (m *managerImpl) refreshCache(saCache cache, now time.Time) (cache, error) {
// TODO: specify a timeout for the context
ctx := headers.SetCallerInfo(
context.TODO(),
headers.SystemBackgroundHighCallerInfo,
)
func (m *managerImpl) refreshCache(forceRefreshCache bool, now time.Time) (cache, error) {
saCache := m.cache.Load().(cache)
if !m.needRefreshCache(saCache, forceRefreshCache, now) {
return saCache, nil
}

m.cacheUpdateMutex.Lock()
defer m.cacheUpdateMutex.Unlock()
saCache = m.cache.Load().(cache)
if !m.needRefreshCache(saCache, forceRefreshCache, now) {
return saCache, nil
}

return m.refreshCacheLocked(saCache, now)
}

func (m *managerImpl) refreshCacheLocked(saCache cache, now time.Time) (cache, error) {
ctx, cancel := context.WithTimeout(context.Background(), cacheRefreshTimeout)
defer cancel()
if saCache.dbVersion == 0 {
// if cache is cold, use the highest priority caller
ctx = headers.SetCallerInfo(ctx, headers.SystemOperatorCallerInfo)
} else {
ctx = headers.SetCallerInfo(ctx, headers.SystemBackgroundHighCallerInfo)
}

clusterMetadata, err := m.clusterMetadataManager.GetCurrentClusterMetadata(ctx)
if err != nil {
Expand All @@ -128,14 +125,21 @@ func (m *managerImpl) refreshCache(saCache cache, now time.Time) (cache, error)
// NotFound means cluster metadata was never persisted and custom search attributes are not defined.
// Ignore the error.
saCache.expireOn = now.Add(cacheRefreshInterval)
err = nil
case *serviceerror.Unavailable:
// If persistence is Unavailable, ignore the error and use existing cache for cacheRefreshIfUnavailableInterval.
saCache.expireOn = now.Add(cacheRefreshIfUnavailableInterval)
default:
return saCache, err
if saCache.dbVersion == 0 {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

If cache is cold, 1) default refresh interval should be much shorter, 1s or even shorter than that. 2) we should retry on all errors if cache is code, not just Unavailable error, like resource exhausted or timeout errors.

Copy link
Contributor Author

@rodrigozhou rodrigozhou Sep 8, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Reduced the interval to 1 second.
If there are any other error besides NotFound or Unavailable, it will retry immediately no matter if the cache is cold or not. This is the existing logic, and I didn't change.

// If the cache is still cold, and persistence is Unavailable, retry more aggressively
// within cacheRefreshColdInterval.
saCache.expireOn = now.Add(time.Duration(rand.Int63n(int64(cacheRefreshColdInterval))))
} else {
// If persistence is Unavailable, but cache was loaded at least once, then ignore the error
// and use existing cache for cacheRefreshIfUnavailableInterval.
saCache.expireOn = now.Add(cacheRefreshIfUnavailableInterval)
err = nil
}
}
m.cache.Store(saCache)
return saCache, nil
return saCache, err
}

// clusterMetadata.Version <= saCache.dbVersion means DB is not changed.
Expand Down
24 changes: 18 additions & 6 deletions common/searchattribute/manager_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,8 +13,8 @@ import (
"go.temporal.io/api/serviceerror"
persistencespb "go.temporal.io/server/api/persistence/v1"
"go.temporal.io/server/common/clock"
"go.temporal.io/server/common/log"
"go.temporal.io/server/common/persistence"
"go.temporal.io/server/common/testing/testlogger"
"go.uber.org/mock/gomock"
)

Expand All @@ -25,7 +25,7 @@ type (

controller *gomock.Controller

logger log.Logger
logger *testlogger.TestLogger
timeSource *clock.EventTimeSource
mockClusterMetadataManager *persistence.MockClusterMetadataManager
manager *managerImpl
Expand All @@ -47,12 +47,13 @@ func (s *searchAttributesManagerSuite) TearDownSuite() {
func (s *searchAttributesManagerSuite) SetupTest() {
s.Assertions = require.New(s.T())
s.controller = gomock.NewController(s.T())
s.logger = log.NewTestLogger()
s.logger = testlogger.NewTestLogger(s.T(), testlogger.FailOnAnyUnexpectedError)
s.timeSource = clock.NewEventTimeSource()
s.mockClusterMetadataManager = persistence.NewMockClusterMetadataManager(s.controller)
s.manager = NewManager(
s.timeSource,
s.mockClusterMetadataManager,
s.logger,
func() bool {
return s.forceCacheRefresh
},
Expand Down Expand Up @@ -127,6 +128,7 @@ func (s *searchAttributesManagerSuite) TestGetSearchAttributesCache_Error() {
s.timeSource.Update(time.Date(2020, 8, 22, 1, 0, 0, 0, time.UTC))
// Initial call
s.mockClusterMetadataManager.EXPECT().GetCurrentClusterMetadata(gomock.Any()).Return(nil, errors.New("random error"))
s.logger.Expect(testlogger.Error, "failed to refresh search attributes")
searchAttributes, err := s.manager.GetSearchAttributes("index-name", false)
s.Error(err)
s.Len(searchAttributes.Custom(), 0)
Expand All @@ -149,7 +151,17 @@ func (s *searchAttributesManagerSuite) TestGetSearchAttributesCache_NotFoundErro
func (s *searchAttributesManagerSuite) TestGetSearchAttributesCache_UnavailableError() {
s.timeSource.Update(time.Date(2020, 8, 22, 1, 0, 0, 0, time.UTC))

// First call populates cache.
// First call: DB is down, cache is cold
s.mockClusterMetadataManager.EXPECT().GetCurrentClusterMetadata(gomock.Any()).Return(nil, serviceerror.NewUnavailable("db is down"))
s.logger.Expect(testlogger.Error, "failed to refresh search attributes")
searchAttributes, err := s.manager.GetSearchAttributes("index-name", false)
s.Error(err)
s.Len(searchAttributes.Custom(), 0)

// Move time forward
s.timeSource.Update(time.Date(2020, 8, 22, 1, 1, 0, 0, time.UTC))

// Second call populates cache.
s.mockClusterMetadataManager.EXPECT().GetCurrentClusterMetadata(gomock.Any()).Return(&persistence.GetClusterMetadataResponse{
ClusterMetadata: &persistencespb.ClusterMetadata{
IndexSearchAttributes: map[string]*persistencespb.IndexSearchAttributes{
Expand All @@ -160,14 +172,14 @@ func (s *searchAttributesManagerSuite) TestGetSearchAttributesCache_UnavailableE
},
Version: 1,
}, nil)
searchAttributes, err := s.manager.GetSearchAttributes("index-name", false)
searchAttributes, err = s.manager.GetSearchAttributes("index-name", false)
s.NoError(err)
s.Len(searchAttributes.Custom(), 1)

// Expire cache.
s.timeSource.Update(time.Date(2020, 8, 22, 2, 0, 0, 0, time.UTC))

// Second call, cache is expired, DB is down, but cache data is returned.
// Third call, cache is expired, DB is down, but cache data is returned.
s.mockClusterMetadataManager.EXPECT().GetCurrentClusterMetadata(gomock.Any()).Return(nil, serviceerror.NewUnavailable("db is down"))
searchAttributes, err = s.manager.GetSearchAttributes("index-name", false)
s.NoError(err)
Expand Down
2 changes: 1 addition & 1 deletion temporal/fx.go
Original file line number Diff line number Diff line change
Expand Up @@ -630,7 +630,7 @@ func ApplyClusterMetadataConfigProvider(
tag.ClusterName(clusterMetadata.CurrentClusterName))
return svc.ClusterMetadata, svc.Persistence, missingCurrentClusterMetadataErr
}
ctx = headers.SetCallerInfo(ctx, headers.SystemBackgroundHighCallerInfo)
ctx = headers.SetCallerInfo(ctx, headers.SystemOperatorCallerInfo)
resp, err := clusterMetadataManager.GetClusterMetadata(
ctx,
&persistence.GetClusterMetadataRequest{ClusterName: clusterMetadata.CurrentClusterName},
Expand Down
Loading