Skip to content
This repository has been archived by the owner on Jun 14, 2024. It is now read-only.

Commit

Permalink
Data Skipping Index Part 4: BloomFilterSketch
Browse files Browse the repository at this point in the history
Implement BloomFilterSketch.
  • Loading branch information
Chungmin Lee committed Sep 1, 2021
1 parent 9735b57 commit 0eec377
Show file tree
Hide file tree
Showing 25 changed files with 1,363 additions and 10 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ import scala.collection.mutable
import org.apache.spark.sql.{Column, DataFrame, SaveMode, SparkSession}
import org.apache.spark.sql.catalyst.analysis.UnresolvedAttribute
import org.apache.spark.sql.catalyst.expressions._
import org.apache.spark.sql.catalyst.expressions.Literal.TrueLiteral
import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
import org.apache.spark.sql.functions.input_file_name
import org.apache.spark.sql.hyperspace.utils.StructTypeUtils
Expand Down Expand Up @@ -151,7 +152,7 @@ case class DataSkippingIndex(
// by combining individual index predicates with And.
// True is returned if there are no index predicates for the source predicate node.
def toIndexPred(sourcePred: Expression): Expression = {
predMap.get(sourcePred).map(_.reduceLeft(And)).getOrElse(Literal.TrueLiteral)
predMap.get(sourcePred).map(_.reduceLeft(And)).getOrElse(TrueLiteral)
}

// Compose an index predicate visiting the source predicate tree recursively.
Expand All @@ -168,15 +169,15 @@ case class DataSkippingIndex(
// This is a trimmed down version of the BooleanSimplification rule.
// It's just enough to determine whether the index is applicable or not.
val optimizePredicate: PartialFunction[Expression, Expression] = {
case And(Literal.TrueLiteral, right) => right
case And(left, Literal.TrueLiteral) => left
case Or(Literal.TrueLiteral, _) => Literal.TrueLiteral
case Or(_, Literal.TrueLiteral) => Literal.TrueLiteral
case And(TrueLiteral, right) => right
case And(left, TrueLiteral) => left
case Or(TrueLiteral, _) => TrueLiteral
case Or(_, TrueLiteral) => TrueLiteral
}
val optimizedIndexPredicate = indexPredicate.transformUp(optimizePredicate)

// Return None if the index predicate is True - meaning no conversion can be done.
if (optimizedIndexPredicate == Literal.TrueLiteral) {
if (optimizedIndexPredicate == TrueLiteral) {
None
} else {
Some(optimizedIndexPredicate)
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,83 @@
/*
* Copyright (2021) The Hyperspace Project Authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package com.microsoft.hyperspace.index.dataskipping.expressions

import java.io.{ByteArrayInputStream, ByteArrayOutputStream}

import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.sql.catalyst.expressions.Expression
import org.apache.spark.sql.catalyst.expressions.aggregate.TypedImperativeAggregate
import org.apache.spark.sql.types.DataType
import org.apache.spark.util.sketch.BloomFilter

/**
 * Aggregate function that gathers the non-null values of `child` into one bloom filter.
 *
 * The aggregation buffer is a [[BloomFilter]] created from `expectedNumItems` and `fpp`.
 * The final result is that filter encoded by `BloomFilterEncoderProvider.defaultEncoder`,
 * whose `dataType` is also the data type of this expression.
 *
 * @param child expression evaluated against every input row to get the value to insert
 * @param expectedNumItems first argument of `BloomFilter.create`
 * @param fpp second argument of `BloomFilter.create`
 * @param mutableAggBufferOffset offset replaced by `withNewMutableAggBufferOffset`
 * @param inputAggBufferOffset offset replaced by `withNewInputAggBufferOffset`
 */
private[dataskipping] case class BloomFilterAgg(
    child: Expression,
    expectedNumItems: Long,
    fpp: Double,
    override val mutableAggBufferOffset: Int = 0,
    override val inputAggBufferOffset: Int = 0)
  extends TypedImperativeAggregate[BloomFilter] {

  // Looked up on each access instead of being stored in a field of the expression.
  private def filterEncoder = BloomFilterEncoderProvider.defaultEncoder

  override def prettyName: String = "bloom_filter"

  override def dataType: DataType = filterEncoder.dataType

  override def nullable: Boolean = false

  override def children: Seq[Expression] = Seq(child)

  /** Starts every aggregation with a fresh filter built from the configured parameters. */
  override def createAggregationBuffer(): BloomFilter =
    BloomFilter.create(expectedNumItems, fpp)

  /** Inserts the value of `child` for this row into `buffer`; null values are skipped. */
  override def update(buffer: BloomFilter, input: InternalRow): BloomFilter = {
    child.eval(input) match {
      case null => ()
      case item => BloomFilterUtils.put(buffer, item, child.dataType)
    }
    buffer
  }

  /** Merges `input` into `buffer` in place and hands back the same `buffer` instance. */
  override def merge(buffer: BloomFilter, input: BloomFilter): BloomFilter = {
    buffer.mergeInPlace(input)
    buffer
  }

  /** Produces the final value by encoding the filter with the default encoder. */
  override def eval(buffer: BloomFilter): Any = filterEncoder.encode(buffer)

  /** Writes the filter to a byte array using `BloomFilter.writeTo`. */
  override def serialize(buffer: BloomFilter): Array[Byte] = {
    val sink = new ByteArrayOutputStream()
    buffer.writeTo(sink)
    sink.toByteArray
  }

  /** Reads back a filter from bytes using `BloomFilter.readFrom`. */
  override def deserialize(bytes: Array[Byte]): BloomFilter =
    BloomFilter.readFrom(new ByteArrayInputStream(bytes))

  override def withNewMutableAggBufferOffset(newOffset: Int): BloomFilterAgg =
    copy(mutableAggBufferOffset = newOffset)

  override def withNewInputAggBufferOffset(newOffset: Int): BloomFilterAgg =
    copy(inputAggBufferOffset = newOffset)
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,42 @@
/*
* Copyright (2021) The Hyperspace Project Authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package com.microsoft.hyperspace.index.dataskipping.expressions

import org.apache.spark.sql.types.DataType
import org.apache.spark.util.sketch.BloomFilter

/**
* Defines how [[BloomFilter]] should be represented in the Spark DataFrame.
*
* An implementation fixes the Spark data type of the encoded value ([[dataType]]) and
* converts between a [[BloomFilter]] and that value ([[encode]] and [[decode]]).
*/
trait BloomFilterEncoder {

/**
* Returns the data type of the value in the DataFrame representing [[BloomFilter]].
*/
def dataType: DataType

/**
* Returns a value representing the given [[BloomFilter]]
* that can be put in the [[org.apache.spark.sql.catalyst.InternalRow]].
*
* @param bf the bloom filter to encode
* @return the encoded value; its Spark type is expected to be [[dataType]]
*/
def encode(bf: BloomFilter): Any

/**
* Returns a [[BloomFilter]] from the value in the DataFrame.
*
* @param value a value representing a bloom filter, presumably one produced by [[encode]]
* @return the bloom filter represented by `value`
*/
def decode(value: Any): BloomFilter
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,30 @@
/*
* Copyright (2021) The Hyperspace Project Authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package com.microsoft.hyperspace.index.dataskipping.expressions

/**
* Provides the default implementation of [[BloomFilterEncoder]].
*/
object BloomFilterEncoderProvider {

/**
* Returns the default encoder.
*
* It should return a singleton object declared as "object".
* The code generation of BloomFilterMightContain and BloomFilterMightContainAny emits
* a call to `decode` qualified by the encoder's canonical class name with the trailing
* "$" removed, so the encoder has to be callable through that name from generated Java.
*
* FastBloomFilterEncoder is defined outside this file.
*/
def defaultEncoder: BloomFilterEncoder = FastBloomFilterEncoder
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,70 @@
/*
* Copyright (2021) The Hyperspace Project Authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package com.microsoft.hyperspace.index.dataskipping.expressions

import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.sql.catalyst.expressions.{BinaryExpression, Expression, Predicate}
import org.apache.spark.sql.catalyst.expressions.codegen.{CodegenContext, ExprCode}
import org.apache.spark.sql.catalyst.expressions.codegen.Block._

/**
 * Returns true if the bloom filter (left) might contain the value (right).
 *
 * If the value (right) is null, null is returned.
 *
 * The bloom filter must not be null.
 *
 * @param left expression producing the encoded bloom filter, decoded with
 *             `BloomFilterEncoderProvider.defaultEncoder`
 * @param right expression producing the value to look up
 */
private[dataskipping] case class BloomFilterMightContain(left: Expression, right: Expression)
  extends BinaryExpression
  with Predicate {

  override def prettyName: String = "bloom_filter_might_contain"

  // A null value on the right side produces a null result.
  override def nullable: Boolean = true

  /**
   * Interpreted evaluation.
   *
   * The value is evaluated first. The bloom filter is evaluated and decoded only when
   * the value is non-null, mirroring what the generated code does.
   */
  override def eval(input: InternalRow): Any = {
    val value = right.eval(input)
    if (value == null) {
      null
    } else {
      val bfData = left.eval(input)
      val bf = BloomFilterEncoderProvider.defaultEncoder.decode(bfData)
      BloomFilterUtils.mightContain(bf, value, right.dataType)
    }
  }

  /**
   * Generates code equivalent to [[eval]].
   *
   * The result starts out as null/false. The code of `left` and the membership test
   * are emitted inside the branch taken only when the value of `right` is not null.
   */
  override def doGenCode(ctx: CodegenContext, ev: ExprCode): ExprCode = {
    val leftGen = left.genCode(ctx)
    val rightGen = right.genCode(ctx)
    // The generated Java refers to the encoder by its class name without the trailing "$",
    // which is why the default encoder has to be a Scala "object".
    val bloomFilterEncoder =
      BloomFilterEncoderProvider.defaultEncoder.getClass.getCanonicalName.stripSuffix("$")
    val bf = s"$bloomFilterEncoder.decode(${leftGen.value})"
    val result = BloomFilterUtils.mightContainCodegen(bf, rightGen.value, right.dataType)
    val resultCode =
      s"""
         |if (!(${rightGen.isNull})) {
         | ${leftGen.code}
         | ${ev.isNull} = false;
         | ${ev.value} = $result;
         |}
       """.stripMargin
    ev.copy(code = code"""
      ${rightGen.code}
      boolean ${ev.isNull} = true;
      boolean ${ev.value} = false;
      $resultCode""")
  }
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,98 @@
/*
* Copyright (2021) The Hyperspace Project Authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package com.microsoft.hyperspace.index.dataskipping.expressions

import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.sql.catalyst.expressions.{Expression, Predicate, UnaryExpression}
import org.apache.spark.sql.catalyst.expressions.codegen.{CodegenContext, CodeGenerator, ExprCode, FalseLiteral}
import org.apache.spark.sql.catalyst.expressions.codegen.Block._
import org.apache.spark.sql.types.DataType
import org.apache.spark.util.sketch.BloomFilter

/**
 * Predicate that is true when the bloom filter produced by `child` might contain
 * at least one of the given values.
 *
 * Requirements on the inputs:
 *  - the bloom filter must not be null;
 *  - `values` must be an array without nulls;
 *  - if the element type can be represented as a primitive type in Scala,
 *    `values` must be an array of that primitive type.
 *
 * @param child expression producing the encoded bloom filter
 * @param values array of values to look up
 * @param elementType Spark data type of the elements of `values`
 */
private[dataskipping] case class BloomFilterMightContainAny(
    child: Expression,
    values: Any,
    elementType: DataType)
  extends UnaryExpression
  with Predicate {

  // `values` is declared as Any; every use views it as an array through this cast.
  private def asArray(v: Any): Array[_] = v.asInstanceOf[Array[_]]

  override def prettyName: String = "bloom_filter_might_contain_any"

  override def nullable: Boolean = false

  /** Decodes the filter for this row and checks the values one by one until one matches. */
  override def eval(input: InternalRow): Boolean = {
    val filter = BloomFilterEncoderProvider.defaultEncoder.decode(child.eval(input))
    asArray(values).exists(v => BloomFilterUtils.mightContain(filter, v, elementType))
  }

  /**
   * Generates a loop over the values array (passed to the generated code as a reference
   * object) that sets the result to true and stops at the first possible match.
   */
  override def doGenCode(ctx: CodegenContext, ev: ExprCode): ExprCode = {
    val childCode = child.genCode(ctx)
    // Class name of the encoder object without the trailing "$", as used from generated Java.
    val encoderClass =
      BloomFilterEncoderProvider.defaultEncoder.getClass.getCanonicalName.stripSuffix("$")
    val filterVar = ctx.freshName("bf")
    val filterClass = classOf[BloomFilter].getCanonicalName
    val elemJavaType = CodeGenerator.javaType(elementType)
    // Arrays that are instances of Array[Any] are declared as Object[] in the generated
    // code; all other arrays are declared as arrays of the element's Java type.
    val arrayJavaType =
      if (values.isInstanceOf[Array[Any]]) "java.lang.Object[]" else s"$elemJavaType[]"
    val valuesRef = ctx.addReferenceObj("values", values, arrayJavaType)
    val arrayVar = ctx.freshName("values")
    val idx = ctx.freshName("i")
    val probe = BloomFilterUtils.mightContainCodegen(
      filterVar,
      s"($elemJavaType) $arrayVar[$idx]",
      elementType)
    val loopCode =
      s"""
         |$filterClass $filterVar = $encoderClass.decode(${childCode.value});
         |$arrayJavaType $arrayVar = $valuesRef;
         |for (int $idx = 0; $idx < $arrayVar.length; $idx++) {
         | if ($probe) {
         | ${ev.value} = true;
         | break;
         | }
         |}
       """.stripMargin
    ev.copy(
      code = code"""
        ${childCode.code}
        boolean ${ev.value} = false;
        $loopCode""",
      isNull = FalseLiteral)
  }

  // Arrays compare by reference, so equality is defined on the array contents instead.
  override def equals(that: Any): Boolean = that match {
    case BloomFilterMightContainAny(otherChild, otherValues, otherElementType) =>
      child == otherChild &&
        asArray(values).sameElements(asArray(otherValues)) &&
        elementType == otherElementType
    case _ => false
  }

  // Hashes the array contents (as a Seq) to stay consistent with `equals`.
  override def hashCode: Int =
    (child, asArray(values).toSeq, elementType).hashCode
}
Loading

0 comments on commit 0eec377

Please sign in to comment.