Skip to content

Commit 06da671

Browse files
authored
Fix read error handling during compaction (#27624)
1 parent a26b0f0 commit 06da671

File tree

3 files changed

+130
-12
lines changed

3 files changed

+130
-12
lines changed

ydb/core/blobstorage/pdisk/mock/pdisk_mock.cpp

Lines changed: 23 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -768,28 +768,41 @@ class TPDiskMockActor : public TActorBootstrapped<TPDiskMockActor> {
768768

769769
const auto chunkIt = owner->ChunkData.find(msg->ChunkIdx);
770770
if (chunkIt == owner->ChunkData.end()) {
771-
res->Data.AddGap(0, size); // no data at all
771+
res->Data.AddGap(offset, offset + size); // no data at all
772+
memset(data.GetDataMut(), '~', size);
772773
} else {
773774
TImpl::TChunkData& chunk = chunkIt->second;
774775
const ui64 chunkOffset = (ui64)msg->ChunkIdx * Impl.ChunkSize;
775-
if (Impl.Corrupted & TIntervalSet<ui64>(chunkOffset + offset, chunkOffset + offset + size)) {
776+
const bool hasCorruptedParts = static_cast<bool>(Impl.Corrupted & TIntervalSet<ui64>(chunkOffset + offset,
777+
chunkOffset + offset + size));
778+
if (hasCorruptedParts && RandomNumber(2u)) {
776779
res->Status = NKikimrProto::CORRUPTED;
777780
} else {
778-
char *begin = data.GetDataMut(), *ptr = begin;
781+
size_t offsetInBuffer = 0;
782+
ui32 blockIdx = offset / Impl.AppendBlockSize;
783+
ui32 offsetInBlock = offset % Impl.AppendBlockSize;
779784
while (size) {
780-
const ui32 blockIdx = offset / Impl.AppendBlockSize;
781-
const ui32 offsetInBlock = offset % Impl.AppendBlockSize;
782785
const ui32 num = Min(size, Impl.AppendBlockSize - offsetInBlock);
786+
783787
const auto it = chunk.Blocks.find(blockIdx);
784-
if (it == chunk.Blocks.end()) {
785-
const ui32 base = ptr - begin;
786-
res->Data.AddGap(base, base + num);
788+
789+
const bool corrupted = hasCorruptedParts && (Impl.Corrupted & TIntervalSet<ui64>(
790+
chunkOffset + blockIdx * Impl.AppendBlockSize,
791+
chunkOffset + (blockIdx + 1) * Impl.AppendBlockSize));
792+
793+
if (it == chunk.Blocks.end() || corrupted) {
794+
res->Data.AddGap(offset, offset + num);
795+
memset(data.GetDataMut() + offsetInBuffer, '~', num);
787796
} else {
788-
memcpy(ptr, it->second->data() + offsetInBlock, num);
797+
memcpy(data.GetDataMut() + offsetInBuffer, it->second->data() + offsetInBlock, num);
789798
}
790-
ptr += num;
799+
791800
offset += num;
801+
offsetInBuffer += num;
792802
size -= num;
803+
804+
++blockIdx;
805+
offsetInBlock = 0;
793806
}
794807
}
795808
}

ydb/core/blobstorage/ut_blobstorage/corrupted_reads.cpp

Lines changed: 105 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -147,4 +147,109 @@ Y_UNIT_TEST_SUITE(CorruptedReads) {
147147
}
148148
}
149149
}
150+
151+
Y_UNIT_TEST(Compaction) {
152+
TEnvironmentSetup env(TEnvironmentSetup::TSettings{
153+
.NodeCount = 8,
154+
.Erasure = TBlobStorageGroupType::Erasure4Plus2Block,
155+
});
156+
157+
env.CreateBoxAndPool(1, 1, 0, NKikimrBlobStorage::EPDiskType::NVME);
158+
env.Sim(TDuration::Minutes(1));
159+
auto groups = env.GetGroups();
160+
UNIT_ASSERT_VALUES_EQUAL(groups.size(), 1);
161+
const TIntrusivePtr<TBlobStorageGroupInfo> info = env.GetGroupInfo(groups.front());
162+
env.Sim(TDuration::Minutes(5));
163+
164+
const TActorId writer = env.Runtime->AllocateEdgeActor(1, __FILE__, __LINE__);
165+
166+
const ui32 orderNum = 0;
167+
const TVDiskID& vdiskId = info->GetVDiskId(orderNum);
168+
const TActorId& actorId = info->GetActorId(orderNum);
169+
UNIT_ASSERT_VALUES_EQUAL(actorId.NodeId(), writer.NodeId());
170+
171+
std::vector<TLogoBlobID> blobs;
172+
THashMap<TLogoBlobID, TDiskPart> prevLocation;
173+
174+
for (ui32 iter = 0; iter < 2; ++iter) {
175+
ui32 index = iter + 1;
176+
177+
for (ui32 i = 0; i < 5000; ++i) {
178+
const size_t size = 100 + RandomNumber(10000u);
179+
TString data = FastGenDataForLZ4(size, index);
180+
const TLogoBlobID id(1, 1, index, 0, data.size(), 0);
181+
index += 2;
182+
183+
env.Runtime->WrapInActorContext(writer, [&] {
184+
SendToBSProxy(writer, info->GroupID, new TEvBlobStorage::TEvPut(id, TRope(data), TInstant::Max()));
185+
});
186+
const auto& res = env.WaitForEdgeActorEvent<TEvBlobStorage::TEvPutResult>(writer, false);
187+
UNIT_ASSERT_VALUES_EQUAL(res->Get()->Status, NKikimrProto::OK);
188+
}
189+
190+
auto ev = std::make_unique<IEventHandle>(actorId, writer, TEvCompactVDisk::Create(EHullDbType::LogoBlobs));
191+
ev->Rewrite(TEvBlobStorage::EvForwardToSkeleton, actorId);
192+
env.Runtime->Send(ev.release(), writer.NodeId());
193+
env.WaitForEdgeActorEvent<TEvCompactVDiskResult>(writer, false);
194+
195+
TIntrusivePtr<TPDiskMockState> state;
196+
for (auto& [pdiskId, ptr] : env.PDiskMockStates) {
197+
const auto& [nodeId, _] = pdiskId;
198+
if (nodeId == writer.NodeId()) {
199+
UNIT_ASSERT(!state);
200+
state = ptr;
201+
}
202+
}
203+
204+
auto res = env.SyncQuery<TEvBlobStorage::TEvCaptureVDiskLayoutResult, TEvBlobStorage::TEvCaptureVDiskLayout>(actorId);
205+
for (const auto& item : res->Layout) {
206+
using T = TEvBlobStorage::TEvCaptureVDiskLayoutResult;
207+
if (item.Database != T::EDatabase::LogoBlobs) {
208+
continue;
209+
}
210+
if (!item.BlobId) {
211+
continue;
212+
}
213+
const auto& location = item.Location;
214+
if (iter == 0) {
215+
blobs.push_back(item.BlobId);
216+
prevLocation[item.BlobId] = location;
217+
state->SetCorruptedArea(location.ChunkIdx, location.Offset, location.Offset + location.Size / 2, true);
218+
} else if (prevLocation.contains(item.BlobId)) {
219+
UNIT_ASSERT(prevLocation[item.BlobId] != location);
220+
}
221+
}
222+
}
223+
224+
auto queueActorId = env.CreateQueueActor(vdiskId, NKikimrBlobStorage::GetFastRead, 0, actorId.NodeId());
225+
226+
for (ui32 nodeId : env.Runtime->GetNodes()) {
227+
if (nodeId != queueActorId.NodeId()) {
228+
env.StopNode(nodeId);
229+
}
230+
}
231+
232+
for (const TLogoBlobID& id : blobs) {
233+
auto query = TEvBlobStorage::TEvVGet::CreateExtremeDataQuery(vdiskId, TInstant::Max(),
234+
NKikimrBlobStorage::FastRead, TEvBlobStorage::TEvVGet::EFlags::None, {}, {id});
235+
env.Runtime->Send(new IEventHandle(queueActorId, writer, query.release()), queueActorId.NodeId());
236+
auto res = env.WaitForEdgeActorEvent<TEvBlobStorage::TEvVGetResult>(writer, false);
237+
const auto& ev = res->Get();
238+
auto& record = ev->Record;
239+
UNIT_ASSERT_VALUES_EQUAL(record.GetStatus(), NKikimrProto::OK);
240+
UNIT_ASSERT_VALUES_EQUAL(record.ResultSize(), 1);
241+
for (auto& item : record.GetResult()) {
242+
UNIT_ASSERT_VALUES_EQUAL(item.GetStatus(), NKikimrProto::OK);
243+
const TLogoBlobID id = LogoBlobIDFromLogoBlobID(item.GetBlobID());
244+
const TString data = FastGenDataForLZ4(id.BlobSize(), id.Step());
245+
std::vector<TRope> parts(info->Type.TotalPartCount());
246+
ErasureSplit(static_cast<TBlobStorageGroupType::ECrcMode>(id.CrcMode()), info->Type, TRope(data), parts);
247+
UNIT_ASSERT(ev->HasBlob(item));
248+
const TRope& readPartData = ev->GetBlobData(item);
249+
const TRope& expectedPart = parts[id.PartId() - 1];
250+
UNIT_ASSERT_VALUES_EQUAL(readPartData.size(), expectedPart.size());
251+
UNIT_ASSERT_EQUAL(readPartData, expectedPart);
252+
}
253+
}
254+
}
150255
}

ydb/core/blobstorage/vdisk/hullop/blobstorage_readbatch.h

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -238,9 +238,9 @@ namespace NKikimr {
238238
// calculate offset of item inside response
239239
const ui32 offset = item.Offset - msg->Offset;
240240

241-
// if item is not readable at this point, we return ERROR
241+
// if item is not readable at this point, we return CORRUPTED, so the blob can be restored through scrubbing mechanism
242242
if (!data.IsReadable(offset, item.Size)) {
243-
item.Status = NKikimrProto::ERROR;
243+
item.Status = NKikimrProto::CORRUPTED;
244244
} else {
245245
item.Content = data.Substr(offset, item.Size);
246246
}

0 commit comments

Comments
 (0)