Skip to content

Commit 58ec6db

Browse files
authored
Supported big messages by MLP reader (#27656)
1 parent fcdbfe7 commit 58ec6db

File tree

8 files changed

+60
-24
lines changed

8 files changed

+60
-24
lines changed

ydb/core/persqueue/pqtablet/partition/mlp/mlp_common.cpp

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -53,6 +53,11 @@ bool IsSucess(const TEvPQ::TEvProxyResponse::TPtr& ev) {
5353
ev->Get()->Response->GetErrorCode() == NPersQueue::NErrorCode::OK;
5454
}
5555

56+
bool IsSucess(const TEvPersQueue::TEvResponse::TPtr& ev) {
57+
return ev->Get()->Record.GetStatus() == NMsgBusProxy::MSTATUS_OK &&
58+
ev->Get()->Record.GetErrorCode() == NPersQueue::NErrorCode::OK;
59+
}
60+
5661
ui64 GetCookie(const TEvPQ::TEvProxyResponse::TPtr& ev) {
5762
return ev->Get()->Response->GetCookie();
5863
}

ydb/core/persqueue/pqtablet/partition/mlp/mlp_common.h

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -41,6 +41,7 @@ std::unique_ptr<TEvPQ::TEvSetClientInfo> MakeEvCommit(
4141
);
4242

4343
bool IsSucess(const TEvPQ::TEvProxyResponse::TPtr& ev);
44+
bool IsSucess(const TEvPersQueue::TEvResponse::TPtr& ev);
4445
ui64 GetCookie(const TEvPQ::TEvProxyResponse::TPtr& ev);
4546

4647
} // namespace NKikimr::NPQ::NMLP

