Skip to content

Commit

Permalink
Add a lawful parallel Applicative instance with derivatives
Browse files Browse the repository at this point in the history
  • Loading branch information
Avaq committed Jul 11, 2023
1 parent 15ad809 commit cf42652
Show file tree
Hide file tree
Showing 4 changed files with 131 additions and 71 deletions.
29 changes: 20 additions & 9 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -106,6 +106,8 @@ some derivative functions are exported as well.
- Applicative: Pointed Apply
- Chain: `chain`, `chainFirst`, `bind`
- Monad: Pointed Chain
- ApplyPar: `apPar`, `apFirstPar`, `apSecondPar`, `apSPar`, `getApplySemigroupPar`, `sequenceTPar`, `sequenceSPar`
- ApplicativePar: Pointed ApplyPar

### Service

Expand Down Expand Up @@ -167,26 +169,35 @@ const withMyFile: Service<Error, Dependencies, FS.FileHandle> = (

### Combining services in parallel

The `Bracket` type provides a sequential `Applicative` instance that it uses by
default. To combine services in parallel, there's the following two functions:
The `Bracket` type has a sequential `Applicative` instance that it uses by
default, but there's also a parallel `ApplicativePar` instance that you can use
to combine services in parallel.\* Two very useful derivative function using
`ApplicativePar` are

- `combineStruct`: This can be used in Do-notation in place of `apS`.
- `packStruct`: This can be used as a parallel alternative to `sequenceS`.
- `sequenceSPar` for building a Struct of resources from a Struct of Brackets; and
- `apSPar` for adding another property to an existing Struct of services:

```ts
import {pipe} from 'fp-ts/function';
import * as Bracket from 'fp-ts-bootstrap/Bracket';

const withServices = Bracket.packStruct({
env: withEnv,
logger: withLogger({level: 'info'}),
database: withDatabase({url: 'postgres://localhost:5432'}),
});
const withServices = pipe(
Bracket.sequenceSPar({
env: withEnv,
logger: withLogger({level: 'info'}),
}),
Bracket.apSPar('database', withDatabase({url: 'postgres://localhost:5432'}))
);

const program = withServices(({env, logger, database}) => pipe(
// ...
));
```

\* By "in parallel" we mean that the services are *acquired* in parallel, but
disposed in sequence. This is a technical limitation that exists to ensure that
the `ApplyPar` instance is lawful.

### Threading dependencies during service composition

```ts
Expand Down
80 changes: 55 additions & 25 deletions src/Bracket.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -110,23 +110,6 @@ const equivalence = <E, R>(
);
};

const equivalenceOnSuccessfullConsumption = <E, R>(
EqR: $E.Eq<R>,
EqE: $E.Eq<E>,
ShowR: $S.Show<R> = ShowUnknown,
ShowE: $S.Show<E> = ShowUnknown,
) => {
const eq = eqBy(
BracketResultEq(EqE, EqR),
BracketResultShow(ShowE, ShowR)
);

return (a: Bracket.Bracket<E, R>, b: Bracket.Bracket<E, R>) => (
Promise.all([runBracket(TE.of)(a), runBracket(TE.of)(b)])
.then(([resultA, resultB]) => eq(resultA, resultB))
);
};

type Err = {error: string};
const ErrEq: $E.Eq<Err> = $E.struct({error: Str.Eq});
const ErrShow: $S.Show<Err> = ({show: (e) => `Err(${Str.Show.show(e.error)})`});
Expand Down Expand Up @@ -156,6 +139,13 @@ const testErr = {error: 'test error'};

const strErrEquivalence = equivalence(Str.Eq, ErrEq, Str.Show, ErrShow)(testErr);

const recordErrEquivalence = equivalence(
R.getEq(Str.Eq),
ErrEq,
R.getShow(Str.Ord)(Str.Show),
ErrShow
)(testErr);

const noDispose = <E>() => TE.of<E, undefined>(undefined);
type Strstr = (str: string) => string;
const composeStrstr = (f: Strstr) => (g: Strstr) => (x: string) => f(g(x));
Expand Down Expand Up @@ -218,6 +208,42 @@ hold('Applicative interchange', FC.asyncProperty(
)
));

hold('ApplyPar composition', FC.asyncProperty(
BracketArb(FC.string()),
BracketArb(StringFunctionArb),
BracketArb(StringFunctionArb),
(mx, mf, mg) => strErrEquivalence(
Bracket.apPar(Bracket.apPar(mx)(mf))(mg),
Bracket.apPar(mx)(Bracket.apPar(mf)(Bracket.map(composeStrstr)(mg)))
)
));

hold('ApplicativePar identity', FC.asyncProperty(
BracketArb(FC.string()),
(mx) => strErrEquivalence(
pipe(Bracket.of<Err, Strstr>(identity), Bracket.apPar(mx)),
mx,
)
));

hold('ApplicativePar homomorphism', FC.asyncProperty(
FC.string(),
StringFunctionArb,
(x, f) => strErrEquivalence(
pipe(Bracket.of<Err, Strstr>(f), Bracket.apPar(Bracket.of(x))),
Bracket.of(f(x))
)
));

hold('ApplicativePar interchange', FC.asyncProperty(
FC.string(),
BracketArb(StringFunctionArb),
(x, mf) => strErrEquivalence(
pipe(mf, Bracket.apPar(Bracket.of(x))),
pipe(Bracket.of<Err, (f: Strstr) => string>(f => f(x)), Bracket.apPar(mf))
)
));

hold('Chain associativity', FC.asyncProperty(
BracketArb(FC.string()),
BracketFunctionArb,
Expand Down Expand Up @@ -293,16 +319,20 @@ hold('of(x) = bracket(TE.of(x), noDispose)', FC.asyncProperty(
)
));

hold('sequenceS({a: ma, b: mb}) ~= packStruct({a: ma, b: mb})', FC.asyncProperty(
hold('ap(mx)(mf) = apPar(mx)(mf)', FC.asyncProperty(
BracketArb(FC.string()),
BracketArb(StringFunctionArb),
(mx, mf) => strErrEquivalence(
Bracket.ap(mx)(mf),
Bracket.apPar(mx)(mf)
)
));

hold('sequenceS({a: ma, b: mb}) = sequenceSPar({a: ma, b: mb})', FC.asyncProperty(
BracketArb(FC.string()),
BracketArb(FC.string()),
(ma, mb) => equivalenceOnSuccessfullConsumption(
R.getEq(Str.Eq),
ErrEq,
R.getShow(Str.Ord)(Str.Show),
ErrShow
)(
(ma, mb) => recordErrEquivalence(
Bracket.sequenceS({a: ma, b: mb}),
Bracket.packStruct({a: ma, b: mb})
Bracket.sequenceSPar({a: ma, b: mb})
)
));
92 changes: 55 additions & 37 deletions src/Bracket.ts
Original file line number Diff line number Diff line change
Expand Up @@ -5,11 +5,10 @@ import * as $Applicative from 'fp-ts/Applicative';
import * as $Chain from 'fp-ts/Chain';
import * as $Monad from 'fp-ts/Monad';
import * as TE from 'fp-ts/TaskEither';
import * as T from 'fp-ts/Task';
import * as E from 'fp-ts/Either';
import * as O from 'fp-ts/Option';
import {NaturalTransformation22} from 'fp-ts/NaturalTransformation';
import {constVoid, pipe} from 'fp-ts/function';
import {pipe} from 'fp-ts/function';

export const URI = 'fp-ts-bootstrap/Bracket';
export type URI = typeof URI;
Expand Down Expand Up @@ -73,58 +72,77 @@ export const sequenceS = $Apply.sequenceS(Apply);

export const Applicative: $Applicative.Applicative2<URI> = {...Pointed, ...Apply};

const UnlawfulApplyPar: $Apply.Apply2<URI> = {
export const ApplyPar: $Apply.Apply2<URI> = {
...Functor,
ap: <E, A, B>(fab: Bracket<E, (a: A) => B>, fa: Bracket<E, A>) => (
<T>(consume: (resource: B) => TE.TaskEither<E, T>): TE.TaskEither<E, T> => (
() => new Promise((resolve, reject) => {
() => {
let ab: O.Option<(a: A) => B> = O.none;
let a: O.Option<A> = O.none;
let resolveOther: (value: E.Either<E, T>) => void = constVoid;
let ran = false;

let run = () => new Promise<E.Either<E, T>>((resolve, reject) => {
if (O.isSome(ab) && O.isSome(a) && !ran) {
ran = true;
consume(ab.value(a.value))().then(ret => {
resolve(ret);
resolveOther(ret);
}, reject);
} else {
resolveOther = resolve;

let resolvedFa: O.Option<E.Either<E, T>> = O.none;
let resolveFa = (value: E.Either<E, T>) => {
resolvedFa = O.some(value);
};

let resolvedFab: O.Option<E.Either<E, T>> = O.none;
let resolveFab = (value: E.Either<E, T>) => {
resolvedFab = O.some(value);
};

const promiseFa = fa(x => () => {
if (O.isSome(resolvedFa)) {
return Promise.resolve(resolvedFa.value);
}
if (O.isSome(ab)) {
return consume(ab.value(x))();
}
return new Promise<E.Either<E, T>>(resolve => {
a = O.some(x);
resolveFa = resolve;
});
})().then(ea => {
resolveFab(ea);
return ea;
});

const taskAB = pipe(fab(f => {
ab = O.some(f);
return run;
}), T.map(eab => {
resolveOther(eab);
run = T.of(eab);
const promiseFab = fab(f => () => {
if (O.isSome(resolvedFab)) {
return Promise.resolve(resolvedFab.value);
}
if (O.isSome(a)) {
return consume(f(a.value))().then(ret => {
resolveFa(ret);
return promiseFa.then(retFa => pipe(retFa, E.apSecond(ret)));
});
}
return new Promise<E.Either<E, T>>(resolve => {
ab = O.some(f);
resolveFab = resolve;
});
})().then(eab => {
resolveFa(eab);
return eab;
}));

const taskA = pipe(fa(x => {
a = O.some(x);
return run;
}), T.map(ea => {
resolveOther(ea);
run = T.of(ea);
return ea;
}));
});

pipe(taskAB, TE.apFirst(taskA))().then(resolve, reject);
})
return Promise.all([promiseFab, promiseFa]).then(([eab]) => eab);
}
)
),
};

export const apPar = <E, A>(fa: Bracket<E, A>) => (
<B>(fab: Bracket<E, (a: A) => B>) => UnlawfulApplyPar.ap(fab, fa)
<B>(fab: Bracket<E, (a: A) => B>) => ApplyPar.ap(fab, fa)
);

export const combineStruct = $Apply.apS(UnlawfulApplyPar);
export const packStruct = $Apply.sequenceS(UnlawfulApplyPar);
export const apFirstPar = $Apply.apFirst(ApplyPar);
export const apSecondPar = $Apply.apSecond(ApplyPar);
export const apSPar = $Apply.apS(ApplyPar);
export const getApplySemigroupPar = $Apply.getApplySemigroup(ApplyPar);
export const sequenceTPar = $Apply.sequenceT(ApplyPar);
export const sequenceSPar = $Apply.sequenceS(ApplyPar);

export const ApplicativePar: $Applicative.Applicative2<URI> = {...Pointed, ...ApplyPar};

export const Chain: $Chain.Chain2<URI> = {
...Apply,
Expand Down
1 change: 1 addition & 0 deletions src/Service.ts
Original file line number Diff line number Diff line change
Expand Up @@ -11,5 +11,6 @@ export type ResourceOf<S extends Service<any, any, any>> = (
export const of = RT.of(B.Pointed);
export const map = RT.map(B.Functor);
export const ap = RT.ap(B.Apply);
export const apPar = RT.ap(B.ApplyPar);
export const chain = RT.chain(B.Chain);
export const fromReader = RT.fromReader(B.Pointed);

0 comments on commit cf42652

Please sign in to comment.