diff --git a/sql/core/src/main/scala/org/apache/spark/sql/columnar/ColumnAccessor.scala b/sql/core/src/main/scala/org/apache/spark/sql/columnar/ColumnAccessor.scala
index ffd4894b5213d..3c39e1d350fa8 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/columnar/ColumnAccessor.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/columnar/ColumnAccessor.scala
@@ -100,20 +100,21 @@ private[sql] class GenericColumnAccessor(buffer: ByteBuffer)
 
 private[sql] object ColumnAccessor {
   def apply(buffer: ByteBuffer): ColumnAccessor = {
+    val dup = buffer.duplicate().order(ByteOrder.nativeOrder)
     // The first 4 bytes in the buffer indicate the column type.
-    val columnTypeId = buffer.getInt()
+    val columnTypeId = dup.getInt()
 
     columnTypeId match {
-      case INT.typeId     => new IntColumnAccessor(buffer)
-      case LONG.typeId    => new LongColumnAccessor(buffer)
-      case FLOAT.typeId   => new FloatColumnAccessor(buffer)
-      case DOUBLE.typeId  => new DoubleColumnAccessor(buffer)
-      case BOOLEAN.typeId => new BooleanColumnAccessor(buffer)
-      case BYTE.typeId    => new ByteColumnAccessor(buffer)
-      case SHORT.typeId   => new ShortColumnAccessor(buffer)
-      case STRING.typeId  => new StringColumnAccessor(buffer)
-      case BINARY.typeId  => new BinaryColumnAccessor(buffer)
-      case GENERIC.typeId => new GenericColumnAccessor(buffer)
+      case INT.typeId     => new IntColumnAccessor(dup)
+      case LONG.typeId    => new LongColumnAccessor(dup)
+      case FLOAT.typeId   => new FloatColumnAccessor(dup)
+      case DOUBLE.typeId  => new DoubleColumnAccessor(dup)
+      case BOOLEAN.typeId => new BooleanColumnAccessor(dup)
+      case BYTE.typeId    => new ByteColumnAccessor(dup)
+      case SHORT.typeId   => new ShortColumnAccessor(dup)
+      case STRING.typeId  => new StringColumnAccessor(dup)
+      case BINARY.typeId  => new BinaryColumnAccessor(dup)
+      case GENERIC.typeId => new GenericColumnAccessor(dup)
     }
   }
 }
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/columnar/ColumnarQuerySuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/columnar/ColumnarQuerySuite.scala
index 70b2e851737f8..2ed4cf2170f9d 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/columnar/ColumnarQuerySuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/columnar/ColumnarQuerySuite.scala
@@ -31,4 +31,12 @@ class ColumnarQuerySuite extends QueryTest {
 
     checkAnswer(scan, testData.collect().toSeq)
   }
+
+  test("SPARK-1436 regression: in-memory columns must be able to be accessed multiple times") {
+    val plan = TestSQLContext.executePlan(testData.logicalPlan).executedPlan
+    val scan = SparkLogicalPlan(InMemoryColumnarTableScan(plan.output, plan))
+
+    checkAnswer(scan, testData.collect().toSeq)
+    checkAnswer(scan, testData.collect().toSeq)
+  }
 }
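
For context (not part of the patch itself): the fix relies on java.nio.ByteBuffer.duplicate(), which returns a buffer that shares the same underlying bytes but keeps its own position, limit, and mark. The duplicate does not inherit the original's byte order, which is why the patch re-applies .order(ByteOrder.nativeOrder). The minimal sketch below (a hypothetical DuplicateDemo object, not taken from the Spark code base) illustrates the behavior the patch depends on: readers built on duplicates consume their own positions, so the cached original buffer can be scanned any number of times.

    import java.nio.{ByteBuffer, ByteOrder}

    object DuplicateDemo {
      def main(args: Array[String]): Unit = {
        // Simulated "cached column": two ints written once, then rewound.
        val original = ByteBuffer.allocate(8).order(ByteOrder.nativeOrder)
        original.putInt(42).putInt(7)
        original.rewind()

        // Two independent duplicates over the same bytes; byte order must be
        // set again because duplicate() does not carry it over.
        val first = original.duplicate().order(ByteOrder.nativeOrder)
        val second = original.duplicate().order(ByteOrder.nativeOrder)

        println(first.getInt())  // 42 -- first pass advances only `first`
        println(first.getInt())  // 7
        println(second.getInt()) // 42 -- second pass starts from the beginning
      }
    }

This mirrors what ColumnAccessor.apply now does: every call creates a fresh duplicate, so a second InMemoryColumnarTableScan over the same cached buffers no longer finds them already exhausted.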