Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: kinesis delivery stream updates #237

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
21 changes: 7 additions & 14 deletions package-lock.json

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 0 additions & 1 deletion package.json
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,6 @@
"test-integration": "ava --verbose --timeout=60s test/*.test.js",
"fetch-metrics-for-space": "npm run fetch-metrics-for-space -w tools",
"d1-dynamo-migration": "npm run d1-dynamo-migration -w tools"

},
"devDependencies": {
"@ipld/car": "^5.1.0",
Expand Down
170 changes: 156 additions & 14 deletions stacks/firehose-stack.js
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,8 @@ import {
aws_iam as iam,
aws_logs as logs,
aws_kinesisfirehose as firehose,
aws_glue as glue
aws_glue as glue,
aws_athena as athena
} from 'aws-cdk-lib'

import { UcanInvocationStack } from './ucan-invocation-stack.js'
Expand All @@ -19,7 +20,7 @@ import {
/**
* @param {import('@serverless-stack/resources').StackContext} properties
*/
export function UcanFirehoseStack({ stack, app }) {
export function UcanFirehoseStack ({ stack, app }) {
stack.setDefaultFunctionProps({
srcPath: 'ucan-firehose'
})
Expand Down Expand Up @@ -114,7 +115,7 @@ export function UcanFirehoseStack({ stack, app }) {
bufferingHints: {
intervalInSeconds: 60
},
// makes easier to run hight performance, cost efficient analytics with Athena
// makes easier to run high performance, cost efficient analytics with Athena
dynamicPartitioningConfiguration: {
enabled: true
},
Expand All @@ -126,7 +127,8 @@ export function UcanFirehoseStack({ stack, app }) {
parameters: [
{
parameterName: 'MetadataExtractionQuery',
parameterValue: '{issuer: .value.iss}',
// extract yyyy-MM-dd formatted current date from millisecond epoch timestamp "ts" using jq syntax
parameterValue: '{day: (.ts/1000) | strftime("%Y-%m-%d")}',
travis marked this conversation as resolved.
Show resolved Hide resolved
},
{
parameterName: 'JsonParsingEngine',
Expand All @@ -145,8 +147,11 @@ export function UcanFirehoseStack({ stack, app }) {
},
],
},
// See https://docs.aws.amazon.com/athena/latest/ug/partition-projection-kinesis-firehose-example.html
prefix: 'logs/issuer=!{partitionKeyFromQuery:issuer}/',
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think my logic here was to enable performant queries per issuer.

However, after reading through requirements docs, I think we should partition as you suggest instead. We can still query by issuer, but within a range, instead of all time. Which is good, given we already save space metrics anyway :)

// See https://docs.aws.amazon.com/athena/latest/ug/partition-projection-kinesis-firehose-example.html for general information on partitioning.
// Daily partitions seem right (https://www.upsolver.com/blog/partitioning-data-s3-improve-performance-athena-presto)
// "A rough rule of thumb is that each 100 partitions scanned adds about 1 second of latency to your query in Amazon Athena. This is why minutely or hourly
// partitions are rarely used – typically you would choose between daily, weekly, and monthly partitions, depending on the nature of your queries."
prefix: 'logs/!{partitionKeyFromQuery:day}/',
errorOutputPrefix: 'error'
}
})
Expand All @@ -162,36 +167,173 @@ export function UcanFirehoseStack({ stack, app }) {
}
})

// TODO: See https://catalog.us-east-1.prod.workshops.aws/workshops/fad47f62-3d06-430b-ad32-8588b74fe16f/en-US/lab-5-athena/55-athena-best-practices
const tableName = getCdkNames('ucan-stream-delivery-table', app.stage)
const glueTable = new glue.CfnTable(stack, tableName, {
catalogId: Aws.ACCOUNT_ID,
databaseName,
tableInput: {
name: tableName,
partitionKeys: [],
partitionKeys: [
{ name: 'day', type: 'date' },
],
parameters: {
classification: "json",
typeOfData: "file",
// @see https://docs.aws.amazon.com/athena/latest/ug/partition-projection-kinesis-firehose-example.html for more information on projection
// configuration - this should match the "day" parameter and S3 prefix configured in the delivery stream
"projection.enabled": "true",
"projection.day.type": "date",
"projection.day.format": "yyyy-MM-dd",
"projection.day.range": "2023-01-01,NOW",
travis marked this conversation as resolved.
Show resolved Hide resolved
"projection.day.interval": "1",
"projection.day.interval.unit": "DAYS",
"storage.location.template": `s3://${streamLogBucket.bucketName}/logs/\${day}/`
},
storageDescriptor: {
location: `s3://${streamLogBucket.bucketName}/logs`,
columns: [
{ name: 'carcid', type: 'string' },
{ name: 'type', type: 'string' },
{ name: 'value', type: 'STRUCT<att:ARRAY<struct<can:STRING>>,iss:STRING,aud:STRING>' }
// STRUCT here refers to the Apache Hive STRUCT datatype - see https://aws.amazon.com/blogs/big-data/create-tables-in-amazon-athena-from-nested-json-and-mappings-using-jsonserde/
{ name: 'value', type: 'STRUCT<att:ARRAY<struct<can:STRING,with:STRING,nb:STRUCT<size:BIGINT,root:STRUCT<_cid_slash:STRING>,consumer:STRING>>>,iss:STRING,aud:STRING>' },
{ name: "out", type: "STRUCT<error:STRUCT<name:STRING>,ok:STRUCT<id:STRING,delegations:STRING>>" },
{ name: "ts", type: "timestamp" }
],
inputFormat: 'org.apache.hadoop.mapred.TextInputFormat',
outputFormat: 'org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat',
parameters: {
"classification": "json",
"typeOfData": "file"
},
serdeInfo: {
serializationLibrary: 'org.openx.data.jsonserde.JsonSerDe',
parameters: {
paths: 'carCid,invocationCid,out,ts,type,value'
// see https://aws.amazon.com/blogs/big-data/create-tables-in-amazon-athena-from-nested-json-and-mappings-using-jsonserde/
'mapping._cid_slash': '/'
}
}
}
}
})

