From 76a746e69728083bad28e33144e70fdf024c5aa7 Mon Sep 17 00:00:00 2001
From: Angerszhuuuu
Date: Sun, 18 Apr 2021 12:23:35 +0800
Subject: [PATCH] Revert "[SPARK-35097][SQL] Add column name to SparkUpgradeException about ancient datetime"

This reverts commit b5a42684eae2af1d271456a97a434258b4882962.
---
 .../spark/sql/avro/AvroDeserializer.scala     | 11 +++--
 .../sql/errors/QueryExecutionErrors.scala     |  9 +----
 .../orc/OrcColumnarBatchReader.java           |  4 +-
 .../parquet/VectorizedColumnReader.java       | 40 +++++++------------
 .../parquet/VectorizedPlainValuesReader.java  |  4 +-
 .../parquet/VectorizedRleValuesReader.java    |  5 +--
 .../vectorized/OffHeapColumnVector.java       | 10 ++---
 .../vectorized/OnHeapColumnVector.java        | 10 ++---
 .../vectorized/WritableColumnVector.java      | 27 +++++--------
 .../datasources/DataSourceUtils.scala         | 15 +++----
 .../parquet/ParquetRowConverter.scala         | 14 +++----
 .../connector/JavaColumnarDataSourceV2.java   |  4 +-
 .../sql/SparkSessionExtensionSuite.scala      |  6 +--
 .../sql/connector/DataSourceV2Suite.scala     |  4 +-
 .../compression/BooleanBitSetSuite.scala      |  4 +-
 .../compression/DictionaryEncodingSuite.scala |  2 +-
 .../compression/IntegralDeltaSuite.scala      |  2 +-
 .../PassThroughEncodingSuite.scala            |  2 +-
 .../compression/RunLengthEncodingSuite.scala  |  2 +-
 .../vectorized/ColumnVectorSuite.scala        |  8 ++--
 .../vectorized/ColumnarBatchBenchmark.scala   | 16 ++++----
 .../vectorized/ColumnarBatchSuite.scala       |  4 +-
 22 files changed, 88 insertions(+), 115 deletions(-)

