From 0668957cf5ec4e76432e84a5b23e6d939a636385 Mon Sep 17 00:00:00 2001 From: Vasco Santos Date: Fri, 5 Apr 2024 15:25:07 +0200 Subject: [PATCH] chore: address review --- packages/capabilities/package.json | 4 + packages/capabilities/src/blob.js | 200 +----------------- packages/capabilities/src/http.js | 55 +++++ packages/capabilities/src/index.js | 12 +- packages/capabilities/src/types.ts | 46 ++-- packages/capabilities/src/ucan.js | 43 +--- .../capabilities/src/web3.storage/blob.js | 102 +++++++++ packages/upload-api/src/blob.js | 4 - packages/upload-api/src/blob/accept.js | 5 +- packages/upload-api/src/blob/add.js | 102 +++++---- packages/upload-api/src/blob/allocate.js | 150 ++++++------- packages/upload-api/src/blob/lib.js | 8 +- packages/upload-api/src/blob/list.js | 15 -- packages/upload-api/src/blob/remove.js | 22 -- packages/upload-api/src/errors.js | 13 ++ packages/upload-api/src/types.ts | 8 - packages/upload-api/src/ucan/conclude.js | 72 +++++-- packages/upload-api/test/handlers/blob.js | 105 +++++---- 18 files changed, 476 insertions(+), 490 deletions(-) create mode 100644 packages/capabilities/src/http.js create mode 100644 packages/capabilities/src/web3.storage/blob.js delete mode 100644 packages/upload-api/src/blob/list.js delete mode 100644 packages/upload-api/src/blob/remove.js diff --git a/packages/capabilities/package.json b/packages/capabilities/package.json index 4649e73c6..208fb626c 100644 --- a/packages/capabilities/package.json +++ b/packages/capabilities/package.json @@ -56,6 +56,10 @@ "types": "./dist/src/filecoin/dealer.d.ts", "import": "./src/filecoin/dealer.js" }, + "./web3.storage/blob": { + "types": "./dist/src/web3.storage/blob.d.ts", + "import": "./src/web3.storage/blob.js" + }, "./types": { "types": "./dist/src/types.d.ts", "import": "./src/types.js" diff --git a/packages/capabilities/src/blob.js b/packages/capabilities/src/blob.js index 163781126..a9108dcd4 100644 --- a/packages/capabilities/src/blob.js +++ b/packages/capabilities/src/blob.js @@ -11,16 +11,8 @@ * * @module */ -import { capability, Link, Schema, ok, fail } from '@ucanto/validator' -import { - equal, - equalBlob, - equalContent, - equalWith, - checkLink, - SpaceDID, - and, -} from './utils.js' +import { capability, Schema } from '@ucanto/validator' +import { equalBlob, equalWith, SpaceDID } from './utils.js' /** * Agent capabilities for Blob protocol @@ -79,192 +71,6 @@ export const add = capability({ derives: equalBlob, }) -/** - * `blob/remove` capability can be used to remove the stored Blob from the (memory) - * space identified by `with` field. - */ -export const remove = capability({ - can: 'blob/remove', - /** - * DID of the (memory) space where Blob is intended to - * be stored. - */ - with: SpaceDID, - nb: Schema.struct({ - /** - * A multihash digest of the blob payload bytes, uniquely identifying blob. - */ - content: Schema.bytes(), - }), - derives: equalContent, -}) - -/** - * `blob/list` capability can be invoked to request a list of stored Blobs in the - * (memory) space identified by `with` field. - */ -export const list = capability({ - can: 'blob/list', - /** - * DID of the (memory) space where Blob is intended to - * be stored. - */ - with: SpaceDID, - nb: Schema.struct({ - /** - * A pointer that can be moved back and forth on the list. - * It can be used to paginate a list for instance. - */ - cursor: Schema.string().optional(), - /** - * Maximum number of items per page. - */ - size: Schema.integer().optional(), - /** - * If true, return page of results preceding cursor. Defaults to false. - */ - pre: Schema.boolean().optional(), - }), - derives: (claimed, delegated) => { - if (claimed.with !== delegated.with) { - return fail( - `Expected 'with: "${delegated.with}"' instead got '${claimed.with}'` - ) - } - return ok({}) - }, -}) - -/** - * Service capabilities for Blob protocol - */ -/** - * Capability can only be delegated (but not invoked) allowing audience to - * derived any `web3.storage/blob/` prefixed capability for the (memory) space identified - * by DID in the `with` field. - */ -export const serviceBlob = capability({ - can: 'web3.storage/blob/*', - /** - * DID of the (memory) space where Blob is intended to - * be stored. - */ - with: SpaceDID, - derives: equalWith, -}) - -/** - * `web3.storage/blob//allocate` capability can be invoked to create a memory - * address where blob content can be written via HTTP PUT request. - */ -export const allocate = capability({ - can: 'web3.storage/blob/allocate', - /** - * Provider DID. - */ - with: Schema.did(), - nb: Schema.struct({ - /** - * Blob to allocate on the space. - */ - blob: blobStruct, - /** - * The Link for an Add Blob task, that caused an allocation - */ - cause: Link, - /** - * DID of the user space where allocation takes place - */ - space: SpaceDID, - }), - derives: (claim, from) => { - return ( - and(equalWith(claim, from)) || - and(equalBlob(claim, from)) || - and(checkLink(claim.nb.cause, from.nb.cause, 'cause')) || - and(equal(claim.nb.space, from.nb.space, 'space')) || - ok({}) - ) - }, -}) - -/** - * `http/put` capability invocation MAY be performed by any agent on behalf of the subject. - * The `blob/add` provider MUST add `/http/put` effect and capture private key of the - * `subject` in the `meta` field so that any agent could perform it. - */ -export const put = capability({ - can: 'http/put', - /** - * DID of the (memory) space where Blob is intended to - * be stored. - */ - with: SpaceDID, - nb: Schema.struct({ - /** - * Blob to allocate on the space. - */ - blob: blobStruct, - /** - * Blob to accept. - */ - address: Schema.struct({ - /** - * HTTP(S) location that can receive blob content via HTTP PUT request. - */ - url: Schema.string(), - /** - * HTTP headers. - */ - headers: Schema.unknown(), - }).optional(), - }), - derives: (claim, from) => { - return ( - and(equalWith(claim, from)) || - and(equalBlob(claim, from)) || - and(equal(claim.nb.address?.url, from.nb.address, 'url')) || - and(equal(claim.nb.address?.headers, from.nb.address, 'headers')) || - ok({}) - ) - }, -}) - -/** - * `blob/accept` capability invocation should either succeed when content is - * delivered on allocated address or fail if no content is allocation expires - * without content being delivered. - */ -export const accept = capability({ - can: 'web3.storage/blob/accept', - /** - * Provider DID. - */ - with: Schema.did(), - nb: Schema.struct({ - /** - * Blob to accept. - */ - blob: blobStruct, - /** - * Expiration.. - */ - exp: Schema.integer(), - }), - derives: (claim, from) => { - const result = equalBlob(claim, from) - if (result.error) { - return result - } else if (claim.nb.exp !== undefined && from.nb.exp !== undefined) { - return claim.nb.exp > from.nb.exp - ? fail(`exp constraint violation: ${claim.nb.exp} > ${from.nb.exp}`) - : ok({}) - } else { - return ok({}) - } - }, -}) - // ⚠️ We export imports here so they are not omitted in generated typedes // @see https://github.com/microsoft/TypeScript/issues/51548 -export { Schema, Link } +export { Schema } diff --git a/packages/capabilities/src/http.js b/packages/capabilities/src/http.js new file mode 100644 index 000000000..cf171ee4c --- /dev/null +++ b/packages/capabilities/src/http.js @@ -0,0 +1,55 @@ +/** + * HTTP Capabilities. + * + * These can be imported directly with: + * ```js + * import * as HTTP from '@web3-storage/capabilities/http' + * ``` + * + * @module + */ +import { capability, Schema, ok } from '@ucanto/validator' +import { blobStruct } from './blob.js' +import { equal, equalBlob, equalWith, SpaceDID, and } from './utils.js' + +/** + * `http/put` capability invocation MAY be performed by any agent on behalf of the subject. + * The `blob/add` provider MUST add `/http/put` effect and capture private key of the + * `subject` in the `meta` field so that any agent could perform it. + */ +export const put = capability({ + can: 'http/put', + /** + * DID of the (memory) space where Blob is intended to + * be stored. + */ + with: SpaceDID, + nb: Schema.struct({ + /** + * Blob to allocate on the space. + */ + blob: blobStruct, + /** + * Blob to accept. + */ + address: Schema.struct({ + /** + * HTTP(S) location that can receive blob content via HTTP PUT request. + */ + url: Schema.string(), + /** + * HTTP headers. + */ + headers: Schema.unknown(), + }).optional(), + }), + derives: (claim, from) => { + return ( + and(equalWith(claim, from)) || + and(equalBlob(claim, from)) || + and(equal(claim.nb.address?.url, from.nb.address, 'url')) || + and(equal(claim.nb.address?.headers, from.nb.address, 'headers')) || + ok({}) + ) + }, +}) diff --git a/packages/capabilities/src/index.js b/packages/capabilities/src/index.js index fc7e4bc7c..c5d9385d1 100644 --- a/packages/capabilities/src/index.js +++ b/packages/capabilities/src/index.js @@ -20,6 +20,8 @@ import * as UCAN from './ucan.js' import * as Plan from './plan.js' import * as Usage from './usage.js' import * as Blob from './blob.js' +import * as W3sBlob from './web3.storage/blob.js' +import * as HTTP from './http.js' export { Access, @@ -90,10 +92,8 @@ export const abilitiesAsStrings = [ Usage.report.can, Blob.blob.can, Blob.add.can, - Blob.remove.can, - Blob.list.can, - Blob.serviceBlob.can, - Blob.put.can, - Blob.allocate.can, - Blob.accept.can, + W3sBlob.blob.can, + W3sBlob.allocate.can, + W3sBlob.accept.can, + HTTP.put.can, ] diff --git a/packages/capabilities/src/types.ts b/packages/capabilities/src/types.ts index 64c99825c..2177b9958 100644 --- a/packages/capabilities/src/types.ts +++ b/packages/capabilities/src/types.ts @@ -22,6 +22,8 @@ import { space, info } from './space.js' import * as provider from './provider.js' import { top } from './top.js' import * as BlobCaps from './blob.js' +import * as W3sBlobCaps from './web3.storage/blob.js' +import * as HTTPCaps from './http.js' import * as StoreCaps from './store.js' import * as UploadCaps from './upload.js' import * as AccessCaps from './access.js' @@ -440,29 +442,33 @@ export interface UploadNotFound extends Ucanto.Failure { export type UploadGetFailure = UploadNotFound | Ucanto.Failure +// HTTP +export type HTTPPut = InferInvokedCapability + // Blob export type Blob = InferInvokedCapability export type BlobAdd = InferInvokedCapability -export type BlobRemove = InferInvokedCapability -export type BlobList = InferInvokedCapability -export type ServiceBlob = InferInvokedCapability -export type BlobPut = InferInvokedCapability -export type BlobAllocate = InferInvokedCapability -export type BlobAccept = InferInvokedCapability +export type ServiceBlob = InferInvokedCapability +export type BlobAllocate = InferInvokedCapability +export type BlobAccept = InferInvokedCapability export type BlobMultihash = Uint8Array +export interface BlobModel { + content: BlobMultihash + size: number +} // Blob add export interface BlobAddSuccess { - claim: { - 'await/ok': Link + location: { + 'ucan/await': ['.out.ok.claim', Link] } } -export interface BlobItemSizeExceeded extends Ucanto.Failure { - name: 'BlobItemSizeExceeded' +export interface BlobExceedsSizeLimit extends Ucanto.Failure { + name: 'BlobExceedsSizeLimit' } -export type BlobAddFailure = BlobItemSizeExceeded | Ucanto.Failure +export type BlobAddFailure = BlobExceedsSizeLimit | Ucanto.Failure // Blob remove export interface BlobRemoveSuccess { @@ -473,12 +479,13 @@ export interface BlobItemNotFound extends Ucanto.Failure { name: 'BlobItemNotFound' } +// TODO: Add more errors from stores export type BlobRemoveFailure = BlobItemNotFound | Ucanto.Failure // Blob list export interface BlobListSuccess extends ListResponse {} export interface BlobListItem { - blob: { content: Uint8Array; size: number } + blob: BlobModel insertedAt: ISO8601Date } @@ -657,11 +664,14 @@ export type UCANRevokeFailure = | UnauthorizedRevocation | RevocationsStoreFailure -export interface InvocationNotFound extends Ucanto.Failure { - name: 'InvocationNotFound' +/** + * Error is raised when receipt is received for unknown invocation + */ +export interface InvocationNotFoundForReceipt extends Ucanto.Failure { + name: 'InvocationNotFoundForReceipt' } -export type UCANConcludeFailure = InvocationNotFound | Ucanto.Failure +export type UCANConcludeFailure = InvocationNotFoundForReceipt | Ucanto.Failure // Admin export type Admin = InferInvokedCapability @@ -797,12 +807,10 @@ export type ServiceAbilityArray = [ UsageReport['can'], Blob['can'], BlobAdd['can'], - BlobRemove['can'], - BlobList['can'], ServiceBlob['can'], - BlobPut['can'], BlobAllocate['can'], - BlobAccept['can'] + BlobAccept['can'], + HTTPPut['can'] ] /** diff --git a/packages/capabilities/src/ucan.js b/packages/capabilities/src/ucan.js index c703f2f97..51a07ffe0 100644 --- a/packages/capabilities/src/ucan.js +++ b/packages/capabilities/src/ucan.js @@ -2,7 +2,7 @@ * UCAN core capabilities. */ -import { capability, Schema } from '@ucanto/validator' +import { capability, Schema, ok } from '@ucanto/validator' import * as API from '@ucanto/interface' import { equalWith, equal, and, checkLink } from './utils.js' @@ -89,44 +89,17 @@ export const conclude = capability({ * MUST be the DID of the audience of the ran invocation. */ with: Schema.did(), - // TODO: Should this just have bytes? nb: Schema.struct({ - bytes: Schema.Bytes, - // /** - // * A link to the UCAN invocation that this receipt is for. - // */ - // ran: UCANLink, - // /** - // * The value output of the invocation in Result format. - // */ - // out: Schema.unknown(), - // /** - // * Tasks that the invocation would like to enqueue. - // */ - // next: Schema.array(UCANLink), - // /** - // * Additional data about the receipt - // */ - // meta: Schema.unknown(), - // /** - // * The UTC Unix timestamp at which the Receipt was issued - // */ - // time: Schema.integer(), + /** + * CID of the content with the UCANTO Message. + */ + message: Schema.link(), }), derives: (claim, from) => // With field MUST be the same - and(equalWith(claim, from)) ?? - equal(claim.nb.bytes, from.nb.bytes, 'nb.bytes'), - // // invocation MUST be the same - // and(checkLink(claim.nb.ran, from.nb.ran, 'nb.ran')) ?? - // // value output MUST be the same - // and(equal(claim.nb.out, from.nb.out, 'nb.out')) ?? - // // tasks to enqueue MUST be the same - // and(equal(claim.nb.next, from.nb.next, 'nb.next')) ?? - // // additional data MUST be the same - // and(equal(claim.nb.meta, from.nb.meta, 'nb.meta')) ?? - // // the receipt issue time MUST be the same - // equal(claim.nb.time, from.nb.time, 'nb.time'), + and(equalWith(claim, from)) || + and(checkLink(claim.nb.message, from.nb.message, 'nb.message')) || + ok({}), }) /** diff --git a/packages/capabilities/src/web3.storage/blob.js b/packages/capabilities/src/web3.storage/blob.js new file mode 100644 index 000000000..56188c847 --- /dev/null +++ b/packages/capabilities/src/web3.storage/blob.js @@ -0,0 +1,102 @@ +import { capability, Schema, Link, ok, fail } from '@ucanto/validator' +import { blobStruct } from '../blob.js' +import { + equalBlob, + equalWith, + SpaceDID, + and, + equal, + checkLink, +} from '../utils.js' + +/** + * Service capabilities for Blob protocol + */ +/** + * Capability can only be delegated (but not invoked) allowing audience to + * derived any `web3.storage/blob/` prefixed capability for the (memory) space identified + * by DID in the `with` field. + */ +export const blob = capability({ + can: 'web3.storage/blob/*', + /** + * DID of the (memory) space where Blob is intended to + * be stored. + */ + with: SpaceDID, + derives: equalWith, +}) + +/** + * `web3.storage/blob//allocate` capability can be invoked to create a memory + * address where blob content can be written via HTTP PUT request. + */ +export const allocate = capability({ + can: 'web3.storage/blob/allocate', + /** + * Provider DID. + */ + with: Schema.did(), + nb: Schema.struct({ + /** + * Blob to allocate on the space. + */ + blob: blobStruct, + /** + * The Link for an Add Blob task, that caused an allocation + */ + cause: Link, + /** + * DID of the user space where allocation takes place + */ + space: SpaceDID, + }), + derives: (claim, from) => { + return ( + and(equalWith(claim, from)) || + and(equalBlob(claim, from)) || + and(checkLink(claim.nb.cause, from.nb.cause, 'cause')) || + and(equal(claim.nb.space, from.nb.space, 'space')) || + ok({}) + ) + }, +}) + +/** + * `blob/accept` capability invocation should either succeed when content is + * delivered on allocated address or fail if no content is allocation expires + * without content being delivered. + */ +export const accept = capability({ + can: 'web3.storage/blob/accept', + /** + * Provider DID. + */ + with: Schema.did(), + nb: Schema.struct({ + /** + * Blob to accept. + */ + blob: blobStruct, + /** + * Expiration.. + */ + exp: Schema.integer(), + }), + derives: (claim, from) => { + const result = equalBlob(claim, from) + if (result.error) { + return result + } else if (claim.nb.exp !== undefined && from.nb.exp !== undefined) { + return claim.nb.exp > from.nb.exp + ? fail(`exp constraint violation: ${claim.nb.exp} > ${from.nb.exp}`) + : ok({}) + } else { + return ok({}) + } + }, +}) + +// ⚠️ We export imports here so they are not omitted in generated typedes +// @see https://github.com/microsoft/TypeScript/issues/51548 +export { Schema, Link } diff --git a/packages/upload-api/src/blob.js b/packages/upload-api/src/blob.js index 78b4bb40b..84187cefd 100644 --- a/packages/upload-api/src/blob.js +++ b/packages/upload-api/src/blob.js @@ -1,6 +1,4 @@ import { blobAddProvider } from './blob/add.js' -import { blobListProvider } from './blob/list.js' -import { blobRemoveProvider } from './blob/remove.js' import * as API from './types.js' /** @@ -9,7 +7,5 @@ import * as API from './types.js' export function createService(context) { return { add: blobAddProvider(context), - list: blobListProvider(context), - remove: blobRemoveProvider(context), } } diff --git a/packages/upload-api/src/blob/accept.js b/packages/upload-api/src/blob/accept.js index 07113aaa3..f7a26af44 100644 --- a/packages/upload-api/src/blob/accept.js +++ b/packages/upload-api/src/blob/accept.js @@ -1,5 +1,5 @@ import * as Server from '@ucanto/server' -import * as Blob from '@web3-storage/capabilities/blob' +import * as W3sBlob from '@web3-storage/capabilities/web3.storage/blob' import * as API from '../types.js' import { BlobItemNotFound } from './lib.js' @@ -8,7 +8,7 @@ import { BlobItemNotFound } from './lib.js' * @returns {API.ServiceMethod} */ export function blobAcceptProvider(context) { - return Server.provide(Blob.accept, async ({ capability }) => { + return Server.provide(W3sBlob.accept, async ({ capability }) => { const { blob } = capability.nb // If blob is not stored, we must fail const hasBlob = await context.blobsStorage.has(blob.content) @@ -18,6 +18,7 @@ export function blobAcceptProvider(context) { } } + // TODO: Set bucket name // TODO: return content commitment return { diff --git a/packages/upload-api/src/blob/add.js b/packages/upload-api/src/blob/add.js index 86f28b5ec..300d097ea 100644 --- a/packages/upload-api/src/blob/add.js +++ b/packages/upload-api/src/blob/add.js @@ -3,10 +3,12 @@ import { Message } from '@ucanto/core' import { ed25519 } from '@ucanto/principal' import { CAR } from '@ucanto/transport' import * as Blob from '@web3-storage/capabilities/blob' +import * as W3sBlob from '@web3-storage/capabilities/web3.storage/blob' +import * as HTTP from '@web3-storage/capabilities/http' import * as UCAN from '@web3-storage/capabilities/ucan' import * as API from '../types.js' -import { BlobItemSizeExceeded } from './lib.js' +import { BlobExceedsSizeLimit } from './lib.js' /** * @param {API.BlobServiceContext} context @@ -31,21 +33,23 @@ export function blobAddProvider(context) { // Verify blob is within accept size if (blob.size > maxUploadSize) { return { - error: new BlobItemSizeExceeded(maxUploadSize), + error: new BlobExceedsSizeLimit(maxUploadSize), } } - // Create shared subject for agent to issue `http/put` receipt + // We derive principal from the content multihash to be an audience + // of the `http/put` invocation. That way anyone with blob content + // could perform the invocation and issue receipt by deriving same + // principal const putSubject = await ed25519.derive(blob.content.slice(0, 32)) - const facts = Object.entries(putSubject.toArchive().keys).map( - ([key, value]) => ({ - did: key, - bytes: value, - }) - ) + const facts = [ + { + keys: putSubject.toArchive(), + }, + ] // Create web3.storage/blob/* invocations - const blobAllocate = Blob.allocate.invoke({ + const blobAllocate = W3sBlob.allocate.invoke({ issuer: id, audience: id, with: id.did(), @@ -56,7 +60,7 @@ export function blobAddProvider(context) { }, expiration: Infinity, }) - const blobAccept = Blob.accept.invoke({ + const blobAccept = W3sBlob.accept.invoke({ issuer: id, audience: id, with: id.toDIDKey(), @@ -71,12 +75,8 @@ export function blobAddProvider(context) { blobAccept.delegate(), ]) - // Get receipt for `blob/allocate` if available, or schedule invocation if not - const allocatedGetRes = await allocationsStorage.get( - space, - blob.content - ) + const allocatedGetRes = await allocationsStorage.get(space, blob.content) let blobAllocateReceipt let blobAllocateOutAddress // If already allocated, just get the allocate receipt @@ -114,37 +114,43 @@ export function blobAddProvider(context) { // Create `blob/allocate` receipt invocation to inline as effect const message = await Message.build({ receipts: [blobAllocateReceipt] }) const messageCar = await CAR.outbound.encode(message) + const bytes = new Uint8Array(messageCar.body) + const messageLink = await CAR.codec.link(bytes) + const allocateUcanConcludefx = await UCAN.conclude .invoke({ issuer: id, audience: id, with: id.toDIDKey(), nb: { - bytes: messageCar.body, + message: messageLink, }, expiration: Infinity, }) .delegate() + allocateUcanConcludefx.attach({ + bytes, + cid: messageLink, + }) // Create result object /** @type {API.OkBuilder} */ const result = Server.ok({ - claim: { - 'await/ok': acceptfx.link(), + location: { + 'ucan/await': ['.out.ok.claim', acceptfx.link()], }, }) // In case blob allocate provided an address to write // the blob is still not stored if (blobAllocateOutAddress) { - // Create effects for `blob/add` receipt - const blobPut = Blob.put.invoke({ + const blobPut = HTTP.put.invoke({ issuer: putSubject, audience: putSubject, with: putSubject.toDIDKey(), nb: { blob, - address: blobAllocateOutAddress + address: blobAllocateOutAddress, }, facts, expiration: Infinity, @@ -167,38 +173,44 @@ export function blobAddProvider(context) { } } - return result - // 1. System attempts to allocate memory in user space for the blob. - .fork(allocatefx) - .fork(allocateUcanConcludefx) - // 2. System requests user agent (or anyone really) to upload the content - // corresponding to the blob - // via HTTP PUT to given location. - .fork(putfx) - // 3. System will attempt to accept uploaded content that matches blob - // multihash and size. - .join(acceptfx) + return ( + result + // 1. System attempts to allocate memory in user space for the blob. + .fork(allocatefx) + .fork(allocateUcanConcludefx) + // 2. System requests user agent (or anyone really) to upload the content + // corresponding to the blob + // via HTTP PUT to given location. + .fork(putfx) + // 3. System will attempt to accept uploaded content that matches blob + // multihash and size. + .join(acceptfx) + ) } // Add allocate receipt if allocate was executed if (allocateUcanConcludefx) { - return result - // 1. System attempts to allocate memory in user space for the blob. + return ( + result + // 1. System attempts to allocate memory in user space for the blob. + .fork(allocatefx) + .fork(allocateUcanConcludefx) + // 3. System will attempt to accept uploaded content that matches blob + // multihash and size. + .join(acceptfx) + ) + } + + // Blob was already allocated and is already stored in the system + return ( + result + // 1. System allocated memory in user space for the blob. .fork(allocatefx) .fork(allocateUcanConcludefx) // 3. System will attempt to accept uploaded content that matches blob // multihash and size. .join(acceptfx) - } - - // Blob was already allocated and is already stored in the system - return result - // 1. System allocated memory in user space for the blob. - .fork(allocatefx) - .fork(allocateUcanConcludefx) - // 3. System will attempt to accept uploaded content that matches blob - // multihash and size. - .join(acceptfx) + ) }, }) } diff --git a/packages/upload-api/src/blob/allocate.js b/packages/upload-api/src/blob/allocate.js index f61cbfa69..bf78d5691 100644 --- a/packages/upload-api/src/blob/allocate.js +++ b/packages/upload-api/src/blob/allocate.js @@ -1,7 +1,6 @@ import * as Server from '@ucanto/server' -import * as Blob from '@web3-storage/capabilities/blob' +import * as W3sBlob from '@web3-storage/capabilities/web3.storage/blob' import * as API from '../types.js' -import { BlobItemNotFound } from './lib.js' import { ensureRateLimitAbove } from '../utils/rate-limits.js' /** @@ -9,92 +8,95 @@ import { ensureRateLimitAbove } from '../utils/rate-limits.js' * @returns {API.ServiceMethod} */ export function blobAllocateProvider(context) { - return Server.provide(Blob.allocate, async ({ capability, invocation }) => { - const { blob, cause, space } = capability.nb + return Server.provide( + W3sBlob.allocate, + async ({ capability, invocation }) => { + const { blob, cause, space } = capability.nb - // Rate limiting validation - const rateLimitResult = await ensureRateLimitAbove( - context.rateLimitsStorage, - [space], - 0 - ) - if (rateLimitResult.error) { - return { - error: { - name: 'InsufficientStorage', - message: `${space} is blocked`, - }, + // Rate limiting validation + // TODO: we should not produce rate limit error but rather suspend / queue task to be run after enforcing a limit without erroring + const rateLimitResult = await ensureRateLimitAbove( + context.rateLimitsStorage, + [space], + 0 + ) + if (rateLimitResult.error) { + return { + error: { + name: 'RateLimited', + message: `${space} is blocked`, + }, + } } - } - // Has Storage provider validation - const result = await context.provisionsStorage.hasStorageProvider(space) - if (result.error) { - return result - } - if (!result.ok) { - return { - /** @type {API.AllocationError} */ - error: { - name: 'InsufficientStorage', - message: `${space} has no storage provider`, - }, + // Has Storage provider validation + const result = await context.provisionsStorage.hasStorageProvider(space) + if (result.error) { + return result + } + if (!result.ok) { + return { + /** @type {API.AllocationError} */ + error: { + name: 'InsufficientStorage', + message: `${space} has no storage provider`, + }, + } } - } - // Check if blob already exists - const hasBlob = await context.blobsStorage.has(blob.content) - if (hasBlob.error) { - return { - error: new BlobItemNotFound(space), + // Allocate in space, ignoring if already allocated + const allocationInsert = await context.allocationsStorage.insert({ + space, + blob, + invocation: cause, + }) + if (allocationInsert.error) { + // if the insert failed with conflict then this item has already been + // added to the space and there is no allocation change. + if (allocationInsert.error.name === 'RecordKeyConflict') { + return { + ok: { size: 0 }, + } + } + return { + error: new Server.Failure('failed to allocate blob bytes'), + } } - } - // Get presigned URL for the write target - const createUploadUrl = await context.blobsStorage.createUploadUrl( - blob.content, - blob.size - ) - if (createUploadUrl.error) { - return { - error: new Server.Failure('failed to provide presigned url'), + + // Check if blob already exists + const hasBlobStore = await context.blobsStorage.has(blob.content) + if (hasBlobStore.error) { + return hasBlobStore } - } - const address = { - url: createUploadUrl.ok.url.toString(), - headers: createUploadUrl.ok.headers, - } - // Allocate in space, ignoring if already allocated - const allocationInsert = await context.allocationsStorage.insert({ - space, - blob, - invocation: cause, - }) - if (allocationInsert.error) { - // if the insert failed with conflict then this item has already been - // added to the space and there is no allocation change. - if (allocationInsert.error.name === 'RecordKeyConflict') { + // If blob is stored, we can just allocate it to the space + if (hasBlobStore.ok) { return { - ok: { size: 0 }, + ok: { size: blob.size }, } } - return { - error: new Server.Failure('failed to allocate blob bytes'), + + // Get presigned URL for the write target + const createUploadUrl = await context.blobsStorage.createUploadUrl( + blob.content, + blob.size + ) + if (createUploadUrl.error) { + return { + error: new Server.Failure('failed to provide presigned url'), + } + } + const address = { + url: createUploadUrl.ok.url.toString(), + headers: createUploadUrl.ok.headers, } - } - // If blob is stored, we can just allocate it to the space - if (hasBlob.ok) { return { - ok: { size: blob.size }, + ok: { + size: blob.size, + address, + }, } } - - return { - ok: { - size: blob.size, - address, - }, - } - }) + ) } diff --git a/packages/upload-api/src/blob/lib.js b/packages/upload-api/src/blob/lib.js index ff3995109..0e4b1b86a 100644 --- a/packages/upload-api/src/blob/lib.js +++ b/packages/upload-api/src/blob/lib.js @@ -29,8 +29,8 @@ export class BlobItemNotFound extends Failure { } } -export const BlobItemSizeExceededName = 'BlobItemSizeExceeded' -export class BlobItemSizeExceeded extends Failure { +export const BlobExceedsSizeLimitName = 'BlobExceedsSizeLimit' +export class BlobExceedsSizeLimit extends Failure { /** * @param {Number} maxUploadSize */ @@ -40,11 +40,11 @@ export class BlobItemSizeExceeded extends Failure { } get name() { - return BlobItemSizeExceededName + return BlobExceedsSizeLimitName } describe() { - return `Maximum size exceeded: ${this.maxUploadSize}, split DAG into smaller shards.` + return `Blob exceeded maximum size limit: ${this.maxUploadSize}, consider splitting it into blobs that fit limit.` } toJSON() { diff --git a/packages/upload-api/src/blob/list.js b/packages/upload-api/src/blob/list.js deleted file mode 100644 index c6c34bdb7..000000000 --- a/packages/upload-api/src/blob/list.js +++ /dev/null @@ -1,15 +0,0 @@ -import * as Server from '@ucanto/server' -import * as Blob from '@web3-storage/capabilities/blob' -import * as API from '../types.js' - -/** - * @param {API.BlobServiceContext} context - * @returns {API.ServiceMethod} - */ -export function blobListProvider(context) { - return Server.provide(Blob.list, async ({ capability }) => { - const { cursor, size, pre } = capability.nb - const space = Server.DID.parse(capability.with).did() - return await context.allocationsStorage.list(space, { size, cursor, pre }) - }) -} diff --git a/packages/upload-api/src/blob/remove.js b/packages/upload-api/src/blob/remove.js deleted file mode 100644 index fb2e8c2d8..000000000 --- a/packages/upload-api/src/blob/remove.js +++ /dev/null @@ -1,22 +0,0 @@ -import * as Server from '@ucanto/server' -import * as Blob from '@web3-storage/capabilities/blob' -import * as API from '../types.js' -import { BlobItemNotFound } from './lib.js' - -/** - * @param {API.BlobServiceContext} context - * @returns {API.ServiceMethod} - */ -export function blobRemoveProvider(context) { - return Server.provide(Blob.remove, async ({ capability }) => { - const { content } = capability.nb - const space = Server.DID.parse(capability.with).did() - - const res = await context.allocationsStorage.remove(space, content) - if (res.error && res.error.name === 'RecordNotFound') { - return Server.error(new BlobItemNotFound(space)) - } - - return res - }) -} diff --git a/packages/upload-api/src/errors.js b/packages/upload-api/src/errors.js index 13620c3a1..885c8a080 100644 --- a/packages/upload-api/src/errors.js +++ b/packages/upload-api/src/errors.js @@ -23,3 +23,16 @@ export class RecordNotFound extends Server.Failure { return RecordNotFoundErrorName } } + +export const DecodeBlockOperationErrorName = /** @type {const} */ ( + 'DecodeBlockOperationFailed' +) +export class DecodeBlockOperationFailed extends Server.Failure { + get reason() { + return this.message + } + + get name() { + return DecodeBlockOperationErrorName + } +} diff --git a/packages/upload-api/src/types.ts b/packages/upload-api/src/types.ts index ec9080616..51c78ede6 100644 --- a/packages/upload-api/src/types.ts +++ b/packages/upload-api/src/types.ts @@ -57,12 +57,6 @@ import { BlobAdd, BlobAddSuccess, BlobAddFailure, - BlobRemove, - BlobRemoveSuccess, - BlobRemoveFailure, - BlobList, - BlobListSuccess, - BlobListFailure, BlobAllocate, BlobAllocateSuccess, BlobAllocateFailure, @@ -192,8 +186,6 @@ export type { AllocationsStorage, BlobsStorage, TasksStorage, BlobAddInput } export interface Service extends StorefrontService, W3sService { blob: { add: ServiceMethod - remove: ServiceMethod - list: ServiceMethod } store: { add: ServiceMethod diff --git a/packages/upload-api/src/ucan/conclude.js b/packages/upload-api/src/ucan/conclude.js index ead5c752b..e6623e2d4 100644 --- a/packages/upload-api/src/ucan/conclude.js +++ b/packages/upload-api/src/ucan/conclude.js @@ -1,8 +1,10 @@ import { provide } from '@ucanto/server' import { Message } from '@ucanto/core' import { CAR } from '@ucanto/transport' -import * as Blob from '@web3-storage/capabilities/blob' +import * as W3sBlob from '@web3-storage/capabilities/web3.storage/blob' +import * as HTTP from '@web3-storage/capabilities/http' import { conclude } from '@web3-storage/capabilities/ucan' +import { DecodeBlockOperationFailed } from '../errors.js' import * as API from '../types.js' /** @@ -15,8 +17,15 @@ export const ucanConcludeProvider = ({ tasksStorage, tasksScheduler, }) => - provide(conclude, async ({ capability }) => { - const messageCar = CAR.codec.decode(capability.nb.bytes) + provide(conclude, async ({ capability, invocation }) => { + const getBlockRes = await findBlock( + capability.nb.message, + invocation.iterateIPLDBlocks() + ) + if (getBlockRes.error) { + return getBlockRes + } + const messageCar = CAR.codec.decode(getBlockRes.ok) const message = Message.view({ root: messageCar.roots[0].cid, store: messageCar.blocks, @@ -30,6 +39,8 @@ export const ucanConcludeProvider = ({ throw new Error('receipt should exist') } + // TODO: Verify invocation exists failing with InvocationNotFoundForReceipt + // Store receipt const receiptPutRes = await receiptsStorage.put(receipt) if (receiptPutRes.error) { @@ -42,7 +53,6 @@ export const ucanConcludeProvider = ({ // Schedule `blob/accept` const ranInvocation = receipt.ran - // TODO: This actually needs the accept task!!!! // Get invocation const httpPutTaskGetRes = await tasksStorage.get(ranInvocation.link()) if (httpPutTaskGetRes.error) { @@ -52,27 +62,29 @@ export const ucanConcludeProvider = ({ // Schedule `blob/accept` if there is a `http/put` capability const scheduleRes = await Promise.all( httpPutTaskGetRes.ok.capabilities - .filter((cap) => cap.can === Blob.put.can) + .filter((cap) => cap.can === HTTP.put.can) .map(async (cap) => { - const blobAccept = await Blob.accept.invoke({ - issuer: id, - audience: id, - with: id.toDIDKey(), - nb: { - // @ts-expect-error blob exists in put - blob: cap.nb.blob, - exp: Number.MAX_SAFE_INTEGER, - }, - expiration: Infinity, - }).delegate() - + const blobAccept = await W3sBlob.accept + .invoke({ + issuer: id, + audience: id, + with: id.toDIDKey(), + nb: { + // @ts-expect-error blob exists in `http/put` but unknown type here + blob: cap.nb.blob, + exp: Number.MAX_SAFE_INTEGER, + }, + expiration: Infinity, + }) + .delegate() + return tasksScheduler.schedule(blobAccept) }) ) - const scheduleErrors = scheduleRes.filter(res => res.error) + const scheduleErrors = scheduleRes.filter((res) => res.error) if (scheduleErrors.length && scheduleErrors[0].error) { return { - error: scheduleErrors[0].error + error: scheduleErrors[0].error, } } @@ -80,3 +92,25 @@ export const ucanConcludeProvider = ({ ok: { time: Date.now() }, } }) + +/** + * @param {import('multiformats').UnknownLink} cid + * @param {IterableIterator>} blocks + * @returns {Promise>} + */ +export const findBlock = async (cid, blocks) => { + let bytes + for (const b of blocks) { + if (b.cid.equals(cid)) { + bytes = b.bytes + } + } + if (!bytes) { + return { + error: new DecodeBlockOperationFailed(`missing block: ${cid}`), + } + } + return { + ok: bytes, + } +} diff --git a/packages/upload-api/test/handlers/blob.js b/packages/upload-api/test/handlers/blob.js index 0b8bd3c86..4df6ffa71 100644 --- a/packages/upload-api/test/handlers/blob.js +++ b/packages/upload-api/test/handlers/blob.js @@ -7,13 +7,16 @@ import { ed25519 } from '@ucanto/principal' import { CAR } from '@ucanto/transport' import { sha256 } from 'multiformats/hashes/sha2' import * as BlobCapabilities from '@web3-storage/capabilities/blob' +import * as W3sBlobCapabilities from '@web3-storage/capabilities/web3.storage/blob' +import * as HTTPCapabilities from '@web3-storage/capabilities/http' import * as UCAN from '@web3-storage/capabilities/ucan' import { base64pad } from 'multiformats/bases/base64' import { provisionProvider } from '../helpers/utils.js' import { createServer, connect } from '../../src/lib.js' import { alice, bob, createSpace, registerSpace } from '../util.js' -import { BlobItemSizeExceededName } from '../../src/blob/lib.js' +import { BlobExceedsSizeLimitName } from '../../src/blob/lib.js' +import { findBlock } from '../../src/ucan/conclude.js' /** * @type {API.Tests} @@ -54,9 +57,10 @@ export const test = { } // Validate receipt - assert.ok(blobAdd.out.ok.claim) + assert.ok(blobAdd.out.ok.location) + assert.equal(blobAdd.out.ok.location['ucan/await'][0], '.out.ok.claim') assert.ok( - blobAdd.out.ok.claim['await/ok'].equals(blobAdd.fx.join?.link()) + blobAdd.out.ok.location['ucan/await'][1].equals(blobAdd.fx.join?.link()) ) assert.ok(blobAdd.fx.join) @@ -67,13 +71,13 @@ export const test = { const forkInvocations = blobAdd.fx.fork assert.equal(blobAdd.fx.fork.length, 3) const allocatefx = forkInvocations.find( - (fork) => fork.capabilities[0].can === BlobCapabilities.allocate.can + (fork) => fork.capabilities[0].can === W3sBlobCapabilities.allocate.can ) const allocateUcanConcludefx = forkInvocations.find( (fork) => fork.capabilities[0].can === UCAN.conclude.can ) const putfx = forkInvocations.find( - (fork) => fork.capabilities[0].can === BlobCapabilities.put.can + (fork) => fork.capabilities[0].can === HTTPCapabilities.put.can ) if (!allocatefx || !allocateUcanConcludefx || !putfx) { throw new Error('effects not provided') @@ -81,19 +85,22 @@ export const test = { // validate facts exist for `http/put` assert.ok(putfx.facts.length) - const [{ bytes, did }] = putfx.facts - assert.ok(bytes) - assert.ok(did) + assert.ok(putfx.facts[0]['keys']) // Validate `http/put` invocation stored const httpPutGetTask = await context.tasksStorage.get(putfx.cid) assert.ok(httpPutGetTask.ok) - // validate scheduled allocate task ran an its receipt content - const messageCar = CAR.codec.decode( + // validate that scheduled allocate task executed and has its receipt content + const getBlockRes = await findBlock( // @ts-expect-error object of type unknown - allocateUcanConcludefx.capabilities[0].nb.bytes + allocateUcanConcludefx.capabilities[0].nb.message, + allocateUcanConcludefx.iterateIPLDBlocks() ) + if (getBlockRes.error) { + throw new Error('receipt block should exist in invocation') + } + const messageCar = CAR.codec.decode(getBlockRes.ok) const message = Message.view({ root: messageCar.roots[0].cid, store: messageCar.blocks, @@ -108,7 +115,7 @@ export const test = { // @ts-expect-error receipt out is unknown assert.ok(receipt?.out.ok?.address) }, - 'blob/add executes allocation and returns effects for allocate (and its receipt) and accept, but not for put when blob stored': + 'blob/add executes allocation and returns effects for allocate (and its receipt) and accept, but not for put when blob stored': async (assert, context) => { const { proof, spaceDid } = await registerSpace(alice, context) @@ -158,15 +165,20 @@ export const test = { (fork) => fork.capabilities[0].can === UCAN.conclude.can ) const putfx = forkInvocations.find( - (fork) => fork.capabilities[0].can === BlobCapabilities.put.can + (fork) => fork.capabilities[0].can === HTTPCapabilities.put.can ) if (!allocateUcanConcludefx || !putfx) { throw new Error('effects not provided') } - const messageCar = CAR.codec.decode( + const getBlockRes = await findBlock( // @ts-expect-error object of type unknown - allocateUcanConcludefx.capabilities[0].nb.bytes + allocateUcanConcludefx.capabilities[0].nb.message, + allocateUcanConcludefx.iterateIPLDBlocks() ) + if (getBlockRes.error) { + throw new Error('receipt block should exist in invocation') + } + const messageCar = CAR.codec.decode(getBlockRes.ok) const message = Message.view({ root: messageCar.roots[0].cid, store: messageCar.blocks, @@ -220,9 +232,11 @@ export const test = { // @ts-expect-error read only effect const thirdForkInvocations = thirdBlobAdd.fx.fork // no put effect anymore - assert.ok(!thirdForkInvocations.find( - (fork) => fork.capabilities[0].can === BlobCapabilities.put.can - )) + assert.ok( + !thirdForkInvocations.find( + (fork) => fork.capabilities[0].can === HTTPCapabilities.put.can + ) + ) }, 'blob/add fails when a blob with size bigger than maximum size is added': async (assert, context) => { @@ -257,7 +271,7 @@ export const test = { throw new Error('invocation should have failed') } assert.ok(blobAdd.out.error, 'invocation should have failed') - assert.equal(blobAdd.out.error.name, BlobItemSizeExceededName) + assert.equal(blobAdd.out.error.name, BlobExceedsSizeLimitName) }, 'blob/allocate allocates to space and returns presigned url': async ( assert, @@ -293,7 +307,7 @@ export const test = { }) // invoke `service/blob/allocate` - const serviceBlobAllocate = BlobCapabilities.allocate.invoke({ + const serviceBlobAllocate = W3sBlobCapabilities.allocate.invoke({ issuer: alice, audience: context.id, with: spaceDid, @@ -391,7 +405,7 @@ export const test = { }) // invoke `service/blob/allocate` - const serviceBlobAllocate = BlobCapabilities.allocate.invoke({ + const serviceBlobAllocate = W3sBlobCapabilities.allocate.invoke({ issuer: alice, audience: context.id, with: spaceDid, @@ -469,7 +483,7 @@ export const test = { }) // invoke `service/blob/allocate` capabilities on alice space - const aliceServiceBlobAllocate = BlobCapabilities.allocate.invoke({ + const aliceServiceBlobAllocate = W3sBlobCapabilities.allocate.invoke({ issuer: alice, audience: context.id, with: aliceSpaceDid, @@ -510,7 +524,7 @@ export const test = { assert.equal(goodPut.status, 200, await goodPut.text()) // invoke `service/blob/allocate` capabilities on bob space - const bobServiceBlobAllocate = BlobCapabilities.allocate.invoke({ + const bobServiceBlobAllocate = W3sBlobCapabilities.allocate.invoke({ issuer: bob, audience: context.id, with: bobSpaceDid, @@ -577,7 +591,7 @@ export const test = { }) // invoke `service/blob/allocate` - const serviceBlobAllocate = BlobCapabilities.allocate.invoke({ + const serviceBlobAllocate = W3sBlobCapabilities.allocate.invoke({ issuer: alice, audience: context.id, with: spaceDid, @@ -654,7 +668,7 @@ export const test = { }) // invoke `service/blob/allocate` - const serviceBlobAllocate = BlobCapabilities.allocate.invoke({ + const serviceBlobAllocate = W3sBlobCapabilities.allocate.invoke({ issuer: alice, audience: context.id, with: spaceDid, @@ -729,7 +743,7 @@ export const test = { }) // invoke `service/blob/allocate` - const serviceBlobAllocate = BlobCapabilities.allocate.invoke({ + const serviceBlobAllocate = W3sBlobCapabilities.allocate.invoke({ issuer: alice, audience: context.id, with: spaceDid, @@ -784,7 +798,7 @@ export const test = { return Promise.resolve({ ok: {}, }) - } + }, }, }), }) @@ -817,15 +831,20 @@ export const test = { (fork) => fork.capabilities[0].can === UCAN.conclude.can ) const putfx = forkInvocations.find( - (fork) => fork.capabilities[0].can === BlobCapabilities.put.can + (fork) => fork.capabilities[0].can === HTTPCapabilities.put.can ) if (!allocateUcanConcludefx || !putfx) { throw new Error('effects not provided') } - const blobAllocateMessageCar = CAR.codec.decode( + const getBlockRes = await findBlock( // @ts-expect-error object of type unknown - allocateUcanConcludefx.capabilities[0].nb.bytes + allocateUcanConcludefx.capabilities[0].nb.message, + allocateUcanConcludefx.iterateIPLDBlocks() ) + if (getBlockRes.error) { + throw new Error('receipt block should exist in invocation') + } + const blobAllocateMessageCar = CAR.codec.decode(getBlockRes.ok) const blobAllocateMessage = Message.view({ root: blobAllocateMessageCar.roots[0].cid, store: blobAllocateMessageCar.blocks, @@ -851,11 +870,10 @@ export const test = { assert.equal(goodPut.status, 200, await goodPut.text()) // Create `http/put` receipt - /** @type {{ bytes: Uint8Array, did: string }} */ - // @ts-expect-error facts are unknown - const [{ bytes, did }] = putfx.facts - const putSubject = ed25519.decode(bytes) - const httpPut = BlobCapabilities.put.invoke({ + const keys = putfx.facts[0]['keys'] + // @ts-expect-error Argument of type 'unknown' is not assignable to parameter of type 'SignerArchive<`did:${string}:${string}`, SigAlg>' + const putSubject = ed25519.from(keys) + const httpPut = HTTPCapabilities.put.invoke({ issuer: putSubject, audience: putSubject, with: putSubject.toDIDKey(), @@ -864,7 +882,7 @@ export const test = { content, size, }, - address + address, }, facts: putfx.facts, expiration: Infinity, @@ -880,6 +898,8 @@ export const test = { }) const message = await Message.build({ receipts: [httpPutReceipt] }) const messageCar = await CAR.outbound.encode(message) + const bytes = new Uint8Array(messageCar.body) + const messageLink = await CAR.codec.link(bytes) // Invoke `ucan/conclude` with `http/put` receipt const httpPutConcludeInvocation = UCAN.conclude.invoke({ @@ -887,10 +907,14 @@ export const test = { audience: context.id, with: alice.did(), nb: { - bytes: messageCar.body, + message: messageLink, }, expiration: Infinity, }) + httpPutConcludeInvocation.attach({ + bytes, + cid: messageLink, + }) const ucanConclude = await httpPutConcludeInvocation.execute(connection) if (!ucanConclude.out.ok) { throw new Error('invocation failed', { cause: blobAdd }) @@ -899,12 +923,13 @@ export const test = { // verify accept was scheduled const blobAcceptInvocation = await taskScheduled.promise assert.ok(blobAcceptInvocation) + assert.equal(blobAdd.out.ok.location['ucan/await'][0], '.out.ok.claim') assert.ok( - blobAdd.out.ok.claim['await/ok'].equals(blobAcceptInvocation.cid) + blobAdd.out.ok.location['ucan/await'][1].equals( + blobAcceptInvocation.cid + ) ) assert.ok(blobAdd.fx.join?.link().equals(blobAcceptInvocation.cid)) }, // TODO: Blob accept - // TODO: list - // TODO: remove }