Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[SPARK-47094][SQL] SPJ : fix bucket reducer function #47126

Open
wants to merge 2 commits into
base: master
Choose a base branch
from

Conversation

himadripal
Copy link

@himadripal himadripal commented Jun 27, 2024

What changes were proposed in this pull request?

SPJ compatible bucket issue has an implementation of reducible function. This patch fixes the implementation and make it same as in apache iceberg one.

Why are the changes needed?

With this fix, incompatible number of buckets do not return 1 as GCD, hence the buckets do not reduce to 1 when it used in incompatible number of buckets.

Does this PR introduce any user-facing change?

No.

How was this patch tested?

With unit tests

Was this patch authored or co-authored using generative AI tooling?

No.

@github-actions github-actions bot added the SQL label Jun 27, 2024
@himadripal
Copy link
Author

@szehon-ho please take a look.

Copy link
Contributor

@szehon-ho szehon-ho left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks , some preliminary comment.

As this is just fixing test transform, I think we should just add one minimum negative test for this (to assert no SPJ in this case)

@@ -1558,7 +1558,7 @@ class KeyGroupedPartitioningSuite extends DistributionAndOrderingSuiteBase {
}
}

test("SPARK-47094: Support compatible buckets with common divisor") {
test("SPARK-47094: SPJ:Support compatible buckets with common divisor") {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Nit: space before Support ? Maybe SPJ is redundant here actually

Copy link
Author

@himadripal himadripal Jun 27, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

All the other ones in SPJ has this SPJ: , can remove it too.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I see, its fine then, maybe just put the space


Seq((table1, partition1), (table2, partition2)).foreach { case (tab, part) =>
createTable(tab, columns2, part)
val insertStr = s"INSERT INTO testcat.ns.$tab VALUES " +
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think I added a ton of values to test the positive case, but do you think we can just have the bare minimum values on each side?

}


test("SPARK-47094: SPJ: Support compatible buckets common divisor is one of the numbers") {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I feel this is already covered by "Support compatible bucket" , and would just say no need to add this test?

Copy link
Author

@himadripal himadripal Jun 27, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I wanted to check if we can test the fact that there is no grouping happening on the side which has same number of bucket as gcd. Not sure whether we can assert that or not.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I wanted to check if we can test the fact that there is no grouping happening on the side which has same number of bucket as gcd. Not sure whether we can assert that or not.

Do you mean expected scans?

I'm still not seeing the difference, this test the case that one divides into the other (4, 8), (3, 9), what's the difference with the existing test of (2, 4), (2, 6) above? (Support compatible bucket) It also asserts the expected scans up there too.

@@ -101,8 +101,8 @@ object BucketFunction extends ScalarFunction[Int] with ReducibleFunction[Int, In

if (otherFunc == BucketFunction) {
val gcd = this.gcd(thisNumBuckets, otherNumBuckets)
if (gcd != thisNumBuckets) {
return BucketReducer(thisNumBuckets, gcd)
if (gcd>1 && gcd!=thisNumBuckets) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Nit: spacing (gcd > 1 && gcd != thisNumBuckets)

@HyukjinKwon HyukjinKwon changed the title [SPARK-47094] SPJ : fix bucket reducer function [SPARK-47094][SQL] SPJ : fix bucket reducer function Jun 28, 2024
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
Projects
None yet
Development

Successfully merging this pull request may close these issues.

2 participants