Add Scala 2.13 support #823

Closed · wants to merge 5 commits
7 changes: 6 additions & 1 deletion .github/workflows/test.yaml
@@ -3,6 +3,11 @@ on: [push, pull_request]
jobs:
test:
runs-on: ubuntu-20.04
strategy:
matrix:
scala: [2.12.14, 2.13.5]
env:
SCALA_VERSION: ${{ matrix.scala }}
steps:
- uses: actions/checkout@v2
- name: install java
@@ -21,7 +26,7 @@ jobs:
# the above directories when we use the key for the first time. After that, each run will
# just use the cache. The cache is immutable so we need to use a new key when trying to
# cache new stuff.
key: delta-sbt-cache-spark3.2
key: delta-sbt-cache-spark3.2-scala${{ matrix.scala }}
- name: Install Job dependencies
shell: bash -l {0}
run: |
21 changes: 18 additions & 3 deletions build.sbt
@@ -17,11 +17,18 @@
import java.nio.file.Files

val sparkVersion = "3.2.0"
scalaVersion := "2.12.14"
val scala212 = "2.12.14"
val scala213 = "2.13.5"

scalaVersion := scala212

// crossScalaVersions must be set to Nil on the aggregating project
crossScalaVersions := Nil

lazy val commonSettings = Seq(
organization := "io.delta",
scalaVersion := "2.12.14",
crossScalaVersions := Seq(scala212, scala213),
fork := true
)

@@ -42,7 +49,7 @@ lazy val core = (project in file("core"))
"org.apache.spark" %% "spark-catalyst" % sparkVersion % "provided",

// Test deps
"org.scalatest" %% "scalatest" % "3.1.0" % "test",
"org.scalatest" %% "scalatest" % "3.2.9" % "test",
"junit" % "junit" % "4.12" % "test",
"com.novocode" % "junit-interface" % "0.11" % "test",
"org.apache.spark" %% "spark-catalyst" % sparkVersion % "test" classifier "tests",
@@ -212,7 +219,15 @@ def getPrevVersion(currentVersion: String): String = {

lazy val mimaSettings = Seq(
Test / test := ((Test / test) dependsOn mimaReportBinaryIssues).value,
mimaPreviousArtifacts := Set("io.delta" %% "delta-core" % getPrevVersion(version.value)),
mimaPreviousArtifacts := {
if (CrossVersion.partialVersion(scalaVersion.value) == Some((2, 13))) {
// Skip mima check since we don't have a Scala 2.13 release yet.
// TODO Update this after releasing 1.1.0.
Set.empty
} else {
Set("io.delta" %% "delta-core" % getPrevVersion(version.value))
}
},
mimaBinaryIssueFilters ++= MimaExcludes.ignoredABIProblems
)

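A note for reviewers, not part of this diff: with crossScalaVersions declared as above, the whole matrix can be exercised locally with sbt's + prefix (for example sbt +test, or sbt "++2.13.5 test" to pin one version), and other settings can be made version-dependent the same way mimaPreviousArtifacts is. A minimal, hypothetical sketch:

// Hypothetical addition (not in this PR): pick compiler flags per Scala version.
// CrossVersion.partialVersion returns the (major, minor) of scalaVersion.value.
scalacOptions ++= {
  CrossVersion.partialVersion(scalaVersion.value) match {
    case Some((2, 13)) => Seq("-Wunused:imports")      // 2.13-style lint flag
    case _             => Seq("-Ywarn-unused-import")  // 2.12 equivalent
  }
}
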
10 changes: 5 additions & 5 deletions core/src/main/scala/io/delta/sql/parser/DeltaSqlParser.scala
@@ -189,7 +189,7 @@ class DeltaSqlAstBuilder extends DeltaSqlBaseBaseVisitor[AnyRef] {
}

protected def visitTableIdentifier(ctx: QualifiedNameContext): TableIdentifier = withOrigin(ctx) {
ctx.identifier.asScala match {
ctx.identifier.asScala.toSeq match {
case Seq(tbl) => TableIdentifier(tbl.getText)
case Seq(db, tbl) => TableIdentifier(tbl.getText, Some(db.getText))
case _ => throw new ParseException(s"Illegal table name ${ctx.getText}", ctx)
@@ -199,7 +199,7 @@ class DeltaSqlAstBuilder extends DeltaSqlBaseBaseVisitor[AnyRef] {
override def visitPassThrough(ctx: PassThroughContext): LogicalPlan = null

override def visitColTypeList(ctx: ColTypeListContext): Seq[StructField] = withOrigin(ctx) {
ctx.colType().asScala.map(visitColType)
ctx.colType().asScala.map(visitColType).toSeq
}

override def visitColType(ctx: ColTypeContext): StructField = withOrigin(ctx) {
@@ -234,16 +234,16 @@ class DeltaSqlAstBuilder extends DeltaSqlBaseBaseVisitor[AnyRef] {
val checkConstraint = ctx.constraint().asInstanceOf[CheckConstraintContext]

AlterTableAddConstraint(
createUnresolvedTable(ctx.table.identifier.asScala.map(_.getText),
createUnresolvedTable(ctx.table.identifier.asScala.map(_.getText).toSeq,
"ALTER TABLE ... ADD CONSTRAINT"),
ctx.name.getText,
buildCheckConstraintText(checkConstraint.checkExprToken().asScala))
buildCheckConstraintText(checkConstraint.checkExprToken().asScala.toSeq))
}

override def visitDropTableConstraint(
ctx: DropTableConstraintContext): LogicalPlan = withOrigin(ctx) {
AlterTableDropConstraint(
createUnresolvedTable(ctx.table.identifier.asScala.map(_.getText),
createUnresolvedTable(ctx.table.identifier.asScala.map(_.getText).toSeq,
"ALTER TABLE ... DROP CONSTRAINT"),
ctx.name.getText)
}
4 changes: 2 additions & 2 deletions core/src/main/scala/io/delta/tables/DeltaTableBuilder.scala
@@ -324,7 +324,7 @@ class DeltaTableBuilder private[tables](
case CreateTableOptions(ifNotExists) =>
CreateTableStatement(
table,
StructType(columns),
StructType(columns.toSeq),
partitioning,
None,
this.properties,
@@ -339,7 +339,7 @@ class DeltaTableBuilder private[tables](
case ReplaceTableOptions(orCreate) =>
ReplaceTableStatement(
table,
StructType(columns),
StructType(columns.toSeq),
partitioning,
None,
this.properties,

@@ -85,7 +85,7 @@ object ColumnWithDefaultExprUtils extends DeltaLogging {
case _ => data.select(selectExprs: _*)
}
recordDeltaEvent(deltaLog, "delta.generatedColumns.write")
(newData, constraints)
(newData, constraints.toSeq)
}

/**

@@ -501,7 +501,7 @@ object DeltaHistoryManager extends DeltaLogging {
*/
private def flushBuffer(): Unit = {
if (maybeDeleteFiles.lastOption.exists(shouldDeleteFile)) {
filesToDelete.enqueue(maybeDeleteFiles: _*)
filesToDelete ++= maybeDeleteFiles
}
maybeDeleteFiles.clear()
}

@@ -528,7 +528,7 @@ object DeltaLog extends DeltaLogging {
DeltaSQLConf.LOAD_FILE_SYSTEM_CONFIGS_FROM_DATAFRAME_OPTIONS)) {
// We pick up only file system options so that we don't pass any parquet or json options to
// the code that reads Delta transaction logs.
options.filterKeys(_.startsWith("fs."))
options.filterKeys(_.startsWith("fs.")).toMap
} else {
Map.empty
}

@@ -41,14 +41,15 @@ object DeltaOperations {
sealed abstract class Operation(val name: String) {
val parameters: Map[String, Any]

lazy val jsonEncodedValues: Map[String, String] = parameters.mapValues(JsonUtils.toJson(_))
lazy val jsonEncodedValues: Map[String, String] =
parameters.mapValues(JsonUtils.toJson(_)).toMap

val operationMetrics: Set[String] = Set()

def transformMetrics(metrics: Map[String, SQLMetric]): Map[String, String] = {
metrics.filterKeys( s =>
operationMetrics.contains(s)
).transform((_, v) => v.value.toString)
).mapValues(_.value.toString).toMap
}

val userMetadata: Option[String] = None

@@ -489,7 +489,7 @@ trait OptimisticTransactionImpl extends TransactionalWrite with SQLMetricsReport
}

val currentTransactionInfo = CurrentTransactionInfo(
readPredicates = readPredicates,
readPredicates = readPredicates.toSeq,
readFiles = readFiles.toSet,
readWholeTable = readTheWholeTable,
readAppIds = readTxn.toSet,

@@ -160,7 +160,7 @@ object Protocol {
minimumRequired = DeltaColumnMapping.MIN_PROTOCOL_VERSION
}

minimumRequired -> featuresUsed
minimumRequired -> featuresUsed.toSeq
}

/** Cast the table property for the protocol version to an integer. */

@@ -280,7 +280,7 @@ class DeltaCatalog extends DelegatingCatalogExtension
throw DeltaErrors.operationNotSupportedException(s"Partitioning by expressions")
}

(identityCols, bucketSpec)
(identityCols.toSeq, bucketSpec)
}

/** Performs checks on the parameters provided for table creation for a Delta table. */

@@ -118,7 +118,7 @@ abstract class ConvertToDeltaCommandBase(
case v1: V1Table =>
val table = v1.catalogTable
// Hive adds some transient table properties which should be ignored
val props = table.properties.filterKeys(_ != "transient_lastDdlTime")
val props = table.properties.filterKeys(_ != "transient_lastDdlTime").toMap
Some(ConvertTarget(Some(table), table.provider, new Path(table.location).toString, props))
case _: DeltaTableV2 =>
// Already a Delta table

@@ -59,7 +59,7 @@ case class CheckDeltaInvariant(
case c: Check =>
val result = child.eval(input)
if (result == null || result == false) {
throw InvariantViolationException(c, columnExtractors.mapValues(_.eval(input)))
throw InvariantViolationException(c, columnExtractors.mapValues(_.eval(input)).toMap)
}
}


@@ -150,7 +150,7 @@ class DelayedCommitProtocol(
val stat = fs.getFileStatus(filePath)

buildActionFromAddedFile(f, stat, taskContext)
}
}.toSeq

new TaskCommitMessage(statuses)
} else {

@@ -229,7 +229,7 @@ trait TransactionalWrite extends DeltaLogging { self: OptimisticTransactionImpl
// scalastyle:on deltahadoopconfiguration
partitionColumns = partitioningColumns,
bucketSpec = None,
statsTrackers = statsTrackers,
statsTrackers = statsTrackers.toSeq,
options = Map.empty)
} catch {
case s: SparkException =>
@@ -243,6 +243,6 @@ trait TransactionalWrite extends DeltaLogging { self: OptimisticTransactionImpl
}
}

committer.addedStatuses
committer.addedStatuses.toSeq
}
}

@@ -16,6 +16,7 @@

package org.apache.spark.sql.delta.metering

import scala.collection.mutable
import scala.util.Try
import scala.util.control.NonFatal

@@ -75,10 +76,12 @@ trait DeltaLogging
} else {
Map.empty[TagDefinition, String]
}

val finalTags = mutable.Map[TagDefinition, String](TAG_OP_TYPE -> opType)
Member Author: Manually create the map, since the immutable.Map ++ used here was removed in Scala 2.13.
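
For readers following the diff, the same pattern reduced to a self-contained sketch, with plain String keys standing in for TagDefinition (illustrative only, not the PR's exact code; mergeTags is a hypothetical name):

import scala.collection.mutable

// Accumulate tags in a mutable map and take one immutable snapshot at the end,
// instead of the immutable Map ++ expression that no longer works on 2.13
// (per the note above).
def mergeTags(
    opType: String,
    tableTags: Map[String, String],
    tags: Map[String, String]): Map[String, String] = {
  val finalTags = mutable.Map[String, String]("opType" -> opType)
  finalTags ++= tableTags
  finalTags ++= tags
  finalTags.toMap
}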

finalTags ++= tableTags
finalTags ++= tags
recordProductEvent(
EVENT_TAHOE,
Map(TAG_OP_TYPE -> opType) ++ tableTags ++ tags,
finalTags.toMap,
blob = json)
} catch {
case NonFatal(e) =>
@@ -107,9 +110,12 @@ trait DeltaLogging
} else {
Map.empty
}
val finalTags = mutable.Map[TagDefinition, String]()
finalTags ++= tableTags
finalTags ++= tags
recordOperation(
new OpType(opType, ""),
extraTags = tableTags ++ tags) {
extraTags = finalTags.toMap) {
thunk
}
}

@@ -98,7 +98,7 @@ trait ImplicitMetadataOperation extends DeltaLogging {
throw DeltaErrors.unexpectedDataChangeException("Create a Delta table")
}
val description = configuration.get("comment").orNull
val cleanedConfs = configuration.filterKeys(_ != "comment")
val cleanedConfs = configuration.filterKeys(_ != "comment").toMap
txn.updateMetadata(
Metadata(
description = description,

@@ -45,7 +45,8 @@ object InvariantViolationException {
return new InvariantViolationException("Exceeds char/varchar type length limitation")
}

val valueLines = values.map {
// Sort by the column name to generate consistent error messages in Scala 2.12 and 2.13.
val valueLines = values.toSeq.sortBy(_._1).map {
case (column, value) =>
s" - $column : $value"
}.mkString("\n")

@@ -722,7 +722,8 @@ object SchemaUtils {
check(fromValue, toValue, columnPath :+ "value")

case (StructType(fromFields), StructType(toFields)) =>
val remainingFields = fromFields.to[mutable.Set]
val remainingFields = mutable.Set[StructField]()
remainingFields ++= fromFields
toFields.foreach { toField =>
fromFields.find(field => resolver(field.name, toField.name)) match {
case Some(fromField) =>

@@ -25,19 +25,19 @@ object ClosableIterator {
* An implicit class for applying a function to a [[ClosableIterator]] and returning the
* resulting iterator as a [[ClosableIterator]] with the original `close()` method.
*/
implicit class IteratorCloseOps[A](val iterator: ClosableIterator[A]) extends AnyVal {
implicit class IteratorCloseOps[A](val closableIter: ClosableIterator[A]) extends AnyVal {
def withClose[B](f: Iterator[A] => Iterator[B]): ClosableIterator[B] = new ClosableIterator[B] {
private val iter =
try {
f(iterator)
Member Author: This change is needed because scala/scala-collection-compat#7 changed iterator() to iterator, which conflicts with the IteratorCloseOps.iterator field. Renaming the field fixes it for 2.13.
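
Callers are unaffected by the rename; the extension methods resolve exactly as before. A small illustrative usage combining the helpers in this file (assuming the implicit classes are in scope; the wrapping below is hypothetical, not taken from the PR):

// Wrap a plain Iterator, transform it while keeping a close() handle,
// then drain and close it through the wrapper.
val trimmed = Iterator(" a ", " b ").toClosable.withClose(_.map(_.trim))
while (trimmed.hasNext) println(trimmed.next())
trimmed.close()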

f(closableIter)
} catch {
case e: Throwable =>
iterator.close()
closableIter.close()
throw e
}
override def next(): B = iter.next()
override def hasNext: Boolean = iter.hasNext
override def close(): Unit = iterator.close()
override def close(): Unit = closableIter.close()
}
}

@@ -46,12 +46,12 @@ object ClosableIterator {
* which (a) closes inner iterators upon reaching their end, and (b) has a `close()` method
* that closes any opened and unclosed inner iterators.
*/
implicit class IteratorFlatMapCloseOp[A](val iterator: Iterator[A]) extends AnyVal {
implicit class IteratorFlatMapCloseOp[A](val closableIter: Iterator[A]) extends AnyVal {
def flatMapWithClose[B](f: A => ClosableIterator[B]): ClosableIterator[B] =
new ClosableIterator[B] {
private var iter_curr =
if (iterator.hasNext) {
f(iterator.next())
if (closableIter.hasNext) {
f(closableIter.next())
} else {
null
}
@@ -71,8 +71,8 @@
}
else {
iter_curr.close()
if (iterator.hasNext) {
iter_curr = f(iterator.next())
if (closableIter.hasNext) {
iter_curr = f(closableIter.next())
hasNext
} else {
iter_curr = null
@@ -92,10 +92,10 @@
* An implicit class for wrapping an iterator to be a [[ClosableIterator]] with a `close` method
* that does nothing.
*/
implicit class ClosableWrapper[A](val iterator: Iterator[A]) extends AnyVal {
implicit class ClosableWrapper[A](val iter: Iterator[A]) extends AnyVal {
def toClosable: ClosableIterator[A] = new ClosableIterator[A] {
override def next(): A = iterator.next()
override def hasNext: Boolean = iterator.hasNext
override def next(): A = iter.next()
override def hasNext: Boolean = iter.hasNext
override def close(): Unit = ()
}
}

@@ -54,7 +54,7 @@ abstract class HadoopFileSystemLogStore(
val stream = fs.open(path)
try {
val reader = new BufferedReader(new InputStreamReader(stream, UTF_8))
IOUtils.readLines(reader).asScala.map(_.trim)
IOUtils.readLines(reader).asScala.map(_.trim).toSeq
} finally {
stream.close()
}

@@ -324,7 +324,7 @@ private[delta] object PartitionUtils {
(None, Some(path))
} else {
val (columnNames, values) = columns.reverse.unzip
(Some(PartitionValues(columnNames, values)), Some(currentPath))
(Some(PartitionValues(columnNames.toSeq, values.toSeq)), Some(currentPath))
}
}

@@ -471,7 +471,7 @@ private[delta] object PartitionUtils {
val distinctPartColNames = pathWithPartitionValues.map(_._2.columnNames).distinct

def groupByKey[K, V](seq: Seq[(K, V)]): Map[K, Iterable[V]] =
seq.groupBy { case (key, _) => key }.mapValues(_.map { case (_, value) => value })
seq.groupBy { case (key, _) => key }.mapValues(_.map { case (_, value) => value }).toMap

val partColNamesToPaths = groupByKey(pathWithPartitionValues.map {
case (path, partValues) => partValues.columnNames -> path