Skip to content

Commit

Permalink
fix: add space to piece record
Browse files Browse the repository at this point in the history
  • Loading branch information
vasco-santos committed Aug 3, 2023
1 parent d2bc770 commit 5b15429
Show file tree
Hide file tree
Showing 8 changed files with 61 additions and 30 deletions.
6 changes: 5 additions & 1 deletion packages/capabilities/src/filecoin.js
Original file line number Diff line number Diff line change
Expand Up @@ -83,7 +83,11 @@ export const pieceAdd = capability({
})
),
/**
* CID of the content that resulted in Filecoin piece.
* Space where the piece is aggregated
*/
space: Schema.text(),
/**
* Grouping for the piece to be aggregated
*/
group: Schema.text(),
}),
Expand Down
21 changes: 12 additions & 9 deletions packages/filecoin-api/src/aggregator.js
Original file line number Diff line number Diff line change
Expand Up @@ -12,28 +12,28 @@ import { QueueOperationFailed, StoreOperationFailed } from './errors.js'
* @returns {Promise<API.UcantoInterface.Result<API.PieceAddSuccess, API.PieceAddFailure> | API.UcantoInterface.JoinBuilder<API.PieceAddSuccess>>}
*/
export const claim = async ({ capability }, context) => {
const { piece, group } = capability.nb
const { piece, space, group } = capability.nb
// Check if self signed to call queue handler
if (context.id.did() === capability.with) {
return queueHandler(piece, group, context)
return queueHandler(piece, space, group, context)
}

return queueAdd(piece, group, context)
return queueAdd(piece, space, group, context)
}

