Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix: check for blob/accept receipts before blob/add is concluded #1459

Merged
merged 39 commits into from
Jun 4, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
39 commits
Select commit Hold shift + click to select a range
06e42e9
fix: check for blob/accept receipts before blob/add is concluded
joaosa May 14, 2024
3ac98a3
fix: address passing receipts endpoint
joaosa May 15, 2024
f186499
fix: return the site multihash for blob/add
joaosa May 21, 2024
8773f33
feat: return the location commitment for blob/add
joaosa May 23, 2024
cbf1bf0
fix: address tests to return location commitment for blob/add
joaosa May 23, 2024
5a55ba2
fix: address index/add tests
joaosa May 23, 2024
62c32e6
fix: use generators for getting receipts in upload-client tests
joaosa May 24, 2024
bcca6c7
fix: use generators for getting receipts in w3up-client tests
joaosa May 27, 2024
a089685
fix: pass w3up-client tests
joaosa May 28, 2024
9764432
chore: bump content-claims to 5.0.0
joaosa May 30, 2024
8ab1e9d
chore: cleanup receipts endpoint setup
joaosa May 30, 2024
00326be
fix: relock deps
joaosa May 30, 2024
9b84fbd
chore: remove unneeded generator from helper
joaosa May 30, 2024
61280e0
feat: wrap blob add response
joaosa May 30, 2024
021f353
Merge branch 'main' into fix/blob-add-cli
joaosa May 31, 2024
8382376
chore: reuse getReceipt code
joaosa Jun 3, 2024
37f8c84
fix: address tests to use space/blob/*
joaosa Jun 3, 2024
14cae1c
chore: add receipts server to upload-client
joaosa Jun 3, 2024
3212305
chore: extract receipts polling
joaosa Jun 3, 2024
161e086
chore: remove receipt mocking from upload-client tests
joaosa Jun 3, 2024
5e7c5c0
chore: remove receipt mocking from w3up-client tests
joaosa Jun 3, 2024
d0dc1ea
chore: add a test to cover failing to get a receipt
joaosa Jun 3, 2024
9b53536
chore: do not cover options
joaosa Jun 3, 2024
89711ea
fix: load receipt fixtures correctly
joaosa Jun 3, 2024
b732be4
chore: revert code change
joaosa Jun 4, 2024
740975e
chore: distinguish receipt errors
joaosa Jun 4, 2024
412ea48
chore: propagate the receipt not found error
joaosa Jun 4, 2024
24258da
fix: filter out getting the receipt not found error
joaosa Jun 4, 2024
7ee6b58
chore: remove redundant error
joaosa Jun 4, 2024
06606b1
chore: add receipt missing error
joaosa Jun 4, 2024
74e4c7a
fix: return blob/add location commitment delegation
joaosa Jun 4, 2024
444dd8c
chore: readd whitespace to avoid release
joaosa Jun 4, 2024
6c650c5
fix: test blob/add location commitment
joaosa Jun 4, 2024
7ad41d7
chore: cleanup error propagation
joaosa Jun 4, 2024
581cb82
chore: improve error desc
joaosa Jun 4, 2024
46a11b9
chore: simplify response check
joaosa Jun 4, 2024
1faa3dc
chore: remove unneeded error check
joaosa Jun 4, 2024
1f08812
chore: move indexShardedDAG
joaosa Jun 4, 2024
c045997
chore: break down the receipt class
joaosa Jun 4, 2024
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions packages/blob-index/src/index.js
Original file line number Diff line number Diff line change
@@ -1,2 +1,3 @@
export * as ShardedDAGIndex from './sharded-dag-index.js'
export * from './digest-map.js'
export { indexShardedDAG } from './util.js'
16 changes: 16 additions & 0 deletions packages/blob-index/src/util.js
Original file line number Diff line number Diff line change
Expand Up @@ -32,3 +32,19 @@ export const fromShardArchives = async (content, shards) => {
}
return index
}

/**
* Indexes a sharded DAG
*
* @param {import('multiformats').Link} root
* @param {import('@web3-storage/capabilities/types').CARLink[]} shards
* @param {Array<Map<API.SliceDigest, API.Position>>} shardIndexes
*/
export async function indexShardedDAG(root, shards, shardIndexes) {
const index = create(root)
for (const [i, shard] of shards.entries()) {
const slices = shardIndexes[i]
index.shards.set(shard.multihash, slices)
}
return await index.archive()
}
2 changes: 2 additions & 0 deletions packages/upload-client/package.json
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@
"test:node": "hundreds -r html -r text mocha 'test/**/!(*.browser).test.js' -n experimental-vm-modules -n no-warnings",
"test:browser": "playwright-test 'test/**/!(*.node).test.js'",
"mock": "run-p mock:*",
"mock:receipts-server": "PORT=9201 node test/helpers/receipts-server.js",
"mock:bucket-200": "PORT=9200 STATUS=200 node test/helpers/bucket-server.js",
"mock:bucket-401": "PORT=9400 STATUS=400 node test/helpers/bucket-server.js",
"mock:bucket-500": "PORT=9500 STATUS=500 node test/helpers/bucket-server.js",
Expand Down Expand Up @@ -94,6 +95,7 @@
"@types/varint": "^6.0.1",
"@ucanto/principal": "^9.0.1",
"@ucanto/server": "^10.0.0",
"@web3-storage/content-claims": "^5.0.0",
"@web3-storage/eslint-config-w3up": "workspace:^",
"assert": "^2.0.0",
"blockstore-core": "^3.0.0",
Expand Down
26 changes: 23 additions & 3 deletions packages/upload-client/src/blob.js
Original file line number Diff line number Diff line change
Expand Up @@ -2,14 +2,15 @@ import { sha256 } from 'multiformats/hashes/sha2'
import { ed25519 } from '@ucanto/principal'
import { conclude } from '@web3-storage/capabilities/ucan'
import * as UCAN from '@web3-storage/capabilities/ucan'
import { Receipt } from '@ucanto/core'
import { Delegation, Receipt } from '@ucanto/core'
import * as W3sBlobCapabilities from '@web3-storage/capabilities/web3.storage/blob'
import * as BlobCapabilities from '@web3-storage/capabilities/blob'
import * as HTTPCapabilities from '@web3-storage/capabilities/http'
import { SpaceDID } from '@web3-storage/capabilities/utils'
import retry, { AbortError } from 'p-retry'
import { servicePrincipal, connection } from './service.js'
import { REQUEST_RETRIES } from './constants.js'
import { poll } from './receipts.js'

