Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Mmap2 #2407

Closed
wants to merge 17 commits into from
Closed

Mmap2 #2407

Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
16 changes: 16 additions & 0 deletions src/parser/type/serialize.h
Original file line number Diff line number Diff line change
Expand Up @@ -57,6 +57,16 @@ inline std::string ReadBuf<std::string>(const char *buf) {
return str;
}

template <>
inline std::tuple<> ReadBuf<std::tuple<>>(const char *buf) {
return {};
}

template <>
inline std::tuple<> ReadBufAdv<std::tuple<>>(const char *&buf) {
return {};
}

template <>
inline std::string ReadBufAdv<std::string>(const char *&buf) {
int32_t size = ReadBufAdv<int32_t>(buf);
Expand Down Expand Up @@ -101,6 +111,12 @@ inline void WriteBufAdv<std::string>(char *&buf, const std::string &value) {
buf += len;
}

template <>
inline void WriteBuf<std::tuple<>>(char *const buf, const std::tuple<> &) {}

template <>
inline void WriteBufAdv<std::tuple<>>(char *&buf, const std::tuple<> &) {}

template <typename T>
inline void WriteBufVecAdv(char *&buf, const T *data, size_t size) {
static_assert(std::is_standard_layout_v<T>, "T must be POD");
Expand Down
9 changes: 8 additions & 1 deletion src/storage/buffer/buffer_obj.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -67,9 +67,16 @@ void BufferObj::UpdateFileWorkerInfo(UniquePtr<FileWorker> new_file_worker) {
}
}

BufferHandle BufferObj::Load() {
BufferHandle BufferObj::Load(bool no_mmap) {
buffer_mgr_->AddRequestCount();
std::unique_lock<std::mutex> locker(w_locker_);
if (type_ == BufferType::kMmap && no_mmap) {
if (rc_ > 0) {
String error_message = fmt::format("Buffer {} is mmaped, but has {} references", GetFilename(), rc_);
UnrecoverableError(error_message);
}
type_ = BufferType::kPersistent;
}
if (type_ == BufferType::kMmap) {
switch (status_) {
case BufferStatus::kLoaded: {
Expand Down
2 changes: 1 addition & 1 deletion src/storage/buffer/buffer_obj.cppm
Original file line number Diff line number Diff line change
Expand Up @@ -94,7 +94,7 @@ public:

public:
// called by ObjectHandle when load first time for that ObjectHandle
BufferHandle Load();
BufferHandle Load(bool no_mmap = false);

// called by BufferMgr in GC process.
bool Free();
Expand Down
4 changes: 2 additions & 2 deletions src/storage/buffer/file_worker/file_worker.cppm
Original file line number Diff line number Diff line change
Expand Up @@ -79,11 +79,11 @@ protected:

virtual void ReadFromFileImpl(SizeT file_size) = 0;

Pair<Optional<DeferFn<std::function<void()>>>, String> GetFilePathInner(bool spill);

private:
String ChooseFileDir(bool spill) const;

Pair<Optional<DeferFn<std::function<void()>>>, String> GetFilePathInner(bool spill);

public:
const SharedPtr<String> data_dir_{};
const SharedPtr<String> temp_dir_{};
Expand Down
55 changes: 53 additions & 2 deletions src/storage/buffer/file_worker/hnsw_file_worker.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -113,7 +113,12 @@ bool HnswFileWorker::WriteToFileImpl(bool to_spill, bool &prepare_success, const
if constexpr (std::is_same_v<T, std::nullptr_t>) {
UnrecoverableError("Invalid index type.");
} else {
index->Save(*file_handle_);
using IndexT = std::decay_t<decltype(*index)>;
if constexpr (IndexT::kOwnMem) {
index->SaveToPtr(*file_handle_);
} else {
UnrecoverableError("Invalid index type.");
}
}
},
*hnsw_index);
Expand All @@ -134,10 +139,56 @@ void HnswFileWorker::ReadFromFileImpl(SizeT file_size) {
UnrecoverableError("Invalid index type.");
} else {
using IndexT = std::decay_t<decltype(*index)>;
index = IndexT::Load(*file_handle_).release();
if constexpr (IndexT::kOwnMem) {
index = IndexT::Load(*file_handle_).release();
} else {
UnrecoverableError("Invalid index type.");
}
}
},
*hnsw_index);
}

bool HnswFileWorker::ReadFromMmapImpl(const void *ptr, SizeT size) {
if (mmap_data_ != nullptr) {
UnrecoverableError("Mmap data is already allocated.");
}
mmap_data_ = reinterpret_cast<u8 *>(new AbstractHnsw(HnswIndexInMem::InitAbstractIndex(index_base_.get(), column_def_.get(), false)));
auto *hnsw_index = reinterpret_cast<AbstractHnsw *>(mmap_data_);
std::visit(
[&](auto &&index) {
using T = std::decay_t<decltype(index)>;
if constexpr (std::is_same_v<T, std::nullptr_t>) {
UnrecoverableError("Invalid index type.");
} else {
using IndexT = std::decay_t<decltype(*index)>;
if constexpr (!IndexT::kOwnMem) {
const auto *p = static_cast<const char *>(ptr);
index = IndexT::LoadFromPtr(p, size).release();
} else {
UnrecoverableError("Invalid index type.");
}
}
},
*hnsw_index);
return true;
}

void HnswFileWorker::FreeFromMmapImpl() {
if (mmap_data_ == nullptr) {
UnrecoverableError("Mmap data is not allocated.");
}
auto *hnsw_index = reinterpret_cast<AbstractHnsw *>(mmap_data_);
std::visit(
[&](auto &&index) {
using T = std::decay_t<decltype(index)>;
if constexpr (!std::is_same_v<T, std::nullptr_t>) {
delete index;
}
},
*hnsw_index);
delete hnsw_index;
mmap_data_ = nullptr;
}

} // namespace infinity
4 changes: 4 additions & 0 deletions src/storage/buffer/file_worker/hnsw_file_worker.cppm
Original file line number Diff line number Diff line change
Expand Up @@ -56,6 +56,10 @@ protected:

void ReadFromFileImpl(SizeT file_size) override;

bool ReadFromMmapImpl(const void *ptr, SizeT size) override;

void FreeFromMmapImpl() override;

private:
SizeT index_size_{};
};
Expand Down
2 changes: 1 addition & 1 deletion src/storage/io/virtual_store.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -436,9 +436,9 @@ i32 VirtualStore::MmapFile(const String &file_path, u8 *&data_ptr, SizeT &data_l
return -1;
i32 f = open(file_path.c_str(), O_RDONLY);
void *tmpd = mmap(NULL, len_f, PROT_READ, MAP_SHARED, f, 0);
close(f);
if (tmpd == MAP_FAILED)
return -1;
close(f);
i32 rc = madvise(tmpd,
len_f,
MADV_NORMAL
Expand Down
137 changes: 68 additions & 69 deletions src/storage/knn_index/knn_hnsw/abstract_hnsw.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -77,32 +77,16 @@ HnswIndexInMem::HnswIndexInMem(RowID begin_row_id,
using T = std::decay_t<decltype(index)>;
if constexpr (!std::is_same_v<T, std::nullptr_t>) {
using IndexT = std::decay_t<decltype(*index)>;
hnsw_ = IndexT::Make(chunk_size, max_chunk_num, dim, M, ef_construction).release();
if constexpr (IndexT::kOwnMem) {
index = IndexT::Make(chunk_size, max_chunk_num, dim, M, ef_construction).release();
} else {
UnrecoverableError("HnswIndexInMem::HnswIndexInMem: index does not own memory");
}
}
},
hnsw_);
}

