Commit
feat: batching byte getter (#3)
Alan Shaw authored Jun 3, 2024
1 parent 9d9551b commit 4d39fea
Showing 5 changed files with 140 additions and 3 deletions.
4 changes: 3 additions & 1 deletion package.json
@@ -33,7 +33,9 @@
   },
   "dependencies": {
     "@httpland/range-parser": "^1.2.0",
-    "multipart-byte-range": "^3.0.1"
+    "multipart-byte-range": "^3.0.1",
+    "p-defer": "^4.0.1",
+    "uint8arraylist": "^2.4.8"
   },
   "devDependencies": {
     "@types/node": "^20.12.12",
11 changes: 11 additions & 0 deletions pnpm-lock.yaml

Some generated files are not rendered by default.

76 changes: 76 additions & 0 deletions src/batch.js
@@ -0,0 +1,76 @@
import defer from 'p-defer'
import { Uint8ArrayList } from 'uint8arraylist'

export const DefaultBatchSize = 10 * 1024 * 1024 // 10MiB

/**
 * @param {import('multipart-byte-range').ByteGetter} getBytes
 * @param {import('multipart-byte-range').AbsoluteRange[]} ranges
 * @param {{ maxSize?: number }} [options]
 * @returns {import('multipart-byte-range').ByteGetter}
 */
export const createBatchingByteGetter = (getBytes, ranges, options) => {
  /** @type {Record<string, import('p-defer').DeferredPromise<ReadableStream<Uint8Array>>>} */
  const requests = {}
  const batches = batchRanges(ranges, options)

  return async range => {
    if (requests[range.toString()]) {
      return requests[range.toString()].promise
    }

    const batch = batches.find(b => b.some(r => r[0] === range[0] && r[1] === range[1]))
    if (!batch) throw new Error(`batch not found for range: ${range[0]}-${range[1]}`)
    for (const r of batch) {
      requests[r.toString()] = defer()
    }

    const offset = batch[0][0]
    const source = await getBytes([offset, batch[batch.length - 1][1]])

    const buffer = new Uint8ArrayList()
    await source.pipeTo(new WritableStream({ write: chunk => { buffer.append(chunk) } }))

    for (const r of batch) {
      requests[r.toString()].resolve(new ReadableStream({
        pull (controller) {
          controller.enqueue(buffer.subarray(r[0] - offset, (r[1] + 1) - offset))
          controller.close()
        }
      }))
    }

    return requests[range.toString()].promise
  }
}

/**
 * @param {import('multipart-byte-range').AbsoluteRange[]} ranges
 * @param {{ maxSize?: number }} [options]
 */
export const batchRanges = (ranges, options) => {
  ranges = [...ranges].sort((a, b) => a[0] - b[0])

  const maxSize = options?.maxSize ?? DefaultBatchSize
  const batches = []
  /** @type {import('multipart-byte-range').AbsoluteRange[]} */
  let batch = []
  let batchSize = 0
  for (const r of ranges) {
    const size = r[1] - r[0]
    const prevRange = batch.at(-1)
    const bytesBetween = prevRange ? r[0] - prevRange[1] : 0
    if (bytesBetween < 0) throw new Error('overlapping byte ranges')

    if (batchSize + bytesBetween + size > maxSize) {
      batches.push(batch)
      batch = []
      batchSize = 0
    }

    batch.push(r)
    batchSize += bytesBetween + size
  }
  batches.push(batch)
  return batches
}
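
For context, a minimal usage sketch of the new batching getter (not part of this commit; the in-memory `data` fixture, the `calls` counter, the import path and the `maxSize` value are illustrative assumptions): two ranges that fall inside the same batch result in a single call to the wrapped ByteGetter.

import { Uint8ArrayList } from 'uint8arraylist'
import { createBatchingByteGetter } from './src/batch.js' // hypothetical relative path

const data = new Uint8Array(100).map((_, i) => i) // hypothetical fixture
let calls = 0

/** @type {import('multipart-byte-range').ByteGetter} */
const getBytes = async range => {
  calls++ // count underlying reads to observe the batching
  return new ReadableStream({
    pull (controller) {
      controller.enqueue(data.subarray(range[0], range[1] + 1))
      controller.close()
    }
  })
}

/** @type {import('multipart-byte-range').AbsoluteRange[]} */
const ranges = [[0, 9], [10, 19]]
const batched = createBatchingByteGetter(getBytes, ranges, { maxSize: 1024 })

await batched([0, 9])                  // issues one getBytes([0, 19]) and buffers it
const second = await batched([10, 19]) // resolved from the same buffered batch, no extra read

const buf = new Uint8ArrayList()
await second.pipeTo(new WritableStream({ write: chunk => { buf.append(chunk) } }))
console.log(calls)      // 1
console.log(buf.slice()) // bytes 10..19 of data
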
6 changes: 4 additions & 2 deletions src/server.js
@@ -2,6 +2,7 @@
 import * as API from './api.js'
 import { MultipartByteRangeEncoder } from 'multipart-byte-range/encoder'
 import { decodeRangeHeader, resolveRange } from './range.js'
+import { createBatchingByteGetter } from './batch.js'
 
 /**
  * @param {{ bucket: API.Bucket }} model
@@ -94,12 +95,13 @@ const handleRange = async (bucket, key, size, range, options) => {
  * @param {{ headers?: Headers }} [options]
  */
 const handleMultipartRange = async (bucket, key, size, ranges, options) => {
-  const source = new MultipartByteRangeEncoder(ranges, async range => {
+  const getBytes = createBatchingByteGetter(async range => {
     const options = { range: { offset: range[0], length: range[1] - range[0] + 1 } }
     const object = await bucket.get(key, options)
     if (!object || !object.body) throw new Error('Object Not Found')
     return /** @type {ReadableStream} */ (object.body)
-  }, { totalSize: size })
+  }, ranges.map(r => resolveRange(r, size)))
+  const source = new MultipartByteRangeEncoder(ranges, getBytes, { totalSize: size })
 
   const headers = new Headers(options?.headers)
   for (const [k, v] of Object.entries(source.headers)) {
46 changes: 46 additions & 0 deletions test/batch.spec.js
@@ -0,0 +1,46 @@
import { webcrypto as crypto } from 'node:crypto'
import { equals } from 'uint8arrays'
import { Uint8ArrayList } from 'uint8arraylist'
import { batchRanges, createBatchingByteGetter } from '../src/batch.js'

export const test = {
  'should batch ranges within batch size': (/** @type {import('entail').assert} */ assert) => {
    const batches = batchRanges([[3, 5], [7, 9], [10, 16], [17, 20], [21, 22]], { maxSize: 6 })
    assert.deepEqual(batches, [[[3, 5], [7, 9]], [[10, 16]], [[17, 20], [21, 22]]])
  },

  'should not batch ranges larger than max batch size': (/** @type {import('entail').assert} */ assert) => {
    const batches = batchRanges([[3, 5], [7, 9]], { maxSize: 5 })
    assert.deepEqual(batches, [[[3, 5]], [[7, 9]]])
  },

  'should not batch ranges when bytes between exceeds max batch size': (/** @type {import('entail').assert} */ assert) => {
    const batches = batchRanges([[3, 5], [8, 10]], { maxSize: 6 })
    assert.deepEqual(batches, [[[3, 5]], [[8, 10]]])
  },

  'should fail when ranges overlap': (/** @type {import('entail').assert} */ assert) => {
    assert.throws(() => batchRanges([[3, 5], [4, 6]]), /overlapping/)
  },

  'should fetch correct bytes from batching byte getter': async (/** @type {import('entail').assert} */ assert) => {
    const bytes = crypto.getRandomValues(new Uint8Array(50))
    /** @type {import('multipart-byte-range').AbsoluteRange[]} */
    const ranges = [[3, 5], [7, 9], [10, 16], [17, 20], [21, 22]]
    const getBytes = createBatchingByteGetter(async range => {
      return new ReadableStream({
        pull (controller) {
          controller.enqueue(bytes.subarray(range[0], range[1] + 1))
          controller.close()
        }
      })
    }, ranges)

    for (const r of ranges) {
      const buf = new Uint8ArrayList()
      const source = await getBytes(r)
      await source.pipeTo(new WritableStream({ write: chunk => { buf.append(chunk) } }))
      assert.ok(equals(buf.slice(), bytes.slice(r[0], r[1] + 1)))
    }
  }
}
