diff --git a/.gitmodules b/.gitmodules
index a6fa563cf..40da5edf0 100644
--- a/.gitmodules
+++ b/.gitmodules
@@ -2,3 +2,12 @@
path = ThirdParty/zstd
url = https://github.com/facebook/zstd
branch = release
+[submodule "ThirdParty/spdk"]
+ path = ThirdParty/spdk
+ url = https://github.com/spdk/spdk
+[submodule "ThirdParty/isal-l_crypto"]
+ path = ThirdParty/isal-l_crypto
+ url = https://github.com/intel/isa-l_crypto
+[submodule "ThirdParty/RocksDB"]
+ path = ThirdParty/RocksDB
+ url = https://github.com/facebook/rocksdb
diff --git a/AnnService/CMakeLists.txt b/AnnService/CMakeLists.txt
index de5f588e1..4e24a6bca 100644
--- a/AnnService/CMakeLists.txt
+++ b/AnnService/CMakeLists.txt
@@ -3,9 +3,16 @@
set(AnnService ${PROJECT_SOURCE_DIR}/AnnService)
set(Zstd ${PROJECT_SOURCE_DIR}/ThirdParty/zstd)
+# Locations of SPDK, its bundled DPDK and the ISA-L crypto archive inside the ThirdParty submodules.
+# NOTE(review): nothing in the visible part of this file builds these trees, so they are presumably
+# expected to be built before configuring - confirm against the build instructions.
+set(Spdk ${PROJECT_SOURCE_DIR}/ThirdParty/spdk/build)
+set(Dpdk ${PROJECT_SOURCE_DIR}/ThirdParty/spdk/dpdk/build)
+set(IsalLCrypto ${PROJECT_SOURCE_DIR}/ThirdParty/isal-l_crypto/.libs/libisal_crypto.a)
+# Path prefixes: SPDK_LIBRARIES appends "<name>.a" to these to form full static-archive paths.
+set(SpdkLibPrefix ${Spdk}/lib/libspdk_)
+set(DpdkLibPrefix ${Dpdk}/lib/librte_)
+set(SPDK_LIBRARIES -Wl,--whole-archive ${SpdkLibPrefix}bdev_nvme.a ${SpdkLibPrefix}bdev.a ${SpdkLibPrefix}nvme.a ${SpdkLibPrefix}vfio_user.a ${SpdkLibPrefix}sock.a ${SpdkLibPrefix}dma.a ${SpdkLibPrefix}notify.a ${SpdkLibPrefix}accel.a ${SpdkLibPrefix}event_bdev.a ${SpdkLibPrefix}event_accel.a ${SpdkLibPrefix}vmd.a ${SpdkLibPrefix}event_vmd.a ${SpdkLibPrefix}event_sock.a ${SpdkLibPrefix}event_iobuf.a ${SpdkLibPrefix}event.a ${SpdkLibPrefix}env_dpdk.a ${SpdkLibPrefix}log.a ${SpdkLibPrefix}thread.a ${SpdkLibPrefix}rpc.a ${SpdkLibPrefix}init.a ${SpdkLibPrefix}jsonrpc.a ${SpdkLibPrefix}json.a ${SpdkLibPrefix}trace.a ${SpdkLibPrefix}util.a ${DpdkLibPrefix}mempool.a ${DpdkLibPrefix}mempool_ring.a ${DpdkLibPrefix}eal.a ${DpdkLibPrefix}ring.a ${DpdkLibPrefix}telemetry.a ${DpdkLibPrefix}bus_pci.a ${DpdkLibPrefix}kvargs.a ${DpdkLibPrefix}pci.a -Wl,--no-whole-archive dl rt isal ${IsalLCrypto} uuid)
include_directories(${AnnService})
include_directories(${Zstd}/lib)
+include_directories(${Spdk}/include)
file(GLOB_RECURSE HDR_FILES ${AnnService}/inc/Core/*.h ${AnnService}/inc/Helper/*.h)
file(GLOB_RECURSE SRC_FILES ${AnnService}/src/Core/*.cpp ${AnnService}/src/Helper/*.cpp)
@@ -37,10 +44,28 @@ if(${CMAKE_CXX_COMPILER_ID} STREQUAL "GNU")
target_compile_options(DistanceUtils PRIVATE -mavx2 -mavx -msse -msse2 -mavx512f -mavx512bw -mavx512dq -fPIC)
endif()
+# RocksDB is a hard requirement; it is located through its CMake package config file.
+find_package(RocksDB CONFIG)
+if(RocksDB_FOUND)
+ list(APPEND RocksDB_LIBRARIES RocksDB::rocksdb)
+ # liburing is only probed (quietly); nothing from it is linked, so uring_LIBRARIES stays empty.
+ find_package(uring QUIET)
+ set(uring_LIBRARIES "")
+ message (STATUS "Found RocksDB ${RocksDB_VERSION}")
+ message (STATUS "RocksDB: ${RocksDB_DIR}")
+else()
+ message (FATAL_ERROR "Could not find RocksDB!")
+endif()
+
+find_package(TBB REQUIRED)
+if (TBB_FOUND)
+ message (STATUS "Found TBB")
+endif()
+
add_library (SPTAGLib SHARED ${SRC_FILES} ${HDR_FILES})
-target_link_libraries (SPTAGLib DistanceUtils libzstd_shared ${NUMA_LIBRARY})
+target_link_libraries (SPTAGLib DistanceUtils ${RocksDB_LIBRARIES} ${uring_LIBRARIES} libzstd_shared ${NUMA_LIBRARY} tbb ${SPDK_LIBRARIES})
add_library (SPTAGLibStatic STATIC ${SRC_FILES} ${HDR_FILES})
-target_link_libraries (SPTAGLibStatic DistanceUtils libzstd_static ${NUMA_LIBRARY_STATIC})
+target_link_libraries (SPTAGLibStatic DistanceUtils ${RocksDB_LIBRARIES} ${uring_LIBRARIES} libzstd_static ${NUMA_LIBRARY_STATIC} tbb ${SPDK_LIBRARIES})
if(${CMAKE_CXX_COMPILER_ID} STREQUAL "GNU")
target_compile_options(SPTAGLibStatic PRIVATE -fPIC)
endif()
@@ -88,9 +113,15 @@ endif()
file(GLOB_RECURSE SSD_SERVING_HDR_FILES ${AnnService}/inc/SSDServing/*.h)
file(GLOB_RECURSE SSD_SERVING_FILES ${AnnService}/src/SSDServing/*.cpp)
+file(GLOB_RECURSE SPFRESH_HDR_FILES ${AnnService}/inc/SPFresh/*.h)
+file(GLOB_RECURSE SPFRESH_FILES ${AnnService}/src/SPFresh/*.cpp)
+
add_executable(ssdserving ${SSD_SERVING_HDR_FILES} ${SSD_SERVING_FILES})
-target_link_libraries(ssdserving SPTAGLibStatic ${Boost_LIBRARIES})
+add_executable(spfresh ${SPFRESH_HDR_FILES} ${SPFRESH_FILES})
+target_link_libraries(ssdserving SPTAGLibStatic ${Boost_LIBRARIES} ${RocksDB_LIBRARIES})
+target_link_libraries(spfresh SPTAGLibStatic ${Boost_LIBRARIES} ${RocksDB_LIBRARIES})
target_compile_definitions(ssdserving PRIVATE _exe)
+target_compile_definitions(spfresh PRIVATE _exe)
# for Test
add_library(ssdservingLib ${SSD_SERVING_HDR_FILES} ${SSD_SERVING_FILES})
diff --git a/AnnService/CoreLibrary.vcxproj b/AnnService/CoreLibrary.vcxproj
index 78295d087..eb4c56f2f 100644
--- a/AnnService/CoreLibrary.vcxproj
+++ b/AnnService/CoreLibrary.vcxproj
@@ -149,6 +149,7 @@
+
@@ -167,7 +168,10 @@
-
+
+
+
+
@@ -182,6 +186,7 @@
+
@@ -229,11 +234,15 @@
+
+
This project references NuGet package(s) that are missing on this computer. Use NuGet Package Restore to download them. For more information, see http://go.microsoft.com/fwlink/?LinkID=322105. The missing file is {0}.
+
+
\ No newline at end of file
diff --git a/AnnService/CoreLibrary.vcxproj.filters b/AnnService/CoreLibrary.vcxproj.filters
index d79d57a50..9f617827a 100644
--- a/AnnService/CoreLibrary.vcxproj.filters
+++ b/AnnService/CoreLibrary.vcxproj.filters
@@ -196,7 +196,7 @@
Header Files\Core\SPANN
-
+
Header Files\Core\SPANN
@@ -220,6 +220,21 @@
Header Files\Core\Common
+
+ Header Files\Core\SPANN
+
+
+ Header Files\Core\SPANN
+
+
+ Header Files\Helper
+
+
+ Header Files\Core\Common
+
+
+ Header Files\Core\SPANN
+
diff --git a/AnnService/inc/Core/BKT/Index.h b/AnnService/inc/Core/BKT/Index.h
index c8c18f2e2..e263daca7 100644
--- a/AnnService/inc/Core/BKT/Index.h
+++ b/AnnService/inc/Core/BKT/Index.h
@@ -178,6 +178,8 @@ namespace SPTAG
ErrorCode RefineSearchIndex(QueryResult &p_query, bool p_searchDeleted = false) const;
ErrorCode SearchTree(QueryResult &p_query) const;
ErrorCode AddIndex(const void* p_data, SizeType p_vectorNum, DimensionType p_dimension, std::shared_ptr p_metadataSet, bool p_withMetaIndex = false, bool p_normalized = false);
+ ErrorCode AddIndexIdx(SizeType begin, SizeType end);
+ ErrorCode AddIndexId(const void* p_data, SizeType p_vectorNum, DimensionType p_dimension, int& beginHead, int& endHead);
ErrorCode DeleteIndex(const void* p_vectors, SizeType p_vectorNum);
ErrorCode DeleteIndex(const SizeType& p_id);
@@ -209,7 +211,6 @@ namespace SPTAG
private:
void SearchIndex(COMMON::QueryResultSet &p_query, COMMON::WorkSpace &p_space, bool p_searchDeleted, bool p_searchDuplicated, std::function filterFunc = nullptr) const;
-
template &, SizeType, float), bool(*checkFilter)(const std::shared_ptr&, SizeType, std::function)>
void Search(COMMON::QueryResultSet& p_query, COMMON::WorkSpace& p_space, std::function filterFunc) const;
};
diff --git a/AnnService/inc/Core/Common.h b/AnnService/inc/Core/Common.h
index afa8f1dc7..8775d4064 100644
--- a/AnnService/inc/Core/Common.h
+++ b/AnnService/inc/Core/Common.h
@@ -67,6 +67,7 @@ inline T max(T a, T b) {
#define InterlockedExchange8(a,b) __sync_lock_test_and_set(a, b)
#define Sleep(a) usleep(a * 1000)
#define strtok_s(a, b, c) strtok_r(a, b, c)
+#define ALIGN_ROUND(size) (((size) + 31) / 32 * 32) // round up to a multiple of 32; outer parentheses keep it safe inside larger expressions
#else
@@ -98,6 +99,17 @@ inline bool fileexists(const TCHAR* path) {
#define min(a,b) (((a) < (b)) ? (a) : (b))
#endif
+FORCEINLINE
+char
+InterlockedCompareExchange(
+ _Inout_ _Interlocked_operand_ char volatile* Destination,
+ _In_ char Exchange,
+ _In_ char Comperand
+)
+{
+ return (char)_InterlockedCompareExchange8(Destination, Exchange, Comperand);
+}
+
#endif
namespace SPTAG
@@ -114,6 +126,8 @@ namespace SPTAG
#define PAGE_FREE(ptr) ::operator delete(ptr, (std::align_val_t)512)
#endif
+#define ALIGN_ROUND(size) (((size) + 31) / 32 * 32) // round up to a multiple of 32; outer parentheses keep it safe inside larger expressions
+
typedef std::int32_t SizeType;
typedef std::int32_t DimensionType;
diff --git a/AnnService/inc/Core/Common/Dataset.h b/AnnService/inc/Core/Common/Dataset.h
index a2183cf81..810a41d10 100644
--- a/AnnService/inc/Core/Common/Dataset.h
+++ b/AnnService/inc/Core/Common/Dataset.h
@@ -4,15 +4,12 @@
#ifndef _SPTAG_COMMON_DATASET_H_
#define _SPTAG_COMMON_DATASET_H_
-#include "inc/Helper/Logging.h"
-#include
-#include
-
namespace SPTAG
{
namespace COMMON
{
// structure to save Data and Graph
+ /*
template
class Dataset
{
@@ -31,7 +28,7 @@ namespace SPTAG
public:
Dataset() {}
- Dataset(SizeType rows_, DimensionType cols_, SizeType rowsInBlock_, SizeType capacity_, T* data_ = nullptr, bool shareOwnership_ = true)
+ Dataset(SizeType rows_, DimensionType cols_, SizeType rowsInBlock_, SizeType capacity_, const void* data_ = nullptr, bool shareOwnership_ = true)
{
Initialize(rows_, cols_, rowsInBlock_, capacity_, data_, shareOwnership_);
}
@@ -41,11 +38,17 @@ namespace SPTAG
for (T* ptr : incBlocks) ALIGN_FREE(ptr);
incBlocks.clear();
}
- void Initialize(SizeType rows_, DimensionType cols_, SizeType rowsInBlock_, SizeType capacity_, T* data_ = nullptr, bool shareOwnership_ = true)
+ void Initialize(SizeType rows_, DimensionType cols_, SizeType rowsInBlock_, SizeType capacity_, const void* data_ = nullptr, bool shareOwnership_ = true)
{
+ if (data != nullptr) {
+ if (ownData) ALIGN_FREE(data);
+ for (T* ptr : incBlocks) ALIGN_FREE(ptr);
+ incBlocks.clear();
+ }
+
rows = rows_;
cols = cols_;
- data = data_;
+ data = (T*)data_;
if (data_ == nullptr || !shareOwnership_)
{
ownData = true;
@@ -77,20 +80,11 @@ namespace SPTAG
inline const T* At(SizeType index) const
{
- if (index < R() && index >= 0)
- {
- if (index >= rows) {
- SizeType incIndex = index - rows;
- return incBlocks[incIndex >> rowsInBlockEx] + ((size_t)(incIndex & rowsInBlock)) * cols;
- }
- return data + ((size_t)index) * cols;
- }
- else
- {
- std::ostringstream oss;
- oss << "Index out of range in Dataset. Index: " << index << " Size: " << R();
- throw std::out_of_range(oss.str());
+ if (index >= rows) {
+ SizeType incIndex = index - rows;
+ return incBlocks[incIndex >> rowsInBlockEx] + ((size_t)(incIndex & rowsInBlock)) * cols;
}
+ return data + ((size_t)index) * cols;
}
T* operator[](SizeType index)
@@ -103,7 +97,7 @@ namespace SPTAG
return At(index);
}
- ErrorCode AddBatch(const T* pData, SizeType num)
+ ErrorCode AddBatch(SizeType num, const T* pData = nullptr)
{
if (R() > maxRows - num) return ErrorCode::MemoryOverFlow;
@@ -111,38 +105,20 @@ namespace SPTAG
while (written < num) {
SizeType curBlockIdx = ((incRows + written) >> rowsInBlockEx);
if (curBlockIdx >= (SizeType)incBlocks.size()) {
- T* newBlock = (T*)ALIGN_ALLOC(((size_t)rowsInBlock + 1) * cols * sizeof(T));
+ T* newBlock = (T*)ALIGN_ALLOC(sizeof(T) * (rowsInBlock + 1) * cols);
if (newBlock == nullptr) return ErrorCode::MemoryOverFlow;
+ std::memset(newBlock, -1, sizeof(T) * (rowsInBlock + 1) * cols);
incBlocks.push_back(newBlock);
}
SizeType curBlockPos = ((incRows + written) & rowsInBlock);
SizeType toWrite = min(rowsInBlock + 1 - curBlockPos, num - written);
- std::memcpy(incBlocks[curBlockIdx] + ((size_t)curBlockPos) * cols, pData + ((size_t)written) * cols, ((size_t)toWrite) * cols * sizeof(T));
+ if (pData != nullptr) std::memcpy(incBlocks[curBlockIdx] + ((size_t)curBlockPos) * cols, pData + ((size_t)written) * cols, ((size_t)toWrite) * cols * sizeof(T));
written += toWrite;
}
incRows += written;
return ErrorCode::Success;
}
- ErrorCode AddBatch(SizeType num)
- {
- if (R() > maxRows - num) return ErrorCode::MemoryOverFlow;
-
- SizeType written = 0;
- while (written < num) {
- SizeType curBlockIdx = (incRows + written) >> rowsInBlockEx;
- if (curBlockIdx >= (SizeType)incBlocks.size()) {
- T* newBlock = (T*)ALIGN_ALLOC(sizeof(T) * (rowsInBlock + 1) * cols);
- if (newBlock == nullptr) return ErrorCode::MemoryOverFlow;
- std::memset(newBlock, -1, sizeof(T) * (rowsInBlock + 1) * cols);
- incBlocks.push_back(newBlock);
- }
- written += min(rowsInBlock + 1 - ((incRows + written) & rowsInBlock), num - written);
- }
- incRows += written;
- return ErrorCode::Success;
- }
-
ErrorCode Save(std::shared_ptr p_out) const
{
SizeType CR = R();
@@ -203,12 +179,12 @@ namespace SPTAG
return ErrorCode::Success;
}
- ErrorCode Refine(const std::vector& indices, Dataset& p_data) const
+ ErrorCode Refine(const std::vector& indices, Dataset& data) const
{
SizeType R = (SizeType)(indices.size());
- p_data.Initialize(R, cols, rowsInBlock + 1, static_cast(incBlocks.capacity() * (rowsInBlock + 1)));
+ data.Initialize(R, cols, rowsInBlock + 1, static_cast(incBlocks.capacity() * (rowsInBlock + 1)));
for (SizeType i = 0; i < R; i++) {
- std::memcpy((void*)p_data.At(i), (void*)this->At(indices[i]), sizeof(T) * cols);
+ std::memcpy((void*)data.At(i), (void*)this->At(indices[i]), sizeof(T) * cols);
}
return ErrorCode::Success;
}
@@ -234,6 +210,272 @@ namespace SPTAG
return Refine(indices, ptr);
}
};
+ */
+ template
+ class Dataset
+ {
+ private:
+ std::string name = "Data";
+ SizeType rows = 0;
+ DimensionType cols = 1;
+ char* data = nullptr;
+ bool ownData = false;
+ SizeType incRows = 0;
+ SizeType maxRows;
+ SizeType rowsInBlock;
+ SizeType rowsInBlockEx;
+ std::shared_ptr> incBlocks;
+
+ DimensionType colStart = 0;
+ DimensionType mycols = 0;
+
+ public:
+ Dataset() {}
+
+ Dataset(SizeType rows_, DimensionType cols_, SizeType rowsInBlock_, SizeType capacity_, const void* data_ = nullptr, bool shareOwnership_ = true, std::shared_ptr> incBlocks_ = nullptr, int colStart_ = 0, int rowEnd_ = -1)
+ {
+ Initialize(rows_, cols_, rowsInBlock_, capacity_, data_, shareOwnership_, incBlocks_, colStart_, rowEnd_);
+ }
+ ~Dataset()
+ {
+ // Releases the main buffer when this instance owns it, then every incremental block.
+ if (ownData) ALIGN_FREE(data);
+ // incBlocks is only created by Initialize(); a default-constructed Dataset still holds a null pointer here.
+ if (incBlocks == nullptr) return;
+ for (char* ptr : *incBlocks) ALIGN_FREE(ptr);
+ incBlocks->clear();
+ }
+
+ // (Re)initializes the dataset.
+ // rows_ - number of rows in the main buffer.
+ // cols_ - number of T elements per row (stored as mycols).
+ // rowsInBlock_ - incremental block size in rows; rounded up to a power of two.
+ // capacity_ - maximum total row count (stored as maxRows).
+ // data_ - optional source buffer; when null a buffer is allocated and filled with 0xFF bytes.
+ // shareOwnership_- when true and data_ is non-null, data_ is used directly and is NOT owned;
+ // when false, data_ is copied into a newly allocated, owned buffer.
+ // incBlocks_ - optional shared list of incremental blocks; a new list is created when null.
+ // colStart_ - byte offset of this dataset's columns inside each row.
+ // rowEnd_ - when >= colStart_, used as the row stride in bytes; otherwise the stride is cols_ * sizeof(T).
+ void Initialize(SizeType rows_, DimensionType cols_, SizeType rowsInBlock_, SizeType capacity_, const void* data_ = nullptr, bool shareOwnership_ = true, std::shared_ptr> incBlocks_ = nullptr, int colStart_ = 0, int rowEnd_ = -1)
+ {
+ // Release whatever a previous Initialize() left behind.
+ if (data != nullptr) {
+ if (ownData) ALIGN_FREE(data);
+ for (char* ptr : *incBlocks) ALIGN_FREE(ptr);
+ incBlocks->clear();
+ }
+
+ rows = rows_;
+ if (rowEnd_ >= colStart_) cols = rowEnd_;
+ else cols = cols_ * sizeof(T);
+ data = (char*)data_;
+ // Reset ownership first: without this, re-initializing a previously owning Dataset with a
+ // caller-owned buffer would keep ownData == true and later free the caller's memory.
+ ownData = false;
+ if (data_ == nullptr || !shareOwnership_)
+ {
+ ownData = true;
+ data = (char*)ALIGN_ALLOC(((size_t)rows) * cols);
+ if (data_ != nullptr) memcpy(data, data_, ((size_t)rows) * cols);
+ else std::memset(data, -1, ((size_t)rows) * cols);
+ }
+ maxRows = capacity_;
+ rowsInBlockEx = static_cast(ceil(log2(rowsInBlock_)));
+ // rowsInBlock holds (block size - 1) so it can be used as a bit mask.
+ rowsInBlock = (1 << rowsInBlockEx) - 1;
+ incBlocks = incBlocks_;
+ if (incBlocks == nullptr) incBlocks.reset(new std::vector());
+ incBlocks->reserve((static_cast(capacity_) + rowsInBlock) >> rowsInBlockEx);
+
+ colStart = colStart_;
+ mycols = cols_;
+ }
+
+ bool IsReady() const { return data != nullptr; }
+
+ void SetName(const std::string& name_) { name = name_; }
+ const std::string& Name() const { return name; }
+
+ void SetR(SizeType R_)
+ {
+ if (R_ >= rows)
+ incRows = R_ - rows;
+ else
+ {
+ rows = R_;
+ incRows = 0;
+ }
+ }
+
+ inline SizeType R() const { return rows + incRows; }
+ inline const DimensionType& C() const { return mycols; }
+ inline std::uint64_t BufferSize() const { return sizeof(SizeType) + sizeof(DimensionType) + sizeof(T) * R() * C(); }
+
+#define GETITEM(index) \
+ if (index >= rows) { \
+ SizeType incIndex = index - rows; \
+ return (T*)((*incBlocks)[incIndex >> rowsInBlockEx] + ((size_t)(incIndex & rowsInBlock)) * cols + colStart); \
+ } \
+ return (T*)(data + ((size_t)index) * cols + colStart); \
+
+ inline const T* At(SizeType index) const
+ {
+ GETITEM(index)
+ }
+
+ inline T* At(SizeType index)
+ {
+ GETITEM(index)
+ }
+
+ inline T* operator[](SizeType index)
+ {
+ GETITEM(index)
+ }
+
+ inline const T* operator[](SizeType index) const
+ {
+ GETITEM(index)
+ }
+
+#undef GETITEM
+
+ // Appends num rows to the incremental blocks, allocating new blocks as needed.
+ // pData, when non-null, supplies num rows of mycols elements each; when null the new rows
+ // keep the 0xFF fill that freshly allocated blocks receive.
+ // Returns MemoryOverFlow if the capacity would be exceeded or a block allocation fails.
+ ErrorCode AddBatch(SizeType num, const T* pData = nullptr)
+ {
+ // A dataset with a non-zero column offset does nothing here and reports success.
+ if (colStart != 0) return ErrorCode::Success;
+ if (R() > maxRows - num) return ErrorCode::MemoryOverFlow;
+
+ SizeType written = 0;
+ while (written < num) {
+ // Block index = row / blockSize (shift); position inside the block = row % blockSize (mask).
+ SizeType curBlockIdx = ((incRows + written) >> rowsInBlockEx);
+ if (curBlockIdx >= (SizeType)(incBlocks->size())) {
+ char* newBlock = (char*)ALIGN_ALLOC(((size_t)rowsInBlock + 1) * cols);
+ if (newBlock == nullptr) return ErrorCode::MemoryOverFlow;
+ std::memset(newBlock, -1, ((size_t)rowsInBlock + 1) * cols);
+ incBlocks->push_back(newBlock);
+ }
+ SizeType curBlockPos = ((incRows + written) & rowsInBlock);
+ // Write at most up to the end of the current block in this iteration.
+ SizeType toWrite = min(rowsInBlock + 1 - curBlockPos, num - written);
+ if (pData) {
+ // Rows are copied one by one: the destination stride (cols bytes) differs from the source stride (mycols elements).
+ for (int i = 0; i < toWrite; i++) {
+ std::memcpy((*incBlocks)[curBlockIdx] + ((size_t)curBlockPos + i) * cols + colStart, pData + ((size_t)written + i) * mycols, mycols * sizeof(T));
+ }
+ }
+ written += toWrite;
+ }
+ incRows += written;
+ return ErrorCode::Success;
+ }
+
+ ErrorCode Save(std::shared_ptr p_out) const
+ {
+ SizeType CR = R();
+ IOBINARY(p_out, WriteBinary, sizeof(SizeType), (char*)&CR);
+ IOBINARY(p_out, WriteBinary, sizeof(DimensionType), (char*)&mycols);
+ for (SizeType i = 0; i < CR; i++) {
+ IOBINARY(p_out, WriteBinary, sizeof(T) * mycols, (char*)At(i));
+ }
+
+ SPTAGLIB_LOG(Helper::LogLevel::LL_Info, "Save %s (%d,%d) Finish!\n", name.c_str(), CR, mycols);
+ return ErrorCode::Success;
+ }
+
+ ErrorCode Save(std::string sDataPointsFileName) const
+ {
+ SPTAGLIB_LOG(Helper::LogLevel::LL_Info, "Save %s To %s\n", name.c_str(), sDataPointsFileName.c_str());
+ auto ptr = f_createIO();
+ if (ptr == nullptr || !ptr->Initialize(sDataPointsFileName.c_str(), std::ios::binary | std::ios::out)) return ErrorCode::FailedCreateFile;
+ return Save(ptr);
+ }
+
+ ErrorCode Load(std::shared_ptr pInput, SizeType blockSize, SizeType capacity)
+ {
+ IOBINARY(pInput, ReadBinary, sizeof(SizeType), (char*)&(rows));
+ IOBINARY(pInput, ReadBinary, sizeof(DimensionType), (char*)&mycols);
+
+ if (data == nullptr) Initialize(rows, mycols, blockSize, capacity);
+
+ for (SizeType i = 0; i < rows; i++) {
+ IOBINARY(pInput, ReadBinary, sizeof(T) * mycols, (char*)At(i));
+ }
+ SPTAGLIB_LOG(Helper::LogLevel::LL_Info, "Load %s (%d,%d) Finish!\n", name.c_str(), rows, mycols);
+ return ErrorCode::Success;
+ }
+
+ ErrorCode Load(std::string sDataPointsFileName, SizeType blockSize, SizeType capacity)
+ {
+ SPTAGLIB_LOG(Helper::LogLevel::LL_Info, "Load %s From %s\n", name.c_str(), sDataPointsFileName.c_str());
+ auto ptr = f_createIO();
+ if (ptr == nullptr || !ptr->Initialize(sDataPointsFileName.c_str(), std::ios::binary | std::ios::in)) return ErrorCode::FailedOpenFile;
+ return Load(ptr, blockSize, capacity);
+ }
+
+ // Functions for loading models from memory mapped files
+ ErrorCode Load(char* pDataPointsMemFile, SizeType blockSize, SizeType capacity)
+ {
+ SizeType R;
+ DimensionType C;
+ R = *((SizeType*)pDataPointsMemFile);
+ pDataPointsMemFile += sizeof(SizeType);
+
+ C = *((DimensionType*)pDataPointsMemFile);
+ pDataPointsMemFile += sizeof(DimensionType);
+
+ Initialize(R, C, blockSize, capacity, (char*)pDataPointsMemFile);
+ SPTAGLIB_LOG(Helper::LogLevel::LL_Info, "Load %s (%d,%d) Finish!\n", name.c_str(), R, C);
+ return ErrorCode::Success;
+ }
+
+ ErrorCode Refine(const std::vector& indices, COMMON::Dataset& dataset) const
+ {
+ SizeType newrows = (SizeType)(indices.size());
+ if (dataset.data == nullptr) dataset.Initialize(newrows, mycols, rowsInBlock + 1, static_cast(incBlocks->capacity() * (rowsInBlock + 1)));
+
+ for (SizeType i = 0; i < newrows; i++) {
+ std::memcpy((void*)dataset.At(i), (void*)At(indices[i]), sizeof(T) * mycols);
+ }
+ return ErrorCode::Success;
+ }
+
+ virtual ErrorCode Refine(const std::vector& indices, std::shared_ptr output) const
+ {
+ SizeType newrows = (SizeType)(indices.size());
+ IOBINARY(output, WriteBinary, sizeof(SizeType), (char*)&newrows);
+ IOBINARY(output, WriteBinary, sizeof(DimensionType), (char*)&mycols);
+
+ for (SizeType i = 0; i < newrows; i++) {
+ IOBINARY(output, WriteBinary, sizeof(T) * mycols, (char*)At(indices[i]));
+ }
+ SPTAGLIB_LOG(Helper::LogLevel::LL_Info, "Save Refine %s (%d,%d) Finish!\n", name.c_str(), newrows, C());
+ return ErrorCode::Success;
+ }
+
+ virtual ErrorCode Refine(const std::vector& indices, std::string sDataPointsFileName) const
+ {
+ SPTAGLIB_LOG(Helper::LogLevel::LL_Info, "Save Refine %s To %s\n", name.c_str(), sDataPointsFileName.c_str());
+ auto ptr = f_createIO();
+ if (ptr == nullptr || !ptr->Initialize(sDataPointsFileName.c_str(), std::ios::binary | std::ios::out)) return ErrorCode::FailedCreateFile;
+ return Refine(indices, ptr);
+ }
+ };
+
+ // Loads a vector file and a graph file into two datasets that use a common row stride (totalC bytes):
+ // pVectors at byte offset 0 of each row and pGraph at byte offset sizeof(T) * VC.
+ // Both datasets receive the same incBlocks list.
+ // Returns DiskIOFail when the graph's row count or column count does not match VR / pNeighborhoodSize.
+ // NOTE(review): pVectors is initialized with shareOwnership_ = true and pGraph with false. Per the
+ // visible Dataset::Initialize logic (data_ == nullptr || !shareOwnership_ => allocate and copy), pGraph
+ // would get its own copy of the buffer instead of aliasing `data`, and `data` itself would have no
+ // owner - confirm the intended ownership flags.
+ template
+ ErrorCode LoadOptDatasets(std::shared_ptr pVectorsInput, std::shared_ptr pGraphInput,
+ Dataset& pVectors, Dataset& pGraph, DimensionType pNeighborhoodSize,
+ SizeType blockSize, SizeType capacity) {
+ SizeType VR, GR;
+ DimensionType VC, GC;
+ IOBINARY(pVectorsInput, ReadBinary, sizeof(SizeType), (char*)&VR);
+ IOBINARY(pVectorsInput, ReadBinary, sizeof(DimensionType), (char*)&VC);
+ // Per-row byte size of one vector plus its neighbor list, rounded up by ALIGN_ROUND.
+ DimensionType totalC = ALIGN_ROUND(sizeof(T) * VC + sizeof(SizeType) * pNeighborhoodSize);
+
+ SPTAGLIB_LOG(Helper::LogLevel::LL_Info, "OPT TotalC: %d\n", totalC);
+ char* data = (char*)ALIGN_ALLOC(((size_t)totalC) * VR);
+ std::shared_ptr> incBlocks(new std::vector());
+
+ pVectors.Initialize(VR, VC, blockSize, capacity, data, true, incBlocks, 0, totalC);
+ pVectors.SetName("Opt" + pVectors.Name());
+ for (SizeType i = 0; i < VR; i++) {
+ IOBINARY(pVectorsInput, ReadBinary, sizeof(T) * VC, (char*)(pVectors.At(i)));
+ }
+
+ IOBINARY(pGraphInput, ReadBinary, sizeof(SizeType), (char*)&GR);
+ IOBINARY(pGraphInput, ReadBinary, sizeof(DimensionType), (char*)&GC);
+ if (GR != VR || GC != pNeighborhoodSize) return ErrorCode::DiskIOFail;
+
+ pGraph.Initialize(GR, GC, blockSize, capacity, data, false, incBlocks, sizeof(T) * VC, totalC);
+ pGraph.SetName("Opt" + pGraph.Name());
+ for (SizeType i = 0; i < VR; i++) {
+ IOBINARY(pGraphInput, ReadBinary, sizeof(SizeType) * GC, (char*)(pGraph.At(i)));
+ }
+
+ SPTAGLIB_LOG(Helper::LogLevel::LL_Info, "Load %s (%d,%d) Finish!\n", pVectors.Name().c_str(), pVectors.R(), pVectors.C());
+ SPTAGLIB_LOG(Helper::LogLevel::LL_Info, "Load %s (%d,%d) Finish!\n", pGraph.Name().c_str(), pGraph.R(), pGraph.C());
+
+ return ErrorCode::Success;
+ }
}
}
diff --git a/AnnService/inc/Core/Common/FineGrainedLock.h b/AnnService/inc/Core/Common/FineGrainedLock.h
index 98659dab2..61f3fda0f 100644
--- a/AnnService/inc/Core/Common/FineGrainedLock.h
+++ b/AnnService/inc/Core/Common/FineGrainedLock.h
@@ -38,6 +38,32 @@ namespace SPTAG
return ((unsigned)(idx * 99991) + _rotl(idx, 2) + 101) & PoolSize;
}
};
+
+ // Fixed-size pool of std::shared_timed_mutex objects addressed by a hashed id.
+ // The pool holds PoolSize + 1 mutexes; ids are hashed into that range, so different ids
+ // can map to the same mutex.
+ class FineGrainedRWLock {
+ public:
+ FineGrainedRWLock() {
+ m_locks.reset(new std::shared_timed_mutex[PoolSize + 1]);
+ }
+ ~FineGrainedRWLock() {}
+
+ // Returns the mutex assigned to idx.
+ std::shared_timed_mutex& operator[](SizeType idx) {
+ unsigned index = hash_func((unsigned)idx);
+ return m_locks[index];
+ }
+
+ // Const overload of the lookup above.
+ const std::shared_timed_mutex& operator[](SizeType idx) const {
+ unsigned index = hash_func((unsigned)idx);
+ return m_locks[index];
+ }
+
+ // Maps idx to a slot in [0, PoolSize]; PoolSize is used as a bit mask.
+ inline unsigned hash_func(unsigned idx) const
+ {
+ return ((unsigned)(idx * 99991) + _rotl(idx, 2) + 101) & PoolSize;
+ }
+ private:
+ // Mask value (2^15 - 1); the pool therefore contains 32768 mutexes.
+ static const int PoolSize = 32767;
+ std::unique_ptr m_locks;
+ };
}
}
diff --git a/AnnService/inc/Core/Common/OPQQuantizer.h b/AnnService/inc/Core/Common/OPQQuantizer.h
index 250a29f7e..533fb3ea7 100644
--- a/AnnService/inc/Core/Common/OPQQuantizer.h
+++ b/AnnService/inc/Core/Common/OPQQuantizer.h
@@ -207,4 +207,4 @@ namespace SPTAG
}
}
-#endif _SPTAG_COMMON_OPQQUANTIZER_H_
+#endif // _SPTAG_COMMON_OPQQUANTIZER_H_
diff --git a/AnnService/inc/Core/Common/PostingSizeRecord.h b/AnnService/inc/Core/Common/PostingSizeRecord.h
new file mode 100644
index 000000000..8577a64f6
--- /dev/null
+++ b/AnnService/inc/Core/Common/PostingSizeRecord.h
@@ -0,0 +1,110 @@
+// Copyright (c) Microsoft Corporation. All rights reserved.
+// Licensed under the MIT License.
+
+#ifndef _SPTAG_COMMON_POSTINGSIZERECORD_H_
+#define _SPTAG_COMMON_POSTINGSIZERECORD_H_
+
+#include
+#include "Dataset.h"
+
+namespace SPTAG
+{
+ namespace COMMON
+ {
+ class PostingSizeRecord
+ {
+ private:
+ Dataset m_data;
+
+ public:
+ PostingSizeRecord()
+ {
+ m_data.SetName("PostingSizeRecord");
+ }
+
+ void Initialize(SizeType size, SizeType blockSize, SizeType capacity)
+ {
+ m_data.Initialize(size, 1, blockSize, capacity);
+ }
+
+ inline int GetSize(const SizeType& headID)
+ {
+ return *m_data[headID];
+ }
+
+ inline bool UpdateSize(const SizeType& headID, int newSize)
+ {
+ while (true) {
+ int oldSize = GetSize(headID);
+ if (InterlockedCompareExchange((unsigned*)m_data[headID], (unsigned)newSize, (unsigned)oldSize) == oldSize) {
+ return true;
+ }
+ }
+ }
+
+ inline bool IncSize(const SizeType& headID, int appendNum)
+ {
+ while (true) {
+ int oldSize = GetSize(headID);
+ int newSize = oldSize + appendNum;
+ if (InterlockedCompareExchange((unsigned*)m_data[headID], (unsigned)newSize, (unsigned)oldSize) == oldSize) {
+ return true;
+ }
+ }
+ }
+
+ inline SizeType GetPostingNum()
+ {
+ return m_data.R();
+ }
+
+ inline ErrorCode Save(std::shared_ptr output)
+ {
+ return m_data.Save(output);
+ }
+
+ inline ErrorCode Save(const std::string& filename)
+ {
+ SPTAGLIB_LOG(Helper::LogLevel::LL_Info, "Save %s To %s\n", m_data.Name().c_str(), filename.c_str());
+ auto ptr = f_createIO();
+ if (ptr == nullptr || !ptr->Initialize(filename.c_str(), std::ios::binary | std::ios::out)) return ErrorCode::FailedCreateFile;
+ return Save(ptr);
+ }
+
+ inline ErrorCode Load(std::shared_ptr input, SizeType blockSize, SizeType capacity)
+ {
+ return m_data.Load(input, blockSize, capacity);
+ }
+
+ inline ErrorCode Load(const std::string& filename, SizeType blockSize, SizeType capacity)
+ {
+ SPTAGLIB_LOG(Helper::LogLevel::LL_Info, "Load %s From %s\n", m_data.Name().c_str(), filename.c_str());
+ auto ptr = f_createIO();
+ if (ptr == nullptr || !ptr->Initialize(filename.c_str(), std::ios::binary | std::ios::in)) return ErrorCode::FailedOpenFile;
+ return Load(ptr, blockSize, capacity);
+ }
+
+ // Loads the record from a memory image laid out exactly as Save() writes it.
+ // Save() emits only the dataset (row count, column count, rows) with no leading counter,
+ // so the image is parsed from its first byte.
+ inline ErrorCode Load(char* pmemoryFile, SizeType blockSize, SizeType capacity)
+ {
+ return m_data.Load(pmemoryFile, blockSize, capacity);
+ }
+
+ inline ErrorCode AddBatch(SizeType num)
+ {
+ return m_data.AddBatch(num);
+ }
+
+ // Returns the dataset's buffer size plus sizeof(SizeType).
+ // NOTE(review): Save() in this class writes only m_data, with no extra SizeType field, so the
+ // additional sizeof(SizeType) looks like an over-estimate - confirm against the callers.
+ inline std::uint64_t BufferSize() const
+ {
+ return m_data.BufferSize() + sizeof(SizeType);
+ }
+
+ inline void SetR(SizeType num)
+ {
+ m_data.SetR(num);
+ }
+ };
+ }
+}
+
+#endif // _SPTAG_COMMON_POSTINGSIZERECORD_H_
diff --git a/AnnService/inc/Core/Common/VersionLabel.h b/AnnService/inc/Core/Common/VersionLabel.h
new file mode 100644
index 000000000..2ff2166c4
--- /dev/null
+++ b/AnnService/inc/Core/Common/VersionLabel.h
@@ -0,0 +1,126 @@
+// Copyright (c) Microsoft Corporation. All rights reserved.
+// Licensed under the MIT License.
+
+#ifndef _SPTAG_COMMON_VERSIONLABEL_H_
+#define _SPTAG_COMMON_VERSIONLABEL_H_
+
+#include
+#include "Dataset.h"
+
+namespace SPTAG
+{
+ namespace COMMON
+ {
+ class VersionLabel
+ {
+ private:
+ std::atomic m_deleted;
+ Dataset m_data;
+
+ public:
+ VersionLabel()
+ {
+ m_deleted = 0;
+ m_data.SetName("versionLabelID");
+ }
+
+ void Initialize(SizeType size, SizeType blockSize, SizeType capacity)
+ {
+ m_data.Initialize(size, 1, blockSize, capacity);
+ }
+
+ inline size_t Count() const { return m_data.R() - m_deleted.load(); }
+
+ inline size_t GetDeleteCount() const { return m_deleted.load();}
+
+ inline bool Deleted(const SizeType& key) const
+ {
+ return *m_data[key] == 0xfe;
+ }
+
+ inline bool Delete(const SizeType& key)
+ {
+ uint8_t oldvalue = (uint8_t)InterlockedExchange8((char*)(m_data[key]), (char)0xfe);
+ if (oldvalue == 0xfe) return false;
+ m_deleted++;
+ return true;
+ }
+
+ inline uint8_t GetVersion(const SizeType& key)
+ {
+ return *m_data[key];
+ }
+
+ // Advances the version stored for key and writes the new value to *newVersion.
+ // Returns false if the entry carries the deleted marker (0xfe); otherwise retries the
+ // compare-and-exchange until it succeeds and returns true.
+ inline bool IncVersion(const SizeType& key, uint8_t* newVersion)
+ {
+ while (true) {
+ if (Deleted(key)) return false;
+ uint8_t oldVersion = GetVersion(key);
+ // Versions wrap within 7 bits, so the result is always in [0, 0x7f] and never equals 0xfe.
+ *newVersion = (oldVersion+1) & 0x7f;
+ // Succeeds only if the stored value is still oldVersion; otherwise loop and re-read.
+ if (((uint8_t)InterlockedCompareExchange((char*)m_data[key], (char)*newVersion, (char)oldVersion)) == oldVersion) {
+ return true;
+ }
+ }
+ }
+
+ inline SizeType GetVectorNum()
+ {
+ return m_data.R();
+ }
+
+ inline ErrorCode Save(std::shared_ptr output)
+ {
+ SizeType deleted = m_deleted.load();
+ IOBINARY(output, WriteBinary, sizeof(SizeType), (char*)&deleted);
+ return m_data.Save(output);
+ }
+
+ inline ErrorCode Save(const std::string& filename)
+ {
+ SPTAGLIB_LOG(Helper::LogLevel::LL_Info, "Save %s To %s\n", m_data.Name().c_str(), filename.c_str());
+ auto ptr = f_createIO();
+ if (ptr == nullptr || !ptr->Initialize(filename.c_str(), std::ios::binary | std::ios::out)) return ErrorCode::FailedCreateFile;
+ return Save(ptr);
+ }
+
+ inline ErrorCode Load(std::shared_ptr input, SizeType blockSize, SizeType capacity)
+ {
+ SizeType deleted;
+ IOBINARY(input, ReadBinary, sizeof(SizeType), (char*)&deleted);
+ m_deleted = deleted;
+ return m_data.Load(input, blockSize, capacity);
+ }
+
+ inline ErrorCode Load(const std::string& filename, SizeType blockSize, SizeType capacity)
+ {
+ SPTAGLIB_LOG(Helper::LogLevel::LL_Info, "Load %s From %s\n", m_data.Name().c_str(), filename.c_str());
+ auto ptr = f_createIO();
+ if (ptr == nullptr || !ptr->Initialize(filename.c_str(), std::ios::binary | std::ios::in)) return ErrorCode::FailedOpenFile;
+ return Load(ptr, blockSize, capacity);
+ }
+
+ inline ErrorCode Load(char* pmemoryFile, SizeType blockSize, SizeType capacity)
+ {
+ m_deleted = *((SizeType*)pmemoryFile);
+ return m_data.Load(pmemoryFile + sizeof(SizeType), blockSize, capacity);
+ }
+
+ inline ErrorCode AddBatch(SizeType num)
+ {
+ return m_data.AddBatch(num);
+ }
+
+ inline std::uint64_t BufferSize() const
+ {
+ return m_data.BufferSize() + sizeof(SizeType);
+ }
+
+ inline void SetR(SizeType num)
+ {
+ m_data.SetR(num);
+ }
+ };
+ }
+}
+
+#endif // _SPTAG_COMMON_VERSIONLABEL_H_
diff --git a/AnnService/inc/Core/SPANN/ExtraDynamicSearcher.h b/AnnService/inc/Core/SPANN/ExtraDynamicSearcher.h
new file mode 100644
index 000000000..7618e050c
--- /dev/null
+++ b/AnnService/inc/Core/SPANN/ExtraDynamicSearcher.h
@@ -0,0 +1,1782 @@
+// Copyright (c) Microsoft Corporation. All rights reserved.
+// Licensed under the MIT License.
+
+#ifndef _SPTAG_SPANN_EXTRADYNAMICSEARCHER_H_
+#define _SPTAG_SPANN_EXTRADYNAMICSEARCHER_H_
+
+#include "inc/Helper/VectorSetReader.h"
+#include "inc/Helper/AsyncFileReader.h"
+#include "IExtraSearcher.h"
+#include "ExtraStaticSearcher.h"
+#include "inc/Core/Common/TruthSet.h"
+#include "inc/Helper/KeyValueIO.h"
+#include "inc/Core/Common/FineGrainedLock.h"
+#include "PersistentBuffer.h"
+#include "inc/Core/Common/PostingSizeRecord.h"
+#include "ExtraSPDKController.h"
+#include
+#include
+#include