From 0d0493fcf7fc86d30b0ddd4e2c5a293c5c88eb9d Mon Sep 17 00:00:00 2001
From: Cheng Lian
Date: Mon, 7 Apr 2014 22:24:12 -0700
Subject: [PATCH] [SPARK-1402] Added 3 more compression schemes

JIRA issue: [SPARK-1402](https://issues.apache.org/jira/browse/SPARK-1402)

This PR provides 3 more compression schemes for Spark SQL in-memory columnar storage:

* `BooleanBitSet`
* `IntDelta`
* `LongDelta`

Now there are 6 compression schemes in total, including the no-op `PassThrough` scheme.

Also fixed a bug introduced in PR #286: not all compression schemes were registered as available schemes when accessing an in-memory column, so `ColumnAccessor` throws an exception when a column is compressed with an unrecognised scheme.

Author: Cheng Lian

Closes #330 from liancheng/moreCompressionSchemes and squashes the following commits:

1d037b8 [Cheng Lian] Fixed SPARK-1436: in-memory column byte buffer must be able to be accessed multiple times
d7c0e8f [Cheng Lian] Added test suite for IntegralDelta (IntDelta & LongDelta)
3c1ad7a [Cheng Lian] Added test suite for BooleanBitSet, refactored other test suites
44fe4b2 [Cheng Lian] Refactored CompressionScheme, added 3 more compression schemes.
---
 .../spark/sql/columnar/ColumnAccessor.scala   |  23 +-
 .../spark/sql/columnar/ColumnStats.scala      |   6 +
 .../CompressibleColumnBuilder.scala           |   6 +-
 .../compression/CompressionScheme.scala       |  28 +-
 .../compression/compressionSchemes.scala      | 266 +++++++++++++++---
 .../sql/columnar/ColumnarQuerySuite.scala     |   8 +
 .../compression/BooleanBitSetSuite.scala      |  98 +++++++
 .../compression/DictionaryEncodingSuite.scala | 122 ++++----
 .../compression/IntegralDeltaSuite.scala      | 115 ++++++++
 .../compression/RunLengthEncodingSuite.scala  |  87 +++---
 .../TestCompressibleColumnBuilder.scala       |   6 +-
 11 files changed, 586 insertions(+), 179 deletions(-)
 create mode 100644 sql/core/src/test/scala/org/apache/spark/sql/columnar/compression/BooleanBitSetSuite.scala
 create mode 100644 sql/core/src/test/scala/org/apache/spark/sql/columnar/compression/IntegralDeltaSuite.scala

diff --git a/sql/core/src/main/scala/org/apache/spark/sql/columnar/ColumnAccessor.scala b/sql/core/src/main/scala/org/apache/spark/sql/columnar/ColumnAccessor.scala
index ffd4894b5213d..3c39e1d350fa8 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/columnar/ColumnAccessor.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/columnar/ColumnAccessor.scala
@@ -100,20 +100,21 @@ private[sql] class GenericColumnAccessor(buffer: ByteBuffer)
 private[sql] object ColumnAccessor {
   def apply(buffer: ByteBuffer): ColumnAccessor = {
+    val dup = buffer.duplicate().order(ByteOrder.nativeOrder)
     // The first 4 bytes in the buffer indicate the column type.
-    val columnTypeId = buffer.getInt()
+    val columnTypeId = dup.getInt()
 
     columnTypeId match {
-      case INT.typeId     => new IntColumnAccessor(buffer)
-      case LONG.typeId    => new LongColumnAccessor(buffer)
-      case FLOAT.typeId   => new FloatColumnAccessor(buffer)
-      case DOUBLE.typeId  => new DoubleColumnAccessor(buffer)
-      case BOOLEAN.typeId => new BooleanColumnAccessor(buffer)
-      case BYTE.typeId    => new ByteColumnAccessor(buffer)
-      case SHORT.typeId   => new ShortColumnAccessor(buffer)
-      case STRING.typeId  => new StringColumnAccessor(buffer)
-      case BINARY.typeId  => new BinaryColumnAccessor(buffer)
-      case GENERIC.typeId => new GenericColumnAccessor(buffer)
+      case INT.typeId     => new IntColumnAccessor(dup)
+      case LONG.typeId    => new LongColumnAccessor(dup)
+      case FLOAT.typeId   => new FloatColumnAccessor(dup)
+      case DOUBLE.typeId  => new DoubleColumnAccessor(dup)
+      case BOOLEAN.typeId => new BooleanColumnAccessor(dup)
+      case BYTE.typeId    => new ByteColumnAccessor(dup)
+      case SHORT.typeId   => new ShortColumnAccessor(dup)
+      case STRING.typeId  => new StringColumnAccessor(dup)
+      case BINARY.typeId  => new BinaryColumnAccessor(dup)
+      case GENERIC.typeId => new GenericColumnAccessor(dup)
     }
   }
 }
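The `duplicate()` call above is the heart of the SPARK-1436 fix: each accessor now reads through a fresh view of the buffer instead of mutating the shared one, so the same in-memory column can be scanned repeatedly. Note that `ByteBuffer.duplicate()` shares the underlying bytes but resets the byte order to big-endian, which is why `.order(ByteOrder.nativeOrder)` is re-applied. A standalone sketch of this behaviour (not part of the patch):

```scala
import java.nio.{ByteBuffer, ByteOrder}

object DuplicateDemo extends App {
  val buffer = ByteBuffer.allocate(8).order(ByteOrder.nativeOrder)
  buffer.putInt(42).putInt(43).rewind()

  // Each duplicate gets its own position and limit but shares the underlying
  // bytes, so reading through one duplicate leaves the original untouched.
  val dup1 = buffer.duplicate().order(ByteOrder.nativeOrder)
  val dup2 = buffer.duplicate().order(ByteOrder.nativeOrder)

  println(dup1.getInt())     // 42
  println(dup2.getInt())     // 42, unaffected by dup1's read
  println(buffer.position()) // still 0
}
```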
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/columnar/ColumnStats.scala b/sql/core/src/main/scala/org/apache/spark/sql/columnar/ColumnStats.scala
index 30c6bdc7912fc..95602d321dc6f 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/columnar/ColumnStats.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/columnar/ColumnStats.scala
@@ -20,6 +20,12 @@ package org.apache.spark.sql.columnar
 import org.apache.spark.sql.Row
 import org.apache.spark.sql.catalyst.types._
 
+/**
+ * Used to collect statistical information when building in-memory columns.
+ *
+ * NOTE: we intentionally avoid using `Ordering[T]` to compare values here because `Ordering[T]`
+ * incurs a significant performance penalty.
+ */
 private[sql] sealed abstract class ColumnStats[T <: DataType, JvmType] extends Serializable {
   /**
    * Closed lower bound of this column.
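The performance penalty the NOTE refers to comes largely from boxing: a generic `Ordering[Int].lt` call forces primitive values through `AnyRef` on the hot per-row path. A minimal sketch of the two styles, outside the patch, assuming the boxing explanation:

```scala
object OrderingCostSketch {
  // Generic path: every call boxes both Ints before comparing.
  def minGeneric[A](a: A, b: A)(implicit ord: Ordering[A]): A =
    if (ord.lt(a, b)) a else b

  // Specialized path, in the spirit of ColumnStats: a plain primitive
  // comparison, with no boxing on the per-value hot path.
  def minInt(a: Int, b: Int): Int = if (a < b) a else b

  def main(args: Array[String]): Unit = {
    println(minGeneric(3, 5)) // 3, via boxed comparison
    println(minInt(3, 5))     // 3, via primitive comparison
  }
}
```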
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/columnar/compression/CompressibleColumnBuilder.scala b/sql/core/src/main/scala/org/apache/spark/sql/columnar/compression/CompressibleColumnBuilder.scala
index 3ac4b358ddf83..fd3b1adf9687a 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/columnar/compression/CompressibleColumnBuilder.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/columnar/compression/CompressibleColumnBuilder.scala
@@ -47,9 +47,9 @@ private[sql] trait CompressibleColumnBuilder[T <: NativeType]
 
   import CompressionScheme._
 
-  val compressionEncoders = schemes.filter(_.supports(columnType)).map(_.encoder)
+  val compressionEncoders = schemes.filter(_.supports(columnType)).map(_.encoder[T])
 
-  protected def isWorthCompressing(encoder: Encoder) = {
+  protected def isWorthCompressing(encoder: Encoder[T]) = {
     encoder.compressionRatio < 0.8
   }
 
@@ -70,7 +70,7 @@ private[sql] trait CompressibleColumnBuilder[T <: NativeType]
   abstract override def build() = {
     val rawBuffer = super.build()
-    val encoder = {
+    val encoder: Encoder[T] = {
       val candidate = compressionEncoders.minBy(_.compressionRatio)
       if (isWorthCompressing(candidate)) candidate else PassThrough.encoder
     }
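The builder probes every applicable encoder while rows are appended, then at build time keeps the one with the lowest estimated ratio, falling back to `PassThrough` unless compression saves at least ~20%. A simplified, self-contained sketch of that selection logic (all names here are illustrative, not Spark's):

```scala
object EncoderSelectionSketch {
  // Stand-in for compression.Encoder: only the ratio estimate matters here.
  trait SizeEstimate {
    def name: String
    def compressedSize: Int
    def uncompressedSize: Int
    def compressionRatio: Double =
      if (uncompressedSize > 0) compressedSize.toDouble / uncompressedSize else 1.0
  }

  case class Probe(name: String, compressedSize: Int, uncompressedSize: Int) extends SizeEstimate

  def pickEncoder(candidates: Seq[SizeEstimate], passThrough: SizeEstimate): SizeEstimate = {
    val best = candidates.minBy(_.compressionRatio)
    // Mirrors isWorthCompressing: only compress when the estimated ratio beats 0.8.
    if (best.compressionRatio < 0.8) best else passThrough
  }

  def main(args: Array[String]): Unit = {
    val rle  = Probe("RLE", 40, 100)
    val dict = Probe("Dictionary", 90, 100)
    val raw  = Probe("PassThrough", 100, 100)
    println(pickEncoder(Seq(rle, dict), raw).name) // RLE
  }
}
```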
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/columnar/compression/CompressionScheme.scala b/sql/core/src/main/scala/org/apache/spark/sql/columnar/compression/CompressionScheme.scala
index d3a4ac8df926b..c605a8e4434e3 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/columnar/compression/CompressionScheme.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/columnar/compression/CompressionScheme.scala
@@ -22,10 +22,8 @@ import java.nio.ByteBuffer
 import org.apache.spark.sql.catalyst.types.NativeType
 import org.apache.spark.sql.columnar.{ColumnType, NativeColumnType}
 
-private[sql] trait Encoder {
-  def gatherCompressibilityStats[T <: NativeType](
-      value: T#JvmType,
-      columnType: ColumnType[T, T#JvmType]) {}
+private[sql] trait Encoder[T <: NativeType] {
+  def gatherCompressibilityStats(value: T#JvmType, columnType: NativeColumnType[T]) {}
 
   def compressedSize: Int
 
@@ -35,10 +33,7 @@ private[sql] trait Encoder {
     if (uncompressedSize > 0) compressedSize.toDouble / uncompressedSize else 1.0
   }
 
-  def compress[T <: NativeType](
-      from: ByteBuffer,
-      to: ByteBuffer,
-      columnType: ColumnType[T, T#JvmType]): ByteBuffer
+  def compress(from: ByteBuffer, to: ByteBuffer, columnType: NativeColumnType[T]): ByteBuffer
 }
 
 private[sql] trait Decoder[T <: NativeType] extends Iterator[T#JvmType]
 
@@ -48,7 +43,7 @@ private[sql] trait CompressionScheme {
 
   def supports(columnType: ColumnType[_, _]): Boolean
 
-  def encoder: Encoder
+  def encoder[T <: NativeType]: Encoder[T]
 
   def decoder[T <: NativeType](buffer: ByteBuffer, columnType: NativeColumnType[T]): Decoder[T]
 }
 
@@ -58,15 +53,18 @@ private[sql] trait WithCompressionSchemes {
 }
 
 private[sql] trait AllCompressionSchemes extends WithCompressionSchemes {
-  override val schemes: Seq[CompressionScheme] = {
-    Seq(PassThrough, RunLengthEncoding, DictionaryEncoding)
-  }
+  override val schemes: Seq[CompressionScheme] = CompressionScheme.all
}

private[sql] object CompressionScheme {
-  def apply(typeId: Int): CompressionScheme = typeId match {
-    case PassThrough.typeId => PassThrough
-    case _ => throw new UnsupportedOperationException()
+  val all: Seq[CompressionScheme] =
+    Seq(PassThrough, RunLengthEncoding, DictionaryEncoding, BooleanBitSet, IntDelta, LongDelta)
+
+  private val typeIdToScheme = all.map(scheme => scheme.typeId -> scheme).toMap
+
+  def apply(typeId: Int): CompressionScheme = {
+    typeIdToScheme.getOrElse(typeId, throw new UnsupportedOperationException(
+      s"Unrecognized compression scheme type ID: $typeId"))
   }
 
   def copyColumnHeader(from: ByteBuffer, to: ByteBuffer) {
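With the `typeIdToScheme` registry, an unknown scheme ID now fails loudly with a descriptive message instead of matching only `PassThrough`. The same lookup pattern, reduced to a self-contained sketch with illustrative scheme names:

```scala
object SchemeRegistrySketch {
  sealed trait Scheme { def typeId: Int }
  case object PassThrough extends Scheme { val typeId = 0 }
  case object RunLength   extends Scheme { val typeId = 1 }

  // Build the ID-to-scheme map once from the list of all schemes.
  private val byId: Map[Int, Scheme] =
    Seq(PassThrough, RunLength).map(s => s.typeId -> s).toMap

  def apply(typeId: Int): Scheme =
    byId.getOrElse(typeId, throw new UnsupportedOperationException(
      s"Unrecognized compression scheme type ID: $typeId"))

  def main(args: Array[String]): Unit = {
    println(SchemeRegistrySketch(1)) // RunLength
    // SchemeRegistrySketch(42)      // would throw UnsupportedOperationException
  }
}
```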
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/columnar/compression/compressionSchemes.scala b/sql/core/src/main/scala/org/apache/spark/sql/columnar/compression/compressionSchemes.scala
index dc2c153faf8ad..df8220b556edd 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/columnar/compression/compressionSchemes.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/columnar/compression/compressionSchemes.scala
@@ -24,7 +24,7 @@ import scala.reflect.ClassTag
 import scala.reflect.runtime.universe.runtimeMirror
 
 import org.apache.spark.sql.catalyst.expressions.GenericMutableRow
-import org.apache.spark.sql.catalyst.types.NativeType
+import org.apache.spark.sql.catalyst.types._
 import org.apache.spark.sql.columnar._
 
 private[sql] case object PassThrough extends CompressionScheme {
@@ -32,22 +32,18 @@ private[sql] case object PassThrough extends CompressionScheme {
 
   override def supports(columnType: ColumnType[_, _]) = true
 
-  override def encoder = new this.Encoder
+  override def encoder[T <: NativeType] = new this.Encoder[T]
 
   override def decoder[T <: NativeType](buffer: ByteBuffer, columnType: NativeColumnType[T]) = {
     new this.Decoder(buffer, columnType)
   }
 
-  class Encoder extends compression.Encoder {
+  class Encoder[T <: NativeType] extends compression.Encoder[T] {
     override def uncompressedSize = 0
 
     override def compressedSize = 0
 
-    override def compress[T <: NativeType](
-        from: ByteBuffer,
-        to: ByteBuffer,
-        columnType: ColumnType[T, T#JvmType]) = {
-
+    override def compress(from: ByteBuffer, to: ByteBuffer, columnType: NativeColumnType[T]) = {
       // Writes compression type ID and copies raw contents
       to.putInt(PassThrough.typeId).put(from).rewind()
       to
@@ -64,9 +60,9 @@ private[sql] case object PassThrough extends CompressionScheme {
 }
 
 private[sql] case object RunLengthEncoding extends CompressionScheme {
-  override def typeId = 1
+  override val typeId = 1
 
-  override def encoder = new this.Encoder
+  override def encoder[T <: NativeType] = new this.Encoder[T]
 
   override def decoder[T <: NativeType](buffer: ByteBuffer, columnType: NativeColumnType[T]) = {
     new this.Decoder(buffer, columnType)
@@ -77,7 +73,7 @@ private[sql] case object RunLengthEncoding extends CompressionScheme {
     case _ => false
   }
 
-  class Encoder extends compression.Encoder {
+  class Encoder[T <: NativeType] extends compression.Encoder[T] {
     private var _uncompressedSize = 0
     private var _compressedSize = 0
@@ -89,10 +85,7 @@ private[sql] case object RunLengthEncoding extends CompressionScheme {
     override def compressedSize = _compressedSize
 
-    override def gatherCompressibilityStats[T <: NativeType](
-        value: T#JvmType,
-        columnType: ColumnType[T, T#JvmType]) {
-
+    override def gatherCompressibilityStats(value: T#JvmType, columnType: NativeColumnType[T]) {
       val actualSize = columnType.actualSize(value)
       _uncompressedSize += actualSize
@@ -111,11 +104,7 @@ private[sql] case object RunLengthEncoding extends CompressionScheme {
     }
 
-    override def compress[T <: NativeType](
-        from: ByteBuffer,
-        to: ByteBuffer,
-        columnType: ColumnType[T, T#JvmType]) = {
-
+    override def compress(from: ByteBuffer, to: ByteBuffer, columnType: NativeColumnType[T]) = {
       to.putInt(RunLengthEncoding.typeId)
 
       if (from.hasRemaining) {
@@ -172,23 +161,23 @@ private[sql] case object RunLengthEncoding extends CompressionScheme {
 }
 
 private[sql] case object DictionaryEncoding extends CompressionScheme {
-  override def typeId: Int = 2
+  override val typeId = 2
 
   // 32K unique values allowed
-  private val MAX_DICT_SIZE = Short.MaxValue - 1
+  val MAX_DICT_SIZE = Short.MaxValue
 
   override def decoder[T <: NativeType](buffer: ByteBuffer, columnType: NativeColumnType[T]) = {
-    new this.Decoder[T](buffer, columnType)
+    new this.Decoder(buffer, columnType)
   }
 
-  override def encoder = new this.Encoder
+  override def encoder[T <: NativeType] = new this.Encoder[T]
 
   override def supports(columnType: ColumnType[_, _]) = columnType match {
     case INT | LONG | STRING => true
     case _ => false
   }
 
-  class Encoder extends compression.Encoder{
+  class Encoder[T <: NativeType] extends compression.Encoder[T] {
     // Size of the input, uncompressed, in bytes. Note that we only count until the dictionary
     // overflows.
     private var _uncompressedSize = 0
@@ -201,7 +190,7 @@ private[sql] case object DictionaryEncoding extends CompressionScheme {
     private var count = 0
 
     // The reverse mapping of _dictionary, i.e. mapping encoded integer to the value itself.
-    private var values = new mutable.ArrayBuffer[Any](1024)
+    private var values = new mutable.ArrayBuffer[T#JvmType](1024)
 
     // The dictionary that maps a value to the encoded short integer.
     private val dictionary = mutable.HashMap.empty[Any, Short]
@@ -210,10 +199,7 @@ private[sql] case object DictionaryEncoding extends CompressionScheme {
     // to store dictionary element count.
     private var dictionarySize = 4
 
-    override def gatherCompressibilityStats[T <: NativeType](
-        value: T#JvmType,
-        columnType: ColumnType[T, T#JvmType]) {
-
+    override def gatherCompressibilityStats(value: T#JvmType, columnType: NativeColumnType[T]) {
       if (!overflow) {
         val actualSize = columnType.actualSize(value)
         count += 1
@@ -234,11 +220,7 @@ private[sql] case object DictionaryEncoding extends CompressionScheme {
       }
     }
 
-    override def compress[T <: NativeType](
-        from: ByteBuffer,
-        to: ByteBuffer,
-        columnType: ColumnType[T, T#JvmType]) = {
-
+    override def compress(from: ByteBuffer, to: ByteBuffer, columnType: NativeColumnType[T]) = {
       if (overflow) {
         throw new IllegalStateException(
           "Dictionary encoding should not be used because of dictionary overflow.")
@@ -249,7 +231,7 @@ private[sql] case object DictionaryEncoding extends CompressionScheme {
 
       var i = 0
       while (i < values.length) {
-        columnType.append(values(i).asInstanceOf[T#JvmType], to)
+        columnType.append(values(i), to)
         i += 1
       }
@@ -286,3 +268,215 @@ private[sql] case object DictionaryEncoding extends CompressionScheme {
     override def hasNext = buffer.hasRemaining
   }
 }
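DictionaryEncoding replaces each value with a 2-byte code pointing into a dictionary written ahead of the data, so it pays off only when values repeat and the number of distinct values stays at or below `MAX_DICT_SIZE`. A toy version of the same idea in plain Scala, ignoring the `ByteBuffer` framing (all names here are illustrative):

```scala
object DictionarySketch {
  /** Encodes `input` as (dictionary, codes); fails past `maxSize` distinct values. */
  def encode[A](input: Seq[A], maxSize: Int = Short.MaxValue): (Vector[A], Seq[Short]) = {
    val dict = scala.collection.mutable.LinkedHashMap.empty[A, Short]
    val codes = input.map { v =>
      dict.getOrElseUpdate(v, {
        require(dict.size < maxSize, "dictionary overflow, give up compressing")
        dict.size.toShort // codes are assigned in first-appearance order
      })
    }
    (dict.keys.toVector, codes)
  }

  def decode[A](dict: Vector[A], codes: Seq[Short]): Seq[A] = codes.map(dict(_))

  def main(args: Array[String]): Unit = {
    val (dict, codes) = encode(Seq("a", "b", "a", "b"))
    println(dict)                // Vector(a, b)
    println(codes)               // List(0, 1, 0, 1)
    println(decode(dict, codes)) // List(a, b, a, b)
  }
}
```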
+
+private[sql] case object BooleanBitSet extends CompressionScheme {
+  override val typeId = 3
+
+  val BITS_PER_LONG = 64
+
+  override def decoder[T <: NativeType](buffer: ByteBuffer, columnType: NativeColumnType[T]) = {
+    new this.Decoder(buffer).asInstanceOf[compression.Decoder[T]]
+  }
+
+  override def encoder[T <: NativeType] = (new this.Encoder).asInstanceOf[compression.Encoder[T]]
+
+  override def supports(columnType: ColumnType[_, _]) = columnType == BOOLEAN
+
+  class Encoder extends compression.Encoder[BooleanType.type] {
+    private var _uncompressedSize = 0
+
+    override def gatherCompressibilityStats(
+        value: Boolean,
+        columnType: NativeColumnType[BooleanType.type]) {
+
+      _uncompressedSize += BOOLEAN.defaultSize
+    }
+
+    override def compress(
+        from: ByteBuffer,
+        to: ByteBuffer,
+        columnType: NativeColumnType[BooleanType.type]) = {
+
+      to.putInt(BooleanBitSet.typeId)
+        // Total element count (1 byte per Boolean value)
+        .putInt(from.remaining)
+
+      while (from.remaining >= BITS_PER_LONG) {
+        var word = 0: Long
+        var i = 0
+
+        while (i < BITS_PER_LONG) {
+          if (BOOLEAN.extract(from)) {
+            word |= (1: Long) << i
+          }
+          i += 1
+        }
+
+        to.putLong(word)
+      }
+
+      if (from.hasRemaining) {
+        var word = 0: Long
+        var i = 0
+
+        while (from.hasRemaining) {
+          if (BOOLEAN.extract(from)) {
+            word |= (1: Long) << i
+          }
+          i += 1
+        }
+
+        to.putLong(word)
+      }
+
+      to.rewind()
+      to
+    }
+
+    override def uncompressedSize = _uncompressedSize
+
+    override def compressedSize = {
+      val extra = if (_uncompressedSize % BITS_PER_LONG == 0) 0 else 1
+      (_uncompressedSize / BITS_PER_LONG + extra) * 8 + 4
+    }
+  }
+
+  class Decoder(buffer: ByteBuffer) extends compression.Decoder[BooleanType.type] {
+    private val count = buffer.getInt()
+
+    private var currentWord = 0: Long
+
+    private var visited: Int = 0
+
+    override def next(): Boolean = {
+      val bit = visited % BITS_PER_LONG
+
+      visited += 1
+      if (bit == 0) {
+        currentWord = buffer.getLong()
+      }
+
+      ((currentWord >> bit) & 1) != 0
+    }
+
+    override def hasNext: Boolean = visited < count
+  }
+}
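BooleanBitSet packs 64 boolean values into each `Long` word, cutting a 1-byte-per-value column down to roughly one bit per value plus a 4-byte element count. The word layout can be sketched independently of the scheme's buffer plumbing:

```scala
object BitPackSketch {
  val BitsPerLong = 64

  def pack(values: Seq[Boolean]): Array[Long] = {
    // One word per 64 values, rounded up for any spill-over bits.
    val words = new Array[Long]((values.length + BitsPerLong - 1) / BitsPerLong)
    for ((v, i) <- values.zipWithIndex if v) {
      words(i / BitsPerLong) |= 1L << (i % BitsPerLong)
    }
    words
  }

  def unpack(words: Array[Long], count: Int): Seq[Boolean] =
    (0 until count).map(i => ((words(i / BitsPerLong) >> (i % BitsPerLong)) & 1L) != 0)

  def main(args: Array[String]): Unit = {
    val input = Seq(true, false, true, true)
    val words = pack(input)
    println(unpack(words, input.length) == input) // true
  }
}
```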
+
+private[sql] sealed abstract class IntegralDelta[I <: IntegralType] extends CompressionScheme {
+  override def decoder[T <: NativeType](buffer: ByteBuffer, columnType: NativeColumnType[T]) = {
+    new this.Decoder(buffer, columnType.asInstanceOf[NativeColumnType[I]])
+      .asInstanceOf[compression.Decoder[T]]
+  }
+
+  override def encoder[T <: NativeType] = (new this.Encoder).asInstanceOf[compression.Encoder[T]]
+
+  /**
+   * Computes `delta = x - y`, returns `(true, delta)` if `delta` can fit into a single byte, or
+   * `(false, 0: Byte)` otherwise.
+   */
+  protected def byteSizedDelta(x: I#JvmType, y: I#JvmType): (Boolean, Byte)
+
+  /**
+   * Simply computes `x + delta`
+   */
+  protected def addDelta(x: I#JvmType, delta: Byte): I#JvmType
+
+  class Encoder extends compression.Encoder[I] {
+    private var _compressedSize: Int = 0
+
+    private var _uncompressedSize: Int = 0
+
+    private var prev: I#JvmType = _
+
+    private var initial = true
+
+    override def gatherCompressibilityStats(value: I#JvmType, columnType: NativeColumnType[I]) {
+      _uncompressedSize += columnType.defaultSize
+
+      if (initial) {
+        initial = false
+        _compressedSize += 1 + columnType.defaultSize
+      } else {
+        val (smallEnough, _) = byteSizedDelta(value, prev)
+        _compressedSize += (if (smallEnough) 1 else 1 + columnType.defaultSize)
+      }
+
+      // Tracks the previous value so the next delta is computed against it
+      prev = value
+    }
+
+    override def compress(from: ByteBuffer, to: ByteBuffer, columnType: NativeColumnType[I]) = {
+      to.putInt(typeId)
+
+      if (from.hasRemaining) {
+        var prev = columnType.extract(from)
+
+        // The first value is always stored verbatim, prefixed with an escaping mark
+        to.put(Byte.MinValue)
+        columnType.append(prev, to)
+
+        while (from.hasRemaining) {
+          val current = columnType.extract(from)
+          val (smallEnough, delta) = byteSizedDelta(current, prev)
+          prev = current
+
+          if (smallEnough) {
+            to.put(delta)
+          } else {
+            to.put(Byte.MinValue)
+            columnType.append(current, to)
+          }
+        }
+      }
+
+      to.rewind()
+      to
+    }
+
+    override def uncompressedSize = _uncompressedSize
+
+    override def compressedSize = _compressedSize
+  }
+
+  class Decoder(buffer: ByteBuffer, columnType: NativeColumnType[I])
+    extends compression.Decoder[I] {
+
+    private var prev: I#JvmType = _
+
+    override def next() = {
+      val delta = buffer.get()
+      // An escaping mark (Byte.MinValue) means the next field holds a full value
+      prev = if (delta > Byte.MinValue) addDelta(prev, delta) else columnType.extract(buffer)
+      prev
+    }
+
+    override def hasNext = buffer.hasRemaining
+  }
+}
+
+private[sql] case object IntDelta extends IntegralDelta[IntegerType.type] {
+  override val typeId = 4
+
+  override def supports(columnType: ColumnType[_, _]) = columnType == INT
+
+  override protected def addDelta(x: Int, delta: Byte) = x + delta
+
+  override protected def byteSizedDelta(x: Int, y: Int): (Boolean, Byte) = {
+    val delta = x - y
+    // Byte.MinValue is reserved as the escaping mark, so a delta is only byte-sized
+    // when it falls within [-Byte.MaxValue, Byte.MaxValue]
+    if (math.abs(delta) <= Byte.MaxValue) (true, delta.toByte) else (false, 0: Byte)
+  }
+}
+
+private[sql] case object LongDelta extends IntegralDelta[LongType.type] {
+  override val typeId = 5
+
+  override def supports(columnType: ColumnType[_, _]) = columnType == LONG
+
+  override protected def addDelta(x: Long, delta: Byte) = x + delta
+
+  override protected def byteSizedDelta(x: Long, y: Long): (Boolean, Byte) = {
+    val delta = x - y
+    if (math.abs(delta) <= Byte.MaxValue) (true, delta.toByte) else (false, 0: Byte)
+  }
+}
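IntDelta and LongDelta store most values as one-byte deltas from the previous value, with `Byte.MinValue` reserved as an escape mark announcing a full verbatim value. The byte-level framing can be traced for the test suite's `Seq(1, 2, 130)` input; the sketch below is illustrative and not the actual encoder:

```scala
object DeltaFramingSketch {
  /** Frames Int deltas: Left(delta) costs 1 byte, Right(full) costs escape + 4 bytes. */
  def frame(input: Seq[Int]): Seq[Either[Byte, Int]] =
    if (input.isEmpty) Seq.empty
    else {
      // The first value is always escaped and stored verbatim.
      Right(input.head) +: (input.tail, input.init).zipped.map { (cur, prev) =>
        val delta = cur - prev
        if (math.abs(delta) <= Byte.MaxValue) Left(delta.toByte) else Right(cur)
      }
    }

  def main(args: Array[String]): Unit = {
    // Seq(1, 2, 130): deltas are +1 (fits in a byte) and +128 (does not).
    println(frame(Seq(1, 2, 130)))
    // List(Right(1), Left(1), Right(130))
  }
}
```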
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/columnar/ColumnarQuerySuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/columnar/ColumnarQuerySuite.scala
index 70b2e851737f8..2ed4cf2170f9d 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/columnar/ColumnarQuerySuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/columnar/ColumnarQuerySuite.scala
@@ -31,4 +31,12 @@ class ColumnarQuerySuite extends QueryTest {
     checkAnswer(scan, testData.collect().toSeq)
   }
+
+  test("SPARK-1436 regression: in-memory columns must be able to be accessed multiple times") {
+    val plan = TestSQLContext.executePlan(testData.logicalPlan).executedPlan
+    val scan = SparkLogicalPlan(InMemoryColumnarTableScan(plan.output, plan))
+
+    checkAnswer(scan, testData.collect().toSeq)
+    checkAnswer(scan, testData.collect().toSeq)
+  }
 }
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/columnar/compression/BooleanBitSetSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/columnar/compression/BooleanBitSetSuite.scala
new file mode 100644
index 0000000000000..a754f98f7fbf1
--- /dev/null
+++ b/sql/core/src/test/scala/org/apache/spark/sql/columnar/compression/BooleanBitSetSuite.scala
@@ -0,0 +1,98 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.columnar.compression
+
+import org.scalatest.FunSuite
+
+import org.apache.spark.sql.Row
+import org.apache.spark.sql.columnar.{BOOLEAN, BooleanColumnStats}
+import org.apache.spark.sql.columnar.ColumnarTestUtils._
+
+class BooleanBitSetSuite extends FunSuite {
+  import BooleanBitSet._
+
+  def skeleton(count: Int) {
+    // -------------
+    // Tests encoder
+    // -------------
+
+    val builder = TestCompressibleColumnBuilder(new BooleanColumnStats, BOOLEAN, BooleanBitSet)
+    val rows = Seq.fill[Row](count)(makeRandomRow(BOOLEAN))
+    val values = rows.map(_.head)
+
+    rows.foreach(builder.appendFrom(_, 0))
+    val buffer = builder.build()
+
+    // Column type ID + null count + null positions
+    val headerSize = CompressionScheme.columnHeaderSize(buffer)
+
+    // Compression scheme ID + element count + bitset words
+    val compressedSize = 4 + 4 + {
+      val extra = if (count % BITS_PER_LONG == 0) 0 else 1
+      (count / BITS_PER_LONG + extra) * 8
+    }
+
+    expectResult(headerSize + compressedSize, "Wrong buffer capacity")(buffer.capacity)
+
+    // Skips column header
+    buffer.position(headerSize)
+    expectResult(BooleanBitSet.typeId, "Wrong compression scheme ID")(buffer.getInt())
+    expectResult(count, "Wrong element count")(buffer.getInt())
+
+    var word = 0: Long
+    for (i <- 0 until count) {
+      val bit = i % BITS_PER_LONG
+      word = if (bit == 0) buffer.getLong() else word
+      expectResult(values(i), s"Wrong value in compressed buffer, index=$i") {
+        (word & ((1: Long) << bit)) != 0
+      }
+    }
+
+    // -------------
+    // Tests decoder
+    // -------------
+
+    // Rewinds, skips column header and 4 more bytes for compression scheme ID
+    buffer.rewind().position(headerSize + 4)
+
+    val decoder = BooleanBitSet.decoder(buffer, BOOLEAN)
+    values.foreach(expectResult(_, "Wrong decoded value")(decoder.next()))
+    assert(!decoder.hasNext)
+  }
+
+  test(s"$BooleanBitSet: empty") {
+    skeleton(0)
+  }
+
+  test(s"$BooleanBitSet: less than 1 word") {
+    skeleton(BITS_PER_LONG - 1)
+  }
+
+  test(s"$BooleanBitSet: exactly 1 word") {
+    skeleton(BITS_PER_LONG)
+  }
+
+  test(s"$BooleanBitSet: multiple whole words") {
+    skeleton(BITS_PER_LONG * 2)
+  }
+
+  test(s"$BooleanBitSet: multiple words and 1 more bit") {
+    skeleton(BITS_PER_LONG * 2 + 1)
+  }
+}
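The suite's expected buffer capacity is pure arithmetic: 64 booleans per word, 8 bytes per word, plus 4 bytes each for the scheme ID and the element count. A quick sanity check of that formula (a hypothetical helper mirroring the suite's computation, not suite code):

```scala
object BooleanBitSetSizeCheck {
  val BitsPerLong = 64

  /** Bytes after the column header: scheme ID + count + ceil(count/64) words. */
  def compressedBodySize(count: Int): Int = {
    val words = (count + BitsPerLong - 1) / BitsPerLong
    4 + 4 + words * 8
  }

  def main(args: Array[String]): Unit = {
    println(compressedBodySize(0))  // 8  (no words at all)
    println(compressedBodySize(64)) // 16 (exactly one word)
    println(compressedBodySize(65)) // 24 (one spill-over bit costs a full word)
  }
}
```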
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/columnar/compression/DictionaryEncodingSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/columnar/compression/DictionaryEncodingSuite.scala
index 184691ab5b46a..eab27987e08ea 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/columnar/compression/DictionaryEncodingSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/columnar/compression/DictionaryEncodingSuite.scala
@@ -24,7 +24,6 @@ import org.scalatest.FunSuite
 import org.apache.spark.sql.catalyst.types.NativeType
 import org.apache.spark.sql.columnar._
 import org.apache.spark.sql.columnar.ColumnarTestUtils._
-import org.apache.spark.sql.catalyst.expressions.GenericMutableRow
 
 class DictionaryEncodingSuite extends FunSuite {
   testDictionaryEncoding(new IntColumnStats, INT)
@@ -41,73 +40,82 @@ class DictionaryEncodingSuite extends FunSuite {
       (0 until buffer.getInt()).map(columnType.extract(buffer) -> _.toShort).toMap
     }
 
-    test(s"$DictionaryEncoding with $typeName: simple case") {
+    // `Seq.distinct` keeps the first occurrence of each element in encounter order,
+    // which is exactly the order in which DictionaryEncoding assigns codes.
+    def stableDistinct(seq: Seq[Int]): Seq[Int] = seq.distinct
+
+    def skeleton(uniqueValueCount: Int, inputSeq: Seq[Int]) {
       // -------------
       // Tests encoder
       // -------------
 
       val builder = TestCompressibleColumnBuilder(columnStats, columnType, DictionaryEncoding)
-      val (values, rows) = makeUniqueValuesAndSingleValueRows(columnType, 2)
-
-      builder.initialize(0)
-      builder.appendFrom(rows(0), 0)
-      builder.appendFrom(rows(1), 0)
-      builder.appendFrom(rows(0), 0)
-      builder.appendFrom(rows(1), 0)
-
-      val buffer = builder.build()
-      val headerSize = CompressionScheme.columnHeaderSize(buffer)
-      // 4 extra bytes for dictionary size
-      val dictionarySize = 4 + values.map(columnType.actualSize).sum
-      // 4 `Short`s, 2 bytes each
-      val compressedSize = dictionarySize + 2 * 4
-      // 4 extra bytes for compression scheme type ID
-      expectResult(headerSize + 4 + compressedSize, "Wrong buffer capacity")(buffer.capacity)
-
-      // Skips column header
-      buffer.position(headerSize)
-      expectResult(DictionaryEncoding.typeId, "Wrong compression scheme ID")(buffer.getInt())
-
-      val dictionary = buildDictionary(buffer)
-      Array[Short](0, 1).foreach { i =>
-        expectResult(i, "Wrong dictionary entry")(dictionary(values(i)))
-      }
-
-      Array[Short](0, 1, 0, 1).foreach {
-        expectResult(_, "Wrong column element value")(buffer.getShort())
+      val (values, rows) = makeUniqueValuesAndSingleValueRows(columnType, uniqueValueCount)
+      val dictValues = stableDistinct(inputSeq)
+
+      inputSeq.foreach(i => builder.appendFrom(rows(i), 0))
+
+      if (dictValues.length > DictionaryEncoding.MAX_DICT_SIZE) {
+        withClue("Dictionary overflowed, compression should fail") {
+          intercept[Throwable] {
+            builder.build()
+          }
+        }
+      } else {
+        val buffer = builder.build()
+        val headerSize = CompressionScheme.columnHeaderSize(buffer)
+        // 4 extra bytes for dictionary size
+        val dictionarySize = 4 + values.map(columnType.actualSize).sum
+        // Compression scheme ID + dictionary + 2 bytes for each encoded `Short`
+        val compressedSize = 4 + dictionarySize + 2 * inputSeq.length
+        expectResult(headerSize + compressedSize, "Wrong buffer capacity")(buffer.capacity)
+
+        // Skips column header
+        buffer.position(headerSize)
+        expectResult(DictionaryEncoding.typeId, "Wrong compression scheme ID")(buffer.getInt())
+
+        val dictionary = buildDictionary(buffer).toMap
+
+        dictValues.foreach { i =>
+          expectResult(i, "Wrong dictionary entry") {
+            dictionary(values(i))
+          }
+        }
+
+        inputSeq.foreach { i =>
+          expectResult(i.toShort, "Wrong column element value")(buffer.getShort())
+        }
+
+        // -------------
+        // Tests decoder
+        // -------------
+
+        // Rewinds, skips column header and 4 more bytes for compression scheme ID
+        buffer.rewind().position(headerSize + 4)
+
+        val decoder = DictionaryEncoding.decoder(buffer, columnType)
+
+        inputSeq.foreach { i =>
+          expectResult(values(i), "Wrong decoded value")(decoder.next())
+        }
+
+        assert(!decoder.hasNext)
       }
-
-      // -------------
-      // Tests decoder
-      // -------------
-
-      // Rewinds, skips column header and 4 more bytes for compression scheme ID
-      buffer.rewind().position(headerSize + 4)
-
-      val decoder = new DictionaryEncoding.Decoder[T](buffer, columnType)
-
-      Array[Short](0, 1, 0, 1).foreach { i =>
-        expectResult(values(i), "Wrong decoded value")(decoder.next())
-      }
-
-      assert(!decoder.hasNext)
     }
-  }
 
-  test(s"$DictionaryEncoding: overflow") {
-    val builder = TestCompressibleColumnBuilder(new IntColumnStats, INT, DictionaryEncoding)
-    builder.initialize(0)
+    test(s"$DictionaryEncoding with $typeName: empty") {
+      skeleton(0, Seq.empty)
+    }
 
-    (0 to Short.MaxValue).foreach { n =>
-      val row = new GenericMutableRow(1)
-      row.setInt(0, n)
-      builder.appendFrom(row, 0)
+    test(s"$DictionaryEncoding with $typeName: simple case") {
+      skeleton(2, Seq(0, 1, 0, 1))
     }
 
-    withClue("Dictionary overflowed, encoding should fail") {
-      intercept[Throwable] {
-        builder.build()
-      }
+    test(s"$DictionaryEncoding with $typeName: dictionary overflow") {
+      skeleton(DictionaryEncoding.MAX_DICT_SIZE + 1, 0 to DictionaryEncoding.MAX_DICT_SIZE)
     }
   }
 }
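`stableDistinct` exists because dictionary codes are assigned in first-appearance order, so the expected dictionary must preserve that order too; `Seq.distinct` gives exactly that, as a quick check shows:

```scala
object StableDistinctDemo extends App {
  val input = Seq(0, 1, 0, 2, 1)
  // Seq.distinct keeps the first occurrence of each element, in encounter order,
  // matching the order DictionaryEncoding assigns codes in.
  println(input.distinct) // List(0, 1, 2)
}
```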
test(s"$DictionaryEncoding with $typeName: simple case") { + skeleton(2, Seq(0, 1, 0, 1)) } - withClue("Dictionary overflowed, encoding should fail") { - intercept[Throwable] { - builder.build() - } + test(s"$DictionaryEncoding with $typeName: dictionary overflow") { + skeleton(DictionaryEncoding.MAX_DICT_SIZE + 1, 0 to DictionaryEncoding.MAX_DICT_SIZE) } } } diff --git a/sql/core/src/test/scala/org/apache/spark/sql/columnar/compression/IntegralDeltaSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/columnar/compression/IntegralDeltaSuite.scala new file mode 100644 index 0000000000000..1390e5eef6106 --- /dev/null +++ b/sql/core/src/test/scala/org/apache/spark/sql/columnar/compression/IntegralDeltaSuite.scala @@ -0,0 +1,115 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.sql.columnar.compression + +import org.scalatest.FunSuite + +import org.apache.spark.sql.catalyst.expressions.GenericMutableRow +import org.apache.spark.sql.catalyst.types.IntegralType +import org.apache.spark.sql.columnar._ + +class IntegralDeltaSuite extends FunSuite { + testIntegralDelta(new IntColumnStats, INT, IntDelta) + testIntegralDelta(new LongColumnStats, LONG, LongDelta) + + def testIntegralDelta[I <: IntegralType]( + columnStats: NativeColumnStats[I], + columnType: NativeColumnType[I], + scheme: IntegralDelta[I]) { + + def skeleton(input: Seq[I#JvmType]) { + // ------------- + // Tests encoder + // ------------- + + val builder = TestCompressibleColumnBuilder(columnStats, columnType, scheme) + val deltas = if (input.isEmpty) { + Seq.empty[Long] + } else { + (input.tail, input.init).zipped.map { + case (x: Int, y: Int) => (x - y).toLong + case (x: Long, y: Long) => x - y + } + } + + input.map { value => + val row = new GenericMutableRow(1) + columnType.setField(row, 0, value) + builder.appendFrom(row, 0) + } + + val buffer = builder.build() + // Column type ID + null count + null positions + val headerSize = CompressionScheme.columnHeaderSize(buffer) + + // Compression scheme ID + compressed contents + val compressedSize = 4 + (if (deltas.isEmpty) { + 0 + } else { + val oneBoolean = columnType.defaultSize + 1 + oneBoolean + deltas.map { + d => if (math.abs(d) < Byte.MaxValue) 1 else 1 + oneBoolean + }.sum + }) + + // 4 extra bytes for compression scheme type ID + expectResult(headerSize + compressedSize, "Wrong buffer capacity")(buffer.capacity) + + buffer.position(headerSize) + expectResult(scheme.typeId, "Wrong compression scheme ID")(buffer.getInt()) + + if (input.nonEmpty) { + expectResult(Byte.MinValue, "The first byte should be an escaping mark")(buffer.get()) + expectResult(input.head, "The first value is wrong")(columnType.extract(buffer)) + + (input.tail, deltas).zipped.foreach { (value, delta) => + if (delta < 
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/columnar/compression/RunLengthEncodingSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/columnar/compression/RunLengthEncodingSuite.scala
index 2089ad120d4f2..89f9b60a4397b 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/columnar/compression/RunLengthEncodingSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/columnar/compression/RunLengthEncodingSuite.scala
@@ -37,34 +37,39 @@ class RunLengthEncodingSuite extends FunSuite {
 
   val typeName = columnType.getClass.getSimpleName.stripSuffix("$")
 
-  test(s"$RunLengthEncoding with $typeName: simple case") {
+  def skeleton(uniqueValueCount: Int, inputRuns: Seq[(Int, Int)]) {
     // -------------
     // Tests encoder
     // -------------
 
     val builder = TestCompressibleColumnBuilder(columnStats, columnType, RunLengthEncoding)
-    val (values, rows) = makeUniqueValuesAndSingleValueRows(columnType, 2)
-
-    builder.initialize(0)
-    builder.appendFrom(rows(0), 0)
-    builder.appendFrom(rows(0), 0)
-    builder.appendFrom(rows(1), 0)
-    builder.appendFrom(rows(1), 0)
+    val (values, rows) = makeUniqueValuesAndSingleValueRows(columnType, uniqueValueCount)
+    val inputSeq = inputRuns.flatMap { case (index, run) =>
+      Seq.fill(run)(index)
+    }
 
+    inputSeq.foreach(i => builder.appendFrom(rows(i), 0))
     val buffer = builder.build()
+
+    // Column type ID + null count + null positions
     val headerSize = CompressionScheme.columnHeaderSize(buffer)
-    // 4 extra bytes each run for run length
-    val compressedSize = values.map(columnType.actualSize(_) + 4).sum
-    // 4 extra bytes for compression scheme type ID
-    expectResult(headerSize + 4 + compressedSize, "Wrong buffer capacity")(buffer.capacity)
+
+    // Compression scheme ID + compressed contents
+    val compressedSize = 4 + inputRuns.map { case (index, _) =>
+      // 4 extra bytes each run for run length
+      columnType.actualSize(values(index)) + 4
+    }.sum
+
+    expectResult(headerSize + compressedSize, "Wrong buffer capacity")(buffer.capacity)
 
     // Skips column header
     buffer.position(headerSize)
     expectResult(RunLengthEncoding.typeId, "Wrong compression scheme ID")(buffer.getInt())
 
-    Array(0, 1).foreach { i =>
-      expectResult(values(i), "Wrong column element value")(columnType.extract(buffer))
-      expectResult(2, "Wrong run length")(buffer.getInt())
+    inputRuns.foreach { case (index, run) =>
+      expectResult(values(index), "Wrong column element value")(columnType.extract(buffer))
+      expectResult(run, "Wrong run length")(buffer.getInt())
     }
 
     // -------------
@@ -74,57 +79,29 @@ class RunLengthEncodingSuite extends FunSuite {
     // Rewinds, skips column header and 4 more bytes for compression scheme ID
     buffer.rewind().position(headerSize + 4)
 
-    val decoder = new RunLengthEncoding.Decoder[T](buffer, columnType)
+    val decoder = RunLengthEncoding.decoder(buffer, columnType)
 
-    Array(0, 0, 1, 1).foreach { i =>
+    inputSeq.foreach { i =>
       expectResult(values(i), "Wrong decoded value")(decoder.next())
     }
 
     assert(!decoder.hasNext)
   }
 
-  test(s"$RunLengthEncoding with $typeName: run length == 1") {
-    // -------------
-    // Tests encoder
-    // -------------
-
-    val builder = TestCompressibleColumnBuilder(columnStats, columnType, RunLengthEncoding)
-    val (values, rows) = makeUniqueValuesAndSingleValueRows(columnType, 2)
-
-    builder.initialize(0)
-    builder.appendFrom(rows(0), 0)
-    builder.appendFrom(rows(1), 0)
-
-    val buffer = builder.build()
-    val headerSize = CompressionScheme.columnHeaderSize(buffer)
-    // 4 bytes each run for run length
-    val compressedSize = values.map(columnType.actualSize(_) + 4).sum
-    // 4 bytes for compression scheme type ID
-    expectResult(headerSize + 4 + compressedSize, "Wrong buffer capacity")(buffer.capacity)
-
-    // Skips column header
-    buffer.position(headerSize)
-    expectResult(RunLengthEncoding.typeId, "Wrong compression scheme ID")(buffer.getInt())
-
-    Array(0, 1).foreach { i =>
-      expectResult(values(i), "Wrong column element value")(columnType.extract(buffer))
-      expectResult(1, "Wrong run length")(buffer.getInt())
-    }
-
-    // -------------
-    // Tests decoder
-    // -------------
-
-    // Rewinds, skips column header and 4 more bytes for compression scheme ID
-    buffer.rewind().position(headerSize + 4)
+  test(s"$RunLengthEncoding with $typeName: empty column") {
+    skeleton(0, Seq.empty)
+  }
 
-    val decoder = new RunLengthEncoding.Decoder[T](buffer, columnType)
+  test(s"$RunLengthEncoding with $typeName: simple case") {
+    skeleton(2, Seq(0 -> 2, 1 -> 2))
+  }
 
-    Array(0, 1).foreach { i =>
-      expectResult(values(i), "Wrong decoded value")(decoder.next())
-    }
+  test(s"$RunLengthEncoding with $typeName: run length == 1") {
+    skeleton(2, Seq(0 -> 1, 1 -> 1))
+  }
 
-    assert(!decoder.hasNext)
+  test(s"$RunLengthEncoding with $typeName: single long run") {
+    skeleton(1, Seq(0 -> 1000))
   }
 }
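RunLengthEncoding writes each run as a value followed by a 4-byte run length, so `skeleton(1, Seq(0 -> 1000))` compresses a thousand identical values into a single pair. A toy run-length coder over plain Seqs, independent of the buffer layout:

```scala
object RunLengthSketch {
  /** Collapses consecutive duplicates into (value, runLength) pairs. */
  def encode[A](input: Seq[A]): Seq[(A, Int)] =
    input.foldLeft(List.empty[(A, Int)]) {
      case ((v, n) :: rest, x) if v == x => (v, n + 1) :: rest
      case (acc, x)                      => (x, 1) :: acc
    }.reverse

  def decode[A](runs: Seq[(A, Int)]): Seq[A] =
    runs.flatMap { case (v, n) => Seq.fill(n)(v) }

  def main(args: Array[String]): Unit = {
    println(encode(Seq(0, 0, 1, 1)))     // List((0,2), (1,2))
    println(decode(Seq((0, 2), (1, 2)))) // List(0, 0, 1, 1)
  }
}
```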
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/columnar/compression/TestCompressibleColumnBuilder.scala b/sql/core/src/test/scala/org/apache/spark/sql/columnar/compression/TestCompressibleColumnBuilder.scala
index e0ec812863dcf..81bf5e99d19b9 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/columnar/compression/TestCompressibleColumnBuilder.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/columnar/compression/TestCompressibleColumnBuilder.scala
@@ -28,7 +28,7 @@ class TestCompressibleColumnBuilder[T <: NativeType](
   with NullableColumnBuilder
   with CompressibleColumnBuilder[T] {
 
-  override protected def isWorthCompressing(encoder: Encoder) = true
+  override protected def isWorthCompressing(encoder: Encoder[T]) = true
 }
 
 object TestCompressibleColumnBuilder {
@@ -37,7 +37,9 @@ object TestCompressibleColumnBuilder {
       columnType: NativeColumnType[T],
       scheme: CompressionScheme) = {
 
-    new TestCompressibleColumnBuilder(columnStats, columnType, Seq(scheme))
+    val builder = new TestCompressibleColumnBuilder(columnStats, columnType, Seq(scheme))
+    builder.initialize(0)
+    builder
   }
 }
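Since `TestCompressibleColumnBuilder.apply` now calls `initialize(0)` itself, test code can append rows immediately. Roughly, assuming the suites' own helpers (`makeRandomRow` from `ColumnarTestUtils` is used here the same way the suites above use it):

```scala
import org.apache.spark.sql.columnar.{INT, IntColumnStats}
import org.apache.spark.sql.columnar.ColumnarTestUtils._

// Sketch of the usage the change enables: the factory returns an
// already-initialized builder, so no explicit initialize(0) call is needed.
val builder = TestCompressibleColumnBuilder(new IntColumnStats, INT, RunLengthEncoding)
builder.appendFrom(makeRandomRow(INT), 0)
val buffer = builder.build()
```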