Skip to content

Commit

Permalink
feat: enable compensations (#340)
Browse files Browse the repository at this point in the history
Compensations are stored as a special-form updates that don't specify
previous row values. Previously, such updates were treated as inserts by
the server, but now they are converted to an update that doesn't mark
any columns as modified. This allows PG to just use the tag for reviving
the row as needed.

This is covered by an e2e test.

Closes VAX-801
  • Loading branch information
icehaunter committed Aug 21, 2023
1 parent 3cf2bc2 commit f60ce16
Show file tree
Hide file tree
Showing 25 changed files with 1,667 additions and 950 deletions.
6 changes: 6 additions & 0 deletions .changeset/happy-jars-warn.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
---
"@core/electric": patch
"electric-sql": patch
---

Implemented correct semantics for compensations to work across the stack
2 changes: 1 addition & 1 deletion clients/typescript/src/migrators/schema.ts
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@ export const data = {
//`-- Somewhere to track migrations\n`,
`CREATE TABLE IF NOT EXISTS ${migrationsTable} (\n id INTEGER PRIMARY KEY AUTOINCREMENT,\n version TEXT NOT NULL UNIQUE,\n applied_at TEXT NOT NULL\n);`,
//`-- Initialisation of the metadata table\n`,
`INSERT INTO ${metaTable} (key, value) VALUES ('compensations', 0), ('lastAckdRowId','0'), ('lastSentRowId', '0'), ('lsn', ''), ('clientId', ''), ('subscriptions', '');`,
`INSERT INTO ${metaTable} (key, value) VALUES ('compensations', 1), ('lastAckdRowId','0'), ('lastSentRowId', '0'), ('lsn', ''), ('clientId', ''), ('subscriptions', '');`,
//`-- These are toggles for turning the triggers on and off\n`,
`DROP TABLE IF EXISTS ${triggersTable};`,
`CREATE TABLE ${triggersTable} (tablename TEXT PRIMARY KEY, flag INTEGER);`,
Expand Down
54 changes: 31 additions & 23 deletions clients/typescript/src/migrators/triggers.ts
Original file line number Diff line number Diff line change
Expand Up @@ -111,48 +111,61 @@ export function generateOplogTriggers(

/**
* Generates triggers for compensations for all foreign keys in the provided table.
*
* Compensation is recorded as a specially-formatted update. It acts as a no-op, with
* previous value set to NULL, and it's on the server to figure out that this is a no-op
* compensation operation (usually `UPDATE` would have previous row state known). The entire
* reason for it existing is to maybe revive the row if it has been deleted, so we need correct tags.
*
* The compensation update contains _just_ the primary keys, no other columns are present.
*
* @param tableFullName Full name of the table.
* @param table The corresponding table.
* @param tables Map of all tables (needed to look up the tables that are pointed at by FKs).
* @returns An array of SQLite statements that add the necessary compensation triggers.
*/
function generateCompensationTriggers(
tableFullName: TableFullName,
table: Table,
tables: Tables
table: Table
): Statement[] {
const { tableName, namespace, foreignKeys } = table

const makeTriggers = (foreignKey: ForeignKey) => {
const { childKey } = foreignKey
const fkTable = tables.get(foreignKey.table)
if (fkTable === undefined)
throw new Error(`Table ${foreignKey.table} for foreign key not found.`)
const joinedFkPKs = joinColsForJSON(fkTable.primary)
const joinedFkCols = joinColsForJSON(fkTable.columns)

const fkTableNamespace = 'main' // currently, Electric always uses the 'main' namespace
const fkTableName = foreignKey.table
const fkTablePK = foreignKey.parentKey // primary key of the table pointed at by the FK.
const joinedFkPKs = joinColsForJSON([fkTablePK])

return [
`DROP TRIGGER IF EXISTS compensation_insert_${namespace}_${tableName}_${childKey}_into_oplog;`,
`-- Triggers for foreign key compensations
DROP TRIGGER IF EXISTS compensation_insert_${namespace}_${tableName}_${childKey}_into_oplog;`,
// The compensation trigger inserts a row in `_electric_oplog` if the row pointed at by the FK exists
// The way how this works is that the values for the row are passed to the nested SELECT
// which will return those values for every record that matches the query
// which can be at most once since we filter on the foreign key which is also the primary key and thus is unique.
`
CREATE TRIGGER compensation_insert_${namespace}_${tableName}_${childKey}_into_oplog
AFTER INSERT ON ${tableFullName}
WHEN 1 == (SELECT flag from _electric_trigger_settings WHERE tablename == '${fkTable.namespace}.${fkTable.tableName}') AND
WHEN 1 == (SELECT flag from _electric_trigger_settings WHERE tablename == '${fkTableNamespace}.${fkTableName}') AND
1 == (SELECT value from _electric_meta WHERE key == 'compensations')
BEGIN
INSERT INTO _electric_oplog (namespace, tablename, optype, primaryKey, newRow, oldRow, timestamp)
SELECT '${fkTable.namespace}', '${fkTable.tableName}', 'UPDATE', json_object(${joinedFkPKs}), json_object(${joinedFkCols}), NULL, NULL
FROM ${fkTable.namespace}.${fkTable.tableName} WHERE ${foreignKey.parentKey} = new.${foreignKey.childKey};
SELECT '${fkTableNamespace}', '${fkTableName}', 'UPDATE', json_object(${joinedFkPKs}), json_object(${joinedFkPKs}), NULL, NULL
FROM ${fkTableNamespace}.${fkTableName} WHERE ${foreignKey.parentKey} = new.${foreignKey.childKey};
END;
`,
`DROP TRIGGER IF EXISTS compensation_update_${namespace}_${tableName}_${foreignKey.childKey}_into_oplog;`,
`
CREATE TRIGGER compensation_update_${namespace}_${tableName}_${foreignKey.childKey}_into_oplog
AFTER UPDATE ON ${namespace}.${tableName}
WHEN 1 == (SELECT flag from _electric_trigger_settings WHERE tablename == '${fkTable.namespace}.${fkTable.tableName}') AND
WHEN 1 == (SELECT flag from _electric_trigger_settings WHERE tablename == '${fkTableNamespace}.${fkTableName}') AND
1 == (SELECT value from _electric_meta WHERE key == 'compensations')
BEGIN
INSERT INTO _electric_oplog (namespace, tablename, optype, primaryKey, newRow, oldRow, timestamp)
SELECT '${fkTable.namespace}', '${fkTable.tableName}', 'UPDATE', json_object(${joinedFkPKs}), json_object(${joinedFkCols}), NULL, NULL
FROM ${fkTable.namespace}.${fkTable.tableName} WHERE ${foreignKey.parentKey} = new.${foreignKey.childKey};
SELECT '${fkTableNamespace}', '${fkTableName}', 'UPDATE', json_object(${joinedFkPKs}), json_object(${joinedFkPKs}), NULL, NULL
FROM ${fkTableNamespace}.${fkTableName} WHERE ${foreignKey.parentKey} = new.${foreignKey.childKey};
END;
`,
].map(mkStatement)
Expand All @@ -170,15 +183,10 @@ function generateCompensationTriggers(
*/
export function generateTableTriggers(
tableFullName: TableFullName,
tables: Tables
table: Table
): Statement[] {
const table = tables.get(tableFullName)
if (typeof table === 'undefined')
throw new Error(
`Could not generate triggers for ${tableFullName}. Table not found.`
)
const oplogTriggers = generateOplogTriggers(tableFullName, table)
const fkTriggers = generateCompensationTriggers(tableFullName, table, tables)
const fkTriggers = generateCompensationTriggers(tableFullName, table) //, tables)
return oplogTriggers.concat(fkTriggers)
}

Expand All @@ -189,8 +197,8 @@ export function generateTableTriggers(
*/
export function generateTriggers(tables: Tables): Statement[] {
const tableTriggers: Statement[] = []
tables.forEach((_table, tableFullName) => {
const triggers = generateTableTriggers(tableFullName, tables)
tables.forEach((table, tableFullName) => {
const triggers = generateTableTriggers(tableFullName, table) //, tables)
tableTriggers.push(...triggers)
})

Expand Down
75 changes: 72 additions & 3 deletions clients/typescript/src/satellite/merge.ts
Original file line number Diff line number Diff line change
Expand Up @@ -3,10 +3,79 @@ import {
Tag,
OplogEntryChanges,
ShadowEntryChanges,
OplogEntry,
PendingChanges,
localOperationsToTableChanges,
remoteOperationsToTableChanges,
generateTag,
OPTYPES,
} from './oplog'
import { difference, union } from '../util/sets'
import { Row } from '../util'

/**
* Merge server-sent operation with local pending oplog to arrive at the same row state the server is at.
* @param localOrigin string specifying the local origin
* @param local local oplog entries
* @param incomingOrigin string specifying the upstream origin
* @param incoming incoming oplog entries
* @returns Changes to be made to the shadow tables
*/
export function mergeEntries(
localOrigin: string,
local: OplogEntry[],
incomingOrigin: string,
incoming: OplogEntry[]
): PendingChanges {
const localTableChanges = localOperationsToTableChanges(
local,
(timestamp: Date) => {
return generateTag(localOrigin, timestamp)
}
)
const incomingTableChanges = remoteOperationsToTableChanges(incoming)

for (const [tablename, incomingMapping] of Object.entries(
incomingTableChanges
)) {
const localMapping = localTableChanges[tablename]

if (localMapping === undefined) {
continue
}

for (const [primaryKey, incomingChanges] of Object.entries(
incomingMapping
)) {
const localInfo = localMapping[primaryKey]
if (localInfo === undefined) {
continue
}
const [_, localChanges] = localInfo

const changes = mergeChangesLastWriteWins(
localOrigin,
localChanges.changes,
incomingOrigin,
incomingChanges.changes,
incomingChanges.fullRow
)
let optype

const tags = mergeOpTags(localChanges, incomingChanges)
if (tags.length == 0) {
optype = OPTYPES.delete
} else {
optype = OPTYPES.upsert
}

Object.assign(incomingChanges, { changes, optype, tags })
}
}

return incomingTableChanges
}

/**
* Merge two sets of changes, using the timestamp to arbitrate conflicts
* so that the last write wins.
Expand Down Expand Up @@ -69,14 +138,14 @@ export const mergeChangesLastWriteWins = (
}, initialValue)
}

export const mergeOpTags = (
function mergeOpTags(
local: OplogEntryChanges,
remote: ShadowEntryChanges
): Tag[] => {
): Tag[] {
return calculateTags(local.tag, remote.tags, local.clearTags)
}

const calculateTags = (tag: Tag | null, tags: Tag[], clear: Tag[]) => {
function calculateTags(tag: Tag | null, tags: Tag[], clear: Tag[]): Tag[] {
if (tag == null) {
return difference(tags, clear)
} else {
Expand Down
6 changes: 3 additions & 3 deletions clients/typescript/src/satellite/oplog.ts
Original file line number Diff line number Diff line change
Expand Up @@ -68,7 +68,7 @@ export interface OplogTableChanges {
}
}

export interface ShadowTableChanges {
export interface PendingChanges {
[qualifiedTablenameStr: string]: {
[primaryKey: string]: ShadowEntryChanges
}
Expand Down Expand Up @@ -232,8 +232,8 @@ export const localOperationsToTableChanges = (

export const remoteOperationsToTableChanges = (
operations: OplogEntry[]
): ShadowTableChanges => {
const initialValue: ShadowTableChanges = {}
): PendingChanges => {
const initialValue: PendingChanges = {}

return operations.reduce((acc, entry) => {
const entryChanges = remoteEntryToChanges(entry)
Expand Down
69 changes: 4 additions & 65 deletions clients/typescript/src/satellite/process.ts
Original file line number Diff line number Diff line change
Expand Up @@ -36,20 +36,17 @@ import {
SqlValue,
} from '../util/types'
import { SatelliteOpts } from './config'
import { mergeChangesLastWriteWins, mergeOpTags } from './merge'
import { mergeEntries } from './merge'
import {
encodeTags,
fromTransaction,
generateTag,
getShadowPrimaryKey,
localOperationsToTableChanges,
OplogEntry,
OPTYPES,
primaryKeyToStr,
remoteOperationsToTableChanges,
ShadowEntry,
ShadowEntryChanges,
ShadowTableChanges,
toTransactions,
} from './oplog'
import {
Expand All @@ -65,7 +62,7 @@ import {
} from '../util/common'

import Log from 'loglevel'
import { generateOplogTriggers } from '../migrators/triggers'
import { generateTableTriggers } from '../migrators/triggers'
import { InMemorySubscriptionsManager } from './shapes/manager'
import {
ClientShapeDefinition,
Expand Down Expand Up @@ -869,7 +866,7 @@ export class SatelliteProcess implements Satellite {
// any existing subscription. We need a way to detect and prevent that.
async _apply(incoming: OplogEntry[], incoming_origin: string) {
const local = await this._getEntries()
const merged = this._mergeEntries(
const merged = mergeEntries(
this._authState!.clientId,
local,
incoming_origin,
Expand Down Expand Up @@ -953,64 +950,6 @@ export class SatelliteProcess implements Satellite {
}
}

// Merge changes, with last-write-wins and add-wins semantics.
// clearTags field is used by the calling code to determine new value of
// the shadowTags
_mergeEntries(
local_origin: string,
local: OplogEntry[],
incoming_origin: string,
incoming: OplogEntry[]
): ShadowTableChanges {
const localTableChanges = localOperationsToTableChanges(
local,
(timestamp: Date) => {
return generateTag(local_origin, timestamp)
}
)
const incomingTableChanges = remoteOperationsToTableChanges(incoming)

for (const [tablename, incomingMapping] of Object.entries(
incomingTableChanges
)) {
const localMapping = localTableChanges[tablename]

if (localMapping === undefined) {
continue
}

for (const [primaryKey, incomingChanges] of Object.entries(
incomingMapping
)) {
const localInfo = localMapping[primaryKey]
if (localInfo === undefined) {
continue
}
const [_, localChanges] = localInfo

const changes = mergeChangesLastWriteWins(
local_origin,
localChanges.changes,
incoming_origin,
incomingChanges.changes,
incomingChanges.fullRow
)
let optype

const tags = mergeOpTags(localChanges, incomingChanges)
if (tags.length == 0) {
optype = OPTYPES.delete
} else {
optype = OPTYPES.upsert
}

Object.assign(incomingChanges, { changes, optype, tags })
}
}

return incomingTableChanges
}

_updateRelations(rel: Omit<Relation, 'id'>) {
if (rel.tableType === SatRelation_RelationType.TABLE) {
// this relation may be for a newly created table
Expand Down Expand Up @@ -1503,5 +1442,5 @@ export function generateTriggersForTable(tbl: MigrationTable): Statement[] {
}),
}
const fullTableName = table.namespace + '.' + table.tableName
return generateOplogTriggers(fullTableName, table)
return generateTableTriggers(fullTableName, table)
}
41 changes: 41 additions & 0 deletions clients/typescript/test/satellite/merge.test.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,41 @@
import test from 'ava'
import { mergeEntries } from '../../src/satellite/merge'
import { OplogEntry, primaryKeyToStr } from '../../src/satellite/oplog'

test('merging entries: local no-op updates should cancel incoming delete', (t) => {
const pk = primaryKeyToStr({ id: 1 })

const local: OplogEntry[] = [
{
rowid: 0,
namespace: 'main',
tablename: 'public',
optype: 'UPDATE',
timestamp: '1970-01-02T03:46:41.000Z', // 100001000 as a unix timestamp
primaryKey: pk,
newRow: JSON.stringify({ id: 1 }),
oldRow: undefined,
clearTags: JSON.stringify(['common@100000000']),
},
]

const remote: OplogEntry[] = [
{
rowid: 0,
namespace: 'main',
tablename: 'public',
optype: 'DELETE',
timestamp: '1970-01-02T03:46:42.000Z', // 100002000 as a unix timestamp
primaryKey: pk,
oldRow: JSON.stringify({ id: 1, value: 'TEST' }),
clearTags: JSON.stringify(['common@100000000']),
},
]

const merged = mergeEntries('local', local, 'remote', remote)

// Merge should resolve into the UPSERT for this row, since the remote DELETE didn't observe this local update
t.like(merged, { 'main.public': { [pk]: { optype: 'UPSERT' } } })
t.deepEqual(merged['main.public'][pk].tags, ['local@100001000'])
t.deepEqual(merged['main.public'][pk].fullRow, { id: 1, value: 'TEST' })
})
Loading

0 comments on commit f60ce16

Please sign in to comment.