feat: Support iceberg, hudi, delta, hdfs data source. #875
Conversation
Summary of Changes (Gemini Code Assist)
Hello @Dludora, I'm Gemini Code Assist! I'm currently reviewing this pull request and will post my feedback shortly. In the meantime, here's a summary to help you and other reviewers quickly get up to speed. This pull request significantly extends the data handling capabilities of the system by integrating support for several popular distributed data sources and data lake table formats, including Iceberg, Hudi, Delta Lake, and HDFS. It introduces dedicated loading strategies for both local and Ray-based executors for these new sources. Furthermore, it enhances the data export functionality to allow direct writing to HDFS and Iceberg tables. A key improvement is the refactoring of filesystem creation, centralizing the logic for configuring S3 and HDFS access, which makes the codebase cleaner and more maintainable. These changes collectively enable more flexible and robust data processing workflows, especially in distributed environments.
Code Review
This pull request adds support for several new data sources (Iceberg, Hudi, Delta, HDFS) and export targets (HDFS, Iceberg), which is a great enhancement. The implementation is mostly solid, with good use of utility functions for creating filesystems and robust fallback mechanisms for exporting data.
My review includes a few suggestions:
- Refactoring duplicated code for HDFS filesystem creation into a shared utility (a hedged sketch follows this list).
- Removing some commented-out code.
- Fixing an inconsistency in the configuration validation for the Hudi data loading strategy.
- Correcting the format of a new demo data file.
Overall, these are great additions to the project's capabilities.
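To make the first suggestion concrete, a minimal sketch of such a shared utility could look like the following. The module path and function name are assumptions for illustration only, not part of this pull request.

    # Hedged sketch of a shared helper, e.g. in data_juicer/utils/fs_utils.py (path assumed).
    # Loading strategies and the exporter could both call it instead of duplicating
    # the HadoopFileSystem construction logic.
    import pyarrow.fs as pa_fs


    def create_pyarrow_hdfs_filesystem(conf: dict) -> pa_fs.HadoopFileSystem:
        """Build a pyarrow HadoopFileSystem from a plain config dict."""
        kwargs = {
            # "default" lets pyarrow resolve fs.defaultFS from the Hadoop configuration.
            "host": conf.get("host", "default"),
            "user": conf.get("user"),
            "kerb_ticket": conf.get("kerb_ticket"),
            "extra_conf": conf.get("extra_conf"),
        }
        port = conf.get("port")
        if port is not None:
            # Config values may arrive as strings; HadoopFileSystem expects an int port.
            kwargs["port"] = int(port)
        return pa_fs.HadoopFileSystem(**kwargs)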
CONFIG_VALIDATION_RULES = {
    "required_fields": ["table_uri"],
    "optional_fields": [],
    "field_types": {"path": str},
The field_types in CONFIG_VALIDATION_RULES is inconsistent. It specifies "path": str, but the required field is table_uri. This should be changed to "table_uri": str to match the usage in load_data and the required_fields list.
| "field_types": {"path": str}, | |
| "field_types": {"table_uri": str}, |
def _create_hdfs_fs(self):
    import pyarrow.fs as fs

    host = self.ds_config.get("host", None)
    port = self.ds_config.get("port", None)
    if port is not None:
        port = int(port)
    user = self.ds_config.get("user", None)
    kerb_ticket = self.ds_config.get("kerb_ticket", None)
    extra_conf = self.ds_config.get("extra_conf", None)
    return fs.HadoopFileSystem(host=host, port=port, user=user, kerb_ticket=kerb_ticket, extra_conf=extra_conf)
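If a shared helper like the one sketched under the review summary were adopted, this method could shrink to a thin wrapper. The helper's module path is an assumption, not code from this PR:

    def _create_hdfs_fs(self):
        # Delegate to the shared helper instead of rebuilding the filesystem here.
        from data_juicer.utils.fs_utils import create_pyarrow_hdfs_filesystem  # hypothetical module

        return create_pyarrow_hdfs_filesystem(self.ds_config)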
# from data_juicer.utils.s3_utils import get_aws_credentials
from data_juicer.utils.model_utils import filter_arguments

# s3_config = {}
# if "s3.access_key_id" in catalog_kwargs:
#     s3_config["aws_access_key_id"] = catalog_kwargs.pop("s3.access_key_id")
# if "s3.secret_access_key" in catalog_kwargs:
#     s3_config["aws_secret_access_key"] = catalog_kwargs.pop("s3.secret_access_key")
# if "s3.session_token" in catalog_kwargs:
#     s3_config["aws_session_token"] = catalog_kwargs.pop("s3.session_token")
# if "s3.region" in catalog_kwargs:
#     s3_config["aws_region"] = catalog_kwargs.pop("s3.region")
# if "s3.endpoint" in catalog_kwargs:
#     s3_config["endpoint_url"] = catalog_kwargs.pop("s3.endpoint")
# aws_access_key_id, aws_secret_access_key, aws_session_token, aws_region = get_aws_credentials(s3_config)
HYLcool left a comment:
It's better to add new test cases for the new features in the corresponding test files.
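As a starting point, a sketch of one such test is shown below. The strategy class name, its import path, its constructor signature, and the exception types are assumptions for illustration; the real tests should target the classes this PR actually adds.

    import unittest

    # Hypothetical names: the strategy class and its import path are assumptions,
    # not taken from this PR.
    from data_juicer.core.data.load_strategy import HudiDataLoadStrategy  # assumed


    class HudiLoadStrategyConfigTest(unittest.TestCase):
        def test_missing_table_uri_is_rejected(self):
            # "table_uri" is declared as a required field, so an empty config
            # should fail validation instead of deferring the error to load time.
            with self.assertRaises((ValueError, KeyError)):
                HudiDataLoadStrategy(ds_config={})

        def test_table_uri_type_is_checked(self):
            # "field_types" maps table_uri to str, so a non-string value should be rejected.
            with self.assertRaises((TypeError, ValueError)):
                HudiDataLoadStrategy(ds_config={"table_uri": 123})


    if __name__ == "__main__":
        unittest.main()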
if path.startswith("s3://"):
    validate_s3_path(path)

    s3_keys = ["aws_access_key_id", "aws_secret_access_key", "aws_session_token", "aws_region", "endpoint_url"]
    s3_conf = {k: args.pop(k) for k in s3_keys if k in args}
    fs = create_pyarrow_s3_filesystem(s3_conf)
    logger.info(f"Detected S3 export path: {path}. S3 filesystem configured.")

elif path.startswith("hdfs://"):
    import pyarrow.fs as pa_fs

    hdfs_keys = ["host", "port", "user", "kerb_ticket", "extra_conf"]
    hdfs_conf = {k: args.pop(k) for k in hdfs_keys if k in args}
    if "port" in hdfs_conf:
        hdfs_conf["port"] = int(hdfs_conf["port"])
    fs = pa_fs.HadoopFileSystem(**hdfs_conf)
    logger.info(f"Detected HDFS export path: {path}. HDFS filesystem configured.")
Add an extra else branch to raise a warning or error for an unsupported prefix.
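One way to implement this, sketched against the variable names in the snippet above and assuming the prefix dispatch lives in create_filesystem_from_args (the helper the exporter calls later in this diff); whether to warn or raise is a design choice for the maintainers:

    from loguru import logger  # matches the logger usage in the snippet above


    def create_filesystem_from_args(path: str, args: dict):
        """Sketch of the scheme dispatch with an explicit fallback branch."""
        fs = None
        if path.startswith("s3://"):
            fs = ...  # build the S3 filesystem as in the diff above
        elif path.startswith("hdfs://"):
            fs = ...  # build the HDFS filesystem as in the diff above
        else:
            # Unsupported or local scheme: say so explicitly instead of silently
            # returning no filesystem; raise ValueError instead if such paths are invalid here.
            logger.warning(
                f"No remote filesystem configured for export path {path!r}; "
                "only s3:// and hdfs:// prefixes are handled."
            )
        return fs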
self.s3_filesystem = create_pyarrow_s3_filesystem(s3_config)
logger.info(f"Detected S3 export path: {export_path}. S3 filesystem configured.")
fs_args = copy.deepcopy(self.export_extra_args)
self.fs = create_filesystem_from_args(export_path, fs_args)
Checking if the returned fs is None might be necessary.
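A small guard along these lines would cover it; the helper name is hypothetical and the surrounding exporter code is abbreviated:

    def _ensure_filesystem(fs, export_path: str):
        # Hypothetical guard: fail fast when a remote export path did not yield
        # a filesystem object from create_filesystem_from_args.
        if fs is None and export_path.startswith(("s3://", "hdfs://")):
            raise ValueError(f"No filesystem could be configured for remote export path {export_path!r}.")
        return fs

The exporter could call it right after building the filesystem, e.g. self.fs = _ensure_filesystem(self.fs, export_path).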
Original PR description: As the title says.