Skip to content

Commit

Permalink
#22 added PushIterable and examples
Browse files Browse the repository at this point in the history
  • Loading branch information
j50n committed Feb 11, 2022
1 parent 9b14901 commit 54ba0c1
Show file tree
Hide file tree
Showing 7 changed files with 288 additions and 0 deletions.
1 change: 1 addition & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ deno doc --reload https://deno.land/x/proc/mod.ts 2> /dev/null

- [Simple Examples for Input and Output Handlers](./runners/handlers/README.md)
- [Count Unique Words in _War and Peace_](./examples/warandpeace/README.md)
- [Use `PushIterable` to Implement Workers](./examples/pushiterable/README.md)

## Related Projects

Expand Down
104 changes: 104 additions & 0 deletions examples/pushiterable/README.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,104 @@
# Use `PushIterable` to Implement Workers

`proc` can be used to manage persistent child processes that accept messages
from your parent process and respond back with messages of their own. This is
simlar to web workers but with fewer limitations.

Note that this isn't limited to processes that run Deno. You could just as
easily run something like `grep` or `awk` or something written in Python.

## Run the Example

This example amends the `PATH` temporarily to add the current folder.

```sh
cd ./examples/pushiterable/
PATH=".:$PATH" ./example-of-pushiterable.ts
```

## `example-of-pushiterable.ts`

In this example, I am going to set up a child process that lets me ask a
"question" and get back an "answer." The question is a number `n`, and the
answer includes the number in roman-numeral format and written out in English.
Data is passed to and from the child process via JSON messages.

[example-of-pushiterable.ts](./example-of-pushiterable.ts)

```ts
import { Answer, Question } from "./common-json-defs.ts";
import * as proc from "https://deno.land/x/proc/mod.ts";
import { asynciter } from "https://deno.land/x/asynciter/mod.ts";
import { blue, red } from "https://deno.land/std/fmt/colors.ts";

const it = new proc.PushIterable<Question>();

(async () => {
try {
for (let n = 1; n <= 3; n++) {
console.error(blue(`I am asking about ${n}.`));

const question: Question = { n };
await it.write(question);

await proc.sleep(1000);
}
} finally {
it.close();
}
})();

for await (
const answer: Answer of asynciter(
proc.runner(
proc.stringIterableUnbufferedInput(),
proc.stringIterableUnbufferedOutput(async (stderrLines) => {
for await (const line of stderrLines) {
console.error(red(line));
}
}),
)().run(
{ cmd: ["humanize-numbers.ts"] },
asynciter(it).map(JSON.stringify),
),
).map(JSON.parse)
) {
console.dir(answer);
}
```

## `humanize-numbers.ts`

The child process converts stdin to `Question` instances (unbuffered IO), does
some conversion, and writes the `Answer`s out to `stdout` (that is where
`console.log()` goes).

[humanize-numbers.ts](./humanize-numbers.ts)

```ts
import "https://deno.land/x/humanizer/romanNumerals.ts";
import "https://deno.land/x/humanizer/numberToWords.ts";
import { asynciter } from "https://deno.land/x/asynciter/mod.ts";
import {
readerToBytesUnbuffered,
toLines,
} from "https://deno.land/x/proc/mod.ts";
import { Answer, Question } from "./common-json-defs.ts";

for await (
const question of asynciter(toLines(readerToBytesUnbuffered(Deno.stdin))).map(
(line: string): Question => JSON.parse(line),
)
) {
const n = question.n;
console.error(`humanize errors process question: ${n}`);
const answer: Answer = {
n: question.n,
roman: (n).toRoman(),
words: (n).toWords(),
};
console.log(JSON.stringify(answer));
}

console.error("humanize-numbers process done (normal exit)");
```
9 changes: 9 additions & 0 deletions examples/pushiterable/common-json-defs.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,9 @@
export interface Question {
n: number;
}

export interface Answer {
n: number;
roman: string;
words: string;
}
71 changes: 71 additions & 0 deletions examples/pushiterable/example-of-pushiterable.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,71 @@
#!/usr/bin/env -S deno run --quiet --allow-run='humanize-numbers.ts'

import { Answer, Question } from "./common-json-defs.ts";
import * as proc from "../../mod.ts";
import { asynciter } from "https://deno.land/x/[email protected]/mod.ts";
import { blue, red } from "https://deno.land/[email protected]/fmt/colors.ts";

/**
* This demonstrates sending objects to and receiving objects from a child process
* using JSON, and with push input and pull output.
*
* This shows how you can use `proc` to implement a worker using child processes.
*/

