feat(datasets): SparkDataset Rewrite #1185
base: main
Conversation
Digging in a bit, this feels less like a rewrite and more like a refactoring. Here are my initial thoughts:
- I've added a comment re my concern removing the `_dbfs_glob` logic. This needs to be validated carefully (perhaps Databricks improved performance of regular glob?) so we don't reintroduce a performance issue. I remember debugging this on a client project, because IIRC (it's been years) performance degrades to the point of unusability with a large number of versions.
- Will this provide the best experience with `spark-connect` and `databricks-connect`? (FWIW `databricks-connect` is a bit annoying to look into since it's not open source.) Spark 3.4 introduced Spark Connect, and Spark 4 includes major refactors to really make it part of the core (e.g. `pyspark.sql.classic` is moved to the same level as `pyspark.sql.connect`, and they inherit from the same base `DataFrame` and all, which wasn't the case before). IMO Spark Connect looks like the future of Spark, and a `SparkDataset` refresh should work seamlessly with it. Spark Connect (and Databricks Connect) are also potentially great for users who struggle with the deployment experience (e.g. need to get code onto Databricks from local). That said, the classic experience is still likely a very common way for teams who are working more from within Databricks to operate.
- I like the fact that HDFS is supported through PyArrow now. If there's still concern that people may need the old, separate HDFS client (not sure there is? `hdfs` hasn't had a release in two years and doesn't support Python 3.13, for example), maybe that could be handled through some sort of fallback logic? (A rough sketch of that idea follows below.)
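Purely to illustrate the fallback idea, not anything in this PR: the helper name, ports, and WebHDFS URL below are assumptions, and the two clients expose different interfaces, so real fallback logic would also need an adapter layer.

# Sketch only: prefer PyArrow's HDFS bindings and fall back to the legacy
# `hdfs` package only if PyArrow's HadoopFileSystem (libhdfs) is unavailable.
def _hdfs_filesystem(host: str, port: int = 8020, webhdfs_port: int = 9870):
    try:
        from pyarrow import fs  # needs libhdfs / Hadoop client libs on the machine

        return fs.HadoopFileSystem(host, port)
    except ImportError:
        # Unmaintained legacy client, kept purely as a safety net.
        from hdfs import InsecureClient

        return InsecureClient(f"http://{host}:{webhdfs_port}")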
Thanks @deepyaman, you're right about the DBFS glob issue; good catch, we'll add that logic back in (roughly along the lines sketched below). Regarding refactor vs. rewrite, we chose V2 for safety, but I'm open to discussing whether we should refactor the original instead if you think that's better.
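For reference, and from memory only (the helper name and details below are illustrative, not the previous implementation), the DBFS-specific listing to restore would look roughly like this:

# Illustrative only: list version directories via dbutils.fs.ls instead of a
# generic filesystem glob, which used to become unusably slow on DBFS once a
# dataset accumulated many versions.
def _dbfs_list_versions(dbutils, versioned_dir: str) -> list[str]:
    try:
        entries = dbutils.fs.ls(versioned_dir)
    except Exception:  # the directory may not exist yet
        return []
    # dbutils.fs.ls marks directories with a trailing "/" in the name
    return sorted(entry.path for entry in entries if entry.name.endswith("/"))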
Yeah, of course. I think we can get the V2 "ready", and then see if it's sufficiently different that it needs to be breaking/a separate dataset.
@noklam, would also appreciate your thoughts on this.
Sorry, I don't have time to review this in detail, but I don't want to block it. A few quick questions off the top of my head:
- When should I use the `databricks`-specific datasets vs. `spark.SparkDataset` on Databricks? I recall there are already things that are only possible with the databricks ones. If we are rewriting this, I think we should have a look at that.
- DBFS is a bit annoying: Databricks has already deprecated it, and new clusters default to UC volumes, but a lot of people still use DBFS on older clusters.
- Is there a goal/additional thing that this rewrite improves? Or is it more like refactoring?
Hey @noklam, thanks. I think the Databricks datasets are more for TABLE operations, while the SparkDataset is for FILE operations. The new V2 handles both DBFS and UC Volumes properly: it still supports /dbfs/, dbfs:/, and /Volumes/ paths, and we apply the DBFS optimisations only when needed (a simplified sketch is below). I think this goes a bit beyond a refactor; we're solving some long-standing issues: Databricks users can now actually use it, we add Spark Connect support for Spark 4.0, and users can now choose their dependencies instead of installing everything, via pyproject.toml changes. It makes the dataset more usable.
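A simplified sketch of the path handling described above (`_resolve_databricks_path` is a made-up name, and the real V2 logic has more cases):

# Simplified sketch: normalise the three Databricks path styles and flag when
# DBFS-specific optimisations should kick in.
def _resolve_databricks_path(filepath: str) -> tuple[str, bool]:
    if filepath.startswith("/dbfs/"):
        # POSIX-style DBFS mount path -> Spark URI form
        return "dbfs:/" + filepath[len("/dbfs/"):], True
    if filepath.startswith("dbfs:/"):
        return filepath, True
    # Unity Catalog volumes ("/Volumes/...") behave like regular paths,
    # so no DBFS-specific handling is applied.
    return filepath, False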
Hey team, this PR is ready for another review round. Since the last discussion:
Open questions:
Would appreciate reviews from @deepyaman @noklam @merelcht @ravi-kumar-pilla on the latest changes. Also, I'm not familiar with all these platforms; could someone with access test this dataset on:
Hi @SajidAlamQB, I tested locally before and can test again. I will also test on Databricks. I am not familiar with AWS EMR or GCP, but I can try. Thank you
Hi @SajidAlamQB, I have done some basic testing on Databricks. Below is my notebook, which I tried after a basic setup using UC volumes:

import time

from kedro_datasets.spark import SparkDataset, SparkDatasetV2

# test_df is assumed to be a small Spark DataFrame created during the basic setup mentioned above
test_catalog = "spark_v2_catalog"
test_schema = "default"
volume_name = "spark_test_volume"
uc_base_path = f"/Volumes/{test_catalog}/{test_schema}/{volume_name}"
temp_dir = f"{uc_base_path}/basic"
dbutils.fs.mkdirs(temp_dir)
# Test 1: Parquet format (fails with SparkDatasetV2 but passes with SparkDataset)
parquet_path = f"{temp_dir}/basic_test.parquet"
dataset_parquet = SparkDatasetV2(filepath=parquet_path, file_format="parquet")
start_time = time.time()
dataset_parquet.save(test_df)
save_time = time.time() - start_time
print(f"✅ Saved parquet in {save_time:.3f}s to: {parquet_path}")
start_time = time.time()
loaded_df = dataset_parquet.load()
load_time = time.time() - start_time
print(f"✅ Loaded parquet in {load_time:.3f}s - {loaded_df.count()} rows")
# Verify data integrity
assert test_df.count() == loaded_df.count(), "Row count mismatch"
print("✅ Data integrity verified")
# Test 2: CSV with custom args (fails with SparkDatasetV2 but passes with SparkDataset)
print("\n📁 Test 2: CSV with Custom Arguments")
csv_path = f"{temp_dir}/basic_test.csv"
dataset_csv = SparkDataset(
filepath=csv_path,
file_format="csv",
save_args={"header": True, "sep": "|"},
load_args={"header": True, "sep": "|", "inferSchema": True}
)
dataset_csv.save(test_df)
loaded_csv_df = dataset_csv.load()
print(f"✅ CSV save/load successful - {loaded_csv_df.count()} rows")
loaded_csv_df.show(3)
# Test 3: JSON format (fails with SparkDatasetV2 but passes with SparkDataset)
print("\n📁 Test 3: JSON Format")
json_path = f"{temp_dir}/basic_test.json"
dataset_json = SparkDataset(filepath=json_path, file_format="json")
# DatasetError: Failed while saving data to dataset kedro_datasets.spark.spark_dataset_v2.SparkDatasetV2(filepath='file:///Volumes/spark_v2_catalog/default/spark_test_volume/basic/basic_test.csv', file_format='csv', load_args={'header': True, 'sep': '|', 'inferSchema': True}, save_args={'header': True, 'sep': '|'}, protocol='file').
# (java.lang.SecurityException) Cannot use com.databricks.backend.daemon.driver.WorkspaceLocalFileSystem - local filesystem access is forbidden

Observations:

Thank you
Hey @ravi-kumar-pilla, thanks for the detailed testing. I've pushed a fix for the UC volume issues; could you retest? Thank you!
The UC volumes work, though I am still receiving errors with DBFS. Let's connect tomorrow to discuss this. Thank you
Hi @SajidAlamQB, related tickets we should address as part of this PR if possible: #216, #1210
Description
This PR introduces `SparkDatasetV2`, an alternative, cleaner version of `SparkDataset`.

The `SparkDataset` is currently frustrating to work with for several reasons that have been outlined in #135:

- Path handling: custom parsing (`split_filepath` vs. the normal `get_protocol_and_path`), leading to duplication and inconsistencies (e.g., S3 paths handle "s3a://" differently).
- Dependency issues: `spark-sparkdataset = ["kedro-datasets[spark-base,hdfs-base,s3fs-base]"]` forces all three, but HDFS is rarely used nowadays. Databricks datasets rely on `SparkDataset`'s parsing utils, creating circular deps.
- Testing and Bugs:
Development notes
This PR introduces `SparkDatasetV2` to:

Dependency Restructuring
- `spark-core` with zero dependencies
- Environment bundles (`spark-local`, `spark-databricks`, `spark-emr`)
- Cloud storage bundles (`spark-s3`, `spark-gcs`, `spark-azure`)

Code Improvements for `SparkDataset` (a rough sketch follows below)
- `TYPE_CHECKING` for lazy PySpark imports
- `get_protocol_and_path` for path parsing
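A rough sketch of what those two improvements look like in practice (illustrative only; `_SparkDatasetSketch` is not the actual V2 class):

from __future__ import annotations

from typing import TYPE_CHECKING

from kedro.io.core import get_protocol_and_path

if TYPE_CHECKING:  # PySpark is only needed for type hints, so defer the import
    from pyspark.sql import DataFrame


class _SparkDatasetSketch:
    def __init__(self, filepath: str, file_format: str = "parquet") -> None:
        # Standard Kedro helper instead of the bespoke split_filepath,
        # e.g. "s3a://bucket/data.parquet" -> ("s3a", "bucket/data.parquet")
        self._protocol, self._path = get_protocol_and_path(filepath)
        self._file_format = file_format

    def _load(self) -> DataFrame:
        from pyspark.sql import SparkSession  # imported lazily, at call time

        spark = SparkSession.builder.getOrCreate()
        return spark.read.load(f"{self._protocol}://{self._path}", self._file_format)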
Now:
- Users of `kedro-datasets[spark]` will need to choose specific bundles
- Optional HDFS bundle (`spark-hdfs`)

Checklist
- Updated `jsonschema/kedro-catalog-X.XX.json` if necessary
- Added a description of this change in the `RELEASE.md` file