Skip to content

Conversation

@kyo-tom
Copy link
Contributor

@kyo-tom kyo-tom commented Dec 4, 2025

feat: add S3 download and upload mappers for distributed processing

Add two new mappers to support S3-based workflows in distributed Ray environments:

  • s3_download_file_mapper: Download files from S3 to local storage
  • s3_upload_file_mapper: Upload local files to S3 and update dataset paths

Key features:

  • Full AWS S3 credential support (access key, secret key, session token, region, endpoint)
  • S3-compatible storage support (MinIO, Ceph, etc.)
  • Concurrent download/upload with configurable limits
  • Lazy initialization of boto3 clients for Ray serialization compatibility
  • Resume/skip functionality to avoid redundant transfers

This addresses the distributed file management challenge where processed
files are scattered across Ray worker nodes without centralized tracking.

Related Issues
#838

@gemini-code-assist
Copy link
Contributor

Summary of Changes

Hello @kyo-tom, I'm Gemini Code Assist1! I'm currently reviewing this pull request and will post my feedback shortly. In the meantime, here's a summary to help you and other reviewers quickly get up to speed!

This pull request significantly enhances the data processing capabilities by integrating robust S3 download and upload functionalities. It introduces two specialized mappers designed for distributed environments, allowing users to efficiently manage data transfer between S3 and local processing nodes. This enables complex workflows, such as processing large media files, directly from and to cloud storage, streamlining distributed data pipelines.

Highlights

  • New S3 Download Mapper: Introduced S3DownloadFileMapper to download files from S3, HTTP, or HTTPS to local storage or memory. It supports concurrent downloads, resume functionality, and flexible S3 credential/endpoint configuration.
  • New S3 Upload Mapper: Added S3UploadFileMapper for uploading local files to S3 buckets. This mapper updates dataset paths to S3 URLs and includes features like concurrent uploads, optional deletion of local files after upload, and skipping files that already exist in S3.
  • Integration and Demo: The new S3 mappers are integrated into the system, and a comprehensive demo configuration (s3_video_processing_config.yaml) along with a sample dataset (demo-dataset-s3.jsonl) has been added to showcase an end-to-end S3 video processing workflow.
  • Distributed Processing Support: Both mappers are designed for distributed Ray environments, featuring lazy initialization of boto3 clients to prevent serialization issues and robust error handling for S3 operations.
Using Gemini Code Assist

The full guide for Gemini Code Assist can be found on our documentation page, here are some quick tips.

Invoking Gemini

You can request assistance from Gemini at any point by creating a comment using either /gemini <command> or @gemini-code-assist <command>. Below is a summary of the supported commands on the current page.

Feature Command Description
Code Review /gemini review Performs a code review for the current pull request in its current state.
Pull Request Summary /gemini summary Provides a summary of the current pull request in its current state.
Comment @gemini-code-assist Responds in comments when explicitly tagged, both in pull request comments and review comments.
Help /gemini help Displays a list of available commands.

Customization

To customize Gemini Code Assist for GitHub experience, repository maintainers can create a configuration file and/or provide a custom code review style guide (such as PEP-8 for Python) by creating and adding files to a .gemini/ folder in the base of the repository. Detailed instructions can be found here.

Limitations & Feedback

Gemini Code Assist may make mistakes. Please leave feedback on any instances where its feedback is incorrect or counter productive. You can react with 👍 and 👎 on @gemini-code-assist comments. If you're interested in giving your feedback about your experience with Gemini Code Assist for Github and other Google products, sign up here.

You can also get AI-powered code generation, chat, as well as code reviews directly in the IDE at no cost with the Gemini Code Assist IDE Extension.

Footnotes

  1. Review the Privacy Notices, Generative AI Prohibited Use Policy, Terms of Service, and learn how to configure Gemini Code Assist in GitHub here. Gemini can make mistakes, so double check it and use code with caution.

Copy link
Contributor

@gemini-code-assist gemini-code-assist bot left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Code Review

This pull request introduces two new mappers, S3DownloadFileMapper and S3UploadFileMapper, to facilitate file transfers with S3 in distributed environments. The overall implementation is solid, featuring concurrent operations and lazy client initialization for Ray compatibility. My review focuses on a critical bug fix in the download mapper, improving type hints for correctness, removing unused code, and enhancing security practices in the provided example configuration. With these adjustments, the new mappers will be more robust and reliable.

