Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[SPARK-17374][SQL] Better error messages when parsing JSON using DataFrameReader #14929

Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@ import org.apache.spark.internal.Logging
import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.sql.catalyst.expressions._
import org.apache.spark.sql.catalyst.util._
import org.apache.spark.sql.execution.datasources.ParseModes.{DROP_MALFORMED_MODE, PERMISSIVE_MODE}
import org.apache.spark.sql.execution.datasources.json.JacksonUtils.nextUntil
import org.apache.spark.sql.types._
import org.apache.spark.unsafe.types.UTF8String
Expand All @@ -52,6 +53,11 @@ class JacksonParser(
private val factory = new JsonFactory()
options.setJacksonOptions(factory)

private val emptyRow: Seq[InternalRow] = Seq(new GenericInternalRow(schema.length))

@transient
private[this] var isWarningPrintedForMalformedRecord: Boolean = false

/**
* This function deals with the cases where parsing fails. It is called
* when exceptions are caught during conversion. It also handles the `mode` option.
Expand All @@ -62,8 +68,39 @@ class JacksonParser(
throw new RuntimeException(s"Malformed line in FAILFAST mode: $record")
}
if (options.dropMalformed) {
logWarning(s"Dropping malformed line: $record")
if (!isWarningPrintedForMalformedRecord) {
logWarning(
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Only print the warning the first time a malformed record is encountered.

s"""Found at least one malformed records (sample: $record). The JSON reader will drop
|all malformed records in current $DROP_MALFORMED_MODE parser mode. To find out which
|corrupted records have been dropped, please switch the parser mode to $PERMISSIVE_MODE
|mode and use the default inferred schema.
|
|Code example to print all malformed records (scala):
|===================================================
|// The corrupted record exists in column ${columnNameOfCorruptRecord}
|val parsedJson = spark.read.json("/path/to/json/file/test.json")
|
""".stripMargin)
isWarningPrintedForMalformedRecord = true
}
Nil
} else if (schema.getFieldIndex(columnNameOfCorruptRecord).isEmpty) {
if (!isWarningPrintedForMalformedRecord) {
logWarning(
s"""Found at least one malformed records (sample: $record). The JSON reader will replace
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Should we make a common method to produce this error message?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It is different, although similar.

|all malformed records with placeholder null in current $PERMISSIVE_MODE parser mode.
|To find out which corrupted records have been replaced with null, please use the
|default inferred schema instead of providing a custom schema.
|
|Code example to print all malformed records (scala):
|===================================================
|// The corrupted record exists in column ${columnNameOfCorruptRecord}.
|val parsedJson = spark.read.json("/path/to/json/file/test.json")
|
""".stripMargin)
isWarningPrintedForMalformedRecord = true
}
emptyRow
} else {
val row = new GenericMutableRow(schema.length)
for (corruptIndex <- schema.getFieldIndex(columnNameOfCorruptRecord)) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1081,7 +1081,34 @@ class JsonSuite extends QueryTest with SharedSQLContext with TestJsonData {
assert(jsonDFTwo.schema === schemaTwo)
}

test("Corrupt records: PERMISSIVE mode") {
test("Corrupt records: PERMISSIVE mode, without designated column for malformed records") {
withTempView("jsonTable") {
val schema = StructType(
StructField("a", StringType, true) ::
StructField("b", StringType, true) ::
StructField("c", StringType, true) :: Nil)

val jsonDF = spark.read.schema(schema).json(corruptRecords)
jsonDF.createOrReplaceTempView("jsonTable")
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why create this temp view? We can call `checkAnswer` on the DataFrame directly:

checkAnswer(jsonDF.select($"a", $"b", $"c"), Seq(Row...))


checkAnswer(
sql(
"""
|SELECT a, b, c
|FROM jsonTable
""".stripMargin),
Seq(
// Corrupted records are replaced with null
Row(null, null, null),
Row(null, null, null),
Row(null, null, null),
Row("str_a_4", "str_b_4", "str_c_4"),
Row(null, null, null))
)
}
}

test("Corrupt records: PERMISSIVE mode, with designated column for malformed records") {
// Test if we can query corrupt records.
withSQLConf(SQLConf.COLUMN_NAME_OF_CORRUPT_RECORD.key -> "_unparsed") {
withTempView("jsonTable") {
Expand Down