Arrow: add support for null vectors #10953

Status: Open. Wants to merge 48 commits into base: main (showing changes from 47 commits).

Commits:
- ac6440a: #10275 - fix NullPointerException (sl255051, May 8, 2024)
- becf6f7: Change how the unit test asserts the correct exception is thrown (sl255051, May 8, 2024)
- 4e2cb86: Remove test dependency on Apache Spark (sl255051, May 8, 2024)
- 1193d02: Merge branch 'main' into issue-10275 (sl255051, May 28, 2024)
- 12bc3de: Add new unit test (sl255051, Jun 11, 2024)
- d8f3e13: Merge branch 'apache:main' into issue-10275 (slessard, Jun 11, 2024)
- bb4e010: Add comments to unit test (sl255051, Jun 12, 2024)
- 6e7a1aa: Merge branch 'issue-10275' of https://github.com/slessard/iceberg int… (sl255051, Jun 12, 2024)
- 28451a5: Update arrow/src/test/java/org/apache/iceberg/arrow/vectorized/ArrowR… (slessard, Jun 14, 2024)
- 24a9932: Update arrow/src/test/java/org/apache/iceberg/arrow/vectorized/ArrowR… (slessard, Jun 14, 2024)
- 9bcb2b1: Address code review comments (sl255051, Jun 14, 2024)
- 7a25b52: Merge branch 'apache:main' into issue-10275 (slessard, Jun 14, 2024)
- a31bf94: Merge branch 'main' into issue-10275 (sl255051, Jul 29, 2024)
- 44a7f91: Merge branch 'main' into issue-10275 (sl255051, Aug 5, 2024)
- c2eaf24: Merge branch 'apache:main' into issue-10275 (slessard, Aug 9, 2024)
- e323db7: DRAFT: alternate solution 2: hack in support for NullVector (slessard, Aug 10, 2024)
- 061ab02: Merge branch 'apache:main' into issue-10275-alt2 (slessard, Aug 12, 2024)
- bf0c905: Issue 10275 - Add rough draft vector support for null columns (slessard, Aug 13, 2024)
- 5610dd4: Merge branch 'issue-10275-alt2' into issue-10275-alt3 (slessard, Aug 13, 2024)
- a13415d: Merge branch 'main' into issue-10275-alt3 (slessard, Aug 13, 2024)
- 2eaa63f: Merge branch 'main' into issue-10275-alt3 (slessard, Aug 16, 2024)
- 62108da: remove obsolete comment; adapt unit test to match new functionality (slessard, Aug 16, 2024)
- 7115e93: Merge branch 'apache:main' into issue-10275-alt3 (slessard, Aug 16, 2024)
- 08bb07c: Address code review feedback (slessard, Sep 5, 2024)
- 442b381: Add a NullabilityHolder instance to the NullVector instance (slessard, Sep 5, 2024)
- 5e7668e: Merge branch 'apache:main' into issue-10275-alt3 (slessard, Sep 6, 2024)
- 83913a0: Remove test class GenericArrowVectorAccessorFactoryTest (slessard, Sep 9, 2024)
- e2b428e: Fix compile error; format source code (slessard, Sep 9, 2024)
- 7ffa7ed: Address code review comments (slessard, Sep 11, 2024)
- cda0423: Adopt changes suggested by @nastra in code review (slessard, Sep 17, 2024)
- 9aec9e5: Update unit test to add a second row to the table being tested (slessard, Sep 17, 2024)
- 0c87dc7: Code cleanup (slessard, Sep 19, 2024)
- e5eebd0: Undo adding a second row to the table (slessard, Sep 20, 2024)
- fe60793: Expand calls to checkAllVectorTypes and checkAllVectorValues (slessard, Sep 20, 2024)
- 1a3896b: replace hard-coded magic values with descriptively named variables (slessard, Sep 20, 2024)
- 5c3b460: Add unit tests for VectorHolder (slessard, Sep 24, 2024)
- a2df95c: Update `isDummy` method to remove one condition that would never be r… (slessard, Sep 24, 2024)
- bbc776d: Fix code style issues (slessard, Sep 25, 2024)
- 2bf5b2f: Update VectorHolder unit tests for isDummy method (slessard, Sep 26, 2024)
- 1edd680: Convert to fluent assertions (slessard, Sep 26, 2024)
- e1b3931: inline variables that are only used once; remove `this.` prefix (slessard, Sep 26, 2024)
- e574623: Merge branch 'main' into issue-10275-alt3 (slessard, Sep 26, 2024)
- c8bcc1c: Update arrow/src/main/java/org/apache/iceberg/arrow/vectorized/Vector… (nastra, Sep 27, 2024)
- da9e514: Only create a NullVector when the constant value is null (slessard, Sep 27, 2024)
- fe83726: Merge remote-tracking branch 'origin/issue-10275-alt3' into issue-102… (slessard, Sep 27, 2024)
- 01f96f0: Correct the comment in a test method (slessard, Sep 30, 2024)
- f509e47: Add one more unit test (slessard, Sep 30, 2024)
- 163ee62: Make style changes as requested in code review feedback (slessard, Oct 9, 2024)
File: arrow/src/main/java/org/apache/iceberg/arrow/vectorized/ArrowBatchReader.java

@@ -50,7 +50,6 @@ public final ColumnarBatch read(ColumnarBatch reuse, int numRowsToRead) {
"Number of rows in the vector %s didn't match expected %s ",
numRowsInVector,
numRowsToRead);
// Handle null vector for constant case
columnVectors[i] = new ColumnVector(vectorHolders[i]);
}
return new ColumnarBatch(numRowsToRead, columnVectors);
File: GenericArrowVectorAccessorFactory.java

@@ -35,6 +35,7 @@
import org.apache.arrow.vector.Float4Vector;
import org.apache.arrow.vector.Float8Vector;
import org.apache.arrow.vector.IntVector;
import org.apache.arrow.vector.NullVector;
import org.apache.arrow.vector.TimeMicroVector;
import org.apache.arrow.vector.TimeStampMicroTZVector;
import org.apache.arrow.vector.TimeStampMicroVector;
@@ -177,6 +178,7 @@
@SuppressWarnings("checkstyle:CyclomaticComplexity")
private ArrowVectorAccessor<DecimalT, Utf8StringT, ArrayT, ChildVectorT> getPlainVectorAccessor(
FieldVector vector, PrimitiveType primitive) {
Preconditions.checkArgument(null != vector, "Invalid field vector: null");
if (vector instanceof BitVector) {
return new BooleanAccessor<>((BitVector) vector);
} else if (vector instanceof IntVector) {
@@ -220,6 +222,8 @@ private ArrowVectorAccessor<DecimalT, Utf8StringT, ArrayT, ChildVectorT> getPlainVectorAccessor(
}
return new FixedSizeBinaryAccessor<>(
(FixedSizeBinaryVector) vector, stringFactorySupplier.get());
} else if (vector instanceof NullVector) {
return new NullAccessor<>((NullVector) vector);
}
throw new UnsupportedOperationException("Unsupported vector: " + vector.getClass());
}
@@ -244,6 +248,15 @@ public final boolean getBoolean(int rowId) {
}
}

private static class NullAccessor<
DecimalT, Utf8StringT, ArrayT, ChildVectorT extends AutoCloseable>
extends ArrowVectorAccessor<DecimalT, Utf8StringT, ArrayT, ChildVectorT> {

NullAccessor(NullVector vector) {
super(vector);
}
}

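The new NullAccessor works because the factory dispatches on the concrete vector type through a chain of instanceof checks, and a vector of all nulls never has to produce a value, so the accessor can inherit the base class behavior unchanged. A minimal, self-contained sketch of that dispatch pattern, with invented stand-in types (FieldVec, IntVec, NullVec, Accessor are hypothetical names, not the real Arrow or Iceberg classes):

```java
// Standalone sketch of the instanceof-dispatch accessor factory shown in
// this diff. All type names here are invented stand-ins for illustration.
public class AccessorDispatchSketch {
  interface FieldVec {}
  static final class IntVec implements FieldVec {}
  static final class NullVec implements FieldVec {}

  static class Accessor {
    final FieldVec vector;
    Accessor(FieldVec vector) { this.vector = vector; }
    // Base accessor: getters are unsupported by default. An all-null vector
    // never needs a value, so NullAccessor adds no overrides at all.
    int getInt(int rowId) { throw new UnsupportedOperationException(); }
  }

  static final class IntAccessor extends Accessor {
    IntAccessor(IntVec v) { super(v); }
  }

  static final class NullAccessor extends Accessor {
    NullAccessor(NullVec v) { super(v); }
  }

  // Mirrors getPlainVectorAccessor: dispatch on the concrete vector type,
  // handling the null-vector case explicitly instead of falling through
  // to the final throw.
  static Accessor accessorFor(FieldVec vector) {
    if (vector instanceof IntVec) {
      return new IntAccessor((IntVec) vector);
    } else if (vector instanceof NullVec) {
      return new NullAccessor((NullVec) vector);
    }
    throw new UnsupportedOperationException("Unsupported vector: " + vector.getClass());
  }

  public static void main(String[] args) {
    System.out.println(accessorFor(new IntVec()).getClass().getSimpleName());  // prints "IntAccessor"
    System.out.println(accessorFor(new NullVec()).getClass().getSimpleName()); // prints "NullAccessor"
  }
}
```

Before this change, a NullVec-style vector would have fallen through to the UnsupportedOperationException branch, which is exactly the failure mode the PR fixes.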
private static class IntAccessor<
DecimalT, Utf8StringT, ArrayT, ChildVectorT extends AutoCloseable>
extends ArrowVectorAccessor<DecimalT, Utf8StringT, ArrayT, ChildVectorT> {
File: arrow/src/main/java/org/apache/iceberg/arrow/vectorized/VectorHolder.java

@@ -19,6 +19,7 @@
package org.apache.iceberg.arrow.vectorized;

import org.apache.arrow.vector.FieldVector;
import org.apache.arrow.vector.NullVector;
import org.apache.iceberg.MetadataColumns;
import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
import org.apache.iceberg.types.Type;
@@ -128,7 +129,7 @@ public static VectorHolder dummyHolder(int numRows) {
}

public boolean isDummy() {
return vector == null;
return vector == null || vector instanceof NullVector;
}

@@ -140,12 +141,20 @@ public static class ConstantVectorHolder<T> extends VectorHolder {
private final int numRows;

public ConstantVectorHolder(int numRows) {
[Review comment - Member] Why are we keeping this constructor? Is it needed if we can make a NullVectorHolder?

[Reply - Author] This constructor could be deleted, but that would be an API-breaking change because this is a public constructor in a public class. Which would you prefer, an extra constructor or a breaking change?

super(new NullVector("_dummy_", numRows), null, new NullabilityHolder(numRows));
nullabilityHolder().setNulls(0, numRows);
this.numRows = numRows;
this.constantValue = null;
}

public ConstantVectorHolder(Types.NestedField icebergField, int numRows, T constantValue) {
super(icebergField);
super(
[Review comment - Member] I'm not a big fan of placing an if-statement in the constructor itself; it feels more like we should just have a different constructor or class.

[Reply - Author (slessard), Oct 9, 2024] @RussellSpitzer In an earlier draft of this PR I did create a NullVectorHolder class but was told to instead adapt ConstantVectorHolder to work for my case, so I did that.

Here's a patch to add a new constructor, but it has at least one issue: passing null as the constant value in the existing constructor is no longer allowed. This is a semantic breaking change, though not an API breaking change. Third-party code that upgrades to a version of Apache Iceberg containing this patch, and that previously passed a null constant value to this constructor, will still compile fine but will fail at runtime. That doesn't seem like an appealing solution to me.

I think adding a new class such as NullVectorHolder has its own issue: what would be the use case for a ConstantVectorHolder containing a null value versus a NullVectorHolder containing a null value?

Which solution would you prefer?

  1. Ternary operator in the super call
  2. Overloaded constructor with a semantics-breaking change
  3. New class, NullVectorHolder, with an ambiguous use case
Subject: [PATCH] Changes
---
Index: arrow/src/main/java/org/apache/iceberg/arrow/vectorized/VectorHolder.java
===================================================================
diff --git a/arrow/src/main/java/org/apache/iceberg/arrow/vectorized/VectorHolder.java b/arrow/src/main/java/org/apache/iceberg/arrow/vectorized/VectorHolder.java
--- a/arrow/src/main/java/org/apache/iceberg/arrow/vectorized/VectorHolder.java	(revision 163ee62ce52bc9611198325997010c7e2b793c71)
+++ b/arrow/src/main/java/org/apache/iceberg/arrow/vectorized/VectorHolder.java	(date 1728512295607)
@@ -148,16 +148,21 @@
     }
 
     public ConstantVectorHolder(Types.NestedField icebergField, int numRows, T constantValue) {
+      super(icebergField);
+      Preconditions.checkNotNull(constantValue, "Constant value cannot be null");
+      this.numRows = numRows;
+      this.constantValue = constantValue;
+    }
+
+    public ConstantVectorHolder(Types.NestedField icebergField, int numRows) {
       super(
-          (null == constantValue) ? new NullVector(icebergField.name(), numRows) : null,
-          icebergField,
-          new NullabilityHolder(numRows));
-      if (null == constantValue) {
-        nullabilityHolder().setNulls(0, numRows);
-      }
+              new NullVector(icebergField.name(), numRows),
+              icebergField,
+              new NullabilityHolder(numRows));
+      nullabilityHolder().setNulls(0, numRows);
 
       this.numRows = numRows;
-      this.constantValue = constantValue;
+      this.constantValue = null;
     }
 
     @Override

[Review comment - Member] Ok, having gone through the code I think I may have a strategy here, but please let me know what I'm missing in this interpretation.

ConstantVectorHolder is used as a dummy holder prior to this commit. It has no vector, column descriptor, etc.; everything is null. The current approach sometimes makes a ConstantVectorHolder which does have something inside of it. This is done so that GenericArrowVectorAccessorFactory will have something to work with.

From what I can see, GenericArrowVectorAccessorFactory doesn't work with ConstantVectorHolders at all. It currently can only handle cases in which holder.vector() is a non-null type that matches some known vector type. Spark, on the other hand, does not actually use this path; when it sees a ConstantVectorHolder it instead creates a o.a.i.s.ConstantColumnVector. This is why I don't think Spark has any issue with this. (cc @amogh-jahagirdar)

Now this makes me think what we really need is to fix GenericArrowVectorAccessorFactory to handle the case where the "vector" is null, and in that circumstance attempt to cast the VectorHolder to a ConstantVectorHolder and create the appropriate vector type or accessor. I think we have a few options to do this.

I think the easiest may be to create an accessor that looks like the Spark ConstantColumnVector and use that as our generic accessor.

[Review comment - Member] Another approach is to directly mimic the Spark implementation and fork here: if the holder is a ConstantVectorHolder, set the vector to a new ConstantVector, which we would define similarly to the Spark ConstantColumnVector code.
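A bare-bones illustration of the idea under discussion: an accessor that, in the spirit of Spark's ConstantColumnVector, answers every row from a single constant, with a null constant reported as null for every row. All names here are hypothetical; this is a sketch of the proposal, not code from the PR or from Spark:

```java
// Hypothetical sketch of a constant-valued accessor: every row returns the
// same constant, and a null constant makes every row read as null.
public class ConstantAccessorSketch {
  static final class ConstantAccessor<T> {
    private final T constantValue;
    private final int numRows;

    ConstantAccessor(T constantValue, int numRows) {
      this.constantValue = constantValue;
      this.numRows = numRows;
    }

    boolean isNullAt(int rowId) {
      checkBounds(rowId);
      return constantValue == null;
    }

    T get(int rowId) {
      checkBounds(rowId);
      return constantValue; // same value for every row
    }

    private void checkBounds(int rowId) {
      if (rowId < 0 || rowId >= numRows) {
        throw new IndexOutOfBoundsException("rowId: " + rowId);
      }
    }
  }

  public static void main(String[] args) {
    ConstantAccessor<Integer> constant = new ConstantAccessor<>(7, 3);
    ConstantAccessor<Integer> nulls = new ConstantAccessor<>(null, 3);
    System.out.println(constant.get(2));   // prints 7
    System.out.println(nulls.isNullAt(0)); // prints true
  }
}
```

With something like this, the null-column case becomes just a constant column whose constant happens to be null, rather than a special NullVector code path.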

(null == constantValue) ? new NullVector(icebergField.name(), numRows) : null,
[Suggested change - Contributor]
-          (null == constantValue) ? new NullVector(icebergField.name(), numRows) : null,
+          null == constantValue ? new NullVector(icebergField.name(), numRows) : null,

icebergField,
new NullabilityHolder(numRows));
if (null == constantValue) {
nullabilityHolder().setNulls(0, numRows);
}
this.numRows = numRows;
this.constantValue = constantValue;
}
File: VectorizedArrowReader.java

@@ -455,6 +455,10 @@ public static VectorizedArrowReader nulls() {
return NullVectorReader.INSTANCE;
}

public static VectorizedArrowReader nulls(Types.NestedField icebergField) {
return new NullVectorReader(icebergField);
}

public static VectorizedArrowReader positions() {
return new PositionVectorReader(false);
}
@@ -464,11 +468,15 @@ public static VectorizedArrowReader positionsWithSetArrowValidityVector() {
}

private static final class NullVectorReader extends VectorizedArrowReader {
private static final NullVectorReader INSTANCE = new NullVectorReader();
private static final NullVectorReader INSTANCE = new NullVectorReader(null);

private NullVectorReader(Types.NestedField icebergField) {
super(icebergField);
}

@Override
public VectorHolder read(VectorHolder reuse, int numValsToRead) {
return VectorHolder.dummyHolder(numValsToRead);
return new VectorHolder.ConstantVectorHolder<>(icebergField(), numValsToRead, null);
}

@Override
@@ -97,7 +97,7 @@ public VectorizedReader<?> message(
} else if (reader != null) {
reorderedFields.add(reader);
} else {
reorderedFields.add(VectorizedArrowReader.nulls());
reorderedFields.add(VectorizedArrowReader.nulls(field));
}
}
return vectorizedReader(reorderedFields);
@@ -49,6 +49,7 @@
import org.apache.arrow.vector.Float4Vector;
import org.apache.arrow.vector.Float8Vector;
import org.apache.arrow.vector.IntVector;
import org.apache.arrow.vector.NullVector;
import org.apache.arrow.vector.TimeMicroVector;
import org.apache.arrow.vector.TimeStampMicroTZVector;
import org.apache.arrow.vector.TimeStampMicroVector;
@@ -59,6 +60,7 @@
import org.apache.arrow.vector.types.pojo.ArrowType;
import org.apache.arrow.vector.types.pojo.Field;
import org.apache.arrow.vector.types.pojo.FieldType;
import org.apache.iceberg.AppendFiles;
import org.apache.iceberg.DataFile;
import org.apache.iceberg.DataFiles;
import org.apache.iceberg.FileFormat;
@@ -70,6 +72,7 @@
import org.apache.iceberg.StructLike;
import org.apache.iceberg.Table;
import org.apache.iceberg.TableScan;
import org.apache.iceberg.UpdateSchema;
import org.apache.iceberg.data.GenericRecord;
import org.apache.iceberg.data.Record;
import org.apache.iceberg.data.parquet.GenericParquetWriter;
@@ -262,6 +265,142 @@ public void testReadColumnFilter2() throws Exception {
scan, NUM_ROWS_PER_MONTH, 12 * NUM_ROWS_PER_MONTH, ImmutableList.of("timestamp"));
}

@Test
public void testReadColumnThatDoesNotExistInParquetSchema() throws Exception {
rowsWritten = Lists.newArrayList();
tables = new HadoopTables();

List<Field> expectedFields =
ImmutableList.of(
new Field("a", new FieldType(false, MinorType.INT.getType(), null), null),
new Field("b", new FieldType(true, MinorType.INT.getType(), null), null),
new Field("z", new FieldType(true, MinorType.NULL.getType(), null), null));
org.apache.arrow.vector.types.pojo.Schema expectedSchema =
new org.apache.arrow.vector.types.pojo.Schema(expectedFields);

int batchSize = 1;
int expectedNumRowsPerBatch = 1;
int expectedTotalRows = 1;

Schema tableSchemaV1 =
new Schema(
Types.NestedField.required(1, "a", Types.IntegerType.get()),
Types.NestedField.optional(2, "b", Types.IntegerType.get()));

PartitionSpec spec = PartitionSpec.builderFor(tableSchemaV1).build();
Table table = tables.create(tableSchemaV1, spec, tableLocation);

// Add one record to the table
GenericRecord rec = GenericRecord.create(tableSchemaV1);
rec.setField("a", 1);
List<GenericRecord> genericRecords = Lists.newArrayList();
genericRecords.add(rec);

AppendFiles appendFiles = table.newAppend();
appendFiles.appendFile(writeParquetFile(table, genericRecords));
appendFiles.commit();

// Alter the table schema by adding a new, optional column.
// Do not add any data for this new column in the one existing row in the table
// and do not insert any new rows into the table.
[Comment on lines +299 to +301 - Contributor] I might be confusing vectorized read paths, but I'm curious why this isn't reproducible in Spark vectorized reads. Or is it?

[Reply - Contributor] I can try to repro it in a unit test for Spark and see if that's the case. To be clear, I don't want to hold up this PR on that, since it does seem like a legitimate problem based on the test being done here.

[Reply - Author] I never tried reproducing the issue in Spark. My guess as to why there was no existing test case is that null vectors aren't a bug so much as a subfeature that was never implemented. There's no sense in creating a test for a subfeature that was knowingly never implemented.

What makes me say null vector support was knowingly never implemented? Look at the removed comment in arrow/src/main/java/org/apache/iceberg/arrow/vectorized/ArrowBatchReader.java. That comment wasn't a "this is how this code works" comment; in my opinion it was a TODO comment.

[Reply - Contributor] It would be nice to explore how this looks in Spark's vectorized path.

UpdateSchema updateSchema = table.updateSchema().addColumn("z", Types.IntegerType.get());
updateSchema.apply();
updateSchema.commit();

// Select all columns, all rows from the table
TableScan scan = table.newScan().select("*");

Set<String> columns = ImmutableSet.of("a", "b", "z");
// Read the data and verify that the returned ColumnarBatches match expected rows.
try (VectorizedTableScanIterable itr =
new VectorizedTableScanIterable(scan, batchSize, false)) {
int rowIndex = 0;
for (ColumnarBatch batch : itr) {
List<GenericRecord> expectedRows =
rowsWritten.subList(rowIndex, rowIndex + expectedNumRowsPerBatch);
rowIndex++;

assertThat(batch.numRows()).isEqualTo(expectedNumRowsPerBatch);
assertThat(batch.numCols()).isEqualTo(columns.size());

checkColumnarArrayValues(
expectedNumRowsPerBatch,
expectedRows,
batch,
0,
columns,
"a",
(records, i) -> records.get(i).getField("a"),
ColumnVector::getInt);
checkColumnarArrayValues(
expectedNumRowsPerBatch,
expectedRows,
batch,
1,
columns,
"b",
(records, i) -> records.get(i).getField("b"),
(columnVector, i) -> columnVector.isNullAt(i) ? null : columnVector.getInt(i));
checkColumnarArrayValues(
expectedNumRowsPerBatch,
expectedRows,
batch,
2,
columns,
"z",
(records, i) -> records.get(i).getField("z"),
(columnVector, i) -> columnVector.isNullAt(i) ? null : columnVector.getInt(i));
}
}

// Read the data and verify that the returned Arrow VectorSchemaRoots match expected rows.
try (VectorizedTableScanIterable itr =
new VectorizedTableScanIterable(scan, batchSize, false)) {
int totalRows = 0;
int rowIndex = 0;
for (ColumnarBatch batch : itr) {
List<GenericRecord> expectedRows =
rowsWritten.subList(rowIndex, rowIndex + expectedNumRowsPerBatch);
rowIndex++;
VectorSchemaRoot root = batch.createVectorSchemaRootFromVectors();
assertThat(root.getSchema()).isEqualTo(expectedSchema);

// check all vector types
assertThat(root.getVector("a").getClass()).isEqualTo(IntVector.class);
[Suggested change - Contributor]
-      assertThat(root.getVector("a").getClass()).isEqualTo(IntVector.class);
+      assertThat(root.getVector("a")).isInstanceOf(IntVector.class);

Please also update all other places in this test class.

assertThat(root.getVector("b").getClass()).isEqualTo(IntVector.class);
assertThat(root.getVector("z").getClass()).isEqualTo(NullVector.class);

checkVectorValues(
expectedNumRowsPerBatch,
expectedRows,
root,
columns,
"a",
(records, i) -> records.get(i).getField("a"),
(vector, i) -> ((IntVector) vector).get(i));
checkVectorValues(
expectedNumRowsPerBatch,
expectedRows,
root,
columns,
"b",
(records, i) -> records.get(i).getField("b"),
(vector, i) -> vector.isNull(i) ? null : ((IntVector) vector).get(i));
checkVectorValues(
expectedNumRowsPerBatch,
expectedRows,
root,
columns,
"z",
(records, i) -> records.get(i).getField("z"),
(vector, i) -> vector.getObject(i));

totalRows += root.getRowCount();
assertThat(totalRows).isEqualTo(expectedTotalRows);
}
}
}

/**
* The test asserts that {@link CloseableIterator#hasNext()} returned by the {@link ArrowReader}
* is idempotent.