Skip to content

Commit 33f3a23

Browse files
committed
Setup basic structure for the merge update and create python bindings
Add prune_previous, metadata, on and match_on_index to the skeleton. Imlement checks for features that are not yet implemented Initial implementation Set types properly Fix tests Fix unit tests Enable more tests
1 parent 4a9edc5 commit 33f3a23

21 files changed

+640
-344
lines changed

cpp/arcticdb/CMakeLists.txt

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -411,6 +411,7 @@ set(arcticdb_srcs
411411
version/version_store_api.hpp
412412
version/version_store_objects.hpp
413413
version/version_utils.hpp
414+
version/merge_options.hpp
414415
# CPP files
415416
arrow/arrow_handlers.cpp
416417
arrow/arrow_output_frame.cpp

cpp/arcticdb/column_store/memory_segment.hpp

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -65,7 +65,7 @@ class SegmentInMemory {
6565
}
6666

6767
template<typename T>
68-
requires std::same_as<std::decay_t<T>, std::string>
68+
requires std::convertible_to<T, std::string_view>
6969
void set_scalar(position_t idx, const T& val) {
7070
impl_->set_string(idx, val);
7171
}

cpp/arcticdb/entity/native_tensor.hpp

Lines changed: 14 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -145,6 +145,20 @@ struct NativeTensor {
145145

146146
NativeTensor& request() { return *this; }
147147

148+
template<typename T>
149+
std::span<const T> as_span() const {
150+
debug::check<ErrorCode::E_ASSERTION_FAILURE>(ndim() == 1, "Can only convert 1D NativeTensor to span");
151+
debug::check<ErrorCode::E_ASSERTION_FAILURE>(elsize() > 0, "Cannot convert NativeTensor with elsize 0 to span");
152+
debug::check<ErrorCode::E_ASSERTION_FAILURE>(
153+
details::visit_type(
154+
dt_, []<typename TypeTag>(TypeTag) { return std::is_same_v<typename TypeTag::raw_type, T>; }
155+
),
156+
"Type mismatch when converting {} to span",
157+
dt_
158+
);
159+
return std::span<const T>{static_cast<const T*>(ptr), nbytes() / sizeof(T)};
160+
}
161+
148162
util::MagicNum<'T', 'n', 's', 'r'> magic_;
149163
int64_t nbytes_;
150164
int ndim_;

cpp/arcticdb/entity/types.hpp

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -481,6 +481,7 @@ requires std::is_base_of_v<DataTypeTagBase, DT> && std::is_base_of_v<DimensionTa
481481
struct TypeDescriptorTag {
482482
using DataTypeTag = DT;
483483
using DimensionTag = D;
484+
using RawType = typename DataTypeTag::raw_type;
484485
explicit constexpr operator TypeDescriptor() const { return type_descriptor(); }
485486

486487
[[nodiscard]] static constexpr Dimension dimension() { return DimensionTag::value; }

cpp/arcticdb/pipeline/frame_slice.hpp

Lines changed: 8 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -256,11 +256,18 @@ struct SliceAndKey {
256256

257257
bool invalid() const { return (!segment_ && !key_) || (segment_ && segment_->is_null()); }
258258

259-
const AtomKey& key() const {
259+
const AtomKey& key() const& {
260260
util::check(static_cast<bool>(key_), "No key found");
261261
return *key_;
262262
}
263263

264+
void set_key(AtomKey&& key) { key_ = std::move(key); }
265+
266+
AtomKey&& key() && {
267+
internal::check<ErrorCode::E_ASSERTION_FAILURE>(key_, "No key found");
268+
return std::move(*key_);
269+
}
270+
264271
void unset_segment() { segment_ = std::nullopt; }
265272

266273
void set_segment(SegmentInMemory&& seg) { segment_ = std::move(seg); }

cpp/arcticdb/pipeline/input_tensor_frame.hpp

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -56,6 +56,8 @@ struct InputTensorFrame {
5656
}
5757
}
5858

59+
SortedValue sorted() const { return desc.sorted(); }
60+
5961
void set_bucketize_dynamic(bool bucketize) const { bucketize_dynamic = bucketize; }
6062

6163
bool has_index() const { return desc.index().field_count() != 0ULL; }

cpp/arcticdb/python/normalization_checks.cpp

Lines changed: 6 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -195,15 +195,15 @@ bool check_ndarray_append(const NormalizationMetadata& old_norm, NormalizationMe
195195
}
196196

197197
void fix_normalization_or_throw(
198-
bool is_append, const pipelines::index::IndexSegmentReader& existing_isr,
198+
NormalizationOperation operation, const pipelines::index::IndexSegmentReader& existing_isr,
199199
const pipelines::InputTensorFrame& new_frame
200200
) {
201201
auto& old_norm = existing_isr.tsd().proto().normalization();
202202
auto& new_norm = new_frame.norm_meta;
203203
normalization::check<ErrorCode::E_INCOMPATIBLE_OBJECTS>(
204204
old_norm.input_type_case() == new_frame.norm_meta.input_type_case(),
205205
"{} can be performed only on objects of the same type. Existing type is {} new type is {}.",
206-
is_append ? "Append" : "Update",
206+
operation,
207207
old_norm.input_type_case(),
208208
new_frame.norm_meta.input_type_case()
209209
);
@@ -215,13 +215,15 @@ void fix_normalization_or_throw(
215215
}
216216
return;
217217
}
218-
if (is_append) {
218+
if (operation == NormalizationOperation::APPEND) {
219219
if (check_ndarray_append(old_norm, new_norm))
220220
return;
221221
} else {
222222
// ndarray normalizes to a ROWCOUNT frame and we don't support update on those
223223
normalization::check<ErrorCode::E_UPDATE_NOT_SUPPORTED>(
224-
!old_norm.has_np() && !new_norm.has_np(), "current normalization scheme doesn't allow update of ndarray"
224+
!old_norm.has_np() && !new_norm.has_np(),
225+
"current normalization scheme doesn't allow performing {} of ndarray",
226+
operation
225227
);
226228
}
227229
}

cpp/arcticdb/python/normalization_checks.hpp

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -9,6 +9,7 @@
99
#pragma once
1010

1111
#include <arcticdb/util/error_code.hpp>
12+
#include <arcticdb/version/schema_checks.hpp>
1213

1314
namespace arcticdb {
1415

@@ -25,7 +26,7 @@ struct IndexSegmentReader;
2526
* The new frame for append/update is compatible with the existing index. Throws various exceptions if not.
2627
*/
2728
void fix_normalization_or_throw(
28-
bool is_append, const pipelines::index::IndexSegmentReader& existing_isr,
29+
NormalizationOperation op, const pipelines::index::IndexSegmentReader& existing_isr,
2930
const pipelines::InputTensorFrame& new_frame
3031
);
3132
} // namespace arcticdb

cpp/arcticdb/version/local_versioned_engine.cpp

Lines changed: 25 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -603,6 +603,31 @@ VersionedItem LocalVersionedEngine::delete_range_internal(
603603
return versioned_item;
604604
}
605605

606+
VersionedItem LocalVersionedEngine::merge_internal(
607+
const StreamId& stream_id, const std::shared_ptr<InputTensorFrame>& source, bool prune_previous_versions,
608+
const MergeStrategy& strategy, std::span<const std::string> on, bool match_on_timeseries_index
609+
) {
610+
ARCTICDB_RUNTIME_DEBUG(log::version(), "Command: merge");
611+
py::gil_scoped_release release_gil;
612+
const auto update_info = get_latest_undeleted_version_and_next_version_id(store(), version_map(), stream_id);
613+
if (update_info.previous_index_key_.has_value()) {
614+
if (source->empty()) {
615+
ARCTICDB_DEBUG(
616+
log::version(),
617+
"Merging into existing data with an empty source has no effect. \n No new version is being created "
618+
"for symbol='{}', and the last version is returned",
619+
stream_id
620+
);
621+
return VersionedItem{*std::move(update_info.previous_index_key_)};
622+
}
623+
auto versioned_item =
624+
merge_impl(store(), source, update_info, get_write_options(), strategy, on, match_on_timeseries_index);
625+
write_version_and_prune_previous(prune_previous_versions, versioned_item.key_, update_info.previous_index_key_);
626+
return versioned_item;
627+
}
628+
storage::raise<ErrorCode::E_SYMBOL_NOT_FOUND>("Cannot merge into non-existent symbol \"{}\".", stream_id);
629+
}
630+
606631
VersionedItem LocalVersionedEngine::update_internal(
607632
const StreamId& stream_id, const UpdateQuery& query, const std::shared_ptr<InputTensorFrame>& frame,
608633
bool upsert, bool dynamic_schema, bool prune_previous_versions

cpp/arcticdb/version/local_versioned_engine.hpp

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -21,6 +21,7 @@
2121
#include <arcticdb/version/versioned_engine.hpp>
2222
#include <arcticdb/entity/descriptor_item.hpp>
2323
#include <arcticdb/entity/data_error.hpp>
24+
#include <arcticdb/version/merge_options.hpp>
2425

2526
namespace arcticdb::version_store {
2627

@@ -67,6 +68,11 @@ class LocalVersionedEngine : public VersionedEngine {
6768

6869
virtual ~LocalVersionedEngine() = default;
6970

71+
VersionedItem merge_internal(
72+
const StreamId& stream_id, const std::shared_ptr<InputTensorFrame>& source, bool prune_previous_versions,
73+
const MergeStrategy& strategy, std::span<const std::string> on, bool match_on_timeseries_index
74+
) override;
75+
7076
VersionedItem update_internal(
7177
const StreamId& stream_id, const UpdateQuery& query, const std::shared_ptr<InputTensorFrame>& frame,
7278
bool upsert, bool dynamic_schema, bool prune_previous_versions

0 commit comments

Comments
 (0)