Skip to content

Commit

Permalink
feat: kinesis delivery stream updates (#237)
Browse files Browse the repository at this point in the history
merging into #191 - mostly put this up so @vasco-santos can review
before I push it into the PR he opened!

- implement date based partitioning
- expand `value` and `out` struct definitions to allow for more
interesting queries
  • Loading branch information
travis authored Sep 26, 2023
1 parent e37a0e3 commit 66ad70c
Show file tree
Hide file tree
Showing 3 changed files with 163 additions and 29 deletions.
21 changes: 7 additions & 14 deletions package-lock.json

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 0 additions & 1 deletion package.json
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,6 @@
"test-integration": "ava --verbose --timeout=60s test/*.test.js",
"fetch-metrics-for-space": "npm run fetch-metrics-for-space -w tools",
"d1-dynamo-migration": "npm run d1-dynamo-migration -w tools"

},
"devDependencies": {
"@ipld/car": "^5.1.0",
Expand Down
170 changes: 156 additions & 14 deletions stacks/firehose-stack.js
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,8 @@ import {
aws_iam as iam,
aws_logs as logs,
aws_kinesisfirehose as firehose,
aws_glue as glue
aws_glue as glue,
aws_athena as athena
} from 'aws-cdk-lib'

import { UcanInvocationStack } from './ucan-invocation-stack.js'
Expand All @@ -19,7 +20,7 @@ import {
/**
* @param {import('@serverless-stack/resources').StackContext} properties
*/
export function UcanFirehoseStack({ stack, app }) {
export function UcanFirehoseStack ({ stack, app }) {
stack.setDefaultFunctionProps({
srcPath: 'ucan-firehose'
})
Expand Down Expand Up @@ -114,7 +115,7 @@ export function UcanFirehoseStack({ stack, app }) {
bufferingHints: {
intervalInSeconds: 60
},
// makes easier to run hight performance, cost efficient analytics with Athena
// makes easier to run high performance, cost efficient analytics with Athena
dynamicPartitioningConfiguration: {
enabled: true
},
Expand All @@ -126,7 +127,8 @@ export function UcanFirehoseStack({ stack, app }) {
parameters: [
{
parameterName: 'MetadataExtractionQuery',
parameterValue: '{issuer: .value.iss}',
// extract yyyy-MM-dd formatted current date from millisecond epoch timestamp "ts" using jq syntax
parameterValue: '{day: (.ts/1000) | strftime("%Y-%m-%d")}',
},
{
parameterName: 'JsonParsingEngine',
Expand All @@ -145,8 +147,11 @@ export function UcanFirehoseStack({ stack, app }) {
},
],
},
// See https://docs.aws.amazon.com/athena/latest/ug/partition-projection-kinesis-firehose-example.html
prefix: 'logs/issuer=!{partitionKeyFromQuery:issuer}/',
// See https://docs.aws.amazon.com/athena/latest/ug/partition-projection-kinesis-firehose-example.html for general information on partitioning.
// Daily partitions seem right (https://www.upsolver.com/blog/partitioning-data-s3-improve-performance-athena-presto)
// "A rough rule of thumb is that each 100 partitions scanned adds about 1 second of latency to your query in Amazon Athena. This is why minutely or hourly
// partitions are rarely used – typically you would choose between daily, weekly, and monthly partitions, depending on the nature of your queries."
prefix: 'logs/!{partitionKeyFromQuery:day}/',
errorOutputPrefix: 'error'
}
})
Expand All @@ -162,36 +167,173 @@ export function UcanFirehoseStack({ stack, app }) {
}
})

// TODO: See https://catalog.us-east-1.prod.workshops.aws/workshops/fad47f62-3d06-430b-ad32-8588b74fe16f/en-US/lab-5-athena/55-athena-best-practices
const tableName = getCdkNames('ucan-stream-delivery-table', app.stage)
const glueTable = new glue.CfnTable(stack, tableName, {
catalogId: Aws.ACCOUNT_ID,
databaseName,
tableInput: {
name: tableName,
partitionKeys: [],
partitionKeys: [
{ name: 'day', type: 'date' },
],
parameters: {
classification: "json",
typeOfData: "file",
// @see https://docs.aws.amazon.com/athena/latest/ug/partition-projection-kinesis-firehose-example.html for more information on projection
// configuration - this should match the "day" parameter and S3 prefix configured in the delivery stream
"projection.enabled": "true",
"projection.day.type": "date",
"projection.day.format": "yyyy-MM-dd",
"projection.day.range": "2023-01-01,NOW",
"projection.day.interval": "1",
"projection.day.interval.unit": "DAYS",
"storage.location.template": `s3://${streamLogBucket.bucketName}/logs/\${day}/`
},
storageDescriptor: {
location: `s3://${streamLogBucket.bucketName}/logs`,
columns: [
{ name: 'carcid', type: 'string' },
{ name: 'type', type: 'string' },
{ name: 'value', type: 'STRUCT<att:ARRAY<struct<can:STRING>>,iss:STRING,aud:STRING>' }
// STRUCT here refers to the Apache Hive STRUCT datatype - see https://aws.amazon.com/blogs/big-data/create-tables-in-amazon-athena-from-nested-json-and-mappings-using-jsonserde/
{ name: 'value', type: 'STRUCT<att:ARRAY<struct<can:STRING,with:STRING,nb:STRUCT<size:BIGINT,root:STRUCT<_cid_slash:STRING>,consumer:STRING>>>,iss:STRING,aud:STRING>' },
{ name: "out", type: "STRUCT<error:STRUCT<name:STRING>,ok:STRUCT<id:STRING,delegations:STRING>>" },
{ name: "ts", type: "timestamp" }
],
inputFormat: 'org.apache.hadoop.mapred.TextInputFormat',
outputFormat: 'org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat',
parameters: {
"classification": "json",
"typeOfData": "file"
},
serdeInfo: {
serializationLibrary: 'org.openx.data.jsonserde.JsonSerDe',
parameters: {
paths: 'carCid,invocationCid,out,ts,type,value'
// see https://aws.amazon.com/blogs/big-data/create-tables-in-amazon-athena-from-nested-json-and-mappings-using-jsonserde/
'mapping._cid_slash': '/'
}
}
}
}
})

glueTable.addDependsOn(glueDatabase)

const inputOutputQueryName = getCdkNames('input-output-query', app.stage)
const inputOutputQuery = new athena.CfnNamedQuery(stack, inputOutputQueryName, {
name: "Inputs and Outputs, last 24 hours",
description: "(w3up preloaded)",
database: databaseName,
queryString: `SELECT
value.att[1] as "in",
out
FROM "${tableName}"
WHERE type = 'receipt'
AND day > (CURRENT_DATE - INTERVAL '1' DAY)
`
})
inputOutputQuery.addDependsOn(glueTable)

const dataStoredQueryName = getCdkNames('data-stored-query', app.stage)
const dataStoredQuery = new athena.CfnNamedQuery(stack, dataStoredQueryName, {
name: "Data stored by space, last week",
description: "(w3up preloaded)",
database: databaseName,
queryString: `SELECT
SUM(value.att[1].nb.size) AS size,
value.att[1]."with" AS space
FROM "${tableName}"
WHERE value.att[1].can='store/add'
AND out.ok IS NOT NULL
AND type='receipt'
AND day > (CURRENT_DATE - INTERVAL '7' DAY)
GROUP BY value.att[1]."with"
`
})
dataStoredQuery.addDependsOn(glueTable)

const storesBySpaceQueryName = getCdkNames('stores-by-space-query', app.stage)
const storesBySpaceQuery = new athena.CfnNamedQuery(stack, storesBySpaceQueryName, {
name: "Stores by space, last week",
description: "(w3up preloaded)",
database: databaseName,
queryString: `SELECT
value.att[1].nb.size AS size,
value.att[1]."with" AS space,
ts
FROM "${tableName}"
WHERE value.att[1].can='store/add'
AND day > (CURRENT_DATE - INTERVAL '2' DAY)
AND out.ok IS NOT NULL
AND type='receipt'
`
})
storesBySpaceQuery.addDependsOn(glueTable)

const uploadsQueryName = getCdkNames('uploads-query', app.stage)
const uploadsQuery = new athena.CfnNamedQuery(stack, uploadsQueryName, {
name: "Uploads, last week",
description: "(w3up preloaded)",
database: databaseName,
queryString: `SELECT
value.att[1].nb.root._cid_slash AS cid,
value.att[1]."with" AS space,
ts
FROM "${tableName}"
WHERE value.att[1].can='upload/add'
AND day > (CURRENT_DATE - INTERVAL '7' DAY)
AND out.ok IS NOT NULL
AND type='receipt'
ORDER BY ts
`
})
uploadsQuery.addDependsOn(glueTable)

const spacesByAccountQueryName = getCdkNames('spaces-by-account-query', app.stage)
const spacesByAccountQuery = new athena.CfnNamedQuery(stack, spacesByAccountQueryName, {
name: "Spaces by account",
description: "(w3up preloaded)",
database: databaseName,
queryString: `SELECT
value.att[1].nb.consumer AS space,
value.att[1]."with" AS account
FROM "${tableName}"
WHERE value.att[1].can='provider/add'
AND out.ok IS NOT NULL
AND type='receipt'
`
})
spacesByAccountQuery.addDependsOn(glueTable)

const uploadsByAccountQueryName = getCdkNames('uploads-by-account-query', app.stage)
const uploadsByAccountQuery = new athena.CfnNamedQuery(stack, uploadsByAccountQueryName, {
name: "Uploads by account",
description: "(w3up preloaded)",
database: databaseName,
queryString: `WITH
spaces AS (
SELECT value.att[1].nb.consumer AS did,
value.att[1]."with" AS account
FROM "${tableName}"
WHERE value.att[1].can='provider/add'
AND out.ok IS NOT NULL
AND type='receipt'
),
uploads AS (
SELECT value.att[1].nb.root._cid_slash AS cid,
value.att[1]."with" AS space,
ts
FROM "${tableName}"
WHERE value.att[1].can='upload/add'
AND out.ok IS NOT NULL
AND type='receipt'
),
uploads_by_account AS (
SELECT spaces.did AS space,
spaces.account AS account,
uploads.cid AS cid,
uploads.ts AS "timestamp"
FROM uploads LEFT JOIN spaces on spaces.did = uploads.space
) SELECT *
FROM uploads_by_account
ORDER BY timestamp
`
})
uploadsByAccountQuery.addDependsOn(glueTable)

}

0 comments on commit 66ad70c

Please sign in to comment.