Skip to content

Commit 94ff246

Browse files
authored
EXT-1333 Refactor pq grpc requests (#27620)
1 parent 959c4f9 commit 94ff246

File tree

13 files changed

+65
-45
lines changed

13 files changed

+65
-45
lines changed

ydb/core/grpc_services/grpc_request_proxy.cpp

Lines changed: 0 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -670,8 +670,6 @@ void TGRpcRequestProxyImpl::StateFunc(TAutoPtr<IEventHandle>& ev) {
670670
HFunc(TEvStreamTopicWriteRequest, PreHandle);
671671
HFunc(TEvStreamTopicReadRequest, PreHandle);
672672
HFunc(TEvStreamTopicDirectReadRequest, PreHandle);
673-
HFunc(TEvPQReadInfoRequest, PreHandle);
674-
HFunc(TEvDiscoverPQClustersRequest, PreHandle);
675673
HFunc(TEvCoordinationSessionRequest, PreHandle);
676674
HFunc(TEvProxyRuntimeEvent, PreHandle);
677675
HFunc(TEvRequestAuthAndCheck, PreHandle);

ydb/core/grpc_services/grpc_request_proxy_handle_methods.h

Lines changed: 0 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -16,8 +16,6 @@ class TGRpcRequestProxyHandleMethods {
1616
static void Handle(TEvStreamTopicWriteRequest::TPtr& ev, const TActorContext& ctx);
1717
static void Handle(TEvStreamTopicReadRequest::TPtr& ev, const TActorContext& ctx);
1818
static void Handle(TEvStreamTopicDirectReadRequest::TPtr& ev, const TActorContext& ctx);
19-
static void Handle(TEvPQReadInfoRequest::TPtr& ev, const TActorContext& ctx);
20-
static void Handle(TEvDiscoverPQClustersRequest::TPtr& ev, const TActorContext& ctx);
2119
static void Handle(TEvCoordinationSessionRequest::TPtr& ev, const TActorContext& ctx);
2220
};
2321

ydb/core/grpc_services/rpc_calls.h

Lines changed: 0 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -53,10 +53,6 @@ using TEvStreamPQMigrationReadRequest = TGRpcRequestBiStreamWrapper<TRpcServices
5353
using TEvStreamTopicWriteRequest = TGRpcRequestBiStreamWrapper<TRpcServices::EvStreamTopicWrite, Ydb::Topic::StreamWriteMessage::FromClient, Ydb::Topic::StreamWriteMessage::FromServer>;
5454
using TEvStreamTopicReadRequest = TGRpcRequestBiStreamWrapper<TRpcServices::EvStreamTopicRead, Ydb::Topic::StreamReadMessage::FromClient, Ydb::Topic::StreamReadMessage::FromServer>;
5555
using TEvStreamTopicDirectReadRequest = TGRpcRequestBiStreamWrapper<TRpcServices::EvStreamTopicDirectRead, Ydb::Topic::StreamDirectReadMessage::FromClient, Ydb::Topic::StreamDirectReadMessage::FromServer>;
56-
using TEvPQReadInfoRequest = TGRpcRequestWrapper<TRpcServices::EvPQReadInfo, Ydb::PersQueue::V1::ReadInfoRequest, Ydb::PersQueue::V1::ReadInfoResponse, true>;
57-
//TODO: Change this to runtime dispatching!
58-
using TEvDiscoverPQClustersRequest = TGRpcRequestWrapper<TRpcServices::EvDiscoverPQClusters, Ydb::PersQueue::ClusterDiscovery::DiscoverClustersRequest, Ydb::PersQueue::ClusterDiscovery::DiscoverClustersResponse, true>;
59-
using TEvListFederationDatabasesRequest = TGRpcRequestWrapper<TRpcServices::EvListFederationDatabases, Ydb::FederationDiscovery::ListFederationDatabasesRequest, Ydb::FederationDiscovery::ListFederationDatabasesResponse, true>;
6056

6157
using TEvCoordinationSessionRequest = TGRpcRequestBiStreamWrapper<TRpcServices::EvCoordinationSession, Ydb::Coordination::SessionRequest, Ydb::Coordination::SessionResponse>;
6258

ydb/core/grpc_services/rpc_calls_topic.h

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -21,5 +21,6 @@ using TEvPQAlterTopicRequest = TGrpcRequestOperationCall<Ydb::PersQueue::V1::Alt
2121
using TEvPQDescribeTopicRequest = TGrpcRequestOperationCall<Ydb::PersQueue::V1::DescribeTopicRequest, Ydb::PersQueue::V1::DescribeTopicResponse>;
2222
using TEvPQAddReadRuleRequest = TGrpcRequestOperationCall<Ydb::PersQueue::V1::AddReadRuleRequest, Ydb::PersQueue::V1::AddReadRuleResponse>;
2323
using TEvPQRemoveReadRuleRequest = TGrpcRequestOperationCall<Ydb::PersQueue::V1::RemoveReadRuleRequest, Ydb::PersQueue::V1::RemoveReadRuleResponse>;
24+
using TEvPQReadInfoRequest = TGrpcRequestOperationCall<Ydb::PersQueue::V1::ReadInfoRequest, Ydb::PersQueue::V1::ReadInfoResponse>;
2425

2526
}

ydb/core/grpc_services/service_topic.h

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -28,6 +28,7 @@ void DoPQAlterTopicRequest(std::unique_ptr<IRequestOpCtx> p, const IFacilityProv
2828
void DoPQDescribeTopicRequest(std::unique_ptr<IRequestOpCtx> p, const IFacilityProvider& f);
2929
void DoPQAddReadRuleRequest(std::unique_ptr<IRequestOpCtx> p, const IFacilityProvider& f);
3030
void DoPQRemoveReadRuleRequest(std::unique_ptr<IRequestOpCtx> p, const IFacilityProvider& f);
31+
void DoPQReadInfoRequest(std::unique_ptr<IRequestOpCtx> p, const IFacilityProvider& f);
3132

3233
}
3334
}

