From 1da3c0e2826c6b3d99e946f5518fca81b6fef716 Mon Sep 17 00:00:00 2001 From: Jianning Wang Date: Thu, 22 Jan 2026 10:52:12 +0800 Subject: [PATCH 1/4] revert local_builder's meta cap setting --- tools/core/local_builder.cc | 2 -- 1 file changed, 2 deletions(-) diff --git a/tools/core/local_builder.cc b/tools/core/local_builder.cc index 9efc6b4a..28226418 100644 --- a/tools/core/local_builder.cc +++ b/tools/core/local_builder.cc @@ -542,8 +542,6 @@ int build_by_streamer(IndexStreamer::Pointer &streamer, return IndexError_NoExist; } ailego::Params params; - params.set("proxima.mmap_file.storage.segment_meta_capacity", - 20 * 1024 * 1024); int ret = storage->init(params); if (ret != 0) { cerr << "Storage Failed init"; From 82c11e451544b912b901be33fae79e964a487b1f Mon Sep 17 00:00:00 2001 From: Jianning Wang Date: Wed, 4 Feb 2026 15:30:34 +0800 Subject: [PATCH 2/4] feat: auto scalable segment meta section of mmap files --- src/core/framework/index_mapping.cc | 175 +++++++++++++++--- src/core/utility/buffer_storage.cc | 100 +++++++--- .../zvec/core/framework/index_format.h | 29 ++- .../zvec/core/framework/index_mapping.h | 11 +- .../zvec/core/framework/index_unpacker.h | 91 +++++---- tools/core/helper.h | 29 ++- tools/core/local_builder.cc | 16 +- tools/core/recall.cc | 30 +-- 8 files changed, 339 insertions(+), 142 deletions(-) diff --git a/src/core/framework/index_mapping.cc b/src/core/framework/index_mapping.cc index 41475d0e..0257f1e1 100644 --- a/src/core/framework/index_mapping.cc +++ b/src/core/framework/index_mapping.cc @@ -82,6 +82,7 @@ static inline int UnpackMappingSize(ailego::File &file, size_t *len) { } int IndexMapping::open(const std::string &path, bool cow, bool full_mode) { + path_ = path; full_mode_ = full_mode; copy_on_write_ = cow; huge_page_ = Ishugetlbfs(path); @@ -108,7 +109,12 @@ int IndexMapping::open(const std::string &path, bool cow, bool full_mode) { return this->init_index_mapping(mapping_size); } -int IndexMapping::create(const std::string &path, size_t segs_size) { +int IndexMapping::create(const std::string &path, size_t seg_meta_capacity) { + path_ = path; + seg_meta_capacity_ = seg_meta_capacity; + current_header_start_offset_ = 0; + + // write() & copying to mmap() will auto extend the file size if (!file_.create(path.c_str(), 0)) { LOG_ERROR("Failed to create file %s, errno %d, %s", path.c_str(), errno, std::strerror(errno)); @@ -116,17 +122,34 @@ int IndexMapping::create(const std::string &path, size_t segs_size) { } huge_page_ = Ishugetlbfs(path); if (huge_page_) { - return create_hugepage(segs_size); + return init_hugepage_meta_section(); + } + return init_meta_section(); +} + +int IndexMapping::init_meta_section() { + if (current_header_start_offset_ % ailego::MemoryHelper::PageSize() != 0) { + LOG_ERROR("File offset %zu is not a multiple of the page size: %zu", + current_header_start_offset_, ailego::MemoryHelper::PageSize()); + return IndexError_InvalidValue; } - size_t len = CalcPageAlignedSize(segs_size + sizeof(IndexFormat::MetaHeader) + - sizeof(IndexFormat::MetaFooter), - false); + + auto &path = path_; + size_t len = + CalcPageAlignedSize(seg_meta_capacity_ + sizeof(IndexFormat::MetaHeader) + + sizeof(IndexFormat::MetaFooter), + false); IndexFormat::MetaHeader meta_header; IndexFormat::MetaFooter meta_footer; // Write index header IndexFormat::SetupMetaHeader(&meta_header, len - sizeof(meta_footer), len); + if (!file_.seek(current_header_start_offset_, ailego::File::Origin::Begin)) { + LOG_ERROR("Failed to seek file %s, errno %d, %s", path.c_str(), errno, + std::strerror(errno)); + return IndexError_SeekFile; + } if (file_.write(&meta_header, sizeof(meta_header)) != sizeof(meta_header)) { LOG_ERROR("Failed to write file: %s, errno %d, %s", path.c_str(), errno, std::strerror(errno)); @@ -155,11 +178,18 @@ int IndexMapping::create(const std::string &path, size_t segs_size) { return this->init_index_mapping(len); } -int IndexMapping::create_hugepage(size_t segs_size) { - size_t len = CalcPageAlignedSize(segs_size + sizeof(IndexFormat::MetaHeader) + - sizeof(IndexFormat::MetaFooter), - true); - int file_offset = 0; +int IndexMapping::init_hugepage_meta_section() { + ssize_t file_offset = (ssize_t)current_header_start_offset_; + if (file_offset % ailego::MemoryHelper::HugePageSize() != 0) { + LOG_ERROR("File offset %zu is not a multiple of the page size: %zu", + file_offset, ailego::MemoryHelper::HugePageSize()); + return IndexError_InvalidValue; + } + + size_t len = + CalcPageAlignedSize(seg_meta_capacity_ + sizeof(IndexFormat::MetaHeader) + + sizeof(IndexFormat::MetaFooter), + true); int opts = ailego::File::MMAP_SHARED | ailego::File::MMAP_HUGE_PAGE; void *addr = ailego::File::MemoryMap(file_.native_handle(), file_offset, len, opts); @@ -194,6 +224,7 @@ int IndexMapping::create_hugepage(size_t segs_size) { IndexFormat::UpdateMetaFooter(&meta_footer, 0); memcpy((char *)addr + file_offset, &meta_footer, sizeof(meta_footer)); file_offset += sizeof(meta_footer); + return this->init_index_mapping(len); } @@ -201,12 +232,16 @@ void IndexMapping::close(void) { // Unmap all memory this->unmap_all(); if (header_) { - ailego::File::MemoryUnmap(header_, header_->content_offset); + for (auto item : header_addr_map_) { + auto header = item.second; + ailego::File::MemoryUnmap(header, header->content_offset); + } } // Reset members segment_ids_offset_ = 0; segment_start_ = nullptr; header_ = nullptr; + header_addr_map_.clear(); footer_ = nullptr; index_size_ = 0u; segments_.clear(); @@ -218,9 +253,19 @@ void IndexMapping::close(void) { } void IndexMapping::refresh(uint64_t check_point) { - footer_->segments_meta_crc = - ailego::Crc32c::Hash(segment_start_, footer_->segments_meta_size, 0); - IndexFormat::UpdateMetaFooter(footer_, check_point); + // support add_with_id + for (auto item : header_addr_map_) { + auto header_start_offset = item.first; + auto header = item.second; + auto footer = reinterpret_cast( + reinterpret_cast(header) + header->meta_footer_offset); + auto segment_start = reinterpret_cast( + reinterpret_cast(header) + + (header->meta_footer_offset - footer->segments_meta_size)); + footer->segments_meta_crc = + ailego::Crc32c::Hash(segment_start, footer->segments_meta_size, 0); + IndexFormat::UpdateMetaFooter(footer, check_point); + } header_dirty_ = true; } @@ -238,7 +283,21 @@ int IndexMapping::append(const std::string &id, size_t size) { size_t need_size = sizeof(IndexFormat::SegmentMeta) + id_size; if (sizeof(IndexFormat::SegmentMeta) * footer_->segment_count + need_size > segment_ids_offset_) { - return IndexError_NoBuffer; + LOG_DEBUG("segment meta section expanded: %s", path_.c_str()); + footer_->next_meta_header_offset = index_size_; + refresh(0); + flush(); + // mmap file storage write() will update segment's meta + // ailego::File::MemoryUnmap(header_, header_->content_offset); + header_ = nullptr; + footer_ = nullptr; + + current_header_start_offset_ = index_size_; + const int ret = + huge_page_ ? init_hugepage_meta_section() : init_meta_section(); + if (ret != 0) { + return ret; + } } if (!copy_on_write_ && !file_.truncate(index_size_ + size)) { @@ -251,7 +310,10 @@ int IndexMapping::append(const std::string &id, size_t size) { segment_ids_offset_ -= static_cast(id_size); IndexFormat::SegmentMeta *segment = segment_start_ + footer_->segment_count; segment->segment_id_offset = segment_ids_offset_; - segment->data_index = index_size_ - header_->content_offset; + // For compatibility, write offset relative to the content_offset, + // and then restore the absolute offset while loading, i.e., + // init_index_mapping(). + segment->data_absolute_offset = index_size_; segment->data_size = 0; segment->data_crc = 0; segment->padding_size = size; @@ -280,7 +342,7 @@ IndexMapping::Segment *IndexMapping::map(const std::string &id, bool warmup, if (!item->data()) { auto meta = item->meta(); size_t mapping_size = meta->data_size + meta->padding_size; - size_t offset = meta->data_index + header_->content_offset; + size_t offset = meta->data_absolute_offset; void *addr = nullptr; if (!copy_on_write_) { @@ -363,7 +425,7 @@ int IndexMapping::flush(void) { size_t segment_size = item->meta()->data_size + item->meta()->padding_size; if (full_mode_ && copy_on_write_) { - size_t off = header_->content_offset + item->meta()->data_index; + size_t off = item->meta()->data_absolute_offset; if (file_.write(off, item->data(), segment_size) != segment_size) { LOG_ERROR("Failed to write segment, size %zu, errno %d, %s", segment_size, errno, std::strerror(errno)); @@ -381,14 +443,21 @@ int IndexMapping::flush(void) { header_dirty_ = false; if (full_mode_ && copy_on_write_) { - if (file_.write(0, header_, header_->content_offset) != - header_->content_offset) { - LOG_ERROR("Failed to write segment, size %u, errno %d, %s", - header_->content_offset, errno, std::strerror(errno)); - return IndexError_WriteData; + for (auto item : header_addr_map_) { + auto header_start_offset = item.first; + auto header = item.second; + if (file_.write(header_start_offset, header, header->content_offset) != + header->content_offset) { + LOG_ERROR("Failed to write segment, size %lu, errno %d, %s", + header->content_offset, errno, std::strerror(errno)); + return IndexError_WriteData; + } } } else { - ailego::File::MemoryFlush(header_, header_->content_offset); + for (auto item : header_addr_map_) { + auto header = item.second; + ailego::File::MemoryFlush(header, header->content_offset); + } } return 0; } @@ -399,8 +468,8 @@ int IndexMapping::init_index_mapping(size_t len) { if (huge_page_) { opts |= ailego::File::MMAP_HUGE_PAGE; } - uint8_t *start = reinterpret_cast( - ailego::File::MemoryMap(file_.native_handle(), 0, len, opts)); + uint8_t *start = reinterpret_cast(ailego::File::MemoryMap( + file_.native_handle(), current_header_start_offset_, len, opts)); if (!start) { LOG_ERROR("Failed to map file, errno %d, %s", errno, std::strerror(errno)); return IndexError_MMapFile; @@ -408,6 +477,7 @@ int IndexMapping::init_index_mapping(size_t len) { // Unpack header header_ = reinterpret_cast(start); + header_addr_map_.insert({current_header_start_offset_, header_}); if (header_->meta_header_size != sizeof(IndexFormat::MetaHeader)) { return IndexError_InvalidLength; } @@ -416,6 +486,15 @@ int IndexMapping::init_index_mapping(size_t len) { return IndexError_InvalidChecksum; } + switch (header_->version) { + case IndexFormat::CURRENT_FORMAT_VERSION: + case IndexFormat::COMPATIBLE_FORMAT_VERSION_0X0002: + break; + default: + LOG_ERROR("Unsupported index version: %u", header_->version); + return IndexError_Unsupported; + } + // Unpack footer if (header_->meta_footer_size != sizeof(IndexFormat::MetaFooter)) { return IndexError_InvalidLength; @@ -466,12 +545,34 @@ int IndexMapping::init_index_mapping(size_t len) { if (iter->segment_id_offset > footer_->segments_meta_size) { return IndexError_InvalidValue; } - if (iter->data_index > footer_->content_size) { + // TODO: this is a simplified format compatibility handling logic, + // and in the future, it should become a independent function. + if (header_->version == IndexFormat::COMPATIBLE_FORMAT_VERSION_0X0002) { + // v2: data offset is relative to the content_offset. + if (iter->data_absolute_offset > footer_->content_size) { + return IndexError_InvalidValue; + } + if (iter->data_absolute_offset + iter->data_size > + footer_->content_size) { + return IndexError_InvalidLength; + } + iter->data_absolute_offset += + header_->content_offset + current_header_start_offset_; + // v2 end + } + + // v3: data offset is absolute offset in file. + if (iter->data_absolute_offset > footer_->content_size + + header_->content_offset + + current_header_start_offset_) { return IndexError_InvalidValue; } - if (iter->data_index + iter->data_size > footer_->content_size) { + if (iter->data_absolute_offset + iter->data_size > + footer_->content_size + header_->content_offset + + current_header_start_offset_) { return IndexError_InvalidLength; } + // v3 end if (iter->segment_id_offset < segment_ids_offset_) { segment_ids_offset_ = iter->segment_id_offset; @@ -485,6 +586,24 @@ int IndexMapping::init_index_mapping(size_t len) { segment_ids_offset_) { return IndexError_InvalidLength; } + + if (header_->version == IndexFormat::COMPATIBLE_FORMAT_VERSION_0X0002) { + header_->version = IndexFormat::CURRENT_FORMAT_VERSION; + LOG_INFO("Index file format upgraded"); + IndexFormat::UpdateMetaHeader(header_); + footer_->segments_meta_crc = + ailego::Crc32c::Hash(segment_start_, footer_->segments_meta_size, 0); + IndexFormat::UpdateMetaFooter(footer_, 0); + header_dirty_ = true; + } + + if (footer_->next_meta_header_offset > 0) { + current_header_start_offset_ = footer_->next_meta_header_offset; + // Meta sections have all the same size, so we can use the same size to map + // the next meta section + return this->init_index_mapping(len); + } + return 0; } diff --git a/src/core/utility/buffer_storage.cc b/src/core/utility/buffer_storage.cc index 4ac3c6b3..68200d85 100644 --- a/src/core/utility/buffer_storage.cc +++ b/src/core/utility/buffer_storage.cc @@ -90,8 +90,7 @@ class BufferStorage : public IndexStorage { } len = meta->data_size - offset; } - size_t buffer_offset = - segment_->meta()->data_index + owner_->get_context_offset() + offset; + size_t buffer_offset = segment_->meta()->data_absolute_offset + offset; ailego::BufferHandle buffer_handle = owner_->get_buffer_handle(buffer_offset, len); *data = buffer_handle.pin_vector_data(); @@ -106,8 +105,7 @@ class BufferStorage : public IndexStorage { } len = meta->data_size - offset; } - size_t buffer_offset = - segment_->meta()->data_index + owner_->get_context_offset() + offset; + size_t buffer_offset = segment_->meta()->data_absolute_offset + offset; data.reset(owner_->get_buffer_handle_ptr(buffer_offset, len)); if (data.data()) { return len; @@ -235,12 +233,35 @@ class BufferStorage : public IndexStorage { if (iter->segment_id_offset > footer_.segments_meta_size) { return IndexError_InvalidValue; } - if (iter->data_index > footer_.content_size) { + + // TODO: this is a simplified format compatibility handling logic, + // and in the future, it should become a independent function. + if (header_.version == IndexFormat::COMPATIBLE_FORMAT_VERSION_0X0002) { + // v2: data offset is relative to the content_offset. + if (iter->data_absolute_offset > footer_.content_size) { + return IndexError_InvalidValue; + } + if (iter->data_absolute_offset + iter->data_size > + footer_.content_size) { + return IndexError_InvalidLength; + } + iter->data_absolute_offset += + header_.content_offset + current_header_start_offset_; + // v2 end + } + + // v3: data offset is absolute offset in file. + if (iter->data_absolute_offset > footer_.content_size + + header_.content_offset + + current_header_start_offset_) { return IndexError_InvalidValue; } - if (iter->data_index + iter->data_size > footer_.content_size) { + if (iter->data_absolute_offset + iter->data_size > + footer_.content_size + header_.content_offset + + current_header_start_offset_) { return IndexError_InvalidLength; } + // v3 end if (iter->segment_id_offset < segment_ids_offset) { segment_ids_offset = iter->segment_id_offset; @@ -258,25 +279,59 @@ class BufferStorage : public IndexStorage { } int ParseToMapping() { - ParseHeader(0); + while (true) { + int ret; + ret = ParseHeader(current_header_start_offset_); + if (ret != 0) { + LOG_ERROR("Failed to parse header, errno %d, %s", ret, + IndexError::What(ret)); + return ret; + } - // Unpack footer - if (header_.meta_footer_size != sizeof(IndexFormat::MetaFooter)) { - return IndexError_InvalidLength; - } - if ((int32_t)header_.meta_footer_offset < 0) { - return IndexError_Unsupported; - } - size_t footer_offset = header_.meta_footer_offset; - ParseFooter(footer_offset); + switch (header_.version) { + case IndexFormat::CURRENT_FORMAT_VERSION: + case IndexFormat::COMPATIBLE_FORMAT_VERSION_0X0002: + break; + default: + LOG_ERROR("Unsupported index version: %u", header_.version); + return IndexError_Unsupported; + } - // Unpack segment table - if (sizeof(IndexFormat::SegmentMeta) * footer_.segment_count > - footer_.segments_meta_size) { - return IndexError_InvalidLength; + // Unpack footer + if (header_.meta_footer_size != sizeof(IndexFormat::MetaFooter)) { + return IndexError_InvalidLength; + } + if ((int32_t)header_.meta_footer_offset < 0) { + return IndexError_Unsupported; + } + uint64_t footer_offset = + header_.meta_footer_offset + current_header_start_offset_; + ret = ParseFooter(footer_offset); + if (ret != 0) { + LOG_ERROR("Failed to parse footer, errno %d, %s", ret, + IndexError::What(ret)); + return ret; + } + + // Unpack segment table + if (sizeof(IndexFormat::SegmentMeta) * footer_.segment_count > + footer_.segments_meta_size) { + return IndexError_InvalidLength; + } + const uint64_t segment_start_offset = + footer_offset - footer_.segments_meta_size; + ret = ParseSegment(segment_start_offset); + if (ret != 0) { + LOG_ERROR("Failed to parse segment, errno %d, %s", ret, + IndexError::What(ret)); + return ret; + } + + if (footer_.next_meta_header_offset == 0) { + break; + } + current_header_start_offset_ = footer_.next_meta_header_offset; } - const int segment_start_offset = footer_offset - footer_.segments_meta_size; - ParseSegment(segment_start_offset); return 0; } @@ -434,6 +489,7 @@ class BufferStorage : public IndexStorage { IndexFormat::MetaHeader header_; IndexFormat::MetaFooter footer_; std::map segments_{}; + uint64_t current_header_start_offset_{0u}; }; INDEX_FACTORY_REGISTER_STORAGE(BufferStorage); diff --git a/src/include/zvec/core/framework/index_format.h b/src/include/zvec/core/framework/index_format.h index d3b7179c..95d6b9d5 100644 --- a/src/include/zvec/core/framework/index_format.h +++ b/src/include/zvec/core/framework/index_format.h @@ -27,7 +27,12 @@ namespace core { struct IndexFormat { /*! Version Number */ - enum { FORMAT_VERSION = 0x0002 }; + enum { + CURRENT_FORMAT_VERSION = 0x0003, + // 0x0002: data offset in SegmentMeta is relative to the content_offset, + // while new version is absolute offset in file. + COMPATIBLE_FORMAT_VERSION_0X0002 = 0x0002, + }; /*! Index Format Meta Header */ @@ -40,8 +45,7 @@ struct IndexFormat { uint16_t meta_header_size; uint16_t meta_footer_size; uint32_t meta_footer_offset; - uint32_t content_offset; - uint32_t reserved2_; + uint64_t content_offset; uint64_t setup_time; uint64_t reserved3_[3]; }; @@ -56,13 +60,17 @@ struct IndexFormat { uint32_t segments_meta_crc; uint32_t content_crc; uint32_t segment_count; + // meta section size uint32_t segments_meta_size; uint32_t reserved1_; + // segments' data section uint64_t content_size; uint64_t content_padding_size; + uint64_t check_point; uint64_t update_time; - uint64_t reserved2_[8]; + uint64_t reserved2_[7]; + uint64_t next_meta_header_offset; uint64_t total_size; }; @@ -73,8 +81,10 @@ struct IndexFormat { */ struct SegmentMeta { uint32_t segment_id_offset; + // used only by immutable segments, e.g., IndexMeta, or searcher uint32_t data_crc; - uint64_t data_index; + // offset in file + uint64_t data_absolute_offset; uint64_t data_size; uint64_t padding_size; }; @@ -100,7 +110,7 @@ struct IndexFormat { } SegmentMeta *meta = (SegmentMeta *)buffer_.data() + count_; meta->segment_id_offset = static_cast(buffer_.size()); - meta->data_index = offset_; + meta->data_absolute_offset = offset_; meta->data_size = data_size; meta->data_crc = data_crc; meta->padding_size = padding_size; @@ -145,7 +155,7 @@ struct IndexFormat { static void SetupMetaHeader(MetaHeader *header, uint32_t footer_offset, uint32_t content_offset) { memset(header, 0, sizeof(MetaHeader)); - header->version = IndexFormat::FORMAT_VERSION; + header->version = IndexFormat::CURRENT_FORMAT_VERSION; header->revision = 0; header->magic = std::random_device()(); header->meta_header_size = sizeof(MetaHeader); @@ -156,6 +166,11 @@ struct IndexFormat { header->header_crc = ailego::Crc32c::Hash(header, sizeof(MetaHeader), 0); } + static void UpdateMetaHeader(MetaHeader *header) { + header->header_crc = 0; + header->header_crc = ailego::Crc32c::Hash(header, sizeof(MetaHeader), 0); + } + //! Setup meta footer structure static void SetupMetaFooter(MetaFooter *footer) { memset(footer, 0, sizeof(MetaFooter)); diff --git a/src/include/zvec/core/framework/index_mapping.h b/src/include/zvec/core/framework/index_mapping.h index 7074088c..cb6f932f 100644 --- a/src/include/zvec/core/framework/index_mapping.h +++ b/src/include/zvec/core/framework/index_mapping.h @@ -120,8 +120,6 @@ class IndexMapping { //! Create a index file int create(const std::string &path, size_t segs_size); - int create_hugepage(size_t segs_size); - //! Close the index void close(void); @@ -183,6 +181,9 @@ class IndexMapping { bool Ishugetlbfs(const std::string &path) const; + int init_meta_section(); + int init_hugepage_meta_section(); + private: //! Disable them IndexMapping(const IndexMapping &) = delete; @@ -192,15 +193,19 @@ class IndexMapping { uint32_t segment_ids_offset_{0}; IndexFormat::SegmentMeta *segment_start_{nullptr}; IndexFormat::MetaHeader *header_{nullptr}; + std::map header_addr_map_{}; IndexFormat::MetaFooter *footer_{nullptr}; std::map segments_{}; size_t index_size_{0u}; ailego::File file_{}; + std::string path_; bool copy_on_write_{false}; bool full_mode_{false}; bool header_dirty_{false}; bool huge_page_{false}; + size_t seg_meta_capacity_{0u}; + uint64_t current_header_start_offset_{0u}; }; } // namespace core -} // namespace zvec +} // namespace zvec \ No newline at end of file diff --git a/src/include/zvec/core/framework/index_unpacker.h b/src/include/zvec/core/framework/index_unpacker.h index 425ad12d..35fea5d2 100644 --- a/src/include/zvec/core/framework/index_unpacker.h +++ b/src/include/zvec/core/framework/index_unpacker.h @@ -107,22 +107,29 @@ class IndexUnpacker { const void **, size_t>::value, "Invocable function type"); - if (!this->unpack_header(read_data)) { - LOG_ERROR("Failed to unpack index header"); - return false; - } - if (!this->unpack_footer(read_data, total)) { - LOG_ERROR("Failed to unpack index footer"); - return false; - } - if (!this->unpack_segments(read_data, total)) { - LOG_ERROR("Failed to unpack index segments' meta"); - return false; - } - if (checksum && !this->validate_checksum(read_data)) { - LOG_ERROR("Failed to validate checksum of index content"); - return false; + while (true) { + if (!this->unpack_header(read_data)) { + LOG_ERROR("Failed to unpack index header"); + return false; + } + if (!this->unpack_footer(read_data, total)) { + LOG_ERROR("Failed to unpack index footer"); + return false; + } + if (!this->unpack_segments(read_data, total)) { + LOG_ERROR("Failed to unpack index segments' meta"); + return false; + } + if (checksum && !this->validate_checksum(read_data)) { + LOG_ERROR("Failed to validate checksum of index content"); + return false; + } + if (footer_.next_meta_header_offset == 0) { + break; + } + current_header_start_offset_ = footer_.next_meta_header_offset; } + if (!this->unpack_version(read_data)) { LOG_ERROR("Failed to unpack index version"); return false; @@ -137,7 +144,8 @@ class IndexUnpacker { const void **, size_t>::value, "Invocable function type"); const void *data = nullptr; - if (read_data(0u, &data, sizeof(header_)) != sizeof(header_)) { + if (read_data(current_header_start_offset_, &data, sizeof(header_)) != + sizeof(header_)) { return false; } @@ -149,6 +157,13 @@ class IndexUnpacker { header_.header_crc) { return false; } + + if (header_.version != IndexFormat::CURRENT_FORMAT_VERSION) { + LOG_ERROR( + "This index is an old version, please use mmap(rw)+streamer to open " + "the index and perform auto upgrade of the index format"); + return false; + } return true; } @@ -170,15 +185,15 @@ class IndexUnpacker { } const void *data = nullptr; - if (read_data(footer_offset, &data, sizeof(footer_)) != sizeof(footer_)) { + if (read_data(current_header_start_offset_ + footer_offset, &data, + sizeof(footer_)) != sizeof(footer_)) { return false; } memcpy(&footer_, data, sizeof(footer_)); - if ((footer_.total_size != total) || - (footer_.content_size + footer_.content_padding_size + - header_.content_offset > - total)) { + if (footer_.content_size + footer_.content_padding_size + + header_.content_offset > + footer_.total_size) { return false; } if (ailego::Crc32c::Hash(&footer_, sizeof(footer_), footer_.footer_crc) != @@ -207,8 +222,8 @@ class IndexUnpacker { offset -= footer_.segments_meta_size; const void *data = nullptr; - if (read_data(offset, &data, footer_.segments_meta_size) != - footer_.segments_meta_size) { + if (read_data(current_header_start_offset_ + offset, &data, + footer_.segments_meta_size) != footer_.segments_meta_size) { return false; } if (ailego::Crc32c::Hash(data, footer_.segments_meta_size, 0u) != @@ -221,17 +236,20 @@ class IndexUnpacker { if (seg->segment_id_offset > footer_.segments_meta_size) { return false; } - if (seg->data_index > footer_.content_size) { + if (seg->data_absolute_offset > footer_.content_size + + header_.content_offset + + current_header_start_offset_) { return false; } - if (seg->data_index + seg->data_size > footer_.content_size) { + if (seg->data_absolute_offset + seg->data_size > + footer_.content_size + header_.content_offset + + current_header_start_offset_) { return false; } - segments_.emplace( - std::string(reinterpret_cast(data) + - seg->segment_id_offset), - SegmentMeta(seg->data_index + header_.content_offset, seg->data_size, - seg->padding_size, seg->data_crc)); + segments_.emplace(std::string(reinterpret_cast(data) + + seg->segment_id_offset), + SegmentMeta(seg->data_absolute_offset, seg->data_size, + seg->padding_size, seg->data_crc)); } return true; } @@ -251,7 +269,7 @@ class IndexUnpacker { const SegmentMeta &segment = it->second; const void *data = nullptr; - if (read_data(segment.data_offset(), &data, segment.data_size()) != + if (read_data(0 + segment.data_offset(), &data, segment.data_size()) != segment.data_size()) { return false; } @@ -280,14 +298,16 @@ class IndexUnpacker { size_t offset = sizeof(header_); while (total >= block_size) { - if (read_data(offset, &data, block_size) != block_size) { + if (read_data(current_header_start_offset_ + offset, &data, block_size) != + block_size) { return false; } checksum = ailego::Crc32c::Hash(data, block_size, checksum); total -= block_size; offset += block_size; } - if (read_data(offset, &data, total) != total) { + if (read_data(current_header_start_offset_ + offset, &data, total) != + total) { return false; } checksum = ailego::Crc32c::Hash(data, total, checksum); @@ -295,10 +315,11 @@ class IndexUnpacker { } private: - IndexFormat::MetaHeader header_; - IndexFormat::MetaFooter footer_; + IndexFormat::MetaHeader header_{}; + IndexFormat::MetaFooter footer_{}; std::string version_{}; std::map segments_{}; + uint64_t current_header_start_offset_{0u}; }; } // namespace core diff --git a/tools/core/helper.h b/tools/core/helper.h index c24c89ff..037958e5 100644 --- a/tools/core/helper.h +++ b/tools/core/helper.h @@ -72,7 +72,7 @@ int parse_and_load_index_param( index_config.as()); index = core_interface::IndexFactory::CreateAndInitIndex(*params); if (!index) { - cerr << "Failed to create index" << endl; + LOG_ERROR("Failed to create index"); return -1; } core_interface::StorageOptions storage_options; @@ -82,13 +82,13 @@ int parse_and_load_index_param( int ret = index->Open(index_dir, storage_options); if (0 != ret) { - cerr << "Index open failed with ret " << ret << endl; - return false; + LOG_ERROR("Index open failed with ret %d", ret); + return -1; } cout << "Load index done!" << endl; } else { - cerr << "IndexCommon.IndexConfig is required" << endl; + LOG_ERROR("IndexCommon.IndexConfig is required"); return -1; } @@ -114,7 +114,7 @@ int parse_and_load_index_param( core_interface::BaseIndexQueryParam>( query_param_config.as()); if (!query_param) { - cerr << "Failed to deserialize query params" << endl; + LOG_ERROR("Failed to deserialize query params"); return -1; } } @@ -130,8 +130,7 @@ int parse_and_load_index_param( auto scale_factor = scale_factor_config.as(); refiner_param->scale_factor_ = scale_factor; } else { - cerr << "QueryConfig.RefinerConfig.ScaleFactor config is required" - << endl; + LOG_ERROR("QueryConfig.RefinerConfig.ScaleFactor config is required"); return -1; } @@ -149,9 +148,9 @@ int parse_and_load_index_param( reference_index = core_interface::IndexFactory::CreateAndInitIndex(*params); } else { - cerr << "QueryConfig.RefinerConfig.ReferenceIndex.Config config is " - "required" - << endl; + LOG_ERROR( + "QueryConfig.RefinerConfig.ReferenceIndex.Config config is " + "required"); return -1; } @@ -169,20 +168,20 @@ int parse_and_load_index_param( int ret = reference_index->Open(reference_index_path, storage_options); if (0 != ret) { - cerr << "Index open failed with ret " << ret << endl; + LOG_ERROR("Index open failed with ret %d", ret); return -1; } cout << "Load reference index done!" << endl; } else { - cerr << "QueryConfig.RefinerConfig.ReferenceIndex.Path is required" - << endl; + LOG_ERROR( + "QueryConfig.RefinerConfig.ReferenceIndex.Path is required"); return -1; } refiner_param->reference_index = reference_index; } else { - cerr << "QueryConfig.RefinerConfig.ReferenceIndex section is required" - << endl; + LOG_ERROR( + "QueryConfig.RefinerConfig.ReferenceIndex section is required"); return -1; } // QueryConfig.RefinerConfig.ReferenceIndex diff --git a/tools/core/local_builder.cc b/tools/core/local_builder.cc index 28226418..f6d80510 100644 --- a/tools/core/local_builder.cc +++ b/tools/core/local_builder.cc @@ -157,9 +157,9 @@ static inline size_t AlignSize(size_t size) { return (size + 0x1F) & (~0x1F); } -int64_t dump_meta_segment(const IndexDumper::Pointer &dumper, - const std::string &segment_id, const void *data, - size_t size, size_t &writes) { +bool dump_meta_segment(const IndexDumper::Pointer &dumper, + const std::string &segment_id, const void *data, + size_t size, size_t &writes) { size_t len = dumper->write(data, size); if (len != size) { LOG_ERROR("Dump segment %s data failed, expect: %lu, actual: %lu", @@ -373,17 +373,17 @@ int build_sparse_by_streamer(IndexStreamer::Pointer &streamer, ailego::Params params; int ret = storage->init(params); if (ret != 0) { - cerr << "Storage Failed init"; + cerr << "Storage Failed init" << endl; return IndexError_Runtime; } ret = storage->open(path, true); if (ret != 0) { - cerr << "Storage Failed to open"; + cerr << "Storage Failed to open" << endl; return IndexError_Runtime; } ret = streamer->open(storage); if (ret != 0) { - cerr << "Failed to open storage"; + cerr << "Failed to open storage" << endl; return IndexError_Runtime; } @@ -549,12 +549,12 @@ int build_by_streamer(IndexStreamer::Pointer &streamer, } ret = storage->open(path, true); if (ret != 0) { - cerr << "Storage Failed to open"; + cerr << "Storage Failed to open" << endl; return IndexError_Runtime; } ret = streamer->open(storage); if (ret != 0) { - cerr << "Failed to open storage"; + cerr << "Failed to open storage" << endl; return IndexError_Runtime; } diff --git a/tools/core/recall.cc b/tools/core/recall.cc index 75d55ba4..7fa0a9bc 100644 --- a/tools/core/recall.cc +++ b/tools/core/recall.cc @@ -1478,27 +1478,6 @@ void usage(void) { cout << "Usage: recall CONFIG.yaml [plugin file path]" << endl; } -bool load_index(core_interface::Index::Pointer index, string &index_dir, - std::vector> &id_to_tags_list, - std::vector &tag_key_list) { - core_interface::StorageOptions storage_options; - storage_options.type = core_interface::StorageOptions::StorageType::kMMAP; - storage_options.create_new = false; - storage_options.read_only = true; - - int ret = index->Open(index_dir, storage_options); - if (0 != ret) { - cerr << "Index open failed with ret " << ret << endl; - return false; - } - - // Load tag lists if available - load_taglists(index_dir, id_to_tags_list, tag_key_list); - - cout << "Load index done!" << endl; - return true; -}; - int recall_dense(std::string &query_type, size_t thread_count, size_t batch_count, string top_k, size_t gt_count, string query_file, string &first_sep, string &second_sep, @@ -1612,7 +1591,10 @@ int recall_sparse(std::string &query_type, size_t thread_count, std::vector> id_to_tags_list; std::vector tag_key_list; // Load tag lists if available - load_taglists(index_dir, id_to_tags_list, tag_key_list); + if (load_taglists(index_dir, id_to_tags_list, tag_key_list) != 0) { + cerr << "Failed to load tag lists" << endl; + return -1; + } recall.set_tag_lists(id_to_tags_list, tag_key_list); @@ -1638,12 +1620,12 @@ int get_recall_precision(string &recall_precision_string) { } catch (const std::invalid_argument &e) { std::cerr << "Exeception in getting recall precision: " << e.what() << ", value: " << recall_precision_string << std::endl; - return false; + return -1; } catch (const std::out_of_range &e) { std::cerr << "Out of range exception in getting recall precision: " << e.what() << ", value: " << recall_precision_string << std::endl; - return false; + return -1; } return true; From ce4351a4b715083c3397e75df33c7582084cb686 Mon Sep 17 00:00:00 2001 From: Jianning Wang Date: Wed, 4 Feb 2026 18:16:27 +0800 Subject: [PATCH 3/4] Revert "feat: auto scalable segment meta section of mmap files" This reverts commit 82c11e451544b912b901be33fae79e964a487b1f. --- src/core/framework/index_mapping.cc | 175 +++--------------- src/core/utility/buffer_storage.cc | 100 +++------- .../zvec/core/framework/index_format.h | 29 +-- .../zvec/core/framework/index_mapping.h | 11 +- .../zvec/core/framework/index_unpacker.h | 91 ++++----- tools/core/helper.h | 29 +-- tools/core/local_builder.cc | 16 +- tools/core/recall.cc | 30 ++- 8 files changed, 142 insertions(+), 339 deletions(-) diff --git a/src/core/framework/index_mapping.cc b/src/core/framework/index_mapping.cc index 0257f1e1..41475d0e 100644 --- a/src/core/framework/index_mapping.cc +++ b/src/core/framework/index_mapping.cc @@ -82,7 +82,6 @@ static inline int UnpackMappingSize(ailego::File &file, size_t *len) { } int IndexMapping::open(const std::string &path, bool cow, bool full_mode) { - path_ = path; full_mode_ = full_mode; copy_on_write_ = cow; huge_page_ = Ishugetlbfs(path); @@ -109,12 +108,7 @@ int IndexMapping::open(const std::string &path, bool cow, bool full_mode) { return this->init_index_mapping(mapping_size); } -int IndexMapping::create(const std::string &path, size_t seg_meta_capacity) { - path_ = path; - seg_meta_capacity_ = seg_meta_capacity; - current_header_start_offset_ = 0; - - // write() & copying to mmap() will auto extend the file size +int IndexMapping::create(const std::string &path, size_t segs_size) { if (!file_.create(path.c_str(), 0)) { LOG_ERROR("Failed to create file %s, errno %d, %s", path.c_str(), errno, std::strerror(errno)); @@ -122,34 +116,17 @@ int IndexMapping::create(const std::string &path, size_t seg_meta_capacity) { } huge_page_ = Ishugetlbfs(path); if (huge_page_) { - return init_hugepage_meta_section(); - } - return init_meta_section(); -} - -int IndexMapping::init_meta_section() { - if (current_header_start_offset_ % ailego::MemoryHelper::PageSize() != 0) { - LOG_ERROR("File offset %zu is not a multiple of the page size: %zu", - current_header_start_offset_, ailego::MemoryHelper::PageSize()); - return IndexError_InvalidValue; + return create_hugepage(segs_size); } - - auto &path = path_; - size_t len = - CalcPageAlignedSize(seg_meta_capacity_ + sizeof(IndexFormat::MetaHeader) + - sizeof(IndexFormat::MetaFooter), - false); + size_t len = CalcPageAlignedSize(segs_size + sizeof(IndexFormat::MetaHeader) + + sizeof(IndexFormat::MetaFooter), + false); IndexFormat::MetaHeader meta_header; IndexFormat::MetaFooter meta_footer; // Write index header IndexFormat::SetupMetaHeader(&meta_header, len - sizeof(meta_footer), len); - if (!file_.seek(current_header_start_offset_, ailego::File::Origin::Begin)) { - LOG_ERROR("Failed to seek file %s, errno %d, %s", path.c_str(), errno, - std::strerror(errno)); - return IndexError_SeekFile; - } if (file_.write(&meta_header, sizeof(meta_header)) != sizeof(meta_header)) { LOG_ERROR("Failed to write file: %s, errno %d, %s", path.c_str(), errno, std::strerror(errno)); @@ -178,18 +155,11 @@ int IndexMapping::init_meta_section() { return this->init_index_mapping(len); } -int IndexMapping::init_hugepage_meta_section() { - ssize_t file_offset = (ssize_t)current_header_start_offset_; - if (file_offset % ailego::MemoryHelper::HugePageSize() != 0) { - LOG_ERROR("File offset %zu is not a multiple of the page size: %zu", - file_offset, ailego::MemoryHelper::HugePageSize()); - return IndexError_InvalidValue; - } - - size_t len = - CalcPageAlignedSize(seg_meta_capacity_ + sizeof(IndexFormat::MetaHeader) + - sizeof(IndexFormat::MetaFooter), - true); +int IndexMapping::create_hugepage(size_t segs_size) { + size_t len = CalcPageAlignedSize(segs_size + sizeof(IndexFormat::MetaHeader) + + sizeof(IndexFormat::MetaFooter), + true); + int file_offset = 0; int opts = ailego::File::MMAP_SHARED | ailego::File::MMAP_HUGE_PAGE; void *addr = ailego::File::MemoryMap(file_.native_handle(), file_offset, len, opts); @@ -224,7 +194,6 @@ int IndexMapping::init_hugepage_meta_section() { IndexFormat::UpdateMetaFooter(&meta_footer, 0); memcpy((char *)addr + file_offset, &meta_footer, sizeof(meta_footer)); file_offset += sizeof(meta_footer); - return this->init_index_mapping(len); } @@ -232,16 +201,12 @@ void IndexMapping::close(void) { // Unmap all memory this->unmap_all(); if (header_) { - for (auto item : header_addr_map_) { - auto header = item.second; - ailego::File::MemoryUnmap(header, header->content_offset); - } + ailego::File::MemoryUnmap(header_, header_->content_offset); } // Reset members segment_ids_offset_ = 0; segment_start_ = nullptr; header_ = nullptr; - header_addr_map_.clear(); footer_ = nullptr; index_size_ = 0u; segments_.clear(); @@ -253,19 +218,9 @@ void IndexMapping::close(void) { } void IndexMapping::refresh(uint64_t check_point) { - // support add_with_id - for (auto item : header_addr_map_) { - auto header_start_offset = item.first; - auto header = item.second; - auto footer = reinterpret_cast( - reinterpret_cast(header) + header->meta_footer_offset); - auto segment_start = reinterpret_cast( - reinterpret_cast(header) + - (header->meta_footer_offset - footer->segments_meta_size)); - footer->segments_meta_crc = - ailego::Crc32c::Hash(segment_start, footer->segments_meta_size, 0); - IndexFormat::UpdateMetaFooter(footer, check_point); - } + footer_->segments_meta_crc = + ailego::Crc32c::Hash(segment_start_, footer_->segments_meta_size, 0); + IndexFormat::UpdateMetaFooter(footer_, check_point); header_dirty_ = true; } @@ -283,21 +238,7 @@ int IndexMapping::append(const std::string &id, size_t size) { size_t need_size = sizeof(IndexFormat::SegmentMeta) + id_size; if (sizeof(IndexFormat::SegmentMeta) * footer_->segment_count + need_size > segment_ids_offset_) { - LOG_DEBUG("segment meta section expanded: %s", path_.c_str()); - footer_->next_meta_header_offset = index_size_; - refresh(0); - flush(); - // mmap file storage write() will update segment's meta - // ailego::File::MemoryUnmap(header_, header_->content_offset); - header_ = nullptr; - footer_ = nullptr; - - current_header_start_offset_ = index_size_; - const int ret = - huge_page_ ? init_hugepage_meta_section() : init_meta_section(); - if (ret != 0) { - return ret; - } + return IndexError_NoBuffer; } if (!copy_on_write_ && !file_.truncate(index_size_ + size)) { @@ -310,10 +251,7 @@ int IndexMapping::append(const std::string &id, size_t size) { segment_ids_offset_ -= static_cast(id_size); IndexFormat::SegmentMeta *segment = segment_start_ + footer_->segment_count; segment->segment_id_offset = segment_ids_offset_; - // For compatibility, write offset relative to the content_offset, - // and then restore the absolute offset while loading, i.e., - // init_index_mapping(). - segment->data_absolute_offset = index_size_; + segment->data_index = index_size_ - header_->content_offset; segment->data_size = 0; segment->data_crc = 0; segment->padding_size = size; @@ -342,7 +280,7 @@ IndexMapping::Segment *IndexMapping::map(const std::string &id, bool warmup, if (!item->data()) { auto meta = item->meta(); size_t mapping_size = meta->data_size + meta->padding_size; - size_t offset = meta->data_absolute_offset; + size_t offset = meta->data_index + header_->content_offset; void *addr = nullptr; if (!copy_on_write_) { @@ -425,7 +363,7 @@ int IndexMapping::flush(void) { size_t segment_size = item->meta()->data_size + item->meta()->padding_size; if (full_mode_ && copy_on_write_) { - size_t off = item->meta()->data_absolute_offset; + size_t off = header_->content_offset + item->meta()->data_index; if (file_.write(off, item->data(), segment_size) != segment_size) { LOG_ERROR("Failed to write segment, size %zu, errno %d, %s", segment_size, errno, std::strerror(errno)); @@ -443,21 +381,14 @@ int IndexMapping::flush(void) { header_dirty_ = false; if (full_mode_ && copy_on_write_) { - for (auto item : header_addr_map_) { - auto header_start_offset = item.first; - auto header = item.second; - if (file_.write(header_start_offset, header, header->content_offset) != - header->content_offset) { - LOG_ERROR("Failed to write segment, size %lu, errno %d, %s", - header->content_offset, errno, std::strerror(errno)); - return IndexError_WriteData; - } + if (file_.write(0, header_, header_->content_offset) != + header_->content_offset) { + LOG_ERROR("Failed to write segment, size %u, errno %d, %s", + header_->content_offset, errno, std::strerror(errno)); + return IndexError_WriteData; } } else { - for (auto item : header_addr_map_) { - auto header = item.second; - ailego::File::MemoryFlush(header, header->content_offset); - } + ailego::File::MemoryFlush(header_, header_->content_offset); } return 0; } @@ -468,8 +399,8 @@ int IndexMapping::init_index_mapping(size_t len) { if (huge_page_) { opts |= ailego::File::MMAP_HUGE_PAGE; } - uint8_t *start = reinterpret_cast(ailego::File::MemoryMap( - file_.native_handle(), current_header_start_offset_, len, opts)); + uint8_t *start = reinterpret_cast( + ailego::File::MemoryMap(file_.native_handle(), 0, len, opts)); if (!start) { LOG_ERROR("Failed to map file, errno %d, %s", errno, std::strerror(errno)); return IndexError_MMapFile; @@ -477,7 +408,6 @@ int IndexMapping::init_index_mapping(size_t len) { // Unpack header header_ = reinterpret_cast(start); - header_addr_map_.insert({current_header_start_offset_, header_}); if (header_->meta_header_size != sizeof(IndexFormat::MetaHeader)) { return IndexError_InvalidLength; } @@ -486,15 +416,6 @@ int IndexMapping::init_index_mapping(size_t len) { return IndexError_InvalidChecksum; } - switch (header_->version) { - case IndexFormat::CURRENT_FORMAT_VERSION: - case IndexFormat::COMPATIBLE_FORMAT_VERSION_0X0002: - break; - default: - LOG_ERROR("Unsupported index version: %u", header_->version); - return IndexError_Unsupported; - } - // Unpack footer if (header_->meta_footer_size != sizeof(IndexFormat::MetaFooter)) { return IndexError_InvalidLength; @@ -545,34 +466,12 @@ int IndexMapping::init_index_mapping(size_t len) { if (iter->segment_id_offset > footer_->segments_meta_size) { return IndexError_InvalidValue; } - // TODO: this is a simplified format compatibility handling logic, - // and in the future, it should become a independent function. - if (header_->version == IndexFormat::COMPATIBLE_FORMAT_VERSION_0X0002) { - // v2: data offset is relative to the content_offset. - if (iter->data_absolute_offset > footer_->content_size) { - return IndexError_InvalidValue; - } - if (iter->data_absolute_offset + iter->data_size > - footer_->content_size) { - return IndexError_InvalidLength; - } - iter->data_absolute_offset += - header_->content_offset + current_header_start_offset_; - // v2 end - } - - // v3: data offset is absolute offset in file. - if (iter->data_absolute_offset > footer_->content_size + - header_->content_offset + - current_header_start_offset_) { + if (iter->data_index > footer_->content_size) { return IndexError_InvalidValue; } - if (iter->data_absolute_offset + iter->data_size > - footer_->content_size + header_->content_offset + - current_header_start_offset_) { + if (iter->data_index + iter->data_size > footer_->content_size) { return IndexError_InvalidLength; } - // v3 end if (iter->segment_id_offset < segment_ids_offset_) { segment_ids_offset_ = iter->segment_id_offset; @@ -586,24 +485,6 @@ int IndexMapping::init_index_mapping(size_t len) { segment_ids_offset_) { return IndexError_InvalidLength; } - - if (header_->version == IndexFormat::COMPATIBLE_FORMAT_VERSION_0X0002) { - header_->version = IndexFormat::CURRENT_FORMAT_VERSION; - LOG_INFO("Index file format upgraded"); - IndexFormat::UpdateMetaHeader(header_); - footer_->segments_meta_crc = - ailego::Crc32c::Hash(segment_start_, footer_->segments_meta_size, 0); - IndexFormat::UpdateMetaFooter(footer_, 0); - header_dirty_ = true; - } - - if (footer_->next_meta_header_offset > 0) { - current_header_start_offset_ = footer_->next_meta_header_offset; - // Meta sections have all the same size, so we can use the same size to map - // the next meta section - return this->init_index_mapping(len); - } - return 0; } diff --git a/src/core/utility/buffer_storage.cc b/src/core/utility/buffer_storage.cc index 68200d85..4ac3c6b3 100644 --- a/src/core/utility/buffer_storage.cc +++ b/src/core/utility/buffer_storage.cc @@ -90,7 +90,8 @@ class BufferStorage : public IndexStorage { } len = meta->data_size - offset; } - size_t buffer_offset = segment_->meta()->data_absolute_offset + offset; + size_t buffer_offset = + segment_->meta()->data_index + owner_->get_context_offset() + offset; ailego::BufferHandle buffer_handle = owner_->get_buffer_handle(buffer_offset, len); *data = buffer_handle.pin_vector_data(); @@ -105,7 +106,8 @@ class BufferStorage : public IndexStorage { } len = meta->data_size - offset; } - size_t buffer_offset = segment_->meta()->data_absolute_offset + offset; + size_t buffer_offset = + segment_->meta()->data_index + owner_->get_context_offset() + offset; data.reset(owner_->get_buffer_handle_ptr(buffer_offset, len)); if (data.data()) { return len; @@ -233,35 +235,12 @@ class BufferStorage : public IndexStorage { if (iter->segment_id_offset > footer_.segments_meta_size) { return IndexError_InvalidValue; } - - // TODO: this is a simplified format compatibility handling logic, - // and in the future, it should become a independent function. - if (header_.version == IndexFormat::COMPATIBLE_FORMAT_VERSION_0X0002) { - // v2: data offset is relative to the content_offset. - if (iter->data_absolute_offset > footer_.content_size) { - return IndexError_InvalidValue; - } - if (iter->data_absolute_offset + iter->data_size > - footer_.content_size) { - return IndexError_InvalidLength; - } - iter->data_absolute_offset += - header_.content_offset + current_header_start_offset_; - // v2 end - } - - // v3: data offset is absolute offset in file. - if (iter->data_absolute_offset > footer_.content_size + - header_.content_offset + - current_header_start_offset_) { + if (iter->data_index > footer_.content_size) { return IndexError_InvalidValue; } - if (iter->data_absolute_offset + iter->data_size > - footer_.content_size + header_.content_offset + - current_header_start_offset_) { + if (iter->data_index + iter->data_size > footer_.content_size) { return IndexError_InvalidLength; } - // v3 end if (iter->segment_id_offset < segment_ids_offset) { segment_ids_offset = iter->segment_id_offset; @@ -279,59 +258,25 @@ class BufferStorage : public IndexStorage { } int ParseToMapping() { - while (true) { - int ret; - ret = ParseHeader(current_header_start_offset_); - if (ret != 0) { - LOG_ERROR("Failed to parse header, errno %d, %s", ret, - IndexError::What(ret)); - return ret; - } + ParseHeader(0); - switch (header_.version) { - case IndexFormat::CURRENT_FORMAT_VERSION: - case IndexFormat::COMPATIBLE_FORMAT_VERSION_0X0002: - break; - default: - LOG_ERROR("Unsupported index version: %u", header_.version); - return IndexError_Unsupported; - } - - // Unpack footer - if (header_.meta_footer_size != sizeof(IndexFormat::MetaFooter)) { - return IndexError_InvalidLength; - } - if ((int32_t)header_.meta_footer_offset < 0) { - return IndexError_Unsupported; - } - uint64_t footer_offset = - header_.meta_footer_offset + current_header_start_offset_; - ret = ParseFooter(footer_offset); - if (ret != 0) { - LOG_ERROR("Failed to parse footer, errno %d, %s", ret, - IndexError::What(ret)); - return ret; - } - - // Unpack segment table - if (sizeof(IndexFormat::SegmentMeta) * footer_.segment_count > - footer_.segments_meta_size) { - return IndexError_InvalidLength; - } - const uint64_t segment_start_offset = - footer_offset - footer_.segments_meta_size; - ret = ParseSegment(segment_start_offset); - if (ret != 0) { - LOG_ERROR("Failed to parse segment, errno %d, %s", ret, - IndexError::What(ret)); - return ret; - } + // Unpack footer + if (header_.meta_footer_size != sizeof(IndexFormat::MetaFooter)) { + return IndexError_InvalidLength; + } + if ((int32_t)header_.meta_footer_offset < 0) { + return IndexError_Unsupported; + } + size_t footer_offset = header_.meta_footer_offset; + ParseFooter(footer_offset); - if (footer_.next_meta_header_offset == 0) { - break; - } - current_header_start_offset_ = footer_.next_meta_header_offset; + // Unpack segment table + if (sizeof(IndexFormat::SegmentMeta) * footer_.segment_count > + footer_.segments_meta_size) { + return IndexError_InvalidLength; } + const int segment_start_offset = footer_offset - footer_.segments_meta_size; + ParseSegment(segment_start_offset); return 0; } @@ -489,7 +434,6 @@ class BufferStorage : public IndexStorage { IndexFormat::MetaHeader header_; IndexFormat::MetaFooter footer_; std::map segments_{}; - uint64_t current_header_start_offset_{0u}; }; INDEX_FACTORY_REGISTER_STORAGE(BufferStorage); diff --git a/src/include/zvec/core/framework/index_format.h b/src/include/zvec/core/framework/index_format.h index 95d6b9d5..d3b7179c 100644 --- a/src/include/zvec/core/framework/index_format.h +++ b/src/include/zvec/core/framework/index_format.h @@ -27,12 +27,7 @@ namespace core { struct IndexFormat { /*! Version Number */ - enum { - CURRENT_FORMAT_VERSION = 0x0003, - // 0x0002: data offset in SegmentMeta is relative to the content_offset, - // while new version is absolute offset in file. - COMPATIBLE_FORMAT_VERSION_0X0002 = 0x0002, - }; + enum { FORMAT_VERSION = 0x0002 }; /*! Index Format Meta Header */ @@ -45,7 +40,8 @@ struct IndexFormat { uint16_t meta_header_size; uint16_t meta_footer_size; uint32_t meta_footer_offset; - uint64_t content_offset; + uint32_t content_offset; + uint32_t reserved2_; uint64_t setup_time; uint64_t reserved3_[3]; }; @@ -60,17 +56,13 @@ struct IndexFormat { uint32_t segments_meta_crc; uint32_t content_crc; uint32_t segment_count; - // meta section size uint32_t segments_meta_size; uint32_t reserved1_; - // segments' data section uint64_t content_size; uint64_t content_padding_size; - uint64_t check_point; uint64_t update_time; - uint64_t reserved2_[7]; - uint64_t next_meta_header_offset; + uint64_t reserved2_[8]; uint64_t total_size; }; @@ -81,10 +73,8 @@ struct IndexFormat { */ struct SegmentMeta { uint32_t segment_id_offset; - // used only by immutable segments, e.g., IndexMeta, or searcher uint32_t data_crc; - // offset in file - uint64_t data_absolute_offset; + uint64_t data_index; uint64_t data_size; uint64_t padding_size; }; @@ -110,7 +100,7 @@ struct IndexFormat { } SegmentMeta *meta = (SegmentMeta *)buffer_.data() + count_; meta->segment_id_offset = static_cast(buffer_.size()); - meta->data_absolute_offset = offset_; + meta->data_index = offset_; meta->data_size = data_size; meta->data_crc = data_crc; meta->padding_size = padding_size; @@ -155,7 +145,7 @@ struct IndexFormat { static void SetupMetaHeader(MetaHeader *header, uint32_t footer_offset, uint32_t content_offset) { memset(header, 0, sizeof(MetaHeader)); - header->version = IndexFormat::CURRENT_FORMAT_VERSION; + header->version = IndexFormat::FORMAT_VERSION; header->revision = 0; header->magic = std::random_device()(); header->meta_header_size = sizeof(MetaHeader); @@ -166,11 +156,6 @@ struct IndexFormat { header->header_crc = ailego::Crc32c::Hash(header, sizeof(MetaHeader), 0); } - static void UpdateMetaHeader(MetaHeader *header) { - header->header_crc = 0; - header->header_crc = ailego::Crc32c::Hash(header, sizeof(MetaHeader), 0); - } - //! Setup meta footer structure static void SetupMetaFooter(MetaFooter *footer) { memset(footer, 0, sizeof(MetaFooter)); diff --git a/src/include/zvec/core/framework/index_mapping.h b/src/include/zvec/core/framework/index_mapping.h index cb6f932f..7074088c 100644 --- a/src/include/zvec/core/framework/index_mapping.h +++ b/src/include/zvec/core/framework/index_mapping.h @@ -120,6 +120,8 @@ class IndexMapping { //! Create a index file int create(const std::string &path, size_t segs_size); + int create_hugepage(size_t segs_size); + //! Close the index void close(void); @@ -181,9 +183,6 @@ class IndexMapping { bool Ishugetlbfs(const std::string &path) const; - int init_meta_section(); - int init_hugepage_meta_section(); - private: //! Disable them IndexMapping(const IndexMapping &) = delete; @@ -193,19 +192,15 @@ class IndexMapping { uint32_t segment_ids_offset_{0}; IndexFormat::SegmentMeta *segment_start_{nullptr}; IndexFormat::MetaHeader *header_{nullptr}; - std::map header_addr_map_{}; IndexFormat::MetaFooter *footer_{nullptr}; std::map segments_{}; size_t index_size_{0u}; ailego::File file_{}; - std::string path_; bool copy_on_write_{false}; bool full_mode_{false}; bool header_dirty_{false}; bool huge_page_{false}; - size_t seg_meta_capacity_{0u}; - uint64_t current_header_start_offset_{0u}; }; } // namespace core -} // namespace zvec \ No newline at end of file +} // namespace zvec diff --git a/src/include/zvec/core/framework/index_unpacker.h b/src/include/zvec/core/framework/index_unpacker.h index 35fea5d2..425ad12d 100644 --- a/src/include/zvec/core/framework/index_unpacker.h +++ b/src/include/zvec/core/framework/index_unpacker.h @@ -107,29 +107,22 @@ class IndexUnpacker { const void **, size_t>::value, "Invocable function type"); - while (true) { - if (!this->unpack_header(read_data)) { - LOG_ERROR("Failed to unpack index header"); - return false; - } - if (!this->unpack_footer(read_data, total)) { - LOG_ERROR("Failed to unpack index footer"); - return false; - } - if (!this->unpack_segments(read_data, total)) { - LOG_ERROR("Failed to unpack index segments' meta"); - return false; - } - if (checksum && !this->validate_checksum(read_data)) { - LOG_ERROR("Failed to validate checksum of index content"); - return false; - } - if (footer_.next_meta_header_offset == 0) { - break; - } - current_header_start_offset_ = footer_.next_meta_header_offset; + if (!this->unpack_header(read_data)) { + LOG_ERROR("Failed to unpack index header"); + return false; + } + if (!this->unpack_footer(read_data, total)) { + LOG_ERROR("Failed to unpack index footer"); + return false; + } + if (!this->unpack_segments(read_data, total)) { + LOG_ERROR("Failed to unpack index segments' meta"); + return false; + } + if (checksum && !this->validate_checksum(read_data)) { + LOG_ERROR("Failed to validate checksum of index content"); + return false; } - if (!this->unpack_version(read_data)) { LOG_ERROR("Failed to unpack index version"); return false; @@ -144,8 +137,7 @@ class IndexUnpacker { const void **, size_t>::value, "Invocable function type"); const void *data = nullptr; - if (read_data(current_header_start_offset_, &data, sizeof(header_)) != - sizeof(header_)) { + if (read_data(0u, &data, sizeof(header_)) != sizeof(header_)) { return false; } @@ -157,13 +149,6 @@ class IndexUnpacker { header_.header_crc) { return false; } - - if (header_.version != IndexFormat::CURRENT_FORMAT_VERSION) { - LOG_ERROR( - "This index is an old version, please use mmap(rw)+streamer to open " - "the index and perform auto upgrade of the index format"); - return false; - } return true; } @@ -185,15 +170,15 @@ class IndexUnpacker { } const void *data = nullptr; - if (read_data(current_header_start_offset_ + footer_offset, &data, - sizeof(footer_)) != sizeof(footer_)) { + if (read_data(footer_offset, &data, sizeof(footer_)) != sizeof(footer_)) { return false; } memcpy(&footer_, data, sizeof(footer_)); - if (footer_.content_size + footer_.content_padding_size + - header_.content_offset > - footer_.total_size) { + if ((footer_.total_size != total) || + (footer_.content_size + footer_.content_padding_size + + header_.content_offset > + total)) { return false; } if (ailego::Crc32c::Hash(&footer_, sizeof(footer_), footer_.footer_crc) != @@ -222,8 +207,8 @@ class IndexUnpacker { offset -= footer_.segments_meta_size; const void *data = nullptr; - if (read_data(current_header_start_offset_ + offset, &data, - footer_.segments_meta_size) != footer_.segments_meta_size) { + if (read_data(offset, &data, footer_.segments_meta_size) != + footer_.segments_meta_size) { return false; } if (ailego::Crc32c::Hash(data, footer_.segments_meta_size, 0u) != @@ -236,20 +221,17 @@ class IndexUnpacker { if (seg->segment_id_offset > footer_.segments_meta_size) { return false; } - if (seg->data_absolute_offset > footer_.content_size + - header_.content_offset + - current_header_start_offset_) { + if (seg->data_index > footer_.content_size) { return false; } - if (seg->data_absolute_offset + seg->data_size > - footer_.content_size + header_.content_offset + - current_header_start_offset_) { + if (seg->data_index + seg->data_size > footer_.content_size) { return false; } - segments_.emplace(std::string(reinterpret_cast(data) + - seg->segment_id_offset), - SegmentMeta(seg->data_absolute_offset, seg->data_size, - seg->padding_size, seg->data_crc)); + segments_.emplace( + std::string(reinterpret_cast(data) + + seg->segment_id_offset), + SegmentMeta(seg->data_index + header_.content_offset, seg->data_size, + seg->padding_size, seg->data_crc)); } return true; } @@ -269,7 +251,7 @@ class IndexUnpacker { const SegmentMeta &segment = it->second; const void *data = nullptr; - if (read_data(0 + segment.data_offset(), &data, segment.data_size()) != + if (read_data(segment.data_offset(), &data, segment.data_size()) != segment.data_size()) { return false; } @@ -298,16 +280,14 @@ class IndexUnpacker { size_t offset = sizeof(header_); while (total >= block_size) { - if (read_data(current_header_start_offset_ + offset, &data, block_size) != - block_size) { + if (read_data(offset, &data, block_size) != block_size) { return false; } checksum = ailego::Crc32c::Hash(data, block_size, checksum); total -= block_size; offset += block_size; } - if (read_data(current_header_start_offset_ + offset, &data, total) != - total) { + if (read_data(offset, &data, total) != total) { return false; } checksum = ailego::Crc32c::Hash(data, total, checksum); @@ -315,11 +295,10 @@ class IndexUnpacker { } private: - IndexFormat::MetaHeader header_{}; - IndexFormat::MetaFooter footer_{}; + IndexFormat::MetaHeader header_; + IndexFormat::MetaFooter footer_; std::string version_{}; std::map segments_{}; - uint64_t current_header_start_offset_{0u}; }; } // namespace core diff --git a/tools/core/helper.h b/tools/core/helper.h index 037958e5..c24c89ff 100644 --- a/tools/core/helper.h +++ b/tools/core/helper.h @@ -72,7 +72,7 @@ int parse_and_load_index_param( index_config.as()); index = core_interface::IndexFactory::CreateAndInitIndex(*params); if (!index) { - LOG_ERROR("Failed to create index"); + cerr << "Failed to create index" << endl; return -1; } core_interface::StorageOptions storage_options; @@ -82,13 +82,13 @@ int parse_and_load_index_param( int ret = index->Open(index_dir, storage_options); if (0 != ret) { - LOG_ERROR("Index open failed with ret %d", ret); - return -1; + cerr << "Index open failed with ret " << ret << endl; + return false; } cout << "Load index done!" << endl; } else { - LOG_ERROR("IndexCommon.IndexConfig is required"); + cerr << "IndexCommon.IndexConfig is required" << endl; return -1; } @@ -114,7 +114,7 @@ int parse_and_load_index_param( core_interface::BaseIndexQueryParam>( query_param_config.as()); if (!query_param) { - LOG_ERROR("Failed to deserialize query params"); + cerr << "Failed to deserialize query params" << endl; return -1; } } @@ -130,7 +130,8 @@ int parse_and_load_index_param( auto scale_factor = scale_factor_config.as(); refiner_param->scale_factor_ = scale_factor; } else { - LOG_ERROR("QueryConfig.RefinerConfig.ScaleFactor config is required"); + cerr << "QueryConfig.RefinerConfig.ScaleFactor config is required" + << endl; return -1; } @@ -148,9 +149,9 @@ int parse_and_load_index_param( reference_index = core_interface::IndexFactory::CreateAndInitIndex(*params); } else { - LOG_ERROR( - "QueryConfig.RefinerConfig.ReferenceIndex.Config config is " - "required"); + cerr << "QueryConfig.RefinerConfig.ReferenceIndex.Config config is " + "required" + << endl; return -1; } @@ -168,20 +169,20 @@ int parse_and_load_index_param( int ret = reference_index->Open(reference_index_path, storage_options); if (0 != ret) { - LOG_ERROR("Index open failed with ret %d", ret); + cerr << "Index open failed with ret " << ret << endl; return -1; } cout << "Load reference index done!" << endl; } else { - LOG_ERROR( - "QueryConfig.RefinerConfig.ReferenceIndex.Path is required"); + cerr << "QueryConfig.RefinerConfig.ReferenceIndex.Path is required" + << endl; return -1; } refiner_param->reference_index = reference_index; } else { - LOG_ERROR( - "QueryConfig.RefinerConfig.ReferenceIndex section is required"); + cerr << "QueryConfig.RefinerConfig.ReferenceIndex section is required" + << endl; return -1; } // QueryConfig.RefinerConfig.ReferenceIndex diff --git a/tools/core/local_builder.cc b/tools/core/local_builder.cc index f6d80510..28226418 100644 --- a/tools/core/local_builder.cc +++ b/tools/core/local_builder.cc @@ -157,9 +157,9 @@ static inline size_t AlignSize(size_t size) { return (size + 0x1F) & (~0x1F); } -bool dump_meta_segment(const IndexDumper::Pointer &dumper, - const std::string &segment_id, const void *data, - size_t size, size_t &writes) { +int64_t dump_meta_segment(const IndexDumper::Pointer &dumper, + const std::string &segment_id, const void *data, + size_t size, size_t &writes) { size_t len = dumper->write(data, size); if (len != size) { LOG_ERROR("Dump segment %s data failed, expect: %lu, actual: %lu", @@ -373,17 +373,17 @@ int build_sparse_by_streamer(IndexStreamer::Pointer &streamer, ailego::Params params; int ret = storage->init(params); if (ret != 0) { - cerr << "Storage Failed init" << endl; + cerr << "Storage Failed init"; return IndexError_Runtime; } ret = storage->open(path, true); if (ret != 0) { - cerr << "Storage Failed to open" << endl; + cerr << "Storage Failed to open"; return IndexError_Runtime; } ret = streamer->open(storage); if (ret != 0) { - cerr << "Failed to open storage" << endl; + cerr << "Failed to open storage"; return IndexError_Runtime; } @@ -549,12 +549,12 @@ int build_by_streamer(IndexStreamer::Pointer &streamer, } ret = storage->open(path, true); if (ret != 0) { - cerr << "Storage Failed to open" << endl; + cerr << "Storage Failed to open"; return IndexError_Runtime; } ret = streamer->open(storage); if (ret != 0) { - cerr << "Failed to open storage" << endl; + cerr << "Failed to open storage"; return IndexError_Runtime; } diff --git a/tools/core/recall.cc b/tools/core/recall.cc index 7fa0a9bc..75d55ba4 100644 --- a/tools/core/recall.cc +++ b/tools/core/recall.cc @@ -1478,6 +1478,27 @@ void usage(void) { cout << "Usage: recall CONFIG.yaml [plugin file path]" << endl; } +bool load_index(core_interface::Index::Pointer index, string &index_dir, + std::vector> &id_to_tags_list, + std::vector &tag_key_list) { + core_interface::StorageOptions storage_options; + storage_options.type = core_interface::StorageOptions::StorageType::kMMAP; + storage_options.create_new = false; + storage_options.read_only = true; + + int ret = index->Open(index_dir, storage_options); + if (0 != ret) { + cerr << "Index open failed with ret " << ret << endl; + return false; + } + + // Load tag lists if available + load_taglists(index_dir, id_to_tags_list, tag_key_list); + + cout << "Load index done!" << endl; + return true; +}; + int recall_dense(std::string &query_type, size_t thread_count, size_t batch_count, string top_k, size_t gt_count, string query_file, string &first_sep, string &second_sep, @@ -1591,10 +1612,7 @@ int recall_sparse(std::string &query_type, size_t thread_count, std::vector> id_to_tags_list; std::vector tag_key_list; // Load tag lists if available - if (load_taglists(index_dir, id_to_tags_list, tag_key_list) != 0) { - cerr << "Failed to load tag lists" << endl; - return -1; - } + load_taglists(index_dir, id_to_tags_list, tag_key_list); recall.set_tag_lists(id_to_tags_list, tag_key_list); @@ -1620,12 +1638,12 @@ int get_recall_precision(string &recall_precision_string) { } catch (const std::invalid_argument &e) { std::cerr << "Exeception in getting recall precision: " << e.what() << ", value: " << recall_precision_string << std::endl; - return -1; + return false; } catch (const std::out_of_range &e) { std::cerr << "Out of range exception in getting recall precision: " << e.what() << ", value: " << recall_precision_string << std::endl; - return -1; + return false; } return true; From c1a467e0f49deabc2032f6a24712967860bc9e71 Mon Sep 17 00:00:00 2001 From: Jianning Wang Date: Wed, 4 Feb 2026 18:17:33 +0800 Subject: [PATCH 4/4] v2: don't change to absolute offset --- src/core/framework/index_mapping.cc | 167 ++++++++++++++---- src/core/utility/buffer_storage.cc | 128 +++++++++----- .../zvec/core/framework/index_format.h | 15 +- .../zvec/core/framework/index_mapping.h | 19 +- .../zvec/core/framework/index_unpacker.h | 72 ++++---- tools/core/helper.h | 29 ++- tools/core/local_builder.cc | 16 +- tools/core/recall.cc | 30 +--- 8 files changed, 317 insertions(+), 159 deletions(-) diff --git a/src/core/framework/index_mapping.cc b/src/core/framework/index_mapping.cc index 41475d0e..b27ecef3 100644 --- a/src/core/framework/index_mapping.cc +++ b/src/core/framework/index_mapping.cc @@ -82,6 +82,7 @@ static inline int UnpackMappingSize(ailego::File &file, size_t *len) { } int IndexMapping::open(const std::string &path, bool cow, bool full_mode) { + path_ = path; full_mode_ = full_mode; copy_on_write_ = cow; huge_page_ = Ishugetlbfs(path); @@ -108,7 +109,12 @@ int IndexMapping::open(const std::string &path, bool cow, bool full_mode) { return this->init_index_mapping(mapping_size); } -int IndexMapping::create(const std::string &path, size_t segs_size) { +int IndexMapping::create(const std::string &path, size_t seg_meta_capacity) { + path_ = path; + seg_meta_capacity_ = seg_meta_capacity; + current_header_start_offset_ = 0; + + // write() & copying to mmap() will auto extend the file size if (!file_.create(path.c_str(), 0)) { LOG_ERROR("Failed to create file %s, errno %d, %s", path.c_str(), errno, std::strerror(errno)); @@ -116,17 +122,34 @@ int IndexMapping::create(const std::string &path, size_t segs_size) { } huge_page_ = Ishugetlbfs(path); if (huge_page_) { - return create_hugepage(segs_size); + return init_hugepage_meta_section(); } - size_t len = CalcPageAlignedSize(segs_size + sizeof(IndexFormat::MetaHeader) + - sizeof(IndexFormat::MetaFooter), - false); + return init_meta_section(); +} + +int IndexMapping::init_meta_section() { + if (current_header_start_offset_ % ailego::MemoryHelper::PageSize() != 0) { + LOG_ERROR("File offset %zu is not a multiple of the page size: %zu", + current_header_start_offset_, ailego::MemoryHelper::PageSize()); + return IndexError_InvalidValue; + } + + auto &path = path_; + size_t len = + CalcPageAlignedSize(seg_meta_capacity_ + sizeof(IndexFormat::MetaHeader) + + sizeof(IndexFormat::MetaFooter), + false); IndexFormat::MetaHeader meta_header; IndexFormat::MetaFooter meta_footer; // Write index header IndexFormat::SetupMetaHeader(&meta_header, len - sizeof(meta_footer), len); + if (!file_.seek(current_header_start_offset_, ailego::File::Origin::Begin)) { + LOG_ERROR("Failed to seek file %s, errno %d, %s", path.c_str(), errno, + std::strerror(errno)); + return IndexError_SeekFile; + } if (file_.write(&meta_header, sizeof(meta_header)) != sizeof(meta_header)) { LOG_ERROR("Failed to write file: %s, errno %d, %s", path.c_str(), errno, std::strerror(errno)); @@ -155,11 +178,18 @@ int IndexMapping::create(const std::string &path, size_t segs_size) { return this->init_index_mapping(len); } -int IndexMapping::create_hugepage(size_t segs_size) { - size_t len = CalcPageAlignedSize(segs_size + sizeof(IndexFormat::MetaHeader) + - sizeof(IndexFormat::MetaFooter), - true); - int file_offset = 0; +int IndexMapping::init_hugepage_meta_section() { + ssize_t file_offset = (ssize_t)current_header_start_offset_; + if (file_offset % ailego::MemoryHelper::HugePageSize() != 0) { + LOG_ERROR("File offset %zu is not a multiple of the page size: %zu", + file_offset, ailego::MemoryHelper::HugePageSize()); + return IndexError_InvalidValue; + } + + size_t len = + CalcPageAlignedSize(seg_meta_capacity_ + sizeof(IndexFormat::MetaHeader) + + sizeof(IndexFormat::MetaFooter), + true); int opts = ailego::File::MMAP_SHARED | ailego::File::MMAP_HUGE_PAGE; void *addr = ailego::File::MemoryMap(file_.native_handle(), file_offset, len, opts); @@ -194,6 +224,7 @@ int IndexMapping::create_hugepage(size_t segs_size) { IndexFormat::UpdateMetaFooter(&meta_footer, 0); memcpy((char *)addr + file_offset, &meta_footer, sizeof(meta_footer)); file_offset += sizeof(meta_footer); + return this->init_index_mapping(len); } @@ -201,12 +232,16 @@ void IndexMapping::close(void) { // Unmap all memory this->unmap_all(); if (header_) { - ailego::File::MemoryUnmap(header_, header_->content_offset); + for (auto item : header_addr_map_) { + auto header = item.second; + ailego::File::MemoryUnmap(header, header->content_offset); + } } // Reset members segment_ids_offset_ = 0; segment_start_ = nullptr; header_ = nullptr; + header_addr_map_.clear(); footer_ = nullptr; index_size_ = 0u; segments_.clear(); @@ -218,9 +253,19 @@ void IndexMapping::close(void) { } void IndexMapping::refresh(uint64_t check_point) { - footer_->segments_meta_crc = - ailego::Crc32c::Hash(segment_start_, footer_->segments_meta_size, 0); - IndexFormat::UpdateMetaFooter(footer_, check_point); + // support add_with_id + for (auto item : header_addr_map_) { + auto header_start_offset = item.first; + auto header = item.second; + auto footer = reinterpret_cast( + reinterpret_cast(header) + header->meta_footer_offset); + auto segment_start = reinterpret_cast( + reinterpret_cast(header) + + (header->meta_footer_offset - footer->segments_meta_size)); + footer->segments_meta_crc = + ailego::Crc32c::Hash(segment_start, footer->segments_meta_size, 0); + IndexFormat::UpdateMetaFooter(footer, check_point); + } header_dirty_ = true; } @@ -238,7 +283,21 @@ int IndexMapping::append(const std::string &id, size_t size) { size_t need_size = sizeof(IndexFormat::SegmentMeta) + id_size; if (sizeof(IndexFormat::SegmentMeta) * footer_->segment_count + need_size > segment_ids_offset_) { - return IndexError_NoBuffer; + LOG_DEBUG("segment meta section expanded: %s", path_.c_str()); + footer_->next_meta_header_offset = index_size_; + refresh(0); + flush(); + // mmap file storage write() will update segment's meta + // ailego::File::MemoryUnmap(header_, header_->content_offset); + header_ = nullptr; + footer_ = nullptr; + + current_header_start_offset_ = index_size_; + const int ret = + huge_page_ ? init_hugepage_meta_section() : init_meta_section(); + if (ret != 0) { + return ret; + } } if (!copy_on_write_ && !file_.truncate(index_size_ + size)) { @@ -251,7 +310,8 @@ int IndexMapping::append(const std::string &id, size_t size) { segment_ids_offset_ -= static_cast(id_size); IndexFormat::SegmentMeta *segment = segment_start_ + footer_->segment_count; segment->segment_id_offset = segment_ids_offset_; - segment->data_index = index_size_ - header_->content_offset; + segment->data_index = + index_size_ - header_->content_offset - current_header_start_offset_; segment->data_size = 0; segment->data_crc = 0; segment->padding_size = size; @@ -265,7 +325,8 @@ int IndexMapping::append(const std::string &id, size_t size) { footer_->content_size += size; footer_->total_size += size; IndexFormat::UpdateMetaFooter(footer_, 0); - segments_.emplace(id, segment); + segments_.emplace( + id, SegmentInfo{Segment{segment}, current_header_start_offset_, header_}); header_dirty_ = true; return 0; } @@ -276,11 +337,14 @@ IndexMapping::Segment *IndexMapping::map(const std::string &id, bool warmup, if (iter == segments_.end()) { return nullptr; } - Segment *item = &iter->second; + SegmentInfo &segment_info = iter->second; + Segment *item = &segment_info.segment; if (!item->data()) { auto meta = item->meta(); size_t mapping_size = meta->data_size + meta->padding_size; - size_t offset = meta->data_index + header_->content_offset; + size_t offset = segment_info.segment_header_start_offset + + segment_info.segment_header->content_offset + + meta->data_index; void *addr = nullptr; if (!copy_on_write_) { @@ -326,7 +390,8 @@ IndexMapping::Segment *IndexMapping::map(const std::string &id, bool warmup, void IndexMapping::unmap(const std::string &id) { auto iter = segments_.find(id); if (iter != segments_.end()) { - Segment *item = &iter->second; + SegmentInfo &segment_info = iter->second; + Segment *item = &segment_info.segment; if (item->data()) { ailego::File::MemoryUnmap( @@ -338,7 +403,8 @@ void IndexMapping::unmap(const std::string &id) { void IndexMapping::unmap_all(void) { for (auto iter = segments_.begin(); iter != segments_.end(); ++iter) { - Segment *item = &iter->second; + SegmentInfo &segment_info = iter->second; + Segment *item = &segment_info.segment; if (item->data()) { ailego::File::MemoryUnmap( @@ -356,14 +422,17 @@ int IndexMapping::flush(void) { } for (auto iter = segments_.begin(); iter != segments_.end(); ++iter) { - const Segment *item = &iter->second; + SegmentInfo &segment_info = iter->second; + Segment *item = &segment_info.segment; if (!item->data() || !item->dirty()) { continue; } size_t segment_size = item->meta()->data_size + item->meta()->padding_size; if (full_mode_ && copy_on_write_) { - size_t off = header_->content_offset + item->meta()->data_index; + size_t off = segment_info.segment_header_start_offset + + segment_info.segment_header->content_offset + + item->meta()->data_index; if (file_.write(off, item->data(), segment_size) != segment_size) { LOG_ERROR("Failed to write segment, size %zu, errno %d, %s", segment_size, errno, std::strerror(errno)); @@ -381,14 +450,21 @@ int IndexMapping::flush(void) { header_dirty_ = false; if (full_mode_ && copy_on_write_) { - if (file_.write(0, header_, header_->content_offset) != - header_->content_offset) { - LOG_ERROR("Failed to write segment, size %u, errno %d, %s", - header_->content_offset, errno, std::strerror(errno)); - return IndexError_WriteData; + for (auto item : header_addr_map_) { + auto header_start_offset = item.first; + auto header = item.second; + if (file_.write(header_start_offset, header, header->content_offset) != + header->content_offset) { + LOG_ERROR("Failed to write segment, size %lu, errno %d, %s", + header->content_offset, errno, std::strerror(errno)); + return IndexError_WriteData; + } } } else { - ailego::File::MemoryFlush(header_, header_->content_offset); + for (auto item : header_addr_map_) { + auto header = item.second; + ailego::File::MemoryFlush(header, header->content_offset); + } } return 0; } @@ -399,8 +475,8 @@ int IndexMapping::init_index_mapping(size_t len) { if (huge_page_) { opts |= ailego::File::MMAP_HUGE_PAGE; } - uint8_t *start = reinterpret_cast( - ailego::File::MemoryMap(file_.native_handle(), 0, len, opts)); + uint8_t *start = reinterpret_cast(ailego::File::MemoryMap( + file_.native_handle(), current_header_start_offset_, len, opts)); if (!start) { LOG_ERROR("Failed to map file, errno %d, %s", errno, std::strerror(errno)); return IndexError_MMapFile; @@ -408,6 +484,7 @@ int IndexMapping::init_index_mapping(size_t len) { // Unpack header header_ = reinterpret_cast(start); + header_addr_map_.insert({current_header_start_offset_, header_}); if (header_->meta_header_size != sizeof(IndexFormat::MetaHeader)) { return IndexError_InvalidLength; } @@ -416,6 +493,14 @@ int IndexMapping::init_index_mapping(size_t len) { return IndexError_InvalidChecksum; } + switch (header_->version) { + case IndexFormat::FORMAT_VERSION: + break; + default: + LOG_ERROR("Unsupported index version: %u", header_->version); + return IndexError_Unsupported; + } + // Unpack footer if (header_->meta_footer_size != sizeof(IndexFormat::MetaFooter)) { return IndexError_InvalidLength; @@ -479,12 +564,30 @@ int IndexMapping::init_index_mapping(size_t len) { segments_.emplace( std::string(reinterpret_cast(segment_start_) + iter->segment_id_offset), - iter); + SegmentInfo{Segment{iter}, current_header_start_offset_, header_}); } if (sizeof(IndexFormat::SegmentMeta) * footer_->segment_count > segment_ids_offset_) { return IndexError_InvalidLength; } + + // if (header_->version == IndexFormat::COMPATIBLE_FORMAT_VERSION_0X0002) { + // header_->version = IndexFormat::CURRENT_FORMAT_VERSION; + // LOG_INFO("Index file format upgraded"); + // IndexFormat::UpdateMetaHeader(header_); + // footer_->segments_meta_crc = + // ailego::Crc32c::Hash(segment_start_, footer_->segments_meta_size, 0); + // IndexFormat::UpdateMetaFooter(footer_, 0); + // header_dirty_ = true; + // } + + if (footer_->next_meta_header_offset > 0) { + current_header_start_offset_ = footer_->next_meta_header_offset; + // Meta sections have all the same size, so we can use the same size to map + // the next meta section + return this->init_index_mapping(len); + } + return 0; } diff --git a/src/core/utility/buffer_storage.cc b/src/core/utility/buffer_storage.cc index 4ac3c6b3..bccf07e2 100644 --- a/src/core/utility/buffer_storage.cc +++ b/src/core/utility/buffer_storage.cc @@ -29,21 +29,24 @@ class BufferStorage : public IndexStorage { public: /*! Index Storage Segment */ - class Segment : public IndexStorage::Segment, - public std::enable_shared_from_this { + class WrappedSegment : public IndexStorage::Segment, + public std::enable_shared_from_this { public: //! Index Storage Pointer typedef std::shared_ptr Pointer; //! Constructor - Segment(BufferStorage *owner, IndexMapping::Segment *segment) + WrappedSegment(BufferStorage *owner, IndexMapping::Segment *segment, + uint64_t segment_header_start_offset, + IndexFormat::MetaHeader *segment_header) : segment_(segment), owner_(owner), capacity_(static_cast(segment->meta()->data_size + - segment->meta()->padding_size)) {} - + segment->meta()->padding_size)), + segment_header_start_offset_(segment_header_start_offset), + segment_header_(segment_header) {} //! Destructor - virtual ~Segment(void) {} + virtual ~WrappedSegment(void) {} //! Retrieve size of data size_t data_size(void) const override { @@ -90,8 +93,9 @@ class BufferStorage : public IndexStorage { } len = meta->data_size - offset; } - size_t buffer_offset = - segment_->meta()->data_index + owner_->get_context_offset() + offset; + size_t buffer_offset = segment_header_start_offset_ + + segment_header_->content_offset + + segment_->meta()->data_index + offset; ailego::BufferHandle buffer_handle = owner_->get_buffer_handle(buffer_offset, len); *data = buffer_handle.pin_vector_data(); @@ -106,8 +110,9 @@ class BufferStorage : public IndexStorage { } len = meta->data_size - offset; } - size_t buffer_offset = - segment_->meta()->data_index + owner_->get_context_offset() + offset; + size_t buffer_offset = segment_header_start_offset_ + + segment_header_->content_offset + + segment_->meta()->data_index + offset; data.reset(owner_->get_buffer_handle_ptr(buffer_offset, len)); if (data.data()) { return len; @@ -139,10 +144,15 @@ class BufferStorage : public IndexStorage { return shared_from_this(); } - private: + protected: + friend BufferStorage; IndexMapping::Segment *segment_{}; + + private: BufferStorage *owner_{nullptr}; size_t capacity_{}; + uint64_t segment_header_start_offset_; + IndexFormat::MetaHeader *segment_header_; }; //! Destructor @@ -248,7 +258,8 @@ class BufferStorage : public IndexStorage { segments_.emplace( std::string(reinterpret_cast(segment_start) + iter->segment_id_offset), - iter); + IndexMapping::SegmentInfo{IndexMapping::Segment{iter}, + current_header_start_offset_, &header_}); if (sizeof(IndexFormat::SegmentMeta) * footer_.segment_count > footer_.segments_meta_size) { return IndexError_InvalidLength; @@ -258,25 +269,58 @@ class BufferStorage : public IndexStorage { } int ParseToMapping() { - ParseHeader(0); + while (true) { + int ret; + ret = ParseHeader(current_header_start_offset_); + if (ret != 0) { + LOG_ERROR("Failed to parse header, errno %d, %s", ret, + IndexError::What(ret)); + return ret; + } - // Unpack footer - if (header_.meta_footer_size != sizeof(IndexFormat::MetaFooter)) { - return IndexError_InvalidLength; - } - if ((int32_t)header_.meta_footer_offset < 0) { - return IndexError_Unsupported; - } - size_t footer_offset = header_.meta_footer_offset; - ParseFooter(footer_offset); + switch (header_.version) { + case IndexFormat::FORMAT_VERSION: + break; + default: + LOG_ERROR("Unsupported index version: %u", header_.version); + return IndexError_Unsupported; + } - // Unpack segment table - if (sizeof(IndexFormat::SegmentMeta) * footer_.segment_count > - footer_.segments_meta_size) { - return IndexError_InvalidLength; + // Unpack footer + if (header_.meta_footer_size != sizeof(IndexFormat::MetaFooter)) { + return IndexError_InvalidLength; + } + if ((int32_t)header_.meta_footer_offset < 0) { + return IndexError_Unsupported; + } + uint64_t footer_offset = + header_.meta_footer_offset + current_header_start_offset_; + ret = ParseFooter(footer_offset); + if (ret != 0) { + LOG_ERROR("Failed to parse footer, errno %d, %s", ret, + IndexError::What(ret)); + return ret; + } + + // Unpack segment table + if (sizeof(IndexFormat::SegmentMeta) * footer_.segment_count > + footer_.segments_meta_size) { + return IndexError_InvalidLength; + } + const uint64_t segment_start_offset = + footer_offset - footer_.segments_meta_size; + ret = ParseSegment(segment_start_offset); + if (ret != 0) { + LOG_ERROR("Failed to parse segment, errno %d, %s", ret, + IndexError::What(ret)); + return ret; + } + + if (footer_.next_meta_header_offset == 0) { + break; + } + current_header_start_offset_ = footer_.next_meta_header_offset; } - const int segment_start_offset = footer_offset - footer_.segments_meta_size; - ParseSegment(segment_start_offset); return 0; } @@ -308,11 +352,13 @@ class BufferStorage : public IndexStorage { //! Retrieve a segment by id IndexStorage::Segment::Pointer get(const std::string &id, int) override { - IndexMapping::Segment *segment = this->get_segment(id); - if (!segment) { - return BufferStorage::Segment::Pointer(); + auto segment_info = this->get_segment_info(id); + if (!segment_info) { + return WrappedSegment::Pointer{}; } - return std::make_shared(this, segment); + return std::make_shared( + this, &segment_info->segment, segment_info->segment_header_start_offset, + segment_info->segment_header); } //! Test if it a segment exists @@ -325,10 +371,6 @@ class BufferStorage : public IndexStorage { return header_.magic; } - uint32_t get_context_offset() { - return header_.content_offset; - } - protected: //! Initialize index version segment int init_version_segment(void) { @@ -339,7 +381,7 @@ class BufferStorage : public IndexStorage { return error_code; } - IndexMapping::Segment *segment = get_segment(INDEX_VERSION_SEGMENT_NAME); + auto segment = &get_segment_info(INDEX_VERSION_SEGMENT_NAME)->segment; if (!segment) { return IndexError_MMapFile; } @@ -408,14 +450,13 @@ class BufferStorage : public IndexStorage { } //! Get a segment from storage - IndexMapping::Segment *get_segment(const std::string &id) { + IndexMapping::SegmentInfo *get_segment_info(const std::string &id) { std::lock_guard latch(mapping_mutex_); auto iter = segments_.find(id); if (iter == segments_.end()) { return nullptr; } - IndexMapping::Segment *item = &iter->second; - return item; + return &iter->second; } private: @@ -431,9 +472,10 @@ class BufferStorage : public IndexStorage { // buffer manager std::string file_name_; - IndexFormat::MetaHeader header_; - IndexFormat::MetaFooter footer_; - std::map segments_{}; + IndexFormat::MetaHeader header_{}; + IndexFormat::MetaFooter footer_{}; + std::map segments_{}; + uint64_t current_header_start_offset_{0u}; }; INDEX_FACTORY_REGISTER_STORAGE(BufferStorage); diff --git a/src/include/zvec/core/framework/index_format.h b/src/include/zvec/core/framework/index_format.h index d3b7179c..498bb404 100644 --- a/src/include/zvec/core/framework/index_format.h +++ b/src/include/zvec/core/framework/index_format.h @@ -40,8 +40,7 @@ struct IndexFormat { uint16_t meta_header_size; uint16_t meta_footer_size; uint32_t meta_footer_offset; - uint32_t content_offset; - uint32_t reserved2_; + uint64_t content_offset; uint64_t setup_time; uint64_t reserved3_[3]; }; @@ -56,13 +55,17 @@ struct IndexFormat { uint32_t segments_meta_crc; uint32_t content_crc; uint32_t segment_count; + // meta section size uint32_t segments_meta_size; uint32_t reserved1_; + // segments' data section size uint64_t content_size; uint64_t content_padding_size; + uint64_t check_point; uint64_t update_time; - uint64_t reserved2_[8]; + uint64_t reserved2_[7]; + uint64_t next_meta_header_offset; uint64_t total_size; }; @@ -73,6 +76,7 @@ struct IndexFormat { */ struct SegmentMeta { uint32_t segment_id_offset; + // used only by immutable segments, e.g., IndexMeta, or searcher uint32_t data_crc; uint64_t data_index; uint64_t data_size; @@ -156,6 +160,11 @@ struct IndexFormat { header->header_crc = ailego::Crc32c::Hash(header, sizeof(MetaHeader), 0); } + static void UpdateMetaHeader(MetaHeader *header) { + header->header_crc = 0; + header->header_crc = ailego::Crc32c::Hash(header, sizeof(MetaHeader), 0); + } + //! Setup meta footer structure static void SetupMetaFooter(MetaFooter *footer) { memset(footer, 0, sizeof(MetaFooter)); diff --git a/src/include/zvec/core/framework/index_mapping.h b/src/include/zvec/core/framework/index_mapping.h index 7074088c..d2b98095 100644 --- a/src/include/zvec/core/framework/index_mapping.h +++ b/src/include/zvec/core/framework/index_mapping.h @@ -82,6 +82,12 @@ class IndexMapping { mutable bool dirty_{false}; }; + struct SegmentInfo { + Segment segment; + uint64_t segment_header_start_offset; + IndexFormat::MetaHeader *segment_header; + }; + //! Constructor IndexMapping(void) {} @@ -120,8 +126,6 @@ class IndexMapping { //! Create a index file int create(const std::string &path, size_t segs_size); - int create_hugepage(size_t segs_size); - //! Close the index void close(void); @@ -183,6 +187,9 @@ class IndexMapping { bool Ishugetlbfs(const std::string &path) const; + int init_meta_section(); + int init_hugepage_meta_section(); + private: //! Disable them IndexMapping(const IndexMapping &) = delete; @@ -192,15 +199,19 @@ class IndexMapping { uint32_t segment_ids_offset_{0}; IndexFormat::SegmentMeta *segment_start_{nullptr}; IndexFormat::MetaHeader *header_{nullptr}; + std::map header_addr_map_{}; IndexFormat::MetaFooter *footer_{nullptr}; - std::map segments_{}; + std::map segments_{}; size_t index_size_{0u}; ailego::File file_{}; + std::string path_; bool copy_on_write_{false}; bool full_mode_{false}; bool header_dirty_{false}; bool huge_page_{false}; + size_t seg_meta_capacity_{0u}; + uint64_t current_header_start_offset_{0u}; }; } // namespace core -} // namespace zvec +} // namespace zvec \ No newline at end of file diff --git a/src/include/zvec/core/framework/index_unpacker.h b/src/include/zvec/core/framework/index_unpacker.h index 425ad12d..03e091e7 100644 --- a/src/include/zvec/core/framework/index_unpacker.h +++ b/src/include/zvec/core/framework/index_unpacker.h @@ -107,22 +107,29 @@ class IndexUnpacker { const void **, size_t>::value, "Invocable function type"); - if (!this->unpack_header(read_data)) { - LOG_ERROR("Failed to unpack index header"); - return false; - } - if (!this->unpack_footer(read_data, total)) { - LOG_ERROR("Failed to unpack index footer"); - return false; - } - if (!this->unpack_segments(read_data, total)) { - LOG_ERROR("Failed to unpack index segments' meta"); - return false; - } - if (checksum && !this->validate_checksum(read_data)) { - LOG_ERROR("Failed to validate checksum of index content"); - return false; + while (true) { + if (!this->unpack_header(read_data)) { + LOG_ERROR("Failed to unpack index header"); + return false; + } + if (!this->unpack_footer(read_data, total)) { + LOG_ERROR("Failed to unpack index footer"); + return false; + } + if (!this->unpack_segments(read_data, total)) { + LOG_ERROR("Failed to unpack index segments' meta"); + return false; + } + if (checksum && !this->validate_checksum(read_data)) { + LOG_ERROR("Failed to validate checksum of index content"); + return false; + } + if (footer_.next_meta_header_offset == 0) { + break; + } + current_header_start_offset_ = footer_.next_meta_header_offset; } + if (!this->unpack_version(read_data)) { LOG_ERROR("Failed to unpack index version"); return false; @@ -137,7 +144,8 @@ class IndexUnpacker { const void **, size_t>::value, "Invocable function type"); const void *data = nullptr; - if (read_data(0u, &data, sizeof(header_)) != sizeof(header_)) { + if (read_data(current_header_start_offset_, &data, sizeof(header_)) != + sizeof(header_)) { return false; } @@ -170,15 +178,15 @@ class IndexUnpacker { } const void *data = nullptr; - if (read_data(footer_offset, &data, sizeof(footer_)) != sizeof(footer_)) { + if (read_data(current_header_start_offset_ + footer_offset, &data, + sizeof(footer_)) != sizeof(footer_)) { return false; } memcpy(&footer_, data, sizeof(footer_)); - if ((footer_.total_size != total) || - (footer_.content_size + footer_.content_padding_size + - header_.content_offset > - total)) { + if (footer_.content_size + footer_.content_padding_size + + header_.content_offset > + footer_.total_size) { return false; } if (ailego::Crc32c::Hash(&footer_, sizeof(footer_), footer_.footer_crc) != @@ -207,8 +215,8 @@ class IndexUnpacker { offset -= footer_.segments_meta_size; const void *data = nullptr; - if (read_data(offset, &data, footer_.segments_meta_size) != - footer_.segments_meta_size) { + if (read_data(current_header_start_offset_ + offset, &data, + footer_.segments_meta_size) != footer_.segments_meta_size) { return false; } if (ailego::Crc32c::Hash(data, footer_.segments_meta_size, 0u) != @@ -230,8 +238,9 @@ class IndexUnpacker { segments_.emplace( std::string(reinterpret_cast(data) + seg->segment_id_offset), - SegmentMeta(seg->data_index + header_.content_offset, seg->data_size, - seg->padding_size, seg->data_crc)); + SegmentMeta(seg->data_index + header_.content_offset + + current_header_start_offset_, + seg->data_size, seg->padding_size, seg->data_crc)); } return true; } @@ -251,7 +260,7 @@ class IndexUnpacker { const SegmentMeta &segment = it->second; const void *data = nullptr; - if (read_data(segment.data_offset(), &data, segment.data_size()) != + if (read_data(0 + segment.data_offset(), &data, segment.data_size()) != segment.data_size()) { return false; } @@ -280,14 +289,16 @@ class IndexUnpacker { size_t offset = sizeof(header_); while (total >= block_size) { - if (read_data(offset, &data, block_size) != block_size) { + if (read_data(current_header_start_offset_ + offset, &data, block_size) != + block_size) { return false; } checksum = ailego::Crc32c::Hash(data, block_size, checksum); total -= block_size; offset += block_size; } - if (read_data(offset, &data, total) != total) { + if (read_data(current_header_start_offset_ + offset, &data, total) != + total) { return false; } checksum = ailego::Crc32c::Hash(data, total, checksum); @@ -295,10 +306,11 @@ class IndexUnpacker { } private: - IndexFormat::MetaHeader header_; - IndexFormat::MetaFooter footer_; + IndexFormat::MetaHeader header_{}; + IndexFormat::MetaFooter footer_{}; std::string version_{}; std::map segments_{}; + uint64_t current_header_start_offset_{0u}; }; } // namespace core diff --git a/tools/core/helper.h b/tools/core/helper.h index c24c89ff..037958e5 100644 --- a/tools/core/helper.h +++ b/tools/core/helper.h @@ -72,7 +72,7 @@ int parse_and_load_index_param( index_config.as()); index = core_interface::IndexFactory::CreateAndInitIndex(*params); if (!index) { - cerr << "Failed to create index" << endl; + LOG_ERROR("Failed to create index"); return -1; } core_interface::StorageOptions storage_options; @@ -82,13 +82,13 @@ int parse_and_load_index_param( int ret = index->Open(index_dir, storage_options); if (0 != ret) { - cerr << "Index open failed with ret " << ret << endl; - return false; + LOG_ERROR("Index open failed with ret %d", ret); + return -1; } cout << "Load index done!" << endl; } else { - cerr << "IndexCommon.IndexConfig is required" << endl; + LOG_ERROR("IndexCommon.IndexConfig is required"); return -1; } @@ -114,7 +114,7 @@ int parse_and_load_index_param( core_interface::BaseIndexQueryParam>( query_param_config.as()); if (!query_param) { - cerr << "Failed to deserialize query params" << endl; + LOG_ERROR("Failed to deserialize query params"); return -1; } } @@ -130,8 +130,7 @@ int parse_and_load_index_param( auto scale_factor = scale_factor_config.as(); refiner_param->scale_factor_ = scale_factor; } else { - cerr << "QueryConfig.RefinerConfig.ScaleFactor config is required" - << endl; + LOG_ERROR("QueryConfig.RefinerConfig.ScaleFactor config is required"); return -1; } @@ -149,9 +148,9 @@ int parse_and_load_index_param( reference_index = core_interface::IndexFactory::CreateAndInitIndex(*params); } else { - cerr << "QueryConfig.RefinerConfig.ReferenceIndex.Config config is " - "required" - << endl; + LOG_ERROR( + "QueryConfig.RefinerConfig.ReferenceIndex.Config config is " + "required"); return -1; } @@ -169,20 +168,20 @@ int parse_and_load_index_param( int ret = reference_index->Open(reference_index_path, storage_options); if (0 != ret) { - cerr << "Index open failed with ret " << ret << endl; + LOG_ERROR("Index open failed with ret %d", ret); return -1; } cout << "Load reference index done!" << endl; } else { - cerr << "QueryConfig.RefinerConfig.ReferenceIndex.Path is required" - << endl; + LOG_ERROR( + "QueryConfig.RefinerConfig.ReferenceIndex.Path is required"); return -1; } refiner_param->reference_index = reference_index; } else { - cerr << "QueryConfig.RefinerConfig.ReferenceIndex section is required" - << endl; + LOG_ERROR( + "QueryConfig.RefinerConfig.ReferenceIndex section is required"); return -1; } // QueryConfig.RefinerConfig.ReferenceIndex diff --git a/tools/core/local_builder.cc b/tools/core/local_builder.cc index 28226418..f6d80510 100644 --- a/tools/core/local_builder.cc +++ b/tools/core/local_builder.cc @@ -157,9 +157,9 @@ static inline size_t AlignSize(size_t size) { return (size + 0x1F) & (~0x1F); } -int64_t dump_meta_segment(const IndexDumper::Pointer &dumper, - const std::string &segment_id, const void *data, - size_t size, size_t &writes) { +bool dump_meta_segment(const IndexDumper::Pointer &dumper, + const std::string &segment_id, const void *data, + size_t size, size_t &writes) { size_t len = dumper->write(data, size); if (len != size) { LOG_ERROR("Dump segment %s data failed, expect: %lu, actual: %lu", @@ -373,17 +373,17 @@ int build_sparse_by_streamer(IndexStreamer::Pointer &streamer, ailego::Params params; int ret = storage->init(params); if (ret != 0) { - cerr << "Storage Failed init"; + cerr << "Storage Failed init" << endl; return IndexError_Runtime; } ret = storage->open(path, true); if (ret != 0) { - cerr << "Storage Failed to open"; + cerr << "Storage Failed to open" << endl; return IndexError_Runtime; } ret = streamer->open(storage); if (ret != 0) { - cerr << "Failed to open storage"; + cerr << "Failed to open storage" << endl; return IndexError_Runtime; } @@ -549,12 +549,12 @@ int build_by_streamer(IndexStreamer::Pointer &streamer, } ret = storage->open(path, true); if (ret != 0) { - cerr << "Storage Failed to open"; + cerr << "Storage Failed to open" << endl; return IndexError_Runtime; } ret = streamer->open(storage); if (ret != 0) { - cerr << "Failed to open storage"; + cerr << "Failed to open storage" << endl; return IndexError_Runtime; } diff --git a/tools/core/recall.cc b/tools/core/recall.cc index 75d55ba4..7fa0a9bc 100644 --- a/tools/core/recall.cc +++ b/tools/core/recall.cc @@ -1478,27 +1478,6 @@ void usage(void) { cout << "Usage: recall CONFIG.yaml [plugin file path]" << endl; } -bool load_index(core_interface::Index::Pointer index, string &index_dir, - std::vector> &id_to_tags_list, - std::vector &tag_key_list) { - core_interface::StorageOptions storage_options; - storage_options.type = core_interface::StorageOptions::StorageType::kMMAP; - storage_options.create_new = false; - storage_options.read_only = true; - - int ret = index->Open(index_dir, storage_options); - if (0 != ret) { - cerr << "Index open failed with ret " << ret << endl; - return false; - } - - // Load tag lists if available - load_taglists(index_dir, id_to_tags_list, tag_key_list); - - cout << "Load index done!" << endl; - return true; -}; - int recall_dense(std::string &query_type, size_t thread_count, size_t batch_count, string top_k, size_t gt_count, string query_file, string &first_sep, string &second_sep, @@ -1612,7 +1591,10 @@ int recall_sparse(std::string &query_type, size_t thread_count, std::vector> id_to_tags_list; std::vector tag_key_list; // Load tag lists if available - load_taglists(index_dir, id_to_tags_list, tag_key_list); + if (load_taglists(index_dir, id_to_tags_list, tag_key_list) != 0) { + cerr << "Failed to load tag lists" << endl; + return -1; + } recall.set_tag_lists(id_to_tags_list, tag_key_list); @@ -1638,12 +1620,12 @@ int get_recall_precision(string &recall_precision_string) { } catch (const std::invalid_argument &e) { std::cerr << "Exeception in getting recall precision: " << e.what() << ", value: " << recall_precision_string << std::endl; - return false; + return -1; } catch (const std::out_of_range &e) { std::cerr << "Out of range exception in getting recall precision: " << e.what() << ", value: " << recall_precision_string << std::endl; - return false; + return -1; } return true;