Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: add kinesis log stream #86

Merged
merged 5 commits into from
Dec 13, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions carpark/package.json
Original file line number Diff line number Diff line change
Expand Up @@ -6,8 +6,8 @@
"test": "ava --verbose --timeout=60s **/*.test.js"
},
"dependencies": {
"@aws-sdk/client-eventbridge": "^3.216.0",
"@aws-sdk/client-sqs": "^3.213.0",
"@aws-sdk/client-eventbridge": "^3.226.0",
"@aws-sdk/client-sqs": "^3.226.0",
"@sentry/serverless": "^7.22.0"
},
"devDependencies": {
Expand Down
12,608 changes: 4,726 additions & 7,882 deletions package-lock.json

Large diffs are not rendered by default.

1 change: 1 addition & 0 deletions package.json
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,7 @@
"rules": {
"unicorn/prefer-number-properties": "off",
"unicorn/no-null": "off",
"unicorn/no-array-callback-reference": "off",
"unicorn/no-array-for-each": "off",
"unicorn/no-await-expression-member": "off",
"unicorn/prefer-set-has": "off",
Expand Down
2 changes: 1 addition & 1 deletion replicator/package.json
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@
"test": "ava --verbose --timeout=60s **/*.test.js"
},
"dependencies": {
"@aws-sdk/client-s3": "^3.211.0",
"@aws-sdk/client-s3": "^3.226.0",
"@sentry/serverless": "^7.22.0"
},
"devDependencies": {
Expand Down
4 changes: 2 additions & 2 deletions satnav/package.json
Original file line number Diff line number Diff line change
Expand Up @@ -6,8 +6,8 @@
"test": "ava --verbose --timeout=60s **/*.test.js"
},
"dependencies": {
"@aws-sdk/client-s3": "^3.211.0",
"@aws-sdk/client-eventbridge": "^3.218.0",
"@aws-sdk/client-s3": "^3.226.0",
"@aws-sdk/client-eventbridge": "^3.226.0",
"@sentry/serverless": "^7.22.0",
"@ipld/car": "^5.0.1",
"cardex": "^1.0.0",
Expand Down
19 changes: 17 additions & 2 deletions stacks/ucan-invocation-stack.js
Original file line number Diff line number Diff line change
@@ -1,8 +1,10 @@
import {
Bucket,
Function,
KinesisStream,
use
} from '@serverless-stack/resources'
import { Duration } from 'aws-cdk-lib'

import { BusStack } from './bus-stack.js'
import { getConfig, setupSentry } from './config.js'
Expand Down Expand Up @@ -44,7 +46,20 @@ export function UcanInvocationStack({ stack, app }) {
}
})

// create a kinesis stream
const ucanStream = new KinesisStream(stack, 'ucan-stream', {
cdk: {
stream: {
retentionPeriod: Duration.days(365)
}
},
consumers: {
// consumer1: 'functions/consumer1.handler'
},
})