AbstractHnsw HnswIndexInMem::InitAbstractIndex(const IndexBase *index_base, const ColumnDef *column_def) {
const auto *index_hnsw = static_cast<const IndexHnsw *>(index_base);
const auto *embedding_info = static_cast<const EmbeddingInfo *>(column_def->type()->type_info().get());

switch (embedding_info->Type()) {
case EmbeddingDataType::kElemFloat: {
return InitAbstractIndex<float>(index_hnsw);
}
case EmbeddingDataType::kElemUInt8: {
return InitAbstractIndex<u8>(index_hnsw);
}
case EmbeddingDataType::kElemInt8: {
return InitAbstractIndex<i8>(index_hnsw);
}
default: {
return nullptr;
}
}
}

HnswIndexInMem::~HnswIndexInMem() {
SizeT mem_usage = 0;
std::visit(
Expand Down Expand Up @@ -146,27 +130,33 @@ void HnswIndexInMem::InsertVecs(SizeT block_offset,
std::visit(
[&](auto &&index) {
using T = std::decay_t<decltype(index)>;
if constexpr (!std::is_same_v<T, std::nullptr_t>) {
if constexpr (std::is_same_v<T, std::nullptr_t>) {
return;
} else {
using IndexT = std::decay_t<decltype(*index)>;
using DataType = typename IndexT::DataType;
SizeT mem_usage{};
switch (const auto &column_data_type = block_column_entry->column_type(); column_data_type->type()) {
case LogicalType::kEmbedding: {
MemIndexInserterIter<DataType> iter(block_offset, block_column_entry, buffer_manager, row_offset, row_count);
InsertVecs(index, std::move(iter), config, mem_usage);
break;
}
case LogicalType::kMultiVector: {
MemIndexInserterIter<MultiVectorRef<DataType>> iter(block_offset, block_column_entry, buffer_manager, row_offset, row_count);
InsertVecs(index, std::move(iter), config, mem_usage);
break;
}
default: {
UnrecoverableError(fmt::format("Unsupported column type for HNSW index: {}", column_data_type->ToString()));
break;
if constexpr (IndexT::kOwnMem) {
using DataType = typename IndexT::DataType;
SizeT mem_usage{};
switch (const auto &column_data_type = block_column_entry->column_type(); column_data_type->type()) {
case LogicalType::kEmbedding: {
MemIndexInserterIter<DataType> iter(block_offset, block_column_entry, buffer_manager, row_offset, row_count);
InsertVecs(index, std::move(iter), config, mem_usage);
break;
}
case LogicalType::kMultiVector: {
MemIndexInserterIter<MultiVectorRef<DataType>> iter(block_offset, block_column_entry, buffer_manager, row_offset, row_count);
InsertVecs(index, std::move(iter), config, mem_usage);
break;
}
default: {
UnrecoverableError(fmt::format("Unsupported column type for HNSW index: {}", column_data_type->ToString()));
break;
}
}
this->IncreaseMemoryUsageBase(mem_usage);
} else {
UnrecoverableError("HnswIndexInMem::InsertVecs: index does not own memory");
}
this->IncreaseMemoryUsageBase(mem_usage);
}
},
hnsw_);
Expand All @@ -183,38 +173,42 @@ void HnswIndexInMem::InsertVecs(const SegmentEntry *segment_entry,
using T = std::decay_t<decltype(index)>;
if constexpr (!std::is_same_v<T, std::nullptr_t>) {
using IndexT = std::decay_t<decltype(*index)>;
using DataType = typename IndexT::DataType;
if constexpr (!IndexT::kOwnMem) {
UnrecoverableError("HnswIndexInMem::InsertVecs: index does not own memory");
} else {
using DataType = typename IndexT::DataType;

SizeT mem_usage{};
switch (const auto &column_data_type = segment_entry->GetTableEntry()->GetColumnDefByID(column_id)->type();
column_data_type->type()) {
case LogicalType::kEmbedding: {
if (check_ts) {
OneColumnIterator<DataType> iter(segment_entry, buffer_mgr, column_id, begin_ts);
InsertVecs(index, std::move(iter), config, mem_usage);
} else {
OneColumnIterator<DataType, false> iter(segment_entry, buffer_mgr, column_id, begin_ts);
InsertVecs(index, std::move(iter), config, mem_usage);
SizeT mem_usage{};
switch (const auto &column_data_type = segment_entry->GetTableEntry()->GetColumnDefByID(column_id)->type();
column_data_type->type()) {
case LogicalType::kEmbedding: {
if (check_ts) {
OneColumnIterator<DataType> iter(segment_entry, buffer_mgr, column_id, begin_ts);
InsertVecs(index, std::move(iter), config, mem_usage);
} else {
OneColumnIterator<DataType, false> iter(segment_entry, buffer_mgr, column_id, begin_ts);
InsertVecs(index, std::move(iter), config, mem_usage);
}
break;
}
break;
}
case LogicalType::kMultiVector: {
const auto ele_size = column_data_type->type_info()->Size();
if (check_ts) {
OneColumnIterator<MultiVectorRef<DataType>> iter(segment_entry, buffer_mgr, column_id, begin_ts, ele_size);
InsertVecs(index, std::move(iter), config, mem_usage);
} else {
OneColumnIterator<MultiVectorRef<DataType>, false> iter(segment_entry, buffer_mgr, column_id, begin_ts, ele_size);
InsertVecs(index, std::move(iter), config, mem_usage);
case LogicalType::kMultiVector: {
const auto ele_size = column_data_type->type_info()->Size();
if (check_ts) {
OneColumnIterator<MultiVectorRef<DataType>> iter(segment_entry, buffer_mgr, column_id, begin_ts, ele_size);
InsertVecs(index, std::move(iter), config, mem_usage);
} else {
OneColumnIterator<MultiVectorRef<DataType>, false> iter(segment_entry, buffer_mgr, column_id, begin_ts, ele_size);
InsertVecs(index, std::move(iter), config, mem_usage);
}
break;
}
default: {
UnrecoverableError(fmt::format("Unsupported column type for HNSW index: {}", column_data_type->ToString()));
break;
}
break;
}
default: {
UnrecoverableError(fmt::format("Unsupported column type for HNSW index: {}", column_data_type->ToString()));
break;
}
this->IncreaseMemoryUsageBase(mem_usage);
}
this->IncreaseMemoryUsageBase(mem_usage);
}
},
hnsw_);
Expand All @@ -231,9 +225,14 @@ SharedPtr<ChunkIndexEntry> HnswIndexInMem::Dump(SegmentIndexEntry *segment_index
if constexpr (std::is_same_v<T, std::nullptr_t>) {
return;
} else {
row_count = index->GetVecNum();
index_size = index->GetSizeInBytes();
dump_size = index->mem_usage();
using IndexT = typename std::remove_pointer_t<T>;
if constexpr (IndexT::kOwnMem) {
row_count = index->GetVecNum();
index_size = index->GetSizeInBytes();
dump_size = index->mem_usage();
} else {
UnrecoverableError("HnswIndexInMem::Dump: index does not own memory");
}
}
},
hnsw_);
Expand Down
Loading
Loading