From e861a6837382eaddd2dbd85d01cbc99c0bcd06fa Mon Sep 17 00:00:00 2001 From: Michael Armbrust Date: Mon, 21 Mar 2016 22:31:21 -0700 Subject: [PATCH 1/4] WIP --- .../apache/spark/sql/ContinuousQuery.scala | 8 + .../spark/sql/ContinuousQueryException.scala | 3 +- .../execution/datasources/DataSource.scala | 58 ++++++- .../parquet/CatalystWriteSupport.scala | 2 +- .../execution/streaming/FileStreamSink.scala | 126 +++++++++++++++ .../streaming/FileStreamSource.scala | 12 +- .../execution/streaming/HDFSMetadataLog.scala | 7 +- .../sql/execution/streaming/MetadataLog.scala | 2 +- .../execution/streaming/StreamExecution.scala | 26 ++- .../apache/spark/sql/sources/interfaces.scala | 20 ++- .../streaming/HDFSMetadataLogSuite.scala | 2 + .../sql/streaming/FileStreamSinkSuite.scala | 148 ++++++++++++++++++ 12 files changed, 393 insertions(+), 21 deletions(-) create mode 100644 sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/FileStreamSink.scala create mode 100644 sql/core/src/test/scala/org/apache/spark/sql/streaming/FileStreamSinkSuite.scala diff --git a/sql/core/src/main/scala/org/apache/spark/sql/ContinuousQuery.scala b/sql/core/src/main/scala/org/apache/spark/sql/ContinuousQuery.scala index eb69804c39b5d..56c38187ef132 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/ContinuousQuery.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/ContinuousQuery.scala @@ -91,6 +91,14 @@ trait ContinuousQuery { */ def awaitTermination(timeoutMs: Long): Boolean + /** + * Blocks until all available data in the source has been processed an committed to the sink. + * This method is intended for testing. Note that in the case of continually arriving data, this + * method may block forever. Additionally, it only works with sources that synchronously append + * data (i.e. `getOffset` will show the new data immediately upon addition). + */ + def processAllAvailable(): Unit + /** * Stops the execution of this query if it is running. This method blocks until the threads * performing execution has stopped. 
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/ContinuousQueryException.scala b/sql/core/src/main/scala/org/apache/spark/sql/ContinuousQueryException.scala index 67dd9dbe23726..6222ab90611f4 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/ContinuousQueryException.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/ContinuousQueryException.scala @@ -32,12 +32,13 @@ import org.apache.spark.sql.execution.streaming.{Offset, StreamExecution} */ @Experimental class ContinuousQueryException private[sql]( + @transient val query: ContinuousQuery, val message: String, val cause: Throwable, val startOffset: Option[Offset] = None, val endOffset: Option[Offset] = None - ) extends Exception(message, cause) { + ) extends Exception(message, cause) with Serializable { /** Time when the exception occurred */ val time: Long = System.currentTimeMillis diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSource.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSource.scala index fac2a64726618..588a9145d9247 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSource.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSource.scala @@ -21,6 +21,7 @@ import java.util.ServiceLoader import scala.collection.JavaConverters._ import scala.language.{existentials, implicitConversions} +import scala.util.control.NonFatal import scala.util.{Failure, Success, Try} import org.apache.hadoop.fs.Path @@ -29,7 +30,7 @@ import org.apache.spark.deploy.SparkHadoopUtil import org.apache.spark.internal.Logging import org.apache.spark.sql._ import org.apache.spark.sql.catalyst.analysis.UnresolvedAttribute -import org.apache.spark.sql.execution.streaming.{FileStreamSource, Sink, Source} +import org.apache.spark.sql.execution.streaming._ import org.apache.spark.sql.sources._ import org.apache.spark.sql.types.{CalendarIntervalType, StructType} import org.apache.spark.util.Utils @@ -176,14 +177,38 @@ case class DataSource( /** Returns a sink that can be used to continually write data. 
*/ def createSink(): Sink = { - val datasourceClass = providingClass.newInstance() match { - case s: StreamSinkProvider => s + providingClass.newInstance() match { + case s: StreamSinkProvider => s.createSink(sqlContext, options, partitionColumns) + case format: FileFormat => + val caseInsensitiveOptions = new CaseInsensitiveMap(options) + val path = caseInsensitiveOptions.getOrElse("path", { + throw new IllegalArgumentException("'path' is not specified") + }) + + new FileStreamSink(sqlContext, path, format) case _ => throw new UnsupportedOperationException( s"Data source $className does not support streamed writing") } + } - datasourceClass.createSink(sqlContext, options, partitionColumns) + def hasMetadata(path: Seq[String]): Boolean = { + path match { + case Seq(singlePath) => + try { + val hdfsPath = new Path(singlePath) + val fs = hdfsPath.getFileSystem(sqlContext.sparkContext.hadoopConfiguration) + val metadataPath = new Path(hdfsPath, FileStreamSink.metadataDir) + val res = fs.exists(metadataPath) + println(s"checking for metadata at $metadataPath : $res") + res + } catch { + case NonFatal(e) => + logWarning(s"Error while looking for metadata directory.") + false + } + case _ => false + } } /** Create a resolved [[BaseRelation]] that can be used to read data from this [[DataSource]] */ @@ -200,6 +225,31 @@ case class DataSource( case (_: RelationProvider, Some(_)) => throw new AnalysisException(s"$className does not allow user-specified schemas.") + case (format: FileFormat, _) + if hasMetadata(caseInsensitiveOptions.get("path").toSeq ++ paths) => + val basePath = new Path((caseInsensitiveOptions.get("path").toSeq ++ paths).head) + val fileCatalog = + new StreamFileCatalog(sqlContext, basePath) + val dataSchema = userSpecifiedSchema.orElse { + format.inferSchema( + sqlContext, + caseInsensitiveOptions, + fileCatalog.allFiles()) + }.getOrElse { + throw new AnalysisException( + s"Unable to infer schema for $format at ${fileCatalog.allFiles().mkString(",")}. 
" + + "It must be specified manually") + } + + HadoopFsRelation( + sqlContext, + fileCatalog, + partitionSchema = fileCatalog.partitionSpec().partitionColumns, + dataSchema = dataSchema, + bucketSpec = None, + format, + options) + case (format: FileFormat, _) => val allPaths = caseInsensitiveOptions.get("path") ++ paths val globbedPaths = allPaths.flatMap { path => diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/CatalystWriteSupport.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/CatalystWriteSupport.scala index 67bfd39697ed7..2e9bf8acfafc1 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/CatalystWriteSupport.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/CatalystWriteSupport.scala @@ -55,7 +55,7 @@ private[parquet] class CatalystWriteSupport extends WriteSupport[InternalRow] wi private type ValueWriter = (SpecializedGetters, Int) => Unit // Schema of the `InternalRow`s to be written - private var schema: StructType = _ + var schema: StructType = _ // `ValueWriter`s for all fields of the schema private var rootFieldWriters: Seq[ValueWriter] = _ diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/FileStreamSink.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/FileStreamSink.scala new file mode 100644 index 0000000000000..f5e4ba56f9077 --- /dev/null +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/FileStreamSink.scala @@ -0,0 +1,126 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.spark.sql.execution.streaming + +import java.util.UUID + +import org.apache.spark.sql.catalyst.InternalRow +import org.apache.spark.sql.catalyst.expressions.Expression +import org.apache.spark.sql.execution.datasources.PartitionSpec +import org.apache.spark.sql.sources.{Partition, FileCatalog, FileFormat} + +import scala.collection.mutable.ArrayBuffer + +import org.apache.hadoop.fs.{FileStatus, FileSystem, Path} + +import org.apache.spark.internal.Logging +import org.apache.spark.sql.{DataFrame, SQLContext} +import org.apache.spark.sql.types.{StringType, StructType} +import org.apache.spark.util.collection.OpenHashSet + +object FileStreamSink { + val metadataDir = "_spark_metadata" +} + +class StreamFileCatalog(sqlContext: SQLContext, path: Path) extends FileCatalog with Logging { + val metadataDirectory = new Path(path, FileStreamSink.metadataDir) + logInfo(s"Reading log from $metadataDirectory") + val metadataLog = new HDFSMetadataLog[Seq[String]](sqlContext, metadataDirectory.toUri.toString) + val fs = path.getFileSystem(sqlContext.sparkContext.hadoopConfiguration) + + override def paths: Seq[Path] = path :: Nil + + override def partitionSpec(): PartitionSpec = PartitionSpec(StructType(Nil), Nil) + + /** + * Returns all valid files grouped into partitions when the data is partitioned. If the data is + * unpartitioned, this will return a single partition with not partition values. + * + * @param filters the filters used to prune which partitions are returned. These filters must + * only refer to partition columns and this method will only return files + * where these predicates are guaranteed to evaluate to `true`. Thus, these + * filters will not need to be evaluated again on the returned data. + */ + override def listFiles(filters: Seq[Expression]): Seq[Partition] = + Partition(InternalRow.empty, allFiles()) :: Nil + + override def getStatus(path: Path): Array[FileStatus] = fs.listStatus(path) + + override def refresh(): Unit = {} + + override def allFiles(): Seq[FileStatus] = { + fs.listStatus(metadataLog.get(None, None).flatMap(_._2).map(new Path(_))) + } +} + +/** + * + */ +class FileStreamSink( + sqlContext: SQLContext, + path: String, + fileFormat: FileFormat) extends Sink with Logging { + + val basePath = new Path(path) + val logPath = new Path(basePath, FileStreamSink.metadataDir) + logInfo(s"Logging to $logPath") + val fileLog = new HDFSMetadataLog[Seq[String]](sqlContext, logPath.toUri.toString) + + override def addBatch(batchId: Long, data: DataFrame): Unit = { + logInfo(s"STARTING BATCH COMMIT $batchId") + if (fileLog.get(batchId).isDefined) { + logInfo(s"Skipping already commited batch $batchId") + } else { + val files = writeFiles(data) + + println(s"Wrote ${files.size}") + + if (fileLog.add(batchId, files)) { + logInfo(s"Wrote batch $batchId") + } else { + logInfo(s"Someone beat us to batch $batchId") + } + } + logInfo(s"DONE BATCH COMMIT $batchId") + } + + private def writeFiles(data: DataFrame): Seq[String] = { + val ctx = sqlContext + val outputDir = path + val format = fileFormat + val schema = data.schema + + /* + data.queryExecution.executedPlan.execute().mapPartitions { rows => + val basePath = new Path(outputDir) + val file = new Path(basePath, UUID.randomUUID().toString) + val writer = format.openWriter(ctx, file.toUri.toString, schema) + rows.foreach(writer.write) + + Iterator(file.toString) + }.collect().toSeq + */ + + val file = new Path(basePath, UUID.randomUUID().toString).toUri.toString + println(s"writing to uuid at $file") + 
data.write.parquet(file) + sqlContext.read + .schema(data.schema) + .parquet(file).inputFiles + } +} diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/FileStreamSource.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/FileStreamSource.scala index d13b1a6166798..dbd9e320ce6cf 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/FileStreamSource.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/FileStreamSource.scala @@ -44,7 +44,7 @@ class FileStreamSource( private var maxBatchId = metadataLog.getLatest().map(_._1).getOrElse(-1L) private val seenFiles = new OpenHashSet[String] - metadataLog.get(None, maxBatchId).foreach { case (batchId, files) => + metadataLog.get(None, Some(maxBatchId)).foreach { case (batchId, files) => files.foreach(seenFiles.add) } @@ -114,17 +114,21 @@ class FileStreamSource( val endId = end.asInstanceOf[LongOffset].offset assert(startId <= endId) - val files = metadataLog.get(Some(startId + 1), endId).map(_._2).flatten - logDebug(s"Return files from batches ${startId + 1}:$endId") + val files = metadataLog.get(Some(startId + 1), Some(endId)).map(_._2).flatten + logError(s"Processing ${files.length} files from ${startId + 1}:$endId") logDebug(s"Streaming ${files.mkString(", ")}") dataFrameBuilder(files) } private def fetchAllFiles(): Seq[String] = { - fs.listStatus(new Path(path)) + val startTime = System.nanoTime() + val files = fs.listStatus(new Path(path)) .filterNot(_.getPath.getName.startsWith("_")) .map(_.getPath.toUri.toString) + val endTime = System.nanoTime() + logError(s"Listed ${files.size} in ${(endTime.toDouble - startTime) / 1000000}ms") + files } override def getOffset: Option[Offset] = Some(fetchMaxOffset()).filterNot(_.offset == -1) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/HDFSMetadataLog.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/HDFSMetadataLog.scala index 298b5d292e8e4..f27d23b1cdcdd 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/HDFSMetadataLog.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/HDFSMetadataLog.scala @@ -170,11 +170,12 @@ class HDFSMetadataLog[T: ClassTag](sqlContext: SQLContext, path: String) } } - override def get(startId: Option[Long], endId: Long): Array[(Long, T)] = { - val batchIds = fc.util().listStatus(metadataPath, batchFilesFilter) + override def get(startId: Option[Long], endId: Option[Long]): Array[(Long, T)] = { + val files = fc.util().listStatus(metadataPath, batchFilesFilter) + val batchIds = files .map(_.getPath.getName.toLong) .filter { batchId => - batchId <= endId && (startId.isEmpty || batchId >= startId.get) + (endId.isEmpty || batchId <= endId.get) && (startId.isEmpty || batchId >= startId.get) } batchIds.sorted.map(batchId => (batchId, get(batchId))).filter(_._2.isDefined).map { case (batchId, metadataOption) => diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/MetadataLog.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/MetadataLog.scala index 3f9896d23ce36..cc70e1d314d1d 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/MetadataLog.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/MetadataLog.scala @@ -42,7 +42,7 @@ trait MetadataLog[T] { * Return metadata for batches between startId (inclusive) and endId (inclusive). 
If `startId` is * `None`, just return all batches before endId (inclusive). */ - def get(startId: Option[Long], endId: Long): Array[(Long, T)] + def get(startId: Option[Long], endId: Option[Long]): Array[(Long, T)] /** * Return the latest batch Id and its metadata if exist. diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamExecution.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamExecution.scala index c5fefb5346bc7..a87805c0ebcb5 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamExecution.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamExecution.scala @@ -164,7 +164,7 @@ class StreamExecution( s"Query $name terminated with exception: ${e.getMessage}", e, Some(committedOffsets.toCompositeOffset(sources))) - logError(s"Query $name terminated with error", e) + logDebug(s"Query $name terminated with error", e) } finally { state = TERMINATED sqlContext.streams.notifyQueryTermination(StreamExecution.this) @@ -196,7 +196,7 @@ class StreamExecution( } case None => // We are starting this stream for the first time. - logInfo(s"Starting new continuous query.") + logDebug(s"Starting new continuous query.") currentBatchId = 0 commitAndConstructNextBatch() } @@ -239,6 +239,12 @@ class StreamExecution( logInfo(s"Committed offsets for batch $currentBatchId.") true } else { + noNewData = true + awaitBatchLock.synchronized { + // Wake up any threads that are waiting for the stream to progress. + awaitBatchLock.notifyAll() + } + false } } @@ -297,7 +303,7 @@ class StreamExecution( } val batchTime = (System.nanoTime() - startTime).toDouble / 1000000 - logInfo(s"Completed up to $availableOffsets in ${batchTime}ms") + logDebug(s"Completed up to $availableOffsets in ${batchTime}ms") postEvent(new QueryProgress(this)) } @@ -317,7 +323,7 @@ class StreamExecution( microBatchThread.interrupt() microBatchThread.join() } - logInfo(s"Query $name was stopped") + logDebug(s"Query $name was stopped") } /** @@ -328,7 +334,7 @@ class StreamExecution( def notDone = !committedOffsets.contains(source) || committedOffsets(source) < newOffset while (notDone) { - logInfo(s"Waiting until $newOffset at $source") + logDebug(s"Waiting until $newOffset at $source") awaitBatchLock.synchronized { awaitBatchLock.wait(100) } } logDebug(s"Unblocked at $newOffset for $source") @@ -384,6 +390,16 @@ class StreamExecution( case object INITIALIZED extends State case object ACTIVE extends State case object TERMINATED extends State + + var noNewData = false + override def processAllAvailable(): Unit = { + noNewData = false + while (!noNewData) { + awaitBatchLock.synchronized { awaitBatchLock.wait(10000) } + if (streamDeathCause != null) { throw streamDeathCause } + } + if (streamDeathCause != null) { throw streamDeathCause } + } } private[sql] object StreamExecution { diff --git a/sql/core/src/main/scala/org/apache/spark/sql/sources/interfaces.scala b/sql/core/src/main/scala/org/apache/spark/sql/sources/interfaces.scala index 1e02354edf4c1..b5a50bbc090fd 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/sources/interfaces.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/sources/interfaces.scala @@ -418,8 +418,10 @@ case class HadoopFsRelation( s"HadoopFiles" /** Returns the list of files that will be read when scanning this relation. 
*/ - override def inputFiles: Array[String] = + override def inputFiles: Array[String] = { + println(s"inputfiles: $location") location.allFiles().map(_.getPath.toUri.toString).toArray + } override def sizeInBytes: Long = location.allFiles().map(_.getLen).sum } @@ -427,7 +429,7 @@ case class HadoopFsRelation( /** * Used to read and write data stored in files to/from the [[InternalRow]] format. */ -trait FileFormat { +trait FileFormat extends Serializable { // TODO: Remove /** * When possible, this method should return the schema of the given `files`. When the format * does not support inference, or no valid files are given should return None. In these cases @@ -482,6 +484,20 @@ trait FileFormat { // Until then we guard in [[FileSourceStrategy]] to only call this method on supported formats. throw new UnsupportedOperationException(s"buildReader is not supported for $this") } + + def openWriter( + sqlContext: SQLContext, + path: String, + dataSchema: StructType): RowWriter = { + // TODO: Remove this default implementation when the other formats have been ported + // Until then we guard in [[FileSourceStrategy]] to only call this method on supported formats. + throw new UnsupportedOperationException(s"openWriter is not supported for $this") + } +} + +abstract class RowWriter { + def write(row: InternalRow): Unit + def close(): Unit } /** diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/HDFSMetadataLogSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/HDFSMetadataLogSuite.scala index 4ddc218455eb2..9ed5686d977c5 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/HDFSMetadataLogSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/HDFSMetadataLogSuite.scala @@ -27,6 +27,8 @@ import org.apache.spark.sql.test.SharedSQLContext class HDFSMetadataLogSuite extends SparkFunSuite with SharedSQLContext { + private implicit def toOption[A](a: A): Option[A] = Option(a) + test("basic") { withTempDir { temp => val metadataLog = new HDFSMetadataLog[String](sqlContext, temp.getAbsolutePath) diff --git a/sql/core/src/test/scala/org/apache/spark/sql/streaming/FileStreamSinkSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/streaming/FileStreamSinkSuite.scala new file mode 100644 index 0000000000000..bcf4b4aa80376 --- /dev/null +++ b/sql/core/src/test/scala/org/apache/spark/sql/streaming/FileStreamSinkSuite.scala @@ -0,0 +1,148 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.spark.sql.streaming + +import java.io.File +import java.util.UUID + +import org.apache.spark.sql.{ContinuousQueryException, ContinuousQuery, StreamTest} +import org.apache.spark.sql.catalyst.util._ +import org.apache.spark.sql.execution.streaming.MemoryStream +import org.apache.spark.sql.test.SharedSQLContext +import org.apache.spark.util.Utils + +import scala.util.Random +import scala.util.control.NonFatal + +class FileStressSuite extends StreamTest with SharedSQLContext { + import testImplicits._ + + test("fault tolerance stress test") { + val numRecords = 1000000 + val inputDir = Utils.createTempDir("stream.input").getCanonicalPath + val stagingDir = Utils.createTempDir("stream.staging").getCanonicalPath + val outputDir = Utils.createTempDir("stream.output").getCanonicalPath + val checkpoint = Utils.createTempDir("stream.checkpoint").getCanonicalPath + + @volatile + var continue = true + var stream: ContinuousQuery = null + + val writer = new Thread("stream writer") { + override def run(): Unit = { + var i = numRecords + while (i > 0) { + val count = Random.nextInt(100) + var j = 0 + var string = "" + while (j < count && i > 0) { + string = string + i + "\n" + j += 1 + i -= 1 + } + + val uuid = UUID.randomUUID().toString + val fileName = new File(stagingDir, uuid) + stringToFile(fileName, string) + fileName.renameTo(new File(inputDir, uuid)) + val sleep = Random.nextInt(100) + Thread.sleep(sleep) + } + + println("DONE WRITING") + var done = false + while (!done) { + try { + stream.processAllAvailable() + done = true + } catch { + case NonFatal(_) => + } + } + + println("STOPPING QUERY") + continue = false + stream.stop() + } + } + writer.start() + + val input = sqlContext.read.format("text").stream(inputDir) + def startStream(): ContinuousQuery = input + .repartition(5) + .as[String] + .mapPartitions { iter => + val rand = Random.nextInt(100) + if (rand < 5) { sys.error("failure") } + iter.map(_.toLong) + } + .write + .format("parquet") + .option("checkpointLocation", checkpoint) + .startStream(outputDir) + + var failures = 0 + val streamThread = new Thread("stream runner") { + while (continue) { + println("Starting stream") + stream = startStream() + + try { + stream.awaitTermination() + } catch { + case ce: ContinuousQueryException => + failures += 1 + } + } + } + + streamThread.join() + + println(s"Stream restarted $failures times.") + assert(sqlContext.read.parquet(outputDir).distinct().count() == numRecords) + } +} + +class FileStreamSinkSuite extends StreamTest with SharedSQLContext { + import testImplicits._ + + test("unpartitioned writing") { + val inputData = MemoryStream[Int] + val df = inputData.toDF() + + val outputDir = Utils.createTempDir("stream.output").getCanonicalPath + val checkpointDir = Utils.createTempDir("stream.checkpoint").getCanonicalPath + + val query = + df.write + .format("parquet") + .option("checkpointLocation", checkpointDir) + .startStream(outputDir) + + inputData.addData(1, 2, 3) + println("blocking") + failAfter(streamingTimeout) { query.processAllAvailable() } + println("done") + + val outputDf = sqlContext.read.parquet(outputDir).as[Int] + + checkDataset( + outputDf, + 1, 2, 3) + } +} \ No newline at end of file From 8e8e4c60a8b6a2289de5867767e1d8ebd21d32ba Mon Sep 17 00:00:00 2001 From: Michael Armbrust Date: Tue, 22 Mar 2016 11:26:13 -0700 Subject: [PATCH 2/4] cleanup --- .../apache/spark/sql/ContinuousQuery.scala | 5 +- .../spark/sql/ContinuousQueryException.scala | 3 +- .../execution/datasources/DataSource.scala | 10 +- 
.../parquet/CatalystWriteSupport.scala | 2 +- .../execution/streaming/FileStreamSink.scala | 81 +++-------- .../streaming/FileStreamSource.scala | 4 +- .../execution/streaming/StreamExecution.scala | 32 +++-- .../streaming/StreamFileCatalog.scala | 59 ++++++++ .../apache/spark/sql/sources/interfaces.scala | 20 +-- .../sql/streaming/FileStreamSinkSuite.scala | 103 +------------- .../spark/sql/streaming/FileStressSuite.scala | 128 ++++++++++++++++++ 11 files changed, 240 insertions(+), 207 deletions(-) create mode 100644 sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamFileCatalog.scala create mode 100644 sql/core/src/test/scala/org/apache/spark/sql/streaming/FileStressSuite.scala diff --git a/sql/core/src/main/scala/org/apache/spark/sql/ContinuousQuery.scala b/sql/core/src/main/scala/org/apache/spark/sql/ContinuousQuery.scala index 56c38187ef132..1dc9a6893ebb6 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/ContinuousQuery.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/ContinuousQuery.scala @@ -94,8 +94,9 @@ trait ContinuousQuery { /** * Blocks until all available data in the source has been processed an committed to the sink. * This method is intended for testing. Note that in the case of continually arriving data, this - * method may block forever. Additionally, it only works with sources that synchronously append - * data (i.e. `getOffset` will show the new data immediately upon addition). + * method may block forever. Additionally, this method is only guranteed to block until data that + * has been synchronously appended data to a [[org.apache.spark.sql.execution.streaming.Source]] + * prior to invocation. (i.e. `getOffset` must immediately reflect the addition). */ def processAllAvailable(): Unit diff --git a/sql/core/src/main/scala/org/apache/spark/sql/ContinuousQueryException.scala b/sql/core/src/main/scala/org/apache/spark/sql/ContinuousQueryException.scala index 6222ab90611f4..5eb5bcd4e74dd 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/ContinuousQueryException.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/ContinuousQueryException.scala @@ -32,8 +32,7 @@ import org.apache.spark.sql.execution.streaming.{Offset, StreamExecution} */ @Experimental class ContinuousQueryException private[sql]( - @transient - val query: ContinuousQuery, + @transient val query: ContinuousQuery, val message: String, val cause: Throwable, val startOffset: Option[Offset] = None, diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSource.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSource.scala index 588a9145d9247..231dd0f8f7753 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSource.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSource.scala @@ -21,8 +21,8 @@ import java.util.ServiceLoader import scala.collection.JavaConverters._ import scala.language.{existentials, implicitConversions} -import scala.util.control.NonFatal import scala.util.{Failure, Success, Try} +import scala.util.control.NonFatal import org.apache.hadoop.fs.Path @@ -192,6 +192,10 @@ case class DataSource( } } + /** + * Returns true if there is a single path that has a metadata log indicating which files should + * be read. 
+ */ def hasMetadata(path: Seq[String]): Boolean = { path match { case Seq(singlePath) => @@ -200,7 +204,6 @@ case class DataSource( val fs = hdfsPath.getFileSystem(sqlContext.sparkContext.hadoopConfiguration) val metadataPath = new Path(hdfsPath, FileStreamSink.metadataDir) val res = fs.exists(metadataPath) - println(s"checking for metadata at $metadataPath : $res") res } catch { case NonFatal(e) => @@ -225,6 +228,8 @@ case class DataSource( case (_: RelationProvider, Some(_)) => throw new AnalysisException(s"$className does not allow user-specified schemas.") + // We are reading from the results of a streaming query. Load files from the metadata log + // instead of listing them using HDFS APIs. case (format: FileFormat, _) if hasMetadata(caseInsensitiveOptions.get("path").toSeq ++ paths) => val basePath = new Path((caseInsensitiveOptions.get("path").toSeq ++ paths).head) @@ -250,6 +255,7 @@ case class DataSource( format, options) + // This is a non-streaming file based datasource. case (format: FileFormat, _) => val allPaths = caseInsensitiveOptions.get("path") ++ paths val globbedPaths = allPaths.flatMap { path => diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/CatalystWriteSupport.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/CatalystWriteSupport.scala index 2e9bf8acfafc1..67bfd39697ed7 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/CatalystWriteSupport.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/CatalystWriteSupport.scala @@ -55,7 +55,7 @@ private[parquet] class CatalystWriteSupport extends WriteSupport[InternalRow] wi private type ValueWriter = (SpecializedGetters, Int) => Unit // Schema of the `InternalRow`s to be written - var schema: StructType = _ + private var schema: StructType = _ // `ValueWriter`s for all fields of the schema private var rootFieldWriters: Seq[ValueWriter] = _ diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/FileStreamSink.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/FileStreamSink.scala index f5e4ba56f9077..b4419382f8147 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/FileStreamSink.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/FileStreamSink.scala @@ -19,57 +19,23 @@ package org.apache.spark.sql.execution.streaming import java.util.UUID -import org.apache.spark.sql.catalyst.InternalRow -import org.apache.spark.sql.catalyst.expressions.Expression -import org.apache.spark.sql.execution.datasources.PartitionSpec -import org.apache.spark.sql.sources.{Partition, FileCatalog, FileFormat} - -import scala.collection.mutable.ArrayBuffer - -import org.apache.hadoop.fs.{FileStatus, FileSystem, Path} +import org.apache.hadoop.fs.Path import org.apache.spark.internal.Logging import org.apache.spark.sql.{DataFrame, SQLContext} -import org.apache.spark.sql.types.{StringType, StructType} -import org.apache.spark.util.collection.OpenHashSet +import org.apache.spark.sql.sources.FileFormat object FileStreamSink { + // The name of the subdirectory that is used to store metadata about which files are valid. 
val metadataDir = "_spark_metadata" } -class StreamFileCatalog(sqlContext: SQLContext, path: Path) extends FileCatalog with Logging { - val metadataDirectory = new Path(path, FileStreamSink.metadataDir) - logInfo(s"Reading log from $metadataDirectory") - val metadataLog = new HDFSMetadataLog[Seq[String]](sqlContext, metadataDirectory.toUri.toString) - val fs = path.getFileSystem(sqlContext.sparkContext.hadoopConfiguration) - - override def paths: Seq[Path] = path :: Nil - - override def partitionSpec(): PartitionSpec = PartitionSpec(StructType(Nil), Nil) - - /** - * Returns all valid files grouped into partitions when the data is partitioned. If the data is - * unpartitioned, this will return a single partition with not partition values. - * - * @param filters the filters used to prune which partitions are returned. These filters must - * only refer to partition columns and this method will only return files - * where these predicates are guaranteed to evaluate to `true`. Thus, these - * filters will not need to be evaluated again on the returned data. - */ - override def listFiles(filters: Seq[Expression]): Seq[Partition] = - Partition(InternalRow.empty, allFiles()) :: Nil - - override def getStatus(path: Path): Array[FileStatus] = fs.listStatus(path) - - override def refresh(): Unit = {} - - override def allFiles(): Seq[FileStatus] = { - fs.listStatus(metadataLog.get(None, None).flatMap(_._2).map(new Path(_))) - } -} - /** - * + * A sink that writes out results to parquet files. Each batch is written out to a unique + * directory. After all of the files in a batch have been succesfully written, the list of + * file paths is appended to the log atomically. In the case of partial failures, some duplicate + * data may be present in the target directory, but only one copy of each file will be present + * in the log. */ class FileStreamSink( sqlContext: SQLContext, @@ -78,49 +44,36 @@ class FileStreamSink( val basePath = new Path(path) val logPath = new Path(basePath, FileStreamSink.metadataDir) - logInfo(s"Logging to $logPath") val fileLog = new HDFSMetadataLog[Seq[String]](sqlContext, logPath.toUri.toString) override def addBatch(batchId: Long, data: DataFrame): Unit = { - logInfo(s"STARTING BATCH COMMIT $batchId") if (fileLog.get(batchId).isDefined) { - logInfo(s"Skipping already commited batch $batchId") + logInfo(s"Skipping already committed batch $batchId") } else { val files = writeFiles(data) - - println(s"Wrote ${files.size}") - if (fileLog.add(batchId, files)) { - logInfo(s"Wrote batch $batchId") + logInfo(s"Committed batch $batchId") } else { - logInfo(s"Someone beat us to batch $batchId") + logWarning(s"Race while writing batch $batchId") } } - logInfo(s"DONE BATCH COMMIT $batchId") } + /** Writes the [[DataFrame]] to a UUID-named dir, returning the list of files paths. 
*/ private def writeFiles(data: DataFrame): Seq[String] = { val ctx = sqlContext val outputDir = path val format = fileFormat val schema = data.schema - /* - data.queryExecution.executedPlan.execute().mapPartitions { rows => - val basePath = new Path(outputDir) - val file = new Path(basePath, UUID.randomUUID().toString) - val writer = format.openWriter(ctx, file.toUri.toString, schema) - rows.foreach(writer.write) - - Iterator(file.toString) - }.collect().toSeq - */ - val file = new Path(basePath, UUID.randomUUID().toString).toUri.toString - println(s"writing to uuid at $file") data.write.parquet(file) sqlContext.read .schema(data.schema) - .parquet(file).inputFiles + .parquet(file) + .inputFiles + .map(new Path(_)) + .filterNot(_.getName.startsWith("_")) + .map(_.toUri.toString) } } diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/FileStreamSource.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/FileStreamSource.scala index dbd9e320ce6cf..4965528cd9f4e 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/FileStreamSource.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/FileStreamSource.scala @@ -115,7 +115,7 @@ class FileStreamSource( assert(startId <= endId) val files = metadataLog.get(Some(startId + 1), Some(endId)).map(_._2).flatten - logError(s"Processing ${files.length} files from ${startId + 1}:$endId") + logInfo(s"Processing ${files.length} files from ${startId + 1}:$endId") logDebug(s"Streaming ${files.mkString(", ")}") dataFrameBuilder(files) @@ -127,7 +127,7 @@ class FileStreamSource( .filterNot(_.getPath.getName.startsWith("_")) .map(_.getPath.toUri.toString) val endTime = System.nanoTime() - logError(s"Listed ${files.size} in ${(endTime.toDouble - startTime) / 1000000}ms") + logDebug(s"Listed ${files.size} in ${(endTime.toDouble - startTime) / 1000000}ms") files } diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamExecution.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamExecution.scala index a87805c0ebcb5..40aebc4385edf 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamExecution.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamExecution.scala @@ -164,7 +164,7 @@ class StreamExecution( s"Query $name terminated with exception: ${e.getMessage}", e, Some(committedOffsets.toCompositeOffset(sources))) - logDebug(s"Query $name terminated with error", e) + logError(s"Query $name terminated with error", e) } finally { state = TERMINATED sqlContext.streams.notifyQueryTermination(StreamExecution.this) @@ -196,7 +196,7 @@ class StreamExecution( } case None => // We are starting this stream for the first time. 
- logDebug(s"Starting new continuous query.") + logInfo(s"Starting new continuous query.") currentBatchId = 0 commitAndConstructNextBatch() } @@ -303,7 +303,7 @@ class StreamExecution( } val batchTime = (System.nanoTime() - startTime).toDouble / 1000000 - logDebug(s"Completed up to $availableOffsets in ${batchTime}ms") + logInfo(s"Completed up to $availableOffsets in ${batchTime}ms") postEvent(new QueryProgress(this)) } @@ -323,7 +323,7 @@ class StreamExecution( microBatchThread.interrupt() microBatchThread.join() } - logDebug(s"Query $name was stopped") + logInfo(s"Query $name was stopped") } /** @@ -334,12 +334,24 @@ class StreamExecution( def notDone = !committedOffsets.contains(source) || committedOffsets(source) < newOffset while (notDone) { - logDebug(s"Waiting until $newOffset at $source") + logInfo(s"Waiting until $newOffset at $source") awaitBatchLock.synchronized { awaitBatchLock.wait(100) } } logDebug(s"Unblocked at $newOffset for $source") } + /** A flag to indicate that a batch has completed with no new data available. */ + @volatile private var noNewData = false + + override def processAllAvailable(): Unit = { + noNewData = false + while (!noNewData) { + awaitBatchLock.synchronized { awaitBatchLock.wait(10000) } + if (streamDeathCause != null) { throw streamDeathCause } + } + if (streamDeathCause != null) { throw streamDeathCause } + } + override def awaitTermination(): Unit = { if (state == INITIALIZED) { throw new IllegalStateException("Cannot wait for termination on a query that has not started") @@ -390,16 +402,6 @@ class StreamExecution( case object INITIALIZED extends State case object ACTIVE extends State case object TERMINATED extends State - - var noNewData = false - override def processAllAvailable(): Unit = { - noNewData = false - while (!noNewData) { - awaitBatchLock.synchronized { awaitBatchLock.wait(10000) } - if (streamDeathCause != null) { throw streamDeathCause } - } - if (streamDeathCause != null) { throw streamDeathCause } - } } private[sql] object StreamExecution { diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamFileCatalog.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamFileCatalog.scala new file mode 100644 index 0000000000000..b8d69b18450cf --- /dev/null +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamFileCatalog.scala @@ -0,0 +1,59 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.spark.sql.execution.streaming + +import org.apache.hadoop.fs.{FileStatus, Path} + +import org.apache.spark.internal.Logging +import org.apache.spark.sql.SQLContext +import org.apache.spark.sql.catalyst.InternalRow +import org.apache.spark.sql.catalyst.expressions.Expression +import org.apache.spark.sql.execution.datasources.PartitionSpec +import org.apache.spark.sql.sources.{FileCatalog, Partition} +import org.apache.spark.sql.types.StructType + +class StreamFileCatalog(sqlContext: SQLContext, path: Path) extends FileCatalog with Logging { + val metadataDirectory = new Path(path, FileStreamSink.metadataDir) + logInfo(s"Reading streaming file log from $metadataDirectory") + val metadataLog = new HDFSMetadataLog[Seq[String]](sqlContext, metadataDirectory.toUri.toString) + val fs = path.getFileSystem(sqlContext.sparkContext.hadoopConfiguration) + + override def paths: Seq[Path] = path :: Nil + + override def partitionSpec(): PartitionSpec = PartitionSpec(StructType(Nil), Nil) + + /** + * Returns all valid files grouped into partitions when the data is partitioned. If the data is + * unpartitioned, this will return a single partition with not partition values. + * + * @param filters the filters used to prune which partitions are returned. These filters must + * only refer to partition columns and this method will only return files + * where these predicates are guaranteed to evaluate to `true`. Thus, these + * filters will not need to be evaluated again on the returned data. + */ + override def listFiles(filters: Seq[Expression]): Seq[Partition] = + Partition(InternalRow.empty, allFiles()) :: Nil + + override def getStatus(path: Path): Array[FileStatus] = fs.listStatus(path) + + override def refresh(): Unit = {} + + override def allFiles(): Seq[FileStatus] = { + fs.listStatus(metadataLog.get(None, None).flatMap(_._2).map(new Path(_))) + } +} diff --git a/sql/core/src/main/scala/org/apache/spark/sql/sources/interfaces.scala b/sql/core/src/main/scala/org/apache/spark/sql/sources/interfaces.scala index b5a50bbc090fd..1e02354edf4c1 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/sources/interfaces.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/sources/interfaces.scala @@ -418,10 +418,8 @@ case class HadoopFsRelation( s"HadoopFiles" /** Returns the list of files that will be read when scanning this relation. */ - override def inputFiles: Array[String] = { - println(s"inputfiles: $location") + override def inputFiles: Array[String] = location.allFiles().map(_.getPath.toUri.toString).toArray - } override def sizeInBytes: Long = location.allFiles().map(_.getLen).sum } @@ -429,7 +427,7 @@ case class HadoopFsRelation( /** * Used to read and write data stored in files to/from the [[InternalRow]] format. */ -trait FileFormat extends Serializable { // TODO: Remove +trait FileFormat { /** * When possible, this method should return the schema of the given `files`. When the format * does not support inference, or no valid files are given should return None. In these cases @@ -484,20 +482,6 @@ trait FileFormat extends Serializable { // TODO: Remove // Until then we guard in [[FileSourceStrategy]] to only call this method on supported formats. 
throw new UnsupportedOperationException(s"buildReader is not supported for $this") } - - def openWriter( - sqlContext: SQLContext, - path: String, - dataSchema: StructType): RowWriter = { - // TODO: Remove this default implementation when the other formats have been ported - // Until then we guard in [[FileSourceStrategy]] to only call this method on supported formats. - throw new UnsupportedOperationException(s"openWriter is not supported for $this") - } -} - -abstract class RowWriter { - def write(row: InternalRow): Unit - def close(): Unit } /** diff --git a/sql/core/src/test/scala/org/apache/spark/sql/streaming/FileStreamSinkSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/streaming/FileStreamSinkSuite.scala index bcf4b4aa80376..7f316113835ff 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/streaming/FileStreamSinkSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/streaming/FileStreamSinkSuite.scala @@ -17,107 +17,11 @@ package org.apache.spark.sql.streaming -import java.io.File -import java.util.UUID - -import org.apache.spark.sql.{ContinuousQueryException, ContinuousQuery, StreamTest} -import org.apache.spark.sql.catalyst.util._ +import org.apache.spark.sql.StreamTest import org.apache.spark.sql.execution.streaming.MemoryStream import org.apache.spark.sql.test.SharedSQLContext import org.apache.spark.util.Utils -import scala.util.Random -import scala.util.control.NonFatal - -class FileStressSuite extends StreamTest with SharedSQLContext { - import testImplicits._ - - test("fault tolerance stress test") { - val numRecords = 1000000 - val inputDir = Utils.createTempDir("stream.input").getCanonicalPath - val stagingDir = Utils.createTempDir("stream.staging").getCanonicalPath - val outputDir = Utils.createTempDir("stream.output").getCanonicalPath - val checkpoint = Utils.createTempDir("stream.checkpoint").getCanonicalPath - - @volatile - var continue = true - var stream: ContinuousQuery = null - - val writer = new Thread("stream writer") { - override def run(): Unit = { - var i = numRecords - while (i > 0) { - val count = Random.nextInt(100) - var j = 0 - var string = "" - while (j < count && i > 0) { - string = string + i + "\n" - j += 1 - i -= 1 - } - - val uuid = UUID.randomUUID().toString - val fileName = new File(stagingDir, uuid) - stringToFile(fileName, string) - fileName.renameTo(new File(inputDir, uuid)) - val sleep = Random.nextInt(100) - Thread.sleep(sleep) - } - - println("DONE WRITING") - var done = false - while (!done) { - try { - stream.processAllAvailable() - done = true - } catch { - case NonFatal(_) => - } - } - - println("STOPPING QUERY") - continue = false - stream.stop() - } - } - writer.start() - - val input = sqlContext.read.format("text").stream(inputDir) - def startStream(): ContinuousQuery = input - .repartition(5) - .as[String] - .mapPartitions { iter => - val rand = Random.nextInt(100) - if (rand < 5) { sys.error("failure") } - iter.map(_.toLong) - } - .write - .format("parquet") - .option("checkpointLocation", checkpoint) - .startStream(outputDir) - - var failures = 0 - val streamThread = new Thread("stream runner") { - while (continue) { - println("Starting stream") - stream = startStream() - - try { - stream.awaitTermination() - } catch { - case ce: ContinuousQueryException => - failures += 1 - } - } - } - - streamThread.join() - - println(s"Stream restarted $failures times.") - assert(sqlContext.read.parquet(outputDir).distinct().count() == numRecords) - } -} - class FileStreamSinkSuite extends StreamTest with 
SharedSQLContext { import testImplicits._ @@ -135,14 +39,11 @@ class FileStreamSinkSuite extends StreamTest with SharedSQLContext { .startStream(outputDir) inputData.addData(1, 2, 3) - println("blocking") failAfter(streamingTimeout) { query.processAllAvailable() } - println("done") val outputDf = sqlContext.read.parquet(outputDir).as[Int] - checkDataset( outputDf, 1, 2, 3) } -} \ No newline at end of file +} diff --git a/sql/core/src/test/scala/org/apache/spark/sql/streaming/FileStressSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/streaming/FileStressSuite.scala new file mode 100644 index 0000000000000..9cd2968925735 --- /dev/null +++ b/sql/core/src/test/scala/org/apache/spark/sql/streaming/FileStressSuite.scala @@ -0,0 +1,128 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.sql.streaming + +import java.io.File +import java.util.UUID + +import scala.util.Random +import scala.util.control.NonFatal + +import org.apache.spark.sql.{ContinuousQuery, ContinuousQueryException, StreamTest} +import org.apache.spark.sql.catalyst.util._ +import org.apache.spark.sql.test.SharedSQLContext +import org.apache.spark.util.Utils + +/** + * A stress test for streamign queries that read and write files. This test constists of + * two threads: + * - one that writes out `numRecords` distinct integers to files of random sizes (the total + * number of records is fixed but each files size / creation time is random). + * - another that continually restarts a buggy streaming query (i.e. fails with 5% probability on + * any partition). + * + * At the end, the resulting files are loaded and the answer is checked. 
+ */ +class FileStressSuite extends StreamTest with SharedSQLContext { + import testImplicits._ + + test("fault tolerance stress test") { + val numRecords = 10000 + val inputDir = Utils.createTempDir("stream.input").getCanonicalPath + val stagingDir = Utils.createTempDir("stream.staging").getCanonicalPath + val outputDir = Utils.createTempDir("stream.output").getCanonicalPath + val checkpoint = Utils.createTempDir("stream.checkpoint").getCanonicalPath + + @volatile + var continue = true + var stream: ContinuousQuery = null + + val writer = new Thread("stream writer") { + override def run(): Unit = { + var i = numRecords + while (i > 0) { + val count = Random.nextInt(100) + var j = 0 + var string = "" + while (j < count && i > 0) { + if (i % 10000 == 0) { logError(s"Wrote record $i") } + string = string + i + "\n" + j += 1 + i -= 1 + } + + val uuid = UUID.randomUUID().toString + val fileName = new File(stagingDir, uuid) + stringToFile(fileName, string) + fileName.renameTo(new File(inputDir, uuid)) + val sleep = Random.nextInt(100) + Thread.sleep(sleep) + } + + logError("== DONE WRITING ==") + var done = false + while (!done) { + try { + stream.processAllAvailable() + done = true + } catch { + case NonFatal(_) => + } + } + + continue = false + stream.stop() + } + } + writer.start() + + val input = sqlContext.read.format("text").stream(inputDir) + def startStream(): ContinuousQuery = input + .repartition(5) + .as[String] + .mapPartitions { iter => + val rand = Random.nextInt(100) + if (rand < 5) { sys.error("failure") } + iter.map(_.toLong) + } + .write + .format("parquet") + .option("checkpointLocation", checkpoint) + .startStream(outputDir) + + var failures = 0 + val streamThread = new Thread("stream runner") { + while (continue) { + if (failures % 10 == 0) { logError(s"Query restart #$failures") } + stream = startStream() + + try { + stream.awaitTermination() + } catch { + case ce: ContinuousQueryException => + failures += 1 + } + } + } + + streamThread.join() + + logError(s"Stream restarted $failures times.") + assert(sqlContext.read.parquet(outputDir).distinct().count() == numRecords) + } +} From 7da7c7eb455d065a67a5f6143ce8ca99102f3742 Mon Sep 17 00:00:00 2001 From: Michael Armbrust Date: Tue, 22 Mar 2016 16:29:33 -0700 Subject: [PATCH 3/4] comments --- .../org/apache/spark/sql/ContinuousQueryException.scala | 4 ++-- .../spark/sql/execution/streaming/CompositeOffset.scala | 3 +++ .../spark/sql/execution/streaming/FileStreamSink.scala | 8 +++++--- .../spark/sql/execution/streaming/FileStreamSource.scala | 2 ++ .../apache/spark/sql/execution/streaming/LongOffset.scala | 2 ++ 5 files changed, 14 insertions(+), 5 deletions(-) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/ContinuousQueryException.scala b/sql/core/src/main/scala/org/apache/spark/sql/ContinuousQueryException.scala index 5eb5bcd4e74dd..fec38629d914e 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/ContinuousQueryException.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/ContinuousQueryException.scala @@ -36,8 +36,8 @@ class ContinuousQueryException private[sql]( val message: String, val cause: Throwable, val startOffset: Option[Offset] = None, - val endOffset: Option[Offset] = None - ) extends Exception(message, cause) with Serializable { + val endOffset: Option[Offset] = None) + extends Exception(message, cause) { /** Time when the exception occurred */ val time: Long = System.currentTimeMillis diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/CompositeOffset.scala 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/CompositeOffset.scala index e48ac598929ab..729c8462fed65 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/CompositeOffset.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/CompositeOffset.scala @@ -64,6 +64,9 @@ case class CompositeOffset(offsets: Seq[Option[Offset]]) extends Offset { assert(sources.size == offsets.size) new StreamProgress ++ sources.zip(offsets).collect { case (s, Some(o)) => (s, o) } } + + override def toString: String = + offsets.map(_.map(_.toString).getOrElse("-")).mkString("[", ", ", "]") } object CompositeOffset { diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/FileStreamSink.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/FileStreamSink.scala index b4419382f8147..e819e95d61f9a 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/FileStreamSink.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/FileStreamSink.scala @@ -42,9 +42,9 @@ class FileStreamSink( path: String, fileFormat: FileFormat) extends Sink with Logging { - val basePath = new Path(path) - val logPath = new Path(basePath, FileStreamSink.metadataDir) - val fileLog = new HDFSMetadataLog[Seq[String]](sqlContext, logPath.toUri.toString) + private val basePath = new Path(path) + private val logPath = new Path(basePath, FileStreamSink.metadataDir) + private val fileLog = new HDFSMetadataLog[Seq[String]](sqlContext, logPath.toUri.toString) override def addBatch(batchId: Long, data: DataFrame): Unit = { if (fileLog.get(batchId).isDefined) { @@ -76,4 +76,6 @@ class FileStreamSink( .filterNot(_.getName.startsWith("_")) .map(_.toUri.toString) } + + override def toString: String = s"FileSink[$path]" } diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/FileStreamSource.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/FileStreamSource.scala index 4965528cd9f4e..c382586f2375c 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/FileStreamSource.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/FileStreamSource.scala @@ -132,4 +132,6 @@ class FileStreamSource( } override def getOffset: Option[Offset] = Some(fetchMaxOffset()).filterNot(_.offset == -1) + + override def toString: String = s"FileSink[$path]" } diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/LongOffset.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/LongOffset.scala index 008195af38b75..bb176408d8f59 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/LongOffset.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/LongOffset.scala @@ -30,4 +30,6 @@ case class LongOffset(offset: Long) extends Offset { def +(increment: Long): LongOffset = new LongOffset(offset + increment) def -(decrement: Long): LongOffset = new LongOffset(offset - decrement) + + override def toString: String = s"#$offset" } From e821f2f558fc3426a13e89598eb1a3eb887daeff Mon Sep 17 00:00:00 2001 From: Michael Armbrust Date: Tue, 22 Mar 2016 16:59:36 -0700 Subject: [PATCH 4/4] more comments --- .../scala/org/apache/spark/sql/streaming/FileStressSuite.scala | 1 + 1 file changed, 1 insertion(+) diff --git a/sql/core/src/test/scala/org/apache/spark/sql/streaming/FileStressSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/streaming/FileStressSuite.scala index 
9cd2968925735..5a1bfb3a005c8 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/streaming/FileStressSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/streaming/FileStressSuite.scala @@ -50,6 +50,7 @@ class FileStressSuite extends StreamTest with SharedSQLContext { @volatile var continue = true + @volatile var stream: ContinuousQuery = null val writer = new Thread("stream writer") {
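
For reference, a minimal end-to-end sketch of how the file sink added in this series is driven from user code, following the pattern exercised in FileStreamSinkSuite and FileStressSuite. The wrapper method and directory paths below are illustrative only and not part of the patch; the API calls themselves (read.format("text").stream(...), write.format("parquet").option("checkpointLocation", ...).startStream(...), processAllAvailable()) are the ones the patch itself introduces or uses.

import org.apache.spark.sql.SQLContext

// Hypothetical driver method; only the method name and paths are invented here.
def runFileSinkExample(
    sqlContext: SQLContext,
    inputDir: String,
    outputDir: String,
    checkpointDir: String): Unit = {
  // Continuously read text files dropped into inputDir.
  val input = sqlContext.read.format("text").stream(inputDir)

  // Write each batch out as parquet; the checkpoint location holds the offsets,
  // and the sink maintains an _spark_metadata log of successfully committed files
  // under outputDir.
  val query = input.write
    .format("parquet")
    .option("checkpointLocation", checkpointDir)
    .startStream(outputDir)

  // Block until all data currently visible to the source has been committed to the
  // sink (the testing-oriented helper added by this series), then stop the query.
  query.processAllAvailable()
  query.stop()

  // Reading outputDir back resolves through the sink's metadata log, so files left
  // behind by partially failed batches are ignored.
  val result = sqlContext.read.parquet(outputDir)
  result.show()
}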