diff --git a/packages/db/src/backup-lib.ts b/packages/db/src/backup-lib.ts index 26f918c3d7..882047e70c 100644 --- a/packages/db/src/backup-lib.ts +++ b/packages/db/src/backup-lib.ts @@ -1,6 +1,6 @@ -import { existsSync, mkdirSync, readdirSync, statSync, unlinkSync } from "node:fs"; -import { readFile, writeFile } from "node:fs/promises"; +import { createReadStream, createWriteStream, existsSync, mkdirSync, readdirSync, statSync, unlinkSync } from "node:fs"; import { basename, resolve } from "node:path"; +import { createInterface } from "node:readline"; import postgres from "postgres"; export type RunDatabaseBackupOptions = { @@ -153,23 +153,30 @@ export async function runDatabaseBackup(opts: RunDatabaseBackupOptions): Promise try { await sql`SELECT 1`; - const lines: string[] = []; - const emit = (line: string) => lines.push(line); - const emitStatement = (statement: string) => { - emit(statement); - emit(STATEMENT_BREAKPOINT); + mkdirSync(opts.backupDir, { recursive: true }); + const backupFile = resolve(opts.backupDir, `${filenamePrefix}-${timestamp()}.sql`); + const stream = createWriteStream(backupFile, { encoding: "utf8" }); + + const writeLine = async (line: string): Promise<void> => { + const ok = stream.write(line + "\n"); + if (!ok) await new Promise((resolve) => stream.once("drain", resolve)); }; - const emitStatementBoundary = () => { - emit(STATEMENT_BREAKPOINT); + const emit = writeLine; + const emitStatement = async (statement: string) => { + await emit(statement); + await emit(STATEMENT_BREAKPOINT); + }; + const emitStatementBoundary = async () => { + await emit(STATEMENT_BREAKPOINT); }; - emit("-- Paperclip database backup"); - emit(`-- Created: ${new Date().toISOString()}`); - emit(""); - emitStatement("BEGIN;"); - emitStatement("SET LOCAL session_replication_role = replica;"); - emitStatement("SET LOCAL client_min_messages = warning;"); - emit(""); + await emit("-- Paperclip database backup"); + await emit(`-- Created: ${new Date().toISOString()}`); + 
await emit(""); + await emitStatement("BEGIN;"); + await emitStatement("SET LOCAL session_replication_role = replica;"); + await emitStatement("SET LOCAL client_min_messages = warning;"); + await emit(""); const allTables = await sql` SELECT table_schema AS schema_name, table_name AS tablename @@ -197,9 +204,9 @@ export async function runDatabaseBackup(opts: RunDatabaseBackupOptions): Promise for (const e of enums) { const labels = e.labels.map((l) => `'${l.replace(/'/g, "''")}'`).join(", "); - emitStatement(`CREATE TYPE "public"."${e.typname}" AS ENUM (${labels});`); + await emitStatement(`CREATE TYPE "public"."${e.typname}" AS ENUM (${labels});`); } - if (enums.length > 0) emit(""); + if (enums.length > 0) await emit(""); const allSequences = await sql` SELECT @@ -234,23 +241,23 @@ export async function runDatabaseBackup(opts: RunDatabaseBackupOptions): Promise for (const seq of sequences) schemas.add(seq.sequence_schema); const extraSchemas = [...schemas].filter((schemaName) => schemaName !== "public"); if (extraSchemas.length > 0) { - emit("-- Schemas"); + await emit("-- Schemas"); for (const schemaName of extraSchemas) { - emitStatement(`CREATE SCHEMA IF NOT EXISTS ${quoteIdentifier(schemaName)};`); + await emitStatement(`CREATE SCHEMA IF NOT EXISTS ${quoteIdentifier(schemaName)};`); } - emit(""); + await emit(""); } if (sequences.length > 0) { - emit("-- Sequences"); + await emit("-- Sequences"); for (const seq of sequences) { const qualifiedSequenceName = quoteQualifiedName(seq.sequence_schema, seq.sequence_name); - emitStatement(`DROP SEQUENCE IF EXISTS ${qualifiedSequenceName} CASCADE;`); - emitStatement( + await emitStatement(`DROP SEQUENCE IF EXISTS ${qualifiedSequenceName} CASCADE;`); + await emitStatement( `CREATE SEQUENCE ${qualifiedSequenceName} AS ${seq.data_type} INCREMENT BY ${seq.increment} MINVALUE ${seq.minimum_value} MAXVALUE ${seq.maximum_value} START WITH ${seq.start_value}${seq.cycle_option === "YES" ? 
" CYCLE" : " NO CYCLE"};`, ); } - emit(""); + await emit(""); } // Get full CREATE TABLE DDL via column info @@ -273,8 +280,8 @@ export async function runDatabaseBackup(opts: RunDatabaseBackupOptions): Promise ORDER BY ordinal_position `; - emit(`-- Table: ${schema_name}.${tablename}`); - emitStatement(`DROP TABLE IF EXISTS ${qualifiedTableName} CASCADE;`); + await emit(`-- Table: ${schema_name}.${tablename}`); + await emitStatement(`DROP TABLE IF EXISTS ${qualifiedTableName} CASCADE;`); const colDefs: string[] = []; for (const col of columns) { @@ -318,22 +325,22 @@ export async function runDatabaseBackup(opts: RunDatabaseBackupOptions): Promise colDefs.push(` CONSTRAINT "${p.constraint_name}" PRIMARY KEY (${cols})`); } - emit(`CREATE TABLE ${qualifiedTableName} (`); - emit(colDefs.join(",\n")); - emit(");"); - emitStatementBoundary(); - emit(""); + await emit(`CREATE TABLE ${qualifiedTableName} (`); + await emit(colDefs.join(",\n")); + await emit(");"); + await emitStatementBoundary(); + await emit(""); } const ownedSequences = sequences.filter((seq) => seq.owner_table && seq.owner_column); if (ownedSequences.length > 0) { - emit("-- Sequence ownership"); + await emit("-- Sequence ownership"); for (const seq of ownedSequences) { - emitStatement( + await emitStatement( `ALTER SEQUENCE ${quoteQualifiedName(seq.sequence_schema, seq.sequence_name)} OWNED BY ${quoteQualifiedName(seq.owner_schema ?? 
"public", seq.owner_table!)}.${quoteIdentifier(seq.owner_column!)};`, ); } - emit(""); + await emit(""); } // Foreign keys (after all tables created) @@ -378,15 +385,15 @@ export async function runDatabaseBackup(opts: RunDatabaseBackupOptions): Promise ); if (fks.length > 0) { - emit("-- Foreign keys"); + await emit("-- Foreign keys"); for (const fk of fks) { const srcCols = fk.source_columns.map((c) => `"${c}"`).join(", "); const tgtCols = fk.target_columns.map((c) => `"${c}"`).join(", "); - emitStatement( + await emitStatement( `ALTER TABLE ${quoteQualifiedName(fk.source_schema, fk.source_table)} ADD CONSTRAINT "${fk.constraint_name}" FOREIGN KEY (${srcCols}) REFERENCES ${quoteQualifiedName(fk.target_schema, fk.target_table)} (${tgtCols}) ON UPDATE ${fk.update_rule} ON DELETE ${fk.delete_rule};`, ); } - emit(""); + await emit(""); } // Unique constraints @@ -414,12 +421,12 @@ export async function runDatabaseBackup(opts: RunDatabaseBackupOptions): Promise const uniques = allUniqueConstraints.filter((entry) => includedTableNames.has(tableKey(entry.schema_name, entry.tablename))); if (uniques.length > 0) { - emit("-- Unique constraints"); + await emit("-- Unique constraints"); for (const u of uniques) { const cols = u.column_names.map((c) => `"${c}"`).join(", "); - emitStatement(`ALTER TABLE ${quoteQualifiedName(u.schema_name, u.tablename)} ADD CONSTRAINT "${u.constraint_name}" UNIQUE (${cols});`); + await emitStatement(`ALTER TABLE ${quoteQualifiedName(u.schema_name, u.tablename)} ADD CONSTRAINT "${u.constraint_name}" UNIQUE (${cols});`); } - emit(""); + await emit(""); } // Indexes (non-primary, non-unique-constraint) @@ -440,11 +447,11 @@ export async function runDatabaseBackup(opts: RunDatabaseBackupOptions): Promise const indexes = allIndexes.filter((entry) => includedTableNames.has(tableKey(entry.schema_name, entry.tablename))); if (indexes.length > 0) { - emit("-- Indexes"); + await emit("-- Indexes"); for (const idx of indexes) { - 
emitStatement(`${idx.indexdef};`); + await emitStatement(`${idx.indexdef};`); } - emit(""); + await emit(""); } // Dump data for each table @@ -462,7 +469,7 @@ export async function runDatabaseBackup(opts: RunDatabaseBackupOptions): Promise `; const colNames = cols.map((c) => `"${c.column_name}"`).join(", "); - emit(`-- Data for: ${schema_name}.${tablename} (${count[0]!.n} rows)`); + await emit(`-- Data for: ${schema_name}.${tablename} (${count[0]!.n} rows)`); const rows = await sql.unsafe(`SELECT * FROM ${qualifiedTableName}`).values(); const nullifiedColumns = nullifiedColumnsByTable.get(tablename) ?? new Set(); @@ -477,14 +484,14 @@ export async function runDatabaseBackup(opts: RunDatabaseBackupOptions): Promise if (typeof val === "object") return formatSqlLiteral(JSON.stringify(val)); return formatSqlLiteral(String(val)); }); - emitStatement(`INSERT INTO ${qualifiedTableName} (${colNames}) VALUES (${values.join(", ")});`); + await emitStatement(`INSERT INTO ${qualifiedTableName} (${colNames}) VALUES (${values.join(", ")});`); } - emit(""); + await emit(""); } // Sequence values if (sequences.length > 0) { - emit("-- Sequence values"); + await emit("-- Sequence values"); for (const seq of sequences) { const qualifiedSequenceName = quoteQualifiedName(seq.sequence_schema, seq.sequence_name); const val = await sql.unsafe<{ last_value: string; is_called: boolean }[]>( @@ -494,19 +501,16 @@ export async function runDatabaseBackup(opts: RunDatabaseBackupOptions): Promise seq.owner_table !== null && excludedTableNames.has(seq.owner_table); if (val[0] && !skipSequenceValue) { - emitStatement(`SELECT setval('${qualifiedSequenceName.replaceAll("'", "''")}', ${val[0].last_value}, ${val[0].is_called ? "true" : "false"});`); + await emitStatement(`SELECT setval('${qualifiedSequenceName.replaceAll("'", "''")}', ${val[0].last_value}, ${val[0].is_called ? 
"true" : "false"});`); } } - emit(""); + await emit(""); } - emitStatement("COMMIT;"); - emit(""); + await emitStatement("COMMIT;"); + await emit(""); - // Write the backup file - mkdirSync(opts.backupDir, { recursive: true }); - const backupFile = resolve(opts.backupDir, `${filenamePrefix}-${timestamp()}.sql`); - await writeFile(backupFile, lines.join("\n"), "utf8"); + await new Promise<void>((resolve, reject) => stream.end((err: Error | null | undefined) => (err ? reject(err) : resolve()))); const sizeBytes = statSync(backupFile).size; const prunedCount = pruneOldBackups(opts.backupDir, retentionDays, filenamePrefix); @@ -527,14 +531,27 @@ export async function runDatabaseRestore(opts: RunDatabaseRestoreOptions): Promi try { await sql`SELECT 1`; - const contents = await readFile(opts.backupFile, "utf8"); - const statements = contents - .split(STATEMENT_BREAKPOINT) - .map((statement) => statement.trim()) - .filter((statement) => statement.length > 0); - - for (const statement of statements) { - await sql.unsafe(statement).execute(); + + const rl = createInterface({ + input: createReadStream(opts.backupFile, { encoding: "utf8" }), + crlfDelay: Infinity, + }); + + let currentStatement = ""; + for await (const line of rl) { + if (line === STATEMENT_BREAKPOINT) { + const statement = currentStatement.trim(); + if (statement.length > 0) { + await sql.unsafe(statement).execute(); + } + currentStatement = ""; + } else { + currentStatement += (currentStatement.length > 0 ? "\n" : "") + line; + } + } + const remaining = currentStatement.trim(); + if (remaining.length > 0) { + await sql.unsafe(remaining).execute(); } } catch (error) { const statementPreview = typeof error === "object" && error !== null && typeof (error as Record).query === "string"