diff --git a/CHANGELOG.md b/CHANGELOG.md index c42ac800..96a6d07e 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -2,6 +2,13 @@ ## [Unreleased] +### Changes + +- **PostgreSQL backend**: add PostgreSQL + pgvector as an alternative storage + backend for shared and multi-agent deployments where multiple processes need + concurrent access to the same QMD index. Configure with `QMD_BACKEND=postgres` + and `QMD_POSTGRES_URL`. + ## [2.0.1] - 2026-03-10 ### Changes diff --git a/README.md b/README.md index 3c39c6ce..fe463b5f 100644 --- a/README.md +++ b/README.md @@ -731,7 +731,10 @@ qmd cleanup ## Data Storage -Index stored in: `~/.cache/qmd/index.sqlite` +Default index (SQLite) stored in: `~/.cache/qmd/index.sqlite` + +When using PostgreSQL (`QMD_BACKEND=postgres`), QMD stores all index data in the +database specified by `QMD_POSTGRES_URL`. ### Schema @@ -750,6 +753,26 @@ llm_cache -- Cached LLM responses (query expansion, rerank scores) | Variable | Default | Description | |----------|---------|-------------| | `XDG_CACHE_HOME` | `~/.cache` | Cache directory location | +| `QMD_BACKEND` | `sqlite` | Storage backend (`sqlite` or `postgres`) | +| `QMD_POSTGRES_URL` | _(unset)_ | PostgreSQL URL used when `QMD_BACKEND=postgres` | + +### PostgreSQL Backend + +QMD supports PostgreSQL + pgvector as an alternative backend for shared or +multi-agent deployments where multiple processes need concurrent access to the +same index. SQLite remains the simplest default for single-user local use. + +```sh +export QMD_BACKEND=postgres +export QMD_POSTGRES_URL=postgresql://user:pass@localhost:5432/qmd + +# initialize / migrate schema on first run +qmd status +``` + +Requirements: +- PostgreSQL with `pgvector` installed +- `vector` extension available in the target database (`CREATE EXTENSION vector;`) ## How It Works diff --git a/bun.lock b/bun.lock index 9cb44a60..b8788f94 100644 --- a/bun.lock +++ b/bun.lock @@ -10,6 +10,7 @@ "fast-glob": "^3.3.0", "node-llama-cpp": "^3.17.1", "picomatch": "^4.0.0", + "postgres": "^3.4.8", "sqlite-vec": "^0.1.7-alpha.2", "yaml": "^2.8.2", "zod": "^4.2.1", @@ -22,8 +23,9 @@ "optionalDependencies": { "sqlite-vec-darwin-arm64": "^0.1.7-alpha.2", "sqlite-vec-darwin-x64": "^0.1.7-alpha.2", + "sqlite-vec-linux-arm64": "^0.1.7-alpha.2", "sqlite-vec-linux-x64": "^0.1.7-alpha.2", - "sqlite-vec-win32-x64": "^0.1.7-alpha.2", + "sqlite-vec-windows-x64": "^0.1.7-alpha.2", }, "peerDependencies": { "typescript": "^5.9.3", @@ -539,6 +541,8 @@ "postcss": ["postcss@8.5.6", "", { "dependencies": { "nanoid": "^3.3.11", "picocolors": "^1.1.1", "source-map-js": "^1.2.1" } }, "sha512-3Ybi1tAuwAP9s0r1UQ2J4n5Y0G05bJkpUIO0/bI9MhwmD70S5aTWbXGBwxHrelT+XM1k6dM0pk+SwNkpTRN7Pg=="], + "postgres": ["postgres@3.4.8", "", {}, "sha512-d+JFcLM17njZaOLkv6SCev7uoLaBtfK86vMUXhW1Z4glPWh4jozno9APvW/XKFJ3CCxVoC7OL38BqRydtu5nGg=="], + "prebuild-install": ["prebuild-install@7.1.3", "", { "dependencies": { "detect-libc": "^2.0.0", "expand-template": "^2.0.3", "github-from-package": "0.0.0", "minimist": "^1.2.3", "mkdirp-classic": "^0.5.3", "napi-build-utils": "^2.0.0", "node-abi": "^3.3.0", "pump": "^3.0.0", "rc": "^1.2.7", "simple-get": "^4.0.0", "tar-fs": "^2.0.0", "tunnel-agent": "^0.6.0" }, "bin": { "prebuild-install": "bin.js" } }, "sha512-8Mf2cbV7x1cXPUILADGI3wuhfqWvtiLA1iclTDbFRZkgRQS0NqsPZphna9V+HyTEadheuPmjaJMsbzKQFOzLug=="], "pretty-bytes": ["pretty-bytes@6.1.1", "", {}, "sha512-mQUvGU6aUFQ+rNvTIAcZuWGRT9a6f6Yrg9bHs4ImKF+HZCEK+plBvnAZYSIQztknZF2qnzNtr6F8s0+IuptdlQ=="], diff --git a/package.json b/package.json index 95c51a4b..84f2faa0 100644 --- a/package.json +++ b/package.json @@ -50,6 +50,7 @@ "fast-glob": "^3.3.0", "node-llama-cpp": "^3.17.1", "picomatch": "^4.0.0", + "postgres": "^3.4.8", "sqlite-vec": "^0.1.7-alpha.2", "yaml": "^2.8.2", "zod": "^4.2.1" diff --git a/src/cli/qmd.ts b/src/cli/qmd.ts index 52a076da..d85c25f5 100755 --- a/src/cli/qmd.ts +++ b/src/cli/qmd.ts @@ -308,10 +308,18 @@ async function showStatus(): Promise { // Index size let indexSize = 0; - try { - const stat = statSync(dbPath).size; - indexSize = stat; - } catch { } + const backend = process.env.QMD_BACKEND || "sqlite"; + if (backend === "postgres") { + try { + const sizeResult = db.prepare(`SELECT pg_database_size(current_database()) as size`).get() as { size: number }; + indexSize = sizeResult?.size ?? 0; + } catch { } + } else { + try { + const stat = statSync(dbPath).size; + indexSize = stat; + } catch { } + } // Collections info (from YAML + database stats) const collections = listCollections(db); @@ -325,7 +333,14 @@ async function showStatus(): Promise { const mostRecent = db.prepare(`SELECT MAX(modified_at) as latest FROM documents WHERE active = 1`).get() as { latest: string | null }; console.log(`${c.bold}QMD Status${c.reset}\n`); - console.log(`Index: ${dbPath}`); + if (backend === "postgres") { + const pgUrl = process.env.QMD_POSTGRES_URL || "postgresql://localhost/qmd"; + // Show just host/db, strip credentials + const dbName = pgUrl.split("/").pop()?.split("?")[0] || "qmd"; + console.log(`Backend: PostgreSQL (${dbName})`); + } else { + console.log(`Index: ${dbPath}`); + } console.log(`Size: ${formatBytes(indexSize)}`); // MCP daemon status (check PID file liveness) diff --git a/src/db.ts b/src/db.ts index 1e5e5709..21be302c 100644 --- a/src/db.ts +++ b/src/db.ts @@ -1,13 +1,56 @@ /** - * db.ts - Cross-runtime SQLite compatibility layer + * db.ts - Cross-runtime SQLite compatibility layer + backend selection. * * Provides a unified Database export that works under both Bun (bun:sqlite) * and Node.js (better-sqlite3). The APIs are nearly identical — the main * difference is the import path. + * + * Backend selection (QMD_BACKEND env var): + * - 'sqlite' (default): uses SQLite via bun:sqlite or better-sqlite3 + * - 'postgres': uses PostgreSQL via pg-worker + Atomics sync wrapper */ export const isBun = typeof globalThis.Bun !== "undefined"; +// --------------------------------------------------------------------------- +// Backend selection +// --------------------------------------------------------------------------- + +export type Backend = 'sqlite' | 'postgres'; + +/** + * Return the active backend. Reads QMD_BACKEND env; defaults to 'sqlite'. + */ +export function getBackend(): Backend { + const v = process.env.QMD_BACKEND; + if (v === 'postgres') return 'postgres'; + return 'sqlite'; +} + +// Loaded eagerly so backend can be switched before createStore() in tests. +let _openPgDatabase: ((url: string) => Database) | null = null; + +try { + const pg = await import('./pg.js'); + _openPgDatabase = pg.openPgDatabase; +} catch (err) { + if (getBackend() === 'postgres') { + throw err; + } +} + +/** + * Open a PostgreSQL database. Returns a Database-compatible object that + * wraps a worker thread + Atomics for synchronous-looking access. + * Only available when QMD_BACKEND=postgres. + */ +export function openPgDatabase(url: string): Database { + if (!_openPgDatabase) { + throw new Error('PostgreSQL backend is unavailable in this runtime.'); + } + return _openPgDatabase(url); +} + let _Database: any; let _sqliteVecLoad: (db: any) => void; diff --git a/src/index.ts b/src/index.ts index b921b514..2d39039a 100644 --- a/src/index.ts +++ b/src/index.ts @@ -66,6 +66,7 @@ import { import { LlamaCpp, } from "./llm.js"; +import type { Backend } from "./db.js"; import { setConfigSource, loadConfig, @@ -106,7 +107,7 @@ export type { }; // Re-export the internal Store type for advanced consumers -export type { InternalStore }; +export type { Backend, InternalStore }; // Re-export utility functions used by frontends export { extractSnippet, addLineNumbers, DEFAULT_MULTI_GET_MAX_BYTES }; @@ -194,7 +195,7 @@ export interface ExpandQueryOptions { * DB state (useful for reopening a previously-configured store). */ export interface StoreOptions { - /** Path to the SQLite database file */ + /** Path to the SQLite database file or PostgreSQL connection URL */ dbPath: string; /** Path to a YAML config file (mutually exclusive with `config`) */ configPath?: string; @@ -212,7 +213,9 @@ export interface StoreOptions { export interface QMDStore { /** The underlying internal store (for advanced use) */ readonly internal: InternalStore; - /** Path to the SQLite database */ + /** Active storage backend */ + readonly backend: Backend; + /** Active SQLite database path or PostgreSQL connection URL */ readonly dbPath: string; // ── Search ────────────────────────────────────────────────────────── @@ -343,7 +346,7 @@ export async function createStore(options: StoreOptions): Promise { // Track whether we have a YAML config path for write-through const hasYamlConfig = !!options.configPath; - // Sync config into SQLite store_collections + // Sync config into the store_collections table for the active backend if (options.configPath) { // YAML mode: inject config source for write-through, sync to DB setConfigSource({ configPath: options.configPath }); @@ -366,6 +369,7 @@ export async function createStore(options: StoreOptions): Promise { const store: QMDStore = { internal, + backend: internal.backend, dbPath: internal.dbPath, // Search @@ -413,7 +417,7 @@ export async function createStore(options: StoreOptions): Promise { }, multiGet: async (pattern, opts) => internal.findDocuments(pattern, opts), - // Collection Management — write to SQLite + write-through to YAML/inline if configured + // Collection Management — write to store tables + write-through to YAML/inline if configured addCollection: async (name, opts) => { upsertStoreCollection(db, name, { path: opts.path, pattern: opts.pattern, ignore: opts.ignore }); if (hasYamlConfig || options.config) { @@ -440,7 +444,7 @@ export async function createStore(options: StoreOptions): Promise { return collections.filter(c => c.includeByDefault).map(c => c.name); }, - // Context Management — write to SQLite + write-through to YAML/inline if configured + // Context Management — write to store tables + write-through to YAML/inline if configured addContext: async (collectionName, pathPrefix, contextText) => { const result = updateStoreContext(db, collectionName, pathPrefix, contextText); if (hasYamlConfig || options.config) { @@ -464,7 +468,7 @@ export async function createStore(options: StoreOptions): Promise { getGlobalContext: async () => getStoreGlobalContext(db), listContexts: async () => getStoreContexts(db), - // Indexing — reads collections from SQLite + // Indexing — reads collections from store_collections update: async (updateOpts) => { const collections = getStoreCollections(db); const filtered = updateOpts?.collections diff --git a/src/pg-worker.ts b/src/pg-worker.ts new file mode 100644 index 00000000..99720f77 --- /dev/null +++ b/src/pg-worker.ts @@ -0,0 +1,100 @@ +/** + * pg-worker.ts — Worker thread that manages a PostgreSQL connection pool. + * + * Runs inside a worker_threads Worker. The main thread sends query messages + * and blocks on a SharedArrayBuffer using Atomics. This worker executes the + * async postgres query, writes the result to the message port, then + * signals the main thread via Atomics.notify(). + * + * Protocol: + * Main → Worker: { type, query, params } + * Worker → Main: { result, error } + * Worker: Atomics.store(sharedInt32, 0, 1); Atomics.notify(sharedInt32, 0) + */ + +import { workerData } from 'node:worker_threads'; +import postgres from 'postgres'; + +const pgUrl: string = workerData.pgUrl; +const sharedBuffer: SharedArrayBuffer = workerData.sharedBuffer; +const port = workerData.port; + +const sharedInt32 = new Int32Array(sharedBuffer); + +// Single connection — one query at a time (matching synchronous caller semantics) +const sql = postgres(pgUrl, { + max: 1, + idle_timeout: 60, + connect_timeout: 10, + // Parse int8 (bigint) as regular JS numbers to match SQLite behavior + types: { + bigint: { + to: 20, + from: [20], + serialize: (x: bigint | number | string) => String(x), + parse: (x: string) => Number(x), + }, + }, +}); + +/** + * Convert BigInt values in a row to Number to ensure postMessage + * compatibility and match SQLite's numeric behavior. + */ +function normalizeRow(row: Record): Record { + const out: Record = {}; + for (const [k, v] of Object.entries(row)) { + out[k] = typeof v === 'bigint' ? Number(v) : v; + } + return out; +} + +function normalizeRows(rows: readonly Record[]): Record[] { + return rows.map(normalizeRow); +} + +type QueryMessage = { + type: 'exec' | 'run' | 'get' | 'all' | 'close'; + query: string; + params: unknown[]; +}; + +port.on('message', async (msg: QueryMessage) => { + const { type, query, params } = msg; + let result: unknown = null; + let error: string | null = null; + + try { + if (type === 'close') { + await sql.end({ timeout: 5 }); + result = null; + } else if (type === 'exec') { + await sql.unsafe(query, []); + result = { changes: 0, lastInsertRowid: 0 }; + } else if (type === 'run') { + const rows = await sql.unsafe(query, params as postgres.ParameterOrJSON[]); + result = { + changes: (rows as unknown as { count: number }).count ?? 0, + lastInsertRowid: 0, + }; + } else if (type === 'get') { + const rows = await sql.unsafe(query, params as postgres.ParameterOrJSON[]); + result = rows.length > 0 ? normalizeRow(rows[0] as Record) : null; + } else if (type === 'all') { + const rows = await sql.unsafe(query, params as postgres.ParameterOrJSON[]); + result = normalizeRows(rows as readonly Record[]); + } + } catch (err: unknown) { + error = err instanceof Error ? err.message : String(err); + if (type !== 'close') { + console.error('[pg-worker] query error:', error, '\nSQL:', query, '\nParams:', params); + } + } + + // Post result before signalling so main thread can receiveMessageOnPort + port.postMessage({ result, error }); + + // Signal main thread that result is ready + Atomics.store(sharedInt32, 0, 1); + Atomics.notify(sharedInt32, 0, 1); +}); diff --git a/src/pg.ts b/src/pg.ts new file mode 100644 index 00000000..b56f674f --- /dev/null +++ b/src/pg.ts @@ -0,0 +1,190 @@ +/** + * pg.ts — PostgreSQL adapter implementing the Database/Statement interfaces. + * + * Uses a Worker thread to run async postgres queries and Atomics.wait() to + * block the caller, exposing a synchronous API compatible with SQLite adapters. + * + * SQL differences handled here: + * - ? placeholders -> $1, $2, ... + * - Float32Array params -> pgvector literal '[f1,f2,...]' + * - loadExtension() is a no-op + */ + +import { + MessageChannel, + receiveMessageOnPort, + type MessagePort, + Worker, +} from "node:worker_threads"; +import { fileURLToPath } from "node:url"; +import type { Database, Statement } from "./db.js"; + +type QueryType = "exec" | "run" | "get" | "all" | "close"; + +type WorkerResponse = { + result: unknown; + error: string | null; +}; + +/** + * Translate SQLite-style `?` placeholders to PostgreSQL `$N` placeholders. + * Skips placeholders inside SQL string literals. + */ +function translatePlaceholders(sql: string): string { + let i = 0; + let index = 0; + let out = ""; + + while (i < sql.length) { + const ch = sql[i]!; + + if (ch === "'") { + out += ch; + i++; + while (i < sql.length) { + const sch = sql[i]!; + out += sch; + if (sch === "'") { + if (sql[i + 1] === "'") { + out += "'"; + i += 2; + } else { + i++; + break; + } + } else { + i++; + } + } + continue; + } + + if (ch === "?") { + index += 1; + out += `$${index}`; + i++; + continue; + } + + out += ch; + i++; + } + + return out; +} + +/** + * Convert Float32Array params to pgvector text literal. + */ +function convertParams(params: unknown[]): unknown[] { + return params.map((param) => { + if (param instanceof Float32Array) { + return `[${Array.from(param).join(",")}]`; + } + return param; + }); +} + +function resolveWorkerPath(): string { + const thisFile = fileURLToPath(import.meta.url); + const workerFile = thisFile.endsWith(".ts") ? "pg-worker.ts" : "pg-worker.js"; + return fileURLToPath(new URL(`./${workerFile}`, import.meta.url)); +} + +class PgStatement implements Statement { + constructor( + private readonly db: PgDatabase, + private readonly sql: string, + ) {} + + run(...params: unknown[]): { changes: number; lastInsertRowid: number | bigint } { + return this.db.syncQuery("run", this.sql, params) as { + changes: number; + lastInsertRowid: number | bigint; + }; + } + + get(...params: unknown[]): unknown { + return this.db.syncQuery("get", this.sql, params); + } + + all(...params: unknown[]): unknown[] { + return this.db.syncQuery("all", this.sql, params) as unknown[]; + } +} + +export class PgDatabase implements Database { + private readonly worker: Worker; + private readonly port: MessagePort; + private readonly waitState: Int32Array; + + constructor(url: string) { + const sharedBuffer = new SharedArrayBuffer(4); + this.waitState = new Int32Array(sharedBuffer); + + const { port1, port2 } = new MessageChannel(); + this.port = port1; + + const workerPath = resolveWorkerPath(); + const isBunRuntime = typeof (globalThis as Record).Bun !== "undefined"; + const workerNeedsTsx = !isBunRuntime && workerPath.endsWith(".ts"); + + this.worker = new Worker(workerPath, { + workerData: { + pgUrl: url, + sharedBuffer, + port: port2, + }, + transferList: [port2], + execArgv: workerNeedsTsx ? ["--import", "tsx/esm"] : [], + }); + } + + syncQuery(type: QueryType, query: string, params: unknown[]): unknown { + Atomics.store(this.waitState, 0, 0); + + this.port.postMessage({ + type, + query, + params: convertParams(params), + }); + + Atomics.wait(this.waitState, 0, 0); + + const response = receiveMessageOnPort(this.port); + if (!response?.message) { + throw new Error("[PgDatabase] no response from postgres worker"); + } + + const payload = response.message as WorkerResponse; + if (payload.error) { + throw new Error(payload.error); + } + + return payload.result; + } + + exec(sql: string): void { + this.syncQuery("exec", sql, []); + } + + prepare(sql: string): Statement { + return new PgStatement(this, translatePlaceholders(sql)); + } + + // PostgreSQL extensions are managed by CREATE EXTENSION. + loadExtension(_path: string): void {} + + close(): void { + try { + this.syncQuery("close", "", []); + } catch { + // Ignore close errors (worker may already be terminating). + } + void this.worker.terminate(); + } +} + +export function openPgDatabase(url: string): Database { + return new PgDatabase(url); +} diff --git a/src/store.ts b/src/store.ts index aa5fae4f..f741db2b 100644 --- a/src/store.ts +++ b/src/store.ts @@ -11,8 +11,8 @@ * const store = createStore(); */ -import { openDatabase, loadSqliteVec } from "./db.js"; -import type { Database } from "./db.js"; +import { getBackend, openDatabase, openPgDatabase, loadSqliteVec } from "./db.js"; +import type { Backend, Database } from "./db.js"; import picomatch from "picomatch"; import { createHash } from "crypto"; import { readFileSync, realpathSync, statSync, mkdirSync } from "node:fs"; @@ -46,6 +46,31 @@ export const DEFAULT_QUERY_MODEL = "Qwen/Qwen3-1.7B"; export const DEFAULT_GLOB = "**/*.md"; export const DEFAULT_MULTI_GET_MAX_BYTES = 10 * 1024; // 10KB +// Track backend per opened DB so operations remain correct even if env changes. +const dbBackendMap = new WeakMap(); + +function registerDbBackend(db: Database, backend: Backend): void { + dbBackendMap.set(db, backend); +} + +function getDbBackend(db: Database): Backend { + return dbBackendMap.get(db) ?? getBackend(); +} + +function isPostgresDb(db: Database): boolean { + return getDbBackend(db) === "postgres"; +} + +function getPostgresUrl(): string { + const url = process.env.QMD_POSTGRES_URL?.trim(); + if (!url) { + throw new Error( + 'QMD_BACKEND=postgres requires QMD_POSTGRES_URL (e.g. postgresql://user@localhost/qmd).', + ); + } + return url; +} + // Chunking: 900 tokens per chunk with 15% overlap // Increased from 800 to accommodate smart chunking finding natural break points export const CHUNK_SIZE_TOKENS = 900; @@ -627,6 +652,15 @@ export function verifySqliteVecLoaded(db: Database): void { let _sqliteVecAvailable: boolean | null = null; function initializeDatabase(db: Database): void { + if (isPostgresDb(db)) { + initializePostgresDatabase(db); + _sqliteVecAvailable = false; + return; + } + initializeSqliteDatabase(db); +} + +function initializeSqliteDatabase(db: Database): void { try { loadSqliteVec(db); verifySqliteVecLoaded(db); @@ -767,6 +801,67 @@ function initializeDatabase(db: Database): void { `); } +function initializePostgresDatabase(db: Database): void { + db.exec(`CREATE EXTENSION IF NOT EXISTS vector`); + + // Drop legacy tables that are now managed in YAML + db.exec(`DROP TABLE IF EXISTS path_contexts`); + db.exec(`DROP TABLE IF EXISTS collections`); + + db.exec(` + CREATE TABLE IF NOT EXISTS content ( + hash TEXT PRIMARY KEY, + doc TEXT NOT NULL, + created_at TEXT NOT NULL, + tsv tsvector GENERATED ALWAYS AS ( + to_tsvector('english', COALESCE(doc, '')) + ) STORED + ) + `); + + db.exec(` + CREATE TABLE IF NOT EXISTS documents ( + id BIGSERIAL PRIMARY KEY, + collection TEXT NOT NULL, + path TEXT NOT NULL, + title TEXT NOT NULL, + hash TEXT NOT NULL, + created_at TEXT NOT NULL, + modified_at TEXT NOT NULL, + active INTEGER NOT NULL DEFAULT 1, + FOREIGN KEY (hash) REFERENCES content(hash) ON DELETE CASCADE, + UNIQUE(collection, path) + ) + `); + + db.exec(`CREATE INDEX IF NOT EXISTS idx_documents_collection ON documents(collection, active)`); + db.exec(`CREATE INDEX IF NOT EXISTS idx_documents_hash ON documents(hash)`); + db.exec(`CREATE INDEX IF NOT EXISTS idx_documents_path ON documents(path, active)`); + + db.exec(` + CREATE TABLE IF NOT EXISTS llm_cache ( + hash TEXT PRIMARY KEY, + result TEXT NOT NULL, + created_at TEXT NOT NULL + ) + `); + + db.exec(` + CREATE TABLE IF NOT EXISTS content_vectors ( + hash TEXT NOT NULL, + seq INTEGER NOT NULL DEFAULT 0, + pos INTEGER NOT NULL DEFAULT 0, + model TEXT NOT NULL, + embedded_at TEXT NOT NULL, + PRIMARY KEY (hash, seq) + ) + `); + db.exec(`CREATE INDEX IF NOT EXISTS idx_content_vectors_hash ON content_vectors(hash)`); + + // Native Postgres full-text search index. + db.exec(`CREATE INDEX IF NOT EXISTS idx_content_tsv_gin ON content USING GIN (tsv)`); +} + // ============================================================================= // Store Collections — DB accessor functions // ============================================================================= @@ -946,7 +1041,7 @@ export function isSqliteVecAvailable(): boolean { return _sqliteVecAvailable === true; } -function ensureVecTableInternal(db: Database, dimensions: number): void { +function ensureSqliteVecTableInternal(db: Database, dimensions: number): void { if (!_sqliteVecAvailable) { throw new Error("sqlite-vec is not available. Vector operations require a SQLite build with extension loading support."); } @@ -963,6 +1058,54 @@ function ensureVecTableInternal(db: Database, dimensions: number): void { db.exec(`CREATE VIRTUAL TABLE vectors_vec USING vec0(hash_seq TEXT PRIMARY KEY, embedding float[${dimensions}] distance_metric=cosine)`); } +function ensurePgVectorTableInternal(db: Database, dimensions: number): void { + db.exec(`CREATE EXTENSION IF NOT EXISTS vector`); + const tableInfo = db.prepare(` + SELECT format_type(a.atttypid, a.atttypmod) AS embedding_type + FROM pg_attribute a + JOIN pg_class c ON c.oid = a.attrelid + JOIN pg_namespace n ON n.oid = c.relnamespace + WHERE n.nspname = current_schema() + AND c.relname = 'vectors' + AND a.attname = 'embedding' + AND a.attnum > 0 + AND NOT a.attisdropped + LIMIT 1 + `).get() as { embedding_type: string } | null; + + if (tableInfo) { + const match = tableInfo.embedding_type.match(/^vector\((\d+)\)$/); + const existingDims = match?.[1] ? parseInt(match[1], 10) : null; + if (existingDims !== dimensions) { + db.exec(`DROP TABLE IF EXISTS vectors`); + } + } + + db.exec(` + CREATE TABLE IF NOT EXISTS vectors ( + hash_seq TEXT PRIMARY KEY, + embedding vector(${dimensions}) NOT NULL + ) + `); + db.exec(`CREATE INDEX IF NOT EXISTS idx_vectors_embedding_hnsw ON vectors USING hnsw (embedding vector_cosine_ops)`); +} + +function ensureVecTableInternal(db: Database, dimensions: number): void { + if (isPostgresDb(db)) { + ensurePgVectorTableInternal(db, dimensions); + return; + } + ensureSqliteVecTableInternal(db, dimensions); +} + +function hasVectorIndex(db: Database): boolean { + if (isPostgresDb(db)) { + const row = db.prepare(`SELECT to_regclass(current_schema() || '.vectors') AS table_name`).get() as { table_name: string | null } | null; + return !!row?.table_name; + } + return !!db.prepare(`SELECT name FROM sqlite_master WHERE type='table' AND name='vectors_vec'`).get(); +} + // ============================================================================= // Store Factory // ============================================================================= @@ -970,6 +1113,7 @@ function ensureVecTableInternal(db: Database, dimensions: number): void { export type Store = { db: Database; dbPath: string; + backend: Backend; /** Optional LlamaCpp instance for this store (overrides the global singleton) */ llm?: LlamaCpp; close: () => void; @@ -1311,19 +1455,26 @@ export async function generateEmbeddings( /** * Create a new store instance with the given database path. - * If no path is provided, uses the default path (~/.cache/qmd/index.sqlite). + * If no path is provided: + * - sqlite backend: ~/.cache/qmd/index.sqlite + * - postgres backend: QMD_POSTGRES_URL * - * @param dbPath - Path to the SQLite database file + * @param dbPath - SQLite file path or PostgreSQL connection URL * @returns Store instance with all methods bound to the database */ export function createStore(dbPath?: string): Store { - const resolvedPath = dbPath || getDefaultDbPath(); - const db = openDatabase(resolvedPath); + const backend = getBackend(); + const resolvedPath = dbPath || (backend === "postgres" ? getPostgresUrl() : getDefaultDbPath()); + const db = backend === "postgres" + ? openPgDatabase(resolvedPath) + : openDatabase(resolvedPath); + registerDbBackend(db, backend); initializeDatabase(db); const store: Store = { db, dbPath: resolvedPath, + backend, close: () => db.close(), ensureVecTable: (dimensions: number) => ensureVecTableInternal(db, dimensions), @@ -1637,7 +1788,17 @@ export function getCachedResult(db: Database, cacheKey: string): string | null { export function setCachedResult(db: Database, cacheKey: string, result: string): void { const now = new Date().toISOString(); - db.prepare(`INSERT OR REPLACE INTO llm_cache (hash, result, created_at) VALUES (?, ?, ?)`).run(cacheKey, result, now); + if (isPostgresDb(db)) { + db.prepare(` + INSERT INTO llm_cache (hash, result, created_at) + VALUES (?, ?, ?) + ON CONFLICT(hash) DO UPDATE SET + result = excluded.result, + created_at = excluded.created_at + `).run(cacheKey, result, now); + } else { + db.prepare(`INSERT OR REPLACE INTO llm_cache (hash, result, created_at) VALUES (?, ?, ?)`).run(cacheKey, result, now); + } if (Math.random() < 0.01) { db.exec(`DELETE FROM llm_cache WHERE hash NOT IN (SELECT hash FROM llm_cache ORDER BY created_at DESC LIMIT 1000)`); } @@ -1686,12 +1847,7 @@ export function cleanupOrphanedContent(db: Database): number { * Returns the number of orphaned embedding chunks deleted. */ export function cleanupOrphanedVectors(db: Database): number { - // Check if vectors_vec table exists - const tableExists = db.prepare(` - SELECT name FROM sqlite_master WHERE type='table' AND name='vectors_vec' - `).get(); - - if (!tableExists) { + if (!hasVectorIndex(db)) { return 0; } @@ -1707,15 +1863,25 @@ export function cleanupOrphanedVectors(db: Database): number { return 0; } - // Delete from vectors_vec first - db.exec(` - DELETE FROM vectors_vec WHERE hash_seq IN ( - SELECT cv.hash || '_' || cv.seq FROM content_vectors cv - WHERE NOT EXISTS ( + if (isPostgresDb(db)) { + db.exec(` + DELETE FROM vectors v + USING content_vectors cv + WHERE v.hash_seq = cv.hash || '_' || cv.seq + AND NOT EXISTS ( SELECT 1 FROM documents d WHERE d.hash = cv.hash AND d.active = 1 ) - ) - `); + `); + } else { + db.exec(` + DELETE FROM vectors_vec WHERE hash_seq IN ( + SELECT cv.hash || '_' || cv.seq FROM content_vectors cv + WHERE NOT EXISTS ( + SELECT 1 FROM documents d WHERE d.hash = cv.hash AND d.active = 1 + ) + ) + `); + } // Delete from content_vectors db.exec(` @@ -1786,8 +1952,16 @@ export function extractTitle(content: string, filename: string): string { * Uses INSERT OR IGNORE so duplicate hashes are skipped. */ export function insertContent(db: Database, hash: string, content: string, createdAt: string): void { - db.prepare(`INSERT OR IGNORE INTO content (hash, doc, created_at) VALUES (?, ?, ?)`) - .run(hash, content, createdAt); + if (isPostgresDb(db)) { + db.prepare(` + INSERT INTO content (hash, doc, created_at) + VALUES (?, ?, ?) + ON CONFLICT(hash) DO NOTHING + `).run(hash, content, createdAt); + } else { + db.prepare(`INSERT OR IGNORE INTO content (hash, doc, created_at) VALUES (?, ?, ?)`) + .run(hash, content, createdAt); + } } /** @@ -2616,6 +2790,70 @@ export function validateLexQuery(query: string): string | null { } export function searchFTS(db: Database, query: string, limit: number = 20, collectionName?: string): SearchResult[] { + if (isPostgresDb(db)) { + const rawQuery = query.trim(); + if (!rawQuery) return []; + + let sql = ` + WITH q AS ( + SELECT websearch_to_tsquery('english', ?) AS tsq + ) + SELECT + 'qmd://' || d.collection || '/' || d.path as filepath, + d.collection || '/' || d.path as display_path, + d.title, + content.doc as body, + d.hash, + ts_rank(content.tsv, q.tsq) as bm25_score + FROM q + JOIN documents d ON d.active = 1 + JOIN content ON content.hash = d.hash + WHERE content.tsv @@ q.tsq + `; + const params: (string | number)[] = [rawQuery]; + + if (collectionName) { + sql += ` AND d.collection = ?`; + params.push(String(collectionName)); + } + + sql += ` ORDER BY bm25_score DESC LIMIT ?`; + params.push(limit); + + try { + const rows = db.prepare(sql).all(...params) as { + filepath: string; + display_path: string; + title: string; + body: string; + hash: string; + bm25_score: number; + }[]; + + return rows.map((row) => { + const rowCollectionName = row.filepath.split("//")[1]?.split("/")[0] || ""; + const rawScore = Number(row.bm25_score); + const score = rawScore > 0 ? rawScore / (1 + rawScore) : 0; + return { + filepath: row.filepath, + displayPath: row.display_path, + title: row.title, + hash: row.hash, + docid: getDocid(row.hash), + collectionName: rowCollectionName, + modifiedAt: "", + bodyLength: row.body.length, + body: row.body, + context: getContextForFile(db, row.filepath), + score, + source: "fts" as const, + }; + }); + } catch { + return []; + } + } + const ftsQuery = buildFTS5Query(query); if (!ftsQuery) return []; @@ -2645,11 +2883,11 @@ export function searchFTS(db: Database, query: string, limit: number = 20, colle const rows = db.prepare(sql).all(...params) as { filepath: string; display_path: string; title: string; body: string; hash: string; bm25_score: number }[]; return rows.map(row => { - const collectionName = row.filepath.split('//')[1]?.split('/')[0] || ""; + const rowCollectionName = row.filepath.split('//')[1]?.split('/')[0] || ""; // Convert bm25 (negative, lower is better) into a stable [0..1) score where higher is better. // FTS5 BM25 scores are negative (e.g., -10 is strong, -2 is weak). - // |x| / (1 + |x|) maps: strong(-10)→0.91, medium(-2)→0.67, weak(-0.5)→0.33, none(0)→0. - // Monotonic and query-independent — no per-query normalization needed. + // |x| / (1 + |x|) maps: strong(-10)->0.91, medium(-2)->0.67, weak(-0.5)->0.33, none(0)->0. + // Monotonic and query-independent - no per-query normalization needed. const score = Math.abs(row.bm25_score) / (1 + Math.abs(row.bm25_score)); return { filepath: row.filepath, @@ -2657,7 +2895,7 @@ export function searchFTS(db: Database, query: string, limit: number = 20, colle title: row.title, hash: row.hash, docid: getDocid(row.hash), - collectionName, + collectionName: rowCollectionName, modifiedAt: "", // Not available in FTS query bodyLength: row.body.length, body: row.body, @@ -2673,12 +2911,82 @@ export function searchFTS(db: Database, query: string, limit: number = 20, colle // ============================================================================= export async function searchVec(db: Database, query: string, model: string, limit: number = 20, collectionName?: string, session?: ILLMSession, precomputedEmbedding?: number[]): Promise { - const tableExists = db.prepare(`SELECT name FROM sqlite_master WHERE type='table' AND name='vectors_vec'`).get(); - if (!tableExists) return []; + if (!hasVectorIndex(db)) return []; const embedding = precomputedEmbedding ?? await getEmbedding(query, model, true, session); if (!embedding) return []; + if (isPostgresDb(db)) { + let sql = ` + WITH query_vec AS (SELECT ?::vector AS embedding) + SELECT + v.hash_seq, + cv.hash, + cv.pos, + 'qmd://' || d.collection || '/' || d.path as filepath, + d.collection || '/' || d.path as display_path, + d.title, + content.doc as body, + (v.embedding <=> query_vec.embedding) as distance + FROM query_vec + JOIN vectors v ON TRUE + JOIN content_vectors cv ON cv.hash || '_' || cv.seq = v.hash_seq + JOIN documents d ON d.hash = cv.hash AND d.active = 1 + JOIN content ON content.hash = d.hash + `; + const params: (string | number | Float32Array)[] = [new Float32Array(embedding)]; + + if (collectionName) { + sql += ` WHERE d.collection = ?`; + params.push(collectionName); + } + + sql += ` ORDER BY distance ASC LIMIT ?`; + params.push(limit * 3); + + const docRows = db.prepare(sql).all(...params) as { + hash_seq: string; + hash: string; + pos: number; + filepath: string; + display_path: string; + title: string; + body: string; + distance: number; + }[]; + + const seen = new Map(); + for (const row of docRows) { + const distance = Number(row.distance); + const existing = seen.get(row.filepath); + if (!existing || distance < existing.bestDist) { + seen.set(row.filepath, { row, bestDist: distance }); + } + } + + return Array.from(seen.values()) + .sort((a, b) => a.bestDist - b.bestDist) + .slice(0, limit) + .map(({ row, bestDist }) => { + const rowCollectionName = row.filepath.split('//')[1]?.split('/')[0] || ""; + return { + filepath: row.filepath, + displayPath: row.display_path, + title: row.title, + hash: row.hash, + docid: getDocid(row.hash), + collectionName: rowCollectionName, + modifiedAt: "", + bodyLength: row.body.length, + body: row.body, + context: getContextForFile(db, row.filepath), + score: 1 - bestDist, + source: "vec" as const, + chunkPos: row.pos, + }; + }); + } + // IMPORTANT: We use a two-step query approach here because sqlite-vec virtual tables // hang indefinitely when combined with JOINs in the same query. Do NOT try to // "optimize" this by combining into a single query with JOINs - it will break. @@ -2776,6 +3084,16 @@ async function getEmbedding(text: string, model: string, isQuery: boolean, sessi * Returns hash, document body, and a sample path for display purposes. */ export function getHashesForEmbedding(db: Database): { hash: string; body: string; path: string }[] { + if (isPostgresDb(db)) { + return db.prepare(` + SELECT d.hash, MIN(c.doc) as body, MIN(d.path) as path + FROM documents d + JOIN content c ON d.hash = c.hash + LEFT JOIN content_vectors v ON d.hash = v.hash AND v.seq = 0 + WHERE d.active = 1 AND v.hash IS NULL + GROUP BY d.hash + `).all() as { hash: string; body: string; path: string }[]; + } return db.prepare(` SELECT d.hash, c.doc as body, MIN(d.path) as path FROM documents d @@ -2788,16 +3106,16 @@ export function getHashesForEmbedding(db: Database): { hash: string; body: strin /** * Clear all embeddings from the database (force re-index). - * Deletes all rows from content_vectors and drops the vectors_vec table. + * Deletes all rows from content_vectors and drops the backend vector table. */ export function clearAllEmbeddings(db: Database): void { db.exec(`DELETE FROM content_vectors`); - db.exec(`DROP TABLE IF EXISTS vectors_vec`); + db.exec(isPostgresDb(db) ? `DROP TABLE IF EXISTS vectors` : `DROP TABLE IF EXISTS vectors_vec`); } /** - * Insert a single embedding into both content_vectors and vectors_vec tables. - * The hash_seq key is formatted as "hash_seq" for the vectors_vec table. + * Insert a single embedding into content_vectors and the backend vector table. + * The hash_seq key is formatted as "hash_seq" for vector lookup rows. */ export function insertEmbedding( db: Database, @@ -2809,6 +3127,26 @@ export function insertEmbedding( embeddedAt: string ): void { const hashSeq = `${hash}_${seq}`; + if (isPostgresDb(db)) { + const insertVecStmt = db.prepare(` + INSERT INTO vectors (hash_seq, embedding) + VALUES (?, ?::vector) + ON CONFLICT(hash_seq) DO UPDATE SET + embedding = excluded.embedding + `); + const insertContentVectorStmt = db.prepare(` + INSERT INTO content_vectors (hash, seq, pos, model, embedded_at) + VALUES (?, ?, ?, ?, ?) + ON CONFLICT(hash, seq) DO UPDATE SET + pos = excluded.pos, + model = excluded.model, + embedded_at = excluded.embedded_at + `); + insertVecStmt.run(hashSeq, embedding); + insertContentVectorStmt.run(hash, seq, pos, model, embeddedAt); + return; + } + const insertVecStmt = db.prepare(`INSERT OR REPLACE INTO vectors_vec (hash_seq, embedding) VALUES (?, ?)`); const insertContentVectorStmt = db.prepare(`INSERT OR REPLACE INTO content_vectors (hash, seq, pos, model, embedded_at) VALUES (?, ?, ?, ?, ?)`); @@ -3350,7 +3688,7 @@ export function getStatus(db: Database): IndexStatus { const totalDocs = (db.prepare(`SELECT COUNT(*) as c FROM documents WHERE active = 1`).get() as { c: number }).c; const needsEmbedding = getHashesNeedingEmbedding(db); - const hasVectors = !!db.prepare(`SELECT name FROM sqlite_master WHERE type='table' AND name='vectors_vec'`).get(); + const hasVectors = hasVectorIndex(db); return { totalDocuments: totalDocs, @@ -3581,9 +3919,7 @@ export async function hybridQuery( const rankedLists: RankedResult[][] = []; const rankedListMeta: RankedListMeta[] = []; const docidMap = new Map(); // filepath -> docid - const hasVectors = !!store.db.prepare( - `SELECT name FROM sqlite_master WHERE type='table' AND name='vectors_vec'` - ).get(); + const hasVectors = hasVectorIndex(store.db); // Step 1: BM25 probe — strong signal skips expensive LLM expansion // When intent is provided, disable strong-signal bypass — the obvious BM25 @@ -3881,9 +4217,7 @@ export async function vectorSearchQuery( const collection = options?.collection; const intent = options?.intent; - const hasVectors = !!store.db.prepare( - `SELECT name FROM sqlite_master WHERE type='table' AND name='vectors_vec'` - ).get(); + const hasVectors = hasVectorIndex(store.db); if (!hasVectors) return []; // Expand query — filter to vec/hyde only (lex queries target FTS, not vector) @@ -3997,9 +4331,7 @@ export async function structuredSearch( const rankedLists: RankedResult[][] = []; const rankedListMeta: RankedListMeta[] = []; const docidMap = new Map(); // filepath -> docid - const hasVectors = !!store.db.prepare( - `SELECT name FROM sqlite_master WHERE type='table' AND name='vectors_vec'` - ).get(); + const hasVectors = hasVectorIndex(store.db); // Helper to run search across collections (or all if undefined) const collectionList = collections ?? [undefined]; // undefined = all collections diff --git a/test/llm.test.ts b/test/llm.test.ts index b5de9e08..78cab7de 100644 --- a/test/llm.test.ts +++ b/test/llm.test.ts @@ -17,6 +17,7 @@ import { SessionReleasedError, type RerankDocument, type ILLMSession, + type EmbeddingResult, } from "../src/llm.js"; // ============================================================================= @@ -246,24 +247,30 @@ describe.skipIf(!!process.env.CI)("LlamaCpp Integration", () => { expect(results).toHaveLength(0); }); - test("batch is faster than sequential", async () => { - const texts = Array(10).fill(null).map((_, i) => `Document number ${i} with content`); + test("splits work across embedding contexts and preserves input order", async () => { + const freshLlm = new LlamaCpp({}) as any; + const ctx1 = { + getEmbeddingFor: vi.fn(async (text: string) => ({ + vector: Float32Array.from([text.length, 1]), + })), + }; + const ctx2 = { + getEmbeddingFor: vi.fn(async (text: string) => ({ + vector: Float32Array.from([text.length, 2]), + })), + }; - // Time batch - const batchStart = Date.now(); - await llm.embedBatch(texts); - const batchTime = Date.now() - batchStart; + freshLlm.touchActivity = vi.fn(); + freshLlm.ensureEmbedContexts = vi.fn().mockResolvedValue([ctx1, ctx2]); - // Time sequential - const seqStart = Date.now(); - for (const text of texts) { - await llm.embed(text); - } - const seqTime = Date.now() - seqStart; + const texts = ["one", "two", "three", "four"]; + const results: (EmbeddingResult | null)[] = await freshLlm.embedBatch(texts); - console.log(`Batch: ${batchTime}ms, Sequential: ${seqTime}ms`); - // Performance is machine/load dependent. We only assert batch isn't drastically worse. - expect(batchTime).toBeLessThanOrEqual(seqTime * 3); + expect(freshLlm.ensureEmbedContexts).toHaveBeenCalledTimes(1); + expect(ctx1.getEmbeddingFor.mock.calls.map(([text]: [string]) => text)).toEqual(["one", "two"]); + expect(ctx2.getEmbeddingFor.mock.calls.map(([text]: [string]) => text)).toEqual(["three", "four"]); + expect(results.map((result) => result?.embedding[0])).toEqual([3, 3, 5, 4]); + expect(results.map((result) => result?.embedding[1])).toEqual([1, 1, 2, 2]); }); test("handles concurrent embedBatch calls on fresh instance without race condition", async () => { diff --git a/test/store.postgres.test.ts b/test/store.postgres.test.ts new file mode 100644 index 00000000..86610a25 --- /dev/null +++ b/test/store.postgres.test.ts @@ -0,0 +1,207 @@ +import { afterEach, beforeAll, beforeEach, describe, expect, test } from "vitest"; +import { mkdtemp, rm, writeFile } from "node:fs/promises"; +import { tmpdir } from "node:os"; +import { join } from "node:path"; +import postgres from "postgres"; +import YAML from "yaml"; + +// Run manually with: +// QMD_ENABLE_POSTGRES_TESTS=1 \ +// QMD_TEST_POSTGRES_URL=postgresql://localhost/qmd_test \ +// bun test --preload ./src/test-preload.ts test/store.postgres.test.ts +const RUN_POSTGRES_TESTS = process.env.QMD_ENABLE_POSTGRES_TESTS === "1"; + +// Opt-in integration tests. Defaults assume local PostgreSQL socket/TCP access +// with the current OS user; override in CI or custom environments as needed. +const TEST_POSTGRES_URL = process.env.QMD_TEST_POSTGRES_URL ?? "postgresql://localhost/qmd_test"; +const TEST_POSTGRES_DB = getDatabaseName(TEST_POSTGRES_URL); +const ADMIN_POSTGRES_URL = process.env.QMD_TEST_POSTGRES_ADMIN_URL ?? replaceDatabase(TEST_POSTGRES_URL, "postgres"); + +let testConfigDir = ""; + +function getDatabaseName(connectionString: string): string { + const url = new URL(connectionString); + const databaseName = decodeURIComponent(url.pathname.replace(/^\/+/, "")); + if (!databaseName) { + throw new Error(`Expected database name in QMD_TEST_POSTGRES_URL: ${connectionString}`); + } + return databaseName; +} + +function replaceDatabase(connectionString: string, databaseName: string): string { + const url = new URL(connectionString); + url.pathname = `/${databaseName}`; + return url.toString(); +} + +function quoteIdentifier(identifier: string): string { + return `"${identifier.replace(/"/g, '""')}"`; +} + +async function ensureTestDatabase(): Promise { + const admin = postgres(ADMIN_POSTGRES_URL, { + max: 1, + idle_timeout: 1, + connect_timeout: 5, + }); + + try { + await admin.unsafe(`CREATE DATABASE ${quoteIdentifier(TEST_POSTGRES_DB)}`); + } catch (err) { + const message = err instanceof Error ? err.message : String(err); + if (!message.includes("already exists") && !message.includes("duplicate_database")) { + throw err; + } + } finally { + await admin.end({ timeout: 5 }); + } + + const sql = postgres(TEST_POSTGRES_URL, { + max: 1, + idle_timeout: 1, + connect_timeout: 5, + }); + try { + await sql.unsafe(`CREATE EXTENSION IF NOT EXISTS vector`); + } finally { + await sql.end({ timeout: 5 }); + } +} + +async function resetDatabase(): Promise { + const sql = postgres(TEST_POSTGRES_URL, { + max: 1, + idle_timeout: 1, + connect_timeout: 5, + }); + + try { + await sql.unsafe(`DROP TABLE IF EXISTS vectors CASCADE`); + await sql.unsafe(`DROP TABLE IF EXISTS content_vectors CASCADE`); + await sql.unsafe(`DROP TABLE IF EXISTS llm_cache CASCADE`); + await sql.unsafe(`DROP TABLE IF EXISTS documents CASCADE`); + await sql.unsafe(`DROP TABLE IF EXISTS content CASCADE`); + await sql.unsafe(`DROP TABLE IF EXISTS path_contexts CASCADE`); + await sql.unsafe(`DROP TABLE IF EXISTS collections CASCADE`); + await sql.unsafe(`CREATE EXTENSION IF NOT EXISTS vector`); + } finally { + await sql.end({ timeout: 5 }); + } +} + +async function setupConfigDir(): Promise { + testConfigDir = await mkdtemp(join(tmpdir(), "qmd-pg-config-")); + process.env.QMD_CONFIG_DIR = testConfigDir; + await writeFile(join(testConfigDir, "index.yml"), YAML.stringify({ collections: {} }), "utf-8"); +} + +async function cleanupConfigDir(): Promise { + if (testConfigDir) { + await rm(testConfigDir, { recursive: true, force: true }); + } + delete process.env.QMD_CONFIG_DIR; +} + +async function importStoreModule() { + process.env.QMD_BACKEND = "postgres"; + process.env.QMD_POSTGRES_URL = TEST_POSTGRES_URL; + return await import("../src/store.js"); +} + +describe.skipIf(!RUN_POSTGRES_TESTS)("Postgres backend integration", () => { + beforeAll(async () => { + await ensureTestDatabase(); + }); + + beforeEach(async () => { + await resetDatabase(); + await setupConfigDir(); + }); + + afterEach(async () => { + await cleanupConfigDir(); + delete process.env.QMD_BACKEND; + delete process.env.QMD_POSTGRES_URL; + }); + + test("createStore initializes postgres schema and pgvector index", async () => { + const { createStore } = await importStoreModule(); + const store = createStore(); + + try { + expect(store.backend).toBe("postgres"); + store.ensureVecTable(3); + + const table = store.db.prepare(`SELECT to_regclass(current_schema() || '.vectors') AS name`).get() as { + name: string | null; + } | null; + expect(table?.name).toBeTruthy(); + + const indexes = store.db.prepare(` + SELECT indexname + FROM pg_indexes + WHERE schemaname = current_schema() AND tablename = 'vectors' + `).all() as { indexname: string }[]; + expect(indexes.some((idx) => idx.indexname === "idx_vectors_embedding_hnsw")).toBe(true); + } finally { + store.close(); + } + }); + + test("searchFTS uses postgres tsvector index", async () => { + const { createStore } = await importStoreModule(); + const store = createStore(); + + try { + const now = new Date().toISOString(); + const body = "postgres vector index with tsvector ranking"; + const hash = "hash-fts-1"; + + store.insertContent(hash, body, now); + store.insertDocument("notes", "pg/fts.md", "Postgres FTS", hash, now, now); + + const results = store.searchFTS("postgres ranking", 10); + expect(results.length).toBe(1); + expect(results[0]?.displayPath).toBe("notes/pg/fts.md"); + expect(results[0]?.score).toBeGreaterThan(0); + } finally { + store.close(); + } + }); + + test("searchVec uses pgvector distance ordering", async () => { + const { createStore, DEFAULT_EMBED_MODEL } = await importStoreModule(); + const store = createStore(); + + try { + store.ensureVecTable(3); + const now = new Date().toISOString(); + + const hash1 = "hash-vec-1"; + const hash2 = "hash-vec-2"; + + store.insertContent(hash1, "alpha semantic content", now); + store.insertDocument("notes", "pg/a.md", "Alpha", hash1, now, now); + store.insertEmbedding(hash1, 0, 0, new Float32Array([1, 0, 0]), "test", now); + + store.insertContent(hash2, "beta semantic content", now); + store.insertDocument("notes", "pg/b.md", "Beta", hash2, now, now); + store.insertEmbedding(hash2, 0, 0, new Float32Array([0, 1, 0]), "test", now); + + const results = await store.searchVec( + "semantic query", + DEFAULT_EMBED_MODEL, + 5, + undefined, + undefined, + [1, 0, 0], + ); + + expect(results.length).toBeGreaterThan(0); + expect(results[0]?.displayPath).toBe("notes/pg/a.md"); + expect(results[0]?.score).toBeGreaterThan(results[1]?.score ?? -1); + } finally { + store.close(); + } + }); +});