Skip to content
Open
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 7 additions & 0 deletions convex/crons.ts
Original file line number Diff line number Diff line change
Expand Up @@ -24,4 +24,11 @@ crons.interval(
{ batchSize: 200, maxBatches: 5 },
)

crons.interval(
'download-dedupe-prune',
{ hours: 24 },
internal.downloads.pruneDownloadDedupesInternal,
{},
)

export default crons
12 changes: 12 additions & 0 deletions convex/downloads.test.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,12 @@
import { describe, expect, it } from 'vitest'
import { __test } from './downloads'

describe('downloads helpers', () => {
it('calculates day start boundaries', () => {
const day = 86_400_000
expect(__test.getDayStart(0)).toBe(0)
expect(__test.getDayStart(day - 1)).toBe(0)
expect(__test.getDayStart(day)).toBe(day)
expect(__test.getDayStart(day + 1)).toBe(day)
})
})
94 changes: 83 additions & 11 deletions convex/downloads.ts
Original file line number Diff line number Diff line change
@@ -1,8 +1,13 @@
import { v } from 'convex/values'
import { zipSync } from 'fflate'
import { api } from './_generated/api'
import { httpAction, mutation } from './_generated/server'
import { api, internal } from './_generated/api'
import { httpAction, internalMutation } from './_generated/server'
import { applyRateLimit, getClientIp } from './lib/httpRateLimit'
import { applySkillStatDeltas, bumpDailySkillStats } from './lib/skillStats'
import { hashToken } from './lib/tokens'

const DAY_MS = 86_400_000
const DEDUPE_RETENTION_DAYS = 14

