Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
21 changes: 21 additions & 0 deletions .github/workflows/format.yml
Original file line number Diff line number Diff line change
@@ -0,0 +1,21 @@
# CI formatting gate: runs the repo's format script in --check mode on every
# pull request targeting `dev`, and on demand via manual dispatch.
# NOTE(review): indentation reconstructed from a flattened diff view — the
# key nesting below follows the standard GitHub Actions schema.
name: format

on:
  pull_request:
    branches: [dev]
  workflow_dispatch:

jobs:
  format:
    # Blacksmith-hosted Ubuntu 24.04 runner (4 vCPU).
    runs-on: blacksmith-4vcpu-ubuntu-2404
    steps:
      - name: Checkout repository
        uses: actions/checkout@v4

      - name: Setup Bun
        uses: ./.github/actions/setup-bun

      - name: Check formatting
        # Fails the job when any file is not formatted; does not rewrite files.
        run: ./script/format.ts --check
        env:
          CI: true
88 changes: 88 additions & 0 deletions packages/opencode/src/cli/cmd/storage.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,88 @@
import type { Argv } from "yargs"
import { cmd } from "./cmd"
import { bootstrap } from "../bootstrap"
import { Storage } from "../../storage/storage"

/**
 * Parent `storage` CLI command. Carries no behavior of its own: it registers
 * the repair/restore subcommands and requires the caller to pick one.
 */
export const StorageCommand = cmd({
  command: "storage",
  describe: "manage storage",
  builder: (yargs: Argv) => {
    return yargs
      .command(StorageRepairCommand)
      .command(StorageRestoreCommand)
      .demandCommand()
  },
  async handler() {},
})

/**
 * `storage repair` subcommand: scans storage for invalid JSON (quarantining
 * offenders) and removes leftover temp files, then prints a JSON summary of
 * the result to stdout.
 */
export const StorageRepairCommand = cmd({
  command: "repair",
  describe: "scan storage, quarantine invalid JSON and clean temp files",
  builder: (yargs: Argv) =>
    yargs
      .option("dry-run", {
        type: "boolean",
        describe: "do not modify files, only report actions",
        default: false,
      })
      .option("prefix", {
        type: "array",
        describe: "limit scan to a subpath prefix",
      })
      .option("max-files", {
        type: "number",
        describe: "maximum number of files to process",
      })
      .option("max-mib", {
        type: "number",
        describe: "maximum total megabytes to process",
      })
      .option("report", {
        type: "string",
        describe: "write a JSON report to the given path",
      }),
  handler: async (argv: any) => {
    await bootstrap(process.cwd(), async () => {
      // Explicit nullish checks: a truthiness test would silently discard an
      // explicit `--max-files 0` / `--max-mib 0`. Storage.repair treats
      // non-positive limits as "unlimited", so the observable behavior is
      // unchanged, but zero is now passed through deliberately.
      const maxFiles = argv["max-files"] != null ? Number(argv["max-files"]) : undefined
      const maxMiB = argv["max-mib"] != null ? Number(argv["max-mib"]) : undefined
      const result = await Storage.repair({
        dryRun: Boolean(argv["dry-run"]),
        // yargs `array` options may contain numbers; normalize to strings.
        prefix: (argv.prefix as unknown[] | undefined)?.map(String),
        maxFiles,
        maxMiB,
        reportPath: argv.report as string | undefined,
      })
      // Machine-readable summary for scripting around the CLI.
      console.log(
        JSON.stringify(
          {
            quarantined: result.quarantined,
            tempRemoved: result.tempRemoved,
            quarantineRoot: result.quarantineRoot,
            skippedLocked: result.skippedLocked,
            reportPath: result.reportPath,
          },
          null,
          2,
        ),
      )
    })
  },
})

/**
 * `storage restore <path>` subcommand: moves quarantined files back into
 * live storage and prints a JSON summary of what was restored.
 */
export const StorageRestoreCommand = cmd({
  command: "restore <path>",
  describe: "restore quarantined files back to storage",
  builder: (yargs: Argv) => {
    return yargs
      .positional("path", { describe: "path to a quarantined file or directory", type: "string" })
      .option("dry-run", { type: "boolean", describe: "do not move files, only report", default: false })
  },
  handler: async (argv: any) => {
    await bootstrap(process.cwd(), async () => {
      const outcome = await Storage.restore({ path: argv.path as string, dryRun: !!argv["dry-run"] })
      const summary = {
        restored: outcome.restored,
        skippedLocked: outcome.skippedLocked,
        files: outcome.files,
      }
      console.log(JSON.stringify(summary, null, 2))
    })
  },
})
2 changes: 2 additions & 0 deletions packages/opencode/src/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@ import { EOL } from "os"
import { WebCommand } from "./cli/cmd/web"
import { PrCommand } from "./cli/cmd/pr"
import { SessionCommand } from "./cli/cmd/session"
import { StorageCommand } from "./cli/cmd/storage"

