Skip to content

Commit

Permalink
feat: add kinesis log stream
Browse files Browse the repository at this point in the history
  • Loading branch information
vasco-santos committed Dec 8, 2022
1 parent ae5f7a9 commit 6cd83e2
Show file tree
Hide file tree
Showing 13 changed files with 14,930 additions and 13,196 deletions.
15 changes: 14 additions & 1 deletion api/functions/ucan-invocation-router.js
Original file line number Diff line number Diff line change
@@ -1,5 +1,7 @@
import { DID } from '@ucanto/core'
import { Kinesis } from '@aws-sdk/client-kinesis'
import * as Sentry from '@sentry/serverless'
import { fromString as uint8arrayFromString } from 'uint8arrays/from-string'

import { createAccessClient } from '../access.js'
import { createSigner } from '../signer.js'
Expand All @@ -8,13 +10,15 @@ import { createDudewhereStore } from '../buckets/dudewhere-store.js'
import { createStoreTable } from '../tables/store.js'
import { createUploadTable } from '../tables/upload.js'
import { getServiceSigner } from '../config.js'
import { createUcantoServer } from '../service/index.js'
import { createUcantoServer, createUcanLogObject } from '../service/index.js'

Sentry.AWSLambda.init({
dsn: process.env.SENTRY_DSN,
tracesSampleRate: 1.0,
})

const kinesisClient = new Kinesis({})

const AWS_ACCESS_KEY_ID = process.env.AWS_ACCESS_KEY_ID || ''
const AWS_SECRET_ACCESS_KEY = process.env.AWS_SECRET_ACCESS_KEY || ''
const AWS_SESSION_TOKEN = process.env.AWS_SESSION_TOKEN || ''
Expand All @@ -41,6 +45,7 @@ async function ucanInvocationRouter (request) {
STORE_TABLE_NAME: storeTableName = '',
STORE_BUCKET_NAME: storeBucketName = '',
UPLOAD_TABLE_NAME: uploadTableName = '',
UCAN_LOG_STREAM_NAME: ucanLogStreamName = '',
// set for testing
DYNAMO_DB_ENDPOINT: dbEndpoint,
ACCESS_SERVICE_DID: accessServiceDID = '',
Expand Down Expand Up @@ -88,6 +93,14 @@ async function ucanInvocationRouter (request) {
body: Buffer.from(request.body, 'base64'),
})

// Put invocation
const ucanLogObject = await createUcanLogObject(request)
await kinesisClient.putRecord({
Data: uint8arrayFromString(JSON.stringify(ucanLogObject)),
PartitionKey: 'key',
StreamName: ucanLogStreamName,
})

return toLambdaSuccessResponse(response)
}