ydb/services/persqueue_cluster_discovery/cluster_discovery_service.cpp

Lines changed: 12 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,7 @@
44
#include "counters.h"
55

66
#include <ydb/core/base/appdata.h>
7+
#include <ydb/core/grpc_services/base/base.h>
78
#include <ydb/core/grpc_services/grpc_request_proxy.h>
89
#include <ydb/core/mind/address_classification/net_classifier.h>
910
#include <ydb/core/mon/mon.h>
@@ -196,22 +197,23 @@ class TClusterDiscoveryServiceActor: public TActorBootstrapped<TClusterDiscovery
196197
switch (ev->GetTypeRewrite()) {
197198
hFunc(NNetClassifier::TEvNetClassifier::TEvClassifierUpdate, HandleClassifierUpdateWhileIniting);
198199
hFunc(NClusterTracker::TEvClusterTracker::TEvClustersUpdate, HandleClustersUpdateWhileIniting);
199-
hFunc(NGRpcService::TEvDiscoverPQClustersRequest, HandleDiscoverPQClustersRequestWhileIniting);
200+
fFunc(NGRpcService::TRpcServices::EvDiscoverPQClusters, HandleDiscoverPQClustersRequestWhileIniting);
200201
hFunc(NMon::TEvHttpInfo, HandleHttpRequest);
201202
hFunc(TEvents::TEvWakeup, UpdateTimedCounters);
202203
}
203204
}
204205

