Revert "[SPARK-35097][SQL] Add column name to SparkUpgradeException a…
Browse files Browse the repository at this point in the history
…bout ancient datetime"

This reverts commit b5a4268.
AngersZhuuuu committed Apr 18, 2021
1 parent b5a4268 commit 76a746e
Showing 22 changed files with 88 additions and 115 deletions.
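
For orientation: SPARK-35097 had threaded the column name and data type through every datetime-rebase call site so that SparkUpgradeException could name the offending column, and this revert drops those parameters again. That is why every hunk below removes a `colName`/`dataType` (or `c.colName, c.dataType()`) argument. A minimal Scala sketch of the two helper shapes, mirroring `DataSourceUtils.newRebaseExceptionInRead`, with simplified stand-in types rather than the actual Spark source:

// Sketch only: stand-ins for the real Spark classes, to show the shape change.
class SparkUpgradeException(version: String, message: String)
  extends RuntimeException(s"[SPARK $version] $message")

// After the revert: the read-side helper only knows the file format.
def newRebaseExceptionInRead(format: String): SparkUpgradeException =
  new SparkUpgradeException("3.0",
    s"reading ancient dates/timestamps from $format files may be ambiguous")

// Before the revert (SPARK-35097): column name and type were threaded through
// (dataType is a String here; the real parameter was a Catalyst DataType).
def newRebaseExceptionInReadOld(
    colName: String, dataType: String, format: String): SparkUpgradeException =
  new SparkUpgradeException("3.0",
    s"reading ancient dates/timestamps in column `$colName` of type `$dataType` " +
      s"from $format files may be ambiguous")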

AvroDeserializer.scala
@@ -62,10 +62,10 @@ private[sql] class AvroDeserializer(
   private lazy val decimalConversions = new DecimalConversion()

   private val dateRebaseFunc = DataSourceUtils.creteDateRebaseFuncInRead(
-    datetimeRebaseMode, "Avro")(_, _)
+    datetimeRebaseMode, "Avro")

   private val timestampRebaseFunc = DataSourceUtils.creteTimestampRebaseFuncInRead(
-    datetimeRebaseMode, "Avro")(_, _)
+    datetimeRebaseMode, "Avro")

   private val converter: Any => Option[Any] = try {
     rootCatalystType match {
@@ -126,8 +126,7 @@ private[sql] class AvroDeserializer(
         updater.setInt(ordinal, value.asInstanceOf[Int])

       case (INT, DateType) => (updater, ordinal, value) =>
-        updater.setInt(ordinal,
-          dateRebaseFunc(avroType.getName, catalystType)(value.asInstanceOf[Int]))
+        updater.setInt(ordinal, dateRebaseFunc(value.asInstanceOf[Int]))

       case (LONG, LongType) => (updater, ordinal, value) =>
         updater.setLong(ordinal, value.asInstanceOf[Long])
@@ -138,10 +137,10 @@
           case null | _: TimestampMillis => (updater, ordinal, value) =>
             val millis = value.asInstanceOf[Long]
             val micros = DateTimeUtils.millisToMicros(millis)
-            updater.setLong(ordinal, timestampRebaseFunc(avroType.getName, catalystType)(micros))
+            updater.setLong(ordinal, timestampRebaseFunc(micros))
           case _: TimestampMicros => (updater, ordinal, value) =>
             val micros = value.asInstanceOf[Long]
-            updater.setLong(ordinal, timestampRebaseFunc(avroType.getName, catalystType)(micros))
+            updater.setLong(ordinal, timestampRebaseFunc(micros))
           case other => throw new IncompatibleSchemaException(errorPrefix +
             s"Avro logical type $other cannot be converted to SQL type ${TimestampType.sql}.")
         }
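
The `(_, _)` deleted in the first hunk is the tell: with SPARK-35097 the rebase factory returned a curried function expecting `(columnName, dataType)` before yielding the actual rebaser, and the revert restores a factory that returns the rebaser directly. A rough Scala sketch of the two shapes (identity bodies, a simplified `String` in place of `DataType`; `creteDateRebaseFuncInRead` keeps its real, typo'd name):

// After the revert: the factory returns the Int => Int rebaser directly.
def creteDateRebaseFuncInRead(mode: String, format: String): Int => Int =
  days => days  // the real body rebases Julian days onto the Proleptic Gregorian calendar

// Before the revert: an extra (colName, dataType) parameter list, hence the
// `(_, _)` eta-expansion at the declaration site and call sites shaped like
// dateRebaseFunc(avroType.getName, catalystType)(days).
def creteDateRebaseFuncInReadOld(mode: String, format: String)(
    colName: String, dataType: String): Int => Int =
  days => days

val dateRebaseFunc = creteDateRebaseFuncInReadOld("EXCEPTION", "Avro")(_, _)
val rebased = dateRebaseFunc("eventDate", "DateType")(18000)  // first name/type, then the value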

QueryExecutionErrors.scala
@@ -394,16 +394,11 @@ object QueryExecutionErrors {
   }

   def sparkUpgradeInReadingDatesError(
-      colName: String,
-      dataType: DataType,
-      format: String,
-      config: String,
-      option: String): SparkUpgradeException = {
+      format: String, config: String, option: String): SparkUpgradeException = {
     new SparkUpgradeException("3.0",
       s"""
          |reading dates before 1582-10-15 or timestamps before 1900-01-01T00:00:00Z from $format
-         |files can be ambiguous when read column `${colName}` of datatype `${dataType}`,
-         |as the files may be written by Spark 2.x or legacy versions of
+         |files can be ambiguous, as the files may be written by Spark 2.x or legacy versions of
          |Hive, which uses a legacy hybrid calendar that is different from Spark 3.0+'s Proleptic
          |Gregorian calendar. See more details in SPARK-31404. You can set the SQL config
          |'$config' or the datasource option '$option' to 'LEGACY' to rebase the datetime values
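
Post-revert, the message is again column-agnostic. A runnable sketch of how the interpolated `s"""..."""` block above renders; the config and option values here are illustrative placeholders, not necessarily the exact strings Spark passes in:

val format = "Parquet"
val config = "spark.sql.legacy.parquet.datetimeRebaseModeInRead"  // illustrative value
val option = "datetimeRebaseMode"                                 // illustrative value
val message =
  s"""
     |reading dates before 1582-10-15 or timestamps before 1900-01-01T00:00:00Z from $format
     |files can be ambiguous, as the files may be written by Spark 2.x or legacy versions of
     |Hive. You can set the SQL config '$config' or the datasource option '$option' to
     |'LEGACY' to rebase the datetime values during reading.
     |""".stripMargin
println(message)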

OrcColumnarBatchReader.java
@@ -167,15 +167,15 @@ public void initBatch(
     for (int i = 0; i < requiredFields.length; i++) {
       DataType dt = requiredFields[i].dataType();
       if (requestedPartitionColIds[i] != -1) {
-        OnHeapColumnVector partitionCol = new OnHeapColumnVector(capacity, requiredFields[i].name(),dt);
+        OnHeapColumnVector partitionCol = new OnHeapColumnVector(capacity, dt);
         ColumnVectorUtils.populate(partitionCol, partitionValues, requestedPartitionColIds[i]);
         partitionCol.setIsConstant();
         orcVectorWrappers[i] = partitionCol;
       } else {
         int colId = requestedDataColIds[i];
         // Initialize the missing columns once.
         if (colId == -1) {
-          OnHeapColumnVector missingCol = new OnHeapColumnVector(capacity, requiredFields[i].name(), dt);
+          OnHeapColumnVector missingCol = new OnHeapColumnVector(capacity, dt);
           missingCol.putNulls(0, capacity);
           missingCol.setIsConstant();
           orcVectorWrappers[i] = missingCol;
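
Both branches above build a vector that is constant for the whole scan: partition values are populated once, and columns missing from the file are all-null. A minimal sketch of that pattern against the post-revert constructor (assumes Spark is on the classpath):

import org.apache.spark.sql.execution.vectorized.OnHeapColumnVector
import org.apache.spark.sql.types.IntegerType

val capacity = 4096
// A column requested by the query but absent from the ORC file:
val missingCol = new OnHeapColumnVector(capacity, IntegerType)  // post-revert: no name argument
missingCol.putNulls(0, capacity)  // every row reads as null
missingCol.setIsConstant()        // contents survive batch resets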

VectorizedColumnReader.java
@@ -190,13 +190,10 @@ private boolean isLazyDecodingSupported(PrimitiveType.PrimitiveTypeName typeName
     return isSupported;
   }

-  static int rebaseDays(
-      int julianDays,
-      final boolean failIfRebase,
-      WritableColumnVector c) {
+  static int rebaseDays(int julianDays, final boolean failIfRebase) {
     if (failIfRebase) {
       if (julianDays < RebaseDateTime.lastSwitchJulianDay()) {
-        throw DataSourceUtils.newRebaseExceptionInRead(c.colName, c.dataType(), "Parquet");
+        throw DataSourceUtils.newRebaseExceptionInRead("Parquet");
       } else {
         return julianDays;
       }
@@ -208,11 +205,10 @@ static int rebaseDays(
   private static long rebaseTimestamp(
       long julianMicros,
       final boolean failIfRebase,
-      WritableColumnVector c,
       final String format) {
     if (failIfRebase) {
       if (julianMicros < RebaseDateTime.lastSwitchJulianTs()) {
-        throw DataSourceUtils.newRebaseExceptionInRead(c.colName, c.dataType(), format);
+        throw DataSourceUtils.newRebaseExceptionInRead(format);
       } else {
         return julianMicros;
       }
@@ -221,18 +217,12 @@ private static long rebaseTimestamp(
     }
   }

-  static long rebaseMicros(
-      long julianMicros,
-      final boolean failIfRebase,
-      WritableColumnVector c) {
-    return rebaseTimestamp(julianMicros, failIfRebase, c, "Parquet");
+  static long rebaseMicros(long julianMicros, final boolean failIfRebase) {
+    return rebaseTimestamp(julianMicros, failIfRebase, "Parquet");
   }

-  static long rebaseInt96(
-      long julianMicros,
-      final boolean failIfRebase,
-      WritableColumnVector c) {
-    return rebaseTimestamp(julianMicros, failIfRebase, c, "Parquet INT96");
+  static long rebaseInt96(long julianMicros, final boolean failIfRebase) {
+    return rebaseTimestamp(julianMicros, failIfRebase, "Parquet INT96");
   }

   /**
@@ -397,7 +387,7 @@ private void decodeDictionaryIds(
         for (int i = rowId; i < rowId + num; ++i) {
           if (!column.isNullAt(i)) {
             int julianDays = dictionary.decodeToInt(dictionaryIds.getDictId(i));
-            column.putInt(i, rebaseDays(julianDays, failIfRebase, column));
+            column.putInt(i, rebaseDays(julianDays, failIfRebase));
           }
         }
       } else {
@@ -442,7 +432,7 @@ private void decodeDictionaryIds(
           if (!column.isNullAt(i)) {
             long julianMillis = dictionary.decodeToLong(dictionaryIds.getDictId(i));
             long julianMicros = DateTimeUtils.millisToMicros(julianMillis);
-            column.putLong(i, rebaseMicros(julianMicros, failIfRebase, column));
+            column.putLong(i, rebaseMicros(julianMicros, failIfRebase));
           }
         }
       }
@@ -451,7 +441,7 @@ private void decodeDictionaryIds(
         for (int i = rowId; i < rowId + num; ++i) {
           if (!column.isNullAt(i)) {
             long julianMicros = dictionary.decodeToLong(dictionaryIds.getDictId(i));
-            column.putLong(i, rebaseMicros(julianMicros, failIfRebase, column));
+            column.putLong(i, rebaseMicros(julianMicros, failIfRebase));
           }
         }
       } else {
@@ -490,7 +480,7 @@ private void decodeDictionaryIds(
           if (!column.isNullAt(i)) {
             Binary v = dictionary.decodeToBinary(dictionaryIds.getDictId(i));
             long julianMicros = ParquetRowConverter.binaryToSQLTimestamp(v);
-            long gregorianMicros = rebaseInt96(julianMicros, failIfRebase, column);
+            long gregorianMicros = rebaseInt96(julianMicros, failIfRebase);
             column.putLong(i, gregorianMicros);
           }
         }
@@ -510,7 +500,7 @@ private void decodeDictionaryIds(
           if (!column.isNullAt(i)) {
             Binary v = dictionary.decodeToBinary(dictionaryIds.getDictId(i));
             long julianMicros = ParquetRowConverter.binaryToSQLTimestamp(v);
-            long gregorianMicros = rebaseInt96(julianMicros, failIfRebase, column);
+            long gregorianMicros = rebaseInt96(julianMicros, failIfRebase);
             long adjTime = DateTimeUtils.convertTz(gregorianMicros, convertTz, UTC);
             column.putLong(i, adjTime);
           }
@@ -650,7 +640,7 @@ private void readLongBatch(int rowId, int num, WritableColumnVector column) thro
       for (int i = 0; i < num; i++) {
         if (defColumn.readInteger() == maxDefLevel) {
           long julianMicros = DateTimeUtils.millisToMicros(dataColumn.readLong());
-          column.putLong(rowId + i, rebaseMicros(julianMicros, failIfRebase, column));
+          column.putLong(rowId + i, rebaseMicros(julianMicros, failIfRebase));
         } else {
           column.putNull(rowId + i);
         }
@@ -708,7 +698,7 @@ private void readBinaryBatch(int rowId, int num, WritableColumnVector column) th
         if (defColumn.readInteger() == maxDefLevel) {
           // Read 12 bytes for INT96
           long julianMicros = ParquetRowConverter.binaryToSQLTimestamp(data.readBinary(12));
-          long gregorianMicros = rebaseInt96(julianMicros, failIfRebase, column);
+          long gregorianMicros = rebaseInt96(julianMicros, failIfRebase);
           column.putLong(rowId + i, gregorianMicros);
         } else {
           column.putNull(rowId + i);
@@ -732,7 +722,7 @@ private void readBinaryBatch(int rowId, int num, WritableColumnVector column) th
         if (defColumn.readInteger() == maxDefLevel) {
           // Read 12 bytes for INT96
           long julianMicros = ParquetRowConverter.binaryToSQLTimestamp(data.readBinary(12));
-          long gregorianMicros = rebaseInt96(julianMicros, failIfRebase, column);
+          long gregorianMicros = rebaseInt96(julianMicros, failIfRebase);
           long adjTime = DateTimeUtils.convertTz(gregorianMicros, convertTz, UTC);
           column.putLong(rowId + i, adjTime);
         } else {
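
All the rebase helpers in this file follow the same guard: values on or after the Julian-to-Gregorian switch pass through unchanged, while older values either raise the upgrade exception (EXCEPTION mode) or get rebased. A self-contained Scala sketch of `rebaseDays`; the switch-day constant and the rebase body are placeholders, since Spark computes both in `RebaseDateTime`:

object RebaseDaysSketch {
  // Placeholder: days since 1970-01-01 of the Gregorian switch (1582-10-15).
  // The real value comes from RebaseDateTime.lastSwitchJulianDay().
  val lastSwitchJulianDay: Int = -141427  // assumed value, for illustration only

  // Placeholder rebase: the real one maps hybrid-Julian day counts to
  // Proleptic Gregorian day counts via precomputed diffs.
  def rebaseJulianToGregorianDays(days: Int): Int = days

  def rebaseDays(julianDays: Int, failIfRebase: Boolean): Int =
    if (failIfRebase) {
      if (julianDays < lastSwitchJulianDay) {
        throw new RuntimeException(
          "ancient dates need rebasing; set the rebase mode to LEGACY or CORRECTED")
      } else julianDays
    } else {
      rebaseJulianToGregorianDays(julianDays)
    }
}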

VectorizedPlainValuesReader.java
@@ -107,7 +107,7 @@ public final void readIntegersWithRebase(
     }
     if (rebase) {
       if (failIfRebase) {
-        throw DataSourceUtils.newRebaseExceptionInRead(c.colName, c.dataType(), "Parquet");
+        throw DataSourceUtils.newRebaseExceptionInRead("Parquet");
       } else {
         for (int i = 0; i < total; i += 1) {
           c.putInt(rowId + i, RebaseDateTime.rebaseJulianToGregorianDays(buffer.getInt()));
@@ -164,7 +164,7 @@ public final void readLongsWithRebase(
     }
     if (rebase) {
       if (failIfRebase) {
-        throw DataSourceUtils.newRebaseExceptionInRead(c.colName, c.dataType(), "Parquet");
+        throw DataSourceUtils.newRebaseExceptionInRead("Parquet");
       } else {
         for (int i = 0; i < total; i += 1) {
           c.putLong(rowId + i, RebaseDateTime.rebaseJulianToGregorianMicros(buffer.getLong()));

VectorizedRleValuesReader.java
@@ -264,8 +264,7 @@ public void readIntegersWithRebase(
         for (int i = 0; i < n; ++i) {
           if (currentBuffer[currentBufferIdx++] == level) {
             int julianDays = data.readInteger();
-            c.putInt(rowId + i,
-                VectorizedColumnReader.rebaseDays(julianDays, failIfRebase, c));
+            c.putInt(rowId + i, VectorizedColumnReader.rebaseDays(julianDays, failIfRebase));
           } else {
             c.putNull(rowId + i);
           }
@@ -493,7 +492,7 @@ public void readLongsWithRebase(
         for (int i = 0; i < n; ++i) {
           if (currentBuffer[currentBufferIdx++] == level) {
             long julianMicros = data.readLong();
-            c.putLong(rowId + i, VectorizedColumnReader.rebaseMicros(julianMicros, failIfRebase, c));
+            c.putLong(rowId + i, VectorizedColumnReader.rebaseMicros(julianMicros, failIfRebase));
           } else {
             c.putNull(rowId + i);
           }

OffHeapColumnVector.java
@@ -50,7 +50,7 @@ public static OffHeapColumnVector[] allocateColumns(int capacity, StructType sch
   public static OffHeapColumnVector[] allocateColumns(int capacity, StructField[] fields) {
     OffHeapColumnVector[] vectors = new OffHeapColumnVector[fields.length];
     for (int i = 0; i < fields.length; i++) {
-      vectors[i] = new OffHeapColumnVector(capacity, fields[i].name(), fields[i].dataType());
+      vectors[i] = new OffHeapColumnVector(capacity, fields[i].dataType());
     }
     return vectors;
   }
@@ -64,8 +64,8 @@ public static OffHeapColumnVector[] allocateColumns(int capacity, StructField[]
   private long lengthData;
   private long offsetData;

-  public OffHeapColumnVector(int capacity, String colName, DataType type) {
-    super(capacity, colName, type);
+  public OffHeapColumnVector(int capacity, DataType type) {
+    super(capacity, type);

     nulls = 0;
     data = 0;
@@ -566,7 +566,7 @@ protected void reserveInternal(int newCapacity) {
   }

   @Override
-  protected OffHeapColumnVector reserveNewColumn(int capacity, String colName, DataType type) {
-    return new OffHeapColumnVector(capacity, colName, type);
+  protected OffHeapColumnVector reserveNewColumn(int capacity, DataType type) {
+    return new OffHeapColumnVector(capacity, type);
   }
 }

OnHeapColumnVector.java
@@ -50,7 +50,7 @@ public static OnHeapColumnVector[] allocateColumns(int capacity, StructType sche
   public static OnHeapColumnVector[] allocateColumns(int capacity, StructField[] fields) {
     OnHeapColumnVector[] vectors = new OnHeapColumnVector[fields.length];
     for (int i = 0; i < fields.length; i++) {
-      vectors[i] = new OnHeapColumnVector(capacity, fields[i].name(), fields[i].dataType());
+      vectors[i] = new OnHeapColumnVector(capacity, fields[i].dataType());
     }
     return vectors;
   }
@@ -73,8 +73,8 @@ public static OnHeapColumnVector[] allocateColumns(int capacity, StructField[] f
   private int[] arrayLengths;
   private int[] arrayOffsets;

-  public OnHeapColumnVector(int capacity, String colName, DataType type) {
-    super(capacity, colName, type);
+  public OnHeapColumnVector(int capacity, DataType type) {
+    super(capacity, type);

     reserveInternal(capacity);
     reset();
@@ -580,7 +580,7 @@ protected void reserveInternal(int newCapacity) {
   }

   @Override
-  protected OnHeapColumnVector reserveNewColumn(int capacity, String colName, DataType type) {
-    return new OnHeapColumnVector(capacity, colName, type);
+  protected OnHeapColumnVector reserveNewColumn(int capacity, DataType type) {
+    return new OnHeapColumnVector(capacity, type);
   }
 }

WritableColumnVector.java
@@ -165,7 +165,7 @@ public void setDictionary(Dictionary dictionary) {
    */
   public WritableColumnVector reserveDictionaryIds(int capacity) {
     if (dictionaryIds == null) {
-      dictionaryIds = reserveNewColumn(capacity, colName, DataTypes.IntegerType);
+      dictionaryIds = reserveNewColumn(capacity, DataTypes.IntegerType);
     } else {
       dictionaryIds.reset();
       dictionaryIds.reserve(capacity);
@@ -677,11 +677,6 @@ public WritableColumnVector arrayData() {
    */
   public final void setIsConstant() { isConstant = true; }

-  /**
-   * Column name of this column.
-   */
-  public String colName;
-
   /**
    * Maximum number of rows that can be stored in this column.
    */
@@ -722,7 +717,7 @@ public WritableColumnVector arrayData() {
   /**
    * Reserve a new column.
    */
-  protected abstract WritableColumnVector reserveNewColumn(int capacity, String colName, DataType type);
+  protected abstract WritableColumnVector reserveNewColumn(int capacity, DataType type);

   protected boolean isArray() {
     return type instanceof ArrayType || type instanceof BinaryType || type instanceof StringType ||
@@ -733,9 +728,8 @@ protected boolean isArray() {
    * Sets up the common state and also handles creating the child columns if this is a nested
    * type.
    */
-  protected WritableColumnVector(int capacity, String colName, DataType type) {
+  protected WritableColumnVector(int capacity, DataType type) {
     super(type);
-    this.colName = colName;
     this.capacity = capacity;

     if (isArray()) {
@@ -748,25 +742,24 @@ protected WritableColumnVector(int capacity, String colName, DataType type) {
         childCapacity *= DEFAULT_ARRAY_LENGTH;
       }
       this.childColumns = new WritableColumnVector[1];
-      this.childColumns[0] = reserveNewColumn(childCapacity, colName + ".elem", childType);
+      this.childColumns[0] = reserveNewColumn(childCapacity, childType);
     } else if (type instanceof StructType) {
       StructType st = (StructType)type;
       this.childColumns = new WritableColumnVector[st.fields().length];
       for (int i = 0; i < childColumns.length; ++i) {
-        this.childColumns[i] = reserveNewColumn(capacity, colName + "." + st.fields()[i].name(),
-          st.fields()[i].dataType());
+        this.childColumns[i] = reserveNewColumn(capacity, st.fields()[i].dataType());
       }
     } else if (type instanceof MapType) {
       MapType mapType = (MapType) type;
       this.childColumns = new WritableColumnVector[2];
-      this.childColumns[0] = reserveNewColumn(capacity, colName + ".key", mapType.keyType());
-      this.childColumns[1] = reserveNewColumn(capacity, colName + ".value", mapType.valueType());
+      this.childColumns[0] = reserveNewColumn(capacity, mapType.keyType());
+      this.childColumns[1] = reserveNewColumn(capacity, mapType.valueType());
     } else if (type instanceof CalendarIntervalType) {
       // Three columns. Months as int. Days as Int. Microseconds as Long.
       this.childColumns = new WritableColumnVector[3];
-      this.childColumns[0] = reserveNewColumn(capacity, colName + ".months", DataTypes.IntegerType);
-      this.childColumns[1] = reserveNewColumn(capacity, colName + ".days", DataTypes.IntegerType);
-      this.childColumns[2] = reserveNewColumn(capacity, colName + ".microseconds", DataTypes.LongType);
+      this.childColumns[0] = reserveNewColumn(capacity, DataTypes.IntegerType);
+      this.childColumns[1] = reserveNewColumn(capacity, DataTypes.IntegerType);
+      this.childColumns[2] = reserveNewColumn(capacity, DataTypes.LongType);
     } else {
       this.childColumns = null;
     }
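
With `colName` gone, nested children are no longer labeled (`.elem`, `.key`, `.value`, `.months`, and so on); a child vector is reachable only by ordinal. A short sketch against the post-revert API (assumes Spark is on the classpath):

import org.apache.spark.sql.execution.vectorized.OnHeapColumnVector
import org.apache.spark.sql.types._

val schema = new StructType().add("days", IntegerType).add("micros", LongType)
val vec = new OnHeapColumnVector(16, schema)  // allocates one child vector per field

vec.getChild(0).putInt(0, 18000)          // field 0 ("days"), addressed by ordinal
vec.getChild(1).putLong(0, 1556582400L)   // field 1 ("micros")
val row = vec.getStruct(0)                // reads both children at row 0
assert(row.getInt(0) == 18000 && row.getLong(1) == 1556582400L)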