From 0445be572afc0b1b8b04024f15ac84b7d2254570 Mon Sep 17 00:00:00 2001 From: Cheng Lian Date: Fri, 15 May 2015 16:20:49 +0800 Subject: [PATCH] [SPARK-7591] [SQL] Partitioning support API tweaks Please see [SPARK-7591] [1] for the details. /cc rxin marmbrus yhuai [1]: https://issues.apache.org/jira/browse/SPARK-7591 Author: Cheng Lian Closes #6150 from liancheng/spark-7591 and squashes the following commits: af422e7 [Cheng Lian] Addresses @rxin's comments 37d1738 [Cheng Lian] Fixes HadoopFsRelation partition columns initialization 2fc680a [Cheng Lian] Fixes Scala style issue 189ad23 [Cheng Lian] Removes HadoopFsRelation constructor arguments 522c24e [Cheng Lian] Adds OutputWriterFactory 047d40d [Cheng Lian] Renames FSBased* to HadoopFs*, also renamed FSBasedParquetRelation back to ParquetRelation2 --- .../org/apache/spark/sql/SQLContext.scala | 14 +- ...{fsBasedParquet.scala => newParquet.scala} | 71 ++++----- .../sql/sources/DataSourceStrategy.scala | 10 +- .../spark/sql/sources/PartitioningUtils.scala | 4 + .../apache/spark/sql/sources/commands.scala | 23 ++- .../org/apache/spark/sql/sources/ddl.scala | 8 +- .../apache/spark/sql/sources/interfaces.scala | 140 +++++++++--------- .../org/apache/spark/sql/sources/rules.scala | 2 +- .../sql/parquet/ParquetFilterSuite.scala | 2 +- .../sql/parquet/ParquetSchemaSuite.scala | 12 +- .../spark/sql/hive/HiveMetastoreCatalog.scala | 12 +- .../spark/sql/hive/execution/commands.scala | 2 +- .../sql/hive/MetastoreDataSourcesSuite.scala | 6 +- .../sql/hive/execution/SQLQuerySuite.scala | 8 +- .../apache/spark/sql/hive/parquetSuites.scala | 20 +-- .../sql/sources/SimpleTextRelation.scala | 47 +++--- ...tes.scala => hadoopFsRelationSuites.scala} | 8 +- 17 files changed, 195 insertions(+), 194 deletions(-) rename sql/core/src/main/scala/org/apache/spark/sql/parquet/{fsBasedParquet.scala => newParquet.scala} (92%) rename sql/hive/src/test/scala/org/apache/spark/sql/sources/{fsBasedRelationSuites.scala => hadoopFsRelationSuites.scala} (98%) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/SQLContext.scala b/sql/core/src/main/scala/org/apache/spark/sql/SQLContext.scala index b33a700208014..9fb355eb81939 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/SQLContext.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/SQLContext.scala @@ -44,7 +44,7 @@ import org.apache.spark.sql.catalyst.ParserDialect import org.apache.spark.sql.execution.{Filter, _} import org.apache.spark.sql.jdbc.{JDBCPartition, JDBCPartitioningInfo, JDBCRelation} import org.apache.spark.sql.json._ -import org.apache.spark.sql.parquet.FSBasedParquetRelation +import org.apache.spark.sql.parquet.ParquetRelation2 import org.apache.spark.sql.sources._ import org.apache.spark.sql.types._ import org.apache.spark.util.Utils @@ -610,7 +610,7 @@ class SQLContext(@transient val sparkContext: SparkContext) } else if (conf.parquetUseDataSourceApi) { val globbedPaths = paths.map(new Path(_)).flatMap(SparkHadoopUtil.get.globPath).toArray baseRelationToDataFrame( - new FSBasedParquetRelation( + new ParquetRelation2( globbedPaths.map(_.toString), None, None, Map.empty[String, String])(this)) } else { DataFrame(this, parquet.ParquetRelation( @@ -989,7 +989,7 @@ class SQLContext(@transient val sparkContext: SparkContext) def jdbc(url: String, table: String): DataFrame = { jdbc(url, table, JDBCRelation.columnPartition(null), new Properties()) } - + /** * :: Experimental :: * Construct a [[DataFrame]] representing the database table accessible via JDBC URL @@ -1002,7 +1002,7 @@ class SQLContext(@transient val sparkContext: SparkContext) def jdbc(url: String, table: String, properties: Properties): DataFrame = { jdbc(url, table, JDBCRelation.columnPartition(null), properties) } - + /** * :: Experimental :: * Construct a [[DataFrame]] representing the database table accessible via JDBC URL @@ -1020,7 +1020,7 @@ class SQLContext(@transient val sparkContext: SparkContext) @Experimental def jdbc( url: String, - table: String, + table: String, columnName: String, lowerBound: Long, upperBound: Long, @@ -1056,7 +1056,7 @@ class SQLContext(@transient val sparkContext: SparkContext) val parts = JDBCRelation.columnPartition(partitioning) jdbc(url, table, parts, properties) } - + /** * :: Experimental :: * Construct a [[DataFrame]] representing the database table accessible via JDBC URL @@ -1093,7 +1093,7 @@ class SQLContext(@transient val sparkContext: SparkContext) } jdbc(url, table, parts, properties) } - + private def jdbc( url: String, table: String, diff --git a/sql/core/src/main/scala/org/apache/spark/sql/parquet/fsBasedParquet.scala b/sql/core/src/main/scala/org/apache/spark/sql/parquet/newParquet.scala similarity index 92% rename from sql/core/src/main/scala/org/apache/spark/sql/parquet/fsBasedParquet.scala rename to sql/core/src/main/scala/org/apache/spark/sql/parquet/newParquet.scala index c83a9c35dbddf..946062f6ea64e 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/parquet/fsBasedParquet.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/parquet/newParquet.scala @@ -41,27 +41,23 @@ import org.apache.spark.sql.types.{DataType, StructType} import org.apache.spark.sql.{Row, SQLConf, SQLContext} import org.apache.spark.{Logging, Partition => SparkPartition, SparkException} -private[sql] class DefaultSource extends FSBasedRelationProvider { +private[sql] class DefaultSource extends HadoopFsRelationProvider { override def createRelation( sqlContext: SQLContext, paths: Array[String], schema: Option[StructType], partitionColumns: Option[StructType], - parameters: Map[String, String]): FSBasedRelation = { + parameters: Map[String, String]): HadoopFsRelation = { val partitionSpec = partitionColumns.map(PartitionSpec(_, Seq.empty)) - new FSBasedParquetRelation(paths, schema, partitionSpec, parameters)(sqlContext) + new ParquetRelation2(paths, schema, partitionSpec, parameters)(sqlContext) } } // NOTE: This class is instantiated and used on executor side only, no need to be serializable. -private[sql] class ParquetOutputWriter extends OutputWriter { - private var recordWriter: RecordWriter[Void, Row] = _ - private var taskAttemptContext: TaskAttemptContext = _ - - override def init( - path: String, - dataSchema: StructType, - context: TaskAttemptContext): Unit = { +private[sql] class ParquetOutputWriter(path: String, context: TaskAttemptContext) + extends OutputWriter { + + private val recordWriter: RecordWriter[Void, Row] = { val conf = context.getConfiguration val outputFormat = { // When appending new Parquet files to an existing Parquet file directory, to avoid @@ -77,7 +73,7 @@ private[sql] class ParquetOutputWriter extends OutputWriter { if (fs.exists(outputPath)) { // Pattern used to match task ID in part file names, e.g.: // - // part-r-00001.gz.part + // part-r-00001.gz.parquet // ^~~~~ val partFilePattern = """part-.-(\d{1,}).*""".r @@ -86,9 +82,8 @@ private[sql] class ParquetOutputWriter extends OutputWriter { case name if name.startsWith("_") => 0 case name if name.startsWith(".") => 0 case name => sys.error( - s"""Trying to write Parquet files to directory $outputPath, - |but found items with illegal name "$name" - """.stripMargin.replace('\n', ' ').trim) + s"Trying to write Parquet files to directory $outputPath, " + + s"but found items with illegal name '$name'.") }.reduceOption(_ max _).getOrElse(0) } else { 0 @@ -111,37 +106,39 @@ private[sql] class ParquetOutputWriter extends OutputWriter { } } - recordWriter = outputFormat.getRecordWriter(context) - taskAttemptContext = context + outputFormat.getRecordWriter(context) } override def write(row: Row): Unit = recordWriter.write(null, row) - override def close(): Unit = recordWriter.close(taskAttemptContext) + override def close(): Unit = recordWriter.close(context) } -private[sql] class FSBasedParquetRelation( - paths: Array[String], +private[sql] class ParquetRelation2( + override val paths: Array[String], private val maybeDataSchema: Option[StructType], private val maybePartitionSpec: Option[PartitionSpec], parameters: Map[String, String])( val sqlContext: SQLContext) - extends FSBasedRelation(paths, maybePartitionSpec) + extends HadoopFsRelation(maybePartitionSpec) with Logging { // Should we merge schemas from all Parquet part-files? private val shouldMergeSchemas = - parameters.getOrElse(FSBasedParquetRelation.MERGE_SCHEMA, "true").toBoolean + parameters.getOrElse(ParquetRelation2.MERGE_SCHEMA, "true").toBoolean private val maybeMetastoreSchema = parameters - .get(FSBasedParquetRelation.METASTORE_SCHEMA) + .get(ParquetRelation2.METASTORE_SCHEMA) .map(DataType.fromJson(_).asInstanceOf[StructType]) - private val metadataCache = new MetadataCache - metadataCache.refresh() + private lazy val metadataCache: MetadataCache = { + val meta = new MetadataCache + meta.refresh() + meta + } override def equals(other: scala.Any): Boolean = other match { - case that: FSBasedParquetRelation => + case that: ParquetRelation2 => val schemaEquality = if (shouldMergeSchemas) { this.shouldMergeSchemas == that.shouldMergeSchemas } else { @@ -175,8 +172,6 @@ private[sql] class FSBasedParquetRelation( } } - override def outputWriterClass: Class[_ <: OutputWriter] = classOf[ParquetOutputWriter] - override def dataSchema: StructType = metadataCache.dataSchema override private[sql] def refresh(): Unit = { @@ -187,9 +182,12 @@ private[sql] class FSBasedParquetRelation( // Parquet data source always uses Catalyst internal representations. override val needConversion: Boolean = false - override val sizeInBytes = metadataCache.dataStatuses.map(_.getLen).sum + override def sizeInBytes: Long = metadataCache.dataStatuses.map(_.getLen).sum + + override def userDefinedPartitionColumns: Option[StructType] = + maybePartitionSpec.map(_.partitionColumns) - override def prepareForWrite(job: Job): Unit = { + override def prepareJobForWrite(job: Job): OutputWriterFactory = { val conf = ContextUtil.getConfiguration(job) val committerClass = @@ -224,6 +222,13 @@ private[sql] class FSBasedParquetRelation( .getOrElse( sqlContext.conf.parquetCompressionCodec.toUpperCase, CompressionCodecName.UNCOMPRESSED).name()) + + new OutputWriterFactory { + override def newInstance( + path: String, dataSchema: StructType, context: TaskAttemptContext): OutputWriter = { + new ParquetOutputWriter(path, context) + } + } } override def buildScan( @@ -385,7 +390,7 @@ private[sql] class FSBasedParquetRelation( // case insensitivity issue and possible schema mismatch (probably caused by schema // evolution). maybeMetastoreSchema - .map(FSBasedParquetRelation.mergeMetastoreParquetSchema(_, dataSchema0)) + .map(ParquetRelation2.mergeMetastoreParquetSchema(_, dataSchema0)) .getOrElse(dataSchema0) } } @@ -439,12 +444,12 @@ private[sql] class FSBasedParquetRelation( "No schema defined, " + s"and no Parquet data file or summary file found under ${paths.mkString(", ")}.") - FSBasedParquetRelation.readSchema(filesToTouch.map(footers.apply), sqlContext) + ParquetRelation2.readSchema(filesToTouch.map(footers.apply), sqlContext) } } } -private[sql] object FSBasedParquetRelation extends Logging { +private[sql] object ParquetRelation2 extends Logging { // Whether we should merge schemas collected from all Parquet part-files. private[sql] val MERGE_SCHEMA = "mergeSchema" diff --git a/sql/core/src/main/scala/org/apache/spark/sql/sources/DataSourceStrategy.scala b/sql/core/src/main/scala/org/apache/spark/sql/sources/DataSourceStrategy.scala index ee099ab9593c7..e6324b20b3065 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/sources/DataSourceStrategy.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/sources/DataSourceStrategy.scala @@ -59,7 +59,7 @@ private[sql] object DataSourceStrategy extends Strategy with Logging { (a, _) => t.buildScan(a)) :: Nil // Scanning partitioned FSBasedRelation - case PhysicalOperation(projectList, filters, l @ LogicalRelation(t: FSBasedRelation)) + case PhysicalOperation(projectList, filters, l @ LogicalRelation(t: HadoopFsRelation)) if t.partitionSpec.partitionColumns.nonEmpty => val selectedPartitions = prunePartitions(filters, t.partitionSpec).toArray @@ -87,7 +87,7 @@ private[sql] object DataSourceStrategy extends Strategy with Logging { selectedPartitions) :: Nil // Scanning non-partitioned FSBasedRelation - case PhysicalOperation(projectList, filters, l @ LogicalRelation(t: FSBasedRelation)) => + case PhysicalOperation(projectList, filters, l @ LogicalRelation(t: HadoopFsRelation)) => val inputPaths = t.paths.map(new Path(_)).flatMap { path => val fs = path.getFileSystem(t.sqlContext.sparkContext.hadoopConfiguration) val qualifiedPath = path.makeQualified(fs.getUri, fs.getWorkingDirectory) @@ -111,10 +111,10 @@ private[sql] object DataSourceStrategy extends Strategy with Logging { execution.ExecutedCommand(InsertIntoDataSource(l, query, overwrite)) :: Nil case i @ logical.InsertIntoTable( - l @ LogicalRelation(t: FSBasedRelation), part, query, overwrite, false) if part.isEmpty => + l @ LogicalRelation(t: HadoopFsRelation), part, query, overwrite, false) if part.isEmpty => val mode = if (overwrite) SaveMode.Overwrite else SaveMode.Append execution.ExecutedCommand( - InsertIntoFSBasedRelation(t, query, Array.empty[String], mode)) :: Nil + InsertIntoHadoopFsRelation(t, query, Array.empty[String], mode)) :: Nil case _ => Nil } @@ -126,7 +126,7 @@ private[sql] object DataSourceStrategy extends Strategy with Logging { partitionColumns: StructType, partitions: Array[Partition]) = { val output = projections.map(_.toAttribute) - val relation = logicalRelation.relation.asInstanceOf[FSBasedRelation] + val relation = logicalRelation.relation.asInstanceOf[HadoopFsRelation] // Builds RDD[Row]s for each selected partition. val perPartitionRows = partitions.map { case Partition(partitionValues, dir) => diff --git a/sql/core/src/main/scala/org/apache/spark/sql/sources/PartitioningUtils.scala b/sql/core/src/main/scala/org/apache/spark/sql/sources/PartitioningUtils.scala index d30f7f65e21c0..d1f0cdab55f66 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/sources/PartitioningUtils.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/sources/PartitioningUtils.scala @@ -35,6 +35,10 @@ private[sql] case class Partition(values: Row, path: String) private[sql] case class PartitionSpec(partitionColumns: StructType, partitions: Seq[Partition]) private[sql] object PartitioningUtils { + // This duplicates default value of Hive `ConfVars.DEFAULTPARTITIONNAME`, since sql/core doesn't + // depend on Hive. + private[sql] val DEFAULT_PARTITION_NAME = "__HIVE_DEFAULT_PARTITION__" + private[sql] case class PartitionValues(columnNames: Seq[String], literals: Seq[Literal]) { require(columnNames.size == literals.size) } diff --git a/sql/core/src/main/scala/org/apache/spark/sql/sources/commands.scala b/sql/core/src/main/scala/org/apache/spark/sql/sources/commands.scala index 7879328bbaaab..a09bb08de736a 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/sources/commands.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/sources/commands.scala @@ -58,8 +58,8 @@ private[sql] case class InsertIntoDataSource( } } -private[sql] case class InsertIntoFSBasedRelation( - @transient relation: FSBasedRelation, +private[sql] case class InsertIntoHadoopFsRelation( + @transient relation: HadoopFsRelation, @transient query: LogicalPlan, partitionColumns: Array[String], mode: SaveMode) @@ -102,7 +102,7 @@ private[sql] case class InsertIntoFSBasedRelation( insert(new DefaultWriterContainer(relation, job), df) } else { val writerContainer = new DynamicPartitionWriterContainer( - relation, job, partitionColumns, "__HIVE_DEFAULT_PARTITION__") + relation, job, partitionColumns, PartitioningUtils.DEFAULT_PARTITION_NAME) insertWithDynamicPartitions(sqlContext, writerContainer, df, partitionColumns) } } @@ -234,7 +234,7 @@ private[sql] case class InsertIntoFSBasedRelation( } private[sql] abstract class BaseWriterContainer( - @transient val relation: FSBasedRelation, + @transient val relation: HadoopFsRelation, @transient job: Job) extends SparkHadoopMapReduceUtil with Logging @@ -261,7 +261,7 @@ private[sql] abstract class BaseWriterContainer( protected val dataSchema = relation.dataSchema - protected val outputWriterClass: Class[_ <: OutputWriter] = relation.outputWriterClass + protected var outputWriterFactory: OutputWriterFactory = _ private var outputFormatClass: Class[_ <: OutputFormat[_, _]] = _ @@ -269,7 +269,7 @@ private[sql] abstract class BaseWriterContainer( setupIDs(0, 0, 0) setupConf() taskAttemptContext = newTaskAttemptContext(serializableConf.value, taskAttemptId) - relation.prepareForWrite(job) + outputWriterFactory = relation.prepareJobForWrite(job) outputFormatClass = job.getOutputFormatClass outputCommitter = newOutputCommitter(taskAttemptContext) outputCommitter.setupJob(jobContext) @@ -346,16 +346,15 @@ private[sql] abstract class BaseWriterContainer( } private[sql] class DefaultWriterContainer( - @transient relation: FSBasedRelation, + @transient relation: HadoopFsRelation, @transient job: Job) extends BaseWriterContainer(relation, job) { @transient private var writer: OutputWriter = _ override protected def initWriters(): Unit = { - writer = outputWriterClass.newInstance() taskAttemptContext.getConfiguration.set("spark.sql.sources.output.path", outputPath) - writer.init(getWorkPath, dataSchema, taskAttemptContext) + writer = outputWriterFactory.newInstance(getWorkPath, dataSchema, taskAttemptContext) } override def outputWriterForRow(row: Row): OutputWriter = writer @@ -372,7 +371,7 @@ private[sql] class DefaultWriterContainer( } private[sql] class DynamicPartitionWriterContainer( - @transient relation: FSBasedRelation, + @transient relation: HadoopFsRelation, @transient job: Job, partitionColumns: Array[String], defaultPartitionName: String) @@ -398,12 +397,10 @@ private[sql] class DynamicPartitionWriterContainer( outputWriters.getOrElseUpdate(partitionPath, { val path = new Path(getWorkPath, partitionPath) - val writer = outputWriterClass.newInstance() taskAttemptContext.getConfiguration.set( "spark.sql.sources.output.path", new Path(outputPath, partitionPath).toString) - writer.init(path.toString, dataSchema, taskAttemptContext) - writer + outputWriterFactory.newInstance(path.toString, dataSchema, taskAttemptContext) }) } diff --git a/sql/core/src/main/scala/org/apache/spark/sql/sources/ddl.scala b/sql/core/src/main/scala/org/apache/spark/sql/sources/ddl.scala index 595c5eb40e295..37a569db311ea 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/sources/ddl.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/sources/ddl.scala @@ -226,7 +226,7 @@ private[sql] object ResolvedDataSource { case Some(schema: StructType) => clazz.newInstance() match { case dataSource: SchemaRelationProvider => dataSource.createRelation(sqlContext, new CaseInsensitiveMap(options), schema) - case dataSource: FSBasedRelationProvider => + case dataSource: HadoopFsRelationProvider => val maybePartitionsSchema = if (partitionColumns.isEmpty) { None } else { @@ -256,7 +256,7 @@ private[sql] object ResolvedDataSource { case None => clazz.newInstance() match { case dataSource: RelationProvider => dataSource.createRelation(sqlContext, new CaseInsensitiveMap(options)) - case dataSource: FSBasedRelationProvider => + case dataSource: HadoopFsRelationProvider => val caseInsensitiveOptions = new CaseInsensitiveMap(options) val paths = { val patternPath = new Path(caseInsensitiveOptions("path")) @@ -296,7 +296,7 @@ private[sql] object ResolvedDataSource { val relation = clazz.newInstance() match { case dataSource: CreatableRelationProvider => dataSource.createRelation(sqlContext, mode, options, data) - case dataSource: FSBasedRelationProvider => + case dataSource: HadoopFsRelationProvider => // Don't glob path for the write path. The contracts here are: // 1. Only one output path can be specified on the write path; // 2. Output path must be a legal HDFS style file system path; @@ -315,7 +315,7 @@ private[sql] object ResolvedDataSource { Some(partitionColumnsSchema(data.schema, partitionColumns)), caseInsensitiveOptions) sqlContext.executePlan( - InsertIntoFSBasedRelation( + InsertIntoHadoopFsRelation( r, data.logicalPlan, partitionColumns.toArray, diff --git a/sql/core/src/main/scala/org/apache/spark/sql/sources/interfaces.scala b/sql/core/src/main/scala/org/apache/spark/sql/sources/interfaces.scala index 6f315305c11d6..274ab4485217a 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/sources/interfaces.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/sources/interfaces.scala @@ -26,7 +26,7 @@ import org.apache.hadoop.mapreduce.{Job, TaskAttemptContext} import org.apache.spark.annotation.{DeveloperApi, Experimental} import org.apache.spark.deploy.SparkHadoopUtil import org.apache.spark.rdd.RDD -import org.apache.spark.sql.{Row, _} +import org.apache.spark.sql._ import org.apache.spark.sql.catalyst.expressions._ import org.apache.spark.sql.catalyst.expressions.codegen.GenerateMutableProjection import org.apache.spark.sql.types.{StructField, StructType} @@ -94,7 +94,7 @@ trait SchemaRelationProvider { * ::DeveloperApi:: * Implemented by objects that produce relations for a specific kind of data source * with a given schema and partitioned columns. When Spark SQL is given a DDL operation with a - * USING clause specified (to specify the implemented [[FSBasedRelationProvider]]), a user defined + * USING clause specified (to specify the implemented [[HadoopFsRelationProvider]]), a user defined * schema, and an optional list of partition columns, this interface is used to pass in the * parameters specified by a user. * @@ -105,15 +105,15 @@ trait SchemaRelationProvider { * * A new instance of this class with be instantiated each time a DDL call is made. * - * The difference between a [[RelationProvider]] and a [[FSBasedRelationProvider]] is + * The difference between a [[RelationProvider]] and a [[HadoopFsRelationProvider]] is * that users need to provide a schema and a (possibly empty) list of partition columns when * using a SchemaRelationProvider. A relation provider can inherits both [[RelationProvider]], - * and [[FSBasedRelationProvider]] if it can support schema inference, user-specified + * and [[HadoopFsRelationProvider]] if it can support schema inference, user-specified * schemas, and accessing partitioned relations. * * @since 1.4.0 */ -trait FSBasedRelationProvider { +trait HadoopFsRelationProvider { /** * Returns a new base relation with the given parameters, a user defined schema, and a list of * partition columns. Note: the parameters' keywords are case insensitive and this insensitivity @@ -124,7 +124,7 @@ trait FSBasedRelationProvider { paths: Array[String], schema: Option[StructType], partitionColumns: Option[StructType], - parameters: Map[String, String]): FSBasedRelation + parameters: Map[String, String]): HadoopFsRelation } /** @@ -280,33 +280,42 @@ trait CatalystScan { /** * ::Experimental:: - * [[OutputWriter]] is used together with [[FSBasedRelation]] for persisting rows to the - * underlying file system. Subclasses of [[OutputWriter]] must provide a zero-argument constructor. - * An [[OutputWriter]] instance is created and initialized when a new output file is opened on - * executor side. This instance is used to persist rows to this single output file. + * A factory that produces [[OutputWriter]]s. A new [[OutputWriterFactory]] is created on driver + * side for each write job issued when writing to a [[HadoopFsRelation]], and then gets serialized + * to executor side to create actual [[OutputWriter]]s on the fly. * * @since 1.4.0 */ @Experimental -abstract class OutputWriter { +abstract class OutputWriterFactory extends Serializable { /** - * Initializes this [[OutputWriter]] before any rows are persisted. + * When writing to a [[HadoopFsRelation]], this method gets called by each task on executor side + * to instantiate new [[OutputWriter]]s. * * @param path Path of the file to which this [[OutputWriter]] is supposed to write. Note that * this may not point to the final output file. For example, `FileOutputFormat` writes to * temporary directories and then merge written files back to the final destination. In * this case, `path` points to a temporary output file under the temporary directory. * @param dataSchema Schema of the rows to be written. Partition columns are not included in the - * schema if the corresponding relation is partitioned. + * schema if the relation being written is partitioned. * @param context The Hadoop MapReduce task context. * * @since 1.4.0 */ - def init( - path: String, - dataSchema: StructType, - context: TaskAttemptContext): Unit = () + def newInstance(path: String, dataSchema: StructType, context: TaskAttemptContext): OutputWriter +} +/** + * ::Experimental:: + * [[OutputWriter]] is used together with [[HadoopFsRelation]] for persisting rows to the + * underlying file system. Subclasses of [[OutputWriter]] must provide a zero-argument constructor. + * An [[OutputWriter]] instance is created and initialized when a new output file is opened on + * executor side. This instance is used to persist rows to this single output file. + * + * @since 1.4.0 + */ +@Experimental +abstract class OutputWriter { /** * Persists a single row. Invoked on the executor side. When writing to dynamically partitioned * tables, dynamic partition columns are not included in rows to be written. @@ -333,74 +342,71 @@ abstract class OutputWriter { * filter using selected predicates before producing an RDD containing all matching tuples as * [[Row]] objects. In addition, when reading from Hive style partitioned tables stored in file * systems, it's able to discover partitioning information from the paths of input directories, and - * perform partition pruning before start reading the data. Subclasses of [[FSBasedRelation()]] must - * override one of the three `buildScan` methods to implement the read path. + * perform partition pruning before start reading the data. Subclasses of [[HadoopFsRelation()]] + * must override one of the three `buildScan` methods to implement the read path. * * For the write path, it provides the ability to write to both non-partitioned and partitioned * tables. Directory layout of the partitioned tables is compatible with Hive. * * @constructor This constructor is for internal uses only. The [[PartitionSpec]] argument is for * implementing metastore table conversion. - * @param paths Base paths of this relation. For partitioned relations, it should be the root - * directories of all partition directories. - * @param maybePartitionSpec An [[FSBasedRelation]] can be created with an optional + * + * @param maybePartitionSpec An [[HadoopFsRelation]] can be created with an optional * [[PartitionSpec]], so that partition discovery can be skipped. * * @since 1.4.0 */ @Experimental -abstract class FSBasedRelation private[sql]( - val paths: Array[String], - maybePartitionSpec: Option[PartitionSpec]) +abstract class HadoopFsRelation private[sql](maybePartitionSpec: Option[PartitionSpec]) extends BaseRelation { + def this() = this(None) + + private val hadoopConf = new Configuration(sqlContext.sparkContext.hadoopConfiguration) + + private val codegenEnabled = sqlContext.conf.codegenEnabled + + private var _partitionSpec: PartitionSpec = _ + + final private[sql] def partitionSpec: PartitionSpec = { + if (_partitionSpec == null) { + _partitionSpec = maybePartitionSpec + .map(spec => spec.copy(partitionColumns = spec.partitionColumns.asNullable)) + .orElse(userDefinedPartitionColumns.map(PartitionSpec(_, Array.empty[Partition]))) + .getOrElse { + if (sqlContext.conf.partitionDiscoveryEnabled()) { + discoverPartitions() + } else { + PartitionSpec(StructType(Nil), Array.empty[Partition]) + } + } + } + _partitionSpec + } + /** - * Constructs an [[FSBasedRelation]]. - * - * @param paths Base paths of this relation. For partitioned relations, it should be either root - * directories of all partition directories. - * @param partitionColumns Partition columns of this relation. + * Base paths of this relation. For partitioned relations, it should be either root directories + * of all partition directories. * * @since 1.4.0 */ - def this(paths: Array[String], partitionColumns: StructType) = - this(paths, { - if (partitionColumns.isEmpty) None - else Some(PartitionSpec(partitionColumns, Array.empty[Partition])) - }) + def paths: Array[String] /** - * Constructs an [[FSBasedRelation]]. - * - * @param paths Base paths of this relation. For partitioned relations, it should be root - * directories of all partition directories. + * Partition columns. Can be either defined by [[userDefinedPartitionColumns]] or automatically + * discovered. Note that they should always be nullable. * * @since 1.4.0 */ - def this(paths: Array[String]) = this(paths, None) - - private val hadoopConf = new Configuration(sqlContext.sparkContext.hadoopConfiguration) - - private val codegenEnabled = sqlContext.conf.codegenEnabled - - private var _partitionSpec: PartitionSpec = maybePartitionSpec.map { spec => - spec.copy(partitionColumns = spec.partitionColumns.asNullable) - }.getOrElse { - if (sqlContext.conf.partitionDiscoveryEnabled()) { - discoverPartitions() - } else { - PartitionSpec(StructType(Nil), Array.empty[Partition]) - } - } - - private[sql] def partitionSpec: PartitionSpec = _partitionSpec + final def partitionColumns: StructType = + userDefinedPartitionColumns.getOrElse(partitionSpec.partitionColumns) /** - * Partition columns. Note that they are always nullable. + * Optional user defined partition columns. * * @since 1.4.0 */ - def partitionColumns: StructType = partitionSpec.partitionColumns + def userDefinedPartitionColumns: Option[StructType] = None private[sql] def refresh(): Unit = { if (sqlContext.conf.partitionDiscoveryEnabled()) { @@ -419,7 +425,7 @@ abstract class FSBasedRelation private[sql]( }.map(_.getPath) if (leafDirs.nonEmpty) { - PartitioningUtils.parsePartitions(leafDirs, "__HIVE_DEFAULT_PARTITION__") + PartitioningUtils.parsePartitions(leafDirs, PartitioningUtils.DEFAULT_PARTITION_NAME) } else { PartitionSpec(StructType(Array.empty[StructField]), Array.empty[Partition]) } @@ -458,7 +464,7 @@ abstract class FSBasedRelation private[sql]( * @since 1.4.0 */ def buildScan(inputPaths: Array[String]): RDD[Row] = { - throw new RuntimeException( + throw new UnsupportedOperationException( "At least one buildScan() method should be overridden to read the relation.") } @@ -520,8 +526,8 @@ abstract class FSBasedRelation private[sql]( } /** - * Client side preparation for data writing can be put here. For example, user defined output - * committer can be configured here. + * Prepares a write job and returns an [[OutputWriterFactory]]. Client side job preparation can + * be put here. For example, user defined output committer can be configured here. * * Note that the only side effect expected here is mutating `job` via its setters. Especially, * Spark SQL caches [[BaseRelation]] instances for performance, mutating relation internal states @@ -529,13 +535,5 @@ abstract class FSBasedRelation private[sql]( * * @since 1.4.0 */ - def prepareForWrite(job: Job): Unit = () - - /** - * This method is responsible for producing a new [[OutputWriter]] for each newly opened output - * file on the executor side. - * - * @since 1.4.0 - */ - def outputWriterClass: Class[_ <: OutputWriter] + def prepareJobForWrite(job: Job): OutputWriterFactory } diff --git a/sql/core/src/main/scala/org/apache/spark/sql/sources/rules.scala b/sql/core/src/main/scala/org/apache/spark/sql/sources/rules.scala index aad1d248d0a28..1eacdde7413f1 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/sources/rules.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/sources/rules.scala @@ -102,7 +102,7 @@ private[sql] case class PreWriteCheck(catalog: Catalog) extends (LogicalPlan => } case logical.InsertIntoTable(LogicalRelation(_: InsertableRelation), _, _, _, _) => // OK - case logical.InsertIntoTable(LogicalRelation(_: FSBasedRelation), _, _, _, _) => // OK + case logical.InsertIntoTable(LogicalRelation(_: HadoopFsRelation), _, _, _, _) => // OK case logical.InsertIntoTable(l: LogicalRelation, _, _, _, _) => // The relation in l is not an InsertableRelation. failAnalysis(s"$l does not allow insertion.") diff --git a/sql/core/src/test/scala/org/apache/spark/sql/parquet/ParquetFilterSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/parquet/ParquetFilterSuite.scala index 3bbc5b05868af..5ad439584716f 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/parquet/ParquetFilterSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/parquet/ParquetFilterSuite.scala @@ -63,7 +63,7 @@ class ParquetFilterSuiteBase extends QueryTest with ParquetTest { }.flatten.reduceOption(_ && _) val forParquetDataSource = query.queryExecution.optimizedPlan.collect { - case PhysicalOperation(_, filters, LogicalRelation(_: FSBasedParquetRelation)) => filters + case PhysicalOperation(_, filters, LogicalRelation(_: ParquetRelation2)) => filters }.flatten.reduceOption(_ && _) forParquetTableScan.orElse(forParquetDataSource) diff --git a/sql/core/src/test/scala/org/apache/spark/sql/parquet/ParquetSchemaSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/parquet/ParquetSchemaSuite.scala index fc90e3edce7fe..c964b6d984557 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/parquet/ParquetSchemaSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/parquet/ParquetSchemaSuite.scala @@ -204,7 +204,7 @@ class ParquetSchemaSuite extends FunSuite with ParquetTest { StructField("lowerCase", StringType), StructField("UPPERCase", DoubleType, nullable = false)))) { - FSBasedParquetRelation.mergeMetastoreParquetSchema( + ParquetRelation2.mergeMetastoreParquetSchema( StructType(Seq( StructField("lowercase", StringType), StructField("uppercase", DoubleType, nullable = false))), @@ -219,7 +219,7 @@ class ParquetSchemaSuite extends FunSuite with ParquetTest { StructType(Seq( StructField("UPPERCase", DoubleType, nullable = false)))) { - FSBasedParquetRelation.mergeMetastoreParquetSchema( + ParquetRelation2.mergeMetastoreParquetSchema( StructType(Seq( StructField("uppercase", DoubleType, nullable = false))), @@ -230,7 +230,7 @@ class ParquetSchemaSuite extends FunSuite with ParquetTest { // Metastore schema contains additional non-nullable fields. assert(intercept[Throwable] { - FSBasedParquetRelation.mergeMetastoreParquetSchema( + ParquetRelation2.mergeMetastoreParquetSchema( StructType(Seq( StructField("uppercase", DoubleType, nullable = false), StructField("lowerCase", BinaryType, nullable = false))), @@ -241,7 +241,7 @@ class ParquetSchemaSuite extends FunSuite with ParquetTest { // Conflicting non-nullable field names intercept[Throwable] { - FSBasedParquetRelation.mergeMetastoreParquetSchema( + ParquetRelation2.mergeMetastoreParquetSchema( StructType(Seq(StructField("lower", StringType, nullable = false))), StructType(Seq(StructField("lowerCase", BinaryType)))) } @@ -255,7 +255,7 @@ class ParquetSchemaSuite extends FunSuite with ParquetTest { StructField("firstField", StringType, nullable = true), StructField("secondField", StringType, nullable = true), StructField("thirdfield", StringType, nullable = true)))) { - FSBasedParquetRelation.mergeMetastoreParquetSchema( + ParquetRelation2.mergeMetastoreParquetSchema( StructType(Seq( StructField("firstfield", StringType, nullable = true), StructField("secondfield", StringType, nullable = true), @@ -268,7 +268,7 @@ class ParquetSchemaSuite extends FunSuite with ParquetTest { // Merge should fail if the Metastore contains any additional fields that are not // nullable. assert(intercept[Throwable] { - FSBasedParquetRelation.mergeMetastoreParquetSchema( + ParquetRelation2.mergeMetastoreParquetSchema( StructType(Seq( StructField("firstfield", StringType, nullable = true), StructField("secondfield", StringType, nullable = true), diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala index b0e82c8d033b2..2aa80b47a97e2 100644 --- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala +++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala @@ -33,7 +33,7 @@ import org.apache.spark.sql.catalyst.plans.logical import org.apache.spark.sql.catalyst.plans.logical._ import org.apache.spark.sql.catalyst.rules._ import org.apache.spark.sql.hive.client._ -import org.apache.spark.sql.parquet.FSBasedParquetRelation +import org.apache.spark.sql.parquet.ParquetRelation2 import org.apache.spark.sql.sources.{CreateTableUsingAsSelect, LogicalRelation, Partition => ParquetPartition, PartitionSpec, ResolvedDataSource} import org.apache.spark.sql.types._ import org.apache.spark.sql.{AnalysisException, SQLContext, SaveMode, sources} @@ -226,8 +226,8 @@ private[hive] class HiveMetastoreCatalog(val client: ClientInterface, hive: Hive // serialize the Metastore schema to JSON and pass it as a data source option because of the // evil case insensitivity issue, which is reconciled within `ParquetRelation2`. val parquetOptions = Map( - FSBasedParquetRelation.METASTORE_SCHEMA -> metastoreSchema.json, - FSBasedParquetRelation.MERGE_SCHEMA -> mergeSchema.toString) + ParquetRelation2.METASTORE_SCHEMA -> metastoreSchema.json, + ParquetRelation2.MERGE_SCHEMA -> mergeSchema.toString) val tableIdentifier = QualifiedTableName(metastoreRelation.databaseName, metastoreRelation.tableName) @@ -238,7 +238,7 @@ private[hive] class HiveMetastoreCatalog(val client: ClientInterface, hive: Hive partitionSpecInMetastore: Option[PartitionSpec]): Option[LogicalRelation] = { cachedDataSourceTables.getIfPresent(tableIdentifier) match { case null => None // Cache miss - case logical@LogicalRelation(parquetRelation: FSBasedParquetRelation) => + case logical@LogicalRelation(parquetRelation: ParquetRelation2) => // If we have the same paths, same schema, and same partition spec, // we will use the cached Parquet Relation. val useCached = @@ -281,7 +281,7 @@ private[hive] class HiveMetastoreCatalog(val client: ClientInterface, hive: Hive val cached = getCached(tableIdentifier, paths, metastoreSchema, Some(partitionSpec)) val parquetRelation = cached.getOrElse { val created = LogicalRelation( - new FSBasedParquetRelation( + new ParquetRelation2( paths.toArray, None, Some(partitionSpec), parquetOptions)(hive)) cachedDataSourceTables.put(tableIdentifier, created) created @@ -294,7 +294,7 @@ private[hive] class HiveMetastoreCatalog(val client: ClientInterface, hive: Hive val cached = getCached(tableIdentifier, paths, metastoreSchema, None) val parquetRelation = cached.getOrElse { val created = LogicalRelation( - new FSBasedParquetRelation(paths.toArray, None, None, parquetOptions)(hive)) + new ParquetRelation2(paths.toArray, None, None, parquetOptions)(hive)) cachedDataSourceTables.put(tableIdentifier, created) created } diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/commands.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/commands.scala index 8e405e080489f..6609763343752 100644 --- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/commands.scala +++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/commands.scala @@ -194,7 +194,7 @@ case class CreateMetastoreDataSourceAsSelect( sqlContext, Some(query.schema.asNullable), partitionColumns, provider, optionsWithPath) val createdRelation = LogicalRelation(resolved.relation) EliminateSubQueries(sqlContext.table(tableName).logicalPlan) match { - case l @ LogicalRelation(_: InsertableRelation | _: FSBasedRelation) => + case l @ LogicalRelation(_: InsertableRelation | _: HadoopFsRelation) => if (l.relation != createdRelation.relation) { val errorDescription = s"Cannot append to table $tableName because the resolved relation does not " + diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/MetastoreDataSourcesSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/MetastoreDataSourcesSuite.scala index da5d203d9d343..1bf1c1be3e3d3 100644 --- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/MetastoreDataSourcesSuite.scala +++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/MetastoreDataSourcesSuite.scala @@ -29,7 +29,7 @@ import org.apache.spark.sql._ import org.apache.spark.sql.hive.client.{HiveTable, ManagedTable} import org.apache.spark.sql.hive.test.TestHive._ import org.apache.spark.sql.hive.test.TestHive.implicits._ -import org.apache.spark.sql.parquet.FSBasedParquetRelation +import org.apache.spark.sql.parquet.ParquetRelation2 import org.apache.spark.sql.sources.LogicalRelation import org.apache.spark.sql.types._ import org.apache.spark.util.Utils @@ -579,11 +579,11 @@ class MetastoreDataSourcesSuite extends QueryTest with BeforeAndAfterEach { ) table("test_parquet_ctas").queryExecution.optimizedPlan match { - case LogicalRelation(p: FSBasedParquetRelation) => // OK + case LogicalRelation(p: ParquetRelation2) => // OK case _ => fail( "test_parquet_ctas should be converted to " + - s"${classOf[FSBasedParquetRelation].getCanonicalName}") + s"${classOf[ParquetRelation2].getCanonicalName}") } // Clenup and reset confs. diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/SQLQuerySuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/SQLQuerySuite.scala index 5c7152e2140db..dfe73c62c42b9 100644 --- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/SQLQuerySuite.scala +++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/SQLQuerySuite.scala @@ -25,7 +25,7 @@ import org.apache.spark.sql.hive.test.TestHive import org.apache.spark.sql.hive.test.TestHive._ import org.apache.spark.sql.hive.test.TestHive.implicits._ import org.apache.spark.sql.hive.{HiveQLDialect, HiveShim, MetastoreRelation} -import org.apache.spark.sql.parquet.FSBasedParquetRelation +import org.apache.spark.sql.parquet.ParquetRelation2 import org.apache.spark.sql.sources.LogicalRelation import org.apache.spark.sql.types._ @@ -175,17 +175,17 @@ class SQLQuerySuite extends QueryTest { def checkRelation(tableName: String, isDataSourceParquet: Boolean): Unit = { val relation = EliminateSubQueries(catalog.lookupRelation(Seq(tableName))) relation match { - case LogicalRelation(r: FSBasedParquetRelation) => + case LogicalRelation(r: ParquetRelation2) => if (!isDataSourceParquet) { fail( s"${classOf[MetastoreRelation].getCanonicalName} is expected, but found " + - s"${FSBasedParquetRelation.getClass.getCanonicalName}.") + s"${ParquetRelation2.getClass.getCanonicalName}.") } case r: MetastoreRelation => if (isDataSourceParquet) { fail( - s"${FSBasedParquetRelation.getClass.getCanonicalName} is expected, but found " + + s"${ParquetRelation2.getClass.getCanonicalName} is expected, but found " + s"${classOf[MetastoreRelation].getCanonicalName}.") } } diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/parquetSuites.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/parquetSuites.scala index 41bcbe84b0ef2..b6be09e2f8837 100644 --- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/parquetSuites.scala +++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/parquetSuites.scala @@ -26,8 +26,8 @@ import org.apache.spark.sql.execution.{ExecutedCommand, PhysicalRDD} import org.apache.spark.sql.hive.execution.HiveTableScan import org.apache.spark.sql.hive.test.TestHive._ import org.apache.spark.sql.hive.test.TestHive.implicits._ -import org.apache.spark.sql.parquet.{FSBasedParquetRelation, ParquetTableScan} -import org.apache.spark.sql.sources.{InsertIntoDataSource, InsertIntoFSBasedRelation, LogicalRelation} +import org.apache.spark.sql.parquet.{ParquetRelation2, ParquetTableScan} +import org.apache.spark.sql.sources.{InsertIntoDataSource, InsertIntoHadoopFsRelation, LogicalRelation} import org.apache.spark.sql.types._ import org.apache.spark.sql.{QueryTest, SQLConf, SaveMode} import org.apache.spark.util.Utils @@ -291,10 +291,10 @@ class ParquetDataSourceOnMetastoreSuite extends ParquetMetastoreSuiteBase { ) table("test_parquet_ctas").queryExecution.optimizedPlan match { - case LogicalRelation(_: FSBasedParquetRelation) => // OK + case LogicalRelation(_: ParquetRelation2) => // OK case _ => fail( "test_parquet_ctas should be converted to " + - s"${classOf[FSBasedParquetRelation].getCanonicalName}") + s"${classOf[ParquetRelation2].getCanonicalName}") } sql("DROP TABLE IF EXISTS test_parquet_ctas") @@ -315,9 +315,9 @@ class ParquetDataSourceOnMetastoreSuite extends ParquetMetastoreSuiteBase { val df = sql("INSERT INTO TABLE test_insert_parquet SELECT a FROM jt") df.queryExecution.executedPlan match { - case ExecutedCommand(InsertIntoFSBasedRelation(_: FSBasedParquetRelation, _, _, _)) => // OK + case ExecutedCommand(InsertIntoHadoopFsRelation(_: ParquetRelation2, _, _, _)) => // OK case o => fail("test_insert_parquet should be converted to a " + - s"${classOf[FSBasedParquetRelation].getCanonicalName} and " + + s"${classOf[ParquetRelation2].getCanonicalName} and " + s"${classOf[InsertIntoDataSource].getCanonicalName} is expcted as the SparkPlan. " + s"However, found a ${o.toString} ") } @@ -345,9 +345,9 @@ class ParquetDataSourceOnMetastoreSuite extends ParquetMetastoreSuiteBase { val df = sql("INSERT INTO TABLE test_insert_parquet SELECT a FROM jt_array") df.queryExecution.executedPlan match { - case ExecutedCommand(InsertIntoFSBasedRelation(r: FSBasedParquetRelation, _, _, _)) => // OK + case ExecutedCommand(InsertIntoHadoopFsRelation(r: ParquetRelation2, _, _, _)) => // OK case o => fail("test_insert_parquet should be converted to a " + - s"${classOf[FSBasedParquetRelation].getCanonicalName} and " + + s"${classOf[ParquetRelation2].getCanonicalName} and " + s"${classOf[InsertIntoDataSource].getCanonicalName} is expcted as the SparkPlan." + s"However, found a ${o.toString} ") } @@ -378,7 +378,7 @@ class ParquetDataSourceOnMetastoreSuite extends ParquetMetastoreSuiteBase { assertResult(2) { analyzed.collect { - case r @ LogicalRelation(_: FSBasedParquetRelation) => r + case r @ LogicalRelation(_: ParquetRelation2) => r }.size } @@ -390,7 +390,7 @@ class ParquetDataSourceOnMetastoreSuite extends ParquetMetastoreSuiteBase { // Converted test_parquet should be cached. catalog.cachedDataSourceTables.getIfPresent(tableIdentifer) match { case null => fail("Converted test_parquet should be cached in the cache.") - case logical @ LogicalRelation(parquetRelation: FSBasedParquetRelation) => // OK + case logical @ LogicalRelation(parquetRelation: ParquetRelation2) => // OK case other => fail( "The cached test_parquet should be a Parquet Relation. " + diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/sources/SimpleTextRelation.scala b/sql/hive/src/test/scala/org/apache/spark/sql/sources/SimpleTextRelation.scala index 8801aba2f64c3..29b21586f9c2a 100644 --- a/sql/hive/src/test/scala/org/apache/spark/sql/sources/SimpleTextRelation.scala +++ b/sql/hive/src/test/scala/org/apache/spark/sql/sources/SimpleTextRelation.scala @@ -24,7 +24,7 @@ import com.google.common.base.Objects import org.apache.hadoop.fs.Path import org.apache.hadoop.io.{NullWritable, Text} import org.apache.hadoop.mapreduce.lib.output.{FileOutputFormat, TextOutputFormat} -import org.apache.hadoop.mapreduce.{RecordWriter, TaskAttemptContext} +import org.apache.hadoop.mapreduce.{Job, RecordWriter, TaskAttemptContext} import org.apache.spark.rdd.RDD import org.apache.spark.sql.catalyst.expressions.{Cast, Literal} @@ -32,17 +32,16 @@ import org.apache.spark.sql.types.{DataType, StructField, StructType} import org.apache.spark.sql.{Row, SQLContext} /** - * A simple example [[FSBasedRelationProvider]]. + * A simple example [[HadoopFsRelationProvider]]. */ -class SimpleTextSource extends FSBasedRelationProvider { +class SimpleTextSource extends HadoopFsRelationProvider { override def createRelation( sqlContext: SQLContext, paths: Array[String], schema: Option[StructType], partitionColumns: Option[StructType], - parameters: Map[String, String]): FSBasedRelation = { - val partitionsSchema = partitionColumns.getOrElse(StructType(Array.empty[StructField])) - new SimpleTextRelation(paths, schema, partitionsSchema, parameters)(sqlContext) + parameters: Map[String, String]): HadoopFsRelation = { + new SimpleTextRelation(paths, schema, partitionColumns, parameters)(sqlContext) } } @@ -59,38 +58,30 @@ class AppendingTextOutputFormat(outputFile: Path) extends TextOutputFormat[NullW } } -class SimpleTextOutputWriter extends OutputWriter { - private var recordWriter: RecordWriter[NullWritable, Text] = _ - private var taskAttemptContext: TaskAttemptContext = _ - - override def init( - path: String, - dataSchema: StructType, - context: TaskAttemptContext): Unit = { - recordWriter = new AppendingTextOutputFormat(new Path(path)).getRecordWriter(context) - taskAttemptContext = context - } +class SimpleTextOutputWriter(path: String, context: TaskAttemptContext) extends OutputWriter { + private val recordWriter: RecordWriter[NullWritable, Text] = + new AppendingTextOutputFormat(new Path(path)).getRecordWriter(context) override def write(row: Row): Unit = { val serialized = row.toSeq.map(_.toString).mkString(",") recordWriter.write(null, new Text(serialized)) } - override def close(): Unit = recordWriter.close(taskAttemptContext) + override def close(): Unit = recordWriter.close(context) } /** - * A simple example [[FSBasedRelation]], used for testing purposes. Data are stored as comma + * A simple example [[HadoopFsRelation]], used for testing purposes. Data are stored as comma * separated string lines. When scanning data, schema must be explicitly provided via data source * option `"dataSchema"`. */ class SimpleTextRelation( - paths: Array[String], + override val paths: Array[String], val maybeDataSchema: Option[StructType], - partitionsSchema: StructType, + override val userDefinedPartitionColumns: Option[StructType], parameters: Map[String, String])( @transient val sqlContext: SQLContext) - extends FSBasedRelation(paths, partitionsSchema) { + extends HadoopFsRelation { import sqlContext.sparkContext @@ -110,9 +101,6 @@ class SimpleTextRelation( override def hashCode(): Int = Objects.hashCode(paths, maybeDataSchema, dataSchema) - override def outputWriterClass: Class[_ <: OutputWriter] = - classOf[SimpleTextOutputWriter] - override def buildScan(inputPaths: Array[String]): RDD[Row] = { val fields = dataSchema.map(_.dataType) @@ -122,4 +110,13 @@ class SimpleTextRelation( }: _*) } } + + override def prepareJobForWrite(job: Job): OutputWriterFactory = new OutputWriterFactory { + override def newInstance( + path: String, + dataSchema: StructType, + context: TaskAttemptContext): OutputWriter = { + new SimpleTextOutputWriter(path, context) + } + } } diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/sources/fsBasedRelationSuites.scala b/sql/hive/src/test/scala/org/apache/spark/sql/sources/hadoopFsRelationSuites.scala similarity index 98% rename from sql/hive/src/test/scala/org/apache/spark/sql/sources/fsBasedRelationSuites.scala rename to sql/hive/src/test/scala/org/apache/spark/sql/sources/hadoopFsRelationSuites.scala index 394833f22907d..cf6afd25ae5a0 100644 --- a/sql/hive/src/test/scala/org/apache/spark/sql/sources/fsBasedRelationSuites.scala +++ b/sql/hive/src/test/scala/org/apache/spark/sql/sources/hadoopFsRelationSuites.scala @@ -28,7 +28,7 @@ import org.apache.spark.sql.types._ // TODO Don't extend ParquetTest // This test suite extends ParquetTest for some convenient utility methods. These methods should be // moved to some more general places, maybe QueryTest. -class FSBasedRelationTest extends QueryTest with ParquetTest { +class HadoopFsRelationTest extends QueryTest with ParquetTest { override val sqlContext: SQLContext = TestHive import sqlContext._ @@ -487,7 +487,7 @@ class FSBasedRelationTest extends QueryTest with ParquetTest { } val actualPaths = df.queryExecution.analyzed.collectFirst { - case LogicalRelation(relation: FSBasedRelation) => + case LogicalRelation(relation: HadoopFsRelation) => relation.paths.toSet }.getOrElse { fail("Expect an FSBasedRelation, but none could be found") @@ -499,7 +499,7 @@ class FSBasedRelationTest extends QueryTest with ParquetTest { } } -class SimpleTextRelationSuite extends FSBasedRelationTest { +class SimpleTextHadoopFsRelationSuite extends HadoopFsRelationTest { override val dataSourceName: String = classOf[SimpleTextSource].getCanonicalName import sqlContext._ @@ -530,7 +530,7 @@ class SimpleTextRelationSuite extends FSBasedRelationTest { } } -class FSBasedParquetRelationSuite extends FSBasedRelationTest { +class ParquetHadoopFsRelationSuite extends HadoopFsRelationTest { override val dataSourceName: String = classOf[parquet.DefaultSource].getCanonicalName import sqlContext._