Skip to content

Commit

Permalink
feat(rqlite): Finish storing data
Browse files Browse the repository at this point in the history
  • Loading branch information
williamdes committed Jan 1, 2025
1 parent 3d3cdd0 commit 40dc617
Show file tree
Hide file tree
Showing 10 changed files with 297 additions and 316 deletions.
4 changes: 2 additions & 2 deletions crates/store/src/backend/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -23,12 +23,12 @@ pub mod postgres;
pub mod redis;
#[cfg(feature = "rocks")]
pub mod rocksdb;
#[cfg(feature = "rqlite")]
pub mod rqlite;
#[cfg(feature = "s3")]
pub mod s3;
#[cfg(feature = "sqlite")]
pub mod sqlite;
#[cfg(feature = "rqlite")]
pub mod rqlite;

pub const MAX_TOKEN_LENGTH: usize = (u8::MAX >> 1) as usize;
pub const MAX_TOKEN_MASK: usize = MAX_TOKEN_LENGTH - 1;
Expand Down
31 changes: 17 additions & 14 deletions crates/store/src/backend/rqlite/blob.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,9 +8,9 @@ use std::ops::Range;

use rusqlite::OptionalExtension;

use super::{into_error, SqliteStore};
use super::{into_error, RqliteStore};

impl SqliteStore {
impl RqliteStore {
pub(crate) async fn get_blob(
&self,
key: &[u8],
Expand All @@ -19,12 +19,14 @@ impl SqliteStore {
let conn = self.conn_pool.get().map_err(into_error)?;
self.spawn_worker(move || {
let mut result = conn
.prepare_cached("SELECT v FROM t WHERE k = ?")
.exec(rqlite_rs::query!("SELECT v FROM t WHERE k = ?", key))
.await
.map_err(into_error)?;
result
.query_row([&key], |row| {
.first()
.map(|row| {
Ok({
let bytes = row.get_ref(0)?.as_bytes()?;
let bytes = row.get_by_index(0)?.as_bytes()?;
if range.start == 0 && range.end == usize::MAX {
bytes.to_vec()
} else {
Expand All @@ -35,7 +37,6 @@ impl SqliteStore {
}
})
})
.optional()
.map_err(into_error)
})
.await
Expand All @@ -44,21 +45,23 @@ impl SqliteStore {
pub(crate) async fn put_blob(&self, key: &[u8], data: &[u8]) -> trc::Result<()> {
let conn = self.conn_pool.get().map_err(into_error)?;
self.spawn_worker(move || {
conn.prepare_cached("INSERT OR REPLACE INTO t (k, v) VALUES (?, ?)")
.map_err(into_error)?
.execute([key, data])
.map_err(into_error)
.map(|_| ())
conn.exec(rqlite_rs::query!(
"INSERT OR REPLACE INTO t (k, v) VALUES (?, ?)",
key,
data
))
.await
.map_err(into_error)
.map(|_| ())
})
.await
}

pub(crate) async fn delete_blob(&self, key: &[u8]) -> trc::Result<bool> {
let conn = self.conn_pool.get().map_err(into_error)?;
self.spawn_worker(move || {
conn.prepare_cached("DELETE FROM t WHERE k = ?")
.map_err(into_error)?
.execute([key])
conn.exec(rqlite_rs::query!("DELETE FROM t WHERE k = ?", key))
.await
.map_err(into_error)
.map(|_| true)
})
Expand Down
72 changes: 42 additions & 30 deletions crates/store/src/backend/rqlite/lookup.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,63 +3,72 @@
*
* SPDX-License-Identifier: AGPL-3.0-only OR LicenseRef-SEL
*/

use rusqlite::{types::FromSql, Row, Rows, ToSql};
use rqlite_rs::query::arguments::RqliteArgument;
use rqlite_rs::query::{Operation, RqliteQuery};

use crate::{IntoRows, QueryResult, QueryType, Value};

use super::{into_error, SqliteStore};
use super::{into_error, RqliteStore};

impl SqliteStore {
impl RqliteStore {
pub(crate) async fn query<T: QueryResult>(
&self,
query: &str,
params_: &[Value<'_>],
) -> trc::Result<T> {
let conn = self.conn_pool.get().map_err(into_error)?;
self.spawn_worker(move || {
let mut s = conn.prepare_cached(query).map_err(into_error)?;
let params = params_
.iter()
.map(|v| v as &(dyn rusqlite::types::ToSql))
.collect::<Vec<_>>();
let params: Vec<RqliteArgument> =
params_.iter().map(|v| (&v.to_owned()).into()).collect();

let mut query = RqliteQuery {
query: query.to_string(),
args: params,
op: Operation::Select,
};

match T::query_type() {
QueryType::Execute => s
.execute(params.as_slice())
QueryType::Execute => conn
.exec(query)
.await
.map_err(into_error)?
.map_or_else(|e| Err(into_error(e)), |r| Ok(T::from_exec(r))),
QueryType::Exists => s
.exists(params.as_slice())
QueryType::Exists => conn
.fetch(query)
.await
.map_err(into_error)?
.first()
.map(T::from_exists)
.map_err(into_error),
QueryType::QueryOne => s
.query(params.as_slice())
.and_then(|mut rows| Ok(T::from_query_one(rows.next()?)))
QueryType::QueryOne => conn
.fetch(query)
.await
.map_err(into_error)?
.and_then(|mut rows| Ok(T::from_query_one(rows.first()?)))
.map_err(into_error),
QueryType::QueryAll => Ok(T::from_query_all(
s.query(params.as_slice()).map_err(into_error)?,
conn.fetch(query).await.map_err(into_error)?,
)),
}
})
.await
}
}

impl ToSql for Value<'_> {
fn to_sql(&self) -> rusqlite::Result<rusqlite::types::ToSqlOutput<'_>> {
match self {
Value::Integer(value) => value.to_sql(),
Value::Bool(value) => value.to_sql(),
Value::Float(value) => value.to_sql(),
Value::Text(value) => value.to_sql(),
Value::Blob(value) => value.to_sql(),
Value::Null => Ok(rusqlite::types::ToSqlOutput::Owned(
rusqlite::types::Value::Null,
)),
impl From<&Value<'_>> for RqliteArgument {
fn from(value: &Value<'_>) -> RqliteArgument {
match value {
Value::Integer(u) => RqliteArgument::I64(*u as i64),
Value::Bool(b) => RqliteArgument::Bool(*b),
Value::Float(f) => RqliteArgument::F64(*f as f64),
Value::Text(s) => RqliteArgument::String(s.to_string()),
Value::Blob(blob) => RqliteArgument::Blob(blob.to_vec()),
Value::Null => RqliteArgument::Null,
}
}
}

/*
impl FromSql for Value<'static> {
fn column_result(value: rusqlite::types::ValueRef<'_>) -> rusqlite::types::FromSqlResult<Self> {
Ok(match value {
Expand All @@ -72,8 +81,9 @@ impl FromSql for Value<'static> {
rusqlite::types::ValueRef::Blob(v) => Value::Blob(v.to_vec().into()),
})
}
}
} */

/*
impl IntoRows for Rows<'_> {
fn into_rows(mut self) -> crate::Rows {
let column_count = self.as_ref().map(|s| s.column_count()).unwrap_or_default();
Expand Down Expand Up @@ -124,7 +134,8 @@ impl IntoRows for Rows<'_> {
unreachable!()
}
}

*/
/*
impl IntoRows for Option<&Row<'_>> {
fn into_row(self) -> Option<crate::Row> {
self.map(|row| crate::Row {
Expand All @@ -142,3 +153,4 @@ impl IntoRows for Option<&Row<'_>> {
unreachable!()
}
}
*/
113 changes: 48 additions & 65 deletions crates/store/src/backend/rqlite/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -10,29 +10,25 @@ use utils::config::{utils::AsKey, Config};

use crate::*;

use super::{into_error, pool::SqliteConnectionManager, SqliteStore};
use super::{into_error, pool::RqliteConnectionManager, RqliteStore};

impl SqliteStore {
pub fn open(config: &mut Config, prefix: impl AsKey) -> Option<Self> {
impl RqliteStore {
pub async fn open(config: &mut Config, prefix: impl AsKey) -> Option<Self> {
let prefix = prefix.as_key();
let endpoints = config
.properties::<String>((&prefix, "endpoints"))
.into_iter()
.map(|(_key, addr_str)| addr_str)
.collect::<Vec<String>>();

let db = Self {
conn_pool: Pool::builder()
.max_size(
config
.property((&prefix, "pool.max-connections"))
.unwrap_or_else(|| (num_cpus::get() * 4) as u32),
)
.build(
SqliteConnectionManager::file(config.value_require((&prefix, "path"))?)
.with_init(|c| {
c.execute_batch(concat!(
"PRAGMA journal_mode = WAL; ",
"PRAGMA synchronous = NORMAL; ",
"PRAGMA temp_store = memory;",
"PRAGMA busy_timeout = 30000;"
))
}),
)
.build(RqliteConnectionManager::endpoints(endpoints))
.map_err(|err| {
config.new_build_error(
prefix.as_str(),
Expand All @@ -58,40 +54,20 @@ impl SqliteStore {
.ok()?,
};

if let Err(err) = db.create_tables() {
if let Err(err) = db.create_tables().await {
config.new_build_error(prefix.as_str(), format!("Failed to create tables: {err}"));
}

Some(db)
}

#[cfg(feature = "test_mode")]
pub fn open_memory() -> trc::Result<Self> {
use super::into_error;

let db = Self {
conn_pool: Pool::builder()
.max_size(1)
.build(SqliteConnectionManager::memory())
.map_err(into_error)?,
worker_pool: rayon::ThreadPoolBuilder::new()
.num_threads(num_cpus::get())
.build()
.map_err(|err| {
into_error(err).ctx(trc::Key::Reason, "Failed to build worker pool")
})?,
};
db.create_tables()?;
Ok(db)
}

pub(super) fn create_tables(&self) -> trc::Result<()> {
pub(crate) async fn create_tables(&self) -> trc::Result<()> {
let conn = self.conn_pool.get().map_err(into_error)?;

for table in [
SUBSPACE_ACL,
SUBSPACE_DIRECTORY,
SUBSPACE_FTS_QUEUE,
SUBSPACE_TASK_QUEUE,
SUBSPACE_BLOB_RESERVE,
SUBSPACE_BLOB_LINK,
SUBSPACE_LOOKUP_VALUE,
Expand All @@ -103,53 +79,60 @@ impl SqliteStore {
SUBSPACE_REPORT_IN,
SUBSPACE_FTS_INDEX,
SUBSPACE_LOGS,
SUBSPACE_BLOBS,
SUBSPACE_TELEMETRY_SPAN,
SUBSPACE_TELEMETRY_METRIC,
SUBSPACE_TELEMETRY_INDEX,
] {
let table = char::from(table);
conn.execute(
&format!(
"CREATE TABLE IF NOT EXISTS {table} (
k BLOB PRIMARY KEY,
v BLOB NOT NULL
)"
),
[],
)
conn.exec(rqlite_rs::query!(&format!(
"CREATE TABLE IF NOT EXISTS {table} (
k TINYBLOB,
v MEDIUMBLOB NOT NULL,
PRIMARY KEY (k(255))
) ENGINE=InnoDB"
)))
.await
.map_err(into_error)?;
}

conn.exec(rqlite_rs::query!(&format!(
"CREATE TABLE IF NOT EXISTS {} (
k TINYBLOB,
v LONGBLOB NOT NULL,
PRIMARY KEY (k(255))
) ENGINE=InnoDB",
char::from(SUBSPACE_BLOBS),
)))
.await
.map_err(into_error)?;

for table in [
SUBSPACE_INDEXES,
SUBSPACE_BITMAP_ID,
SUBSPACE_BITMAP_TAG,
SUBSPACE_BITMAP_TEXT,
] {
let table = char::from(table);
conn.execute(
&format!(
"CREATE TABLE IF NOT EXISTS {table} (
k BLOB PRIMARY KEY
)"
),
[],
)
conn.exec(rqlite_rs::query!(&format!(
"CREATE TABLE IF NOT EXISTS {table} (
k BLOB,
PRIMARY KEY (k(400))
) ENGINE=InnoDB"
)))
.await
.map_err(into_error)?;
}

for table in [SUBSPACE_COUNTER, SUBSPACE_QUOTA] {
conn.execute(
&format!(
"CREATE TABLE IF NOT EXISTS {} (
k BLOB PRIMARY KEY,
v INTEGER NOT NULL DEFAULT 0
)",
char::from(table)
),
[],
)
conn.exec(rqlite_rs::query!(&format!(
"CREATE TABLE IF NOT EXISTS {} (
k TINYBLOB,
v BIGINT NOT NULL DEFAULT 0,
PRIMARY KEY (k(255))
) ENGINE=InnoDB",
char::from(table)
)))
.await
.map_err(into_error)?;
}

Expand Down
Loading

0 comments on commit 40dc617

Please sign in to comment.