@HYLcool HYLcool requested a review from cyruszhang December 4, 2025 07:23
@kyo-tom kyo-tom force-pushed the download_and_upload_base_s3 branch 2 times, most recently from 92312f4 to 49958c3 Compare December 4, 2025 13:22
@kyo-tom kyo-tom force-pushed the download_and_upload_base_s3 branch from 49958c3 to 9ccf2bf Compare December 9, 2025 02:30
@kyo-tom
Copy link
Contributor Author

kyo-tom commented Dec 9, 2025

Formatted code

Copy link
Collaborator

@cyruszhang cyruszhang left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Overall agree with the problem we are trying to tackle here: s3 url in individual field. couple of comments to clean it up. Can you also share a successful run with the code?

@kyo-tom
Copy link
Contributor Author

kyo-tom commented Dec 10, 2025

Thanks to @cyruszhang for reviewing the code. I have submitted a new commit to fix the issue you mentioned.
A sample example can be found in demos/process_video_on_ray/configs/s3_video_processing_config.yaml

This commit refactors both s3_download_file_mapper and s3_upload_file_mapper
to use a consistent asyncio implementation pattern and support AWS credentials
from environment variables with proper priority.

Changes to data_juicer/ops/mapper/s3_download_file_mapper.py:

  1. Integrate environment-based credential resolution
  2. Refactor to true async implementation
  3. Convert download_nested_urls() to async

Changes to data_juicer/ops/mapper/s3_upload_file_mapper.py:

  1. Integrate environment-based credential resolution
  2. Migrate from ThreadPoolExecutor to asyncio
  3. Convert upload_nested_paths() to async

Once the PR is approved, I will squash all commits.

@cyruszhang
Copy link
Collaborator

LGTM

@cyruszhang
Copy link
Collaborator

ready to merge; please resolve comments, merge master

@kyo-tom kyo-tom force-pushed the download_and_upload_base_s3 branch 2 times, most recently from 49ff42b to 3cf4c86 Compare December 20, 2025 02:36
@kyo-tom
Copy link
Contributor Author

kyo-tom commented Dec 20, 2025

It is ready to merge. @cyruszhang

Copy link
Collaborator

@Dludora Dludora left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

approve

Copy link
Collaborator

@Dludora Dludora left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

approve and run test

@HYLcool
Copy link
Collaborator

HYLcool commented Jan 5, 2026

Hi @kyo-tom

Thanks for your contribution! There are two issues from latest PRs or updates of third-party dependency and I have fixed them in this PR to the branch in your fork here. It's supposed to solve the test issues now in this PR. Please check and merge it before this PR is finished. Thanks again!

@HYLcool
Copy link
Collaborator

HYLcool commented Jan 6, 2026

Hi @kyo-tom

Thanks for your contribution! There are two issues from latest PRs or updates of third-party dependency and I have fixed them in this PR to the branch in your fork here. It's supposed to solve the test issues now in this PR. Please check and merge it before this PR is finished. Thanks again!

Hi, these modifications have been merged into the main branch in PR #876 . Please pull the latest code in the main branch and merge it into this PR.

@kyo-tom
Copy link
Contributor Author

kyo-tom commented Jan 7, 2026

Hi @HYLcool

Thanks for your help! I've already pulled the latest code from the main branch (including the fixes from #876) and force-pushed to this PR branch.

The commit f06c3bb now includes all the latest updates. The CI tests are currently running - once they pass, this PR should be ready to merge.

Please let me know if there's anything else I need to address. Thanks again for your assistance! 🙏

@kyo-tom
Copy link
Contributor Author

kyo-tom commented Jan 7, 2026

@HYLcool It looks like the unit tests ran correctly and finished, but the exit code is 1.
image

@HYLcool
Copy link
Collaborator

HYLcool commented Jan 7, 2026

@HYLcool It looks like the unit tests ran correctly and finished, but the exit code is 1. image

Recently the network issues are more often and it leads to failure of some API-based test cases. Besides, it seems some fixed bugs still exist in this PR.

But it's fine anyway. After the unit test, if there are no errors about the contents in this PR, we can merge it still. We will check the whole system later in new PRs.

@HYLcool
Copy link
Collaborator

HYLcool commented Jan 8, 2026

The issues occured in the unit test of this PR are resolved in other PRs (e.g. #824 ). Here we just ignore them and merge this PR.

@HYLcool HYLcool merged commit 36e6389 into datajuicer:main Jan 8, 2026
2 of 7 checks passed
@HYLcool HYLcool mentioned this pull request Jan 13, 2026
3 tasks
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

4 participants