diff --git a/crates/store/src/backend/mod.rs b/crates/store/src/backend/mod.rs index 11dd3619b..199874239 100644 --- a/crates/store/src/backend/mod.rs +++ b/crates/store/src/backend/mod.rs @@ -23,12 +23,12 @@ pub mod postgres; pub mod redis; #[cfg(feature = "rocks")] pub mod rocksdb; +#[cfg(feature = "rqlite")] +pub mod rqlite; #[cfg(feature = "s3")] pub mod s3; #[cfg(feature = "sqlite")] pub mod sqlite; -#[cfg(feature = "rqlite")] -pub mod rqlite; pub const MAX_TOKEN_LENGTH: usize = (u8::MAX >> 1) as usize; pub const MAX_TOKEN_MASK: usize = MAX_TOKEN_LENGTH - 1; diff --git a/crates/store/src/backend/rqlite/blob.rs b/crates/store/src/backend/rqlite/blob.rs index 0fc1f8a1c..f407ef69c 100644 --- a/crates/store/src/backend/rqlite/blob.rs +++ b/crates/store/src/backend/rqlite/blob.rs @@ -8,9 +8,9 @@ use std::ops::Range; use rusqlite::OptionalExtension; -use super::{into_error, SqliteStore}; +use super::{into_error, RqliteStore}; -impl SqliteStore { +impl RqliteStore { pub(crate) async fn get_blob( &self, key: &[u8], @@ -19,12 +19,14 @@ impl SqliteStore { let conn = self.conn_pool.get().map_err(into_error)?; self.spawn_worker(move || { let mut result = conn - .prepare_cached("SELECT v FROM t WHERE k = ?") + .exec(rqlite_rs::query!("SELECT v FROM t WHERE k = ?", key)) + .await .map_err(into_error)?; result - .query_row([&key], |row| { + .first() + .map(|row| { Ok({ - let bytes = row.get_ref(0)?.as_bytes()?; + let bytes = row.get_by_index(0)?.as_bytes()?; if range.start == 0 && range.end == usize::MAX { bytes.to_vec() } else { @@ -35,7 +37,6 @@ impl SqliteStore { } }) }) - .optional() .map_err(into_error) }) .await @@ -44,11 +45,14 @@ impl SqliteStore { pub(crate) async fn put_blob(&self, key: &[u8], data: &[u8]) -> trc::Result<()> { let conn = self.conn_pool.get().map_err(into_error)?; self.spawn_worker(move || { - conn.prepare_cached("INSERT OR REPLACE INTO t (k, v) VALUES (?, ?)") - .map_err(into_error)? - .execute([key, data]) - .map_err(into_error) - .map(|_| ()) + conn.exec(rqlite_rs::query!( + "INSERT OR REPLACE INTO t (k, v) VALUES (?, ?)", + key, + data + )) + .await + .map_err(into_error) + .map(|_| ()) }) .await } @@ -56,9 +60,8 @@ impl SqliteStore { pub(crate) async fn delete_blob(&self, key: &[u8]) -> trc::Result { let conn = self.conn_pool.get().map_err(into_error)?; self.spawn_worker(move || { - conn.prepare_cached("DELETE FROM t WHERE k = ?") - .map_err(into_error)? - .execute([key]) + conn.exec(rqlite_rs::query!("DELETE FROM t WHERE k = ?", key)) + .await .map_err(into_error) .map(|_| true) }) diff --git a/crates/store/src/backend/rqlite/lookup.rs b/crates/store/src/backend/rqlite/lookup.rs index 6b83aaf75..07def9b45 100644 --- a/crates/store/src/backend/rqlite/lookup.rs +++ b/crates/store/src/backend/rqlite/lookup.rs @@ -3,14 +3,14 @@ * * SPDX-License-Identifier: AGPL-3.0-only OR LicenseRef-SEL */ - -use rusqlite::{types::FromSql, Row, Rows, ToSql}; +use rqlite_rs::query::arguments::RqliteArgument; +use rqlite_rs::query::{Operation, RqliteQuery}; use crate::{IntoRows, QueryResult, QueryType, Value}; -use super::{into_error, SqliteStore}; +use super::{into_error, RqliteStore}; -impl SqliteStore { +impl RqliteStore { pub(crate) async fn query( &self, query: &str, @@ -18,26 +18,36 @@ impl SqliteStore { ) -> trc::Result { let conn = self.conn_pool.get().map_err(into_error)?; self.spawn_worker(move || { - let mut s = conn.prepare_cached(query).map_err(into_error)?; - let params = params_ - .iter() - .map(|v| v as &(dyn rusqlite::types::ToSql)) - .collect::>(); + let params: Vec = + params_.iter().map(|v| (&v.to_owned()).into()).collect(); + + let mut query = RqliteQuery { + query: query.to_string(), + args: params, + op: Operation::Select, + }; match T::query_type() { - QueryType::Execute => s - .execute(params.as_slice()) + QueryType::Execute => conn + .exec(query) + .await + .map_err(into_error)? .map_or_else(|e| Err(into_error(e)), |r| Ok(T::from_exec(r))), - QueryType::Exists => s - .exists(params.as_slice()) + QueryType::Exists => conn + .fetch(query) + .await + .map_err(into_error)? + .first() .map(T::from_exists) .map_err(into_error), - QueryType::QueryOne => s - .query(params.as_slice()) - .and_then(|mut rows| Ok(T::from_query_one(rows.next()?))) + QueryType::QueryOne => conn + .fetch(query) + .await + .map_err(into_error)? + .and_then(|mut rows| Ok(T::from_query_one(rows.first()?))) .map_err(into_error), QueryType::QueryAll => Ok(T::from_query_all( - s.query(params.as_slice()).map_err(into_error)?, + conn.fetch(query).await.map_err(into_error)?, )), } }) @@ -45,21 +55,20 @@ impl SqliteStore { } } -impl ToSql for Value<'_> { - fn to_sql(&self) -> rusqlite::Result> { - match self { - Value::Integer(value) => value.to_sql(), - Value::Bool(value) => value.to_sql(), - Value::Float(value) => value.to_sql(), - Value::Text(value) => value.to_sql(), - Value::Blob(value) => value.to_sql(), - Value::Null => Ok(rusqlite::types::ToSqlOutput::Owned( - rusqlite::types::Value::Null, - )), +impl From<&Value<'_>> for RqliteArgument { + fn from(value: &Value<'_>) -> RqliteArgument { + match value { + Value::Integer(u) => RqliteArgument::I64(*u as i64), + Value::Bool(b) => RqliteArgument::Bool(*b), + Value::Float(f) => RqliteArgument::F64(*f as f64), + Value::Text(s) => RqliteArgument::String(s.to_string()), + Value::Blob(blob) => RqliteArgument::Blob(blob.to_vec()), + Value::Null => RqliteArgument::Null, } } } +/* impl FromSql for Value<'static> { fn column_result(value: rusqlite::types::ValueRef<'_>) -> rusqlite::types::FromSqlResult { Ok(match value { @@ -72,8 +81,9 @@ impl FromSql for Value<'static> { rusqlite::types::ValueRef::Blob(v) => Value::Blob(v.to_vec().into()), }) } -} +} */ +/* impl IntoRows for Rows<'_> { fn into_rows(mut self) -> crate::Rows { let column_count = self.as_ref().map(|s| s.column_count()).unwrap_or_default(); @@ -124,7 +134,8 @@ impl IntoRows for Rows<'_> { unreachable!() } } - +*/ +/* impl IntoRows for Option<&Row<'_>> { fn into_row(self) -> Option { self.map(|row| crate::Row { @@ -142,3 +153,4 @@ impl IntoRows for Option<&Row<'_>> { unreachable!() } } +*/ diff --git a/crates/store/src/backend/rqlite/main.rs b/crates/store/src/backend/rqlite/main.rs index 716670f8d..cd9f7ef75 100644 --- a/crates/store/src/backend/rqlite/main.rs +++ b/crates/store/src/backend/rqlite/main.rs @@ -10,11 +10,17 @@ use utils::config::{utils::AsKey, Config}; use crate::*; -use super::{into_error, pool::SqliteConnectionManager, SqliteStore}; +use super::{into_error, pool::RqliteConnectionManager, RqliteStore}; -impl SqliteStore { - pub fn open(config: &mut Config, prefix: impl AsKey) -> Option { +impl RqliteStore { + pub async fn open(config: &mut Config, prefix: impl AsKey) -> Option { let prefix = prefix.as_key(); + let endpoints = config + .properties::((&prefix, "endpoints")) + .into_iter() + .map(|(_key, addr_str)| addr_str) + .collect::>(); + let db = Self { conn_pool: Pool::builder() .max_size( @@ -22,17 +28,7 @@ impl SqliteStore { .property((&prefix, "pool.max-connections")) .unwrap_or_else(|| (num_cpus::get() * 4) as u32), ) - .build( - SqliteConnectionManager::file(config.value_require((&prefix, "path"))?) - .with_init(|c| { - c.execute_batch(concat!( - "PRAGMA journal_mode = WAL; ", - "PRAGMA synchronous = NORMAL; ", - "PRAGMA temp_store = memory;", - "PRAGMA busy_timeout = 30000;" - )) - }), - ) + .build(RqliteConnectionManager::endpoints(endpoints)) .map_err(|err| { config.new_build_error( prefix.as_str(), @@ -58,40 +54,20 @@ impl SqliteStore { .ok()?, }; - if let Err(err) = db.create_tables() { + if let Err(err) = db.create_tables().await { config.new_build_error(prefix.as_str(), format!("Failed to create tables: {err}")); } Some(db) } - #[cfg(feature = "test_mode")] - pub fn open_memory() -> trc::Result { - use super::into_error; - - let db = Self { - conn_pool: Pool::builder() - .max_size(1) - .build(SqliteConnectionManager::memory()) - .map_err(into_error)?, - worker_pool: rayon::ThreadPoolBuilder::new() - .num_threads(num_cpus::get()) - .build() - .map_err(|err| { - into_error(err).ctx(trc::Key::Reason, "Failed to build worker pool") - })?, - }; - db.create_tables()?; - Ok(db) - } - - pub(super) fn create_tables(&self) -> trc::Result<()> { + pub(crate) async fn create_tables(&self) -> trc::Result<()> { let conn = self.conn_pool.get().map_err(into_error)?; for table in [ SUBSPACE_ACL, SUBSPACE_DIRECTORY, - SUBSPACE_FTS_QUEUE, + SUBSPACE_TASK_QUEUE, SUBSPACE_BLOB_RESERVE, SUBSPACE_BLOB_LINK, SUBSPACE_LOOKUP_VALUE, @@ -103,24 +79,33 @@ impl SqliteStore { SUBSPACE_REPORT_IN, SUBSPACE_FTS_INDEX, SUBSPACE_LOGS, - SUBSPACE_BLOBS, SUBSPACE_TELEMETRY_SPAN, SUBSPACE_TELEMETRY_METRIC, SUBSPACE_TELEMETRY_INDEX, ] { let table = char::from(table); - conn.execute( - &format!( - "CREATE TABLE IF NOT EXISTS {table} ( - k BLOB PRIMARY KEY, - v BLOB NOT NULL - )" - ), - [], - ) + conn.exec(rqlite_rs::query!(&format!( + "CREATE TABLE IF NOT EXISTS {table} ( + k TINYBLOB, + v MEDIUMBLOB NOT NULL, + PRIMARY KEY (k(255)) + ) ENGINE=InnoDB" + ))) + .await .map_err(into_error)?; } + conn.exec(rqlite_rs::query!(&format!( + "CREATE TABLE IF NOT EXISTS {} ( + k TINYBLOB, + v LONGBLOB NOT NULL, + PRIMARY KEY (k(255)) + ) ENGINE=InnoDB", + char::from(SUBSPACE_BLOBS), + ))) + .await + .map_err(into_error)?; + for table in [ SUBSPACE_INDEXES, SUBSPACE_BITMAP_ID, @@ -128,28 +113,26 @@ impl SqliteStore { SUBSPACE_BITMAP_TEXT, ] { let table = char::from(table); - conn.execute( - &format!( - "CREATE TABLE IF NOT EXISTS {table} ( - k BLOB PRIMARY KEY - )" - ), - [], - ) + conn.exec(rqlite_rs::query!(&format!( + "CREATE TABLE IF NOT EXISTS {table} ( + k BLOB, + PRIMARY KEY (k(400)) + ) ENGINE=InnoDB" + ))) + .await .map_err(into_error)?; } for table in [SUBSPACE_COUNTER, SUBSPACE_QUOTA] { - conn.execute( - &format!( - "CREATE TABLE IF NOT EXISTS {} ( - k BLOB PRIMARY KEY, - v INTEGER NOT NULL DEFAULT 0 - )", - char::from(table) - ), - [], - ) + conn.exec(rqlite_rs::query!(&format!( + "CREATE TABLE IF NOT EXISTS {} ( + k TINYBLOB, + v BIGINT NOT NULL DEFAULT 0, + PRIMARY KEY (k(255)) + ) ENGINE=InnoDB", + char::from(table) + ))) + .await .map_err(into_error)?; } diff --git a/crates/store/src/backend/rqlite/mod.rs b/crates/store/src/backend/rqlite/mod.rs index 47ec465ab..3b5962be9 100644 --- a/crates/store/src/backend/rqlite/mod.rs +++ b/crates/store/src/backend/rqlite/mod.rs @@ -8,7 +8,7 @@ use std::fmt::Display; use r2d2::Pool; -use self::pool::SqliteConnectionManager; +use self::pool::RqliteConnectionManager; pub mod blob; pub mod lookup; @@ -17,12 +17,12 @@ pub mod pool; pub mod read; pub mod write; -pub struct SqliteStore { - pub(crate) conn_pool: Pool, +pub struct RqliteStore { + pub(crate) conn_pool: Pool, pub(crate) worker_pool: rayon::ThreadPool, } #[inline(always)] fn into_error(err: impl Display) -> trc::Error { - trc::StoreEvent::SqliteError.reason(err) + trc::StoreEvent::RqliteError.reason(err) } diff --git a/crates/store/src/backend/rqlite/pool.rs b/crates/store/src/backend/rqlite/pool.rs index 6471f7d8c..02c1afb19 100644 --- a/crates/store/src/backend/rqlite/pool.rs +++ b/crates/store/src/backend/rqlite/pool.rs @@ -4,85 +4,31 @@ * SPDX-License-Identifier: AGPL-3.0-only OR LicenseRef-SEL */ -use rusqlite::{Connection, Error, OpenFlags}; +use rqlite_rs::client::RqliteClient; +use rqlite_rs::error::{ClientBuilderError, RequestError}; +use rqlite_rs::RqliteClientBuilder; use std::fmt; -use std::path::{Path, PathBuf}; - -#[derive(Debug)] -enum Source { - File(PathBuf), - Memory, -} - -type InitFn = dyn Fn(&mut Connection) -> Result<(), rusqlite::Error> + Send + Sync + 'static; /// An `r2d2::ManageConnection` for `rusqlite::Connection`s. -pub struct SqliteConnectionManager { - source: Source, - flags: OpenFlags, - init: Option>, +pub struct RqliteConnectionManager { + endpoints: Vec, } -impl fmt::Debug for SqliteConnectionManager { +impl fmt::Debug for RqliteConnectionManager { fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result { let mut builder = f.debug_struct("SqliteConnectionManager"); - let _ = builder.field("source", &self.source); - let _ = builder.field("flags", &self.source); - let _ = builder.field("init", &self.init.as_ref().map(|_| "InitFn")); + let _ = builder.field("endpoints", &self.endpoints); builder.finish() } } -impl SqliteConnectionManager { - /// Creates a new `SqliteConnectionManager` from file. - /// - /// See `rusqlite::Connection::open` - pub fn file>(path: P) -> Self { - Self { - source: Source::File(path.as_ref().to_path_buf()), - flags: OpenFlags::default(), - init: None, - } - } - - /// Creates a new `SqliteConnectionManager` from memory. - pub fn memory() -> Self { +impl RqliteConnectionManager { + /// Creates a new `RqliteConnectionManager` from endpoints. + pub fn endpoints(endpoints: Vec) -> Self { Self { - source: Source::Memory, - flags: OpenFlags::default(), - init: None, + endpoints: endpoints, } } - - /// Converts `SqliteConnectionManager` into one that sets OpenFlags upon - /// connection creation. - /// - /// See `rustqlite::OpenFlags` for a list of available flags. - pub fn with_flags(self, flags: OpenFlags) -> Self { - Self { flags, ..self } - } - - /// Converts `SqliteConnectionManager` into one that calls an initialization - /// function upon connection creation. Could be used to set PRAGMAs, for - /// example. - /// - /// ### Example - /// - /// Make a `SqliteConnectionManager` that sets the `foreign_keys` pragma to - /// true for every connection. - /// - /// ```rust,no_run - /// # use r2d2_sqlite::{SqliteConnectionManager}; - /// let manager = SqliteConnectionManager::file("app.db") - /// .with_init(|c| c.execute_batch("PRAGMA foreign_keys=1;")); - /// ``` - pub fn with_init(self, init: F) -> Self - where - F: Fn(&mut Connection) -> Result<(), rusqlite::Error> + Send + Sync + 'static, - { - let init: Option> = Some(Box::new(init)); - Self { init, ..self } - } } fn sleeper(_: i32) -> bool { @@ -90,30 +36,30 @@ fn sleeper(_: i32) -> bool { true } -impl r2d2::ManageConnection for SqliteConnectionManager { - type Connection = Connection; - type Error = rusqlite::Error; +impl r2d2::ManageConnection for RqliteConnectionManager { + type Connection = RqliteClient; + type Error = ClientBuilderError; + + fn connect(&self) -> Result { + let mut client_builder = RqliteClientBuilder::new(); - fn connect(&self) -> Result { - match self.source { - Source::File(ref path) => Connection::open_with_flags(path, self.flags), - Source::Memory => Connection::open_in_memory_with_flags(self.flags), + for endpoint in &self.endpoints { + client_builder = client_builder.known_host(endpoint); } - .map_err(Into::into) - .and_then(|mut c| { - c.busy_handler(Some(sleeper))?; - match self.init { - None => Ok(c), - Some(ref init) => init(&mut c).map(|_| c), - } - }) + + client_builder.build().map_err(Into::into) } - fn is_valid(&self, conn: &mut Connection) -> Result<(), Error> { - conn.execute_batch("").map_err(Into::into) + fn is_valid(&self, conn: &mut RqliteClient) -> Result<(), ClientBuilderError> { + Ok(()) + /*let res = conn.exec(rqlite_rs::query!("SELECT 1;")); + match res.wait().map_err(Into::into) { + Ok(_) => Ok(()), + Err(err) => Err(err) + }*/ } - fn has_broken(&self, _: &mut Connection) -> bool { + fn has_broken(&self, _: &mut RqliteClient) -> bool { false } } diff --git a/crates/store/src/backend/rqlite/read.rs b/crates/store/src/backend/rqlite/read.rs index 1540324af..8dc9b74a4 100644 --- a/crates/store/src/backend/rqlite/read.rs +++ b/crates/store/src/backend/rqlite/read.rs @@ -12,29 +12,23 @@ use crate::{ BitmapKey, Deserialize, IterateParams, Key, ValueKey, U32_LEN, }; -use super::{into_error, SqliteStore}; +use super::{into_error, RqliteStore}; -impl SqliteStore { +impl RqliteStore { pub(crate) async fn get_value(&self, key: impl Key) -> trc::Result> where U: Deserialize + 'static, { let conn = self.conn_pool.get().map_err(into_error)?; self.spawn_worker(move || { - let mut result = conn - .prepare_cached(&format!( - "SELECT v FROM {} WHERE k = ?", - char::from(key.subspace()) - )) - .map_err(into_error)?; - let key = key.serialize(0); - result - .query_row([&key], |row| { - U::deserialize(row.get_ref(0)?.as_bytes()?) - .map_err(|err| rusqlite::Error::ToSqlConversionFailure(err.into())) - }) - .optional() - .map_err(into_error) + let query = rqlite_rs::query!( + &format!("SELECT v FROM {} WHERE k = ?", char::from(key.subspace())), + key.serialize(0) + ); + match conn.fetch(query).await.map_err(into_error) { + Ok(rows) => U::deserialize(rows.get_by_index_opt(0)?.as_bytes()?) + .map_err(|err| rusqlite::Error::ToSqlConversionFailure(err.into())), + } }) .await } @@ -52,14 +46,18 @@ impl SqliteStore { self.spawn_worker(move || { let mut bm = RoaringBitmap::new(); - let mut query = conn - .prepare_cached(&format!("SELECT k FROM {table} WHERE k >= ? AND k <= ?")) + let mut rows = conn + .fetch(rqlite_rs::query!( + &format!("SELECT k FROM {table} WHERE k >= ? AND k <= ?"), + begin, + end + )) + .await .map_err(into_error)?; - let mut rows = query.query([&begin, &end]).map_err(into_error)?; while let Some(row) = rows.next().map_err(into_error)? { let key = row - .get_ref(0) + .get_by_index(0) .map_err(into_error)? .as_bytes() .map_err(into_error)?; @@ -86,7 +84,7 @@ impl SqliteStore { let keys = if params.values { "k, v" } else { "k" }; let mut query = conn - .prepare_cached(&match (params.first, params.ascending) { + .fetch(rqlite_rs::query!(&match (params.first, params.ascending) { (true, true) => { format!( "SELECT {keys} FROM {table} WHERE k >= ? AND k <= ? ORDER BY k ASC LIMIT 1" @@ -105,19 +103,20 @@ impl SqliteStore { "SELECT {keys} FROM {table} WHERE k >= ? AND k <= ? ORDER BY k DESC" ) } - }) + })) + .await .map_err(into_error)?; let mut rows = query.query([&begin, &end]).map_err(into_error)?; if params.values { while let Some(row) = rows.next().map_err(into_error)? { let key = row - .get_ref(0) + .get_by_index(0) .map_err(into_error)? .as_bytes() .map_err(into_error)?; let value = row - .get_ref(1) + .get_by_index(1) .map_err(into_error)? .as_bytes() .map_err(into_error)?; @@ -129,7 +128,7 @@ impl SqliteStore { } else { while let Some(row) = rows.next().map_err(into_error)? { if !cb( - row.get_ref(0) + row.get_by_index(0) .map_err(into_error)? .as_bytes() .map_err(into_error)?, @@ -155,7 +154,10 @@ impl SqliteStore { let conn = self.conn_pool.get().map_err(into_error)?; self.spawn_worker(move || { match conn - .prepare_cached(&format!("SELECT v FROM {table} WHERE k = ?")) + .fetch(rqlite_rs::query!(&format!( + "SELECT v FROM {table} WHERE k = ?" + ))) + .await .map_err(into_error)? .query_row([&key], |row| row.get::<_, i64>(0)) { diff --git a/crates/store/src/backend/rqlite/write.rs b/crates/store/src/backend/rqlite/write.rs index 1e9280d79..2003df209 100644 --- a/crates/store/src/backend/rqlite/write.rs +++ b/crates/store/src/backend/rqlite/write.rs @@ -5,7 +5,7 @@ */ use roaring::RoaringBitmap; -use rusqlite::{params, OptionalExtension, TransactionBehavior}; +use rqlite_rs::query::RqliteQuery; use crate::{ write::{ @@ -15,9 +15,9 @@ use crate::{ BitmapKey, IndexKey, Key, LogKey, SUBSPACE_COUNTER, SUBSPACE_QUOTA, U32_LEN, }; -use super::{into_error, SqliteStore}; +use super::{into_error, RqliteStore}; -impl SqliteStore { +impl RqliteStore { pub(crate) async fn write(&self, batch: Batch) -> trc::Result { let mut conn = self.conn_pool.get().map_err(into_error)?; self.spawn_worker(move || { @@ -25,9 +25,7 @@ impl SqliteStore { let mut collection = u8::MAX; let mut document_id = u32::MAX; let mut change_id = u64::MAX; - let trx = conn - .transaction_with_behavior(TransactionBehavior::Immediate) - .map_err(into_error)?; + let queries: Vec = vec![]; let mut result = AssignedIds::default(); for op in &batch.ops { @@ -64,55 +62,75 @@ impl SqliteStore { match op { ValueOp::Set(value) => { - trx.prepare_cached(&format!( - "INSERT OR REPLACE INTO {} (k, v) VALUES (?, ?)", - table - )) - .map_err(into_error)? - .execute([&key, value.resolve(&result)?.as_ref()]) - .map_err(into_error)?; + queries.push( + rqlite_rs::query!( + &format!( + "INSERT OR REPLACE INTO {} (k, v) VALUES (?, ?)", + table + ), + key, + value.resolve(&result)?.as_ref() + ) + .await, + ); } ValueOp::AtomicAdd(by) => { if *by >= 0 { - trx.prepare_cached(&format!( - concat!( - "INSERT INTO {} (k, v) VALUES (?, ?) ", - "ON CONFLICT(k) DO UPDATE SET v = v + excluded.v" - ), - table - )) - .map_err(into_error)? - .execute(params![&key, *by]) - .map_err(into_error)?; + queries.push( + rqlite_rs::query!( + &format!( + concat!( + "INSERT INTO {} (k, v) VALUES (?, ?) ", + "ON CONFLICT(k) DO UPDATE SET v = v + excluded.v" + ), + table + ), + key, + *by + ) + .await, + ); } else { - trx.prepare_cached(&format!( - "UPDATE {table} SET v = v + ? WHERE k = ?" - )) - .map_err(into_error)? - .execute(params![*by, &key]) - .map_err(into_error)?; + queries.push( + rqlite_rs::query!( + &format!("UPDATE {table} SET v = v + ? WHERE k = ?"), + *by, + key + ) + .await, + ); } } ValueOp::AddAndGet(by) => { + // NOTE: escapes the transaction result.push_counter_id( - trx.prepare_cached(&format!( - concat!( - "INSERT INTO {} (k, v) VALUES (?, ?) ", - "ON CONFLICT(k) DO UPDATE SET v = v + ", - "excluded.v RETURNING v" - ), - table - )) + conn.fetch( + rqlite_rs::query!( + &format!( + concat!( + "INSERT INTO {} (k, v) VALUES (?, ?) ", + "ON CONFLICT(k) DO UPDATE SET v = v + ", + "excluded.v RETURNING v" + ), + table + ), + key, + *by + ) + .await, + ) + .await .map_err(into_error)? - .query_row(params![&key, &by], |row| row.get::<_, i64>(0)) + .first() + .map(|row| row.get::<_, i64>(0)) .map_err(into_error)?, ); } ValueOp::Clear => { - trx.prepare_cached(&format!("DELETE FROM {} WHERE k = ?", table)) - .map_err(into_error)? - .execute([&key]) - .map_err(into_error)?; + queries.push(rqlite_rs::query!( + &format!("DELETE FROM {} WHERE k = ?", table), + key + )); } } } @@ -127,15 +145,12 @@ impl SqliteStore { .serialize(0); if *set { - trx.prepare_cached("INSERT OR IGNORE INTO i (k) VALUES (?)") - .map_err(into_error)? - .execute([&key]) - .map_err(into_error)?; + queries.push( + rqlite_rs::query!("INSERT OR IGNORE INTO i (k) VALUES (?)", key) + .await, + ); } else { - trx.prepare_cached("DELETE FROM i WHERE k = ?") - .map_err(into_error)? - .execute([&key]) - .map_err(into_error)?; + queries.push(rqlite_rs::query!("DELETE FROM i WHERE k = ?", key).await); } } Operation::Bitmap { class, set } => { @@ -158,14 +173,19 @@ impl SqliteStore { .serialize(0); let key_len = begin.len(); - let mut query = trx - .prepare_cached("SELECT k FROM b WHERE k >= ? AND k <= ?") - .map_err(into_error)?; - let mut rows = query.query([&begin, &end]).map_err(into_error)?; + // NOTE: escapes the transaction + let rows = rqlite_rs::query!( + "SELECT k FROM b WHERE k >= ? AND k <= ?", + begin, + end + ) + .await + .map_err(into_error)?; + let mut found_ids = RoaringBitmap::new(); - while let Some(row) = rows.next().map_err(into_error)? { + for row in rows { let key = row - .get_ref(0) + .get_by_index(0) .map_err(into_error)? .as_bytes() .map_err(into_error)?; @@ -188,24 +208,26 @@ impl SqliteStore { if *set { if is_document_id { - trx.prepare_cached("INSERT INTO b (k) VALUES (?)") - .map_err(into_error)? - .execute(params![&key]) - .map_err(into_error)?; + queries.push( + rqlite_rs::query!("INSERT INTO b (k) VALUES (?)", key).await, + ); } else { - trx.prepare_cached(&format!( - "INSERT OR IGNORE INTO {} (k) VALUES (?)", - table - )) - .map_err(into_error)? - .execute(params![&key]) - .map_err(into_error)?; + queries.push( + rqlite_rs::query!( + &format!("INSERT OR IGNORE INTO {} (k) VALUES (?)", table), + key + ) + .await, + ); } } else { - trx.prepare_cached(&format!("DELETE FROM {} WHERE k = ?", table)) - .map_err(into_error)? - .execute(params![&key]) - .map_err(into_error)?; + queries.push( + rqlite_rs::query!( + &format!("DELETE FROM {} WHERE k = ?", table), + key + ) + .await, + ); }; } Operation::Log { set } => { @@ -216,10 +238,14 @@ impl SqliteStore { } .serialize(0); - trx.prepare_cached("INSERT OR REPLACE INTO l (k, v) VALUES (?, ?)") - .map_err(into_error)? - .execute([&key, set.resolve(&result).map_err(into_error)?.as_ref()]) - .map_err(into_error)?; + queries.push( + rqlite_rs::query!( + "INSERT OR REPLACE INTO l (k, v) VALUES (?, ?)", + key, + set.resolve(&result).map_err(into_error)?.as_ref() + ) + .await, + ); } Operation::AssertValue { class, @@ -234,24 +260,30 @@ impl SqliteStore { ); let table = char::from(class.subspace(collection)); - let matches = trx - .prepare_cached(&format!("SELECT v FROM {} WHERE k = ?", table)) + // NOTE: escapes the transaction + let matches = conn + .fetch( + rqlite_rs::query!( + &format!("SELECT v FROM {} WHERE k = ?", table), + key + ) + .await, + ) + .await .map_err(into_error)? - .query_row([&key], |row| { - Ok(assert_value.matches(row.get_ref(0)?.as_bytes()?)) - }) - .optional() + .first() + .map(|row| Ok(assert_value.matches(row.get_by_index(0)?.as_bytes()?))) .map_err(into_error)? .unwrap_or_else(|| assert_value.is_none()); + if !matches { - trx.rollback().map_err(into_error)?; return Err(trc::StoreEvent::AssertValueFailed.into()); } } } } - trx.commit().map(|_| result).map_err(into_error) + conn.transaction(queries).await.map_err(into_error)?; }) .await } @@ -260,10 +292,12 @@ impl SqliteStore { let conn = self.conn_pool.get().map_err(into_error)?; self.spawn_worker(move || { for subspace in [SUBSPACE_QUOTA, SUBSPACE_COUNTER] { - conn.prepare_cached(&format!("DELETE FROM {} WHERE v = 0", char::from(subspace),)) - .map_err(into_error)? - .execute([]) - .map_err(into_error)?; + conn.exec(rqlite_rs::query!(&format!( + "DELETE FROM {} WHERE v = 0", + char::from(subspace), + ))) + .await + .map_err(into_error)?; } Ok(()) @@ -274,10 +308,11 @@ impl SqliteStore { pub(crate) async fn delete_range(&self, from: impl Key, to: impl Key) -> trc::Result<()> { let conn = self.conn_pool.get().map_err(into_error)?; self.spawn_worker(move || { - conn.prepare_cached(&format!( + conn.exec(rqlite_rs::query!(&format!( "DELETE FROM {} WHERE k >= ? AND k < ?", char::from(from.subspace()), - )) + ))) + .await .map_err(into_error)? .execute([from.serialize(0), to.serialize(0)]) .map_err(into_error)?; diff --git a/crates/store/src/config.rs b/crates/store/src/config.rs index f06b91520..b751fb47d 100644 --- a/crates/store/src/config.rs +++ b/crates/store/src/config.rs @@ -203,14 +203,14 @@ impl Stores { continue; } - if let Some(db) = RqliteStore::open(config, prefix).map(Store::from) { + if let Some(db) = RqliteStore::open(config, prefix).await.map(Store::from) { self.stores.insert(store_id.clone(), db.clone()); self.fts_stores.insert(store_id.clone(), db.clone().into()); self.blob_stores.insert( store_id.clone(), BlobStore::from(db.clone()).with_compression(compression_algo), ); - self.lookup_stores.insert(store_id.clone(), db.into()); + self.in_memory_stores.insert(store_id.clone(), db.into()); } } "fs" => { diff --git a/crates/store/src/dispatch/store.rs b/crates/store/src/dispatch/store.rs index 584b3c33e..0f4672dee 100644 --- a/crates/store/src/dispatch/store.rs +++ b/crates/store/src/dispatch/store.rs @@ -241,7 +241,7 @@ impl Store { #[cfg(feature = "rqlite")] Sel::SQLiRe(store) => store.write(batch).await, #[cfg(feature = "sqlite")] - Self::SQLite(store) => store.write(batch).await, + Self::SQLite(store) => store.write(batch).await, #[cfg(feature = "foundation")] Self::FoundationDb(store) => store.write(batch).await, #[cfg(feature = "postgres")]