diff --git a/src/backend/base/langflow/api/router.py b/src/backend/base/langflow/api/router.py
index d5d1b4a8b32d..3b6e779d261f 100644
--- a/src/backend/base/langflow/api/router.py
+++ b/src/backend/base/langflow/api/router.py
@@ -27,6 +27,7 @@
 from langflow.api.v2 import files_router as files_router_v2
 from langflow.api.v2 import mcp_router as mcp_router_v2
 from langflow.api.v2 import registration_router as registration_router_v2
+from langflow.api.v2 import storage_settings_router as storage_settings_router_v2
 from langflow.api.v2 import workflow_router as workflow_router_v2
 
 router_v1 = APIRouter(
@@ -62,6 +63,7 @@
 router_v2.include_router(files_router_v2)
 router_v2.include_router(mcp_router_v2)
 router_v2.include_router(registration_router_v2)
+router_v2.include_router(storage_settings_router_v2)
 router_v2.include_router(workflow_router_v2)
 
 router = APIRouter(
diff --git a/src/backend/base/langflow/api/v2/__init__.py b/src/backend/base/langflow/api/v2/__init__.py
index b3a6e06f570c..65862e09755b 100644
--- a/src/backend/base/langflow/api/v2/__init__.py
+++ b/src/backend/base/langflow/api/v2/__init__.py
@@ -1,8 +1,16 @@
 """V2 API module."""
 
+from langflow.api.v2.storage_settings import router as storage_settings_router
+
 from .files import router as files_router
 from .mcp import router as mcp_router
 from .registration import router as registration_router
 from .workflow import router as workflow_router
 
-__all__ = ["files_router", "mcp_router", "registration_router", "workflow_router"]
+__all__ = [
+    "files_router",
+    "mcp_router",
+    "registration_router",
+    "storage_settings_router",
+    "workflow_router",
+]
diff --git a/src/backend/base/langflow/api/v2/storage_settings.py b/src/backend/base/langflow/api/v2/storage_settings.py
new file mode 100644
index 000000000000..6ddcc4681439
--- /dev/null
+++ b/src/backend/base/langflow/api/v2/storage_settings.py
@@ -0,0 +1,270 @@
+from typing import Annotated
+
+from fastapi import APIRouter, Depends, HTTPException
+from pydantic import BaseModel
+
+from langflow.api.utils import CurrentActiveUser
+from langflow.services.deps import get_settings_service, session_scope
+from langflow.services.settings.service import SettingsService
+from langflow.services.variable.constants import CREDENTIAL_TYPE
+from langflow.services.variable.service import DatabaseVariableService
+
+router = APIRouter(tags=["Storage Settings"], prefix="/storage-settings")
+
+# Storage settings are persisted as internal variables with this prefix
+STORAGE_SETTINGS_PREFIX = "__storage_"
+
+
+class StorageSettingsResponse(BaseModel):
+    """Storage settings response model."""
+
+    default_storage_location: str
+    component_aws_access_key_id: str | None
+    component_aws_secret_access_key: str | None
+    component_aws_default_bucket: str | None
+    component_aws_default_region: str | None
+    component_google_drive_service_account_key: str | None
+    component_google_drive_default_folder_id: str | None
+
+
+class StorageSettingsUpdate(BaseModel):
+    """Storage settings update model."""
+
+    default_storage_location: str | None = None
+    component_aws_access_key_id: str | None = None
+    component_aws_secret_access_key: str | None = None
+    component_aws_default_bucket: str | None = None
+    component_aws_default_region: str | None = None
+    component_google_drive_service_account_key: str | None = None
+    component_google_drive_default_folder_id: str | None = None
+
+
+@router.get("", response_model=StorageSettingsResponse)
+async def get_storage_settings(
+    current_user: CurrentActiveUser,
+    settings_service: Annotated[SettingsService, Depends(get_settings_service)],
+):
+    """Get global storage settings for file components.
+
+    Settings are loaded from database variables (persisted) with fallback to
+    in-memory settings and environment variables.
+    """
+    async with session_scope() as session:
+        variable_service = DatabaseVariableService(settings_service)
+
+        # Helper to get variable value with fallback to settings
+        async def get_setting(var_name: str, setting_attr: str) -> str | None:
+            try:
+                var = await variable_service.get_variable_object(
+                    user_id=current_user.id, name=f"{STORAGE_SETTINGS_PREFIX}{var_name}", session=session
+                )
+                # Decrypt the value
+                from langflow.services.auth import utils as auth_utils
+
+                return auth_utils.decrypt_api_key(var.value, settings_service=settings_service)
+            except ValueError:
+                # Variable not found in DB, use in-memory setting
+                return getattr(settings_service.settings, setting_attr)
+
+        # Load settings from database or fallback to in-memory
+        default_storage_location = await get_setting("default_storage_location", "default_storage_location") or "Local"
+        aws_access_key_id = await get_setting("component_aws_access_key_id", "component_aws_access_key_id")
+        aws_secret_access_key = await get_setting("component_aws_secret_access_key", "component_aws_secret_access_key")
+        aws_default_bucket = await get_setting("component_aws_default_bucket", "component_aws_default_bucket")
+        aws_default_region = await get_setting("component_aws_default_region", "component_aws_default_region")
+        gdrive_service_account_key = await get_setting(
+            "component_google_drive_service_account_key", "component_google_drive_service_account_key"
+        )
+        gdrive_default_folder_id = await get_setting(
+            "component_google_drive_default_folder_id", "component_google_drive_default_folder_id"
+        )
+
+        # Mask sensitive values for security
+        masked_aws_secret = "*" * 8 if aws_secret_access_key else None
+        masked_gdrive_key = "*" * 8 if gdrive_service_account_key else None
+
+        return StorageSettingsResponse(
+            default_storage_location=default_storage_location,
+            component_aws_access_key_id=aws_access_key_id,
+            component_aws_secret_access_key=masked_aws_secret,
+            component_aws_default_bucket=aws_default_bucket,
+            component_aws_default_region=aws_default_region,
+            component_google_drive_service_account_key=masked_gdrive_key,
+            component_google_drive_default_folder_id=gdrive_default_folder_id,
+        )
+
+
+@router.patch("", response_model=StorageSettingsResponse)
+async def update_storage_settings(
+    settings_update: StorageSettingsUpdate,
+    current_user: CurrentActiveUser,
+    settings_service: Annotated[SettingsService, Depends(get_settings_service)],
+):
+    """Update global storage settings for file components.
+
+    Settings are persisted to the database and will survive restarts.
+    """
+    from langflow.services.auth import utils as auth_utils
+
+    async with session_scope() as session:
+        variable_service = DatabaseVariableService(settings_service)
+
+        # Helper to get current value from DB or fallback to in-memory
+        async def get_current_value(var_name: str, setting_attr: str) -> str | None:
+            try:
+                var = await variable_service.get_variable_object(
+                    user_id=current_user.id, name=f"{STORAGE_SETTINGS_PREFIX}{var_name}", session=session
+                )
+                return auth_utils.decrypt_api_key(var.value, settings_service=settings_service)
+            except ValueError:
+                return getattr(settings_service.settings, setting_attr)
+
+        # Determine the final storage location after update
+        final_storage_location = (
+            settings_update.default_storage_location
+            if settings_update.default_storage_location is not None
+            else await get_current_value("default_storage_location", "default_storage_location") or "Local"
+        )
+
+        # Validate AWS credentials if AWS is selected
+        if final_storage_location == "AWS":
+            # Check if we're updating credentials or if they already exist
+            final_aws_key_id = (
+                settings_update.component_aws_access_key_id
+                if settings_update.component_aws_access_key_id is not None
+                else await get_current_value("component_aws_access_key_id", "component_aws_access_key_id")
+            )
+            final_aws_secret = await get_current_value(
+                "component_aws_secret_access_key", "component_aws_secret_access_key"
+            )
+            if settings_update.component_aws_secret_access_key is not None and not all(
+                c == "*" for c in settings_update.component_aws_secret_access_key
+            ):
+                final_aws_secret = settings_update.component_aws_secret_access_key
+
+            final_aws_bucket = (
+                settings_update.component_aws_default_bucket
+                if settings_update.component_aws_default_bucket is not None
+                else await get_current_value("component_aws_default_bucket", "component_aws_default_bucket")
+            )
+
+            # Validate required AWS fields
+            if not final_aws_key_id:
+                raise HTTPException(
+                    status_code=400,
+                    detail="AWS Access Key ID is required when AWS storage is selected",
+                )
+            if not final_aws_secret:
+                raise HTTPException(
+                    status_code=400,
+                    detail="AWS Secret Access Key is required when AWS storage is selected",
+                )
+            if not final_aws_bucket:
+                raise HTTPException(
+                    status_code=400,
+                    detail="AWS Default Bucket is required when AWS storage is selected",
+                )
+
+        # Validate Google Drive credentials if Google Drive is selected
+        if final_storage_location == "Google Drive":
+            # Check if we're updating credentials or if they already exist
+            final_gdrive_key = await get_current_value(
+                "component_google_drive_service_account_key", "component_google_drive_service_account_key"
+            )
+            if settings_update.component_google_drive_service_account_key is not None and not all(
+                c == "*" for c in settings_update.component_google_drive_service_account_key
+            ):
+                final_gdrive_key = settings_update.component_google_drive_service_account_key
+
+            # Validate required Google Drive fields
+            if not final_gdrive_key:
+                raise HTTPException(
+                    status_code=400,
+                    detail="Google Drive Service Account Key is required when Google Drive storage is selected",
+                )
+
+        # Helper to create or update a variable in database
+        async def save_setting(var_name: str, value: str) -> None:
+            full_var_name = f"{STORAGE_SETTINGS_PREFIX}{var_name}"
+            try:
+                # Try to update existing
+                await variable_service.update_variable(
+                    user_id=current_user.id, name=full_var_name, value=value, session=session
+                )
+            except ValueError:
+                # Variable doesn't exist, create it
+                await variable_service.create_variable(
+                    user_id=current_user.id,
+                    name=full_var_name,
+                    value=value,
+                    type_=CREDENTIAL_TYPE,
+                    session=session,
+                )
+
+        # Persist updates to database
+        if settings_update.default_storage_location is not None:
+            await save_setting("default_storage_location", settings_update.default_storage_location)
+
+        if settings_update.component_aws_access_key_id is not None:
+            await save_setting("component_aws_access_key_id", settings_update.component_aws_access_key_id)
+
+        # Only update secret if not masked (not just asterisks)
+        if settings_update.component_aws_secret_access_key is not None and not all(
+            c == "*" for c in settings_update.component_aws_secret_access_key
+        ):
+            await save_setting("component_aws_secret_access_key", settings_update.component_aws_secret_access_key)
+
+        if settings_update.component_aws_default_bucket is not None:
+            await save_setting("component_aws_default_bucket", settings_update.component_aws_default_bucket)
+
+        if settings_update.component_aws_default_region is not None:
+            await save_setting("component_aws_default_region", settings_update.component_aws_default_region)
+
+        # Only update service account key if not masked
+        if settings_update.component_google_drive_service_account_key is not None and not all(
+            c == "*" for c in settings_update.component_google_drive_service_account_key
+        ):
+            await save_setting(
+                "component_google_drive_service_account_key", settings_update.component_google_drive_service_account_key
+            )
+
+        if settings_update.component_google_drive_default_folder_id is not None:
+            await save_setting(
+                "component_google_drive_default_folder_id",
+                settings_update.component_google_drive_default_folder_id,
+            )
+
+        # Commit the transaction
+        await session.commit()
+
+        # Get final values for response
+        final_aws_access_key_id = await get_current_value("component_aws_access_key_id", "component_aws_access_key_id")
+        final_aws_secret_access_key = await get_current_value(
+            "component_aws_secret_access_key", "component_aws_secret_access_key"
+        )
+        final_aws_default_bucket = await get_current_value(
+            "component_aws_default_bucket", "component_aws_default_bucket"
+        )
+        final_aws_default_region = await get_current_value(
+            "component_aws_default_region", "component_aws_default_region"
+        )
+        final_gdrive_service_account_key = await get_current_value(
+            "component_google_drive_service_account_key", "component_google_drive_service_account_key"
+        )
+        final_gdrive_default_folder_id = await get_current_value(
+            "component_google_drive_default_folder_id", "component_google_drive_default_folder_id"
+        )
+
+        # Return masked values for security
+        masked_aws_secret = "*" * 8 if final_aws_secret_access_key else None
+        masked_gdrive_key = "*" * 8 if final_gdrive_service_account_key else None
+
+        return StorageSettingsResponse(
+            default_storage_location=final_storage_location,
+            component_aws_access_key_id=final_aws_access_key_id,
+            component_aws_secret_access_key=masked_aws_secret,
+            component_aws_default_bucket=final_aws_default_bucket,
+            component_aws_default_region=final_aws_default_region,
+            component_google_drive_service_account_key=masked_gdrive_key,
+            component_google_drive_default_folder_id=final_gdrive_default_folder_id,
+        )
diff --git a/src/backend/base/langflow/initial_setup/starter_projects/Document Q&A.json b/src/backend/base/langflow/initial_setup/starter_projects/Document Q&A.json
index 02d18cc4c014..bcd51bce0cf1 100644
--- a/src/backend/base/langflow/initial_setup/starter_projects/Document Q&A.json
+++ 
b/src/backend/base/langflow/initial_setup/starter_projects/Document Q&A.json @@ -1307,7 +1307,7 @@ "legacy": false, "lf_version": "1.4.3", "metadata": { - "code_hash": "5008cc086d7f", + "code_hash": "77f7fd5e1f43", "dependencies": { "dependencies": [ { @@ -1467,7 +1467,7 @@ "show": true, "title_case": false, "type": "code", - "value": "\"\"\"Enhanced file component with Docling support and process isolation.\n\nNotes:\n-----\n- ALL Docling parsing/export runs in a separate OS process to prevent memory\n growth and native library state from impacting the main Langflow process.\n- Standard text/structured parsing continues to use existing BaseFileComponent\n utilities (and optional threading via `parallel_load_data`).\n\"\"\"\n\nfrom __future__ import annotations\n\nimport contextlib\nimport json\nimport subprocess\nimport sys\nimport textwrap\nfrom copy import deepcopy\nfrom pathlib import Path\nfrom tempfile import NamedTemporaryFile\nfrom typing import Any\n\nfrom lfx.base.data.base_file import BaseFileComponent\nfrom lfx.base.data.storage_utils import parse_storage_path, read_file_bytes, validate_image_content_type\nfrom lfx.base.data.utils import TEXT_FILE_TYPES, parallel_load_data, parse_text_file_to_data\nfrom lfx.inputs import SortableListInput\nfrom lfx.inputs.inputs import DropdownInput, MessageTextInput, StrInput\nfrom lfx.io import BoolInput, FileInput, IntInput, Output, SecretStrInput\nfrom lfx.schema.data import Data\nfrom lfx.schema.dataframe import DataFrame # noqa: TC001\nfrom lfx.schema.message import Message\nfrom lfx.services.deps import get_settings_service, get_storage_service\nfrom lfx.utils.async_helpers import run_until_complete\nfrom lfx.utils.validate_cloud import is_astra_cloud_environment\n\n\ndef _get_storage_location_options():\n \"\"\"Get storage location options, filtering out Local if in Astra cloud environment.\"\"\"\n all_options = [{\"name\": \"AWS\", \"icon\": \"Amazon\"}, {\"name\": \"Google Drive\", \"icon\": \"google\"}]\n if is_astra_cloud_environment():\n return all_options\n return [{\"name\": \"Local\", \"icon\": \"hard-drive\"}, *all_options]\n\n\nclass FileComponent(BaseFileComponent):\n \"\"\"File component with optional Docling processing (isolated in a subprocess).\"\"\"\n\n display_name = \"Read File\"\n # description is now a dynamic property - see get_tool_description()\n _base_description = \"Loads content from one or more files.\"\n documentation: str = \"https://docs.langflow.org/read-file\"\n icon = \"file-text\"\n name = \"File\"\n add_tool_output = True # Enable tool mode toggle without requiring tool_mode inputs\n\n # Extensions that can be processed without Docling (using standard text parsing)\n TEXT_EXTENSIONS = TEXT_FILE_TYPES\n\n # Extensions that require Docling for processing (images, advanced office formats, etc.)\n DOCLING_ONLY_EXTENSIONS = [\n \"adoc\",\n \"asciidoc\",\n \"asc\",\n \"bmp\",\n \"dotx\",\n \"dotm\",\n \"docm\",\n \"jpg\",\n \"jpeg\",\n \"png\",\n \"potx\",\n \"ppsx\",\n \"pptm\",\n \"potm\",\n \"ppsm\",\n \"pptx\",\n \"tiff\",\n \"xls\",\n \"xlsx\",\n \"xhtml\",\n \"webp\",\n ]\n\n # Docling-supported/compatible extensions; TEXT_FILE_TYPES are supported by the base loader.\n VALID_EXTENSIONS = [\n *TEXT_EXTENSIONS,\n *DOCLING_ONLY_EXTENSIONS,\n ]\n\n # Fixed export settings used when markdown export is requested.\n EXPORT_FORMAT = \"Markdown\"\n IMAGE_MODE = \"placeholder\"\n\n _base_inputs = deepcopy(BaseFileComponent.get_base_inputs())\n\n for input_item in _base_inputs:\n if isinstance(input_item, 
FileInput) and input_item.name == \"path\":\n input_item.real_time_refresh = True\n input_item.tool_mode = False # Disable tool mode for file upload input\n input_item.required = False # Make it optional so it doesn't error in tool mode\n break\n\n inputs = [\n SortableListInput(\n name=\"storage_location\",\n display_name=\"Storage Location\",\n placeholder=\"Select Location\",\n info=\"Choose where to read the file from.\",\n options=_get_storage_location_options(),\n real_time_refresh=True,\n limit=1,\n ),\n *_base_inputs,\n StrInput(\n name=\"file_path_str\",\n display_name=\"File Path\",\n info=(\n \"Path to the file to read. Used when component is called as a tool. \"\n \"If not provided, will use the uploaded file from 'path' input.\"\n ),\n show=False,\n advanced=True,\n tool_mode=True, # Required for Toolset toggle, but _get_tools() ignores this parameter\n required=False,\n ),\n # AWS S3 specific inputs\n SecretStrInput(\n name=\"aws_access_key_id\",\n display_name=\"AWS Access Key ID\",\n info=\"AWS Access key ID.\",\n show=False,\n advanced=False,\n required=True,\n ),\n SecretStrInput(\n name=\"aws_secret_access_key\",\n display_name=\"AWS Secret Key\",\n info=\"AWS Secret Key.\",\n show=False,\n advanced=False,\n required=True,\n ),\n StrInput(\n name=\"bucket_name\",\n display_name=\"S3 Bucket Name\",\n info=\"Enter the name of the S3 bucket.\",\n show=False,\n advanced=False,\n required=True,\n ),\n StrInput(\n name=\"aws_region\",\n display_name=\"AWS Region\",\n info=\"AWS region (e.g., us-east-1, eu-west-1).\",\n show=False,\n advanced=False,\n ),\n StrInput(\n name=\"s3_file_key\",\n display_name=\"S3 File Key\",\n info=\"The key (path) of the file in S3 bucket.\",\n show=False,\n advanced=False,\n required=True,\n ),\n # Google Drive specific inputs\n SecretStrInput(\n name=\"service_account_key\",\n display_name=\"GCP Credentials Secret Key\",\n info=\"Your Google Cloud Platform service account JSON key as a secret string (complete JSON content).\",\n show=False,\n advanced=False,\n required=True,\n ),\n StrInput(\n name=\"file_id\",\n display_name=\"Google Drive File ID\",\n info=(\"The Google Drive file ID to read. The file must be shared with the service account email.\"),\n show=False,\n advanced=False,\n required=True,\n ),\n BoolInput(\n name=\"advanced_mode\",\n display_name=\"Advanced Parser\",\n value=False,\n real_time_refresh=True,\n info=(\n \"Enable advanced document processing and export with Docling for PDFs, images, and office documents. \"\n \"Note that advanced document processing can consume significant resources.\"\n ),\n # Disabled in cloud\n show=not is_astra_cloud_environment(),\n ),\n DropdownInput(\n name=\"pipeline\",\n display_name=\"Pipeline\",\n info=\"Docling pipeline to use\",\n options=[\"standard\", \"vlm\"],\n value=\"standard\",\n advanced=True,\n real_time_refresh=True,\n ),\n DropdownInput(\n name=\"ocr_engine\",\n display_name=\"OCR Engine\",\n info=\"OCR engine to use. 
Only available when pipeline is set to 'standard'.\",\n options=[\"None\", \"easyocr\"],\n value=\"easyocr\",\n show=False,\n advanced=True,\n ),\n StrInput(\n name=\"md_image_placeholder\",\n display_name=\"Image placeholder\",\n info=\"Specify the image placeholder for markdown exports.\",\n value=\"\",\n advanced=True,\n show=False,\n ),\n StrInput(\n name=\"md_page_break_placeholder\",\n display_name=\"Page break placeholder\",\n info=\"Add this placeholder between pages in the markdown output.\",\n value=\"\",\n advanced=True,\n show=False,\n ),\n MessageTextInput(\n name=\"doc_key\",\n display_name=\"Doc Key\",\n info=\"The key to use for the DoclingDocument column.\",\n value=\"doc\",\n advanced=True,\n show=False,\n ),\n # Deprecated input retained for backward-compatibility.\n BoolInput(\n name=\"use_multithreading\",\n display_name=\"[Deprecated] Use Multithreading\",\n advanced=True,\n value=True,\n info=\"Set 'Processing Concurrency' greater than 1 to enable multithreading.\",\n ),\n IntInput(\n name=\"concurrency_multithreading\",\n display_name=\"Processing Concurrency\",\n advanced=True,\n info=\"When multiple files are being processed, the number of files to process concurrently.\",\n value=1,\n ),\n BoolInput(\n name=\"markdown\",\n display_name=\"Markdown Export\",\n info=\"Export processed documents to Markdown format. Only available when advanced mode is enabled.\",\n value=False,\n show=False,\n ),\n ]\n\n outputs = [\n Output(display_name=\"Raw Content\", name=\"message\", method=\"load_files_message\", tool_mode=True),\n ]\n\n # ------------------------------ Tool description with file names --------------\n\n def get_tool_description(self) -> str:\n \"\"\"Return a dynamic description that includes the names of uploaded files.\n\n This helps the Agent understand which files are available to read.\n \"\"\"\n base_description = \"Loads and returns the content from uploaded files.\"\n\n # Get the list of uploaded file paths\n file_paths = getattr(self, \"path\", None)\n if not file_paths:\n return base_description\n\n # Ensure it's a list\n if not isinstance(file_paths, list):\n file_paths = [file_paths]\n\n # Extract just the file names from the paths\n file_names = []\n for fp in file_paths:\n if fp:\n name = Path(fp).name\n file_names.append(name)\n\n if file_names:\n files_str = \", \".join(file_names)\n return f\"{base_description} Available files: {files_str}. 
Call this tool to read these files.\"\n\n return base_description\n\n @property\n def description(self) -> str:\n \"\"\"Dynamic description property that includes uploaded file names.\"\"\"\n return self.get_tool_description()\n\n async def _get_tools(self) -> list:\n \"\"\"Override to create a tool without parameters.\n\n The Read File component should use the files already uploaded via UI,\n not accept file paths from the Agent (which wouldn't know the internal paths).\n \"\"\"\n from langchain_core.tools import StructuredTool\n from pydantic import BaseModel\n\n # Empty schema - no parameters needed\n class EmptySchema(BaseModel):\n \"\"\"No parameters required - uses pre-uploaded files.\"\"\"\n\n async def read_files_tool() -> str:\n \"\"\"Read the content of uploaded files.\"\"\"\n try:\n result = self.load_files_message()\n if hasattr(result, \"get_text\"):\n return result.get_text()\n if hasattr(result, \"text\"):\n return result.text\n return str(result)\n except (FileNotFoundError, ValueError, OSError, RuntimeError) as e:\n return f\"Error reading files: {e}\"\n\n description = self.get_tool_description()\n\n tool = StructuredTool(\n name=\"load_files_message\",\n description=description,\n coroutine=read_files_tool,\n args_schema=EmptySchema,\n handle_tool_error=True,\n tags=[\"load_files_message\"],\n metadata={\n \"display_name\": \"Read File\",\n \"display_description\": description,\n },\n )\n\n return [tool]\n\n # ------------------------------ UI helpers --------------------------------------\n\n def _path_value(self, template: dict) -> list[str]:\n \"\"\"Return the list of currently selected file paths from the template.\"\"\"\n return template.get(\"path\", {}).get(\"file_path\", [])\n\n def _disable_docling_fields_in_cloud(self, build_config: dict[str, Any]) -> None:\n \"\"\"Disable all Docling-related fields in cloud environments.\"\"\"\n if \"advanced_mode\" in build_config:\n build_config[\"advanced_mode\"][\"show\"] = False\n build_config[\"advanced_mode\"][\"value\"] = False\n # Hide all Docling-related fields\n docling_fields = (\"pipeline\", \"ocr_engine\", \"doc_key\", \"md_image_placeholder\", \"md_page_break_placeholder\")\n for field in docling_fields:\n if field in build_config:\n build_config[field][\"show\"] = False\n # Also disable OCR engine specifically\n if \"ocr_engine\" in build_config:\n build_config[\"ocr_engine\"][\"value\"] = \"None\"\n\n def update_build_config(\n self,\n build_config: dict[str, Any],\n field_value: Any,\n field_name: str | None = None,\n ) -> dict[str, Any]:\n \"\"\"Show/hide Advanced Parser and related fields based on selection context.\"\"\"\n # Update storage location options dynamically based on cloud environment\n if \"storage_location\" in build_config:\n updated_options = _get_storage_location_options()\n build_config[\"storage_location\"][\"options\"] = updated_options\n\n # Handle storage location selection\n if field_name == \"storage_location\":\n # Extract selected storage location\n selected = [location[\"name\"] for location in field_value] if isinstance(field_value, list) else []\n\n # Hide all storage-specific fields first\n storage_fields = [\n \"aws_access_key_id\",\n \"aws_secret_access_key\",\n \"bucket_name\",\n \"aws_region\",\n \"s3_file_key\",\n \"service_account_key\",\n \"file_id\",\n ]\n\n for f_name in storage_fields:\n if f_name in build_config:\n build_config[f_name][\"show\"] = False\n\n # Show fields based on selected storage location\n if len(selected) == 1:\n location = selected[0]\n\n if 
location == \"Local\":\n # Show file upload input for local storage\n if \"path\" in build_config:\n build_config[\"path\"][\"show\"] = True\n\n elif location == \"AWS\":\n # Hide file upload input, show AWS fields\n if \"path\" in build_config:\n build_config[\"path\"][\"show\"] = False\n\n aws_fields = [\n \"aws_access_key_id\",\n \"aws_secret_access_key\",\n \"bucket_name\",\n \"aws_region\",\n \"s3_file_key\",\n ]\n for f_name in aws_fields:\n if f_name in build_config:\n build_config[f_name][\"show\"] = True\n build_config[f_name][\"advanced\"] = False\n\n elif location == \"Google Drive\":\n # Hide file upload input, show Google Drive fields\n if \"path\" in build_config:\n build_config[\"path\"][\"show\"] = False\n\n gdrive_fields = [\"service_account_key\", \"file_id\"]\n for f_name in gdrive_fields:\n if f_name in build_config:\n build_config[f_name][\"show\"] = True\n build_config[f_name][\"advanced\"] = False\n # No storage location selected - show file upload by default\n elif \"path\" in build_config:\n build_config[\"path\"][\"show\"] = True\n\n return build_config\n\n if field_name == \"path\":\n paths = self._path_value(build_config)\n\n # Disable in cloud environments\n if is_astra_cloud_environment():\n self._disable_docling_fields_in_cloud(build_config)\n else:\n # If all files can be processed by docling, do so\n allow_advanced = all(not file_path.endswith((\".csv\", \".xlsx\", \".parquet\")) for file_path in paths)\n build_config[\"advanced_mode\"][\"show\"] = allow_advanced\n if not allow_advanced:\n build_config[\"advanced_mode\"][\"value\"] = False\n docling_fields = (\n \"pipeline\",\n \"ocr_engine\",\n \"doc_key\",\n \"md_image_placeholder\",\n \"md_page_break_placeholder\",\n )\n for field in docling_fields:\n if field in build_config:\n build_config[field][\"show\"] = False\n\n # Docling Processing\n elif field_name == \"advanced_mode\":\n # Disable in cloud environments - don't show Docling fields even if advanced_mode is toggled\n if is_astra_cloud_environment():\n self._disable_docling_fields_in_cloud(build_config)\n else:\n docling_fields = (\n \"pipeline\",\n \"ocr_engine\",\n \"doc_key\",\n \"md_image_placeholder\",\n \"md_page_break_placeholder\",\n )\n for field in docling_fields:\n if field in build_config:\n build_config[field][\"show\"] = bool(field_value)\n if field == \"pipeline\":\n build_config[field][\"advanced\"] = not bool(field_value)\n\n elif field_name == \"pipeline\":\n # Disable in cloud environments - don't show OCR engine even if pipeline is changed\n if is_astra_cloud_environment():\n self._disable_docling_fields_in_cloud(build_config)\n elif field_value == \"standard\":\n build_config[\"ocr_engine\"][\"show\"] = True\n build_config[\"ocr_engine\"][\"value\"] = \"easyocr\"\n else:\n build_config[\"ocr_engine\"][\"show\"] = False\n build_config[\"ocr_engine\"][\"value\"] = \"None\"\n\n return build_config\n\n def update_outputs(self, frontend_node: dict[str, Any], field_name: str, field_value: Any) -> dict[str, Any]: # noqa: ARG002\n \"\"\"Dynamically show outputs based on file count/type and advanced mode.\"\"\"\n if field_name not in [\"path\", \"advanced_mode\", \"pipeline\"]:\n return frontend_node\n\n template = frontend_node.get(\"template\", {})\n paths = self._path_value(template)\n if not paths:\n return frontend_node\n\n frontend_node[\"outputs\"] = []\n if len(paths) == 1:\n file_path = paths[0] if field_name == \"path\" else frontend_node[\"template\"][\"path\"][\"file_path\"][0]\n if file_path.endswith((\".csv\", \".xlsx\", 
\".parquet\")):\n frontend_node[\"outputs\"].append(\n Output(\n display_name=\"Structured Content\",\n name=\"dataframe\",\n method=\"load_files_structured\",\n tool_mode=True,\n ),\n )\n elif file_path.endswith(\".json\"):\n frontend_node[\"outputs\"].append(\n Output(display_name=\"Structured Content\", name=\"json\", method=\"load_files_json\", tool_mode=True),\n )\n\n advanced_mode = frontend_node.get(\"template\", {}).get(\"advanced_mode\", {}).get(\"value\", False)\n if advanced_mode:\n frontend_node[\"outputs\"].append(\n Output(\n display_name=\"Structured Output\",\n name=\"advanced_dataframe\",\n method=\"load_files_dataframe\",\n tool_mode=True,\n ),\n )\n frontend_node[\"outputs\"].append(\n Output(\n display_name=\"Markdown\", name=\"advanced_markdown\", method=\"load_files_markdown\", tool_mode=True\n ),\n )\n frontend_node[\"outputs\"].append(\n Output(display_name=\"File Path\", name=\"path\", method=\"load_files_path\", tool_mode=True),\n )\n else:\n frontend_node[\"outputs\"].append(\n Output(display_name=\"Raw Content\", name=\"message\", method=\"load_files_message\", tool_mode=True),\n )\n frontend_node[\"outputs\"].append(\n Output(display_name=\"File Path\", name=\"path\", method=\"load_files_path\", tool_mode=True),\n )\n else:\n # Multiple files => DataFrame output; advanced parser disabled\n frontend_node[\"outputs\"].append(\n Output(display_name=\"Files\", name=\"dataframe\", method=\"load_files\", tool_mode=True)\n )\n\n return frontend_node\n\n # ------------------------------ Core processing ----------------------------------\n\n def _get_selected_storage_location(self) -> str:\n \"\"\"Get the selected storage location from the SortableListInput.\"\"\"\n if hasattr(self, \"storage_location\") and self.storage_location:\n if isinstance(self.storage_location, list) and len(self.storage_location) > 0:\n return self.storage_location[0].get(\"name\", \"\")\n if isinstance(self.storage_location, dict):\n return self.storage_location.get(\"name\", \"\")\n return \"Local\" # Default to Local if not specified\n\n def _validate_and_resolve_paths(self) -> list[BaseFileComponent.BaseFile]:\n \"\"\"Override to handle file_path_str input from tool mode and cloud storage.\n\n Priority:\n 1. Cloud storage (AWS/Google Drive) if selected\n 2. file_path_str (if provided by the tool call)\n 3. 
path (uploaded file from UI)\n \"\"\"\n storage_location = self._get_selected_storage_location()\n\n # Handle AWS S3\n if storage_location == \"AWS\":\n return self._read_from_aws_s3()\n\n # Handle Google Drive\n if storage_location == \"Google Drive\":\n return self._read_from_google_drive()\n\n # Handle Local storage\n # Check if file_path_str is provided (from tool mode)\n file_path_str = getattr(self, \"file_path_str\", None)\n if file_path_str:\n # Use the string path from tool mode\n from pathlib import Path\n\n from lfx.schema.data import Data\n\n resolved_path = Path(self.resolve_path(file_path_str))\n if not resolved_path.exists():\n msg = f\"File or directory not found: {file_path_str}\"\n self.log(msg)\n if not self.silent_errors:\n raise ValueError(msg)\n return []\n\n data_obj = Data(data={self.SERVER_FILE_PATH_FIELDNAME: str(resolved_path)})\n return [BaseFileComponent.BaseFile(data_obj, resolved_path, delete_after_processing=False)]\n\n # Otherwise use the default implementation (uses path FileInput)\n return super()._validate_and_resolve_paths()\n\n def _read_from_aws_s3(self) -> list[BaseFileComponent.BaseFile]:\n \"\"\"Read file from AWS S3.\"\"\"\n from lfx.base.data.cloud_storage_utils import create_s3_client, validate_aws_credentials\n\n # Validate AWS credentials\n validate_aws_credentials(self)\n if not getattr(self, \"s3_file_key\", None):\n msg = \"S3 File Key is required\"\n raise ValueError(msg)\n\n # Create S3 client\n s3_client = create_s3_client(self)\n\n # Download file to temp location\n import tempfile\n\n # Get file extension from S3 key\n file_extension = Path(self.s3_file_key).suffix or \"\"\n\n with tempfile.NamedTemporaryFile(mode=\"wb\", suffix=file_extension, delete=False) as temp_file:\n temp_file_path = temp_file.name\n try:\n s3_client.download_fileobj(self.bucket_name, self.s3_file_key, temp_file)\n except Exception as e:\n # Clean up temp file on failure\n with contextlib.suppress(OSError):\n Path(temp_file_path).unlink()\n msg = f\"Failed to download file from S3: {e}\"\n raise RuntimeError(msg) from e\n\n # Create BaseFile object\n from lfx.schema.data import Data\n\n temp_path = Path(temp_file_path)\n data_obj = Data(data={self.SERVER_FILE_PATH_FIELDNAME: str(temp_path)})\n return [BaseFileComponent.BaseFile(data_obj, temp_path, delete_after_processing=True)]\n\n def _read_from_google_drive(self) -> list[BaseFileComponent.BaseFile]:\n \"\"\"Read file from Google Drive.\"\"\"\n import tempfile\n\n from googleapiclient.http import MediaIoBaseDownload\n\n from lfx.base.data.cloud_storage_utils import create_google_drive_service\n\n # Validate Google Drive credentials\n if not getattr(self, \"service_account_key\", None):\n msg = \"GCP Credentials Secret Key is required for Google Drive storage\"\n raise ValueError(msg)\n if not getattr(self, \"file_id\", None):\n msg = \"Google Drive File ID is required\"\n raise ValueError(msg)\n\n # Create Google Drive service with read-only scope\n drive_service = create_google_drive_service(\n self.service_account_key, scopes=[\"https://www.googleapis.com/auth/drive.readonly\"]\n )\n\n # Get file metadata to determine file name and extension\n try:\n file_metadata = drive_service.files().get(fileId=self.file_id, fields=\"name,mimeType\").execute()\n file_name = file_metadata.get(\"name\", \"download\")\n except Exception as e:\n msg = (\n f\"Unable to access file with ID '{self.file_id}'. \"\n f\"Error: {e!s}. 
\"\n \"Please ensure: 1) The file ID is correct, 2) The file exists, \"\n \"3) The service account has been granted access to this file.\"\n )\n raise ValueError(msg) from e\n\n # Download file to temp location\n file_extension = Path(file_name).suffix or \"\"\n with tempfile.NamedTemporaryFile(mode=\"wb\", suffix=file_extension, delete=False) as temp_file:\n temp_file_path = temp_file.name\n try:\n request = drive_service.files().get_media(fileId=self.file_id)\n downloader = MediaIoBaseDownload(temp_file, request)\n done = False\n while not done:\n _status, done = downloader.next_chunk()\n except Exception as e:\n # Clean up temp file on failure\n with contextlib.suppress(OSError):\n Path(temp_file_path).unlink()\n msg = f\"Failed to download file from Google Drive: {e}\"\n raise RuntimeError(msg) from e\n\n # Create BaseFile object\n from lfx.schema.data import Data\n\n temp_path = Path(temp_file_path)\n data_obj = Data(data={self.SERVER_FILE_PATH_FIELDNAME: str(temp_path)})\n return [BaseFileComponent.BaseFile(data_obj, temp_path, delete_after_processing=True)]\n\n def _is_docling_compatible(self, file_path: str) -> bool:\n \"\"\"Lightweight extension gate for Docling-compatible types.\"\"\"\n docling_exts = (\n \".adoc\",\n \".asciidoc\",\n \".asc\",\n \".bmp\",\n \".csv\",\n \".dotx\",\n \".dotm\",\n \".docm\",\n \".docx\",\n \".htm\",\n \".html\",\n \".jpg\",\n \".jpeg\",\n \".json\",\n \".md\",\n \".pdf\",\n \".png\",\n \".potx\",\n \".ppsx\",\n \".pptm\",\n \".potm\",\n \".ppsm\",\n \".pptx\",\n \".tiff\",\n \".txt\",\n \".xls\",\n \".xlsx\",\n \".xhtml\",\n \".xml\",\n \".webp\",\n )\n return file_path.lower().endswith(docling_exts)\n\n async def _get_local_file_for_docling(self, file_path: str) -> tuple[str, bool]:\n \"\"\"Get a local file path for Docling processing, downloading from S3 if needed.\n\n Args:\n file_path: Either a local path or S3 key (format \"flow_id/filename\")\n\n Returns:\n tuple[str, bool]: (local_path, should_delete) where should_delete indicates\n if this is a temporary file that should be cleaned up\n \"\"\"\n settings = get_settings_service().settings\n if settings.storage_type == \"local\":\n return file_path, False\n\n # S3 storage - download to temp file\n parsed = parse_storage_path(file_path)\n if not parsed:\n msg = f\"Invalid S3 path format: {file_path}. Expected 'flow_id/filename'\"\n raise ValueError(msg)\n\n storage_service = get_storage_service()\n flow_id, filename = parsed\n\n # Get file content from S3\n content = await storage_service.get_file(flow_id, filename)\n\n suffix = Path(filename).suffix\n with NamedTemporaryFile(mode=\"wb\", suffix=suffix, delete=False) as tmp_file:\n tmp_file.write(content)\n temp_path = tmp_file.name\n\n return temp_path, True\n\n def _process_docling_in_subprocess(self, file_path: str) -> Data | None:\n \"\"\"Run Docling in a separate OS process and map the result to a Data object.\n\n We avoid multiprocessing pickling by launching `python -c \"