How to implement a through stream #527
Comments
Not sure I understand — you mean something like this? (No packages; this is pseudocode, not an implementation, just to understand the ask.)

    myStream.compose(async function* (s) {
      let currentReadable = new Readable();
      for await (const chunk of s) {
        let delimiter = chunk.indexOf("\n");
        if (delimiter !== -1) {
          currentReadable.push(chunk.slice(0, delimiter));
          yield currentReadable;
          currentReadable = new Readable();
          currentReadable.push(chunk.slice(delimiter + 1));
        } else {
          currentReadable.push(chunk);
        }
      }
      yield currentReadable;
    }).forEach((readable) => {
      readable.pipe(somewhereDependingOnWhateverLogicYouHave);
    });
Almost. I don't want to mix my "WhateverLogicYouHave" with this particular code. Instead I'm supplying a handleOutputStream callback (see below). I'm not sure if it's necessary, but in order to be able to write to outputStream as well as pipe it to other places I made it a through stream. The through stream looks like this:

    class MyThrough extends Transform {
      constructor(options) {
        super(options);
      }
      _transform(chunk, encoding, cb) {
        // No transformation, just pass through data
        this.push(chunk);
        cb();
      }
    }

Also, I'm not awaiting anything. Instead I'm using the data event like this:

    child.stdout.on('data', (chunk) => {
      const delimiterIndex = chunk.indexOf(fileDelimiter, 0, 'binary');
      if (delimiterIndex > -1) {
        outputStream.end();
        fileIndex++;
        outputStream = new MyThrough();
        handleOutputStream({ fileIndex, outputStream });
        chunk = chunk.slice(
          delimiterIndex + fileDelimiter.length + 1,
          chunk.length
        );
      }
      outputStream.write(chunk);
    });
    const { Transform } = require('readable-stream')

    const stream = new Transform({
      transform (chunk, encoding, cb) {
        // ..
      }
    })

Which is equivalent to:

    const stream = through2(function (chunk, encoding, cb) {
      // ..
    })
Thanks @vweevers. I do wonder, though, what your transform body (the // .. part) would look like fully written out.
I just intended to show that the body of the function is equal between the two code snippets. Fully written out, it's:

    const { Transform } = require('readable-stream')

    const stream = new Transform({
      transform (chunk, encoding, cb) {
        cb(null, chunk)
      }
    })

If you need exactly that, then you can also use the PassThrough utility:

    const { PassThrough } = require('readable-stream')

    const stream = new PassThrough()
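A PassThrough can be written to and piped like any other duplex stream. A minimal usage sketch (the destination file name is just for illustration):

    const { PassThrough } = require('readable-stream')
    const fs = require('fs')

    const stream = new PassThrough()

    // Pipe it wherever the data should end up (destination is illustrative).
    stream.pipe(fs.createWriteStream('out.bin'))

    // ...and write chunks into it as they arrive.
    stream.write(Buffer.from('some data'))
    stream.end()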
I tested the code and fixed it. Here is a "native" solution; it can be simplified much further, but it has the advantage of only reading the "files" as they are needed.

    const { Readable, PassThrough } = require("stream");

    const arr = Uint8Array.from([
      ...(Array(10000).fill(1)),
      10,
      ...(Array(10000).fill(2)),
      10,
      ...(Array(10000).fill(3))
    ]);

    let streams = Readable.from([arr, arr], { objectMode: false }).compose(async function* (stream) {
      let currentReadable = new PassThrough();
      for await (let chunk of stream) {
        let delimiter = chunk.indexOf(10);
        let found = delimiter !== -1;
        while (delimiter !== -1) {
          currentReadable.push(chunk.slice(0, delimiter));
          yield currentReadable;
          chunk = chunk.slice(delimiter + 1);
          currentReadable = new PassThrough();
          currentReadable.push(chunk);
          delimiter = chunk.indexOf("\n");
        }
        if (!found) {
          currentReadable.push(chunk);
        }
      }
      yield currentReadable;
    });

    // sometime later
    streams.forEach(async s => {
      console.log("Got stream, do whatever with it, pass it to wherever");
    });

This can be made faster, and the while loop can likely be simplified a lot.
@benjamingr As I'm only looking for a specific file delimiter, I switched your last indexOf to search for that delimiter instead of a newline. Other than that, the code works great. Next step is to try to clone those streams. I had trouble with this the last time I tried (mcollina/cloneable-readable#44).

@vweevers Thank you for pointing out that PassThrough is available as a util in readable-stream. I didn't find this in the docs, so I assumed it was not there.
We are creating these new streams in our "stream of streams"; why do you need to further clone them?
Each stream is a separate file. Example: the two files extracted from delimitedFileStream will end up in three writeStreams each. Resizing is performed with Sharp (https://github.com/lovell/sharp).
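Roughly what I'm aiming for per extracted file, as a sketch (the sizes, file names and the handleFileStream helper are made up for illustration):

    const cloneable = require('cloneable-readable');
    const sharp = require('sharp');
    const fs = require('fs');

    // One extracted file stream fans out into three write streams.
    function handleFileStream(fileStream, index) {
      const original = cloneable(fileStream);
      // Clones must be created before the original starts flowing.
      const small = original.clone();
      const medium = original.clone();

      original.pipe(fs.createWriteStream(`file_${index}_full.jpg`));
      small.pipe(sharp().resize(200)).pipe(fs.createWriteStream(`file_${index}_small.jpg`));
      medium.pipe(sharp().resize(800)).pipe(fs.createWriteStream(`file_${index}_medium.jpg`));
    }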
Is there any "best practice" on how to get hold of and marchal error code out of a stream generator as above? My original stream is stdout from a child process. I need to capture both the exit code and stderr from the child process and somehow marchal these to the caller. I tried to wrap the code within another promise, but I don't think async function pdfiumCommand({ command, input, options = {} }) {
try {
return new Promise((resolve, reject) => {
const child = spawnPdfiumProcess({ command, input, options });
let fileStreams = child.stdout.compose(async function* (stream) {
let currentReadable = new PassThrough();
for await (let chunk of stream) {
let delimiterIndex = chunk.indexOf(FILE_DELIMITER, 0, 'binary');
while (delimiterIndex !== -1) {
currentReadable.push(chunk.slice(0, delimiterIndex));
yield currentReadable;
chunk = chunk.slice(delimiterIndex + FILE_DELIMITER.length + 1);
currentReadable = new PassThrough();
currentReadable.push(chunk);
delimiterIndex = chunk.indexOf(FILE_DELIMITER, 0, 'binary');
}
if (delimiterIndex === -1) {
currentReadable.push(chunk);
}
}
yield currentReadable;
});
let stderr = '';
// child.stderr.on('data', (data) => {
// stderr += data.toString();
// });
child.on('close', (code) => {
resolve({
fileStreams,
stderr,
code,
});
});
});
} catch (e) {
reject(new Error(code || 'No error code returned from "pdfium"'));
}
} edit: |
Frustrating... I'm not able to catch my error. I probably lack understanding of error handling with generators and/or streams. My last attempt was to throw an error if the exit code from the child process is not 0:

    async function pdfiumCommand({ command, input, options = {} }) {
      const child = spawnPdfiumProcess({ command, input, options });
      let fileStreams = child.stdout.compose(async function* (stream) {
        let currentReadable = new PassThrough();
        for await (let chunk of stream) {
          let delimiterIndex = chunk.indexOf(FILE_DELIMITER, 0, 'binary');
          while (delimiterIndex !== -1) {
            currentReadable.push(chunk.slice(0, delimiterIndex));
            yield currentReadable;
            chunk = chunk.slice(delimiterIndex + FILE_DELIMITER.length + 1);
            currentReadable = new PassThrough();
            currentReadable.push(chunk);
            delimiterIndex = chunk.indexOf(FILE_DELIMITER, 0, 'binary');
          }
          if (delimiterIndex === -1) {
            currentReadable.push(chunk);
          }
        }
        yield currentReadable;
      });
      child.on('close', (code) => {
        if (code !== 0) {
          throw new Error('PDFium error: ' + code);
        }
      });
      return fileStreams;
    }

My application crashes and I can see the correct code in the crash log. But I'm not able to catch the error. I even tried catching on the consuming side:

    const fileStreams = await pdfiumExplode2ToStream({
      input,
      options,
    });
    let i = 0;
    fileStreams.forEach(async (fileStream) => {
      i++;
      console.log('Processing file ' + i);
      const outputPath = `../test/tmp`;
      const outputPdf = fs.createWriteStream(`${outputPath}/file_${i}.pdf`);
      try {
        pump(fileStream, outputPdf, function (err) {
          if (err) {
            console.log('error from pump callback');
            console.log(err);
          } else {
            console.log('Pump finished');
          }
        });
      } catch (err) {
        console.log('error from try-catch');
        console.log(err);
      }
    });
compose creates a stream, and my code returns a stream of streams (they're already copied); there is no need to wrap it in a promise.
I understood that. I just tried to solve my error reporting problem with an outer promise. This was a bad idea. I also tried to throw an error (my last post), but I can't find any way to catch this error.
How is your child process signaling it had an error? Does it terminate with a different exit code? Write to stderr? Processes don't uniformly distinguish "I exited early" from "I exited with an error" from "I finished".
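For reference, both signals can be observed with the standard child_process API. A minimal sketch (the command is illustrative):

    const { spawn } = require('child_process');

    // Illustrative command; substitute the real pdfium invocation.
    const child = spawn('some-command', ['--some-flag']);

    let stderr = '';
    child.stderr.on('data', (data) => {
      stderr += data.toString();
    });

    child.on('close', (code) => {
      // `code` is the exit status; non-zero conventionally means failure,
      // and `stderr` holds whatever the process wrote to its error output.
      if (code !== 0) {
        console.error(`process failed with code ${code}: ${stderr}`);
      }
    });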
Anyway, you can destroy the stream with an error.
I'm getting both a non-zero exit code and a message on stderr if something goes wrong in the child process. But I'm not able to propagate the error up the chain. I tried this, but it didn't work:

    child.on('close', (code) => {
      if (code !== 0) {
        fileStreams.destroy(new Error('PDFium error: ' + code));
      }
    });
I also tried to destroy each inner stream:

    child.on('close', (code) => {
      if (code !== 0) {
        fileStreams.forEach(async (fileStream) => {
          fileStream.destroy(new Error('PDFium error: ' + code));
        });
        fileStreams.destroy(new Error('PDFium error: ' + code));
      }
    });
I found one (ugly) solution. It is not neat, but at least I can get hold of the error message from stderr: the first stream in the composed "bundle" is now stderr. I'm sure there must be better ways to solve this.

    let errorStream = new PassThrough();

    let fileStreams = child.stdout.compose(async function* (stream) {
      yield errorStream;
      let currentReadable = new PassThrough();
      for await (let chunk of stream) {
        let delimiterIndex = chunk.indexOf(FILE_DELIMITER, 0, 'binary');
        while (delimiterIndex !== -1) {
          currentReadable.push(chunk.slice(0, delimiterIndex));
          yield currentReadable;
          chunk = chunk.slice(delimiterIndex + FILE_DELIMITER.length + 1);
          currentReadable = new PassThrough();
          currentReadable.push(chunk);
          delimiterIndex = chunk.indexOf(FILE_DELIMITER, 0, 'binary');
        }
        if (delimiterIndex === -1) {
          currentReadable.push(chunk);
        }
      }
      yield currentReadable;
    });

    child.stderr.on('data', (data) => {
      errorStream.push(data);
    });
@benjamingr In my example I have only one file in my composed file streams. The following works fine:

    jpgFileStream.forEach(async (fileStream) => {
      const outFile = fs.createWriteStream(`../test/tmp/fromGenerator.jpg`);
      fileStream.pipe(outFile);
    });

As soon as I introduce a transform stream (sharp) I get empty files in the output:

    jpgFileStream.forEach(async (fileStream) => {
      const outFile = fs.createWriteStream(`../test/tmp/fromGenerator.jpg`);
      const sharpPipeline = sharp().rotate(90);
      fileStream.pipe(sharpPipeline).pipe(outFile);
    });

Sometimes I get "cut off" jpeg files, which leads me to believe that the streams have already started flowing. The sharpPipeline works fine if I get the input directly from a readable file stream:

    const sharpPipeline = sharp().rotate(90);
    const inFile = fs.createReadStream('../test/resources/test.jpg');
    const outFile = fs.createWriteStream(`../test/tmp/fromLocalDisk.jpg`);
    inFile.pipe(sharpPipeline).pipe(outFile);
Update: Whenever my child process takes a bit longer to produce the stream (i.e. more complicated processing), the destination writeStream seems to end prematurely, i.e. I get empty or sometimes half files written to disk. It's almost as if the inner streams start flowing before I have attached my consumers.

I went back to my original solution and tried the exact same files and file processing. It works great. The main difference is that there I provide a new through stream per file myself (the data-event approach above).

I suspect that this packaging of streams in streams either has some design problem or maybe hits a stream bug.
Streams are sometimes hard for me to grasp. The various stream versions in Node don't make it easier.
It seems like a good idea to use readable-stream so I can at least have solid ground to build knowledge upon, instead of an ever-changing landscape.
I'm reading a stream of binary files (stdout from a child process); the files are separated by a delimiter. I look at each chunk when it arrives and either just write it to my outputStream, or create a new outputStream if a file delimiter is discovered. After that, each new stream needs to be cloned (using cloneable-readable) and piped to different locations.
Originally I had an "outputStreamFactory" which created a writableStream and piped it to its destination. The streamFactory was used to create a new stream every time a file delimiter was discovered. This does not work anymore, since I need to pipe the stream again (you cannot pipe a writable stream).
Q1: Should I use a through stream for this?
Q2: readable-stream does not have a through stream. Should I build one from a transform stream?
Q3: In that case, how do I safely build a through stream from a transform stream?
Q4: Or should I use through2, which is probably not using readable-stream under the hood?
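For reference, a minimal version of what Q3 asks for, mirroring the Transform-based snippet shown in the comments above:

    const { Transform } = require('readable-stream');

    // A minimal "through" stream: every chunk is passed along unchanged.
    // readable-stream's built-in PassThrough does exactly this.
    const through = new Transform({
      transform(chunk, encoding, cb) {
        cb(null, chunk);
      }
    });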