glueTable.addDependsOn(glueDatabase)

const inputOutputQueryName = getCdkNames('input-output-query', app.stage)
const inputOutputQuery = new athena.CfnNamedQuery(stack, inputOutputQueryName, {
name: "Inputs and Outputs, last 24 hours",
description: "(w3up preloaded)",
database: databaseName,
queryString: `SELECT
value.att[1] as "in",
out
FROM "${tableName}"
WHERE type = 'receipt'
AND day > (CURRENT_DATE - INTERVAL '1' DAY)
`
})
inputOutputQuery.addDependsOn(glueTable)

const dataStoredQueryName = getCdkNames('data-stored-query', app.stage)
const dataStoredQuery = new athena.CfnNamedQuery(stack, dataStoredQueryName, {
name: "Data stored by space, last week",
description: "(w3up preloaded)",
database: databaseName,
queryString: `SELECT
SUM(value.att[1].nb.size) AS size,
value.att[1]."with" AS space
FROM "${tableName}"
WHERE value.att[1].can='store/add'
AND out.ok IS NOT NULL
AND type='receipt'
AND day > (CURRENT_DATE - INTERVAL '7' DAY)
GROUP BY value.att[1]."with"
`
})
dataStoredQuery.addDependsOn(glueTable)

const storesBySpaceQueryName = getCdkNames('stores-by-space-query', app.stage)
const storesBySpaceQuery = new athena.CfnNamedQuery(stack, storesBySpaceQueryName, {
name: "Stores by space, last week",
description: "(w3up preloaded)",
database: databaseName,
queryString: `SELECT
value.att[1].nb.size AS size,
value.att[1]."with" AS space,
ts
FROM "${tableName}"
WHERE value.att[1].can='store/add'
AND day > (CURRENT_DATE - INTERVAL '2' DAY)
AND out.ok IS NOT NULL
AND type='receipt'
`
})
storesBySpaceQuery.addDependsOn(glueTable)

const uploadsQueryName = getCdkNames('uploads-query', app.stage)
const uploadsQuery = new athena.CfnNamedQuery(stack, uploadsQueryName, {
name: "Uploads, last week",
description: "(w3up preloaded)",
database: databaseName,
queryString: `SELECT
value.att[1].nb.root._cid_slash AS cid,
value.att[1]."with" AS space,
ts
FROM "${tableName}"
WHERE value.att[1].can='upload/add'
AND day > (CURRENT_DATE - INTERVAL '7' DAY)
AND out.ok IS NOT NULL
AND type='receipt'
ORDER BY ts
`
})
uploadsQuery.addDependsOn(glueTable)

const spacesByAccountQueryName = getCdkNames('spaces-by-account-query', app.stage)
const spacesByAccountQuery = new athena.CfnNamedQuery(stack, spacesByAccountQueryName, {
name: "Spaces by account",
description: "(w3up preloaded)",
database: databaseName,
queryString: `SELECT
value.att[1].nb.consumer AS space,
value.att[1]."with" AS account
FROM "${tableName}"
WHERE value.att[1].can='provider/add'
AND out.ok IS NOT NULL
AND type='receipt'
`
})
spacesByAccountQuery.addDependsOn(glueTable)

const uploadsByAccountQueryName = getCdkNames('uploads-by-account-query', app.stage)
const uploadsByAccountQuery = new athena.CfnNamedQuery(stack, uploadsByAccountQueryName, {
name: "Uploads by account",
description: "(w3up preloaded)",
database: databaseName,
queryString: `WITH
spaces AS (
SELECT value.att[1].nb.consumer AS did,
value.att[1]."with" AS account
FROM "${tableName}"
WHERE value.att[1].can='provider/add'
AND out.ok IS NOT NULL
AND type='receipt'
),
uploads AS (
SELECT value.att[1].nb.root._cid_slash AS cid,
value.att[1]."with" AS space,
ts
FROM "${tableName}"
WHERE value.att[1].can='upload/add'
AND out.ok IS NOT NULL
AND type='receipt'
),
uploads_by_account AS (
SELECT spaces.did AS space,
spaces.account AS account,
uploads.cid AS cid,
uploads.ts AS "timestamp"
FROM uploads LEFT JOIN spaces on spaces.did = uploads.space
) SELECT *
FROM uploads_by_account
ORDER BY timestamp
`
})
uploadsByAccountQuery.addDependsOn(glueTable)

}