Skip to content

Commit

Permalink
feat: add dynamo connector
Browse files Browse the repository at this point in the history
plus example query joining dynamo to ucan logs
  • Loading branch information
travis committed Sep 26, 2023
1 parent e3833a0 commit 37a476a
Showing 1 changed file with 61 additions and 22 deletions.
83 changes: 61 additions & 22 deletions stacks/firehose-stack.js
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,8 @@ import {
aws_logs as logs,
aws_kinesisfirehose as firehose,
aws_glue as glue,
aws_athena as athena
aws_athena as athena,
aws_sam
} from 'aws-cdk-lib'

import { UcanInvocationStack } from './ucan-invocation-stack.js'
Expand Down Expand Up @@ -222,7 +223,7 @@ export function UcanFirehoseStack ({ stack, app }) {
queryString: `SELECT
value.att[1] as "in",
out
FROM "${tableName}"
FROM "AwsDataCatalog"."${databaseName}"."${tableName}"
WHERE type = 'receipt'
AND day > (CURRENT_DATE - INTERVAL '1' DAY)
`
Expand All @@ -237,7 +238,7 @@ WHERE type = 'receipt'
queryString: `SELECT
SUM(value.att[1].nb.size) AS size,
value.att[1]."with" AS space
FROM "${tableName}"
FROM "AwsDataCatalog"."${databaseName}"."${tableName}"
WHERE value.att[1].can='store/add'
AND out.ok IS NOT NULL
AND type='receipt'
Expand All @@ -256,7 +257,7 @@ GROUP BY value.att[1]."with"
value.att[1].nb.size AS size,
value.att[1]."with" AS space,
ts
FROM "${tableName}"
FROM "AwsDataCatalog"."${databaseName}"."${tableName}"
WHERE value.att[1].can='store/add'
AND day > (CURRENT_DATE - INTERVAL '2' DAY)
AND out.ok IS NOT NULL
Expand All @@ -274,7 +275,7 @@ WHERE value.att[1].can='store/add'
value.att[1].nb.root._cid_slash AS cid,
value.att[1]."with" AS space,
ts
FROM "${tableName}"
FROM "AwsDataCatalog"."${databaseName}"."${tableName}"
WHERE value.att[1].can='upload/add'
AND day > (CURRENT_DATE - INTERVAL '7' DAY)
AND out.ok IS NOT NULL
Expand All @@ -284,21 +285,60 @@ ORDER BY ts
})
uploadsQuery.addDependsOn(glueTable)

// configure the Athena Dynamo connector

const athenaDynamoSpillBucket = new Bucket(stack, 'athena-dynamo-spill', {
cors: true,
cdk: {
bucket: {
...getBucketConfig('athena-dynamo-spill', app.stage),
lifecycleRules: [{
enabled: true,
expiration: Duration.days(1)
}]
}
}
})

const dynamoAthenaLambdaName = getCdkNames('dynamo-athena', app.stage)
const athenaDynamoConnector = new aws_sam.CfnApplication(stack, getCdkNames('athena-dynamo-connector', app.stage), {
location: {
applicationId: 'arn:aws:serverlessrepo:us-east-1:292517598671:applications/AthenaDynamoDBConnector',
semanticVersion: '2023.38.1'
},
parameters: {
AthenaCatalogName: dynamoAthenaLambdaName,
SpillBucket: athenaDynamoSpillBucket.bucketName
}
})

const dynamoDataCatalogName = getCdkNames('dynamo-data-catalog', app.stage)
const dynamoDataCatalogDatabaseName = getCdkNames('dynamo', app.stage)
const dynamoDataCatalog = new athena.CfnDataCatalog(stack, dynamoDataCatalogName, {
name: dynamoDataCatalogDatabaseName,
type: 'LAMBDA',
parameters: {
function: `arn:aws:lambda:${stack.region}:${stack.account}:function:${dynamoAthenaLambdaName}`
}
})
dynamoDataCatalog.addDependsOn(athenaDynamoConnector)

// queries that depend on the Athena Dynamo connector

const spacesByAccountQueryName = getCdkNames('spaces-by-account-query', app.stage)
const spacesByAccountQuery = new athena.CfnNamedQuery(stack, spacesByAccountQueryName, {
name: "Spaces by account",
description: "(w3up preloaded)",
database: databaseName,
queryString: `SELECT
value.att[1].nb.consumer AS space,
value.att[1]."with" AS account
FROM "${tableName}"
WHERE value.att[1].can='provider/add'
AND out.ok IS NOT NULL
AND type='receipt'
queryString: `SELECT
customer as account,
consumer as space
FROM "${dynamoDataCatalogDatabaseName}"."default"."${app.stage}-w3infra-subscription" AS sub
LEFT JOIN "${dynamoDataCatalogDatabaseName}"."default"."${app.stage}-w3infra-consumer" AS space
ON space.subscription = sub.subscription
`
})
spacesByAccountQuery.addDependsOn(glueTable)
spacesByAccountQuery.addDependsOn(dynamoDataCatalog)

const uploadsByAccountQueryName = getCdkNames('uploads-by-account-query', app.stage)
const uploadsByAccountQuery = new athena.CfnNamedQuery(stack, uploadsByAccountQueryName, {
Expand All @@ -307,18 +347,17 @@ WHERE value.att[1].can='provider/add'
database: databaseName,
queryString: `WITH
spaces AS (
SELECT value.att[1].nb.consumer AS did,
value.att[1]."with" AS account
FROM "${tableName}"
WHERE value.att[1].can='provider/add'
AND out.ok IS NOT NULL
AND type='receipt'
SELECT customer as account,
consumer as did
FROM "${dynamoDataCatalogDatabaseName}"."default"."${app.stage}-w3infra-subscription" AS sub
LEFT JOIN "${dynamoDataCatalogDatabaseName}"."default"."${app.stage}-w3infra-consumer" AS space
ON space.subscription = sub.subscription
),
uploads AS (
SELECT value.att[1].nb.root._cid_slash AS cid,
SELECT value.att[1].nb.root._cid_slash AS cid,
value.att[1]."with" AS space,
ts
FROM "${tableName}"
FROM "AwsDataCatalog"."${databaseName}"."${tableName}"
WHERE value.att[1].can='upload/add'
AND out.ok IS NOT NULL
AND type='receipt'
Expand All @@ -335,5 +374,5 @@ uploads_by_account AS (
`
})
uploadsByAccountQuery.addDependsOn(glueTable)

uploadsByAccountQuery.addDependsOn(dynamoDataCatalog)
}

0 comments on commit 37a476a

Please sign in to comment.