fix: optiming threads deadlock (#453)
* fix: optiming threads deadlock

Signed-off-by: cutecutecat <[email protected]>

* fix by comments

Signed-off-by: cutecutecat <[email protected]>

* Update crates/rayon/src/lib.rs

test clippy

* Update crates/rayon/src/lib.rs

test clippy

* fix monitor status

Signed-off-by: cutecutecat <[email protected]>

---------

Signed-off-by: cutecutecat <[email protected]>
Co-authored-by: usamoi <[email protected]>
cutecutecat and usamoi committed Apr 3, 2024
1 parent 97ce096 commit b3c9181
Showing 9 changed files with 99 additions and 17 deletions.
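
A note on the fix itself, read off the diff below rather than stated anywhere authoritative: the optimizing worker used to park a scoped thread on `shutdown.recv()`, and since `std::thread::scope` waits for every spawned thread, the scope could not exit when `optimizing_indexing` finished on its own; the worker stayed blocked until shutdown, which appears to be the deadlock the title refers to. The fix adds a dedicated finish channel: its `Sender` is moved into the `pool.install` closure, and a new `monitor()` polls the finish channel and the shutdown channel in turn, so either a normal finish (sender dropped) or a shutdown request unblocks the watcher. The sketch below is not project code; it is a minimal, self-contained illustration of the disconnect-as-completion signal the patch relies on, using plain crossbeam and hypothetical names.

    use crossbeam::channel::{bounded, TryRecvError};
    use std::convert::Infallible;

    fn main() {
        // A channel that never carries a message: only its connected or
        // disconnected state matters, mirroring `finish_tx`/`finish_rx` below.
        let (finish_tx, finish_rx) = bounded::<Infallible>(1);

        // While the sender is alive, the receiver only sees an empty channel.
        assert!(matches!(finish_rx.try_recv(), Err(TryRecvError::Empty)));

        // Dropping the sender (the worker closure returning) disconnects the
        // channel, which the monitor thread treats as "finished".
        drop(finish_tx);
        assert!(matches!(finish_rx.try_recv(), Err(TryRecvError::Disconnected)));
    }
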
8 changes: 8 additions & 0 deletions .typos.toml
@@ -1,5 +1,13 @@
 [default.extend-words]
 ND = "ND"
+DUR = "DUR"
+ANS = "ANS"
+
+[default]
+extend-ignore-re = [
+# Latex formula
+"\\$.+?\\$",
+]
 
 [files]
 extend-exclude = ["vendor/pg_config/*.txt", "vendor/pgrx_binding/*.rs"]
2 changes: 1 addition & 1 deletion README.md
@@ -26,7 +26,7 @@ pgvecto.rs is a Postgres extension that provides vector similarity search functi
 | | 🔪 Matryoshka embeddings | Subvector indexing, like vector[0:256], for enhanced Matryoshka embeddings. |
 | | ⬆️ Extended Vector Length | Vector lengths up to 65535 supported, ideal for the latest cutting-edge models. |
 | **System Performance** | 🚀 Production Ready | Battle-tested database ecosystem integrated with PostgreSQL. |
-| | ⚙️ High Availability | Logical replication support to ensure high availbility. |
+| | ⚙️ High Availability | Logical replication support to ensure high availability. |
 | | 💡 Resource Efficient | Efficient attribute storage leveraging PostgreSQL. |
 | **Security & Permissions** | 🔒 Permission Control | Easy access control like read-only roles, powered by PostgreSQL. |

48 changes: 40 additions & 8 deletions crates/index/src/optimizing/indexing.rs
@@ -7,7 +7,7 @@ pub use base::index::*;
 use base::operator::Borrowed;
 pub use base::search::*;
 pub use base::vector::*;
-use crossbeam::channel::RecvError;
+use crossbeam::channel::TryRecvError;
 use crossbeam::channel::{bounded, Receiver, RecvTimeoutError, Sender};
 use std::cmp::Reverse;
 use std::convert::Infallible;
@@ -99,26 +99,32 @@ impl<O: Op> OptimizerIndexing<O> {
             }),
         )
     }
-    fn main(self, shutdown: Receiver<Infallible>) {
+    fn main(self, shutdown_rx: Receiver<Infallible>) {
         let index = self.index;
         loop {
             let view = index.view();
             let threads = view.flexible.optimizing_threads;
+            let (finish_tx, finish_rx) = bounded::<Infallible>(1);
             rayon::ThreadPoolBuilder::new()
                 .num_threads(threads as usize)
                 .build_scoped(|pool| {
                     std::thread::scope(|scope| {
-                        scope.spawn(|| match shutdown.recv() {
-                            Ok(never) => match never {},
-                            Err(RecvError) => {
-                                pool.stop();
+                        let handler = scope.spawn(|| {
+                            let status = monitor(&finish_rx, &shutdown_rx);
+                            match status {
+                                MonitorStatus::Finished => (),
+                                MonitorStatus::Shutdown => pool.stop(),
                             }
                         });
-                        let _ = pool.install(|| optimizing_indexing(index.clone()));
+                        pool.install(|| {
+                            let _finish_tx = finish_tx;
+                            let _ = optimizing_indexing(index.clone());
+                        });
+                        let _ = handler.join();
                     })
                 })
                 .unwrap();
-            match shutdown.recv_timeout(std::time::Duration::from_secs(60)) {
+            match shutdown_rx.recv_timeout(std::time::Duration::from_secs(60)) {
                 Ok(never) => match never {},
                 Err(RecvTimeoutError::Disconnected) => return,
                 Err(RecvTimeoutError::Timeout) => (),
@@ -127,6 +133,32 @@ impl<O: Op> OptimizerIndexing<O> {
     }
 }
 
+pub enum MonitorStatus {
+    Finished,
+    Shutdown,
+}
+
+/// Monitor the internal finish and the external shutdown of `optimizing_indexing`
+fn monitor(finish_rx: &Receiver<Infallible>, shutdown_rx: &Receiver<Infallible>) -> MonitorStatus {
+    let timeout = std::time::Duration::from_secs(1);
+    loop {
+        match finish_rx.try_recv() {
+            Ok(never) => match never {},
+            Err(TryRecvError::Disconnected) => {
+                return MonitorStatus::Finished;
+            }
+            Err(TryRecvError::Empty) => (),
+        }
+        match shutdown_rx.recv_timeout(timeout) {
+            Ok(never) => match never {},
+            Err(RecvTimeoutError::Disconnected) => {
+                return MonitorStatus::Shutdown;
+            }
+            Err(RecvTimeoutError::Timeout) => (),
+        }
+    }
+}
+
 enum Seg<O: Op> {
     Sealed(Arc<SealedSegment<O>>),
     Growing(Arc<GrowingSegment<O>>),
18 changes: 12 additions & 6 deletions crates/rayon/src/lib.rs
@@ -1,7 +1,7 @@
 #![feature(thread_local)]
 
 use rayoff as rayon;
-use std::cell::OnceCell;
+use std::cell::RefCell;
 use std::panic::AssertUnwindSafe;
 use std::sync::atomic::AtomicBool;
 use std::sync::atomic::Ordering;
@@ -51,6 +51,15 @@ impl ThreadPoolBuilder {
         let stop = Arc::new(AtomicBool::new(false));
         match std::panic::catch_unwind(AssertUnwindSafe(|| {
             self.builder
+                .start_handler({
+                    let stop = stop.clone();
+                    move |_| {
+                        STOP.replace(Some(stop.clone()));
+                    }
+                })
+                .exit_handler(|_| {
+                    STOP.take();
+                })
                 .panic_handler(|e| {
                     if e.downcast_ref::<CheckPanic>().is_some() {
                         return;
@@ -60,9 +69,6 @@
                 .build_scoped(
                     |thread| thread.run(),
                     |pool| {
-                        pool.broadcast(|_| {
-                            STOP.set(stop.clone()).unwrap();
-                        });
                         let pool = ThreadPool::new(stop.clone(), pool);
                         f(&pool)
                     },
@@ -105,12 +111,12 @@ impl<'a> ThreadPool<'a> {
 }
 
 #[thread_local]
-static STOP: OnceCell<Arc<AtomicBool>> = OnceCell::new();
+static STOP: RefCell<Option<Arc<AtomicBool>>> = RefCell::new(None);
 
 struct CheckPanic;
 
 pub fn check() {
-    if let Some(stop) = STOP.get() {
+    if let Some(stop) = STOP.borrow().as_ref() {
         if stop.load(Ordering::Relaxed) {
             std::panic::panic_any(CheckPanic);
         }
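
A side note on the crates/rayon change above, again inferred from the diff: the per-thread stop flag is now installed by the builder's `start_handler` and cleared by `exit_handler` instead of being pushed onto already-running workers with `pool.broadcast`, and the thread-local slot becomes resettable (`RefCell<Option<Arc<AtomicBool>>>`) rather than write-once (`OnceCell`). The sketch below only demonstrates how those two hooks behave; it targets upstream rayon with a `std` `thread_local!` stand-in, not this crate's `rayoff` fork or its nightly `#[thread_local]` static, and the names are illustrative.

    use std::cell::RefCell;
    use std::sync::atomic::{AtomicBool, Ordering};
    use std::sync::Arc;

    thread_local! {
        // Stand-in for the crate's `#[thread_local] static STOP`.
        static STOP: RefCell<Option<Arc<AtomicBool>>> = RefCell::new(None);
    }

    fn main() {
        let stop = Arc::new(AtomicBool::new(false));
        let pool = rayon::ThreadPoolBuilder::new()
            .num_threads(2)
            .start_handler({
                let stop = stop.clone();
                // Runs on every worker thread as it starts, before it takes
                // any job, so no extra broadcast job needs to be queued.
                move |_| STOP.with(|slot| *slot.borrow_mut() = Some(stop.clone()))
            })
            .exit_handler(|_| {
                // Runs as each worker thread exits; clears the slot so a
                // reused OS thread never keeps a stale flag.
                STOP.with(|slot| *slot.borrow_mut() = None)
            })
            .build()
            .unwrap();

        pool.install(|| {
            // Inside the pool the flag is visible thread-locally, which is
            // what a cooperative `check()`-style cancellation relies on.
            let armed = STOP.with(|slot| {
                slot.borrow().as_ref().map(|s| s.load(Ordering::Relaxed))
            });
            println!("stop flag installed on this worker: {}", armed.is_some());
        });
    }
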
2 changes: 1 addition & 1 deletion tests/crash/kill.py
@@ -53,4 +53,4 @@ def process_filter(p: psutil.Process) -> bool:
     if len(procs) == 1:
         logging.info(f"Background worker recreated {pids}")
         break
-    time.sleep(1)
\ No newline at end of file
+    time.sleep(1)
5 changes: 4 additions & 1 deletion tests/crash/restore.slt
@@ -11,4 +11,7 @@ t
 query I
 SELECT COUNT(1) FROM (SELECT 1 FROM t ORDER BY val <-> '[0.5,0.5,0.5]' limit 10) t2;
 ----
-10
\ No newline at end of file
+10
+
+statement ok
+DROP TABLE t;
17 changes: 17 additions & 0 deletions tests/sealing/check.slt
@@ -0,0 +1,17 @@
+query I
+SELECT idx_indexing FROM pg_vector_index_stat WHERE indexname = 'i';
+----
+f
+
+query I
+SELECT idx_growing FROM pg_vector_index_stat WHERE indexname = 'i';
+----
+{}
+
+query I
+SELECT idx_sealed FROM pg_vector_index_stat WHERE indexname = 'i';
+----
+{1000}
+
+statement ok
+DROP TABLE t;
9 changes: 9 additions & 0 deletions tests/sealing/create.slt
@@ -0,0 +1,9 @@
+statement ok
+CREATE TABLE t (val vector(3));
+
+statement ok
+INSERT INTO t (val) SELECT ARRAY[random(), random(), random()]::real[] FROM generate_series(1, 1000);
+
+statement ok
+CREATE INDEX i ON t USING vectors (val vector_l2_ops)
+WITH (options = "[indexing.hnsw]");
7 changes: 7 additions & 0 deletions tests/sealing/test.sh
@@ -0,0 +1,7 @@
+#!/usr/bin/env bash
+set -e
+
+# Test the background threads `optimizing.indexing` and `optimizing.sealing` working properly
+sqllogictest -u runner -d runner $(dirname $0)/create.slt
+sleep 240
+sqllogictest -u runner -d runner $(dirname $0)/check.slt
