
[SPARK-47094][SQL] SPJ : Dynamically rebalance number of buckets when they are not equal #45267

Closed
wants to merge 14 commits

Conversation

szehon-ho (Contributor)

What changes were proposed in this pull request?

-- Allow SPJ between 'compatible' bucket functions
-- Add a mechanism to define 'reducible' functions: a function whose output can be 'reduced' to another's for all inputs (see the illustrative sketch below).
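
For illustration only (this sketch is not part of the PR; all names in it are hypothetical): a function is 'reducible' on another when every output value of the first maps onto an output value of the second, the way a bucket function with a larger bucket count maps onto one with a smaller, divisible count.

```scala
// Hypothetical sketch of the "reducible function" idea, not the API added by this PR.
// bucket(4, col) is reducible on bucket(2, col): since 2 divides 4,
// (hash(col) % 4) % 2 == hash(col) % 2 for every input value.
trait ReducerSketch[I, O] {
  def reduce(arg: I): O
}

// Reduces a bucket id computed with a larger bucket count down to a smaller, divisible count.
case class BucketReducerSketch(smallerNumBuckets: Int) extends ReducerSketch[Int, Int] {
  override def reduce(bucketId: Int): Int = bucketId % smallerNumBuckets
}

// Example: bucket ids under 4 buckets reduce to the ids the same rows would get under 2 buckets.
val reducer = BucketReducerSketch(2)
assert((0 until 100).forall(h => reducer.reduce(h % 4) == h % 2))
```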

Why are the changes needed?

-- SPJ currently applies only if the partition transform expressions on both sides are identical.

Does this PR introduce any user-facing change?

No

How was this patch tested?

Added new tests in KeyGroupedPartitioningSuite

Was this patch authored or co-authored using generative AI tooling?

No

github-actions bot added the SQL label on Feb 27, 2024
himadripal commented Feb 27, 2024

I was trying this case,

val partition1 = Array(Expressions.years("ts"), bucket(2, "id"))
val partition2 = Array(Expressions.days("ts"), bucket(4, "id"))

this throws an exception when partiallyClustered = false and allowCompatibleTransform = true.

Exception:

java.lang.ClassCastException: class java.lang.Integer cannot be cast to class java.lang.Long (java.lang.Integer and java.lang.Long are in module java.base of loader 'bootstrap')
  at scala.runtime.BoxesRunTime.unboxToLong(BoxesRunTime.java:103)
  at org.apache.spark.sql.catalyst.expressions.BaseGenericInternalRow.getLong(rows.scala:42)

sunchao (Member) commented Feb 27, 2024

Thanks @szehon-ho! Will take a look in a few days.

szehon-ho (Contributor, Author)

Thanks @himadripal, I was able to modify one of my tests to also reproduce it, and it should be fixed now.

advancedxy (Contributor) left a comment

Hi @szehon-ho, great to see this feature landing on the Spark side.

To be honest, I am implementing a similar feature called bucket rescaling, but on the Iceberg side. By bucket rescaling, I mean that a bucketed Iceberg table could report its partitioning with an on-demand bucket number (compatible with the current spec's bucket number, i.e. on_demand_bucket_number % current_bucket_number = 0 or current_bucket_number % on_demand_bucket_number = 0). It's also possible for the bucketed Iceberg table to have mixed but compatible bucket numbers in its specs.

So I'd like to share my two cents on this topic; hopefully it offers some insight.

  1. Besides downscaling a bucket, it's also possible to upscale one, as long as we put a bucket_id filter after the scan node. For example, to upscale the bucket number from 4 to 8, the bucket_id = 3 bucket could be divided into two buckets: bucket_id_8 = 3 with filter bucket(8, id) = 3, and bucket_id_8 = 7 with filter bucket(8, id) = 7 (see the sketch after this list).
  2. It's possible to support bucket number evolution within a single bucket partition field.
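
A minimal sketch of the arithmetic behind point 1 (illustrative only, not code from this PR or from Iceberg): a row in bucket 3 of 4 can only land in bucket 3 or 7 of 8, so a post-scan bucket filter splits the coarse partition exactly.

```scala
// Illustrative sketch only: why bucket_id_4 = 3 splits cleanly into bucket_id_8 = 3 and 7.
def bucketId(hash: Int, numBuckets: Int): Int = Math.floorMod(hash, numBuckets)

// Every hash value landing in bucket 3 of 4 lands in bucket 3 or 7 of 8, and nothing else does.
val hashes = 0 until 1024
val inBucket3Of4 = hashes.filter(h => bucketId(h, 4) == 3)
assert(inBucket3Of4.forall(h => Set(3, 7).contains(bucketId(h, 8))))
assert(hashes.filterNot(h => inBucket3Of4.contains(h)).forall(h => !Set(3, 7).contains(bucketId(h, 8))))
```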

Anyway, it's a game changer for Spark's SPJ to support compatible bucket numbers. The upscale cases might be supported in follow-up PRs.

val scans = collectScans(df.queryExecution.executedPlan).map(_.inputRDD.partitions.length)

val expectedBuckets = Math.min(table1buckets, table2buckets)
Contributor

Looks like the bucket numbers are always coalesced to the smaller one in the current impl?

It might not be the desired behavior when the coalesced bucket number is extremely small, like 1 or 2 or 4...

Contributor Author

Yes, in fact it is definitely a follow-up to do this with 'partiallyClustered' (which is currently not enabled). Today this mode 'duplicates' partitions for the partially clustered side, and I think it can be used to enable the same for compatible transforms.

Member

Yes, I think it will be very useful to make partially clustered distribution work with the feature introduced in this PR.

* Returning none also indicates that none of the partition expressions can be reduced on the
* corresponding expression on the other shuffle spec.
*/
def reducers(spec: ShuffleSpec): Option[Seq[Option[Reducer[_]]]] = None
Contributor

It seems a bit over-engineered to introduce Reducer and ReducibleFunction just to check that bucket numbers are compatible and to reduce bucket numbers.

Do you have any use case in mind where partition transforms other than the bucket transform could leverage this?
If not, these classes might be used internally rather than exposed as a public API?

Contributor Author

Hi @advancedxy, yeah, it definitely increases the scope to make this a generic mechanism. But there are actually many use cases: the main example is that even Iceberg's bucket function is not the same as Spark's and would need to implement this somehow, but so would anything the user registers in the function catalog. For instance, geo bucketing functions as well.

Member

One use case I think could be hours(col) vs days(col), in this case we can "reduce" the former into the latter.
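
For illustration (a sketch, not this PR's API; it assumes hours(col) and days(col) return hours and days since epoch, as Iceberg's transforms do):

```scala
// Hypothetical reducer from hours(col) partition values to days(col) partition values.
// Every hour value maps to exactly one day value, so the transforms are "reducible".
val hoursToDays: Int => Int = hours => Math.floorDiv(hours, 24)

// e.g. all 24 hourly partitions of the second day since epoch reduce to day 1
assert((24 to 47).forall(h => hoursToDays(h) == 1))
```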

Contributor

> For instance, geo bucketing functions as well.

Emm, of course, geo bucketing makes a lot of sense.

> I think could be hours(col) vs days(col)

I'm not sure about this use case. Theoretically we can "reduce" the former into the latter, but it seems impractical to me to "reduce" hours into days. Suppose we have table A with the hours(col) partition transform and table B with the days(col) partition transform, and we are going to join with A.col = B.col. If A's hourly partitions are reduced to daily partitions, it means we need to process ~24 times the partition data in one task, which might already be too big from table A's perspective?

Member

This is just one potential example. It could be useful, for instance, if the table with hours(col) has much less data than the other side (but not small enough to trigger BHJ). The Reducer here allows data sources to specify relationships between transforms beyond the bucketing case.

Contributor

> The Reducer here allows data sources to specify relationships between transforms beyond the bucketing case.

Yeah, I can see the potential usage now. However, it's still hard for developers to correctly understand what a Reducer actually means (it's mathematically clear, though) and how it works. Maybe we should add some concrete examples to the JavaDoc of the Reducer class. It would also be great to demonstrate a use case beyond bucketing in the tests, but I think that's optional.

/**
* If this function is 'reducible' on another function, return the {@link Reducer} function.
* @param other other function
* @param thisArgument argument for this function instance
Member

The arguments here are a bit confusing: as a caller of this function, how do I know what I should pass here?

Contributor Author

Added javadocs

* Returning none also indicates that none of the partition expressions can be reduced on the
* corresponding expression on the other shuffle spec.
*/
def reducers(spec: ShuffleSpec): Option[Seq[Option[Reducer[_]]]] = None
Member

maybe this should belong to KeyGroupedShuffleSpec? I'm not sure whether this API is meaningful for other shuffle specs.

Contributor Author

done

@@ -829,20 +846,59 @@ case class KeyGroupedShuffleSpec(
}
}

override def canCreatePartitioning: Boolean = SQLConf.get.v2BucketingShuffleEnabled &&
Member

nit: seems unnecessary change

Contributor Author

Done, I think I was trying to move the private method to the bottom.

* @since 4.0.0
*/
@Evolving
public interface ReducibleFunction<T, A> extends ScalarFunction<T> {
Member

what is T and what is A? should Reducer also have two type parameters?

Contributor Author

added javadocs

other match {
case otherSpec: KeyGroupedShuffleSpec =>
val results = partitioning.expressions.zip(otherSpec.partitioning.expressions).map {
case (e1: TransformExpression, e2: TransformExpression)
Member

maybe it's better to move this logic into TransformExpression itself, e.g., add a TransformExpression.reducer method.

Contributor Author

done

* <li>other param = Int(2)</li>
* </ul>
* @param other the other function
* @param thisParam param for this function
Member

It is unclear to me what this should be beyond the bucketing case. Should we add a separate method just for the special case of bucketing?

    Option<Reducer<I, O>> reducer(ReducibleFunction<I, O> other);

    Option<Reducer<I, O>> bucketReducer(ReducibleFunction<I, O> other, int numBuckets,
                                  int otherNumBuckets);

szehon-ho (Contributor, Author) commented Mar 19, 2024

OK, made two methods and a default for both, so that implementations can pick one to override
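
A rough sketch of that shape (hypothetical names and signatures, not the exact API in this PR; the parameters were reworked further later in this thread):

```scala
// Hypothetical sketch of "two methods with a default for both".
// An implementation overrides whichever variant fits its transform.
trait ReducibleFunctionSketch[I, O] {
  // General form, for parameterized transforms beyond bucketing.
  def reducer(
      thisParam: Any,
      other: ReducibleFunctionSketch[_, _],
      otherParam: Any): Option[I => O] = None

  // Bucket-specific form, where the only parameter that matters is the bucket count.
  def bucketReducer(
      thisNumBuckets: Int,
      other: ReducibleFunctionSketch[_, _],
      otherNumBuckets: Int): Option[I => O] = None
}

// Example: a bucket transform overriding only the bucket-specific variant.
object BucketFunctionSketch extends ReducibleFunctionSketch[Int, Int] {
  override def bucketReducer(
      thisNumBuckets: Int,
      other: ReducibleFunctionSketch[_, _],
      otherNumBuckets: Int): Option[Int => Int] =
    if (thisNumBuckets % otherNumBuckets == 0) Some((id: Int) => id % otherNumBuckets) else None
}
```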

szehon-ho (Contributor, Author) commented Mar 19, 2024

Actually, on second thought, we do plan to add geo partition transforms, for example xz2 ordering (https://github.com/spatialx-project/geolake/blob/main/api/src/main/java/org/apache/iceberg/transforms/ExtendedZCurve.java), which needs a 'resolution' parameter,

as well as multi-arg bucket transforms: https://docs.google.com/document/d/1aDoZqRgvDOOUVAGhvKZbp5vFstjsAMY4EFCyjlxpaaw/

So I added the generic Object type back in the latest PR.

This is one approach; we can probably reuse int in some of these, but it seemed cleaner to have a generic type. Let me know what you think.

Note, I tried for a while to make the method type-parameterized, like:

<T> Option<Reducer<I, O>> reducer(ReducibleFunction<I, O> other, T numBuckets,
                                  T otherNumBuckets);

but was not able to override it successfully in Bucket in Scala.

val leftReducers = leftSpec.reducers(rightSpec)
val rightReducers = rightSpec.reducers(leftSpec)

if (leftReducers.isDefined || rightReducers.isDefined) {
Member

Looks like we should be able to support the case where the numbers of buckets are not divisible into each other but have a greatest common divisor, such as bucket(16, x) vs bucket(12, y)? In this case, since the common divisor is 4, we can have reducers for both x and y, dividing inputs by 4 and 3 respectively.

Contributor Author

Yeah, or in other words, both sides mod the gcd, i.e. bucket(16, x) % 4 and bucket(12, y) % 4.

Implemented and added a test case in the latest patch.
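
A minimal sketch of the gcd reduction (illustrative only, not the PR's implementation):

```scala
// Sketch: reduce two incompatible bucket counts to their greatest common divisor.
def gcd(a: Int, b: Int): Int = if (b == 0) a else gcd(b, a % b)

val common = gcd(16, 12) // 4
// Rows with the same join key hash land in the same reduced bucket on both sides,
// because (h % 16) % 4 == h % 4 and (h % 12) % 4 == h % 4, since 4 divides both 16 and 12.
assert((0 until 480).forall(h => (h % 16) % common == (h % 12) % common))
```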

szehon-ho (Contributor, Author)

@sunchao can you take another look? I guess the open question is how to support other potential parameterized transforms in the future; let me know your thoughts.

sunchao (Member) left a comment

Thanks @szehon-ho, looks much better now. I mainly have one more question on the params.

* <li>otherParam = Int(2)</li>
* </ul>
*
* @param thisParam parameter for this function
Member

curious what will thisParam and otherParam be if there are multiple parameters?

Contributor Author

I was thinking a list; would that work or cause any issue?

Member

I'm wondering whether we can use InternalRow for this, which is consistent with the v2 function API (see ScalarFunction and AggregateFunction) and can support multiple parameters.

We need to define a clear contract between Spark and the callers of this method, though: what should a caller expect to see from thisParam and otherParam?

For bucket it is quite clear, but I'm not sure how this will work with the geohash function that you pointed out. In Spark, we need a way for the data source provider to pass the function's resolution to Spark via the v2 catalog, and then pass it back to the data source via the reducer. Any idea?

szehon-ho (Contributor, Author) commented Mar 25, 2024

Yeah, I think you are right; it requires more changes to Spark, V2ExpressionUtil, etc. to add these. Let me revert back to bucketReducer(), and then later, if we figure that out, we can make this method invoke the general API.

*/
default Reducer<I, O> reducer(
Object thisParam,
ReducibleFunction<?, ?> otherFunction,
Member

ReducibleFunction<I, O> otherFunction did not work? Just wondering why.

Contributor Author

Yea I think I get an error like:

/Users/szehon/repos/apache-spark/spark/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/TransformExpression.scala:105:42: type mismatch;
[error]  found   : org.apache.spark.sql.connector.catalog.functions.ReducibleFunction[_$5,_$6] where type _$6, type _$5
[error]  required: org.apache.spark.sql.connector.catalog.functions.ReducibleFunction[_$3,_$4]
[error]         thisFunction.reducer(numBuckets, otherFunction, otherNumBuckets)
[error]                                          ^
[error] /Users/szehon/repos/apache-spark/spark/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/TransformExpression.scala:106:38: type mismatch;
[error]  found   : org.apache.spark.sql.connector.catalog.functions.ReducibleFunction[_$5,_$6] where type _$6, type _$5

But actually there is no requirement that they have exactly the same input and output types; in theory, the other function could have different types.

szehon-ho (Contributor, Author) commented Mar 22, 2024

@sunchao thanks! Addressed review comments. Let me know if supporting a generic function parameter is too messy and we want to switch back to just (int numBuckets) for the first cut.

szehon-ho (Contributor, Author)

@sunchao if you have time for another look: reverted to using a specific argument type and method for bucket, and we can worry about other parameterized transforms later.

sunchao (Member) left a comment

Looks mostly good!

szehon-ho (Contributor, Author) commented Apr 4, 2024

@sunchao thanks, can you take another look? I also simplified the bucket logic to always use the gcd (it seems to be the general case).

sunchao (Member) left a comment

LGTM

himadripal left a comment

Thank you for adding this feature. @szehon-ho

sunchao closed this in 11abc64 on Apr 6, 2024
sunchao (Member) commented Apr 6, 2024

Thanks, merged to master!

szehon-ho (Contributor, Author)

Thanks @sunchao and all for the review!

szehon-ho added a commit to szehon-ho/spark that referenced this pull request Aug 7, 2024
… they are not equal (apache#1946)

  ### What changes were proposed in this pull request?
-- Allow SPJ between 'compatible' bucket functions
-- Add a mechanism to define 'reducible' functions: a function whose output can be 'reduced' to another's for all inputs.

  ### Why are the changes needed?
-- SPJ currently applies only if the partition transform expressions on both sides are identical.

  ### Does this PR introduce _any_ user-facing change?
No

  ### How was this patch tested?
Added new tests in KeyGroupedPartitioningSuite

  ### Was this patch authored or co-authored using generative AI tooling?
No

Closes apache#45267 from szehon-ho/spj-uneven-buckets.

Authored-by: Szehon Ho <[email protected]>

Signed-off-by: Chao Sun <[email protected]>
Co-authored-by: Szehon Ho <[email protected]>