-
Notifications
You must be signed in to change notification settings - Fork 2.2k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Spark partial limit push down #10943
base: main
Are you sure you want to change the base?
Conversation
arrow/src/main/java/org/apache/iceberg/arrow/vectorized/parquet/VectorizedColumnIterator.java
Show resolved
Hide resolved
@@ -1151,6 +1152,11 @@ public ReadBuilder withAADPrefix(ByteBuffer aadPrefix) { | |||
return this; | |||
} | |||
|
|||
public ReadBuilder pushedlimit(int limit) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Should we assert this is greater than 1? I assume and input 0 or negative is a bad call
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I think we are OK because Spark has assert for limit. If the limit is negative, e.g. SELECT * FROM table limit -2
, Spark will throw
org.apache.spark.sql.AnalysisException: [INVALID_LIMIT_LIKE_EXPRESSION.IS_NEGATIVE] The limit like expression "-2" is invalid. The limit expression must be equal to or greater than 0, but got -2.;
If the limit is 0, e.g. SELECT * FROM table limit 0
, Spark changes this to an empty table scan and it won't reach here.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This is public though, and anyone can call it so we can't rely on Spark to cover us.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Right, I forgot that. I've added a check to throw an IllegalArgumentException
if the limit is <= 0
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Usually we do Preconditions.checkArgument
for things like this
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Changed to Preconditions.checkArgument
. Thanks
arrow/src/main/java/org/apache/iceberg/arrow/vectorized/parquet/VectorizedColumnIterator.java
Outdated
Show resolved
Hide resolved
parquet/src/main/java/org/apache/iceberg/parquet/ParquetReader.java
Outdated
Show resolved
Hide resolved
parquet/src/main/java/org/apache/iceberg/parquet/VectorizedParquetReader.java
Outdated
Show resolved
Hide resolved
parquet/src/main/java/org/apache/iceberg/parquet/VectorizedParquetReader.java
Outdated
Show resolved
Hide resolved
spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/SparkSQLProperties.java
Show resolved
Hide resolved
arrow/src/main/java/org/apache/iceberg/arrow/vectorized/parquet/VectorizedColumnIterator.java
Outdated
Show resolved
Hide resolved
spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/source/SparkScanBuilder.java
Outdated
Show resolved
Hide resolved
advance(); | ||
int expectedBatchSize; | ||
if (numValsToRead < 0) { | ||
throw new IllegalStateException("numValsToRead has invalid value"); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
"Cannot X (because Y) (recover by Z)" - > "Cannot read a negative number of values. numValsToRead = %D"
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Changed. Thanks
parquet/src/main/java/org/apache/iceberg/parquet/VectorizedParquetReader.java
Show resolved
Hide resolved
spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/source/SparkScanBuilder.java
Outdated
Show resolved
Hide resolved
Schema expectedSchema, | ||
List<Expression> filters, | ||
Supplier<ScanReport> scanReportSupplier, | ||
Integer pushedLimit) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I'm wondering if we should include this in SparkReadConf rather than having a separate argument. I have similar thoughts around SparkInputPartition. I'm not a big fan of having to plumb the new arguments all the way through the code base but those two options may not look great either since they aren't a great fit imho.
Ideally I think I would want something like SparkReadContext but I don't know how often more things like this will come up
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Thanks @RussellSpitzer for sharing your concern and suggestion!
I think we can borrow @aokolnychyi 's idea of adding ParquetBatchReadConf
and OrcBatchReadConf
. We can have something like
@Value.Immutable
public interface ParquetBatchReadConf extends Serializable {
ParquetReaderType readerType(); // this is for comet, we don't need this for now
int batchSize();
@Nullable
Integer limit();
}
@Value.Immutable
public interface OrcBatchReadConf extends Serializable {
int batchSize();
}
Similarly, we can also have ParquetRowReadConf
and OrcRowReadConf
.
I have changed the code to add ParquetBatchReadConf
and OrcBatchReadConf
. We still need to pass pushedLimit
to the SparkPartitioningAwareScan
, SparkScan
and SparkBatch
constructors, so pushedLimit
can be passed from SparkScanBuilder
to SparkBatch
, this is because pushedLimit
is not available in SparkReadConf
, we have to call SparkScanBuilder
.pushLimit
to get pushedLimit
.
Please let me know if this approach looks OK to you. Thanks!
} | ||
|
||
private OrcBatchReadConf orcBatchReadConf() { | ||
return ImmutableOrcBatchReadConf.builder().batchSize(readConf.parquetBatchSize()).build(); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
orcBatchSize?
if (limitPushedDown) { | ||
assertThat(pushedLimit).as("Pushed down limit should be " + limit).isEqualTo(limit); | ||
} else { | ||
assertThat(pushedLimit).as("Pushed down limit should be " + limit).isEqualTo(null); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
assertThat(pushedLimit).as("Pushed down limit should be " + limit).isEqualTo(null); | |
assertThat(pushedLimit).as("Pushed down limit should be " + limit).isNull(); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
nit
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Done. Thanks
import scala.collection.JavaConverters; | ||
|
||
@ExtendWith(ParameterizedTestExtension.class) | ||
public class TestLimitPushDown { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
should probably extend TestBase
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Done. Thanks!
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I've left my last few little nits on the PR, I want to make sure we get @aokolnychyi to take a peak too though since it was his suggestion to add in the Immutable Parquet and ORC Conf Classes. I wasn't sure how far he wanted to take them down the Spark Hierarchy.
Ack, will try to get to this asap. I am partially off this week. |
pushedLimit); | ||
} | ||
|
||
private boolean hasDeletes(org.apache.iceberg.Scan scan) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This is too expensive. We should be using snapshot summary.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
That's optional in the spec, and delete content is also optional even if the snapshot summary is present. Can we rely on that here?
I think we would have to fall back to the scan anyway if they are missing
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
In a few places, we default to what's safe. In this case it would be assuming there are deletes. We can also check if there are delete manifests in the snapshot we read, but doing an extra scan and iterating through all files seems too expensive.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I agree that the scan is probably too expensive here and we probably should only be doing it once for the various requirements we have. I also think we should codify some of the keys and elements of the Summary, But for now I think no one should be relying on the summary as a primary source of truth.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I moved the deletes checking to SparkPartitioningAwareScan.tasks() so we don't need to do an extra planFiles
boolean caseSensitive, | ||
Integer pushedLimit) { | ||
this(table, taskGroup, tableSchema, expectedSchema, caseSensitive); | ||
this.pushedLimit = pushedLimit; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Why do we push it all the way to the reader instead of simply reading just enough files to match the limit based on the number of records we have in the metadata for each data file?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
My initial thought is that we can use ExpressionUtil$selectsPartitions
to test if the filter selects entire files. If so, we can simply find matching files so that the sum of recordCount
exceeds the pushed limit. It may be more complicated for filters that only select some records within files. Let me explore this PR more for that.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I think pushing it all the way to the reader could yield a bit more performance gain.
Actually if filter exists, Spark side doesn't push down limit.
For sql such as
SELECT * FROM table LIMIT n
, push down Spark's partial limit to Iceberg, so that Iceberg can stop reading data once the limit is reached.