[SPARK-17374][SQL] Better error messages when parsing JSON using DataFrameReader

## What changes were proposed in this pull request?

This PR adds better error messages for malformed records when reading a JSON file using DataFrameReader.

For example, for the query:
```
import org.apache.spark.sql.types._
val corruptRecords = spark.sparkContext.parallelize("""{"a":{, b:3}""" :: Nil)
val schema = StructType(StructField("a", StringType, true) :: Nil)
val jsonDF = spark.read.schema(schema).json(corruptRecords)
```

**Before change:**
We silently replace the corrupted line with null:
```
scala> jsonDF.show
+----+
|   a|
+----+
|null|
+----+
```

**After change:**
Add an explicit warning message:
```
scala> jsonDF.show
16/09/02 14:43:16 WARN JacksonParser: Found at least one malformed record (sample: {"a":{, b:3}). The JSON reader will replace
all malformed records with placeholder null in current PERMISSIVE parser mode.
To find out which corrupted records have been replaced with null, please use the
default inferred schema instead of providing a custom schema.

Code example to print all malformed records (scala):
===================================================
// The corrupted record exists in column _corrupt_record.
val parsedJson = spark.read.json("/path/to/json/file/test.json")

+----+
|   a|
+----+
|null|
+----+
```
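
The code example embedded in the warning stops after reading the file; here is a minimal sketch of the full workflow it suggests, assuming a JSON file at the warning's placeholder path and the default `_corrupt_record` column name:

```
// Sketch (not part of this commit): surface the malformed lines themselves.
// Reading with the default inferred schema makes Spark add a _corrupt_record
// column when at least one record fails to parse.
val parsedJson = spark.read.json("/path/to/json/file/test.json")

// Well-formed rows have null in _corrupt_record; malformed rows carry the raw line.
parsedJson.filter("_corrupt_record is not null").show(truncate = false)
```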


## How was this patch tested?

Unit test.

Author: Sean Zhong <[email protected]>

Closes apache#14929 from clockfly/logwarning_if_schema_not_contain_corrupted_record.
clockfly authored and cloud-fan committed Sep 6, 2016
1 parent 39d538d commit bc2767d
Showing 2 changed files with 66 additions and 2 deletions.
@@ -28,6 +28,7 @@ import org.apache.spark.internal.Logging
import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.sql.catalyst.expressions._
import org.apache.spark.sql.catalyst.util._
import org.apache.spark.sql.execution.datasources.ParseModes.{DROP_MALFORMED_MODE, PERMISSIVE_MODE}
import org.apache.spark.sql.execution.datasources.json.JacksonUtils.nextUntil
import org.apache.spark.sql.types._
import org.apache.spark.unsafe.types.UTF8String
@@ -52,6 +53,11 @@ class JacksonParser(
private val factory = new JsonFactory()
options.setJacksonOptions(factory)

private val emptyRow: Seq[InternalRow] = Seq(new GenericInternalRow(schema.length))

@transient
private[this] var isWarningPrintedForMalformedRecord: Boolean = false

/**
* This function deals with the cases it fails to parse. This function will be called
* when exceptions are caught during converting. This functions also deals with `mode` option.
@@ -62,8 +68,39 @@
throw new RuntimeException(s"Malformed line in FAILFAST mode: $record")
}
if (options.dropMalformed) {
logWarning(s"Dropping malformed line: $record")
if (!isWarningPrintedForMalformedRecord) {
logWarning(
s"""Found at least one malformed records (sample: $record). The JSON reader will drop
|all malformed records in current $DROP_MALFORMED_MODE parser mode. To find out which
|corrupted records have been dropped, please switch the parser mode to $PERMISSIVE_MODE
|mode and use the default inferred schema.
|
|Code example to print all malformed records (scala):
|===================================================
|// The corrupted record exists in column ${columnNameOfCorruptRecord}
|val parsedJson = spark.read.json("/path/to/json/file/test.json")
|
""".stripMargin)
isWarningPrintedForMalformedRecord = true
}
Nil
} else if (schema.getFieldIndex(columnNameOfCorruptRecord).isEmpty) {
if (!isWarningPrintedForMalformedRecord) {
logWarning(
s"""Found at least one malformed records (sample: $record). The JSON reader will replace
|all malformed records with placeholder null in current $PERMISSIVE_MODE parser mode.
|To find out which corrupted records have been replaced with null, please use the
|default inferred schema instead of providing a custom schema.
|
|Code example to print all malformed records (scala):
|===================================================
|// The corrupted record exists in column ${columnNameOfCorruptRecord}.
|val parsedJson = spark.read.json("/path/to/json/file/test.json")
|
""".stripMargin)
isWarningPrintedForMalformedRecord = true
}
emptyRow
} else {
val row = new GenericMutableRow(schema.length)
for (corruptIndex <- schema.getFieldIndex(columnNameOfCorruptRecord)) {
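
The heart of the change above is a warn-once guard: `isWarningPrintedForMalformedRecord` ensures the long message is logged once per parser instance rather than once per bad record. A self-contained sketch of the same pattern, with illustrative names and `println` standing in for Spark's `logWarning`:

```
// Illustrative sketch of the warn-once pattern (names are not Spark's).
class MalformedRecordHandler {
  // @transient mirrors the real code: the flag is excluded from serialization,
  // so it resets to false when the object is shipped to an executor.
  @transient private[this] var warned: Boolean = false

  def onMalformed(record: String): Unit = {
    if (!warned) {
      println(s"Found at least one malformed record (sample: $record)")
      warned = true // later malformed records are handled without logging
    }
  }
}
```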
@@ -1081,7 +1081,34 @@ class JsonSuite extends QueryTest with SharedSQLContext with TestJsonData {
assert(jsonDFTwo.schema === schemaTwo)
}

test("Corrupt records: PERMISSIVE mode") {
test("Corrupt records: PERMISSIVE mode, without designated column for malformed records") {
withTempView("jsonTable") {
val schema = StructType(
StructField("a", StringType, true) ::
StructField("b", StringType, true) ::
StructField("c", StringType, true) :: Nil)

val jsonDF = spark.read.schema(schema).json(corruptRecords)
jsonDF.createOrReplaceTempView("jsonTable")

checkAnswer(
sql(
"""
|SELECT a, b, c
|FROM jsonTable
""".stripMargin),
Seq(
// Corrupted records are replaced with null
Row(null, null, null),
Row(null, null, null),
Row(null, null, null),
Row("str_a_4", "str_b_4", "str_c_4"),
Row(null, null, null))
)
}
}

test("Corrupt records: PERMISSIVE mode, with designated column for malformed records") {
// Test if we can query corrupt records.
withSQLConf(SQLConf.COLUMN_NAME_OF_CORRUPT_RECORD.key -> "_unparsed") {
withTempView("jsonTable") {
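
These two tests cover PERMISSIVE mode with and without a designated corrupt-record column; `failedRecord` above also handles DROPMALFORMED and FAILFAST. As a hedged sketch (file path hypothetical), all three behaviors can be selected through the reader's `mode` option:

```
// Sketch: choosing the JSON parser mode from the reader side (path hypothetical).
// PERMISSIVE (default): malformed records become all-null rows, or populate the
// corrupt-record column when the schema contains one.
val permissive = spark.read.option("mode", "PERMISSIVE").json("/path/to/corrupt.json")

// DROPMALFORMED: malformed records are dropped (after the one-time warning).
val dropped = spark.read.option("mode", "DROPMALFORMED").json("/path/to/corrupt.json")

// FAILFAST: the first malformed record raises a RuntimeException.
val failing = spark.read.option("mode", "FAILFAST").json("/path/to/corrupt.json")
```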
