From 92bda0daf2fffeea0f1de9199fc71fe978a165c7 Mon Sep 17 00:00:00 2001 From: Kevin Mader Date: Mon, 20 Oct 2014 19:17:18 +0200 Subject: [PATCH] added new tests, renamed files, fixed several of the javaapi functions, formatted code more nicely --- .../spark/api/java/JavaSparkContext.scala | 5 +- .../input/FixedLengthBinaryInputFormat.scala | 10 ---- .../input/FixedLengthBinaryRecordReader.scala | 18 ------ ...leInput.scala => PortableDataStream.scala} | 0 .../org/apache/spark/rdd/BinaryFileRDD.scala | 5 +- .../java/org/apache/spark/JavaAPISuite.java | 4 +- .../scala/org/apache/spark/FileSuite.scala | 60 +++++++++++++++++++ 7 files changed, 66 insertions(+), 36 deletions(-) rename core/src/main/scala/org/apache/spark/input/{RawFileInput.scala => PortableDataStream.scala} (100%) diff --git a/core/src/main/scala/org/apache/spark/api/java/JavaSparkContext.scala b/core/src/main/scala/org/apache/spark/api/java/JavaSparkContext.scala index ef107b27c87fb..7403a0176bb5f 100644 --- a/core/src/main/scala/org/apache/spark/api/java/JavaSparkContext.scala +++ b/core/src/main/scala/org/apache/spark/api/java/JavaSparkContext.scala @@ -256,9 +256,12 @@ class JavaSparkContext(val sc: SparkContext) extends JavaSparkContextVarargsWork * * @param minPartitions A suggestion value of the minimal splitting number for input data. */ - def binaryFiles(path: String, minPartitions: Int = defaultMinPartitions): + def binaryFiles(path: String, minPartitions: Int): JavaPairRDD[String,PortableDataStream] = new JavaPairRDD(sc.binaryFiles(path,minPartitions)) + def binaryFiles(path: String): + JavaPairRDD[String,PortableDataStream] = new JavaPairRDD(sc.binaryFiles(path,defaultMinPartitions)) + /** * Read a directory of files as DataInputStream from HDFS, * a local file system (available on all nodes), or any Hadoop-supported file system URI diff --git a/core/src/main/scala/org/apache/spark/input/FixedLengthBinaryInputFormat.scala b/core/src/main/scala/org/apache/spark/input/FixedLengthBinaryInputFormat.scala index 646fe23738a66..852be54d181d4 100644 --- a/core/src/main/scala/org/apache/spark/input/FixedLengthBinaryInputFormat.scala +++ b/core/src/main/scala/org/apache/spark/input/FixedLengthBinaryInputFormat.scala @@ -29,7 +29,6 @@ import org.apache.hadoop.mapreduce.{InputSplit, JobContext, RecordReader, TaskAt */ private[spark] object FixedLengthBinaryInputFormat { - /** * This function retrieves the recordLength by checking the configuration parameter * @@ -39,13 +38,10 @@ private[spark] object FixedLengthBinaryInputFormat { // retrieve record length from configuration context.getConfiguration.get("recordLength").toInt } - } private[spark] class FixedLengthBinaryInputFormat extends FileInputFormat[LongWritable, BytesWritable] { - - /** * Override of isSplitable to ensure initial computation of the record length */ @@ -60,7 +56,6 @@ private[spark] class FixedLengthBinaryInputFormat } else { true } - } /** @@ -69,14 +64,11 @@ private[spark] class FixedLengthBinaryInputFormat * will start at the first byte of a record, and the last byte will the last byte of a record. 
*/ override def computeSplitSize(blockSize: Long, minSize: Long, maxSize: Long): Long = { - val defaultSize = super.computeSplitSize(blockSize, minSize, maxSize) - // If the default size is less than the length of a record, make it equal to it // Otherwise, make sure the split size is as close to possible as the default size, // but still contains a complete set of records, with the first record // starting at the first byte in the split and the last record ending with the last byte - if (defaultSize < recordLength) { recordLength.toLong } else { @@ -91,7 +83,5 @@ private[spark] class FixedLengthBinaryInputFormat RecordReader[LongWritable, BytesWritable] = { new FixedLengthBinaryRecordReader } - var recordLength = -1 - } diff --git a/core/src/main/scala/org/apache/spark/input/FixedLengthBinaryRecordReader.scala b/core/src/main/scala/org/apache/spark/input/FixedLengthBinaryRecordReader.scala index eb27a98fe09f2..98988910831be 100644 --- a/core/src/main/scala/org/apache/spark/input/FixedLengthBinaryRecordReader.scala +++ b/core/src/main/scala/org/apache/spark/input/FixedLengthBinaryRecordReader.scala @@ -76,66 +76,48 @@ private[spark] class FixedLengthBinaryRecordReader // the actual file we will be reading from val file = fileSplit.getPath - // job configuration val job = context.getConfiguration - // check compression val codec = new CompressionCodecFactory(job).getCodec(file) if (codec != null) { throw new IOException("FixedLengthRecordReader does not support reading compressed files") } - // get the record length recordLength = FixedLengthBinaryInputFormat.getRecordLength(context) - // get the filesystem val fs = file.getFileSystem(job) - // open the File fileInputStream = fs.open(file) - // seek to the splitStart position fileInputStream.seek(splitStart) - // set our current position currentPosition = splitStart - } override def nextKeyValue(): Boolean = { - if (recordKey == null) { recordKey = new LongWritable() } - // the key is a linear index of the record, given by the // position the record starts divided by the record length recordKey.set(currentPosition / recordLength) - // the recordValue to place the bytes into if (recordValue == null) { recordValue = new BytesWritable(new Array[Byte](recordLength)) } - // read a record if the currentPosition is less than the split end if (currentPosition < splitEnd) { - // setup a buffer to store the record val buffer = recordValue.getBytes - fileInputStream.read(buffer, 0, recordLength) - // update our current position currentPosition = currentPosition + recordLength - // return true return true } - false } - var splitStart: Long = 0L var splitEnd: Long = 0L var currentPosition: Long = 0L diff --git a/core/src/main/scala/org/apache/spark/input/RawFileInput.scala b/core/src/main/scala/org/apache/spark/input/PortableDataStream.scala similarity index 100% rename from core/src/main/scala/org/apache/spark/input/RawFileInput.scala rename to core/src/main/scala/org/apache/spark/input/PortableDataStream.scala diff --git a/core/src/main/scala/org/apache/spark/rdd/BinaryFileRDD.scala b/core/src/main/scala/org/apache/spark/rdd/BinaryFileRDD.scala index c7dc50820d59b..3d1b5f1b543f5 100644 --- a/core/src/main/scala/org/apache/spark/rdd/BinaryFileRDD.scala +++ b/core/src/main/scala/org/apache/spark/rdd/BinaryFileRDD.scala @@ -17,14 +17,11 @@ package org.apache.spark.rdd -/** Allows better control of the partitioning - * - */ import org.apache.hadoop.conf.{Configurable, Configuration} import org.apache.hadoop.io.Writable import org.apache.hadoop.mapreduce._ 
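An aside on the fixed-length input format above: the hunk keeps the comment describing how splits are sized, but the body of the else branch falls outside the context shown. The sketch below illustrates the rule that comment describes rather than quoting the patch's code: a split is never smaller than one record, and otherwise is the largest multiple of recordLength that fits inside the default split size. The reader's record key (byte position divided by record length) is shown alongside it; all names here are illustrative, not part of the patch.

```scala
// Illustrative sketch of the fixed-length split sizing described above.
// Assumption: the elided else branch rounds the default split size down to a
// whole number of records, as the surrounding comment says it should.
object FixedLengthSplitSketch {
  def splitSize(defaultSize: Long, recordLength: Int): Long = {
    if (defaultSize < recordLength) {
      recordLength.toLong                      // never smaller than one record
    } else {
      (defaultSize / recordLength) * recordLength   // round down to whole records
    }
  }

  // The record key used by the reader: the byte position of the record
  // divided by the record length, i.e. a linear record index.
  def recordKey(currentPosition: Long, recordLength: Int): Long =
    currentPosition / recordLength

  def main(args: Array[String]): Unit = {
    // 6-byte records with a 128 MB default split:
    // 134217728 / 6 = 22369621 whole records -> split size 134217726 bytes.
    println(splitSize(128L * 1024 * 1024, 6))   // 134217726
    println(recordKey(134217726L, 6))           // 22369621
  }
}
```

Sizing splits this way is what lets the record reader assume every split starts exactly on a record boundary and ends exactly on one.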
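One more aside, on the JavaSparkContext.binaryFiles change near the top of the patch: the single method with `minPartitions: Int = defaultMinPartitions` is replaced by two explicit overloads, presumably because Scala default arguments cannot be used as such from Java. A minimal sketch of that overload pattern follows; the class, its string return value, and the sample paths are stand-ins, not the real API.

```scala
// Sketch of the Java-friendly overload pattern used in the JavaSparkContext hunk.
// Java callers cannot see Scala default parameter values, so the convenience
// form is written as an explicit overload that forwards to the full form.
class BinaryFilesApiSketch(defaultMinPartitions: Int) {
  // full form: caller supplies the partition hint explicitly
  def binaryFiles(path: String, minPartitions: Int): String =
    s"binaryFiles($path, minPartitions = $minPartitions)"

  // convenience overload, callable from Java: falls back to the context default
  def binaryFiles(path: String): String =
    binaryFiles(path, defaultMinPartitions)
}

object BinaryFilesApiSketch {
  def main(args: Array[String]): Unit = {
    val api = new BinaryFilesApiSketch(defaultMinPartitions = 2)
    println(api.binaryFiles("/tmp/bins"))      // uses the default partition hint
    println(api.binaryFiles("/tmp/bins", 8))   // explicit hint
  }
}
```

The convenience overload simply forwards to the full form with the context's default, which is what the added Java-facing method does as well.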
-import org.apache.spark.{InterruptibleIterator, TaskContext, Partition, SparkContext} import org.apache.spark.input.StreamFileInputFormat +import org.apache.spark.{Partition, SparkContext} private[spark] class BinaryFileRDD[T]( sc : SparkContext, diff --git a/core/src/test/java/org/apache/spark/JavaAPISuite.java b/core/src/test/java/org/apache/spark/JavaAPISuite.java index 18566e633ce8e..a39ee903ba3c9 100644 --- a/core/src/test/java/org/apache/spark/JavaAPISuite.java +++ b/core/src/test/java/org/apache/spark/JavaAPISuite.java @@ -844,7 +844,6 @@ public void binaryFiles() throws Exception { // Reusing the wholeText files example byte[] content1 = "spark is easy to use.\n".getBytes("utf-8"); - String tempDirName = tempDir.getAbsolutePath(); File file1 = new File(tempDirName + "/part-00000"); @@ -866,7 +865,6 @@ public void binaryFilesCaching() throws Exception { // Reusing the wholeText files example byte[] content1 = "spark is easy to use.\n".getBytes("utf-8"); - String tempDirName = tempDir.getAbsolutePath(); File file1 = new File(tempDirName + "/part-00000"); @@ -877,7 +875,7 @@ public void binaryFilesCaching() throws Exception { channel1.write(bbuf); channel1.close(); - JavaPairRDD readRDD = sc.binaryFiles(tempDirName,3).cache(); + JavaPairRDD readRDD = sc.binaryFiles(tempDirName).cache(); readRDD.foreach(new VoidFunction>() { @Override public void call(Tuple2 stringPortableDataStreamTuple2) throws Exception { diff --git a/core/src/test/scala/org/apache/spark/FileSuite.scala b/core/src/test/scala/org/apache/spark/FileSuite.scala index e1c33d679fd24..e265b43fb79d1 100644 --- a/core/src/test/scala/org/apache/spark/FileSuite.scala +++ b/core/src/test/scala/org/apache/spark/FileSuite.scala @@ -20,6 +20,7 @@ package org.apache.spark import java.io.{File, FileWriter} import org.apache.spark.input.PortableDataStream +import org.apache.spark.storage.StorageLevel import scala.io.Source @@ -280,6 +281,37 @@ class FileSuite extends FunSuite with LocalSparkContext { assert(indata.toArray === testOutput) } + test("portabledatastream persist disk storage") { + sc = new SparkContext("local", "test") + val outFile = new File(tempDir, "record-bytestream-00000.bin") + val outFileName = outFile.getAbsolutePath() + + // create file + val testOutput = Array[Byte](1,2,3,4,5,6) + val bbuf = java.nio.ByteBuffer.wrap(testOutput) + // write data to file + val file = new java.io.FileOutputStream(outFile) + val channel = file.getChannel + channel.write(bbuf) + channel.close() + file.close() + + val inRdd = sc.binaryFiles(outFileName).persist(StorageLevel.DISK_ONLY) + inRdd.foreach{ + curData: (String, PortableDataStream) => + curData._2.toArray() // force the file to read + } + val mappedRdd = inRdd.map{ + curData: (String, PortableDataStream) => + (curData._2.getPath(),curData._2) + } + val (infile: String, indata: PortableDataStream) = mappedRdd.first + + // Try reading the output back as an object file + + assert(indata.toArray === testOutput) + } + test("portabledatastream flatmap tests") { sc = new SparkContext("local", "test") val outFile = new File(tempDir, "record-bytestream-00000.bin") @@ -348,6 +380,34 @@ class FileSuite extends FunSuite with LocalSparkContext { assert(indata === testOutput) } + test ("negative binary record length should raise an exception") { + // a fixed length of 6 bytes + sc = new SparkContext("local", "test") + + val outFile = new File(tempDir, "record-bytestream-00000.bin") + val outFileName = outFile.getAbsolutePath() + + // create file + val testOutput = Array[Byte](1,2,3,4,5,6) 
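Stepping back from the new FileSuite test being added here: the persist test above and the updated JavaAPISuite case both lean on `sc.binaryFiles` returning `(path, PortableDataStream)` pairs whose bytes are only read when `toArray()` is called. A rough usage sketch under those assumptions, with a hypothetical input directory:

```scala
import org.apache.spark.SparkContext
import org.apache.spark.input.PortableDataStream
import org.apache.spark.storage.StorageLevel

// Usage sketch, not part of the patch: reads a directory of small binary files
// as (path, PortableDataStream) pairs and materializes the bytes lazily.
object BinaryFilesUsageSketch {
  def main(args: Array[String]): Unit = {
    val sc = new SparkContext("local", "binaryFiles-usage-sketch")
    val inputDir = "/tmp/binary-input"   // hypothetical directory; substitute a real path

    // Both forms exercised by the tests: with and without a minPartitions hint.
    val withHint = sc.binaryFiles(inputDir, 3)
    println(s"partitions with hint: ${withHint.partitions.length}")

    // PortableDataStream defers reading; persisting the pairs and then calling
    // toArray() forces the bytes to be read, as in the DISK_ONLY test above.
    val persisted = sc.binaryFiles(inputDir).persist(StorageLevel.DISK_ONLY)
    val sizes = persisted.map { case (path, stream: PortableDataStream) =>
      (path, stream.toArray().length)
    }
    sizes.collect().foreach { case (path, n) => println(s"$path -> $n bytes") }

    sc.stop()
  }
}
```

Because the stream is a portable handle rather than eagerly read bytes, the pair RDD can be cached or persisted before any file contents are materialized, which is what the DISK_ONLY test is checking.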
+    val testOutputCopies = 10
+
+    // write data to file
+    val file = new java.io.FileOutputStream(outFile)
+    val channel = file.getChannel
+    for (i <- 1 to testOutputCopies) {
+      val bbuf = java.nio.ByteBuffer.wrap(testOutput)
+      channel.write(bbuf)
+    }
+    channel.close()
+    file.close()
+
+    val inRdd = sc.binaryRecords(outFileName, -1)
+
+    intercept[SparkException] {
+      inRdd.count
+    }
+  }
+
   test("file caching") {
     sc = new SparkContext("local", "test")
     val out = new FileWriter(tempDir + "/input")
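For completeness alongside the negative-length test just added: `sc.binaryRecords` carves a file into fixed-size byte arrays, and the test above expects a SparkException once the job runs with a non-positive record length. The sketch below pairs that failure case with the happy path, using the same 6-byte records the tests write; the output path and object name are illustrative.

```scala
import java.io.FileOutputStream
import org.apache.spark.{SparkContext, SparkException}

// Usage sketch, not part of the patch: fixed-length binary records via binaryRecords.
object BinaryRecordsSketch {
  def main(args: Array[String]): Unit = {
    val sc = new SparkContext("local", "binaryRecords-sketch")

    // Write ten copies of a 6-byte record, mirroring the FileSuite tests above.
    val record = Array[Byte](1, 2, 3, 4, 5, 6)
    val path = "/tmp/record-bytestream-sketch.bin"   // illustrative path
    val out = new FileOutputStream(path)
    (1 to 10).foreach(_ => out.write(record))
    out.close()

    // Happy path: every RDD element is one fixed-length 6-byte record.
    val records = sc.binaryRecords(path, 6)
    assert(records.count() == 10)
    assert(records.first().toSeq == record.toSeq)

    // As the test above checks, a non-positive record length fails the job
    // with a SparkException when it is actually run.
    try {
      sc.binaryRecords(path, -1).count()
    } catch {
      case e: SparkException => println(s"rejected as expected: ${e.getMessage}")
    }

    sc.stop()
  }
}
```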