Skip to content

Commit

Permalink
feat(filecoin-api): paginated queries (#1521)
Browse files Browse the repository at this point in the history
This PR updates filecoin-api to add pagination to the query interfaces.
Currently only the first page of results is returned.

This means in the filecoin pipeline cron jobs we never process more than
1 page of items, even though there are sometimes many more items to
process and ample execution time remaining in the lambda.

If only the first page of items is processed it gives a false sense of
everything operating OK, when actually things may be backing up. It
essentially masks the issue. If the lambda times out because there
are too many items to process then this is a good indication that it
needs some attention.

Essentially, the interface changes from:

```ts
interface Queryable<Q, T> {
  query (q: Q): Promise<Result<T[]>>
}
```

To this:

```ts
interface Pageable {
  cursor?: string
  size?: number
}

interface ListSuccess<T> {
  results: T[]
  cursor?: string
}

interface Queryable<Q, T> {
  query (q: Q, options?: Pageable): Promise<Result<ListSuccess<T>>>
}
```

Context: we currently have a _lot_ of `filecoin/accept` receipts we need
to issue that are being done in a cron job.
  • Loading branch information
alanshaw committed Jul 23, 2024
1 parent 9421a2e commit 25ed7d7
Show file tree
Hide file tree
Showing 21 changed files with 775 additions and 754 deletions.
4 changes: 3 additions & 1 deletion packages/filecoin-api/package.json
Original file line number Diff line number Diff line change
Expand Up @@ -149,7 +149,8 @@
"check": "tsc --build",
"lint": "tsc --build && eslint '**/*.{js,ts}'",
"test": "mocha --bail --timeout 10s -n no-warnings -n experimental-vm-modules -n experimental-fetch test/**/*.spec.js",
"test-watch": "pnpm build && mocha --bail --timeout 10s --watch --parallel -n no-warnings -n experimental-vm-modules -n experimental-fetch --watch-files src,test"
"test-watch": "pnpm build && mocha --bail --timeout 10s --watch --parallel -n no-warnings -n experimental-vm-modules -n experimental-fetch --watch-files src,test",
"coverage": "c8 -r text -r html npm run test"
},
"dependencies": {
"@ipld/dag-ucan": "^3.4.0",
Expand All @@ -172,6 +173,7 @@
"@web-std/blob": "^3.0.5",
"@web3-storage/eslint-config-w3up": "workspace:^",
"@web3-storage/filecoin-client": "workspace:^",
"c8": "^10.1.2",
"mocha": "^10.2.0",
"multiformats": "^12.1.2",
"one-webcrypto": "git://github.com/web3-storage/one-webcrypto",
Expand Down
10 changes: 4 additions & 6 deletions packages/filecoin-api/src/aggregator/api.ts
Original file line number Diff line number Diff line change
Expand Up @@ -13,17 +13,15 @@ import {
ServiceConfig,
} from '../types.js'

export type PieceStore = UpdatableStore<PieceRecordKey, PieceRecord>
export type PieceStore = Store<PieceRecordKey, PieceRecord> &
UpdatableStore<PieceRecordKey, PieceRecord>
export type PieceQueue = Queue<PieceMessage>
export type BufferQueue = Queue<BufferMessage>
export type BufferStore = Store<Link, BufferRecord>
export type AggregateStore = Store<AggregateRecordKey, AggregateRecord>
export type PieceAcceptQueue = Queue<PieceAcceptMessage>
export type InclusionStore = QueryableStore<
InclusionRecordKey,
InclusionRecord,
InclusionRecordQueryByGroup
>
export type InclusionStore = Store<InclusionRecordKey, InclusionRecord> &
QueryableStore<InclusionRecordQueryByGroup, InclusionRecord>
export type AggregateOfferQueue = Queue<AggregateOfferMessage>

export interface ServiceContext {
Expand Down
4 changes: 2 additions & 2 deletions packages/filecoin-api/src/aggregator/service.js
Original file line number Diff line number Diff line change
Expand Up @@ -82,7 +82,7 @@ export const pieceAccept = async ({ capability }, context) => {
error: new StoreOperationFailed(getInclusionRes.error?.message),
}
}
if (!getInclusionRes.ok.length) {
if (!getInclusionRes.ok.results.length) {
return {
error: new UnexpectedState(
`no inclusion proof found for pair {${piece}, ${group}}`
Expand All @@ -91,7 +91,7 @@ export const pieceAccept = async ({ capability }, context) => {
}

// Get buffered pieces
const [{ aggregate, inclusion }] = getInclusionRes.ok
const [{ aggregate, inclusion }] = getInclusionRes.ok.results
const getAggregateRes = await context.aggregateStore.get({ aggregate })
if (getAggregateRes.error) {
return {
Expand Down
9 changes: 3 additions & 6 deletions packages/filecoin-api/src/deal-tracker/api.ts
Original file line number Diff line number Diff line change
@@ -1,12 +1,9 @@
import type { Signer } from '@ucanto/interface'
import { PieceLink } from '@web3-storage/data-segment'
import { QueryableStore } from '../types.js'
import { Store, QueryableStore } from '../types.js'

export type DealStore = QueryableStore<
DealRecordKey,
DealRecord,
DealRecordQueryByPiece
>
export type DealStore = Store<DealRecordKey, DealRecord> &
QueryableStore<DealRecordQueryByPiece, DealRecord>

export interface ServiceContext {
/**
Expand Down
18 changes: 12 additions & 6 deletions packages/filecoin-api/src/deal-tracker/service.js
Original file line number Diff line number Diff line change
Expand Up @@ -19,16 +19,22 @@ import { StoreOperationFailed } from '../errors.js'
export const dealInfo = async ({ capability }, context) => {
const { piece } = capability.nb

const storeGet = await context.dealStore.query({ piece })
if (storeGet.error) {
return {
error: new StoreOperationFailed(storeGet.error.message),
const records = []
/** @type {string|undefined} */
let cursor
do {
const storeQuery = await context.dealStore.query({ piece }, { cursor })
if (storeQuery.error) {
return { error: new StoreOperationFailed(storeQuery.error.message) }
}
}

records.push(...storeQuery.ok.results)
cursor = storeQuery.ok.cursor
} while (cursor)

return {
ok: {
deals: storeGet.ok.reduce((acc, curr) => {
deals: records.reduce((acc, curr) => {
acc[`${curr.dealId}`] = {
provider: curr.provider,
}
Expand Down
14 changes: 7 additions & 7 deletions packages/filecoin-api/src/dealer/api.ts
Original file line number Diff line number Diff line change
Expand Up @@ -5,17 +5,17 @@ import {
DealTrackerService,
} from '@web3-storage/filecoin-client/types'
import {
Store,
UpdatableStore,
UpdatableAndQueryableStore,
QueryableStore,
ServiceConfig,
} from '../types.js'

export type OfferStore<OfferDoc> = UpdatableStore<string, OfferDoc>
export type AggregateStore = UpdatableAndQueryableStore<
AggregateRecordKey,
AggregateRecord,
Pick<AggregateRecord, 'status'>
>
export type OfferStore<OfferDoc> = Store<string, OfferDoc> &
UpdatableStore<string, OfferDoc>
export type AggregateStore = Store<AggregateRecordKey, AggregateRecord> &
UpdatableStore<AggregateRecordKey, AggregateRecord> &
QueryableStore<Pick<AggregateRecord, 'status'>, AggregateRecord>

export interface ServiceContext<OfferDoc = OfferDocument> {
id: Signer
Expand Down
75 changes: 45 additions & 30 deletions packages/filecoin-api/src/dealer/events.js
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,9 @@ import { StoreOperationFailed } from '../errors.js'
* @typedef {import('./api.js').AggregateRecordKey} AggregateRecordKey
*/

/** Max items per page of query. */
const MAX_PAGE_SIZE = 20

/**
* On aggregate insert event, update offer key with date to be retrievable by broker.
*
Expand Down Expand Up @@ -55,45 +58,57 @@ export const handleAggregateUpdatedStatus = async (context, record) => {
* @param {import('./api.js').CronContext} context
*/
export const handleCronTick = async (context) => {
// Get offered deals pending approval/rejection
const offeredDeals = await context.aggregateStore.query({
status: 'offered',
})
if (offeredDeals.error) {
return {
error: offeredDeals.error,
let totalDealsCount = 0
let updatedDealsCount = 0
/** @type {string|undefined} */
let cursor
do {
// Get offered deals pending approval/rejection
const offeredDeals = await context.aggregateStore.query(
{
status: 'offered',
},
{ cursor, size: MAX_PAGE_SIZE }
)
if (offeredDeals.error) {
return {
error: offeredDeals.error,
}
}
}

// Update approved deals from the ones resolved
const updatedResponses = await Promise.all(
offeredDeals.ok.map((deal) =>
updateApprovedDeals({
deal,
aggregateStore: context.aggregateStore,
dealTrackerServiceConnection: context.dealTrackerService.connection,
dealTrackerInvocationConfig:
context.dealTrackerService.invocationConfig,
})
totalDealsCount += offeredDeals.ok.results.length

// Update approved deals from the ones resolved
const updatedResponses = await Promise.all(
offeredDeals.ok.results.map((deal) =>
updateApprovedDeals({
deal,
aggregateStore: context.aggregateStore,
dealTrackerServiceConnection: context.dealTrackerService.connection,
dealTrackerInvocationConfig:
context.dealTrackerService.invocationConfig,
})
)
)
)

// Fail if one or more update operations did not succeed.
// The successful ones are still valid, but we should keep track of errors for monitoring/alerting.
const updateErrorResponse = updatedResponses.find((r) => r.error)
if (updateErrorResponse) {
return {
error: updateErrorResponse.error,
// Fail if one or more update operations did not succeed.
// The successful ones are still valid, but we should keep track of errors for monitoring/alerting.
const updateErrorResponse = updatedResponses.find((r) => r.error)
if (updateErrorResponse) {
return {
error: updateErrorResponse.error,
}
}
}

updatedDealsCount += updatedResponses.filter((r) => r.ok?.updated).length
cursor = offeredDeals.ok.cursor
} while (cursor)

// Return successful update operation
// Include in response the ones that were Updated, and the ones still pending response.
const updatedDealsCount = updatedResponses.filter((r) => r.ok?.updated).length
return {
ok: {
updatedCount: updatedDealsCount,
pendingCount: updatedResponses.length - updatedDealsCount,
pendingCount: totalDealsCount - updatedDealsCount,
},
}
}
Expand All @@ -103,7 +118,7 @@ export const handleCronTick = async (context) => {
*
* @param {object} context
* @param {AggregateRecord} context.deal
* @param {import('../types.js').UpdatableAndQueryableStore<AggregateRecordKey, AggregateRecord, Pick<AggregateRecord, 'status'>>} context.aggregateStore
* @param {import('../types.js').UpdatableStore<AggregateRecordKey, AggregateRecord>} context.aggregateStore
* @param {import('@ucanto/interface').ConnectionView<any>} context.dealTrackerServiceConnection
* @param {import('@web3-storage/filecoin-client/types').InvocationConfig} context.dealTrackerInvocationConfig
*/
Expand Down
11 changes: 5 additions & 6 deletions packages/filecoin-api/src/storefront/api.ts
Original file line number Diff line number Diff line change
Expand Up @@ -18,17 +18,16 @@ import {
} from '@web3-storage/filecoin-client/types'
import {
Store,
UpdatableAndQueryableStore,
UpdatableStore,
QueryableStore,
Queue,
ServiceConfig,
StoreGetError,
} from '../types.js'

export type PieceStore = UpdatableAndQueryableStore<
PieceRecordKey,
PieceRecord,
Pick<PieceRecord, 'status'>
>
export type PieceStore = Store<PieceRecordKey, PieceRecord> &
UpdatableStore<PieceRecordKey, PieceRecord> &
QueryableStore<Pick<PieceRecord, 'status'>, PieceRecord>
export type FilecoinSubmitQueue = Queue<FilecoinSubmitMessage>
export type PieceOfferQueue = Queue<PieceOfferMessage>
export type TaskStore = Store<UnknownLink, Invocation>
Expand Down
82 changes: 46 additions & 36 deletions packages/filecoin-api/src/storefront/events.js
Original file line number Diff line number Diff line change
@@ -1,4 +1,3 @@
import pMap from 'p-map'
import { Storefront, Aggregator } from '@web3-storage/filecoin-client'
import * as AggregatorCaps from '@web3-storage/capabilities/filecoin/aggregator'
import { Assert } from '@web3-storage/content-claims/capability'
Expand All @@ -17,9 +16,12 @@ import {
/**
* @typedef {import('./api.js').PieceRecord} PieceRecord
* @typedef {import('./api.js').PieceRecordKey} PieceRecordKey
* @typedef {import('../types.js').UpdatableAndQueryableStore<PieceRecordKey, PieceRecord, Pick<PieceRecord, 'status'>>} PieceStore
* @typedef {import('./api.js').PieceStore} PieceStore
*/

/** Max items per page of query. */
const MAX_PAGE_SIZE = 20

/**
* On filecoin submit queue messages, validate piece for given content and store it in store.
*
Expand Down Expand Up @@ -185,49 +187,57 @@ export const handlePieceStatusUpdate = async (context, record) => {
* @param {import('./api.js').CronContext} context
*/
export const handleCronTick = async (context) => {
const submittedPieces = await context.pieceStore.query({
status: 'submitted',
})
if (submittedPieces.error) {
return {
error: submittedPieces.error,
}
}
// Update approved pieces from the ones resolved
const updatedResponses = await pMap(
submittedPieces.ok,
(pieceRecord) =>
updatePiecesWithDeal({
id: context.id,
aggregatorId: context.aggregatorId,
pieceRecord,
pieceStore: context.pieceStore,
taskStore: context.taskStore,
receiptStore: context.receiptStore,
}),
{
concurrency: 20,
let totalPiecesCount = 0
let updatedPiecesCount = 0
/** @type {string|undefined} */
let cursor
do {
const submittedPieces = await context.pieceStore.query(
{
status: 'submitted',
},
{ cursor, size: MAX_PAGE_SIZE }
)
if (submittedPieces.error) {
return {
error: submittedPieces.error,
}
}
)
totalPiecesCount += submittedPieces.ok.results.length

// Fail if one or more update operations did not succeed.
// The successful ones are still valid, but we should keep track of errors for monitoring/alerting.
const updateErrorResponse = updatedResponses.find((r) => r.error)
if (updateErrorResponse) {
return {
error: updateErrorResponse.error,
// Update approved pieces from the ones resolved
const updatedResponses = await Promise.all(
submittedPieces.ok.results.map((pieceRecord) =>
updatePiecesWithDeal({
id: context.id,
aggregatorId: context.aggregatorId,
pieceRecord,
pieceStore: context.pieceStore,
taskStore: context.taskStore,
receiptStore: context.receiptStore,
})
)
)

// Fail if one or more update operations did not succeed.
// The successful ones are still valid, but we should keep track of errors for monitoring/alerting.
const updateErrorResponse = updatedResponses.find((r) => r.error)
if (updateErrorResponse) {
return {
error: updateErrorResponse.error,
}
}
}

updatedPiecesCount += updatedResponses.filter((r) => r.ok?.updated).length
cursor = submittedPieces.ok.cursor
} while (cursor)

// Return successful update operation
// Include in response the ones that were Updated, and the ones still pending response.
const updatedPiecesCount = updatedResponses.filter(
(r) => r.ok?.updated
).length
return {
ok: {
updatedCount: updatedPiecesCount,
pendingCount: updatedResponses.length - updatedPiecesCount,
pendingCount: totalPiecesCount - updatedPiecesCount,
},
}
}
Expand Down
Loading

0 comments on commit 25ed7d7

Please sign in to comment.