Merge pull request apache#33 from sameeragarwal/fix
resolve merge conflicts in vectorized parquet reader
marmbrus committed Mar 18, 2016
2 parents b0cd621 + 93f2cef commit 40037e6
Showing 9 changed files with 84 additions and 382 deletions.

Large diffs are not rendered by default.

@@ -160,6 +160,15 @@ case class SetCommand(kv: Option[(String, Option[String])]) extends RunnableComm
}
(keyValueOutput, runFunc)

case Some((SQLConf.Deprecated.PARQUET_UNSAFE_ROW_RECORD_READER_ENABLED, Some(value))) =>
val runFunc = (sqlContext: SQLContext) => {
logWarning(
s"Property ${SQLConf.Deprecated.PARQUET_UNSAFE_ROW_RECORD_READER_ENABLED} is " +
s"deprecated and will be ignored. Vectorized parquet reader will be used instead.")
Seq(Row(SQLConf.PARQUET_VECTORIZED_READER_ENABLED, "true"))
}
(keyValueOutput, runFunc)

// Configures a single property.
case Some((key, Some(value))) =>
val runFunc = (sqlContext: SQLContext) => {
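
A hedged sketch of how the new deprecation arm above plays out at the SQL level (the log wording is taken from the diff; the exact result rendering is an assumption, since the runFunc passes the conf entry itself rather than its key):

// Hypothetical session, illustrating the added case arm:
sqlContext.sql("SET spark.sql.parquet.enableUnsafeRowRecordReader=false")
// expected WARN: Property spark.sql.parquet.enableUnsafeRowRecordReader is deprecated
//                and will be ignored. Vectorized parquet reader will be used instead.
// expected result: a single row pairing the vectorized-reader setting with "true"
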
@@ -33,9 +33,9 @@ import org.apache.spark.broadcast.Broadcast
import org.apache.spark.deploy.SparkHadoopUtil
import org.apache.spark.executor.DataReadMethod
import org.apache.spark.internal.Logging
import org.apache.spark.sql.internal.SQLConf
import org.apache.spark.sql.SQLContext
import org.apache.spark.sql.execution.datasources.parquet.UnsafeRowParquetRecordReader
import org.apache.spark.sql.execution.datasources.parquet.VectorizedParquetRecordReader
import org.apache.spark.sql.internal.SQLConf
import org.apache.spark.storage.StorageLevel
import org.apache.spark.util.{SerializableConfiguration, ShutdownHookManager}

@@ -99,8 +99,6 @@ private[spark] class SqlNewHadoopRDD[V: ClassTag](

// If true, enable using the custom RecordReader for parquet. This only works for
// a subset of the types (no complex types).
protected val enableUnsafeRowParquetReader: Boolean =
sqlContext.getConf(SQLConf.PARQUET_UNSAFE_ROW_RECORD_READER_ENABLED.key).toBoolean
protected val enableVectorizedParquetReader: Boolean =
sqlContext.getConf(SQLConf.PARQUET_VECTORIZED_READER_ENABLED.key).toBoolean
protected val enableWholestageCodegen: Boolean =
@@ -174,19 +172,17 @@ private[spark] class SqlNewHadoopRDD[V: ClassTag](
* fails (for example, unsupported schema), try with the normal reader.
* TODO: plumb this through a different way?
*/
if (enableUnsafeRowParquetReader &&
if (enableVectorizedParquetReader &&
format.getClass.getName == "org.apache.parquet.hadoop.ParquetInputFormat") {
val parquetReader: UnsafeRowParquetRecordReader = new UnsafeRowParquetRecordReader()
val parquetReader: VectorizedParquetRecordReader = new VectorizedParquetRecordReader()
if (!parquetReader.tryInitialize(
split.serializableHadoopSplit.value, hadoopAttemptContext)) {
parquetReader.close()
} else {
reader = parquetReader.asInstanceOf[RecordReader[Void, V]]
if (enableVectorizedParquetReader) {
parquetReader.resultBatch()
// Whole stage codegen (PhysicalRDD) is able to deal with batches directly
if (enableWholestageCodegen) parquetReader.enableReturningBatches();
}
parquetReader.resultBatch()
// Whole stage codegen (PhysicalRDD) is able to deal with batches directly
if (enableWholestageCodegen) parquetReader.enableReturningBatches()
}
}
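
The block above always materializes the column batch and only switches the reader into batch-returning mode when whole-stage codegen is on. A hedged consumer-side sketch of what that flag changes (the value types are assumptions based on the reader's two output modes, not lines from this diff):

// Sketch only: `reader` is the RecordReader[Void, V] initialized above;
// ColumnarBatch / InternalRow imports assumed.
var totalRows = 0L
while (reader.nextKeyValue()) {
  reader.getCurrentValue match {
    case batch: ColumnarBatch => totalRows += batch.numRows() // batch mode: codegen consumes whole batches
    case row: InternalRow     => totalRows += 1               // default: one row per call
  }
}
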

@@ -203,7 +199,7 @@ private[spark] class SqlNewHadoopRDD[V: ClassTag](
private[this] var finished = false

override def hasNext: Boolean = {
if (context.isInterrupted) {
if (context.isInterrupted()) {
throw new TaskKilledException
}
if (!finished && !havePair) {
@@ -350,11 +350,6 @@ object SQLConf {
"option must be set in Hadoop Configuration. 2. This option overrides " +
"\"spark.sql.sources.outputCommitterClass\".")

val PARQUET_UNSAFE_ROW_RECORD_READER_ENABLED = booleanConf(
key = "spark.sql.parquet.enableUnsafeRowRecordReader",
defaultValue = Some(true),
doc = "Enables using the custom ParquetUnsafeRowRecordReader.")

val PARQUET_VECTORIZED_READER_ENABLED = booleanConf(
key = "spark.sql.parquet.enableVectorizedReader",
defaultValue = Some(true),
@@ -532,6 +527,7 @@ object SQLConf {
val CODEGEN_ENABLED = "spark.sql.codegen"
val UNSAFE_ENABLED = "spark.sql.unsafe.enabled"
val SORTMERGE_JOIN = "spark.sql.planner.sortMergeJoin"
val PARQUET_UNSAFE_ROW_RECORD_READER_ENABLED = "spark.sql.parquet.enableUnsafeRowRecordReader"
}
}
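
With the old key reduced to a deprecated alias above, vectorized parquet reads are now governed by a single flag. A minimal usage sketch, assuming the usual SQLContext conf setters used elsewhere in this code base:

// Fall back to the normal parquet-mr record reader:
sqlContext.setConf(SQLConf.PARQUET_VECTORIZED_READER_ENABLED.key, "false")
// Restore the default (vectorized reads enabled):
sqlContext.setConf(SQLConf.PARQUET_VECTORIZED_READER_ENABLED.key, "true")
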

@@ -36,7 +36,7 @@ class ParquetEncodingSuite extends ParquetCompatibilityTest with SharedSQLContex
List.fill(n)(ROW).toDF.repartition(1).write.parquet(dir.getCanonicalPath)
val file = SpecificParquetRecordReaderBase.listDirectory(dir).toArray.head

val reader = new UnsafeRowParquetRecordReader
val reader = new VectorizedParquetRecordReader
reader.initialize(file.asInstanceOf[String], null)
val batch = reader.resultBatch()
assert(reader.nextBatch())
@@ -61,7 +61,7 @@ class ParquetEncodingSuite extends ParquetCompatibilityTest with SharedSQLContex
data.repartition(1).write.parquet(dir.getCanonicalPath)
val file = SpecificParquetRecordReaderBase.listDirectory(dir).toArray.head

val reader = new UnsafeRowParquetRecordReader
val reader = new VectorizedParquetRecordReader
reader.initialize(file.asInstanceOf[String], null)
val batch = reader.resultBatch()
assert(reader.nextBatch())
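
The encoding tests above now drive the renamed reader directly; for orientation, a hedged end-to-end sketch of that lifecycle (the batch accessor numRows() is an assumption about the columnar batch API, not a line from this diff):

val file = SpecificParquetRecordReaderBase.listDirectory(dir).toArray.head.asInstanceOf[String]
val reader = new VectorizedParquetRecordReader
try {
  reader.initialize(file, null)     // null projection = read all columns
  val batch = reader.resultBatch()  // one batch object, refilled by each nextBatch()
  var rows = 0L
  while (reader.nextBatch()) {
    rows += batch.numRows()         // assumed accessor on the returned batch
  }
} finally {
  reader.close()
}
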
@@ -57,7 +57,7 @@ class ParquetFilterSuite extends QueryTest with ParquetTest with SharedSQLContex
val output = predicate.collect { case a: Attribute => a }.distinct

withSQLConf(SQLConf.PARQUET_FILTER_PUSHDOWN_ENABLED.key -> "true") {
withSQLConf(SQLConf.PARQUET_UNSAFE_ROW_RECORD_READER_ENABLED.key -> "false") {
withSQLConf(SQLConf.PARQUET_VECTORIZED_READER_ENABLED.key -> "false") {
val query = df
.select(output.map(e => Column(e)): _*)
.where(Column(predicate))
@@ -446,7 +446,7 @@ class ParquetFilterSuite extends QueryTest with ParquetTest with SharedSQLContex
test("SPARK-11661 Still pushdown filters returned by unhandledFilters") {
import testImplicits._
withSQLConf(SQLConf.PARQUET_FILTER_PUSHDOWN_ENABLED.key -> "true") {
withSQLConf(SQLConf.PARQUET_UNSAFE_ROW_RECORD_READER_ENABLED.key -> "false") {
withSQLConf(SQLConf.PARQUET_VECTORIZED_READER_ENABLED.key -> "false") {
withTempPath { dir =>
val path = s"${dir.getCanonicalPath}/part=1"
(1 to 3).map(i => (i, i.toString)).toDF("a", "b").write.parquet(path)
@@ -520,7 +520,7 @@ class ParquetFilterSuite extends QueryTest with ParquetTest with SharedSQLContex
test("SPARK-11164: test the parquet filter in") {
import testImplicits._
withSQLConf(SQLConf.PARQUET_FILTER_PUSHDOWN_ENABLED.key -> "true") {
withSQLConf(SQLConf.PARQUET_UNSAFE_ROW_RECORD_READER_ENABLED.key -> "false") {
withSQLConf(SQLConf.PARQUET_VECTORIZED_READER_ENABLED.key -> "false") {
withTempPath { dir =>
val path = s"${dir.getCanonicalPath}/table1"
(1 to 5).map(i => (i.toFloat, i%3)).toDF("a", "b").write.parquet(path)
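
These filter tests now flip the vectorized flag instead of the removed unsafe-row one. For context, a hedged sketch of the withSQLConf pattern they rely on (semantics assumed from the shared test utilities, which are not part of this diff):

// Overrides the key only for the body of the block and restores the previous
// value (or unsets it) afterwards, so individual tests don't leak configuration:
withSQLConf(SQLConf.PARQUET_VECTORIZED_READER_ENABLED.key -> "false") {
  // queries here run with the vectorized parquet reader disabled
}
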
@@ -656,7 +656,7 @@ class ParquetIOSuite extends QueryTest with ParquetTest with SharedSQLContext {
var hash1: Int = 0
var hash2: Int = 0
(false :: true :: Nil).foreach { v =>
withSQLConf(SQLConf.PARQUET_UNSAFE_ROW_RECORD_READER_ENABLED.key -> v.toString) {
withSQLConf(SQLConf.PARQUET_VECTORIZED_READER_ENABLED.key -> v.toString) {
val df = sqlContext.read.parquet(dir.getCanonicalPath)
val rows = df.queryExecution.toRdd.map(_.copy()).collect()
val unsafeRows = rows.map(_.asInstanceOf[UnsafeRow])
@@ -672,13 +672,13 @@ class ParquetIOSuite extends QueryTest with ParquetTest with SharedSQLContext {
}
}

test("UnsafeRowParquetRecordReader - direct path read") {
val data = (0 to 10).map(i => (i, ((i + 'a').toChar.toString)))
test("VectorizedParquetRecordReader - direct path read") {
val data = (0 to 10).map(i => (i, (i + 'a').toChar.toString))
withTempPath { dir =>
sqlContext.createDataFrame(data).repartition(1).write.parquet(dir.getCanonicalPath)
val file = SpecificParquetRecordReaderBase.listDirectory(dir).get(0);
{
val reader = new UnsafeRowParquetRecordReader
val reader = new VectorizedParquetRecordReader
try {
reader.initialize(file, null)
val result = mutable.ArrayBuffer.empty[(Int, String)]
@@ -695,7 +695,7 @@ class ParquetIOSuite extends QueryTest with ParquetTest with SharedSQLContext {

// Project just one column
{
val reader = new UnsafeRowParquetRecordReader
val reader = new VectorizedParquetRecordReader
try {
reader.initialize(file, ("_2" :: Nil).asJava)
val result = mutable.ArrayBuffer.empty[(String)]
@@ -711,7 +711,7 @@ class ParquetIOSuite extends QueryTest with ParquetTest with SharedSQLContext {

// Project columns in opposite order
{
val reader = new UnsafeRowParquetRecordReader
val reader = new VectorizedParquetRecordReader
try {
reader.initialize(file, ("_2" :: "_1" :: Nil).asJava)
val result = mutable.ArrayBuffer.empty[(String, Int)]
@@ -728,7 +728,7 @@ class ParquetIOSuite extends QueryTest with ParquetTest with SharedSQLContext {

// Empty projection
{
val reader = new UnsafeRowParquetRecordReader
val reader = new VectorizedParquetRecordReader
try {
reader.initialize(file, List[String]().asJava)
var result = 0
@@ -82,38 +82,17 @@ object ParquetReadBenchmark {
}

sqlBenchmark.addCase("SQL Parquet MR") { iter =>
withSQLConf(SQLConf.PARQUET_UNSAFE_ROW_RECORD_READER_ENABLED.key -> "false") {
sqlContext.sql("select sum(id) from tempTable").collect()
}
}

sqlBenchmark.addCase("SQL Parquet Non-Vectorized") { iter =>
withSQLConf(SQLConf.PARQUET_VECTORIZED_READER_ENABLED.key -> "false") {
sqlContext.sql("select sum(id) from tempTable").collect()
}
}

val files = SpecificParquetRecordReaderBase.listDirectory(dir).toArray
// Driving the parquet reader directly without Spark.
parquetReaderBenchmark.addCase("ParquetReader Non-Vectorized") { num =>
var sum = 0L
files.map(_.asInstanceOf[String]).foreach { p =>
val reader = new UnsafeRowParquetRecordReader
reader.initialize(p, ("id" :: Nil).asJava)

while (reader.nextKeyValue()) {
val record = reader.getCurrentValue.asInstanceOf[InternalRow]
if (!record.isNullAt(0)) sum += record.getInt(0)
}
reader.close()
}
}

// Driving the parquet reader in batch mode directly.
parquetReaderBenchmark.addCase("ParquetReader Vectorized") { num =>
var sum = 0L
files.map(_.asInstanceOf[String]).foreach { p =>
val reader = new UnsafeRowParquetRecordReader
val reader = new VectorizedParquetRecordReader
try {
reader.initialize(p, ("id" :: Nil).asJava)
val batch = reader.resultBatch()
@@ -136,7 +115,7 @@
parquetReaderBenchmark.addCase("ParquetReader Vectorized -> Row") { num =>
var sum = 0L
files.map(_.asInstanceOf[String]).foreach { p =>
val reader = new UnsafeRowParquetRecordReader
val reader = new VectorizedParquetRecordReader
try {
reader.initialize(p, ("id" :: Nil).asJava)
val batch = reader.resultBatch()
@@ -159,17 +138,15 @@
-------------------------------------------------------------------------------------------
SQL Parquet Vectorized 215 / 262 73.0 13.7 1.0X
SQL Parquet MR 1946 / 2083 8.1 123.7 0.1X
SQL Parquet Non-Vectorized 1079 / 1213 14.6 68.6 0.2X
*/
sqlBenchmark.run()

/*
Intel(R) Core(TM) i7-4870HQ CPU @ 2.50GHz
Parquet Reader Single Int Column Best/Avg Time(ms) Rate(M/s) Per Row(ns) Relative
-------------------------------------------------------------------------------------------
ParquetReader Non-Vectorized 610 / 737 25.8 38.8 1.0X
ParquetReader Vectorized 123 / 152 127.8 7.8 5.0X
ParquetReader Vectorized -> Row 165 / 180 95.2 10.5 3.7X
ParquetReader Vectorized 123 / 152 127.8 7.8 1.0X
ParquetReader Vectorized -> Row 165 / 180 95.2 10.5 0.7X
*/
parquetReaderBenchmark.run()
}
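
A quick consistency check on the re-baselined results above (my arithmetic, not part of the commit): with the Non-Vectorized case removed, the Relative column is anchored on the Vectorized case, so the row-conversion case comes out at 123 / 165 ≈ 0.75, reported as 0.7X, and its 10.5 ns per row matches 1 / 95.2 M/s.
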
@@ -191,41 +168,19 @@ object ParquetReadBenchmark {
}

benchmark.addCase("SQL Parquet MR") { iter =>
withSQLConf(SQLConf.PARQUET_UNSAFE_ROW_RECORD_READER_ENABLED.key -> "false") {
sqlContext.sql("select sum(c1), sum(length(c2)) from tempTable").collect
}
}

benchmark.addCase("SQL Parquet Non-vectorized") { iter =>
withSQLConf(SQLConf.PARQUET_VECTORIZED_READER_ENABLED.key -> "false") {
sqlContext.sql("select sum(c1), sum(length(c2)) from tempTable").collect
}
}

val files = SpecificParquetRecordReaderBase.listDirectory(dir).toArray
benchmark.addCase("ParquetReader Non-vectorized") { num =>
var sum1 = 0L
var sum2 = 0L
files.map(_.asInstanceOf[String]).foreach { p =>
val reader = new UnsafeRowParquetRecordReader
reader.initialize(p, null)
while (reader.nextKeyValue()) {
val record = reader.getCurrentValue.asInstanceOf[InternalRow]
if (!record.isNullAt(0)) sum1 += record.getInt(0)
if (!record.isNullAt(1)) sum2 += record.getUTF8String(1).numBytes()
}
reader.close()
}
}

/*
Intel(R) Core(TM) i7-4870HQ CPU @ 2.50GHz
Int and String Scan: Best/Avg Time(ms) Rate(M/s) Per Row(ns) Relative
-------------------------------------------------------------------------------------------
SQL Parquet Vectorized 628 / 720 16.7 59.9 1.0X
SQL Parquet MR 1905 / 2239 5.5 181.7 0.3X
SQL Parquet Non-vectorized 1429 / 1732 7.3 136.3 0.4X
ParquetReader Non-vectorized 989 / 1357 10.6 94.3 0.6X
*/
benchmark.run()
}
@@ -247,7 +202,7 @@ object ParquetReadBenchmark {
}

benchmark.addCase("SQL Parquet MR") { iter =>
withSQLConf(SQLConf.PARQUET_UNSAFE_ROW_RECORD_READER_ENABLED.key -> "false") {
withSQLConf(SQLConf.PARQUET_VECTORIZED_READER_ENABLED.key -> "false") {
sqlContext.sql("select sum(length(c1)) from tempTable").collect
}
}
@@ -293,7 +248,7 @@ object ParquetReadBenchmark {
Read data column 191 / 250 82.1 12.2 1.0X
Read partition column 82 / 86 192.4 5.2 2.3X
Read both columns 220 / 248 71.5 14.0 0.9X
*/
*/
benchmark.run()
}
}
@@ -319,7 +274,7 @@ object ParquetReadBenchmark {
benchmark.addCase("PR Vectorized") { num =>
var sum = 0
files.map(_.asInstanceOf[String]).foreach { p =>
val reader = new UnsafeRowParquetRecordReader
val reader = new VectorizedParquetRecordReader
try {
reader.initialize(p, ("c1" :: "c2" :: Nil).asJava)
val batch = reader.resultBatch()
@@ -340,7 +295,7 @@
benchmark.addCase("PR Vectorized (Null Filtering)") { num =>
var sum = 0L
files.map(_.asInstanceOf[String]).foreach { p =>
val reader = new UnsafeRowParquetRecordReader
val reader = new VectorizedParquetRecordReader
try {
reader.initialize(p, ("c1" :: "c2" :: Nil).asJava)
val batch = reader.resultBatch()
@@ -409,7 +409,10 @@ abstract class HiveComparisonTest
}

try {
new TestHive.QueryExecution(convertedSQL)
val queryExecution = new TestHive.QueryExecution(convertedSQL)
// Trigger the analysis of this converted SQL query.
queryExecution.analyzed
queryExecution
} catch {
case NonFatal(e) => fail(
s"""Failed to analyze the converted SQL string:
