diff --git a/src/main/scala/com/microsoft/hyperspace/index/zordercovering/ZOrderField.scala b/src/main/scala/com/microsoft/hyperspace/index/zordercovering/ZOrderField.scala new file mode 100644 index 000000000..fb78697df --- /dev/null +++ b/src/main/scala/com/microsoft/hyperspace/index/zordercovering/ZOrderField.scala @@ -0,0 +1,569 @@ +/* + * Copyright (2021) The Hyperspace Project Authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.microsoft.hyperspace.index.zordercovering + +import java.util.BitSet + +import org.apache.spark.sql.catalyst.util +import org.apache.spark.sql.types._ + +import com.microsoft.hyperspace.HyperspaceException + +/** + * Base class of the per-column encoders used to build a z-address. + * + * `name` is the column name, `bitLen` is the number of bits the column contributes and + * `getBitSet` converts a single column value into those bits. + */ +abstract class ZOrderField extends Serializable { + val name: String + val bitLen: Int + def getBitSet(valueAny: Any): BitSet + /** Always throws a HyperspaceException that reports the offending value and the range. */ + def throwOutOfRangeException(minVal: Any, maxVal: Any, value: Any): Unit = { + throw HyperspaceException( + "Unexpected operation on ZOrderField. Value out of range: " + + s"name=$name, value=$value, min=$minVal, max=$maxVal") + } + + /** Shared empty result, returned for values that contribute no bits. */ + protected val emptyBitSet = new BitSet + /** True when no bits are needed: the field has zero width or the value is null. */ + protected def checkEmptySetEligible(valueAny: Any): Boolean = { + bitLen == 0 || valueAny == null + } +} + +/** + * ZOrderField for numeric types based on percentile values of the original column values.
+ * + * If the data is skewed, using the original values for the z-address means this column cannot + * significantly affect the final z-order, so its values might not be distributed as well as those + * of other, evenly distributed z-order columns. To mitigate the issue, we use a percentile-based + * bucket index for the z-address. + * + * We assign a bucket index (0 to numDistBuckets - 1) to each value based on its percentile + * and use the index value for the z-address. Unlike bucket sorting, all values in a bucket + * simply share the same bucket index. + * + * Steps to calculate the bucket index: + * 1) define range subsets based on the given approximate percentiles. + * 2) get the index of the range subset that the value belongs to. + * 3) get the ratio of where the value is located within that range. + * 4) return the bucket id using the subset index and the ratio. + * + * The following example helps to understand why approximate percentiles are used. + * Assume we have double values of + * [-10, -9.9, -9.8, -9.7, -1, 0, 1000, 1500, 2000, 3000, 10000, 20000] + * - Min: -10 + * - 25% percentile: -9.8 + * - 50% percentile: 0 + * - 75% percentile: 2000 + * - Max: 20000 + * + * With this info, the value 10000 belongs to the 4th group, [2000, 20000], so we interpolate it as + * (10000 - 2000) / (20000 - 2000) * 0.25 + 0.75 = 0.44 * 0.25 + 0.75 = 0.86 and derive + * the corresponding index as 0.86 * numDistBuckets. If we used only min/max, we would end up + * with (10000 - (-10)) / (20000 - (-10)) * 1.0 = 0.5, which may cause a worse result ordering of the + * column values. + * + * To see how the index distribution affects the result ordering, consider + * ColA => [1, 2, 3, 4, 5, 6, 7, 8, 9, 10] + * ColB => [1, 2, 3, 4, 5, 1025, 1026, 1027, 1028, 1029] + * When z-ordering by ColA and ColB, the values 1025 to 1029 of ColB cannot affect the order, + * since their upper bits (0b10000000XXX) are the same while ColA values have all different + * bits.
As Z-ordering defines the order by interleaving bit sequence of z-addresses, + * ColA would have better result ordering than ColB. + */ +trait PercentileBasedZOrderField[T] extends ZOrderField { + // Use Seq[Double] as Spark's approxQuantile function returns the result in double type + // regardless of the original numeric type. + val rawPercentile: Seq[Double] // [minVal, approx_percentile1, approx_percentile2, ..., maxVal] + def toDouble(v: T): Double + + // Map a raw double to a finite, halved value that is safe for range arithmetic: + // +/-Infinity and NaN are replaced by finite extremes and -0.0 by 0.0. + // The function is not idempotent (every call halves the value), so apply it only once. + private def normalizeVal(d: Double): Double = { + val v = d match { + case Double.PositiveInfinity => + Double.MaxValue + case Double.NegativeInfinity => + Double.MinValue + // NaN is never equal to itself, so a constant pattern cannot match it; use a guard. + case nan if nan.isNaN => + Double.MaxValue + case -0.0 => + 0.0 + case _ => + d + } + // Use / 2.0 value since we need to calculate length of range between two doubles. + // If maxVal is Double.MaxValue and minVal is negative, the length will be inf, + // so we cannot calculate the percentile of a value properly. + v / 2.0 + } + + val numDistBuckets: Int + // Normalized percentile boundaries, computed once at construction. + protected val percentile = rawPercentile.map(normalizeVal) + // lengthOfPercentile(i) = percentile(i + 1) - percentile(i), i.e. the width of the i-th range. + private val lengthOfPercentile = + percentile.zipWithIndex.tail.map(vi => vi._1 - percentile(vi._2 - 1)) + + // No bits are needed when min and max are equal; otherwise use as many bits as the + // largest bucket index (numDistBuckets - 1) requires. + override lazy val bitLen: Int = { + if (rawPercentile.head == rawPercentile.last) { + 0 + } else { + (numDistBuckets - 1).toBinaryString.length + } + } + + // Index of the first range whose upper bound is >= value (-1 if there is none). + private def getPercentileIdx(value: Double): Int = { + percentile.tail.indexWhere(p => value <= p) + } + + // Return the index of the bucket that the value belongs to.
+ private def getBucketId(value: Double): Long = { + val pIdx = getPercentileIdx(value) + + val diffInPercentile = value - percentile(pIdx) // value - minValInPercentile + // Relative position of the value inside its percentile range. + val ratioInPercentile = diffInPercentile / lengthOfPercentile(pIdx) + // Position over all ranges: every range covers an equal 1 / (percentile.length - 1) share. + val globalPercentile = (pIdx.toDouble + ratioInPercentile) / (percentile.length - 1) + // Cap at the last bucket so that the maximum value does not map to numDistBuckets. + (globalPercentile * numDistBuckets).toLong.min(numDistBuckets - 1) + } + + override def getBitSet(valueAny: Any): BitSet = { + if (checkEmptySetEligible(valueAny)) { + emptyBitSet + } else { + val value = valueAny.asInstanceOf[T] + // Since Long.toDouble might lose some precision, we don't check the value range here. + // Instead take min/max to calculate the bucket id properly. + val doubleValue = normalizeVal(toDouble(value)).max(percentile.head).min(percentile.last) + BitSet.valueOf(Array(getBucketId(doubleValue))) + } + } +} + +/** + * Min/max based z-address calculator for Integer type. + * + * Get z-address bits using (value - minValue), after casting to Long. + * In this way, we could achieve + * 1) a smaller value needs fewer bits to represent. + * 2) (value - min) can't be a negative value, which would require special handling. + * 3) meaningless upper bits are removed, which can make a better result ordering. + * + * For 3), the following example would help to understand: + * ColA => [1, 2, 3, 4, 5, 6, 7] + * ColB => [1024, 1026, 1027, 1028, 1029, 1030, 1031] + * + * If we use the original integer value for z-address, 1024 to 1031 values of ColB cannot + * affect the order, since their upper bits (0b1000000XXXX) are the same while ColA values + * have all different bits. As a result, ColA would have better result ordering than ColB + * because Z-ordering defines the order by interleaving bit sequence of z-addresses. + * Such a result does not satisfy the purpose of z-ordering, which is to cluster the data of both + * columns to some degree so that we can skip irrelevant data for both columns.
+ * + * Using (value - min) can mitigate the issue: + * ColA => [0, 1, 2, 3, 4, 5, 6] + * ColB => [0, 2, 3, 4, 5, 6, 7] + * + * Now neither column has the meaningless bits, so both can contribute to the order fairly. + */ +trait IntegerMinMaxBasedZOrderField[T] extends ZOrderField { + val minVal: T + val maxVal: T + implicit val n: Numeric[T] + + /** + * Number of bits needed to represent (maxVal - minVal); 0 when min and max are equal. + * The subtraction is done on Long and toBinaryString renders the result as unsigned, + * so a difference that wraps around (e.g. Long.MinValue to Long.MaxValue) still yields + * its unsigned width. + */ + override lazy val bitLen: Int = { + if (minVal == maxVal) { + 0 + } else { + (n.toLong(maxVal) - n.toLong(minVal)).toBinaryString.length + } + } + private lazy val minLong = n.toLong(minVal) + + /** + * Encode (value - minVal) as a BitSet. Returns the empty set for null values or a zero-width + * field, and throws via throwOutOfRangeException when the value is outside [minVal, maxVal]. + */ + override def getBitSet(valueAny: Any): BitSet = { + if (checkEmptySetEligible(valueAny)) { + emptyBitSet + } else { + val value = valueAny.asInstanceOf[T] + if (n.lt(value, minVal) || n.gt(value, maxVal)) { + throwOutOfRangeException(minVal, maxVal, value) + } + BitSet.valueOf(Array(n.toLong(value) - minLong)) + } + } +} + +/** + * For Integer types (Long, Int, Short, Byte), use (value - minValue) for z-address. + * See [[IntegerMinMaxBasedZOrderField]] for detail. + */ +case class LongMinMaxZOrderField(override val name: String, minVal: Long, maxVal: Long)(implicit + val n: Numeric[Long]) + extends IntegerMinMaxBasedZOrderField[Long] {} + +case class IntMinMaxZOrderField(override val name: String, minVal: Int, maxVal: Int)(implicit + val n: Numeric[Int]) + extends IntegerMinMaxBasedZOrderField[Int] {} + +case class ShortMinMaxZOrderField(override val name: String, minVal: Short, maxVal: Short)( + implicit val n: Numeric[Short]) + extends IntegerMinMaxBasedZOrderField[Short] {} + +case class ByteMinMaxZOrderField(override val name: String, minVal: Byte, maxVal: Byte)(implicit + val n: Numeric[Byte]) + extends IntegerMinMaxBasedZOrderField[Byte] {} + +/** + * For Numeric types (Long, Int, Short, Byte, Double, Float), if optimize.zorder.quantile.enabled + * is true, divide the values into buckets based on approximate percentiles. + * Use the bucket idx for z-address for better distribution in case of skewed data.
+ */ +case class LongPercentileZOrderField(override val name: String, rawPercentile: Seq[Double]) + extends PercentileBasedZOrderField[Long] { + override def toDouble(n: Long): Double = n.doubleValue() + // Set the max of bucket number as (1 << 15) to reduce the length of z-address. + // We need to select a number that is larger than the total number of files or parquet row groups. + // It could be a larger value, but it doesn't make much difference on the quality of result. + private val defaultNumDistBuckets = 1 << 15 + override lazy val numDistBuckets = { + // min(32768, maxVal / 2 - minVal / 2 + 1); `percentile` holds the normalized (halved) values. + (percentile.last - percentile.head + 1).toLong.min(defaultNumDistBuckets).toInt + } +} + +case class IntPercentileZOrderField(override val name: String, rawPercentile: Seq[Double]) + extends PercentileBasedZOrderField[Int] { + override def toDouble(n: Int): Double = n.doubleValue() + private val defaultNumDistBuckets = 1 << 14 + override lazy val numDistBuckets = { + // min(16384, maxVal / 2 - minVal / 2 + 1); `percentile` holds the normalized (halved) values. + (percentile.last - percentile.head + 1).toLong.min(defaultNumDistBuckets).toInt + } +} + +case class ShortPercentileZOrderField(override val name: String, rawPercentile: Seq[Double]) + extends PercentileBasedZOrderField[Short] { + override def toDouble(n: Short): Double = n.doubleValue() + private val defaultNumDistBuckets = 1 << 10 + override lazy val numDistBuckets = { + // min(1024, maxVal / 2 - minVal / 2 + 1); `percentile` holds the normalized (halved) values. + (percentile.last - percentile.head + 1).toInt.min(defaultNumDistBuckets) + } +} + +case class BytePercentileZOrderField(override val name: String, rawPercentile: Seq[Double]) + extends PercentileBasedZOrderField[Byte] { + override def toDouble(n: Byte): Double = n.doubleValue() + private val defaultNumDistBuckets = 1 << 5 + override lazy val numDistBuckets = { + // min(32, (maxVal / 2).toByte - (minVal / 2).toByte + 1); `percentile` holds halved values. + (percentile.last.toByte - percentile.head.toByte + 1).min(defaultNumDistBuckets) + } +} + +case class DoublePercentileZOrderField(override val name: String, rawPercentile:
Seq[Double]) + extends PercentileBasedZOrderField[Double] { + private val defaultNumDistBuckets = 1 << 15 + override def toDouble(n: Double): Double = n.doubleValue() + // For Double, do not take (maxVal - minVal + 1) as it can be less than 1. + override lazy val numDistBuckets = defaultNumDistBuckets +} + +case class FloatPercentileZOrderField(override val name: String, rawPercentile: Seq[Double]) + extends PercentileBasedZOrderField[Float] { + override def toDouble(n: Float): Double = n.doubleValue() + private val defaultNumDistBuckets = 1 << 14 + // For Float, do not take (maxVal - minVal + 1) as it can be less than 1. + // Declared lazy like the other percentile fields so that the value never depends on + // the initialization order of the class and the trait. + override lazy val numDistBuckets = defaultNumDistBuckets +} + +/** + * For Boolean, use 1 bit for true or false. + */ +case class BooleanZOrderField(override val name: String, minVal: Boolean, maxVal: Boolean) + extends ZOrderField { + // A single bit is always reserved, regardless of minVal and maxVal. + override val bitLen: Int = { + 1 + } + + // Pre-built bit set with only bit 0 set; returned for `true`. + private val one = { + val b = new BitSet() + b.set(0) + b + } + + override def getBitSet(valueAny: Any): BitSet = { + if (checkEmptySetEligible(valueAny)) { + emptyBitSet + } else { + val value = valueAny.asInstanceOf[Boolean] + if (value) { + one + } else { + emptyBitSet + } + } + } +} + +/** + * For DecimalType, only use the unscaled value of (value - minValue) as + * assume that scale will be the same for all values.
+ */ +case class DecimalZOrderField( + override val name: String, + minVal: java.math.BigDecimal, + maxVal: java.math.BigDecimal) + extends ZOrderField { + + // Number of bits of the unscaled (maxVal - minVal); 0 when min and max are equal. + override val bitLen: Int = { + maxVal.subtract(minVal).unscaledValue.bitLength() + } + + override def getBitSet(valueAny: Any): BitSet = { + if (checkEmptySetEligible(valueAny)) { + emptyBitSet + } else { + val value = valueAny.asInstanceOf[java.math.BigDecimal] + // Compare the decimals themselves instead of their unscaled integers, and test only the + // sign of compareTo, so that the range check stays correct even if the scales differ. + if (value.compareTo(maxVal) > 0 || value.compareTo(minVal) < 0) { + throwOutOfRangeException(minVal, maxVal, value) + } + // toByteArray is big-endian while BitSet.valueOf expects little-endian bytes. + BitSet.valueOf(value.subtract(minVal).unscaledValue().toByteArray.reverse) + } + } +} + +/** + * For String, + * 1) get first diff character between minVal and maxVal. + * 2) from that char, use 4 char to construct z-address. + * 3) keep the first diff char of minVal, so that we can remove + * the same upper bit sequence efficiently. + */ +case class StringZOrderField(override val name: String, minVal: String, maxVal: String) + extends ZOrderField { + + // Find first diff index between minVal and maxVal. + val firstDiffIdx = minVal.zip(maxVal).takeWhile(Function.tupled(_ == _)).length + + // Compare 4 chars from firstDiffIdx. + val byteLen: Int = 4 + + // Byte of minVal at the first differing position, or 0 when minVal ends before it. + val minFirstDiff: Byte = if (minVal.length == firstDiffIdx) { + 0.toByte + } else { + minVal(firstDiffIdx).toByte + } + + override val bitLen: Int = { + // TODO Add a config for byteLen to construct z-address. If all other values + // except for maxVal begins with "asdf" and maxVal is "zoo", the result layout + // cannot be affected by the order of the string values.
+ + if (maxVal.equals(minVal)) { + 0 + } else { + (maxVal(firstDiffIdx).toByte - minFirstDiff).toBinaryString.length + ((byteLen - 1) * 8) + } + } + + override def getBitSet(valueAny: Any): BitSet = { + if (checkEmptySetEligible(valueAny)) { + emptyBitSet + } else { + val valueStr = valueAny.asInstanceOf[String] + + if (valueStr > maxVal || valueStr < minVal) { + throwOutOfRangeException(minVal, maxVal, valueStr) + } + val value = valueStr.substring(firstDiffIdx.min(valueStr.length)) + val len = math.min(byteLen, value.length) + // For the first diff char, use (the char - minFirstDiff) + // to remove same upper bit sequence. + val firstByte = if (value.nonEmpty) { + (value.head - minFirstDiff).toByte + } else { + 0.toByte + } + // Encode the remaining chars with a fixed charset (not the platform default) and keep + // exactly (byteLen - 1) bytes: multi-byte characters are truncated and short values + // are zero padded, so the result never carries more than byteLen bytes. + val tailBytes = value + .slice(1, len) + .getBytes(java.nio.charset.StandardCharsets.UTF_8) + .take(byteLen - 1) + .padTo(byteLen - 1, 0.toByte) + // BitSet.valueOf expects little-endian bytes, so the first char becomes the top byte. + BitSet.valueOf((firstByte +: tailBytes).reverse) + } + } +} + +/** + * For Timestamp type, handle it as Long type. + * See [[LongMinMaxZOrderField]]. + * + * TODO: Support PercentileBasedZOrderField for Timestamp and DateType. + * Spark cannot calculate quantiles for non-numeric type, need to cast to Long type + * before collecting stats.
+ */ +case class TimestampMinMaxZOrderField( + override val name: String, + minVal: java.sql.Timestamp, + maxVal: java.sql.Timestamp) + extends ZOrderField { + + val longField = + LongMinMaxZOrderField( + name, + util.DateTimeUtils.fromJavaTimestamp(minVal), + util.DateTimeUtils.fromJavaTimestamp(maxVal)) + + override val bitLen: Int = { + longField.bitLen + } + + override def getBitSet(valueAny: Any): BitSet = { + if (checkEmptySetEligible(valueAny)) { + emptyBitSet + } else { + val value = valueAny.asInstanceOf[java.sql.Timestamp] + if (value.compareTo(minVal) < 0 || value.compareTo(maxVal) > 0) { + throwOutOfRangeException(minVal, maxVal, value) + } + val valueTime = util.DateTimeUtils.fromJavaTimestamp(value) + longField.getBitSet(valueTime) + } + } +} + +/** + * For Date type, handle it as Int type after converting it to the number of days. + * See [[IntMinMaxZOrderField]]. + */ +case class DateMinMaxZOrderField( + override val name: String, + minVal: java.sql.Date, + maxVal: java.sql.Date) + extends ZOrderField { + + val intField = + IntMinMaxZOrderField( + name, + util.DateTimeUtils.fromJavaDate(minVal), + util.DateTimeUtils.fromJavaDate(maxVal)) + + override val bitLen: Int = { + intField.bitLen + } + + override def getBitSet(valueAny: Any): BitSet = { + if (checkEmptySetEligible(valueAny)) { + emptyBitSet + } else { + val value = valueAny.asInstanceOf[java.sql.Date] + if (value.compareTo(minVal) < 0 || value.compareTo(maxVal) > 0) { + throwOutOfRangeException(minVal, maxVal, value) + } + val valueDay = util.DateTimeUtils.fromJavaDate(value) + intField.getBitSet(valueDay) + } + } +} + +object ZOrderField { + /** + * Create the ZOrderField implementation that matches `dataType`. + * + * @param name Column name. + * @param dataType Spark data type of the column. + * @param minVal Minimum column value. + * @param maxVal Maximum column value. + * @param quantiles Values placed between minVal and maxVal to form the percentile list + * when quantileEnabled is true. + * @param quantileEnabled If true, Long/Int/Short/Byte columns use the percentile based fields + * instead of the min/max based ones; Double and Float columns always + * use the percentile based fields. + * @return ZOrderField for the column. + * @throws HyperspaceException if the data type is not supported. + */ + def build( + name: String, + dataType: DataType, + minVal: Any, + maxVal: Any, + quantiles: Seq[Any] = Nil, + quantileEnabled: Boolean = false): ZOrderField = { + dataType match { + case LongType => + if (quantileEnabled) { + LongPercentileZOrderField( + name, + (minVal +: quantiles :+ maxVal).asInstanceOf[Seq[Double]]) + } else { +
LongMinMaxZOrderField(name, minVal.asInstanceOf[Long], maxVal.asInstanceOf[Long]) + } + case IntegerType => + if (quantileEnabled) { + IntPercentileZOrderField( + name, + (minVal +: quantiles :+ maxVal).asInstanceOf[Seq[Double]]) + } else { + IntMinMaxZOrderField(name, minVal.asInstanceOf[Int], maxVal.asInstanceOf[Int]) + } + case ShortType => + if (quantileEnabled) { + ShortPercentileZOrderField( + name, + (minVal +: quantiles :+ maxVal).asInstanceOf[Seq[Double]]) + } else { + ShortMinMaxZOrderField(name, minVal.asInstanceOf[Short], maxVal.asInstanceOf[Short]) + } + case ByteType => + if (quantileEnabled) { + BytePercentileZOrderField( + name, + (minVal +: quantiles :+ maxVal).asInstanceOf[Seq[Double]]) + } else { + ByteMinMaxZOrderField(name, minVal.asInstanceOf[Byte], maxVal.asInstanceOf[Byte]) + } + case DoubleType => + if (quantileEnabled) { + DoublePercentileZOrderField( + name, + (minVal +: quantiles :+ maxVal).asInstanceOf[Seq[Double]]) + } else { + /* Without quantiles, Double still uses the percentile based field with only [min, max]. */ + DoublePercentileZOrderField( + name, + Seq(minVal.asInstanceOf[Double], maxVal.asInstanceOf[Double])) + } + case FloatType => + // minVal and maxVal may arrive as Float or as Double; both are converted to Double.
+ val (minValue, maxValue) = if (minVal.isInstanceOf[Float]) { + (minVal.asInstanceOf[Float].toDouble, maxVal.asInstanceOf[Float].toDouble) + } else { + (minVal.asInstanceOf[Double], maxVal.asInstanceOf[Double]) + } + if (quantileEnabled) { + FloatPercentileZOrderField( + name, + (minValue +: quantiles :+ maxValue).asInstanceOf[Seq[Double]]) + } else { + FloatPercentileZOrderField(name, Seq(minValue, maxValue)) + } + case DecimalType() => + DecimalZOrderField( + name, + minVal.asInstanceOf[java.math.BigDecimal], + maxVal.asInstanceOf[java.math.BigDecimal]) + case StringType => + StringZOrderField(name, minVal.asInstanceOf[String], maxVal.asInstanceOf[String]) + case BooleanType => + BooleanZOrderField(name, minVal.asInstanceOf[Boolean], maxVal.asInstanceOf[Boolean]) + case TimestampType => + TimestampMinMaxZOrderField( + name, + minVal.asInstanceOf[java.sql.Timestamp], + maxVal.asInstanceOf[java.sql.Timestamp]) + case DateType => + DateMinMaxZOrderField( + name, + minVal.asInstanceOf[java.sql.Date], + maxVal.asInstanceOf[java.sql.Date]) + case _ => + // maxVal can be null (e.g. when all column values are null); calling getClass on it + // would raise a NullPointerException instead of the intended HyperspaceException, + // so fall back to the declared data type in that case. + val typeDesc = if (maxVal == null) dataType.toString else maxVal.getClass.toString + throw HyperspaceException("Unsupported data type: " + typeDesc) + } + } + + /** + * Return true for numeric type. + */ + def percentileApplicableType(dataType: DataType): Boolean = { + Seq(DoubleType, FloatType, LongType, IntegerType, ShortType, ByteType) + .contains(dataType) + } +} diff --git a/src/main/scala/com/microsoft/hyperspace/index/zordercovering/ZOrderUDF.scala b/src/main/scala/com/microsoft/hyperspace/index/zordercovering/ZOrderUDF.scala new file mode 100644 index 000000000..da12d98e4 --- /dev/null +++ b/src/main/scala/com/microsoft/hyperspace/index/zordercovering/ZOrderUDF.scala @@ -0,0 +1,101 @@ +/* + * Copyright (2021) The Hyperspace Project Authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.microsoft.hyperspace.index.zordercovering + +import java.util + +import scala.collection.mutable.ArrayBuffer + +import org.apache.spark.sql.Row +import org.apache.spark.sql.functions.udf + +/** + * Define UDF for generating z-address for each row based on zOrderStruct. + * + * @param zOrderStruct List of ZOrderField which should be constructed with + * proper min/max values for each field. + */ +case class ZOrderUDF(zOrderStruct: Seq[ZOrderField]) { + // TODO Convert UDF to spark Expression for better performance with code generation. + // Though it can reduce optimize time, leave it as a backlog as + // 1) it requires a lot of code work and + // 2) RepartitionByRange & Sorting cost outweighs z-address calculation time. + + // Total number of z-address bits, i.e. the sum of the bit lengths of all fields. + val totalBitLen = zOrderStruct.map(_.bitLen).sum + + // The map of (column name -> bit indexes in z-address for each bit of column value) + // For example, (colB -> [8, 6, 4, 2]) stands for + // 1) use 4 bits of colB + // 2) each bit will be located at [8, 6, 4, 2]-th bit in z-address, from the highest + // bit of colB to the lowest. The array itself is stored lowest bit first, i.e. [2, 4, 6, 8]. + // + // As the bit length to represent z-order for each column can vary, + // use iterators for each length to implement interleaving assignment. + // NOTE(review): the maps are keyed by column name, so this assumes the names in + // zOrderStruct are unique - confirm against callers. + private val bitIdxMapsVal = { + var iterators = zOrderStruct.map(z => (z.name, (0 until z.bitLen).reverseIterator)).toMap + val idxMap = zOrderStruct.map(z => (z.name, ArrayBuffer[Int]())).toMap + // Assign bit index of z-address to each column value one by one, from highest bit. + // In this way, all z-order columns can contribute the order.
+ var idxInZAddress = totalBitLen - 1 + while (iterators.nonEmpty) { + iterators = iterators.filter(it => + if (it._2.hasNext) { + it._2.next + idxMap(it._1).append(idxInZAddress) + idxInZAddress = idxInZAddress - 1 + true + } else { + false + }) + } + // Explicitly calls map since mapValues uses iterator + // which makes spark tasks not serializable. + // The buffers were filled from the highest bit down, so reverse them to index by bit position. + idxMap.map(kv => (kv._1, kv._2.reverseIterator.toArray)) + } + + // Calculate Z-address for each row. + // For each z-order column, + // 1) get a bit set to represent the order of the column values + // 2) set the bits in the final zAddressBitSet using the bit index mapping + // + // Use Array[Long] type rather than BinaryType to optimize comparison cost. + val zAddressUdf = udf { (row: Row) => + val zAddressBitSet = new util.BitSet(totalBitLen + 1) + // Set totalBitLen location for byte array comparison: with the highest bit always set, + // toLongArray returns the same number of longs for every row. + zAddressBitSet.set(totalBitLen) + zOrderStruct.indices.foreach { idx => + // Get bitset using the column value. Null columns contribute no bits. + val z = zOrderStruct(idx) + if (!row.isNullAt(idx)) { + val zBitSet = z.getBitSet(row.getAs[Any](idx)) + // mapping from the bit index in zBitSet to the bit index in zAddress. + val idxMap = bitIdxMapsVal(z.name) + // Walk over the set bits of zBitSet only; nextSetBit returns -1 when none is left. + var fromIdx = 0 + while (fromIdx >= 0) { + val nextIdx = zBitSet.nextSetBit(fromIdx) + if (nextIdx >= 0) { + zAddressBitSet.set(idxMap(nextIdx)) + fromIdx = nextIdx + 1 + } else { + fromIdx = -1 + } + } + } + } + // toLongArray is least significant word first; reverse to put the highest word first. + zAddressBitSet.toLongArray.reverse + } +} + diff --git a/src/test/scala/com/microsoft/hyperspace/index/zordercovering/ZOrderFieldTest.scala b/src/test/scala/com/microsoft/hyperspace/index/zordercovering/ZOrderFieldTest.scala new file mode 100644 index 000000000..5ad9c27d8 --- /dev/null +++ b/src/test/scala/com/microsoft/hyperspace/index/zordercovering/ZOrderFieldTest.scala @@ -0,0 +1,1128 @@ +/* + * Copyright (2021) The Hyperspace Project Authors.
+ * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.microsoft.hyperspace.index.zordercovering + +import java.util + +import org.apache.spark.SparkFunSuite +import org.apache.spark.sql.types._ + +import com.microsoft.hyperspace.HyperspaceException + +class ZOrderFieldTest extends SparkFunSuite { + + test("Test null values in ZOrderField.") { + { + val minVal = -11L + val maxVal = 10L + val result = ZOrderField.build("LongColumnName", LongType, minVal, maxVal) + assert(result.isInstanceOf[LongMinMaxZOrderField]) + val col = result.asInstanceOf[LongMinMaxZOrderField] + assert(col.minVal.equals(minVal)) + assert(col.maxVal.equals(maxVal)) + assert(col.bitLen == 5) + + // null value is not considered as min or max value. + // So it's possible to get null value in getBitSet. + // ZOrderField should return empty bitset for null value. + val res = col.getBitSet(null).stream().toArray.toSeq + assert(res.equals(Seq()), res) + } + + { + // If all values are null, min and max can be null. 
+ val minVal = null + val maxVal = null + val result = ZOrderField.build("LongColumnName", LongType, minVal, maxVal) + assert(result.isInstanceOf[LongMinMaxZOrderField]) + val col = result.asInstanceOf[LongMinMaxZOrderField] + // we don't consider this column if minVal == maxVal + assert(col.bitLen == 0) + + { + val res = col.getBitSet(null).stream().toArray.toSeq + assert(res.equals(Seq()), res) + } + + { + val res = col.getBitSet(10L).stream().toArray.toSeq + assert(res.equals(Seq()), res) + } + } + + // Test same case for LongPercentileZOrderField. + { + val minVal = -11.0 + val maxVal = 10.0 + val result = + ZOrderField.build("LongColumnName", LongType, minVal, maxVal, quantileEnabled = true) + assert(result.isInstanceOf[LongPercentileZOrderField]) + val col = result.asInstanceOf[LongPercentileZOrderField] + assert(col.bitLen == 4) + + // null value is not considered as min or max value. + // So it's possible to get null value in getBitSet. + // ZOrderField should return empty bitset for null value. + val res = col.getBitSet(null).stream().toArray.toSeq + assert(res.equals(Seq()), res) + } + + { + // If all values are null, min and max can be null. 
+ val minVal = null + val maxVal = null + val result = + ZOrderField.build("LongColumnName", LongType, minVal, maxVal, quantileEnabled = true) + assert(result.isInstanceOf[LongPercentileZOrderField]) + val col = result.asInstanceOf[LongPercentileZOrderField] + // we don't consider this column if minVal == maxVal + assert(col.bitLen == 0) + + { + val res = col.getBitSet(null).stream().toArray.toSeq + assert(res.equals(Seq()), res) + } + + { + val res = col.getBitSet(10L).stream().toArray.toSeq + assert(res.equals(Seq()), res) + } + } + } + + test("Test LongMinMaxZOrderField.") { + { + val minVal = -11L + val maxVal = 10L + val result = ZOrderField.build("LongColumnName", LongType, minVal, maxVal) + assert(result.isInstanceOf[LongMinMaxZOrderField]) + val col = result.asInstanceOf[LongMinMaxZOrderField] + assert(col.name.equals("LongColumnName")) + assert(col.minVal.equals(minVal)) + assert(col.maxVal.equals(maxVal)) + assert(col.bitLen == 5) + + // value - minValue = 9 - (-11) = 20 = 0b10100 + val res = col.getBitSet(9L).stream.toArray.toSeq + assert(res.equals(Seq(2, 4))) + val e = intercept[HyperspaceException](col.getBitSet((-12L))) + assert(e.msg.contains("value=-12, min=-11, max=10")) + val e2 = intercept[HyperspaceException](col.getBitSet(11L)) + assert(e2.msg.contains("value=11, min=-11, max=10")) + } + { + val minVal = Long.MinValue + val maxVal = Long.MaxValue + val result = ZOrderField.build("LongColumnName", LongType, minVal, maxVal) + assert(result.isInstanceOf[LongMinMaxZOrderField]) + val col = result.asInstanceOf[LongMinMaxZOrderField] + assert(col.name.equals("LongColumnName")) + assert(col.minVal.equals(minVal)) + assert(col.maxVal.equals(maxVal)) + assert(col.bitLen == 64) + + { + val res = col.getBitSet(maxVal).stream.toArray.toSeq + assert(res.equals((0 to 63).toArray.toSeq), res) + } + { + val res = col.getBitSet(0L).stream.toArray.toSeq + assert(res.equals(Seq(63)), res) + } + { + val res = col.getBitSet(minVal).stream.toArray.toSeq + 
assert(res.equals(Seq()), res) + } + } + { + val minVal = 10000000000L + val maxVal = 10000001024L + val result = ZOrderField.build("LongColumnName", LongType, minVal, maxVal) + assert(result.isInstanceOf[LongMinMaxZOrderField]) + val col = result.asInstanceOf[LongMinMaxZOrderField] + assert(col.name.equals("LongColumnName")) + assert(col.minVal.equals(minVal)) + assert(col.maxVal.equals(maxVal)) + assert(col.bitLen == 11) + + // Only use 0~1024 value for z-address. + { + val res = col.getBitSet(maxVal).stream.toArray.toSeq + assert(res.equals(Seq(10).toArray.toSeq), res) + } + { + val res = col.getBitSet(minVal).stream.toArray.toSeq + assert(res.equals(Seq()), res) + } + { + val res = col.getBitSet((minVal + 1L)).stream.toArray.toSeq + assert(res.equals(Seq(0)), res) + } + } + } + + test("Test LongPercentileZOrderField.") { + { + val minVal = 2211231L + val maxVal = 3122345L + val result = + ZOrderField.build( + "LongColumnName", + LongType, + minVal.toDouble, + maxVal.toDouble, + quantileEnabled = true) + assert(result.isInstanceOf[LongPercentileZOrderField]) + val col = result.asInstanceOf[LongPercentileZOrderField] + assert(col.name.equals("LongColumnName")) + assert(col.rawPercentile.head.equals(minVal.toDouble)) + assert(col.rawPercentile.last.equals(maxVal.toDouble)) + assert(col.rawPercentile.size == 2) + assert(col.bitLen == (32768 - 1).toBinaryString.length) + + { + val res = col.getBitSet(minVal).stream.toArray.toSeq + assert(res.equals(Seq()), res) + } + + { + val res = col.getBitSet(maxVal).stream.toArray.toSeq + assert(res.equals(0 to 14), res) + } + + { + val p50 = (maxVal - minVal) / 2.0 + minVal + val res = col.getBitSet(p50.toLong).stream.toArray.toSeq + assert(res.equals(Seq(14)), res) + } + { + val p10 = (maxVal - minVal) / 10.0 + minVal + val res = col.getBitSet(p10.toLong).stream.toArray.toSeq + val expected = util.BitSet.valueOf(Array((32768 * 0.1).toLong)).stream.toArray.toSeq + assert(res.equals(expected), res) + } + { + val p90 = (maxVal - 
minVal) * 0.9 + minVal + val res = col.getBitSet(p90.toLong).stream.toArray.toSeq + val expected = util.BitSet.valueOf(Array((32768 * 0.9).toLong)).stream.toArray.toSeq + assert(res.equals(expected), res) + } + } + { + val minVal = Long.MinValue + val maxVal = Long.MaxValue + val result = + ZOrderField.build( + "LongColumnName", + LongType, + minVal.toDouble, + maxVal.toDouble, + quantileEnabled = true) + assert(result.isInstanceOf[LongPercentileZOrderField]) + val col = result.asInstanceOf[LongPercentileZOrderField] + assert(col.name.equals("LongColumnName")) + assert(col.rawPercentile.head.equals(minVal.toDouble)) + assert(col.rawPercentile.last.equals(maxVal.toDouble)) + assert(col.rawPercentile.size == 2) + assert(col.bitLen == (32768 - 1).toBinaryString.length) + + { + val res = col.getBitSet(minVal).stream.toArray.toSeq + assert(res.equals(Seq()), res) + } + + { + val res = col.getBitSet(maxVal).stream.toArray.toSeq + assert(res.equals(0 to 14), res) + } + + { + val p50 = 0L + val res = col.getBitSet(p50).stream.toArray.toSeq + assert(res.equals(Seq(14)), res) + } + { + val p10 = minVal * 0.8 + val res = col.getBitSet(p10.toLong).stream.toArray.toSeq + val expected = util.BitSet.valueOf(Array((32768 * 0.1).toLong)).stream.toArray.toSeq + assert(res.equals(expected), res) + } + { + val p90 = maxVal * 0.8f + val res = col.getBitSet(p90.toLong).stream.toArray.toSeq + val expected = util.BitSet.valueOf(Array((32768 * 0.9).toLong)).stream.toArray.toSeq + assert(res.equals(expected), res) + } + } + + { + val minVal = 0.0 + val maxVal = 1000.0 + val result = + ZOrderField.build( + "LongColumnName", + LongType, + minVal, + maxVal, + Seq(10.0, 100.0, 200.0, 500.0), + quantileEnabled = true) + assert(result.isInstanceOf[LongPercentileZOrderField]) + val col = result.asInstanceOf[LongPercentileZOrderField] + assert(col.name.equals("LongColumnName")) + assert(col.rawPercentile.head.equals(minVal)) + assert(col.rawPercentile.last.equals(maxVal)) + 
assert(col.rawPercentile.size == 6) + val bucketSize = (maxVal / 2.0 - minVal / 2.0 + 1).toLong + // For LongType, use min(32768, maxVal - minVal + 1). "/ 2.0" for normalize. + assert(col.bitLen == (bucketSize - 1).toBinaryString.length) + + { + val res = col.getBitSet(minVal.toLong).stream.toArray.toSeq + assert(res.equals(Seq()), res) + } + + { + val res = col.getBitSet(maxVal.toLong).stream.toArray.toSeq + val expected = + util.BitSet.valueOf(Array(bucketSize - 1)).stream.toArray.toSeq + assert(res.equals(expected), res) + } + + { + val value = 500L + val res = col.getBitSet(value).stream.toArray.toSeq + // 500 is the 80th percentile (not the 50th) based on the given quantiles. + val expected = + util.BitSet.valueOf(Array((bucketSize * 0.8).toLong)).stream.toArray.toSeq + assert(res.equals(expected), res) + } + { + val value = 750L + val res = col.getBitSet(value).stream.toArray.toSeq + val expected = + util.BitSet.valueOf(Array((bucketSize * 0.9).toLong)).stream.toArray.toSeq + assert(res.equals(expected), res) + } + { + val value = 150L + val res = col.getBitSet(value).stream.toArray.toSeq + val expected = + util.BitSet.valueOf(Array((bucketSize * 0.5).toLong)).stream.toArray.toSeq + assert(res.equals(expected), res) + } + } + } + + test("Test IntMinMaxZOrderField.") { + { + val minVal = -11 + val maxVal = 10 + val result = ZOrderField.build("IntColumnName", IntegerType, minVal, maxVal) + assert(result.isInstanceOf[IntMinMaxZOrderField]) + val col = result.asInstanceOf[IntMinMaxZOrderField] + assert(col.name.equals("IntColumnName")) + assert(col.minVal.equals(minVal)) + assert(col.maxVal.equals(maxVal)) + assert(col.bitLen == 5) + + val res = col.getBitSet(9).stream.toArray.toSeq + // value - minValue = 9 - (-11) = 20 = 0b10100 + assert(res.equals(Seq(2, 4)), res) + val e = intercept[HyperspaceException](col.getBitSet((-12))) + assert(e.msg.contains("value=-12, min=-11, max=10")) + val e2 = intercept[HyperspaceException](col.getBitSet(11)) +
assert(e2.msg.contains("value=11, min=-11, max=10")) + } + { + val minVal = Int.MinValue + val maxVal = Int.MaxValue + val result = ZOrderField.build("IntColumnName", IntegerType, minVal, maxVal) + assert(result.isInstanceOf[IntMinMaxZOrderField]) + val col = result.asInstanceOf[IntMinMaxZOrderField] + assert(col.name.equals("IntColumnName")) + assert(col.minVal.equals(minVal)) + assert(col.maxVal.equals(maxVal)) + assert(col.bitLen == 32) + + { + val res = col.getBitSet(maxVal).stream.toArray.toSeq + assert(res.equals((0 to 31).toArray.toSeq), res) + } + { + val res = col.getBitSet(0).stream.toArray.toSeq + assert(res.equals(Seq(31)), res) + } + { + val res = col.getBitSet(minVal).stream.toArray.toSeq + assert(res.equals(Seq()), res) + } + } + { + val minVal = 1000000000 + val maxVal = 1000001024 + val result = ZOrderField.build("IntColumnName", IntegerType, minVal, maxVal) + assert(result.isInstanceOf[IntMinMaxZOrderField]) + val col = result.asInstanceOf[IntMinMaxZOrderField] + assert(col.name.equals("IntColumnName")) + assert(col.minVal.equals(minVal)) + assert(col.maxVal.equals(maxVal)) + assert(col.bitLen == 11) + + // Only use 0~1024 value for z-address. 
+ { + val res = col.getBitSet(maxVal).stream.toArray.toSeq + assert(res.equals(Seq(10).toArray.toSeq), res) + } + { + val res = col.getBitSet(minVal).stream.toArray.toSeq + assert(res.equals(Seq()), res) + } + { + val res = col.getBitSet((minVal + 1)).stream.toArray.toSeq + assert(res.equals(Seq(0)), res) + } + } + } + + test("Test ShortMinMaxZOrderField.") { + { + val minVal = -11.toShort + val maxVal = 10.toShort + val result = ZOrderField.build("ShortColumnName", ShortType, minVal, maxVal) + assert(result.isInstanceOf[ShortMinMaxZOrderField]) + val col = result.asInstanceOf[ShortMinMaxZOrderField] + assert(col.name.equals("ShortColumnName")) + assert(col.minVal.equals(minVal)) + assert(col.maxVal.equals(maxVal)) + assert(col.bitLen == 5) + + val res = col.getBitSet(9.toShort).stream.toArray.toSeq + // value - minValue = 9 - -11 = 20 = 0b10100 + assert(res.equals(Seq(2, 4)), res) + val e = intercept[HyperspaceException](col.getBitSet((-12).toShort)) + assert(e.msg.contains("value=-12, min=-11, max=10")) + val e2 = intercept[HyperspaceException](col.getBitSet(11.toShort)) + assert(e2.msg.contains("value=11, min=-11, max=10")) + } + { + val minVal = Short.MinValue + val maxVal = Short.MaxValue + val result = ZOrderField.build("ShortColumnName", ShortType, minVal, maxVal) + assert(result.isInstanceOf[ShortMinMaxZOrderField]) + val col = result.asInstanceOf[ShortMinMaxZOrderField] + assert(col.name.equals("ShortColumnName")) + assert(col.minVal.equals(minVal)) + assert(col.maxVal.equals(maxVal)) + assert(col.bitLen == 16) + + { + val res = col.getBitSet(maxVal).stream.toArray.toSeq + assert(res.equals((0 to 15).toArray.toSeq), res) + } + { + val res = col.getBitSet(0.toShort).stream.toArray.toSeq + assert(res.equals(Seq(15)), res) + } + { + val res = col.getBitSet(minVal).stream.toArray.toSeq + assert(res.equals(Seq()), res) + } + } + { + val minVal = 10000.toShort + val maxVal = 11024.toShort + val result = ZOrderField.build("ShortColumnName", ShortType, minVal, 
maxVal) + assert(result.isInstanceOf[ShortMinMaxZOrderField]) + val col = result.asInstanceOf[ShortMinMaxZOrderField] + assert(col.name.equals("ShortColumnName")) + assert(col.minVal.equals(minVal)) + assert(col.maxVal.equals(maxVal)) + assert(col.bitLen == 11) + + // Only use 0~1024 value for z-address. + { + val res = col.getBitSet(maxVal).stream.toArray.toSeq + assert(res.equals(Seq(10).toArray.toSeq), res) + } + { + val res = col.getBitSet(minVal).stream.toArray.toSeq + assert(res.equals(Seq()), res) + } + { + val res = col.getBitSet((minVal + 1).toShort).stream.toArray.toSeq + assert(res.equals(Seq(0)), res) + } + } + } + + test("Test ByteMinMaxZOrderField.") { + { + val minVal = (-11).toByte + val maxVal = (10).toByte + val result = ZOrderField.build("ByteColumnName", ByteType, minVal, maxVal) + assert(result.isInstanceOf[ByteMinMaxZOrderField]) + val col = result.asInstanceOf[ByteMinMaxZOrderField] + assert(col.name.equals("ByteColumnName")) + assert(col.minVal.equals(minVal)) + assert(col.maxVal.equals(maxVal)) + assert(col.bitLen == 5) + + val res = col.getBitSet(9.toByte).stream.toArray.toSeq + // value - minValue = 9 - -11 = 20 = 0b10100 + assert(res.equals(Seq(2, 4)), res) + val e = intercept[HyperspaceException](col.getBitSet((-12).toByte)) + assert(e.msg.contains("value=-12, min=-11, max=10")) + val e2 = intercept[HyperspaceException](col.getBitSet(11.toByte)) + assert(e2.msg.contains("value=11, min=-11, max=10")) + } + { + val minVal = Byte.MinValue + val maxVal = Byte.MaxValue + val result = ZOrderField.build("ByteColumnName", ByteType, minVal, maxVal) + assert(result.isInstanceOf[ByteMinMaxZOrderField]) + val col = result.asInstanceOf[ByteMinMaxZOrderField] + assert(col.name.equals("ByteColumnName")) + assert(col.minVal.equals(minVal)) + assert(col.maxVal.equals(maxVal)) + assert(col.bitLen == 8) + + { + val res = col.getBitSet(maxVal).stream.toArray.toSeq + assert(res.equals((0 to 7).toArray.toSeq), res) + } + { + val res = 
col.getBitSet(0.toByte).stream.toArray.toSeq + assert(res.equals(Seq(7)), res) + } + { + val res = col.getBitSet(minVal).stream.toArray.toSeq + assert(res.equals(Seq()), res) + } + } + { + val minVal = 200.toByte + val maxVal = 216.toByte + val result = ZOrderField.build("ByteColumnName", ByteType, minVal, maxVal) + assert(result.isInstanceOf[ByteMinMaxZOrderField]) + val col = result.asInstanceOf[ByteMinMaxZOrderField] + assert(col.name.equals("ByteColumnName")) + assert(col.minVal.equals(minVal)) + assert(col.maxVal.equals(maxVal)) + assert(col.bitLen == 5) + + // Only use 0~16 value for z-address. + { + val res = col.getBitSet(maxVal).stream.toArray.toSeq + assert(res.equals(Seq(4).toArray.toSeq), res) + } + { + val res = col.getBitSet(minVal).stream.toArray.toSeq + assert(res.equals(Seq()), res) + } + { + val res = col.getBitSet((minVal + 1).toByte).stream.toArray.toSeq + assert(res.equals(Seq(0)), res) + } + } + } + + test("Test BooleanZOrderField.") { + { + val minVal = false + val maxVal = true + val result = ZOrderField.build("BooleanColumnName", BooleanType, minVal, maxVal) + assert(result.isInstanceOf[BooleanZOrderField]) + val col = result.asInstanceOf[BooleanZOrderField] + assert(col.name.equals("BooleanColumnName")) + assert(col.minVal.equals(minVal)) + assert(col.maxVal.equals(maxVal)) + assert(col.bitLen == 1) + + { + val res = col.getBitSet(maxVal).stream.toArray.toSeq + assert(res.equals(Seq(0)), res) + } + { + val res = col.getBitSet(minVal).stream.toArray.toSeq + assert(res.equals(Seq()), res) + } + } + } + + test("Test DecimalZOrderField.") { + def getBigDecimal(v: Long): java.math.BigDecimal = { + new java.math.BigDecimal(v).setScale(0, java.math.RoundingMode.FLOOR) + } + + { + val minVal = getBigDecimal(-11) + val maxVal = getBigDecimal(10) + val result = ZOrderField.build("DecimalColumnName", DecimalType(1, 1), minVal, maxVal) + assert(result.isInstanceOf[DecimalZOrderField]) + val col = result.asInstanceOf[DecimalZOrderField] + 
assert(col.name.equals("DecimalColumnName")) + assert(col.minVal.equals(minVal)) + assert(col.maxVal.equals(maxVal)) + assert(col.bitLen == 5) + + // value - minValue = 9 - (-11) = 20 = 0b10100 + val res = col.getBitSet(getBigDecimal(9)).stream.toArray.toSeq + assert(res.equals(Seq(2, 4)), res) + val e = intercept[HyperspaceException](col.getBitSet(getBigDecimal(-12))) + assert(e.msg.contains("value=-12, min=-11, max=10")) + val e2 = intercept[HyperspaceException](col.getBitSet(getBigDecimal(11))) + assert(e2.msg.contains("value=11, min=-11, max=10")) + } + + { + // Test 2 bytes + val minVal = getBigDecimal(-256) + val maxVal = getBigDecimal(256) + val result = ZOrderField.build("DecimalColumnName", DecimalType(1, 1), minVal, maxVal) + assert(result.isInstanceOf[DecimalZOrderField]) + val col = result.asInstanceOf[DecimalZOrderField] + assert(col.name.equals("DecimalColumnName")) + assert(col.minVal.equals(minVal)) + assert(col.maxVal.equals(maxVal)) + assert(col.bitLen == 10) + + { + // value - minValue = 0 - (-256) = 256 = 0b100000000 + val res = col.getBitSet(getBigDecimal(0)).stream.toArray.toSeq + assert(res.equals(Seq(8)), res) + } + { + // value - minValue = 256 - (-256) = 512 = 0b1000000000 + val res = col.getBitSet(maxVal).stream.toArray.toSeq + assert(res.equals(Seq(9)), res) + } + + { + // value - minValue = -256 - (-256) = 0 + val res = col.getBitSet(minVal).stream.toArray.toSeq + assert(res.equals(Seq()), res) + } + + { + // value - minValue = 8 - (-256) = 264 = 0b0100001000 + val res = col.getBitSet(getBigDecimal(8)).stream.toArray.toSeq + assert(res.equals(Seq(3, 8)), res) + } + + val e = + intercept[HyperspaceException](col.getBitSet(getBigDecimal(-277))) + assert(e.msg.contains("value=-277, min=-256, max=256")) + val e2 = + intercept[HyperspaceException](col.getBitSet(getBigDecimal(277))) + assert(e2.msg.contains("value=277, min=-256, max=256")) + } + { + val minVal = new java.math.BigDecimal("-98765432109876543210987654321098765432").setScale(0) + 
val maxVal = new java.math.BigDecimal("98765432109876543210987654321098765432").setScale(0) + val result = ZOrderField.build("DecimalColumnName", DecimalType(1, 1), minVal, maxVal) + assert(result.isInstanceOf[DecimalZOrderField]) + val col = result.asInstanceOf[DecimalZOrderField] + assert(col.name.equals("DecimalColumnName")) + assert(col.minVal.equals(minVal)) + assert(col.maxVal.equals(maxVal)) + assert(col.bitLen == 128) + + { + val res = col.getBitSet(maxVal).stream.toArray.toSeq + val expected = Seq(4, 5, 6, 7, 11, 13, 14, 16, 22, 23, 25, 26, 32, 33, 37, 39, 40, 41, 42, + 45, 48, 49, 50, 51, 52, 55, 59, 60, 61, 63, 64, 67, 71, 72, 73, 75, 78, 86, 91, 92, 93, + 95, 96, 98, 103, 104, 105, 106, 107, 112, 113, 115, 116, 119, 122, 124, 127) + assert(res.equals(expected), res) + } + { + val res = col.getBitSet(getBigDecimal(0)).stream.toArray.toSeq + val expected = Seq(3, 4, 5, 6, 10, 12, 13, 15, 21, 22, 24, 25, 31, 32, 36, 38, 39, 40, 41, + 44, 47, 48, 49, 50, 51, 54, 58, 59, 60, 62, 63, 66, 70, 71, 72, 74, 77, 85, 90, 91, 92, + 94, 95, 97, 102, 103, 104, 105, 106, 111, 112, 114, 115, 118, 121, 123, 126) + assert(res.equals(expected), res) + } + { + val res = col.getBitSet(minVal).stream.toArray.toSeq + assert(res.equals(Seq()), res) + } + } + { + val minVal = getBigDecimal(10000000000L) + val maxVal = getBigDecimal(10000001024L) + val result = ZOrderField.build("DecimalColumnName", DecimalType(1, 1), minVal, maxVal) + assert(result.isInstanceOf[DecimalZOrderField]) + val col = result.asInstanceOf[DecimalZOrderField] + assert(col.name.equals("DecimalColumnName")) + assert(col.minVal.equals(minVal)) + assert(col.maxVal.equals(maxVal)) + assert(col.bitLen == 11) + + // Only use 0~1024 value for z-address. 
+ { + val res = col.getBitSet(maxVal).stream.toArray.toSeq + assert(res.equals(Seq(10).toArray.toSeq), res) + } + { + val res = col.getBitSet(minVal).stream.toArray.toSeq + assert(res.equals(Seq()), res) + } + { + val res = col + .getBitSet(getBigDecimal(10000000000L + 1L)) + .stream + .toArray + .toSeq + assert(res.equals(Seq(0)), res) + } + } + } + + test("Test DoublePercentileZOrderField.") { + { + val minVal = -2.211231 + val maxVal = 3.122345 + val result = ZOrderField.build("DoubleColumnName", DoubleType, minVal, maxVal) + assert(result.isInstanceOf[DoublePercentileZOrderField]) + val col = result.asInstanceOf[DoublePercentileZOrderField] + assert(col.name.equals("DoubleColumnName")) + assert(col.rawPercentile.head.equals(minVal)) + assert(col.rawPercentile.last.equals(maxVal)) + assert(col.rawPercentile.size == 2) + assert(col.bitLen == (32768 - 1).toBinaryString.length) + + { + val res = col.getBitSet(minVal).stream.toArray.toSeq + assert(res.equals(Seq()), res) + } + + { + val res = col.getBitSet(maxVal).stream.toArray.toSeq + assert(res.equals(0 to 14), res) + } + + { + val p50 = (maxVal - minVal) / 2.0 + minVal + val res = col.getBitSet(p50).stream.toArray.toSeq + assert(res.equals(Seq(14)), res) + } + { + val p10 = (maxVal - minVal) / 10.0 + minVal + val res = col.getBitSet(p10).stream.toArray.toSeq + val expected = util.BitSet.valueOf(Array((32768 * 0.1).toLong)).stream.toArray.toSeq + assert(res.equals(expected), res) + } + { + val p90 = (maxVal - minVal) * 0.9 + minVal + val res = col.getBitSet(p90).stream.toArray.toSeq + val expected = + util.BitSet.valueOf(Array((32768 * 0.9).toLong)).stream.toArray.toSeq + assert(res.equals(expected), res) + } + } + + { + val minVal = Double.MinValue + val maxVal = Double.MaxValue + val result = ZOrderField.build("DoubleColumnName", DoubleType, minVal, maxVal) + assert(result.isInstanceOf[DoublePercentileZOrderField]) + val col = result.asInstanceOf[DoublePercentileZOrderField] + 
assert(col.name.equals("DoubleColumnName")) + assert(col.rawPercentile.head.equals(minVal)) + assert(col.rawPercentile.last.equals(maxVal)) + assert(col.rawPercentile.size == 2) + assert(col.bitLen == (32768 - 1).toBinaryString.length) + + { + val res = col.getBitSet(minVal).stream.toArray.toSeq + assert(res.equals(Seq()), res) + } + + { + val res = col.getBitSet(maxVal).stream.toArray.toSeq + assert(res.equals(0 to 14), res) + } + + { + val p50 = 0.0 + val res = col.getBitSet(p50).stream.toArray.toSeq + assert(res.equals(Seq(14)), res) + } + { + val p10 = minVal * 0.8 + val res = col.getBitSet(p10).stream.toArray.toSeq + val expected = util.BitSet.valueOf(Array((32768 * 0.1).toLong)).stream.toArray.toSeq + assert(res.equals(expected), res) + } + { + val p90 = maxVal * 0.8 + val res = col.getBitSet(p90).stream.toArray.toSeq + val expected = + util.BitSet.valueOf(Array((32768 * 0.9).toLong)).stream.toArray.toSeq + assert(res.equals(expected), res) + } + } + + { + val minVal = 0.0 + val maxVal = 1000.0 + val result = + ZOrderField.build( + "DoubleColumnName", + DoubleType, + minVal, + maxVal, + Seq(10.0, 100.0, 200.0, 500.0), + quantileEnabled = true) + assert(result.isInstanceOf[DoublePercentileZOrderField]) + val col = result.asInstanceOf[DoublePercentileZOrderField] + assert(col.name.equals("DoubleColumnName")) + assert(col.rawPercentile.head.equals(minVal)) + assert(col.rawPercentile.last.equals(maxVal)) + assert(col.rawPercentile.size == 6) + assert(col.bitLen == (32768 - 1).toBinaryString.length) + + { + val res = col.getBitSet(minVal).stream.toArray.toSeq + assert(res.equals(Seq()), res) + } + + { + val res = col.getBitSet(maxVal).stream.toArray.toSeq + assert(res.equals(0 to 14), res) + } + + { + val value = 500.0 + val res = col.getBitSet(value).stream.toArray.toSeq + // value is not 50% percentile, 80% based on the given quantiles. 
+ val expected = + util.BitSet.valueOf(Array((32768 * 0.8).toLong)).stream.toArray.toSeq + assert(res.equals(expected), res) + } + { + val value = 750.0 + val res = col.getBitSet(value).stream.toArray.toSeq + val expected = + util.BitSet.valueOf(Array((32768 * 0.9).toLong)).stream.toArray.toSeq + assert(res.equals(expected), res) + } + { + val value = 150.0 + val res = col.getBitSet(value).stream.toArray.toSeq + val expected = + util.BitSet.valueOf(Array((32768 * 0.5).toLong)).stream.toArray.toSeq + assert(res.equals(expected), res) + } + } + } + + test("Test FloatPercentileZOrderField.") { + { + val minVal = -2.211231f + val maxVal = 3.122345f + val result = + ZOrderField.build("FloatColumnName", FloatType, minVal.toDouble, maxVal.toDouble) + assert(result.isInstanceOf[FloatPercentileZOrderField]) + val col = result.asInstanceOf[FloatPercentileZOrderField] + assert(col.name.equals("FloatColumnName")) + assert(col.rawPercentile.head.equals(minVal.toDouble)) + assert(col.rawPercentile.last.equals(maxVal.toDouble)) + assert(col.rawPercentile.size == 2) + assert(col.bitLen == (16384 - 1).toBinaryString.length) + + { + val res = col.getBitSet(minVal).stream.toArray.toSeq + assert(res.equals(Seq()), res) + } + + { + val res = col.getBitSet(maxVal).stream.toArray.toSeq + assert(res.equals(0 to 13), res) + } + + { + val p50 = (maxVal - minVal) / 2.0f + minVal + val res = col.getBitSet(p50).stream.toArray.toSeq + assert(res.equals(Seq(13)), res) + } + { + val p10 = (maxVal - minVal) / 10.0f + minVal + val res = col.getBitSet(p10).stream.toArray.toSeq + val expected = util.BitSet.valueOf(Array((16384 * 0.1).toLong)).stream.toArray.toSeq + assert(res.equals(expected), res) + } + { + val p90 = (maxVal - minVal) * 9.0f / 10.0f + minVal + val res = col.getBitSet(p90).stream.toArray.toSeq + val expected = util.BitSet.valueOf(Array((16384 * 0.9).toLong)).stream.toArray.toSeq + assert(res.equals(expected), res) + } + } + { + val minVal = Float.MinValue + val maxVal = Float.MaxValue +
val result = + ZOrderField.build("FloatColumnName", FloatType, minVal.toDouble, maxVal.toDouble) + assert(result.isInstanceOf[FloatPercentileZOrderField]) + val col = result.asInstanceOf[FloatPercentileZOrderField] + assert(col.name.equals("FloatColumnName")) + assert(col.rawPercentile.head.equals(minVal.toDouble)) + assert(col.rawPercentile.last.equals(maxVal.toDouble)) + assert(col.rawPercentile.size == 2) + assert(col.bitLen == (16384 - 1).toBinaryString.length) + + { + val res = col.getBitSet(minVal).stream.toArray.toSeq + assert(res.equals(Seq()), res) + } + + { + val res = col.getBitSet(maxVal).stream.toArray.toSeq + assert(res.equals(0 to 13), res) + } + + { + val p50 = 0.0f + val res = col.getBitSet(p50).stream.toArray.toSeq + assert(res.equals(Seq(13)), res) + } + { + val p10 = minVal * 0.8f + val res = col.getBitSet(p10).stream.toArray.toSeq + val expected = util.BitSet.valueOf(Array((16384 * 0.1).toLong)).stream.toArray.toSeq + assert(res.equals(expected), res) + } + { + val p90 = maxVal * 0.8f + val res = col.getBitSet(p90).stream.toArray.toSeq + val expected = util.BitSet.valueOf(Array((16384 * 0.9).toLong)).stream.toArray.toSeq + assert(res.equals(expected), res) + } + } + + { + val minVal = 0.0f + val maxVal = 1000.0f + val result = + ZOrderField.build( + "FloatColumnName", + FloatType, + minVal, + maxVal, + Seq(10.0, 100.0, 200.0, 500.0), + quantileEnabled = true) + assert(result.isInstanceOf[FloatPercentileZOrderField]) + val col = result.asInstanceOf[FloatPercentileZOrderField] + assert(col.name.equals("FloatColumnName")) + assert(col.rawPercentile.head.equals(minVal.toDouble)) + assert(col.rawPercentile.last.equals(maxVal.toDouble)) + assert(col.rawPercentile.size == 6) + assert(col.bitLen == (16384 - 1).toBinaryString.length) + + { + val res = col.getBitSet(minVal).stream.toArray.toSeq + assert(res.equals(Seq()), res) + } + + { + val res = col.getBitSet(maxVal).stream.toArray.toSeq + assert(res.equals(0 to 13), res) + } + + { + val value = 
500.0f + val res = col.getBitSet(value).stream.toArray.toSeq + // 500 is the 80th percentile (not the 50th) based on the given quantiles. + val expected = + util.BitSet.valueOf(Array((16384 * 0.8).toLong)).stream.toArray.toSeq + assert(res.equals(expected), res) + } + { + val value = 750.0f + val res = col.getBitSet(value).stream.toArray.toSeq + val expected = + util.BitSet.valueOf(Array((16384 * 0.9).toLong)).stream.toArray.toSeq + assert(res.equals(expected), res) + } + { + val value = 150.0f + val res = col.getBitSet(value).stream.toArray.toSeq + val expected = + util.BitSet.valueOf(Array((16384 * 0.5).toLong)).stream.toArray.toSeq + assert(res.equals(expected), res) + } + } + } + + test("Test StringZOrderField.") { + { + val minVal = "cat" + val maxVal = "zookeeper" + val result = ZOrderField.build("StringColumnName", StringType, minVal, maxVal) + assert(result.isInstanceOf[StringZOrderField]) + val col = result.asInstanceOf[StringZOrderField] + assert(col.name.equals("StringColumnName")) + assert(col.minVal.equals(minVal)) + assert(col.maxVal.equals(maxVal)) + assert(col.bitLen == 29) + + { + val res = col.getBitSet("dog").stream.toArray.toSeq + // d - c = 100 - 99 = 0b1 = (0) + 24 + // o = 111 = 0b1101111 = (0, 1, 2, 3, 5, 6) + 16 + // g = 103 = 0b1100111 = (0, 1, 2, 5, 6) + 8 + assert(res.equals(Seq(8, 9, 10, 13, 14, 16, 17, 18, 19, 21, 22, 24)), res) + } + + { + val res = col.getBitSet("z").stream.toArray.toSeq + // z - c = 122 - 99 = 0b10111 = (0, 1, 2, 4) + 24 + assert(res.equals(Seq(24, 25, 26, 28)), res) + } + + { + val res = col.getBitSet("zookeeper").stream.toArray.toSeq + // z - c = 122 - 99 = 0b10111 = (0, 1, 2, 4) + 24 + // o = 111 = 0b1101111 = (0, 1, 2, 3, 5, 6) + 16 + // o = 111 = 0b1101111 = (0, 1, 2, 3, 5, 6) + 8 + // k = 107 = 0b1101011 = (0, 1, 3, 5, 6) + 0 + assert( + res.equals( + Seq(0, 1, 3, 5, 6, 8, 9, 10, 11, 13, 14, 16, 17, 18, 19, 21, 22, 24, 25, 26, 28)), + res) + } + + { + // Result should be same as only first 4 bytes are used.
+ val res1 = col.getBitSet("deadbeef").stream.toArray.toSeq + val res2 = col.getBitSet("deadwalking").stream.toArray.toSeq + assert(res1.equals(res2)) + } + + val e = intercept[HyperspaceException](col.getBitSet("Z")) + assert(e.msg.contains("value=Z, min=cat, max=zookeeper")) + } + } + + test("Test DateMinMaxZOrderField.") { + def date(str: String): java.sql.Date = { + java.sql.Date.valueOf(str) + } + + { + val minVal = date("2021-03-01") + val maxVal = date("2021-04-30") + val result = ZOrderField.build("DateColumnName", DateType, minVal, maxVal) + assert(result.isInstanceOf[DateMinMaxZOrderField]) + val col = result.asInstanceOf[DateMinMaxZOrderField] + assert(col.name.equals("DateColumnName")) + assert(col.bitLen == 6) + + val res = col.getBitSet(date("2021-03-02")).stream.toArray.toSeq + assert(res.equals(Seq(0)), res) + + val e = + intercept[HyperspaceException](col.getBitSet(date("2020-02-02"))) + assert(e.msg.contains("value=2020-02-02, min=2021-03-01, max=2021-04-30")) + val e2 = + intercept[HyperspaceException](col.getBitSet(date("2022-02-02"))) + assert(e2.msg.contains("value=2022-02-02, min=2021-03-01, max=2021-04-30")) + } + { + val minVal = date("1970-01-01") + val maxVal = date("2100-01-01") + val result = ZOrderField.build("DateColumnName", DateType, minVal, maxVal) + assert(result.isInstanceOf[DateMinMaxZOrderField]) + val col = result.asInstanceOf[DateMinMaxZOrderField] + assert(col.name.equals("DateColumnName")) + assert(col.bitLen == 16) + + { + // 47482 days = 0b1011100101111010 + val res = col.getBitSet(maxVal).stream.toArray.toSeq + assert(res.equals(Seq(1, 3, 4, 5, 6, 8, 11, 12, 13, 15)), res) + } + { + val res = col.getBitSet(minVal).stream.toArray.toSeq + assert(res.equals(Seq()), res) + } + { + // 365 days = 0b101101101 + val res = col.getBitSet(date("1971-01-01")).stream.toArray.toSeq + assert(res.equals(Seq(0, 2, 3, 5, 6, 8)), res) + } + } + } + + test("Test TimestampMinMaxZOrderField.") { + def ts(str: String): java.sql.Timestamp = { + 
java.sql.Timestamp.valueOf(str) + } + + { + val minVal = ts("2021-03-01 00:00:00") + val maxVal = ts("2021-03-02 00:00:00") + val result = ZOrderField.build("TimestampColumnName", TimestampType, minVal, maxVal) + assert(result.isInstanceOf[TimestampMinMaxZOrderField]) + val col = result.asInstanceOf[TimestampMinMaxZOrderField] + assert(col.name.equals("TimestampColumnName")) + // in micros = 1000000 * 60 * 60 * 24 = 86400000000 = 0b1010000011101110101110110000000000000 + assert(col.bitLen == 37) + + val res = + col.getBitSet(ts("2021-03-01 00:00:00.001")).stream.toArray.toSeq + + // 1 ms after min = 1000 micros = 0b1111101000 + assert(res.equals(Seq(3, 5, 6, 7, 8, 9)), res) + + val e = + intercept[HyperspaceException](col.getBitSet(ts("2020-02-02 00:00:00"))) + assert( + e.msg.contains( + "value=2020-02-02 00:00:00.0, min=2021-03-01 00:00:00.0, max=2021-03-02 00:00:00.0")) + val e2 = + intercept[HyperspaceException](col.getBitSet(ts("2022-02-02 00:00:00"))) + assert( + e2.msg.contains( + "value=2022-02-02 00:00:00.0, min=2021-03-01 00:00:00.0, max=2021-03-02 00:00:00.0")) + } + { + val minVal = ts("1970-01-01 00:00:00.0") + val maxVal = ts("1970-01-01 00:01:00.0") + val result = ZOrderField.build("TimestampColumnName", TimestampType, minVal, maxVal) + assert(result.isInstanceOf[TimestampMinMaxZOrderField]) + val col = result.asInstanceOf[TimestampMinMaxZOrderField] + assert(col.name.equals("TimestampColumnName")) + // in micros = 1000 * 1000 * 60 = 60000000 = 0b11100100111000011100000000 + assert(col.bitLen == 26) + + { + val res = col.getBitSet(maxVal).stream.toArray.toSeq + assert(res.equals(Seq(8, 9, 10, 15, 16, 17, 20, 23, 24, 25)), res) + } + { + val res = col.getBitSet(minVal).stream.toArray.toSeq + assert(res.equals(Seq()), res) + } + { + val res = + col.getBitSet(ts("1970-01-01 00:00:00.000001")).stream.toArray.toSeq + assert(res.equals(Seq(0)), res) + } + } + } +} diff --git a/src/test/scala/com/microsoft/hyperspace/index/zordercovering/ZOrderUDFTest.scala
b/src/test/scala/com/microsoft/hyperspace/index/zordercovering/ZOrderUDFTest.scala new file mode 100644 index 000000000..f8db76a15 --- /dev/null +++ b/src/test/scala/com/microsoft/hyperspace/index/zordercovering/ZOrderUDFTest.scala @@ -0,0 +1,60 @@ +/* + * Copyright (2021) The Hyperspace Project Authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.microsoft.hyperspace.index.zordercovering + +import scala.collection.mutable + +import org.apache.spark.sql.functions.struct +import org.apache.spark.sql.types._ + +import com.microsoft.hyperspace.index.HyperspaceSuite + +class ZOrderUDFTest extends HyperspaceSuite { + + test("test ZOrderUDF simple") { + val field1 = ZOrderField.build("intColName", IntegerType, 1, 9, Nil, false) + val field2 = ZOrderField.build("longColName", LongType, 101L, 109L, Nil, false) + + val zudf = ZOrderUDF(Seq(field1, field2)) + assert(zudf.totalBitLen == 8) // 4 + 4 + + def getZAddress(intVal: Int, longVal: Long): Seq[Long] = { + import spark.implicits._ + val df = Seq((intVal, longVal)).toDF("intColName", "longColName") + val dfWithZAddr = + df.withColumn("zaddr", zudf.zAddressUdf(struct(df("intColName"), df("longColName")))) + val res = dfWithZAddr.head.getValuesMap(dfWithZAddr.schema.fieldNames) + res("zaddr").asInstanceOf[mutable.WrappedArray[Long]] + } + + // (1, 101L) => 0b100000000, as we always set (totalBitLen)-th bit. + // Both are the field minimums, so all other bits are 0.
+ assert(getZAddress(1, 101L).equals(Seq(256L))) + // (9, 109L) => 0b111000000 = 256 + 128 + 64 = 448 + assert(getZAddress(9, 109L).equals(Seq(448L))) + // (1, 109L) => 0b101000000 = 256 + 64 = 320 + assert(getZAddress(1, 109L).equals(Seq(320L))) + // (9, 101L) => 0b110000000 = 256 + 128 = 384 + assert(getZAddress(9, 101L).equals(Seq(384L))) + // (5, 101L) => 0b100100000 = 256 + 32 = 288 + assert(getZAddress(5, 101L).equals(Seq(288L))) + // (8, 101L) => 0b100101010 = 256 + 32 + 8 + 2 = 298 + assert(getZAddress(8, 101L).equals(Seq(298L))) + // (1, 108L) => 0b100010101 = 256 + 16 + 4 + 1 = 277 + assert(getZAddress(1, 108L).equals(Seq(277L))) + } +}