Skip to content

Commit

Permalink
fix: check for blob/accept receipts before blob/add is concluded
Browse files Browse the repository at this point in the history
  • Loading branch information
joaosa committed May 15, 2024
1 parent 49aef56 commit 3d0ebf1
Show file tree
Hide file tree
Showing 6 changed files with 221 additions and 8 deletions.
57 changes: 57 additions & 0 deletions packages/upload-client/src/blob.js
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
import { CAR } from '@ucanto/transport'
import { sha256 } from 'multiformats/hashes/sha2'
import { ed25519 } from '@ucanto/principal'
import { conclude } from '@web3-storage/capabilities/ucan'
Expand Down Expand Up @@ -26,6 +27,38 @@ function createUploadProgressHandler(url, handler) {
return onUploadProgress
}

// FIXME this code was copied over from w3up-client and modified to parameterise receiptsEndpoint
export const receiptsEndpoint = 'https://up.web3.storage/receipt/'
/**
* Get a receipt for an executed task by its CID.
*
* @param {import('multiformats').UnknownLink} taskCid
* @param {import('./types.js').RequestOptions} [options]
*/
async function getReceipt(taskCid, options = {}) {
// Fetch receipt from endpoint
// FIXME handle config
const url = new URL(taskCid.toString(), 'https://localhost/receipt/')
const fetchReceipt = options.fetch ?? globalThis.fetch.bind(globalThis)
const workflowResponse = await fetchReceipt(url)
/* c8 ignore start */
if (!workflowResponse.ok) {
throw new Error(
`no receipt available for requested task ${taskCid.toString()}`
)
}
/* c8 ignore stop */
// Get receipt from Message Archive
const agentMessageBytes = new Uint8Array(await workflowResponse.arrayBuffer())
// Decode message
const agentMessage = await CAR.request.decode({
body: agentMessageBytes,
headers: {},
})
// Get receipt from the potential multiple receipts in the message
return agentMessage.receipts.get(taskCid.toString())
}

