Skip to content

Commit

Permalink
implemented suggested changes
Browse files Browse the repository at this point in the history
  • Loading branch information
fforbeck committed Oct 22, 2024
1 parent de2b19b commit a12d35e
Show file tree
Hide file tree
Showing 7 changed files with 248 additions and 214 deletions.
74 changes: 37 additions & 37 deletions billing/data/egress.js
Original file line number Diff line number Diff line change
Expand Up @@ -2,58 +2,58 @@ import { Link } from '@ucanto/server'
import { DecodeFailure, EncodeFailure, Schema } from './lib.js'

export const egressSchema = Schema.struct({
customer: Schema.did({ method: 'mailto' }),
resource: Schema.link(),
bytes: Schema.bigint(),
servedAt: Schema.date(),
customer: Schema.did({ method: 'mailto' }),
resource: Schema.link(),
bytes: Schema.bigint(),
servedAt: Schema.date(),
})

/** @type {import('../lib/api').Validator<import('../lib/api').EgressTrafficData>} */
export const validate = input => egressSchema.read(input)

/** @type {import('../lib/api').Encoder<import('../lib/api').EgressTrafficData, string>} */
export const encode = input => {
try {
return {
ok: JSON.stringify({
customer: input.customer.toString(),
resource: input.resource.toString(),
bytes: input.bytes.toString(),
servedAt: input.servedAt.toISOString(),
})
}
} catch (/** @type {any} */ err) {
return {
error: new EncodeFailure(`encoding string egress event: ${err.message}`, { cause: err })
}
try {
return {
ok: JSON.stringify({
customer: input.customer.toString(),
resource: input.resource.toString(),
bytes: input.bytes.toString(),
servedAt: input.servedAt.toISOString(),
})
}
} catch (/** @type {any} */ err) {
return {
error: new EncodeFailure(`encoding string egress event: ${err.message}`, { cause: err })
}
}
}

/** @type {import('../lib/api').Decoder<import('../types.js').StoreRecord, import('../lib/api').EgressTrafficData>} */
export const decode = input => {
try {
return {
ok: {
customer: Schema.did({ method: 'mailto' }).from(input.customer),
resource: Link.parse(/** @type {string} */(input.resource)),
bytes: BigInt(input.bytes),
servedAt: new Date(input.servedAt),
}
}
} catch (/** @type {any} */ err) {
return {
error: new DecodeFailure(`decoding egress event: ${err.message}`, { cause: err })
}
try {
return {
ok: {
customer: Schema.did({ method: 'mailto' }).from(input.customer),
resource: Link.parse(/** @type {string} */(input.resource)),
bytes: BigInt(input.bytes),
servedAt: new Date(input.servedAt),
}
}
} catch (/** @type {any} */ err) {
return {
error: new DecodeFailure(`decoding egress event: ${err.message}`, { cause: err })
}
}
}

/** @type {import('../lib/api').Decoder<string, import('../lib/api').EgressTrafficData>} */
export const decodeStr = input => {
try {
return decode(JSON.parse(input))
} catch (/** @type {any} */ err) {
return {
error: new DecodeFailure(`decoding str egress traffic event: ${err.message}`, { cause: err })
}
try {
return decode(JSON.parse(input))
} catch (/** @type {any} */ err) {
return {
error: new DecodeFailure(`decoding str egress traffic event: ${err.message}`, { cause: err })
}
}
}
}
159 changes: 82 additions & 77 deletions billing/functions/egress-traffic-handler.js
Original file line number Diff line number Diff line change
Expand Up @@ -5,11 +5,13 @@ import { mustGetEnv } from '../../lib/env.js'
import { createCustomerStore } from '../tables/customer.js'
import Stripe from 'stripe'
import { Config } from 'sst/node/config'
import { accountIDToStripeCustomerID } from '../utils/stripe.js'


Sentry.AWSLambda.init({
environment: process.env.SST_STAGE,
dsn: process.env.SENTRY_DSN,
tracesSampleRate: 1.0
environment: process.env.SST_STAGE,
dsn: process.env.SENTRY_DSN,
tracesSampleRate: 1.0
})

/**
Expand All @@ -25,50 +27,51 @@ Sentry.AWSLambda.init({

/**
* AWS Lambda handler to process egress events from the egress traffic queue.
* Each event is a JSON object with `customer`, `resource`, `bytes` and `se`.
* Each event is a JSON object with `customer`, `resource`, `bytes` and `servedAt`.
* The message is then deleted from the queue when successful.
*/
export const handler = Sentry.AWSLambda.wrapHandler(
/**
* @param {import('aws-lambda').SQSEvent} event
* @param {import('aws-lambda').Context} context
*/
async (event, context) => {
/** @type {CustomHandlerContext|undefined} */
const customContext = context?.clientContext?.Custom
const region = customContext?.region ?? mustGetEnv('AWS_REGION')
const customerTable = customContext?.customerTable ?? mustGetEnv('CUSTOMER_TABLE_NAME')
const customerStore = customContext?.customerStore ?? createCustomerStore({ region }, { tableName: customerTable })

const stripeSecretKey = customContext?.stripeSecretKey ?? Config.STRIPE_SECRET_KEY
if (!stripeSecretKey) throw new Error('missing secret: STRIPE_SECRET_KEY')

const billingMeterName = customContext?.billingMeterName ?? mustGetEnv('STRIPE_BILLING_METER_EVENT_NAME')
if (!billingMeterName) throw new Error('missing secret: STRIPE_BILLING_METER_EVENT_NAME')
/**
* @param {import('aws-lambda').SQSEvent} event
* @param {import('aws-lambda').Context} context
*/
async (event, context) => {
/** @type {CustomHandlerContext|undefined} */
const customContext = context?.clientContext?.Custom
const region = customContext?.region ?? mustGetEnv('AWS_REGION')
const customerTable = customContext?.customerTable ?? mustGetEnv('CUSTOMER_TABLE_NAME')
const customerStore = customContext?.customerStore ?? createCustomerStore({ region }, { tableName: customerTable })

const stripe = new Stripe(stripeSecretKey, { apiVersion: '2023-10-16' })
const stripeSecretKey = customContext?.stripeSecretKey ?? Config.STRIPE_SECRET_KEY
if (!stripeSecretKey) throw new Error('missing secret: STRIPE_SECRET_KEY')

for (const record of event.Records) {
try {
const decoded = decodeStr(record.body)
const egressEvent = expect(decoded, 'Failed to decode egress event')
const billingMeterName = customContext?.billingMeterName ?? mustGetEnv('STRIPE_BILLING_METER_EVENT_NAME')
if (!billingMeterName) throw new Error('missing secret: STRIPE_BILLING_METER_EVENT_NAME')

expect(
await recordEgress(customerStore, stripe, billingMeterName, egressEvent),
`Failed to send record usage to Stripe for customer: ${egressEvent.customer}, resource: ${egressEvent.resource}, servedAt: ${egressEvent.servedAt.toISOString()}`
)
const stripe = new Stripe(stripeSecretKey, { apiVersion: '2023-10-16' })
const batchItemFailures = []
for (const record of event.Records) {
try {
const decoded = decodeStr(record.body)
const egressEvent = expect(decoded, 'Failed to decode egress event')

// TODO: delete the message from the queue?
} catch (error) {
console.error('Error processing egress event:', error)
}
}
expect(
await recordEgress(customerStore, stripe, billingMeterName, egressEvent),
`Failed to send record usage to Stripe for customer: ${egressEvent.customer}, resource: ${egressEvent.resource}, servedAt: ${egressEvent.servedAt.toISOString()}`
)
} catch (error) {
console.error('Error processing egress event:', error)
batchItemFailures.push({ itemIdentifier: record.messageId })
}
}

return {
statusCode: 200,
body: 'Egress events processed successfully'
}
},
return {
statusCode: 200,
body: 'Egress events processed successfully',
// Return the failed records so they can be retried
batchItemFailures
}
},
)

/**
Expand All @@ -77,48 +80,50 @@ export const handler = Sentry.AWSLambda.wrapHandler(
* @param {import('../lib/api.js').CustomerStore} customerStore
* @param {import('stripe').Stripe} stripe
* @param {string} billingMeterEventName
* @param {import('../lib/api.js').EgressTrafficData} egressEventData
* @param {import('../lib/api').EgressTrafficData} egressEventData
*/
async function recordEgress(customerStore, stripe, billingMeterEventName, egressEventData) {
const response = await customerStore.get({ customer: egressEventData.customer })
if (response.error) {
return {
error: {
name: 'CustomerNotFound',
message: `Error getting customer ${egressEventData.customer}`,
cause: response.error
}
}
const response = await customerStore.get({ customer: egressEventData.customer })
if (response.error) {
return {
error: {
name: 'CustomerNotFound',
message: `Error getting customer ${egressEventData.customer}`,
cause: response.error
}
}
const stripeCustomerId = response.ok.account.slice('stripe:'.length)
/** @type {import('stripe').Stripe.Customer | import('stripe').Stripe.DeletedCustomer} */
const stripeCustomer = await stripe.customers.retrieve(stripeCustomerId)
if (stripeCustomer.deleted) {
return {
error: {
name: 'StripeCustomerNotFound',
message: `Customer ${stripeCustomerId} has been deleted from Stripe`,
}
}
}

const stripeCustomerId = accountIDToStripeCustomerID(response.ok.account)
/** @type {import('stripe').Stripe.Customer | import('stripe').Stripe.DeletedCustomer} */
const stripeCustomer = await stripe.customers.retrieve(stripeCustomerId)
if (stripeCustomer.deleted) {
return {
error: {
name: 'StripeCustomerNotFound',
message: `Customer ${stripeCustomerId} has been deleted from Stripe`,
}
}
}

// TODO (fforbeck): implement some retry logic in case rate limiting errors
/** @type {import('stripe').Stripe.Billing.MeterEvent} */
const meterEvent = await stripe.billing.meterEvents.create({
event_name: billingMeterEventName,
payload: {
stripe_customer_id: stripeCustomerId,
value: egressEventData.bytes.toString(),
},
timestamp: Math.floor(egressEventData.servedAt.getTime() / 1000)
})
if (meterEvent.identifier) {
return { ok: { meterEvent } }
}
return {
error: {
name: 'StripeBillingMeterEventCreationFailed',
message: `Error creating meter event for egress traffic in Stripe for customer ${egressEventData.customer} @ ${egressEventData.servedAt.toISOString()}`,
}
/** @type {import('stripe').Stripe.Billing.MeterEvent} */
const meterEvent = await stripe.billing.meterEvents.create({
event_name: billingMeterEventName,
payload: {
stripe_customer_id: stripeCustomerId,
value: egressEventData.bytes.toString(),
},
timestamp: Math.floor(egressEventData.servedAt.getTime() / 1000)
})

// Identifier is only set if the event was successfully created
if (meterEvent.identifier) {
return { ok: { meterEvent } }
}
return {
error: {
name: 'StripeBillingMeterEventCreationFailed',
message: `Error creating meter event for egress traffic in Stripe for customer ${egressEventData.customer} @ ${egressEventData.servedAt.toISOString()}`,
}
}
}
2 changes: 1 addition & 1 deletion billing/queues/egress-traffic.js
Original file line number Diff line number Diff line change
Expand Up @@ -6,4 +6,4 @@ import { encode, validate } from '../data/egress.js'
* @param {{ url: URL }} context
*/
export const createEgressTrafficQueue = (conf, { url }) =>
createQueueAdderClient(conf, { url, encode, validate })
createQueueAdderClient(conf, { url, encode, validate })
12 changes: 6 additions & 6 deletions billing/test/helpers/egress.js
Original file line number Diff line number Diff line change
Expand Up @@ -5,9 +5,9 @@ import { randomLink } from './dag.js'
* @returns {import('../../lib/api').EgressTrafficData}
*/
export const randomEgressEvent = (customer) => ({
customer: customer.customer,
resource: randomLink(),
bytes: BigInt(Math.floor(Math.random() * 1000000)),
// Random timestamp within the last 1 hour
servedAt: new Date(Date.now() - Math.floor(Math.random() * 60 * 60 * 1000)),
})
customer: customer.customer,
resource: randomLink(),
bytes: BigInt(Math.floor(Math.random() * 1000000)),
// Random timestamp within the last 1 hour
servedAt: new Date(Date.now() - Math.floor(Math.random() * 60 * 60 * 1000))
})
Loading

0 comments on commit a12d35e

Please sign in to comment.