Skip to content
Closed
2 changes: 2 additions & 0 deletions package.json
Original file line number Diff line number Diff line change
Expand Up @@ -103,6 +103,7 @@
"express": "^4.18.2",
"glob": "^11.0.3",
"handlebars": "^4.7.8",
"proper-lockfile": "^4.1.2",
"react": "^18.3.1",
"react-dom": "^18.3.1",
"yaml": "^2.8.2",
Expand All @@ -113,6 +114,7 @@
"@types/dompurify": "^3.0.5",
"@types/express": "^4.17.21",
"@types/node": "^20.0.0",
"@types/proper-lockfile": "^4.1.4",
"@types/react": "^18.3.5",
"@types/react-dom": "^18.3.0",
"esbuild": "^0.27.2",
Expand Down
20 changes: 0 additions & 20 deletions plugin/hooks/hooks.json
Original file line number Diff line number Diff line change
Expand Up @@ -22,11 +22,6 @@
"command": "node \"${CLAUDE_PLUGIN_ROOT}/scripts/smart-install.js\"",
"timeout": 300
},
{
"type": "command",
"command": "node \"${CLAUDE_PLUGIN_ROOT}/scripts/bun-runner.js\" \"${CLAUDE_PLUGIN_ROOT}/scripts/worker-service.cjs\" start",
"timeout": 60
},
{
"type": "command",
"command": "node \"${CLAUDE_PLUGIN_ROOT}/scripts/bun-runner.js\" \"${CLAUDE_PLUGIN_ROOT}/scripts/worker-service.cjs\" hook claude-code context",
Expand All @@ -38,11 +33,6 @@
"UserPromptSubmit": [
{
"hooks": [
{
"type": "command",
"command": "node \"${CLAUDE_PLUGIN_ROOT}/scripts/bun-runner.js\" \"${CLAUDE_PLUGIN_ROOT}/scripts/worker-service.cjs\" start",
"timeout": 60
},
{
"type": "command",
"command": "node \"${CLAUDE_PLUGIN_ROOT}/scripts/bun-runner.js\" \"${CLAUDE_PLUGIN_ROOT}/scripts/worker-service.cjs\" hook claude-code session-init",
Expand All @@ -55,11 +45,6 @@
{
"matcher": "*",
"hooks": [
{
"type": "command",
"command": "node \"${CLAUDE_PLUGIN_ROOT}/scripts/bun-runner.js\" \"${CLAUDE_PLUGIN_ROOT}/scripts/worker-service.cjs\" start",
"timeout": 60
},
{
"type": "command",
"command": "node \"${CLAUDE_PLUGIN_ROOT}/scripts/bun-runner.js\" \"${CLAUDE_PLUGIN_ROOT}/scripts/worker-service.cjs\" hook claude-code observation",
Expand All @@ -71,11 +56,6 @@
"Stop": [
{
"hooks": [
{
"type": "command",
"command": "node \"${CLAUDE_PLUGIN_ROOT}/scripts/bun-runner.js\" \"${CLAUDE_PLUGIN_ROOT}/scripts/worker-service.cjs\" start",
"timeout": 60
},
{
"type": "command",
"command": "node \"${CLAUDE_PLUGIN_ROOT}/scripts/bun-runner.js\" \"${CLAUDE_PLUGIN_ROOT}/scripts/worker-service.cjs\" hook claude-code summarize",
Expand Down
54 changes: 53 additions & 1 deletion src/services/infrastructure/ProcessManager.ts
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@ const ORPHAN_PROCESS_PATTERNS = [
];

// Only kill processes older than this to avoid killing the current session
const ORPHAN_MAX_AGE_MINUTES = 30;
const ORPHAN_MAX_AGE_MINUTES = 5;

export interface PidInfo {
pid: number;
Expand Down Expand Up @@ -337,6 +337,58 @@ export async function cleanupOrphanedProcesses(): Promise<void> {
}

logger.info('SYSTEM', 'Orphaned processes cleaned up', { count: pidsToKill.length });

// Also check for excess chroma-mcp by count (catches recent spawn storms)
await cleanupExcessChromaProcesses();
}

/**
 * Maximum number of chroma-mcp processes that cleanupExcessChromaProcesses()
 * leaves running; every process beyond this count is killed.
 */
const MAX_CHROMA_PROCESSES = 2;

/**
 * Convert a `ps -o etime` value (`[[dd-]hh:]mm:ss`) into elapsed seconds.
 * Returns null when the text does not have that shape.
 */
function parsePsElapsedSeconds(etime: string): number | null {
  const match = /^(?:(?:(\d+)-)?(\d+):)?(\d+):(\d+)$/.exec(etime);
  if (!match) return null;
  // Groups that did not participate in the match are undefined -> treat as 0.
  const [, days = '0', hours = '0', minutes = '0', seconds = '0'] = match;
  return ((Number(days) * 24 + Number(hours)) * 60 + Number(minutes)) * 60 + Number(seconds);
}

/**
 * Kill excess chroma-mcp processes by count, regardless of age.
 * Keeps the newest MAX_CHROMA_PROCESSES alive and sends SIGKILL to the rest.
 * This catches spawn storms where all processes are < ORPHAN_MAX_AGE_MINUTES old.
 *
 * Best-effort: never throws. Failures to list or kill processes are logged at
 * debug level only. No-op on Windows.
 */
export async function cleanupExcessChromaProcesses(): Promise<void> {
  if (process.platform === 'win32') {
    // Windows cleanup relies on age-based cleanupOrphanedProcesses()
    // which uses PowerShell Get-CimInstance. Count-based cleanup not yet implemented.
    return;
  }

  try {
    // `etime` (not `etimes`) is used because BSD/macOS ps does not know the
    // `etimes` keyword; the elapsed time is parsed and sorted here instead of
    // in the shell. `|| true` keeps a "no match" grep exit from rejecting.
    const { stdout } = await execAsync(
      'ps -eo pid,etime,command | grep chroma-mcp | grep -v grep || true'
    );

    const processes: Array<{ pid: number; ageSeconds: number }> = [];
    for (const line of stdout.split('\n')) {
      const match = /^\s*(\d+)\s+(\S+)\s/.exec(line);
      if (!match) continue;
      const pid = Number(match[1]);
      const ageSeconds = parsePsElapsedSeconds(match[2] ?? '');
      if (!Number.isInteger(pid) || ageSeconds === null) continue;
      processes.push({ pid, ageSeconds });
    }

    if (processes.length <= MAX_CHROMA_PROCESSES) return;

    // Smallest elapsed time first = newest first; keep the head, kill the tail.
    processes.sort((a, b) => a.ageSeconds - b.ageSeconds);
    const excess = processes.slice(MAX_CHROMA_PROCESSES);

    let killed = 0;
    for (const { pid } of excess) {
      if (pid <= 0 || pid === process.pid) continue;
      try {
        process.kill(pid, 'SIGKILL');
        killed++;
        logger.info('SYSTEM', 'Killed excess chroma-mcp process', { pid });
      } catch (error) {
        // Typically ESRCH (already exited) or EPERM (owned by another user).
        logger.debug('SYSTEM', 'Could not kill excess chroma-mcp process', { pid }, error as Error);
      }
    }

    logger.info('SYSTEM', 'Excess chroma cleanup complete', {
      found: processes.length,
      killed,
      kept: processes.length - killed
    });
  } catch (error) {
    logger.debug('SYSTEM', 'Excess chroma cleanup failed (non-critical)', {}, error as Error);
  }
}

/**
Expand Down
61 changes: 61 additions & 0 deletions src/services/infrastructure/singleton-manager.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,61 @@
/**
* Filesystem-based mutex for worker spawn coordination.
*
* Prevents TOCTOU race: only one process at a time can attempt to spawn
* a worker daemon. Concurrent hook invocations that lose the lock race
* fall back to waiting for port health.
*
* Uses proper-lockfile (same library npm uses for package-lock coordination).
* Lock target: ~/.claude-mem/worker-spawn-<port>.lock
* (proper-lockfile takes the lock by creating a sibling `<target>.lock` directory)
*/

import path from 'path';
import { homedir } from 'os';
import { mkdirSync, writeFileSync, existsSync } from 'fs';
import lockfile from 'proper-lockfile';
import { logger } from '../../utils/logger.js';

const DATA_DIR = path.join(homedir(), '.claude-mem');

/**
 * Run `fn` while holding the per-port worker spawn lock.
 *
 * Outcomes:
 * - Lock acquired: `fn` runs, the lock is released afterwards (even if `fn`
 *   rejects), and the result/rejection of `fn` is passed through.
 * - Lock already held (`ELOCKED`): `fn` is NOT run and `null` is returned so
 *   the caller can fall back to waiting for port health.
 * - Any other failure while preparing or taking the lock (filesystem,
 *   permissions): a warning is logged and `fn` runs without the lock.
 *
 * @param fn   Spawn routine to execute under the lock.
 * @param port Worker port; selects the lock file so each worker instance has its own lock.
 * @returns The result of `fn`, or `null` when another process holds the lock.
 */
export async function acquireSpawnLock<T>(
  fn: () => Promise<T>,
  port: number = 37777
): Promise<T | null> {
  const lockPath = path.join(DATA_DIR, `worker-spawn-${port}.lock`);

  let release: (() => Promise<void>) | null = null;
  try {
    // Make sure the data dir and lock target exist. Done inside the try so a
    // filesystem error takes the same "proceed without lock" path as a failed
    // lock() call instead of aborting the spawn. (With `realpath: false`
    // proper-lockfile does not strictly need the target file; creating it is harmless.)
    mkdirSync(DATA_DIR, { recursive: true });
    if (!existsSync(lockPath)) {
      writeFileSync(lockPath, '');
    }

    release = await lockfile.lock(lockPath, {
      realpath: false,
      retries: 0, // Don't wait — if locked, fall back immediately
      // A lock whose mtime is older than 10s is treated as stale and may be
      // taken over (handles crashed holders). While we hold the lock,
      // proper-lockfile keeps refreshing the mtime.
      stale: 10_000,
      // The default handler throws from a timer, which would crash the
      // process; losing the lock mid-spawn is only worth a warning here.
      onCompromised: (compromiseError: Error) => {
        logger.warn('SYSTEM', 'Spawn lock compromised while held', { error: compromiseError.message });
      }
    });
  } catch (err: unknown) {
    const code =
      typeof err === 'object' && err !== null && 'code' in err
        ? (err as { code?: unknown }).code
        : undefined;
    if (code === 'ELOCKED') {
      logger.info('SYSTEM', 'Spawn lock held by another process — waiting for port health instead');
      return null;
    }
    // Non-lock errors (filesystem, permissions) — log and proceed without lock
    logger.warn('SYSTEM', 'Failed to acquire spawn lock, proceeding without', {
      error: err instanceof Error ? err.message : String(err)
    });
    return fn();
  }

  try {
    return await fn();
  } finally {
    if (release) {
      try {
        await release();
      } catch (releaseError) {
        // Release can fail if the lock was already compromised or removed;
        // never let that mask the outcome of fn().
        logger.debug('SYSTEM', 'Spawn lock release failed (non-critical)', {}, releaseError as Error);
      }
    }
  }
}
124 changes: 71 additions & 53 deletions src/services/sync/ChromaSync.ts
Original file line number Diff line number Diff line change
Expand Up @@ -90,6 +90,15 @@ export class ChromaSync {
// MCP SDK's StdioClientTransport uses shell:false and no detached flag, so console is inherited.
private readonly disabled: boolean = false;

// Connection mutex — coalesces concurrent ensureConnection() calls onto single spawn.
// Holds the promise of the in-flight _doConnect() call; ensureConnection() returns it
// to concurrent callers and clears it once that attempt settles. close() also clears it.
private connectionPromise: Promise<void> | null = null;

// Circuit breaker — stops retry storms after repeated failures.
// consecutiveFailures: incremented on each failed connect attempt, reset to 0 on success.
private consecutiveFailures: number = 0;
// circuitOpenUntil: epoch-millisecond timestamp; while Date.now() is below it,
// ensureConnection() throws instead of attempting to connect. 0 = breaker closed.
private circuitOpenUntil: number = 0;
// Failure count at which the breaker trips.
private static readonly MAX_FAILURES = 3;
// How long (ms) the breaker stays open once tripped.
private static readonly CIRCUIT_OPEN_MS = 60_000;

constructor(project: string) {
this.project = project;
this.collectionName = `cm__${project}`;
Expand Down Expand Up @@ -186,15 +195,49 @@ export class ChromaSync {
return;
}

// Circuit breaker: stop retrying after repeated failures
if (Date.now() < this.circuitOpenUntil) {
throw new Error('Chroma circuit breaker open — connection disabled for 60s after repeated failures');
}

// Connection mutex: coalesce concurrent callers onto single spawn
if (this.connectionPromise) {
return this.connectionPromise;
}

// Capture reference to detect if a newer call replaced us.
// Without this, a concurrent caller's promise could be cleared by
// an older caller's finally{} block — a subtle race condition.
const p = this.connectionPromise = this._doConnect();
try {
await p;
this.consecutiveFailures = 0; // Reset on success
} catch (error) {
this.consecutiveFailures++;
if (this.consecutiveFailures >= ChromaSync.MAX_FAILURES) {
this.circuitOpenUntil = Date.now() + ChromaSync.CIRCUIT_OPEN_MS;
logger.warn('CHROMA_SYNC', 'Circuit breaker tripped — disabling Chroma for 60s', {
failures: this.consecutiveFailures
});
}
throw error;
} finally {
// Only clear if still the same promise (newer call may have replaced it)
if (this.connectionPromise === p) {
this.connectionPromise = null;
}
}
}

/**
* Internal connection logic — called only by ensureConnection() mutex
*/
private async _doConnect(): Promise<void> {
logger.info('CHROMA_SYNC', 'Connecting to Chroma MCP server...', { project: this.project });

try {
// Use Python 3.13 by default to avoid onnxruntime compatibility issues with Python 3.14+
// See: https://github.com/thedotmack/claude-mem/issues/170 (Python 3.14 incompatibility)
const settings = SettingsDefaultsManager.loadFromFile(USER_SETTINGS_PATH);
const pythonVersion = settings.CLAUDE_MEM_PYTHON_VERSION;

// Get combined SSL certificate bundle for Zscaler/corporate proxy environments
const combinedCertPath = this.getCombinedCertPath();

const transportOptions: any = {
Expand All @@ -208,7 +251,6 @@ export class ChromaSync {
stderr: 'ignore'
};

// Add SSL certificate environment variables for corporate proxy/Zscaler environments
if (combinedCertPath) {
transportOptions.env = {
...process.env,
Expand All @@ -221,13 +263,8 @@ export class ChromaSync {
});
}

// Note: windowsHide is not needed here because the worker daemon starts with
// -WindowStyle Hidden, so child processes inherit the hidden console.
// The MCP SDK ignores custom windowsHide anyway (overridden internally).

this.transport = new StdioClientTransport(transportOptions);

// Empty capabilities object: this client only calls Chroma tools, doesn't expose any
this.client = new Client({
name: 'claude-mem-chroma-sync',
version: packageVersion
Expand All @@ -240,6 +277,13 @@ export class ChromaSync {

logger.info('CHROMA_SYNC', 'Connected to Chroma MCP server', { project: this.project });
} catch (error) {
// Safe cleanup: close transport before nulling reference
if (this.transport) {
try { await this.transport.close(); } catch {}
}
this.transport = null;
this.client = null;
this.connected = false;
logger.error('CHROMA_SYNC', 'Failed to connect to Chroma MCP server', { project: this.project }, error as Error);
throw new Error(`Chroma connection failed: ${error instanceof Error ? error.message : String(error)}`);
}
Expand Down Expand Up @@ -278,19 +322,7 @@ export class ChromaSync {
errorMessage.includes('MCP error -32000');

if (isConnectionError) {
// FIX: Close transport to kill subprocess before resetting state
// Without this, old chroma-mcp processes leak as zombies
if (this.transport) {
try {
await this.transport.close();
} catch (closeErr) {
logger.debug('CHROMA_SYNC', 'Transport close error (expected if already dead)', {}, closeErr as Error);
}
}
// Reset connection state so next call attempts reconnect
this.connected = false;
this.client = null;
this.transport = null;
await this.safeResetConnection();
logger.error('CHROMA_SYNC', 'Connection lost during collection check',
{ collection: this.collectionName }, error as Error);
throw new Error(`Chroma connection lost: ${errorMessage}`);
Expand Down Expand Up @@ -948,18 +980,7 @@ export class ChromaSync {
errorMessage.includes('MCP error -32000');

if (isConnectionError) {
// FIX: Close transport to kill subprocess before resetting state
if (this.transport) {
try {
await this.transport.close();
} catch (closeErr) {
logger.debug('CHROMA_SYNC', 'Transport close error (expected if already dead)', {}, closeErr as Error);
}
}
// Reset connection state so next call attempts reconnect
this.connected = false;
this.client = null;
this.transport = null;
await this.safeResetConnection();
logger.error('CHROMA_SYNC', 'Connection lost during query',
{ project: this.project, query }, error as Error);
throw new Error(`Chroma query failed - connection lost: ${errorMessage}`);
Expand Down Expand Up @@ -1019,26 +1040,23 @@ export class ChromaSync {
/**
* Close the Chroma client connection and cleanup subprocess
*/
async close(): Promise<void> {
if (!this.connected && !this.client && !this.transport) {
return;
}

// Close client first
if (this.client) {
await this.client.close();
}

// Explicitly close transport to kill subprocess
if (this.transport) {
await this.transport.close();
}

logger.info('CHROMA_SYNC', 'Chroma client and subprocess closed', { project: this.project });

// Always reset state
/**
 * Tear down the current Chroma connection and reset connection state.
 *
 * The client and transport are captured in locals, the instance fields are
 * cleared, and then the captured client and transport are closed (client
 * first). Closing the transport is what prevents orphaned chroma-mcp
 * subprocesses. Close failures are logged at debug level and never thrown,
 * so this is safe to call from error-handling paths.
 */
private async safeResetConnection(): Promise<void> {
  const transport = this.transport;
  const client = this.client;

  // Clear state first so the instance never exposes a half-closed connection
  // while the awaits below are pending.
  this.connected = false;
  this.client = null;
  this.transport = null;

  if (client) {
    try {
      await client.close();
    } catch (closeErr) {
      logger.debug('CHROMA_SYNC', 'Client close error (expected if already dead)', {}, closeErr as Error);
    }
  }
  if (transport) {
    try {
      await transport.close();
    } catch (closeErr) {
      logger.debug('CHROMA_SYNC', 'Transport close error (expected if already dead)', {}, closeErr as Error);
    }
  }
}

/**
 * Shut down the Chroma connection: closes the client and transport via
 * safeResetConnection(), drops any cached connection promise, and logs
 * the shutdown.
 */
async close(): Promise<void> {
  // Tear down client + transport first, then forget any cached connect promise.
  await this.safeResetConnection();
  this.connectionPromise = null;

  const { project } = this;
  logger.info('CHROMA_SYNC', 'Chroma client and subprocess closed', { project });
}
}
Loading
Loading