Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -131,4 +131,5 @@ cython_debug/
# App
workspace/datasets/built/*/*.json
workspace/datasets/raw/squad/*.json
workspace/datasets/raw/bipia/
workspace/experiments/archive/
149 changes: 113 additions & 36 deletions src/dcv_benchmark/cli/data.py
Original file line number Diff line number Diff line change
@@ -1,14 +1,21 @@
import sys
from pathlib import Path
from typing import Any

import yaml

from dcv_benchmark.constants import BUILT_DATASETS_DIR, RAW_DATASETS_DIR
from dcv_benchmark.data_factory.builder import DatasetBuilder
from dcv_benchmark.data_factory.bipia.bipia import BipiaBuilder
from dcv_benchmark.data_factory.downloader import download_bipia, download_squad
from dcv_benchmark.data_factory.injector import AttackInjector
from dcv_benchmark.data_factory.loaders import SquadLoader
from dcv_benchmark.data_factory.squad.squad_builder import SquadBuilder
from dcv_benchmark.models.bipia_config import BipiaConfig
from dcv_benchmark.models.data_factory import DataFactoryConfig
from dcv_benchmark.models.dataset import (
BipiaDataset,
DatasetMeta,
)
from dcv_benchmark.utils.logger import get_logger

logger = get_logger(__name__)
Expand Down Expand Up @@ -56,67 +63,137 @@ def build_data(
if potential.exists():
config_path = potential
else:
logger.error(
"Directory provided but 'dataset_config.yaml' not found in "
f"{config_path}"
)
sys.exit(1)
# Fallback for BIPIA naming convention if user put it there
potential_bipia = config_path / "bipia_config.yaml"
if potential_bipia.exists():
config_path = potential_bipia
else:
logger.error(f"No config file found in {config_path}")
sys.exit(1)

if not config_path.exists():
logger.error(f"Config file not found: {config_path}")
sys.exit(1)

# Load Config
# 1. Load Raw YAML to determine type
try:
with open(config_path, encoding="utf-8") as f:
raw_config = yaml.safe_load(f)
config = DataFactoryConfig(**raw_config)
raw_yaml = yaml.safe_load(f)
except Exception as e:
logger.error(f"Failed to load config: {e}")
logger.error(f"Failed to parse YAML: {e}")
sys.exit(1)

# Determine Dataset Name (CLI override > Config > Folder Name)
if name:
dataset_name = name
# Update config to match the build name so metadata is consistent
config.dataset_name = dataset_name
# 2. Branch Logic
dataset_type = raw_yaml.get("type")

if not dataset_type:
# Fallback for legacy configs that haven't been migrated yet
# We'll infer based on 'tasks' for now but warn
if "tasks" in raw_yaml:
logger.warning("Config missing 'type', inferring 'bipia' from 'tasks'.")
dataset_type = "bipia"
else:
logger.warning("Config missing 'type', inferring 'squad'.")
dataset_type = "squad"

if dataset_type == "bipia":
_build_bipia(raw_yaml, name, overwrite)
elif dataset_type == "squad":
_build_squad(raw_yaml, name, overwrite)
else:
dataset_name = config.dataset_name
logger.error(f"Unknown dataset type: '{dataset_type}'. Options: bipia, squad")
sys.exit(1)


def _build_bipia(raw_config: dict[str, Any], name: str | None, overwrite: bool) -> None:
    """Handler for BIPIA datasets.

    Validates the raw YAML mapping against BipiaConfig, builds the samples via
    BipiaBuilder, wraps them in the standard dataset envelope, and writes both
    the dataset JSON and a copy of the config for reproducibility. Exits the
    process with status 1 on any failure.
    """
    # Validate the raw mapping up front; a bad config is a user error, not a crash.
    try:
        config = BipiaConfig(**raw_config)
    except Exception as e:
        logger.error(f"Invalid BIPIA config: {e}")
        sys.exit(1)

    # CLI-provided name takes precedence over the config's own name.
    dataset_name = name or config.dataset_name
    out_dir = BUILT_DATASETS_DIR / dataset_name
    out_dir.mkdir(parents=True, exist_ok=True)
    artifact = out_dir / "dataset.json"

    # Refuse to clobber an existing build unless --overwrite was passed.
    if artifact.exists() and not overwrite:
        logger.error(f"Dataset '{artifact}' exists. Use --overwrite.")
        sys.exit(1)

    logger.info(f"Building BIPIA dataset '{dataset_name}'...")

    try:
        bipia_builder = BipiaBuilder(raw_dir=RAW_DATASETS_DIR / "bipia", seed=config.seed)

        samples = bipia_builder.build(
            tasks=config.tasks,
            injection_pos=config.injection_pos,
            max_samples=config.max_samples,
        )

        # Wrap in the standard dataset envelope so the Runner can consume it.
        dataset = BipiaDataset(
            meta=DatasetMeta(
                name=dataset_name,
                type="bipia",
                version="1.0.0",
                description=(
                    f"BIPIA Benchmark (Tasks: {config.tasks}, Pos: "
                    f"{config.injection_pos})"
                ),
                author="Deconvolute Labs / Microsoft BIPIA",
            ),
            samples=samples,
        )

        # Persist the dataset JSON.
        with open(artifact, "w", encoding="utf-8") as f:
            f.write(dataset.model_dump_json(indent=2))

        logger.info(
            f"Build successful! Saved {len(samples)} samples to: {artifact}"
        )
        # Keep a copy of the exact config used, for reproducibility.
        with open(out_dir / "bipia_config.yaml", "w", encoding="utf-8") as f:
            yaml.dump(raw_config, f)

    except Exception as e:
        logger.exception(f"BIPIA Build failed: {e}")
        sys.exit(1)


def _build_squad(raw_config: dict[str, Any], name: str | None, overwrite: bool) -> None:
    """Handler for Standard (SQuAD/Canary) datasets.

    Validates the raw YAML mapping against DataFactoryConfig, runs the
    SquadBuilder pipeline (load -> inject -> build), and saves the artifacts.
    Exits the process with status 1 on any failure.

    NOTE(review): the pasted block contained interleaved old/new diff lines —
    a duplicated overwrite check, a reference to an undefined ``config_path``,
    and both ``DatasetBuilder`` and ``SquadBuilder`` construction. This body
    is the reconstructed post-merge version (the "new" side of the diff).
    """
    # Validate the raw mapping up front; a bad config is a user error, not a crash.
    try:
        config = DataFactoryConfig(**raw_config)
    except Exception as e:
        logger.error(f"Invalid Standard config: {e}")
        sys.exit(1)

    # CLI-provided name takes precedence over the config's own name.
    dataset_name = name or config.dataset_name
    target_dir = BUILT_DATASETS_DIR / dataset_name
    target_dir.mkdir(parents=True, exist_ok=True)
    output_file = target_dir / "dataset.json"

    # Refuse to clobber an existing build unless --overwrite was passed.
    if output_file.exists() and not overwrite:
        logger.error(f"Dataset '{output_file}' exists. Use --overwrite.")
        sys.exit(1)

    logger.info(f"Building Standard dataset '{dataset_name}'...")

    try:
        # Note: We default to SquadLoader as it handles the JSON format.
        loader = SquadLoader()
        injector = AttackInjector(config)
        builder = SquadBuilder(loader=loader, injector=injector, config=config)
        dataset = builder.build()

        # The builder owns artifact serialization (dataset + any sidecar files).
        builder.save(dataset, output_file)

        logger.info(f"Build successful! Artifacts saved to: {target_dir}")

    except Exception as e:
        logger.exception(f"Standard Build failed: {e}")
        sys.exit(1)
133 changes: 133 additions & 0 deletions src/dcv_benchmark/core/factories.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,133 @@
import re
from typing import Any

from dcv_benchmark.constants import (
AVAILABLE_EVALUATORS,
BASELINE_TARGET_KEYWORD,
BUILT_DATASETS_DIR,
)
from dcv_benchmark.evaluators.base import BaseEvaluator
from dcv_benchmark.evaluators.bipia import BipiaEvaluator
from dcv_benchmark.evaluators.canary import CanaryEvaluator
from dcv_benchmark.evaluators.keyword import KeywordEvaluator
from dcv_benchmark.evaluators.language import LanguageMismatchEvaluator
from dcv_benchmark.models.config.experiment import EvaluatorConfig, ExperimentConfig
from dcv_benchmark.models.dataset import BaseDataset
from dcv_benchmark.targets.basic_rag import BasicRAG
from dcv_benchmark.targets.basic_rag_guard import BasicRAGGuard
from dcv_benchmark.utils.dataset_loader import DatasetLoader
from dcv_benchmark.utils.logger import get_logger

logger = get_logger(__name__)


def load_dataset(experiment_config: ExperimentConfig) -> BaseDataset:
    """Loads dataset based on config or default path.

    Resolution order: explicit ``input.dataset_name`` from the config, else a
    conventional path derived from the experiment name. Raises ValueError when
    neither resolves to an existing dataset.
    """
    source = experiment_config.input.dataset_name

    if not source:
        # Fall back to the conventional per-experiment location.
        fallback_path = BUILT_DATASETS_DIR / experiment_config.name / "dataset.json"
        if not fallback_path.exists():
            error_msg = (
                "No dataset path provided and default path not found: "
                f"{fallback_path}\n"
                "Please provide 'input.dataset_name' in config or ensure the "
                "default dataset exists."
            )
            logger.error(error_msg)
            raise ValueError(error_msg)
        logger.info(f"No dataset provided. Using default path: {fallback_path}")
        source = str(fallback_path)

    dataset: BaseDataset = DatasetLoader(source).load()
    logger.info(f"Loaded dataset: {dataset.meta.name} (v{dataset.meta.version})")
    logger.info(f"Description: {dataset.meta.description}")
    return dataset


def create_target(experiment_config: ExperimentConfig) -> BasicRAG | BasicRAGGuard:
    """Creates the target instance.

    Dispatches on ``target.name``; raises ValueError for unknown targets.
    """
    registry = {
        "basic_rag": BasicRAG,
        "basic_rag_guard": BasicRAGGuard,
    }
    target_name = experiment_config.target.name
    target_cls = registry.get(target_name)
    if target_cls is None:
        raise ValueError(
            f"Unsupported target: '{target_name}'. "
            "Available targets: 'basic_rag', 'basic_rag_guard'."
        )
    return target_cls(experiment_config.target)


def _validate_baseline_payload(dataset: BaseDataset) -> None:
    """Helper to validate dataset payload for Keyword evaluation.

    Confirms the dataset's attack payload actually contains the keyword the
    evaluator will search for; raises ValueError on a mismatch. Skips (with a
    warning) when the dataset carries no attack metadata at all.
    """
    attack_info = dataset.meta.attack_info
    if not attack_info:
        logger.warning(
            "Dataset metadata is missing 'attack_info'. Skipping validation."
        )
        return

    # Whole-word match so e.g. a keyword embedded in a longer token doesn't count.
    keyword_re = re.compile(rf"\b{re.escape(BASELINE_TARGET_KEYWORD)}\b")
    if keyword_re.search(attack_info.payload) is None:
        error_msg = (
            "Configuration Mismatch! \n"
            f"Evaluator expects: '{BASELINE_TARGET_KEYWORD}'\n"
            f"Dataset payload: '{attack_info.payload}'"
        )
        logger.error(error_msg)
        raise ValueError(error_msg)

    logger.info(
        f"Confirmed: Dataset payload contains target '{BASELINE_TARGET_KEYWORD}'."
    )


def create_evaluator(
    config: EvaluatorConfig | None,
    target: Any = None,
    dataset: BaseDataset | None = None,
) -> BaseEvaluator:
    """Creates the evaluator instance.

    Requires an explicit evaluator config; dispatches on ``config.type``.
    The dataset (when given) is validated for the keyword evaluator, and the
    target's LLM (when present) is reused as the BIPIA judge.
    """
    # An evaluator must be chosen explicitly — no silent default.
    if config is None:
        error_msg = (
            "Missing Configuration: No evaluator specified.\nYou must explicitly"
            " define an 'evaluator' section in your experiment YAML.\n"
            f"Available types: {', '.join(AVAILABLE_EVALUATORS)}"
        )
        logger.error(error_msg)
        raise ValueError(error_msg)

    evaluator_type = config.type

    if evaluator_type == "canary":
        logger.info("Evaluator: Canary Defense Integrity")
        return CanaryEvaluator()

    if evaluator_type == "keyword":
        # Sanity-check the dataset payload against the keyword when possible.
        if dataset:
            _validate_baseline_payload(dataset)
        keyword = config.target_keyword or BASELINE_TARGET_KEYWORD
        logger.info(f"Evaluator: Keyword (Target: '{keyword}')")
        return KeywordEvaluator(target_keyword=keyword)

    if evaluator_type == "language_mismatch":
        logger.info(
            f"Evaluator: Language Mismatch (Expected: {config.expected_language})"
        )
        try:
            return LanguageMismatchEvaluator(
                expected_language=config.expected_language,
                strict=config.strict,
            )
        except ImportError as e:
            # Optional language-detection dependency is not installed.
            logger.error("Missing dependencies for Language Evaluator.")
            raise e

    if evaluator_type == "bipia":
        logger.info("Evaluator: BIPIA (LLM Judge + Pattern Match)")
        # Reuse the target's LLM as the judge when one is available.
        judge_llm = getattr(target, "llm", None)
        if not judge_llm:
            logger.warning(
                "BIPIA Evaluator initialized without an LLM! Text tasks will fail."
            )
        return BipiaEvaluator(judge_llm=judge_llm)

    raise ValueError(f"Unknown evaluator type: {config.type}")
Loading