Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
38 changes: 26 additions & 12 deletions src/VecSim/algorithms/svs/svs.h
Original file line number Diff line number Diff line change
Expand Up @@ -54,6 +54,11 @@ struct SVSIndexBase
size_t num_marked_deleted;
};

/** Thread Management Strategy:
* - addVector(): Requires numThreads == 1
* - addVectors(): Allows any numThreads value, but prohibits n=1 with numThreads>1
* - Callers are responsible for setting appropriate thread counts
**/
template <typename MetricType, typename DataType, bool isMulti, size_t QuantBits,
size_t ResidualBits, bool IsLeanVec>
class SVSIndex : public VecSimIndexAbstract<svs_details::vecsim_dt<DataType>, float>,
Expand Down Expand Up @@ -199,6 +204,12 @@ class SVSIndex : public VecSimIndexAbstract<svs_details::vecsim_dt<DataType>, fl
return processed_blob;
}

// Precondition: before calling this function, the caller must have already set numThreads to
// the value appropriate for the operation, reflecting the number of available threads.
// Important NOTE: For single vector operations (n=1), numThreads should be 1.
// For bulk operations (n>1), numThreads should reflect the number of available threads.
int addVectorsImpl(const void *vectors_data, const labelType *labels, size_t n) {
if (n == 0) {
return 0;
Expand All @@ -217,12 +228,6 @@ class SVSIndex : public VecSimIndexAbstract<svs_details::vecsim_dt<DataType>, fl
// Wrap data into SVS SimpleDataView for SVS API
auto points = svs::data::SimpleDataView<DataType>{typed_vectors_data, n, this->dim};

// If n == 1, we should ensure single-threading
const size_t current_num_threads = getNumThreads();
if (n == 1 && current_num_threads > 1) {
setNumThreads(1);
}

if (!impl_) {
// SVS index instance cannot be empty, so we have to construct it at first rows
initImpl(points, ids);
Expand All @@ -231,11 +236,6 @@ class SVSIndex : public VecSimIndexAbstract<svs_details::vecsim_dt<DataType>, fl
impl_->add_points(points, ids);
}

// Restore multi-threading if needed
if (n == 1 && current_num_threads > 1) {
setNumThreads(current_num_threads);
}

return n - deleted_num;
}

Expand Down Expand Up @@ -374,7 +374,8 @@ class SVSIndex : public VecSimIndexAbstract<svs_details::vecsim_dt<DataType>, fl
.maxCandidatePoolSize = this->buildParams.max_candidate_pool_size,
.pruneTo = this->buildParams.prune_to,
.useSearchHistory = this->buildParams.use_full_search_history,
.numThreads = this->getNumThreads(),
.numThreads = this->getThreadPoolCapacity(),
.lastReservedThreads = this->getNumThreads(),
.numberOfMarkedDeletedNodes = this->num_marked_deleted,
.searchWindowSize = this->search_window_size,
.searchBufferCapacity = this->search_buffer_capacity,
Expand Down Expand Up @@ -443,6 +444,11 @@ class SVSIndex : public VecSimIndexAbstract<svs_details::vecsim_dt<DataType>, fl
.fieldType = INFOFIELD_UINT64,
.fieldValue = {FieldValue{.uintegerValue = info.svsInfo.numThreads}}});

infoIterator->addInfoField(VecSim_InfoField{
.fieldName = VecSimCommonStrings::SVS_LAST_RESERVED_THREADS_STRING,
.fieldType = INFOFIELD_UINT64,
.fieldValue = {FieldValue{.uintegerValue = info.svsInfo.lastReservedThreads}}});

infoIterator->addInfoField(VecSim_InfoField{
.fieldName = VecSimCommonStrings::NUM_MARKED_DELETED,
.fieldType = INFOFIELD_UINT64,
Expand Down Expand Up @@ -472,10 +478,18 @@ class SVSIndex : public VecSimIndexAbstract<svs_details::vecsim_dt<DataType>, fl
}

int addVector(const void *vector_data, labelType label) override {
// Enforce single-threaded execution for single vector operations to ensure optimal
// performance and consistent behavior. Callers must set numThreads=1 before calling this
// method.
assert(getNumThreads() == 1 && "Can't use more than one thread to insert a single vector");
return addVectorsImpl(vector_data, &label, 1);
}

int addVectors(const void *vectors_data, const labelType *labels, size_t n) override {
// Prevent misuse: single vector operations should use addVector(), not addVectors() with
// n=1 This ensures proper thread management and API contract enforcement.
assert(!(n == 1 && getNumThreads() > 1) &&
"Can't use more than one thread to insert a single vector");
return addVectorsImpl(vectors_data, labels, n);
}

Expand Down
7 changes: 6 additions & 1 deletion src/VecSim/algorithms/svs/svs_tiered.h
Original file line number Diff line number Diff line change
Expand Up @@ -722,7 +722,12 @@ class TieredSVSIndex : public VecSimTieredIndex<DataType, float> {
// prevent update job from running in parallel and lock any access to the backend
// index
std::scoped_lock lock(this->updateJobMutex, this->mainIndexGuard);
return svs_index->addVectors(storage_blob.get(), &label, 1);
// Set available thread count to 1 for single vector write-in-place operation.
// This maintains the contract that single vector operations use exactly one thread.
// TODO: Replace this setNumThreads(1) call with an assertion once we establish
// a contract that write-in-place mode guarantees numThreads == 1.
svs_index->setNumThreads(1);
return this->backendIndex->addVector(storage_blob.get(), label);
}
}
assert(this->getWriteMode() != VecSim_WriteInPlace && "InPlace mode returns early");
Expand Down
2 changes: 1 addition & 1 deletion src/VecSim/algorithms/svs/svs_utils.h
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@
// Maximum training threshold for SVS index, used to limit the size of training data
constexpr size_t SVS_MAX_TRAINING_THRESHOLD = 100 * DEFAULT_BLOCK_SIZE; // 100 * 1024 vectors
// Default wait time for the update job in microseconds
constexpr size_t SVS_DEFAULT_UPDATE_JOB_WAIT_TIME = 100; // 0.1 ms
constexpr size_t SVS_DEFAULT_UPDATE_JOB_WAIT_TIME = 5000; // 5 ms

