chore (client): include tests for bigint support in the oplog triggers and the merge logic (#699)

In addition to the tests, the following logic is removed because, at the oplog level, the Record values can never be BigInt: the oplog triggers serialize those values as strings. It is `deserialize` that checks the Postgres type and turns the value back into the correct JS object.
Leaving the type check in place might cause confusion in the future.

```js
if (typeof value === 'bigint') {
    return value.toString()
}
```
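
For illustration, here is a minimal sketch of the round trip this relies on. The `deserialiseValue` helper below is hypothetical, not the client's actual API: the oplog triggers store INT8/BIGINT columns as strings, and deserialization rebuilds the BigInt from the column's declared Postgres type.

```ts
// Hypothetical sketch, not the actual Electric client API. It shows why the
// removed `typeof value === 'bigint'` check was dead code: the oplog trigger
// has already stringified INT8/BIGINT columns, so a BigInt only reappears on
// the way out, driven by the column's declared Postgres type.
type PgType = 'INT8' | 'BIGINT' | 'INTEGER' | 'REAL' | 'TEXT'

function deserialiseValue(
  value: string | number,
  pgType: PgType
): string | number | bigint {
  // Strings belonging to 64-bit integer columns are turned back into BigInt.
  if (typeof value === 'string' && (pgType === 'INT8' || pgType === 'BIGINT')) {
    return BigInt(value)
  }
  return value
}

console.log(deserialiseValue('9223372036854775807', 'INT8')) // 9223372036854775807n
console.log(deserialiseValue(25.5, 'REAL')) // non-INT8 values pass through unchanged
```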
davidmartos96 authored Nov 27, 2023
1 parent 144ee6c commit 2001a24
Showing 4 changed files with 106 additions and 66 deletions.
3 changes: 0 additions & 3 deletions clients/typescript/src/satellite/oplog.ts
@@ -306,9 +306,6 @@ function serialiseRow(row?: Rec): string {
return '-Inf'
}
}
if (typeof value === 'bigint') {
return value.toString()
}
return value
})
}
12 changes: 7 additions & 5 deletions clients/typescript/test/migrators/triggers.test.ts
@@ -32,7 +32,7 @@ test('generateTableTriggers should create correct triggers for a table', (t) =>
WHEN 1 == (SELECT flag from _electric_trigger_settings WHERE tablename == 'personTable')
BEGIN
INSERT INTO _electric_oplog (namespace, tablename, optype, primaryKey, newRow, oldRow, timestamp)
VALUES ('main', 'personTable', 'INSERT', json_object('id', cast(new."id" as TEXT)), json_object('age', new."age", 'bmi', cast(new."bmi" as TEXT), 'id', cast(new."id" as TEXT), 'name', new."name"), NULL, NULL);
VALUES ('main', 'personTable', 'INSERT', json_object('id', cast(new."id" as TEXT)), json_object('age', new."age", 'bmi', cast(new."bmi" as TEXT), 'id', cast(new."id" as TEXT), 'int8', cast(new."int8" as TEXT), 'name', new."name"), NULL, NULL);
END;
`
)
@@ -46,7 +46,7 @@ test('generateTableTriggers should create correct triggers for a table', (t) =>
WHEN 1 == (SELECT flag from _electric_trigger_settings WHERE tablename == 'personTable')
BEGIN
INSERT INTO _electric_oplog (namespace, tablename, optype, primaryKey, newRow, oldRow, timestamp)
VALUES ('main', 'personTable', 'UPDATE', json_object('id', cast(new."id" as TEXT)), json_object('age', new."age", 'bmi', cast(new."bmi" as TEXT), 'id', cast(new."id" as TEXT), 'name', new."name"), json_object('age', old."age", 'bmi', cast(old."bmi" as TEXT), 'id', cast(old."id" as TEXT), 'name', old."name"), NULL);
VALUES ('main', 'personTable', 'UPDATE', json_object('id', cast(new."id" as TEXT)), json_object('age', new."age", 'bmi', cast(new."bmi" as TEXT), 'id', cast(new."id" as TEXT), 'int8', cast(new."int8" as TEXT), 'name', new."name"), json_object('age', old."age", 'bmi', cast(old."bmi" as TEXT), 'id', cast(old."id" as TEXT), 'int8', cast(old."int8" as TEXT), 'name', old."name"), NULL);
END;
`
)
@@ -60,7 +60,7 @@ test('generateTableTriggers should create correct triggers for a table', (t) =>
WHEN 1 == (SELECT flag from _electric_trigger_settings WHERE tablename == 'personTable')
BEGIN
INSERT INTO _electric_oplog (namespace, tablename, optype, primaryKey, newRow, oldRow, timestamp)
VALUES ('main', 'personTable', 'DELETE', json_object('id', cast(old."id" as TEXT)), NULL, json_object('age', old."age", 'bmi', cast(old."bmi" as TEXT), 'id', cast(old."id" as TEXT), 'name', old."name"), NULL);
VALUES ('main', 'personTable', 'DELETE', json_object('id', cast(old."id" as TEXT)), NULL, json_object('age', old."age", 'bmi', cast(old."bmi" as TEXT), 'id', cast(old."id" as TEXT), 'int8', cast(old."int8" as TEXT), 'name', old."name"), NULL);
END;
`
)
@@ -75,7 +75,7 @@ test('oplog insertion trigger should insert row into oplog table', (t) => {
migrateDb()

// Insert a row in the table
const insertRowSQL = `INSERT INTO ${tableName} (id, name, age, bmi) VALUES (1, 'John Doe', 30, 25.5)`
const insertRowSQL = `INSERT INTO ${tableName} (id, name, age, bmi, int8) VALUES (1, 'John Doe', 30, 25.5, 7)`
db.exec(insertRowSQL)

// Check that the oplog table contains an entry for the inserted row
@@ -99,6 +99,7 @@ test('oplog insertion trigger should insert row into oplog table', (t) => {
age: 30,
bmi: '25.5',
id: '1.0',
int8: '7', // BigInts are serialized as strings in the oplog
name: 'John Doe',
}),
oldRow: null,
@@ -116,7 +117,7 @@ test('oplog trigger should handle Infinity values correctly', (t) => {
migrateDb()

// Insert a row in the table
const insertRowSQL = `INSERT INTO ${tableName} (id, name, age, bmi) VALUES (-9e999, 'John Doe', 30, 9e999)`
const insertRowSQL = `INSERT INTO ${tableName} (id, name, age, bmi, int8) VALUES (-9e999, 'John Doe', 30, 9e999, 7)`
db.exec(insertRowSQL)

// Check that the oplog table contains an entry for the inserted row
@@ -140,6 +141,7 @@ test('oplog trigger should handle Infinity values correctly', (t) => {
age: 30,
bmi: 'Inf',
id: '-Inf',
int8: '7', // BigInts are serialized as strings in the oplog
name: 'John Doe',
}),
oldRow: null,
29 changes: 24 additions & 5 deletions clients/typescript/test/satellite/common.ts
@@ -101,10 +101,10 @@ export const relations = {
},
],
},
floatTable: {
mergeTable: {
id: 3,
schema: 'public',
table: 'floatTable',
table: 'mergeTable',
tableType: 0,
columns: [
{
@@ -114,11 +114,23 @@
primaryKey: true,
},
{
name: 'value',
name: 'real',
type: 'REAL',
isNullable: true,
primaryKey: false,
},
{
name: 'int8',
type: 'INT8',
isNullable: true,
primaryKey: false,
},
{
name: 'bigint',
type: 'BIGINT',
isNullable: true,
primaryKey: false,
},
],
},
personTable: {
@@ -151,6 +163,12 @@ export const relations = {
isNullable: true,
primaryKey: false,
},
{
name: 'int8',
type: 'INT8',
isNullable: true,
primaryKey: false,
},
],
},
}
@@ -249,7 +267,7 @@ export const cleanAndStopSatellite = async (
export function migrateDb(db: SqliteDB, table: Table) {
const tableName = table.tableName
// Create the table in the database
const createTableSQL = `CREATE TABLE ${tableName} (id REAL PRIMARY KEY, name TEXT, age INTEGER, bmi REAL)`
const createTableSQL = `CREATE TABLE ${tableName} (id REAL PRIMARY KEY, name TEXT, age INTEGER, bmi REAL, int8 INTEGER)`
db.exec(createTableSQL)

// Apply the initial migration on the database
@@ -270,13 +288,14 @@ export function migrateDb(db: SqliteDB, table: Table) {
export const personTable: Table = {
namespace: 'main',
tableName: 'personTable',
columns: ['id', 'name', 'age', 'bmi'],
columns: ['id', 'name', 'age', 'bmi', 'int8'],
primary: ['id'],
foreignKeys: [],
columnTypes: {
id: { sqliteType: 'REAL', pgType: PgBasicType.PG_REAL },
name: { sqliteType: 'TEXT', pgType: PgBasicType.PG_TEXT },
age: { sqliteType: 'INTEGER', pgType: PgBasicType.PG_INTEGER },
bmi: { sqliteType: 'REAL', pgType: PgBasicType.PG_REAL },
int8: { sqliteType: 'INTEGER', pgType: PgBasicType.PG_INT8 },
},
}
128 changes: 75 additions & 53 deletions clients/typescript/test/satellite/merge.test.ts
@@ -1,4 +1,4 @@
import test from 'ava'
import test, { ExecutionContext } from 'ava'
import { mergeEntries } from '../../src/satellite/merge'
import {
OplogEntry,
@@ -54,67 +54,77 @@ test('merging entries: local no-op updates should cancel incoming delete', (t) =
})

test('merge can handle infinity values', (t) => {
const pk = primaryKeyToStr({ id: 1 })

const to_commit_timestamp = (timestamp: string): Long =>
Long.UZERO.add(new Date(timestamp).getTime())

const tx1: DataTransaction = {
lsn: DEFAULT_LOG_POS,
commit_timestamp: to_commit_timestamp('1970-01-02T03:46:41.000Z'),
changes: [
{
relation: relations.floatTable,
type: DataChangeType.INSERT,
record: { id: 1, value: +Infinity },
tags: [],
},
],
}

const tx2: DataTransaction = {
lsn: DEFAULT_LOG_POS,
commit_timestamp: to_commit_timestamp('1970-01-02T03:46:42.000Z'),
changes: [
{
relation: relations.floatTable,
type: DataChangeType.INSERT,
record: { id: 1, value: -Infinity },
tags: [],
},
],
}
_mergeTableTest(t, {
initial: { real: Infinity },
incoming: { real: -Infinity },
expected: { real: -Infinity },
})
})

// we go through `fromTransaction` on purpose
// in order to also test serialisation/deserialisation of the rows
const entry1: OplogEntry[] = fromTransaction(tx1, relations)
const entry2: OplogEntry[] = fromTransaction(tx2, relations)
test('merge can handle NaN values', (t) => {
_mergeTableTest(t, {
initial: { real: 5.0 },
incoming: { real: NaN },
expected: { real: NaN },
})
})

const merged = mergeEntries('local', entry1, 'remote', entry2, relations)
test('merge can handle BigInt (INT8 pgtype) values', (t) => {
// Big ints are serialized as strings in the oplog
_mergeTableTest(t, {
initial: { int8: '3' },
incoming: { int8: '9223372036854775807' },
expected: { int8: BigInt('9223372036854775807') },
})
})

// tx2 should win because tx1 and tx2 happened concurrently
// but the timestamp of tx2 > tx1
t.like(merged, { 'main.floatTable': { [pk]: { optype: 'UPSERT' } } })
t.deepEqual(merged['main.floatTable'][pk].fullRow, {
id: 1,
value: -Infinity,
test('merge can handle BigInt (BIGINT pgtype) values', (t) => {
// Big ints are serialized as strings in the oplog
_mergeTableTest(t, {
initial: { bigint: '-5' },
incoming: { bigint: '-9223372036854775808' },
expected: { bigint: BigInt('-9223372036854775808') },
})
})

const to_commit_timestamp = (timestamp: string): Long =>
Long.UZERO.add(new Date(timestamp).getTime())

test('merge can handle NaN values', (t) => {
const pk = primaryKeyToStr({ id: 1 })
/**
* Merges two sequential transactions over the same row
* and checks that the value is merged correctly.
* The operation is over the "mergeTable" table in the
* database schema.
*/
function _mergeTableTest(
t: ExecutionContext,
opts: {
initial: Record<string, unknown>
incoming: Record<string, unknown>
expected: Record<string, unknown>
}
) {
if (opts.initial.id !== undefined) {
throw new Error('id must not be provided in initial')
}
if (opts.incoming.id !== undefined) {
throw new Error('id must not be provided in incoming')
}
if (opts.expected.id !== undefined) {
throw new Error('id must not be provided in expected')
}

const pkId = 1
const pk = primaryKeyToStr({ id: pkId })

const tx1: DataTransaction = {
lsn: DEFAULT_LOG_POS,
commit_timestamp: to_commit_timestamp('1970-01-02T03:46:41.000Z'),
changes: [
{
relation: relations.floatTable,
relation: relations.mergeTable,
type: DataChangeType.INSERT,
record: { id: 1, value: 5.0 },
record: { ...opts.initial, id: pkId },
tags: [],
},
],
@@ -125,9 +135,9 @@ test('merge can handle NaN values', (t) => {
commit_timestamp: to_commit_timestamp('1970-01-02T03:46:42.000Z'),
changes: [
{
relation: relations.floatTable,
relation: relations.mergeTable,
type: DataChangeType.INSERT,
record: { id: 1, value: NaN },
record: { ...opts.incoming, id: pkId },
tags: [],
},
],
@@ -142,9 +152,13 @@

// tx2 should win because tx1 and tx2 happened concurrently
// but the timestamp of tx2 > tx1
t.like(merged, { 'main.floatTable': { [pk]: { optype: 'UPSERT' } } })
t.deepEqual(merged['main.floatTable'][pk].fullRow, { id: 1, value: NaN })
})
t.like(merged, { 'main.mergeTable': { [pk]: { optype: 'UPSERT' } } })

t.deepEqual(merged['main.mergeTable'][pk].fullRow, {
...opts.expected,
id: pkId,
})
}

test('merge works on oplog entries', (t) => {
const db = new Database(':memory:')
@@ -153,7 +167,7 @@ test('merge works on oplog entries', (t) => {
migrateDb(db, personTable)

// Insert a row in the table
const insertRowSQL = `INSERT INTO ${personTable.tableName} (id, name, age, bmi) VALUES (9e999, 'John Doe', 30, 25.5)`
const insertRowSQL = `INSERT INTO ${personTable.tableName} (id, name, age, bmi, int8) VALUES (9e999, 'John Doe', 30, 25.5, 7)`
db.exec(insertRowSQL)

// Fetch the oplog entry for the inserted row
@@ -174,7 +188,14 @@
{
relation: relations[personTable.tableName as keyof typeof relations],
type: DataChangeType.INSERT,
record: { age: 30, bmi: 8e888, id: 9e999, name: 'John Doe' }, // fields must be ordered alphabetically to match the behavior of the triggers
record: {
// fields must be ordered alphabetically to match the behavior of the triggers
age: 30,
bmi: 8e888,
id: 9e999,
int8: '224', // Big ints are serialized as strings in the oplog
name: 'John Doe',
},
tags: [],
},
],
@@ -200,5 +221,6 @@
name: 'John Doe',
age: 30,
bmi: Infinity,
int8: 224n,
})
})
