Skip to content

Commit

Permalink
feat: ucan stream consumer provider add count
Browse files Browse the repository at this point in the history
  • Loading branch information
vasco-santos committed Mar 30, 2023
1 parent abe2410 commit 0af0278
Show file tree
Hide file tree
Showing 10 changed files with 359 additions and 12,058 deletions.
12,086 changes: 31 additions & 12,055 deletions package-lock.json

Large diffs are not rendered by default.

19 changes: 19 additions & 0 deletions stacks/ucan-invocation-stack.js
Original file line number Diff line number Diff line change
Expand Up @@ -123,6 +123,17 @@ export function UcanInvocationStack({ stack, app }) {
handler: 'functions/metrics-upload-remove-total.consumer',
deadLetterQueue: metricsUploadRemoveTotalDLQ.cdk.queue,
})

// metrics provider/add count
const metricsProviderAddTotalDLQ = new Queue(stack, 'metrics-provider-add-total-dlq')
const metricsProviderAddTotalConsumer = new Function(stack, 'metrics-provider-add-total-consumer', {
environment: {
TABLE_NAME: adminMetricsTable.tableName
},
permissions: [adminMetricsTable],
handler: 'functions/metrics-provider-add-total.consumer',
deadLetterQueue: metricsProviderAddTotalDLQ.cdk.queue,
})

// metrics per space
const spaceMetricsDLQ = new Queue(stack, 'space-metrics-dlq')
Expand Down Expand Up @@ -258,6 +269,14 @@ export function UcanInvocationStack({ stack, app }) {
}
}
},
metricsConsumerAddTotalConsumer: {
function: metricsProviderAddTotalConsumer,
cdk: {
eventSource: {
...(getKinesisEventSourceConfig(stack))
}
}
},
spaceMetricsUploadAddTotalConsumer: {
function: spaceMetricsUploadAddTotalConsumer,
cdk: {
Expand Down
5 changes: 5 additions & 0 deletions ucan-invocation/constants.js
Original file line number Diff line number Diff line change
@@ -1,3 +1,6 @@
import {
add as providerAdd,
} from '@web3-storage/capabilities/provider'
import {
add as storeAdd,
remove as storeRemove
Expand All @@ -17,6 +20,7 @@ export const STORE_ADD = storeAdd.can
export const STORE_REMOVE = storeRemove.can
export const UPLOAD_ADD = uploadAdd.can
export const UPLOAD_REMOVE = uploadRemove.can
export const PROVIDER_ADD = providerAdd.can

// Admin Metrics
export const METRICS_NAMES = {
Expand All @@ -26,6 +30,7 @@ export const METRICS_NAMES = {
STORE_ADD_SIZE_TOTAL: `${STORE_ADD}-size-total`,
STORE_REMOVE_TOTAL: `${STORE_REMOVE}-total`,
STORE_REMOVE_SIZE_TOTAL: `${STORE_REMOVE}-size-total`,
PROVIDER_ADD_TOTAL: `${PROVIDER_ADD}-total`,
}

// Spade Metrics
Expand Down
47 changes: 47 additions & 0 deletions ucan-invocation/functions/metrics-provider-add-total.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,47 @@
import * as Sentry from '@sentry/serverless'

import { hasOkReceipt } from '../utils/receipt.js'
import { createMetricsTable } from '../tables/metrics.js'
import { parseKinesisEvent } from '../utils/parse-kinesis-event.js'
import { PROVIDER_ADD } from '../constants.js'

Sentry.AWSLambda.init({
environment: process.env.SST_STAGE,
dsn: process.env.SENTRY_DSN,
tracesSampleRate: 1.0,
})

const AWS_REGION = process.env.AWS_REGION || 'us-west-2'

/**
* @param {import('aws-lambda').KinesisStreamEvent} event
*/
async function handler(event) {
const ucanInvocations = parseKinesisEvent(event)

const {
TABLE_NAME: tableName = '',
// set for testing
DYNAMO_DB_ENDPOINT: dbEndpoint,
} = process.env

await updateProviderAddTotal(ucanInvocations, {
metricsTable: createMetricsTable(AWS_REGION, tableName, {
endpoint: dbEndpoint
})
})
}

/**
* @param {import('../types').UcanInvocation[]} ucanInvocations
* @param {import('../types').TotalSizeCtx} ctx
*/
export async function updateProviderAddTotal (ucanInvocations, ctx) {
const invocationsWithConsumerAdd = ucanInvocations.filter(
inv => inv.value.att.find(a => a.can === PROVIDER_ADD) && hasOkReceipt(inv)
).flatMap(inv => inv.value.att)

await ctx.metricsTable.incrementProviderAddTotal(invocationsWithConsumerAdd)
}

export const consumer = Sentry.AWSLambda.wrapHandler(handler)
2 changes: 1 addition & 1 deletion ucan-invocation/functions/metrics-upload-add-total.js
Original file line number Diff line number Diff line change
Expand Up @@ -44,4 +44,4 @@ export async function updateUploadAddTotal (ucanInvocations, ctx) {
await ctx.metricsTable.incrementUploadAddTotal(invocationsWithUploadAdd)
}

export const consumer = Sentry.AWSLambda.wrapHandler(handler)
export const consumer = Sentry.AWSLambda.wrapHandler(handler)
5 changes: 3 additions & 2 deletions ucan-invocation/package.json
Original file line number Diff line number Diff line change
Expand Up @@ -9,14 +9,15 @@
"@aws-sdk/client-dynamodb": "^3.226.0",
"@aws-sdk/client-eventbridge": "^3.218.0",
"@sentry/serverless": "^7.22.0",
"@web3-storage/capabilities": "^3.2.0",
"@web3-storage/capabilities": "^4.0.1",
"uint8arrays": "^4.0.2"
},
"devDependencies": {
"@aws-sdk/client-dynamodb": "^3.226.0",
"@aws-sdk/client-s3": "^3.226.0",
"@serverless-stack/resources": "*",
"@ucanto/interface": "^4.2.3",
"@ucanto/interface": "^5.0.0",
"@ucanto/principal": "^5.0.0",
"ava": "^4.3.3",
"multiformats": "^11.0.1",
"nanoid": "4.0.0",
Expand Down
22 changes: 22 additions & 0 deletions ucan-invocation/tables/metrics.js
Original file line number Diff line number Diff line change
Expand Up @@ -163,6 +163,28 @@ export function createMetricsTable (region, tableName, options = {}) {
})
})

await dynamoDb.send(updateCmd)
},
/**
* Increment total count from provider/add operations.
*
* @param {Capabilities} operationsInv
*/
incrementProviderAddTotal: async (operationsInv) => {
const invTotalSize = operationsInv.length

const updateCmd = new UpdateItemCommand({
TableName: tableName,
UpdateExpression: `ADD #value :value`,
ExpressionAttributeNames: {'#value': 'value'},
ExpressionAttributeValues: {
':value': { N: String(invTotalSize) },
},
Key: marshall({
name: METRICS_NAMES.PROVIDER_ADD_TOTAL
})
})

await dynamoDb.send(updateCmd)
}
}
Expand Down
225 changes: 225 additions & 0 deletions ucan-invocation/test/functions/metrics-provider-add-total.test.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,225 @@
import { testConsumer as test } from '../helpers/context.js'

