Skip to content

Commit

Permalink
fix: check for blob/accept receipts before blob/add is concluded
Browse files Browse the repository at this point in the history
  • Loading branch information
joaosa committed May 15, 2024
1 parent 49aef56 commit 7331b22
Show file tree
Hide file tree
Showing 8 changed files with 231 additions and 12 deletions.
57 changes: 57 additions & 0 deletions packages/upload-client/src/blob.js
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
import { CAR } from '@ucanto/transport'
import { sha256 } from 'multiformats/hashes/sha2'
import { ed25519 } from '@ucanto/principal'
import { conclude } from '@web3-storage/capabilities/ucan'
Expand Down Expand Up @@ -26,6 +27,38 @@ function createUploadProgressHandler(url, handler) {
return onUploadProgress
}

// FIXME this code was copied over from w3up-client and modified to parameterise receiptsEndpoint
export const receiptsEndpoint = 'https://up.web3.storage/receipt/'
/**
* Get a receipt for an executed task by its CID.
*
* @param {import('multiformats').UnknownLink} taskCid
* @param {import('./types.js').RequestOptions} [options]
*/
async function getReceipt(taskCid, options = {}) {
// Fetch receipt from endpoint
// FIXME handle config
const url = new URL(taskCid.toString(), 'https://localhost/receipt/')
const fetchReceipt = options.fetch ?? globalThis.fetch.bind(globalThis)
const workflowResponse = await fetchReceipt(url)
/* c8 ignore start */
if (!workflowResponse.ok) {
throw new Error(
`no receipt available for requested task ${taskCid.toString()}`
)
}
/* c8 ignore stop */
// Get receipt from Message Archive
const agentMessageBytes = new Uint8Array(await workflowResponse.arrayBuffer())
// Decode message
const agentMessage = await CAR.request.decode({
body: agentMessageBytes,
headers: {},
})
// Get receipt from the potential multiple receipts in the message
return agentMessage.receipts.get(taskCid.toString())
}

