Fix DB backup crash on large databases by streaming writes #1909
base: master
| @@ -1,6 +1,6 @@ | ||
| import { existsSync, mkdirSync, readdirSync, statSync, unlinkSync } from "node:fs"; | ||
| import { readFile, writeFile } from "node:fs/promises"; | ||
| import { createReadStream, createWriteStream, existsSync, mkdirSync, readdirSync, statSync, unlinkSync } from "node:fs"; | ||
| import { basename, resolve } from "node:path"; | ||
| import { createInterface } from "node:readline"; | ||
| import postgres from "postgres"; | ||
|
|
||
| export type RunDatabaseBackupOptions = { | ||
|
|
@@ -153,23 +153,30 @@ export async function runDatabaseBackup(opts: RunDatabaseBackupOptions): Promise | |
| try { | ||
| await sql`SELECT 1`; | ||
|
|
||
| const lines: string[] = []; | ||
| const emit = (line: string) => lines.push(line); | ||
| const emitStatement = (statement: string) => { | ||
| emit(statement); | ||
| emit(STATEMENT_BREAKPOINT); | ||
| mkdirSync(opts.backupDir, { recursive: true }); | ||
| const backupFile = resolve(opts.backupDir, `${filenamePrefix}-${timestamp()}.sql`); | ||
| const stream = createWriteStream(backupFile, { encoding: "utf8" }); | ||
|
|
||
| const writeLine = async (line: string): Promise<void> => { | ||
| const ok = stream.write(line + "\n"); | ||
| if (!ok) await new Promise<void>((resolve) => stream.once("drain", resolve)); | ||
| }; | ||
|
Comment on lines +158 to +163

Contributor

**No `error` event listener on the write stream**

`fs.WriteStream` emits an `'error'` event when a write fails (e.g., disk full, permission denied). Without a registered listener, this fires as an unhandled EventEmitter error in Node.js, which crashes the entire process — ironically the same outcome the PR is trying to prevent.

The `stream.end()` callback at line 513 only catches errors from the final flush/close, not from the intermediate `stream.write()` calls inside `writeLine`. A mid-stream disk error would still kill the process.

Attach an error listener before writing:

```typescript
const stream = createWriteStream(backupFile, { encoding: "utf8" });
// Capture the first write error so it can be re-thrown via writeLine
let streamError: Error | null = null;
stream.once("error", (err) => { streamError = err; });

const writeLine = async (line: string): Promise<void> => {
  if (streamError) throw streamError;
  const ok = stream.write(line + "\n");
  if (!ok) await new Promise<void>((resolve) => stream.once("drain", resolve));
  if (streamError) throw streamError;
};
```
||
| const emitStatementBoundary = () => { | ||
| emit(STATEMENT_BREAKPOINT); | ||
| const emit = writeLine; | ||
| const emitStatement = async (statement: string) => { | ||
| await emit(statement); | ||
| await emit(STATEMENT_BREAKPOINT); | ||
| }; | ||
| const emitStatementBoundary = async () => { | ||
| await emit(STATEMENT_BREAKPOINT); | ||
| }; | ||
|
|
||
| emit("-- Paperclip database backup"); | ||
| emit(`-- Created: ${new Date().toISOString()}`); | ||
| emit(""); | ||
| emitStatement("BEGIN;"); | ||
| emitStatement("SET LOCAL session_replication_role = replica;"); | ||
| emitStatement("SET LOCAL client_min_messages = warning;"); | ||
| emit(""); | ||
| await emit("-- Paperclip database backup"); | ||
| await emit(`-- Created: ${new Date().toISOString()}`); | ||
| await emit(""); | ||
| await emitStatement("BEGIN;"); | ||
| await emitStatement("SET LOCAL session_replication_role = replica;"); | ||
| await emitStatement("SET LOCAL client_min_messages = warning;"); | ||
| await emit(""); | ||
|
|
||
| const allTables = await sql<TableDefinition[]>` | ||
| SELECT table_schema AS schema_name, table_name AS tablename | ||
|
|
@@ -197,9 +204,9 @@ export async function runDatabaseBackup(opts: RunDatabaseBackupOptions): Promise | |
|
|
||
| for (const e of enums) { | ||
| const labels = e.labels.map((l) => `'${l.replace(/'/g, "''")}'`).join(", "); | ||
| emitStatement(`CREATE TYPE "public"."${e.typname}" AS ENUM (${labels});`); | ||
| await emitStatement(`CREATE TYPE "public"."${e.typname}" AS ENUM (${labels});`); | ||
| } | ||
| if (enums.length > 0) emit(""); | ||
| if (enums.length > 0) await emit(""); | ||
|
|
||
| const allSequences = await sql<SequenceDefinition[]>` | ||
| SELECT | ||
|
|
@@ -234,23 +241,23 @@ export async function runDatabaseBackup(opts: RunDatabaseBackupOptions): Promise | |
| for (const seq of sequences) schemas.add(seq.sequence_schema); | ||
| const extraSchemas = [...schemas].filter((schemaName) => schemaName !== "public"); | ||
| if (extraSchemas.length > 0) { | ||
| emit("-- Schemas"); | ||
| await emit("-- Schemas"); | ||
| for (const schemaName of extraSchemas) { | ||
| emitStatement(`CREATE SCHEMA IF NOT EXISTS ${quoteIdentifier(schemaName)};`); | ||
| await emitStatement(`CREATE SCHEMA IF NOT EXISTS ${quoteIdentifier(schemaName)};`); | ||
| } | ||
| emit(""); | ||
| await emit(""); | ||
| } | ||
|
|
||
| if (sequences.length > 0) { | ||
| emit("-- Sequences"); | ||
| await emit("-- Sequences"); | ||
| for (const seq of sequences) { | ||
| const qualifiedSequenceName = quoteQualifiedName(seq.sequence_schema, seq.sequence_name); | ||
| emitStatement(`DROP SEQUENCE IF EXISTS ${qualifiedSequenceName} CASCADE;`); | ||
| emitStatement( | ||
| await emitStatement(`DROP SEQUENCE IF EXISTS ${qualifiedSequenceName} CASCADE;`); | ||
| await emitStatement( | ||
| `CREATE SEQUENCE ${qualifiedSequenceName} AS ${seq.data_type} INCREMENT BY ${seq.increment} MINVALUE ${seq.minimum_value} MAXVALUE ${seq.maximum_value} START WITH ${seq.start_value}${seq.cycle_option === "YES" ? " CYCLE" : " NO CYCLE"};`, | ||
| ); | ||
| } | ||
| emit(""); | ||
| await emit(""); | ||
| } | ||
|
|
||
| // Get full CREATE TABLE DDL via column info | ||
|
|
@@ -273,8 +280,8 @@ export async function runDatabaseBackup(opts: RunDatabaseBackupOptions): Promise | |
| ORDER BY ordinal_position | ||
| `; | ||
|
|
||
| emit(`-- Table: ${schema_name}.${tablename}`); | ||
| emitStatement(`DROP TABLE IF EXISTS ${qualifiedTableName} CASCADE;`); | ||
| await emit(`-- Table: ${schema_name}.${tablename}`); | ||
| await emitStatement(`DROP TABLE IF EXISTS ${qualifiedTableName} CASCADE;`); | ||
|
|
||
| const colDefs: string[] = []; | ||
| for (const col of columns) { | ||
|
|
@@ -318,22 +325,22 @@ export async function runDatabaseBackup(opts: RunDatabaseBackupOptions): Promise | |
| colDefs.push(` CONSTRAINT "${p.constraint_name}" PRIMARY KEY (${cols})`); | ||
| } | ||
|
|
||
| emit(`CREATE TABLE ${qualifiedTableName} (`); | ||
| emit(colDefs.join(",\n")); | ||
| emit(");"); | ||
| emitStatementBoundary(); | ||
| emit(""); | ||
| await emit(`CREATE TABLE ${qualifiedTableName} (`); | ||
| await emit(colDefs.join(",\n")); | ||
| await emit(");"); | ||
| await emitStatementBoundary(); | ||
| await emit(""); | ||
| } | ||
|
|
||
| const ownedSequences = sequences.filter((seq) => seq.owner_table && seq.owner_column); | ||
| if (ownedSequences.length > 0) { | ||
| emit("-- Sequence ownership"); | ||
| await emit("-- Sequence ownership"); | ||
| for (const seq of ownedSequences) { | ||
| emitStatement( | ||
| await emitStatement( | ||
| `ALTER SEQUENCE ${quoteQualifiedName(seq.sequence_schema, seq.sequence_name)} OWNED BY ${quoteQualifiedName(seq.owner_schema ?? "public", seq.owner_table!)}.${quoteIdentifier(seq.owner_column!)};`, | ||
| ); | ||
| } | ||
| emit(""); | ||
| await emit(""); | ||
| } | ||
|
|
||
| // Foreign keys (after all tables created) | ||
|
|
@@ -378,15 +385,15 @@ export async function runDatabaseBackup(opts: RunDatabaseBackupOptions): Promise | |
| ); | ||
|
|
||
| if (fks.length > 0) { | ||
| emit("-- Foreign keys"); | ||
| await emit("-- Foreign keys"); | ||
| for (const fk of fks) { | ||
| const srcCols = fk.source_columns.map((c) => `"${c}"`).join(", "); | ||
| const tgtCols = fk.target_columns.map((c) => `"${c}"`).join(", "); | ||
| emitStatement( | ||
| await emitStatement( | ||
| `ALTER TABLE ${quoteQualifiedName(fk.source_schema, fk.source_table)} ADD CONSTRAINT "${fk.constraint_name}" FOREIGN KEY (${srcCols}) REFERENCES ${quoteQualifiedName(fk.target_schema, fk.target_table)} (${tgtCols}) ON UPDATE ${fk.update_rule} ON DELETE ${fk.delete_rule};`, | ||
| ); | ||
| } | ||
| emit(""); | ||
| await emit(""); | ||
| } | ||
|
|
||
| // Unique constraints | ||
|
|
@@ -414,12 +421,12 @@ export async function runDatabaseBackup(opts: RunDatabaseBackupOptions): Promise | |
| const uniques = allUniqueConstraints.filter((entry) => includedTableNames.has(tableKey(entry.schema_name, entry.tablename))); | ||
|
|
||
| if (uniques.length > 0) { | ||
| emit("-- Unique constraints"); | ||
| await emit("-- Unique constraints"); | ||
| for (const u of uniques) { | ||
| const cols = u.column_names.map((c) => `"${c}"`).join(", "); | ||
| emitStatement(`ALTER TABLE ${quoteQualifiedName(u.schema_name, u.tablename)} ADD CONSTRAINT "${u.constraint_name}" UNIQUE (${cols});`); | ||
| await emitStatement(`ALTER TABLE ${quoteQualifiedName(u.schema_name, u.tablename)} ADD CONSTRAINT "${u.constraint_name}" UNIQUE (${cols});`); | ||
| } | ||
| emit(""); | ||
| await emit(""); | ||
| } | ||
|
|
||
| // Indexes (non-primary, non-unique-constraint) | ||
|
|
@@ -440,11 +447,11 @@ export async function runDatabaseBackup(opts: RunDatabaseBackupOptions): Promise | |
| const indexes = allIndexes.filter((entry) => includedTableNames.has(tableKey(entry.schema_name, entry.tablename))); | ||
|
|
||
| if (indexes.length > 0) { | ||
| emit("-- Indexes"); | ||
| await emit("-- Indexes"); | ||
| for (const idx of indexes) { | ||
| emitStatement(`${idx.indexdef};`); | ||
| await emitStatement(`${idx.indexdef};`); | ||
| } | ||
| emit(""); | ||
| await emit(""); | ||
| } | ||
|
|
||
| // Dump data for each table | ||
|
|
@@ -462,7 +469,7 @@ export async function runDatabaseBackup(opts: RunDatabaseBackupOptions): Promise | |
| `; | ||
| const colNames = cols.map((c) => `"${c.column_name}"`).join(", "); | ||
|
|
||
| emit(`-- Data for: ${schema_name}.${tablename} (${count[0]!.n} rows)`); | ||
| await emit(`-- Data for: ${schema_name}.${tablename} (${count[0]!.n} rows)`); | ||
|
|
||
| const rows = await sql.unsafe(`SELECT * FROM ${qualifiedTableName}`).values(); | ||
| const nullifiedColumns = nullifiedColumnsByTable.get(tablename) ?? new Set<string>(); | ||
|
|
@@ -477,14 +484,14 @@ export async function runDatabaseBackup(opts: RunDatabaseBackupOptions): Promise | |
| if (typeof val === "object") return formatSqlLiteral(JSON.stringify(val)); | ||
| return formatSqlLiteral(String(val)); | ||
| }); | ||
| emitStatement(`INSERT INTO ${qualifiedTableName} (${colNames}) VALUES (${values.join(", ")});`); | ||
| await emitStatement(`INSERT INTO ${qualifiedTableName} (${colNames}) VALUES (${values.join(", ")});`); | ||
| } | ||
| emit(""); | ||
| await emit(""); | ||
| } | ||
|
|
||
| // Sequence values | ||
| if (sequences.length > 0) { | ||
| emit("-- Sequence values"); | ||
| await emit("-- Sequence values"); | ||
| for (const seq of sequences) { | ||
| const qualifiedSequenceName = quoteQualifiedName(seq.sequence_schema, seq.sequence_name); | ||
| const val = await sql.unsafe<{ last_value: string; is_called: boolean }[]>( | ||
|
|
@@ -494,19 +501,16 @@ export async function runDatabaseBackup(opts: RunDatabaseBackupOptions): Promise | |
| seq.owner_table !== null | ||
| && excludedTableNames.has(seq.owner_table); | ||
| if (val[0] && !skipSequenceValue) { | ||
| emitStatement(`SELECT setval('${qualifiedSequenceName.replaceAll("'", "''")}', ${val[0].last_value}, ${val[0].is_called ? "true" : "false"});`); | ||
| await emitStatement(`SELECT setval('${qualifiedSequenceName.replaceAll("'", "''")}', ${val[0].last_value}, ${val[0].is_called ? "true" : "false"});`); | ||
| } | ||
| } | ||
| emit(""); | ||
| await emit(""); | ||
| } | ||
|
|
||
| emitStatement("COMMIT;"); | ||
| emit(""); | ||
| await emitStatement("COMMIT;"); | ||
| await emit(""); | ||
|
|
||
| // Write the backup file | ||
| mkdirSync(opts.backupDir, { recursive: true }); | ||
| const backupFile = resolve(opts.backupDir, `${filenamePrefix}-${timestamp()}.sql`); | ||
| await writeFile(backupFile, lines.join("\n"), "utf8"); | ||
| await new Promise<void>((resolve, reject) => stream.end((err: Error | null | undefined) => (err ? reject(err) : resolve()))); | ||
|
|
||
| const sizeBytes = statSync(backupFile).size; | ||
| const prunedCount = pruneOldBackups(opts.backupDir, retentionDays, filenamePrefix); | ||
|
|
@@ -527,14 +531,27 @@ export async function runDatabaseRestore(opts: RunDatabaseRestoreOptions): Promi | |
|
|
||
| try { | ||
| await sql`SELECT 1`; | ||
| const contents = await readFile(opts.backupFile, "utf8"); | ||
| const statements = contents | ||
| .split(STATEMENT_BREAKPOINT) | ||
| .map((statement) => statement.trim()) | ||
| .filter((statement) => statement.length > 0); | ||
|
|
||
| for (const statement of statements) { | ||
| await sql.unsafe(statement).execute(); | ||
|
|
||
| const rl = createInterface({ | ||
| input: createReadStream(opts.backupFile, { encoding: "utf8" }), | ||
| crlfDelay: Infinity, | ||
| }); | ||
|
|
||
| let currentStatement = ""; | ||
| for await (const line of rl) { | ||
| if (line === STATEMENT_BREAKPOINT) { | ||
| const statement = currentStatement.trim(); | ||
| if (statement.length > 0) { | ||
| await sql.unsafe(statement).execute(); | ||
| } | ||
| currentStatement = ""; | ||
| } else { | ||
| currentStatement += (currentStatement.length > 0 ? "\n" : "") + line; | ||
| } | ||
| } | ||
| const remaining = currentStatement.trim(); | ||
| if (remaining.length > 0) { | ||
| await sql.unsafe(remaining).execute(); | ||
| } | ||
| } catch (error) { | ||
| const statementPreview = typeof error === "object" && error !== null && typeof (error as Record<string, unknown>).query === "string" | ||
|
|
||
The write stream is opened at the very start of the `try` block (line 158), but the `finally` block only calls `await sql.end()`. If any SQL query or write call throws an error mid-backup, the stream is never closed/destroyed and the partial `.sql` file is silently left on disk.

This is a regression from the previous implementation, where `writeFile` was called only after all SQL was fully buffered — meaning a failure during SQL queries left no file at all. Now a partial file (with a normal timestamp-based filename) persists, which could be picked up by restore tooling or confuse operators.

The stream should be cleaned up in a `finally`-style guard.
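A minimal sketch of such a guard, assuming the `createWriteStream`/`writeLine` pattern from the diff above. The `writeBackup` wrapper, its `produce` callback, and the unlink-on-failure policy are illustrative choices, not code from this PR:

```typescript
import { createWriteStream, existsSync, unlinkSync } from "node:fs";

// Sketch: open the stream, run the backup body, and on ANY failure destroy
// the stream and remove the partial file before re-throwing. The wrapper
// name and the unlink policy are assumptions for illustration.
async function writeBackup(
  backupFile: string,
  produce: (writeLine: (line: string) => Promise<void>) => Promise<void>,
): Promise<void> {
  const stream = createWriteStream(backupFile, { encoding: "utf8" });
  // Capture the first stream error so writeLine can surface it.
  let streamError: Error | null = null;
  stream.once("error", (err) => { streamError = err; });

  const writeLine = async (line: string): Promise<void> => {
    if (streamError) throw streamError;
    const ok = stream.write(line + "\n");
    // Honor backpressure: wait for 'drain' when the internal buffer is full.
    if (!ok) await new Promise<void>((resolve) => stream.once("drain", resolve));
    if (streamError) throw streamError;
  };

  try {
    await produce(writeLine);
    // Flush and close; the callback receives any final write/close error.
    await new Promise<void>((resolve, reject) =>
      stream.end((err: Error | null | undefined) => (err ? reject(err) : resolve())),
    );
  } catch (error) {
    stream.destroy(); // release the file descriptor even if end() never ran
    if (existsSync(backupFile)) unlinkSync(backupFile); // drop the partial file
    throw error;
  }
}
```

On success the file is flushed and closed normally; on failure no timestamp-named partial `.sql` file is left for restore tooling to pick up.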