diff --git a/packages/capabilities/src/filecoin.js b/packages/capabilities/src/filecoin.js index 2c4293d5d..8fb89b31f 100644 --- a/packages/capabilities/src/filecoin.js +++ b/packages/capabilities/src/filecoin.js @@ -83,7 +83,11 @@ export const pieceAdd = capability({ }) ), /** - * CID of the content that resulted in Filecoin piece. + * Space where the piece is aggregated + */ + space: Schema.text(), + /** + * Grouping for the piece to be aggregated */ group: Schema.text(), }), diff --git a/packages/filecoin-api/src/aggregator.js b/packages/filecoin-api/src/aggregator.js index 45af50fd5..562e49e1e 100644 --- a/packages/filecoin-api/src/aggregator.js +++ b/packages/filecoin-api/src/aggregator.js @@ -12,28 +12,28 @@ import { QueueOperationFailed, StoreOperationFailed } from './errors.js' * @returns {Promise | API.UcantoInterface.JoinBuilder>} */ export const claim = async ({ capability }, context) => { - const { piece, group } = capability.nb + const { piece, space, group } = capability.nb // Check if self signed to call queue handler if (context.id.did() === capability.with) { - return queueHandler(piece, group, context) + return queueHandler(piece, space, group, context) } - return queueAdd(piece, group, context) + return queueAdd(piece, space, group, context) } /** * @param {import('@web3-storage/data-segment').PieceLink} piece + * @param {string} space * @param {string} group * @param {API.AggregatorServiceContext} context * @returns {Promise | API.UcantoInterface.JoinBuilder>} */ -async function queueAdd(piece, group, context) { +async function queueAdd(piece, space, group, context) { const queued = await context.addQueue.add( { piece, - }, - { - group, + space, + group } ) if (queued.error) { @@ -50,6 +50,7 @@ async function queueAdd(piece, group, context) { with: context.id.did(), nb: { piece, + space, group, }, }) @@ -63,14 +64,16 @@ async function queueAdd(piece, group, context) { /** * @param {import('@web3-storage/data-segment').PieceLink} piece + * @param {string} space * @param {string} group * @param {API.AggregatorServiceContext} context * @returns {Promise | API.UcantoInterface.JoinBuilder>} */ -async function queueHandler(piece, group, context) { +async function queueHandler(piece, space, group, context) { const put = await context.pieceStore.put({ piece, - // TODO + space, + group }) if (put.error) { diff --git a/packages/filecoin-api/src/types.ts b/packages/filecoin-api/src/types.ts index 6a841715d..54962dabd 100644 --- a/packages/filecoin-api/src/types.ts +++ b/packages/filecoin-api/src/types.ts @@ -10,6 +10,7 @@ import type { import type { ProviderInput } from '@ucanto/server' import { PieceLink } from '@web3-storage/data-segment' import { UnknownLink } from '@ucanto/interface' +import { DealConfig } from '@web3-storage/filecoin-client/types' export * as UcantoInterface from '@ucanto/interface' export * from '@web3-storage/filecoin-client/types' @@ -27,7 +28,7 @@ export interface Store { // Services export interface StorefrontServiceContext { id: Signer - addQueue: Queue + addQueue: Queue pieceStore: Store aggregatorDid: string aggregatorUrl: string @@ -35,33 +36,36 @@ export interface StorefrontServiceContext { export interface AggregatorServiceContext { id: Signer - addQueue: Queue - pieceStore: Store + addQueue: Queue + pieceStore: Store brokerDid: string brokerUrl: string } export interface BrokerServiceContext { id: Signer - addQueue: Queue - offerStore: Store + addQueue: Queue + offerStore: Store } // Service Types -export interface StorefrontQueueRecord { +export interface StorefrontRecord { piece: PieceLink content: UnknownLink // TODO: Source } -export interface StorefrontRecord { +export interface AggregatorRecord { piece: PieceLink - content: UnknownLink + space: string + group: string } -export interface AggregatorQueueRecord { +export interface BrokerRecord { piece: PieceLink + offer: PieceLink[] + deal: DealConfig } // Errors diff --git a/packages/filecoin-api/test/services/aggregator.js b/packages/filecoin-api/test/services/aggregator.js index d2833ba6b..06e186bdb 100644 --- a/packages/filecoin-api/test/services/aggregator.js +++ b/packages/filecoin-api/test/services/aggregator.js @@ -8,8 +8,8 @@ import { createServer, connect } from '../../src/aggregator.js' /** * @type {API.Tests - * pieceStore: API.TestStore + * addQueue: API.TestQueue + * pieceStore: API.TestStore * }>} */ export const test = { @@ -22,7 +22,8 @@ export const test = { // Generate piece for test const [cargo] = await randomCargo(1, 128) - const group = storefront.did() + const space = storefront.did() + const group = 'did:web:free.web3.storage' // storefront invocation const pieceAddInv = Filecoin.pieceAdd.invoke({ @@ -31,6 +32,7 @@ export const test = { with: storefront.did(), nb: { piece: cargo.link.link(), + space, group, }, }) @@ -50,6 +52,7 @@ export const test = { with: context.id.did(), nb: { piece: cargo.link.link(), + space, group, }, }) @@ -77,7 +80,8 @@ export const test = { // Generate piece for test const [cargo] = await randomCargo(1, 128) - const group = storefront.did() + const space = storefront.did() + const group = 'did:web:free.web3.storage' // aggregator invocation const pieceAddInv = Filecoin.pieceAdd.invoke({ @@ -86,6 +90,7 @@ export const test = { with: context.id.did(), nb: { piece: cargo.link.link(), + space, group, }, }) @@ -114,7 +119,8 @@ export const test = { // Generate piece for test const [cargo] = await randomCargo(1, 128) - const group = storefront.did() + const space = storefront.did() + const group = 'did:web:free.web3.storage' // aggregator invocation const pieceAddInv = Filecoin.pieceAdd.invoke({ @@ -123,6 +129,7 @@ export const test = { with: context.id.did(), nb: { piece: cargo.link.link(), + space, group, }, }) diff --git a/packages/filecoin-api/test/services/broker.js b/packages/filecoin-api/test/services/broker.js index 1dfdb1ca1..e020d8344 100644 --- a/packages/filecoin-api/test/services/broker.js +++ b/packages/filecoin-api/test/services/broker.js @@ -9,8 +9,8 @@ import { createServer, connect } from '../../src/broker.js' /** * @type {API.Tests - * offerStore: API.TestStore + * addQueue: API.TestQueue + * offerStore: API.TestStore * }>} */ export const test = { diff --git a/packages/filecoin-api/test/services/storefront.js b/packages/filecoin-api/test/services/storefront.js index 0d443c535..cf90d1db3 100644 --- a/packages/filecoin-api/test/services/storefront.js +++ b/packages/filecoin-api/test/services/storefront.js @@ -8,7 +8,7 @@ import { createServer, connect } from '../../src/storefront.js' /** * @type {API.Tests + * addQueue: API.TestQueue * pieceStore: API.TestStore * }>} */ diff --git a/packages/filecoin-client/src/aggregator.js b/packages/filecoin-client/src/aggregator.js index dd36ae14a..23011ff64 100644 --- a/packages/filecoin-client/src/aggregator.js +++ b/packages/filecoin-client/src/aggregator.js @@ -24,12 +24,14 @@ export const connection = connect({ * * @param {import('./types.js').InvocationConfig} conf - Configuration * @param {import('@web3-storage/data-segment').PieceLink} piece + * @param {string} space * @param {string} group * @param {import('./types.js').RequestOptions} [options] */ export async function pieceAdd( { issuer, with: resource, proofs, audience }, piece, + space, group, options = {} ) { @@ -43,6 +45,7 @@ export async function pieceAdd( with: resource, nb: { group, + space, piece, }, proofs, diff --git a/packages/filecoin-client/test/aggregator.test.js b/packages/filecoin-client/test/aggregator.test.js index 33b59fa5d..f91735409 100644 --- a/packages/filecoin-client/test/aggregator.test.js +++ b/packages/filecoin-client/test/aggregator.test.js @@ -18,7 +18,8 @@ describe('piece.add', () => { // Generate cargo to add const [cargo] = await randomCargo(1, 100) - const group = 'group' + const space = storefront.did() + const group = 'did:web:free.web3.storage' /** @type {import('@web3-storage/capabilities/types').PieceAddSuccess} */ const pieceAddResponse = { @@ -67,6 +68,7 @@ describe('piece.add', () => { audience: aggregatorService, }, cargo.link.link(), + space, group, { connection: getConnection(service).connection } ) @@ -78,9 +80,12 @@ describe('piece.add', () => { }) it('aggregator self invokes add a filecoin piece to accept the piece queued', async () => { + const { storefront } = await getContext() + // Generate cargo to add const [cargo] = await randomCargo(1, 100) - const group = 'group' + const space = storefront.did() + const group = 'did:web:free.web3.storage' /** @type {import('@web3-storage/capabilities/types').PieceAddSuccess} */ const pieceAddResponse = { @@ -118,6 +123,7 @@ describe('piece.add', () => { audience: aggregatorService, }, cargo.link.link(), + space, group, { connection: getConnection(service).connection } ) @@ -129,9 +135,12 @@ describe('piece.add', () => { }) it('aggregator self invokes add a filecoin piece to reject the piece queued', async () => { + const { storefront } = await getContext() + // Generate cargo to add const [cargo] = await randomCargo(1, 100) - const group = 'group' + const space = storefront.did() + const group = 'did:web:free.web3.storage' /** @type {import('@web3-storage/capabilities/types').PieceAddFailure} */ const pieceAddResponse = new OperationFailed( @@ -171,6 +180,7 @@ describe('piece.add', () => { audience: aggregatorService, }, cargo.link.link(), + space, group, { connection: getConnection(service).connection } )