From a28f5dda26f3431c29bd959cd6079f60e6ecde2f Mon Sep 17 00:00:00 2001 From: hannahhoward Date: Thu, 19 Sep 2024 17:37:39 -0700 Subject: [PATCH] feat(firehose): add a manual put firehose add a second firehose for putting records manually --- stacks/firehose-stack.js | 1198 ++++++++++++++++++++++---------------- 1 file changed, 702 insertions(+), 496 deletions(-) diff --git a/stacks/firehose-stack.js b/stacks/firehose-stack.js index cfccb7a0..b2cc2f86 100644 --- a/stacks/firehose-stack.js +++ b/stacks/firehose-stack.js @@ -7,21 +7,17 @@ import { aws_kinesisfirehose as firehose, aws_glue as glue, aws_athena as athena, - aws_sam as sam + aws_sam as sam, } from 'aws-cdk-lib' import { UcanInvocationStack } from './ucan-invocation-stack.js' -import { - getBucketConfig, - getCdkNames, - setupSentry -} from './config.js' +import { getBucketConfig, getCdkNames, setupSentry } from './config.js' /** * @param {import('sst/constructs').StackContext} properties */ -export function UcanFirehoseStack ({ stack, app }) { +export function UcanFirehoseStack({ stack, app }) { // Setup app monitoring with Sentry setupSentry(app, stack) @@ -40,22 +36,28 @@ export function UcanFirehoseStack ({ stack, app }) { // enabled: true, // expiration: Duration.days(90) // } - ] - } - } + ], + }, + }, }) // Roles for delivery stream - const deliveryStreamRoleName = getCdkNames('ucan-stream-delivery-role', app.stage) + const deliveryStreamRoleName = getCdkNames( + 'ucan-stream-delivery-role', + app.stage + ) const deliveryStreamRole = new iam.Role(stack, deliveryStreamRoleName, { assumedBy: new iam.ServicePrincipal('firehose.amazonaws.com'), - roleName: deliveryStreamRoleName + roleName: deliveryStreamRoleName, }) - const logGroupName = getCdkNames('ucan-stream-delivery-error-logging', app.stage) + const logGroupName = getCdkNames( + 'ucan-stream-delivery-error-logging', + app.stage + ) const cfnLogGroup = new logs.CfnLogGroup(stack, logGroupName, { retentionInDays: 90, - logGroupName + logGroupName, 
}) // Assign permissions @@ -78,7 +80,7 @@ export function UcanFirehoseStack ({ stack, app }) { effect: iam.Effect.ALLOW, resources: [ streamLogBucket.bucketArn, - `${streamLogBucket.bucketArn}/*` + `${streamLogBucket.bucketArn}/*`, ], actions: [ 's3:AbortMultipartUpload', @@ -93,89 +95,154 @@ export function UcanFirehoseStack ({ stack, app }) { new iam.PolicyStatement({ effect: iam.Effect.ALLOW, resources: [cfnLogGroup.attrArn], - actions: [ - 'logs:PutLogEvents' - ], + actions: ['logs:PutLogEvents'], }), ], }) // Create AWS Kinesis Firehose const deliveryStreamName = getCdkNames('ucan-stream-delivery', app.stage) - const deliveryFirehose = new firehose.CfnDeliveryStream(stack, deliveryStreamName, { + const deliveryFirehose = new firehose.CfnDeliveryStream( + stack, deliveryStreamName, - deliveryStreamType: 'KinesisStreamAsSource', - kinesisStreamSourceConfiguration: { - kinesisStreamArn: ucanStream.streamArn, - roleArn: deliveryStreamRole.roleArn - }, - extendedS3DestinationConfiguration: { - bucketArn: streamLogBucket.bucketArn, - roleArn: deliveryStreamRole.roleArn, - bufferingHints: { - intervalInSeconds: 60 + { + deliveryStreamName, + deliveryStreamType: 'KinesisStreamAsSource', + kinesisStreamSourceConfiguration: { + kinesisStreamArn: ucanStream.streamArn, + roleArn: deliveryStreamRole.roleArn, }, - // makes easier to run high performance, cost efficient analytics with Athena - dynamicPartitioningConfiguration: { - enabled: true - }, - processingConfiguration: { - enabled: true, - processors: [ - { - type: 'MetadataExtraction', - parameters: [ - { - parameterName: 'MetadataExtractionQuery', - // extract yyyy-MM-dd formatted current date from millisecond epoch timestamp "ts" using jq syntax - // extract type ('workflow' or 'receipt') - // extract the UCAN ability of the invocation to a key named "op" - this matches the latest UCAN spec https://github.com/ucan-wg/invocation/pull/21/files#diff-b335630551682c19a781afebcf4d07bf978fb1f8ac04c6bf87428ed5106870f5R208 
- // we replace / with _ here since it will be used in the S3 bucket path and we a) don't want it to collide with the path separator and b) want it to be easy to refer to in queries - // eslint-disable-next-line no-useless-escape - parameterValue: '{day: (.ts/1000) | strftime("%Y-%m-%d"), type: .type, op: (.value.att[0].can | gsub("\/"; "_"))}', - }, - { - parameterName: 'JsonParsingEngine', - parameterValue: 'JQ-1.6', - } - ] - }, - { - type: 'AppendDelimiterToRecord', - parameters: [ - { - parameterName: 'Delimiter', - parameterValue: '\\n', - }, - ], - }, - ], + extendedS3DestinationConfiguration: { + bucketArn: streamLogBucket.bucketArn, + roleArn: deliveryStreamRole.roleArn, + bufferingHints: { + intervalInSeconds: 60, + }, + // makes easier to run high performance, cost efficient analytics with Athena + dynamicPartitioningConfiguration: { + enabled: true, + }, + processingConfiguration: { + enabled: true, + processors: [ + { + type: 'MetadataExtraction', + parameters: [ + { + parameterName: 'MetadataExtractionQuery', + // extract yyyy-MM-dd formatted current date from millisecond epoch timestamp "ts" using jq syntax + // extract type ('workflow' or 'receipt') + // extract the UCAN ability of the invocation to a key named "op" - this matches the latest UCAN spec https://github.com/ucan-wg/invocation/pull/21/files#diff-b335630551682c19a781afebcf4d07bf978fb1f8ac04c6bf87428ed5106870f5R208 + // we replace / with _ here since it will be used in the S3 bucket path and we a) don't want it to collide with the path separator and b) want it to be easy to refer to in queries + // eslint-disable-next-line no-useless-escape + parameterValue: + '{day: (.ts/1000) | strftime("%Y-%m-%d"), type: .type, op: (.value.att[0].can | gsub("/"; "_"))}', + }, + { + parameterName: 'JsonParsingEngine', + parameterValue: 'JQ-1.6', + }, + ], + }, + { + type: 'AppendDelimiterToRecord', + parameters: [ + { + parameterName: 'Delimiter', + parameterValue: '\\n', + }, + ], + }, + ], + }, + // 
See https://docs.aws.amazon.com/athena/latest/ug/partition-projection-kinesis-firehose-example.html for general information on partitioning. + // Daily partitions seem right (https://www.upsolver.com/blog/partitioning-data-s3-improve-performance-athena-presto) + // "A rough rule of thumb is that each 100 partitions scanned adds about 1 second of latency to your query in Amazon Athena. This is why minutely or hourly + // partitions are rarely used – typically you would choose between daily, weekly, and monthly partitions, depending on the nature of your queries." + // We also partition by "type" (workflow or receipt) and "op" (the invoked UCAN ability name): + // 1) Receipts generally have all the information workflows have so we can safely ignore workflows. + // 2) Partitioning by "op" lets us ignore large classes of operations that we don't care about and should + // make queries significantly more efficient. + prefix: + 'logs/!{partitionKeyFromQuery:type}/!{partitionKeyFromQuery:op}/!{partitionKeyFromQuery:day}/', + errorOutputPrefix: 'error', }, - // See https://docs.aws.amazon.com/athena/latest/ug/partition-projection-kinesis-firehose-example.html for general information on partitioning. - // Daily partitions seem right (https://www.upsolver.com/blog/partitioning-data-s3-improve-performance-athena-presto) - // "A rough rule of thumb is that each 100 partitions scanned adds about 1 second of latency to your query in Amazon Athena. This is why minutely or hourly - // partitions are rarely used – typically you would choose between daily, weekly, and monthly partitions, depending on the nature of your queries." - // We also partition by "type" (workflow or receipt) and "op" (the invoked UCAN ability name): - // 1) Receipts generally have all the information workflows have so we can safely ignore workflows. - // 2) Partitioning by "op" lets us ignore large classes of operations that we don't care about and should - // make queries significantly more efficient. 
- prefix: 'logs/!{partitionKeyFromQuery:type}/!{partitionKeyFromQuery:op}/!{partitionKeyFromQuery:day}/', - errorOutputPrefix: 'error' } - }) + ) deliveryFirehose.node.addDependency(deliveryStreamRole) + const manualDeliveryStreamName = getCdkNames( + 'manual-stream-delivery', + app.stage + ) + const manualDeliveryFirehose = new firehose.CfnDeliveryStream( + stack, + manualDeliveryStreamName, + { + deliveryStreamName: manualDeliveryStreamName, + deliveryStreamType: 'DirectPut', // records are written directly via the Firehose PutRecord API (no Kinesis stream source) + extendedS3DestinationConfiguration: { + bucketArn: streamLogBucket.bucketArn, + roleArn: deliveryStreamRole.roleArn, + bufferingHints: { + intervalInSeconds: 60, + }, + dynamicPartitioningConfiguration: { + enabled: true, + }, + processingConfiguration: { + enabled: true, + processors: [ + { + type: 'MetadataExtraction', + parameters: [ + { + parameterName: 'MetadataExtractionQuery', + // extract yyyy-MM-dd formatted current date from millisecond epoch timestamp "ts" using jq syntax + // extract type ('workflow' or 'receipt') + // extract the UCAN ability of the invocation to a key named "op" - this matches the latest UCAN spec https://github.com/ucan-wg/invocation/pull/21/files#diff-b335630551682c19a781afebcf4d07bf978fb1f8ac04c6bf87428ed5106870f5R208 + // we replace / with _ here since it will be used in the S3 bucket path and we a) don't want it to collide with the path separator and b) want it to be easy to refer to in queries + // eslint-disable-next-line no-useless-escape + parameterValue: + '{day: (.ts/1000) | strftime("%Y-%m-%d"), type: .type, op: (.value.att[0].can | gsub("/"; "_"))}', + }, + { + parameterName: 'JsonParsingEngine', + parameterValue: 'JQ-1.6', + }, + ], + }, + { + type: 'AppendDelimiterToRecord', + parameters: [ + { + parameterName: 'Delimiter', + parameterValue: '\\n', + }, + ], + }, + ], + }, + prefix: + 'logs/!{partitionKeyFromQuery:type}/!{partitionKeyFromQuery:op}/!{partitionKeyFromQuery:day}/', + errorOutputPrefix: 'error', + }, + } + ) 
+ + manualDeliveryFirehose.node.addDependency(deliveryStreamRole) + // Glue database const databaseName = getCdkNames('ucan-stream-delivery-database', app.stage) const glueDatabase = new glue.CfnDatabase(stack, databaseName, { catalogId: Aws.ACCOUNT_ID, databaseInput: { name: databaseName, - } + }, }) - // creates a table that can be seen in the AWS Glue table browser at + // creates a table that can be seen in the AWS Glue table browser at // https://console.aws.amazon.com/glue/home#/v2/data-catalog/tables // and in the data browser in the Athena Query editor at // https://console.aws.amazon.com/athena/home#/query-editor @@ -187,47 +254,55 @@ export function UcanFirehoseStack ({ stack, app }) { name: receiptTableName, partitionKeys: [ { name: 'day', type: 'date' }, - { name: 'op', type: 'string' } + { name: 'op', type: 'string' }, ], parameters: { - classification: "json", - typeOfData: "file", + classification: 'json', + typeOfData: 'file', // @see https://docs.aws.amazon.com/athena/latest/ug/partition-projection-kinesis-firehose-example.html for more information on projection // configuration - this should match the "day" parameter and S3 prefix configured in the delivery stream - "projection.enabled": "true", - "projection.day.type": "date", - "projection.day.format": "yyyy-MM-dd", - "projection.day.range": "2023-01-01,NOW", - "projection.day.interval": "1", - "projection.day.interval.unit": "DAYS", - "projection.op.type": "enum", - "projection.op.values": 'access_authorize,access_claim,access_delegate,access_session,admin_store_inspect,admin_upload_inspect,aggregate_accept,aggregate_offer,deal_info,consumer_get,consumer_has,customer_get,filecoin_accept,filecoin_info,filecoin_offer,filecoin_submit,piece_accept,piece_offer,provider_add,rate-limit_add,rate-limit_list,rate-limit_remove,space_info,store_add,store_remove,subscription_get,ucan_revoke,upload_add,upload_list,upload_remove,space_blob_add,web3.storage_blob_allocate,web3.storage_blob_accept,', - 
"storage.location.template": `s3://${streamLogBucket.bucketName}/logs/receipt/\${op}/\${day}/` + 'projection.enabled': 'true', + 'projection.day.type': 'date', + 'projection.day.format': 'yyyy-MM-dd', + 'projection.day.range': '2023-01-01,NOW', + 'projection.day.interval': '1', + 'projection.day.interval.unit': 'DAYS', + 'projection.op.type': 'enum', + 'projection.op.values': + 'access_authorize,access_claim,access_delegate,access_session,admin_store_inspect,admin_upload_inspect,aggregate_accept,aggregate_offer,deal_info,consumer_get,consumer_has,customer_get,filecoin_accept,filecoin_info,filecoin_offer,filecoin_submit,piece_accept,piece_offer,provider_add,rate-limit_add,rate-limit_list,rate-limit_remove,space_info,store_add,store_remove,subscription_get,ucan_revoke,upload_add,upload_list,upload_remove,space_blob_add,web3.storage_blob_allocate,web3.storage_blob_accept,', + 'storage.location.template': `s3://${streamLogBucket.bucketName}/logs/receipt/\${op}/\${day}/`, }, storageDescriptor: { location: `s3://${streamLogBucket.bucketName}/logs/receipt/`, columns: [ { name: 'carcid', type: 'string' }, // STRUCT here refers to the Apache Hive STRUCT datatype - see https://aws.amazon.com/blogs/big-data/create-tables-in-amazon-athena-from-nested-json-and-mappings-using-jsonserde/ - { name: 'value', type: 'STRUCT>,iss:STRING,aud:STRING>' }, - { name: "out", type: "STRUCT,ok:STRING>" }, - { name: "ts", type: "timestamp" } + { + name: 'value', + type: 'STRUCT>,iss:STRING,aud:STRING>', + }, + { + name: 'out', + type: 'STRUCT,ok:STRING>', + }, + { name: 'ts', type: 'timestamp' }, ], inputFormat: 'org.apache.hadoop.mapred.TextInputFormat', - outputFormat: 'org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat', + outputFormat: + 'org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat', serdeInfo: { serializationLibrary: 'org.openx.data.jsonserde.JsonSerDe', parameters: { // see 
https://aws.amazon.com/blogs/big-data/create-tables-in-amazon-athena-from-nested-json-and-mappings-using-jsonserde/ - 'mapping._cid_slash': '/' - } - } - } - } + 'mapping._cid_slash': '/', + }, + }, + }, + }, }) receiptTable.addDependsOn(glueDatabase) - // creates a table that can be seen in the AWS Glue table browser at + // creates a table that can be seen in the AWS Glue table browser at // https://console.aws.amazon.com/glue/home#/v2/data-catalog/tables // and in the data browser in the Athena Query editor at // https://console.aws.amazon.com/athena/home#/query-editor @@ -237,46 +312,51 @@ export function UcanFirehoseStack ({ stack, app }) { databaseName, tableInput: { name: storeAddTableName, - partitionKeys: [ - { name: 'day', type: 'date' } - ], + partitionKeys: [{ name: 'day', type: 'date' }], parameters: { - classification: "json", - typeOfData: "file", + classification: 'json', + typeOfData: 'file', // @see https://docs.aws.amazon.com/athena/latest/ug/partition-projection-kinesis-firehose-example.html for more information on projection // configuration - this should match the "day" parameter and S3 prefix configured in the delivery stream - "projection.enabled": "true", - "projection.day.type": "date", - "projection.day.format": "yyyy-MM-dd", - "projection.day.range": "2023-01-01,NOW", - "projection.day.interval": "1", - "projection.day.interval.unit": "DAYS", - "storage.location.template": `s3://${streamLogBucket.bucketName}/logs/receipt/store_add/\${day}/` + 'projection.enabled': 'true', + 'projection.day.type': 'date', + 'projection.day.format': 'yyyy-MM-dd', + 'projection.day.range': '2023-01-01,NOW', + 'projection.day.interval': '1', + 'projection.day.interval.unit': 'DAYS', + 'storage.location.template': `s3://${streamLogBucket.bucketName}/logs/receipt/store_add/\${day}/`, }, storageDescriptor: { location: `s3://${streamLogBucket.bucketName}/logs/receipt/store_add/`, columns: [ { name: 'carcid', type: 'string' }, // STRUCT here refers to the Apache 
Hive STRUCT datatype - see https://aws.amazon.com/blogs/big-data/create-tables-in-amazon-athena-from-nested-json-and-mappings-using-jsonserde/ - { name: 'value', type: 'STRUCT>>>,iss:STRING,aud:STRING>' }, - { name: "out", type: "STRUCT,ok:STRUCT>>" }, - { name: "ts", type: 'timestamp' } + { + name: 'value', + type: 'STRUCT>>>,iss:STRING,aud:STRING>', + }, + { + name: 'out', + type: 'STRUCT,ok:STRUCT>>', + }, + { name: 'ts', type: 'timestamp' }, ], inputFormat: 'org.apache.hadoop.mapred.TextInputFormat', - outputFormat: 'org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat', + outputFormat: + 'org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat', serdeInfo: { serializationLibrary: 'org.openx.data.jsonserde.JsonSerDe', parameters: { // see https://aws.amazon.com/blogs/big-data/create-tables-in-amazon-athena-from-nested-json-and-mappings-using-jsonserde/ - 'mapping._cid_slash': '/' - } - } - } - } + 'mapping._cid_slash': '/', + }, + }, + }, + }, }) storeAddTable.addDependsOn(glueDatabase) - // creates a table that can be seen in the AWS Glue table browser at + // creates a table that can be seen in the AWS Glue table browser at // https://console.aws.amazon.com/glue/home#/v2/data-catalog/tables // and in the data browser in the Athena Query editor at // https://console.aws.amazon.com/athena/home#/query-editor @@ -286,46 +366,51 @@ export function UcanFirehoseStack ({ stack, app }) { databaseName, tableInput: { name: spaceBlobAddTableName, - partitionKeys: [ - { name: 'day', type: 'date' } - ], + partitionKeys: [{ name: 'day', type: 'date' }], parameters: { - classification: "json", - typeOfData: "file", + classification: 'json', + typeOfData: 'file', // @see https://docs.aws.amazon.com/athena/latest/ug/partition-projection-kinesis-firehose-example.html for more information on projection // configuration - this should match the "day" parameter and S3 prefix configured in the delivery stream - "projection.enabled": "true", - "projection.day.type": "date", 
- "projection.day.format": "yyyy-MM-dd", - "projection.day.range": "2023-01-01,NOW", - "projection.day.interval": "1", - "projection.day.interval.unit": "DAYS", - "storage.location.template": `s3://${streamLogBucket.bucketName}/logs/receipt/space_blob_add/\${day}/` + 'projection.enabled': 'true', + 'projection.day.type': 'date', + 'projection.day.format': 'yyyy-MM-dd', + 'projection.day.range': '2023-01-01,NOW', + 'projection.day.interval': '1', + 'projection.day.interval.unit': 'DAYS', + 'storage.location.template': `s3://${streamLogBucket.bucketName}/logs/receipt/space_blob_add/\${day}/`, }, storageDescriptor: { location: `s3://${streamLogBucket.bucketName}/logs/receipt/space_blob_add/`, columns: [ { name: 'carcid', type: 'string' }, // STRUCT here refers to the Apache Hive STRUCT datatype - see https://aws.amazon.com/blogs/big-data/create-tables-in-amazon-athena-from-nested-json-and-mappings-using-jsonserde/ - { name: 'value', type: 'STRUCT>>>,iss:STRING,aud:STRING>' }, - { name: "out", type: "STRUCT,ok:STRUCT>" }, - { name: "ts", type: 'timestamp' } + { + name: 'value', + type: 'STRUCT>>>,iss:STRING,aud:STRING>', + }, + { + name: 'out', + type: 'STRUCT,ok:STRUCT>', + }, + { name: 'ts', type: 'timestamp' }, ], inputFormat: 'org.apache.hadoop.mapred.TextInputFormat', - outputFormat: 'org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat', + outputFormat: + 'org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat', serdeInfo: { serializationLibrary: 'org.openx.data.jsonserde.JsonSerDe', parameters: { // see https://aws.amazon.com/blogs/big-data/create-tables-in-amazon-athena-from-nested-json-and-mappings-using-jsonserde/ - 'mapping._cid_slash': '/' - } - } - } - } + 'mapping._cid_slash': '/', + }, + }, + }, + }, }) spaceBlobAddTable.addDependsOn(glueDatabase) - // creates a table that can be seen in the AWS Glue table browser at + // creates a table that can be seen in the AWS Glue table browser at // 
https://console.aws.amazon.com/glue/home#/v2/data-catalog/tables // and in the data browser in the Athena Query editor at // https://console.aws.amazon.com/athena/home#/query-editor @@ -335,47 +420,52 @@ export function UcanFirehoseStack ({ stack, app }) { databaseName, tableInput: { name: blobAllocateTableName, - partitionKeys: [ - { name: 'day', type: 'date' } - ], + partitionKeys: [{ name: 'day', type: 'date' }], parameters: { - classification: "json", - typeOfData: "file", + classification: 'json', + typeOfData: 'file', // @see https://docs.aws.amazon.com/athena/latest/ug/partition-projection-kinesis-firehose-example.html for more information on projection // configuration - this should match the "day" parameter and S3 prefix configured in the delivery stream - "projection.enabled": "true", - "projection.day.type": "date", - "projection.day.format": "yyyy-MM-dd", - "projection.day.range": "2023-01-01,NOW", - "projection.day.interval": "1", - "projection.day.interval.unit": "DAYS", - "storage.location.template": `s3://${streamLogBucket.bucketName}/logs/receipt/web3.storage_blob_allocate/\${day}/` + 'projection.enabled': 'true', + 'projection.day.type': 'date', + 'projection.day.format': 'yyyy-MM-dd', + 'projection.day.range': '2023-01-01,NOW', + 'projection.day.interval': '1', + 'projection.day.interval.unit': 'DAYS', + 'storage.location.template': `s3://${streamLogBucket.bucketName}/logs/receipt/web3.storage_blob_allocate/\${day}/`, }, storageDescriptor: { location: `s3://${streamLogBucket.bucketName}/logs/receipt/web3.storage_blob_allocate/`, columns: [ { name: 'carcid', type: 'string' }, // STRUCT here refers to the Apache Hive STRUCT datatype - see https://aws.amazon.com/blogs/big-data/create-tables-in-amazon-athena-from-nested-json-and-mappings-using-jsonserde/ - { name: 'value', type: 'STRUCT,space:STRING,cause:STRUCT<_cid_slash:STRING>>>>,iss:STRING,aud:STRING>' }, + { + name: 'value', + type: 
'STRUCT,space:STRING,cause:STRUCT<_cid_slash:STRING>>>>,iss:STRING,aud:STRING>', + }, // No need to store URL for allocation here for now - { name: "out", type: "STRUCT,ok:STRUCT>" }, - { name: "ts", type: "timestamp" } + { + name: 'out', + type: 'STRUCT,ok:STRUCT>', + }, + { name: 'ts', type: 'timestamp' }, ], inputFormat: 'org.apache.hadoop.mapred.TextInputFormat', - outputFormat: 'org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat', + outputFormat: + 'org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat', serdeInfo: { serializationLibrary: 'org.openx.data.jsonserde.JsonSerDe', parameters: { // see https://aws.amazon.com/blogs/big-data/create-tables-in-amazon-athena-from-nested-json-and-mappings-using-jsonserde/ - 'mapping._cid_slash': '/' - } - } - } - } + 'mapping._cid_slash': '/', + }, + }, + }, + }, }) blobAllocateTable.addDependsOn(glueDatabase) - // creates a table that can be seen in the AWS Glue table browser at + // creates a table that can be seen in the AWS Glue table browser at // https://console.aws.amazon.com/glue/home#/v2/data-catalog/tables // and in the data browser in the Athena Query editor at // https://console.aws.amazon.com/athena/home#/query-editor @@ -385,47 +475,52 @@ export function UcanFirehoseStack ({ stack, app }) { databaseName, tableInput: { name: blobAcceptTableName, - partitionKeys: [ - { name: 'day', type: 'date' } - ], + partitionKeys: [{ name: 'day', type: 'date' }], parameters: { - classification: "json", - typeOfData: "file", + classification: 'json', + typeOfData: 'file', // @see https://docs.aws.amazon.com/athena/latest/ug/partition-projection-kinesis-firehose-example.html for more information on projection // configuration - this should match the "day" parameter and S3 prefix configured in the delivery stream - "projection.enabled": "true", - "projection.day.type": "date", - "projection.day.format": "yyyy-MM-dd", - "projection.day.range": "2023-01-01,NOW", - "projection.day.interval": "1", - 
"projection.day.interval.unit": "DAYS", - "storage.location.template": `s3://${streamLogBucket.bucketName}/logs/receipt/web3.storage_blob_accept/\${day}/` + 'projection.enabled': 'true', + 'projection.day.type': 'date', + 'projection.day.format': 'yyyy-MM-dd', + 'projection.day.range': '2023-01-01,NOW', + 'projection.day.interval': '1', + 'projection.day.interval.unit': 'DAYS', + 'storage.location.template': `s3://${streamLogBucket.bucketName}/logs/receipt/web3.storage_blob_accept/\${day}/`, }, storageDescriptor: { location: `s3://${streamLogBucket.bucketName}/logs/receipt/web3.storage_blob_accept/`, columns: [ { name: 'carcid', type: 'string' }, // STRUCT here refers to the Apache Hive STRUCT datatype - see https://aws.amazon.com/blogs/big-data/create-tables-in-amazon-athena-from-nested-json-and-mappings-using-jsonserde/ - { name: 'value', type: 'STRUCT,space:STRING>>>,iss:STRING,aud:STRING>' }, + { + name: 'value', + type: 'STRUCT,space:STRING>>>,iss:STRING,aud:STRING>', + }, // No need to store URL for allocation here for now - { name: "out", type: "STRUCT,ok:STRUCT>>" }, - { name: "ts", type: "timestamp" } + { + name: 'out', + type: 'STRUCT,ok:STRUCT>>', + }, + { name: 'ts', type: 'timestamp' }, ], inputFormat: 'org.apache.hadoop.mapred.TextInputFormat', - outputFormat: 'org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat', + outputFormat: + 'org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat', serdeInfo: { serializationLibrary: 'org.openx.data.jsonserde.JsonSerDe', parameters: { // see https://aws.amazon.com/blogs/big-data/create-tables-in-amazon-athena-from-nested-json-and-mappings-using-jsonserde/ - 'mapping._cid_slash': '/' - } - } - } - } + 'mapping._cid_slash': '/', + }, + }, + }, + }, }) blobAcceptTable.addDependsOn(glueDatabase) - // creates a table that can be seen in the AWS Glue table browser at + // creates a table that can be seen in the AWS Glue table browser at // https://console.aws.amazon.com/glue/home#/v2/data-catalog/tables // 
and in the data browser in the Athena Query editor at // https://console.aws.amazon.com/athena/home#/query-editor @@ -435,46 +530,51 @@ export function UcanFirehoseStack ({ stack, app }) { databaseName, tableInput: { name: uploadAddTableName, - partitionKeys: [ - { name: 'day', type: 'date' } - ], + partitionKeys: [{ name: 'day', type: 'date' }], parameters: { - classification: "json", - typeOfData: "file", + classification: 'json', + typeOfData: 'file', // @see https://docs.aws.amazon.com/athena/latest/ug/partition-projection-kinesis-firehose-example.html for more information on projection // configuration - this should match the "day" parameter and S3 prefix configured in the delivery stream - "projection.enabled": "true", - "projection.day.type": "date", - "projection.day.format": "yyyy-MM-dd", - "projection.day.range": "2023-01-01,NOW", - "projection.day.interval": "1", - "projection.day.interval.unit": "DAYS", - "storage.location.template": `s3://${streamLogBucket.bucketName}/logs/receipt/upload_add/\${day}/` + 'projection.enabled': 'true', + 'projection.day.type': 'date', + 'projection.day.format': 'yyyy-MM-dd', + 'projection.day.range': '2023-01-01,NOW', + 'projection.day.interval': '1', + 'projection.day.interval.unit': 'DAYS', + 'storage.location.template': `s3://${streamLogBucket.bucketName}/logs/receipt/upload_add/\${day}/`, }, storageDescriptor: { location: `s3://${streamLogBucket.bucketName}/logs/receipt/upload_add/`, columns: [ { name: 'carcid', type: 'string' }, // STRUCT here refers to the Apache Hive STRUCT datatype - see https://aws.amazon.com/blogs/big-data/create-tables-in-amazon-athena-from-nested-json-and-mappings-using-jsonserde/ - { name: 'value', type: 'STRUCT,shards:ARRAY>>>>,iss:STRING,aud:STRING>' }, - { name: "out", type: "STRUCT,ok:STRUCT,shards:ARRAY>>>" }, - { name: "ts", type: "timestamp" } + { + name: 'value', + type: 'STRUCT,shards:ARRAY>>>>,iss:STRING,aud:STRING>', + }, + { + name: 'out', + type: 
'STRUCT,ok:STRUCT,shards:ARRAY>>>', + }, + { name: 'ts', type: 'timestamp' }, ], inputFormat: 'org.apache.hadoop.mapred.TextInputFormat', - outputFormat: 'org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat', + outputFormat: + 'org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat', serdeInfo: { serializationLibrary: 'org.openx.data.jsonserde.JsonSerDe', parameters: { // see https://aws.amazon.com/blogs/big-data/create-tables-in-amazon-athena-from-nested-json-and-mappings-using-jsonserde/ - 'mapping._cid_slash': '/' - } - } - } - } + 'mapping._cid_slash': '/', + }, + }, + }, + }, }) uploadAddTable.addDependsOn(glueDatabase) - // creates a table that can be seen in the AWS Glue table browser at + // creates a table that can be seen in the AWS Glue table browser at // https://console.aws.amazon.com/glue/home#/v2/data-catalog/tables // and in the data browser in the Athena Query editor at // https://console.aws.amazon.com/athena/home#/query-editor @@ -484,46 +584,51 @@ export function UcanFirehoseStack ({ stack, app }) { databaseName, tableInput: { name: storeRemoveTableName, - partitionKeys: [ - { name: 'day', type: 'date' } - ], + partitionKeys: [{ name: 'day', type: 'date' }], parameters: { - classification: "json", - typeOfData: "file", + classification: 'json', + typeOfData: 'file', // @see https://docs.aws.amazon.com/athena/latest/ug/partition-projection-kinesis-firehose-example.html for more information on projection // configuration - this should match the "day" parameter and S3 prefix configured in the delivery stream - "projection.enabled": "true", - "projection.day.type": "date", - "projection.day.format": "yyyy-MM-dd", - "projection.day.range": "2023-01-01,NOW", - "projection.day.interval": "1", - "projection.day.interval.unit": "DAYS", - "storage.location.template": `s3://${streamLogBucket.bucketName}/logs/receipt/store_remove/\${day}/` + 'projection.enabled': 'true', + 'projection.day.type': 'date', + 'projection.day.format': 'yyyy-MM-dd', + 
'projection.day.range': '2023-01-01,NOW', + 'projection.day.interval': '1', + 'projection.day.interval.unit': 'DAYS', + 'storage.location.template': `s3://${streamLogBucket.bucketName}/logs/receipt/store_remove/\${day}/`, }, storageDescriptor: { location: `s3://${streamLogBucket.bucketName}/logs/receipt/store_remove/`, columns: [ { name: 'carcid', type: 'string' }, // STRUCT here refers to the Apache Hive STRUCT datatype - see https://aws.amazon.com/blogs/big-data/create-tables-in-amazon-athena-from-nested-json-and-mappings-using-jsonserde/ - { name: 'value', type: 'STRUCT>>>,iss:STRING,aud:STRING>' }, - { name: "out", type: "STRUCT,ok:STRUCT>" }, - { name: "ts", type: "timestamp" } + { + name: 'value', + type: 'STRUCT>>>,iss:STRING,aud:STRING>', + }, + { + name: 'out', + type: 'STRUCT,ok:STRUCT>', + }, + { name: 'ts', type: 'timestamp' }, ], inputFormat: 'org.apache.hadoop.mapred.TextInputFormat', - outputFormat: 'org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat', + outputFormat: + 'org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat', serdeInfo: { serializationLibrary: 'org.openx.data.jsonserde.JsonSerDe', parameters: { // see https://aws.amazon.com/blogs/big-data/create-tables-in-amazon-athena-from-nested-json-and-mappings-using-jsonserde/ - 'mapping._cid_slash': '/' - } - } - } - } + 'mapping._cid_slash': '/', + }, + }, + }, + }, }) storeRemoveTable.addDependsOn(glueDatabase) - // creates a table that can be seen in the AWS Glue table browser at + // creates a table that can be seen in the AWS Glue table browser at // https://console.aws.amazon.com/glue/home#/v2/data-catalog/tables // and in the data browser in the Athena Query editor at // https://console.aws.amazon.com/athena/home#/query-editor @@ -533,46 +638,51 @@ export function UcanFirehoseStack ({ stack, app }) { databaseName, tableInput: { name: blobRemoveTableName, - partitionKeys: [ - { name: 'day', type: 'date' } - ], + partitionKeys: [{ name: 'day', type: 'date' }], parameters: { - 
classification: "json", - typeOfData: "file", + classification: 'json', + typeOfData: 'file', // @see https://docs.aws.amazon.com/athena/latest/ug/partition-projection-kinesis-firehose-example.html for more information on projection // configuration - this should match the "day" parameter and S3 prefix configured in the delivery stream - "projection.enabled": "true", - "projection.day.type": "date", - "projection.day.format": "yyyy-MM-dd", - "projection.day.range": "2023-01-01,NOW", - "projection.day.interval": "1", - "projection.day.interval.unit": "DAYS", - "storage.location.template": `s3://${streamLogBucket.bucketName}/logs/receipt/blob_remove/\${day}/` + 'projection.enabled': 'true', + 'projection.day.type': 'date', + 'projection.day.format': 'yyyy-MM-dd', + 'projection.day.range': '2023-01-01,NOW', + 'projection.day.interval': '1', + 'projection.day.interval.unit': 'DAYS', + 'storage.location.template': `s3://${streamLogBucket.bucketName}/logs/receipt/blob_remove/\${day}/`, }, storageDescriptor: { location: `s3://${streamLogBucket.bucketName}/logs/receipt/blob_remove/`, columns: [ { name: 'carcid', type: 'string' }, // STRUCT here refers to the Apache Hive STRUCT datatype - see https://aws.amazon.com/blogs/big-data/create-tables-in-amazon-athena-from-nested-json-and-mappings-using-jsonserde/ - { name: 'value', type: 'STRUCT>>,iss:STRING,aud:STRING>' }, - { name: "out", type: "STRUCT,ok:STRUCT>" }, - { name: "ts", type: "timestamp" } + { + name: 'value', + type: 'STRUCT>>,iss:STRING,aud:STRING>', + }, + { + name: 'out', + type: 'STRUCT,ok:STRUCT>', + }, + { name: 'ts', type: 'timestamp' }, ], inputFormat: 'org.apache.hadoop.mapred.TextInputFormat', - outputFormat: 'org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat', + outputFormat: + 'org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat', serdeInfo: { serializationLibrary: 'org.openx.data.jsonserde.JsonSerDe', parameters: { // see 
https://aws.amazon.com/blogs/big-data/create-tables-in-amazon-athena-from-nested-json-and-mappings-using-jsonserde/ - 'mapping._cid_slash': '/' - } - } - } - } + 'mapping._cid_slash': '/', + }, + }, + }, + }, }) blobRemoveTable.addDependsOn(glueDatabase) - // creates a table that can be seen in the AWS Glue table browser at + // creates a table that can be seen in the AWS Glue table browser at // https://console.aws.amazon.com/glue/home#/v2/data-catalog/tables // and in the data browser in the Athena Query editor at // https://console.aws.amazon.com/athena/home#/query-editor @@ -582,46 +692,51 @@ export function UcanFirehoseStack ({ stack, app }) { databaseName, tableInput: { name: uploadRemoveTableName, - partitionKeys: [ - { name: 'day', type: 'date' } - ], + partitionKeys: [{ name: 'day', type: 'date' }], parameters: { - classification: "json", - typeOfData: "file", + classification: 'json', + typeOfData: 'file', // @see https://docs.aws.amazon.com/athena/latest/ug/partition-projection-kinesis-firehose-example.html for more information on projection // configuration - this should match the "day" parameter and S3 prefix configured in the delivery stream - "projection.enabled": "true", - "projection.day.type": "date", - "projection.day.format": "yyyy-MM-dd", - "projection.day.range": "2023-01-01,NOW", - "projection.day.interval": "1", - "projection.day.interval.unit": "DAYS", - "storage.location.template": `s3://${streamLogBucket.bucketName}/logs/receipt/upload_remove/\${day}/` + 'projection.enabled': 'true', + 'projection.day.type': 'date', + 'projection.day.format': 'yyyy-MM-dd', + 'projection.day.range': '2023-01-01,NOW', + 'projection.day.interval': '1', + 'projection.day.interval.unit': 'DAYS', + 'storage.location.template': `s3://${streamLogBucket.bucketName}/logs/receipt/upload_remove/\${day}/`, }, storageDescriptor: { location: `s3://${streamLogBucket.bucketName}/logs/receipt/upload_remove/`, columns: [ { name: 'carcid', type: 'string' }, // STRUCT here 
refers to the Apache Hive STRUCT datatype - see https://aws.amazon.com/blogs/big-data/create-tables-in-amazon-athena-from-nested-json-and-mappings-using-jsonserde/ - { name: 'value', type: 'STRUCT,shards:ARRAY>>>>,iss:STRING,aud:STRING>' }, - { name: "out", type: "STRUCT,ok:STRUCT,shards:ARRAY>>>" }, - { name: "ts", type: "timestamp" } + { + name: 'value', + type: 'STRUCT,shards:ARRAY>>>>,iss:STRING,aud:STRING>', + }, + { + name: 'out', + type: 'STRUCT,ok:STRUCT,shards:ARRAY>>>', + }, + { name: 'ts', type: 'timestamp' }, ], inputFormat: 'org.apache.hadoop.mapred.TextInputFormat', - outputFormat: 'org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat', + outputFormat: + 'org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat', serdeInfo: { serializationLibrary: 'org.openx.data.jsonserde.JsonSerDe', parameters: { // see https://aws.amazon.com/blogs/big-data/create-tables-in-amazon-athena-from-nested-json-and-mappings-using-jsonserde/ - 'mapping._cid_slash': '/' - } - } - } - } + 'mapping._cid_slash': '/', + }, + }, + }, + }, }) uploadRemoveTable.addDependsOn(glueDatabase) - // creates a table that can be seen in the AWS Glue table browser at + // creates a table that can be seen in the AWS Glue table browser at // https://console.aws.amazon.com/glue/home#/v2/data-catalog/tables // and in the data browser in the Athena Query editor at // https://console.aws.amazon.com/athena/home#/query-editor @@ -631,141 +746,170 @@ export function UcanFirehoseStack ({ stack, app }) { databaseName, tableInput: { name: providerAddTableName, - partitionKeys: [ - { name: 'day', type: 'date' } - ], + partitionKeys: [{ name: 'day', type: 'date' }], parameters: { - classification: "json", - typeOfData: "file", + classification: 'json', + typeOfData: 'file', // @see https://docs.aws.amazon.com/athena/latest/ug/partition-projection-kinesis-firehose-example.html for more information on projection // configuration - this should match the "day" parameter and S3 prefix configured in the 
delivery stream - "projection.enabled": "true", - "projection.day.type": "date", - "projection.day.format": "yyyy-MM-dd", - "projection.day.range": "2023-01-01,NOW", - "projection.day.interval": "1", - "projection.day.interval.unit": "DAYS", - "storage.location.template": `s3://${streamLogBucket.bucketName}/logs/receipt/provider_add/\${day}/` + 'projection.enabled': 'true', + 'projection.day.type': 'date', + 'projection.day.format': 'yyyy-MM-dd', + 'projection.day.range': '2023-01-01,NOW', + 'projection.day.interval': '1', + 'projection.day.interval.unit': 'DAYS', + 'storage.location.template': `s3://${streamLogBucket.bucketName}/logs/receipt/provider_add/\${day}/`, }, storageDescriptor: { location: `s3://${streamLogBucket.bucketName}/logs/receipt/provider_add/`, columns: [ { name: 'carcid', type: 'string' }, // STRUCT here refers to the Apache Hive STRUCT datatype - see https://aws.amazon.com/blogs/big-data/create-tables-in-amazon-athena-from-nested-json-and-mappings-using-jsonserde/ - { name: 'value', type: 'STRUCT>>,iss:STRING,aud:STRING>' }, - { name: "out", type: "STRUCT,ok:STRUCT>>" }, - { name: "ts", type: "timestamp" } + { + name: 'value', + type: 'STRUCT>>,iss:STRING,aud:STRING>', + }, + { + name: 'out', + type: 'STRUCT,ok:STRUCT>>', + }, + { name: 'ts', type: 'timestamp' }, ], inputFormat: 'org.apache.hadoop.mapred.TextInputFormat', - outputFormat: 'org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat', + outputFormat: + 'org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat', serdeInfo: { serializationLibrary: 'org.openx.data.jsonserde.JsonSerDe', parameters: { // see https://aws.amazon.com/blogs/big-data/create-tables-in-amazon-athena-from-nested-json-and-mappings-using-jsonserde/ - 'mapping._cid_slash': '/' - } - } - } - } + 'mapping._cid_slash': '/', + }, + }, + }, + }, }) providerAddTable.addDependsOn(glueDatabase) - // creates a table that can be seen in the AWS Glue table browser at + // creates a table that can be seen in the AWS Glue 
table browser at // https://console.aws.amazon.com/glue/home#/v2/data-catalog/tables // and in the data browser in the Athena Query editor at // https://console.aws.amazon.com/athena/home#/query-editor - const aggregateOfferTableName = getCdkNames('aggregate-offer-table', app.stage) - const aggregateOfferTable = new glue.CfnTable(stack, aggregateOfferTableName, { - catalogId: Aws.ACCOUNT_ID, - databaseName, - tableInput: { - name: aggregateOfferTableName, - partitionKeys: [ - { name: 'day', type: 'date' } - ], - parameters: { - classification: "json", - typeOfData: "file", - // @see https://docs.aws.amazon.com/athena/latest/ug/partition-projection-kinesis-firehose-example.html for more information on projection - // configuration - this should match the "day" parameter and S3 prefix configured in the delivery stream - "projection.enabled": "true", - "projection.day.type": "date", - "projection.day.format": "yyyy-MM-dd", - "projection.day.range": "2023-01-01,NOW", - "projection.day.interval": "1", - "projection.day.interval.unit": "DAYS", - "storage.location.template": `s3://${streamLogBucket.bucketName}/logs/receipt/aggregate_offer/\${day}/` + const aggregateOfferTableName = getCdkNames( + 'aggregate-offer-table', + app.stage + ) + const aggregateOfferTable = new glue.CfnTable( + stack, + aggregateOfferTableName, + { + catalogId: Aws.ACCOUNT_ID, + databaseName, + tableInput: { + name: aggregateOfferTableName, + partitionKeys: [{ name: 'day', type: 'date' }], + parameters: { + classification: 'json', + typeOfData: 'file', + // @see https://docs.aws.amazon.com/athena/latest/ug/partition-projection-kinesis-firehose-example.html for more information on projection + // configuration - this should match the "day" parameter and S3 prefix configured in the delivery stream + 'projection.enabled': 'true', + 'projection.day.type': 'date', + 'projection.day.format': 'yyyy-MM-dd', + 'projection.day.range': '2023-01-01,NOW', + 'projection.day.interval': '1', + 
'projection.day.interval.unit': 'DAYS', + 'storage.location.template': `s3://${streamLogBucket.bucketName}/logs/receipt/aggregate_offer/\${day}/`, + }, + storageDescriptor: { + location: `s3://${streamLogBucket.bucketName}/logs/receipt/aggregate_offer/`, + columns: [ + { name: 'carcid', type: 'string' }, + // STRUCT here refers to the Apache Hive STRUCT datatype - see https://aws.amazon.com/blogs/big-data/create-tables-in-amazon-athena-from-nested-json-and-mappings-using-jsonserde/ + { + name: 'value', + type: 'STRUCT>>>,iss:STRING,aud:STRING>', + }, + { + name: 'out', + type: 'STRUCT,ok:STRUCT>>', + }, + { name: 'ts', type: 'timestamp' }, + ], + inputFormat: 'org.apache.hadoop.mapred.TextInputFormat', + outputFormat: + 'org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat', + serdeInfo: { + serializationLibrary: 'org.openx.data.jsonserde.JsonSerDe', + parameters: { + // see https://aws.amazon.com/blogs/big-data/create-tables-in-amazon-athena-from-nested-json-and-mappings-using-jsonserde/ + 'mapping._cid_slash': '/', + }, + }, + }, }, - storageDescriptor: { - location: `s3://${streamLogBucket.bucketName}/logs/receipt/aggregate_offer/`, - columns: [ - { name: 'carcid', type: 'string' }, - // STRUCT here refers to the Apache Hive STRUCT datatype - see https://aws.amazon.com/blogs/big-data/create-tables-in-amazon-athena-from-nested-json-and-mappings-using-jsonserde/ - { name: 'value', type: 'STRUCT>>>,iss:STRING,aud:STRING>' }, - { name: 'out', type: 'STRUCT,ok:STRUCT>>' }, - { name: 'ts', type: 'timestamp' } - ], - inputFormat: 'org.apache.hadoop.mapred.TextInputFormat', - outputFormat: 'org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat', - serdeInfo: { - serializationLibrary: 'org.openx.data.jsonserde.JsonSerDe', - parameters: { - // see https://aws.amazon.com/blogs/big-data/create-tables-in-amazon-athena-from-nested-json-and-mappings-using-jsonserde/ - 'mapping._cid_slash': '/' - } - } - } } - }) + ) aggregateOfferTable.addDependsOn(glueDatabase) - 
// creates a table that can be seen in the AWS Glue table browser at + // creates a table that can be seen in the AWS Glue table browser at // https://console.aws.amazon.com/glue/home#/v2/data-catalog/tables // and in the data browser in the Athena Query editor at // https://console.aws.amazon.com/athena/home#/query-editor - const aggregateAcceptTableName = getCdkNames('aggregate-accept-table', app.stage) - const aggregateAcceptTable = new glue.CfnTable(stack, aggregateAcceptTableName, { - catalogId: Aws.ACCOUNT_ID, - databaseName, - tableInput: { - name: aggregateAcceptTableName, - partitionKeys: [ - { name: 'day', type: 'date' } - ], - parameters: { - classification: "json", - typeOfData: "file", - // @see https://docs.aws.amazon.com/athena/latest/ug/partition-projection-kinesis-firehose-example.html for more information on projection - // configuration - this should match the "day" parameter and S3 prefix configured in the delivery stream - "projection.enabled": "true", - "projection.day.type": "date", - "projection.day.format": "yyyy-MM-dd", - "projection.day.range": "2023-01-01,NOW", - "projection.day.interval": "1", - "projection.day.interval.unit": "DAYS", - "storage.location.template": `s3://${streamLogBucket.bucketName}/logs/receipt/aggregate_accept/\${day}/` + const aggregateAcceptTableName = getCdkNames( + 'aggregate-accept-table', + app.stage + ) + const aggregateAcceptTable = new glue.CfnTable( + stack, + aggregateAcceptTableName, + { + catalogId: Aws.ACCOUNT_ID, + databaseName, + tableInput: { + name: aggregateAcceptTableName, + partitionKeys: [{ name: 'day', type: 'date' }], + parameters: { + classification: 'json', + typeOfData: 'file', + // @see https://docs.aws.amazon.com/athena/latest/ug/partition-projection-kinesis-firehose-example.html for more information on projection + // configuration - this should match the "day" parameter and S3 prefix configured in the delivery stream + 'projection.enabled': 'true', + 'projection.day.type': 'date', + 
'projection.day.format': 'yyyy-MM-dd', + 'projection.day.range': '2023-01-01,NOW', + 'projection.day.interval': '1', + 'projection.day.interval.unit': 'DAYS', + 'storage.location.template': `s3://${streamLogBucket.bucketName}/logs/receipt/aggregate_accept/\${day}/`, + }, + storageDescriptor: { + location: `s3://${streamLogBucket.bucketName}/logs/receipt/aggregate_accept/`, + columns: [ + { name: 'carcid', type: 'string' }, + // STRUCT here refers to the Apache Hive STRUCT datatype - see https://aws.amazon.com/blogs/big-data/create-tables-in-amazon-athena-from-nested-json-and-mappings-using-jsonserde/ + { + name: 'value', + type: 'STRUCT>>>,iss:STRING,aud:STRING>', + }, + { + name: 'out', + type: 'STRUCT,ok:STRUCT>>', + }, + { name: 'ts', type: 'timestamp' }, + ], + inputFormat: 'org.apache.hadoop.mapred.TextInputFormat', + outputFormat: + 'org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat', + serdeInfo: { + serializationLibrary: 'org.openx.data.jsonserde.JsonSerDe', + parameters: { + // see https://aws.amazon.com/blogs/big-data/create-tables-in-amazon-athena-from-nested-json-and-mappings-using-jsonserde/ + 'mapping._cid_slash': '/', + }, + }, + }, }, - storageDescriptor: { - location: `s3://${streamLogBucket.bucketName}/logs/receipt/aggregate_accept/`, - columns: [ - { name: 'carcid', type: 'string' }, - // STRUCT here refers to the Apache Hive STRUCT datatype - see https://aws.amazon.com/blogs/big-data/create-tables-in-amazon-athena-from-nested-json-and-mappings-using-jsonserde/ - { name: 'value', type: 'STRUCT>>>,iss:STRING,aud:STRING>' }, - { name: 'out', type: 'STRUCT,ok:STRUCT>>' }, - { name: 'ts', type: 'timestamp' } - ], - inputFormat: 'org.apache.hadoop.mapred.TextInputFormat', - outputFormat: 'org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat', - serdeInfo: { - serializationLibrary: 'org.openx.data.jsonserde.JsonSerDe', - parameters: { - // see 
https://aws.amazon.com/blogs/big-data/create-tables-in-amazon-athena-from-nested-json-and-mappings-using-jsonserde/ - 'mapping._cid_slash': '/' - } - } - } } - }) + ) aggregateAcceptTable.addDependsOn(glueDatabase) const athenaResultsBucket = new Bucket(stack, 'athena-w3up-results', { @@ -773,16 +917,18 @@ export function UcanFirehoseStack ({ stack, app }) { cdk: { bucket: { ...getBucketConfig('athena-w3up-results', app.stage), - lifecycleRules: [{ - enabled: true, - expiration: Duration.days(7) - }] - } - } + lifecycleRules: [ + { + enabled: true, + expiration: Duration.days(7), + }, + ], + }, + }, }) // create a workgroup to keep queries organized by `app.stage` - // to use the queries below, Athena users must select the appropriate + // to use the queries below, Athena users must select the appropriate // workspace in the query editor at: // https://console.aws.amazon.com/athena/home#/query-editor const workgroupName = getCdkNames('w3up', app.stage) @@ -790,36 +936,40 @@ export function UcanFirehoseStack ({ stack, app }) { name: workgroupName, workGroupConfiguration: { resultConfiguration: { - outputLocation: `s3://${athenaResultsBucket.bucketName}` - } - } + outputLocation: `s3://${athenaResultsBucket.bucketName}`, + }, + }, }) - // create a query that can be executed by going to + // create a query that can be executed by going to // https://console.aws.amazon.com/athena/home#/query-editor/saved-queries // and selecting the appropriate Workgroup from the dropdown in the upper right const inputOutputQueryName = getCdkNames('input-output-query', app.stage) - const inputOutputQuery = new athena.CfnNamedQuery(stack, inputOutputQueryName, { - name: "Inputs and Outputs, last 24 hours", - description: `${app.stage} w3up preload`, - database: databaseName, - workGroup: workgroupName, - queryString: `SELECT + const inputOutputQuery = new athena.CfnNamedQuery( + stack, + inputOutputQueryName, + { + name: 'Inputs and Outputs, last 24 hours', + description: `${app.stage} 
w3up preload`, + database: databaseName, + workGroup: workgroupName, + queryString: `SELECT value.att[1] as "in", out FROM "AwsDataCatalog"."${databaseName}"."${receiptTableName}" WHERE day >= (CURRENT_DATE - INTERVAL '1' DAY) -` - }) +`, + } + ) inputOutputQuery.addDependsOn(workgroup) inputOutputQuery.addDependsOn(receiptTable) - // create a query that can be executed by going to + // create a query that can be executed by going to // https://console.aws.amazon.com/athena/home#/query-editor/saved-queries // and selecting the appropriate Workgroup from the dropdown in the upper right const dataStoredQueryName = getCdkNames('data-stored-query', app.stage) const dataStoredQuery = new athena.CfnNamedQuery(stack, dataStoredQueryName, { - name: "Data stored by space, past 7 days", + name: 'Data stored by space, past 7 days', description: `${app.stage} w3up preload`, database: databaseName, workGroup: workgroupName, @@ -830,23 +980,26 @@ FROM "AwsDataCatalog"."${databaseName}"."${storeAddTableName}" WHERE out.ok IS NOT NULL AND day >= (CURRENT_DATE - INTERVAL '7' DAY) GROUP BY value.att[1]."with" -` +`, }) dataStoredQuery.addDependsOn(workgroup) dataStoredQuery.addDependsOn(storeAddTable) - // create a query that can be executed by going to + // create a query that can be executed by going to // https://console.aws.amazon.com/athena/home#/query-editor/saved-queries // and selecting the appropriate Workgroup from the dropdown in the upper right const storesBySpaceQueryName = getCdkNames('stores-by-space-query', app.stage) - const storesBySpaceQuery = new athena.CfnNamedQuery(stack, storesBySpaceQueryName, { - name: "Stores, past 7 days", - description: `${app.stage} w3up preload + const storesBySpaceQuery = new athena.CfnNamedQuery( + stack, + storesBySpaceQueryName, + { + name: 'Stores, past 7 days', + description: `${app.stage} w3up preload Recent uploaded CARs by Customer email and CID`, - database: databaseName, - workGroup: workgroupName, - queryString: `WITH + 
database: databaseName, + workGroup: workgroupName, + queryString: `WITH spaces AS ( SELECT value.att[1].nb.consumer AS did, value.att[1]."with" AS account @@ -872,18 +1025,19 @@ stores_by_account AS ( ) SELECT * FROM stores_by_account ORDER BY ts -` - }) +`, + } + ) storesBySpaceQuery.addDependsOn(workgroup) storesBySpaceQuery.addDependsOn(providerAddTable) storesBySpaceQuery.addDependsOn(storeAddTable) - // create a query that can be executed by going to + // create a query that can be executed by going to // https://console.aws.amazon.com/athena/home#/query-editor/saved-queries // and selecting the appropriate Workgroup from the dropdown in the upper right const uploadsQueryName = getCdkNames('uploads-query', app.stage) const uploadsQuery = new athena.CfnNamedQuery(stack, uploadsQueryName, { - name: "Uploads, past 7 days", + name: 'Uploads, past 7 days', description: `${app.stage} w3up preload Recent uploaded content by Customer email and CID`, @@ -915,24 +1069,30 @@ uploads_by_account AS ( ) SELECT * FROM uploads_by_account ORDER BY ts - ` + `, }) uploadsQuery.addDependsOn(workgroup) uploadsQuery.addDependsOn(providerAddTable) uploadsQuery.addDependsOn(uploadAddTable) - // create a query that can be executed by going to + // create a query that can be executed by going to // https://console.aws.amazon.com/athena/home#/query-editor/saved-queries // and selecting the appropriate Workgroup from the dropdown in the upper right - const uploadVolumeSizeQueryName = getCdkNames('upload-volume-size-query', app.stage) - const uploadVolumeSizeQuery = new athena.CfnNamedQuery(stack, uploadVolumeSizeQueryName, { - name: "Users with highest upload volume (by size), past day", - description: `${app.stage} w3up preload + const uploadVolumeSizeQueryName = getCdkNames( + 'upload-volume-size-query', + app.stage + ) + const uploadVolumeSizeQuery = new athena.CfnNamedQuery( + stack, + uploadVolumeSizeQueryName, + { + name: 'Users with highest upload volume (by size), past day', + 
description: `${app.stage} w3up preload Global view of users with most upload volume (by size) in the last 24 hours by their registered email`, - database: databaseName, - workGroup: workgroupName, - queryString: `WITH + database: databaseName, + workGroup: workgroupName, + queryString: `WITH spaces AS ( SELECT value.att[1].nb.consumer AS did, value.att[1]."with" AS account @@ -956,8 +1116,9 @@ stores_by_account AS ( FROM stores_by_account GROUP BY account ORDER BY size DESC -` - }) +`, + } + ) uploadVolumeSizeQuery.addDependsOn(workgroup) uploadVolumeSizeQuery.addDependsOn(providerAddTable) uploadVolumeSizeQuery.addDependsOn(storeAddTable) @@ -965,15 +1126,21 @@ stores_by_account AS ( // create a query that can be executed by going to // https://console.aws.amazon.com/athena/home#/query-editor/saved-queries // and selecting the appropriate Workgroup from the dropdown in the upper right - const uploadVolumeCountQueryName = getCdkNames('upload-volume-count-query', app.stage) - const uploadVolumeCountQuery = new athena.CfnNamedQuery(stack, uploadVolumeCountQueryName, { - name: "Users with highest upload volume (by count), past day", - description: `${app.stage} w3up preload + const uploadVolumeCountQueryName = getCdkNames( + 'upload-volume-count-query', + app.stage + ) + const uploadVolumeCountQuery = new athena.CfnNamedQuery( + stack, + uploadVolumeCountQueryName, + { + name: 'Users with highest upload volume (by count), past day', + description: `${app.stage} w3up preload Global view of users with most upload volume (by count) in the last 24 hours by their registered email`, - database: databaseName, - workGroup: workgroupName, - queryString: `WITH + database: databaseName, + workGroup: workgroupName, + queryString: `WITH spaces AS ( SELECT value.att[1].nb.consumer AS did, value.att[1]."with" AS account @@ -998,8 +1165,9 @@ uploads_by_account AS ( FROM uploads_by_account GROUP BY account ORDER BY count DESC -` - }) +`, + } + ) 
uploadVolumeCountQuery.addDependsOn(workgroup) uploadVolumeCountQuery.addDependsOn(providerAddTable) uploadVolumeCountQuery.addDependsOn(storeAddTable) @@ -1007,15 +1175,21 @@ uploads_by_account AS ( // create a query that can be executed by going to // https://console.aws.amazon.com/athena/home#/query-editor/saved-queries // and selecting the appropriate Workgroup from the dropdown in the upper right - const uploadsBySpaceAndSizeQueryName = getCdkNames('uploads-by-space-and-size', app.stage) - const uploadsBySpaceAndSizeQuery = new athena.CfnNamedQuery(stack, uploadsBySpaceAndSizeQueryName, { - name: "Uploads by space and size, last 2 days", - description: `${app.stage} w3up preload + const uploadsBySpaceAndSizeQueryName = getCdkNames( + 'uploads-by-space-and-size', + app.stage + ) + const uploadsBySpaceAndSizeQuery = new athena.CfnNamedQuery( + stack, + uploadsBySpaceAndSizeQueryName, + { + name: 'Uploads by space and size, last 2 days', + description: `${app.stage} w3up preload Uploads over the last 2 days, with size aggregated from corresponding "store" operations.`, - database: databaseName, - workGroup: workgroupName, - queryString: `WITH + database: databaseName, + workGroup: workgroupName, + queryString: `WITH uploads_by_shard AS ( SELECT carcid AS id, @@ -1053,24 +1227,31 @@ FROM upload_shards_by_size WHERE upload_ts >= (CURRENT_TIMESTAMP - INTERVAL '2' DAY) GROUP BY upload_ts, space, content_cid ORDER BY upload_ts DESC -` - }) +`, + } + ) uploadsBySpaceAndSizeQuery.addDependsOn(workgroup) uploadsBySpaceAndSizeQuery.addDependsOn(uploadAddTable) uploadsBySpaceAndSizeQuery.addDependsOn(storeAddTable) - // create a query that can be executed by going to + // create a query that can be executed by going to // https://console.aws.amazon.com/athena/home#/query-editor/saved-queries // and selecting the appropriate Workgroup from the dropdown in the upper right - const aggregateHoursToDataCommitedQueryName = getCdkNames('aggregate-hours-to-data-commited-query', 
app.stage) - const aggregateHoursToDataCommitedQuery = new athena.CfnNamedQuery(stack, aggregateHoursToDataCommitedQueryName, { - name: "Hours to data commited per aggregate in the last 7 days", - description: `${app.stage} w3up preload + const aggregateHoursToDataCommitedQueryName = getCdkNames( + 'aggregate-hours-to-data-commited-query', + app.stage + ) + const aggregateHoursToDataCommitedQuery = new athena.CfnNamedQuery( + stack, + aggregateHoursToDataCommitedQueryName, + { + name: 'Hours to data commited per aggregate in the last 7 days', + description: `${app.stage} w3up preload Hours to data commited per aggregate in the last 7 days`, - database: databaseName, - workGroup: workgroupName, - queryString: `WITH + database: databaseName, + workGroup: workgroupName, + queryString: `WITH accepted_aggregates AS ( SELECT value.att[1].nb.aggregate._cid_slash as cid, ts as accept_ts @@ -1083,55 +1264,67 @@ SELECT cid, CAST((to_unixtime(accept_ts) - to_unixtime(ts))/3600 as integer) as hrs_to_data_commited FROM "AwsDataCatalog"."${databaseName}."${aggregateOfferTableName}" INNER JOIN accepted_aggregates ON accepted_aggregates.cid = value.att[1].nb.aggregate._cid_slash -` - }) +`, + } + ) aggregateHoursToDataCommitedQuery.addDependsOn(workgroup) aggregateHoursToDataCommitedQuery.addDependsOn(aggregateAcceptTable) aggregateHoursToDataCommitedQuery.addDependsOn(aggregateOfferTable) // configure the Athena Dynamo connector - // Considering Lambda functions limits response sizes, responses larger than the threshold - // spill into an Amazon S3 location that you specify when you create your Lambda function. + // Considering Lambda functions limits response sizes, responses larger than the threshold + // spill into an Amazon S3 location that you specify when you create your Lambda function. // Athena reads these responses from Amazon S3 directly. 
const athenaDynamoSpillBucket = new Bucket(stack, 'athena-dynamo-spill', { cors: true, cdk: { bucket: { ...getBucketConfig('athena-dynamo-spill', app.stage), - lifecycleRules: [{ - enabled: true, - expiration: Duration.days(1) - }] - } - } + lifecycleRules: [ + { + enabled: true, + expiration: Duration.days(1), + }, + ], + }, + }, }) const dynamoAthenaLambdaName = getCdkNames('dynamo-athena', app.stage) - const athenaDynamoConnector = new sam.CfnApplication(stack, getCdkNames('athena-dynamo-connector', app.stage), { - // I got this ARN and version from the AWS admin UI after configuring the Athena Dynamo connector manually using these instructions: - // https://docs.aws.amazon.com/athena/latest/ug/connect-data-source-serverless-app-repo.html - location: { - applicationId: 'arn:aws:serverlessrepo:us-east-1:292517598671:applications/AthenaDynamoDBConnector', - semanticVersion: '2023.38.1' - }, - parameters: { - AthenaCatalogName: dynamoAthenaLambdaName, - SpillBucket: athenaDynamoSpillBucket.bucketName + const athenaDynamoConnector = new sam.CfnApplication( + stack, + getCdkNames('athena-dynamo-connector', app.stage), + { + // I got this ARN and version from the AWS admin UI after configuring the Athena Dynamo connector manually using these instructions: + // https://docs.aws.amazon.com/athena/latest/ug/connect-data-source-serverless-app-repo.html + location: { + applicationId: + 'arn:aws:serverlessrepo:us-east-1:292517598671:applications/AthenaDynamoDBConnector', + semanticVersion: '2023.38.1', + }, + parameters: { + AthenaCatalogName: dynamoAthenaLambdaName, + SpillBucket: athenaDynamoSpillBucket.bucketName, + }, } - }) + ) // creates an Athena data source that will enable Athena to query our dynamo tables: // https://console.aws.amazon.com/athena/home#/data-sources const dynamoDataCatalogName = getCdkNames('dynamo-data-catalog', app.stage) const dynamoDataCatalogDatabaseName = getCdkNames('dynamo', app.stage) - const dynamoDataCatalog = new 
athena.CfnDataCatalog(stack, dynamoDataCatalogName, { - name: dynamoDataCatalogDatabaseName, - type: 'LAMBDA', - parameters: { - function: `arn:aws:lambda:${stack.region}:${stack.account}:function:${dynamoAthenaLambdaName}` + const dynamoDataCatalog = new athena.CfnDataCatalog( + stack, + dynamoDataCatalogName, + { + name: dynamoDataCatalogDatabaseName, + type: 'LAMBDA', + parameters: { + function: `arn:aws:lambda:${stack.region}:${stack.account}:function:${dynamoAthenaLambdaName}`, + }, } - }) + ) dynamoDataCatalog.addDependsOn(athenaDynamoConnector) // queries that depend on the Athena Dynamo connector @@ -1139,33 +1332,46 @@ INNER JOIN accepted_aggregates ON accepted_aggregates.cid = value.att[1].nb.aggr // create a query that can be executed by going to // https://console.aws.amazon.com/athena/home#/query-editor/saved-queries // and selecting the appropriate Workgroup from the dropdown in the upper right - const spacesByAccountQueryName = getCdkNames('spaces-by-account-query', app.stage) - const spacesByAccountQuery = new athena.CfnNamedQuery(stack, spacesByAccountQueryName, { - name: "Dynamo: spaces by account", - description: `${app.stage} w3up preload`, - database: databaseName, - workGroup: workgroupName, - queryString: `SELECT + const spacesByAccountQueryName = getCdkNames( + 'spaces-by-account-query', + app.stage + ) + const spacesByAccountQuery = new athena.CfnNamedQuery( + stack, + spacesByAccountQueryName, + { + name: 'Dynamo: spaces by account', + description: `${app.stage} w3up preload`, + database: databaseName, + workGroup: workgroupName, + queryString: `SELECT customer as account, consumer as space FROM "${dynamoDataCatalogDatabaseName}"."default"."${app.stage}-w3infra-subscription" AS sub LEFT JOIN "${dynamoDataCatalogDatabaseName}"."default"."${app.stage}-w3infra-consumer" AS space ON space.subscription = sub.subscription -` - }) +`, + } + ) spacesByAccountQuery.addDependsOn(dynamoDataCatalog) spacesByAccountQuery.addDependsOn(workgroup) // 
create a query that can be executed by going to // https://console.aws.amazon.com/athena/home#/query-editor/saved-queries // and selecting the appropriate Workgroup from the dropdown in the upper right - const uploadsByAccountQueryName = getCdkNames('uploads-by-account-query', app.stage) - const uploadsByAccountQuery = new athena.CfnNamedQuery(stack, uploadsByAccountQueryName, { - name: "Dynamo: uploads by account", - description: `${app.stage} w3up preload`, - database: databaseName, - workGroup: workgroupName, - queryString: `WITH + const uploadsByAccountQueryName = getCdkNames( + 'uploads-by-account-query', + app.stage + ) + const uploadsByAccountQuery = new athena.CfnNamedQuery( + stack, + uploadsByAccountQueryName, + { + name: 'Dynamo: uploads by account', + description: `${app.stage} w3up preload`, + database: databaseName, + workGroup: workgroupName, + queryString: `WITH spaces AS ( SELECT customer as account, consumer as did @@ -1191,10 +1397,10 @@ uploads_by_account AS ( ) SELECT * FROM uploads_by_account ORDER BY timestamp -` - }) +`, + } + ) uploadsByAccountQuery.addDependsOn(receiptTable) uploadsByAccountQuery.addDependsOn(dynamoDataCatalog) uploadsByAccountQuery.addDependsOn(workgroup) - }