Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
68 changes: 45 additions & 23 deletions src/core/search/hnsw_index.cc
Original file line number Diff line number Diff line change
Expand Up @@ -69,12 +69,16 @@ class HnswSpace : public hnswlib::SpaceInterface<float> {
struct HnswlibAdapter {
// Default setting of hnswlib/hnswalg
constexpr static size_t kDefaultEfRuntime = 10;
constexpr static size_t kSeed = 100;

explicit HnswlibAdapter(const SchemaField::VectorParams& params, bool copy_vector)
: space_{params.dim, params.sim},
world_{&space_, params.capacity, params.hnsw_m, params.hnsw_ef_construction,
100 /* seed*/, copy_vector},
world_{&space_, params.capacity, params.hnsw_m, params.hnsw_ef_construction,
kSeed, copy_vector},
copy_vector_{copy_vector},
capacity_{params.capacity},
M_{params.hnsw_m},
ef_construction_{params.hnsw_ef_construction},
data_size_{params.dim * sizeof(float)},
stub_vector_(data_size_ / sizeof(float), 1.0f) {
}
Expand Down Expand Up @@ -181,6 +185,15 @@ struct HnswlibAdapter {
}

private:
// Discard world_ and reconstruct it with the original ctor parameters — used to
// recover from a partially-applied RestoreFromNodes when the wire-ordering
// invariant is violated. Must be called under the write lock.
void Reset() {
world_.~HierarchicalNSW<float>();
new (&world_)
HierarchicalNSW<float>(&space_, capacity_, M_, ef_construction_, kSeed, copy_vector_);
}

// Actually add the point. Must be called while holding mrmw write lock.
void DoAdd(const void* data, GlobalDocId id) {
while (true) {
Expand Down Expand Up @@ -274,35 +287,41 @@ struct HnswlibAdapter {
DCHECK_EQ(world_.cur_element_count.load(), 0u)
<< "RestoreFromNodes should only be called on an empty index during deserialization";

// hnswlib pairs enterpoint_node_ with maxlevel_; node levels are immutable after
// creation, so the entry point's level in the serialized set equals the live
// maxlevel at metadata capture. max(node.level) would risk OOB reads when a
// concurrent Add raised maxlevel between capture and node serialization.
size_t max_internal_id = 0;
int entrypoint_level = -1;
for (const auto& node : nodes) {
max_internal_id = std::max<size_t>(max_internal_id, node.internal_id);
if (node.internal_id == metadata.enterpoint_node)
entrypoint_level = node.level;
}
if (entrypoint_level < 0) {
// Wire-ordering invariant: GetNodesRange writes nodes by ascending internal_id
// 0..count-1 under the saver's read lock, and the loader reads them sequentially
// (LoadVectorIndexNodes), so nodes[i].internal_id == i and nodes.size() is the
// capacity we need. Verify the entry-point in O(1) and read its level directly —
// by the hnswlib invariant it equals world_.maxlevel_ at save time.
if (metadata.enterpoint_node >= nodes.size()) {
LOG(ERROR) << "HNSW restore: entry point internal_id=" << metadata.enterpoint_node
<< " not present in serialized node set (" << nodes.size()
<< " out of range (" << nodes.size()
<< " nodes); skipping restore — index will be rebuilt from the keyspace";
return false;
}
if (world_.max_elements_ < max_internal_id + 1) {
world_.resizeIndex(max_internal_id + 1);
int entrypoint_level = nodes[metadata.enterpoint_node].level;
if (world_.max_elements_ < nodes.size()) {
Comment thread
BorysTheDev marked this conversation as resolved.
world_.resizeIndex(nodes.size());
}
Comment thread
BorysTheDev marked this conversation as resolved.

// Restore each node - directly set up memory and fields
// Restore each node - directly set up memory and fields. We also enforce the
// wire-ordering invariant (nodes[i].internal_id == i) inline: if a corrupted or
// future-format wire violates it we bail out cleanly so the index is rebuilt from
// the keyspace instead of writing past the resized memory. On failure Reset()
// discards world_ entirely (calling its destructor) and reconstructs it with the
// original ctor params — this leaves the index indistinguishable from a freshly
// created empty graph regardless of what internal state hnswlib accumulates.
size_t restored_count = 0;

for (const auto& node : nodes) {
size_t internal_id = node.internal_id;

// Validate internal_id is within bounds - invalid internal_id indicates corrupted data
CHECK(internal_id < world_.max_elements_);
for (size_t i = 0; i < nodes.size(); ++i) {
const auto& node = nodes[i];
if (node.internal_id != i) {
LOG(ERROR) << "HNSW restore: wire ordering invariant violated at index " << i
<< " (got internal_id=" << node.internal_id << "); index will be rebuilt "
<< "from the keyspace";
Reset();
return false;
Comment thread
BorysTheDev marked this conversation as resolved.
}
size_t internal_id = i;

// Register label in lookup table
world_.label_lookup_[node.global_id] = internal_id;
Expand Down Expand Up @@ -424,6 +443,9 @@ struct HnswlibAdapter {
mutable MRMWMutex mrmw_mutex_;

bool copy_vector_; // Whether vectors are copied into hnswlib.
size_t capacity_; // Initial max_elements_ — used to reconstruct world_.
size_t M_; // hnsw_m — used to reconstruct world_.
size_t ef_construction_; // hnsw_ef_construction — used to reconstruct world_.
size_t data_size_; // Byte size of a single vector.
std::vector<float> stub_vector_; // Non-zero data for deleted nodes in borrowed mode.
};
Expand Down
8 changes: 4 additions & 4 deletions src/core/search/hnsw_index.h
Original file line number Diff line number Diff line change
Expand Up @@ -11,10 +11,10 @@

namespace dfly::search {

// Wire format for HNSW index AUX. Only the entry point is persisted: capacity is
// derived from max(internal_id)+1 in the node set and maxlevel from the entry-point
// node's level (hnswlib pairs enterpoint_node_ with maxlevel_, and node levels are
// immutable after creation).
// HNSW graph state needed at restore time. Capacity is derived from nodes.size()
// (internal_ids are contiguous 0..N-1 because hnswlib uses tombstones for deletes
// and GetNodesRange writes them in order); maxlevel is the entry-point node's
// level by hnswlib invariant, looked up in O(1) at restore.
struct HnswIndexMetadata {
size_t enterpoint_node = 0;
};
Expand Down
9 changes: 5 additions & 4 deletions src/server/rdb_extensions.h
Original file line number Diff line number Diff line change
Expand Up @@ -49,10 +49,11 @@ constexpr uint8_t RDB_OPCODE_DF_MASK = 220; /* Mask for key properties */
constexpr uint32_t DF_MASK_FLAG_STICKY = (1 << 0);
constexpr uint32_t DF_MASK_FLAG_MC_FLAGS = (1 << 1);

// Opcode to store HNSW vector index node data for global indices
// Format: [index_name, elements_number, internal_id, global_id, level, zero_level_links_num,
// zero_level_links,
// higher_level_links_num (only if level > 0), higher_level_links (only if level > 0)]
// Opcode to store HNSW vector index node data for global indices.
// Format: [index_name, enterpoint_node, elements_number,
// then for each node in ascending internal_id 0..elements_number-1:
// internal_id, global_id, level, zero_level_links_num, zero_level_links,
// higher_level_links_num (only if level > 0), higher_level_links (only if level > 0)]
constexpr uint8_t RDB_OPCODE_VECTOR_INDEX = 222;

// Opcode to store ShardDocIndex key-to-DocId mapping for search indices
Expand Down
47 changes: 10 additions & 37 deletions src/server/rdb_load.cc
Original file line number Diff line number Diff line change
Expand Up @@ -2702,8 +2702,6 @@ error_code RdbLoader::HandleAux() {
/* Just ignored. */
} else if (auxkey == "search-index") {
LoadSearchIndexDefFromAux(std::move(auxval));
Comment thread
BorysTheDev marked this conversation as resolved.
} else if (auxkey == "hnsw-index-metadata") {
LoadHnswIndexMetadataFromAux(std::move(auxval));
} else if (auxkey == "search-synonyms") {
LoadSearchSynonymsFromAux(std::move(auxval));
} else if (auxkey == "shard-count") {
Expand Down Expand Up @@ -3055,38 +3053,18 @@ void RdbLoader::LoadSearchIndexDefFromAux(string&& def) {
LoadSearchCommandFromAux(service_, std::move(def), "FT.CREATE", "index definition", true);
}

void RdbLoader::LoadHnswIndexMetadataFromAux(string&& def) {
try {
auto json_opt = JsonFromString(def);
if (!json_opt) {
LOG(ERROR) << "Invalid HNSW index metadata JSON: " << def;
return;
}
const auto& json = *json_opt;

PendingHnswMetadata phm;
phm.index_name = json["index_name"].as<string>();
phm.field_name = json["field_name"].as<string>();
phm.metadata.enterpoint_node = json["enterpoint_node"].as<size_t>();

LOG(INFO) << "Loaded HNSW metadata for index=" << phm.index_name << " field=" << phm.field_name
<< " enterpoint=" << phm.metadata.enterpoint_node;

load_context_->AddPendingHnswMetadata(std::move(phm));
} catch (const std::exception& e) {
LOG(ERROR) << "Failed to parse HNSW index metadata JSON: " << e.what() << " def: " << def;
}
}

error_code RdbLoader::HandleVectorIndex() {
// HNSW vector index graph data.
// Binary format: [index_key, elements_number,
// then for each node (little-endian):
// Binary format: [index_key, enterpoint_node, elements_number,
Comment thread
BorysTheDev marked this conversation as resolved.
// then for each node (little-endian, ascending internal_id 0..count-1):
// internal_id (4 bytes), global_id (8 bytes), level (4 bytes),
// for each level (0 to level): links_num (4 bytes) + links (4 bytes each)]
string index_key;
SET_OR_RETURN(FetchGenericString(), index_key);

search::HnswIndexMetadata metadata;
SET_OR_RETURN(LoadLen(nullptr), metadata.enterpoint_node);
Comment thread
BorysTheDev marked this conversation as resolved.

uint64_t elements_number;
Comment thread
BorysTheDev marked this conversation as resolved.
SET_OR_RETURN(LoadLen(nullptr), elements_number);

Expand All @@ -3104,12 +3082,12 @@ error_code RdbLoader::HandleVectorIndex() {

if (shard_count_ == shard_set->size()) {
// Same shard count: restore directly.
return RestoreVectorIndex(index_key, index_name, field_name, elements_number);
return RestoreVectorIndex(index_key, index_name, field_name, elements_number, metadata);
}

// Different shard count: load nodes and defer restoration.
// Global_ids will be remapped in PerformPostLoad after all key mappings are collected.
PendingHnswNodes pending{std::string(index_name), std::string(field_name), {}};
PendingHnswNodes pending{std::string(index_name), std::string(field_name), metadata, {}};
RETURN_ON_ERR(LoadVectorIndexNodes(elements_number, &pending.nodes));
LOG(INFO) << "Deferred HNSW index restore for " << index_key << " with " << pending.nodes.size()
<< " nodes (shard count mismatch: " << shard_count_ << " vs " << shard_set->size()
Expand Down Expand Up @@ -3179,7 +3157,8 @@ error_code RdbLoader::LoadVectorIndexNodes(uint64_t elements_number,
}

error_code RdbLoader::RestoreVectorIndex(string_view index_key, string_view index_name,
string_view field_name, uint64_t elements_number) {
string_view field_name, uint64_t elements_number,
const search::HnswIndexMetadata& metadata) {
#ifdef WITH_SEARCH
// Look up the HNSW index in the global registry. It should exist from FT.CREATE in aux.
auto hnsw_index = GlobalHnswIndexRegistry::Instance().Get(index_name, field_name);
Expand All @@ -3194,13 +3173,7 @@ error_code RdbLoader::RestoreVectorIndex(string_view index_key, string_view inde
if (nodes.empty())
return {};

auto metadata = load_context_->FindHnswMetadata(index_name, field_name);
if (!metadata) {
LOG(ERROR) << "HNSW metadata missing for " << index_key
<< "; skipping graph restore — index will be rebuilt from keyspace";
return {};
}
if (!hnsw_index->RestoreFromNodes(nodes, *metadata)) {
if (!hnsw_index->RestoreFromNodes(nodes, metadata)) {
LOG(WARNING) << "HNSW graph restore rejected for " << index_key
<< "; index will be rebuilt from keyspace";
return {};
Expand Down
6 changes: 2 additions & 4 deletions src/server/rdb_load.h
Original file line number Diff line number Diff line change
Expand Up @@ -395,15 +395,13 @@ class RdbLoader : protected RdbLoaderBase {
// issues an FT.CREATE call, but does not start indexing
void LoadSearchIndexDefFromAux(std::string&& value);

// Load HNSW index metadata from JSON, sets metadata on the GlobalHnswIndexRegistry
void LoadHnswIndexMetadataFromAux(std::string&& value);

// Load synonyms from RESP string and issue FT.SYNUPDATE call
void LoadSearchSynonymsFromAux(std::string&& value);

// Restore HNSW vector index graph from serialized node data.
std::error_code RestoreVectorIndex(std::string_view index_key, std::string_view index_name,
std::string_view field_name, uint64_t elements_number);
std::string_view field_name, uint64_t elements_number,
const search::HnswIndexMetadata& metadata);

// Load HNSW vector index nodes into a vector for deferred restoration.
std::error_code LoadVectorIndexNodes(uint64_t elements_number,
Expand Down
Loading
Loading