Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

chore (client): use async event emitter in Satellite client #696

Merged
merged 7 commits into from
Dec 13, 2023

Conversation

kevin-dp
Copy link
Contributor

@kevin-dp kevin-dp commented Nov 23, 2023

Problem

The Satellite client uses an EventEmitter to emit events to which the Satellite process is subscribed. However, the Satellite process registers asynchronous event listeners but the EventEmitter calls them synchronously. Hence, async listeners for an event may run concurrently and a new event may be emitted while async listeners of the previous event are still running.

An example in NodeJS:

const EventEmitter = require('node:events');
const emitter = new EventEmitter();
emitter.on('transaction', (tx) => {
  console.log('listener 1 start: ' + tx);
  return new Promise((resolve) => {
    setTimeout(() => {
      console.log('listener 1 end: ' + tx);
      resolve()
    }, 10)
  })
})

emitter.emit('transaction', 'tx1');
emitter.emit('transaction', 'tx2');
/*
   Prints:
     listener 1 start: tx1
     listener 1 start: tx2
     listener 1 end: tx1
     listener 1 end: tx2
 */

emitter.on('transaction', (tx) => {
  console.log('listener 2 start: ' + tx);
})

emitter.emit('transaction');

/*
   Prints:
     listener 1 start: tx3
     listener 2 start: tx3
     listener 1 end: tx3
 */

The first example above shows interleaving of async listeners on an event.
The second example shows interleaving of listeners for distinct events.

Solution

Implemented an AsyncEventEmitter that awaits all listeners before processing the next event. Modified the Satellite client to use the AsyncEventEmitter instead of the regular EventEmitter. In addition, the AsyncEventEmitter is fully typed, inspired by the typings of the typed-emitter package.

Note that, we start all listeners for an event in order without awaiting. Hence, async listeners registered for an event may run concurrently. However, we await all listeners before processing the next event.

With the new AsyncEventEmitter we get this behavior:

const emitter = new AsyncEventEmitter();
emitter.on('transaction', (tx) => {
  console.log('listener 1 start: ' + tx);
  return new Promise((resolve) => {
    setTimeout(() => {
      console.log('listener 1 end: ' + tx);
      resolve()
    }, 10)
  })
})

emitter.enqueueEmit('transaction', 'tx1');
emitter.enqueueEmit('transaction', 'tx2');
/*
   Prints:
     listener 1 start: tx1
     listener 1 end: tx1
     listener 1 start: tx2
     listener 1 end: tx2
 */

emitter.on('transaction', (tx) => {
  console.log('listener 2 start: ' + tx);
})

emitter.enqueueEmit('transaction');

/*
   Prints:
     listener 1 start: tx3
     listener 2 start: tx3
     listener 1 end: tx3
 */

The first example above shows that events are processed in order only after all listeners have finished.
The second example shows that listeners for a given event are started in order one after the other without awaiting.

Note: we renamed emit to enqueueEmit to avoid confusion with the classical EventEmitter's emit function which has different semantics as it does not await all listeners before processing the next event.

Copy link

linear bot commented Nov 23, 2023

VAX-937 Fix client to process events that are async functions

Look for this comment ```// FIXME: calling an async function in an event emitter```

@kevin-dp kevin-dp force-pushed the kevindp/vax-937-async-event-handlers branch from 6e054c9 to 5528a01 Compare November 23, 2023 09:25
Copy link
Contributor

@balegas balegas left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Needs more discussion

removeAllListeners(): void

listenerCount(event: 'error'): number
type Events = {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This looks much better than the code above, but I'd prefer to reuse the defined the callbacks (TransactionCallback, etc) than declaring them again

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'm not sure what you mean here. Do you mean to use TransactionCallback as the type for the ackCb parameter for the transaction event below?
TransactionCallback is not the same type as () => void.

clients/typescript/src/satellite/mock.ts Outdated Show resolved Hide resolved
clients/typescript/src/satellite/client.ts Show resolved Hide resolved
@kevin-dp kevin-dp force-pushed the kevindp/vax-937-async-event-handlers branch from aaefd4e to 9e97b52 Compare November 27, 2023 07:55
Copy link
Contributor

@icehaunter icehaunter left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

LGTM, nice work. A couple of questions but nothing blocking -- address at your discretion. Only thing I would ask you to acknowledge explicitly is the error handling comment

error: ErrorCallback
}
export class MockSatelliteClient
extends AsyncEventEmitter<Events>
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Curious as to why this is extends here but a separate private instance on the real client? I'd generally argue for exposing typed on methods on the client itself and doing away with subscribeToSmth method pattern, however that might be out of scope here.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It was already like that before so i left it how it was, only changed the EventEmitter to use AsyncEventEmitter. Your proposed refactoring would indeed make the code cleaner and more idiomatic but should probably be addressed in a separate PR as you suggest.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I can shed some light on why mock still extends event emitter. The client used to be like that, but I refactored it to not extend event emitter anymore. I might have overseen the mock.

clients/typescript/src/util/asyncEventEmitter.ts Outdated Show resolved Hide resolved
/**
* @returns An array listing the events for which the emitter has registered listeners.
*/
eventNames(): (keyof Events | string | symbol)[] {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why is this method even here if all we're doing with it is testing it, not even using it in other tests?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Because AsyncEventEmitter implements an event emitter interface which typically provides an eventNames function. So i implemented it in case we need it later.

clients/typescript/test/util/asyncEventEmitter.test.ts Outdated Show resolved Hide resolved
clients/typescript/test/util/asyncEventEmitter.test.ts Outdated Show resolved Hide resolved
@kevin-dp kevin-dp force-pushed the kevindp/vax-937-async-event-handlers branch from 8a735a8 to bb0d968 Compare December 13, 2023 09:22
@kevin-dp kevin-dp merged commit 9ffb11a into main Dec 13, 2023
7 checks passed
@kevin-dp kevin-dp deleted the kevindp/vax-937-async-event-handlers branch December 13, 2023 10:48
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

Successfully merging this pull request may close these issues.

4 participants