namespace svs_details {
// VecSim->SVS data type conversion
Expand Down
1 change: 1 addition & 0 deletions src/VecSim/utils/vec_utils.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -64,6 +64,7 @@ const char *VecSimCommonStrings::SVS_GRAPH_MAX_DEGREE_STRING = "GRAPH_MAX_DEGREE
const char *VecSimCommonStrings::SVS_MAX_CANDIDATE_POOL_SIZE_STRING = "MAX_CANDIDATE_POOL_SIZE";
const char *VecSimCommonStrings::SVS_PRUNE_TO_STRING = "PRUNE_TO";
const char *VecSimCommonStrings::SVS_NUM_THREADS_STRING = "NUM_THREADS";
const char *VecSimCommonStrings::SVS_LAST_RESERVED_THREADS_STRING = "LAST_RESERVED_NUM_THREADS";
const char *VecSimCommonStrings::SVS_LEANVEC_DIM_STRING = "LEANVEC_DIMENSION";

const char *VecSimCommonStrings::BLOCK_SIZE_STRING = "BLOCK_SIZE";
Expand Down
1 change: 1 addition & 0 deletions src/VecSim/utils/vec_utils.h
Original file line number Diff line number Diff line change
Expand Up @@ -65,6 +65,7 @@ struct VecSimCommonStrings {
static const char *SVS_MAX_CANDIDATE_POOL_SIZE_STRING;
static const char *SVS_PRUNE_TO_STRING;
static const char *SVS_NUM_THREADS_STRING;
static const char *SVS_LAST_RESERVED_THREADS_STRING;
static const char *SVS_LEANVEC_DIM_STRING;

static const char *BLOCK_SIZE_STRING;
Expand Down
2 changes: 2 additions & 0 deletions src/VecSim/vec_sim_common.h
Original file line number Diff line number Diff line change
Expand Up @@ -352,6 +352,8 @@ typedef struct {
bool useSearchHistory; // Either the contents of the search buffer can be used or
// the entire search history.
size_t numThreads; // Maximum number of threads to be used by svs for ingestion.
size_t lastReservedThreads; // Number of threads that were successfully reserved by the last
// ingestion operation.
size_t numberOfMarkedDeletedNodes; // The number of nodes that are marked as deleted.
size_t searchWindowSize; // Search window size for Vamana graph accuracy/latency tune.
size_t searchBufferCapacity; // Search buffer capacity for Vamana graph accuracy/latency tune.
Expand Down
22 changes: 19 additions & 3 deletions tests/unit/test_svs_fp16.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -2223,26 +2223,42 @@ class FP16SVSTieredIndexTest : public FP16SVSTest<index_type_t> {
TieredSVSParams{.trainingTriggerThreshold = training_threshold,
.updateTriggerThreshold = update_threshold}}};
}
void verifyNumThreads(TieredSVSIndex<data_t> *tiered_index, size_t expected_num_threads,
size_t expected_capcity) {
ASSERT_EQ(tiered_index->GetSVSIndex()->getNumThreads(), expected_num_threads);
ASSERT_EQ(tiered_index->GetSVSIndex()->getThreadPoolCapacity(), expected_capcity);
}

TieredSVSIndex<data_t> *CreateTieredSVSIndex(const TieredIndexParams &tiered_params,
tieredIndexMock &mock_thread_pool) {
tieredIndexMock &mock_thread_pool,
size_t num_available_threads = 1) {
auto *tiered_index =
reinterpret_cast<TieredSVSIndex<data_t> *>(TieredFactory::NewIndex(&tiered_params));

// Set the created tiered index in the index external context (it will take ownership over
// the index, and we'll need to release the ctx at the end of the test.
mock_thread_pool.ctx->index_strong_ref.reset(tiered_index);

// Set number of available threads to 1 unless specified otherwise,
// so we can insert one vector at a time directly to svs.
tiered_index->GetSVSIndex()->setNumThreads(num_available_threads);
size_t params_threadpool_size =
tiered_params.primaryIndexParams->algoParams.svsParams.num_threads;
size_t expected_capacity =
params_threadpool_size ? params_threadpool_size : mock_thread_pool.thread_pool_size;
verifyNumThreads(tiered_index, num_available_threads, expected_capacity);
return tiered_index;
}

TieredSVSIndex<data_t> *
CreateTieredSVSIndex(VecSimParams &svs_params, tieredIndexMock &mock_thread_pool,
size_t training_threshold = SVS_VAMANA_DEFAULT_TRAINING_THRESHOLD,
size_t update_threshold = SVS_VAMANA_DEFAULT_UPDATE_THRESHOLD) {
size_t update_threshold = SVS_VAMANA_DEFAULT_UPDATE_THRESHOLD,
size_t num_available_threads = 1) {
svs_params.algoParams.svsParams.quantBits = index_type_t::get_quant_bits();
TieredIndexParams tiered_params = CreateTieredSVSParams(
svs_params, mock_thread_pool, training_threshold, update_threshold);
return CreateTieredSVSIndex(tiered_params, mock_thread_pool);
return CreateTieredSVSIndex(tiered_params, mock_thread_pool, num_available_threads);
}

void SetUp() override {
Expand Down
Loading
Loading