Skip to content

Commit

Permalink
Finish table snapshot create
Browse files Browse the repository at this point in the history
Signed-off-by: Jin Hai <[email protected]>
  • Loading branch information
JinHai-CN committed Jan 9, 2025
1 parent c1663df commit ebc4b9e
Show file tree
Hide file tree
Showing 3 changed files with 29 additions and 23 deletions.
13 changes: 7 additions & 6 deletions src/executor/operator/snapshot/table_snapshot.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -23,19 +23,20 @@ import table_entry;
import status;
import third_party;
import config;
import infinity_exception;

namespace infinity {

Status Snapshot::CreateTableSnapshot(QueryContext *query_context, const String &snapshot_name, const String &table_name) {
Txn *txn_ptr = query_context->GetTxn();
const String &db_name = query_context->schema_name();
Tuple<TableEntry *, Status> result = txn_ptr->GetTableByName(db_name, table_name);
TableEntry *table_entry_ptr = std::get<0>(result);
Status table_status = std::get<1>(result);
if (!table_status.ok()) {
return table_status;

SharedPtr<TableSnapshotInfo> table_snapshot;
Status status;
std::tie(table_snapshot, status) = txn_ptr->GetTableSnapshot(db_name, table_name);
if (!status.ok()) {
RecoverableError(status);
}
SharedPtr<TableSnapshotInfo> table_snapshot = table_entry_ptr->GetSnapshotInfo(txn_ptr);
table_snapshot->snapshot_name_ = snapshot_name;
String snapshot_dir = query_context->global_config()->SnapshotDir();
table_snapshot->Serialize(snapshot_dir);
Expand Down
38 changes: 21 additions & 17 deletions src/storage/common/snapshot_info.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@ import persistence_manager;
import persist_result_handler;
import defer_op;
import utility;
import block_version;

namespace infinity {

Expand Down Expand Up @@ -97,8 +98,8 @@ void TableSnapshotInfo::Serialize(const String &save_dir) {
PersistenceManager *persistence_manager = InfinityContext::instance().persistence_manager();

// Create compressed file
// String compressed_filename = fmt::format("{}/{}.lz4", save_dir, snapshot_name_);
// std::ofstream output_stream = VirtualStore::BeginCompress(compressed_filename);
// String compressed_filename = fmt::format("{}/{}.lz4", save_dir, snapshot_name_);
// std::ofstream output_stream = VirtualStore::BeginCompress(compressed_filename);

// Get files
Vector<String> original_files = GetFiles();
Expand Down Expand Up @@ -152,10 +153,10 @@ void TableSnapshotInfo::Serialize(const String &save_dir) {
}
write_file_handle->Sync();

// Status compress_status = VirtualStore::AddFileCompress(output_stream, dst_file_path);
// if (!compress_status.ok()) {
// RecoverableError(compress_status);
// }
// Status compress_status = VirtualStore::AddFileCompress(output_stream, dst_file_path);
// if (!compress_status.ok()) {
// RecoverableError(compress_status);
// }
}
} else {
String data_dir = config->DataDir();
Expand All @@ -168,20 +169,20 @@ void TableSnapshotInfo::Serialize(const String &save_dir) {
RecoverableError(copy_status);
}

// Status compress_status = VirtualStore::AddFileCompress(output_stream, dst_file_path);
// if (!compress_status.ok()) {
// RecoverableError(compress_status);
// }
// Status compress_status = VirtualStore::AddFileCompress(output_stream, dst_file_path);
// if (!compress_status.ok()) {
// RecoverableError(compress_status);
// }
}
}

// VirtualStore::EndCompress(output_stream);
// VirtualStore::EndCompress(output_stream);

// String md5 = CalcMD5(compressed_filename);
// String md5 = CalcMD5(compressed_filename);

// Remove the directory
// String directory = fmt::format("{}/{}", save_dir, snapshot_name_);
// VirtualStore::RemoveDirectory(directory);
// String directory = fmt::format("{}/{}", save_dir, snapshot_name_);
// VirtualStore::RemoveDirectory(directory);

nlohmann::json json_res;
json_res["version"] = version_;
Expand All @@ -190,7 +191,7 @@ void TableSnapshotInfo::Serialize(const String &save_dir) {
json_res["database_name"] = db_name_;
json_res["table_name"] = table_name_;
json_res["table_comment"] = table_comment_;
// json_res["md5"] = md5;
// json_res["md5"] = md5;

json_res["txn_id"] = txn_id_;
json_res["begin_ts"] = begin_ts_;
Expand Down Expand Up @@ -261,14 +262,17 @@ Vector<String> TableSnapshotInfo::GetFiles() const {
files.emplace_back(VirtualStore::ConcatenatePath(block_snapshot->block_dir_, outline_snapshot->filename_));
}
}

files.emplace_back(VirtualStore::ConcatenatePath(block_snapshot->block_dir_, *BlockVersion::FileName()));
}
}

for (const auto &table_index_snapshot_pair : table_index_snapshots_) {
for (const auto &segment_index_snapshot_pair : table_index_snapshot_pair.second->index_by_segment_) {
for (const auto &chunk_index_snapshot : segment_index_snapshot_pair.second->chunk_index_snapshots_) {
if(chunk_index_snapshot->files_.empty()) {
files.emplace_back(VirtualStore::ConcatenatePath(*table_index_snapshot_pair.second->index_dir_, chunk_index_snapshot->base_name_));
if (chunk_index_snapshot->files_.empty()) {
files.emplace_back(
VirtualStore::ConcatenatePath(*table_index_snapshot_pair.second->index_dir_, chunk_index_snapshot->base_name_));
} else {
files.insert(files.end(), chunk_index_snapshot->files_.cbegin(), chunk_index_snapshot->files_.cend());
}
Expand Down
1 change: 1 addition & 0 deletions src/storage/meta/entry/block_column_entry.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -180,6 +180,7 @@ SharedPtr<BlockColumnSnapshotInfo> BlockColumnEntry::GetSnapshotInfo() const {
String outline_file_path = *OutlineFilename(file_idx);
SharedPtr<OutlineSnapshotInfo> outline_snapshot_info = MakeShared<OutlineSnapshotInfo>();
outline_snapshot_info->filename_ = outline_file_path;
block_column_snapshot_info->outline_snapshots_.emplace_back(outline_snapshot_info);
}
return block_column_snapshot_info;
}
Expand Down

0 comments on commit ebc4b9e

Please sign in to comment.