Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add collated predicate #3655

Open
wants to merge 21 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions build.sbt
Original file line number Diff line number Diff line change
Expand Up @@ -68,6 +68,7 @@ val hadoopVersion = "3.3.4"
val scalaTestVersion = "3.2.15"
val scalaTestVersionForConnectors = "3.0.8"
val parquet4sVersion = "1.9.4"
val icu4jVersion = "75.1"

// Versions for Hive 3
val hadoopVersionForHive3 = "3.1.0"
Expand Down Expand Up @@ -656,6 +657,7 @@ lazy val kernelDefaults = (project in file("kernel/kernel-defaults"))
"org.apache.hadoop" % "hadoop-client-runtime" % hadoopVersion,
"com.fasterxml.jackson.core" % "jackson-databind" % "2.13.5",
"org.apache.parquet" % "parquet-hadoop" % "1.12.3",
"com.ibm.icu" % "icu4j" % icu4jVersion,

"org.scalatest" %% "scalatest" % scalaTestVersion % "test",
"junit" % "junit" % "4.13.2" % "test",
Expand Down
Binary file not shown.
Binary file not shown.
Binary file not shown.
Binary file not shown.
Binary file not shown.
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
{"commitInfo":{"timestamp":1725838990599,"operation":"CREATE TABLE","operationParameters":{"partitionBy":"[]","clusterBy":"[]","description":null,"isManaged":"true","properties":"{}"},"isolationLevel":"Serializable","isBlindAppend":true,"operationMetrics":{},"engineInfo":"Apache-Spark/3.5.2 Delta-Lake/3.2.0","txnId":"0c4e0d4d-0f13-4726-ad61-f10192ab81e2"}}
{"metaData":{"id":"9a7918b4-42a5-4b47-bf27-0e8a7289d654","format":{"provider":"parquet","options":{}},"schemaString":"{\"type\":\"struct\",\"fields\":[{\"name\":\"c1\",\"type\":\"string\",\"nullable\":true,\"metadata\":{}}]}","partitionColumns":[],"configuration":{},"createdTime":1725838990522}}
{"protocol":{"minReaderVersion":1,"minWriterVersion":2}}
Original file line number Diff line number Diff line change
@@ -0,0 +1,2 @@
{"commitInfo":{"timestamp":1725839016423,"operation":"WRITE","operationParameters":{"mode":"Append","partitionBy":"[]"},"readVersion":0,"isolationLevel":"Serializable","isBlindAppend":true,"operationMetrics":{"numFiles":"1","numOutputRows":"1","numOutputBytes":"440"},"engineInfo":"Apache-Spark/3.5.2 Delta-Lake/3.2.0","txnId":"94355dc1-e083-4da0-9934-716b093eaf3a"}}
{"add":{"path":"part-00000-b839639e-0620-4d9c-baf8-20206fc2b063-c000.snappy.parquet","partitionValues":{},"size":440,"modificationTime":1725839016378,"dataChange":true,"stats":"{\"numRecords\":1,\"minValues\":{\"c1\":\"a\"},\"maxValues\":{\"c1\":\"a\"},\"nullCount\":{\"c1\":0}}"}}
Original file line number Diff line number Diff line change
@@ -0,0 +1,2 @@
{"commitInfo":{"timestamp":1725839020852,"operation":"WRITE","operationParameters":{"mode":"Append","partitionBy":"[]"},"readVersion":1,"isolationLevel":"Serializable","isBlindAppend":true,"operationMetrics":{"numFiles":"1","numOutputRows":"1","numOutputBytes":"440"},"engineInfo":"Apache-Spark/3.5.2 Delta-Lake/3.2.0","txnId":"0b39a4b8-878c-4092-a46e-c2bccfa9aaf3"}}
{"add":{"path":"part-00000-fdd2f0a3-75ba-4b6d-85a3-03173742c909-c000.snappy.parquet","partitionValues":{},"size":440,"modificationTime":1725839020847,"dataChange":true,"stats":"{\"numRecords\":1,\"minValues\":{\"c1\":\"A\"},\"maxValues\":{\"c1\":\"A\"},\"nullCount\":{\"c1\":0}}"}}
Binary file not shown.
Binary file not shown.
Original file line number Diff line number Diff line change
@@ -0,0 +1,22 @@
package io.delta.kernel.expressions;

