Skip to content

Commit 16bee87

Browse files
committed
feat: fixed invalid pending state after mutation cancel
1 parent c38a0cc commit 16bee87

39 files changed

+363
-364
lines changed

.eslintrc.cjs

+3-3
Original file line numberDiff line numberDiff line change
@@ -21,8 +21,8 @@ const config = {
2121
plugins: ["react"],
2222
ignorePatterns: ["vitest.config.ts", "vite.config.ts", "test-setup.ts"],
2323
rules: {
24-
'react/jsx-key': ['error', { checkFragmentShorthand: true }],
25-
'react-hooks/exhaustive-deps': 'error',
24+
"react/jsx-key": ["error", { checkFragmentShorthand: true }],
25+
"react-hooks/exhaustive-deps": "error",
2626
"@typescript-eslint/explicit-function-return-type": 0,
2727
"react/display-name": 0,
2828
"@typescript-eslint/no-invalid-void-type": 0,
@@ -33,4 +33,4 @@ const config = {
3333
}
3434
}
3535

36-
module.exports = config
36+
module.exports = config

README.md

+2-2
Original file line numberDiff line numberDiff line change
@@ -1,8 +1,8 @@
11
# reactjrx
22

3-
`reactjrx` is an innovative library designed to bridge RxJS and React, offering developers a powerful set of tools to integrate observables into their React applications seamlessly.
3+
`reactjrx` is an innovative library designed to bridge RxJS and React, offering developers a powerful set of tools to integrate observables into their React applications seamlessly.
44

5-
It simplifies the reactive programming approach by providing easy-to-use bindings for observing and reacting to data changes, coupled with advanced state management and queries helpers akin to `react-query` for handling effects, mutations, and asynchronous data fetching.
5+
It simplifies the reactive programming approach by providing easy-to-use bindings for observing and reacting to data changes, coupled with advanced state management and queries helpers akin to `react-query` for handling effects, mutations, and asynchronous data fetching.
66

77
Tailored for projects where RxJS is a core dependency, `reactjrx` enhances the development experience by enabling more dynamic, efficient, and responsive UIs, making it an essential addition for developers looking to leverage the full potential of RxJS within their React applications.
88

src/lib/binding/types.ts

+1-1
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,4 @@
1-
import { type Observable } from "rxjs";
1+
import { type Observable } from "rxjs"
22

33
export type SubscribeSource<Data> =
44
| (() => Observable<Data>)

src/lib/queries/client/focusManager.ts

+1-9
Original file line numberDiff line numberDiff line change
@@ -1,12 +1,4 @@
1-
import {
2-
filter,
3-
fromEvent,
4-
map,
5-
merge,
6-
skip,
7-
tap,
8-
BehaviorSubject,
9-
} from "rxjs"
1+
import { filter, fromEvent, map, merge, skip, tap, BehaviorSubject } from "rxjs"
102

113
export class FocusManager {
124
readonly #visibility$ = merge(

src/lib/queries/client/keys/hashKey.test.ts

+3-3
Original file line numberDiff line numberDiff line change
@@ -10,7 +10,7 @@ it("should return correct serialized key", () => {
1010
expect(hashKey(["todos", { page: 2, status: "foo" }])).toBe(
1111
`["todos",{"page":2,"status":"foo"}]`
1212
)
13-
expect(
14-
hashKey(["todos", { page: 2, status: "foo", bar: undefined }])
15-
).toBe(`["todos",{"page":2,"status":"foo"}]`)
13+
expect(hashKey(["todos", { page: 2, status: "foo", bar: undefined }])).toBe(
14+
`["todos",{"page":2,"status":"foo"}]`
15+
)
1616
})

src/lib/queries/client/keys/matchKey.test.ts

+1-3
Original file line numberDiff line numberDiff line change
@@ -6,9 +6,7 @@ it("should compare keys", () => {
66
expect(matchKey([], [], { exact: true })).toBe(true)
77
expect(matchKey([], ["foo"], { exact: false })).toBe(false)
88
expect(matchKey(["foo"], [], { exact: false })).toBe(true)
9-
expect(
10-
matchKey(["foo", "bar"], ["foo"], { exact: false })
11-
).toBe(true)
9+
expect(matchKey(["foo", "bar"], ["foo"], { exact: false })).toBe(true)
1210
expect(matchKey(["foo"], ["foo", "bar"], { exact: true })).toBe(false)
1311
expect(matchKey(["foo", "bar"], ["foo"], { exact: false })).toBe(true)
1412
expect(matchKey(["foo"], ["foo", "bar"], { exact: false })).toBe(false)

src/lib/queries/client/mutations/cache/MutationCache.ts

+1-1
Original file line numberDiff line numberDiff line change
@@ -12,7 +12,7 @@ import {
1212
combineLatest,
1313
EMPTY
1414
} from "rxjs"
15-
import { createPredicateForFilters } from "../filters"
15+
import { createPredicateForFilters } from "../utils/filters"
1616
import {
1717
type MutationCacheConfig,
1818
type MutationCacheNotifyEvent

src/lib/queries/client/mutations/mutation/Mutation.ts

+70-58
Original file line numberDiff line numberDiff line change
@@ -10,10 +10,12 @@ import {
1010
concat,
1111
toArray,
1212
mergeMap,
13-
NEVER,
14-
startWith
13+
startWith,
14+
map,
15+
filter,
16+
scan
1517
} from "rxjs"
16-
import { getDefaultMutationState } from "../defaultMutationState"
18+
import { getDefaultMutationState } from "../utils/defaultMutationState"
1719
import { type DefaultError } from "../../types"
1820
import { type MutationCache } from "../cache/MutationCache"
1921
import { makeObservable } from "../../utils/makeObservable"
@@ -25,6 +27,7 @@ import {
2527
import { executeMutation } from "./executeMutation"
2628
import { trackSubscriptions } from "../../../../utils/operators/trackSubscriptions"
2729
import { observeUntilFinished } from "./observeUntilFinished"
30+
import { distinctUntilStateChanged } from "../utils/distinctUntilStateChanged"
2831

2932
interface MutationConfig<TData, TError, TVariables, TContext> {
3033
mutationCache: MutationCache
@@ -59,18 +62,24 @@ export class Mutation<
5962
this.options = options
6063
this.state = state ?? this.state
6164

62-
const execution$ = this.#executeSubject.pipe(
65+
const resetPendingState$ = this.#cancelSubject.pipe(
66+
filter(() => this.state.status === "pending"),
67+
map(() => ({
68+
status: "idle" as const
69+
}))
70+
)
71+
72+
const executionState$ = this.#executeSubject.pipe(
6373
switchMap((variables) =>
6474
executeMutation<TData, TError, TVariables, TContext>({
6575
options: {
6676
...this.options,
6777
onMutate: (variables) => {
68-
const onCacheMutate$ = makeObservable(
69-
() =>
70-
mutationCache.config.onMutate?.(
71-
variables,
72-
this as Mutation<any, any, any, any>
73-
)
78+
const onCacheMutate$ = makeObservable(() =>
79+
mutationCache.config.onMutate?.(
80+
variables,
81+
this as Mutation<any, any, any, any>
82+
)
7483
) as Observable<TContext>
7584

7685
// eslint-disable-next-line @typescript-eslint/promise-function-async
@@ -87,53 +96,50 @@ export class Mutation<
8796
return context$
8897
},
8998
onError: (error, variables, context) => {
90-
const onCacheError$ = makeObservable(
91-
() =>
92-
mutationCache.config.onError?.(
93-
error as any,
94-
variables,
95-
context,
96-
this as Mutation<any, any, any, any>
97-
)
99+
const onCacheError$ = makeObservable(() =>
100+
mutationCache.config.onError?.(
101+
error as any,
102+
variables,
103+
context,
104+
this as Mutation<any, any, any, any>
105+
)
98106
)
99107

100-
const onOptionError$ = makeObservable(
101-
() => this.options.onError?.(error, variables, context)
108+
const onOptionError$ = makeObservable(() =>
109+
this.options.onError?.(error, variables, context)
102110
)
103111

104112
return concat(onCacheError$, onOptionError$).pipe(toArray())
105113
},
106114
onSettled: (data, error, variables, context) => {
107-
const onCacheSuccess$ = makeObservable(
108-
() =>
109-
mutationCache.config.onSettled?.(
110-
data,
111-
error as Error,
112-
variables,
113-
context,
114-
this as Mutation<any, any, any, any>
115-
)
115+
const onCacheSuccess$ = makeObservable(() =>
116+
mutationCache.config.onSettled?.(
117+
data,
118+
error as Error,
119+
variables,
120+
context,
121+
this as Mutation<any, any, any, any>
122+
)
116123
)
117124

118-
const onOptionSettled$ = makeObservable(
119-
() => this.options.onSettled?.(data, error, variables, context)
125+
const onOptionSettled$ = makeObservable(() =>
126+
this.options.onSettled?.(data, error, variables, context)
120127
)
121128

122129
return concat(onCacheSuccess$, onOptionSettled$).pipe(toArray())
123130
},
124131
onSuccess: (data, variables, context) => {
125-
const onCacheSuccess$ = makeObservable(
126-
() =>
127-
mutationCache.config.onSuccess?.(
128-
data,
129-
variables,
130-
context,
131-
this as Mutation<any, any, any, any>
132-
)
132+
const onCacheSuccess$ = makeObservable(() =>
133+
mutationCache.config.onSuccess?.(
134+
data,
135+
variables,
136+
context,
137+
this as Mutation<any, any, any, any>
138+
)
133139
)
134140

135-
const onOptionSuccess$ = makeObservable(
136-
() => this.options.onSuccess?.(data, variables, context)
141+
const onOptionSuccess$ = makeObservable(() =>
142+
this.options.onSuccess?.(data, variables, context)
137143
)
138144

139145
return concat(onCacheSuccess$, onOptionSuccess$).pipe(toArray())
@@ -145,25 +151,27 @@ export class Mutation<
145151
)
146152
)
147153

148-
this.state$ = merge(
149-
execution$,
150-
/**
151-
* We keep state forever since only a explicit destroy
152-
* may terminate the mutation
153-
*/
154-
NEVER
155-
).pipe(
154+
/**
155+
* @important
156+
* This observable needs to complete on cancelSubject
157+
* otherwise the state will never be freed
158+
*/
159+
const stateChange$ = merge(resetPendingState$, executionState$)
160+
161+
this.state$ = stateChange$.pipe(
156162
startWith(this.state),
163+
scan(
164+
(acc, partialState) => ({
165+
...acc,
166+
...partialState
167+
}),
168+
this.state
169+
),
170+
distinctUntilStateChanged,
157171
tap((value) => {
158-
this.state = { ...this.state, ...value }
172+
this.state = value
159173
}),
160-
takeUntil(this.#cancelSubject),
161-
/**
162-
* refCount as true somewhat make NEVER complete when there are
163-
* no more observers. I thought I should have to complete manually (which is
164-
* why we still cancel the observable when we remove it from cache)
165-
*/
166-
shareReplay({ bufferSize: 1, refCount: false }),
174+
shareReplay(1),
167175
trackSubscriptions((count) => {
168176
this.#observerCount.next(count)
169177
})
@@ -196,7 +204,11 @@ export class Mutation<
196204
return this.execute(this.state.variables as TVariables)
197205
}
198206

199-
// @todo merge with query
207+
/**
208+
* Cancel if needed and finalize the mutation.
209+
* The mutation will be garbage collected automatically
210+
* when no more observers are listening
211+
*/
200212
cancel() {
201213
this.#cancelSubject.next()
202214
this.#cancelSubject.complete()

src/lib/queries/client/mutations/mutation/executeMutation.ts

+2-8
Original file line numberDiff line numberDiff line change
@@ -9,14 +9,12 @@ import {
99
iif,
1010
catchError,
1111
scan,
12-
distinctUntilChanged,
1312
shareReplay
1413
} from "rxjs"
1514
import { type MutationOptions, type MutationState } from "./types"
1615
import { makeObservable } from "../../utils/makeObservable"
1716
import { type DefaultError } from "../../types"
18-
import { getDefaultMutationState } from "../defaultMutationState"
19-
import { shallowEqual } from "../../../../utils/shallowEqual"
17+
import { getDefaultMutationState } from "../utils/defaultMutationState"
2018
import { onlineManager } from "../../onlineManager"
2119
import { waitForNetworkOnError } from "./waitForNetworkOnError"
2220
import { delayWhenNetworkOnline } from "./delayWhenNetworkOnline"
@@ -227,11 +225,7 @@ export const executeMutation = <
227225
data: current.data ?? acc.data,
228226
error: current.error ?? acc.error
229227
}
230-
}, getDefaultMutationState<TData, TError, TVariables, TContext>()),
231-
distinctUntilChanged(
232-
({ data: prevData, ...prev }, { data: currData, ...curr }) =>
233-
shallowEqual(prev, curr) && shallowEqual(prevData, currData)
234-
)
228+
}, getDefaultMutationState<TData, TError, TVariables, TContext>())
235229
)
236230

237231
return mutation$

src/lib/queries/client/mutations/mutation/observeUntilFinished.ts

+3-1
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,9 @@
11
import { type Observable, takeWhile } from "rxjs"
22
import { type MutationState } from "./types"
33

4-
export const observeUntilFinished = <T extends MutationState<any, any, any, any>>(
4+
export const observeUntilFinished = <
5+
T extends MutationState<any, any, any, any>
6+
>(
57
source: Observable<T>
68
) =>
79
source.pipe(

src/lib/queries/client/mutations/mutation/waitForNetworkOnError.ts

+3-1
Original file line numberDiff line numberDiff line change
@@ -37,7 +37,9 @@ export const waitForNetworkOnError = <
3737
* timer needed to be iso RQ, so the state returned by mutation include both previous and next one
3838
*/
3939
timer(1).pipe(
40-
mergeMap(() => throwError(() => error).pipe(delayWhenNetworkOnline<T>()))
40+
mergeMap(() =>
41+
throwError(() => error).pipe(delayWhenNetworkOnline<T>())
42+
)
4143
)
4244
)
4345
} else {

src/lib/queries/client/mutations/observers/MutationObserver.ts

+34-5
Original file line numberDiff line numberDiff line change
@@ -10,10 +10,12 @@ import {
1010
last,
1111
filter,
1212
tap,
13-
ignoreElements
13+
ignoreElements,
14+
distinctUntilChanged,
15+
scan
1416
} from "rxjs"
1517
import { type MutateOptions } from "../types"
16-
import { getDefaultMutationState } from "../defaultMutationState"
18+
import { getDefaultMutationState } from "../utils/defaultMutationState"
1719
import { type QueryClient } from "../../QueryClient"
1820
import { type DefaultError } from "../../types"
1921
import { type Mutation } from "../mutation/Mutation"
@@ -27,6 +29,8 @@ import { type MutationState } from "../mutation/types"
2729
import { isDefined } from "../../../../utils/isDefined"
2830
import { matchKey } from "../../keys/matchKey"
2931
import { observeUntilFinished } from "../mutation/observeUntilFinished"
32+
import { shallowEqual } from "../../../../utils/shallowEqual"
33+
import { distinctUntilStateChanged } from "../utils/distinctUntilStateChanged"
3034

3135
/**
3236
* Provide API to observe mutations results globally.
@@ -193,10 +197,35 @@ export class MutationObserver<
193197
)
194198

195199
const mutationResult$ = this.#mutationRunner.state$.pipe(
200+
distinctUntilChanged(
201+
(
202+
{ mutation: prevMutation, state: { data: prevData, ...prev } },
203+
{ mutation: currentMutation, state: { data: currData, ...curr } }
204+
) =>
205+
prevMutation === currentMutation &&
206+
shallowEqual(prev, curr) &&
207+
shallowEqual(prevData, currData)
208+
),
209+
map(({ state }) => state),
210+
scan<
211+
MutationState<TData, TError, TVariables, TContext>,
212+
MutationState<TData, TError, TVariables, TContext>
213+
>((acc, current) => {
214+
return {
215+
...acc,
216+
...current,
217+
...(current.status === "pending" && {
218+
data: current.data ?? acc.data
219+
}),
220+
...(current.status === "pending" && {
221+
error: current.error ?? acc.error
222+
})
223+
}
224+
}),
196225
map((state) => this.getObserverResultFromState(state))
197226
)
198227

199-
const currentMutationCancelled$ = this.#currentMutationSubject.pipe(
228+
const defaultStateOnMutationCancel$ = this.#currentMutationSubject.pipe(
200229
filter(isDefined),
201230
switchMap((mutation) => mutation.mutation.cancelled$),
202231
map(() => this.getObserverResultFromState(getDefaultMutationState()))
@@ -205,8 +234,8 @@ export class MutationObserver<
205234
const result$ = merge(
206235
this.observed$,
207236
mutationResult$,
208-
currentMutationCancelled$
209-
)
237+
defaultStateOnMutationCancel$
238+
).pipe(distinctUntilStateChanged)
210239

211240
return { result$, lastValue }
212241
}

0 commit comments

Comments
 (0)