Skip to content

Commit

Permalink
Load hnsw from mmap.
Browse files (browse the repository at this point in the history)
  • Loading branch information
small-turtle-1 committed Dec 26, 2024
1 parent 2a5a20e commit ad2b530
Show file tree
Hide file tree
Showing 15 changed files with 476 additions and 360 deletions.
7 changes: 6 additions & 1 deletion src/storage/buffer/file_worker/file_worker.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,8 @@
module;

#include <tuple>
#include <cstring>
#include<cerrno>

module file_worker;

Expand Down Expand Up @@ -223,7 +225,10 @@ void FileWorker::Mmap() {
auto [defer_fn, read_path] = GetFilePathInner(false);
bool use_object_cache = persistence_manager_ != nullptr;
SizeT file_size = VirtualStore::GetFileSize(read_path);
VirtualStore::MmapFile(read_path, mmap_addr_, file_size);
int ret = VirtualStore::MmapFile(read_path, mmap_addr_, file_size);
if (ret < 0) {
UnrecoverableError(fmt::format("Mmap file {} failed. {}", read_path, strerror(errno)));
}
if (use_object_cache) {
const void *ptr = mmap_addr_ + obj_addr_.part_offset_;
this->ReadFromMmapImpl(ptr, obj_addr_.part_size_);
Expand Down
4 changes: 2 additions & 2 deletions src/storage/buffer/file_worker/file_worker.cppm
Original file line number Diff line number Diff line change
Expand Up @@ -79,11 +79,11 @@ protected:

virtual void ReadFromFileImpl(SizeT file_size) = 0;

Pair<Optional<DeferFn<std::function<void()>>>, String> GetFilePathInner(bool spill);

private:
String ChooseFileDir(bool spill) const;

Pair<Optional<DeferFn<std::function<void()>>>, String> GetFilePathInner(bool spill);

public:
const SharedPtr<String> data_dir_{};
const SharedPtr<String> temp_dir_{};
Expand Down
65 changes: 19 additions & 46 deletions src/storage/buffer/file_worker/hnsw_file_worker.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -82,7 +82,7 @@ void HnswFileWorker::AllocateInMemory() {
}

void HnswFileWorker::FreeInMemory() {
if (!data_) {
if (!data_ || !hnsw_mem_) {
String error_message = "FreeInMemory: Data is not allocated.";
UnrecoverableError(error_message);
}
Expand All @@ -99,6 +99,10 @@ void HnswFileWorker::FreeInMemory() {
*p);
delete p;
data_ = nullptr;

auto [defer_fn, read_path] = GetFilePathInner(false);
VirtualStore::MunmapFile(read_path);
hnsw_mem_ = nullptr;
}

bool HnswFileWorker::WriteToFileImpl(bool to_spill, bool &prepare_success, const FileWorkerSaveCtx &ctx) {
Expand All @@ -115,7 +119,7 @@ bool HnswFileWorker::WriteToFileImpl(bool to_spill, bool &prepare_success, const
} else {
using IndexT = std::decay_t<decltype(*index)>;
if constexpr (IndexT::kOwnMem) {
index->Save(*file_handle_);
index->SaveToPtr(*file_handle_);
} else {
UnrecoverableError("Invalid index type.");
}
Expand All @@ -126,35 +130,22 @@ bool HnswFileWorker::WriteToFileImpl(bool to_spill, bool &prepare_success, const
return true;
}

void HnswFileWorker::ReadFromFileImpl(SizeT file_size) {
void HnswFileWorker::ReadFromFileImpl(SizeT fsize) {
if (data_ != nullptr) {
UnrecoverableError("Data is already allocated.");
}
data_ = static_cast<void *>(new AbstractHnsw(HnswIndexInMem::InitAbstractIndex(index_base_.get(), column_def_.get())));
auto *hnsw_index = reinterpret_cast<AbstractHnsw *>(data_);
std::visit(
[&](auto &&index) {
using T = std::decay_t<decltype(index)>;
if constexpr (std::is_same_v<T, std::nullptr_t>) {
UnrecoverableError("Invalid index type.");
} else {
using IndexT = std::decay_t<decltype(*index)>;
if constexpr (IndexT::kOwnMem) {
index = IndexT::Load(*file_handle_).release();
} else {
UnrecoverableError("Invalid index type.");
}
}
},
*hnsw_index);
}

bool HnswFileWorker::ReadFromMmapImpl(const void *ptr, SizeT size) {
if (mmap_data_ != nullptr) {
UnrecoverableError("Mmap data is already allocated.");
auto [defer_fn, read_path] = GetFilePathInner(false);
SizeT file_size = VirtualStore::GetFileSize(read_path);
u8 *mmap_addr = nullptr;
int ret = VirtualStore::MmapFile(read_path, mmap_addr, file_size);
hnsw_mem_ = mmap_addr;
if (ret < 0) {
UnrecoverableError(fmt::format("Mmap file {} failed. {}", read_path, strerror(errno)));
}
mmap_data_ = reinterpret_cast<u8 *>(new AbstractHnsw(HnswIndexInMem::InitAbstractIndex(index_base_.get(), column_def_.get(), false)));
auto *hnsw_index = reinterpret_cast<AbstractHnsw *>(mmap_data_);

data_ = static_cast<void *>(new AbstractHnsw(HnswIndexInMem::InitAbstractIndex(index_base_.get(), column_def_.get(), false /*own_mem*/)));
auto *hnsw_index = reinterpret_cast<AbstractHnsw *>(data_);
std::visit(
[&](auto &&index) {
using T = std::decay_t<decltype(index)>;
Expand All @@ -163,32 +154,14 @@ bool HnswFileWorker::ReadFromMmapImpl(const void *ptr, SizeT size) {
} else {
using IndexT = std::decay_t<decltype(*index)>;
if constexpr (!IndexT::kOwnMem) {
const auto *p = static_cast<const char *>(ptr);
index = IndexT::LoadFromPtr(p, size).release();
const auto *mmap_addr = static_cast<const char *>(hnsw_mem_);
index = IndexT::LoadFromPtr(mmap_addr, file_size).release();
} else {
UnrecoverableError("Invalid index type.");
}
}
},
*hnsw_index);
return true;
}

void HnswFileWorker::FreeFromMmapImpl() {
if (mmap_data_ == nullptr) {
UnrecoverableError("Mmap data is not allocated.");
}
auto *hnsw_index = reinterpret_cast<AbstractHnsw *>(mmap_data_);
std::visit(
[&](auto &&index) {
using T = std::decay_t<decltype(index)>;
if constexpr (!std::is_same_v<T, std::nullptr_t>) {
delete index;
}
},
*hnsw_index);
delete hnsw_index;
mmap_data_ = nullptr;
}

} // namespace infinity
5 changes: 1 addition & 4 deletions src/storage/buffer/file_worker/hnsw_file_worker.cppm
Original file line number Diff line number Diff line change
Expand Up @@ -56,12 +56,9 @@ protected:

void ReadFromFileImpl(SizeT file_size) override;

bool ReadFromMmapImpl(const void *ptr, SizeT size) override;

void FreeFromMmapImpl() override;

private:
SizeT index_size_{};
void *hnsw_mem_{};
};

} // namespace infinity
2 changes: 1 addition & 1 deletion src/storage/io/virtual_store.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -409,9 +409,9 @@ i32 VirtualStore::MmapFile(const String &file_path, u8 *&data_ptr, SizeT &data_l
return -1;
i32 f = open(file_path.c_str(), O_RDONLY);
void *tmpd = mmap(NULL, len_f, PROT_READ, MAP_SHARED, f, 0);
close(f);
if (tmpd == MAP_FAILED)
return -1;
close(f);
i32 rc = madvise(tmpd,
len_f,
MADV_NORMAL
Expand Down
Loading

0 comments on commit ad2b530

Please sign in to comment.