Skip to content

Commit

Permalink
chore (client): provide generic DB adapters for serial and batch exec…
Browse files Browse the repository at this point in the history
…ution of SQL (#685)

This PR splits the generic database adapter into two separate ones, one for serial execution of SQL queries and one for batched execution of SQL queries.
  • Loading branch information
kevin-dp committed Nov 27, 2023
1 parent 9209cd6 commit 5c4a85d
Show file tree
Hide file tree
Showing 6 changed files with 90 additions and 50 deletions.
5 changes: 5 additions & 0 deletions .changeset/few-apples-buy.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
---
"electric-sql": patch
---

Generic implementation of serial and batched database adapters.
2 changes: 1 addition & 1 deletion clients/typescript/src/drivers/expo-sqlite/adapter.ts
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
import { Row } from '../../util/types'
import { Statement } from '../../util'
import { DatabaseAdapter as GenericDatabaseAdapter } from '../generic'
import { SerialDatabaseAdapter as GenericDatabaseAdapter } from '../generic'
import { Database } from './database'

export class DatabaseAdapter extends GenericDatabaseAdapter {
Expand Down
118 changes: 80 additions & 38 deletions clients/typescript/src/drivers/generic/adapter.ts
Original file line number Diff line number Diff line change
Expand Up @@ -10,60 +10,42 @@ import { Mutex } from 'async-mutex'
import { AnyDatabase } from '..'

/**
* A generic adapter that manages transactions manually by executing `BEGIN` and `COMMIT`/`ROLLBACK` commands.
* A generic database adapter.
* Uses a mutex to ensure that transactions are not interleaved.
* Concrete adapters extending this class must implement the `exec` and `getRowsModified` methods.
* Concrete adapters extending this class must implement the
* `exec`, `getRowsModified`, and `runInTransaction` methods.
*/
export abstract class DatabaseAdapter
abstract class DatabaseAdapter
extends TableNameImpl
implements DatabaseAdapterInterface
{
abstract readonly db: AnyDatabase
#txMutex: Mutex
protected txMutex: Mutex

constructor() {
super()
this.#txMutex = new Mutex()
this.txMutex = new Mutex()
}

/**
* @param statement A SQL statement to execute against the DB.
*/
abstract exec(statement: Statement): Promise<Row[]>

/**
* @returns The number of rows modified by the last insert/update/delete query.
*/
abstract getRowsModified(): number

async runInTransaction(...statements: Statement[]): Promise<RunResult> {
// Uses a mutex to ensure that transactions are not interleaved.
// This is needed because transactions are executed manually using `BEGIN` and `COMMIT` commands.
const release = await this.#txMutex.acquire()
let open = false
let rowsAffected = 0
try {
await this.exec({ sql: 'BEGIN' })
open = true
for (const stmt of statements) {
await this.exec(stmt)
if (isInsertUpdateOrDeleteStatement(stmt.sql)) {
// Fetch the number of rows affected by the last insert, update, or delete
rowsAffected += this.getRowsModified()
}
}
return {
rowsAffected: rowsAffected,
}
} catch (error) {
await this.exec({ sql: 'ROLLBACK' })
open = false
throw error // rejects the promise with the reason for the rollback
} finally {
if (open) {
await this.exec({ sql: 'COMMIT' })
}
release()
}
}
/**
* @param statements A list of SQL statements to execute against the DB.
*/
abstract runInTransaction(...statements: Statement[]): Promise<RunResult>

async transaction<T>(
f: (_tx: Tx, setResult: (res: T) => void) => void
): Promise<T> {
const release = await this.#txMutex.acquire()
const release = await this.txMutex.acquire()

try {
await this.exec({ sql: 'BEGIN' })
Expand Down Expand Up @@ -100,7 +82,7 @@ export abstract class DatabaseAdapter
run(stmt: Statement): Promise<RunResult> {
// Also uses the mutex to avoid running this query while a transaction is executing.
// Because that would make the query part of the transaction which was not the intention.
return this.#txMutex.runExclusive(() => {
return this.txMutex.runExclusive(() => {
return this._runUncoordinated(stmt)
})
}
Expand All @@ -118,7 +100,7 @@ export abstract class DatabaseAdapter
query(stmt: Statement): Promise<Row[]> {
// Also uses the mutex to avoid running this query while a transaction is executing.
// Because that would make the query part of the transaction which was not the intention.
return this.#txMutex.runExclusive(() => {
return this.txMutex.runExclusive(() => {
return this._queryUncoordinated(stmt)
})
}
Expand All @@ -130,6 +112,66 @@ export abstract class DatabaseAdapter
}
}

/**
* A generic database adapter that uses batch execution of SQL queries.
* Extend this database adapter if your driver supports batch execution of SQL queries.
*/
export abstract class BatchDatabaseAdapter
extends DatabaseAdapter
implements DatabaseAdapterInterface
{
/**
* @param statements SQL statements to execute against the DB in a single batch.
*/
abstract execBatch(statements: Statement[]): Promise<RunResult>

async runInTransaction(...statements: Statement[]): Promise<RunResult> {
// Uses a mutex to ensure that transactions are not interleaved.
return this.txMutex.runExclusive(() => {
return this.execBatch(statements)
})
}
}

/**
* A generic database adapter that uses serial execution of SQL queries.
* Extend this database adapter if your driver does not support batch execution of SQL queries.
*/
export abstract class SerialDatabaseAdapter
extends DatabaseAdapter
implements DatabaseAdapterInterface
{
async runInTransaction(...statements: Statement[]): Promise<RunResult> {
// Uses a mutex to ensure that transactions are not interleaved.
const release = await this.txMutex.acquire()
let open = false
let rowsAffected = 0
try {
await this.exec({ sql: 'BEGIN' })
open = true
for (const stmt of statements) {
await this.exec(stmt)
if (isInsertUpdateOrDeleteStatement(stmt.sql)) {
// Fetch the number of rows affected by the last insert, update, or delete
rowsAffected += this.getRowsModified()
}
}
return {
rowsAffected: rowsAffected,
}
} catch (error) {
await this.exec({ sql: 'ROLLBACK' })
open = false
throw error // rejects the promise with the reason for the rollback
} finally {
if (open) {
await this.exec({ sql: 'COMMIT' })
}
release()
}
}
}

class Transaction implements Tx {
constructor(
private adapter: DatabaseAdapter,
Expand Down
4 changes: 1 addition & 3 deletions clients/typescript/src/drivers/generic/index.ts
Original file line number Diff line number Diff line change
@@ -1,3 +1 @@
import { DatabaseAdapter } from './adapter'

export { DatabaseAdapter }
export { SerialDatabaseAdapter, BatchDatabaseAdapter } from './adapter'
2 changes: 1 addition & 1 deletion clients/typescript/src/drivers/wa-sqlite/adapter.ts
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
import { Database } from './database'
import { Row } from '../../util/types'
import { Statement } from '../../util'
import { DatabaseAdapter as GenericDatabaseAdapter } from '../generic'
import { SerialDatabaseAdapter as GenericDatabaseAdapter } from '../generic'

export class DatabaseAdapter extends GenericDatabaseAdapter {
readonly db: Database
Expand Down
9 changes: 2 additions & 7 deletions clients/typescript/src/util/statements.ts
Original file line number Diff line number Diff line change
@@ -1,12 +1,7 @@
import { SqlValue, Statement } from './types'

export function isInsertUpdateOrDeleteStatement(sql: string) {
const tpe = sql.toLowerCase().trimStart()
return (
tpe.startsWith('insert') ||
tpe.startsWith('update') ||
tpe.startsWith('delete')
)
export function isInsertUpdateOrDeleteStatement(stmt: string) {
return /^\s*(insert|update|delete)/i.test(stmt)
}

/**
Expand Down

0 comments on commit 5c4a85d

Please sign in to comment.