diff --git a/billing/data/egress.js b/billing/data/egress.js index 87f5dde1..fb12624f 100644 --- a/billing/data/egress.js +++ b/billing/data/egress.js @@ -66,6 +66,13 @@ export const decode = input => { } } +/** @type {import('../lib/api').Decoder} */ +export const decodeStr = input => { + const data = decode(JSON.parse(input)) + if (data.error) throw data.error + return { ok: data.ok } +} + export const lister = { /** @type {import('../lib/api').Encoder} */ encodeKey: input => ({ diff --git a/billing/functions/egress-traffic-queue.js b/billing/functions/egress-traffic-handler.js similarity index 96% rename from billing/functions/egress-traffic-queue.js rename to billing/functions/egress-traffic-handler.js index 1ecb8fb7..e28aee67 100644 --- a/billing/functions/egress-traffic-queue.js +++ b/billing/functions/egress-traffic-handler.js @@ -74,6 +74,11 @@ export const handler = Sentry.AWSLambda.wrapHandler( console.error('Error processing egress event:', error) } } + + return { + statusCode: 200, + body: 'Egress events processed successfully' + } }) /** @@ -82,7 +87,7 @@ export const handler = Sentry.AWSLambda.wrapHandler( * The response is checked to ensure the usage record was created successfully. * * @param {import('stripe').Stripe} stripe - * @param {import('../data/egress').EgressEvent} egressEvent + * @param {import('../data/egress.js').EgressEvent} egressEvent * @returns {Promise>} */ async function sendRecordUsageToStripe(stripe, egressEvent) { diff --git a/billing/tables/egress.js b/billing/tables/egress.js index 8e2882ee..a0b8ebd5 100644 --- a/billing/tables/egress.js +++ b/billing/tables/egress.js @@ -29,5 +29,5 @@ export const egressTableProps = { */ export const createEgressEventStore = (conf, { tableName }) => ({ ...createStorePutterClient(conf, { tableName, validate, encode }), - ...createStoreListerClient(conf, { tableName, encodeKey: lister.encodeKey, decode }) + ...createStoreListerClient(conf, { tableName, encodeKey: lister.encodeKey, decode }) }) diff --git a/billing/test/helpers/context.js b/billing/test/helpers/context.js index 478c5d1f..4e8fde2c 100644 --- a/billing/test/helpers/context.js +++ b/billing/test/helpers/context.js @@ -6,6 +6,7 @@ import { decode as decodeSpaceBillingInstruction } from '../../data/space-billin import { encode as encodeSubscription, validate as validateSubscription } from '../../data/subscription.js' import { encode as encodeConsumer, validate as validateConsumer } from '../../data/consumer.js' import { decode as decodeUsage, lister as usageLister } from '../../data/usage.js' +import { encode as encodeEgressEvent, decodeStr as decodeEgressEvent, validate as validateEgressEvent } from '../../data/egress.js' import { createCustomerBillingQueue } from '../../queues/customer.js' import { createSpaceBillingQueue } from '../../queues/space.js' import { consumerTableProps, subscriptionTableProps } from '../../../upload-api/tables/index.js' @@ -16,7 +17,9 @@ import { createSpaceDiffStore, spaceDiffTableProps } from '../../tables/space-di import { createSpaceSnapshotStore, spaceSnapshotTableProps } from '../../tables/space-snapshot.js' import { createUsageStore, usageTableProps } from '../../tables/usage.js' import { createQueueRemoverClient } from './queue.js' - +import { createEgressEventQueue } from '../../queues/egress.js' +import { egressTableProps, createEgressEventStore } from '../../tables/egress.js' +import { handler as createEgressTrafficHandler } from '../../functions/egress-traffic-handler.js' /** * @typedef {{ * dynamo: import('./aws').AWSService @@ -137,6 +140,46 @@ export const createUCANStreamTestContext = async () => { return { consumerStore, spaceDiffStore } } +export const createEgressTrafficQueueTestContext = async () => { + await createAWSServices() + + const egressTableName = await createTable(awsServices.dynamo.client, egressTableProps, 'egress-') + const store = createEgressEventStore(awsServices.dynamo.client, { tableName: egressTableName }) + const egressEventStore = { + put: store.put, + list: store.list, + } + + const egressQueueURL = new URL(await createQueue(awsServices.sqs.client, 'egress-traffic-')) + const egressQueue = { + add: createEgressEventQueue(awsServices.sqs.client, { url: egressQueueURL }).add, + remove: createQueueRemoverClient(awsServices.sqs.client, { url: egressQueueURL, decode: decodeEgressEvent }).remove, + } + + return { + egressEventStore, + egressQueue, + egressHandler: createEgressTrafficHandler, + egressTable: egressTableName, + queueUrl: egressQueueURL, + accountId: (await awsServices.dynamo.client.config.credentials()).accountId, + callbackWaitsForEmptyEventLoop: true, + functionName: 'your-function-name', + functionVersion: 'your-function-version', + region: awsServices.dynamo.client.config.region, + invokedFunctionArn: `arn:aws:lambda:${awsServices.dynamo.client.config.region}:${awsServices.dynamo.client.config.credentials().accountId}:function:your-function-name`, + memoryLimitInMB: '128', + awsRequestId: 'your-request-id', + logGroupName: '/aws/lambda/your-function-name', + logStreamName: 'your-log-stream', + getRemainingTimeInMillis: () => 1000, + done: () => {}, + fail: () => {}, + succeed: () => {}, + stripeSecretKey: "", // FIXME (fforbeck): how to get Stripe secret key in a test? + } +} + /** * @template C * @param {import('../lib/api').TestSuite} suite diff --git a/billing/test/helpers/egress.js b/billing/test/helpers/egress.js new file mode 100644 index 00000000..e63f77bb --- /dev/null +++ b/billing/test/helpers/egress.js @@ -0,0 +1,13 @@ +import { randomDID } from './did.js' +import { randomLink } from './dag.js' + +/** + * @param {Partial} [base] + * @returns {Promise} + */ +export const randomEgressEvent = async (base = {}) => ({ + customerId: await randomDID(), + resourceId: randomLink().toString(), + timestamp: new Date(), + ...base +}) \ No newline at end of file diff --git a/billing/test/lib.egress-traffic-queue.spec.js b/billing/test/lib.egress-traffic-queue.spec.js new file mode 100644 index 00000000..14f24865 --- /dev/null +++ b/billing/test/lib.egress-traffic-queue.spec.js @@ -0,0 +1,4 @@ +import * as EgressTrafficQueue from './lib/egress-traffic-queue.js' +import { bindTestContext, createEgressTrafficQueueTestContext } from './helpers/context.js' + +export const test = bindTestContext(EgressTrafficQueue.test, createEgressTrafficQueueTestContext) \ No newline at end of file diff --git a/billing/test/lib/api.ts b/billing/test/lib/api.ts index 68814ca9..a3737679 100644 --- a/billing/test/lib/api.ts +++ b/billing/test/lib/api.ts @@ -18,8 +18,12 @@ import { SpaceSnapshotStore, UsageStore, UsageListKey, - Usage + Usage, + QueueAdder, + EgressEventStore } from '../../lib/api.js' +import { EgressEvent } from '../../data/egress.js' +import { Context } from 'aws-lambda' export interface BillingCronTestContext { customerStore: CustomerStore & StorePutter @@ -38,6 +42,18 @@ export interface SpaceBillingQueueTestContext { usageStore: UsageStore & StoreLister } +export type EgressQueue = QueueAdder + +export interface EgressTestContext extends Context { + egressQueue: EgressQueue & QueueRemover + egressEventStore: EgressEventStore + egressHandler: (event: import('aws-lambda').SQSEvent, context: import('aws-lambda').Context) => Promise<{ statusCode: number, body: string }> + queueUrl: string + region: string + stripeSecretKey: string + accountId: string +} + export interface StripeTestContext { customerStore: CustomerStore } @@ -53,6 +69,7 @@ export type TestContext = & SpaceBillingQueueTestContext & StripeTestContext & UCANStreamTestContext + & EgressTestContext /** QueueRemover can remove items from the head of the queue. */ export interface QueueRemover { diff --git a/billing/test/lib/egress-traffic-queue.js b/billing/test/lib/egress-traffic-queue.js new file mode 100644 index 00000000..5fb0e452 --- /dev/null +++ b/billing/test/lib/egress-traffic-queue.js @@ -0,0 +1,68 @@ +import { randomEgressEvent } from '../helpers/egress.js' +import { collectQueueMessages } from '../helpers/queue.js' + +/** @type {import('./api').TestSuite} */ +export const test = { + 'should process egress events': async (/** @type {import('entail').assert} */ assert, ctx) => { + const maxEvents = 100 + const events = await Promise.all( + Array.from({ length: maxEvents }, () => randomEgressEvent()) + ) + + // add egress events to the queue to simulate events from the Freeway worker + for (const e of events) { + console.log(`Adding egress event to the queue: CustomerId: ${e.customerId}, ResourceId: ${e.resourceId}, Timestamp: ${e.timestamp.toISOString()}`) + await ctx.egressQueue.add(e) + } + + // simulate the SQS event that triggers the handler + // FIXME (fforbeck): why the events are not collected? + const collected = await collectQueueMessages(ctx.egressQueue) + assert.ok(collected.ok, 'Failed to collect queue messages') + assert.equal(collected.ok.length, events.length, 'Collected queue messages length does not match') + + // @type {import('aws-lambda').SQSEvent} + const sqsEvent = { + Records: collected.ok.map(e => ({ + // @type {import('aws-lambda').SQSRecord} + body: JSON.stringify(e), + messageId: Math.random().toString(), + receiptHandle: Math.random().toString(), + awsRegion: ctx.region, + eventSource: 'aws:sqs', + eventSourceARN: `arn:aws:sqs:${ctx.region}:${ctx.accountId}:${ctx.queueUrl}`, + awsAccountId: ctx.accountId, + md5OfBody: '', + md5OfMessageAttributes: '', + attributes: { + ApproximateReceiveCount: '1', + SentTimestamp: e.timestamp.getTime().toString(), + SenderId: ctx.accountId, + ApproximateFirstReceiveTimestamp: e.timestamp.getTime().toString(), + }, + messageAttributes: {}, + })) + } + + const response = await ctx.egressHandler(sqsEvent, ctx) + assert.equal(response.statusCode, 200) + assert.equal(response.body, 'Egress events processed successfully') + + // ensure we got a egress record for each event + for (const e of events) { + const record = await ctx.egressEventStore.list({ + customerId: e.customerId, + resourceId: e.resourceId, + from: e.timestamp, + }) + assert.ok(record.ok) + assert.equal(record.ok.results.length, 1) + assert.equal(record.ok.results[0].customerId, e.customerId) + assert.equal(record.ok.results[0].resourceId, e.resourceId) + assert.equal(record.ok.results[0].timestamp, e.timestamp) + } + // FIXME (fforbeck): how to check we send the events to stripe? + // we need to mock the stripe client + // and check that the correct events are sent to stripe + } +}