// FIXME this code has been copied over from upload-api
/**
* @param {import('@ucanto/interface').Invocation} concludeFx
Expand Down Expand Up @@ -298,6 +331,30 @@ export async function add(
})
}

// Ensure the blob has been accepted
const acceptReceipt = await retry(
async () => {
try {
return await getReceipt(nextTasks.accept.task.link(), options)
} catch (err) {
throw new Error(`failed ${BlobCapabilities.add.can} invocation`, {
cause: err,
})
}
},
{
onFailedAttempt: console.warn,
retries: options.retries ?? REQUEST_RETRIES,
}
)

// TODO cover this
if (!acceptReceipt) {
throw new Error(`failed ${BlobCapabilities.add.can} invocation`, {
cause: 'failed to get blob/accept receipt',
})
}

return multihash
}

Expand Down
68 changes: 68 additions & 0 deletions packages/upload-client/test/blob.test.js
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ import {
setupBlobAddSuccessResponse,
setupBlobAdd4xxResponse,
setupBlobAdd5xxResponse,
setupGetReceipt,
} from './helpers/utils.js'
import { fetchWithUploadProgress } from '../src/fetch-with-upload-progress.js'

Expand Down Expand Up @@ -82,6 +83,7 @@ describe('Blob.add', () => {
progress.push(status)
},
fetchWithUploadProgress,
fetch: setupGetReceipt,
}
)

Expand All @@ -106,6 +108,7 @@ describe('Blob.add', () => {
onUploadProgress: (status) => {
progressWithoutUploadProgress.push(status)
},
fetch: setupGetReceipt,
}
)
assert.deepEqual(addedWithoutUploadProgress.bytes, bytesHash.bytes)
Expand Down Expand Up @@ -173,6 +176,71 @@ describe('Blob.add', () => {
)
})

it('throws when it cannot get the blob/accept receipt', async () => {
const space = await Signer.generate()
const agent = await Signer.generate()
const bytes = await randomBytes(128)

const proofs = [
await BlobCapabilities.add.delegate({
issuer: space,
audience: agent,
with: space.did(),
expiration: Infinity,
}),
]

const service = mockService({
ucan: {
conclude: provide(UCAN.conclude, () => {
return { ok: { time: Date.now() } }
}),
},
blob: {
// @ts-ignore Argument of type
add: provide(BlobCapabilities.add, ({ invocation }) => {
return setupBlobAddSuccessResponse(
{ issuer: space, audience: agent, with: space, proofs },
invocation
)
}),
},
})

const server = Server.create({
id: serviceSigner,
service,
codec: CAR.inbound,
validateAuthorization,
})
const connection = Client.connect({
id: serviceSigner,
codec: CAR.outbound,
channel: server,
})

await assert.rejects(
Blob.add(
{ issuer: agent, with: space.did(), proofs, audience: serviceSigner },
bytes,
{
connection,
retries: 0,
fetch: async (url) => {
// @ts-ignore Parameter
if (!url.pathname) {
return await fetch(url)
}
throw new Server.Failure('boom')
},
}
),
{
message: 'failed blob/add invocation',
}
)
})

it('throws for bucket URL client error 4xx', async () => {
const space = await Signer.generate()
const agent = await Signer.generate()
Expand Down
29 changes: 28 additions & 1 deletion packages/upload-client/test/helpers/utils.js
Original file line number Diff line number Diff line change
@@ -1,4 +1,8 @@
import { Receipt } from '@ucanto/core'
import { base32 } from 'multiformats/bases/base32'
import * as Signer from '@ucanto/principal/ed25519'
import { CID } from 'multiformats'
import { Receipt, Message } from '@ucanto/core'
import * as CAR from '@ucanto/transport/car'
import * as Server from '@ucanto/server'
import * as HTTP from '@web3-storage/capabilities/http'
import * as W3sBlobCapabilities from '@web3-storage/capabilities/web3.storage/blob'
Expand All @@ -7,6 +11,29 @@ import { createConcludeInvocation } from '../../../upload-client/src/blob.js'

export const validateAuthorization = () => ({ ok: {} })

// @ts-ignore Parameter
export const setupGetReceipt = async (url) => {
// need to handle using regular fetch when not actually getting a receipt
if (!url.pathname) {
return await fetch(url)
}

const cid = url.pathname.replace('/receipt/', '')
const receipt = await Receipt.issue({
issuer: await Signer.generate(),
// @ts-ignore Type
ran: CID.parse(cid, base32),
result: { ok: {} },
})
const message = await Message.build({
// @ts-ignore
invocations: [],
receipts: [receipt],
})
const request = CAR.request.encode(message)
return new Response(request.body.buffer)
}

export const setupBlobAddSuccessResponse = async function (
// @ts-ignore
options,
Expand Down
25 changes: 22 additions & 3 deletions packages/upload-client/test/index.test.js
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ import { mockService } from './helpers/mocks.js'
import {
validateAuthorization,
setupBlobAddSuccessResponse,
setupGetReceipt,
} from './helpers/utils.js'
import {
blockEncodingLength,
Expand Down Expand Up @@ -127,6 +128,7 @@ describe('uploadFile', () => {
onShardStored: (meta) => {
carCID = meta.cid
},
fetch: setupGetReceipt,
}
)

Expand Down Expand Up @@ -225,6 +227,7 @@ describe('uploadFile', () => {
// so we actually end up with a shard for each block - 5 CARs!
shardSize: 1024 * 1024 * 2,
onShardStored: (meta) => carCIDs.push(meta.cid),
fetch: setupGetReceipt,
}
)

Expand Down Expand Up @@ -300,6 +303,7 @@ describe('uploadFile', () => {
file,
{
connection,
fetch: setupGetReceipt,
}
)
)
Expand Down Expand Up @@ -405,6 +409,7 @@ describe('uploadDirectory', () => {
onShardStored: (meta) => {
carCID = meta.cid
},
fetch: setupGetReceipt,
}
)

Expand Down Expand Up @@ -498,6 +503,7 @@ describe('uploadDirectory', () => {
connection,
shardSize: 500_056, // should end up with 2 CAR files
onShardStored: (meta) => carCIDs.push(meta.cid),
fetch: setupGetReceipt,
}
)

Expand Down Expand Up @@ -594,15 +600,21 @@ describe('uploadDirectory', () => {
const uploadedDirUnsorted = await uploadDirectory(
{ issuer: agent, with: space.did(), proofs, audience: serviceSigner },
unsortedFiles,
{ connection: uploadServiceForUnordered.connection }
{
connection: uploadServiceForUnordered.connection,
fetch: setupGetReceipt,
}
)

const uploadServiceForOrdered = createSimpleMockUploadServer()
// uploading sorted files should also work
const uploadedDirSorted = await uploadDirectory(
{ issuer: agent, with: space.did(), proofs, audience: serviceSigner },
[...unsortedFiles].sort(defaultFileComparator),
{ connection: uploadServiceForOrdered.connection }
{
connection: uploadServiceForOrdered.connection,
fetch: setupGetReceipt,
}
)

// upload/add roots should be the same.
Expand Down Expand Up @@ -643,7 +655,12 @@ describe('uploadDirectory', () => {
const uploadedDirCustomOrder = await uploadDirectory(
{ issuer: agent, with: space.did(), proofs, audience: serviceSigner },
[...unsortedFiles],
{ connection: uploadServiceForCustomOrder.connection, customOrder: true }
{
connection: uploadServiceForCustomOrder.connection,
customOrder: true,
// @ts-ignore
fetch: setupGetReceipt,
}
)
const shardsForCustomOrder = uploadServiceForCustomOrder.invocations
.flatMap((i) =>
Expand Down Expand Up @@ -775,6 +792,7 @@ describe('uploadCAR', () => {
connection,
onShardStored: (meta) => carCIDs.push(meta.cid),
shardSize,
fetch: setupGetReceipt,
}
)

Expand Down Expand Up @@ -883,6 +901,7 @@ describe('uploadCAR', () => {
onShardStored: (meta) => {
if (meta.piece) pieceCIDs.push(meta.piece)
},
fetch: setupGetReceipt,
}
)

Expand Down
19 changes: 15 additions & 4 deletions packages/w3up-client/test/client.test.js
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ import { toCAR } from './helpers/car.js'
import { File } from './helpers/shims.js'
import { Client } from '../src/client.js'
import * as Test from './test.js'
import { setupGetReceipt } from './helpers/utils.js'

/** @type {Test.Suite} */
export const testClient = {
Expand Down Expand Up @@ -45,6 +46,7 @@ export const testClient = {
onShardStored: (meta) => {
carCID = meta.cid
},
fetch: setupGetReceipt,
})