import * as Signer from '@ucanto/principal/ed25519'
import * as ProviderCapabilities from '@web3-storage/capabilities/provider'

import { createDynamodDb } from '../helpers/resources.js'
import { createSpace } from '../helpers/ucanto.js'
import { randomCAR } from '../helpers/random.js'
import { createDynamoTable, getItemFromTable} from '../helpers/tables.js'
import { adminMetricsTableProps } from '../../tables/index.js'

import { updateUploadAddTotal } from '../../functions/metrics-upload-add-total.js'
import { createMetricsTable } from '../../tables/metrics.js'
import { METRICS_NAMES, STREAM_TYPE } from '../../constants.js'

const REGION = 'us-west-2'

test.before(async t => {
// Dynamo DB
const {
client: dynamo,
endpoint: dbEndpoint
} = await createDynamodDb({ port: 8000 })

t.context.dbEndpoint = dbEndpoint
t.context.dynamoClient = dynamo
})

test('handles a batch of single invocation with provider/add', async t => {
const { tableName } = await prepareResources(t.context.dynamoClient)
const uploadService = await Signer.generate()
const alice = await Signer.generate()
const car = await randomCAR(128)

const metricsTable = createMetricsTable(REGION, tableName, {
endpoint: t.context.dbEndpoint
})

// const invocations = [{
// carCid: car.cid.toString(),
// value: {
// att: [
// ProviderCapabilities.add.create({
// with: alice.did(),
// nb: {
// root: car.cid,
// shards: [car.cid]
// }
// })
// ],
// aud: uploadService.did(),
// iss: alice.did()
// },
// type: STREAM_TYPE.RECEIPT,
// out: {
// ok: true
// },
// ts: Date.now()
// }]

// // @ts-expect-error
// await updateUploadAddTotal(invocations, {
// metricsTable
// })

// const item = await getItemFromTable(t.context.dynamoClient, tableName, {
// name: METRICS_NAMES.UPLOAD_ADD_TOTAL
// })
// t.truthy(item)
// t.is(item?.name, METRICS_NAMES.UPLOAD_ADD_TOTAL)
// t.is(item?.value, 1)

t.is(1, 1)
})

