diff --git a/src/handlers/import.rs b/src/handlers/import.rs
index 4669225d..2a40b6db 100644
--- a/src/handlers/import.rs
+++ b/src/handlers/import.rs
@@ -2,7 +2,7 @@
 use axum::{extract::State, Json};
 use chrono::Utc;
 use std::sync::Arc;
 use uuid::Uuid;
-use worker::{query, Env};
+use worker::{query, D1PreparedStatement, Env};
 
 use crate::auth::Claims;
 use crate::db;
@@ -11,6 +11,38 @@
 use crate::models::cipher::{Cipher, CipherData};
 use crate::models::folder::Folder;
 use crate::models::import::ImportRequest;
 
+/// Get the batch size from the IMPORT_BATCH_SIZE environment variable.
+/// Defaults to 30 if not set or invalid.
+fn get_batch_size(env: &Env) -> usize {
+    env.var("IMPORT_BATCH_SIZE")
+        .ok()
+        .and_then(|v| v.to_string().parse::<usize>().ok())
+        .unwrap_or(30)
+}
+
+/// Execute statements in batches. If batch_size is 0, execute all in one batch.
+async fn execute_in_batches(
+    db: &worker::D1Database,
+    statements: Vec<D1PreparedStatement>,
+    batch_size: usize,
+) -> Result<(), AppError> {
+    if statements.is_empty() {
+        return Ok(());
+    }
+
+    if batch_size == 0 {
+        // Execute all statements in one batch
+        db.batch(statements).await?;
+    } else {
+        // Execute in chunks of batch_size
+        for chunk in statements.chunks(batch_size) {
+            db.batch(chunk.to_vec()).await?;
+        }
+    }
+
+    Ok(())
+}
+
 #[worker::send]
 pub async fn import_data(
     claims: Claims,
@@ -20,6 +52,10 @@
     let db = db::get_db(&env)?;
     let now = Utc::now();
     let now = now.format("%Y-%m-%dT%H:%M:%S%.3fZ").to_string();
+    let batch_size = get_batch_size(&env);
+
+    // Prepare all folder insert statements
+    let mut folder_statements: Vec<D1PreparedStatement> = Vec::with_capacity(payload.folders.len());
 
     for import_folder in &payload.folders {
         let folder = Folder {
@@ -30,7 +66,7 @@
             updated_at: now.clone(),
         };
 
-        query!(
+        let stmt = query!(
             &db,
             "INSERT OR IGNORE INTO folders (id, user_id, name, created_at, updated_at) VALUES (?1, ?2, ?3, ?4, ?5)",
             folder.id,
@@ -39,11 +75,15 @@
             folder.created_at,
             folder.updated_at
         )
-        .map_err(|_| AppError::Database)?
-        .run()
-        .await?;
+        .map_err(|_| AppError::Database)?;
+
+        folder_statements.push(stmt);
     }
 
+    // Execute folder inserts in batches
+    execute_in_batches(&db, folder_statements, batch_size).await?;
+
+    // Process folder relationships
     for relationship in payload.folder_relationships {
         if let Some(cipher) = payload.ciphers.get_mut(relationship.key) {
             if let Some(folder) = payload.folders.get(relationship.value) {
@@ -52,6 +92,9 @@
             }
         }
     }
 
+    // Prepare all cipher insert statements
+    let mut cipher_statements: Vec<D1PreparedStatement> = Vec::with_capacity(payload.ciphers.len());
+
     for import_cipher in payload.ciphers {
         if import_cipher.encrypted_for != claims.sub {
             return Err(AppError::BadRequest("Cipher encrypted for wrong user".to_string()));
         }
@@ -91,7 +134,7 @@
         let data = serde_json::to_string(&cipher.data).map_err(|_| AppError::Internal)?;
 
-        query!(
+        let stmt = query!(
             &db,
             "INSERT OR IGNORE INTO ciphers (id, user_id, organization_id, type, data, favorite, folder_id, created_at, updated_at)
             VALUES (?1, ?2, ?3, ?4, ?5, ?6, ?7, ?8, ?9)",
@@ -104,10 +147,13 @@
             cipher.folder_id,
             cipher.created_at,
             cipher.updated_at,
-        ).map_err(|_|AppError::Database)?
-        .run()
-        .await?;
+        ).map_err(|_| AppError::Database)?;
+
+        cipher_statements.push(stmt);
     }
 
+    // Execute cipher inserts in batches
+    execute_in_batches(&db, cipher_statements, batch_size).await?;
+
     Ok(Json(()))
 }
\ No newline at end of file
diff --git a/wrangler.toml b/wrangler.toml
index 21f81723..f0206381 100644
--- a/wrangler.toml
+++ b/wrangler.toml
@@ -5,6 +5,11 @@
 compatibility_date = "2025-09-19"
 
 [build]
 command = "cargo install -q worker-build && worker-build --release"
 
+[vars]
+# Optional: set the batch size for imports. Defaults to 30 if not set.
+# A value of 0 disables chunking (all records are imported in a single batch).
+# IMPORT_BATCH_SIZE = "30"
+
 [[d1_databases]]
 binding = "vault1"
 database_name = "vault1"
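
Note on the chunking behaviour: `execute_in_batches` relies on standard `slice::chunks` semantics, so a trailing partial chunk is sent as one final, smaller batch rather than dropped. A minimal, self-contained sketch of that arithmetic (plain integers stand in for `D1PreparedStatement`; the counts of 100 statements and a batch size of 30 are illustrative, not taken from this PR):

```rust
fn main() {
    // Stand-ins for prepared statements; only the count matters here.
    let statements: Vec<u32> = (0..100).collect();
    let batch_size = 30;

    // `chunks` yields ceil(100 / 30) = 4 slices: three of 30 and one of 10.
    // Each slice corresponds to one `db.batch(...)` round trip in the handler.
    for (i, chunk) in statements.chunks(batch_size).enumerate() {
        println!("batch {}: {} statements", i + 1, chunk.len());
    }
}
```

This also explains why the `batch_size == 0` case is handled before the loop: `slice::chunks` panics on a chunk size of 0, so 0 is instead given the "send everything in one batch" meaning documented in wrangler.toml.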