Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 7 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,13 @@

## [Unreleased]

### Changes

- **PostgreSQL backend**: add PostgreSQL + pgvector as an alternative storage
backend for shared and multi-agent deployments where multiple processes need
concurrent access to the same QMD index. Configure with `QMD_BACKEND=postgres`
and `QMD_POSTGRES_URL`.

## [1.1.5] - 2026-03-07

Ambiguous queries like "performance" now produce dramatically better results
Expand Down
25 changes: 24 additions & 1 deletion README.md
Original file line number Diff line number Diff line change
Expand Up @@ -497,7 +497,10 @@ qmd cleanup

## Data Storage

Index stored in: `~/.cache/qmd/index.sqlite`
Default index (SQLite) stored in: `~/.cache/qmd/index.sqlite`

When using PostgreSQL (`QMD_BACKEND=postgres`), QMD stores all index data in the
database specified by `QMD_POSTGRES_URL`.

### Schema

Expand All @@ -516,6 +519,26 @@ llm_cache -- Cached LLM responses (query expansion, rerank scores)
| Variable | Default | Description |
|----------|---------|-------------|
| `XDG_CACHE_HOME` | `~/.cache` | Cache directory location |
| `QMD_BACKEND` | `sqlite` | Storage backend (`sqlite` or `postgres`) |
| `QMD_POSTGRES_URL` | _(unset)_ | PostgreSQL URL used when `QMD_BACKEND=postgres` |

### PostgreSQL Backend

QMD supports PostgreSQL + pgvector as an alternative backend for shared or
multi-agent deployments where multiple processes need concurrent access to the
same index. SQLite remains the simplest default for single-user local use.

```sh
export QMD_BACKEND=postgres
export QMD_POSTGRES_URL=postgresql://user:pass@localhost:5432/qmd

# initialize / migrate schema on first run
qmd status
```

Requirements:
- PostgreSQL server with the `pgvector` extension installed
- the `vector` extension created in the target database (`CREATE EXTENSION vector;`)

## How It Works

Expand Down
6 changes: 5 additions & 1 deletion bun.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions package.json
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,7 @@
"fast-glob": "^3.3.0",
"node-llama-cpp": "^3.17.1",
"picomatch": "^4.0.0",
"postgres": "^3.4.8",
"sqlite-vec": "^0.1.7-alpha.2",
"yaml": "^2.8.2",
"zod": "^4.2.1"
Expand Down
45 changes: 44 additions & 1 deletion src/db.ts
Original file line number Diff line number Diff line change
@@ -1,13 +1,56 @@
/**
* db.ts - Cross-runtime SQLite compatibility layer
* db.ts - Cross-runtime SQLite compatibility layer + backend selection.
*
* Provides a unified Database export that works under both Bun (bun:sqlite)
* and Node.js (better-sqlite3). The APIs are nearly identical — the main
* difference is the import path.
*
* Backend selection (QMD_BACKEND env var):
* - 'sqlite' (default): uses SQLite via bun:sqlite or better-sqlite3
* - 'postgres': uses PostgreSQL via pg-worker + Atomics sync wrapper
*/

export const isBun = typeof globalThis.Bun !== "undefined";

// ---------------------------------------------------------------------------
// Backend selection
// ---------------------------------------------------------------------------

export type Backend = 'sqlite' | 'postgres';

/**
 * Determine the active storage backend from the QMD_BACKEND environment
 * variable. Any value other than 'postgres' — including unset — selects the
 * SQLite default.
 */
export function getBackend(): Backend {
  return process.env.QMD_BACKEND === 'postgres' ? 'postgres' : 'sqlite';
}

// Eagerly attempt to load the PostgreSQL implementation at module init so a
// test can set QMD_BACKEND before calling createStore(). Holds null when the
// './pg.js' module failed to load (e.g. the runtime lacks what it needs).
let _openPgDatabase: ((url: string) => Database) | null = null;

try {
  const pg = await import('./pg.js');
  _openPgDatabase = pg.openPgDatabase;
} catch (err) {
  // A load failure is only fatal when the user actually selected the postgres
  // backend; for the SQLite default we swallow it and leave the slot null so
  // openPgDatabase() below can report unavailability on demand.
  if (getBackend() === 'postgres') {
    throw err;
  }
}

/**
 * Open a PostgreSQL database. The returned object is Database-compatible,
 * wrapping a worker thread + Atomics so callers get synchronous-looking
 * access. Only usable when QMD_BACKEND=postgres and the './pg.js' module
 * loaded successfully.
 *
 * @param url - PostgreSQL connection URL.
 * @throws Error when the postgres implementation could not be loaded.
 */
export function openPgDatabase(url: string): Database {
  const impl = _openPgDatabase;
  if (impl === null) {
    throw new Error('PostgreSQL backend is unavailable in this runtime.');
  }
  return impl(url);
}

