Skip to content

Commit

Permalink
feat: egress traffic tracker
Browse files Browse the repository at this point in the history
  • Loading branch information
fforbeck committed Oct 17, 2024
1 parent eec7354 commit 5f86e7d
Show file tree
Hide file tree
Showing 6 changed files with 231 additions and 4 deletions.
40 changes: 40 additions & 0 deletions billing/data/egress.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,40 @@
import { DecodeFailure, EncodeFailure, Schema } from './lib.js'

export const egressSchema = Schema.struct({
customer: Schema.did({ method: 'mailto' }),
resource: Schema.link(),
bytes: Schema.bigint(),
servedAt: Schema.date(),
})

/** @type {import('../lib/api').Validator<import('../lib/api').EgressTrafficData>} */
export const validate = input => egressSchema.read(input)

/** @type {import('../lib/api').Encoder<import('../lib/api').EgressTrafficData, string>} */
export const encode = input => {
try {
return { ok: JSON.stringify(input) }
} catch (/** @type {any} */ err) {
return {
error: new EncodeFailure(`encoding string egress event: ${err.message}`, { cause: err })
}
}
}

/** @type {import('../lib/api').Decoder<import('../types.js').StoreRecord, import('../lib/api').EgressTrafficData>} */
export const decode = input => {
try {
return {
ok: {
customer: Schema.did({ method: 'mailto' }).from(input.customer),
resource: Schema.link().from(input.resourceId),
bytes: Schema.bigint().from(input.bytes),
servedAt: Schema.date().from(input.servedAt),
}
}
} catch (/** @type {any} */ err) {
return {
error: new DecodeFailure(`decoding egress event: ${err.message}`, { cause: err })
}
}
}
131 changes: 131 additions & 0 deletions billing/functions/egress-traffic-handler.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,131 @@
import * as Sentry from '@sentry/serverless'
import { expect } from './lib.js'
import { decode } from '../data/egress.js'
import { SQSClient, DeleteMessageCommand } from '@aws-sdk/client-sqs'
import { mustGetEnv } from '../../lib/env.js'
import { createCustomerStore } from '../tables/customer.js'
import Stripe from 'stripe'
import { Config } from 'sst/node/config'

Sentry.AWSLambda.init({
environment: process.env.SST_STAGE,
dsn: process.env.SENTRY_DSN,
tracesSampleRate: 1.0
})

/**
* @typedef {{
* region?: 'us-west-2'|'us-east-2'
* queueUrl?: string
* customerTable?: string
* billingMeterName?: string
* stripeSecretKey?: string
* }} CustomHandlerContext
*/

/**
* AWS Lambda handler to process egress events from the egress traffic queue.
* Each event is a JSON object with `customer`, `resource`, `bytes` and `timestamp`.
* The message is then deleted from the queue when successful.
*/
export const handler = Sentry.AWSLambda.wrapHandler(
/**
* @param {import('aws-lambda').SQSEvent} event
* @param {import('aws-lambda').Context} context
*/

async (event, context) => {
/** @type {CustomHandlerContext|undefined} */
const customContext = context?.clientContext?.Custom
const region = customContext?.region ?? mustGetEnv('AWS_REGION')
const queueUrl = customContext?.queueUrl ?? mustGetEnv('EGRESS_TRAFFIC_QUEUE_URL')
const customerTable = customContext?.customerTable ?? mustGetEnv('CUSTOMER_TABLE_NAME')
const sqsClient = new SQSClient({ region })
const customerStore = createCustomerStore({ region }, { tableName: customerTable })
const billingMeterName = customContext?.billingMeterName ?? 'gateway_egress_traffic'
const stripeSecretKey = customContext?.stripeSecretKey ?? Config.STRIPE_SECRET_KEY

if (!stripeSecretKey) throw new Error('missing secret: STRIPE_SECRET_KEY')
const stripe = new Stripe(stripeSecretKey, { apiVersion: '2023-10-16' })

for (const record of event.Records) {
try {
const messageBody = JSON.parse(record.body)
const decoded = decode(messageBody)
const egressEvent = expect(decoded, 'Failed to decode egress message')

expect(
await recordEgress(customerStore, stripe, billingMeterName, egressEvent),
`Failed to send record usage to Stripe for customer: ${egressEvent.customer}, resource: ${egressEvent.resource}, servedAt: ${egressEvent.servedAt.toISOString()}`
)

/**
* SQS requires explicit acknowledgment that a message has been successfully processed.
* This is done by deleting the message from the queue using its ReceiptHandle
*/
await sqsClient.send(new DeleteMessageCommand({
QueueUrl: queueUrl,
ReceiptHandle: record.receiptHandle
}))
} catch (error) {
console.error('Error processing egress event:', error)
}
}

return {
statusCode: 200,
body: 'Egress events processed successfully'
}
},
)

