Skip to content

Commit

Permalink
fix(satellite): cannot catch up to the server's current state (#319)
Browse files Browse the repository at this point in the history
Fixes issues with server reconnection after the server is gone or the client lags behind the WAL-cached window.
- handles the BEHIND_WINDOW error
- clears local state and reconnects with the server
- recovers the current subscription
  • Loading branch information
balegas committed Aug 22, 2023
1 parent ea4a464 commit 8209293
Show file tree
Hide file tree
Showing 13 changed files with 253 additions and 143 deletions.
5 changes: 5 additions & 0 deletions .changeset/young-cougars-marry.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
---
"electric-sql": patch
---

Fixed reconnect issues when the client is too far behind the server
4 changes: 3 additions & 1 deletion clients/typescript/src/migrators/bundle.ts
Original file line number Diff line number Diff line change
Expand Up @@ -113,7 +113,9 @@ export class BundleMigrator implements Migrator {

if (migration.version !== version) {
throw new Error(
`Migrations cannot be altered once applied: expecting ${version} at index ${i}.`
`Local migrations ${version} does not match server version ${migration.version}.\
This is an unrecoverable error. Please clear your local storage and try again.\
Check documentation (https://electric-sql.com/docs/reference/limitations) to learn more.`
)
}
})
Expand Down
141 changes: 94 additions & 47 deletions clients/typescript/src/satellite/client.ts
Original file line number Diff line number Diff line change
Expand Up @@ -59,6 +59,8 @@ import {
SchemaChange,
OutgoingReplication,
Transaction,
StartReplicationResponse,
StopReplicationResponse,
} from '../util/types'
import {
base64,
Expand Down Expand Up @@ -89,7 +91,13 @@ import { setMaskBit, getMaskBit } from '../util/bitmaskHelpers'
type IncomingHandler = {
handle: (
msg: any
) => void | AuthResponse | SubscribeResponse | UnsubscribeResponse
) =>
| void
| AuthResponse
| StartReplicationResponse
| StopReplicationResponse
| SubscribeResponse
| UnsubscribeResponse
isRpc: boolean
}

