Skip to content

Commit

Permalink
feat: add kinesis log stream
Browse files Browse the repository at this point in the history
  • Loading branch information
vasco-santos committed Dec 9, 2022
1 parent f4a9b03 commit 3e2f4b2
Show file tree
Hide file tree
Showing 12 changed files with 10,986 additions and 9,588 deletions.
25 changes: 23 additions & 2 deletions api/functions/ucan-invocation-router.js
Original file line number Diff line number Diff line change
@@ -1,8 +1,10 @@
import { DID } from '@ucanto/core'
import { Kinesis } from '@aws-sdk/client-kinesis'
import * as Sentry from '@sentry/serverless'
import { fromString as uint8arrayFromString } from 'uint8arrays/from-string'

import { createAccessClient } from '../access.js'
import { persistUcanInvocation } from '../ucan-invocation.js'
import { parseUcanInvocationRequest, persistUcanInvocation } from '../ucan-invocation.js'
import { createCarStore } from '../buckets/car-store.js'
import { createDudewhereStore } from '../buckets/dudewhere-store.js'
import { createUcanStore } from '../buckets/ucan-store.js'
Expand All @@ -16,6 +18,7 @@ Sentry.AWSLambda.init({
tracesSampleRate: 1.0,
})

const kinesisClient = new Kinesis({})
const AWS_REGION = process.env.AWS_REGION || 'us-west-2'

// Specified in SST environment
Expand All @@ -40,6 +43,7 @@ async function ucanInvocationRouter (request) {
STORE_BUCKET_NAME: storeBucketName = '',
UPLOAD_TABLE_NAME: uploadTableName = '',
UCAN_BUCKET_NAME: ucanBucketName = '',
UCAN_LOG_STREAM_NAME: ucanLogStreamName = '',
// set for testing
DYNAMO_DB_ENDPOINT: dbEndpoint,
ACCESS_SERVICE_DID: accessServiceDID = '',
Expand Down Expand Up @@ -82,8 +86,25 @@ async function ucanInvocationRouter (request) {
body: Buffer.from(request.body, 'base64'),
})

const ucanInvocation = await parseUcanInvocationRequest(request)

// persist successful invocation handled
await persistUcanInvocation(request, ucanStoreBucket)
await persistUcanInvocation(ucanInvocation, ucanStoreBucket)

console.log('send to kinesis', ucanInvocation)

// Put invocation to UCAN stream
await kinesisClient.putRecord({
Data: uint8arrayFromString(JSON.stringify({
carCid: ucanInvocation.carCid,
...ucanInvocation.value
})),
// https://docs.aws.amazon.com/streams/latest/dev/key-concepts.html
// A partition key is used to group data by shard within a stream.
// It is required, and now we are starting with one shard. We need to study best partition key
PartitionKey: 'key',
StreamName: ucanLogStreamName,
})

