diff --git a/billing/data/egress.js b/billing/data/egress.js index 3e7b630f..094b3e47 100644 --- a/billing/data/egress.js +++ b/billing/data/egress.js @@ -2,8 +2,9 @@ import { Link } from '@ucanto/server' import { DecodeFailure, EncodeFailure, Schema } from './lib.js' /** - * @typedef { import('../types').InferStoreRecord } EgressTrafficStoreRecord - * @typedef { import('../types').InferStoreRecord } EgressTrafficKeyStoreRecord + * @typedef { import('../lib/api').EgressTrafficData } EgressTrafficData + * @typedef { import('../types').InferStoreRecord & { pk: string, sk: string } } EgressTrafficStoreRecord + * @typedef {{ pk: string, sk: string }} EgressTrafficKeyStoreRecord */ export const egressSchema = Schema.struct({ @@ -15,14 +16,16 @@ export const egressSchema = Schema.struct({ cause: Schema.link(), }) -/** @type {import('../lib/api').Validator} */ +/** @type {import('../lib/api').Validator} */ export const validate = input => egressSchema.read(input) -/** @type {import('../lib/api').Encoder} */ +/** @type {import('../lib/api').Encoder} */ export const encode = input => { try { return { ok: { + pk: `${input.space.toString()}#${input.resource.toString()}`, + sk: `${input.servedAt.toISOString()}#${input.cause.toString()}`, space: input.space.toString(), customer: input.customer.toString(), resource: input.resource.toString(), @@ -86,19 +89,21 @@ export const lister = { /** @type {import('../lib/api').Encoder} */ encodeKey: input => ({ ok: { - space: input.space.toString(), - customer: input.customer.toString(), - from: input.from.toISOString() + pk: `${input.space.toString()}#${input.resource.toString()}`, + sk: `${input.servedAt.toISOString()}#${input.cause.toString()}`, } }), /** @type {import('../lib/api').Decoder} */ decodeKey: input => { try { + const [space, resource] = input.pk.split('#') + const [servedAt, cause] = input.sk.split('#') return { ok: { - space: Schema.did({ method: 'key' }).from(input.space), - customer: Schema.did({ method: 'mailto' }).from(input.customer), - from: new Date(input.from) + space: Schema.did({ method: 'key' }).from(space), + resource: Link.parse(resource), + servedAt: new Date(servedAt), + cause: Link.parse(cause), } } } catch (/** @type {any} */ err) { diff --git a/billing/functions/egress-traffic-queue.js b/billing/functions/egress-traffic-queue.js index 34a565f0..3343a210 100644 --- a/billing/functions/egress-traffic-queue.js +++ b/billing/functions/egress-traffic-queue.js @@ -60,21 +60,13 @@ export const handler = Sentry.AWSLambda.wrapHandler( const decoded = decodeStr(record.body) const egressData = expect(decoded, 'Failed to decode egress event') - const putResult = await egressTrafficEventStore.put(egressData) - if (putResult.error) throw putResult.error + const result = await egressTrafficEventStore.put(egressData) + expect(result, 'Failed to save egress event in database') const response = await customerStore.get({ customer: egressData.customer }) - if (response.error) { - return { - error: { - name: 'CustomerNotFound', - message: `Error getting customer ${egressData.customer}`, - cause: response.error - } - } - } - const customerAccount = response.ok.account + if (response.error) throw response.error + const customerAccount = response.ok.account expect( await recordBillingMeterEvent(stripe, billingMeterName, egressData, customerAccount), `Failed to record egress event in Stripe API for customer: ${egressData.customer}, account: ${customerAccount}, bytes: ${egressData.bytes}, servedAt: ${egressData.servedAt.toISOString()}, resource: ${egressData.resource}` diff --git a/billing/lib/api.ts b/billing/lib/api.ts index 1b9c03c9..8942b1f3 100644 --- a/billing/lib/api.ts +++ b/billing/lib/api.ts @@ -189,7 +189,12 @@ export type EgressTrafficQueue = QueueAdder /** * List key for egress traffic data. */ -export interface EgressTrafficEventListKey { space: ConsumerDID, customer: CustomerDID, from: Date } +export interface EgressTrafficEventListKey { + space: ConsumerDID, + resource: UnknownLink, + servedAt: Date + cause: UnknownLink, +} /** * Captures details about a space that should be billed for a given customer in diff --git a/billing/tables/egress-traffic.js b/billing/tables/egress-traffic.js index fadd160c..6cabf639 100644 --- a/billing/tables/egress-traffic.js +++ b/billing/tables/egress-traffic.js @@ -8,6 +8,10 @@ import { validate, encode, lister, decode } from '../data/egress.js' */ export const egressTrafficTableProps = { fields: { + /** Composite key with format: "space#resource" */ + pk: 'string', + /** Composite key with format: "servedAt#cause" */ + sk: 'string', /** Space DID (did:key:...). */ space: 'string', /** Customer DID (did:mailto:...). */ @@ -21,11 +25,11 @@ export const egressTrafficTableProps = { /** UCAN invocation ID that caused the egress traffic. */ cause: 'string', }, - primaryIndex: { partitionKey: 'space', sortKey: 'servedAt' }, + primaryIndex: { partitionKey: 'pk', sortKey: 'sk' }, globalIndexes: { customer: { partitionKey: 'customer', - sortKey: 'servedAt', + sortKey: 'sk', projection: ['space', 'resource', 'bytes', 'cause', 'servedAt'] } } diff --git a/billing/test/helpers/context.js b/billing/test/helpers/context.js index ea321a73..bbf41bcc 100644 --- a/billing/test/helpers/context.js +++ b/billing/test/helpers/context.js @@ -191,7 +191,7 @@ export const createEgressTrafficTestContext = async () => { }) } - const egressTrafficTable = await createTable(awsServices.dynamo.client, egressTrafficTableProps, 'egress-traffic-') + const egressTrafficTable = await createTable(awsServices.dynamo.client, egressTrafficTableProps, 'egress-traffic-events-') const egressTrafficEventStore = { ...createEgressTrafficEventStore(awsServices.dynamo.client, { tableName: egressTrafficTable }), ...createStorePutterClient(awsServices.dynamo.client, { diff --git a/billing/utils/stripe.js b/billing/utils/stripe.js index 8eca2b86..a2cf4e2e 100644 --- a/billing/utils/stripe.js +++ b/billing/utils/stripe.js @@ -91,12 +91,14 @@ export async function recordBillingMeterEvent(stripe, billingMeterEventName, egr event_name: billingMeterEventName, payload: { stripe_customer_id: stripeCustomerId, + // Stripe expects the value to be a string value: egressData.bytes.toString(), }, + // Stripe expects the timestamp to be in seconds timestamp: Math.floor(egressData.servedAt.getTime() / 1000), }, { - idempotencyKey: `${egressData.servedAt.toISOString()}-${egressData.space}-${egressData.customer}-${egressData.resource}` + idempotencyKey: `${egressData.servedAt.toISOString()}-${egressData.space}-${egressData.customer}-${egressData.resource}-${egressData.cause}` } ) diff --git a/package-lock.json b/package-lock.json index bb860fad..992985ec 100644 --- a/package-lock.json +++ b/package-lock.json @@ -12412,9 +12412,9 @@ } }, "node_modules/@web3-storage/upload-api": { - "version": "18.1.2", - "resolved": "https://registry.npmjs.org/@web3-storage/upload-api/-/upload-api-18.1.2.tgz", - "integrity": "sha512-C89PHJMtg64YpP7Hm62OC3m28BoMCB3Vz7SpAC7qifOPmxtU8EEUcN4x1dvbZkGW7HMpdDOhHVwWzzHY//G0BQ==", + "version": "18.1.3", + "resolved": "https://registry.npmjs.org/@web3-storage/upload-api/-/upload-api-18.1.3.tgz", + "integrity": "sha512-X3W4G/Pr4XRLMZ7PWmjbTg1T0llPIgvBAL8FvXxevM8lDczCAscz/EcqExcFY68WEu7ggBK7DiSERpnbIyAnBA==", "dependencies": { "@ucanto/client": "^9.0.1", "@ucanto/interface": "^10.0.1", @@ -30302,7 +30302,7 @@ "@web3-storage/access": "^20.0.0", "@web3-storage/capabilities": "^17.4.1", "@web3-storage/did-mailto": "^2.1.0", - "@web3-storage/upload-api": "^18.1.2", + "@web3-storage/upload-api": "^18.1.3", "multiformats": "^13.1.0", "nanoid": "^5.0.2", "one-webcrypto": "^1.0.3", diff --git a/package.json b/package.json index f14924aa..b7b33340 100644 --- a/package.json +++ b/package.json @@ -5,7 +5,7 @@ "packageManager": "npm@10.8.2+sha256.c8c61ba0fa0ab3b5120efd5ba97fdaf0e0b495eef647a97c4413919eda0a878b", "type": "module", "scripts": { - "start": "sst start", + "start": "sst dev", "build": "sst build", "check": "tsc --build", "deploy": "sst deploy --outputs-file .test-env.json", diff --git a/stacks/billing-db-stack.js b/stacks/billing-db-stack.js index 42712a2b..429f11d1 100644 --- a/stacks/billing-db-stack.js +++ b/stacks/billing-db-stack.js @@ -16,7 +16,7 @@ export const BillingDbStack = ({ stack }) => { ...usageTableProps, stream: 'new_image' }) - const egressTrafficTable = new Table(stack, 'egress-traffic', egressTrafficTableProps) + const egressTrafficTable = new Table(stack, 'egress-traffic-events', egressTrafficTableProps) stack.addOutputs({ customerTableName: customerTable.tableName, diff --git a/stacks/billing-stack.js b/stacks/billing-stack.js index f8b158f3..c87fa969 100644 --- a/stacks/billing-stack.js +++ b/stacks/billing-stack.js @@ -202,7 +202,7 @@ export function BillingStack ({ stack, app }) { consumer: { function: egressTrafficQueueHandler, deadLetterQueue: egressTrafficDLQ.cdk.queue, - cdk: { eventSource: { batchSize: 1 } } + cdk: { eventSource: { batchSize: 10 } } }, cdk: { queue: { visibilityTimeout: Duration.minutes(15) } } }) diff --git a/upload-api/package.json b/upload-api/package.json index ca31581d..3875f928 100644 --- a/upload-api/package.json +++ b/upload-api/package.json @@ -25,7 +25,7 @@ "@web3-storage/access": "^20.0.0", "@web3-storage/capabilities": "^17.4.1", "@web3-storage/did-mailto": "^2.1.0", - "@web3-storage/upload-api": "^18.1.2", + "@web3-storage/upload-api": "^18.1.3", "multiformats": "^13.1.0", "nanoid": "^5.0.2", "one-webcrypto": "^1.0.3", diff --git a/upload-api/stores/usage.js b/upload-api/stores/usage.js index 89c01a0f..e51b8353 100644 --- a/upload-api/stores/usage.js +++ b/upload-api/stores/usage.js @@ -82,7 +82,10 @@ export function useUsageStore({ spaceSnapshotStore, spaceDiffStore, egressTraffi } const result = await egressTrafficQueue.add(record) - if (result.error) return result + if (result.error) { + console.error('Error sending egress event to queue:', result.error) + return result + } return { ok: { ...record, servedAt: servedAt.toISOString() } } }