Skip to content

Commit ea2415b

Browse files
author
Alan Shaw
committed
feat: implement batching and cleanup
1 parent cae1621 commit ea2415b

File tree

11 files changed

+254
-316
lines changed

11 files changed

+254
-316
lines changed

bench/put-x10_000-batcher.js

+26-14
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,7 @@ import * as Batch from '../src/batch/index.js'
44
import { ShardBlock } from '../src/shard.js'
55
import { MemoryBlockstore } from '../src/block.js'
66
import { randomCID, randomString, randomInteger } from '../test/helpers.js'
7+
import { collectMetrics, writePail } from './util.js'
78

89
const NUM = 10_000
910

@@ -23,24 +24,35 @@ async function main () {
2324
kvs.push([k, v])
2425
}
2526

27+
/** @type {API.ShardLink} */
28+
let root = rootBlock.cid
2629
console.log('bench')
2730
console.time(`put x${NUM}`)
28-
const batch = await Batch.create(blocks, rootBlock.cid)
29-
for (let i = 0; i < kvs.length; i++) {
30-
await batch.put(kvs[i][0], kvs[i][1])
31-
if (i % 1000 === 0) {
32-
process.stdout.write('.')
31+
try {
32+
const batch = await Batch.create(blocks, rootBlock.cid)
33+
for (let i = 0; i < kvs.length; i++) {
34+
await batch.put(kvs[i][0], kvs[i][1])
35+
if (i % 1000 === 0) {
36+
process.stdout.write('.')
37+
}
3338
}
39+
const result = await batch.commit()
40+
for (const b of result.additions) {
41+
blocks.putSync(b.cid, b.bytes)
42+
}
43+
for (const b of result.removals) {
44+
blocks.deleteSync(b.cid)
45+
}
46+
root = result.root
47+
} catch (err) {
48+
console.log('')
49+
console.error(err)
50+
} finally {
51+
console.log('')
52+
console.timeEnd(`put x${NUM}`)
53+
await writePail(blocks, root)
54+
console.log(await collectMetrics(blocks, root))
3455
}
35-
const result = await batch.commit()
36-
for (const b of result.additions) {
37-
blocks.putSync(b.cid, b.bytes)
38-
}
39-
for (const b of result.removals) {
40-
blocks.deleteSync(b.cid)
41-
}
42-
console.log('')
43-
console.timeEnd(`put x${NUM}`)
4456
}
4557

4658
main()

cli.js

+56-98
Original file line numberDiff line numberDiff line change
@@ -2,97 +2,84 @@
22
import fs from 'fs'
33
import os from 'os'
44
import { join } from 'path'
5-
import { Readable } from 'stream'
5+
import { Readable, Writable } from 'stream'
66
import sade from 'sade'
77
import { CID } from 'multiformats/cid'
8-
import { CarIndexedReader, CarReader, CarWriter } from '@ipld/car'
8+
import { CARReaderStream, CARWriterStream } from 'carstream'
99
import clc from 'cli-color'
1010
import archy from 'archy'
1111
// eslint-disable-next-line no-unused-vars
1212
import * as API from './src/api.js'
13-
import { put, get, del, entries } from './src/v1/index.js'
14-
import { ShardFetcher, ShardBlock } from './src/v1/shard.js'
15-
import { MaxShardSize } from './src/shard.js'
13+
import { put, get, del, entries } from './src/index.js'
14+
import { ShardFetcher, ShardBlock, isShardLink } from './src/shard.js'
1615
import { difference } from './src/diff.js'
1716
import { merge } from './src/merge.js'
17+
import { MemoryBlockstore, MultiBlockFetcher } from './src/block.js'
1818

1919
const cli = sade('pail')
2020
.option('--path', 'Path to data store.', './pail.car')
2121

2222
cli.command('put <key> <value>')
2323
.describe('Put a value (a CID) for the given key. If the key exists it\'s value is overwritten.')
2424
.alias('set')
25-
.option('--max-shard-size', 'Maximum shard size in bytes.', MaxShardSize)
2625
.action(async (key, value, opts) => {
27-
const maxShardSize = opts['max-shard-size'] ?? MaxShardSize
28-
const blocks = await openPail(opts.path, { maxSize: maxShardSize })
29-
const roots = await blocks.getRoots()
30-
// @ts-expect-error
31-
const { root, additions, removals } = await put(blocks, roots[0], key, CID.parse(value))
26+
const { root: prevRoot, blocks } = await openPail(opts.path)
27+
const { root, additions, removals } = await put(blocks, prevRoot, key, CID.parse(value))
3228
await updatePail(opts.path, blocks, root, { additions, removals })
3329

34-
console.log(clc.red(`--- ${roots[0]}`))
30+
console.log(clc.red(`--- ${prevRoot}`))
3531
console.log(clc.green(`+++ ${root}`))
3632
console.log(clc.magenta('@@ -1 +1 @@'))
3733
additions.forEach(b => console.log(clc.green(`+${b.cid}`)))
3834
removals.forEach(b => console.log(clc.red(`-${b.cid}`)))
39-
await closePail(blocks)
4035
})
4136

4237
cli.command('get <key>')
4338
.describe('Get the stored value for the given key from the pail. If the key is not found, `undefined` is returned.')
4439
.action(async (key, opts) => {
45-
const blocks = await openPail(opts.path)
46-
// @ts-expect-error
47-
const value = await get(blocks, (await blocks.getRoots())[0], key)
40+
const { root, blocks } = await openPail(opts.path)
41+
const value = await get(blocks, root, key)
4842
if (value) console.log(value.toString())
49-
await closePail(blocks)
5043
})
5144

5245
cli.command('del <key>')
5346
.describe('Delete the value for the given key from the pail. If the key is not found no operation occurs.')
5447
.alias('delete', 'rm', 'remove')
5548
.action(async (key, opts) => {
56-
const blocks = await openPail(opts.path)
57-
const roots = await blocks.getRoots()
58-
// @ts-expect-error
59-
const { root, additions, removals } = await del(blocks, roots[0], key)
49+
const { root: prevRoot, blocks } = await openPail(opts.path)
50+
const { root, additions, removals } = await del(blocks, prevRoot, key)
6051
await updatePail(opts.path, blocks, root, { additions, removals })
6152

62-
console.log(clc.red(`--- ${roots[0]}`))
53+
console.log(clc.red(`--- ${prevRoot}`))
6354
console.log(clc.green(`+++ ${root}`))
6455
console.log(clc.magenta('@@ -1 +1 @@'))
6556
additions.forEach(b => console.log(clc.green(`+ ${b.cid}`)))
6657
removals.forEach(b => console.log(clc.red(`- ${b.cid}`)))
67-
await closePail(blocks)
6858
})
6959

7060
cli.command('ls')
7161
.describe('List entries in the pail.')
7262
.alias('list')
7363
.option('-p, --prefix', 'Key prefix to filter by.')
64+
.option('--gt', 'Filter results by keys greater than this string.')
65+
.option('--lt', 'Filter results by keys less than this string.')
7466
.option('--json', 'Format output as newline delimted JSON.')
7567
.action(async (opts) => {
76-
const blocks = await openPail(opts.path)
77-
const root = (await blocks.getRoots())[0]
68+
const { root, blocks } = await openPail(opts.path)
7869
let n = 0
79-
// @ts-expect-error
80-
for await (const [k, v] of entries(blocks, root, { prefix: opts.prefix })) {
70+
for await (const [k, v] of entries(blocks, root, { prefix: opts.prefix, gt: opts.gt, lt: opts.lt })) {
8171
console.log(opts.json ? JSON.stringify({ key: k, value: v.toString() }) : `${k}\t${v}`)
8272
n++
8373
}
8474
if (!opts.json) console.log(`total ${n}`)
85-
await closePail(blocks)
8675
})
8776

8877
cli.command('tree')
8978
.describe('Visualise the pail.')
79+
.alias('vis')
9080
.action(async (opts) => {
91-
const blocks = await openPail(opts.path)
92-
const root = (await blocks.getRoots())[0]
93-
// @ts-expect-error
81+
const { root, blocks } = await openPail(opts.path)
9482
const shards = new ShardFetcher(blocks)
95-
// @ts-expect-error
9683
const rshard = await shards.get(root)
9784

9885
/** @type {archy.Data} */
@@ -119,27 +106,19 @@ cli.command('tree')
119106
}
120107

121108
console.log(archy(archyRoot))
122-
await closePail(blocks)
123109
})
124110

125111
cli.command('diff <path>')
126112
.describe('Find the differences between this pail and the passed pail.')
127113
.option('-k, --keys', 'Output key/value diff.')
128114
.action(async (path, opts) => {
129-
const [ablocks, bblocks] = await Promise.all([openPail(opts.path), openPail(path)])
130-
const [aroot, broot] = await Promise.all([ablocks, bblocks].map(async blocks => {
131-
return /** @type {API.ShardLink} */((await blocks.getRoots())[0])
132-
}))
115+
const [
116+
{ root: aroot, blocks: ablocks },
117+
{ root: broot, blocks: bblocks }
118+
] = await Promise.all([openPail(opts.path), openPail(path)])
133119
if (aroot.toString() === broot.toString()) return
134120

135-
const fetcher = {
136-
async get (cid) {
137-
const blk = await ablocks.get(cid)
138-
if (blk) return blk
139-
return bblocks.get(cid)
140-
}
141-
}
142-
// @ts-expect-error CarReader is not BlockFetcher
121+
const fetcher = new MultiBlockFetcher(ablocks, bblocks)
143122
const { shards: { additions, removals }, keys } = await difference(fetcher, aroot, broot)
144123

145124
console.log(clc.red(`--- ${aroot}`))
@@ -155,27 +134,18 @@ cli.command('diff <path>')
155134
additions.forEach(b => console.log(clc.green(`+ ${b.cid}`)))
156135
removals.forEach(b => console.log(clc.red(`- ${b.cid}`)))
157136
}
158-
159-
await Promise.all([closePail(ablocks), closePail(bblocks)])
160137
})
161138