process.on("unhandledRejection", (e) => {
Log.Default.error("rejection", {
Expand Down Expand Up @@ -99,6 +100,7 @@ const cli = yargs(hideBin(process.argv))
.command(GithubCommand)
.command(PrCommand)
.command(SessionCommand)
.command(StorageCommand)
.fail((msg, err) => {
if (
msg?.startsWith("Unknown argument") ||
Expand Down
212 changes: 209 additions & 3 deletions packages/opencode/src/storage/storage.ts
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
import { Log } from "../util/log"
import path from "path"
import fs from "fs/promises"
import * as nodefs from "fs"
import { Global } from "../global"
import { lazy } from "../util/lazy"
import { Lock } from "../util/lock"
Expand All @@ -20,6 +21,13 @@ export namespace Storage {
}),
)

// Typed error raised when a storage write fails with ENOSPC (mapped in
// withErrorHandling); carries only a human-readable message.
export const DiskFullError = NamedError.create(
  "DiskFullError",
  z.object({
    message: z.string(),
  }),
)

const MIGRATIONS: Migration[] = [
async (dir) => {
const project = path.resolve(dir, "../project")
Expand Down Expand Up @@ -142,6 +150,7 @@ export namespace Storage {

const state = lazy(async () => {
const dir = path.join(Global.Path.data, "storage")
await fs.mkdir(dir, { recursive: true }).catch(() => {})
const migration = await Bun.file(path.join(dir, "migration"))
.json()
.then((x) => parseInt(x))
Expand Down Expand Up @@ -182,7 +191,7 @@ export namespace Storage {
using _ = await Lock.write(target)
const content = await Bun.file(target).json()
fn(content)
await Bun.write(target, JSON.stringify(content, null, 2))
await atomicWrite(target, JSON.stringify(content, null, 2))
return content as T
})
}
Expand All @@ -192,7 +201,7 @@ export namespace Storage {
const target = path.join(dir, ...key) + ".json"
return withErrorHandling(async () => {
using _ = await Lock.write(target)
await Bun.write(target, JSON.stringify(content, null, 2))
await atomicWrite(target, JSON.stringify(content, null, 2))
})
}

Expand All @@ -203,6 +212,9 @@ export namespace Storage {
if (errnoException.code === "ENOENT") {
throw new NotFoundError({ message: `Resource not found: ${errnoException.path}` })
}
if (errnoException.code === "ENOSPC") {
throw new DiskFullError({ message: `No space left on device while writing storage: ${errnoException.path}` })
}
throw e
})
}
Expand All @@ -216,11 +228,205 @@ export namespace Storage {
cwd: path.join(dir, ...prefix),
onlyFiles: true,
}),
).then((results) => results.map((x) => [...prefix, ...x.slice(0, -5).split(path.sep)]))
).then((results) => (results as string[]).map((x) => [...prefix, ...x.slice(0, -5).split(path.sep)]))
result.sort()
return result
} catch {
return []
}
}

/**
 * Durably write `data` to `target`: write to a unique temp file in the same
 * directory, fsync it, rename it over the target, then fsync the parent
 * directory so the rename itself survives a crash. The temp file is removed
 * if the rename fails.
 *
 * Uses FileHandle.sync() directly — it is a standard fs/promises API, so the
 * previous `(fh as any).sync` probe and raw-fd `nodefs.fsync` fallback were
 * unnecessary. The directory fsync is best-effort: opening a directory for
 * read fails on some platforms (e.g. Windows), and a failed directory sync
 * after a successful rename should not surface as a write error.
 */
async function atomicWrite(target: string, data: string) {
  const dir = path.dirname(target)
  await fs.mkdir(dir, { recursive: true })
  // Temp file lives next to the target so rename() never crosses filesystems.
  const tmp = path.join(dir, `.oc-${path.basename(target)}.${process.pid}.${Date.now()}.tmp`)
  const fh = await fs.open(tmp, "w")
  try {
    await fh.writeFile(data)
    // Flush file contents and metadata to disk before the rename.
    await fh.sync()
  } finally {
    await fh.close().catch(() => {})
  }
  try {
    await fs.rename(tmp, target)
    // Best-effort directory fsync to make the rename durable.
    const dirFh = await fs.open(dir, "r").catch(() => null)
    if (dirFh) {
      try {
        await dirFh.sync().catch(() => {})
      } finally {
        await dirFh.close().catch(() => {})
      }
    }
  } catch (e) {
    // Rename failed: clean up the orphaned temp file, then propagate.
    await fs.rm(tmp, { force: true }).catch(() => {})
    throw e
  }
}

/**
 * Scan the storage tree for corruption and clean up after interrupted writes.
 *
 * Two passes over `options.prefix` (or the whole storage dir):
 *  1. every `*.json` file is parsed; files that fail to parse are moved into
 *     a timestamped `quarantine/<ts>` directory (skipped when write-locked),
 *  2. leftover `.oc-*.tmp` files (from interrupted atomic writes) are removed.
 *
 * A JSON report of every action is always written — even on dry runs — to
 * `options.reportPath`, defaulting to `<quarantineRoot>/repair-report.json`
 * (so a dry run still creates the report's parent directory).
 *
 * @param options.dryRun     report actions without moving/removing files
 * @param options.prefix     storage key segments limiting the scan subtree
 * @param options.maxFiles   stop after this many JSON files (<= 0 = unlimited)
 * @param options.maxMiB     stop after this many MiB of JSON (<= 0 = unlimited)
 * @param options.reportPath explicit path for the JSON report
 * @returns counts plus the quarantine root and the report location
 */
export async function repair(options?: {
  dryRun?: boolean
  prefix?: string[]
  maxFiles?: number
  maxMiB?: number
  reportPath?: string
}) {
  const dir = await state().then((x) => x.dir)
  const ts = Date.now()
  // One quarantine root per invocation, keyed by timestamp.
  const quarantineRoot = path.join(dir, "quarantine", String(ts))
  const dryRun = !!options?.dryRun
  const base = options?.prefix?.length ? path.join(dir, ...options.prefix) : dir
  // Non-positive (or absent) budgets mean "unlimited".
  const maxFiles = options?.maxFiles && options.maxFiles > 0 ? options.maxFiles : Infinity
  const maxBytes = options?.maxMiB && options.maxMiB > 0 ? Math.floor(options.maxMiB * 1024 * 1024) : Infinity

  let quarantined = 0
  let tempRemoved = 0
  let skippedLocked = 0
  let processedFiles = 0
  let processedBytes = 0
  // Every decision (move/remove/skip, real or hypothetical) is recorded here.
  const report: { action: string; from: string; to?: string; reason?: string }[] = []

  if (!dryRun) await fs.mkdir(quarantineRoot, { recursive: true }).catch(() => {})

  // Pass 1: validate every JSON file under `base`, within the size/count budget.
  for await (const file of new Bun.Glob("**/*.json").scan({ cwd: base, absolute: true })) {
    if (processedFiles >= maxFiles || processedBytes >= maxBytes) break
    const stat = await fs.stat(file).catch(() => null as any)
    const size = stat?.size ?? 0
    // Stop before a file that would blow the byte budget (file stays unscanned).
    if (processedBytes + size > maxBytes) break
    processedFiles++
    processedBytes += size
    try {
      await Bun.file(file).json()
    } catch {
      // Unparseable JSON: quarantine it, unless another process holds the lock.
      const lock = Lock.tryWrite(file)
      if (!lock) {
        skippedLocked++
        report.push({ action: "skip", from: file, reason: "locked" })
        continue
      }
      try {
        // Preserve the file's storage-relative path inside the quarantine tree.
        const rel = path.relative(dir, file)
        const dest = path.join(quarantineRoot, rel)
        report.push({ action: dryRun ? "would-move" : "move", from: file, to: dest, reason: "invalid-json" })
        if (!dryRun) {
          await fs.mkdir(path.dirname(dest), { recursive: true })
          // rename() can fail across devices; fall back to copy + delete.
          // NOTE(review): the fallback swallows read errors into an empty
          // buffer before deleting the source — confirm this is intended.
          await fs.rename(file, dest).catch(async () => {
            const content = await Bun.file(file).arrayBuffer().catch(() => new ArrayBuffer(0))
            await Bun.write(dest, new Uint8Array(content))
            await fs.rm(file, { force: true }).catch(() => {})
          })
        }
        quarantined++
      } finally {
        // Lock.tryWrite's handle is released via its disposer.
        ;(lock as any)?.[Symbol.dispose]?.()
      }
    }
  }

  // Depth-first file walker; missing paths and unreadable dirs yield nothing.
  const walk = async function* (p: string): AsyncGenerator<string> {
    const s = await fs.stat(p).catch(() => null as any)
    if (!s) return
    if (s.isDirectory()) {
      for (const entry of await fs.readdir(p).catch(() => [] as string[])) {
        yield* walk(path.join(p, entry))
      }
      return
    }
    yield p
  }

  // Pass 2: remove temp files left behind by interrupted atomic writes
  // (atomicWrite names them `.oc-<basename>.<pid>.<ts>.tmp`).
  for await (const file of walk(base)) {
    const baseName = path.basename(file)
    if (baseName.startsWith(".oc-") && baseName.endsWith(".tmp")) {
      report.push({ action: dryRun ? "would-remove" : "remove", from: file, reason: "leftover-temp" })
      if (!dryRun) await fs.rm(file, { force: true }).catch(() => {})
      tempRemoved++
    }
  }

  // Write the report unconditionally (dry runs included).
  const finalReportPath = options?.reportPath || path.join(quarantineRoot, "repair-report.json")
  await fs.mkdir(path.dirname(finalReportPath), { recursive: true }).catch(() => {})
  await Bun.write(
    finalReportPath,
    JSON.stringify(
      {
        time: ts,
        base,
        quarantined,
        tempRemoved,
        skippedLocked,
        processedFiles,
        processedBytes,
        entries: report,
      },
      null,
      2,
    ),
  )

  log.info("storage.repair complete", { quarantined, tempRemoved, skippedLocked })
  return { quarantined, tempRemoved, skippedLocked, quarantineRoot, reportPath: finalReportPath }
}

/**
 * Restore quarantined files back into live storage.
 *
 * `input.path` may be a single quarantined file or a directory; it must live
 * under a `quarantine/<timestamp>` root created by `repair`. Each file is
 * moved back to its storage-relative location, skipping destinations that are
 * currently write-locked.
 *
 * Fixes over the previous version: a dry run no longer creates destination
 * directories (`fs.mkdir` ran unconditionally before), the early return now
 * includes `files` so callers always see a consistent shape, and the plain
 * `readdir` array is iterated with `for..of` instead of `for await`.
 *
 * @param input.path   quarantined file or directory to restore
 * @param input.dryRun count what would be restored without touching the disk
 * @returns restored/skipped counts and the list of destination paths
 */
export async function restore(input: { path: string; dryRun?: boolean }) {
  const dir = await state().then((x) => x.dir)
  const abs = path.resolve(input.path)
  const parts = abs.split(path.sep)
  // Locate the `quarantine/<timestamp>` root the input lives under; refuse
  // anything outside a quarantine tree.
  const qIndex = parts.lastIndexOf("quarantine")
  if (qIndex < 0 || qIndex + 1 >= parts.length) return { restored: 0, skippedLocked: 0, files: [] as string[] }
  const qRoot = parts.slice(0, qIndex + 2).join(path.sep)
  const restoredFiles: string[] = []
  let restored = 0
  let skippedLocked = 0

  // Depth-first file walker. Errors (e.g. a missing input path) propagate.
  const walker = async function* (p: string): AsyncGenerator<string> {
    const s = await fs.stat(p)
    if (s.isDirectory()) {
      for (const item of await fs.readdir(p)) yield* walker(path.join(p, item))
      return
    }
    yield p
  }

  for await (const src of walker(abs)) {
    const rel = path.relative(qRoot, src)
    // Never write outside the storage dir.
    if (rel.startsWith("..")) continue
    const dest = path.join(dir, rel)
    const lock = Lock.tryWrite(dest)
    if (!lock) {
      skippedLocked++
      continue
    }
    try {
      if (!input.dryRun) {
        await fs.mkdir(path.dirname(dest), { recursive: true })
        // rename() can fail across devices; fall back to copy + delete.
        await fs.rename(src, dest).catch(async () => {
          const content = await Bun.file(src)
            .arrayBuffer()
            .catch(() => new ArrayBuffer(0))
          await Bun.write(dest, new Uint8Array(content))
          await fs.rm(src, { force: true }).catch(() => {})
        })
      }
      restored++
      restoredFiles.push(dest)
    } finally {
      ;(lock as any)?.[Symbol.dispose]?.()
    }
  }

  log.info("storage.restore complete", { restored, skippedLocked })
  return { restored, skippedLocked, files: restoredFiles }
}
}
Loading