Skip to content

Commit e0bc389

Browse files
authored
feat(notifications): Phase 4 — notification delivery relay (#2513)
* feat(notifications): Phase 4 — notification delivery relay - api/notify.ts: Vercel edge bridge — validates Clerk JWT, publishes wm:breaking-news events to Upstash wm:events:notify channel - scripts/notification-relay.cjs: Railway relay subscribing to Upstash long-poll; fans out to Telegram (with 429 retry + 403 deactivation), Slack (SSRF-guarded DNS check + AES-256-GCM decrypt), and Resend email; SET NX dedup prevents double-delivery within 30min - breaking-news-alerts.ts: fire-and-forget POST to /api/notify after wm:breaking-news dispatch (web-only, no-op if not signed in) - convex/notificationChannels.ts: add getChannelsByUserId query for relay use (fetches channels by userId without auth requirement) - .env.example: add RESEND_FROM_EMAIL * fix(notify-relay): address Greptile P0/P1 security findings - Change getChannelsByUserId to internalQuery (P0: was public/unauthenticated) - Add /relay/channels HTTP action with timing-safe RELAY_SECRET validation - Relay now calls Convex HTTP action instead of public query - Remove parse_mode: 'HTML' from sendTelegram (P1: injection risk) - Add AbortSignal.timeout(10000) to sendTelegram (P1: hung connections) - Add res.ok guard in upstashRest with warning log (P1: silent error swallow) - Document RELAY_SECRET in .env.example (must be set in both Railway + Convex) * fix(notify-relay): reuse RELAY_SHARED_SECRET instead of new RELAY_SECRET var
1 parent 77ca497 commit e0bc389

6 files changed

Lines changed: 428 additions & 1 deletion

File tree

.env.example

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -305,3 +305,6 @@ NOTIFICATION_ENCRYPTION_KEY=
305305
# Resend API key for email notification delivery.
306306
# Get from: resend.com/api-keys
307307
RESEND_API_KEY=
308+
309+
# "From" address for email notifications (must be a verified Resend sender domain)
310+
RESEND_FROM_EMAIL=WorldMonitor <alerts@worldmonitor.app>

api/notify.ts

Lines changed: 87 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,87 @@
1+
/**
2+
* Notification publish endpoint.
3+
*
4+
* POST /api/notify — validates Clerk JWT, publishes event to Upstash wm:events:notify channel
5+
*
6+
* Authentication: Clerk Bearer token in Authorization header.
7+
* Requires UPSTASH_REDIS_REST_URL + UPSTASH_REDIS_REST_TOKEN env vars.
8+
*/
9+
10+
export const config = { runtime: 'edge' };
11+
12+
// @ts-expect-error — JS module, no declaration file
13+
import { getCorsHeaders, isDisallowedOrigin } from './_cors.js';
14+
// @ts-expect-error — JS module, no declaration file
15+
import { jsonResponse } from './_json-response.js';
16+
import { validateBearerToken } from '../server/auth-session';
17+
18+
export default async function handler(req: Request): Promise<Response> {
19+
if (isDisallowedOrigin(req)) {
20+
return jsonResponse({ error: 'Origin not allowed' }, 403);
21+
}
22+
23+
const cors = getCorsHeaders(req, 'POST, OPTIONS');
24+
25+
if (req.method === 'OPTIONS') {
26+
return new Response(null, { status: 204, headers: cors });
27+
}
28+
29+
if (req.method !== 'POST') {
30+
return jsonResponse({ error: 'Method not allowed' }, 405, cors);
31+
}
32+
33+
const authHeader = req.headers.get('Authorization') ?? '';
34+
const token = authHeader.startsWith('Bearer ') ? authHeader.slice(7) : '';
35+
if (!token) {
36+
return jsonResponse({ error: 'UNAUTHENTICATED' }, 401, cors);
37+
}
38+
39+
const session = await validateBearerToken(token);
40+
if (!session.valid || !session.userId) {
41+
return jsonResponse({ error: 'UNAUTHENTICATED' }, 401, cors);
42+
}
43+
44+
let body: { eventType?: unknown; payload?: unknown; severity?: unknown };
45+
try {
46+
body = await req.json();
47+
} catch {
48+
return jsonResponse({ error: 'Invalid JSON' }, 400, cors);
49+
}
50+
51+
if (typeof body.eventType !== 'string' || !body.eventType || body.eventType.length > 64) {
52+
return jsonResponse({ error: 'eventType required (string, max 64 chars)' }, 400, cors);
53+
}
54+
55+
if (typeof body.payload !== 'object' || body.payload === null || Array.isArray(body.payload)) {
56+
return jsonResponse({ error: 'payload must be an object' }, 400, cors);
57+
}
58+
59+
const upstashUrl = process.env.UPSTASH_REDIS_REST_URL;
60+
const upstashToken = process.env.UPSTASH_REDIS_REST_TOKEN;
61+
62+
if (!upstashUrl || !upstashToken) {
63+
return jsonResponse({ error: 'Service unavailable' }, 503, cors);
64+
}
65+
66+
const { eventType, payload } = body;
67+
const severity = typeof body.severity === 'string' ? body.severity : 'high';
68+
69+
const msg = JSON.stringify({
70+
eventType,
71+
payload,
72+
severity,
73+
publishedAt: Date.now(),
74+
userId: session.userId,
75+
});
76+
77+
const res = await fetch(
78+
`${upstashUrl}/publish/wm:events:notify/${encodeURIComponent(msg)}`,
79+
{ method: 'POST', headers: { Authorization: `Bearer ${upstashToken}` } },
80+
);
81+
82+
if (!res.ok) {
83+
return jsonResponse({ error: 'Publish failed' }, 502, cors);
84+
}
85+
86+
return jsonResponse({ ok: true }, 200, cors);
87+
}

convex/http.ts

Lines changed: 43 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,6 @@
11
import { anyApi, httpRouter } from "convex/server";
22
import { httpAction } from "./_generated/server";
3+
import { internal } from "./_generated/api";
34

45
const TRUSTED = [
56
"https://worldmonitor.app",
@@ -181,4 +182,46 @@ http.route({
181182
}),
182183
});
183184

