Skip to content

Commit cac13b6

Browse files
authored
Check that consumer exist and it is streaming consumer (#27429)
Signed-off-by: Nikolay Shestakov <[email protected]>
1 parent 08fc39c commit cac13b6

File tree

8 files changed

+75
-39
lines changed

8 files changed

+75
-39
lines changed

ydb/core/persqueue/pqrb/read_balancer__balancing.cpp

Lines changed: 18 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -1727,25 +1727,32 @@ void TBalancer::Handle(TEvPersQueue::TEvRegisterReadSession::TPtr& ev, const TAc
17271727
return;
17281728
}
17291729

1730-
// TODO MLP
1731-
// auto* consumerConfig = ::NKikimr::NPQ::GetConsumer(TopicActor.TabletConfig, consumerName);
1732-
// if (!consumerConfig || consumerConfig->GetType() != ::NKikimrPQ::TPQTabletConfig::EConsumerType::TPQTabletConfig_EConsumerType_CONSUMER_TYPE_STREAMING) {
1733-
// THolder<TEvPersQueue::TEvError> response(new TEvPersQueue::TEvError);
1734-
// response->Record.SetCode(NPersQueue::NErrorCode::BAD_REQUEST);
1735-
// response->Record.SetDescription(TStringBuilder() << "consumer \"" << consumerName << "\" not found in topic " << Topic());
1736-
// ctx.Send(ev->Sender, response.Release());
1737-
// return;
1738-
// }
1730+
auto* consumerConfig = ::NKikimr::NPQ::GetConsumer(TopicActor.TabletConfig, consumerName);
1731+
if (!consumerConfig) {
1732+
auto response = std::make_unique<TEvPersQueue::TEvError>();
1733+
response->Record.SetCode(NPersQueue::NErrorCode::BAD_REQUEST);
1734+
response->Record.SetDescription(TStringBuilder() << "consumer \"" << consumerName << "\" was not found in topic '" << Topic() << "'");
1735+
ctx.Send(ev->Sender, std::move(response));
1736+
return;
1737+
}
1738+
1739+
if (consumerConfig->GetType() != ::NKikimrPQ::TPQTabletConfig::EConsumerType::TPQTabletConfig_EConsumerType_CONSUMER_TYPE_STREAMING) {
1740+
auto response = std::make_unique<TEvPersQueue::TEvError>();
1741+
response->Record.SetCode(NPersQueue::NErrorCode::BAD_REQUEST);
1742+
response->Record.SetDescription(TStringBuilder() << "consumer \"" << consumerName << "\" is not streaming");
1743+
ctx.Send(ev->Sender, std::move(response));
1744+
return;
1745+
}
17391746

17401747
std::vector<ui32> partitions;
17411748
partitions.reserve(r.GroupsSize());
17421749
for (auto& group : r.GetGroups()) {
17431750
auto partitionId = group - 1;
17441751
if (group == 0 || !GetPartitionInfo(partitionId)) {
1745-
THolder<TEvPersQueue::TEvError> response(new TEvPersQueue::TEvError);
1752+
auto response = std::make_unique<TEvPersQueue::TEvError>();
17461753
response->Record.SetCode(NPersQueue::NErrorCode::BAD_REQUEST);
17471754
response->Record.SetDescription(TStringBuilder() << "no group " << group << " in topic " << Topic());
1748-
ctx.Send(ev->Sender, response.Release());
1755+
ctx.Send(ev->Sender, std::move(response));
17491756
return;
17501757
}
17511758
partitions.push_back(partitionId);

ydb/core/persqueue/pqtablet/partition/mirrorer/mirrorer.cpp

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -476,7 +476,7 @@ void TMirrorer::CreateConsumer(TEvPQ::TEvCreateConsumer::TPtr&, const TActorCont
476476
ReadSession.reset();
477477
PartitionStream.Reset();
478478

479-
auto factory = AppData(ctx)->PersQueueMirrorReaderFactory;
479+
auto* factory = AppData(ctx)->PersQueueMirrorReaderFactory;
480480
PQ_ENSURE(factory);
481481

482482
TLog log(MakeHolder<TDeferredActorLogBackend>(

ydb/core/persqueue/ut/ut_with_sdk/mirrorer_autoscaling_ut.cpp

Lines changed: 6 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -380,7 +380,7 @@ namespace NKikimr::NPersQueueTests {
380380
/*ui64 writeSpeed =*/writeSpeed,
381381
/*TString user =*/"",
382382
/*ui64 readSpeed =*/readSpeed,
383-
/*TVector<TString> rr =*/{},
383+
/*TVector<TString> rr =*/{"some_user"},
384384
/*TVector<TString> important =*/{},
385385
/*std::optional<NKikimrPQ::TMirrorPartitionConfig> mirrorFrom =*/{},
386386
/*ui64 sourceIdMaxCount =*/6000000,
@@ -498,7 +498,7 @@ namespace NKikimr::NPersQueueTests {
498498
/*ui64 writeSpeed =*/writeSpeed,
499499
/*TString user =*/"",
500500
/*ui64 readSpeed =*/readSpeed,
501-
/*TVector<TString> rr =*/{},
501+
/*TVector<TString> rr =*/{"some_user"},
502502
/*TVector<TString> important =*/{},
503503
/*std::optional<NKikimrPQ::TMirrorPartitionConfig> mirrorFrom =*/{},
504504
/*ui64 sourceIdMaxCount =*/6000000,
@@ -645,7 +645,7 @@ namespace NKikimr::NPersQueueTests {
645645
/*ui64 writeSpeed =*/writeSpeed,
646646
/*TString user =*/"",
647647
/*ui64 readSpeed =*/readSpeed,
648-
/*TVector<TString> rr =*/{},
648+
/*TVector<TString> rr =*/{"some_user"},
649649
/*TVector<TString> important =*/{},
650650
/*std::optional<NKikimrPQ::TMirrorPartitionConfig> mirrorFrom =*/{},
651651
/*ui64 sourceIdMaxCount =*/6000000,
@@ -778,7 +778,7 @@ namespace NKikimr::NPersQueueTests {
778778
/*ui64 writeSpeed =*/writeSpeed,
779779
/*TString user =*/"",
780780
/*ui64 readSpeed =*/readSpeed,
781-
/*TVector<TString> rr =*/{},
781+
/*TVector<TString> rr =*/{"some_user"},
782782
/*TVector<TString> important =*/{},
783783
/*std::optional<NKikimrPQ::TMirrorPartitionConfig> mirrorFrom =*/{},
784784
/*ui64 sourceIdMaxCount =*/6000000,
@@ -903,7 +903,7 @@ namespace NKikimr::NPersQueueTests {
903903
/*ui64 writeSpeed =*/writeSpeed,
904904
/*TString user =*/"",
905905
/*ui64 readSpeed =*/readSpeed,
906-
/*TVector<TString> rr =*/{},
906+
/*TVector<TString> rr =*/{"some_user"},
907907
/*TVector<TString> important =*/{},
908908
/*std::optional<NKikimrPQ::TMirrorPartitionConfig> mirrorFrom =*/{},
909909
/*ui64 sourceIdMaxCount =*/6000000,
@@ -1038,7 +1038,7 @@ namespace NKikimr::NPersQueueTests {
10381038
/*ui64 writeSpeed =*/writeSpeed,
10391039
/*TString user =*/"",
10401040
/*ui64 readSpeed =*/readSpeed,
1041-
/*TVector<TString> rr =*/{},
1041+
/*TVector<TString> rr =*/{"some_user"},
10421042
/*TVector<TString> important =*/{},
10431043
/*std::optional<NKikimrPQ::TMirrorPartitionConfig> mirrorFrom =*/{},
10441044
/*ui64 sourceIdMaxCount =*/6000000,

ydb/core/persqueue/ut/ut_with_sdk/mirrorer_ut.cpp

Lines changed: 13 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -26,7 +26,17 @@ Y_UNIT_TEST_SUITE(TPersQueueMirrorer) {
2626
TString srcTopicFullName = "rt3.dc1--" + srcTopic;
2727
TString dstTopicFullName = "rt3.dc1--" + dstTopic;
2828

29-
server.AnnoyingClient->CreateTopic(srcTopicFullName, partitionsCount);
29+
server.AnnoyingClient->CreateTopic(
30+
srcTopicFullName,
31+
partitionsCount,
32+
/*ui32 lowWatermark =*/ 8_MB,
33+
/*ui64 lifetimeS =*/ 86400,
34+
/*ui64 writeSpeed =*/ 20000000,
35+
/*TString user =*/ "",
36+
/*ui64 readSpeed =*/ 200000000,
37+
/*TVector<TString> rr =*/ {"some_user", "user"},
38+
/*TVector<TString> important =*/ {}
39+
);
3040

3141
NKikimrPQ::TMirrorPartitionConfig mirrorFrom;
3242
mirrorFrom.SetEndpoint("localhost");
@@ -45,7 +55,7 @@ Y_UNIT_TEST_SUITE(TPersQueueMirrorer) {
4555
/*ui64 writeSpeed =*/ 20000000,
4656
/*TString user =*/ "",
4757
/*ui64 readSpeed =*/ 200000000,
48-
/*TVector<TString> rr =*/ {},
58+
/*TVector<TString> rr =*/ {"user"},
4959
/*TVector<TString> important =*/ {},
5060
mirrorFrom
5161
);
@@ -113,7 +123,7 @@ Y_UNIT_TEST_SUITE(TPersQueueMirrorer) {
113123
auto createTopicReader = [&](const TString& topic) {
114124
auto settings = NTopic::TReadSessionSettings()
115125
.AppendTopics(NTopic::TTopicReadSettings(topic))
116-
.ConsumerName("shared/user")
126+
.ConsumerName("user")
117127
.Decompress(false);
118128

119129
return NTopic::TTopicClient(*driver).CreateReadSession(settings);

ydb/core/testlib/test_pq_client.h

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -957,7 +957,7 @@ class TFlatMsgBusPQClient : public NFlatTests::TFlatMsgBusClient {
957957
.DoWait = doWait,
958958
.CanWrite = canWrite,
959959
.Dc = dc,
960-
.ReadRules = rr,
960+
.ReadRules = std::move(rr),
961961
.Account = account,
962962
.ExpectFail = expectFail
963963
});
@@ -1026,7 +1026,9 @@ class TFlatMsgBusPQClient : public NFlatTests::TFlatMsgBusClient {
10261026
if (!UseConfigTables) {
10271027
path = TStringBuilder() << "/Root/PQ/" << name;
10281028
}
1029-
auto settings = NYdb::NPersQueue::TAlterTopicSettings().PartitionsCount(nParts);
1029+
auto settings = NYdb::NPersQueue::TAlterTopicSettings()
1030+
.PartitionsCount(nParts)
1031+
.ReadRules({NYdb::NPersQueue::TReadRuleSettings().ConsumerName("user")});
10301032
settings.RetentionPeriod(TDuration::Seconds(lifetimeS));
10311033
auto pqClient = NYdb::NPersQueue::TPersQueueClient(*Driver);
10321034
auto res = pqClient.AlterTopic(path, settings);

ydb/public/lib/ydb_cli/topic/topic_read_ut.cpp

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -154,7 +154,6 @@ namespace NYdb::NConsoleClient {
154154

155155
const TString topicPath = server.GetTopic();
156156
auto driver = server.Server->AnnoyingClient->GetDriver();
157-
server.Server->AnnoyingClient->CreateConsumer("cli");
158157
NPersQueue::TPersQueueClient persQueueClient(*driver);
159158

160159
WriteTestData(driver, topicPath, dataToWrite);
@@ -179,7 +178,7 @@ namespace NYdb::NConsoleClient {
179178

180179
NTopic::TReadSessionSettings PrepareReadSessionSettings(const std::string& topicPath) {
181180
NTopic::TReadSessionSettings settings;
182-
settings.ConsumerName("cli");
181+
settings.ConsumerName("user");
183182
settings.AppendTopics(topicPath);
184183

185184
return settings;

ydb/services/persqueue_v1/persqueue_compat_ut.cpp

Lines changed: 8 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -33,26 +33,26 @@ class TPQv1CompatTestBase {
3333
Server->EnableLogs({ NKikimrServices::PQ_READ_PROXY, NKikimrServices::PQ_WRITE_PROXY, NKikimrServices::PQ_METACACHE });
3434

3535

36-
Server->AnnoyingClient->CreateTopicNoLegacy("rt3.dc2--account--topic1", 1, true, false);
37-
Server->AnnoyingClient->CreateTopicNoLegacy("rt3.dc1--account--topic1", 1, true);
38-
36+
Server->AnnoyingClient->CreateTopicNoLegacy("rt3.dc2--account--topic1", 1, true, false, std::nullopt, {"user", "test-consumer"});
37+
Server->AnnoyingClient->CreateTopicNoLegacy("rt3.dc1--account--topic1", 1, true, true, std::nullopt, {"user", "test-consumer"});
3938
Server->WaitInit("account/topic1");
39+
4040
Server->AnnoyingClient->MkDir("/Root", "LbCommunal");
4141
Server->AnnoyingClient->MkDir("/Root/LbCommunal", "account");
4242
Server->AnnoyingClient->CreateTopicNoLegacy(
43-
"/Root/LbCommunal/account/topic2", 1, true, true, {}, {}, "account"
43+
"/Root/LbCommunal/account/topic2", 1, true, true, {}, {"user", "test-consumer"}, "account"
4444
);
4545
Server->AnnoyingClient->CreateTopicNoLegacy(
46-
"/Root/LbCommunal/account/topic2-mirrored-from-dc2", 1, true, false, {}, {}, "account"
46+
"/Root/LbCommunal/account/topic2-mirrored-from-dc2", 1, true, false, {}, {"user", "test-consumer"}, "account"
4747
);
4848

4949
Server->AnnoyingClient->MkDir("/Root", "LbCommunal");
5050
Server->AnnoyingClient->MkDir("/Root/LbCommunal", "account2");
5151
Server->AnnoyingClient->CreateTopicNoLegacy(
52-
"/Root/LbCommunal/account2/topic3", 1, true, true, {}, {}, "account2"
52+
"/Root/LbCommunal/account2/topic3", 1, true, true, {}, {"test-consumer"}, "account2"
5353
);
5454
Server->AnnoyingClient->CreateTopicNoLegacy(
55-
"/Root/LbCommunal/account2/topic3-mirrored-from-dc2", 1, true, false, {}, {}, "account2"
55+
"/Root/LbCommunal/account2/topic3-mirrored-from-dc2", 1, true, false, {}, {"test-consumer"}, "account2"
5656
);
5757

5858

@@ -127,7 +127,7 @@ Y_UNIT_TEST_SUITE(TPQCompatTest) {
127127
Cerr << "Got session closed event" << otherEv->DebugString() << Endl;
128128
}
129129
}
130-
UNIT_ASSERT(lockEvent);
130+
UNIT_ASSERT_C(lockEvent, JoinRange(", ", paths.begin(), paths.end()));
131131
Cerr << "Got lock event: " << lockEvent->DebugString() << Endl;
132132
const auto& path = lockEvent->GetPartitionStream()->GetTopicPath();
133133
const auto& cluster = lockEvent->GetPartitionStream()->GetCluster();

ydb/services/persqueue_v1/persqueue_ut.cpp

Lines changed: 24 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -2520,9 +2520,16 @@ Y_UNIT_TEST_SUITE(TPersQueueTest) {
25202520
request.mutable_supported_codecs()->add_codecs(Ydb::Topic::CODEC_RAW);
25212521
request.mutable_supported_codecs()->add_codecs(Ydb::Topic::CODEC_CUSTOM + 42);
25222522

2523-
auto consumer = request.add_consumers();
2524-
consumer->set_name("first-consumer");
2525-
consumer->set_important(false);
2523+
{
2524+
auto consumer = request.add_consumers();
2525+
consumer->set_name("first-consumer");
2526+
consumer->set_important(false);
2527+
}
2528+
{
2529+
auto consumer = request.add_consumers();
2530+
consumer->set_name("user");
2531+
consumer->set_important(false);
2532+
}
25262533
grpc::ClientContext rcontext;
25272534

25282535
auto status = TopicStubP_->CreateTopic(&rcontext, request, &response);
@@ -4416,7 +4423,8 @@ Y_UNIT_TEST_SUITE(TPersQueueTest) {
44164423
auto checkCounters = [](auto monPort, const TString& session,
44174424
const std::set<std::string>& canonicalSensorNames,
44184425
const TString& clientDc, const TString& originDc,
4419-
const TString& client, const TString& consumerPath) {
4426+
const TString& client,
4427+
const TString& consumerPath) {
44204428
NJson::TJsonValue counters;
44214429

44224430
if (clientDc.empty() && originDc.empty()) {
@@ -4492,7 +4500,16 @@ Y_UNIT_TEST_SUITE(TPersQueueTest) {
44924500

44934501
GetClassifierUpdate(*server.CleverServer, sender); //wait for initializing
44944502

4495-
server.AnnoyingClient->CreateTopic("rt3.dc1--account--topic1", 10, 10000, 10000, 2000);
4503+
server.AnnoyingClient->CreateTopic(
4504+
"rt3.dc1--account--topic1",
4505+
10,
4506+
10000,
4507+
10000,
4508+
2000,
4509+
"",
4510+
200000000,
4511+
{ consumerName }
4512+
);
44964513

44974514
auto driver = server.AnnoyingClient->GetDriver();
44984515

@@ -4568,7 +4585,8 @@ Y_UNIT_TEST_SUITE(TPersQueueTest) {
45684585
{
45694586
NYdb::NPersQueue::TReadSessionSettings settings;
45704587
settings.ConsumerName(originallyProvidedConsumerName)
4571-
.AppendTopics(std::string{"account/topic1"}).ReadOriginal({"dc1"})
4588+
.AppendTopics(std::string{"account/topic1"})
4589+
.ReadOriginal({"dc1"})
45724590
.Header({{NYdb::YDB_APPLICATION_NAME, userAgent}});
45734591

45744592
auto reader = CreateReader(*driver, settings);

0 commit comments

Comments
 (0)