src/handlers/import.rs (64 changes: 55 additions & 9 deletions)
@@ -2,7 +2,7 @@ use axum::{extract::State, Json};
 use chrono::Utc;
 use std::sync::Arc;
 use uuid::Uuid;
-use worker::{query, Env};
+use worker::{query, D1PreparedStatement, Env};
 
 use crate::auth::Claims;
 use crate::db;
@@ -11,6 +11,38 @@ use crate::models::cipher::{Cipher, CipherData};
 use crate::models::folder::Folder;
 use crate::models::import::ImportRequest;
 
+/// Get the batch size from environment variable IMPORT_BATCH_SIZE.
+/// Defaults to 30 if not set or invalid.
+fn get_batch_size(env: &Env) -> usize {
+    env.var("IMPORT_BATCH_SIZE")
+        .ok()
+        .and_then(|v| v.to_string().parse::<usize>().ok())
+        .unwrap_or(30)
+}
+
+/// Execute statements in batches. If batch_size is 0, execute all in one batch.
+async fn execute_in_batches(
+    db: &worker::D1Database,
+    statements: Vec<D1PreparedStatement>,
+    batch_size: usize,
+) -> Result<(), AppError> {
+    if statements.is_empty() {
+        return Ok(());
+    }
+
+    if batch_size == 0 {
+        // Execute all statements in one batch
+        db.batch(statements).await?;
+    } else {
+        // Execute in chunks of batch_size
+        for chunk in statements.chunks(batch_size) {
+            db.batch(chunk.to_vec()).await?;
+        }
+    }
+
+    Ok(())
+}
+
 #[worker::send]
 pub async fn import_data(
     claims: Claims,
@@ -20,6 +52,10 @@ pub async fn import_data(
     let db = db::get_db(&env)?;
     let now = Utc::now();
     let now = now.format("%Y-%m-%dT%H:%M:%S%.3fZ").to_string();
+    let batch_size = get_batch_size(&env);
+
+    // Prepare all folder insert statements
+    let mut folder_statements: Vec<D1PreparedStatement> = Vec::with_capacity(payload.folders.len());
 
     for import_folder in &payload.folders {
         let folder = Folder {
@@ -30,7 +66,7 @@
             updated_at: now.clone(),
         };
 
-        query!(
+        let stmt = query!(
             &db,
             "INSERT OR IGNORE INTO folders (id, user_id, name, created_at, updated_at) VALUES (?1, ?2, ?3, ?4, ?5)",
             folder.id,
@@ -39,11 +75,15 @@
             folder.created_at,
             folder.updated_at
         )
-        .map_err(|_| AppError::Database)?
-        .run()
-        .await?;
+        .map_err(|_| AppError::Database)?;
+
+        folder_statements.push(stmt);
     }
 
+    // Execute folder inserts in batches
+    execute_in_batches(&db, folder_statements, batch_size).await?;
+
+    // Process folder relationships
     for relationship in payload.folder_relationships {
         if let Some(cipher) = payload.ciphers.get_mut(relationship.key) {
             if let Some(folder) = payload.folders.get(relationship.value) {
@@ -52,6 +92,9 @@
         }
     }
 
+    // Prepare all cipher insert statements
+    let mut cipher_statements: Vec<D1PreparedStatement> = Vec::with_capacity(payload.ciphers.len());
+
     for import_cipher in payload.ciphers {
         if import_cipher.encrypted_for != claims.sub {
             return Err(AppError::BadRequest("Cipher encrypted for wrong user".to_string()));
@@ -91,7 +134,7 @@

         let data = serde_json::to_string(&cipher.data).map_err(|_| AppError::Internal)?;
 
-        query!(
+        let stmt = query!(
             &db,
             "INSERT OR IGNORE INTO ciphers (id, user_id, organization_id, type, data, favorite, folder_id, created_at, updated_at)
              VALUES (?1, ?2, ?3, ?4, ?5, ?6, ?7, ?8, ?9)",
@@ -104,10 +147,13 @@
             cipher.folder_id,
             cipher.created_at,
             cipher.updated_at,
-        ).map_err(|_|AppError::Database)?
-        .run()
-        .await?;
+        ).map_err(|_| AppError::Database)?;
+
+        cipher_statements.push(stmt);
     }
 
+    // Execute cipher inserts in batches
+    execute_in_batches(&db, cipher_statements, batch_size).await?;
+
     Ok(Json(()))
 }
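
Reviewer note: to sanity-check the chunking arithmetic without a D1 binding, here is a minimal standalone sketch (not part of the PR; plain strings stand in for `D1PreparedStatement`). It also shows why `execute_in_batches` must special-case `batch_size == 0` before chunking: `slice::chunks` panics on a chunk size of zero.

// Standalone sketch of the batching split in execute_in_batches.
// Plain strings stand in for D1PreparedStatement.
fn split_into_batches(statements: Vec<String>, batch_size: usize) -> Vec<Vec<String>> {
    if statements.is_empty() {
        return Vec::new();
    }
    if batch_size == 0 {
        // One batch holding everything; calling chunks(0) below would panic.
        return vec![statements];
    }
    statements.chunks(batch_size).map(|c| c.to_vec()).collect()
}

fn main() {
    let stmts: Vec<String> = (0..7).map(|i| format!("stmt-{i}")).collect();
    // 7 statements with batch_size = 3 split into batches of 3, 3, and 1.
    let sizes: Vec<usize> = split_into_batches(stmts.clone(), 3).iter().map(|b| b.len()).collect();
    assert_eq!(sizes, vec![3, 3, 1]);
    // batch_size = 0 keeps everything in a single batch of 7.
    assert_eq!(split_into_batches(stmts, 0).len(), 1);
}
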
wrangler.toml (5 changes: 5 additions & 0 deletions)
@@ -5,6 +5,11 @@ compatibility_date = "2025-09-19"
 [build]
 command = "cargo install -q worker-build && worker-build --release"
 
+[vars]
+# Optional: Set the batch size for imports. Defaults to 30 if not set or invalid.
+# A value of 0 disables batching (all records are imported in a single batch).
+# IMPORT_BATCH_SIZE = "30"
+
 [[d1_databases]]
 binding = "vault1"
 database_name = "vault1"
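
And a quick illustration of how the new IMPORT_BATCH_SIZE variable is interpreted: a sketch of the parsing fallback in `get_batch_size`, with the Worker env lookup replaced by a plain `Option<&str>` so it runs anywhere. The sample values are assumptions for illustration, not part of the PR.

// Sketch of the fallback logic in get_batch_size, with the env lookup
// replaced by a plain Option<&str>.
fn batch_size_from(raw: Option<&str>) -> usize {
    raw.and_then(|v| v.parse::<usize>().ok()).unwrap_or(30)
}

fn main() {
    assert_eq!(batch_size_from(None), 30);        // variable not set -> default
    assert_eq!(batch_size_from(Some("abc")), 30); // non-numeric -> default
    assert_eq!(batch_size_from(Some("-5")), 30);  // negative fails usize parse -> default
    assert_eq!(batch_size_from(Some("0")), 0);    // zero disables chunking
    assert_eq!(batch_size_from(Some("50")), 50);  // custom batch size
}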