Skip to content

Commit

Permalink
feat: add replicator bucket event
Browse files Browse the repository at this point in the history
  • Loading branch information
vasco-santos committed May 16, 2023
1 parent 6a19d2b commit 16ee9d9
Show file tree
Hide file tree
Showing 7 changed files with 96 additions and 3 deletions.
2 changes: 2 additions & 0 deletions .env.tpl
Original file line number Diff line number Diff line change
Expand Up @@ -19,10 +19,12 @@ UPLOAD_API_DID = ''
ACCESS_SERVICE_URL = ''
R2_ACCESS_KEY_ID = ''
R2_CARPARK_BUCKET_NAME = ''
R2_CARPARK_BUCKET_PUBLIC_URL = ''
R2_DUDEWHERE_BUCKET_NAME = ''
R2_ENDPOINT = ''
R2_REGION = ''
R2_SATNAV_BUCKET_NAME = ''
R2_SATNAV_BUCKET_PUBLIC_URL = ''
R2_SECRET_ACCESS_KEY = ''
R2_UCAN_BUCKET_NAME = ''
SATNAV_BUCKET_NAME = ''
8 changes: 8 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -128,10 +128,18 @@ Endpoint for S3 like cloud object storage to replicate content into.

Bucket name to replicate written CAR files.

#### `R2_CARPARK_BUCKET_PUBLIC_URL`

Bucket public URL where replicated CAR files are written into.

#### `R2_SATNAV_BUCKET_NAME`

Bucket name to replicate written .idx files.

#### `R2_SATNAV_BUCKET_PUBLIC_URL`

Bucket public URL where replicated .idx files are written into.

#### `R2_DUDEWHERE_BUCKET_NAME`

Bucket name to replicate root CID to car CIDs mapping.
Expand Down
24 changes: 24 additions & 0 deletions replicator/event-bus/index.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,24 @@
export const REPLICATOR_EVENT_BRIDGE_SOURCE_EVENT = 'w3infra-replicator'

/**
* @typedef {object} EventBridgeDetail
* @property {string} key
* @property {string} url
* @property {string} destinationName
*/

/**
* @param {EventBridgeDetail} detail
* @param {import('@aws-sdk/client-eventbridge').EventBridge} eventBridge
* @param {string} eventBusName
*/
export async function notifyBus(detail, eventBridge, eventBusName) {
await eventBridge.putEvents({
Entries: [{
EventBusName: eventBusName,
Source: REPLICATOR_EVENT_BRIDGE_SOURCE_EVENT,
DetailType: 'file_replicated',
Detail: JSON.stringify(detail)
}]
})
}
22 changes: 19 additions & 3 deletions replicator/functions/replicator.js
Original file line number Diff line number Diff line change
@@ -1,8 +1,10 @@
import { EventBridge } from '@aws-sdk/client-eventbridge'
import { S3Client } from '@aws-sdk/client-s3'
import * as Sentry from '@sentry/serverless'

import { replicate } from '../index.js'
import parseSqsEvent from '../utils/parse-sqs-event.js'
import { notifyBus } from '../event-bus/index.js'

