diff --git a/packages/upload-client/src/blob.js b/packages/upload-client/src/blob.js index fce4a8b91..85c475eb6 100644 --- a/packages/upload-client/src/blob.js +++ b/packages/upload-client/src/blob.js @@ -41,6 +41,7 @@ async function getReceipt(taskCid, options = {}) { taskCid.toString(), options.receiptsEndpoint ?? receiptsEndpoint ) + /* c8 ignore next */ const fetchReceipt = options.fetch ?? globalThis.fetch.bind(globalThis) const workflowResponse = await fetchReceipt(url) /* c8 ignore start */ @@ -378,6 +379,7 @@ export async function add( }, null ) + /* c8 ignore next 5 */ if (!site) { throw new Error(`failed ${BlobCapabilities.add.can} invocation`, { cause: 'failed to get blob/accept receipt', diff --git a/packages/upload-client/src/index.js b/packages/upload-client/src/index.js index cdc5e560b..ccbf53fd1 100644 --- a/packages/upload-client/src/index.js +++ b/packages/upload-client/src/index.js @@ -4,7 +4,6 @@ import { Storefront } from '@web3-storage/filecoin-client' import { ShardedDAGIndex } from '@web3-storage/blob-index' import * as Link from 'multiformats/link' import * as raw from 'multiformats/codecs/raw' -import { create as createLink } from 'multiformats/link' import * as Store from './store.js' import * as Blob from './blob.js' import * as Index from './dag-index.js' @@ -139,45 +138,50 @@ async function uploadBlockStream( await blocks .pipeThrough(new ShardingStream(options)) .pipeThrough( - new TransformStream({ - async transform(car, controller) { - const bytes = new Uint8Array(await car.arrayBuffer()) - // Invoke blob/add and write bytes to write target - const commitment = await Blob.add(conf, bytes, options) - // @ts-ignore Element - const { multihash } = commitment.capabilities[0].nb.content - // Should this be raw instead? - const cid = createLink(carCodec.code, multihash) - let piece - if (pieceHasher) { - const multihashDigest = await pieceHasher.digest(bytes) - /** @type {import('@web3-storage/capabilities/types').PieceLink} */ - piece = createLink(raw.code, multihashDigest) - // Invoke filecoin/offer for data - const result = await Storefront.filecoinOffer( - { - issuer: conf.issuer, - audience: conf.audience, - // Resource of invocation is the issuer did for being self issued - with: conf.issuer.did(), - proofs: conf.proofs, - }, - cid, - piece, - options - ) + /** @type {TransformStream} */ + ( + new TransformStream({ + async transform(car, controller) { + const bytes = new Uint8Array(await car.arrayBuffer()) + // Invoke blob/add and write bytes to write target + const commitment = await Blob.add(conf, bytes, options) + // @ts-ignore Element + const { multihash } = commitment.capabilities[0].nb.content + // Should this be raw instead? + const cid = Link.create(carCodec.code, multihash) + let piece + if (pieceHasher) { + const multihashDigest = await pieceHasher.digest(bytes) + /** @type {import('@web3-storage/capabilities/types').PieceLink} */ + piece = Link.create(raw.code, multihashDigest) + const content = Link.create(raw.code, multihash) - if (result.out.error) { - throw new Error( - 'failed to offer piece for aggregation into filecoin deal', - { cause: result.out.error } + // Invoke filecoin/offer for data + const result = await Storefront.filecoinOffer( + { + issuer: conf.issuer, + audience: conf.audience, + // Resource of invocation is the issuer did for being self issued + with: conf.issuer.did(), + proofs: conf.proofs, + }, + content, + piece, + options ) + + if (result.out.error) { + throw new Error( + 'failed to offer piece for aggregation into filecoin deal', + { cause: result.out.error } + ) + } } - } - const { version, roots, size, slices } = car - controller.enqueue({ version, roots, size, cid, piece, slices }) - }, - }) + const { version, roots, size, slices } = car + controller.enqueue({ version, roots, size, cid, piece, slices }) + }, + }) + ) ) .pipeTo( new WritableStream({ @@ -209,8 +213,12 @@ async function uploadBlockStream( } // Store the index in the space - const indexDigest = await Blob.add(conf, indexBytes.ok, options) - const indexLink = Link.create(carCodec.code, indexDigest) + const commitment = await Blob.add(conf, indexBytes.ok, options) + const indexLink = Link.create( + carCodec.code, + // @ts-ignore Element + commitment.capabilities[0].nb.content.multihash + ) // Register the index with the service await Index.add(conf, indexLink, options) diff --git a/packages/upload-client/test/blob.test.js b/packages/upload-client/test/blob.test.js index 67e258395..e936b8522 100644 --- a/packages/upload-client/test/blob.test.js +++ b/packages/upload-client/test/blob.test.js @@ -1,4 +1,5 @@ import assert from 'assert' +import { Message } from '@ucanto/core' import { create as createLink } from 'multiformats/link' import { sha256 } from 'multiformats/hashes/sha2' import * as Client from '@ucanto/client' @@ -248,6 +249,72 @@ describe('Blob.add', () => { ) }) + it('throws when it there is no blob/accept receipt', async () => { + const space = await Signer.generate() + const agent = await Signer.generate() + const bytes = await randomBytes(128) + + const proofs = [ + await BlobCapabilities.add.delegate({ + issuer: space, + audience: agent, + with: space.did(), + expiration: Infinity, + }), + ] + + const service = mockService({ + ucan: { + conclude: provide(UCAN.conclude, () => { + return { ok: { time: Date.now() } } + }), + }, + blob: { + // @ts-ignore Argument of type + add: provide(BlobCapabilities.add, ({ invocation }) => { + return setupBlobAddSuccessResponse( + { issuer: space, audience: agent, with: space, proofs }, + invocation + ) + }), + }, + }) + + const server = Server.create({ + id: serviceSigner, + service, + codec: CAR.inbound, + validateAuthorization, + }) + const connection = Client.connect({ + id: serviceSigner, + codec: CAR.outbound, + channel: server, + }) + + await assert.rejects( + Blob.add( + { issuer: agent, with: space.did(), proofs, audience: serviceSigner }, + bytes, + { + connection, + retries: 0, + fetch: async (url) => { + // @ts-ignore Parameter + if (!url.pathname) { + return await fetch(url) + } + const message = await Message.build({}) + const request = CAR.request.encode(message) + return new Response(request.body.buffer) + }, + } + ), + { + message: 'failed blob/add invocation', + } + ) + }) it('throws for bucket URL client error 4xx', async () => { const space = await Signer.generate() const agent = await Signer.generate() diff --git a/packages/upload-client/test/helpers/utils.js b/packages/upload-client/test/helpers/utils.js index 5c3f398b1..7cc37c841 100644 --- a/packages/upload-client/test/helpers/utils.js +++ b/packages/upload-client/test/helpers/utils.js @@ -16,10 +16,13 @@ export const validateAuthorization = () => ({ ok: {} }) */ export const setupGetReceipt = (content) => { // @ts-ignore Parameter - return async (url) => { + return async (url, options) => { // need to handle using regular fetch when not actually getting a receipt - if (!url.pathname) { - return await fetch(url) + if ( + !url.pathname || + (url.pathname.contains && !url.pathname.contains('/receipt/')) + ) { + return await fetch(url, options) } const taskID = url.pathname.replace('/receipt/', '') diff --git a/packages/w3up-client/test/capability/blob.test.js b/packages/w3up-client/test/capability/blob.test.js index 27e447975..fbedeba6a 100644 --- a/packages/w3up-client/test/capability/blob.test.js +++ b/packages/w3up-client/test/capability/blob.test.js @@ -40,15 +40,12 @@ export const BlobClient = Test.withContext({ fetch: setupGetReceipt(link), }) - // @ts-ignore Element - console.log(commitment.capabilities[0].nb.content) - console.log(allocationsStorage) // TODO we should check blobsStorage as well assert.deepEqual( await allocationsStorage.exists( space.did(), // @ts-ignore Element - commitment.capabilities[0].nb.content.bytes.slice(3) + commitment.capabilities[0].nb.content.multihash.bytes ), { ok: true, diff --git a/packages/w3up-client/test/capability/index.test.js b/packages/w3up-client/test/capability/index.test.js index bd328b677..4559ecf5f 100644 --- a/packages/w3up-client/test/capability/index.test.js +++ b/packages/w3up-client/test/capability/index.test.js @@ -3,6 +3,7 @@ import * as Link from 'multiformats/link' import * as Result from '../../src/result.js' import { randomCAR } from '../helpers/random.js' import * as Test from '../test.js' +import { setupGetReceipt } from '../helpers/utils.js' export const IndexClient = Test.withContext({ add: { @@ -27,11 +28,21 @@ export const IndexClient = Test.withContext({ const index = ShardedDAGIndex.create(car.cid) const indexBytes = Result.unwrap(await index.archive()) - const indexDigest = await alice.capability.blob.add( - new Blob([indexBytes]) + const commitment = await alice.capability.blob.add( + new Blob([indexBytes]), + { + fetch: setupGetReceipt(car.cid), + } ) + assert.ok( - await alice.capability.index.add(Link.create(0x0202, indexDigest)) + await alice.capability.index.add( + Link.create( + 0x0202, + // @ts-ignore Element + commitment.capabilities[0].nb.content.multihash + ) + ) ) }, }, diff --git a/packages/w3up-client/test/helpers/utils.js b/packages/w3up-client/test/helpers/utils.js index 6787ce710..b3fbe544e 100644 --- a/packages/w3up-client/test/helpers/utils.js +++ b/packages/w3up-client/test/helpers/utils.js @@ -18,10 +18,13 @@ export const validateAuthorization = () => ({ ok: {} }) */ export const setupGetReceipt = (content) => { // @ts-ignore Parameter - return async (url) => { + return async (url, options) => { // need to handle using regular fetch when not actually getting a receipt - if (!url.pathname) { - return await fetch(url) + if ( + !url.pathname || + (url.pathname.contains && !url.pathname.contains('/receipt/')) + ) { + return await fetch(url, options) } const taskID = url.pathname.replace('/receipt/', '')