Skip to content

Commit

Permalink
feat: add kinesis log stream (#86)
Browse files Browse the repository at this point in the history
Adds kinesis ucan log stream to ucanto service. Once a UCAN invocation
is handled by the service, it is sent to Amazon Kinesis data streams for
post processing (JSON with invocation CID, invocation bytes, and decoded
invocation).

Kinesis log stream has its own stack named `UcanStreamStack` which will
include resources needed for post processing of ucan stream ops.
`ApiStack` depends on `UcanStreamStack` given it will use its stream, as
well as its data further down the line to get content like user facing
stats

Per https://www.notion.so/UCAN-LOG-0f3870fc4b404f5cbf646bf16b463365

Implementation details:
- Invocation view content
- `{ carCid: string, value: { att: UCAN.Capabilities, aud:
'did:${string}:${string}', iss: 'did:${string}:${string}' } }`
- having att, audience and issuer should be enough for all the
operations we intend to perform. Skipped `prf`, `exp`, `nbf`, `fct`,
`nnc`, `v`, and `signature`.
  - see format in comment below

Other notes:
- SST Kinesis guide:
https://sst.dev/examples/how-to-use-kinesis-data-streams-in-your-serverless-app.html
- we are configuring 365 days for now
https://docs.aws.amazon.com/cdk/api/v1/docs/aws-kinesis-readme.html#streams
- aws related deps updated to use same everywhere
- we are currently doing redundant encodings/decodings that could be
avoided if we do storacha-network/ucanto#169
- Follow up PRs will be created with consumers:
  - data aggregation lambda for user facing stats
  - dynamoDB
  - ...
- NOTE: they can start from before as we talked
https://docs.aws.amazon.com/cdk/api/v1/docs/@aws-cdk_aws-lambda-event-sources.KinesisEventSourceProps.html#properties
- we need to define a partition key, but we do not need to commit now to
one. Only by the time we want more shards for scaling up

Follow ups:
- #98

Co-authored-by: Alan Shaw <[email protected]>
  • Loading branch information
vasco-santos and alanshaw committed Dec 13, 2022
1 parent 036e837 commit 3393ab2
Show file tree
Hide file tree
Showing 14 changed files with 4,904 additions and 7,908 deletions.
4 changes: 2 additions & 2 deletions carpark/package.json
Original file line number Diff line number Diff line change
Expand Up @@ -6,8 +6,8 @@
"test": "ava --verbose --timeout=60s **/*.test.js"
},
"dependencies": {
"@aws-sdk/client-eventbridge": "^3.216.0",
"@aws-sdk/client-sqs": "^3.213.0",
"@aws-sdk/client-eventbridge": "^3.226.0",
"@aws-sdk/client-sqs": "^3.226.0",
"@sentry/serverless": "^7.22.0"
},
"devDependencies": {
Expand Down
12,608 changes: 4,726 additions & 7,882 deletions package-lock.json

Large diffs are not rendered by default.

1 change: 1 addition & 0 deletions package.json
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,7 @@
"rules": {
"unicorn/prefer-number-properties": "off",
"unicorn/no-null": "off",
"unicorn/no-array-callback-reference": "off",
"unicorn/no-array-for-each": "off",
"unicorn/no-await-expression-member": "off",
"unicorn/prefer-set-has": "off",
Expand Down
2 changes: 1 addition & 1 deletion replicator/package.json
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@
"test": "ava --verbose --timeout=60s **/*.test.js"
},
"dependencies": {
"@aws-sdk/client-s3": "^3.211.0",
"@aws-sdk/client-s3": "^3.226.0",
"@sentry/serverless": "^7.22.0"
},
"devDependencies": {
Expand Down
4 changes: 2 additions & 2 deletions satnav/package.json
Original file line number Diff line number Diff line change
Expand Up @@ -6,8 +6,8 @@
"test": "ava --verbose --timeout=60s **/*.test.js"
},
"dependencies": {
"@aws-sdk/client-s3": "^3.211.0",
"@aws-sdk/client-eventbridge": "^3.218.0",
"@aws-sdk/client-s3": "^3.226.0",
"@aws-sdk/client-eventbridge": "^3.226.0",
"@sentry/serverless": "^7.22.0",
"@ipld/car": "^5.0.1",
"cardex": "^1.0.0",
Expand Down
19 changes: 17 additions & 2 deletions stacks/ucan-invocation-stack.js
Original file line number Diff line number Diff line change
@@ -1,8 +1,10 @@
import {
Bucket,
Function,
KinesisStream,
use
} from '@serverless-stack/resources'
import { Duration } from 'aws-cdk-lib'

import { BusStack } from './bus-stack.js'
import { getConfig, setupSentry } from './config.js'
Expand Down Expand Up @@ -44,7 +46,20 @@ export function UcanInvocationStack({ stack, app }) {
}
})

