Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

ci: merge staging to master #41

Merged
merged 7 commits into from
Aug 18, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
30 changes: 0 additions & 30 deletions benches/dummy.ts

This file was deleted.

2 changes: 0 additions & 2 deletions benches/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -4,13 +4,11 @@ import fs from 'fs';
import path from 'path';
import si from 'systeminformation';
import Stream1KB from './stream_1KB';
// Import Dummy from './dummy';

async function main(): Promise<void> {
await fs.promises.mkdir(path.join(__dirname, 'results'), { recursive: true });
// Running benches
await Stream1KB();
// Await Dummy();
const resultFilenames = await fs.promises.readdir(
path.join(__dirname, 'results'),
);
Expand Down
36 changes: 36 additions & 0 deletions package-lock.json

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

4 changes: 3 additions & 1 deletion src/QUICConnection.ts
Original file line number Diff line number Diff line change
Expand Up @@ -895,8 +895,10 @@ class QUICConnection extends EventTarget {
this.dispatchEvent(
new events.QUICConnectionStreamEvent({ detail: quicStream }),
);
// No need to read after creation, doing so will throw during early cancellation
} else {
quicStream.read();
}
quicStream.read();
}
for (const streamId of this.conn.writable() as Iterable<StreamId>) {
const quicStream = this.streamMap.get(streamId);
Expand Down
91 changes: 56 additions & 35 deletions src/QUICStream.ts
Original file line number Diff line number Diff line change
Expand Up @@ -88,8 +88,10 @@ class QUICStream
// create Peer state.
try {
connection.conn.streamSend(streamId, new Uint8Array(0), false);
} catch {
// FIXME: If there is an error here then stream will not create? Maybe we should abort?
} catch (e) {
// We ignore any errors here, if this is a server side stream then state already exists.
// But it's possible for the stream to already be closed or have an error here.
// These errors will be handled by the QUICStream and not here.
}
const stream = new this({
streamId,
Expand Down Expand Up @@ -148,38 +150,44 @@ class QUICStream
const buf = Buffer.alloc(1024);
let recvLength: number, fin: boolean;
// Read messages until buffer is empty
while (true) {
try {
[recvLength, fin] = this.conn.streamRecv(this.streamId, buf);
} catch (e) {
this.logger.debug(`Stream recv reported: error ${e.message}`);
// Done means there is no more data to read
if (!this._recvClosed && e.message !== 'Done') {
const reason =
(await this.processSendStreamError(e, 'recv')) ?? e;
// If it is `StreamReset(u64)` error, then the peer has closed
// the stream, and we are receiving the error code
// If it is not a `StreamReset(u64)`, then something else broke,
// and we need to propagate the error up and down the stream
controller.error(reason);
await this.closeRecv(true, reason);
try {
[recvLength, fin] = this.conn.streamRecv(this.streamId, buf);
} catch (e) {
this.logger.debug(`Stream recv reported: error ${e.message}`);
// Done means there is no more data to read
if (!this._recvClosed && e.message !== 'Done') {
const reason =
(await this.processSendStreamError(e, 'recv')) ?? e;
// If it is `StreamReset(u64)` error, then the peer has closed
// the stream, and we are receiving the error code
// If it is not a `StreamReset(u64)`, then something else broke,
// and we need to propagate the error up and down the stream
controller.error(reason);
await this.closeRecv(true, reason);
// It is possible the stream was cancelled, let's check the writable state;
try {
this.conn.streamWritable(this.streamId, 0);
} catch (e) {
const match = e.message.match(/InvalidStreamState\((.+)\)/);
if (match == null) {
return never(
'Errors besides [InvalidStreamState(StreamId)] are not expected here',
);
}
this.writableController.error(reason);
}
break;
}
this.logger.debug(
`stream read ${recvLength} bytes with fin(${fin})`,
);
// Check and drop if we're already closed or message is 0-length message
if (!this._recvClosed && recvLength > 0) {
this.readableController.enqueue(buf.subarray(0, recvLength));
}
// If fin is true, then that means, the stream is CLOSED
if (fin) {
await this.closeRecv();
controller.close();
// Return out of the loop
break;
}
return;
}
this.logger.debug(`stream read ${recvLength} bytes with fin(${fin})`);
// Check and drop if we're already closed or message is 0-length message
if (!this._recvClosed && recvLength > 0) {
controller.enqueue(buf.subarray(0, recvLength));
}
// If fin is true, then that means, the stream is CLOSED
if (fin) {
await this.closeRecv();
controller.close();
}
},
cancel: async (reason) => {
Expand Down Expand Up @@ -319,8 +327,9 @@ class QUICStream
this.readableController.close();
}
} catch (e) {
// Ignore if done, not normally meant to happen but possible in rare cases
if (e.message !== 'Done') {
if (e.message === 'Done') {
never();
} else {
this.logger.debug(`Stream recv reported: error ${e.message}`);
if (!this._recvClosed) {
// Close stream in background
Expand All @@ -329,6 +338,18 @@ class QUICStream
(await this.processSendStreamError(e, 'recv')) ?? e;
this.readableController.error(reason);
await this.closeRecv(true, reason);
// It is possible the stream was cancelled, let's check the writable state;
try {
this.conn.streamWritable(this.streamId, 0);
} catch (e) {
const match = e.message.match(/InvalidStreamState\((.+)\)/);
if (match == null) {
return never(
'Errors besides [InvalidStreamState(StreamId)] are not expected here',
);
}
this.writableController.error(reason);
}
})();
}
}
Expand Down Expand Up @@ -488,7 +509,7 @@ class QUICStream
match = e.message.match(/InvalidStreamState\((.+)\)/);
if (match != null) {
// `InvalidStreamState()` returns the stream ID and not any actual error code
return await this.codeToReason(type, 0);
return never('Should never reach an [InvalidState(StreamId)] error');
}
return null;
}
Expand Down
4 changes: 2 additions & 2 deletions src/utils.ts
Original file line number Diff line number Diff line change
Expand Up @@ -335,8 +335,8 @@ function decodeConnectionId(connIdString: ConnectionIdString): ConnectionId {
return Buffer.from(connIdString, 'hex') as ConnectionId;
}

function never(): never {
throw new errors.ErrorQUICUndefinedBehaviour();
function never(message?: string): never {
throw new errors.ErrorQUICUndefinedBehaviour(message);
}

function certificateDERToPEM(der: Uint8Array): string {
Expand Down
46 changes: 35 additions & 11 deletions tests/QUICStream.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -791,12 +791,13 @@ describe(QUICStream.name, () => {
await client.destroy({ force: true });
await server.stop({ force: true });
});
test('streams can be cancelled', async () => {
test('streams can be cancelled after data sent', async () => {
const cancelReason = Symbol('CancelReason');
const connectionEventProm =
utils.promise<events.QUICServerConnectionEvent>();
const tlsConfig1 = await generateConfig(defaultType);
const tlsConfig2 = await generateConfig(defaultType);
const reasonConverters = testsUtils.createReasonConverters();
const server = new QUICServer({
crypto: {
key,
Expand All @@ -809,6 +810,7 @@ describe(QUICStream.name, () => {
verifyPeer: true,
ca: tlsConfig2.ca,
},
...reasonConverters,
});
testsUtils.extractSocket(server, sockets);
server.addEventListener(
Expand All @@ -831,6 +833,7 @@ describe(QUICStream.name, () => {
key: tlsConfig2.key,
cert: tlsConfig2.cert,
},
...reasonConverters,
});
testsUtils.extractSocket(client, sockets);
const conn = (await connectionEventProm.p).detail;
Expand All @@ -848,19 +851,28 @@ describe(QUICStream.name, () => {
const writer = clientStream.writable.getWriter();
await writer.write(message);
writer.releaseLock();
await serverStreamProm.p;
clientStream.cancel(cancelReason);
await expect(clientStream.readable.getReader().read()).rejects.toBe(
cancelReason,
);
await expect(clientStream.writable.getWriter().write()).rejects.toBe(
cancelReason,
);

// Let's check that the server side ended
const serverStream = await serverStreamProm.p;
await expect(
serverStream.readable.pipeTo(serverStream.writable),
).rejects.toThrow();
const serverReadProm = (async () => {
for await (const _ of serverStream.readable) {
// Just consume until stream throws
}
})();
await expect(serverReadProm).rejects.toBe(cancelReason);
const serverWriter = serverStream.writable.getWriter();
// Should throw
await expect(serverWriter.write(Buffer.from('hello'))).rejects.toBe(
cancelReason,
);

// And client stream should've cleaned up
await testsUtils.sleep(100);
expect(clientStream[destroyed]).toBeTrue();
Expand All @@ -873,6 +885,7 @@ describe(QUICStream.name, () => {
utils.promise<events.QUICServerConnectionEvent>();
const tlsConfig1 = await generateConfig(defaultType);
const tlsConfig2 = await generateConfig(defaultType);
const reasonConverters = testsUtils.createReasonConverters();
const server = new QUICServer({
crypto: {
key,
Expand All @@ -885,6 +898,7 @@ describe(QUICStream.name, () => {
verifyPeer: true,
ca: tlsConfig2.ca,
},
...reasonConverters,
});
testsUtils.extractSocket(server, sockets);
server.addEventListener(
Expand All @@ -907,6 +921,7 @@ describe(QUICStream.name, () => {
key: tlsConfig2.key,
cert: tlsConfig2.cert,
},
...reasonConverters,
});
testsUtils.extractSocket(client, sockets);
const conn = (await connectionEventProm.p).detail;
Expand All @@ -921,25 +936,34 @@ describe(QUICStream.name, () => {
// Let's make a new streams.
const clientStream = await client.connection.streamNew();
clientStream.cancel(cancelReason);

await expect(clientStream.readable.getReader().read()).rejects.toBe(
cancelReason,
);
await expect(clientStream.writable.getWriter().write()).rejects.toBe(
cancelReason,
);

// Let's check that the server side ended
const serverStream = await serverStreamProm.p;
await expect(
serverStream.readable.pipeTo(serverStream.writable),
).rejects.toThrow('recv 0');
const serverReadProm = (async () => {
for await (const _ of serverStream.readable) {
// Just consume until stream throws
}
})();
await expect(serverReadProm).rejects.toBe(cancelReason);
const serverWriter = serverStream.writable.getWriter();
// Should throw
await expect(serverWriter.write(Buffer.from('hello'))).rejects.toBe(
cancelReason,
);

// And client stream should've cleaned up
await testsUtils.sleep(100);
expect(clientStream[destroyed]).toBeTrue();
await client.destroy({ force: true });
await server.stop({ force: true });
});
test('Stream will end when waiting for more data', async () => {
test('stream will end when waiting for more data', async () => {
// Needed to check that the pull based reading of data doesn't break when we
// temporarily run out of data to read
const connectionEventProm =
Expand Down Expand Up @@ -1008,7 +1032,7 @@ describe(QUICStream.name, () => {
await client.destroy({ force: true });
await server.stop({ force: true });
});
test('Stream can error when blocked on data', async () => {
test('stream can error when blocked on data', async () => {
// This checks that if the readable web-stream is full and not pulling data,
// we will still respond to an error in the readable stream

Expand Down
Loading