diff --git a/packages/filecoin-api/package.json b/packages/filecoin-api/package.json index e17676882..07c9366bd 100644 --- a/packages/filecoin-api/package.json +++ b/packages/filecoin-api/package.json @@ -159,6 +159,7 @@ "@ucanto/server": "^9.0.1", "@ucanto/transport": "^9.0.0", "@web3-storage/capabilities": "workspace:^", + "@web3-storage/content-claims": "^4.0.2", "@web3-storage/data-segment": "^4.0.0", "p-map": "^6.0.0" }, diff --git a/packages/filecoin-api/src/aggregator/api.ts b/packages/filecoin-api/src/aggregator/api.ts index 8e2f7d37f..1b84fe621 100644 --- a/packages/filecoin-api/src/aggregator/api.ts +++ b/packages/filecoin-api/src/aggregator/api.ts @@ -1,10 +1,13 @@ -import type { Signer, Principal, Link } from '@ucanto/interface' +import type { Signer, Principal, Link, ConnectionView } from '@ucanto/interface' +import { Failure, ServiceMethod } from '@ucanto/server' import { InclusionProof } from '@web3-storage/capabilities/types' import { PieceLink } from '@web3-storage/data-segment' import { AggregatorService, DealerService, + InvocationConfig, } from '@web3-storage/filecoin-client/types' +import { AssertInclusion } from '@web3-storage/content-claims/capability/api' import { Store, UpdatableStore, @@ -53,6 +56,20 @@ export interface ServiceContext { inclusionStore: InclusionStore } +export interface InvocationConfigWithRequiredAudience extends InvocationConfig { + /** + * The principal delegated to in the current UCAN. + */ + audience: Principal +} + +export interface ServiceConfigWithRequiredAudience< + T extends Record +> { + connection: ConnectionView + invocationConfig: InvocationConfigWithRequiredAudience +} + export interface PieceMessageContext extends Pick {} @@ -83,6 +100,23 @@ export interface InclusionInsertEventToIssuePieceAccept { aggregatorService: ServiceConfig } +export type AssertInclusionServiceMethod = ServiceMethod< + AssertInclusion, + {}, + Failure +> + +export interface InclusionInsertEventToIssueInclusionClaim { + /** + * Content claims connection to claim inclusion. + */ + assertService: ServiceConfigWithRequiredAudience<{ + assert: { + inclusion: AssertInclusionServiceMethod + } + }> +} + export interface AggregateInsertEventToPieceAcceptQueueContext { /** * Store of CID => Buffer Record @@ -226,6 +260,10 @@ export interface InclusionRecordKey export interface InclusionRecordQueryByGroup extends Pick {} +export interface InclusionRecordWithProof extends InclusionRecord { + proof: Link +} + export type BufferedPiece = { /** * Piece CID for the content. diff --git a/packages/filecoin-api/src/aggregator/buffer-reducing.js b/packages/filecoin-api/src/aggregator/buffer-reducing.js index 0bf74d9f9..ab947fbed 100644 --- a/packages/filecoin-api/src/aggregator/buffer-reducing.js +++ b/packages/filecoin-api/src/aggregator/buffer-reducing.js @@ -184,10 +184,12 @@ export function aggregatePieces(bufferedPieces, config) { const remainingBufferedPieces = [] // start by adding prepend buffered pieces if available - for (const bufferedPiece of (config.prependBufferedPieces || [])) { + for (const bufferedPiece of config.prependBufferedPieces || []) { const p = Piece.fromLink(bufferedPiece.piece) if (builder.estimate(p).error) { - throw new Error('aggregate builder is not able to create aggregates with only prepend buffered pieces') + throw new Error( + 'aggregate builder is not able to create aggregates with only prepend buffered pieces' + ) } builder.write(p) addedBufferedPieces.push(bufferedPiece) diff --git a/packages/filecoin-api/src/aggregator/events.js b/packages/filecoin-api/src/aggregator/events.js index 66b00c911..fbebf7aea 100644 --- a/packages/filecoin-api/src/aggregator/events.js +++ b/packages/filecoin-api/src/aggregator/events.js @@ -1,4 +1,5 @@ import { Aggregator, Dealer } from '@web3-storage/filecoin-client' +import { Assert } from '@web3-storage/content-claims/capability' import { Aggregate, Piece } from '@web3-storage/data-segment' import { CBOR } from '@ucanto/core' @@ -114,7 +115,7 @@ export const handleBufferQueueMessage = async (context, records) => { maxAggregateSize: context.config.maxAggregateSize, minAggregateSize: context.config.minAggregateSize, minUtilizationFactor: context.config.minUtilizationFactor, - prependBufferedPieces: context.config.prependBufferedPieces + prependBufferedPieces: context.config.prependBufferedPieces, }) // Store buffered pieces if not enough to do aggregate and re-queue them @@ -324,6 +325,42 @@ export const handleInclusionInsertToIssuePieceAccept = async ( return { ok: {} } } +/** + * Handle issueing inclusion claims once piece is included in an aggregate. + * + * @param {import('./api.js').InclusionInsertEventToIssueInclusionClaim} context + * @param {import('./api.js').InclusionRecordWithProof} record + */ +export const handleInclusionInsertToIssueInclusionClaim = async ( + context, + record +) => { + const claimResult = await Assert.inclusion + .invoke({ + issuer: context.assertService.invocationConfig.issuer, + audience: context.assertService.invocationConfig.audience, + with: context.assertService.invocationConfig.with, + nb: { + content: record.aggregate, + includes: record.piece, + proof: record.proof, + }, + expiration: Infinity, + proofs: context.assertService.invocationConfig.proofs, + }) + .execute(context.assertService.connection) + + if (claimResult.out.error) { + return { + error: claimResult.out.error, + } + } + + return { + ok: {}, + } +} + /** * On Aggregate store insert, offer inserted aggregate for deal. * diff --git a/packages/filecoin-api/test/aggregator.spec.js b/packages/filecoin-api/test/aggregator.spec.js index a496bb4ad..fe77afb5a 100644 --- a/packages/filecoin-api/test/aggregator.spec.js +++ b/packages/filecoin-api/test/aggregator.spec.js @@ -6,7 +6,11 @@ import * as AggregatorEvents from './events/aggregator.js' import { getStoreImplementations } from './context/store-implementations.js' import { Queue } from './context/queue.js' -import { getMockService, getConnection } from './context/service.js' +import { + getMockService, + getContentClaimsMockService, + getConnection, +} from './context/service.js' import { validateAuthorization } from './utils.js' describe('Aggregator', () => { @@ -67,13 +71,20 @@ describe('Aggregator', () => { define(name, async () => { const aggregatorSigner = await Signer.generate() const dealerSigner = await Signer.generate() + const assertSigner = await Signer.generate() const service = getMockService() + const contentClaimsService = getContentClaimsMockService() + const aggregatorConnection = getConnection( aggregatorSigner, service ).connection const dealerConnection = getConnection(dealerSigner, service).connection + const assertConnect = getConnection( + assertSigner, + contentClaimsService + ).connection // resources /** @type {Map} */ @@ -120,8 +131,17 @@ describe('Aggregator', () => { audience: aggregatorSigner, }, }, + assertService: { + connection: assertConnect, + invocationConfig: { + issuer: assertSigner, + with: assertSigner.did(), + audience: assertSigner, + }, + }, queuedMessages, service, + contentClaimsService, errorReporter: { catch(error) { assert.fail(error) diff --git a/packages/filecoin-api/test/context/mocks.js b/packages/filecoin-api/test/context/mocks.js index f554c7d77..2551699b6 100644 --- a/packages/filecoin-api/test/context/mocks.js +++ b/packages/filecoin-api/test/context/mocks.js @@ -4,6 +4,21 @@ const notImplemented = () => { throw new Server.Failure('not implemented') } +/** + * @param {{ + * assert: { + * inclusion: import('../../src/aggregator/api.js').AssertInclusionServiceMethod + * } + * }} impl + */ +export function mockContentClaimsService(impl) { + return { + assert: { + inclusion: withCallParams(impl.assert.inclusion ?? notImplemented), + }, + } +} + /** * @param {Partial<{ * filecoin: Partial diff --git a/packages/filecoin-api/test/context/service.js b/packages/filecoin-api/test/context/service.js index e614af6da..78c98d833 100644 --- a/packages/filecoin-api/test/context/service.js +++ b/packages/filecoin-api/test/context/service.js @@ -2,6 +2,7 @@ import * as Client from '@ucanto/client' import * as Server from '@ucanto/server' import * as CAR from '@ucanto/transport/car' +import * as AssertCaps from '@web3-storage/content-claims/capability' import * as StorefrontCaps from '@web3-storage/capabilities/filecoin/storefront' import * as AggregatorCaps from '@web3-storage/capabilities/filecoin/aggregator' import * as DealerCaps from '@web3-storage/capabilities/filecoin/dealer' @@ -11,7 +12,7 @@ import * as DealTrackerCaps from '@web3-storage/capabilities/filecoin/deal-track import * as API from '../../src/types.js' import { validateAuthorization } from '../utils.js' -import { mockService } from './mocks.js' +import { mockService, mockContentClaimsService } from './mocks.js' export { getStoreImplementations } from './store-implementations.js' export { getQueueImplementations } from './queue-implementations.js' @@ -218,6 +219,19 @@ export function getMockService() { }) } +export function getContentClaimsMockService() { + return mockContentClaimsService({ + assert: { + inclusion: Server.provideAdvanced({ + capability: AssertCaps.Assert.inclusion, + handler: async ({ invocation, context }) => { + return Server.ok({}) + }, + }), + }, + }) +} + /** * @param {any} service * @param {any} id diff --git a/packages/filecoin-api/test/events/aggregator.js b/packages/filecoin-api/test/events/aggregator.js index d83671367..fc0b3d8fb 100644 --- a/packages/filecoin-api/test/events/aggregator.js +++ b/packages/filecoin-api/test/events/aggregator.js @@ -176,29 +176,32 @@ export const test = { bufferQueue: new FailingQueue(), }) ), - 'handles buffer queue messages repeated items as unique': async ( - assert, - context - ) => { - const group = context.id.did() - const { buffers, blocks } = await getBuffers(1, group) - - // Store buffers - for (let i = 0; i < blocks.length; i++) { - const putBufferRes = await context.bufferStore.put({ - buffer: buffers[i], - block: blocks[i].cid, - }) - assert.ok(putBufferRes.ok) - } - - const bufferedPieces = await getBufferedPieces( - [blocks[0].cid, blocks[0].cid], - context.bufferStore - ) + 'handles buffer queue messages repeated items as unique': async ( + assert, + context + ) => { + const group = context.id.did() + const { buffers, blocks } = await getBuffers(1, group) - assert.equal(bufferedPieces.ok?.bufferedPieces.length, buffers[0].pieces.length) - }, + // Store buffers + for (let i = 0; i < blocks.length; i++) { + const putBufferRes = await context.bufferStore.put({ + buffer: buffers[i], + block: blocks[i].cid, + }) + assert.ok(putBufferRes.ok) + } + + const bufferedPieces = await getBufferedPieces( + [blocks[0].cid, blocks[0].cid], + context.bufferStore + ) + + assert.equal( + bufferedPieces.ok?.bufferedPieces.length, + buffers[0].pieces.length + ) + }, 'handles buffer queue messages successfully to requeue bigger buffer': async ( assert, context @@ -397,77 +400,82 @@ export const test = { message.minPieceInsertedAt ) }, - 'handles buffer queue messages successfully to queue aggregate prepended with a buffer piece': async ( - assert, - context - ) => { - const group = context.id.did() - const { buffers, blocks } = await getBuffers(2, group, { - length: 100, - size: 128, - }) + 'handles buffer queue messages successfully to queue aggregate prepended with a buffer piece': + async (assert, context) => { + const group = context.id.did() + const { buffers, blocks } = await getBuffers(2, group, { + length: 100, + size: 128, + }) - const [cargo] = await randomCargo(1, 128) - /** @type {import('../../src/aggregator/api.js').BufferedPiece} */ - const bufferedPiece = { - piece: cargo.link.link(), - policy: 0, - insertedAt: (new Date()).toISOString() - } + const [cargo] = await randomCargo(1, 128) + /** @type {import('../../src/aggregator/api.js').BufferedPiece} */ + const bufferedPiece = { + piece: cargo.link.link(), + policy: 0, + insertedAt: new Date().toISOString(), + } - const totalPieces = buffers.reduce((acc, v) => { - acc += v.pieces.length - return acc - }, 0) + const totalPieces = buffers.reduce((acc, v) => { + acc += v.pieces.length + return acc + }, 0) - // Store buffers - for (let i = 0; i < blocks.length; i++) { - const putBufferRes = await context.bufferStore.put({ - buffer: buffers[i], - block: blocks[i].cid, - }) - assert.ok(putBufferRes.ok) - } + // Store buffers + for (let i = 0; i < blocks.length; i++) { + const putBufferRes = await context.bufferStore.put({ + buffer: buffers[i], + block: blocks[i].cid, + }) + assert.ok(putBufferRes.ok) + } - // Handle messages - const handledMessageRes = await AggregatorEvents.handleBufferQueueMessage( - { - ...context, - config: { - minAggregateSize: 2 ** 19, - minUtilizationFactor: 10e5, - maxAggregateSize: 2 ** 35, - prependBufferedPieces: [bufferedPiece] + // Handle messages + const handledMessageRes = await AggregatorEvents.handleBufferQueueMessage( + { + ...context, + config: { + minAggregateSize: 2 ** 19, + minUtilizationFactor: 10e5, + maxAggregateSize: 2 ** 35, + prependBufferedPieces: [bufferedPiece], + }, }, - }, - blocks.map((b) => ({ - pieces: b.cid, - group, - })) - ) - assert.ok(handledMessageRes.ok) - assert.equal(handledMessageRes.ok?.aggregatedPieces, totalPieces + 1) + blocks.map((b) => ({ + pieces: b.cid, + group, + })) + ) + assert.ok(handledMessageRes.ok) + assert.equal(handledMessageRes.ok?.aggregatedPieces, totalPieces + 1) - // Validate queue and store - await pWaitFor( - () => - context.queuedMessages.get('aggregateOfferQueue')?.length === 1 - ) + // Validate queue and store + await pWaitFor( + () => context.queuedMessages.get('aggregateOfferQueue')?.length === 1 + ) - /** @type {AggregateOfferMessage} */ - // @ts-expect-error cannot infer buffer message - const message = context.queuedMessages.get('aggregateOfferQueue')?.[0] - const bufferGet = await context.bufferStore.get(message.buffer) - assert.ok(bufferGet.ok) - assert.ok(bufferGet.ok?.block.equals(message.buffer)) - assert.equal(bufferGet.ok?.buffer.group, group) - assert.ok(message.aggregate.equals(bufferGet.ok?.buffer.aggregate)) - assert.equal(bufferGet.ok?.buffer.pieces.length, totalPieces + 1) + /** @type {AggregateOfferMessage} */ + // @ts-expect-error cannot infer buffer message + const message = context.queuedMessages.get('aggregateOfferQueue')?.[0] + const bufferGet = await context.bufferStore.get(message.buffer) + assert.ok(bufferGet.ok) + assert.ok(bufferGet.ok?.block.equals(message.buffer)) + assert.equal(bufferGet.ok?.buffer.group, group) + assert.ok(message.aggregate.equals(bufferGet.ok?.buffer.aggregate)) + assert.equal(bufferGet.ok?.buffer.pieces.length, totalPieces + 1) - // prepended piece - assert.ok(bufferGet.ok?.buffer.pieces.find(p => p.piece.link().equals(bufferedPiece.piece.link()))) - assert.ok(bufferGet.ok?.buffer.pieces[0].piece.link().equals(bufferedPiece.piece.link())) - }, + // prepended piece + assert.ok( + bufferGet.ok?.buffer.pieces.find((p) => + p.piece.link().equals(bufferedPiece.piece.link()) + ) + ) + assert.ok( + bufferGet.ok?.buffer.pieces[0].piece + .link() + .equals(bufferedPiece.piece.link()) + ) + }, 'handles buffer queue messages successfully to queue aggregate and remaining buffer': async (assert, context) => { const group = context.id.did() @@ -1228,6 +1236,64 @@ export const test = { } } ), + 'handles inclusion insert to issue inclusion claim successfully': async ( + assert, + context + ) => { + const group = context.id.did() + const { aggregate, pieces } = await randomAggregate(100, 128) + const piece = pieces[0].link + + // Create inclusion proof + const inclusionProof = aggregate.resolveProof(piece) + if (!inclusionProof.ok) { + throw new Error() + } + const inclusionBlock = await CBOR.write(inclusionProof.ok) + + // Insert inclusion + const inclusionRecord = { + aggregate: aggregate.link, + piece, + group, + inclusion: { + subtree: inclusionProof.ok[0], + index: inclusionProof.ok[1], + }, + insertedAt: new Date().toISOString(), + proof: inclusionBlock.cid, + } + + // Handle insert event + const handledMessageRes = + await AggregatorEvents.handleInclusionInsertToIssueInclusionClaim( + context, + inclusionRecord + ) + assert.ok(handledMessageRes.ok) + + // Verify invocation + // @ts-expect-error not typed hooks + assert.equal(context.contentClaimsService?.assert.inclusion.callCount, 1) + assert.ok( + inclusionRecord.piece.equals( + // @ts-expect-error not typed hooks + context.contentClaimsService?.assert.inclusion._params[0].nb.includes + ) + ) + assert.ok( + inclusionRecord.aggregate.equals( + // @ts-expect-error not typed hooks + context.contentClaimsService?.assert.inclusion._params[0].nb.content + ) + ) + assert.ok( + inclusionRecord.proof.equals( + // @ts-expect-error not typed hooks + context.contentClaimsService?.assert.inclusion._params[0].nb.proof + ) + ) + }, 'handles aggregate insert to invoke aggregate offer successfully': async ( assert, context diff --git a/packages/filecoin-api/test/types.ts b/packages/filecoin-api/test/types.ts index d17cda9b6..59706ba66 100644 --- a/packages/filecoin-api/test/types.ts +++ b/packages/filecoin-api/test/types.ts @@ -1,4 +1,5 @@ import type { Signer } from '@ucanto/interface' +import { AssertInclusionServiceMethod } from '../src/aggregator/api.js' import * as AggregatorInterface from '../src/aggregator/api.js' import * as DealerInterface from '../src/dealer/api.js' import * as StorefrontInterface from '../src/storefront/api.js' @@ -10,6 +11,7 @@ export interface AggregatorTestEventsContext AggregatorInterface.PieceInsertEventContext, AggregatorInterface.InclusionInsertEventToUpdateState, AggregatorInterface.InclusionInsertEventToIssuePieceAccept, + AggregatorInterface.InclusionInsertEventToIssueInclusionClaim, AggregatorInterface.AggregateInsertEventToAggregateOfferContext, AggregatorInterface.AggregateInsertEventToPieceAcceptQueueContext, AggregatorInterface.BufferMessageContext { @@ -20,6 +22,11 @@ export interface AggregatorTestEventsContext aggregate: Partial deal: Partial }> + contentClaimsService: { + assert: { + inclusion: AssertInclusionServiceMethod + } + } } export interface DealerTestEventsContext diff --git a/pnpm-lock.yaml b/pnpm-lock.yaml index ddf197f1c..d52695ec9 100644 --- a/pnpm-lock.yaml +++ b/pnpm-lock.yaml @@ -257,6 +257,9 @@ importers: '@web3-storage/capabilities': specifier: workspace:^ version: link:../capabilities + '@web3-storage/content-claims': + specifier: 4.0.2 + version: 4.0.2 '@web3-storage/data-segment': specifier: ^4.0.0 version: 4.0.0 @@ -4388,6 +4391,17 @@ packages: web-streams-polyfill: 3.2.1 dev: false + /@web3-storage/content-claims@4.0.2: + resolution: {integrity: sha512-k6tIc7YjQtdKWi01r7+5stp2lo13ztwpIz+7NQYEbu5fZEsKKes5B4FKRqPWkZYO17+rPaihOY6sICT498c9EA==} + dependencies: + '@ucanto/client': 9.0.0 + '@ucanto/interface': 9.0.0 + '@ucanto/server': 9.0.1 + '@ucanto/transport': 9.0.0 + carstream: 1.1.1 + multiformats: 12.1.3 + dev: false + /@web3-storage/data-segment@3.2.0: resolution: {integrity: sha512-SM6eNumXzrXiQE2/J59+eEgCRZNYPxKhRoHX2QvV3/scD4qgcf4g+paWBc3UriLEY1rCboygGoPsnqYJNyZyfA==} dependencies: @@ -5225,6 +5239,14 @@ packages: redeyed: 2.1.1 dev: true + /carstream@1.1.1: + resolution: {integrity: sha512-cgn3TqHo6SPsHBTfM5QgXngv6HtwgO1bKCHcdS35vBrweLcYrIG/+UboCbvnIGA0k8NtAYl/DvDdej/9pZGZxQ==} + dependencies: + '@ipld/dag-cbor': 9.0.6 + multiformats: 12.1.3 + uint8arraylist: 2.4.8 + dev: false + /cborg@4.0.5: resolution: {integrity: sha512-q8TAjprr8pn9Fp53rOIGp/UFDdFY6os2Nq62YogPSIzczJD9M6g2b6igxMkpCiZZKJ0kn/KzDLDvG+EqBIEeCg==} hasBin: true @@ -9623,6 +9645,10 @@ packages: resolution: {integrity: sha512-eajQ/ZH7qXZQR2AgtfpmSMizQzmyYVmCql7pdhldPuYQi4atACekbJaQplk6dWyIi10jCaFnd6pqvcEFXjbaJw==} engines: {node: '>=16.0.0', npm: '>=7.0.0'} + /multiformats@13.0.1: + resolution: {integrity: sha512-bt3R5iXe2O8xpp3wkmQhC73b/lC4S2ihU8Dndwcsysqbydqb8N+bpP116qMcClZ17g58iSIwtXUTcg2zT4sniA==} + dev: false + /multimatch@5.0.0: resolution: {integrity: sha512-ypMKuglUrZUD99Tk2bUQ+xNQj43lPEfAeX2o9cTteAmShXy2VHDJpuwu1o0xqoKCt9jLVAvwyFKdLTPXKAfJyA==} engines: {node: '>=10'} @@ -12437,11 +12463,23 @@ packages: dev: true optional: true + /uint8arraylist@2.4.8: + resolution: {integrity: sha512-vc1PlGOzglLF0eae1M8mLRTBivsvrGsdmJ5RbK3e+QRvRLOZfZhQROTwH/OfyF3+ZVUg9/8hE8bmKP2CvP9quQ==} + dependencies: + uint8arrays: 5.0.2 + dev: false + /uint8arrays@4.0.6: resolution: {integrity: sha512-4ZesjQhqOU2Ip6GPReIwN60wRxIupavL8T0Iy36BBHr2qyMrNxsPJvr7vpS4eFt8F8kSguWUPad6ZM9izs/vyw==} dependencies: multiformats: 12.1.3 + /uint8arrays@5.0.2: + resolution: {integrity: sha512-S0GaeR+orZt7LaqzTRs4ZP8QqzAauJ+0d4xvP2lJTA99jIkKsE2FgDs4tGF/K/z5O9I/2W5Yvrh7IuqNeYH+0Q==} + dependencies: + multiformats: 13.0.1 + dev: false + /unbox-primitive@1.0.2: resolution: {integrity: sha512-61pPlCD9h51VoreyJ0BReideM3MDKMKnh6+V9L08331ipq6Q8OFXZYiqP6n/tbHx4s5I9uRhcye6BrbkizkBDw==} dependencies: