Skip to content

Commit

Permalink
chore (client): use async event emitter in Satellite client (#696)
Browse files Browse the repository at this point in the history
## Problem
The Satellite client uses an `EventEmitter` to emit events to which the
Satellite process is subscribed. However, the Satellite process
registers asynchronous event listeners but the `EventEmitter` calls them
synchronously. Hence, async listeners for an event may run concurrently
and a new event may be emitted while async listeners of the previous
event are still running.

An example in NodeJS:

```ts
const EventEmitter = require('node:events');
const emitter = new EventEmitter();
emitter.on('transaction', (tx) => {
  console.log('listener 1 start: ' + tx);
  return new Promise((resolve) => {
    setTimeout(() => {
      console.log('listener 1 end: ' + tx);
      resolve()
    }, 10)
  })
})

emitter.emit('transaction', 'tx1');
emitter.emit('transaction', 'tx2');
/*
   Prints:
     listener 1 start: tx1
     listener 1 start: tx2
     listener 1 end: tx1
     listener 1 end: tx2
 */

emitter.on('transaction', (tx) => {
  console.log('listener 2 start: ' + tx);
})

emitter.emit('transaction');

/*
   Prints:
     listener 1 start: tx3
     listener 2 start: tx3
     listener 1 end: tx3
 */
```

The first example above shows interleaving of async listeners on an
event.
The second example shows interleaving of listeners for distinct events.

## Solution
Implemented an `AsyncEventEmitter` that awaits all listeners before
processing the next event. Modified the Satellite client to use the
`AsyncEventEmitter` instead of the regular `EventEmitter`. In addition,
the `AsyncEventEmitter` is fully typed, inspired by the typings of the
[typed-emitter package](https://www.npmjs.com/package/typed-emitter).

Note that, we start all listeners for an event in order **without
awaiting**. Hence, async listeners registered for an event may run
concurrently. However, we await all listeners before processing the next
event.

With the new `AsyncEventEmitter` we get this behavior:

```ts
const emitter = new AsyncEventEmitter();
emitter.on('transaction', (tx) => {
  console.log('listener 1 start: ' + tx);
  return new Promise((resolve) => {
    setTimeout(() => {
      console.log('listener 1 end: ' + tx);
      resolve()
    }, 10)
  })
})

emitter.enqueueEmit('transaction', 'tx1');
emitter.enqueueEmit('transaction', 'tx2');
/*
   Prints:
     listener 1 start: tx1
     listener 1 end: tx1
     listener 1 start: tx2
     listener 1 end: tx2
 */

emitter.on('transaction', (tx) => {
  console.log('listener 2 start: ' + tx);
})

emitter.enqueueEmit('transaction');

/*
   Prints:
     listener 1 start: tx3
     listener 2 start: tx3
     listener 1 end: tx3
 */
```

The first example above shows that events are processed in order only
after all listeners have finished.
The second example shows that listeners for a given event are started in
order one after the other without awaiting.

Note: we renamed `emit` to `enqueueEmit` to avoid confusion with the
classical `EventEmitter`'s `emit` function which has different semantics
as it does not await all listeners before processing the next event.
  • Loading branch information
kevin-dp authored Dec 13, 2023
1 parent 22652fb commit 9ffb11a
Show file tree
Hide file tree
Showing 9 changed files with 522 additions and 58 deletions.
5 changes: 5 additions & 0 deletions .changeset/gold-eagles-talk.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
---
"electric-sql": patch
---

Modify Satellite client to use async event emitter.
59 changes: 19 additions & 40 deletions clients/typescript/src/satellite/client.ts
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,6 @@ import {
} from '../util/proto'
import { PROTOCOL_VSN, Socket, SocketFactory } from '../sockets/index'
import _m0 from 'protobufjs/minimal.js'
import { EventEmitter } from 'events'
import {
AuthResponse,
DataChangeType,
Expand All @@ -62,7 +61,6 @@ import {
StopReplicationResponse,
ErrorCallback,
RelationCallback,
IncomingTransactionCallback,
OutboundStartedCallback,
TransactionCallback,
} from '../util/types'
Expand Down Expand Up @@ -93,6 +91,7 @@ import { RPC, rpcRespond, withRpcRequestLogging } from './RPC'
import { Mutex } from 'async-mutex'
import { DbSchema } from '../client/model'
import { PgBasicType, PgDateType, PgType } from '../client/conversions/types'
import { AsyncEventEmitter } from '../util'

type IncomingHandler = (msg: any) => void

Expand All @@ -105,38 +104,18 @@ const subscriptionError = [
SatelliteErrorCode.SHAPE_DELIVERY_ERROR,
]

interface SafeEventEmitter {
on(event: 'error', callback: ErrorCallback): this
on(event: 'relation', callback: RelationCallback): this
on(event: 'transaction', callback: IncomingTransactionCallback): this
on(event: 'outbound_started', callback: OutboundStartedCallback): this

emit(event: 'error', error: SatelliteError): boolean
emit(event: 'relation', relation: Relation): boolean
emit(
event: 'transaction',
transaction: Transaction,
ackCb: () => void
): boolean
emit(event: 'outbound_started', lsn: LSN): boolean

removeListener(event: 'error', callback: ErrorCallback): void
removeListener(event: 'relation', callback: RelationCallback): void
removeListener(event: 'transaction', callback: TransactionCallback): void
removeListener(
event: 'outbound_started',
callback: OutboundStartedCallback
): void

removeAllListeners(): void

listenerCount(event: 'error'): number
type Events = {
error: (error: SatelliteError) => void
relation: (relation: Relation) => void
transaction: (transaction: Transaction, ackCb: () => void) => Promise<void>
outbound_started: () => void
}
type EventEmitter = AsyncEventEmitter<Events>

export class SatelliteClient implements Client {
private opts: Required<SatelliteClientOpts>

private emitter: SafeEventEmitter
private emitter: EventEmitter

private socketFactory: SocketFactory
private socket?: Socket
Expand Down Expand Up @@ -184,7 +163,7 @@ export class SatelliteClient implements Client {
socketFactory: SocketFactory,
opts: SatelliteClientOpts
) {
this.emitter = new EventEmitter() as SafeEventEmitter
this.emitter = new AsyncEventEmitter<Events>()

this.opts = { ...satelliteClientDefaults, ...opts }
this.socketFactory = socketFactory
Expand Down Expand Up @@ -256,14 +235,14 @@ export class SatelliteClient implements Client {
`socket error but no listener is attached: ${error.message}`
)
}
this.emitter.emit('error', error)
this.emitter.enqueueEmit('error', error)
})
this.socket.onClose(() => {
this.disconnect()
if (this.emitter.listenerCount('error') === 0) {
Log.error(`socket closed but no listener is attached`)
}
this.emitter.emit(
this.emitter.enqueueEmit(
'error',
new SatelliteError(SatelliteErrorCode.SOCKET_ERROR, 'socket closed')
)
Expand Down Expand Up @@ -698,10 +677,10 @@ export class SatelliteClient implements Client {
{ leading: true, trailing: true }
)

this.emitter.emit('outbound_started', message.lsn)
this.emitter.enqueueEmit('outbound_started')
return SatInStartReplicationResp.create()
} else {
this.emitter.emit(
this.emitter.enqueueEmit(
'error',
new SatelliteError(
SatelliteErrorCode.UNEXPECTED_STATE,
Expand All @@ -726,7 +705,7 @@ export class SatelliteClient implements Client {

return SatInStopReplicationResp.create()
} else {
this.emitter.emit(
this.emitter.enqueueEmit(
'error',
new SatelliteError(
SatelliteErrorCode.UNEXPECTED_STATE,
Expand Down Expand Up @@ -756,7 +735,7 @@ export class SatelliteClient implements Client {

private handleRelation(message: SatRelation) {
if (this.inbound.isReplicating !== ReplicationStatus.ACTIVE) {
this.emitter.emit(
this.emitter.enqueueEmit(
'error',
new SatelliteError(
SatelliteErrorCode.UNEXPECTED_STATE,
Expand All @@ -782,7 +761,7 @@ export class SatelliteClient implements Client {
}

this.inbound.relations.set(relation.id, relation)
this.emitter.emit('relation', relation)
this.emitter.enqueueEmit('relation', relation)
}

private handleTransaction(message: SatOpLog) {
Expand All @@ -800,7 +779,7 @@ export class SatelliteClient implements Client {
}

private handleError(error: SatErrorResp) {
this.emitter.emit('error', serverErrorToSatelliteError(error))
this.emitter.enqueueEmit('error', serverErrorToSatelliteError(error))
}

private handleSubscription(msg: SatSubsResp): SubscribeResponse {
Expand Down Expand Up @@ -877,7 +856,7 @@ export class SatelliteClient implements Client {
if (error instanceof SatelliteError) {
// subscription errors are emitted through specific event
if (!subscriptionError.includes(error.code)) {
this.emitter.emit('error', error)
this.emitter.enqueueEmit('error', error)
}
} else {
// This is an unexpected runtime error
Expand Down Expand Up @@ -910,7 +889,7 @@ export class SatelliteClient implements Client {
origin,
migrationVersion,
}
this.emitter.emit(
this.emitter.enqueueEmit(
'transaction',
transaction,
() => (this.inbound.last_lsn = transaction.lsn)
Expand Down
25 changes: 19 additions & 6 deletions clients/typescript/src/satellite/mock.ts
Original file line number Diff line number Diff line change
Expand Up @@ -24,8 +24,12 @@ import { Client, ConnectionWrapper, Satellite } from './index'
import { SatelliteOpts, SatelliteOverrides, satelliteDefaults } from './config'
import { BaseRegistry } from './registry'
import { SocketFactory } from '../sockets'
import { EventEmitter } from 'events'
import { DEFAULT_LOG_POS, subsDataErrorToSatelliteError, base64 } from '../util'
import {
DEFAULT_LOG_POS,
subsDataErrorToSatelliteError,
base64,
AsyncEventEmitter,
} from '../util'
import { bytesToNumber, uuid } from '../util/common'
import { generateTag } from './oplog'
import {
Expand Down Expand Up @@ -136,7 +140,16 @@ export class MockRegistry extends BaseRegistry {
}
}

export class MockSatelliteClient extends EventEmitter implements Client {
type Events = {
[SUBSCRIPTION_DELIVERED]: (data: SubscriptionData) => void
[SUBSCRIPTION_ERROR]: (error: SatelliteError, subscriptionId: string) => void
outbound_started: OutboundStartedCallback
error: ErrorCallback
}
export class MockSatelliteClient
extends AsyncEventEmitter<Events>
implements Client
{
isDown = false
replicating = false
disconnected = true
Expand Down Expand Up @@ -215,7 +228,7 @@ export class MockSatelliteClient extends EventEmitter implements Client {

return new Promise((resolve) => {
const emit = () => {
this.emit(SUBSCRIPTION_DELIVERED, {
this.enqueueEmit(SUBSCRIPTION_DELIVERED, {
subscriptionId,
lsn: base64.toBytes('MTIz'), // base64.encode("123")
data,
Expand Down Expand Up @@ -303,7 +316,7 @@ export class MockSatelliteClient extends EventEmitter implements Client {
this.replicating = true
this.inboundAck = lsn

const t = setTimeout(() => this.emit('outbound_started'), 100)
const t = setTimeout(() => this.enqueueEmit('outbound_started'), 100)
this.timeouts.push(t)

if (lsn && bytesToNumber(lsn) == MOCK_BEHIND_WINDOW_LSN) {
Expand Down Expand Up @@ -376,7 +389,7 @@ export class MockSatelliteClient extends EventEmitter implements Client {
})

const satError = subsDataErrorToSatelliteError(satSubsError)
this.emit(SUBSCRIPTION_ERROR, satError, subscriptionId)
this.enqueueEmit(SUBSCRIPTION_ERROR, satError, subscriptionId)
}, timeout)
}
}
1 change: 0 additions & 1 deletion clients/typescript/src/satellite/process.ts
Original file line number Diff line number Diff line change
Expand Up @@ -331,7 +331,6 @@ export class SatelliteProcess implements Satellite {
setClientListeners(): void {
this.client.subscribeToError(this._handleClientError.bind(this))
this.client.subscribeToRelations(this._updateRelations.bind(this))
// FIXME: calling an async function in an event emitter
this.client.subscribeToTransactions(this._applyTransaction.bind(this))
this.client.subscribeToOutboundStarted(this._throttledSnapshot.bind(this))

Expand Down
Loading

0 comments on commit 9ffb11a

Please sign in to comment.