diff --git a/src/core/framework/index_mapping.cc b/src/core/framework/index_mapping.cc index 41475d0e..b27ecef3 100644 --- a/src/core/framework/index_mapping.cc +++ b/src/core/framework/index_mapping.cc @@ -82,6 +82,7 @@ static inline int UnpackMappingSize(ailego::File &file, size_t *len) { } int IndexMapping::open(const std::string &path, bool cow, bool full_mode) { + path_ = path; full_mode_ = full_mode; copy_on_write_ = cow; huge_page_ = Ishugetlbfs(path); @@ -108,7 +109,12 @@ int IndexMapping::open(const std::string &path, bool cow, bool full_mode) { return this->init_index_mapping(mapping_size); } -int IndexMapping::create(const std::string &path, size_t segs_size) { +int IndexMapping::create(const std::string &path, size_t seg_meta_capacity) { + path_ = path; + seg_meta_capacity_ = seg_meta_capacity; + current_header_start_offset_ = 0; + + // write() & copying to mmap() will auto extend the file size if (!file_.create(path.c_str(), 0)) { LOG_ERROR("Failed to create file %s, errno %d, %s", path.c_str(), errno, std::strerror(errno)); @@ -116,17 +122,34 @@ int IndexMapping::create(const std::string &path, size_t segs_size) { } huge_page_ = Ishugetlbfs(path); if (huge_page_) { - return create_hugepage(segs_size); + return init_hugepage_meta_section(); } - size_t len = CalcPageAlignedSize(segs_size + sizeof(IndexFormat::MetaHeader) + - sizeof(IndexFormat::MetaFooter), - false); + return init_meta_section(); +} + +int IndexMapping::init_meta_section() { + if (current_header_start_offset_ % ailego::MemoryHelper::PageSize() != 0) { + LOG_ERROR("File offset %zu is not a multiple of the page size: %zu", + current_header_start_offset_, ailego::MemoryHelper::PageSize()); + return IndexError_InvalidValue; + } + + auto &path = path_; + size_t len = + CalcPageAlignedSize(seg_meta_capacity_ + sizeof(IndexFormat::MetaHeader) + + sizeof(IndexFormat::MetaFooter), + false); IndexFormat::MetaHeader meta_header; IndexFormat::MetaFooter meta_footer; // Write index header IndexFormat::SetupMetaHeader(&meta_header, len - sizeof(meta_footer), len); + if (!file_.seek(current_header_start_offset_, ailego::File::Origin::Begin)) { + LOG_ERROR("Failed to seek file %s, errno %d, %s", path.c_str(), errno, + std::strerror(errno)); + return IndexError_SeekFile; + } if (file_.write(&meta_header, sizeof(meta_header)) != sizeof(meta_header)) { LOG_ERROR("Failed to write file: %s, errno %d, %s", path.c_str(), errno, std::strerror(errno)); @@ -155,11 +178,18 @@ int IndexMapping::create(const std::string &path, size_t segs_size) { return this->init_index_mapping(len); } -int IndexMapping::create_hugepage(size_t segs_size) { - size_t len = CalcPageAlignedSize(segs_size + sizeof(IndexFormat::MetaHeader) + - sizeof(IndexFormat::MetaFooter), - true); - int file_offset = 0; +int IndexMapping::init_hugepage_meta_section() { + ssize_t file_offset = (ssize_t)current_header_start_offset_; + if (file_offset % ailego::MemoryHelper::HugePageSize() != 0) { + LOG_ERROR("File offset %zu is not a multiple of the page size: %zu", + file_offset, ailego::MemoryHelper::HugePageSize()); + return IndexError_InvalidValue; + } + + size_t len = + CalcPageAlignedSize(seg_meta_capacity_ + sizeof(IndexFormat::MetaHeader) + + sizeof(IndexFormat::MetaFooter), + true); int opts = ailego::File::MMAP_SHARED | ailego::File::MMAP_HUGE_PAGE; void *addr = ailego::File::MemoryMap(file_.native_handle(), file_offset, len, opts); @@ -194,6 +224,7 @@ int IndexMapping::create_hugepage(size_t segs_size) { IndexFormat::UpdateMetaFooter(&meta_footer, 0); memcpy((char *)addr + file_offset, &meta_footer, sizeof(meta_footer)); file_offset += sizeof(meta_footer); + return this->init_index_mapping(len); } @@ -201,12 +232,16 @@ void IndexMapping::close(void) { // Unmap all memory this->unmap_all(); if (header_) { - ailego::File::MemoryUnmap(header_, header_->content_offset); + for (auto item : header_addr_map_) { + auto header = item.second; + ailego::File::MemoryUnmap(header, header->content_offset); + } } // Reset members segment_ids_offset_ = 0; segment_start_ = nullptr; header_ = nullptr; + header_addr_map_.clear(); footer_ = nullptr; index_size_ = 0u; segments_.clear(); @@ -218,9 +253,19 @@ void IndexMapping::close(void) { } void IndexMapping::refresh(uint64_t check_point) { - footer_->segments_meta_crc = - ailego::Crc32c::Hash(segment_start_, footer_->segments_meta_size, 0); - IndexFormat::UpdateMetaFooter(footer_, check_point); + // support add_with_id + for (auto item : header_addr_map_) { + auto header_start_offset = item.first; + auto header = item.second; + auto footer = reinterpret_cast( + reinterpret_cast(header) + header->meta_footer_offset); + auto segment_start = reinterpret_cast( + reinterpret_cast(header) + + (header->meta_footer_offset - footer->segments_meta_size)); + footer->segments_meta_crc = + ailego::Crc32c::Hash(segment_start, footer->segments_meta_size, 0); + IndexFormat::UpdateMetaFooter(footer, check_point); + } header_dirty_ = true; } @@ -238,7 +283,21 @@ int IndexMapping::append(const std::string &id, size_t size) { size_t need_size = sizeof(IndexFormat::SegmentMeta) + id_size; if (sizeof(IndexFormat::SegmentMeta) * footer_->segment_count + need_size > segment_ids_offset_) { - return IndexError_NoBuffer; + LOG_DEBUG("segment meta section expanded: %s", path_.c_str()); + footer_->next_meta_header_offset = index_size_; + refresh(0); + flush(); + // mmap file storage write() will update segment's meta + // ailego::File::MemoryUnmap(header_, header_->content_offset); + header_ = nullptr; + footer_ = nullptr; + + current_header_start_offset_ = index_size_; + const int ret = + huge_page_ ? init_hugepage_meta_section() : init_meta_section(); + if (ret != 0) { + return ret; + } } if (!copy_on_write_ && !file_.truncate(index_size_ + size)) { @@ -251,7 +310,8 @@ int IndexMapping::append(const std::string &id, size_t size) { segment_ids_offset_ -= static_cast(id_size); IndexFormat::SegmentMeta *segment = segment_start_ + footer_->segment_count; segment->segment_id_offset = segment_ids_offset_; - segment->data_index = index_size_ - header_->content_offset; + segment->data_index = + index_size_ - header_->content_offset - current_header_start_offset_; segment->data_size = 0; segment->data_crc = 0; segment->padding_size = size; @@ -265,7 +325,8 @@ int IndexMapping::append(const std::string &id, size_t size) { footer_->content_size += size; footer_->total_size += size; IndexFormat::UpdateMetaFooter(footer_, 0); - segments_.emplace(id, segment); + segments_.emplace( + id, SegmentInfo{Segment{segment}, current_header_start_offset_, header_}); header_dirty_ = true; return 0; } @@ -276,11 +337,14 @@ IndexMapping::Segment *IndexMapping::map(const std::string &id, bool warmup, if (iter == segments_.end()) { return nullptr; } - Segment *item = &iter->second; + SegmentInfo &segment_info = iter->second; + Segment *item = &segment_info.segment; if (!item->data()) { auto meta = item->meta(); size_t mapping_size = meta->data_size + meta->padding_size; - size_t offset = meta->data_index + header_->content_offset; + size_t offset = segment_info.segment_header_start_offset + + segment_info.segment_header->content_offset + + meta->data_index; void *addr = nullptr; if (!copy_on_write_) { @@ -326,7 +390,8 @@ IndexMapping::Segment *IndexMapping::map(const std::string &id, bool warmup, void IndexMapping::unmap(const std::string &id) { auto iter = segments_.find(id); if (iter != segments_.end()) { - Segment *item = &iter->second; + SegmentInfo &segment_info = iter->second; + Segment *item = &segment_info.segment; if (item->data()) { ailego::File::MemoryUnmap( @@ -338,7 +403,8 @@ void IndexMapping::unmap(const std::string &id) { void IndexMapping::unmap_all(void) { for (auto iter = segments_.begin(); iter != segments_.end(); ++iter) { - Segment *item = &iter->second; + SegmentInfo &segment_info = iter->second; + Segment *item = &segment_info.segment; if (item->data()) { ailego::File::MemoryUnmap( @@ -356,14 +422,17 @@ int IndexMapping::flush(void) { } for (auto iter = segments_.begin(); iter != segments_.end(); ++iter) { - const Segment *item = &iter->second; + SegmentInfo &segment_info = iter->second; + Segment *item = &segment_info.segment; if (!item->data() || !item->dirty()) { continue; } size_t segment_size = item->meta()->data_size + item->meta()->padding_size; if (full_mode_ && copy_on_write_) { - size_t off = header_->content_offset + item->meta()->data_index; + size_t off = segment_info.segment_header_start_offset + + segment_info.segment_header->content_offset + + item->meta()->data_index; if (file_.write(off, item->data(), segment_size) != segment_size) { LOG_ERROR("Failed to write segment, size %zu, errno %d, %s", segment_size, errno, std::strerror(errno)); @@ -381,14 +450,21 @@ int IndexMapping::flush(void) { header_dirty_ = false; if (full_mode_ && copy_on_write_) { - if (file_.write(0, header_, header_->content_offset) != - header_->content_offset) { - LOG_ERROR("Failed to write segment, size %u, errno %d, %s", - header_->content_offset, errno, std::strerror(errno)); - return IndexError_WriteData; + for (auto item : header_addr_map_) { + auto header_start_offset = item.first; + auto header = item.second; + if (file_.write(header_start_offset, header, header->content_offset) != + header->content_offset) { + LOG_ERROR("Failed to write segment, size %lu, errno %d, %s", + header->content_offset, errno, std::strerror(errno)); + return IndexError_WriteData; + } } } else { - ailego::File::MemoryFlush(header_, header_->content_offset); + for (auto item : header_addr_map_) { + auto header = item.second; + ailego::File::MemoryFlush(header, header->content_offset); + } } return 0; } @@ -399,8 +475,8 @@ int IndexMapping::init_index_mapping(size_t len) { if (huge_page_) { opts |= ailego::File::MMAP_HUGE_PAGE; } - uint8_t *start = reinterpret_cast( - ailego::File::MemoryMap(file_.native_handle(), 0, len, opts)); + uint8_t *start = reinterpret_cast(ailego::File::MemoryMap( + file_.native_handle(), current_header_start_offset_, len, opts)); if (!start) { LOG_ERROR("Failed to map file, errno %d, %s", errno, std::strerror(errno)); return IndexError_MMapFile; @@ -408,6 +484,7 @@ int IndexMapping::init_index_mapping(size_t len) { // Unpack header header_ = reinterpret_cast(start); + header_addr_map_.insert({current_header_start_offset_, header_}); if (header_->meta_header_size != sizeof(IndexFormat::MetaHeader)) { return IndexError_InvalidLength; } @@ -416,6 +493,14 @@ int IndexMapping::init_index_mapping(size_t len) { return IndexError_InvalidChecksum; } + switch (header_->version) { + case IndexFormat::FORMAT_VERSION: + break; + default: + LOG_ERROR("Unsupported index version: %u", header_->version); + return IndexError_Unsupported; + } + // Unpack footer if (header_->meta_footer_size != sizeof(IndexFormat::MetaFooter)) { return IndexError_InvalidLength; @@ -479,12 +564,30 @@ int IndexMapping::init_index_mapping(size_t len) { segments_.emplace( std::string(reinterpret_cast(segment_start_) + iter->segment_id_offset), - iter); + SegmentInfo{Segment{iter}, current_header_start_offset_, header_}); } if (sizeof(IndexFormat::SegmentMeta) * footer_->segment_count > segment_ids_offset_) { return IndexError_InvalidLength; } + + // if (header_->version == IndexFormat::COMPATIBLE_FORMAT_VERSION_0X0002) { + // header_->version = IndexFormat::CURRENT_FORMAT_VERSION; + // LOG_INFO("Index file format upgraded"); + // IndexFormat::UpdateMetaHeader(header_); + // footer_->segments_meta_crc = + // ailego::Crc32c::Hash(segment_start_, footer_->segments_meta_size, 0); + // IndexFormat::UpdateMetaFooter(footer_, 0); + // header_dirty_ = true; + // } + + if (footer_->next_meta_header_offset > 0) { + current_header_start_offset_ = footer_->next_meta_header_offset; + // Meta sections have all the same size, so we can use the same size to map + // the next meta section + return this->init_index_mapping(len); + } + return 0; } diff --git a/src/core/utility/buffer_storage.cc b/src/core/utility/buffer_storage.cc index 4ac3c6b3..bccf07e2 100644 --- a/src/core/utility/buffer_storage.cc +++ b/src/core/utility/buffer_storage.cc @@ -29,21 +29,24 @@ class BufferStorage : public IndexStorage { public: /*! Index Storage Segment */ - class Segment : public IndexStorage::Segment, - public std::enable_shared_from_this { + class WrappedSegment : public IndexStorage::Segment, + public std::enable_shared_from_this { public: //! Index Storage Pointer typedef std::shared_ptr Pointer; //! Constructor - Segment(BufferStorage *owner, IndexMapping::Segment *segment) + WrappedSegment(BufferStorage *owner, IndexMapping::Segment *segment, + uint64_t segment_header_start_offset, + IndexFormat::MetaHeader *segment_header) : segment_(segment), owner_(owner), capacity_(static_cast(segment->meta()->data_size + - segment->meta()->padding_size)) {} - + segment->meta()->padding_size)), + segment_header_start_offset_(segment_header_start_offset), + segment_header_(segment_header) {} //! Destructor - virtual ~Segment(void) {} + virtual ~WrappedSegment(void) {} //! Retrieve size of data size_t data_size(void) const override { @@ -90,8 +93,9 @@ class BufferStorage : public IndexStorage { } len = meta->data_size - offset; } - size_t buffer_offset = - segment_->meta()->data_index + owner_->get_context_offset() + offset; + size_t buffer_offset = segment_header_start_offset_ + + segment_header_->content_offset + + segment_->meta()->data_index + offset; ailego::BufferHandle buffer_handle = owner_->get_buffer_handle(buffer_offset, len); *data = buffer_handle.pin_vector_data(); @@ -106,8 +110,9 @@ class BufferStorage : public IndexStorage { } len = meta->data_size - offset; } - size_t buffer_offset = - segment_->meta()->data_index + owner_->get_context_offset() + offset; + size_t buffer_offset = segment_header_start_offset_ + + segment_header_->content_offset + + segment_->meta()->data_index + offset; data.reset(owner_->get_buffer_handle_ptr(buffer_offset, len)); if (data.data()) { return len; @@ -139,10 +144,15 @@ class BufferStorage : public IndexStorage { return shared_from_this(); } - private: + protected: + friend BufferStorage; IndexMapping::Segment *segment_{}; + + private: BufferStorage *owner_{nullptr}; size_t capacity_{}; + uint64_t segment_header_start_offset_; + IndexFormat::MetaHeader *segment_header_; }; //! Destructor @@ -248,7 +258,8 @@ class BufferStorage : public IndexStorage { segments_.emplace( std::string(reinterpret_cast(segment_start) + iter->segment_id_offset), - iter); + IndexMapping::SegmentInfo{IndexMapping::Segment{iter}, + current_header_start_offset_, &header_}); if (sizeof(IndexFormat::SegmentMeta) * footer_.segment_count > footer_.segments_meta_size) { return IndexError_InvalidLength; @@ -258,25 +269,58 @@ class BufferStorage : public IndexStorage { } int ParseToMapping() { - ParseHeader(0); + while (true) { + int ret; + ret = ParseHeader(current_header_start_offset_); + if (ret != 0) { + LOG_ERROR("Failed to parse header, errno %d, %s", ret, + IndexError::What(ret)); + return ret; + } - // Unpack footer - if (header_.meta_footer_size != sizeof(IndexFormat::MetaFooter)) { - return IndexError_InvalidLength; - } - if ((int32_t)header_.meta_footer_offset < 0) { - return IndexError_Unsupported; - } - size_t footer_offset = header_.meta_footer_offset; - ParseFooter(footer_offset); + switch (header_.version) { + case IndexFormat::FORMAT_VERSION: + break; + default: + LOG_ERROR("Unsupported index version: %u", header_.version); + return IndexError_Unsupported; + } - // Unpack segment table - if (sizeof(IndexFormat::SegmentMeta) * footer_.segment_count > - footer_.segments_meta_size) { - return IndexError_InvalidLength; + // Unpack footer + if (header_.meta_footer_size != sizeof(IndexFormat::MetaFooter)) { + return IndexError_InvalidLength; + } + if ((int32_t)header_.meta_footer_offset < 0) { + return IndexError_Unsupported; + } + uint64_t footer_offset = + header_.meta_footer_offset + current_header_start_offset_; + ret = ParseFooter(footer_offset); + if (ret != 0) { + LOG_ERROR("Failed to parse footer, errno %d, %s", ret, + IndexError::What(ret)); + return ret; + } + + // Unpack segment table + if (sizeof(IndexFormat::SegmentMeta) * footer_.segment_count > + footer_.segments_meta_size) { + return IndexError_InvalidLength; + } + const uint64_t segment_start_offset = + footer_offset - footer_.segments_meta_size; + ret = ParseSegment(segment_start_offset); + if (ret != 0) { + LOG_ERROR("Failed to parse segment, errno %d, %s", ret, + IndexError::What(ret)); + return ret; + } + + if (footer_.next_meta_header_offset == 0) { + break; + } + current_header_start_offset_ = footer_.next_meta_header_offset; } - const int segment_start_offset = footer_offset - footer_.segments_meta_size; - ParseSegment(segment_start_offset); return 0; } @@ -308,11 +352,13 @@ class BufferStorage : public IndexStorage { //! Retrieve a segment by id IndexStorage::Segment::Pointer get(const std::string &id, int) override { - IndexMapping::Segment *segment = this->get_segment(id); - if (!segment) { - return BufferStorage::Segment::Pointer(); + auto segment_info = this->get_segment_info(id); + if (!segment_info) { + return WrappedSegment::Pointer{}; } - return std::make_shared(this, segment); + return std::make_shared( + this, &segment_info->segment, segment_info->segment_header_start_offset, + segment_info->segment_header); } //! Test if it a segment exists @@ -325,10 +371,6 @@ class BufferStorage : public IndexStorage { return header_.magic; } - uint32_t get_context_offset() { - return header_.content_offset; - } - protected: //! Initialize index version segment int init_version_segment(void) { @@ -339,7 +381,7 @@ class BufferStorage : public IndexStorage { return error_code; } - IndexMapping::Segment *segment = get_segment(INDEX_VERSION_SEGMENT_NAME); + auto segment = &get_segment_info(INDEX_VERSION_SEGMENT_NAME)->segment; if (!segment) { return IndexError_MMapFile; } @@ -408,14 +450,13 @@ class BufferStorage : public IndexStorage { } //! Get a segment from storage - IndexMapping::Segment *get_segment(const std::string &id) { + IndexMapping::SegmentInfo *get_segment_info(const std::string &id) { std::lock_guard latch(mapping_mutex_); auto iter = segments_.find(id); if (iter == segments_.end()) { return nullptr; } - IndexMapping::Segment *item = &iter->second; - return item; + return &iter->second; } private: @@ -431,9 +472,10 @@ class BufferStorage : public IndexStorage { // buffer manager std::string file_name_; - IndexFormat::MetaHeader header_; - IndexFormat::MetaFooter footer_; - std::map segments_{}; + IndexFormat::MetaHeader header_{}; + IndexFormat::MetaFooter footer_{}; + std::map segments_{}; + uint64_t current_header_start_offset_{0u}; }; INDEX_FACTORY_REGISTER_STORAGE(BufferStorage); diff --git a/src/include/zvec/core/framework/index_format.h b/src/include/zvec/core/framework/index_format.h index d3b7179c..498bb404 100644 --- a/src/include/zvec/core/framework/index_format.h +++ b/src/include/zvec/core/framework/index_format.h @@ -40,8 +40,7 @@ struct IndexFormat { uint16_t meta_header_size; uint16_t meta_footer_size; uint32_t meta_footer_offset; - uint32_t content_offset; - uint32_t reserved2_; + uint64_t content_offset; uint64_t setup_time; uint64_t reserved3_[3]; }; @@ -56,13 +55,17 @@ struct IndexFormat { uint32_t segments_meta_crc; uint32_t content_crc; uint32_t segment_count; + // meta section size uint32_t segments_meta_size; uint32_t reserved1_; + // segments' data section size uint64_t content_size; uint64_t content_padding_size; + uint64_t check_point; uint64_t update_time; - uint64_t reserved2_[8]; + uint64_t reserved2_[7]; + uint64_t next_meta_header_offset; uint64_t total_size; }; @@ -73,6 +76,7 @@ struct IndexFormat { */ struct SegmentMeta { uint32_t segment_id_offset; + // used only by immutable segments, e.g., IndexMeta, or searcher uint32_t data_crc; uint64_t data_index; uint64_t data_size; @@ -156,6 +160,11 @@ struct IndexFormat { header->header_crc = ailego::Crc32c::Hash(header, sizeof(MetaHeader), 0); } + static void UpdateMetaHeader(MetaHeader *header) { + header->header_crc = 0; + header->header_crc = ailego::Crc32c::Hash(header, sizeof(MetaHeader), 0); + } + //! Setup meta footer structure static void SetupMetaFooter(MetaFooter *footer) { memset(footer, 0, sizeof(MetaFooter)); diff --git a/src/include/zvec/core/framework/index_mapping.h b/src/include/zvec/core/framework/index_mapping.h index 7074088c..d2b98095 100644 --- a/src/include/zvec/core/framework/index_mapping.h +++ b/src/include/zvec/core/framework/index_mapping.h @@ -82,6 +82,12 @@ class IndexMapping { mutable bool dirty_{false}; }; + struct SegmentInfo { + Segment segment; + uint64_t segment_header_start_offset; + IndexFormat::MetaHeader *segment_header; + }; + //! Constructor IndexMapping(void) {} @@ -120,8 +126,6 @@ class IndexMapping { //! Create a index file int create(const std::string &path, size_t segs_size); - int create_hugepage(size_t segs_size); - //! Close the index void close(void); @@ -183,6 +187,9 @@ class IndexMapping { bool Ishugetlbfs(const std::string &path) const; + int init_meta_section(); + int init_hugepage_meta_section(); + private: //! Disable them IndexMapping(const IndexMapping &) = delete; @@ -192,15 +199,19 @@ class IndexMapping { uint32_t segment_ids_offset_{0}; IndexFormat::SegmentMeta *segment_start_{nullptr}; IndexFormat::MetaHeader *header_{nullptr}; + std::map header_addr_map_{}; IndexFormat::MetaFooter *footer_{nullptr}; - std::map segments_{}; + std::map segments_{}; size_t index_size_{0u}; ailego::File file_{}; + std::string path_; bool copy_on_write_{false}; bool full_mode_{false}; bool header_dirty_{false}; bool huge_page_{false}; + size_t seg_meta_capacity_{0u}; + uint64_t current_header_start_offset_{0u}; }; } // namespace core -} // namespace zvec +} // namespace zvec \ No newline at end of file diff --git a/src/include/zvec/core/framework/index_unpacker.h b/src/include/zvec/core/framework/index_unpacker.h index 425ad12d..03e091e7 100644 --- a/src/include/zvec/core/framework/index_unpacker.h +++ b/src/include/zvec/core/framework/index_unpacker.h @@ -107,22 +107,29 @@ class IndexUnpacker { const void **, size_t>::value, "Invocable function type"); - if (!this->unpack_header(read_data)) { - LOG_ERROR("Failed to unpack index header"); - return false; - } - if (!this->unpack_footer(read_data, total)) { - LOG_ERROR("Failed to unpack index footer"); - return false; - } - if (!this->unpack_segments(read_data, total)) { - LOG_ERROR("Failed to unpack index segments' meta"); - return false; - } - if (checksum && !this->validate_checksum(read_data)) { - LOG_ERROR("Failed to validate checksum of index content"); - return false; + while (true) { + if (!this->unpack_header(read_data)) { + LOG_ERROR("Failed to unpack index header"); + return false; + } + if (!this->unpack_footer(read_data, total)) { + LOG_ERROR("Failed to unpack index footer"); + return false; + } + if (!this->unpack_segments(read_data, total)) { + LOG_ERROR("Failed to unpack index segments' meta"); + return false; + } + if (checksum && !this->validate_checksum(read_data)) { + LOG_ERROR("Failed to validate checksum of index content"); + return false; + } + if (footer_.next_meta_header_offset == 0) { + break; + } + current_header_start_offset_ = footer_.next_meta_header_offset; } + if (!this->unpack_version(read_data)) { LOG_ERROR("Failed to unpack index version"); return false; @@ -137,7 +144,8 @@ class IndexUnpacker { const void **, size_t>::value, "Invocable function type"); const void *data = nullptr; - if (read_data(0u, &data, sizeof(header_)) != sizeof(header_)) { + if (read_data(current_header_start_offset_, &data, sizeof(header_)) != + sizeof(header_)) { return false; } @@ -170,15 +178,15 @@ class IndexUnpacker { } const void *data = nullptr; - if (read_data(footer_offset, &data, sizeof(footer_)) != sizeof(footer_)) { + if (read_data(current_header_start_offset_ + footer_offset, &data, + sizeof(footer_)) != sizeof(footer_)) { return false; } memcpy(&footer_, data, sizeof(footer_)); - if ((footer_.total_size != total) || - (footer_.content_size + footer_.content_padding_size + - header_.content_offset > - total)) { + if (footer_.content_size + footer_.content_padding_size + + header_.content_offset > + footer_.total_size) { return false; } if (ailego::Crc32c::Hash(&footer_, sizeof(footer_), footer_.footer_crc) != @@ -207,8 +215,8 @@ class IndexUnpacker { offset -= footer_.segments_meta_size; const void *data = nullptr; - if (read_data(offset, &data, footer_.segments_meta_size) != - footer_.segments_meta_size) { + if (read_data(current_header_start_offset_ + offset, &data, + footer_.segments_meta_size) != footer_.segments_meta_size) { return false; } if (ailego::Crc32c::Hash(data, footer_.segments_meta_size, 0u) != @@ -230,8 +238,9 @@ class IndexUnpacker { segments_.emplace( std::string(reinterpret_cast(data) + seg->segment_id_offset), - SegmentMeta(seg->data_index + header_.content_offset, seg->data_size, - seg->padding_size, seg->data_crc)); + SegmentMeta(seg->data_index + header_.content_offset + + current_header_start_offset_, + seg->data_size, seg->padding_size, seg->data_crc)); } return true; } @@ -251,7 +260,7 @@ class IndexUnpacker { const SegmentMeta &segment = it->second; const void *data = nullptr; - if (read_data(segment.data_offset(), &data, segment.data_size()) != + if (read_data(0 + segment.data_offset(), &data, segment.data_size()) != segment.data_size()) { return false; } @@ -280,14 +289,16 @@ class IndexUnpacker { size_t offset = sizeof(header_); while (total >= block_size) { - if (read_data(offset, &data, block_size) != block_size) { + if (read_data(current_header_start_offset_ + offset, &data, block_size) != + block_size) { return false; } checksum = ailego::Crc32c::Hash(data, block_size, checksum); total -= block_size; offset += block_size; } - if (read_data(offset, &data, total) != total) { + if (read_data(current_header_start_offset_ + offset, &data, total) != + total) { return false; } checksum = ailego::Crc32c::Hash(data, total, checksum); @@ -295,10 +306,11 @@ class IndexUnpacker { } private: - IndexFormat::MetaHeader header_; - IndexFormat::MetaFooter footer_; + IndexFormat::MetaHeader header_{}; + IndexFormat::MetaFooter footer_{}; std::string version_{}; std::map segments_{}; + uint64_t current_header_start_offset_{0u}; }; } // namespace core diff --git a/tools/core/helper.h b/tools/core/helper.h index c24c89ff..037958e5 100644 --- a/tools/core/helper.h +++ b/tools/core/helper.h @@ -72,7 +72,7 @@ int parse_and_load_index_param( index_config.as()); index = core_interface::IndexFactory::CreateAndInitIndex(*params); if (!index) { - cerr << "Failed to create index" << endl; + LOG_ERROR("Failed to create index"); return -1; } core_interface::StorageOptions storage_options; @@ -82,13 +82,13 @@ int parse_and_load_index_param( int ret = index->Open(index_dir, storage_options); if (0 != ret) { - cerr << "Index open failed with ret " << ret << endl; - return false; + LOG_ERROR("Index open failed with ret %d", ret); + return -1; } cout << "Load index done!" << endl; } else { - cerr << "IndexCommon.IndexConfig is required" << endl; + LOG_ERROR("IndexCommon.IndexConfig is required"); return -1; } @@ -114,7 +114,7 @@ int parse_and_load_index_param( core_interface::BaseIndexQueryParam>( query_param_config.as()); if (!query_param) { - cerr << "Failed to deserialize query params" << endl; + LOG_ERROR("Failed to deserialize query params"); return -1; } } @@ -130,8 +130,7 @@ int parse_and_load_index_param( auto scale_factor = scale_factor_config.as(); refiner_param->scale_factor_ = scale_factor; } else { - cerr << "QueryConfig.RefinerConfig.ScaleFactor config is required" - << endl; + LOG_ERROR("QueryConfig.RefinerConfig.ScaleFactor config is required"); return -1; } @@ -149,9 +148,9 @@ int parse_and_load_index_param( reference_index = core_interface::IndexFactory::CreateAndInitIndex(*params); } else { - cerr << "QueryConfig.RefinerConfig.ReferenceIndex.Config config is " - "required" - << endl; + LOG_ERROR( + "QueryConfig.RefinerConfig.ReferenceIndex.Config config is " + "required"); return -1; } @@ -169,20 +168,20 @@ int parse_and_load_index_param( int ret = reference_index->Open(reference_index_path, storage_options); if (0 != ret) { - cerr << "Index open failed with ret " << ret << endl; + LOG_ERROR("Index open failed with ret %d", ret); return -1; } cout << "Load reference index done!" << endl; } else { - cerr << "QueryConfig.RefinerConfig.ReferenceIndex.Path is required" - << endl; + LOG_ERROR( + "QueryConfig.RefinerConfig.ReferenceIndex.Path is required"); return -1; } refiner_param->reference_index = reference_index; } else { - cerr << "QueryConfig.RefinerConfig.ReferenceIndex section is required" - << endl; + LOG_ERROR( + "QueryConfig.RefinerConfig.ReferenceIndex section is required"); return -1; } // QueryConfig.RefinerConfig.ReferenceIndex diff --git a/tools/core/local_builder.cc b/tools/core/local_builder.cc index 9efc6b4a..f6d80510 100644 --- a/tools/core/local_builder.cc +++ b/tools/core/local_builder.cc @@ -157,9 +157,9 @@ static inline size_t AlignSize(size_t size) { return (size + 0x1F) & (~0x1F); } -int64_t dump_meta_segment(const IndexDumper::Pointer &dumper, - const std::string &segment_id, const void *data, - size_t size, size_t &writes) { +bool dump_meta_segment(const IndexDumper::Pointer &dumper, + const std::string &segment_id, const void *data, + size_t size, size_t &writes) { size_t len = dumper->write(data, size); if (len != size) { LOG_ERROR("Dump segment %s data failed, expect: %lu, actual: %lu", @@ -373,17 +373,17 @@ int build_sparse_by_streamer(IndexStreamer::Pointer &streamer, ailego::Params params; int ret = storage->init(params); if (ret != 0) { - cerr << "Storage Failed init"; + cerr << "Storage Failed init" << endl; return IndexError_Runtime; } ret = storage->open(path, true); if (ret != 0) { - cerr << "Storage Failed to open"; + cerr << "Storage Failed to open" << endl; return IndexError_Runtime; } ret = streamer->open(storage); if (ret != 0) { - cerr << "Failed to open storage"; + cerr << "Failed to open storage" << endl; return IndexError_Runtime; } @@ -542,8 +542,6 @@ int build_by_streamer(IndexStreamer::Pointer &streamer, return IndexError_NoExist; } ailego::Params params; - params.set("proxima.mmap_file.storage.segment_meta_capacity", - 20 * 1024 * 1024); int ret = storage->init(params); if (ret != 0) { cerr << "Storage Failed init"; @@ -551,12 +549,12 @@ int build_by_streamer(IndexStreamer::Pointer &streamer, } ret = storage->open(path, true); if (ret != 0) { - cerr << "Storage Failed to open"; + cerr << "Storage Failed to open" << endl; return IndexError_Runtime; } ret = streamer->open(storage); if (ret != 0) { - cerr << "Failed to open storage"; + cerr << "Failed to open storage" << endl; return IndexError_Runtime; } diff --git a/tools/core/recall.cc b/tools/core/recall.cc index 75d55ba4..7fa0a9bc 100644 --- a/tools/core/recall.cc +++ b/tools/core/recall.cc @@ -1478,27 +1478,6 @@ void usage(void) { cout << "Usage: recall CONFIG.yaml [plugin file path]" << endl; } -bool load_index(core_interface::Index::Pointer index, string &index_dir, - std::vector> &id_to_tags_list, - std::vector &tag_key_list) { - core_interface::StorageOptions storage_options; - storage_options.type = core_interface::StorageOptions::StorageType::kMMAP; - storage_options.create_new = false; - storage_options.read_only = true; - - int ret = index->Open(index_dir, storage_options); - if (0 != ret) { - cerr << "Index open failed with ret " << ret << endl; - return false; - } - - // Load tag lists if available - load_taglists(index_dir, id_to_tags_list, tag_key_list); - - cout << "Load index done!" << endl; - return true; -}; - int recall_dense(std::string &query_type, size_t thread_count, size_t batch_count, string top_k, size_t gt_count, string query_file, string &first_sep, string &second_sep, @@ -1612,7 +1591,10 @@ int recall_sparse(std::string &query_type, size_t thread_count, std::vector> id_to_tags_list; std::vector tag_key_list; // Load tag lists if available - load_taglists(index_dir, id_to_tags_list, tag_key_list); + if (load_taglists(index_dir, id_to_tags_list, tag_key_list) != 0) { + cerr << "Failed to load tag lists" << endl; + return -1; + } recall.set_tag_lists(id_to_tags_list, tag_key_list); @@ -1638,12 +1620,12 @@ int get_recall_precision(string &recall_precision_string) { } catch (const std::invalid_argument &e) { std::cerr << "Exeception in getting recall precision: " << e.what() << ", value: " << recall_precision_string << std::endl; - return false; + return -1; } catch (const std::out_of_range &e) { std::cerr << "Out of range exception in getting recall precision: " << e.what() << ", value: " << recall_precision_string << std::endl; - return false; + return -1; } return true;