/**
 * A binary {@link Predicate} that additionally carries the {@link CollationIdentifier} under which
 * its two operands are to be compared.
 */
public class CollatedPredicate extends Predicate {
  /** Collation attached to this predicate; stored as given by the caller. */
  private final CollationIdentifier collationIdentifier;

  /**
   * Creates a collated binary predicate.
   *
   * @param name predicate name, passed through to {@link Predicate}
   * @param left left operand
   * @param right right operand
   * @param collationIdentifier collation associated with this predicate
   */
  public CollatedPredicate(
      String name, Expression left, Expression right, CollationIdentifier collationIdentifier) {
    super(name, left, right);
    this.collationIdentifier = collationIdentifier;
  }

  /** @return the collation that was supplied at construction time */
  public CollationIdentifier getCollationIdentifier() {
    return collationIdentifier;
  }

  /**
   * Renders binary operators as {@code (left name right [collation])}; every other predicate name
   * falls back to the parent's representation.
   */
  @Override
  public String toString() {
    if (!BINARY_OPERATORS.contains(name)) {
      return super.toString();
    }
    return String.format(
        "(%s %s %s [%s])",
        children.get(0),
        name,
        children.get(1),
        collationIdentifier);
  }
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,102 @@
/*
* Copyright (2024) The Delta Lake Project Authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package io.delta.kernel.expressions;

import java.util.Locale;
import java.util.Objects;
import java.util.Optional;

/**
 * Identifies a collation by provider, name and an optional version.
 *
 * <p>The textual form is {@code PROVIDER.NAME} or {@code PROVIDER.NAME.VERSION}. All components
 * are stored upper-cased (using {@link Locale#ROOT}, so the result does not depend on the JVM's
 * default locale), which makes identifiers case-insensitive for {@link #equals(Object)} and
 * {@link #hashCode()}.
 */
public class CollationIdentifier {
  public static final String PROVIDER_SPARK = "SPARK";
  public static final String PROVIDER_ICU = "ICU";

  public static final String ICU_COLLATOR_VERSION = "75.1";

  public static final String DEFAULT_COLLATION_NAME = "UTF8_BINARY";

  /** {@code SPARK.UTF8_BINARY}, without a version. */
  public static final CollationIdentifier DEFAULT_COLLATION_IDENTIFIER =
      new CollationIdentifier(PROVIDER_SPARK, DEFAULT_COLLATION_NAME);

  private final String provider;
  private final String name;
  private final Optional<String> version;

  /**
   * Creates an identifier without a version.
   *
   * @param provider collation provider, e.g. {@link #PROVIDER_SPARK}; upper-cased on storage
   * @param collationName collation name; upper-cased on storage
   * @throws NullPointerException if any argument is {@code null}
   */
  public CollationIdentifier(String provider, String collationName) {
    this(provider, collationName, Optional.empty());
  }

  /**
   * Creates an identifier with an optional version.
   *
   * @param provider collation provider, e.g. {@link #PROVIDER_ICU}; upper-cased on storage
   * @param collationName collation name; upper-cased on storage
   * @param version collation version, or {@link Optional#empty()}; upper-cased on storage
   * @throws NullPointerException if any argument is {@code null}
   */
  public CollationIdentifier(String provider, String collationName, Optional<String> version) {
    Objects.requireNonNull(provider, "provider is null");
    Objects.requireNonNull(collationName, "collationName is null");
    Objects.requireNonNull(version, "version is null");
    // Locale.ROOT: the default-locale overload would map "i" to a dotted capital I under tr-TR.
    this.provider = provider.toUpperCase(Locale.ROOT);
    this.name = collationName.toUpperCase(Locale.ROOT);
    this.version = version.map(v -> v.toUpperCase(Locale.ROOT));
  }

  /** @return {@code PROVIDER.NAME}, regardless of whether a version is present */
  public String toStringWithoutVersion() {
    return String.format("%s.%s", provider, name);
  }

  /** @return the upper-cased provider */
  public String getProvider() {
    return provider;
  }

  /** @return the upper-cased collation name */
  public String getName() {
    return name;
  }

