diff --git a/sql/core/src/main/scala/org/apache/spark/sql/ContinuousQuery.scala b/sql/core/src/main/scala/org/apache/spark/sql/ContinuousQuery.scala
index eb69804c39b5d..1dc9a6893ebb6 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/ContinuousQuery.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/ContinuousQuery.scala
@@ -91,6 +91,15 @@ trait ContinuousQuery {
    */
  def awaitTermination(timeoutMs: Long): Boolean

+  /**
+   * Blocks until all available data in the source has been processed and committed to the sink.
+   * This method is intended for testing. Note that in the case of continually arriving data, this
+   * method may block forever. Additionally, this method only guarantees blocking until data that
+   * was synchronously appended to a [[org.apache.spark.sql.execution.streaming.Source]] prior to
+   * invocation has been processed (i.e. `getOffset` must immediately reflect the addition).
+   */
+  def processAllAvailable(): Unit
+
  /**
   * Stops the execution of this query if it is running. This method blocks until the threads
   * performing execution has stopped.
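A minimal sketch of how a test might drive the new `processAllAvailable()` contract, assuming the `MemoryStream` source and the `startStream`/`checkpointLocation` entry points exercised later in this patch; the paths and values here are illustrative only:

    val input = MemoryStream[Int]
    val query = input.toDF().write
      .format("parquet")
      .option("checkpointLocation", "/tmp/checkpoint")   // hypothetical paths
      .startStream("/tmp/out")

    input.addData(1, 2, 3)
    query.processAllAvailable()   // returns once the batch containing 1, 2, 3 is committed
    query.stop()
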
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/ContinuousQueryException.scala b/sql/core/src/main/scala/org/apache/spark/sql/ContinuousQueryException.scala
index 67dd9dbe23726..fec38629d914e 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/ContinuousQueryException.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/ContinuousQueryException.scala
@@ -32,12 +32,12 @@ import org.apache.spark.sql.execution.streaming.{Offset, StreamExecution}
  */
 @Experimental
 class ContinuousQueryException private[sql](
-    val query: ContinuousQuery,
+    @transient val query: ContinuousQuery,
     val message: String,
     val cause: Throwable,
     val startOffset: Option[Offset] = None,
-    val endOffset: Option[Offset] = None
-  ) extends Exception(message, cause) {
+    val endOffset: Option[Offset] = None)
+  extends Exception(message, cause) {

   /** Time when the exception occurred */
   val time: Long = System.currentTimeMillis
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSource.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSource.scala
index fac2a64726618..231dd0f8f7753 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSource.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSource.scala
@@ -22,6 +22,7 @@ import java.util.ServiceLoader

 import scala.collection.JavaConverters._
 import scala.language.{existentials, implicitConversions}
 import scala.util.{Failure, Success, Try}
+import scala.util.control.NonFatal

 import org.apache.hadoop.fs.Path

@@ -29,7 +30,7 @@ import org.apache.spark.deploy.SparkHadoopUtil
 import org.apache.spark.internal.Logging
 import org.apache.spark.sql._
 import org.apache.spark.sql.catalyst.analysis.UnresolvedAttribute
-import org.apache.spark.sql.execution.streaming.{FileStreamSource, Sink, Source}
+import org.apache.spark.sql.execution.streaming._
 import org.apache.spark.sql.sources._
 import org.apache.spark.sql.types.{CalendarIntervalType, StructType}
 import org.apache.spark.util.Utils
@@ -176,14 +177,41 @@ case class DataSource(

   /** Returns a sink that can be used to continually write data. */
   def createSink(): Sink = {
-    val datasourceClass = providingClass.newInstance() match {
-      case s: StreamSinkProvider => s
+    providingClass.newInstance() match {
+      case s: StreamSinkProvider => s.createSink(sqlContext, options, partitionColumns)
+      case format: FileFormat =>
+        val caseInsensitiveOptions = new CaseInsensitiveMap(options)
+        val path = caseInsensitiveOptions.getOrElse("path", {
+          throw new IllegalArgumentException("'path' is not specified")
+        })
+
+        new FileStreamSink(sqlContext, path, format)
       case _ =>
         throw new UnsupportedOperationException(
           s"Data source $className does not support streamed writing")
     }
+  }

-    datasourceClass.createSink(sqlContext, options, partitionColumns)
+  /**
+   * Returns true if there is a single path that has a metadata log indicating which files should
+   * be read.
+   */
+  def hasMetadata(path: Seq[String]): Boolean = {
+    path match {
+      case Seq(singlePath) =>
+        try {
+          val hdfsPath = new Path(singlePath)
+          val fs = hdfsPath.getFileSystem(sqlContext.sparkContext.hadoopConfiguration)
+          val metadataPath = new Path(hdfsPath, FileStreamSink.metadataDir)
+          val res = fs.exists(metadataPath)
+          res
+        } catch {
+          case NonFatal(e) =>
+            logWarning("Error while looking for metadata directory.", e)
+            false
+        }
+      case _ => false
+    }
   }

   /** Create a resolved [[BaseRelation]] that can be used to read data from this [[DataSource]] */
@@ -200,6 +228,34 @@ case class DataSource(
       case (_: RelationProvider, Some(_)) =>
         throw new AnalysisException(s"$className does not allow user-specified schemas.")

+      // We are reading from the results of a streaming query. Load files from the metadata log
+      // instead of listing them using HDFS APIs.
+      case (format: FileFormat, _)
+          if hasMetadata(caseInsensitiveOptions.get("path").toSeq ++ paths) =>
+        val basePath = new Path((caseInsensitiveOptions.get("path").toSeq ++ paths).head)
+        val fileCatalog =
+          new StreamFileCatalog(sqlContext, basePath)
+        val dataSchema = userSpecifiedSchema.orElse {
+          format.inferSchema(
+            sqlContext,
+            caseInsensitiveOptions,
+            fileCatalog.allFiles())
+        }.getOrElse {
+          throw new AnalysisException(
+            s"Unable to infer schema for $format at ${fileCatalog.allFiles().mkString(",")}. " +
+              "It must be specified manually")
+        }
+
+        HadoopFsRelation(
+          sqlContext,
+          fileCatalog,
+          partitionSchema = fileCatalog.partitionSpec().partitionColumns,
+          dataSchema = dataSchema,
+          bucketSpec = None,
+          format,
+          options)
+
+      // This is a non-streaming file based datasource.
       case (format: FileFormat, _) =>
         val allPaths = caseInsensitiveOptions.get("path") ++ paths
         val globbedPaths = allPaths.flatMap { path =>
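The dispatch added to `resolveRelation` above hinges on a single check: a path is treated as the output of a streaming query exactly when it contains a `_spark_metadata` subdirectory. A hypothetical standalone version of that check (the helper name and arguments are illustrative, not part of the patch):

    import org.apache.hadoop.conf.Configuration
    import org.apache.hadoop.fs.Path

    def looksLikeStreamingSinkOutput(dir: String, hadoopConf: Configuration): Boolean = {
      // FileStreamSink.metadataDir is "_spark_metadata", defined later in this patch.
      val metadataPath = new Path(new Path(dir), FileStreamSink.metadataDir)
      metadataPath.getFileSystem(hadoopConf).exists(metadataPath)
    }
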
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/CompositeOffset.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/CompositeOffset.scala
index e48ac598929ab..729c8462fed65 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/CompositeOffset.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/CompositeOffset.scala
@@ -64,6 +64,9 @@ case class CompositeOffset(offsets: Seq[Option[Offset]]) extends Offset {
     assert(sources.size == offsets.size)
     new StreamProgress ++ sources.zip(offsets).collect { case (s, Some(o)) => (s, o) }
   }
+
+  override def toString: String =
+    offsets.map(_.map(_.toString).getOrElse("-")).mkString("[", ", ", "]")
 }

 object CompositeOffset {
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/FileStreamSink.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/FileStreamSink.scala
new file mode 100644
index 0000000000000..e819e95d61f9a
--- /dev/null
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/FileStreamSink.scala
@@ -0,0 +1,81 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.execution.streaming
+
+import java.util.UUID
+
+import org.apache.hadoop.fs.Path
+
+import org.apache.spark.internal.Logging
+import org.apache.spark.sql.{DataFrame, SQLContext}
+import org.apache.spark.sql.sources.FileFormat
+
+object FileStreamSink {
+  // The name of the subdirectory that is used to store metadata about which files are valid.
+  val metadataDir = "_spark_metadata"
+}
+
+/**
+ * A sink that writes out results to parquet files. Each batch is written out to a unique
+ * directory. After all of the files in a batch have been successfully written, the list of
+ * file paths is appended to the log atomically. In the case of partial failures, some duplicate
+ * data may be present in the target directory, but only one copy of each file will be present
+ * in the log.
+ */
+class FileStreamSink(
+    sqlContext: SQLContext,
+    path: String,
+    fileFormat: FileFormat) extends Sink with Logging {
+
+  private val basePath = new Path(path)
+  private val logPath = new Path(basePath, FileStreamSink.metadataDir)
+  private val fileLog = new HDFSMetadataLog[Seq[String]](sqlContext, logPath.toUri.toString)
+
+  override def addBatch(batchId: Long, data: DataFrame): Unit = {
+    if (fileLog.get(batchId).isDefined) {
+      logInfo(s"Skipping already committed batch $batchId")
+    } else {
+      val files = writeFiles(data)
+      if (fileLog.add(batchId, files)) {
+        logInfo(s"Committed batch $batchId")
+      } else {
+        logWarning(s"Race while writing batch $batchId")
+      }
+    }
+  }
+
+  /** Writes the [[DataFrame]] to a UUID-named dir, returning the list of file paths. */
+  private def writeFiles(data: DataFrame): Seq[String] = {
+    val ctx = sqlContext
+    val outputDir = path
+    val format = fileFormat
+    val schema = data.schema
+
+    val file = new Path(basePath, UUID.randomUUID().toString).toUri.toString
+    data.write.parquet(file)
+    sqlContext.read
+      .schema(data.schema)
+      .parquet(file)
+      .inputFiles
+      .map(new Path(_))
+      .filterNot(_.getName.startsWith("_"))
+      .map(_.toUri.toString)
+  }
+
+  override def toString: String = s"FileSink[$path]"
+}
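The exactly-once behaviour described in the class comment above follows from treating the metadata log, rather than the directory listing, as the source of truth. A hypothetical helper, not part of the patch, that computes the set of files a reader should trust:

    // Files present on disk but missing from the log are leftovers of failed or duplicate
    // write attempts, and readers simply ignore them.
    def committedFiles(log: HDFSMetadataLog[Seq[String]]): Set[String] =
      log.get(None, None).flatMap { case (_, files) => files }.toSet
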
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/FileStreamSource.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/FileStreamSource.scala
index d13b1a6166798..c382586f2375c 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/FileStreamSource.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/FileStreamSource.scala
@@ -44,7 +44,7 @@ class FileStreamSource(
   private var maxBatchId = metadataLog.getLatest().map(_._1).getOrElse(-1L)

   private val seenFiles = new OpenHashSet[String]
-  metadataLog.get(None, maxBatchId).foreach { case (batchId, files) =>
+  metadataLog.get(None, Some(maxBatchId)).foreach { case (batchId, files) =>
     files.foreach(seenFiles.add)
   }

@@ -114,18 +114,24 @@ class FileStreamSource(
     val endId = end.asInstanceOf[LongOffset].offset

     assert(startId <= endId)
-    val files = metadataLog.get(Some(startId + 1), endId).map(_._2).flatten
-    logDebug(s"Return files from batches ${startId + 1}:$endId")
+    val files = metadataLog.get(Some(startId + 1), Some(endId)).map(_._2).flatten
+    logInfo(s"Processing ${files.length} files from ${startId + 1}:$endId")
     logDebug(s"Streaming ${files.mkString(", ")}")
     dataFrameBuilder(files)
   }

   private def fetchAllFiles(): Seq[String] = {
-    fs.listStatus(new Path(path))
+    val startTime = System.nanoTime()
+    val files = fs.listStatus(new Path(path))
       .filterNot(_.getPath.getName.startsWith("_"))
       .map(_.getPath.toUri.toString)
+    val endTime = System.nanoTime()
+    logDebug(s"Listed ${files.size} files in ${(endTime.toDouble - startTime) / 1000000}ms")
+    files
   }

   override def getOffset: Option[Offset] = Some(fetchMaxOffset()).filterNot(_.offset == -1)
+
+  override def toString: String = s"FileSource[$path]"
 }
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/HDFSMetadataLog.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/HDFSMetadataLog.scala
index 298b5d292e8e4..f27d23b1cdcdd 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/HDFSMetadataLog.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/HDFSMetadataLog.scala
@@ -170,11 +170,12 @@ class HDFSMetadataLog[T: ClassTag](sqlContext: SQLContext, path: String)
     }
   }

-  override def get(startId: Option[Long], endId: Long): Array[(Long, T)] = {
-    val batchIds = fc.util().listStatus(metadataPath, batchFilesFilter)
+  override def get(startId: Option[Long], endId: Option[Long]): Array[(Long, T)] = {
+    val files = fc.util().listStatus(metadataPath, batchFilesFilter)
+    val batchIds = files
       .map(_.getPath.getName.toLong)
       .filter { batchId =>
-        batchId <= endId && (startId.isEmpty || batchId >= startId.get)
+        (endId.isEmpty || batchId <= endId.get) && (startId.isEmpty || batchId >= startId.get)
       }
     batchIds.sorted.map(batchId => (batchId, get(batchId))).filter(_._2.isDefined).map {
       case (batchId, metadataOption) =>
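A hypothetical illustration of the widened range lookup above, assuming a value `log` of type `HDFSMetadataLog[T]` that already contains batches 0 through 5; both bounds are inclusive and `None` means unbounded:

    assert(log.get(None, Some(2L)).map(_._1).toSeq == Seq(0L, 1L, 2L))
    assert(log.get(Some(4L), None).map(_._1).toSeq == Seq(4L, 5L))
    assert(log.get(Some(1L), Some(3L)).map(_._1).toSeq == Seq(1L, 2L, 3L))
    assert(log.get(None, None).length == 6)
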
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/LongOffset.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/LongOffset.scala
index 008195af38b75..bb176408d8f59 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/LongOffset.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/LongOffset.scala
@@ -30,4 +30,6 @@ case class LongOffset(offset: Long) extends Offset {

   def +(increment: Long): LongOffset = new LongOffset(offset + increment)
   def -(decrement: Long): LongOffset = new LongOffset(offset - decrement)
+
+  override def toString: String = s"#$offset"
 }
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/MetadataLog.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/MetadataLog.scala
index 3f9896d23ce36..cc70e1d314d1d 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/MetadataLog.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/MetadataLog.scala
@@ -42,7 +42,7 @@ trait MetadataLog[T] {
    * Return metadata for batches between startId (inclusive) and endId (inclusive). If `startId` is
    * `None`, just return all batches before endId (inclusive).
    */
-  def get(startId: Option[Long], endId: Long): Array[(Long, T)]
+  def get(startId: Option[Long], endId: Option[Long]): Array[(Long, T)]

   /**
    * Return the latest batch Id and its metadata if exist.
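A quick, hypothetical check of the compact offset rendering added by this patch (`LongOffset` here, `CompositeOffset` earlier), which is the form that now appears in logs and `ContinuousQueryException` messages:

    assert(LongOffset(5).toString == "#5")
    assert(CompositeOffset(Seq(Some(LongOffset(3)), None)).toString == "[#3, -]")
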
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamExecution.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamExecution.scala
index c5fefb5346bc7..40aebc4385edf 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamExecution.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamExecution.scala
@@ -239,6 +239,12 @@ class StreamExecution(
       logInfo(s"Committed offsets for batch $currentBatchId.")
       true
     } else {
+      noNewData = true
+      awaitBatchLock.synchronized {
+        // Wake up any threads that are waiting for the stream to progress.
+        awaitBatchLock.notifyAll()
+      }
+
       false
     }
   }
@@ -334,6 +340,18 @@ class StreamExecution(
     logDebug(s"Unblocked at $newOffset for $source")
   }

+  /** A flag to indicate that a batch has completed with no new data available. */
+  @volatile private var noNewData = false
+
+  override def processAllAvailable(): Unit = {
+    noNewData = false
+    while (!noNewData) {
+      awaitBatchLock.synchronized { awaitBatchLock.wait(10000) }
+      if (streamDeathCause != null) { throw streamDeathCause }
+    }
+    if (streamDeathCause != null) { throw streamDeathCause }
+  }
+
   override def awaitTermination(): Unit = {
     if (state == INITIALIZED) {
       throw new IllegalStateException("Cannot wait for termination on a query that has not started")
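Read in isolation, the wait/notify handshake above boils down to the following sketch; it assumes a single dedicated monitor object and omits the `streamDeathCause` rethrow that the real code performs:

    object DrainHandshake {
      private val lock = new Object
      @volatile private var noNewData = false

      // Called by the batch thread when a run finds nothing new to process.
      def onEmptyBatch(): Unit = lock.synchronized {
        noNewData = true
        lock.notifyAll()
      }

      // Called by tests. Clearing the flag first guarantees we only return after an empty batch
      // that started after this call, i.e. after everything already appended was processed.
      def processAllAvailable(): Unit = {
        noNewData = false
        while (!noNewData) {
          lock.synchronized { lock.wait(10000) }
        }
      }
    }
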
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamFileCatalog.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamFileCatalog.scala
new file mode 100644
index 0000000000000..b8d69b18450cf
--- /dev/null
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamFileCatalog.scala
@@ -0,0 +1,59 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.execution.streaming
+
+import org.apache.hadoop.fs.{FileStatus, Path}
+
+import org.apache.spark.internal.Logging
+import org.apache.spark.sql.SQLContext
+import org.apache.spark.sql.catalyst.InternalRow
+import org.apache.spark.sql.catalyst.expressions.Expression
+import org.apache.spark.sql.execution.datasources.PartitionSpec
+import org.apache.spark.sql.sources.{FileCatalog, Partition}
+import org.apache.spark.sql.types.StructType
+
+class StreamFileCatalog(sqlContext: SQLContext, path: Path) extends FileCatalog with Logging {
+  val metadataDirectory = new Path(path, FileStreamSink.metadataDir)
+  logInfo(s"Reading streaming file log from $metadataDirectory")
+  val metadataLog = new HDFSMetadataLog[Seq[String]](sqlContext, metadataDirectory.toUri.toString)
+  val fs = path.getFileSystem(sqlContext.sparkContext.hadoopConfiguration)
+
+  override def paths: Seq[Path] = path :: Nil
+
+  override def partitionSpec(): PartitionSpec = PartitionSpec(StructType(Nil), Nil)
+
+  /**
+   * Returns all valid files grouped into partitions when the data is partitioned. If the data is
+   * unpartitioned, this will return a single partition with no partition values.
+   *
+   * @param filters the filters used to prune which partitions are returned. These filters must
+   *                only refer to partition columns and this method will only return files
+   *                where these predicates are guaranteed to evaluate to `true`. Thus, these
+   *                filters will not need to be evaluated again on the returned data.
+   */
+  override def listFiles(filters: Seq[Expression]): Seq[Partition] =
+    Partition(InternalRow.empty, allFiles()) :: Nil
+
+  override def getStatus(path: Path): Array[FileStatus] = fs.listStatus(path)
+
+  override def refresh(): Unit = {}
+
+  override def allFiles(): Seq[FileStatus] = {
+    fs.listStatus(metadataLog.get(None, None).flatMap(_._2).map(new Path(_)))
+  }
+}
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/HDFSMetadataLogSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/HDFSMetadataLogSuite.scala
index 4ddc218455eb2..9ed5686d977c5 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/HDFSMetadataLogSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/HDFSMetadataLogSuite.scala
@@ -27,6 +27,8 @@ import org.apache.spark.sql.test.SharedSQLContext

 class HDFSMetadataLogSuite extends SparkFunSuite with SharedSQLContext {

+  private implicit def toOption[A](a: A): Option[A] = Option(a)
+
   test("basic") {
     withTempDir { temp =>
       val metadataLog = new HDFSMetadataLog[String](sqlContext, temp.getAbsolutePath)
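The implicit `toOption` added to the suite above lets test code written against the old `get(startId: Option[Long], endId: Long)` signature compile unchanged against the new one; a hypothetical call site:

    // Inside HDFSMetadataLogSuite, with `metadataLog` and the implicit in scope:
    metadataLog.get(None, 0L)   // the bare 0L is lifted to Some(0L) by toOption
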
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/streaming/FileStreamSinkSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/streaming/FileStreamSinkSuite.scala
new file mode 100644
index 0000000000000..7f316113835ff
--- /dev/null
+++ b/sql/core/src/test/scala/org/apache/spark/sql/streaming/FileStreamSinkSuite.scala
@@ -0,0 +1,49 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.streaming
+
+import org.apache.spark.sql.StreamTest
+import org.apache.spark.sql.execution.streaming.MemoryStream
+import org.apache.spark.sql.test.SharedSQLContext
+import org.apache.spark.util.Utils
+
+class FileStreamSinkSuite extends StreamTest with SharedSQLContext {
+  import testImplicits._
+
+  test("unpartitioned writing") {
+    val inputData = MemoryStream[Int]
+    val df = inputData.toDF()
+
+    val outputDir = Utils.createTempDir("stream.output").getCanonicalPath
+    val checkpointDir = Utils.createTempDir("stream.checkpoint").getCanonicalPath
+
+    val query =
+      df.write
+        .format("parquet")
+        .option("checkpointLocation", checkpointDir)
+        .startStream(outputDir)
+
+    inputData.addData(1, 2, 3)
+    failAfter(streamingTimeout) { query.processAllAvailable() }
+
+    val outputDf = sqlContext.read.parquet(outputDir).as[Int]
+    checkDataset(
+      outputDf,
+      1, 2, 3)
+  }
+}
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/streaming/FileStressSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/streaming/FileStressSuite.scala
new file mode 100644
index 0000000000000..5a1bfb3a005c8
--- /dev/null
+++ b/sql/core/src/test/scala/org/apache/spark/sql/streaming/FileStressSuite.scala
@@ -0,0 +1,132 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.streaming
+
+import java.io.File
+import java.util.UUID
+
+import scala.util.Random
+import scala.util.control.NonFatal
+
+import org.apache.spark.sql.{ContinuousQuery, ContinuousQueryException, StreamTest}
+import org.apache.spark.sql.catalyst.util._
+import org.apache.spark.sql.test.SharedSQLContext
+import org.apache.spark.util.Utils
+
+/**
+ * A stress test for streaming queries that read and write files. This test consists of
+ * two threads:
+ *  - one that writes out `numRecords` distinct integers to files of random sizes (the total
+ *    number of records is fixed but each file's size / creation time is random).
+ *  - another that continually restarts a buggy streaming query (i.e. fails with 5% probability on
+ *    any partition).
+ *
+ * At the end, the resulting files are loaded and the answer is checked.
+ */
+class FileStressSuite extends StreamTest with SharedSQLContext {
+  import testImplicits._
+
+  test("fault tolerance stress test") {
+    val numRecords = 10000
+    val inputDir = Utils.createTempDir("stream.input").getCanonicalPath
+    val stagingDir = Utils.createTempDir("stream.staging").getCanonicalPath
+    val outputDir = Utils.createTempDir("stream.output").getCanonicalPath
+    val checkpoint = Utils.createTempDir("stream.checkpoint").getCanonicalPath
+
+    @volatile
+    var continue = true
+    @volatile
+    var stream: ContinuousQuery = null
+
+    val writer = new Thread("stream writer") {
+      override def run(): Unit = {
+        var i = numRecords
+        while (i > 0) {
+          val count = Random.nextInt(100)
+          var j = 0
+          var string = ""
+          while (j < count && i > 0) {
+            if (i % 10000 == 0) { logError(s"Wrote record $i") }
+            string = string + i + "\n"
+            j += 1
+            i -= 1
+          }
+
+          val uuid = UUID.randomUUID().toString
+          val fileName = new File(stagingDir, uuid)
+          stringToFile(fileName, string)
+          fileName.renameTo(new File(inputDir, uuid))
+          val sleep = Random.nextInt(100)
+          Thread.sleep(sleep)
+        }
+
+        logError("== DONE WRITING ==")
+        var done = false
+        while (!done) {
+          try {
+            stream.processAllAvailable()
+            done = true
+          } catch {
+            case NonFatal(_) =>
+          }
+        }
+
+        continue = false
+        stream.stop()
+      }
+    }
+    writer.start()
+
+    val input = sqlContext.read.format("text").stream(inputDir)
+    def startStream(): ContinuousQuery = input
+      .repartition(5)
+      .as[String]
+      .mapPartitions { iter =>
+        val rand = Random.nextInt(100)
+        if (rand < 5) { sys.error("failure") }
+        iter.map(_.toLong)
+      }
+      .write
+      .format("parquet")
+      .option("checkpointLocation", checkpoint)
+      .startStream(outputDir)
+
+    var failures = 0
+    val streamThread = new Thread("stream runner") {
+      override def run(): Unit = {
+        while (continue) {
+          if (failures % 10 == 0) { logError(s"Query restart #$failures") }
+          stream = startStream()
+
+          try {
+            stream.awaitTermination()
+          } catch {
+            case ce: ContinuousQueryException =>
+              failures += 1
+          }
+        }
+      }
+    }
+    streamThread.start()
+
+    streamThread.join()
+
+    logError(s"Stream restarted $failures times.")
+    assert(sqlContext.read.parquet(outputDir).distinct().count() == numRecords)
+  }
+}
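A slightly stricter closing assertion is possible if the stress test should also catch duplicated output rather than only missing records; this hypothetical variant relies on the sink's exactly-once claim holding for this deterministic pipeline:

    val out = sqlContext.read.parquet(outputDir)
    assert(out.distinct().count() == numRecords)   // nothing was lost across restarts
    assert(out.count() == numRecords)              // and nothing was committed twice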