@@ -138,20 +138,38 @@ void InMemoryRegistry::RunGcTask() {
138138 service_circuit_breaker_config_data_.CheckGc (min_gc_time);
139139}
140140
141- Service* InMemoryRegistry::GetOrCreateServiceInLock (const ServiceKey& service_key) {
141+ Service* InMemoryRegistry::CreateServiceInLock (const ServiceKey& service_key) {
142142 Service* service = NULL ;
143143 pthread_rwlock_wrlock (&rwlock_);
144144 std::map<ServiceKey, Service*>::iterator service_it = service_cache_.find (service_key);
145- if (service_it == service_cache_.end ()) {
146- service = new Service (service_key, ++next_service_id_);
147- service_cache_[service_key] = service;
148- } else {
145+ POLARIS_ASSERT (service_it == service_cache_.end ())
146+ service = new Service (service_key, ++next_service_id_);
147+ service_cache_[service_key] = service;
148+ pthread_rwlock_unlock (&rwlock_);
149+ return service;
150+ }
151+
152+ Service* InMemoryRegistry::GetServiceInLock (const ServiceKey& service_key) {
153+ Service* service = NULL ;
154+ pthread_rwlock_wrlock (&rwlock_);
155+ std::map<ServiceKey, Service*>::iterator service_it = service_cache_.find (service_key);
156+ if (service_it != service_cache_.end ()) {
149157 service = service_it->second ;
150158 }
151159 pthread_rwlock_unlock (&rwlock_);
152160 return service;
153161}
154162
163+ void InMemoryRegistry::DeleteServiceInLock (const ServiceKey& service_key) {
164+ pthread_rwlock_wrlock (&rwlock_);
165+ std::map<ServiceKey, Service*>::iterator service_it = service_cache_.find (service_key);
166+ if (service_it != service_cache_.end ()) {
167+ delete service_it->second ;
168+ service_cache_.erase (service_it);
169+ }
170+ pthread_rwlock_unlock (&rwlock_);
171+ }
172+
155173void InMemoryRegistry::CheckExpireServiceData (uint64_t min_access_time,
156174 RcuMap<ServiceKey, ServiceData>& rcu_cache,
157175 ServiceDataType service_data_type) {
@@ -166,16 +184,16 @@ void InMemoryRegistry::CheckExpireServiceData(uint64_t min_access_time,
166184 if (service_data_notify_map_.erase (service_key_with_type) > 0 ) { // 有通知对象表示注册过handler
167185 context_->GetServerConnector ()->DeregisterEventHandler (expired_services[i],
168186 service_data_type);
169- } else { // 没有通知对象,表示未注册过handler,从磁盘加载后从未访问过的数据,直接删除数据
170- rcu_cache.Delete (expired_services[i]);
171- context_impl->GetServiceRecord ()->ServiceDataDelete (expired_services[i], service_data_type);
172- context_impl->GetCacheManager ()->GetCachePersist ().PersistServiceData (expired_services[i],
173- service_data_type, " " );
174187 }
175- pthread_rwlock_unlock (¬ify_rwlock_);
176188 if (service_data_type == kServiceDataInstances ) { // 清除实例数据时对应的服务级别插件也删除
177189 context_impl->DeleteServiceContext (expired_services[i]);
190+ DeleteServiceInLock (expired_services[i]);
178191 }
192+ rcu_cache.Delete (expired_services[i]);
193+ context_impl->GetServiceRecord ()->ServiceDataDelete (expired_services[i], service_data_type);
194+ context_impl->GetCacheManager ()->GetCachePersist ().PersistServiceData (expired_services[i],
195+ service_data_type, " " );
196+ pthread_rwlock_unlock (¬ify_rwlock_);
179197 }
180198}
181199
@@ -248,6 +266,9 @@ ReturnCode InMemoryRegistry::LoadServiceDataWithNotify(const ServiceKey& service
248266 if (interval_it != service_interval_map_.end ()) {
249267 refresh_interval = interval_it->second ;
250268 }
269+ if (data_type == kServiceDataInstances ) {
270+ CreateServiceInLock (service_key);
271+ }
251272 // 先加载磁盘缓存数据
252273 CachePersist& cache_persist = context_->GetContextImpl ()->GetCacheManager ()->GetCachePersist ();
253274 ServiceData* disk_service_data = cache_persist.LoadServiceData (service_key, data_type);
@@ -268,25 +289,21 @@ ReturnCode InMemoryRegistry::LoadServiceDataWithNotify(const ServiceKey& service
268289 return kReturnOk ;
269290}
270291
271- void InMemoryRegistry::DeleteServiceInLock (const ServiceKey& service_key) {
272- pthread_rwlock_wrlock (&rwlock_);
273- std::map<ServiceKey, Service*>::iterator service_it = service_cache_.find (service_key);
274- if (service_it != service_cache_.end ()) {
275- delete service_it->second ;
276- service_cache_.erase (service_it);
277- }
278- pthread_rwlock_unlock (&rwlock_);
279- }
280-
281292ReturnCode InMemoryRegistry::UpdateServiceData (const ServiceKey& service_key,
282293 ServiceDataType data_type,
283294 ServiceData* service_data) {
284- if (service_data != NULL ) { // 更新服务数据指向服务
285- Service* service = GetOrCreateServiceInLock (service_key);
295+ Service* service = GetServiceInLock (service_key);
296+ if ( service != NULL ) { // 更新服务数据指向服务
286297 service->UpdateData (service_data);
287298 }
288299 ContextImpl* context_impl = context_->GetContextImpl ();
289300 if (data_type == kServiceDataInstances ) {
301+ if (service == NULL ) { // 服务被反注册了
302+ if (service_data != NULL ) {
303+ service_data->DecrementRef ();
304+ }
305+ return kReturnOk ;
306+ }
290307 ServiceData* old_service_data = service_instances_data_.Get (service_key);
291308 if (old_service_data != NULL ) {
292309 PluginManager::Instance ().OnPreUpdateServiceData (old_service_data, service_data);
@@ -307,13 +324,6 @@ ReturnCode InMemoryRegistry::UpdateServiceData(const ServiceKey& service_key,
307324 POLARIS_ASSERT (false );
308325 }
309326 if (service_data == NULL ) { // Server Connector反注册Handler触发更新为NULL
310- if (data_type == kServiceDataInstances ) { // 删除服务实例数据时,同时删除服务
311- DeleteServiceInLock (service_key);
312- }
313- context_impl->GetServiceRecord ()->ServiceDataDelete (service_key,
314- data_type); // 同步记录Service数据删除
315- context_impl->GetCacheManager ()->GetCachePersist ().PersistServiceData (
316- service_key, data_type, " " ); // 异步删除磁盘服务数据
317327 return kReturnOk ;
318328 }
319329 context_impl->GetServiceRecord ()->ServiceDataUpdate (service_data); // 同步记录Service版本变化
@@ -389,4 +399,45 @@ ReturnCode InMemoryRegistry::UpdateSetCircuitBreakerData(
389399 return service->WriteCircuitBreakerUnhealthySets (unhealthy_sets);
390400}
391401
402+ ReturnCode InMemoryRegistry::GetCircuitBreakerInstances (const ServiceKey& service_key,
403+ ServiceData*& service_data,
404+ std::vector<Instance*>& open_instances) {
405+ service_data = service_instances_data_.Get (service_key, false );
406+ if (service_data == NULL ) {
407+ return kReturnServiceNotFound ;
408+ }
409+ if (service_data->GetDataStatus () < kDataIsSyncing ) {
410+ service_data->DecrementRef ();
411+ return kReturnServiceNotFound ;
412+ }
413+ // 由于此处获取service data没有更新访问时间,服务可能淘汰,不能直接使用其关联的服务数据
414+ pthread_rwlock_rdlock (&rwlock_);
415+ std::map<ServiceKey, Service*>::iterator service_it = service_cache_.find (service_key);
416+ if (service_it == service_cache_.end ()) {
417+ pthread_rwlock_unlock (&rwlock_);
418+ return kReturnServiceNotFound ;
419+ }
420+ std::set<std::string> open_instance = service_it->second ->GetCircuitBreakerOpenInstances ();
421+ pthread_rwlock_unlock (&rwlock_);
422+
423+ ServiceInstances service_instances (service_data);
424+ std::map<std::string, Instance*>& instance_map = service_instances.GetInstances ();
425+ for (std::set<std::string>::iterator it = open_instance.begin (); it != open_instance.end ();
426+ ++it) {
427+ const std::string& instance_id = *it;
428+ std::map<std::string, Instance*>::iterator iter = instance_map.find (instance_id);
429+ if (iter == instance_map.end ()) {
430+ POLARIS_LOG (LOG_INFO, " The outlier detector of service[%s/%s] getting instance[%s] failed" ,
431+ service_key.namespace_ .c_str (), service_key.name_ .c_str (), instance_id.c_str ());
432+ continue ;
433+ }
434+ open_instances.push_back (iter->second );
435+ }
436+ if (open_instances.empty ()) {
437+ return kReturnInstanceNotFound ;
438+ }
439+ service_data->IncrementRef ();
440+ return kReturnOk ;
441+ }
442+
392443} // namespace polaris
0 commit comments