1 change: 1 addition & 0 deletions cpp/arcticdb/CMakeLists.txt
@@ -411,6 +411,7 @@ set(arcticdb_srcs
version/version_store_api.hpp
version/version_store_objects.hpp
version/version_utils.hpp
+ version/merge_options.hpp
# CPP files
arrow/arrow_handlers.cpp
arrow/arrow_output_frame.cpp
Expand Down
2 changes: 1 addition & 1 deletion cpp/arcticdb/column_store/memory_segment.hpp
@@ -65,7 +65,7 @@ class SegmentInMemory {
}

template<typename T>
- requires std::same_as<std::decay_t<T>, std::string>
+ requires std::convertible_to<T, std::string_view>
void set_scalar(position_t idx, const T& val) {
impl_->set_string(idx, val);
}
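The constraint change above widens the string overload of set_scalar from exactly std::string to anything convertible to std::string_view. A minimal self-contained sketch of the effect, using a hypothetical set_string_scalar free function in place of SegmentInMemory::set_scalar:

```cpp
#include <concepts>
#include <iostream>
#include <string>
#include <string_view>

// Hypothetical stand-in for the constrained overload: with the relaxed
// requirement, std::string, std::string_view and string literals all bind.
template<typename T>
    requires std::convertible_to<T, std::string_view>
void set_string_scalar(const T& val) {
    std::string_view sv{val}; // what the string setter ultimately consumes
    std::cout << "stored: " << sv << '\n';
}

int main() {
    set_string_scalar(std::string{"owned"});     // the only type the old std::same_as constraint accepted
    set_string_scalar(std::string_view{"view"}); // now accepted without materialising a std::string
    set_string_scalar("literal");                // const char[8] converts via decay
}
```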
15 changes: 15 additions & 0 deletions cpp/arcticdb/entity/native_tensor.hpp
@@ -14,6 +14,7 @@

// for std::accumulate
#include <numeric>
+ #include <span>

#include <pybind11/numpy.h>

@@ -145,6 +146,20 @@ struct NativeTensor {

NativeTensor& request() { return *this; }

+ template<typename T>
+ std::span<const T> as_span() const {
+ debug::check<ErrorCode::E_ASSERTION_FAILURE>(ndim() == 1, "Can only convert 1D NativeTensor to span");
+ debug::check<ErrorCode::E_ASSERTION_FAILURE>(elsize() > 0, "Cannot convert NativeTensor with elsize 0 to span");
+ debug::check<ErrorCode::E_ASSERTION_FAILURE>(
+ details::visit_type(
+ dt_, []<typename TypeTag>(TypeTag) { return std::is_same_v<typename TypeTag::raw_type, T>; }
+ ),
+ "Type mismatch when converting {} to span",
+ dt_
+ );
+ return std::span<const T>{static_cast<const T*>(ptr), nbytes() / sizeof(T)};
+ }

util::MagicNum<'T', 'n', 's', 'r'> magic_;
int64_t nbytes_;
int ndim_;
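The new as_span() reinterprets a 1-D tensor's raw buffer as a typed, read-only std::span after checking dimensionality, element size, and the runtime data type. A self-contained sketch of the same pattern, with plain asserts standing in for arcticdb's debug::check and a toy FlatBuffer in place of NativeTensor:

```cpp
#include <cassert>
#include <cstddef>
#include <iostream>
#include <span>
#include <vector>

// Toy stand-in for NativeTensor: a type-erased 1-D buffer described by a raw
// pointer, a byte count, and the element size recorded at construction.
struct FlatBuffer {
    const void* ptr = nullptr;
    std::size_t nbytes = 0;
    std::size_t elsize = 0;

    template<typename T>
    std::span<const T> as_span() const {
        // Stand-ins for debug::check: the requested T must match the stored layout.
        assert(elsize == sizeof(T) && "type mismatch when converting to span");
        assert(nbytes % sizeof(T) == 0);
        return std::span<const T>{static_cast<const T*>(ptr), nbytes / sizeof(T)};
    }
};

int main() {
    std::vector<double> data{1.0, 2.0, 3.0};
    FlatBuffer buf{data.data(), data.size() * sizeof(double), sizeof(double)};
    for (double d : buf.as_span<double>())
        std::cout << d << ' ';
}
```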
1 change: 1 addition & 0 deletions cpp/arcticdb/entity/types.hpp
@@ -481,6 +481,7 @@ requires std::is_base_of_v<DataTypeTagBase, DT> && std::is_base_of_v<DimensionTa
struct TypeDescriptorTag {
using DataTypeTag = DT;
using DimensionTag = D;
+ using RawType = typename DataTypeTag::raw_type;
explicit constexpr operator TypeDescriptor() const { return type_descriptor(); }

[[nodiscard]] static constexpr Dimension dimension() { return DimensionTag::value; }
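The new RawType alias saves callers a level of indirection: generic code can name the element type straight off the tag instead of spelling typename Tag::DataTypeTag::raw_type. A toy reconstruction of the tag machinery (the real tags live in types.hpp and carry more than shown here):

```cpp
#include <type_traits>

// Toy tag hierarchy mirroring the shape of the real one.
struct Float64Tag { using raw_type = double; };
struct Dim0Tag { static constexpr int value = 0; };

template<typename DT, typename D>
struct TypeDescriptorTag {
    using DataTypeTag = DT;
    using DimensionTag = D;
    using RawType = typename DataTypeTag::raw_type; // the added alias
};

// Generic code can now write Tag::RawType directly.
template<typename Tag>
constexpr typename Tag::RawType zero_of() { return typename Tag::RawType{}; }

int main() {
    using Tag = TypeDescriptorTag<Float64Tag, Dim0Tag>;
    static_assert(std::is_same_v<Tag::RawType, double>);
    static_assert(zero_of<Tag>() == 0.0);
}
```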
9 changes: 8 additions & 1 deletion cpp/arcticdb/pipeline/frame_slice.hpp
@@ -256,11 +256,18 @@ struct SliceAndKey {

bool invalid() const { return (!segment_ && !key_) || (segment_ && segment_->is_null()); }

- const AtomKey& key() const {
+ const AtomKey& key() const& {
util::check(static_cast<bool>(key_), "No key found");
return *key_;
}

+ void set_key(AtomKey&& key) { key_ = std::move(key); }

+ AtomKey&& key() && {
+ internal::check<ErrorCode::E_ASSERTION_FAILURE>(key_, "No key found");
+ return std::move(*key_);
+ }

void unset_segment() { segment_ = std::nullopt; }

void set_segment(SegmentInMemory&& seg) { segment_ = std::move(seg); }
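key() is now ref-qualified: the const& overload serves lvalues, while the && overload lets callers move the key out of an expiring SliceAndKey instead of copying. A self-contained sketch of the pattern with a toy Holder in place of SliceAndKey:

```cpp
#include <iostream>
#include <optional>
#include <string>
#include <utility>

// Toy stand-in for SliceAndKey: the same accessor, ref-qualified two ways.
struct Holder {
    std::optional<std::string> key_;

    const std::string& key() const& { return *key_; }   // lvalues: borrow
    std::string&& key() && { return std::move(*key_); } // rvalues: move out
};

int main() {
    Holder h{std::optional<std::string>{"index-key"}};
    std::cout << h.key() << '\n';            // const& overload, no copy made
    std::string taken = std::move(h).key();  // && overload, steals the string
    std::cout << taken << '\n';
}
```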
2 changes: 2 additions & 0 deletions cpp/arcticdb/pipeline/input_tensor_frame.hpp
@@ -56,6 +56,8 @@ struct InputTensorFrame {
}
}

+ SortedValue sorted() const { return desc.sorted(); }

void set_bucketize_dynamic(bool bucketize) const { bucketize_dynamic = bucketize; }

bool has_index() const { return desc.index().field_count() != 0ULL; }
10 changes: 6 additions & 4 deletions cpp/arcticdb/python/normalization_checks.cpp
@@ -195,15 +195,15 @@ bool check_ndarray_append(const NormalizationMetadata& old_norm, NormalizationMe
}

void fix_normalization_or_throw(
- bool is_append, const pipelines::index::IndexSegmentReader& existing_isr,
+ NormalizationOperation operation, const pipelines::index::IndexSegmentReader& existing_isr,
const pipelines::InputTensorFrame& new_frame
) {
auto& old_norm = existing_isr.tsd().proto().normalization();
auto& new_norm = new_frame.norm_meta;
normalization::check<ErrorCode::E_INCOMPATIBLE_OBJECTS>(
old_norm.input_type_case() == new_frame.norm_meta.input_type_case(),
"{} can be performed only on objects of the same type. Existing type is {} new type is {}.",
is_append ? "Append" : "Update",
operation,
old_norm.input_type_case(),
new_frame.norm_meta.input_type_case()
);
@@ -215,13 +215,15 @@ void fix_normalization_or_throw(
}
return;
}
- if (is_append) {
+ if (operation == NormalizationOperation::APPEND) {
if (check_ndarray_append(old_norm, new_norm))
return;
} else {
// ndarray normalizes to a ROWCOUNT frame and we don't support update on those
normalization::check<ErrorCode::E_UPDATE_NOT_SUPPORTED>(
- !old_norm.has_np() && !new_norm.has_np(), "current normalization scheme doesn't allow update of ndarray"
+ !old_norm.has_np() && !new_norm.has_np(),
+ "current normalization scheme doesn't allow performing {} of ndarray",
+ operation
);
}
}
3 changes: 2 additions & 1 deletion cpp/arcticdb/python/normalization_checks.hpp
@@ -9,6 +9,7 @@
#pragma once

#include <arcticdb/util/error_code.hpp>
+ #include <arcticdb/version/schema_checks.hpp>

namespace arcticdb {

@@ -25,7 +26,7 @@ struct IndexSegmentReader;
* The new frame for append/update is compatible with the existing index. Throws various exceptions if not.
*/
void fix_normalization_or_throw(
- bool is_append, const pipelines::index::IndexSegmentReader& existing_isr,
+ NormalizationOperation op, const pipelines::index::IndexSegmentReader& existing_isr,
const pipelines::InputTensorFrame& new_frame
);
} // namespace arcticdb
25 changes: 25 additions & 0 deletions cpp/arcticdb/version/local_versioned_engine.cpp
@@ -603,6 +603,31 @@ VersionedItem LocalVersionedEngine::delete_range_internal(
return versioned_item;
}

+ VersionedItem LocalVersionedEngine::merge_internal(
+ const StreamId& stream_id, const std::shared_ptr<InputTensorFrame>& source, bool prune_previous_versions,
+ const MergeStrategy& strategy, std::span<const std::string> on, bool match_on_timeseries_index
+ ) {
+ ARCTICDB_RUNTIME_DEBUG(log::version(), "Command: merge");
+ py::gil_scoped_release release_gil;
+ const auto update_info = get_latest_undeleted_version_and_next_version_id(store(), version_map(), stream_id);
+ if (update_info.previous_index_key_.has_value()) {
+ if (source->empty()) {
+ ARCTICDB_DEBUG(
+ log::version(),
+ "Merging into existing data with an empty source has no effect. \n No new version is being created "
+ "for symbol='{}', and the last version is returned",
+ stream_id
+ );
+ return VersionedItem{*std::move(update_info.previous_index_key_)};
+ }
+ auto versioned_item =
+ merge_impl(store(), source, update_info, get_write_options(), strategy, on, match_on_timeseries_index);
+ write_version_and_prune_previous(prune_previous_versions, versioned_item.key_, update_info.previous_index_key_);
+ return versioned_item;
+ }
+ storage::raise<ErrorCode::E_SYMBOL_NOT_FOUND>("Cannot merge into non-existent symbol \"{}\".", stream_id);
+ }

VersionedItem LocalVersionedEngine::update_internal(
const StreamId& stream_id, const UpdateQuery& query, const std::shared_ptr<InputTensorFrame>& frame,
bool upsert, bool dynamic_schema, bool prune_previous_versions
6 changes: 6 additions & 0 deletions cpp/arcticdb/version/local_versioned_engine.hpp
@@ -21,6 +21,7 @@
#include <arcticdb/version/versioned_engine.hpp>
#include <arcticdb/entity/descriptor_item.hpp>
#include <arcticdb/entity/data_error.hpp>
+ #include <arcticdb/version/merge_options.hpp>

namespace arcticdb::version_store {

@@ -67,6 +68,11 @@ class LocalVersionedEngine : public VersionedEngine {

virtual ~LocalVersionedEngine() = default;

+ VersionedItem merge_internal(
+ const StreamId& stream_id, const std::shared_ptr<InputTensorFrame>& source, bool prune_previous_versions,
+ const MergeStrategy& strategy, std::span<const std::string> on, bool match_on_timeseries_index
+ ) override;

VersionedItem update_internal(
const StreamId& stream_id, const UpdateQuery& query, const std::shared_ptr<InputTensorFrame>& frame,
bool upsert, bool dynamic_schema, bool prune_previous_versions
17 changes: 17 additions & 0 deletions cpp/arcticdb/version/merge_options.hpp
@@ -0,0 +1,17 @@
/* Copyright 2025 Man Group Operations Limited
*
* Use of this software is governed by the Business Source License 1.1 included in the file licenses/BSL.txt.
*
* As of the Change Date specified in that file, in accordance with the Business Source License, use of this software
* will be governed by the Apache License, version 2.0.
*/
#pragma once

namespace arcticdb {
enum class MergeAction : uint8_t { DO_NOTHING, UPDATE, INSERT };
struct MergeStrategy {
MergeAction matched;
MergeAction not_matched_by_target;
};

} // namespace arcticdb
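MergeStrategy pairs an action for source rows that match a target row with an action for those that don't, in the style of SQL MERGE's matched / not-matched-by-target clauses. The dispatch below is a hypothetical illustration of how a row-level merge might consult it, not arcticdb's actual implementation; only the enum and struct shapes are taken from the file above:

```cpp
#include <cstdint>
#include <iostream>

enum class MergeAction : uint8_t { DO_NOTHING, UPDATE, INSERT };

struct MergeStrategy {
    MergeAction matched;               // source row matches an existing target row
    MergeAction not_matched_by_target; // source row has no counterpart in the target
};

// Hypothetical per-row dispatch: the real engine operates on whole segments.
MergeAction action_for_row(const MergeStrategy& strategy, bool row_matched) {
    return row_matched ? strategy.matched : strategy.not_matched_by_target;
}

int main() {
    // A classic upsert: update rows that match, insert the rest.
    const MergeStrategy upsert{MergeAction::UPDATE, MergeAction::INSERT};
    std::cout << (action_for_row(upsert, true) == MergeAction::UPDATE) << '\n';  // 1
    std::cout << (action_for_row(upsert, false) == MergeAction::INSERT) << '\n'; // 1
}
```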
11 changes: 11 additions & 0 deletions cpp/arcticdb/version/python_bindings.cpp
@@ -25,6 +25,7 @@
#include <arcticdb/version/schema_checks.hpp>
#include <arcticdb/util/pybind_mutex.hpp>
#include <arcticdb/storage/storage_exceptions.hpp>
+ #include <arcticdb/version/merge_options.hpp>

namespace arcticdb::version_store {

@@ -241,6 +242,12 @@ void register_bindings(py::module& version, py::exception<arcticdb::ArcticExcept
.value("PANDAS", OutputFormat::PANDAS)
.value("ARROW", OutputFormat::ARROW);

+ py::enum_<MergeAction>(version, "MergeAction")
+ .value("DO_NOTHING", MergeAction::DO_NOTHING)
+ .value("UPDATE", MergeAction::UPDATE)
+ .value("INSERT", MergeAction::INSERT)
+ .export_values();

py::class_<ReadOptions>(version, "PythonVersionStoreReadOptions")
.def(py::init())
.def("set_force_strings_to_object", &ReadOptions::set_force_strings_to_object)
@@ -632,6 +639,10 @@ void register_bindings(py::module& version, py::exception<arcticdb::ArcticExcept
&PythonVersionStore::append,
py::call_guard<SingleThreadMutexHolder>(),
"Append a dataframe to the most recent version")
.def("merge",
&PythonVersionStore::merge,
py::call_guard<SingleThreadMutexHolder>(),
"Merge a dataframe into the most recent version")
.def("append_incomplete",
&PythonVersionStore::append_incomplete,
py::call_guard<SingleThreadMutexHolder>(),
28 changes: 14 additions & 14 deletions cpp/arcticdb/version/schema_checks.cpp
@@ -1,20 +1,10 @@
#include <arcticdb/version/schema_checks.hpp>
#include <arcticdb/python/normalization_checks.hpp>
#include <arcticdb/pipeline/index_segment_reader.hpp>
#include <arcticdb/entity/type_utils.hpp>

namespace arcticdb {

- std::string_view normalization_operation_str(NormalizationOperation operation) {
- switch (operation) {
- case APPEND:
- return "APPEND";
- case UPDATE:
- return "UPDATE";
- default:
- util::raise_rte("Unknown operation type {}", static_cast<uint8_t>(operation));
- }
- }

IndexDescriptor::Type get_common_index_type(const IndexDescriptor::Type& left, const IndexDescriptor::Type& right) {
if (left == right) {
return left;
@@ -139,13 +129,14 @@ bool columns_match(
}

void fix_descriptor_mismatch_or_throw(
- NormalizationOperation operation, bool dynamic_schema, const pipelines::index::IndexSegmentReader& existing_isr,
- const pipelines::InputTensorFrame& new_frame, bool empty_types
+ arcticdb::NormalizationOperation operation, bool dynamic_schema,
+ const pipelines::index::IndexSegmentReader& existing_isr, const pipelines::InputTensorFrame& new_frame,
+ bool empty_types
) {
const auto& old_sd = existing_isr.tsd().as_stream_descriptor();
check_normalization_index_match(operation, old_sd, new_frame, empty_types);

- fix_normalization_or_throw(operation == APPEND, existing_isr, new_frame);
+ fix_normalization_or_throw(operation, existing_isr, new_frame);

// We need to check that the index names match regardless of the dynamic schema setting
if (!index_names_match(old_sd, new_frame.desc)) {
@@ -186,4 +177,13 @@ void fix_descriptor_mismatch_or_throw(
);
}
}

+ StreamDescriptorMismatch::StreamDescriptorMismatch(
+ const char* preamble, const StreamId& stream_id, const StreamDescriptor& existing,
+ const StreamDescriptor& new_val, NormalizationOperation operation
+ ) :
+ ArcticSpecificException(fmt::format(
+ "{}: {}; stream_id=\"{}\"; existing=\"{}\"; new_val=\"{}\"", preamble, operation, stream_id,
+ existing.fields(), new_val.fields()
+ )) {}
} // namespace arcticdb
38 changes: 28 additions & 10 deletions cpp/arcticdb/version/schema_checks.hpp
@@ -1,26 +1,22 @@
#pragma once

#include <arcticdb/pipeline/input_tensor_frame.hpp>
#include <arcticdb/python/normalization_checks.hpp>

namespace arcticdb {

- enum NormalizationOperation : uint8_t {
- APPEND,
- UPDATE,
- };
+ namespace pipelines::index {
+ struct IndexSegmentReader;
+ }

+ enum NormalizationOperation : uint8_t { APPEND, UPDATE, MERGE };

std::string_view normalization_operation_str(NormalizationOperation operation);

struct StreamDescriptorMismatch : ArcticSpecificException<ErrorCode::E_DESCRIPTOR_MISMATCH> {
StreamDescriptorMismatch(
const char* preamble, const StreamId& stream_id, const StreamDescriptor& existing,
const StreamDescriptor& new_val, NormalizationOperation operation
- ) :
- ArcticSpecificException(fmt::format(
- "{}: {}; stream_id=\"{}\"; existing=\"{}\"; new_val=\"{}\"", preamble,
- normalization_operation_str(operation), stream_id, existing.fields(), new_val.fields()
- )) {}
+ );
};

IndexDescriptor::Type get_common_index_type(const IndexDescriptor::Type& left, const IndexDescriptor::Type& right);
@@ -42,3 +38,25 @@
const pipelines::InputTensorFrame& new_frame, bool empty_types
);
} // namespace arcticdb

+ template<>
+ struct fmt::formatter<arcticdb::NormalizationOperation> {

+ template<typename ParseContext>
+ constexpr auto parse(ParseContext& ctx) {
+ return ctx.begin();
+ }

+ template<typename FormatContext>
+ auto format(const arcticdb::NormalizationOperation op, FormatContext& ctx) const {
+ using namespace arcticdb::entity;
+ switch (op) {
+ case arcticdb::NormalizationOperation::APPEND:
+ return fmt::format_to(ctx.out(), "APPEND");
+ case arcticdb::NormalizationOperation::UPDATE:
+ return fmt::format_to(ctx.out(), "UPDATE");
+ case arcticdb::NormalizationOperation::MERGE:
+ return fmt::format_to(ctx.out(), "MERGE");
+ }
+ }

Check warning on line 61 in cpp/arcticdb/version/schema_checks.hpp (GitHub Actions / Windows C++ Tests / compile, windows-cl, win_amd64): 'fmt::v10::formatter<enum arcticdb::NormalizationOperation,char,void>::format<fmt::v10::basic_format_context<fmt::v10::appender,char> >': not all control paths return a value
Check failure on line 61 in cpp/arcticdb/version/schema_checks.hpp (same job): the following warning is treated as an error
+ };
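The CI annotations above flag that format() has no return statement after its switch, which MSVC rejects under warnings-as-errors. One conventional fix, sketched here with a standalone to_string function rather than the fmt::formatter specialisation (not necessarily what the PR ended up doing), is to give the function an unconditional trailing return:

```cpp
#include <cstdint>
#include <iostream>
#include <string_view>

enum class NormalizationOperation : uint8_t { APPEND, UPDATE, MERGE };

constexpr std::string_view to_string(NormalizationOperation op) {
    switch (op) {
        case NormalizationOperation::APPEND: return "APPEND";
        case NormalizationOperation::UPDATE: return "UPDATE";
        case NormalizationOperation::MERGE:  return "MERGE";
    }
    return "UNKNOWN"; // unreachable for valid enumerators, but satisfies MSVC
}

int main() {
    std::cout << to_string(NormalizationOperation::MERGE) << '\n';
}
```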