  /** @return the upper-cased version, or {@link Optional#empty()} when none was supplied */
  public Optional<String> getVersion() {
    return version;
  }

  /**
   * Parses {@code PROVIDER.NAME} or {@code PROVIDER.NAME.VERSION}. Everything after the second dot
   * belongs to the version, so versions may themselves contain dots (e.g. {@code 75.1}).
   *
   * @param identifier textual identifier
   * @return the parsed identifier
   * @throws IllegalArgumentException if there is no dot, or if any component is empty
   * @throws NullPointerException if {@code identifier} is {@code null}
   */
  public static CollationIdentifier fromString(String identifier) {
    // A positive limit keeps trailing empty strings, so "SPARK." yields ["SPARK", ""] and is
    // rejected below instead of failing with an ArrayIndexOutOfBoundsException.
    String[] parts = identifier.split("\\.", 3);
    boolean valid = parts.length >= 2;
    for (String part : parts) {
      valid &= !part.isEmpty();
    }
    if (!valid) {
      throw new IllegalArgumentException(
          String.format("Invalid collation identifier: %s", identifier));
    }
    if (parts.length == 2) {
      return new CollationIdentifier(parts[0], parts[1]);
    }
    return new CollationIdentifier(parts[0], parts[1], Optional.of(parts[2]));
  }

  @Override
  public boolean equals(Object o) {
    if (this == o) {
      return true;
    }
    if (!(o instanceof CollationIdentifier)) {
      return false;
    }

    CollationIdentifier other = (CollationIdentifier) o;
    return this.provider.equals(other.provider)
        && this.name.equals(other.name)
        && this.version.equals(other.version);
  }

  /** Consistent with {@link #equals(Object)}: hashes provider, name and version. */
  @Override
  public int hashCode() {
    return Objects.hash(provider, name, version);
  }

  /** @return {@code PROVIDER.NAME.VERSION} when a version is present, else {@code PROVIDER.NAME} */
  @Override
  public String toString() {
    return version
        .map(v -> String.format("%s.%s.%s", provider, name, v))
        .orElseGet(this::toStringWithoutVersion);
  }
}
Original file line number Diff line number Diff line change
Expand Up @@ -129,6 +129,6 @@ public String toString() {
return super.toString();
}

private static final Set<String> BINARY_OPERATORS =
protected static final Set<String> BINARY_OPERATORS =
Stream.of("<", "<=", ">", ">=", "=", "AND", "OR").collect(Collectors.toSet());
}
Original file line number Diff line number Diff line change
Expand Up @@ -255,9 +255,12 @@ public void close() throws IOException {
private Optional<DataSkippingPredicate> getDataSkippingFilter() {
return getDataFilters()
.flatMap(
dataFilters ->
DataSkippingUtils.constructDataSkippingFilter(
dataFilters, metadata.getDataSchema()));
dataFilters -> {
dataFilters = DataSkippingUtils.omitCollatedPredicateFromDataSkippingFilter(dataFilters);
return DataSkippingUtils.constructDataSkippingFilter(
dataFilters, metadata.getDataSchema());
}
);
}