Sentry.AWSLambda.init({
environment: process.env.SST_STAGE,
Expand All @@ -15,12 +17,14 @@ Sentry.AWSLambda.init({
*
* @param {import('aws-lambda').SQSEvent} event
*/
function replicatorHandler (event) {
async function replicatorHandler (event) {
const {
REPLICATOR_ENDPOINT,
REPLICATOR_ACCESS_KEY_ID,
REPLICATOR_SECRET_ACCESS_KEY,
REPLICATOR_BUCKET_NAME,
REPLICATOR_BUCKET_PUBLIC_URL,
EVENT_BUS_ARN,
} = getEnv()

const record = parseSqsEvent(event)
Expand All @@ -38,12 +42,22 @@ function replicatorHandler (event) {
})

const originBucket = new S3Client({ region: record.bucketRegion })
return replicate({
await replicate({
record,
destinationBucket,
originBucket,
destinationBucketName: REPLICATOR_BUCKET_NAME,
})

if (EVENT_BUS_ARN) {
const bus = new EventBridge({})
const detail = {
key: record.key,
url: (new URL(record.key, REPLICATOR_BUCKET_PUBLIC_URL)).toString(),
destinationName: REPLICATOR_BUCKET_NAME,
}
await notifyBus(detail, bus, EVENT_BUS_ARN)
}
}

export const handler = Sentry.AWSLambda.wrapHandler(replicatorHandler)
Expand All @@ -56,7 +70,9 @@ function getEnv() {
REPLICATOR_ENDPOINT: mustGetEnv('REPLICATOR_ENDPOINT'),
REPLICATOR_ACCESS_KEY_ID: mustGetEnv('REPLICATOR_ACCESS_KEY_ID'),
REPLICATOR_SECRET_ACCESS_KEY: mustGetEnv('REPLICATOR_SECRET_ACCESS_KEY'),
REPLICATOR_BUCKET_NAME: mustGetEnv('REPLICATOR_BUCKET_NAME')
REPLICATOR_BUCKET_NAME: mustGetEnv('REPLICATOR_BUCKET_NAME'),
REPLICATOR_BUCKET_PUBLIC_URL: mustGetEnv('REPLICATOR_BUCKET_PUBLIC_URL'),
EVENT_BUS_ARN: process.env.EVENT_BUS_ARN,
}
}

Expand Down
1 change: 1 addition & 0 deletions replicator/package.json
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@
"test": "ava --verbose --timeout=60s **/*.test.js"
},
"dependencies": {
"@aws-sdk/client-eventbridge": "^3.226.0",
"@aws-sdk/client-s3": "^3.226.0",
"@sentry/serverless": "^7.22.0"
},
Expand Down
38 changes: 38 additions & 0 deletions replicator/test/event-bus.test.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,38 @@
import { test } from './helpers/context.js'

import { notifyBus, REPLICATOR_EVENT_BRIDGE_SOURCE_EVENT } from '../event-bus/index.js'

const eventBusName = 'event-bus-arn'

test('notifies event bus when new file is replicated', async t => {
const detail = {
key: 'bafyfoo/bafyfoo.car',
url: 'https://endpoint.io/bafyfoo/bafyfoo.car',
destinationName: 'carpark-prod-1'
}
const bus = {
putEvents: (/** @type {any} */ data) => {
t.is(data.Entries.length, 1)
for (let i = 0; i < data.Entries.length; i++) {
const entry = data.Entries[i]

t.is(entry.EventBusName, eventBusName)
t.is(entry.Source, REPLICATOR_EVENT_BRIDGE_SOURCE_EVENT)
t.is(entry.DetailType, 'file_replicated')

const entryDetail = JSON.parse(entry.Detail)
t.deepEqual(entryDetail, detail)
}
return {
promise: () => Promise.resolve(data)
}
}
}

await notifyBus(
detail,
// @ts-expect-error non complete event bus implementation
bus,
eventBusName
)
})
4 changes: 4 additions & 0 deletions stacks/replicator-stack.js
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,8 @@ export function ReplicatorStack({ stack, app }) {
REPLICATOR_SECRET_ACCESS_KEY:
process.env.R2_SECRET_ACCESS_KEY || '',
REPLICATOR_BUCKET_NAME: process.env.R2_CARPARK_BUCKET_NAME || '',
REPLICATOR_BUCKET_PUBLIC_URL: process.env.R2_CARPARK_BUCKET_PUBLIC_URL || '',
EVENT_BUS_ARN: eventBus.eventBusArn
},
permissions: ['s3:*'],
handler: 'functions/replicator.handler',
Expand All @@ -53,6 +55,8 @@ export function ReplicatorStack({ stack, app }) {
REPLICATOR_SECRET_ACCESS_KEY:
process.env.R2_SECRET_ACCESS_KEY || '',
REPLICATOR_BUCKET_NAME: process.env.R2_SATNAV_BUCKET_NAME || '',
REPLICATOR_BUCKET_PUBLIC_URL: process.env.R2_SATNAV_BUCKET_PUBLIC_URL || '',
EVENT_BUS_ARN: eventBus.eventBusArn
},
permissions: ['s3:*'],
handler: 'functions/replicator.handler',
Expand Down

0 comments on commit 16ee9d9

Please sign in to comment.