Skip to content

Commit ae1ee1b

Browse files
authored
fix: Use duck typing to differentiate between a dedicated and a shared worker (#30)
1 parent fb6ebe5 commit ae1ee1b

File tree

2 files changed

+69
-20
lines changed

2 files changed

+69
-20
lines changed

README.md

Lines changed: 51 additions & 19 deletions
Original file line numberDiff line numberDiff line change
@@ -1,25 +1,16 @@
11
# @wjfe/async-workers
22

3-
> Provides thread-safe and atomic synchronization objects, and wrappers to easily use web workers with async/await
4-
> syntax.
3+
> Thread-safe, atomic synchronization objects and asynchronous worker wrappers for the browser and Node environments
54
65
[Live Demo](https://wjsoftware.github.io/wjfe-async-workers)
76

8-
> [!CAUTION]
9-
> This NPM package is in its experimental phase. Features may be incomplete or still requiring thorough testing.
7+
> **⚠️ Caution!**
8+
> This NPM package has had minimal testing under NodeJS + web-worker.
109
11-
## Introduction
10+
## Objectives
1211

13-
Using web workers imply a call to `Worker.postMessge()` to signal the worker you want the work done, and then expect
14-
some notification back via a listener in `Worker.onmessage` to at least know that the work completed, but usually to
15-
get a result back in the form of data (the result of the work). This is just the core, though. You should also add
16-
a listener to `Worker.onerror` just in case the worker has issues processing your request. Otherwise you'll be waiting
17-
forever for the notification, ignorant that an error has occurred and nothing will ever be returned.
18-
19-
Oh, but this is just on the user interface side. Then there's the matter of doing the web worker side. No point in
20-
continuing the explanation. The point is made: This is incredibly cumbersome. Multi-threaded runtimes like .Net can
21-
use `async/await` with threads and is far more convenient. The whole point of this NPM package is to bring this
22-
convenience to the web workers world.
12+
1. To provide friendly `async/await` syntax to the Node and web workers world.
13+
2. To provide thread-safe, atomic synchronization objects like the ones found in other runtimes like .Net
2314

2415
## Quickstart
2516

@@ -30,7 +21,7 @@ to incoming messages.
3021
2. Export the tasks worker object.
3122
3. Create a new instance of `Worker` the way is recommended by your bundler, usually with the syntax
3223
`new Worker("./myworker.js", impot.meta.url)`. However, this forces you to write the worker in JavaScript, at least
33-
for Vite-powered projects.
24+
in Vite-powered projects.
3425
4. Create a new instance of `AsyncWorker` (from this package) by passing the worker object and the tasks object from
3526
the previous points.
3627
5. Start worker tasks by using the `AsyncWorker.enqueue` property. The functions found in this object return an object
@@ -83,11 +74,13 @@ self.onmessage = workerListener(myWorker);
8374

8475
This is a 3-step worker. The worker simply waits to be informed which step to run from the user interface thread.
8576

77+
> ℹ️ Worker tasks may take zero arguments. This is perfectly valid.
78+
8679
### The Async Worker in the UI Thread
8780

8881
This is what needs to be done in order to obtain an object that commands the worker:
8982

90-
> [!IMPORTANT]
83+
> **⚡ Important**
9184
> This example is using TypeScript and the following assumes a Vite-powered project. We are deviating from the
9285
> recommended way of obtaining a worker because the recommended way requires the worker to be written in JavaScript
9386
> while in serve mode (`npm run dev`).
@@ -120,6 +113,17 @@ Yes! The above is valid: You may queue up as many tasks as you wish without ha
120113
previous ones, even if the worker is asynchronous (uses `async/await`). The worker controller will keep perfect record
121114
of the order in which the tasks must be run.
122115

116+
This table shows through examples how various call signatures change from from worker to enqueue object:
117+
118+
| Worker Function | Enqueue Function |
119+
| - | - |
120+
| `init(config: Configuration): void;` | `init(payload: Configuration, options?: QueueingOptions): WorkItem<void>;` |
121+
| `sort(array: MyData[]): MyData[];` | `sort(payload: MyData[], options?: QueueingOptions): WorkItem<MyData[]>;` |
122+
| `shutdown(): void;` | `shutdown(payload: void, options?: QueueingOptions): WorkItem<void>;` |
123+
| `primesBetween(payload: { a: number; b: number; }): number[];` | `primesBetween(payload: { a: number; b: number }): WorkItem<number[]>;` |
124+
125+
Task functions can only take zero or 1 parameter, so if more than one piece of data is needed as payload, pass an object with all the data, as the last example does.
126+
123127
## Shared Workers
124128

125129
Shared workers are also supported through the same `AsyncWorker` class. Note, however, the following key differences:
@@ -139,6 +143,9 @@ self.onconnect = (ev) => {
139143

140144
## Bi-Directional Communication
141145

146+
> **🕓 TL;DR**
147+
> It's OK for workers to transmit intermediate results like progress reports and partial results. It is not recommended for the main thread to have to send data to a paused task. Promises in work item objects resolve once the `QueueingOptions.processMessage()` function returns `true`.
148+
142149
The default functionality is fine for many cases: A worker task is started, the user interface waits for its
143150
completion and when the task finishes, the work item's `promise` property spits out the resultant object when awaited.
144151

@@ -214,7 +221,7 @@ const defaultRunningTotalWorkItem = myWorkerController.enqueue.calculateRunningT
214221
});
215222
```
216223

217-
This is it. Bi-directional communcation is fully set up.
224+
This is it. Bi-directional communication is fully set up.
218225

219226
### How About Sending Something Back?
220227

@@ -246,6 +253,31 @@ export const myWorker = {
246253
Inside `processMessage`, do `myWorkerController.enqueue.supplyMissingOrUpdatedDataWhileInTheAir(theData, { outOfOrder: true })`
247254
and hope for the best.
248255

256+
## Worker Task Cancellation
257+
258+
To fully understand, read [this topic](#cancellationsource) in the section about synchronization objects (coming up next), and also [this other topic about WorkItem class](#the-workitem-class).
259+
260+
To use a cancellation token, specify the `cancellable` option when the task is enqueued:
261+
262+
```typescript
263+
const workItem = myWorkerController.enqueue.doSomething(somePayload, { cancellable: true });
264+
...
265+
// If so needed, cancel at some point.
266+
workItem.cancel();
267+
...
268+
// At any point where the promise is awaited, an error will be thrown.
269+
try {
270+
const result = await workItem.promise;
271+
}
272+
catch (err: unknown) {
273+
if (err instanceof CancelledMessage) {
274+
// The task was cancelled.
275+
}
276+
}
277+
```
278+
279+
To learn about the implementation in the worker side, keep reading.
280+
249281
## Synchronization Objects
250282

251283
This package provides synchronization objects that use `Atomics` to cross the thread boundary safely.
@@ -286,7 +318,7 @@ is already simplified. Taking one line from the quickstart example:
286318
const defaultRunningTotalWorkItem = myWorkerController.enqueue.calculateRunningTotal(undefined, { cancellable: true });
287319
```
288320

289-
By adding the `cancellable` option, a cancellation token will be avilable to `calculateRunningTotal` in its third
321+
By adding the `cancellable` option, a cancellation token will be available to `calculateRunningTotal` in its third
290322
parameter, as seen in the previous section.
291323

292324
Whenever cancellation is desired, simply call the work item's `cancel()` method. For more information about this

src/workers/AsyncWorker.ts

Lines changed: 18 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -6,6 +6,23 @@ import { InternalWorker } from "./InternalWorker.js";
66
import { WorkItem } from "./WorkItem.js";
77
import { WorkItemInternal } from "./WorkItemInternal.js";
88

9+
/**
10+
* Determines if a worker-like object is a SharedWorker using duck typing.
11+
* @param worker The worker object to test
12+
* @returns true if the worker appears to be a SharedWorker, false otherwise
13+
*/
14+
function isSharedWorker(worker: any): worker is SharedWorker {
15+
// SharedWorker has a 'port' property (MessagePort) with the required methods
16+
return worker &&
17+
typeof worker === 'object' &&
18+
'port' in worker &&
19+
worker.port &&
20+
typeof worker.port === 'object' &&
21+
typeof worker.port.postMessage === 'function' &&
22+
typeof worker.port.addEventListener === 'function' &&
23+
typeof worker.port.removeEventListener === 'function';
24+
}
25+
926
export type EnqueueFn<Fn extends ((...args: any[]) => any) = (() => any)> =
1027
(payload: Fn extends () => any ? void : Parameters<Fn>[0], options?: QueueingOptions) => WorkItem<ReturnType<Fn>>;
1128

@@ -55,7 +72,7 @@ export class AsyncWorker<Tasks extends Record<string, (...args: any[]) => any>>
5572
#taskRunning;
5673
#enqueue;
5774
constructor(worker: Worker | SharedWorker, tasks: Tasks) {
58-
this.#iWorker = Object.getPrototypeOf(worker).name === 'Worker' ? new InternalWorker(worker as Worker) : new InternalSharedWorker(worker as SharedWorker);
75+
this.#iWorker = isSharedWorker(worker) ? new InternalSharedWorker(worker) : new InternalWorker(worker as Worker);
5976
this.#queue = new Queue<WorkItemInternal>();
6077
this.#taskRunning = false;
6178
this.#enqueue = Object.keys(tasks).reduce((prev, curr) => {

0 commit comments

Comments
 (0)