return toLambdaSuccessResponse(response)
}
Expand Down
13 changes: 8 additions & 5 deletions api/package.json
Original file line number Diff line number Diff line change
Expand Up @@ -6,10 +6,12 @@
"test": "ava --node-arguments='--experimental-fetch' --verbose --timeout=60s **/*.test.js"
},
"dependencies": {
"@aws-sdk/client-dynamodb": "^3.211.0",
"@aws-sdk/client-s3": "^3.211.0",
"@aws-sdk/s3-request-presigner": "^3.224.0",
"@aws-sdk/util-dynamodb": "^3.211.0",
"@aws-sdk/client-dynamodb": "^3.226.0",
"@aws-sdk/client-s3": "^3.226.0",
"@aws-sdk/client-kinesis": "^3.226.0",
"@aws-sdk/s3-request-presigner": "^3.226.0",
"@aws-sdk/util-dynamodb": "^3.226.0",
"@ipld/dag-ucan": "3.0.1",
"@sentry/serverless": "^7.22.0",
"@serverless-stack/node": "^1.18.2",
"@ucanto/client": "^4.0.2",
Expand All @@ -21,7 +23,8 @@
"@web3-storage/access": "^9.0.0",
"@web3-storage/capabilities": "^2.0.0",
"multiformats": "^10.0.2",
"p-retry": "^5.1.2"
"p-retry": "^5.1.2",
"uint8arrays": "^4.0.2"
},
"devDependencies": {
"@ipld/car": "^5.0.1",
Expand Down
2 changes: 1 addition & 1 deletion api/service/index.js
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@ export function createServiceRouter (context) {
* @param {import('@ucanto/interface').Signer} serviceSigner
* @param {import('../service/types').UcantoServerContext} context
*/
export async function createUcantoServer (serviceSigner, context) {
export async function createUcantoServer (serviceSigner, context) {
const server = Server.create({
id: serviceSigner,
encoder: CBOR,
Expand Down
3 changes: 2 additions & 1 deletion api/test/service/ucan-invocation.test.js
Original file line number Diff line number Diff line change
Expand Up @@ -93,7 +93,8 @@ test('persists ucan invocation CAR file', async t => {
])

// @ts-expect-error different type interface in AWS expected request
await persistUcanInvocation(request, ucanStore)
const ucanInvocationObject = await parseUcanInvocationRequest(request)
await persistUcanInvocation(ucanInvocationObject, ucanStore)

const requestCar = await CAR.codec.decode(request.body)
const requestCarRootCid = requestCar.roots[0].cid
Expand Down
33 changes: 27 additions & 6 deletions api/ucan-invocation.js
Original file line number Diff line number Diff line change
@@ -1,19 +1,31 @@
import * as CAR from '@ucanto/transport/car'
import * as UCAN from '@ipld/dag-ucan'

/**
* @typedef {object} UcanInvocation
* @property {UCAN.Capabilities} att
* @property {`did:${string}:${string}`} aud
* @property {`did:${string}:${string}`} iss
*
* @typedef {object} UcanInvocationWrapper
* @property {string} carCid
* @property {Uint8Array} bytes
* @property {UcanInvocation} value
*/

/**
* Persist successful UCAN invocations handled by the router.
*
* @param {import('aws-lambda').APIGatewayProxyEventV2} request
* @param {UcanInvocationWrapper} ucanInvocation
* @param {import('./service/types').UcanBucket} ucanStore
*/
export async function persistUcanInvocation (request, ucanStore) {
const { carCid, bytes } = await parseUcanInvocationRequest(request)

await ucanStore.put(carCid, bytes)
export async function persistUcanInvocation (ucanInvocation, ucanStore) {
await ucanStore.put(ucanInvocation.carCid, ucanInvocation.bytes)
}

/**
* @param {import('aws-lambda').APIGatewayProxyEventV2} request
* @returns {Promise<UcanInvocationWrapper>}
*/
export async function parseUcanInvocationRequest (request) {
if (!request.body) {
Expand All @@ -24,8 +36,17 @@ export async function parseUcanInvocationRequest (request) {
const car = await CAR.codec.decode(bytes)
const carCid = car.roots[0].cid.toString()

// @ts-ignore
const dagUcan = UCAN.decode(car.roots[0].bytes)
// console.log('dagUcan', dagUcan)

return {
bytes,
carCid
carCid,
value: {
att: dagUcan.att,
aud: dagUcan.aud.did(),
iss: dagUcan.iss.did()
}
}
}
4 changes: 2 additions & 2 deletions carpark/package.json
Original file line number Diff line number Diff line change
Expand Up @@ -6,8 +6,8 @@
"test": "ava --verbose --timeout=60s **/*.test.js"
},
"dependencies": {
"@aws-sdk/client-eventbridge": "^3.216.0",
"@aws-sdk/client-sqs": "^3.213.0",
"@aws-sdk/client-eventbridge": "^3.226.0",
"@aws-sdk/client-sqs": "^3.226.0",
"@sentry/serverless": "^7.22.0"
},
"devDependencies": {
Expand Down
Loading

0 comments on commit 3e2f4b2

Please sign in to comment.