diff --git a/data_juicer/ops/mapper/__init__.py b/data_juicer/ops/mapper/__init__.py index 08cbe84a35..82d7d29c41 100644 --- a/data_juicer/ops/mapper/__init__.py +++ b/data_juicer/ops/mapper/__init__.py @@ -74,6 +74,8 @@ RemoveWordsWithIncorrectSubstringsMapper, ) from .replace_content_mapper import ReplaceContentMapper +from .s3_download_file_mapper import S3DownloadFileMapper +from .s3_upload_file_mapper import S3UploadFileMapper from .sdxl_prompt2prompt_mapper import SDXLPrompt2PromptMapper from .sentence_augmentation_mapper import SentenceAugmentationMapper from .sentence_split_mapper import SentenceSplitMapper @@ -173,6 +175,8 @@ "RemoveTableTextMapper", "RemoveWordsWithIncorrectSubstringsMapper", "ReplaceContentMapper", + "S3DownloadFileMapper", + "S3UploadFileMapper", "SDXLPrompt2PromptMapper", "SentenceAugmentationMapper", "SentenceSplitMapper", diff --git a/data_juicer/ops/mapper/s3_download_file_mapper.py b/data_juicer/ops/mapper/s3_download_file_mapper.py new file mode 100644 index 0000000000..994af5a165 --- /dev/null +++ b/data_juicer/ops/mapper/s3_download_file_mapper.py @@ -0,0 +1,410 @@ +import asyncio +import copy +import os +import os.path as osp +from typing import List, Union + +import boto3 +from botocore.exceptions import ClientError +from loguru import logger + +from data_juicer.ops.base_op import OPERATORS, Mapper +from data_juicer.utils.s3_utils import get_aws_credentials + +OP_NAME = "s3_download_file_mapper" + + +@OPERATORS.register_module(OP_NAME) +class S3DownloadFileMapper(Mapper): + """Mapper to download files from S3 to local files or load them into memory. + + This operator downloads files from S3 URLs (s3://...) or handles local files. It supports: + - Downloading multiple files concurrently + - Saving files to a specified directory or loading content into memory + - Resume download functionality + - S3 authentication with access keys + - Custom S3 endpoints (for S3-compatible services like MinIO) + + The operator processes nested lists of URLs/paths, maintaining the original structure + in the output.""" + + _batched_op = True + + def __init__( + self, + download_field: str = None, + save_dir: str = None, + save_field: str = None, + resume_download: bool = False, + timeout: int = 30, + max_concurrent: int = 10, + # S3 credentials + aws_access_key_id: str = None, + aws_secret_access_key: str = None, + aws_session_token: str = None, + aws_region: str = None, + endpoint_url: str = None, + *args, + **kwargs, + ): + """ + Initialization method. + + :param download_field: The field name to get the URL/path to download. + :param save_dir: The directory to save downloaded files. + :param save_field: The field name to save the downloaded file content. + :param resume_download: Whether to resume download. If True, skip the sample if it exists. + :param timeout: (Deprecated) Kept for backward compatibility, not used for S3 downloads. + :param max_concurrent: Maximum concurrent downloads. + :param aws_access_key_id: AWS access key ID for S3. + :param aws_secret_access_key: AWS secret access key for S3. + :param aws_session_token: AWS session token for S3 (optional). + :param aws_region: AWS region for S3. + :param endpoint_url: Custom S3 endpoint URL (for S3-compatible services). 
+ :param args: extra args + :param kwargs: extra args + """ + super().__init__(*args, **kwargs) + self._init_parameters = self.remove_extra_parameters(locals()) + + self.download_field = download_field + self.save_dir = save_dir + self.save_field = save_field + self.resume_download = resume_download + + if not (self.save_dir or self.save_field): + logger.warning( + "Both `save_dir` and `save_field` are not specified. Use the default `image_bytes` key to " + "save the downloaded contents." + ) + self.save_field = self.image_bytes_key + + if self.save_dir: + os.makedirs(self.save_dir, exist_ok=True) + + self.timeout = timeout + self.max_concurrent = max_concurrent + + # Prepare config dict for get_aws_credentials + ds_config = {} + if aws_access_key_id: + ds_config["aws_access_key_id"] = aws_access_key_id + if aws_secret_access_key: + ds_config["aws_secret_access_key"] = aws_secret_access_key + if aws_session_token: + ds_config["aws_session_token"] = aws_session_token + if aws_region: + ds_config["aws_region"] = aws_region + if endpoint_url: + ds_config["endpoint_url"] = endpoint_url + + # Get credentials with priority: environment variables > operator parameters + ( + resolved_access_key_id, + resolved_secret_access_key, + resolved_session_token, + resolved_region, + ) = get_aws_credentials(ds_config) + + # Store S3 configuration (don't create client here to avoid serialization issues) + self.s3_config = None + self._s3_client = None + if resolved_access_key_id and resolved_secret_access_key: + self.s3_config = { + "aws_access_key_id": resolved_access_key_id, + "aws_secret_access_key": resolved_secret_access_key, + } + if resolved_session_token: + self.s3_config["aws_session_token"] = resolved_session_token + if resolved_region: + self.s3_config["region_name"] = resolved_region + if endpoint_url: + self.s3_config["endpoint_url"] = endpoint_url + logger.info(f"S3 configuration stored with endpoint: {endpoint_url or 'default'}") + else: + logger.info("No S3 credentials provided. S3 URLs will not be supported.") + + @property + def s3_client(self): + """Lazy initialization of S3 client to avoid serialization issues with Ray.""" + if self._s3_client is None and self.s3_config is not None: + self._s3_client = boto3.client("s3", **self.s3_config) + logger.debug("S3 client initialized (lazy)") + return self._s3_client + + def _is_s3_url(self, url: str) -> bool: + """Check if the URL is an S3 URL.""" + return url.startswith("s3://") + + def _parse_s3_url(self, s3_url: str): + """Parse S3 URL into bucket and key. + + Example: s3://bucket-name/path/to/file.mp4 -> ('bucket-name', 'path/to/file.mp4') + """ + if not s3_url.startswith("s3://"): + raise ValueError(f"Invalid S3 URL: {s3_url}") + + parts = s3_url[5:].split("/", 1) + bucket = parts[0] + key = parts[1] if len(parts) > 1 else "" + + return bucket, key + + def _download_from_s3(self, s3_url: str, save_path: str = None, return_content: bool = False): + """Download a file from S3. + + :param s3_url: S3 URL (s3://bucket/key) + :param save_path: Local path to save the file + :param return_content: Whether to return file content as bytes + :return: (status, response, content, save_path) + """ + if not self.s3_client: + raise ValueError("S3 client not initialized. 
Please provide AWS credentials.") + + try: + bucket, key = self._parse_s3_url(s3_url) + + if save_path: + # Ensure parent directory exists + save_dir = os.path.dirname(save_path) + if save_dir: + os.makedirs(save_dir, exist_ok=True) + + # Download to file + self.s3_client.download_file(bucket, key, save_path) + logger.debug(f"Downloaded S3 file: {s3_url} -> {save_path}") + + # Read content if needed + content = None + if return_content: + with open(save_path, "rb") as f: + content = f.read() + + return "success", None, content, save_path + + elif return_content: + # Download to memory + response = self.s3_client.get_object(Bucket=bucket, Key=key) + content = response["Body"].read() + logger.debug(f"Downloaded S3 file to memory: {s3_url}") + + return "success", None, content, None + + else: + return "success", None, None, None + + except ClientError as e: + error_msg = f"S3 download failed: {e}" + logger.error(error_msg) + return "failed", error_msg, None, None + except Exception as e: + error_msg = f"S3 download error: {e}" + logger.error(error_msg) + return "failed", error_msg, None, None + + async def download_files_async(self, urls, return_contents, save_dir=None, **kwargs): + """Download files asynchronously from S3.""" + + async def _download_file( + semaphore: asyncio.Semaphore, + idx: int, + url: str, + save_dir=None, + return_content=False, + **kwargs, + ) -> dict: + async with semaphore: + try: + status, response, content, save_path = "success", None, None, None + + # Handle S3 URLs (synchronous operation in async context) + if self._is_s3_url(url): + if save_dir: + filename = os.path.basename(self._parse_s3_url(url)[1]) + save_path = osp.join(save_dir, filename) + + # Check if file exists and resume is enabled + if os.path.exists(save_path) and self.resume_download: + if return_content: + with open(save_path, "rb") as f: + content = f.read() + return idx, save_path, status, response, content + + # Download from S3 (run in executor to avoid blocking) + loop = asyncio.get_event_loop() + status, response, content, save_path = await loop.run_in_executor( + None, self._download_from_s3, url, save_path, return_content + ) + return idx, save_path, status, response, content + + # Check for HTTP/HTTPS URLs - not supported + if url.startswith("http://") or url.startswith("https://"): + raise ValueError( + f"HTTP/HTTPS URLs are not supported. This mapper only supports S3 URLs (s3://...) and local files. 
Got: {url}" + ) + + # Handle local files + if return_content: + with open(url, "rb") as f: + content = f.read() + if save_dir: + save_path = url + + return idx, save_path, status, response, content + + except Exception as e: + status = "failed" + response = str(e) + save_path = None + content = None + + return idx, save_path, status, response, content + + semaphore = asyncio.Semaphore(self.max_concurrent) + tasks = [ + _download_file(semaphore, idx, url, save_dir, return_contents[idx], **kwargs) + for idx, url in enumerate(urls) + ] + results = await asyncio.gather(*tasks) + results.sort(key=lambda x: x[0]) + + return results + + def _flat_urls(self, nested_urls): + """Flatten nested URLs while preserving structure information.""" + flat_urls = [] + structure_info = [] # save as original index, sub index + + for idx, urls in enumerate(nested_urls): + if isinstance(urls, list): + for sub_idx, url in enumerate(urls): + flat_urls.append(url) + structure_info.append((idx, sub_idx)) + else: + flat_urls.append(urls) + structure_info.append((idx, -1)) # -1 means single str element + + return flat_urls, structure_info + + def _create_path_struct(self, nested_urls, keep_failed_url=True) -> List[Union[str, List[str]]]: + """Create path structure for output.""" + if keep_failed_url: + reconstructed = copy.deepcopy(nested_urls) + else: + reconstructed = [] + for item in nested_urls: + if isinstance(item, list): + reconstructed.append([None] * len(item)) + else: + reconstructed.append(None) + + return reconstructed + + def _create_save_field_struct(self, nested_urls, save_field_contents=None) -> List[Union[bytes, List[bytes]]]: + """Create save field structure for output.""" + if save_field_contents is None: + save_field_contents = [] + for item in nested_urls: + if isinstance(item, list): + save_field_contents.append([None] * len(item)) + else: + save_field_contents.append(None) + else: + # check whether the save_field_contents format is correct and correct it automatically + for i, item in enumerate(nested_urls): + if isinstance(item, list): + if not save_field_contents[i] or len(save_field_contents[i]) != len(item): + save_field_contents[i] = [None] * len(item) + + return save_field_contents + + async def download_nested_urls( + self, nested_urls: List[Union[str, List[str]]], save_dir=None, save_field_contents=None + ): + """Download nested URLs with structure preservation.""" + flat_urls, structure_info = self._flat_urls(nested_urls) + + if save_field_contents is None: + # not save contents, set return_contents to False + return_contents = [False] * len(flat_urls) + else: + # if original content None, set bool value to True to get content else False to skip reload it + return_contents = [] + for item in save_field_contents: + if isinstance(item, list): + return_contents.extend([not c for c in item]) + else: + return_contents.append(not item) + + download_results = await self.download_files_async( + flat_urls, + return_contents, + save_dir, + ) + + if self.save_dir: + reconstructed_path = self._create_path_struct(nested_urls) + else: + reconstructed_path = None + + failed_info = "" + for i, (idx, save_path, status, response, content) in enumerate(download_results): + orig_idx, sub_idx = structure_info[i] + if status != "success": + save_path = flat_urls[i] + failed_info += "\n" + str(response) + + if save_field_contents is not None: + if return_contents[i]: + if sub_idx == -1: + save_field_contents[orig_idx] = content + else: + save_field_contents[orig_idx][sub_idx] = content + + if self.save_dir: + 
if sub_idx == -1: + reconstructed_path[orig_idx] = save_path + else: + reconstructed_path[orig_idx][sub_idx] = save_path + + return save_field_contents, reconstructed_path, failed_info + + def process_batched(self, samples): + """Process a batch of samples.""" + if self.download_field not in samples or not samples[self.download_field]: + return samples + + batch_nested_urls = samples[self.download_field] + + if self.save_field: + if not self.resume_download: + if self.save_field in samples: + raise ValueError( + f"{self.save_field} is already in samples. " + f"If you want to resume download, please set `resume_download=True`" + ) + save_field_contents = self._create_save_field_struct(batch_nested_urls) + else: + if self.save_field not in samples: + save_field_contents = self._create_save_field_struct(batch_nested_urls) + else: + save_field_contents = self._create_save_field_struct(batch_nested_urls, samples[self.save_field]) + else: + save_field_contents = None + + save_field_contents, reconstructed_path, failed_info = asyncio.run( + self.download_nested_urls( + batch_nested_urls, save_dir=self.save_dir, save_field_contents=save_field_contents + ) + ) + + if self.save_dir: + samples[self.download_field] = reconstructed_path + + if self.save_field: + samples[self.save_field] = save_field_contents + + if len(failed_info): + logger.error(f"Failed files:\n{failed_info}") + + return samples diff --git a/data_juicer/ops/mapper/s3_upload_file_mapper.py b/data_juicer/ops/mapper/s3_upload_file_mapper.py new file mode 100644 index 0000000000..7942b6e78d --- /dev/null +++ b/data_juicer/ops/mapper/s3_upload_file_mapper.py @@ -0,0 +1,320 @@ +import asyncio +import os +from typing import List, Union + +import boto3 +from botocore.exceptions import ClientError +from loguru import logger + +from data_juicer.ops.base_op import OPERATORS, Mapper +from data_juicer.utils.s3_utils import get_aws_credentials + +OP_NAME = "s3_upload_file_mapper" + + +@OPERATORS.register_module(OP_NAME) +class S3UploadFileMapper(Mapper): + """Mapper to upload local files to S3 and update paths to S3 URLs. + + This operator uploads files from local paths to S3 storage. It supports: + - Uploading multiple files concurrently + - Updating file paths in the dataset to S3 URLs + - Optional deletion of local files after successful upload + - Custom S3 endpoints (for S3-compatible services like MinIO) + - Skipping already uploaded files (based on S3 key) + + The operator processes nested lists of paths, maintaining the original structure + in the output.""" + + _batched_op = True + + def __init__( + self, + upload_field: str = None, + s3_bucket: str = None, + s3_prefix: str = "", + # S3 credentials + aws_access_key_id: str = None, + aws_secret_access_key: str = None, + aws_session_token: str = None, + aws_region: str = None, + endpoint_url: str = None, + # Upload options + remove_local: bool = False, + skip_existing: bool = True, + max_concurrent: int = 10, + *args, + **kwargs, + ): + """ + Initialization method. + + :param upload_field: The field name containing file paths to upload. + :param s3_bucket: S3 bucket name to upload files to. + :param s3_prefix: Prefix (folder path) in S3 bucket. E.g., 'videos/' or 'data/videos/'. + :param aws_access_key_id: AWS access key ID for S3. + :param aws_secret_access_key: AWS secret access key for S3. + :param aws_session_token: AWS session token for S3 (optional). + :param aws_region: AWS region for S3. + :param endpoint_url: Custom S3 endpoint URL (for S3-compatible services). 
+ :param remove_local: Whether to delete local files after successful upload. + :param skip_existing: Whether to skip uploading if file already exists in S3. + :param max_concurrent: Maximum concurrent uploads. + :param args: extra args + :param kwargs: extra args + """ + super().__init__(*args, **kwargs) + self._init_parameters = self.remove_extra_parameters(locals()) + + self.upload_field = upload_field + self.s3_bucket = s3_bucket + self.s3_prefix = s3_prefix.rstrip("/") + "/" if s3_prefix and not s3_prefix.endswith("/") else s3_prefix or "" + self.remove_local = remove_local + self.skip_existing = skip_existing + self.max_concurrent = max_concurrent + + if not self.s3_bucket: + raise ValueError("s3_bucket must be specified") + + # Prepare config dict for get_aws_credentials + ds_config = {} + if aws_access_key_id: + ds_config["aws_access_key_id"] = aws_access_key_id + if aws_secret_access_key: + ds_config["aws_secret_access_key"] = aws_secret_access_key + if aws_session_token: + ds_config["aws_session_token"] = aws_session_token + if aws_region: + ds_config["aws_region"] = aws_region + if endpoint_url: + ds_config["endpoint_url"] = endpoint_url + + # Get credentials with priority: environment variables > operator parameters + ( + resolved_access_key_id, + resolved_secret_access_key, + resolved_session_token, + resolved_region, + ) = get_aws_credentials(ds_config) + + if not (resolved_access_key_id and resolved_secret_access_key): + raise ValueError( + "AWS credentials (aws_access_key_id and aws_secret_access_key) must be provided " + "either through operator parameters or environment variables (AWS_ACCESS_KEY_ID, AWS_SECRET_ACCESS_KEY)" + ) + + # Store S3 configuration (don't create client here to avoid serialization issues) + self.s3_config = { + "aws_access_key_id": resolved_access_key_id, + "aws_secret_access_key": resolved_secret_access_key, + } + if resolved_session_token: + self.s3_config["aws_session_token"] = resolved_session_token + if resolved_region: + self.s3_config["region_name"] = resolved_region + if endpoint_url: + self.s3_config["endpoint_url"] = endpoint_url + + self._s3_client = None + logger.info( + f"S3 upload mapper initialized: bucket={s3_bucket}, prefix={self.s3_prefix}, endpoint={endpoint_url or 'default'}" + ) + + @property + def s3_client(self): + """Lazy initialization of S3 client to avoid serialization issues with Ray.""" + if self._s3_client is None: + self._s3_client = boto3.client("s3", **self.s3_config) + logger.debug("S3 client initialized (lazy)") + return self._s3_client + + def _is_s3_url(self, path: str) -> bool: + """Check if the path is already an S3 URL.""" + return isinstance(path, str) and path.startswith("s3://") + + def _check_s3_exists(self, s3_key: str) -> bool: + """Check if a file exists in S3.""" + try: + self.s3_client.head_object(Bucket=self.s3_bucket, Key=s3_key) + return True + except ClientError: + return False + + def _upload_to_s3(self, local_path: str) -> tuple: + """Upload a single file to S3. 
+ + :param local_path: Local file path to upload + :return: (status, s3_url, error_message) + """ + # Already an S3 URL, skip + if self._is_s3_url(local_path): + logger.debug(f"Path is already S3 URL: {local_path}") + return "skipped", local_path, None + + # Check if file exists locally + if not os.path.exists(local_path): + error_msg = f"Local file not found: {local_path}" + logger.warning(error_msg) + return "failed", local_path, error_msg + + try: + # Construct S3 key + filename = os.path.basename(local_path) + s3_key = self.s3_prefix + filename + s3_url = f"s3://{self.s3_bucket}/{s3_key}" + + # Check if file already exists in S3 + if self.skip_existing and self._check_s3_exists(s3_key): + logger.debug(f"File already exists in S3, skipping: {s3_url}") + + # Delete local file if configured + if self.remove_local: + try: + os.remove(local_path) + logger.debug(f"Removed local file: {local_path}") + except Exception as e: + logger.warning(f"Failed to remove local file {local_path}: {e}") + + return "exists", s3_url, None + + # Upload to S3 + self.s3_client.upload_file(local_path, self.s3_bucket, s3_key) + logger.info(f"Uploaded: {local_path} -> {s3_url}") + + # Delete local file if configured + if self.remove_local: + try: + os.remove(local_path) + logger.debug(f"Removed local file: {local_path}") + except Exception as e: + logger.warning(f"Failed to remove local file {local_path}: {e}") + + return "success", s3_url, None + + except ClientError as e: + error_msg = f"S3 upload failed: {e}" + logger.error(error_msg) + return "failed", local_path, error_msg + except Exception as e: + error_msg = f"Upload error: {e}" + logger.error(error_msg) + return "failed", local_path, error_msg + + async def upload_files_async(self, paths: List[str]) -> List[tuple]: + """Upload multiple files asynchronously. 
+ + :param paths: List of local file paths + :return: List of (idx, status, s3_url, error_message) tuples + """ + + async def _upload_file(semaphore: asyncio.Semaphore, idx: int, path: str) -> tuple: + async with semaphore: + try: + # Upload to S3 (run in executor to avoid blocking) + loop = asyncio.get_event_loop() + status, s3_url, error = await loop.run_in_executor(None, self._upload_to_s3, path) + return idx, status, s3_url, error + except Exception as e: + error_msg = f"Upload error: {e}" + logger.error(error_msg) + return idx, "failed", path, error_msg + + semaphore = asyncio.Semaphore(self.max_concurrent) + tasks = [_upload_file(semaphore, idx, path) for idx, path in enumerate(paths)] + results = await asyncio.gather(*tasks) + results = list(results) + results.sort(key=lambda x: x[0]) + + return results + + def _flat_paths(self, nested_paths): + """Flatten nested paths while preserving structure information.""" + flat_paths = [] + structure_info = [] # (original_index, sub_index) + + for idx, paths in enumerate(nested_paths): + if isinstance(paths, list): + for sub_idx, path in enumerate(paths): + flat_paths.append(path) + structure_info.append((idx, sub_idx)) + else: + flat_paths.append(paths) + structure_info.append((idx, -1)) # -1 means single element + + return flat_paths, structure_info + + def _create_path_struct(self, nested_paths) -> List: + """Create path structure for output.""" + reconstructed = [] + for item in nested_paths: + if isinstance(item, list): + reconstructed.append([None] * len(item)) + else: + reconstructed.append(None) + return reconstructed + + async def upload_nested_paths(self, nested_paths: List[Union[str, List[str]]]): + """Upload nested paths with structure preservation. + + :param nested_paths: Nested list of file paths + :return: (reconstructed_paths, failed_info) + """ + flat_paths, structure_info = self._flat_paths(nested_paths) + + # Upload all files asynchronously + upload_results = await self.upload_files_async(flat_paths) + + # Reconstruct nested structure + reconstructed_paths = self._create_path_struct(nested_paths) + + failed_info = "" + success_count = 0 + failed_count = 0 + skipped_count = 0 + exists_count = 0 + + for i, (idx, status, s3_url, error) in enumerate(upload_results): + orig_idx, sub_idx = structure_info[i] + + if status == "success": + success_count += 1 + elif status == "failed": + failed_count += 1 + if error: + failed_info += f"\n{flat_paths[i]}: {error}" + elif status == "skipped": + skipped_count += 1 + elif status == "exists": + exists_count += 1 + + # Update path in reconstructed structure + if sub_idx == -1: + reconstructed_paths[orig_idx] = s3_url + else: + reconstructed_paths[orig_idx][sub_idx] = s3_url + + # Log summary + logger.info( + f"Upload summary: {success_count} uploaded, {exists_count} already exists, " + f"{skipped_count} skipped, {failed_count} failed" + ) + + return reconstructed_paths, failed_info + + def process_batched(self, samples): + """Process a batch of samples.""" + if self.upload_field not in samples or not samples[self.upload_field]: + return samples + + batch_nested_paths = samples[self.upload_field] + + # Upload files and get S3 URLs + reconstructed_paths, failed_info = asyncio.run(self.upload_nested_paths(batch_nested_paths)) + + # Update the field with S3 URLs + samples[self.upload_field] = reconstructed_paths + + if len(failed_info): + logger.error(f"Failed uploads:\n{failed_info}") + + return samples diff --git a/demos/process_video_on_ray/configs/s3_video_processing_config.yaml 
b/demos/process_video_on_ray/configs/s3_video_processing_config.yaml
new file mode 100644
index 0000000000..8a20cbd085
--- /dev/null
+++ b/demos/process_video_on_ray/configs/s3_video_processing_config.yaml
@@ -0,0 +1,110 @@
+# S3 Video Processing Configuration Example
+# Complete workflow: Download from S3 -> Process -> Upload back to S3
+
+# Global parameters
+project_name: 's3-video-processing-demo'
+executor_type: 'ray'
+ray_address: 'auto'
+
+# Working directory
+work_dir: './outputs/s3_demo'
+
+# Dataset configuration
+dataset:
+  configs:
+    - type: remote
+      source: s3
+      path: s3://your-bucket/dj/dataset/process_video_on_ray/demo-dataset-s3.jsonl
+      # aws_access_key_id: YOUR_KEY  # Recommended: set via environment variables (e.g., AWS_ACCESS_KEY_ID)
+      # aws_secret_access_key: YOUR_SECRET
+      aws_region: us-east-1
+      endpoint_url: null  # Optional, for S3-compatible object storage
+
+# Export configuration - metadata export to S3
+export_path: 's3://your-bucket/dj/dataset/demo-processed'
+export_type: 'jsonl'
+
+# S3 credentials (for metadata export)
+export_aws_credentials:
+  aws_access_key_id: 'YOUR_AWS_ACCESS_KEY_ID'
+  aws_secret_access_key: 'YOUR_AWS_SECRET_ACCESS_KEY'
+  aws_region: 'us-east-1'
+  endpoint_url: 'http://your-s3-endpoint:port'
+
+# Processing pipeline
+process:
+  # ============================================
+  # Step 1: Download videos from S3 to local
+  # ============================================
+  - s3_download_file_mapper:
+      download_field: 'videos'                # Field containing S3 URLs
+      save_dir: '/tmp/dj_downloaded_videos'   # Local download directory
+      resume_download: true                   # Skip already downloaded files
+      timeout: 300                            # (Deprecated) kept for compatibility; not used for S3 downloads
+      max_concurrent: 5                       # Maximum concurrent downloads
+      # S3 credentials
+      aws_access_key_id: 'YOUR_AWS_ACCESS_KEY_ID'
+      aws_secret_access_key: 'YOUR_AWS_SECRET_ACCESS_KEY'
+      aws_region: 'us-east-1'
+      endpoint_url: 'http://your-s3-endpoint:port'
+
+  # ============================================
+  # Step 2: Video filtering - duration
+  # ============================================
+  - video_duration_filter:
+      min_duration: 20                        # Minimum duration (seconds)
+      max_duration: 100                       # Maximum duration (seconds)
+
+  # ============================================
+  # Step 3: Video filtering - resolution
+  # ============================================
+  - video_resolution_filter:
+      min_width: 200                          # Minimum width (pixels)
+      max_width: 4096                         # Maximum width (pixels)
+      min_height: 200                         # Minimum height (pixels)
+      max_height: 4096                        # Maximum height (pixels)
+      any_or_all: any                         # 'any' means satisfying any condition is sufficient
+
+  # ============================================
+  # Step 4: Split videos by duration
+  # ============================================
+  - video_split_by_duration_mapper:
+      split_duration: 10                      # Duration of each segment (seconds)
+      min_last_split_duration: 0              # Minimum duration of the last segment
+      keep_original_sample: false             # Do not keep original sample
+      # Save location for processed videos
+      save_dir: '/tmp/dj_processed_videos'
+
+  # ============================================
+  # Step 5: Adjust video aspect ratio
+  # ============================================
+  - video_resize_aspect_ratio_mapper:
+      min_ratio: 1.0                          # Minimum aspect ratio
+      max_ratio: 1.1                          # Maximum aspect ratio
+      strategy: increase                      # Adjustment strategy: increase size
+
+  # ============================================
+  # Step 6: Upload processed videos to S3
+  # ============================================
+  - s3_upload_file_mapper:
+      upload_field: 'videos'                  # Field containing local file paths
+      s3_bucket: 'your-bucket'                # S3 bucket name
+      s3_prefix: 'dj/processed_videos/'       # S3 object prefix (folder)
+      remove_local: true                      # Delete local files after upload (save space)
+      skip_existing: true                     # Skip files already existing in S3
+      max_concurrent: 10                      # Maximum concurrent uploads
+      # S3 credentials
+      aws_access_key_id: 'YOUR_AWS_ACCESS_KEY_ID'
+      aws_secret_access_key: 'YOUR_AWS_SECRET_ACCESS_KEY'
+      aws_region: 'us-east-1'
+      endpoint_url: 'http://your-s3-endpoint:port'
+
+# ============================================
+# Optional: Other configurations
+# ============================================
+# Keep statistics information
+keep_stats_in_res_ds: true
+keep_hashes_in_res_ds: false
+
+# Operator fusion (optimize performance)
+op_fusion: false
diff --git a/demos/process_video_on_ray/data/demo-dataset-s3.jsonl b/demos/process_video_on_ray/data/demo-dataset-s3.jsonl
new file mode 100644
index 0000000000..e53bb36c56
--- /dev/null
+++ b/demos/process_video_on_ray/data/demo-dataset-s3.jsonl
@@ -0,0 +1,3 @@
+{"videos": ["s3://your-bucket/videos/video1.mp4"], "text": "<__dj__video> A sample video"}
+{"videos": ["s3://your-bucket/videos/video2.mp4"], "text": "<__dj__video> Another sample video"}
+{"videos": ["s3://your-bucket/videos/video3.mp4"], "text": "<__dj__video> Third sample video"}
diff --git a/docs/Operators.md b/docs/Operators.md
index 5772854cc4..c500a15be7 100644
--- a/docs/Operators.md
+++ b/docs/Operators.md
@@ -247,6 +247,8 @@ All the specific operators are listed below, each featured with several capabili
 | remove_table_text_mapper | 🔤Text 💻CPU 🟢Stable | Mapper to remove table texts from text samples. 映射器从文本样本中删除表文本。 | [info](operators/mapper/remove_table_text_mapper.md) | - |
 | remove_words_with_incorrect_substrings_mapper | 🔤Text 💻CPU 🟢Stable | Mapper to remove words containing specified incorrect substrings. 映射程序删除包含指定的不正确子字符串的单词。 | [info](operators/mapper/remove_words_with_incorrect_substrings_mapper.md) | - |
 | replace_content_mapper | 🔤Text 💻CPU 🟢Stable | Replaces content in the text that matches a specific regular expression pattern with a designated replacement string. 用指定的替换字符串替换与特定正则表达式模式匹配的文本中的内容。 | [info](operators/mapper/replace_content_mapper.md) | - |
+| s3_download_file_mapper | 💻CPU 🔴Alpha | Mapper to download files from S3 to local files or load them into memory. Mapper将文件从S3下载到本地文件或将其加载到内存中。 | - | - |
+| s3_upload_file_mapper | 💻CPU 🔴Alpha | Mapper to upload local files to S3 and update paths to S3 URLs. Mapper将本地文件上传到S3并更新S3 url的路径。 | - | - |
 | sdxl_prompt2prompt_mapper | 🔤Text 🚀GPU 🟢Stable | Generates pairs of similar images using the SDXL model. 使用SDXL模型生成成对的相似图像。 | [info](operators/mapper/sdxl_prompt2prompt_mapper.md) | - |
 | sentence_augmentation_mapper | 🔤Text 🚀GPU 🧩HF 🟢Stable | Augments sentences by generating enhanced versions using a Hugging Face model. 通过使用拥抱面部模型生成增强版本来增强句子。 | [info](operators/mapper/sentence_augmentation_mapper.md) | - |
 | sentence_split_mapper | 🔤Text 💻CPU 🟢Stable | Splits text samples into individual sentences based on the specified language. 根据指定的语言将文本样本拆分为单个句子。 | [info](operators/mapper/sentence_split_mapper.md) | - |
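
The demo config and dataset above exercise the full round trip (S3 -> local processing -> S3). For a quick sanity check outside a YAML pipeline, the two new mappers can also be driven directly on a column-oriented batch. The sketch below is illustrative only: the bucket name, prefix, endpoint, and local directories are placeholders, and it assumes credentials are supplied through the standard AWS environment variables (AWS_ACCESS_KEY_ID / AWS_SECRET_ACCESS_KEY), which the operators resolve via `get_aws_credentials`.

```python
# Minimal round-trip sketch using the new mappers directly (placeholders throughout).
from data_juicer.ops.mapper import S3DownloadFileMapper, S3UploadFileMapper

# A batched sample: each field maps to a list, one entry per sample.
samples = {
    "videos": [["s3://your-bucket/videos/video1.mp4"]],
    "text": ["<__dj__video> A sample video"],
}

downloader = S3DownloadFileMapper(
    download_field="videos",
    save_dir="/tmp/dj_downloaded_videos",          # downloaded paths replace the S3 URLs
    resume_download=True,
    max_concurrent=5,
    endpoint_url="http://your-s3-endpoint:port",   # only needed for S3-compatible stores
)
samples = downloader.process_batched(samples)      # samples["videos"] now holds local paths

# ... run video filters/mappers on the local files here ...

uploader = S3UploadFileMapper(
    upload_field="videos",
    s3_bucket="your-bucket",
    s3_prefix="dj/processed_videos/",
    remove_local=True,     # delete local copies after a successful upload
    skip_existing=True,    # reuse objects already present under the prefix
)
samples = uploader.process_batched(samples)        # samples["videos"] now holds s3:// URLs
```

In a full pipeline the same flow is expressed by the YAML config above and launched with the usual Data-Juicer entrypoint (e.g. `dj-process --config .../s3_video_processing_config.yaml`).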