assert.deepEqual(await uploadTable.exists(space.did(), dataCID), {
Expand Down Expand Up @@ -82,6 +84,7 @@ export const testClient = {
await assert.rejects(alice.uploadFile(file), {
message:
'missing current space: use createSpace() or setCurrentSpace()',
fetch: setupGetReceipt,
})
},
}),
Expand Down Expand Up @@ -124,6 +127,7 @@ export const testClient = {
onShardStored: (meta) => {
carCID = meta.cid
},
fetch: setupGetReceipt,
})

assert.deepEqual(await uploadTable.exists(space.did(), dataCID), {
Expand Down Expand Up @@ -166,6 +170,7 @@ export const testClient = {
onShardStored: (meta) => {
carCID = meta.cid
},
fetch: setupGetReceipt,
})

assert.deepEqual(await uploadTable.exists(space.did(), root), {
Expand All @@ -184,7 +189,7 @@ export const testClient = {
)
},
}),
getRecipt: Test.withContext({
getReceipt: Test.withContext({
'should find a receipt': async (assert, { connection }) => {
const taskCid = parseLink(
'bafyreibo6nqtvp67daj7dkmeb5c2n6bg5bunxdmxq3lghtp3pmjtzpzfma'
Expand Down Expand Up @@ -403,7 +408,9 @@ export const testClient = {
consumer: space.did(),
})

const root = await alice.uploadFile(new Blob([bytes]))
const root = await alice.uploadFile(new Blob([bytes]), {
fetch: setupGetReceipt,
})

assert.deepEqual(await uploadTable.exists(space.did(), root), {
ok: true,
Expand Down Expand Up @@ -450,7 +457,9 @@ export const testClient = {
consumer: space.did(),
})

const root = await alice.uploadFile(new Blob([bytes]))
const root = await alice.uploadFile(new Blob([bytes]), {
fetch: setupGetReceipt,
})

assert.deepEqual(await uploadTable.exists(space.did(), root), {
ok: true,
Expand Down Expand Up @@ -524,7 +533,9 @@ export const testClient = {
consumer: space.did(),
})

const root = await alice.uploadFile(new Blob(bytesArray))
const root = await alice.uploadFile(new Blob(bytesArray), {
fetch: setupGetReceipt,
})

const upload = await uploadTable.get(space.did(), root)

Expand Down
Loading

0 comments on commit 3d0ebf1

Please sign in to comment.