From 94a9d599442174742be3ab41532de574c99adbdc Mon Sep 17 00:00:00 2001 From: meiravgri Date: Sun, 19 Oct 2025 09:08:09 +0000 Subject: [PATCH 1/9] get capcity instead of num that can be changed during addvector --- src/VecSim/algorithms/svs/svs.h | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/VecSim/algorithms/svs/svs.h b/src/VecSim/algorithms/svs/svs.h index b701d8c7e..80b7f032f 100644 --- a/src/VecSim/algorithms/svs/svs.h +++ b/src/VecSim/algorithms/svs/svs.h @@ -374,7 +374,7 @@ class SVSIndex : public VecSimIndexAbstract, fl .maxCandidatePoolSize = this->buildParams.max_candidate_pool_size, .pruneTo = this->buildParams.prune_to, .useSearchHistory = this->buildParams.use_full_search_history, - .numThreads = this->getNumThreads(), + .numThreads = this->getThreadPoolCapacity(), .numberOfMarkedDeletedNodes = this->num_marked_deleted, .searchWindowSize = this->search_window_size, .searchBufferCapacity = this->search_buffer_capacity, From 1ec73bb41389d5267574f519ceeccf93e7e0e120 Mon Sep 17 00:00:00 2001 From: meiravgri Date: Sun, 19 Oct 2025 12:48:34 +0000 Subject: [PATCH 2/9] add test --- src/VecSim/algorithms/svs/svs.h | 1 + src/VecSim/vec_sim_common.h | 2 + tests/unit/test_svs_tiered.cpp | 82 +++++++++++++++++++++++++++++++++ 3 files changed, 85 insertions(+) diff --git a/src/VecSim/algorithms/svs/svs.h b/src/VecSim/algorithms/svs/svs.h index 80b7f032f..c210b82a0 100644 --- a/src/VecSim/algorithms/svs/svs.h +++ b/src/VecSim/algorithms/svs/svs.h @@ -374,6 +374,7 @@ class SVSIndex : public VecSimIndexAbstract, fl .maxCandidatePoolSize = this->buildParams.max_candidate_pool_size, .pruneTo = this->buildParams.prune_to, .useSearchHistory = this->buildParams.use_full_search_history, + .lastReservedThreads = this->getNumThreads(), .numThreads = this->getThreadPoolCapacity(), .numberOfMarkedDeletedNodes = this->num_marked_deleted, .searchWindowSize = this->search_window_size, diff --git a/src/VecSim/vec_sim_common.h b/src/VecSim/vec_sim_common.h index fbeec10dc..c31e695ce 100644 --- a/src/VecSim/vec_sim_common.h +++ b/src/VecSim/vec_sim_common.h @@ -351,6 +351,8 @@ typedef struct { size_t pruneTo; // Amount that candidates will be pruned. bool useSearchHistory; // Either the contents of the search buffer can be used or // the entire search history. + size_t lastReservedThreads; // Number of threads that were successfully reserved by the last + // ingestion operation. size_t numThreads; // Maximum number of threads to be used by svs for ingestion. size_t numberOfMarkedDeletedNodes; // The number of nodes that are marked as deleted. size_t searchWindowSize; // Search window size for Vamana graph accuracy/latency tune. diff --git a/tests/unit/test_svs_tiered.cpp b/tests/unit/test_svs_tiered.cpp index e52b40236..3a784d45f 100644 --- a/tests/unit/test_svs_tiered.cpp +++ b/tests/unit/test_svs_tiered.cpp @@ -251,6 +251,88 @@ TYPED_TEST(SVSTieredIndexTest, ThreadsReservation) { mock_thread_pool.thread_pool_join(); } +TYPED_TEST(SVSTieredIndexTest, TestDebugInfoThreadCount) { + // Set thread_pool_size to 4 or actual number of available CPUs + const auto num_threads = std::min(4U, getAvailableCPUs()); + if (num_threads < 2) { + // If the number of threads is less than 2, we can't run the test + GTEST_SKIP() << "No threads available"; + } + + constexpr size_t training_threshold = 10; + constexpr size_t update_threshold = 10; + constexpr size_t update_job_wait_time = 10000; + constexpr size_t dim = 4; + SVSParams params = {.type = TypeParam::get_index_type(), + .dim = dim, + .metric = VecSimMetric_L2, + .num_threads = num_threads}; + VecSimParams svs_params = CreateParams(params); + auto mock_thread_pool = tieredIndexMock(); + mock_thread_pool.thread_pool_size = num_threads; + + // Create TieredSVS index instance with a mock queue. + auto *tiered_index = this->CreateTieredSVSIndex( + svs_params, mock_thread_pool, training_threshold, update_threshold, update_job_wait_time); + ASSERT_INDEX(tiered_index); + + // Verify initial state: both fields should equal configured thread count + VecSimIndexDebugInfo backendIndexInfo = tiered_index->GetBackendIndex()->debugInfo(); + ASSERT_EQ(backendIndexInfo.svsInfo.numThreads, num_threads); + ASSERT_EQ(backendIndexInfo.svsInfo.lastReservedThreads, num_threads); + // Get the allocator from the tiered index. + auto allocator = tiered_index->getAllocator(); + + // Simulate thread contention by keeping one thread busy, to force reduced thread availability + std::atomic is_reserved = false; + std::atomic thread_wait = true; + auto reserve_and_wait = [&](VecSimIndex * /*unused*/, size_t num_threads) { + is_reserved = true; + while (thread_wait) { + std::this_thread::sleep_for(std::chrono::milliseconds(100)); + } + }; + + // Keep one thread occupied + mock_thread_pool.init_threads(); + SVSMultiThreadJob::JobsRegistry registry(allocator); + std::chrono::milliseconds timeout{update_job_wait_time}; // long enough to reserve one thread + auto jobs = SVSMultiThreadJob::createJobs(allocator, SVS_BATCH_UPDATE_JOB, reserve_and_wait, + tiered_index, 1, timeout, ®istry); + ASSERT_EQ(jobs.size(), 1); + tiered_index->submitJobs(jobs); + // Wait for thread reservation to complete + while (!is_reserved) { + std::this_thread::sleep_for(std::chrono::milliseconds(100)); + } + + // Trigger training with reduced thread availability + for (size_t i = 0; i < training_threshold; ++i) { + GenerateAndAddVector(tiered_index, dim, i); + } + while (tiered_index->GetBackendIndex()->indexSize() != training_threshold) { + std::this_thread::sleep_for(std::chrono::milliseconds(100)); + } + // Verify: numThreads unchanged, lastReservedThreads reflects actual availability + backendIndexInfo = tiered_index->GetBackendIndex()->debugInfo(); + ASSERT_EQ(backendIndexInfo.svsInfo.numThreads, num_threads); + ASSERT_EQ(backendIndexInfo.svsInfo.lastReservedThreads, num_threads - 1); + + // Release occupied thread and trigger another background indexing + thread_wait = false; + mock_thread_pool.thread_pool_wait(); + // add more vectors to trigger background indexing + for (size_t i = training_threshold; i < training_threshold + update_threshold; ++i) { + GenerateAndAddVector(tiered_index, dim, i); + } + mock_thread_pool.thread_pool_join(); + + // Verify: numThreads unchanged, lastReservedThreads reflects we used all configured threads + backendIndexInfo = tiered_index->GetBackendIndex()->debugInfo(); + ASSERT_EQ(backendIndexInfo.svsInfo.numThreads, num_threads); + ASSERT_EQ(backendIndexInfo.svsInfo.lastReservedThreads, num_threads); +} + TYPED_TEST(SVSTieredIndexTest, CreateIndexInstance) { // Create TieredSVS index instance with a mock queue. SVSParams params = {.type = TypeParam::get_index_type(), From 7a636c4bbae6feff5bac98ff926d425aff7ab714 Mon Sep 17 00:00:00 2001 From: meiravgri Date: Sun, 19 Oct 2025 12:50:59 +0000 Subject: [PATCH 3/9] increase SVS_DEFAULT_UPDATE_JOB_WAIT_TIME to 1 ms --- src/VecSim/algorithms/svs/svs_utils.h | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/VecSim/algorithms/svs/svs_utils.h b/src/VecSim/algorithms/svs/svs_utils.h index e53037766..454d81b36 100644 --- a/src/VecSim/algorithms/svs/svs_utils.h +++ b/src/VecSim/algorithms/svs/svs_utils.h @@ -27,7 +27,7 @@ // Maximum training threshold for SVS index, used to limit the size of training data constexpr size_t SVS_MAX_TRAINING_THRESHOLD = 100 * DEFAULT_BLOCK_SIZE; // 100 * 1024 vectors // Default wait time for the update job in microseconds -constexpr size_t SVS_DEFAULT_UPDATE_JOB_WAIT_TIME = 100; // 0.1 ms +constexpr size_t SVS_DEFAULT_UPDATE_JOB_WAIT_TIME = 1000; // 1 ms namespace svs_details { // VecSim->SVS data type conversion From f42acfae6e80a2e437a1d3f207ea963c083fc3c7 Mon Sep 17 00:00:00 2001 From: meiravgri Date: Sun, 19 Oct 2025 13:13:21 +0000 Subject: [PATCH 4/9] add to info iterator --- src/VecSim/algorithms/svs/svs.h | 7 ++++++- src/VecSim/utils/vec_utils.cpp | 1 + src/VecSim/utils/vec_utils.h | 1 + src/VecSim/vec_sim_common.h | 2 +- 4 files changed, 9 insertions(+), 2 deletions(-) diff --git a/src/VecSim/algorithms/svs/svs.h b/src/VecSim/algorithms/svs/svs.h index c210b82a0..9c5335ae6 100644 --- a/src/VecSim/algorithms/svs/svs.h +++ b/src/VecSim/algorithms/svs/svs.h @@ -374,8 +374,8 @@ class SVSIndex : public VecSimIndexAbstract, fl .maxCandidatePoolSize = this->buildParams.max_candidate_pool_size, .pruneTo = this->buildParams.prune_to, .useSearchHistory = this->buildParams.use_full_search_history, - .lastReservedThreads = this->getNumThreads(), .numThreads = this->getThreadPoolCapacity(), + .lastReservedThreads = this->getNumThreads(), .numberOfMarkedDeletedNodes = this->num_marked_deleted, .searchWindowSize = this->search_window_size, .searchBufferCapacity = this->search_buffer_capacity, @@ -444,6 +444,11 @@ class SVSIndex : public VecSimIndexAbstract, fl .fieldType = INFOFIELD_UINT64, .fieldValue = {FieldValue{.uintegerValue = info.svsInfo.numThreads}}}); + infoIterator->addInfoField(VecSim_InfoField{ + .fieldName = VecSimCommonStrings::SVS_LAST_RESERVED_THREADS_STRING, + .fieldType = INFOFIELD_UINT64, + .fieldValue = {FieldValue{.uintegerValue = info.svsInfo.lastReservedThreads}}}); + infoIterator->addInfoField(VecSim_InfoField{ .fieldName = VecSimCommonStrings::NUM_MARKED_DELETED, .fieldType = INFOFIELD_UINT64, diff --git a/src/VecSim/utils/vec_utils.cpp b/src/VecSim/utils/vec_utils.cpp index 6e702e4c9..ed353ab89 100644 --- a/src/VecSim/utils/vec_utils.cpp +++ b/src/VecSim/utils/vec_utils.cpp @@ -64,6 +64,7 @@ const char *VecSimCommonStrings::SVS_GRAPH_MAX_DEGREE_STRING = "GRAPH_MAX_DEGREE const char *VecSimCommonStrings::SVS_MAX_CANDIDATE_POOL_SIZE_STRING = "MAX_CANDIDATE_POOL_SIZE"; const char *VecSimCommonStrings::SVS_PRUNE_TO_STRING = "PRUNE_TO"; const char *VecSimCommonStrings::SVS_NUM_THREADS_STRING = "NUM_THREADS"; +const char *VecSimCommonStrings::SVS_LAST_RESERVED_THREADS_STRING = "LAST_RESERVED_NUM_THREADS"; const char *VecSimCommonStrings::SVS_LEANVEC_DIM_STRING = "LEANVEC_DIMENSION"; const char *VecSimCommonStrings::BLOCK_SIZE_STRING = "BLOCK_SIZE"; diff --git a/src/VecSim/utils/vec_utils.h b/src/VecSim/utils/vec_utils.h index 946968ae7..9e3cd74df 100644 --- a/src/VecSim/utils/vec_utils.h +++ b/src/VecSim/utils/vec_utils.h @@ -65,6 +65,7 @@ struct VecSimCommonStrings { static const char *SVS_MAX_CANDIDATE_POOL_SIZE_STRING; static const char *SVS_PRUNE_TO_STRING; static const char *SVS_NUM_THREADS_STRING; + static const char *SVS_LAST_RESERVED_THREADS_STRING; static const char *SVS_LEANVEC_DIM_STRING; static const char *BLOCK_SIZE_STRING; diff --git a/src/VecSim/vec_sim_common.h b/src/VecSim/vec_sim_common.h index c31e695ce..3110753ac 100644 --- a/src/VecSim/vec_sim_common.h +++ b/src/VecSim/vec_sim_common.h @@ -351,9 +351,9 @@ typedef struct { size_t pruneTo; // Amount that candidates will be pruned. bool useSearchHistory; // Either the contents of the search buffer can be used or // the entire search history. + size_t numThreads; // Maximum number of threads to be used by svs for ingestion. size_t lastReservedThreads; // Number of threads that were successfully reserved by the last // ingestion operation. - size_t numThreads; // Maximum number of threads to be used by svs for ingestion. size_t numberOfMarkedDeletedNodes; // The number of nodes that are marked as deleted. size_t searchWindowSize; // Search window size for Vamana graph accuracy/latency tune. size_t searchBufferCapacity; // Search buffer capacity for Vamana graph accuracy/latency tune. From 06428613d1dc9111864bc60232a527781b2de19f Mon Sep 17 00:00:00 2001 From: meiravgri Date: Sun, 19 Oct 2025 13:36:46 +0000 Subject: [PATCH 5/9] fix tests --- tests/unit/unit_test_utils.cpp | 8 +++++++- 1 file changed, 7 insertions(+), 1 deletion(-) diff --git a/tests/unit/unit_test_utils.cpp b/tests/unit/unit_test_utils.cpp index 21c587bd7..3c9a70ae6 100644 --- a/tests/unit/unit_test_utils.cpp +++ b/tests/unit/unit_test_utils.cpp @@ -23,7 +23,7 @@ using float16 = vecsim_types::float16; namespace DebugInfoIteratorFieldCount { constexpr size_t FLAT = 10; constexpr size_t HNSW = 17; -constexpr size_t SVS = 23; +constexpr size_t SVS = 24; constexpr size_t TIERED_HNSW = 15; constexpr size_t TIERED_SVS = 17; } // namespace DebugInfoIteratorFieldCount @@ -618,6 +618,11 @@ void compareSVSIndexInfoToIterator(VecSimIndexDebugInfo info, VecSimDebugInfoIte // SVS number of threads. ASSERT_EQ(infoField->fieldType, INFOFIELD_UINT64); ASSERT_EQ(infoField->fieldValue.uintegerValue, info.svsInfo.numThreads); + } else if (!strcmp(infoField->fieldName, + VecSimCommonStrings::SVS_LAST_RESERVED_THREADS_STRING)) { + // SVS number of last reserved threads. + ASSERT_EQ(infoField->fieldType, INFOFIELD_UINT64); + ASSERT_EQ(infoField->fieldValue.uintegerValue, info.svsInfo.lastReservedThreads); } else if (!strcmp(infoField->fieldName, VecSimCommonStrings::NUM_MARKED_DELETED)) { // SVS number of marked deleted nodes. ASSERT_EQ(infoField->fieldType, INFOFIELD_UINT64); @@ -771,6 +776,7 @@ std::vector getSVSFields() { fields.push_back(VecSimCommonStrings::SVS_PRUNE_TO_STRING); fields.push_back(VecSimCommonStrings::SVS_USE_SEARCH_HISTORY_STRING); fields.push_back(VecSimCommonStrings::SVS_NUM_THREADS_STRING); + fields.push_back(VecSimCommonStrings::SVS_LAST_RESERVED_THREADS_STRING); fields.push_back(VecSimCommonStrings::NUM_MARKED_DELETED); fields.push_back(VecSimCommonStrings::SVS_SEARCH_WS_STRING); fields.push_back(VecSimCommonStrings::SVS_SEARCH_BC_STRING); From c9c0ae1ec3d28dd5c85a2bb642163ad2415b4432 Mon Sep 17 00:00:00 2001 From: meiravgri Date: Mon, 20 Oct 2025 04:39:08 +0000 Subject: [PATCH 6/9] tiered index calls setNumThreads before adding vectors to the backend index limit addVector: should only be called with numThreads=1 limit addvectors: can be called with n ==1 only when numThreads ==1 enforce this change in tiered tests: set the num threads in the index to 1 --- src/VecSim/algorithms/svs/svs.h | 30 +++++--- src/VecSim/algorithms/svs/svs_tiered.h | 7 +- tests/unit/test_svs_fp16.cpp | 22 +++++- tests/unit/test_svs_tiered.cpp | 102 +++++++++++++++++++++++-- 4 files changed, 139 insertions(+), 22 deletions(-) diff --git a/src/VecSim/algorithms/svs/svs.h b/src/VecSim/algorithms/svs/svs.h index 9c5335ae6..e407642f6 100644 --- a/src/VecSim/algorithms/svs/svs.h +++ b/src/VecSim/algorithms/svs/svs.h @@ -54,6 +54,11 @@ struct SVSIndexBase size_t num_marked_deleted; }; +/** Thread Management Strategy: + * - addVector(): Requires numThreads == 1 + * - addVectors(): Allows any numThreads value, but prohibits n=1 with numThreads>1 + * - Callers are responsible for setting appropriate thread counts + **/ template class SVSIndex : public VecSimIndexAbstract, float>, @@ -199,6 +204,12 @@ class SVSIndex : public VecSimIndexAbstract, fl return processed_blob; } + // Assuming numThreads was updated to reflect the number of available threads before this + // function was called. + // This function assumes that the caller has already set numThreads to the appropriate value + // for the operation. + // Important NOTE: For single vector operations (n=1), numThreads should be 1. + // For bulk operations (n>1), numThreads should reflect the number of available threads. int addVectorsImpl(const void *vectors_data, const labelType *labels, size_t n) { if (n == 0) { return 0; @@ -217,12 +228,6 @@ class SVSIndex : public VecSimIndexAbstract, fl // Wrap data into SVS SimpleDataView for SVS API auto points = svs::data::SimpleDataView{typed_vectors_data, n, this->dim}; - // If n == 1, we should ensure single-threading - const size_t current_num_threads = getNumThreads(); - if (n == 1 && current_num_threads > 1) { - setNumThreads(1); - } - if (!impl_) { // SVS index instance cannot be empty, so we have to construct it at first rows initImpl(points, ids); @@ -231,11 +236,6 @@ class SVSIndex : public VecSimIndexAbstract, fl impl_->add_points(points, ids); } - // Restore multi-threading if needed - if (n == 1 && current_num_threads > 1) { - setNumThreads(current_num_threads); - } - return n - deleted_num; } @@ -478,10 +478,18 @@ class SVSIndex : public VecSimIndexAbstract, fl } int addVector(const void *vector_data, labelType label) override { + // Enforce single-threaded execution for single vector operations to ensure optimal + // performance and consistent behavior. Callers must set numThreads=1 before calling this + // method. + assert(getNumThreads() == 1 && "Can't use more than one thread to insert a single vector"); return addVectorsImpl(vector_data, &label, 1); } int addVectors(const void *vectors_data, const labelType *labels, size_t n) override { + // Prevent misuse: single vector operations should use addVector(), not addVectors() with + // n=1 This ensures proper thread management and API contract enforcement. + assert(!(n == 1 && getNumThreads() > 1) && + "Can't use more than one thread to insert a single vector"); return addVectorsImpl(vectors_data, labels, n); } diff --git a/src/VecSim/algorithms/svs/svs_tiered.h b/src/VecSim/algorithms/svs/svs_tiered.h index 929627f21..9da3760b3 100644 --- a/src/VecSim/algorithms/svs/svs_tiered.h +++ b/src/VecSim/algorithms/svs/svs_tiered.h @@ -722,7 +722,12 @@ class TieredSVSIndex : public VecSimTieredIndex { // prevent update job from running in parallel and lock any access to the backend // index std::scoped_lock lock(this->updateJobMutex, this->mainIndexGuard); - return svs_index->addVectors(storage_blob.get(), &label, 1); + // Set available thread count to 1 for single vector write-in-place operation. + // This maintains the contract that single vector operations use exactly one thread. + // TODO: Replace this setNumThreads(1) call with an assertion once we establish + // a contract that write-in-place mode guarantees numThreads == 1. + svs_index->setNumThreads(1); + return this->backendIndex->addVector(storage_blob.get(), label); } } assert(this->getWriteMode() != VecSim_WriteInPlace && "InPlace mode returns early"); diff --git a/tests/unit/test_svs_fp16.cpp b/tests/unit/test_svs_fp16.cpp index 7e7ab0802..5ccc14c69 100644 --- a/tests/unit/test_svs_fp16.cpp +++ b/tests/unit/test_svs_fp16.cpp @@ -2223,26 +2223,42 @@ class FP16SVSTieredIndexTest : public FP16SVSTest { TieredSVSParams{.trainingTriggerThreshold = training_threshold, .updateTriggerThreshold = update_threshold}}}; } + void verifyNumThreads(TieredSVSIndex *tiered_index, size_t expected_num_threads, + size_t expected_capcity) { + ASSERT_EQ(tiered_index->GetSVSIndex()->getNumThreads(), expected_num_threads); + ASSERT_EQ(tiered_index->GetSVSIndex()->getThreadPoolCapacity(), expected_capcity); + } TieredSVSIndex *CreateTieredSVSIndex(const TieredIndexParams &tiered_params, - tieredIndexMock &mock_thread_pool) { + tieredIndexMock &mock_thread_pool, + size_t num_available_threads = 1) { auto *tiered_index = reinterpret_cast *>(TieredFactory::NewIndex(&tiered_params)); // Set the created tiered index in the index external context (it will take ownership over // the index, and we'll need to release the ctx at the end of the test. mock_thread_pool.ctx->index_strong_ref.reset(tiered_index); + + // Set number of available threads to 1 unless specified otherwise, + // so we can insert one vector at a time directly to svs. + tiered_index->GetSVSIndex()->setNumThreads(num_available_threads); + size_t params_threadpool_size = + tiered_params.primaryIndexParams->algoParams.svsParams.num_threads; + size_t expected_capacity = + params_threadpool_size ? params_threadpool_size : mock_thread_pool.thread_pool_size; + verifyNumThreads(tiered_index, num_available_threads, expected_capacity); return tiered_index; } TieredSVSIndex * CreateTieredSVSIndex(VecSimParams &svs_params, tieredIndexMock &mock_thread_pool, size_t training_threshold = SVS_VAMANA_DEFAULT_TRAINING_THRESHOLD, - size_t update_threshold = SVS_VAMANA_DEFAULT_UPDATE_THRESHOLD) { + size_t update_threshold = SVS_VAMANA_DEFAULT_UPDATE_THRESHOLD, + size_t num_available_threads = 1) { svs_params.algoParams.svsParams.quantBits = index_type_t::get_quant_bits(); TieredIndexParams tiered_params = CreateTieredSVSParams( svs_params, mock_thread_pool, training_threshold, update_threshold); - return CreateTieredSVSIndex(tiered_params, mock_thread_pool); + return CreateTieredSVSIndex(tiered_params, mock_thread_pool, num_available_threads); } void SetUp() override { diff --git a/tests/unit/test_svs_tiered.cpp b/tests/unit/test_svs_tiered.cpp index 3a784d45f..5dc221726 100644 --- a/tests/unit/test_svs_tiered.cpp +++ b/tests/unit/test_svs_tiered.cpp @@ -90,14 +90,31 @@ class SVSTieredIndexTest : public ::testing::Test { .updateJobWaitTime = update_job_wait_time}}}; } + void verifyNumThreads(TieredSVSIndex *tiered_index, size_t expected_num_threads, + size_t expected_capcity) { + ASSERT_EQ(tiered_index->GetSVSIndex()->getNumThreads(), expected_num_threads); + ASSERT_EQ(tiered_index->GetSVSIndex()->getThreadPoolCapacity(), expected_capcity); + } TieredSVSIndex *CreateTieredSVSIndex(const TieredIndexParams &tiered_params, - tieredIndexMock &mock_thread_pool) { + tieredIndexMock &mock_thread_pool, + size_t num_available_threads = 1) { auto *tiered_index = reinterpret_cast *>(TieredFactory::NewIndex(&tiered_params)); // Set the created tiered index in the index external context (it will take ownership over // the index, and we'll need to release the ctx at the end of the test. mock_thread_pool.ctx->index_strong_ref.reset(tiered_index); + // Set numThreads to 1 by default to allow direct calls to SVS addVector() API, + // which requires exactly 1 thread. When using tiered index addVector API, + // the thread count is managed internally according to the operation and threadpool + // capacity, so testing parallelism remains intact. + tiered_index->GetSVSIndex()->setNumThreads(num_available_threads); + size_t params_threadpool_size = + tiered_params.primaryIndexParams->algoParams.svsParams.num_threads; + size_t expected_capacity = + params_threadpool_size ? params_threadpool_size : mock_thread_pool.thread_pool_size; + verifyNumThreads(tiered_index, num_available_threads, expected_capacity); + return tiered_index; } @@ -105,12 +122,13 @@ class SVSTieredIndexTest : public ::testing::Test { CreateTieredSVSIndex(VecSimParams &svs_params, tieredIndexMock &mock_thread_pool, size_t training_threshold = TestsDefaultTrainingThreshold, size_t update_threshold = TestsDefaultUpdateThreshold, - size_t update_job_wait_time = SVS_DEFAULT_UPDATE_JOB_WAIT_TIME) { + size_t update_job_wait_time = SVS_DEFAULT_UPDATE_JOB_WAIT_TIME, + size_t num_available_threads = 1) { svs_params.algoParams.svsParams.quantBits = index_type_t::get_quant_bits(); TieredIndexParams tiered_params = CreateTieredSVSParams(svs_params, mock_thread_pool, training_threshold, update_threshold, update_job_wait_time); - return CreateTieredSVSIndex(tiered_params, mock_thread_pool); + return CreateTieredSVSIndex(tiered_params, mock_thread_pool, num_available_threads); } void SetUp() override { @@ -272,8 +290,9 @@ TYPED_TEST(SVSTieredIndexTest, TestDebugInfoThreadCount) { mock_thread_pool.thread_pool_size = num_threads; // Create TieredSVS index instance with a mock queue. - auto *tiered_index = this->CreateTieredSVSIndex( - svs_params, mock_thread_pool, training_threshold, update_threshold, update_job_wait_time); + auto *tiered_index = + this->CreateTieredSVSIndex(svs_params, mock_thread_pool, training_threshold, + update_threshold, update_job_wait_time, num_threads); ASSERT_INDEX(tiered_index); // Verify initial state: both fields should equal configured thread count @@ -322,8 +341,8 @@ TYPED_TEST(SVSTieredIndexTest, TestDebugInfoThreadCount) { thread_wait = false; mock_thread_pool.thread_pool_wait(); // add more vectors to trigger background indexing - for (size_t i = training_threshold; i < training_threshold + update_threshold; ++i) { - GenerateAndAddVector(tiered_index, dim, i); + for (size_t i = 0; i < update_threshold; ++i) { + GenerateAndAddVector(tiered_index, dim, training_threshold + i); } mock_thread_pool.thread_pool_join(); @@ -333,6 +352,75 @@ TYPED_TEST(SVSTieredIndexTest, TestDebugInfoThreadCount) { ASSERT_EQ(backendIndexInfo.svsInfo.lastReservedThreads, num_threads); } +TYPED_TEST(SVSTieredIndexTest, TestDebugInfoThreadCountWriteInPlace) { + // Test that write-in-place mode correctly reports thread usage in debug info. + // Even when the index is configured with multiple threads, write-in-place operations + // should use only 1 thread and lastReservedThreads should reflect this. + + // Set thread_pool_size to 4 or actual number of available CPUs + const auto num_threads = std::min(4U, getAvailableCPUs()); + if (num_threads < 2) { + // If the number of threads is less than 2, we can't run the test + GTEST_SKIP() << "No threads available"; + } + + // Test svs when mode is write in place, but the index is configured with multiple threads. + constexpr size_t training_threshold = 10; + constexpr size_t update_threshold = 10; + constexpr size_t update_job_wait_time = 10000; + constexpr size_t dim = 4; + SVSParams params = {.type = TypeParam::get_index_type(), + .dim = dim, + .metric = VecSimMetric_L2, + .num_threads = num_threads}; + VecSimParams svs_params = CreateParams(params); + auto mock_thread_pool = tieredIndexMock(); + mock_thread_pool.thread_pool_size = num_threads; + + // Create TieredSVS index instance with a mock queue. + auto *tiered_index = + this->CreateTieredSVSIndex(svs_params, mock_thread_pool, training_threshold, + update_threshold, update_job_wait_time, num_threads); + ASSERT_INDEX(tiered_index); + + // Set to mode to write in place even though the index is configured with multiple threads. + VecSim_SetWriteMode(VecSim_WriteInPlace); + // Verify initial state: both fields should equal configured thread count + VecSimIndexDebugInfo backendIndexInfo = tiered_index->GetBackendIndex()->debugInfo(); + ASSERT_EQ(backendIndexInfo.svsInfo.numThreads, num_threads); + ASSERT_EQ(backendIndexInfo.svsInfo.lastReservedThreads, num_threads); + // Get the allocator from the tiered index. + auto allocator = tiered_index->getAllocator(); + + mock_thread_pool.init_threads(); + + // Trigger training + for (size_t i = 0; i < training_threshold; ++i) { + GenerateAndAddVector(tiered_index, dim, i); + } + ASSERT_EQ(mock_thread_pool.jobQ.size(), 0); + + while (tiered_index->GetBackendIndex()->indexSize() != training_threshold) { + std::this_thread::sleep_for(std::chrono::milliseconds(100)); + } + // Verify: numThreads unchanged, lastReservedThreads reflects we only used one thread (main + // thread) + backendIndexInfo = tiered_index->GetBackendIndex()->debugInfo(); + ASSERT_EQ(backendIndexInfo.svsInfo.numThreads, num_threads); + ASSERT_EQ(backendIndexInfo.svsInfo.lastReservedThreads, 1); + + // add more vectors, each will be directly inserted to the backend index + size_t num_vectors = 10; + for (size_t i = 0; i < num_vectors; ++i) { + GenerateAndAddVector(tiered_index, dim, training_threshold + i); + // Verify: numThreads unchanged, lastReservedThreads reflects we used only one thread + backendIndexInfo = tiered_index->GetBackendIndex()->debugInfo(); + ASSERT_EQ(backendIndexInfo.svsInfo.numThreads, num_threads); + ASSERT_EQ(backendIndexInfo.svsInfo.lastReservedThreads, 1); + ASSERT_EQ(mock_thread_pool.jobQ.size(), 0); + } +} + TYPED_TEST(SVSTieredIndexTest, CreateIndexInstance) { // Create TieredSVS index instance with a mock queue. SVSParams params = {.type = TypeParam::get_index_type(), From 26b35fbeb7b2e03edabde11702a3ee896442b099 Mon Sep 17 00:00:00 2001 From: meiravgri Date: Mon, 20 Oct 2025 05:01:04 +0000 Subject: [PATCH 7/9] change to 5 ms --- src/VecSim/algorithms/svs/svs_utils.h | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/VecSim/algorithms/svs/svs_utils.h b/src/VecSim/algorithms/svs/svs_utils.h index 454d81b36..2e240358f 100644 --- a/src/VecSim/algorithms/svs/svs_utils.h +++ b/src/VecSim/algorithms/svs/svs_utils.h @@ -27,7 +27,7 @@ // Maximum training threshold for SVS index, used to limit the size of training data constexpr size_t SVS_MAX_TRAINING_THRESHOLD = 100 * DEFAULT_BLOCK_SIZE; // 100 * 1024 vectors // Default wait time for the update job in microseconds -constexpr size_t SVS_DEFAULT_UPDATE_JOB_WAIT_TIME = 1000; // 1 ms +constexpr size_t SVS_DEFAULT_UPDATE_JOB_WAIT_TIME = 5000; // 5 ms namespace svs_details { // VecSim->SVS data type conversion From 5a7305e1ae9090d6473903ca29aa4287282f8c1a Mon Sep 17 00:00:00 2001 From: meiravgri Date: Mon, 20 Oct 2025 16:10:27 +0000 Subject: [PATCH 8/9] test add vector async --- tests/unit/test_svs_tiered.cpp | 49 +++++++++++++++++++++++++++++++++- 1 file changed, 48 insertions(+), 1 deletion(-) diff --git a/tests/unit/test_svs_tiered.cpp b/tests/unit/test_svs_tiered.cpp index 5dc221726..d187c8299 100644 --- a/tests/unit/test_svs_tiered.cpp +++ b/tests/unit/test_svs_tiered.cpp @@ -421,6 +421,51 @@ TYPED_TEST(SVSTieredIndexTest, TestDebugInfoThreadCountWriteInPlace) { } } +TYPED_TEST(SVSTieredIndexTest, TestAddOneVectorAsync) { + auto mock_thread_pool = tieredIndexMock(); + const auto num_threads = mock_thread_pool.thread_pool_size; + if (num_threads < 2) { + // If the number of threads is less than 2, this test has no point. + GTEST_SKIP() << "No threads available"; + } + constexpr size_t training_threshold = 1; + constexpr size_t update_threshold = 1; + constexpr size_t update_job_wait_time = 10000; + constexpr size_t dim = 4; + SVSParams params = {.type = TypeParam::get_index_type(), .dim = dim, .metric = VecSimMetric_L2}; + VecSimParams svs_params = CreateParams(params); + + // Create TieredSVS index instance with a mock queue. + auto *tiered_index = this->CreateTieredSVSIndex( + svs_params, mock_thread_pool, training_threshold, update_threshold, update_job_wait_time); + ASSERT_INDEX(tiered_index); + mock_thread_pool.init_threads(); + + // Verify initial state: both fields should equal configured thread count + VecSimIndexDebugInfo backendIndexInfo = tiered_index->GetBackendIndex()->debugInfo(); + ASSERT_EQ(backendIndexInfo.svsInfo.numThreads, num_threads); + + // Add one vector - this should trigger background training + GenerateAndAddVector(tiered_index, dim, 0); + mock_thread_pool.thread_pool_wait(); + ASSERT_EQ(tiered_index->GetBackendIndex()->indexSize(), 1); + + // Verify: numThreads unchanged, and we used one thread + backendIndexInfo = tiered_index->GetBackendIndex()->debugInfo(); + ASSERT_EQ(backendIndexInfo.svsInfo.numThreads, num_threads); + ASSERT_EQ(backendIndexInfo.svsInfo.lastReservedThreads, 1); + + // add another vectors to trigger background indexing + GenerateAndAddVector(tiered_index, dim, 1); + mock_thread_pool.thread_pool_join(); + ASSERT_EQ(tiered_index->GetBackendIndex()->indexSize(), 2); + + // Verify: yet again, numThreads unchanged, and we used one thread + backendIndexInfo = tiered_index->GetBackendIndex()->debugInfo(); + ASSERT_EQ(backendIndexInfo.svsInfo.numThreads, num_threads); + ASSERT_EQ(backendIndexInfo.svsInfo.lastReservedThreads, 1); +} + TYPED_TEST(SVSTieredIndexTest, CreateIndexInstance) { // Create TieredSVS index instance with a mock queue. SVSParams params = {.type = TypeParam::get_index_type(), @@ -466,7 +511,7 @@ TYPED_TEST(SVSTieredIndexTest, addVector) { auto mock_thread_pool = tieredIndexMock(); - auto tiered_params = this->CreateTieredSVSParams(svs_params, mock_thread_pool, 1, 1); + auto tiered_params = this->CreateTieredSVSParams(svs_params, mock_thread_pool, 0, 1); auto *tiered_index = this->CreateTieredSVSIndex(tiered_params, mock_thread_pool); ASSERT_INDEX(tiered_index); @@ -482,6 +527,7 @@ TYPED_TEST(SVSTieredIndexTest, addVector) { ASSERT_LE(expected_mem, tiered_index->getAllocationSize()); ASSERT_GE(expected_mem * 1.02, tiered_index->getAllocationSize()); ASSERT_EQ(mock_thread_pool.jobQ.size(), 0); + mock_thread_pool.init_threads(); // Create a vector and add it to the tiered index. labelType vec_label = 1; @@ -496,6 +542,7 @@ TYPED_TEST(SVSTieredIndexTest, addVector) { ASSERT_EQ(tiered_index->indexCapacity(), DEFAULT_BLOCK_SIZE); ASSERT_EQ(tiered_index->GetFlatIndex()->getDistanceFrom_Unsafe(vec_label, vector), 0); ASSERT_EQ(mock_thread_pool.jobQ.size(), mock_thread_pool.thread_pool_size); + mock_thread_pool.thread_pool_wait(); // Account for the allocation of a new block due to the vector insertion. expected_mem += (BruteForceFactory::EstimateElementSize(&bf_params)) * DEFAULT_BLOCK_SIZE; From 275da794969773c34b0be38709aca572323989dc Mon Sep 17 00:00:00 2001 From: meiravgri Date: Tue, 21 Oct 2025 04:31:28 +0000 Subject: [PATCH 9/9] rever unrelated chcnages --- tests/unit/test_svs_tiered.cpp | 4 +--- 1 file changed, 1 insertion(+), 3 deletions(-) diff --git a/tests/unit/test_svs_tiered.cpp b/tests/unit/test_svs_tiered.cpp index d187c8299..d11e3b021 100644 --- a/tests/unit/test_svs_tiered.cpp +++ b/tests/unit/test_svs_tiered.cpp @@ -511,7 +511,7 @@ TYPED_TEST(SVSTieredIndexTest, addVector) { auto mock_thread_pool = tieredIndexMock(); - auto tiered_params = this->CreateTieredSVSParams(svs_params, mock_thread_pool, 0, 1); + auto tiered_params = this->CreateTieredSVSParams(svs_params, mock_thread_pool, 1, 1); auto *tiered_index = this->CreateTieredSVSIndex(tiered_params, mock_thread_pool); ASSERT_INDEX(tiered_index); @@ -527,7 +527,6 @@ TYPED_TEST(SVSTieredIndexTest, addVector) { ASSERT_LE(expected_mem, tiered_index->getAllocationSize()); ASSERT_GE(expected_mem * 1.02, tiered_index->getAllocationSize()); ASSERT_EQ(mock_thread_pool.jobQ.size(), 0); - mock_thread_pool.init_threads(); // Create a vector and add it to the tiered index. labelType vec_label = 1; @@ -542,7 +541,6 @@ TYPED_TEST(SVSTieredIndexTest, addVector) { ASSERT_EQ(tiered_index->indexCapacity(), DEFAULT_BLOCK_SIZE); ASSERT_EQ(tiered_index->GetFlatIndex()->getDistanceFrom_Unsafe(vec_label, vector), 0); ASSERT_EQ(mock_thread_pool.jobQ.size(), mock_thread_pool.thread_pool_size); - mock_thread_pool.thread_pool_wait(); // Account for the allocation of a new block due to the vector insertion. expected_mem += (BruteForceFactory::EstimateElementSize(&bf_params)) * DEFAULT_BLOCK_SIZE;