From 54ba0c1353aa6a5c46475f4699732758740ad62d Mon Sep 17 00:00:00 2001 From: j50n Date: Fri, 11 Feb 2022 15:19:05 -0700 Subject: [PATCH] #22 added PushIterable and examples --- README.md | 1 + examples/pushiterable/README.md | 104 ++++++++++++++++++ examples/pushiterable/common-json-defs.ts | 9 ++ .../pushiterable/example-of-pushiterable.ts | 71 ++++++++++++ examples/pushiterable/humanize-numbers.ts | 23 ++++ mod.ts | 1 + runners/push-iterable.ts | 79 +++++++++++++ 7 files changed, 288 insertions(+) create mode 100644 examples/pushiterable/README.md create mode 100644 examples/pushiterable/common-json-defs.ts create mode 100755 examples/pushiterable/example-of-pushiterable.ts create mode 100755 examples/pushiterable/humanize-numbers.ts create mode 100644 runners/push-iterable.ts diff --git a/README.md b/README.md index bcc5c8e..5f90d98 100644 --- a/README.md +++ b/README.md @@ -21,6 +21,7 @@ deno doc --reload https://deno.land/x/proc/mod.ts 2> /dev/null - [Simple Examples for Input and Output Handlers](./runners/handlers/README.md) - [Count Unique Words in _War and Peace_](./examples/warandpeace/README.md) +- [Use `PushIterable` to Implement Workers](./examples/pushiterable/README.md) ## Related Projects diff --git a/examples/pushiterable/README.md b/examples/pushiterable/README.md new file mode 100644 index 0000000..90d8d6f --- /dev/null +++ b/examples/pushiterable/README.md @@ -0,0 +1,104 @@ +# Use `PushIterable` to Implement Workers + +`proc` can be used to manage persistent child processes that accept messages +from your parent process and respond back with messages of their own. This is +simlar to web workers but with fewer limitations. + +Note that this isn't limited to processes that run Deno. You could just as +easily run something like `grep` or `awk` or something written in Python. + +## Run the Example + +This example amends the `PATH` temporarily to add the current folder. + +```sh +cd ./examples/pushiterable/ +PATH=".:$PATH" ./example-of-pushiterable.ts +``` + +## `example-of-pushiterable.ts` + +In this example, I am going to set up a child process that lets me ask a +"question" and get back an "answer." The question is a number `n`, and the +answer includes the number in roman-numeral format and written out in English. +Data is passed to and from the child process via JSON messages. + +[example-of-pushiterable.ts](./example-of-pushiterable.ts) + +```ts +import { Answer, Question } from "./common-json-defs.ts"; +import * as proc from "https://deno.land/x/proc/mod.ts"; +import { asynciter } from "https://deno.land/x/asynciter/mod.ts"; +import { blue, red } from "https://deno.land/std/fmt/colors.ts"; + +const it = new proc.PushIterable(); + +(async () => { + try { + for (let n = 1; n <= 3; n++) { + console.error(blue(`I am asking about ${n}.`)); + + const question: Question = { n }; + await it.write(question); + + await proc.sleep(1000); + } + } finally { + it.close(); + } +})(); + +for await ( + const answer: Answer of asynciter( + proc.runner( + proc.stringIterableUnbufferedInput(), + proc.stringIterableUnbufferedOutput(async (stderrLines) => { + for await (const line of stderrLines) { + console.error(red(line)); + } + }), + )().run( + { cmd: ["humanize-numbers.ts"] }, + asynciter(it).map(JSON.stringify), + ), + ).map(JSON.parse) +) { + console.dir(answer); +} +``` + +## `humanize-numbers.ts` + +The child process converts stdin to `Question` instances (unbuffered IO), does +some conversion, and writes the `Answer`s out to `stdout` (that is where +`console.log()` goes). + +[humanize-numbers.ts](./humanize-numbers.ts) + +```ts +import "https://deno.land/x/humanizer/romanNumerals.ts"; +import "https://deno.land/x/humanizer/numberToWords.ts"; +import { asynciter } from "https://deno.land/x/asynciter/mod.ts"; +import { + readerToBytesUnbuffered, + toLines, +} from "https://deno.land/x/proc/mod.ts"; +import { Answer, Question } from "./common-json-defs.ts"; + +for await ( + const question of asynciter(toLines(readerToBytesUnbuffered(Deno.stdin))).map( + (line: string): Question => JSON.parse(line), + ) +) { + const n = question.n; + console.error(`humanize errors process question: ${n}`); + const answer: Answer = { + n: question.n, + roman: (n).toRoman(), + words: (n).toWords(), + }; + console.log(JSON.stringify(answer)); +} + +console.error("humanize-numbers process done (normal exit)"); +``` diff --git a/examples/pushiterable/common-json-defs.ts b/examples/pushiterable/common-json-defs.ts new file mode 100644 index 0000000..f389ab9 --- /dev/null +++ b/examples/pushiterable/common-json-defs.ts @@ -0,0 +1,9 @@ +export interface Question { + n: number; +} + +export interface Answer { + n: number; + roman: string; + words: string; +} diff --git a/examples/pushiterable/example-of-pushiterable.ts b/examples/pushiterable/example-of-pushiterable.ts new file mode 100755 index 0000000..1498d3b --- /dev/null +++ b/examples/pushiterable/example-of-pushiterable.ts @@ -0,0 +1,71 @@ +#!/usr/bin/env -S deno run --quiet --allow-run='humanize-numbers.ts' + +import { Answer, Question } from "./common-json-defs.ts"; +import * as proc from "../../mod.ts"; +import { asynciter } from "https://deno.land/x/asynciter@0.0.7/mod.ts"; +import { blue, red } from "https://deno.land/std@0.125.0/fmt/colors.ts"; + +/** + * This demonstrates sending objects to and receiving objects from a child process + * using JSON, and with push input and pull output. + * + * This shows how you can use `proc` to implement a worker using child processes. + */ + +const it = new proc.PushIterable(); + +/* + * Write "questions" to the push-iterable. + * + * This is implemented as an asynchronous iffe, and there is no check to + * ensure that this finishes before the program exits. It will finish before + * the program exits - because the next step will completely consume all the + * writes from the push-iterable. + * + * We are writing a message about each question colored blue to stderr. + * + * The sleep simulates an arbitrary load. These messages could come from + * several sources, including events. + */ +(async () => { + try { + for (let n = 1; n <= 3; n++) { + console.error(blue(`I am asking about ${n}.`)); + + const question: Question = { n }; + await it.write(question); + + await proc.sleep(1000); + } + } finally { + it.close(); + } +})(); + +/* + * Consume the push iterable with the child process `humanize-numbers.ts`. + * + * stderr from the child process is written out in red. + * + * The "answers" from the child process are on stdout. + * + * The child process will process questions until the push-iterable + * is exhausted (closed), and then it will shut down normally. + */ +for await ( + const answer: Answer of asynciter( + proc.runner( + proc.stringIterableUnbufferedInput(), + proc.stringIterableUnbufferedOutput(async (stderrLines) => { + for await (const line of stderrLines) { + console.error(red(line)); + } + }), + )().run( + { cmd: ["humanize-numbers.ts"] }, + asynciter(it).map(JSON.stringify), + ), + ).map(JSON.parse) +) { + console.dir(answer); +} diff --git a/examples/pushiterable/humanize-numbers.ts b/examples/pushiterable/humanize-numbers.ts new file mode 100755 index 0000000..54b4b8a --- /dev/null +++ b/examples/pushiterable/humanize-numbers.ts @@ -0,0 +1,23 @@ +#!/usr/bin/env -S deno run --quiet +import "https://deno.land/x/humanizer@1.1/romanNumerals.ts"; +import "https://deno.land/x/humanizer@1.1/numberToWords.ts"; +import { asynciter } from "https://deno.land/x/asynciter@0.0.7/mod.ts"; +import { readerToBytesUnbuffered, toLines } from "../../mod.ts"; +import { Answer, Question } from "./common-json-defs.ts"; + +for await ( + const question of asynciter(toLines(readerToBytesUnbuffered(Deno.stdin))).map( + (line: string): Question => JSON.parse(line), + ) +) { + const n = question.n; + console.error(`humanize errors process question: ${n}`); + const answer: Answer = { + n: question.n, + roman: (n).toRoman(), + words: (n).toWords(), + }; + console.log(JSON.stringify(answer)); +} + +console.error("humanize-numbers process done (normal exit)"); diff --git a/mod.ts b/mod.ts index 8b7f304..2b45a02 100644 --- a/mod.ts +++ b/mod.ts @@ -12,6 +12,7 @@ export * from "./runners/closers.ts"; export * from "./runners/process-exit-error.ts"; export * from "./runners/chained-error.ts"; export * from "./runners/stderr-support.ts"; +export * from "./runners/push-iterable.ts"; export { readerToBytes, readerToBytesUnbuffered, diff --git a/runners/push-iterable.ts b/runners/push-iterable.ts new file mode 100644 index 0000000..029e9e9 --- /dev/null +++ b/runners/push-iterable.ts @@ -0,0 +1,79 @@ +type QueueEntry = { promise: Promise; resolve: (item: T) => void }; + +class Some { + constructor(public readonly item: T) { + } +} + +class None {} + +/** + * Invert the normal data flow of an `AsyncIterable`, allowing you to push writes on one side and + * iterate on the other. + * + * The `write()` side **must** call `close()` when all write operations are done. + */ +export class PushIterable implements Deno.Closer { + private closed = false; + + private queue: QueueEntry | None>[] = []; + + /** + * Create a new `PushIterable`. + */ + constructor() { + this.addEmptyPromiseToQueue(); + } + + /** + * Add an unresolved promise to the end of the queue. + */ + private addEmptyPromiseToQueue(): void { + let resolve: (item: Some | None) => void; + const promise = new Promise | None>((res, _rej) => { + resolve = res; + }); + this.queue.push({ promise, resolve: resolve! }); + } + + /** + * Close the iterable. + * + * Once closed, subsequent calls to `write(...)` will throw an error. + * + * It is safe to call `close()` multiple times. + */ + close(): void { + this.closed = true; + this.queue[this.queue.length - 1].resolve(new None()); + } + + /** + * Write an item. + * @param item The item. + */ + async write(item: T): Promise { + if (this.closed) { + throw new Error("already closed"); + } + + this.queue[this.queue.length - 1].resolve(new Some(item)); + this.addEmptyPromiseToQueue(); + + if (this.queue.length > 1) { + await this.queue[0].promise; + } + } + + async *[Symbol.asyncIterator](): AsyncIterableIterator { + while (true) { + const item = await this.queue[0].promise; + if (item instanceof Some) { + yield item.item; + } else { + break; + } + this.queue.shift(); + } + } +}