Skip to content

Commit

Permalink
Fixed SPARK-1436: in-memory column byte buffer must be able to be acc…
Browse files Browse the repository at this point in the history
…essed multiple times

Forgot to duplicate the in-memory column byte buffer when creating new ColumnAccessor's, so that when the column byte buffer is accessed multiple times, the position is not reset to 0.
  • Loading branch information
liancheng committed Apr 8, 2014
1 parent d7c0e8f commit 1d037b8
Show file tree
Hide file tree
Showing 2 changed files with 20 additions and 11 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -100,20 +100,21 @@ private[sql] class GenericColumnAccessor(buffer: ByteBuffer)

private[sql] object ColumnAccessor {
def apply(buffer: ByteBuffer): ColumnAccessor = {
val dup = buffer.duplicate().order(ByteOrder.nativeOrder)
// The first 4 bytes in the buffer indicate the column type.
val columnTypeId = buffer.getInt()
val columnTypeId = dup.getInt()

columnTypeId match {
case INT.typeId => new IntColumnAccessor(buffer)
case LONG.typeId => new LongColumnAccessor(buffer)
case FLOAT.typeId => new FloatColumnAccessor(buffer)
case DOUBLE.typeId => new DoubleColumnAccessor(buffer)
case BOOLEAN.typeId => new BooleanColumnAccessor(buffer)
case BYTE.typeId => new ByteColumnAccessor(buffer)
case SHORT.typeId => new ShortColumnAccessor(buffer)
case STRING.typeId => new StringColumnAccessor(buffer)
case BINARY.typeId => new BinaryColumnAccessor(buffer)
case GENERIC.typeId => new GenericColumnAccessor(buffer)
case INT.typeId => new IntColumnAccessor(dup)
case LONG.typeId => new LongColumnAccessor(dup)
case FLOAT.typeId => new FloatColumnAccessor(dup)
case DOUBLE.typeId => new DoubleColumnAccessor(dup)
case BOOLEAN.typeId => new BooleanColumnAccessor(dup)
case BYTE.typeId => new ByteColumnAccessor(dup)
case SHORT.typeId => new ShortColumnAccessor(dup)
case STRING.typeId => new StringColumnAccessor(dup)
case BINARY.typeId => new BinaryColumnAccessor(dup)
case GENERIC.typeId => new GenericColumnAccessor(dup)
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -31,4 +31,12 @@ class ColumnarQuerySuite extends QueryTest {

checkAnswer(scan, testData.collect().toSeq)
}

test("SPARK-1436 regression: in-memory columns must be able to be accessed multiple times") {
val plan = TestSQLContext.executePlan(testData.logicalPlan).executedPlan
val scan = SparkLogicalPlan(InMemoryColumnarTableScan(plan.output, plan))

checkAnswer(scan, testData.collect().toSeq)
checkAnswer(scan, testData.collect().toSeq)
}
}

0 comments on commit 1d037b8

Please sign in to comment.