diff --git a/package-lock.json b/package-lock.json index ebd5fe19..50d1a357 100644 --- a/package-lock.json +++ b/package-lock.json @@ -5169,7 +5169,6 @@ }, "node_modules/at-least-node": { "version": "1.0.0", - "dev": true, "license": "ISC", "engines": { "node": ">= 4.0.0" @@ -5444,6 +5443,8 @@ }, "node_modules/aws-cdk-lib": { "version": "2.50.0", + "resolved": "https://registry.npmjs.org/aws-cdk-lib/-/aws-cdk-lib-2.50.0.tgz", + "integrity": "sha512-deDbZTI7oyu3rqUyqjwhP6tnUO8MD70lE98yR65xiYty4yXBpsWKbeH3s1wNLpLAWS3hWJYyMtjZ4ZfC35NtVg==", "bundleDependencies": [ "@balena/dockerignore", "case", @@ -5455,7 +5456,6 @@ "semver", "yaml" ], - "license": "Apache-2.0", "dependencies": { "@balena/dockerignore": "^1.0.2", "case": "1.6.3", @@ -8283,7 +8283,6 @@ }, "node_modules/fs-extra": { "version": "9.1.0", - "dev": true, "license": "MIT", "dependencies": { "at-least-node": "^1.0.0", @@ -8606,7 +8605,6 @@ }, "node_modules/graceful-fs": { "version": "4.2.11", - "dev": true, "license": "ISC" }, "node_modules/graphemer": { @@ -9791,7 +9789,6 @@ }, "node_modules/jsonfile": { "version": "6.1.0", - "dev": true, "license": "MIT", "dependencies": { "universalify": "^2.0.0" @@ -13493,7 +13490,6 @@ }, "node_modules/universalify": { "version": "2.0.0", - "dev": true, "license": "MIT", "engines": { "node": ">= 10.0.0" @@ -18158,8 +18154,7 @@ } }, "at-least-node": { - "version": "1.0.0", - "dev": true + "version": "1.0.0" }, "atomically": { "version": "1.7.0" @@ -18338,6 +18333,8 @@ }, "aws-cdk-lib": { "version": "2.50.0", + "resolved": "https://registry.npmjs.org/aws-cdk-lib/-/aws-cdk-lib-2.50.0.tgz", + "integrity": "sha512-deDbZTI7oyu3rqUyqjwhP6tnUO8MD70lE98yR65xiYty4yXBpsWKbeH3s1wNLpLAWS3hWJYyMtjZ4ZfC35NtVg==", "requires": { "@balena/dockerignore": "^1.0.2", "case": "1.6.3", @@ -20139,7 +20136,6 @@ }, "fs-extra": { "version": "9.1.0", - "dev": true, "requires": { "at-least-node": "^1.0.0", "graceful-fs": "^4.2.0", @@ -20338,8 +20334,7 @@ } }, "graceful-fs": { - "version": "4.2.11", - "dev": true + "version": "4.2.11" }, "graphemer": { "version": "1.4.0", @@ -21003,7 +20998,6 @@ }, "jsonfile": { "version": "6.1.0", - "dev": true, "requires": { "graceful-fs": "^4.1.6", "universalify": "^2.0.0" @@ -23294,8 +23288,7 @@ } }, "universalify": { - "version": "2.0.0", - "dev": true + "version": "2.0.0" }, "unpipe": { "version": "1.0.0", diff --git a/package.json b/package.json index 33f7963e..d393aac2 100644 --- a/package.json +++ b/package.json @@ -16,7 +16,6 @@ "test-integration": "ava --verbose --timeout=60s test/*.test.js", "fetch-metrics-for-space": "npm run fetch-metrics-for-space -w tools", "d1-dynamo-migration": "npm run d1-dynamo-migration -w tools" - }, "devDependencies": { "@ipld/car": "^5.1.0", diff --git a/stacks/firehose-stack.js b/stacks/firehose-stack.js index 8ff7fd4f..00959f4f 100644 --- a/stacks/firehose-stack.js +++ b/stacks/firehose-stack.js @@ -5,7 +5,8 @@ import { aws_iam as iam, aws_logs as logs, aws_kinesisfirehose as firehose, - aws_glue as glue + aws_glue as glue, + aws_athena as athena } from 'aws-cdk-lib' import { UcanInvocationStack } from './ucan-invocation-stack.js' @@ -19,7 +20,7 @@ import { /** * @param {import('@serverless-stack/resources').StackContext} properties */ -export function UcanFirehoseStack({ stack, app }) { +export function UcanFirehoseStack ({ stack, app }) { stack.setDefaultFunctionProps({ srcPath: 'ucan-firehose' }) @@ -114,7 +115,7 @@ export function UcanFirehoseStack({ stack, app }) { bufferingHints: { intervalInSeconds: 60 }, - // makes easier to run hight performance, cost efficient analytics with Athena + // makes easier to run high performance, cost efficient analytics with Athena dynamicPartitioningConfiguration: { enabled: true }, @@ -126,7 +127,8 @@ export function UcanFirehoseStack({ stack, app }) { parameters: [ { parameterName: 'MetadataExtractionQuery', - parameterValue: '{issuer: .value.iss}', + // extract yyyy-MM-dd formatted current date from millisecond epoch timestamp "ts" using jq syntax + parameterValue: '{day: (.ts/1000) | strftime("%Y-%m-%d")}', }, { parameterName: 'JsonParsingEngine', @@ -145,8 +147,11 @@ export function UcanFirehoseStack({ stack, app }) { }, ], }, - // See https://docs.aws.amazon.com/athena/latest/ug/partition-projection-kinesis-firehose-example.html - prefix: 'logs/issuer=!{partitionKeyFromQuery:issuer}/', + // See https://docs.aws.amazon.com/athena/latest/ug/partition-projection-kinesis-firehose-example.html for general information on partitioning. + // Daily partitions seem right (https://www.upsolver.com/blog/partitioning-data-s3-improve-performance-athena-presto) + // "A rough rule of thumb is that each 100 partitions scanned adds about 1 second of latency to your query in Amazon Athena. This is why minutely or hourly + // partitions are rarely used – typically you would choose between daily, weekly, and monthly partitions, depending on the nature of your queries." + prefix: 'logs/!{partitionKeyFromQuery:day}/', errorOutputPrefix: 'error' } }) @@ -162,31 +167,45 @@ export function UcanFirehoseStack({ stack, app }) { } }) - // TODO: See https://catalog.us-east-1.prod.workshops.aws/workshops/fad47f62-3d06-430b-ad32-8588b74fe16f/en-US/lab-5-athena/55-athena-best-practices const tableName = getCdkNames('ucan-stream-delivery-table', app.stage) const glueTable = new glue.CfnTable(stack, tableName, { catalogId: Aws.ACCOUNT_ID, databaseName, tableInput: { name: tableName, - partitionKeys: [], + partitionKeys: [ + { name: 'day', type: 'date' }, + ], + parameters: { + classification: "json", + typeOfData: "file", + // @see https://docs.aws.amazon.com/athena/latest/ug/partition-projection-kinesis-firehose-example.html for more information on projection + // configuration - this should match the "day" parameter and S3 prefix configured in the delivery stream + "projection.enabled": "true", + "projection.day.type": "date", + "projection.day.format": "yyyy-MM-dd", + "projection.day.range": "2023-01-01,NOW", + "projection.day.interval": "1", + "projection.day.interval.unit": "DAYS", + "storage.location.template": `s3://${streamLogBucket.bucketName}/logs/\${day}/` + }, storageDescriptor: { location: `s3://${streamLogBucket.bucketName}/logs`, columns: [ { name: 'carcid', type: 'string' }, { name: 'type', type: 'string' }, - { name: 'value', type: 'STRUCT>,iss:STRING,aud:STRING>' } + // STRUCT here refers to the Apache Hive STRUCT datatype - see https://aws.amazon.com/blogs/big-data/create-tables-in-amazon-athena-from-nested-json-and-mappings-using-jsonserde/ + { name: 'value', type: 'STRUCT,consumer:STRING>>>,iss:STRING,aud:STRING>' }, + { name: "out", type: "STRUCT,ok:STRUCT>" }, + { name: "ts", type: "timestamp" } ], inputFormat: 'org.apache.hadoop.mapred.TextInputFormat', outputFormat: 'org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat', - parameters: { - "classification": "json", - "typeOfData": "file" - }, serdeInfo: { serializationLibrary: 'org.openx.data.jsonserde.JsonSerDe', parameters: { - paths: 'carCid,invocationCid,out,ts,type,value' + // see https://aws.amazon.com/blogs/big-data/create-tables-in-amazon-athena-from-nested-json-and-mappings-using-jsonserde/ + 'mapping._cid_slash': '/' } } } @@ -194,4 +213,127 @@ export function UcanFirehoseStack({ stack, app }) { }) glueTable.addDependsOn(glueDatabase) + + const inputOutputQueryName = getCdkNames('input-output-query', app.stage) + const inputOutputQuery = new athena.CfnNamedQuery(stack, inputOutputQueryName, { + name: "Inputs and Outputs, last 24 hours", + description: "(w3up preloaded)", + database: databaseName, + queryString: `SELECT + value.att[1] as "in", + out +FROM "${tableName}" +WHERE type = 'receipt' + AND day > (CURRENT_DATE - INTERVAL '1' DAY) +` + }) + inputOutputQuery.addDependsOn(glueTable) + + const dataStoredQueryName = getCdkNames('data-stored-query', app.stage) + const dataStoredQuery = new athena.CfnNamedQuery(stack, dataStoredQueryName, { + name: "Data stored by space, last week", + description: "(w3up preloaded)", + database: databaseName, + queryString: `SELECT + SUM(value.att[1].nb.size) AS size, + value.att[1]."with" AS space +FROM "${tableName}" +WHERE value.att[1].can='store/add' + AND out.ok IS NOT NULL + AND type='receipt' + AND day > (CURRENT_DATE - INTERVAL '7' DAY) +GROUP BY value.att[1]."with" +` + }) + dataStoredQuery.addDependsOn(glueTable) + + const storesBySpaceQueryName = getCdkNames('stores-by-space-query', app.stage) + const storesBySpaceQuery = new athena.CfnNamedQuery(stack, storesBySpaceQueryName, { + name: "Stores by space, last week", + description: "(w3up preloaded)", + database: databaseName, + queryString: `SELECT + value.att[1].nb.size AS size, + value.att[1]."with" AS space, + ts +FROM "${tableName}" +WHERE value.att[1].can='store/add' + AND day > (CURRENT_DATE - INTERVAL '2' DAY) + AND out.ok IS NOT NULL + AND type='receipt' +` + }) + storesBySpaceQuery.addDependsOn(glueTable) + + const uploadsQueryName = getCdkNames('uploads-query', app.stage) + const uploadsQuery = new athena.CfnNamedQuery(stack, uploadsQueryName, { + name: "Uploads, last week", + description: "(w3up preloaded)", + database: databaseName, + queryString: `SELECT + value.att[1].nb.root._cid_slash AS cid, + value.att[1]."with" AS space, + ts +FROM "${tableName}" +WHERE value.att[1].can='upload/add' + AND day > (CURRENT_DATE - INTERVAL '7' DAY) + AND out.ok IS NOT NULL + AND type='receipt' +ORDER BY ts +` + }) + uploadsQuery.addDependsOn(glueTable) + + const spacesByAccountQueryName = getCdkNames('spaces-by-account-query', app.stage) + const spacesByAccountQuery = new athena.CfnNamedQuery(stack, spacesByAccountQueryName, { + name: "Spaces by account", + description: "(w3up preloaded)", + database: databaseName, + queryString: `SELECT + value.att[1].nb.consumer AS space, + value.att[1]."with" AS account +FROM "${tableName}" +WHERE value.att[1].can='provider/add' + AND out.ok IS NOT NULL + AND type='receipt' +` + }) + spacesByAccountQuery.addDependsOn(glueTable) + + const uploadsByAccountQueryName = getCdkNames('uploads-by-account-query', app.stage) + const uploadsByAccountQuery = new athena.CfnNamedQuery(stack, uploadsByAccountQueryName, { + name: "Uploads by account", + description: "(w3up preloaded)", + database: databaseName, + queryString: `WITH +spaces AS ( + SELECT value.att[1].nb.consumer AS did, + value.att[1]."with" AS account + FROM "${tableName}" + WHERE value.att[1].can='provider/add' + AND out.ok IS NOT NULL + AND type='receipt' +), +uploads AS ( + SELECT value.att[1].nb.root._cid_slash AS cid, + value.att[1]."with" AS space, + ts + FROM "${tableName}" + WHERE value.att[1].can='upload/add' + AND out.ok IS NOT NULL + AND type='receipt' +), +uploads_by_account AS ( + SELECT spaces.did AS space, + spaces.account AS account, + uploads.cid AS cid, + uploads.ts AS "timestamp" + FROM uploads LEFT JOIN spaces on spaces.did = uploads.space +) SELECT * + FROM uploads_by_account + ORDER BY timestamp +` + }) + uploadsByAccountQuery.addDependsOn(glueTable) + }