diff --git a/.changeset/twelve-pigs-give.md b/.changeset/twelve-pigs-give.md new file mode 100644 index 0000000000..a8991224d3 --- /dev/null +++ b/.changeset/twelve-pigs-give.md @@ -0,0 +1,5 @@ +--- +"electric-sql": patch +--- + +Refactorings to the event notifier. diff --git a/clients/typescript/src/electric/namespace.ts b/clients/typescript/src/electric/namespace.ts index 094c0c73f2..402e071378 100644 --- a/clients/typescript/src/electric/namespace.ts +++ b/clients/typescript/src/electric/namespace.ts @@ -1,7 +1,7 @@ // This is the namespace that's patched onto the user's database client // (technically via the proxy machinery) as the `.electric` property. import { DatabaseAdapter } from './adapter' -import { Notifier } from '../notifiers' +import { Notifier, UnsubscribeFunction } from '../notifiers' import { ConnectivityState } from '../util/types' import { GlobalRegistry, Registry } from '../satellite' @@ -15,7 +15,7 @@ export class ElectricNamespace { return this._isConnected } - private _stateChangeSubscription: string + private _unsubscribeStateChanges: UnsubscribeFunction constructor( dbName: string, @@ -29,7 +29,7 @@ export class ElectricNamespace { this.registry = registry this._isConnected = false - this._stateChangeSubscription = + this._unsubscribeStateChanges = this.notifier.subscribeToConnectivityStateChanges( ({ connectivityState }) => { this.setIsConnected(connectivityState) @@ -52,9 +52,7 @@ export class ElectricNamespace { * Cleans up the resources used by the `ElectricNamespace`. */ async close(): Promise { - this.notifier.unsubscribeFromConnectivityStateChanges( - this._stateChangeSubscription - ) + this._unsubscribeStateChanges() await this.registry.stop(this.dbName) } } diff --git a/clients/typescript/src/frameworks/react/hooks/useConnectivityState.ts b/clients/typescript/src/frameworks/react/hooks/useConnectivityState.ts index 45603d29e9..4a70efdc3e 100644 --- a/clients/typescript/src/frameworks/react/hooks/useConnectivityState.ts +++ b/clients/typescript/src/frameworks/react/hooks/useConnectivityState.ts @@ -63,13 +63,11 @@ const useConnectivityState: HookFn = () => { setState(getValidState(connectivityState)) } - const subscriptionKey = - notifier.subscribeToConnectivityStateChanges(handler) + const unsubscribe = notifier.subscribeToConnectivityStateChanges(handler) return () => { shouldStop = true - - notifier.unsubscribeFromConnectivityStateChanges(subscriptionKey) + unsubscribe() } }, [electric]) diff --git a/clients/typescript/src/frameworks/react/hooks/useLiveQuery.ts b/clients/typescript/src/frameworks/react/hooks/useLiveQuery.ts index 9599428e07..d97dd53372 100644 --- a/clients/typescript/src/frameworks/react/hooks/useLiveQuery.ts +++ b/clients/typescript/src/frameworks/react/hooks/useLiveQuery.ts @@ -9,7 +9,10 @@ import { } from 'react' import { hash } from 'ohash' -import { ChangeNotification } from '../../../notifiers/index' +import { + ChangeNotification, + UnsubscribeFunction, +} from '../../../notifiers/index' import { QualifiedTablename, hasIntersection } from '../../../util/tablename' import { ElectricContext } from '../provider' @@ -122,7 +125,7 @@ function useLiveQueryWithQueryUpdates( ): ResultData { const electric = useContext(ElectricContext) - const changeSubscriptionKey = useRef() + const unsubscribeDataChanges = useRef() const tablenames = useRef() const tablenamesKey = useRef() const [resultData, setResultData] = useState>({}) @@ -194,16 +197,16 @@ function useLiveQueryWithQueryUpdates( } } - const key = notifier.subscribeToDataChanges(handleChange) - if (changeSubscriptionKey.current !== undefined) { - notifier.unsubscribeFromDataChanges(changeSubscriptionKey.current) + const unsubscribe = notifier.subscribeToDataChanges(handleChange) + if (unsubscribeDataChanges.current !== undefined) { + unsubscribeDataChanges.current() } - changeSubscriptionKey.current = key + unsubscribeDataChanges.current = unsubscribe return () => { ignore = true - notifier.unsubscribeFromDataChanges(key) + unsubscribe() } }, [electric, tablenamesKey.current, tablenames.current, runLiveQuery]) diff --git a/clients/typescript/src/notifiers/event.ts b/clients/typescript/src/notifiers/event.ts index a0b0c6ada7..921063bfda 100644 --- a/clients/typescript/src/notifiers/event.ts +++ b/clients/typescript/src/notifiers/event.ts @@ -1,7 +1,6 @@ import { EventEmitter } from 'events' import { AuthState } from '../auth/index' -import { randomValue } from '../util/random' import { QualifiedTablename } from '../util/tablename' import { ConnectivityState, DbName } from '../util/types' import Log from 'loglevel' @@ -19,9 +18,10 @@ import { Notifier, PotentialChangeCallback, PotentialChangeNotification, + UnsubscribeFunction, } from './index' -const EVENT_NAMES = { +export const EVENT_NAMES = { authChange: 'auth:changed', actualDataChange: 'data:actually:changed', potentialDataChange: 'data:potentially:changed', @@ -51,14 +51,6 @@ export class EventNotifier implements Notifier { events: EventEmitter - _changeCallbacks: { - [key: string]: NotificationCallback - } - - _connectivityStatusCallbacks: { - [key: string]: NotificationCallback - } - constructor(dbName: DbName, eventEmitter?: EventEmitter) { this.dbName = dbName this.attachedDbIndex = { @@ -67,9 +59,6 @@ export class EventNotifier implements Notifier { } this.events = eventEmitter !== undefined ? eventEmitter : globalEmitter - - this._changeCallbacks = {} - this._connectivityStatusCallbacks = {} } attach(dbName: DbName, dbAlias: string): void { @@ -113,24 +102,13 @@ export class EventNotifier implements Notifier { authStateChanged(authState: AuthState): void { this._emitAuthStateChange(authState) } - subscribeToAuthStateChanges(callback: AuthStateCallback): string { - const key = randomValue() - - this._changeCallbacks[key] = callback + subscribeToAuthStateChanges( + callback: AuthStateCallback + ): UnsubscribeFunction { this._subscribe(EVENT_NAMES.authChange, callback) - - return key - } - unsubscribeFromAuthStateChanges(key: string): void { - const callback = this._changeCallbacks[key] - - if (callback === undefined) { - return + return () => { + this._unsubscribe(EVENT_NAMES.authChange, callback) } - - this._unsubscribe(EVENT_NAMES.authChange, callback) - - delete this._changeCallbacks[key] } potentiallyChanged(): void { @@ -148,8 +126,9 @@ export class EventNotifier implements Notifier { this._emitActualChange(dbName, changes) } - subscribeToPotentialDataChanges(callback: PotentialChangeCallback): string { - const key = randomValue() + subscribeToPotentialDataChanges( + callback: PotentialChangeCallback + ): UnsubscribeFunction { const thisHasDbName = this._hasDbName.bind(this) const wrappedCallback = (notification: PotentialChangeNotification) => { @@ -158,25 +137,14 @@ export class EventNotifier implements Notifier { } } - this._changeCallbacks[key] = wrappedCallback this._subscribe(EVENT_NAMES.potentialDataChange, wrappedCallback) - return key - } - unsubscribeFromPotentialDataChanges(key: string): void { - const callback = this._changeCallbacks[key] - - if (callback === undefined) { - return + return () => { + this._unsubscribe(EVENT_NAMES.potentialDataChange, wrappedCallback) } - - this._unsubscribe(EVENT_NAMES.potentialDataChange, callback) - - delete this._changeCallbacks[key] } - subscribeToDataChanges(callback: ChangeCallback): string { - const key = randomValue() + subscribeToDataChanges(callback: ChangeCallback): UnsubscribeFunction { const thisHasDbName = this._hasDbName.bind(this) const wrappedCallback = (notification: ChangeNotification) => { @@ -185,21 +153,11 @@ export class EventNotifier implements Notifier { } } - this._changeCallbacks[key] = wrappedCallback this._subscribe(EVENT_NAMES.actualDataChange, wrappedCallback) - return key - } - unsubscribeFromDataChanges(key: string): void { - const callback = this._changeCallbacks[key] - - if (callback === undefined) { - return + return () => { + this._unsubscribe(EVENT_NAMES.actualDataChange, wrappedCallback) } - - this._unsubscribe(EVENT_NAMES.actualDataChange, callback) - - delete this._changeCallbacks[key] } connectivityStateChanged(dbName: string, status: ConnectivityState) { @@ -212,8 +170,7 @@ export class EventNotifier implements Notifier { subscribeToConnectivityStateChanges( callback: ConnectivityStateChangeCallback - ) { - const key = randomValue() + ): UnsubscribeFunction { const thisHasDbName = this._hasDbName.bind(this) const wrappedCallback = ( @@ -224,22 +181,11 @@ export class EventNotifier implements Notifier { } } - this._connectivityStatusCallbacks[key] = wrappedCallback this._subscribe(EVENT_NAMES.connectivityStateChange, wrappedCallback) - return key - } - - unsubscribeFromConnectivityStateChanges(key: string): void { - const callback = this._connectivityStatusCallbacks[key] - - if (callback === undefined) { - return + return () => { + this._unsubscribe(EVENT_NAMES.connectivityStateChange, wrappedCallback) } - - this._unsubscribe(EVENT_NAMES.connectivityStateChange, callback) - - delete this._connectivityStatusCallbacks[key] } _getDbNames(): DbName[] { diff --git a/clients/typescript/src/notifiers/index.ts b/clients/typescript/src/notifiers/index.ts index 04f808d008..547c628282 100644 --- a/clients/typescript/src/notifiers/index.ts +++ b/clients/typescript/src/notifiers/index.ts @@ -47,6 +47,8 @@ export type NotificationCallback = | PotentialChangeCallback | ConnectivityStateChangeCallback +export type UnsubscribeFunction = () => void + export interface Notifier { // The name of the primary database that components communicating via this // notifier have open and are using. @@ -78,8 +80,7 @@ export interface Notifier { // Calling `authStateChanged` notifies the Satellite process that the // user's authentication credentials have changed. authStateChanged(authState: AuthState): void - subscribeToAuthStateChanges(callback: AuthStateCallback): string - unsubscribeFromAuthStateChanges(key: string): void + subscribeToAuthStateChanges(callback: AuthStateCallback): UnsubscribeFunction // The data change notification workflow starts by the electric database // clients (or the user manually) calling `potentiallyChanged` whenever @@ -90,8 +91,9 @@ export interface Notifier { // Satellite processes subscribe to these "data has potentially changed" // notifications. When they get one, they check the `_oplog` table in the // database for *actual* changes persisted by the triggers. - subscribeToPotentialDataChanges(callback: PotentialChangeCallback): string - unsubscribeFromPotentialDataChanges(key: string): void + subscribeToPotentialDataChanges( + callback: PotentialChangeCallback + ): UnsubscribeFunction // When Satellite detects actual data changes in the oplog for a given // database, it replicates it and calls `actuallyChanged` with the list @@ -102,8 +104,7 @@ export interface Notifier { // using the info to trigger re-queries, if the changes affect databases and // tables that their queries depend on. This then trigger re-rendering if // the query results are actually affected by the data changes. - subscribeToDataChanges(callback: ChangeCallback): string - unsubscribeFromDataChanges(key: string): void + subscribeToDataChanges(callback: ChangeCallback): UnsubscribeFunction // Notification for network connectivity state changes. // A connectivity change s can be triggered manually, @@ -116,6 +117,5 @@ export interface Notifier { subscribeToConnectivityStateChanges( callback: ConnectivityStateChangeCallback - ): string - unsubscribeFromConnectivityStateChanges(key: string): void + ): UnsubscribeFunction } diff --git a/clients/typescript/src/notifiers/mock.ts b/clients/typescript/src/notifiers/mock.ts index b791ff4018..fe821b17e9 100644 --- a/clients/typescript/src/notifiers/mock.ts +++ b/clients/typescript/src/notifiers/mock.ts @@ -2,12 +2,13 @@ import { DbName } from '../util/types' import { Notification, Notifier } from './index' import { EventNotifier } from './event' +import EventEmitter from 'events' export class MockNotifier extends EventNotifier implements Notifier { notifications: Notification[] - constructor(dbName: DbName) { - super(dbName) + constructor(dbName: DbName, emitter?: EventEmitter) { + super(dbName, emitter) this.notifications = [] } diff --git a/clients/typescript/src/satellite/process.ts b/clients/typescript/src/satellite/process.ts index 41da30532a..fd831262ce 100644 --- a/clients/typescript/src/satellite/process.ts +++ b/clients/typescript/src/satellite/process.ts @@ -12,6 +12,7 @@ import { Change, ConnectivityStateChangeNotification, Notifier, + UnsubscribeFunction, } from '../notifiers/index' import { Waiter, @@ -119,13 +120,13 @@ export class SatelliteProcess implements Satellite { opts: SatelliteOpts _authState?: AuthState - _authStateSubscription?: string + _unsubscribeFromAuthState?: UnsubscribeFunction connectivityState?: ConnectivityState - _connectivityChangeSubscription?: string + _unsubscribeFromConnectivityChanges?: UnsubscribeFunction _pollingInterval?: any - _potentialDataChangeSubscription?: string + _unsubscribeFromPotentialDataChanges?: UnsubscribeFunction _throttledSnapshot: ThrottleFunction _lsn?: LSN @@ -221,9 +222,10 @@ export class SatelliteProcess implements Satellite { await this._setAuthState({ clientId: clientId, token: authConfig.token }) const notifierSubscriptions = Object.entries({ - _authStateSubscription: this._authStateSubscription, - _connectivityChangeSubscription: this._connectivityChangeSubscription, - _potentialDataChangeSubscription: this._potentialDataChangeSubscription, + _authStateSubscription: this._unsubscribeFromAuthState, + _connectivityChangeSubscription: this._unsubscribeFromConnectivityChanges, + _potentialDataChangeSubscription: + this._unsubscribeFromPotentialDataChanges, }) notifierSubscriptions.forEach(([name, value]) => { if (value !== undefined) { @@ -237,7 +239,7 @@ export class SatelliteProcess implements Satellite { // Monitor auth state changes. const authStateHandler = this._updateAuthState.bind(this) - this._authStateSubscription = + this._unsubscribeFromAuthState = this.notifier.subscribeToAuthStateChanges(authStateHandler) // Monitor connectivity state changes. @@ -246,13 +248,13 @@ export class SatelliteProcess implements Satellite { }: ConnectivityStateChangeNotification) => { this._handleConnectivityStateChange(connectivityState) } - this._connectivityChangeSubscription = + this._unsubscribeFromConnectivityChanges = this.notifier.subscribeToConnectivityStateChanges( connectivityStateHandler ) // Request a snapshot whenever the data in our database potentially changes. - this._potentialDataChangeSubscription = + this._unsubscribeFromPotentialDataChanges = this.notifier.subscribeToPotentialDataChanges(this._throttledSnapshot) // Start polling to request a snapshot every `pollingInterval` ms. @@ -350,27 +352,20 @@ export class SatelliteProcess implements Satellite { this._pollingInterval = undefined } - if (this._authStateSubscription !== undefined) { - this.notifier.unsubscribeFromAuthStateChanges(this._authStateSubscription) - - this._authStateSubscription = undefined - } - - if (this._connectivityChangeSubscription !== undefined) { - this.notifier.unsubscribeFromConnectivityStateChanges( - this._connectivityChangeSubscription - ) - - this._connectivityChangeSubscription = undefined - } - - if (this._potentialDataChangeSubscription !== undefined) { - this.notifier.unsubscribeFromPotentialDataChanges( - this._potentialDataChangeSubscription - ) - - this._potentialDataChangeSubscription = undefined - } + // Unsubscribe all listeners and remove them + const unsubscribers = [ + '_unsubscribeFromAuthState', + '_unsubscribeFromConnectivityChanges', + '_unsubscribeFromPotentialDataChanges', + ] as const + + unsubscribers.forEach((unsubscriber) => { + const unsub = this[unsubscriber] + if (unsub !== undefined) { + unsub!() + this[unsubscriber] = undefined + } + }) this._disconnect() diff --git a/clients/typescript/test/client/notifications.test.ts b/clients/typescript/test/client/notifications.test.ts index d91dcb7471..bb6e8d4e4b 100644 --- a/clients/typescript/test/client/notifications.test.ts +++ b/clients/typescript/test/client/notifications.test.ts @@ -5,6 +5,7 @@ import { schema } from './generated' import { MockRegistry } from '../../src/satellite/mock' import { EventNotifier } from '../../src/notifiers' import { mockElectricClient } from '../satellite/common' +import { EVENT_NAMES } from '../../src/notifiers/event' const conn = new Database(':memory:') const config = { @@ -20,13 +21,15 @@ await db.Items.sync() // sync the Items table async function runAndCheckNotifications(f: () => Promise) { let notifications = 0 - const sub = notifier.subscribeToPotentialDataChanges((_notification) => { - notifications = notifications + 1 - }) + const unsubscribe = notifier.subscribeToPotentialDataChanges( + (_notification) => { + notifications = notifications + 1 + } + ) await f() - notifier.unsubscribeFromPotentialDataChanges(sub) + unsubscribe() return notifications } @@ -227,15 +230,22 @@ test.serial( // Check that the listeners are registered const notifier = electric.notifier as EventNotifier - t.assert(Object.keys(notifier._changeCallbacks).length > 0) - t.assert(Object.keys(notifier._connectivityStatusCallbacks).length > 0) + const events = [ + EVENT_NAMES.authChange, + EVENT_NAMES.potentialDataChange, + EVENT_NAMES.connectivityStateChange, + ] + events.forEach((eventName) => { + t.assert(notifier.events.listenerCount(eventName) > 0) + }) // Close the Electric client await electric.close() // Check that the listeners are unregistered - t.deepEqual(notifier._changeCallbacks, {}) - t.deepEqual(notifier._connectivityStatusCallbacks, {}) + events.forEach((eventName) => { + t.is(notifier.events.listenerCount(eventName), 0) + }) // Check that the Satellite process is unregistered t.assert(!registry.satellites.hasOwnProperty(conn.name)) diff --git a/clients/typescript/test/notifiers/event.test.ts b/clients/typescript/test/notifiers/event.test.ts index e64172d989..8c940fc0fc 100644 --- a/clients/typescript/test/notifiers/event.test.ts +++ b/clients/typescript/test/notifiers/event.test.ts @@ -120,13 +120,13 @@ test('no more connectivity events after unsubscribe', async (t) => { const notifications: ConnectivityStateChangeNotification[] = [] - const key = target.subscribeToConnectivityStateChanges((x) => { + const unsubscribe = target.subscribeToConnectivityStateChanges((x) => { notifications.push(x) }) source.connectivityStateChanged('test.db', 'connected') - target.unsubscribeFromConnectivityStateChanges(key) + unsubscribe() source.connectivityStateChanged('test.db', 'connected') diff --git a/clients/typescript/test/satellite/common.ts b/clients/typescript/test/satellite/common.ts index be80116001..1dec7f8a7e 100644 --- a/clients/typescript/test/satellite/common.ts +++ b/clients/typescript/test/satellite/common.ts @@ -180,6 +180,7 @@ import { DbSchema, TableSchema } from '../../src/client/model/schema' import { PgBasicType } from '../../src/client/conversions/types' import { HKT } from '../../src/client/util/hkt' import { ElectricClient } from '../../src/client/model' +import EventEmitter from 'events' // Speed up the intervals for testing. export const opts = Object.assign({}, satelliteDefaults, { @@ -258,7 +259,7 @@ export const mockElectricClient = async ( const dbName = db.name const adapter = new DatabaseAdapter(db) const migrator = new BundleMigrator(adapter, migrations) - const notifier = new MockNotifier(dbName) + const notifier = new MockNotifier(dbName, new EventEmitter()) const client = new MockSatelliteClient() const satellite = new SatelliteProcess( dbName,