diff --git a/billing/data/egress.js b/billing/data/egress.js new file mode 100644 index 00000000..fb12624f --- /dev/null +++ b/billing/data/egress.js @@ -0,0 +1,84 @@ +import { DecodeFailure, EncodeFailure, Schema } from './lib.js' + +/** + * @typedef {import('../lib/api').EgressEvent} EgressEvent + * @typedef {import('../types').InferStoreRecord & { pk: string, sk: string }} EgressEventStoreRecord + * @typedef {import('../types').StoreRecord} StoreRecord + * @typedef {import('../lib/api').EgressEventListKey} EgressEventListKey + * @typedef {{ pk: string, sk: string }} EgressEventListStoreRecord + */ + +export const egressSchema = Schema.struct({ + customerId: Schema.did({ method: 'mailto' }), + resourceId: Schema.text(), + timestamp: Schema.date(), +}) + +/** @type {import('../lib/api').Validator} */ +export const validate = input => egressSchema.read(input) + +/** @type {import('../lib/api').Encoder} */ +export const encode = input => { + try { + return { + ok: { + pk: `${input.timestamp.toISOString()}#${input.customerId}`, + sk: `${input.timestamp.toISOString()}#${input.customerId}#${input.resourceId}`, + customerId: input.customerId, + resourceId: input.resourceId, + timestamp: input.timestamp.toISOString(), + } + } + } catch (/** @type {any} */ err) { + return { + error: new EncodeFailure(`encoding egress event: ${err.message}`, { cause: err }) + } + } +} + +/** @type {import('../lib/api').Encoder} */ +export const encodeStr = input => { + try { + const data = encode(input) + if (data.error) throw data.error + return { ok: JSON.stringify(data.ok) } + } catch (/** @type {any} */ err) { + return { + error: new EncodeFailure(`encoding string egress event: ${err.message}`, { cause: err }) + } + } +} + +/** @type {import('../lib/api').Decoder} */ +export const decode = input => { + try { + return { + ok: { + customerId: Schema.did({ method: 'mailto' }).from(input.customerId), + resourceId: /** @type {string} */ (input.resourceId), + timestamp: new Date(input.timestamp), + } + } + } catch (/** @type {any} */ err) { + return { + error: new DecodeFailure(`decoding egress event: ${err.message}`, { cause: err }) + } + } +} + +/** @type {import('../lib/api').Decoder} */ +export const decodeStr = input => { + const data = decode(JSON.parse(input)) + if (data.error) throw data.error + return { ok: data.ok } +} + +export const lister = { + /** @type {import('../lib/api').Encoder} */ + encodeKey: input => ({ + ok: { + pk: `${input.from.toISOString()}#${input.customerId}`, + sk: `${input.from.toISOString()}#${input.customerId}#${input.resourceId}` + } + }) +} \ No newline at end of file diff --git a/billing/functions/egress-traffic-handler.js b/billing/functions/egress-traffic-handler.js new file mode 100644 index 00000000..e28aee67 --- /dev/null +++ b/billing/functions/egress-traffic-handler.js @@ -0,0 +1,111 @@ +import * as Sentry from '@sentry/serverless' +import { Config } from 'sst/node/config' +import { expect } from './lib.js' +import { createEgressEventStore } from '../tables/egress.js' +import { decode } from '../data/egress.js' +import { SQSClient, DeleteMessageCommand } from '@aws-sdk/client-sqs' +import { mustGetEnv } from '../../lib/env.js' +import Stripe from 'stripe' + +Sentry.AWSLambda.init({ + environment: process.env.SST_STAGE, + dsn: process.env.SENTRY_DSN, + tracesSampleRate: 1.0 +}) + +/** + * @typedef {{ + * egressTable?: string + * queueUrl?: string + * region?: 'us-west-2'|'us-east-2' + * stripeSecretKey?: string + * }} CustomHandlerContext + */ + +/** + * AWS Lambda handler to process egress events from the egress traffic queue. + * Each event is a JSON object with a `customerId`, `resourceId` and `timestamp`. + * The event is decoded and stored in the egress event table. + * The message is then deleted from the queue when successful. + */ +export const handler = Sentry.AWSLambda.wrapHandler( + /** + * @param {import('aws-lambda').SQSEvent} event + * @param {import('aws-lambda').Context} context + */ + async (event, context) => { + /** @type {CustomHandlerContext|undefined} */ + const customContext = context?.clientContext?.Custom + const region = customContext?.region ?? mustGetEnv('AWS_REGION') + const egressTable = customContext?.egressTable ?? mustGetEnv('EGRESS_TABLE_NAME') + const queueUrl = customContext?.queueUrl ?? mustGetEnv('EGRESS_TRAFFIC_QUEUE_URL') + const sqsClient = new SQSClient({ region }) + const egressEventStore = createEgressEventStore({ region }, { tableName: egressTable }) + const stripeSecretKey = customContext?.stripeSecretKey ?? Config.STRIPE_SECRET_KEY + + if (!stripeSecretKey) throw new Error('missing secret: STRIPE_SECRET_KEY') + const stripe = new Stripe(stripeSecretKey, { apiVersion: '2023-10-16' }) + + for (const record of event.Records) { + try { + const messageBody = JSON.parse(record.body) + const decoded = decode(messageBody) + const egressEvent = expect(decoded, 'Failed to decode egress message') + + expect( + await egressEventStore.put(egressEvent), + `Failed to store egress event for customerId: ${egressEvent.customerId}, resourceId: ${egressEvent.resourceId}, timestamp: ${egressEvent.timestamp.toISOString()}` + ) + + expect( + await sendRecordUsageToStripe(stripe, egressEvent), + `Failed to send record usage to Stripe for customerId: ${egressEvent.customerId}, resourceId: ${egressEvent.resourceId}, timestamp: ${egressEvent.timestamp.toISOString()}` + ) + + /** + * SQS requires explicit acknowledgment that a message has been successfully processed. + * This is done by deleting the message from the queue using its ReceiptHandle + */ + await sqsClient.send(new DeleteMessageCommand({ + QueueUrl: queueUrl, + ReceiptHandle: record.receiptHandle + })) + } catch (error) { + console.error('Error processing egress event:', error) + } + } + + return { + statusCode: 200, + body: 'Egress events processed successfully' + } + }) + +/** + * Sends a record usage to Stripe for a given egress event. + * It uses the Stripe API v2023-10-16 to create a usage record for the given subscription item and quantity. + * The response is checked to ensure the usage record was created successfully. + * + * @param {import('stripe').Stripe} stripe + * @param {import('../data/egress.js').EgressEvent} egressEvent + * @returns {Promise>} + */ +async function sendRecordUsageToStripe(stripe, egressEvent) { + const subscriptionItem = { + id: 'sub_123', // FIXME (fforbeck): + // Where do we get this from? + // Should be in the event? + // Should we find it in the Stripe API using the customerId? + } + const response = await stripe.subscriptionItems.createUsageRecord( + subscriptionItem.id, + { + quantity: 1, // always 1 for each egress event + timestamp: egressEvent.timestamp.getTime() + } + ) + if (response.object === 'usage_record') { + return { ok: true } + } + return { error: new Error('Failed to send record usage to Stripe') } +} diff --git a/billing/lib/api.ts b/billing/lib/api.ts index 22df39de..dfdb6167 100644 --- a/billing/lib/api.ts +++ b/billing/lib/api.ts @@ -90,7 +90,7 @@ export interface CustomerKey { customer: CustomerDID } -export interface CustomerListOptions extends Pageable {} +export interface CustomerListOptions extends Pageable { } export type CustomerStore = & StoreGetter @@ -133,6 +133,21 @@ export interface UsageListKey { customer: CustomerDID, from: Date } export type UsageStore = StorePutter + +/** + * The event that is emitted when egress traffic is detected. + */ +export interface EgressEvent { + customerId: string + resourceId: string + timestamp: Date +} + + +export interface EgressEventListKey { customerId: string, resourceId: string, from: Date } + +export type EgressEventStore = StorePutter & StoreLister + // Billing queues ///////////////////////////////////////////////////////////// /** @@ -188,7 +203,7 @@ export interface ConsumerListKey { consumer: ConsumerDID } export type ConsumerStore = & StoreGetter - & StoreLister> + & StoreLister> export interface Subscription { customer: CustomerDID @@ -205,7 +220,7 @@ export interface SubscriptionListKey { customer: CustomerDID } export type SubscriptionStore = & StoreGetter - & StoreLister> + & StoreLister> // UCAN invocation //////////////////////////////////////////////////////////// @@ -302,7 +317,7 @@ export interface InsufficientRecords extends Failure { /** StorePutter allows a single item to be put in the store by it's key. */ export interface StorePutter { /** Puts a single item into the store by it's key */ - put: (rec: T) => Promise> + put: (rec: T) => Promise> } /** @@ -316,23 +331,23 @@ export interface StoreBatchPutter { * not transactional. A failure may mean 1 or more records succeeded to * be written. */ - batchPut: (rec: Iterable) => Promise> + batchPut: (rec: Iterable) => Promise> } /** StoreGetter allows a single item to be retrieved by it's key. */ export interface StoreGetter { /** Gets a single item by it's key. */ - get: (key: K) => Promise|DecodeFailure|StoreOperationFailure>> + get: (key: K) => Promise | DecodeFailure | StoreOperationFailure>> } /** StoreLister allows items in the store to be listed page by page. */ export interface StoreLister { /** Lists items in the store. */ - list: (key: K, options?: Pageable) => Promise, EncodeFailure|DecodeFailure|StoreOperationFailure>> + list: (key: K, options?: Pageable) => Promise, EncodeFailure | DecodeFailure | StoreOperationFailure>> } /** QueueAdder allows messages to be added to the end of the queue. */ export interface QueueAdder { /** Adds a message to the end of the queue. */ - add: (message: T) => Promise> + add: (message: T) => Promise> } diff --git a/billing/queues/egress.js b/billing/queues/egress.js new file mode 100644 index 00000000..c2cad3a9 --- /dev/null +++ b/billing/queues/egress.js @@ -0,0 +1,9 @@ +import { createQueueAdderClient } from './client.js' +import { encodeStr, validate } from '../data/egress.js' + +/** + * @param {{ region: string } | import('@aws-sdk/client-sqs').SQSClient} conf + * @param {{ url: URL }} context + */ +export const createEgressEventQueue = (conf, { url }) => + createQueueAdderClient(conf, { url, encode:encodeStr, validate }) diff --git a/billing/tables/egress.js b/billing/tables/egress.js new file mode 100644 index 00000000..a0b8ebd5 --- /dev/null +++ b/billing/tables/egress.js @@ -0,0 +1,33 @@ +import { createStorePutterClient, createStoreListerClient } from './client.js' +import { validate, encode, lister, decode } from '../data/egress.js' + +/** + * Stores egress events for tracking requests served to customers. + * + * @type {import('sst/constructs').TableProps} + */ +export const egressTableProps = { + fields: { + /** Composite key with format: "customerId" */ + pk: 'string', + /** Composite key with format: "timestamp#customerId#resourceId" */ + sk: 'string', + /** Customer DID (did:mailto:...). */ + customerId: 'string', + /** Resource CID. */ + resourceId: 'string', + /** ISO timestamp of the event. */ + timestamp: 'string', + }, + primaryIndex: { partitionKey: 'pk', sortKey: 'sk' } +} + +/** + * @param {{ region: string } | import('@aws-sdk/client-dynamodb').DynamoDBClient} conf + * @param {{ tableName: string }} context + * @returns {import('../lib/api.js').EgressEventStore} + */ +export const createEgressEventStore = (conf, { tableName }) => ({ + ...createStorePutterClient(conf, { tableName, validate, encode }), + ...createStoreListerClient(conf, { tableName, encodeKey: lister.encodeKey, decode }) +}) diff --git a/billing/test/helpers/context.js b/billing/test/helpers/context.js index 478c5d1f..4e8fde2c 100644 --- a/billing/test/helpers/context.js +++ b/billing/test/helpers/context.js @@ -6,6 +6,7 @@ import { decode as decodeSpaceBillingInstruction } from '../../data/space-billin import { encode as encodeSubscription, validate as validateSubscription } from '../../data/subscription.js' import { encode as encodeConsumer, validate as validateConsumer } from '../../data/consumer.js' import { decode as decodeUsage, lister as usageLister } from '../../data/usage.js' +import { encode as encodeEgressEvent, decodeStr as decodeEgressEvent, validate as validateEgressEvent } from '../../data/egress.js' import { createCustomerBillingQueue } from '../../queues/customer.js' import { createSpaceBillingQueue } from '../../queues/space.js' import { consumerTableProps, subscriptionTableProps } from '../../../upload-api/tables/index.js' @@ -16,7 +17,9 @@ import { createSpaceDiffStore, spaceDiffTableProps } from '../../tables/space-di import { createSpaceSnapshotStore, spaceSnapshotTableProps } from '../../tables/space-snapshot.js' import { createUsageStore, usageTableProps } from '../../tables/usage.js' import { createQueueRemoverClient } from './queue.js' - +import { createEgressEventQueue } from '../../queues/egress.js' +import { egressTableProps, createEgressEventStore } from '../../tables/egress.js' +import { handler as createEgressTrafficHandler } from '../../functions/egress-traffic-handler.js' /** * @typedef {{ * dynamo: import('./aws').AWSService @@ -137,6 +140,46 @@ export const createUCANStreamTestContext = async () => { return { consumerStore, spaceDiffStore } } +export const createEgressTrafficQueueTestContext = async () => { + await createAWSServices() + + const egressTableName = await createTable(awsServices.dynamo.client, egressTableProps, 'egress-') + const store = createEgressEventStore(awsServices.dynamo.client, { tableName: egressTableName }) + const egressEventStore = { + put: store.put, + list: store.list, + } + + const egressQueueURL = new URL(await createQueue(awsServices.sqs.client, 'egress-traffic-')) + const egressQueue = { + add: createEgressEventQueue(awsServices.sqs.client, { url: egressQueueURL }).add, + remove: createQueueRemoverClient(awsServices.sqs.client, { url: egressQueueURL, decode: decodeEgressEvent }).remove, + } + + return { + egressEventStore, + egressQueue, + egressHandler: createEgressTrafficHandler, + egressTable: egressTableName, + queueUrl: egressQueueURL, + accountId: (await awsServices.dynamo.client.config.credentials()).accountId, + callbackWaitsForEmptyEventLoop: true, + functionName: 'your-function-name', + functionVersion: 'your-function-version', + region: awsServices.dynamo.client.config.region, + invokedFunctionArn: `arn:aws:lambda:${awsServices.dynamo.client.config.region}:${awsServices.dynamo.client.config.credentials().accountId}:function:your-function-name`, + memoryLimitInMB: '128', + awsRequestId: 'your-request-id', + logGroupName: '/aws/lambda/your-function-name', + logStreamName: 'your-log-stream', + getRemainingTimeInMillis: () => 1000, + done: () => {}, + fail: () => {}, + succeed: () => {}, + stripeSecretKey: "", // FIXME (fforbeck): how to get Stripe secret key in a test? + } +} + /** * @template C * @param {import('../lib/api').TestSuite} suite diff --git a/billing/test/helpers/egress.js b/billing/test/helpers/egress.js new file mode 100644 index 00000000..e63f77bb --- /dev/null +++ b/billing/test/helpers/egress.js @@ -0,0 +1,13 @@ +import { randomDID } from './did.js' +import { randomLink } from './dag.js' + +/** + * @param {Partial} [base] + * @returns {Promise} + */ +export const randomEgressEvent = async (base = {}) => ({ + customerId: await randomDID(), + resourceId: randomLink().toString(), + timestamp: new Date(), + ...base +}) \ No newline at end of file diff --git a/billing/test/lib.egress-traffic-queue.spec.js b/billing/test/lib.egress-traffic-queue.spec.js new file mode 100644 index 00000000..14f24865 --- /dev/null +++ b/billing/test/lib.egress-traffic-queue.spec.js @@ -0,0 +1,4 @@ +import * as EgressTrafficQueue from './lib/egress-traffic-queue.js' +import { bindTestContext, createEgressTrafficQueueTestContext } from './helpers/context.js' + +export const test = bindTestContext(EgressTrafficQueue.test, createEgressTrafficQueueTestContext) \ No newline at end of file diff --git a/billing/test/lib/api.ts b/billing/test/lib/api.ts index 68814ca9..a3737679 100644 --- a/billing/test/lib/api.ts +++ b/billing/test/lib/api.ts @@ -18,8 +18,12 @@ import { SpaceSnapshotStore, UsageStore, UsageListKey, - Usage + Usage, + QueueAdder, + EgressEventStore } from '../../lib/api.js' +import { EgressEvent } from '../../data/egress.js' +import { Context } from 'aws-lambda' export interface BillingCronTestContext { customerStore: CustomerStore & StorePutter @@ -38,6 +42,18 @@ export interface SpaceBillingQueueTestContext { usageStore: UsageStore & StoreLister } +export type EgressQueue = QueueAdder + +export interface EgressTestContext extends Context { + egressQueue: EgressQueue & QueueRemover + egressEventStore: EgressEventStore + egressHandler: (event: import('aws-lambda').SQSEvent, context: import('aws-lambda').Context) => Promise<{ statusCode: number, body: string }> + queueUrl: string + region: string + stripeSecretKey: string + accountId: string +} + export interface StripeTestContext { customerStore: CustomerStore } @@ -53,6 +69,7 @@ export type TestContext = & SpaceBillingQueueTestContext & StripeTestContext & UCANStreamTestContext + & EgressTestContext /** QueueRemover can remove items from the head of the queue. */ export interface QueueRemover { diff --git a/billing/test/lib/egress-traffic-queue.js b/billing/test/lib/egress-traffic-queue.js new file mode 100644 index 00000000..5fb0e452 --- /dev/null +++ b/billing/test/lib/egress-traffic-queue.js @@ -0,0 +1,68 @@ +import { randomEgressEvent } from '../helpers/egress.js' +import { collectQueueMessages } from '../helpers/queue.js' + +/** @type {import('./api').TestSuite} */ +export const test = { + 'should process egress events': async (/** @type {import('entail').assert} */ assert, ctx) => { + const maxEvents = 100 + const events = await Promise.all( + Array.from({ length: maxEvents }, () => randomEgressEvent()) + ) + + // add egress events to the queue to simulate events from the Freeway worker + for (const e of events) { + console.log(`Adding egress event to the queue: CustomerId: ${e.customerId}, ResourceId: ${e.resourceId}, Timestamp: ${e.timestamp.toISOString()}`) + await ctx.egressQueue.add(e) + } + + // simulate the SQS event that triggers the handler + // FIXME (fforbeck): why the events are not collected? + const collected = await collectQueueMessages(ctx.egressQueue) + assert.ok(collected.ok, 'Failed to collect queue messages') + assert.equal(collected.ok.length, events.length, 'Collected queue messages length does not match') + + // @type {import('aws-lambda').SQSEvent} + const sqsEvent = { + Records: collected.ok.map(e => ({ + // @type {import('aws-lambda').SQSRecord} + body: JSON.stringify(e), + messageId: Math.random().toString(), + receiptHandle: Math.random().toString(), + awsRegion: ctx.region, + eventSource: 'aws:sqs', + eventSourceARN: `arn:aws:sqs:${ctx.region}:${ctx.accountId}:${ctx.queueUrl}`, + awsAccountId: ctx.accountId, + md5OfBody: '', + md5OfMessageAttributes: '', + attributes: { + ApproximateReceiveCount: '1', + SentTimestamp: e.timestamp.getTime().toString(), + SenderId: ctx.accountId, + ApproximateFirstReceiveTimestamp: e.timestamp.getTime().toString(), + }, + messageAttributes: {}, + })) + } + + const response = await ctx.egressHandler(sqsEvent, ctx) + assert.equal(response.statusCode, 200) + assert.equal(response.body, 'Egress events processed successfully') + + // ensure we got a egress record for each event + for (const e of events) { + const record = await ctx.egressEventStore.list({ + customerId: e.customerId, + resourceId: e.resourceId, + from: e.timestamp, + }) + assert.ok(record.ok) + assert.equal(record.ok.results.length, 1) + assert.equal(record.ok.results[0].customerId, e.customerId) + assert.equal(record.ok.results[0].resourceId, e.resourceId) + assert.equal(record.ok.results[0].timestamp, e.timestamp) + } + // FIXME (fforbeck): how to check we send the events to stripe? + // we need to mock the stripe client + // and check that the correct events are sent to stripe + } +}