Skip to content

Commit 588a901

Browse files
authored
Improve recursive normalized data read performance (#2742)
#### Reference Issues/PRs <!--Example: Fixes #1234. See also #3456.--> https://man312219.monday.com/boards/7852509418/pulses/18298965201 #### What does this implement or fix? ##### `read` Previously, `batch_read_keys` read the index keys of individual leaf nodes one by one during submission of read tasks. This PR makes this step run in parallel in the C++ layer. It shows a read performance improvement, especially on slow networks or for data with more leaf nodes: | Read | Time(s) | | | | |-----------------------|------------------------------------------------------------|-----------------:|------------------------------------|------------------| | | **Remote AWS** | | **Local S3 Storage (moto)** | | | | **Before** | **After** | **Before** | **After** | | 200 Large Dataframe | 98.4112 | 50.547 | 27.7294 | 25.2147 | | 2000 Small Dataframe | 159.712 | 9.73144 | 33.0835 | 10.7383 | ##### `batch_read` It has been changed to unify its code path with `read`. Node keys are now read in the same chain as the root keys. As expected, the performance has neither improved nor worsened. | Read | Time(s) | | | | |-----------------------|--------------------|----------|--------------------------------|----------| | | **Remote AWS** | | **Local S3 Storage (moto)** | | | | **Before** | **After**| **Before** | **After**| | 2000 Symbols × 200 Dataframe | 7.379 | 7.161 | 7.224 | 7.252 | #### Any other comments? The ASV benchmark fails because of unreliable arrow and peakmem tests; these failures can be ignored. #### Checklist <details> <summary> Checklist for code changes... </summary> - [ ] Have you updated the relevant docstrings, documentation and copyright notice? - [ ] Is this contribution tested against [all ArcticDB's features](../docs/mkdocs/docs/technical/contributing.md)? - [ ] Do all exceptions introduced raise appropriate [error messages](https://docs.arcticdb.io/error_messages/)? - [ ] Are API changes highlighted in the PR description? 
- [ ] Is the PR labelled as enhancement or bug so it appears in autogenerated release notes? </details> <!-- Thanks for contributing a Pull Request to ArcticDB! Please ensure you have taken a look at: - ArcticDB's Code of Conduct: https://github.com/man-group/ArcticDB/blob/master/CODE_OF_CONDUCT.md - ArcticDB's Contribution Licensing: https://github.com/man-group/ArcticDB/blob/master/docs/mkdocs/docs/technical/contributing.md#contribution-licensing -->
1 parent f0a5cc3 commit 588a901

20 files changed

+597
-234
lines changed

cpp/arcticdb/CMakeLists.txt

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -555,6 +555,7 @@ set(arcticdb_srcs
555555
version/version_utils.cpp
556556
version/symbol_list.cpp
557557
version/version_map_batch_methods.cpp
558+
version/version_tasks.cpp
558559
storage/s3/ec2_utils.cpp
559560
)
560561

cpp/arcticdb/entity/read_result.hpp

Lines changed: 61 additions & 64 deletions
Original file line numberDiff line numberDiff line change
@@ -24,6 +24,21 @@ namespace arcticdb {
2424

2525
using OutputFrame = std::variant<pipelines::PandasOutputFrame, ArrowOutputFrame>;
2626

27+
struct ARCTICDB_VISIBILITY_HIDDEN NodeReadResult {
28+
NodeReadResult(
29+
const StreamId& symbol, OutputFrame&& frame_data,
30+
arcticdb::proto::descriptors::NormalizationMetadata&& norm_meta
31+
) :
32+
symbol_(symbol),
33+
frame_data_(std::move(frame_data)),
34+
norm_meta_(std::move(norm_meta)) {};
35+
StreamId symbol_;
36+
OutputFrame frame_data_;
37+
arcticdb::proto::descriptors::NormalizationMetadata norm_meta_;
38+
39+
ARCTICDB_MOVE_ONLY_DEFAULT(NodeReadResult)
40+
};
41+
2742
struct ARCTICDB_VISIBILITY_HIDDEN ReadResult {
2843
ReadResult(
2944
const std::variant<VersionedItem, std::vector<VersionedItem>>& versioned_item, OutputFrame&& frame_data,
@@ -32,15 +47,15 @@ struct ARCTICDB_VISIBILITY_HIDDEN ReadResult {
3247
arcticdb::proto::descriptors::UserDefinedMetadata,
3348
std::vector<arcticdb::proto::descriptors::UserDefinedMetadata>>& user_meta,
3449
const arcticdb::proto::descriptors::UserDefinedMetadata& multi_key_meta,
35-
std::vector<entity::AtomKey>&& multi_keys
50+
std::vector<NodeReadResult>&& node_results = {}
3651
) :
3752
item(versioned_item),
3853
frame_data(std::move(frame_data)),
3954
output_format(output_format),
4055
norm_meta(norm_meta),
4156
user_meta(user_meta),
4257
multi_key_meta(multi_key_meta),
43-
multi_keys(std::move(multi_keys)) {}
58+
node_results(std::move(node_results)) {}
4459
std::variant<VersionedItem, std::vector<VersionedItem>> item;
4560
OutputFrame frame_data;
4661
OutputFormat output_format;
@@ -50,71 +65,53 @@ struct ARCTICDB_VISIBILITY_HIDDEN ReadResult {
5065
std::vector<arcticdb::proto::descriptors::UserDefinedMetadata>>
5166
user_meta;
5267
arcticdb::proto::descriptors::UserDefinedMetadata multi_key_meta;
53-
std::vector<entity::AtomKey> multi_keys;
68+
std::vector<NodeReadResult> node_results;
5469

5570
ARCTICDB_MOVE_ONLY_DEFAULT(ReadResult)
5671
};
5772

58-
inline ReadResult create_python_read_result(
59-
const std::variant<VersionedItem, std::vector<VersionedItem>>& version, OutputFormat output_format,
60-
FrameAndDescriptor&& fd,
61-
std::optional<std::vector<arcticdb::proto::descriptors::UserDefinedMetadata>>&& user_meta = std::nullopt
62-
) {
63-
auto result = std::move(fd);
64-
65-
// If version is a vector then this was a multi-symbol join, so the user_meta vector should have a value
66-
// Otherwise, there is a single piece of metadata on the frame descriptor
67-
util::check(
68-
std::holds_alternative<VersionedItem>(version) ^ user_meta.has_value(),
69-
"Unexpected argument combination to create_python_read_result"
70-
);
71-
72-
// Very old (pre Nov-2020) PandasIndex protobuf messages had no "start" or "step" fields. If is_physically_stored
73-
// (renamed from is_not_range_index) was false, the index was always RangeIndex(num_rows, 1)
74-
// This used to be handled in the Python layer by passing None to the DataFrame index parameter, which would then
75-
// default to RangeIndex(num_rows, 1). However, the empty index also has is_physically_stored as false, and because
76-
// integer protobuf fields default to zero if they are not present on the wire, it is impossible to tell from
77-
// the normalization metadata alone if the data was written with an empty index, or with a very old range index.
78-
// We therefore patch the normalization metadata here in this case
79-
auto norm_meta = result.desc_.mutable_proto().mutable_normalization();
80-
if (norm_meta->has_df() || norm_meta->has_series()) {
81-
auto common = norm_meta->has_df() ? norm_meta->mutable_df()->mutable_common()
82-
: norm_meta->mutable_series()->mutable_common();
83-
if (common->has_index()) {
84-
auto index = common->mutable_index();
85-
if (result.desc_.index().type() == IndexDescriptor::Type::ROWCOUNT && !index->is_physically_stored() &&
86-
index->start() == 0 && index->step() == 0) {
87-
index->set_step(1);
88-
}
89-
}
90-
}
91-
92-
auto python_frame = [&]() -> OutputFrame {
93-
if (output_format == OutputFormat::ARROW) {
94-
return ArrowOutputFrame{segment_to_arrow_data(result.frame_)};
95-
} else {
96-
return pipelines::PandasOutputFrame{result.frame_};
97-
}
98-
}();
99-
util::print_total_mem_usage(__FILE__, __LINE__, __FUNCTION__);
100-
101-
const auto& desc_proto = result.desc_.proto();
102-
std::variant<
103-
arcticdb::proto::descriptors::UserDefinedMetadata,
104-
std::vector<arcticdb::proto::descriptors::UserDefinedMetadata>>
105-
metadata;
106-
if (user_meta.has_value()) {
107-
metadata = std::move(*user_meta);
108-
} else {
109-
metadata = std::move(desc_proto.user_meta());
110-
}
111-
return {version,
112-
std::move(python_frame),
113-
output_format,
114-
desc_proto.normalization(),
115-
metadata,
116-
desc_proto.multi_key_meta(),
117-
std::move(result.keys_)};
118-
}
73+
namespace version_store {
74+
75+
struct SymbolProcessingResult {
76+
VersionedItem versioned_item_;
77+
proto::descriptors::UserDefinedMetadata metadata_;
78+
OutputSchema output_schema_;
79+
std::vector<EntityId> entity_ids_;
80+
};
81+
82+
struct ReadVersionOutput {
83+
ReadVersionOutput() = delete;
84+
ReadVersionOutput(VersionedItem&& versioned_item, FrameAndDescriptor&& frame_and_descriptor) :
85+
versioned_item_(std::move(versioned_item)),
86+
frame_and_descriptor_(std::move(frame_and_descriptor)) {}
87+
88+
ARCTICDB_MOVE_ONLY_DEFAULT(ReadVersionOutput)
89+
90+
VersionedItem versioned_item_;
91+
FrameAndDescriptor frame_and_descriptor_;
92+
};
93+
94+
struct ReadVersionWithNodesOutput {
95+
ReadVersionOutput root_;
96+
std::vector<ReadVersionOutput> nodes_;
97+
};
98+
99+
struct MultiSymbolReadOutput {
100+
MultiSymbolReadOutput() = delete;
101+
MultiSymbolReadOutput(
102+
std::vector<VersionedItem>&& versioned_items,
103+
std::vector<proto::descriptors::UserDefinedMetadata>&& metadatas, FrameAndDescriptor&& frame_and_descriptor
104+
) :
105+
versioned_items_(std::move(versioned_items)),
106+
metadatas_(std::move(metadatas)),
107+
frame_and_descriptor_(std::move(frame_and_descriptor)) {}
108+
109+
ARCTICDB_MOVE_ONLY_DEFAULT(MultiSymbolReadOutput)
110+
111+
std::vector<VersionedItem> versioned_items_;
112+
std::vector<proto::descriptors::UserDefinedMetadata> metadatas_;
113+
FrameAndDescriptor frame_and_descriptor_;
114+
};
115+
} // namespace version_store
119116

120117
} // namespace arcticdb

cpp/arcticdb/pipeline/pipeline_utils.hpp

Lines changed: 74 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -47,6 +47,80 @@ inline void apply_type_handlers(SegmentInMemory seg, std::any& handler_data, Out
4747
}
4848
}
4949

50+
inline ReadResult create_python_read_result(
51+
const std::variant<VersionedItem, std::vector<VersionedItem>>& version, OutputFormat output_format,
52+
FrameAndDescriptor&& fd,
53+
std::optional<std::vector<arcticdb::proto::descriptors::UserDefinedMetadata>>&& user_meta = std::nullopt,
54+
std::vector<version_store::ReadVersionOutput>&& node_outputs = {}
55+
) {
56+
auto result = std::move(fd);
57+
58+
// If version is a vector then this was a multi-symbol join, so the user_meta vector should have a value
59+
// Otherwise, there is a single piece of metadata on the frame descriptor
60+
util::check(
61+
std::holds_alternative<VersionedItem>(version) ^ user_meta.has_value(),
62+
"Unexpected argument combination to create_python_read_result"
63+
);
64+
65+
// Very old (pre Nov-2020) PandasIndex protobuf messages had no "start" or "step" fields. If is_physically_stored
66+
// (renamed from is_not_range_index) was false, the index was always RangeIndex(num_rows, 1)
67+
// This used to be handled in the Python layer by passing None to the DataFrame index parameter, which would then
68+
// default to RangeIndex(num_rows, 1). However, the empty index also has is_physically_stored as false, and because
69+
// integer protobuf fields default to zero if they are not present on the wire, it is impossible to tell from
70+
// the normalization metadata alone if the data was written with an empty index, or with a very old range index.
71+
// We therefore patch the normalization metadata here in this case
72+
auto norm_meta = result.desc_.mutable_proto().mutable_normalization();
73+
if (norm_meta->has_df() || norm_meta->has_series()) {
74+
auto common = norm_meta->has_df() ? norm_meta->mutable_df()->mutable_common()
75+
: norm_meta->mutable_series()->mutable_common();
76+
if (common->has_index()) {
77+
auto index = common->mutable_index();
78+
if (result.desc_.index().type() == IndexDescriptor::Type::ROWCOUNT && !index->is_physically_stored() &&
79+
index->start() == 0 && index->step() == 0) {
80+
index->set_step(1);
81+
}
82+
}
83+
}
84+
85+
auto get_python_frame = [output_format](auto& result) -> OutputFrame {
86+
if (output_format == OutputFormat::ARROW) {
87+
return ArrowOutputFrame{segment_to_arrow_data(result.frame_)};
88+
} else {
89+
return pipelines::PandasOutputFrame{result.frame_};
90+
}
91+
};
92+
auto python_frame = get_python_frame(result);
93+
util::print_total_mem_usage(__FILE__, __LINE__, __FUNCTION__);
94+
95+
const auto& desc_proto = result.desc_.proto();
96+
std::variant<
97+
arcticdb::proto::descriptors::UserDefinedMetadata,
98+
std::vector<arcticdb::proto::descriptors::UserDefinedMetadata>>
99+
metadata;
100+
if (user_meta.has_value()) {
101+
metadata = std::move(*user_meta);
102+
} else {
103+
metadata = std::move(desc_proto.user_meta());
104+
}
105+
106+
std::vector<NodeReadResult> node_results;
107+
for (auto& node_output : node_outputs) {
108+
auto& node_fd = node_output.frame_and_descriptor_;
109+
auto node_python_frame = get_python_frame(node_fd);
110+
auto node_metadata = node_fd.desc_.proto().normalization();
111+
node_results.emplace_back(
112+
node_output.versioned_item_.symbol(), std::move(node_python_frame), std::move(node_metadata)
113+
);
114+
}
115+
return {version,
116+
std::move(python_frame),
117+
output_format,
118+
desc_proto.normalization(),
119+
metadata,
120+
desc_proto.multi_key_meta(),
121+
std::move(node_results)};
122+
}
123+
50124
inline ReadResult read_result_from_single_frame(
51125
FrameAndDescriptor& frame_and_desc, const AtomKey& key, std::any& handler_data, OutputFormat output_format
52126
) {

cpp/arcticdb/python/adapt_read_dataframe.hpp

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -34,7 +34,10 @@ inline py::tuple adapt_read_df(ReadResult&& ret, std::any* const handler_data) {
3434
}
3535
);
3636
auto multi_key_meta = python_util::pb_to_python(ret.multi_key_meta);
37-
return py::make_tuple(ret.item, std::move(ret.frame_data), pynorm, pyuser_meta, multi_key_meta, ret.multi_keys);
37+
auto node_results = python_util::node_results_to_python_list(std::move(ret.node_results));
38+
return py::make_tuple(
39+
ret.item, std::move(ret.frame_data), pynorm, pyuser_meta, multi_key_meta, std::move(node_results)
40+
);
3841
};
3942

4043
} // namespace arcticdb