ydb/core/persqueue/pqtablet/partition/mlp/mlp_consumer.cpp

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -221,7 +221,7 @@ void TConsumerActor::HandleOnWrite(TEvKeyValue::TEvResponse::TPtr& ev) {
221221

222222
if (!PendingReadQueue.empty()) {
223223
auto msgs = std::exchange(PendingReadQueue, {});
224-
RegisterWithSameMailbox(new TMessageEnricherActor(PartitionId, PartitionActorId, Config.GetName(), std::move(msgs))); // TODO excahnge
224+
RegisterWithSameMailbox(new TMessageEnricherActor(TabletActorId, PartitionId, Config.GetName(), std::move(msgs)));
225225
}
226226
ReplyOk<TEvPQ::TEvMLPCommitResponse>(SelfId(), PendingCommitQueue);
227227
ReplyOk<TEvPQ::TEvMLPUnlockResponse>(SelfId(), PendingUnlockQueue);

ydb/core/persqueue/pqtablet/partition/mlp/mlp_message_enricher.cpp

Lines changed: 20 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -2,10 +2,10 @@
22

33
namespace NKikimr::NPQ::NMLP {
44

5-
TMessageEnricherActor::TMessageEnricherActor(ui32 partitionId, const TActorId& partitionActor, const TString& consumerName, std::deque<TReadResult>&& replies)
5+
TMessageEnricherActor::TMessageEnricherActor(const TActorId& tabletActorId, ui32 partitionId, const TString& consumerName, std::deque<TReadResult>&& replies)
66
: TBaseActor(NKikimrServices::EServiceKikimr::PQ_MLP_ENRICHER)
7+
, TabletActorId(tabletActorId)
78
, PartitionId(partitionId)
8-
, PartitionActorId(partitionActor)
99
, ConsumerName(consumerName)
1010
, Queue(std::move(replies))
1111
, Backoff(5, TDuration::MilliSeconds(50))
@@ -28,22 +28,17 @@ void TMessageEnricherActor::PassAway() {
2828
TBase::PassAway();
2929
}
3030

31-
void TMessageEnricherActor::Handle(TEvPQ::TEvProxyResponse::TPtr& ev) {
32-
LOG_D("Handle TEvPQ::TEvProxyResponse");
33-
if (Cookie != GetCookie(ev)) {
34-
// TODO MLP
35-
LOG_D("Cookie mismatch: " << Cookie << " != " << GetCookie(ev));
36-
//return PassAway();
37-
}
31+
void TMessageEnricherActor::Handle(TEvPersQueue::TEvResponse::TPtr& ev) {
32+
LOG_D("Handle TEvPersQueue::TEvResponse");
3833

3934
if (!IsSucess(ev)) {
40-
LOG_W("Fetch messages failed: " << ev->Get()->Response->DebugString());
35+
LOG_W("Fetch messages failed: " << ev->Get()->Record.DebugString());
4136
return PassAway();
4237
}
4338

44-
auto& response = ev->Get()->Response;
45-
if (response->GetPartitionResponse().HasCmdReadResult()) {
46-
for (auto& result : response->GetPartitionResponse().GetCmdReadResult().GetResult()) {
39+
auto& response = ev->Get()->Record;
40+
if (response.GetPartitionResponse().HasCmdReadResult()) {
41+
for (auto& result : response.GetPartitionResponse().GetCmdReadResult().GetResult()) {
4742
auto offset = result.GetOffset();
4843

4944
while(!Queue.empty()) {
@@ -54,7 +49,6 @@ void TMessageEnricherActor::Handle(TEvPQ::TEvProxyResponse::TPtr& ev) {
5449
while (!reply.Offsets.empty() && offset > reply.Offsets.front()) {
5550
reply.Offsets.pop_front();
5651
}
57-
// TODO MLP multi part messages
5852
if (!reply.Offsets.empty() && offset == reply.Offsets.front()) {
5953
auto* message = PendingResponse->Record.AddMessage();
6054
message->MutableId()->SetPartitionId(PartitionId);
@@ -100,7 +94,7 @@ void TMessageEnricherActor::Handle(TEvents::TEvWakeup::TPtr&) {
10094

10195
STFUNC(TMessageEnricherActor::StateWork) {
10296
switch (ev->GetTypeRewrite()) {
103-
hFunc(TEvPQ::TEvProxyResponse, Handle);
97+
hFunc(TEvPersQueue::TEvResponse, Handle);
10498
hFunc(TEvPQ::TEvError, Handle);
10599
hFunc(TEvents::TEvWakeup, Handle);
106100
sFunc(TEvents::TEvPoison, PassAway);
@@ -122,8 +116,17 @@ void TMessageEnricherActor::ProcessQueue() {
122116
auto firstOffset = reply.Offsets.front();
123117
auto lastOffset = Queue.back().Offsets.back();
124118
auto count = lastOffset - firstOffset + 1;
125-
LOG_D("Fetching from offset " << firstOffset << " count " << count << " from " << PartitionActorId);
126-
Send(PartitionActorId, MakeEvRead(SelfId(), ConsumerName, firstOffset, count, ++Cookie));
119+
LOG_D("Fetching from offset " << firstOffset << " count " << count << " from " << TabletActorId);
120+
121+
auto request = std::make_unique<TEvPersQueue::TEvRequest>();
122+
auto* partitionRequest = request->Record.MutablePartitionRequest();
123+
partitionRequest->SetPartition(PartitionId);
124+
auto* read = partitionRequest->MutableCmdRead();
125+
read->SetClientId(ConsumerName);
126+
read->SetOffset(firstOffset);
127+
read->SetTimeoutMs(0);
128+
129+
Send(TabletActorId, std::move(request), 0, ++Cookie);
127130

128131
return;
129132
}

ydb/core/persqueue/pqtablet/partition/mlp/mlp_message_enricher.h

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -15,16 +15,16 @@ class TMessageEnricherActor : public TBaseActor<TMessageEnricherActor>
1515
static constexpr TDuration Timeout = TDuration::Seconds(1);
1616

1717
public:
18-
TMessageEnricherActor(const ui32 partitionId,
19-
const TActorId& partitionActor,
18+
TMessageEnricherActor(const TActorId& tabletActorId,
19+
const ui32 partitionId,
2020
const TString& consumerName,
2121
std::deque<TReadResult>&& replies);
2222

2323
void Bootstrap();
2424
void PassAway() override;
2525

2626
private:
27-
void Handle(TEvPQ::TEvProxyResponse::TPtr&);
27+
void Handle(TEvPersQueue::TEvResponse::TPtr&);
2828
void Handle(TEvPQ::TEvError::TPtr&);
2929
void Handle(TEvents::TEvWakeup::TPtr&);
3030

@@ -33,8 +33,8 @@ class TMessageEnricherActor : public TBaseActor<TMessageEnricherActor>
3333
void ProcessQueue();
3434

3535
private:
36+
const TActorId TabletActorId;
3637
const ui32 PartitionId;
37-
const TActorId PartitionActorId;
3838
const TString ConsumerName;
3939
std::deque<TReadResult> Queue;
4040
TBackoff Backoff;

ydb/core/persqueue/pqtablet/pq_impl.cpp

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -2598,7 +2598,6 @@ void TPersQueue::Handle(TEvPersQueue::TEvRequest::TPtr& ev, const TActorContext&
25982598
directKey.SessionId = pipeIter->second.SessionId;
25992599
directKey.PartitionSessionId = pipeIter->second.PartitionSessionId;
26002600
}
2601-
TStringBuilder log; log << "PQ - create read proxy" << Endl;
26022601
TActorId rr = ctx.RegisterWithSameMailbox(CreateReadProxy(ev->Sender, TabletID(), ctx.SelfID, GetGeneration(), directKey, request));
26032602
ans = CreateResponseProxy(rr, ctx.SelfID, TopicName, p, m, s, c, ResourceMetrics, ctx);
26042603
} else {

ydb/core/persqueue/public/mlp/mlp_reader_ut.cpp

Lines changed: 28 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -154,6 +154,34 @@ Y_UNIT_TEST_SUITE(TMLPReaderTests) {
154154
}
155155

156156
}
157+
158+
Y_UNIT_TEST(TopicWithBigMessage) {
159+
auto setup = CreateSetup();
160+
161+
auto bigMessage = NUnitTest::RandomString(1_MB);
162+
163+
CreateTopic(setup, "/Root/topic1", "mlp-consumer");
164+
setup->Write("/Root/topic1", bigMessage, 0);
165+
166+
auto& runtime = setup->GetRuntime();
167+
CreateReaderActor(runtime, {
168+
.DatabasePath = "/Root",
169+
.TopicName = "/Root/topic1",
170+
.Consumer = "mlp-consumer",
171+
.WaitTime = TDuration::Seconds(3),
172+
.VisibilityTimeout = TDuration::Seconds(30),
173+
.MaxNumberOfMessage = 1,
174+
.UncompressMessages = true
175+
});
176+
177+
auto response = GetReadResponse(runtime);
178+
UNIT_ASSERT_VALUES_EQUAL(response->Messages.size(), 1);
179+
UNIT_ASSERT_VALUES_EQUAL(response->Messages[0].MessageId.PartitionId, 0);
180+
UNIT_ASSERT_VALUES_EQUAL(response->Messages[0].MessageId.Offset, 0);
181+
UNIT_ASSERT_VALUES_EQUAL(response->Messages[0].Data, bigMessage);
182+
}
183+
184+
157185
}
158186

159187
} // namespace NKikimr::NPQ::NMLP

ydb/core/persqueue/public/mlp/ut/common.h

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -29,7 +29,7 @@ inline auto CreateSetup() {
2929
NKikimrServices::PERSQUEUE,
3030
NKikimrServices::PERSQUEUE_READ_BALANCER,
3131
},
32-
NActors::NLog::PRI_INFO
32+
NActors::NLog::PRI_DEBUG
3333
);
3434
return setup;
3535
}

0 commit comments

Comments
 (0)