Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -3,3 +3,4 @@
node_modules
dist
lcov.info
tsconfig.tsbuildinfo
1 change: 1 addition & 0 deletions eslint.config.js
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ module.exports = require('neostandard')({
semi: true,
ts: true,
noStyle: true,
noJsx: true,
ignores: ['dist', 'node_modules', 'docs/build', 'docs/.docusaurus'],
globals: {
SharedArrayBuffer: true,
Expand Down
1 change: 1 addition & 0 deletions package.json
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@
},
"scripts": {
"build": "tsc && gen-esm-wrapper . dist/esm-wrapper.mjs",
"build:clean": "rm -rf ./dist tsconfig.tsbuildinfo",
"lint": "eslint && tsc --noEmit && tsc --project test/tsconfig.json --noEmit",
"test": "node scripts/run-tests.js --pattern='test/**/*test.ts'",
"test:ci": "npm run build && npm run lint && npm run test:coverage",
Expand Down
12 changes: 11 additions & 1 deletion scripts/run-tests.js
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,11 @@ const options = {
short: 'o',
description: 'Run only tests marked with { only: true }',
default: false,
},
verbose: {
type: 'boolean',
description: 'Enable verbose output',
default: false,
}
};

Expand All @@ -36,9 +41,13 @@ const {
const pattern = values.pattern;
const isCoverage = values.coverage;
const runOnly = values.only;
const log = values.verbose ? console.log : () => {};

const testFiles = globSync(pattern, { absolute: true });

log(`Running tests with pattern: ${pattern}`);
log(`-- Files: ${testFiles.join(', ')}`);

const args = [
'--enable-source-maps',
'--import=tsx',
Expand All @@ -47,8 +56,9 @@ const args = [
];


let result;
log(`Args: \t\n${args.join('\t\n')}`);

let result;
// we skip coverage for node 20
// because this issuse happen https://github.com/nodejs/node/pull/53315
if (isCoverage && !process.version.startsWith('v20.')) {
Expand Down
2 changes: 0 additions & 2 deletions src/abort.ts
Original file line number Diff line number Diff line change
Expand Up @@ -24,8 +24,6 @@ export type AbortSignalAny = AbortSignalEventTarget | AbortSignalEventEmitter;

export class AbortError extends Error {
constructor (reason?: AbortSignalEventTarget['reason']) {
// TS does not recognizes the cause clause
// @ts-expect-error
super('The task has been aborted', { cause: reason });
}

Expand Down
68 changes: 45 additions & 23 deletions src/common.ts
Original file line number Diff line number Diff line change
@@ -1,10 +1,10 @@
import { fileURLToPath, URL } from 'node:url';
import { availableParallelism } from 'node:os';
import { fileURLToPath, URL } from "node:url";
import { availableParallelism } from "node:os";

import { kMovable, kTransferable, kValue } from './symbols';
import { kMovable, kTransferable, kValue } from "./symbols";

// States wether the worker is ready to receive tasks
export const READY = '_WORKER_READY';
export const READY = "_WORKER_READY";

/**
* True if the object implements the Transferable interface
Expand All @@ -13,10 +13,10 @@ export const READY = '_WORKER_READY';
* @param {unknown} value
* @return {*} {boolean}
*/
export function isTransferable (value: unknown): boolean {
export function isTransferable(value: unknown): boolean {
return (
value != null &&
typeof value === 'object' &&
typeof value === "object" &&
kTransferable in value &&
kValue in value
);
Expand All @@ -31,49 +31,71 @@ export function isTransferable (value: unknown): boolean {
* @param {(unknown & PiscinaMovable)} value
* @return {*} {boolean}
*/
export function isMovable (value: any): boolean {
export function isMovable(value: any): boolean {
return isTransferable(value) && value[kMovable] === true;
}

export function markMovable (value: {}): void {
export function markMovable(value: {}): void {
Object.defineProperty(value, kMovable, {
enumerable: false,
configurable: true,
writable: true,
value: true
value: true,
});
}

// State of Piscina pool
export const commonState = {
isWorkerThread: false,
workerData: undefined
workerData: undefined,
};

export function maybeFileURLToPath (filename : string) : string {
return filename.startsWith('file:')
export function maybeFileURLToPath(filename: string): string {
return filename.startsWith("file:")
? fileURLToPath(new URL(filename))
: filename;
}

export function getAvailableParallelism () : number {
export function getAvailableParallelism(): number {
return availableParallelism();
}

export function promiseResolvers <T = any> () :
{ promise: Promise<T>,
resolve: (res: T) => void,
reject: (err: Error) => void
} {
export function promiseResolvers<T = any>(): {
promise: Promise<T>;
resolve: (res: T) => void;
reject: (err: Error) => void;
} {
// @ts-expect-error - available from v24 onwards
if (Promise.withResolvers != null) return Promise.withResolvers();

let res: (res: T) => void;
let rej: (err: Error) => void
let rej: (err: Error) => void;

return {
promise: new Promise<T>((resolve, reject) => { res = resolve; rej = reject; } ),
promise: new Promise<T>((resolve, reject) => {
res = resolve;
rej = reject;
}),
resolve: res!,
reject: rej!
}
}
reject: rej!,
};
}

// Ring Buffer
export const RING_BUFFER_INDEXES = {
READ_INDEX: 0,
WRITE_INDEX: 1,
STATUS_INDEX: 2,
};

export const RING_BUFFER_STATUSES = {
PAUSED: 0,
RESUME: 1,
ENDED: 2,
ERRORED: 3, // ?
};

export const GeneratorFunctionConstructor = (function*(){}).constructor.name;
export const AsyncGeneratorConstructor = (async function*(){}).constructor.name;
export const AsyncFunctionConstructor = (async function(){}).constructor.name;
export const FunctionConstructor = (function(){}).constructor.name;
71 changes: 51 additions & 20 deletions src/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -100,14 +100,16 @@ interface RunOptions<Func = string> {
transferList? : TransferList,
filename? : string | null,
signal? : AbortSignalAny | null,
name? : Func | null
name? : Func | null,
bufferSize?: number | null
}

interface FilledRunOptions<Func extends string = string> extends RunOptions<Func> {
transferList : TransferList | never,
filename : string | null,
signal : AbortSignalAny | null,
name : Func | null
name : Func | null,
bufferSize?: number | null
}

interface CloseOptions {
Expand Down Expand Up @@ -135,7 +137,8 @@ const kDefaultRunOptions : FilledRunOptions = {
transferList: undefined,
filename: null,
signal: null,
name: null
name: null,
bufferSize: null,
};

const kDefaultCloseOptions : Required<CloseOptions> = {
Expand Down Expand Up @@ -285,20 +288,25 @@ class ThreadPool {
this.workers.add(workerInfo);

function onMessage (this: ThreadPool, message : ResponseMessage) {
const { taskId, result } = message;
const { taskId, result, done } = message;
// In case of success: Call the callback that was passed to `runTask`,
// remove the `TaskInfo` associated with the Worker, which marks it as
// free again.
const taskInfo = workerInfo.popTask(taskId);
this.workers.taskDone(workerInfo);
const taskInfo = done === 0 ? workerInfo.getTask(taskId) : workerInfo.popTask(taskId);


if (taskInfo == null) { /* c8 ignore next */
const err = new Error(
`Unexpected message from Worker: ${inspect(message)}`);
this.publicInterface.emit('error', err);
this._processPendingMessages();
return;
}

if (done === 1) {
this.workers.taskDone(workerInfo);
taskInfo!.done(message.error, result);
} else {
taskInfo.done(message.error, result);
taskInfo!.handleResponse(message);
}

this._processPendingMessages();
Expand Down Expand Up @@ -471,13 +479,15 @@ class ThreadPool {
options : RunOptions<string>) : Promise<any> {
let {
filename,
name
name,
bufferSize
} = options;
const {
transferList = []
} = options;
filename = filename ?? this.options.filename;
name = name ?? this.options.name;
bufferSize = bufferSize ?? 1024 * 1024;

if (typeof filename !== 'string') {
return Promise.reject(Errors.FilenameNotProvided());
Expand All @@ -501,21 +511,30 @@ class ThreadPool {
transferList,
filename,
name,
bufferSize,
abortSignal: signal,
triggerAsyncId: this.publicInterface.asyncResource.asyncId()
},
(err : Error | null, result : any) => {
this.completed++;
if (taskInfo.started) {
this.histogram?.recordRunTime(performance.now() - taskInfo.started);
}
if (err !== null) {
reject(err);
} else {
resolve(result);
}
queueMicrotask(this._maybeDrain.bind(this))
this.completed++;
if (taskInfo.started) {
this.histogram?.recordRunTime(performance.now() - taskInfo.started);
}

// Promise already resolved, this is for statistics only
if (taskInfo.redeable != null) return;

queueMicrotask(this._maybeDrain.bind(this))
if (err !== null) {
reject(err);
} else {
resolve(result);
}
},
// Until now, we just assume this is a streamed response and we jump into handling the chunks as stream
(err) => {
if (err != null) reject(err);
else resolve(taskInfo.redeable);
});

if (signal != null) {
Expand Down Expand Up @@ -782,7 +801,8 @@ export default class Piscina<Exports extends Record<string, (payload: any) => an
transferList,
filename,
name,
signal
signal,
bufferSize
} = options;
if (transferList !== undefined && !Array.isArray(transferList)) {
return Promise.reject(
Expand All @@ -799,6 +819,17 @@ export default class Piscina<Exports extends Record<string, (payload: any) => an
return Promise.reject(
new TypeError('signal argument must be an object'));
}
if (bufferSize != null &&
(
typeof bufferSize !== 'number'||
!Number.isInteger(bufferSize) ||
!Number.isFinite(bufferSize) ||
bufferSize <= 0
)
) {
return Promise.reject(
new TypeError('bufferSize argument must be a finite integer'));
}

return this.#pool.runTask(task, { transferList, filename, name, signal });
}
Expand Down
Loading
Loading