@@ -335,7 +335,8 @@ bool GrpcServerConnector::SendDiscoverRequest(ServiceListener& service_listener)
335335 }
336336 if (discover_stream_state_ != kDiscoverStreamInit ) {
337337 const ServiceKey& discover_service = context_->GetContextImpl ()->GetDiscoverService ().service_ ;
338- if (isEmpty (discover_service)) {
338+ if (discover_service.name_ .empty ()) {
339+ POLARIS_LOG (LOG_INFO, " discover service is empty, state transive to DiscoverStreamInit" );
339340 discover_stream_state_ = kDiscoverStreamInit ;
340341 } else {
341342 if (service_key.name_ != discover_service.name_ || service_key.namespace_ != discover_service.namespace_ ) {
@@ -572,6 +573,10 @@ ReturnCode GrpcServerConnector::SelectInstance(const ServiceKey& service_key, ui
572573 return ConsumerApiImpl::GetSystemServer (context_, service_key, criteria, *instance, timeout);
573574}
574575
576+ SeedServer& GrpcServerConnector::SelectSeed () {
577+ return server_lists_[rand () % server_lists_.size ()];
578+ }
579+
575580void GrpcServerConnector::ServerSwitch () {
576581 if (server_switch_state_ == kServerSwitchNormal // 服务调用出错或超时触发切换
577582 || server_switch_state_ == kServerSwitchBegin ) { // 切换后异步连接回调触发重新
@@ -590,12 +595,12 @@ void GrpcServerConnector::ServerSwitch() {
590595 // 选择一个服务器
591596 std::string host;
592597 int port = 0 ;
593- if (discover_stream_state_ >= kDiscoverStreamGetInstance ) { // 说明内部服务已经返回
598+ const ServiceKey& discover_service = context_->GetContextImpl ()->GetDiscoverService ().service_ ;
599+ if (!discover_service.name_ .empty () && discover_stream_state_ >= kDiscoverStreamGetInstance ) { // 说明内部服务已经返回
594600 if (discover_instance_ != nullptr ) {
595601 delete discover_instance_;
596602 discover_instance_ = nullptr ;
597603 }
598- const ServiceKey& discover_service = context_->GetContextImpl ()->GetDiscoverService ().service_ ;
599604 bool ignore_half_open = server_switch_state_ != kServerSwitchPeriodic ; // 周期切换才选半开节点
600605 ReturnCode ret_code = SelectInstance (discover_service, 0 , &discover_instance_, ignore_half_open);
601606 if (ret_code == kReturnOk ) {
@@ -612,7 +617,7 @@ void GrpcServerConnector::ServerSwitch() {
612617 }
613618 if (host.empty ()) {
614619 discover_stream_state_ = kDiscoverStreamNotInit ;
615- SeedServer& server = server_lists_[ rand () % server_lists_. size ()] ;
620+ SeedServer& server = SelectSeed () ;
616621 host = server.ip_ ;
617622 port = server.port_ ;
618623 POLARIS_LOG (LOG_INFO, " discover stream switch to seed server[%s:%d]" , host.c_str (), port);
@@ -837,11 +842,19 @@ bool GrpcServerConnector::GetInstance(BlockRequest* block_request) {
837842 POLARIS_ASSERT (block_request != nullptr );
838843 POLARIS_ASSERT (block_request->instance_ == nullptr );
839844 const ServiceKey& service = GetPolarisService (context_, block_request->request_type_ );
845+ if (service.name_ .empty ()) {
846+ SeedServer& seedServer = SelectSeed ();
847+ block_request->host_ = seedServer.ip_ ;
848+ block_request->port_ = seedServer.port_ ;
849+ return true ;
850+ }
840851 ReturnCode ret_code = SelectInstance (service, block_request->request_timeout_ , &block_request->instance_ );
841852 if (ret_code == kReturnOk ) {
842853 POLARIS_ASSERT (block_request->instance_ != nullptr );
843854 POLARIS_LOG (LOG_DEBUG, " get server:%s:%d for %s" , block_request->instance_ ->GetHost ().c_str (),
844855 block_request->instance_ ->GetPort (), PolarisRequestTypeStr (block_request->request_type_ ));
856+ block_request->host_ = block_request->instance_ ->GetHost ();
857+ block_request->port_ = block_request->instance_ ->GetPort ();
845858 return true ;
846859 } else {
847860 POLARIS_ASSERT (block_request->instance_ == nullptr );
@@ -852,7 +865,9 @@ bool GrpcServerConnector::GetInstance(BlockRequest* block_request) {
852865}
853866
854867void GrpcServerConnector::UpdateCallResult (BlockRequest* block_request) {
855- POLARIS_ASSERT (block_request->instance_ != nullptr );
868+ if (block_request->instance_ == nullptr ) {
869+ return ;
870+ }
856871 const ServiceKey& service = GetPolarisService (context_, block_request->request_type_ );
857872 CallRetStatus status = kCallRetOk ;
858873 if (kServerCodeConnectError <= block_request->server_code_ &&
@@ -876,7 +891,9 @@ BlockRequest::BlockRequest(PolarisRequestType request_type, GrpcServerConnector&
876891 message_(nullptr ),
877892 promise_(nullptr ),
878893 instance_(nullptr ),
879- grpc_client_(nullptr ) {}
894+ host_(" " ),
895+ port_(0 ),
896+ grpc_client_(nullptr ) {}
880897
881898BlockRequest::~BlockRequest () {
882899 if (instance_ != nullptr ) {
@@ -934,18 +951,18 @@ bool BlockRequest::PrepareClient() {
934951
935952 // 建立grpc客户端,并尝试连接
936953 grpc_client_ = new grpc::GrpcClient (connector_.GetReactor ());
937- if (!grpc_client_->ConnectTo (instance_-> GetHost (), instance_-> GetPort () ) ||
954+ if (!grpc_client_->ConnectTo (host_, port_ ) ||
938955 !grpc_client_->WaitConnected (request_timeout_)) {
939956 POLARIS_LOG (LOG_ERROR, " %s connect to server[%s:%d] timeout" , PolarisRequestTypeStr (request_type_),
940- instance_-> GetHost () .c_str (), instance_-> GetPort () );
957+ host_ .c_str (), port_ );
941958 server_code_ = kServerCodeConnectError ;
942959 connector_.UpdateCallResult (this );
943960 return false ;
944961 }
945962 uint64_t use_time = Time::GetCoarseSteadyTimeMs () - begin_time;
946963 if (use_time >= request_timeout_) {
947964 POLARIS_LOG (LOG_ERROR, " %s connect to server[%s:%d] timeout" , PolarisRequestTypeStr (request_type_),
948- instance_-> GetHost () .c_str (), instance_-> GetPort () );
965+ host_ .c_str (), port_ );
949966 server_code_ = kServerCodeConnectError ;
950967 connector_.UpdateCallResult (this );
951968 return false ;
@@ -1020,6 +1037,8 @@ AsyncRequest::AsyncRequest(Reactor& reactor, GrpcServerConnector* connector, Pol
10201037 timeout_(timeout),
10211038 callback_(callback),
10221039 server_(nullptr ),
1040+ host_(" " ),
1041+ port_(0 ),
10231042 client_(nullptr ),
10241043 timing_task_(connector->GetReactor ().TimingTaskEnd()) {}
10251044
@@ -1046,16 +1065,23 @@ bool AsyncRequest::Submit() {
10461065 }
10471066
10481067 const ServiceKey& service = GetPolarisService (connector_->context_ , request_type_);
1049- ReturnCode ret_code = connector_->SelectInstance (service, 0 , &server_);
1050- if (ret_code != kReturnOk ) {
1051- callback_ (ret_code, " select server failed" , nullptr );
1052- return false ;
1068+ if (service.name_ .empty ()) {
1069+ SeedServer& seedServer = connector_->SelectSeed ();
1070+ host_ = seedServer.ip_ ;
1071+ port_ = seedServer.port_ ;
1072+ } else {
1073+ ReturnCode ret_code = connector_->SelectInstance (service, 0 , &server_);
1074+ if (ret_code != kReturnOk ) {
1075+ callback_ (ret_code, " select server failed" , nullptr );
1076+ return false ;
1077+ }
1078+ host_ = server_->GetHost ();
1079+ port_ = server_->GetPort ();
10531080 }
1054-
10551081 connector_->async_request_map_ [request_id_] = this ; // 记录请求
10561082 // 尝试建立连接
10571083 client_ = new grpc::GrpcClient (reactor_);
1058- client_->Connect (server_-> GetHost (), server_-> GetPort () , GetTimeLeft (),
1084+ client_->Connect (host_, port_ , GetTimeLeft (),
10591085 std::bind (&AsyncRequest::OnConnect, this , std::placeholders::_1));
10601086 return true ;
10611087}
0 commit comments