From 00735a80dfddbe86359af78ed9bd182f4804691f Mon Sep 17 00:00:00 2001 From: Vasco Santos Date: Fri, 12 Apr 2024 09:53:27 +0100 Subject: [PATCH] feat: blob, web3.storage and ucan conclude capabilities together with api handlers (#1342) Adds implementation of `blob/*`, `web3.storage/*` and `ucan/conclude` handlers and capabilities. --- packages/capabilities/package.json | 7 +- packages/capabilities/src/blob.js | 75 ++ packages/capabilities/src/http.js | 49 ++ packages/capabilities/src/index.js | 10 + packages/capabilities/src/types.ts | 117 ++- packages/capabilities/src/ucan.js | 30 +- packages/capabilities/src/utils.js | 101 ++- .../capabilities/src/web3.storage/blob.js | 108 +++ packages/upload-api/package.json | 3 + packages/upload-api/src/blob.js | 11 + packages/upload-api/src/blob/accept.js | 62 ++ packages/upload-api/src/blob/add.js | 350 +++++++++ packages/upload-api/src/blob/allocate.js | 95 +++ packages/upload-api/src/blob/lib.js | 71 ++ packages/upload-api/src/errors.js | 36 + packages/upload-api/src/lib.js | 4 + packages/upload-api/src/service.js | 15 + packages/upload-api/src/types.ts | 94 ++- packages/upload-api/src/types/blob.ts | 74 ++ packages/upload-api/src/types/service.ts | 15 + packages/upload-api/src/types/storage.ts | 20 + packages/upload-api/src/ucan.js | 2 + packages/upload-api/src/ucan/conclude.js | 153 ++++ packages/upload-api/src/ucan/lib.js | 30 + packages/upload-api/test/handlers/blob.js | 337 +++++++++ .../upload-api/test/handlers/blob.spec.js | 4 + packages/upload-api/test/handlers/ucan.js | 196 ++++- .../upload-api/test/handlers/web3.storage.js | 680 ++++++++++++++++++ .../test/handlers/web3.storage.spec.js | 4 + packages/upload-api/test/helpers/blob.js | 69 ++ packages/upload-api/test/helpers/context.js | 65 +- packages/upload-api/test/lib.js | 3 + .../test/storage/allocations-storage-tests.js | 313 ++++++++ .../test/storage/allocations-storage.js | 105 +++ .../test/storage/allocations-storage.spec.js | 3 + .../test/storage/blobs-storage-tests.js | 46 ++ .../upload-api/test/storage/blobs-storage.js | 167 +++++ .../test/storage/blobs-storage.spec.js | 3 + packages/upload-api/test/storage/index.js | 58 ++ .../test/storage/receipts-storage-tests.js | 108 +++ .../test/storage/receipts-storage.js | 64 ++ .../test/storage/receipts-storage.spec.js | 3 + .../test/storage/tasks-storage-tests.js | 95 +++ .../upload-api/test/storage/tasks-storage.js | 72 ++ .../test/storage/tasks-storage.spec.js | 3 + pnpm-lock.yaml | 22 +- 46 files changed, 3903 insertions(+), 49 deletions(-) create mode 100644 packages/capabilities/src/blob.js create mode 100644 packages/capabilities/src/http.js create mode 100644 packages/capabilities/src/web3.storage/blob.js create mode 100644 packages/upload-api/src/blob.js create mode 100644 packages/upload-api/src/blob/accept.js create mode 100644 packages/upload-api/src/blob/add.js create mode 100644 packages/upload-api/src/blob/allocate.js create mode 100644 packages/upload-api/src/blob/lib.js create mode 100644 packages/upload-api/src/errors.js create mode 100644 packages/upload-api/src/service.js create mode 100644 packages/upload-api/src/types/blob.ts create mode 100644 packages/upload-api/src/types/service.ts create mode 100644 packages/upload-api/src/types/storage.ts create mode 100644 packages/upload-api/src/ucan/conclude.js create mode 100644 packages/upload-api/src/ucan/lib.js create mode 100644 packages/upload-api/test/handlers/blob.js create mode 100644 packages/upload-api/test/handlers/blob.spec.js create mode 100644 packages/upload-api/test/handlers/web3.storage.js create mode 100644 packages/upload-api/test/handlers/web3.storage.spec.js create mode 100644 packages/upload-api/test/helpers/blob.js create mode 100644 packages/upload-api/test/storage/allocations-storage-tests.js create mode 100644 packages/upload-api/test/storage/allocations-storage.js create mode 100644 packages/upload-api/test/storage/allocations-storage.spec.js create mode 100644 packages/upload-api/test/storage/blobs-storage-tests.js create mode 100644 packages/upload-api/test/storage/blobs-storage.js create mode 100644 packages/upload-api/test/storage/blobs-storage.spec.js create mode 100644 packages/upload-api/test/storage/index.js create mode 100644 packages/upload-api/test/storage/receipts-storage-tests.js create mode 100644 packages/upload-api/test/storage/receipts-storage.js create mode 100644 packages/upload-api/test/storage/receipts-storage.spec.js create mode 100644 packages/upload-api/test/storage/tasks-storage-tests.js create mode 100644 packages/upload-api/test/storage/tasks-storage.js create mode 100644 packages/upload-api/test/storage/tasks-storage.spec.js diff --git a/packages/capabilities/package.json b/packages/capabilities/package.json index c244c500a..caf28e015 100644 --- a/packages/capabilities/package.json +++ b/packages/capabilities/package.json @@ -56,6 +56,10 @@ "types": "./dist/src/filecoin/dealer.d.ts", "import": "./src/filecoin/dealer.js" }, + "./web3.storage/blob": { + "types": "./dist/src/web3.storage/blob.d.ts", + "import": "./src/web3.storage/blob.js" + }, "./types": { "types": "./dist/src/types.d.ts", "import": "./src/types.js" @@ -88,7 +92,8 @@ "@ucanto/principal": "^9.0.1", "@ucanto/transport": "^9.1.1", "@ucanto/validator": "^9.0.2", - "@web3-storage/data-segment": "^3.2.0" + "@web3-storage/data-segment": "^3.2.0", + "uint8arrays": "^5.0.3" }, "devDependencies": { "@web3-storage/eslint-config-w3up": "workspace:^", diff --git a/packages/capabilities/src/blob.js b/packages/capabilities/src/blob.js new file mode 100644 index 000000000..7d8b89506 --- /dev/null +++ b/packages/capabilities/src/blob.js @@ -0,0 +1,75 @@ +/** + * Blob Capabilities. + * + * Blob is a fixed size byte array addressed by the multihash. + * Usually blobs are used to represent set of IPLD blocks at different byte ranges. + * + * These can be imported directly with: + * ```js + * import * as Blob from '@web3-storage/capabilities/blob' + * ``` + * + * @module + */ +import { capability, Schema } from '@ucanto/validator' +import { equalBlob, equalWith, SpaceDID } from './utils.js' + +/** + * Agent capabilities for Blob protocol + */ + +/** + * Capability can only be delegated (but not invoked) allowing audience to + * derived any `blob/` prefixed capability for the (memory) space identified + * by DID in the `with` field. + */ +export const blob = capability({ + can: 'blob/*', + /** + * DID of the (memory) space where Blob is intended to + * be stored. + */ + with: SpaceDID, + derives: equalWith, +}) + +/** + * Blob description for being ingested by the service. + */ +export const content = Schema.struct({ + /** + * A multihash digest of the blob payload bytes, uniquely identifying blob. + */ + digest: Schema.bytes(), + /** + * Number of bytes contained by this blob. Service will provision write target + * for this exact size. Attempt to write a larger Blob file will fail. + */ + size: Schema.integer(), +}) + +/** + * `blob/add` capability allows agent to store a Blob into a (memory) space + * identified by did:key in the `with` field. Agent should compute blob multihash + * and size and provide it under `nb.blob` field, allowing a service to provision + * a write location for the agent to PUT desired Blob into. + */ +export const add = capability({ + can: 'blob/add', + /** + * DID of the (memory) space where Blob is intended to + * be stored. + */ + with: SpaceDID, + nb: Schema.struct({ + /** + * Blob to be added on the space. + */ + blob: content, + }), + derives: equalBlob, +}) + +// ⚠️ We export imports here so they are not omitted in generated typedefs +// @see https://github.com/microsoft/TypeScript/issues/51548 +export { Schema } diff --git a/packages/capabilities/src/http.js b/packages/capabilities/src/http.js new file mode 100644 index 000000000..a20b025f8 --- /dev/null +++ b/packages/capabilities/src/http.js @@ -0,0 +1,49 @@ +/** + * HTTP Capabilities. + * + * These can be imported directly with: + * ```js + * import * as HTTP from '@web3-storage/capabilities/http' + * ``` + * + * @module + */ +import { capability, Schema, ok } from '@ucanto/validator' +import { content } from './blob.js' +import { equal, equalBody, equalWith, SpaceDID, Await, and } from './utils.js' + +/** + * `http/put` capability invocation MAY be performed by any authorized agent on behalf of the subject + * as long as they have referenced `body` content to do so. + */ +export const put = capability({ + can: 'http/put', + /** + * DID of the (memory) space where Blob is intended to + * be stored. + */ + with: SpaceDID, + nb: Schema.struct({ + /** + * Description of body to send (digest/size). + */ + body: content, + /** + * HTTP(S) location that can receive blob content via HTTP PUT request. + */ + url: Schema.string().or(Await), + /** + * HTTP headers. + */ + headers: Schema.dictionary({ value: Schema.string() }).or(Await), + }), + derives: (claim, from) => { + return ( + and(equalWith(claim, from)) || + and(equalBody(claim, from)) || + and(equal(claim.nb.url, from.nb, 'url')) || + and(equal(claim.nb.headers, from.nb, 'headers')) || + ok({}) + ) + }, +}) diff --git a/packages/capabilities/src/index.js b/packages/capabilities/src/index.js index d80fbff46..c5d9385d1 100644 --- a/packages/capabilities/src/index.js +++ b/packages/capabilities/src/index.js @@ -19,6 +19,9 @@ import * as DealTracker from './filecoin/deal-tracker.js' import * as UCAN from './ucan.js' import * as Plan from './plan.js' import * as Usage from './usage.js' +import * as Blob from './blob.js' +import * as W3sBlob from './web3.storage/blob.js' +import * as HTTP from './http.js' export { Access, @@ -63,6 +66,7 @@ export const abilitiesAsStrings = [ Access.access.can, Access.authorize.can, UCAN.attest.can, + UCAN.conclude.can, Customer.get.can, Consumer.has.can, Consumer.get.can, @@ -86,4 +90,10 @@ export const abilitiesAsStrings = [ Plan.get.can, Usage.usage.can, Usage.report.can, + Blob.blob.can, + Blob.add.can, + W3sBlob.blob.can, + W3sBlob.allocate.can, + W3sBlob.accept.can, + HTTP.put.can, ] diff --git a/packages/capabilities/src/types.ts b/packages/capabilities/src/types.ts index 9848a42ca..279105585 100644 --- a/packages/capabilities/src/types.ts +++ b/packages/capabilities/src/types.ts @@ -21,6 +21,9 @@ import { import { space, info } from './space.js' import * as provider from './provider.js' import { top } from './top.js' +import * as BlobCaps from './blob.js' +import * as W3sBlobCaps from './web3.storage/blob.js' +import * as HTTPCaps from './http.js' import * as StoreCaps from './store.js' import * as UploadCaps from './upload.js' import * as AccessCaps from './access.js' @@ -41,6 +44,10 @@ export type ISO8601Date = string export type { Unit, PieceLink } +export interface UCANAwait { + 'ucan/await': [Selector, Link] +} + /** * An IPLD Link that has the CAR codec code. */ @@ -439,6 +446,95 @@ export interface UploadNotFound extends Ucanto.Failure { export type UploadGetFailure = UploadNotFound | Ucanto.Failure +// HTTP +export type HTTPPut = InferInvokedCapability + +// Blob +export type Blob = InferInvokedCapability +export type BlobAdd = InferInvokedCapability +export type ServiceBlob = InferInvokedCapability +export type BlobAllocate = InferInvokedCapability +export type BlobAccept = InferInvokedCapability + +export type BlobMultihash = Uint8Array +export interface BlobModel { + digest: BlobMultihash + size: number +} + +// Blob add +export interface BlobAddSuccess { + site: UCANAwait<'.out.ok.site'> +} + +export interface BlobSizeOutsideOfSupportedRange extends Ucanto.Failure { + name: 'BlobSizeOutsideOfSupportedRange' +} + +export interface AwaitError extends Ucanto.Failure { + name: 'AwaitError' +} + +// TODO: We need Ucanto.Failure because provideAdvanced can't handle errors without it +export type BlobAddFailure = + | BlobSizeOutsideOfSupportedRange + | AwaitError + | StorageGetError + | Ucanto.Failure + +export interface BlobListItem { + blob: BlobModel + insertedAt: ISO8601Date +} + +// Blob allocate +export interface BlobAllocateSuccess { + size: number + address?: BlobAddress +} + +export interface BlobAddress { + url: ToString + headers: Record + expiresAt: ISO8601Date +} + +// If user space has not enough space to allocate the blob. +export interface NotEnoughStorageCapacity extends Ucanto.Failure { + name: 'NotEnoughStorageCapacity' +} + +export type BlobAllocateFailure = NotEnoughStorageCapacity | Ucanto.Failure + +// Blob accept +export interface BlobAcceptSuccess { + // A Link for a delegation with site commiment for the added blob. + site: Link +} + +export interface AllocatedMemoryHadNotBeenWrittenTo extends Ucanto.Failure { + name: 'AllocatedMemoryHadNotBeenWrittenTo' +} + +// TODO: We should type the store errors and add them here, instead of Ucanto.Failure +export type BlobAcceptFailure = + | AllocatedMemoryHadNotBeenWrittenTo + | Ucanto.Failure + +// Storage errors +export type StoragePutError = StorageOperationError +export type StorageGetError = StorageOperationError | RecordNotFound + +// Operation on a storage failed with unexpected error +export interface StorageOperationError extends Error { + name: 'StorageOperationFailed' +} + +// Record requested not found in the storage +export interface RecordNotFound extends Error { + name: 'RecordNotFound' +} + // Store export type Store = InferInvokedCapability export type StoreAdd = InferInvokedCapability @@ -530,6 +626,7 @@ export interface UploadListSuccess extends ListResponse {} export type UCANRevoke = InferInvokedCapability export type UCANAttest = InferInvokedCapability +export type UCANConclude = InferInvokedCapability export interface Timestamp { /** @@ -540,6 +637,8 @@ export interface Timestamp { export type UCANRevokeSuccess = Timestamp +export type UCANConcludeSuccess = Timestamp + /** * Error is raised when `UCAN` being revoked is not supplied or it's proof chain * leading to supplied `scope` is not supplied. @@ -578,6 +677,15 @@ export type UCANRevokeFailure = | UnauthorizedRevocation | RevocationsStoreFailure +/** + * Error is raised when receipt is received for unknown invocation + */ +export interface ReferencedInvocationNotFound extends Ucanto.Failure { + name: 'ReferencedInvocationNotFound' +} + +export type UCANConcludeFailure = ReferencedInvocationNotFound | Ucanto.Failure + // Admin export type Admin = InferInvokedCapability export type AdminUploadInspect = InferInvokedCapability< @@ -686,6 +794,7 @@ export type ServiceAbilityArray = [ Access['can'], AccessAuthorize['can'], UCANAttest['can'], + UCANConclude['can'], CustomerGet['can'], ConsumerHas['can'], ConsumerGet['can'], @@ -708,7 +817,13 @@ export type ServiceAbilityArray = [ AdminStoreInspect['can'], PlanGet['can'], Usage['can'], - UsageReport['can'] + UsageReport['can'], + Blob['can'], + BlobAdd['can'], + ServiceBlob['can'], + BlobAllocate['can'], + BlobAccept['can'], + HTTPPut['can'] ] /** diff --git a/packages/capabilities/src/ucan.js b/packages/capabilities/src/ucan.js index fe38b757e..b8ceb9d87 100644 --- a/packages/capabilities/src/ucan.js +++ b/packages/capabilities/src/ucan.js @@ -2,7 +2,7 @@ * UCAN core capabilities. */ -import { capability, Schema } from '@ucanto/validator' +import { capability, Schema, ok } from '@ucanto/validator' import * as API from '@ucanto/interface' import { equalWith, equal, and, checkLink } from './utils.js' @@ -74,6 +74,34 @@ export const revoke = capability({ ), }) +/** + * `ucan/conclude` capability represents a receipt using a special UCAN capability. + * + * The UCAN invocation specification defines receipt record, that is cryptographically + * signed description of the invocation output and requested effects. Receipt + * structure is very similar to UCAN except it has no notion of expiry nor it is + * possible to delegate ability to issue receipt to another principal. + */ +export const conclude = capability({ + can: 'ucan/conclude', + /** + * DID of the principal representing the Conclusion Authority. + * MUST be the DID of the audience of the ran invocation. + */ + with: Schema.did(), + nb: Schema.struct({ + /** + * CID of the content with the Receipt. + */ + receipt: Schema.link(), + }), + derives: (claim, from) => + // With field MUST be the same + and(equalWith(claim, from)) || + and(checkLink(claim.nb.receipt, from.nb.receipt, 'nb.receipt')) || + ok({}), +}) + /** * Issued by trusted authority (usually the one handling invocation) that attest * that specific UCAN delegation has been considered authentic. diff --git a/packages/capabilities/src/utils.js b/packages/capabilities/src/utils.js index ac1e7e317..00e512d1c 100644 --- a/packages/capabilities/src/utils.js +++ b/packages/capabilities/src/utils.js @@ -1,7 +1,9 @@ -import { DID, fail, ok } from '@ucanto/validator' +import { DID, Schema, fail, ok } from '@ucanto/validator' // eslint-disable-next-line no-unused-vars import * as Types from '@ucanto/interface' +import { equals } from 'uint8arrays/equals' + // e.g. did:web:web3.storage or did:web:staging.web3.storage export const ProviderDID = DID.match({ method: 'web' }) @@ -9,6 +11,10 @@ export const SpaceDID = DID.match({ method: 'key' }) export const AccountDID = DID.match({ method: 'mailto' }) +export const Await = Schema.struct({ + 'ucan/await': Schema.tuple([Schema.string(), Schema.link()]), +}) + /** * Check URI can be delegated * @@ -85,6 +91,99 @@ export const equalLink = (claimed, delegated) => { } } +/** + * @template {Types.ParsedCapability<"blob/add"|"blob/remove"|"web3.storage/blob/allocate"|"web3.storage/blob/accept", Types.URI<'did:'>, {blob: { digest: Uint8Array, size: number }}>} T + * @param {T} claimed + * @param {T} delegated + * @returns {Types.Result<{}, Types.Failure>} + */ +export const equalBlob = (claimed, delegated) => { + if (claimed.with !== delegated.with) { + return fail( + `Expected 'with: "${delegated.with}"' instead got '${claimed.with}'` + ) + } else if ( + delegated.nb.blob.digest && + !equals(delegated.nb.blob.digest, claimed.nb.blob.digest) + ) { + return fail( + `Link ${ + claimed.nb.blob.digest ? `${claimed.nb.blob.digest}` : '' + } violates imposed ${delegated.nb.blob.digest} constraint.` + ) + } else if ( + claimed.nb.blob.size !== undefined && + delegated.nb.blob.size !== undefined + ) { + return claimed.nb.blob.size > delegated.nb.blob.size + ? fail( + `Size constraint violation: ${claimed.nb.blob.size} > ${delegated.nb.blob.size}` + ) + : ok({}) + } else { + return ok({}) + } +} + +/** + * @template {Types.ParsedCapability<"http/put", Types.URI<'did:'>, {body: { digest: Uint8Array, size: number }}>} T + * @param {T} claimed + * @param {T} delegated + * @returns {Types.Result<{}, Types.Failure>} + */ +export const equalBody = (claimed, delegated) => { + if (claimed.with !== delegated.with) { + return fail( + `Expected 'with: "${delegated.with}"' instead got '${claimed.with}'` + ) + } else if ( + delegated.nb.body.digest && + !equals(delegated.nb.body.digest, claimed.nb.body.digest) + ) { + return fail( + `Link ${ + claimed.nb.body.digest ? `${claimed.nb.body.digest}` : '' + } violates imposed ${delegated.nb.body.digest} constraint.` + ) + } else if ( + claimed.nb.body.size !== undefined && + delegated.nb.body.size !== undefined + ) { + return claimed.nb.body.size > delegated.nb.body.size + ? fail( + `Size constraint violation: ${claimed.nb.body.size} > ${delegated.nb.body.size}` + ) + : ok({}) + } else { + return ok({}) + } +} + +/** + * @template {Types.ParsedCapability<"blob/add"|"blob/remove"|"blob/allocate"|"blob/accept"|"http/put", Types.URI<'did:'>, {content: Uint8Array}>} T + * @param {T} claimed + * @param {T} delegated + * @returns {Types.Result<{}, Types.Failure>} + */ +export const equalContent = (claimed, delegated) => { + if (claimed.with !== delegated.with) { + return fail( + `Expected 'with: "${delegated.with}"' instead got '${claimed.with}'` + ) + } else if ( + delegated.nb.content && + !equals(delegated.nb.content, claimed.nb.content) + ) { + return fail( + `Link ${ + claimed.nb.content ? `${claimed.nb.content}` : '' + } violates imposed ${delegated.nb.content} constraint.` + ) + } else { + return ok({}) + } +} + /** * Checks that `claimed` {@link Types.Link} meets an `imposed` constraint. * diff --git a/packages/capabilities/src/web3.storage/blob.js b/packages/capabilities/src/web3.storage/blob.js new file mode 100644 index 000000000..42085380f --- /dev/null +++ b/packages/capabilities/src/web3.storage/blob.js @@ -0,0 +1,108 @@ +import { capability, Schema, Link, ok } from '@ucanto/validator' +import { content } from '../blob.js' +import { + equalBlob, + equalWith, + SpaceDID, + and, + equal, + checkLink, + Await, +} from '../utils.js' + +/** + * Service capabilities for Blob protocol + */ +/** + * Capability can only be delegated (but not invoked) allowing audience to + * derived any `web3.storage/blob/` prefixed capability for the (memory) space identified + * by DID in the `with` field. + */ +export const blob = capability({ + can: 'web3.storage/blob/*', + /** + * DID of the (memory) space where Blob is intended to + * be stored. + */ + with: SpaceDID, + derives: equalWith, +}) + +/** + * `web3.storage/blob//allocate` capability can be invoked to create a memory + * address where blob content can be written via HTTP PUT request. + */ +export const allocate = capability({ + can: 'web3.storage/blob/allocate', + /** + * Provider DID. + */ + with: Schema.did(), + nb: Schema.struct({ + /** + * Blob to allocate on the space. + */ + blob: content, + /** + * The Link for an Add Blob task, that caused an allocation + */ + cause: Link, + /** + * DID of the user space where allocation takes place + */ + space: SpaceDID, + }), + derives: (claim, from) => { + return ( + and(equalWith(claim, from)) || + and(equalBlob(claim, from)) || + and(checkLink(claim.nb.cause, from.nb.cause, 'cause')) || + and(equal(claim.nb.space, from.nb.space, 'space')) || + ok({}) + ) + }, +}) + +/** + * `blob/accept` capability invocation should either succeed when content is + * delivered on allocated address or fail if no content is allocation expires + * without content being delivered. + */ +export const accept = capability({ + can: 'web3.storage/blob/accept', + /** + * Provider DID. + */ + with: Schema.did(), + nb: Schema.struct({ + /** + * Blob to accept. + */ + blob: content, + /** + * Content location commitment time to live, which will be encoded as expiry of the issued location claim. + */ + ttl: Schema.integer().optional(), + /** + * DID of the user space where allocation took place + */ + space: SpaceDID, + /** + * This task is blocked on `http/put` receipt available + */ + _put: Await, + }), + derives: (claim, from) => { + return ( + and(equalWith(claim, from)) || + and(equalBlob(claim, from)) || + and(equal(claim.nb.ttl, from.nb.ttl, 'ttl')) || + and(equal(claim.nb.space, from.nb.space, 'space')) || + ok({}) + ) + }, +}) + +// ⚠️ We export imports here so they are not omitted in generated typedefs +// @see https://github.com/microsoft/TypeScript/issues/51548 +export { Schema, Link } diff --git a/packages/upload-api/package.json b/packages/upload-api/package.json index 2cac851c0..ceb4e91a5 100644 --- a/packages/upload-api/package.json +++ b/packages/upload-api/package.json @@ -179,9 +179,11 @@ "@ucanto/validator": "^9.0.2", "@web3-storage/access": "workspace:^", "@web3-storage/capabilities": "workspace:^", + "@web3-storage/content-claims": "^4.0.4", "@web3-storage/did-mailto": "workspace:^", "@web3-storage/filecoin-api": "workspace:^", "multiformats": "^12.1.2", + "uint8arrays": "^5.0.3", "p-retry": "^5.1.2" }, "devDependencies": { @@ -195,6 +197,7 @@ "is-subset": "^0.1.1", "mocha": "^10.2.0", "one-webcrypto": "git://github.com/web3-storage/one-webcrypto", + "p-defer": "^4.0.1", "typescript": "5.2.2" }, "eslintConfig": { diff --git a/packages/upload-api/src/blob.js b/packages/upload-api/src/blob.js new file mode 100644 index 000000000..84187cefd --- /dev/null +++ b/packages/upload-api/src/blob.js @@ -0,0 +1,11 @@ +import { blobAddProvider } from './blob/add.js' +import * as API from './types.js' + +/** + * @param {API.BlobServiceContext} context + */ +export function createService(context) { + return { + add: blobAddProvider(context), + } +} diff --git a/packages/upload-api/src/blob/accept.js b/packages/upload-api/src/blob/accept.js new file mode 100644 index 000000000..13cee2868 --- /dev/null +++ b/packages/upload-api/src/blob/accept.js @@ -0,0 +1,62 @@ +import * as Server from '@ucanto/server' +import * as DID from '@ipld/dag-ucan/did' +import * as W3sBlob from '@web3-storage/capabilities/web3.storage/blob' +import { Assert } from '@web3-storage/content-claims/capability' +import { create as createLink } from 'multiformats/link' +import { Digest } from 'multiformats/hashes/digest' +import { sha256 } from 'multiformats/hashes/sha2' +import { code as rawCode } from 'multiformats/codecs/raw' +import * as API from '../types.js' +import { AllocatedMemoryHadNotBeenWrittenTo } from './lib.js' + +const R2_REGION = 'auto' +const R2_BUCKET = 'carpark-prod-0' + +/** + * @param {API.W3ServiceContext} context + * @returns {API.ServiceMethod} + */ +export function blobAcceptProvider(context) { + return Server.provideAdvanced({ + capability: W3sBlob.accept, + handler: async ({ capability }) => { + const { blob, space } = capability.nb + // If blob is not stored, we must fail + const hasBlob = await context.blobsStorage.has(blob.digest) + if (hasBlob.error) { + return hasBlob + } else if (!hasBlob.ok) { + return { + error: new AllocatedMemoryHadNotBeenWrittenTo(), + } + } + + // TODO: we need to support multihash in claims, or specify hardcoded codec + const digest = new Digest(sha256.code, 32, blob.digest, blob.digest) + const content = createLink(rawCode, digest) + const w3link = `https://w3s.link/ipfs/${content.toString()}?origin=r2://${R2_REGION}/${R2_BUCKET}` + + const locationClaim = await Assert.location.delegate({ + issuer: context.id, + audience: DID.parse(space), + with: context.id.toDIDKey(), + nb: { + content, + location: [ + // @ts-expect-error Type 'string' is not assignable to type '`${string}:${string}`' + w3link, + ], + }, + expiration: Infinity, + }) + + // Create result object + /** @type {API.OkBuilder} */ + const result = Server.ok({ + site: locationClaim.cid, + }) + + return result.fork(locationClaim) + }, + }) +} diff --git a/packages/upload-api/src/blob/add.js b/packages/upload-api/src/blob/add.js new file mode 100644 index 000000000..0d0fdc533 --- /dev/null +++ b/packages/upload-api/src/blob/add.js @@ -0,0 +1,350 @@ +import * as Server from '@ucanto/server' +import { ed25519 } from '@ucanto/principal' +import * as Blob from '@web3-storage/capabilities/blob' +import * as W3sBlob from '@web3-storage/capabilities/web3.storage/blob' +import * as HTTP from '@web3-storage/capabilities/http' +import * as API from '../types.js' + +import { createConcludeInvocation } from '../ucan/conclude.js' +import { BlobSizeOutsideOfSupportedRange, AwaitError } from './lib.js' + +/** + * @param {API.BlobServiceContext} context + * @returns {API.ServiceMethod} + */ +export function blobAddProvider(context) { + return Server.provideAdvanced({ + capability: Blob.add, + handler: async ({ capability, invocation }) => { + // Prepare context + const { blob } = capability.nb + const space = capability.with + + // Verify blob is within accept size + if (blob.size > context.maxUploadSize) { + return { + error: new BlobSizeOutsideOfSupportedRange( + blob.size, + context.maxUploadSize + ), + } + } + + // Create allocate task, get its receipt if available and execute it if necessary + const allocateRes = await allocate({ + context, + blob, + space, + cause: invocation.link(), + }) + if (allocateRes.error) { + return allocateRes + } + + // Create put task and get its receipt if available + const putRes = await put({ + context, + blob, + allocateTask: allocateRes.ok.task, + }) + if (putRes.error) { + return putRes + } + + // Create accept task, get its receipt if available and execute if necessary and ready + const acceptRes = await accept({ + context, + blob, + space, + putTask: putRes.ok.task, + putReceipt: putRes.ok.receipt, + }) + if (acceptRes.error) { + return acceptRes + } + + // Create result object + /** @type {API.OkBuilder} */ + const result = Server.ok({ + site: { + 'ucan/await': ['.out.ok.site', acceptRes.ok.task.link()], + }, + }) + + // If there is no receipt for `http/put` we also still are pending receipt for `accept` + if (!putRes.ok.receipt || !acceptRes.ok.receipt) { + return ( + result + // 1. System attempts to allocate memory in user space for the blob. + .fork(allocateRes.ok.task) + .fork(allocateRes.ok.receipt) + // 2. System requests user agent (or anyone really) to upload the content + // corresponding to the blob + // via HTTP PUT to given location. + .fork(putRes.ok.task) + // 3. System will attempt to accept uploaded content that matches blob + // multihash and size. + .join(acceptRes.ok.task) + ) + } + + return ( + result + // 1. System attempts to allocate memory in user space for the blob. + .fork(allocateRes.ok.task) + .fork(allocateRes.ok.receipt) + // 2. System requests user agent (or anyone really) to upload the content + // corresponding to the blob + // via HTTP PUT to given location. + .fork(putRes.ok.task) + .fork(putRes.ok.receipt) + // 3. System will attempt to accept uploaded content that matches blob + // multihash and size. + .join(acceptRes.ok.task) + .fork(acceptRes.ok.receipt) + ) + }, + }) +} + +/** + * Create allocate and run task if there is no receipt for it already. + * If there is a non expired receipt available, it is returned insted of runing the task again. + * Otherwise, allocation task is executed. + * + * @param {object} allocate + * @param {API.BlobServiceContext} allocate.context + * @param {API.BlobModel} allocate.blob + * @param {API.DIDKey} allocate.space + * @param {API.Link} allocate.cause + */ +async function allocate({ context, blob, space, cause }) { + // 1. Create web3.storage/blob/allocate invocation and task + const allocate = W3sBlob.allocate.invoke({ + issuer: context.id, + audience: context.id, + with: context.id.did(), + nb: { + blob, + cause: cause, + space, + }, + expiration: Infinity, + }) + const task = await allocate.delegate() + + /** @type {import('@ucanto/interface').Receipt | undefined} */ + let blobAllocateReceipt + + // 2. Get receipt for `blob/allocate` if available, otherwise schedule invocation + const receiptGet = await context.receiptsStorage.get(task.link()) + if (receiptGet.error && receiptGet.error.name !== 'RecordNotFound') { + return { + error: receiptGet.error, + } + } else if (receiptGet.ok) { + blobAllocateReceipt = receiptGet.ok + + // Verify if allocation is expired before "accepting" this receipt. + // Note that if there is no address, means it was already allocated successfully before + const expiresAt = blobAllocateReceipt?.out.ok?.address?.expiresAt + if (expiresAt && new Date().getTime() > new Date(expiresAt).getTime()) { + // if expired, we must see if blob was written to avoid allocating one more time + const hasBlobStore = await context.blobsStorage.has(blob.digest) + if (hasBlobStore.error) { + return hasBlobStore + } else if (!hasBlobStore.ok) { + blobAllocateReceipt = undefined + } + } + } + + // 3. if not already allocated (or expired) execute `blob/allocate` + if (!blobAllocateReceipt) { + // Execute allocate invocation + const allocateRes = await allocate.execute(context.getServiceConnection()) + if (allocateRes.out.error) { + return { + error: new AwaitError({ + cause: allocateRes.out.error, + at: 'ucan/wait', + reference: ['.out.ok', task.link()], + }), + } + } + blobAllocateReceipt = allocateRes + } + + // 4. Create `blob/allocate` receipt as conclude invocation to inline as effect + const concludeAllocate = createConcludeInvocation( + context.id, + context.id, + blobAllocateReceipt + ) + + return { + ok: { + task, + receipt: await concludeAllocate.delegate(), + }, + } +} + +/** + * Create put task and check if there is a receipt for it already. + * A `http/put` should be task is stored by the service, if it does not exist + * and a receipt is fetched if already available. + * + * @param {object} put + * @param {API.BlobServiceContext} put.context + * @param {API.BlobModel} put.blob + * @param {API.Invocation} put.allocateTask + */ +async function put({ context, blob, allocateTask }) { + // 1. Create http/put invocation as task + + // We derive principal from the blob multihash to be an audience + // of the `http/put` invocation. That way anyone with blob digest + // could perform the invocation and issue receipt by deriving same + // principal + const blobProvider = await ed25519.derive( + blob.digest.subarray(-32) + ) + const facts = [ + { + keys: blobProvider.toArchive(), + }, + ] + const put = HTTP.put.invoke({ + issuer: blobProvider, + audience: blobProvider, + with: blobProvider.toDIDKey(), + nb: { + body: blob, + url: { + 'ucan/await': ['.out.ok.address.url', allocateTask.link()], + }, + headers: { + 'ucan/await': ['.out.ok.address.headers', allocateTask.link()], + }, + }, + facts, + expiration: Infinity, + }) + const task = await put.delegate() + + // 2. Get receipt for `http/put` if available + const receiptGet = await context.receiptsStorage.get(task.link()) + // Storage get can fail with `RecordNotFound` or other unexpected errors. + // If 'RecordNotFound' we proceed, otherwise we fail with the received error. + if (receiptGet.error && receiptGet.error.name !== 'RecordNotFound') { + return { + error: receiptGet.error, + } + } else if (receiptGet.ok) { + // 3. Create `blob/allocate` receipt as conclude invocation to inline as effect + const concludePut = createConcludeInvocation( + context.id, + context.id, + receiptGet.ok + ) + return { + ok: { + task, + receipt: await concludePut.delegate(), + }, + } + } + + // 3. store `http/put` invocation + const invocationPutRes = await context.tasksStorage.put(task) + if (invocationPutRes.error) { + return { + error: invocationPutRes.error, + } + } + + return { + ok: { + task, + receipt: undefined, + }, + } +} + +/** + * Create accept and run task if there is no receipt. + * A accept task can run when `http/put` receipt already exists. + * + * @param {object} accept + * @param {API.BlobServiceContext} accept.context + * @param {API.BlobModel} accept.blob + * @param {API.DIDKey} accept.space + * @param {API.Invocation} accept.putTask + * @param {API.Invocation} [accept.putReceipt] + */ +async function accept({ context, blob, space, putTask, putReceipt }) { + // 1. Create web3.storage/blob/accept invocation and task + const accept = W3sBlob.accept.invoke({ + issuer: context.id, + audience: context.id, + with: context.id.did(), + nb: { + blob, + space, + _put: { 'ucan/await': ['.out.ok', putTask.link()] }, + }, + expiration: Infinity, + }) + const task = await accept.delegate() + + // 2. If there is not put receipt, `accept` is still blocked + if (!putReceipt) { + return { + ok: { + task, + receipt: undefined, + }, + } + } + + // 3. Get receipt for `blob/accept` if available, otherwise execute invocation + let blobAcceptReceipt + const receiptGet = await context.receiptsStorage.get(task.link()) + if (receiptGet.error && receiptGet.error.name !== 'RecordNotFound') { + return { + error: receiptGet.error, + } + } else if (receiptGet.ok) { + blobAcceptReceipt = receiptGet.ok + } + + // 4. if not already accepted execute `blob/accept` + if (!blobAcceptReceipt) { + // Execute accept invocation + const acceptRes = await accept.execute(context.getServiceConnection()) + if (acceptRes.out.error) { + return { + error: new AwaitError({ + cause: acceptRes.out.error, + at: 'ucan/wait', + reference: ['.out.ok', task.link()], + }), + } + } + blobAcceptReceipt = acceptRes + } + + // Create `blob/accept` receipt as conclude invocation to inline as effect + const concludeAccept = createConcludeInvocation( + context.id, + context.id, + blobAcceptReceipt + ) + return { + ok: { + task, + receipt: await concludeAccept.delegate(), + }, + } +} diff --git a/packages/upload-api/src/blob/allocate.js b/packages/upload-api/src/blob/allocate.js new file mode 100644 index 000000000..a1f256040 --- /dev/null +++ b/packages/upload-api/src/blob/allocate.js @@ -0,0 +1,95 @@ +import * as Server from '@ucanto/server' +import * as W3sBlob from '@web3-storage/capabilities/web3.storage/blob' +import * as API from '../types.js' + +/** + * @param {API.W3ServiceContext} context + * @returns {API.ServiceMethod} + */ +export function blobAllocateProvider(context) { + return Server.provide( + W3sBlob.allocate, + async ({ capability, invocation }) => { + const { blob, cause, space } = capability.nb + let size = blob.size + + // We check if space has storage provider associated. If it does not + // we return `InsufficientStorage` error as storage capacity is considered + // to be 0. + const result = await context.provisionsStorage.hasStorageProvider(space) + if (result.error) { + return result + } + if (!result.ok) { + return { + /** @type {API.AllocationError} */ + error: { + name: 'InsufficientStorage', + message: `${space} has no storage provider`, + }, + } + } + + // Allocate memory space for the blob. If memory for this blob is + // already allocated, this allocates 0 bytes. + const allocationInsert = await context.allocationsStorage.insert({ + space, + blob, + invocation: cause, + }) + if (allocationInsert.error) { + // if the insert failed with conflict then this item has already been + // added to the space and there is no allocation change. + // If record exists but is expired, it can be re-written + if (allocationInsert.error.name === 'RecordKeyConflict') { + size = 0 + } else { + return { + error: allocationInsert.error, + } + } + } + + // Check if blob already exists + // TODO: this may depend on the region we want to allocate and will need changes in the future. + const hasBlobStore = await context.blobsStorage.has(blob.digest) + if (hasBlobStore.error) { + return hasBlobStore + } + + // If blob is stored, we can just allocate it to the space with the allocated size + // TODO: this code path MAY lead to await failures - awaited http/put and blob/accept tasks + // are supposed to fail if path does not exists. + if (hasBlobStore.ok) { + return { + ok: { size }, + } + } + + // Get presigned URL for the write target + const expiresIn = 60 * 60 * 24 // 1 day + const expiresAt = new Date(Date.now() + expiresIn).toISOString() + const createUploadUrl = await context.blobsStorage.createUploadUrl( + blob.digest, + blob.size, + expiresIn + ) + if (createUploadUrl.error) { + return createUploadUrl + } + + const address = { + url: createUploadUrl.ok.url.toString(), + headers: createUploadUrl.ok.headers, + expiresAt, + } + + return { + ok: { + size, + address, + }, + } + } + ) +} diff --git a/packages/upload-api/src/blob/lib.js b/packages/upload-api/src/blob/lib.js new file mode 100644 index 000000000..59f939794 --- /dev/null +++ b/packages/upload-api/src/blob/lib.js @@ -0,0 +1,71 @@ +import { Failure } from '@ucanto/server' + +export const AllocatedMemoryHadNotBeenWrittenToName = + 'AllocatedMemoryHadNotBeenWrittenTo' +export class AllocatedMemoryHadNotBeenWrittenTo extends Failure { + get name() { + return AllocatedMemoryHadNotBeenWrittenToName + } + + describe() { + return `Blob not found` + } +} + +export const BlobSizeOutsideOfSupportedRangeName = + 'BlobSizeOutsideOfSupportedRange' +export class BlobSizeOutsideOfSupportedRange extends Failure { + /** + * @param {Number} blobSize + * @param {Number} maxUploadSize + */ + constructor(blobSize, maxUploadSize) { + super() + this.blobSize = blobSize + this.maxUploadSize = maxUploadSize + } + + get name() { + return BlobSizeOutsideOfSupportedRangeName + } + + describe() { + return `Blob size ${this.blobSize} exceeded maximum size limit: ${this.maxUploadSize}, consider splitting it into blobs that fit limit.` + } + + toJSON() { + return { + ...super.toJSON(), + maxUploadSize: this.maxUploadSize, + blobSize: this.blobSize, + } + } +} + +export const AwaitErrorName = 'AwaitError' +export class AwaitError extends Failure { + /** + * @param {object} source + * @param {string} source.at - argument path that referenced failed `await` + * @param {[selector: string, task: import('@ucanto/interface').UnknownLink]} source.reference - awaited reference that failed + * @param {import('@ucanto/interface').Failure} source.cause - error that caused referenced `await` to fail + */ + constructor({ at, reference, cause }) { + super() + this.at = at + this.reference = reference + this.cause = cause + } + describe() { + const [selector, task] = this.reference + return `Awaited (${selector} ${task}) reference at ${this.at} has failed:\n${this.cause}` + } + get name() { + return AwaitErrorName + } + toJSON() { + return { + ...super.toJSON(), + } + } +} diff --git a/packages/upload-api/src/errors.js b/packages/upload-api/src/errors.js new file mode 100644 index 000000000..9c7df6fe8 --- /dev/null +++ b/packages/upload-api/src/errors.js @@ -0,0 +1,36 @@ +import * as Server from '@ucanto/server' + +export const StoreOperationErrorName = /** @type {const} */ ( + 'StoreOperationFailed' +) +export class StoreOperationFailed extends Server.Failure { + get reason() { + return this.message + } + + get name() { + return StoreOperationErrorName + } +} + +export const RecordKeyConflictName = /** @type {const} */ ('RecordKeyConflict') +export class RecordKeyConflict extends Server.Failure { + get reason() { + return this.message + } + + get name() { + return RecordKeyConflictName + } +} + +export const RecordNotFoundErrorName = /** @type {const} */ ('RecordNotFound') +export class RecordNotFound extends Server.Failure { + get reason() { + return this.message + } + + get name() { + return RecordNotFoundErrorName + } +} diff --git a/packages/upload-api/src/lib.js b/packages/upload-api/src/lib.js index c4fd4ebaa..03ba7184b 100644 --- a/packages/upload-api/src/lib.js +++ b/packages/upload-api/src/lib.js @@ -4,6 +4,7 @@ import * as Types from './types.js' import * as Legacy from '@ucanto/transport/legacy' import * as CAR from '@ucanto/transport/car' import { create as createRevocationChecker } from './utils/revocation.js' +import { createService as createBlobService } from './blob.js' import { createService as createStoreService } from './store.js' import { createService as createUploadService } from './upload.js' import { createService as createConsoleService } from './console.js' @@ -16,6 +17,7 @@ import { createService as createSubscriptionService } from './subscription.js' import { createService as createAdminService } from './admin.js' import { createService as createRateLimitService } from './rate-limit.js' import { createService as createUcanService } from './ucan.js' +import { createService as createW3sService } from './service.js' import { createService as createPlanService } from './plan.js' import { createService as createUsageService } from './usage.js' import { createService as createFilecoinService } from '@web3-storage/filecoin-api/storefront/service' @@ -43,6 +45,7 @@ export const createServer = ({ id, codec = Legacy.inbound, ...context }) => */ export const createService = (context) => ({ access: createAccessService(context), + blob: createBlobService(context), console: createConsoleService(context), consumer: createConsumerService(context), customer: createCustomerService(context), @@ -55,6 +58,7 @@ export const createService = (context) => ({ upload: createUploadService(context), ucan: createUcanService(context), plan: createPlanService(context), + ['web3.storage']: createW3sService(context), // storefront of filecoin pipeline filecoin: createFilecoinService(context).filecoin, usage: createUsageService(context), diff --git a/packages/upload-api/src/service.js b/packages/upload-api/src/service.js new file mode 100644 index 000000000..e650e13d0 --- /dev/null +++ b/packages/upload-api/src/service.js @@ -0,0 +1,15 @@ +import { blobAllocateProvider } from './blob/allocate.js' +import { blobAcceptProvider } from './blob/accept.js' +import * as API from './types.js' + +/** + * @param {API.W3ServiceContext} context + */ +export function createService(context) { + return { + blob: { + allocate: blobAllocateProvider(context), + accept: blobAcceptProvider(context), + }, + } +} diff --git a/packages/upload-api/src/types.ts b/packages/upload-api/src/types.ts index 789be64e1..ff91d076f 100644 --- a/packages/upload-api/src/types.ts +++ b/packages/upload-api/src/types.ts @@ -54,6 +54,15 @@ export interface DebugEmail extends Email { } import { + BlobAdd, + BlobAddSuccess, + BlobAddFailure, + BlobAllocate, + BlobAllocateSuccess, + BlobAllocateFailure, + BlobAccept, + BlobAcceptSuccess, + BlobAcceptFailure, StoreAdd, StoreGet, StoreAddSuccess, @@ -123,6 +132,9 @@ import { CARLink, StoreGetSuccess, UploadGetSuccess, + UCANConclude, + UCANConcludeSuccess, + UCANConcludeFailure, UCANRevoke, UCANRevokeSuccess, UCANRevokeFailure, @@ -161,8 +173,20 @@ import { SubscriptionsStorage } from './types/subscriptions.js' export type { SubscriptionsStorage } import { UsageStorage } from './types/usage.js' export type { UsageStorage } - -export interface Service extends StorefrontService { +import { ReceiptsStorage, TasksScheduler } from './types/service.js' +export type { ReceiptsStorage, TasksScheduler } +import { + AllocationsStorage, + BlobsStorage, + TasksStorage, + BlobAddInput, +} from './types/blob.js' +export type { AllocationsStorage, BlobsStorage, TasksStorage, BlobAddInput } + +export interface Service extends StorefrontService, W3sService { + blob: { + add: ServiceMethod + } store: { add: ServiceMethod get: ServiceMethod @@ -239,6 +263,11 @@ export interface Service extends StorefrontService { } ucan: { + conclude: ServiceMethod< + UCANConclude, + UCANConcludeSuccess, + UCANConcludeFailure + > revoke: ServiceMethod } @@ -273,16 +302,51 @@ export interface Service extends StorefrontService { } } -export type StoreServiceContext = SpaceServiceContext & { +export interface W3sService { + ['web3.storage']: { + blob: { + allocate: ServiceMethod< + BlobAllocate, + BlobAllocateSuccess, + BlobAllocateFailure + > + accept: ServiceMethod + } + } +} + +export type BlobServiceContext = SpaceServiceContext & { + /** + * Service signer + */ + id: Signer maxUploadSize: number + allocationsStorage: AllocationsStorage + blobsStorage: BlobsStorage + tasksStorage: TasksStorage + receiptsStorage: ReceiptsStorage + getServiceConnection: () => ConnectionView +} + +export type W3ServiceContext = SpaceServiceContext & { + /** + * Service signer + */ + id: Signer + allocationsStorage: AllocationsStorage + blobsStorage: BlobsStorage +} +export type StoreServiceContext = SpaceServiceContext & { + maxUploadSize: number storeTable: StoreTable carStoreBucket: CarStoreBucket } export type UploadServiceContext = ConsumerServiceContext & SpaceServiceContext & - RevocationServiceContext & { + RevocationServiceContext & + ConcludeServiceContext & { signer: EdSigner.Signer uploadTable: UploadTable dudewhereBucket: DudewhereBucket @@ -345,6 +409,25 @@ export interface RevocationServiceContext { revocationsStorage: RevocationsStorage } +export interface ConcludeServiceContext { + /** + * Service signer + */ + id: Signer + /** + * Stores receipts for tasks. + */ + receiptsStorage: ReceiptsStorage + /** + * Stores tasks. + */ + tasksStorage: TasksStorage + /** + * Task scheduler. + */ + tasksScheduler: TasksScheduler +} + export interface PlanServiceContext { plansStorage: PlansStorage } @@ -362,6 +445,8 @@ export interface ServiceContext ProviderServiceContext, SpaceServiceContext, StoreServiceContext, + BlobServiceContext, + ConcludeServiceContext, SubscriptionServiceContext, RateLimitServiceContext, RevocationServiceContext, @@ -379,6 +464,7 @@ export interface UcantoServerContext extends ServiceContext, RevocationChecker { export interface UcantoServerTestContext extends UcantoServerContext, StoreTestContext, + BlobServiceContext, UploadTestContext { connection: ConnectionView mail: DebugEmail diff --git a/packages/upload-api/src/types/blob.ts b/packages/upload-api/src/types/blob.ts new file mode 100644 index 000000000..5e5e8b806 --- /dev/null +++ b/packages/upload-api/src/types/blob.ts @@ -0,0 +1,74 @@ +import type { + UnknownLink, + Invocation, + Result, + Failure, + DID, +} from '@ucanto/interface' +import { BlobMultihash, BlobListItem } from '@web3-storage/capabilities/types' + +import { RecordKeyConflict, ListOptions, ListResponse } from '../types.js' +import { Storage } from './storage.js' + +export type TasksStorage = Storage + +export interface AllocationsStorage { + get: ( + space: DID, + blobMultihash: BlobMultihash + ) => Promise> + exists: ( + space: DID, + blobMultihash: BlobMultihash + ) => Promise> + /** Inserts an item in the table if it does not already exist. */ + insert: ( + item: BlobAddInput + ) => Promise> + list: ( + space: DID, + options?: ListOptions + ) => Promise, Failure>> +} + +export interface BlobModel { + digest: BlobMultihash + size: number +} + +export interface BlobAddInput { + space: DID + invocation: UnknownLink + blob: BlobModel +} + +export interface BlobAddOutput + extends Omit {} + +export interface BlobGetOutput { + blob: { digest: Uint8Array; size: number } + invocation: UnknownLink +} + +export interface BlobsStorage { + has: (content: BlobMultihash) => Promise> + createUploadUrl: ( + content: BlobMultihash, + size: number, + /** + * The number of seconds before the presigned URL expires + */ + expiresIn: number + ) => Promise< + Result< + { + url: URL + headers: { + 'x-amz-checksum-sha256': string + 'content-length': string + } & Record + }, + Failure + > + > +} diff --git a/packages/upload-api/src/types/service.ts b/packages/upload-api/src/types/service.ts new file mode 100644 index 000000000..2575e5a3a --- /dev/null +++ b/packages/upload-api/src/types/service.ts @@ -0,0 +1,15 @@ +import type { + UnknownLink, + Receipt, + Invocation, + Result, + Unit, + Failure, +} from '@ucanto/interface' +import { Storage } from './storage.js' + +// eslint-disable-next-line @typescript-eslint/no-explicit-any +export type ReceiptsStorage = Storage> +export interface TasksScheduler { + schedule: (invocation: Invocation) => Promise> +} diff --git a/packages/upload-api/src/types/storage.ts b/packages/upload-api/src/types/storage.ts new file mode 100644 index 000000000..7cf5fea72 --- /dev/null +++ b/packages/upload-api/src/types/storage.ts @@ -0,0 +1,20 @@ +import type { Unit, Result } from '@ucanto/interface' +import { + StorageGetError, + StoragePutError, +} from '@web3-storage/capabilities/types' + +export interface Storage { + /** + * Puts a record in the store. + */ + put: (record: Rec) => Promise> + /** + * Gets a record from the store. + */ + get: (key: RecKey) => Promise> + /** + * Determine if a record already exists in the store for the given key. + */ + has: (key: RecKey) => Promise> +} diff --git a/packages/upload-api/src/ucan.js b/packages/upload-api/src/ucan.js index 9bf9b14a6..47bc64df8 100644 --- a/packages/upload-api/src/ucan.js +++ b/packages/upload-api/src/ucan.js @@ -1,4 +1,5 @@ import { ucanRevokeProvider } from './ucan/revoke.js' +import { ucanConcludeProvider } from './ucan/conclude.js' import * as API from './types.js' /** @@ -6,6 +7,7 @@ import * as API from './types.js' */ export const createService = (context) => { return { + conclude: ucanConcludeProvider(context), revoke: ucanRevokeProvider(context), } } diff --git a/packages/upload-api/src/ucan/conclude.js b/packages/upload-api/src/ucan/conclude.js new file mode 100644 index 000000000..564811efe --- /dev/null +++ b/packages/upload-api/src/ucan/conclude.js @@ -0,0 +1,153 @@ +import * as API from '../types.js' +import { provide } from '@ucanto/server' +import { Receipt } from '@ucanto/core' +import * as W3sBlob from '@web3-storage/capabilities/web3.storage/blob' +import * as HTTP from '@web3-storage/capabilities/http' +import { conclude } from '@web3-storage/capabilities/ucan' +import { equals } from 'uint8arrays/equals' + +import { ReferencedInvocationNotFound } from './lib.js' + +/** + * @param {API.ConcludeServiceContext} context + * @returns {API.ServiceMethod} + */ +export const ucanConcludeProvider = ({ + id, + receiptsStorage, + tasksStorage, + tasksScheduler, +}) => + provide(conclude, async ({ invocation }) => { + const receipt = getConcludeReceipt(invocation) + + // Verify invocation exists failing with ReceiptInvocationNotFound + const ranInvocation = receipt.ran + const httpPutTaskGetRes = await tasksStorage.get(ranInvocation.link()) + if (httpPutTaskGetRes.error) { + if (httpPutTaskGetRes.error.name === 'RecordNotFound') { + return { + error: new ReferencedInvocationNotFound(ranInvocation.link()), + } + } + return httpPutTaskGetRes + } + + // Store receipt + const receiptPutRes = await receiptsStorage.put(receipt) + if (receiptPutRes.error) { + return { + error: receiptPutRes.error, + } + } + + // THIS IS A TEMPORARY HACK + // Schedule `blob/accept` if there is a `http/put` capabilities + // inside the invocation that this receipt comes from + const scheduleRes = await Promise.all( + httpPutTaskGetRes.ok.capabilities + // Go through invocation tasks and get all `http/put` + .filter((cap) => cap.can === HTTP.put.can) + // @ts-expect-error body exists in `http/put` but unknown type here + .map(async (/** @type {API.HTTPPut} */ cap) => { + // Get triggering task (blob/allocate) by checking blocking task from `url` + /** @type {API.UnknownLink} */ + // @ts-expect-error ts does not know how to get this + const [, blobAllocateTaskCid] = cap.nb.url['ucan/await'] + const blobAllocateTaskGet = await tasksStorage.get( + blobAllocateTaskCid + ) + if (blobAllocateTaskGet.error) { + return blobAllocateTaskGet + } + + /** @type {API.BlobAllocate} */ + // @ts-expect-error ts does not know how to get this + const allocateCapability = blobAllocateTaskGet.ok.capabilities.find( + // @ts-expect-error ts does not know how to get this + (/** @type {API.BlobAllocate} */ allocateCap) => + equals(allocateCap.nb.blob.digest, cap.nb.body.digest) && + allocateCap.can === W3sBlob.allocate.can + ) + + const blobAccept = await W3sBlob.accept + .invoke({ + issuer: id, + audience: id, + with: id.toDIDKey(), + nb: { + blob: cap.nb.body, + space: allocateCapability.nb.space, + _put: { + 'ucan/await': ['.out.ok', ranInvocation.link()], + }, + }, + // Expiry is set to `Infinity` so that CID will come out the same + // as returned in effect on `blob/add` + expiration: Infinity, + }) + .delegate() + + return tasksScheduler.schedule(blobAccept) + }) + ) + + const scheduleErrors = scheduleRes.filter((res) => res.error) + if (scheduleErrors.length && scheduleErrors[0].error) { + return { + error: scheduleErrors[0].error, + } + } + + return { + ok: { time: Date.now() }, + } + }) + +/** + * @param {import('@ucanto/interface').Invocation} concludeFx + */ +export function getConcludeReceipt(concludeFx) { + const receiptBlocks = new Map() + for (const block of concludeFx.iterateIPLDBlocks()) { + receiptBlocks.set(`${block.cid}`, block) + } + return Receipt.view({ + // @ts-expect-error object of type unknown + root: concludeFx.capabilities[0].nb.receipt, + blocks: receiptBlocks, + }) +} + +/** + * @param {API.Signer} id + * @param {API.Verifier} serviceDid + * @param {API.Receipt} receipt + */ +export function createConcludeInvocation(id, serviceDid, receipt) { + const receiptBlocks = [] + const receiptCids = [] + for (const block of receipt.iterateIPLDBlocks()) { + receiptBlocks.push(block) + receiptCids.push(block.cid) + } + const concludeAllocatefx = conclude.invoke({ + issuer: id, + audience: serviceDid, + with: id.toDIDKey(), + nb: { + receipt: receipt.link(), + }, + expiration: Infinity, + facts: [ + { + ...receiptCids, + }, + ], + }) + for (const block of receiptBlocks) { + concludeAllocatefx.attach(block) + } + + return concludeAllocatefx +} diff --git a/packages/upload-api/src/ucan/lib.js b/packages/upload-api/src/ucan/lib.js new file mode 100644 index 000000000..7d5d33c43 --- /dev/null +++ b/packages/upload-api/src/ucan/lib.js @@ -0,0 +1,30 @@ +import { Failure } from '@ucanto/server' + +export const ReferencedInvocationNotFoundName = 'ReferencedInvocationNotFound' +export class ReferencedInvocationNotFound extends Failure { + /** + * @param {import('@ucanto/interface').Link} [invocation] + */ + constructor(invocation) { + super() + this.invocation = invocation + } + + get name() { + return ReferencedInvocationNotFoundName + } + + describe() { + if (this.invocation) { + return `Invocation not found in ${this.invocation.toString()}` + } + return `Invocation not found` + } + + toJSON() { + return { + ...super.toJSON(), + invocation: this.invocation, + } + } +} diff --git a/packages/upload-api/test/handlers/blob.js b/packages/upload-api/test/handlers/blob.js new file mode 100644 index 000000000..ac9eb2366 --- /dev/null +++ b/packages/upload-api/test/handlers/blob.js @@ -0,0 +1,337 @@ +import * as API from '../../src/types.js' +import { sha256 } from 'multiformats/hashes/sha2' +import { ed25519 } from '@ucanto/principal' +import { Receipt } from '@ucanto/core' +import * as BlobCapabilities from '@web3-storage/capabilities/blob' +import * as HTTPCapabilities from '@web3-storage/capabilities/http' + +import { createServer, connect } from '../../src/lib.js' +import { alice, registerSpace } from '../util.js' +import { BlobSizeOutsideOfSupportedRangeName } from '../../src/blob/lib.js' +import { createConcludeInvocation } from '../../src/ucan/conclude.js' +import { parseBlobAddReceiptNext } from '../helpers/blob.js' + +/** + * @type {API.Tests} + */ +export const test = { + 'blob/add schedules allocation and returns effects for allocate (and its receipt), put and accept': + async (assert, context) => { + const { proof, spaceDid } = await registerSpace(alice, context) + + // prepare data + const data = new Uint8Array([11, 22, 34, 44, 55]) + const multihash = await sha256.digest(data) + const digest = multihash.bytes + const size = data.byteLength + + // create service connection + const connection = connect({ + id: context.id, + channel: createServer(context), + }) + + // invoke `blob/add` + const invocation = BlobCapabilities.add.invoke({ + issuer: alice, + audience: context.id, + with: spaceDid, + nb: { + blob: { + digest, + size, + }, + }, + proofs: [proof], + }) + const blobAdd = await invocation.execute(connection) + if (!blobAdd.out.ok) { + throw new Error('invocation failed', { cause: blobAdd }) + } + + // Validate receipt structure + assert.ok(blobAdd.out.ok.site) + assert.equal(blobAdd.out.ok.site['ucan/await'][0], '.out.ok.site') + assert.ok( + blobAdd.out.ok.site['ucan/await'][1].equals(blobAdd.fx.join?.link()) + ) + assert.ok(blobAdd.fx.join) + assert.equal(blobAdd.fx.fork.length, 3) + + // validate receipt next + const next = parseBlobAddReceiptNext(blobAdd) + assert.ok(next.allocate.task) + assert.ok(next.put.task) + assert.ok(next.accept.task) + assert.ok(next.allocate.receipt) + assert.ok(!next.put.receipt) + assert.ok(!next.accept.receipt) + + // validate facts exist for `http/put` + assert.ok(next.put.task.facts.length) + assert.ok(next.put.task.facts[0]['keys']) + + // Validate `http/put` invocation was stored + const httpPutGetTask = await context.tasksStorage.get(next.put.task.cid) + assert.ok(httpPutGetTask.ok) + + // validate that scheduled allocate task executed and has its receipt content + const receipt = next.allocate.receipt + assert.ok(receipt.out) + assert.ok(receipt.out.ok) + assert.equal(receipt.out.ok?.size, size) + assert.ok(receipt.out.ok?.address) + }, + 'blob/add schedules allocation only on first blob/add': async ( + assert, + context + ) => { + const { proof, spaceDid } = await registerSpace(alice, context) + + // prepare data + const data = new Uint8Array([11, 22, 34, 44, 55]) + const multihash = await sha256.digest(data) + const digest = multihash.bytes + const size = data.byteLength + + // create service connection + const connection = connect({ + id: context.id, + channel: createServer(context), + }) + + // create `blob/add` invocation + const invocation = BlobCapabilities.add.invoke({ + issuer: alice, + audience: context.id, + with: spaceDid, + nb: { + blob: { + digest, + size, + }, + }, + proofs: [proof], + }) + // Invoke `blob/add` for the first time + const firstBlobAdd = await invocation.execute(connection) + if (!firstBlobAdd.out.ok) { + throw new Error('invocation failed', { cause: firstBlobAdd }) + } + + // parse first receipt next + const firstNext = parseBlobAddReceiptNext(firstBlobAdd) + assert.ok(firstNext.allocate.task) + assert.ok(firstNext.put.task) + assert.ok(firstNext.accept.task) + assert.ok(firstNext.allocate.receipt) + assert.ok(!firstNext.put.receipt) + assert.ok(!firstNext.accept.receipt) + + // Store allocate receipt to not re-schedule + const receiptPutRes = await context.receiptsStorage.put( + firstNext.allocate.receipt + ) + assert.ok(receiptPutRes.ok) + + // Invoke `blob/add` for the second time (without storing the blob) + const secondBlobAdd = await invocation.execute(connection) + if (!secondBlobAdd.out.ok) { + throw new Error('invocation failed', { cause: secondBlobAdd }) + } + + // parse second receipt next + const secondNext = parseBlobAddReceiptNext(secondBlobAdd) + assert.ok(secondNext.allocate.task) + assert.ok(secondNext.put.task) + assert.ok(secondNext.accept.task) + assert.ok(secondNext.allocate.receipt) + assert.ok(!secondNext.put.receipt) + assert.ok(!secondNext.accept.receipt) + // allocate receipt is from same invocation CID + assert.ok( + firstNext.allocate.task.link().equals(secondNext.allocate.task.link()) + ) + }, + 'blob/add schedules allocation and returns effects for allocate, accept and put together with their receipts (when stored)': + async (assert, context) => { + const { proof, spaceDid } = await registerSpace(alice, context) + + // prepare data + const data = new Uint8Array([11, 22, 34, 44, 55]) + const multihash = await sha256.digest(data) + const digest = multihash.bytes + const size = data.byteLength + + // create service connection + const connection = connect({ + id: context.id, + channel: createServer(context), + }) + + // create `blob/add` invocation + const invocation = BlobCapabilities.add.invoke({ + issuer: alice, + audience: context.id, + with: spaceDid, + nb: { + blob: { + digest, + size, + }, + }, + proofs: [proof], + }) + // Invoke `blob/add` for the first time + const firstBlobAdd = await invocation.execute(connection) + if (!firstBlobAdd.out.ok) { + throw new Error('invocation failed', { cause: firstBlobAdd }) + } + + // parse first receipt next + const firstNext = parseBlobAddReceiptNext(firstBlobAdd) + assert.ok(firstNext.allocate.task) + assert.ok(firstNext.put.task) + assert.ok(firstNext.accept.task) + assert.ok(firstNext.allocate.receipt) + assert.ok(!firstNext.put.receipt) + assert.ok(!firstNext.accept.receipt) + + // Store allocate receipt to not re-schedule + const receiptPutRes = await context.receiptsStorage.put( + firstNext.allocate.receipt + ) + assert.ok(receiptPutRes.ok) + + /** @type {import('@web3-storage/capabilities/types').BlobAddress} */ + // @ts-expect-error receipt type is unknown + const address = firstNext.allocate.receipt.out.ok.address + + // Store the blob to the address + const goodPut = await fetch(address.url, { + method: 'PUT', + mode: 'cors', + body: data, + headers: address.headers, + }) + assert.equal(goodPut.status, 200, await goodPut.text()) + + // Invoke `blob/add` for the second time (after storing the blob but not invoking conclude) + const secondBlobAdd = await invocation.execute(connection) + if (!secondBlobAdd.out.ok) { + throw new Error('invocation failed', { cause: secondBlobAdd }) + } + + // parse second receipt next + const secondNext = parseBlobAddReceiptNext(secondBlobAdd) + assert.ok(secondNext.allocate.task) + assert.ok(secondNext.put.task) + assert.ok(secondNext.accept.task) + assert.ok(secondNext.allocate.receipt) + assert.ok(!secondNext.put.receipt) + assert.ok(!secondNext.accept.receipt) + + // Store blob/allocate given conclude needs it to schedule blob/accept + // Store allocate task to be fetchable from allocate + await context.tasksStorage.put(secondNext.allocate.task) + + // Invoke `conclude` with `http/put` receipt + const keys = secondNext.put.task.facts[0]['keys'] + // @ts-expect-error Argument of type 'unknown' is not assignable to parameter of type 'SignerArchive<`did:${string}:${string}`, SigAlg>' + const blobProvider = ed25519.from(keys) + const httpPut = HTTPCapabilities.put.invoke({ + issuer: blobProvider, + audience: blobProvider, + with: blobProvider.toDIDKey(), + nb: { + body: { + digest, + size, + }, + url: { + 'ucan/await': ['.out.ok.address.url', secondNext.allocate.task.cid], + }, + headers: { + 'ucan/await': [ + '.out.ok.address.headers', + secondNext.allocate.task.cid, + ], + }, + }, + facts: secondNext.put.task.facts, + expiration: Infinity, + }) + + const httpPutDelegation = await httpPut.delegate() + const httpPutReceipt = await Receipt.issue({ + issuer: blobProvider, + ran: httpPutDelegation.cid, + result: { + ok: {}, + }, + }) + const httpPutConcludeInvocation = createConcludeInvocation( + alice, + context.id, + httpPutReceipt + ) + const ucanConclude = await httpPutConcludeInvocation.execute(connection) + if (!ucanConclude.out.ok) { + throw new Error('invocation failed', { cause: ucanConclude.out }) + } + + // Invoke `blob/add` for the third time (after invoking conclude) + const thirdBlobAdd = await invocation.execute(connection) + if (!thirdBlobAdd.out.ok) { + throw new Error('invocation failed', { cause: thirdBlobAdd }) + } + + // parse third receipt next + const thirdNext = parseBlobAddReceiptNext(thirdBlobAdd) + assert.ok(thirdNext.allocate.task) + assert.ok(thirdNext.put.task) + assert.ok(thirdNext.accept.task) + assert.ok(thirdNext.allocate.receipt) + assert.ok(thirdNext.put.receipt) + assert.ok(thirdNext.accept.receipt) + + assert.ok(thirdNext.allocate.receipt.out.ok?.address) + assert.deepEqual(thirdNext.put.receipt?.out.ok, {}) + assert.ok(thirdNext.accept.receipt?.out.ok?.site) + }, + 'blob/add fails when a blob with size bigger than maximum size is added': + async (assert, context) => { + const { proof, spaceDid } = await registerSpace(alice, context) + + // prepare data + const data = new Uint8Array([11, 22, 34, 44, 55]) + const multihash = await sha256.digest(data) + const digest = multihash.bytes + + // create service connection + const connection = connect({ + id: context.id, + channel: createServer(context), + }) + + // invoke `blob/add` + const invocation = BlobCapabilities.add.invoke({ + issuer: alice, + audience: context.id, + with: spaceDid, + nb: { + blob: { + digest, + size: Number.MAX_SAFE_INTEGER, + }, + }, + proofs: [proof], + }) + const blobAdd = await invocation.execute(connection) + if (!blobAdd.out.error) { + throw new Error('invocation should have failed') + } + assert.ok(blobAdd.out.error, 'invocation should have failed') + assert.equal(blobAdd.out.error.name, BlobSizeOutsideOfSupportedRangeName) + }, +} diff --git a/packages/upload-api/test/handlers/blob.spec.js b/packages/upload-api/test/handlers/blob.spec.js new file mode 100644 index 000000000..c8bd740b7 --- /dev/null +++ b/packages/upload-api/test/handlers/blob.spec.js @@ -0,0 +1,4 @@ +import { test } from '../test.js' +import * as Blob from './blob.js' + +test({ 'blob/*': Blob.test }) diff --git a/packages/upload-api/test/handlers/ucan.js b/packages/upload-api/test/handlers/ucan.js index 1686decd4..0a586db98 100644 --- a/packages/upload-api/test/handlers/ucan.js +++ b/packages/upload-api/test/handlers/ucan.js @@ -1,6 +1,18 @@ import * as API from '../../src/types.js' -import { alice, bob, mallory } from '../util.js' import { UCAN, Console } from '@web3-storage/capabilities' +import pDefer from 'p-defer' +import { Receipt } from '@ucanto/core' +import { ed25519 } from '@ucanto/principal' +import { sha256 } from 'multiformats/hashes/sha2' +import * as BlobCapabilities from '@web3-storage/capabilities/blob' +import * as W3sBlobCapabilities from '@web3-storage/capabilities/web3.storage/blob' +import * as HTTPCapabilities from '@web3-storage/capabilities/http' + +import { createServer, connect } from '../../src/lib.js' +import { alice, bob, mallory, registerSpace } from '../util.js' +import { createConcludeInvocation } from '../../src/ucan/conclude.js' +import { ReferencedInvocationNotFoundName } from '../../src/ucan/lib.js' +import { parseBlobAddReceiptNext } from '../helpers/blob.js' /** * @type {API.Tests} @@ -352,4 +364,186 @@ export const test = { assert.ok(String(revoke.out.error?.message).match(/Constrain violation/)) }, + 'ucan/conclude writes a receipt for a task previously scheduled': async ( + assert, + context + ) => { + // create service connection + const connection = connect({ + id: context.id, + channel: createServer(context), + }) + // Invoke something + const proof = await Console.log.delegate({ + issuer: context.id, + audience: alice, + with: context.id.did(), + }) + const invocation = Console.log.invoke({ + issuer: alice, + audience: context.id, + with: context.id.did(), + nb: { value: 'hello' }, + proofs: [proof], + }) + + const success = await invocation.execute(connection) + + if (!success.out.ok) { + throw new Error('invocation failed', { cause: success }) + } + + // Create conclude invocation + const concludeInvocation = createConcludeInvocation( + alice, + context.id, + success + ) + const ucanConcludeFail = await concludeInvocation.execute(connection) + assert.ok(ucanConcludeFail.out.error) + assert.equal( + ucanConcludeFail.out.error?.name, + ReferencedInvocationNotFoundName + ) + + // Store scheduled task + await context.tasksStorage.put(await invocation.delegate()) + + const ucanConcludeSuccess = await concludeInvocation.execute(connection) + assert.ok(ucanConcludeSuccess.out.ok) + assert.ok(ucanConcludeSuccess.out.ok?.time) + }, + 'ucan/conclude schedules web3.storage/blob/accept if invoked with the blob put receipt': + async (assert, context) => { + const taskScheduled = pDefer() + const { proof, spaceDid } = await registerSpace(alice, context) + + // prepare data + const data = new Uint8Array([11, 22, 34, 44, 55]) + const multihash = await sha256.digest(data) + const digest = multihash.bytes + const size = data.byteLength + + // create service connection + const connection = connect({ + id: context.id, + channel: createServer({ + ...context, + tasksScheduler: { + schedule: (invocation) => { + taskScheduled.resolve(invocation) + + return Promise.resolve({ + ok: {}, + }) + }, + }, + }), + }) + + // invoke `blob/add` + const blobAddInvocation = BlobCapabilities.add.invoke({ + issuer: alice, + audience: context.id, + with: spaceDid, + nb: { + blob: { + digest, + size, + }, + }, + proofs: [proof], + }) + const blobAdd = await blobAddInvocation.execute(connection) + if (!blobAdd.out.ok) { + throw new Error('invocation failed', { cause: blobAdd }) + } + + // Get receipt relevant content + const next = parseBlobAddReceiptNext(blobAdd) + + /** + * @type {import('@web3-storage/capabilities/types').BlobAddress} + **/ + // @ts-expect-error receipt out is unknown + const address = next.allocate.receipt.out.ok?.address + assert.ok(address) + + // Store allocate task to be fetchable from allocate + await context.tasksStorage.put(next.allocate.task) + + // Write blob + const goodPut = await fetch(address.url, { + method: 'PUT', + mode: 'cors', + body: data, + headers: address?.headers, + }) + assert.equal(goodPut.status, 200, await goodPut.text()) + + // Create `http/put` receipt + const keys = next.put.task.facts[0]['keys'] + // @ts-expect-error Argument of type 'unknown' is not assignable to parameter of type 'SignerArchive<`did:${string}:${string}`, SigAlg>' + const blobProvider = ed25519.from(keys) + const httpPut = HTTPCapabilities.put.invoke({ + issuer: blobProvider, + audience: blobProvider, + with: blobProvider.toDIDKey(), + nb: { + body: { + digest, + size, + }, + url: { + 'ucan/await': ['.out.ok.address.url', next.allocate.task.link()], + }, + headers: { + 'ucan/await': [ + '.out.ok.address.headers', + next.allocate.task.link(), + ], + }, + }, + facts: next.put.task.facts, + expiration: Infinity, + }) + + const httpPutDelegation = await httpPut.delegate() + const httpPutReceipt = await Receipt.issue({ + issuer: blobProvider, + ran: httpPutDelegation.cid, + result: { + ok: {}, + }, + }) + const httpPutConcludeInvocation = createConcludeInvocation( + alice, + context.id, + httpPutReceipt + ) + const ucanConclude = await httpPutConcludeInvocation.execute(connection) + if (!ucanConclude.out.ok) { + throw new Error('invocation failed', { cause: blobAdd }) + } + + // verify accept was scheduled + /** @type {import('@ucanto/interface').Invocation} */ + const blobAcceptInvocation = await taskScheduled.promise + assert.equal(blobAcceptInvocation.capabilities.length, 1) + assert.equal( + blobAcceptInvocation.capabilities[0].can, + W3sBlobCapabilities.accept.can + ) + assert.equal( + blobAcceptInvocation.capabilities[0].nb._put['ucan/await'][0], + '.out.ok' + ) + assert.ok( + blobAcceptInvocation.capabilities[0].nb._put['ucan/await'][1].equals( + httpPutDelegation.cid + ) + ) + assert.ok(blobAcceptInvocation.capabilities[0].nb.blob) + assert.equal(blobAcceptInvocation.capabilities[0].nb.space, spaceDid) + }, } diff --git a/packages/upload-api/test/handlers/web3.storage.js b/packages/upload-api/test/handlers/web3.storage.js new file mode 100644 index 000000000..a527e3707 --- /dev/null +++ b/packages/upload-api/test/handlers/web3.storage.js @@ -0,0 +1,680 @@ +import * as API from '../../src/types.js' +import { equals } from 'uint8arrays' +import { create as createLink } from 'multiformats/link' +import { Absentee } from '@ucanto/principal' +import { Digest } from 'multiformats/hashes/digest' +import { sha256 } from 'multiformats/hashes/sha2' +import { code as rawCode } from 'multiformats/codecs/raw' +import { Assert } from '@web3-storage/content-claims/capability' +import * as BlobCapabilities from '@web3-storage/capabilities/blob' +import * as W3sBlobCapabilities from '@web3-storage/capabilities/web3.storage/blob' +import { base64pad } from 'multiformats/bases/base64' + +import { AllocatedMemoryHadNotBeenWrittenToName } from '../../src/blob/lib.js' +import { provisionProvider } from '../helpers/utils.js' +import { createServer, connect } from '../../src/lib.js' +import { alice, bob, createSpace, registerSpace } from '../util.js' +import { parseBlobAddReceiptNext } from '../helpers/blob.js' + +/** + * @type {API.Tests} + */ +export const test = { + 'web3.storage/blob/allocate allocates to space and returns presigned url': + async (assert, context) => { + const { proof, spaceDid } = await registerSpace(alice, context) + + // prepare data + const data = new Uint8Array([11, 22, 34, 44, 55]) + const multihash = await sha256.digest(data) + const digest = multihash.bytes + const size = data.byteLength + + // create service connection + const connection = connect({ + id: context.id, + channel: createServer(context), + }) + + // create `blob/add` invocation + const blobAddInvocation = BlobCapabilities.add.invoke({ + issuer: alice, + audience: context.id, + with: spaceDid, + nb: { + blob: { + digest, + size, + }, + }, + proofs: [proof], + }) + + // invoke `web3.storage/blob/allocate` + const serviceBlobAllocate = W3sBlobCapabilities.allocate.invoke({ + issuer: alice, + audience: context.id, + with: spaceDid, + nb: { + blob: { + digest, + size, + }, + cause: (await blobAddInvocation.delegate()).cid, + space: spaceDid, + }, + proofs: [proof], + }) + const blobAllocate = await serviceBlobAllocate.execute(connection) + if (!blobAllocate.out.ok) { + throw new Error('invocation failed', { cause: blobAllocate }) + } + + // Validate response + assert.equal(blobAllocate.out.ok.size, size) + assert.ok(blobAllocate.out.ok.address) + assert.ok(blobAllocate.out.ok.address?.headers) + assert.ok(blobAllocate.out.ok.address?.url) + assert.ok(blobAllocate.out.ok.address?.expiresAt) + assert.equal( + blobAllocate.out.ok.address?.headers?.['content-length'], + String(size) + ) + assert.deepEqual( + blobAllocate.out.ok.address?.headers?.['x-amz-checksum-sha256'], + base64pad.baseEncode(multihash.digest) + ) + + const url = + blobAllocate.out.ok.address?.url && + new URL(blobAllocate.out.ok.address?.url) + if (!url) { + throw new Error('Expected presigned url in response') + } + const signedHeaders = url.searchParams.get('X-Amz-SignedHeaders') + + assert.equal( + signedHeaders, + 'content-length;host;x-amz-checksum-sha256', + 'content-length and checksum must be part of the signature' + ) + + // Validate allocation state + const spaceAllocations = await context.allocationsStorage.list(spaceDid) + assert.ok(spaceAllocations.ok) + assert.equal(spaceAllocations.ok?.size, 1) + const allocatedEntry = spaceAllocations.ok?.results[0] + if (!allocatedEntry) { + throw new Error('Expected presigned allocatedEntry in response') + } + assert.ok(equals(allocatedEntry.blob.digest, digest)) + assert.equal(allocatedEntry.blob.size, size) + + // Validate presigned url usage + const goodPut = await fetch(url, { + method: 'PUT', + mode: 'cors', + body: data, + headers: blobAllocate.out.ok.address?.headers, + }) + + assert.equal(goodPut.status, 200, await goodPut.text()) + }, + 'web3.storage/blob/allocate does not allocate more space to already allocated content': + async (assert, context) => { + const { proof, spaceDid } = await registerSpace(alice, context) + // prepare data + const data = new Uint8Array([11, 22, 34, 44, 55]) + const multihash = await sha256.digest(data) + const digest = multihash.bytes + const size = data.byteLength + + // create service connection + const connection = connect({ + id: context.id, + channel: createServer(context), + }) + + // create `blob/add` invocation + const blobAddInvocation = BlobCapabilities.add.invoke({ + issuer: alice, + audience: context.id, + with: spaceDid, + nb: { + blob: { + digest, + size, + }, + }, + proofs: [proof], + }) + + // invoke `web3.storage/blob/allocate` + const serviceBlobAllocate = W3sBlobCapabilities.allocate.invoke({ + issuer: alice, + audience: context.id, + with: spaceDid, + nb: { + blob: { + digest, + size, + }, + cause: (await blobAddInvocation.delegate()).cid, + space: spaceDid, + }, + proofs: [proof], + }) + const blobAllocate = await serviceBlobAllocate.execute(connection) + if (!blobAllocate.out.ok) { + throw new Error('invocation failed', { cause: blobAllocate }) + } + + // second blob allocate invocation + const secondBlobAllocate = await serviceBlobAllocate.execute(connection) + if (!secondBlobAllocate.out.ok) { + throw new Error('invocation failed', { cause: secondBlobAllocate }) + } + + // Validate response + assert.equal(secondBlobAllocate.out.ok.size, 0) + assert.ok(!!blobAllocate.out.ok.address) + }, + 'web3.storage/blob/allocate can allocate to different space after write to one space': + async (assert, context) => { + const { proof: aliceProof, spaceDid: aliceSpaceDid } = + await registerSpace(alice, context) + const { proof: bobProof, spaceDid: bobSpaceDid } = await registerSpace( + bob, + context, + 'bob' + ) + + // prepare data + const data = new Uint8Array([11, 22, 34, 44, 55]) + const multihash = await sha256.digest(data) + const digest = multihash.bytes + const size = data.byteLength + + // create service connection + const connection = connect({ + id: context.id, + channel: createServer(context), + }) + + // create `blob/add` invocations + const aliceBlobAddInvocation = BlobCapabilities.add.invoke({ + issuer: alice, + audience: context.id, + with: aliceSpaceDid, + nb: { + blob: { + digest, + size, + }, + }, + proofs: [aliceProof], + }) + const bobBlobAddInvocation = BlobCapabilities.add.invoke({ + issuer: bob, + audience: context.id, + with: bobSpaceDid, + nb: { + blob: { + digest, + size, + }, + }, + proofs: [bobProof], + }) + + // invoke `web3.storage/blob/allocate` capabilities on alice space + const aliceServiceBlobAllocate = W3sBlobCapabilities.allocate.invoke({ + issuer: alice, + audience: context.id, + with: aliceSpaceDid, + nb: { + blob: { + digest, + size, + }, + cause: (await aliceBlobAddInvocation.delegate()).cid, + space: aliceSpaceDid, + }, + proofs: [aliceProof], + }) + const aliceBlobAllocate = await aliceServiceBlobAllocate.execute( + connection + ) + if (!aliceBlobAllocate.out.ok) { + throw new Error('invocation failed', { cause: aliceBlobAllocate }) + } + // there is address to write + assert.ok(aliceBlobAllocate.out.ok.address) + assert.equal(aliceBlobAllocate.out.ok.size, size) + + // write to presigned url + const url = + aliceBlobAllocate.out.ok.address?.url && + new URL(aliceBlobAllocate.out.ok.address?.url) + if (!url) { + throw new Error('Expected presigned url in response') + } + const goodPut = await fetch(url, { + method: 'PUT', + mode: 'cors', + body: data, + headers: aliceBlobAllocate.out.ok.address?.headers, + }) + + assert.equal(goodPut.status, 200, await goodPut.text()) + + // invoke `web3.storage/blob/allocate` capabilities on bob space + const bobServiceBlobAllocate = W3sBlobCapabilities.allocate.invoke({ + issuer: bob, + audience: context.id, + with: bobSpaceDid, + nb: { + blob: { + digest, + size, + }, + cause: (await bobBlobAddInvocation.delegate()).cid, + space: bobSpaceDid, + }, + proofs: [bobProof], + }) + const bobBlobAllocate = await bobServiceBlobAllocate.execute(connection) + if (!bobBlobAllocate.out.ok) { + throw new Error('invocation failed', { cause: bobBlobAllocate }) + } + // there is no address to write + assert.ok(!bobBlobAllocate.out.ok.address) + assert.equal(bobBlobAllocate.out.ok.size, size) + + // Validate allocation state + const aliceSpaceAllocations = await context.allocationsStorage.list( + aliceSpaceDid + ) + assert.ok(aliceSpaceAllocations.ok) + assert.equal(aliceSpaceAllocations.ok?.size, 1) + + const bobSpaceAllocations = await context.allocationsStorage.list( + bobSpaceDid + ) + assert.ok(bobSpaceAllocations.ok) + assert.equal(bobSpaceAllocations.ok?.size, 1) + }, + 'web3.storage/blob/allocate creates presigned url that can only PUT a payload with right length': + async (assert, context) => { + const { proof, spaceDid } = await registerSpace(alice, context) + + // prepare data + const data = new Uint8Array([11, 22, 34, 44, 55]) + const longer = new Uint8Array([11, 22, 34, 44, 55, 66]) + const multihash = await sha256.digest(data) + const digest = multihash.bytes + const size = data.byteLength + + // create service connection + const connection = connect({ + id: context.id, + channel: createServer(context), + }) + + // create `blob/add` invocation + const blobAddInvocation = BlobCapabilities.add.invoke({ + issuer: alice, + audience: context.id, + with: spaceDid, + nb: { + blob: { + digest, + size, + }, + }, + proofs: [proof], + }) + + // invoke `web3.storage/blob/allocate` + const serviceBlobAllocate = W3sBlobCapabilities.allocate.invoke({ + issuer: alice, + audience: context.id, + with: spaceDid, + nb: { + blob: { + digest, + size, + }, + cause: (await blobAddInvocation.delegate()).cid, + space: spaceDid, + }, + proofs: [proof], + }) + const blobAllocate = await serviceBlobAllocate.execute(connection) + if (!blobAllocate.out.ok) { + throw new Error('invocation failed', { cause: blobAllocate }) + } + // there is address to write + assert.ok(blobAllocate.out.ok.address) + assert.equal(blobAllocate.out.ok.size, size) + + // write to presigned url + const url = + blobAllocate.out.ok.address?.url && + new URL(blobAllocate.out.ok.address?.url) + if (!url) { + throw new Error('Expected presigned url in response') + } + const contentLengthFailSignature = await fetch(url, { + method: 'PUT', + mode: 'cors', + body: longer, + headers: { + ...blobAllocate.out.ok.address?.headers, + 'content-length': longer.byteLength.toString(10), + }, + }) + + assert.equal( + contentLengthFailSignature.status >= 400, + true, + 'should fail to upload as content-length differs from that used to sign the url' + ) + }, + 'web3.storage/blob/allocate creates presigned url that can only PUT a payload with exact bytes': + async (assert, context) => { + const { proof, spaceDid } = await registerSpace(alice, context) + + // prepare data + const data = new Uint8Array([11, 22, 34, 44, 55]) + const other = new Uint8Array([10, 22, 34, 44, 55]) + const multihash = await sha256.digest(data) + const digest = multihash.bytes + const size = data.byteLength + + // create service connection + const connection = connect({ + id: context.id, + channel: createServer(context), + }) + + // create `blob/add` invocation + const blobAddInvocation = BlobCapabilities.add.invoke({ + issuer: alice, + audience: context.id, + with: spaceDid, + nb: { + blob: { + digest, + size, + }, + }, + proofs: [proof], + }) + + // invoke `web3.storage/blob/allocate` + const serviceBlobAllocate = W3sBlobCapabilities.allocate.invoke({ + issuer: alice, + audience: context.id, + with: spaceDid, + nb: { + blob: { + digest, + size, + }, + cause: (await blobAddInvocation.delegate()).cid, + space: spaceDid, + }, + proofs: [proof], + }) + const blobAllocate = await serviceBlobAllocate.execute(connection) + if (!blobAllocate.out.ok) { + throw new Error('invocation failed', { cause: blobAllocate }) + } + // there is address to write + assert.ok(blobAllocate.out.ok.address) + assert.equal(blobAllocate.out.ok.size, size) + + // write to presigned url + const url = + blobAllocate.out.ok.address?.url && + new URL(blobAllocate.out.ok.address?.url) + if (!url) { + throw new Error('Expected presigned url in response') + } + const failChecksum = await fetch(url, { + method: 'PUT', + mode: 'cors', + body: other, + headers: blobAllocate.out.ok.address?.headers, + }) + + assert.equal( + failChecksum.status, + 400, + 'should fail to upload any other data.' + ) + }, + 'web3.storage/blob/allocate disallowed if invocation fails access verification': + async (assert, context) => { + const { proof, space, spaceDid } = await createSpace(alice) + + // prepare data + const data = new Uint8Array([11, 22, 34, 44, 55]) + const multihash = await sha256.digest(data) + const digest = multihash.bytes + const size = data.byteLength + + // create service connection + const connection = connect({ + id: context.id, + channel: createServer(context), + }) + + // create `blob/add` invocation + const blobAddInvocation = BlobCapabilities.add.invoke({ + issuer: alice, + audience: context.id, + with: spaceDid, + nb: { + blob: { + digest, + size, + }, + }, + proofs: [proof], + }) + + // invoke `web3.storage/blob/allocate` + const serviceBlobAllocate = W3sBlobCapabilities.allocate.invoke({ + issuer: alice, + audience: context.id, + with: spaceDid, + nb: { + blob: { + digest, + size, + }, + cause: (await blobAddInvocation.delegate()).cid, + space: spaceDid, + }, + proofs: [proof], + }) + const blobAllocate = await serviceBlobAllocate.execute(connection) + assert.ok(blobAllocate.out.error) + assert.equal(blobAllocate.out.error?.message.includes('no storage'), true) + + // Register space and retry + const account = Absentee.from({ + id: 'did:mailto:test.web3.storage:alice', + }) + const providerAdd = await provisionProvider({ + service: /** @type {API.Signer>} */ (context.signer), + agent: alice, + space, + account, + connection, + }) + assert.ok(providerAdd.out.ok) + + const retryBlobAllocate = await serviceBlobAllocate.execute(connection) + assert.equal(retryBlobAllocate.out.error, undefined) + }, + 'web3.storage/blob/accept returns site delegation': async ( + assert, + context + ) => { + const { proof, spaceDid } = await registerSpace(alice, context) + + // prepare data + const data = new Uint8Array([11, 22, 34, 44, 55]) + const multihash = await sha256.digest(data) + const digest = multihash.bytes + const size = data.byteLength + const content = createLink( + rawCode, + new Digest(sha256.code, 32, digest, digest) + ) + + // create service connection + const connection = connect({ + id: context.id, + channel: createServer(context), + }) + + // create `blob/add` invocation + const blobAddInvocation = BlobCapabilities.add.invoke({ + issuer: alice, + audience: context.id, + with: spaceDid, + nb: { + blob: { + digest, + size, + }, + }, + proofs: [proof], + }) + const blobAdd = await blobAddInvocation.execute(connection) + if (!blobAdd.out.ok) { + throw new Error('invocation failed', { cause: blobAdd }) + } + + // parse receipt next + const next = parseBlobAddReceiptNext(blobAdd) + + /** @type {import('@web3-storage/capabilities/types').BlobAddress} */ + // @ts-expect-error receipt type is unknown + const address = next.allocate.receipt.out.ok.address + + // Store the blob to the address + const goodPut = await fetch(address.url, { + method: 'PUT', + mode: 'cors', + body: data, + headers: address.headers, + }) + assert.equal(goodPut.status, 200, await goodPut.text()) + + // invoke `web3.storage/blob/accept` + const serviceBlobAccept = W3sBlobCapabilities.accept.invoke({ + issuer: context.id, + audience: context.id, + with: context.id.did(), + nb: { + blob: { + digest, + size, + }, + space: spaceDid, + _put: { 'ucan/await': ['.out.ok', next.put.task.link()] }, + }, + proofs: [proof], + }) + const blobAccept = await serviceBlobAccept.execute(connection) + if (!blobAccept.out.ok) { + throw new Error('invocation failed', { cause: blobAccept }) + } + // Validate out + assert.ok(blobAccept.out.ok) + assert.ok(blobAccept.out.ok.site) + + // Validate effect + assert.equal(blobAccept.fx.fork.length, 1) + /** @type {import('@ucanto/interface').Delegation} */ + // @ts-expect-error delegation not assignable to Effect per TS understanding + const delegation = blobAccept.fx.fork[0] + assert.equal(delegation.capabilities.length, 1) + assert.ok(delegation.capabilities[0].can, Assert.location.can) + // @ts-expect-error nb unknown + assert.ok(delegation.capabilities[0].nb.content.equals(content)) + // @ts-expect-error nb unknown + const locations = delegation.capabilities[0].nb.location + assert.equal(locations.length, 1) + assert.ok( + locations[0].includes( + `https://w3s.link/ipfs/${content.toString()}?origin` + ) + ) + }, + 'web3.storage/blob/accept fails to provide site delegation when blob was not stored': + async (assert, context) => { + const { proof, spaceDid } = await registerSpace(alice, context) + + // prepare data + const data = new Uint8Array([11, 22, 34, 44, 55]) + const multihash = await sha256.digest(data) + const digest = multihash.bytes + const size = data.byteLength + + // create service connection + const connection = connect({ + id: context.id, + channel: createServer(context), + }) + + // create `blob/add` invocation + const blobAddInvocation = BlobCapabilities.add.invoke({ + issuer: alice, + audience: context.id, + with: spaceDid, + nb: { + blob: { + digest, + size, + }, + }, + proofs: [proof], + }) + const blobAdd = await blobAddInvocation.execute(connection) + if (!blobAdd.out.ok) { + throw new Error('invocation failed', { cause: blobAdd }) + } + + // parse receipt next + const next = parseBlobAddReceiptNext(blobAdd) + + // invoke `web3.storage/blob/accept` + const serviceBlobAccept = W3sBlobCapabilities.accept.invoke({ + issuer: context.id, + audience: context.id, + with: context.id.did(), + nb: { + blob: { + digest, + size, + }, + space: spaceDid, + _put: { 'ucan/await': ['.out.ok', next.put.task.link()] }, + }, + proofs: [proof], + }) + const blobAccept = await serviceBlobAccept.execute(connection) + // Validate out error + assert.ok(blobAccept.out.error) + assert.equal( + blobAccept.out.error?.name, + AllocatedMemoryHadNotBeenWrittenToName + ) + }, +} diff --git a/packages/upload-api/test/handlers/web3.storage.spec.js b/packages/upload-api/test/handlers/web3.storage.spec.js new file mode 100644 index 000000000..b68050534 --- /dev/null +++ b/packages/upload-api/test/handlers/web3.storage.spec.js @@ -0,0 +1,4 @@ +import { test } from '../test.js' +import * as W3s from './web3.storage.js' + +test({ 'web3.storage/*': W3s.test }) diff --git a/packages/upload-api/test/helpers/blob.js b/packages/upload-api/test/helpers/blob.js new file mode 100644 index 000000000..1bd2a36d6 --- /dev/null +++ b/packages/upload-api/test/helpers/blob.js @@ -0,0 +1,69 @@ +import * as API from '../../src/types.js' + +import * as W3sBlobCapabilities from '@web3-storage/capabilities/web3.storage/blob' +import * as HTTPCapabilities from '@web3-storage/capabilities/http' +import * as UCAN from '@web3-storage/capabilities/ucan' + +import { getConcludeReceipt } from '../../src/ucan/conclude.js' + +/** + * @param {API.Receipt} receipt + */ +export function parseBlobAddReceiptNext(receipt) { + // Get invocations next + /** + * @type {import('@ucanto/interface').Invocation[]} + **/ + // @ts-expect-error read only effect + const forkInvocations = receipt.fx.fork + const allocateTask = forkInvocations.find( + (fork) => fork.capabilities[0].can === W3sBlobCapabilities.allocate.can + ) + const concludefxs = forkInvocations.filter( + (fork) => fork.capabilities[0].can === UCAN.conclude.can + ) + const putTask = forkInvocations.find( + (fork) => fork.capabilities[0].can === HTTPCapabilities.put.can + ) + const acceptTask = receipt.fx.join + if (!allocateTask || !concludefxs.length || !putTask || !acceptTask) { + throw new Error('mandatory effects not received') + } + + // Decode receipts available + const nextReceipts = concludefxs.map((fx) => getConcludeReceipt(fx)) + /** @type {API.Receipt | undefined} */ + // @ts-expect-error types unknown for next + const allocateReceipt = nextReceipts.find((receipt) => + receipt.ran.link().equals(allocateTask.cid) + ) + /** @type {API.Receipt<{}, API.Failure> | undefined} */ + // @ts-expect-error types unknown for next + const putReceipt = nextReceipts.find((receipt) => + receipt.ran.link().equals(putTask.cid) + ) + /** @type {API.Receipt | undefined} */ + // @ts-expect-error types unknown for next + const acceptReceipt = nextReceipts.find((receipt) => + receipt.ran.link().equals(acceptTask.link()) + ) + + if (!allocateReceipt) { + throw new Error('mandatory effects not received') + } + + return { + allocate: { + task: allocateTask, + receipt: allocateReceipt, + }, + put: { + task: putTask, + receipt: putReceipt, + }, + accept: { + task: acceptTask, + receipt: acceptReceipt, + }, + } +} diff --git a/packages/upload-api/test/helpers/context.js b/packages/upload-api/test/helpers/context.js index 0c126d122..e80fdd53b 100644 --- a/packages/upload-api/test/helpers/context.js +++ b/packages/upload-api/test/helpers/context.js @@ -2,26 +2,18 @@ import * as Signer from '@ucanto/principal/ed25519' import { getConnection, getMockService, - getStoreImplementations, - getQueueImplementations, + getStoreImplementations as getFilecoinStoreImplementations, + getQueueImplementations as getFilecoinQueueImplementations, } from '@web3-storage/filecoin-api/test/context/service' +import { BlobsStorage } from '../storage/blobs-storage.js' import { CarStoreBucket } from '../storage/car-store-bucket.js' -import { StoreTable } from '../storage/store-table.js' -import { UploadTable } from '../storage/upload-table.js' -import { DudewhereBucket } from '../storage/dude-where-bucket.js' -import { ProvisionsStorage } from '../storage/provisions-storage.js' -import { DelegationsStorage } from '../storage/delegations-storage.js' -import { RateLimitsStorage } from '../storage/rate-limits-storage.js' -import { RevocationsStorage } from '../storage/revocations-storage.js' import * as Email from '../../src/utils/email.js' import { create as createRevocationChecker } from '../../src/utils/revocation.js' import { createServer, connect } from '../../src/lib.js' import * as Types from '../../src/types.js' import * as TestTypes from '../types.js' import { confirmConfirmationUrl } from './utils.js' -import { PlansStorage } from '../storage/plans-storage.js' -import { UsageStorage } from '../storage/usage-storage.js' -import { SubscriptionsStorage } from '../storage/subscriptions-storage.js' +import { getServiceStorageImplementations } from '../storage/index.js' /** * @param {object} options @@ -35,15 +27,6 @@ export const createContext = async ( options = { requirePaymentPlan: false } ) => { const requirePaymentPlan = options.requirePaymentPlan - const storeTable = new StoreTable() - const uploadTable = new UploadTable() - const carStoreBucket = await CarStoreBucket.activate(options) - const dudewhereBucket = new DudewhereBucket() - const revocationsStorage = new RevocationsStorage() - const plansStorage = new PlansStorage() - const usageStorage = new UsageStorage(storeTable) - const provisionsStorage = new ProvisionsStorage(options.providers) - const subscriptionsStorage = new SubscriptionsStorage(provisionsStorage) const signer = await Signer.generate() const aggregatorSigner = await Signer.generate() const dealTrackerSigner = await Signer.generate() @@ -55,14 +38,16 @@ export const createContext = async ( service ).connection + const serviceStores = await getServiceStorageImplementations(options) + /** @type {Map} */ const queuedMessages = new Map() const { storefront: { filecoinSubmitQueue, pieceOfferQueue }, - } = getQueueImplementations(queuedMessages) + } = getFilecoinQueueImplementations(queuedMessages) const { storefront: { pieceStore, receiptStore, taskStore }, - } = getStoreImplementations() + } = getFilecoinStoreImplementations() const email = Email.debug() /** @type { import('../../src/types.js').UcantoServerContext } */ @@ -71,14 +56,19 @@ export const createContext = async ( aggregatorId: aggregatorSigner, signer: id, email, + requirePaymentPlan, url: new URL('http://localhost:8787'), - provisionsStorage, - subscriptionsStorage, - delegationsStorage: new DelegationsStorage(), - rateLimitsStorage: new RateLimitsStorage(), - plansStorage, - usageStorage, - revocationsStorage, + ...serviceStores, + tasksScheduler: { + schedule: () => + Promise.resolve({ + ok: {}, + }), + }, + getServiceConnection: () => connection, + ...createRevocationChecker({ + revocationsStorage: serviceStores.revocationsStorage, + }), errorReporter: { catch(error) { if (options.assert) { @@ -88,17 +78,13 @@ export const createContext = async ( } }, }, + // Filecoin maxUploadSize: 5_000_000_000, - storeTable, - uploadTable, - carStoreBucket, - dudewhereBucket, filecoinSubmitQueue, pieceOfferQueue, pieceStore, receiptStore, taskStore, - requirePaymentPlan, dealTrackerService: { connection: dealTrackerConnection, invocationConfig: { @@ -107,7 +93,6 @@ export const createContext = async ( audience: dealTrackerSigner, }, }, - ...createRevocationChecker({ revocationsStorage }), } const connection = connect({ @@ -132,7 +117,11 @@ export const createContext = async ( export const cleanupContext = async (context) => { /** @type {CarStoreBucket & { deactivate: () => Promise }}} */ // @ts-ignore type misses S3 bucket properties like accessKey - const store = context.carStoreBucket + const carStoreBucket = context.carStoreBucket + await carStoreBucket.deactivate() - await store.deactivate() + /** @type {BlobsStorage & { deactivate: () => Promise }}} */ + // @ts-ignore type misses S3 bucket properties like accessKey + const blobsStorage = context.blobsStorage + await blobsStorage.deactivate() } diff --git a/packages/upload-api/test/lib.js b/packages/upload-api/test/lib.js index 1d830e8cc..15425b86c 100644 --- a/packages/upload-api/test/lib.js +++ b/packages/upload-api/test/lib.js @@ -7,6 +7,7 @@ import * as RateLimitAdd from './handlers/rate-limit/add.js' import * as RateLimitList from './handlers/rate-limit/list.js' import * as RateLimitRemove from './handlers/rate-limit/remove.js' import * as Store from './handlers/store.js' +import * as Blob from './handlers/blob.js' import * as Subscription from './handlers/subscription.js' import * as Upload from './handlers/upload.js' import * as Plan from './handlers/plan.js' @@ -23,6 +24,7 @@ export * from './util.js' export const test = { ...Store.test, + ...Blob.test, ...Upload.test, } @@ -44,6 +46,7 @@ export const handlerTests = { ...RateLimitList, ...RateLimitRemove, ...Store.test, + ...Blob.test, ...Subscription.test, ...Upload.test, ...Plan.test, diff --git a/packages/upload-api/test/storage/allocations-storage-tests.js b/packages/upload-api/test/storage/allocations-storage-tests.js new file mode 100644 index 000000000..cb74574a9 --- /dev/null +++ b/packages/upload-api/test/storage/allocations-storage-tests.js @@ -0,0 +1,313 @@ +import * as API from '../../src/types.js' + +import { sha256 } from 'multiformats/hashes/sha2' +import * as BlobCapabilities from '@web3-storage/capabilities/blob' +import { equals } from 'uint8arrays' + +import { + RecordKeyConflictName, + RecordNotFoundErrorName, +} from '../../src/errors.js' +import { alice, bob, registerSpace } from '../util.js' + +/** + * @type {API.Tests} + */ +export const test = { + 'should store allocations': async (assert, context) => { + const { proof, spaceDid } = await registerSpace(alice, context) + const allocationsStorage = context.allocationsStorage + + const data = new Uint8Array([11, 22, 34, 44, 55]) + const multihash = await sha256.digest(data) + const digest = multihash.bytes + const size = data.byteLength + + // invoke `blob/add` + const blobAdd = BlobCapabilities.add.invoke({ + issuer: alice, + audience: context.id, + with: spaceDid, + nb: { + blob: { + digest, + size, + }, + }, + proofs: [proof], + }) + const invocation = (await blobAdd.delegate()).link() + + const allocationInsert = await allocationsStorage.insert({ + space: spaceDid, + blob: { + digest, + size, + }, + invocation, + }) + + assert.ok(allocationInsert.ok) + assert.ok(allocationInsert.ok?.blob) + }, + 'should store same allocation once': async (assert, context) => { + const { proof, spaceDid } = await registerSpace(alice, context) + const allocationsStorage = context.allocationsStorage + + const data = new Uint8Array([11, 22, 34, 44, 55]) + const multihash = await sha256.digest(data) + const digest = multihash.bytes + const size = data.byteLength + + // invoke `blob/add` + const blobAdd = BlobCapabilities.add.invoke({ + issuer: alice, + audience: context.id, + with: spaceDid, + nb: { + blob: { + digest, + size, + }, + }, + proofs: [proof], + }) + const invocation = (await blobAdd.delegate()).link() + + const allocationInsert0 = await allocationsStorage.insert({ + space: spaceDid, + blob: { + digest, + size, + }, + invocation, + }) + assert.ok(allocationInsert0.ok) + + const allocationInsert1 = await allocationsStorage.insert({ + space: spaceDid, + blob: { + digest, + size, + }, + invocation, + }) + assert.ok(allocationInsert1.error) + assert.equal(allocationInsert1.error?.name, RecordKeyConflictName) + }, + 'should get allocations only when available': async (assert, context) => { + const { proof, spaceDid } = await registerSpace(alice, context) + const allocationsStorage = context.allocationsStorage + + const data = new Uint8Array([11, 22, 34, 44, 55]) + const multihash = await sha256.digest(data) + const digest = multihash.bytes + const size = data.byteLength + + const allocationGet0 = await allocationsStorage.get(spaceDid, digest) + assert.ok(allocationGet0.error) + assert.equal(allocationGet0.error?.name, RecordNotFoundErrorName) + + // invoke `blob/add` + const blobAdd = BlobCapabilities.add.invoke({ + issuer: alice, + audience: context.id, + with: spaceDid, + nb: { + blob: { + digest, + size, + }, + }, + proofs: [proof], + }) + const invocation = (await blobAdd.delegate()).link() + + const allocationInsert = await allocationsStorage.insert({ + space: spaceDid, + blob: { + digest, + size, + }, + invocation, + }) + + assert.ok(allocationInsert.ok) + assert.ok(allocationInsert.ok?.blob) + + const allocationGet1 = await allocationsStorage.get(spaceDid, digest) + assert.ok(allocationGet1.ok) + assert.ok(allocationGet1.ok?.blob) + assert.equal(allocationGet1.ok?.blob.size, size) + assert.ok( + equals(digest, allocationGet1.ok?.blob.digest || new Uint8Array()) + ) + assert.ok(allocationGet1.ok?.invocation) + }, + 'should verify allocations exist': async (assert, context) => { + const { proof, spaceDid } = await registerSpace(alice, context) + const allocationsStorage = context.allocationsStorage + + const data = new Uint8Array([11, 22, 34, 44, 55]) + const multihash = await sha256.digest(data) + const digest = multihash.bytes + const size = data.byteLength + + const allocationExist0 = await allocationsStorage.exists(spaceDid, digest) + assert.ok(!allocationExist0.error) + assert.ok(!allocationExist0.ok) + + // invoke `blob/add` + const blobAdd = BlobCapabilities.add.invoke({ + issuer: alice, + audience: context.id, + with: spaceDid, + nb: { + blob: { + digest, + size, + }, + }, + proofs: [proof], + }) + const invocation = (await blobAdd.delegate()).link() + + const allocationInsert = await allocationsStorage.insert({ + space: spaceDid, + blob: { + digest, + size, + }, + invocation, + }) + + assert.ok(allocationInsert.ok) + assert.ok(allocationInsert.ok?.blob) + + const allocationExist1 = await allocationsStorage.exists(spaceDid, digest) + assert.ok(allocationExist1.ok) + assert.ok(!allocationExist1.error) + }, + 'should list all allocations in a space': async (assert, context) => { + const { proof: aliceProof, spaceDid: aliceSpaceDid } = await registerSpace( + alice, + context + ) + const { proof: bobProof, spaceDid: bobSpaceDid } = await registerSpace( + bob, + context + ) + const allocationsStorage = context.allocationsStorage + + // Data for alice + const data0 = new Uint8Array([11, 22, 34, 44, 55]) + const multihash0 = await sha256.digest(data0) + const digest0 = multihash0.bytes + const size0 = data0.byteLength + const blob0 = { + digest: digest0, + size: size0, + } + // Data for bob + const data1 = new Uint8Array([66, 77, 88, 99, 0]) + const multihash1 = await sha256.digest(data1) + const digest1 = multihash1.bytes + const size1 = data1.byteLength + const blob1 = { + digest: digest1, + size: size1, + } + + // Get alice empty allocations + const allocationsAllice0 = await allocationsStorage.list(aliceSpaceDid) + assert.ok(allocationsAllice0.ok) + assert.deepEqual(allocationsAllice0.ok?.results, []) + assert.equal(allocationsAllice0.ok?.size, 0) + + // invoke `blob/add` with alice + const aliceBlobAdd0 = BlobCapabilities.add.invoke({ + issuer: alice, + audience: context.id, + with: aliceSpaceDid, + nb: { + blob: blob0, + }, + proofs: [aliceProof], + }) + const aliceInvocation = (await aliceBlobAdd0.delegate()).link() + + // Add alice allocations + const aliceAllocationInsert0 = await allocationsStorage.insert({ + space: aliceSpaceDid, + blob: blob0, + invocation: aliceInvocation, + }) + assert.ok(aliceAllocationInsert0.ok) + + // invoke `blob/add` with bob + const bobBlobAdd = BlobCapabilities.add.invoke({ + issuer: bob, + audience: context.id, + with: bobSpaceDid, + nb: { + blob: blob1, + }, + proofs: [bobProof], + }) + const invocation = (await bobBlobAdd.delegate()).link() + + // Add bob allocations + const bobAllocationInsert = await allocationsStorage.insert({ + space: bobSpaceDid, + blob: blob1, + invocation, + }) + assert.ok(bobAllocationInsert.ok) + + const allocationsAllice1 = await allocationsStorage.list(aliceSpaceDid) + assert.ok(allocationsAllice1.ok) + assert.equal(allocationsAllice1.ok?.size, 1) + assert.equal(allocationsAllice1.ok?.results.length, 1) + assert.ok( + equals( + blob0.digest, + allocationsAllice1.ok?.results[0].blob.digest || new Uint8Array() + ) + ) + + // Add bob's data on alice alloctions + const aliceBlobAdd01 = BlobCapabilities.add.invoke({ + issuer: alice, + audience: context.id, + with: aliceSpaceDid, + nb: { + blob: blob1, + }, + proofs: [aliceProof], + }) + const aliceInvocation1 = (await aliceBlobAdd01.delegate()).link() + + // Add alice allocations + const aliceAllocationInsert1 = await allocationsStorage.insert({ + space: aliceSpaceDid, + blob: blob1, + invocation: aliceInvocation1, + }) + assert.ok(aliceAllocationInsert1.ok) + + const allocationsAllice2 = await allocationsStorage.list(aliceSpaceDid) + assert.ok(allocationsAllice2.ok) + assert.equal(allocationsAllice2.ok?.size, 2) + assert.equal(allocationsAllice2.ok?.results.length, 2) + assert.ok( + allocationsAllice2.ok?.results.find((res) => + equals(res.blob.digest, blob0.digest) + ) + ) + assert.ok( + allocationsAllice2.ok?.results.find((res) => + equals(res.blob.digest, blob1.digest) + ) + ) + }, +} diff --git a/packages/upload-api/test/storage/allocations-storage.js b/packages/upload-api/test/storage/allocations-storage.js new file mode 100644 index 000000000..ba3b06181 --- /dev/null +++ b/packages/upload-api/test/storage/allocations-storage.js @@ -0,0 +1,105 @@ +import * as Types from '../../src/types.js' +import { equals } from 'uint8arrays/equals' +import { RecordKeyConflict, RecordNotFound } from '../../src/errors.js' + +/** + * @implements {Types.AllocationsStorage} + */ +export class AllocationsStorage { + constructor() { + /** @type {(Types.BlobAddInput & Types.BlobListItem)[]} */ + this.items = [] + } + + /** + * @param {Types.BlobAddInput} input + * @returns {ReturnType} + */ + async insert({ space, invocation, ...output }) { + if ( + this.items.some( + (i) => i.space === space && equals(i.blob.digest, output.blob.digest) + ) + ) { + return { + error: new RecordKeyConflict(), + } + } + this.items.unshift({ + space, + invocation, + ...output, + insertedAt: new Date().toISOString(), + }) + return { ok: output } + } + + /** + * @param {Types.DID} space + * @param {Uint8Array} blobMultihash + * @returns {ReturnType} + */ + async get(space, blobMultihash) { + const item = this.items.find( + (i) => i.space === space && equals(i.blob.digest, blobMultihash) + ) + if (!item) { + return { error: new RecordNotFound() } + } + return { ok: item } + } + + /** + * @param {Types.DID} space + * @param {Uint8Array} blobMultihash + * @returns {ReturnType} + */ + async exists(space, blobMultihash) { + const item = this.items.find( + (i) => i.space === space && equals(i.blob.digest, blobMultihash) + ) + return { ok: !!item } + } + + /** + * @param {Types.DID} space + * @param {Types.ListOptions} options + * @returns {ReturnType} + */ + async list( + space, + { cursor = '0', pre = false, size = this.items.length } = {} + ) { + const offset = parseInt(cursor, 10) + const items = pre ? this.items.slice(0, offset) : this.items.slice(offset) + + const matches = [...items.entries()] + .filter(([n, item]) => item.space === space) + .slice(0, size) + + if (matches.length === 0) { + return { ok: { size: 0, results: [] } } + } + + const first = matches[0] + const last = matches[matches.length - 1] + + const start = first[0] || 0 + const end = last[0] || 0 + const values = matches.map(([_, item]) => item) + + const [before, after, results] = pre + ? [`${start}`, `${end + 1}`, values] + : [`${start + offset}`, `${end + 1 + offset}`, values] + + return { + ok: { + size: values.length, + before, + after, + cursor: after, + results, + }, + } + } +} diff --git a/packages/upload-api/test/storage/allocations-storage.spec.js b/packages/upload-api/test/storage/allocations-storage.spec.js new file mode 100644 index 000000000..817c610f1 --- /dev/null +++ b/packages/upload-api/test/storage/allocations-storage.spec.js @@ -0,0 +1,3 @@ +import * as AllocationsStorage from './allocations-storage-tests.js' +import { test } from '../test.js' +test({ 'in memory allocations storage': AllocationsStorage.test }) diff --git a/packages/upload-api/test/storage/blobs-storage-tests.js b/packages/upload-api/test/storage/blobs-storage-tests.js new file mode 100644 index 000000000..5936b6d47 --- /dev/null +++ b/packages/upload-api/test/storage/blobs-storage-tests.js @@ -0,0 +1,46 @@ +import * as API from '../../src/types.js' + +import { sha256 } from 'multiformats/hashes/sha2' + +/** + * @type {API.Tests} + */ +export const test = { + 'should create valid presigned URL for blobs that can be used to write': + async (assert, context) => { + const blobsStorage = context.blobsStorage + const data = new Uint8Array([11, 22, 34, 44, 55]) + const multihash0 = await sha256.digest(data) + const digest = multihash0.bytes + const size = data.byteLength + const expiresIn = 60 * 60 * 24 // 1 day + const blob = { + digest: digest, + size: size, + } + const createUploadUrl = await blobsStorage.createUploadUrl( + blob.digest, + blob.size, + expiresIn + ) + if (!createUploadUrl.ok) { + throw new Error('should create presigned url') + } + + assert.ok(createUploadUrl.ok.headers['content-length']) + assert.ok(createUploadUrl.ok.headers['x-amz-checksum-sha256']) + + // Store the blob to the address + const goodPut = await fetch(createUploadUrl.ok.url, { + method: 'PUT', + mode: 'cors', + body: data, + headers: createUploadUrl.ok.headers, + }) + assert.equal(goodPut.status, 200, await goodPut.text()) + + // check it exists + const hasBlob = await blobsStorage.has(blob.digest) + assert.ok(hasBlob.ok) + }, +} diff --git a/packages/upload-api/test/storage/blobs-storage.js b/packages/upload-api/test/storage/blobs-storage.js new file mode 100644 index 000000000..b5fa1e61f --- /dev/null +++ b/packages/upload-api/test/storage/blobs-storage.js @@ -0,0 +1,167 @@ +import * as Types from '../../src/types.js' + +import { base64pad } from 'multiformats/bases/base64' +import { decode as digestDecode } from 'multiformats/hashes/digest' +import { SigV4 } from '@web3-storage/sigv4' +import { base58btc } from 'multiformats/bases/base58' +import { sha256 } from 'multiformats/hashes/sha2' + +/** + * @implements {Types.BlobsStorage} + */ +export class BlobsStorage { + /** + * @param {Types.CarStoreBucketOptions & {http?: import('http')}} options + */ + static async activate({ http, ...options } = {}) { + const content = new Map() + if (http) { + const server = http.createServer(async (request, response) => { + if (request.method === 'PUT') { + const buffer = new Uint8Array( + parseInt(request.headers['content-length'] || '0') + ) + let offset = 0 + for await (const chunk of request) { + buffer.set(chunk, offset) + offset += chunk.length + } + const hash = await sha256.digest(buffer) + const checksum = base64pad.baseEncode(hash.digest) + + if (checksum !== request.headers['x-amz-checksum-sha256']) { + response.writeHead(400, `checksum mismatch`) + } else { + const { pathname } = new URL(request.url || '/', url) + content.set(pathname, buffer) + response.writeHead(200) + } + } else { + response.writeHead(405) + } + + response.end() + // otherwise it keep connection lingering + response.destroy() + }) + await new Promise((resolve) => server.listen(resolve)) + + // @ts-ignore - this is actually what it returns on http + const port = server.address().port + const url = new URL(`http://localhost:${port}`) + + return new BlobsStorage({ + ...options, + content, + url, + server, + }) + } else { + return new BlobsStorage({ + ...options, + content, + url: new URL(`http://localhost:8989`), + }) + } + } + + /** + * @returns {Promise} + */ + async deactivate() { + const { server } = this + if (server) { + await new Promise((resolve, reject) => { + // does not exist in node 16 + if (typeof server.closeAllConnections === 'function') { + server.closeAllConnections() + } + + server.close((error) => { + if (error) { + reject(error) + } else { + resolve(undefined) + } + }) + }) + } + } + + /** + * @param {Types.CarStoreBucketOptions & { server?: import('http').Server, url: URL, content: Map }} options + */ + constructor({ + content, + url, + server, + accessKeyId = 'id', + secretAccessKey = 'secret', + bucket = 'my-bucket', + region = 'eu-central-1', + }) { + this.server = server + this.baseURL = url + this.accessKeyId = accessKeyId + this.secretAccessKey = secretAccessKey + this.bucket = bucket + this.region = region + this.content = content + } + + /** + * @param {Uint8Array} multihash + */ + async has(multihash) { + const encodedMultihash = base58btc.encode(multihash) + return { + ok: this.content.has( + `/${this.bucket}/${encodedMultihash}/${encodedMultihash}.blob` + ), + } + } + + /** + * @param {Uint8Array} multihash + * @param {number} size + * @param {number} expiresIn + */ + async createUploadUrl(multihash, size, expiresIn) { + const { bucket, accessKeyId, secretAccessKey, region, baseURL } = this + const encodedMultihash = base58btc.encode(multihash) + const multihashDigest = digestDecode(multihash) + // sigv4 + const sig = new SigV4({ + accessKeyId, + secretAccessKey, + region, + }) + + const checksum = base64pad.baseEncode(multihashDigest.digest) + const { pathname, search, hash } = sig.sign({ + key: `${encodedMultihash}/${encodedMultihash}.blob`, + checksum, + bucket, + expires: expiresIn, + }) + + const url = new URL(baseURL) + url.search = search + url.pathname = `/${bucket}${pathname}` + url.hash = hash + url.searchParams.set( + 'X-Amz-SignedHeaders', + ['content-length', 'host', 'x-amz-checksum-sha256'].join(';') + ) + + return { + ok: { + url, + headers: { + 'x-amz-checksum-sha256': checksum, + 'content-length': String(size), + }, + }, + } + } +} diff --git a/packages/upload-api/test/storage/blobs-storage.spec.js b/packages/upload-api/test/storage/blobs-storage.spec.js new file mode 100644 index 000000000..33682e9d4 --- /dev/null +++ b/packages/upload-api/test/storage/blobs-storage.spec.js @@ -0,0 +1,3 @@ +import * as BlobsStorage from './blobs-storage-tests.js' +import { test } from '../test.js' +test({ 'in memory blobs storage': BlobsStorage.test }) diff --git a/packages/upload-api/test/storage/index.js b/packages/upload-api/test/storage/index.js new file mode 100644 index 000000000..89d3fc51c --- /dev/null +++ b/packages/upload-api/test/storage/index.js @@ -0,0 +1,58 @@ +import { AllocationsStorage } from './allocations-storage.js' +import { BlobsStorage } from './blobs-storage.js' +import { CarStoreBucket } from './car-store-bucket.js' +import { StoreTable } from './store-table.js' +import { UploadTable } from './upload-table.js' +import { DudewhereBucket } from './dude-where-bucket.js' +import { ProvisionsStorage } from './provisions-storage.js' +import { DelegationsStorage } from './delegations-storage.js' +import { RateLimitsStorage } from './rate-limits-storage.js' +import { RevocationsStorage } from './revocations-storage.js' +import { PlansStorage } from './plans-storage.js' +import { UsageStorage } from './usage-storage.js' +import { SubscriptionsStorage } from './subscriptions-storage.js' +import { TasksStorage } from './tasks-storage.js' +import { ReceiptsStorage } from './receipts-storage.js' + +/** + * @param {object} options + * @param {string[]} [options.providers] + * @param {boolean} [options.requirePaymentPlan] + * @param {import('http')} [options.http] + * @param {{fail(error:unknown): unknown}} [options.assert] + */ +export async function getServiceStorageImplementations(options) { + const storeTable = new StoreTable() + const allocationsStorage = new AllocationsStorage() + const uploadTable = new UploadTable() + const blobsStorage = await BlobsStorage.activate(options) + const carStoreBucket = await CarStoreBucket.activate(options) + const dudewhereBucket = new DudewhereBucket() + const revocationsStorage = new RevocationsStorage() + const plansStorage = new PlansStorage() + const usageStorage = new UsageStorage(storeTable) + const provisionsStorage = new ProvisionsStorage(options.providers) + const subscriptionsStorage = new SubscriptionsStorage(provisionsStorage) + const delegationsStorage = new DelegationsStorage() + const rateLimitsStorage = new RateLimitsStorage() + const tasksStorage = new TasksStorage() + const receiptsStorage = new ReceiptsStorage() + + return { + storeTable, + allocationsStorage, + uploadTable, + blobsStorage, + carStoreBucket, + dudewhereBucket, + revocationsStorage, + plansStorage, + usageStorage, + provisionsStorage, + subscriptionsStorage, + delegationsStorage, + rateLimitsStorage, + tasksStorage, + receiptsStorage, + } +} diff --git a/packages/upload-api/test/storage/receipts-storage-tests.js b/packages/upload-api/test/storage/receipts-storage-tests.js new file mode 100644 index 000000000..4c19a6e54 --- /dev/null +++ b/packages/upload-api/test/storage/receipts-storage-tests.js @@ -0,0 +1,108 @@ +import * as API from '../../src/types.js' + +import { sha256 } from 'multiformats/hashes/sha2' +import * as BlobCapabilities from '@web3-storage/capabilities/blob' + +import { RecordNotFoundErrorName } from '../../src/errors.js' +import { alice, registerSpace } from '../util.js' + +/** + * @type {API.Tests} + */ +export const test = { + 'should be able to store receipts, even the same': async ( + assert, + context + ) => { + const { proof, spaceDid } = await registerSpace(alice, context) + const receiptsStorage = context.receiptsStorage + const connection = context.connection + + const data = new Uint8Array([11, 22, 34, 44, 55]) + const multihash = await sha256.digest(data) + const digest = multihash.bytes + const size = data.byteLength + + // invoke `blob/add` + const blobAdd = BlobCapabilities.add.invoke({ + issuer: alice, + audience: context.id, + with: spaceDid, + nb: { + blob: { + digest, + size, + }, + }, + proofs: [proof], + }) + // Invoke `blob/add` + const receipt = await blobAdd.execute(connection) + if (!receipt.out.ok) { + throw new Error('invocation failed', { cause: receipt }) + } + + const putTask0 = await receiptsStorage.put(receipt) + assert.ok(putTask0.ok) + + // same put + const putTask1 = await receiptsStorage.put(receipt) + assert.ok(putTask1.ok) + }, + 'should be able to get stored receipts, or check if they exist': async ( + assert, + context + ) => { + const { proof, spaceDid } = await registerSpace(alice, context) + const receiptsStorage = context.receiptsStorage + const connection = context.connection + + const data = new Uint8Array([11, 22, 34, 44, 55]) + const multihash = await sha256.digest(data) + const digest = multihash.bytes + const size = data.byteLength + + // invoke `blob/add` + const blobAdd = BlobCapabilities.add.invoke({ + issuer: alice, + audience: context.id, + with: spaceDid, + nb: { + blob: { + digest, + size, + }, + }, + proofs: [proof], + }) + // Invoke `blob/add` + const receipt = await blobAdd.execute(connection) + if (!receipt.out.ok) { + throw new Error('invocation failed', { cause: receipt }) + } + + // Get before put + const getTask0 = await receiptsStorage.get(receipt.ran.link()) + assert.ok(getTask0.error) + assert.equal(getTask0.error?.name, RecordNotFoundErrorName) + + // Has before put + const hasTask0 = await receiptsStorage.has(receipt.ran.link()) + assert.ok(!hasTask0.error) + assert.ok(!hasTask0.ok) + + // Put task + const putTask = await receiptsStorage.put(receipt) + assert.ok(putTask.ok) + + // Get after put + const getTask1 = await receiptsStorage.get(receipt.ran.link()) + assert.ok(getTask1.ok) + assert.ok(getTask1.ok?.ran.link().equals(receipt.ran.link())) + + // Has after put + const hasTask1 = await receiptsStorage.has(receipt.ran.link()) + assert.ok(!hasTask1.error) + assert.ok(hasTask1.ok) + }, +} diff --git a/packages/upload-api/test/storage/receipts-storage.js b/packages/upload-api/test/storage/receipts-storage.js new file mode 100644 index 000000000..4faf5b365 --- /dev/null +++ b/packages/upload-api/test/storage/receipts-storage.js @@ -0,0 +1,64 @@ +import * as API from '../../src/types.js' + +import { RecordNotFound } from '../../src/errors.js' + +/** + * @typedef {import('@web3-storage/capabilities/types').StorageGetError} StorageGetError + * @typedef {import('@web3-storage/capabilities/types').StoragePutError} StoragePutError + * @typedef {import('@ucanto/interface').UnknownLink} UnknownLink + * @typedef {import('@ucanto/interface').Receipt} Receipt + */ + +/** + * @implements {API.ReceiptsStorage} + */ +export class ReceiptsStorage { + constructor() { + /** @type {Map} */ + this.items = new Map() + } + + /** + * @param {Receipt} record + * @returns {Promise>} + */ + async put(record) { + this.items.set(record.ran.link().toString(), record) + + return Promise.resolve({ + ok: {}, + }) + } + + /** + * @param {UnknownLink} link + * @returns {Promise>} + */ + async get(link) { + const record = this.items.get(link.toString()) + if (!record) { + return { + error: new RecordNotFound('not found'), + } + } + return { + ok: record, + } + } + + /** + * @param {UnknownLink} link + * @returns {Promise>} + */ + async has(link) { + const record = this.items.get(link.toString()) + if (!record) { + return { + ok: false, + } + } + return { + ok: Boolean(record), + } + } +} diff --git a/packages/upload-api/test/storage/receipts-storage.spec.js b/packages/upload-api/test/storage/receipts-storage.spec.js new file mode 100644 index 000000000..b43c99d41 --- /dev/null +++ b/packages/upload-api/test/storage/receipts-storage.spec.js @@ -0,0 +1,3 @@ +import * as ReceiptsStorage from './receipts-storage-tests.js' +import { test } from '../test.js' +test({ 'in memory receipts storage': ReceiptsStorage.test }) diff --git a/packages/upload-api/test/storage/tasks-storage-tests.js b/packages/upload-api/test/storage/tasks-storage-tests.js new file mode 100644 index 000000000..5c51b817e --- /dev/null +++ b/packages/upload-api/test/storage/tasks-storage-tests.js @@ -0,0 +1,95 @@ +import * as API from '../../src/types.js' + +import { sha256 } from 'multiformats/hashes/sha2' +import * as BlobCapabilities from '@web3-storage/capabilities/blob' + +import { RecordNotFoundErrorName } from '../../src/errors.js' +import { alice, registerSpace } from '../util.js' + +/** + * @type {API.Tests} + */ +export const test = { + 'should be able to store tasks, even the same': async (assert, context) => { + const { proof, spaceDid } = await registerSpace(alice, context) + const tasksStorage = context.tasksStorage + + const data = new Uint8Array([11, 22, 34, 44, 55]) + const multihash = await sha256.digest(data) + const digest = multihash.bytes + const size = data.byteLength + + // invoke `blob/add` + const blobAdd = BlobCapabilities.add.invoke({ + issuer: alice, + audience: context.id, + with: spaceDid, + nb: { + blob: { + digest, + size, + }, + }, + proofs: [proof], + }) + const task = await blobAdd.delegate() + + const putTask0 = await tasksStorage.put(task) + assert.ok(putTask0.ok) + + // same put + const putTask1 = await tasksStorage.put(task) + assert.ok(putTask1.ok) + }, + 'should be able to get stored tasks, or check if they exist': async ( + assert, + context + ) => { + const { proof, spaceDid } = await registerSpace(alice, context) + const tasksStorage = context.tasksStorage + + const data = new Uint8Array([11, 22, 34, 44, 55]) + const multihash = await sha256.digest(data) + const digest = multihash.bytes + const size = data.byteLength + + // invoke `blob/add` + const blobAdd = BlobCapabilities.add.invoke({ + issuer: alice, + audience: context.id, + with: spaceDid, + nb: { + blob: { + digest, + size, + }, + }, + proofs: [proof], + }) + const task = await blobAdd.delegate() + + // Get before put + const getTask0 = await tasksStorage.get(task.link()) + assert.ok(getTask0.error) + assert.equal(getTask0.error?.name, RecordNotFoundErrorName) + + // Has before put + const hasTask0 = await tasksStorage.has(task.link()) + assert.ok(!hasTask0.error) + assert.ok(!hasTask0.ok) + + // Put task + const putTask = await tasksStorage.put(task) + assert.ok(putTask.ok) + + // Get after put + const getTask1 = await tasksStorage.get(task.link()) + assert.ok(getTask1.ok) + assert.ok(getTask1.ok?.link().equals(task.link())) + + // Has after put + const hasTask1 = await tasksStorage.has(task.link()) + assert.ok(!hasTask1.error) + assert.ok(hasTask1.ok) + }, +} diff --git a/packages/upload-api/test/storage/tasks-storage.js b/packages/upload-api/test/storage/tasks-storage.js new file mode 100644 index 000000000..7dfb00014 --- /dev/null +++ b/packages/upload-api/test/storage/tasks-storage.js @@ -0,0 +1,72 @@ +import * as API from '../../src/types.js' + +import { RecordNotFound } from '../../src/errors.js' + +/** + * @typedef {import('@web3-storage/capabilities/types').StorageGetError} StorageGetError + * @typedef {import('@web3-storage/capabilities/types').StoragePutError} StoragePutError + * @typedef {import('@ucanto/interface').UnknownLink} UnknownLink + * @typedef {import('@ucanto/interface').Invocation} Invocation + */ + +/** + * @implements {API.TasksStorage} + */ +export class TasksStorage { + constructor() { + /** @type {Map} */ + this.items = new Map() + } + + /** + * @param {Invocation} record + * @returns {Promise>} + */ + async put(record) { + this.items.set(record.cid.toString(), record) + + // TODO: store implementation + // const archiveDelegationRes = await task.archive() + // if (archiveDelegationRes.error) { + // return { + // error: archiveDelegationRes.error + // } + // } + + return Promise.resolve({ + ok: {}, + }) + } + + /** + * @param {UnknownLink} link + * @returns {Promise>} + */ + async get(link) { + const record = this.items.get(link.toString()) + if (!record) { + return { + error: new RecordNotFound(), + } + } + return { + ok: record, + } + } + + /** + * @param {UnknownLink} link + * @returns {Promise>} + */ + async has(link) { + const record = this.items.get(link.toString()) + if (!record) { + return { + ok: false, + } + } + return { + ok: Boolean(record), + } + } +} diff --git a/packages/upload-api/test/storage/tasks-storage.spec.js b/packages/upload-api/test/storage/tasks-storage.spec.js new file mode 100644 index 000000000..afc9a397b --- /dev/null +++ b/packages/upload-api/test/storage/tasks-storage.spec.js @@ -0,0 +1,3 @@ +import * as TasksStorage from './tasks-storage-tests.js' +import { test } from '../test.js' +test({ 'in memory tasks storage': TasksStorage.test }) diff --git a/pnpm-lock.yaml b/pnpm-lock.yaml index 4285e1c07..d1ac87d4b 100644 --- a/pnpm-lock.yaml +++ b/pnpm-lock.yaml @@ -169,6 +169,9 @@ importers: '@web3-storage/data-segment': specifier: ^3.2.0 version: 3.2.0 + uint8arrays: + specifier: ^5.0.3 + version: 5.0.3 devDependencies: '@types/assert': specifier: ^1.5.6 @@ -400,6 +403,9 @@ importers: '@web3-storage/capabilities': specifier: workspace:^ version: link:../capabilities + '@web3-storage/content-claims': + specifier: ^4.0.4 + version: 4.0.4 '@web3-storage/did-mailto': specifier: workspace:^ version: link:../did-mailto @@ -412,6 +418,9 @@ importers: p-retry: specifier: ^5.1.2 version: 5.1.2 + uint8arrays: + specifier: ^5.0.3 + version: 5.0.3 devDependencies: '@ipld/car': specifier: ^5.1.1 @@ -443,6 +452,9 @@ importers: one-webcrypto: specifier: git://github.com/web3-storage/one-webcrypto version: github.com/web3-storage/one-webcrypto/5148cd14d5489a8ac4cd38223870e02db15a2382 + p-defer: + specifier: ^4.0.1 + version: 4.0.1 typescript: specifier: 5.2.2 version: 5.2.2 @@ -8378,7 +8390,7 @@ packages: /it-parallel@3.0.6: resolution: {integrity: sha512-i7UM7I9LTkDJw3YIqXHFAPZX6CWYzGc+X3irdNrVExI4vPazrJdI7t5OqrSVN8CONXLAunCiqaSV/zZRbQR56A==} dependencies: - p-defer: 4.0.0 + p-defer: 4.0.1 dev: true /it-pipe@2.0.5: @@ -8393,7 +8405,7 @@ packages: /it-pushable@3.2.3: resolution: {integrity: sha512-gzYnXYK8Y5t5b/BnJUr7glfQLO4U5vyb05gPx/TyTw+4Bv1zM9gFk4YsOrnulWefMewlphCjKkakFvj1y99Tcg==} dependencies: - p-defer: 4.0.0 + p-defer: 4.0.1 dev: true /it-stream-types@1.0.5: @@ -9961,6 +9973,12 @@ packages: /p-defer@4.0.0: resolution: {integrity: sha512-Vb3QRvQ0Y5XnF40ZUWW7JfLogicVh/EnA5gBIvKDJoYpeI82+1E3AlB9yOcKFS0AhHrWVnAQO39fbR0G99IVEQ==} engines: {node: '>=12'} + dev: false + + /p-defer@4.0.1: + resolution: {integrity: sha512-Mr5KC5efvAK5VUptYEIopP1bakB85k2IWXaRC0rsh1uwn1L6M0LVml8OIQ4Gudg4oyZakf7FmeRLkMMtZW1i5A==} + engines: {node: '>=12'} + dev: true /p-event@6.0.0: resolution: {integrity: sha512-Xbfxd0CfZmHLGKXH32k1JKjQYX6Rkv0UtQdaFJ8OyNcf+c0oWCeXHc1C4CX/IESZLmcvfPa5aFIO/vCr5gqtag==}