Skip to content

Commit

Permalink
feat (client): extract sync API and make DAL optional (#1355)
Browse files Browse the repository at this point in the history
This PR extracts the sync method out of the DAL by introducing a
`subscribe` method on the `ElectricClient.sync` object. To compute the
shape requested by the user we need a description of the database
schema, namely:
- Table name, and for each table:
  - columns (column name + PG type)
  - outgoing and incoming relations

The outgoing relations are the FKs in the table that reference a column
in (another) table.
The incoming relations are FKs defined in (another) table that reference
a column of this table.
The incoming relations are required in order to be able to traverse FKs
in the reverse direction, e.g. this is needed when syncing the following
shape because there is no FK from `issues` to `comments`, only from
`comments` to `issues`:
```json
{
  "table": "issues",
  "include": {
    "comments": true
  }
}
```

The database description used to be generated by our generator and
included in the generated Electric client. To truly extract the sync API
i had to make it independent of the DAL and the generated client.
Therefore, i extended the CLI to use the metadata that is provided with
the migrations to generate the DB description and bundle that into the
app.

Note:
- I modified our TS satellite client in the e2e tests such that every
DAL query has raw equivalent and to dynamically dispatch to the right
version depending whether we are running with or without the DAL. I
introduced a matrix in CI such that every Satellite e2e test is ran once
with the DAL and once without the DAL.

Questions:
- The modified CLI changes the behavior of `electric-sql generate` as by
default it will no longer generate the DAL. If you want to generate the
DAL, you need to run it with the `--with-dal` flag as in `electric-sql
generate --with-dal`. Is thi acceptable, or we want to keep generating
the DAL by default and have a `--no-dal` flag instead?
- I modified the format of the table columns in the generated DB
description, it was previously a `Map<TableName, PgType>` and is now
represented as a `Record<TableName, PgType>`. While this is not strictly
necessary, this simplifies the generation of the DB description.
Downside is that clients generated with the old generator won't work
with newer versions of the ts-client and will require re-generating the
DAL with the new ts-client. Is this fine or should i change it back to
using a `Map<TableName, PgType>` ?
  • Loading branch information
kevin-dp committed Jun 26, 2024
1 parent 5e2e276 commit 837ce92
Show file tree
Hide file tree
Showing 40 changed files with 2,454 additions and 716 deletions.
6 changes: 6 additions & 0 deletions .changeset/six-knives-notice.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
---
"electric-sql": patch
"@electric-sql/prisma-generator": patch
---

Extract the sync API out of the DAL and make the DAL optional.
4 changes: 3 additions & 1 deletion .github/workflows/e2e.yml
Original file line number Diff line number Diff line change
Expand Up @@ -113,17 +113,19 @@ jobs:
https://analytics-api.buildkite.com/v1/uploads
e2e_satellite_tests:
name: E2E Satellite tests
runs-on: electric-e2e-8-32
strategy:
matrix:
dialect: [SQLite, Postgres]
dal: [true, false]
name: E2E Satellite tests (Dialect ${{ matrix.dialect }} - uses DAL? ${{ matrix.dal }})
defaults:
run:
working-directory: e2e
env:
BUILDKITE_ANALYTICS_TOKEN: ${{ secrets.BUILDKITE_TEST_ANALYTICS_E2E }}
DIALECT: ${{ matrix.dialect }}
DAL: ${{ matrix.dal }}
steps:
- uses: actions/checkout@v3
with:
Expand Down
9 changes: 9 additions & 0 deletions clients/typescript/package.json
Original file line number Diff line number Diff line change
Expand Up @@ -54,6 +54,7 @@
"./node": "./dist/drivers/better-sqlite3/index.js",
"./node-postgres": "./dist/drivers/node-postgres/index.js",
"./pglite": "./dist/drivers/pglite/index.js",
"./protocol": "./dist/_generated/protocol/satellite.js",
"./react": "./dist/frameworks/react/index.js",
"./tauri-postgres": "./dist/drivers/tauri-postgres/index.js",
"./vuejs": "./dist/frameworks/vuejs/index.js",
Expand All @@ -78,6 +79,9 @@
"capacitor": [
"./dist/drivers/capacitor-sqlite/index.d.ts"
],
"client": [
"./dist/client/index.d.ts"
],
"expo": [
"./dist/drivers/expo-sqlite/index.d.ts"
],
Expand All @@ -96,6 +100,9 @@
"pglite": [
"./dist/drivers/pglite/index.d.ts"
],
"protocol": [
"./dist/_generated/protocol/satellite.d.ts"
],
"react": [
"./dist/frameworks/react/index.d.ts"
],
Expand Down Expand Up @@ -181,6 +188,7 @@
"lodash.flow": "^3.5.0",
"lodash.groupby": "^4.6.0",
"lodash.isequal": "^4.5.0",
"lodash.keyby": "^4.6.0",
"lodash.mapvalues": "^4.6.0",
"lodash.omitby": "^4.6.0",
"lodash.partition": "^4.6.0",
Expand Down Expand Up @@ -209,6 +217,7 @@
"@types/lodash.flow": "^3.5.7",
"@types/lodash.groupby": "^4.6.7",
"@types/lodash.isequal": "^4.5.6",
"@types/lodash.keyby": "^4.6.9",
"@types/lodash.mapvalues": "^4.6.7",
"@types/lodash.omitby": "^4.6.7",
"@types/lodash.partition": "^4.6.7",
Expand Down
3 changes: 3 additions & 0 deletions clients/typescript/src/client/conversions/index.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
export { postgresConverter } from './postgres'
export { sqliteConverter } from './sqlite'
export { PgBasicType } from './types'
9 changes: 6 additions & 3 deletions clients/typescript/src/client/conversions/input.ts
Original file line number Diff line number Diff line change
Expand Up @@ -247,7 +247,7 @@ export class InputTransformer {
value: any,
fields: Fields
): any {
const pgType = fields.get(field)
const pgType = fields[field]

if (!pgType) throw new InvalidArgumentError(`Unknown field ${field}`)

Expand Down Expand Up @@ -335,7 +335,7 @@ export function transformFields(
// as those will be transformed later when the query on the related field is processed.
const copied: Record<string, any> = { ...o }
Object.entries(o).forEach(([field, value]) => {
const pgType = fields.get(field)
const pgType = fields[field]
// Skip anything that's not an actual column on the table
if (pgType === undefined) return

Expand Down Expand Up @@ -363,7 +363,10 @@ export function isFilterObject(value: any): boolean {
* @returns A filtered object.
*/
function keepTableFieldsOnly(o: object, fields: Fields) {
return filterKeys(o, fields)
return filterKeys(o, {
...fields,
has: (x) => Object.hasOwn(fields, x),
})
}

/**
Expand Down
3 changes: 3 additions & 0 deletions clients/typescript/src/client/index.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
export type { TableName, AnyTable, AnyTableSchema } from './model'
export { type DbSchema, createDbDescription } from './util/relations'
export * from './conversions'
4 changes: 2 additions & 2 deletions clients/typescript/src/client/model/builder.ts
Original file line number Diff line number Diff line change
Expand Up @@ -251,7 +251,7 @@ export class Builder {
* The DAL will convert the string into a BigInt in the `fromSqlite` function from `../conversions/sqlite.ts`.
*/
private castBigIntToText(field: string) {
const pgType = this._tableDescription.fields.get(field)
const pgType = this._tableDescription.fields[field]
if (pgType === PgBasicType.PG_INT8 && this.dialect === 'SQLite') {
const quotedField = quoteIdentifier(field)
return `cast(${quotedField} as TEXT) AS ${quotedField}`
Expand Down Expand Up @@ -308,7 +308,7 @@ export class Builder {
// if field is of type BigInt cast the result to TEXT
// because not all adapters deal well with BigInts
// the DAL will convert the string into a BigInt in the `fromSqlite` function from `../conversions/sqlite.ts`.
const pgType = this._tableDescription.fields.get(field)
const pgType = this._tableDescription.fields[field]
if (pgType === PgBasicType.PG_INT8 && this.dialect === 'SQLite') {
// make a raw string and quote the field name ourselves
// because otherwise Squel would add quotes around the entire cast
Expand Down
131 changes: 101 additions & 30 deletions clients/typescript/src/client/model/client.ts
Original file line number Diff line number Diff line change
@@ -1,17 +1,32 @@
import { ElectricNamespace } from '../../electric/namespace'
import { DbSchema, TableSchema } from './schema'
import { DbSchema, TableSchema, TableSchemas } from './schema'
import { rawQuery, liveRawQuery, unsafeExec, Table } from './table'
import { Row, Statement } from '../../util'
import {
QualifiedTablename,
ReplicatedRowTransformer,
Row,
Statement,
} from '../../util'
import { LiveResultContext } from './model'
import { Notifier } from '../../notifiers'
import { DatabaseAdapter } from '../../electric/adapter'
import { GlobalRegistry, Registry, Satellite } from '../../satellite'
import { ReplicationTransformManager } from './transforms'
import {
GlobalRegistry,
Registry,
Satellite,
ShapeSubscription,
} from '../../satellite'
import {
IReplicationTransformManager,
ReplicationTransformManager,
setReplicationTransform,
} from './transforms'
import { Dialect } from '../../migrators/query-builder/builder'
import { InputTransformer } from '../conversions/input'
import { sqliteConverter } from '../conversions/sqlite'
import { postgresConverter } from '../conversions/postgres'
import { IShapeManager } from './shapes'
import { ShapeInputWithTable, sync } from './sync'

export type ClientTables<DB extends DbSchema<any>> = {
[Tbl in keyof DB['tables']]: DB['tables'][Tbl] extends TableSchema<
Expand Down Expand Up @@ -96,25 +111,62 @@ interface RawQueries {
export class ElectricClient<
DB extends DbSchema<any>
> extends ElectricNamespace {
public sync: Omit<IShapeManager, 'subscribe'>
public sync: Omit<IShapeManager, 'subscribe'> & {
/**
* Subscribes to the given shape, returnig a {@link ShapeSubscription} object which
* can be used to wait for the shape to sync initial data.
*
* NOTE: If you establish a shape subscription that has already synced its initial data,
* awaiting `shape.synced` will always resolve immediately as shape subscriptions are persisted.
* i.e.: imagine that you re-sync the same shape during subsequent application loads.
* Awaiting `shape.synced` a second time will only ensure that the initial
* shape load is complete. It does not ensure that the replication stream
* has caught up to the central DB's more recent state.
*
* @param i - The shape to subscribe to
* @param key - An optional unique key that identifies the subscription
* @returns A shape subscription
*/
subscribe: (
i: ShapeInputWithTable,
key?: string
) => Promise<ShapeSubscription>
}

private constructor(
public db: ClientTables<DB> & RawQueries,
dbName: string,
private _dbDescription: DB,
adapter: DatabaseAdapter,
notifier: Notifier,
public readonly satellite: Satellite,
registry: Registry | GlobalRegistry
registry: Registry | GlobalRegistry,
private _replicationTransformManager: IReplicationTransformManager
) {
super(dbName, adapter, notifier, registry)
this.satellite = satellite
// Expose the Shape Sync API without additional properties
this.sync = {
syncStatus: this.satellite.syncStatus.bind(this.satellite),
subscribe: sync.bind(null, this.satellite, this._dbDescription),
unsubscribe: this.satellite.unsubscribe.bind(this.satellite),
}
}

setReplicationTransform<
T extends Record<string, unknown> = Record<string, unknown>
>(
qualifiedTableName: QualifiedTablename,
i: ReplicatedRowTransformer<T>
): void {
setReplicationTransform<T>(
this._dbDescription,
this._replicationTransformManager,
qualifiedTableName,
i
)
}

/**
* Connects to the Electric sync service.
* This method is idempotent, it is safe to call it multiple times.
Expand All @@ -136,7 +188,10 @@ export class ElectricClient<
this.satellite.clientDisconnect()
}

// Builds the DAL namespace from a `dbDescription` object
/**
* Builds the DAL namespace from a `dbDescription` object
* @param minimalDbDescription - A minimal description of the database schema can be provided in order to use Electric without the DAL.
*/
static create<DB extends DbSchema<any>>(
dbName: string,
dbDescription: DB,
Expand All @@ -154,30 +209,44 @@ export class ElectricClient<
)
const inputTransformer = new InputTransformer(converter)

const createTable = (tableName: string) => {
return new Table(
tableName,
adapter,
notifier,
satellite,
replicationTransformManager,
dbDescription,
inputTransformer,
dialect
)
}
// Check if we need to create the DAL
// If the schemas are missing from the `dbDescription``
// it means that the user did not generate the Electric client
// and thus we don't create the DAL.
// This is needed because we piggyback the minimal DB description (that is used without the DAL)
// on the same DB description argument as the one that is used with the DAL.
const ts: Array<[string, TableSchemas]> = Object.entries(
dbDescription.tables
)
const withDal = ts.length > 0 && ts[0][1].modelSchema !== undefined
let dal = {} as ClientTables<DB>

// Create all tables
const dal = Object.fromEntries(
Object.keys(tables).map((tableName) => {
return [tableName, createTable(tableName)]
})
) as ClientTables<DB>
if (withDal) {
const createTable = (tableName: string) => {
return new Table(
tableName,
adapter,
notifier,
satellite,
replicationTransformManager,
dbDescription,
inputTransformer,
dialect
)
}

// Now inform each table about all tables
Object.keys(dal).forEach((tableName) => {
dal[tableName].setTables(new Map(Object.entries(dal)))
})
// Create all tables
dal = Object.fromEntries(
Object.keys(tables).map((tableName) => {
return [tableName, createTable(tableName)]
})
) as ClientTables<DB>

// Now inform each table about all tables
Object.keys(dal).forEach((tableName) => {
dal[tableName].setTables(new Map(Object.entries(dal)))
})
}

const db: ClientTables<DB> & RawQueries = {
...dal,
Expand All @@ -191,10 +260,12 @@ export class ElectricClient<
return new ElectricClient(
db,
dbName,
dbDescription,
adapter,
notifier,
satellite,
registry
registry,
replicationTransformManager
)
}
}
8 changes: 7 additions & 1 deletion clients/typescript/src/client/model/index.ts
Original file line number Diff line number Diff line change
@@ -1,7 +1,13 @@
export { ElectricClient } from './client'
export type { ClientTables } from './client'
export type { TableSchema } from './schema'
export type {
TableSchema,
TableSchemas,
TableName,
AnyTableSchema,
} from './schema'
export { DbSchema, Relation } from './schema'
export { Table } from './table'
export type { AnyTable } from './table'
export type { HKT } from '../util/hkt'
export type { SyncStatus } from './shapes'
20 changes: 15 additions & 5 deletions clients/typescript/src/client/model/schema.ts
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@ export type TableName = string
export type FieldName = string
export type RelationName = string

export type Fields = Map<FieldName, PgType>
export type Fields = Record<FieldName, PgType>

export type TableSchema<
T extends Record<string, any>,
Expand Down Expand Up @@ -76,11 +76,21 @@ export type ExtendedTableSchema<
incomingRelations: Relation[]
}

export type TableSchemas = Record<
TableName,
TableSchema<any, any, any, any, any, any, any, any, any, HKT>
export type AnyTableSchema = TableSchema<
any,
any,
any,
any,
any,
any,
any,
any,
any,
HKT
>

export type TableSchemas = Record<TableName, AnyTableSchema>

export type ExtendedTableSchemas = Record<
TableName,
ExtendedTableSchema<any, any, any, any, any, any, any, any, any, HKT>
Expand Down Expand Up @@ -190,7 +200,7 @@ export class DbSchema<T extends TableSchemas> {
}

getFieldNames(table: TableName): FieldName[] {
return Array.from(this.getFields(table).keys())
return Array.from(Object.keys(this.getFields(table)))
}

hasRelationForField(table: TableName, field: FieldName): boolean {
Expand Down
Loading

0 comments on commit 837ce92

Please sign in to comment.