diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/json/JacksonParser.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/json/JacksonParser.scala
index 359a3e2aa8ad2..5ce1bf7432159 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/json/JacksonParser.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/json/JacksonParser.scala
@@ -28,6 +28,7 @@ import org.apache.spark.internal.Logging
 import org.apache.spark.sql.catalyst.InternalRow
 import org.apache.spark.sql.catalyst.expressions._
 import org.apache.spark.sql.catalyst.util._
+import org.apache.spark.sql.execution.datasources.ParseModes.{DROP_MALFORMED_MODE, PERMISSIVE_MODE}
 import org.apache.spark.sql.execution.datasources.json.JacksonUtils.nextUntil
 import org.apache.spark.sql.types._
 import org.apache.spark.unsafe.types.UTF8String
@@ -52,6 +53,11 @@ class JacksonParser(
   private val factory = new JsonFactory()
   options.setJacksonOptions(factory)
 
+  private val emptyRow: Seq[InternalRow] = Seq(new GenericInternalRow(schema.length))
+
+  @transient
+  private[this] var isWarningPrintedForMalformedRecord: Boolean = false
+
   /**
    * This function deals with the cases it fails to parse. This function will be called
    * when exceptions are caught during converting. This functions also deals with `mode` option.
@@ -62,8 +68,39 @@ class JacksonParser(
       throw new RuntimeException(s"Malformed line in FAILFAST mode: $record")
     }
     if (options.dropMalformed) {
-      logWarning(s"Dropping malformed line: $record")
+      if (!isWarningPrintedForMalformedRecord) {
+        logWarning(
+          s"""Found at least one malformed records (sample: $record). The JSON reader will drop
+             |all malformed records in current $DROP_MALFORMED_MODE parser mode. To find out which
+             |corrupted records have been dropped, please switch the parser mode to $PERMISSIVE_MODE
+             |mode and use the default inferred schema.
+             |
+             |Code example to print all malformed records (scala):
+             |===================================================
+             |// The corrupted record exists in column ${columnNameOfCorruptRecord}
+             |val parsedJson = spark.read.json("/path/to/json/file/test.json")
+             |
+           """.stripMargin)
+        isWarningPrintedForMalformedRecord = true
+      }
       Nil
+    } else if (schema.getFieldIndex(columnNameOfCorruptRecord).isEmpty) {
+      if (!isWarningPrintedForMalformedRecord) {
+        logWarning(
+          s"""Found at least one malformed records (sample: $record). The JSON reader will replace
+             |all malformed records with placeholder null in current $PERMISSIVE_MODE parser mode.
+             |To find out which corrupted records have been replaced with null, please use the
+             |default inferred schema instead of providing a custom schema.
+             |
+             |Code example to print all malformed records (scala):
+             |===================================================
+             |// The corrupted record exists in column ${columnNameOfCorruptRecord}.
+             |val parsedJson = spark.read.json("/path/to/json/file/test.json")
+             |
+           """.stripMargin)
+        isWarningPrintedForMalformedRecord = true
+      }
+      emptyRow
     } else {
       val row = new GenericMutableRow(schema.length)
       for (corruptIndex <- schema.getFieldIndex(columnNameOfCorruptRecord)) {
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/json/JsonSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/json/JsonSuite.scala
index 63a9061210ca6..3d533c14e18e7 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/json/JsonSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/json/JsonSuite.scala
@@ -1081,7 +1081,34 @@ class JsonSuite extends QueryTest with SharedSQLContext with TestJsonData {
     assert(jsonDFTwo.schema === schemaTwo)
   }
 
-  test("Corrupt records: PERMISSIVE mode") {
+  test("Corrupt records: PERMISSIVE mode, without designated column for malformed records") {
+    withTempView("jsonTable") {
+      val schema = StructType(
+        StructField("a", StringType, true) ::
+          StructField("b", StringType, true) ::
+          StructField("c", StringType, true) :: Nil)
+
+      val jsonDF = spark.read.schema(schema).json(corruptRecords)
+      jsonDF.createOrReplaceTempView("jsonTable")
+
+      checkAnswer(
+        sql(
+          """
+            |SELECT a, b, c
+            |FROM jsonTable
+          """.stripMargin),
+        Seq(
+          // Corrupted records are replaced with null
+          Row(null, null, null),
+          Row(null, null, null),
+          Row(null, null, null),
+          Row("str_a_4", "str_b_4", "str_c_4"),
+          Row(null, null, null))
+      )
+    }
+  }
+
+  test("Corrupt records: PERMISSIVE mode, with designated column for malformed records") {
     // Test if we can query corrupt records.
     withSQLConf(SQLConf.COLUMN_NAME_OF_CORRUPT_RECORD.key -> "_unparsed") {
       withTempView("jsonTable") {