Expand Down Expand Up @@ -238,6 +246,10 @@ export class SatelliteClient extends EventEmitter implements Client {
connect(
retryHandler?: (error: any, attempt: number) => boolean
): Promise<void> {
if (!this.isClosed) {
this.close()
}

this.initializing = emptyPromise()

const connectPromise = new Promise<void>((resolve, reject) => {
Expand Down Expand Up @@ -286,12 +298,12 @@ export class SatelliteClient extends EventEmitter implements Client {
return backOff(() => connectPromise, retryPolicy).catch((e) => {
// We're very sure that no calls are going to modify `this.initializing` before this promise resolves
// eslint-disable-next-line @typescript-eslint/no-non-null-assertion
this.initializing!.reject(e)
this.initializing?.reject(e)
throw e
})
}

close(): Promise<void> {
close() {
Log.info('closing client')

this.outbound = this.resetReplication(
Expand All @@ -317,8 +329,6 @@ export class SatelliteClient extends EventEmitter implements Client {
this.socket!.closeAndRemoveListeners()
this.socket = undefined
}

return Promise.resolve()
}

isClosed(): boolean {
Expand All @@ -329,7 +339,7 @@ export class SatelliteClient extends EventEmitter implements Client {
lsn?: LSN,
schemaVersion?: string,
subscriptionIds?: string[]
): Promise<void> {
): Promise<StartReplicationResponse> {
if (this.inbound.isReplicating !== ReplicationStatus.STOPPED) {
return Promise.reject(
new SatelliteError(
Expand All @@ -340,7 +350,7 @@ export class SatelliteClient extends EventEmitter implements Client {
}

// Perform validations and prepare the request
let request
let request: SatInStartReplicationReq
if (!lsn || lsn.length == 0) {
Log.info(`no previous LSN, start replication from scratch`)
if (subscriptionIds && subscriptionIds.length > 0) {
Expand All @@ -353,23 +363,39 @@ export class SatelliteClient extends EventEmitter implements Client {
}
request = SatInStartReplicationReq.fromPartial({ schemaVersion })
} else {
Log.info(`starting replication with lsn: ${base64.fromBytes(lsn)}`)
Log.info(
`starting replication with lsn: ${base64.fromBytes(
lsn
)} subscriptions: ${subscriptionIds}`
)
request = SatInStartReplicationReq.fromPartial({ lsn, subscriptionIds })
}

// Then set the replication state
const promise = this.rpc<void>(request)
.then(() => this.initializing?.resolve())
this.inbound = this.resetReplication(lsn, lsn, ReplicationStatus.STARTING)
return this.rpc<StartReplicationResponse, SatInStartReplicationReq>(request)
.then((resp) => {
// FIXME: process handles BEHIND_WINDOW, so we can't reject or resolve
// initializing here. If process returns without triggering another
// RPC, initializing will never resolve.
// We should improve this code so that it makes no assumptions about how
// process handles errors.
if (resp.error) {
if (resp.error.code !== SatelliteErrorCode.BEHIND_WINDOW) {
this.initializing?.reject(resp.error)
}
} else {
this.initializing?.resolve()
}
return resp
})
.catch((e) => {
this.initializing?.reject(e)
throw e
})
this.inbound = this.resetReplication(lsn, lsn, ReplicationStatus.STARTING)

return promise
}

stopReplication(): Promise<void> {
stopReplication(): Promise<StopReplicationResponse> {
if (this.inbound.isReplicating !== ReplicationStatus.ACTIVE) {
return Promise.reject(
new SatelliteError(
Expand Down Expand Up @@ -636,23 +662,25 @@ export class SatelliteClient extends EventEmitter implements Client {
return { serverId, error }
}

private handleStartResp(resp: SatInStartReplicationResp): void {
private handleStartResp(
resp: SatInStartReplicationResp
): StartReplicationResponse {
if (this.inbound.isReplicating == ReplicationStatus.STARTING) {
if (resp.err) {
this.inbound.isReplicating = ReplicationStatus.STOPPED
this.emit('error', startReplicationErrorToSatelliteError(resp.err))
return { error: startReplicationErrorToSatelliteError(resp.err) }
} else {
this.inbound.isReplicating = ReplicationStatus.ACTIVE
}
} else {
this.emit(
'error',
new SatelliteError(
return {
error: new SatelliteError(
SatelliteErrorCode.UNEXPECTED_STATE,
`unexpected state ${this.inbound.isReplicating} handling 'start' response`
)
)
),
}
}
return {}
}

private handleStartReq(message: SatInStartReplicationReq) {
Expand Down Expand Up @@ -722,17 +750,17 @@ export class SatelliteClient extends EventEmitter implements Client {
}
}

private handleStopResp() {
private handleStopResp(): StopReplicationResponse {
if (this.inbound.isReplicating == ReplicationStatus.STOPPING) {
this.inbound.isReplicating = ReplicationStatus.STOPPED
} else {
this.emit(
'error',
new SatelliteError(
SatelliteErrorCode.UNEXPECTED_STATE,
`unexpected state ${this.inbound.isReplicating} handling 'stop' response`
)
)
return {}
}

return {
error: new SatelliteError(
SatelliteErrorCode.UNEXPECTED_STATE,
`unexpected state ${this.inbound.isReplicating} handling 'stop' response`
),
}
}

Expand Down Expand Up @@ -798,9 +826,15 @@ export class SatelliteClient extends EventEmitter implements Client {
}

private handleError(error: SatErrorResp) {
// TODO: this is causing intermittent issues in tests because
// no one might catch this error. We should pass this information
// as part of the connectivity state.
this.emit(
'error',
new Error(`server replied with error code: ${error.errorType}`)
new SatelliteError(
SatelliteErrorCode.SERVER_ERROR,
`server replied with error code: ${error.errorType}`
)
)
}

Expand Down Expand Up @@ -846,17 +880,26 @@ export class SatelliteClient extends EventEmitter implements Client {
const messageOrError = toMessage(data)
if (Log.getLevel() <= 1 && !(messageOrError instanceof SatelliteError))
Log.debug(`[proto] recv: ${msgToString(messageOrError)}`)
if (messageOrError instanceof Error) {
this.emit('error', messageOrError)
} else {
const handler = this.handlerForMessageType[messageOrError.$type]
try {
const response = handler.handle(messageOrError)
if (handler.isRpc) {
this.emit('rpc_response', response)
let handleIsRpc = false
try {
const message = toMessage(data)
const handler = this.handlerForMessageType[message.$type]
const response = handler.handle(message)
if ((handleIsRpc = handler.isRpc)) {
this.emit('rpc_response', response)
}
} catch (error) {
if (error instanceof SatelliteError) {
if (handleIsRpc) {
this.emit('error', error)
}
} catch (error) {
Log.warn(`uncaught errors while processing incoming message: ${error}`)
// no one is watching this error
Log.warn(
`unhandled satellite errors while processing incoming message: ${error}`
)
} else {
// This is an unexpected runtime error
throw error
}
}
}
Expand Down Expand Up @@ -1017,15 +1060,17 @@ export class SatelliteClient extends EventEmitter implements Client {
}, this.opts.timeout)

// reject on any error
this.once('error', (error: SatelliteError) => {
const errorListener = (error: any) => {
return reject(error)
})
}

this.once('error', errorListener)
if (distinguishOn) {
const handleRpcResp = (resp: T) => {
// TODO: remove this comment when RPC types are fixed
// @ts-ignore this comparison is valid because we expect the same field to be present on both request and response, but it's too much work at the moment to rewrite typings for it
if (resp[distinguishOn] === request[distinguishOn]) {
this.removeListener('error', errorListener)
return resolve(resp)
} else {
// This WAS an RPC response, but not the one we were expecting, waiting more
Expand All @@ -1034,9 +1079,11 @@ export class SatelliteClient extends EventEmitter implements Client {
}
this.once('rpc_response', handleRpcResp)
} else {
this.once('rpc_response', (resp: T) => resolve(resp))
this.once('rpc_response', (resp: T) => {
this.removeListener('error', errorListener)
resolve(resp)
})
}

this.sendMessage(request)
}).finally(() => clearTimeout(waitingFor))
}
Expand Down Expand Up @@ -1141,12 +1188,12 @@ function serializeNullData(): Uint8Array {
return typeEncoder.text('')
}

export function toMessage(data: Uint8Array): SatPbMsg | SatelliteError {
export function toMessage(data: Uint8Array): SatPbMsg {
const code = data[0]
const type = getTypeFromCode(code)
const obj = getObjFromString(type)
if (obj == undefined) {
return new SatelliteError(
throw new SatelliteError(
SatelliteErrorCode.UNEXPECTED_MESSAGE_TYPE,
`${code})`
)
Expand Down
3 changes: 3 additions & 0 deletions clients/typescript/src/satellite/config.ts
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,8 @@ export interface SatelliteOpts {
pollingInterval: number
/** Throttle snapshotting to once per `minSnapshotWindow` milliseconds. */
minSnapshotWindow: number
/** On reconnect, clear the client's state if it cannot catch up with the WAL buffered by Electric. */
clearOnBehindWindow: boolean
}

export interface SatelliteOverrides {
Expand All @@ -34,6 +36,7 @@ export const satelliteDefaults: SatelliteOpts = {
shadowTable: new QualifiedTablename('main', '_electric_shadow'),
pollingInterval: 2000,
minSnapshotWindow: 40,
clearOnBehindWindow: true,
}

export const satelliteClientDefaults = {
Expand Down
15 changes: 6 additions & 9 deletions clients/typescript/src/satellite/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,8 @@ import {
DataTransaction,
Transaction,
Relation,
StartReplicationResponse,
StopReplicationResponse,
} from '../util/types'
import {
ClientShapeDefinition,
Expand Down Expand Up @@ -47,8 +49,6 @@ export type ConnectionWrapper = {
connectionPromise: Promise<void | Error>
}

export type SatelliteReplicationOptions = { clearOnBehindWindow: boolean }

// `Satellite` is the main process handling ElectricSQL replication,
// processing the opslog and notifying when there are data changes.
export interface Satellite {
Expand All @@ -60,10 +60,7 @@ export interface Satellite {

connectivityState?: ConnectivityState

start(
authConfig: AuthConfig,
opts?: SatelliteReplicationOptions
): Promise<ConnectionWrapper>
start(authConfig: AuthConfig): Promise<ConnectionWrapper>
stop(): Promise<void>
subscribe(
shapeDefinitions: ClientShapeDefinition[]
Expand All @@ -75,15 +72,15 @@ export interface Client {
connect(
retryHandler?: (error: any, attempt: number) => boolean
): Promise<void>
close(): Promise<void>
close(): void
authenticate(authState: AuthState): Promise<AuthResponse>
isClosed(): boolean
startReplication(
lsn?: LSN,
schemaVersion?: string,
subscriptionIds?: string[]
): Promise<void>
stopReplication(): Promise<void>
): Promise<StartReplicationResponse>
stopReplication(): Promise<StopReplicationResponse>
subscribeToRelations(callback: (relation: Relation) => void): void
subscribeToTransactions(
callback: (transaction: Transaction) => Promise<void>
Expand Down
Loading

0 comments on commit 8209293

Please sign in to comment.