Iceberg unit tests, support Iceberg + non-Hive catalogs, Iceberg Kryo Serializer #993
Conversation
very clean!
.idea/
*.jvmopts
.bloop*
.metals*
How is working with Metals relative to IntelliJ? Does the debugger work as well?
It's really good, actually. The debugger worked out of the box; I found it comparable to IntelliJ overall.
I'd recommend it to anyone who has remote dev boxes, since VSCode's remote integration is far better in my experience. All the tests run a lot faster and I got in way more dev cycles. I'd probably only recommend it over IntelliJ if you're on a dev box, though.
  sparkSession: SparkSession): Seq[Map[String, String]] = {
  sparkSession.sqlContext
-   .sql(s"SHOW PARTITIONS $tableName")
+   .sql(s"SELECT partition FROM $tableName" ++ ".partitions")
OOC, does this work for regular Hive tables?
This is for Iceberg; Hive support is here.
It should work for Hive tables, and the internal targets I'm hitting are more or less "regular Hive tables". Iceberg abstracts itself from the catalog implementation, so as long as Iceberg has an interface to your catalog implementation, it will work.
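For reference, a minimal sketch of what the Iceberg-side lookup does, assuming Spark 3 with the Iceberg runtime on the classpath; the helper name and the shape of the returned maps are illustrative, not the PR's exact code:

```scala
import org.apache.spark.sql.SparkSession

// Illustrative helper: read Iceberg's ".partitions" metadata table instead of
// Hive's SHOW PARTITIONS, so the lookup works against any Iceberg catalog.
def icebergPartitions(spark: SparkSession, tableName: String): Seq[Map[String, String]] = {
  spark
    .sql(s"SELECT partition FROM ${tableName}.partitions")
    .collect()
    .map { row =>
      val part = row.getStruct(0) // one struct field per partition column
      part.schema.fieldNames.zipWithIndex.map { case (name, idx) =>
        name -> Option(part.get(idx)).map(_.toString).orNull
      }.toMap
    }
    .toSeq
}
```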
LGTM. Let's merge this ASAP.
+1 I anecdotally got some better performance out of this too, probably because we get better file pruning from the Iceberg manifests vs. directly querying Hive. I need another approval, though.
@pengyu-hou could you PTAL?
Thanks for the PR. Took a first pass, and I will verify that the new method to show partitions works on our end.
"spark.chronon.table_write.format" -> "delta" | ||
) | ||
(configMap, "ai.chronon.spark.ChrononDeltaLakeKryoRegistrator") | ||
(configMap, "ai.chronon.spark.ChrononKryoRegistrator") |
Is this a duplicate? There's the same thing on line 69.
  rdd
}

def tableExists(tableName: String): Boolean = sparkSession.catalog.tableExists(tableName)
Curious, does the old method not work for Iceberg?
Good question! It does work for non-Iceberg tables IF your underlying catalog supports this operation. This line of code queries the catalog directly, but the more idiomatic thing to do with Iceberg is to use its built-in partition APIs, which are agnostic to your underlying catalog: https://iceberg.apache.org/docs/latest/spark-queries/#spark-queries (note that this also documents the point I made that Iceberg doesn't support DSv1).
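To illustrate, the linked docs describe catalog-agnostic metadata tables you can query straight from Spark SQL; assuming an active SparkSession `spark` and a placeholder table `db.events`:

```scala
// Iceberg metadata tables are addressed by suffixing the table identifier.
spark.sql("SELECT * FROM db.events.partitions").show() // per-partition file/row stats
spark.sql("SELECT * FROM db.events.snapshots").show()  // snapshot history
spark.sql("SELECT * FROM db.events.files").show()      // data files and their metrics
```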
I'm sure most setups use a catalog that works with DSv1, but ours does not. I've read that there's better pushdown in V2 sources, but I can't really be a good source for that benchmark, considering my setup doesn't work with V1.
OK, this should be fine; they are equivalent anyway.
  partitions: Seq[String],
  partitionColumn: String = partitionColumn,
  subPartitionFilters: Map[String, String] = Map.empty): Unit = {
// TODO this is using datasource v1 semantics, which won't be compatible with non-hive catalogs
Could you explain more about DSv1 and DSv2? Do you have a pointer?
Sure! It's largely historical, but the tl;dr is that at some point the way you build a datasource connector was redone to support more sink formats with better performance. This is a good article: https://blog.madhukaraphatak.com/spark-datasource-v2-part-1, and this PDF: https://issues.apache.org/jira/browse/SPARK-15689. The most notable reason this comes up is that Iceberg is not integrated with DataSource V1.
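To make the distinction concrete, a rough sketch, assuming an active SparkSession `spark` and an Iceberg catalog registered under the made-up name `iceberg_catalog`:

```scala
// DSv1-style read: path/format based, resolved through the old DataSource API.
val v1 = spark.read.format("parquet").load("/warehouse/db/events")

// DSv2 read: the identifier is resolved by a V2 TableCatalog, e.g. an Iceberg
// catalog configured via
// spark.sql.catalog.iceberg_catalog=org.apache.iceberg.spark.SparkCatalog.
val v2 = spark.table("iceberg_catalog.db.events")
```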
}

@Test
def testEventsEventsTemporalLongDs(): Unit = {
The test makes sense for a long ds, but I'm wondering whether we want to use this particular test case; it might significantly increase CI time.
I do think it could be reasonable to remove this, but we use dateints really heavily, and I like the insurance of a high-level unit test just to make sure it works end to end. My other testing might be sufficient on its own, but we did have a recent regression.
OK, let's keep it then.
Co-authored-by: Pengyu Hou <[email protected]>
Signed-off-by: Abby Whittier <[email protected]>
Thanks so much, @abbywh!
Summary
We want to unit test Iceberg via CI and improve Iceberg support in the Chronon OSS package.
Why / Goal
Follow Ups
Test Plan
Added a CircleCI check.
Checklist
Reviewers