162139
cli.command('merge <path>')
163140
.describe('Merge the passed pail into this pail.')
164141
.action(async (path, opts) => {
165-
const [ablocks, bblocks] = await Promise.all([openPail(opts.path), openPail(path)])
166-
const [aroot, broot] = await Promise.all([ablocks, bblocks].map(async blocks => {
167-
return /** @type {API.ShardLink} */((await blocks.getRoots())[0])
168-
}))
142+
const [
143+
{ root: aroot, blocks: ablocks },
144+
{ root: broot, blocks: bblocks }
145+
] = await Promise.all([openPail(opts.path), openPail(path)])
169146
if (aroot.toString() === broot.toString()) return
170147

171-
const fetcher = {
172-
async get (cid) {
173-
const blk = await ablocks.get(cid)
174-
if (blk) return blk
175-
return bblocks.get(cid)
176-
}
177-
}
178-
// @ts-expect-error CarReader is not BlockFetcher
148+
const fetcher = new MultiBlockFetcher(ablocks, bblocks)
179149
const { root, additions, removals } = await merge(fetcher, aroot, [broot])
180150

181151
await updatePail(opts.path, ablocks, root, { additions, removals })
@@ -185,65 +155,53 @@ cli.command('merge <path>')
185155
console.log(clc.magenta('@@ -1 +1 @@'))
186156
additions.forEach(b => console.log(clc.green(`+ ${b.cid}`)))
187157
removals.forEach(b => console.log(clc.red(`- ${b.cid}`)))
188-
189-
await Promise.all([closePail(ablocks), closePail(bblocks)])
190158
})
191159