// test('handles batch of single invocations with multiple upload/add attributes', async t => {
// const { tableName } = await prepareResources(t.context.dynamoClient)
// const uploadService = await Signer.generate()
// const alice = await Signer.generate()
// const { spaceDid } = await createSpace(alice)

// const cars = await Promise.all(
// Array.from({ length: 10 }).map(() => randomCAR(128))
// )

// const metricsTable = createMetricsTable(REGION, tableName, {
// endpoint: t.context.dbEndpoint
// })

// const invocations = [{
// carCid: cars[0].cid.toString(),
// value: {
// att: cars.map((car) => UploadCapabilities.add.create({
// with: spaceDid,
// nb: {
// root: car.cid,
// shards: [car.cid]
// }
// })),
// aud: uploadService.did(),
// iss: alice.did()
// },
// type: STREAM_TYPE.RECEIPT,
// out: {
// ok: true
// },
// ts: Date.now()
// }]

// // @ts-expect-error
// await updateUploadAddTotal(invocations, {
// metricsTable
// })

// const item = await getItemFromTable(t.context.dynamoClient, tableName, {
// name: METRICS_NAMES.UPLOAD_ADD_TOTAL
// })

// t.truthy(item)
// t.is(item?.name, METRICS_NAMES.UPLOAD_ADD_TOTAL)
// t.is(item?.value, cars.length)
// })

// test('handles a batch of single invocation without upload/add', async t => {
// const { tableName } = await prepareResources(t.context.dynamoClient)
// const uploadService = await Signer.generate()
// const alice = await Signer.generate()
// const { spaceDid } = await createSpace(alice)
// const car = await randomCAR(128)

// const metricsTable = createMetricsTable(REGION, tableName, {
// endpoint: t.context.dbEndpoint
// })

// const invocations = [{
// carCid: car.cid.toString(),
// value: {
// att: [
// UploadCapabilities.remove.create({
// with: spaceDid,
// nb: {
// root: car.cid,
// }
// })
// ],
// aud: uploadService.did(),
// iss: alice.did()
// },
// type: STREAM_TYPE.RECEIPT,
// out: {
// ok: true
// },
// ts: Date.now()
// }]

// // @ts-expect-error
// await updateUploadAddTotal(invocations, {
// metricsTable
// })

// const item = await getItemFromTable(t.context.dynamoClient, tableName, {
// name: METRICS_NAMES.UPLOAD_ADD_TOTAL
// })

// t.truthy(item)
// t.is(item?.name, METRICS_NAMES.UPLOAD_ADD_TOTAL)
// t.is(item?.value, 0)
// })

// test('handles a batch of single invocation without receipts', async t => {
// const { tableName } = await prepareResources(t.context.dynamoClient)
// const uploadService = await Signer.generate()
// const alice = await Signer.generate()
// const { spaceDid } = await createSpace(alice)
// const car = await randomCAR(128)

// const metricsTable = createMetricsTable(REGION, tableName, {
// endpoint: t.context.dbEndpoint
// })

// const invocations = [{
// carCid: car.cid.toString(),
// value: {
// att: [
// UploadCapabilities.add.create({
// with: spaceDid,
// nb: {
// root: car.cid,
// shards: [car.cid]
// }
// })
// ],
// aud: uploadService.did(),
// iss: alice.did()
// },
// type: STREAM_TYPE.WORKFLOW,
// ts: Date.now()
// }]

// // @ts-expect-error
// await updateUploadAddTotal(invocations, {
// metricsTable
// })

// const item = await getItemFromTable(t.context.dynamoClient, tableName, {
// name: METRICS_NAMES.UPLOAD_ADD_TOTAL
// })

// t.truthy(item)
// t.is(item?.name, METRICS_NAMES.UPLOAD_ADD_TOTAL)
// t.is(item?.value, 0)
// })

/**
* @param {import('@aws-sdk/client-dynamodb').DynamoDBClient} dynamoClient
*/
async function prepareResources (dynamoClient) {
const [ tableName ] = await Promise.all([
createDynamoTable(dynamoClient, adminMetricsTableProps),
])

return {
tableName
}
}
5 changes: 5 additions & 0 deletions ucan-invocation/test/helpers/ucanto.js
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
import * as UcantoClient from '@ucanto/client'
import * as principal from '@ucanto/principal'
import * as Signer from '@ucanto/principal/ed25519'

/**
Expand All @@ -17,3 +18,7 @@ export async function createSpace (audience) {
spaceDid
}
}

export function createAccount () {
return principal.Absentee.from({ id: 'did:mailto:foo' })
}
Loading

0 comments on commit 0af0278

Please sign in to comment.