Expand Down
12 changes: 8 additions & 4 deletions api/package.json
Original file line number Diff line number Diff line change
Expand Up @@ -6,9 +6,12 @@
"test": "ava --verbose --timeout=60s **/*.test.js"
},
"dependencies": {
"@aws-sdk/client-dynamodb": "^3.211.0",
"@aws-sdk/client-s3": "^3.211.0",
"@aws-sdk/util-dynamodb": "^3.211.0",
"@aws-sdk/client-dynamodb": "^3.226.0",
"@aws-sdk/client-s3": "^3.226.0",
"@aws-sdk/client-kinesis": "^3.226.0",
"@aws-sdk/util-dynamodb": "^3.226.0",
"@ipld/dag-json": "9.0.1",
"@ipld/dag-ucan": "3.0.1",
"@sentry/serverless": "^7.22.0",
"@serverless-stack/node": "^1.18.2",
"@ucanto/interface": "^3.0.1",
Expand All @@ -20,7 +23,8 @@
"@web3-storage/capabilities": "^1.0.0",
"@web3-storage/sigv4": "^1.0.2",
"multiformats": "^10.0.2",
"p-retry": "^5.1.2"
"p-retry": "^5.1.2",
"uint8arrays": "^4.0.2"
},
"devDependencies": {
"@ipld/car": "^5.0.1",
Expand Down
30 changes: 28 additions & 2 deletions api/service/index.js
Original file line number Diff line number Diff line change
@@ -1,6 +1,8 @@
import * as Server from '@ucanto/server'
import * as CAR from '@ucanto/transport/car'
import * as CBOR from '@ucanto/transport/cbor'
import * as DAGJSON from '@ipld/dag-json'
import { toString } from 'uint8arrays/to-string'

import { createStoreService } from './store/index.js'
import { createUploadService } from './upload/index.js'
Expand All @@ -20,7 +22,7 @@ export function createServiceRouter (context) {
* @param {import('@ucanto/interface').Signer} serviceSigner
* @param {import('../service/types').UcantoServerContext} context
*/
export async function createUcantoServer (serviceSigner, context) {
export async function createUcantoServer (serviceSigner, context) {
const server = Server.create({
id: serviceSigner,
encoder: CBOR,
Expand All @@ -33,4 +35,28 @@ export function createServiceRouter (context) {
})

return server
}
}

/**
* @param {import('aws-lambda').APIGatewayProxyEventV2} request
*/
export async function createUcanLogObject (request) {
if (!request.body) {
throw new Error('service requests are required to have body')
}

// TODO: we should be able to get this from a ucantoServer callback
// we only need to do a DAGJSON.encode(cbor)
const bytes = Buffer.from(request.body, 'base64')
const car = await CAR.codec.decode(bytes)
const root = car.roots[0].cid.toString()

const cbor = CBOR.codec.decode(car.roots[0].bytes)
const jsonEncoded = DAGJSON.encode(cbor)

return {
root,
bytes: request.body,
value: toString(jsonEncoded)
}
}
75 changes: 75 additions & 0 deletions api/test/service/ucan-log-object.test.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,75 @@
import { test } from '../helpers/context.js'
import { createUcanLogObject } from '../../service/index.js'

import * as Signer from '@ucanto/principal/ed25519'
import { CAR, CBOR } from '@ucanto/transport'
import * as DAGJSON from '@ipld/dag-json'
import * as UCAN from '@ipld/dag-ucan'
import { fromString as uint8arrayFromString } from 'uint8arrays/from-string'

import { createSpace } from '../helpers/ucanto.js'

test('writes invocation to ucan log table', async t => {
const uploadService = await Signer.generate()
const alice = await Signer.generate()
const { proof, spaceDid } = await createSpace(alice)

const data = new Uint8Array([11, 22, 34, 44, 55])
const link = await CAR.codec.link(data)
const nb = { link, size: data.byteLength }
const can = 'store/add'

const request = await CAR.encode([
{
issuer: alice,
audience: uploadService,
capabilities: [{
can,
with: spaceDid,
nb
}],
proofs: [proof],
}
])


// @ts-expect-error different type interface in AWS expected request
const ucanLogObj = await createUcanLogObject(request)

const requestCar = await CAR.codec.decode(request.body)
const requestCarRootCid = requestCar.roots[0].cid

t.is(ucanLogObj.root, requestCarRootCid.toString())
t.truthy(ucanLogObj.bytes)
t.truthy(ucanLogObj.value)

// Decode and validate bytes
const streamCar = await CAR.codec.decode(uint8arrayFromString(ucanLogObj.bytes))
// @ts-expect-error UCAN.View<UCAN.Capabilities> inferred as UCAN.View<unknown>
const ucan = UCAN.decode(streamCar.roots[0].bytes)

t.is(ucan.iss.did(), alice.did())
t.is(ucan.aud.did(), uploadService.did())
t.deepEqual(ucan.prf, [proof.root.cid])
t.is(ucan.att.length, 1)
t.like(ucan.att[0], {
nb,
can,
with: spaceDid,
})

// Decode and validate value
const jsonDecodedUcan = DAGJSON.decode(uint8arrayFromString(ucanLogObj.value))
const cborEncoded = CBOR.codec.encode(jsonDecodedUcan)
const ucanFromJson = UCAN.decode(cborEncoded)

t.is(ucanFromJson.iss.did(), alice.did())
t.is(ucanFromJson.aud.did(), uploadService.did())
t.deepEqual(ucanFromJson.prf, [proof.root.cid])
t.is(ucanFromJson.att.length, 1)
t.like(ucanFromJson.att[0], {
nb,
can,
with: spaceDid,
})
})
4 changes: 2 additions & 2 deletions carpark/package.json
Original file line number Diff line number Diff line change
Expand Up @@ -6,8 +6,8 @@
"test": "ava --verbose --timeout=60s **/*.test.js"
},
"dependencies": {
"@aws-sdk/client-eventbridge": "^3.216.0",
"@aws-sdk/client-sqs": "^3.213.0",
"@aws-sdk/client-eventbridge": "^3.226.0",
"@aws-sdk/client-sqs": "^3.226.0",
"@sentry/serverless": "^7.22.0"
},
"devDependencies": {
Expand Down
Loading

0 comments on commit 6cd83e2

Please sign in to comment.