192160
cli.parse(process.argv)
193161

194162
/**
195163
* @param {string} path
196-
* @param {{ maxSize?: number }} [config]
197-
* @returns {Promise<import('@ipld/car/api').CarReader>}
164+
* @returns {Promise<{ root: API.ShardLink, blocks: MemoryBlockstore }>}
198165
*/
199-
async function openPail (path, config) {
166+
async function openPail (path) {
167+
const blocks = new MemoryBlockstore()
200168
try {
201-
return await CarIndexedReader.fromFile(path)
169+
const carReader = new CARReaderStream()
170+
const readable = /** @type {ReadableStream<Uint8Array>} */ (Readable.toWeb(fs.createReadStream(path)))
171+
await readable.pipeThrough(carReader).pipeTo(new WritableStream({ write: b => blocks.put(b.cid, b.bytes) }))
172+
const header = await carReader.getHeader()
173+
if (!isShardLink(header.roots[0])) throw new Error(`not a shard: ${header.roots[0]}`)
174+
return { root: header.roots[0], blocks }
202175
} catch (err) {
203176
if (err.code !== 'ENOENT') throw new Error('failed to open bucket', { cause: err })
204-
const rootblk = await ShardBlock.create(config)
205-
const { writer, out } = CarWriter.create(rootblk.cid)
206-
writer.put(rootblk)
207-
writer.close()
208-
return CarReader.fromIterable(out)
209-
}
210-
}
211-
212-
/** @param {import('@ipld/car/api').CarReader} reader */
213-
async function closePail (reader) {
214-
if (reader instanceof CarIndexedReader) {
215-
await reader.close()
177+
const rootblk = await ShardBlock.create()
178+
blocks.put(rootblk.cid, rootblk.bytes)
179+
return { root: rootblk.cid, blocks }
216180
}
217181
}
218182

219183
/**
220184
* @param {string} path
221-
* @param {import('@ipld/car/api').CarReader} reader
185+
* @param {MemoryBlockstore} blocks
222186
* @param {API.ShardLink} root
223187
* @param {API.ShardDiff} diff
224188
*/
225-
async function updatePail (path, reader, root, { additions, removals }) {
226-
// @ts-expect-error
227-
const { writer, out } = CarWriter.create(root)
189+
async function updatePail (path, blocks, root, { additions, removals }) {
228190
const tmp = join(os.tmpdir(), `pail${Date.now()}.car`)
229-
230-
const finishPromise = new Promise(resolve => {
231-
Readable.from(out).pipe(fs.createWriteStream(tmp)).on('finish', resolve)
232-
})
233-
234-
// put new blocks
235-
for (const b of additions) {
236-
await writer.put(b)
237-
}
238-
// put old blocks without removals
239-
for await (const b of reader.blocks()) {
240-
if (removals.some(r => b.cid.toString() === r.cid.toString())) {
241-
continue
191+
const iterator = blocks.entries()
192+
const readable = new ReadableStream({
193+
start (controller) {
194+
for (const b of additions) controller.enqueue(b)
195+
},
196+
pull (controller) {
197+
for (const b of iterator) {
198+
if (removals.some(r => b.cid.toString() === r.cid.toString())) continue
199+
return controller.enqueue(b)
200+
}
201+
controller.close()
242202
}
243-
await writer.put(b)
244-
}
245-
await writer.close()
246-
await finishPromise
203+
})
204+
await readable.pipeThrough(new CARWriterStream([root])).pipeTo(Writable.toWeb(fs.createWriteStream(tmp)))
247205

248206
const old = `${path}-${new Date().toISOString()}`
249207
try {

package.json

+6-6
Original file line numberDiff line numberDiff line change
@@ -143,17 +143,17 @@
143143
"dist"
144144
],
145145
"dependencies": {
146-
"@ipld/car": "^5.2.4",
147-
"@ipld/dag-cbor": "^9.0.6",
148-
"archy": "^1.0.0",
149-
"cli-color": "^2.0.3",
150-
"multiformats": "^12.1.3",
151-
"sade": "^1.8.1"
146+
"@ipld/dag-cbor": "^9.2.0",
147+
"multiformats": "^13.1.0"
152148
},
153149
"devDependencies": {
150+
"archy": "^1.0.0",
154151
"c8": "^8.0.1",
152+
"carstream": "^2.0.0",
153+
"cli-color": "^2.0.3",
155154
"mocha": "^10.2.0",
156155
"nanoid": "^4.0.0",
156+
"sade": "^1.8.1",
157157
"standard": "^17.0.0",
158158
"typescript": "^5.0.2"
159159
},

0 commit comments

Comments
 (0)