Skip to content

Commit

Permalink
fix(client): Fix batch deletes causing call stack overflow (#1394)
Browse files Browse the repository at this point in the history
Addresses
[VAX-1985](https://linear.app/electric-sql/issue/VAX-1985/large-unsubscribes-lead-client-with-wa-sqlite-driver-to-a-stack)

Apparently the method we were using, where we chained `OR` and `AND`
clauses in the `WHERE` clause, led to:
1. SQLite call stack overflow (at least with `wa-sqlite`) at around 6k
`OR` clauses, far below the max parameter limit
2. Sub-optimal performance in both SQLite and PGlite

I've converted the batching to use `IN` statements for both
single-column queries and multi-column ones, like with composite primary
keys. The performance is significantly improved and has room for more
improvement:

#### Old Batching
##### Shadow Table 30k deletes
- ~4700ms pglite
- ~500ms wa-sqlite (arbitrary small batching to avoid overflow)

##### Comment 9k deletes
- ~700ms pglite
- ~2000ms wa-sqlite (max 6k batches)

#### New Batching
##### Shadow Table 30k deletes
- ~100ms pglite
- ~450ms wa-sqlite

##### Comment 9k deletes
- ~40ms pglite
- ~700ms wa-sqlite


While the shadow table deletes take similar time for wa-sqlite (~500ms),
the shadow table uses a triple composite key of 3 string columns.
There's room for significant optimization there, concatenating the
parameters and deleting based on the concatenated PK columns reduced the
time to execute from ~500ms to ~50ms - pglite also had a very small
improvement but hard to measure.

I think it's possible to avoid having a composite primary key for the
shadow table since we know it's 3 string columns and they can be
collapsed into one if that proves to be a significant optimization, but
that is outside of the scope of this fix, just raising it here as a
potential optimization.


NOTE: this improvement is the result of the same investigation that
yielded #1389, both
together result in a useable linearlite with >2k issues otherwise we run
into timeouts and call stack overflows.
  • Loading branch information
msfstef committed Jun 26, 2024
1 parent cb19c58 commit 5e2e276
Show file tree
Hide file tree
Showing 7 changed files with 180 additions and 48 deletions.
5 changes: 5 additions & 0 deletions .changeset/fast-poems-perform.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
---
"electric-sql": patch
---

Fix `DELETE` statement batching causing call stack overflow and improve performance.
59 changes: 38 additions & 21 deletions clients/typescript/src/migrators/query-builder/builder.ts
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
import { ForeignKey } from '../triggers'
import { QualifiedTablename, SqlValue, Statement } from '../../util'
import { QualifiedTablename, Row, SqlValue, Statement } from '../../util'

export type Dialect = 'SQLite' | 'Postgres'
export abstract class QueryBuilder {
Expand Down Expand Up @@ -287,6 +287,23 @@ export abstract class QueryBuilder {
shadowTable: QualifiedTablename
): string

/**
* Generates IN clause for a WHERE statement, checking that the given
* columns have a value present in the given args array
*
* The `args` array must be an array of values to compare each column to.
*
* If using a single column, then it can be a 1-dimensional array of
* values to check against.
*
* If using multiple columns, then it needs to be a 2-dimensional array where
* each entry is a row of values that the columns need to conform to
*/
protected abstract createInClause(
columns: string[],
args: (string | string[])[]
): string

/**
* Prepare multiple batched insert statements for an array of records.
*
Expand All @@ -302,10 +319,10 @@ export abstract class QueryBuilder {
* @param suffixSql optional SQL string to append to each insert statement
* @returns array of statements ready to be executed by the adapter
*/
prepareInsertBatchedStatements(
prepareInsertBatchedStatements<T extends Row>(
baseSql: string,
columns: string[],
records: Record<string, SqlValue>[],
columns: Array<keyof T>,
records: Row[],
maxParameters: number,
suffixSql = ''
): Statement[] {
Expand Down Expand Up @@ -376,23 +393,20 @@ export abstract class QueryBuilder {
* @param suffixSql optional SQL string to append to each insert statement
* @returns array of statements ready to be executed by the adapter
*/
public prepareDeleteBatchedStatements<T extends object>(
public prepareDeleteBatchedStatements<T extends Row>(
baseSql: string,
columns: Array<keyof T>,
columns: string[],
records: T[],
maxParameters: number,
suffixSql = ''
): Statement[] {
const stmts: Statement[] = []
const columnCount = columns.length
const recordCount = records.length
const isSingleColumnQuery = columnCount === 1
// Amount of rows we can delete at once
const batchMaxSize = Math.floor(maxParameters / columnCount)

// keep a temporary join array for joining strings, to avoid
// the overhead of generating a new array every time
const tempColumnComparisonJoinArr = Array.from({ length: columnCount })

let processed = 0
let prevDeleteCount = -1
let deletePattern = ''
Expand All @@ -403,17 +417,20 @@ export abstract class QueryBuilder {
// of `batchMaxSize` - ideally we can externalize this cache since for a
// given adapter this is _always_ going to be the same
if (currentDeleteCount !== prevDeleteCount) {
deletePattern = Array.from(
{ length: currentDeleteCount },
(_, recordIdx) => {
for (let i = 0; i < columnCount; i++) {
tempColumnComparisonJoinArr[i] = `"${
columns[i] as string
}" = ${this.makePositionalParam(recordIdx * columnCount + i + 1)}`
}
return ` (${tempColumnComparisonJoinArr.join(' AND ')})`
}
).join(' OR')
deletePattern =
' ' +
this.createInClause(
columns,
Array.from({ length: currentDeleteCount }, (_, recordIdx) =>
isSingleColumnQuery
? this.makePositionalParam(recordIdx + 1)
: Array.from({ length: columnCount }, (_, colIdx) =>
this.makePositionalParam(
recordIdx * columnCount + colIdx + 1
)
)
)
)
}
let sql = baseSql + deletePattern

Expand Down
14 changes: 14 additions & 0 deletions clients/typescript/src/migrators/query-builder/pgBuilder.ts
Original file line number Diff line number Diff line change
Expand Up @@ -438,6 +438,20 @@ class PgBuilder extends QueryBuilder {
makePositionalParam(i: number): string {
return this.paramSign + i
}

protected createInClause(
columns: string[],
args: (string | string[])[]
): string {
const useTuples = columns.length > 1
return `(${columns.map(quote).join(', ')}) IN (${
useTuples
? ` VALUES ${(args as string[][])
.map((tup) => `(${tup.join(', ')})`)
.join(', ')}`
: args.join(', ')
})`
}
}

export default new PgBuilder()
14 changes: 14 additions & 0 deletions clients/typescript/src/migrators/query-builder/sqliteBuilder.ts
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,8 @@ import { QualifiedTablename, SqlValue, Statement } from '../../util'
import { QueryBuilder } from './builder'
import { ForeignKey } from '../triggers'

const quote = (col: string) => `"${col}"`

class SqliteBuilder extends QueryBuilder {
readonly dialect = 'SQLite'
readonly AUTOINCREMENT_PK = 'INTEGER PRIMARY KEY AUTOINCREMENT'
Expand Down Expand Up @@ -312,6 +314,18 @@ class SqliteBuilder extends QueryBuilder {
makePositionalParam(_i: number): string {
return this.paramSign
}

protected createInClause(
columns: string[],
args: (string | string[])[]
): string {
const useTuples = columns.length > 1
return `(${columns.map(quote).join(`, `)}) IN (${
useTuples
? (args as string[][]).map((tup) => `(${tup.join(`, `)})`).join(', ')
: args.join(', ')
})`
}
}

export default new SqliteBuilder()
2 changes: 1 addition & 1 deletion clients/typescript/src/satellite/oplog.ts
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@ export type Tag = string
export type ShadowKey = string

// Oplog table schema.
export interface OplogEntry {
export type OplogEntry = {
namespace: string
tablename: string
primaryKey: string // json object
Expand Down
5 changes: 3 additions & 2 deletions clients/typescript/src/satellite/process.ts
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,7 @@ import {
ReplicatedRowTransformer,
DataGone,
ServerTransaction,
Row,
} from '../util/types'
import { SatelliteOpts } from './config'
import { Client, Satellite } from './index'
Expand Down Expand Up @@ -1564,7 +1565,7 @@ export class SatelliteProcess implements Satellite {

// Batch-delete shadow entries
const stmts = this.builder.prepareDeleteBatchedStatements(
`DELETE FROM ${this.opts.shadowTable} WHERE `,
`DELETE FROM ${this.opts.shadowTable} WHERE`,
['namespace', 'tablename', 'primaryKey'],
fakeOplogEntries,
this.maxSqlParameters
Expand All @@ -1588,7 +1589,7 @@ export class SatelliteProcess implements Satellite {
...this.builder.prepareDeleteBatchedStatements(
`DELETE FROM ${fqtn} WHERE`,
pkCols,
gone.map((x) => x.oldRecord) as Record<string, SqlValue>[],
gone.map((x) => x.oldRecord as Row),
this.maxSqlParameters
)
)
Expand Down
129 changes: 105 additions & 24 deletions clients/typescript/test/migrators/builder.ts
Original file line number Diff line number Diff line change
Expand Up @@ -336,6 +336,26 @@ export const builderTests = (test: TestFn<ContextType>) => {
])
})

test('makePositionalParam generates correct parameter strings', (t) => {
const { builder } = t.context
const numParams = 4
let expectedParams: string[]
switch (builder.dialect) {
case 'SQLite':
expectedParams = ['?', '?', '?', '?']
break
case 'Postgres':
expectedParams = ['$1', '$2', '$3', '$4']
break
}
t.deepEqual(
Array.from({ length: numParams }, (_, idx) =>
builder.makePositionalParam(idx + 1)
),
expectedParams
)
})

test('prepareInsertBatchedStatements correctly splits up data in batches', (t) => {
const { builder } = t.context
const data = [
Expand All @@ -350,18 +370,23 @@ export const builderTests = (test: TestFn<ContextType>) => {
5 // at most 5 `?`s in one SQL statement, so we should see the split
)

const posArgs: string[] =
builder.dialect === 'SQLite'
? ['?', '?', '?', '?']
: ['$1', '$2', '$3', '$4']
let parameters: string[]
switch (builder.dialect) {
case 'SQLite':
parameters = ['(?, ?), (?, ?)', '(?, ?)']
break
case 'Postgres':
parameters = ['($1, $2), ($3, $4)', '($1, $2)']
break
}

t.deepEqual(stmts, [
{
sql: `INSERT INTO test (a, b) VALUES (${posArgs[0]}, ${posArgs[1]}), (${posArgs[2]}, ${posArgs[3]})`,
sql: `INSERT INTO test (a, b) VALUES ${parameters[0]}`,
args: [1, 2, 3, 4],
},
{
sql: `INSERT INTO test (a, b) VALUES (${posArgs[0]}, ${posArgs[1]})`,
sql: `INSERT INTO test (a, b) VALUES ${parameters[1]}`,
args: [5, 6],
},
])
Expand All @@ -381,18 +406,23 @@ export const builderTests = (test: TestFn<ContextType>) => {
5
)

const posArgs: string[] =
builder.dialect === 'SQLite'
? ['?', '?', '?', '?']
: ['$1', '$2', '$3', '$4']
let parameters: string[]
switch (builder.dialect) {
case 'SQLite':
parameters = ['(?, ?), (?, ?)', '(?, ?)']
break
case 'Postgres':
parameters = ['($1, $2), ($3, $4)', '($1, $2)']
break
}

t.deepEqual(stmts, [
{
sql: `INSERT INTO test (a, b) VALUES (${posArgs[0]}, ${posArgs[1]}), (${posArgs[2]}, ${posArgs[3]})`,
sql: `INSERT INTO test (a, b) VALUES ${parameters[0]}`,
args: [2, 1, 4, 3],
},
{
sql: `INSERT INTO test (a, b) VALUES (${posArgs[0]}, ${posArgs[1]})`,
sql: `INSERT INTO test (a, b) VALUES ${parameters[1]}`,
args: [6, 5],
},
])
Expand All @@ -412,24 +442,66 @@ export const builderTests = (test: TestFn<ContextType>) => {
5 // at most 5 `?`s in one SQL statement, so we should see the split
)

const posArgs: string[] =
builder.dialect === 'SQLite'
? ['?', '?', '?', '?']
: ['$1', '$2', '$3', '$4']
let parameters: string[]
switch (builder.dialect) {
case 'SQLite':
parameters = ['(?, ?), (?, ?)', '(?, ?)']
break
case 'Postgres':
parameters = ['($1, $2), ($3, $4)', '($1, $2)']
break
}

t.deepEqual(stmts, [
{
sql: `DELETE FROM test WHERE ("a" = ${posArgs[0]} AND "b" = ${posArgs[1]}) OR ("a" = ${posArgs[2]} AND "b" = ${posArgs[3]})`,
sql: `DELETE FROM test WHERE ("a", "b") IN (${builder.pgOnly(
' VALUES '
)}${parameters[0]})`,

args: [1, 2, 3, 4],
},
{
sql: `DELETE FROM test WHERE ("a" = ${posArgs[0]} AND "b" = ${posArgs[1]})`,
sql: `DELETE FROM test WHERE ("a", "b") IN (${builder.pgOnly(
' VALUES '
)}${parameters[1]})`,
args: [5, 6],
},
])
})

test('prepareDeleteBatchedStatements handles single column deletes', (t) => {
const { builder } = t.context
const data = [
{ a: 1, b: 2 },
{ a: 3, b: 4 },
{ a: 5, b: 6 },
]
const stmts = builder.prepareDeleteBatchedStatements(
'DELETE FROM test WHERE',
['a'],
data,
5 // at most 5 `?`s in one SQL statement, so we should see the split
)

let parameters: string[]
switch (builder.dialect) {
case 'SQLite':
parameters = ['?, ?, ?']
break
case 'Postgres':
parameters = ['$1, $2, $3']
break
}

t.deepEqual(stmts, [
{
sql: `DELETE FROM test WHERE ("a") IN (${parameters[0]})`,

args: [1, 3, 5],
},
])
})

test('prepareDeleteBatchedStatements respects column order', (t) => {
const { builder } = t.context
const data = [
Expand All @@ -444,18 +516,27 @@ export const builderTests = (test: TestFn<ContextType>) => {
5
)

const posArgs: string[] =
builder.dialect === 'SQLite'
? ['?', '?', '?', '?']
: ['$1', '$2', '$3', '$4']
let parameters: string[]
switch (builder.dialect) {
case 'SQLite':
parameters = ['(?, ?), (?, ?)', '(?, ?)']
break
case 'Postgres':
parameters = ['($1, $2), ($3, $4)', '($1, $2)']
break
}

t.deepEqual(stmts, [
{
sql: `DELETE FROM test WHERE ("b" = ${posArgs[0]} AND "a" = ${posArgs[1]}) OR ("b" = ${posArgs[2]} AND "a" = ${posArgs[3]})`,
sql: `DELETE FROM test WHERE ("b", "a") IN (${builder.pgOnly(
' VALUES '
)}${parameters[0]})`,
args: [2, 1, 4, 3],
},
{
sql: `DELETE FROM test WHERE ("b" = ${posArgs[0]} AND "a" = ${posArgs[1]})`,
sql: `DELETE FROM test WHERE ("b", "a") IN (${builder.pgOnly(
' VALUES '
)}${parameters[1]})`,
args: [6, 5],
},
])
Expand Down

0 comments on commit 5e2e276

Please sign in to comment.