Skip to content

Commit

Permalink
fix(ext/node): emit online event after worker thread is initialized (
Browse files Browse the repository at this point in the history
…#25243)

Fixes #23281. Part of #20613.

We were emitting the `online` event in the constructor, so the caller
could never receive it (since there was no time for them to add a
listener). Instead, emit the event where it's intended – after the
worker is initialized.

---

After this parcel no longer freezes, but still will fail due to other
bugs (which will be fixed in other PRs)
  • Loading branch information
nathanwhit committed Aug 28, 2024
1 parent 3dba985 commit 511d13a
Show file tree
Hide file tree
Showing 2 changed files with 65 additions and 3 deletions.
37 changes: 34 additions & 3 deletions ext/node/polyfills/worker_threads.ts
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,16 @@ function debugWT(...args) {
}
}

interface WorkerOnlineMsg {
type: "WORKER_ONLINE";
}

function isWorkerOnlineMsg(data: unknown): data is WorkerOnlineMsg {
return typeof data === "object" && data !== null &&
ObjectHasOwn(data, "type") &&
(data as { "type": unknown })["type"] === "WORKER_ONLINE";
}

export interface WorkerOptions {
// only for typings
argv?: unknown[];
Expand Down Expand Up @@ -81,6 +91,7 @@ class NodeWorker extends EventEmitter {
#refCount = 1;
#messagePromise = undefined;
#controlPromise = undefined;
#workerOnline = false;
// "RUNNING" | "CLOSED" | "TERMINATED"
// "TERMINATED" means that any controls or messages received will be
// discarded. "CLOSED" means that we have received a control
Expand Down Expand Up @@ -141,6 +152,7 @@ class NodeWorker extends EventEmitter {
workerData: options?.workerData,
environmentData: environmentData,
env: env_,
isWorkerThread: true,
}, options?.transferList ?? []);
const id = op_create_worker(
{
Expand All @@ -159,8 +171,6 @@ class NodeWorker extends EventEmitter {
this.threadId = id;
this.#pollControl();
this.#pollMessages();
// https://nodejs.org/api/worker_threads.html#event-online
this.emit("online");
}

[privateWorkerRef](ref) {
Expand Down Expand Up @@ -243,7 +253,17 @@ class NodeWorker extends EventEmitter {
this.emit("messageerror", err);
return;
}
this.emit("message", message);
if (
// only emit "online" event once, and since the message
// has to come before user messages, we are safe to assume
// it came from us
!this.#workerOnline && isWorkerOnlineMsg(message)
) {
this.#workerOnline = true;
this.emit("online");
} else {
this.emit("message", message);
}
}
};

Expand Down Expand Up @@ -358,10 +378,12 @@ internals.__initWorkerThreads = (

parentPort = globalThis as ParentPort;
threadId = workerId;
let isWorkerThread = false;
if (maybeWorkerMetadata) {
const { 0: metadata, 1: _ } = maybeWorkerMetadata;
workerData = metadata.workerData;
environmentData = metadata.environmentData;
isWorkerThread = metadata.isWorkerThread;
const env = metadata.env;
if (env) {
process.env = env;
Expand Down Expand Up @@ -425,6 +447,15 @@ internals.__initWorkerThreads = (
parentPort.ref = () => {
parentPort[unrefPollForMessages] = false;
};

if (isWorkerThread) {
// Notify the host that the worker is online
parentPort.postMessage(
{
type: "WORKER_ONLINE",
} satisfies WorkerOnlineMsg,
);
}
}
};

Expand Down
31 changes: 31 additions & 0 deletions tests/unit_node/worker_threads_test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -590,3 +590,34 @@ Deno.test({
channel.port2.close();
},
});

Deno.test({
name: "[node/worker_threads] Emits online event",
async fn() {
const worker = new workerThreads.Worker(
`
import { parentPort } from "node:worker_threads";
const p = Promise.withResolvers();
let ok = false;
parentPort.on("message", () => {
ok = true;
p.resolve();
});
await Promise.race([p.promise, new Promise(resolve => setTimeout(resolve, 20000))]);
if (ok) {
parentPort.postMessage("ok");
} else {
parentPort.postMessage("timed out");
}
`,
{
eval: true,
},
);
worker.on("online", () => {
worker.postMessage("ok");
});
assertEquals((await once(worker, "message"))[0], "ok");
worker.terminate();
},
});

0 comments on commit 511d13a

Please sign in to comment.