feat: blobStore.setDownloadFilter() & `createEntriesReadableStream()` (#940)

- Removes `blobStore.download()` and the `LiveDownload` class.
- Adds `blobStore.createEntriesReadableStream()`, which returns a stream of
  all entries in the blob store, filtered by blob type and variant(s).
- Changes the behaviour of `BlobStore` to auto-download blobs according to a
  `downloadFilter`, set via `blobStore.setDownloadFilter()`.
- Changes the `BlobStore` filter according to the `isArchiveDevice` setting.
- Changes the behaviour of `CoreManager` to not auto-download blob cores
  (this responsibility now lies with `BlobStore`).

Closes [#681].

[#681]: #681

Co-authored-by: Evan Hahn <[email protected]>
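
For illustration only (not part of the commit): a minimal sketch of how the reworked API might be used. The filter shape, the entry fields, and the `blobStore` variable are assumptions inferred from the title and the code below, not confirmed signatures.

// Hypothetical usage sketch; option shapes and entry fields are assumed.
async function example(blobStore) {
  // Only auto-download photo previews and thumbnails (assumed filter shape:
  // blob type mapped to an array of variants).
  blobStore.setDownloadFilter({ photo: ['preview', 'thumbnail'] })

  // Iterate over all entries currently in the blob store, across all drives.
  for await (const entry of blobStore.createEntriesReadableStream()) {
    // Entries are expected to carry the originating driveId and the file path.
    console.log(entry.driveId, entry.key)
  }
}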
gmaclennan and EvanHahn authored Nov 5, 2024
1 parent 5efcdfc commit 0a2db23
Showing 19 changed files with 961 additions and 1,055 deletions.
4 changes: 2 additions & 2 deletions package-lock.json

Some generated files are not rendered by default.

2 changes: 2 additions & 0 deletions package.json
@@ -166,6 +166,7 @@
"@mapeo/crypto": "1.0.0-alpha.10",
"@mapeo/sqlite-indexer": "1.0.0-alpha.9",
"@sinclair/typebox": "^0.29.6",
"@sindresorhus/merge-streams": "^4.0.0",
"b4a": "^1.6.3",
"bcp-47": "^2.1.0",
"better-sqlite3": "^8.7.0",
@@ -203,6 +204,7 @@
"tiny-typed-emitter": "^2.1.0",
"type-fest": "^4.5.0",
"undici": "^6.13.0",
"unix-path-resolve": "^1.0.2",
"varint": "^6.0.0",
"ws": "^8.18.0",
"yauzl-promise": "^4.0.0"
130 changes: 130 additions & 0 deletions src/blob-store/downloader.js
@@ -0,0 +1,130 @@
import { TypedEmitter } from 'tiny-typed-emitter'
import { createEntriesStream } from './entries-stream.js'
import { filePathMatchesFilter } from './utils.js'

/** @import Hyperdrive from 'hyperdrive' */
/** @import { BlobFilter } from '../types.js' */

/**
* Like hyperdrive.download() but 'live', and for multiple drives.
*
* Will emit an 'error' event for any unexpected errors. A consumer must attach
* an error listener to avoid uncaught errors. Sources of errors include:
*
* - If the entries stream emits an error
* - If a drive referenced in an entry is not found
* - If core.has() throws (e.g. if hypercore is closed)
* - If core.download().done() throws, which should not happen according to
* current hypercore code.
* - If the entries stream ends unexpectedly (it should be live and not end)
*
* NB: unlike hyperdrive.download(), this will also download deleted and
* previous versions of blobs. We don't currently support editing or deleting
* blobs, so this should not be an issue; if we add support in the future,
* downloading deleted and previous versions may be desirable behavior anyway.
*
* @extends {TypedEmitter<{ error: (error: Error) => void }>}
*/
export class Downloader extends TypedEmitter {
/** @type {import('./index.js').THyperdriveIndex} */
#driveIndex
/** @type {Set<{ done(): Promise<void>, destroy(): void }>} */
#queuedDownloads = new Set()
#entriesStream
#processEntriesPromise
#ac = new AbortController()
#shouldDownloadFile

/**
* @param {import('./index.js').THyperdriveIndex} driveIndex
* @param {object} [options]
* @param {import('../types.js').BlobFilter | null} [options.filter] Filter blobs of specific types and/or sizes to download
*/
constructor(driveIndex, { filter } = {}) {
super()
this.#driveIndex = driveIndex

this.#shouldDownloadFile = filter
? filePathMatchesFilter.bind(null, filter)
: () => true

this.#entriesStream = createEntriesStream(driveIndex, { live: true })
this.#entriesStream.once('error', this.#handleError)

this.#ac.signal.addEventListener('abort', this.#handleAbort, { once: true })

this.#processEntriesPromise = this.#processEntries()
this.#processEntriesPromise.catch(this.#handleError)
}

/**
* Start processing entries from the entries stream - if an entry matches the
* filter, and we don't already have it, queue it for download. If the
* Downloader is live, this method will never resolve, otherwise it will
* resolve when all the entries have been processed and downloaded.
*/
async #processEntries() {
for await (const entry of this.#entriesStream) {
this.#ac.signal.throwIfAborted()
const {
driveId,
key: filePath,
value: { blob },
} = entry
if (!this.#shouldDownloadFile(filePath)) continue
const drive = this.#driveIndex.get(driveId)
// ERROR HANDLING: this is unexpected and should not happen
if (!drive) throw new Error('Drive not found: ' + driveId)
const blobs = await drive.getBlobs()
this.#ac.signal.throwIfAborted()
await this.#processEntry(blobs.core, blob)
this.#ac.signal.throwIfAborted()
}
throw new Error('Entries stream ended unexpectedly')
}

/**
* Update state and queue missing entries for download
*
* @param {import('hypercore')} blobsCore
* @param {{ blockOffset: number, blockLength: number, byteLength: number }} blob
*/
async #processEntry(blobsCore, { blockOffset: start, blockLength: length }) {
const end = start + length
const have = await blobsCore.has(start, end)
this.#ac.signal.throwIfAborted()
if (have) return
const download = blobsCore.download({ start, end })
this.#queuedDownloads.add(download)
download
.done()
// According to the code, this should never throw.
.catch(this.#handleError)
.finally(() => {
this.#queuedDownloads.delete(download)
})
}

/**
* Cancel the downloads and clean up resources.
*/
destroy() {
this.#ac.abort()
}

/** @param {Error} error */
#handleError = (error) => {
if (this.#ac.signal.aborted) return
this.emit('error', error)
this.#ac.abort(error)
}

#handleAbort = () => {
for (const download of this.#queuedDownloads) download.destroy()
this.#ac.signal.removeEventListener('abort', this.#handleAbort)
this.#entriesStream.removeListener('error', this.#handleError)
// queuedDownloads is likely to be empty here anyway, but clear just in case.
this.#queuedDownloads.clear()
this.#entriesStream.destroy()
}
}
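
For context, a minimal sketch (not part of the diff) of how the Downloader above might be wired up; the driveIndex argument and the filter shape are placeholders, not confirmed call sites.

import { Downloader } from './downloader.js'

// Illustrative only: driveIndex is assumed to be a THyperdriveIndex instance.
function startBlobDownloads(driveIndex) {
  const downloader = new Downloader(driveIndex, {
    // Hypothetical filter: only download photo previews and thumbnails.
    filter: { photo: ['preview', 'thumbnail'] },
  })
  // The class emits 'error' for unexpected failures, so a listener must be
  // attached to avoid uncaught errors (see the JSDoc above).
  downloader.on('error', (error) => {
    console.error('Blob download failed', error)
  })
  // The returned teardown cancels queued downloads and cleans up resources.
  return () => downloader.destroy()
}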
80 changes: 80 additions & 0 deletions src/blob-store/entries-stream.js
@@ -0,0 +1,80 @@
import SubEncoder from 'sub-encoder'
import mergeStreams from '@sindresorhus/merge-streams'
import { Transform, pipeline } from 'node:stream'
import { noop } from '../utils.js'

/** @import Hyperdrive from 'hyperdrive' */
/** @import { BlobStoreEntriesStream } from '../types.js' */

const keyEncoding = new SubEncoder('files', 'utf-8')

/**
*
* @param {import('./index.js').THyperdriveIndex} driveIndex
* @param {object} opts
* @param {boolean} [opts.live=false]
* @returns {BlobStoreEntriesStream}
*/
export function createEntriesStream(driveIndex, { live = false } = {}) {
const mergedEntriesStreams = mergeStreams(
[...driveIndex].map((drive) => getHistoryStream(drive.db, { live }))
)
driveIndex.on('add-drive', addDrive)
// Close is always emitted, so we can use it to remove the listener
mergedEntriesStreams.once('close', () =>
driveIndex.off('add-drive', addDrive)
)
return mergedEntriesStreams

/** @param {Hyperdrive} drive */
function addDrive(drive) {
mergedEntriesStreams.add(getHistoryStream(drive.db, { live }))
}
}

/**
*
* @param {import('hyperbee')} bee
* @param {object} opts
* @param {boolean} opts.live
*/
function getHistoryStream(bee, { live }) {
// This will also include old versions of files, but it is the only way to
// get a live stream from a Hyperbee. However, we currently do not support
// edits of blobs, so this should not be an issue; the consequence is that
// old versions are downloaded too, which is acceptable.
const historyStream = bee.createHistoryStream({
live,
// `keyEncoding` is necessary because hyperdrive stores file index data
// under the `files` sub-encoding key
keyEncoding,
})
return pipeline(historyStream, new AddDriveIds(bee.core), noop)
}

class AddDriveIds extends Transform {
#core
#cachedDriveId

/** @param {import('hypercore')} core */
constructor(core) {
super({ objectMode: true })
this.#core = core
this.#cachedDriveId = core.discoveryKey?.toString('hex')
}

/** @type {Transform['_transform']} */
_transform(entry, _, callback) {
// Minimal performance optimization to only call toString() once.
// core.discoveryKey will always be defined by the time it starts
// streaming, but could be null when the instance is first created.
let driveId
if (this.#cachedDriveId) {
driveId = this.#cachedDriveId
} else {
driveId = this.#core.discoveryKey?.toString('hex')
this.#cachedDriveId = driveId
}
callback(null, { ...entry, driveId })
}
}
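
Similarly, a minimal sketch (not part of the diff) of consuming the stream returned by createEntriesStream above; the driveIndex argument is a placeholder.

import { createEntriesStream } from './entries-stream.js'

// Illustrative only: with live: false (the default) the stream ends once the
// existing history has been read, so the loop below terminates.
async function logEntryPaths(driveIndex) {
  const entriesStream = createEntriesStream(driveIndex, { live: false })
  for await (const entry of entriesStream) {
    // Each entry is a hyperbee history entry with driveId added by the
    // AddDriveIds transform above.
    console.log(entry.driveId, entry.key)
  }
}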