Skip to content

Commit

Permalink
Fixed nested pausing
Browse files Browse the repository at this point in the history
  • Loading branch information
calebsander committed Sep 27, 2017
1 parent 575b3a5 commit af7ccc7
Show file tree
Hide file tree
Showing 17 changed files with 388 additions and 150 deletions.
4 changes: 2 additions & 2 deletions compiled/download.js

Large diffs are not rendered by default.

4 changes: 2 additions & 2 deletions compiled/upload-download.js

Large diffs are not rendered by default.

4 changes: 2 additions & 2 deletions compiled/upload.js

Large diffs are not rendered by default.

18 changes: 17 additions & 1 deletion dist/lib/appendable-stream.d.ts
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ import AppendableBuffer from './appendable';
export default class AppendableStream implements AppendableBuffer {
private readonly outStream;
private writtenBytes;
private pauseCount;
private paused;
/**
* @param outStream The underlying writable stream
Expand Down Expand Up @@ -43,7 +44,22 @@ export default class AppendableStream implements AppendableBuffer {
* [[resume]] is next called and
* can be cancelled from being written
* by calling [[reset]].
* @throws If paused earlier and never resumed
*
* If called multiple times, [[resume]]
* and [[reset]] only act on bytes added
* since the most recent pause. Example:
* ````javascript
* let gb = new GrowableBuffer
* gb
* .pause()
* .add(1).add(2).add(3)
* .pause()
* .add(4).add(5).add(6)
* .reset() //cancels [4, 5, 6]
* .resume()
* .resume() //resumes [1, 2, 3]
* console.log(new Uint8Array(gb.toBuffer())) //Uint8Array [ 1, 2, 3 ]
* ````
*/
pause(): this;
/**
Expand Down
46 changes: 34 additions & 12 deletions dist/lib/appendable-stream.js
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,8 @@ class AppendableStream {
assert_1.default.instanceOf(outStream, WRITABLE_STREAMS);
this.outStream = outStream;
this.writtenBytes = 0;
this.paused = null;
this.pauseCount = 0;
this.paused = new growable_buffer_1.default;
}
/**
* Appends a byte to the end
Expand All @@ -39,7 +40,7 @@ class AppendableStream {
*/
addAll(buffer) {
assert_1.default.instanceOf(buffer, ArrayBuffer);
if (this.paused)
if (this.pauseCount)
this.paused.addAll(buffer);
else
this.outStream.write(Buffer.from(buffer));
Expand All @@ -65,11 +66,26 @@ class AppendableStream {
* [[resume]] is next called and
* can be cancelled from being written
* by calling [[reset]].
* @throws If paused earlier and never resumed
*
* If called multiple times, [[resume]]
* and [[reset]] only act on bytes added
* since the most recent pause. Example:
* ````javascript
* let gb = new GrowableBuffer
* gb
* .pause()
* .add(1).add(2).add(3)
* .pause()
* .add(4).add(5).add(6)
* .reset() //cancels [4, 5, 6]
* .resume()
* .resume() //resumes [1, 2, 3]
* console.log(new Uint8Array(gb.toBuffer())) //Uint8Array [ 1, 2, 3 ]
* ````
*/
pause() {
assert_1.default(this.paused === null, 'Already paused');
this.paused = new growable_buffer_1.default;
this.paused.pause();
this.pauseCount++;
return this;
}
/**
Expand All @@ -79,11 +95,15 @@ class AppendableStream {
* @throws If not currently paused
*/
resume() {
if (!this.paused)
if (!this.pauseCount)
throw new Error('Was not paused');
const { length } = this.paused;
this.outStream.write(Buffer.from(this.paused.rawBuffer, 0, length));
this.paused = null;
this.pauseCount--;
if (this.pauseCount)
this.paused.resume(); //still in pause stack
else {
this.outStream.write(Buffer.from(this.paused.rawBuffer, 0, this.paused.length));
this.paused = new growable_buffer_1.default; //must use a new buffer to avoid overwriting data sent to outStream
}
return this;
}
/**
Expand All @@ -95,10 +115,12 @@ class AppendableStream {
* @throws If not currently paused
*/
reset() {
if (!this.paused)
if (!this.pauseCount)
throw new Error('Was not paused');
this.writtenBytes -= this.paused.length;
this.paused = new growable_buffer_1.default;
const lengthBeforeReset = this.paused.length;
this.paused.reset();
const lengthAfterReset = this.paused.length;
this.writtenBytes -= lengthBeforeReset - lengthAfterReset;
return this;
}
}
Expand Down
21 changes: 18 additions & 3 deletions dist/lib/appendable.d.ts
Original file line number Diff line number Diff line change
Expand Up @@ -10,8 +10,8 @@
* .add(1).add(2)
* .addAll(new Uint8Array([3, 4, 5]).buffer)
* .pause()
* .add(0)
* .reset()
* .add(0)
* .reset()
* .resume()
* console.log(new Uint8Array(gb.toBuffer())) //Uint8Array [ 1, 2, 3, 4, 5 ]
* ````
Expand Down Expand Up @@ -42,7 +42,22 @@ export default interface AppendableBuffer {
* [[resume]] is next called and
* can be cancelled from being written
* by calling [[reset]].
* @throws If paused earlier and never resumed
*
* If called multiple times, [[resume]]
* and [[reset]] only act on bytes added
* since the most recent pause. Example:
* ````javascript
* let gb = new GrowableBuffer
* gb
* .pause()
* .add(1).add(2).add(3)
* .pause()
* .add(4).add(5).add(6)
* .reset() //cancels [4, 5, 6]
* .resume()
* .resume() //resumes [1, 2, 3]
* console.log(new Uint8Array(gb.toBuffer())) //Uint8Array [ 1, 2, 3 ]
* ````
*/
pause(): this;
/**
Expand Down
23 changes: 17 additions & 6 deletions dist/lib/growable-buffer.d.ts
Original file line number Diff line number Diff line change
Expand Up @@ -9,11 +9,7 @@ import AppendableBuffer from './appendable';
export default class GrowableBuffer implements AppendableBuffer {
private buffer;
private size;
/**
* The length of the buffer before the current pause,
* or `null` if not currently paused
*/
private commitedSize;
private readonly pausePoints;
/**
* @param initialLength
* The number of bytes in the internal buffer at start
Expand Down Expand Up @@ -70,7 +66,22 @@ export default class GrowableBuffer implements AppendableBuffer {
* [[resume]] is next called and
* can be cancelled from being written
* by calling [[reset]].
* @throws If paused earlier and never resumed
*
* If called multiple times, [[resume]]
* and [[reset]] only act on bytes added
* since the most recent pause. Example:
* ````javascript
* let gb = new GrowableBuffer
* gb
* .pause()
* .add(1).add(2).add(3)
* .pause()
* .add(4).add(5).add(6)
* .reset() //cancels [4, 5, 6]
* .resume()
* .resume() //resumes [1, 2, 3]
* console.log(new Uint8Array(gb.toBuffer())) //Uint8Array [ 1, 2, 3 ]
* ````
*/
pause(): this;
/**
Expand Down
38 changes: 27 additions & 11 deletions dist/lib/growable-buffer.js
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@ class GrowableBuffer {
}
this.buffer = new ArrayBuffer(initialLength);
this.size = 0;
this.commitedSize = null;
this.pausePoints = [];
}
/**
* The current number of bytes being occupied.
Expand Down Expand Up @@ -95,10 +95,10 @@ class GrowableBuffer {
*/
toBuffer() {
let length;
if (this.commitedSize === null)
length = this.size;
if (this.pausePoints.length)
[length] = this.pausePoints; //go up to first pause point
else
length = this.commitedSize;
length = this.size;
return this.buffer.slice(0, length);
}
/**
Expand All @@ -108,11 +108,25 @@ class GrowableBuffer {
* [[resume]] is next called and
* can be cancelled from being written
* by calling [[reset]].
* @throws If paused earlier and never resumed
*
* If called multiple times, [[resume]]
* and [[reset]] only act on bytes added
* since the most recent pause. Example:
* ````javascript
* let gb = new GrowableBuffer
* gb
* .pause()
* .add(1).add(2).add(3)
* .pause()
* .add(4).add(5).add(6)
* .reset() //cancels [4, 5, 6]
* .resume()
* .resume() //resumes [1, 2, 3]
* console.log(new Uint8Array(gb.toBuffer())) //Uint8Array [ 1, 2, 3 ]
* ````
*/
pause() {
assert_1.default(this.commitedSize === null, 'Already paused');
this.commitedSize = this.size;
this.pausePoints.push(this.size);
return this;
}
/**
Expand All @@ -122,8 +136,9 @@ class GrowableBuffer {
* @throws If not currently paused
*/
resume() {
assert_1.default(this.commitedSize !== null, 'Was not paused');
this.commitedSize = null;
const pausePoint = this.pausePoints.pop();
if (pausePoint === undefined)
throw new Error('Was not paused');
return this;
}
/**
Expand All @@ -135,9 +150,10 @@ class GrowableBuffer {
* @throws If not currently paused
*/
reset() {
if (this.commitedSize === null)
if (!this.pausePoints.length)
throw new Error('Was not paused');
this.size = this.commitedSize;
const pausePoint = this.pausePoints[this.pausePoints.length - 1];
this.size = pausePoint;
return this;
}
}
Expand Down
Loading

0 comments on commit af7ccc7

Please sign in to comment.