Skip to content

Commit

Permalink
Added test suite for BooleanBitSet, refactored other test suites
Browse files Browse the repository at this point in the history
  • Loading branch information
liancheng committed Apr 5, 2014
1 parent 44fe4b2 commit 3c1ad7a
Show file tree
Hide file tree
Showing 5 changed files with 183 additions and 72 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -164,7 +164,7 @@ private[sql] case object DictionaryEncoding extends CompressionScheme {
override val typeId = 2

// 32K unique values allowed
private val MAX_DICT_SIZE = Short.MaxValue - 1
val MAX_DICT_SIZE = Short.MaxValue

override def decoder[T <: NativeType](buffer: ByteBuffer, columnType: NativeColumnType[T]) = {
new this.Decoder(buffer, columnType)
Expand Down Expand Up @@ -272,9 +272,7 @@ private[sql] case object DictionaryEncoding extends CompressionScheme {
private[sql] case object BooleanBitSet extends CompressionScheme {
override val typeId = 3

private val BITS_PER_LONG = 64

private var _uncompressedSize = 0
val BITS_PER_LONG = 64

override def decoder[T <: NativeType](buffer: ByteBuffer, columnType: NativeColumnType[T]) = {
new this.Decoder(buffer).asInstanceOf[compression.Decoder[T]]
Expand All @@ -285,11 +283,13 @@ private[sql] case object BooleanBitSet extends CompressionScheme {
override def supports(columnType: ColumnType[_, _]) = columnType == BOOLEAN

class Encoder extends compression.Encoder[BooleanType.type] {
private var _uncompressedSize = 0

override def gatherCompressibilityStats(
value: Boolean,
columnType: NativeColumnType[BooleanType.type]) {

_uncompressedSize += columnType.actualSize(value)
_uncompressedSize += BOOLEAN.defaultSize
}

override def compress(
Expand All @@ -301,7 +301,7 @@ private[sql] case object BooleanBitSet extends CompressionScheme {
// Total element count (1 byte per Boolean value)
.putInt(from.remaining)

while (from.remaining > BITS_PER_LONG) {
while (from.remaining >= BITS_PER_LONG) {
var word = 0: Long
var i = 0

Expand Down Expand Up @@ -344,7 +344,7 @@ private[sql] case object BooleanBitSet extends CompressionScheme {
class Decoder(buffer: ByteBuffer) extends compression.Decoder[BooleanType.type] {
private val count = buffer.getInt()

private var currentWord = if (count > 0) buffer.getLong() else 0: Long
private var currentWord = 0: Long

private var visited: Int = 0

Expand All @@ -356,7 +356,7 @@ private[sql] case object BooleanBitSet extends CompressionScheme {
currentWord = buffer.getLong()
}

((currentWord >> bit) & 1) > 0
((currentWord >> bit) & 1) != 0
}

override def hasNext: Boolean = visited < count
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,98 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.spark.sql.columnar.compression

import org.scalatest.FunSuite

import org.apache.spark.sql.Row
import org.apache.spark.sql.columnar.{BOOLEAN, BooleanColumnStats}
import org.apache.spark.sql.columnar.ColumnarTestUtils._

/**
 * Round-trip tests for the [[BooleanBitSet]] compression scheme.
 *
 * Each test case builds a column of `count` random Boolean values, checks the layout of the
 * compressed buffer (scheme ID, element count, bitset words) and then verifies that the decoder
 * yields the original values.
 */
class BooleanBitSetSuite extends FunSuite {
  import BooleanBitSet._

  /**
   * Compresses `count` random Booleans and validates both the encoded buffer and the decoder.
   *
   * @param count number of Boolean values appended to the column builder
   */
  def skeleton(count: Int): Unit = {
    // -------------
    // Tests encoder
    // -------------

    val builder = TestCompressibleColumnBuilder(new BooleanColumnStats, BOOLEAN, BooleanBitSet)
    val rows = Seq.fill[Row](count)(makeRandomRow(BOOLEAN))
    val values = rows.map(_.head)

    rows.foreach(builder.appendFrom(_, 0))
    val buffer = builder.build()

    // Column type ID + null count + null positions
    val headerSize = CompressionScheme.columnHeaderSize(buffer)

    // Number of 64-bit words needed to hold `count` bits; a partially filled trailing word
    // still occupies a whole word.
    val wordCount = (count + BITS_PER_LONG - 1) / BITS_PER_LONG

    // Compression scheme ID (4 bytes) + element count (4 bytes) + bitset words (8 bytes each)
    val compressedSize = 4 + 4 + wordCount * 8

    // `compressedSize` already accounts for the compression scheme type ID
    expectResult(headerSize + compressedSize, "Wrong buffer capacity")(buffer.capacity)

    // Skips column header
    buffer.position(headerSize)
    expectResult(BooleanBitSet.typeId, "Wrong compression scheme ID")(buffer.getInt())
    expectResult(count, "Wrong element count")(buffer.getInt())

    // Walks the values together with their indices instead of indexing into the sequence, and
    // pulls the next word from the buffer every time a word boundary is crossed.
    var word = 0L
    values.zipWithIndex.foreach { case (expected, i) =>
      val bit = i % BITS_PER_LONG
      if (bit == 0) {
        word = buffer.getLong()
      }
      expectResult(expected, s"Wrong value in compressed buffer, index=$i") {
        (word & (1L << bit)) != 0
      }
    }

    // -------------
    // Tests decoder
    // -------------

    // Rewinds, skips column header and 4 more bytes for compression scheme ID
    buffer.rewind().position(headerSize + 4)

    val decoder = BooleanBitSet.decoder(buffer, BOOLEAN)
    values.foreach(expectResult(_, "Wrong decoded value")(decoder.next()))
    assert(!decoder.hasNext)
  }

  test(s"$BooleanBitSet: empty") {
    skeleton(0)
  }

  test(s"$BooleanBitSet: less than 1 word") {
    skeleton(BITS_PER_LONG - 1)
  }

  test(s"$BooleanBitSet: exactly 1 word") {
    skeleton(BITS_PER_LONG)
  }

  test(s"$BooleanBitSet: multiple whole words") {
    skeleton(BITS_PER_LONG * 2)
  }

  test(s"$BooleanBitSet: multiple words and 1 more bit") {
    skeleton(BITS_PER_LONG * 2 + 1)
  }
}
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,6 @@ import org.scalatest.FunSuite
import org.apache.spark.sql.catalyst.types.NativeType
import org.apache.spark.sql.columnar._
import org.apache.spark.sql.columnar.ColumnarTestUtils._
import org.apache.spark.sql.catalyst.expressions.GenericMutableRow

class DictionaryEncodingSuite extends FunSuite {
testDictionaryEncoding(new IntColumnStats, INT)
Expand All @@ -41,73 +40,82 @@ class DictionaryEncodingSuite extends FunSuite {
(0 until buffer.getInt()).map(columnType.extract(buffer) -> _.toShort).toMap
}

test(s"$DictionaryEncoding with $typeName: simple case") {
/**
 * Returns the distinct elements of `seq`, each kept at the position of its first occurrence.
 *
 * The earlier hand-rolled version only dropped repeats of the head element, so an input such
 * as `Seq(0, 1, 0, 1)` produced `Seq(0, 1, 1)`. `distinct` removes every duplicate while
 * preserving first-occurrence order, and yields an empty sequence for empty input.
 *
 * @param seq input indices, possibly containing repeats
 * @return the unique values of `seq` in order of first appearance
 */
def stableDistinct(seq: Seq[Int]): Seq[Int] = seq.distinct

def skeleton(uniqueValueCount: Int, inputSeq: Seq[Int]) {
// -------------
// Tests encoder
// -------------

val builder = TestCompressibleColumnBuilder(columnStats, columnType, DictionaryEncoding)
val (values, rows) = makeUniqueValuesAndSingleValueRows(columnType, 2)

builder.initialize(0)
builder.appendFrom(rows(0), 0)
builder.appendFrom(rows(1), 0)
builder.appendFrom(rows(0), 0)
builder.appendFrom(rows(1), 0)

val buffer = builder.build()
val headerSize = CompressionScheme.columnHeaderSize(buffer)
// 4 extra bytes for dictionary size
val dictionarySize = 4 + values.map(columnType.actualSize).sum
// 4 `Short`s, 2 bytes each
val compressedSize = dictionarySize + 2 * 4
// 4 extra bytes for compression scheme type ID
expectResult(headerSize + 4 + compressedSize, "Wrong buffer capacity")(buffer.capacity)

// Skips column header
buffer.position(headerSize)
expectResult(DictionaryEncoding.typeId, "Wrong compression scheme ID")(buffer.getInt())

val dictionary = buildDictionary(buffer)
Array[Short](0, 1).foreach { i =>
expectResult(i, "Wrong dictionary entry")(dictionary(values(i)))
}

Array[Short](0, 1, 0, 1).foreach {
expectResult(_, "Wrong column element value")(buffer.getShort())
val (values, rows) = makeUniqueValuesAndSingleValueRows(columnType, uniqueValueCount)
val dictValues = stableDistinct(inputSeq)

inputSeq.foreach(i => builder.appendFrom(rows(i), 0))

if (dictValues.length > DictionaryEncoding.MAX_DICT_SIZE) {
withClue("Dictionary overflowed, compression should fail") {
intercept[Throwable] {
builder.build()
}
}
} else {
val buffer = builder.build()
val headerSize = CompressionScheme.columnHeaderSize(buffer)
// 4 extra bytes for dictionary size
val dictionarySize = 4 + values.map(columnType.actualSize).sum
// 2 bytes for each `Short`
val compressedSize = 4 + dictionarySize + 2 * inputSeq.length
// `compressedSize` already includes the 4 bytes for the compression scheme type ID
expectResult(headerSize + compressedSize, "Wrong buffer capacity")(buffer.capacity)

// Skips column header
buffer.position(headerSize)
expectResult(DictionaryEncoding.typeId, "Wrong compression scheme ID")(buffer.getInt())

val dictionary = buildDictionary(buffer).toMap

dictValues.foreach { i =>
expectResult(i, "Wrong dictionary entry") {
dictionary(values(i))
}
}

inputSeq.foreach { i =>
expectResult(i.toShort, "Wrong column element value")(buffer.getShort())
}

// -------------
// Tests decoder
// -------------

// Rewinds, skips column header and 4 more bytes for compression scheme ID
buffer.rewind().position(headerSize + 4)

val decoder = DictionaryEncoding.decoder(buffer, columnType)

inputSeq.foreach { i =>
expectResult(values(i), "Wrong decoded value")(decoder.next())
}

assert(!decoder.hasNext)
}

// -------------
// Tests decoder
// -------------

// Rewinds, skips column header and 4 more bytes for compression scheme ID
buffer.rewind().position(headerSize + 4)

val decoder = new DictionaryEncoding.Decoder[T](buffer, columnType)

Array[Short](0, 1, 0, 1).foreach { i =>
expectResult(values(i), "Wrong decoded value")(decoder.next())
}

assert(!decoder.hasNext)
}
}

test(s"$DictionaryEncoding: overflow") {
val builder = TestCompressibleColumnBuilder(new IntColumnStats, INT, DictionaryEncoding)
builder.initialize(0)
test(s"$DictionaryEncoding with $typeName: empty") {
skeleton(0, Seq.empty)
}

(0 to Short.MaxValue).foreach { n =>
val row = new GenericMutableRow(1)
row.setInt(0, n)
builder.appendFrom(row, 0)
test(s"$DictionaryEncoding with $typeName: simple case") {
skeleton(2, Seq(0, 1, 0, 1))
}

withClue("Dictionary overflowed, encoding should fail") {
intercept[Throwable] {
builder.build()
}
test(s"$DictionaryEncoding with $typeName: dictionary overflow") {
skeleton(DictionaryEncoding.MAX_DICT_SIZE + 1, 0 to DictionaryEncoding.MAX_DICT_SIZE)
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -43,8 +43,6 @@ class RunLengthEncodingSuite extends FunSuite {
// -------------

val builder = TestCompressibleColumnBuilder(columnStats, columnType, RunLengthEncoding)
builder.initialize(0)

val (values, rows) = makeUniqueValuesAndSingleValueRows(columnType, uniqueValueCount)
val inputSeq = inputRuns.flatMap { case (index, run) =>
Seq.fill(run)(index)
Expand All @@ -56,13 +54,14 @@ class RunLengthEncodingSuite extends FunSuite {
// Column type ID + null count + null positions
val headerSize = CompressionScheme.columnHeaderSize(buffer)

// 4 extra bytes each run for run length
val compressedSize = inputRuns.map { case (index, _) =>
// Compression scheme ID + compressed contents
val compressedSize = 4 + inputRuns.map { case (index, _) =>
// 4 extra bytes each run for run length
columnType.actualSize(values(index)) + 4
}.sum

// 4 extra bytes for compression scheme type ID
expectResult(headerSize + 4 + compressedSize, "Wrong buffer capacity")(buffer.capacity)
expectResult(headerSize + compressedSize, "Wrong buffer capacity")(buffer.capacity)

// Skips column header
buffer.position(headerSize)
Expand All @@ -80,7 +79,7 @@ class RunLengthEncodingSuite extends FunSuite {
// Rewinds, skips column header and 4 more bytes for compression scheme ID
buffer.rewind().position(headerSize + 4)

val decoder = new RunLengthEncoding.Decoder[T](buffer, columnType)
val decoder = RunLengthEncoding.decoder(buffer, columnType)

inputSeq.foreach { i =>
expectResult(values(i), "Wrong decoded value")(decoder.next())
Expand All @@ -89,6 +88,10 @@ class RunLengthEncodingSuite extends FunSuite {
assert(!decoder.hasNext)
}

test(s"$RunLengthEncoding with $typeName: empty column") {
skeleton(0, Seq.empty)
}

test(s"$RunLengthEncoding with $typeName: simple case") {
skeleton(2, Seq(0 -> 2, 1 ->2))
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,9 @@ object TestCompressibleColumnBuilder {
columnType: NativeColumnType[T],
scheme: CompressionScheme) = {

new TestCompressibleColumnBuilder(columnStats, columnType, Seq(scheme))
val builder = new TestCompressibleColumnBuilder(columnStats, columnType, Seq(scheme))
builder.initialize(0)
builder
}
}

0 comments on commit 3c1ad7a

Please sign in to comment.