|
1 | 1 | import { Database } from "bun:sqlite"; |
2 | 2 | import { join, basename, isAbsolute } from "node:path"; |
| 3 | +import { existsSync } from "node:fs"; |
3 | 4 | import { CONFIG } from "../../config.js"; |
4 | 5 | import { connectionManager } from "./connection-manager.js"; |
5 | 6 | import { log } from "../logger.js"; |
@@ -188,13 +189,88 @@ export class ShardManager { |
188 | 189 | db.run(`CREATE INDEX IF NOT EXISTS idx_is_pinned ON memories(is_pinned)`); |
189 | 190 | } |
190 | 191 |
|
| 192 | + /** |
| 193 | + * Check if the shard DB file exists and contains the required 'memories' table. |
| 194 | + * Returns false if the file is missing or the table doesn't exist. |
| 195 | + */ |
| 196 | + private isShardValid(shard: ShardInfo): boolean { |
| 197 | + // Check if the DB file exists on disk |
| 198 | + if (!existsSync(shard.dbPath)) { |
| 199 | + log("Shard DB file missing", { dbPath: shard.dbPath, shardId: shard.id }); |
| 200 | + return false; |
| 201 | + } |
| 202 | + |
| 203 | + // Check if the 'memories' table exists |
| 204 | + try { |
| 205 | + const db = connectionManager.getConnection(shard.dbPath); |
| 206 | + const result = db.prepare( |
| 207 | + `SELECT name FROM sqlite_master WHERE type='table' AND name='memories'` |
| 208 | + ).get() as any; |
| 209 | + if (!result) { |
| 210 | + log("Shard DB missing 'memories' table", { dbPath: shard.dbPath, shardId: shard.id }); |
| 211 | + return false; |
| 212 | + } |
| 213 | + return true; |
| 214 | + } catch (error) { |
| 215 | + log("Error validating shard DB", { dbPath: shard.dbPath, error: String(error) }); |
| 216 | + return false; |
| 217 | + } |
| 218 | + } |
| 219 | + |
| 220 | + /** |
| 221 | + * Ensure the shard DB has all required tables. If tables are missing, |
| 222 | + * re-initialize them. This handles cases where the DB file exists but |
| 223 | + * was corrupted or partially created. |
| 224 | + */ |
| 225 | + ensureShardTables(shard: ShardInfo): void { |
| 226 | + try { |
| 227 | + const db = connectionManager.getConnection(shard.dbPath); |
| 228 | + this.initShardDb(db); |
| 229 | + } catch (error) { |
| 230 | + log("Error ensuring shard tables", { dbPath: shard.dbPath, error: String(error) }); |
| 231 | + } |
| 232 | + } |
| 233 | + |
| 234 | + /** |
| 235 | + * Remove a stale shard record from metadata.db. |
| 236 | + * Used when the actual shard DB file has been deleted externally. |
| 237 | + */ |
| 238 | + private removeStaleShardRecord(shardId: number): void { |
| 239 | + log("Removing stale shard record from metadata", { shardId }); |
| 240 | + connectionManager.closeConnection( |
| 241 | + this.metadataDb.prepare(`SELECT db_path FROM shards WHERE id = ?`).get(shardId) as any |
| 242 | + ); |
| 243 | + const stmt = this.metadataDb.prepare(`DELETE FROM shards WHERE id = ?`); |
| 244 | + stmt.run(shardId); |
| 245 | + } |
| 246 | + |
191 | 247 | getWriteShard(scope: "user" | "project", scopeHash: string): ShardInfo { |
192 | 248 | let shard = this.getActiveShard(scope, scopeHash); |
193 | 249 |
|
194 | 250 | if (!shard) { |
195 | 251 | return this.createShard(scope, scopeHash, 0); |
196 | 252 | } |
197 | 253 |
|
| 254 | + // Validate that the shard DB file exists and has required tables |
| 255 | + if (!this.isShardValid(shard)) { |
| 256 | + log("Active shard is invalid, recreating", { |
| 257 | + scope, |
| 258 | + scopeHash, |
| 259 | + shardIndex: shard.shardIndex, |
| 260 | + dbPath: shard.dbPath, |
| 261 | + }); |
| 262 | + |
| 263 | + // Close any cached connection to the invalid shard |
| 264 | + connectionManager.closeConnection(shard.dbPath); |
| 265 | + |
| 266 | + // Remove the stale metadata record |
| 267 | + const deleteStmt = this.metadataDb.prepare(`DELETE FROM shards WHERE id = ?`); |
| 268 | + deleteStmt.run(shard.id); |
| 269 | + |
| 270 | + // Create a fresh shard with the same index |
| 271 | + return this.createShard(scope, scopeHash, shard.shardIndex); |
| 272 | + } |
| 273 | + |
198 | 274 | if (shard.vectorCount >= CONFIG.maxVectorsPerShard) { |
199 | 275 | this.markShardReadOnly(shard.id); |
200 | 276 | return this.createShard(scope, scopeHash, shard.shardIndex + 1); |
|
0 commit comments