From 260664af37d57d07d8f0165a0a8ffd94b69faab2 Mon Sep 17 00:00:00 2001
From: root
Date: Tue, 20 Jan 2026 22:38:58 +0400
Subject: [PATCH] fix: use transactions for atomic database inserts

This change introduces atomic database operations for file and chunk
insertions. Previously, files and chunks were inserted in separate
operations, which could lead to inconsistent states (orphan files) if
the process was interrupted. Now, insertions are wrapped in a
transaction, ensuring that either all data is persisted or none of it
is.

Chunks left over from a previous version of the same path are removed
inside the same transaction, because INSERT OR REPLACE assigns the file
row a new rowid. Failures while beginning, writing or committing the
transaction now carry error context.
---
 src/cli/commands.rs |  4 ++--
 src/core/db.rs      | 65 ++++++++++++++++++++++++++++++++++++++++++++-
 src/core/indexer.rs | 53 +++++++++++++++++++++++++--------------------
 src/watcher.rs      |  4 ++--
 4 files changed, 97 insertions(+), 29 deletions(-)

diff --git a/src/cli/commands.rs b/src/cli/commands.rs
index f128499..1cc2966 100644
--- a/src/cli/commands.rs
+++ b/src/cli/commands.rs
@@ -516,7 +516,7 @@ fn run_index(
             }
 
             let db = Database::new(&config.db_path()?)?;
-            let indexer = ServerIndexer::new(db, client, max_size);
+            let mut indexer = ServerIndexer::new(db, client, max_size);
             indexer.index_directory(&path, force)?;
         }
         Mode::Local => {
@@ -531,7 +531,7 @@ fn run_index(
 
             let db = Database::new(&config.db_path()?)?;
             let engine = EmbeddingEngine::new(config)?;
-            let indexer = Indexer::new(db, engine, max_size);
+            let mut indexer = Indexer::new(db, engine, max_size);
             indexer.index_directory(&path, force)?;
         }
     }
diff --git a/src/core/db.rs b/src/core/db.rs
index 080e586..d4a5a68 100644
--- a/src/core/db.rs
+++ b/src/core/db.rs
@@ -1,4 +1,4 @@
-use anyhow::Result;
+use anyhow::{Context, Result};
 use chrono::{DateTime, Utc};
 use rusqlite::{params, Connection};
 use std::path::{Path, PathBuf};
@@ -104,6 +104,69 @@ impl Database {
         }
     }
 
+    /// Inserts a file row and all of its chunks inside one transaction.
+    ///
+    /// Either every row is committed or, if any statement fails, the
+    /// transaction is rolled back when it is dropped and nothing is persisted.
+    /// The `id` and `file_id` fields of each `ChunkEntry` are ignored; the
+    /// rowid of the newly inserted file row is used as `file_id` instead.
+    pub fn insert_file_with_chunks(
+        &mut self,
+        path: &Path,
+        hash: &str,
+        chunks: &[ChunkEntry],
+    ) -> Result<()> {
+        let path_str = path.to_string_lossy();
+        let now = Utc::now().to_rfc3339();
+
+        let tx = self
+            .conn
+            .transaction()
+            .context("Failed to begin transaction")?;
+
+        // `INSERT OR REPLACE` gives a re-indexed path a fresh rowid, so drop
+        // the chunks still attached to the old row (no-op on first insert).
+        // NOTE(review): assumes `files` is a rowid table keyed by `path`.
+        tx.execute(
+            "DELETE FROM chunks WHERE file_id IN (SELECT rowid FROM files WHERE path = ?)",
+            params![path_str.as_ref()],
+        )
+        .context("Failed to remove stale chunks")?;
+
+        // Insert file
+        tx.execute(
+            "INSERT OR REPLACE INTO files (path, hash, indexed_at) VALUES (?, ?, ?)",
+            params![path_str.as_ref(), hash, now],
+        )
+        .context("Failed to insert file record")?;
+
+        let file_id = tx.last_insert_rowid();
+
+        // Insert chunks
+        {
+            let mut stmt = tx.prepare(
+                r"INSERT OR REPLACE INTO chunks
+                (file_id, chunk_index, content, start_line, end_line, embedding)
+                VALUES (?, ?, ?, ?, ?, ?)",
+            )?;
+
+            for chunk in chunks {
+                let embedding_bytes = embedding_to_bytes(&chunk.embedding);
+                stmt.execute(params![
+                    file_id,
+                    chunk.chunk_index,
+                    chunk.content,
+                    chunk.start_line,
+                    chunk.end_line,
+                    embedding_bytes
+                ])?;
+            }
+        }
+
+        tx.commit().context("Failed to commit file and chunks")?;
+        Ok(())
+    }
+
     pub fn insert_file(&self, path: &Path, hash: &str) -> Result {
         let path_str = path.to_string_lossy();
         let now = Utc::now().to_rfc3339();
diff --git a/src/core/indexer.rs b/src/core/indexer.rs
index 469c120..cc1d567 100644
--- a/src/core/indexer.rs
+++ b/src/core/indexer.rs
@@ -6,7 +6,7 @@ use sha2::{Digest, Sha256};
 use std::fs;
 use std::path::{Path, PathBuf};
 
-use super::db::Database;
+use super::db::{ChunkEntry, Database};
 use super::embeddings::EmbeddingEngine;
 use crate::ui;
 
@@ -42,7 +42,7 @@ impl Indexer {
         }
     }
 
