Skip to content

Commit

Permalink
fix: use transport sniffing on incoming requests (#196)
Browse files Browse the repository at this point in the history
  • Loading branch information
Gozala authored May 8, 2023
1 parent 6470937 commit 6a19d2b
Showing 1 changed file with 40 additions and 7 deletions.
47 changes: 40 additions & 7 deletions upload-api/functions/ucan-invocation-router.js
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@ import { createUploadTable } from '../tables/upload.js'
import { getServiceSigner } from '../config.js'
import { createUcantoServer } from '../service.js'
import { Config } from '@serverless-stack/node/config/index.js'
import { CAR } from '@ucanto/transport'
import { CAR, Legacy, Codec } from '@ucanto/transport'

Sentry.AWSLambda.init({
environment: process.env.SST_STAGE,
Expand All @@ -41,6 +41,28 @@ const R2_REGION = process.env.R2_REGION || 'auto'
const R2_DUDEWHERE_BUCKET_NAME = process.env.R2_DUDEWHERE_BUCKET_NAME || ''
const R2_ENDPOINT = process.env.R2_ENDPOINT || ``

/**
* We define a ucanto codec that will switch encoder / decoder based on the
* `content-type` and `accept` headers of the request.
*/
const codec = Codec.inbound({
decoders: {
// If the `content-type` is set to `application/vnd.ipld.car` use CAR codec.
[CAR.contentType]: CAR.request,
// If the `content-type` is set to `application/car` use legacy CAR codec
// which unlike current CAR codec used CAR roots to signal invocations.
[Legacy.contentType]: Legacy.request,
},
encoders: {
// Legacy clients did not set `accept` header so catch them using `*/*`
// and encode responses using legacy (CBOR) encoder.
'*/*;q=0.1': Legacy.response,
// Modern clients set `accept` header to `application/vnd.ipld.car` and
// we encode responses to them in CAR encoding.
[CAR.contentType]: CAR.response,
},
})

/**
* AWS HTTP Gateway handler for POST / with ucan invocation router.
*
Expand Down Expand Up @@ -82,6 +104,7 @@ export async function ucanInvocationRouter(request) {
const workflowBucket = createWorkflowStore(AWS_REGION, workflowBucketName)

const server = await createUcantoServer(serviceSigner, {
codec,
storeTable: createStoreTable(AWS_REGION, storeTableName, {
endpoint: dbEndpoint,
}),
Expand Down Expand Up @@ -111,24 +134,34 @@ export async function ucanInvocationRouter(request) {
kinesisClient,
}

const payload = {
headers: /** @type {Record<string, string>} */ (request.headers),
body: Buffer.from(request.body, 'base64'),
}

const result = server.codec.accept(payload)
// TODO: better error handling
if (result.error) {
throw result.error
}

const connection = result.ok

// Process workflow
// We block until we can log the UCAN invocation if this fails we return a 500
// to the client. That is because in the future we expect that invocations will
// be written to a queue first and then processed asynchronously, so if we
// fail to queue the invocation we should not handle it.
const incoming = await processAgentMessageArchive(
{
headers: /** @type {Record<string, string>} */ (request.headers),
body: Buffer.from(request.body, 'base64'),
},
CAR.request.encode(await connection.decoder.decode(payload)),
processingCtx
)

// Execute invocations
const outgoing = await Server.execute(incoming, server)
const response = CAR.response.encode(outgoing)

await processAgentMessageArchive(response, processingCtx)
await processAgentMessageArchive(CAR.response.encode(outgoing), processingCtx)
const response = await connection.encoder.encode(outgoing)

return toLambdaSuccessResponse(response)
}
Expand Down

0 comments on commit 6a19d2b

Please sign in to comment.