From 5c4a85d4d6c7761ac7c9bcdc5f014e16848effdb Mon Sep 17 00:00:00 2001 From: Kevin Date: Mon, 27 Nov 2023 10:07:47 +0100 Subject: [PATCH] chore (client): provide generic DB adapters for serial and batch execution of SQL (#685) This PR splits the generic database adapter into two separate ones, one for serial execution of SQL queries and one for batched execution of SQL queries. --- .changeset/few-apples-buy.md | 5 + .../src/drivers/expo-sqlite/adapter.ts | 2 +- .../typescript/src/drivers/generic/adapter.ts | 118 ++++++++++++------ .../typescript/src/drivers/generic/index.ts | 4 +- .../src/drivers/wa-sqlite/adapter.ts | 2 +- clients/typescript/src/util/statements.ts | 9 +- 6 files changed, 90 insertions(+), 50 deletions(-) create mode 100644 .changeset/few-apples-buy.md diff --git a/.changeset/few-apples-buy.md b/.changeset/few-apples-buy.md new file mode 100644 index 0000000000..126a7d84e4 --- /dev/null +++ b/.changeset/few-apples-buy.md @@ -0,0 +1,5 @@ +--- +"electric-sql": patch +--- + +Generic implementation of serial and batched database adapters. diff --git a/clients/typescript/src/drivers/expo-sqlite/adapter.ts b/clients/typescript/src/drivers/expo-sqlite/adapter.ts index 19728865a1..09eb2ad46f 100644 --- a/clients/typescript/src/drivers/expo-sqlite/adapter.ts +++ b/clients/typescript/src/drivers/expo-sqlite/adapter.ts @@ -1,6 +1,6 @@ import { Row } from '../../util/types' import { Statement } from '../../util' -import { DatabaseAdapter as GenericDatabaseAdapter } from '../generic' +import { SerialDatabaseAdapter as GenericDatabaseAdapter } from '../generic' import { Database } from './database' export class DatabaseAdapter extends GenericDatabaseAdapter { diff --git a/clients/typescript/src/drivers/generic/adapter.ts b/clients/typescript/src/drivers/generic/adapter.ts index 2ff5a7fc89..94c8376223 100644 --- a/clients/typescript/src/drivers/generic/adapter.ts +++ b/clients/typescript/src/drivers/generic/adapter.ts @@ -10,60 +10,42 @@ import { Mutex } from 'async-mutex' import { AnyDatabase } from '..' /** - * A generic adapter that manages transactions manually by executing `BEGIN` and `COMMIT`/`ROLLBACK` commands. + * A generic database adapter. * Uses a mutex to ensure that transactions are not interleaved. - * Concrete adapters extending this class must implement the `exec` and `getRowsModified` methods. + * Concrete adapters extending this class must implement the + * `exec`, `getRowsModified`, and `runInTransaction` methods. */ -export abstract class DatabaseAdapter +abstract class DatabaseAdapter extends TableNameImpl implements DatabaseAdapterInterface { abstract readonly db: AnyDatabase - #txMutex: Mutex + protected txMutex: Mutex constructor() { super() - this.#txMutex = new Mutex() + this.txMutex = new Mutex() } + /** + * @param statement A SQL statement to execute against the DB. + */ abstract exec(statement: Statement): Promise + + /** + * @returns The number of rows modified by the last insert/update/delete query. + */ abstract getRowsModified(): number - async runInTransaction(...statements: Statement[]): Promise { - // Uses a mutex to ensure that transactions are not interleaved. - // This is needed because transactions are executed manually using `BEGIN` and `COMMIT` commands. - const release = await this.#txMutex.acquire() - let open = false - let rowsAffected = 0 - try { - await this.exec({ sql: 'BEGIN' }) - open = true - for (const stmt of statements) { - await this.exec(stmt) - if (isInsertUpdateOrDeleteStatement(stmt.sql)) { - // Fetch the number of rows affected by the last insert, update, or delete - rowsAffected += this.getRowsModified() - } - } - return { - rowsAffected: rowsAffected, - } - } catch (error) { - await this.exec({ sql: 'ROLLBACK' }) - open = false - throw error // rejects the promise with the reason for the rollback - } finally { - if (open) { - await this.exec({ sql: 'COMMIT' }) - } - release() - } - } + /** + * @param statements A list of SQL statements to execute against the DB. + */ + abstract runInTransaction(...statements: Statement[]): Promise async transaction( f: (_tx: Tx, setResult: (res: T) => void) => void ): Promise { - const release = await this.#txMutex.acquire() + const release = await this.txMutex.acquire() try { await this.exec({ sql: 'BEGIN' }) @@ -100,7 +82,7 @@ export abstract class DatabaseAdapter run(stmt: Statement): Promise { // Also uses the mutex to avoid running this query while a transaction is executing. // Because that would make the query part of the transaction which was not the intention. - return this.#txMutex.runExclusive(() => { + return this.txMutex.runExclusive(() => { return this._runUncoordinated(stmt) }) } @@ -118,7 +100,7 @@ export abstract class DatabaseAdapter query(stmt: Statement): Promise { // Also uses the mutex to avoid running this query while a transaction is executing. // Because that would make the query part of the transaction which was not the intention. - return this.#txMutex.runExclusive(() => { + return this.txMutex.runExclusive(() => { return this._queryUncoordinated(stmt) }) } @@ -130,6 +112,66 @@ export abstract class DatabaseAdapter } } +/** + * A generic database adapter that uses batch execution of SQL queries. + * Extend this database adapter if your driver supports batch execution of SQL queries. + */ +export abstract class BatchDatabaseAdapter + extends DatabaseAdapter + implements DatabaseAdapterInterface +{ + /** + * @param statements SQL statements to execute against the DB in a single batch. + */ + abstract execBatch(statements: Statement[]): Promise + + async runInTransaction(...statements: Statement[]): Promise { + // Uses a mutex to ensure that transactions are not interleaved. + return this.txMutex.runExclusive(() => { + return this.execBatch(statements) + }) + } +} + +/** + * A generic database adapter that uses serial execution of SQL queries. + * Extend this database adapter if your driver does not support batch execution of SQL queries. + */ +export abstract class SerialDatabaseAdapter + extends DatabaseAdapter + implements DatabaseAdapterInterface +{ + async runInTransaction(...statements: Statement[]): Promise { + // Uses a mutex to ensure that transactions are not interleaved. + const release = await this.txMutex.acquire() + let open = false + let rowsAffected = 0 + try { + await this.exec({ sql: 'BEGIN' }) + open = true + for (const stmt of statements) { + await this.exec(stmt) + if (isInsertUpdateOrDeleteStatement(stmt.sql)) { + // Fetch the number of rows affected by the last insert, update, or delete + rowsAffected += this.getRowsModified() + } + } + return { + rowsAffected: rowsAffected, + } + } catch (error) { + await this.exec({ sql: 'ROLLBACK' }) + open = false + throw error // rejects the promise with the reason for the rollback + } finally { + if (open) { + await this.exec({ sql: 'COMMIT' }) + } + release() + } + } +} + class Transaction implements Tx { constructor( private adapter: DatabaseAdapter, diff --git a/clients/typescript/src/drivers/generic/index.ts b/clients/typescript/src/drivers/generic/index.ts index c0a6b2cd41..ad64df94eb 100644 --- a/clients/typescript/src/drivers/generic/index.ts +++ b/clients/typescript/src/drivers/generic/index.ts @@ -1,3 +1 @@ -import { DatabaseAdapter } from './adapter' - -export { DatabaseAdapter } +export { SerialDatabaseAdapter, BatchDatabaseAdapter } from './adapter' diff --git a/clients/typescript/src/drivers/wa-sqlite/adapter.ts b/clients/typescript/src/drivers/wa-sqlite/adapter.ts index c4c543a333..c6d1c82b00 100644 --- a/clients/typescript/src/drivers/wa-sqlite/adapter.ts +++ b/clients/typescript/src/drivers/wa-sqlite/adapter.ts @@ -1,7 +1,7 @@ import { Database } from './database' import { Row } from '../../util/types' import { Statement } from '../../util' -import { DatabaseAdapter as GenericDatabaseAdapter } from '../generic' +import { SerialDatabaseAdapter as GenericDatabaseAdapter } from '../generic' export class DatabaseAdapter extends GenericDatabaseAdapter { readonly db: Database diff --git a/clients/typescript/src/util/statements.ts b/clients/typescript/src/util/statements.ts index d85c2e60cd..35e1c3da14 100644 --- a/clients/typescript/src/util/statements.ts +++ b/clients/typescript/src/util/statements.ts @@ -1,12 +1,7 @@ import { SqlValue, Statement } from './types' -export function isInsertUpdateOrDeleteStatement(sql: string) { - const tpe = sql.toLowerCase().trimStart() - return ( - tpe.startsWith('insert') || - tpe.startsWith('update') || - tpe.startsWith('delete') - ) +export function isInsertUpdateOrDeleteStatement(stmt: string) { + return /^\s*(insert|update|delete)/i.test(stmt) } /**