/**
* Finds the Stripe customer ID for the given customer and records the egress traffic data in the Stripe Billing Meter API.
*
* @param {import('../lib/api.ts').CustomerStore} customerStore
* @param {import('stripe').Stripe} stripe
* @param {string} billingMeterName
* @param {import('../lib/api.ts').EgressTrafficData} egressEventData
*/
async function recordEgress(customerStore, stripe, billingMeterName, egressEventData) {
const response = await customerStore.get({ customer: egressEventData.customer })
if (response.error) {
return {
error: {
name: 'CustomerNotFound',
message: `Error getting customer ${egressEventData.customer}`,
cause: response.error
}
}
}
const stripeCustomerId = response.ok.account.slice('stripe:'.length)
/** @type {import('stripe').Stripe.Customer | import('stripe').Stripe.DeletedCustomer} */
const stripeCustomer = await stripe.customers.retrieve(stripeCustomerId)
if (stripeCustomer.deleted) {
return {
error: {
name: 'StripeCustomerNotFound',
message: `Customer ${stripeCustomerId} has been deleted from Stripe`,
}
}
}

/** @type {import('stripe').Stripe.Billing.MeterEvent} */
const meterEvent = await stripe.billing.meterEvents.create({
event_name: billingMeterName,
payload: {
stripe_customer_id: stripeCustomerId,
value: egressEventData.bytes.toString(),
},
timestamp: Math.floor(egressEventData.servedAt.getTime() / 1000)
})
if (meterEvent.identifier) {
return { ok: { meterEvent } }
}
return {
error: {
name: 'StripeBillingMeterEventCreationFailed',
message: `Error creating meter event for egress traffic in Stripe for customer ${egressEventData.customer} @ ${egressEventData.servedAt.toISOString()}`,
}
}
}
21 changes: 20 additions & 1 deletion billing/lib/api.ts
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
import { DID, Link, URI, LinkJSON, Result, Capabilities, Unit, Failure } from '@ucanto/interface'
import { DID, Link, URI, LinkJSON, Result, Capabilities, Unit, Failure, UnknownLink } from '@ucanto/interface'

// Billing stores /////////////////////////////////////////////////////////////

