Skip to content
4 changes: 4 additions & 0 deletions ydb/core/formats/arrow/accessor/plain/accessor.h
Original file line number Diff line number Diff line change
Expand Up @@ -112,6 +112,10 @@ class TTrivialArray: public IChunkedArray {
}
};

static TPlainBuilder<arrow::BinaryType> MakeBuilderBinary(const ui32 reserveItems = 0, const ui32 reserveData = 0) {
return TPlainBuilder<arrow::BinaryType>(reserveItems, reserveData);
}

static TPlainBuilder<arrow::StringType> MakeBuilderUtf8(const ui32 reserveItems = 0, const ui32 reserveSize = 0) {
return TPlainBuilder<arrow::StringType>(reserveItems, reserveSize);
}
Expand Down
18 changes: 16 additions & 2 deletions ydb/core/formats/arrow/accessor/sparsed/accessor.h
Original file line number Diff line number Diff line change
Expand Up @@ -70,8 +70,18 @@ class TSparsedArrayChunk {
TSparsedArrayChunk(TSparsedArrayChunk&&) = default;

void VisitValues(const IChunkedArray::TValuesSimpleVisitor& visitor) const {
visitor(ColValue);
visitor(DefaultsArray);
ui32 prevIndex = 0;
for (ui32 idx = 0; idx < UI32ColIndex->length(); ++idx) {
auto currentIndex = UI32ColIndex->Value(idx);
for (ui32 i = prevIndex; i < currentIndex; ++i) {
visitor(DefaultsArray);
}
visitor(ColValue->Slice(idx, 1));
prevIndex = currentIndex + 1;
}
for (; prevIndex < RecordsCount; ++prevIndex) {
visitor(DefaultsArray);
}
Comment on lines +73 to +84
Copy link

Copilot AI Oct 31, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The VisitValues implementation creates array slices (ColValue->Slice(idx, 1)) in a loop which could be inefficient for large datasets. Consider batching or accessing the underlying array data directly to avoid repeated slice operations.

Copilot uses AI. Check for mistakes.
}

ui32 GetFinishPosition() const {
Expand Down Expand Up @@ -286,6 +296,10 @@ class TSparsedArray: public IChunkedArray {
}
};

static TSparsedBuilder<arrow::BinaryType> MakeBuilderBinary(const ui32 reserveItems = 0, const ui32 reserveData = 0) {
return TSparsedBuilder<arrow::BinaryType>(nullptr, reserveItems, reserveData);
}

static TSparsedBuilder<arrow::StringType> MakeBuilderUtf8(const ui32 reserveItems = 0, const ui32 reserveData = 0) {
return TSparsedBuilder<arrow::StringType>(nullptr, reserveItems, reserveData);
}
Expand Down
25 changes: 11 additions & 14 deletions ydb/core/formats/arrow/accessor/sub_columns/accessor.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -21,10 +21,8 @@ TConclusion<std::shared_ptr<TSubColumnsArray>> TSubColumnsArray::Make(
AFL_VERIFY(sourceArray);
NSubColumns::TDataBuilder builder(columnType, settings);
IChunkedArray::TReader reader(sourceArray);
std::vector<std::shared_ptr<arrow::Array>> storage;
for (ui32 i = 0; i < reader.GetRecordsCount();) {
auto address = reader.GetReadChunk(i);
storage.emplace_back(address.GetArray());
auto conclusion = settings.GetDataExtractor()->AddDataToBuilders(address.GetArray(), builder);
if (conclusion.IsFail()) {
return conclusion;
Expand Down Expand Up @@ -72,7 +70,7 @@ TString TSubColumnsArray::SerializeToString(const TChunkConstructionData& extern
ui32 columnIdx = 0;
TMonotonic pred = TMonotonic::Now();
for (auto&& i : ColumnsData.GetRecords()->GetColumns()) {
TChunkConstructionData cData(GetRecordsCount(), nullptr, arrow::utf8(), externalInfo.GetDefaultSerializer());
TChunkConstructionData cData(GetRecordsCount(), nullptr, arrow::binary(), externalInfo.GetDefaultSerializer());
blobRanges.emplace_back(ColumnsData.GetStats().GetAccessorConstructor(columnIdx).SerializeToString(i, cData));
auto* cInfo = proto.AddKeyColumns();
cInfo->SetSize(blobRanges.back().size());
Expand Down Expand Up @@ -125,7 +123,6 @@ class TJsonRestorer {
}

TConclusion<NBinaryJson::TBinaryJson> Finish() {
auto str = Result.GetStringRobust();
auto bJson = NBinaryJson::SerializeToBinaryJson(Result.GetStringRobust());
if (const TString* val = std::get_if<TString>(&bJson)) {
return TConclusionStatus::Fail(*val);
Expand All @@ -136,13 +133,13 @@ class TJsonRestorer {
}
}

void SetValueByPath(const TString& path, const TString& valueStr) {
void SetValueByPath(const TString& path, const NJson::TJsonValue& jsonValue) {
ui32 start = 0;
bool enqueue = false;
bool wasEnqueue = false;
NJson::TJsonValue* current = &Result;
if (path.empty()) {
current->InsertValue(path, valueStr);
current->InsertValue(path, jsonValue);
return;
}
for (ui32 i = 0; i < path.size(); ++i) {
Expand Down Expand Up @@ -196,7 +193,7 @@ class TJsonRestorer {
if (wasEnqueue) {
AFL_VERIFY(path.size() > start + 2)("path", path)("start", start);
TStringBuf key(path.data() + start + 1, (path.size() - 1) - start - 1);
current->InsertValue(key, valueStr);
current->InsertValue(key, jsonValue);
} else {
AFL_VERIFY(path.size() > start)("path", path)("start", start);
TStringBuf key(path.data() + start, (path.size()) - start);
Expand All @@ -208,11 +205,11 @@ class TJsonRestorer {
if (current->GetArraySafe().size() <= keyIndex) {
current->GetArraySafe().resize(keyIndex + 1);
}
current->GetArraySafe()[keyIndex] = valueStr;
current->GetArraySafe()[keyIndex] = jsonValue;
} else {
AFL_VERIFY(!current->IsArray())("key", key)("current", current->GetStringRobust())("full", Result.GetStringRobust())(
"current_type", current->GetType());
current->InsertValue(key, valueStr);
current->InsertValue(key, jsonValue);
}
}
}
Expand Down Expand Up @@ -245,15 +242,15 @@ std::shared_ptr<arrow::Array> TSubColumnsArray::BuildBJsonArray(const TColumnCon
}
};

const auto addValueToJson = [&](const TString& path, const TString& valueStr) {
value.SetValueByPath(path, valueStr);
const auto addValueToJson = [&](const TString& path, const NJson::TJsonValue& jsonValue) {
value.SetValueByPath(path, jsonValue);
};

auto onRecordKV = [&](const ui32 index, const std::string_view valueView, const bool isColumn) {
auto onRecordKV = [&](const ui32 index, const NJson::TJsonValue& jsonValue, const bool isColumn) {
if (isColumn) {
addValueToJson(ColumnsData.GetStats().GetColumnNameString(index), TString(valueView.data(), valueView.size()));
addValueToJson(ColumnsData.GetStats().GetColumnNameString(index), jsonValue);
} else {
addValueToJson(OthersData.GetStats().GetColumnNameString(index), TString(valueView.data(), valueView.size()));
addValueToJson(OthersData.GetStats().GetColumnNameString(index), jsonValue);
}
};
it.ReadRecord(recordIndex, onStartRecord, onRecordKV, onFinishRecord);
Expand Down
17 changes: 15 additions & 2 deletions ydb/core/formats/arrow/accessor/sub_columns/columns_storage.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,8 @@

#include <ydb/core/formats/arrow/arrow_filter.h>

#include <yql/essentials/types/binary_json/read.h>

namespace NKikimr::NArrow::NAccessor::NSubColumns {
TColumnsData TColumnsData::Slice(const ui32 offset, const ui32 count) const {
auto records = Records->Slice(offset, count);
Expand Down Expand Up @@ -61,8 +63,8 @@ void TColumnsData::TIterator::InitArrays() {
}
const ui32 localIndex = FullArrayAddress->GetAddress().GetLocalIndex(CurrentIndex);
ChunkAddress = FullArrayAddress->GetArray()->GetChunk(ChunkAddress, localIndex);
AFL_VERIFY(ChunkAddress->GetArray()->type()->id() == arrow::utf8()->id());
CurrentArrayData = static_cast<const arrow::StringArray*>(ChunkAddress->GetArray().get());
AFL_VERIFY(ChunkAddress->GetArray()->type()->id() == arrow::binary()->id());
CurrentArrayData = static_cast<const arrow::BinaryArray*>(ChunkAddress->GetArray().get());
if (FullArrayAddress->GetArray()->GetType() == IChunkedArray::EType::Array) {
if (CurrentArrayData->IsNull(localIndex)) {
Next();
Expand All @@ -82,4 +84,15 @@ void TColumnsData::TIterator::InitArrays() {
AFL_VERIFY(CurrentIndex <= GlobalChunkedArray->GetRecordsCount())("index", CurrentIndex)("count", GlobalChunkedArray->GetRecordsCount());
}

NJson::TJsonValue TColumnsData::TIterator::GetValue() const {
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Я кстати не уверен что NJson::TJsonValue это самый эффективный способ хранения json, можно будет поисследовать другие либы как они по префу. Это уже отдельно можно провернуть

auto view = CurrentArrayData->GetView(ChunkAddress->GetAddress().GetLocalIndex(CurrentIndex));
if (view.empty()) {
return NJson::TJsonValue(NJson::JSON_UNDEFINED);
}
auto data = NBinaryJson::SerializeToJson(TStringBuf(view.data(), view.size()));
NJson::TJsonValue res;
AFL_VERIFY(NJson::ReadJsonTree(data, &res));
return res;
}

} // namespace NKikimr::NArrow::NAccessor::NSubColumns
Original file line number Diff line number Diff line change
Expand Up @@ -49,7 +49,7 @@ class TColumnsData {
private:
ui32 KeyIndex;
std::shared_ptr<IChunkedArray> GlobalChunkedArray;
const arrow::StringArray* CurrentArrayData;
const arrow::BinaryArray* CurrentArrayData;
std::optional<IChunkedArray::TFullChunkedArrayAddress> FullArrayAddress;
std::optional<IChunkedArray::TFullDataAddress> ChunkAddress;
ui32 CurrentIndex = 0;
Expand All @@ -71,11 +71,13 @@ class TColumnsData {
return KeyIndex;
}

std::string_view GetValue() const {
std::string_view GetRawValue() const {
auto view = CurrentArrayData->GetView(ChunkAddress->GetAddress().GetLocalIndex(CurrentIndex));
return std::string_view(view.data(), view.size());
}

NJson::TJsonValue GetValue() const;

bool HasValue() const {
return !CurrentArrayData->IsNull(ChunkAddress->GetAddress().GetLocalIndex(CurrentIndex));
}
Expand Down Expand Up @@ -129,7 +131,7 @@ class TColumnsData {
, Records(data) {
AFL_VERIFY(Records->num_columns() == Stats.GetColumnsCount())("records", Records->num_columns())("stats", Stats.GetColumnsCount());
for (auto&& i : Records->GetColumns()) {
AFL_VERIFY(i->GetDataType()->id() == arrow::utf8()->id());
AFL_VERIFY(i->GetDataType()->id() == arrow::binary()->id());
}
}
};
Expand Down
102 changes: 28 additions & 74 deletions ydb/core/formats/arrow/accessor/sub_columns/data_extractor.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -10,89 +10,43 @@

namespace NKikimr::NArrow::NAccessor::NSubColumns {

class TSimdBuffers: public TDataBuilder::IBuffers {
private:
std::vector<simdjson::padded_string> PaddedStrings;
std::vector<TString> Strings;

public:
TSimdBuffers(std::vector<simdjson::padded_string>&& paddedStrings, std::vector<TString>&& strings)
: PaddedStrings(std::move(paddedStrings))
, Strings(std::move(strings)) {
}
};

TConclusionStatus TJsonScanExtractor::DoAddDataToBuilders(const std::shared_ptr<arrow::Array>& sourceArray, TDataBuilder& dataBuilder) const {
auto arr = std::static_pointer_cast<arrow::BinaryArray>(sourceArray);
std::optional<bool> isBinaryJson;
if (arr->type()->id() == arrow::utf8()->id()) {
isBinaryJson = false;
}
if (!arr->length()) {
AFL_VERIFY(sourceArray->type()->id() == arrow::binary()->id())
("sourceArray->type()->id()", (int)sourceArray->type()->id());
if (!sourceArray->length()) {
return TConclusionStatus::Success();
}
simdjson::ondemand::parser simdParser;
std::vector<simdjson::padded_string> paddedStrings;
std::vector<TString> forceSIMDStrings;
ui32 sumBuf = 0;
ui32 paddedBorder = 0;
for (i32 i = arr->length() - 1; i >= 1; --i) {
sumBuf += arr->GetView(i).size();
if (sumBuf > simdjson::SIMDJSON_PADDING) {
paddedBorder = i;
break;
}
}

auto arr = std::static_pointer_cast<arrow::BinaryArray>(sourceArray);
for (ui32 i = 0; i < arr->length(); ++i) {
const auto view = arr->GetView(i);
if (view.size() && !arr->IsNull(i)) {
TStringBuf sbJson(view.data(), view.size());
if (!isBinaryJson) {
isBinaryJson = NBinaryJson::IsValidBinaryJson(sbJson);
}
TString json;
if (*isBinaryJson && ForceSIMDJsonParsing) {
json = NBinaryJson::SerializeToJson(sbJson);
forceSIMDStrings.emplace_back(json);
sbJson = TStringBuf(json.data(), json.size());
}
if (!json && *isBinaryJson) {
auto reader = NBinaryJson::TBinaryJsonReader::Make(sbJson);
auto cursor = reader->GetRootCursor();
std::deque<std::unique_ptr<IJsonObjectExtractor>> iterators;
if (cursor.GetType() == NBinaryJson::EContainerType::Object) {
iterators.push_back(std::make_unique<TKVExtractor>(cursor.GetObjectIterator(), TStringBuf(), FirstLevelOnly));
} else if (cursor.GetType() == NBinaryJson::EContainerType::Array) {
iterators.push_back(std::make_unique<TArrayExtractor>(cursor.GetArrayIterator(), TStringBuf(), FirstLevelOnly));
}
while (iterators.size()) {
const auto conclusion = iterators.front()->Fill(dataBuilder, iterators);
if (conclusion.IsFail()) {
return conclusion;
}
iterators.pop_front();
}
} else {
std::deque<std::unique_ptr<IJsonObjectExtractor>> iterators;
simdjson::simdjson_result<simdjson::ondemand::document> doc;
if (i < paddedBorder) {
doc = simdParser.iterate(
simdjson::padded_string_view(sbJson.data(), sbJson.size(), sbJson.size() + simdjson::SIMDJSON_PADDING));
} else {
paddedStrings.emplace_back(simdjson::padded_string(sbJson.data(), sbJson.size()));
doc = simdParser.iterate(paddedStrings.back());
}
auto conclusion = TSIMDExtractor(doc, FirstLevelOnly).Fill(dataBuilder, iterators);
if (conclusion.IsFail()) {
return conclusion;
}
if (view.empty() || arr->IsNull(i)) {
dataBuilder.StartNextRecord();
continue;
}

TStringBuf sbJson(view.data(), view.size());
auto reader = NBinaryJson::TBinaryJsonReader::Make(sbJson);
auto cursor = reader->GetRootCursor();
std::deque<std::unique_ptr<IJsonObjectExtractor>> iterators;

if (cursor.GetType() == NBinaryJson::EContainerType::Object) {
iterators.push_back(std::make_unique<TKVExtractor>(cursor.GetObjectIterator(), TStringBuf(), FirstLevelOnly));
} else if (cursor.GetType() == NBinaryJson::EContainerType::Array) {
iterators.push_back(std::make_unique<TArrayExtractor>(cursor.GetArrayIterator(), TStringBuf(), FirstLevelOnly));
}

while (iterators.size()) {
const auto conclusion = iterators.front()->Fill(dataBuilder, iterators);
if (conclusion.IsFail()) {
return conclusion;
}
iterators.pop_front();
}

dataBuilder.StartNextRecord();
}
if (paddedStrings.size()) {
dataBuilder.StoreBuffer(std::make_shared<TSimdBuffers>(std::move(paddedStrings), std::move(forceSIMDStrings)));
}

return TConclusionStatus::Success();
}

Expand Down
17 changes: 11 additions & 6 deletions ydb/core/formats/arrow/accessor/sub_columns/direct_builder.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -16,18 +16,20 @@ namespace NKikimr::NArrow::NAccessor::NSubColumns {

void TColumnElements::BuildSparsedAccessor(const ui32 recordsCount) {
AFL_VERIFY(!Accessor);
auto recordsBuilder = TSparsedArray::MakeBuilderUtf8(RecordIndexes.size(), DataSize);
auto recordsBuilder = TSparsedArray::MakeBuilderBinary(RecordIndexes.size(), DataSize);
for (ui32 idx = 0; idx < RecordIndexes.size(); ++idx) {
recordsBuilder.AddRecord(RecordIndexes[idx], Values[idx]);
const auto& rec = Values[idx];
recordsBuilder.AddRecord(RecordIndexes[idx], std::string_view(rec.Data(), rec.Size()));
}
Accessor = recordsBuilder.Finish(recordsCount);
}

void TColumnElements::BuildPlainAccessor(const ui32 recordsCount) {
AFL_VERIFY(!Accessor);
auto builder = TTrivialArray::MakeBuilderUtf8(recordsCount, DataSize);
auto builder = TTrivialArray::MakeBuilderBinary(recordsCount, DataSize);
for (auto it = RecordIndexes.begin(); it != RecordIndexes.end(); ++it) {
builder.AddRecord(*it, Values[it - RecordIndexes.begin()]);
const auto& rec = Values[it - RecordIndexes.begin()];
builder.AddRecord(*it, std::string_view(rec.Data(), rec.Size()));
}
Accessor = builder.Finish(recordsCount);
}
Expand Down Expand Up @@ -87,7 +89,7 @@ std::shared_ptr<TSubColumnsArray> TDataBuilder::Finish() {

auto records = std::make_shared<TGeneralContainer>(CurrentRecordIndex);
for (auto&& i : columnElements) {
records->AddField(std::make_shared<arrow::Field>(std::string(i->GetKeyName()), arrow::utf8()), i->GetAccessorVerified()).Validate();
records->AddField(std::make_shared<arrow::Field>(std::string(i->GetKeyName()), arrow::binary()), i->GetAccessorVerified()).Validate();
}
TColumnsData cData(std::move(columnStats), std::move(records));
return std::make_shared<TSubColumnsArray>(std::move(cData), std::move(rbOthers), Type, CurrentRecordIndex, Settings);
Expand All @@ -105,7 +107,10 @@ TOthersData TDataBuilder::MergeOthers(const std::vector<TColumnElements*>& other
auto othersBuilder = TOthersData::MakeMergedBuilder();
while (heap.size()) {
std::pop_heap(heap.begin(), heap.end());
othersBuilder->AddImpl(heap.back().GetRecordIndex(), heap.back().GetKeyIndex(), heap.back().GetValuePointer());
std::string_view view = heap.back().GetValuePointer() ?
std::string_view(heap.back().GetValuePointer()->Data(), heap.back().GetValuePointer()->Size()) : "";
std::string_view* viewPtr = heap.back().GetValuePointer() ? &view : nullptr;
Comment on lines +110 to +112
Copy link

Copilot AI Oct 31, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The code calls heap.back().GetValuePointer() multiple times (4 times total). Consider storing the result in a variable to improve readability and avoid redundant calls.

Suggested change
std::string_view view = heap.back().GetValuePointer() ?
std::string_view(heap.back().GetValuePointer()->Data(), heap.back().GetValuePointer()->Size()) : "";
std::string_view* viewPtr = heap.back().GetValuePointer() ? &view : nullptr;
auto* valuePtr = heap.back().GetValuePointer();
std::string_view view = valuePtr ? std::string_view(valuePtr->Data(), valuePtr->Size()) : "";
std::string_view* viewPtr = valuePtr ? &view : nullptr;

Copilot uses AI. Check for mistakes.
othersBuilder->AddImpl(heap.back().GetRecordIndex(), heap.back().GetKeyIndex(), viewPtr);
if (!heap.back().Next()) {
heap.pop_back();
} else {
Expand Down
Loading
Loading