const it = new proc.PushIterable<Question>();

/*
* Write "questions" to the push-iterable.
*
* This is implemented as an asynchronous iffe, and there is no check to
* ensure that this finishes before the program exits. It will finish before
* the program exits - because the next step will completely consume all the
* writes from the push-iterable.
*
* We are writing a message about each question colored blue to stderr.
*
* The sleep simulates an arbitrary load. These messages could come from
* several sources, including events.
*/
(async () => {
try {
for (let n = 1; n <= 3; n++) {
console.error(blue(`I am asking about ${n}.`));

const question: Question = { n };
await it.write(question);

await proc.sleep(1000);
}
} finally {
it.close();
}
})();

/*
* Consume the push iterable with the child process `humanize-numbers.ts`.
*
* stderr from the child process is written out in red.
*
* The "answers" from the child process are on stdout.
*
* The child process will process questions until the push-iterable
* is exhausted (closed), and then it will shut down normally.
*/
for await (
const answer: Answer of asynciter(
proc.runner(
proc.stringIterableUnbufferedInput(),
proc.stringIterableUnbufferedOutput(async (stderrLines) => {
for await (const line of stderrLines) {
console.error(red(line));
}
}),
)().run(
{ cmd: ["humanize-numbers.ts"] },
asynciter(it).map(JSON.stringify),
),
).map(JSON.parse)
) {
console.dir(answer);
}
23 changes: 23 additions & 0 deletions examples/pushiterable/humanize-numbers.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,23 @@
#!/usr/bin/env -S deno run --quiet
import "https://deno.land/x/[email protected]/romanNumerals.ts";
import "https://deno.land/x/[email protected]/numberToWords.ts";
import { asynciter } from "https://deno.land/x/[email protected]/mod.ts";
import { readerToBytesUnbuffered, toLines } from "../../mod.ts";
import { Answer, Question } from "./common-json-defs.ts";

for await (
const question of asynciter(toLines(readerToBytesUnbuffered(Deno.stdin))).map(
(line: string): Question => JSON.parse(line),
)
) {
const n = question.n;
console.error(`humanize errors process question: ${n}`);
const answer: Answer = {
n: question.n,
roman: (n).toRoman(),
words: (n).toWords(),
};
console.log(JSON.stringify(answer));
}

console.error("humanize-numbers process done (normal exit)");
1 change: 1 addition & 0 deletions mod.ts
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ export * from "./runners/closers.ts";
export * from "./runners/process-exit-error.ts";
export * from "./runners/chained-error.ts";
export * from "./runners/stderr-support.ts";
export * from "./runners/push-iterable.ts";
export {
readerToBytes,
readerToBytesUnbuffered,
Expand Down
79 changes: 79 additions & 0 deletions runners/push-iterable.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,79 @@
type QueueEntry<T> = { promise: Promise<T>; resolve: (item: T) => void };

class Some<T> {
constructor(public readonly item: T) {
}
}

class None {}

/**
* Invert the normal data flow of an `AsyncIterable`, allowing you to push writes on one side and
* iterate on the other.
*
* The `write()` side **must** call `close()` when all write operations are done.
*/
export class PushIterable<T> implements Deno.Closer {
private closed = false;

private queue: QueueEntry<Some<T> | None>[] = [];

/**
* Create a new `PushIterable`.
*/
constructor() {
this.addEmptyPromiseToQueue();
}

/**
* Add an unresolved promise to the end of the queue.
*/
private addEmptyPromiseToQueue(): void {
let resolve: (item: Some<T> | None) => void;
const promise = new Promise<Some<T> | None>((res, _rej) => {
resolve = res;
});
this.queue.push({ promise, resolve: resolve! });
}

/**
* Close the iterable.
*
* Once closed, subsequent calls to `write(...)` will throw an error.
*
* It is safe to call `close()` multiple times.
*/
close(): void {
this.closed = true;
this.queue[this.queue.length - 1].resolve(new None());
}

/**
* Write an item.
* @param item The item.
*/
async write(item: T): Promise<void> {
if (this.closed) {
throw new Error("already closed");
}

this.queue[this.queue.length - 1].resolve(new Some(item));
this.addEmptyPromiseToQueue();

if (this.queue.length > 1) {
await this.queue[0].promise;
}
}

async *[Symbol.asyncIterator](): AsyncIterableIterator<T> {
while (true) {
const item = await this.queue[0].promise;
if (item instanceof Some) {
yield item.item;
} else {
break;
}
this.queue.shift();
}
}
}

0 comments on commit 54ba0c1

Please sign in to comment.