Skip to content

Commit

Permalink
Merge pull request #6 from sass/new-apis
Browse files Browse the repository at this point in the history
Add more generally useful functionality
  • Loading branch information
nex3 authored Nov 1, 2024
2 parents 13e5d5c + ecc296a commit 356c7fc
Show file tree
Hide file tree
Showing 4 changed files with 258 additions and 19 deletions.
12 changes: 12 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,12 @@
## 1.1.0

* Add `SyncMessagePort.receiveMessageIfAvailable()`.

* Add `timeout` and `timeoutValue` options to
`SyncMessagePort.receiveMessage()`.

* Add a `closedValue` option to `SyncMessagePort.receiveMessage()`.

## 1.0.0

* Initial release
168 changes: 166 additions & 2 deletions lib/index.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@ import * as fs from 'fs';
import * as p from 'path';
import {MessagePort, Worker} from 'worker_threads';

import {SyncMessagePort} from './index';
import {SyncMessagePort, TimeoutException} from './index';

describe('SyncMessagePort', () => {
describe('sends a message', () => {
Expand Down Expand Up @@ -73,6 +73,124 @@ describe('SyncMessagePort', () => {
});
});

describe('receiveMessageIfAvailable()', () => {
it('without a queued message', () => {
const channel = SyncMessagePort.createChannel();
const port = new SyncMessagePort(channel.port1);
expect(port.receiveMessageIfAvailable()).toBe(undefined);
port.close();
});

it('with a queued message', () => {
const channel = SyncMessagePort.createChannel();
const port1 = new SyncMessagePort(channel.port1);
const port2 = new SyncMessagePort(channel.port2);

port1.postMessage('done!');
expect(port2.receiveMessageIfAvailable()?.message).toBe('done!');
port1.close();
});

it('on a closed channel', () => {
const channel = SyncMessagePort.createChannel();
const port1 = new SyncMessagePort(channel.port1);
const port2 = new SyncMessagePort(channel.port2);

port1.close();
expect(port2.receiveMessageIfAvailable()).toBe(undefined);
});

it('bewteen receiving blocking messages', () => {
const channel = SyncMessagePort.createChannel();
const port = new SyncMessagePort(channel.port1);

spawnWorker(
`
// Wait a little bit just to make entirely sure that the parent thread
// is awaiting a message.
setTimeout(() => {
port.postMessage('first');
port.postMessage('second');
setTimeout(() => {
port.postMessage('third');
port.close();
}, 100);
}, 100);
`,
channel.port2,
);

expect(port.receiveMessage()).toEqual('first');
expect(port.receiveMessageIfAvailable()?.message).toEqual('second');
expect(port.receiveMessage()).toEqual('third');
});
});

describe('timeout', () => {
it("returns a value if it's already available", () => {
const channel = SyncMessagePort.createChannel();
const port1 = new SyncMessagePort(channel.port1);
const port2 = new SyncMessagePort(channel.port2);
port1.postMessage('message');
expect(port2.receiveMessage({timeout: 0})).toBe('message');
});

it('returns a value if it becomes available before the timeout', () => {
const channel = SyncMessagePort.createChannel();
const port = new SyncMessagePort(channel.port1);

spawnWorker(
`
port.postMessage('ready');
setTimeout(() => {
port.postMessage('message');
port.close();
}, 100);
`,
channel.port2,
);

expect(port.receiveMessage()).toEqual('ready');
expect(port.receiveMessage({timeout: 200})).toEqual('message');
});

it('throws an error if it times out before a value is available', () => {
const channel = SyncMessagePort.createChannel();
const port = new SyncMessagePort(channel.port1);
expect(() => port.receiveMessage({timeout: 0})).toThrow(TimeoutException);
});

it('returns timeoutValue if it times out before a value is available', () => {
const channel = SyncMessagePort.createChannel();
const port = new SyncMessagePort(channel.port1);
expect(port.receiveMessage({timeout: 0, timeoutValue: 'timed out'})).toBe(
'timed out',
);
});

it('throws an error if the channel closes before the request times out', () => {
const channel = SyncMessagePort.createChannel();
const port = new SyncMessagePort(channel.port1);

spawnWorker(
`
port.postMessage('ready');
setTimeout(() => {
port.close();
}, 100);
`,
channel.port2,
);

expect(port.receiveMessage()).toEqual('ready');
// timeoutValue shouldn't take precedence over this error
expect(() =>
port.receiveMessage({timeout: 10000, timeoutValue: 'timed out'}),
).toThrow();
});
});

describe('with an asynchronous listener', () => {
it('receives a message sent before listening', async () => {
const channel = SyncMessagePort.createChannel();
Expand Down Expand Up @@ -127,7 +245,7 @@ describe('SyncMessagePort', () => {
await new Promise(resolve => port2.once('close', resolve));
});

it('receiveMessage() throws an error for a closed port', () => {
it("receiveMessage() throws an error for a port that's already closed", () => {
const channel = SyncMessagePort.createChannel();
const port1 = new SyncMessagePort(channel.port1);
const port2 = new SyncMessagePort(channel.port2);
Expand All @@ -136,6 +254,52 @@ describe('SyncMessagePort', () => {
expect(port1.receiveMessage).toThrow();
expect(port2.receiveMessage).toThrow();
});

it('receiveMessage() throws an error when a port closes', () => {
const channel = SyncMessagePort.createChannel();
const port = new SyncMessagePort(channel.port1);

spawnWorker(
`
setTimeout(() => {
port.close();
}, 100);
`,
channel.port2,
);

expect(port.receiveMessage).toThrow();
});

it(
"receiveMessage() returns option.closedValue for a port that's " +
'already closed',
() => {
const channel = SyncMessagePort.createChannel();
const port1 = new SyncMessagePort(channel.port1);
const port2 = new SyncMessagePort(channel.port2);

port1.close();
expect(port1.receiveMessage({closedValue: 'closed'})).toBe('closed');
expect(port2.receiveMessage({closedValue: 'closed'})).toBe('closed');
},
);

it('receiveMessage() throws an error when a port closes', () => {
const channel = SyncMessagePort.createChannel();
const port = new SyncMessagePort(channel.port1);

spawnWorker(
`
setTimeout(() => {
port.close();
}, 100);
`,
channel.port2,
);

expect(port.receiveMessage({closedValue: 'closed'})).toBe('closed');
});
});
});

Expand Down
95 changes: 79 additions & 16 deletions lib/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ import {
MessageChannel,
MessagePort,
TransferListItem,
Worker,

Check warning on line 11 in lib/index.ts

View workflow job for this annotation

GitHub Actions / Static analysis

'Worker' is defined but never used
receiveMessageOnPort,
} from 'worker_threads';

Expand Down Expand Up @@ -36,6 +37,40 @@ enum BufferState {
Closed = 0b10,
}

/**
* Options that can be passed to {@link SyncMessagePort.receiveMessage}.
*/
export interface ReceiveMessageOptions {
/**
* The time (in milliseconds) to wait for a message before returning {@link
* timeoutValue} (if set) or throwing a [TimeoutException] otherwise.
*/
timeout?: number;

/**
* If a message isn't received within {@link timeout} milliseconds, this value
* is returned. Ignored if {@link timeout} is not set.
*/
timeoutValue?: unknown;

/**
* If the underlying channel is closed before calling {@link
* SyncMessagePort.receiveMessage} or while a call is pending, return this
* value.
*/
closedValue?: unknown;
}

/**
* An exception thrown by {@link SyncMessagePort.receiveMessage} if a message
* isn't received within {@link ReceivedMessageOptions.timeout} milliseconds.
*/
export class TimeoutException extends Error {
constructor(message: string) {
super(message);
}
}

/**
* A communication port that can receive messages synchronously from another
* `SyncMessagePort`.
Expand Down Expand Up @@ -110,20 +145,36 @@ export class SyncMessagePort extends EventEmitter {
}
}

// TODO(nex3):
// * Add a non-blocking `receiveMessage()`
// * Add a timeout option to `receiveMessage()`
// * Add an option to `receiveMessage()` to return a special value if the
// channel is closed.
/**
* Returns the message sent by the other port, if one is available. This *does
* not* block, and will return `undefined` immediately if no message is
* available. In order to distinguish between a message with value `undefined`
* and no message, a message is return in an object with a `message` field.
*
* This may not be called while this has a listener for the `'message'` event.
* It does *not* throw an error if the port is closed when this is called;
* instead, it just returns `undefined`.
*/
receiveMessageIfAvailable(): {message: unknown} | undefined {
if (this.listenerCount('message')) {
throw new Error(
'SyncMessageChannel.receiveMessageIfAvailable() may not be called ' +
'while there are message listeners.',
);
}

return receiveMessageOnPort(this.port);
}

/**
* Blocks and returns the next message sent by the other port.
*
* This may not be called while this has a listener for the `'message'` event.
* Throws an error if the channel is closed, including if it closes while this
* is waiting for a message.
* is waiting for a message, unless {@link ReceiveMessageOptions.closedValue}
* is passed.
*/
receiveMessage(): unknown {
receiveMessage(options?: ReceiveMessageOptions): unknown {
if (this.listenerCount('message')) {
throw new Error(
'SyncMessageChannel.receiveMessage() may not be called while there ' +
Expand All @@ -136,14 +187,14 @@ export class SyncMessagePort extends EventEmitter {
// `receiveMessageOnPort` and the call to `Atomics.wait()`, we won't
// overwrite it. Use `Atomics.compareExchange` so that we don't overwrite
// the "closed" state.
if (
Atomics.compareExchange(
this.buffer,
0,
BufferState.MessageSent,
BufferState.AwaitingMessage,
) === BufferState.Closed
) {
const previousState = Atomics.compareExchange(
this.buffer,
0,
BufferState.MessageSent,
BufferState.AwaitingMessage,
);
if (previousState === BufferState.Closed) {
if (options && 'closedValue' in options) return options.closedValue;
throw new Error("The SyncMessagePort's channel is closed.");
}

Expand All @@ -153,20 +204,32 @@ export class SyncMessagePort extends EventEmitter {
// If there's no new message, wait for the other port to flip the "new
// message" indicator to 1. If it's been set to 1 since we stored 0, this
// will terminate immediately.
Atomics.wait(this.buffer, 0, BufferState.AwaitingMessage);
const result = Atomics.wait(
this.buffer,
0,
BufferState.AwaitingMessage,
options?.timeout,
);
message = receiveMessageOnPort(this.port);
if (message) return message.message;

if (result === 'timed-out') {
if ('timeoutValue' in options!) return options.timeoutValue;
throw new TimeoutException('SyncMessagePort.receiveMessage() timed out.');
}

// Update the state to 0b10 after the last message is consumed.
const oldState = Atomics.and(this.buffer, 0, BufferState.Closed);
// Assert the old state was either 0b10 or 0b11.
assert.equal(oldState & BufferState.Closed, BufferState.Closed);
if (options && 'closedValue' in options) return options.closedValue;
throw new Error("The SyncMessagePort's channel is closed.");
}

/** See `MessagePort.close()`. */
close(): void {
Atomics.or(this.buffer, 0, BufferState.Closed);
Atomics.notify(this.buffer, 0);
this.port.close();
}
}
2 changes: 1 addition & 1 deletion package.json
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
{
"name": "sync-message-port",
"version": "1.0.0",
"version": "1.1.0",
"description": "A Node.js communication port that can pass messages synchronously between workers",
"repository": "sass/sync-message-port",
"author": "Google Inc.",
Expand Down

0 comments on commit 356c7fc

Please sign in to comment.