Skip to content

Commit

Permalink
Address PR comment
Browse files Browse the repository at this point in the history
  • Loading branch information
phoebusm committed Jul 2, 2024
1 parent 3adbed4 commit bc0bc7c
Show file tree
Hide file tree
Showing 4 changed files with 18 additions and 12 deletions.
8 changes: 4 additions & 4 deletions cpp/arcticdb/version/local_versioned_engine.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -812,15 +812,15 @@ std::unordered_map<StreamId, VersionId> min_versions_for_each_stream(const std::
folly::Future<folly::Unit> delete_trees_responsibly(
std::shared_ptr<Store> store,
std::shared_ptr<VersionMap> &version_map,
const std::vector<IndexTypeKey>& idx_to_be_deleted,
const std::vector<IndexTypeKey>& orig_keys_to_delete,
const arcticdb::MasterSnapshotMap& snapshot_map,
const std::optional<SnapshotId>& snapshot_being_deleted,
const PreDeleteChecks& check,
bool dry_run) {
const bool dry_run) {
ARCTICDB_SAMPLE(DeleteTree, 0)
ARCTICDB_RUNTIME_DEBUG(log::version(), "Command: delete_tree");

util::ContainerFilterWrapper keys_to_delete(idx_to_be_deleted);
util::ContainerFilterWrapper keys_to_delete(orig_keys_to_delete);
util::ContainerFilterWrapper not_to_delete(check.could_share_data);

// Each section below performs these checks:
Expand Down Expand Up @@ -858,7 +858,7 @@ folly::Future<folly::Unit> delete_trees_responsibly(
if (load_type != LoadType::NOT_LOADED) {
std::unordered_map<StreamId, std::shared_ptr<VersionMapEntry>> entry_map;
{
auto min_versions = min_versions_for_each_stream(idx_to_be_deleted);
auto min_versions = min_versions_for_each_stream(orig_keys_to_delete);
for (const auto& min : min_versions) {
auto load_param = load_type == LoadType::LOAD_DOWNTO
? LoadParameter{load_type, static_cast<SignedVersionId>(min.second)}
Expand Down
4 changes: 2 additions & 2 deletions cpp/arcticdb/version/local_versioned_engine.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -60,11 +60,11 @@ struct SortMergeOptions {
folly::Future<folly::Unit> delete_trees_responsibly(
std::shared_ptr<Store> store,
std::shared_ptr<VersionMap> &version_map,
const std::vector<IndexTypeKey>& idx_to_be_deleted,
const std::vector<IndexTypeKey>& orig_keys_to_delete,
const arcticdb::MasterSnapshotMap& snapshot_map,
const std::optional<SnapshotId>& snapshot_being_deleted = std::nullopt,
const PreDeleteChecks& check = default_pre_delete_checks,
bool dry_run = false
const bool dry_run = false
);

class LocalVersionedEngine : public VersionedEngine {
Expand Down
4 changes: 2 additions & 2 deletions cpp/arcticdb/version/version_map_batch_methods.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,7 @@ enum class BatchGetVersionOption {
COUNT
};

inline std::optional<std::string> collect_futures_exceptions(auto futures) {
inline std::optional<std::string> collect_futures_exceptions(auto&& futures) {
std::optional<std::string> all_exceptions;
for (auto&& collected_fut: futures) {
if (!collected_fut.hasValue()) {
Expand All @@ -60,7 +60,7 @@ inline void submit_tasks_for_range(const Inputs& inputs, TaskSubmitter submitter
}, window_size);

auto collected_futs = folly::collectAll(futures).get();
std::optional<std::string> all_exceptions = collect_futures_exceptions(collected_futs);
std::optional<std::string> all_exceptions = collect_futures_exceptions(std::move(collected_futs));
internal::check<ErrorCode::E_RUNTIME_ERROR>(!all_exceptions.has_value(), all_exceptions.value_or(""));
}

Expand Down
14 changes: 10 additions & 4 deletions cpp/arcticdb/version/version_store_api.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -358,20 +358,23 @@ void PythonVersionStore::add_to_snapshot(
util::check(inserted, "Multiple elements in add_to_snapshot with key {}", id_version.first);
}

bool is_delete_keys_immediately = variant_key_type(snap_key) != KeyType::SNAPSHOT_REF || !cfg().write_options().delayed_deletes();
for(auto&& key : snapshot_contents) {
auto new_version = affected_keys.find(key.id());
if(new_version == std::end(affected_keys)) {
retained_keys.emplace_back(std::move(key));
} else {
deleted_keys.emplace_back(std::move(key));
if (is_delete_keys_immediately) {
deleted_keys.emplace_back(std::move(key));
}
}
}

for(auto&& [id, key] : *specific_versions_index_map)
retained_keys.emplace_back(std::move(key));

std::sort(std::begin(retained_keys), std::end(retained_keys));
if(variant_key_type(snap_key) != KeyType::SNAPSHOT_REF || !cfg().write_options().delayed_deletes()) {
if(is_delete_keys_immediately) {
delete_trees_responsibly(store(), version_map(), deleted_keys, get_master_snapshots_map(store()), snap_name).get();
if (version_map()->log_changes()) {
log_delete_snapshot(store(), snap_name);
Expand Down Expand Up @@ -400,17 +403,20 @@ void PythonVersionStore::remove_from_snapshot(
symbol_versions.emplace(stream_ids[i], version_ids[i]);
}

bool is_delete_keys_immediately = variant_key_type(snap_key) != KeyType::SNAPSHOT_REF || !cfg().write_options().delayed_deletes();
std::vector<AtomKey> deleted_keys;
std::vector<AtomKey> retained_keys;
for(auto&& key : snapshot_contents) {
if(symbol_versions.find(SymbolVersion{key.id(), key.version_id()}) == symbol_versions.end()) {
retained_keys.emplace_back(std::move(key));
} else {
deleted_keys.emplace_back(std::move(key));
if (is_delete_keys_immediately) {
deleted_keys.emplace_back(std::move(key));
}
}
}

if(variant_key_type(snap_key) != KeyType::SNAPSHOT_REF || !cfg().write_options().delayed_deletes()) {
if(is_delete_keys_immediately) {
delete_trees_responsibly(store(), version_map(), deleted_keys, get_master_snapshots_map(store()), snap_name).get();
if (version_map()->log_changes()) {
log_delete_snapshot(store(), snap_name);
Expand Down

0 comments on commit bc0bc7c

Please sign in to comment.