[SPARK-17374][SQL] Better error messages when parsing JSON using DataFrameReader #14929

clockfly commented Sep 2, 2016

What changes were proposed in this pull request?

This PR adds better error messages for malformed records when reading a JSON file using DataFrameReader.

For example, for the query:

import org.apache.spark.sql.types._
val corruptRecords = spark.sparkContext.parallelize("""{"a":{, b:3}""" :: Nil)
val schema = StructType(StructField("a", StringType, true) :: Nil)
val jsonDF = spark.read.schema(schema).json(corruptRecords)

Before the change:
The corrupted line is silently replaced with null:

scala> jsonDF.show
+----+
|   a|
+----+
|null|
+----+

After the change:
An explicit warning message is logged:

scala> jsonDF.show
16/09/02 14:43:16 WARN JacksonParser: Found at least one malformed records (sample: {"a":{, b:3}). The JSON reader will replace
all malformed records with placeholder null in current PERMISSIVE parser mode.
To find out which corrupted records have been replaced with null, please use the
default inferred schema instead of providing a custom schema.

Code example to print all malformed records (scala):
===================================================
// The corrupted record exists in column _corrupt_record.
val parsedJson = spark.read.json("/path/to/json/file/test.json")

+----+
|   a|
+----+
|null|
+----+
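
The warning recommends dropping the custom schema to see the corrupted records. As a possible alternative (a sketch, not part of this patch), the custom schema can keep its fields and add a nullable string column named `_corrupt_record`, which is the default value of `spark.sql.columnNameOfCorruptRecord`. The snippet assumes it is run in spark-shell, where `spark` is the predefined SparkSession, and reuses the malformed sample record from the query above.

```scala
import org.apache.spark.sql.types._

// Assumption: running in spark-shell, so `spark` is already defined.
val corruptRecords = spark.sparkContext.parallelize("""{"a":{, b:3}""" :: Nil)

// Same schema as in the query above, plus the corrupt-record column.
// "_corrupt_record" is the default of spark.sql.columnNameOfCorruptRecord.
val schema = StructType(
  StructField("a", StringType, true) ::
  StructField("_corrupt_record", StringType, true) :: Nil)

// In PERMISSIVE mode the raw text of a malformed line should be stored
// in the corrupt-record column instead of being lost.
val jsonDF = spark.read.schema(schema).json(corruptRecords)

// Pass false so that long raw records are not truncated in the output.
jsonDF.show(false)
```

Filtering on `_corrupt_record` being non-null would then list only the malformed input lines.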

How was this patch tested?

Unit test.

clockfly force-pushed the logwarning_if_schema_not_contain_corrupted_record branch from 50e312f to 3491f15 on September 2, 2016 00:26

SparkQA commented Sep 2, 2016

Test build #64818 has finished for PR 14929 at commit 50e312f.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.


SparkQA commented Sep 2, 2016

Test build #64821 has finished for PR 14929 at commit 3491f15.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

clockfly force-pushed the logwarning_if_schema_not_contain_corrupted_record branch from 3491f15 to b7ebf26 on September 2, 2016 06:26
@@ -62,8 +68,35 @@ class JacksonParser(
throw new RuntimeException(s"Malformed line in FAILFAST mode: $record")
}
if (options.dropMalformed) {
logWarning(s"Dropping malformed line: $record")
if (!isWarningPrintedForMalformedRecord) {
logWarning(
Contributor Author

Only print the warning the first time a malformed record is found.
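
The warn-once idea can be sketched in isolation as follows. This is a minimal illustration in plain Scala, not Spark's actual JacksonParser: the names `WarnOnceParser` and `handleMalformed` are hypothetical, and the log function is passed in so that the behaviour can be observed without a logging framework.

```scala
import scala.collection.mutable.ArrayBuffer

// Hypothetical stand-in for the parser; it only models the warn-once flag.
class WarnOnceParser(log: String => Unit) {
  // Flips to true after the first malformed record has been reported.
  private var isWarningPrinted: Boolean = false

  /** Handles a malformed record: logs on the first call only, always yields None. */
  def handleMalformed(record: String): Option[String] = {
    if (!isWarningPrinted) {
      log(s"Found at least one malformed record (sample: $record)")
      isWarningPrinted = true
    }
    None
  }
}

object WarnOnceDemo {
  def main(args: Array[String]): Unit = {
    val messages = ArrayBuffer.empty[String]
    val parser = new WarnOnceParser(msg => { messages += msg; () })

    // Two malformed records go in, but only the first triggers a warning.
    parser.handleMalformed("""{"a":{, b:3}""")
    parser.handleMalformed("""{"a":""")

    messages.foreach(println)
  }
}
```

Because the flag is per-instance mutable state, each parser instance would warn once on its own, so a job that creates several instances could still log the message several times.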

clockfly changed the title from "[Don't merge][WIP] Better error message for JSON file format reader when finding malformed line in PERMISSIVE parser mode" to "[SPARK-17374][SQL] Better error messages when parsing JSON using DataFrameReader" on Sep 2, 2016
clockfly force-pushed the logwarning_if_schema_not_contain_corrupted_record branch from b7ebf26 to 82f9927 on September 2, 2016 06:52

clockfly commented Sep 2, 2016

@cloud-fan

private val emptyRow: Seq[InternalRow] = Seq(new GenericInternalRow(schema.length))

@transient
private var isWarningPrintedForMalformedRecord: Boolean = false
Contributor

private[this]?


SparkQA commented Sep 2, 2016

Test build #64838 has finished for PR 14929 at commit b7ebf26.

  • This patch fails Spark unit tests.
  • This patch merges cleanly.
  • This patch adds no public classes.


SparkQA commented Sep 2, 2016

Test build #64841 has finished for PR 14929 at commit 82f9927.

  • This patch fails Spark unit tests.
  • This patch merges cleanly.
  • This patch adds no public classes.


SparkQA commented Sep 6, 2016

Test build #64955 has finished for PR 14929 at commit f47f921.

  • This patch fails Spark unit tests.
  • This patch merges cleanly.
  • This patch adds no public classes.


clockfly commented Sep 6, 2016

retest this please


SparkQA commented Sep 6, 2016

Test build #64996 has finished for PR 14929 at commit f47f921.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

StructField("c", StringType, true) :: Nil)

val jsonDF = spark.read.schema(schema).json(corruptRecords)
jsonDF.createOrReplaceTempView("jsonTable")
Contributor

Why create this temp view? We can query the DataFrame directly:

checkAnswer(jsonDF.select($"a", $"b", $"c"), Seq(Row...))

@cloud-fan

Thanks, merging to master! @clockfly, can you send a follow-up PR to address the minor comment in the test?
