-
Notifications
You must be signed in to change notification settings - Fork 28.3k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
[SPARK-15214][SQL] Code-generation for Generate #13065
Conversation
Test build #58418 has finished for PR 13065 at commit
|
Looks like a good start. Once you flush this out more, can you print the generated code ("explain codegen ..."), and do some microbenchmarks similar to the ones done in the benchmark package? |
Seems like the build is stuck. I might have introduced a infinite loop :(... |
Test build #58454 has finished for PR 13065 at commit
|
Test build #58505 has finished for PR 13065 at commit
|
Test build #58513 has finished for PR 13065 at commit
|
Test build #58517 has finished for PR 13065 at commit
|
Test build #58705 has finished for PR 13065 at commit
|
Test build #59965 has finished for PR 13065 at commit
|
Test build #60131 has finished for PR 13065 at commit
|
Test build #60137 has finished for PR 13065 at commit
|
# Conflicts: # sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/generators.scala # sql/core/src/main/scala/org/apache/spark/sql/execution/GenerateExec.scala
Test build #68091 has finished for PR 13065 at commit
|
ctx.addMutableState("InternalRow[]", rowData, s"this.$rowData = new InternalRow[$numRows];") | ||
val values = children.tail | ||
val dataTypes = values.take(numFields).map(_.dataType) | ||
val rows = for (row <- 0 until numRows) yield { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Should we split these into multiple funcitons just in case of numRow is large?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Done
} | ||
|
||
protected override def doProduce(ctx: CodegenContext): String = { | ||
// We need to add some code here for terminating generators. |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Is this a TODO?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Added a comment to the doc. The problem is that we currently do not have a generator in Spark SQL that requires this.
} | ||
|
||
// Generate the driving expression. | ||
val data = boundGenerator.genCode(ctx) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Will moving this into codeGenCollection/codeGenTraversableOnce be better?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Done
generate explode array: Best/Avg Time(ms) Rate(M/s) Per Row(ns) Relative | ||
------------------------------------------------------------------------------------------------ | ||
generate explode array wholestage off 6920 / 7129 2.4 412.5 1.0X | ||
generate explode array wholestage on 623 / 646 26.9 37.1 11.1X |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Could you post the result before and after this patch? so we can know what's the improvement come from this patch.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Post it in the PR description should be enough
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Done
Test build #68671 has finished for PR 13065 at commit
|
The performance improvements look awesome, I will finished the review today. |
@@ -40,6 +42,10 @@ private[execution] sealed case class LazyIterator(func: () => TraversableOnce[In | |||
* output of each into a new stream of rows. This operation is similar to a `flatMap` in functional | |||
* programming with one important additional feature, which allows the input rows to be joined with | |||
* their output. | |||
* | |||
* This operator supports whole stage code generation for generators that do not implement | |||
* terminate(). |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Where do we check it and fallback?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Could be done in
override def supportCodegen(): Boolean = {
generator.terminate().isEmpty
}
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I don't think this holds. This method implies that the generator holds some state, and that calling this on an un-used generator might return the correct result.
I have added an explicit GeneratorWithCodegen
trait which all generators must implement if they want to be eligible for code generation. The terminate() method in this trait has been closed off.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I see, those Generator with terminate implements CodegenFallback, then GenerateExec will also fallback, nvm.
val init = concatIfOuter(s"boolean $hasNextCode", s", $outerVal = true") | ||
val check = concatIfOuter(hasNext, s"|| $outerVal") | ||
val update = concatIfOuter(hasNextCode, s", $outerVal = false") | ||
val next = if (outer) s"$hasNext ? $iterator.next() : null" else s"$iterator.next()" |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
minor: will these be easier to understand by having different branch for outer? for example:
if (outer) {
} else {
}
It will end with more lines of codes, but easier to understand.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Sure
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Sorry that I meant put the whole block in it, avoid the using of cancatIfOuter
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Ah ok, I'll update :)
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
done
ctx.addMutableState("InternalRow[]", rowData, s"this.$rowData = new InternalRow[$numRows];") | ||
val values = children.tail | ||
val dataTypes = values.take(numFields).map(_.dataType) | ||
val code = ctx.splitExpressions(ctx.INPUT_ROW, Seq.tabulate(numRows) { row => |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Unfortunately, splitExpressions does not work with whole stage codegen, because the input of children expression is not ctx.INPUT_ROW
, we have to fallback if there are many rows.
Can you add a test for that (many rows)?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I have added a test to check if it will fail compilation for a large stack expression. Testing the actual fallback is nearly impossible. I would need to change something on this line: https://github.com/apache/spark/blob/master/sql/core/src/main/scala/org/apache/spark/sql/execution/WholeStageCodegenExec.scala#L359
assert(ds.collect() === Array(Row(0, 1), Row(0, 2), Row(1, 2), Row(1, 3))) | ||
} | ||
|
||
test("large inline generate should fail in WholeStageCodegen") { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Should this still work (by fallback)?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Yeah it still falls back like it should (I tested that manually). The problem is that the fallback code path is not testable at the moment. So I am currently testing if inline fails for large inline cases, assuming that the fallback works.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I mean it should not reply on the fallback for failed compiling (whole plan will fallback), but don't use whole stage codegen for this Generate.
This could be done by checking it in Generate.supportCodegen()
Test build #68804 has finished for PR 13065 at commit
|
| InternalRow $current = (InternalRow)($next); | ||
| ${consume(ctx, input ++ values)} | ||
|} | ||
""".stripMargin |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
minor: I'm thinking of
if (outer) {
s"""
|${data.code}
|scala.collection.Iterator<InternalRow> $iterator = ${data.value}.toIterator();
|boolean $outerVal = true;
|while ($iterator.hasNext() || $outerVal) {
| $numOutput.add(1);
| InternalRow $current = (InternalRow)($iterator.hasNext()? $iterator.next() : null);
| $outerVal = false;
| ${consume(ctx, input ++ values)}
|}
""".stripMargin
} else {
s"""
|${data.code}
|scala.collection.Iterator<InternalRow> $iterator = ${data.value}.toIterator();
|while ($iterator.hasNext()) {
| $numOutput.add(1);
| InternalRow $current = (InternalRow)($iterator.next());
| ${consume(ctx, input ++ values)}
|}
""".stripMargin
}
…r and regular code paths more explicit.
Test build #68875 has finished for PR 13065 at commit
|
Test build #68876 has finished for PR 13065 at commit
|
// iterator contains no input. We do this by adding an 'outer' variable which guarantees | ||
// execution of the first iteration even if there is no input. Evaluation of the iterator is | ||
// prevented by checks in the next() and accessor code. | ||
val outerVal = ctx.freshName("outer") |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
minor comment: this statement can be move into a then
clause.
Test build #68885 has finished for PR 13065 at commit
|
retest this please |
Test build #68891 has finished for PR 13065 at commit
|
LGTM |
Merging in master. |
## What changes were proposed in this pull request? This PR adds code generation to `Generate`. It supports two code paths: - General `TraversableOnce` based iteration. This used for regular `Generator` (code generation supporting) expressions. This code path expects the expression to return a `TraversableOnce[InternalRow]` and it will iterate over the returned collection. This PR adds code generation for the `stack` generator. - Specialized `ArrayData/MapData` based iteration. This is used for the `explode`, `posexplode` & `inline` functions and operates directly on the `ArrayData`/`MapData` result that the child of the generator returns. ### Benchmarks I have added some benchmarks and it seems we can create a nice speedup for explode: #### Environment ``` Java HotSpot(TM) 64-Bit Server VM 1.8.0_92-b14 on Mac OS X 10.11.6 Intel(R) Core(TM) i7-4980HQ CPU 2.80GHz ``` #### Explode Array ##### Before ``` generate explode array: Best/Avg Time(ms) Rate(M/s) Per Row(ns) Relative ------------------------------------------------------------------------------------------------ generate explode array wholestage off 7377 / 7607 2.3 439.7 1.0X generate explode array wholestage on 6055 / 6086 2.8 360.9 1.2X ``` ##### After ``` generate explode array: Best/Avg Time(ms) Rate(M/s) Per Row(ns) Relative ------------------------------------------------------------------------------------------------ generate explode array wholestage off 7432 / 7696 2.3 443.0 1.0X generate explode array wholestage on 631 / 646 26.6 37.6 11.8X ``` #### Explode Map ##### Before ``` generate explode map: Best/Avg Time(ms) Rate(M/s) Per Row(ns) Relative ------------------------------------------------------------------------------------------------ generate explode map wholestage off 12792 / 12848 1.3 762.5 1.0X generate explode map wholestage on 11181 / 11237 1.5 666.5 1.1X ``` ##### After ``` generate explode map: Best/Avg Time(ms) Rate(M/s) Per Row(ns) Relative ------------------------------------------------------------------------------------------------ generate explode map wholestage off 10949 / 10972 1.5 652.6 1.0X generate explode map wholestage on 870 / 913 19.3 51.9 12.6X ``` #### Posexplode ##### Before ``` generate posexplode array: Best/Avg Time(ms) Rate(M/s) Per Row(ns) Relative ------------------------------------------------------------------------------------------------ generate posexplode array wholestage off 7547 / 7580 2.2 449.8 1.0X generate posexplode array wholestage on 5786 / 5838 2.9 344.9 1.3X ``` ##### After ``` generate posexplode array: Best/Avg Time(ms) Rate(M/s) Per Row(ns) Relative ------------------------------------------------------------------------------------------------ generate posexplode array wholestage off 7535 / 7548 2.2 449.1 1.0X generate posexplode array wholestage on 620 / 624 27.1 37.0 12.1X ``` #### Inline ##### Before ``` generate inline array: Best/Avg Time(ms) Rate(M/s) Per Row(ns) Relative ------------------------------------------------------------------------------------------------ generate inline array wholestage off 6935 / 6978 2.4 413.3 1.0X generate inline array wholestage on 6360 / 6400 2.6 379.1 1.1X ``` ##### After ``` generate inline array: Best/Avg Time(ms) Rate(M/s) Per Row(ns) Relative ------------------------------------------------------------------------------------------------ generate inline array wholestage off 6940 / 6966 2.4 413.6 1.0X generate inline array wholestage on 1002 / 1012 16.7 59.7 6.9X ``` #### Stack ##### Before ``` generate stack: Best/Avg Time(ms) Rate(M/s) Per Row(ns) Relative ------------------------------------------------------------------------------------------------ generate stack wholestage off 12980 / 13104 1.3 773.7 1.0X generate stack wholestage on 11566 / 11580 1.5 689.4 1.1X ``` ##### After ``` generate stack: Best/Avg Time(ms) Rate(M/s) Per Row(ns) Relative ------------------------------------------------------------------------------------------------ generate stack wholestage off 12875 / 12949 1.3 767.4 1.0X generate stack wholestage on 840 / 845 20.0 50.0 15.3X ``` ## How was this patch tested? Existing tests. Author: Herman van Hovell <[email protected]> Author: Herman van Hovell <[email protected]> Closes apache#13065 from hvanhovell/SPARK-15214.
This PR adds code generation to `Generate`. It supports two code paths: - General `TraversableOnce` based iteration. This used for regular `Generator` (code generation supporting) expressions. This code path expects the expression to return a `TraversableOnce[InternalRow]` and it will iterate over the returned collection. This PR adds code generation for the `stack` generator. - Specialized `ArrayData/MapData` based iteration. This is used for the `explode`, `posexplode` & `inline` functions and operates directly on the `ArrayData`/`MapData` result that the child of the generator returns. I have added some benchmarks and it seems we can create a nice speedup for explode: ``` Java HotSpot(TM) 64-Bit Server VM 1.8.0_92-b14 on Mac OS X 10.11.6 Intel(R) Core(TM) i7-4980HQ CPU 2.80GHz ``` ``` generate explode array: Best/Avg Time(ms) Rate(M/s) Per Row(ns) Relative ------------------------------------------------------------------------------------------------ generate explode array wholestage off 7377 / 7607 2.3 439.7 1.0X generate explode array wholestage on 6055 / 6086 2.8 360.9 1.2X ``` ``` generate explode array: Best/Avg Time(ms) Rate(M/s) Per Row(ns) Relative ------------------------------------------------------------------------------------------------ generate explode array wholestage off 7432 / 7696 2.3 443.0 1.0X generate explode array wholestage on 631 / 646 26.6 37.6 11.8X ``` ``` generate explode map: Best/Avg Time(ms) Rate(M/s) Per Row(ns) Relative ------------------------------------------------------------------------------------------------ generate explode map wholestage off 12792 / 12848 1.3 762.5 1.0X generate explode map wholestage on 11181 / 11237 1.5 666.5 1.1X ``` ``` generate explode map: Best/Avg Time(ms) Rate(M/s) Per Row(ns) Relative ------------------------------------------------------------------------------------------------ generate explode map wholestage off 10949 / 10972 1.5 652.6 1.0X generate explode map wholestage on 870 / 913 19.3 51.9 12.6X ``` ``` generate posexplode array: Best/Avg Time(ms) Rate(M/s) Per Row(ns) Relative ------------------------------------------------------------------------------------------------ generate posexplode array wholestage off 7547 / 7580 2.2 449.8 1.0X generate posexplode array wholestage on 5786 / 5838 2.9 344.9 1.3X ``` ``` generate posexplode array: Best/Avg Time(ms) Rate(M/s) Per Row(ns) Relative ------------------------------------------------------------------------------------------------ generate posexplode array wholestage off 7535 / 7548 2.2 449.1 1.0X generate posexplode array wholestage on 620 / 624 27.1 37.0 12.1X ``` ``` generate inline array: Best/Avg Time(ms) Rate(M/s) Per Row(ns) Relative ------------------------------------------------------------------------------------------------ generate inline array wholestage off 6935 / 6978 2.4 413.3 1.0X generate inline array wholestage on 6360 / 6400 2.6 379.1 1.1X ``` ``` generate inline array: Best/Avg Time(ms) Rate(M/s) Per Row(ns) Relative ------------------------------------------------------------------------------------------------ generate inline array wholestage off 6940 / 6966 2.4 413.6 1.0X generate inline array wholestage on 1002 / 1012 16.7 59.7 6.9X ``` ``` generate stack: Best/Avg Time(ms) Rate(M/s) Per Row(ns) Relative ------------------------------------------------------------------------------------------------ generate stack wholestage off 12980 / 13104 1.3 773.7 1.0X generate stack wholestage on 11566 / 11580 1.5 689.4 1.1X ``` ``` generate stack: Best/Avg Time(ms) Rate(M/s) Per Row(ns) Relative ------------------------------------------------------------------------------------------------ generate stack wholestage off 12875 / 12949 1.3 767.4 1.0X generate stack wholestage on 840 / 845 20.0 50.0 15.3X ``` Existing tests. Author: Herman van Hovell <[email protected]> Author: Herman van Hovell <[email protected]> Closes apache#13065 from hvanhovell/SPARK-15214.
What changes were proposed in this pull request?
This PR adds code generation to
Generate
. It supports two code paths:TraversableOnce
based iteration. This used for regularGenerator
(code generation supporting) expressions. This code path expects the expression to return aTraversableOnce[InternalRow]
and it will iterate over the returned collection. This PR adds code generation for thestack
generator.ArrayData/MapData
based iteration. This is used for theexplode
,posexplode
&inline
functions and operates directly on theArrayData
/MapData
result that the child of the generator returns.Benchmarks
I have added some benchmarks and it seems we can create a nice speedup for explode:
Environment
Explode Array
Before
After
Explode Map
Before
After
Posexplode
Before
After
Inline
Before
After
Stack
Before
After
How was this patch tested?
Existing tests.