Arrow, Spark 3.4: Support vectorized reads with struct constants #8466
Conversation
@@ -142,6 +152,12 @@ public ConstantVectorHolder(int numRows, T constantValue) {
    this.constantValue = constantValue;
  }

  public ConstantVectorHolder(Types.NestedField icebergField, int numRows, T constantValue) {
    super(icebergField);
Each `VectorHolder` actually has `icebergField` but we were not setting it for constants.
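A minimal sketch of what this comment describes, using simplified stand-in classes (not the real Iceberg API): the new constructor forwards the field to the parent so it is no longer null for constant holders.

```java
// Simplified stand-ins for org.apache.iceberg.types.Types.NestedField and
// the Arrow VectorHolder classes; names mirror the PR but are illustrative only.
class NestedField {
  final int fieldId;
  final String name;

  NestedField(int fieldId, String name) {
    this.fieldId = fieldId;
    this.name = name;
  }
}

class VectorHolder {
  private final NestedField icebergField;

  VectorHolder(NestedField icebergField) {
    this.icebergField = icebergField;
  }

  // Exposed so downstream code can recover the constant's type later.
  NestedField icebergField() {
    return icebergField;
  }
}

class ConstantVectorHolder<T> extends VectorHolder {
  private final int numRows;
  private final T constantValue;

  // New constructor shape from the PR: the field travels with the holder
  // instead of being left unset for constants.
  ConstantVectorHolder(NestedField icebergField, int numRows, T constantValue) {
    super(icebergField);
    this.numRows = numRows;
    this.constantValue = constantValue;
  }

  T getConstant() {
    return constantValue;
  }
}
```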
@@ -119,6 +123,10 @@ private enum ReadType {
    DICTIONARY
  }

  protected Types.NestedField icebergField() {
Exposing it to all readers so that we can construct typed constant vectors later.
import org.apache.spark.sql.execution.datasources.v2.BatchScanExec;
import scala.collection.Seq;

public class SparkPlanUtil {
This is located in tests and uses `AdaptiveSparkPlanHelper` from Spark. Otherwise, I would have to write a lot of ugly Java code to work with Scala `SparkPlan` (e.g. unwrap AQE).
@@ -172,7 +177,9 @@ protected void initTable() {
            tableName, PARQUET_VECTORIZATION_ENABLED, vectorized);
        break;
      case "orc":
        Assert.assertTrue(vectorized);
        sql(
This was not set correctly before, probably from earlier days.
@@ -38,8 +39,10 @@ public ColumnVector build(VectorHolder holder, int numRows) {
    if (holder instanceof VectorHolder.DeletedVectorHolder) {
      return new DeletedColumnVector(Types.BooleanType.get(), isDeleted);
    } else if (holder instanceof ConstantVectorHolder) {
      return new ConstantColumnVector(
          Types.IntegerType.get(), numRows, ((ConstantVectorHolder<?>) holder).getConstant());
This was the primary problem: we always assumed metadata columns were integers.
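The shape of the fix, as a hedged sketch: derive the constant vector's type from the field the holder carries instead of hard-coding an integer type. The enum and builder below are simplified placeholders, not Iceberg's actual `Type` hierarchy or `ColumnVector` builder.

```java
// Simplified stand-in for Iceberg's Type hierarchy.
enum IcebergType { INTEGER, LONG, STRING, STRUCT }

class ConstantVector {
  private final IcebergType type;
  private final int numRows;
  private final Object constant;

  ConstantVector(IcebergType type, int numRows, Object constant) {
    this.type = type;
    this.numRows = numRows;
    this.constant = constant;
  }

  IcebergType type() { return type; }
  Object constant() { return constant; }
}

class ColumnVectorBuilder {
  // Before the fix (buggy): the type was always INTEGER, so struct constants
  // such as the _partition metadata column could not be read vectorized.
  static ConstantVector buildOld(int numRows, Object constant) {
    return new ConstantVector(IcebergType.INTEGER, numRows, constant);
  }

  // After the fix: the type comes from the holder's field, so any constant
  // (including structs) gets a correctly typed vector.
  static ConstantVector build(IcebergType fieldType, int numRows, Object constant) {
    return new ConstantVector(fieldType, numRows, constant);
  }
}
```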
  private final Object constant;
  private final int batchSize;

  ConstantColumnVector(Type type, int batchSize, Object constant) {
I renamed `type` to `icebergType` because the parent class already provides a `type` variable, but it is a Spark type.
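A small illustration of the naming concern (hypothetical classes, not Spark's actual `ColumnVector`): the parent already owns a `type` field holding the Spark type, so the subclass stores the Iceberg type under a distinct name rather than shadowing the inherited field.

```java
// Stand-in for Spark's ColumnVector, which exposes a `type` field (a Spark DataType,
// represented here as a plain String for simplicity).
class SparkVector {
  protected final String type;

  SparkVector(String sparkType) {
    this.type = sparkType;
  }
}

class TypedConstantVector extends SparkVector {
  // Named icebergType to avoid shadowing the inherited Spark `type` field.
  private final String icebergType;

  TypedConstantVector(String sparkType, String icebergType) {
    super(sparkType);
    this.icebergType = icebergType;
  }

  String sparkType() { return type; }          // the inherited Spark type
  String icebergType() { return icebergType; } // the Iceberg-side type
}
```

Had the subclass declared its own `type`, both fields would coexist and reads inside the subclass would silently resolve to the Iceberg one, which is exactly the confusion the rename avoids.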
@@ -114,13 +115,11 @@ public PartitionReaderFactory createReaderFactory() {

    // conditions for using Parquet batch reads:
    // - Parquet vectorization is enabled
    // - at least one column is projected
I added a test showing that projecting at least one data column is not a requirement.
  }

  @Test
  public void testEmptyTableProjection() throws IOException {
This is the test that uses an empty projection.
    Assert.assertEquals("Should have 3 snapshots", 3, Iterables.size(table.snapshots()));

    Snapshot currentSnapshot = SnapshotUtil.latestSnapshot(table, branch);
    if (mode(table) == COPY_ON_WRITE) {
Should we make the method `isCopyOnWrite` protected and reuse it here?
The `mode` method is used in a lot of places and it is a bit more reliable because it checks the table. I used it here so that tests are consistent. We may reconsider that in a follow-up, but I don't think it is a big deal.
Thanks, I got it.
Force-pushed 03dc5be to 09c4283.
@@ -59,12 +59,17 @@ public VectorHolder(

  // Only used for returning dummy holder
  private VectorHolder() {
    this(null);
I will need to think more about untyped null holders. I am not sure it is a good idea to have them.
While it is arguable, the current solution works as null checks are performed prior to accessing values. Supporting typed null vectors would be a substantial change in our Arrow codebase and I am not convinced it would be worth it. Keeping as-is for now, can be done in the future.
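The null-check pattern this comment relies on might look like the following sketch (placeholder names, not the actual Iceberg classes): the dummy holder carries no field, and callers test for that before asking it for type information.

```java
class Field {
  final String name;

  Field(String name) { this.name = name; }
}

class Holder {
  private final Field field; // null only for the dummy holder

  private Holder() { this.field = null; }

  Holder(Field field) { this.field = field; }

  static Holder dummy() { return new Holder(); }

  // Callers check this before dereferencing the field, so the dummy holder
  // never needs a typed field of its own.
  boolean isDummy() { return field == null; }

  String fieldName() {
    if (isDummy()) {
      throw new IllegalStateException("dummy holder has no field");
    }
    return field.name;
  }
}
```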
Another option is to make `icebergField` non-final and `protected`. Then we don't have to change the constructor; we can add it in here. Either works for me though, since we have a null check now.

public ConstantVectorHolder(Types.NestedField icebergField, int numRows, T constantValue) {
  this.icebergField = icebergField;
  this.numRows = numRows;
  this.constantValue = constantValue;
}
Exposing fields directly would cause a checkstyle violation. I feel the current approach is simple enough.
Force-pushed 09c4283 to 525baba.
    } else if (id == MetadataColumns.ROW_POSITION.fieldId()) {
      if (setArrowValidityVector) {
        reorderedFields.add(VectorizedArrowReader.positionsWithSetArrowValidityVector());
      } else {
        reorderedFields.add(VectorizedArrowReader.positions());
      }
    } else if (id == MetadataColumns.IS_DELETED.fieldId()) {
      reorderedFields.add(new VectorizedArrowReader.DeletedVectorReader());
      reorderedFields.add(new DeletedVectorReader());
I had to import `ConstantVectorReader` to stay on one line above, and because we usually prefer direct imports, I changed this line for consistency.
LGTM. Left minor comments. Thanks @aokolnychyi for the change. Sorry for the delay.
@@ -59,12 +59,17 @@ public VectorHolder(

  // Only used for returning dummy holder
Not directly related to your PR, but maybe we should reword it and reformat it like this:
/**
* A dummy holder constructor.
*/
Updated.
    this(null);
  }

  // Only used for creating constant holders for fields
How about a JavaDoc format like this?
/**
* Constructor for constant holders of fields.
*
* @param field the nested field for the holder.
*/
Updated.
Force-pushed cf0d278 to e3b188a.
Our merge-on-read queries can't benefit from vectorized reads because the `_partition` metadata column is projected for the write distribution. This PR adapts our Arrow and Spark 3.4 logic to support such structs.