[SPARK-31937][SQL] Support processing ArrayType/MapType/StructType data using no-serde mode script transform #30957

Closed. Wants to merge 25 commits.

Changes from 1 commit

Commits (25)
adc9ded  [SPARK-31937][SQL] Support processing array and map type using spark … (AngersZhuuuu, Dec 29, 2020)
6a7438b  Update CatalystTypeConverters.scala (AngersZhuuuu, Dec 29, 2020)
d3b9cec  fix failed UT (AngersZhuuuu, Dec 29, 2020)
fdd5225  Update SparkScriptTransformationSuite.scala (AngersZhuuuu, Dec 29, 2020)
aa16c8f  Update BaseScriptTransformationSuite.scala (AngersZhuuuu, Dec 29, 2020)
092c927  Update BaseScriptTransformationExec.scala (AngersZhuuuu, Dec 29, 2020)
9761c0e  Merge branch 'master' into SPARK-31937 (AngersZhuuuu, Dec 29, 2020)
28ad7fa  Update BaseScriptTransformationSuite.scala (AngersZhuuuu, Dec 29, 2020)
9ac75fc  Merge branch 'master' into SPARK-31937 (AngersZhuuuu, Jan 4, 2021)
33d8b5b  Update BaseScriptTransformationExec.scala (AngersZhuuuu, Jan 4, 2021)
63f07eb  follow comment (AngersZhuuuu, Feb 4, 2021)
b631b70  Update BaseScriptTransformationExec.scala (AngersZhuuuu, Feb 4, 2021)
b7e7f92  follow comment (AngersZhuuuu, Feb 5, 2021)
8dec5a1  follow comment (AngersZhuuuu, Feb 5, 2021)
529d54d  Update BaseScriptTransformationExec.scala (AngersZhuuuu, Feb 6, 2021)
4f0e78f  Avoid construct JsonToStructs repeated (AngersZhuuuu, Feb 6, 2021)
ed8c54c  remove unused UT (AngersZhuuuu, Feb 6, 2021)
520f4b8  Update sql/core/src/main/scala/org/apache/spark/sql/execution/BaseScr… (AngersZhuuuu, Apr 16, 2021)
97f9d58  Merge branch 'master' into SPARK-31937 (AngersZhuuuu, Apr 16, 2021)
b5a4268  [SPARK-35097][SQL] Add column name to SparkUpgradeException about anc… (AngersZhuuuu, Apr 18, 2021)
76a746e  Revert "[SPARK-35097][SQL] Add column name to SparkUpgradeException a… (AngersZhuuuu, Apr 18, 2021)
6aa05fc  fix UT (AngersZhuuuu, Apr 19, 2021)
9e3f808  Revert "fix UT" (AngersZhuuuu, Apr 19, 2021)
3f51d27  fix UT (AngersZhuuuu, Apr 19, 2021)
adf8a66  Update sql-migration-guide.md (AngersZhuuuu, Apr 19, 2021)
@@ -62,10 +62,10 @@ private[sql] class AvroDeserializer(
private lazy val decimalConversions = new DecimalConversion()

private val dateRebaseFunc = DataSourceUtils.creteDateRebaseFuncInRead(
datetimeRebaseMode, "Avro")
datetimeRebaseMode, "Avro")(_, _)
Member:

@AngersZhuuuu, seems like this commit was sneaked in mistakenly in this PR

Contributor Author:

> @AngersZhuuuu, seems like this commit was sneaked in mistakenly in this PR

Yeah, while working on another PR I forgot to switch branches.

Contributor Author:

Fixed

private val timestampRebaseFunc = DataSourceUtils.creteTimestampRebaseFuncInRead(
datetimeRebaseMode, "Avro")
datetimeRebaseMode, "Avro")(_, _)

private val converter: Any => Option[Any] = try {
rootCatalystType match {
@@ -126,7 +126,8 @@ private[sql] class AvroDeserializer(
updater.setInt(ordinal, value.asInstanceOf[Int])

case (INT, DateType) => (updater, ordinal, value) =>
updater.setInt(ordinal, dateRebaseFunc(value.asInstanceOf[Int]))
updater.setInt(ordinal,
dateRebaseFunc(avroType.getName, catalystType)(value.asInstanceOf[Int]))

case (LONG, LongType) => (updater, ordinal, value) =>
updater.setLong(ordinal, value.asInstanceOf[Long])
@@ -137,10 +138,10 @@ private[sql] class AvroDeserializer(
case null | _: TimestampMillis => (updater, ordinal, value) =>
val millis = value.asInstanceOf[Long]
val micros = DateTimeUtils.millisToMicros(millis)
updater.setLong(ordinal, timestampRebaseFunc(micros))
updater.setLong(ordinal, timestampRebaseFunc(avroType.getName, catalystType)(micros))
case _: TimestampMicros => (updater, ordinal, value) =>
val micros = value.asInstanceOf[Long]
updater.setLong(ordinal, timestampRebaseFunc(micros))
updater.setLong(ordinal, timestampRebaseFunc(avroType.getName, catalystType)(micros))
case other => throw new IncompatibleSchemaException(errorPrefix +
s"Avro logical type $other cannot be converted to SQL type ${TimestampType.sql}.")
}
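For readers skimming the two hunks above: the change makes the rebase helpers curried, so the column name and Catalyst type are bound once per column and the resulting per-value function is reused inside the converter. A minimal, self-contained sketch of that shape (the object name, simplified signatures, and string-based types are assumptions for illustration, not Spark's actual DataSourceUtils API):

```scala
import java.time.LocalDate

// Simplified stand-in for Spark's rebase helpers; the real
// DataSourceUtils.creteDateRebaseFuncInRead has a different signature and
// performs an actual Julian-to-Gregorian rebase of the value.
object RebaseSketch {
  private val gregorianSwitchDay: Long = LocalDate.of(1582, 10, 15).toEpochDay

  // Curried: bind (mode, format) once, then (colName, dataType) per column,
  // then call the resulting Int => Int for every value in that column.
  def createDateRebaseFuncInRead(mode: String, format: String): (String, String) => Int => Int =
    (colName, dataType) => julianDays => {
      if (mode == "EXCEPTION" && julianDays < gregorianSwitchDay) {
        throw new IllegalStateException(
          s"Ambiguous pre-1582-10-15 date in $format column `$colName` of type `$dataType`")
      }
      julianDays // the real helper rebases the value onto the Proleptic Gregorian calendar
    }

  def main(args: Array[String]): Unit = {
    val dateRebaseFunc = createDateRebaseFuncInRead("EXCEPTION", "Avro")
    val rebaseEventDate = dateRebaseFunc("event_date", "DateType") // bound once per column
    println(rebaseEventDate(LocalDate.of(2021, 1, 1).toEpochDay.toInt)) // applied per value
  }
}
```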
@@ -394,11 +394,16 @@ object QueryExecutionErrors {
}

def sparkUpgradeInReadingDatesError(
format: String, config: String, option: String): SparkUpgradeException = {
colName: String,
dataType: DataType,
format: String,
config: String,
option: String): SparkUpgradeException = {
new SparkUpgradeException("3.0",
s"""
|reading dates before 1582-10-15 or timestamps before 1900-01-01T00:00:00Z from $format
|files can be ambiguous, as the files may be written by Spark 2.x or legacy versions of
|files can be ambiguous when read column `${colName}` of datatype `${dataType}`,
|as the files may be written by Spark 2.x or legacy versions of
|Hive, which uses a legacy hybrid calendar that is different from Spark 3.0+'s Proleptic
|Gregorian calendar. See more details in SPARK-31404. You can set the SQL config
|'$config' or the datasource option '$option' to 'LEGACY' to rebase the datetime values
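The hunk above widens sparkUpgradeInReadingDatesError so the message can point at the offending column. A rough, hypothetical sketch of the resulting message shape with the new parameters (the wording paraphrases the diff; the sample column name and placeholder config keys are made up):

```scala
// Hypothetical helper that mirrors the new parameter list; not Spark's actual method.
object UpgradeMessageSketch {
  def sparkUpgradeInReadingDatesMessage(
      colName: String,
      dataType: String,
      format: String,
      config: String,
      option: String): String =
    s"""Reading dates before 1582-10-15 or timestamps before 1900-01-01T00:00:00Z from $format
       |files can be ambiguous when reading column `$colName` of type `$dataType`, because the
       |files may have been written by Spark 2.x or legacy Hive using a hybrid calendar.
       |Set the SQL config '$config' or the datasource option '$option' to 'LEGACY' to rebase
       |the datetime values.""".stripMargin

  def main(args: Array[String]): Unit =
    println(sparkUpgradeInReadingDatesMessage(
      "event_ts", "timestamp", "Parquet", "<rebase-mode SQL config>", "<datasource option>"))
}
```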
@@ -167,15 +167,15 @@ public void initBatch(
for (int i = 0; i < requiredFields.length; i++) {
DataType dt = requiredFields[i].dataType();
if (requestedPartitionColIds[i] != -1) {
OnHeapColumnVector partitionCol = new OnHeapColumnVector(capacity, dt);
OnHeapColumnVector partitionCol = new OnHeapColumnVector(capacity, requiredFields[i].name(),dt);
ColumnVectorUtils.populate(partitionCol, partitionValues, requestedPartitionColIds[i]);
partitionCol.setIsConstant();
orcVectorWrappers[i] = partitionCol;
} else {
int colId = requestedDataColIds[i];
// Initialize the missing columns once.
if (colId == -1) {
OnHeapColumnVector missingCol = new OnHeapColumnVector(capacity, dt);
OnHeapColumnVector missingCol = new OnHeapColumnVector(capacity, requiredFields[i].name(), dt);
missingCol.putNulls(0, capacity);
missingCol.setIsConstant();
orcVectorWrappers[i] = missingCol;
@@ -190,10 +190,13 @@ private boolean isLazyDecodingSupported(PrimitiveType.PrimitiveTypeName typeName
return isSupported;
}

static int rebaseDays(int julianDays, final boolean failIfRebase) {
static int rebaseDays(
int julianDays,
final boolean failIfRebase,
WritableColumnVector c) {
if (failIfRebase) {
if (julianDays < RebaseDateTime.lastSwitchJulianDay()) {
throw DataSourceUtils.newRebaseExceptionInRead("Parquet");
throw DataSourceUtils.newRebaseExceptionInRead(c.colName, c.dataType(), "Parquet");
} else {
return julianDays;
}
@@ -205,10 +208,11 @@ static int rebaseDays(int julianDays, final boolean failIfRebase) {
private static long rebaseTimestamp(
long julianMicros,
final boolean failIfRebase,
WritableColumnVector c,
final String format) {
if (failIfRebase) {
if (julianMicros < RebaseDateTime.lastSwitchJulianTs()) {
throw DataSourceUtils.newRebaseExceptionInRead(format);
throw DataSourceUtils.newRebaseExceptionInRead(c.colName, c.dataType(), format);
} else {
return julianMicros;
}
@@ -217,12 +221,18 @@ private static long rebaseTimestamp(
}
}

static long rebaseMicros(long julianMicros, final boolean failIfRebase) {
return rebaseTimestamp(julianMicros, failIfRebase, "Parquet");
static long rebaseMicros(
long julianMicros,
final boolean failIfRebase,
WritableColumnVector c) {
return rebaseTimestamp(julianMicros, failIfRebase, c, "Parquet");
}

static long rebaseInt96(long julianMicros, final boolean failIfRebase) {
return rebaseTimestamp(julianMicros, failIfRebase, "Parquet INT96");
static long rebaseInt96(
long julianMicros,
final boolean failIfRebase,
WritableColumnVector c) {
return rebaseTimestamp(julianMicros, failIfRebase, c, "Parquet INT96");
}

/**
@@ -387,7 +397,7 @@ private void decodeDictionaryIds(
for (int i = rowId; i < rowId + num; ++i) {
if (!column.isNullAt(i)) {
int julianDays = dictionary.decodeToInt(dictionaryIds.getDictId(i));
column.putInt(i, rebaseDays(julianDays, failIfRebase));
column.putInt(i, rebaseDays(julianDays, failIfRebase, column));
}
}
} else {
@@ -432,7 +442,7 @@ private void decodeDictionaryIds(
if (!column.isNullAt(i)) {
long julianMillis = dictionary.decodeToLong(dictionaryIds.getDictId(i));
long julianMicros = DateTimeUtils.millisToMicros(julianMillis);
column.putLong(i, rebaseMicros(julianMicros, failIfRebase));
column.putLong(i, rebaseMicros(julianMicros, failIfRebase, column));
}
}
}
@@ -441,7 +451,7 @@ private void decodeDictionaryIds(
for (int i = rowId; i < rowId + num; ++i) {
if (!column.isNullAt(i)) {
long julianMicros = dictionary.decodeToLong(dictionaryIds.getDictId(i));
column.putLong(i, rebaseMicros(julianMicros, failIfRebase));
column.putLong(i, rebaseMicros(julianMicros, failIfRebase, column));
}
}
} else {
@@ -480,7 +490,7 @@ private void decodeDictionaryIds(
if (!column.isNullAt(i)) {
Binary v = dictionary.decodeToBinary(dictionaryIds.getDictId(i));
long julianMicros = ParquetRowConverter.binaryToSQLTimestamp(v);
long gregorianMicros = rebaseInt96(julianMicros, failIfRebase);
long gregorianMicros = rebaseInt96(julianMicros, failIfRebase, column);
column.putLong(i, gregorianMicros);
}
}
@@ -500,7 +510,7 @@ private void decodeDictionaryIds(
if (!column.isNullAt(i)) {
Binary v = dictionary.decodeToBinary(dictionaryIds.getDictId(i));
long julianMicros = ParquetRowConverter.binaryToSQLTimestamp(v);
long gregorianMicros = rebaseInt96(julianMicros, failIfRebase);
long gregorianMicros = rebaseInt96(julianMicros, failIfRebase, column);
long adjTime = DateTimeUtils.convertTz(gregorianMicros, convertTz, UTC);
column.putLong(i, adjTime);
}
@@ -640,7 +650,7 @@ private void readLongBatch(int rowId, int num, WritableColumnVector column) thro
for (int i = 0; i < num; i++) {
if (defColumn.readInteger() == maxDefLevel) {
long julianMicros = DateTimeUtils.millisToMicros(dataColumn.readLong());
column.putLong(rowId + i, rebaseMicros(julianMicros, failIfRebase));
column.putLong(rowId + i, rebaseMicros(julianMicros, failIfRebase, column));
} else {
column.putNull(rowId + i);
}
@@ -698,7 +708,7 @@ private void readBinaryBatch(int rowId, int num, WritableColumnVector column) th
if (defColumn.readInteger() == maxDefLevel) {
// Read 12 bytes for INT96
long julianMicros = ParquetRowConverter.binaryToSQLTimestamp(data.readBinary(12));
long gregorianMicros = rebaseInt96(julianMicros, failIfRebase);
long gregorianMicros = rebaseInt96(julianMicros, failIfRebase, column);
column.putLong(rowId + i, gregorianMicros);
} else {
column.putNull(rowId + i);
@@ -722,7 +732,7 @@ private void readBinaryBatch(int rowId, int num, WritableColumnVector column) th
if (defColumn.readInteger() == maxDefLevel) {
// Read 12 bytes for INT96
long julianMicros = ParquetRowConverter.binaryToSQLTimestamp(data.readBinary(12));
long gregorianMicros = rebaseInt96(julianMicros, failIfRebase);
long gregorianMicros = rebaseInt96(julianMicros, failIfRebase, column);
long adjTime = DateTimeUtils.convertTz(gregorianMicros, convertTz, UTC);
column.putLong(rowId + i, adjTime);
} else {
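In the vectorized reader hunks above, the static rebase helpers gain a WritableColumnVector argument solely to carry error context. A compact sketch of the idea with simplified stand-in types (ColumnCtx and the switch-day constant are assumptions; the real code uses WritableColumnVector and RebaseDateTime):

```scala
import java.time.LocalDate

object ParquetRebaseSketch {
  // Stand-in for the column vector: only the error context matters here.
  final case class ColumnCtx(colName: String, dataType: String)

  // Rough stand-in for RebaseDateTime.lastSwitchJulianDay(); assumed for illustration.
  private val lastSwitchDay: Long = LocalDate.of(1582, 10, 15).toEpochDay

  def rebaseDays(julianDays: Int, failIfRebase: Boolean, c: ColumnCtx): Int =
    if (failIfRebase && julianDays < lastSwitchDay) {
      // The threaded-through column context lets the error name the failing column.
      throw new IllegalStateException(
        s"Ancient date in Parquet column `${c.colName}` of type ${c.dataType}")
    } else {
      julianDays // the real helper rebases Julian days onto the Proleptic Gregorian calendar
    }

  def main(args: Array[String]): Unit = {
    val ctx = ColumnCtx("event_date", "date")
    // Mirrors the dictionary-decoding loop: rebase each decoded day value for this column.
    println(Seq(0, 18000, 18628).map(d => rebaseDays(d, failIfRebase = true, c = ctx)))
  }
}
```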
@@ -107,7 +107,7 @@ public final void readIntegersWithRebase(
}
if (rebase) {
if (failIfRebase) {
throw DataSourceUtils.newRebaseExceptionInRead("Parquet");
throw DataSourceUtils.newRebaseExceptionInRead(c.colName, c.dataType(), "Parquet");
} else {
for (int i = 0; i < total; i += 1) {
c.putInt(rowId + i, RebaseDateTime.rebaseJulianToGregorianDays(buffer.getInt()));
@@ -164,7 +164,7 @@ public final void readLongsWithRebase(
}
if (rebase) {
if (failIfRebase) {
throw DataSourceUtils.newRebaseExceptionInRead("Parquet");
throw DataSourceUtils.newRebaseExceptionInRead(c.colName, c.dataType(), "Parquet");
} else {
for (int i = 0; i < total; i += 1) {
c.putLong(rowId + i, RebaseDateTime.rebaseJulianToGregorianMicros(buffer.getLong()));
@@ -264,7 +264,8 @@ public void readIntegersWithRebase(
for (int i = 0; i < n; ++i) {
if (currentBuffer[currentBufferIdx++] == level) {
int julianDays = data.readInteger();
c.putInt(rowId + i, VectorizedColumnReader.rebaseDays(julianDays, failIfRebase));
c.putInt(rowId + i,
VectorizedColumnReader.rebaseDays(julianDays, failIfRebase, c));
} else {
c.putNull(rowId + i);
}
@@ -492,7 +493,7 @@ public void readLongsWithRebase(
for (int i = 0; i < n; ++i) {
if (currentBuffer[currentBufferIdx++] == level) {
long julianMicros = data.readLong();
c.putLong(rowId + i, VectorizedColumnReader.rebaseMicros(julianMicros, failIfRebase));
c.putLong(rowId + i, VectorizedColumnReader.rebaseMicros(julianMicros, failIfRebase, c));
} else {
c.putNull(rowId + i);
}
@@ -50,7 +50,7 @@ public static OffHeapColumnVector[] allocateColumns(int capacity, StructType sch
public static OffHeapColumnVector[] allocateColumns(int capacity, StructField[] fields) {
OffHeapColumnVector[] vectors = new OffHeapColumnVector[fields.length];
for (int i = 0; i < fields.length; i++) {
vectors[i] = new OffHeapColumnVector(capacity, fields[i].dataType());
vectors[i] = new OffHeapColumnVector(capacity, fields[i].name(), fields[i].dataType());
}
return vectors;
}
@@ -64,8 +64,8 @@ public static OffHeapColumnVector[] allocateColumns(int capacity, StructField[]
private long lengthData;
private long offsetData;

public OffHeapColumnVector(int capacity, DataType type) {
super(capacity, type);
public OffHeapColumnVector(int capacity, String colName, DataType type) {
super(capacity, colName, type);

nulls = 0;
data = 0;
@@ -566,7 +566,7 @@ protected void reserveInternal(int newCapacity) {
}

@Override
protected OffHeapColumnVector reserveNewColumn(int capacity, DataType type) {
return new OffHeapColumnVector(capacity, type);
protected OffHeapColumnVector reserveNewColumn(int capacity, String colName, DataType type) {
return new OffHeapColumnVector(capacity, colName, type);
}
}
@@ -50,7 +50,7 @@ public static OnHeapColumnVector[] allocateColumns(int capacity, StructType sche
public static OnHeapColumnVector[] allocateColumns(int capacity, StructField[] fields) {
OnHeapColumnVector[] vectors = new OnHeapColumnVector[fields.length];
for (int i = 0; i < fields.length; i++) {
vectors[i] = new OnHeapColumnVector(capacity, fields[i].dataType());
vectors[i] = new OnHeapColumnVector(capacity, fields[i].name(), fields[i].dataType());
}
return vectors;
}
@@ -73,8 +73,8 @@ public static OnHeapColumnVector[] allocateColumns(int capacity, StructField[] f
private int[] arrayLengths;
private int[] arrayOffsets;

public OnHeapColumnVector(int capacity, DataType type) {
super(capacity, type);
public OnHeapColumnVector(int capacity, String colName, DataType type) {
super(capacity, colName, type);

reserveInternal(capacity);
reset();
@@ -580,7 +580,7 @@ protected void reserveInternal(int newCapacity) {
}

@Override
protected OnHeapColumnVector reserveNewColumn(int capacity, DataType type) {
return new OnHeapColumnVector(capacity, type);
protected OnHeapColumnVector reserveNewColumn(int capacity, String colName, DataType type) {
return new OnHeapColumnVector(capacity, colName, type);
}
}
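Since both vector implementations now require a name, every allocation site has to pass one through. A small usage sketch against the constructors shown in this diff (the capacity and schema below are made up for illustration):

```scala
import org.apache.spark.sql.execution.vectorized.OnHeapColumnVector
import org.apache.spark.sql.types._

object NamedVectorSketch {
  def main(args: Array[String]): Unit = {
    val schema = StructType(Seq(
      StructField("id", LongType),
      StructField("event_date", DateType)))

    // Mirrors allocateColumns(capacity, fields): one named vector per field.
    val vectors: Array[OnHeapColumnVector] = schema.fields.map { f =>
      new OnHeapColumnVector(4096, f.name, f.dataType)
    }

    // The name travels with the vector, so later rebase errors can report it.
    vectors.foreach(v => println(s"${v.colName}: ${v.dataType()}"))
  }
}
```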
@@ -165,7 +165,7 @@ public void setDictionary(Dictionary dictionary) {
*/
public WritableColumnVector reserveDictionaryIds(int capacity) {
if (dictionaryIds == null) {
dictionaryIds = reserveNewColumn(capacity, DataTypes.IntegerType);
dictionaryIds = reserveNewColumn(capacity, colName, DataTypes.IntegerType);
} else {
dictionaryIds.reset();
dictionaryIds.reserve(capacity);
@@ -677,6 +677,11 @@ public WritableColumnVector arrayData() {
*/
public final void setIsConstant() { isConstant = true; }

/**
* Column name of this column.
*/
public String colName;

/**
* Maximum number of rows that can be stored in this column.
*/
@@ -717,7 +722,7 @@ public WritableColumnVector arrayData() {
/**
* Reserve a new column.
*/
protected abstract WritableColumnVector reserveNewColumn(int capacity, DataType type);
protected abstract WritableColumnVector reserveNewColumn(int capacity, String colName, DataType type);

protected boolean isArray() {
return type instanceof ArrayType || type instanceof BinaryType || type instanceof StringType ||
@@ -728,8 +733,9 @@ protected boolean isArray() {
* Sets up the common state and also handles creating the child columns if this is a nested
* type.
*/
protected WritableColumnVector(int capacity, DataType type) {
protected WritableColumnVector(int capacity, String colName, DataType type) {
super(type);
this.colName = colName;
this.capacity = capacity;

if (isArray()) {
Expand All @@ -742,24 +748,25 @@ protected WritableColumnVector(int capacity, DataType type) {
childCapacity *= DEFAULT_ARRAY_LENGTH;
}
this.childColumns = new WritableColumnVector[1];
this.childColumns[0] = reserveNewColumn(childCapacity, childType);
this.childColumns[0] = reserveNewColumn(childCapacity, colName + ".elem", childType);
} else if (type instanceof StructType) {
StructType st = (StructType)type;
this.childColumns = new WritableColumnVector[st.fields().length];
for (int i = 0; i < childColumns.length; ++i) {
this.childColumns[i] = reserveNewColumn(capacity, st.fields()[i].dataType());
this.childColumns[i] = reserveNewColumn(capacity, colName + "." + st.fields()[i].name(),
st.fields()[i].dataType());
}
} else if (type instanceof MapType) {
MapType mapType = (MapType) type;
this.childColumns = new WritableColumnVector[2];
this.childColumns[0] = reserveNewColumn(capacity, mapType.keyType());
this.childColumns[1] = reserveNewColumn(capacity, mapType.valueType());
this.childColumns[0] = reserveNewColumn(capacity, colName + ".key", mapType.keyType());
this.childColumns[1] = reserveNewColumn(capacity, colName + ".value", mapType.valueType());
} else if (type instanceof CalendarIntervalType) {
// Three columns. Months as int. Days as Int. Microseconds as Long.
this.childColumns = new WritableColumnVector[3];
this.childColumns[0] = reserveNewColumn(capacity, DataTypes.IntegerType);
this.childColumns[1] = reserveNewColumn(capacity, DataTypes.IntegerType);
this.childColumns[2] = reserveNewColumn(capacity, DataTypes.LongType);
this.childColumns[0] = reserveNewColumn(capacity, colName + ".months", DataTypes.IntegerType);
this.childColumns[1] = reserveNewColumn(capacity, colName + ".days", DataTypes.IntegerType);
this.childColumns[2] = reserveNewColumn(capacity, colName + ".microseconds", DataTypes.LongType);
} else {
this.childColumns = null;
}
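The constructor above derives child column names from the parent, so a nested column yields dotted names such as `m.key`, `m.value`, or `arr.elem`, which is the name the enriched rebase error would report for a nested column. A stand-alone sketch of that naming scheme (names only, no actual vectors; CalendarIntervalType children are omitted for brevity):

```scala
import org.apache.spark.sql.types._

object ChildNameSketch {
  // Mirrors the naming convention in WritableColumnVector's constructor:
  // array element -> <col>.elem, struct field -> <col>.<field>, map -> <col>.key / <col>.value.
  def childNames(colName: String, dt: DataType): Seq[String] = dt match {
    case ArrayType(et, _) =>
      s"$colName.elem" +: childNames(s"$colName.elem", et)
    case StructType(fields) =>
      fields.toSeq.flatMap { f =>
        val child = s"$colName.${f.name}"
        child +: childNames(child, f.dataType)
      }
    case MapType(kt, vt, _) =>
      (s"$colName.key" +: childNames(s"$colName.key", kt)) ++
        (s"$colName.value" +: childNames(s"$colName.value", vt))
    case _ => Seq.empty
  }

  def main(args: Array[String]): Unit = {
    // Prints: List(m.key, m.value, m.value.elem)
    println(childNames("m", MapType(StringType, ArrayType(DateType))))
  }
}
```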