Skip to content

Commit

Permalink
chore (client): refactoring subscribe methods of the event notifier (#…
Browse files Browse the repository at this point in the history
…708)

This PR refactors the various subscribe methods provided by the event
notifier. For every listener that was being subscribed, the event
notifier would generate a random key, store the listener under that key,
and return that key. To unsubscribe, the user had to pass that key to
the corresponding unsubscribe method which would lookup the listener and
then unsubscribe it from the underlying event emitter. Storing the
listener under a key is not needed and is a bit redundant as the
underlying event emitter already stores them.

This PR modified the event notifier such that it does not keep track of
registered callbacks but instead returns an unsubscribe function from
the subscribe methods. The unsubscribe function closes over the callback
that needs to be unregistered from the underlying event emitter.
  • Loading branch information
kevin-dp authored Dec 4, 2023
1 parent e9c32c6 commit e091bbf
Show file tree
Hide file tree
Showing 11 changed files with 97 additions and 140 deletions.
5 changes: 5 additions & 0 deletions .changeset/twelve-pigs-give.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
---
"electric-sql": patch
---

Refactorings to the event notifier.
10 changes: 4 additions & 6 deletions clients/typescript/src/electric/namespace.ts
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
// This is the namespace that's patched onto the user's database client
// (technically via the proxy machinery) as the `.electric` property.
import { DatabaseAdapter } from './adapter'
import { Notifier } from '../notifiers'
import { Notifier, UnsubscribeFunction } from '../notifiers'
import { ConnectivityState } from '../util/types'
import { GlobalRegistry, Registry } from '../satellite'

Expand All @@ -15,7 +15,7 @@ export class ElectricNamespace {
return this._isConnected
}

private _stateChangeSubscription: string
private _unsubscribeStateChanges: UnsubscribeFunction