Expand Down Expand Up @@ -158,6 +158,25 @@ export interface CustomerBillingInstruction {

export type CustomerBillingQueue = QueueAdder<CustomerBillingInstruction>

/**
* Captures details about egress traffic that should be billed for a given period
*/
export interface EgressTrafficData {
/** Customer DID (did:mailto:...). */
customer: CustomerDID
/** Resource that was served. */
resource: UnknownLink
/** Number of bytes that were served. */
bytes: bigint
/** Time the egress traffic was served at. */
servedAt: Date
}

/**
* Queue for egress traffic data.
*/
export type EgressTrafficQueue = QueueAdder<EgressTrafficData>

/**
* Captures details about a space that should be billed for a given customer in
* the given period of usage.
Expand Down
9 changes: 9 additions & 0 deletions billing/queues/egress-traffic.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,9 @@
import { createQueueAdderClient } from './client.js'
import { encode, validate } from '../data/egress.js'

/**
* @param {{ region: string } | import('@aws-sdk/client-sqs').SQSClient} conf
* @param {{ url: URL }} context
*/
export const createEgressTrafficQueue = (conf, { url }) =>
createQueueAdderClient(conf, { url, encode, validate })
6 changes: 5 additions & 1 deletion upload-api/functions/ucan-invocation-router.js
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,7 @@ import { createStripeBillingProvider } from '../billing.js'
import { createIPNIService } from '../external-services/ipni-service.js'
import * as UploadAPI from '@web3-storage/upload-api'
import { mustGetEnv } from '../../lib/env.js'
import { createEgressTrafficQueue } from '@web3-storage/w3infra-billing/queues/egress-traffic.js'

Sentry.AWSLambda.init({
environment: process.env.SST_STAGE,
Expand Down Expand Up @@ -120,6 +121,7 @@ export async function ucanInvocationRouter(request) {
dealTrackerUrl,
pieceOfferQueueUrl,
filecoinSubmitQueueUrl,
egressTrafficQueueUrl,
requirePaymentPlan,
// set for testing
dbEndpoint,
Expand Down Expand Up @@ -201,7 +203,8 @@ export async function ucanInvocationRouter(request) {
const revocationsStorage = createRevocationsTable(AWS_REGION, revocationTableName)
const spaceDiffStore = createSpaceDiffStore({ region: AWS_REGION }, { tableName: spaceDiffTableName })
const spaceSnapshotStore = createSpaceSnapshotStore({ region: AWS_REGION }, { tableName: spaceSnapshotTableName })
const usageStorage = useUsageStore({ spaceDiffStore, spaceSnapshotStore })
const egressTrafficQueue = createEgressTrafficQueue({ region: AWS_REGION }, { url: new URL(egressTrafficQueueUrl) })
const usageStorage = useUsageStore({ spaceDiffStore, spaceSnapshotStore, egressTrafficQueue })

const dealTrackerConnection = getServiceConnection({
did: dealTrackerDid,
Expand Down Expand Up @@ -346,6 +349,7 @@ function getLambdaEnv () {
spaceSnapshotTableName: mustGetEnv('SPACE_SNAPSHOT_TABLE_NAME'),
pieceOfferQueueUrl: mustGetEnv('PIECE_OFFER_QUEUE_URL'),
filecoinSubmitQueueUrl: mustGetEnv('FILECOIN_SUBMIT_QUEUE_URL'),
egressTrafficQueueUrl: mustGetEnv('EGRESS_TRAFFIC_QUEUE_URL'),
r2DelegationBucketEndpoint: mustGetEnv('R2_ENDPOINT'),
r2DelegationBucketAccessKeyId: mustGetEnv('R2_ACCESS_KEY_ID'),
r2DelegationBucketSecretAccessKey: mustGetEnv('R2_SECRET_ACCESS_KEY'),
Expand Down
28 changes: 26 additions & 2 deletions upload-api/stores/usage.js
Original file line number Diff line number Diff line change
Expand Up @@ -4,15 +4,16 @@ import { iterateSpaceDiffs } from '@web3-storage/w3infra-billing/lib/space-billi
* @param {object} conf
* @param {import('@web3-storage/w3infra-billing/lib/api').SpaceSnapshotStore} conf.spaceSnapshotStore
* @param {import('@web3-storage/w3infra-billing/lib/api').SpaceDiffStore} conf.spaceDiffStore
* @param {import('@web3-storage/w3infra-billing/lib/api').EgressTrafficQueue} conf.egressTrafficQueue
*/
export function useUsageStore ({ spaceSnapshotStore, spaceDiffStore }) {
export function useUsageStore({ spaceSnapshotStore, spaceDiffStore, egressTrafficQueue }) {
return {
/**
* @param {import('@web3-storage/upload-api').ProviderDID} provider
* @param {import('@web3-storage/upload-api').SpaceDID} space
* @param {{ from: Date, to: Date }} period
*/
async report (provider, space, period) {
async report(provider, space, period) {
const snapResult = await spaceSnapshotStore.get({
provider,
space,
Expand Down Expand Up @@ -57,6 +58,29 @@ export function useUsageStore ({ spaceSnapshotStore, spaceDiffStore }) {
events,
}
return { ok: report }
},

/**
* Handle egress traffic data and enqueues it, so the billing system can process it and update the Stripe Billing Meter API.
*
* @param {import('@web3-storage/upload-api').AccountDID} customer
* @param {import('@web3-storage/upload-api').UnknownLink} resource
* @param {bigint} bytes
* @param {Date} servedAt
* @returns {Promise<import('@ucanto/interface').Result<import('@ucanto/interface').Unit, import('@ucanto/interface').Failure>>}
*/
async record(customer, resource, bytes, servedAt) {
const record = {
customer,
resource,
bytes,
servedAt
}

const result = await egressTrafficQueue.add(record)
if (result.error) return result

return { ok: record }
}
}
}

0 comments on commit 5f86e7d

Please sign in to comment.