// FIXME this code has been copied over from upload-api
/**
* @param {import('@ucanto/interface').Invocation} concludeFx
Expand Down Expand Up @@ -298,6 +331,30 @@ export async function add(
})
}

// Ensure the blob has been accepted
const acceptReceipt = await retry(
async () => {
try {
return await getReceipt(nextTasks.accept.task.link(), options)
} catch (err) {
throw new Error(`failed ${BlobCapabilities.add.can} invocation`, {
cause: err,
})
}
},
{
onFailedAttempt: console.warn,
retries: options.retries ?? REQUEST_RETRIES,
}
)

// TODO cover this
if (!acceptReceipt) {
throw new Error(`failed ${BlobCapabilities.add.can} invocation`, {
cause: 'failed to get blob/accept receipt',
})
}

return multihash
}

Expand Down
68 changes: 68 additions & 0 deletions packages/upload-client/test/blob.test.js
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ import {
setupBlobAddSuccessResponse,
setupBlobAdd4xxResponse,
setupBlobAdd5xxResponse,
setupGetReceipt,
} from './helpers/utils.js'
import { fetchWithUploadProgress } from '../src/fetch-with-upload-progress.js'

Expand Down Expand Up @@ -82,6 +83,7 @@ describe('Blob.add', () => {
progress.push(status)
},
fetchWithUploadProgress,
fetch: setupGetReceipt,
}
)

Expand All @@ -106,6 +108,7 @@ describe('Blob.add', () => {
onUploadProgress: (status) => {
progressWithoutUploadProgress.push(status)
},
fetch: setupGetReceipt,
}
)
assert.deepEqual(addedWithoutUploadProgress.bytes, bytesHash.bytes)
Expand Down Expand Up @@ -173,6 +176,71 @@ describe('Blob.add', () => {
)
})

it('throws when it cannot get the blob/accept receipt', async () => {
const space = await Signer.generate()
const agent = await Signer.generate()
const bytes = await randomBytes(128)

const proofs = [
await BlobCapabilities.add.delegate({
issuer: space,
audience: agent,
with: space.did(),
expiration: Infinity,
}),
]

const service = mockService({
ucan: {
conclude: provide(UCAN.conclude, () => {
return { ok: { time: Date.now() } }
}),
},
blob: {
// @ts-ignore Argument of type
add: provide(BlobCapabilities.add, ({ invocation }) => {
return setupBlobAddSuccessResponse(
{ issuer: space, audience: agent, with: space, proofs },
invocation
)
}),
},
})

const server = Server.create({
id: serviceSigner,
service,
codec: CAR.inbound,
validateAuthorization,
})
const connection = Client.connect({
id: serviceSigner,
codec: CAR.outbound,
channel: server,
})

await assert.rejects(
Blob.add(
{ issuer: agent, with: space.did(), proofs, audience: serviceSigner },
bytes,
{
connection,
retries: 0,
fetch: async (url) => {
// @ts-ignore Parameter
if (!url.pathname) {
return await fetch(url)
}
throw new Server.Failure('boom')
},
}
),
{
message: 'failed blob/add invocation',
}
)
})

it('throws for bucket URL client error 4xx', async () => {
const space = await Signer.generate()
const agent = await Signer.generate()
Expand Down
29 changes: 28 additions & 1 deletion packages/upload-client/test/helpers/utils.js
Original file line number Diff line number Diff line change
@@ -1,4 +1,8 @@
import { Receipt } from '@ucanto/core'
import { base32 } from 'multiformats/bases/base32'
import * as Signer from '@ucanto/principal/ed25519'
import { CID } from 'multiformats'
import { Receipt, Message } from '@ucanto/core'
import * as CAR from '@ucanto/transport/car'
import * as Server from '@ucanto/server'
import * as HTTP from '@web3-storage/capabilities/http'
import * as W3sBlobCapabilities from '@web3-storage/capabilities/web3.storage/blob'
Expand All @@ -7,6 +11,29 @@ import { createConcludeInvocation } from '../../../upload-client/src/blob.js'

export const validateAuthorization = () => ({ ok: {} })

// @ts-ignore Parameter
export const setupGetReceipt = async (url) => {
// need to handle using regular fetch when not actually getting a receipt
if (!url.pathname) {
return await fetch(url)
}

const cid = url.pathname.replace('/receipt/', '')
const receipt = await Receipt.issue({
issuer: await Signer.generate(),
// @ts-ignore Type
ran: CID.parse(cid, base32),
result: { ok: {} },
})
const message = await Message.build({
// @ts-ignore
invocations: [],
receipts: [receipt],
})
const request = CAR.request.encode(message)
return new Response(request.body.buffer)
}

export const setupBlobAddSuccessResponse = async function (
// @ts-ignore
options,
Expand Down
25 changes: 22 additions & 3 deletions packages/upload-client/test/index.test.js
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ import { mockService } from './helpers/mocks.js'
import {
validateAuthorization,
setupBlobAddSuccessResponse,
setupGetReceipt,
} from './helpers/utils.js'
import {
blockEncodingLength,
Expand Down Expand Up @@ -127,6 +128,7 @@ describe('uploadFile', () => {
onShardStored: (meta) => {
carCID = meta.cid
},
fetch: setupGetReceipt,
}
)

Expand Down Expand Up @@ -225,6 +227,7 @@ describe('uploadFile', () => {
// so we actually end up with a shard for each block - 5 CARs!
shardSize: 1024 * 1024 * 2,
onShardStored: (meta) => carCIDs.push(meta.cid),
fetch: setupGetReceipt,
}
)

Expand Down Expand Up @@ -300,6 +303,7 @@ describe('uploadFile', () => {
file,
{
connection,
fetch: setupGetReceipt,
}
)
)
Expand Down Expand Up @@ -405,6 +409,7 @@ describe('uploadDirectory', () => {
onShardStored: (meta) => {
carCID = meta.cid
},
fetch: setupGetReceipt,
}
)

Expand Down Expand Up @@ -498,6 +503,7 @@ describe('uploadDirectory', () => {
connection,
shardSize: 500_056, // should end up with 2 CAR files
onShardStored: (meta) => carCIDs.push(meta.cid),
fetch: setupGetReceipt,
}
)

Expand Down Expand Up @@ -594,15 +600,21 @@ describe('uploadDirectory', () => {
const uploadedDirUnsorted = await uploadDirectory(
{ issuer: agent, with: space.did(), proofs, audience: serviceSigner },
unsortedFiles,
{ connection: uploadServiceForUnordered.connection }
{
connection: uploadServiceForUnordered.connection,
fetch: setupGetReceipt,
}
)

const uploadServiceForOrdered = createSimpleMockUploadServer()
// uploading sorted files should also work
const uploadedDirSorted = await uploadDirectory(
{ issuer: agent, with: space.did(), proofs, audience: serviceSigner },
[...unsortedFiles].sort(defaultFileComparator),
{ connection: uploadServiceForOrdered.connection }
{
connection: uploadServiceForOrdered.connection,
fetch: setupGetReceipt,
}
)

// upload/add roots should be the same.
Expand Down Expand Up @@ -643,7 +655,12 @@ describe('uploadDirectory', () => {
const uploadedDirCustomOrder = await uploadDirectory(
{ issuer: agent, with: space.did(), proofs, audience: serviceSigner },
[...unsortedFiles],
{ connection: uploadServiceForCustomOrder.connection, customOrder: true }
{
connection: uploadServiceForCustomOrder.connection,
customOrder: true,
// @ts-ignore
fetch: setupGetReceipt,
}
)
const shardsForCustomOrder = uploadServiceForCustomOrder.invocations
.flatMap((i) =>
Expand Down Expand Up @@ -775,6 +792,7 @@ describe('uploadCAR', () => {
connection,
onShardStored: (meta) => carCIDs.push(meta.cid),
shardSize,
fetch: setupGetReceipt,
}
)

Expand Down Expand Up @@ -883,6 +901,7 @@ describe('uploadCAR', () => {
onShardStored: (meta) => {
if (meta.piece) pieceCIDs.push(meta.piece)
},
fetch: setupGetReceipt,
}
)

Expand Down
13 changes: 10 additions & 3 deletions packages/w3up-client/test/capability/blob.test.js
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ import { AgentData } from '@web3-storage/access/agent'
import { randomBytes } from '../helpers/random.js'
import { Client } from '../../src/client.js'
import * as Test from '../test.js'
import { setupGetReceipt } from '../helpers/utils.js'

export const BlobClient = Test.withContext({
'should store a blob': async (
Expand Down Expand Up @@ -32,7 +33,9 @@ export const BlobClient = Test.withContext({

const bytes = await randomBytes(128)
const bytesHash = await sha256.digest(bytes)
const multihash = await alice.capability.blob.add(new Blob([bytes]))
const multihash = await alice.capability.blob.add(new Blob([bytes]), {
fetch: setupGetReceipt,
})

// TODO we should check blobsStorage as well
assert.deepEqual(
Expand Down Expand Up @@ -71,7 +74,9 @@ export const BlobClient = Test.withContext({

const bytes = await randomBytes(128)
const bytesHash = await sha256.digest(bytes)
const multihash = await alice.capability.blob.add(new Blob([bytes]))
const multihash = await alice.capability.blob.add(new Blob([bytes]), {
fetch: setupGetReceipt,
})
assert.deepEqual(multihash, bytesHash)

const {
Expand Down Expand Up @@ -107,7 +112,9 @@ export const BlobClient = Test.withContext({
})

const bytes = await randomBytes(128)
const multihash = await alice.capability.blob.add(new Blob([bytes]))
const multihash = await alice.capability.blob.add(new Blob([bytes]), {
fetch: setupGetReceipt,
})

const result = await alice.capability.blob.remove(multihash)
assert.ok(result.ok)
Expand Down
5 changes: 4 additions & 1 deletion packages/w3up-client/test/capability/usage.test.js
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
import { AgentData } from '@web3-storage/access/agent'
import { Client } from '../../src/client.js'
import * as Test from '../test.js'
import { setupGetReceipt } from '../helpers/utils.js'

export const UsageClient = Test.withContext({
report: {
Expand Down Expand Up @@ -29,7 +30,9 @@ export const UsageClient = Test.withContext({
})

const content = new Blob(['hello world'])
await alice.uploadFile(content)
await alice.uploadFile(content, {
fetch: setupGetReceipt,
})

const period = { from: new Date(0), to: new Date() }

Expand Down
Loading

0 comments on commit 7331b22

Please sign in to comment.