Skip to content

Commit

Permalink
Merge remote-tracking branch 'upstream/main' into fix455
Browse files Browse the repository at this point in the history
  • Loading branch information
JinHai-CN committed Jan 9, 2025
2 parents a6b069b + b8477cb commit 7a4df25
Show file tree
Hide file tree
Showing 7 changed files with 60 additions and 26 deletions.
9 changes: 9 additions & 0 deletions src/executor/operator/physical_command.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -459,6 +459,15 @@ bool PhysicalCommand::Execute(QueryContext *query_context, OperatorState *operat
SnapshotOp snapshot_operation = snapshot_cmd->operation();
SnapshotScope snapshot_scope = snapshot_cmd->scope();
const String &snapshot_name = snapshot_cmd->name();

{
// Full checkpoint
Txn *txn = query_context->GetTxn();
auto checkpoint_task = MakeShared<ForceCheckpointTask>(txn, true);
query_context->storage()->bg_processor()->Submit(checkpoint_task);
checkpoint_task->Wait();
}

switch (snapshot_operation) {
case SnapshotOp::kCreate: {
LOG_INFO(fmt::format("Execute snapshot create"));
Expand Down
6 changes: 3 additions & 3 deletions src/executor/operator/snapshot/snapshot_brief.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -72,9 +72,9 @@ Vector<SnapshotBrief> SnapshotBrief::GetSnapshots(const String &dir) {

snapshot_brief.commit_ts_ = snapshot_json["commit_ts"];

std::filesystem::path compressed_file(snapshot_path);
compressed_file.replace_extension("lz4");
snapshot_brief.size_ = VirtualStore::GetFileSize(compressed_file.string());
// std::filesystem::path compressed_file(snapshot_path);
// compressed_file.replace_extension("lz4");
snapshot_brief.size_ = VirtualStore::GetDirectorySize(dir);


struct stat statbuf;
Expand Down
13 changes: 7 additions & 6 deletions src/executor/operator/snapshot/table_snapshot.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -23,19 +23,20 @@ import table_entry;
import status;
import third_party;
import config;
import infinity_exception;

namespace infinity {

Status Snapshot::CreateTableSnapshot(QueryContext *query_context, const String &snapshot_name, const String &table_name) {
Txn *txn_ptr = query_context->GetTxn();
const String &db_name = query_context->schema_name();
Tuple<TableEntry *, Status> result = txn_ptr->GetTableByName(db_name, table_name);
TableEntry *table_entry_ptr = std::get<0>(result);
Status table_status = std::get<1>(result);
if (!table_status.ok()) {
return table_status;

SharedPtr<TableSnapshotInfo> table_snapshot;
Status status;
std::tie(table_snapshot, status) = txn_ptr->GetTableSnapshot(db_name, table_name);
if (!status.ok()) {
RecoverableError(status);
}
SharedPtr<TableSnapshotInfo> table_snapshot = table_entry_ptr->GetSnapshotInfo(txn_ptr);
table_snapshot->snapshot_name_ = snapshot_name;
String snapshot_dir = query_context->global_config()->SnapshotDir();
table_snapshot->Serialize(snapshot_dir);
Expand Down
38 changes: 21 additions & 17 deletions src/storage/common/snapshot_info.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@ import persistence_manager;
import persist_result_handler;
import defer_op;
import utility;
import block_version;

namespace infinity {

Expand Down Expand Up @@ -97,8 +98,8 @@ void TableSnapshotInfo::Serialize(const String &save_dir) {
PersistenceManager *persistence_manager = InfinityContext::instance().persistence_manager();

// Create compressed file
String compressed_filename = fmt::format("{}/{}.lz4", save_dir, snapshot_name_);
std::ofstream output_stream = VirtualStore::BeginCompress(compressed_filename);
// String compressed_filename = fmt::format("{}/{}.lz4", save_dir, snapshot_name_);
// std::ofstream output_stream = VirtualStore::BeginCompress(compressed_filename);

// Get files
Vector<String> original_files = GetFiles();
Expand Down Expand Up @@ -152,10 +153,10 @@ void TableSnapshotInfo::Serialize(const String &save_dir) {
}
write_file_handle->Sync();

Status compress_status = VirtualStore::AddFileCompress(output_stream, dst_file_path);
if (!compress_status.ok()) {
RecoverableError(compress_status);
}
// Status compress_status = VirtualStore::AddFileCompress(output_stream, dst_file_path);
// if (!compress_status.ok()) {
// RecoverableError(compress_status);
// }
}
} else {
String data_dir = config->DataDir();
Expand All @@ -168,20 +169,20 @@ void TableSnapshotInfo::Serialize(const String &save_dir) {
RecoverableError(copy_status);
}

Status compress_status = VirtualStore::AddFileCompress(output_stream, dst_file_path);
if (!compress_status.ok()) {
RecoverableError(compress_status);
}
// Status compress_status = VirtualStore::AddFileCompress(output_stream, dst_file_path);
// if (!compress_status.ok()) {
// RecoverableError(compress_status);
// }
}
}

VirtualStore::EndCompress(output_stream);
// VirtualStore::EndCompress(output_stream);

// String md5 = CalcMD5(compressed_filename);
// String md5 = CalcMD5(compressed_filename);

// Remove the directory
// String directory = fmt::format("{}/{}", save_dir, snapshot_name_);
// VirtualStore::RemoveDirectory(directory);
// String directory = fmt::format("{}/{}", save_dir, snapshot_name_);
// VirtualStore::RemoveDirectory(directory);

nlohmann::json json_res;
json_res["version"] = version_;
Expand All @@ -190,7 +191,7 @@ void TableSnapshotInfo::Serialize(const String &save_dir) {
json_res["database_name"] = db_name_;
json_res["table_name"] = table_name_;
json_res["table_comment"] = table_comment_;
json_res["md5"] = "empty";
// json_res["md5"] = md5;

json_res["txn_id"] = txn_id_;
json_res["begin_ts"] = begin_ts_;
Expand Down Expand Up @@ -261,14 +262,17 @@ Vector<String> TableSnapshotInfo::GetFiles() const {
files.emplace_back(VirtualStore::ConcatenatePath(block_snapshot->block_dir_, outline_snapshot->filename_));
}
}

files.emplace_back(VirtualStore::ConcatenatePath(block_snapshot->block_dir_, *BlockVersion::FileName()));
}
}

for (const auto &table_index_snapshot_pair : table_index_snapshots_) {
for (const auto &segment_index_snapshot_pair : table_index_snapshot_pair.second->index_by_segment_) {
for (const auto &chunk_index_snapshot : segment_index_snapshot_pair.second->chunk_index_snapshots_) {
if(chunk_index_snapshot->files_.empty()) {
files.emplace_back(VirtualStore::ConcatenatePath(*table_index_snapshot_pair.second->index_dir_, chunk_index_snapshot->base_name_));
if (chunk_index_snapshot->files_.empty()) {
files.emplace_back(
VirtualStore::ConcatenatePath(*table_index_snapshot_pair.second->index_dir_, chunk_index_snapshot->base_name_));
} else {
files.insert(files.end(), chunk_index_snapshot->files_.cbegin(), chunk_index_snapshot->files_.cend());
}
Expand Down
1 change: 1 addition & 0 deletions src/storage/meta/entry/block_column_entry.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -180,6 +180,7 @@ SharedPtr<BlockColumnSnapshotInfo> BlockColumnEntry::GetSnapshotInfo() const {
String outline_file_path = *OutlineFilename(file_idx);
SharedPtr<OutlineSnapshotInfo> outline_snapshot_info = MakeShared<OutlineSnapshotInfo>();
outline_snapshot_info->filename_ = outline_file_path;
block_column_snapshot_info->outline_snapshots_.emplace_back(outline_snapshot_info);
}
return block_column_snapshot_info;
}
Expand Down
17 changes: 17 additions & 0 deletions src/storage/persistence/persistence_manager.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -284,6 +284,23 @@ PersistReadResult PersistenceManager::GetObjCache(const String &file_path) {
return result;
}

Tuple<SizeT, Status> PersistenceManager::GetDirectorySize(const String &path_str) {
SizeT total_size = 0;

if (!VirtualStore::Exists(path_str)) {
return {0, Status::IOError(fmt::format("{} doesn't exist.", path_str))};
}

const Path path(path_str);
for (const auto &entry : fs::recursive_directory_iterator(path)) {
if (fs::is_regular_file(entry.status())) {
total_size += fs::file_size(entry.path());
}
}

return {total_size, Status::OK()};
}

Tuple<SizeT, Status> PersistenceManager::GetFileSize(const String &file_path) {
PersistReadResult result;

Expand Down
2 changes: 2 additions & 0 deletions src/storage/persistence/persistence_manager.cppm
Original file line number Diff line number Diff line change
Expand Up @@ -81,6 +81,8 @@ public:

Tuple<SizeT, Status> GetFileSize(const String &file_path);

Tuple<SizeT, Status> GetDirectorySize(const String &path_str);

ObjAddr GetObjCacheWithoutCnt(const String &local_path);

[[nodiscard]] PersistWriteResult PutObjCache(const String &file_path);
Expand Down

0 comments on commit 7a4df25

Please sign in to comment.