Skip to content

Commit

Permalink
feat: added concat mutations
Browse files Browse the repository at this point in the history
  • Loading branch information
mbret committed Dec 29, 2024
1 parent c7e1f98 commit d27eafb
Show file tree
Hide file tree
Showing 4 changed files with 130 additions and 31 deletions.
1 change: 1 addition & 0 deletions src/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ export * from "./lib/utils"
export * from "./lib/queries/useQuery$"
export * from "./lib/queries/useMutation$"
export * from "./lib/queries/useSwitchMutation$"
export * from "./lib/queries/useConcatMutation$"
export * from "./lib/queries/QueryClientProvider$"


Expand Down
101 changes: 101 additions & 0 deletions src/lib/queries/useConcatMutation$.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,101 @@
import {
DefaultError,
MutationKey,
QueryClient,
useQueryClient
} from "@tanstack/react-query"
import {
BehaviorSubject,
filter,
first,
noop,
Subject,
switchMap
} from "rxjs"
import { useCallback } from "react"
import { useMutation$, UseMutation$Options } from "./useMutation$"

export function useContactMutation$<
TData = unknown,
TError = DefaultError,
TVariables = void,
TContext = unknown
>(
options: UseMutation$Options<TData | null, TError, TVariables, TContext> & {
mutationKey: MutationKey
},
queryClient?: QueryClient
) {
const client = useQueryClient(queryClient)
type TDataOrNull = TData | null
const mutationKey = options.mutationKey

const { mutateAsync, ...rest } = useMutation$<
TDataOrNull,
TError,
{ variables: TVariables; ready$: Subject<boolean> },
TContext
>(
{
...options,
onMutate({ variables }) {
return options.onMutate?.(variables)
},
onSuccess(data, variables, context) {
return options.onSuccess?.(data, variables.variables, context)
},
onError(error, variables, context) {
return options.onError?.(error, variables.variables, context)
},
onSettled(data, error, variables, context) {
return options.onSettled?.(data, error, variables.variables, context)
},
mutationFn: ({ ready$, variables }) => {
const source =
typeof options.mutationFn === "function"
? options.mutationFn(variables)
: options.mutationFn

return ready$.pipe(
filter((isReady) => isReady),
first(),
switchMap(() => source)
)
}
},
queryClient
)

const mutateAsyncConcat = useCallback(
async (variables: TVariables) => {
const mutations = client.getMutationCache().findAll({
mutationKey,
exact: true
})

const subject = new BehaviorSubject(false)

const result = mutateAsync({ variables, ready$: subject })

await Promise.all(
mutations.map((mutation) => mutation.continue().catch(noop))
)

subject.next(true)

return await result.finally(() => {
subject.complete()
})
},
[mutateAsync, client, mutationKey]
)

const mutateConcat = useCallback(
(variables: TVariables) => {
mutateAsyncConcat(variables).catch(noop)
},
[mutateAsyncConcat]
)

return { ...rest, mutate: mutateConcat, mutateAsync: mutateAsyncConcat }
}
56 changes: 26 additions & 30 deletions src/lib/queries/useMutation$.ts
Original file line number Diff line number Diff line change
Expand Up @@ -18,9 +18,7 @@ export type UseMutation$Options<
UseMutationOptions<TData, TError, TVariables, TContext>,
"mutationFn"
> & {
mutationFn:
| ((variables: TVariables) => Observable<TData>)
| Observable<TData>
mutationFn: ((variables: TVariables) => Observable<TData>) | Observable<TData>
}

export function useMutation$<
Expand All @@ -45,36 +43,34 @@ export function useMutation$<
isIdle: true
})

const mutationFnAsync = (variables: TVariables) => {
let lastData: { value: TData } | undefined

return new Promise<TData>((resolve, reject) => {
const source =
typeof options.mutationFn === "function"
? options.mutationFn(variables)
: options.mutationFn

source.pipe(take(1)).subscribe({
next: (data) => {
lastData = { value: data }
},
error: (error) => {
reject(error)
},
complete: () => {
if (lastData === undefined)
return reject(new Error("Stream completed without any data"))

resolve(lastData.value)
}
})
})
}

const result = useMutation<TData, TError, TVariables, TContext>(
{
...options,
mutationFn: mutationFnAsync
mutationFn: (variables: TVariables) => {
let lastData: { value: TData } | undefined

return new Promise<TData>((resolve, reject) => {
const source =
typeof options.mutationFn === "function"
? options.mutationFn(variables)
: options.mutationFn

source.pipe(take(1)).subscribe({
next: (data) => {
lastData = { value: data }
},
error: (error) => {
reject(error)
},
complete: () => {
if (lastData === undefined)
return reject(new Error("Stream completed without any data"))

resolve(lastData.value)
}
})
})
}
},
queryClient
)
Expand Down
3 changes: 2 additions & 1 deletion src/lib/queries/useSwitchMutation$.ts
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,8 @@ export function useSwitchMutation$<
const mutateSwitch = useCallback(
(variables: TVariables) => {
cancel()
mutate(variables)

return mutate(variables)
},
[mutate, cancel]
)
Expand Down

0 comments on commit d27eafb

Please sign in to comment.