diff --git a/common/headers/caller_info.go b/common/headers/caller_info.go index 40d3eb98688..387bdc95144 100644 --- a/common/headers/caller_info.go +++ b/common/headers/caller_info.go @@ -27,6 +27,10 @@ var ( ) var ( + SystemOperatorCallerInfo = CallerInfo{ + CallerName: CallerNameSystem, + CallerType: CallerTypeOperator, + } SystemBackgroundHighCallerInfo = CallerInfo{ CallerName: CallerNameSystem, CallerType: CallerTypeBackgroundHigh, diff --git a/common/persistence/persistence-tests/persistence_test_base.go b/common/persistence/persistence-tests/persistence_test_base.go index d32aecfdaa9..9f29638b4a4 100644 --- a/common/persistence/persistence-tests/persistence_test_base.go +++ b/common/persistence/persistence-tests/persistence_test_base.go @@ -226,7 +226,12 @@ func (s *TestBase) Setup(clusterMetadataConfig *cluster.Config) { s.fatalOnError("NewClusterMetadataManager", err) s.ClusterMetadata = cluster.NewMetadataFromConfig(clusterMetadataConfig, s.ClusterMetadataManager, dynamicconfig.NewNoopCollection(), s.Logger) - s.SearchAttributesManager = searchattribute.NewManager(clock.NewRealTimeSource(), s.ClusterMetadataManager, dynamicconfig.GetBoolPropertyFn(true)) + s.SearchAttributesManager = searchattribute.NewManager( + clock.NewRealTimeSource(), + s.ClusterMetadataManager, + s.Logger, + dynamicconfig.GetBoolPropertyFn(true), + ) s.MetadataManager, err = factory.NewMetadataManager() s.fatalOnError("NewMetadataManager", err) diff --git a/common/resource/fx.go b/common/resource/fx.go index 30a0541f58a..252f062aa52 100644 --- a/common/resource/fx.go +++ b/common/resource/fx.go @@ -158,6 +158,7 @@ func SearchAttributeMapperProviderProvider( } func SearchAttributeProviderProvider( + logger log.SnTaggedLogger, timeSource clock.TimeSource, cmMgr persistence.ClusterMetadataManager, dynamicCollection *dynamicconfig.Collection, @@ -165,10 +166,12 @@ func SearchAttributeProviderProvider( return searchattribute.NewManager( timeSource, cmMgr, + logger, dynamicconfig.ForceSearchAttributesCacheRefreshOnRead.Get(dynamicCollection)) } func SearchAttributeManagerProvider( + logger log.SnTaggedLogger, timeSource clock.TimeSource, cmMgr persistence.ClusterMetadataManager, dynamicCollection *dynamicconfig.Collection, @@ -176,6 +179,7 @@ func SearchAttributeManagerProvider( return searchattribute.NewManager( timeSource, cmMgr, + logger, dynamicconfig.ForceSearchAttributesCacheRefreshOnRead.Get(dynamicCollection)) } diff --git a/common/searchattribute/manager.go b/common/searchattribute/manager.go index 69afc8bef74..82bd54a63b0 100644 --- a/common/searchattribute/manager.go +++ b/common/searchattribute/manager.go @@ -3,6 +3,7 @@ package searchattribute import ( "context" "maps" + "math/rand" "sync" "sync/atomic" "time" @@ -13,16 +14,21 @@ import ( "go.temporal.io/server/common/clock" "go.temporal.io/server/common/dynamicconfig" "go.temporal.io/server/common/headers" + "go.temporal.io/server/common/log" + "go.temporal.io/server/common/log/tag" "go.temporal.io/server/common/persistence" ) const ( + cacheRefreshTimeout = 5 * time.Second cacheRefreshInterval = 60 * time.Second cacheRefreshIfUnavailableInterval = 20 * time.Second + cacheRefreshColdInterval = 1 * time.Second ) type ( managerImpl struct { + logger log.Logger timeSource clock.TimeSource clusterMetadataManager persistence.ClusterMetadataManager forceRefresh dynamicconfig.BoolPropertyFn @@ -44,9 +50,9 @@ var _ Manager = (*managerImpl)(nil) func NewManager( timeSource clock.TimeSource, clusterMetadataManager persistence.ClusterMetadataManager, + logger log.Logger, forceRefresh dynamicconfig.BoolPropertyFn, ) *managerImpl { - var saCache atomic.Value saCache.Store(cache{ searchAttributes: map[string]NameTypeMap{}, @@ -55,6 +61,7 @@ func NewManager( }) return &managerImpl{ + logger: logger, timeSource: timeSource, cache: saCache, clusterMetadataManager: clusterMetadataManager, @@ -68,44 +75,15 @@ func (m *managerImpl) GetSearchAttributes( indexName string, forceRefreshCache bool, ) (NameTypeMap, error) { - now := m.timeSource.Now() - saCache := m.cache.Load().(cache) - - if m.needRefreshCache(saCache, forceRefreshCache, now) { - m.cacheUpdateMutex.Lock() - saCache = m.cache.Load().(cache) - if m.needRefreshCache(saCache, forceRefreshCache, now) { - var err error - saCache, err = m.refreshCache(saCache, now) - if err != nil { - m.cacheUpdateMutex.Unlock() - return NameTypeMap{}, err - } - } - m.cacheUpdateMutex.Unlock() - } - result := NameTypeMap{} - indexSearchAttributes, ok := saCache.searchAttributes[indexName] - if ok { - result.customSearchAttributes = maps.Clone(indexSearchAttributes.customSearchAttributes) + saCache, err := m.refreshCache(forceRefreshCache, now) + if err != nil { + m.logger.Error("failed to refresh search attributes cache", tag.Error(err)) + return result, err } - - // TODO (rodrigozhou): remove following block for v1.21. - // Try to look for the empty string indexName for backward compatibility: up to v1.19, - // empty string was used when Elasticsearch was not configured. - // If there's a value, merging with current index name value. This is to avoid handling - // all code references to GetSearchAttributes. - if indexName != "" { - indexSearchAttributes, ok = saCache.searchAttributes[""] - if ok { - if result.customSearchAttributes == nil { - result.customSearchAttributes = maps.Clone(indexSearchAttributes.customSearchAttributes) - } else { - maps.Copy(result.customSearchAttributes, indexSearchAttributes.customSearchAttributes) - } - } + if indexSearchAttributes, ok := saCache.searchAttributes[indexName]; ok { + result.customSearchAttributes = maps.Clone(indexSearchAttributes.customSearchAttributes) } return result, nil } @@ -114,12 +92,33 @@ func (m *managerImpl) needRefreshCache(saCache cache, forceRefreshCache bool, no return forceRefreshCache || saCache.expireOn.Before(now) || m.forceRefresh() } -func (m *managerImpl) refreshCache(saCache cache, now time.Time) (cache, error) { - // TODO: specify a timeout for the context - ctx := headers.SetCallerInfo( - context.TODO(), - headers.SystemBackgroundHighCallerInfo, - ) +func (m *managerImpl) refreshCache(forceRefreshCache bool, now time.Time) (cache, error) { + //nolint:revive // cache value is always of type `cache` + saCache := m.cache.Load().(cache) + if !m.needRefreshCache(saCache, forceRefreshCache, now) { + return saCache, nil + } + + m.cacheUpdateMutex.Lock() + defer m.cacheUpdateMutex.Unlock() + //nolint:revive // cache value is always of type `cache` + saCache = m.cache.Load().(cache) + if !m.needRefreshCache(saCache, forceRefreshCache, now) { + return saCache, nil + } + + return m.refreshCacheLocked(saCache, now) +} + +func (m *managerImpl) refreshCacheLocked(saCache cache, now time.Time) (cache, error) { + ctx, cancel := context.WithTimeout(context.Background(), cacheRefreshTimeout) + defer cancel() + if saCache.dbVersion == 0 { + // if cache is cold, use the highest priority caller + ctx = headers.SetCallerInfo(ctx, headers.SystemOperatorCallerInfo) + } else { + ctx = headers.SetCallerInfo(ctx, headers.SystemBackgroundHighCallerInfo) + } clusterMetadata, err := m.clusterMetadataManager.GetCurrentClusterMetadata(ctx) if err != nil { @@ -128,14 +127,21 @@ func (m *managerImpl) refreshCache(saCache cache, now time.Time) (cache, error) // NotFound means cluster metadata was never persisted and custom search attributes are not defined. // Ignore the error. saCache.expireOn = now.Add(cacheRefreshInterval) + err = nil case *serviceerror.Unavailable: - // If persistence is Unavailable, ignore the error and use existing cache for cacheRefreshIfUnavailableInterval. - saCache.expireOn = now.Add(cacheRefreshIfUnavailableInterval) - default: - return saCache, err + if saCache.dbVersion == 0 { + // If the cache is still cold, and persistence is Unavailable, retry more aggressively + // within cacheRefreshColdInterval. + saCache.expireOn = now.Add(time.Duration(rand.Int63n(int64(cacheRefreshColdInterval)))) + } else { + // If persistence is Unavailable, but cache was loaded at least once, then ignore the error + // and use existing cache for cacheRefreshIfUnavailableInterval. + saCache.expireOn = now.Add(cacheRefreshIfUnavailableInterval) + err = nil + } } m.cache.Store(saCache) - return saCache, nil + return saCache, err } // clusterMetadata.Version <= saCache.dbVersion means DB is not changed. diff --git a/common/searchattribute/manager_test.go b/common/searchattribute/manager_test.go index bcc0ca3b20d..f5879b3164f 100644 --- a/common/searchattribute/manager_test.go +++ b/common/searchattribute/manager_test.go @@ -13,8 +13,8 @@ import ( "go.temporal.io/api/serviceerror" persistencespb "go.temporal.io/server/api/persistence/v1" "go.temporal.io/server/common/clock" - "go.temporal.io/server/common/log" "go.temporal.io/server/common/persistence" + "go.temporal.io/server/common/testing/testlogger" "go.uber.org/mock/gomock" ) @@ -25,7 +25,7 @@ type ( controller *gomock.Controller - logger log.Logger + logger *testlogger.TestLogger timeSource *clock.EventTimeSource mockClusterMetadataManager *persistence.MockClusterMetadataManager manager *managerImpl @@ -47,12 +47,13 @@ func (s *searchAttributesManagerSuite) TearDownSuite() { func (s *searchAttributesManagerSuite) SetupTest() { s.Assertions = require.New(s.T()) s.controller = gomock.NewController(s.T()) - s.logger = log.NewTestLogger() + s.logger = testlogger.NewTestLogger(s.T(), testlogger.FailOnAnyUnexpectedError) s.timeSource = clock.NewEventTimeSource() s.mockClusterMetadataManager = persistence.NewMockClusterMetadataManager(s.controller) s.manager = NewManager( s.timeSource, s.mockClusterMetadataManager, + s.logger, func() bool { return s.forceCacheRefresh }, @@ -127,6 +128,7 @@ func (s *searchAttributesManagerSuite) TestGetSearchAttributesCache_Error() { s.timeSource.Update(time.Date(2020, 8, 22, 1, 0, 0, 0, time.UTC)) // Initial call s.mockClusterMetadataManager.EXPECT().GetCurrentClusterMetadata(gomock.Any()).Return(nil, errors.New("random error")) + s.logger.Expect(testlogger.Error, "failed to refresh search attributes") searchAttributes, err := s.manager.GetSearchAttributes("index-name", false) s.Error(err) s.Len(searchAttributes.Custom(), 0) @@ -149,7 +151,17 @@ func (s *searchAttributesManagerSuite) TestGetSearchAttributesCache_NotFoundErro func (s *searchAttributesManagerSuite) TestGetSearchAttributesCache_UnavailableError() { s.timeSource.Update(time.Date(2020, 8, 22, 1, 0, 0, 0, time.UTC)) - // First call populates cache. + // First call: DB is down, cache is cold + s.mockClusterMetadataManager.EXPECT().GetCurrentClusterMetadata(gomock.Any()).Return(nil, serviceerror.NewUnavailable("db is down")) + s.logger.Expect(testlogger.Error, "failed to refresh search attributes") + searchAttributes, err := s.manager.GetSearchAttributes("index-name", false) + s.Error(err) + s.Len(searchAttributes.Custom(), 0) + + // Move time forward + s.timeSource.Update(time.Date(2020, 8, 22, 1, 1, 0, 0, time.UTC)) + + // Second call populates cache. s.mockClusterMetadataManager.EXPECT().GetCurrentClusterMetadata(gomock.Any()).Return(&persistence.GetClusterMetadataResponse{ ClusterMetadata: &persistencespb.ClusterMetadata{ IndexSearchAttributes: map[string]*persistencespb.IndexSearchAttributes{ @@ -160,14 +172,14 @@ func (s *searchAttributesManagerSuite) TestGetSearchAttributesCache_UnavailableE }, Version: 1, }, nil) - searchAttributes, err := s.manager.GetSearchAttributes("index-name", false) + searchAttributes, err = s.manager.GetSearchAttributes("index-name", false) s.NoError(err) s.Len(searchAttributes.Custom(), 1) // Expire cache. s.timeSource.Update(time.Date(2020, 8, 22, 2, 0, 0, 0, time.UTC)) - // Second call, cache is expired, DB is down, but cache data is returned. + // Third call, cache is expired, DB is down, but cache data is returned. s.mockClusterMetadataManager.EXPECT().GetCurrentClusterMetadata(gomock.Any()).Return(nil, serviceerror.NewUnavailable("db is down")) searchAttributes, err = s.manager.GetSearchAttributes("index-name", false) s.NoError(err) diff --git a/temporal/fx.go b/temporal/fx.go index bf10740e355..a21664003cc 100644 --- a/temporal/fx.go +++ b/temporal/fx.go @@ -630,7 +630,7 @@ func ApplyClusterMetadataConfigProvider( tag.ClusterName(clusterMetadata.CurrentClusterName)) return svc.ClusterMetadata, svc.Persistence, missingCurrentClusterMetadataErr } - ctx = headers.SetCallerInfo(ctx, headers.SystemBackgroundHighCallerInfo) + ctx = headers.SetCallerInfo(ctx, headers.SystemOperatorCallerInfo) resp, err := clusterMetadataManager.GetClusterMetadata( ctx, &persistence.GetClusterMetadataRequest{ClusterName: clusterMetadata.CurrentClusterName},