From 3d0ebf1faa72f1f24292f25b9da60e76e2b74a91 Mon Sep 17 00:00:00 2001 From: Joao Andrade Date: Tue, 14 May 2024 17:14:39 +0100 Subject: [PATCH] fix: check for blob/accept receipts before blob/add is concluded --- packages/upload-client/src/blob.js | 57 ++++++++++++++++ packages/upload-client/test/blob.test.js | 68 ++++++++++++++++++++ packages/upload-client/test/helpers/utils.js | 29 ++++++++- packages/upload-client/test/index.test.js | 25 ++++++- packages/w3up-client/test/client.test.js | 19 ++++-- packages/w3up-client/test/helpers/utils.js | 31 +++++++++ 6 files changed, 221 insertions(+), 8 deletions(-) diff --git a/packages/upload-client/src/blob.js b/packages/upload-client/src/blob.js index f75585754..697a32421 100644 --- a/packages/upload-client/src/blob.js +++ b/packages/upload-client/src/blob.js @@ -1,3 +1,4 @@ +import { CAR } from '@ucanto/transport' import { sha256 } from 'multiformats/hashes/sha2' import { ed25519 } from '@ucanto/principal' import { conclude } from '@web3-storage/capabilities/ucan' @@ -26,6 +27,38 @@ function createUploadProgressHandler(url, handler) { return onUploadProgress } +// FIXME this code was copied over from w3up-client and modified to parameterise receiptsEndpoint +export const receiptsEndpoint = 'https://up.web3.storage/receipt/' +/** + * Get a receipt for an executed task by its CID. + * + * @param {import('multiformats').UnknownLink} taskCid + * @param {import('./types.js').RequestOptions} [options] + */ +async function getReceipt(taskCid, options = {}) { + // Fetch receipt from endpoint + // FIXME handle config + const url = new URL(taskCid.toString(), 'https://localhost/receipt/') + const fetchReceipt = options.fetch ?? globalThis.fetch.bind(globalThis) + const workflowResponse = await fetchReceipt(url) + /* c8 ignore start */ + if (!workflowResponse.ok) { + throw new Error( + `no receipt available for requested task ${taskCid.toString()}` + ) + } + /* c8 ignore stop */ + // Get receipt from Message Archive + const agentMessageBytes = new Uint8Array(await workflowResponse.arrayBuffer()) + // Decode message + const agentMessage = await CAR.request.decode({ + body: agentMessageBytes, + headers: {}, + }) + // Get receipt from the potential multiple receipts in the message + return agentMessage.receipts.get(taskCid.toString()) +} + // FIXME this code has been copied over from upload-api /** * @param {import('@ucanto/interface').Invocation} concludeFx @@ -298,6 +331,30 @@ export async function add( }) } + // Ensure the blob has been accepted + const acceptReceipt = await retry( + async () => { + try { + return await getReceipt(nextTasks.accept.task.link(), options) + } catch (err) { + throw new Error(`failed ${BlobCapabilities.add.can} invocation`, { + cause: err, + }) + } + }, + { + onFailedAttempt: console.warn, + retries: options.retries ?? REQUEST_RETRIES, + } + ) + + // TODO cover this + if (!acceptReceipt) { + throw new Error(`failed ${BlobCapabilities.add.can} invocation`, { + cause: 'failed to get blob/accept receipt', + }) + } + return multihash } diff --git a/packages/upload-client/test/blob.test.js b/packages/upload-client/test/blob.test.js index 8d6316ca5..dcaf8c5a3 100644 --- a/packages/upload-client/test/blob.test.js +++ b/packages/upload-client/test/blob.test.js @@ -16,6 +16,7 @@ import { setupBlobAddSuccessResponse, setupBlobAdd4xxResponse, setupBlobAdd5xxResponse, + setupGetReceipt, } from './helpers/utils.js' import { fetchWithUploadProgress } from '../src/fetch-with-upload-progress.js' @@ -82,6 +83,7 @@ describe('Blob.add', () => { progress.push(status) }, fetchWithUploadProgress, + fetch: setupGetReceipt, } ) @@ -106,6 +108,7 @@ describe('Blob.add', () => { onUploadProgress: (status) => { progressWithoutUploadProgress.push(status) }, + fetch: setupGetReceipt, } ) assert.deepEqual(addedWithoutUploadProgress.bytes, bytesHash.bytes) @@ -173,6 +176,71 @@ describe('Blob.add', () => { ) }) + it('throws when it cannot get the blob/accept receipt', async () => { + const space = await Signer.generate() + const agent = await Signer.generate() + const bytes = await randomBytes(128) + + const proofs = [ + await BlobCapabilities.add.delegate({ + issuer: space, + audience: agent, + with: space.did(), + expiration: Infinity, + }), + ] + + const service = mockService({ + ucan: { + conclude: provide(UCAN.conclude, () => { + return { ok: { time: Date.now() } } + }), + }, + blob: { + // @ts-ignore Argument of type + add: provide(BlobCapabilities.add, ({ invocation }) => { + return setupBlobAddSuccessResponse( + { issuer: space, audience: agent, with: space, proofs }, + invocation + ) + }), + }, + }) + + const server = Server.create({ + id: serviceSigner, + service, + codec: CAR.inbound, + validateAuthorization, + }) + const connection = Client.connect({ + id: serviceSigner, + codec: CAR.outbound, + channel: server, + }) + + await assert.rejects( + Blob.add( + { issuer: agent, with: space.did(), proofs, audience: serviceSigner }, + bytes, + { + connection, + retries: 0, + fetch: async (url) => { + // @ts-ignore Parameter + if (!url.pathname) { + return await fetch(url) + } + throw new Server.Failure('boom') + }, + } + ), + { + message: 'failed blob/add invocation', + } + ) + }) + it('throws for bucket URL client error 4xx', async () => { const space = await Signer.generate() const agent = await Signer.generate() diff --git a/packages/upload-client/test/helpers/utils.js b/packages/upload-client/test/helpers/utils.js index 874d88d29..674b11165 100644 --- a/packages/upload-client/test/helpers/utils.js +++ b/packages/upload-client/test/helpers/utils.js @@ -1,4 +1,8 @@ -import { Receipt } from '@ucanto/core' +import { base32 } from 'multiformats/bases/base32' +import * as Signer from '@ucanto/principal/ed25519' +import { CID } from 'multiformats' +import { Receipt, Message } from '@ucanto/core' +import * as CAR from '@ucanto/transport/car' import * as Server from '@ucanto/server' import * as HTTP from '@web3-storage/capabilities/http' import * as W3sBlobCapabilities from '@web3-storage/capabilities/web3.storage/blob' @@ -7,6 +11,29 @@ import { createConcludeInvocation } from '../../../upload-client/src/blob.js' export const validateAuthorization = () => ({ ok: {} }) +// @ts-ignore Parameter +export const setupGetReceipt = async (url) => { + // need to handle using regular fetch when not actually getting a receipt + if (!url.pathname) { + return await fetch(url) + } + + const cid = url.pathname.replace('/receipt/', '') + const receipt = await Receipt.issue({ + issuer: await Signer.generate(), + // @ts-ignore Type + ran: CID.parse(cid, base32), + result: { ok: {} }, + }) + const message = await Message.build({ + // @ts-ignore + invocations: [], + receipts: [receipt], + }) + const request = CAR.request.encode(message) + return new Response(request.body.buffer) +} + export const setupBlobAddSuccessResponse = async function ( // @ts-ignore options, diff --git a/packages/upload-client/test/index.test.js b/packages/upload-client/test/index.test.js index 77173d4f6..7b0fd7178 100644 --- a/packages/upload-client/test/index.test.js +++ b/packages/upload-client/test/index.test.js @@ -18,6 +18,7 @@ import { mockService } from './helpers/mocks.js' import { validateAuthorization, setupBlobAddSuccessResponse, + setupGetReceipt, } from './helpers/utils.js' import { blockEncodingLength, @@ -127,6 +128,7 @@ describe('uploadFile', () => { onShardStored: (meta) => { carCID = meta.cid }, + fetch: setupGetReceipt, } ) @@ -225,6 +227,7 @@ describe('uploadFile', () => { // so we actually end up with a shard for each block - 5 CARs! shardSize: 1024 * 1024 * 2, onShardStored: (meta) => carCIDs.push(meta.cid), + fetch: setupGetReceipt, } ) @@ -300,6 +303,7 @@ describe('uploadFile', () => { file, { connection, + fetch: setupGetReceipt, } ) ) @@ -405,6 +409,7 @@ describe('uploadDirectory', () => { onShardStored: (meta) => { carCID = meta.cid }, + fetch: setupGetReceipt, } ) @@ -498,6 +503,7 @@ describe('uploadDirectory', () => { connection, shardSize: 500_056, // should end up with 2 CAR files onShardStored: (meta) => carCIDs.push(meta.cid), + fetch: setupGetReceipt, } ) @@ -594,7 +600,10 @@ describe('uploadDirectory', () => { const uploadedDirUnsorted = await uploadDirectory( { issuer: agent, with: space.did(), proofs, audience: serviceSigner }, unsortedFiles, - { connection: uploadServiceForUnordered.connection } + { + connection: uploadServiceForUnordered.connection, + fetch: setupGetReceipt, + } ) const uploadServiceForOrdered = createSimpleMockUploadServer() @@ -602,7 +611,10 @@ describe('uploadDirectory', () => { const uploadedDirSorted = await uploadDirectory( { issuer: agent, with: space.did(), proofs, audience: serviceSigner }, [...unsortedFiles].sort(defaultFileComparator), - { connection: uploadServiceForOrdered.connection } + { + connection: uploadServiceForOrdered.connection, + fetch: setupGetReceipt, + } ) // upload/add roots should be the same. @@ -643,7 +655,12 @@ describe('uploadDirectory', () => { const uploadedDirCustomOrder = await uploadDirectory( { issuer: agent, with: space.did(), proofs, audience: serviceSigner }, [...unsortedFiles], - { connection: uploadServiceForCustomOrder.connection, customOrder: true } + { + connection: uploadServiceForCustomOrder.connection, + customOrder: true, + // @ts-ignore + fetch: setupGetReceipt, + } ) const shardsForCustomOrder = uploadServiceForCustomOrder.invocations .flatMap((i) => @@ -775,6 +792,7 @@ describe('uploadCAR', () => { connection, onShardStored: (meta) => carCIDs.push(meta.cid), shardSize, + fetch: setupGetReceipt, } ) @@ -883,6 +901,7 @@ describe('uploadCAR', () => { onShardStored: (meta) => { if (meta.piece) pieceCIDs.push(meta.piece) }, + fetch: setupGetReceipt, } ) diff --git a/packages/w3up-client/test/client.test.js b/packages/w3up-client/test/client.test.js index fda4a546a..28e925aef 100644 --- a/packages/w3up-client/test/client.test.js +++ b/packages/w3up-client/test/client.test.js @@ -6,6 +6,7 @@ import { toCAR } from './helpers/car.js' import { File } from './helpers/shims.js' import { Client } from '../src/client.js' import * as Test from './test.js' +import { setupGetReceipt } from './helpers/utils.js' /** @type {Test.Suite} */ export const testClient = { @@ -45,6 +46,7 @@ export const testClient = { onShardStored: (meta) => { carCID = meta.cid }, + fetch: setupGetReceipt, }) assert.deepEqual(await uploadTable.exists(space.did(), dataCID), { @@ -82,6 +84,7 @@ export const testClient = { await assert.rejects(alice.uploadFile(file), { message: 'missing current space: use createSpace() or setCurrentSpace()', + fetch: setupGetReceipt, }) }, }), @@ -124,6 +127,7 @@ export const testClient = { onShardStored: (meta) => { carCID = meta.cid }, + fetch: setupGetReceipt, }) assert.deepEqual(await uploadTable.exists(space.did(), dataCID), { @@ -166,6 +170,7 @@ export const testClient = { onShardStored: (meta) => { carCID = meta.cid }, + fetch: setupGetReceipt, }) assert.deepEqual(await uploadTable.exists(space.did(), root), { @@ -184,7 +189,7 @@ export const testClient = { ) }, }), - getRecipt: Test.withContext({ + getReceipt: Test.withContext({ 'should find a receipt': async (assert, { connection }) => { const taskCid = parseLink( 'bafyreibo6nqtvp67daj7dkmeb5c2n6bg5bunxdmxq3lghtp3pmjtzpzfma' @@ -403,7 +408,9 @@ export const testClient = { consumer: space.did(), }) - const root = await alice.uploadFile(new Blob([bytes])) + const root = await alice.uploadFile(new Blob([bytes]), { + fetch: setupGetReceipt, + }) assert.deepEqual(await uploadTable.exists(space.did(), root), { ok: true, @@ -450,7 +457,9 @@ export const testClient = { consumer: space.did(), }) - const root = await alice.uploadFile(new Blob([bytes])) + const root = await alice.uploadFile(new Blob([bytes]), { + fetch: setupGetReceipt, + }) assert.deepEqual(await uploadTable.exists(space.did(), root), { ok: true, @@ -524,7 +533,9 @@ export const testClient = { consumer: space.did(), }) - const root = await alice.uploadFile(new Blob(bytesArray)) + const root = await alice.uploadFile(new Blob(bytesArray), { + fetch: setupGetReceipt, + }) const upload = await uploadTable.get(space.did(), root) diff --git a/packages/w3up-client/test/helpers/utils.js b/packages/w3up-client/test/helpers/utils.js index 4dd6dbc66..17280be36 100644 --- a/packages/w3up-client/test/helpers/utils.js +++ b/packages/w3up-client/test/helpers/utils.js @@ -1,9 +1,40 @@ +import * as base32 from 'multiformats/bases/base32' +import { CID } from 'multiformats' +import { Receipt, Message } from '@ucanto/core' +import * as CAR from '@ucanto/transport/car' import * as Server from '@ucanto/server' import { UCAN } from '@web3-storage/capabilities' import * as Types from '../../src/types.js' +import * as Signer from '@ucanto/principal/ed25519' export const validateAuthorization = () => ({ ok: {} }) +// @ts-ignore Parameter +export const setupGetReceipt = async (url) => { + console.log('FOO', url) + // need to handle using regular fetch when not actually getting a receipt + if (!url.pathname) { + return await fetch(url) + } + + const cid = url.pathname.replace('/receipt/', '') + console.log(cid) + const receipt = await Receipt.issue({ + issuer: await Signer.generate(), + // @ts-ignore Type + ran: CID.parse(cid, base32.base32), + result: { ok: {} }, + }) + console.log(receipt) + const message = await Message.build({ + // @ts-ignore + invocations: [], + receipts: [receipt], + }) + const request = CAR.request.encode(message) + return new Response(request.body.buffer) +} + /** * Utility function that creates a delegation from account to agent and an * attestation from service to proof it. Proofs can be used to invoke any