diff --git a/evals/evaluation/rag_pilot/RAG_Pilot.png b/evals/evaluation/rag_pilot/RAG_Pilot.png index 61ece95f..18939ce5 100644 Binary files a/evals/evaluation/rag_pilot/RAG_Pilot.png and b/evals/evaluation/rag_pilot/RAG_Pilot.png differ diff --git a/evals/evaluation/rag_pilot/README.md b/evals/evaluation/rag_pilot/README.md index 9399a31d..044db1c5 100644 --- a/evals/evaluation/rag_pilot/README.md +++ b/evals/evaluation/rag_pilot/README.md @@ -1,108 +1,131 @@ +# 🚀 RAG Pilot - A RAG Pipeline Tuning Tool -# RAG Pilot - A RAG Pipeline Tuning Tool - -## Overview +## 📖 Overview RAG Pilot provides a set of tuners to optimize various parameters in a retrieval-augmented generation (RAG) pipeline. Each tuner allows fine-grained control over key aspects of parsing, chunking, postporcessing, and generating selection, enabling better retrieval and response generation. -### Available Tuners +### 🧠 Available Tuners + +| Tuner | Stage | Function | Configuration | +|---|---|---|---| +| **EmbeddingTuner** | Retrieval | Tune embedding model and related parameters | Allows selection and configuration of the embedding model used for vectorization, including model name and optional parameters like dimension or backend. | +| **NodeParserTuner** | Retrieval | Tune node parser parameters | General tuner for configuring node parsers, possibly extending to custom strategies or pre-processing logic. | +| **SimpleNodeParserChunkTuner** | Retrieval | Tune `SentenceSplitter`'s `chunk_size` and `chunk_overlap` | Configures chunking behavior for document parsing by adjusting the size of individual text chunks and their overlap to ensure context retention. | +| **RetrievalTopkTuner** | Retrieval | Tune `top_k` for retriever | Adjusts how many documents are retrieved before reranking, balancing recall and performance. | +| **RerankerTopnTuner** | Postprocessing | Tune `top_n` for reranking | Adjusts the number of top-ranked documents returned after reranking, optimizing relevance and conciseness. | -| Tuner | Function | Configuration | -|---|---|---| -| **NodeParserTypeTuner** | Switch between `simple` and `hierarchical` node parsers | The `simple` parser splits text into basic chunks using [`SentenceSplitter`](https://docs.llamaindex.ai/en/stable/api_reference/node_parsers/sentence_splitter/), while the `hierarchical` parser ([`HierarchicalNodeParser`](https://docs.llamaindex.ai/en/v0.10.17/api/llama_index.core.node_parser.HierarchicalNodeParser.html)) creates a structured hierarchy of nodes to maintain contextual relationships. | -| **SimpleNodeParserChunkTuner** | Tune `SentenceSplitter`'s `chunk_size` and `chunk_overlap` | Configures chunking behavior for document parsing by adjusting the size of individual text chunks and their overlap to ensure context retention. | -| **RerankerTopnTuner** | Tune `top_n` for reranking | Adjusts the number of top-ranked documents retrieved, optimizing the relevance of retrieved results. | -| **EmbeddingLanguageTuner** | Select the embedding model | Configures the embedding model for retrieval, allowing users to select different models for vector representation. | These tuners help in optimizing document parsing, chunking strategies, reranking efficiency, and embedding selection for improved RAG performance. -## Online RAG Tuning +## 🌐 Online RAG Tuning -### Dependencies and Environment Setup +### ⚙️ Dependencies and Environment Setup -#### Setup EdgeCraftRAG +#### 🛠️ Setup EdgeCraftRAG Setup EdgeCraftRAG pipeline based on this [link](https://github.com/opea-project/GenAIExamples/tree/main/EdgeCraftRAG). Load documents in EdgeCraftRAG before running RAG Pilot. -#### Create Running Environment +#### 🧪 Create Running Environment ```bash # Create a virtual environment -python3 -m venv tuning -source tuning/bin/activate +python3 -m venv rag_pilot +source rag_pilot/bin/activate # Install dependencies pip install -r requirements.txt ``` -### Launch RAG Pilot in Online Mode +### 🚦 Launch RAG Pilot in Online Mode To launch RAG Pilot, create the following *required files* before running the command: -#### QA List File (`your_qa_list.json`) -Contains queries and optional ground truth answers. Below is a sample format: - -```json -[ - { - "query": "鸟类的祖先是恐龙吗?哪篇课文里讲了相关的内容?", - "ground_truth": "是的,鸟类的祖先是恐龙,这一内容在《飞向蓝天的恐龙》一文中有所讨论" - }, - { - "query": "桃花水是什么季节的水?" - } -] +#### 🔹Input file: QA List File (`your_queries.csv`) + +The input CSV file should contain queries and associated ground truth data (optional) used for evaluation or tuning. Each row corresponds to a specific query and context file. The CSV must include the following **columns**: + +| Column | Required | Description | +|--------|----------|-------------| +| `query_id` | ✅ Yes | Unique identifier for the query. Can be used to group multiple context entries under the same query. | +| `query` | ✅ Yes (at least one per `query_id`) | The actual query string. If left empty for some rows sharing the same `query_id`, the query from the first row with a non-empty value will be used. | +| `file_name` | ✅ Yes | The name of the file or document where the context (for retrieval or grounding) is drawn from. | +| `gt_context` | ✅ Yes | The ground truth context string that should be retrieved or matched against. | +| `ground_truth` | ❌ Optional | The ideal answer or response for the query, used for optional answer-level evaluation. | + +##### 📌 CSV File Example + +```csv +query_id,query,file_name,gt_context,ground_truth +53,故障来源有哪些?,故障处理记录表.txt,故障来源:用户投诉、日志系统、例行维护中发现、其它来源。,故障来源:用户投诉、日志系统、例行维护中发现、其它来源。 +93,uMAC网元VNFC有哪几种备份方式,index.txt,ZUF-76-04-005 VNFC支持1+1主备冗余,uMAC网元VFNC有3中备份方式: 支持1+1主备冗余,支持N+M负荷分担冗余, 支持1+1互备冗余。 +93,,index.txt,ZUF-76-04-006 VNFC支持N+M负荷分担冗余, +93,,index.txt,ZUF-76-04-008 VNFC支持1+1互备冗余, ``` -Run the following command to start the tuning process. The output RAG results will be stored in `rag_pipeline_out.json`: +#### ▶️ Run RAG Pilot + +Run the following command to start the tuning process. ```bash # Run pipeline tuning tool export ECRAG_SERVICE_HOST_IP="ecrag_host_ip" -python3 -m pipeline_tune -q "your_qa_list.json" -o "rag_pipeline_out.json" +python3 -m run_pilot -q "your_queries.csv" ``` -## Offline RAG Tuning +#### 📦 Output Files and Structure -RAG Pilot supports offline mode using a RAG configuration file. +Each tuning run in **RAG Pilot** generates a set of structured output files for analyzing and comparing different RAG pipeline configurations. -### Environment Setup +##### 📁 Directory Layout -Refer to [Create Running Environment](#create-running-environment) in the Online RAG pipeline tuning section for setting up the environment before proceeding. +- `rag_pilot_/`: Main folder for a tuning session. + - `curr_pipeline.json` – Best pipeline configuration. + - `curr_rag_results.json` – Results of the best pipeline. + - `rag_summary.csv` – Query-wise summary. + - `rag_contexts.csv` – Detailed context analysis. + - `summary.csv` – Overall performance metrics. + - `entry_/`: Subfolders for each tried pipeline with the same file structure: + - `pipeline.json` + - `rag_results.json` + - `rag_summary.csv` + - `rag_contexts.csv` -### Launch RAG Pilot in Offline Mode +##### 🗂️ Output File Overview -To launch RAG Pilot, create the following *required files* before running the command: +| File Name | Description | +|----------------------|-----------------------------------------------------------------------------| +| `pipeline.json` | RAG pipeline configuration used in a specific trial | +| `rag_results.json` | List of results for each query, including metadata and context sets | +| `rag_summary.csv` | Summary of each query's outcome, including response and context hit counts | +| `rag_contexts.csv` | Breakdown of retrieved/reranked contexts and mapping to ground truth | +| `summary.csv` | Aggregated performance summary across all queries | -#### RAG Configuration File (`your_rag_pipeline.json`) -Settings for the RAG pipeline. Please follow the format of file `configs/pipeline_sample.json`, which is compatible with [EdgeCraftRAG](https://github.com/opea-project/GenAIExamples/tree/main/EdgeCraftRAG) +**Context Mapping Notes:** -#### RAG Results File (`your_rag_results.json`) -Contains queries, responses, lists of contexts, and optional ground truth. Below is a sample format: +- Contexts are categorized as `gt_contexts`, `retrieval_contexts`, or `postprocessing_contexts`. +- Mappings track which retrieved or postprocessed contexts hit the ground truth. +- Each context is associated with a `query_id` and indexed for traceability. -```json -[ - { - "query": "鸟类的祖先是恐龙吗?哪篇课文里讲了相关的内容?", - "contexts": ["恐龙演化成鸟类的证据..."], - "response": "是的,鸟类的祖先是恐龙。", - "ground_truth": "是的,鸟类的祖先是恐龙,这一内容在《飞向蓝天的恐龙》一文中有所讨论" - } -] -``` -Run the following command to start offline tuning. The output RAG results will be stored in `rag_pipeline_out.json`: +## 📴 Offline RAG Tuning + +RAG Pilot supports offline mode using a RAG configuration file. + +### ⚙️ Environment Setup + +Refer to [Create Running Environment](#create-running-environment) in the Online RAG pipeline tuning section for setting up the environment before proceeding. + +### 🚦 Launch RAG Pilot in Offline Mode + +To be added in later release -```bash -python3 -m pipeline_tune --offline -c "your_rag_pipeline.json" -r "your_rag_results.json" -o "rag_pipeline_out.json" -``` -## How to use RAG Pilot to tune your RAG solution +## 🔧 How to Adjust RAG Pilot to Tune Your RAG Solution -### What's Nodes and Modules +### 🧩 What's Nodes and Modules RAG Pilot represents each stage of the RAG pipeline as a **node**, such as `node_parser`, `indexer`, `retriever`, etc. Each node can have different **modules** that define its type and configuration. The nodes and modules are specified in a YAML file, allowing users to switch between different implementations easily. @@ -110,7 +133,7 @@ Here is an example of nodes and modules for EdgeCraftRAG. ![RAG Pilot Architecture](RAG_Pilot.png) -### How to configure Nodes and Modules +### ⚙️ How to Configure Nodes and Modules The available nodes and their modules are stored in a YAML file (i.e. `configs/ecrag.yaml` for EdgeCraftRAG as below). Each node can have multiple modules, and both nodes and modules have configurable parameters that can be tuned. @@ -122,9 +145,14 @@ nodes: chunk_size: 400 chunk_overlap: 48 - module_type: hierarchical - chunk_sizes: [256, 384, 512] + chunk_sizes: + - 256 + - 384 + - 512 - node: indexer - embedding_model: [BAAI/bge-small-zh-v1.5, BAAI/bge-small-en-v1.5] + embedding_model: + - BAAI/bge-small-zh-v1.5 + - BAAI/bge-small-en-v1.5 modules: - module_type: vector - module_type: faiss_vector @@ -141,8 +169,11 @@ nodes: reranker_model: BAAI/bge-reranker-large - module_type: metadata_replace - node: generator - model: [Qwen/Qwen2-7B-Instruct] - inference_type: [local, vllm] + model: + - Qwen/Qwen2-7B-Instruct + inference_type: + - local + - vllm prompt: null ``` @@ -169,11 +200,11 @@ nodes: } ``` -### How to use Nodes and Modules +### 🧑‍💻 How to Use Nodes and Modules Besides the YAML configuration file, the tool also uses a module map to associate each module with a runnable instance. This ensures that the tool correctly links each module type to its respective function within the pipeline. -#### Example: Mapping Modules to Functions +#### 🧾 Example: Mapping Modules to Functions The function below defines how different module types are mapped to their respective components in EdgeCraftRAG: ```python diff --git a/evals/evaluation/rag_pilot/components/adaptor/__init__.py b/evals/evaluation/rag_pilot/components/adaptor/__init__.py deleted file mode 100644 index 4057dc01..00000000 --- a/evals/evaluation/rag_pilot/components/adaptor/__init__.py +++ /dev/null @@ -1,2 +0,0 @@ -# Copyright (C) 2025 Intel Corporation -# SPDX-License-Identifier: Apache-2.0 diff --git a/evals/evaluation/rag_pilot/components/adaptor/adaptor.py b/evals/evaluation/rag_pilot/components/adaptor/adaptor.py deleted file mode 100644 index 31f486a3..00000000 --- a/evals/evaluation/rag_pilot/components/adaptor/adaptor.py +++ /dev/null @@ -1,75 +0,0 @@ -# Copyright (C) 2025 Intel Corporation -# SPDX-License-Identifier: Apache-2.0 - -from typing import Callable, Optional - -from components.adaptor.base import Module, Node, convert_tuple, get_support_modules - - -class Adaptor: - - def __init__(self, yaml_data: str): - self.nodes = self.parse_nodes(yaml_data) - self.root_func: Optional[Callable] = None - - def parse_nodes(self, yaml_data): - parsed_nodes = {} - for node in yaml_data.get("nodes", []): - node_type = node.get("node") - modules_dict = { - mod.get("module_type"): Module( - type=mod.get("module_type", ""), - params={k: convert_tuple(v) for k, v in mod.items() if k not in ["module_type"]}, - ) - for mod in node.get("modules", []) - if mod.get("module_type") - } - node_params = {k: convert_tuple(v) for k, v in node.items() if k not in ["node", "node_type", "modules"]} - cur_node = Node(type=node_type, params=node_params, modules=modules_dict) - if node_type in parsed_nodes: - parsed_nodes[node_type].append(cur_node) - else: - parsed_nodes[node_type] = [cur_node] - return parsed_nodes - - def get_node(self, node_type, idx=0): - nodes = self.nodes[node_type] if node_type in self.nodes else None - return nodes[idx] if nodes and idx < len(nodes) else None - - def get_modules_from_node(self, node_type, idx=0): - node = self.get_node(node_type, idx) - return node.modules if node else None - - def get_module(self, node_type, module_type, idx=0): - if module_type is None: - return self.get_node(node_type, idx) - else: - modules = self.get_modules_from_node(node_type, idx) - return modules[module_type] if modules and module_type in modules else None - - def update_all_module_functions(self, module_map, node_type_map): - self.root_func = get_support_modules("root", module_map) - - for node_list in self.nodes.values(): - for node in node_list: - node.update_func(module_map) - node.is_active = False - for module in node.modules.values(): - module.update_func(module_map) - module.is_active = False - - self.activate_modules_based_on_type(node_type_map) - - def activate_modules_based_on_type(self, node_type_map): - if not self.root_func: - return - - for node_list in self.nodes.values(): - for node in node_list: - node_type = node.type - if not getattr(self.root_func, node_type, None): - continue - node.is_active = True - active_module_type = getattr(node.func, node_type_map[node_type], None) - if active_module_type and active_module_type in node.modules: - node.modules[active_module_type].is_active = True diff --git a/evals/evaluation/rag_pilot/components/constructor/constructor.py b/evals/evaluation/rag_pilot/components/constructor/constructor.py deleted file mode 100644 index 5069cec7..00000000 --- a/evals/evaluation/rag_pilot/components/constructor/constructor.py +++ /dev/null @@ -1,192 +0,0 @@ -# Copyright (C) 2025 Intel Corporation -# SPDX-License-Identifier: Apache-2.0 - -import copy -import json -from typing import Dict, Optional - -from components.constructor.ecrag.api_schema import ( - GeneratorIn, - IndexerIn, - ModelIn, - NodeParserIn, - PipelineCreateIn, - PostProcessorIn, - RetrieverIn, -) - - -def convert_dict_to_pipeline(pl: dict) -> PipelineCreateIn: - def initialize_component(cls, data, extra=None, key_map=None, nested_fields=None): - if not data: - return None - - extra = extra or {} - key_map = key_map or {} - nested_fields = nested_fields or {} - - processed_data = {} - for k, v in data.items(): - mapped_key = key_map.get(k, k) - if mapped_key in nested_fields: - processed_data[mapped_key] = initialize_component(nested_fields[mapped_key], v) - else: - processed_data[mapped_key] = v - if cls == ModelIn: - processed_data["model_type"] = data.get("type", processed_data.get("model_type")) - - processed_data.update(extra) - return cls(**processed_data) - - return PipelineCreateIn( - idx=pl.get("idx"), - name=pl.get("name"), - node_parser=initialize_component(NodeParserIn, pl.get("node_parser"), key_map={"idx": "idx"}), - indexer=initialize_component( - IndexerIn, - pl.get("indexer"), - key_map={"model": "embedding_model", "idx": "idx"}, - nested_fields={"embedding_model": ModelIn}, - ), - retriever=initialize_component(RetrieverIn, pl.get("retriever"), key_map={"idx": "idx"}), - postprocessor=[ - initialize_component( - PostProcessorIn, - pp, - extra={"processor_type": pp.get("processor_type")}, - key_map={"model": "reranker_model", "idx": "idx"}, - nested_fields={"reranker_model": ModelIn}, - ) - for pp in pl.get("postprocessor", []) - ], - generator=initialize_component( - GeneratorIn, pl.get("generator"), key_map={"idx": "idx"}, nested_fields={"model": ModelIn} - ), - active=pl.get("status", {}).get("active", False), - ) - - -class Constructor: - - def __init__(self): - self.pl: Optional[PipelineCreateIn] = None - self._backup = None - - def _replace_model_with_id(self): - self._backup = {} - - def extract_model_id(model): - if model: - if model.model_type not in self._backup: - self._backup[model.model_type] = [] - self._backup[model.model_type].append(model) - return model.model_id - return None - - if self.pl.indexer and self.pl.indexer.embedding_model: - self.pl.indexer.embedding_model = extract_model_id(self.pl.indexer.embedding_model) - - if self.pl.postprocessor: - for proc in self.pl.postprocessor: - if proc.reranker_model: - proc.reranker_model = extract_model_id(proc.reranker_model) - - if self.pl.generator and self.pl.generator.model: - self.pl.generator.model = extract_model_id(self.pl.generator.model) - - def _restore_model_instances(self): - if not self._backup: - self._backup = {} - - def restore_model(model_id, model_type, is_generator=False): - if model_type in self._backup: - for existing_model in self._backup[model_type]: - if existing_model.model_id == model_id: - return existing_model - - weight = self._backup[model_type][0].weight - device = self._backup[model_type][0].device - else: - weight = "INT4" if is_generator else "" - device = "auto" - - model_path = f"./models/{model_id}" - if is_generator: - model_path += f"/{weight}_compressed_weights" - - return ModelIn( - model_type=model_type, model_id=model_id, model_path=model_path, weight=weight, device=device - ) - - if self.pl.indexer and isinstance(self.pl.indexer.embedding_model, str): - self.pl.indexer.embedding_model = restore_model(self.pl.indexer.embedding_model, "embedding") - - if self.pl.postprocessor: - for proc in self.pl.postprocessor: - if isinstance(proc.reranker_model, str): - proc.reranker_model = restore_model(proc.reranker_model, "reranker") - - if self.pl.generator and isinstance(self.pl.generator.model, str): - self.pl.generator.model = restore_model(self.pl.generator.model, "llm", is_generator=True) - - self._backup = None - - def set_pipeline(self, pl): - self.pl = pl - self._replace_model_with_id() - - def export_pipeline(self): - self._restore_model_instances() - exported_pl = copy.deepcopy(self.pl) - self._replace_model_with_id() - return exported_pl - - def activate_pl(self): - self.pl.active = True - - def deactivate_pl(self): - self.pl.active = False - - def save_pipeline_to_json(self, save_path="pipeline.json"): - if self.pl: - pipeline_dict = self.export_pipeline().dict() - with open(save_path, "w") as json_file: - json.dump(pipeline_dict, json_file, indent=4) - print(f'RAG pipeline is successfully exported to "{save_path}"') - - -def get_ecrag_module_map(ecrag_pl): - ecrag_modules = { - # root - "root": (ecrag_pl, ""), - # node_parser - "node_parser": (ecrag_pl, "node_parser"), - "simple": (ecrag_pl, "node_parser"), - "hierarchical": (ecrag_pl, "node_parser"), - "sentencewindow": (ecrag_pl, "node_parser"), - # indexer - "indexer": (ecrag_pl, "indexer"), - "vector": (ecrag_pl, "indexer"), - "faiss_vector": (ecrag_pl, "indexer"), - # retriever - "retriever": (ecrag_pl, "retriever"), - "vectorsimilarity": (ecrag_pl, "retriever"), - "auto_merge": (ecrag_pl, "retriever"), - "bm25": (ecrag_pl, "retriever"), - # postprocessor - "postprocessor": (ecrag_pl, "postprocessor[0]"), - "reranker": (ecrag_pl, "postprocessor[0]"), - "metadata_replace": (ecrag_pl, "postprocessor[0]"), - # generator - "generator": (ecrag_pl, "generator"), - } - return ecrag_modules - - -COMP_TYPE_MAP = { - "node_parser": "parser_type", - "indexer": "indexer_type", - "retriever": "retriever_type", - "postprocessor": "processor_type", - "generator": "inference_type", -} diff --git a/evals/evaluation/rag_pilot/components/pilot/base.py b/evals/evaluation/rag_pilot/components/pilot/base.py new file mode 100644 index 00000000..4c882ac0 --- /dev/null +++ b/evals/evaluation/rag_pilot/components/pilot/base.py @@ -0,0 +1,432 @@ +# Copyright (C) 2025 Intel Corporation +# SPDX-License-Identifier: Apache-2.0 + +import copy +import csv +import hashlib +import json +import re +from difflib import SequenceMatcher +from enum import Enum +from pathlib import Path +from typing import Callable, Dict, List, Optional, Tuple, Union + +import numpy as np +from components.pilot.ecrag.api_schema import ( + GeneratorIn, + IndexerIn, + ModelIn, + NodeParserIn, + PipelineCreateIn, + PostProcessorIn, + RetrieverIn, +) +from pydantic import BaseModel + + +def convert_dict_to_pipeline(pl: dict) -> PipelineCreateIn: + def initialize_component(cls, data, extra=None, key_map=None, nested_fields=None): + if not data: + return None + + extra = extra or {} + key_map = key_map or {} + nested_fields = nested_fields or {} + + processed_data = {} + for k, v in data.items(): + mapped_key = key_map.get(k, k) + if mapped_key in nested_fields: + processed_data[mapped_key] = initialize_component(nested_fields[mapped_key], v) + else: + processed_data[mapped_key] = v + if cls == ModelIn: + processed_data["model_type"] = data.get("type", processed_data.get("model_type")) + + processed_data.update(extra) + return cls(**processed_data) + + return PipelineCreateIn( + idx=pl.get("idx"), + name=pl.get("name"), + node_parser=initialize_component(NodeParserIn, pl.get("node_parser"), key_map={"idx": "idx"}), + indexer=initialize_component( + IndexerIn, + pl.get("indexer"), + key_map={"model": "embedding_model", "idx": "idx"}, + nested_fields={"embedding_model": ModelIn}, + ), + retriever=initialize_component(RetrieverIn, pl.get("retriever"), key_map={"idx": "idx"}), + postprocessor=[ + initialize_component( + PostProcessorIn, + pp, + extra={"processor_type": pp.get("processor_type")}, + key_map={"model": "reranker_model", "idx": "idx"}, + nested_fields={"reranker_model": ModelIn}, + ) + for pp in pl.get("postprocessor", []) + ], + generator=initialize_component( + GeneratorIn, pl.get("generator"), key_map={"idx": "idx"}, nested_fields={"model": ModelIn} + ), + active=pl.get("status", {}).get("active", False), + ) + + +def generate_json_id(config, length=8): + if "active" in config: + del config["active"] + if "name" in config: + del config["name"] + config_str = json.dumps(config, sort_keys=True) + unique_id = hashlib.sha256(config_str.encode()).hexdigest() + return unique_id[:length] + + +class RAGPipeline: + pl: PipelineCreateIn + id: int + _backup: Dict + + def __init__(self, pl): + self.pl = pl + self._replace_model_with_id() + self.id = generate_json_id(self.pl.dict()) + + def _replace_model_with_id(self): + self._backup = {} + + def extract_model_id(model): + if model: + if model.model_type not in self._backup: + self._backup[model.model_type] = [] + self._backup[model.model_type].append(model) + return model.model_id + return None + + if self.pl.indexer and self.pl.indexer.embedding_model: + self.pl.indexer.embedding_model = extract_model_id(self.pl.indexer.embedding_model) + + if self.pl.postprocessor: + for proc in self.pl.postprocessor: + if proc.reranker_model: + proc.reranker_model = extract_model_id(proc.reranker_model) + + if self.pl.generator and self.pl.generator.model: + self.pl.generator.model = extract_model_id(self.pl.generator.model) + + def _restore_model_instances(self): + if not self._backup: + self._backup = {} + + def restore_model(model_id, model_type, is_generator=False): + if model_type in self._backup: + for existing_model in self._backup[model_type]: + if existing_model.model_id == model_id: + return existing_model + + weight = self._backup[model_type][0].weight + device = self._backup[model_type][0].device + else: + weight = "INT4" if is_generator else "" + device = "auto" + + model_path = f"./models/{model_id}" + if is_generator: + model_path += f"/{weight}_compressed_weights" + + return ModelIn( + model_type=model_type, model_id=model_id, model_path=model_path, weight=weight, device=device + ) + + if self.pl.indexer and isinstance(self.pl.indexer.embedding_model, str): + self.pl.indexer.embedding_model = restore_model(self.pl.indexer.embedding_model, "embedding") + + if self.pl.postprocessor: + for proc in self.pl.postprocessor: + if isinstance(proc.reranker_model, str): + proc.reranker_model = restore_model(proc.reranker_model, "reranker") + + if self.pl.generator and isinstance(self.pl.generator.model, str): + self.pl.generator.model = restore_model(self.pl.generator.model, "llm", is_generator=True) + + self._backup = None + + def export_pipeline(self): + self._restore_model_instances() + exported_pl = copy.deepcopy(self.pl) + self._replace_model_with_id() + return exported_pl + + def copy(self): + return copy.deepcopy(self) + + def regenerate_id(self): + self.id = generate_json_id(self.pl.dict()) + + def activate_pl(self): + self.pl.active = True + + def deactivate_pl(self): + self.pl.active = False + + def save_to_json(self, save_path="pipeline.json"): + if self.pl: + pipeline_dict = self.export_pipeline().dict() + with open(save_path, "w") as json_file: + json.dump(pipeline_dict, json_file, indent=4) + # print(f'RAG pipeline is successfully exported to "{save_path}"') + + +class ContextType(str, Enum): + GT = "gt" + RETRIEVAL = "retrieval" + POSTPROCESSING = "postprocessing" + + +class RAGStage(str, Enum): + RETRIEVAL = "retrieval" + POSTPROCESSING = "postprocessing" + GENERATION = "generation" + + +class ContextItem(BaseModel): + context_idx: Optional[int] = None + file_name: Optional[str] = None + text: str = "" + metadata: Optional[Dict[str, Union[float, int, list]]] = {} + + +def normalize_text(text): + """Removes whitespace and English/Chinese punctuation from text for fair comparison.""" + return re.sub(r"[ \u3000\n\t,。!?;:“”‘’\"',.;!?()\[\]{}<>《》|]+", "", text) + + +def split_text(text): + """ + Splits text into tokens using a robust regex that handles: + - Whitespace (\n, \t, space) + - English and Chinese punctuation + - Markdown/table symbols like '---|---' and ' | ' + """ + return [t for t in re.split(r"(?:\s*\|+\s*|[ \u3000\n\t,。!?;:“”‘’\"',.;!?()\[\]{}<>《》]+|---+)", text) if t] + + +def calculate_similarity(a, b): + return SequenceMatcher(None, a, b).ratio() + + +def fuzzy_contains(needle, haystack, threshold): + needle_norm = normalize_text(needle) + tokens = split_text(haystack) + + for i in range(len(tokens)): + for j in range(i + 1, len(tokens) + 1): + subtext = "".join(tokens[i:j]) + subtext_norm = normalize_text(subtext) + score = calculate_similarity(needle_norm, subtext_norm) + if score >= threshold: + return True + return False + + +class RAGResult(BaseModel): + metadata: Optional[Dict[str, Union[float, int, list]]] = {} + query_id: Optional[int] = None + query: str + ground_truth: Optional[str] = None + response: Optional[str] = None + + gt_contexts: Optional[List[ContextItem]] = None + retrieval_contexts: Optional[List[ContextItem]] = None + postprocessing_contexts: Optional[List[ContextItem]] = None + + def __init__(self, **data): + super().__init__(**data) + + def __post_init__(self): + for context_type in ContextType: + self.init_context_idx(context_type) + + def copy(self): + return copy.deepcopy(self) + + def init_context_idx(self, context_type): + context_list_name = f"{context_type.value}_contexts" + context_list = getattr(self, context_list_name, None) + if context_list is not None: + for idx, context in enumerate(context_list): + context.context_idx = idx + + def add_context(self, context_type: ContextType, context: ContextItem): + context_list_name = f"{context_type.value}_contexts" + context_list = getattr(self, context_list_name, None) + if context_list is None: + context_list = [] + setattr(self, context_list_name, context_list) + context.context_idx = len(context_list) + context_list.append(context) + + def update_metadata_hits(self, threshold=1): + if self.gt_contexts: + for context_type in [ContextType.RETRIEVAL, ContextType.POSTPROCESSING]: + context_list_name = f"{context_type.value}_contexts" + context_list = getattr(self, context_list_name, None) + if context_list is None: + continue + for context in context_list: + self.context_matches_gt(self.gt_contexts, context, context_type, threshold) + + for context_type in [ContextType.RETRIEVAL, ContextType.POSTPROCESSING]: + count = 0 + for gt_context in self.gt_contexts: + if gt_context.metadata.get(context_type, None): + count += 1 + self.metadata[context_type] = count + + def set_response(self, response: str): + self.response = response + # if self.ground_truth: + # self.metadata = self.cal_metric(self.query, self.ground_truth, response) + + @classmethod + def check_parts_in_text(cls, gt_context, text, threshold): + if threshold < 1: + return fuzzy_contains(gt_context, text, threshold) + else: + parts = gt_context.split() + return all(part in text for part in parts) + + @classmethod + def context_matches_gt( + cls, gt_contexts: List[ContextItem], candidate_context: ContextItem, context_type: ContextType, threshold + ): + for gt in gt_contexts: + if ( + candidate_context.file_name and gt.file_name and gt.file_name in candidate_context.file_name + ) or gt.file_name == "": + if candidate_context.text in gt.text or cls.check_parts_in_text( + gt.text, candidate_context.text, threshold + ): + gt.metadata = gt.metadata or {} + retrieved_file_name_list = gt.metadata.get(context_type, []) + retrieved_file_name_list.append(candidate_context.context_idx) + gt.metadata[context_type] = retrieved_file_name_list + + candidate_context.metadata = candidate_context.metadata or {} + candidate_context.metadata["hit"] = gt.context_idx + return True + return False + + @classmethod + def cal_metric(cls, query: str, ground_truth: str, response: str) -> Dict[str, float]: + # Placeholder: Use actual metric calculations as needed. + accuracy = float(ground_truth in response) + return {"accuracy": accuracy} + + +class RAGResults(BaseModel): + metadata: Optional[Dict[str, Union[float, int, list]]] = None + results: List[RAGResult] = [] + + def add_result(self, result): + self.results.append(result) + self.cal_recall() + + def cal_recall(self): + recall_rates = {} + for context_type in [ContextType.RETRIEVAL, ContextType.POSTPROCESSING]: + hit_count = 0 + gt_count = 0 + for result in self.results: + gt_count += len(result.gt_contexts) if result.gt_contexts else 0 + hit_count += result.metadata.get(context_type, 0) if result.metadata else 0 + + recall_rate = hit_count / gt_count if gt_count > 0 else np.nan + recall_rates[context_type] = recall_rate + + self.metadata = self.metadata or {} + self.metadata["recall_rate"] = recall_rates + + def check_metadata(self): + recall_rates = self.metadata.get("recall_rate", {}) + for key, rate in recall_rates.items(): + print(f"{key}: {rate}") + + def save_to_json(self, file_path: str): + cleaned_metadata = None + if self.metadata: + cleaned_metadata = {str(k): v for k, v in self.metadata.items()} + rag_results_dict = { + **self.dict(exclude={"metadata"}), + "metadata": cleaned_metadata, + } + + with open(file_path, "w", encoding="utf-8") as f: + json.dump(rag_results_dict, f, ensure_ascii=False, indent=4) + # print(f"RAGResults saved to {file_path}") + + def save_to_csv(self, output_dir: str): + output_dir = Path(output_dir) + output_dir.mkdir(parents=True, exist_ok=True) + + # --- CSV 1: Contexts --- + contexts_csv = output_dir / "rag_contexts.csv" + with contexts_csv.open("w", newline="", encoding="utf-8-sig") as f: + fieldnames = ["query_id", "context_type", "context_idx", "file_name", "text"] + metadata_keys = set() + for result in self.results: + for context_type in ContextType: + context_type_str = f"{context_type.value}_contexts" + context_list: List[ContextItem] = getattr(result, context_type_str) or [] + for ctx in context_list: + if ctx.metadata: + metadata_keys.update(ctx.metadata.keys()) + fieldnames.extend(metadata_keys) + + writer = csv.DictWriter(f, fieldnames=fieldnames) + writer.writeheader() + + for result in self.results: + for context_type in ContextType: + context_type_str = f"{context_type.value}_contexts" + context_list: List[ContextItem] = getattr(result, context_type_str) or [] + for ctx in context_list: + row = { + "query_id": result.query_id, + "context_type": context_type_str, + "context_idx": ctx.context_idx, + "file_name": ctx.file_name, + "text": ctx.text, + } + if ctx.metadata: + for key in metadata_keys: + row[key] = ctx.metadata.get(key, "") + writer.writerow(row) + + # --- CSV 2: Summary --- + summary_csv = output_dir / "rag_summary.csv" + with summary_csv.open("w", newline="", encoding="utf-8-sig") as f: + fieldnames = ["query_id", "query", "ground_truth", "response", "gt_count"] + metadata_keys = set() + for result in self.results: + if result.metadata: + metadata_keys.update(result.metadata.keys()) + fieldnames.extend(metadata_keys) + + writer = csv.DictWriter(f, fieldnames=fieldnames) + writer.writeheader() + + for result in self.results: + row = { + "query_id": result.query_id, + "query": result.query, + "ground_truth": result.ground_truth, + "response": result.response, + "gt_count": len(result.gt_contexts), + } + if result.metadata: + for key in metadata_keys: + row[key] = result.metadata.get(key, "") + writer.writerow(row) diff --git a/evals/evaluation/rag_pilot/components/constructor/connector.py b/evals/evaluation/rag_pilot/components/pilot/connector.py similarity index 53% rename from evals/evaluation/rag_pilot/components/constructor/connector.py rename to evals/evaluation/rag_pilot/components/pilot/connector.py index abcd64ce..c2094746 100644 --- a/evals/evaluation/rag_pilot/components/constructor/connector.py +++ b/evals/evaluation/rag_pilot/components/pilot/connector.py @@ -4,8 +4,8 @@ import os import requests -from components.constructor.constructor import convert_dict_to_pipeline -from components.constructor.ecrag.api_schema import PipelineCreateIn, RagOut +from components.pilot.base import convert_dict_to_pipeline +from components.pilot.ecrag.api_schema import PipelineCreateIn, RagOut ECRAG_SERVICE_HOST_IP = os.getenv("ECRAG_SERVICE_HOST_IP", "127.0.0.1") ECRAG_SERVICE_PORT = int(os.getenv("ECRAG_SERVICE_PORT", 16010)) @@ -52,3 +52,40 @@ def update_active_pipeline(pipeline): pipeline.active = True res = update_pipeline(pipeline) return res.status_code == 200 + + +def get_ecrag_module_map(ecrag_pl): + ecrag_modules = { + # root + "root": (ecrag_pl, ""), + # node_parser + "node_parser": (ecrag_pl, "node_parser"), + "simple": (ecrag_pl, "node_parser"), + "hierarchical": (ecrag_pl, "node_parser"), + "sentencewindow": (ecrag_pl, "node_parser"), + # indexer + "indexer": (ecrag_pl, "indexer"), + "vector": (ecrag_pl, "indexer"), + "faiss_vector": (ecrag_pl, "indexer"), + # retriever + "retriever": (ecrag_pl, "retriever"), + "vectorsimilarity": (ecrag_pl, "retriever"), + "auto_merge": (ecrag_pl, "retriever"), + "bm25": (ecrag_pl, "retriever"), + # postprocessor + "postprocessor": (ecrag_pl, "postprocessor[0]"), + "reranker": (ecrag_pl, "postprocessor[0]"), + "metadata_replace": (ecrag_pl, "postprocessor[0]"), + # generator + "generator": (ecrag_pl, "generator"), + } + return ecrag_modules + + +COMP_TYPE_MAP = { + "node_parser": "parser_type", + "indexer": "indexer_type", + "retriever": "retriever_type", + "postprocessor": "processor_type", + "generator": "inference_type", +} diff --git a/evals/evaluation/rag_pilot/components/constructor/ecrag/api_schema.py b/evals/evaluation/rag_pilot/components/pilot/ecrag/api_schema.py similarity index 90% rename from evals/evaluation/rag_pilot/components/constructor/ecrag/api_schema.py rename to evals/evaluation/rag_pilot/components/pilot/ecrag/api_schema.py index dbdc96e2..48c08548 100644 --- a/evals/evaluation/rag_pilot/components/constructor/ecrag/api_schema.py +++ b/evals/evaluation/rag_pilot/components/pilot/ecrag/api_schema.py @@ -1,7 +1,7 @@ # Copyright (C) 2024 Intel Corporation # SPDX-License-Identifier: Apache-2.0 -from typing import Optional +from typing import Any, Optional from pydantic import BaseModel @@ -10,7 +10,7 @@ class ModelIn(BaseModel): model_type: Optional[str] = "LLM" model_id: Optional[str] model_path: Optional[str] = "./" - weight: Optional[str] + weight: Optional[str] = "INT4" device: Optional[str] = "cpu" @@ -19,7 +19,7 @@ class NodeParserIn(BaseModel): chunk_overlap: Optional[int] = None chunk_sizes: Optional[list] = None parser_type: str - window_size: Optional[int] = None + window_size: Optional[int] = 3 class IndexerIn(BaseModel): @@ -65,5 +65,5 @@ class FilesIn(BaseModel): class RagOut(BaseModel): query: str - contexts: Optional[list[str]] = None + contexts: Optional[dict[str, Any]] = None response: str diff --git a/evals/evaluation/rag_pilot/components/constructor/ecrag/base.py b/evals/evaluation/rag_pilot/components/pilot/ecrag/base.py similarity index 100% rename from evals/evaluation/rag_pilot/components/constructor/ecrag/base.py rename to evals/evaluation/rag_pilot/components/pilot/ecrag/base.py diff --git a/evals/evaluation/rag_pilot/components/pilot/pilot.py b/evals/evaluation/rag_pilot/components/pilot/pilot.py new file mode 100644 index 00000000..25c1e209 --- /dev/null +++ b/evals/evaluation/rag_pilot/components/pilot/pilot.py @@ -0,0 +1,169 @@ +# Copyright (C) 2025 Intel Corporation +# SPDX-License-Identifier: Apache-2.0 + +import csv +import os +from datetime import datetime + +from components.pilot.base import ContextItem, ContextType, RAGPipeline, RAGResults, RAGStage +from components.pilot.connector import get_ragqna, update_active_pipeline + + +def get_rag_results_with_rag_pipeline(rag_pipeline: RAGPipeline, rag_results_sample: RAGResults, hit_threshold): + # Update rag_pipeline + ecrag_pipeline = rag_pipeline.export_pipeline() + update_active_pipeline(ecrag_pipeline) + + # Create a new instance of RAGResults + new_rag_results = RAGResults() + + # Update each rag_result in rag_results and add to new instance + for result in rag_results_sample.results: + query = result.query + ragqna = get_ragqna(query) + + # Create a new result object to avoid modifying the input + new_result = result.copy() + for key, nodes in ragqna.contexts.items(): + for node in nodes: + context_item = ContextItem(file_name=node["node"]["metadata"]["file_name"], text=node["node"]["text"]) + if key == "retriever": + new_result.add_context(ContextType.RETRIEVAL, context_item) + else: + new_result.add_context(ContextType.POSTPROCESSING, context_item) + new_result.update_metadata_hits(hit_threshold) + new_result.set_response(ragqna.response) + new_rag_results.add_result(new_result) + + return new_rag_results + + +class Pilot: + rag_pipeline_dict: dict[int, RAGPipeline] = {} + rag_results_dict: dict[int, RAGResults] = {} + curr_pl_id: int = None + + rag_results_sample: RAGResults + hit_threshold: float + + base_folder: str + + def __init__(self, rag_results_sample, hit_threshold=1): + self.rag_results_sample = rag_results_sample + self.hit_threshold = hit_threshold + + timestamp = datetime.now().strftime("%Y%m%d_%H%M%S") + self.base_folder = os.path.join(os.getcwd(), f"rag_pilot_{timestamp}") + + def add_rag_pipeline(self, rag_pipeline): + id = rag_pipeline.id + self.rag_pipeline_dict[id] = rag_pipeline + if not self.curr_pl_id: + self.curr_pl_id = id + + def add_rag_results(self, pl_id, rag_results): + self.rag_results_dict[pl_id] = rag_results + + def run_curr_pl(self): + curr_rag_pl = self.rag_pipeline_dict[self.curr_pl_id] + rag_results = get_rag_results_with_rag_pipeline(curr_rag_pl, self.rag_results_sample, self.hit_threshold) + self.add_rag_results(self.curr_pl_id, rag_results) + + def get_curr_pl(self): + return self.rag_pipeline_dict[self.curr_pl_id] + + def get_results(self, id): + return self.rag_results_dict[id] if id in self.rag_results_dict else None + + def get_curr_results(self): + return self.get_results(self.curr_pl_id) + + def change_best_recall_pl(self, stage: RAGStage = None): + best_pl_id = None + best_avg_recall = float("-inf") + + for pl_id, rag_results in self.rag_results_dict.items(): + if not rag_results or not rag_results.metadata: + continue + + recall_rate_dict = rag_results.metadata.get("recall_rate", {}) + if stage in [RAGStage.RETRIEVAL, RAGStage.POSTPROCESSING]: + recall_rate = recall_rate_dict.get(stage, float("-inf")) + else: + recall_rate = float("-inf") + + if recall_rate > best_avg_recall: + best_avg_recall = recall_rate + best_pl_id = pl_id + + if best_pl_id is not None: + self.curr_pl_id = best_pl_id + + def save_dicts(self): + parent_folder = self.base_folder + os.makedirs(parent_folder, exist_ok=True) + + for key, pipeline in self.rag_pipeline_dict.items(): + subfolder = os.path.join(parent_folder, f"entry_{key}") + os.makedirs(subfolder, exist_ok=True) + + pipeline_path = os.path.join(subfolder, "pipeline.json") + pipeline.save_to_json(save_path=pipeline_path) + + rag_results = self.rag_results_dict.get(key) + if rag_results: + rag_results.save_to_json(os.path.join(subfolder, "rag_results.json")) + rag_results.save_to_csv(subfolder) + + self.get_curr_pl().save_to_json(save_path=f"{parent_folder}/curr_pipeline.json") + + curr_results = self.get_curr_results() + if curr_results: + curr_results.save_to_json(os.path.join(parent_folder, "curr_rag_results.json")) + curr_results.save_to_csv(parent_folder) + + self.export_config_and_metadata_csv(save_path=f"{parent_folder}/summary.csv") + print(f"Saved RAG pipeline and results to {parent_folder}") + + def export_config_and_metadata_csv(self, save_path: str): + rows = [] + fieldnames = ["pipeline_id"] + + for pl_id, pipeline in self.rag_pipeline_dict.items(): + config = pipeline.pl.dict() + rag_results = self.rag_results_dict.get(pl_id) + recall_rates = {} + + if rag_results: + recall_rate = rag_results.metadata.get("recall_rate", {}) + retrieval_recall_rate = recall_rate.get(ContextType.RETRIEVAL, None) + postprocessing_recall_rate = recall_rate.get(ContextType.POSTPROCESSING, None) + + if retrieval_recall_rate is not None: + recall_rates["retrieval_recall_rate"] = retrieval_recall_rate + if postprocessing_recall_rate is not None: + recall_rates["postprocessing_recall_rate"] = postprocessing_recall_rate + + for key in config.keys(): + key = f"config_{key}" + if key not in fieldnames: + fieldnames.append(key) + + for recall_key in recall_rates.keys(): + if recall_key not in fieldnames: + fieldnames.append(recall_key) + + row = { + "pipeline_id": pl_id, + **{f"config_{key}": value for key, value in config.items()}, + **recall_rates, # Add recall rates to the row + } + + rows.append(row) + + # Write to CSV + with open(save_path, "w", newline="", encoding="utf-8-sig") as csvfile: + writer = csv.DictWriter(csvfile, fieldnames=sorted(fieldnames)) + writer.writeheader() + for row in rows: + writer.writerow(row) diff --git a/evals/evaluation/rag_pilot/components/adaptor/base.py b/evals/evaluation/rag_pilot/components/tuner/adaptor.py similarity index 53% rename from evals/evaluation/rag_pilot/components/adaptor/base.py rename to evals/evaluation/rag_pilot/components/tuner/adaptor.py index b13e2df0..61985020 100644 --- a/evals/evaluation/rag_pilot/components/adaptor/base.py +++ b/evals/evaluation/rag_pilot/components/tuner/adaptor.py @@ -6,6 +6,9 @@ from dataclasses import dataclass, field from typing import Any, Callable, Dict, Optional, Tuple +from components.pilot.base import RAGPipeline +from components.pilot.connector import COMP_TYPE_MAP, get_ecrag_module_map + def get_support_modules(module_name: str, module_map) -> Callable: support_modules = module_map @@ -140,3 +143,92 @@ def set_value(self, attr, value): val = module.get_params(param) if val: module.set_value(param, val) + + +class Adaptor: + + def __init__(self, yaml_data: str): + self.nodes = self.parse_nodes(yaml_data) + self.root_func: Optional[Callable] = None + + self.rag_pipeline: Optional[RAGPipeline] = None + + def parse_nodes(self, yaml_data): + parsed_nodes = {} + for node in yaml_data.get("nodes", []): + node_type = node.get("node") + modules_dict = { + mod.get("module_type"): Module( + type=mod.get("module_type", ""), + params={k: convert_tuple(v) for k, v in mod.items() if k not in ["module_type"]}, + ) + for mod in node.get("modules", []) + if mod.get("module_type") + } + node_params = {k: convert_tuple(v) for k, v in node.items() if k not in ["node", "node_type", "modules"]} + cur_node = Node(type=node_type, params=node_params, modules=modules_dict) + if node_type in parsed_nodes: + parsed_nodes[node_type].append(cur_node) + else: + parsed_nodes[node_type] = [cur_node] + return parsed_nodes + + def get_node(self, node_type, idx=0): + nodes = self.nodes[node_type] if node_type in self.nodes else None + return nodes[idx] if nodes and idx < len(nodes) else None + + def get_modules_from_node(self, node_type, idx=0): + node = self.get_node(node_type, idx) + return node.modules if node else None + + def get_module(self, node_type, module_type, idx=0): + if module_type is None: + return self.get_node(node_type, idx) + else: + modules = self.get_modules_from_node(node_type, idx) + return modules[module_type] if modules and module_type in modules else None + + def update_all_module_functions_tmp(self, rag_pipeline, node_type_map=COMP_TYPE_MAP): + module_map = get_ecrag_module_map(rag_pipeline.pl) + self.root_func = get_support_modules("root", module_map) + + for node_list in self.nodes.values(): + for node in node_list: + node.update_func(module_map) + node.is_active = False + for module in node.modules.values(): + module.update_func(module_map) + module.is_active = False + + self.activate_modules_based_on_type(node_type_map) + + def update_all_module_functions(self, rag_pipeline, node_type_map=COMP_TYPE_MAP): + self.update_all_module_functions_tmp(rag_pipeline, node_type_map) + self.rag_pipeline = rag_pipeline + + def activate_modules_based_on_type(self, node_type_map): + if not self.root_func: + return + + for node_list in self.nodes.values(): + for node in node_list: + node_type = node.type + if not getattr(self.root_func, node_type, None): + continue + node.is_active = True + active_module_type = getattr(node.func, node_type_map[node_type], None) + if active_module_type and active_module_type in node.modules: + node.modules[active_module_type].is_active = True + + def get_rag_pipelines_candidates(self, params_candidates): + rag_pls = [] + for params_candidate in params_candidates: + rag_pl = self.rag_pipeline.copy() + self.update_all_module_functions_tmp(rag_pl) + for attr, ((node_type, module_type), val) in params_candidate.items(): + module = self.get_module(node_type, module_type) + module.set_value(attr, val) + rag_pl.regenerate_id() + rag_pls.append(rag_pl) + self.update_all_module_functions(self.rag_pipeline) + return rag_pls, params_candidates diff --git a/evals/evaluation/rag_pilot/components/tuner/base.py b/evals/evaluation/rag_pilot/components/tuner/base.py index e56ffe9f..a93cb828 100644 --- a/evals/evaluation/rag_pilot/components/tuner/base.py +++ b/evals/evaluation/rag_pilot/components/tuner/base.py @@ -1,72 +1,82 @@ # Copyright (C) 2025 Intel Corporation # SPDX-License-Identifier: Apache-2.0 -from enum import Enum +from enum import Enum, auto from typing import Callable, List, Optional, Tuple, Union -from pydantic import BaseModel +from pydantic import BaseModel, validator -class RagResult(BaseModel): - query: str - contexts: Optional[List[str]] = None - ground_truth: Optional[str] = None - response: str +class ContentType(Enum): + ALL_CONTEXTS = auto() + CONTEXT = auto() + RESPONSE = auto() -class ContentType(str, Enum): - ALL_CONTEXTS = "all_contexts" - CONTEXT = "context" - RESPONSE = "response" +class OptionItem(BaseModel): + idx: int + content: str + def __str__(self): + return f"{self.idx}: {self.content}" -class QuestionType(str, Enum): - RATING5 = "1-5" - RATING3 = "1-3" - BOOL = "y/n" - SCORE = "Input a number" - MINUS_ONE_ZERO_ONE = "-1, 0, 1" +class UserInput(BaseModel): + hint: str + options: Optional[List[OptionItem]] = None -class Question(BaseModel): - question: str - question_type: QuestionType - content_type: ContentType + @validator("options", pre=True) + def auto_generate_ids(cls, v): + if v is None: + return v + if all(isinstance(item, str) for item in v): + return [{"idx": idx, "content": item} for idx, item in enumerate(v, start=1)] + return v + def __str__(self): + options_str = "" + if self.options: + options_str = "\n" + "\n".join(str(option) for option in self.options) + return f"{self.hint}{options_str}" -class Feedback(BaseModel): - type: QuestionType - feedback: bool | int +class Question(UserInput): + status: Optional[str] = None -class SuggestionType(str, Enum): - SET = "set" - OFFSET = "offset" - CHOOSE = "choose" + def __str__(self): + status_str = f"{self.status}\n" if self.status else "" + options_str = "" + if self.options: + options_str = "\n" + "\n".join(str(option) for option in self.options) + return f"{status_str}{self.hint}{options_str}" -class DirectionType(str, Enum): - INCREASE = "increase" - DECREASE = "decrease" +class SuggestionType(Enum): + CHOOSE = auto() + ITERATE = auto() + SET = auto() + STEPWISE_GROUPED = auto() + STEPWISE = auto() + GRID_SEARCH = auto() -class SuggestionValue(BaseModel): +class Suggestion(UserInput): suggestion_type: SuggestionType - step: Optional[int] = None - direction: Optional[DirectionType] = None - choices: Optional[List] = None - val: Optional[Union[int, float, str]] = None -class Suggestion(BaseModel): +class Feedback(BaseModel): + feedback: bool | int + + +class DirectionType(Enum): + INCREASE = auto() + DECREASE = auto() + + +class Target(BaseModel): + node_type: str + module_type: Optional[str] = None attribute: str - svalue: SuggestionValue - origval: Optional[Union[int, float, str]] = None - is_accepted: bool = False - - def reset(self): - self.is_accepted = False - self.origval = None - self.svalue.val = None - self.svalue.direction = None - self.svalue.choices = None + orig_val: Optional[Union[int, float, str]] = None + new_vals: List[Union[int, float, str]] = None + suggestion: Suggestion = None diff --git a/evals/evaluation/rag_pilot/components/tuner/tuner.py b/evals/evaluation/rag_pilot/components/tuner/tuner.py index f53856c9..2295ab04 100644 --- a/evals/evaluation/rag_pilot/components/tuner/tuner.py +++ b/evals/evaluation/rag_pilot/components/tuner/tuner.py @@ -2,82 +2,36 @@ # SPDX-License-Identifier: Apache-2.0 from abc import ABC, abstractmethod +from itertools import product +from typing import Dict, Optional -from components.tuner.base import ( - ContentType, - DirectionType, - Feedback, - Question, - QuestionType, - RagResult, - Suggestion, - SuggestionType, - SuggestionValue, -) - - -def input_parser(qtype: QuestionType = QuestionType.BOOL): - user_input = input(f"({qtype.value}): ") - - match qtype: - case QuestionType.RATING5: - if user_input.isdigit() and 1 <= int(user_input) <= 5: - return True, int(user_input) - else: - print("Invalid input. Please enter a number between 1 and 5.") - return False, None - - case QuestionType.RATING3: - if user_input.isdigit() and 1 <= int(user_input) <= 3: - return True, int(user_input) - else: - print("Invalid input. Please enter a number between 1 and 3.") - return False, None - - case QuestionType.BOOL: - if user_input.lower() == "y": - return True, True - elif user_input.lower() == "n": - return True, False - else: - print("Invalid input. Please enter 'Y' or 'N'.") - return False, None - - case QuestionType.SCORE: - if user_input.isdigit(): - return True, int(user_input) - else: - print("Invalid input. Please enter a valid score (integer).") - return False, None - - case QuestionType.MINUS_ONE_ZERO_ONE: - if user_input in {"-1", "0", "1"}: - return True, int(user_input) - else: - print("Invalid input. Please enter -1, 0, or 1.") - return False, None - - case _: - print("Unknown question type.") - return False, None - - -def display_ragqna(ragqna, content_type=ContentType.RESPONSE, context_idx=0): +from components.tuner.adaptor import Adaptor +from components.tuner.base import ContentType, Feedback, Question, Suggestion, SuggestionType, Target + + +def input_parser(upper_limit: int = None): + if upper_limit: + user_input = input(f"(1 - {upper_limit}): ") + else: + user_input = input("Provide a number: ") + upper_limit = 10000 + + if user_input.isdigit() and 1 <= int(user_input) <= upper_limit: + return True, int(user_input) + else: + print(f"Invalid input. Please enter a number between 1 and {upper_limit}.") + return False, None + + +def display_ragqna(ragqna): print("\nRAG Query\n" "---------\n" f"{ragqna.query}\n\n" "RAG Response\n" "------------\n" f"{ragqna.response}\n") - if content_type == ContentType.ALL_CONTEXTS: - if ragqna.contexts: - for index, context in enumerate(ragqna.contexts): - cleaned_context = context.replace("\n", " ") - print(f"RAG Context {index}\n" "-------------------\n" f"{cleaned_context}\n") - else: - print("RAG Contexts\n" "------------\n" "None\n") - elif content_type == ContentType.CONTEXT: - print( - f"RAG Contexts {context_idx}\n" - "--------------------------\n" - f"{ragqna.contexts[context_idx] if context_idx in ragqna.contexts else 'None'}\n" - ) + if ragqna.contexts: + for index, context in enumerate(ragqna.contexts): + cleaned_context = context.replace("\n", " ") + print(f"RAG Context {index}\n" "-------------------\n" f"{cleaned_context}\n") + else: + print("RAG Contexts\n" "------------\n" "None\n") def display_list(list): @@ -87,277 +41,432 @@ def display_list(list): class Tuner(ABC): - def __init__(self, ragqna: RagResult, question: str, question_type: QuestionType, content_type: ContentType): - - self.question = Question( - question=question, - question_type=question_type, - content_type=content_type, - ) - self.user_feedback: Feedback = None - self.suggestions: dict[str, Suggestion] = {} + def __init__(self, question: Question, adaptor: Adaptor, targets: Dict[str, Target]): + self.question = question - self.ragqna: RagResult = ragqna + self.adaptor = adaptor + self.targets = targets - self.node_type = None - self.module_type = None - self.module = None - self.module_active = False - - def init_ragqna(self, ragqna): - self.ragqna = ragqna - self.user_feedback = None - for suggestion in self.suggestions.values(): - suggestion.reset() + def check_active(self): + for target in self.targets.values(): + target_obj = self.adaptor.get_module(target.node_type, target.module_type) + if not target_obj.get_status(): + return False + return True - def update_module(self, module): - self.module = module - self.module_active = module.get_status() + def set_param( + self, + param_name, + suggestion_type: SuggestionType, + new_vals: Optional[int] = None, + step: Optional[int] = None, + lower_limit: Optional[int] = None, + count: Optional[int] = 1, + ): + target_obj = None + if param_name in self.targets: + target = self.targets[param_name] + target_obj = self.adaptor.get_module(target.node_type, target.module_type) + + if not target_obj: + print(f"[!] Target not found: node={target.node_type}, module={target.module_type}") + return - def get_suggestions_origval(self): - if not self.module_active: + if not target_obj.get_status(): + print(f"[!] Skipping inactive component: node={target.node_type}, module={target.module_type}") return - for suggestion in self.suggestions.values(): - suggestion.origval = self.module.get_value(suggestion.attribute) - def _display_question(self, content_type, context_idx=0): - display_ragqna(self.ragqna, content_type, context_idx) - print(f"{self}: {self.question.question}\n") + target.orig_val = target_obj.get_value(target.attribute) + + match suggestion_type: + case SuggestionType.STEPWISE_GROUPED | SuggestionType.GRID_SEARCH | SuggestionType.STEPWISE: + if new_vals: + target.new_vals = new_vals + else: + if step is None: + raise ValueError("Step must be provided for stepwise tuning.") + if lower_limit: + start = lower_limit + else: + start = target.orig_val + + if count: + target.new_vals = [start + i * step for i in range(count)] + else: + target.new_vals = [start + step] + + target.suggestion = Suggestion( + hint=f"{target.attribute}'s current value: {target.orig_val}\n" f"Setting it to {target.new_vals}", + suggestion_type=suggestion_type, + ) + case SuggestionType.CHOOSE: + target.new_vals = target_obj.get_params(target.attribute) + target.suggestion = Suggestion( + hint=f"{target.attribute}'s current value: {target.orig_val}\n" + f"Please choose a new value from below:", + options=target.new_vals, + suggestion_type=suggestion_type, + ) + case SuggestionType.ITERATE: + target.new_vals = target_obj.get_params(target.attribute) + target.suggestion = Suggestion( + hint=f"{target.attribute}'s current value: {target.orig_val}\n" f"Iterate from available values", + options=target.new_vals, + suggestion_type=suggestion_type, + ) + case SuggestionType.SET: + if new_vals: + target.new_vals = new_vals + hint = f"Change {target.attribute}'s value: {target.orig_val} -> {new_vals}\n" + else: + target.new_vals = None + hint = f"{target.attribute}'s current value: {target.orig_val}\nPlease enter a new value: " + + target.suggestion = Suggestion(hint=hint, options=target.new_vals, suggestion_type=suggestion_type) def request_feedback(self): - if not self.module_active: - return - - self._display_question(self.question.content_type) + if not self.check_active(): + return False - valid, user_input = input_parser(self.question.question_type) + print(f"\033[1m\033[93m{self}\033[0m: {self.question}\n") + valid, user_input = input_parser(len(self.question.options)) if not valid: return False - self.user_feedback = Feedback(type=self.question.question_type, feedback=user_input) - self.get_suggestions_origval() - return True + self.user_feedback = Feedback(feedback=user_input) + return self._feedback_to_suggestions() @abstractmethod def _feedback_to_suggestions(self): pass - def make_suggestions(self): - if not self.module_active: + def apply_suggestions(self): + if not self.check_active(): return - self._feedback_to_suggestions() - - old_new_values = {} - for attr, suggestion in self.suggestions.items(): - svalue = suggestion.svalue - origval = suggestion.origval - - match svalue.suggestion_type: + params_candidates = [] + + new_values_dict = {} + + # STEPWISE_GROUPED + grouped_targets = { + a: t + for a, t in self.targets.items() + if t.suggestion and t.suggestion.suggestion_type == SuggestionType.STEPWISE_GROUPED + } + if grouped_targets: + count = min(len(t.new_vals) for t in grouped_targets.values()) + + for idx in range(count): + candidate = {a: t.new_vals[idx] for a, t in grouped_targets.items()} + new_values_dict = { + a: [(t.node_type, t.module_type), t.new_vals[idx]] for a, t in grouped_targets.items() + } + params_candidates.append(new_values_dict) + if len(params_candidates) > 0: + return self.adaptor.get_rag_pipelines_candidates(params_candidates) + + # GRID_SEARCH + + grid_targets = { + a: t + for a, t in self.targets.items() + if t.suggestion and t.suggestion.suggestion_type == SuggestionType.GRID_SEARCH + } + if grid_targets: + keys, values_list = zip(*((a, t.new_vals) for a, t in grid_targets.items())) + for combination in product(*values_list): + candidate = dict(zip(keys, combination)) + new_values_dict = {} + for a, val in candidate.items(): + new_values_dict[a] = [(self.targets[a].node_type, self.targets[a].module_type), val] + params_candidates.append(new_values_dict) + if len(params_candidates) > 0: + return self.adaptor.get_rag_pipelines_candidates(params_candidates) + + new_values_dict = {} + for attr, target in self.targets.items(): + suggestion = target.suggestion + if not suggestion or attr in new_values_dict: + continue + + orig_val = target.orig_val + match suggestion.suggestion_type: case SuggestionType.SET: - choices = suggestion.svalue.choices - if not choices: - continue - - print( - f"{attr}'s current value: {origval}\n" - f"We suggest setting a new value from choices: {choices}\n" - f"Please enter a new value: " - ) - valid, user_input = input_parser(QuestionType.SCORE) - if valid: - old_new_values[attr] = [origval, user_input] - - case SuggestionType.OFFSET: - match svalue.direction: - case DirectionType.INCREASE: - old_new_values[attr] = [origval, origval + svalue.step] - case DirectionType.DECREASE: - old_new_values[attr] = [origval, origval - svalue.step] - case _: - print(f"ERROR: Unknown direction '{svalue.direction}' for OFFSET.") + print(f"{suggestion}") + if suggestion.options: + new_values_dict[attr] = [(target.node_type, target.module_type), suggestion.options[0].content] + else: + valid, user_input = input_parser() + if valid: + new_values_dict[attr] = [(target.node_type, target.module_type), user_input] case SuggestionType.CHOOSE: - choices = suggestion.svalue.choices - if not choices: - continue - - display_list(choices) - print(f"{attr}'s current value: {origval}\n" f"Please enter the index of a new value") - valid, user_input = input_parser(QuestionType.SCORE) + print(f"{suggestion}") + new_options = [x for x in suggestion.options if x != orig_val] + valid, user_input = input_parser(len(new_options)) if valid: - if user_input < 0 or user_input >= len(choices): - print(f"ERROR: The chosen index {user_input} is out of range") - elif origval == choices[user_input]: - print(f"ERROR: You chose the current value {user_input}: {choices[user_input]}") - else: - old_new_values[attr] = [origval, choices[user_input]] + chosed_val = suggestion.options[user_input - 1] + new_values_dict[attr] = [(target.node_type, target.module_type), chosed_val.content] + + case SuggestionType.ITERATE: + print(f"{suggestion}") + for option in suggestion.options: + new_values_dict = {} + new_values_dict[attr] = [(target.node_type, target.module_type), option.content] + params_candidates.append(new_values_dict) + if len(params_candidates) > 0: + return self.adaptor.get_rag_pipelines_candidates(params_candidates) + + case SuggestionType.STEPWISE: + if len(target.new_vals) == 1: + val = target.new_vals[idx] + new_values_dict[attr] = [(target.node_type, target.module_type), val] + else: + for idx in range(len(target.new_vals)): + new_values_dict = {} + val = target.new_vals[idx] + new_values_dict[attr] = [(target.node_type, target.module_type), val] + params_candidates.append(new_values_dict) + if len(params_candidates) > 0: + return self.adaptor.get_rag_pipelines_candidates(params_candidates) case _: - print(f"ERROR: Unknown suggestion type '{svalue.suggestion_type}'.") + print(f"ERROR: Unknown suggestion type '{suggestion.suggestion_type}'.") - if not old_new_values: - return False - - is_changed = False - for k, v in old_new_values.items(): - if self._confirm_suggestion(self.suggestions[k], v, skip_confirming=True): - is_changed = True + params_candidates.append(new_values_dict) + return self.adaptor.get_rag_pipelines_candidates(params_candidates) - if is_changed: - self._apply_suggestions() + def __str__(self): + return f"{self.__class__.__name__}" - return is_changed - def _confirm_suggestion(self, suggestion, old_new_value, skip_confirming=False): - svalue = suggestion.svalue - origval, newval = old_new_value +class EmbeddingTuner(Tuner): + def __init__(self, adaptor: Adaptor): + # question + question = Question( + hint="Do you want to tune embedding model", + options=["Yes, iterate it from available options", "No, skip this tuner"], + ) - print( - f"Based on your feedback, {svalue.direction.value if svalue.direction else 'modify'} {suggestion.attribute} {origval} -> {newval}" + targets = {} + # targets + attribute = "embedding_model" + target = Target( + node_type="indexer", + attribute=attribute, ) - if skip_confirming: - valid, user_input = True, True - else: - valid, user_input = input_parser(QuestionType.BOOL) + targets[attribute] = target - if not valid: - return False + super().__init__(question, adaptor, targets) - suggestion.is_accepted = user_input - if suggestion.is_accepted: - suggestion.svalue.val = newval + def _feedback_to_suggestions(self): + assert isinstance(self.user_feedback, Feedback) + if self.user_feedback.feedback == 1: + self.set_param(param_name="embedding_model", suggestion_type=SuggestionType.ITERATE) return True + else: + return False - return False - def _apply_suggestions(self): - for suggestion in self.suggestions.values(): - if suggestion.is_accepted: - self.module.set_value(suggestion.attribute, suggestion.svalue.val) +class NodeParserTuner(Tuner): - def __str__(self): - return f"{self.__class__.__name__}" + def __init__(self, adaptor: Adaptor): + # question + question = Question( + hint="Do you want to tune node parser", + options=["Yes, iterate it from available options", "No, skip this tuner"], + ) + targets = {} + # targets + attribute = "parser_type" + target = Target( + node_type="node_parser", + attribute=attribute, + ) + targets[attribute] = target -class SimpleNodeParserChunkTuner(Tuner): + super().__init__(question, adaptor, targets) + + def _feedback_to_suggestions(self): + assert isinstance(self.user_feedback, Feedback) + if self.user_feedback.feedback == 1: + self.set_param(param_name="parser_type", suggestion_type=SuggestionType.ITERATE) + return True + else: + return False - def __init__(self, ragqna=None): - q = "Is each context split properly? \n -1) Incomplete \n 0) Good enough or skip current tuner \n 1) Includes too many contents" - super().__init__(ragqna, q, QuestionType.MINUS_ONE_ZERO_ONE, ContentType.ALL_CONTEXTS) - self.node_type = "node_parser" - self.module_type = "simple" +class SimpleNodeParserChunkTuner(Tuner): + + def __init__(self, adaptor: Adaptor): + # question + question = Question( + hint="Do you want to tune chunk size and chunk overlap", + options=[ + "Yes, iterate the chunk size and chunk overlap based on current values stepwisely", + "Yes, set them to designated values", + "No, skip this tuner", + ], + ) + targets = {} + # targets attribute = "chunk_size" - suggestion = Suggestion( - svalue=SuggestionValue(suggestion_type=SuggestionType.OFFSET, step=100), + target = Target( + node_type="node_parser", + module_type="simple", attribute=attribute, ) - self.suggestions[attribute] = suggestion + targets[attribute] = target attribute = "chunk_overlap" - suggestion = Suggestion( - svalue=SuggestionValue(suggestion_type=SuggestionType.OFFSET, step=8), + target = Target( + node_type="node_parser", + module_type="simple", attribute=attribute, ) - self.suggestions[attribute] = suggestion + targets[attribute] = target + + super().__init__(question, adaptor, targets) def _feedback_to_suggestions(self): assert isinstance(self.user_feedback, Feedback) - if self.user_feedback.type == QuestionType.MINUS_ONE_ZERO_ONE: - if self.user_feedback.feedback == 1: - for suggestion in self.suggestions.values(): - suggestion.svalue.direction = DirectionType.DECREASE - elif self.user_feedback.feedback == -1: - for suggestion in self.suggestions.values(): - suggestion.svalue.direction = DirectionType.INCREASE + if self.user_feedback.feedback == 1: + self.set_param(param_name="chunk_size", suggestion_type=SuggestionType.STEPWISE_GROUPED, step=100, count=3) + self.set_param( + param_name="chunk_overlap", suggestion_type=SuggestionType.STEPWISE_GROUPED, step=16, count=3 + ) + return True + elif self.user_feedback.feedback == 2: + self.set_param(param_name="chunk_size", suggestion_type=SuggestionType.SET) + self.set_param(param_name="chunk_overlap", suggestion_type=SuggestionType.SET) + return True + else: + return False -class RerankerTopnTuner(Tuner): +class RetrievalTopkTuner(Tuner): - def __init__(self, ragqna=None): - q = ( - "Are the contexts not enough for answering the question or some of them contain irrelevant information?\n" - + " -1) Not enough \n 0) Fine or skip current tuner \n 1) Too many contexts" + def __init__(self, adaptor: Adaptor): + # question + question = Question( + hint="Do you want to tune retrieve's topk", + options=[ + "Yes, iterate it based on current values stepwisely", + "Yes, set it to designated value", + "No, skip this tuner", + ], ) - super().__init__(ragqna, q, QuestionType.MINUS_ONE_ZERO_ONE, ContentType.ALL_CONTEXTS) - - self.node_type = "postprocessor" - self.module_type = "reranker" - attribute = "top_n" - suggestion = Suggestion( - svalue=SuggestionValue(suggestion_type=SuggestionType.OFFSET, step=1), + targets = {} + # targets + attribute = "retrieve_topk" + target = Target( + node_type="retriever", attribute=attribute, ) - self.suggestions[attribute] = suggestion + targets[attribute] = target + + super().__init__(question, adaptor, targets) def _feedback_to_suggestions(self): assert isinstance(self.user_feedback, Feedback) - if self.user_feedback.type == QuestionType.MINUS_ONE_ZERO_ONE: - if self.user_feedback.feedback == 1: - for suggestion in self.suggestions.values(): - suggestion.svalue.direction = DirectionType.DECREASE - elif self.user_feedback.feedback == -1: - for suggestion in self.suggestions.values(): - suggestion.svalue.direction = DirectionType.INCREASE + if self.user_feedback.feedback == 1: + self.set_param( + param_name="retrieve_topk", suggestion_type=SuggestionType.STEPWISE, step=15, lower_limit=30, count=4 + ) + return True + if self.user_feedback.feedback == 2: + self.set_param( + param_name="retrieve_topk", + suggestion_type=SuggestionType.SET, + ) + return True + else: + return False -class EmbeddingLanguageTuner(Tuner): +class RerankerTopnTuner(Tuner): - def __init__(self, ragqna=None): - q = ( - "Does any context contain relevant information for the given RAG query?\n" - + " y) yes or skip current tuner \n n) no" + def __init__(self, adaptor: Adaptor): + # question + question = Question( + hint="Do you want to tune reranker's top_n", + options=["Yes, iterate it based on current values stepwisely", "No, skip this tuner"], ) - super().__init__(ragqna, q, QuestionType.BOOL, ContentType.ALL_CONTEXTS) - - self.node_type = "indexer" - self.module_type = None - attribute = "embedding_model" - suggestion = Suggestion( - svalue=SuggestionValue( - suggestion_type=SuggestionType.CHOOSE, - ), + targets = {} + # targets + attribute = "top_n" + target = Target( + node_type="postprocessor", + module_type="reranker", attribute=attribute, ) - self.suggestions[attribute] = suggestion + targets[attribute] = target + + super().__init__(question, adaptor, targets) def _feedback_to_suggestions(self): assert isinstance(self.user_feedback, Feedback) - if self.user_feedback.type == QuestionType.BOOL: - if not self.user_feedback.feedback: - for suggestion in self.suggestions.values(): - suggestion.svalue.choices = self.module.get_params(suggestion.attribute) + if self.user_feedback.feedback == 1: + self.set_param(param_name="top_n", suggestion_type=SuggestionType.STEPWISE, step=5, lower_limit=5, count=2) + return True + else: + return False -class NodeParserTypeTuner(Tuner): +class RetrievalTopkRerankerTopnTuner(Tuner): - def __init__(self, ragqna=None): - q = ( - "Are all contexts split with similar amount of information? \n" - + "y) All contexts contain similar amount of information or skip current tuner \n" - + "n) Some contexts contain more information while some contain less" + def __init__(self, adaptor: Adaptor): + # question + question = Question( + hint="Do you want to tune retrieve_topk and reranker's top_n", + options=[ + "Yes, iterate it based on current values stepwisely", + "Yes, set retrieve_topk to [30, 50, 100, 200], top_n to [5, 10]", + "No, skip this tuner", + ], ) - super().__init__(ragqna, q, QuestionType.BOOL, ContentType.ALL_CONTEXTS) - self.node_type = "node_parser" - self.module_type = None + targets = {} + # targets + attribute = "retrieve_topk" + target = Target( + node_type="retriever", + attribute=attribute, + ) + targets[attribute] = target - attribute = "parser_type" - suggestion = Suggestion( - svalue=SuggestionValue( - suggestion_type=SuggestionType.CHOOSE, - ), + attribute = "top_n" + target = Target( + node_type="postprocessor", + module_type="reranker", attribute=attribute, ) - self.suggestions[attribute] = suggestion + targets[attribute] = target + + super().__init__(question, adaptor, targets) def _feedback_to_suggestions(self): assert isinstance(self.user_feedback, Feedback) - if self.user_feedback.type == QuestionType.BOOL: - if not self.user_feedback.feedback: - for suggestion in self.suggestions.values(): - suggestion.svalue.choices = self.module.get_params(suggestion.attribute) + if self.user_feedback.feedback == 1: + self.set_param( + param_name="retrieve_topk", suggestion_type=SuggestionType.GRID_SEARCH, step=15, lower_limit=30, count=4 + ) + self.set_param( + param_name="top_n", suggestion_type=SuggestionType.GRID_SEARCH, step=5, lower_limit=5, count=2 + ) + return True + if self.user_feedback.feedback == 2: + self.set_param( + param_name="retrieve_topk", suggestion_type=SuggestionType.GRID_SEARCH, new_vals=[30, 50, 100, 200] + ) + self.set_param( + param_name="top_n", suggestion_type=SuggestionType.GRID_SEARCH, step=5, lower_limit=5, count=2 + ) + return True + else: + return False diff --git a/evals/evaluation/rag_pilot/configs/ecrag.yaml b/evals/evaluation/rag_pilot/configs/ecrag.yaml index 33d3c2bd..efea8f86 100644 --- a/evals/evaluation/rag_pilot/configs/ecrag.yaml +++ b/evals/evaluation/rag_pilot/configs/ecrag.yaml @@ -8,9 +8,15 @@ nodes: chunk_size: 400 chunk_overlap: 48 - module_type: hierarchical - chunk_sizes: [256, 384, 512] + chunk_sizes: + - 384 + - 512 + - 640 - node: indexer - embedding_model: [BAAI/bge-small-zh-v1.5, BAAI/bge-small-en-v1.5] + embedding_model: + - BAAI/bge-m3 + - BAAI/bge-large-zh-v1.5 + - BAAI/bge-large-en-v1.5 modules: - module_type: vector - module_type: faiss_vector @@ -27,6 +33,9 @@ nodes: reranker_model: BAAI/bge-reranker-large - module_type: metadata_replace - node: generator - model: [Qwen/Qwen2-7B-Instruct] - inference_type: [local, vllm] + model: + - Qwen/Qwen2-7B-Instruct + inference_type: + - local + - vllm prompt: null diff --git a/evals/evaluation/rag_pilot/configs/netsec_sample.csv b/evals/evaluation/rag_pilot/configs/netsec_sample.csv new file mode 100644 index 00000000..63eaed1c --- /dev/null +++ b/evals/evaluation/rag_pilot/configs/netsec_sample.csv @@ -0,0 +1,6 @@ +query_id,query,file_name,gt_context,ground_truth +53,故障来源有哪些?,故障处理记录表.txt,故障来源:用户投诉、日志系统、例行维护中发现、其它来源。,故障来源:用户投诉、日志系统、例行维护中发现、其它来源。 +73,故障类别有哪些?,故障处理记录表.txt,故障类别:硬件设备故障、电源故障、传输网故障、数据修改、其它故障。,"故障类别:硬件设备故障、电源故障、传输网故障、数据修改、其它故障。 故障类别:硬件设备故障、电源故障、传输网故障、数据修改、其它故障。" +93,uMAC网元VNFC有哪几种备份方式,index.txt,ZUF-76-04-005 VNFC支持1+1主备冗余,uMAC网元VFNC有3中备份方式: 支持1+1主备冗余,支持N+M负荷分担冗余, 支持1+1互备冗余。 +93,,index.txt,ZUF-76-04-006 VNFC支持N+M负荷分担冗余, +93,,index.txt,ZUF-76-04-008 VNFC支持1+1互备冗余, diff --git a/evals/evaluation/rag_pilot/configs/qa_list_sample.json b/evals/evaluation/rag_pilot/configs/qa_list_sample.json deleted file mode 100644 index 52cd0ae9..00000000 --- a/evals/evaluation/rag_pilot/configs/qa_list_sample.json +++ /dev/null @@ -1,9 +0,0 @@ -[ - { - "query": "鸟类的祖先是恐龙吗?哪篇课文里讲了相关的内容?", - "ground_truth": "是的,鸟类的祖先是恐龙,这一内容在《飞向蓝天的恐龙》一文中有所讨论" - }, - { - "query": "桃花水是什么季节的水?" - } -] diff --git a/evals/evaluation/rag_pilot/configs/rag_results_sample.json b/evals/evaluation/rag_pilot/configs/rag_results_sample.json deleted file mode 100644 index f72f65d1..00000000 --- a/evals/evaluation/rag_pilot/configs/rag_results_sample.json +++ /dev/null @@ -1,15 +0,0 @@ -[ - { - "query": "鸟类的祖先是恐龙吗?哪篇课文里讲了相关的内容?", - "contexts": [ - "默读课文,把不懂的问题写下来,并试着解决。\n 假如你是一个解说员,会怎样简明扼要地介绍恐龙飞向蓝天 ,\n演化成鸟类的过程?\n 课文中的不少语句表达很准确,如 “科学家们希望能够全面揭示\n这一历史进程” 。找出这样的语句读一读,说说自己的体会。 \n小练笔\n        读一读,注意加点的部分,再照样子写一段话。\n数千万年后,它的后代繁衍成一个形态各异的庞大家族。\n有些恐龙像它们的祖先一样用两足奔跑\n3333\n,有些恐龙则用四足行\n333\n走\n3\n。有些恐龙身\n3\n长\n3\n几十米\n333\n,重达数十吨\n33333\n;有些恐龙则身材小巧\n3333\n,\n体重\n33\n只有\n33\n几\n3\n千克\n33\n。有些恐龙凶猛异常\n3333\n,是茹\nrP\n毛饮血的食肉\n33\n动物;\n有些恐龙则温顺可爱\n3333\n,以植物为食\n33333\n。\n科学界存在着多种解释鸟类起源的假说。", - "说到恐龙,人们往往想到凶猛的霸王龙或者笨重、迟\n钝\ndMn\n的马门溪龙;谈起鸟类,我们头脑中自然会浮现轻灵的\n鸽子或者五彩斑斓\nlWn\n的孔雀。二者似乎毫不相干,但近年来\n发现的大量化石显示:在中生代时期,恐龙的一支经过漫\nmSn\n长的演化,最终变成了凌空翱\nWo\n翔的鸟儿。\n早在19世纪,英国学者赫\nhF\n胥\nxO\n黎就注意到恐龙和鸟类在\n骨骼\ngR\n结构上有许多相似之处。在研究了大量恐龙和鸟类化\n石之后,科学家们提出,鸟类不仅\njJn\n和恐龙有亲缘关系,而\n且很可能就是一种小型恐龙的后裔\nyK\n。根据这一假说,一些\n与鸟类亲缘关系较近的恐龙应该长 有羽毛,但一直没有找\n到化石证据。20世纪末期,我国科学家在辽宁西部首次发\n本文作者徐星,选作课文时有改动。\n飞向蓝天的恐龙6\n20\n~ß\u0016rH" - ], - "response": "是的,鸟类的祖先是恐龙,这一内容在《飞向蓝天的恐龙》一文中有所讨论", - "ground_truth": "是的,鸟类的祖先是恐龙,这一内容在《飞向蓝天的恐龙》一文中有所讨论" - }, - { - "query": "桃花水是什么季节的水?", - "response": "桃花水是春季的水" - } -] diff --git a/evals/evaluation/rag_pilot/pipeline_tune.py b/evals/evaluation/rag_pilot/pipeline_tune.py deleted file mode 100644 index b79684df..00000000 --- a/evals/evaluation/rag_pilot/pipeline_tune.py +++ /dev/null @@ -1,252 +0,0 @@ -# Copyright (C) 2025 Intel Corporation -# SPDX-License-Identifier: Apache-2.0 - -import argparse -import json -from enum import Enum - -import yaml -from components.adaptor.adaptor import Adaptor -from components.constructor.connector import get_active_pipeline, get_ragqna, reindex_data, update_active_pipeline -from components.constructor.constructor import ( - COMP_TYPE_MAP, - Constructor, - convert_dict_to_pipeline, - get_ecrag_module_map, -) -from components.tuner.base import ContentType, QuestionType, RagResult -from components.tuner.tuner import ( - EmbeddingLanguageTuner, - NodeParserTypeTuner, - RerankerTopnTuner, - SimpleNodeParserChunkTuner, - display_ragqna, - input_parser, -) - - -def load_qa_list_from_json(file_path): - try: - with open(file_path, "r", encoding="utf-8") as file: - data = json.load(file) - - qa_list = [] - for item in data: - query = item.get("query", "").strip() - ground_truth = item.get("ground_truth", "").strip() - - if query: - qa_list.append({"query": query, "ground_truth": ground_truth}) - - return qa_list - except FileNotFoundError: - print(f"The file '{file_path}' was not found.") - except json.JSONDecodeError: - print(f"Error decoding JSON in the file '{file_path}'.") - except Exception as e: - print(f"An unexpected error occurred: {e}") - - return [] - - -def load_ragresults_from_json(file_path): - try: - with open(file_path, "r", encoding="utf-8") as file: - data = json.load(file) - - qa_list = [] - ragresults = [] - for item in data: - query = item.get("query", "").strip() - ground_truth = item.get("ground_truth", "").strip() - response = item.get("response", "").strip() - contexts = item.get("contexts", []) - - if query and response: - qa_list.append({"query": query, "ground_truth": ground_truth}) - ragresults.append( - RagResult(query=query, ground_truth=ground_truth, response=response, contexts=contexts) - ) - - return qa_list, ragresults - except FileNotFoundError: - print(f"The file '{file_path}' was not found.") - except json.JSONDecodeError: - print(f"Error decoding JSON in the file '{file_path}'.") - except Exception as e: - print(f"An unexpected error occurred: {e}") - - return [], [] - - -def load_pipeline_from_json(file_path): - try: - with open(file_path, "r", encoding="utf-8") as file: - data = json.load(file) - return convert_dict_to_pipeline(data) - except FileNotFoundError: - print(f"The file '{file_path}' was not found.") - except json.JSONDecodeError: - print(f"Error decoding JSON in the file '{file_path}'.") - except Exception as e: - print(f"An unexpected error occurred: {e}") - - return None - - -def read_yaml(file_path): - with open(file_path, "r") as file: - yaml_content = file.read() - return yaml.safe_load(yaml_content) - - -class Mode(str, Enum): - ONLINE = "online" - OFFLINE = "offline" - - -def main(): - parser = argparse.ArgumentParser() - - # common - parser.add_argument( - "-y", - "--rag_module_yaml", - default="configs/ecrag.yaml", - type=str, # TODO rename - help="Path to the YAML file containing all tunable rag configurations.", - ) - parser.add_argument( - "-o", - "--output_config_json", - default="rag_pipeline_out.json", - type=str, - help="Path to the JSON file containing the list of queries.", - ) - - # online - parser.add_argument( - "-q", - "--qa_list_json", - default="configs/qa_list_sample.json", - type=str, - help="Path to the JSON file containing the list of queries.", - ) - - # offline - parser.add_argument("--offline", action="store_true", help="Run offline mode. Default is online mode.") - parser.add_argument( - "-c", - "--rag_pipeline_json", - default="configs/rag_pipeline_sample.json", - type=str, - help="Path to the JSON file containing the list of queries.", - ) - parser.add_argument( - "-r", - "--ragresults_json", - default="configs/rag_results_sample.json", - type=str, - help="Path to the JSON file of the ragresult.", - ) - - args = parser.parse_args() - - mode = Mode.OFFLINE if args.offline else Mode.ONLINE - print(f"Running in {mode.value} mode.") - - constructor = Constructor() - adaptor = Adaptor(read_yaml(args.rag_module_yaml)) - tuners = [ - EmbeddingLanguageTuner(), - NodeParserTypeTuner(), - SimpleNodeParserChunkTuner(), - RerankerTopnTuner(), - ] - - if mode == Mode.ONLINE: - qa_list = load_qa_list_from_json(args.qa_list_json) - else: - qa_list, ragresults = load_ragresults_from_json(args.ragresults_json) - - for qa in qa_list: - query = qa["query"] - ground_truth = qa["ground_truth"] - if mode == Mode.ONLINE: - reindex_data() - while True: - if mode == Mode.ONLINE: - active_pl = get_active_pipeline() - ragqna = get_ragqna(query) - else: - active_pl = load_pipeline_from_json(args.rag_pipeline_json) - ragqna = next((r for r in ragresults if query == query), None) - - if not ragqna or not active_pl: - if not ragqna: - print("Error: RAG result is invalid") - if not active_pl: - print("Error: RAG pipeline is invalid") - break - - constructor.set_pipeline(active_pl) - - print("############################################") - print(f"Tuning RAG with query: “{query}”") - display_ragqna(ragqna) - print(f'Is the response correct or do you want to stop tuning query "{query}"?') - if ground_truth: - print(f'\033[90mGround truth for this query is: "{ground_truth}"\033[0m') - print("y: Completely correct or stop tuning") - print("n: Not correct and keep tuning") - valid, correctness = input_parser(QuestionType.BOOL) - - if valid and correctness: - print("Do you want to check the tuned contexts?") - valid, confirm = input_parser(QuestionType.BOOL) - if valid and confirm: - display_ragqna(ragqna, ContentType.ALL_CONTEXTS) - print("Do you want to save the tuned RAG config?") - valid, save = input_parser(QuestionType.BOOL) - if valid and save: - constructor.save_pipeline_to_json(args.output_config_json) - print("\n\n") - break - - adaptor.update_all_module_functions(get_ecrag_module_map(constructor.pl), COMP_TYPE_MAP) - is_changed = False - for tuner in tuners: - # Tuner setup - module = adaptor.get_module(tuner.node_type, tuner.module_type) - tuner.update_module(module) - tuner.init_ragqna(RagResult(**ragqna.model_dump())) - - # Tuning - if tuner.request_feedback(): - if tuner.make_suggestions(): - is_changed = True - break - - if not is_changed: - print("Nothing changed, tune again?") - valid, cont = input_parser(QuestionType.BOOL) - if valid and not cont: - break - else: - if mode == Mode.ONLINE: - print("Updating pipeline . . .") - pipeline = constructor.export_pipeline() - update_active_pipeline(pipeline) - else: - print("\n") - constructor.save_pipeline_to_json(args.output_config_json) - print( - f"\n\nDo you want to continue with previous RAG pipeline file {args.rag_pipeline_json} and rag_results {args.ragresults_json}?" - ) - valid, cont = input_parser(QuestionType.BOOL) - if valid and not cont: - return - - -if __name__ == "__main__": - main() diff --git a/evals/evaluation/rag_pilot/run_pilot.py b/evals/evaluation/rag_pilot/run_pilot.py new file mode 100644 index 00000000..6b41c8c2 --- /dev/null +++ b/evals/evaluation/rag_pilot/run_pilot.py @@ -0,0 +1,230 @@ +# Copyright (C) 2025 Intel Corporation +# SPDX-License-Identifier: Apache-2.0 + +import argparse +import json +from collections import defaultdict +from enum import Enum +from time import sleep + +import pandas as pd +import yaml +from components.pilot.base import ( + ContextItem, + ContextType, + RAGPipeline, + RAGResult, + RAGResults, + RAGStage, + convert_dict_to_pipeline, +) +from components.pilot.connector import get_active_pipeline, reindex_data +from components.pilot.pilot import Pilot +from components.tuner.adaptor import Adaptor +from components.tuner.tuner import ( + EmbeddingTuner, + NodeParserTuner, + RerankerTopnTuner, + RetrievalTopkRerankerTopnTuner, + RetrievalTopkTuner, + SimpleNodeParserChunkTuner, + input_parser, +) + + +def load_rag_results_from_csv(file_path): + rag_results_raw = pd.read_csv(file_path) + rag_results_dict = defaultdict(lambda: {"gt_contexts": []}) + + for _, rag_result in rag_results_raw.iterrows(): + query_id = rag_result.get("query_id") + + if "query" not in rag_results_dict[query_id]: + rag_results_dict[query_id].update( + { + "query": rag_result.get("query", ""), + "ground_truth": rag_result.get("ground_truth", ""), + } + ) + + gt_context = rag_result.get("gt_context", "") + file_name = rag_result.get("file_name", "") + file_name = "" if pd.isna(file_name) else str(file_name) + + if gt_context: + rag_results_dict[query_id]["gt_contexts"].append(ContextItem(text=gt_context, file_name=file_name)) + + rag_results = RAGResults() + for query_id, data in rag_results_dict.items(): + result = RAGResult( + query_id=query_id, + query=data["query"], + gt_contexts=data["gt_contexts"] if data["gt_contexts"] else None, + ground_truth=data["ground_truth"], + ) + result.init_context_idx(ContextType.GT) + rag_results.add_result(result) + + return rag_results + + +def load_pipeline_from_json(file_path): + try: + with open(file_path, "r", encoding="utf-8") as file: + data = json.load(file) + return convert_dict_to_pipeline(data) + except FileNotFoundError: + print(f"The file '{file_path}' was not found.") + except json.JSONDecodeError: + print(f"Error decoding JSON in the file '{file_path}'.") + except Exception as e: + print(f"An unexpected error occurred: {e}") + + return None + + +def read_yaml(file_path): + with open(file_path, "r") as file: + yaml_content = file.read() + return yaml.safe_load(yaml_content) + + +class Mode(str, Enum): + ONLINE = "online" + OFFLINE = "offline" + + +RESET = "\033[0m" +BOLD = "\033[1m" +YELLOW = "\033[93m" +GREEN = "\033[92m" + + +def main(): + parser = argparse.ArgumentParser() + + # common + parser.add_argument( + "-y", + "--rag_module_yaml", + default="configs/ecrag.yaml", + type=str, + help="Path to the YAML file containing all tunable rag configurations.", + ) + + # online + parser.add_argument( + "-q", + "--qa_list", + default="configs/netsec_sample.csv", + type=str, + help="Path to the file containing the list of queries.", + ) + + args = parser.parse_args() + + adaptor = Adaptor(read_yaml(args.rag_module_yaml)) + + retrieval_tuner_list = [ + EmbeddingTuner(adaptor), + NodeParserTuner(adaptor), + SimpleNodeParserChunkTuner(adaptor), + RetrievalTopkTuner(adaptor), + ] + postprocessing_tuner_list = [RerankerTopnTuner(adaptor)] + + rag_results = load_rag_results_from_csv(args.qa_list) + pilot = Pilot(rag_results_sample=rag_results, hit_threshold=0.9) + + active_pl = RAGPipeline(get_active_pipeline()) + active_pl.regenerate_id() + + pilot.add_rag_pipeline(active_pl) + pilot.run_curr_pl() + + def ask_stage_satisfaction(stage) -> bool: + results = pilot.get_curr_results() + recall_rate = results.metadata.get("recall_rate", {}) if results else {} + + print(f"\n{BOLD}{YELLOW}[STAGE {stage.value}]{RESET} recall_rate is {recall_rate.get(stage)}") + print("Are you satisfied with this metric?\n 1: Yes and jump to next stage\n 2: No and keep tuning") + valid, user_input = input_parser(2) + return valid and user_input == 1 + + def run_tuner_stage(tuner_list, stage): + print(f"\n{BOLD}{YELLOW}🔄 Starting tuning stage: {stage.value}{RESET}") + + for i, tuner in enumerate(tuner_list): + active_pl = pilot.get_curr_pl() + adaptor.update_all_module_functions(active_pl) + + pl_list = [] + params_candidates = [] + + print("") + if tuner.request_feedback(): + pl_list, params_candidates = tuner.apply_suggestions() + for pl, params in zip(pl_list, params_candidates): + print(f"Trying to update pipeline to {params}") + if pl.id != active_pl.id: + pilot.add_rag_pipeline(pl) + pilot.curr_pl_id = pl.id + reindex_data() + pilot.run_curr_pl() + print("Metrics of this pipeline:") + results = pilot.get_results(pl.id) + if results: + results.check_metadata() + + pilot.change_best_recall_pl(stage) + + print("") + for pl, params in zip(pl_list, params_candidates): + if pl.id == pilot.curr_pl_id: + print(f"{BOLD}{GREEN}✅ Changing pipeline to {params} with below metrics:{RESET}") + break + else: + print(f"{BOLD}{GREEN}↩️ Fallback to previous pipeline with below metrics:{RESET}") + pilot.get_curr_results().check_metadata() + + # Ask satisfaction only if not the last tuner + if i < len(tuner_list) - 1: + if ask_stage_satisfaction(stage): + return True + else: + print(f"{BOLD}{YELLOW}⏭️ All tuners tried for {stage.value}, proceeding to next stage...{RESET}") + + return False + + def run_full_tuning(): + # Step 1: POSTPROCESSING initial check + if ask_stage_satisfaction(RAGStage.POSTPROCESSING): + print("User satisfied with POSTPROCESSING. Exiting.") + return + + # Step 2: RETRIEVAL + if ask_stage_satisfaction(RAGStage.RETRIEVAL): + print("User satisfied with RETRIEVAL. Proceeding to POSTPROCESSING tuning...") + else: + _ = run_tuner_stage(retrieval_tuner_list, RAGStage.RETRIEVAL) + if ask_stage_satisfaction(RAGStage.POSTPROCESSING): + print("User satisfied with POSTPROCESSING. Exiting.") + return + sleep(1) + + # Step 3: POSTPROCESSING tuning + print("\nStarting POSTPROCESSING tuning...") + _ = run_tuner_stage(postprocessing_tuner_list, RAGStage.POSTPROCESSING) + + print(f"\n{BOLD}{GREEN}🎯 Tuning complete.{RESET}") + + run_full_tuning() + + print("Metrics of final pipeline:") + pilot.get_curr_results().check_metadata() + + pilot.save_dicts() + + +if __name__ == "__main__": + main()