Skip to content

Commit

Permalink
Webhooks (#1)
Browse files Browse the repository at this point in the history
* Create dev environment

* Add webhook support

* Add test webhook

* Deploy webhook handler

* Add runPriority back

* Add trailId to trail status response

* Fix

* Save webhook run state

* Create a cache for each request rather than lambda instance
  • Loading branch information
jschr authored Oct 25, 2020
1 parent 21e7fb1 commit 1526e5b
Show file tree
Hide file tree
Showing 22 changed files with 611 additions and 21 deletions.
14 changes: 6 additions & 8 deletions .env.dev.sample
Original file line number Diff line number Diff line change
@@ -1,12 +1,10 @@
PROJECT=TrailStatusAppDev

TLD=localhost
TLD=
API_SUBDOMAIN=
API_ENDPOINT=https://${API_SUBDOMAIN}.${TLD}
SSL_ARN=

# Removes dynamodb tables when tearing down
USER_RESOURCE_REMOVAL_POLICY=destroy
USER_RESOURCE_REMOVAL_POLICY=retain

API_PORT=4000
API_ENDPOINT=https://${TLD}:${API_PORT}

FRONTEND_PORT=3000
FRONTEND_ENDPOINT=https://${TLD}:${FRONTEND_PORT}
FRONTEND_ENDPOINT=https://${TLD}
12 changes: 12 additions & 0 deletions .env.local.sample
Original file line number Diff line number Diff line change
@@ -0,0 +1,12 @@
PROJECT=TrailStatusAppDev

TLD=localhost

# Removes dynamodb tables when tearing down
USER_RESOURCE_REMOVAL_POLICY=destroy

API_PORT=4000
API_ENDPOINT=https://${TLD}:${API_PORT}

FRONTEND_PORT=3000
FRONTEND_ENDPOINT=https://${TLD}:${FRONTEND_PORT}
1 change: 1 addition & 0 deletions .env.production.sample
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ PROJECT=TrailStatusApp
TLD=
API_SUBDOMAIN=
API_ENDPOINT=https://${API_SUBDOMAIN}.${TLD}
SSL_ARN=

USER_RESOURCE_REMOVAL_POLICY=retain

Expand Down
1 change: 1 addition & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ data
.cache
out
*.env
*.env.local
*.env.dev
*.env.production
*.log
Expand Down
82 changes: 82 additions & 0 deletions api/src/handlers/runWebhooks.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,82 @@
import fetch from 'node-fetch';
import withSQSHandler from '../withSQSHandler';
import WebhookModel from '../models/WebhookModel';
import TrailStatusModel from '../models/TrailStatusModel';

export default withSQSHandler(async event => {
if (!Array.isArray(event?.Records) || !event.Records.length) {
console.info(`Received empty messages, do nothing.`);
return;
}

// Messages should be grouped by trail id so cache the trail status to
// avoid unecessary db lookups while processing the webhooks for a trail.
const getTrailStatus = createTrailStatusCache();

for (const message of event.Records) {
let webhookId: string | null = null;
try {
({ webhookId } = JSON.parse(message.body));
} catch (err) {
console.error(`Failed to parse message body '${message.body}'`);
}

if (!webhookId) {
console.error(`Missing webhookId in message body '${message.body}'`);
continue;
}

const webhook = await WebhookModel.get(webhookId);
if (!webhook) {
console.error(`Failed to find webhook for '${webhookId}'`);
continue;
}

const trailStatus = await getTrailStatus(webhook.trailId);
if (!trailStatus) {
console.error(`Failed to find trail for '${webhook.trailId}'`);
continue;
}

const res = await fetch(`${webhook.url}`, {
method: 'POST',
headers: { 'Content-Type': 'application/json' },
body: JSON.stringify(trailStatus),
});

// If the webhook fails throw an error. This triggers the message batch to be retried.
// There should only be one message per batch but this handles multiple. If multiple
// messages are sent in the batch and one fails, the entire batch is re-tried. Webhooks
// should not expect to messages to be delivered only once.
if (!res.ok) {
const errorMessage = `Failed to process webhook '${webhookId}', invalid response '${res.status}' from '${webhook.url}'`;
await webhook.save({
lastRanAt: new Date().toISOString(),
error: errorMessage,
});

throw new Error(errorMessage);
}

await webhook.save({
lastRanAt: new Date().toISOString(),
error: '',
});

console.info(
`Successfully ran webhook '${webhookId}', received status '${res.status}' from '${webhook.url}'`,
);
}
});

const createTrailStatusCache = () => {
const cache: Record<string, TrailStatusModel | null> = {};
return async (trailId: string) => {
if (trailId in cache) {
return cache[trailId];
}
const result = await TrailStatusModel.get(trailId);
cache[trailId] = result;
return result;
};
};
38 changes: 38 additions & 0 deletions api/src/handlers/syncTrailStatus.ts
Original file line number Diff line number Diff line change
@@ -1,6 +1,8 @@
import * as AWS from 'aws-sdk';
import TrailSettingsModel from '../models/TrailSettingsModel';
import TrailStatusModel from '../models/TrailStatusModel';
import UserModel from '../models/UserModel';
import WebhookModel from '../models/WebhookModel';
import * as instagram from '../clients/instagram';
import withScheduledHandler from '../withScheduledHandler';

Expand All @@ -13,6 +15,14 @@ interface TrailUpdateResult {
failed?: boolean;
reason?: string;
skipped?: boolean;
webhookJobsCreated?: number;
}

const sqs = new AWS.SQS();
const webhookQueueUrl = process.env.WEBHOOK_QUEUE_URL;
if (!webhookQueueUrl) {
// Log error but don't throw.
console.error(`Missing environment variable 'WEBHOOK_QUEUE_URL'`);
}

export default withScheduledHandler(async () => {
Expand Down Expand Up @@ -63,6 +73,7 @@ const updateTrailStatus = async (
let message: string | undefined;
let imageUrl: string | undefined;
let instagramPostId: string | undefined;
let webhookJobsCreated: number | undefined;
let skipped = true;

for (const { id, caption, mediaUrl } of userMedia) {
Expand Down Expand Up @@ -120,12 +131,20 @@ const updateTrailStatus = async (
lastSyncdAt: new Date().toISOString(),
});

const webhooks = await WebhookModel.getTrailWebhooks(
trailSettings.trailId,
);

await Promise.all(webhooks.map(createWebhookJob));
webhookJobsCreated = webhooks.length;

skipped = false;
}

return {
trailId: trailSettings.trailId,
success: true,
webhookJobsCreated,
status,
message,
imageUrl,
Expand All @@ -140,5 +159,24 @@ const updateTrailStatus = async (
}
};

const createWebhookJob = async (webhook: WebhookModel) => {
if (!webhookQueueUrl) return;

const params: AWS.SQS.SendMessageRequest = {
MessageGroupId: webhook.trailId,
MessageDeduplicationId: webhook.webhookId,
MessageBody: JSON.stringify({ webhookId: webhook.webhookId }),
QueueUrl: webhookQueueUrl,
};

try {
await sqs.sendMessage(params).promise();
} catch (err) {
throw new Error(
`Failed to create webhook job for '${webhook.webhookId}' with '${err.message}'`,
);
}
};

const stripHashtags = (value: string) =>
value.replace(/\#[a-zA-Z0-9-]+(\s|\.|$)/g, '').trim();
8 changes: 8 additions & 0 deletions api/src/handlers/testWebhook.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,8 @@
import { json } from '../responses';
import withApiHandler from '../withApiHandler';

export default withApiHandler([], async event => {
const payload = event.body;
console.info(`Test webhook received payload '${event.body}'`);
return json(payload);
});
1 change: 1 addition & 0 deletions api/src/models/TrailStatusModel.ts
Original file line number Diff line number Diff line change
Expand Up @@ -180,6 +180,7 @@ export default class TrailStatusModel {

public toJSON() {
return {
trailId: this.trailId,
status: this.status,
message: this.message,
imageUrl: this.imageUrl,
Expand Down
Loading

0 comments on commit 1526e5b

Please sign in to comment.