diff --git a/src/main/scala/com/microsoft/hyperspace/index/dataskipping/DataSkippingIndex.scala b/src/main/scala/com/microsoft/hyperspace/index/dataskipping/DataSkippingIndex.scala index 990ba73b5..672e38a90 100644 --- a/src/main/scala/com/microsoft/hyperspace/index/dataskipping/DataSkippingIndex.scala +++ b/src/main/scala/com/microsoft/hyperspace/index/dataskipping/DataSkippingIndex.scala @@ -21,6 +21,7 @@ import scala.collection.mutable import org.apache.spark.sql.{Column, DataFrame, SaveMode, SparkSession} import org.apache.spark.sql.catalyst.analysis.UnresolvedAttribute import org.apache.spark.sql.catalyst.expressions._ +import org.apache.spark.sql.catalyst.expressions.Literal.TrueLiteral import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan import org.apache.spark.sql.functions.input_file_name import org.apache.spark.sql.hyperspace.utils.StructTypeUtils @@ -151,7 +152,7 @@ case class DataSkippingIndex( // by combining individual index predicates with And. // True is returned if there are no index predicates for the source predicate node. def toIndexPred(sourcePred: Expression): Expression = { - predMap.get(sourcePred).map(_.reduceLeft(And)).getOrElse(Literal.TrueLiteral) + predMap.get(sourcePred).map(_.reduceLeft(And)).getOrElse(TrueLiteral) } // Compose an index predicate visiting the source predicate tree recursively. @@ -168,15 +169,15 @@ case class DataSkippingIndex( // This is a trimmed down version of the BooleanSimplification rule. // It's just enough to determine whether the index is applicable or not. 
val optimizePredicate: PartialFunction[Expression, Expression] = { - case And(Literal.TrueLiteral, right) => right - case And(left, Literal.TrueLiteral) => left - case Or(Literal.TrueLiteral, _) => Literal.TrueLiteral - case Or(_, Literal.TrueLiteral) => Literal.TrueLiteral + case And(TrueLiteral, right) => right + case And(left, TrueLiteral) => left + case Or(TrueLiteral, _) => TrueLiteral + case Or(_, TrueLiteral) => TrueLiteral } val optimizedIndexPredicate = indexPredicate.transformUp(optimizePredicate) // Return None if the index predicate is True - meaning no conversion can be done. - if (optimizedIndexPredicate == Literal.TrueLiteral) { + if (optimizedIndexPredicate == TrueLiteral) { None } else { Some(optimizedIndexPredicate) @@ -321,7 +322,7 @@ object DataSkippingIndex { assert(aggrs.nonEmpty) aggrs.zipWithIndex.map { case (aggr, idx) => - new Column(aggr).as(getNormalizeColumnName(s"${s}_$idx")) + new Column(aggr.toAggregateExpression).as(getNormalizeColumnName(s"${s}_$idx")) } } } diff --git a/src/main/scala/com/microsoft/hyperspace/index/dataskipping/expressions/BloomFilterAgg.scala b/src/main/scala/com/microsoft/hyperspace/index/dataskipping/expressions/BloomFilterAgg.scala new file mode 100644 index 000000000..95afffd49 --- /dev/null +++ b/src/main/scala/com/microsoft/hyperspace/index/dataskipping/expressions/BloomFilterAgg.scala @@ -0,0 +1,83 @@ +/* + * Copyright (2021) The Hyperspace Project Authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+ * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.microsoft.hyperspace.index.dataskipping.expressions + +import java.io.{ByteArrayInputStream, ByteArrayOutputStream} + +import org.apache.spark.sql.catalyst.InternalRow +import org.apache.spark.sql.catalyst.expressions.Expression +import org.apache.spark.sql.catalyst.expressions.aggregate.TypedImperativeAggregate +import org.apache.spark.sql.types.DataType +import org.apache.spark.util.sketch.BloomFilter + +/** + * Aggregation function that collects elements in a bloom filter. + */ +private[dataskipping] case class BloomFilterAgg( + child: Expression, + expectedNumItems: Long, // expected number of distinct elements + fpp: Double, // target false positive probability + override val mutableAggBufferOffset: Int = 0, + override val inputAggBufferOffset: Int = 0) + extends TypedImperativeAggregate[BloomFilter] { + + override def prettyName: String = "bloom_filter" + + override def dataType: DataType = bloomFilterEncoder.dataType + + override def nullable: Boolean = false + + override def children: Seq[Expression] = Seq(child) + + override def createAggregationBuffer(): BloomFilter = { + BloomFilter.create(expectedNumItems, fpp) + } + + override def update(buffer: BloomFilter, input: InternalRow): BloomFilter = { + val value = child.eval(input) + if (value != null) { + BloomFilterUtils.put(buffer, value, child.dataType) + } + buffer + } + + override def merge(buffer: BloomFilter, input: BloomFilter): BloomFilter = { + buffer.mergeInPlace(input) + buffer + } + + override def eval(buffer: BloomFilter): Any = bloomFilterEncoder.encode(buffer) + + override def serialize(buffer: BloomFilter): Array[Byte] = { + val out = new ByteArrayOutputStream() + buffer.writeTo(out) + out.toByteArray + } + + override def deserialize(bytes: Array[Byte]): BloomFilter = { + val in = new ByteArrayInputStream(bytes) + BloomFilter.readFrom(in) + } + + override def 
withNewMutableAggBufferOffset(newOffset: Int): BloomFilterAgg = + copy(mutableAggBufferOffset = newOffset) + + override def withNewInputAggBufferOffset(newOffset: Int): BloomFilterAgg = + copy(inputAggBufferOffset = newOffset) + + private def bloomFilterEncoder = BloomFilterEncoderProvider.defaultEncoder +} diff --git a/src/main/scala/com/microsoft/hyperspace/index/dataskipping/expressions/BloomFilterEncoder.scala b/src/main/scala/com/microsoft/hyperspace/index/dataskipping/expressions/BloomFilterEncoder.scala new file mode 100644 index 000000000..3c956a8d3 --- /dev/null +++ b/src/main/scala/com/microsoft/hyperspace/index/dataskipping/expressions/BloomFilterEncoder.scala @@ -0,0 +1,42 @@ +/* + * Copyright (2021) The Hyperspace Project Authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.microsoft.hyperspace.index.dataskipping.expressions + +import org.apache.spark.sql.types.DataType +import org.apache.spark.util.sketch.BloomFilter + +/** + * Defines how [[BloomFilter]] should be represented in the Spark DataFrame. + */ +trait BloomFilterEncoder { + + /** + * Returns the data type of the value in the DataFrame representing [[BloomFilter]]. + */ + def dataType: DataType + + /** + * Returns a value representing the given [[BloomFilter]] + * that can be put in the [[InternalRow]]. + */ + def encode(bf: BloomFilter): Any + + /** + * Returns a [[BloomFilter]] from the value in the DataFrame. 
+ */ + def decode(value: Any): BloomFilter +} diff --git a/src/main/scala/com/microsoft/hyperspace/index/dataskipping/expressions/BloomFilterEncoderProvider.scala b/src/main/scala/com/microsoft/hyperspace/index/dataskipping/expressions/BloomFilterEncoderProvider.scala new file mode 100644 index 000000000..2f65624db --- /dev/null +++ b/src/main/scala/com/microsoft/hyperspace/index/dataskipping/expressions/BloomFilterEncoderProvider.scala @@ -0,0 +1,30 @@ +/* + * Copyright (2021) The Hyperspace Project Authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.microsoft.hyperspace.index.dataskipping.expressions + +/** + * Provides the default implementation of [[BloomFilterEncoder]]. + */ +object BloomFilterEncoderProvider { + + /** + * Returns the default encoder. + * + * It should return a singleton object declared as "object". + */ + def defaultEncoder: BloomFilterEncoder = FastBloomFilterEncoder +} diff --git a/src/main/scala/com/microsoft/hyperspace/index/dataskipping/expressions/BloomFilterMightContain.scala b/src/main/scala/com/microsoft/hyperspace/index/dataskipping/expressions/BloomFilterMightContain.scala new file mode 100644 index 000000000..6bc7f23d8 --- /dev/null +++ b/src/main/scala/com/microsoft/hyperspace/index/dataskipping/expressions/BloomFilterMightContain.scala @@ -0,0 +1,71 @@ +/* + * Copyright (2021) The Hyperspace Project Authors. 
+ * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.microsoft.hyperspace.index.dataskipping.expressions + +import org.apache.spark.sql.catalyst.InternalRow +import org.apache.spark.sql.catalyst.expressions.{BinaryExpression, Expression, Predicate} +import org.apache.spark.sql.catalyst.expressions.codegen.{CodegenContext, ExprCode} +import org.apache.spark.sql.catalyst.expressions.codegen.Block._ + +/** + * Returns true if the bloom filter (left) might contain the value (right). + * + * If the value (right) is null, null is returned. + * + * Preconditions (unchecked): + * - The bloom filter must not be null. 
+ */ +private[dataskipping] case class BloomFilterMightContain(left: Expression, right: Expression) + extends BinaryExpression + with Predicate { + + override def prettyName: String = "bloom_filter_might_contain" + + override def nullable: Boolean = true + + override def eval(input: InternalRow): Any = { + val value = right.eval(input) + if (value != null) { + val bfData = left.eval(input) + val bf = BloomFilterEncoderProvider.defaultEncoder.decode(bfData) + return BloomFilterUtils.mightContain(bf, value, right.dataType) + } + null + } + + override def doGenCode(ctx: CodegenContext, ev: ExprCode): ExprCode = { + val leftGen = left.genCode(ctx) + val rightGen = right.genCode(ctx) + val bloomFilterEncoder = + BloomFilterEncoderProvider.defaultEncoder.getClass.getCanonicalName.stripSuffix("$") + val bf = s"$bloomFilterEncoder.decode(${leftGen.value})" + val result = BloomFilterUtils.mightContainCodegen(bf, rightGen.value, right.dataType) + val resultCode = + s""" + |if (!(${rightGen.isNull})) { + | ${leftGen.code} + | ${ev.isNull} = false; + | ${ev.value} = $result; + |} + """.stripMargin + ev.copy(code = code""" + ${rightGen.code} + boolean ${ev.isNull} = true; + boolean ${ev.value} = false; + $resultCode""") + } +} diff --git a/src/main/scala/com/microsoft/hyperspace/index/dataskipping/expressions/BloomFilterMightContainAny.scala b/src/main/scala/com/microsoft/hyperspace/index/dataskipping/expressions/BloomFilterMightContainAny.scala new file mode 100644 index 000000000..4b6c4189d --- /dev/null +++ b/src/main/scala/com/microsoft/hyperspace/index/dataskipping/expressions/BloomFilterMightContainAny.scala @@ -0,0 +1,85 @@ +/* + * Copyright (2021) The Hyperspace Project Authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. 
+ * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.microsoft.hyperspace.index.dataskipping.expressions + +import org.apache.spark.sql.catalyst.InternalRow +import org.apache.spark.sql.catalyst.expressions.{Expression, Predicate, UnaryExpression} +import org.apache.spark.sql.catalyst.expressions.codegen.{CodegenContext, CodeGenerator, ExprCode, FalseLiteral} +import org.apache.spark.sql.catalyst.expressions.codegen.Block._ +import org.apache.spark.sql.types.DataType +import org.apache.spark.util.sketch.BloomFilter + +/** + * Returns true if the bloom filter (child) might contain one of the values. + * + * Preconditions (unchecked): + * - The bloom filter must not be null. + * - The values must be an array without nulls. + * - If the element type can be represented as a primitive type in Scala, + * then the array must be an array of the primitive type. 
+ */ +private[dataskipping] case class BloomFilterMightContainAny( + child: Expression, + values: Any, + elementType: DataType) + extends UnaryExpression + with Predicate { + + override def prettyName: String = "bloom_filter_might_contain_any" + + override def nullable: Boolean = false + + override def eval(input: InternalRow): Boolean = { + val bfData = child.eval(input) + val bf = BloomFilterEncoderProvider.defaultEncoder.decode(bfData) + values + .asInstanceOf[Array[_]] + .exists(BloomFilterUtils.mightContain(bf, _, elementType)) + } + + override def doGenCode(ctx: CodegenContext, ev: ExprCode): ExprCode = { + val childGen = child.genCode(ctx); + val bloomFilterEncoder = + BloomFilterEncoderProvider.defaultEncoder.getClass.getCanonicalName.stripSuffix("$") + val bf = ctx.freshName("bf") + val bfType = classOf[BloomFilter].getCanonicalName + val javaType = CodeGenerator.javaType(elementType) + val arrayType = if (values.isInstanceOf[Array[Any]]) "java.lang.Object[]" else s"$javaType[]" + val valuesRef = ctx.addReferenceObj("values", values, arrayType) + val valuesArray = ctx.freshName("values") + val i = ctx.freshName("i") + val mightContain = + BloomFilterUtils.mightContainCodegen(bf, s"($javaType) $valuesArray[$i]", elementType) + val resultCode = + s""" + |$bfType $bf = $bloomFilterEncoder.decode(${childGen.value}); + |$arrayType $valuesArray = $valuesRef; + |for (int $i = 0; $i < $valuesArray.length; $i++) { + | if ($mightContain) { + | ${ev.value} = true; + | break; + | } + |} + """.stripMargin + ev.copy( + code = code""" + ${childGen.code} + boolean ${ev.value} = false; + $resultCode""", + isNull = FalseLiteral) + } +} diff --git a/src/main/scala/com/microsoft/hyperspace/index/dataskipping/expressions/BloomFilterUtils.scala b/src/main/scala/com/microsoft/hyperspace/index/dataskipping/expressions/BloomFilterUtils.scala new file mode 100644 index 000000000..2490b638a --- /dev/null +++ 
b/src/main/scala/com/microsoft/hyperspace/index/dataskipping/expressions/BloomFilterUtils.scala @@ -0,0 +1,62 @@ +/* + * Copyright (2021) The Hyperspace Project Authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.microsoft.hyperspace.index.dataskipping.expressions + +import org.apache.spark.sql.types._ +import org.apache.spark.unsafe.types.UTF8String +import org.apache.spark.util.sketch.BloomFilter + +import com.microsoft.hyperspace.HyperspaceException + +// TODO: Support more types. +// Currently we are relying on org.apache.spark.util.sketch.BloomFilter and +// supported types are restricted by the implementation. To support more types +// without changing the underlying implementation, we can convert Spark values +// to and from byte arrays. 
+private[dataskipping] object BloomFilterUtils { + def put(bf: BloomFilter, value: Any, dataType: DataType): Boolean = + dataType match { + case LongType => bf.putLong(value.asInstanceOf[Long]) + case IntegerType => bf.putLong(value.asInstanceOf[Int]) + case ByteType => bf.putLong(value.asInstanceOf[Byte]) + case ShortType => bf.putLong(value.asInstanceOf[Short]) + case StringType => bf.putBinary(value.asInstanceOf[UTF8String].getBytes) + case BinaryType => bf.putBinary(value.asInstanceOf[Array[Byte]]) + case _ => throw HyperspaceException(s"BloomFilter does not support ${dataType}") + } + + def mightContain(bf: BloomFilter, value: Any, dataType: DataType): Boolean = { + dataType match { + case LongType => bf.mightContainLong(value.asInstanceOf[Long]) + case IntegerType => bf.mightContainLong(value.asInstanceOf[Int]) + case ByteType => bf.mightContainLong(value.asInstanceOf[Byte]) + case ShortType => bf.mightContainLong(value.asInstanceOf[Short]) + case StringType => bf.mightContainBinary(value.asInstanceOf[UTF8String].getBytes) + case BinaryType => bf.mightContainBinary(value.asInstanceOf[Array[Byte]]) + case _ => throw HyperspaceException(s"BloomFilter does not support ${dataType}") + } + } + + def mightContainCodegen(bf: String, value: String, dataType: DataType): String = { + dataType match { + case LongType | IntegerType | ByteType | ShortType => s"$bf.mightContainLong($value)" + case StringType => s"$bf.mightContainBinary(($value).getBytes())" + case BinaryType => s"$bf.mightContainBinary($value)" + case _ => throw HyperspaceException(s"BloomFilter does not support ${dataType}") + } + } +} diff --git a/src/main/scala/com/microsoft/hyperspace/index/dataskipping/expressions/FastBloomFilterEncoder.scala b/src/main/scala/com/microsoft/hyperspace/index/dataskipping/expressions/FastBloomFilterEncoder.scala new file mode 100644 index 000000000..36b222896 --- /dev/null +++ 
b/src/main/scala/com/microsoft/hyperspace/index/dataskipping/expressions/FastBloomFilterEncoder.scala @@ -0,0 +1,60 @@ +/* + * Copyright (2021) The Hyperspace Project Authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.microsoft.hyperspace.index.dataskipping.expressions + +import org.apache.spark.sql.catalyst.InternalRow +import org.apache.spark.sql.catalyst.util.ArrayData +import org.apache.spark.sql.types.{ArrayType, IntegerType, LongType, StructField, StructType} +import org.apache.spark.util.sketch.BloomFilter + +import com.microsoft.hyperspace.index.dataskipping.util.ReflectionHelper + +/** + * A [[BloomFilterEncoder]] implementation that avoids copying arrays. 
+ */ +object FastBloomFilterEncoder extends BloomFilterEncoder with ReflectionHelper { + override val dataType: StructType = StructType( + StructField("numHashFunctions", IntegerType, nullable = false) :: + StructField("bitCount", LongType, nullable = false) :: + StructField("data", ArrayType(LongType, containsNull = false), nullable = false) :: Nil) + + override def encode(bf: BloomFilter): InternalRow = { + val bloomFilterImplClass = bf.getClass + val bits = get(bloomFilterImplClass, "bits", bf) + val bitArrayClass = bits.getClass + InternalRow( + getInt(bloomFilterImplClass, "numHashFunctions", bf), + getLong(bitArrayClass, "bitCount", bits), + ArrayData.toArrayData(get(bitArrayClass, "data", bits).asInstanceOf[Array[Long]])) + } + + override def decode(value: Any): BloomFilter = { + val struct = value.asInstanceOf[InternalRow] + val numHashFunctions = struct.getInt(0) + val bitCount = struct.getLong(1) + val data = struct.getArray(2).toLongArray() + + val bf = BloomFilter.create(1) + val bloomFilterImplClass = bf.getClass + val bits = get(bloomFilterImplClass, "bits", bf) + val bitArrayClass = bits.getClass + setInt(bloomFilterImplClass, "numHashFunctions", bf, numHashFunctions) + setLong(bitArrayClass, "bitCount", bits, bitCount) + set(bitArrayClass, "data", bits, data) + bf + } +} diff --git a/src/main/scala/com/microsoft/hyperspace/index/dataskipping/expressions/SortedArrayLowerBound.scala b/src/main/scala/com/microsoft/hyperspace/index/dataskipping/expressions/SortedArrayLowerBound.scala index 48ed15ff9..9199f7882 100644 --- a/src/main/scala/com/microsoft/hyperspace/index/dataskipping/expressions/SortedArrayLowerBound.scala +++ b/src/main/scala/com/microsoft/hyperspace/index/dataskipping/expressions/SortedArrayLowerBound.scala @@ -46,9 +46,9 @@ private[dataskipping] case class SortedArrayLowerBound(left: Expression, right: override def nullable: Boolean = true override def eval(input: InternalRow): Any = { - val arr = 
left.eval(input).asInstanceOf[ArrayData] val value = right.eval(input) if (value != null) { + val arr = left.eval(input).asInstanceOf[ArrayData] val dt = right.dataType val n = arr.numElements() if (n > 0) { @@ -77,6 +77,7 @@ private[dataskipping] case class SortedArrayLowerBound(left: Expression, right: val resultCode = s""" |if (!(${rightGen.isNull})) { + | ${leftGen.code} | int $n = $arr.numElements(); | if ($n > 0) { | if (!(${ctx.genGreater(dt, value, firstValueInArr)})) { @@ -90,7 +91,6 @@ private[dataskipping] case class SortedArrayLowerBound(left: Expression, right: |} """.stripMargin ev.copy(code = code""" - ${leftGen.code} ${rightGen.code} boolean ${ev.isNull} = true; int ${ev.value} = 0; diff --git a/src/main/scala/com/microsoft/hyperspace/index/dataskipping/expressions/StreamBloomFilterEncoder.scala b/src/main/scala/com/microsoft/hyperspace/index/dataskipping/expressions/StreamBloomFilterEncoder.scala new file mode 100644 index 000000000..78a768b7a --- /dev/null +++ b/src/main/scala/com/microsoft/hyperspace/index/dataskipping/expressions/StreamBloomFilterEncoder.scala @@ -0,0 +1,40 @@ +/* + * Copyright (2021) The Hyperspace Project Authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package com.microsoft.hyperspace.index.dataskipping.expressions + +import java.io.{ByteArrayInputStream, ByteArrayOutputStream} + +import org.apache.spark.sql.types.BinaryType +import org.apache.spark.util.sketch.BloomFilter + +/** + * A [[BloomFilterEncoder]] implementation based on byte array streams. + */ +object StreamBloomFilterEncoder extends BloomFilterEncoder { + val dataType: BinaryType = BinaryType + + def encode(bf: BloomFilter): Array[Byte] = { + val out = new ByteArrayOutputStream() + bf.writeTo(out) + out.toByteArray + } + + def decode(value: Any): BloomFilter = { + val in = new ByteArrayInputStream(value.asInstanceOf[Array[Byte]]) + BloomFilter.readFrom(in) + } +} diff --git a/src/main/scala/com/microsoft/hyperspace/index/dataskipping/sketches/BloomFilterSketch.scala b/src/main/scala/com/microsoft/hyperspace/index/dataskipping/sketches/BloomFilterSketch.scala new file mode 100644 index 000000000..556d8fdab --- /dev/null +++ b/src/main/scala/com/microsoft/hyperspace/index/dataskipping/sketches/BloomFilterSketch.scala @@ -0,0 +1,87 @@ +/* + * Copyright (2021) The Hyperspace Project Authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package com.microsoft.hyperspace.index.dataskipping.sketches + +import org.apache.spark.sql.catalyst.expressions._ +import org.apache.spark.sql.catalyst.expressions.aggregate.AggregateFunction +import org.apache.spark.sql.types.DataType + +import com.microsoft.hyperspace.index.dataskipping.expressions._ +import com.microsoft.hyperspace.index.dataskipping.util.ArrayUtils.toArray + +/** + * Sketch based on a bloom filter for a given expression. + * + * Being a probabilistic structure, it is more efficient in terms of the index + * data size than [[ValueListSketch]] if the number of distinct values for the + * expression is large, but can be less efficient in terms of query optimization + * than [[ValueListSketch]] due to false positives. + * + * Users can specify the target false positive rate and the expected number of + * distinct values per file. These variables determine the size of the bloom + * filters and thus the size of the index data. + * + * @param expr Expression this sketch is based on + * @param fpp Target false positive rate + * @param expectedDistinctCountPerFile Expected number of distinct values per file + * @param dataType Optional data type to specify the expected data type of the + * expression. If not specified, it is deduced automatically. + * If the actual data type of the expression is different from this, + * an error is thrown. Users are recommended to leave this parameter to + * None. 
+ */ +case class BloomFilterSketch( + override val expr: String, + fpp: Double, + expectedDistinctCountPerFile: Long, + override val dataType: Option[DataType] = None) + extends SingleExprSketch[BloomFilterSketch](expr, dataType) { + override def name: String = "BloomFilter" + + override def toString: String = s"$name($expr, $fpp, $expectedDistinctCountPerFile)" + + override def withNewExpression(newExpr: (String, Option[DataType])): BloomFilterSketch = { + copy(expr = newExpr._1, dataType = newExpr._2) + } + + override def aggregateFunctions: Seq[AggregateFunction] = { + BloomFilterAgg(parsedExpr, expectedDistinctCountPerFile, fpp) :: Nil + } + + override def convertPredicate( + predicate: Expression, + resolvedExprs: Seq[Expression], + sketchValues: Seq[Expression], + nameMap: Map[ExprId, String], + valueExtractor: ExpressionExtractor): Option[Expression] = { + val bf = sketchValues.head + val resolvedExpr = resolvedExprs.head + val dataType = resolvedExpr.dataType + val exprExtractor = NormalizedExprExtractor(resolvedExpr, nameMap) + val ExprEqualTo = EqualToExtractor(exprExtractor, valueExtractor) + val ExprEqualNullSafe = EqualNullSafeExtractor(exprExtractor, valueExtractor) + val ExprIn = InExtractor(exprExtractor, valueExtractor) + val ExprInSet = InSetExtractor(exprExtractor) + Option(predicate).collect { + case ExprEqualTo(_, v) => BloomFilterMightContain(bf, v) + case ExprEqualNullSafe(_, v) => Or(IsNull(v), BloomFilterMightContain(bf, v)) + case ExprIn(_, vs) => vs.map(BloomFilterMightContain(bf, _)).reduceLeft(Or) + case ExprInSet(_, vs) => + BloomFilterMightContainAny(bf, toArray(vs.filter(_ != null).toSeq, dataType), dataType) + } + } +} diff --git a/src/main/scala/com/microsoft/hyperspace/index/dataskipping/sketches/MinMaxSketch.scala b/src/main/scala/com/microsoft/hyperspace/index/dataskipping/sketches/MinMaxSketch.scala index 6d8a143dc..808c8a0c9 100644 --- a/src/main/scala/com/microsoft/hyperspace/index/dataskipping/sketches/MinMaxSketch.scala +++ 
b/src/main/scala/com/microsoft/hyperspace/index/dataskipping/sketches/MinMaxSketch.scala @@ -17,7 +17,7 @@ package com.microsoft.hyperspace.index.dataskipping.sketches import org.apache.spark.sql.catalyst.expressions._ -import org.apache.spark.sql.catalyst.expressions.aggregate.{Max, Min} +import org.apache.spark.sql.catalyst.expressions.aggregate.{AggregateFunction, Max, Min} import org.apache.spark.sql.catalyst.util.{ArrayData, TypeUtils} import org.apache.spark.sql.types.{ArrayType, DataType} @@ -42,8 +42,9 @@ case class MinMaxSketch(override val expr: String, override val dataType: Option copy(expr = newExpr._1, dataType = newExpr._2) } - override def aggregateFunctions: Seq[Expression] = - Min(parsedExpr).toAggregateExpression() :: Max(parsedExpr).toAggregateExpression() :: Nil + override def aggregateFunctions: Seq[AggregateFunction] = { + Min(parsedExpr) :: Max(parsedExpr) :: Nil + } override def convertPredicate( predicate: Expression, diff --git a/src/main/scala/com/microsoft/hyperspace/index/dataskipping/sketches/PartitionSketch.scala b/src/main/scala/com/microsoft/hyperspace/index/dataskipping/sketches/PartitionSketch.scala index 71db379ac..4b247ddd4 100644 --- a/src/main/scala/com/microsoft/hyperspace/index/dataskipping/sketches/PartitionSketch.scala +++ b/src/main/scala/com/microsoft/hyperspace/index/dataskipping/sketches/PartitionSketch.scala @@ -18,6 +18,7 @@ package com.microsoft.hyperspace.index.dataskipping.sketches import org.apache.spark.sql.SparkSession import org.apache.spark.sql.catalyst.expressions._ +import org.apache.spark.sql.catalyst.expressions.aggregate.AggregateFunction import org.apache.spark.sql.types.DataType import com.microsoft.hyperspace.index.dataskipping.expressions._ @@ -47,10 +48,10 @@ private[dataskipping] case class PartitionSketch( copy(expressions = newExpressions) } - override def aggregateFunctions: Seq[Expression] = { + override def aggregateFunctions: Seq[AggregateFunction] = { val parser = 
SparkSession.getActiveSession.get.sessionState.sqlParser exprStrings.map { e => - FirstNullSafe(parser.parseExpression(e)).toAggregateExpression() + FirstNullSafe(parser.parseExpression(e)) } } diff --git a/src/main/scala/com/microsoft/hyperspace/index/dataskipping/sketches/Sketch.scala b/src/main/scala/com/microsoft/hyperspace/index/dataskipping/sketches/Sketch.scala index 2bab62dee..743d566d4 100644 --- a/src/main/scala/com/microsoft/hyperspace/index/dataskipping/sketches/Sketch.scala +++ b/src/main/scala/com/microsoft/hyperspace/index/dataskipping/sketches/Sketch.scala @@ -18,6 +18,7 @@ package com.microsoft.hyperspace.index.dataskipping.sketches import com.fasterxml.jackson.annotation.JsonTypeInfo import org.apache.spark.sql.catalyst.expressions.{Attribute, Expression, ExprId} +import org.apache.spark.sql.catalyst.expressions.aggregate.AggregateFunction import org.apache.spark.sql.types.DataType import com.microsoft.hyperspace.index.dataskipping.expressions.ExpressionExtractor @@ -58,7 +59,7 @@ trait Sketch { * Returns aggregate functions that can be used to compute the actual sketch * values from source data. */ - def aggregateFunctions: Seq[Expression] + def aggregateFunctions: Seq[AggregateFunction] /** * Returns a human-readable string describing this sketch. diff --git a/src/main/scala/com/microsoft/hyperspace/index/dataskipping/util/ReflectionHelper.scala b/src/main/scala/com/microsoft/hyperspace/index/dataskipping/util/ReflectionHelper.scala new file mode 100644 index 000000000..aecc5b9c4 --- /dev/null +++ b/src/main/scala/com/microsoft/hyperspace/index/dataskipping/util/ReflectionHelper.scala @@ -0,0 +1,51 @@ +/* + * Copyright (2021) The Hyperspace Project Authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. 
+ * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.microsoft.hyperspace.index.dataskipping.util + +import java.lang.reflect.Field + +trait ReflectionHelper { + def getAccesibleDeclaredField(clazz: Class[_], name: String): Field = { + val field = clazz.getDeclaredField(name) + field.setAccessible(true) + field + } + + def get(clazz: Class[_], fieldName: String, obj: Any): Any = { + getAccesibleDeclaredField(clazz, fieldName).get(obj) + } + + def getInt(clazz: Class[_], fieldName: String, obj: Any): Int = { + getAccesibleDeclaredField(clazz, fieldName).getInt(obj) + } + + def getLong(clazz: Class[_], fieldName: String, obj: Any): Long = { + getAccesibleDeclaredField(clazz, fieldName).getLong(obj) + } + + def set(clazz: Class[_], fieldName: String, obj: Any, value: Any): Unit = { + getAccesibleDeclaredField(clazz, fieldName).set(obj, value) + } + + def setInt(clazz: Class[_], fieldName: String, obj: Any, value: Int): Unit = { + getAccesibleDeclaredField(clazz, fieldName).setInt(obj, value) + } + + def setLong(clazz: Class[_], fieldName: String, obj: Any, value: Long): Unit = { + getAccesibleDeclaredField(clazz, fieldName).setLong(obj, value) + } +} diff --git a/src/test/scala-spark2/com/microsoft/hyperspace/util/SparkTestShims.scala b/src/test/scala-spark2/com/microsoft/hyperspace/util/SparkTestShims.scala index 532634a3b..fe04d7f58 100644 --- a/src/test/scala-spark2/com/microsoft/hyperspace/util/SparkTestShims.scala +++ b/src/test/scala-spark2/com/microsoft/hyperspace/util/SparkTestShims.scala @@ -16,6 +16,8 @@ package com.microsoft.hyperspace.util +import 
org.apache.spark.sql.catalyst.InternalRow +import org.apache.spark.sql.catalyst.encoders.ExpressionEncoder import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan import org.apache.spark.sql.catalyst.trees.TreeNode import org.apache.spark.sql.execution.command.ExplainCommand @@ -32,4 +34,8 @@ object SparkTestShims { ExplainCommand(logicalPlan, extended = false) } } + + def fromRow[T](encoder: ExpressionEncoder[T], row: InternalRow): T = { + encoder.fromRow(row) + } } diff --git a/src/test/scala-spark3/com/microsoft/hyperspace/util/SparkTestShims.scala b/src/test/scala-spark3/com/microsoft/hyperspace/util/SparkTestShims.scala index 556b167a2..9e475f54e 100644 --- a/src/test/scala-spark3/com/microsoft/hyperspace/util/SparkTestShims.scala +++ b/src/test/scala-spark3/com/microsoft/hyperspace/util/SparkTestShims.scala @@ -16,6 +16,8 @@ package com.microsoft.hyperspace.util +import org.apache.spark.sql.catalyst.InternalRow +import org.apache.spark.sql.catalyst.encoders.ExpressionEncoder import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan import org.apache.spark.sql.catalyst.trees.TreeNode import org.apache.spark.sql.execution.SimpleMode @@ -33,4 +35,8 @@ object SparkTestShims { ExplainCommand(logicalPlan, SimpleMode) } } + + def fromRow[T](encoder: ExpressionEncoder[T], row: InternalRow): T = { + encoder.createDeserializer().apply(row) + } } diff --git a/src/test/scala/com/microsoft/hyperspace/index/dataskipping/BloomFilterTestUtils.scala b/src/test/scala/com/microsoft/hyperspace/index/dataskipping/BloomFilterTestUtils.scala new file mode 100644 index 000000000..ca1507c01 --- /dev/null +++ b/src/test/scala/com/microsoft/hyperspace/index/dataskipping/BloomFilterTestUtils.scala @@ -0,0 +1,40 @@ +/* + * Copyright (2021) The Hyperspace Project Authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. 
+ * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.microsoft.hyperspace.index.dataskipping + +import org.apache.spark.sql.catalyst.InternalRow +import org.apache.spark.sql.catalyst.encoders.RowEncoder +import org.apache.spark.sql.types.{StructField, StructType} +import org.apache.spark.util.sketch.BloomFilter + +import com.microsoft.hyperspace.index.dataskipping.expressions.BloomFilterEncoderProvider +import com.microsoft.hyperspace.util.SparkTestShims + +trait BloomFilterTestUtils { + def encodeExternal(bf: BloomFilter): Any = { + val bloomFilterEncoder = BloomFilterEncoderProvider.defaultEncoder + val data = bloomFilterEncoder.encode(bf) + val dataType = bloomFilterEncoder.dataType + dataType match { + case st: StructType => + SparkTestShims.fromRow(RowEncoder(st).resolveAndBind(), data.asInstanceOf[InternalRow]) + case _ => + val encoder = RowEncoder(StructType(StructField("x", dataType) :: Nil)).resolveAndBind() + SparkTestShims.fromRow(encoder, InternalRow(data)).get(0) + } + } +} diff --git a/src/test/scala/com/microsoft/hyperspace/index/dataskipping/DataSkippingIndexConfigTest.scala b/src/test/scala/com/microsoft/hyperspace/index/dataskipping/DataSkippingIndexConfigTest.scala index 9596310fc..e10dd7a6b 100644 --- a/src/test/scala/com/microsoft/hyperspace/index/dataskipping/DataSkippingIndexConfigTest.scala +++ b/src/test/scala/com/microsoft/hyperspace/index/dataskipping/DataSkippingIndexConfigTest.scala @@ -19,12 +19,13 @@ package com.microsoft.hyperspace.index.dataskipping import org.apache.hadoop.fs.Path import org.apache.spark.sql.functions.{input_file_name, max, min} 
import org.apache.spark.sql.types.{IntegerType, LongType, StringType} +import org.apache.spark.util.sketch.BloomFilter import com.microsoft.hyperspace.HyperspaceException import com.microsoft.hyperspace.index.IndexConstants import com.microsoft.hyperspace.index.dataskipping.sketches._ -class DataSkippingIndexConfigTest extends DataSkippingSuite { +class DataSkippingIndexConfigTest extends DataSkippingSuite with BloomFilterTestUtils { test("indexName returns the index name.") { val indexConfig = DataSkippingIndexConfig("myIndex", MinMaxSketch("A")) assert(indexConfig.indexName === "myIndex") @@ -85,6 +86,30 @@ class DataSkippingIndexConfigTest extends DataSkippingSuite { checkAnswer(indexData, withFileId(expectedSketchValues)) } + test("createIndex works correctly with a BloomFilterSketch.") { + val sourceData = createSourceData(spark.range(100).toDF("A")) + val indexConfig = DataSkippingIndexConfig("MyIndex", BloomFilterSketch("A", 0.001, 20)) + val (index, indexData) = indexConfig.createIndex(ctx, sourceData, Map()) + assert(index.sketches === Seq(BloomFilterSketch("A", 0.001, 20, Some(LongType)))) + val valuesAndBloomFilters = indexData + .collect() + .map { row => + val fileId = row.getAs[Long](IndexConstants.DATA_FILE_NAME_ID) + val filePath = fileIdTracker.getIdToFileMapping().toMap.apply(fileId) + val values = spark.read.parquet(filePath).collect().toSeq.map(_.getLong(0)) + val bfData = row.getAs[Any]("BloomFilter_A__0.001__20__0") + (values, bfData) + } + valuesAndBloomFilters.foreach { + case (values, bfData) => + val bf = BloomFilter.create(20, 0.001) + values.foreach(bf.put) + assert(bfData === encodeExternal(bf)) + } + assert( + indexData.columns === Seq(IndexConstants.DATA_FILE_NAME_ID, "BloomFilter_A__0.001__20__0")) + } + test("createIndex resolves column names and data types.") { val sourceData = createSourceData(spark.range(10).toDF("Foo")) val indexConfig = DataSkippingIndexConfig("MyIndex", MinMaxSketch("foO")) diff --git 
a/src/test/scala/com/microsoft/hyperspace/index/dataskipping/DataSkippingIndexIntegrationTest.scala b/src/test/scala/com/microsoft/hyperspace/index/dataskipping/DataSkippingIndexIntegrationTest.scala index 11133a7cf..263425fb6 100644 --- a/src/test/scala/com/microsoft/hyperspace/index/dataskipping/DataSkippingIndexIntegrationTest.scala +++ b/src/test/scala/com/microsoft/hyperspace/index/dataskipping/DataSkippingIndexIntegrationTest.scala @@ -27,7 +27,6 @@ import com.microsoft.hyperspace.index.IndexConstants import com.microsoft.hyperspace.index.covering.CoveringIndexConfig import com.microsoft.hyperspace.index.dataskipping.sketches._ import com.microsoft.hyperspace.index.plans.logical.IndexHadoopFsRelation -import com.microsoft.hyperspace.shim.ExtractFileSourceScanExecRelation class DataSkippingIndexIntegrationTest extends DataSkippingSuite with IcebergTestUtils { import spark.implicits._ @@ -182,6 +181,59 @@ class DataSkippingIndexIntegrationTest extends DataSkippingSuite with IcebergTes checkIndexApplied(query, numParallelism + 1) } + test("BloomFilter index is applied for a filter query (EqualTo).") { + withAndWithoutCodegen { + withIndex("myind") { + val df = createSourceData(spark.range(100).toDF("A")) + hs.createIndex(df, DataSkippingIndexConfig("myind", BloomFilterSketch("A", 0.01, 10))) + def query: DataFrame = df.filter("A = 1") + checkIndexApplied(query, 1) + } + } + } + + test( + "BloomFilter index is applied for a filter query (EqualTo) " + + "where some source data files has only null values.") { + withAndWithoutCodegen { + withIndex("myind") { + val df = createSourceData(Seq[Integer](1, 2, 3, null, 5, null, 7, 8, 9, null).toDF("A")) + hs.createIndex(df, DataSkippingIndexConfig("myind", BloomFilterSketch("A", 0.01, 10))) + def query: DataFrame = df.filter("A = 1") + checkIndexApplied(query, 1) + } + } + } + + test("BloomFilter index is applied for a filter query (In).") { + withAndWithoutCodegen { + withIndex("myind") { + val df = 
createSourceData(spark.range(100).toDF("A")) + hs.createIndex(df, DataSkippingIndexConfig("myind", BloomFilterSketch("A", 0.01, 10))) + def query: DataFrame = df.filter("A in (1, 11, 19)") + checkIndexApplied(query, 2) + } + } + } + + test("BloomFilter index support string type.") { + withAndWithoutCodegen { + withIndex("myind") { + val df = createSourceData(('a' to 'z').map(_.toString).toDF("A")) + hs.createIndex(df, DataSkippingIndexConfig("myind", BloomFilterSketch("A", 0.01, 10))) + def query: DataFrame = df.filter("A = 'a'") + checkIndexApplied(query, 1) + } + } + } + + test("BloomFilter index does not support double type.") { + val df = createSourceData((0 until 10).map(_.toDouble).toDF("A")) + val ex = intercept[SparkException]( + hs.createIndex(df, DataSkippingIndexConfig("myind", BloomFilterSketch("A", 0.01, 10)))) + assert(ex.getCause().getMessage().contains("BloomFilter does not support DoubleType")) + } + test( "DataSkippingIndex works correctly for CSV where the same source data files can be " + "interpreted differently.") { @@ -275,6 +327,20 @@ class DataSkippingIndexIntegrationTest extends DataSkippingSuite with IcebergTes } } + test( + "BloomFilter index can be applied without refresh when source files are deleted " + + "if hybrid scan is enabled.") { + withSQLConf( + IndexConstants.INDEX_HYBRID_SCAN_ENABLED -> "true", + IndexConstants.INDEX_HYBRID_SCAN_DELETED_RATIO_THRESHOLD -> "1") { + val df = createSourceData(spark.range(100).toDF("A")) + hs.createIndex(df, DataSkippingIndexConfig("myind", BloomFilterSketch("A", 0.001, 10))) + deleteFile(listFiles(dataPath()).filter(isParquet).head.getPath) + def query: DataFrame = spark.read.parquet(dataPath().toString).filter("A in (25, 50, 75)") + checkIndexApplied(query, 3) + } + } + test("Empty source data does not cause an error.") { val df = createSourceData(spark.range(0).toDF("A")) hs.createIndex(df, DataSkippingIndexConfig("myind", MinMaxSketch("A"))) diff --git 
a/src/test/scala/com/microsoft/hyperspace/index/dataskipping/expressions/BloomFilterAggTest.scala b/src/test/scala/com/microsoft/hyperspace/index/dataskipping/expressions/BloomFilterAggTest.scala new file mode 100644 index 000000000..3c38dd627 --- /dev/null +++ b/src/test/scala/com/microsoft/hyperspace/index/dataskipping/expressions/BloomFilterAggTest.scala @@ -0,0 +1,51 @@ +/* + * Copyright (2021) The Hyperspace Project Authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.microsoft.hyperspace.index.dataskipping.expressions + +import org.apache.spark.sql.Column +import org.apache.spark.sql.functions.col +import org.apache.spark.util.sketch.BloomFilter + +import com.microsoft.hyperspace.index.HyperspaceSuite +import com.microsoft.hyperspace.index.dataskipping.BloomFilterTestUtils + +class BloomFilterAggTest extends HyperspaceSuite with BloomFilterTestUtils { + import spark.implicits._ + + test("BloomFilterAgg computes BloomFilter correctly.") { + val n = 10000 + val m = 3000 + val fpp = 0.01 + + val agg = new Column(BloomFilterAgg(col("a").expr, m, fpp).toAggregateExpression()) + val df = spark + .range(n) + .toDF("a") + .filter(col("a") % 3 === 0) + .union(Seq[Integer](null).toDF("a")) + .agg(agg) + val bfData = df.collect()(0).getAs[Any](0) + + val expectedBf = BloomFilter.create(m, fpp) + for (i <- 0 until n) { + if (i % 3 == 0) { + expectedBf.put(i.toLong) + } + } + assert(bfData === encodeExternal(expectedBf)) + } +} diff --git 
a/src/test/scala/com/microsoft/hyperspace/index/dataskipping/expressions/BloomFilterMightContainAnyTest.scala b/src/test/scala/com/microsoft/hyperspace/index/dataskipping/expressions/BloomFilterMightContainAnyTest.scala new file mode 100644 index 000000000..3162a282c --- /dev/null +++ b/src/test/scala/com/microsoft/hyperspace/index/dataskipping/expressions/BloomFilterMightContainAnyTest.scala @@ -0,0 +1,65 @@ +/* + * Copyright (2021) The Hyperspace Project Authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package com.microsoft.hyperspace.index.dataskipping.expressions + +import org.apache.spark.sql.catalyst.expressions.Literal +import org.apache.spark.sql.types._ +import org.apache.spark.util.sketch.BloomFilter + +import com.microsoft.hyperspace.index.HyperspaceSuite +import com.microsoft.hyperspace.index.dataskipping.util.ArrayUtils.toArray + +class BloomFilterMightContainAnyTest extends HyperspaceSuite { + def test(values: Seq[Any], dataType: DataType): Unit = { + val bf = BloomFilter.create(values.length, 0.01) + val bfData = Literal( + BloomFilterEncoderProvider.defaultEncoder.encode(bf), + BloomFilterEncoderProvider.defaultEncoder.dataType) + for (k <- 1 to 3) { + values.grouped(k).foreach { vs => + val valuesArray = toArray(vs.map(Literal.create(_, dataType).eval()), dataType) + assert( + BloomFilterMightContainAny(bfData, valuesArray, dataType).eval() === vs.exists( + bf.mightContain(_))) + } + } + } + + test("BloomFilterMightContainAny works correctly for an int array.") { + test((0 until 1000).map(_ * 2), IntegerType) + } + + test("BloomFilterMightContainAny works correctly for a long array.") { + test((0L until 1000L).map(_ * 2), LongType) + } + + test("BloomFilterMightContainAny works correctly for a byte array.") { + test(Seq(0, 1, 3, 7, 15, 31, 63, 127).map(_.toByte), ByteType) + } + + test("BloomFilterMightContainAny works correctly for a short array.") { + test(Seq(1, 3, 5, 7, 9).map(_.toShort), ShortType) + } + + test("BloomFilterMightContainAny works correctly for a string array.") { + test(Seq("hello", "world", "foo", "bar"), StringType) + } + + test("BloomFilterMightContainAny works correctly for a binary array.") { + test(Seq(Array[Byte](1, 2), Array[Byte](3, 4)), BinaryType) + } +} diff --git a/src/test/scala/com/microsoft/hyperspace/index/dataskipping/expressions/BloomFilterMightContainTest.scala b/src/test/scala/com/microsoft/hyperspace/index/dataskipping/expressions/BloomFilterMightContainTest.scala new file mode 100644 index
000000000..7e354163f --- /dev/null +++ b/src/test/scala/com/microsoft/hyperspace/index/dataskipping/expressions/BloomFilterMightContainTest.scala @@ -0,0 +1,68 @@ +/* + * Copyright (2021) The Hyperspace Project Authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.microsoft.hyperspace.index.dataskipping.expressions + +import org.apache.spark.sql.catalyst.expressions._ +import org.apache.spark.sql.types._ +import org.apache.spark.util.sketch.BloomFilter + +import com.microsoft.hyperspace.index.HyperspaceSuite + +class BloomFilterMightContainTest extends HyperspaceSuite with ExpressionEvalHelper { + def test(values: Seq[Any], dataType: DataType): Unit = { + val bf = BloomFilter.create(values.length, 0.01) + val bfData = Literal( + BloomFilterEncoderProvider.defaultEncoder.encode(bf), + BloomFilterEncoderProvider.defaultEncoder.dataType) + values.foreach { v => + val lit = Literal.create(v, dataType) + checkEvaluation(BloomFilterMightContain(bfData, lit), bf.mightContain(v)) + } + } + + test("BloomFilterMightContain works correctly for an int array.") { + test((0 until 50).map(_ * 2), IntegerType) + } + + test("BloomFilterMightContain works correctly for a long array.") { + test((0L until 50L).map(_ * 2), LongType) + } + + test("BloomFilterMightContain works correctly for a byte array.") { + test(Seq(0, 1, 3, 7, 15, 31, 63, 127).map(_.toByte), ByteType) + } + + test("BloomFilterMightContain works correctly for a short array.") { + 
test(Seq(1, 3, 5, 7, 9).map(_.toShort), ShortType) + } + + test("BloomFilterMightContain works correctly for a string array.") { + test(Seq("hello", "world", "foo", "bar"), StringType) + } + + test("BloomFilterMightContain works correctly for a binary array.") { + test(Seq(Array[Byte](1, 2), Array[Byte](3, 4)), BinaryType) + } + + test("BloomFilterMightContain returns null if the value is null.") { + val bf = BloomFilter.create(10, 0.01) + val bfData = Literal( + BloomFilterEncoderProvider.defaultEncoder.encode(bf), + BloomFilterEncoderProvider.defaultEncoder.dataType) + checkEvaluation(BloomFilterMightContain(bfData, Literal(null, IntegerType)), null) + } +} diff --git a/src/test/scala/com/microsoft/hyperspace/index/dataskipping/expressions/BloomFilterUtilsTest.scala b/src/test/scala/com/microsoft/hyperspace/index/dataskipping/expressions/BloomFilterUtilsTest.scala new file mode 100644 index 000000000..f67850262 --- /dev/null +++ b/src/test/scala/com/microsoft/hyperspace/index/dataskipping/expressions/BloomFilterUtilsTest.scala @@ -0,0 +1,149 @@ +/* + * Copyright (2021) The Hyperspace Project Authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package com.microsoft.hyperspace.index.dataskipping.expressions + +import org.apache.spark.sql.types._ +import org.apache.spark.unsafe.types.UTF8String +import org.apache.spark.util.sketch.BloomFilter + +import com.microsoft.hyperspace.HyperspaceException +import com.microsoft.hyperspace.index.HyperspaceSuite + +class BloomFilterUtilsTest extends HyperspaceSuite { + + def testPut(value: Any, dataType: DataType): Unit = { + val bf = BloomFilter.create(100, 0.01) + BloomFilterUtils.put(bf, value, dataType) + val expected = BloomFilter.create(100, 0.01) + expected.put(value) + assert(bf === expected) + } + + test("put: long") { + testPut(10L, LongType) + } + + test("put: int") { + testPut(10, IntegerType) + } + + test("put: byte") { + testPut(10.toByte, ByteType) + } + + test("put: short") { + testPut(10.toShort, ShortType) + } + + test("put: string") { + val value = UTF8String.fromString("hello") + val bf = BloomFilter.create(100, 0.01) + BloomFilterUtils.put(bf, value, StringType) + val expected = BloomFilter.create(100, 0.01) + expected.put(value.getBytes) + assert(bf === expected) + } + + test("put: binary") { + testPut(Array[Byte](1, 2, 3, 4), BinaryType) + } + + test("put throws an exception for unsupported types.") { + val ex = intercept[HyperspaceException](testPut(3.14, DoubleType)) + assert(ex.msg.contains("BloomFilter does not support DoubleType")) + } + + def testMightContain(value: Any, value2: Any, dataType: DataType): Unit = { + val bf = BloomFilter.create(100, 0.01) + BloomFilterUtils.put(bf, value, dataType) + assert(BloomFilterUtils.mightContain(bf, value, dataType) === bf.mightContain(value)) + assert(BloomFilterUtils.mightContain(bf, value2, dataType) === bf.mightContain(value2)) + } + + test("mightContain: int") { + testMightContain(1, 0, IntegerType) + } + + test("mightContain: long") { + testMightContain(1L, 0L, LongType) + } + + test("mightContain: byte") { + testMightContain(1.toByte, 0.toByte, ByteType) + } + + test("mightContain: 
short") { + testMightContain(1.toShort, 0.toShort, ShortType) + } + + test("mightContain: string") { + val value = UTF8String.fromString("hello") + val value2 = UTF8String.fromString("world") + val bf = BloomFilter.create(100, 0.01) + BloomFilterUtils.put(bf, value, StringType) + assert( + BloomFilterUtils.mightContain(bf, value, StringType) === bf.mightContain(value.getBytes)) + assert( + BloomFilterUtils.mightContain(bf, value2, StringType) === bf.mightContain(value2.getBytes)) + } + + test("mightContain: binary") { + testMightContain(Array[Byte](1, 2), Array[Byte](3, 4), BinaryType) + } + + test("mightContain throws an exception for unsupported types.") { + val bf = BloomFilter.create(100, 0.01) + val ex = intercept[HyperspaceException](BloomFilterUtils.mightContain(bf, 3.14, DoubleType)) + assert(ex.msg.contains("BloomFilter does not support DoubleType")) + } + + test("mightContainCodegen: int") { + val code = BloomFilterUtils.mightContainCodegen("fb", "vl", IntegerType) + assert(code === "fb.mightContainLong(vl)") + } + + test("mightContainCodegen: long") { + val code = BloomFilterUtils.mightContainCodegen("fb", "vl", LongType) + assert(code === "fb.mightContainLong(vl)") + } + + test("mightContainCodegen: byte") { + val code = BloomFilterUtils.mightContainCodegen("fb", "vl", ByteType) + assert(code === "fb.mightContainLong(vl)") + } + + test("mightContainCodegen: short") { + val code = BloomFilterUtils.mightContainCodegen("fb", "vl", ShortType) + assert(code === "fb.mightContainLong(vl)") + } + + test("mightContainCodegen: string") { + val code = BloomFilterUtils.mightContainCodegen("fb", "vl", StringType) + assert(code === "fb.mightContainBinary((vl).getBytes())") + } + + test("mightContainCodegen: binary") { + val code = BloomFilterUtils.mightContainCodegen("fb", "vl", BinaryType) + assert(code === "fb.mightContainBinary(vl)") + } + + test("mightContainCodegen throws an exception for unsupported types.") { + val ex = + 
intercept[HyperspaceException](BloomFilterUtils.mightContainCodegen("fb", "vl", DoubleType)) + assert(ex.msg.contains("BloomFilter does not support DoubleType")) + } +} diff --git a/src/test/scala/com/microsoft/hyperspace/index/dataskipping/expressions/FastBloomFilterEncoderTest.scala b/src/test/scala/com/microsoft/hyperspace/index/dataskipping/expressions/FastBloomFilterEncoderTest.scala new file mode 100644 index 000000000..08a030dd6 --- /dev/null +++ b/src/test/scala/com/microsoft/hyperspace/index/dataskipping/expressions/FastBloomFilterEncoderTest.scala @@ -0,0 +1,38 @@ +/* + * Copyright (2021) The Hyperspace Project Authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package com.microsoft.hyperspace.index.dataskipping.expressions + +import org.apache.spark.util.sketch.BloomFilter + +import com.microsoft.hyperspace.index.HyperspaceSuite + +class FastBloomFilterEncoderTest extends HyperspaceSuite { + test("encode and decode restores empty bloom filter.") { + val bf = BloomFilter.create(100, 0.01) + val data = FastBloomFilterEncoder.encode(bf) + val bf2 = FastBloomFilterEncoder.decode(data) + assert(bf2 === bf) + } + + test("encode and decode restores the original bloom filter.") { + val bf = BloomFilter.create(100, 0.01) + bf.put(42) + val data = FastBloomFilterEncoder.encode(bf) + val bf2 = FastBloomFilterEncoder.decode(data) + assert(bf2 === bf) + } +} diff --git a/src/test/scala/com/microsoft/hyperspace/index/dataskipping/expressions/StreamBloomFilterEncoderTest.scala b/src/test/scala/com/microsoft/hyperspace/index/dataskipping/expressions/StreamBloomFilterEncoderTest.scala new file mode 100644 index 000000000..b549285ca --- /dev/null +++ b/src/test/scala/com/microsoft/hyperspace/index/dataskipping/expressions/StreamBloomFilterEncoderTest.scala @@ -0,0 +1,38 @@ +/* + * Copyright (2021) The Hyperspace Project Authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package com.microsoft.hyperspace.index.dataskipping.expressions + +import org.apache.spark.util.sketch.BloomFilter + +import com.microsoft.hyperspace.index.HyperspaceSuite + +class StreamBloomFilterEncoderTest extends HyperspaceSuite { + test("encode and decode restores empty bloom filter.") { + val bf = BloomFilter.create(100, 0.01) + val data = StreamBloomFilterEncoder.encode(bf) + val bf2 = StreamBloomFilterEncoder.decode(data) + assert(bf2 === bf) + } + + test("encode and decode restores the original bloom filter.") { + val bf = BloomFilter.create(100, 0.01) + bf.put(42) + val data = StreamBloomFilterEncoder.encode(bf) + val bf2 = StreamBloomFilterEncoder.decode(data) + assert(bf2 === bf) + } +} diff --git a/src/test/scala/com/microsoft/hyperspace/index/dataskipping/rules/ApplyDataSkippingIndexTest.scala b/src/test/scala/com/microsoft/hyperspace/index/dataskipping/rules/ApplyDataSkippingIndexTest.scala index 98dca4f18..9175c766c 100644 --- a/src/test/scala/com/microsoft/hyperspace/index/dataskipping/rules/ApplyDataSkippingIndexTest.scala +++ b/src/test/scala/com/microsoft/hyperspace/index/dataskipping/rules/ApplyDataSkippingIndexTest.scala @@ -237,6 +237,27 @@ class ApplyDataSkippingIndexTest extends DataSkippingSuite { Param(dataI, "!(A < 20)", MinMaxSketch("A"), 8), Param(dataI, "not (A not in (1, 2, 3))", MinMaxSketch("A"), 1), Param(dataS, "A < 'foo'", MinMaxSketch("A"), 1), + Param(dataS, "A in ('foo1', 'foo5', 'foo9')", BloomFilterSketch("A", 0.01, 10), 3), + Param( + dataS, + "A in ('foo1','goo1','hoo1','i1','j','k','l','m','n','o','p')", + BloomFilterSketch("A", 0.01, 10), + 1), + Param(dataI, "A = 10", BloomFilterSketch("A", 0.01, 10), 1), + Param(dataI, "A <=> 20", BloomFilterSketch("A", 0.01, 10), 1), + Param(dataI, "A <=> null", BloomFilterSketch("A", 0.01, 10), 10), + Param(dataI, "A in (2, 3, 5, 7, 11, 13, 17, 19)", BloomFilterSketch("A", 0.001, 10), 2), + Param( + dataI, + "A in (0,1,2,3,4,5,6,7,8,9,10,11,12,13,14,15,16,17,18,19,20)", + 
BloomFilterSketch("A", 0.001, 10), + 3), + Param( + dataIN, + "A in (0,1,10,100,1000,10000,100000,1000000,-1,-2,-3,-4,-5,-6,-7,-8,null)", + BloomFilterSketch("A", 0.001, 10), + 1), + Param(dataI, "A != 10", BloomFilterSketch("A", 0.001, 10), 10), Param(dataI, "a = 10", MinMaxSketch("A"), 1), Param(dataI, "A = 10", MinMaxSketch("a"), 1), Param(dataI, "A in (1, 2, 3, null, 10)", MinMaxSketch("A"), 2), @@ -265,6 +286,11 @@ class ApplyDataSkippingIndexTest extends DataSkippingSuite { Param(dataII, "A < 30 and B > 20", MinMaxSketch("A"), 3), Param(dataII, "A < 30 and b > 40", Seq(MinMaxSketch("a"), MinMaxSketch("B")), 1), Param(dataII, "A = 10 and B = 90", Seq(MinMaxSketch("A"), MinMaxSketch("B")), 0), + Param( + dataII, + "A < 31 and B in (1, 2, 11, 12, 21, 22)", + Seq(MinMaxSketch("A"), BloomFilterSketch("B", 0.001, 10)), + 2), Param(dataIN, "A is not null", MinMaxSketch("A"), 7), Param(dataIN, "!(A <=> null)", MinMaxSketch("A"), 7), Param(dataIN, "A = 2", MinMaxSketch("A"), 1), diff --git a/src/test/scala/com/microsoft/hyperspace/index/dataskipping/sketches/BloomFilterSketchTest.scala b/src/test/scala/com/microsoft/hyperspace/index/dataskipping/sketches/BloomFilterSketchTest.scala new file mode 100644 index 000000000..9a4f74681 --- /dev/null +++ b/src/test/scala/com/microsoft/hyperspace/index/dataskipping/sketches/BloomFilterSketchTest.scala @@ -0,0 +1,162 @@ +/* + * Copyright (2021) The Hyperspace Project Authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package com.microsoft.hyperspace.index.dataskipping.sketches
+
+import org.apache.spark.sql.{Column, QueryTest}
+import org.apache.spark.sql.catalyst.analysis.UnresolvedAttribute
+import org.apache.spark.sql.catalyst.expressions._
+import org.apache.spark.sql.types._
+import org.apache.spark.unsafe.types.UTF8String
+import org.apache.spark.util.sketch.BloomFilter
+
+import com.microsoft.hyperspace.index.HyperspaceSuite
+import com.microsoft.hyperspace.index.dataskipping.BloomFilterTestUtils
+import com.microsoft.hyperspace.index.dataskipping.expressions._
+
+/**
+ * Unit tests for [[BloomFilterSketch]].
+ *
+ * Covers the column accessors, the aggregate function, equality and hashCode, and the
+ * conversion of source predicates into predicates over the sketch value.
+ */
+class BloomFilterSketchTest extends QueryTest with HyperspaceSuite with BloomFilterTestUtils {
+  import spark.implicits._
+
+  // Extractor passed to convertPredicate by every conversion test; built from an empty map.
+  val valueExtractor = AttrValueExtractor(Map.empty)
+
+  // Expression standing in for the sketch value when predicates are converted.
+  private val sketchValues = Seq(UnresolvedAttribute("bf"))
+
+  /**
+   * Calls `convertPredicate` on a sketch for column "A".
+   *
+   * The resolved expression list holds a single attribute "A" of type `dataType`, and the
+   * name map registers ExprId(0), the id used by the test predicates, as "A".
+   *
+   * @param predicate source predicate to convert
+   * @param dataType data type of the resolved attribute "A"
+   * @return the result of `convertPredicate` for these arguments
+   */
+  private def convert(predicate: Expression, dataType: DataType) = {
+    val resolvedExprs = Seq(AttributeReference("A", dataType)(ExpressionUtils.nullExprId))
+    BloomFilterSketch("A", 0.01, 100).convertPredicate(
+      predicate,
+      resolvedExprs,
+      sketchValues,
+      Map(ExprId(0) -> "A"),
+      valueExtractor)
+  }
+
+  test("indexedColumns returns the indexed column.") {
+    val sketch = BloomFilterSketch("A", 0.01, 100)
+    assert(sketch.indexedColumns === Seq("A"))
+  }
+
+  test("referencedColumns returns the indexed column.") {
+    val sketch = BloomFilterSketch("A", 0.01, 100)
+    assert(sketch.referencedColumns === Seq("A"))
+  }
+
+  test(
+    "aggregateFunctions returns an aggregation function that collects values in a bloom filter.") {
+    val sketch = BloomFilterSketch("A", 0.01, 100)
+    val aggrs = sketch.aggregateFunctions.map(f => new Column(f.toAggregateExpression))
+    assert(aggrs.length === 1)
+    val data = Seq(1, -1, 10, 2, 4, 2, 0, 10)
+    // Reference filter, filled directly with the same values the aggregate will see.
+    val bf = BloomFilter.create(100, 0.01)
+    data.foreach(bf.put)
+    // The aggregation yields a single row whose only column is the sketch value.
+    val bfData = data.toDF("A").select(aggrs.head).collect()(0).getAs[Any](0)
+    assert(bfData === encodeExternal(bf))
+  }
+
+  test("toString returns a reasonable string.") {
+    val sketch = BloomFilterSketch("A", 0.01, 100)
+    assert(sketch.toString === "BloomFilter(A, 0.01, 100)")
+  }
+
+  test("Two sketches are equal if their columns are equal.") {
+    // Only the column name takes part in equality, and it is compared case-sensitively.
+    assert(BloomFilterSketch("A", 0.01, 100) === BloomFilterSketch("A", 0.001, 1000))
+    assert(BloomFilterSketch("A", 0.01, 100) !== BloomFilterSketch("a", 0.01, 100))
+    assert(BloomFilterSketch("b", 0.01, 100) !== BloomFilterSketch("B", 0.01, 100))
+    assert(BloomFilterSketch("B", 0.01, 100) === BloomFilterSketch("B", 0.001, 1000))
+  }
+
+  test("hashCode is reasonably implemented.") {
+    // Sketches that compare equal must share a hash code.
+    assert(
+      BloomFilterSketch("A", 0.01, 100).hashCode === BloomFilterSketch("A", 0.001, 1000).hashCode)
+    // NOTE(review): unequal objects may legally share a hash code, so this assertion pins
+    // the current hashCode implementation rather than a general contract.
+    assert(
+      BloomFilterSketch("A", 0.01, 100).hashCode !== BloomFilterSketch("a", 0.001, 1000).hashCode)
+  }
+
+  test("convertPredicate converts EqualTo.") {
+    val predicate = EqualTo(AttributeReference("A", IntegerType)(ExprId(0)), Literal(42))
+    val result = convert(predicate, IntegerType)
+    // Equality becomes a might-contain check of the literal against the sketch value.
+    val expected = Some(BloomFilterMightContain(sketchValues(0), Literal(42)))
+    assert(result === expected)
+  }
+
+  test("convertPredicate converts EqualTo - string type.") {
+    val predicate =
+      EqualTo(AttributeReference("A", StringType)(ExprId(0)), Literal.create("hello", StringType))
+    val result = convert(predicate, StringType)
+    val expected =
+      Some(BloomFilterMightContain(sketchValues(0), Literal.create("hello", StringType)))
+    assert(result === expected)
+  }
+
+  test("convertPredicate converts In.") {
+    val predicate =
+      In(AttributeReference("A", IntegerType)(ExprId(0)), Seq(Literal(42), Literal(23)))
+    val result = convert(predicate, IntegerType)
+    // One might-contain check per value of the IN list, combined with Or.
+    val expected = Some(
+      Or(
+        BloomFilterMightContain(sketchValues(0), Literal(42)),
+        BloomFilterMightContain(sketchValues(0), Literal(23))))
+    assert(result === expected)
+  }
+
+  test("convertPredicate converts In - string type.") {
+    val predicate =
+      In(
+        AttributeReference("A", StringType)(ExprId(0)),
+        Seq(Literal.create("hello", StringType), Literal.create("world", StringType)))
+    val result = convert(predicate, StringType)
+    // Same shape as the integer case, with string literals.
+    val expected = Some(
+      Or(
+        BloomFilterMightContain(sketchValues(0), Literal.create("hello", StringType)),
+        BloomFilterMightContain(sketchValues(0), Literal.create("world", StringType))))
+    assert(result === expected)
+  }
+
+  test("convertPredicate does not convert Not(EqualTo(<attr>, <lit>)).") {
+    val predicate = Not(EqualTo(AttributeReference("A", IntegerType)(ExprId(0)), Literal(42)))
+    val result = convert(predicate, IntegerType)
+    // A negated equality gets no index predicate, so the conversion yields None.
+    val expected = None
+    assert(result === expected)
+  }
+}
diff --git a/src/test/scala/com/microsoft/hyperspace/index/dataskipping/sketches/MinMaxSketchTest.scala b/src/test/scala/com/microsoft/hyperspace/index/dataskipping/sketches/MinMaxSketchTest.scala
index 7962a9990..e52b3e0e9 100644
--- a/src/test/scala/com/microsoft/hyperspace/index/dataskipping/sketches/MinMaxSketchTest.scala
+++ 
b/src/test/scala/com/microsoft/hyperspace/index/dataskipping/sketches/MinMaxSketchTest.scala @@ -42,7 +42,7 @@ class MinMaxSketchTest extends QueryTest with HyperspaceSuite { test("aggregateFunctions returns min and max aggregation functions.") { val sketch = MinMaxSketch("A") - val aggrs = sketch.aggregateFunctions.map(new Column(_)) + val aggrs = sketch.aggregateFunctions.map(f => new Column(f.toAggregateExpression)) val data = Seq(1, -1, 10, 2, 4).toDF("A") checkAnswer(data.select(aggrs: _*), Seq((-1, 10)).toDF) } diff --git a/src/test/scala/com/microsoft/hyperspace/index/dataskipping/sketches/PartitionSketchTest.scala b/src/test/scala/com/microsoft/hyperspace/index/dataskipping/sketches/PartitionSketchTest.scala index ecc3e9b7d..3671699c4 100644 --- a/src/test/scala/com/microsoft/hyperspace/index/dataskipping/sketches/PartitionSketchTest.scala +++ b/src/test/scala/com/microsoft/hyperspace/index/dataskipping/sketches/PartitionSketchTest.scala @@ -40,7 +40,7 @@ class PartitionSketchTest extends QueryTest with HyperspaceSuite { test("aggregateFunctions returns first aggregation function.") { val sketch = PartitionSketch(Seq(("A", None))) - val aggrs = sketch.aggregateFunctions.map(new Column(_)) + val aggrs = sketch.aggregateFunctions.map(f => new Column(f.toAggregateExpression)) val data = Seq(1, 1, 1, 1, 1).toDF("A") checkAnswer(data.select(aggrs: _*), Seq(1).toDF) }