From e091bbfb95cb7af2c0bdfe8b12427b1f45e14ae9 Mon Sep 17 00:00:00 2001 From: Kevin Date: Mon, 4 Dec 2023 16:43:28 +0100 Subject: [PATCH] chore (client): refactoring subscribe methods of the event notifier (#708) This PR refactors the various subscribe methods provided by the event notifier. For every listener that was being subscribed, the event notifier would generate a random key, store the listener under that key, and return that key. To unsubscribe, the user had to pass that key to the corresponding unsubscribe method which would lookup the listener and then unsubscribe it from the underlying event emitter. Storing the listener under a key is not needed and is a bit redundant as the underlying event emitter already stores them. This PR modified the event notifier such that it does not keep track of registered callbacks but instead returns an unsubscribe function from the subscribe methods. The unsubscribe function closes over the callback that needs to be unregistered from the underlying event emitter. --- .changeset/twelve-pigs-give.md | 5 ++ clients/typescript/src/electric/namespace.ts | 10 +-- .../react/hooks/useConnectivityState.ts | 6 +- .../frameworks/react/hooks/useLiveQuery.ts | 17 ++-- clients/typescript/src/notifiers/event.ts | 90 ++++--------------- clients/typescript/src/notifiers/index.ts | 16 ++-- clients/typescript/src/notifiers/mock.ts | 5 +- clients/typescript/src/satellite/process.ts | 55 ++++++------ .../test/client/notifications.test.ts | 26 ++++-- .../typescript/test/notifiers/event.test.ts | 4 +- clients/typescript/test/satellite/common.ts | 3 +- 11 files changed, 97 insertions(+), 140 deletions(-) create mode 100644 .changeset/twelve-pigs-give.md diff --git a/.changeset/twelve-pigs-give.md b/.changeset/twelve-pigs-give.md new file mode 100644 index 0000000000..a8991224d3 --- /dev/null +++ b/.changeset/twelve-pigs-give.md @@ -0,0 +1,5 @@ +--- +"electric-sql": patch +--- + +Refactorings to the event notifier. diff --git a/clients/typescript/src/electric/namespace.ts b/clients/typescript/src/electric/namespace.ts index 094c0c73f2..402e071378 100644 --- a/clients/typescript/src/electric/namespace.ts +++ b/clients/typescript/src/electric/namespace.ts @@ -1,7 +1,7 @@ // This is the namespace that's patched onto the user's database client // (technically via the proxy machinery) as the `.electric` property. import { DatabaseAdapter } from './adapter' -import { Notifier } from '../notifiers' +import { Notifier, UnsubscribeFunction } from '../notifiers' import { ConnectivityState } from '../util/types' import { GlobalRegistry, Registry } from '../satellite' @@ -15,7 +15,7 @@ export class ElectricNamespace { return this._isConnected } - private _stateChangeSubscription: string + private _unsubscribeStateChanges: UnsubscribeFunction constructor( dbName: string, @@ -29,7 +29,7 @@ export class ElectricNamespace { this.registry = registry this._isConnected = false - this._stateChangeSubscription = + this._unsubscribeStateChanges = this.notifier.subscribeToConnectivityStateChanges( ({ connectivityState }) => { this.setIsConnected(connectivityState) @@ -52,9 +52,7 @@ export class ElectricNamespace { * Cleans up the resources used by the `ElectricNamespace`. */ async close(): Promise { - this.notifier.unsubscribeFromConnectivityStateChanges( - this._stateChangeSubscription - ) + this._unsubscribeStateChanges() await this.registry.stop(this.dbName) } } diff --git a/clients/typescript/src/frameworks/react/hooks/useConnectivityState.ts b/clients/typescript/src/frameworks/react/hooks/useConnectivityState.ts index 45603d29e9..4a70efdc3e 100644 --- a/clients/typescript/src/frameworks/react/hooks/useConnectivityState.ts +++ b/clients/typescript/src/frameworks/react/hooks/useConnectivityState.ts @@ -63,13 +63,11 @@ const useConnectivityState: HookFn = () => { setState(getValidState(connectivityState)) } - const subscriptionKey = - notifier.subscribeToConnectivityStateChanges(handler) + const unsubscribe = notifier.subscribeToConnectivityStateChanges(handler) return () => { shouldStop = true - - notifier.unsubscribeFromConnectivityStateChanges(subscriptionKey) + unsubscribe() } }, [electric]) diff --git a/clients/typescript/src/frameworks/react/hooks/useLiveQuery.ts b/clients/typescript/src/frameworks/react/hooks/useLiveQuery.ts index 9599428e07..d97dd53372 100644 --- a/clients/typescript/src/frameworks/react/hooks/useLiveQuery.ts +++ b/clients/typescript/src/frameworks/react/hooks/useLiveQuery.ts @@ -9,7 +9,10 @@ import { } from 'react' import { hash } from 'ohash' -import { ChangeNotification } from '../../../notifiers/index' +import { + ChangeNotification, + UnsubscribeFunction, +} from '../../../notifiers/index' import { QualifiedTablename, hasIntersection } from '../../../util/tablename' import { ElectricContext } from '../provider' @@ -122,7 +125,7 @@ function useLiveQueryWithQueryUpdates( ): ResultData { const electric = useContext(ElectricContext) - const changeSubscriptionKey = useRef() + const unsubscribeDataChanges = useRef() const tablenames = useRef() const tablenamesKey = useRef() const [resultData, setResultData] = useState>({}) @@ -194,16 +197,16 @@ function useLiveQueryWithQueryUpdates( } } - const key = notifier.subscribeToDataChanges(handleChange) - if (changeSubscriptionKey.current !== undefined) { - notifier.unsubscribeFromDataChanges(changeSubscriptionKey.current) + const unsubscribe = notifier.subscribeToDataChanges(handleChange) + if (unsubscribeDataChanges.current !== undefined) { + unsubscribeDataChanges.current() } - changeSubscriptionKey.current = key + unsubscribeDataChanges.current = unsubscribe return () => { ignore = true - notifier.unsubscribeFromDataChanges(key) + unsubscribe() } }, [electric, tablenamesKey.current, tablenames.current, runLiveQuery]) diff --git a/clients/typescript/src/notifiers/event.ts b/clients/typescript/src/notifiers/event.ts index a0b0c6ada7..921063bfda 100644 --- a/clients/typescript/src/notifiers/event.ts +++ b/clients/typescript/src/notifiers/event.ts @@ -1,7 +1,6 @@ import { EventEmitter } from 'events' import { AuthState } from '../auth/index' -import { randomValue } from '../util/random' import { QualifiedTablename } from '../util/tablename' import { ConnectivityState, DbName } from '../util/types' import Log from 'loglevel' @@ -19,9 +18,10 @@ import { Notifier, PotentialChangeCallback, PotentialChangeNotification, + UnsubscribeFunction, } from './index' -const EVENT_NAMES = { +export const EVENT_NAMES = { authChange: 'auth:changed', actualDataChange: 'data:actually:changed', potentialDataChange: 'data:potentially:changed', @@ -51,14 +51,6 @@ export class EventNotifier implements Notifier { events: EventEmitter - _changeCallbacks: { - [key: string]: NotificationCallback - } - - _connectivityStatusCallbacks: { - [key: string]: NotificationCallback - } - constructor(dbName: DbName, eventEmitter?: EventEmitter) { this.dbName = dbName this.attachedDbIndex = { @@ -67,9 +59,6 @@ export class EventNotifier implements Notifier { } this.events = eventEmitter !== undefined ? eventEmitter : globalEmitter - - this._changeCallbacks = {} - this._connectivityStatusCallbacks = {} } attach(dbName: DbName, dbAlias: string): void { @@ -113,24 +102,13 @@ export class EventNotifier implements Notifier { authStateChanged(authState: AuthState): void { this._emitAuthStateChange(authState) } - subscribeToAuthStateChanges(callback: AuthStateCallback): string { - const key = randomValue() - - this._changeCallbacks[key] = callback + subscribeToAuthStateChanges( + callback: AuthStateCallback + ): UnsubscribeFunction { this._subscribe(EVENT_NAMES.authChange, callback) - - return key - } - unsubscribeFromAuthStateChanges(key: string): void { - const callback = this._changeCallbacks[key] - - if (callback === undefined) { - return + return () => { + this._unsubscribe(EVENT_NAMES.authChange, callback) } - - this._unsubscribe(EVENT_NAMES.authChange, callback) - - delete this._changeCallbacks[key] } potentiallyChanged(): void { @@ -148,8 +126,9 @@ export class EventNotifier implements Notifier { this._emitActualChange(dbName, changes) } - subscribeToPotentialDataChanges(callback: PotentialChangeCallback): string { - const key = randomValue() + subscribeToPotentialDataChanges( + callback: PotentialChangeCallback + ): UnsubscribeFunction { const thisHasDbName = this._hasDbName.bind(this) const wrappedCallback = (notification: PotentialChangeNotification) => { @@ -158,25 +137,14 @@ export class EventNotifier implements Notifier { } } - this._changeCallbacks[key] = wrappedCallback this._subscribe(EVENT_NAMES.potentialDataChange, wrappedCallback) - return key - } - unsubscribeFromPotentialDataChanges(key: string): void { - const callback = this._changeCallbacks[key] - - if (callback === undefined) { - return + return () => { + this._unsubscribe(EVENT_NAMES.potentialDataChange, wrappedCallback) } - - this._unsubscribe(EVENT_NAMES.potentialDataChange, callback) - - delete this._changeCallbacks[key] } - subscribeToDataChanges(callback: ChangeCallback): string { - const key = randomValue() + subscribeToDataChanges(callback: ChangeCallback): UnsubscribeFunction { const thisHasDbName = this._hasDbName.bind(this) const wrappedCallback = (notification: ChangeNotification) => { @@ -185,21 +153,11 @@ export class EventNotifier implements Notifier { } } - this._changeCallbacks[key] = wrappedCallback this._subscribe(EVENT_NAMES.actualDataChange, wrappedCallback) - return key - } - unsubscribeFromDataChanges(key: string): void { - const callback = this._changeCallbacks[key] - - if (callback === undefined) { - return + return () => { + this._unsubscribe(EVENT_NAMES.actualDataChange, wrappedCallback) } - - this._unsubscribe(EVENT_NAMES.actualDataChange, callback) - - delete this._changeCallbacks[key] } connectivityStateChanged(dbName: string, status: ConnectivityState) { @@ -212,8 +170,7 @@ export class EventNotifier implements Notifier { subscribeToConnectivityStateChanges( callback: ConnectivityStateChangeCallback - ) { - const key = randomValue() + ): UnsubscribeFunction { const thisHasDbName = this._hasDbName.bind(this) const wrappedCallback = ( @@ -224,22 +181,11 @@ export class EventNotifier implements Notifier { } } - this._connectivityStatusCallbacks[key] = wrappedCallback this._subscribe(EVENT_NAMES.connectivityStateChange, wrappedCallback) - return key - } - - unsubscribeFromConnectivityStateChanges(key: string): void { - const callback = this._connectivityStatusCallbacks[key] - - if (callback === undefined) { - return + return () => { + this._unsubscribe(EVENT_NAMES.connectivityStateChange, wrappedCallback) } - - this._unsubscribe(EVENT_NAMES.connectivityStateChange, callback) - - delete this._connectivityStatusCallbacks[key] } _getDbNames(): DbName[] { diff --git a/clients/typescript/src/notifiers/index.ts b/clients/typescript/src/notifiers/index.ts index 04f808d008..547c628282 100644 --- a/clients/typescript/src/notifiers/index.ts +++ b/clients/typescript/src/notifiers/index.ts @@ -47,6 +47,8 @@ export type NotificationCallback = | PotentialChangeCallback | ConnectivityStateChangeCallback +export type UnsubscribeFunction = () => void + export interface Notifier { // The name of the primary database that components communicating via this // notifier have open and are using. @@ -78,8 +80,7 @@ export interface Notifier { // Calling `authStateChanged` notifies the Satellite process that the // user's authentication credentials have changed. authStateChanged(authState: AuthState): void - subscribeToAuthStateChanges(callback: AuthStateCallback): string - unsubscribeFromAuthStateChanges(key: string): void + subscribeToAuthStateChanges(callback: AuthStateCallback): UnsubscribeFunction // The data change notification workflow starts by the electric database // clients (or the user manually) calling `potentiallyChanged` whenever @@ -90,8 +91,9 @@ export interface Notifier { // Satellite processes subscribe to these "data has potentially changed" // notifications. When they get one, they check the `_oplog` table in the // database for *actual* changes persisted by the triggers. - subscribeToPotentialDataChanges(callback: PotentialChangeCallback): string - unsubscribeFromPotentialDataChanges(key: string): void + subscribeToPotentialDataChanges( + callback: PotentialChangeCallback + ): UnsubscribeFunction // When Satellite detects actual data changes in the oplog for a given // database, it replicates it and calls `actuallyChanged` with the list @@ -102,8 +104,7 @@ export interface Notifier { // using the info to trigger re-queries, if the changes affect databases and // tables that their queries depend on. This then trigger re-rendering if // the query results are actually affected by the data changes. - subscribeToDataChanges(callback: ChangeCallback): string - unsubscribeFromDataChanges(key: string): void + subscribeToDataChanges(callback: ChangeCallback): UnsubscribeFunction // Notification for network connectivity state changes. // A connectivity change s can be triggered manually, @@ -116,6 +117,5 @@ export interface Notifier { subscribeToConnectivityStateChanges( callback: ConnectivityStateChangeCallback - ): string - unsubscribeFromConnectivityStateChanges(key: string): void + ): UnsubscribeFunction } diff --git a/clients/typescript/src/notifiers/mock.ts b/clients/typescript/src/notifiers/mock.ts index b791ff4018..fe821b17e9 100644 --- a/clients/typescript/src/notifiers/mock.ts +++ b/clients/typescript/src/notifiers/mock.ts @@ -2,12 +2,13 @@ import { DbName } from '../util/types' import { Notification, Notifier } from './index' import { EventNotifier } from './event' +import EventEmitter from 'events' export class MockNotifier extends EventNotifier implements Notifier { notifications: Notification[] - constructor(dbName: DbName) { - super(dbName) + constructor(dbName: DbName, emitter?: EventEmitter) { + super(dbName, emitter) this.notifications = [] } diff --git a/clients/typescript/src/satellite/process.ts b/clients/typescript/src/satellite/process.ts index 41da30532a..fd831262ce 100644 --- a/clients/typescript/src/satellite/process.ts +++ b/clients/typescript/src/satellite/process.ts @@ -12,6 +12,7 @@ import { Change, ConnectivityStateChangeNotification, Notifier, + UnsubscribeFunction, } from '../notifiers/index' import { Waiter, @@ -119,13 +120,13 @@ export class SatelliteProcess implements Satellite { opts: SatelliteOpts _authState?: AuthState - _authStateSubscription?: string + _unsubscribeFromAuthState?: UnsubscribeFunction connectivityState?: ConnectivityState - _connectivityChangeSubscription?: string + _unsubscribeFromConnectivityChanges?: UnsubscribeFunction _pollingInterval?: any - _potentialDataChangeSubscription?: string + _unsubscribeFromPotentialDataChanges?: UnsubscribeFunction _throttledSnapshot: ThrottleFunction _lsn?: LSN @@ -221,9 +222,10 @@ export class SatelliteProcess implements Satellite { await this._setAuthState({ clientId: clientId, token: authConfig.token }) const notifierSubscriptions = Object.entries({ - _authStateSubscription: this._authStateSubscription, - _connectivityChangeSubscription: this._connectivityChangeSubscription, - _potentialDataChangeSubscription: this._potentialDataChangeSubscription, + _authStateSubscription: this._unsubscribeFromAuthState, + _connectivityChangeSubscription: this._unsubscribeFromConnectivityChanges, + _potentialDataChangeSubscription: + this._unsubscribeFromPotentialDataChanges, }) notifierSubscriptions.forEach(([name, value]) => { if (value !== undefined) { @@ -237,7 +239,7 @@ export class SatelliteProcess implements Satellite { // Monitor auth state changes. const authStateHandler = this._updateAuthState.bind(this) - this._authStateSubscription = + this._unsubscribeFromAuthState = this.notifier.subscribeToAuthStateChanges(authStateHandler) // Monitor connectivity state changes. @@ -246,13 +248,13 @@ export class SatelliteProcess implements Satellite { }: ConnectivityStateChangeNotification) => { this._handleConnectivityStateChange(connectivityState) } - this._connectivityChangeSubscription = + this._unsubscribeFromConnectivityChanges = this.notifier.subscribeToConnectivityStateChanges( connectivityStateHandler ) // Request a snapshot whenever the data in our database potentially changes. - this._potentialDataChangeSubscription = + this._unsubscribeFromPotentialDataChanges = this.notifier.subscribeToPotentialDataChanges(this._throttledSnapshot) // Start polling to request a snapshot every `pollingInterval` ms. @@ -350,27 +352,20 @@ export class SatelliteProcess implements Satellite { this._pollingInterval = undefined } - if (this._authStateSubscription !== undefined) { - this.notifier.unsubscribeFromAuthStateChanges(this._authStateSubscription) - - this._authStateSubscription = undefined - } - - if (this._connectivityChangeSubscription !== undefined) { - this.notifier.unsubscribeFromConnectivityStateChanges( - this._connectivityChangeSubscription - ) - - this._connectivityChangeSubscription = undefined - } - - if (this._potentialDataChangeSubscription !== undefined) { - this.notifier.unsubscribeFromPotentialDataChanges( - this._potentialDataChangeSubscription - ) - - this._potentialDataChangeSubscription = undefined - } + // Unsubscribe all listeners and remove them + const unsubscribers = [ + '_unsubscribeFromAuthState', + '_unsubscribeFromConnectivityChanges', + '_unsubscribeFromPotentialDataChanges', + ] as const + + unsubscribers.forEach((unsubscriber) => { + const unsub = this[unsubscriber] + if (unsub !== undefined) { + unsub!() + this[unsubscriber] = undefined + } + }) this._disconnect() diff --git a/clients/typescript/test/client/notifications.test.ts b/clients/typescript/test/client/notifications.test.ts index d91dcb7471..bb6e8d4e4b 100644 --- a/clients/typescript/test/client/notifications.test.ts +++ b/clients/typescript/test/client/notifications.test.ts @@ -5,6 +5,7 @@ import { schema } from './generated' import { MockRegistry } from '../../src/satellite/mock' import { EventNotifier } from '../../src/notifiers' import { mockElectricClient } from '../satellite/common' +import { EVENT_NAMES } from '../../src/notifiers/event' const conn = new Database(':memory:') const config = { @@ -20,13 +21,15 @@ await db.Items.sync() // sync the Items table async function runAndCheckNotifications(f: () => Promise) { let notifications = 0 - const sub = notifier.subscribeToPotentialDataChanges((_notification) => { - notifications = notifications + 1 - }) + const unsubscribe = notifier.subscribeToPotentialDataChanges( + (_notification) => { + notifications = notifications + 1 + } + ) await f() - notifier.unsubscribeFromPotentialDataChanges(sub) + unsubscribe() return notifications } @@ -227,15 +230,22 @@ test.serial( // Check that the listeners are registered const notifier = electric.notifier as EventNotifier - t.assert(Object.keys(notifier._changeCallbacks).length > 0) - t.assert(Object.keys(notifier._connectivityStatusCallbacks).length > 0) + const events = [ + EVENT_NAMES.authChange, + EVENT_NAMES.potentialDataChange, + EVENT_NAMES.connectivityStateChange, + ] + events.forEach((eventName) => { + t.assert(notifier.events.listenerCount(eventName) > 0) + }) // Close the Electric client await electric.close() // Check that the listeners are unregistered - t.deepEqual(notifier._changeCallbacks, {}) - t.deepEqual(notifier._connectivityStatusCallbacks, {}) + events.forEach((eventName) => { + t.is(notifier.events.listenerCount(eventName), 0) + }) // Check that the Satellite process is unregistered t.assert(!registry.satellites.hasOwnProperty(conn.name)) diff --git a/clients/typescript/test/notifiers/event.test.ts b/clients/typescript/test/notifiers/event.test.ts index e64172d989..8c940fc0fc 100644 --- a/clients/typescript/test/notifiers/event.test.ts +++ b/clients/typescript/test/notifiers/event.test.ts @@ -120,13 +120,13 @@ test('no more connectivity events after unsubscribe', async (t) => { const notifications: ConnectivityStateChangeNotification[] = [] - const key = target.subscribeToConnectivityStateChanges((x) => { + const unsubscribe = target.subscribeToConnectivityStateChanges((x) => { notifications.push(x) }) source.connectivityStateChanged('test.db', 'connected') - target.unsubscribeFromConnectivityStateChanges(key) + unsubscribe() source.connectivityStateChanged('test.db', 'connected') diff --git a/clients/typescript/test/satellite/common.ts b/clients/typescript/test/satellite/common.ts index be80116001..1dec7f8a7e 100644 --- a/clients/typescript/test/satellite/common.ts +++ b/clients/typescript/test/satellite/common.ts @@ -180,6 +180,7 @@ import { DbSchema, TableSchema } from '../../src/client/model/schema' import { PgBasicType } from '../../src/client/conversions/types' import { HKT } from '../../src/client/util/hkt' import { ElectricClient } from '../../src/client/model' +import EventEmitter from 'events' // Speed up the intervals for testing. export const opts = Object.assign({}, satelliteDefaults, { @@ -258,7 +259,7 @@ export const mockElectricClient = async ( const dbName = db.name const adapter = new DatabaseAdapter(db) const migrator = new BundleMigrator(adapter, migrations) - const notifier = new MockNotifier(dbName) + const notifier = new MockNotifier(dbName, new EventEmitter()) const client = new MockSatelliteClient() const satellite = new SatelliteProcess( dbName,