-    pub fn index_directory(&self, path: &Path, force: bool) -> Result<()> {
+    pub fn index_directory(&mut self, path: &Path, force: bool) -> Result<()> {
         let abs_path = fs::canonicalize(path).context("Failed to resolve path")?;
 
         println!(
@@ -159,23 +159,25 @@ impl Indexer {
         let mut indexed = 0;
 
         for pending in &pending_files {
-            // Insert file
-            let file_id = self.db.insert_file(&pending.path, &pending.hash)?;
-
-            // Insert chunks with their embeddings
+            // Prepare chunks with embeddings
+            let mut chunks_to_insert = Vec::with_capacity(pending.chunks.len());
             for (chunk_idx, chunk) in pending.chunks.iter().enumerate() {
                 let embedding = &all_embeddings[embedding_idx];
-                self.db.insert_chunk(
-                    file_id,
-                    chunk_idx as i32,
-                    &chunk.content,
-                    chunk.start_line,
-                    chunk.end_line,
-                    embedding,
-                )?;
+                chunks_to_insert.push(ChunkEntry {
+                    id: 0,      // Will be auto-incremented
+                    file_id: 0, // Will be set after file insertion
+                    chunk_index: chunk_idx as i32,
+                    content: chunk.content.clone(),
+                    start_line: chunk.start_line,
+                    end_line: chunk.end_line,
+                    embedding: embedding.clone(),
+                });
                 embedding_idx += 1;
             }
 
+            // Insert file and chunks atomically
+            self.db.insert_file_with_chunks(&pending.path, &pending.hash, &chunks_to_insert)?;
+
             indexed += 1;
             pb.inc(1);
         }
@@ -410,7 +412,7 @@ impl ServerIndexer {
         }
     }
 
-    pub fn index_directory(&self, path: &Path, force: bool) -> Result<()> {
+    pub fn index_directory(&mut self, path: &Path, force: bool) -> Result<()> {
         let abs_path = fs::canonicalize(path).context("Failed to resolve path")?;
 
         println!(
@@ -538,21 +540,24 @@ impl ServerIndexer {
         let mut indexed = 0;
 
         for pending in &pending_files {
-            let file_id = self.db.insert_file(&pending.path, &pending.hash)?;
+            let mut chunks_to_insert = Vec::with_capacity(pending.chunks.len());
 
             for (chunk_idx, chunk) in pending.chunks.iter().enumerate() {
                 let embedding = &all_embeddings[embedding_idx];
-                self.db.insert_chunk(
-                    file_id,
-                    chunk_idx as i32,
-                    &chunk.content,
-                    chunk.start_line,
-                    chunk.end_line,
-                    embedding,
-                )?;
+                chunks_to_insert.push(ChunkEntry {
+                    id: 0,
+                    file_id: 0,
+                    chunk_index: chunk_idx as i32,
+                    content: chunk.content.clone(),
+                    start_line: chunk.start_line,
+                    end_line: chunk.end_line,
+                    embedding: embedding.clone(),
+                });
                 embedding_idx += 1;
             }
 
+            self.db.insert_file_with_chunks(&pending.path, &pending.hash, &chunks_to_insert)?;
+
             indexed += 1;
             pb.inc(1);
         }
diff --git a/src/watcher.rs b/src/watcher.rs
index 816219f..74ce458 100644
--- a/src/watcher.rs
+++ b/src/watcher.rs
@@ -270,7 +270,7 @@ impl FileWatcher {
             Mode::Server => {
                 let db = Database::new(&self.config.db_path()?)?;
                 let client = Client::new(&self.config.server_host, self.config.server_port);
-                let indexer = ServerIndexer::new(db, client, self.config.max_file_size);
+                let mut indexer = ServerIndexer::new(db, client, self.config.max_file_size);
                 indexer.index_directory(&self.root_path, false)?;
             }
             Mode::Local => {
@@ -279,7 +279,7 @@ impl FileWatcher {
                 }
                 let db = Database::new(&self.config.db_path()?)?;
                 let engine = crate::core::EmbeddingEngine::new(&self.config)?;
-                let indexer = Indexer::new(db, engine, self.config.max_file_size);
+                let mut indexer = Indexer::new(db, engine, self.config.max_file_size);
                 indexer.index_directory(&self.root_path, false)?;
             }
         }