Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

chore (client): refactoring subscribe methods of the event notifier #708

Merged
merged 3 commits into from
Dec 4, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 5 additions & 0 deletions .changeset/twelve-pigs-give.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
---
"electric-sql": patch
---

Refactorings to the event notifier.
10 changes: 4 additions & 6 deletions clients/typescript/src/electric/namespace.ts
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
// This is the namespace that's patched onto the user's database client
// (technically via the proxy machinery) as the `.electric` property.
import { DatabaseAdapter } from './adapter'
import { Notifier } from '../notifiers'
import { Notifier, UnsubscribeFunction } from '../notifiers'
import { ConnectivityState } from '../util/types'
import { GlobalRegistry, Registry } from '../satellite'

Expand All @@ -15,7 +15,7 @@ export class ElectricNamespace {
return this._isConnected
}

private _stateChangeSubscription: string
private _unsubscribeStateChanges: UnsubscribeFunction

constructor(
dbName: string,
Expand All @@ -29,7 +29,7 @@ export class ElectricNamespace {
this.registry = registry
this._isConnected = false

this._stateChangeSubscription =
this._unsubscribeStateChanges =
this.notifier.subscribeToConnectivityStateChanges(
({ connectivityState }) => {
this.setIsConnected(connectivityState)
Expand All @@ -52,9 +52,7 @@ export class ElectricNamespace {
* Cleans up the resources used by the `ElectricNamespace`.
*/
async close(): Promise<void> {
this.notifier.unsubscribeFromConnectivityStateChanges(
this._stateChangeSubscription
)
this._unsubscribeStateChanges()
await this.registry.stop(this.dbName)
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -63,13 +63,11 @@ const useConnectivityState: HookFn = () => {
setState(getValidState(connectivityState))
}

const subscriptionKey =
notifier.subscribeToConnectivityStateChanges(handler)
const unsubscribe = notifier.subscribeToConnectivityStateChanges(handler)

return () => {
shouldStop = true

notifier.unsubscribeFromConnectivityStateChanges(subscriptionKey)
unsubscribe()
}
}, [electric])

Expand Down
17 changes: 10 additions & 7 deletions clients/typescript/src/frameworks/react/hooks/useLiveQuery.ts
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,10 @@ import {
} from 'react'
import { hash } from 'ohash'

import { ChangeNotification } from '../../../notifiers/index'
import {
ChangeNotification,
UnsubscribeFunction,
} from '../../../notifiers/index'
import { QualifiedTablename, hasIntersection } from '../../../util/tablename'

import { ElectricContext } from '../provider'
Expand Down Expand Up @@ -122,7 +125,7 @@ function useLiveQueryWithQueryUpdates<Res>(
): ResultData<Res> {
const electric = useContext(ElectricContext)

const changeSubscriptionKey = useRef<string>()
const unsubscribeDataChanges = useRef<UnsubscribeFunction>()
const tablenames = useRef<QualifiedTablename[]>()
const tablenamesKey = useRef<string>()
const [resultData, setResultData] = useState<ResultData<Res>>({})
Expand Down Expand Up @@ -194,16 +197,16 @@ function useLiveQueryWithQueryUpdates<Res>(
}
}

const key = notifier.subscribeToDataChanges(handleChange)
if (changeSubscriptionKey.current !== undefined) {
notifier.unsubscribeFromDataChanges(changeSubscriptionKey.current)
const unsubscribe = notifier.subscribeToDataChanges(handleChange)
if (unsubscribeDataChanges.current !== undefined) {
unsubscribeDataChanges.current()
}

changeSubscriptionKey.current = key
unsubscribeDataChanges.current = unsubscribe

return () => {
ignore = true
notifier.unsubscribeFromDataChanges(key)
unsubscribe()
}
}, [electric, tablenamesKey.current, tablenames.current, runLiveQuery])

Expand Down
90 changes: 18 additions & 72 deletions clients/typescript/src/notifiers/event.ts
Original file line number Diff line number Diff line change
@@ -1,7 +1,6 @@
import { EventEmitter } from 'events'

import { AuthState } from '../auth/index'
import { randomValue } from '../util/random'
import { QualifiedTablename } from '../util/tablename'
import { ConnectivityState, DbName } from '../util/types'
import Log from 'loglevel'
Expand All @@ -19,9 +18,10 @@ import {
Notifier,
PotentialChangeCallback,
PotentialChangeNotification,
UnsubscribeFunction,
} from './index'

const EVENT_NAMES = {
export const EVENT_NAMES = {
authChange: 'auth:changed',
actualDataChange: 'data:actually:changed',
potentialDataChange: 'data:potentially:changed',
Expand Down Expand Up @@ -51,14 +51,6 @@ export class EventNotifier implements Notifier {

events: EventEmitter

_changeCallbacks: {
[key: string]: NotificationCallback
}

_connectivityStatusCallbacks: {
[key: string]: NotificationCallback
}

constructor(dbName: DbName, eventEmitter?: EventEmitter) {
this.dbName = dbName
this.attachedDbIndex = {
Expand All @@ -67,9 +59,6 @@ export class EventNotifier implements Notifier {
}

this.events = eventEmitter !== undefined ? eventEmitter : globalEmitter

this._changeCallbacks = {}
this._connectivityStatusCallbacks = {}
}

attach(dbName: DbName, dbAlias: string): void {
Expand Down Expand Up @@ -113,24 +102,13 @@ export class EventNotifier implements Notifier {
authStateChanged(authState: AuthState): void {
this._emitAuthStateChange(authState)
}
subscribeToAuthStateChanges(callback: AuthStateCallback): string {
const key = randomValue()

this._changeCallbacks[key] = callback
subscribeToAuthStateChanges(
callback: AuthStateCallback
): UnsubscribeFunction {
this._subscribe(EVENT_NAMES.authChange, callback)

return key
}
unsubscribeFromAuthStateChanges(key: string): void {
const callback = this._changeCallbacks[key]

if (callback === undefined) {
return
return () => {
this._unsubscribe(EVENT_NAMES.authChange, callback)
}

this._unsubscribe(EVENT_NAMES.authChange, callback)

delete this._changeCallbacks[key]
}

potentiallyChanged(): void {
Expand All @@ -148,8 +126,9 @@ export class EventNotifier implements Notifier {
this._emitActualChange(dbName, changes)
}

subscribeToPotentialDataChanges(callback: PotentialChangeCallback): string {
const key = randomValue()
subscribeToPotentialDataChanges(
callback: PotentialChangeCallback
): UnsubscribeFunction {
const thisHasDbName = this._hasDbName.bind(this)

const wrappedCallback = (notification: PotentialChangeNotification) => {
Expand All @@ -158,25 +137,14 @@ export class EventNotifier implements Notifier {
}
}

this._changeCallbacks[key] = wrappedCallback
this._subscribe(EVENT_NAMES.potentialDataChange, wrappedCallback)

return key
}
unsubscribeFromPotentialDataChanges(key: string): void {
const callback = this._changeCallbacks[key]

if (callback === undefined) {
return
return () => {
this._unsubscribe(EVENT_NAMES.potentialDataChange, wrappedCallback)
}

this._unsubscribe(EVENT_NAMES.potentialDataChange, callback)

delete this._changeCallbacks[key]
}

subscribeToDataChanges(callback: ChangeCallback): string {
const key = randomValue()
subscribeToDataChanges(callback: ChangeCallback): UnsubscribeFunction {
const thisHasDbName = this._hasDbName.bind(this)

const wrappedCallback = (notification: ChangeNotification) => {
Expand All @@ -185,21 +153,11 @@ export class EventNotifier implements Notifier {
}
}

this._changeCallbacks[key] = wrappedCallback
this._subscribe(EVENT_NAMES.actualDataChange, wrappedCallback)

return key
}
unsubscribeFromDataChanges(key: string): void {
const callback = this._changeCallbacks[key]

if (callback === undefined) {
return
return () => {
this._unsubscribe(EVENT_NAMES.actualDataChange, wrappedCallback)
}

this._unsubscribe(EVENT_NAMES.actualDataChange, callback)

delete this._changeCallbacks[key]
}

connectivityStateChanged(dbName: string, status: ConnectivityState) {
Expand All @@ -212,8 +170,7 @@ export class EventNotifier implements Notifier {

subscribeToConnectivityStateChanges(
callback: ConnectivityStateChangeCallback
) {
const key = randomValue()
): UnsubscribeFunction {
const thisHasDbName = this._hasDbName.bind(this)

const wrappedCallback = (
Expand All @@ -224,22 +181,11 @@ export class EventNotifier implements Notifier {
}
}

this._connectivityStatusCallbacks[key] = wrappedCallback
this._subscribe(EVENT_NAMES.connectivityStateChange, wrappedCallback)

return key
}

unsubscribeFromConnectivityStateChanges(key: string): void {
const callback = this._connectivityStatusCallbacks[key]

if (callback === undefined) {
return
return () => {
this._unsubscribe(EVENT_NAMES.connectivityStateChange, wrappedCallback)
}

this._unsubscribe(EVENT_NAMES.connectivityStateChange, callback)

delete this._connectivityStatusCallbacks[key]
}

_getDbNames(): DbName[] {
Expand Down
16 changes: 8 additions & 8 deletions clients/typescript/src/notifiers/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,8 @@ export type NotificationCallback =
| PotentialChangeCallback
| ConnectivityStateChangeCallback

export type UnsubscribeFunction = () => void

export interface Notifier {
// The name of the primary database that components communicating via this
// notifier have open and are using.
Expand Down Expand Up @@ -78,8 +80,7 @@ export interface Notifier {
// Calling `authStateChanged` notifies the Satellite process that the
// user's authentication credentials have changed.
authStateChanged(authState: AuthState): void
subscribeToAuthStateChanges(callback: AuthStateCallback): string
unsubscribeFromAuthStateChanges(key: string): void
subscribeToAuthStateChanges(callback: AuthStateCallback): UnsubscribeFunction

// The data change notification workflow starts by the electric database
// clients (or the user manually) calling `potentiallyChanged` whenever
Expand All @@ -90,8 +91,9 @@ export interface Notifier {
// Satellite processes subscribe to these "data has potentially changed"
// notifications. When they get one, they check the `_oplog` table in the
// database for *actual* changes persisted by the triggers.
subscribeToPotentialDataChanges(callback: PotentialChangeCallback): string
unsubscribeFromPotentialDataChanges(key: string): void
subscribeToPotentialDataChanges(
callback: PotentialChangeCallback
): UnsubscribeFunction

// When Satellite detects actual data changes in the oplog for a given
// database, it replicates it and calls `actuallyChanged` with the list
Expand All @@ -102,8 +104,7 @@ export interface Notifier {
// using the info to trigger re-queries, if the changes affect databases and
// tables that their queries depend on. This then trigger re-rendering if
// the query results are actually affected by the data changes.
subscribeToDataChanges(callback: ChangeCallback): string
unsubscribeFromDataChanges(key: string): void
subscribeToDataChanges(callback: ChangeCallback): UnsubscribeFunction

// Notification for network connectivity state changes.
// A connectivity change s can be triggered manually,
Expand All @@ -116,6 +117,5 @@ export interface Notifier {

subscribeToConnectivityStateChanges(
callback: ConnectivityStateChangeCallback
): string
unsubscribeFromConnectivityStateChanges(key: string): void
): UnsubscribeFunction
}
5 changes: 3 additions & 2 deletions clients/typescript/src/notifiers/mock.ts
Original file line number Diff line number Diff line change
Expand Up @@ -2,12 +2,13 @@ import { DbName } from '../util/types'

import { Notification, Notifier } from './index'
import { EventNotifier } from './event'
import EventEmitter from 'events'

export class MockNotifier extends EventNotifier implements Notifier {
notifications: Notification[]

constructor(dbName: DbName) {
super(dbName)
constructor(dbName: DbName, emitter?: EventEmitter) {
super(dbName, emitter)

this.notifications = []
}
Expand Down
Loading
Loading