@@ -9,7 +9,11 @@ import {
9
9
iif ,
10
10
catchError ,
11
11
scan ,
12
- shareReplay
12
+ shareReplay ,
13
+ isEmpty ,
14
+ tap ,
15
+ share ,
16
+ ignoreElements
13
17
} from "rxjs"
14
18
import { type MutationOptions , type MutationState } from "./types"
15
19
import { makeObservable } from "../../utils/makeObservable"
@@ -43,16 +47,16 @@ export const executeMutation = <
43
47
44
48
const mutationFn = options . mutationFn ?? defaultFn
45
49
46
- const onOptionMutate$ = iif (
47
- ( ) => isPaused ,
48
- of ( state . context ) ,
49
- makeObservable (
50
- // eslint-disable-next-line @typescript-eslint/promise-function-async
51
- ( ) => options . onMutate ?.( variables ) ?? undefined
52
- )
50
+ const contextFromOnMutate$ = makeObservable (
51
+ // eslint-disable-next-line @typescript-eslint/promise-function-async
52
+ ( ) => options . onMutate ?.( variables ) ?? undefined
53
53
)
54
54
55
- const onMutate$ = onOptionMutate$ . pipe ( shareReplay ( 1 ) )
55
+ const rawContext$ = of ( state . context )
56
+
57
+ const context$ = iif ( ( ) => isPaused , rawContext$ , contextFromOnMutate$ ) . pipe (
58
+ shareReplay ( 1 )
59
+ )
56
60
57
61
type QueryState = Omit < Partial < LocalState > , "data" > & {
58
62
// add layer to allow undefined as mutation result
@@ -78,15 +82,27 @@ export const executeMutation = <
78
82
)
79
83
}
80
84
81
- const queryRunner$ = onMutate $. pipe (
85
+ const queryRunner$ = context $. pipe (
82
86
switchMap ( ( context ) => {
83
87
const fn$ =
84
88
typeof mutationFn === "function"
85
89
? // eslint-disable-next-line @typescript-eslint/promise-function-async
86
90
makeObservable ( ( ) => mutationFn ( variables ) )
87
91
: mutationFn
88
92
89
- const finalFn$ = fn$ . pipe (
93
+ const sharedFn$ = fn$ . pipe ( share ( ) )
94
+
95
+ const completeWithoutValue$ = sharedFn$ . pipe (
96
+ isEmpty ( ) ,
97
+ tap ( ( isEmppty ) => {
98
+ if ( isEmppty ) {
99
+ throw new Error ( "Mutation completed without any emission (EMPTY)" )
100
+ }
101
+ } ) ,
102
+ ignoreElements ( )
103
+ )
104
+
105
+ const finalFn$ = merge ( sharedFn$ , completeWithoutValue$ ) . pipe (
90
106
map (
91
107
( data ) : QueryState => ( {
92
108
result : {
@@ -152,7 +168,7 @@ export const executeMutation = <
152
168
const mutation$ = merge (
153
169
initState$ ,
154
170
// eslint-disable-next-line @typescript-eslint/consistent-type-assertions
155
- onMutate $. pipe ( map ( ( context ) => ( { context } ) as Partial < LocalState > ) ) ,
171
+ context $. pipe ( map ( ( context ) => ( { context } ) as Partial < LocalState > ) ) ,
156
172
queryRunner$ . pipe (
157
173
switchMap ( ( { result : mutationData , error, ...restState } ) => {
158
174
if ( ! mutationData && ! error )
@@ -161,7 +177,7 @@ export const executeMutation = <
161
177
...restState
162
178
} as Partial < LocalState > )
163
179
164
- const onSuccess $ = error
180
+ const success $ = error
165
181
? of ( null )
166
182
: makeObservable ( ( ) =>
167
183
options . onSuccess ?.(
@@ -180,11 +196,11 @@ export const executeMutation = <
180
196
)
181
197
)
182
198
183
- const onSettled $ = onOptionSettled$ . pipe (
199
+ const settled $ = onOptionSettled$ . pipe (
184
200
catchError ( ( error ) => ( mutationData ? of ( mutationData ) : of ( error ) ) )
185
201
)
186
202
187
- const result$ = concat ( onSuccess $, onSettled $) . pipe (
203
+ const result$ = concat ( success $, settled $) . pipe (
188
204
toArray ( ) ,
189
205
map ( ( ) =>
190
206
error
0 commit comments