diff --git a/Cargo.lock b/Cargo.lock index 71a48056..831c2b45 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -363,6 +363,7 @@ dependencies = [ name = "base" version = "0.0.0" dependencies = [ + "base_macros", "bytemuck", "c", "detect", @@ -372,6 +373,7 @@ dependencies = [ "rand", "serde", "thiserror", + "toml", "uuid", "validator", ] @@ -382,6 +384,15 @@ version = "0.21.7" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "9d297deb1925b89f2ccc13d7635fa0714f12c87adce1c75356b39ca9b7178567" +[[package]] +name = "base_macros" +version = "0.0.0" +dependencies = [ + "proc-macro2", + "quote", + "syn 2.0.55", +] + [[package]] name = "basic-cookies" version = "0.1.5" @@ -2949,9 +2960,9 @@ dependencies = [ [[package]] name = "validator" -version = "0.17.0" +version = "0.18.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "da339118f018cc70ebf01fafc103360528aad53717e4bf311db929cb01cb9345" +checksum = "ecda4130ab69f138bc9ec971ac01c173ce053d993cc600eb01633be50a8f0b1a" dependencies = [ "idna", "once_cell", @@ -2965,15 +2976,15 @@ dependencies = [ [[package]] name = "validator_derive" -version = "0.17.0" +version = "0.18.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "76e88ea23b8f5e59230bff8a2f03c0ee0054a61d5b8343a38946bcd406fe624c" +checksum = "c1829bd6a78a15a6a689dd17921ad614e281224a34b233b15be4a11affa61c1b" dependencies = [ "darling", + "once_cell", "proc-macro-error", "proc-macro2", "quote", - "regex", "syn 2.0.55", ] diff --git a/Cargo.toml b/Cargo.toml index 30924014..59d37f30 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -36,7 +36,7 @@ thiserror.workspace = true tikv-jemallocator = { version = "0.5.4", features = [ "disable_initial_exec_tls", ] } -toml = "0.8.10" +toml.workspace = true validator.workspace = true base = { path = "crates/base" } @@ -87,8 +87,9 @@ rustix = { version = "0.38.31", features = ["fs", "mm", "net"] } serde = "1" serde_json = "1" thiserror = "1" +toml = "0.8.10" uuid = { version = "1.7.0", features = ["serde", "v4"] } -validator = { version = "0.17.0", features = ["derive"] } +validator = { version = "0.18.0", features = ["derive"] } [workspace.lints] rust.unsafe_op_in_unsafe_fn = "forbid" diff --git a/crates/base/Cargo.toml b/crates/base/Cargo.toml index a0eb840d..1fec0c8e 100644 --- a/crates/base/Cargo.toml +++ b/crates/base/Cargo.toml @@ -11,9 +11,11 @@ num-traits.workspace = true rand.workspace = true serde.workspace = true thiserror.workspace = true +toml.workspace = true uuid.workspace = true validator.workspace = true +base_macros = { path = "../base_macros" } c = { path = "../c" } detect = { path = "../detect" } diff --git a/crates/base/src/index.rs b/crates/base/src/index.rs index ae3222a7..cac34e8b 100644 --- a/crates/base/src/index.rs +++ b/crates/base/src/index.rs @@ -1,5 +1,6 @@ use crate::distance::*; use crate::vector::*; +use base_macros::Alter; use serde::{Deserialize, Serialize}; use thiserror::Error; use uuid::Uuid; @@ -8,7 +9,7 @@ use validator::{Validate, ValidationError}; #[must_use] #[derive(Debug, Clone, Error, Serialize, Deserialize)] pub enum CreateError { - #[error("Invalid index options.")] + #[error("Invalid index options: {reason}.")] InvalidIndexOptions { reason: String }, } @@ -81,47 +82,37 @@ pub enum StatError { #[must_use] #[derive(Debug, Clone, Error, Serialize, Deserialize)] pub enum AlterError { - #[error("Setting key {key} is not exist.")] - BadKey { key: String }, - #[error("Setting key {key} has a wrong value {value}.")] - BadValue { key: String, 
value: String }, #[error("Index not found.")] NotExist, + #[error("Key {key} not found.")] + KeyNotExists { key: String }, + #[error("Invalid index options: {reason}.")] + InvalidIndexOptions { reason: String }, } -#[derive(Debug, Clone, Serialize, Deserialize, Validate)] -#[serde(deny_unknown_fields)] -pub struct IndexFlexibleOptions { - #[serde(default = "IndexFlexibleOptions::default_optimizing_threads")] - #[validate(range(min = 1, max = 65535))] - pub optimizing_threads: u16, -} - -impl IndexFlexibleOptions { - pub fn default_optimizing_threads() -> u16 { - 1 - } +#[must_use] +#[derive(Debug, Clone, Error, Serialize, Deserialize)] +pub enum StopError { + #[error("Index not found.")] + NotExist, } -impl Default for IndexFlexibleOptions { - fn default() -> Self { - Self { - optimizing_threads: Self::default_optimizing_threads(), - } - } +#[must_use] +#[derive(Debug, Clone, Error, Serialize, Deserialize)] +pub enum StartError { + #[error("Index not found.")] + NotExist, } #[derive(Debug, Clone, Serialize, Deserialize, Validate)] #[serde(deny_unknown_fields)] #[validate(schema(function = "IndexOptions::validate_index_options"))] pub struct IndexOptions { - #[validate] + #[validate(nested)] pub vector: VectorOptions, - #[validate] + #[validate(nested)] pub segment: SegmentsOptions, - #[validate] - pub optimizing: OptimizingOptions, - #[validate] + #[validate(nested)] pub indexing: IndexingOptions, } @@ -140,13 +131,20 @@ impl IndexOptions { }; if !is_trivial { return Err(ValidationError::new( - "Quantization is not supported for svector, bvector, and vecint8.", + "Quantization is not supported for svector, bvector, and veci8.", )); } Ok(()) } } +#[derive(Debug, Clone, Serialize, Deserialize, Validate, Alter)] +#[serde(deny_unknown_fields)] +pub struct IndexAlterableOptions { + #[validate(nested)] + pub optimizing: OptimizingOptions, +} + #[derive(Debug, Clone, Serialize, Deserialize, Validate)] #[serde(deny_unknown_fields)] #[validate(schema(function = "Self::validate_0"))] @@ -222,9 +220,12 @@ impl Default for SegmentsOptions { } } -#[derive(Debug, Clone, Serialize, Deserialize, Validate)] +#[derive(Debug, Clone, Serialize, Deserialize, Validate, Alter)] #[serde(deny_unknown_fields)] pub struct OptimizingOptions { + #[serde(default = "OptimizingOptions::default_optimizing_threads")] + #[validate(range(min = 1, max = 65535))] + pub optimizing_threads: u16, #[serde(default = "OptimizingOptions::default_sealing_secs")] #[validate(range(min = 1, max = 60))] pub sealing_secs: u64, @@ -237,6 +238,9 @@ pub struct OptimizingOptions { } impl OptimizingOptions { + fn default_optimizing_threads() -> u16 { + 1 + } fn default_sealing_secs() -> u64 { 60 } @@ -251,6 +255,7 @@ impl OptimizingOptions { impl Default for OptimizingOptions { fn default() -> Self { Self { + optimizing_threads: Self::default_optimizing_threads(), sealing_secs: Self::default_sealing_secs(), sealing_size: Self::default_sealing_size(), delete_threshold: Self::default_delete_threshold(), @@ -308,7 +313,7 @@ impl Validate for IndexingOptions { #[serde(deny_unknown_fields)] pub struct FlatIndexingOptions { #[serde(default)] - #[validate] + #[validate(nested)] pub quantization: QuantizationOptions, } @@ -336,7 +341,7 @@ pub struct IvfIndexingOptions { #[validate(range(min = 1, max = 1_000_000))] pub nsample: u32, #[serde(default)] - #[validate] + #[validate(nested)] pub quantization: QuantizationOptions, } @@ -375,9 +380,9 @@ pub struct HnswIndexingOptions { pub m: u32, #[serde(default = 
"HnswIndexingOptions::default_ef_construction")] #[validate(range(min = 10, max = 2000))] - pub ef_construction: usize, + pub ef_construction: u32, #[serde(default)] - #[validate] + #[validate(nested)] pub quantization: QuantizationOptions, } @@ -385,7 +390,7 @@ impl HnswIndexingOptions { fn default_m() -> u32 { 12 } - fn default_ef_construction() -> usize { + fn default_ef_construction() -> u32 { 300 } } @@ -492,7 +497,7 @@ impl Default for ProductQuantizationOptionsRatio { pub struct SearchOptions { pub prefilter_enable: bool, #[validate(range(min = 1, max = 65535))] - pub hnsw_ef_search: usize, + pub hnsw_ef_search: u32, #[validate(range(min = 1, max = 1_000_000))] pub ivf_nprobe: u32, } @@ -507,8 +512,30 @@ pub struct IndexStat { #[derive(Debug, Serialize, Deserialize)] pub struct SegmentStat { pub id: Uuid, - #[serde(rename = "type")] - pub typ: String, + pub r#type: String, pub length: usize, pub size: u64, } + +pub trait Alter { + fn alter(&mut self, key: &[&str], value: &str) -> Result<(), AlterError>; +} + +macro_rules! impl_alter_for { + {$($t:ty)*} => { + $(impl Alter for $t { + fn alter(&mut self, key: &[&str], value: &str) -> Result<(), AlterError> { + use std::str::FromStr; + if key.is_empty() { + *self = FromStr::from_str(value).map_err(|_| AlterError::InvalidIndexOptions { reason: "failed to parse".to_string() })?; + return Ok(()); + } + Err(AlterError::KeyNotExists { key: key.join(".") }) + } + })* + }; +} + +impl_alter_for! { + String u8 u16 u32 u64 i8 i16 i32 i64 f32 f64 bool +} diff --git a/crates/base/src/worker.rs b/crates/base/src/worker.rs index 02d36b6a..d6ebf8fe 100644 --- a/crates/base/src/worker.rs +++ b/crates/base/src/worker.rs @@ -3,7 +3,12 @@ use crate::search::*; use crate::vector::*; pub trait WorkerOperations { - fn create(&self, handle: Handle, options: IndexOptions) -> Result<(), CreateError>; + fn create( + &self, + handle: Handle, + options: IndexOptions, + alterable_options: IndexAlterableOptions, + ) -> Result<(), CreateError>; fn drop(&self, handle: Handle) -> Result<(), DropError>; fn flush(&self, handle: Handle) -> Result<(), FlushError>; fn insert( @@ -17,7 +22,9 @@ pub trait WorkerOperations { fn view_vbase(&self, handle: Handle) -> Result; fn view_list(&self, handle: Handle) -> Result; fn stat(&self, handle: Handle) -> Result; - fn alter(&self, handle: Handle, key: String, value: String) -> Result<(), AlterError>; + fn alter(&self, handle: Handle, key: &str, value: &str) -> Result<(), AlterError>; + fn stop(&self, handle: Handle) -> Result<(), StopError>; + fn start(&self, handle: Handle) -> Result<(), StartError>; } pub trait ViewBasicOperations { diff --git a/crates/base_macros/Cargo.toml b/crates/base_macros/Cargo.toml new file mode 100644 index 00000000..44680a5f --- /dev/null +++ b/crates/base_macros/Cargo.toml @@ -0,0 +1,21 @@ +[package] +name = "base_macros" +version.workspace = true +edition.workspace = true + +[lib] +proc-macro = true + +[dependencies] +proc-macro2 = { version = "1.0.79", features = ["proc-macro"] } +quote = "1.0.35" +syn = { version = "2.0.53", default-features = false, features = [ + "clone-impls", + "full", + "parsing", + "printing", + "proc-macro", +] } + +[lints] +workspace = true diff --git a/crates/base_macros/src/lib.rs b/crates/base_macros/src/lib.rs new file mode 100644 index 00000000..31e72658 --- /dev/null +++ b/crates/base_macros/src/lib.rs @@ -0,0 +1,77 @@ +use quote::quote; +use syn::{parse_macro_input, Data, DeriveInput, Fields}; + +#[proc_macro_derive(Alter)] +pub fn alter(input: 
proc_macro::TokenStream) -> proc_macro::TokenStream { + let input = parse_macro_input!(input as DeriveInput); + let t = input.ident.clone(); + match input.data { + Data::Struct(data_struct) => { + assert!( + data_struct.semi_token.is_none(), + "unit struct is not supported" + ); + let mut idents = Vec::new(); + for field in data_struct.fields { + idents.push(field.ident.clone().expect("tuple struct is not supported")); + } + proc_macro::TokenStream::from(quote! { + impl Alter for #t { + fn alter(&mut self, key: &[&str], value: &str) -> Result<(), AlterError> { + if key.is_empty() { + *self = toml::from_str(value).map_err(|e| AlterError::InvalidIndexOptions { reason: e.to_string() })?; + return Ok(()); + } + match key[0] { + #(stringify!(#idents) => Alter::alter(&mut self.#idents, &key[1..], value),)* + _ => Err(AlterError::KeyNotExists { key: key.join(".") }), + } + } + } + }) + } + Data::Enum(data_enum) => { + let mut idents = Vec::new(); + let mut is_unit = false; + let mut is_unnamed = false; + for variant in data_enum.variants { + idents.push(variant.ident.clone()); + match variant.fields { + Fields::Named(_) => panic!("named fields in enum is not supported"), + Fields::Unnamed(_) => is_unnamed = true, + Fields::Unit => is_unit = true, + } + } + match (is_unit, is_unnamed) { + (true, true) => panic!("both unit and unnamed fields in enum is not supported"), + (false, false) => panic!("only inhabited enum is supported"), + (true, _) => proc_macro::TokenStream::from(quote! { + impl Alter for #t { + fn alter(&mut self, key: &[&str], value: &str) -> Result<(), AlterError> { + if key.is_empty() { + *self = toml::from_str(value).map_err(|e| AlterError::InvalidIndexOptions { reason: e.to_string() })?; + return Ok(()); + } + Err(AlterError::KeyNotExists { key: key.join(".") }) + } + } + }), + (_, true) => proc_macro::TokenStream::from(quote! 
{ + impl Alter for #t { + fn alter(&mut self, key: &[&str], value: &str) -> Result<(), AlterError> { + if key.is_empty() { + *self = toml::from_str(value).map_err(|e| AlterError::InvalidIndexOptions { reason: e.to_string() })?; + return Ok(()); + } + match self { + #(Self::#idents(x) if stringify!(#idents).to_lowercase() == key[0] => Alter::alter(x, &key[1..], value),)* + _ => Err(AlterError::KeyNotExists { key: key.join(".") }), + } + } + } + }), + } + } + Data::Union(_) => panic!("union is not supported"), + } +} diff --git a/crates/hnsw/src/lib.rs b/crates/hnsw/src/lib.rs index 68fae2d4..1b17ea2f 100644 --- a/crates/hnsw/src/lib.rs +++ b/crates/hnsw/src/lib.rs @@ -349,7 +349,7 @@ pub fn make>( &mut visited, target, u, - ef_construction, + ef_construction as usize, j, ); edges.sort(); @@ -449,7 +449,7 @@ pub fn open(path: &Path, options: IndexOptions) -> HnswMmap pub fn basic( mmap: &HnswMmap, vector: Borrowed<'_, O>, - ef_search: usize, + ef_search: u32, filter: impl Filter, ) -> BinaryHeap> { let Some(s) = entry(mmap, filter.clone()) else { @@ -457,13 +457,13 @@ pub fn basic( }; let levels = count_layers_of_a_vertex(mmap.m, s) - 1; let u = fast_search(mmap, 1..=levels, s, vector, filter.clone()); - local_search_basic(mmap, ef_search, u, vector, filter).into_reversed_heap() + local_search_basic(mmap, ef_search as usize, u, vector, filter).into_reversed_heap() } pub fn vbase<'a, O: OperatorHnsw>( mmap: &'a HnswMmap, vector: Borrowed<'a, O>, - range: usize, + ef_search: u32, filter: impl Filter + 'a, ) -> (Vec, Box<(dyn Iterator + 'a)>) { let Some(s) = entry(mmap, filter.clone()) else { @@ -472,14 +472,14 @@ pub fn vbase<'a, O: OperatorHnsw>( let levels = count_layers_of_a_vertex(mmap.m, s) - 1; let u = fast_search(mmap, 1..=levels, s, vector, filter.clone()); let mut iter = local_search_vbase(mmap, u, vector, filter.clone()); - let mut queue = BinaryHeap::::with_capacity(1 + range); + let mut queue = BinaryHeap::::with_capacity(1 + ef_search as usize); let mut stage1 = Vec::new(); for x in &mut iter { - if queue.len() == range && queue.peek().unwrap().distance < x.distance { + if queue.len() == ef_search as usize && queue.peek().unwrap().distance < x.distance { stage1.push(x); break; } - if queue.len() == range { + if queue.len() == ef_search as usize { queue.pop(); } queue.push(x); diff --git a/crates/index/src/lib.rs b/crates/index/src/lib.rs index 821a5d83..2321763b 100644 --- a/crates/index/src/lib.rs +++ b/crates/index/src/lib.rs @@ -11,8 +11,7 @@ mod utils; use self::delete::Delete; use self::segments::growing::GrowingSegment; use self::segments::sealed::SealedSegment; -use crate::optimizing::indexing::OptimizerIndexing; -use crate::optimizing::sealing::OptimizerSealing; +use crate::optimizing::Optimizing; use crate::utils::tournament_tree::LoserTree; use arc_swap::ArcSwap; pub use base::distance::*; @@ -34,7 +33,6 @@ use std::collections::HashMap; use std::collections::HashSet; use std::convert::Infallible; use std::path::PathBuf; -use std::str::FromStr; use std::sync::Arc; use std::thread::JoinHandle; use std::time::Instant; @@ -55,20 +53,28 @@ pub struct Index { delete: Arc, protect: Mutex>, view: ArcSwap>, - instant_index: AtomicCell, - instant_write: AtomicCell, - background_indexing: Mutex, JoinHandle<()>)>>, - background_sealing: Mutex, JoinHandle<()>)>>, + instant_indexed: AtomicCell, + instant_written: AtomicCell, + optimizing: Mutex, JoinHandle<()>)>>, _tracker: Arc, } impl Index { - pub fn create(path: PathBuf, options: IndexOptions) -> Result, CreateError> { + pub fn 
create( + path: PathBuf, + options: IndexOptions, + alterable_options: IndexAlterableOptions, + ) -> Result, CreateError> { if let Err(err) = options.validate() { return Err(CreateError::InvalidIndexOptions { reason: err.to_string(), }); } + if let Err(e) = alterable_options.validate() { + return Err(CreateError::InvalidIndexOptions { + reason: e.to_string(), + }); + } std::fs::create_dir(&path).unwrap(); std::fs::write( path.join("options"), @@ -81,7 +87,7 @@ impl Index { IndexStartup { sealeds: HashSet::new(), growings: HashSet::new(), - flexible: IndexFlexibleOptions::default(), + alterable_options: alterable_options.clone(), }, ); let delete = Delete::create(path.join("delete")); @@ -95,19 +101,19 @@ impl Index { sealed: HashMap::new(), growing: HashMap::new(), write: None, + alterable_options: alterable_options.clone(), }), view: ArcSwap::new(Arc::new(IndexView { options: options.clone(), - flexible: IndexFlexibleOptions::default(), + alterable_options: alterable_options.clone(), sealed: HashMap::new(), growing: HashMap::new(), delete: delete.clone(), write: None, })), - instant_index: AtomicCell::new(Instant::now()), - instant_write: AtomicCell::new(Instant::now()), - background_indexing: Mutex::new(None), - background_sealing: Mutex::new(None), + instant_indexed: AtomicCell::new(Instant::now()), + instant_written: AtomicCell::new(Instant::now()), + optimizing: Mutex::new(None), _tracker: Arc::new(IndexTracker { path }), }); Ok(index) @@ -119,7 +125,7 @@ impl Index { .unwrap(); let tracker = Arc::new(IndexTracker { path: path.clone() }); let startup = FileAtomic::::open(path.join("startup")); - let flexible = startup.get().flexible.clone(); + let alterable_options = startup.get().alterable_options.clone(); clean( path.join("segments"), startup @@ -171,19 +177,19 @@ impl Index { sealed: sealed.clone(), growing: growing.clone(), write: None, + alterable_options: alterable_options.clone(), }), view: ArcSwap::new(Arc::new(IndexView { options: options.clone(), - flexible, + alterable_options: alterable_options.clone(), delete: delete.clone(), sealed, growing, write: None, })), - instant_index: AtomicCell::new(Instant::now()), - instant_write: AtomicCell::new(Instant::now()), - background_indexing: Mutex::new(None), - background_sealing: Mutex::new(None), + instant_indexed: AtomicCell::new(Instant::now()), + instant_written: AtomicCell::new(Instant::now()), + optimizing: Mutex::new(None), _tracker: tracker, }) } @@ -193,21 +199,18 @@ impl Index { pub fn view(&self) -> Arc> { self.view.load_full() } - pub fn alter(self: &Arc, key: String, value: String) -> Result<(), AlterError> { + pub fn alter(self: &Arc, key: &str, value: &str) -> Result<(), AlterError> { let mut protect = self.protect.lock(); - match key.as_str() { - "optimizing.threads" => { - let parsed = i32::from_str(value.as_str()) - .map_err(|_e| AlterError::BadValue { key, value })?; - let optimizing_threads = match parsed { - 0 => IndexFlexibleOptions::default_optimizing_threads(), - threads_limit => threads_limit as u16, - }; - protect.flexible_set(IndexFlexibleOptions { optimizing_threads }); - protect.maintain(self.options.clone(), self.delete.clone(), &self.view); - } - _ => return Err(AlterError::BadKey { key }), - }; + let mut alterable_options = protect.alterable_options.clone(); + let key = key.split('.').collect::>(); + alterable_options.alter(key.as_slice(), value)?; + if let Err(e) = alterable_options.validate() { + return Err(AlterError::InvalidIndexOptions { + reason: e.to_string(), + }); + } + 
protect.alterable_options = alterable_options; + protect.maintain(self.options.clone(), self.delete.clone(), &self.view); Ok(()) } pub fn refresh(&self) { @@ -230,7 +233,7 @@ impl Index { ); protect.write = Some((write_segment_uuid, write_segment)); protect.maintain(self.options.clone(), self.delete.clone(), &self.view); - self.instant_write.store(Instant::now()); + self.instant_written.store(Instant::now()); } pub fn seal(&self, check: Uuid) { let mut protect = self.protect.lock(); @@ -243,12 +246,12 @@ impl Index { } protect.write = None; protect.maintain(self.options.clone(), self.delete.clone(), &self.view); - self.instant_write.store(Instant::now()); + self.instant_written.store(Instant::now()); } pub fn stat(&self) -> IndexStat { let view = self.view(); IndexStat { - indexing: self.instant_index.load() < self.instant_write.load(), + indexing: self.instant_indexed.load() < self.instant_written.load(), options: self.options().clone(), segments: { let mut segments = Vec::new(); @@ -266,33 +269,16 @@ impl Index { } } pub fn start(self: &Arc) { - { - let mut background_indexing = self.background_indexing.lock(); - if background_indexing.is_none() { - *background_indexing = Some(OptimizerIndexing::new(self.clone()).spawn()); - } - } - { - let mut background_sealing = self.background_sealing.lock(); - if background_sealing.is_none() { - *background_sealing = Some(OptimizerSealing::new(self.clone()).spawn()); - } + let mut optimizing = self.optimizing.lock(); + if optimizing.is_none() { + *optimizing = Some(Optimizing::new(self.clone()).spawn()); } } pub fn stop(&self) { - { - let mut background_indexing = self.background_indexing.lock(); - if let Some((sender, join_handle)) = background_indexing.take() { - drop(sender); - let _ = join_handle.join(); - } - } - { - let mut background_sealing = self.background_sealing.lock(); - if let Some((sender, join_handle)) = background_sealing.take() { - drop(sender); - let _ = join_handle.join(); - } + let mut optimizing = self.optimizing.lock(); + if let Some((sender, join_handle)) = optimizing.take() { + drop(sender); + let _ = join_handle.join(); } } pub fn wait(&self) -> Arc { @@ -300,10 +286,6 @@ impl Index { } } -impl Drop for Index { - fn drop(&mut self) {} -} - #[derive(Debug, Clone)] pub struct IndexTracker { path: PathBuf, @@ -317,7 +299,7 @@ impl Drop for IndexTracker { pub struct IndexView { pub options: IndexOptions, - pub flexible: IndexFlexibleOptions, + pub alterable_options: IndexAlterableOptions, pub delete: Arc, pub sealed: HashMap>>, pub growing: HashMap>>, @@ -532,7 +514,7 @@ impl IndexView { struct IndexStartup { sealeds: HashSet, growings: HashSet, - flexible: IndexFlexibleOptions, + alterable_options: IndexAlterableOptions, } struct IndexProtect { @@ -540,20 +522,19 @@ struct IndexProtect { sealed: HashMap>>, growing: HashMap>>, write: Option<(Uuid, Arc>)>, + alterable_options: IndexAlterableOptions, } impl IndexProtect { - /// Export IndexProtect to IndexView fn maintain( &mut self, options: IndexOptions, delete: Arc, swap: &ArcSwap>, ) { - let old_startup = self.startup.get(); let view = Arc::new(IndexView { options, - flexible: old_startup.flexible.clone(), + alterable_options: self.alterable_options.clone(), delete, sealed: self.sealed.clone(), growing: self.growing.clone(), @@ -565,16 +546,8 @@ impl IndexProtect { self.startup.set(IndexStartup { sealeds: startup_sealeds, growings: startup_growings, - flexible: old_startup.flexible.clone(), + alterable_options: self.alterable_options.clone(), }); swap.swap(view); } - fn 
flexible_set(&mut self, flexible: IndexFlexibleOptions) { - let src = self.startup.get(); - self.startup.set(IndexStartup { - sealeds: src.sealeds.clone(), - growings: src.sealeds.clone(), - flexible, - }); - } } diff --git a/crates/index/src/optimizing/index_source.rs b/crates/index/src/optimizing/index_source.rs new file mode 100644 index 00000000..acb8ed43 --- /dev/null +++ b/crates/index/src/optimizing/index_source.rs @@ -0,0 +1,71 @@ +use crate::Op; +use crate::{GrowingSegment, SealedSegment}; +use base::index::IndexOptions; +use base::operator::Borrowed; +use base::search::*; +use std::sync::Arc; + +pub struct IndexSource { + pub(super) sealed: Option>>, + pub(super) growing: Vec>>, + pub(super) dims: u32, +} + +impl IndexSource { + pub fn new( + options: IndexOptions, + sealed: Option>>, + growing: Vec>>, + ) -> Self { + IndexSource { + sealed, + growing, + dims: options.vector.dims, + } + } +} + +impl Collection for IndexSource { + fn dims(&self) -> u32 { + self.dims + } + + fn len(&self) -> u32 { + self.sealed.iter().map(|x| x.len()).sum::() + + self.growing.iter().map(|x| x.len()).sum::() + } + + fn vector(&self, mut index: u32) -> Borrowed<'_, O> { + for x in self.sealed.iter() { + if index < x.len() { + return x.vector(index); + } + index -= x.len(); + } + for x in self.growing.iter() { + if index < x.len() { + return x.vector(index); + } + index -= x.len(); + } + panic!("Out of bound.") + } + + fn payload(&self, mut index: u32) -> Payload { + for x in self.sealed.iter() { + if index < x.len() { + return x.payload(index); + } + index -= x.len(); + } + for x in self.growing.iter() { + if index < x.len() { + return x.payload(index); + } + index -= x.len(); + } + panic!("Out of bound.") + } +} + +impl Source for IndexSource {} diff --git a/crates/index/src/optimizing/indexing.rs b/crates/index/src/optimizing/indexing.rs index ba851a21..04612e69 100644 --- a/crates/index/src/optimizing/indexing.rs +++ b/crates/index/src/optimizing/indexing.rs @@ -1,265 +1,109 @@ -use crate::GrowingSegment; +use crate::optimizing::index_source::IndexSource; use crate::Index; use crate::Op; use crate::SealedSegment; pub use base::distance::*; pub use base::index::*; -use base::operator::Borrowed; pub use base::search::*; pub use base::vector::*; -use crossbeam::channel::TryRecvError; -use crossbeam::channel::{bounded, Receiver, RecvTimeoutError, Sender}; -use std::cmp::Reverse; -use std::convert::Infallible; use std::sync::Arc; -use std::thread::JoinHandle; -use std::time::Instant; -use thiserror::Error; use uuid::Uuid; -pub struct IndexSource { - sealed: Vec>>, - growing: Vec>>, - dims: u32, -} - -impl IndexSource { - pub fn new( - options: IndexOptions, - sealed: Vec>>, - growing: Vec>>, - ) -> Self { - IndexSource { - sealed, - growing, - dims: options.vector.dims, - } - } -} - -impl Collection for IndexSource { - fn dims(&self) -> u32 { - self.dims - } - - fn len(&self) -> u32 { - self.sealed.iter().map(|x| x.len()).sum::() - + self.growing.iter().map(|x| x.len()).sum::() - } - - fn vector(&self, mut index: u32) -> Borrowed<'_, O> { - for x in self.sealed.iter() { - if index < x.len() { - return x.vector(index); - } - index -= x.len(); - } - for x in self.growing.iter() { - if index < x.len() { - return x.vector(index); - } - index -= x.len(); - } - panic!("Out of bound.") - } - - fn payload(&self, mut index: u32) -> Payload { - for x in self.sealed.iter() { - if index < x.len() { - return x.payload(index); +pub fn scan(index: Arc>) -> Option> { + let (sealed, growing) = 'a: { + let protect = 
index.protect.lock(); + // task 1 approach 1: merge small segments to a big segment + { + let mut counter = 0u64; + let base_segment = { + let mut sealed_segments = protect.sealed.values().collect::>(); + sealed_segments.sort_by_key(|s| s.len()); + let base_segment = sealed_segments.first().cloned(); + counter += base_segment.map(|x| x.len() as u64).unwrap_or_default(); + base_segment.cloned() + }; + let delta_segments = { + let mut growing_segments = protect.growing.values().collect::>(); + growing_segments.sort_by_key(|s| s.len()); + let mut delta_segments = Vec::new(); + for growing_segment in growing_segments.iter().cloned().cloned() { + if counter + growing_segment.len() as u64 + <= index.options.segment.max_sealed_segment_size as u64 + { + counter += growing_segment.len() as u64; + delta_segments.push(growing_segment); + } else { + break; + } + } + delta_segments + }; + if !delta_segments.is_empty() { + break 'a (base_segment, delta_segments); } - index -= x.len(); } - for x in self.growing.iter() { - if index < x.len() { - return x.payload(index); + // task 1 approach 2: merge small segments + { + let mut counter = 0u64; + let delta_segments = { + let mut growing_segments = protect.growing.values().collect::>(); + growing_segments.sort_by_key(|s| s.len()); + let mut delta_segments = Vec::new(); + for growing_segment in growing_segments.iter().cloned().cloned() { + if counter + growing_segment.len() as u64 + <= index.options.segment.max_sealed_segment_size as u64 + { + counter += growing_segment.len() as u64; + delta_segments.push(growing_segment); + } else { + break; + } + } + delta_segments + }; + if !delta_segments.is_empty() { + break 'a (None, delta_segments); } - index -= x.len(); } - panic!("Out of bound.") - } -} - -impl Source for IndexSource {} - -pub struct OptimizerIndexing { - index: Arc>, + return None; + }; + Some(IndexSource::new( + index.options().clone(), + sealed.clone(), + growing.clone(), + )) } -impl OptimizerIndexing { - pub fn new(index: Arc>) -> Self { - Self { index } - } - pub fn spawn(self) -> (Sender, JoinHandle<()>) { - let (tx, rx) = bounded(1); - ( - tx, - std::thread::spawn(move || { - self.main(rx); - }), +pub fn make(index: Arc>, source: IndexSource) { + let next = { + let uuid = Uuid::new_v4(); + SealedSegment::create( + index._tracker.clone(), + index.path.join("segments").join(uuid.to_string()), + uuid, + index.options.clone(), + &source, ) - } - fn main(self, shutdown_rx: Receiver) { - let index = self.index; - loop { - let view = index.view(); - let threads = view.flexible.optimizing_threads; - let (finish_tx, finish_rx) = bounded::(1); - rayon::ThreadPoolBuilder::new() - .num_threads(threads as usize) - .build_scoped(|pool| { - std::thread::scope(|scope| { - let handler = scope.spawn(|| { - let status = monitor(&finish_rx, &shutdown_rx); - match status { - MonitorStatus::Finished => (), - MonitorStatus::Shutdown => pool.stop(), - } - }); - pool.install(|| { - let _finish_tx = finish_tx; - let _ = optimizing_indexing(index.clone()); - }); - let _ = handler.join(); - }) - }) - .unwrap(); - match shutdown_rx.recv_timeout(std::time::Duration::from_secs(60)) { - Ok(never) => match never {}, - Err(RecvTimeoutError::Disconnected) => return, - Err(RecvTimeoutError::Timeout) => (), - } - } - } -} - -pub enum MonitorStatus { - Finished, - Shutdown, -} - -/// Monitor the internal finish and the external shutdown of `optimizing_indexing` -fn monitor(finish_rx: &Receiver, shutdown_rx: &Receiver) -> MonitorStatus { - let timeout = 
std::time::Duration::from_secs(1); - loop { - match finish_rx.try_recv() { - Ok(never) => match never {}, - Err(TryRecvError::Disconnected) => { - return MonitorStatus::Finished; - } - Err(TryRecvError::Empty) => (), - } - match shutdown_rx.recv_timeout(timeout) { - Ok(never) => match never {}, - Err(RecvTimeoutError::Disconnected) => { - return MonitorStatus::Shutdown; - } - Err(RecvTimeoutError::Timeout) => (), - } - } -} - -enum Seg { - Sealed(Arc>), - Growing(Arc>), -} - -impl Seg { - fn uuid(&self) -> Uuid { - use Seg::*; - match self { - Sealed(x) => x.uuid(), - Growing(x) => x.uuid(), - } - } - fn len(&self) -> u32 { - use Seg::*; - match self { - Sealed(x) => x.len(), - Growing(x) => x.len(), + }; + let mut protect = index.protect.lock(); + for sealed in source.sealed.iter() { + if protect.sealed.contains_key(&sealed.uuid()) { + continue; } + return; } - fn get_sealed(&self) -> Option>> { - match self { - Seg::Sealed(x) => Some(x.clone()), - _ => None, + for growing in source.growing.iter() { + if protect.growing.contains_key(&growing.uuid()) { + continue; } + return; } - fn get_growing(&self) -> Option>> { - match self { - Seg::Growing(x) => Some(x.clone()), - _ => None, - } + for sealed in source.sealed.iter() { + protect.sealed.remove(&sealed.uuid()); } -} - -#[derive(Debug, Error)] -#[error("Interrupted, retry again.")] -pub struct RetryError; - -pub fn optimizing_indexing(index: Arc>) -> Result<(), RetryError> { - use Seg::*; - let segs = { - let protect = index.protect.lock(); - let mut segs_0 = Vec::new(); - segs_0.extend(protect.growing.values().map(|x| Growing(x.clone()))); - segs_0.extend(protect.sealed.values().map(|x| Sealed(x.clone()))); - segs_0.sort_by_key(|case| Reverse(case.len())); - let mut segs_1 = Vec::new(); - let mut total = 0u64; - let mut count = 0; - while let Some(seg) = segs_0.pop() { - if total + seg.len() as u64 <= index.options.segment.max_sealed_segment_size as u64 { - total += seg.len() as u64; - if let Growing(_) = seg { - count += 1; - } - segs_1.push(seg); - } else { - break; - } - } - if segs_1.is_empty() || (segs_1.len() == 1 && count == 0) { - index.instant_index.store(Instant::now()); - return Err(RetryError); - } - segs_1 - }; - let sealed_segment = merge(&index, &segs); - { - let mut protect = index.protect.lock(); - for seg in segs.iter() { - if protect.sealed.contains_key(&seg.uuid()) { - continue; - } - if protect.growing.contains_key(&seg.uuid()) { - continue; - } - return Ok(()); - } - for seg in segs.iter() { - protect.sealed.remove(&seg.uuid()); - protect.growing.remove(&seg.uuid()); - } - protect.sealed.insert(sealed_segment.uuid(), sealed_segment); - protect.maintain(index.options.clone(), index.delete.clone(), &index.view); + for growing in source.growing.iter() { + protect.growing.remove(&growing.uuid()); } - Ok(()) -} - -fn merge(index: &Arc>, segs: &[Seg]) -> Arc> { - let sealed = segs.iter().filter_map(|x| x.get_sealed()).collect(); - let growing = segs.iter().filter_map(|x| x.get_growing()).collect(); - let sealed_segment_uuid = Uuid::new_v4(); - let collection = IndexSource::new(index.options().clone(), sealed, growing); - SealedSegment::create( - index._tracker.clone(), - index - .path - .join("segments") - .join(sealed_segment_uuid.to_string()), - sealed_segment_uuid, - index.options.clone(), - &collection, - ) + protect.sealed.insert(next.uuid(), next); + protect.maintain(index.options.clone(), index.delete.clone(), &index.view); } diff --git a/crates/index/src/optimizing/mod.rs b/crates/index/src/optimizing/mod.rs index 
ab4ba8d0..ad0b9a62 100644 --- a/crates/index/src/optimizing/mod.rs +++ b/crates/index/src/optimizing/mod.rs @@ -1,3 +1,127 @@ +pub mod index_source; pub mod indexing; -pub mod sealing; -pub mod vacuum; + +use self::indexing::{make, scan}; +use crate::Index; +use crate::Op; +use crossbeam::channel::{bounded, Receiver, RecvTimeoutError, Sender, TryRecvError}; +use std::collections::BTreeMap; +use std::convert::Infallible; +use std::sync::Arc; +use std::thread::JoinHandle; +use std::time::Duration; +use std::time::Instant; + +pub struct Optimizing { + index: Arc>, +} + +impl Optimizing { + pub fn new(index: Arc>) -> Self { + Self { index } + } + pub fn spawn(self) -> (Sender, JoinHandle<()>) { + let (tx, rx) = bounded(1); + ( + tx, + std::thread::spawn(move || { + self.main(rx); + }), + ) + } + fn main(self, shutdown: Receiver) { + let index = self.index; + let mut tasks = BTreeMap:: Instant>>::new(); + tasks.insert(Instant::now(), { + let index = index.clone(); + let mut check = None; + let mut first = true; + Box::new(move || { + let view = index.view(); + let stamp = view + .write + .as_ref() + .map(|(uuid, segment)| (*uuid, segment.len())); + if first || stamp == check { + if let Some((uuid, len)) = stamp { + if len >= view.alterable_options.optimizing.sealing_size { + index.seal(uuid); + } + } + } else { + check = stamp; + } + first = false; + Instant::now() + Duration::from_secs(view.alterable_options.optimizing.sealing_secs) + }) + }); + tasks.insert( + Instant::now(), + Box::new(|| { + let view = index.view(); + if let Some(source) = scan(index.clone()) { + rayon::ThreadPoolBuilder::new() + .num_threads(view.alterable_options.optimizing.optimizing_threads as usize) + .build_scoped(|pool| { + let (stop_tx, stop_rx) = bounded::(0); + std::thread::scope(|scope| { + scope.spawn(|| { + let stop_rx = stop_rx; + loop { + match stop_rx.try_recv() { + Ok(never) => match never {}, + Err(TryRecvError::Empty) => (), + Err(TryRecvError::Disconnected) => return, + } + match shutdown.recv_timeout(Duration::from_secs(1)) { + Ok(never) => match never {}, + Err(RecvTimeoutError::Timeout) => (), + Err(RecvTimeoutError::Disconnected) => { + pool.stop(); + return; + } + } + } + }); + scope.spawn(|| { + let _stop_tx = stop_tx; + pool.install(|| make(index.clone(), source)); + }); + }) + }) + .unwrap(); + Instant::now() + } else { + index.instant_indexed.store(Instant::now()); + Instant::now() + Duration::from_secs(60) + } + }), + ); + loop { + while let Some(e) = tasks.first_entry() { + if *e.key() < Instant::now() { + let mut task = e.remove(); + match std::panic::catch_unwind(std::panic::AssertUnwindSafe(&mut task)) { + Ok(instant) => { + tasks.insert(instant, task); + } + Err(e) => { + log::error!("index task panickied: {:?}", e); + } + } + } else { + break; + } + } + if let Some(e) = tasks.first_entry() { + match shutdown.recv_deadline(*e.key()) { + Ok(never) => match never {}, + Err(RecvTimeoutError::Disconnected) => return, + Err(RecvTimeoutError::Timeout) => (), + } + } else { + break; + } + } + } +} diff --git a/crates/index/src/optimizing/sealing.rs b/crates/index/src/optimizing/sealing.rs deleted file mode 100644 index 837ff2d1..00000000 --- a/crates/index/src/optimizing/sealing.rs +++ /dev/null @@ -1,57 +0,0 @@ -use crate::Index; -use crate::Op; -pub use base::distance::*; -pub use base::index::*; -pub use base::search::*; -pub use base::vector::*; -use crossbeam::channel::{bounded, Receiver, RecvTimeoutError, Sender}; -use std::convert::Infallible; -use std::sync::Arc; -use 
std::thread::JoinHandle; -use std::time::Duration; - -pub struct OptimizerSealing { - index: Arc>, -} - -impl OptimizerSealing { - pub fn new(index: Arc>) -> Self { - Self { index } - } - pub fn spawn(self) -> (Sender, JoinHandle<()>) { - let (tx, rx) = bounded(1); - ( - tx, - std::thread::spawn(move || { - self.main(rx); - }), - ) - } - fn main(self, shutdown: Receiver) { - let index = self.index; - let dur = Duration::from_secs(index.options.optimizing.sealing_secs); - let least = index.options.optimizing.sealing_size; - let mut check = None; - loop { - let view = index.view(); - let stamp = view - .write - .as_ref() - .map(|(uuid, segment)| (*uuid, segment.len())); - if stamp == check { - if let Some((uuid, len)) = stamp { - if len >= least { - index.seal(uuid); - } - } - } else { - check = stamp; - } - match shutdown.recv_timeout(dur) { - Ok(never) => match never {}, - Err(RecvTimeoutError::Disconnected) => return, - Err(RecvTimeoutError::Timeout) => continue, - } - } - } -} diff --git a/crates/index/src/optimizing/vacuum.rs b/crates/index/src/optimizing/vacuum.rs deleted file mode 100644 index 8b137891..00000000 --- a/crates/index/src/optimizing/vacuum.rs +++ /dev/null @@ -1 +0,0 @@ - diff --git a/crates/index/src/segments/growing.rs b/crates/index/src/segments/growing.rs index 49e40a8a..5aae0020 100644 --- a/crates/index/src/segments/growing.rs +++ b/crates/index/src/segments/growing.rs @@ -158,7 +158,7 @@ impl GrowingSegment { pub fn stat_growing(&self) -> SegmentStat { SegmentStat { id: self.uuid, - typ: "growing".to_string(), + r#type: "growing".to_string(), length: self.len() as usize, size: (self.len() as u64) * (std::mem::size_of::>() as u64), } @@ -167,7 +167,7 @@ impl GrowingSegment { pub fn stat_write(&self) -> SegmentStat { SegmentStat { id: self.uuid, - typ: "write".to_string(), + r#type: "write".to_string(), length: self.len() as usize, size: (self.len() as u64) * (std::mem::size_of::>() as u64), } diff --git a/crates/index/src/segments/sealed.rs b/crates/index/src/segments/sealed.rs index ead5c64b..2c4d289e 100644 --- a/crates/index/src/segments/sealed.rs +++ b/crates/index/src/segments/sealed.rs @@ -7,15 +7,19 @@ use base::index::*; use base::operator::*; use base::search::*; use common::dir_ops::sync_dir; +use crossbeam::atomic::AtomicCell; use std::cmp::Reverse; use std::collections::BinaryHeap; use std::path::PathBuf; use std::sync::Arc; +use std::time::Duration; +use std::time::Instant; use uuid::Uuid; pub struct SealedSegment { uuid: Uuid, indexing: Indexing, + deletes: AtomicCell<(Instant, u32)>, _tracker: Arc, } @@ -33,6 +37,7 @@ impl SealedSegment { Arc::new(Self { uuid, indexing, + deletes: AtomicCell::new((Instant::now(), 0)), _tracker: Arc::new(SegmentTracker { path, _tracker }), }) } @@ -47,6 +52,7 @@ impl SealedSegment { Arc::new(Self { uuid, indexing, + deletes: AtomicCell::new((Instant::now(), 0)), _tracker: Arc::new(SegmentTracker { path, _tracker }), }) } @@ -59,7 +65,7 @@ impl SealedSegment { let path = self._tracker.path.join("indexing"); SegmentStat { id: self.uuid, - typ: "sealed".to_string(), + r#type: "sealed".to_string(), length: self.len() as usize, size: dir_size(&path).unwrap(), } @@ -94,4 +100,20 @@ impl SealedSegment { pub fn payload(&self, i: u32) -> Payload { self.indexing.payload(i) } + + pub fn inspect(&self, d: Duration, check: impl Fn(u64) -> bool) -> Result { + let (t, c) = self.deletes.load(); + if t.elapsed() > d { + let mut counter = 0_u32; + for i in 0..self.len() { + if check(self.payload(i).time()) { + counter += 1; + } + } + 
self.deletes.store((Instant::now(), counter)); + Ok(counter) + } else { + Err(c) + } + } } diff --git a/crates/service/src/instance.rs b/crates/service/src/instance.rs index 6eefbfee..a6170438 100644 --- a/crates/service/src/instance.rs +++ b/crates/service/src/instance.rs @@ -32,70 +32,74 @@ pub enum Instance { } impl Instance { - pub fn create(path: PathBuf, options: IndexOptions) -> Result { + pub fn create( + path: PathBuf, + options: IndexOptions, + alterable_options: IndexAlterableOptions, + ) -> Result { match (options.vector.d, options.vector.v) { (DistanceKind::Cos, VectorKind::Vecf32) => { - let index = Index::create(path.clone(), options)?; + let index = Index::create(path.clone(), options, alterable_options)?; Ok(Self::Vecf32Cos(index)) } (DistanceKind::Dot, VectorKind::Vecf32) => { - let index = Index::create(path.clone(), options)?; + let index = Index::create(path.clone(), options, alterable_options)?; Ok(Self::Vecf32Dot(index)) } (DistanceKind::L2, VectorKind::Vecf32) => { - let index = Index::create(path.clone(), options)?; + let index = Index::create(path.clone(), options, alterable_options)?; Ok(Self::Vecf32L2(index)) } (DistanceKind::Cos, VectorKind::Vecf16) => { - let index = Index::create(path.clone(), options)?; + let index = Index::create(path.clone(), options, alterable_options)?; Ok(Self::Vecf16Cos(index)) } (DistanceKind::Dot, VectorKind::Vecf16) => { - let index = Index::create(path.clone(), options)?; + let index = Index::create(path.clone(), options, alterable_options)?; Ok(Self::Vecf16Dot(index)) } (DistanceKind::L2, VectorKind::Vecf16) => { - let index = Index::create(path.clone(), options)?; + let index = Index::create(path.clone(), options, alterable_options)?; Ok(Self::Vecf16L2(index)) } (DistanceKind::Cos, VectorKind::SVecf32) => { - let index = Index::create(path.clone(), options)?; + let index = Index::create(path.clone(), options, alterable_options)?; Ok(Self::SVecf32Cos(index)) } (DistanceKind::Dot, VectorKind::SVecf32) => { - let index = Index::create(path.clone(), options)?; + let index = Index::create(path.clone(), options, alterable_options)?; Ok(Self::SVecf32Dot(index)) } (DistanceKind::L2, VectorKind::SVecf32) => { - let index = Index::create(path.clone(), options)?; + let index = Index::create(path.clone(), options, alterable_options)?; Ok(Self::SVecf32L2(index)) } (DistanceKind::Cos, VectorKind::BVecf32) => { - let index = Index::create(path.clone(), options)?; + let index = Index::create(path.clone(), options, alterable_options)?; Ok(Self::BVecf32Cos(index)) } (DistanceKind::Dot, VectorKind::BVecf32) => { - let index = Index::create(path.clone(), options)?; + let index = Index::create(path.clone(), options, alterable_options)?; Ok(Self::BVecf32Dot(index)) } (DistanceKind::L2, VectorKind::BVecf32) => { - let index = Index::create(path.clone(), options)?; + let index = Index::create(path.clone(), options, alterable_options)?; Ok(Self::BVecf32L2(index)) } (DistanceKind::Jaccard, VectorKind::BVecf32) => { - let index = Index::create(path.clone(), options)?; + let index = Index::create(path.clone(), options, alterable_options)?; Ok(Self::BVecf32Jaccard(index)) } (DistanceKind::L2, VectorKind::Veci8) => { - let index = Index::create(path.clone(), options)?; + let index = Index::create(path.clone(), options, alterable_options)?; Ok(Self::Veci8L2(index)) } (DistanceKind::Cos, VectorKind::Veci8) => { - let index = Index::create(path.clone(), options)?; + let index = Index::create(path.clone(), options, alterable_options)?; Ok(Self::Veci8Cos(index)) } 
(DistanceKind::Dot, VectorKind::Veci8) => { - let index = Index::create(path.clone(), options)?; + let index = Index::create(path.clone(), options, alterable_options)?; Ok(Self::Veci8Dot(index)) } (DistanceKind::Jaccard, _) => Err(CreateError::InvalidIndexOptions { @@ -187,7 +191,7 @@ impl Instance { Instance::Veci8Dot(x) => x.stat(), } } - pub fn alter(&self, key: String, value: String) -> Result<(), AlterError> { + pub fn alter(&self, key: &str, value: &str) -> Result<(), AlterError> { match self { Instance::Vecf32Cos(x) => x.alter(key, value), Instance::Vecf32Dot(x) => x.alter(key, value), diff --git a/crates/service/src/worker.rs b/crates/service/src/worker.rs index 94357510..0439b21d 100644 --- a/crates/service/src/worker.rs +++ b/crates/service/src/worker.rs @@ -7,6 +7,7 @@ use base::worker::*; use common::clean::clean; use common::dir_ops::sync_dir; use common::file_atomic::FileAtomic; +use index::OutdatedError; use parking_lot::Mutex; use serde::{Deserialize, Serialize}; use std::collections::{HashMap, HashSet}; @@ -46,6 +47,7 @@ impl Worker { for &id in startup.get().indexes.iter() { let path = path.join("indexes").join(id.to_string()); let index = Instance::open(path); + index.start(); indexes.insert(id, index); } let view = Arc::new(WorkerView { @@ -64,13 +66,21 @@ impl Worker { } impl WorkerOperations for Worker { - fn create(&self, handle: Handle, options: IndexOptions) -> Result<(), CreateError> { + fn create( + &self, + handle: Handle, + options: IndexOptions, + alterable_options: IndexAlterableOptions, + ) -> Result<(), CreateError> { use std::collections::hash_map::Entry; let mut protect = self.protect.lock(); match protect.indexes.entry(handle) { Entry::Vacant(o) => { - let index = - Instance::create(self.path.join("indexes").join(handle.to_string()), options)?; + let index = Instance::create( + self.path.join("indexes").join(handle.to_string()), + options, + alterable_options, + )?; index.start(); o.insert(index); protect.maintain(&self.view); @@ -96,6 +106,7 @@ impl WorkerOperations for Worker { let index = Instance::create( self.path.join("indexes").join(handle.to_string()), options, + alterable_options, )?; index.start(); protect.indexes.insert(handle, index); @@ -143,7 +154,9 @@ impl WorkerOperations for Worker { let view = instance.view(); match view.insert(vector.clone(), pointer)? 
{ Ok(()) => break, - Err(_) => instance.refresh(), + Err(OutdatedError) => { + instance.refresh(); + } } } Ok(()) @@ -176,11 +189,23 @@ impl WorkerOperations for Worker { let stat = instance.stat(); Ok(stat) } - fn alter(&self, handle: Handle, key: String, value: String) -> Result<(), AlterError> { + fn alter(&self, handle: Handle, key: &str, value: &str) -> Result<(), AlterError> { let view = self.view(); let instance = view.get(handle).ok_or(AlterError::NotExist)?; instance.alter(key, value) } + fn stop(&self, handle: Handle) -> Result<(), StopError> { + let view = self.view(); + let instance = view.get(handle).ok_or(StopError::NotExist)?; + instance.stop(); + Ok(()) + } + fn start(&self, handle: Handle) -> Result<(), StartError> { + let view = self.view(); + let instance = view.get(handle).ok_or(StartError::NotExist)?; + instance.start(); + Ok(()) + } } pub struct WorkerView { diff --git a/crates/stand-alone-test/src/bin/main.rs b/crates/stand-alone-test/src/bin/main.rs index 1335efbc..16d2ed8e 100644 --- a/crates/stand-alone-test/src/bin/main.rs +++ b/crates/stand-alone-test/src/bin/main.rs @@ -36,7 +36,7 @@ enum Commands { /// ef_construction parameter for HNSW index #[arg(short, long)] - ef_construction: usize, + ef_construction: u32, /// directory to save the hnsw index #[arg(short, long)] @@ -67,7 +67,7 @@ enum Commands { /// ef_search parameter for HNSW search #[arg(long)] - ef: usize, + ef: u32, }, } diff --git a/crates/stand-alone-test/src/hnsw.rs b/crates/stand-alone-test/src/hnsw.rs index b250a1fe..3665bbda 100644 --- a/crates/stand-alone-test/src/hnsw.rs +++ b/crates/stand-alone-test/src/hnsw.rs @@ -188,7 +188,7 @@ fn mock_make(path: &Path, data_path: &Path, options: IndexOptions) -> HnswRam(query_file).unwrap(); diff --git a/src/bgworker/normal.rs b/src/bgworker/normal.rs index 558446a9..c4225a78 100644 --- a/src/bgworker/normal.rs +++ b/src/bgworker/normal.rs @@ -65,8 +65,18 @@ fn session(worker: Arc, handler: ServerRpcHandler) -> Result { - handler = x.leave(WorkerOperations::create(worker.as_ref(), handle, options))?; + ServerRpcHandle::Create { + handle, + options, + alterable_options, + x, + } => { + handler = x.leave(WorkerOperations::create( + worker.as_ref(), + handle, + options, + alterable_options, + ))?; } ServerRpcHandle::Drop { handle, x } => { handler = x.leave(WorkerOperations::drop(worker.as_ref(), handle))?; @@ -95,7 +105,7 @@ fn session(worker: Arc, handler: ServerRpcHandler) -> Result { - handler = x.leave(worker.alter(handle, key, value))?; + handler = x.leave(worker.alter(handle, &key, &value))?; } ServerRpcHandle::Basic { handle, @@ -188,6 +198,12 @@ fn session(worker: Arc, handler: ServerRpcHandler) -> Result handler = x.error_err(e)?, }; } + ServerRpcHandle::Stop { handle, x } => { + handler = x.leave(worker.stop(handle))?; + } + ServerRpcHandle::Start { handle, x } => { + handler = x.leave(worker.start(handle))?; + } } } } diff --git a/src/gucs/executing.rs b/src/gucs/executing.rs index 4d093113..9f099189 100644 --- a/src/gucs/executing.rs +++ b/src/gucs/executing.rs @@ -41,7 +41,7 @@ pub unsafe fn init() { pub fn search_options() -> SearchOptions { SearchOptions { prefilter_enable: ENABLE_PREFILTER.get(), - hnsw_ef_search: HNSW_EF_SEARCH.get() as usize, + hnsw_ef_search: HNSW_EF_SEARCH.get() as u32, ivf_nprobe: IVF_NPROBE.get() as u32, } } diff --git a/src/index/am.rs b/src/index/am.rs index 223c6cea..be387a29 100644 --- a/src/index/am.rs +++ b/src/index/am.rs @@ -141,15 +141,19 @@ pub unsafe extern "C" fn ambuild( } let oid = unsafe { 
(*index).rd_id }; let handle = from_oid_to_handle(oid); - let options = unsafe { am_options::options(index) }; + let (options, alterable_options) = unsafe { am_options::options(index) }; let mut rpc = check_client(client()); - match rpc.create(handle, options) { + match rpc.create(handle, options, alterable_options) { Ok(()) => (), Err(CreateError::InvalidIndexOptions { reason }) => { bad_service_invalid_index_options(&reason); } } on_index_build(handle); + match rpc.stop(handle) { + Ok(()) => (), + Err(StopError::NotExist) => pgrx::error!("internal error"), + } let result = unsafe { pgrx::PgBox::::alloc0() }; let mut builder = Builder { rpc, @@ -199,6 +203,33 @@ pub unsafe extern "C" fn ambuild( (*state.result).heap_tuples += 1.0; } } + let mut rpc = builder.rpc; + match rpc.start(handle) { + Ok(()) => (), + Err(StartError::NotExist) => pgrx::error!("internal error"), + } + loop { + pgrx::check_for_interrupts!(); + match rpc.stat(handle) { + Ok(s) => { + if !s.indexing { + break; + } + } + Err(StatError::NotExist) => pgrx::error!("internal error"), + } + unsafe { + pgrx::pg_sys::WaitLatch( + pgrx::pg_sys::MyLatch, + (pgrx::pg_sys::WL_LATCH_SET + | pgrx::pg_sys::WL_TIMEOUT + | pgrx::pg_sys::WL_EXIT_ON_PM_DEATH) as _, + 1000, + pgrx::pg_sys::WaitEventTimeout_WAIT_EVENT_PG_SLEEP, + ); + pgrx::pg_sys::ResetLatch(pgrx::pg_sys::MyLatch); + } + } result.into_pg() } diff --git a/src/index/am_options.rs b/src/index/am_options.rs index f9634103..99ab1e16 100644 --- a/src/index/am_options.rs +++ b/src/index/am_options.rs @@ -107,7 +107,7 @@ unsafe fn convert_reloptions_to_options( } } -pub unsafe fn options(index: pgrx::pg_sys::Relation) -> IndexOptions { +pub unsafe fn options(index: pgrx::pg_sys::Relation) -> (IndexOptions, IndexAlterableOptions) { let opfamily = unsafe { (*index).rd_opfamily.read() }; let att = unsafe { &mut *(*index).rd_att }; let atts = unsafe { att.attrs.as_slice(att.natts as _) }; @@ -125,10 +125,12 @@ pub unsafe fn options(index: pgrx::pg_sys::Relation) -> IndexOptions { // get segment, optimizing, indexing let (segment, optimizing, indexing) = unsafe { convert_reloptions_to_options((*index).rd_options) }; - IndexOptions { - vector: VectorOptions { dims, v, d }, - segment, - optimizing, - indexing, - } + ( + IndexOptions { + vector: VectorOptions { dims, v, d }, + segment, + indexing, + }, + IndexAlterableOptions { optimizing }, + ) } diff --git a/src/index/views.rs b/src/index/views.rs index 11198ae8..81704018 100644 --- a/src/index/views.rs +++ b/src/index/views.rs @@ -40,7 +40,7 @@ fn _vectors_index_stat( "idx_sealed", segments .iter() - .filter(|x| x.typ == "sealed") + .filter(|x| x.r#type == "sealed") .map(|x| x.length as i64) .collect::>(), ) @@ -49,7 +49,7 @@ fn _vectors_index_stat( "idx_growing", segments .iter() - .filter(|x| x.typ == "growing") + .filter(|x| x.r#type == "growing") .map(|x| x.length as i64) .collect::>(), ) @@ -58,7 +58,7 @@ fn _vectors_index_stat( "idx_write", segments .iter() - .filter(|x| x.typ == "write") + .filter(|x| x.r#type == "write") .map(|x| x.length as i64) .sum::(), ) diff --git a/src/ipc/mod.rs b/src/ipc/mod.rs index 5486a071..de654a5a 100644 --- a/src/ipc/mod.rs +++ b/src/ipc/mod.rs @@ -320,7 +320,7 @@ macro_rules! defines { } defines! 
{ - unary create(handle: Handle, options: IndexOptions) -> (); + unary create(handle: Handle, options: IndexOptions, alterable_options: IndexAlterableOptions) -> (); unary drop(handle: Handle) -> (); unary flush(handle: Handle) -> (); unary insert(handle: Handle, vector: OwnedVector, pointer: Pointer) -> (); @@ -330,4 +330,6 @@ defines! { stream list(handle: Handle) -> Pointer; unary stat(handle: Handle) -> IndexStat; unary alter(handle: Handle, key: String, value: String) -> (); + unary stop(handle: Handle) -> (); + unary start(handle: Handle) -> (); } diff --git a/tests/sealing/test.sh b/tests/sealing/test.sh index c5ffbb20..6c730058 100755 --- a/tests/sealing/test.sh +++ b/tests/sealing/test.sh @@ -1,7 +1,7 @@ #!/usr/bin/env bash set -e -# Test the background threads `optimizing.indexing` and `optimizing.sealing` working properly +# Test the background threads `optimizing.optimizing_indexing` and `optimizing.sealing_*` working properly sqllogictest -u runner -d runner $(dirname $0)/create.slt -sleep 240 +sleep 20 sqllogictest -u runner -d runner $(dirname $0)/check.slt diff --git a/tests/sqllogictest/bvector.slt b/tests/sqllogictest/bvector.slt index 41532d5b..f5550e28 100644 --- a/tests/sqllogictest/bvector.slt +++ b/tests/sqllogictest/bvector.slt @@ -17,11 +17,11 @@ WITH (options = "[indexing.hnsw]"); statement ok CREATE INDEX ON t USING vectors (val bvector_cos_ops) -WITH (options = "[indexing.ivf]"); +WITH (options = "[indexing.hnsw]"); statement ok CREATE INDEX ON t USING vectors (val bvector_jaccard_ops) -WITH (options = "[indexing.ivf]"); +WITH (options = "[indexing.hnsw]"); query I SELECT COUNT(1) FROM (SELECT 1 FROM t ORDER BY val <-> '[0,1,0]'::bvector limit 10) t2; @@ -29,12 +29,12 @@ SELECT COUNT(1) FROM (SELECT 1 FROM t ORDER BY val <-> '[0,1,0]'::bvector limit 10 query I -SELECT COUNT(1) FROM (SELECT 1 FROM t ORDER BY val <=> '[0,1,0]'::bvector limit 10) t2; +SELECT COUNT(1) FROM (SELECT 1 FROM t ORDER BY val <#> '[0,1,0]'::bvector limit 10) t2; ---- 10 query I -SELECT COUNT(1) FROM (SELECT 1 FROM t ORDER BY val <#> '[0,1,0]'::bvector limit 10) t2; +SELECT COUNT(1) FROM (SELECT 1 FROM t ORDER BY val <=> '[0,1,0]'::bvector limit 10) t2; ---- 10 diff --git a/tests/sqllogictest/cast.slt b/tests/sqllogictest/cast.slt index 52574763..e9bc3624 100644 --- a/tests/sqllogictest/cast.slt +++ b/tests/sqllogictest/cast.slt @@ -1,3 +1,6 @@ +statement ok +SET search_path TO pg_temp, vectors; + # cast string to vector query I SELECT '[1,2,3]'::vector; diff --git a/tests/sqllogictest/compact_stmt.slt b/tests/sqllogictest/compact_stmt.slt index d197d6d3..e119392f 100644 --- a/tests/sqllogictest/compact_stmt.slt +++ b/tests/sqllogictest/compact_stmt.slt @@ -1,3 +1,6 @@ +statement ok +SET search_path TO pg_temp, vectors; + statement ok DROP TABLE IF EXISTS t; diff --git a/tests/sqllogictest/compact_var.slt b/tests/sqllogictest/compact_var.slt index ad9176b6..72ccc980 100644 --- a/tests/sqllogictest/compact_var.slt +++ b/tests/sqllogictest/compact_var.slt @@ -1,3 +1,6 @@ +statement ok +SET search_path TO pg_temp, vectors; + statement ok SET vectors.pgvector_compatibility=off; diff --git a/tests/sqllogictest/drop_index_when_indexing.slt b/tests/sqllogictest/drop_index_when_indexing.slt index aee479e5..485fdfae 100644 --- a/tests/sqllogictest/drop_index_when_indexing.slt +++ b/tests/sqllogictest/drop_index_when_indexing.slt @@ -5,11 +5,14 @@ statement ok CREATE TABLE t (val vector(10)); statement ok -INSERT INTO t (val) SELECT ARRAY[random(), random(), random(), random(), random(), random(), 
random(), random(), random(), random()]::real[] FROM generate_series(1, 100000); +INSERT INTO t (val) SELECT ARRAY[random(), random(), random(), random(), random(), random(), random(), random(), random(), random()]::real[] FROM generate_series(1, 1000); statement ok CREATE INDEX ON t USING vectors (val vector_l2_ops) -WITH (options = "[indexing.hnsw]"); +WITH (options = $$ +optimizing.optimizing_threads = 16 +[indexing.hnsw] +$$); statement ok DROP INDEX t_val_idx; diff --git a/tests/sqllogictest/fp16.slt b/tests/sqllogictest/fp16.slt index 8362fb35..e7cdf604 100644 --- a/tests/sqllogictest/fp16.slt +++ b/tests/sqllogictest/fp16.slt @@ -17,7 +17,7 @@ WITH (options = "[indexing.hnsw]"); statement ok CREATE INDEX ON t USING vectors (val vecf16_cos_ops) -WITH (options = "[indexing.ivf]"); +WITH (options = "[indexing.hnsw]"); query I @@ -26,12 +26,12 @@ SELECT COUNT(1) FROM (SELECT 1 FROM t ORDER BY val <-> '[0.5,0.5,0.5]'::vecf16 l 10 query I -SELECT COUNT(1) FROM (SELECT 1 FROM t ORDER BY val <=> '[0.5,0.5,0.5]'::vecf16 limit 10) t2; +SELECT COUNT(1) FROM (SELECT 1 FROM t ORDER BY val <#> '[0.5,0.5,0.5]'::vecf16 limit 10) t2; ---- 10 query I -SELECT COUNT(1) FROM (SELECT 1 FROM t ORDER BY val <#> '[0.5,0.5,0.5]'::vecf16 limit 10) t2; +SELECT COUNT(1) FROM (SELECT 1 FROM t ORDER BY val <=> '[0.5,0.5,0.5]'::vecf16 limit 10) t2; ---- 10 diff --git a/tests/sqllogictest/index.slt b/tests/sqllogictest/index.slt index e718cfa1..0cfe0a24 100644 --- a/tests/sqllogictest/index.slt +++ b/tests/sqllogictest/index.slt @@ -22,7 +22,7 @@ WITH (options = "[indexing.hnsw]"); statement ok CREATE INDEX ON t USING vectors (val vector_cos_ops) -WITH (options = "[indexing.ivf]"); +WITH (options = "[indexing.hnsw]"); query I SELECT COUNT(1) FROM (SELECT 1 FROM t ORDER BY val <-> '[0.5,0.5,0.5]' limit 10) t2; diff --git a/tests/sqllogictest/index_edit.slt b/tests/sqllogictest/index_edit.slt index ea74b226..7ba247a8 100644 --- a/tests/sqllogictest/index_edit.slt +++ b/tests/sqllogictest/index_edit.slt @@ -19,14 +19,17 @@ SELECT COUNT(1) FROM (SELECT 1 FROM t ORDER BY val <#> '[0.5,0.5,0.5]' limit 10) statement error does not exist SELECT alter_vector_index('unknown_index'::regclass::oid, 'optimizing.threads', '1'); -statement error Setting key +statement error not found SELECT alter_vector_index('hnsw_1'::regclass::oid, 'unknown_key', '1'); -statement error wrong value -SELECT alter_vector_index('hnsw_1'::regclass::oid, 'optimizing.threads', 'unknown_value'); +statement error Invalid index options +SELECT alter_vector_index('hnsw_1'::regclass::oid, 'optimizing.optimizing_threads', 'unknown_value'); + +statement error Invalid index options +SELECT alter_vector_index('hnsw_1'::regclass::oid, 'optimizing.optimizing_threads', '0'); statement ok -SELECT alter_vector_index('hnsw_1'::regclass::oid, 'optimizing.threads', '1'); +SELECT alter_vector_index('hnsw_1'::regclass::oid, 'optimizing.optimizing_threads', '1'); query I SELECT COUNT(1) FROM (SELECT 1 FROM t ORDER BY val <#> '[0.5,0.5,0.5]' limit 10) t2; diff --git a/tests/sqllogictest/int8.slt b/tests/sqllogictest/int8.slt index a4097ed6..caac864d 100644 --- a/tests/sqllogictest/int8.slt +++ b/tests/sqllogictest/int8.slt @@ -17,7 +17,7 @@ WITH (options = "[indexing.hnsw]"); statement ok CREATE INDEX ON t USING vectors (val veci8_cos_ops) -WITH (options = "[indexing.ivf]"); +WITH (options = "[indexing.hnsw]"); query I @@ -26,12 +26,12 @@ SELECT COUNT(1) FROM (SELECT 1 FROM t ORDER BY val <-> '[0.5,0.5,0.5]'::veci8 li 10 query I -SELECT COUNT(1) FROM (SELECT 1 FROM t 
ORDER BY val <=> '[0.5,0.5,0.5]'::veci8 limit 10) t2; +SELECT COUNT(1) FROM (SELECT 1 FROM t ORDER BY val <#> '[0.5,0.5,0.5]'::veci8 limit 10) t2; ---- 10 query I -SELECT COUNT(1) FROM (SELECT 1 FROM t ORDER BY val <#> '[0.5,0.5,0.5]'::veci8 limit 10) t2; +SELECT COUNT(1) FROM (SELECT 1 FROM t ORDER BY val <=> '[0.5,0.5,0.5]'::veci8 limit 10) t2; ---- 10 diff --git a/tests/sqllogictest/ivf.slt b/tests/sqllogictest/ivf.slt index 4b8a7ebb..df6170c6 100644 --- a/tests/sqllogictest/ivf.slt +++ b/tests/sqllogictest/ivf.slt @@ -11,7 +11,10 @@ INSERT INTO t (val) SELECT ARRAY[random(), random(), random()]::real[] FROM gene statement ok CREATE INDEX ON t USING vectors (val vector_l2_ops) -WITH (options = "[indexing.ivf]"); +WITH (options = $$ +[indexing.ivf] +nlist = 100 +$$); statement ok INSERT INTO t (val) VALUES ('[0.6,0.6,0.6]'); diff --git a/tests/sqllogictest/operator.slt b/tests/sqllogictest/operator.slt index 594a39fb..d431eab4 100644 --- a/tests/sqllogictest/operator.slt +++ b/tests/sqllogictest/operator.slt @@ -1,3 +1,6 @@ +statement ok +SET search_path TO pg_temp, vectors; + # basic + - = <> < <= > >= query I SELECT '[1,2.3,4e5]'::vector + '[6,7.8,9e10]'; diff --git a/tests/sqllogictest/partition.slt b/tests/sqllogictest/partition.slt index b6e4ec71..ad1ad86f 100644 --- a/tests/sqllogictest/partition.slt +++ b/tests/sqllogictest/partition.slt @@ -32,7 +32,7 @@ SELECT COUNT(1) FROM (SELECT 1 FROM items ORDER BY val <-> '[0.5,0.5,0.5]' limit statement ok CREATE INDEX ON id_123 USING vectors (val vector_cos_ops) -WITH (options = "[indexing.ivf]"); +WITH (options = "[indexing.hnsw]"); query I SELECT COUNT(1) FROM (SELECT 1 FROM items ORDER BY val <=> '[0.5,0.5,0.5]' limit 10) t2; @@ -42,7 +42,7 @@ SELECT COUNT(1) FROM (SELECT 1 FROM items ORDER BY val <=> '[0.5,0.5,0.5]' limit # partial index statement ok CREATE INDEX ON items USING vectors (val vector_dot_ops) -WITH (options = "[indexing.ivf]") WHERE (category_id = 1); +WITH (options = "[indexing.hnsw]") WHERE (category_id = 1); query I SELECT COUNT(1) FROM diff --git a/tests/sqllogictest/sparse.slt b/tests/sqllogictest/sparse.slt index 66739a02..018b3742 100644 --- a/tests/sqllogictest/sparse.slt +++ b/tests/sqllogictest/sparse.slt @@ -17,7 +17,7 @@ WITH (options = "[indexing.hnsw]"); statement ok CREATE INDEX ON t USING vectors (val svector_cos_ops) -WITH (options = "[indexing.ivf]"); +WITH (options = "[indexing.hnsw]"); query I SELECT COUNT(1) FROM (SELECT 1 FROM t ORDER BY val <-> '[0.5,0.5,0.5,0.5,0.5,0.5]'::svector limit 10) t2; diff --git a/tests/sqllogictest/vbase.slt b/tests/sqllogictest/vbase.slt index 47d054e7..91c1e180 100644 --- a/tests/sqllogictest/vbase.slt +++ b/tests/sqllogictest/vbase.slt @@ -5,11 +5,14 @@ statement ok CREATE TABLE t (val vector(3)); statement ok -INSERT INTO t (val) SELECT ARRAY[random(), random(), random()]::real[] FROM generate_series(1, 100000); +INSERT INTO t (val) SELECT ARRAY[random(), random(), random()]::real[] FROM generate_series(1, 1000); statement ok CREATE INDEX ON t USING vectors (val vector_l2_ops) -WITH (options = "[indexing.hnsw]"); +WITH (options = $$ +optimizing.optimizing_threads = 16 +[indexing.hnsw] +$$); statement ok INSERT INTO t (val) VALUES ('[0.6,0.6,0.6]');
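The core mechanism introduced by this patch is the `Alter` derive in the new `base_macros` crate: `alter_vector_index` passes a dotted key such as `optimizing.optimizing_threads` down to `Index::alter`, which splits it on `.` and walks the `IndexAlterableOptions` tree one segment at a time until a leaf value is parsed; unknown segments surface as `AlterError::KeyNotExists`, and out-of-range values are caught by the `validate()` call that runs before the new options are swapped in. What follows is a minimal, self-contained sketch of that pattern with the derive expansion written out by hand; the field names mirror the options above, but the leaf parsing (plain `FromStr` here instead of `toml`) and the hand-written impls are illustrative rather than the crate's actual generated code.

use std::str::FromStr;

#[derive(Debug)]
enum AlterError {
    KeyNotExists { key: String },
    InvalidIndexOptions { reason: String },
}

trait Alter {
    fn alter(&mut self, key: &[&str], value: &str) -> Result<(), AlterError>;
}

// Leaf types consume the whole value; any leftover key path is an error.
macro_rules! impl_alter_for {
    {$($t:ty)*} => {
        $(impl Alter for $t {
            fn alter(&mut self, key: &[&str], value: &str) -> Result<(), AlterError> {
                if !key.is_empty() {
                    return Err(AlterError::KeyNotExists { key: key.join(".") });
                }
                *self = <$t as FromStr>::from_str(value).map_err(|_| {
                    AlterError::InvalidIndexOptions { reason: "failed to parse".to_string() }
                })?;
                Ok(())
            }
        })*
    };
}

impl_alter_for! { u16 u64 }

struct OptimizingOptions {
    optimizing_threads: u16,
    sealing_secs: u64,
}

// Roughly what `#[derive(Alter)]` expands to for a struct: route the first key
// segment to the matching field and recurse with the rest of the path.
impl Alter for OptimizingOptions {
    fn alter(&mut self, key: &[&str], value: &str) -> Result<(), AlterError> {
        match key.first() {
            Some(&"optimizing_threads") => self.optimizing_threads.alter(&key[1..], value),
            Some(&"sealing_secs") => self.sealing_secs.alter(&key[1..], value),
            _ => Err(AlterError::KeyNotExists { key: key.join(".") }),
        }
    }
}

struct IndexAlterableOptions {
    optimizing: OptimizingOptions,
}

impl Alter for IndexAlterableOptions {
    fn alter(&mut self, key: &[&str], value: &str) -> Result<(), AlterError> {
        match key.first() {
            Some(&"optimizing") => self.optimizing.alter(&key[1..], value),
            _ => Err(AlterError::KeyNotExists { key: key.join(".") }),
        }
    }
}

fn main() {
    let mut opts = IndexAlterableOptions {
        optimizing: OptimizingOptions { optimizing_threads: 1, sealing_secs: 60 },
    };
    // `SELECT alter_vector_index(..., 'optimizing.optimizing_threads', '4')`
    // boils down to splitting the key on '.' and walking it segment by segment.
    let key: Vec<&str> = "optimizing.optimizing_threads".split('.').collect();
    opts.alter(&key, "4").unwrap();
    assert_eq!(opts.optimizing.optimizing_threads, 4);
    assert!(opts.alter(&["optimizing", "no_such_key"], "1").is_err());
}

In the real crate the mutated copy is validated before it replaces the live options, which is why the index_edit.slt case above expects `optimizing.optimizing_threads` set to '0' to be rejected with "Invalid index options" (the field's range minimum is 1) rather than with a parse error.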