constructor(
dbName: string,
Expand All @@ -29,7 +29,7 @@ export class ElectricNamespace {
this.registry = registry
this._isConnected = false

this._stateChangeSubscription =
this._unsubscribeStateChanges =
this.notifier.subscribeToConnectivityStateChanges(
({ connectivityState }) => {
this.setIsConnected(connectivityState)
Expand All @@ -52,9 +52,7 @@ export class ElectricNamespace {
* Cleans up the resources used by the `ElectricNamespace`.
*/
async close(): Promise<void> {
this.notifier.unsubscribeFromConnectivityStateChanges(
this._stateChangeSubscription
)
this._unsubscribeStateChanges()
await this.registry.stop(this.dbName)
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -63,13 +63,11 @@ const useConnectivityState: HookFn = () => {
setState(getValidState(connectivityState))
}

const subscriptionKey =
notifier.subscribeToConnectivityStateChanges(handler)
const unsubscribe = notifier.subscribeToConnectivityStateChanges(handler)

return () => {
shouldStop = true

notifier.unsubscribeFromConnectivityStateChanges(subscriptionKey)
unsubscribe()
}
}, [electric])

Expand Down
17 changes: 10 additions & 7 deletions clients/typescript/src/frameworks/react/hooks/useLiveQuery.ts
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,10 @@ import {
} from 'react'
import { hash } from 'ohash'

import { ChangeNotification } from '../../../notifiers/index'
import {
ChangeNotification,
UnsubscribeFunction,
} from '../../../notifiers/index'
import { QualifiedTablename, hasIntersection } from '../../../util/tablename'

import { ElectricContext } from '../provider'
Expand Down Expand Up @@ -122,7 +125,7 @@ function useLiveQueryWithQueryUpdates<Res>(
): ResultData<Res> {
const electric = useContext(ElectricContext)

const changeSubscriptionKey = useRef<string>()
const unsubscribeDataChanges = useRef<UnsubscribeFunction>()
const tablenames = useRef<QualifiedTablename[]>()
const tablenamesKey = useRef<string>()
const [resultData, setResultData] = useState<ResultData<Res>>({})
Expand Down Expand Up @@ -194,16 +197,16 @@ function useLiveQueryWithQueryUpdates<Res>(
}
}

const key = notifier.subscribeToDataChanges(handleChange)
if (changeSubscriptionKey.current !== undefined) {
notifier.unsubscribeFromDataChanges(changeSubscriptionKey.current)
const unsubscribe = notifier.subscribeToDataChanges(handleChange)
if (unsubscribeDataChanges.current !== undefined) {
unsubscribeDataChanges.current()
}

changeSubscriptionKey.current = key
unsubscribeDataChanges.current = unsubscribe

return () => {
ignore = true
notifier.unsubscribeFromDataChanges(key)
unsubscribe()
}
}, [electric, tablenamesKey.current, tablenames.current, runLiveQuery])

Expand Down
90 changes: 18 additions & 72 deletions clients/typescript/src/notifiers/event.ts
Original file line number Diff line number Diff line change
@@ -1,7 +1,6 @@
import { EventEmitter } from 'events'

import { AuthState } from '../auth/index'
import { randomValue } from '../util/random'
import { QualifiedTablename } from '../util/tablename'
import { ConnectivityState, DbName } from '../util/types'
import Log from 'loglevel'
Expand All @@ -19,9 +18,10 @@ import {
Notifier,
PotentialChangeCallback,
PotentialChangeNotification,
UnsubscribeFunction,
} from './index'

const EVENT_NAMES = {
export const EVENT_NAMES = {
authChange: 'auth:changed',
actualDataChange: 'data:actually:changed',
potentialDataChange: 'data:potentially:changed',
Expand Down Expand Up @@ -51,14 +51,6 @@ export class EventNotifier implements Notifier {

events: EventEmitter

_changeCallbacks: {
[key: string]: NotificationCallback
}

_connectivityStatusCallbacks: {
[key: string]: NotificationCallback
}

constructor(dbName: DbName, eventEmitter?: EventEmitter) {
this.dbName = dbName
this.attachedDbIndex = {
Expand All @@ -67,9 +59,6 @@ export class EventNotifier implements Notifier {
}

this.events = eventEmitter !== undefined ? eventEmitter : globalEmitter

this._changeCallbacks = {}
this._connectivityStatusCallbacks = {}
}

attach(dbName: DbName, dbAlias: string): void {
Expand Down Expand Up @@ -113,24 +102,13 @@ export class EventNotifier implements Notifier {
authStateChanged(authState: AuthState): void {
this._emitAuthStateChange(authState)
}
subscribeToAuthStateChanges(callback: AuthStateCallback): string {
const key = randomValue()

this._changeCallbacks[key] = callback
subscribeToAuthStateChanges(
callback: AuthStateCallback
): UnsubscribeFunction {
this._subscribe(EVENT_NAMES.authChange, callback)

return key
}
unsubscribeFromAuthStateChanges(key: string): void {
const callback = this._changeCallbacks[key]

if (callback === undefined) {
return
return () => {
this._unsubscribe(EVENT_NAMES.authChange, callback)
}

this._unsubscribe(EVENT_NAMES.authChange, callback)

delete this._changeCallbacks[key]
}

potentiallyChanged(): void {
Expand All @@ -148,8 +126,9 @@ export class EventNotifier implements Notifier {
this._emitActualChange(dbName, changes)
}

subscribeToPotentialDataChanges(callback: PotentialChangeCallback): string {
const key = randomValue()
subscribeToPotentialDataChanges(
callback: PotentialChangeCallback
): UnsubscribeFunction {
const thisHasDbName = this._hasDbName.bind(this)

const wrappedCallback = (notification: PotentialChangeNotification) => {
Expand All @@ -158,25 +137,14 @@ export class EventNotifier implements Notifier {
}
}

this._changeCallbacks[key] = wrappedCallback
this._subscribe(EVENT_NAMES.potentialDataChange, wrappedCallback)

return key
}
unsubscribeFromPotentialDataChanges(key: string): void {
const callback = this._changeCallbacks[key]

if (callback === undefined) {
return
return () => {
this._unsubscribe(EVENT_NAMES.potentialDataChange, wrappedCallback)
}

this._unsubscribe(EVENT_NAMES.potentialDataChange, callback)

delete this._changeCallbacks[key]
}

subscribeToDataChanges(callback: ChangeCallback): string {
const key = randomValue()
subscribeToDataChanges(callback: ChangeCallback): UnsubscribeFunction {
const thisHasDbName = this._hasDbName.bind(this)

const wrappedCallback = (notification: ChangeNotification) => {
Expand All @@ -185,21 +153,11 @@ export class EventNotifier implements Notifier {
}
}

this._changeCallbacks[key] = wrappedCallback
this._subscribe(EVENT_NAMES.actualDataChange, wrappedCallback)

return key
}
unsubscribeFromDataChanges(key: string): void {
const callback = this._changeCallbacks[key]

if (callback === undefined) {
return
return () => {
this._unsubscribe(EVENT_NAMES.actualDataChange, wrappedCallback)
}

this._unsubscribe(EVENT_NAMES.actualDataChange, callback)

delete this._changeCallbacks[key]
}

connectivityStateChanged(dbName: string, status: ConnectivityState) {
Expand All @@ -212,8 +170,7 @@ export class EventNotifier implements Notifier {

subscribeToConnectivityStateChanges(
callback: ConnectivityStateChangeCallback
) {
const key = randomValue()
): UnsubscribeFunction {
const thisHasDbName = this._hasDbName.bind(this)

const wrappedCallback = (
Expand All @@ -224,22 +181,11 @@ export class EventNotifier implements Notifier {
}
}

this._connectivityStatusCallbacks[key] = wrappedCallback
this._subscribe(EVENT_NAMES.connectivityStateChange, wrappedCallback)

return key
}

unsubscribeFromConnectivityStateChanges(key: string): void {
const callback = this._connectivityStatusCallbacks[key]

if (callback === undefined) {
return
return () => {
this._unsubscribe(EVENT_NAMES.connectivityStateChange, wrappedCallback)
}

this._unsubscribe(EVENT_NAMES.connectivityStateChange, callback)

delete this._connectivityStatusCallbacks[key]
}

_getDbNames(): DbName[] {
Expand Down
16 changes: 8 additions & 8 deletions clients/typescript/src/notifiers/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,8 @@ export type NotificationCallback =
| PotentialChangeCallback
| ConnectivityStateChangeCallback

export type UnsubscribeFunction = () => void

export interface Notifier {
// The name of the primary database that components communicating via this
// notifier have open and are using.
Expand Down Expand Up @@ -78,8 +80,7 @@ export interface Notifier {
// Calling `authStateChanged` notifies the Satellite process that the
// user's authentication credentials have changed.
authStateChanged(authState: AuthState): void
subscribeToAuthStateChanges(callback: AuthStateCallback): string
unsubscribeFromAuthStateChanges(key: string): void
subscribeToAuthStateChanges(callback: AuthStateCallback): UnsubscribeFunction

// The data change notification workflow starts by the electric database
// clients (or the user manually) calling `potentiallyChanged` whenever
Expand All @@ -90,8 +91,9 @@ export interface Notifier {
// Satellite processes subscribe to these "data has potentially changed"
// notifications. When they get one, they check the `_oplog` table in the
// database for *actual* changes persisted by the triggers.
subscribeToPotentialDataChanges(callback: PotentialChangeCallback): string
unsubscribeFromPotentialDataChanges(key: string): void
subscribeToPotentialDataChanges(
callback: PotentialChangeCallback
): UnsubscribeFunction

// When Satellite detects actual data changes in the oplog for a given
// database, it replicates it and calls `actuallyChanged` with the list
Expand All @@ -102,8 +104,7 @@ export interface Notifier {
// using the info to trigger re-queries, if the changes affect databases and
// tables that their queries depend on. This then trigger re-rendering if
// the query results are actually affected by the data changes.
subscribeToDataChanges(callback: ChangeCallback): string
unsubscribeFromDataChanges(key: string): void
subscribeToDataChanges(callback: ChangeCallback): UnsubscribeFunction

// Notification for network connectivity state changes.
// A connectivity change s can be triggered manually,
Expand All @@ -116,6 +117,5 @@ export interface Notifier {

subscribeToConnectivityStateChanges(
callback: ConnectivityStateChangeCallback
): string
unsubscribeFromConnectivityStateChanges(key: string): void
): UnsubscribeFunction
}
5 changes: 3 additions & 2 deletions clients/typescript/src/notifiers/mock.ts
Original file line number Diff line number Diff line change
Expand Up @@ -2,12 +2,13 @@ import { DbName } from '../util/types'

import { Notification, Notifier } from './index'
import { EventNotifier } from './event'
import EventEmitter from 'events'

export class MockNotifier extends EventNotifier implements Notifier {
notifications: Notification[]

constructor(dbName: DbName) {
super(dbName)
constructor(dbName: DbName, emitter?: EventEmitter) {
super(dbName, emitter)

this.notifications = []
}
Expand Down
Loading

0 comments on commit e091bbf

Please sign in to comment.