From a12d35eb2fcce8f62eaeadf34665835ec7d69204 Mon Sep 17 00:00:00 2001 From: Felipe Forbeck Date: Tue, 22 Oct 2024 15:55:21 -0300 Subject: [PATCH] implemented suggested changes --- billing/data/egress.js | 74 ++++---- billing/functions/egress-traffic-handler.js | 159 +++++++++-------- billing/queues/egress-traffic.js | 2 +- billing/test/helpers/egress.js | 12 +- billing/test/lib/egress-traffic.js | 184 ++++++++++---------- billing/test/utils/stripe.js | 17 +- billing/utils/stripe.js | 14 ++ 7 files changed, 248 insertions(+), 214 deletions(-) diff --git a/billing/data/egress.js b/billing/data/egress.js index d44bbd3c..37d68c3d 100644 --- a/billing/data/egress.js +++ b/billing/data/egress.js @@ -2,10 +2,10 @@ import { Link } from '@ucanto/server' import { DecodeFailure, EncodeFailure, Schema } from './lib.js' export const egressSchema = Schema.struct({ - customer: Schema.did({ method: 'mailto' }), - resource: Schema.link(), - bytes: Schema.bigint(), - servedAt: Schema.date(), + customer: Schema.did({ method: 'mailto' }), + resource: Schema.link(), + bytes: Schema.bigint(), + servedAt: Schema.date(), }) /** @type {import('../lib/api').Validator} */ @@ -13,47 +13,47 @@ export const validate = input => egressSchema.read(input) /** @type {import('../lib/api').Encoder} */ export const encode = input => { - try { - return { - ok: JSON.stringify({ - customer: input.customer.toString(), - resource: input.resource.toString(), - bytes: input.bytes.toString(), - servedAt: input.servedAt.toISOString(), - }) - } - } catch (/** @type {any} */ err) { - return { - error: new EncodeFailure(`encoding string egress event: ${err.message}`, { cause: err }) - } + try { + return { + ok: JSON.stringify({ + customer: input.customer.toString(), + resource: input.resource.toString(), + bytes: input.bytes.toString(), + servedAt: input.servedAt.toISOString(), + }) } + } catch (/** @type {any} */ err) { + return { + error: new EncodeFailure(`encoding string egress event: ${err.message}`, { cause: err }) + } + } } /** @type {import('../lib/api').Decoder} */ export const decode = input => { - try { - return { - ok: { - customer: Schema.did({ method: 'mailto' }).from(input.customer), - resource: Link.parse(/** @type {string} */(input.resource)), - bytes: BigInt(input.bytes), - servedAt: new Date(input.servedAt), - } - } - } catch (/** @type {any} */ err) { - return { - error: new DecodeFailure(`decoding egress event: ${err.message}`, { cause: err }) - } + try { + return { + ok: { + customer: Schema.did({ method: 'mailto' }).from(input.customer), + resource: Link.parse(/** @type {string} */(input.resource)), + bytes: BigInt(input.bytes), + servedAt: new Date(input.servedAt), + } + } + } catch (/** @type {any} */ err) { + return { + error: new DecodeFailure(`decoding egress event: ${err.message}`, { cause: err }) } + } } /** @type {import('../lib/api').Decoder} */ export const decodeStr = input => { - try { - return decode(JSON.parse(input)) - } catch (/** @type {any} */ err) { - return { - error: new DecodeFailure(`decoding str egress traffic event: ${err.message}`, { cause: err }) - } + try { + return decode(JSON.parse(input)) + } catch (/** @type {any} */ err) { + return { + error: new DecodeFailure(`decoding str egress traffic event: ${err.message}`, { cause: err }) } -} + } +} \ No newline at end of file diff --git a/billing/functions/egress-traffic-handler.js b/billing/functions/egress-traffic-handler.js index fe9a29c9..6d4c2b4d 100644 --- a/billing/functions/egress-traffic-handler.js +++ b/billing/functions/egress-traffic-handler.js @@ -5,11 +5,13 @@ import { mustGetEnv } from '../../lib/env.js' import { createCustomerStore } from '../tables/customer.js' import Stripe from 'stripe' import { Config } from 'sst/node/config' +import { accountIDToStripeCustomerID } from '../utils/stripe.js' + Sentry.AWSLambda.init({ - environment: process.env.SST_STAGE, - dsn: process.env.SENTRY_DSN, - tracesSampleRate: 1.0 + environment: process.env.SST_STAGE, + dsn: process.env.SENTRY_DSN, + tracesSampleRate: 1.0 }) /** @@ -25,50 +27,51 @@ Sentry.AWSLambda.init({ /** * AWS Lambda handler to process egress events from the egress traffic queue. - * Each event is a JSON object with `customer`, `resource`, `bytes` and `se`. + * Each event is a JSON object with `customer`, `resource`, `bytes` and `servedAt`. * The message is then deleted from the queue when successful. */ export const handler = Sentry.AWSLambda.wrapHandler( - /** - * @param {import('aws-lambda').SQSEvent} event - * @param {import('aws-lambda').Context} context - */ - async (event, context) => { - /** @type {CustomHandlerContext|undefined} */ - const customContext = context?.clientContext?.Custom - const region = customContext?.region ?? mustGetEnv('AWS_REGION') - const customerTable = customContext?.customerTable ?? mustGetEnv('CUSTOMER_TABLE_NAME') - const customerStore = customContext?.customerStore ?? createCustomerStore({ region }, { tableName: customerTable }) - - const stripeSecretKey = customContext?.stripeSecretKey ?? Config.STRIPE_SECRET_KEY - if (!stripeSecretKey) throw new Error('missing secret: STRIPE_SECRET_KEY') - - const billingMeterName = customContext?.billingMeterName ?? mustGetEnv('STRIPE_BILLING_METER_EVENT_NAME') - if (!billingMeterName) throw new Error('missing secret: STRIPE_BILLING_METER_EVENT_NAME') + /** + * @param {import('aws-lambda').SQSEvent} event + * @param {import('aws-lambda').Context} context + */ + async (event, context) => { + /** @type {CustomHandlerContext|undefined} */ + const customContext = context?.clientContext?.Custom + const region = customContext?.region ?? mustGetEnv('AWS_REGION') + const customerTable = customContext?.customerTable ?? mustGetEnv('CUSTOMER_TABLE_NAME') + const customerStore = customContext?.customerStore ?? createCustomerStore({ region }, { tableName: customerTable }) - const stripe = new Stripe(stripeSecretKey, { apiVersion: '2023-10-16' }) + const stripeSecretKey = customContext?.stripeSecretKey ?? Config.STRIPE_SECRET_KEY + if (!stripeSecretKey) throw new Error('missing secret: STRIPE_SECRET_KEY') - for (const record of event.Records) { - try { - const decoded = decodeStr(record.body) - const egressEvent = expect(decoded, 'Failed to decode egress event') + const billingMeterName = customContext?.billingMeterName ?? mustGetEnv('STRIPE_BILLING_METER_EVENT_NAME') + if (!billingMeterName) throw new Error('missing secret: STRIPE_BILLING_METER_EVENT_NAME') - expect( - await recordEgress(customerStore, stripe, billingMeterName, egressEvent), - `Failed to send record usage to Stripe for customer: ${egressEvent.customer}, resource: ${egressEvent.resource}, servedAt: ${egressEvent.servedAt.toISOString()}` - ) + const stripe = new Stripe(stripeSecretKey, { apiVersion: '2023-10-16' }) + const batchItemFailures = [] + for (const record of event.Records) { + try { + const decoded = decodeStr(record.body) + const egressEvent = expect(decoded, 'Failed to decode egress event') - // TODO: delete the message from the queue? - } catch (error) { - console.error('Error processing egress event:', error) - } - } + expect( + await recordEgress(customerStore, stripe, billingMeterName, egressEvent), + `Failed to send record usage to Stripe for customer: ${egressEvent.customer}, resource: ${egressEvent.resource}, servedAt: ${egressEvent.servedAt.toISOString()}` + ) + } catch (error) { + console.error('Error processing egress event:', error) + batchItemFailures.push({ itemIdentifier: record.messageId }) + } + } - return { - statusCode: 200, - body: 'Egress events processed successfully' - } - }, + return { + statusCode: 200, + body: 'Egress events processed successfully', + // Return the failed records so they can be retried + batchItemFailures + } + }, ) /** @@ -77,48 +80,50 @@ export const handler = Sentry.AWSLambda.wrapHandler( * @param {import('../lib/api.js').CustomerStore} customerStore * @param {import('stripe').Stripe} stripe * @param {string} billingMeterEventName - * @param {import('../lib/api.js').EgressTrafficData} egressEventData + * @param {import('../lib/api').EgressTrafficData} egressEventData */ async function recordEgress(customerStore, stripe, billingMeterEventName, egressEventData) { - const response = await customerStore.get({ customer: egressEventData.customer }) - if (response.error) { - return { - error: { - name: 'CustomerNotFound', - message: `Error getting customer ${egressEventData.customer}`, - cause: response.error - } - } + const response = await customerStore.get({ customer: egressEventData.customer }) + if (response.error) { + return { + error: { + name: 'CustomerNotFound', + message: `Error getting customer ${egressEventData.customer}`, + cause: response.error + } } - const stripeCustomerId = response.ok.account.slice('stripe:'.length) - /** @type {import('stripe').Stripe.Customer | import('stripe').Stripe.DeletedCustomer} */ - const stripeCustomer = await stripe.customers.retrieve(stripeCustomerId) - if (stripeCustomer.deleted) { - return { - error: { - name: 'StripeCustomerNotFound', - message: `Customer ${stripeCustomerId} has been deleted from Stripe`, - } - } + } + + const stripeCustomerId = accountIDToStripeCustomerID(response.ok.account) + /** @type {import('stripe').Stripe.Customer | import('stripe').Stripe.DeletedCustomer} */ + const stripeCustomer = await stripe.customers.retrieve(stripeCustomerId) + if (stripeCustomer.deleted) { + return { + error: { + name: 'StripeCustomerNotFound', + message: `Customer ${stripeCustomerId} has been deleted from Stripe`, + } } + } - // TODO (fforbeck): implement some retry logic in case rate limiting errors - /** @type {import('stripe').Stripe.Billing.MeterEvent} */ - const meterEvent = await stripe.billing.meterEvents.create({ - event_name: billingMeterEventName, - payload: { - stripe_customer_id: stripeCustomerId, - value: egressEventData.bytes.toString(), - }, - timestamp: Math.floor(egressEventData.servedAt.getTime() / 1000) - }) - if (meterEvent.identifier) { - return { ok: { meterEvent } } - } - return { - error: { - name: 'StripeBillingMeterEventCreationFailed', - message: `Error creating meter event for egress traffic in Stripe for customer ${egressEventData.customer} @ ${egressEventData.servedAt.toISOString()}`, - } + /** @type {import('stripe').Stripe.Billing.MeterEvent} */ + const meterEvent = await stripe.billing.meterEvents.create({ + event_name: billingMeterEventName, + payload: { + stripe_customer_id: stripeCustomerId, + value: egressEventData.bytes.toString(), + }, + timestamp: Math.floor(egressEventData.servedAt.getTime() / 1000) + }) + + // Identifier is only set if the event was successfully created + if (meterEvent.identifier) { + return { ok: { meterEvent } } + } + return { + error: { + name: 'StripeBillingMeterEventCreationFailed', + message: `Error creating meter event for egress traffic in Stripe for customer ${egressEventData.customer} @ ${egressEventData.servedAt.toISOString()}`, } + } } \ No newline at end of file diff --git a/billing/queues/egress-traffic.js b/billing/queues/egress-traffic.js index 95f79737..5add3f7c 100644 --- a/billing/queues/egress-traffic.js +++ b/billing/queues/egress-traffic.js @@ -6,4 +6,4 @@ import { encode, validate } from '../data/egress.js' * @param {{ url: URL }} context */ export const createEgressTrafficQueue = (conf, { url }) => - createQueueAdderClient(conf, { url, encode, validate }) \ No newline at end of file + createQueueAdderClient(conf, { url, encode, validate }) diff --git a/billing/test/helpers/egress.js b/billing/test/helpers/egress.js index f2110081..01321ddf 100644 --- a/billing/test/helpers/egress.js +++ b/billing/test/helpers/egress.js @@ -5,9 +5,9 @@ import { randomLink } from './dag.js' * @returns {import('../../lib/api').EgressTrafficData} */ export const randomEgressEvent = (customer) => ({ - customer: customer.customer, - resource: randomLink(), - bytes: BigInt(Math.floor(Math.random() * 1000000)), - // Random timestamp within the last 1 hour - servedAt: new Date(Date.now() - Math.floor(Math.random() * 60 * 60 * 1000)), -}) \ No newline at end of file + customer: customer.customer, + resource: randomLink(), + bytes: BigInt(Math.floor(Math.random() * 1000000)), + // Random timestamp within the last 1 hour + servedAt: new Date(Date.now() - Math.floor(Math.random() * 60 * 60 * 1000)) +}) diff --git a/billing/test/lib/egress-traffic.js b/billing/test/lib/egress-traffic.js index 3d3cabab..189bb250 100644 --- a/billing/test/lib/egress-traffic.js +++ b/billing/test/lib/egress-traffic.js @@ -6,104 +6,104 @@ import * as DidMailto from '@web3-storage/did-mailto' /** @type {import('./api').TestSuite} */ export const test = { - /** - * @param {import('entail').assert} assert - * @param {import('./api').EgressTrafficTestContext} ctx - */ - 'should process all the egress traffic events from the queue': async (assert, ctx) => { - let stripeCustomerId; - try { - // 0. Create a test customer email, add it to stripe and to the customer store - const didMailto = randomDIDMailto() - const email = DidMailto.toEmail(/** @type {`did:mailto:${string}:${string}`} */(didMailto)) - const stripeCustomer = await ctx.stripe.customers.create({ email }) - assert.ok(stripeCustomer.id, 'Error adding customer to stripe') - stripeCustomerId = stripeCustomer.id + /** + * @param {import('entail').assert} assert + * @param {import('./api').EgressTrafficTestContext} ctx + */ + 'should process all the egress traffic events from the queue': async (assert, ctx) => { + let stripeCustomerId; + try { + // 0. Create a test customer email, add it to stripe and to the customer store + const didMailto = randomDIDMailto() + const email = DidMailto.toEmail(/** @type {`did:mailto:${string}:${string}`} */(didMailto)) + const stripeCustomer = await ctx.stripe.customers.create({ email }) + assert.ok(stripeCustomer.id, 'Error adding customer to stripe') + stripeCustomerId = stripeCustomer.id - const customer = randomCustomer({ - customer: didMailto, - /** @type {`stripe:${string}`} */ - account: `stripe:${stripeCustomerId}` - }) - const { error } = await ctx.customerStore.put(customer) - assert.ok(!error, 'Error adding customer') + const customer = randomCustomer({ + customer: didMailto, + /** @type {`stripe:${string}`} */ + account: `stripe:${stripeCustomerId}` + }) + const { error } = await ctx.customerStore.put(customer) + assert.ok(!error, 'Error adding customer') - // 1. Add egress events to the queue to simulate egress traffic from the Freeway worker - const maxEvents = 10 - /** @type {import('../../lib/api').EgressTrafficData[]} */ - const events = await Promise.all( - Array.from( - { length: maxEvents }, - () => randomEgressEvent(customer) - ) - ) + // 1. Add egress events to the queue to simulate egress traffic from the Freeway worker + const maxEvents = 10 + /** @type {import('../../lib/api').EgressTrafficData[]} */ + const events = await Promise.all( + Array.from( + { length: maxEvents }, + () => randomEgressEvent(customer) + ) + ) - for (const e of events) { - console.log(`Egress traffic for ${e.customer}, bytes: ${e.bytes}, servedAt: ${e.servedAt.toISOString()}, `) - const result = await ctx.egressTrafficQueue.add(e) - assert.ok(!result.error, 'Error adding egress event to the queue') - } + for (const e of events) { + console.log(`Egress traffic for ${e.customer}, bytes: ${e.bytes}, servedAt: ${e.servedAt.toISOString()}, `) + const result = await ctx.egressTrafficQueue.add(e) + assert.ok(!result.error, 'Error adding egress event to the queue') + } - // 2. Create a SQS event batch - // @type {import('aws-lambda').SQSEvent} - const sqsEventBatch = { - Records: events.map(e => ({ - // @type {import('aws-lambda').SQSRecord} - body: encode(e).ok ?? '', - messageId: Math.random().toString(), - receiptHandle: Math.random().toString(), - awsRegion: ctx.region, - eventSource: 'aws:sqs', - eventSourceARN: `arn:aws:sqs:${ctx.region}:${ctx.accountId}:${ctx.egressTrafficQueueUrl}`, - awsAccountId: ctx.accountId, - md5OfBody: '', - md5OfMessageAttributes: '', - attributes: { - ApproximateReceiveCount: '1', - SentTimestamp: e.servedAt.getTime().toString(), - SenderId: ctx.accountId, - ApproximateFirstReceiveTimestamp: e.servedAt.getTime().toString(), - }, - messageAttributes: {}, - })) - } + // 2. Create a SQS event batch + // @type {import('aws-lambda').SQSEvent} + const sqsEventBatch = { + Records: events.map(e => ({ + // @type {import('aws-lambda').SQSRecord} + body: encode(e).ok ?? '', + messageId: Math.random().toString(), + receiptHandle: Math.random().toString(), + awsRegion: ctx.region, + eventSource: 'aws:sqs', + eventSourceARN: `arn:aws:sqs:${ctx.region}:${ctx.accountId}:${ctx.egressTrafficQueueUrl}`, + awsAccountId: ctx.accountId, + md5OfBody: '', + md5OfMessageAttributes: '', + attributes: { + ApproximateReceiveCount: '1', + SentTimestamp: e.servedAt.getTime().toString(), + SenderId: ctx.accountId, + ApproximateFirstReceiveTimestamp: e.servedAt.getTime().toString(), + }, + messageAttributes: {}, + })) + } - // 3. Process the SQS event to trigger the handler using the custom context - const customCtx = { - clientContext: { - Custom: ctx, - }, - } - // @ts-expect-error -- Don't need to initialize the full lambda context for testing - await ctx.egressTrafficHandler(sqsEventBatch, customCtx, (err, res) => { - if (err) { - assert.fail(err) - } - assert.ok(res) - assert.equal(res.statusCode, 200) - assert.equal(res.body, 'Egress events processed successfully') - }) + // 3. Process the SQS event to trigger the handler using the custom context + const customCtx = { + clientContext: { + Custom: ctx, + }, + } + // @ts-expect-error -- Don't need to initialize the full lambda context for testing + await ctx.egressTrafficHandler(sqsEventBatch, customCtx, (err, res) => { + if (err) { + assert.fail(err) + } + assert.ok(res) + assert.equal(res.statusCode, 200) + assert.equal(res.body, 'Egress events processed successfully') + }) - // 4. Check if the aggregated meter event exists and has a value greater than 0 - const aggregatedMeterEvent = await ctx.stripe.billing.meters.listEventSummaries( - ctx.billingMeterId, - { - customer: stripeCustomerId, - start_time: Math.floor(events[0].servedAt.getTime() / 1000), - end_time: Math.floor(Date.now() / 1000), - } - ) - assert.ok(aggregatedMeterEvent.data, 'No aggregated meter event found') - assert.equal(aggregatedMeterEvent.data.length, 1, 'Expected 1 aggregated meter event') - // We can't verify the total bytes served because the meter events are not immediately available in stripe - // and the test would fail intermittently - assert.ok(aggregatedMeterEvent.data[0].aggregated_value > 0, 'Aggregated value is 0') - } finally { - if (stripeCustomerId) { - // 5. Delete the test customer from stripe - const deletedCustomer = await ctx.stripe.customers.del(stripeCustomerId); - assert.ok(deletedCustomer.deleted, 'Error deleting customer from stripe') - } + // 4. Check if the aggregated meter event exists and has a value greater than 0 + const aggregatedMeterEvent = await ctx.stripe.billing.meters.listEventSummaries( + ctx.billingMeterId, + { + customer: stripeCustomerId, + start_time: Math.floor(events[0].servedAt.getTime() / 1000), + end_time: Math.floor(Date.now() / 1000), } + ) + assert.ok(aggregatedMeterEvent.data, 'No aggregated meter event found') + assert.equal(aggregatedMeterEvent.data.length, 1, 'Expected 1 aggregated meter event') + // We can't verify the total bytes served because the meter events are not immediately available in stripe + // and the test would fail intermittently + assert.ok(aggregatedMeterEvent.data[0].aggregated_value > 0, 'Aggregated value is 0') + } finally { + if (stripeCustomerId) { + // 5. Delete the test customer from stripe + const deletedCustomer = await ctx.stripe.customers.del(stripeCustomerId); + assert.ok(deletedCustomer.deleted, 'Error deleting customer from stripe') + } } + } } \ No newline at end of file diff --git a/billing/test/utils/stripe.js b/billing/test/utils/stripe.js index 72828296..2b7c6206 100644 --- a/billing/test/utils/stripe.js +++ b/billing/test/utils/stripe.js @@ -1,4 +1,7 @@ -import { handleCustomerSubscriptionCreated } from '../../utils/stripe.js' +import { + accountIDToStripeCustomerID, + stripeIDToAccountID, + handleCustomerSubscriptionCreated } from '../../utils/stripe.js' import * as DidMailto from '@web3-storage/did-mailto' @@ -44,5 +47,17 @@ export const test = { assert.ok(result.ok) const customerRecord = await ctx.customerStore.get({ customer }) assert.equal(customerRecord.ok?.product, product) + }, + + 'should convert an account ID to a stripe customer ID': (/** @type {import('entail').assert} */ assert) => { + const accountID = 'stripe:cus_1234567890' + const stripeCustomerId = accountIDToStripeCustomerID(accountID) + assert.equal(stripeCustomerId, 'cus_1234567890') + }, + + 'should convert a stripe customer ID to an account ID': (/** @type {import('entail').assert} */ assert) => { + const stripeCustomerId = 'cus_1234567890' + const accountID = stripeIDToAccountID(stripeCustomerId) + assert.equal(accountID, 'stripe:cus_1234567890') } } diff --git a/billing/utils/stripe.js b/billing/utils/stripe.js index b47b3ded..5fe8124e 100644 --- a/billing/utils/stripe.js +++ b/billing/utils/stripe.js @@ -7,6 +7,9 @@ import * as DidMailto from '@web3-storage/did-mailto' */ /** + * Converts a Stripe customer ID to an Account ID. + * e.g: + * cus_1234567890 -> stripe:cus_1234567890 * * @param {string} stripeID * @returns {AccountID} @@ -15,6 +18,17 @@ export function stripeIDToAccountID(stripeID) { return /** @type {AccountID} */(`stripe:${stripeID}`) } +/** + * Converts an Account ID to a Stripe customer ID. + * e.g: + * stripe:cus_1234567890 -> cus_1234567890 + * + * @param {AccountID} accountID + * @returns {string} + */ +export const accountIDToStripeCustomerID = (accountID) => accountID.slice('stripe:'.length) + + /** * * @param {Stripe} stripe