diff --git a/packages/blob-index/src/index.js b/packages/blob-index/src/index.js index 92cfe301f..164082518 100644 --- a/packages/blob-index/src/index.js +++ b/packages/blob-index/src/index.js @@ -1,2 +1,3 @@ export * as ShardedDAGIndex from './sharded-dag-index.js' export * from './digest-map.js' +export { indexShardedDAG } from './util.js' diff --git a/packages/blob-index/src/util.js b/packages/blob-index/src/util.js index bb918da35..b0a2bd45c 100644 --- a/packages/blob-index/src/util.js +++ b/packages/blob-index/src/util.js @@ -32,3 +32,19 @@ export const fromShardArchives = async (content, shards) => { } return index } + +/** + * Indexes a sharded DAG + * + * @param {import('multiformats').Link} root + * @param {import('@web3-storage/capabilities/types').CARLink[]} shards + * @param {Array>} shardIndexes + */ +export async function indexShardedDAG(root, shards, shardIndexes) { + const index = create(root) + for (const [i, shard] of shards.entries()) { + const slices = shardIndexes[i] + index.shards.set(shard.multihash, slices) + } + return await index.archive() +} diff --git a/packages/upload-client/package.json b/packages/upload-client/package.json index 2467ad36d..21688e68c 100644 --- a/packages/upload-client/package.json +++ b/packages/upload-client/package.json @@ -23,6 +23,7 @@ "test:node": "hundreds -r html -r text mocha 'test/**/!(*.browser).test.js' -n experimental-vm-modules -n no-warnings", "test:browser": "playwright-test 'test/**/!(*.node).test.js'", "mock": "run-p mock:*", + "mock:receipts-server": "PORT=9201 node test/helpers/receipts-server.js", "mock:bucket-200": "PORT=9200 STATUS=200 node test/helpers/bucket-server.js", "mock:bucket-401": "PORT=9400 STATUS=400 node test/helpers/bucket-server.js", "mock:bucket-500": "PORT=9500 STATUS=500 node test/helpers/bucket-server.js", @@ -94,6 +95,7 @@ "@types/varint": "^6.0.1", "@ucanto/principal": "^9.0.1", "@ucanto/server": "^10.0.0", + "@web3-storage/content-claims": "^5.0.0", "@web3-storage/eslint-config-w3up": "workspace:^", "assert": "^2.0.0", "blockstore-core": "^3.0.0", diff --git a/packages/upload-client/src/blob.js b/packages/upload-client/src/blob.js index 654a6606e..dd29d7b6c 100644 --- a/packages/upload-client/src/blob.js +++ b/packages/upload-client/src/blob.js @@ -2,7 +2,7 @@ import { sha256 } from 'multiformats/hashes/sha2' import { ed25519 } from '@ucanto/principal' import { conclude } from '@web3-storage/capabilities/ucan' import * as UCAN from '@web3-storage/capabilities/ucan' -import { Receipt } from '@ucanto/core' +import { Delegation, Receipt } from '@ucanto/core' import * as W3sBlobCapabilities from '@web3-storage/capabilities/web3.storage/blob' import * as BlobCapabilities from '@web3-storage/capabilities/blob' import * as HTTPCapabilities from '@web3-storage/capabilities/http' @@ -10,6 +10,7 @@ import { SpaceDID } from '@web3-storage/capabilities/utils' import retry, { AbortError } from 'p-retry' import { servicePrincipal, connection } from './service.js' import { REQUEST_RETRIES } from './constants.js' +import { poll } from './receipts.js' /** * @param {string} url @@ -166,7 +167,7 @@ export function createConcludeInvocation(id, serviceDid, receipt) { * The issuer needs the `blob/add` delegated capability. * @param {Blob|Uint8Array} data Blob data. * @param {import('./types.js').RequestOptions} [options] - * @returns {Promise} + * @returns {Promise} */ export async function add( { issuer, with: resource, proofs, audience }, @@ -303,7 +304,26 @@ export async function add( }) } - return multihash + // Ensure the blob has been accepted + const acceptReceipt = await poll(nextTasks.accept.task.link(), options) + + const blocks = new Map( + [...acceptReceipt.iterateIPLDBlocks()].map((block) => [ + `${block.cid}`, + block, + ]) + ) + const site = Delegation.view({ + root: /** @type {import('@ucanto/interface').UCANLink} */ ( + acceptReceipt.out.ok.site + ), + blocks, + }) + + return { + multihash, + site, + } } /** diff --git a/packages/upload-client/src/index.js b/packages/upload-client/src/index.js index 0f573e57c..77aa3f666 100644 --- a/packages/upload-client/src/index.js +++ b/packages/upload-client/src/index.js @@ -1,6 +1,5 @@ import * as PieceHasher from '@web3-storage/data-segment/multihash' import { Storefront } from '@web3-storage/filecoin-client' -import { ShardedDAGIndex } from '@web3-storage/blob-index' import * as Link from 'multiformats/link' import * as raw from 'multiformats/codecs/raw' import * as Store from './store.js' @@ -10,10 +9,12 @@ import * as Upload from './upload.js' import * as UnixFS from './unixfs.js' import * as CAR from './car.js' import { ShardingStream, defaultFileComparator } from './sharding.js' -import { codec as carCodec } from '@ucanto/transport/car' +import { indexShardedDAG } from '@web3-storage/blob-index' export { Blob, Index, Store, Upload, UnixFS, CAR } export * from './sharding.js' +export { receiptsEndpoint } from './service.js' +export * as Receipt from './receipts.js' /** * Uploads a file to the service and returns the root data CID for the @@ -144,9 +145,9 @@ async function uploadBlockStream( async transform(car, controller) { const bytes = new Uint8Array(await car.arrayBuffer()) // Invoke blob/add and write bytes to write target - const multihash = await Blob.add(conf, bytes, options) + const { multihash } = await Blob.add(conf, bytes, options) // Should this be raw instead? - const cid = Link.create(carCodec.code, multihash) + const cid = Link.create(CAR.code, multihash) let piece if (pieceHasher) { const multihashDigest = await pieceHasher.digest(bytes) @@ -199,20 +200,15 @@ async function uploadBlockStream( /* c8 ignore next */ if (!root) throw new Error('missing root CID') - const index = ShardedDAGIndex.create(root) - for (const [i, shard] of shards.entries()) { - const slices = shardIndexes[i] - index.shards.set(shard.multihash, slices) - } - const indexBytes = await index.archive() + const indexBytes = await indexShardedDAG(root, shards, shardIndexes) /* c8 ignore next 3 */ if (!indexBytes.ok) { throw new Error('failed to archive DAG index', { cause: indexBytes.error }) } // Store the index in the space - const indexDigest = await Blob.add(conf, indexBytes.ok, options) - const indexLink = Link.create(carCodec.code, indexDigest) + const { multihash } = await Blob.add(conf, indexBytes.ok, options) + const indexLink = Link.create(CAR.code, multihash) // Register the index with the service await Index.add(conf, indexLink, options) diff --git a/packages/upload-client/src/receipts.js b/packages/upload-client/src/receipts.js new file mode 100644 index 000000000..3851a7cc2 --- /dev/null +++ b/packages/upload-client/src/receipts.js @@ -0,0 +1,119 @@ +import retry, { AbortError } from 'p-retry' +import { CAR } from '@ucanto/transport' +import { receiptsEndpoint } from './service.js' +import { REQUEST_RETRIES } from './constants.js' + +export class ReceiptNotFound extends Error { + /** + * @param {import('multiformats').UnknownLink} taskCid + */ + constructor(taskCid) { + super() + this.taskCid = taskCid + } + + /* c8 ignore start */ + get reason() { + return `receipt not found for task ${this.taskCid} in the indexed workflow` + } + /* c8 ignore end */ + + get name() { + return 'ReceiptNotFound' + } +} + +export class ReceiptMissing extends Error { + /** + * @param {import('multiformats').UnknownLink} taskCid + */ + constructor(taskCid) { + super() + this.taskCid = taskCid + } + + /* c8 ignore start */ + get reason() { + return `receipt missing for task ${this.taskCid}` + } + /* c8 ignore end */ + + get name() { + return 'ReceiptMissing' + } +} + +/** + * Polls for a receipt for an executed task by its CID. + * + * @param {import('multiformats').UnknownLink} taskCid + * @param {import('./types.js').RequestOptions} [options] + * @returns {Promise} + */ +export async function poll(taskCid, options = {}) { + return await retry( + async () => { + const res = await get(taskCid, options) + if (res.error) { + // @ts-ignore + if (res.error.name === 'ReceiptNotFound') { + // throw an error that will cause `p-retry` to retry with + throw res.error + } else { + throw new AbortError( + new Error('failed to fetch blob/accept receipt', { + cause: res.error, + }) + ) + } + } + return res.ok + }, + { + onFailedAttempt: console.warn, + /* c8 ignore next */ + retries: options.retries ?? REQUEST_RETRIES, + } + ) +} + +/** + * Get a receipt for an executed task by its CID. + * + * @param {import('multiformats').UnknownLink} taskCid + * @param {import('./types.js').RequestOptions} [options] + * @returns {Promise>} + */ +async function get(taskCid, options = {}) { + // Fetch receipt from endpoint + const url = new URL( + taskCid.toString(), + options.receiptsEndpoint ?? receiptsEndpoint + ) + const fetchReceipt = options.fetch ?? globalThis.fetch.bind(globalThis) + const workflowResponse = await fetchReceipt(url) + /* c8 ignore start */ + if (workflowResponse.status === 404) { + return { + error: new ReceiptNotFound(taskCid), + } + } + /* c8 ignore stop */ + // Get receipt from Message Archive + const agentMessageBytes = new Uint8Array(await workflowResponse.arrayBuffer()) + // Decode message + const agentMessage = await CAR.request.decode({ + body: agentMessageBytes, + headers: {}, + }) + // Get receipt from the potential multiple receipts in the message + const receipt = agentMessage.receipts.get(taskCid.toString()) + if (!receipt) { + return { + error: new ReceiptMissing(taskCid), + } + } + return { + ok: receipt, + } +} diff --git a/packages/upload-client/src/service.js b/packages/upload-client/src/service.js index 68ff213c5..945671a31 100644 --- a/packages/upload-client/src/service.js +++ b/packages/upload-client/src/service.js @@ -4,6 +4,7 @@ import * as DID from '@ipld/dag-ucan/did' export const serviceURL = new URL('https://up.web3.storage') export const servicePrincipal = DID.parse('did:web:web3.storage') +export const receiptsEndpoint = 'https://up.web3.storage/receipt/' /** @type {import('@ucanto/interface').ConnectionView} */ export const connection = connect({ diff --git a/packages/upload-client/src/types.ts b/packages/upload-client/src/types.ts index bb6d8f809..23c3b2052 100644 --- a/packages/upload-client/src/types.ts +++ b/packages/upload-client/src/types.ts @@ -2,7 +2,13 @@ import type { FetchOptions as IpfsUtilsFetchOptions, ProgressStatus as XHRProgressStatus, } from 'ipfs-utils/src/types.js' -import { Link, UnknownLink, Version, MultihashHasher } from 'multiformats' +import { + MultihashDigest, + Link, + UnknownLink, + Version, + MultihashHasher, +} from 'multiformats' import { Block } from '@ipld/unixfs' import { ServiceMethod, @@ -12,6 +18,8 @@ import { DID, Principal, Failure, + Delegation, + Capabilities, } from '@ucanto/interface' import { UCANConclude, @@ -307,6 +315,7 @@ export interface RequestOptions UploadProgressTrackable { fetch?: typeof globalThis.fetch nonce?: string + receiptsEndpoint?: string } export interface ListRequestOptions extends RequestOptions, Pageable {} @@ -374,3 +383,8 @@ export interface FileLike extends BlobLike { */ name: string } + +export interface BlobAddOk { + multihash: MultihashDigest + site: Delegation +} diff --git a/packages/upload-client/test/blob.test.js b/packages/upload-client/test/blob.test.js index c9ee70cee..06f44a1a8 100644 --- a/packages/upload-client/test/blob.test.js +++ b/packages/upload-client/test/blob.test.js @@ -16,8 +16,11 @@ import { setupBlobAddSuccessResponse, setupBlobAdd4xxResponse, setupBlobAdd5xxResponse, + receiptsEndpoint, } from './helpers/utils.js' import { fetchWithUploadProgress } from '../src/fetch-with-upload-progress.js' +import { ReceiptNotFound } from '../src/receipts.js' +import { Assert } from '@web3-storage/content-claims/capability' describe('Blob.add', () => { it('stores bytes with the service', async () => { @@ -74,7 +77,7 @@ describe('Blob.add', () => { /** @type {import('../src/types.js').ProgressStatus[]} */ const progress = [] - const multihash = await Blob.add( + const { site, multihash } = await Blob.add( { issuer: agent, with: space.did(), proofs, audience: serviceSigner }, bytes, { @@ -84,9 +87,16 @@ describe('Blob.add', () => { progress.push(status) }, fetchWithUploadProgress, + receiptsEndpoint, } ) + assert(site) + assert.equal(site.capabilities[0].can, Assert.location.can) + // we're not verifying this as it's a mocked value + // @ts-ignore nb unknown + assert.ok(site.capabilities[0].nb.content.multihash.bytes) + assert(service.space.blob.add.called) assert.equal(service.space.blob.add.callCount, 1) assert.equal( @@ -95,12 +105,12 @@ describe('Blob.add', () => { ) assert(multihash) - assert.deepEqual(multihash.bytes, bytesHash.bytes) + assert.deepEqual(multihash, bytesHash) // make sure it can also work without fetchWithUploadProgress /** @type {import('../src/types.js').ProgressStatus[]} */ let progressWithoutUploadProgress = [] - const addedWithoutUploadProgress = await Blob.add( + const { multihash: multihashWithoutUploadProgress } = await Blob.add( { issuer: agent, with: space.did(), proofs, audience: serviceSigner }, bytes, { @@ -108,9 +118,10 @@ describe('Blob.add', () => { onUploadProgress: (status) => { progressWithoutUploadProgress.push(status) }, + receiptsEndpoint, } ) - assert.deepEqual(addedWithoutUploadProgress.bytes, bytesHash.bytes) + assert.deepEqual(multihashWithoutUploadProgress, bytesHash) assert.equal( progressWithoutUploadProgress.reduce( (max, { loaded }) => Math.max(max, loaded), @@ -177,6 +188,126 @@ describe('Blob.add', () => { ) }) + it('throws when it cannot get blob/accept receipt', async () => { + const space = await Signer.generate() + const agent = await Signer.generate() + const bytes = await randomBytes(128) + + const proofs = [ + await BlobCapabilities.add.delegate({ + issuer: space, + audience: agent, + with: space.did(), + expiration: Infinity, + }), + ] + + const service = mockService({ + ucan: { + conclude: provide(UCAN.conclude, () => { + return { ok: { time: Date.now() } } + }), + }, + space: { + blob: { + // @ts-ignore Argument of type + add: provide(BlobCapabilities.add, ({ invocation }) => { + return setupBlobAddSuccessResponse( + { issuer: space, audience: agent, with: space, proofs }, + invocation + ) + }), + }, + }, + }) + + const server = Server.create({ + id: serviceSigner, + service, + codec: CAR.inbound, + validateAuthorization, + }) + const connection = Client.connect({ + id: serviceSigner, + codec: CAR.outbound, + channel: server, + }) + + await assert.rejects( + Blob.add( + { issuer: agent, with: space.did(), proofs, audience: serviceSigner }, + bytes, + { + connection, + retries: 0, + receiptsEndpoint: 'http://localhost:9201/failed/', + } + ), + { + message: 'failed to fetch blob/accept receipt', + } + ) + }) + + it('throws when the blob/accept receipt is not yet available', async () => { + const space = await Signer.generate() + const agent = await Signer.generate() + const bytes = await randomBytes(128) + + const proofs = [ + await BlobCapabilities.add.delegate({ + issuer: space, + audience: agent, + with: space.did(), + expiration: Infinity, + }), + ] + + const service = mockService({ + ucan: { + conclude: provide(UCAN.conclude, () => { + return { ok: { time: Date.now() } } + }), + }, + space: { + blob: { + // @ts-ignore Argument of type + add: provide(BlobCapabilities.add, ({ invocation }) => { + return setupBlobAddSuccessResponse( + { issuer: space, audience: agent, with: space, proofs }, + invocation + ) + }), + }, + }, + }) + + const server = Server.create({ + id: serviceSigner, + service, + codec: CAR.inbound, + validateAuthorization, + }) + const connection = Client.connect({ + id: serviceSigner, + codec: CAR.outbound, + channel: server, + }) + + await assert.rejects( + Blob.add( + { issuer: agent, with: space.did(), proofs, audience: serviceSigner }, + bytes, + { + connection, + retries: 0, + receiptsEndpoint: 'http://localhost:9201/unavailable/', + } + ), + ReceiptNotFound + ) + }) + it('throws for bucket URL client error 4xx', async () => { const space = await Signer.generate() const agent = await Signer.generate() @@ -727,7 +858,7 @@ describe('Blob.remove', () => { Blob.remove( { issuer: agent, with: space.did(), proofs, audience: serviceSigner }, bytesHash, - { connection } + { connection, receiptsEndpoint } ), { message: 'failed space/blob/remove invocation' } ) diff --git a/packages/upload-client/test/helpers/receipts-server.js b/packages/upload-client/test/helpers/receipts-server.js new file mode 100644 index 000000000..cc74b77ba --- /dev/null +++ b/packages/upload-client/test/helpers/receipts-server.js @@ -0,0 +1,68 @@ +import { createServer } from 'http' +import { parseLink } from '@ucanto/server' +import * as Signer from '@ucanto/principal/ed25519' +import { Receipt, Message } from '@ucanto/core' +import * as CAR from '@ucanto/transport/car' +import { Assert } from '@web3-storage/content-claims/capability' +import { randomCAR } from './random.js' + +const port = process.env.PORT ?? 9201 + +/** + * @param {string} taskCid + */ +const generateReceipt = async (taskCid) => { + const issuer = await Signer.generate() + const content = (await randomCAR(128)).cid + const locationClaim = await Assert.location.delegate({ + issuer, + audience: issuer, + with: issuer.toDIDKey(), + nb: { + content, + location: ['http://localhost'], + }, + expiration: Infinity, + }) + + const receipt = await Receipt.issue({ + issuer, + fx: { + fork: [locationClaim], + }, + ran: parseLink(taskCid), + result: { + ok: { + site: locationClaim.link(), + }, + }, + }) + + const message = await Message.build({ + receipts: [receipt], + }) + return CAR.request.encode(message).body +} + +const server = createServer(async (req, res) => { + res.setHeader('Access-Control-Allow-Origin', '*') + res.setHeader('Access-Control-Allow-Methods', '*') + res.setHeader('Access-Control-Allow-Headers', '*') + + const taskCid = req.url?.split('/')[1] ?? '' + if (taskCid === 'unavailable') { + res.writeHead(404) + res.end() + } else if (taskCid === 'failed') { + const body = await generateReceipt((await randomCAR(128)).cid.toString()) + res.writeHead(200) + res.end(body) + } else { + const body = await generateReceipt(taskCid) + res.writeHead(200) + res.end(body) + } +}) + +server.listen(port, () => console.log(`Listening on :${port}`)) +process.on('SIGTERM', () => process.exit(0)) diff --git a/packages/upload-client/test/helpers/utils.js b/packages/upload-client/test/helpers/utils.js index c7122bfd9..e24bef3d3 100644 --- a/packages/upload-client/test/helpers/utils.js +++ b/packages/upload-client/test/helpers/utils.js @@ -7,6 +7,8 @@ import { createConcludeInvocation } from '../../../upload-client/src/blob.js' export const validateAuthorization = () => ({ ok: {} }) +export const receiptsEndpoint = 'http://localhost:9201' + export const setupBlobAddSuccessResponse = async function ( // @ts-ignore options, diff --git a/packages/upload-client/test/index.test.js b/packages/upload-client/test/index.test.js index cdb3aa935..9dc5bb973 100644 --- a/packages/upload-client/test/index.test.js +++ b/packages/upload-client/test/index.test.js @@ -19,6 +19,7 @@ import { mockService } from './helpers/mocks.js' import { validateAuthorization, setupBlobAddSuccessResponse, + receiptsEndpoint, } from './helpers/utils.js' import { blockEncodingLength, @@ -145,6 +146,7 @@ describe('uploadFile', () => { onShardStored: (meta) => { carCID = meta.cid }, + receiptsEndpoint, } ) @@ -262,6 +264,7 @@ describe('uploadFile', () => { // so we actually end up with a shard for each block - 5 CARs! shardSize: 1024 * 1024 * 2 + 1, onShardStored: (meta) => carCIDs.push(meta.cid), + receiptsEndpoint, } ) @@ -339,6 +342,7 @@ describe('uploadFile', () => { file, { connection, + receiptsEndpoint, } ) ) @@ -461,6 +465,7 @@ describe('uploadDirectory', () => { onShardStored: (meta) => { carCID = meta.cid }, + receiptsEndpoint, } ) @@ -573,6 +578,7 @@ describe('uploadDirectory', () => { connection, shardSize: 500_057, // should end up with 2 CAR files onShardStored: (meta) => carCIDs.push(meta.cid), + receiptsEndpoint, } ) @@ -686,7 +692,10 @@ describe('uploadDirectory', () => { const uploadedDirUnsorted = await uploadDirectory( { issuer: agent, with: space.did(), proofs, audience: serviceSigner }, unsortedFiles, - { connection: uploadServiceForUnordered.connection } + { + connection: uploadServiceForUnordered.connection, + receiptsEndpoint, + } ) const uploadServiceForOrdered = createSimpleMockUploadServer() @@ -694,7 +703,10 @@ describe('uploadDirectory', () => { const uploadedDirSorted = await uploadDirectory( { issuer: agent, with: space.did(), proofs, audience: serviceSigner }, [...unsortedFiles].sort(defaultFileComparator), - { connection: uploadServiceForOrdered.connection } + { + connection: uploadServiceForOrdered.connection, + receiptsEndpoint, + } ) // upload/add roots should be the same. @@ -735,7 +747,11 @@ describe('uploadDirectory', () => { const uploadedDirCustomOrder = await uploadDirectory( { issuer: agent, with: space.did(), proofs, audience: serviceSigner }, [...unsortedFiles], - { connection: uploadServiceForCustomOrder.connection, customOrder: true } + { + connection: uploadServiceForCustomOrder.connection, + customOrder: true, + receiptsEndpoint, + } ) const shardsForCustomOrder = uploadServiceForCustomOrder.invocations .flatMap((i) => @@ -884,6 +900,7 @@ describe('uploadCAR', () => { connection, onShardStored: (meta) => carCIDs.push(meta.cid), shardSize, + receiptsEndpoint, } ) @@ -1011,6 +1028,7 @@ describe('uploadCAR', () => { onShardStored: (meta) => { if (meta.piece) pieceCIDs.push(meta.piece) }, + receiptsEndpoint, } ) diff --git a/packages/w3up-client/package.json b/packages/w3up-client/package.json index ef99e333c..aefa188df 100644 --- a/packages/w3up-client/package.json +++ b/packages/w3up-client/package.json @@ -133,10 +133,12 @@ }, "devDependencies": { "@ipld/car": "^5.1.1", + "@ipld/unixfs": "^2.1.1", "@types/assert": "^1.5.6", "@types/mocha": "^10.0.1", "@types/node": "^20.8.4", "@ucanto/server": "^10.0.0", + "@web3-storage/content-claims": "^4.0.4", "@web3-storage/data-segment": "^5.0.0", "@web3-storage/eslint-config-w3up": "workspace:^", "@web3-storage/upload-api": "workspace:^", diff --git a/packages/w3up-client/src/client.js b/packages/w3up-client/src/client.js index 3b524fb2c..4139bf373 100644 --- a/packages/w3up-client/src/client.js +++ b/packages/w3up-client/src/client.js @@ -2,6 +2,7 @@ import { uploadFile, uploadDirectory, uploadCAR, + Receipt, } from '@web3-storage/upload-client' import { Blob as BlobCapabilities, @@ -9,7 +10,6 @@ import { Upload as UploadCapabilities, Filecoin as FilecoinCapabilities, } from '@web3-storage/capabilities' -import { CAR } from '@ucanto/transport' import { Base } from './base.js' import * as Account from './account.js' import { Space } from './space.js' @@ -171,28 +171,8 @@ export class Client extends Base { * @param {import('multiformats').UnknownLink} taskCid */ async getReceipt(taskCid) { - // Fetch receipt from endpoint - const workflowResponse = await fetch( - new URL(taskCid.toString(), this._receiptsEndpoint) - ) - /* c8 ignore start */ - if (!workflowResponse.ok) { - throw new Error( - `no receipt available for requested task ${taskCid.toString()}` - ) - } - /* c8 ignore stop */ - // Get receipt from Message Archive - const agentMessageBytes = new Uint8Array( - await workflowResponse.arrayBuffer() - ) - // Decode message - const agentMessage = await CAR.request.decode({ - body: agentMessageBytes, - headers: {}, - }) - // Get receipt from the potential multiple receipts in the message - return agentMessage.receipts.get(taskCid.toString()) + const receiptsEndpoint = new URL(this._receiptsEndpoint).toString() + return Receipt.poll(taskCid, { receiptsEndpoint }) } /** diff --git a/packages/w3up-client/src/service.js b/packages/w3up-client/src/service.js index 836c37bde..adf19d84d 100644 --- a/packages/w3up-client/src/service.js +++ b/packages/w3up-client/src/service.js @@ -1,6 +1,7 @@ import * as client from '@ucanto/client' import { CAR, HTTP } from '@ucanto/transport' import * as DID from '@ipld/dag-ucan/did' +import { receiptsEndpoint } from '@web3-storage/upload-client' export const accessServiceURL = new URL('https://up.web3.storage') export const accessServicePrincipal = DID.parse('did:web:web3.storage') @@ -45,4 +46,4 @@ export const serviceConf = { filecoin: filecoinServiceConnection, } -export const receiptsEndpoint = 'https://up.web3.storage/receipt/' +export { receiptsEndpoint } diff --git a/packages/w3up-client/test/capability/blob.test.js b/packages/w3up-client/test/capability/blob.test.js index 441b62ae4..2ab163812 100644 --- a/packages/w3up-client/test/capability/blob.test.js +++ b/packages/w3up-client/test/capability/blob.test.js @@ -3,6 +3,7 @@ import { AgentData } from '@web3-storage/access/agent' import { randomBytes } from '../helpers/random.js' import { Client } from '../../src/client.js' import * as Test from '../test.js' +import { receiptsEndpoint } from '../helpers/utils.js' export const BlobClient = Test.withContext({ 'should store a blob': async ( @@ -32,7 +33,9 @@ export const BlobClient = Test.withContext({ const bytes = await randomBytes(128) const bytesHash = await sha256.digest(bytes) - const multihash = await alice.capability.blob.add(new Blob([bytes])) + const { multihash } = await alice.capability.blob.add(new Blob([bytes]), { + receiptsEndpoint, + }) // TODO we should check blobsStorage as well assert.deepEqual( @@ -42,7 +45,7 @@ export const BlobClient = Test.withContext({ } ) - assert.deepEqual(multihash, bytesHash) + assert.deepEqual(multihash.bytes, bytesHash.bytes) }, 'should list stored blobs': async ( assert, @@ -71,8 +74,10 @@ export const BlobClient = Test.withContext({ const bytes = await randomBytes(128) const bytesHash = await sha256.digest(bytes) - const multihash = await alice.capability.blob.add(new Blob([bytes])) - assert.deepEqual(multihash, bytesHash) + const { multihash } = await alice.capability.blob.add(new Blob([bytes]), { + receiptsEndpoint, + }) + assert.deepEqual(multihash.bytes, bytesHash.bytes) const { results: [entry], @@ -107,7 +112,9 @@ export const BlobClient = Test.withContext({ }) const bytes = await randomBytes(128) - const multihash = await alice.capability.blob.add(new Blob([bytes])) + const { multihash } = await alice.capability.blob.add(new Blob([bytes]), { + receiptsEndpoint, + }) const result = await alice.capability.blob.remove(multihash) assert.ok(result.ok) diff --git a/packages/w3up-client/test/capability/index.test.js b/packages/w3up-client/test/capability/index.test.js index bd328b677..6a812330f 100644 --- a/packages/w3up-client/test/capability/index.test.js +++ b/packages/w3up-client/test/capability/index.test.js @@ -1,14 +1,16 @@ import { ShardedDAGIndex } from '@web3-storage/blob-index' +import { codec as CAR } from '@ucanto/transport/car' import * as Link from 'multiformats/link' import * as Result from '../../src/result.js' import { randomCAR } from '../helpers/random.js' import * as Test from '../test.js' +import { receiptsEndpoint } from '../helpers/utils.js' export const IndexClient = Test.withContext({ add: { 'should register an index': async ( assert, - { client: alice, service, provisionsStorage, uploadTable } + { client: alice, service, provisionsStorage } ) => { const car = await randomCAR(128) @@ -27,11 +29,15 @@ export const IndexClient = Test.withContext({ const index = ShardedDAGIndex.create(car.cid) const indexBytes = Result.unwrap(await index.archive()) - const indexDigest = await alice.capability.blob.add( - new Blob([indexBytes]) + const { multihash } = await alice.capability.blob.add( + new Blob([indexBytes]), + { + receiptsEndpoint, + } ) + assert.ok( - await alice.capability.index.add(Link.create(0x0202, indexDigest)) + await alice.capability.index.add(Link.create(CAR.code, multihash)) ) }, }, diff --git a/packages/w3up-client/test/capability/usage.test.js b/packages/w3up-client/test/capability/usage.test.js index aca15b60a..13d598dd3 100644 --- a/packages/w3up-client/test/capability/usage.test.js +++ b/packages/w3up-client/test/capability/usage.test.js @@ -1,6 +1,7 @@ import { AgentData } from '@web3-storage/access/agent' import { Client } from '../../src/client.js' import * as Test from '../test.js' +import { receiptsEndpoint } from '../helpers/utils.js' export const UsageClient = Test.withContext({ report: { @@ -29,7 +30,9 @@ export const UsageClient = Test.withContext({ }) const content = new Blob(['hello world']) - await alice.uploadFile(content) + await alice.uploadFile(content, { + receiptsEndpoint, + }) const period = { from: new Date(0), to: new Date() } diff --git a/packages/w3up-client/test/client.test.js b/packages/w3up-client/test/client.test.js index fda4a546a..b5ba8f8fe 100644 --- a/packages/w3up-client/test/client.test.js +++ b/packages/w3up-client/test/client.test.js @@ -6,6 +6,7 @@ import { toCAR } from './helpers/car.js' import { File } from './helpers/shims.js' import { Client } from '../src/client.js' import * as Test from './test.js' +import { receiptsEndpoint } from './helpers/utils.js' /** @type {Test.Suite} */ export const testClient = { @@ -45,6 +46,7 @@ export const testClient = { onShardStored: (meta) => { carCID = meta.cid }, + receiptsEndpoint, }) assert.deepEqual(await uploadTable.exists(space.did(), dataCID), { @@ -124,6 +126,7 @@ export const testClient = { onShardStored: (meta) => { carCID = meta.cid }, + receiptsEndpoint, }) assert.deepEqual(await uploadTable.exists(space.did(), dataCID), { @@ -166,6 +169,7 @@ export const testClient = { onShardStored: (meta) => { carCID = meta.cid }, + receiptsEndpoint, }) assert.deepEqual(await uploadTable.exists(space.did(), root), { @@ -184,7 +188,7 @@ export const testClient = { ) }, }), - getRecipt: Test.withContext({ + getReceipt: Test.withContext({ 'should find a receipt': async (assert, { connection }) => { const taskCid = parseLink( 'bafyreibo6nqtvp67daj7dkmeb5c2n6bg5bunxdmxq3lghtp3pmjtzpzfma' @@ -403,15 +407,18 @@ export const testClient = { consumer: space.did(), }) - const root = await alice.uploadFile(new Blob([bytes])) + const content = new Blob([bytes]) + const fileLink = await alice.uploadFile(content, { + receiptsEndpoint, + }) - assert.deepEqual(await uploadTable.exists(space.did(), root), { + assert.deepEqual(await uploadTable.exists(space.did(), fileLink), { ok: true, }) assert.deepEqual( await alice - .remove(root, { shards: true }) + .remove(fileLink, { shards: true }) .then((ok) => ({ ok: {} })) .catch((error) => { error @@ -419,7 +426,7 @@ export const testClient = { { ok: {} } ) - assert.deepEqual(await uploadTable.exists(space.did(), root), { + assert.deepEqual(await uploadTable.exists(space.did(), fileLink), { ok: false, }) }, @@ -450,15 +457,18 @@ export const testClient = { consumer: space.did(), }) - const root = await alice.uploadFile(new Blob([bytes])) + const content = new Blob([bytes]) + const fileLink = await alice.uploadFile(content, { + receiptsEndpoint, + }) - assert.deepEqual(await uploadTable.exists(space.did(), root), { + assert.deepEqual(await uploadTable.exists(space.did(), fileLink), { ok: true, }) assert.deepEqual( await alice - .remove(root) + .remove(fileLink) .then((ok) => ({ ok: {} })) .catch((error) => { error @@ -466,7 +476,7 @@ export const testClient = { { ok: {} } ) - assert.deepEqual(await uploadTable.exists(space.did(), root), { + assert.deepEqual(await uploadTable.exists(space.did(), fileLink), { ok: false, }) }, @@ -524,9 +534,12 @@ export const testClient = { consumer: space.did(), }) - const root = await alice.uploadFile(new Blob(bytesArray)) + const content = new Blob(bytesArray) + const fileLink = await alice.uploadFile(content, { + receiptsEndpoint, + }) - const upload = await uploadTable.get(space.did(), root) + const upload = await uploadTable.get(space.did(), fileLink) const shard = upload.ok?.shards?.[0] if (!shard) { @@ -538,7 +551,7 @@ export const testClient = { assert.deepEqual( await alice - .remove(root, { shards: true }) + .remove(fileLink, { shards: true }) .then(() => ({ ok: {} })) .catch((error) => ({ error })), { ok: {} } diff --git a/packages/w3up-client/test/helpers/receipts-server.js b/packages/w3up-client/test/helpers/receipts-server.js index 29a414ac6..4664f9a0e 100644 --- a/packages/w3up-client/test/helpers/receipts-server.js +++ b/packages/w3up-client/test/helpers/receipts-server.js @@ -2,29 +2,67 @@ import { createServer } from 'http' import fs from 'fs' import path from 'path' import { fileURLToPath } from 'url' +import { parseLink } from '@ucanto/server' +import * as Signer from '@ucanto/principal/ed25519' +import { Receipt, Message } from '@ucanto/core' +import * as CAR from '@ucanto/transport/car' +import { Assert } from '@web3-storage/content-claims/capability' +import { randomCAR } from './random.js' const port = process.env.PORT ?? 9201 const __dirname = path.dirname(fileURLToPath(import.meta.url)) const fixtureName = process.env.FIXTURE_NAME || 'workflow.car' +const fixtureContent = fs.readFileSync( + path.resolve(`${__dirname}`, '..', 'fixtures', fixtureName) +) -const server = createServer((req, res) => { +const server = createServer(async (req, res) => { res.setHeader('Access-Control-Allow-Origin', '*') res.setHeader('Access-Control-Allow-Methods', '*') res.setHeader('Access-Control-Allow-Headers', '*') - fs.readFile( - path.resolve(`${__dirname}`, '..', 'fixtures', fixtureName), - (error, content) => { - if (error) { - res.writeHead(500) - res.end() - } - res.writeHead(200, { - 'Content-disposition': 'attachment; filename=' + fixtureName, - }) - res.end(content) - } - ) + const taskCid = req.url?.split('/')[1] ?? '' + if ( + taskCid === 'bafyreibo6nqtvp67daj7dkmeb5c2n6bg5bunxdmxq3lghtp3pmjtzpzfma' + ) { + res.writeHead(200, { + 'Content-disposition': 'attachment; filename=' + fixtureName, + }) + return res.end(fixtureContent) + } + + const issuer = await Signer.generate() + const content = (await randomCAR(128)).cid + const locationClaim = await Assert.location.delegate({ + issuer, + audience: issuer, + with: issuer.toDIDKey(), + nb: { + content, + location: ['http://localhost'], + }, + expiration: Infinity, + }) + + const receipt = await Receipt.issue({ + issuer, + fx: { + fork: [locationClaim], + }, + ran: parseLink(taskCid), + result: { + ok: { + site: locationClaim.link(), + }, + }, + }) + + const message = await Message.build({ + receipts: [receipt], + }) + const request = CAR.request.encode(message) + res.writeHead(200) + res.end(request.body) }) server.listen(port, () => console.log(`Listening on :${port}`)) diff --git a/packages/w3up-client/test/helpers/utils.js b/packages/w3up-client/test/helpers/utils.js index 7d4c4ccca..a14f2f133 100644 --- a/packages/w3up-client/test/helpers/utils.js +++ b/packages/w3up-client/test/helpers/utils.js @@ -1,5 +1,5 @@ -import { Receipt } from '@ucanto/core' import { conclude } from '@web3-storage/capabilities/ucan' +import { Receipt } from '@ucanto/core' import * as Server from '@ucanto/server' import { UCAN } from '@web3-storage/capabilities' import * as HTTP from '@web3-storage/capabilities/http' @@ -9,6 +9,8 @@ import * as Types from '../../src/types.js' export const validateAuthorization = () => ({ ok: {} }) +export const receiptsEndpoint = 'http://localhost:9201' + /** * Utility function that creates a delegation from account to agent and an * attestation from service to proof it. Proofs can be used to invoke any diff --git a/pnpm-lock.yaml b/pnpm-lock.yaml index a63ac2398..ba0ae0d17 100644 --- a/pnpm-lock.yaml +++ b/pnpm-lock.yaml @@ -565,6 +565,9 @@ importers: '@ucanto/server': specifier: ^10.0.0 version: 10.0.0 + '@web3-storage/content-claims': + specifier: ^5.0.0 + version: 5.0.0 '@web3-storage/eslint-config-w3up': specifier: workspace:^ version: link:../eslint-config-w3up @@ -638,6 +641,9 @@ importers: '@ipld/car': specifier: ^5.1.1 version: 5.3.0 + '@ipld/unixfs': + specifier: ^2.1.1 + version: 2.2.0 '@types/assert': specifier: ^1.5.6 version: 1.5.10 @@ -650,6 +656,9 @@ importers: '@ucanto/server': specifier: ^10.0.0 version: 10.0.0 + '@web3-storage/content-claims': + specifier: ^4.0.4 + version: 4.0.5 '@web3-storage/data-segment': specifier: ^5.0.0 version: 5.1.0 @@ -3405,7 +3414,6 @@ packages: multiformats: 11.0.2 protobufjs: 7.2.6 rabin-rs: 2.1.0 - dev: false /@istanbuljs/schema@0.1.3: resolution: {integrity: sha512-ZXRY4jNvVgSVQ8DL3LTcakaAtXwTVUxE81hslsyD2AtoXW/wVob10HkOJ1X/pAlcI7D+2YoZKg5do8G/w6RYgA==} @@ -3553,7 +3561,6 @@ packages: dependencies: '@multiformats/murmur3': 2.1.8 murmurhash3js-revisited: 3.0.0 - dev: false /@pnpm/config.env-replace@1.1.0: resolution: {integrity: sha512-htyl8TWnKL7K/ESFa1oW2UB5lVDxuF5DpM7tBi6Hu2LNL3mWkIzNLG6N4zoCUP1lCKNxWy/3iu8mS8MvToGd6w==} @@ -4334,7 +4341,6 @@ packages: dependencies: '@ucanto/core': 10.0.1 '@ucanto/interface': 10.0.1 - dev: false /@ucanto/core@10.0.1: resolution: {integrity: sha512-1BfUaJu0/c9Rl/WdZSDbScJJLsPsPe1g4ynl5kubUj3xDD/lyp/Q12PQVQ2X7hDiWwkpwmxCkRMkOxwc70iNKQ==} @@ -4443,6 +4449,17 @@ packages: web-streams-polyfill: 3.3.3 dev: true + /@web3-storage/content-claims@4.0.5: + resolution: {integrity: sha512-+WpCkTN8aRfUCrCm0kOMZad+FRnFymVDFvS6/+PJMPGP17cci1/c5lqYdrjFV+5MkhL+BkUJVtRTx02G31FHmQ==} + dependencies: + '@ucanto/client': 9.0.1 + '@ucanto/interface': 10.0.1 + '@ucanto/server': 10.0.0 + '@ucanto/transport': 9.1.1 + carstream: 1.1.1 + multiformats: 12.1.3 + dev: true + /@web3-storage/content-claims@5.0.0: resolution: {integrity: sha512-HJFRFsR0qHCe0cOERsb3AjAxxzohYMMoIWaGJgrShDycnl6yqXHrGcdua1BWUDu5pmvKzwD9D7VmI8aSfrCcRA==} dependencies: @@ -4452,7 +4469,6 @@ packages: '@ucanto/transport': 9.1.1 carstream: 2.1.0 multiformats: 13.1.0 - dev: false /@web3-storage/data-segment@3.2.0: resolution: {integrity: sha512-SM6eNumXzrXiQE2/J59+eEgCRZNYPxKhRoHX2QvV3/scD4qgcf4g+paWBc3UriLEY1rCboygGoPsnqYJNyZyfA==} @@ -4644,7 +4660,6 @@ packages: /actor@2.3.1: resolution: {integrity: sha512-ST/3wnvcP2tKDXnum7nLCLXm+/rsf8vPocXH2Fre6D8FQwNkGDd4JEitBlXj007VQJfiGYRQvXqwOBZVi+JtRg==} - dev: false /address@1.2.2: resolution: {integrity: sha512-4B/qKCfeE/ODUaAUpSwfzazo5x29WD4r3vXiWsB7I2mSDAihwEqKO+g8GELZUQSSAo5e1XTYh3ZVfLyxBc12nA==} @@ -5262,13 +5277,20 @@ packages: redeyed: 2.1.1 dev: true + /carstream@1.1.1: + resolution: {integrity: sha512-cgn3TqHo6SPsHBTfM5QgXngv6HtwgO1bKCHcdS35vBrweLcYrIG/+UboCbvnIGA0k8NtAYl/DvDdej/9pZGZxQ==} + dependencies: + '@ipld/dag-cbor': 9.2.0 + multiformats: 12.1.3 + uint8arraylist: 2.4.8 + dev: true + /carstream@2.1.0: resolution: {integrity: sha512-4kYIT1Y+GW/+o6wxS2tZlKnnINcgm4ceODBmyoLNaiQ17G2FNmzvUnQnVQkugC4NORTMCzD6KZEMT534XMJ4Yw==} dependencies: '@ipld/dag-cbor': 9.2.0 multiformats: 13.1.0 uint8arraylist: 2.4.8 - dev: false /cborg@4.2.0: resolution: {integrity: sha512-q6cFW5m3KxfP/9xGI3yGLaC1l5DP6DWM9IvjiJojnIwohL5CQDl02EXViPV852mOfQo+7PJGPN01MI87vFGzyA==} @@ -10966,7 +10988,6 @@ packages: /rabin-rs@2.1.0: resolution: {integrity: sha512-5y72gAXPzIBsAMHcpxZP8eMDuDT98qMP1BqSDHRbHkJJXEgWIN1lA47LxUqzsK6jknOJtgfkQr9v+7qMlFDm6g==} - dev: false /randombytes@2.1.0: resolution: {integrity: sha512-vYl3iOX+4CKUWuxGi9Ukhie6fsqXqS9FE2Zaic4tNFD2N2QQaXOMFbuKK4QmDHC0JO6B1Zp41J0LpT0oR68amQ==} @@ -12537,7 +12558,6 @@ packages: resolution: {integrity: sha512-vc1PlGOzglLF0eae1M8mLRTBivsvrGsdmJ5RbK3e+QRvRLOZfZhQROTwH/OfyF3+ZVUg9/8hE8bmKP2CvP9quQ==} dependencies: uint8arrays: 5.0.3 - dev: false /uint8arrays@4.0.10: resolution: {integrity: sha512-AnJNUGGDJAgFw/eWu/Xb9zrVKEGlwJJCaeInlf3BkecE/zcTobk5YXYIPNQJO1q5Hh1QZrQQHf0JvcHqz2hqoA==}