private CloseableIterator<FilteredColumnarBatch> applyDataSkipping(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
*/
package io.delta.kernel.internal.skipping;

import static io.delta.kernel.internal.DeltaErrors.timestampAfterLatestCommit;
import static io.delta.kernel.internal.DeltaErrors.wrapEngineException;
import static io.delta.kernel.internal.InternalScanFileUtils.ADD_FILE_ORDINAL;
import static io.delta.kernel.internal.InternalScanFileUtils.ADD_FILE_STATS_ORDINAL;
Expand Down Expand Up @@ -75,6 +76,153 @@ public static Optional<DataSkippingPredicate> constructDataSkippingFilter(
return constructDataSkippingFilter(dataFilters, schemaHelper);
}

/**
 * Returns a version of {@code dataFilters} from which collated predicates have been removed.
 *
 * <p>Delegates to the two-argument overload with the negation flag cleared and keeps only the
 * rewritten predicate, discarding the "contained a collated predicate" indicator.
 *
 * @param dataFilters predicate to rewrite
 * @return the rewritten predicate
 */
public static Predicate omitCollatedPredicateFromDataSkippingFilter(Predicate dataFilters) {
  Tuple2<Predicate, Boolean> rewritten =
      omitCollatedPredicateFromDataSkippingFilter(dataFilters, false);
  return rewritten._1;
}

/**
 * Recursively rewrites {@code dataFilters} into a predicate that contains no {@link
 * CollatedPredicate}.
 *
 * <p>Each {@link CollatedPredicate} is replaced by {@link AlwaysTrue#ALWAYS_TRUE}. Enclosing
 * predicates are then simplified: an {@code AND} drops {@code ALWAYS_TRUE} operands, an {@code OR}
 * with an {@code ALWAYS_TRUE} operand collapses to {@code ALWAYS_TRUE}, and comparisons or null
 * checks whose operand cannot be safely relaxed collapse to {@code ALWAYS_TRUE} as well. The
 * rewrite is meant to only ever relax the filter, never to make it stricter.
 *
 * <p>{@code NOT} is not emitted; it is pushed down by flipping {@code isNotPropagated} and
 * replacing each predicate name with its negation from {@code REVERSE_PREDICATE}.
 *
 * @param dataFilters predicate to rewrite
 * @param isNotPropagated {@code true} when the result must represent the negation of {@code
 *     dataFilters}, i.e. an odd number of enclosing {@code NOT}s has been pushed down
 * @return a tuple of the rewritten predicate and a flag telling whether a {@link
 *     CollatedPredicate} was encountered beneath {@code dataFilters}
 * @throws IllegalArgumentException if the (effective) predicate name is not one of {@code AND},
 *     {@code OR}, {@code =}, {@code <}, {@code <=}, {@code >}, {@code >=}, {@code IS_NULL}, {@code
 *     IS_NOT_NULL}, {@code NOT}
 */
private static Tuple2<Predicate, Boolean> omitCollatedPredicateFromDataSkippingFilter(
    Predicate dataFilters, boolean isNotPropagated) {
  if (dataFilters instanceof CollatedPredicate) {
    return new Tuple2<>(AlwaysTrue.ALWAYS_TRUE, true);
  }

  // Effective name: under a pushed-down NOT the predicate is handled as its negation.
  String predicateName = dataFilters.getName().toUpperCase(Locale.ROOT);
  if (isNotPropagated && REVERSE_PREDICATE.containsKey(predicateName)) {
    predicateName = REVERSE_PREDICATE.get(predicateName);
  }

  switch (predicateName) {
    case "AND":
    case "OR":
      {
        // De Morgan: the operands of a negated AND/OR are themselves negated.
        Tuple2<Predicate, Boolean> leftResult =
            omitCollatedPredicateFromDataSkippingFilter(
                asPredicate(getLeft(dataFilters)), isNotPropagated);
        Tuple2<Predicate, Boolean> rightResult =
            omitCollatedPredicateFromDataSkippingFilter(
                asPredicate(getRight(dataFilters)), isNotPropagated);
        boolean hasCollatedPredicate = leftResult._2 || rightResult._2;
        boolean leftIsTrue = leftResult._1 == AlwaysTrue.ALWAYS_TRUE;
        boolean rightIsTrue = rightResult._1 == AlwaysTrue.ALWAYS_TRUE;

        Predicate combined;
        if ("AND".equals(predicateName)) {
          if (leftIsTrue) {
            // Also covers "both sides are ALWAYS_TRUE".
            combined = rightResult._1;
          } else if (rightIsTrue) {
            combined = leftResult._1;
          } else {
            combined = new And(leftResult._1, rightResult._1);
          }
        } else if (leftIsTrue || rightIsTrue) {
          combined = AlwaysTrue.ALWAYS_TRUE;
        } else {
          combined = new Or(leftResult._1, rightResult._1);
        }
        return new Tuple2<>(combined, hasCollatedPredicate);
      }
    case "=":
    case "<":
    case "<=":
    case ">":
    case ">=":
      {
        Expression left = getLeft(dataFilters);
        Expression right = getRight(dataFilters);
        boolean collatedOnLeft = false;
        boolean collatedOnRight = false;
        // The negation is already expressed by the effective operator, so operands are
        // rewritten un-negated: NOT(a > b) is (a <= b), not (NOT a <= NOT b).
        if (left instanceof Predicate) {
          Tuple2<Predicate, Boolean> leftResult =
              omitCollatedPredicateFromDataSkippingFilter((Predicate) left, false);
          left = leftResult._1;
          collatedOnLeft = leftResult._2;
        }
        if (right instanceof Predicate) {
          Tuple2<Predicate, Boolean> rightResult =
              omitCollatedPredicateFromDataSkippingFilter((Predicate) right, false);
          right = rightResult._1;
          collatedOnRight = rightResult._2;
        }

        if ("=".equals(predicateName)) {
          if (collatedOnLeft || collatedOnRight) {
            return new Tuple2<>(AlwaysTrue.ALWAYS_TRUE, true);
          }
          if (isNotPropagated) {
            // NOT(left = right) is expressed as (left < right) OR (right < left).
            return new Tuple2<>(
                new Or(new Predicate("<", left, right), new Predicate("<", right, left)), false);
          }
          return new Tuple2<>(new Predicate("=", left, right), false);
        }

        // Only the operand on the "greater" side of the comparison may be relaxed; if the
        // operand on the "smaller" side had to be relaxed the whole comparison is dropped.
        boolean isGreaterThan = predicateName.startsWith(">");
        boolean collatedOnSmallerSide = isGreaterThan ? collatedOnRight : collatedOnLeft;
        boolean collatedOnGreaterSide = isGreaterThan ? collatedOnLeft : collatedOnRight;
        if (collatedOnSmallerSide) {
          return new Tuple2<>(AlwaysTrue.ALWAYS_TRUE, true);
        }
        return new Tuple2<>(new Predicate(predicateName, left, right), collatedOnGreaterSide);
      }
    case "IS_NULL":
    case "IS_NOT_NULL":
      {
        Expression child = getUnaryChild(dataFilters);
        // A null check is kept only when its child is free of collated predicates; relaxing
        // the child would not relax the null check, so otherwise the check is dropped.
        if (child instanceof Predicate
            && omitCollatedPredicateFromDataSkippingFilter((Predicate) child, false)._2) {
          return new Tuple2<>(AlwaysTrue.ALWAYS_TRUE, true);
        }
        // Under a pushed-down NOT the check is rebuilt with the negated name; otherwise the
        // original predicate is returned unchanged.
        Predicate nullCheck = isNotPropagated ? new Predicate(predicateName, child) : dataFilters;
        return new Tuple2<>(nullCheck, false);
      }
    case "NOT":
      return omitCollatedPredicateFromDataSkippingFilter(
          asPredicate(getUnaryChild(dataFilters)), !isNotPropagated);
  }

  // NOTE(review): any other predicate name (presumably including ALWAYS_TRUE or LIKE when they
  // appear in a filter) ends up here - confirm that failing the call is the intended behavior.
  throw new IllegalArgumentException(
      String.format("Invalid predicate name: %s.", predicateName));
}

//////////////////////////////////////////////////////////////////////////////////
// Helper functions
//////////////////////////////////////////////////////////////////////////////////
Expand Down Expand Up @@ -334,20 +482,25 @@ private static DataSkippingPredicate constructBinaryDataSkippingPredicate(
exprName, Arrays.asList(adjColExpr, lit), Collections.singleton(column));
}

private static final Map<String, String> REVERSE_COMPARATORS =
private static final Map<String, String> REVERSE_PREDICATE =
new HashMap<String, String>() {
{
put("AND", "OR");
put("OR", "AND");
put("IS_NULL", "IS_NOT_NULL");
put("IS_NOT_NULL", "IS_NULL");
put("NOT", "NOT");
put("=", "=");
put("<", ">");
put("<=", ">=");
put(">", "<");
put(">=", "<=");
put("<", ">=");
put("<=", ">");
put(">", "<=");
put(">=", "<");
}
};

private static Predicate reverseComparatorFilter(Predicate predicate) {
return new Predicate(
REVERSE_COMPARATORS.get(predicate.getName().toUpperCase(Locale.ROOT)),
REVERSE_PREDICATE.get(predicate.getName().toUpperCase(Locale.ROOT)),
getRight(predicate),
getLeft(predicate));
}
Expand Down
Loading
Loading