[SPARK-20566][SQL] ColumnVector should support appendFloats for array
## What changes were proposed in this pull request?

This PR adds the missing array-based `appendFloats` API to the **ColumnVector** class. For the double type, an array-based `appendDoubles` already exists [here](https://github.com/apache/spark/blob/master/sql/core/src/main/java/org/apache/spark/sql/execution/vectorized/ColumnVector.java#L818-L824).
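
Below is a minimal usage sketch of the new overload, assuming the signature added in this patch (`appendFloats(int length, float[] src, int offset)`); the allocation size, memory mode, and values are illustrative, not part of the patch:

```scala
import org.apache.spark.memory.MemoryMode
import org.apache.spark.sql.execution.vectorized.ColumnVector
import org.apache.spark.sql.types.FloatType

val column = ColumnVector.allocate(1024, FloatType, MemoryMode.ON_HEAP)
val values = Array(0.1f, 0.2f, 0.3f, 0.4f, 0.5f)

// Append `length = 3` floats from `values`, starting at array offset 2.
// The return value is the row index where the appended slice begins.
val startRow = column.appendFloats(3, values, 2) // appends 0.3f, 0.4f, 0.5f
assert(column.getFloat(startRow) == 0.3f)
column.close()
```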

## How was this patch tested?

Passes Jenkins with a newly added test case.

Author: Dongjoon Hyun <[email protected]>

Closes #17836 from dongjoon-hyun/SPARK-20566.
dongjoon-hyun authored and cloud-fan committed May 4, 2017
1 parent c5dceb8 commit bfc8c79
Showing 2 changed files with 240 additions and 24 deletions.

sql/core/src/main/java/org/apache/spark/sql/execution/vectorized/ColumnVector.java
@@ -801,6 +801,14 @@ public final int appendFloats(int count, float v) {
return result;
}

public final int appendFloats(int length, float[] src, int offset) {
reserve(elementsAppended + length);
int result = elementsAppended;
putFloats(elementsAppended, length, src, offset);
elementsAppended += length;
return result;
}

public final int appendDouble(double v) {
reserve(elementsAppended + 1);
putDouble(elementsAppended, v);

sql/core/src/test/scala/org/apache/spark/sql/execution/vectorized/ColumnarBatchSuite.scala
@@ -41,33 +41,58 @@ class ColumnarBatchSuite extends SparkFunSuite {
val column = ColumnVector.allocate(1024, IntegerType, memMode)
var idx = 0
assert(column.anyNullsSet() == false)
assert(column.numNulls() == 0)

column.appendNotNull()
reference += false
assert(column.anyNullsSet() == false)
assert(column.numNulls() == 0)

column.appendNotNulls(3)
(1 to 3).foreach(_ => reference += false)
assert(column.anyNullsSet() == false)
assert(column.numNulls() == 0)

column.appendNull()
reference += true
assert(column.anyNullsSet())
assert(column.numNulls() == 1)

column.appendNulls(3)
(1 to 3).foreach(_ => reference += true)
assert(column.anyNullsSet())
assert(column.numNulls() == 4)

idx = column.elementsAppended

column.putNotNull(idx)
reference += false
idx += 1
assert(column.anyNullsSet() == false)
assert(column.anyNullsSet())
assert(column.numNulls() == 4)

column.putNull(idx)
reference += true
idx += 1
assert(column.anyNullsSet() == true)
assert(column.numNulls() == 1)
assert(column.anyNullsSet())
assert(column.numNulls() == 5)

column.putNulls(idx, 3)
reference += true
reference += true
reference += true
idx += 3
assert(column.anyNullsSet() == true)
assert(column.anyNullsSet())
assert(column.numNulls() == 8)

column.putNotNulls(idx, 4)
reference += false
reference += false
reference += false
reference += false
idx += 4
assert(column.anyNullsSet() == true)
assert(column.numNulls() == 4)
assert(column.anyNullsSet())
assert(column.numNulls() == 8)

reference.zipWithIndex.foreach { v =>
assert(v._1 == column.isNullAt(v._2))
@@ -85,9 +110,26 @@ class ColumnarBatchSuite extends SparkFunSuite {
val reference = mutable.ArrayBuffer.empty[Byte]

val column = ColumnVector.allocate(1024, ByteType, memMode)
var idx = 0

val values = (1 :: 2 :: 3 :: 4 :: 5 :: Nil).map(_.toByte).toArray
var values = (10 :: 20 :: 30 :: 40 :: 50 :: Nil).map(_.toByte).toArray
column.appendBytes(2, values, 0)
reference += 10.toByte
reference += 20.toByte

column.appendBytes(3, values, 2)
reference += 30.toByte
reference += 40.toByte
reference += 50.toByte

column.appendBytes(6, 60.toByte)
(1 to 6).foreach(_ => reference += 60.toByte)

column.appendByte(70.toByte)
reference += 70.toByte

var idx = column.elementsAppended

values = (1 :: 2 :: 3 :: 4 :: 5 :: Nil).map(_.toByte).toArray
column.putBytes(idx, 2, values, 0)
reference += 1
reference += 2
@@ -126,9 +168,26 @@ class ColumnarBatchSuite extends SparkFunSuite {
val reference = mutable.ArrayBuffer.empty[Short]

val column = ColumnVector.allocate(1024, ShortType, memMode)
var idx = 0

val values = (1 :: 2 :: 3 :: 4 :: 5 :: Nil).map(_.toShort).toArray
var values = (10 :: 20 :: 30 :: 40 :: 50 :: Nil).map(_.toShort).toArray
column.appendShorts(2, values, 0)
reference += 10.toShort
reference += 20.toShort

column.appendShorts(3, values, 2)
reference += 30.toShort
reference += 40.toShort
reference += 50.toShort

column.appendShorts(6, 60.toShort)
(1 to 6).foreach(_ => reference += 60.toShort)

column.appendShort(70.toShort)
reference += 70.toShort

var idx = column.elementsAppended

values = (1 :: 2 :: 3 :: 4 :: 5 :: Nil).map(_.toShort).toArray
column.putShorts(idx, 2, values, 0)
reference += 1
reference += 2
@@ -189,9 +248,26 @@ class ColumnarBatchSuite extends SparkFunSuite {
val reference = mutable.ArrayBuffer.empty[Int]

val column = ColumnVector.allocate(1024, IntegerType, memMode)
var idx = 0

val values = (1 :: 2 :: 3 :: 4 :: 5 :: Nil).toArray
var values = (10 :: 20 :: 30 :: 40 :: 50 :: Nil).toArray
column.appendInts(2, values, 0)
reference += 10
reference += 20

column.appendInts(3, values, 2)
reference += 30
reference += 40
reference += 50

column.appendInts(6, 60)
(1 to 6).foreach(_ => reference += 60)

column.appendInt(70)
reference += 70

var idx = column.elementsAppended

values = (1 :: 2 :: 3 :: 4 :: 5 :: Nil).toArray
column.putInts(idx, 2, values, 0)
reference += 1
reference += 2
@@ -257,9 +333,26 @@ class ColumnarBatchSuite extends SparkFunSuite {
val reference = mutable.ArrayBuffer.empty[Long]

val column = ColumnVector.allocate(1024, LongType, memMode)
var idx = 0

val values = (1L :: 2L :: 3L :: 4L :: 5L :: Nil).toArray
var values = (10L :: 20L :: 30L :: 40L :: 50L :: Nil).toArray
column.appendLongs(2, values, 0)
reference += 10L
reference += 20L

column.appendLongs(3, values, 2)
reference += 30L
reference += 40L
reference += 50L

column.appendLongs(6, 60L)
(1 to 6).foreach(_ => reference += 60L)

column.appendLong(70L)
reference += 70L

var idx = column.elementsAppended

values = (1L :: 2L :: 3L :: 4L :: 5L :: Nil).toArray
column.putLongs(idx, 2, values, 0)
reference += 1
reference += 2
@@ -320,16 +413,124 @@ class ColumnarBatchSuite extends SparkFunSuite {
}}
}

test("Float APIs") {
(MemoryMode.ON_HEAP :: MemoryMode.OFF_HEAP :: Nil).foreach { memMode => {
val seed = System.currentTimeMillis()
val random = new Random(seed)
val reference = mutable.ArrayBuffer.empty[Float]

val column = ColumnVector.allocate(1024, FloatType, memMode)

var values = (.1f :: .2f :: .3f :: .4f :: .5f :: Nil).toArray
column.appendFloats(2, values, 0)
reference += .1f
reference += .2f

column.appendFloats(3, values, 2)
reference += .3f
reference += .4f
reference += .5f

column.appendFloats(6, .6f)
(1 to 6).foreach(_ => reference += .6f)

column.appendFloat(.7f)
reference += .7f

var idx = column.elementsAppended

values = (1.0f :: 2.0f :: 3.0f :: 4.0f :: 5.0f :: Nil).toArray
column.putFloats(idx, 2, values, 0)
reference += 1.0f
reference += 2.0f
idx += 2

column.putFloats(idx, 3, values, 2)
reference += 3.0f
reference += 4.0f
reference += 5.0f
idx += 3

val buffer = new Array[Byte](8)
Platform.putFloat(buffer, Platform.BYTE_ARRAY_OFFSET, 2.234f)
Platform.putFloat(buffer, Platform.BYTE_ARRAY_OFFSET + 4, 1.123f)

if (ByteOrder.nativeOrder().equals(ByteOrder.BIG_ENDIAN)) {
// Ensure array contains Little Endian floats
val bb = ByteBuffer.wrap(buffer).order(ByteOrder.LITTLE_ENDIAN)
Platform.putFloat(buffer, Platform.BYTE_ARRAY_OFFSET, bb.getFloat(0))
Platform.putFloat(buffer, Platform.BYTE_ARRAY_OFFSET + 4, bb.getFloat(4))
}

column.putFloats(idx, 1, buffer, 4)
column.putFloats(idx + 1, 1, buffer, 0)
reference += 1.123f
reference += 2.234f
idx += 2

column.putFloats(idx, 2, buffer, 0)
reference += 2.234f
reference += 1.123f
idx += 2

while (idx < column.capacity) {
val single = random.nextBoolean()
if (single) {
val v = random.nextFloat()
column.putFloat(idx, v)
reference += v
idx += 1
} else {
val n = math.min(random.nextInt(column.capacity / 20), column.capacity - idx)
val v = random.nextFloat()
column.putFloats(idx, n, v)
var i = 0
while (i < n) {
reference += v
i += 1
}
idx += n
}
}

reference.zipWithIndex.foreach { v =>
assert(v._1 == column.getFloat(v._2), "Seed = " + seed + " MemMode=" + memMode)
if (memMode == MemoryMode.OFF_HEAP) {
val addr = column.valuesNativeAddress()
assert(v._1 == Platform.getFloat(null, addr + 4 * v._2))
}
}
column.close
}}
}
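
The `Platform`/`ByteBuffer` handling in the test above exists because the `byte[]`-based `putFloats` overload consumes raw bytes, and the test's own comment requires the array to contain little-endian floats before the call; on a big-endian host the bytes are swapped first. A standalone sketch of that packing step, assuming the same little-endian layout the test enforces (values are illustrative):

```scala
import java.nio.{ByteBuffer, ByteOrder}

// Pack two floats into a byte array in little-endian order, regardless
// of the host's native byte order, mirroring what the test guarantees
// before handing the buffer to the byte[]-based putFloats overload.
val buffer = new Array[Byte](8)
val bb = ByteBuffer.wrap(buffer).order(ByteOrder.LITTLE_ENDIAN)
bb.putFloat(0, 2.234f)
bb.putFloat(4, 1.123f)
```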

test("Double APIs") {
(MemoryMode.ON_HEAP :: MemoryMode.OFF_HEAP :: Nil).foreach { memMode => {
val seed = System.currentTimeMillis()
val random = new Random(seed)
val reference = mutable.ArrayBuffer.empty[Double]

val column = ColumnVector.allocate(1024, DoubleType, memMode)
var idx = 0

val values = (1.0 :: 2.0 :: 3.0 :: 4.0 :: 5.0 :: Nil).toArray
var values = (.1 :: .2 :: .3 :: .4 :: .5 :: Nil).toArray
column.appendDoubles(2, values, 0)
reference += .1
reference += .2

column.appendDoubles(3, values, 2)
reference += .3
reference += .4
reference += .5

column.appendDoubles(6, .6)
(1 to 6).foreach(_ => reference += .6)

column.appendDouble(.7)
reference += .7

var idx = column.elementsAppended

values = (1.0 :: 2.0 :: 3.0 :: 4.0 :: 5.0 :: Nil).toArray
column.putDoubles(idx, 2, values, 0)
reference += 1.0
reference += 2.0
@@ -346,8 +547,8 @@ class ColumnarBatchSuite extends SparkFunSuite {
Platform.putDouble(buffer, Platform.BYTE_ARRAY_OFFSET + 8, 1.123)

if (ByteOrder.nativeOrder().equals(ByteOrder.BIG_ENDIAN)) {
// Ensure array contains Liitle Endian doubles
var bb = ByteBuffer.wrap(buffer).order(ByteOrder.LITTLE_ENDIAN)
// Ensure array contains Little Endian doubles
val bb = ByteBuffer.wrap(buffer).order(ByteOrder.LITTLE_ENDIAN)
Platform.putDouble(buffer, Platform.BYTE_ARRAY_OFFSET, bb.getDouble(0))
Platform.putDouble(buffer, Platform.BYTE_ARRAY_OFFSET + 8, bb.getDouble(8))
}
@@ -400,40 +601,47 @@ class ColumnarBatchSuite extends SparkFunSuite {

val column = ColumnVector.allocate(6, BinaryType, memMode)
assert(column.arrayData().elementsAppended == 0)
var idx = 0

val str = "string"
column.appendByteArray(str.getBytes(StandardCharsets.UTF_8),
0, str.getBytes(StandardCharsets.UTF_8).length)
reference += str
assert(column.arrayData().elementsAppended == 6)

var idx = column.elementsAppended

val values = ("Hello" :: "abc" :: Nil).toArray
column.putByteArray(idx, values(0).getBytes(StandardCharsets.UTF_8),
0, values(0).getBytes(StandardCharsets.UTF_8).length)
reference += values(0)
idx += 1
assert(column.arrayData().elementsAppended == 5)
assert(column.arrayData().elementsAppended == 11)

column.putByteArray(idx, values(1).getBytes(StandardCharsets.UTF_8),
0, values(1).getBytes(StandardCharsets.UTF_8).length)
reference += values(1)
idx += 1
assert(column.arrayData().elementsAppended == 8)
assert(column.arrayData().elementsAppended == 14)

// Just put llo
val offset = column.putByteArray(idx, values(0).getBytes(StandardCharsets.UTF_8),
2, values(0).getBytes(StandardCharsets.UTF_8).length - 2)
reference += "llo"
idx += 1
assert(column.arrayData().elementsAppended == 11)
assert(column.arrayData().elementsAppended == 17)

// Put the same "ll" at offset. This should not allocate more memory in the column.
column.putArray(idx, offset, 2)
reference += "ll"
idx += 1
assert(column.arrayData().elementsAppended == 11)
assert(column.arrayData().elementsAppended == 17)

// Put a long string
val s = "abcdefghijklmnopqrstuvwxyz"
column.putByteArray(idx, (s + s).getBytes(StandardCharsets.UTF_8))
reference += (s + s)
idx += 1
assert(column.arrayData().elementsAppended == 11 + (s + s).length)
assert(column.arrayData().elementsAppended == 17 + (s + s).length)

reference.zipWithIndex.foreach { v =>
assert(v._1.length == column.getArrayLength(v._2), "MemoryMode=" + memMode)
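
One subtlety the Binary APIs test exercises: `putByteArray` returns the offset at which the bytes were stored in the child data column, and `putArray` can point another row at a sub-range of those bytes without allocating more memory. A hedged sketch of that reuse; the row indices, strings, and the assumption that the returned offset can be shifted by a byte count are illustrative:

```scala
import java.nio.charset.StandardCharsets
import org.apache.spark.memory.MemoryMode
import org.apache.spark.sql.execution.vectorized.ColumnVector
import org.apache.spark.sql.types.BinaryType

val column = ColumnVector.allocate(6, BinaryType, MemoryMode.ON_HEAP)
val bytes = "Hello".getBytes(StandardCharsets.UTF_8)

// Store "Hello" for row 0; the returned offset locates it in the child data.
val offset = column.putByteArray(0, bytes, 0, bytes.length)

// Point row 1 at two bytes ("ll") inside the already-stored data; no copy.
column.putArray(1, offset + 2, 2)
assert(column.getArrayLength(1) == 2)
column.close()
```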
