diff --git a/package-lock.json b/package-lock.json index 08e7d4ea..0dc270f1 100644 --- a/package-lock.json +++ b/package-lock.json @@ -6078,6 +6078,135 @@ "@noble/hashes": "^1.0.0" } }, + "node_modules/@web3-storage/upload-api": { + "version": "9.1.4", + "resolved": "https://registry.npmjs.org/@web3-storage/upload-api/-/upload-api-9.1.4.tgz", + "integrity": "sha512-ctrM+pq5Z18JsDMVlhvzfBz4luwJ1b5l+Iv8wAz2wsYZQ3Iguky4Xtpfi1pcKP3uCCR6j9yg1Q9VT6PFrAUyEw==", + "dependencies": { + "@ucanto/client": "^9.0.1", + "@ucanto/interface": "^10.0.1", + "@ucanto/principal": "^9.0.1", + "@ucanto/server": "^10.0.0", + "@ucanto/transport": "^9.1.1", + "@ucanto/validator": "^9.0.2", + "@web3-storage/access": "^18.3.0", + "@web3-storage/capabilities": "^13.3.1", + "@web3-storage/content-claims": "^4.0.4", + "@web3-storage/did-mailto": "^2.1.0", + "@web3-storage/filecoin-api": "^4.6.1", + "multiformats": "^12.1.2", + "p-retry": "^5.1.2", + "uint8arrays": "^5.0.3" + }, + "engines": { + "node": ">=16.15" + } + }, + "node_modules/@web3-storage/upload-api/node_modules/@types/retry": { + "version": "0.12.1", + "resolved": "https://registry.npmjs.org/@types/retry/-/retry-0.12.1.tgz", + "integrity": "sha512-xoDlM2S4ortawSWORYqsdU+2rxdh4LRW9ytc3zmT37RIKQh6IHyKwwtKhKis9ah8ol07DCkZxPt8BBvPjC6v4g==" + }, + "node_modules/@web3-storage/upload-api/node_modules/@ucanto/core": { + "version": "10.0.1", + "resolved": "https://registry.npmjs.org/@ucanto/core/-/core-10.0.1.tgz", + "integrity": "sha512-1BfUaJu0/c9Rl/WdZSDbScJJLsPsPe1g4ynl5kubUj3xDD/lyp/Q12PQVQ2X7hDiWwkpwmxCkRMkOxwc70iNKQ==", + "dependencies": { + "@ipld/car": "^5.1.0", + "@ipld/dag-cbor": "^9.0.0", + "@ipld/dag-ucan": "^3.4.0", + "@ucanto/interface": "^10.0.1", + "multiformats": "^11.0.2" + } + }, + "node_modules/@web3-storage/upload-api/node_modules/@ucanto/core/node_modules/multiformats": { + "version": "11.0.2", + "resolved": "https://registry.npmjs.org/multiformats/-/multiformats-11.0.2.tgz", + "integrity": "sha512-b5mYMkOkARIuVZCpvijFj9a6m5wMVLC7cf/jIPd5D/ARDOfLC5+IFkbgDXQgcU2goIsTD/O9NY4DI/Mt4OGvlg==", + "engines": { + "node": ">=16.0.0", + "npm": ">=7.0.0" + } + }, + "node_modules/@web3-storage/upload-api/node_modules/@ucanto/interface": { + "version": "10.0.1", + "resolved": "https://registry.npmjs.org/@ucanto/interface/-/interface-10.0.1.tgz", + "integrity": "sha512-+Vr/N4mLsdynV9/bqtdFiq7WsUf3265/Qx2aHJmPtXo9/QvWKthJtpe0g8U4NWkWpVfqIFvyAO2db6D9zWQfQw==", + "dependencies": { + "@ipld/dag-ucan": "^3.4.0", + "multiformats": "^11.0.2" + } + }, + "node_modules/@web3-storage/upload-api/node_modules/@ucanto/interface/node_modules/multiformats": { + "version": "11.0.2", + "resolved": "https://registry.npmjs.org/multiformats/-/multiformats-11.0.2.tgz", + "integrity": "sha512-b5mYMkOkARIuVZCpvijFj9a6m5wMVLC7cf/jIPd5D/ARDOfLC5+IFkbgDXQgcU2goIsTD/O9NY4DI/Mt4OGvlg==", + "engines": { + "node": ">=16.0.0", + "npm": ">=7.0.0" + } + }, + "node_modules/@web3-storage/upload-api/node_modules/@ucanto/server": { + "version": "10.0.0", + "resolved": "https://registry.npmjs.org/@ucanto/server/-/server-10.0.0.tgz", + "integrity": "sha512-JMDMT3tFRE0S1cdtx/Hhh7v9FizV6IS0fPrh6pcli7AzKvXVy8Xu6EQ/66Fax4AQM2tkGxNNxjj2wHM7P4CqAg==", + "dependencies": { + "@ucanto/core": "^10.0.0", + "@ucanto/interface": "^10.0.0", + "@ucanto/principal": "^9.0.0", + "@ucanto/validator": "^9.0.1" + } + }, + "node_modules/@web3-storage/upload-api/node_modules/@web3-storage/content-claims": { + "version": "4.0.4", + "resolved": 
"https://registry.npmjs.org/@web3-storage/content-claims/-/content-claims-4.0.4.tgz", + "integrity": "sha512-zt5psR3SkLbPPHzGzhFXYSJEssDl/ELYbNhEez+tNZLZiagv3Vl0RSt+x3CFFgR5ovO6Zn+pLJJcMjpMiHw0Yw==", + "dependencies": { + "@ucanto/client": "^9.0.1", + "@ucanto/interface": "^10.0.0", + "@ucanto/server": "^10.0.0", + "@ucanto/transport": "^9.1.1", + "carstream": "^1.0.2", + "multiformats": "^12.0.1" + } + }, + "node_modules/@web3-storage/upload-api/node_modules/multiformats": { + "version": "12.1.3", + "resolved": "https://registry.npmjs.org/multiformats/-/multiformats-12.1.3.tgz", + "integrity": "sha512-eajQ/ZH7qXZQR2AgtfpmSMizQzmyYVmCql7pdhldPuYQi4atACekbJaQplk6dWyIi10jCaFnd6pqvcEFXjbaJw==", + "engines": { + "node": ">=16.0.0", + "npm": ">=7.0.0" + } + }, + "node_modules/@web3-storage/upload-api/node_modules/p-retry": { + "version": "5.1.2", + "resolved": "https://registry.npmjs.org/p-retry/-/p-retry-5.1.2.tgz", + "integrity": "sha512-couX95waDu98NfNZV+i/iLt+fdVxmI7CbrrdC2uDWfPdUAApyxT4wmDlyOtR5KtTDmkDO0zDScDjDou9YHhd9g==", + "dependencies": { + "@types/retry": "0.12.1", + "retry": "^0.13.1" + }, + "engines": { + "node": "^12.20.0 || ^14.13.1 || >=16.0.0" + }, + "funding": { + "url": "https://github.com/sponsors/sindresorhus" + } + }, + "node_modules/@web3-storage/upload-api/node_modules/uint8arrays": { + "version": "5.0.3", + "resolved": "https://registry.npmjs.org/uint8arrays/-/uint8arrays-5.0.3.tgz", + "integrity": "sha512-6LBuKji28kHjgPJMkQ6GDaBb1lRwIhyOYq6pDGwYMoDPfImE9SkuYENVmR0yu9yGgs2clHUSY9fKDukR+AXfqQ==", + "dependencies": { + "multiformats": "^13.0.0" + } + }, + "node_modules/@web3-storage/upload-api/node_modules/uint8arrays/node_modules/multiformats": { + "version": "13.1.0", + "resolved": "https://registry.npmjs.org/multiformats/-/multiformats-13.1.0.tgz", + "integrity": "sha512-HzdtdBwxsIkzpeXzhQ5mAhhuxcHbjEHH+JQoxt7hG/2HGFjjwyolLo7hbaexcnhoEuV4e0TNJ8kkpMjiEYY4VQ==" + }, "node_modules/@web3-storage/upload-client": { "version": "13.0.1", "resolved": "https://registry.npmjs.org/@web3-storage/upload-client/-/upload-client-13.0.1.tgz", @@ -18385,7 +18514,7 @@ "@web3-storage/access": "^18.3.0", "@web3-storage/capabilities": "^13.3.0", "@web3-storage/did-mailto": "^2.1.0", - "@web3-storage/upload-api": "9.0.1", + "@web3-storage/upload-api": "^9.1.4", "multiformats": "^13.1.0", "nanoid": "^5.0.2", "preact": "^10.14.1", @@ -18413,11 +18542,6 @@ "node": ">=16.15" } }, - "upload-api/node_modules/@types/retry": { - "version": "0.12.1", - "resolved": "https://registry.npmjs.org/@types/retry/-/retry-0.12.1.tgz", - "integrity": "sha512-xoDlM2S4ortawSWORYqsdU+2rxdh4LRW9ytc3zmT37RIKQh6IHyKwwtKhKis9ah8ol07DCkZxPt8BBvPjC6v4g==" - }, "upload-api/node_modules/@ucanto/core": { "version": "10.0.1", "resolved": "https://registry.npmjs.org/@ucanto/core/-/core-10.0.1.tgz", @@ -18487,52 +18611,6 @@ "web-streams-polyfill": "^3.1.1" } }, - "upload-api/node_modules/@web3-storage/upload-api": { - "version": "9.0.1", - "resolved": "https://registry.npmjs.org/@web3-storage/upload-api/-/upload-api-9.0.1.tgz", - "integrity": "sha512-DbxQPmdwu8awZbPoMnn44mJiO3UTV7OtIRDnW2GzwNKQDADMKFHnt0cUqsUrG7SiaGA80dOthnrLsIYIHpZ80A==", - "dependencies": { - "@ucanto/client": "^9.0.1", - "@ucanto/interface": "^10.0.1", - "@ucanto/principal": "^9.0.1", - "@ucanto/server": "^10.0.0", - "@ucanto/transport": "^9.1.1", - "@ucanto/validator": "^9.0.2", - "@web3-storage/access": "^18.2.0", - "@web3-storage/capabilities": "^13.2.1", - "@web3-storage/did-mailto": "^2.1.0", - "@web3-storage/filecoin-api": "^4.6.1", - "multiformats": 
"^12.1.2", - "p-retry": "^5.1.2" - }, - "engines": { - "node": ">=16.15" - } - }, - "upload-api/node_modules/@web3-storage/upload-api/node_modules/multiformats": { - "version": "12.1.3", - "resolved": "https://registry.npmjs.org/multiformats/-/multiformats-12.1.3.tgz", - "integrity": "sha512-eajQ/ZH7qXZQR2AgtfpmSMizQzmyYVmCql7pdhldPuYQi4atACekbJaQplk6dWyIi10jCaFnd6pqvcEFXjbaJw==", - "engines": { - "node": ">=16.0.0", - "npm": ">=7.0.0" - } - }, - "upload-api/node_modules/@web3-storage/upload-api/node_modules/p-retry": { - "version": "5.1.2", - "resolved": "https://registry.npmjs.org/p-retry/-/p-retry-5.1.2.tgz", - "integrity": "sha512-couX95waDu98NfNZV+i/iLt+fdVxmI7CbrrdC2uDWfPdUAApyxT4wmDlyOtR5KtTDmkDO0zDScDjDou9YHhd9g==", - "dependencies": { - "@types/retry": "0.12.1", - "retry": "^0.13.1" - }, - "engines": { - "node": "^12.20.0 || ^14.13.1 || >=16.0.0" - }, - "funding": { - "url": "https://github.com/sponsors/sindresorhus" - } - }, "upload-api/node_modules/nanoid": { "version": "5.0.6", "resolved": "https://registry.npmjs.org/nanoid/-/nanoid-5.0.6.tgz", diff --git a/stacks/upload-api-stack.js b/stacks/upload-api-stack.js index a46a0a19..5cace064 100644 --- a/stacks/upload-api-stack.js +++ b/stacks/upload-api-stack.js @@ -25,7 +25,7 @@ export function UploadApiStack({ stack, app }) { // Get references to constructs created in other stacks const { carparkBucket } = use(CarparkStack) - const { storeTable, uploadTable, delegationBucket, delegationTable, revocationTable, adminMetricsTable, spaceMetricsTable, consumerTable, subscriptionTable, rateLimitTable, pieceTable, privateKey } = use(UploadDbStack) + const { allocationTable, storeTable, uploadTable, delegationBucket, delegationTable, revocationTable, adminMetricsTable, spaceMetricsTable, consumerTable, subscriptionTable, rateLimitTable, pieceTable, privateKey } = use(UploadDbStack) const { invocationBucket, taskBucket, workflowBucket, ucanStream } = use(UcanInvocationStack) const { customerTable, spaceDiffTable, spaceSnapshotTable, stripeSecretKey } = use(BillingDbStack) const { pieceOfferQueue, filecoinSubmitQueue } = use(FilecoinStack) @@ -41,6 +41,7 @@ export function UploadApiStack({ stack, app }) { defaults: { function: { permissions: [ + allocationTable, storeTable, uploadTable, customerTable, @@ -66,6 +67,7 @@ export function UploadApiStack({ stack, app }) { environment: { DID: process.env.UPLOAD_API_DID ?? '', AGGREGATOR_DID, + ALLOCATION_TABLE_NAME: allocationTable.tableName, STORE_TABLE_NAME: storeTable.tableName, STORE_BUCKET_NAME: carparkBucket.bucketName, UPLOAD_TABLE_NAME: uploadTable.tableName, @@ -92,6 +94,7 @@ export function UploadApiStack({ stack, app }) { COMMIT: git.commmit, STAGE: stack.stage, ACCESS_SERVICE_URL: getServiceURL(stack) ?? '', + UPLOAD_SERVICE_URL: customDomain?.domainName ? `https://${customDomain?.domainName}` : '', POSTMARK_TOKEN: process.env.POSTMARK_TOKEN ?? '', PROVIDERS: process.env.PROVIDERS ?? '', R2_ACCESS_KEY_ID: process.env.R2_ACCESS_KEY_ID ?? 
'', diff --git a/stacks/upload-db-stack.js b/stacks/upload-db-stack.js index b7ee2b29..903ba7f3 100644 --- a/stacks/upload-db-stack.js +++ b/stacks/upload-db-stack.js @@ -1,6 +1,7 @@ import { Table, Bucket, Config } from 'sst/constructs' import { + allocationTableProps, storeTableProps, uploadTableProps, consumerTableProps, @@ -31,6 +32,12 @@ export function UploadDbStack({ stack, app }) { // TODO: we should look into creating a trust layer for content claims const contentClaimsPrivateKey = new Config.Secret(stack, 'CONTENT_CLAIMS_PRIVATE_KEY') + /** + * The allocation table tracks allocated multihashes per space. + * Used by the blob/* service capabilities. + */ + const allocationTable = new Table(stack, 'allocation', allocationTableProps) + /** * This table takes a stored CAR and makes an entry in the store table * Used by the store/* service capabilities. @@ -99,6 +106,7 @@ export function UploadDbStack({ stack, app }) { const spaceMetricsTable = new Table(stack, 'space-metrics', spaceMetricsTableProps) return { + allocationTable, storeTable, uploadTable, pieceTable, diff --git a/upload-api/functions/ucan-invocation-router.js b/upload-api/functions/ucan-invocation-router.js index 2d8635b4..6d4212ff 100644 --- a/upload-api/functions/ucan-invocation-router.js +++ b/upload-api/functions/ucan-invocation-router.js @@ -23,6 +23,10 @@ import { createUcantoServer } from '../service.js' import { Config } from 'sst/node/config' import { CAR, Legacy, Codec } from '@ucanto/transport' import { Email } from '../email.js' +import { createTasksStorage } from '../stores/tasks.js' +import { createReceiptsStorage } from '../stores/receipts.js' +import { createAllocationsStorage } from '../stores/allocations.js' +import { createBlobsStorage, composeblobStoragesWithOrderedHas } from '../stores/blobs.js' import { useProvisionStore } from '../stores/provisions.js' import { useSubscriptionsStore } from '../stores/subscriptions.js' import { createDelegationsTable } from '../tables/delegations.js' @@ -39,6 +43,7 @@ import { createSpaceDiffStore } from '@web3-storage/w3infra-billing/tables/space import { createSpaceSnapshotStore } from '@web3-storage/w3infra-billing/tables/space-snapshot.js' import { useUsageStore } from '../stores/usage.js' import { createStripeBillingProvider } from '../billing.js' +import { createTasksScheduler } from '../scheduler.js' Sentry.AWSLambda.init({ environment: process.env.SST_STAGE, @@ -99,6 +104,7 @@ export async function ucanInvocationRouter(request) { storeTableName, storeBucketName, uploadTableName, + allocationTableName, consumerTableName, customerTableName, subscriptionTableName, @@ -122,6 +128,7 @@ export async function ucanInvocationRouter(request) { aggregatorDid, dealTrackerDid, dealTrackerUrl, + uploadServiceURL, pieceOfferQueueUrl, filecoinSubmitQueueUrl, requirePaymentPlan, @@ -144,6 +151,22 @@ export async function ucanInvocationRouter(request) { const { PRIVATE_KEY, STRIPE_SECRET_KEY } = Config const serviceSigner = getServiceSigner({ did: UPLOAD_API_DID, privateKey: PRIVATE_KEY }) + const tasksStorage = createTasksStorage(AWS_REGION, invocationBucketName, workflowBucketName) + const receiptsStorage = createReceiptsStorage(AWS_REGION, taskBucketName, invocationBucketName, workflowBucketName) + const allocationsStorage = createAllocationsStorage(AWS_REGION, allocationTableName, { + endpoint: dbEndpoint, + }) + const blobsStorage = composeblobStoragesWithOrderedHas( + createBlobsStorage(R2_REGION, carparkBucketName, { + endpoint: carparkBucketEndpoint, + credentials: { 
+ accessKeyId: carparkBucketAccessKeyId, + secretAccessKey: carparkBucketSecretAccessKey, + }, + }), + createBlobsStorage(AWS_REGION, storeBucketName), + ) + const invocationBucket = createInvocationStore( AWS_REGION, invocationBucketName @@ -172,16 +195,29 @@ export async function ucanInvocationRouter(request) { const spaceSnapshotStore = createSpaceSnapshotStore({ region: AWS_REGION }, { tableName: spaceSnapshotTableName }) const usageStorage = useUsageStore({ spaceDiffStore, spaceSnapshotStore }) - const connection = getServiceConnection({ + const dealTrackerConnection = getServiceConnection({ did: dealTrackerDid, url: dealTrackerUrl }) + const selfConnection = getServiceConnection({ + did: serviceSigner.did(), + url: uploadServiceURL + }) + const tasksScheduler = createTasksScheduler(() => selfConnection) const server = createUcantoServer(serviceSigner, { codec, + allocationsStorage, + blobsStorage, + tasksStorage, + receiptsStorage, + tasksScheduler, + getServiceConnection: () => selfConnection, + // TODO: to be deprecated with `store/*` protocol storeTable: createStoreTable(AWS_REGION, storeTableName, { endpoint: dbEndpoint, }), + // TODO: to be deprecated with `store/*` protocol carStoreBucket: composeCarStoresWithOrderedHas( createCarStore(AWS_REGION, storeBucketName), createCarStore(R2_REGION, carparkBucketName, { @@ -192,6 +228,7 @@ export async function ucanInvocationRouter(request) { }, }), ), + // TODO: to be deprecated with `store/*` protocol dudewhereBucket: createDudewhereStore(R2_REGION, R2_DUDEWHERE_BUCKET_NAME, { endpoint: R2_ENDPOINT, credentials: { @@ -218,10 +255,10 @@ export async function ucanInvocationRouter(request) { pieceOfferQueue: createPieceOfferQueueClient({ region: AWS_REGION }, { queueUrl: pieceOfferQueueUrl }), filecoinSubmitQueue: createFilecoinSubmitQueueClient({ region: AWS_REGION }, { queueUrl: filecoinSubmitQueueUrl }), dealTrackerService: { - connection, + connection: dealTrackerConnection, invocationConfig: { issuer: serviceSigner, - audience: connection.id, + audience: dealTrackerConnection.id, with: serviceSigner.did() } }, @@ -316,6 +353,7 @@ function getLambdaEnv () { storeTableName: mustGetEnv('STORE_TABLE_NAME'), storeBucketName: mustGetEnv('STORE_BUCKET_NAME'), uploadTableName: mustGetEnv('UPLOAD_TABLE_NAME'), + allocationTableName: mustGetEnv('ALLOCATION_TABLE_NAME'), consumerTableName: mustGetEnv('CONSUMER_TABLE_NAME'), customerTableName: mustGetEnv('CUSTOMER_TABLE_NAME'), subscriptionTableName: mustGetEnv('SUBSCRIPTION_TABLE_NAME'), @@ -339,6 +377,7 @@ function getLambdaEnv () { postmarkToken: mustGetEnv('POSTMARK_TOKEN'), providers: mustGetEnv('PROVIDERS'), accessServiceURL: mustGetEnv('ACCESS_SERVICE_URL'), + uploadServiceURL: mustGetEnv('UPLOAD_SERVICE_URL'), aggregatorDid: mustGetEnv('AGGREGATOR_DID'), requirePaymentPlan: (process.env.REQUIRE_PAYMENT_PLAN === 'true'), dealTrackerDid: mustGetEnv('DEAL_TRACKER_DID'), diff --git a/upload-api/package.json b/upload-api/package.json index c6e57cd1..448a57fe 100644 --- a/upload-api/package.json +++ b/upload-api/package.json @@ -25,7 +25,7 @@ "@web3-storage/access": "^18.3.0", "@web3-storage/capabilities": "^13.3.0", "@web3-storage/did-mailto": "^2.1.0", - "@web3-storage/upload-api": "^9.0.1", + "@web3-storage/upload-api": "^9.1.4", "multiformats": "^13.1.0", "nanoid": "^5.0.2", "preact": "^10.14.1", @@ -65,7 +65,8 @@ "eslintConfig": { "rules": { "unicorn/consistent-destructuring": "off", - "unicorn/prefer-array-flat-map": "off" + "unicorn/prefer-array-flat-map": "off", + 
"unicorn/no-useless-undefined": "off" } } } diff --git a/upload-api/scheduler.js b/upload-api/scheduler.js new file mode 100644 index 00000000..9c103c84 --- /dev/null +++ b/upload-api/scheduler.js @@ -0,0 +1,43 @@ +/** + * @typedef {import('@web3-storage/upload-api/types').TasksScheduler} TasksSchedulerInterface + * @typedef {import('@web3-storage/upload-api/types').Service} Service + * @typedef {import('@ucanto/interface').ConnectionView} Connection + * @typedef {import('@ucanto/interface').ServiceInvocation} ServiceInvocation + * @typedef {import('@ucanto/interface').Failure} Failure + * @typedef {import('@ucanto/interface').Unit} Unit + * @typedef {import('@ucanto/interface').Result} Result + */ + +/** + * @param {() => Connection} getServiceConnection + */ +export const createTasksScheduler = (getServiceConnection) => new TasksScheduler(getServiceConnection) + +/** + * @implements {TasksSchedulerInterface} + */ +export class TasksScheduler { + /** + * + * @param {() => Connection} getServiceConnection + */ + constructor (getServiceConnection) { + this.getServiceConnection = getServiceConnection + } + + /** + * @param {ServiceInvocation} invocation + * @returns {Promise} + */ + async schedule(invocation) { + const connection = this.getServiceConnection() + const [res] = await connection.execute(invocation) + + if (res.out.error) { + return res.out + } + return { + ok: {}, + } + } +} diff --git a/upload-api/stores/allocations.js b/upload-api/stores/allocations.js new file mode 100644 index 00000000..c79e700e --- /dev/null +++ b/upload-api/stores/allocations.js @@ -0,0 +1,211 @@ +import { + DynamoDBClient, + GetItemCommand, + PutItemCommand, + QueryCommand, +} from '@aws-sdk/client-dynamodb' +import { marshall, unmarshall } from '@aws-sdk/util-dynamodb' +import { RecordKeyConflict, RecordNotFound } from '@web3-storage/upload-api/errors' +import { base58btc } from 'multiformats/bases/base58' +import * as Link from 'multiformats/link' + +/** + * @typedef {import('@web3-storage/upload-api/types').AllocationsStorage} AllocationsStorage + * @typedef {import('@web3-storage/upload-api/types').BlobAddInput} BlobAddInput + * @typedef {import('@web3-storage/upload-api/types').BlobListItem} BlobListItem + * @typedef {import('@web3-storage/upload-api/types').ListOptions} ListOptions + * @typedef {import('@web3-storage/upload-api/types').DID} DID + */ + +/** + * Abstraction layer to handle operations on Store Table. 
+ * + * @param {string} region + * @param {string} tableName + * @param {object} [options] + * @param {string} [options.endpoint] + * @returns {AllocationsStorage} + */ +export const createAllocationsStorage = (region, tableName, options = {}) => { + const dynamoDb = new DynamoDBClient({ + region, + endpoint: options.endpoint, + }) + + return useAllocationsStorage(dynamoDb, tableName) +} + +/** + * @param {DynamoDBClient} dynamoDb + * @param {string} tableName + * @returns {AllocationsStorage} + */ +export function useAllocationsStorage(dynamoDb, tableName) { + return { + /** + * Check if the given link CID is bound to the uploader account + * + * @param {import('@ucanto/interface').DID} space + * @param {Uint8Array} blobMultihash + * @returns {ReturnType} + */ + exists: async (space, blobMultihash) => { + const key = getKey(space, blobMultihash) + const cmd = new GetItemCommand({ + TableName: tableName, + Key: key, + AttributesToGet: ['space'], + }) + + try { + const response = await dynamoDb.send(cmd) + return { ok: Boolean(response.Item) } + } catch { + return { ok: false } + } + }, + /** + * @param {import('@web3-storage/upload-api').DID} space + * @param {Uint8Array} blobMultihash + * @returns {ReturnType} + */ + async get(space, blobMultihash) { + const key = getKey(space, blobMultihash) + const cmd = new GetItemCommand({ + TableName: tableName, + Key: key, + }) + + const response = await dynamoDb.send(cmd) + if (!response.Item) { + return { error: new RecordNotFound() } + } + + const raw = unmarshall(response.Item) + return { + ok: { + blob: { + digest: base58btc.decode(raw.multihash), + size: raw.size + }, + invocation: Link.parse(raw.invocation) + } + } + }, + /** + * Bind a link CID to an account + * + * @param {BlobAddInput} item + * @returns {ReturnType} + */ + insert: async ({ space, blob, invocation }) => { + const insertedAt = new Date().toISOString() + const multihash58btc = base58btc.encode(blob.digest) + + const item = { + space, + multihash: multihash58btc, + size: blob.size, + invocation: invocation.toString(), + insertedAt, + } + const cmd = new PutItemCommand({ + TableName: tableName, + Item: marshall(item, { removeUndefinedValues: true }), + ConditionExpression: 'attribute_not_exists(#S) AND attribute_not_exists(#M)', + ExpressionAttributeNames: { '#S': 'space', '#M': 'multihash' } + }) + + try { + await dynamoDb.send(cmd) + } catch (/** @type {any} */ err) { + if (err.name === 'ConditionalCheckFailedException') { + return { error: new RecordKeyConflict() } + } + throw err + } + return { ok: { blob } } + }, + /** + * List all CARs bound to an account + * + * @param {import('@ucanto/interface').DID} space + * @param {import('@web3-storage/upload-api').ListOptions} [options] + * @returns {ReturnType} + */ + list: async (space, options = {}) => { + const exclusiveStartKey = options.cursor + ? marshall({ + space, + link: options.cursor, + }) + : undefined + + const cmd = new QueryCommand({ + TableName: tableName, + Limit: options.size || 20, + KeyConditions: { + space: { + ComparisonOperator: 'EQ', + AttributeValueList: [{ S: space }], + }, + }, + ScanIndexForward: !options.pre, + ExclusiveStartKey: exclusiveStartKey, + AttributesToGet: ['multihash', 'size', 'insertedAt'], + }) + const response = await dynamoDb.send(cmd) + + const results = + response.Items?.map((i) => toBlobListResult(unmarshall(i))) ?? [] + const firstLinkCID = results[0] ? base58btc.encode(results[0].blob.digest) : undefined + // Get cursor of the item where list operation stopped (inclusive). 
+ // This value can be used to start a new operation to continue listing. + const lastKey = + response.LastEvaluatedKey && unmarshall(response.LastEvaluatedKey) + const lastLinkCID = lastKey ? lastKey.multihash : undefined + + const before = options.pre ? lastLinkCID : firstLinkCID + const after = options.pre ? firstLinkCID : lastLinkCID + return { + ok: { + size: results.length, + before, + after, + cursor: after, + results: options.pre ? results.reverse() : results, + } + } + }, + } +} + +/** + * Upgrade from the db representation + * + * @param {Record} item + * @returns {BlobListItem} + */ +export function toBlobListResult({ multihash, size, insertedAt }) { + return { + blob: { + digest: base58btc.decode(multihash), + size + }, + insertedAt, + } +} + +/** + * @param {import('@web3-storage/upload-api').DID} space + * @param {Uint8Array} blobMultihash + */ +const getKey = (space, blobMultihash) => { + const multihash58btc = base58btc.encode(blobMultihash) + const item = { + space, + multihash: multihash58btc.toString(), + } + + return marshall(item) +} diff --git a/upload-api/stores/blobs.js b/upload-api/stores/blobs.js new file mode 100644 index 00000000..a7c4b93b --- /dev/null +++ b/upload-api/stores/blobs.js @@ -0,0 +1,134 @@ +import { base64pad } from 'multiformats/bases/base64' +import { decode as digestDecode } from 'multiformats/hashes/digest' +import { base58btc } from 'multiformats/bases/base58' + +import { + S3Client, + HeadObjectCommand, + PutObjectCommand, +} from '@aws-sdk/client-s3' +import { getSignedUrl } from '@aws-sdk/s3-request-presigner' + +/** + * @typedef {import('@web3-storage/upload-api/types').BlobsStorage} BlobsStorage + * @typedef {import('@ucanto/interface').Failure} Failure + * @typedef {import('@ucanto/interface').Result} HasResult + */ + +/** + * Abstraction layer with Factory to perform operations on bucket storing Blobs. 
+ * + * @param {string} region + * @param {string} bucketName + * @param {import('@aws-sdk/client-s3').ServiceInputTypes} [options] + */ +export function createBlobsStorage(region, bucketName, options) { + const s3 = new S3Client({ + region, + ...options, + }) + return useBlobsStorage(s3, bucketName) +} + +/** + * + * @param {S3Client} s3 + * @param {string} bucketName + * @returns {BlobsStorage} + */ +export function useBlobsStorage(s3, bucketName) { + return { + /** + * @param {Uint8Array} multihash + */ + has: async (multihash) => { + const encodedMultihash = base58btc.encode(multihash) + const cmd = new HeadObjectCommand({ + Key: `${encodedMultihash}/${encodedMultihash}.blob`, + Bucket: bucketName, + }) + try { + await s3.send(cmd) + } catch (cause) { + // @ts-expect-error + if (cause?.$metadata?.httpStatusCode === 404) { + return { ok: false } + } + throw new Error('Failed to check if car-store', { cause }) + } + return { ok: true } + }, + /** + * Create a presigned s3 url allowing the recipient to upload + * only the CAR that matches the provided Link + * + * @param {Uint8Array} multihash + * @param {number} size + */ + createUploadUrl: async (multihash, size) => { + const encodedMultihash = base58btc.encode(multihash) + const multihashDigest = digestDecode(multihash) + const checksum = base64pad.baseEncode(multihashDigest.digest) + const cmd = new PutObjectCommand({ + Key: `${encodedMultihash}/${encodedMultihash}.blob`, + Bucket: bucketName, + ChecksumSHA256: checksum, + ContentLength: size, + }) + const expiresIn = 60 * 60 * 24 // 1 day + const url = new URL( + await getSignedUrl(s3, cmd, { + expiresIn, + unhoistableHeaders: new Set(['x-amz-checksum-sha256']), + }) + ) + return { + ok: { + url, + headers: { + 'x-amz-checksum-sha256': checksum, + 'content-length': String(size), + }, + } + } + }, + } +} + +/** + * compose many blob stores. + * store#createUploadUrl is from first store. + * store#has will check stores in order until 0-1 `true` are found. + * + * @param {BlobsStorage} blobStorage + * @param {Array} moreblobStorages + */ +export function composeblobStoragesWithOrderedHas(blobStorage, ...moreblobStorages) { + return { + ...blobStorage, + has: composeSome(blobStorage.has, ...moreblobStorages.map(s => s.has.bind(s))), + } +} + +/** + * compose async functions that return Promise>. + * The returned function will have the same signature, + * but will try the composed functions in order until one (or none) returns true. 
+ * + * @template T + * @param {Array<(e: T) => Promise>} hasFunctions + */ +function composeSome(...hasFunctions) { + /** + * @param {T} e + */ + return async function (e) { + for (const has of hasFunctions) { + const hasResult = await has(e) + if (hasResult.ok) { + return hasResult + } + } + return { ok: false } + } +} diff --git a/upload-api/stores/lib.js b/upload-api/stores/lib.js new file mode 100644 index 00000000..eea8513e --- /dev/null +++ b/upload-api/stores/lib.js @@ -0,0 +1,101 @@ +import { + GetObjectCommand, + ListObjectsV2Command +} from '@aws-sdk/client-s3' +import * as CAR from '@ucanto/transport/car' +import { RecordNotFound, StorageOperationFailed } from '@web3-storage/upload-api/errors' + +/** + * @param {string} messageCid + * @param {object} props + * @param {string} props.workflowBucketName + * @param {import('@aws-sdk/client-s3').S3Client} props.s3client + */ +export async function getAgentMessage (messageCid, { workflowBucketName, s3client }) { + const encodedAgentMessageArchiveKey = `${messageCid}/${messageCid}` + const getCmd = new GetObjectCommand({ + Bucket: workflowBucketName, + Key: encodedAgentMessageArchiveKey, + }) + + let res + try { + res = await s3client.send(getCmd) + } catch (/** @type {any} */ error) { + if (error?.$metadata?.httpStatusCode === 404) { + return { + error: new RecordNotFound(`agent message archive ${encodedAgentMessageArchiveKey} not found in store`) + } + } + return { + error: new StorageOperationFailed(error.message) + } + } + if (!res || !res.Body) { + return { + error: new RecordNotFound(`agent message archive ${encodedAgentMessageArchiveKey} not found in store`) + } + } + + const agentMessageBytes = await res.Body.transformToByteArray() + const agentMessage = await CAR.request.decode({ + body: agentMessageBytes, + headers: {}, + }) + + return { + ok: agentMessage + } +} + + + +/** + * @param {import('@ucanto/interface').UnknownLink} invocationCid + * @param {object} props + * @param {string} props.invocationBucketName + * @param {import('@aws-sdk/client-s3').S3Client} props.s3client + * @param {'.out' | '.in'} props.endsWith + */ +export async function getAgentMessageCidWithInvocation (invocationCid, { invocationBucketName, s3client, endsWith }) { + // Find agent message archive CID where this receipt was stored + const encodedInvocationKeyPrefix = `${invocationCid.toString()}/` + const listCmd = new ListObjectsV2Command({ + Bucket: invocationBucketName, + Prefix: encodedInvocationKeyPrefix + }) + let listRes + try { + listRes = await s3client.send(listCmd) + } catch (/** @type {any} */ error) { + if (error?.$metadata?.httpStatusCode === 404) { + return { + error: new RecordNotFound(`any pseudo symlink from invocation ${invocationCid.toString()} was found`) + } + } + return { + error: new StorageOperationFailed(error.message) + } + } + if (!listRes.Contents?.length) { + return { + error: new RecordNotFound(`any pseudo symlink from invocation ${invocationCid.toString()} was found`) + } + } + // Key in format `${invocation.cid}/${agentMessageArchive.cid}.out` + const agentMessageArchiveWithReceipt = listRes.Contents.find(c => c.Key?.endsWith(endsWith)) + if (!agentMessageArchiveWithReceipt || !agentMessageArchiveWithReceipt.Key) { + return { + error: new RecordNotFound(`any pseudo symlink from invocation ${invocationCid.toString()} was found with a receipt`) + } + } + + // Get Message Archive with receipt + const agentMessageArchiveWithReceiptCid = agentMessageArchiveWithReceipt.Key + .replace(encodedInvocationKeyPrefix, '') + 
.replace(endsWith, '') + + return { + ok: agentMessageArchiveWithReceiptCid + } +} diff --git a/upload-api/stores/receipts.js b/upload-api/stores/receipts.js new file mode 100644 index 00000000..6ae5159e --- /dev/null +++ b/upload-api/stores/receipts.js @@ -0,0 +1,191 @@ +import { + S3Client, + PutObjectCommand, + HeadObjectCommand, +} from '@aws-sdk/client-s3' +import * as CAR from '@ucanto/transport/car' +import { CBOR, Message } from '@ucanto/core' +import pRetry from 'p-retry' +import { RecordNotFound, RecordNotFoundErrorName, StorageOperationFailed } from '@web3-storage/upload-api/errors' +import { getAgentMessage, getAgentMessageCidWithInvocation } from './lib.js' + +/** + * @typedef {import('@web3-storage/upload-api/types').ReceiptsStorage} ReceiptsStorage + * @typedef {import('@ucanto/interface').Receipt} Receipt + */ + +/** + * Abstraction layer with Factory to perform operations on bucket storing + * handled receipts. It follows pattern documented in + * https://github.com/web3-storage/w3infra/blob/main/docs/ucan-invocation-stream.md#buckets + * + * @param {string} region + * @param {string} taskBucketName + * @param {string} invocationBucketName + * @param {string} workflowBucketName + * @param {import('@aws-sdk/client-s3').ServiceInputTypes} [options] + */ +export function createReceiptsStorage(region, taskBucketName, invocationBucketName, workflowBucketName, options = {}) { + const s3client = new S3Client({ + region, + ...options, + }) + return useReceiptsStorage(s3client, taskBucketName, invocationBucketName, workflowBucketName) +} + +/** + * @param {S3Client} s3client + * @param {string} taskBucketName + * @param {string} invocationBucketName + * @param {string} workflowBucketName + * @returns {ReceiptsStorage} + */ +export const useReceiptsStorage = (s3client, taskBucketName, invocationBucketName, workflowBucketName) => { + return { + /** + * Creates an agent message to write the receipt received to be written. + * Also write all indexes needed to lookup for receipt. + * + * @param {Receipt} receipt + */ + put: async (receipt) => { + const message = await Message.build({ + receipts: [receipt] + }) + const req = CAR.request.encode(message) + const messageCid = message.root.cid + const invocationCid = receipt.ran.link() + const taskCid = invocationCid + + // Store workflow + // The entire encoded agent message files containing receipt stored. + const workflowPutCmd = new PutObjectCommand({ + Bucket: workflowBucketName, + Key: `${messageCid}/${messageCid}`, + Body: new Uint8Array(req.body.buffer), + }) + + // Store mapping for where each receipt lives in agent message file. + // a pseudo symlink to `/${agentMessageArchive.cid}/${agentMessageArchive.cid}` via key + // `${invocation.cid}/${agentMessageArchive.cid}.out` to track where each receipt lives + // in a agent message file. As a pseudo symlink, it is an empty object. + const outLinkPutCmd = new PutObjectCommand({ + Bucket: invocationBucketName, + Key: `${invocationCid}/${messageCid}.out`, + }) + + // Store mapping task to invocation + // a pseudo symlink to `/${invocation.cid}/${invocation.cid}` via + // `${task.cid}/${invocation.cid}.invocation` to enable looking up invocations and + // receipts by a task. As a pseudo symlink, it is an empty object. + const invocationLinkPutCmd = new PutObjectCommand({ + Bucket: taskBucketName, + Key: `${taskCid}/${invocationCid}.invocation`, + }) + + // Store receipt output + // A block containing the out field of the receipt. 
+ const taskResult = await CBOR.write({ + out: receipt.out, + }) + const receiptOutputPutCmd = new PutObjectCommand({ + Bucket: taskBucketName, + Key: `${taskCid}/${taskCid}.result`, + Body: taskResult.bytes, + }) + + try { + await Promise.all([ + // see `src/ucan-invocation.js` on how we store workflows and tasks + pRetry(() => s3client.send(workflowPutCmd)), + pRetry(() => s3client.send(outLinkPutCmd)), + pRetry(() => s3client.send(invocationLinkPutCmd)), + pRetry(() => s3client.send(receiptOutputPutCmd)), + ]) + } catch { + return { + error: new StorageOperationFailed('no new receipt should be put by storefront') + } + } + + return { + ok: {} + } + }, + get: async (taskCid) => { + // TODO: When we distinct between TaskCid and InvocationCid, we also need to see this mapping. + const invocationCid = taskCid + const getAgentMessageCid = await getAgentMessageCidWithInvocation(invocationCid, { + invocationBucketName, + s3client, + endsWith: '.out' + }) + if (getAgentMessageCid.error) { + return getAgentMessageCid + } + const getAgentMessageRes = await getAgentMessage(getAgentMessageCid.ok, { + workflowBucketName, + s3client + }) + if (getAgentMessageRes.error) { + return getAgentMessageRes + } + + // @ts-expect-error unknown link does not mach expectations + const receipt = getAgentMessageRes.ok.receipts.get(taskCid.toString()) + if (!receipt) { + return { + error: new RecordNotFound(`agent message archive ${getAgentMessageCid.ok} does not include receipt for invocation ${taskCid.toString()}`) + } + } + return { + ok: receipt + } + }, + has: async (taskCid) => { + // TODO: When we distinct between TaskCid and InvocationCid, we also need to see this mapping. + const invocationCid = taskCid + const getAgentMessageCid = await getAgentMessageCidWithInvocation(invocationCid, { + invocationBucketName, + s3client, + endsWith: '.out' + }) + if (getAgentMessageCid.error) { + if (getAgentMessageCid.error.name === RecordNotFoundErrorName) { + return { + ok: false + } + } + return getAgentMessageCid + } + + const encodedAgentMessageArchiveKey = `${getAgentMessageCid.ok}/${getAgentMessageCid.ok}` + const headCmd = new HeadObjectCommand({ + Bucket: workflowBucketName, + Key: encodedAgentMessageArchiveKey, + }) + + let res + try { + res = await s3client.send(headCmd) + } catch (/** @type {any} */ error) { + if (error?.$metadata?.httpStatusCode === 404) { + return { + ok: false + } + } + return { + error: new StorageOperationFailed(error.message) + } + } + if (!res) { + return { + error: new StorageOperationFailed(`Head request to check agent message existence failed`) + } + } + return { + ok: true + } + } + } +} diff --git a/upload-api/stores/tasks.js b/upload-api/stores/tasks.js new file mode 100644 index 00000000..b51d4e3a --- /dev/null +++ b/upload-api/stores/tasks.js @@ -0,0 +1,156 @@ +import { + S3Client, + PutObjectCommand, +} from '@aws-sdk/client-s3' +import * as CAR from '@ucanto/transport/car' +import { Message } from '@ucanto/core' +import pRetry from 'p-retry' +import { StorageOperationFailed, RecordNotFound, RecordNotFoundErrorName } from '@web3-storage/upload-api/errors' +import { getAgentMessage, getAgentMessageCidWithInvocation } from './lib.js' + +/** + * @typedef {import('@web3-storage/upload-api/types').TasksStorage} TasksStorage + * @typedef {import('@web3-storage/capabilities/types').StorageGetError} StorageGetError + * @typedef {import('@web3-storage/capabilities/types').StoragePutError} StoragePutError + * @typedef {import('@ucanto/interface').UnknownLink} UnknownLink + * @typedef 
{import('@ucanto/interface').Invocation} Invocation + */ + +/** + * Abstraction layer with Factory to perform operations on bucket storing + * handled receipts. It follows pattern documented in + * https://github.com/web3-storage/w3infra/blob/main/docs/ucan-invocation-stream.md#buckets + * + * @param {string} region + * @param {string} invocationBucketName + * @param {string} workflowBucketName + * @param {import('@aws-sdk/client-s3').ServiceInputTypes} [options] + */ +export function createTasksStorage(region, invocationBucketName, workflowBucketName, options = {}) { + const s3client = new S3Client({ + region, + ...options, + }) + return useTasksStorage(s3client, invocationBucketName, workflowBucketName) +} + +/** + * @param {S3Client} s3client + * @param {string} invocationBucketName + * @param {string} workflowBucketName + * @returns {TasksStorage} + */ +export const useTasksStorage = (s3client, invocationBucketName, workflowBucketName) => { + return { + /** + * Creates an agent message to write the invocation received to be written. + * Also write all indexes needed to lookup for invocation. + * + * @param {Invocation} invocation + */ + put: async (invocation) => { + const message = await Message.build({ + invocations: [invocation] + }) + const req = CAR.request.encode(message) + const messageCid = message.root.cid + const invocationCid = invocation.cid + + // Store workflow + // The entire encoded agent message files containing invocations to be executed. + const workflowPutCmd = new PutObjectCommand({ + Bucket: workflowBucketName, + Key: `${messageCid}/${messageCid}`, + Body: new Uint8Array(req.body.buffer), + }) + // Store mapping for where each receipt lives in agent message file. + // A pseudo symlink to `/${agentMessageArchive.cid}/${agentMessageArchive.cid}` via key + // `${invocation.cid}/${agentMessageArchive.cid}`.in to track where each invocation lives + // in a agent message file. As a pseudo symlink, it is an empty object. + const inLinkPutCmd = new PutObjectCommand({ + Bucket: invocationBucketName, + Key: `${invocationCid}/${messageCid}.in`, + }) + + try { + await Promise.all([ + // see `src/ucan-invocation.js` on how we store workflows and tasks + pRetry(() => s3client.send(workflowPutCmd)), + pRetry(() => s3client.send(inLinkPutCmd)), + ]) + } catch { + return { + error: new StorageOperationFailed('no new receipt should be put by storefront') + } + } + + return { + ok: {} + } + }, + get: async (taskCid) => { + // TODO: When we distinct between TaskCid and InvocationCid, we also need to see this mapping. + const invocationCid = taskCid + const getAgentMessageCid = await getAgentMessageCidWithInvocation(invocationCid, { + invocationBucketName, + s3client, + endsWith: '.in' + }) + if (getAgentMessageCid.error) { + return getAgentMessageCid + } + const getAgentMessageRes = await getAgentMessage(getAgentMessageCid.ok, { + workflowBucketName, + s3client + }) + if (getAgentMessageRes.error) { + return getAgentMessageRes + } + + const invocation = getAgentMessageRes.ok.invocations.find(inv => inv.cid.equals(taskCid)) + if (!invocation) { + return { + error: new RecordNotFound(), + } + } + return { + ok: invocation + } + }, + has: async (taskCid) => { + // TODO: When we distinct between TaskCid and InvocationCid, we also need to see this mapping. 
+ const invocationCid = taskCid + const getAgentMessageCid = await getAgentMessageCidWithInvocation(invocationCid, { + invocationBucketName, + s3client, + endsWith: '.in' + }) + if (getAgentMessageCid.error) { + if (getAgentMessageCid.error.name === RecordNotFoundErrorName) { + return { + ok: false + } + } + return getAgentMessageCid + } + + const getAgentMessageRes = await getAgentMessage(getAgentMessageCid.ok, { + workflowBucketName, + s3client + }) + if (getAgentMessageRes.error) { + return getAgentMessageRes + } + + const invocation = getAgentMessageRes.ok.invocations.find(inv => inv.cid.equals(taskCid)) + if (!invocation) { + return { + ok: false + } + } + return { + ok: true + } + } + } +} diff --git a/upload-api/tables/index.js b/upload-api/tables/index.js index 7e91b858..2f5800af 100644 --- a/upload-api/tables/index.js +++ b/upload-api/tables/index.js @@ -33,6 +33,22 @@ export const uploadTableProps = { } } +/** @type TableProps */ +export const allocationTableProps = { + fields: { + space: 'string', // `did:key:space` + multihash: 'string', // `bagy...1` + size: 'number', // `101` + invocation: 'string', // `baf...ucan` (CID of invcation UCAN) + insertedAt: 'string', // `2022-12-24T...` + }, + // space + link must be unique to satisfy index constraint + primaryIndex: { partitionKey: 'space', sortKey: 'multihash' }, + globalIndexes: { + multihash: { partitionKey: 'multihash', sortKey: 'space', projection: ['space', 'insertedAt'] } + } +} + /** @type TableProps */ export const delegationTableProps = { fields: { diff --git a/upload-api/test/helpers/ucan.js b/upload-api/test/helpers/ucan.js index 66f97931..f0a961f2 100644 --- a/upload-api/test/helpers/ucan.js +++ b/upload-api/test/helpers/ucan.js @@ -10,7 +10,11 @@ import { createBucket, createTable } from '../helpers/resources.js'; -import { storeTableProps, uploadTableProps, consumerTableProps, delegationTableProps, subscriptionTableProps, rateLimitTableProps, revocationTableProps, spaceMetricsTableProps } from '../../tables/index.js'; +import { storeTableProps, uploadTableProps, allocationTableProps, consumerTableProps, delegationTableProps, subscriptionTableProps, rateLimitTableProps, revocationTableProps, spaceMetricsTableProps } from '../../tables/index.js'; +import { useTasksStorage } from '../../stores/tasks.js'; +import { useReceiptsStorage } from '../../stores/receipts.js'; +import { useAllocationsStorage } from '../../stores/allocations.js'; +import { composeblobStoragesWithOrderedHas, useBlobsStorage } from '../../stores/blobs.js'; import { composeCarStoresWithOrderedHas, useCarStore } from '../../buckets/car-store.js'; import { useDudewhereStore } from '../../buckets/dudewhere-store.js'; import { useStoreTable } from '../../tables/store.js'; @@ -32,6 +36,7 @@ import { usePieceTable } from '../../../filecoin/store/piece.js' import { createTaskStore as createFilecoinTaskStore } from '../../../filecoin/store/task.js' import { createReceiptStore as createFilecoinReceiptStore } from '../../../filecoin/store/receipt.js' import { createTestBillingProvider } from './billing.js'; +import { createTasksScheduler } from '../../scheduler.js'; export { API } @@ -184,8 +189,34 @@ export async function executionContextToUcantoTestServerContext(t) { 'did:web:test.web3.storage' ); const { dynamo, s3, r2 } = t.context; - const bucketName = await createBucket(s3); + const bucketName = await createBucket(s3) + const r2CarStoreBucketName = r2 + ? 
await createBucket(r2) + : undefined + const tasksBucketName = await createBucket(s3) + const delegationsBucketName = await createBucket(s3) + const invocationsBucketName = await createBucket(s3) + const workflowBucketName = await createBucket(s3) + + const s3BlobsStorageBucket = useBlobsStorage(s3, bucketName) + const r2BlobsStorageBucket = r2CarStoreBucketName + ? useBlobsStorage(r2, r2CarStoreBucketName) + : undefined + const blobsStorage = r2BlobsStorageBucket + ? composeblobStoragesWithOrderedHas( + s3BlobsStorageBucket, + r2BlobsStorageBucket, + ) + : s3BlobsStorageBucket + const tasksStorage = useTasksStorage(s3, invocationsBucketName, workflowBucketName) + const receiptsStorage = useReceiptsStorage(s3, tasksBucketName, invocationsBucketName, workflowBucketName) + const allocationsStorage = useAllocationsStorage(dynamo, + await createTable(dynamo, allocationTableProps) + ) + const getServiceConnection = () => connection + const tasksScheduler = createTasksScheduler(getServiceConnection) + // To be deprecated const storeTable = useStoreTable( dynamo, await createTable(dynamo, storeTableProps) @@ -196,14 +227,11 @@ export async function executionContextToUcantoTestServerContext(t) { await createTable(dynamo, uploadTableProps) ); - const r2CarStoreBucketName = r2 - ? await createBucket(r2) - : undefined + // To be deprecated + const s3CarStoreBucket = useCarStore(s3, bucketName) const r2CarStoreBucket = r2CarStoreBucketName ? useCarStore(r2, r2CarStoreBucketName) : undefined - const s3CarStoreBucket = useCarStore(s3, bucketName); - const carStoreBucket = r2CarStoreBucket ? composeCarStoresWithOrderedHas( s3CarStoreBucket, @@ -217,10 +245,6 @@ export async function executionContextToUcantoTestServerContext(t) { const id = signer.withDID('did:web:test.web3.storage'); const aggregatorSigner = await Signer.Signer.generate(); - const delegationsBucketName = await createBucket(s3); - const invocationsBucketName = await createBucket(s3); - const workflowBucketName = await createBucket(s3); - const revocationsStorage = useRevocationsTable( dynamo, await createTable(dynamo, revocationTableProps) @@ -271,6 +295,12 @@ export async function executionContextToUcantoTestServerContext(t) { signer: id, email, url: new URL('http://example.com'), + allocationsStorage, + blobsStorage, + tasksStorage, + receiptsStorage, + tasksScheduler, + getServiceConnection, provisionsStorage, delegationsStorage, rateLimitsStorage, @@ -282,10 +312,13 @@ export async function executionContextToUcantoTestServerContext(t) { }, }, maxUploadSize: 5_000_000_000, + // TODO: to be deprecated with `store/*` protocol storeTable, uploadTable, + // TODO: to be deprecated with `store/*` protocol carStoreBucket, r2CarStoreBucket, + // TODO: to be deprecated with `store/*` protocol dudewhereBucket, validateAuthorization: () => ({ ok: {} }), // filecoin/* @@ -308,7 +341,6 @@ export async function executionContextToUcantoTestServerContext(t) { channel: createServer(serviceContext) }); - return { ...serviceContext, mail: email, diff --git a/upload-api/test/service/blob.test.js b/upload-api/test/service/blob.test.js new file mode 100644 index 00000000..92663c5d --- /dev/null +++ b/upload-api/test/service/blob.test.js @@ -0,0 +1,32 @@ +/* eslint-disable no-loop-func, no-nested-ternary, no-only-tests/no-only-tests */ +import { Blob } from '@web3-storage/upload-api/test' +import { test } from '../helpers/context.js' +import { + createS3, + createDynamodDb, + createR2, +} from '../helpers/resources.js' +import { 
executionContextToUcantoTestServerContext } from '../helpers/ucan.js' +import { assertsFromExecutionContext } from '../helpers/assert.js' + +test.before(async (t) => { + Object.assign(t.context, { + dynamo: await createDynamodDb(), + r2: (await createR2()).client, + s3: (await createS3()).client, + }) +}) + +for (const [title, unit] of Object.entries(Blob.test)) { + const define = title.startsWith('only ') + ? test.only + : title.startsWith('skip ') + ? test.skip + : test + define(title, async (t) => { + await unit( + assertsFromExecutionContext(t), + await executionContextToUcantoTestServerContext(t) + ) + }) +} diff --git a/upload-api/test/service/ucan.test.js b/upload-api/test/service/ucan.test.js new file mode 100644 index 00000000..493fddf4 --- /dev/null +++ b/upload-api/test/service/ucan.test.js @@ -0,0 +1,32 @@ +/* eslint-disable no-loop-func, no-nested-ternary, no-only-tests/no-only-tests */ +import { Ucan } from '@web3-storage/upload-api/test' +import { test } from '../helpers/context.js' +import { + createS3, + createDynamodDb, + createR2, +} from '../helpers/resources.js' +import { executionContextToUcantoTestServerContext } from '../helpers/ucan.js' +import { assertsFromExecutionContext } from '../helpers/assert.js' + +test.before(async (t) => { + Object.assign(t.context, { + dynamo: await createDynamodDb(), + r2: (await createR2()).client, + s3: (await createS3()).client, + }) +}) + +for (const [title, unit] of Object.entries(Ucan.test)) { + const define = title.startsWith('only ') + ? test.only + : title.startsWith('skip ') + ? test.skip + : test + define(title, async (t) => { + await unit( + assertsFromExecutionContext(t), + await executionContextToUcantoTestServerContext(t) + ) + }) +} diff --git a/upload-api/test/service/web3.storage.test.js b/upload-api/test/service/web3.storage.test.js new file mode 100644 index 00000000..028c4a0e --- /dev/null +++ b/upload-api/test/service/web3.storage.test.js @@ -0,0 +1,33 @@ +/* eslint-disable no-loop-func, no-nested-ternary, no-only-tests/no-only-tests */ +import { Web3Storage } from '@web3-storage/upload-api/test' +import { test } from '../helpers/context.js' +import { + createS3, + createDynamodDb, + createR2, +} from '../helpers/resources.js' +import { executionContextToUcantoTestServerContext } from '../helpers/ucan.js' +import { assertsFromExecutionContext } from '../helpers/assert.js' + +test.before(async (t) => { + Object.assign(t.context, { + dynamo: await createDynamodDb(), + r2: (await createR2()).client, + s3: (await createS3()).client, + }) +}) + +for (const [title, unit] of Object.entries(Web3Storage.test)) { + + const define = title.startsWith('only ') + ? test.only + : title.startsWith('skip ') + ? 
test.skip + : test + define(title, async (t) => { + await unit( + assertsFromExecutionContext(t), + await executionContextToUcantoTestServerContext(t) + ) + }) +} diff --git a/upload-api/test/storage/allocations.test.js b/upload-api/test/storage/allocations.test.js new file mode 100644 index 00000000..1d3a5297 --- /dev/null +++ b/upload-api/test/storage/allocations.test.js @@ -0,0 +1,32 @@ +/* eslint-disable no-loop-func, no-nested-ternary, no-only-tests/no-only-tests */ +import { allocationsStorageTests } from '@web3-storage/upload-api/test' +import { test } from '../helpers/context.js' +import { + createS3, + createR2, + createDynamodDb, +} from '../helpers/resources.js' +import { executionContextToUcantoTestServerContext } from '../helpers/ucan.js' +import { assertsFromExecutionContext } from '../helpers/assert.js' + +test.before(async (t) => { + Object.assign(t.context, { + dynamo: await createDynamodDb(), + s3: (await createS3()).client, + r2: (await createR2()).client, + }) +}) + +for (const [title, unit] of Object.entries(allocationsStorageTests)) { + const define = title.startsWith('only ') + ? test.only + : title.startsWith('skip ') + ? test.skip + : test + define(title, async (t) => { + await unit( + assertsFromExecutionContext(t), + await executionContextToUcantoTestServerContext(t) + ) + }) +} diff --git a/upload-api/test/storage/blobs.test.js b/upload-api/test/storage/blobs.test.js new file mode 100644 index 00000000..322cb031 --- /dev/null +++ b/upload-api/test/storage/blobs.test.js @@ -0,0 +1,32 @@ +/* eslint-disable no-loop-func, no-nested-ternary, no-only-tests/no-only-tests */ +import { blobsStorageTests } from '@web3-storage/upload-api/test' +import { test } from '../helpers/context.js' +import { + createS3, + createR2, + createDynamodDb, +} from '../helpers/resources.js' +import { executionContextToUcantoTestServerContext } from '../helpers/ucan.js' +import { assertsFromExecutionContext } from '../helpers/assert.js' + +test.before(async (t) => { + Object.assign(t.context, { + dynamo: await createDynamodDb(), + s3: (await createS3()).client, + r2: (await createR2()).client, + }) +}) + +for (const [title, unit] of Object.entries(blobsStorageTests)) { + const define = title.startsWith('only ') + ? test.only + : title.startsWith('skip ') + ? test.skip + : test + define(title, async (t) => { + await unit( + assertsFromExecutionContext(t), + await executionContextToUcantoTestServerContext(t) + ) + }) +} diff --git a/upload-api/test/storage/receipts.test.js b/upload-api/test/storage/receipts.test.js new file mode 100644 index 00000000..21b1f074 --- /dev/null +++ b/upload-api/test/storage/receipts.test.js @@ -0,0 +1,32 @@ +/* eslint-disable no-loop-func, no-nested-ternary, no-only-tests/no-only-tests */ +import { receiptsStorageTests } from '@web3-storage/upload-api/test' +import { test } from '../helpers/context.js' +import { + createS3, + createR2, + createDynamodDb, +} from '../helpers/resources.js' +import { executionContextToUcantoTestServerContext } from '../helpers/ucan.js' +import { assertsFromExecutionContext } from '../helpers/assert.js' + +test.before(async (t) => { + Object.assign(t.context, { + dynamo: await createDynamodDb(), + s3: (await createS3()).client, + r2: (await createR2()).client, + }) +}) + +for (const [title, unit] of Object.entries(receiptsStorageTests)) { + const define = title.startsWith('only ') + ? test.only + : title.startsWith('skip ') + ? 
test.skip + : test + define(title, async (t) => { + await unit( + assertsFromExecutionContext(t), + await executionContextToUcantoTestServerContext(t) + ) + }) +} diff --git a/upload-api/test/storage/tasks.test.js b/upload-api/test/storage/tasks.test.js new file mode 100644 index 00000000..712ec19e --- /dev/null +++ b/upload-api/test/storage/tasks.test.js @@ -0,0 +1,32 @@ +/* eslint-disable no-loop-func, no-nested-ternary, no-only-tests/no-only-tests */ +import { tasksStorageTests } from '@web3-storage/upload-api/test' +import { test } from '../helpers/context.js' +import { + createS3, + createR2, + createDynamodDb, +} from '../helpers/resources.js' +import { executionContextToUcantoTestServerContext } from '../helpers/ucan.js' +import { assertsFromExecutionContext } from '../helpers/assert.js' + +test.before(async (t) => { + Object.assign(t.context, { + dynamo: await createDynamodDb(), + s3: (await createS3()).client, + r2: (await createR2()).client, + }) +}) + +for (const [title, unit] of Object.entries(tasksStorageTests)) { + const define = title.startsWith('only ') + ? test.only + : title.startsWith('skip ') + ? test.skip + : test + define(title, async (t) => { + await unit( + assertsFromExecutionContext(t), + await executionContextToUcantoTestServerContext(t) + ) + }) +}
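
How the new pieces fit together, as a hedged sketch: the router builds a ucanto connection back to the upload service itself (the new UPLOAD_SERVICE_URL environment variable) and hands createTasksScheduler a thunk returning that connection, so schedule() simply executes each invocation against the service's own endpoint and collapses the receipt into a Result. The signer, URL and connection setup below are illustrative placeholders rather than code from this change; connect and HTTP.open are standard ucanto client APIs.

// Illustrative sketch only — mirrors how upload-api/functions/ucan-invocation-router.js
// wires the scheduler; the DID, URL and invocation are placeholders, not values from this PR.
import { connect } from '@ucanto/client'
import { CAR, HTTP } from '@ucanto/transport'
import * as ed25519 from '@ucanto/principal/ed25519'
import { createTasksScheduler } from './upload-api/scheduler.js'

const serviceSigner = await ed25519.generate() // stand-in for getServiceSigner({ did, privateKey })
const selfConnection = connect({
  id: serviceSigner,
  codec: CAR.outbound,
  channel: HTTP.open({ url: new URL('https://upload.example.org'), method: 'POST' }), // placeholder UPLOAD_SERVICE_URL
})

const tasksScheduler = createTasksScheduler(() => selfConnection)

// schedule() executes the invocation on the self connection: `{ ok: {} }` when the
// receipt succeeds, otherwise the receipt's `out` carrying the error.
// const result = await tasksScheduler.schedule(blobAddInvocation)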
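
And a hedged sketch of the new allocations store on its own: records are keyed on (space, multihash), storing the base58btc-encoded multihash, the blob size, the CID of the invocation that allocated it, and an insertedAt timestamp. The endpoint, table name, space DID and content below are placeholder values (for example against DynamoDB Local), not values used by the deployment.

// Illustrative sketch only — exercises upload-api/stores/allocations.js with
// placeholder endpoint, table name, space DID and content.
import { sha256 } from 'multiformats/hashes/sha2'
import * as raw from 'multiformats/codecs/raw'
import * as Link from 'multiformats/link'
import { createAllocationsStorage } from './upload-api/stores/allocations.js'

const allocations = createAllocationsStorage('us-west-2', 'allocation', {
  endpoint: 'http://127.0.0.1:8000', // e.g. DynamoDB Local
})

const bytes = new TextEncoder().encode('hello, blob')
const digest = await sha256.digest(bytes)
const space = 'did:key:z6MkExamplePlaceholder' // placeholder space DID
const invocation = Link.create(raw.code, digest) // stand-in for the blob/add invocation CID

// insert() writes { space, multihash, size, invocation, insertedAt } and returns a
// RecordKeyConflict error if this (space, multihash) pair was already allocated.
console.log(await allocations.insert({
  space,
  blob: { digest: digest.bytes, size: bytes.length },
  invocation,
}))

// exists() is a cheap GetItem on the (space, multihash) primary key.
console.log(await allocations.exists(space, digest.bytes))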