export const downloadZip = httpAction(async (ctx, request) => {
const url = new URL(request.url)
Expand All @@ -14,6 +19,9 @@ export const downloadZip = httpAction(async (ctx, request) => {
return new Response('Missing slug', { status: 400 })
}

const rate = await applyRateLimit(ctx, request, 'download')
if (!rate.ok) return rate.response

const skillResult = await ctx.runQuery(api.skills.getBySlug, { slug })
if (!skillResult?.skill) {
return new Response('Skill not found', { status: 404 })
Expand Down Expand Up @@ -53,29 +61,93 @@ export const downloadZip = httpAction(async (ctx, request) => {
const zipArray = Uint8Array.from(zipData)
const zipBlob = new Blob([zipArray], { type: 'application/zip' })

await ctx.runMutation(api.downloads.increment, { skillId: skill._id })
const ip = getClientIp(request) ?? 'unknown'
const ipHash = await hashToken(ip)
const dayStart = getDayStart(Date.now())
try {
await ctx.runMutation(internal.downloads.recordDownloadInternal, {
skillId: skill._id,
ipHash,
dayStart,
})
} catch {
// Ignore download count failures.
}

return new Response(zipBlob, {
status: 200,
headers: {
headers: mergeHeaders(rate.headers, {
'Content-Type': 'application/zip',
'Content-Disposition': `attachment; filename="${slug}-${version.version}.zip"`,
'Cache-Control': 'private, max-age=60',
},
}),
})
})

export const increment = mutation({
args: { skillId: v.id('skills') },
export const recordDownloadInternal = internalMutation({
args: {
skillId: v.id('skills'),
ipHash: v.string(),
dayStart: v.number(),
},
handler: async (ctx, args) => {
const skill = await ctx.db.get(args.skillId)
if (!skill) return

const existing = await ctx.db
.query('downloadDedupes')
.withIndex('by_skill_ip_day', (q) =>
q.eq('skillId', args.skillId).eq('ipHash', args.ipHash).eq('dayStart', args.dayStart),
)
.unique()
if (existing) return

const now = Date.now()
const patch = applySkillStatDeltas(skill, { downloads: 1 })
await ctx.db.patch(skill._id, {
...patch,
updatedAt: now,
await ctx.db.insert('downloadDedupes', {
skillId: args.skillId,
ipHash: args.ipHash,
dayStart: args.dayStart,
createdAt: now,
})

const patch = applySkillStatDeltas(skill, { downloads: 1 })
await ctx.db.patch(skill._id, { ...patch, updatedAt: now })
await bumpDailySkillStats(ctx, { skillId: skill._id, now, downloads: 1 })
},
})

export const pruneDownloadDedupesInternal = internalMutation({
args: {},
handler: async (ctx) => {
const cutoff = Date.now() - DEDUPE_RETENTION_DAYS * DAY_MS
let remaining = true
let batches = 0
while (remaining && batches < 10) {
const stale = await ctx.db
.query('downloadDedupes')
.withIndex('by_day')
.filter((q) => q.lt(q.field('dayStart'), cutoff))
.take(200)
if (stale.length === 0) {
remaining = false
break
}
for (const entry of stale) {
await ctx.db.delete(entry._id)
}
batches += 1
}
},
})

export function getDayStart(timestamp: number) {
return Math.floor(timestamp / DAY_MS) * DAY_MS
}

export const __test = {
getDayStart,
}

function mergeHeaders(base: HeadersInit, extra: HeadersInit) {
return { ...(base as Record<string, string>), ...(extra as Record<string, string>) }
}
91 changes: 2 additions & 89 deletions convex/httpApiV1.ts
Original file line number Diff line number Diff line change
@@ -1,18 +1,12 @@
import { CliPublishRequestSchema, parseArk } from 'clawdhub-schema'
import { api, internal } from './_generated/api'
import type { Doc, Id } from './_generated/dataModel'
import type { ActionCtx } from './_generated/server'
import { httpAction } from './_generated/server'
import { type ActionCtx, httpAction } from './_generated/server'
import { requireApiTokenUser } from './lib/apiTokenAuth'
import { hashToken } from './lib/tokens'
import { applyRateLimit, parseBearerToken } from './lib/httpRateLimit'
import { publishVersionForUser } from './skills'
import { publishSoulVersionForUser } from './souls'

const RATE_LIMIT_WINDOW_MS = 60_000
const RATE_LIMITS = {
read: { ip: 120, key: 600 },
write: { ip: 30, key: 120 },
} as const
const MAX_RAW_FILE_BYTES = 200 * 1024

type SearchSkillEntry = {
Expand Down Expand Up @@ -631,87 +625,6 @@ async function resolveTags(
return resolved
}

async function applyRateLimit(
ctx: ActionCtx,
request: Request,
kind: 'read' | 'write',
): Promise<{ ok: true; headers: HeadersInit } | { ok: false; response: Response }> {
const ip = getClientIp(request) ?? 'unknown'
const ipResult = await checkRateLimit(ctx, `ip:${ip}`, RATE_LIMITS[kind].ip)
const token = parseBearerToken(request)
const keyResult = token
? await checkRateLimit(ctx, `key:${await hashToken(token)}`, RATE_LIMITS[kind].key)
: null

const chosen = pickMostRestrictive(ipResult, keyResult)
const headers = rateHeaders(chosen)

if (!ipResult.allowed || (keyResult && !keyResult.allowed)) {
return {
ok: false,
response: text('Rate limit exceeded', 429, headers),
}
}

return { ok: true, headers }
}

type RateLimitResult = {
allowed: boolean
remaining: number
limit: number
resetAt: number
}

async function checkRateLimit(
ctx: ActionCtx,
key: string,
limit: number,
): Promise<RateLimitResult> {
return (await ctx.runMutation(internal.rateLimits.checkRateLimitInternal, {
key,
limit,
windowMs: RATE_LIMIT_WINDOW_MS,
})) as RateLimitResult
}

function pickMostRestrictive(primary: RateLimitResult, secondary: RateLimitResult | null) {
if (!secondary) return primary
if (!primary.allowed) return primary
if (!secondary.allowed) return secondary
return secondary.remaining < primary.remaining ? secondary : primary
}

function rateHeaders(result: RateLimitResult): HeadersInit {
const resetSeconds = Math.ceil(result.resetAt / 1000)
return {
'X-RateLimit-Limit': String(result.limit),
'X-RateLimit-Remaining': String(result.remaining),
'X-RateLimit-Reset': String(resetSeconds),
...(result.allowed ? {} : { 'Retry-After': String(resetSeconds) }),
}
}

function getClientIp(request: Request) {
const header =
request.headers.get('cf-connecting-ip') ??
request.headers.get('x-real-ip') ??
request.headers.get('x-forwarded-for') ??
request.headers.get('fly-client-ip')
if (!header) return null
if (header.includes(',')) return header.split(',')[0]?.trim() || null
return header.trim()
}

function parseBearerToken(request: Request) {
const header = request.headers.get('authorization') ?? request.headers.get('Authorization')
if (!header) return null
const trimmed = header.trim()
if (!trimmed.toLowerCase().startsWith('bearer ')) return null
const token = trimmed.slice(7).trim()
return token || null
}

function json(value: unknown, status = 200, headers?: HeadersInit) {
return new Response(JSON.stringify(value), {
status,
Expand Down
23 changes: 23 additions & 0 deletions convex/lib/httpRateLimit.test.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,23 @@
/* @vitest-environment node */
import { describe, expect, it } from 'vitest'
import { getClientIp } from './httpRateLimit'

describe('getClientIp', () => {
it('returns null when cf-connecting-ip missing', () => {
const request = new Request('https://example.com', {
headers: {
'x-forwarded-for': '203.0.113.9',
},
})
expect(getClientIp(request)).toBeNull()
})

it('returns first ip from cf-connecting-ip', () => {
const request = new Request('https://example.com', {
headers: {
'cf-connecting-ip': '203.0.113.1, 198.51.100.2',
},
})
expect(getClientIp(request)).toBe('203.0.113.1')
})
})
100 changes: 100 additions & 0 deletions convex/lib/httpRateLimit.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,100 @@
import { internal } from '../_generated/api'
import type { ActionCtx } from '../_generated/server'
import { hashToken } from './tokens'

const RATE_LIMIT_WINDOW_MS = 60_000
export const RATE_LIMITS = {
read: { ip: 120, key: 600 },
write: { ip: 30, key: 120 },
download: { ip: 20, key: 120 },
} as const

type RateLimitResult = {
allowed: boolean
remaining: number
limit: number
resetAt: number
}

export async function applyRateLimit(
ctx: ActionCtx,
request: Request,
kind: keyof typeof RATE_LIMITS,
): Promise<{ ok: true; headers: HeadersInit } | { ok: false; response: Response }> {
const ip = getClientIp(request) ?? 'unknown'
const ipResult = await checkRateLimit(ctx, `ip:${ip}`, RATE_LIMITS[kind].ip)
const token = parseBearerToken(request)
const keyResult = token
? await checkRateLimit(ctx, `key:${await hashToken(token)}`, RATE_LIMITS[kind].key)
: null

const chosen = pickMostRestrictive(ipResult, keyResult)
const headers = rateHeaders(chosen)

if (!ipResult.allowed || (keyResult && !keyResult.allowed)) {
return {
ok: false,
response: new Response('Rate limit exceeded', {
status: 429,
headers: mergeHeaders(
{
'Content-Type': 'text/plain; charset=utf-8',
'Cache-Control': 'no-store',
},
headers,
),
}),
}
}

return { ok: true, headers }
}

export function getClientIp(request: Request) {
const header = request.headers.get('cf-connecting-ip')
if (!header) return null
if (header.includes(',')) return header.split(',')[0]?.trim() || null
return header.trim()
}

async function checkRateLimit(
ctx: ActionCtx,
key: string,
limit: number,
): Promise<RateLimitResult> {
return (await ctx.runMutation(internal.rateLimits.checkRateLimitInternal, {
key,
limit,
windowMs: RATE_LIMIT_WINDOW_MS,
})) as RateLimitResult
}

function pickMostRestrictive(primary: RateLimitResult, secondary: RateLimitResult | null) {
if (!secondary) return primary
if (!primary.allowed) return primary
if (!secondary.allowed) return secondary
return secondary.remaining < primary.remaining ? secondary : primary
}

function rateHeaders(result: RateLimitResult): HeadersInit {
const resetSeconds = Math.ceil(result.resetAt / 1000)
return {
'X-RateLimit-Limit': String(result.limit),
'X-RateLimit-Remaining': String(result.remaining),
'X-RateLimit-Reset': String(resetSeconds),
...(result.allowed ? {} : { 'Retry-After': String(resetSeconds) }),
}
}

export function parseBearerToken(request: Request) {
const header = request.headers.get('authorization') ?? request.headers.get('Authorization')
if (!header) return null
const trimmed = header.trim()
if (!trimmed.toLowerCase().startsWith('bearer ')) return null
const token = trimmed.slice(7).trim()
return token || null
}

function mergeHeaders(base: HeadersInit, extra?: HeadersInit) {
return { ...(base as Record<string, string>), ...(extra as Record<string, string>) }
}
Loading