Skip to content

Commit

Permalink
Migrate CSV and XML datasources
Browse files (browse the repository at this point in the history)
  • Loading branch information
MaxGekk committed Feb 10, 2024
1 parent: e187e3b; commit: 7ceeacf
Show file tree
Hide file tree
Showing 8 changed files with 68 additions and 17 deletions.
35 changes: 35 additions & 0 deletions common/utils/src/main/resources/error/error-classes.json
Original file line number Diff line number Diff line change
Expand Up @@ -7685,6 +7685,41 @@
"cannot generate code for unsupported type: <dataType>"
]
},
"_LEGACY_ERROR_TEMP_3234" : {
"message" : [
"Unsupported input type <other>"
]
},
"_LEGACY_ERROR_TEMP_3235" : {
"message" : [
"The numbers of zipped arrays and field names should be the same"
]
},
"_LEGACY_ERROR_TEMP_3236" : {
"message" : [
"Unsupported special character for delimiter: <str>"
]
},
"_LEGACY_ERROR_TEMP_3237" : {
"message" : [
"Delimiter cannot be more than one character: <str>"
]
},
"_LEGACY_ERROR_TEMP_3238" : {
"message" : [
"Failed to convert value <v> (class of <class>) in type <dt> to XML."
]
},
"_LEGACY_ERROR_TEMP_3239" : {
"message" : [
"Failed to parse data with unexpected event <e>"
]
},
"_LEGACY_ERROR_TEMP_3240" : {
"message" : [
"Failed to parse a value for data type <dt> with event <e>"
]
},
"_LEGACY_ERROR_USER_RAISED_EXCEPTION" : {
"message" : [
"<errorMessage>"
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,8 @@ package org.apache.spark.sql.catalyst.csv

import org.apache.commons.lang3.StringUtils

import org.apache.spark.SparkIllegalArgumentException

object CSVExprUtils {
/**
* Filter ignorable rows for CSV iterator (lines empty and starting with `comment`).
Expand Down Expand Up @@ -81,9 +83,11 @@ object CSVExprUtils {
case Seq('\\', '\\') => '\\'
case _ if str == "\u0000" => '\u0000'
case Seq('\\', _) =>
throw new IllegalArgumentException(s"Unsupported special character for delimiter: $str")
throw new SparkIllegalArgumentException(
errorClass = "_LEGACY_ERROR_TEMP_3236", messageParameters = Map("str" -> str))
case _ =>
throw new IllegalArgumentException(s"Delimiter cannot be more than one character: $str")
throw new SparkIllegalArgumentException(
errorClass = "_LEGACY_ERROR_TEMP_3237", messageParameters = Map("str" -> str))
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@ import java.util.Comparator
import scala.collection.mutable
import scala.reflect.ClassTag

import org.apache.spark.{QueryContext, SparkException}
import org.apache.spark.{QueryContext, SparkException, SparkIllegalArgumentException}
import org.apache.spark.SparkException.internalError
import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.sql.catalyst.analysis.{TypeCheckResult, TypeCoercion, UnresolvedAttribute, UnresolvedSeed}
Expand Down Expand Up @@ -302,8 +302,7 @@ case class ArraysZip(children: Seq[Expression], names: Seq[Expression])
}

if (children.size != names.size) {
throw new IllegalArgumentException(
"The numbers of zipped arrays and field names should be the same")
throw new SparkIllegalArgumentException("_LEGACY_ERROR_TEMP_3235")
}

final override val nodePatterns: Seq[TreePattern] = Seq(ARRAYS_ZIP)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@ import java.io.CharArrayWriter

import com.univocity.parsers.csv.CsvParser

import org.apache.spark.SparkException
import org.apache.spark.{SparkException, SparkIllegalArgumentException}
import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.sql.catalyst.analysis.TypeCheckResult
import org.apache.spark.sql.catalyst.analysis.TypeCheckResult.DataTypeMismatch
Expand Down Expand Up @@ -266,8 +266,9 @@ case class StructsToCsv(
@transient
lazy val inputSchema: StructType = child.dataType match {
case st: StructType => st
case other =>
throw new IllegalArgumentException(s"Unsupported input type ${other.catalogString}")
case other => throw new SparkIllegalArgumentException(
errorClass = "_LEGACY_ERROR_TEMP_3234",
messageParameters = Map("other" -> other.catalogString))
}

@transient
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,10 +18,10 @@ package org.apache.spark.sql.catalyst.expressions

import java.io.CharArrayWriter

import org.apache.spark.SparkIllegalArgumentException
import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.sql.catalyst.analysis.TypeCheckResult
import org.apache.spark.sql.catalyst.analysis.TypeCheckResult.DataTypeMismatch
import org.apache.spark.sql.catalyst.expressions.{ExpectsInputTypes, Expression, ExpressionDescription, ExprUtils, NullIntolerant, TimeZoneAwareExpression, UnaryExpression}
import org.apache.spark.sql.catalyst.expressions.codegen.CodegenFallback
import org.apache.spark.sql.catalyst.util.{ArrayData, FailFastMode, FailureSafeParser, GenericArrayData, PermissiveMode}
import org.apache.spark.sql.catalyst.xml.{StaxXmlGenerator, StaxXmlParser, ValidatorUtil, XmlInferSchema, XmlOptions}
Expand Down Expand Up @@ -283,8 +283,9 @@ case class StructsToXml(
@transient
lazy val inputSchema: StructType = child.dataType match {
case st: StructType => st
case other =>
throw new IllegalArgumentException(s"Unsupported input type ${other.catalogString}")
case other => throw new SparkIllegalArgumentException(
errorClass = "_LEGACY_ERROR_TEMP_3234",
messageParameters = Map("other" -> other.catalogString))
}

@transient
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ import scala.collection.Map
import com.sun.xml.txw2.output.IndentingXMLStreamWriter
import org.apache.hadoop.shaded.com.ctc.wstx.api.WstxOutputProperties

import org.apache.spark.SparkIllegalArgumentException
import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.sql.catalyst.util.{ArrayData, DateFormatter, DateTimeUtils, MapData, TimestampFormatter}
import org.apache.spark.sql.catalyst.util.LegacyDateFormats.FAST_DATE_FORMAT
Expand Down Expand Up @@ -219,8 +220,12 @@ class StaxXmlGenerator(
}

case (_, _) =>
throw new IllegalArgumentException(
s"Failed to convert value $v (class of ${v.getClass}) in type $dt to XML.")
throw new SparkIllegalArgumentException(
errorClass = "_LEGACY_ERROR_TEMP_3238",
messageParameters = scala.collection.immutable.Map(
"v" -> v.toString,
"class" -> v.getClass.toString,
"dt" -> dt.toString))
}

def writeMapData(mapType: MapType, map: MapData): Unit = {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,7 @@ import scala.xml.SAXException

import org.apache.commons.lang3.exception.ExceptionUtils

import org.apache.spark.SparkUpgradeException
import org.apache.spark.{SparkIllegalArgumentException, SparkUpgradeException}
import org.apache.spark.internal.Logging
import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.sql.catalyst.expressions.ExprUtils
Expand Down Expand Up @@ -245,8 +245,11 @@ class StaxXmlParser(
StaxXmlParserUtils.skipNextEndElement(parser, startElementName, options)
value
case (e: XMLEvent, dt: DataType) =>
throw new IllegalArgumentException(
s"Failed to parse a value for data type $dt with event ${e.toString}")
throw new SparkIllegalArgumentException(
errorClass = "_LEGACY_ERROR_TEMP_3240",
messageParameters = Map(
"dt" -> dt.toString,
"e" -> e.toString))
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@ import scala.util.control.Exception._
import scala.util.control.NonFatal
import scala.xml.SAXException

import org.apache.spark.SparkIllegalArgumentException
import org.apache.spark.internal.Logging
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.catalyst.analysis.TypeCoercion
Expand Down Expand Up @@ -209,7 +210,9 @@ class XmlInferSchema(options: XmlOptions, caseSensitive: Boolean)
case _ => structType
}
case e: XMLEvent =>
throw new IllegalArgumentException(s"Failed to parse data with unexpected event $e")
throw new SparkIllegalArgumentException(
errorClass = "_LEGACY_ERROR_TEMP_3239",
messageParameters = Map("e" -> e.toString))
}
}

Expand Down

0 comments on commit 7ceeacf

Please sign in to comment.