185+
http.route({
186+
path: "/relay/channels",
187+
method: "POST",
188+
handler: httpAction(async (ctx, request) => {
189+
const secret = process.env.RELAY_SHARED_SECRET ?? "";
190+
const provided = (request.headers.get("Authorization") ?? "").replace(/^Bearer\s+/, "");
191+
192+
if (!secret || !(await timingSafeEqualStrings(provided, secret))) {
193+
return new Response(JSON.stringify({ error: "UNAUTHORIZED" }), {
194+
status: 401,
195+
headers: { "Content-Type": "application/json" },
196+
});
197+
}
198+
199+
let body: { userId?: string };
200+
try {
201+
body = await request.json();
202+
} catch {
203+
return new Response(JSON.stringify({ error: "INVALID_JSON" }), {
204+
status: 400,
205+
headers: { "Content-Type": "application/json" },
206+
});
207+
}
208+
209+
if (typeof body.userId !== "string" || !body.userId) {
210+
return new Response(JSON.stringify({ error: "MISSING_USER_ID" }), {
211+
status: 400,
212+
headers: { "Content-Type": "application/json" },
213+
});
214+
}
215+
216+
const channels = await ctx.runQuery(internal.notificationChannels.getChannelsByUserId, {
217+
userId: body.userId,
218+
});
219+
220+
return new Response(JSON.stringify(channels ?? []), {
221+
status: 200,
222+
headers: { "Content-Type": "application/json" },
223+
});
224+
}),
225+
});
226+
184227
export default http;

convex/notificationChannels.ts

Lines changed: 11 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,17 @@
11
import { ConvexError, v } from "convex/values";
2-
import { mutation, query } from "./_generated/server";
2+
import { internalQuery, mutation, query } from "./_generated/server";
33
import { channelTypeValidator } from "./constants";
44

5+
export const getChannelsByUserId = internalQuery({
6+
args: { userId: v.string() },
7+
handler: async (ctx, args) => {
8+
return await ctx.db
9+
.query("notificationChannels")
10+
.withIndex("by_user", (q) => q.eq("userId", args.userId))
11+
.collect();
12+
},
13+
});
14+
515
export const getChannels = query({
616
args: {},
717
handler: async (ctx) => {

0 commit comments

Comments
 (0)