diff --git a/.github/workflows/test.yaml b/.github/workflows/test.yaml
index 067c6376b4c..ab1fd3917e1 100644
--- a/.github/workflows/test.yaml
+++ b/.github/workflows/test.yaml
@@ -3,6 +3,11 @@ on: [push, pull_request]
 jobs:
   test:
     runs-on: ubuntu-20.04
+    strategy:
+      matrix:
+        scala: [2.12.14, 2.13.5]
+    env:
+      SCALA_VERSION: ${{ matrix.scala }}
     steps:
       - uses: actions/checkout@v2
       - name: install java
@@ -21,7 +26,7 @@ jobs:
           # the above directories when we use the key for the first time. After that, each run will
           # just use the cache. The cache is immutable so we need to use a new key when trying to
           # cache new stuff.
-          key: delta-sbt-cache-spark3.2
+          key: delta-sbt-cache-spark3.2-scala${{ matrix.scala }}
       - name: Install Job dependencies
         shell: bash -l {0}
         run: |
diff --git a/build.sbt b/build.sbt
index e30ac1bc5d5..cb1b95cbbfc 100644
--- a/build.sbt
+++ b/build.sbt
@@ -17,11 +17,18 @@ import java.nio.file.Files
 
 val sparkVersion = "3.2.0"
 
-scalaVersion := "2.12.14"
+val scala212 = "2.12.14"
+val scala213 = "2.13.5"
+
+scalaVersion := scala212
+
+// crossScalaVersions must be set to Nil on the aggregating project
+crossScalaVersions := Nil
 
 lazy val commonSettings = Seq(
   organization := "io.delta",
   scalaVersion := "2.12.14",
+  crossScalaVersions := Seq(scala212, scala213),
   fork := true
 )
 
@@ -42,7 +49,7 @@ lazy val core = (project in file("core"))
       "org.apache.spark" %% "spark-catalyst" % sparkVersion % "provided",
 
       // Test deps
-      "org.scalatest" %% "scalatest" % "3.1.0" % "test",
+      "org.scalatest" %% "scalatest" % "3.2.9" % "test",
       "junit" % "junit" % "4.12" % "test",
       "com.novocode" % "junit-interface" % "0.11" % "test",
       "org.apache.spark" %% "spark-catalyst" % sparkVersion % "test" classifier "tests",
@@ -212,7 +219,15 @@ def getPrevVersion(currentVersion: String): String = {
 
 lazy val mimaSettings = Seq(
   Test / test := ((Test / test) dependsOn mimaReportBinaryIssues).value,
-  mimaPreviousArtifacts := Set("io.delta" %% "delta-core" % getPrevVersion(version.value)),
+  mimaPreviousArtifacts := {
+    if (CrossVersion.partialVersion(scalaVersion.value) == Some((2, 13))) {
+      // Skip mima check since we don't have a Scala 2.13 release yet.
+      // TODO Update this after releasing 1.1.0.
+      Set.empty
+    } else {
+      Set("io.delta" %% "delta-core" % getPrevVersion(version.value))
+    }
+  },
   mimaBinaryIssueFilters ++= MimaExcludes.ignoredABIProblems
 )
 
diff --git a/core/src/main/scala/io/delta/sql/parser/DeltaSqlParser.scala b/core/src/main/scala/io/delta/sql/parser/DeltaSqlParser.scala
index 4ad752da9fa..3b8677ead65 100644
--- a/core/src/main/scala/io/delta/sql/parser/DeltaSqlParser.scala
+++ b/core/src/main/scala/io/delta/sql/parser/DeltaSqlParser.scala
@@ -189,7 +189,7 @@ class DeltaSqlAstBuilder extends DeltaSqlBaseBaseVisitor[AnyRef] {
   }
 
   protected def visitTableIdentifier(ctx: QualifiedNameContext): TableIdentifier = withOrigin(ctx) {
-    ctx.identifier.asScala match {
+    ctx.identifier.asScala.toSeq match {
       case Seq(tbl) => TableIdentifier(tbl.getText)
       case Seq(db, tbl) => TableIdentifier(tbl.getText, Some(db.getText))
       case _ => throw new ParseException(s"Illegal table name ${ctx.getText}", ctx)
@@ -199,7 +199,7 @@ class DeltaSqlAstBuilder extends DeltaSqlBaseBaseVisitor[AnyRef] {
   override def visitPassThrough(ctx: PassThroughContext): LogicalPlan = null
 
   override def visitColTypeList(ctx: ColTypeListContext): Seq[StructField] = withOrigin(ctx) {
-    ctx.colType().asScala.map(visitColType)
+    ctx.colType().asScala.map(visitColType).toSeq
   }
 
   override def visitColType(ctx: ColTypeContext): StructField = withOrigin(ctx) {
@@ -234,16 +234,16 @@ class DeltaSqlAstBuilder extends DeltaSqlBaseBaseVisitor[AnyRef] {
     val checkConstraint = ctx.constraint().asInstanceOf[CheckConstraintContext]
 
     AlterTableAddConstraint(
-      createUnresolvedTable(ctx.table.identifier.asScala.map(_.getText),
+      createUnresolvedTable(ctx.table.identifier.asScala.map(_.getText).toSeq,
         "ALTER TABLE ... ADD CONSTRAINT"),
       ctx.name.getText,
-      buildCheckConstraintText(checkConstraint.checkExprToken().asScala))
+      buildCheckConstraintText(checkConstraint.checkExprToken().asScala.toSeq))
   }
 
   override def visitDropTableConstraint(
       ctx: DropTableConstraintContext): LogicalPlan = withOrigin(ctx) {
     AlterTableDropConstraint(
-      createUnresolvedTable(ctx.table.identifier.asScala.map(_.getText),
+      createUnresolvedTable(ctx.table.identifier.asScala.map(_.getText).toSeq,
         "ALTER TABLE ... DROP CONSTRAINT"),
       ctx.name.getText)
   }
diff --git a/core/src/main/scala/io/delta/tables/DeltaTableBuilder.scala b/core/src/main/scala/io/delta/tables/DeltaTableBuilder.scala
index 8397d76fddd..fca4653aeea 100644
--- a/core/src/main/scala/io/delta/tables/DeltaTableBuilder.scala
+++ b/core/src/main/scala/io/delta/tables/DeltaTableBuilder.scala
@@ -324,7 +324,7 @@ class DeltaTableBuilder private[tables](
       case CreateTableOptions(ifNotExists) =>
         CreateTableStatement(
           table,
-          StructType(columns),
+          StructType(columns.toSeq),
           partitioning,
           None,
           this.properties,
@@ -339,7 +339,7 @@ class DeltaTableBuilder private[tables](
       case ReplaceTableOptions(orCreate) =>
         ReplaceTableStatement(
           table,
-          StructType(columns),
+          StructType(columns.toSeq),
           partitioning,
           None,
           this.properties,
diff --git a/core/src/main/scala/org/apache/spark/sql/delta/ColumnWithDefaultExprUtils.scala b/core/src/main/scala/org/apache/spark/sql/delta/ColumnWithDefaultExprUtils.scala
index 5f805518ad6..ecbdee430b8 100644
--- a/core/src/main/scala/org/apache/spark/sql/delta/ColumnWithDefaultExprUtils.scala
+++ b/core/src/main/scala/org/apache/spark/sql/delta/ColumnWithDefaultExprUtils.scala
@@ -85,7 +85,7 @@ object ColumnWithDefaultExprUtils extends DeltaLogging {
       case _ => data.select(selectExprs: _*)
     }
     recordDeltaEvent(deltaLog, "delta.generatedColumns.write")
-    (newData, constraints)
+    (newData, constraints.toSeq)
   }
 
   /**
diff --git a/core/src/main/scala/org/apache/spark/sql/delta/DeltaHistoryManager.scala b/core/src/main/scala/org/apache/spark/sql/delta/DeltaHistoryManager.scala
index 2fb748a2b0e..9ffd6d823dd 100644
--- a/core/src/main/scala/org/apache/spark/sql/delta/DeltaHistoryManager.scala
+++ b/core/src/main/scala/org/apache/spark/sql/delta/DeltaHistoryManager.scala
@@ -501,7 +501,7 @@ object DeltaHistoryManager extends DeltaLogging {
      */
     private def flushBuffer(): Unit = {
       if (maybeDeleteFiles.lastOption.exists(shouldDeleteFile)) {
-        filesToDelete.enqueue(maybeDeleteFiles: _*)
+        filesToDelete ++= maybeDeleteFiles
       }
       maybeDeleteFiles.clear()
     }
diff --git a/core/src/main/scala/org/apache/spark/sql/delta/DeltaLog.scala b/core/src/main/scala/org/apache/spark/sql/delta/DeltaLog.scala
index 8aba45654e3..e9ca0d51c46 100644
--- a/core/src/main/scala/org/apache/spark/sql/delta/DeltaLog.scala
+++ b/core/src/main/scala/org/apache/spark/sql/delta/DeltaLog.scala
@@ -528,7 +528,7 @@ object DeltaLog extends DeltaLogging {
         DeltaSQLConf.LOAD_FILE_SYSTEM_CONFIGS_FROM_DATAFRAME_OPTIONS)) {
       // We pick up only file system options so that we don't pass any parquet or json options to
       // the code that reads Delta transaction logs.
-      options.filterKeys(_.startsWith("fs."))
+      options.filterKeys(_.startsWith("fs.")).toMap
     } else {
       Map.empty
     }
diff --git a/core/src/main/scala/org/apache/spark/sql/delta/DeltaOperations.scala b/core/src/main/scala/org/apache/spark/sql/delta/DeltaOperations.scala
index 5998c648d31..312282a3d90 100644
--- a/core/src/main/scala/org/apache/spark/sql/delta/DeltaOperations.scala
+++ b/core/src/main/scala/org/apache/spark/sql/delta/DeltaOperations.scala
@@ -41,14 +41,15 @@ object DeltaOperations {
   sealed abstract class Operation(val name: String) {
     val parameters: Map[String, Any]
 
-    lazy val jsonEncodedValues: Map[String, String] = parameters.mapValues(JsonUtils.toJson(_))
+    lazy val jsonEncodedValues: Map[String, String] =
+      parameters.mapValues(JsonUtils.toJson(_)).toMap
 
     val operationMetrics: Set[String] = Set()
 
     def transformMetrics(metrics: Map[String, SQLMetric]): Map[String, String] = {
       metrics.filterKeys( s =>
         operationMetrics.contains(s)
-      ).transform((_, v) => v.value.toString)
+      ).mapValues(_.value.toString).toMap
     }
 
     val userMetadata: Option[String] = None
diff --git a/core/src/main/scala/org/apache/spark/sql/delta/OptimisticTransaction.scala b/core/src/main/scala/org/apache/spark/sql/delta/OptimisticTransaction.scala
index ca8c4dd4925..5246c61356b 100644
--- a/core/src/main/scala/org/apache/spark/sql/delta/OptimisticTransaction.scala
+++ b/core/src/main/scala/org/apache/spark/sql/delta/OptimisticTransaction.scala
@@ -489,7 +489,7 @@ trait OptimisticTransactionImpl extends TransactionalWrite with SQLMetricsReport
     }
 
     val currentTransactionInfo = CurrentTransactionInfo(
-      readPredicates = readPredicates,
+      readPredicates = readPredicates.toSeq,
       readFiles = readFiles.toSet,
       readWholeTable = readTheWholeTable,
       readAppIds = readTxn.toSet,
diff --git a/core/src/main/scala/org/apache/spark/sql/delta/actions/actions.scala b/core/src/main/scala/org/apache/spark/sql/delta/actions/actions.scala
index 10d3c49df7f..1343df17e44 100644
--- a/core/src/main/scala/org/apache/spark/sql/delta/actions/actions.scala
+++ b/core/src/main/scala/org/apache/spark/sql/delta/actions/actions.scala
@@ -160,7 +160,7 @@ object Protocol {
       minimumRequired = DeltaColumnMapping.MIN_PROTOCOL_VERSION
     }
 
-    minimumRequired -> featuresUsed
+    minimumRequired -> featuresUsed.toSeq
   }
 
   /** Cast the table property for the protocol version to an integer. */
diff --git a/core/src/main/scala/org/apache/spark/sql/delta/catalog/DeltaCatalog.scala b/core/src/main/scala/org/apache/spark/sql/delta/catalog/DeltaCatalog.scala
index ec5ea150479..8a6bde8bda2 100644
--- a/core/src/main/scala/org/apache/spark/sql/delta/catalog/DeltaCatalog.scala
+++ b/core/src/main/scala/org/apache/spark/sql/delta/catalog/DeltaCatalog.scala
@@ -280,7 +280,7 @@ class DeltaCatalog extends DelegatingCatalogExtension
       throw DeltaErrors.operationNotSupportedException(s"Partitioning by expressions")
     }
 
-    (identityCols, bucketSpec)
+    (identityCols.toSeq, bucketSpec)
   }
 
   /** Performs checks on the parameters provided for table creation for a Delta table. */
diff --git a/core/src/main/scala/org/apache/spark/sql/delta/commands/ConvertToDeltaCommand.scala b/core/src/main/scala/org/apache/spark/sql/delta/commands/ConvertToDeltaCommand.scala
index 6d364f202b8..75c4752a265 100644
--- a/core/src/main/scala/org/apache/spark/sql/delta/commands/ConvertToDeltaCommand.scala
+++ b/core/src/main/scala/org/apache/spark/sql/delta/commands/ConvertToDeltaCommand.scala
@@ -118,7 +118,7 @@ abstract class ConvertToDeltaCommandBase(
       case v1: V1Table =>
         val table = v1.catalogTable
         // Hive adds some transient table properties which should be ignored
-        val props = table.properties.filterKeys(_ != "transient_lastDdlTime")
+        val props = table.properties.filterKeys(_ != "transient_lastDdlTime").toMap
         Some(ConvertTarget(Some(table), table.provider, new Path(table.location).toString, props))
       case _: DeltaTableV2 =>
         // Already a Delta table
diff --git a/core/src/main/scala/org/apache/spark/sql/delta/constraints/CheckDeltaInvariant.scala b/core/src/main/scala/org/apache/spark/sql/delta/constraints/CheckDeltaInvariant.scala
index 649c155029e..d38877a4b40 100644
--- a/core/src/main/scala/org/apache/spark/sql/delta/constraints/CheckDeltaInvariant.scala
+++ b/core/src/main/scala/org/apache/spark/sql/delta/constraints/CheckDeltaInvariant.scala
@@ -59,7 +59,7 @@ case class CheckDeltaInvariant(
     case c: Check =>
       val result = child.eval(input)
       if (result == null || result == false) {
-        throw InvariantViolationException(c, columnExtractors.mapValues(_.eval(input)))
+        throw InvariantViolationException(c, columnExtractors.mapValues(_.eval(input)).toMap)
       }
   }
 
diff --git a/core/src/main/scala/org/apache/spark/sql/delta/files/DelayedCommitProtocol.scala b/core/src/main/scala/org/apache/spark/sql/delta/files/DelayedCommitProtocol.scala
index a7d7c361dc0..4a426305ee8 100644
--- a/core/src/main/scala/org/apache/spark/sql/delta/files/DelayedCommitProtocol.scala
+++ b/core/src/main/scala/org/apache/spark/sql/delta/files/DelayedCommitProtocol.scala
@@ -150,7 +150,7 @@ class DelayedCommitProtocol(
         val stat = fs.getFileStatus(filePath)
 
         buildActionFromAddedFile(f, stat, taskContext)
-      }
+      }.toSeq
 
       new TaskCommitMessage(statuses)
     } else {
diff --git a/core/src/main/scala/org/apache/spark/sql/delta/files/TransactionalWrite.scala b/core/src/main/scala/org/apache/spark/sql/delta/files/TransactionalWrite.scala
index a0107d7ca5c..68fd8eef26e 100644
--- a/core/src/main/scala/org/apache/spark/sql/delta/files/TransactionalWrite.scala
+++ b/core/src/main/scala/org/apache/spark/sql/delta/files/TransactionalWrite.scala
@@ -229,7 +229,7 @@ trait TransactionalWrite extends DeltaLogging { self: OptimisticTransactionImpl
         // scalastyle:on deltahadoopconfiguration
         partitionColumns = partitioningColumns,
         bucketSpec = None,
-        statsTrackers = statsTrackers,
+        statsTrackers = statsTrackers.toSeq,
         options = Map.empty)
     } catch {
       case s: SparkException =>
@@ -243,6 +243,6 @@ trait TransactionalWrite extends DeltaLogging { self: OptimisticTransactionImpl
       }
     }
-    committer.addedStatuses
+    committer.addedStatuses.toSeq
   }
 }
diff --git a/core/src/main/scala/org/apache/spark/sql/delta/metering/DeltaLogging.scala b/core/src/main/scala/org/apache/spark/sql/delta/metering/DeltaLogging.scala
index 3e314215433..5bad807193c 100644
--- a/core/src/main/scala/org/apache/spark/sql/delta/metering/DeltaLogging.scala
+++ b/core/src/main/scala/org/apache/spark/sql/delta/metering/DeltaLogging.scala
@@ -16,6 +16,7 @@
 
 package org.apache.spark.sql.delta.metering
 
+import scala.collection.mutable
 import scala.util.Try
 import scala.util.control.NonFatal
 
@@ -75,10 +76,12 @@ trait DeltaLogging
       } else {
         Map.empty[TagDefinition, String]
       }
-
+      val finalTags = mutable.Map[TagDefinition, String](TAG_OP_TYPE -> opType)
+      finalTags ++= tableTags
+      finalTags ++= tags
       recordProductEvent(
         EVENT_TAHOE,
-        Map(TAG_OP_TYPE -> opType) ++ tableTags ++ tags,
+        finalTags.toMap,
         blob = json)
     } catch {
       case NonFatal(e) =>
@@ -107,9 +110,12 @@ trait DeltaLogging
     } else {
       Map.empty
     }
+    val finalTags = mutable.Map[TagDefinition, String]()
+    finalTags ++= tableTags
+    finalTags ++= tags
     recordOperation(
       new OpType(opType, ""),
-      extraTags = tableTags ++ tags) {
+      extraTags = finalTags.toMap) {
       thunk
     }
   }
diff --git a/core/src/main/scala/org/apache/spark/sql/delta/schema/ImplicitMetadataOperation.scala b/core/src/main/scala/org/apache/spark/sql/delta/schema/ImplicitMetadataOperation.scala
index 74c931dccdf..0f7dc1628c6 100644
--- a/core/src/main/scala/org/apache/spark/sql/delta/schema/ImplicitMetadataOperation.scala
+++ b/core/src/main/scala/org/apache/spark/sql/delta/schema/ImplicitMetadataOperation.scala
@@ -98,7 +98,7 @@ trait ImplicitMetadataOperation extends DeltaLogging {
         throw DeltaErrors.unexpectedDataChangeException("Create a Delta table")
       }
       val description = configuration.get("comment").orNull
-      val cleanedConfs = configuration.filterKeys(_ != "comment")
+      val cleanedConfs = configuration.filterKeys(_ != "comment").toMap
       txn.updateMetadata(
         Metadata(
           description = description,
diff --git a/core/src/main/scala/org/apache/spark/sql/delta/schema/InvariantViolationException.scala b/core/src/main/scala/org/apache/spark/sql/delta/schema/InvariantViolationException.scala
index 986534a8310..1c8ebf6765e 100644
--- a/core/src/main/scala/org/apache/spark/sql/delta/schema/InvariantViolationException.scala
+++ b/core/src/main/scala/org/apache/spark/sql/delta/schema/InvariantViolationException.scala
@@ -45,7 +45,8 @@ object InvariantViolationException {
       return new InvariantViolationException("Exceeds char/varchar type length limitation")
     }
 
-    val valueLines = values.map {
+    // Sort by the column name to generate consistent error messages in Scala 2.12 and 2.13.
+    val valueLines = values.toSeq.sortBy(_._1).map {
       case (column, value) =>
         s" - $column : $value"
     }.mkString("\n")
diff --git a/core/src/main/scala/org/apache/spark/sql/delta/schema/SchemaUtils.scala b/core/src/main/scala/org/apache/spark/sql/delta/schema/SchemaUtils.scala
index 5a8103220c4..06e01f8b9fa 100644
--- a/core/src/main/scala/org/apache/spark/sql/delta/schema/SchemaUtils.scala
+++ b/core/src/main/scala/org/apache/spark/sql/delta/schema/SchemaUtils.scala
@@ -722,7 +722,8 @@ object SchemaUtils {
         check(fromValue, toValue, columnPath :+ "value")
 
       case (StructType(fromFields), StructType(toFields)) =>
-        val remainingFields = fromFields.to[mutable.Set]
+        val remainingFields = mutable.Set[StructField]()
+        remainingFields ++= fromFields
         toFields.foreach { toField =>
           fromFields.find(field => resolver(field.name, toField.name)) match {
             case Some(fromField) =>
diff --git a/core/src/main/scala/org/apache/spark/sql/delta/storage/ClosableIterator.scala b/core/src/main/scala/org/apache/spark/sql/delta/storage/ClosableIterator.scala
index 088339526d0..bfd95a3292c 100644
--- a/core/src/main/scala/org/apache/spark/sql/delta/storage/ClosableIterator.scala
+++ b/core/src/main/scala/org/apache/spark/sql/delta/storage/ClosableIterator.scala
@@ -25,19 +25,19 @@ object ClosableIterator {
    * An implicit class for applying a function to a [[ClosableIterator]] and returning the
    * resulting iterator as a [[ClosableIterator]] with the original `close()` method.
    */
-  implicit class IteratorCloseOps[A](val iterator: ClosableIterator[A]) extends AnyVal {
+  implicit class IteratorCloseOps[A](val closableIter: ClosableIterator[A]) extends AnyVal {
     def withClose[B](f: Iterator[A] => Iterator[B]): ClosableIterator[B] = new ClosableIterator[B] {
       private val iter = try {
-        f(iterator)
+        f(closableIter)
       } catch {
         case e: Throwable =>
-          iterator.close()
+          closableIter.close()
           throw e
       }
       override def next(): B = iter.next()
       override def hasNext: Boolean = iter.hasNext
-      override def close(): Unit = iterator.close()
+      override def close(): Unit = closableIter.close()
     }
   }
 
@@ -46,12 +46,12 @@ object ClosableIterator {
    * which (a) closes inner iterators upon reaching their end, and (b) has a `close()` method
    * that closes any opened and unclosed inner iterators.
    */
-  implicit class IteratorFlatMapCloseOp[A](val iterator: Iterator[A]) extends AnyVal {
+  implicit class IteratorFlatMapCloseOp[A](val closableIter: Iterator[A]) extends AnyVal {
     def flatMapWithClose[B](f: A => ClosableIterator[B]): ClosableIterator[B] = new ClosableIterator[B] {
       private var iter_curr =
-        if (iterator.hasNext) {
-          f(iterator.next())
+        if (closableIter.hasNext) {
+          f(closableIter.next())
         } else {
           null
         }
@@ -71,8 +71,8 @@ object ClosableIterator {
         } else {
           iter_curr.close()
 
-          if (iterator.hasNext) {
-            iter_curr = f(iterator.next())
+          if (closableIter.hasNext) {
+            iter_curr = f(closableIter.next())
             hasNext
           } else {
             iter_curr = null
@@ -92,10 +92,10 @@ object ClosableIterator {
   * An implicit class for wrapping an iterator to be a [[ClosableIterator]] with a `close` method
   * that does nothing.
   */
-  implicit class ClosableWrapper[A](val iterator: Iterator[A]) extends AnyVal {
+  implicit class ClosableWrapper[A](val iter: Iterator[A]) extends AnyVal {
     def toClosable: ClosableIterator[A] = new ClosableIterator[A] {
-      override def next(): A = iterator.next()
-      override def hasNext: Boolean = iterator.hasNext
+      override def next(): A = iter.next()
+      override def hasNext: Boolean = iter.hasNext
       override def close(): Unit = ()
     }
   }
 }
diff --git a/core/src/main/scala/org/apache/spark/sql/delta/storage/HadoopFileSystemLogStore.scala b/core/src/main/scala/org/apache/spark/sql/delta/storage/HadoopFileSystemLogStore.scala
index c8fb2149502..837eedfd636 100644
--- a/core/src/main/scala/org/apache/spark/sql/delta/storage/HadoopFileSystemLogStore.scala
+++ b/core/src/main/scala/org/apache/spark/sql/delta/storage/HadoopFileSystemLogStore.scala
@@ -54,7 +54,7 @@ abstract class HadoopFileSystemLogStore(
     val stream = fs.open(path)
     try {
      val reader = new BufferedReader(new InputStreamReader(stream, UTF_8))
-      IOUtils.readLines(reader).asScala.map(_.trim)
+      IOUtils.readLines(reader).asScala.map(_.trim).toSeq
    } finally {
      stream.close()
    }
  }
diff --git a/core/src/main/scala/org/apache/spark/sql/delta/util/PartitionUtils.scala b/core/src/main/scala/org/apache/spark/sql/delta/util/PartitionUtils.scala
index 2ed877e62d3..2db1da91d5f 100644
--- a/core/src/main/scala/org/apache/spark/sql/delta/util/PartitionUtils.scala
+++ b/core/src/main/scala/org/apache/spark/sql/delta/util/PartitionUtils.scala
@@ -324,7 +324,7 @@ private[delta] object PartitionUtils {
       (None, Some(path))
     } else {
       val (columnNames, values) = columns.reverse.unzip
-      (Some(PartitionValues(columnNames, values)), Some(currentPath))
+      (Some(PartitionValues(columnNames.toSeq, values.toSeq)), Some(currentPath))
     }
   }
 
@@ -471,7 +471,7 @@ private[delta] object PartitionUtils {
     val distinctPartColNames = pathWithPartitionValues.map(_._2.columnNames).distinct
 
     def groupByKey[K, V](seq: Seq[(K, V)]): Map[K, Iterable[V]] =
-      seq.groupBy { case (key, _) => key }.mapValues(_.map { case (_, value) => value })
+      seq.groupBy { case (key, _) => key }.mapValues(_.map { case (_, value) => value }).toMap
 
     val partColNamesToPaths = groupByKey(pathWithPartitionValues.map {
       case (path, partValues) => partValues.columnNames -> path
diff --git a/core/src/test/scala/org/apache/spark/sql/delta/DeltaColumnMappingTestUtils.scala b/core/src/test/scala/org/apache/spark/sql/delta/DeltaColumnMappingTestUtils.scala
index c9c008998b8..cd05e00d361 100644
--- a/core/src/test/scala/org/apache/spark/sql/delta/DeltaColumnMappingTestUtils.scala
+++ b/core/src/test/scala/org/apache/spark/sql/delta/DeltaColumnMappingTestUtils.scala
@@ -183,12 +183,12 @@ trait DeltaColumnMappingTestUtilsBase extends SharedSparkSession {
       val colName = partCol.phy(deltaLog)
       deltaLog.update().allFiles.collect()
         .groupBy(_.partitionValues(colName))
-        .mapValues(_.map(deltaLog.dataPath.toUri.getPath + "/" + _.path))
+        .mapValues(_.map(deltaLog.dataPath.toUri.getPath + "/" + _.path)).toMap
     } else {
       val partColEscaped = s"${ExternalCatalogUtils.escapePathName(partCol)}"
       val dataPath = new File(deltaLog.dataPath.toUri.getPath)
       dataPath.listFiles().filter(_.getName.startsWith(s"$partColEscaped="))
-        .groupBy(_.getName.split("=").last).mapValues(_.map(_.getPath))
+        .groupBy(_.getName.split("=").last).mapValues(_.map(_.getPath)).toMap
     }
   }
 
@@ -206,7 +206,7 @@ trait DeltaColumnMappingTestUtilsBase extends SharedSparkSession {
       val grouped = inputFiles.flatMap { f =>
         allFiles.find(af => f.contains(af.path)).head.partitionValues.map(entry => (f, entry))
       }.groupBy(_._2)
-      grouped.mapValues(_.map(_._1))
+      grouped.mapValues(_.map(_._1)).toMap
     } else {
       inputFiles.groupBy(p => {
         val nameParts = new Path(p).getParent.getName.split("=")
diff --git a/core/src/test/scala/org/apache/spark/sql/delta/DeltaDataFrameWriterV2Suite.scala b/core/src/test/scala/org/apache/spark/sql/delta/DeltaDataFrameWriterV2Suite.scala
index e4a0a917fac..173fe540dd9 100644
--- a/core/src/test/scala/org/apache/spark/sql/delta/DeltaDataFrameWriterV2Suite.scala
+++ b/core/src/test/scala/org/apache/spark/sql/delta/DeltaDataFrameWriterV2Suite.scala
@@ -66,6 +66,7 @@ trait OpenSourceDataFrameWriterV2Tests
       .filterKeys(!reservedProp.contains(_))
       .filterKeys(k => k != Protocol.MIN_READER_VERSION_PROP &&
         k != Protocol.MIN_WRITER_VERSION_PROP)
+      .toMap
   }
 
   test("Append: basic append") {
diff --git a/core/src/test/scala/org/apache/spark/sql/delta/DeltaErrorsSuite.scala b/core/src/test/scala/org/apache/spark/sql/delta/DeltaErrorsSuite.scala
index 393ea3a6f58..abdc71f1171 100644
--- a/core/src/test/scala/org/apache/spark/sql/delta/DeltaErrorsSuite.scala
+++ b/core/src/test/scala/org/apache/spark/sql/delta/DeltaErrorsSuite.scala
@@ -18,6 +18,7 @@ package org.apache.spark.sql.delta
 
 import java.io.{PrintWriter, StringWriter}
 
+import scala.collection.mutable
 import scala.sys.process.Process
 
 import org.apache.hadoop.fs.Path
@@ -60,8 +61,12 @@ trait DeltaErrorsSuiteBase
       DeltaErrors.faqRelativePath,
       skipValidation = true), path))
 
-  def errorMessagesToTest: Map[String, String] =
-    errorsToTest.mapValues(_.getMessage) ++ otherMessagesToTest
+  def errorMessagesToTest: Map[String, String] = {
+    val map = mutable.Map[String, String]()
+    map ++= errorsToTest.mapValues(_.getMessage)
+    map ++= otherMessagesToTest
+    map.toMap
+  }
 
   def checkIfValidResponse(url: String, response: String): Boolean = {
     response.contains("HTTP/1.1 200 OK") || response.contains("HTTP/2 200")
diff --git a/core/src/test/scala/org/apache/spark/sql/delta/DeltaTableCreationTests.scala b/core/src/test/scala/org/apache/spark/sql/delta/DeltaTableCreationTests.scala
index 2029146ea3e..7ac4bd5242b 100644
--- a/core/src/test/scala/org/apache/spark/sql/delta/DeltaTableCreationTests.scala
+++ b/core/src/test/scala/org/apache/spark/sql/delta/DeltaTableCreationTests.scala
@@ -1641,6 +1641,7 @@ class DeltaTableCreationSuite
       .filterKeys(!CatalogV2Util.TABLE_RESERVED_PROPERTIES.contains(_))
       .filterKeys(k => k != Protocol.MIN_READER_VERSION_PROP &&
         k != Protocol.MIN_WRITER_VERSION_PROP)
+      .toMap
   }
 
   testQuietly("REPLACE TABLE") {
diff --git a/core/src/test/scala/org/apache/spark/sql/delta/schema/SchemaUtilsSuite.scala b/core/src/test/scala/org/apache/spark/sql/delta/schema/SchemaUtilsSuite.scala
index 32281277a5c..60a7fb9df1d 100644
--- a/core/src/test/scala/org/apache/spark/sql/delta/schema/SchemaUtilsSuite.scala
+++ b/core/src/test/scala/org/apache/spark/sql/delta/schema/SchemaUtilsSuite.scala
@@ -18,6 +18,7 @@ package org.apache.spark.sql.delta.schema
 
 // scalastyle:off import.ordering.noEmptyLine
 import java.util.Locale
+import java.util.regex.Pattern
 
 import org.apache.spark.sql.delta.schema.SchemaMergingUtils._
 import org.apache.spark.sql.delta.sources.DeltaSourceUtils.GENERATION_EXPRESSION_METADATA_KEY
@@ -45,6 +46,16 @@ class SchemaUtilsSuite extends QueryTest
       s"Error message '$msg' didn't contain: $shouldContain")
   }
 
+  private def expectFailurePattern(shouldContainPatterns: String*)(f: => Unit): Unit = {
+    val e = intercept[AnalysisException] {
+      f
+    }
+    val patterns =
+      shouldContainPatterns.map(regex => Pattern.compile(regex, Pattern.CASE_INSENSITIVE))
+    assert(patterns.forall(_.matcher(e.getMessage).find()),
+      s"Error message '${e.getMessage}' didn't contain the patterns: $shouldContainPatterns")
+  }
+
   /////////////////////////////
   // Duplicate Column Checks
   /////////////////////////////
@@ -1046,7 +1057,12 @@ class SchemaUtilsSuite extends QueryTest
       mergeSchemas(base, new StructType()
         .add("struct", new StructType().add("a", DateType)))
     }
-    expectFailure("'struct'", "structType", "MapType") {
+    // StructType's toString is different between Scala 2.12 and 2.13.
+    // - In Scala 2.12, it extends `scala.collection.Seq` which returns
+    //   `StructType(StructField(a,IntegerType,true))`.
+    // - In Scala 2.13, it extends `scala.collection.immutable.Seq` which returns
+    //   `Seq(StructField(a,IntegerType,true))`.
+    expectFailurePattern("'struct'", "StructType|Seq\\(", "MapType") {
       mergeSchemas(base, new StructType()
         .add("struct", MapType(StringType, IntegerType)))
     }
@@ -1062,7 +1078,8 @@ class SchemaUtilsSuite extends QueryTest
       mergeSchemas(base, new StructType()
         .add("array", ArrayType(new StructType().add("b", DecimalType(16, 10)))))
     }
-    expectFailure("'map'", "MapType", "StructType") {
+    // See the above comment about `StructType`
+    expectFailurePattern("'map'", "MapType", "StructType|Seq\\(") {
      mergeSchemas(base, new StructType()
        .add("map", new StructType().add("b", StringType)))
     }
diff --git a/run-tests.py b/run-tests.py
index 04ea89d3784..6d5764ded4c 100755
--- a/run-tests.py
+++ b/run-tests.py
@@ -21,10 +21,13 @@
 from os import path
 
 
-def run_sbt_tests(root_dir):
+def run_sbt_tests(root_dir, scala_version=None):
     print("##### Running SBT tests #####")
     sbt_path = path.join(root_dir, path.join("build", "sbt"))
-    run_cmd([sbt_path, "clean", "test"], stream_output=True)
+    if scala_version is None:
+        run_cmd([sbt_path, "clean", "+test"], stream_output=True)
+    else:
+        run_cmd([sbt_path, "clean", "++ %s test" % scala_version], stream_output=True)
 
 
 def run_python_tests(root_dir):
@@ -72,5 +75,8 @@ def run_cmd(cmd, throw_on_error=True, env=None, stream_output=False, **kwargs):
         run_cmd(cmd, stream_output=True)
     else:
         root_dir = os.path.dirname(os.path.dirname(__file__))
-        run_sbt_tests(root_dir)
-        run_python_tests(root_dir)
+        scala_version = os.getenv("SCALA_VERSION")
+        run_sbt_tests(root_dir, scala_version)
+        # Python tests are skipped when using Scala 2.13 as PySpark doesn't support it.
+        if scala_version is None or scala_version.startswith("2.12"):
+            run_python_tests(root_dir)
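
Most of the repeated `.toSeq`/`.toMap` calls in this patch follow from two Scala 2.13 collection changes. The short standalone sketch below is illustrative only and not code from the patch (the object, value names, and sample data are made up); it shows both changes: `mapValues`/`filterKeys` now return lazy views rather than strict maps, and `scala.Seq` is now an alias for `immutable.Seq`, so mutable buffers and converted Java lists need an explicit copy before being used where a `Seq` is expected.

// Illustrative sketch only -- not part of the patch. Compiles on both Scala 2.12 and 2.13
// and shows why the explicit .toMap/.toSeq conversions above are needed.
import scala.collection.mutable.ArrayBuffer

object CrossBuildSketch {
  def main(args: Array[String]): Unit = {
    // Scala 2.12: mapValues returns a strict Map.
    // Scala 2.13: mapValues returns a lazy MapView (and is deprecated), so .toMap is
    // required wherever an actual Map[String, String] must be produced.
    val metrics = Map("numFiles" -> 3, "numOutputRows" -> 42)
    val rendered: Map[String, String] = metrics.mapValues(_.toString).toMap

    // Scala 2.12: scala.Seq is collection.Seq, so an ArrayBuffer already is a Seq.
    // Scala 2.13: scala.Seq is immutable.Seq, so the buffer must be copied with .toSeq.
    val buffer = ArrayBuffer("a.parquet", "b.parquet")
    val frozen: Seq[String] = buffer.toSeq

    println((rendered, frozen))
  }
}

For running the cross-build locally, the updated `run-tests.py` mirrors the CI matrix: with `SCALA_VERSION` unset it invokes `build/sbt clean +test`, which builds and tests every version in `crossScalaVersions`, while setting `SCALA_VERSION` (for example to `2.13.5`) narrows the run to a single version via sbt's `++ 2.13.5 test`.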