205-
void RespondServiceUnavailable(NGRpcService::TEvDiscoverPQClustersRequest::TPtr& ev) {
206+
void RespondServiceUnavailable(NGRpcService::TEvDiscoverPQClustersRequest* ev) {
206207
Counters->DroppedRequestsCount->Inc();
207208

208-
ev->Get()->ReplyWithYdbStatus(Ydb::StatusIds::UNAVAILABLE);
209+
ev->ReplyWithYdbStatus(Ydb::StatusIds::UNAVAILABLE);
209210
}
210211

211-
void HandleDiscoverPQClustersRequestWhileIniting(NGRpcService::TEvDiscoverPQClustersRequest::TPtr& ev) {
212+
void HandleDiscoverPQClustersRequestWhileIniting(TAutoPtr<NActors::IEventHandle>& ev) {
213+
ev->DropRewrite();
212214
Counters->TotalRequestsCount->Inc();
213215

214-
RespondServiceUnavailable(ev);
216+
RespondServiceUnavailable(ev->Get<NGRpcService::TEvDiscoverPQClustersRequest>());
215217
}
216218

217219
void HandleClassifierUpdateWhileWorking(NNetClassifier::TEvNetClassifier::TEvClassifierUpdate::TPtr& ev) {
@@ -222,23 +224,24 @@ class TClusterDiscoveryServiceActor: public TActorBootstrapped<TClusterDiscovery
222224
UpdateClustersList(ev);
223225
}
224226

225-
void HandleDiscoverPQClustersRequestWhileWorking(NGRpcService::TEvDiscoverPQClustersRequest::TPtr& ev) {
227+
void HandleDiscoverPQClustersRequestWhileWorking(TAutoPtr<NActors::IEventHandle>& ev) {
228+
ev->DropRewrite();
226229
Counters->TotalRequestsCount->Inc();
227230

228231
if (!IsHealthy()) {
229-
RespondServiceUnavailable(ev);
232+
RespondServiceUnavailable(ev->Get<NGRpcService::TEvDiscoverPQClustersRequest>());
230233
return;
231234
}
232235

233-
IActor* actor = NWorker::CreateClusterDiscoveryWorker(ev, DatacenterClassifier, CloudNetworksClassifier, ClustersList, Counters);
236+
IActor* actor = NWorker::CreateClusterDiscoveryWorker(THolder(ev->Release<NGRpcService::TEvDiscoverPQClustersRequest>().Release()), DatacenterClassifier, CloudNetworksClassifier, ClustersList, Counters);
234237
Register(actor, TMailboxType::HTSwap, AppData(Ctx())->UserPoolId);
235238
}
236239

237240
STATEFN(Working) {
238241
switch (ev->GetTypeRewrite()) {
239242
hFunc(NNetClassifier::TEvNetClassifier::TEvClassifierUpdate, HandleClassifierUpdateWhileWorking);
240243
hFunc(NClusterTracker::TEvClusterTracker::TEvClustersUpdate, HandleClustersUpdateWhileWorking);
241-
hFunc(NGRpcService::TEvDiscoverPQClustersRequest, HandleDiscoverPQClustersRequestWhileWorking);
244+
fFunc(NGRpcService::TRpcServices::EvDiscoverPQClusters, HandleDiscoverPQClustersRequestWhileWorking);
242245
hFunc(NMon::TEvHttpInfo, HandleHttpRequest);
243246
hFunc(TEvents::TEvWakeup, UpdateTimedCounters);
244247
}

ydb/services/persqueue_cluster_discovery/cluster_discovery_worker.cpp

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -27,12 +27,12 @@ class TClusterDiscoveryWorker : public TActorBootstrapped<TClusterDiscoveryWorke
2727
return NKikimrServices::TActivity::GRPC_REQ;
2828
}
2929

30-
TClusterDiscoveryWorker(TEvDiscoverPQClustersRequest::TPtr& ev,
30+
TClusterDiscoveryWorker(THolder<NGRpcService::TEvDiscoverPQClustersRequest> ev,
3131
TLabeledAddressClassifier::TConstPtr datacenterClassifier,
3232
TLabeledAddressClassifier::TConstPtr cloudNetsClassifier,
3333
TClustersList::TConstPtr clustersList,
3434
TClusterDiscoveryCounters::TPtr counters)
35-
: Request(ev->Release().Release())
35+
: Request(std::move(ev))
3636
, DatacenterClassifier(std::move(datacenterClassifier))
3737
, CloudNetsClassifier(std::move(cloudNetsClassifier))
3838
, ClustersList(std::move(clustersList))
@@ -271,13 +271,13 @@ class TClusterDiscoveryWorker : public TActorBootstrapped<TClusterDiscoveryWorke
271271
TClusterDiscoveryCounters::TPtr Counters;
272272
};
273273

274-
IActor* CreateClusterDiscoveryWorker(TEvDiscoverPQClustersRequest::TPtr& ev,
274+
IActor* CreateClusterDiscoveryWorker(THolder<NGRpcService::TEvDiscoverPQClustersRequest> ev,
275275
TLabeledAddressClassifier::TConstPtr datacenterClassifier,
276276
TLabeledAddressClassifier::TConstPtr cloudNetsClassifier,
277277
TClustersList::TConstPtr clustersList,
278278
TClusterDiscoveryCounters::TPtr counters)
279279
{
280-
return new TClusterDiscoveryWorker(ev,
280+
return new TClusterDiscoveryWorker(std::move(ev),
281281
std::move(datacenterClassifier),
282282
std::move(cloudNetsClassifier),
283283
std::move(clustersList),

ydb/services/persqueue_cluster_discovery/cluster_discovery_worker.h

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,7 @@
11
#pragma once
22

33
#include "counters.h"
4+
#include "grpc_service.h"
45

56
#include <ydb/core/grpc_services/grpc_request_proxy.h>
67
#include <ydb/core/persqueue/public/cluster_tracker/cluster_tracker.h>
@@ -16,7 +17,7 @@ using namespace NGRpcService;
1617
using namespace NPQ::NClusterTracker;
1718
using namespace NCounters;
1819

19-
IActor* CreateClusterDiscoveryWorker(TEvDiscoverPQClustersRequest::TPtr& ev,
20+
IActor* CreateClusterDiscoveryWorker(THolder<NGRpcService::TEvDiscoverPQClustersRequest> ev,
2021
TLabeledAddressClassifier::TConstPtr datacenterClassifier,
2122
TLabeledAddressClassifier::TConstPtr cloudNetsClassifier,
2223
TClustersList::TConstPtr clustersList,

ydb/services/persqueue_cluster_discovery/grpc_service.cpp

Lines changed: 14 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -55,6 +55,19 @@ void TGRpcPQClusterDiscoveryService::DecRequest() {
5555
}
5656
}
5757

58+
static void DoDiscoverPQClustersRequest(std::unique_ptr<IRequestOpCtx> ctx, const IFacilityProvider&) {
59+
auto ev = dynamic_cast<TEvDiscoverPQClustersRequest*>(ctx.release());
60+
Y_ENSURE(ev);
61+
62+
auto evHandle = std::make_unique<NActors::IEventHandle>(
63+
NPQ::NClusterDiscovery::MakeClusterDiscoveryServiceID(),
64+
NPQ::NClusterDiscovery::MakeClusterDiscoveryServiceID(),
65+
ev
66+
);
67+
evHandle->Rewrite(TRpcServices::EvDiscoverPQClusters, NPQ::NClusterDiscovery::MakeClusterDiscoveryServiceID());
68+
NActors::TActivationContext::Send(std::move(evHandle));
69+
}
70+
5871
void TGRpcPQClusterDiscoveryService::SetupIncomingRequests(NYdbGrpc::TLoggerPtr logger) {
5972
auto getCounterBlock = NGRpcService::CreateCounterCb(Counters_, ActorSystem_);
6073

@@ -70,7 +83,7 @@ void TGRpcPQClusterDiscoveryService::SetupIncomingRequests(NYdbGrpc::TLoggerPtr
7083
#NAME, logger, getCounterBlock("pq_cluster_discovery", #NAME))->Run();
7184

7285
ADD_REQUEST(DiscoverClusters, DiscoverClustersRequest, DiscoverClustersResponse, {
73-
ActorSystem_->Send(GRpcRequestProxyId_, new TEvDiscoverPQClustersRequest(ctx));
86+
ActorSystem_->Send(GRpcRequestProxyId_, new TEvDiscoverPQClustersRequest(ctx, DoDiscoverPQClustersRequest, TRequestAuxSettings{TRateLimiterMode::Off, nullptr, TAuditMode::NonModifying()}));
7487
})
7588
#undef ADD_REQUEST
7689

@@ -80,8 +93,4 @@ void TGRpcPQClusterDiscoveryService::StopService() noexcept {
8093
TGrpcServiceBase::StopService();
8194
}
8295

83-
void TGRpcRequestProxyHandleMethods::Handle(TEvDiscoverPQClustersRequest::TPtr& ev, const TActorContext& ctx) {
84-
ctx.Send(ev->Forward(NPQ::NClusterDiscovery::MakeClusterDiscoveryServiceID()));
85-
}
86-
8796
} // namespace NKikimr::NGRpcService

ydb/services/persqueue_cluster_discovery/grpc_service.h

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,12 +1,15 @@
11
#pragma once
22

3+
#include <ydb/core/grpc_services/base/base.h>
34
#include <ydb/library/actors/core/actorsystem_fwd.h>
45
#include <ydb/library/actors/core/actorid.h>
56
#include <ydb/public/api/grpc/draft/ydb_persqueue_v1.grpc.pb.h>
67
#include <ydb/library/grpc/server/grpc_server.h>
78

89
namespace NKikimr::NGRpcService {
910

11+
using TEvDiscoverPQClustersRequest = TGrpcRequestOperationCall<Ydb::PersQueue::ClusterDiscovery::DiscoverClustersRequest, Ydb::PersQueue::ClusterDiscovery::DiscoverClustersResponse>;
12+
1013
class TGRpcPQClusterDiscoveryService
1114
: public NYdbGrpc::TGrpcServiceBase<Ydb::PersQueue::V1::ClusterDiscoveryService> {
1215
public:

0 commit comments

Comments
 (0)