feat: serve blobs over http (#117)
* WIP: serve blobs over http

* Add basic test

* add magic bytes dep

* implement initial mime type support

* updates to hyper* types declarations

* fix mime guessing

* add more tests

* maybe fix CI issue

* attempt 2 at fixing CI

* Revert "attempt 2 at fixing CI"

This reverts commit 9383baf.

Just to figure out the original issue and go from there...

* adjust export of SUPPORTED_BLOB_VARIANTS

* implement fastify plugin

* Avoid unused import

* Fix magicbytes type & avoid Array.from(buf)

* Set .nvmrc

* fix Hyperdrive.entry return type

* fix Hyperdrive.get return type

* fix Hyperdrive.getBlobs return type

* extract fixtures setup in tests

* remove use of solo

* update fastify import

* move fixtures constants

* fix return type of Hyperblobs.get

* fix return type of BlobStore.entry

* add error code paths

* update populateStore test helper

* create separate test helper for creating server

* WIP: add test for non-replicated blob 404

test passes but not for the expected reason

* remove console log

* mark non-replicated blob test as todo

* specify fastify as dev dep for tests

* remove plain http implementation

* add blob-server to exports field

* Revert "remove plain http implementation"

This reverts commit c3aae56.

* replace plain http implementation with fastify

* fix non-replicated 404 test

* fix jsdoc import

* add createEntryReadStream and getEntryBlob to BlobStore

* fix test names

* update types for plugin opts

* runtime check for blobStore plugin opt

* remove type annotation for params json schema def

* add projectId route param

* add route creation util for tests

* add projectId param to fastify validation schema

* remove commented line

* chore: use typebox for fastify param

* validate blobId without casting type

* 404 if projectId not found

* DRY test setup

* make #getDrive() a private method

* Add comment as future reminder

* stricter metadata and rename mimeType

Don't see a reason to allow any metadata at this time

* Add test for invalid variant-type combo

Checked coverage and this was not being tested

* fix tests for mimeType handling

* Add test for throw on invalid option

* Add logger option to testenv

* Add random fixture to test application/octet-stream

The branch for returning this header for an unrecognized mimeType was
not being tested by existing fixtures

---------

Co-authored-by: Andrew Chou <[email protected]>
gmaclennan and achou11 authored Jul 28, 2023
1 parent 16bef6c commit 8e64abd
Showing 14 changed files with 1,171 additions and 77 deletions.
1 change: 1 addition & 0 deletions .nvmrc
@@ -0,0 +1 @@
18
129 changes: 129 additions & 0 deletions lib/blob-server/fastify-plugin.js
@@ -0,0 +1,129 @@
// @ts-check
import fp from 'fastify-plugin'
import { filetypemime } from 'magic-bytes.js'
import { Type as T } from '@sinclair/typebox'

import { SUPPORTED_BLOB_VARIANTS } from '../blob-store/index.js'

export default fp(blobServerPlugin, {
fastify: '4.x',
name: 'mapeo-blob-server',
})

/** @typedef {import('../types').BlobId} BlobId */

/**
* @typedef {Object} BlobServerPluginOpts
*
* @property {(projectId: string) => import('../blob-store/index.js').BlobStore} getBlobStore
*/

const BLOB_TYPES = /** @type {BlobId['type'][]} */ (
Object.keys(SUPPORTED_BLOB_VARIANTS)
)
const BLOB_VARIANTS = [
...new Set(Object.values(SUPPORTED_BLOB_VARIANTS).flat()),
]
const HEX_REGEX_32_BYTES = '^[0-9a-fA-F]{64}$'
const HEX_STRING_32_BYTES = T.String({ pattern: HEX_REGEX_32_BYTES })

const PARAMS_JSON_SCHEMA = T.Object({
projectId: HEX_STRING_32_BYTES,
driveId: HEX_STRING_32_BYTES,
type: T.Union(
BLOB_TYPES.map((type) => {
return T.Literal(type)
})
),
variant: T.Union(
BLOB_VARIANTS.map((variant) => {
return T.Literal(variant)
})
),
name: T.String(),
})

/** @type {import('fastify').FastifyPluginAsync<import('fastify').RegisterOptions & BlobServerPluginOpts>} */
async function blobServerPlugin(fastify, options) {
if (!options.getBlobStore) throw new Error('Missing getBlobStore')

// We call register here so that the `prefix` option can work if desired
// https://fastify.dev/docs/latest/Reference/Routes#route-prefixing-and-fastify-plugin
fastify.register(routes, options)
}

/** @type {import('fastify').FastifyPluginAsync<Omit<BlobServerPluginOpts, 'prefix'>, import('fastify').RawServerDefault, import('@fastify/type-provider-typebox').TypeBoxTypeProvider>} */
async function routes(fastify, options) {
const { getBlobStore } = options

fastify.get(
'/:projectId/:driveId/:type/:variant/:name',
{ schema: { params: PARAMS_JSON_SCHEMA } },
async (request, reply) => {
const { projectId, ...blobId } = request.params

if (!isValidBlobId(blobId)) {
reply.code(400)
throw new Error(
`Unsupported variant "${blobId.variant}" for ${blobId.type}`
)
}
const { driveId } = blobId

let blobStore
try {
blobStore = getBlobStore(projectId)
} catch (e) {
reply.code(404)
throw e
}

const entry = await blobStore.entry(blobId, { wait: false })

if (!entry) {
reply.code(404)
throw new Error('Entry not found')
}

const { metadata } = entry.value

const blobStream = await blobStore.createEntryReadStream(driveId, entry)

// Extract the 'mimeType' property of the metadata and use it for the response header if found
if (
metadata &&
'mimeType' in metadata &&
typeof metadata.mimeType === 'string'
) {
reply.header('Content-Type', metadata.mimeType)
} else {
// Attempt to guess the MIME type based on the blob contents
const blobSlice = await blobStore.getEntryBlob(driveId, entry, {
length: 20,
})

if (!blobSlice) {
reply.code(404)
throw new Error('Blob not found')
}

const [guessedMime] = filetypemime(blobSlice)

reply.header('Content-Type', guessedMime || 'application/octet-stream')
}

return reply.send(blobStream)
}
)
}

/**
* @param {Omit<BlobId, 'variant'> & { variant: BlobId['variant'] }} maybeBlobId
* @returns {maybeBlobId is BlobId}
*/
function isValidBlobId(maybeBlobId) {
const { type, variant } = maybeBlobId
/** @type {readonly BlobId['variant'][]} */
const validVariants = SUPPORTED_BLOB_VARIANTS[type]
return validVariants.includes(variant)
}
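
A minimal request sketch against the route above, assuming a server listening on localhost:3000 and hypothetical project/drive IDs and file name; the Content-Type comes from the stored mimeType metadata, a guess from the first 20 bytes, or application/octet-stream as a fallback:

// Hypothetical example: fetch a photo thumbnail over HTTP (Node 18 global fetch)
const projectId = 'a'.repeat(64) // placeholder 32-byte hex string
const driveId = 'b'.repeat(64) // placeholder 32-byte hex string
const res = await fetch(
  `http://localhost:3000/${projectId}/${driveId}/photo/thumbnail/test.jpg`
)
console.log(res.status) // 200, or 404 if the project, drive or blob is not found
console.log(res.headers.get('content-type')) // e.g. 'image/jpeg'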
25 changes: 25 additions & 0 deletions lib/blob-server/index.js
@@ -0,0 +1,25 @@
import fastify from 'fastify'

import BlobServerPlugin from './fastify-plugin.js'

/**
* @param {object} opts
* @param {import('fastify').FastifyServerOptions['logger']} opts.logger
* @param {import('../blob-store/index.js').BlobStore} opts.blobStore
* @param {import('./fastify-plugin.js').BlobServerPluginOpts['getBlobStore']} opts.getBlobStore
* @param {import('fastify').RegisterOptions['prefix']} opts.prefix
* @param {string} opts.projectId Temporary option to enable `getBlobStore` option. Will be removed when multiproject support in Mapeo class is implemented.
*
*/
export function createBlobServer({ logger, blobStore, prefix, projectId }) {
const server = fastify({ logger })
server.register(BlobServerPlugin, {
getBlobStore: (projId) => {
// Temporary measure until multiprojects is implemented in Mapeo class
if (projectId !== projId) throw new Error('Project ID does not match')
return blobStore
},
prefix,
})
return server
}
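
A minimal usage sketch for this helper, assuming an existing BlobStore instance and its project ID (both hypothetical here); the prefix option is optional and is passed through to fastify.register:

// Hypothetical wiring: serve an existing BlobStore over HTTP on port 3000
import { createBlobServer } from './lib/blob-server/index.js'

const server = createBlobServer({
  logger: false,
  blobStore, // an existing BlobStore instance (assumed)
  projectId, // hex ID of the single project this store belongs to (assumed)
  prefix: '/blobs', // routes become /blobs/:projectId/:driveId/:type/:variant/:name
})
await server.listen({ port: 3000 })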
119 changes: 94 additions & 25 deletions lib/blob-store/index.js
@@ -9,12 +9,17 @@ import { LiveDownload } from './live-download.js'

// prop = blob type name
// value = array of blob variants supported for that type
export const SUPPORTED_BLOB_VARIANTS = /** @type {const} */ ({
const SUPPORTED_BLOB_VARIANTS = /** @type {const} */ ({
photo: ['original', 'preview', 'thumbnail'],
audio: ['original'],
video: ['original']
video: ['original'],
})

// Cannot directly export the const assignment above because export does not
// like the assignment being wrapped in parenthesis, which is necessary to cast
// the type with JSDoc
export { SUPPORTED_BLOB_VARIANTS }

class ErrNotFound extends Error {
constructor(message = 'NotFound') {
super(message)
@@ -36,7 +41,7 @@ export class BlobStore {
* @param {object} options
* @param {import('../core-manager/index.js').CoreManager} options.coreManager
*/
constructor ({ coreManager }) {
constructor({ coreManager }) {
const corestore = new PretendCorestore({ coreManager })
const blobIndexCores = coreManager.getCores('blobIndex')
const { key: writerKey } = coreManager.getWriterCore('blobIndex')
@@ -61,15 +66,24 @@
throw new Error('Could not find a writer for the blobIndex namespace')
}

/**
* @param {string} driveId
*/
#getDrive(driveId) {
// TODO: Support 'wait' and 'timeout' opts (requires patch to Hyperdrive, or custom implementation of createReadStream)
const drive = this.#hyperdrives.get(driveId)
if (!drive) throw new Error('Drive not found ' + driveId.slice(0, 7))
return drive
}

/**
* @param {BlobId} blobId
* @param {object} opts
* @param {false} [opts.wait=false] [NOT YET IMPLEMENTED] Set to `true` to wait for a blob to download, otherwise will throw if blob is not available locally
* @param {never} [opts.timeout] [NOT YET IMPLEMENTED] Optional timeout to wait for a blob to download
*/
async get ({ type, variant, name, driveId }, { wait = false, timeout } = {}) {
const drive = this.#hyperdrives.get(driveId)
if (!drive) throw new Error('Drive not found ' + driveId.slice(0, 7))
async get({ type, variant, name, driveId }, { wait = false, timeout } = {}) {
const drive = this.#getDrive(driveId)
const path = makePath({ type, variant, name })
const blob = await drive.get(path, { wait, timeout })
if (!blob) throw new ErrNotFound()
@@ -89,34 +103,70 @@
* @param {AbortSignal} [options.signal] Optional AbortSignal to cancel in-progress download
* @returns EventEmitter with `.state` property, emits `state` with new state when it updates
*/
download (filter, { signal } = {}) {
download(filter, { signal } = {}) {
return new LiveDownload(this.#hyperdrives.values(), this.#driveEmitter, {
filter,
signal
signal,
})
}

/**
* @param {BlobId} blobId
*/
createReadStream ({ type, variant, name, driveId }) {
// TODO: Support 'wait' and 'timeout' opts (requires patch to Hyperdrive, or custom implementation of createReadStream)
const drive = this.#hyperdrives.get(driveId)
// TODO: Should this be an emit error on the returned stream?
if (!drive) throw new Error('Drive not found ' + driveId.slice(0, 7))
createReadStream({ type, variant, name, driveId }) {
// TODO: Should the error thrown from this be an emit error on the returned stream?
const drive = this.#getDrive(driveId)
const path = makePath({ type, variant, name })
return drive.createReadStream(path)
}

/**
* Optimization for creating the blobs read stream when you have
* previously read the entry from Hyperdrive using `drive.entry`
* @param {BlobId['driveId']} driveId Hyperdrive drive id
* @param {import('hyperdrive').HyperdriveEntry} entry Hyperdrive entry
*/
async createEntryReadStream(driveId, entry) {
const drive = this.#getDrive(driveId)
const blobs = await drive.getBlobs()

if (!blobs)
throw new Error(
'Hyperblobs instance not found for drive ' + driveId.slice(0, 7)
)

return blobs.createReadStream(entry.value.blob)
}

/**
* @param {BlobId['driveId']} driveId Hyperdrive drive id
* @param {import('hyperdrive').HyperdriveEntry} entry Hyperdrive entry
* @param {object} [opts]
* @param {number} [opts.length]
*
* @returns {Promise<Buffer | null>}
*/
async getEntryBlob(driveId, entry, { length } = {}) {
const drive = this.#getDrive(driveId)
const blobs = await drive.getBlobs()

if (!blobs)
throw new Error(
'Hyperblobs instance not found for drive ' + driveId.slice(0, 7)
)

return blobs.get(entry.value.blob, { wait: false, start: 0, length })
}

/**
*
* @param {Omit<BlobId, 'driveId'>} blobId
* @param {Buffer} blob
* @param {object} [options]
* @param {any} [options.metadata] Metadata to store with the blob
* @param {{mimeType: string}} [options.metadata] Metadata to store with the blob
* @returns {Promise<string>} public key as hex string of hyperdrive where blob is stored
*/
async put ({ type, variant, name }, blob, options) {
async put({ type, variant, name }, blob, options) {
const path = makePath({ type, variant, name })
await this.#writer.put(path, blob, options)
return this.#writer.key.toString('hex')
@@ -125,13 +175,32 @@
/**
* @param {Omit<BlobId, 'driveId'>} blobId
* @param {object} [options]
* @param {any} [options.metadata] Metadata to store with the blob
* @param {{mimeType: string}} [options.metadata] Metadata to store with the blob
*/
createWriteStream ({ type, variant, name }, options) {
createWriteStream({ type, variant, name }, options) {
const path = makePath({ type, variant, name })
const stream = this.#writer.createWriteStream(path, options)
return proxyProps(stream, { driveId: this.#writer.key.toString('hex') })
}

/**
* @param {BlobId} blobId
* @param {object} [options]
* @param {boolean} [options.follow=false] Set to `true` to follow symlinks (16 max or throws an error)
* @param {false} [options.wait=false] Set to `true` to wait for a blob to download, otherwise will throw if blob is not available locally
* @param {never} [options.timeout] Optional timeout to wait for a blob to download
* @returns {Promise<import('hyperdrive').HyperdriveEntry | null>}
*/
async entry(
{ type, variant, name, driveId },
options = { follow: false, wait: false }
) {
const drive = this.#hyperdrives.get(driveId)
if (!drive) throw new Error('Drive not found ' + driveId.slice(0, 7))
const path = makePath({ type, variant, name })
const entry = await drive.entry(path, options)
return entry
}
}

/**
@@ -141,22 +210,22 @@ export class BlobStore {
* @param {U} props
* @returns {T & U}
*/
function proxyProps (target, props) {
function proxyProps(target, props) {
// @ts-ignore - too much time to learn how to teach this to Typescript
return new Proxy(target, {
get (target, prop, receiver) {
get(target, prop, receiver) {
// eslint-disable-next-line no-prototype-builtins
if (props.hasOwnProperty(prop)) {
return Reflect.get(props, prop, receiver)
} else {
return Reflect.get(target, prop, receiver)
}
}
},
})
}

/** @param {Pick<BlobId, 'type' | 'variant' | 'name'>} opts */
function makePath ({ type, variant, name }) {
function makePath({ type, variant, name }) {
return `/${type}/${variant}/${name}`
}

@@ -170,15 +239,15 @@ class PretendCorestore {
* @param {object} options
* @param {import('../core-manager/index.js').CoreManager} options.coreManager
*/
constructor ({ coreManager }) {
constructor({ coreManager }) {
this.#coreManager = coreManager
}

/**
* @param {Buffer | { publicKey: Buffer } | { name: string }} opts
* @returns {import('hypercore').default | undefined}
* @returns {import('hypercore') | undefined}
*/
get (opts) {
get(opts) {
if (b4a.isBuffer(opts)) {
opts = { publicKey: opts }
}
@@ -207,5 +276,5 @@
}

/** no-op */
close () {}
close() {}
}
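
A sketch of the entry-based read path these new methods enable, mirroring how the fastify plugin uses them (blobStore, driveId and the blob name are hypothetical): entry() resolves the Hyperdrive entry once, then getEntryBlob and createEntryReadStream reuse it to avoid a second lookup.

// Hypothetical sketch: resolve an entry locally, then read and stream its blob
const entry = await blobStore.entry({
  driveId,
  type: 'photo',
  variant: 'original',
  name: 'test.jpg',
})
if (entry) {
  // Read a short prefix (e.g. for MIME sniffing) without streaming the whole blob
  const head = await blobStore.getEntryBlob(driveId, entry, { length: 20 })
  console.log(head?.byteLength)
  // Stream the full blob, reusing the entry fetched above
  const stream = await blobStore.createEntryReadStream(driveId, entry)
  stream.pipe(process.stdout)
}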