From d7bc16968aed6a77df3f07abf990b93203a7e811 Mon Sep 17 00:00:00 2001 From: Mark Sujew Date: Tue, 24 Dec 2024 16:31:12 +0100 Subject: [PATCH] Fix file data streaming for long polling (#14659) --- .../src/common/remote-file-system-provider.ts | 66 +++++++++---------- 1 file changed, 32 insertions(+), 34 deletions(-) diff --git a/packages/filesystem/src/common/remote-file-system-provider.ts b/packages/filesystem/src/common/remote-file-system-provider.ts index 7035bc5460056..fa6704d0b2e7c 100644 --- a/packages/filesystem/src/common/remote-file-system-provider.ts +++ b/packages/filesystem/src/common/remote-file-system-provider.ts @@ -46,7 +46,7 @@ export interface RemoteFileSystemServer extends RpcServer; close(fd: number): Promise; read(fd: number, pos: number, length: number): Promise<{ bytes: Uint8Array; bytesRead: number; }>; - readFileStream(resource: string, opts: FileReadStreamOptions, token: CancellationToken): Promise; + readFileStream(resource: string, handle: number, opts: FileReadStreamOptions, token: CancellationToken): Promise; readFile(resource: string): Promise; write(fd: number, pos: number, data: Uint8Array, offset: number, length: number): Promise; writeFile(resource: string, content: Uint8Array, opts: FileWriteOptions): Promise; @@ -162,6 +162,8 @@ export class RemoteFileSystemProvider implements Required, D protected readonly readyDeferred = new Deferred(); readonly ready = this.readyDeferred.promise; + protected streamHandleSeq = 0; + /** * Wrapped remote filesystem. */ @@ -251,36 +253,35 @@ export class RemoteFileSystemProvider implements Required, D readFileStream(resource: URI, opts: FileReadStreamOptions, token: CancellationToken): ReadableStreamEvents { const capturedError = new Error(); - // eslint-disable-next-line @typescript-eslint/no-shadow - const stream = newWriteableStream(data => BinaryBuffer.concat(data.map(data => BinaryBuffer.wrap(data))).buffer); - this.server.readFileStream(resource.toString(), opts, token).then(streamHandle => { + const stream = newWriteableStream(data => BinaryBuffer.concat(data.map(item => BinaryBuffer.wrap(item))).buffer); + const streamHandle = this.streamHandleSeq++; + const toDispose = new DisposableCollection( + token.onCancellationRequested(() => stream.end(cancelled())), + this.onFileStreamData(([handle, data]) => { + if (streamHandle === handle) { + stream.write(data); + } + }), + this.onFileStreamEnd(([handle, error]) => { + if (streamHandle === handle) { + if (error) { + const code = ('code' in error && error.code) || FileSystemProviderErrorCode.Unknown; + const fileOperationError = new FileSystemProviderError(error.message, code); + fileOperationError.name = error.name; + const capturedStack = capturedError.stack || ''; + fileOperationError.stack = `${capturedStack}\nCaused by: ${error.stack}`; + stream.end(fileOperationError); + } else { + stream.end(); + } + } + }) + ); + stream.on('end', () => toDispose.dispose()); + this.server.readFileStream(resource.toString(), streamHandle, opts, token).then(() => { if (token.isCancellationRequested) { stream.end(cancelled()); - return; } - const toDispose = new DisposableCollection( - token.onCancellationRequested(() => stream.end(cancelled())), - this.onFileStreamData(([handle, data]) => { - if (streamHandle === handle) { - stream.write(data); - } - }), - this.onFileStreamEnd(([handle, error]) => { - if (streamHandle === handle) { - if (error) { - const code = ('code' in error && error.code) || FileSystemProviderErrorCode.Unknown; - const fileOperationError = new FileSystemProviderError(error.message, code); - fileOperationError.name = error.name; - const capturedStack = capturedError.stack || ''; - fileOperationError.stack = `${capturedStack}\nCaused by: ${error.stack}`; - stream.end(fileOperationError); - } else { - stream.end(); - } - } - }) - ); - stream.on('end', () => toDispose.dispose()); }, error => stream.end(error)); return stream; } @@ -528,11 +529,8 @@ export class FileSystemProviderServer implements RemoteFileSystemServer { } } - protected readFileStreamSeq = 0; - - async readFileStream(resource: string, opts: FileReadStreamOptions, token: CancellationToken): Promise { + async readFileStream(resource: string, handle: number, opts: FileReadStreamOptions, token: CancellationToken): Promise { if (hasFileReadStreamCapability(this.provider)) { - const handle = this.readFileStreamSeq++; const stream = this.provider.readFileStream(new URI(resource), opts, token); stream.on('data', data => this.client?.onFileStreamData(handle, data)); stream.on('error', error => { @@ -541,9 +539,9 @@ export class FileSystemProviderServer implements RemoteFileSystemServer { this.client?.onFileStreamEnd(handle, { code, name, message, stack }); }); stream.on('end', () => this.client?.onFileStreamEnd(handle, undefined)); - return handle; + } else { + throw new Error('not supported'); } - throw new Error('not supported'); } }