Skip to content

Commit

Permalink
chore: break down the receipt class
Browse files Browse the repository at this point in the history
  • Loading branch information
joaosa committed Jun 4, 2024
1 parent 1f08812 commit c045997
Show file tree
Hide file tree
Showing 4 changed files with 70 additions and 82 deletions.
6 changes: 2 additions & 4 deletions packages/upload-client/src/blob.js
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@ import { SpaceDID } from '@web3-storage/capabilities/utils'
import retry, { AbortError } from 'p-retry'
import { servicePrincipal, connection } from './service.js'
import { REQUEST_RETRIES } from './constants.js'
import { Receipt as ReceiptPoller } from './receipts.js'
import { poll } from './receipts.js'

/**
* @param {string} url
Expand Down Expand Up @@ -305,9 +305,7 @@ export async function add(
}

// Ensure the blob has been accepted
const acceptReceipt = await new ReceiptPoller(options).poll(
nextTasks.accept.task.link()
)
const acceptReceipt = await poll(nextTasks.accept.task.link(), options)

const blocks = new Map(
[...acceptReceipt.iterateIPLDBlocks()].map((block) => [
Expand Down
2 changes: 1 addition & 1 deletion packages/upload-client/src/index.js
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@ import { indexShardedDAG } from '@web3-storage/blob-index'
export { Blob, Index, Store, Upload, UnixFS, CAR }
export * from './sharding.js'
export { receiptsEndpoint } from './service.js'
export { Receipt } from './receipts.js'
export * as Receipt from './receipts.js'

/**
* Uploads a file to the service and returns the root data CID for the
Expand Down
141 changes: 66 additions & 75 deletions packages/upload-client/src/receipts.js
Original file line number Diff line number Diff line change
Expand Up @@ -43,86 +43,77 @@ export class ReceiptMissing extends Error {
}
}

export class Receipt {
/**
* @param {import('./types.js').RequestOptions} [options]
*/
constructor(options = {}) {
/* c8 ignore start */
this.receiptsEndpoint = options.receiptsEndpoint ?? receiptsEndpoint
this.retries = options.retries ?? REQUEST_RETRIES
this.fetch = options.fetch ?? globalThis.fetch.bind(globalThis)
/* c8 ignore stop */
}

/**
* Polls for a receipt for an executed task by its CID.
*
* @param {import('multiformats').UnknownLink} taskCid
* @returns {Promise<import('@ucanto/interface').Receipt>}
*/
async poll(taskCid) {
return await retry(
async () => {
const res = await this.get(taskCid)
if (res.error) {
// @ts-ignore
if (res.error.name === 'ReceiptNotFound') {
// throw an error that will cause `p-retry` to retry with
throw res.error
} else {
throw new AbortError(
new Error('failed to fetch blob/accept receipt', {
cause: res.error,
})
)
}
/**
* Polls for a receipt for an executed task by its CID.
*
* @param {import('multiformats').UnknownLink} taskCid
* @param {import('./types.js').RequestOptions} [options]
* @returns {Promise<import('@ucanto/interface').Receipt>}
*/
export async function poll(taskCid, options = {}) {
return await retry(
async () => {
const res = await get(taskCid, options)
if (res.error) {
// @ts-ignore
if (res.error.name === 'ReceiptNotFound') {
// throw an error that will cause `p-retry` to retry with
throw res.error
} else {
throw new AbortError(
new Error('failed to fetch blob/accept receipt', {
cause: res.error,
})
)
}
return res.ok
},
{
onFailedAttempt: console.warn,
/* c8 ignore next */
retries: this.retries ?? REQUEST_RETRIES,
}
)
}

/**
* Get a receipt for an executed task by its CID.
*
* @param {import('multiformats').UnknownLink} taskCid
* @returns {Promise<import('@ucanto/client').Result<import('@ucanto/interface').Receipt, Error>>}
*/
async get(taskCid) {
// Fetch receipt from endpoint
const url = new URL(taskCid.toString(), this.receiptsEndpoint)
const workflowResponse = await this.fetch(url)
/* c8 ignore start */
if (workflowResponse.status === 404) {
return {
error: new ReceiptNotFound(taskCid),
}
return res.ok
},
{
onFailedAttempt: console.warn,
/* c8 ignore next */
retries: options.retries ?? REQUEST_RETRIES,
}
/* c8 ignore stop */
// Get receipt from Message Archive
const agentMessageBytes = new Uint8Array(
await workflowResponse.arrayBuffer()
)
// Decode message
const agentMessage = await CAR.request.decode({
body: agentMessageBytes,
headers: {},
})
// Get receipt from the potential multiple receipts in the message
const receipt = agentMessage.receipts.get(taskCid.toString())
if (!receipt) {
return {
error: new ReceiptMissing(taskCid),
}
)
}

/**
* Get a receipt for an executed task by its CID.
*
* @param {import('multiformats').UnknownLink} taskCid
* @param {import('./types.js').RequestOptions} [options]
* @returns {Promise<import('@ucanto/client').Result<import('@ucanto/interface').Receipt, Error>>}
*/
async function get(taskCid, options = {}) {
// Fetch receipt from endpoint
const url = new URL(
taskCid.toString(),
options.receiptsEndpoint ?? receiptsEndpoint
)
const fetchReceipt = options.fetch ?? globalThis.fetch.bind(globalThis)
const workflowResponse = await fetchReceipt(url)
/* c8 ignore start */
if (workflowResponse.status === 404) {
return {
error: new ReceiptNotFound(taskCid),
}
}
/* c8 ignore stop */
// Get receipt from Message Archive
const agentMessageBytes = new Uint8Array(await workflowResponse.arrayBuffer())
// Decode message
const agentMessage = await CAR.request.decode({
body: agentMessageBytes,
headers: {},
})
// Get receipt from the potential multiple receipts in the message
const receipt = agentMessage.receipts.get(taskCid.toString())
if (!receipt) {
return {
ok: receipt,
error: new ReceiptMissing(taskCid),
}
}
return {
ok: receipt,
}
}
3 changes: 1 addition & 2 deletions packages/w3up-client/src/client.js
Original file line number Diff line number Diff line change
Expand Up @@ -172,8 +172,7 @@ export class Client extends Base {
*/
async getReceipt(taskCid) {
const receiptsEndpoint = new URL(this._receiptsEndpoint).toString()
const poller = new Receipt({ receiptsEndpoint })
return poller.poll(taskCid)
return Receipt.poll(taskCid, { receiptsEndpoint })
}

/**
Expand Down

0 comments on commit c045997

Please sign in to comment.