let _Database: any;
let _sqliteVecLoad: (db: any) => void;

Expand Down
100 changes: 100 additions & 0 deletions src/pg-worker.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,100 @@
/**
* pg-worker.ts — Worker thread that manages a PostgreSQL connection pool.
*
* Runs inside a worker_threads Worker. The main thread sends query messages
* and blocks on a SharedArrayBuffer using Atomics. This worker executes the
* async postgres query, writes the result to the message port, then
* signals the main thread via Atomics.notify().
*
* Protocol:
* Main → Worker: { type, query, params }
* Worker → Main: { result, error }
* Worker: Atomics.store(sharedInt32, 0, 1); Atomics.notify(sharedInt32, 0)
*/

import { workerData } from 'node:worker_threads';
import postgres from 'postgres';

const pgUrl: string = workerData.pgUrl;
const sharedBuffer: SharedArrayBuffer = workerData.sharedBuffer;
const port = workerData.port;

// Int32 view over the shared buffer; slot 0 is the "result ready" flag the
// main thread blocks on via Atomics (see module header).
const sharedInt32 = new Int32Array(sharedBuffer);

// Single connection — one query at a time (matching synchronous caller semantics)
const sql = postgres(pgUrl, {
  max: 1,
  idle_timeout: 60, // seconds before an idle connection is released
  connect_timeout: 10, // seconds to wait when establishing the connection
  // Parse int8 (bigint, OID 20) as regular JS numbers to match SQLite behavior.
  // NOTE(review): values above Number.MAX_SAFE_INTEGER lose precision here —
  // confirm the index never stores int8 values that large.
  types: {
    bigint: {
      to: 20,
      from: [20],
      serialize: (x: bigint | number | string) => String(x),
      parse: (x: string) => Number(x),
    },
  },
});

/**
 * Copy a result row, converting every BigInt column value to Number so the
 * row survives postMessage and matches SQLite's numeric behavior. All other
 * values pass through untouched.
 */
function normalizeRow(row: Record<string, unknown>): Record<string, unknown> {
  const normalized: Record<string, unknown> = {};
  for (const key of Object.keys(row)) {
    const value = row[key];
    normalized[key] = typeof value === 'bigint' ? Number(value) : value;
  }
  return normalized;
}

function normalizeRows(rows: readonly Record<string, unknown>[]): Record<string, unknown>[] {
return rows.map(normalizeRow);
}

// Request shape sent from the main thread. 'exec' runs a statement ignoring
// params; 'run' reports an affected-row count; 'get' returns the first row or
// null; 'all' returns every row; 'close' ends the connection pool.
type QueryMessage = {
  type: 'exec' | 'run' | 'get' | 'all' | 'close';
  query: string;
  params: unknown[];
};

// Handle one request at a time. The main thread blocks on the shared flag
// until this handler posts a reply and calls Atomics.notify, so despite the
// async callback, requests are processed strictly sequentially.
port.on('message', async (msg: QueryMessage) => {
  const { type, query, params } = msg;
  let result: unknown = null;
  let error: string | null = null;

  try {
    if (type === 'close') {
      // Drain and end the pool, allowing up to 5s for in-flight work.
      await sql.end({ timeout: 5 });
      result = null;
    } else if (type === 'exec') {
      // exec: statement with no params; change counts are not reported.
      await sql.unsafe(query, []);
      result = { changes: 0, lastInsertRowid: 0 };
    } else if (type === 'run') {
      // run: report the affected-row count. Postgres has no rowid, so
      // lastInsertRowid is always 0.
      const rows = await sql.unsafe(query, params as postgres.ParameterOrJSON<never>[]);
      result = {
        changes: (rows as unknown as { count: number }).count ?? 0,
        lastInsertRowid: 0,
      };
    } else if (type === 'get') {
      // get: first row (BigInts normalized) or null when no rows matched.
      const rows = await sql.unsafe(query, params as postgres.ParameterOrJSON<never>[]);
      result = rows.length > 0 ? normalizeRow(rows[0] as Record<string, unknown>) : null;
    } else if (type === 'all') {
      // all: every row, BigInts normalized.
      const rows = await sql.unsafe(query, params as postgres.ParameterOrJSON<never>[]);
      result = normalizeRows(rows as readonly Record<string, unknown>[]);
    }
  } catch (err: unknown) {
    // Report the failure to the caller via the reply; 'close' errors are
    // deliberately kept out of the log.
    error = err instanceof Error ? err.message : String(err);
    if (type !== 'close') {
      console.error('[pg-worker] query error:', error, '\nSQL:', query, '\nParams:', params);
    }
  }

  // Post result before signalling so main thread can receiveMessageOnPort
  port.postMessage({ result, error });

  // Signal main thread that result is ready
  Atomics.store(sharedInt32, 0, 1);
  Atomics.notify(sharedInt32, 0, 1);
});
Loading