diff --git a/external/avro/src/main/scala/org/apache/spark/sql/avro/AvroDeserializer.scala b/external/avro/src/main/scala/org/apache/spark/sql/avro/AvroDeserializer.scala
index 6597c56efbede..a19a7b0d0edd1 100644
--- a/external/avro/src/main/scala/org/apache/spark/sql/avro/AvroDeserializer.scala
+++ b/external/avro/src/main/scala/org/apache/spark/sql/avro/AvroDeserializer.scala
@@ -62,10 +62,10 @@ private[sql] class AvroDeserializer(
   private lazy val decimalConversions = new DecimalConversion()
 
   private val dateRebaseFunc = DataSourceUtils.creteDateRebaseFuncInRead(
-    datetimeRebaseMode, "Avro")(_, _)
+    datetimeRebaseMode, "Avro")
 
   private val timestampRebaseFunc = DataSourceUtils.creteTimestampRebaseFuncInRead(
-    datetimeRebaseMode, "Avro")(_, _)
+    datetimeRebaseMode, "Avro")
 
   private val converter: Any => Option[Any] = try {
     rootCatalystType match {
@@ -126,8 +126,7 @@ private[sql] class AvroDeserializer(
         updater.setInt(ordinal, value.asInstanceOf[Int])
 
       case (INT, DateType) => (updater, ordinal, value) =>
-        updater.setInt(ordinal,
-          dateRebaseFunc(avroType.getName, catalystType)(value.asInstanceOf[Int]))
+        updater.setInt(ordinal, dateRebaseFunc(value.asInstanceOf[Int]))
 
       case (LONG, LongType) => (updater, ordinal, value) =>
         updater.setLong(ordinal, value.asInstanceOf[Long])
@@ -138,10 +137,10 @@ private[sql] class AvroDeserializer(
           case null | _: TimestampMillis => (updater, ordinal, value) =>
             val millis = value.asInstanceOf[Long]
             val micros = DateTimeUtils.millisToMicros(millis)
-            updater.setLong(ordinal, timestampRebaseFunc(avroType.getName, catalystType)(micros))
+            updater.setLong(ordinal, timestampRebaseFunc(micros))
           case _: TimestampMicros => (updater, ordinal, value) =>
             val micros = value.asInstanceOf[Long]
-            updater.setLong(ordinal, timestampRebaseFunc(avroType.getName, catalystType)(micros))
+            updater.setLong(ordinal, timestampRebaseFunc(micros))
           case other => throw new IncompatibleSchemaException(errorPrefix +
             s"Avro logical type $other cannot be converted to SQL type ${TimestampType.sql}.")
         }
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/errors/QueryExecutionErrors.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/errors/QueryExecutionErrors.scala
index 8b2827ae14c5f..c5a608e38da56 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/errors/QueryExecutionErrors.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/errors/QueryExecutionErrors.scala
@@ -394,16 +394,11 @@ object QueryExecutionErrors {
   }
 
   def sparkUpgradeInReadingDatesError(
-      colName: String,
-      dataType: DataType,
-      format: String,
-      config: String,
-      option: String): SparkUpgradeException = {
+      format: String, config: String, option: String): SparkUpgradeException = {
     new SparkUpgradeException("3.0",
       s"""
          |reading dates before 1582-10-15 or timestamps before 1900-01-01T00:00:00Z from $format
-         |files can be ambiguous when read column `${colName}` of datatype `${dataType}`,
-         |as the files may be written by Spark 2.x or legacy versions of
+         |files can be ambiguous, as the files may be written by Spark 2.x or legacy versions of
          |Hive, which uses a legacy hybrid calendar that is different from Spark 3.0+'s Proleptic
          |Gregorian calendar. See more details in SPARK-31404. You can set the SQL config
          |'$config' or the datasource option '$option' to 'LEGACY' to rebase the datetime values
diff --git a/sql/core/src/main/java/org/apache/spark/sql/execution/datasources/orc/OrcColumnarBatchReader.java b/sql/core/src/main/java/org/apache/spark/sql/execution/datasources/orc/OrcColumnarBatchReader.java
index e316ec0c830f4..40ed0b2454c12 100644
--- a/sql/core/src/main/java/org/apache/spark/sql/execution/datasources/orc/OrcColumnarBatchReader.java
+++ b/sql/core/src/main/java/org/apache/spark/sql/execution/datasources/orc/OrcColumnarBatchReader.java
@@ -167,7 +167,7 @@ public void initBatch(
     for (int i = 0; i < requiredFields.length; i++) {
       DataType dt = requiredFields[i].dataType();
       if (requestedPartitionColIds[i] != -1) {
-        OnHeapColumnVector partitionCol = new OnHeapColumnVector(capacity, requiredFields[i].name(),dt);
+        OnHeapColumnVector partitionCol = new OnHeapColumnVector(capacity, dt);
         ColumnVectorUtils.populate(partitionCol, partitionValues, requestedPartitionColIds[i]);
         partitionCol.setIsConstant();
         orcVectorWrappers[i] = partitionCol;
@@ -175,7 +175,7 @@ public void initBatch(
         int colId = requestedDataColIds[i];
         // Initialize the missing columns once.
         if (colId == -1) {
-          OnHeapColumnVector missingCol = new OnHeapColumnVector(capacity, requiredFields[i].name(), dt);
+          OnHeapColumnVector missingCol = new OnHeapColumnVector(capacity, dt);
           missingCol.putNulls(0, capacity);
           missingCol.setIsConstant();
           orcVectorWrappers[i] = missingCol;
diff --git a/sql/core/src/main/java/org/apache/spark/sql/execution/datasources/parquet/VectorizedColumnReader.java b/sql/core/src/main/java/org/apache/spark/sql/execution/datasources/parquet/VectorizedColumnReader.java
index 36fe7d2252a65..52620b0740851 100644
--- a/sql/core/src/main/java/org/apache/spark/sql/execution/datasources/parquet/VectorizedColumnReader.java
+++ b/sql/core/src/main/java/org/apache/spark/sql/execution/datasources/parquet/VectorizedColumnReader.java
@@ -190,13 +190,10 @@ private boolean isLazyDecodingSupported(PrimitiveType.PrimitiveTypeName typeName
     return isSupported;
   }
 
-  static int rebaseDays(
-      int julianDays,
-      final boolean failIfRebase,
-      WritableColumnVector c) {
+  static int rebaseDays(int julianDays, final boolean failIfRebase) {
     if (failIfRebase) {
       if (julianDays < RebaseDateTime.lastSwitchJulianDay()) {
-        throw DataSourceUtils.newRebaseExceptionInRead(c.colName, c.dataType(), "Parquet");
+        throw DataSourceUtils.newRebaseExceptionInRead("Parquet");
       } else {
         return julianDays;
       }
@@ -208,11 +205,10 @@ static int rebaseDays(
   private static long rebaseTimestamp(
       long julianMicros,
       final boolean failIfRebase,
-      WritableColumnVector c,
       final String format) {
     if (failIfRebase) {
       if (julianMicros < RebaseDateTime.lastSwitchJulianTs()) {
-        throw DataSourceUtils.newRebaseExceptionInRead(c.colName, c.dataType(), format);
+        throw DataSourceUtils.newRebaseExceptionInRead(format);
       } else {
         return julianMicros;
       }
@@ -221,18 +217,12 @@ private static long rebaseTimestamp(
     }
   }
 
-  static long rebaseMicros(
-      long julianMicros,
-      final boolean failIfRebase,
-      WritableColumnVector c) {
-    return rebaseTimestamp(julianMicros, failIfRebase, c, "Parquet");
+  static long rebaseMicros(long julianMicros, final boolean failIfRebase) {
+    return rebaseTimestamp(julianMicros, failIfRebase, "Parquet");
   }
 
-  static long rebaseInt96(
-      long julianMicros,
-      final boolean failIfRebase,
-      WritableColumnVector c) {
-    return rebaseTimestamp(julianMicros, failIfRebase, c, "Parquet INT96");
+  static long rebaseInt96(long julianMicros, final boolean failIfRebase) {
+    return rebaseTimestamp(julianMicros, failIfRebase, "Parquet INT96");
   }
 
   /**
@@ -397,7 +387,7 @@ private void decodeDictionaryIds(
           for (int i = rowId; i < rowId + num; ++i) {
             if (!column.isNullAt(i)) {
               int julianDays = dictionary.decodeToInt(dictionaryIds.getDictId(i));
-              column.putInt(i, rebaseDays(julianDays, failIfRebase, column));
+              column.putInt(i, rebaseDays(julianDays, failIfRebase));
             }
           }
         } else {
@@ -442,7 +432,7 @@ private void decodeDictionaryIds(
             if (!column.isNullAt(i)) {
               long julianMillis = dictionary.decodeToLong(dictionaryIds.getDictId(i));
               long julianMicros = DateTimeUtils.millisToMicros(julianMillis);
-              column.putLong(i, rebaseMicros(julianMicros, failIfRebase, column));
+              column.putLong(i, rebaseMicros(julianMicros, failIfRebase));
             }
           }
         }
@@ -451,7 +441,7 @@ private void decodeDictionaryIds(
           for (int i = rowId; i < rowId + num; ++i) {
             if (!column.isNullAt(i)) {
               long julianMicros = dictionary.decodeToLong(dictionaryIds.getDictId(i));
-              column.putLong(i, rebaseMicros(julianMicros, failIfRebase, column));
+              column.putLong(i, rebaseMicros(julianMicros, failIfRebase));
             }
           }
         } else {
@@ -490,7 +480,7 @@ private void decodeDictionaryIds(
             if (!column.isNullAt(i)) {
               Binary v = dictionary.decodeToBinary(dictionaryIds.getDictId(i));
               long julianMicros = ParquetRowConverter.binaryToSQLTimestamp(v);
-              long gregorianMicros = rebaseInt96(julianMicros, failIfRebase, column);
+              long gregorianMicros = rebaseInt96(julianMicros, failIfRebase);
               column.putLong(i, gregorianMicros);
             }
           }
@@ -510,7 +500,7 @@ private void decodeDictionaryIds(
             if (!column.isNullAt(i)) {
               Binary v = dictionary.decodeToBinary(dictionaryIds.getDictId(i));
               long julianMicros = ParquetRowConverter.binaryToSQLTimestamp(v);
-              long gregorianMicros = rebaseInt96(julianMicros, failIfRebase, column);
+              long gregorianMicros = rebaseInt96(julianMicros, failIfRebase);
               long adjTime = DateTimeUtils.convertTz(gregorianMicros, convertTz, UTC);
               column.putLong(i, adjTime);
             }
@@ -650,7 +640,7 @@ private void readLongBatch(int rowId, int num, WritableColumnVector column) thro
         for (int i = 0; i < num; i++) {
           if (defColumn.readInteger() == maxDefLevel) {
             long julianMicros = DateTimeUtils.millisToMicros(dataColumn.readLong());
-            column.putLong(rowId + i, rebaseMicros(julianMicros, failIfRebase, column));
+            column.putLong(rowId + i, rebaseMicros(julianMicros, failIfRebase));
           } else {
             column.putNull(rowId + i);
           }
@@ -708,7 +698,7 @@ private void readBinaryBatch(int rowId, int num, WritableColumnVector column) th
         if (defColumn.readInteger() == maxDefLevel) {
           // Read 12 bytes for INT96
           long julianMicros = ParquetRowConverter.binaryToSQLTimestamp(data.readBinary(12));
-          long gregorianMicros = rebaseInt96(julianMicros, failIfRebase, column);
+          long gregorianMicros = rebaseInt96(julianMicros, failIfRebase);
           column.putLong(rowId + i, gregorianMicros);
         } else {
           column.putNull(rowId + i);
@@ -732,7 +722,7 @@ private void readBinaryBatch(int rowId, int num, WritableColumnVector column) th
         if (defColumn.readInteger() == maxDefLevel) {
           // Read 12 bytes for INT96
           long julianMicros = ParquetRowConverter.binaryToSQLTimestamp(data.readBinary(12));
-          long gregorianMicros = rebaseInt96(julianMicros, failIfRebase, column);
+          long gregorianMicros = rebaseInt96(julianMicros, failIfRebase);
           long adjTime = DateTimeUtils.convertTz(gregorianMicros, convertTz, UTC);
           column.putLong(rowId + i, adjTime);
         } else {
diff --git a/sql/core/src/main/java/org/apache/spark/sql/execution/datasources/parquet/VectorizedPlainValuesReader.java b/sql/core/src/main/java/org/apache/spark/sql/execution/datasources/parquet/VectorizedPlainValuesReader.java
index 95a4dafb7ad8c..6a0038dbdc44c 100644
--- a/sql/core/src/main/java/org/apache/spark/sql/execution/datasources/parquet/VectorizedPlainValuesReader.java
+++ b/sql/core/src/main/java/org/apache/spark/sql/execution/datasources/parquet/VectorizedPlainValuesReader.java
@@ -107,7 +107,7 @@ public final void readIntegersWithRebase(
     }
     if (rebase) {
       if (failIfRebase) {
-        throw DataSourceUtils.newRebaseExceptionInRead(c.colName, c.dataType(), "Parquet");
+        throw DataSourceUtils.newRebaseExceptionInRead("Parquet");
       } else {
         for (int i = 0; i < total; i += 1) {
           c.putInt(rowId + i, RebaseDateTime.rebaseJulianToGregorianDays(buffer.getInt()));
@@ -164,7 +164,7 @@ public final void readLongsWithRebase(
     }
     if (rebase) {
       if (failIfRebase) {
-        throw DataSourceUtils.newRebaseExceptionInRead(c.colName, c.dataType(), "Parquet");
+        throw DataSourceUtils.newRebaseExceptionInRead("Parquet");
       } else {
         for (int i = 0; i < total; i += 1) {
           c.putLong(rowId + i, RebaseDateTime.rebaseJulianToGregorianMicros(buffer.getLong()));
diff --git a/sql/core/src/main/java/org/apache/spark/sql/execution/datasources/parquet/VectorizedRleValuesReader.java b/sql/core/src/main/java/org/apache/spark/sql/execution/datasources/parquet/VectorizedRleValuesReader.java
index 7297861e5dfce..125506d4d5013 100644
--- a/sql/core/src/main/java/org/apache/spark/sql/execution/datasources/parquet/VectorizedRleValuesReader.java
+++ b/sql/core/src/main/java/org/apache/spark/sql/execution/datasources/parquet/VectorizedRleValuesReader.java
@@ -264,8 +264,7 @@ public void readIntegersWithRebase(
           for (int i = 0; i < n; ++i) {
             if (currentBuffer[currentBufferIdx++] == level) {
               int julianDays = data.readInteger();
-              c.putInt(rowId + i,
-                VectorizedColumnReader.rebaseDays(julianDays, failIfRebase, c));
+              c.putInt(rowId + i, VectorizedColumnReader.rebaseDays(julianDays, failIfRebase));
             } else {
               c.putNull(rowId + i);
             }
@@ -493,7 +492,7 @@ public void readLongsWithRebase(
           for (int i = 0; i < n; ++i) {
             if (currentBuffer[currentBufferIdx++] == level) {
               long julianMicros = data.readLong();
-              c.putLong(rowId + i, VectorizedColumnReader.rebaseMicros(julianMicros, failIfRebase, c));
+              c.putLong(rowId + i, VectorizedColumnReader.rebaseMicros(julianMicros, failIfRebase));
             } else {
               c.putNull(rowId + i);
             }
diff --git a/sql/core/src/main/java/org/apache/spark/sql/execution/vectorized/OffHeapColumnVector.java b/sql/core/src/main/java/org/apache/spark/sql/execution/vectorized/OffHeapColumnVector.java
index 80bd2ee590c70..7da5a287710eb 100644
--- a/sql/core/src/main/java/org/apache/spark/sql/execution/vectorized/OffHeapColumnVector.java
+++ b/sql/core/src/main/java/org/apache/spark/sql/execution/vectorized/OffHeapColumnVector.java
@@ -50,7 +50,7 @@ public static OffHeapColumnVector[] allocateColumns(int capacity, StructType sch
   public static OffHeapColumnVector[] allocateColumns(int capacity, StructField[] fields) {
     OffHeapColumnVector[] vectors = new OffHeapColumnVector[fields.length];
     for (int i = 0; i < fields.length; i++) {
-      vectors[i] = new OffHeapColumnVector(capacity, fields[i].name(), fields[i].dataType());
+      vectors[i] = new OffHeapColumnVector(capacity, fields[i].dataType());
     }
     return vectors;
   }
@@ -64,8 +64,8 @@ public static OffHeapColumnVector[] allocateColumns(int capacity, StructField[]
   private long lengthData;
   private long offsetData;
 
-  public OffHeapColumnVector(int capacity, String colName, DataType type) {
-    super(capacity, colName, type);
+  public OffHeapColumnVector(int capacity, DataType type) {
+    super(capacity, type);
 
     nulls = 0;
     data = 0;
@@ -566,7 +566,7 @@ protected void reserveInternal(int newCapacity) {
   }
 
   @Override
-  protected OffHeapColumnVector reserveNewColumn(int capacity, String colName, DataType type) {
-    return new OffHeapColumnVector(capacity, colName, type);
+  protected OffHeapColumnVector reserveNewColumn(int capacity, DataType type) {
+    return new OffHeapColumnVector(capacity, type);
   }
 }
diff --git a/sql/core/src/main/java/org/apache/spark/sql/execution/vectorized/OnHeapColumnVector.java b/sql/core/src/main/java/org/apache/spark/sql/execution/vectorized/OnHeapColumnVector.java
index f7f28f095f11b..5a7d6cc20971b 100644
--- a/sql/core/src/main/java/org/apache/spark/sql/execution/vectorized/OnHeapColumnVector.java
+++ b/sql/core/src/main/java/org/apache/spark/sql/execution/vectorized/OnHeapColumnVector.java
@@ -50,7 +50,7 @@ public static OnHeapColumnVector[] allocateColumns(int capacity, StructType sche
   public static OnHeapColumnVector[] allocateColumns(int capacity, StructField[] fields) {
     OnHeapColumnVector[] vectors = new OnHeapColumnVector[fields.length];
     for (int i = 0; i < fields.length; i++) {
-      vectors[i] = new OnHeapColumnVector(capacity, fields[i].name(), fields[i].dataType());
+      vectors[i] = new OnHeapColumnVector(capacity, fields[i].dataType());
     }
     return vectors;
   }
@@ -73,8 +73,8 @@ public static OnHeapColumnVector[] allocateColumns(int capacity, StructField[] f
   private int[] arrayLengths;
   private int[] arrayOffsets;
 
-  public OnHeapColumnVector(int capacity, String colName, DataType type) {
-    super(capacity, colName, type);
+  public OnHeapColumnVector(int capacity, DataType type) {
+    super(capacity, type);
 
     reserveInternal(capacity);
     reset();
@@ -580,7 +580,7 @@ protected void reserveInternal(int newCapacity) {
   }
 
   @Override
-  protected OnHeapColumnVector reserveNewColumn(int capacity, String colName, DataType type) {
-    return new OnHeapColumnVector(capacity, colName, type);
+  protected OnHeapColumnVector reserveNewColumn(int capacity, DataType type) {
+    return new OnHeapColumnVector(capacity, type);
   }
 }
diff --git a/sql/core/src/main/java/org/apache/spark/sql/execution/vectorized/WritableColumnVector.java b/sql/core/src/main/java/org/apache/spark/sql/execution/vectorized/WritableColumnVector.java
index 64ee74dba0d44..8c0f1e1257503 100644
--- a/sql/core/src/main/java/org/apache/spark/sql/execution/vectorized/WritableColumnVector.java
+++ b/sql/core/src/main/java/org/apache/spark/sql/execution/vectorized/WritableColumnVector.java
@@ -165,7 +165,7 @@ public void setDictionary(Dictionary dictionary) {
    */
   public WritableColumnVector reserveDictionaryIds(int capacity) {
     if (dictionaryIds == null) {
-      dictionaryIds = reserveNewColumn(capacity, colName, DataTypes.IntegerType);
+      dictionaryIds = reserveNewColumn(capacity, DataTypes.IntegerType);
     } else {
       dictionaryIds.reset();
       dictionaryIds.reserve(capacity);
@@ -677,11 +677,6 @@ public WritableColumnVector arrayData() {
    */
   public final void setIsConstant() { isConstant = true; }
 
-  /**
-   * Column name of this column.
-   */
-  public String colName;
-
   /**
    * Maximum number of rows that can be stored in this column.
    */
@@ -722,7 +717,7 @@ public WritableColumnVector arrayData() {
   /**
    * Reserve a new column.
    */
-  protected abstract WritableColumnVector reserveNewColumn(int capacity, String colName, DataType type);
+  protected abstract WritableColumnVector reserveNewColumn(int capacity, DataType type);
 
   protected boolean isArray() {
     return type instanceof ArrayType || type instanceof BinaryType || type instanceof StringType ||
@@ -733,9 +728,8 @@ protected boolean isArray() {
   * Sets up the common state and also handles creating the child columns if this is a nested
   * type.
   */
-  protected WritableColumnVector(int capacity, String colName, DataType type) {
+  protected WritableColumnVector(int capacity, DataType type) {
     super(type);
-    this.colName = colName;
     this.capacity = capacity;
 
     if (isArray()) {
@@ -748,25 +742,24 @@ protected WritableColumnVector(int capacity, String colName, DataType type) {
         childCapacity *= DEFAULT_ARRAY_LENGTH;
       }
       this.childColumns = new WritableColumnVector[1];
-      this.childColumns[0] = reserveNewColumn(childCapacity, colName + ".elem", childType);
+      this.childColumns[0] = reserveNewColumn(childCapacity, childType);
     } else if (type instanceof StructType) {
       StructType st = (StructType)type;
       this.childColumns = new WritableColumnVector[st.fields().length];
       for (int i = 0; i < childColumns.length; ++i) {
-        this.childColumns[i] = reserveNewColumn(capacity, colName + "." + st.fields()[i].name(),
-          st.fields()[i].dataType());
+        this.childColumns[i] = reserveNewColumn(capacity, st.fields()[i].dataType());
       }
     } else if (type instanceof MapType) {
       MapType mapType = (MapType) type;
       this.childColumns = new WritableColumnVector[2];
-      this.childColumns[0] = reserveNewColumn(capacity, colName + ".key", mapType.keyType());
-      this.childColumns[1] = reserveNewColumn(capacity, colName + ".value", mapType.valueType());
+      this.childColumns[0] = reserveNewColumn(capacity, mapType.keyType());
+      this.childColumns[1] = reserveNewColumn(capacity, mapType.valueType());
     } else if (type instanceof CalendarIntervalType) {
       // Three columns. Months as int. Days as Int. Microseconds as Long.
       this.childColumns = new WritableColumnVector[3];
-      this.childColumns[0] = reserveNewColumn(capacity, colName + ".months", DataTypes.IntegerType);
-      this.childColumns[1] = reserveNewColumn(capacity, colName + ".days", DataTypes.IntegerType);
-      this.childColumns[2] = reserveNewColumn(capacity, colName + ".microseconds", DataTypes.LongType);
+      this.childColumns[0] = reserveNewColumn(capacity, DataTypes.IntegerType);
+      this.childColumns[1] = reserveNewColumn(capacity, DataTypes.IntegerType);
+      this.childColumns[2] = reserveNewColumn(capacity, DataTypes.LongType);
     } else {
       this.childColumns = null;
     }
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSourceUtils.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSourceUtils.scala
index 2dc680a0e5691..2b10e4efd9ab8 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSourceUtils.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSourceUtils.scala
@@ -131,10 +131,7 @@ object DataSourceUtils {
     }.getOrElse(LegacyBehaviorPolicy.withName(modeByConfig))
   }
 
-  def newRebaseExceptionInRead(
-      colName: String,
-      dataType: DataType,
-      format: String): SparkUpgradeException = {
+  def newRebaseExceptionInRead(format: String): SparkUpgradeException = {
     val (config, option) = format match {
       case "Parquet INT96" =>
         (SQLConf.PARQUET_INT96_REBASE_MODE_IN_READ.key, ParquetOptions.INT96_REBASE_MODE)
@@ -144,7 +141,7 @@ object DataSourceUtils {
         (SQLConf.AVRO_REBASE_MODE_IN_READ.key, "datetimeRebaseMode")
       case _ => throw QueryExecutionErrors.unrecognizedFileFormatError(format)
     }
-    QueryExecutionErrors.sparkUpgradeInReadingDatesError(colName, dataType, format, config, option)
+    QueryExecutionErrors.sparkUpgradeInReadingDatesError(format, config, option)
   }
 
   def newRebaseExceptionInWrite(format: String): SparkUpgradeException = {
@@ -159,10 +156,10 @@ object DataSourceUtils {
 
   def creteDateRebaseFuncInRead(
       rebaseMode: LegacyBehaviorPolicy.Value,
-      format: String)(colName: String, dataType: DataType): Int => Int = rebaseMode match {
+      format: String): Int => Int = rebaseMode match {
     case LegacyBehaviorPolicy.EXCEPTION => days: Int =>
       if (days < RebaseDateTime.lastSwitchJulianDay) {
-        throw DataSourceUtils.newRebaseExceptionInRead(colName, dataType, format)
+        throw DataSourceUtils.newRebaseExceptionInRead(format)
       }
       days
     case LegacyBehaviorPolicy.LEGACY => RebaseDateTime.rebaseJulianToGregorianDays
@@ -183,10 +180,10 @@ object DataSourceUtils {
 
   def creteTimestampRebaseFuncInRead(
       rebaseMode: LegacyBehaviorPolicy.Value,
-      format: String)(colName: String, dataType: DataType): Long => Long = rebaseMode match {
+      format: String): Long => Long = rebaseMode match {
     case LegacyBehaviorPolicy.EXCEPTION => micros: Long =>
       if (micros < RebaseDateTime.lastSwitchJulianTs) {
-        throw DataSourceUtils.newRebaseExceptionInRead(colName, dataType, format)
+        throw DataSourceUtils.newRebaseExceptionInRead(format)
       }
       micros
     case LegacyBehaviorPolicy.LEGACY => RebaseDateTime.rebaseJulianToGregorianMicros
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetRowConverter.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetRowConverter.scala
index d02e162c2b612..0a1cca7ed0f3f 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetRowConverter.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetRowConverter.scala
@@ -189,13 +189,13 @@ private[parquet] class ParquetRowConverter(
   def currentRecord: InternalRow = currentRow
 
   private val dateRebaseFunc = DataSourceUtils.creteDateRebaseFuncInRead(
-    datetimeRebaseMode, "Parquet")(_, _)
+    datetimeRebaseMode, "Parquet")
 
   private val timestampRebaseFunc = DataSourceUtils.creteTimestampRebaseFuncInRead(
-    datetimeRebaseMode, "Parquet")(_, _)
+    datetimeRebaseMode, "Parquet")
 
   private val int96RebaseFunc = DataSourceUtils.creteTimestampRebaseFuncInRead(
-    int96RebaseMode, "Parquet INT96")(_, _)
+    int96RebaseMode, "Parquet INT96")
 
   // Converters for each field.
   private[this] val fieldConverters: Array[Converter with HasParentContainerUpdater] = {
@@ -332,7 +332,7 @@ private[parquet] class ParquetRowConverter(
       case TimestampType if parquetType.getOriginalType == OriginalType.TIMESTAMP_MICROS =>
         new ParquetPrimitiveConverter(updater) {
           override def addLong(value: Long): Unit = {
-            updater.setLong(timestampRebaseFunc(parquetType.getName, catalystType)(value))
+            updater.setLong(timestampRebaseFunc(value))
           }
         }
 
@@ -340,7 +340,7 @@ private[parquet] class ParquetRowConverter(
         new ParquetPrimitiveConverter(updater) {
           override def addLong(value: Long): Unit = {
             val micros = DateTimeUtils.millisToMicros(value)
-            updater.setLong(timestampRebaseFunc(parquetType.getName, catalystType)(micros))
+            updater.setLong(timestampRebaseFunc(micros))
           }
         }
 
@@ -350,7 +350,7 @@ private[parquet] class ParquetRowConverter(
           // Converts nanosecond timestamps stored as INT96
           override def addBinary(value: Binary): Unit = {
             val julianMicros = ParquetRowConverter.binaryToSQLTimestamp(value)
-            val gregorianMicros = int96RebaseFunc(parquetType.getName, catalystType)(julianMicros)
+            val gregorianMicros = int96RebaseFunc(julianMicros)
             val adjTime = convertTz.map(DateTimeUtils.convertTz(gregorianMicros, _, ZoneOffset.UTC))
               .getOrElse(gregorianMicros)
             updater.setLong(adjTime)
@@ -360,7 +360,7 @@ private[parquet] class ParquetRowConverter(
       case DateType =>
         new ParquetPrimitiveConverter(updater) {
           override def addInt(value: Int): Unit = {
-            updater.set(dateRebaseFunc(parquetType.getName, catalystType)(value))
+            updater.set(dateRebaseFunc(value))
           }
         }
 
diff --git a/sql/core/src/test/java/test/org/apache/spark/sql/connector/JavaColumnarDataSourceV2.java b/sql/core/src/test/java/test/org/apache/spark/sql/connector/JavaColumnarDataSourceV2.java
index 3bdc669dbf311..2f10c84c999f9 100644
--- a/sql/core/src/test/java/test/org/apache/spark/sql/connector/JavaColumnarDataSourceV2.java
+++ b/sql/core/src/test/java/test/org/apache/spark/sql/connector/JavaColumnarDataSourceV2.java
@@ -77,8 +77,8 @@ public PartitionReader<InternalRow> createReader(InputPartition partition) {
     @Override
     public PartitionReader<ColumnarBatch> createColumnarReader(InputPartition partition) {
       JavaRangeInputPartition p = (JavaRangeInputPartition) partition;
-      OnHeapColumnVector i = new OnHeapColumnVector(BATCH_SIZE, "", DataTypes.IntegerType);
-      OnHeapColumnVector j = new OnHeapColumnVector(BATCH_SIZE, "", DataTypes.IntegerType);
+      OnHeapColumnVector i = new OnHeapColumnVector(BATCH_SIZE, DataTypes.IntegerType);
+      OnHeapColumnVector j = new OnHeapColumnVector(BATCH_SIZE, DataTypes.IntegerType);
       ColumnVector[] vectors = new ColumnVector[2];
       vectors[0] = i;
       vectors[1] = j;
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/SparkSessionExtensionSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/SparkSessionExtensionSuite.scala
index 951ee81ac7de2..d4a6d84ce2b30 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/SparkSessionExtensionSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/SparkSessionExtensionSuite.scala
@@ -675,7 +675,7 @@ class BrokenColumnarAdd(
     } else if (lhs.isInstanceOf[ColumnVector] && rhs.isInstanceOf[ColumnVector]) {
       val l = lhs.asInstanceOf[ColumnVector]
       val r = rhs.asInstanceOf[ColumnVector]
-      val result = new OnHeapColumnVector(batch.numRows(), "", dataType)
+      val result = new OnHeapColumnVector(batch.numRows(), dataType)
       ret = result
 
       for (i <- 0 until batch.numRows()) {
@@ -684,7 +684,7 @@ class BrokenColumnarAdd(
     } else if (rhs.isInstanceOf[ColumnVector]) {
       val l = lhs.asInstanceOf[Long]
       val r = rhs.asInstanceOf[ColumnVector]
-      val result = new OnHeapColumnVector(batch.numRows(), "", dataType)
+      val result = new OnHeapColumnVector(batch.numRows(), dataType)
       ret = result
 
       for (i <- 0 until batch.numRows()) {
@@ -693,7 +693,7 @@ class BrokenColumnarAdd(
     } else if (lhs.isInstanceOf[ColumnVector]) {
       val l = lhs.asInstanceOf[ColumnVector]
       val r = rhs.asInstanceOf[Long]
-      val result = new OnHeapColumnVector(batch.numRows(), "", dataType)
+      val result = new OnHeapColumnVector(batch.numRows(), dataType)
       ret = result
 
       for (i <- 0 until batch.numRows()) {
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/connector/DataSourceV2Suite.scala b/sql/core/src/test/scala/org/apache/spark/sql/connector/DataSourceV2Suite.scala
index 562596c9fbf8e..49a1078800552 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/connector/DataSourceV2Suite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/connector/DataSourceV2Suite.scala
@@ -682,8 +682,8 @@ object ColumnarReaderFactory extends PartitionReaderFactory {
   override def createColumnarReader(partition: InputPartition): PartitionReader[ColumnarBatch] = {
     val RangeInputPartition(start, end) = partition
     new PartitionReader[ColumnarBatch] {
-      private lazy val i = new OnHeapColumnVector(BATCH_SIZE, "", IntegerType)
-      private lazy val j = new OnHeapColumnVector(BATCH_SIZE, "", IntegerType)
+      private lazy val i = new OnHeapColumnVector(BATCH_SIZE, IntegerType)
+      private lazy val j = new OnHeapColumnVector(BATCH_SIZE, IntegerType)
       private lazy val batch = new ColumnarBatch(Array(i, j))
 
       private var current = start
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/columnar/compression/BooleanBitSetSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/columnar/compression/BooleanBitSetSuite.scala
index 790cda9bfa5b1..111a620df8c24 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/columnar/compression/BooleanBitSetSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/columnar/compression/BooleanBitSetSuite.scala
@@ -105,7 +105,7 @@ class BooleanBitSetSuite extends SparkFunSuite {
     assertResult(BooleanBitSet.typeId, "Wrong compression scheme ID")(buffer.getInt())
 
     val decoder = BooleanBitSet.decoder(buffer, BOOLEAN)
-    val columnVector = new OnHeapColumnVector(values.length, "", BooleanType)
+    val columnVector = new OnHeapColumnVector(values.length, BooleanType)
     decoder.decompress(columnVector, values.length)
 
     if (values.nonEmpty) {
@@ -175,7 +175,7 @@ class BooleanBitSetSuite extends SparkFunSuite {
     assertResult(BooleanBitSet.typeId, "Wrong compression scheme ID")(buffer.getInt())
 
     val decoder = BooleanBitSet.decoder(buffer, BOOLEAN)
-    val columnVector = new OnHeapColumnVector(numRows, "", BooleanType)
+    val columnVector = new OnHeapColumnVector(numRows, BooleanType)
     decoder.decompress(columnVector, numRows)
 
     (0 until numRows).foreach { rowNum =>
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/columnar/compression/DictionaryEncodingSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/columnar/compression/DictionaryEncodingSuite.scala
index 3fc556557fa0e..61e4cc068fa80 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/columnar/compression/DictionaryEncodingSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/columnar/compression/DictionaryEncodingSuite.scala
@@ -142,7 +142,7 @@ class DictionaryEncodingSuite extends SparkFunSuite {
       assertResult(DictionaryEncoding.typeId, "Wrong compression scheme ID")(buffer.getInt())
 
       val decoder = DictionaryEncoding.decoder(buffer, columnType)
-      val columnVector = new OnHeapColumnVector(inputSeq.length, "", columnType.dataType)
+      val columnVector = new OnHeapColumnVector(inputSeq.length, columnType.dataType)
       decoder.decompress(columnVector, inputSeq.length)
 
       if (inputSeq.nonEmpty) {
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/columnar/compression/IntegralDeltaSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/columnar/compression/IntegralDeltaSuite.scala
index af1c5a34e35b0..b5630488b3667 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/columnar/compression/IntegralDeltaSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/columnar/compression/IntegralDeltaSuite.scala
@@ -136,7 +136,7 @@ class IntegralDeltaSuite extends SparkFunSuite {
       assertResult(scheme.typeId, "Wrong compression scheme ID")(buffer.getInt())
 
       val decoder = scheme.decoder(buffer, columnType)
-      val columnVector = new OnHeapColumnVector(input.length, "", columnType.dataType)
+      val columnVector = new OnHeapColumnVector(input.length, columnType.dataType)
       decoder.decompress(columnVector, input.length)
 
       if (input.nonEmpty) {
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/columnar/compression/PassThroughEncodingSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/columnar/compression/PassThroughEncodingSuite.scala
index 081d4dfaae4f3..c6fe64d1058ab 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/columnar/compression/PassThroughEncodingSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/columnar/compression/PassThroughEncodingSuite.scala
@@ -117,7 +117,7 @@ class PassThroughSuite extends SparkFunSuite {
       assertResult(PassThrough.typeId, "Wrong compression scheme ID")(buffer.getInt())
 
       val decoder = PassThrough.decoder(buffer, columnType)
-      val columnVector = new OnHeapColumnVector(input.length, "", columnType.dataType)
+      val columnVector = new OnHeapColumnVector(input.length, columnType.dataType)
       decoder.decompress(columnVector, input.length)
 
       if (input.nonEmpty) {
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/columnar/compression/RunLengthEncodingSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/columnar/compression/RunLengthEncodingSuite.scala
index 110dc6681200b..29dbc13b59c6b 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/columnar/compression/RunLengthEncodingSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/columnar/compression/RunLengthEncodingSuite.scala
@@ -126,7 +126,7 @@ class RunLengthEncodingSuite extends SparkFunSuite {
       assertResult(RunLengthEncoding.typeId, "Wrong compression scheme ID")(buffer.getInt())
 
       val decoder = RunLengthEncoding.decoder(buffer, columnType)
-      val columnVector = new OnHeapColumnVector(inputSeq.length, "", columnType.dataType)
+      val columnVector = new OnHeapColumnVector(inputSeq.length, columnType.dataType)
       decoder.decompress(columnVector, inputSeq.length)
 
       if (inputSeq.nonEmpty) {
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/vectorized/ColumnVectorSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/vectorized/ColumnVectorSuite.scala
index 97be5e5e1221a..247efd5554a8f 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/vectorized/ColumnVectorSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/vectorized/ColumnVectorSuite.scala
@@ -38,8 +38,8 @@ class ColumnVectorSuite extends SparkFunSuite with BeforeAndAfterEach {
       size: Int,
       dt: DataType)(
       block: WritableColumnVector => Unit): Unit = {
-    withVector(new OnHeapColumnVector(size, "", dt))(block)
-    withVector(new OffHeapColumnVector(size, "", dt))(block)
+    withVector(new OnHeapColumnVector(size, dt))(block)
+    withVector(new OffHeapColumnVector(size, dt))(block)
   }
 
   private def testVectors(
@@ -259,7 +259,7 @@ class ColumnVectorSuite extends SparkFunSuite with BeforeAndAfterEach {
   }
 
   test("[SPARK-22092] off-heap column vector reallocation corrupts array data") {
-    withVector(new OffHeapColumnVector(8, "", arrayType)) { testVector =>
+    withVector(new OffHeapColumnVector(8, arrayType)) { testVector =>
       val data = testVector.arrayData()
       (0 until 8).foreach(i => data.putInt(i, i))
       (0 until 8).foreach(i => testVector.putArray(i, i, 1))
@@ -275,7 +275,7 @@ class ColumnVectorSuite extends SparkFunSuite with BeforeAndAfterEach {
   }
 
   test("[SPARK-22092] off-heap column vector reallocation corrupts struct nullability") {
-    withVector(new OffHeapColumnVector(8, "", structType)) { testVector =>
+    withVector(new OffHeapColumnVector(8, structType)) { testVector =>
       (0 until 8).foreach(i => if (i % 2 == 0) testVector.putNull(i) else testVector.putNotNull(i))
       testVector.reserve(16)
       (0 until 8).foreach(i => assert(testVector.isNullAt(i) == (i % 2 == 0)))
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/vectorized/ColumnarBatchBenchmark.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/vectorized/ColumnarBatchBenchmark.scala
index eb9f70902add0..f9ae611691a7f 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/vectorized/ColumnarBatchBenchmark.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/vectorized/ColumnarBatchBenchmark.scala
@@ -147,7 +147,7 @@ object ColumnarBatchBenchmark extends BenchmarkBase {
 
     // Access through the column API with on heap memory
    val columnOnHeap = { i: Int =>
-      val col = new OnHeapColumnVector(count, "", IntegerType)
+      val col = new OnHeapColumnVector(count, IntegerType)
       var sum = 0L
       for (n <- 0L until iters) {
         var i = 0
@@ -166,7 +166,7 @@ object ColumnarBatchBenchmark extends BenchmarkBase {
 
     // Access through the column API with off heap memory
     def columnOffHeap = { i: Int => {
-      val col = new OffHeapColumnVector(count, "", IntegerType)
+      val col = new OffHeapColumnVector(count, IntegerType)
       var sum = 0L
       for (n <- 0L until iters) {
         var i = 0
@@ -185,7 +185,7 @@ object ColumnarBatchBenchmark extends BenchmarkBase {
 
     // Access by directly getting the buffer backing the column.
     val columnOffheapDirect = { i: Int =>
-      val col = new OffHeapColumnVector(count, "", IntegerType)
+      val col = new OffHeapColumnVector(count, IntegerType)
       var sum = 0L
       for (n <- 0L until iters) {
         var addr = col.valuesNativeAddress()
@@ -251,7 +251,7 @@ object ColumnarBatchBenchmark extends BenchmarkBase {
 
     // Adding values by appending, instead of putting.
     val onHeapAppend = { i: Int =>
-      val col = new OnHeapColumnVector(count, "", IntegerType)
+      val col = new OnHeapColumnVector(count, IntegerType)
       var sum = 0L
       for (n <- 0L until iters) {
         var i = 0
@@ -347,9 +347,9 @@ object ColumnarBatchBenchmark extends BenchmarkBase {
 
     def column(memoryMode: MemoryMode) = { i: Int =>
       val column = if (memoryMode == MemoryMode.OFF_HEAP) {
-        new OffHeapColumnVector(count, "", BinaryType)
+        new OffHeapColumnVector(count, BinaryType)
       } else {
-        new OnHeapColumnVector(count, "", BinaryType)
+        new OnHeapColumnVector(count, BinaryType)
       }
 
       var sum = 0L
@@ -378,8 +378,8 @@ object ColumnarBatchBenchmark extends BenchmarkBase {
     val random = new Random(0)
     val count = 4 * 1000
 
-    val onHeapVector = new OnHeapColumnVector(count, "", ArrayType(IntegerType))
-    val offHeapVector = new OffHeapColumnVector(count, "", ArrayType(IntegerType))
+    val onHeapVector = new OnHeapColumnVector(count, ArrayType(IntegerType))
+    val offHeapVector = new OffHeapColumnVector(count, ArrayType(IntegerType))
 
     val minSize = 3
     val maxSize = 32
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/vectorized/ColumnarBatchSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/vectorized/ColumnarBatchSuite.scala
index a6b210277b6ca..bd69bab6f5da2 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/vectorized/ColumnarBatchSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/vectorized/ColumnarBatchSuite.scala
@@ -44,9 +44,9 @@ class ColumnarBatchSuite extends SparkFunSuite {
 
   private def allocate(capacity: Int, dt: DataType, memMode: MemoryMode): WritableColumnVector = {
     if (memMode == MemoryMode.OFF_HEAP) {
-      new OffHeapColumnVector(capacity, "", dt)
+      new OffHeapColumnVector(capacity, dt)
     } else {
-      new OnHeapColumnVector(capacity, "", dt)
+      new OnHeapColumnVector(capacity, dt)
     }
   }