diff --git a/src/core/search/hnsw_index.cc b/src/core/search/hnsw_index.cc index 0e444e7812a3..4bdc36e1b1d9 100644 --- a/src/core/search/hnsw_index.cc +++ b/src/core/search/hnsw_index.cc @@ -69,12 +69,16 @@ class HnswSpace : public hnswlib::SpaceInterface { struct HnswlibAdapter { // Default setting of hnswlib/hnswalg constexpr static size_t kDefaultEfRuntime = 10; + constexpr static size_t kSeed = 100; explicit HnswlibAdapter(const SchemaField::VectorParams& params, bool copy_vector) : space_{params.dim, params.sim}, - world_{&space_, params.capacity, params.hnsw_m, params.hnsw_ef_construction, - 100 /* seed*/, copy_vector}, + world_{&space_, params.capacity, params.hnsw_m, params.hnsw_ef_construction, + kSeed, copy_vector}, copy_vector_{copy_vector}, + capacity_{params.capacity}, + M_{params.hnsw_m}, + ef_construction_{params.hnsw_ef_construction}, data_size_{params.dim * sizeof(float)}, stub_vector_(data_size_ / sizeof(float), 1.0f) { } @@ -181,6 +185,15 @@ struct HnswlibAdapter { } private: + // Discard world_ and reconstruct it with the original ctor parameters — used to + // recover from a partially-applied RestoreFromNodes when the wire-ordering + // invariant is violated. Must be called under the write lock. + void Reset() { + world_.~HierarchicalNSW(); + new (&world_) + HierarchicalNSW(&space_, capacity_, M_, ef_construction_, kSeed, copy_vector_); + } + // Actually add the point. Must be called while holding mrmw write lock. void DoAdd(const void* data, GlobalDocId id) { while (true) { @@ -274,35 +287,41 @@ struct HnswlibAdapter { DCHECK_EQ(world_.cur_element_count.load(), 0u) << "RestoreFromNodes should only be called on an empty index during deserialization"; - // hnswlib pairs enterpoint_node_ with maxlevel_; node levels are immutable after - // creation, so the entry point's level in the serialized set equals the live - // maxlevel at metadata capture. max(node.level) would risk OOB reads when a - // concurrent Add raised maxlevel between capture and node serialization. - size_t max_internal_id = 0; - int entrypoint_level = -1; - for (const auto& node : nodes) { - max_internal_id = std::max(max_internal_id, node.internal_id); - if (node.internal_id == metadata.enterpoint_node) - entrypoint_level = node.level; - } - if (entrypoint_level < 0) { + // Wire-ordering invariant: GetNodesRange writes nodes by ascending internal_id + // 0..count-1 under the saver's read lock, and the loader reads them sequentially + // (LoadVectorIndexNodes), so nodes[i].internal_id == i and nodes.size() is the + // capacity we need. Verify the entry-point in O(1) and read its level directly — + // by the hnswlib invariant it equals world_.maxlevel_ at save time. + if (metadata.enterpoint_node >= nodes.size()) { LOG(ERROR) << "HNSW restore: entry point internal_id=" << metadata.enterpoint_node - << " not present in serialized node set (" << nodes.size() + << " out of range (" << nodes.size() << " nodes); skipping restore — index will be rebuilt from the keyspace"; return false; } - if (world_.max_elements_ < max_internal_id + 1) { - world_.resizeIndex(max_internal_id + 1); + int entrypoint_level = nodes[metadata.enterpoint_node].level; + if (world_.max_elements_ < nodes.size()) { + world_.resizeIndex(nodes.size()); } - // Restore each node - directly set up memory and fields + // Restore each node - directly set up memory and fields. We also enforce the + // wire-ordering invariant (nodes[i].internal_id == i) inline: if a corrupted or + // future-format wire violates it we bail out cleanly so the index is rebuilt from + // the keyspace instead of writing past the resized memory. On failure Reset() + // discards world_ entirely (calling its destructor) and reconstructs it with the + // original ctor params — this leaves the index indistinguishable from a freshly + // created empty graph regardless of what internal state hnswlib accumulates. size_t restored_count = 0; - for (const auto& node : nodes) { - size_t internal_id = node.internal_id; - - // Validate internal_id is within bounds - invalid internal_id indicates corrupted data - CHECK(internal_id < world_.max_elements_); + for (size_t i = 0; i < nodes.size(); ++i) { + const auto& node = nodes[i]; + if (node.internal_id != i) { + LOG(ERROR) << "HNSW restore: wire ordering invariant violated at index " << i + << " (got internal_id=" << node.internal_id << "); index will be rebuilt " + << "from the keyspace"; + Reset(); + return false; + } + size_t internal_id = i; // Register label in lookup table world_.label_lookup_[node.global_id] = internal_id; @@ -424,6 +443,9 @@ struct HnswlibAdapter { mutable MRMWMutex mrmw_mutex_; bool copy_vector_; // Whether vectors are copied into hnswlib. + size_t capacity_; // Initial max_elements_ — used to reconstruct world_. + size_t M_; // hnsw_m — used to reconstruct world_. + size_t ef_construction_; // hnsw_ef_construction — used to reconstruct world_. size_t data_size_; // Byte size of a single vector. std::vector stub_vector_; // Non-zero data for deleted nodes in borrowed mode. }; diff --git a/src/core/search/hnsw_index.h b/src/core/search/hnsw_index.h index 55f817351be9..65920ed9d4f7 100644 --- a/src/core/search/hnsw_index.h +++ b/src/core/search/hnsw_index.h @@ -11,10 +11,10 @@ namespace dfly::search { -// Wire format for HNSW index AUX. Only the entry point is persisted: capacity is -// derived from max(internal_id)+1 in the node set and maxlevel from the entry-point -// node's level (hnswlib pairs enterpoint_node_ with maxlevel_, and node levels are -// immutable after creation). +// HNSW graph state needed at restore time. Capacity is derived from nodes.size() +// (internal_ids are contiguous 0..N-1 because hnswlib uses tombstones for deletes +// and GetNodesRange writes them in order); maxlevel is the entry-point node's +// level by hnswlib invariant, looked up in O(1) at restore. struct HnswIndexMetadata { size_t enterpoint_node = 0; }; diff --git a/src/server/rdb_extensions.h b/src/server/rdb_extensions.h index b79ad78a299c..a1e6f9d796c0 100644 --- a/src/server/rdb_extensions.h +++ b/src/server/rdb_extensions.h @@ -49,10 +49,11 @@ constexpr uint8_t RDB_OPCODE_DF_MASK = 220; /* Mask for key properties */ constexpr uint32_t DF_MASK_FLAG_STICKY = (1 << 0); constexpr uint32_t DF_MASK_FLAG_MC_FLAGS = (1 << 1); -// Opcode to store HNSW vector index node data for global indices -// Format: [index_name, elements_number, internal_id, global_id, level, zero_level_links_num, -// zero_level_links, -// higher_level_links_num (only if level > 0), higher_level_links (only if level > 0)] +// Opcode to store HNSW vector index node data for global indices. +// Format: [index_name, enterpoint_node, elements_number, +// then for each node in ascending internal_id 0..elements_number-1: +// internal_id, global_id, level, zero_level_links_num, zero_level_links, +// higher_level_links_num (only if level > 0), higher_level_links (only if level > 0)] constexpr uint8_t RDB_OPCODE_VECTOR_INDEX = 222; // Opcode to store ShardDocIndex key-to-DocId mapping for search indices diff --git a/src/server/rdb_load.cc b/src/server/rdb_load.cc index fe8b99b1be04..093a7ea85e63 100644 --- a/src/server/rdb_load.cc +++ b/src/server/rdb_load.cc @@ -2702,8 +2702,6 @@ error_code RdbLoader::HandleAux() { /* Just ignored. */ } else if (auxkey == "search-index") { LoadSearchIndexDefFromAux(std::move(auxval)); - } else if (auxkey == "hnsw-index-metadata") { - LoadHnswIndexMetadataFromAux(std::move(auxval)); } else if (auxkey == "search-synonyms") { LoadSearchSynonymsFromAux(std::move(auxval)); } else if (auxkey == "shard-count") { @@ -3055,38 +3053,18 @@ void RdbLoader::LoadSearchIndexDefFromAux(string&& def) { LoadSearchCommandFromAux(service_, std::move(def), "FT.CREATE", "index definition", true); } -void RdbLoader::LoadHnswIndexMetadataFromAux(string&& def) { - try { - auto json_opt = JsonFromString(def); - if (!json_opt) { - LOG(ERROR) << "Invalid HNSW index metadata JSON: " << def; - return; - } - const auto& json = *json_opt; - - PendingHnswMetadata phm; - phm.index_name = json["index_name"].as(); - phm.field_name = json["field_name"].as(); - phm.metadata.enterpoint_node = json["enterpoint_node"].as(); - - LOG(INFO) << "Loaded HNSW metadata for index=" << phm.index_name << " field=" << phm.field_name - << " enterpoint=" << phm.metadata.enterpoint_node; - - load_context_->AddPendingHnswMetadata(std::move(phm)); - } catch (const std::exception& e) { - LOG(ERROR) << "Failed to parse HNSW index metadata JSON: " << e.what() << " def: " << def; - } -} - error_code RdbLoader::HandleVectorIndex() { // HNSW vector index graph data. - // Binary format: [index_key, elements_number, - // then for each node (little-endian): + // Binary format: [index_key, enterpoint_node, elements_number, + // then for each node (little-endian, ascending internal_id 0..count-1): // internal_id (4 bytes), global_id (8 bytes), level (4 bytes), // for each level (0 to level): links_num (4 bytes) + links (4 bytes each)] string index_key; SET_OR_RETURN(FetchGenericString(), index_key); + search::HnswIndexMetadata metadata; + SET_OR_RETURN(LoadLen(nullptr), metadata.enterpoint_node); + uint64_t elements_number; SET_OR_RETURN(LoadLen(nullptr), elements_number); @@ -3104,12 +3082,12 @@ error_code RdbLoader::HandleVectorIndex() { if (shard_count_ == shard_set->size()) { // Same shard count: restore directly. - return RestoreVectorIndex(index_key, index_name, field_name, elements_number); + return RestoreVectorIndex(index_key, index_name, field_name, elements_number, metadata); } // Different shard count: load nodes and defer restoration. // Global_ids will be remapped in PerformPostLoad after all key mappings are collected. - PendingHnswNodes pending{std::string(index_name), std::string(field_name), {}}; + PendingHnswNodes pending{std::string(index_name), std::string(field_name), metadata, {}}; RETURN_ON_ERR(LoadVectorIndexNodes(elements_number, &pending.nodes)); LOG(INFO) << "Deferred HNSW index restore for " << index_key << " with " << pending.nodes.size() << " nodes (shard count mismatch: " << shard_count_ << " vs " << shard_set->size() @@ -3179,7 +3157,8 @@ error_code RdbLoader::LoadVectorIndexNodes(uint64_t elements_number, } error_code RdbLoader::RestoreVectorIndex(string_view index_key, string_view index_name, - string_view field_name, uint64_t elements_number) { + string_view field_name, uint64_t elements_number, + const search::HnswIndexMetadata& metadata) { #ifdef WITH_SEARCH // Look up the HNSW index in the global registry. It should exist from FT.CREATE in aux. auto hnsw_index = GlobalHnswIndexRegistry::Instance().Get(index_name, field_name); @@ -3194,13 +3173,7 @@ error_code RdbLoader::RestoreVectorIndex(string_view index_key, string_view inde if (nodes.empty()) return {}; - auto metadata = load_context_->FindHnswMetadata(index_name, field_name); - if (!metadata) { - LOG(ERROR) << "HNSW metadata missing for " << index_key - << "; skipping graph restore — index will be rebuilt from keyspace"; - return {}; - } - if (!hnsw_index->RestoreFromNodes(nodes, *metadata)) { + if (!hnsw_index->RestoreFromNodes(nodes, metadata)) { LOG(WARNING) << "HNSW graph restore rejected for " << index_key << "; index will be rebuilt from keyspace"; return {}; diff --git a/src/server/rdb_load.h b/src/server/rdb_load.h index ed293d38e5fa..f52b516e3177 100644 --- a/src/server/rdb_load.h +++ b/src/server/rdb_load.h @@ -395,15 +395,13 @@ class RdbLoader : protected RdbLoaderBase { // issues an FT.CREATE call, but does not start indexing void LoadSearchIndexDefFromAux(std::string&& value); - // Load HNSW index metadata from JSON, sets metadata on the GlobalHnswIndexRegistry - void LoadHnswIndexMetadataFromAux(std::string&& value); - // Load synonyms from RESP string and issue FT.SYNUPDATE call void LoadSearchSynonymsFromAux(std::string&& value); // Restore HNSW vector index graph from serialized node data. std::error_code RestoreVectorIndex(std::string_view index_key, std::string_view index_name, - std::string_view field_name, uint64_t elements_number); + std::string_view field_name, uint64_t elements_number, + const search::HnswIndexMetadata& metadata); // Load HNSW vector index nodes into a vector for deferred restoration. std::error_code LoadVectorIndexNodes(uint64_t elements_number, diff --git a/src/server/rdb_load_context.cc b/src/server/rdb_load_context.cc index cd686cc1aaa9..aaf1de0a9623 100644 --- a/src/server/rdb_load_context.cc +++ b/src/server/rdb_load_context.cc @@ -77,11 +77,16 @@ HnswRemapTable BuildRemapTable( // Remaps global_ids in deferred HNSW nodes and restores the graphs. // Returns the set of index names that failed restoration (to be excluded from key mappings). absl::flat_hash_set RemapAndRestoreHnswGraphs( - std::vector& pending_nodes, - const std::vector& hnsw_metadata, const HnswRemapTable& remap_table) { + std::vector& pending_nodes, const HnswRemapTable& remap_table) { absl::flat_hash_set failed_indices; #ifdef WITH_SEARCH for (auto& pn : pending_nodes) { + // Empty graph is a valid state, not a failure — skip restore (the index already + // matches an empty graph) and don't mark it failed. + if (pn.nodes.empty()) { + continue; + } + auto remap_it = remap_table.find(pn.index_name); auto hnsw_index = GlobalHnswIndexRegistry::Instance().Get(pn.index_name, pn.field_name); @@ -120,21 +125,7 @@ absl::flat_hash_set RemapAndRestoreHnswGraphs( continue; } - const PendingHnswMetadata* phm_ptr = nullptr; - for (const auto& phm : hnsw_metadata) { - if (phm.index_name == pn.index_name && phm.field_name == pn.field_name) { - phm_ptr = &phm; - break; - } - } - if (!phm_ptr) { - LOG(ERROR) << "HNSW metadata missing for " << pn.index_name << ":" << pn.field_name - << ". Will rebuild from scratch."; - failed_indices.insert(pn.index_name); - continue; - } - - if (!hnsw_index->RestoreFromNodes(pn.nodes, phm_ptr->metadata)) { + if (!hnsw_index->RestoreFromNodes(pn.nodes, pn.metadata)) { LOG(WARNING) << "HNSW graph restore rejected for " << pn.index_name << ":" << pn.field_name << ". Will rebuild from scratch."; failed_indices.insert(pn.index_name); @@ -258,11 +249,6 @@ void RdbLoadContext::AddPendingIndexMapping(uint32_t shard_id, PendingIndexMappi pending_index_mappings_[shard_id].emplace_back(std::move(mapping)); } -void RdbLoadContext::AddPendingHnswMetadata(PendingHnswMetadata metadata) { - util::fb2::LockGuard lk(mu_); - pending_hnsw_metadata_.emplace_back(std::move(metadata)); -} - void RdbLoadContext::AddPendingHnswNodes(PendingHnswNodes nodes) { util::fb2::LockGuard lk(mu_); pending_hnsw_nodes_.emplace_back(std::move(nodes)); @@ -272,17 +258,6 @@ void RdbLoadContext::SetMasterShardCount(uint32_t count) { master_shard_count_ = count; } -std::optional RdbLoadContext::FindHnswMetadata( - std::string_view index_name, std::string_view field_name) const { - util::fb2::LockGuard lk(mu_); - for (const auto& phm : pending_hnsw_metadata_) { - if (phm.index_name == index_name && phm.field_name == field_name) { - return phm.metadata; - } - } - return std::nullopt; -} - std::vector RdbLoadContext::TakePendingSynonymCommands() { util::fb2::LockGuard lk(mu_); std::vector result; @@ -305,8 +280,7 @@ std::vector RdbLoadContext::TakePendingHnswNodes() { RdbLoadContext::PerShardMappings RdbLoadContext::RemapHnswForDifferentShardCount( const absl::flat_hash_map>& index_mappings, - std::vector& pending_nodes, - const std::vector& hnsw_metadata) { + std::vector& pending_nodes) { const ShardId new_shard_count = shard_set->size(); // Build remap table: index_name -> master_shard_id -> new_global_ids indexed by old doc_id. @@ -314,7 +288,7 @@ RdbLoadContext::PerShardMappings RdbLoadContext::RemapHnswForDifferentShardCount HnswRemapTable remap_table = BuildRemapTable(index_mappings, new_shard_count); // Remap global_ids, restore HNSW graphs; failed indices are excluded from key mappings. - auto failed = RemapAndRestoreHnswGraphs(pending_nodes, hnsw_metadata, remap_table); + auto failed = RemapAndRestoreHnswGraphs(pending_nodes, remap_table); for (const auto& name : failed) { remap_table.erase(name); } @@ -333,16 +307,8 @@ void RdbLoadContext::PerformPostLoad(Service* service, bool is_error) { auto index_mappings = TakePendingIndexMappings(); auto pending_nodes = TakePendingHnswNodes(); - // Extract remaining shared state under lock. After this, no member access is needed. - std::vector hnsw_metadata; - { - util::fb2::LockGuard lk(mu_); - hnsw_metadata.swap(pending_hnsw_metadata_); - } uint32_t master_shards = master_shard_count_; - bool has_hnsw_restore = !hnsw_metadata.empty(); - if (is_error) return; @@ -352,8 +318,7 @@ void RdbLoadContext::PerformPostLoad(Service* service, bool is_error) { if (shard_count_differs && !index_mappings.empty()) { // Remaps HNSW global_ids, restores HNSW graphs, and pre-distributes key mappings by target // shard. The internal remap table is local to the function and freed when it returns. - auto per_shard_mappings = - RemapHnswForDifferentShardCount(index_mappings, pending_nodes, hnsw_metadata); + auto per_shard_mappings = RemapHnswForDifferentShardCount(index_mappings, pending_nodes); // Each shard reads only its own pre-built slice — no per-shard filtering of all N keys. shard_set->AwaitRunningOnShardQueue([&per_shard_mappings](EngineShard* es) { @@ -390,13 +355,14 @@ void RdbLoadContext::PerformPostLoad(Service* service, bool is_error) { // RestoreKeyIndex (above) and RebuildAllIndices (below) run in separate sequential // AwaitRunningOnShardQueue calls, so there is no parallel index build that could interfere // with the doc_ids assigned during key mapping restoration. - LOG(INFO) << "PostLoad: rebuilding search indices across shards has_hnsw_restore=" - << has_hnsw_restore << " rss=" + // RebuildAllIndices decides per-index whether to use the restore path or rebuild from + // scratch, based on the index's actual graph + key_index state. + LOG(INFO) << "PostLoad: rebuilding search indices across shards rss=" << strings::HumanReadableNumBytes(rss_mem_current.load(std::memory_order_relaxed)); - shard_set->AwaitRunningOnShardQueue([has_hnsw_restore](EngineShard* es) { + shard_set->AwaitRunningOnShardQueue([](EngineShard* es) { OpArgs op_args{es, nullptr, DbContext{&namespaces->GetDefaultNamespace(), 0, GetCurrentTimeMs()}}; - es->search_indices()->RebuildAllIndices(op_args, has_hnsw_restore); + es->search_indices()->RebuildAllIndices(op_args); }); // Now execute all pending synonym commands after indices are rebuilt @@ -411,19 +377,19 @@ void RdbLoadContext::PerformPostLoad(Service* service, bool is_error) { << strings::HumanReadableNumBytes(rss_mem_current.load(std::memory_order_relaxed)); }); - // All shards completed restoration — drain pending ops. - // DrainPendingVectorUpdates sets kBuilding which allows Add calls. - if (has_hnsw_restore) { - shard_set->AwaitRunningOnShardQueue([](EngineShard* es) { - OpArgs op_args{es, nullptr, - DbContext{&namespaces->GetDefaultNamespace(), 0, GetCurrentTimeMs()}}; - for (const auto& name : es->search_indices()->GetIndexNames()) { - if (auto* idx = es->search_indices()->GetIndex(name)) { - idx->DrainPendingVectorUpdates(op_args); - } + // Transition every search index out of kRestoring/kSerializing into kBuilding and + // drain any journal-buffered vector updates accumulated during a restoring window. + // For indices already in kBuilding the state assignment is idempotent and the empty + // pending set returns early, so this is cheap when nothing was deferred. + shard_set->AwaitRunningOnShardQueue([](EngineShard* es) { + OpArgs op_args{es, nullptr, + DbContext{&namespaces->GetDefaultNamespace(), 0, GetCurrentTimeMs()}}; + for (const auto& name : es->search_indices()->GetIndexNames()) { + if (auto* idx = es->search_indices()->GetIndex(name)) { + idx->DrainPendingVectorUpdates(op_args); } - }); - } + } + }); #endif } diff --git a/src/server/rdb_load_context.h b/src/server/rdb_load_context.h index 010dc2045d5f..7f627c7becb9 100644 --- a/src/server/rdb_load_context.h +++ b/src/server/rdb_load_context.h @@ -27,17 +27,12 @@ struct PendingIndexMapping { std::vector> mappings; }; -// HNSW metadata loaded from "hnsw-index-metadata" AUX fields. -struct PendingHnswMetadata { - std::string index_name; - std::string field_name; - search::HnswIndexMetadata metadata; -}; - // Deferred HNSW graph nodes for restoration when shard counts differ. +// The entry-point travels with the nodes inside RDB_OPCODE_VECTOR_INDEX. struct PendingHnswNodes { std::string index_name; std::string field_name; + search::HnswIndexMetadata metadata; std::vector nodes; }; @@ -54,13 +49,9 @@ class RdbLoadContext { void AddPendingSynonymCommand(std::string cmd); void AddPendingIndexMapping(uint32_t shard_id, PendingIndexMapping mapping); - void AddPendingHnswMetadata(PendingHnswMetadata metadata); void AddPendingHnswNodes(PendingHnswNodes nodes); void SetMasterShardCount(uint32_t count); - std::optional FindHnswMetadata(std::string_view index_name, - std::string_view field_name) const; - // Performs post load procedures while still remaining in global LOADING state. // Called once immediately after loading the snapshot / full sync succeeded from the coordinator. void PerformPostLoad(Service* service, bool is_error = false); @@ -79,14 +70,12 @@ class RdbLoadContext { // Failed indices are excluded from the returned mappings so they fall back to a full rebuild. PerShardMappings RemapHnswForDifferentShardCount( const absl::flat_hash_map>& index_mappings, - std::vector& pending_nodes, - const std::vector& hnsw_metadata); + std::vector& pending_nodes); mutable util::fb2::Mutex mu_; std::vector pending_synonym_cmds_ ABSL_GUARDED_BY(mu_); absl::flat_hash_map> pending_index_mappings_ ABSL_GUARDED_BY(mu_); - std::vector pending_hnsw_metadata_ ABSL_GUARDED_BY(mu_); std::vector pending_hnsw_nodes_ ABSL_GUARDED_BY(mu_); uint32_t master_shard_count_ = 0; // Set identically by all loaders from AUX field. }; diff --git a/src/server/rdb_save.cc b/src/server/rdb_save.cc index cf6149af341c..e43e5ed22d0a 100644 --- a/src/server/rdb_save.cc +++ b/src/server/rdb_save.cc @@ -29,7 +29,6 @@ extern "C" { #include "core/cms.h" #include "core/json/json_object.h" #include "core/qlist.h" -#include "core/search/hnsw_index.h" #include "core/size_tracking_channel.h" #include "core/sorted_map.h" #include "core/string_map.h" @@ -41,7 +40,6 @@ extern "C" { #include "server/namespaces.h" #include "server/rdb_extensions.h" #include "server/search/doc_index.h" -#include "server/search/global_hnsw_index.h" #include "server/serializer_commons.h" #include "server/snapshot.h" #include "server/tiered_storage.h" @@ -1511,14 +1509,14 @@ error_code RdbSaver::Impl::FlushSerializer() { namespace { -// Collect search index definitions and optionally HNSW metadata. +// Collect search index definitions for replication / RDB. // search_indices always gets simple "index_name cmd" restore commands. -// For summary shards, hnsw_index_metadata gets JSON with HNSW graph metadata, -// and search_synonyms gets synonym group restore commands. +// For summary shards, search_synonyms gets synonym group restore commands. +// (HNSW graph metadata travels inline with the node data in RDB_OPCODE_VECTOR_INDEX, +// so it is not collected here.) void CollectSearchIndices([[maybe_unused]] const EngineShard& shard, [[maybe_unused]] StringVec* search_indices, [[maybe_unused]] StringVec* search_synonyms, - [[maybe_unused]] StringVec* hnsw_index_metadata, [[maybe_unused]] bool is_summary) { #ifdef WITH_SEARCH auto* indices = shard.search_indices(); @@ -1533,28 +1531,6 @@ void CollectSearchIndices([[maybe_unused]] const EngineShard& shard, if (!is_summary) continue; - // Collect HNSW metadata for vector field (first one found), for now we don't support multiple - // vector fields per index serialization - for (const auto& [fident, finfo] : index_info.base_index.schema.fields) { - if (finfo.type == search::SchemaField::VECTOR && - !(finfo.flags & search::SchemaField::NOINDEX)) { - if (auto hnsw_index = GlobalHnswIndexRegistry::Instance().Get(index_name, finfo.short_name); - hnsw_index) { - // Empty graph: enterpoint_node_ is -1 (wraps to UINT32_MAX as tableint); skip - // emission so the load path doesn't receive a garbage entry point. - if (hnsw_index->GetNodeCount() == 0) - break; - auto meta = hnsw_index->GetMetadata(); - TmpJson meta_json; - meta_json["index_name"] = index_name; - meta_json["field_name"] = finfo.short_name; - meta_json["enterpoint_node"] = meta.enterpoint_node; - hnsw_index_metadata->emplace_back(meta_json.to_string()); - break; - } - } - } - // Save synonym groups const auto& synonym_groups = index->GetSynonyms().GetGroups(); for (const auto& [group_id, terms] : synonym_groups) { @@ -1571,18 +1547,16 @@ void CollectSearchIndices([[maybe_unused]] const EngineShard& shard, } // namespace RdbSaver::GlobalData RdbSaver::GetGlobalData(const Service* service, bool is_summary) { - StringVec script_bodies, search_indices, search_synonyms, hnsw_index_metadata; + StringVec script_bodies, search_indices, search_synonyms; size_t table_mem_result = 0; if (!is_summary) { shard_set->RunBriefInParallel([&](EngineShard* shard) { if (shard->shard_id() == 0) - CollectSearchIndices(*shard, &search_indices, &search_synonyms, &hnsw_index_metadata, - is_summary); + CollectSearchIndices(*shard, &search_indices, &search_synonyms, is_summary); }); return RdbSaver::GlobalData{std::move(script_bodies), std::move(search_indices), - std::move(search_synonyms), std::move(hnsw_index_metadata), - table_mem_result}; + std::move(search_synonyms), table_mem_result}; } { // For summary file: collect all global data @@ -1595,8 +1569,7 @@ RdbSaver::GlobalData RdbSaver::GetGlobalData(const Service* service, bool is_sum atomic table_mem{0}; shard_set->RunBriefInParallel([&](EngineShard* shard) { if (shard->shard_id() == 0) - CollectSearchIndices(*shard, &search_indices, &search_synonyms, &hnsw_index_metadata, - is_summary); + CollectSearchIndices(*shard, &search_indices, &search_synonyms, is_summary); auto& db_slice = namespaces->GetDefaultNamespace().GetDbSlice(shard->shard_id()); size_t shard_table_mem = 0; @@ -1610,8 +1583,7 @@ RdbSaver::GlobalData RdbSaver::GetGlobalData(const Service* service, bool is_sum }); return RdbSaver::GlobalData{std::move(script_bodies), std::move(search_indices), - std::move(search_synonyms), std::move(hnsw_index_metadata), - table_mem.load(memory_order_relaxed)}; + std::move(search_synonyms), table_mem.load(memory_order_relaxed)}; } void RdbSaver::Impl::FillFreqMap(RdbTypeFreqMap* dest) const { @@ -1758,12 +1730,6 @@ error_code RdbSaver::SaveAux(const GlobalData& glob_state) { RETURN_ON_ERR(impl_->SaveAuxFieldStrStr("search-index", s)); } - // HNSW index metadata (JSON, summary only) - only for replicas >= VER6 - if (replica_dfly_version_ >= DflyVersion::VER6) { - for (const string& s : glob_state.hnsw_index_metadata) - RETURN_ON_ERR(impl_->SaveAuxFieldStrStr("hnsw-index-metadata", s)); - } - // Save synonyms only in summary file DCHECK(save_mode_ != SaveMode::SINGLE_SHARD || glob_state.search_synonyms.empty()); for (const string& s : glob_state.search_synonyms) diff --git a/src/server/rdb_save.h b/src/server/rdb_save.h index 091a8db89c08..802751ebbd0a 100644 --- a/src/server/rdb_save.h +++ b/src/server/rdb_save.h @@ -160,11 +160,10 @@ class RdbSaver { public: // Global data which doesn't belong to shards and is serialized in header struct GlobalData { - const StringVec lua_scripts; // bodies of lua scripts - const StringVec search_indices; // ft.create commands to re-create search indices - const StringVec search_synonyms; // ft.synupdate commands to restore synonyms - const StringVec hnsw_index_metadata; // HNSW metadata JSON (summary only) - size_t table_used_memory = 0; // total memory used by all tables in all shards + const StringVec lua_scripts; // bodies of lua scripts + const StringVec search_indices; // ft.create commands to re-create search indices + const StringVec search_synonyms; // ft.synupdate commands to restore synonyms + size_t table_used_memory = 0; // total memory used by all tables in all shards }; // single_shard - true means that we run RdbSaver on a single shard and we do not use diff --git a/src/server/search/doc_index.cc b/src/server/search/doc_index.cc index 8493c5989cb8..f4b7961072fa 100644 --- a/src/server/search/doc_index.cc +++ b/src/server/search/doc_index.cc @@ -1223,13 +1223,21 @@ void ShardDocIndices::DropIndexCache(const dfly::ShardDocIndex& shard_doc_index) JsonAccessor::RemoveFieldFromCache(fident); } -void ShardDocIndices::RebuildAllIndices(const OpArgs& op_args, bool is_restored) { +void ShardDocIndices::RebuildAllIndices(const OpArgs& op_args) { for (auto& [index_name, ptr] : indices_) { ptr->InitHnswShardIndices(); - // Only use the restore path for indices that have populated key mappings. - // When shard counts differ, PerformPostLoad remaps the mappings; if remapping fails, - // the mappings are removed so the index falls back to full rebuild here. - bool index_restored = is_restored && ptr->key_index_.Size() > 0; + // Use the restore path only when the HNSW graph was actually populated AND we have + // matching key mappings — otherwise (no graph, no mappings, or a corrupted save + // that left one without the other) fall back to a full rebuild from the keyspace. + bool any_hnsw_field_has_nodes = false; + for (const auto& [_, field] : GetIndexedHnswFields(ptr->base_->schema)) { + if (auto h = GlobalHnswIndexRegistry::Instance().Get(index_name, field.short_name); + h && h->GetNodeCount() > 0) { + any_hnsw_field_has_nodes = true; + break; + } + } + bool index_restored = any_hnsw_field_has_nodes && ptr->key_index_.Size() > 0; ptr->Rebuild(op_args, &local_mr_, index_restored); } } diff --git a/src/server/search/doc_index.h b/src/server/search/doc_index.h index c6aa10f972b5..99ae54273b2c 100644 --- a/src/server/search/doc_index.h +++ b/src/server/search/doc_index.h @@ -544,8 +544,10 @@ class ShardDocIndices { // Drop all indices void DropAllIndices(); - // Rebuild all indices - void RebuildAllIndices(const OpArgs& op_args, bool is_restored); + // Rebuild all indices. Each index decides on its own whether to use the restore + // path (graph already populated from RDB → only vectors need to be filled in) + // or a full rebuild from the keyspace, based on its own state at call time. + void RebuildAllIndices(const OpArgs& op_args); // Block until construction of all indices finishes void BlockUntilConstructionEnd(); diff --git a/src/server/search/doc_index_fallback.cc b/src/server/search/doc_index_fallback.cc index 3349ba636954..5c1b49032cdc 100644 --- a/src/server/search/doc_index_fallback.cc +++ b/src/server/search/doc_index_fallback.cc @@ -23,7 +23,7 @@ void ShardDocIndices::RemoveDoc(std::string_view key, const DbContext& db_cnt, P void ShardDocIndices::DropAllIndices() { } -void ShardDocIndices::RebuildAllIndices(const OpArgs& op_args, bool is_restored) { +void ShardDocIndices::RebuildAllIndices(const OpArgs& op_args) { } void ShardDocIndices::BlockUntilConstructionEnd() { } diff --git a/src/server/search/serialization_utils.cc b/src/server/search/serialization_utils.cc index 4bff8adc8aaa..cc482c3887a5 100644 --- a/src/server/search/serialization_utils.cc +++ b/src/server/search/serialization_utils.cc @@ -89,8 +89,18 @@ void SearchSerializer::SerializeGlobalHnswIndices() const { // Acquire a read lock to ensure a consistent snapshot of the graph. auto read_lock = index->GetReadLock(); - // Format: [RDB_OPCODE_VECTOR_INDEX, index_name, elements_number, + // Skip empty indices entirely — world_.enterpoint_node_ is -1 for an empty graph + // (wraps to UINT32_MAX as tableint); emitting that as the inline entry-point would + // ship a garbage sentinel. The replica's RebuildAllIndices will see no graph nodes + // and rebuild from the (also empty) keyspace, which is the right outcome. + size_t node_count = index->GetNodeCount(); + if (node_count == 0) { + continue; + } + + // Format: [RDB_OPCODE_VECTOR_INDEX, index_name, enterpoint_node, elements_number, // then for each node: binary encoded entry via SaveHNSWEntry] + // The entry-point ships with the node data so loaders need no separate AUX field. if (auto ec = serializer_->WriteOpcode(RDB_OPCODE_VECTOR_INDEX); ec) { continue; } @@ -98,7 +108,11 @@ void SearchSerializer::SerializeGlobalHnswIndices() const { continue; } - size_t node_count = index->GetNodeCount(); + auto metadata = index->GetMetadata(); + if (auto ec = serializer_->SaveLen(metadata.enterpoint_node); ec) { + continue; + } + if (auto ec = serializer_->SaveLen(node_count); ec) { continue; }