return {
ucanBucket
ucanBucket,
ucanStream
}
}
}
5 changes: 3 additions & 2 deletions stacks/upload-api-stack.js
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@ export function UploadApiStack({ stack, app }) {
// Get references to constructs created in other stacks
const { carparkBucket } = use(CarparkStack)
const { storeTable, uploadTable } = use(UploadDbStack)
const { ucanBucket } = use(UcanInvocationStack)
const { ucanBucket, ucanStream } = use(UcanInvocationStack)

// Setup API
const customDomain = getCustomDomain(stack.stage, process.env.HOSTED_ZONE)
Expand All @@ -35,12 +35,13 @@ export function UploadApiStack({ stack, app }) {
customDomain,
defaults: {
function: {
permissions: [storeTable, uploadTable, carparkBucket, ucanBucket],
permissions: [storeTable, uploadTable, carparkBucket, ucanBucket, ucanStream],
environment: {
STORE_TABLE_NAME: storeTable.tableName,
STORE_BUCKET_NAME: carparkBucket.bucketName,
UPLOAD_TABLE_NAME: uploadTable.tableName,
UCAN_BUCKET_NAME: ucanBucket.bucketName,
UCAN_LOG_STREAM_NAME: ucanStream.streamName,
NAME: pkg.name,
VERSION: pkg.version,
COMMIT: git.commmit,
Expand Down
3 changes: 2 additions & 1 deletion ucan-invocation/package.json
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,8 @@
},
"dependencies": {
"@aws-sdk/client-eventbridge": "^3.218.0",
"@sentry/serverless": "^7.22.0"
"@sentry/serverless": "^7.22.0",
"uint8arrays": "^4.0.2"
},
"devDependencies": {
"@serverless-stack/resources": "*",
Expand Down
24 changes: 22 additions & 2 deletions upload-api/functions/ucan-invocation-router.js
Original file line number Diff line number Diff line change
@@ -1,8 +1,10 @@
import { DID } from '@ucanto/core'
import { Kinesis } from '@aws-sdk/client-kinesis'
import * as Sentry from '@sentry/serverless'
import { fromString as uint8arrayFromString } from 'uint8arrays/from-string'

import { createAccessClient } from '../access.js'
import { persistUcanInvocation } from '../ucan-invocation.js'
import { parseUcanInvocationRequest, persistUcanInvocation } from '../ucan-invocation.js'
import { createCarStore } from '../buckets/car-store.js'
import { createDudewhereStore } from '../buckets/dudewhere-store.js'
import { createUcanStore } from '../buckets/ucan-store.js'
Expand All @@ -16,6 +18,7 @@ Sentry.AWSLambda.init({
tracesSampleRate: 1.0,
})

const kinesisClient = new Kinesis({})
const AWS_REGION = process.env.AWS_REGION || 'us-west-2'

// Specified in SST environment
Expand All @@ -40,6 +43,7 @@ async function ucanInvocationRouter (request) {
STORE_BUCKET_NAME: storeBucketName = '',
UPLOAD_TABLE_NAME: uploadTableName = '',
UCAN_BUCKET_NAME: ucanBucketName = '',
UCAN_LOG_STREAM_NAME: ucanLogStreamName = '',
// set for testing
DYNAMO_DB_ENDPOINT: dbEndpoint,
ACCESS_SERVICE_DID: accessServiceDID = '',
Expand Down Expand Up @@ -82,8 +86,24 @@ async function ucanInvocationRouter (request) {
body: Buffer.from(request.body, 'base64'),
})

const ucanInvocation = await parseUcanInvocationRequest(request)

// persist successful invocation handled
await persistUcanInvocation(request, ucanStoreBucket)
await persistUcanInvocation(ucanInvocation, ucanStoreBucket)

// Put invocation to UCAN stream
await kinesisClient.putRecord({
Data: uint8arrayFromString(JSON.stringify({
carCid: ucanInvocation.carCid,
value: ucanInvocation.value,
ts: Date.now()
})),
// https://docs.aws.amazon.com/streams/latest/dev/key-concepts.html
// A partition key is used to group data by shard within a stream.
// It is required, and now we are starting with one shard. We need to study best partition key
PartitionKey: 'key',
StreamName: ucanLogStreamName,
})

return toLambdaSuccessResponse(response)
}
Expand Down
13 changes: 8 additions & 5 deletions upload-api/package.json
Original file line number Diff line number Diff line change
Expand Up @@ -6,10 +6,12 @@
"test": "ava --node-arguments='--experimental-fetch' --verbose --timeout=60s **/*.test.js"
},
"dependencies": {
"@aws-sdk/client-dynamodb": "^3.211.0",
"@aws-sdk/client-s3": "^3.211.0",
"@aws-sdk/s3-request-presigner": "^3.224.0",
"@aws-sdk/util-dynamodb": "^3.211.0",
"@aws-sdk/client-dynamodb": "^3.226.0",
"@aws-sdk/client-s3": "^3.226.0",
"@aws-sdk/client-kinesis": "^3.226.0",
"@aws-sdk/s3-request-presigner": "^3.226.0",
"@aws-sdk/util-dynamodb": "^3.226.0",
"@ipld/dag-ucan": "3.0.1",
"@sentry/serverless": "^7.22.0",
"@serverless-stack/node": "^1.18.2",
"@ucanto/client": "^4.0.2",
Expand All @@ -21,7 +23,8 @@
"@web3-storage/access": "^9.0.0",
"@web3-storage/capabilities": "^2.0.0",
"multiformats": "^10.0.2",
"p-retry": "^5.1.2"
"p-retry": "^5.1.2",
"uint8arrays": "^4.0.2"
},
"devDependencies": {
"@ipld/car": "^5.0.1",
Expand Down
2 changes: 1 addition & 1 deletion upload-api/service/index.js
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@ export function createServiceRouter (context) {
* @param {import('@ucanto/interface').Signer} serviceSigner
* @param {import('../service/types').UcantoServerContext} context
*/
export async function createUcantoServer (serviceSigner, context) {
export async function createUcantoServer (serviceSigner, context) {
const server = Server.create({
id: serviceSigner,
encoder: CBOR,
Expand Down
6 changes: 6 additions & 0 deletions upload-api/service/types.ts
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,8 @@ import type {
} from '@ucanto/interface'
import type { API } from '@ucanto/server'

import { ToString, UnknownLink } from 'multiformats'

import {
StoreAdd,
StoreRemove,
Expand Down Expand Up @@ -130,6 +132,10 @@ export interface AccessClient {
verifyInvocation: (invocation: Invocation) => Promise<boolean>
}

export interface LinkJSON<T extends UnknownLink = UnknownLink> {
'/': ToString<T>
}

// would be generated by sst, but requires `sst build` to be run, which calls out to aws; not great for CI
declare module "@serverless-stack/node/config" {
export interface SecretResources {
Expand Down
47 changes: 45 additions & 2 deletions upload-api/test/service/ucan-invocation.test.js
Original file line number Diff line number Diff line change
Expand Up @@ -7,9 +7,10 @@ import * as UCAN from '@ipld/dag-ucan'

import { createSpace } from '../helpers/ucanto.js'
import { createS3, createBucket } from '../helpers/resources.js'
import { randomCAR } from '../helpers/random.js'

import { createUcanStore } from '../../buckets/ucan-store.js'
import { parseUcanInvocationRequest, persistUcanInvocation } from '../../ucan-invocation.js'
import { parseUcanInvocationRequest, persistUcanInvocation, replaceAllLinkValues } from '../../ucan-invocation.js'

test.before(async t => {
const { client: s3Client, clientOpts: s3ClientOpts } = await createS3({ port: 9000 })
Expand Down Expand Up @@ -93,7 +94,8 @@ test('persists ucan invocation CAR file', async t => {
])

// @ts-expect-error different type interface in AWS expected request
await persistUcanInvocation(request, ucanStore)
const ucanInvocationObject = await parseUcanInvocationRequest(request)
await persistUcanInvocation(ucanInvocationObject, ucanStore)

const requestCar = await CAR.codec.decode(request.body)
const requestCarRootCid = requestCar.roots[0].cid.toString()
Expand Down Expand Up @@ -126,6 +128,46 @@ test('persists ucan invocation CAR file', async t => {
})
})

test('replace all link values as object and array', async t => {
const car = await randomCAR(128)
const otherCar = await randomCAR(40)

// invoke a upload/add with proof
const root = car.roots[0]
const shards = [car.cid, otherCar.cid].sort()

const att = [
{
nb: {
link: root,
size: car.size
},
can: 'store/add',
with: 'did:key:z6MkfTDbhRZz26kcDNmmehPxeujSkbXe8jqv5fLpKvtc3Wcv',
},
{
nb: {
root,
shards: [...shards]
},
can: 'upload/add',
with: 'did:key:z6MkfTDbhRZz26kcDNmmehPxeujSkbXe8jqv5fLpKvtc3Wcv'
},
]

att.map(replaceAllLinkValues)

// Object with Link
// @ts-expect-error Property '/' does not exist on type 'Link<Partial<Model>
t.is(att[0].nb.link['/'], root.toString())
// @ts-expect-error Property '/' does not exist on type 'Link<Partial<Model>
t.is(att[1].nb.root['/'], root.toString())

// Array with Link
// @ts-expect-error Property '/' does not exist on type 'Link<Partial<Model>
t.deepEqual(att[1].nb.shards?.map(s => s['/']), shards.map(s => s.toString()))
})

/**
* @param {import("@aws-sdk/client-s3").S3Client} s3Client
*/
Expand All @@ -136,3 +178,4 @@ async function prepareResources (s3Client) {
bucketName
}
}

74 changes: 68 additions & 6 deletions upload-api/ucan-invocation.js
Original file line number Diff line number Diff line change
@@ -1,19 +1,32 @@
import * as CAR from '@ucanto/transport/car'
import * as UCAN from '@ipld/dag-ucan'
import * as Link from 'multiformats/link'

/**
* @typedef {object} UcanInvocation
* @property {UCAN.Capabilities} att
* @property {`did:${string}:${string}`} aud
* @property {`did:${string}:${string}`} iss
*
* @typedef {object} UcanInvocationWrapper
* @property {string} carCid
* @property {Uint8Array} bytes
* @property {UcanInvocation} value
*/

/**
* Persist successful UCAN invocations handled by the router.
*
* @param {import('aws-lambda').APIGatewayProxyEventV2} request
* @param {UcanInvocationWrapper} ucanInvocation
* @param {import('./service/types').UcanBucket} ucanStore
*/
export async function persistUcanInvocation (request, ucanStore) {
const { carCid, bytes } = await parseUcanInvocationRequest(request)

await ucanStore.put(carCid, bytes)
export async function persistUcanInvocation (ucanInvocation, ucanStore) {
await ucanStore.put(ucanInvocation.carCid, ucanInvocation.bytes)
}

/**
* @param {import('aws-lambda').APIGatewayProxyEventV2} request
* @returns {Promise<UcanInvocationWrapper>}
*/
export async function parseUcanInvocationRequest (request) {
if (!request.body) {
Expand All @@ -24,8 +37,57 @@ export async function parseUcanInvocationRequest (request) {
const car = await CAR.codec.decode(bytes)
const carCid = car.roots[0].cid.toString()

// @ts-expect-error 'ByteView<unknown>' is not assignable to parameter of type 'ByteView<UCAN<Capabilities>>'
const dagUcan = UCAN.decode(car.roots[0].bytes)

return {
bytes,
carCid
carCid,
value: {
// Workaround for:
// https://github.com/web3-storage/ucanto/issues/171
// https://github.com/multiformats/js-multiformats/issues/228
// @ts-ignore missing types
att: dagUcan.att.map(replaceAllLinkValues),
Comment on lines +47 to +51
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Based on proposal there.

aud: dagUcan.aud.did(),
iss: dagUcan.iss.did(),
prf: replaceAllLinkValues(dagUcan.prf)
}
}
}

/**
* @param {any} value
*/
export const replaceAllLinkValues = (value) => {
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Looked into npm modules available, but could not get something useful for it. If there are suggestions of modules you know that could help please let me know.

This small function goes through the Object through all its depth and replaces all Link instances by a JSON based on proposal storacha/ucanto#171 + multiformats/js-multiformats#228

Once this lands in multiformats, we will be able to drop this adapter function.

// Array with Links?
if (Array.isArray(value)) {
for (let i = 0; i < value.length; i++) {
if (Link.isLink(value[i])) {
value[i] = toJSON(value[i])
} else {
replaceAllLinkValues(value[i])
}
}
}
// Object with Links?
else if (typeof value === 'object') {
for (const key of Object.keys(value)) {
if (Link.isLink(value[key])) {
value[key] = toJSON(value[key])
}
replaceAllLinkValues(value[key])
}
}

return value
}

/**
* @template {import('multiformats').UnknownLink} Link
* @param {Link} link
*/
export const toJSON = link =>
/** @type {import('./service/types').LinkJSON<Link>} */ ({
'/': link.toString(),
})