Skip to content

Commit

Permalink
fix(egress-tracking): new partition and sort key for dynamo table (#443)
Browse files Browse the repository at this point in the history
**Context**
I've identified an issue in our DynamoDB table where egress events were
not being stored correctly. This was traced back to the configuration of
our partition key (PK) and sort key (SK). In DynamoDB, the partition key
is crucial for uniquely identifying each item, while the sort key helps
organize data within a partition.

This change ensures that the combination of partition key and sort key
is unique for each record, preventing DynamoDB from overriding existing
records in case of a collision.

**Changes Made**

1. **Modification of Partition Key (PK) & Sort Key (SK):** 
- The PK was updated to ensure it uniquely identifies each record.
Previously, key collisions occurred because multiple records were using
the same PK value, leading to storage issues.

2. **Inclusion of `cause` in Stripe Idempotent Key:**
- Added `cause` as part of the Stripe idempotent key to ensure the
uniqueness of requests to Stripe, preventing duplicate processing.

3. **Renaming of Table from `egress-traffic` to
`egress-traffic-events`:**
- Since existing partition keys and sort keys cannot be updated, the
table was renamed and the keys were updated. The existing table will be
dropped to accommodate these changes.

**Impact**
- These changes resolve the storage issues for egress events and improve
data integrity and retrieval efficiency.
- The renaming and restructuring of the table ensure that future records
are stored and managed correctly.

Blocked by storacha/w3up#1588
  • Loading branch information
fforbeck authored Nov 25, 2024
1 parent 0efa41d commit 180842f
Show file tree
Hide file tree
Showing 12 changed files with 47 additions and 36 deletions.
25 changes: 15 additions & 10 deletions billing/data/egress.js
Original file line number Diff line number Diff line change
Expand Up @@ -2,8 +2,9 @@ import { Link } from '@ucanto/server'
import { DecodeFailure, EncodeFailure, Schema } from './lib.js'

/**
* @typedef { import('../types').InferStoreRecord<import('../lib/api').EgressTrafficData> } EgressTrafficStoreRecord
* @typedef { import('../types').InferStoreRecord<import('../lib/api').EgressTrafficEventListKey> } EgressTrafficKeyStoreRecord
* @typedef { import('../lib/api').EgressTrafficData } EgressTrafficData
* @typedef { import('../types').InferStoreRecord<EgressTrafficData> & { pk: string, sk: string } } EgressTrafficStoreRecord
* @typedef {{ pk: string, sk: string }} EgressTrafficKeyStoreRecord
*/

export const egressSchema = Schema.struct({
Expand All @@ -15,14 +16,16 @@ export const egressSchema = Schema.struct({
cause: Schema.link(),
})

/** @type {import('../lib/api').Validator<import('../lib/api').EgressTrafficData>} */
/** @type {import('../lib/api').Validator<EgressTrafficData>} */
export const validate = input => egressSchema.read(input)

/** @type {import('../lib/api').Encoder<import('../lib/api').EgressTrafficData, EgressTrafficStoreRecord>} */
/** @type {import('../lib/api').Encoder<EgressTrafficData, EgressTrafficStoreRecord>} */
export const encode = input => {
try {
return {
ok: {
pk: `${input.space.toString()}#${input.resource.toString()}`,
sk: `${input.servedAt.toISOString()}#${input.cause.toString()}`,
space: input.space.toString(),
customer: input.customer.toString(),
resource: input.resource.toString(),
Expand Down Expand Up @@ -86,19 +89,21 @@ export const lister = {
/** @type {import('../lib/api').Encoder<import('../lib/api').EgressTrafficEventListKey, EgressTrafficKeyStoreRecord>} */
encodeKey: input => ({
ok: {
space: input.space.toString(),
customer: input.customer.toString(),
from: input.from.toISOString()
pk: `${input.space.toString()}#${input.resource.toString()}`,
sk: `${input.servedAt.toISOString()}#${input.cause.toString()}`,
}
}),
/** @type {import('../lib/api').Decoder<EgressTrafficKeyStoreRecord, import('../lib/api').EgressTrafficEventListKey>} */
decodeKey: input => {
try {
const [space, resource] = input.pk.split('#')
const [servedAt, cause] = input.sk.split('#')
return {
ok: {
space: Schema.did({ method: 'key' }).from(input.space),
customer: Schema.did({ method: 'mailto' }).from(input.customer),
from: new Date(input.from)
space: Schema.did({ method: 'key' }).from(space),
resource: Link.parse(resource),
servedAt: new Date(servedAt),
cause: Link.parse(cause),
}
}
} catch (/** @type {any} */ err) {
Expand Down
16 changes: 4 additions & 12 deletions billing/functions/egress-traffic-queue.js
Original file line number Diff line number Diff line change
Expand Up @@ -60,21 +60,13 @@ export const handler = Sentry.AWSLambda.wrapHandler(
const decoded = decodeStr(record.body)
const egressData = expect(decoded, 'Failed to decode egress event')

const putResult = await egressTrafficEventStore.put(egressData)
if (putResult.error) throw putResult.error
const result = await egressTrafficEventStore.put(egressData)
expect(result, 'Failed to save egress event in database')

const response = await customerStore.get({ customer: egressData.customer })
if (response.error) {
return {
error: {
name: 'CustomerNotFound',
message: `Error getting customer ${egressData.customer}`,
cause: response.error
}
}
}
const customerAccount = response.ok.account
if (response.error) throw response.error

const customerAccount = response.ok.account
expect(
await recordBillingMeterEvent(stripe, billingMeterName, egressData, customerAccount),
`Failed to record egress event in Stripe API for customer: ${egressData.customer}, account: ${customerAccount}, bytes: ${egressData.bytes}, servedAt: ${egressData.servedAt.toISOString()}, resource: ${egressData.resource}`
Expand Down
7 changes: 6 additions & 1 deletion billing/lib/api.ts
Original file line number Diff line number Diff line change
Expand Up @@ -189,7 +189,12 @@ export type EgressTrafficQueue = QueueAdder<EgressTrafficData>
/**
* List key for egress traffic data.
*/
export interface EgressTrafficEventListKey { space: ConsumerDID, customer: CustomerDID, from: Date }
export interface EgressTrafficEventListKey {
space: ConsumerDID,
resource: UnknownLink,
servedAt: Date
cause: UnknownLink,
}

/**
* Captures details about a space that should be billed for a given customer in
Expand Down
8 changes: 6 additions & 2 deletions billing/tables/egress-traffic.js
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,10 @@ import { validate, encode, lister, decode } from '../data/egress.js'
*/
export const egressTrafficTableProps = {
fields: {
/** Composite key with format: "space#resource" */
pk: 'string',
/** Composite key with format: "servedAt#cause" */
sk: 'string',
/** Space DID (did:key:...). */
space: 'string',
/** Customer DID (did:mailto:...). */
Expand All @@ -21,11 +25,11 @@ export const egressTrafficTableProps = {
/** UCAN invocation ID that caused the egress traffic. */
cause: 'string',
},
primaryIndex: { partitionKey: 'space', sortKey: 'servedAt' },
primaryIndex: { partitionKey: 'pk', sortKey: 'sk' },
globalIndexes: {
customer: {
partitionKey: 'customer',
sortKey: 'servedAt',
sortKey: 'sk',
projection: ['space', 'resource', 'bytes', 'cause', 'servedAt']
}
}
Expand Down
2 changes: 1 addition & 1 deletion billing/test/helpers/context.js
Original file line number Diff line number Diff line change
Expand Up @@ -191,7 +191,7 @@ export const createEgressTrafficTestContext = async () => {
})
}

const egressTrafficTable = await createTable(awsServices.dynamo.client, egressTrafficTableProps, 'egress-traffic-')
const egressTrafficTable = await createTable(awsServices.dynamo.client, egressTrafficTableProps, 'egress-traffic-events-')
const egressTrafficEventStore = {
...createEgressTrafficEventStore(awsServices.dynamo.client, { tableName: egressTrafficTable }),
...createStorePutterClient(awsServices.dynamo.client, {
Expand Down
4 changes: 3 additions & 1 deletion billing/utils/stripe.js
Original file line number Diff line number Diff line change
Expand Up @@ -91,12 +91,14 @@ export async function recordBillingMeterEvent(stripe, billingMeterEventName, egr
event_name: billingMeterEventName,
payload: {
stripe_customer_id: stripeCustomerId,
// Stripe expects the value to be a string
value: egressData.bytes.toString(),
},
// Stripe expects the timestamp to be in seconds
timestamp: Math.floor(egressData.servedAt.getTime() / 1000),
},
{
idempotencyKey: `${egressData.servedAt.toISOString()}-${egressData.space}-${egressData.customer}-${egressData.resource}`
idempotencyKey: `${egressData.servedAt.toISOString()}-${egressData.space}-${egressData.customer}-${egressData.resource}-${egressData.cause}`
}
)

Expand Down
8 changes: 4 additions & 4 deletions package-lock.json

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 1 addition & 1 deletion package.json
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@
"packageManager": "[email protected]+sha256.c8c61ba0fa0ab3b5120efd5ba97fdaf0e0b495eef647a97c4413919eda0a878b",
"type": "module",
"scripts": {
"start": "sst start",
"start": "sst dev",
"build": "sst build",
"check": "tsc --build",
"deploy": "sst deploy --outputs-file .test-env.json",
Expand Down
2 changes: 1 addition & 1 deletion stacks/billing-db-stack.js
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@ export const BillingDbStack = ({ stack }) => {
...usageTableProps,
stream: 'new_image'
})
const egressTrafficTable = new Table(stack, 'egress-traffic', egressTrafficTableProps)
const egressTrafficTable = new Table(stack, 'egress-traffic-events', egressTrafficTableProps)

stack.addOutputs({
customerTableName: customerTable.tableName,
Expand Down
2 changes: 1 addition & 1 deletion stacks/billing-stack.js
Original file line number Diff line number Diff line change
Expand Up @@ -202,7 +202,7 @@ export function BillingStack ({ stack, app }) {
consumer: {
function: egressTrafficQueueHandler,
deadLetterQueue: egressTrafficDLQ.cdk.queue,
cdk: { eventSource: { batchSize: 1 } }
cdk: { eventSource: { batchSize: 10 } }
},
cdk: { queue: { visibilityTimeout: Duration.minutes(15) } }
})
Expand Down
2 changes: 1 addition & 1 deletion upload-api/package.json
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@
"@web3-storage/access": "^20.0.0",
"@web3-storage/capabilities": "^17.4.1",
"@web3-storage/did-mailto": "^2.1.0",
"@web3-storage/upload-api": "^18.1.2",
"@web3-storage/upload-api": "^18.1.3",
"multiformats": "^13.1.0",
"nanoid": "^5.0.2",
"one-webcrypto": "^1.0.3",
Expand Down
5 changes: 4 additions & 1 deletion upload-api/stores/usage.js
Original file line number Diff line number Diff line change
Expand Up @@ -82,7 +82,10 @@ export function useUsageStore({ spaceSnapshotStore, spaceDiffStore, egressTraffi
}

const result = await egressTrafficQueue.add(record)
if (result.error) return result
if (result.error) {
console.error('Error sending egress event to queue:', result.error)
return result
}

return { ok: { ...record, servedAt: servedAt.toISOString() } }
}
Expand Down

0 comments on commit 180842f

Please sign in to comment.