/**
* @param {import('@web3-storage/data-segment').PieceLink} piece
* @param {string} space
* @param {string} group
* @param {API.AggregatorServiceContext} context
* @returns {Promise<API.UcantoInterface.Result<API.PieceAddSuccess, API.PieceAddFailure> | API.UcantoInterface.JoinBuilder<API.PieceAddSuccess>>}
*/
async function queueAdd(piece, group, context) {
async function queueAdd(piece, space, group, context) {
const queued = await context.addQueue.add(
{
piece,
},
{
group,
space,
group
}
)
if (queued.error) {
Expand All @@ -50,6 +50,7 @@ async function queueAdd(piece, group, context) {
with: context.id.did(),
nb: {
piece,
space,
group,
},
})
Expand All @@ -63,14 +64,16 @@ async function queueAdd(piece, group, context) {

/**
* @param {import('@web3-storage/data-segment').PieceLink} piece
* @param {string} space
* @param {string} group
* @param {API.AggregatorServiceContext} context
* @returns {Promise<API.UcantoInterface.Result<API.PieceAddSuccess, API.PieceAddFailure> | API.UcantoInterface.JoinBuilder<API.PieceAddSuccess>>}
*/
async function queueHandler(piece, group, context) {
async function queueHandler(piece, space, group, context) {
const put = await context.pieceStore.put({
piece,
// TODO
space,
group
})

if (put.error) {
Expand Down
22 changes: 13 additions & 9 deletions packages/filecoin-api/src/types.ts
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ import type {
import type { ProviderInput } from '@ucanto/server'
import { PieceLink } from '@web3-storage/data-segment'
import { UnknownLink } from '@ucanto/interface'
import { DealConfig } from '@web3-storage/filecoin-client/types'

export * as UcantoInterface from '@ucanto/interface'
export * from '@web3-storage/filecoin-client/types'
Expand All @@ -27,41 +28,44 @@ export interface Store<Record> {
// Services
export interface StorefrontServiceContext {
id: Signer
addQueue: Queue<StorefrontQueueRecord>
addQueue: Queue<StorefrontRecord>
pieceStore: Store<StorefrontRecord>
aggregatorDid: string
aggregatorUrl: string
}

export interface AggregatorServiceContext {
id: Signer
addQueue: Queue<AggregatorQueueRecord>
pieceStore: Store<any>
addQueue: Queue<AggregatorRecord>
pieceStore: Store<AggregatorRecord>
brokerDid: string
brokerUrl: string
}

export interface BrokerServiceContext {
id: Signer
addQueue: Queue<any>
offerStore: Store<any>
addQueue: Queue<BrokerRecord>
offerStore: Store<BrokerRecord>
}

// Service Types

export interface StorefrontQueueRecord {
export interface StorefrontRecord {
piece: PieceLink
content: UnknownLink
// TODO: Source
}

export interface StorefrontRecord {
export interface AggregatorRecord {
piece: PieceLink
content: UnknownLink
space: string
group: string
}

export interface AggregatorQueueRecord {
export interface BrokerRecord {
piece: PieceLink
offer: PieceLink[]
deal: DealConfig
}

// Errors
Expand Down
17 changes: 12 additions & 5 deletions packages/filecoin-api/test/services/aggregator.js
Original file line number Diff line number Diff line change
Expand Up @@ -8,8 +8,8 @@ import { createServer, connect } from '../../src/aggregator.js'

/**
* @type {API.Tests<API.AggregatorServiceContext & {
* addQueue: API.TestQueue<API.AggregatorQueueRecord>
* pieceStore: API.TestStore<any>
* addQueue: API.TestQueue<API.AggregatorRecord>
* pieceStore: API.TestStore<API.AggregatorRecord>
* }>}
*/
export const test = {
Expand All @@ -22,7 +22,8 @@ export const test = {

// Generate piece for test
const [cargo] = await randomCargo(1, 128)
const group = storefront.did()
const space = storefront.did()
const group = 'did:web:free.web3.storage'

// storefront invocation
const pieceAddInv = Filecoin.pieceAdd.invoke({
Expand All @@ -31,6 +32,7 @@ export const test = {
with: storefront.did(),
nb: {
piece: cargo.link.link(),
space,
group,
},
})
Expand All @@ -50,6 +52,7 @@ export const test = {
with: context.id.did(),
nb: {
piece: cargo.link.link(),
space,
group,
},
})
Expand Down Expand Up @@ -77,7 +80,8 @@ export const test = {

// Generate piece for test
const [cargo] = await randomCargo(1, 128)
const group = storefront.did()
const space = storefront.did()
const group = 'did:web:free.web3.storage'

// aggregator invocation
const pieceAddInv = Filecoin.pieceAdd.invoke({
Expand All @@ -86,6 +90,7 @@ export const test = {
with: context.id.did(),
nb: {
piece: cargo.link.link(),
space,
group,
},
})
Expand Down Expand Up @@ -114,7 +119,8 @@ export const test = {

// Generate piece for test
const [cargo] = await randomCargo(1, 128)
const group = storefront.did()
const space = storefront.did()
const group = 'did:web:free.web3.storage'

// aggregator invocation
const pieceAddInv = Filecoin.pieceAdd.invoke({
Expand All @@ -123,6 +129,7 @@ export const test = {
with: context.id.did(),
nb: {
piece: cargo.link.link(),
space,
group,
},
})
Expand Down
4 changes: 2 additions & 2 deletions packages/filecoin-api/test/services/broker.js
Original file line number Diff line number Diff line change
Expand Up @@ -9,8 +9,8 @@ import { createServer, connect } from '../../src/broker.js'

/**
* @type {API.Tests<API.BrokerServiceContext & {
* addQueue: API.TestQueue<any>
* offerStore: API.TestStore<any>
* addQueue: API.TestQueue<API.BrokerRecord>
* offerStore: API.TestStore<API.BrokerRecord>
* }>}
*/
export const test = {
Expand Down
2 changes: 1 addition & 1 deletion packages/filecoin-api/test/services/storefront.js
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@ import { createServer, connect } from '../../src/storefront.js'

/**
* @type {API.Tests<API.StorefrontServiceContext & {
* addQueue: API.TestQueue<API.StorefrontQueueRecord>
* addQueue: API.TestQueue<API.StorefrontRecord>
* pieceStore: API.TestStore<API.StorefrontRecord>
* }>}
*/
Expand Down
3 changes: 3 additions & 0 deletions packages/filecoin-client/src/aggregator.js
Original file line number Diff line number Diff line change
Expand Up @@ -24,12 +24,14 @@ export const connection = connect({
*
* @param {import('./types.js').InvocationConfig} conf - Configuration
* @param {import('@web3-storage/data-segment').PieceLink} piece
* @param {string} space
* @param {string} group
* @param {import('./types.js').RequestOptions} [options]
*/
export async function pieceAdd(
{ issuer, with: resource, proofs, audience },
piece,
space,
group,
options = {}
) {
Expand All @@ -43,6 +45,7 @@ export async function pieceAdd(
with: resource,
nb: {
group,
space,
piece,
},
proofs,
Expand Down
16 changes: 13 additions & 3 deletions packages/filecoin-client/test/aggregator.test.js
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,8 @@ describe('piece.add', () => {

// Generate cargo to add
const [cargo] = await randomCargo(1, 100)
const group = 'group'
const space = storefront.did()
const group = 'did:web:free.web3.storage'

/** @type {import('@web3-storage/capabilities/types').PieceAddSuccess} */
const pieceAddResponse = {
Expand Down Expand Up @@ -67,6 +68,7 @@ describe('piece.add', () => {
audience: aggregatorService,
},
cargo.link.link(),
space,
group,
{ connection: getConnection(service).connection }
)
Expand All @@ -78,9 +80,12 @@ describe('piece.add', () => {
})

it('aggregator self invokes add a filecoin piece to accept the piece queued', async () => {
const { storefront } = await getContext()

// Generate cargo to add
const [cargo] = await randomCargo(1, 100)
const group = 'group'
const space = storefront.did()
const group = 'did:web:free.web3.storage'

/** @type {import('@web3-storage/capabilities/types').PieceAddSuccess} */
const pieceAddResponse = {
Expand Down Expand Up @@ -118,6 +123,7 @@ describe('piece.add', () => {
audience: aggregatorService,
},
cargo.link.link(),
space,
group,
{ connection: getConnection(service).connection }
)
Expand All @@ -129,9 +135,12 @@ describe('piece.add', () => {
})

it('aggregator self invokes add a filecoin piece to reject the piece queued', async () => {
const { storefront } = await getContext()

// Generate cargo to add
const [cargo] = await randomCargo(1, 100)
const group = 'group'
const space = storefront.did()
const group = 'did:web:free.web3.storage'

/** @type {import('@web3-storage/capabilities/types').PieceAddFailure} */
const pieceAddResponse = new OperationFailed(
Expand Down Expand Up @@ -171,6 +180,7 @@ describe('piece.add', () => {
audience: aggregatorService,
},
cargo.link.link(),
space,
group,
{ connection: getConnection(service).connection }
)
Expand Down

0 comments on commit 5b15429

Please sign in to comment.