fix: check for blob/accept receipts before blob/add is concluded #1459

Merged Jun 4, 2024 (39 commits)
Changes from 14 commits
Commits
06e42e9
fix: check for blob/accept receipts before blob/add is concluded
joaosa May 14, 2024
3ac98a3
fix: address passing receipts endpoint
joaosa May 15, 2024
f186499
fix: return the site multihash for blob/add
joaosa May 21, 2024
8773f33
feat: return the location commitment for blob/add
joaosa May 23, 2024
cbf1bf0
fix: address tests to return location commitment for blob/add
joaosa May 23, 2024
5a55ba2
fix: address index/add tests
joaosa May 23, 2024
62c32e6
fix: use generators for getting receipts in upload-client tests
joaosa May 24, 2024
bcca6c7
fix: use generators for getting receipts in w3up-client tests
joaosa May 27, 2024
a089685
fix: pass w3up-client tests
joaosa May 28, 2024
9764432
chore: bump content-claims to 5.0.0
joaosa May 30, 2024
8ab1e9d
chore: cleanup receipts endpoint setup
joaosa May 30, 2024
00326be
fix: relock deps
joaosa May 30, 2024
9b84fbd
chore: remove unneeded generator from helper
joaosa May 30, 2024
61280e0
feat: wrap blob add response
joaosa May 30, 2024
021f353
Merge branch 'main' into fix/blob-add-cli
joaosa May 31, 2024
8382376
chore: reuse getReceipt code
joaosa Jun 3, 2024
37f8c84
fix: address tests to use space/blob/*
joaosa Jun 3, 2024
14cae1c
chore: add receipts server to upload-client
joaosa Jun 3, 2024
3212305
chore: extract receipts polling
joaosa Jun 3, 2024
161e086
chore: remove receipt mocking from upload-client tests
joaosa Jun 3, 2024
5e7c5c0
chore: remove receipt mocking from w3up-client tests
joaosa Jun 3, 2024
d0dc1ea
chore: add a test to cover failing to get a receipt
joaosa Jun 3, 2024
9b53536
chore: do not cover options
joaosa Jun 3, 2024
89711ea
fix: load receipt fixtures correctly
joaosa Jun 3, 2024
b732be4
chore: revert code change
joaosa Jun 4, 2024
740975e
chore: distinguish receipt errors
joaosa Jun 4, 2024
412ea48
chore: propagate the receipt not found error
joaosa Jun 4, 2024
24258da
fix: filter out getting the receipt not found error
joaosa Jun 4, 2024
7ee6b58
chore: remove redundant error
joaosa Jun 4, 2024
06606b1
chore: add receipt missing error
joaosa Jun 4, 2024
74e4c7a
fix: return blob/add location commitment delegation
joaosa Jun 4, 2024
444dd8c
chore: readd whitespace to avoid release
joaosa Jun 4, 2024
6c650c5
fix: test blob/add location commitment
joaosa Jun 4, 2024
7ad41d7
chore: cleanup error propagation
joaosa Jun 4, 2024
581cb82
chore: improve error desc
joaosa Jun 4, 2024
46a11b9
chore: simplify response check
joaosa Jun 4, 2024
1faa3dc
chore: remove unneeded error check
joaosa Jun 4, 2024
1f08812
chore: move indexShardedDAG
joaosa Jun 4, 2024
c045997
chore: break down the receipt class
joaosa Jun 4, 2024
1 change: 0 additions & 1 deletion packages/upload-api/src/blob/add.js
@@ -5,7 +5,6 @@ import * as Blob from '@web3-storage/capabilities/blob'
import * as W3sBlob from '@web3-storage/capabilities/web3.storage/blob'
import * as HTTP from '@web3-storage/capabilities/http'
import * as API from '../types.js'

Contributor:
Could we take this out to avoid an upload-api release?

Contributor Author:
Addressed.

import { createConcludeInvocation } from '../ucan/conclude.js'
import { AwaitError } from './lib.js'

1 change: 1 addition & 0 deletions packages/upload-client/package.json
@@ -94,6 +94,7 @@
"@types/varint": "^6.0.1",
"@ucanto/principal": "^9.0.1",
"@ucanto/server": "^10.0.0",
"@web3-storage/content-claims": "^5.0.0",
"@web3-storage/eslint-config-w3up": "workspace:^",
"assert": "^2.0.0",
"blockstore-core": "^3.0.0",
93 changes: 89 additions & 4 deletions packages/upload-client/src/blob.js
@@ -1,14 +1,15 @@
import { CAR } from '@ucanto/transport'
import { sha256 } from 'multiformats/hashes/sha2'
import { ed25519 } from '@ucanto/principal'
import { conclude } from '@web3-storage/capabilities/ucan'
import * as UCAN from '@web3-storage/capabilities/ucan'
-import { Receipt } from '@ucanto/core'
+import { Delegation, Receipt } from '@ucanto/core'
import * as W3sBlobCapabilities from '@web3-storage/capabilities/web3.storage/blob'
import * as BlobCapabilities from '@web3-storage/capabilities/blob'
import * as HTTPCapabilities from '@web3-storage/capabilities/http'
import { SpaceDID } from '@web3-storage/capabilities/utils'
import retry, { AbortError } from 'p-retry'
-import { servicePrincipal, connection } from './service.js'
+import { receiptsEndpoint, servicePrincipal, connection } from './service.js'
import { REQUEST_RETRIES } from './constants.js'

/**
@@ -26,6 +27,40 @@ function createUploadProgressHandler(url, handler) {
return onUploadProgress
}

// FIXME this code was copied over from w3up-client and modified to parameterise receiptsEndpoint
Contributor:
I thought I wrote this earlier, but can't find it now. Can we make w3up use this function from upload-client instead of having the same code in both places?

Contributor Author:
Addressed. I think I somehow mixed it up with doing the same for the receipts endpoint.

/**
* Get a receipt for an executed task by its CID.
*
* @param {import('multiformats').UnknownLink} taskCid
* @param {import('./types.js').RequestOptions} [options]
*/
async function getReceipt(taskCid, options = {}) {
// Fetch receipt from endpoint
const url = new URL(
taskCid.toString(),
options.receiptsEndpoint ?? receiptsEndpoint
Contributor Author (joaosa, May 28, 2024):
One thing I'm wondering about is whether there's a better way to configure this param, and what implications it would have for configuration in a testing environment 🤔. Also, I'm usually wary of defaulting to production, but I see this is a pattern in the codebase, so it is likely okay here.

Contributor:
Yes, I think defaulting to prod is the pattern here, so I would say this is good.

)
/* c8 ignore next */
const fetchReceipt = options.fetch ?? globalThis.fetch.bind(globalThis)
const workflowResponse = await fetchReceipt(url)
/* c8 ignore start */
if (!workflowResponse.ok) {
throw new Error(
`no receipt available for requested task ${taskCid.toString()}`
)
}
/* c8 ignore stop */
// Get receipt from Message Archive
const agentMessageBytes = new Uint8Array(await workflowResponse.arrayBuffer())
// Decode message
const agentMessage = await CAR.request.decode({
body: agentMessageBytes,
headers: {},
})
// Get receipt from the potential multiple receipts in the message
return agentMessage.receipts.get(taskCid.toString())
}
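
Note on the receiptsEndpoint option discussed in the thread above: a minimal sketch of how the receipt URL resolves when the default is used and when a test overrides it. The helper name `receiptURL`, the placeholder task id, and the localhost port are assumptions for illustration only; the resolution itself is standard `new URL(path, base)` behaviour, which is why the trailing slash on the endpoint matters.

```javascript
// Default endpoint string, as exported from service.js in this diff.
const receiptsEndpoint = 'https://up.web3.storage/receipt/'

/**
 * Hypothetical helper (not part of the PR): builds the receipt URL the
 * same way getReceipt does, by resolving the task CID against the endpoint.
 */
function receiptURL(taskCid, options = {}) {
  return new URL(String(taskCid), options.receiptsEndpoint ?? receiptsEndpoint)
}

// 'bafy-example-task' is a placeholder, not a real CID.
const prod = receiptURL('bafy-example-task')

// A test environment can point at a local receipts server (port is assumed).
const local = receiptURL('bafy-example-task', {
  receiptsEndpoint: new URL('http://localhost:9201/receipt/'),
})

// Without the trailing slash, the last path segment of the base is replaced.
const noSlash = receiptURL('bafy-example-task', {
  receiptsEndpoint: 'http://localhost:9201/receipt',
})

console.log(prod.href)
console.log(local.href)
console.log(noSlash.href)
```

The last case drops the `receipt` segment entirely, so any endpoint passed through options should probably end in `/`.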

// FIXME this code has been copied over from upload-api
/**
* @param {import('@ucanto/interface').Invocation} concludeFx
@@ -166,7 +201,7 @@ export function createConcludeInvocation(id, serviceDid, receipt) {
* The issuer needs the `blob/add` delegated capability.
* @param {Blob|Uint8Array} data Blob data.
* @param {import('./types.js').RequestOptions} [options]
- * @returns {Promise<import('multiformats').MultihashDigest>}
+ * @returns {Promise<import('./types.js').BlobAddOk>}
*/
export async function add(
{ issuer, with: resource, proofs, audience },
@@ -303,7 +338,57 @@ export async function add(
})
}

-return multihash
// Ensure the blob has been accepted
const acceptReceipt = await retry(
async () => {
try {
return await getReceipt(nextTasks.accept.task.link(), options)
} catch (err) {
throw new Error(`failed ${BlobCapabilities.add.can} invocation`, {
Member:
Is this the correct message here?

Member:
It would be nice to distinguish between "not there yet" and an actual error, and quit the retry early in the latter case. New code typically returns Result types, which allow us to properly type the possible error states. So we might do something like:

const acceptReceipt = await retry(async () => {
  const res = await getReceipt(nextTasks.accept.task.link(), options)
  if (res.error) {
    if (res.error.name === 'ReceiptNotFound') {
      // throw an error that will cause `p-retry` to retry
      throw new Error('receipt not yet available')
    } else {
      // bail the retry using `import { AbortError } from 'p-retry'`
      throw new AbortError(new Error('failed to fetch blob/accept receipt', { cause: res.error }))
    }
  }
  return res.ok
})

Not 100% necessary but I would then consider extracting this as a utility like:

const acceptReceipt = await Receipt.poll(nextTasks.accept.task.link(), options)

Contributor Author:
Working on this 🙏

Contributor Author:
Done.

cause: err,
})
}
},
{
onFailedAttempt: console.warn,
retries: options.retries ?? REQUEST_RETRIES,
}
Contributor:
It would be great to test that these defaults work well in a real environment, as there may be a delay before the receipt becomes available. Did you try it against staging?

Contributor Author:
I have not tried this, but that is a good point.

)
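
Note on the Result-based retry suggested in the thread above: a dependency-free sketch of the idea, assuming a receipt getter that returns `{ ok }` or `{ error }` instead of throwing. The `poll` function here is a small stand-in for `p-retry`, and `ReceiptNotFound`, `fakeGetReceipt`, and the placeholder site value are hypothetical names, not the ones the PR ended up with.

```javascript
// Hypothetical error type meaning "the receipt is not there yet".
class ReceiptNotFound extends Error {
  constructor(message) {
    super(message)
    this.name = 'ReceiptNotFound'
  }
}

/**
 * Minimal stand-in for p-retry: keeps retrying only while `fn` reports
 * ReceiptNotFound, and bails immediately on any other error.
 */
async function poll(fn, { retries = 3, delayMs = 10 } = {}) {
  for (let attempt = 0; ; attempt++) {
    const res = await fn()
    if (res.ok !== undefined) return res.ok
    if (res.error.name !== 'ReceiptNotFound' || attempt >= retries) {
      throw new Error('failed to fetch blob/accept receipt', { cause: res.error })
    }
    // Receipt not available yet: wait briefly before the next attempt.
    await new Promise((resolve) => setTimeout(resolve, delayMs))
  }
}

// Fake getter: reports "not found" twice, then yields a receipt-like value.
let calls = 0
const fakeGetReceipt = async () =>
  ++calls < 3
    ? { error: new ReceiptNotFound('receipt not yet available') }
    : { ok: { site: 'placeholder-site-link' } }

const pollResult = poll(fakeGetReceipt)
pollResult.then((receipt) => console.log(receipt.site, 'after', calls, 'calls'))
```

The point of the split is that a missing receipt costs a retry, while a transport or decoding failure surfaces right away with the original error attached as `cause`.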

// @ts-ignore Property
if (!acceptReceipt?.out.ok?.site) {
throw new Error(`failed ${BlobCapabilities.add.can} invocation`, {
cause: 'failed to get blob/accept receipt',
})
}
Contributor:
nit: it would be good to distinguish this error from Not Found or other errors.

Member:
Yeah, I'd just use acceptReceipt.out.error as the cause.

Contributor Author:
Addressed.
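
Note on using the receipt's own error as the cause, as suggested above: a small sketch with plain receipt-like objects. `requireSite`, the placeholder site value, and the `ExampleFailure` error name are assumptions for illustration; real receipts come from the service and carry typed results.

```javascript
/**
 * Hypothetical helper: returns the site link from a receipt-like object,
 * or throws with the receipt's own error attached as the cause.
 */
function requireSite(receipt) {
  if (receipt.out.error) {
    throw new Error('failed blob/add invocation', { cause: receipt.out.error })
  }
  return receipt.out.ok.site
}

// Receipt-like stand-ins, not real ucanto receipts.
const accepted = { out: { ok: { site: 'placeholder-site-link' } } }
const rejected = { out: { error: { name: 'ExampleFailure' } } }

let cause
try {
  requireSite(rejected)
} catch (err) {
  cause = err.cause
}
console.log(requireSite(accepted), cause.name)
```

Checking `out.error` first also narrows the type, which is what removes the need for the `@ts-ignore` on `out.ok.site`.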


const blocks = new Map(
[...acceptReceipt.iterateIPLDBlocks()].map((block) => [
`${block.cid}`,
block,
])
)
const site = Delegation.view(
{
root: /** @type {import('@ucanto/interface').UCANLink} */ (
// @ts-ignore Property
acceptReceipt.out.ok.site
Member:
You should check for error and then you don't need to ts-ignore here.

Contributor Author:
I don't think I need it at all anymore: previously the receipt acquisition method could return null, but now it either returns a receipt or throws.

),
blocks,
},
null
)
/* c8 ignore next 5 */
if (!site) {
Member:
Why would this be falsy?

Member:
Oh, you need to not pass null above (pass undefined or simply omit it) for view(...) to throw in the case where the root does not exist in the passed blocks.

Contributor Author:
Addressed.

throw new Error(`failed ${BlobCapabilities.add.can} invocation`, {
cause: 'failed to get blob/accept receipt',
})
}
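
Note on the fallback argument discussed in the thread above: the sketch below mimics the convention the reviewer describes, where an explicit fallback (such as null) is returned when the root block is missing, and omitting it makes the call throw. This `view` is a simplified stand-in written for illustration, not the `@ucanto/core` implementation, and the keys are placeholders.

```javascript
/**
 * Stand-in for a view(...) with an optional fallback. Assumption: it
 * mirrors the behaviour described in the review, nothing more.
 */
function view({ root, blocks }, fallback) {
  const block = blocks.get(root)
  if (block) return { root: block }
  // An explicit fallback (even null) suppresses the error.
  if (fallback !== undefined) return fallback
  throw new Error(`block not found: ${root}`)
}

const blocks = new Map([['present-root', { bytes: new Uint8Array() }]])

// Passing null: a missing root silently yields null, so callers need an `if (!site)` check.
const withNull = view({ root: 'missing-root', blocks }, null)

// Omitting the fallback: a missing root throws, so no falsy check is needed.
let thrown
try {
  view({ root: 'missing-root', blocks })
} catch (err) {
  thrown = err
}

console.log(withNull, thrown.message)
```

Under that convention, dropping the `null` argument makes the `if (!site)` branch unreachable, which is why the check and its `c8 ignore` could be removed.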

return {
multihash,
site: site.link(),
Contributor:
We should return the delegation, so that the client can invoke it if intended.

Contributor Author:
Good point! Addressed.

}
}

/**
33 changes: 22 additions & 11 deletions packages/upload-client/src/index.js
@@ -10,10 +10,10 @@ import * as Upload from './upload.js'
import * as UnixFS from './unixfs.js'
import * as CAR from './car.js'
import { ShardingStream, defaultFileComparator } from './sharding.js'
-import { codec as carCodec } from '@ucanto/transport/car'

export { Blob, Index, Store, Upload, UnixFS, CAR }
export * from './sharding.js'
export { receiptsEndpoint } from './service.js'

/**
* Uploads a file to the service and returns the root data CID for the
@@ -144,9 +144,9 @@ async function uploadBlockStream(
async transform(car, controller) {
const bytes = new Uint8Array(await car.arrayBuffer())
// Invoke blob/add and write bytes to write target
-const multihash = await Blob.add(conf, bytes, options)
+const { multihash } = await Blob.add(conf, bytes, options)
Contributor:
Let's make sure to create an issue to follow up on having the user make the site delegation public by default by invoking the Claim delegation.

Contributor Author:
Created #1486 👍

// Should this be raw instead?
-const cid = Link.create(carCodec.code, multihash)
+const cid = Link.create(CAR.code, multihash)
let piece
if (pieceHasher) {
const multihashDigest = await pieceHasher.digest(bytes)
@@ -199,20 +199,15 @@ async function uploadBlockStream(
/* c8 ignore next */
if (!root) throw new Error('missing root CID')

-const index = ShardedDAGIndex.create(root)
-for (const [i, shard] of shards.entries()) {
-const slices = shardIndexes[i]
-index.shards.set(shard.multihash, slices)
-}
-const indexBytes = await index.archive()
+const indexBytes = await indexShardedDAG(root, shards, shardIndexes)
/* c8 ignore next 3 */
if (!indexBytes.ok) {
throw new Error('failed to archive DAG index', { cause: indexBytes.error })
}

// Store the index in the space
-const indexDigest = await Blob.add(conf, indexBytes.ok, options)
-const indexLink = Link.create(carCodec.code, indexDigest)
+const { multihash } = await Blob.add(conf, indexBytes.ok, options)
+const indexLink = Link.create(CAR.code, multihash)

// Register the index with the service
await Index.add(conf, indexLink, options)
@@ -221,3 +216,19 @@ async function uploadBlockStream(

return root
}

/**
* Indexes a sharded DAG
*
* @param {import('multiformats').Link} root
* @param {import('./types.js').CARLink[]} shards
* @param {Array<Map<import('./types.js').SliceDigest, import('./types.js').Position>>} shardIndexes
*/
export async function indexShardedDAG(root, shards, shardIndexes) {
Member:
This should be an export from the blob-index utils.

Contributor Author:
Addressed.

const index = ShardedDAGIndex.create(root)
for (const [i, shard] of shards.entries()) {
const slices = shardIndexes[i]
index.shards.set(shard.multihash, slices)
}
return await index.archive()
}
1 change: 1 addition & 0 deletions packages/upload-client/src/service.js
@@ -4,6 +4,7 @@ import * as DID from '@ipld/dag-ucan/did'

export const serviceURL = new URL('https://up.web3.storage')
export const servicePrincipal = DID.parse('did:web:web3.storage')
export const receiptsEndpoint = 'https://up.web3.storage/receipt/'

/** @type {import('@ucanto/interface').ConnectionView<import('./types.js').Service>} */
export const connection = connect({
15 changes: 14 additions & 1 deletion packages/upload-client/src/types.ts
@@ -2,7 +2,13 @@ import type {
FetchOptions as IpfsUtilsFetchOptions,
ProgressStatus as XHRProgressStatus,
} from 'ipfs-utils/src/types.js'
-import { Link, UnknownLink, Version, MultihashHasher } from 'multiformats'
+import {
+MultihashDigest,
+Link,
+UnknownLink,
+Version,
+MultihashHasher,
+} from 'multiformats'
import { Block } from '@ipld/unixfs'
import {
ServiceMethod,
@@ -12,6 +18,7 @@ import {
DID,
Principal,
Failure,
UCANLink,
} from '@ucanto/interface'
import {
UCANConclude,
@@ -305,6 +312,7 @@ export interface RequestOptions
UploadProgressTrackable {
fetch?: typeof globalThis.fetch
nonce?: string
receiptsEndpoint?: URL
}

export interface ListRequestOptions extends RequestOptions, Pageable {}
@@ -372,3 +380,8 @@ export interface FileLike extends BlobLike {
*/
name: string
}

export interface BlobAddOk {
multihash: MultihashDigest
site: UCANLink
}