/**
* @param {string} url
Expand Down Expand Up @@ -166,7 +167,7 @@ export function createConcludeInvocation(id, serviceDid, receipt) {
* The issuer needs the `blob/add` delegated capability.
* @param {Blob|Uint8Array} data Blob data.
* @param {import('./types.js').RequestOptions} [options]
* @returns {Promise<import('multiformats').MultihashDigest>}
* @returns {Promise<import('./types.js').BlobAddOk>}
*/
export async function add(
{ issuer, with: resource, proofs, audience },
Expand Down Expand Up @@ -303,7 +304,26 @@ export async function add(
})
}

return multihash
// Ensure the blob has been accepted
const acceptReceipt = await poll(nextTasks.accept.task.link(), options)

const blocks = new Map(
[...acceptReceipt.iterateIPLDBlocks()].map((block) => [
`${block.cid}`,
block,
])
)
const site = Delegation.view({
root: /** @type {import('@ucanto/interface').UCANLink} */ (
acceptReceipt.out.ok.site
),
blocks,
})

return {
multihash,
site,
}
}

/**
Expand Down
20 changes: 8 additions & 12 deletions packages/upload-client/src/index.js
Original file line number Diff line number Diff line change
@@ -1,6 +1,5 @@
import * as PieceHasher from '@web3-storage/data-segment/multihash'
import { Storefront } from '@web3-storage/filecoin-client'
import { ShardedDAGIndex } from '@web3-storage/blob-index'
import * as Link from 'multiformats/link'
import * as raw from 'multiformats/codecs/raw'
import * as Store from './store.js'
Expand All @@ -10,10 +9,12 @@ import * as Upload from './upload.js'
import * as UnixFS from './unixfs.js'
import * as CAR from './car.js'
import { ShardingStream, defaultFileComparator } from './sharding.js'
import { codec as carCodec } from '@ucanto/transport/car'
import { indexShardedDAG } from '@web3-storage/blob-index'

export { Blob, Index, Store, Upload, UnixFS, CAR }
export * from './sharding.js'
export { receiptsEndpoint } from './service.js'
export * as Receipt from './receipts.js'

/**
* Uploads a file to the service and returns the root data CID for the
Expand Down Expand Up @@ -144,9 +145,9 @@ async function uploadBlockStream(
async transform(car, controller) {
const bytes = new Uint8Array(await car.arrayBuffer())
// Invoke blob/add and write bytes to write target
const multihash = await Blob.add(conf, bytes, options)
const { multihash } = await Blob.add(conf, bytes, options)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

let's make sure to create an issue where we can follow up to have user make the site delegation public by default by invoking the Claim delegation

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

created #1486 👍

// Should this be raw instead?
const cid = Link.create(carCodec.code, multihash)
const cid = Link.create(CAR.code, multihash)
let piece
if (pieceHasher) {
const multihashDigest = await pieceHasher.digest(bytes)
Expand Down Expand Up @@ -199,20 +200,15 @@ async function uploadBlockStream(
/* c8 ignore next */
if (!root) throw new Error('missing root CID')

const index = ShardedDAGIndex.create(root)
for (const [i, shard] of shards.entries()) {
const slices = shardIndexes[i]
index.shards.set(shard.multihash, slices)
}
const indexBytes = await index.archive()
const indexBytes = await indexShardedDAG(root, shards, shardIndexes)
/* c8 ignore next 3 */
if (!indexBytes.ok) {
throw new Error('failed to archive DAG index', { cause: indexBytes.error })
}

