Skip to content

Commit 110a0a1

Browse files
authored
Merge pull request #389 from cursor/feat/replace-workflow-with-supabase-queues
Replace Vercel Workflow with Supabase Queues + duplicate detection
2 parents d7c6469 + b99b94e commit 110a0a1

19 files changed

Lines changed: 553 additions & 1198 deletions

File tree

README.md

Lines changed: 26 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -100,9 +100,35 @@ See the [Open Plugins specification](https://open-plugins.com/plugin-builders/sp
100100
- **Framework**: [Next.js](https://nextjs.org) (App Router, Turbopack)
101101
- **Runtime**: [Bun](https://bun.sh)
102102
- **Database**: [Supabase](https://supabase.com) (PostgreSQL)
103+
- **Job Queue**: [Supabase Queues](https://supabase.com/docs/guides/queues) (`pgmq`) drained by a 1-min Vercel cron
103104
- **Styling**: [Tailwind CSS](https://tailwindcss.com)
104105
- **UI**: [Radix UI](https://radix-ui.com) + [shadcn/ui](https://ui.shadcn.com)
105106
- **Search**: [Fuse.js](https://fusejs.io) (client-side fuzzy search)
106107
- **URL State**: [nuqs](https://nuqs.47ng.com)
107108
- **Linting**: [Biome](https://biomejs.dev)
108109

110+
## Plugin security scan
111+
112+
Submitted plugins are auto-reviewed by a Cursor SDK agent (`composer-2`) running
113+
in `local` mode against a fresh clone of the plugin's repo plus its inline
114+
component content. The verdict (`safe` / `suspicious` / `malicious`) is written
115+
back to `plugins.scan_status` and surfaces in the admin queue.
116+
117+
The scan is asynchronous and runs out of the request lifecycle:
118+
119+
1. **Enqueue** — server actions and the recover-stuck-scans cron call
120+
`enqueuePluginScan(pluginId)` which sends a message to the `plugin_scans`
121+
pgmq queue.
122+
2. **Kick** — user-facing actions also fire `kickDrainAfterResponse()` so the
123+
drain route is called via `next/server` `after()` immediately after the
124+
response is flushed. Scans typically start within a few hundred ms.
125+
3. **Drain**`/api/queue/plugin-scans/drain` reads one message
126+
(`vt=900s`, `n=1`), runs `runPluginScan(pluginId)`, archives the message on
127+
success, leaves it for VT-expiry on retryable error, or buries it after
128+
`MAX_ATTEMPTS=5` deliveries.
129+
4. **Cron safety net** — Vercel cron hits the same drain route every minute so
130+
any messages that missed their kick still get processed.
131+
132+
The drain route uses `maxDuration = 800` (Vercel Pro+ Fluid Compute ceiling) and
133+
relies on `CURSOR_API_KEY` + `CRON_SECRET` from the env file.
134+

apps/cursor/.env.example

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -21,8 +21,9 @@ NEXT_PUBLIC_ADMIN_USER_IDS=
2121
# local dev; otherwise checkRateLimit is a no-op outside production.
2222
NEXT_PUBLIC_VERCEL_FIREWALL_HOST_FOR_DEVELOPMENT=
2323

24-
# Cursor SDK — required for the plugin security scan workflow
25-
# (src/workflows/scan-plugin.ts). Mint a key at
24+
# Cursor SDK — required for the plugin security scan worker
25+
# (src/lib/plugins/scan.ts, drained from a Supabase Queue by
26+
# /api/queue/plugin-scans/drain). Mint a key at
2627
# https://cursor.com/dashboard/cloud-agents or use a team service-account key.
2728
CURSOR_API_KEY=
2829

apps/cursor/next.config.mjs

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,5 @@
11
import { dirname, resolve } from "node:path";
22
import { fileURLToPath } from "node:url";
3-
import { withWorkflow } from "workflow/next";
43

54
const __dirname = dirname(fileURLToPath(import.meta.url));
65

@@ -115,4 +114,4 @@ const nextConfig = {
115114
},
116115
};
117116

118-
export default withWorkflow(nextConfig);
117+
export default nextConfig;

apps/cursor/package.json

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -48,7 +48,6 @@
4848
"slugify": "^1.6.9",
4949
"sonner": "^2.0.7",
5050
"tailwind-merge": "^3.5.0",
51-
"workflow": "^4.2.4",
5251
"zod": "^4.4.3"
5352
},
5453
"devDependencies": {

apps/cursor/src/actions/review-flagged-plugin.ts

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -1,10 +1,9 @@
11
"use server";
22

33
import { revalidatePath } from "next/cache";
4-
import { start } from "workflow/api";
54
import { z } from "zod";
5+
import { enqueuePluginScan, kickDrainAfterResponse } from "@/lib/plugins/queue";
66
import { createClient } from "@/utils/supabase/admin-client";
7-
import { scanPluginWorkflow } from "@/workflows/scan-plugin";
87
import { ActionError, adminActionClient } from "./safe-action";
98

109
const pluginIdSchema = z.object({ pluginId: z.string().uuid() });
@@ -89,7 +88,8 @@ export const rescanPluginAction = adminActionClient
8988
}
9089

9190
try {
92-
await start(scanPluginWorkflow, [pluginId]);
91+
await enqueuePluginScan(pluginId);
92+
kickDrainAfterResponse();
9393
} catch (err) {
9494
throw new ActionError(
9595
`Failed to enqueue scan: ${err instanceof Error ? err.message : String(err)}`,

apps/cursor/src/actions/update-plugin.ts

Lines changed: 5 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -1,11 +1,10 @@
11
"use server";
22

33
import { revalidatePath } from "next/cache";
4-
import { start } from "workflow/api";
54
import { z } from "zod";
5+
import { enqueuePluginScan, kickDrainAfterResponse } from "@/lib/plugins/queue";
66
import { pluginScanLimit } from "@/lib/rate-limit";
77
import { createClient } from "@/utils/supabase/admin-client";
8-
import { scanPluginWorkflow } from "@/workflows/scan-plugin";
98
import { ActionError, authActionClient } from "./safe-action";
109

1110
const componentSchema = z.object({
@@ -148,9 +147,10 @@ export const updatePluginAction = authActionClient
148147
}
149148

150149
try {
151-
await start(scanPluginWorkflow, [id]);
152-
} catch (workflowError) {
153-
console.error("Failed to enqueue scan workflow", workflowError);
150+
await enqueuePluginScan(id);
151+
kickDrainAfterResponse();
152+
} catch (queueError) {
153+
console.error("Failed to enqueue plugin scan", queueError);
154154
}
155155

156156
revalidatePath("/");

apps/cursor/src/app/api/cron/recover-stuck-scans/route.ts

Lines changed: 11 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -1,8 +1,7 @@
11
import { type NextRequest, NextResponse } from "next/server";
2-
import { start } from "workflow/api";
32
import { requireCronAuth } from "@/lib/cron-auth";
3+
import { enqueuePluginScan } from "@/lib/plugins/queue";
44
import { createClient } from "@/utils/supabase/admin-client";
5-
import { scanPluginWorkflow } from "@/workflows/scan-plugin";
65

76
export const dynamic = "force-dynamic";
87
export const maxDuration = 60;
@@ -12,8 +11,8 @@ export const maxDuration = 60;
1211
// and this cron see the same set of rows.
1312
const STALE_AFTER_MS = 15 * 60 * 1000;
1413

15-
// Hard cap per cron tick. Each start() is cheap (it just enqueues the workflow)
16-
// but we don't want to flood the workflow service if a backlog builds up.
14+
// Hard cap per cron tick. Each enqueuePluginScan() is cheap (just a pgmq.send)
15+
// but we don't want to flood the queue if a backlog builds up.
1716
const MAX_RETRIES_PER_RUN = 25;
1817

1918
export async function GET(request: NextRequest) {
@@ -36,10 +35,15 @@ export async function GET(request: NextRequest) {
3635
throw new Error(`Failed to query stuck plugins: ${error.message}`);
3736
}
3837

39-
const results: Array<{ id: string; slug: string; ok: boolean; error?: string }> = [];
38+
const results: Array<{
39+
id: string;
40+
slug: string;
41+
ok: boolean;
42+
error?: string;
43+
}> = [];
4044

4145
for (const plugin of stuck ?? []) {
42-
// Reset back to pending so the workflow's loadPlugin → markScanning
46+
// Reset back to pending so the worker's loadPlugin → markScanning
4347
// transition is idempotent (the prevActive snapshot is recomputed).
4448
if (plugin.scan_status === "scanning") {
4549
await supabase
@@ -49,7 +53,7 @@ export async function GET(request: NextRequest) {
4953
}
5054

5155
try {
52-
await start(scanPluginWorkflow, [plugin.id]);
56+
await enqueuePluginScan(plugin.id);
5357
results.push({ id: plugin.id, slug: plugin.slug, ok: true });
5458
} catch (err) {
5559
const message = err instanceof Error ? err.message : String(err);
Lines changed: 147 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,147 @@
1+
import { type NextRequest, NextResponse } from "next/server";
2+
import { requireCronAuth } from "@/lib/cron-auth";
3+
import {
4+
archivePluginScan,
5+
PLUGIN_SCAN_QUEUE,
6+
readNextPluginScan,
7+
} from "@/lib/plugins/queue";
8+
import {
9+
FatalScanError,
10+
markScanFailed,
11+
runPluginScan,
12+
} from "@/lib/plugins/scan";
13+
14+
// Vercel max for Pro / Enterprise + Fluid Compute (default since 2025) is 800s.
15+
// Source: https://vercel.com/docs/functions/configuring-functions/duration
16+
//
17+
// The `Agent.prompt` step can take 1–3 minutes for a typical plugin; the git
18+
// clone is bounded by CLONE_TIMEOUT_MS (60s) inside scan.ts. 800s gives us
19+
// generous headroom for the worst-case agent run.
20+
export const dynamic = "force-dynamic";
21+
export const maxDuration = 800;
22+
23+
// Visibility timeout: how long the message is invisible to other consumers
24+
// after a successful `read`. Set comfortably longer than `maxDuration` so we
25+
// can never hand the same message to a second drain invocation while the
26+
// first one is still running.
27+
const VT_SECONDS = 900;
28+
29+
// Bury after this many delivery attempts. With per-cron `n=1` and a 1-min
30+
// schedule, this means a poisonous message stays in the queue for ~5 min
31+
// after `read_ct=1` (we only see read_ct on the next read after the VT
32+
// expires) before we mark the plugin errored and stop retrying.
33+
const MAX_ATTEMPTS = 5;
34+
35+
function logInfo(msg: string, meta?: Record<string, unknown>) {
36+
console.log(`[scan-drain] ${msg}${meta ? ` ${JSON.stringify(meta)}` : ""}`);
37+
}
38+
39+
function logError(msg: string, err: unknown) {
40+
const detail =
41+
err instanceof Error
42+
? { name: err.name, message: err.message, stack: err.stack }
43+
: { value: String(err) };
44+
console.error(`[scan-drain] ${msg}`, detail);
45+
}
46+
47+
export async function GET(request: NextRequest) {
48+
const unauthorized = requireCronAuth(request);
49+
if (unauthorized) return unauthorized;
50+
51+
let msg: Awaited<ReturnType<typeof readNextPluginScan>>;
52+
try {
53+
msg = await readNextPluginScan(VT_SECONDS);
54+
} catch (err) {
55+
logError("readNextPluginScan failed", err);
56+
return NextResponse.json(
57+
{ ok: false, error: "queue_read_failed" },
58+
{ status: 500 },
59+
);
60+
}
61+
62+
if (!msg) {
63+
return NextResponse.json({
64+
ok: true,
65+
queue: PLUGIN_SCAN_QUEUE,
66+
drained: 0,
67+
});
68+
}
69+
70+
const { msg_id, read_ct, message } = msg;
71+
const pluginId = message.plugin_id;
72+
73+
if (!pluginId || typeof pluginId !== "string") {
74+
// Malformed payload — archive it so it doesn't keep getting retried.
75+
logError(
76+
"malformed message; archiving",
77+
new Error(JSON.stringify(message)),
78+
);
79+
await archivePluginScan(msg_id).catch((err) =>
80+
logError("archive (malformed) failed", err),
81+
);
82+
return NextResponse.json(
83+
{ ok: false, archived: msg_id, reason: "malformed_message" },
84+
{ status: 200 },
85+
);
86+
}
87+
88+
if (read_ct > MAX_ATTEMPTS) {
89+
logInfo("exceeded MAX_ATTEMPTS; burying", {
90+
pluginId,
91+
msg_id,
92+
read_ct,
93+
max: MAX_ATTEMPTS,
94+
});
95+
await markScanFailed(pluginId, `Exceeded ${MAX_ATTEMPTS} scan attempts`);
96+
await archivePluginScan(msg_id);
97+
return NextResponse.json({
98+
ok: true,
99+
buried: pluginId,
100+
msg_id,
101+
read_ct,
102+
});
103+
}
104+
105+
logInfo("processing", { pluginId, msg_id, read_ct });
106+
107+
try {
108+
await runPluginScan(pluginId);
109+
await archivePluginScan(msg_id);
110+
logInfo("scanned ok", { pluginId, msg_id });
111+
return NextResponse.json({ ok: true, scanned: pluginId, msg_id });
112+
} catch (err) {
113+
if (err instanceof FatalScanError) {
114+
// runPluginScan already wrote `scan_status='error'` via its compensation
115+
// path. Archive so the message doesn't get retried.
116+
logError("fatal; archiving", err);
117+
await archivePluginScan(msg_id).catch((archiveErr) =>
118+
logError("archive (fatal) failed", archiveErr),
119+
);
120+
return NextResponse.json(
121+
{
122+
ok: false,
123+
fatal: true,
124+
pluginId,
125+
msg_id,
126+
error: err.message,
127+
},
128+
{ status: 200 },
129+
);
130+
}
131+
132+
// Retryable: do NOT archive. The pgmq visibility timeout (VT_SECONDS)
133+
// expires and the next cron tick re-reads the message with read_ct + 1.
134+
logError("retryable; leaving message for VT to expire", err);
135+
return NextResponse.json(
136+
{
137+
ok: false,
138+
retryable: true,
139+
pluginId,
140+
msg_id,
141+
read_ct,
142+
error: err instanceof Error ? err.message : String(err),
143+
},
144+
{ status: 500 },
145+
);
146+
}
147+
}

apps/cursor/src/lib/github-plugin/parse.ts

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -2,7 +2,7 @@
22
* Parse a public GitHub repo into a Cursor plugin shape.
33
*
44
* Pure module — no `"use server"`, no auth context, no DB calls. Safe to call
5-
* from server actions, workflows, or one-shot scripts.
5+
* from server actions, queue workers, or one-shot scripts.
66
*
77
* Optional `GITHUB_TOKEN` env var bumps the rate limit on the Repos / git tree
88
* endpoints from 60 req/h (unauth) to 5,000 req/h (auth).

apps/cursor/src/lib/plugins/insert.ts

Lines changed: 10 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -4,12 +4,11 @@
44
* Wrapped by [`createPluginAction`](src/actions/create-plugin.ts) for the auth'd
55
* user-submission path and by the seed scripts for bulk import. Server actions
66
* stay responsible for auth, rate-limiting, and `revalidatePath` — this lib
7-
* only touches the database and (optionally) the scan workflow.
7+
* only touches the database and (optionally) enqueues the security scan.
88
*/
99

10-
import { start } from "workflow/api";
10+
import { enqueuePluginScan, kickDrainAfterResponse } from "@/lib/plugins/queue";
1111
import { createClient } from "@/utils/supabase/admin-client";
12-
import { scanPluginWorkflow } from "@/workflows/scan-plugin";
1312

1413
type ComponentInput = {
1514
type: string;
@@ -45,7 +44,7 @@ export type InsertPluginOptions = {
4544
* and skip the security scan. Used for the curated bulk seed.
4645
*
4746
* When false (default for user submissions): insert as `active=false,
48-
* scan_status='pending'` and enqueue `scanPluginWorkflow`.
47+
* scan_status='pending'` and enqueue the scan via `enqueuePluginScan`.
4948
*/
5049
skipScan?: boolean;
5150
};
@@ -156,9 +155,13 @@ export async function insertPlugin(
156155

157156
if (!skipScan) {
158157
try {
159-
await start(scanPluginWorkflow, [plugin.id]);
160-
} catch (workflowError) {
161-
console.error("Failed to enqueue scan workflow", workflowError);
158+
await enqueuePluginScan(plugin.id);
159+
kickDrainAfterResponse();
160+
} catch (queueError) {
161+
// Don't fail the insert if the queue is unreachable — the
162+
// recover-stuck-scans cron will re-enqueue any rows left at
163+
// scan_status='pending' after 15 min.
164+
console.error("Failed to enqueue plugin scan", queueError);
162165
}
163166
}
164167

0 commit comments

Comments
 (0)