From 5254559e5463ddd63d3c1cacfca835d26a863d9b Mon Sep 17 00:00:00 2001 From: Herman van Hovell Date: Thu, 12 May 2016 00:27:42 +0200 Subject: [PATCH 01/27] CG for Generate/Explode --- .../sql/catalyst/expressions/generators.scala | 8 +- .../spark/sql/execution/GenerateExec.scala | 81 ++++++++++++++++++- 2 files changed, 85 insertions(+), 4 deletions(-) diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/generators.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/generators.scala index 12c35644e564c..37e99851c592b 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/generators.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/generators.scala @@ -20,7 +20,7 @@ package org.apache.spark.sql.catalyst.expressions import org.apache.spark.sql.Row import org.apache.spark.sql.catalyst.{CatalystTypeConverters, InternalRow} import org.apache.spark.sql.catalyst.analysis.TypeCheckResult -import org.apache.spark.sql.catalyst.expressions.codegen.CodegenFallback +import org.apache.spark.sql.catalyst.expressions.codegen.{CodegenContext, CodegenFallback, ExprCode} import org.apache.spark.sql.catalyst.util.{ArrayData, MapData} import org.apache.spark.sql.types._ @@ -100,7 +100,7 @@ case class UserDefinedGenerator( @ExpressionDescription( usage = "_FUNC_(a) - Separates the elements of array a into multiple rows, or the elements of a map into multiple rows and columns.") // scalastyle:on line.size.limit -case class Explode(child: Expression) extends UnaryExpression with Generator with CodegenFallback { +case class Explode(child: Expression) extends UnaryExpression with Generator { override def children: Seq[Expression] = child :: Nil @@ -148,4 +148,8 @@ case class Explode(child: Expression) extends UnaryExpression with Generator wit } } } + + override protected def doGenCode(ctx: CodegenContext, ev: ExprCode): ExprCode = { + sys.error("Just don't") + } } diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/GenerateExec.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/GenerateExec.scala index 8b62c5507c0c8..40bdb33efc287 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/GenerateExec.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/GenerateExec.scala @@ -20,7 +20,9 @@ package org.apache.spark.sql.execution import org.apache.spark.rdd.RDD import org.apache.spark.sql.catalyst.InternalRow import org.apache.spark.sql.catalyst.expressions._ +import org.apache.spark.sql.catalyst.expressions.codegen.{CodegenContext, ExprCode} import org.apache.spark.sql.execution.metric.SQLMetrics +import org.apache.spark.sql.types.{ArrayType, DataType, MapType} /** * For lazy computing, be sure the generator.terminate() called in the very last @@ -53,7 +55,7 @@ case class GenerateExec( outer: Boolean, output: Seq[Attribute], child: SparkPlan) - extends UnaryExecNode { + extends UnaryExecNode with CodegenSupport { private[sql] override lazy val metrics = Map( "numOutputRows" -> SQLMetrics.createMetric(sparkContext, "number of output rows")) @@ -99,5 +101,80 @@ case class GenerateExec( } } } -} + override def inputRDDs(): Seq[RDD[InternalRow]] = { + child.asInstanceOf[CodegenSupport].inputRDDs() + } + + protected override def doProduce(ctx: CodegenContext): String = { + // We need to add some code here for terminating generators. 
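+    // Producing is delegated to the child plan; every row it emits flows back into
+    // doConsume below, which is where the per-row generator loop is emitted.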
+ child.asInstanceOf[CodegenSupport].produce(ctx, this) + } + + override def doConsume(ctx: CodegenContext, input: Seq[ExprCode], row: ExprCode): String = { + generator match { + case e: Explode => codegenExplode(e, ctx, input, row) + } + } + + private def codegenExplode( + e: Explode, + ctx: CodegenContext, + input: Seq[ExprCode], + row: ExprCode): String = { + ctx.currentVars = input + ctx.copyResult = true + + // Generate the driving expression. + val data = e.child.genCode(ctx) + + // Generate looping variables. + val numOutput = metricTerm(ctx, "numOutputRows") + val index = ctx.freshName("index") + val numElements = ctx.freshName("numElements") + + // Generate accessor for MapData/Array element(s). + def accessor(src: String, field: String, dt: DataType, nullable: Boolean): ExprCode = { + val data = src + field + val value = ctx.freshName("value") + val javaType = ctx.javaType(dt) + val getter = ctx.getValue(data, dt, index) + if (outer || nullable) { + val isNull = ctx.freshName("isNull") + val code = + s""" + |boolean $isNull = $src == null || $data.isNullAt($index); + |$javaType $value = $isNull ? ${ctx.defaultValue(dt)} : $getter; + """.stripMargin + ExprCode(code, isNull, value) + } else { + ExprCode(s"$javaType $value = $getter;", "false", value) + } + } + val values = e.dataType match { + case ArrayType(dataType, nullable) => + Seq(accessor(data.value, "", dataType, nullable)) + case MapType(keyType, valueType, valueContainsNull) => + Seq(accessor(data.value, ".keyArray()", keyType, nullable = false), + accessor(data.value, ".valueArray()", valueType, valueContainsNull)) + } + + // Determine result vars. + val output = if (join) { + input ++ values + } else { + values + } + + s""" + |${data.code} + |int $index = 0; + |int $numElements = ${data.isNull} ? ${if (outer) 1 else 0} : ${data.value}.numElements(); + |while ($index < $numElements) { + | ${consume(ctx, output)} + | $numOutput.add(1); + | $index++; + |} + """.stripMargin + } +} From 5d068b50cdba415b2a71f9c8d44247fba1d5198b Mon Sep 17 00:00:00 2001 From: Herman van Hovell Date: Thu, 12 May 2016 08:26:30 +0200 Subject: [PATCH 02/27] Fix compilation and binding errors. --- .../scala/org/apache/spark/sql/execution/GenerateExec.scala | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/GenerateExec.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/GenerateExec.scala index 40bdb33efc287..134e4cb35ed09 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/GenerateExec.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/GenerateExec.scala @@ -112,7 +112,7 @@ case class GenerateExec( } override def doConsume(ctx: CodegenContext, input: Seq[ExprCode], row: ExprCode): String = { - generator match { + boundGenerator match { case e: Explode => codegenExplode(e, ctx, input, row) } } @@ -151,7 +151,7 @@ case class GenerateExec( ExprCode(s"$javaType $value = $getter;", "false", value) } } - val values = e.dataType match { + val values = e.child.dataType match { case ArrayType(dataType, nullable) => Seq(accessor(data.value, "", dataType, nullable)) case MapType(keyType, valueType, valueContainsNull) => From b2d663bbb6985e0d806adafc54f6d071722bfeb1 Mon Sep 17 00:00:00 2001 From: Herman van Hovell Date: Thu, 12 May 2016 19:24:19 +0200 Subject: [PATCH 03/27] Fix infinite loop on continue. 
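The generated while-loop only bumped $index at the end of the body, after the call to
consume(). consume() can expand into code that issues `continue` (for example when a
downstream filter rejects the row), which skips the increment and spins forever on the
same element. Moving the increment into the for-header (and counting the row before
consuming) fixes this. A minimal sketch of the reworked template, reusing the codegen
names from GenerateExec below:

    s"""
       |for (int $index = 0; $index < $numElements; $index++) {
       |  $numOutput.add(1);
       |  ${consume(ctx, output)}  // may expand to `continue`; the increment still runs
       |}
     """.stripMargin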
--- .../spark/sql/execution/GenerateExec.scala | 22 +++++++++---------- 1 file changed, 10 insertions(+), 12 deletions(-) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/GenerateExec.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/GenerateExec.scala index 134e4cb35ed09..9d1f46d9b83be 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/GenerateExec.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/GenerateExec.scala @@ -134,16 +134,16 @@ case class GenerateExec( val numElements = ctx.freshName("numElements") // Generate accessor for MapData/Array element(s). - def accessor(src: String, field: String, dt: DataType, nullable: Boolean): ExprCode = { - val data = src + field - val value = ctx.freshName("value") + def accessor(field: String, name: String, dt: DataType, nullable: Boolean): ExprCode = { + val source = data.value + field + val value = ctx.freshName(name) val javaType = ctx.javaType(dt) - val getter = ctx.getValue(data, dt, index) + val getter = ctx.getValue(source, dt, index) if (outer || nullable) { val isNull = ctx.freshName("isNull") val code = s""" - |boolean $isNull = $src == null || $data.isNullAt($index); + |boolean $isNull = ${data.isNull} || $source.isNullAt($index); |$javaType $value = $isNull ? ${ctx.defaultValue(dt)} : $getter; """.stripMargin ExprCode(code, isNull, value) @@ -153,10 +153,10 @@ case class GenerateExec( } val values = e.child.dataType match { case ArrayType(dataType, nullable) => - Seq(accessor(data.value, "", dataType, nullable)) + Seq(accessor("", "col", dataType, nullable)) case MapType(keyType, valueType, valueContainsNull) => - Seq(accessor(data.value, ".keyArray()", keyType, nullable = false), - accessor(data.value, ".valueArray()", valueType, valueContainsNull)) + Seq(accessor(".keyArray()", "key", keyType, nullable = false), + accessor(".valueArray()", "value", valueType, valueContainsNull)) } // Determine result vars. @@ -168,12 +168,10 @@ case class GenerateExec( s""" |${data.code} - |int $index = 0; |int $numElements = ${data.isNull} ? ${if (outer) 1 else 0} : ${data.value}.numElements(); - |while ($index < $numElements) { - | ${consume(ctx, output)} + |for (int $index = 0; $index < $numElements; $index++) { | $numOutput.add(1); - | $index++; + | ${consume(ctx, output)} |} """.stripMargin } From e04d66f57c3211b4a01dc5cef2bff14f88bb2a05 Mon Sep 17 00:00:00 2001 From: Herman van Hovell Date: Thu, 12 May 2016 22:02:04 +0200 Subject: [PATCH 04/27] Fix outer. --- .../org/apache/spark/sql/execution/GenerateExec.scala | 8 +++++--- 1 file changed, 5 insertions(+), 3 deletions(-) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/GenerateExec.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/GenerateExec.scala index 9d1f46d9b83be..0d78b1012c4c8 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/GenerateExec.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/GenerateExec.scala @@ -143,7 +143,7 @@ case class GenerateExec( val isNull = ctx.freshName("isNull") val code = s""" - |boolean $isNull = ${data.isNull} || $source.isNullAt($index); + |boolean $isNull = $numElements == 0 || $source.isNullAt($index); |$javaType $value = $isNull ? ${ctx.defaultValue(dt)} : $getter; """.stripMargin ExprCode(code, isNull, value) @@ -166,10 +166,12 @@ case class GenerateExec( values } + // Evaluate at least once in case of outer. + val cmp = if (outer) "<=" else "<" s""" |${data.code} - |int $numElements = ${data.isNull} ? 
${if (outer) 1 else 0} : ${data.value}.numElements(); - |for (int $index = 0; $index < $numElements; $index++) { + |int $numElements = ${data.isNull} ? 0 : ${data.value}.numElements(); + |for (int $index = 0; $index $cmp $numElements; $index++) { | $numOutput.add(1); | ${consume(ctx, output)} |} From 43a04bf9c15ed894f19bb0168d71e45a2caa3b61 Mon Sep 17 00:00:00 2001 From: Herman van Hovell Date: Fri, 13 May 2016 00:30:26 +0200 Subject: [PATCH 05/27] Really really fix lateral outer view explode(...). --- .../org/apache/spark/sql/execution/GenerateExec.scala | 8 +++++--- 1 file changed, 5 insertions(+), 3 deletions(-) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/GenerateExec.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/GenerateExec.scala index 0d78b1012c4c8..99c2d0f150d81 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/GenerateExec.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/GenerateExec.scala @@ -166,12 +166,14 @@ case class GenerateExec( values } - // Evaluate at least once in case of outer. - val cmp = if (outer) "<=" else "<" + // In case of outer we need to make sure the loop is executed at-least once when the array/map + // contains no input. We do this by setting the looping index to -1 if there is no input, + // evaluation of the array is prevented by a check in the accessor code. + val init = if (outer) s"$numElements == 0 ? -1 : 0" else "0" s""" |${data.code} |int $numElements = ${data.isNull} ? 0 : ${data.value}.numElements(); - |for (int $index = 0; $index $cmp $numElements; $index++) { + |for (int $index = $init; $index < $numElements; $index++) { | $numOutput.add(1); | ${consume(ctx, output)} |} From 7b4772d144d8ea9510c44e2a207f2ac380b123e1 Mon Sep 17 00:00:00 2001 From: Herman van Hovell Date: Tue, 17 May 2016 18:19:13 +0200 Subject: [PATCH 06/27] Add benchmark & fix subexpressionsuite. 
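Two things here: a whole-stage codegen benchmark for explode, and a test fix. Explode no
longer mixes in CodegenFallback, so SubexpressionEliminationSuite cannot keep using it as
its example of a fallback expression; a small dedicated CodegenFallbackExpression takes
its place. A sketch of the invariant the updated test pins down (CodegenFallbackExpression
is the helper added in this patch, and the boolean argument mirrors the existing
addExprTree call):

    val two = Add(Literal(1), Literal(1))
    val equivalence = new EquivalentExpressions
    // `two` appears both directly and under the fallback, but the copy below a
    // CodegenFallback child must not become a common subexpression: the fallback
    // evaluates its children via eval(), outside the generated code.
    equivalence.addExprTree(Add(two, CodegenFallbackExpression(two)), true)
    assert(equivalence.getAllEquivalentExprs.count(_.size > 1) == 0)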
--- .../SubexpressionEliminationSuite.scala | 16 +++++++++++----- .../execution/benchmark/MiscBenchmark.scala | 19 +++++++++++++++++++ 2 files changed, 30 insertions(+), 5 deletions(-) diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/SubexpressionEliminationSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/SubexpressionEliminationSuite.scala index 90e97d718a9fc..b826906499e3e 100644 --- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/SubexpressionEliminationSuite.scala +++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/SubexpressionEliminationSuite.scala @@ -17,7 +17,8 @@ package org.apache.spark.sql.catalyst.expressions import org.apache.spark.SparkFunSuite -import org.apache.spark.sql.types.IntegerType +import org.apache.spark.sql.catalyst.expressions.codegen.CodegenFallback +import org.apache.spark.sql.types.{DataType, IntegerType} class SubexpressionEliminationSuite extends SparkFunSuite { test("Semantic equals and hash") { @@ -158,13 +159,18 @@ class SubexpressionEliminationSuite extends SparkFunSuite { test("Children of CodegenFallback") { val one = Literal(1) val two = Add(one, one) - val explode = Explode(two) - val add = Add(two, explode) + val fallback = CodegenFallbackExpression(two) + val add = Add(two, fallback) - var equivalence = new EquivalentExpressions + val equivalence = new EquivalentExpressions equivalence.addExprTree(add, true) - // the `two` inside `explode` should not be added + // the `two` inside `fallback` should not be added assert(equivalence.getAllEquivalentExprs.count(_.size > 1) == 0) assert(equivalence.getAllEquivalentExprs.count(_.size == 1) == 3) // add, two, explode } } + +case class CodegenFallbackExpression(child: Expression) + extends UnaryExpression with CodegenFallback { + override def dataType: DataType = child.dataType +} diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/benchmark/MiscBenchmark.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/benchmark/MiscBenchmark.scala index 470c78120b194..59a8d18d23ed2 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/execution/benchmark/MiscBenchmark.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/benchmark/MiscBenchmark.scala @@ -132,4 +132,23 @@ class MiscBenchmark extends BenchmarkBase { collect limit 2 millions 3348 / 4005 0.3 3193.3 0.2X */ } + + ignore("generate explode") { + val N = 1 << 24 + runBenchmark("generate explode", N) { + val df = sparkSession.range(N).selectExpr( + "id as key", + "array(rand(), rand(), rand(), rand(), rand()) as values") + df.selectExpr("key", "explode(values) value").count() + } + + /** + Java HotSpot(TM) 64-Bit Server VM 1.8.0_66-b17 on Linux 4.4.0-21-generic + Intel(R) Core(TM) i7-4750HQ CPU @ 2.00GHz + generate explode: Best/Avg Time(ms) Rate(M/s) Per Row(ns) Relative + ------------------------------------------------------------------------------------------------ + generate explode wholestage off 8916 / 9250 1.9 531.5 1.0X + generate explode wholestage on 732 / 781 22.9 43.6 12.2X + */ + } } From 09513e767a3b61964d94e0e30ccd6db0b5712f20 Mon Sep 17 00:00:00 2001 From: Herman van Hovell Date: Wed, 25 May 2016 17:56:18 +0200 Subject: [PATCH 07/27] Generate WIP --- .../sql/catalyst/expressions/generators.scala | 2 +- .../expressions/jsonExpressions.scala | 134 +++++++++++++----- .../spark/sql/execution/GenerateExec.scala | 5 +- 3 files changed, 100 insertions(+), 41 deletions(-) diff --git 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/generators.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/generators.scala index 37e99851c592b..b9eb4f8fddf19 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/generators.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/generators.scala @@ -150,6 +150,6 @@ case class Explode(child: Expression) extends UnaryExpression with Generator { } override protected def doGenCode(ctx: CodegenContext, ev: ExprCode): ExprCode = { - sys.error("Just don't") + child.genCode(ctx) } } diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/jsonExpressions.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/jsonExpressions.scala index c14a2fb122618..599c4d18eb85c 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/jsonExpressions.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/jsonExpressions.scala @@ -25,7 +25,8 @@ import com.fasterxml.jackson.core._ import org.apache.spark.sql.catalyst.InternalRow import org.apache.spark.sql.catalyst.analysis.TypeCheckResult -import org.apache.spark.sql.catalyst.expressions.codegen.CodegenFallback +import org.apache.spark.sql.catalyst.expressions.codegen.{CodegenContext, CodegenFallback, ExprCode} +import org.apache.spark.sql.catalyst.util.GenericArrayData import org.apache.spark.sql.types.{DataType, StringType, StructField, StructType} import org.apache.spark.unsafe.types.UTF8String import org.apache.spark.util.Utils @@ -325,32 +326,25 @@ case class GetJsonObject(json: Expression, path: Expression) @ExpressionDescription( usage = "_FUNC_(jsonStr, p1, p2, ..., pn) - like get_json_object, but it takes multiple names and return a tuple. 
All the input parameters and output column types are string.") // scalastyle:on line.size.limit -case class JsonTuple(children: Seq[Expression]) - extends Generator with CodegenFallback { +case class JsonTuple(children: Seq[Expression]) extends Generator { - import SharedFactory._ - - override def nullable: Boolean = { - // a row is always returned - false - } + // a row is always returned + override def nullable: Boolean = false // if processing fails this shared value will be returned @transient private lazy val nullRow: Seq[InternalRow] = - new GenericInternalRow(Array.ofDim[Any](fieldExpressions.length)) :: Nil + new GenericInternalRow(fieldExpressions.length) :: Nil // the json body is the first child @transient private lazy val jsonExpr: Expression = children.head // the fields to query are the remaining children - @transient private lazy val fieldExpressions: Seq[Expression] = children.tail + @transient private lazy val fieldExpressions: Array[Expression] = children.tail.toArray // eagerly evaluate any foldable the field names - @transient private lazy val foldableFieldNames: IndexedSeq[String] = { - fieldExpressions.map { - case expr if expr.foldable => expr.eval().asInstanceOf[UTF8String].toString - case _ => null - }.toIndexedSeq + @transient private lazy val foldableFieldNames: Array[String] = fieldExpressions.map { + case expr if expr.foldable => expr.eval().asInstanceOf[UTF8String].toString + case _ => null } // and count the number of foldable fields, we'll use this later to optimize evaluation @@ -372,28 +366,16 @@ case class JsonTuple(children: Seq[Expression]) } } - override def eval(input: InternalRow): TraversableOnce[InternalRow] = { + override def dataType: DataType = StructType(fieldExpressions.zipWithIndex.map { + case (_, idx) => StructField(s"c$idx", StringType, nullable = true) + }) + + override def eval(input: InternalRow): Seq[InternalRow] = { val json = jsonExpr.eval(input).asInstanceOf[UTF8String] if (json == null) { return nullRow } - try { - Utils.tryWithResource(jsonFactory.createParser(json.getBytes)) { - parser => parseRow(parser, input) - } - } catch { - case _: JsonProcessingException => - nullRow - } - } - - private def parseRow(parser: JsonParser, input: InternalRow): Seq[InternalRow] = { - // only objects are supported - if (parser.nextToken() != JsonToken.START_OBJECT) { - return nullRow - } - // evaluate the field names as String rather than UTF8String to // optimize lookups from the json token, which is also a String val fieldNames = if (constantFields == fieldExpressions.length) { @@ -412,7 +394,85 @@ case class JsonTuple(children: Seq[Expression]) } } - val row = Array.ofDim[Any](fieldNames.length) + val values = JsonTuple.extractTuple(json, fieldNames) + if (values != null) { + new GenericInternalRow(values) :: Nil + } else { + nullRow + } + } + + override protected def doGenCode(ctx: CodegenContext, ev: ExprCode): ExprCode = { + val arrayDataClass = classOf[GenericArrayData].getName + val rowClass = classOf[GenericInternalRow].getName + def newArrayData(p: String): String = s"new $arrayDataClass(new Object[]{new $rowClass($p)})" + + // Add an empty row to default to. + val fieldCount = fieldExpressions.length + val nullRow = ctx.freshName("nullRow") + ctx.addMutableState( + arrayDataClass, + nullRow, + s"this.$nullRow = ${newArrayData(fieldCount.toString)};") + + // Add the field names as a class field and add the foldable field names. 
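+    // Foldable names are evaluated once and baked into the array initializer; the
+    // non-foldable slots stay null here and are filled in per row further below.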
+ val fieldNames = ctx.freshName("fieldNames") + val fieldNameValues = foldableFieldNames.map { + case null => "null" + case s => '"' + s + '"' + } + val fieldNamesInitCode = s"this.$fieldNames = new String[]{${fieldNameValues.mkString(", ")}};" + ctx.addMutableState("String[]", fieldNames, fieldNamesInitCode) + + // Resolve the non-foldable field names. + val evalFieldNames = foldableFieldNames.zip(fieldExpressions).zipWithIndex.collect { + case ((null, e), i) => + val code = e.genCode(ctx) + s""" + |${code.code} + |$fieldNames[$i] = ${code.isNull} ? null : ${code.value}; + """.stripMargin + } + + // Create the generated code. + val jsonSource = jsonExpr.genCode(ctx) + val result = ctx.freshName("result") + val jsonTupleClass = classOf[JsonTuple].getName + ev.copy(code = s""" + |${jsonSource.code} + |boolean ${ev.isNull} = false; + |ArrayData ${ev.value} = null; + |if (${jsonSource.isNull}) { + | ${ev.value} = $nullRow; + |} else { + | ${evalFieldNames.mkString("")} + | Object[] $result = $jsonTupleClass.extractTuple(${jsonSource.value}, $fieldNames); + | ${ev.value} = $result == null ? $nullRow : ${newArrayData(result)}; + |} + """.stripMargin) + } +} + +object JsonTuple { + import SharedFactory._ + + def extractTuple(json: UTF8String, fieldNames: Array[String]): Array[Any] = { + try { + Utils.tryWithResource(jsonFactory.createParser(json.getBytes)) { parser => + extractTuple(parser, fieldNames) + } + } catch { + case _: JsonProcessingException => null + } + } + + private def extractTuple(parser: JsonParser, fieldNames: Array[String]): Array[Any] = { + // only objects are supported + if (parser.nextToken() != JsonToken.START_OBJECT) { + return null + } + + val values = Array.ofDim[Any](fieldNames.length) // start reading through the token stream, looking for any requested field names while (parser.nextToken() != JsonToken.END_OBJECT) { @@ -429,16 +489,15 @@ case class JsonTuple(children: Seq[Expression]) generator => copyCurrentStructure(generator, parser) } - row(idx) = UTF8String.fromBytes(output.toByteArray) + values(idx) = UTF8String.fromBytes(output.toByteArray) } } } - // always skip children, it's cheap enough to do even if copyCurrentStructure was called parser.skipChildren() } - new GenericInternalRow(row) :: Nil + values } private def copyCurrentStructure(generator: JsonGenerator, parser: JsonParser): Unit = { @@ -466,4 +525,3 @@ case class JsonTuple(children: Seq[Expression]) } } } - diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/GenerateExec.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/GenerateExec.scala index 99c2d0f150d81..0d4e1a9699592 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/GenerateExec.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/GenerateExec.scala @@ -118,7 +118,7 @@ case class GenerateExec( } private def codegenExplode( - e: Explode, + e: Expression, ctx: CodegenContext, input: Seq[ExprCode], row: ExprCode): String = { @@ -126,7 +126,7 @@ case class GenerateExec( ctx.copyResult = true // Generate the driving expression. - val data = e.child.genCode(ctx) + val data = e.genCode(ctx) // Generate looping variables. val numOutput = metricTerm(ctx, "numOutputRows") @@ -153,6 +153,7 @@ case class GenerateExec( } val values = e.child.dataType match { case ArrayType(dataType, nullable) => + // Add unwrapping here for tuples. 
Seq(accessor("", "col", dataType, nullable)) case MapType(keyType, valueType, valueContainsNull) => Seq(accessor(".keyArray()", "key", keyType, nullable = false), From f7c230702928cf2958b56490fef8b3cfc674fb4a Mon Sep 17 00:00:00 2001 From: Herman van Hovell Date: Mon, 30 May 2016 19:43:34 +0200 Subject: [PATCH 08/27] Update GenerateExec --- .../scala/org/apache/spark/sql/execution/GenerateExec.scala | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/GenerateExec.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/GenerateExec.scala index 0d4e1a9699592..5a0c9612c01fd 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/GenerateExec.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/GenerateExec.scala @@ -113,7 +113,7 @@ case class GenerateExec( override def doConsume(ctx: CodegenContext, input: Seq[ExprCode], row: ExprCode): String = { boundGenerator match { - case e: Explode => codegenExplode(e, ctx, input, row) + case e: Explode => codegenExplode(e.child, ctx, input, row) } } @@ -151,7 +151,7 @@ case class GenerateExec( ExprCode(s"$javaType $value = $getter;", "false", value) } } - val values = e.child.dataType match { + val values = e.dataType match { case ArrayType(dataType, nullable) => // Add unwrapping here for tuples. Seq(accessor("", "col", dataType, nullable)) From 49f9e7ff04ae845dd6279240fac09b7e4a822702 Mon Sep 17 00:00:00 2001 From: Herman van Hovell Date: Fri, 3 Jun 2016 21:00:53 +0200 Subject: [PATCH 09/27] Further tweaking... --- .../spark/sql/execution/GenerateExec.scala | 57 ++++++++++++++----- 1 file changed, 43 insertions(+), 14 deletions(-) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/GenerateExec.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/GenerateExec.scala index 5a0c9612c01fd..50adf7cf40b09 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/GenerateExec.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/GenerateExec.scala @@ -22,7 +22,7 @@ import org.apache.spark.sql.catalyst.InternalRow import org.apache.spark.sql.catalyst.expressions._ import org.apache.spark.sql.catalyst.expressions.codegen.{CodegenContext, ExprCode} import org.apache.spark.sql.execution.metric.SQLMetrics -import org.apache.spark.sql.types.{ArrayType, DataType, MapType} +import org.apache.spark.sql.types.{ArrayType, DataType, MapType, StructType} /** * For lazy computing, be sure the generator.terminate() called in the very last @@ -41,6 +41,7 @@ private[execution] sealed case class LazyIterator(func: () => TraversableOnce[In * output of each into a new stream of rows. This operation is similar to a `flatMap` in functional * programming with one important additional feature, which allows the input rows to be joined with * their output. + * * @param generator the generator expression * @param join when true, each output row is implicitly joined with the input tuple that produced * it. 
@@ -113,11 +114,12 @@ case class GenerateExec( override def doConsume(ctx: CodegenContext, input: Seq[ExprCode], row: ExprCode): String = { boundGenerator match { - case e: Explode => codegenExplode(e.child, ctx, input, row) + case e: Explode => codegen(e.child, ctx, input, row) + case e => codegen(e, ctx, input, row) } } - private def codegenExplode( + private def codegen( e: Expression, ctx: CodegenContext, input: Seq[ExprCode], @@ -134,16 +136,17 @@ case class GenerateExec( val numElements = ctx.freshName("numElements") // Generate accessor for MapData/Array element(s). - def accessor(field: String, name: String, dt: DataType, nullable: Boolean): ExprCode = { - val source = data.value + field + val outerCheck = optionalCode(outer, s"$numElements == 0") + def accessor(source: String, name: String, dt: DataType, nullable: Boolean): ExprCode = { val value = ctx.freshName(name) val javaType = ctx.javaType(dt) val getter = ctx.getValue(source, dt, index) - if (outer || nullable) { + val checks = outerCheck ++ optionalCode(nullable, s"$source.isNullAt($index)") + if (checks.nonEmpty) { val isNull = ctx.freshName("isNull") val code = s""" - |boolean $isNull = $numElements == 0 || $source.isNullAt($index); + |boolean $isNull = ${checks.mkString(" || ")}; |$javaType $value = $isNull ? ${ctx.defaultValue(dt)} : $getter; """.stripMargin ExprCode(code, isNull, value) @@ -151,17 +154,36 @@ case class GenerateExec( ExprCode(s"$javaType $value = $getter;", "false", value) } } - val values = e.dataType match { + + val (initArrayData, initValues, values) = e.dataType match { + /* + case ArrayType(st: StructType, nullable) if output.size > 1 => + val rowCode = accessor("", "col", st, nullable) + st.fields.map { f => + + } + ("", rowCode.code, Seq.empty[ExprCode]) +*/ case ArrayType(dataType, nullable) => - // Add unwrapping here for tuples. - Seq(accessor("", "col", dataType, nullable)) + ("", "", Seq(accessor(data.value, "col", dataType, nullable))) + case MapType(keyType, valueType, valueContainsNull) => - Seq(accessor(".keyArray()", "key", keyType, nullable = false), - accessor(".valueArray()", "value", valueType, valueContainsNull)) + // Materialize the key and the value array before we enter the loop. + val keyArray = ctx.freshName("keyArray") + val valueArray = ctx.freshName("valueArray") + val initArrayData = + s""" + |ArrayData $keyArray = ${data.isNull} ? null : ${data.value}.keyArray(); + |ArrayData $valueArray = ${data.isNull} ? null : ${data.value}.valueArray(); + """.stripMargin + val values = Seq( + accessor(keyArray, "key", keyType, nullable = false), + accessor(valueArray, "value", valueType, valueContainsNull)) + (initArrayData, "", values) } // Determine result vars. - val output = if (join) { + val outputValues = if (join) { input ++ values } else { values @@ -173,11 +195,18 @@ case class GenerateExec( val init = if (outer) s"$numElements == 0 ? -1 : 0" else "0" s""" |${data.code} + |$initArrayData |int $numElements = ${data.isNull} ? 0 : ${data.value}.numElements(); |for (int $index = $init; $index < $numElements; $index++) { | $numOutput.add(1); - | ${consume(ctx, output)} + | $initValues + | ${consume(ctx, outputValues)} |} """.stripMargin } + + private def optionalCode(condition: Boolean, code: => String): Seq[String] = { + if (condition) Seq(code) + else Seq.empty + } } From b3531cb3e3d97b9c578be089c7980dbd2d274785 Mon Sep 17 00:00:00 2001 From: Herman van Hovell Date: Fri, 3 Jun 2016 23:28:26 +0200 Subject: [PATCH 10/27] Proper support for json_tuple. 
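A generator other than Explode produces an array of structs, one field per output column.
The codegen path now unwraps that struct: a shared codeGenAccessor helper first reads the
row out of the array, then reads each struct field, threading the accumulated null checks
(the outer flag plus element nullability) into every field access. With this in place
json_tuple no longer needs CodegenFallback. A hedged usage sketch, assuming a SparkSession
named `spark` (the same selectExpr shape as the benchmark added later in this series):

    spark.range(1)
      .selectExpr("""json_tuple('{"a": 1, "b": "x"}', 'a', 'b') as (a, b)""")
      .show()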
--- .../expressions/jsonExpressions.scala | 4 - .../spark/sql/execution/GenerateExec.scala | 80 +++++++++++-------- 2 files changed, 46 insertions(+), 38 deletions(-) diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/jsonExpressions.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/jsonExpressions.scala index 599c4d18eb85c..f47cf337c26f0 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/jsonExpressions.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/jsonExpressions.scala @@ -366,10 +366,6 @@ case class JsonTuple(children: Seq[Expression]) extends Generator { } } - override def dataType: DataType = StructType(fieldExpressions.zipWithIndex.map { - case (_, idx) => StructField(s"c$idx", StringType, nullable = true) - }) - override def eval(input: InternalRow): Seq[InternalRow] = { val json = jsonExpr.eval(input).asInstanceOf[UTF8String] if (json == null) { diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/GenerateExec.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/GenerateExec.scala index 50adf7cf40b09..35e75156d0227 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/GenerateExec.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/GenerateExec.scala @@ -114,14 +114,16 @@ case class GenerateExec( override def doConsume(ctx: CodegenContext, input: Seq[ExprCode], row: ExprCode): String = { boundGenerator match { - case e: Explode => codegen(e.child, ctx, input, row) - case e => codegen(e, ctx, input, row) + case e: Explode => codeGen(ctx, e.child, expand = false, input, row) + case g => codeGen(ctx, g, expand = true, input, row) } } - private def codegen( - e: Expression, + /** Generate code for Generate. */ + private def codeGen( ctx: CodegenContext, + e: Expression, + expand: Boolean, input: Seq[ExprCode], row: ExprCode): String = { ctx.currentVars = input @@ -135,37 +137,19 @@ case class GenerateExec( val index = ctx.freshName("index") val numElements = ctx.freshName("numElements") - // Generate accessor for MapData/Array element(s). - val outerCheck = optionalCode(outer, s"$numElements == 0") - def accessor(source: String, name: String, dt: DataType, nullable: Boolean): ExprCode = { - val value = ctx.freshName(name) - val javaType = ctx.javaType(dt) - val getter = ctx.getValue(source, dt, index) - val checks = outerCheck ++ optionalCode(nullable, s"$source.isNullAt($index)") - if (checks.nonEmpty) { - val isNull = ctx.freshName("isNull") - val code = - s""" - |boolean $isNull = ${checks.mkString(" || ")}; - |$javaType $value = $isNull ? ${ctx.defaultValue(dt)} : $getter; - """.stripMargin - ExprCode(code, isNull, value) - } else { - ExprCode(s"$javaType $value = $getter;", "false", value) - } - } - + // Add a check if the generate outer flag is true. 
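+    // With outer = true the loop still runs once for a null/empty collection, so
+    // every accessor must also guard against reading from the null input itself.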
+ val checks = optionalCode(outer, data.isNull) val (initArrayData, initValues, values) = e.dataType match { - /* - case ArrayType(st: StructType, nullable) if output.size > 1 => - val rowCode = accessor("", "col", st, nullable) - st.fields.map { f => - + case ArrayType(st: StructType, nullable) if expand => + val rowCode = codeGenAccessor(ctx, data.value, "col", index, st, nullable, checks) + val extendedChecks = checks ++ optionalCode(nullable, rowCode.isNull) + val values = st.fields.toSeq.zipWithIndex.map { case (f, i) => + codeGenAccessor(ctx, rowCode.value, f.name, s"$i", f.dataType, f.nullable, extendedChecks) } - ("", rowCode.code, Seq.empty[ExprCode]) -*/ + ("", rowCode.code, values) + case ArrayType(dataType, nullable) => - ("", "", Seq(accessor(data.value, "col", dataType, nullable))) + ("", "", Seq(codeGenAccessor(ctx, data.value, "col", index, dataType, nullable, checks))) case MapType(keyType, valueType, valueContainsNull) => // Materialize the key and the value array before we enter the loop. @@ -177,8 +161,8 @@ case class GenerateExec( |ArrayData $valueArray = ${data.isNull} ? null : ${data.value}.valueArray(); """.stripMargin val values = Seq( - accessor(keyArray, "key", keyType, nullable = false), - accessor(valueArray, "value", valueType, valueContainsNull)) + codeGenAccessor(ctx, keyArray, "key", index, keyType, nullable = false, checks), + codeGenAccessor(ctx, valueArray, "value", index, valueType, valueContainsNull, checks)) (initArrayData, "", values) } @@ -205,6 +189,34 @@ case class GenerateExec( """.stripMargin } + /** + * Generate for accessor code for ArrayData and InternalRows. + */ + private def codeGenAccessor( + ctx: CodegenContext, + source: String, + name: String, + index: String, + dt: DataType, + nullable: Boolean, + initialChecks: Seq[String]): ExprCode = { + val value = ctx.freshName(name) + val javaType = ctx.javaType(dt) + val getter = ctx.getValue(source, dt, index) + val checks = initialChecks ++ optionalCode(nullable, s"$source.isNullAt($index)") + if (checks.nonEmpty) { + val isNull = ctx.freshName("isNull") + val code = + s""" + |boolean $isNull = ${checks.mkString(" || ")}; + |$javaType $value = $isNull ? ${ctx.defaultValue(dt)} : $getter; + """.stripMargin + ExprCode(code, isNull, value) + } else { + ExprCode(s"$javaType $value = $getter;", "false", value) + } + } + private def optionalCode(condition: Boolean, code: => String): Seq[String] = { if (condition) Seq(code) else Seq.empty From f86da0f22d45c036f81f6ecdd14ba4843219a351 Mon Sep 17 00:00:00 2001 From: Herman van Hovell Date: Tue, 7 Jun 2016 10:14:01 -0700 Subject: [PATCH 11/27] Use TraversableOnce for regular Generators. 
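Special-casing the result layout of every generator inside GenerateExec does not scale,
so the contract is simplified: a regular generator's doGenCode now binds ev.value to a
scala.collection.Iterator over InternalRow, and GenerateExec emits the hasNext()/next()
loop plus the struct-field accessors itself (with an extra `outer` variable forcing one
iteration when the iterator is empty). JsonTuple is rewritten to this contract. A minimal
sketch of what a generator has to emit, in the shape of an always-empty generator:

    override protected def doGenCode(ctx: CodegenContext, ev: ExprCode): ExprCode = {
      val iteratorClass = classOf[Iterator[_]].getName
      // hand GenerateExec an empty scala.collection.Iterator; the exec node
      // drives hasNext()/next() and unpacks each InternalRow it returns
      ev.copy(code = s"$iteratorClass ${ev.value} = $iteratorClass$$.MODULE$$.empty();")
    }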
--- .../expressions/jsonExpressions.scala | 25 ++-- .../spark/sql/execution/GenerateExec.scala | 113 ++++++++++++------ 2 files changed, 88 insertions(+), 50 deletions(-) diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/jsonExpressions.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/jsonExpressions.scala index f47cf337c26f0..77adeccb29b0c 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/jsonExpressions.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/jsonExpressions.scala @@ -26,7 +26,6 @@ import com.fasterxml.jackson.core._ import org.apache.spark.sql.catalyst.InternalRow import org.apache.spark.sql.catalyst.analysis.TypeCheckResult import org.apache.spark.sql.catalyst.expressions.codegen.{CodegenContext, CodegenFallback, ExprCode} -import org.apache.spark.sql.catalyst.util.GenericArrayData import org.apache.spark.sql.types.{DataType, StringType, StructField, StructType} import org.apache.spark.unsafe.types.UTF8String import org.apache.spark.util.Utils @@ -328,9 +327,6 @@ case class GetJsonObject(json: Expression, path: Expression) // scalastyle:on line.size.limit case class JsonTuple(children: Seq[Expression]) extends Generator { - // a row is always returned - override def nullable: Boolean = false - // if processing fails this shared value will be returned @transient private lazy val nullRow: Seq[InternalRow] = new GenericInternalRow(fieldExpressions.length) :: Nil @@ -399,17 +395,16 @@ case class JsonTuple(children: Seq[Expression]) extends Generator { } override protected def doGenCode(ctx: CodegenContext, ev: ExprCode): ExprCode = { - val arrayDataClass = classOf[GenericArrayData].getName + val iteratorClass = classOf[Iterator[_]].getName val rowClass = classOf[GenericInternalRow].getName - def newArrayData(p: String): String = s"new $arrayDataClass(new Object[]{new $rowClass($p)})" // Add an empty row to default to. val fieldCount = fieldExpressions.length val nullRow = ctx.freshName("nullRow") ctx.addMutableState( - arrayDataClass, + rowClass, nullRow, - s"this.$nullRow = ${newArrayData(fieldCount.toString)};") + s"this.$nullRow = new $rowClass(${fieldCount.toString});") // Add the field names as a class field and add the foldable field names. val fieldNames = ctx.freshName("fieldNames") @@ -432,19 +427,19 @@ case class JsonTuple(children: Seq[Expression]) extends Generator { // Create the generated code. val jsonSource = jsonExpr.genCode(ctx) - val result = ctx.freshName("result") + val raw = ctx.freshName("raw") + val row = ctx.freshName("row") val jsonTupleClass = classOf[JsonTuple].getName ev.copy(code = s""" |${jsonSource.code} |boolean ${ev.isNull} = false; - |ArrayData ${ev.value} = null; - |if (${jsonSource.isNull}) { - | ${ev.value} = $nullRow; - |} else { + |InternalRow $row = $nullRow; + |if (!(${jsonSource.isNull})) { | ${evalFieldNames.mkString("")} - | Object[] $result = $jsonTupleClass.extractTuple(${jsonSource.value}, $fieldNames); - | ${ev.value} = $result == null ? $nullRow : ${newArrayData(result)}; + | Object[] $raw = $jsonTupleClass.extractTuple(${jsonSource.value}, $fieldNames); + | $row = $raw != null ? 
new $rowClass($raw) : $nullRow; |} + |$iteratorClass ${ev.value} = $iteratorClass$$.MODULE$$.single($row); """.stripMargin) } } diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/GenerateExec.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/GenerateExec.scala index 35e75156d0227..87288d759ce7b 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/GenerateExec.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/GenerateExec.scala @@ -113,46 +113,48 @@ case class GenerateExec( } override def doConsume(ctx: CodegenContext, input: Seq[ExprCode], row: ExprCode): String = { + ctx.currentVars = input + ctx.copyResult = true + + // Add input rows to the values when we are joining + val values = if (join) { + input + } else { + Seq.empty + } + + // Generate the driving expression. + val data = boundGenerator.genCode(ctx) + boundGenerator match { - case e: Explode => codeGen(ctx, e.child, expand = false, input, row) - case g => codeGen(ctx, g, expand = true, input, row) + case e: Explode => codeGenExplode(ctx, e.child, values, data, row) + case g => codeGenTraversableOnce(ctx, g, values, data, row) } } - /** Generate code for Generate. */ - private def codeGen( + /** + * Generate code for [[Explode]]. + */ + private def codeGenExplode( ctx: CodegenContext, e: Expression, - expand: Boolean, input: Seq[ExprCode], + data: ExprCode, row: ExprCode): String = { - ctx.currentVars = input - ctx.copyResult = true - - // Generate the driving expression. - val data = e.genCode(ctx) // Generate looping variables. - val numOutput = metricTerm(ctx, "numOutputRows") val index = ctx.freshName("index") - val numElements = ctx.freshName("numElements") // Add a check if the generate outer flag is true. val checks = optionalCode(outer, data.isNull) - val (initArrayData, initValues, values) = e.dataType match { - case ArrayType(st: StructType, nullable) if expand => - val rowCode = codeGenAccessor(ctx, data.value, "col", index, st, nullable, checks) - val extendedChecks = checks ++ optionalCode(nullable, rowCode.isNull) - val values = st.fields.toSeq.zipWithIndex.map { case (f, i) => - codeGenAccessor(ctx, rowCode.value, f.name, s"$i", f.dataType, f.nullable, extendedChecks) - } - ("", rowCode.code, values) + // Generate code for either ArrayData or MapData + val (initMapData, values) = e.dataType match { case ArrayType(dataType, nullable) => - ("", "", Seq(codeGenAccessor(ctx, data.value, "col", index, dataType, nullable, checks))) + ("", Seq(codeGenAccessor(ctx, data.value, "col", index, dataType, nullable, checks))) case MapType(keyType, valueType, valueContainsNull) => - // Materialize the key and the value array before we enter the loop. + // Materialize the key and the value arrays before we enter the loop. val keyArray = ctx.freshName("keyArray") val valueArray = ctx.freshName("valueArray") val initArrayData = @@ -163,34 +165,75 @@ case class GenerateExec( val values = Seq( codeGenAccessor(ctx, keyArray, "key", index, keyType, nullable = false, checks), codeGenAccessor(ctx, valueArray, "value", index, valueType, valueContainsNull, checks)) - (initArrayData, "", values) - } - - // Determine result vars. - val outputValues = if (join) { - input ++ values - } else { - values + (initArrayData, values) } // In case of outer we need to make sure the loop is executed at-least once when the array/map // contains no input. We do this by setting the looping index to -1 if there is no input, // evaluation of the array is prevented by a check in the accessor code. 
+ val numElements = ctx.freshName("numElements") val init = if (outer) s"$numElements == 0 ? -1 : 0" else "0" + val numOutput = metricTerm(ctx, "numOutputRows") s""" |${data.code} - |$initArrayData + |$initMapData |int $numElements = ${data.isNull} ? 0 : ${data.value}.numElements(); |for (int $index = $init; $index < $numElements; $index++) { | $numOutput.add(1); - | $initValues - | ${consume(ctx, outputValues)} + | ${consume(ctx, input ++ values)} |} """.stripMargin } /** - * Generate for accessor code for ArrayData and InternalRows. + * Generate code for a regular [[TraversableOnce]] returning [[Generator]]. + */ + private def codeGenTraversableOnce( + ctx: CodegenContext, + e: Expression, + input: Seq[ExprCode], + data: ExprCode, + row: ExprCode): String = { + + // Generate looping variables. + val iterator = ctx.freshName("iterator") + val hasNext = ctx.freshName("hasNext") + val current = ctx.freshName("row") + + // Add a check if the generate outer flag is true. + val checks = optionalCode(outer, hasNext) + val values = e.dataType match { + case ArrayType(st: StructType, nullable) => + st.fields.toSeq.zipWithIndex.map { case (f, i) => + codeGenAccessor(ctx, current, f.name, s"$i", f.dataType, f.nullable, checks) + } + } + + // In case of outer we need to make sure the loop is executed at-least-once when the iterator + // contains no input. We do this by adding an 'outer' variable which guarantees execution of + // the first iteration even if there is no input. Evaluation of the iterator is prevented by a + // check in the accessor code. + val hasNextCode = s"$hasNext = $iterator.hasNext()" + val outerVal = ctx.freshName("outer") + def concatIfOuter(s1: String, s2: String): String = s1 + (if (outer) s2 else "") + val init = concatIfOuter(s"boolean $hasNextCode", s", $outerVal = true") + val check = concatIfOuter(hasNext, s"|| $outerVal") + val update = concatIfOuter(hasNextCode, s", $outerVal = false") + val next = if (outer) s"$hasNext ? $iterator.next() : null" else s"$iterator.next()" + val numOutput = metricTerm(ctx, "numOutputRows") + s""" + |${data.code} + |scala.collection.Iterator $iterator = ${data.value}.toIterator(); + |for ($init; $check; $update) { + | $numOutput.add(1); + | InternalRow $current = (InternalRow) $next; + | ${consume(ctx, input ++ values)} + |} + """.stripMargin + } + + /** + * Generate accessor code for ArrayData and InternalRows. */ private def codeGenAccessor( ctx: CodegenContext, @@ -210,7 +253,7 @@ case class GenerateExec( s""" |boolean $isNull = ${checks.mkString(" || ")}; |$javaType $value = $isNull ? 
${ctx.defaultValue(dt)} : $getter; - """.stripMargin + """.stripMargin ExprCode(code, isNull, value) } else { ExprCode(s"$javaType $value = $getter;", "false", value) From 1d2d595c0f5ac52fc896c966c3258f67ec07aabc Mon Sep 17 00:00:00 2001 From: Herman van Hovell Date: Wed, 8 Jun 2016 18:21:25 -0700 Subject: [PATCH 12/27] Add benchmarks for explode map & json_tuple --- .../execution/benchmark/MiscBenchmark.scala | 56 ++++++++++++++++--- 1 file changed, 47 insertions(+), 9 deletions(-) diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/benchmark/MiscBenchmark.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/benchmark/MiscBenchmark.scala index 59a8d18d23ed2..ffff5e6c5f831 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/execution/benchmark/MiscBenchmark.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/benchmark/MiscBenchmark.scala @@ -102,7 +102,7 @@ class MiscBenchmark extends BenchmarkBase { } benchmark.run() - /** + /* Intel(R) Core(TM) i7-4558U CPU @ 2.80GHz collect: Best/Avg Time(ms) Rate(M/s) Per Row(ns) Relative ------------------------------------------------------------------------------------------- @@ -124,7 +124,7 @@ class MiscBenchmark extends BenchmarkBase { } benchmark.run() - /** + /* model name : Westmere E56xx/L56xx/X56xx (Nehalem-C) collect limit: Best/Avg Time(ms) Rate(M/s) Per Row(ns) Relative ------------------------------------------------------------------------------------------- @@ -135,20 +135,58 @@ class MiscBenchmark extends BenchmarkBase { ignore("generate explode") { val N = 1 << 24 - runBenchmark("generate explode", N) { + runBenchmark("generate explode array", N) { val df = sparkSession.range(N).selectExpr( "id as key", "array(rand(), rand(), rand(), rand(), rand()) as values") df.selectExpr("key", "explode(values) value").count() } - /** - Java HotSpot(TM) 64-Bit Server VM 1.8.0_66-b17 on Linux 4.4.0-21-generic - Intel(R) Core(TM) i7-4750HQ CPU @ 2.00GHz - generate explode: Best/Avg Time(ms) Rate(M/s) Per Row(ns) Relative + /* + Java HotSpot(TM) 64-Bit Server VM 1.8.0_92-b14 on Mac OS X 10.11.4 + Intel(R) Core(TM) i7-4980HQ CPU @ 2.80GHz + + generate explode array: Best/Avg Time(ms) Rate(M/s) Per Row(ns) Relative + ------------------------------------------------------------------------------------------------ + generate explode array wholestage off 7095 / 7331 2.4 422.9 1.0X + generate explode array wholestage on 631 / 641 26.6 37.6 11.2X + */ + + runBenchmark("generate explode map", N) { + val df = sparkSession.range(N).selectExpr( + "id as key", + "map('a', rand(), 'b', rand(), 'c', rand(), 'd', rand(), 'e', rand()) pairs") + df.selectExpr("key", "explode(pairs) as (k, v)").count() + } + + /* + Java HotSpot(TM) 64-Bit Server VM 1.8.0_92-b14 on Mac OS X 10.11.4 + Intel(R) Core(TM) i7-4980HQ CPU @ 2.80GHz + + generate explode map: Best/Avg Time(ms) Rate(M/s) Per Row(ns) Relative + ------------------------------------------------------------------------------------------------ + generate explode map wholestage off 12856 / 13569 1.3 766.3 1.0X + generate explode map wholestage on 865 / 873 19.4 51.6 14.9X + */ + } + + ignore("generate regular generator") { + val N = 1 << 20 + runBenchmark("generate json_tuple", N) { + val df = sparkSession.range(N).selectExpr( + "id as key", + "concat('{key: ', id, ', value: \\'v_', id, '\\'}') json") + df.selectExpr("key", "json_tuple(json, 'key', 'value') as (k, v)").count() + } + + /* + Java HotSpot(TM) 64-Bit Server VM 1.8.0_92-b14 on Mac OS X 10.11.4 + Intel(R) 
Core(TM) i7-4980HQ CPU @ 2.80GHz + + generate json_tuple: Best/Avg Time(ms) Rate(M/s) Per Row(ns) Relative ------------------------------------------------------------------------------------------------ - generate explode wholestage off 8916 / 9250 1.9 531.5 1.0X - generate explode wholestage on 732 / 781 22.9 43.6 12.2X + generate json_tuple wholestage off 3136 / 3229 0.3 2990.4 1.0X + generate json_tuple wholestage on 2190 / 2211 0.5 2088.2 1.4X */ } } From c9b3eda19b9d4f1071d33760bd96d8875566fc47 Mon Sep 17 00:00:00 2001 From: Herman van Hovell Date: Wed, 8 Jun 2016 18:43:57 -0700 Subject: [PATCH 13/27] fix generated json --- .../spark/sql/execution/benchmark/MiscBenchmark.scala | 10 +++++----- 1 file changed, 5 insertions(+), 5 deletions(-) diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/benchmark/MiscBenchmark.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/benchmark/MiscBenchmark.scala index ffff5e6c5f831..c60234dedd624 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/execution/benchmark/MiscBenchmark.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/benchmark/MiscBenchmark.scala @@ -170,12 +170,12 @@ class MiscBenchmark extends BenchmarkBase { */ } - ignore("generate regular generator") { - val N = 1 << 20 + test("generate regular generator") { + val N = 1 << 24 runBenchmark("generate json_tuple", N) { val df = sparkSession.range(N).selectExpr( "id as key", - "concat('{key: ', id, ', value: \\'v_', id, '\\'}') json") + "concat('{\"key\": ', id, ', \"value\": \"v_', id, '\"}') json") df.selectExpr("key", "json_tuple(json, 'key', 'value') as (k, v)").count() } @@ -185,8 +185,8 @@ class MiscBenchmark extends BenchmarkBase { generate json_tuple: Best/Avg Time(ms) Rate(M/s) Per Row(ns) Relative ------------------------------------------------------------------------------------------------ - generate json_tuple wholestage off 3136 / 3229 0.3 2990.4 1.0X - generate json_tuple wholestage on 2190 / 2211 0.5 2088.2 1.4X + generate json_tuple wholestage off 13097 / 13323 1.3 780.6 1.0X + generate json_tuple wholestage on 11815 / 11899 1.4 704.2 1.1X */ } } From 2732b06ee9843c7492d479c29fff6ce0e9025a89 Mon Sep 17 00:00:00 2001 From: Herman van Hovell Date: Wed, 8 Jun 2016 20:06:06 -0700 Subject: [PATCH 14/27] disable benchmark --- .../apache/spark/sql/execution/benchmark/MiscBenchmark.scala | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/benchmark/MiscBenchmark.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/benchmark/MiscBenchmark.scala index c60234dedd624..90a2bf3531834 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/execution/benchmark/MiscBenchmark.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/benchmark/MiscBenchmark.scala @@ -170,7 +170,7 @@ class MiscBenchmark extends BenchmarkBase { */ } - test("generate regular generator") { + ignore("generate regular generator") { val N = 1 << 24 runBenchmark("generate json_tuple", N) { val df = sparkSession.range(N).selectExpr( From 36cd8261575158e618ddc0439c3264b627022c02 Mon Sep 17 00:00:00 2001 From: Herman van Hovell Date: Sun, 26 Jun 2016 13:31:25 -0700 Subject: [PATCH 15/27] Add tests for generate with outer = true --- .../spark/sql/execution/GenerateExec.scala | 6 +- .../org/apache/spark/sql/GeneratorSuite.scala | 55 +++++++++++++++++++ .../org/apache/spark/sql/SQLQuerySuite.scala | 7 --- 3 files changed, 58 insertions(+), 10 deletions(-) create mode 100644 
sql/core/src/test/scala/org/apache/spark/sql/GeneratorSuite.scala diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/GenerateExec.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/GenerateExec.scala index 87288d759ce7b..f4b14265da912 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/GenerateExec.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/GenerateExec.scala @@ -201,7 +201,7 @@ case class GenerateExec( val current = ctx.freshName("row") // Add a check if the generate outer flag is true. - val checks = optionalCode(outer, hasNext) + val checks = optionalCode(outer, s"!$hasNext") val values = e.dataType match { case ArrayType(st: StructType, nullable) => st.fields.toSeq.zipWithIndex.map { case (f, i) => @@ -212,7 +212,7 @@ case class GenerateExec( // In case of outer we need to make sure the loop is executed at-least-once when the iterator // contains no input. We do this by adding an 'outer' variable which guarantees execution of // the first iteration even if there is no input. Evaluation of the iterator is prevented by a - // check in the accessor code. + // checks in the next() and accessor code. val hasNextCode = s"$hasNext = $iterator.hasNext()" val outerVal = ctx.freshName("outer") def concatIfOuter(s1: String, s2: String): String = s1 + (if (outer) s2 else "") @@ -226,7 +226,7 @@ case class GenerateExec( |scala.collection.Iterator $iterator = ${data.value}.toIterator(); |for ($init; $check; $update) { | $numOutput.add(1); - | InternalRow $current = (InternalRow) $next; + | InternalRow $current = (InternalRow)($next); | ${consume(ctx, input ++ values)} |} """.stripMargin diff --git a/sql/core/src/test/scala/org/apache/spark/sql/GeneratorSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/GeneratorSuite.scala new file mode 100644 index 0000000000000..e35de0c99c906 --- /dev/null +++ b/sql/core/src/test/scala/org/apache/spark/sql/GeneratorSuite.scala @@ -0,0 +1,55 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.spark.sql + +import org.apache.spark.sql.catalyst.InternalRow +import org.apache.spark.sql.catalyst.expressions.{Expression, Generator} +import org.apache.spark.sql.catalyst.expressions.codegen.{CodegenContext, ExprCode} +import org.apache.spark.sql.test.SharedSQLContext +import org.apache.spark.sql.types.{IntegerType, StructType} + +case class EmptyGenerator() extends Generator { + override def children: Seq[Expression] = Nil + override def elementSchema: StructType = new StructType().add("id", IntegerType) + override def eval(input: InternalRow): TraversableOnce[InternalRow] = Seq.empty + override protected def doGenCode(ctx: CodegenContext, ev: ExprCode): ExprCode = { + val iteratorClass = classOf[Iterator[_]].getName + ev.copy(code = s"$iteratorClass ${ev.value} = $iteratorClass$$.MODULE$$.empty();") + } +} + +class GeneratorSuite extends QueryTest with SharedSQLContext { + test("SPARK-14986: Outer lateral view with empty generate expression") { + checkAnswer( + sql("select nil from values 1 lateral view outer explode(array()) n as nil"), + Row(null) :: Nil + ) + } + + test("outer explode()") { + checkAnswer( + sql("select * from values 1, 2 lateral view outer explode(array()) a as b"), + Row(1, null) :: Row(2, null) :: Nil) + } + + test("outer generator()") { + spark.sessionState.functionRegistry.registerFunction("empty_gen", _ => EmptyGenerator()) + checkAnswer( + sql("select * from values 1, 2 lateral view outer empty_gen() a as b"), + Row(1, null) :: Row(2, null) :: Nil) + } +} diff --git a/sql/core/src/test/scala/org/apache/spark/sql/SQLQuerySuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/SQLQuerySuite.scala index 8284e8d6d89b6..1165d929c2d7a 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/SQLQuerySuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/SQLQuerySuite.scala @@ -2548,13 +2548,6 @@ class SQLQuerySuite extends QueryTest with SharedSQLContext { } } - test("SPARK-14986: Outer lateral view with empty generate expression") { - checkAnswer( - sql("select nil from (select 1 as x ) x lateral view outer explode(array()) n as nil"), - Row(null) :: Nil - ) - } - test("data source table created in InMemoryCatalog should be able to read/write") { withTable("tbl") { sql("CREATE TABLE tbl(i INT, j STRING) USING parquet") From c41e308a261fa0303d45a63306732af5909c373e Mon Sep 17 00:00:00 2001 From: Herman van Hovell Date: Mon, 29 Aug 2016 17:32:47 +0200 Subject: [PATCH 16/27] Add new generators & update. 
--- .../sql/catalyst/expressions/generators.scala | 89 ++++++++++++++----- .../spark/sql/execution/GenerateExec.scala | 39 ++++++-- 2 files changed, 99 insertions(+), 29 deletions(-) diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/generators.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/generators.scala index 07841704ed5d7..d10de1779e6e4 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/generators.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/generators.scala @@ -17,6 +17,8 @@ package org.apache.spark.sql.catalyst.expressions +import scala.collection.AbstractIterator + import org.apache.spark.sql.Row import org.apache.spark.sql.catalyst.{CatalystTypeConverters, InternalRow} import org.apache.spark.sql.catalyst.analysis.TypeCheckResult @@ -77,7 +79,9 @@ case class UserDefinedGenerator( private def initializeConverters(): Unit = { inputRow = new InterpretedProjection(children) convertToScala = { - val inputSchema = StructType(children.map(e => StructField(e.simpleString, e.dataType, true))) + val inputSchema = StructType(children.map { e => + StructField(e.simpleString, e.dataType, nullable = true) + }) CatalystTypeConverters.createToScalaConverter(inputSchema) }.asInstanceOf[InternalRow => Row] } @@ -104,8 +108,7 @@ case class UserDefinedGenerator( @ExpressionDescription( usage = "_FUNC_(n, v1, ..., vk) - Separate v1, ..., vk into n rows.", extended = "> SELECT _FUNC_(2, 1, 2, 3);\n [1,2]\n [3,null]") -case class Stack(children: Seq[Expression]) - extends Expression with Generator with CodegenFallback { +case class Stack(children: Seq[Expression]) extends Expression with Generator { private lazy val numRows = children.head.eval().asInstanceOf[Int] private lazy val numFields = Math.ceil((children.length - 1.0) / numRows).toInt @@ -144,23 +147,64 @@ case class Stack(children: Seq[Expression]) InternalRow(fields: _*) } } + + override protected def doGenCode(ctx: CodegenContext, ev: ExprCode): ExprCode = { + val values = children.tail + val iteratorClass = classOf[AbstractIterator[_]].getName + val rowClass = classOf[GenericInternalRow].getName + def genRowUpdate(row: Int): String = { + val columnUpdates = (0 until numFields).map { col => + val index = row * numFields + col + if (index < values.length) { + val e = values(index) + val eval = values(index).genCode(ctx) + val update = s"data[$col] = ${eval.value};" + if (e.nullable) { + s"if (${eval.isNull}) data[$col] = null; else $update" + } else { + update + } + } else { + s"data[$col] = null;" + } + } + s"case $row:\n${columnUpdates.mkString("\n")}\nbreak;" + } + ev.copy(code = + s""" + |$iteratorClass ${ev.value} = new $iteratorClass<$rowClass>() { + | private int n = 0; + | private Object[] data = new Object[$numFields]; + | private $rowClass row = new $rowClass(data); + | public boolean hasNext() { + | return n < $numRows; + | } + | public $rowClass next() { + | switch (n) { + | ${(0 until numRows).map(genRowUpdate).mkString("\n")} + | } + | n++; + | return row; + | } + |}; + """.stripMargin) + } } /** - * A base class for Explode and PosExplode + * A base class for [[Inline]], [[Explode]] and [[PosExplode]]. 
*/ -abstract class ExplodeBase(child: Expression, position: Boolean) - extends UnaryExpression with Generator with CodegenFallback with Serializable { +abstract class ExplodeBase extends UnaryExpression with Generator with Serializable { + val position: Boolean override def children: Seq[Expression] = child :: Nil - override def checkInputDataTypes(): TypeCheckResult = { - if (child.dataType.isInstanceOf[ArrayType] || child.dataType.isInstanceOf[MapType]) { + override def checkInputDataTypes(): TypeCheckResult = child.dataType match { + case _: ArrayType | _: MapType => TypeCheckResult.TypeCheckSuccess - } else { + case _ => TypeCheckResult.TypeCheckFailure( s"input to function explode should be array or map type, not ${child.dataType}") - } } // hive-compatible default alias for explode function ("col" for array, "key", "value" for map) @@ -168,7 +212,7 @@ abstract class ExplodeBase(child: Expression, position: Boolean) case ArrayType(et, containsNull) => if (position) { new StructType() - .add("pos", IntegerType, false) + .add("pos", IntegerType, nullable = false) .add("col", et, containsNull) } else { new StructType() @@ -177,12 +221,12 @@ abstract class ExplodeBase(child: Expression, position: Boolean) case MapType(kt, vt, valueContainsNull) => if (position) { new StructType() - .add("pos", IntegerType, false) - .add("key", kt, false) + .add("pos", IntegerType, nullable = false) + .add("key", kt, nullable = false) .add("value", vt, valueContainsNull) } else { new StructType() - .add("key", kt, false) + .add("key", kt, nullable = false) .add("value", vt, valueContainsNull) } } @@ -235,7 +279,9 @@ abstract class ExplodeBase(child: Expression, position: Boolean) usage = "_FUNC_(a) - Separates the elements of array a into multiple rows, or the elements of map a into multiple rows and columns.", extended = "> SELECT _FUNC_(array(10,20));\n 10\n 20") // scalastyle:on line.size.limit -case class Explode(child: Expression) extends ExplodeBase(child, position = false) +case class Explode(child: Expression) extends ExplodeBase { + override val position: Boolean = false +} /** * Given an input array produces a sequence of rows for each position and value in the array. @@ -251,7 +297,9 @@ case class Explode(child: Expression) extends ExplodeBase(child, position = fals usage = "_FUNC_(a) - Separates the elements of array a into multiple rows with positions, or the elements of a map into multiple rows and columns with positions.", extended = "> SELECT _FUNC_(array(10,20));\n 0\t10\n 1\t20") // scalastyle:on line.size.limit -case class PosExplode(child: Expression) extends ExplodeBase(child, position = true) +case class PosExplode(child: Expression) extends ExplodeBase { + override val position = true +} /** * Explodes an array of structs into a table. 
@@ -259,12 +307,11 @@ case class PosExplode(child: Expression) extends ExplodeBase(child, position = t @ExpressionDescription( usage = "_FUNC_(a) - Explodes an array of structs into a table.", extended = "> SELECT _FUNC_(array(struct(1, 'a'), struct(2, 'b')));\n [1,a]\n [2,b]") -case class Inline(child: Expression) extends UnaryExpression with Generator with CodegenFallback { - - override def children: Seq[Expression] = child :: Nil +case class Inline(child: Expression) extends ExplodeBase { + override val position: Boolean = false override def checkInputDataTypes(): TypeCheckResult = child.dataType match { - case ArrayType(et, _) if et.isInstanceOf[StructType] => + case ArrayType(st: StructType, _) => TypeCheckResult.TypeCheckSuccess case _ => TypeCheckResult.TypeCheckFailure( @@ -272,7 +319,7 @@ case class Inline(child: Expression) extends UnaryExpression with Generator with } override def elementSchema: StructType = child.dataType match { - case ArrayType(et : StructType, _) => et + case ArrayType(st: StructType, _) => st } private lazy val numFields = elementSchema.fields.length diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/GenerateExec.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/GenerateExec.scala index 51ba9b701c68b..86a58eb24044e 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/GenerateExec.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/GenerateExec.scala @@ -127,17 +127,17 @@ case class GenerateExec( val data = boundGenerator.genCode(ctx) boundGenerator match { - case e: Explode => codeGenExplode(ctx, e.child, values, data, row) + case e: ExplodeBase => codeGenExplode(ctx, e, values, data, row) case g => codeGenTraversableOnce(ctx, g, values, data, row) } } /** - * Generate code for [[Explode]]. + * Generate code for [[ExplodeBase]] expressions. */ private def codeGenExplode( ctx: CodegenContext, - e: Expression, + e: ExplodeBase, input: Seq[ExprCode], data: ExprCode, row: ExprCode): String = { @@ -148,10 +148,32 @@ case class GenerateExec( // Add a check if the generate outer flag is true. val checks = optionalCode(outer, data.isNull) + // Add position + val position = if (e.position) { + Seq(ExprCode("", "false", index)) + } else { + Seq.empty + } + // Generate code for either ArrayData or MapData - val (initMapData, values) = e.dataType match { + val (initMapData, updateRowData, values) = e.child.dataType match { + case ArrayType(struct: StructType, nullable) if e.isInstanceOf[Inline] => + val row = codeGenAccessor(ctx, data.value, "col", index, struct, nullable, checks) + val extendedChecks = checks ++ optionalCode(nullable, row.isNull) + val columns = struct.fields.toSeq.zipWithIndex.map { case (field, fieldIndex) => + codeGenAccessor( + ctx, + row.value, + field.name, + fieldIndex.toString, + field.dataType, + field.nullable, + extendedChecks) + } + ("", row.code, columns) + case ArrayType(dataType, nullable) => - ("", Seq(codeGenAccessor(ctx, data.value, "col", index, dataType, nullable, checks))) + ("", "", Seq(codeGenAccessor(ctx, data.value, "col", index, dataType, nullable, checks))) case MapType(keyType, valueType, valueContainsNull) => // Materialize the key and the value arrays before we enter the loop. 
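(The materialization lines themselves are elided from this hunk. A minimal
sketch of their likely shape, inferred from the `keyArray`, `valueArray`, and
`initArrayData` names used in the next hunk, so the exact code is an
assumption:)

    val keyArray = ctx.freshName("keyArray")
    val valueArray = ctx.freshName("valueArray")
    val initArrayData =
      s"""
         |ArrayData $keyArray = ${data.isNull} ? null : ${data.value}.keyArray();
         |ArrayData $valueArray = ${data.isNull} ? null : ${data.value}.valueArray();
       """.stripMargin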
@@ -165,7 +187,7 @@ case class GenerateExec( val values = Seq( codeGenAccessor(ctx, keyArray, "key", index, keyType, nullable = false, checks), codeGenAccessor(ctx, valueArray, "value", index, valueType, valueContainsNull, checks)) - (initArrayData, values) + (initArrayData, "", values) } // In case of outer we need to make sure the loop is executed at-least once when the array/map @@ -180,7 +202,8 @@ case class GenerateExec( |int $numElements = ${data.isNull} ? 0 : ${data.value}.numElements(); |for (int $index = $init; $index < $numElements; $index++) { | $numOutput.add(1); - | ${consume(ctx, input ++ values)} + | $updateRowData + | ${consume(ctx, input ++ position ++ values)} |} """.stripMargin } @@ -211,7 +234,7 @@ case class GenerateExec( // In case of outer we need to make sure the loop is executed at-least-once when the iterator // contains no input. We do this by adding an 'outer' variable which guarantees execution of - // the first iteration even if there is no input. Evaluation of the iterator is prevented by a + // the first iteration even if there is no input. Evaluation of the iterator is prevented by // checks in the next() and accessor code. val hasNextCode = s"$hasNext = $iterator.hasNext()" val outerVal = ctx.freshName("outer") From 116339a26caf7009eaba7eaa9529d33ebf4f1371 Mon Sep 17 00:00:00 2001 From: Herman van Hovell Date: Tue, 30 Aug 2016 12:10:17 +0200 Subject: [PATCH 17/27] Fix Stack --- .../sql/catalyst/expressions/generators.scala | 81 +++++++++---------- .../spark/sql/execution/GenerateExec.scala | 27 +++---- 2 files changed, 47 insertions(+), 61 deletions(-) diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/generators.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/generators.scala index d10de1779e6e4..9d210f519f33a 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/generators.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/generators.scala @@ -17,8 +17,6 @@ package org.apache.spark.sql.catalyst.expressions -import scala.collection.AbstractIterator - import org.apache.spark.sql.Row import org.apache.spark.sql.catalyst.{CatalystTypeConverters, InternalRow} import org.apache.spark.sql.catalyst.analysis.TypeCheckResult @@ -64,6 +62,21 @@ trait Generator extends Expression { def terminate(): TraversableOnce[InternalRow] = Nil } +/** + * A collection producing [[Generator]]. This trait provides a different path for code generation, + * by allowing code generation to return either an [[ArrayData]] or a [[MapData]] object. + */ +trait CollectionGenerator extends Generator { + /** The position of an element within the collection should also be returned. */ + def position: Boolean + + /** Rows will be inlined during generation. */ + def inline: Boolean + + /** The schema of the returned collection object. */ + def collectionSchema: DataType = dataType +} + /** * A generator that produces its output using the provided lambda function. 
*/ @@ -108,7 +121,9 @@ case class UserDefinedGenerator( @ExpressionDescription( usage = "_FUNC_(n, v1, ..., vk) - Separate v1, ..., vk into n rows.", extended = "> SELECT _FUNC_(2, 1, 2, 3);\n [1,2]\n [3,null]") -case class Stack(children: Seq[Expression]) extends Expression with Generator { +case class Stack(children: Seq[Expression]) extends CollectionGenerator { + override val position: Boolean = false + override val inline: Boolean = true private lazy val numRows = children.head.eval().asInstanceOf[Int] private lazy val numFields = Math.ceil((children.length - 1.0) / numRows).toInt @@ -150,54 +165,23 @@ case class Stack(children: Seq[Expression]) extends Expression with Generator { override protected def doGenCode(ctx: CodegenContext, ev: ExprCode): ExprCode = { val values = children.tail - val iteratorClass = classOf[AbstractIterator[_]].getName - val rowClass = classOf[GenericInternalRow].getName - def genRowUpdate(row: Int): String = { - val columnUpdates = (0 until numFields).map { col => + val dataTypes = values.take(numFields).map(_.dataType) + val rows = for (row <- 0 until numRows) yield { + val fields = for (col <- 0 until numFields) yield { val index = row * numFields + col - if (index < values.length) { - val e = values(index) - val eval = values(index).genCode(ctx) - val update = s"data[$col] = ${eval.value};" - if (e.nullable) { - s"if (${eval.isNull}) data[$col] = null; else $update" - } else { - update - } - } else { - s"data[$col] = null;" - } + if (index < values.length) values(index) else Literal(null, dataTypes(col)) } - s"case $row:\n${columnUpdates.mkString("\n")}\nbreak;" + CreateStruct(fields) } - ev.copy(code = - s""" - |$iteratorClass ${ev.value} = new $iteratorClass<$rowClass>() { - | private int n = 0; - | private Object[] data = new Object[$numFields]; - | private $rowClass row = new $rowClass(data); - | public boolean hasNext() { - | return n < $numRows; - | } - | public $rowClass next() { - | switch (n) { - | ${(0 until numRows).map(genRowUpdate).mkString("\n")} - | } - | n++; - | return row; - | } - |}; - """.stripMargin) + CreateArray(rows).genCode(ctx) } } /** - * A base class for [[Inline]], [[Explode]] and [[PosExplode]]. + * A base class for [[Explode]] and [[PosExplode]]. 
*/ -abstract class ExplodeBase extends UnaryExpression with Generator with Serializable { - val position: Boolean - - override def children: Seq[Expression] = child :: Nil +abstract class ExplodeBase extends UnaryExpression with CollectionGenerator with Serializable { + override val inline: Boolean = false override def checkInputDataTypes(): TypeCheckResult = child.dataType match { case _: ArrayType | _: MapType => @@ -260,6 +244,8 @@ abstract class ExplodeBase extends UnaryExpression with Generator with Serializa } } + override def collectionSchema: DataType = child.dataType + override protected def doGenCode(ctx: CodegenContext, ev: ExprCode): ExprCode = { child.genCode(ctx) } @@ -307,7 +293,8 @@ case class PosExplode(child: Expression) extends ExplodeBase { @ExpressionDescription( usage = "_FUNC_(a) - Explodes an array of structs into a table.", extended = "> SELECT _FUNC_(array(struct(1, 'a'), struct(2, 'b')));\n [1,a]\n [2,b]") -case class Inline(child: Expression) extends ExplodeBase { +case class Inline(child: Expression) extends UnaryExpression with CollectionGenerator { + override val inline: Boolean = true override val position: Boolean = false override def checkInputDataTypes(): TypeCheckResult = child.dataType match { @@ -322,6 +309,8 @@ case class Inline(child: Expression) extends ExplodeBase { case ArrayType(st: StructType, _) => st } + override def collectionSchema: DataType = child.dataType + private lazy val numFields = elementSchema.fields.length override def eval(input: InternalRow): TraversableOnce[InternalRow] = { @@ -333,4 +322,8 @@ case class Inline(child: Expression) extends ExplodeBase { yield inputArray.getStruct(i, numFields) } } + + override protected def doGenCode(ctx: CodegenContext, ev: ExprCode): ExprCode = { + child.genCode(ctx) + } } diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/GenerateExec.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/GenerateExec.scala index 86a58eb24044e..f71205bf9d216 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/GenerateExec.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/GenerateExec.scala @@ -127,17 +127,17 @@ case class GenerateExec( val data = boundGenerator.genCode(ctx) boundGenerator match { - case e: ExplodeBase => codeGenExplode(ctx, e, values, data, row) + case e: CollectionGenerator => codeGenCollection(ctx, e, values, data, row) case g => codeGenTraversableOnce(ctx, g, values, data, row) } } /** - * Generate code for [[ExplodeBase]] expressions. + * Generate code for [[CollectionGenerator]] expressions. 
*/ - private def codeGenExplode( + private def codeGenCollection( ctx: CodegenContext, - e: ExplodeBase, + e: CollectionGenerator, input: Seq[ExprCode], data: ExprCode, row: ExprCode): String = { @@ -156,19 +156,12 @@ case class GenerateExec( } // Generate code for either ArrayData or MapData - val (initMapData, updateRowData, values) = e.child.dataType match { - case ArrayType(struct: StructType, nullable) if e.isInstanceOf[Inline] => - val row = codeGenAccessor(ctx, data.value, "col", index, struct, nullable, checks) - val extendedChecks = checks ++ optionalCode(nullable, row.isNull) - val columns = struct.fields.toSeq.zipWithIndex.map { case (field, fieldIndex) => - codeGenAccessor( - ctx, - row.value, - field.name, - fieldIndex.toString, - field.dataType, - field.nullable, - extendedChecks) + val (initMapData, updateRowData, values) = e.collectionSchema match { + case ArrayType(st: StructType, nullable) if e.inline => + val row = codeGenAccessor(ctx, data.value, "col", index, st, nullable, checks) + val fieldChecks = checks ++ optionalCode(nullable, row.isNull) + val columns = st.fields.toSeq.zipWithIndex.map { case (f, i) => + codeGenAccessor(ctx, row.value, f.name, i.toString, f.dataType, f.nullable, fieldChecks) } ("", row.code, columns) From 2c6c7f26e99b119c9e3317bd4243dc7ee4dfb13a Mon Sep 17 00:00:00 2001 From: Herman van Hovell Date: Tue, 30 Aug 2016 14:51:18 +0200 Subject: [PATCH 18/27] Make Stack use the iteration path. --- .../sql/catalyst/expressions/generators.scala | 21 ++++++++++++++----- 1 file changed, 16 insertions(+), 5 deletions(-) diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/generators.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/generators.scala index 9d210f519f33a..d0466b3ff3824 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/generators.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/generators.scala @@ -17,6 +17,8 @@ package org.apache.spark.sql.catalyst.expressions +import scala.collection.mutable + import org.apache.spark.sql.Row import org.apache.spark.sql.catalyst.{CatalystTypeConverters, InternalRow} import org.apache.spark.sql.catalyst.analysis.TypeCheckResult @@ -121,9 +123,7 @@ case class UserDefinedGenerator( @ExpressionDescription( usage = "_FUNC_(n, v1, ..., vk) - Separate v1, ..., vk into n rows.", extended = "> SELECT _FUNC_(2, 1, 2, 3);\n [1,2]\n [3,null]") -case class Stack(children: Seq[Expression]) extends CollectionGenerator { - override val position: Boolean = false - override val inline: Boolean = true +case class Stack(children: Seq[Expression]) extends Generator { private lazy val numRows = children.head.eval().asInstanceOf[Int] private lazy val numFields = Math.ceil((children.length - 1.0) / numRows).toInt @@ -164,6 +164,9 @@ case class Stack(children: Seq[Expression]) extends CollectionGenerator { } override protected def doGenCode(ctx: CodegenContext, ev: ExprCode): ExprCode = { + // Rows - we write these into an array. 
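+    // (How this works: the InternalRow[] below is allocated once as a mutable
+    //  field; the generated per-row code overwrites its slots for each input
+    //  row, and a single wrapper created over the array is exposed as the
+    //  generator's output collection.)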
+ val rowData = ctx.freshName("rows") + ctx.addMutableState("InternalRow[]", rowData, s"this.$rowData = new InternalRow[$numRows];") val values = children.tail val dataTypes = values.take(numFields).map(_.dataType) val rows = for (row <- 0 until numRows) yield { @@ -171,9 +174,17 @@ case class Stack(children: Seq[Expression]) extends CollectionGenerator { val index = row * numFields + col if (index < values.length) values(index) else Literal(null, dataTypes(col)) } - CreateStruct(fields) + val eval = CreateStruct(fields).genCode(ctx) + s"${eval.code}\nthis.$rowData[$row] = ${eval.value};" } - CreateArray(rows).genCode(ctx) + + // Create the iterator. + val wrapperClass = classOf[mutable.WrappedArray[_]].getName + ctx.addMutableState( + s"$wrapperClass", + ev.value, + s"this.${ev.value} = $wrapperClass$$.MODULE$$.make(this.$rowData);") + ev.copy(code = rows.mkString("\n"), isNull = "false") } } From d20114bffb3005b74e76c324e4fd421cbcb6f687 Mon Sep 17 00:00:00 2001 From: Herman van Hovell Date: Tue, 30 Aug 2016 14:51:32 +0200 Subject: [PATCH 19/27] Update benchmarks --- .../execution/benchmark/MiscBenchmark.scala | 73 ++++++++++++++++--- 1 file changed, 64 insertions(+), 9 deletions(-) diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/benchmark/MiscBenchmark.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/benchmark/MiscBenchmark.scala index 90a2bf3531834..f68d6f7a49a1d 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/execution/benchmark/MiscBenchmark.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/benchmark/MiscBenchmark.scala @@ -143,13 +143,13 @@ class MiscBenchmark extends BenchmarkBase { } /* - Java HotSpot(TM) 64-Bit Server VM 1.8.0_92-b14 on Mac OS X 10.11.4 + Java HotSpot(TM) 64-Bit Server VM 1.8.0_92-b14 on Mac OS X 10.11.6 Intel(R) Core(TM) i7-4980HQ CPU @ 2.80GHz generate explode array: Best/Avg Time(ms) Rate(M/s) Per Row(ns) Relative ------------------------------------------------------------------------------------------------ - generate explode array wholestage off 7095 / 7331 2.4 422.9 1.0X - generate explode array wholestage on 631 / 641 26.6 37.6 11.2X + generate explode array wholestage off 6920 / 7129 2.4 412.5 1.0X + generate explode array wholestage on 623 / 646 26.9 37.1 11.1X */ runBenchmark("generate explode map", N) { @@ -160,13 +160,47 @@ class MiscBenchmark extends BenchmarkBase { } /* - Java HotSpot(TM) 64-Bit Server VM 1.8.0_92-b14 on Mac OS X 10.11.4 + Java HotSpot(TM) 64-Bit Server VM 1.8.0_92-b14 on Mac OS X 10.11.6 Intel(R) Core(TM) i7-4980HQ CPU @ 2.80GHz generate explode map: Best/Avg Time(ms) Rate(M/s) Per Row(ns) Relative ------------------------------------------------------------------------------------------------ - generate explode map wholestage off 12856 / 13569 1.3 766.3 1.0X - generate explode map wholestage on 865 / 873 19.4 51.6 14.9X + generate explode map wholestage off 11978 / 11993 1.4 714.0 1.0X + generate explode map wholestage on 866 / 919 19.4 51.6 13.8X + */ + + runBenchmark("generate posexplode array", N) { + val df = sparkSession.range(N).selectExpr( + "id as key", + "array(rand(), rand(), rand(), rand(), rand()) as values") + df.selectExpr("key", "posexplode(values) as (idx, value)").count() + } + + /* + Java HotSpot(TM) 64-Bit Server VM 1.8.0_92-b14 on Mac OS X 10.11.6 + Intel(R) Core(TM) i7-4980HQ CPU @ 2.80GHz + + generate posexplode array: Best/Avg Time(ms) Rate(M/s) Per Row(ns) Relative + 
------------------------------------------------------------------------------------------------ + generate posexplode array wholestage off 7502 / 7513 2.2 447.1 1.0X + generate posexplode array wholestage on 617 / 623 27.2 36.8 12.2X + */ + + runBenchmark("generate inline array", N) { + val df = sparkSession.range(N).selectExpr( + "id as key", + "array((rand(), rand()), (rand(), rand()), (rand(), 0.0d)) as values") + df.selectExpr("key", "inline(values) as (r1, r2)").count() + } + + /* + Java HotSpot(TM) 64-Bit Server VM 1.8.0_92-b14 on Mac OS X 10.11.6 + Intel(R) Core(TM) i7-4980HQ CPU @ 2.80GHz + + generate inline array: Best/Avg Time(ms) Rate(M/s) Per Row(ns) Relative + ------------------------------------------------------------------------------------------------ + generate inline array wholestage off 6901 / 6928 2.4 411.3 1.0X + generate inline array wholestage on 1001 / 1010 16.8 59.7 6.9X */ } @@ -180,13 +214,34 @@ class MiscBenchmark extends BenchmarkBase { } /* - Java HotSpot(TM) 64-Bit Server VM 1.8.0_92-b14 on Mac OS X 10.11.4 + Java HotSpot(TM) 64-Bit Server VM 1.8.0_92-b14 on Mac OS X 10.11.6 Intel(R) Core(TM) i7-4980HQ CPU @ 2.80GHz generate json_tuple: Best/Avg Time(ms) Rate(M/s) Per Row(ns) Relative ------------------------------------------------------------------------------------------------ - generate json_tuple wholestage off 13097 / 13323 1.3 780.6 1.0X - generate json_tuple wholestage on 11815 / 11899 1.4 704.2 1.1X + generate json_tuple wholestage off 12695 / 13635 1.3 756.7 1.0X + generate json_tuple wholestage on 12044 / 12162 1.4 717.9 1.1X + */ + + runBenchmark("generate stack", N) { + val df = sparkSession.range(N).selectExpr( + "id as key", + "id % 2 as t1", + "id % 3 as t2", + "id % 5 as t3", + "id % 7 as t4", + "id % 13 as t5") + df.selectExpr("key", "stack(4, t1, t2, t3, t4, t5)").count() + } + + /* + Java HotSpot(TM) 64-Bit Server VM 1.8.0_92-b14 on Mac OS X 10.11.6 + Intel(R) Core(TM) i7-4980HQ CPU @ 2.80GHz + + generate stack: Best/Avg Time(ms) Rate(M/s) Per Row(ns) Relative + ------------------------------------------------------------------------------------------------ + generate stack wholestage off 12953 / 13070 1.3 772.1 1.0X + generate stack wholestage on 836 / 847 20.1 49.8 15.5X */ } } From 757b4704780a0aac81404e87ea46e3eea79fdc0d Mon Sep 17 00:00:00 2001 From: Herman van Hovell Date: Tue, 11 Oct 2016 23:36:23 -0700 Subject: [PATCH 20/27] Revert json_tuple changes --- .../expressions/jsonExpressions.scala | 158 ++++++------------ 1 file changed, 54 insertions(+), 104 deletions(-) diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/jsonExpressions.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/jsonExpressions.scala index 63b11c6082729..f59ac9982b75d 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/jsonExpressions.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/jsonExpressions.scala @@ -24,7 +24,7 @@ import scala.util.parsing.combinator.RegexParsers import com.fasterxml.jackson.core._ import org.apache.spark.sql.catalyst.analysis.TypeCheckResult -import org.apache.spark.sql.catalyst.expressions.codegen.{CodegenContext, CodegenFallback, ExprCode} +import org.apache.spark.sql.catalyst.expressions.codegen.CodegenFallback import org.apache.spark.sql.catalyst.InternalRow import org.apache.spark.sql.catalyst.json.{JacksonParser, JSONOptions, SparkSQLJsonProcessingException} import 
org.apache.spark.sql.catalyst.util.ParseModes @@ -59,23 +59,23 @@ private[this] object JsonPathParser extends RegexParsers { // parse `[*]` and `[123]` subscripts def subscript: Parser[List[PathInstruction]] = - for { - operand <- '[' ~> ('*' ^^^ Wildcard | long ^^ Index) <~ ']' - } yield { - Subscript :: operand :: Nil - } + for { + operand <- '[' ~> ('*' ^^^ Wildcard | long ^^ Index) <~ ']' + } yield { + Subscript :: operand :: Nil + } // parse `.name` or `['name']` child expressions def named: Parser[List[PathInstruction]] = - for { - name <- '.' ~> "[^\\.\\[]+".r | "[\\'" ~> "[^\\'\\?]+" <~ "\\']" - } yield { - Key :: Named(name) :: Nil - } + for { + name <- '.' ~> "[^\\.\\[]+".r | "[\\'" ~> "[^\\'\\?]+" <~ "\\']" + } yield { + Key :: Named(name) :: Nil + } // child wildcards: `..`, `.*` or `['*']` def wildcard: Parser[List[PathInstruction]] = - (".*" | "['*']") ^^^ List(Wildcard) + (".*" | "['*']") ^^^ List(Wildcard) def node: Parser[List[PathInstruction]] = wildcard | @@ -200,10 +200,10 @@ case class GetJsonObject(json: Expression, path: Expression) * have been written to the generator */ private def evaluatePath( - p: JsonParser, - g: JsonGenerator, - style: WriteStyle, - path: List[PathInstruction]): Boolean = { + p: JsonParser, + g: JsonGenerator, + style: WriteStyle, + path: List[PathInstruction]): Boolean = { (p.getCurrentToken, path) match { case (VALUE_STRING, Nil) if style == RawStyle => // there is no array wildcard or slice parent, emit this string without quotes @@ -327,22 +327,32 @@ case class GetJsonObject(json: Expression, path: Expression) @ExpressionDescription( usage = "_FUNC_(jsonStr, p1, p2, ..., pn) - like get_json_object, but it takes multiple names and return a tuple. All the input parameters and output column types are string.") // scalastyle:on line.size.limit -case class JsonTuple(children: Seq[Expression]) extends Generator { +case class JsonTuple(children: Seq[Expression]) + extends Generator with CodegenFallback { + + import SharedFactory._ + + override def nullable: Boolean = { + // a row is always returned + false + } // if processing fails this shared value will be returned @transient private lazy val nullRow: Seq[InternalRow] = - new GenericInternalRow(fieldExpressions.length) :: Nil + new GenericInternalRow(Array.ofDim[Any](fieldExpressions.length)) :: Nil // the json body is the first child @transient private lazy val jsonExpr: Expression = children.head // the fields to query are the remaining children - @transient private lazy val fieldExpressions: Array[Expression] = children.tail.toArray + @transient private lazy val fieldExpressions: Seq[Expression] = children.tail // eagerly evaluate any foldable the field names - @transient private lazy val foldableFieldNames: Array[String] = fieldExpressions.map { - case expr if expr.foldable => expr.eval().asInstanceOf[UTF8String].toString - case _ => null + @transient private lazy val foldableFieldNames: IndexedSeq[String] = { + fieldExpressions.map { + case expr if expr.foldable => expr.eval().asInstanceOf[UTF8String].toString + case _ => null + }.toIndexedSeq } // and count the number of foldable fields, we'll use this later to optimize evaluation @@ -364,12 +374,28 @@ case class JsonTuple(children: Seq[Expression]) extends Generator { } } - override def eval(input: InternalRow): Seq[InternalRow] = { + override def eval(input: InternalRow): TraversableOnce[InternalRow] = { val json = jsonExpr.eval(input).asInstanceOf[UTF8String] if (json == null) { return nullRow } + try { + 
Utils.tryWithResource(jsonFactory.createParser(json.getBytes)) { + parser => parseRow(parser, input) + } + } catch { + case _: JsonProcessingException => + nullRow + } + } + + private def parseRow(parser: JsonParser, input: InternalRow): Seq[InternalRow] = { + // only objects are supported + if (parser.nextToken() != JsonToken.START_OBJECT) { + return nullRow + } + // evaluate the field names as String rather than UTF8String to // optimize lookups from the json token, which is also a String val fieldNames = if (constantFields == fieldExpressions.length) { @@ -388,84 +414,7 @@ case class JsonTuple(children: Seq[Expression]) extends Generator { } } - val values = JsonTuple.extractTuple(json, fieldNames) - if (values != null) { - new GenericInternalRow(values) :: Nil - } else { - nullRow - } - } - - override protected def doGenCode(ctx: CodegenContext, ev: ExprCode): ExprCode = { - val iteratorClass = classOf[Iterator[_]].getName - val rowClass = classOf[GenericInternalRow].getName - - // Add an empty row to default to. - val fieldCount = fieldExpressions.length - val nullRow = ctx.freshName("nullRow") - ctx.addMutableState( - rowClass, - nullRow, - s"this.$nullRow = new $rowClass(${fieldCount.toString});") - - // Add the field names as a class field and add the foldable field names. - val fieldNames = ctx.freshName("fieldNames") - val fieldNameValues = foldableFieldNames.map { - case null => "null" - case s => '"' + s + '"' - } - val fieldNamesInitCode = s"this.$fieldNames = new String[]{${fieldNameValues.mkString(", ")}};" - ctx.addMutableState("String[]", fieldNames, fieldNamesInitCode) - - // Resolve the non-foldable field names. - val evalFieldNames = foldableFieldNames.zip(fieldExpressions).zipWithIndex.collect { - case ((null, e), i) => - val code = e.genCode(ctx) - s""" - |${code.code} - |$fieldNames[$i] = ${code.isNull} ? null : ${code.value}; - """.stripMargin - } - - // Create the generated code. - val jsonSource = jsonExpr.genCode(ctx) - val raw = ctx.freshName("raw") - val row = ctx.freshName("row") - val jsonTupleClass = classOf[JsonTuple].getName - ev.copy(code = s""" - |${jsonSource.code} - |boolean ${ev.isNull} = false; - |InternalRow $row = $nullRow; - |if (!(${jsonSource.isNull})) { - | ${evalFieldNames.mkString("")} - | Object[] $raw = $jsonTupleClass.extractTuple(${jsonSource.value}, $fieldNames); - | $row = $raw != null ? 
new $rowClass($raw) : $nullRow; - |} - |$iteratorClass ${ev.value} = $iteratorClass$$.MODULE$$.single($row); - """.stripMargin) - } -} - -object JsonTuple { - import SharedFactory._ - - def extractTuple(json: UTF8String, fieldNames: Array[String]): Array[Any] = { - try { - Utils.tryWithResource(jsonFactory.createParser(json.getBytes)) { parser => - extractTuple(parser, fieldNames) - } - } catch { - case _: JsonProcessingException => null - } - } - - private def extractTuple(parser: JsonParser, fieldNames: Array[String]): Array[Any] = { - // only objects are supported - if (parser.nextToken() != JsonToken.START_OBJECT) { - return null - } - - val values = Array.ofDim[Any](fieldNames.length) + val row = Array.ofDim[Any](fieldNames.length) // start reading through the token stream, looking for any requested field names while (parser.nextToken() != JsonToken.END_OBJECT) { @@ -482,15 +431,16 @@ object JsonTuple { generator => copyCurrentStructure(generator, parser) } - values(idx) = UTF8String.fromBytes(output.toByteArray) + row(idx) = UTF8String.fromBytes(output.toByteArray) } } } + // always skip children, it's cheap enough to do even if copyCurrentStructure was called parser.skipChildren() } - values + new GenericInternalRow(row) :: Nil } private def copyCurrentStructure(generator: JsonGenerator, parser: JsonParser): Unit = { @@ -543,4 +493,4 @@ case class JsonToStruct(schema: StructType, options: Map[String, String], child: } override def inputTypes: Seq[AbstractDataType] = StringType :: Nil -} +} \ No newline at end of file From 8c1419414aee2873497d6ce6564cc349f6f4f80e Mon Sep 17 00:00:00 2001 From: Herman van Hovell Date: Tue, 11 Oct 2016 23:58:56 -0700 Subject: [PATCH 21/27] Touch-ups --- .../sql/catalyst/expressions/jsonExpressions.scala | 2 +- .../apache/spark/sql/execution/GenerateExec.scala | 14 +++++++------- 2 files changed, 8 insertions(+), 8 deletions(-) diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/jsonExpressions.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/jsonExpressions.scala index f59ac9982b75d..2d3b2039a2b2d 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/jsonExpressions.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/jsonExpressions.scala @@ -493,4 +493,4 @@ case class JsonToStruct(schema: StructType, options: Map[String, String], child: } override def inputTypes: Seq[AbstractDataType] = StringType :: Nil -} \ No newline at end of file +} diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/GenerateExec.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/GenerateExec.scala index f71205bf9d216..21342db3feb87 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/GenerateExec.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/GenerateExec.scala @@ -183,9 +183,9 @@ case class GenerateExec( (initArrayData, "", values) } - // In case of outer we need to make sure the loop is executed at-least once when the array/map - // contains no input. We do this by setting the looping index to -1 if there is no input, - // evaluation of the array is prevented by a check in the accessor code. + // In case of outer=true we need to make sure the loop is executed at-least once when the + // array/map contains no input. We do this by setting the looping index to -1 if there is no + // input, evaluation of the array is prevented by a check in the accessor code. 
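+    // For illustration, with outer=true the generated loop header comes out
+    // roughly as follows (names are illustrative; the real template
+    // interpolates $index/$init/$numElements):
+    //   for (int index = (numElements == 0 ? -1 : 0); index < numElements; index++)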
val numElements = ctx.freshName("numElements") val init = if (outer) s"$numElements == 0 ? -1 : 0" else "0" val numOutput = metricTerm(ctx, "numOutputRows") @@ -225,10 +225,10 @@ case class GenerateExec( } } - // In case of outer we need to make sure the loop is executed at-least-once when the iterator - // contains no input. We do this by adding an 'outer' variable which guarantees execution of - // the first iteration even if there is no input. Evaluation of the iterator is prevented by - // checks in the next() and accessor code. + // In case of outer=true we need to make sure the loop is executed at-least-once when the + // iterator contains no input. We do this by adding an 'outer' variable which guarantees + // execution of the first iteration even if there is no input. Evaluation of the iterator is + // prevented by checks in the next() and accessor code. val hasNextCode = s"$hasNext = $iterator.hasNext()" val outerVal = ctx.freshName("outer") def concatIfOuter(s1: String, s2: String): String = s1 + (if (outer) s2 else "") From 459714c7f39046d3b7969e9133ef8aea7641d80d Mon Sep 17 00:00:00 2001 From: Herman van Hovell Date: Wed, 12 Oct 2016 00:03:53 -0700 Subject: [PATCH 22/27] Touch-ups --- .../expressions/jsonExpressions.scala | 32 +++++++++---------- 1 file changed, 16 insertions(+), 16 deletions(-) diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/jsonExpressions.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/jsonExpressions.scala index 2d3b2039a2b2d..65dbd6a4e3f1d 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/jsonExpressions.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/jsonExpressions.scala @@ -59,23 +59,23 @@ private[this] object JsonPathParser extends RegexParsers { // parse `[*]` and `[123]` subscripts def subscript: Parser[List[PathInstruction]] = - for { - operand <- '[' ~> ('*' ^^^ Wildcard | long ^^ Index) <~ ']' - } yield { - Subscript :: operand :: Nil - } + for { + operand <- '[' ~> ('*' ^^^ Wildcard | long ^^ Index) <~ ']' + } yield { + Subscript :: operand :: Nil + } // parse `.name` or `['name']` child expressions def named: Parser[List[PathInstruction]] = - for { - name <- '.' ~> "[^\\.\\[]+".r | "[\\'" ~> "[^\\'\\?]+" <~ "\\']" - } yield { - Key :: Named(name) :: Nil - } + for { + name <- '.' 
~> "[^\\.\\[]+".r | "[\\'" ~> "[^\\'\\?]+" <~ "\\']" + } yield { + Key :: Named(name) :: Nil + } // child wildcards: `..`, `.*` or `['*']` def wildcard: Parser[List[PathInstruction]] = - (".*" | "['*']") ^^^ List(Wildcard) + (".*" | "['*']") ^^^ List(Wildcard) def node: Parser[List[PathInstruction]] = wildcard | @@ -200,10 +200,10 @@ case class GetJsonObject(json: Expression, path: Expression) * have been written to the generator */ private def evaluatePath( - p: JsonParser, - g: JsonGenerator, - style: WriteStyle, - path: List[PathInstruction]): Boolean = { + p: JsonParser, + g: JsonGenerator, + style: WriteStyle, + path: List[PathInstruction]): Boolean = { (p.getCurrentToken, path) match { case (VALUE_STRING, Nil) if style == RawStyle => // there is no array wildcard or slice parent, emit this string without quotes @@ -339,7 +339,7 @@ case class JsonTuple(children: Seq[Expression]) // if processing fails this shared value will be returned @transient private lazy val nullRow: Seq[InternalRow] = - new GenericInternalRow(Array.ofDim[Any](fieldExpressions.length)) :: Nil + new GenericInternalRow(Array.ofDim[Any](fieldExpressions.length)) :: Nil // the json body is the first child @transient private lazy val jsonExpr: Expression = children.head From f81eed772b2b598e7b0db9aa88b35695984914b3 Mon Sep 17 00:00:00 2001 From: Herman van Hovell Date: Tue, 15 Nov 2016 11:49:41 -0800 Subject: [PATCH 23/27] Code review --- .../sql/catalyst/expressions/generators.scala | 18 ++++++++-------- .../spark/sql/execution/GenerateExec.scala | 21 +++++++++++-------- .../execution/benchmark/MiscBenchmark.scala | 17 --------------- 3 files changed, 21 insertions(+), 35 deletions(-) diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/generators.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/generators.scala index e5d7f9b56797c..98084561675de 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/generators.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/generators.scala @@ -75,8 +75,8 @@ trait CollectionGenerator extends Generator { /** Rows will be inlined during generation. */ def inline: Boolean - /** The schema of the returned collection object. */ - def collectionSchema: DataType = dataType + /** The type of the returned collection object. */ + def collectionType: DataType = dataType } /** @@ -174,22 +174,22 @@ case class Stack(children: Seq[Expression]) extends Generator { ctx.addMutableState("InternalRow[]", rowData, s"this.$rowData = new InternalRow[$numRows];") val values = children.tail val dataTypes = values.take(numFields).map(_.dataType) - val rows = for (row <- 0 until numRows) yield { - val fields = for (col <- 0 until numFields) yield { + val code = ctx.splitExpressions(ctx.INPUT_ROW, Seq.tabulate(numRows) { row => + val fields = Seq.tabulate(numFields) { col => val index = row * numFields + col if (index < values.length) values(index) else Literal(null, dataTypes(col)) } val eval = CreateStruct(fields).genCode(ctx) s"${eval.code}\nthis.$rowData[$row] = ${eval.value};" - } + }) - // Create the iterator. + // Create the collection. 
val wrapperClass = classOf[mutable.WrappedArray[_]].getName ctx.addMutableState( s"$wrapperClass", ev.value, s"this.${ev.value} = $wrapperClass$$.MODULE$$.make(this.$rowData);") - ev.copy(code = rows.mkString("\n"), isNull = "false") + ev.copy(code = code, isNull = "false") } } @@ -260,7 +260,7 @@ abstract class ExplodeBase extends UnaryExpression with CollectionGenerator with } } - override def collectionSchema: DataType = child.dataType + override def collectionType: DataType = child.dataType override protected def doGenCode(ctx: CodegenContext, ev: ExprCode): ExprCode = { child.genCode(ctx) @@ -340,7 +340,7 @@ case class Inline(child: Expression) extends UnaryExpression with CollectionGene case ArrayType(st: StructType, _) => st } - override def collectionSchema: DataType = child.dataType + override def collectionType: DataType = child.dataType private lazy val numFields = elementSchema.fields.length diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/GenerateExec.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/GenerateExec.scala index d25090c5e140b..026e0facedc72 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/GenerateExec.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/GenerateExec.scala @@ -43,6 +43,9 @@ private[execution] sealed case class LazyIterator(func: () => TraversableOnce[In * programming with one important additional feature, which allows the input rows to be joined with * their output. * + * This operator supports whole stage code generation for generators that do not implement + * terminate(). + * * @param generator the generator expression * @param join when true, each output row is implicitly joined with the input tuple that produced * it. @@ -112,7 +115,6 @@ case class GenerateExec( } protected override def doProduce(ctx: CodegenContext): String = { - // We need to add some code here for terminating generators. child.asInstanceOf[CodegenSupport].produce(ctx, this) } @@ -127,12 +129,9 @@ case class GenerateExec( Seq.empty } - // Generate the driving expression. - val data = boundGenerator.genCode(ctx) - boundGenerator match { - case e: CollectionGenerator => codeGenCollection(ctx, e, values, data, row) - case g => codeGenTraversableOnce(ctx, g, values, data, row) + case e: CollectionGenerator => codeGenCollection(ctx, e, values, row) + case g => codeGenTraversableOnce(ctx, g, values, row) } } @@ -143,9 +142,11 @@ case class GenerateExec( ctx: CodegenContext, e: CollectionGenerator, input: Seq[ExprCode], - data: ExprCode, row: ExprCode): String = { + // Generate code for the generator. + val data = e.genCode(ctx) + // Generate looping variables. val index = ctx.freshName("index") @@ -160,7 +161,7 @@ case class GenerateExec( } // Generate code for either ArrayData or MapData - val (initMapData, updateRowData, values) = e.collectionSchema match { + val (initMapData, updateRowData, values) = e.collectionType match { case ArrayType(st: StructType, nullable) if e.inline => val row = codeGenAccessor(ctx, data.value, "col", index, st, nullable, checks) val fieldChecks = checks ++ optionalCode(nullable, row.isNull) @@ -212,9 +213,11 @@ case class GenerateExec( ctx: CodegenContext, e: Expression, input: Seq[ExprCode], - data: ExprCode, row: ExprCode): String = { + // Generate the code for the generator + val data = e.genCode(ctx) + // Generate looping variables. 
val iterator = ctx.freshName("iterator") val hasNext = ctx.freshName("hasNext") diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/benchmark/MiscBenchmark.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/benchmark/MiscBenchmark.scala index f68d6f7a49a1d..01773c238b0db 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/execution/benchmark/MiscBenchmark.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/benchmark/MiscBenchmark.scala @@ -206,23 +206,6 @@ class MiscBenchmark extends BenchmarkBase { ignore("generate regular generator") { val N = 1 << 24 - runBenchmark("generate json_tuple", N) { - val df = sparkSession.range(N).selectExpr( - "id as key", - "concat('{\"key\": ', id, ', \"value\": \"v_', id, '\"}') json") - df.selectExpr("key", "json_tuple(json, 'key', 'value') as (k, v)").count() - } - - /* - Java HotSpot(TM) 64-Bit Server VM 1.8.0_92-b14 on Mac OS X 10.11.6 - Intel(R) Core(TM) i7-4980HQ CPU @ 2.80GHz - - generate json_tuple: Best/Avg Time(ms) Rate(M/s) Per Row(ns) Relative - ------------------------------------------------------------------------------------------------ - generate json_tuple wholestage off 12695 / 13635 1.3 756.7 1.0X - generate json_tuple wholestage on 12044 / 12162 1.4 717.9 1.1X - */ - runBenchmark("generate stack", N) { val df = sparkSession.range(N).selectExpr( "id as key", From 29c606abf306d3a95ae73bb4a8b62528c7e81227 Mon Sep 17 00:00:00 2001 From: Herman van Hovell Date: Thu, 17 Nov 2016 15:30:41 -0800 Subject: [PATCH 24/27] code review --- .../spark/sql/execution/GenerateExec.scala | 12 +++- .../spark/sql/GeneratorFunctionSuite.scala | 34 ++++++++++++ .../org/apache/spark/sql/GeneratorSuite.scala | 55 ------------------- .../execution/WholeStageCodegenSuite.scala | 27 ++++++++- 4 files changed, 70 insertions(+), 58 deletions(-) delete mode 100644 sql/core/src/test/scala/org/apache/spark/sql/GeneratorSuite.scala diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/GenerateExec.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/GenerateExec.scala index 026e0facedc72..94afd257240db 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/GenerateExec.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/GenerateExec.scala @@ -192,7 +192,11 @@ case class GenerateExec( // array/map contains no input. We do this by setting the looping index to -1 if there is no // input, evaluation of the array is prevented by a check in the accessor code. val numElements = ctx.freshName("numElements") - val init = if (outer) s"$numElements == 0 ? -1 : 0" else "0" + val init = if (outer) { + s"$numElements == 0 ? -1 : 0" + } else { + "0" + } val numOutput = metricTerm(ctx, "numOutputRows") s""" |${data.code} @@ -242,7 +246,11 @@ case class GenerateExec( val init = concatIfOuter(s"boolean $hasNextCode", s", $outerVal = true") val check = concatIfOuter(hasNext, s"|| $outerVal") val update = concatIfOuter(hasNextCode, s", $outerVal = false") - val next = if (outer) s"$hasNext ? $iterator.next() : null" else s"$iterator.next()" + val next = if (outer) { + s"$hasNext ? 
$iterator.next() : null" + } else { + s"$iterator.next()" + } val numOutput = metricTerm(ctx, "numOutputRows") s""" |${data.code} diff --git a/sql/core/src/test/scala/org/apache/spark/sql/GeneratorFunctionSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/GeneratorFunctionSuite.scala index aedc0a8d6f70b..f0995ea1d0025 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/GeneratorFunctionSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/GeneratorFunctionSuite.scala @@ -17,8 +17,12 @@ package org.apache.spark.sql +import org.apache.spark.sql.catalyst.InternalRow +import org.apache.spark.sql.catalyst.expressions.{Expression, Generator} +import org.apache.spark.sql.catalyst.expressions.codegen.{CodegenContext, ExprCode} import org.apache.spark.sql.functions._ import org.apache.spark.sql.test.SharedSQLContext +import org.apache.spark.sql.types.{IntegerType, StructType} class GeneratorFunctionSuite extends QueryTest with SharedSQLContext { import testImplicits._ @@ -202,4 +206,34 @@ class GeneratorFunctionSuite extends QueryTest with SharedSQLContext { df.selectExpr("array(struct(a), named_struct('a', b))").selectExpr("inline(*)"), Row(1) :: Row(2) :: Nil) } + + test("SPARK-14986: Outer lateral view with empty generate expression") { + checkAnswer( + sql("select nil from values 1 lateral view outer explode(array()) n as nil"), + Row(null) :: Nil + ) + } + + test("outer explode()") { + checkAnswer( + sql("select * from values 1, 2 lateral view outer explode(array()) a as b"), + Row(1, null) :: Row(2, null) :: Nil) + } + + test("outer generator()") { + spark.sessionState.functionRegistry.registerFunction("empty_gen", _ => EmptyGenerator()) + checkAnswer( + sql("select * from values 1, 2 lateral view outer empty_gen() a as b"), + Row(1, null) :: Row(2, null) :: Nil) + } +} + +case class EmptyGenerator() extends Generator { + override def children: Seq[Expression] = Nil + override def elementSchema: StructType = new StructType().add("id", IntegerType) + override def eval(input: InternalRow): TraversableOnce[InternalRow] = Seq.empty + override protected def doGenCode(ctx: CodegenContext, ev: ExprCode): ExprCode = { + val iteratorClass = classOf[Iterator[_]].getName + ev.copy(code = s"$iteratorClass ${ev.value} = $iteratorClass$$.MODULE$$.empty();") + } } diff --git a/sql/core/src/test/scala/org/apache/spark/sql/GeneratorSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/GeneratorSuite.scala deleted file mode 100644 index e35de0c99c906..0000000000000 --- a/sql/core/src/test/scala/org/apache/spark/sql/GeneratorSuite.scala +++ /dev/null @@ -1,55 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ -package org.apache.spark.sql - -import org.apache.spark.sql.catalyst.InternalRow -import org.apache.spark.sql.catalyst.expressions.{Expression, Generator} -import org.apache.spark.sql.catalyst.expressions.codegen.{CodegenContext, ExprCode} -import org.apache.spark.sql.test.SharedSQLContext -import org.apache.spark.sql.types.{IntegerType, StructType} - -case class EmptyGenerator() extends Generator { - override def children: Seq[Expression] = Nil - override def elementSchema: StructType = new StructType().add("id", IntegerType) - override def eval(input: InternalRow): TraversableOnce[InternalRow] = Seq.empty - override protected def doGenCode(ctx: CodegenContext, ev: ExprCode): ExprCode = { - val iteratorClass = classOf[Iterator[_]].getName - ev.copy(code = s"$iteratorClass ${ev.value} = $iteratorClass$$.MODULE$$.empty();") - } -} - -class GeneratorSuite extends QueryTest with SharedSQLContext { - test("SPARK-14986: Outer lateral view with empty generate expression") { - checkAnswer( - sql("select nil from values 1 lateral view outer explode(array()) n as nil"), - Row(null) :: Nil - ) - } - - test("outer explode()") { - checkAnswer( - sql("select * from values 1, 2 lateral view outer explode(array()) a as b"), - Row(1, null) :: Row(2, null) :: Nil) - } - - test("outer generator()") { - spark.sessionState.functionRegistry.registerFunction("empty_gen", _ => EmptyGenerator()) - checkAnswer( - sql("select * from values 1, 2 lateral view outer empty_gen() a as b"), - Row(1, null) :: Row(2, null) :: Nil) - } -} diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/WholeStageCodegenSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/WholeStageCodegenSuite.scala index f26e5e7b6990d..4273d6f17b725 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/execution/WholeStageCodegenSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/WholeStageCodegenSuite.scala @@ -17,7 +17,11 @@ package org.apache.spark.sql.execution -import org.apache.spark.sql.Row +import org.codehaus.janino.JaninoRuntimeException + +import org.apache.spark.sql.{Column, Row} +import org.apache.spark.sql.catalyst.analysis.UnresolvedAttribute +import org.apache.spark.sql.catalyst.expressions.{Add, Literal, Stack} import org.apache.spark.sql.execution.aggregate.HashAggregateExec import org.apache.spark.sql.execution.joins.BroadcastHashJoinExec import org.apache.spark.sql.expressions.scalalang.typed @@ -113,4 +117,25 @@ class WholeStageCodegenSuite extends SparkPlanTest with SharedSQLContext { p.asInstanceOf[WholeStageCodegenExec].child.isInstanceOf[HashAggregateExec]).isDefined) assert(ds.collect() === Array(("a", 10.0), ("b", 3.0), ("c", 1.0))) } + + test("generate should be included in WholeStageCodegen") { + import org.apache.spark.sql.functions._ + val ds = spark.range(2).select( + col("id"), + explode(array(col("id") + 1, col("id") + 2)).as("value")) + val plan = ds.queryExecution.executedPlan + assert(plan.find(p => + p.isInstanceOf[WholeStageCodegenExec] && + p.asInstanceOf[WholeStageCodegenExec].child.isInstanceOf[GenerateExec]).isDefined) + assert(ds.collect() === Array(Row(0, 1), Row(0, 2), Row(1, 2), Row(1, 3))) + } + + test("large inline generate should fail in WholeStageCodegen") { + val N = 1000 + val id = UnresolvedAttribute("id") + val stack = Stack(Literal(N) +: Seq.tabulate(N)(i => Add(id, Literal(i)))) + intercept[Exception] { + spark.range(500).select(Column(stack)).show() + } + } } From af9a5166d309c40d63b1ec167702c5fa3b818f96 Mon Sep 17 00:00:00 2001 
From: Herman van Hovell
Date: Fri, 18 Nov 2016 14:37:16 -0800
Subject: [PATCH 25/27] code review 2

---
 .../spark/sql/execution/GenerateExec.scala | 16 +++++++++-------
 .../sql/execution/WholeStageCodegenSuite.scala | 2 --
 2 files changed, 9 insertions(+), 9 deletions(-)

diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/GenerateExec.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/GenerateExec.scala
index 94afd257240db..b8c7eb4e38041 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/GenerateExec.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/GenerateExec.scala
@@ -242,14 +242,16 @@ case class GenerateExec(
     // prevented by checks in the next() and accessor code.
     val hasNextCode = s"$hasNext = $iterator.hasNext()"
     val outerVal = ctx.freshName("outer")
-    def concatIfOuter(s1: String, s2: String): String = s1 + (if (outer) s2 else "")
-    val init = concatIfOuter(s"boolean $hasNextCode", s", $outerVal = true")
-    val check = concatIfOuter(hasNext, s"|| $outerVal")
-    val update = concatIfOuter(hasNextCode, s", $outerVal = false")
-    val next = if (outer) {
-      s"$hasNext ? $iterator.next() : null"
+    val (init, check, update, next) = if (outer) {
+      (s"boolean $hasNextCode, $outerVal = true",
+        s"$hasNext || $outerVal",
+        s"$hasNextCode, $outerVal = false",
+        s"$hasNext ? $iterator.next() : null")
     } else {
-      s"$iterator.next()"
+      (s"boolean $hasNextCode",
+        s"$hasNext",
+        s"$hasNextCode",
+        s"$iterator.next()")
     }
     val numOutput = metricTerm(ctx, "numOutputRows")
     s"""
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/WholeStageCodegenSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/WholeStageCodegenSuite.scala
index 4273d6f17b725..5bc0c083b2dc9 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/WholeStageCodegenSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/WholeStageCodegenSuite.scala
@@ -17,8 +17,6 @@

 package org.apache.spark.sql.execution

-import org.codehaus.janino.JaninoRuntimeException
-
 import org.apache.spark.sql.{Column, Row}
 import org.apache.spark.sql.catalyst.analysis.UnresolvedAttribute
 import org.apache.spark.sql.catalyst.expressions.{Add, Literal, Stack}

From 3146cc514f25ae74d8de86bae39b44fd80da2175 Mon Sep 17 00:00:00 2001
From: Herman van Hovell
Date: Fri, 18 Nov 2016 15:40:57 -0800
Subject: [PATCH 26/27] Add proper fallback for 'Stack' generator. Make
 TraversableOnce outer and regular code paths more explicit.

---
 .../sql/catalyst/expressions/generators.scala | 11 +++++
 .../spark/sql/execution/GenerateExec.scala | 45 ++++++++++---------
 .../execution/WholeStageCodegenSuite.scala | 21 ++++++---
 3 files changed, 50 insertions(+), 27 deletions(-)

diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/generators.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/generators.scala
index 98084561675de..6c38f4998e914 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/generators.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/generators.scala
@@ -62,6 +62,11 @@ trait Generator extends Expression {
   * rows can be made here.
   */
  def terminate(): TraversableOnce[InternalRow] = Nil
+
+  /**
+   * Checks whether this generator supports code generation.
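+   *
+   * Generators that fall back to interpreted evaluation (i.e. [[CodegenFallback]]
+   * implementations) report false here, which keeps the Generate operator out of
+   * whole stage code generation for them.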
From 3146cc514f25ae74d8de86bae39b44fd80da2175 Mon Sep 17 00:00:00 2001
From: Herman van Hovell
Date: Fri, 18 Nov 2016 15:40:57 -0800
Subject: [PATCH 26/27] Add proper fallback for 'Stack' generator. Make
 TraversableOnce outer and regular code paths more explicit.

---
 .../sql/catalyst/expressions/generators.scala      | 11 +++++
 .../spark/sql/execution/GenerateExec.scala         | 45 ++++++++++---------
 .../execution/WholeStageCodegenSuite.scala         | 21 ++++++---
 3 files changed, 50 insertions(+), 27 deletions(-)

diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/generators.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/generators.scala
index 98084561675de..6c38f4998e914 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/generators.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/generators.scala
@@ -62,6 +62,11 @@ trait Generator extends Expression {
    * rows can be made here.
    */
   def terminate(): TraversableOnce[InternalRow] = Nil
+
+  /**
+   * Check if this generator supports code generation.
+   */
+  def supportCodegen: Boolean = !isInstanceOf[CodegenFallback]
 }
 
 /**
@@ -168,6 +173,12 @@ case class Stack(children: Seq[Expression]) extends Generator {
     }
   }
+
+  /**
+   * Only support code generation when stack produces 50 rows or less.
+   */
+  override def supportCodegen: Boolean = numRows <= 50
+
   override protected def doGenCode(ctx: CodegenContext, ev: ExprCode): ExprCode = {
     // Rows - we write these into an array.
     val rowData = ctx.freshName("rows")
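
The cap of 50 is a code-size heuristic rather than a semantic limit: Stack's
doGenCode above unrolls one block of writes per output row, so very wide stacks
risk the 64KB-per-method limit inside a WholeStageCodegen stage. When
supportCodegen returns false, GenerateExec (changed below) keeps using the
interpreted eval() path, so a large stack now runs instead of failing to
compile. A sketch of the user-visible effect, assuming a session with this
patch applied and a `spark` handle in scope:

    // 100 output rows exceed the codegen threshold, so this stack runs on the
    // interpreted path -- and the query still succeeds.
    val args = (1 to 100).mkString(", ")
    spark.sql(s"SELECT stack(100, $args)").count()  // returns 100
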
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/GenerateExec.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/GenerateExec.scala
index b8c7eb4e38041..e87aa972eaea2 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/GenerateExec.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/GenerateExec.scala
@@ -110,6 +110,8 @@ case class GenerateExec(
     }
   }
 
+  override def supportCodegen: Boolean = generator.supportCodegen
+
   override def inputRDDs(): Seq[RDD[InternalRow]] = {
     child.asInstanceOf[CodegenSupport].inputRDDs()
   }
@@ -240,29 +242,32 @@ case class GenerateExec(
     // iterator contains no input. We do this by adding an 'outer' variable which guarantees
     // execution of the first iteration even if there is no input. Evaluation of the iterator is
     // prevented by checks in the next() and accessor code.
-    val hasNextCode = s"$hasNext = $iterator.hasNext()"
     val outerVal = ctx.freshName("outer")
-    val (init, check, update, next) = if (outer) {
-      (s"boolean $hasNextCode, $outerVal = true",
-        s"$hasNext || $outerVal",
-        s"$hasNextCode, $outerVal = false",
-        s"$hasNext ? $iterator.next() : null")
+    val numOutput = metricTerm(ctx, "numOutputRows")
+    if (outer) {
+      s"""
+         |${data.code}
+         |scala.collection.Iterator $iterator = ${data.value}.toIterator();
+         |boolean $outerVal = true;
+         |while ($iterator.hasNext() || $outerVal) {
+         |  $numOutput.add(1);
+         |  boolean $hasNext = $iterator.hasNext();
+         |  InternalRow $current = (InternalRow)($hasNext? $iterator.next() : null);
+         |  $outerVal = false;
+         |  ${consume(ctx, input ++ values)}
+         |}
+       """.stripMargin
     } else {
-      (s"boolean $hasNextCode",
-        s"$hasNext",
-        s"$hasNextCode",
-        s"$iterator.next()")
+      s"""
+         |${data.code}
+         |scala.collection.Iterator $iterator = ${data.value}.toIterator();
+         |while ($iterator.hasNext()) {
+         |  $numOutput.add(1);
+         |  InternalRow $current = (InternalRow)($iterator.next());
+         |  ${consume(ctx, input ++ values)}
+         |}
+       """.stripMargin
     }
-    val numOutput = metricTerm(ctx, "numOutputRows")
-    s"""
-       |${data.code}
-       |scala.collection.Iterator $iterator = ${data.value}.toIterator();
-       |for ($init; $check; $update) {
-       |  $numOutput.add(1);
-       |  InternalRow $current = (InternalRow)($next);
-       |  ${consume(ctx, input ++ values)}
-       |}
-     """.stripMargin
   }
 
   /**
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/WholeStageCodegenSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/WholeStageCodegenSuite.scala
index 5bc0c083b2dc9..e8ea7758cf598 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/WholeStageCodegenSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/WholeStageCodegenSuite.scala
@@ -17,7 +17,7 @@
 
 package org.apache.spark.sql.execution
 
-import org.apache.spark.sql.{Column, Row}
+import org.apache.spark.sql.{Column, Dataset, Row}
 import org.apache.spark.sql.catalyst.analysis.UnresolvedAttribute
 import org.apache.spark.sql.catalyst.expressions.{Add, Literal, Stack}
 import org.apache.spark.sql.execution.aggregate.HashAggregateExec
@@ -128,12 +128,19 @@ class WholeStageCodegenSuite extends SparkPlanTest with SharedSQLContext {
     assert(ds.collect() === Array(Row(0, 1), Row(0, 2), Row(1, 2), Row(1, 3)))
   }
 
-  test("large inline generate should fail in WholeStageCodegen") {
-    val N = 1000
-    val id = UnresolvedAttribute("id")
-    val stack = Stack(Literal(N) +: Seq.tabulate(N)(i => Add(id, Literal(i))))
-    intercept[Exception] {
-      spark.range(500).select(Column(stack)).show()
+  test("large stack generator should not use WholeStageCodegen") {
+    def createStackGenerator(rows: Int): SparkPlan = {
+      val id = UnresolvedAttribute("id")
+      val stack = Stack(Literal(rows) +: Seq.tabulate(rows)(i => Add(id, Literal(i))))
+      spark.range(500).select(Column(stack)).queryExecution.executedPlan
     }
+    val isCodeGenerated: SparkPlan => Boolean = {
+      case WholeStageCodegenExec(_: GenerateExec) => true
+      case _ => false
+    }
+
+    // Only 'stack' generators that produce 50 rows or less are code generated.
+    assert(createStackGenerator(50).find(isCodeGenerated).isDefined)
+    assert(createStackGenerator(100).find(isCodeGenerated).isEmpty)
   }
 }
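
For the explicit outer and non-outer loops above, the quickest sanity check is
to print what a stage actually compiles. A sketch using the codegen debug
helper (assuming the org.apache.spark.sql.execution.debug package of this era;
output elided):

    import org.apache.spark.sql.execution.debug._
    import org.apache.spark.sql.functions._

    // Prints the generated Java for each WholeStageCodegen subtree, which
    // should include the while-loop emitted for this explode.
    val df = spark.range(2).select(explode(array(col("id"), col("id") + 1)))
    df.debugCodegen()
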
From ffd5ef83fe5d5f85582faa275a5a58816da0c712 Mon Sep 17 00:00:00 2001
From: Herman van Hovell
Date: Sat, 19 Nov 2016 04:00:47 -0800
Subject: [PATCH 27/27] Minor thing

---
 .../scala/org/apache/spark/sql/execution/GenerateExec.scala | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/GenerateExec.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/GenerateExec.scala
index e87aa972eaea2..f80214af43fc1 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/GenerateExec.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/GenerateExec.scala
@@ -242,9 +242,9 @@ case class GenerateExec(
     // iterator contains no input. We do this by adding an 'outer' variable which guarantees
     // execution of the first iteration even if there is no input. Evaluation of the iterator is
     // prevented by checks in the next() and accessor code.
-    val outerVal = ctx.freshName("outer")
     val numOutput = metricTerm(ctx, "numOutputRows")
     if (outer) {
+      val outerVal = ctx.freshName("outer")
       s"""
          |${data.code}
          |scala.collection.Iterator $iterator = ${data.value}.toIterator();