// create a kinesis stream
const ucanStream = new KinesisStream(stack, 'ucan-stream', {
cdk: {
stream: {
retentionPeriod: Duration.days(365)
}
},
consumers: {
// consumer1: 'functions/consumer1.handler'
},
})

return {
ucanBucket
ucanBucket,
ucanStream
}
}
}
5 changes: 3 additions & 2 deletions stacks/upload-api-stack.js
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@ export function UploadApiStack({ stack, app }) {
// Get references to constructs created in other stacks
const { carparkBucket } = use(CarparkStack)
const { storeTable, uploadTable } = use(UploadDbStack)
const { ucanBucket } = use(UcanInvocationStack)
const { ucanBucket, ucanStream } = use(UcanInvocationStack)

// Setup API
const customDomain = getCustomDomain(stack.stage, process.env.HOSTED_ZONE)
Expand All @@ -35,12 +35,13 @@ export function UploadApiStack({ stack, app }) {
customDomain,
defaults: {
function: {
permissions: [storeTable, uploadTable, carparkBucket, ucanBucket],
permissions: [storeTable, uploadTable, carparkBucket, ucanBucket, ucanStream],
environment: {
STORE_TABLE_NAME: storeTable.tableName,
STORE_BUCKET_NAME: carparkBucket.bucketName,
UPLOAD_TABLE_NAME: uploadTable.tableName,
UCAN_BUCKET_NAME: ucanBucket.bucketName,
UCAN_LOG_STREAM_NAME: ucanStream.streamName,
NAME: pkg.name,
VERSION: pkg.version,
COMMIT: git.commmit,
Expand Down
3 changes: 2 additions & 1 deletion ucan-invocation/package.json
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,8 @@
},
"dependencies": {
"@aws-sdk/client-eventbridge": "^3.218.0",
"@sentry/serverless": "^7.22.0"
"@sentry/serverless": "^7.22.0",
"uint8arrays": "^4.0.2"
},
"devDependencies": {
"@serverless-stack/resources": "*",
Expand Down
24 changes: 22 additions & 2 deletions upload-api/functions/ucan-invocation-router.js
Original file line number Diff line number Diff line change
@@ -1,8 +1,10 @@
import { DID } from '@ucanto/core'
import { Kinesis } from '@aws-sdk/client-kinesis'
import * as Sentry from '@sentry/serverless'
import { fromString as uint8arrayFromString } from 'uint8arrays/from-string'

import { createAccessClient } from '../access.js'
import { persistUcanInvocation } from '../ucan-invocation.js'
import { parseUcanInvocationRequest, persistUcanInvocation } from '../ucan-invocation.js'
import { createCarStore } from '../buckets/car-store.js'
import { createDudewhereStore } from '../buckets/dudewhere-store.js'
import { createUcanStore } from '../buckets/ucan-store.js'
Expand All @@ -17,6 +19,7 @@ Sentry.AWSLambda.init({
tracesSampleRate: 1.0,
})

const kinesisClient = new Kinesis({})
const AWS_REGION = process.env.AWS_REGION || 'us-west-2'

// Specified in SST environment
Expand All @@ -41,6 +44,7 @@ async function ucanInvocationRouter (request) {
STORE_BUCKET_NAME: storeBucketName = '',
UPLOAD_TABLE_NAME: uploadTableName = '',
UCAN_BUCKET_NAME: ucanBucketName = '',
UCAN_LOG_STREAM_NAME: ucanLogStreamName = '',
// set for testing
DYNAMO_DB_ENDPOINT: dbEndpoint,
ACCESS_SERVICE_DID: accessServiceDID = '',
Expand Down Expand Up @@ -85,8 +89,24 @@ async function ucanInvocationRouter (request) {
body: Buffer.from(request.body, 'base64'),
})

const ucanInvocation = await parseUcanInvocationRequest(request)

// persist successful invocation handled
await persistUcanInvocation(request, ucanStoreBucket)
await persistUcanInvocation(ucanInvocation, ucanStoreBucket)

// Put invocation to UCAN stream
await kinesisClient.putRecord({
Data: uint8arrayFromString(JSON.stringify({
carCid: ucanInvocation.carCid,
value: ucanInvocation.value,
ts: Date.now()
})),
// https://docs.aws.amazon.com/streams/latest/dev/key-concepts.html
// A partition key is used to group data by shard within a stream.
// It is required, and now we are starting with one shard. We need to study best partition key
PartitionKey: 'key',
StreamName: ucanLogStreamName,
})

return toLambdaSuccessResponse(response)
}
Expand Down
13 changes: 8 additions & 5 deletions upload-api/package.json
Original file line number Diff line number Diff line change
Expand Up @@ -6,10 +6,12 @@
"test": "ava --node-arguments='--experimental-fetch' --verbose --timeout=60s **/*.test.js"
},
"dependencies": {
"@aws-sdk/client-dynamodb": "^3.211.0",
"@aws-sdk/client-s3": "^3.211.0",
"@aws-sdk/s3-request-presigner": "^3.224.0",
"@aws-sdk/util-dynamodb": "^3.211.0",
"@aws-sdk/client-dynamodb": "^3.226.0",
"@aws-sdk/client-s3": "^3.226.0",
"@aws-sdk/client-kinesis": "^3.226.0",
"@aws-sdk/s3-request-presigner": "^3.226.0",
"@aws-sdk/util-dynamodb": "^3.226.0",
"@ipld/dag-ucan": "3.0.1",
"@sentry/serverless": "^7.22.0",
"@serverless-stack/node": "^1.18.2",
"@ucanto/client": "^4.0.2",
Expand All @@ -21,7 +23,8 @@
"@web3-storage/access": "^9.0.0",
"@web3-storage/capabilities": "^2.0.0",
"multiformats": "^10.0.2",
"p-retry": "^5.1.2"
"p-retry": "^5.1.2",
"uint8arrays": "^4.0.2"
},
"devDependencies": {
"@ipld/car": "^5.0.1",
Expand Down
2 changes: 1 addition & 1 deletion upload-api/service/index.js
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@ export function createServiceRouter (context) {
* @param {import('@ucanto/interface').Signer} serviceSigner
* @param {import('../service/types').UcantoServerContext} context
*/
export async function createUcantoServer (serviceSigner, context) {
export async function createUcantoServer (serviceSigner, context) {
const server = Server.create({
id: serviceSigner,
encoder: CBOR,
Expand Down
6 changes: 6 additions & 0 deletions upload-api/service/types.ts
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,8 @@ import type {
} from '@ucanto/interface'
import type { API } from '@ucanto/server'

import { ToString, UnknownLink } from 'multiformats'

import {
StoreAdd,
StoreRemove,
Expand Down Expand Up @@ -130,6 +132,10 @@ export interface AccessClient {
verifyInvocation: (invocation: Invocation) => Promise<boolean>
}

export interface LinkJSON<T extends UnknownLink = UnknownLink> {
'/': ToString<T>
}

// would be generated by sst, but requires `sst build` to be run, which calls out to aws; not great for CI
declare module "@serverless-stack/node/config" {
export interface SecretResources {
Expand Down
47 changes: 45 additions & 2 deletions upload-api/test/service/ucan-invocation.test.js
Original file line number Diff line number Diff line change
Expand Up @@ -7,9 +7,10 @@ import * as UCAN from '@ipld/dag-ucan'

import { createSpace } from '../helpers/ucanto.js'
import { createS3, createBucket } from '../helpers/resources.js'
import { randomCAR } from '../helpers/random.js'

import { createUcanStore } from '../../buckets/ucan-store.js'
import { parseUcanInvocationRequest, persistUcanInvocation } from '../../ucan-invocation.js'
import { parseUcanInvocationRequest, persistUcanInvocation, replaceAllLinkValues } from '../../ucan-invocation.js'

test.before(async t => {
const { client: s3Client, clientOpts: s3ClientOpts } = await createS3({ port: 9000 })
Expand Down Expand Up @@ -93,7 +94,8 @@ test('persists ucan invocation CAR file', async t => {
])

// @ts-expect-error different type interface in AWS expected request
await persistUcanInvocation(request, ucanStore)
const ucanInvocationObject = await parseUcanInvocationRequest(request)
await persistUcanInvocation(ucanInvocationObject, ucanStore)

const requestCar = await CAR.codec.decode(request.body)
const requestCarRootCid = requestCar.roots[0].cid.toString()
Expand Down Expand Up @@ -126,6 +128,46 @@ test('persists ucan invocation CAR file', async t => {
})
})

test('replace all link values as object and array', async t => {
const car = await randomCAR(128)
const otherCar = await randomCAR(40)

// invoke a upload/add with proof
const root = car.roots[0]
const shards = [car.cid, otherCar.cid].sort()

const att = [
{
nb: {
link: root,
size: car.size
},
can: 'store/add',
with: 'did:key:z6MkfTDbhRZz26kcDNmmehPxeujSkbXe8jqv5fLpKvtc3Wcv',
},
{
nb: {
root,
shards: [...shards]
},
can: 'upload/add',
with: 'did:key:z6MkfTDbhRZz26kcDNmmehPxeujSkbXe8jqv5fLpKvtc3Wcv'
},
]

att.map(replaceAllLinkValues)

// Object with Link
// @ts-expect-error Property '/' does not exist on type 'Link<Partial<Model>
t.is(att[0].nb.link['/'], root.toString())
// @ts-expect-error Property '/' does not exist on type 'Link<Partial<Model>
t.is(att[1].nb.root['/'], root.toString())

// Array with Link
// @ts-expect-error Property '/' does not exist on type 'Link<Partial<Model>
t.deepEqual(att[1].nb.shards?.map(s => s['/']), shards.map(s => s.toString()))
})

/**
* @param {import("@aws-sdk/client-s3").S3Client} s3Client
*/
Expand All @@ -136,3 +178,4 @@ async function prepareResources (s3Client) {
bucketName
}
}

74 changes: 68 additions & 6 deletions upload-api/ucan-invocation.js
Original file line number Diff line number Diff line change
@@ -1,19 +1,32 @@
import * as CAR from '@ucanto/transport/car'
import * as UCAN from '@ipld/dag-ucan'
import * as Link from 'multiformats/link'

/**
* @typedef {object} UcanInvocation
* @property {UCAN.Capabilities} att
* @property {`did:${string}:${string}`} aud
* @property {`did:${string}:${string}`} iss
*
* @typedef {object} UcanInvocationWrapper
* @property {string} carCid
* @property {Uint8Array} bytes
* @property {UcanInvocation} value
*/

/**
* Persist successful UCAN invocations handled by the router.
*
* @param {import('aws-lambda').APIGatewayProxyEventV2} request
* @param {UcanInvocationWrapper} ucanInvocation
* @param {import('./service/types').UcanBucket} ucanStore
*/
export async function persistUcanInvocation (request, ucanStore) {
const { carCid, bytes } = await parseUcanInvocationRequest(request)

await ucanStore.put(carCid, bytes)
export async function persistUcanInvocation (ucanInvocation, ucanStore) {
await ucanStore.put(ucanInvocation.carCid, ucanInvocation.bytes)
}

/**
* @param {import('aws-lambda').APIGatewayProxyEventV2} request
* @returns {Promise<UcanInvocationWrapper>}
*/
export async function parseUcanInvocationRequest (request) {
if (!request.body) {
Expand All @@ -24,8 +37,57 @@ export async function parseUcanInvocationRequest (request) {
const car = await CAR.codec.decode(bytes)
const carCid = car.roots[0].cid.toString()

// @ts-expect-error 'ByteView<unknown>' is not assignable to parameter of type 'ByteView<UCAN<Capabilities>>'
const dagUcan = UCAN.decode(car.roots[0].bytes)

return {
bytes,
carCid
carCid,
value: {
// Workaround for:
// https://github.com/web3-storage/ucanto/issues/171
// https://github.com/multiformats/js-multiformats/issues/228
// @ts-ignore missing types
att: dagUcan.att.map(replaceAllLinkValues),
aud: dagUcan.aud.did(),
iss: dagUcan.iss.did(),
prf: replaceAllLinkValues(dagUcan.prf)
}
}
}

/**
* @param {any} value
*/
export const replaceAllLinkValues = (value) => {
// Array with Links?
if (Array.isArray(value)) {
for (let i = 0; i < value.length; i++) {
if (Link.isLink(value[i])) {
value[i] = toJSON(value[i])
} else {
replaceAllLinkValues(value[i])
}
}
}
// Object with Links?
else if (typeof value === 'object') {
for (const key of Object.keys(value)) {
if (Link.isLink(value[key])) {
value[key] = toJSON(value[key])
}
replaceAllLinkValues(value[key])
}
}

return value
}

/**
* @template {import('multiformats').UnknownLink} Link
* @param {Link} link
*/
export const toJSON = link =>
/** @type {import('./service/types').LinkJSON<Link>} */ ({
'/': link.toString(),
})

0 comments on commit 3393ab2

Please sign in to comment.