cpp/arcticdb/python/python_utils.hpp

Lines changed: 16 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -224,6 +224,16 @@ class PyTimestampRange {
224224
timestamp end_;
225225
};
226226

227+
// Convert node read results into a Python list with one tuple per node:
// (symbol, frame data, normalization metadata converted via pb_to_python).
// The frame data of each element of `node_results` is moved out.
inline py::list node_results_to_python_list(std::vector<NodeReadResult>&& node_results) {
    py::list py_nodes;
    for (NodeReadResult& node : node_results) {
        auto py_norm_meta = pb_to_python(node.norm_meta_);
        auto py_node = py::make_tuple(node.symbol_, std::move(node.frame_data_), std::move(py_norm_meta));
        py_nodes.append(std::move(py_node));
    }
    return py_nodes;
}
236+
227237
inline py::list adapt_read_dfs(std::vector<std::variant<ReadResult, DataError>>&& r, std::any* const handler) {
228238
auto ret = std::move(r);
229239
py::list lst;
@@ -232,22 +242,22 @@ inline py::list adapt_read_dfs(std::vector<std::variant<ReadResult, DataError>>&
232242
util::variant_match(
233243
res,
234244
[&lst, &output_format](ReadResult& read_result) {
235-
auto pynorm = python_util::pb_to_python(read_result.norm_meta);
245+
auto pynorm = pb_to_python(read_result.norm_meta);
236246
util::check(
237247
std::holds_alternative<proto::descriptors::UserDefinedMetadata>(read_result.user_meta),
238248
"Expected single user metadata in adapt_read_dfs, received vector"
239249
);
240-
auto pyuser_meta = python_util::pb_to_python(
241-
std::get<proto::descriptors::UserDefinedMetadata>(read_result.user_meta)
242-
);
243-
auto multi_key_meta = python_util::pb_to_python(read_result.multi_key_meta);
250+
auto pyuser_meta =
251+
pb_to_python(std::get<proto::descriptors::UserDefinedMetadata>(read_result.user_meta));
252+
auto multi_key_meta = pb_to_python(read_result.multi_key_meta);
253+
auto node_results = node_results_to_python_list(std::move(read_result.node_results));
244254
lst.append(py::make_tuple(
245255
read_result.item,
246256
std::move(read_result.frame_data),
247257
pynorm,
248258
pyuser_meta,
249259
multi_key_meta,
250-
read_result.multi_keys
260+
std::move(node_results)
251261
));
252262
util::check(
253263
!output_format.has_value() || output_format.value() == read_result.output_format,

0 commit comments

Comments
 (0)