Skip to content

Commit

Permalink
fix (client): fix race condition in process.subscribe that made the c…
Browse files Browse the repository at this point in the history
…lient crash (#702)

This PR fixes the race condition described in #426.
The problem was that `process.subscribe` was creating a promise, storing
it, making an async call, fetching the promise and returning it. But,
depending on the order in which messages are received from Electric, the
async call may resolve after the subscription was already delivered and
hence the promise had already been deleted. The fix consists of storing
the promise in a local variable inside `process.subscribe` such that it
doesn't need to fetch the promise after the async call.
  • Loading branch information
kevin-dp authored Nov 29, 2023
1 parent 0dc6166 commit 3ed5469
Show file tree
Hide file tree
Showing 4 changed files with 95 additions and 21 deletions.
5 changes: 5 additions & 0 deletions .changeset/nine-boats-burn.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
---
"electric-sql": patch
---

Fix race condition in process.subscribe that made the client crash.
29 changes: 24 additions & 5 deletions clients/typescript/src/satellite/mock.ts
Original file line number Diff line number Diff line change
Expand Up @@ -153,6 +153,8 @@ export class MockSatelliteClient extends EventEmitter implements Client {

relationData: Record<string, DataRecord[]> = {}

deliverFirst = false

setRelations(relations: RelationsCache): void {
this.relations = relations
if (this.relationsCb) {
Expand All @@ -169,6 +171,10 @@ export class MockSatelliteClient extends EventEmitter implements Client {
data.push(record)
}

enableDeliverFirst() {
this.deliverFirst = true
}

subscribe(
subscriptionId: string,
shapes: ShapeRequest[]
Expand Down Expand Up @@ -208,18 +214,31 @@ export class MockSatelliteClient extends EventEmitter implements Client {
}

return new Promise((resolve) => {
setTimeout(() => {
const emit = () => {
this.emit(SUBSCRIPTION_DELIVERED, {
subscriptionId,
lsn: base64.toBytes('MTIz'), // base64.encode("123")
data,
shapeReqToUuid,
} as SubscriptionData)
}, 1)
}

resolve({
subscriptionId,
})
const resolveProm = () => {
resolve({
subscriptionId,
})
}

if (this.deliverFirst) {
// When the `deliverFirst` flag is set,
// we deliver the subscription before resolving the promise.
emit()
setTimeout(resolveProm, 1)
} else {
// Otherwise, we resolve the promise before delivering the subscription.
setTimeout(emit, 1)
resolveProm()
}
})
}

Expand Down
42 changes: 28 additions & 14 deletions clients/typescript/src/satellite/process.ts
Original file line number Diff line number Diff line change
Expand Up @@ -418,25 +418,39 @@ export class SatelliteProcess implements Satellite {
// and this resolver and rejecter would not yet be stored
// this could especially happen in unit tests
this.subscriptionNotifiers[subId] = emptyPromise()

const { subscriptionId, error }: SubscribeResponse =
await this.client.subscribe(subId, shapeReqs)
if (subId !== subscriptionId) {
// store the promise because by the time the
// `await this.client.subscribe(subId, shapeReqs)` call resolves
// the `subId` entry in the `subscriptionNotifiers` may have been deleted
// so we can no longer access it
const subProm = this.subscriptionNotifiers[subId].promise

// `clearSubAndThrow` deletes the listeners and cancels the subscription
const clearSubAndThrow = (error: any): never => {
delete this.subscriptionNotifiers[subId]
this.subscriptions.subscriptionCancelled(subId)
throw new Error(
`Expected SubscripeResponse for subscription id: ${subId} but got it for another id: ${subscriptionId}`
)
}

if (error) {
delete this.subscriptionNotifiers[subscriptionId]
this.subscriptions.subscriptionCancelled(subscriptionId)
throw error
}

return {
synced: this.subscriptionNotifiers[subId].promise,
try {
const { subscriptionId, error }: SubscribeResponse =
await this.client.subscribe(subId, shapeReqs)
if (subId !== subscriptionId) {
clearSubAndThrow(
new Error(
`Expected SubscripeResponse for subscription id: ${subId} but got it for another id: ${subscriptionId}`
)
)
}

if (error) {
clearSubAndThrow(error)
}

return {
synced: subProm,
}
} catch (error: any) {
return clearSubAndThrow(error)
}
}

Expand Down
40 changes: 38 additions & 2 deletions clients/typescript/test/satellite/process.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ import anyTest, { TestFn } from 'ava'
import {
MOCK_BEHIND_WINDOW_LSN,
MOCK_INTERNAL_ERROR,
MockSatelliteClient,
} from '../../src/satellite/mock'
import { QualifiedTablename } from '../../src/util/tablename'
import { sleepAsync } from '../../src/util/timer'
Expand Down Expand Up @@ -1340,6 +1341,38 @@ test('apply shape data and persist subscription', async (t) => {
}
})

test('(regression) shape subscription succeeds even if subscription data is delivered before the SatSubsReq RPC call receives its SatSubsResp answer', async (t) => {
const { client, satellite } = t.context
const { runMigrations, authState } = t.context
await runMigrations()

const tablename = 'parent'

// relations must be present at subscription delivery
client.setRelations(relations)
client.setRelationData(tablename, parentRecord)

const conn = await satellite.start(authState)
await conn.connectionPromise

const shapeDef: ClientShapeDefinition = {
selects: [{ tablename }],
}

satellite!.relations = relations

// Enable the deliver first flag in the mock client
// such that the subscription data is delivered before the
// subscription promise is resolved
const mockClient = satellite.client as MockSatelliteClient
mockClient.enableDeliverFirst()

const { synced } = await satellite.subscribe([shapeDef])
await synced

t.pass()
})

test('multiple subscriptions for the same shape are deduplicated', async (t) => {
const { client, satellite } = t.context
const { runMigrations, authState } = t.context
Expand Down Expand Up @@ -1536,9 +1569,8 @@ test('a single subscribe with multiple tables with FKs', async (t) => {
}

satellite!.relations = relations
await satellite.subscribe([shapeDef1, shapeDef2])

return new Promise<void>((res, rej) => {
const prom = new Promise<void>((res, rej) => {
client.subscribeToSubscriptionEvents(
(data: SubscriptionData) => {
// child is applied first
Expand All @@ -1561,6 +1593,10 @@ test('a single subscribe with multiple tables with FKs', async (t) => {
() => undefined
)
})

await satellite.subscribe([shapeDef1, shapeDef2])

return prom
})

test.serial('a shape delivery that triggers garbage collection', async (t) => {
Expand Down

0 comments on commit 3ed5469

Please sign in to comment.