Skip to content

Commit 56d6d9b

Browse files
vdiezjulien-f
authored andcommitted
implement stream backpressure
1 parent 80b74f0 commit 56d6d9b

File tree

1 file changed

+36
-22
lines changed

1 file changed

+36
-22
lines changed

lib/api/createReadStream.js

+36-22
Original file line numberDiff line numberDiff line change
@@ -45,39 +45,53 @@ module.exports = function createReadStream(path, options, cb) {
4545
});
4646
};
4747
}
48+
4849
var running = false;
50+
51+
function read(size) {
52+
request(
53+
'read',
54+
{
55+
FileId: file.FileId,
56+
Length: Math.min(MAX_READ_LENGTH, size, end - offset),
57+
Offset: new BigInt(8, offset).toBuffer(),
58+
},
59+
connection,
60+
function(err, content) {
61+
if (err != null) {
62+
return process.nextTick(stream.emit.bind(stream, 'error', err));
63+
}
64+
65+
offset += content.length;
66+
if (stream.push(content)) {
67+
if (end - offset === 0) {
68+
running = false;
69+
stream.push(null);
70+
}
71+
else read(size);
72+
}
73+
else {
74+
running = false;
75+
}
76+
}
77+
);
78+
}
4979
stream._read = function(size) {
5080
if (running) {
5181
return;
5282
}
5383

5484
if (offset >= end) {
5585
return shouldClose
56-
? close(function() {
86+
? close(function() {
5787
stream.push(null);
5888
})
59-
: stream.push(null);
89+
: stream.push(null);
90+
}
91+
else {
92+
running = true;
93+
read(size)
6094
}
61-
62-
running = true;
63-
request(
64-
'read',
65-
{
66-
FileId: file.FileId,
67-
Length: Math.min(MAX_READ_LENGTH, size, end - offset),
68-
Offset: new BigInt(8, offset).toBuffer(),
69-
},
70-
connection,
71-
function(err, content) {
72-
running = false;
73-
if (err != null) {
74-
return process.nextTick(stream.emit.bind(stream, 'error', err));
75-
}
76-
77-
offset += content.length;
78-
stream.push(content);
79-
}
80-
);
8195
};
8296
cb(null, stream);
8397
}

0 commit comments

Comments
 (0)