// Store the index in the space
const indexDigest = await Blob.add(conf, indexBytes.ok, options)
const indexLink = Link.create(carCodec.code, indexDigest)
const { multihash } = await Blob.add(conf, indexBytes.ok, options)
const indexLink = Link.create(CAR.code, multihash)

// Register the index with the service
await Index.add(conf, indexLink, options)
Expand Down
119 changes: 119 additions & 0 deletions packages/upload-client/src/receipts.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,119 @@
import retry, { AbortError } from 'p-retry'
import { CAR } from '@ucanto/transport'
import { receiptsEndpoint } from './service.js'
import { REQUEST_RETRIES } from './constants.js'

export class ReceiptNotFound extends Error {
/**
* @param {import('multiformats').UnknownLink} taskCid
*/
constructor(taskCid) {
super()
this.taskCid = taskCid
}

/* c8 ignore start */
get reason() {
return `receipt not found for task ${this.taskCid} in the indexed workflow`
}
/* c8 ignore end */

get name() {
return 'ReceiptNotFound'
}
}

export class ReceiptMissing extends Error {
/**
* @param {import('multiformats').UnknownLink} taskCid
*/
constructor(taskCid) {
super()
this.taskCid = taskCid
}

/* c8 ignore start */
get reason() {
return `receipt missing for task ${this.taskCid}`
}
/* c8 ignore end */

get name() {
return 'ReceiptMissing'
}
}

/**
* Polls for a receipt for an executed task by its CID.
*
* @param {import('multiformats').UnknownLink} taskCid
* @param {import('./types.js').RequestOptions} [options]
* @returns {Promise<import('@ucanto/interface').Receipt>}
*/
export async function poll(taskCid, options = {}) {
return await retry(
async () => {
const res = await get(taskCid, options)
if (res.error) {
// @ts-ignore
if (res.error.name === 'ReceiptNotFound') {
// throw an error that will cause `p-retry` to retry with
throw res.error
} else {
throw new AbortError(
new Error('failed to fetch blob/accept receipt', {
cause: res.error,
})
)
}
}
return res.ok
},
{
onFailedAttempt: console.warn,
/* c8 ignore next */
retries: options.retries ?? REQUEST_RETRIES,
}
)
}

/**
* Get a receipt for an executed task by its CID.
*
* @param {import('multiformats').UnknownLink} taskCid
* @param {import('./types.js').RequestOptions} [options]
* @returns {Promise<import('@ucanto/client').Result<import('@ucanto/interface').Receipt, Error>>}
*/
async function get(taskCid, options = {}) {
// Fetch receipt from endpoint
const url = new URL(
taskCid.toString(),
options.receiptsEndpoint ?? receiptsEndpoint
)
const fetchReceipt = options.fetch ?? globalThis.fetch.bind(globalThis)
const workflowResponse = await fetchReceipt(url)
/* c8 ignore start */
if (workflowResponse.status === 404) {
return {
error: new ReceiptNotFound(taskCid),
}
}
/* c8 ignore stop */
// Get receipt from Message Archive
const agentMessageBytes = new Uint8Array(await workflowResponse.arrayBuffer())
// Decode message
const agentMessage = await CAR.request.decode({
body: agentMessageBytes,
headers: {},
})
// Get receipt from the potential multiple receipts in the message
const receipt = agentMessage.receipts.get(taskCid.toString())
if (!receipt) {
return {
error: new ReceiptMissing(taskCid),
}
}
return {
ok: receipt,
}
}
1 change: 1 addition & 0 deletions packages/upload-client/src/service.js
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ import * as DID from '@ipld/dag-ucan/did'

export const serviceURL = new URL('https://up.web3.storage')
export const servicePrincipal = DID.parse('did:web:web3.storage')
export const receiptsEndpoint = 'https://up.web3.storage/receipt/'

/** @type {import('@ucanto/interface').ConnectionView<import('./types.js').Service>} */
export const connection = connect({
Expand Down
16 changes: 15 additions & 1 deletion packages/upload-client/src/types.ts
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,13 @@ import type {
FetchOptions as IpfsUtilsFetchOptions,
ProgressStatus as XHRProgressStatus,
} from 'ipfs-utils/src/types.js'
import { Link, UnknownLink, Version, MultihashHasher } from 'multiformats'
import {
MultihashDigest,
Link,
UnknownLink,
Version,
MultihashHasher,
} from 'multiformats'
import { Block } from '@ipld/unixfs'
import {
ServiceMethod,
Expand All @@ -12,6 +18,8 @@ import {
DID,
Principal,
Failure,
Delegation,
Capabilities,
} from '@ucanto/interface'
import {
UCANConclude,
Expand Down Expand Up @@ -307,6 +315,7 @@ export interface RequestOptions
UploadProgressTrackable {
fetch?: typeof globalThis.fetch
nonce?: string
receiptsEndpoint?: string
}

export interface ListRequestOptions extends RequestOptions, Pageable {}
Expand Down Expand Up @@ -374,3 +383,8 @@ export interface FileLike extends BlobLike {
*/
name: string
}

export interface BlobAddOk {
multihash: MultihashDigest
site: Delegation<Capabilities>
}
Loading
Loading