Skip to content

Commit 1787dd7

Browse files
authored
JSON subcolumns store change from string to binary (#27653)
1 parent d929655 commit 1787dd7

File tree

35 files changed

+526
-412
lines changed

35 files changed

+526
-412
lines changed

ydb/core/formats/arrow/accessor/plain/accessor.h

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -112,6 +112,10 @@ class TTrivialArray: public IChunkedArray {
112112
}
113113
};
114114

115+
static TPlainBuilder<arrow::BinaryType> MakeBuilderBinary(const ui32 reserveItems = 0, const ui32 reserveData = 0) {
116+
return TPlainBuilder<arrow::BinaryType>(reserveItems, reserveData);
117+
}
118+
115119
static TPlainBuilder<arrow::StringType> MakeBuilderUtf8(const ui32 reserveItems = 0, const ui32 reserveSize = 0) {
116120
return TPlainBuilder<arrow::StringType>(reserveItems, reserveSize);
117121
}

ydb/core/formats/arrow/accessor/sparsed/accessor.h

Lines changed: 16 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -70,8 +70,18 @@ class TSparsedArrayChunk {
7070
TSparsedArrayChunk(TSparsedArrayChunk&&) = default;
7171

7272
void VisitValues(const IChunkedArray::TValuesSimpleVisitor& visitor) const {
73-
visitor(ColValue);
74-
visitor(DefaultsArray);
73+
ui32 prevIndex = 0;
74+
for (ui32 idx = 0; idx < UI32ColIndex->length(); ++idx) {
75+
auto currentIndex = UI32ColIndex->Value(idx);
76+
for (ui32 i = prevIndex; i < currentIndex; ++i) {
77+
visitor(DefaultsArray);
78+
}
79+
visitor(ColValue->Slice(idx, 1));
80+
prevIndex = currentIndex + 1;
81+
}
82+
for (; prevIndex < RecordsCount; ++prevIndex) {
83+
visitor(DefaultsArray);
84+
}
7585
}
7686

7787
ui32 GetFinishPosition() const {
@@ -286,6 +296,10 @@ class TSparsedArray: public IChunkedArray {
286296
}
287297
};
288298

299+
static TSparsedBuilder<arrow::BinaryType> MakeBuilderBinary(const ui32 reserveItems = 0, const ui32 reserveData = 0) {
300+
return TSparsedBuilder<arrow::BinaryType>(nullptr, reserveItems, reserveData);
301+
}
302+
289303
static TSparsedBuilder<arrow::StringType> MakeBuilderUtf8(const ui32 reserveItems = 0, const ui32 reserveData = 0) {
290304
return TSparsedBuilder<arrow::StringType>(nullptr, reserveItems, reserveData);
291305
}

ydb/core/formats/arrow/accessor/sub_columns/accessor.cpp

Lines changed: 11 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -21,10 +21,8 @@ TConclusion<std::shared_ptr<TSubColumnsArray>> TSubColumnsArray::Make(
2121
AFL_VERIFY(sourceArray);
2222
NSubColumns::TDataBuilder builder(columnType, settings);
2323
IChunkedArray::TReader reader(sourceArray);
24-
std::vector<std::shared_ptr<arrow::Array>> storage;
2524
for (ui32 i = 0; i < reader.GetRecordsCount();) {
2625
auto address = reader.GetReadChunk(i);
27-
storage.emplace_back(address.GetArray());
2826
auto conclusion = settings.GetDataExtractor()->AddDataToBuilders(address.GetArray(), builder);
2927
if (conclusion.IsFail()) {
3028
return conclusion;
@@ -72,7 +70,7 @@ TString TSubColumnsArray::SerializeToString(const TChunkConstructionData& extern
7270
ui32 columnIdx = 0;
7371
TMonotonic pred = TMonotonic::Now();
7472
for (auto&& i : ColumnsData.GetRecords()->GetColumns()) {
75-
TChunkConstructionData cData(GetRecordsCount(), nullptr, arrow::utf8(), externalInfo.GetDefaultSerializer());
73+
TChunkConstructionData cData(GetRecordsCount(), nullptr, arrow::binary(), externalInfo.GetDefaultSerializer());
7674
blobRanges.emplace_back(ColumnsData.GetStats().GetAccessorConstructor(columnIdx).SerializeToString(i, cData));
7775
auto* cInfo = proto.AddKeyColumns();
7876
cInfo->SetSize(blobRanges.back().size());
@@ -125,7 +123,6 @@ class TJsonRestorer {
125123
}
126124

127125
TConclusion<NBinaryJson::TBinaryJson> Finish() {
128-
auto str = Result.GetStringRobust();
129126
auto bJson = NBinaryJson::SerializeToBinaryJson(Result.GetStringRobust());
130127
if (const TString* val = std::get_if<TString>(&bJson)) {
131128
return TConclusionStatus::Fail(*val);
@@ -136,13 +133,13 @@ class TJsonRestorer {
136133
}
137134
}
138135

139-
void SetValueByPath(const TString& path, const TString& valueStr) {
136+
void SetValueByPath(const TString& path, const NJson::TJsonValue& jsonValue) {
140137
ui32 start = 0;
141138
bool enqueue = false;
142139
bool wasEnqueue = false;
143140
NJson::TJsonValue* current = &Result;
144141
if (path.empty()) {
145-
current->InsertValue(path, valueStr);
142+
current->InsertValue(path, jsonValue);
146143
return;
147144
}
148145
for (ui32 i = 0; i < path.size(); ++i) {
@@ -196,7 +193,7 @@ class TJsonRestorer {
196193
if (wasEnqueue) {
197194
AFL_VERIFY(path.size() > start + 2)("path", path)("start", start);
198195
TStringBuf key(path.data() + start + 1, (path.size() - 1) - start - 1);
199-
current->InsertValue(key, valueStr);
196+
current->InsertValue(key, jsonValue);
200197
} else {
201198
AFL_VERIFY(path.size() > start)("path", path)("start", start);
202199
TStringBuf key(path.data() + start, (path.size()) - start);
@@ -208,11 +205,11 @@ class TJsonRestorer {
208205
if (current->GetArraySafe().size() <= keyIndex) {
209206
current->GetArraySafe().resize(keyIndex + 1);
210207
}
211-
current->GetArraySafe()[keyIndex] = valueStr;
208+
current->GetArraySafe()[keyIndex] = jsonValue;
212209
} else {
213210
AFL_VERIFY(!current->IsArray())("key", key)("current", current->GetStringRobust())("full", Result.GetStringRobust())(
214211
"current_type", current->GetType());
215-
current->InsertValue(key, valueStr);
212+
current->InsertValue(key, jsonValue);
216213
}
217214
}
218215
}
@@ -245,15 +242,15 @@ std::shared_ptr<arrow::Array> TSubColumnsArray::BuildBJsonArray(const TColumnCon
245242
}
246243
};
247244

248-
const auto addValueToJson = [&](const TString& path, const TString& valueStr) {
249-
value.SetValueByPath(path, valueStr);
245+
const auto addValueToJson = [&](const TString& path, const NJson::TJsonValue& jsonValue) {
246+
value.SetValueByPath(path, jsonValue);
250247
};
251248

252-
auto onRecordKV = [&](const ui32 index, const std::string_view valueView, const bool isColumn) {
249+
auto onRecordKV = [&](const ui32 index, const NJson::TJsonValue& jsonValue, const bool isColumn) {
253250
if (isColumn) {
254-
addValueToJson(ColumnsData.GetStats().GetColumnNameString(index), TString(valueView.data(), valueView.size()));
251+
addValueToJson(ColumnsData.GetStats().GetColumnNameString(index), jsonValue);
255252
} else {
256-
addValueToJson(OthersData.GetStats().GetColumnNameString(index), TString(valueView.data(), valueView.size()));
253+
addValueToJson(OthersData.GetStats().GetColumnNameString(index), jsonValue);
257254
}
258255
};
259256
it.ReadRecord(recordIndex, onStartRecord, onRecordKV, onFinishRecord);

ydb/core/formats/arrow/accessor/sub_columns/columns_storage.cpp

Lines changed: 15 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,8 @@
22

33
#include <ydb/core/formats/arrow/arrow_filter.h>
44

5+
#include <yql/essentials/types/binary_json/read.h>
6+
57
namespace NKikimr::NArrow::NAccessor::NSubColumns {
68
TColumnsData TColumnsData::Slice(const ui32 offset, const ui32 count) const {
79
auto records = Records->Slice(offset, count);
@@ -61,8 +63,8 @@ void TColumnsData::TIterator::InitArrays() {
6163
}
6264
const ui32 localIndex = FullArrayAddress->GetAddress().GetLocalIndex(CurrentIndex);
6365
ChunkAddress = FullArrayAddress->GetArray()->GetChunk(ChunkAddress, localIndex);
64-
AFL_VERIFY(ChunkAddress->GetArray()->type()->id() == arrow::utf8()->id());
65-
CurrentArrayData = static_cast<const arrow::StringArray*>(ChunkAddress->GetArray().get());
66+
AFL_VERIFY(ChunkAddress->GetArray()->type()->id() == arrow::binary()->id());
67+
CurrentArrayData = static_cast<const arrow::BinaryArray*>(ChunkAddress->GetArray().get());
6668
if (FullArrayAddress->GetArray()->GetType() == IChunkedArray::EType::Array) {
6769
if (CurrentArrayData->IsNull(localIndex)) {
6870
Next();
@@ -82,4 +84,15 @@ void TColumnsData::TIterator::InitArrays() {
8284
AFL_VERIFY(CurrentIndex <= GlobalChunkedArray->GetRecordsCount())("index", CurrentIndex)("count", GlobalChunkedArray->GetRecordsCount());
8385
}
8486

87+
NJson::TJsonValue TColumnsData::TIterator::GetValue() const {
88+
auto view = CurrentArrayData->GetView(ChunkAddress->GetAddress().GetLocalIndex(CurrentIndex));
89+
if (view.empty()) {
90+
return NJson::TJsonValue(NJson::JSON_UNDEFINED);
91+
}
92+
auto data = NBinaryJson::SerializeToJson(TStringBuf(view.data(), view.size()));
93+
NJson::TJsonValue res;
94+
AFL_VERIFY(NJson::ReadJsonTree(data, &res));
95+
return res;
96+
}
97+
8598
} // namespace NKikimr::NArrow::NAccessor::NSubColumns

ydb/core/formats/arrow/accessor/sub_columns/columns_storage.h

Lines changed: 5 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -49,7 +49,7 @@ class TColumnsData {
4949
private:
5050
ui32 KeyIndex;
5151
std::shared_ptr<IChunkedArray> GlobalChunkedArray;
52-
const arrow::StringArray* CurrentArrayData;
52+
const arrow::BinaryArray* CurrentArrayData;
5353
std::optional<IChunkedArray::TFullChunkedArrayAddress> FullArrayAddress;
5454
std::optional<IChunkedArray::TFullDataAddress> ChunkAddress;
5555
ui32 CurrentIndex = 0;
@@ -71,11 +71,13 @@ class TColumnsData {
7171
return KeyIndex;
7272
}
7373

74-
std::string_view GetValue() const {
74+
std::string_view GetRawValue() const {
7575
auto view = CurrentArrayData->GetView(ChunkAddress->GetAddress().GetLocalIndex(CurrentIndex));
7676
return std::string_view(view.data(), view.size());
7777
}
7878

79+
NJson::TJsonValue GetValue() const;
80+
7981
bool HasValue() const {
8082
return !CurrentArrayData->IsNull(ChunkAddress->GetAddress().GetLocalIndex(CurrentIndex));
8183
}
@@ -129,7 +131,7 @@ class TColumnsData {
129131
, Records(data) {
130132
AFL_VERIFY(Records->num_columns() == Stats.GetColumnsCount())("records", Records->num_columns())("stats", Stats.GetColumnsCount());
131133
for (auto&& i : Records->GetColumns()) {
132-
AFL_VERIFY(i->GetDataType()->id() == arrow::utf8()->id());
134+
AFL_VERIFY(i->GetDataType()->id() == arrow::binary()->id());
133135
}
134136
}
135137
};

ydb/core/formats/arrow/accessor/sub_columns/data_extractor.cpp

Lines changed: 28 additions & 74 deletions
Original file line numberDiff line numberDiff line change
@@ -10,89 +10,43 @@
1010

1111
namespace NKikimr::NArrow::NAccessor::NSubColumns {
1212

13-
class TSimdBuffers: public TDataBuilder::IBuffers {
14-
private:
15-
std::vector<simdjson::padded_string> PaddedStrings;
16-
std::vector<TString> Strings;
17-
18-
public:
19-
TSimdBuffers(std::vector<simdjson::padded_string>&& paddedStrings, std::vector<TString>&& strings)
20-
: PaddedStrings(std::move(paddedStrings))
21-
, Strings(std::move(strings)) {
22-
}
23-
};
24-
2513
TConclusionStatus TJsonScanExtractor::DoAddDataToBuilders(const std::shared_ptr<arrow::Array>& sourceArray, TDataBuilder& dataBuilder) const {
26-
auto arr = std::static_pointer_cast<arrow::BinaryArray>(sourceArray);
27-
std::optional<bool> isBinaryJson;
28-
if (arr->type()->id() == arrow::utf8()->id()) {
29-
isBinaryJson = false;
30-
}
31-
if (!arr->length()) {
14+
AFL_VERIFY(sourceArray->type()->id() == arrow::binary()->id())
15+
("sourceArray->type()->id()", (int)sourceArray->type()->id());
16+
if (!sourceArray->length()) {
3217
return TConclusionStatus::Success();
3318
}
34-
simdjson::ondemand::parser simdParser;
35-
std::vector<simdjson::padded_string> paddedStrings;
36-
std::vector<TString> forceSIMDStrings;
37-
ui32 sumBuf = 0;
38-
ui32 paddedBorder = 0;
39-
for (i32 i = arr->length() - 1; i >= 1; --i) {
40-
sumBuf += arr->GetView(i).size();
41-
if (sumBuf > simdjson::SIMDJSON_PADDING) {
42-
paddedBorder = i;
43-
break;
44-
}
45-
}
19+
20+
auto arr = std::static_pointer_cast<arrow::BinaryArray>(sourceArray);
4621
for (ui32 i = 0; i < arr->length(); ++i) {
4722
const auto view = arr->GetView(i);
48-
if (view.size() && !arr->IsNull(i)) {
49-
TStringBuf sbJson(view.data(), view.size());
50-
if (!isBinaryJson) {
51-
isBinaryJson = NBinaryJson::IsValidBinaryJson(sbJson);
52-
}
53-
TString json;
54-
if (*isBinaryJson && ForceSIMDJsonParsing) {
55-
json = NBinaryJson::SerializeToJson(sbJson);
56-
forceSIMDStrings.emplace_back(json);
57-
sbJson = TStringBuf(json.data(), json.size());
58-
}
59-
if (!json && *isBinaryJson) {
60-
auto reader = NBinaryJson::TBinaryJsonReader::Make(sbJson);
61-
auto cursor = reader->GetRootCursor();
62-
std::deque<std::unique_ptr<IJsonObjectExtractor>> iterators;
63-
if (cursor.GetType() == NBinaryJson::EContainerType::Object) {
64-
iterators.push_back(std::make_unique<TKVExtractor>(cursor.GetObjectIterator(), TStringBuf(), FirstLevelOnly));
65-
} else if (cursor.GetType() == NBinaryJson::EContainerType::Array) {
66-
iterators.push_back(std::make_unique<TArrayExtractor>(cursor.GetArrayIterator(), TStringBuf(), FirstLevelOnly));
67-
}
68-
while (iterators.size()) {
69-
const auto conclusion = iterators.front()->Fill(dataBuilder, iterators);
70-
if (conclusion.IsFail()) {
71-
return conclusion;
72-
}
73-
iterators.pop_front();
74-
}
75-
} else {
76-
std::deque<std::unique_ptr<IJsonObjectExtractor>> iterators;
77-
simdjson::simdjson_result<simdjson::ondemand::document> doc;
78-
if (i < paddedBorder) {
79-
doc = simdParser.iterate(
80-
simdjson::padded_string_view(sbJson.data(), sbJson.size(), sbJson.size() + simdjson::SIMDJSON_PADDING));
81-
} else {
82-
paddedStrings.emplace_back(simdjson::padded_string(sbJson.data(), sbJson.size()));
83-
doc = simdParser.iterate(paddedStrings.back());
84-
}
85-
auto conclusion = TSIMDExtractor(doc, FirstLevelOnly).Fill(dataBuilder, iterators);
86-
if (conclusion.IsFail()) {
87-
return conclusion;
88-
}
23+
if (view.empty() || arr->IsNull(i)) {
24+
dataBuilder.StartNextRecord();
25+
continue;
26+
}
27+
28+
TStringBuf sbJson(view.data(), view.size());
29+
auto reader = NBinaryJson::TBinaryJsonReader::Make(sbJson);
30+
auto cursor = reader->GetRootCursor();
31+
std::deque<std::unique_ptr<IJsonObjectExtractor>> iterators;
32+
33+
if (cursor.GetType() == NBinaryJson::EContainerType::Object) {
34+
iterators.push_back(std::make_unique<TKVExtractor>(cursor.GetObjectIterator(), TStringBuf(), FirstLevelOnly));
35+
} else if (cursor.GetType() == NBinaryJson::EContainerType::Array) {
36+
iterators.push_back(std::make_unique<TArrayExtractor>(cursor.GetArrayIterator(), TStringBuf(), FirstLevelOnly));
37+
}
38+
39+
while (iterators.size()) {
40+
const auto conclusion = iterators.front()->Fill(dataBuilder, iterators);
41+
if (conclusion.IsFail()) {
42+
return conclusion;
8943
}
44+
iterators.pop_front();
9045
}
46+
9147
dataBuilder.StartNextRecord();
9248
}
93-
if (paddedStrings.size()) {
94-
dataBuilder.StoreBuffer(std::make_shared<TSimdBuffers>(std::move(paddedStrings), std::move(forceSIMDStrings)));
95-
}
49+
9650
return TConclusionStatus::Success();
9751
}
9852

ydb/core/formats/arrow/accessor/sub_columns/direct_builder.cpp

Lines changed: 11 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -16,18 +16,20 @@ namespace NKikimr::NArrow::NAccessor::NSubColumns {
1616

1717
void TColumnElements::BuildSparsedAccessor(const ui32 recordsCount) {
1818
AFL_VERIFY(!Accessor);
19-
auto recordsBuilder = TSparsedArray::MakeBuilderUtf8(RecordIndexes.size(), DataSize);
19+
auto recordsBuilder = TSparsedArray::MakeBuilderBinary(RecordIndexes.size(), DataSize);
2020
for (ui32 idx = 0; idx < RecordIndexes.size(); ++idx) {
21-
recordsBuilder.AddRecord(RecordIndexes[idx], Values[idx]);
21+
const auto& rec = Values[idx];
22+
recordsBuilder.AddRecord(RecordIndexes[idx], std::string_view(rec.Data(), rec.Size()));
2223
}
2324
Accessor = recordsBuilder.Finish(recordsCount);
2425
}
2526

2627
void TColumnElements::BuildPlainAccessor(const ui32 recordsCount) {
2728
AFL_VERIFY(!Accessor);
28-
auto builder = TTrivialArray::MakeBuilderUtf8(recordsCount, DataSize);
29+
auto builder = TTrivialArray::MakeBuilderBinary(recordsCount, DataSize);
2930
for (auto it = RecordIndexes.begin(); it != RecordIndexes.end(); ++it) {
30-
builder.AddRecord(*it, Values[it - RecordIndexes.begin()]);
31+
const auto& rec = Values[it - RecordIndexes.begin()];
32+
builder.AddRecord(*it, std::string_view(rec.Data(), rec.Size()));
3133
}
3234
Accessor = builder.Finish(recordsCount);
3335
}
@@ -87,7 +89,7 @@ std::shared_ptr<TSubColumnsArray> TDataBuilder::Finish() {
8789

8890
auto records = std::make_shared<TGeneralContainer>(CurrentRecordIndex);
8991
for (auto&& i : columnElements) {
90-
records->AddField(std::make_shared<arrow::Field>(std::string(i->GetKeyName()), arrow::utf8()), i->GetAccessorVerified()).Validate();
92+
records->AddField(std::make_shared<arrow::Field>(std::string(i->GetKeyName()), arrow::binary()), i->GetAccessorVerified()).Validate();
9193
}
9294
TColumnsData cData(std::move(columnStats), std::move(records));
9395
return std::make_shared<TSubColumnsArray>(std::move(cData), std::move(rbOthers), Type, CurrentRecordIndex, Settings);
@@ -105,7 +107,10 @@ TOthersData TDataBuilder::MergeOthers(const std::vector<TColumnElements*>& other
105107
auto othersBuilder = TOthersData::MakeMergedBuilder();
106108
while (heap.size()) {
107109
std::pop_heap(heap.begin(), heap.end());
108-
othersBuilder->AddImpl(heap.back().GetRecordIndex(), heap.back().GetKeyIndex(), heap.back().GetValuePointer());
110+
std::string_view view = heap.back().GetValuePointer() ?
111+
std::string_view(heap.back().GetValuePointer()->Data(), heap.back().GetValuePointer()->Size()) : "";
112+
std::string_view* viewPtr = heap.back().GetValuePointer() ? &view : nullptr;
113+
othersBuilder->AddImpl(heap.back().GetRecordIndex(), heap.back().GetKeyIndex(), viewPtr);
109114
if (!heap.back().Next()) {
110115
heap.pop_back();
111116
} else {

0 commit comments

Comments
 (0)