Skip to content

Commit

Permalink
update the log message
Browse files Browse the repository at this point in the history
  • Loading branch information
clockfly committed Sep 2, 2016
1 parent adaaffa commit 82f9927
Show file tree
Hide file tree
Showing 2 changed files with 66 additions and 2 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@ import org.apache.spark.internal.Logging
import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.sql.catalyst.expressions._
import org.apache.spark.sql.catalyst.util._
import org.apache.spark.sql.execution.datasources.ParseModes.{DROP_MALFORMED_MODE, PERMISSIVE_MODE}
import org.apache.spark.sql.execution.datasources.json.JacksonUtils.nextUntil
import org.apache.spark.sql.types._
import org.apache.spark.unsafe.types.UTF8String
Expand All @@ -52,6 +53,11 @@ class JacksonParser(
private val factory = new JsonFactory()
options.setJacksonOptions(factory)

// A single-element sequence holding one GenericInternalRow sized to the schema's field
// count. It is built once and returned for a malformed record when the schema has no
// field named by columnNameOfCorruptRecord.
private val emptyRow: Seq[InternalRow] = Seq(new GenericInternalRow(schema.length))

// Guards the malformed-record warning: it is checked before logging and set to true
// right after, so the warning is not logged again once the flag has been set.
@transient
private var isWarningPrintedForMalformedRecord: Boolean = false

/**
* This function handles records that fail to parse. It is called
* when an exception is caught during conversion. It also handles the `mode` option.
Expand All @@ -62,8 +68,39 @@ class JacksonParser(
throw new RuntimeException(s"Malformed line in FAILFAST mode: $record")
}
if (options.dropMalformed) {
logWarning(s"Dropping malformed line: $record")
if (!isWarningPrintedForMalformedRecord) {
logWarning(
s"""Found at least one malformed records (sample: $record). The JSON reader will drop
|all malformed records in current $DROP_MALFORMED_MODE parser mode. To find out which
|corrupted records have been dropped, please switch the parser mode to $PERMISSIVE_MODE
|mode and use the default inferred schema.
|
|Code example to print all malformed records (scala):
|===================================================
|// The corrupted record exists in column ${columnNameOfCorruptRecord}
|val parsedJson = spark.read.json("/path/to/json/file/test.json")
|
""".stripMargin)
isWarningPrintedForMalformedRecord = true
}
Nil
} else if (schema.getFieldIndex(columnNameOfCorruptRecord).isEmpty) {
if (!isWarningPrintedForMalformedRecord) {
logWarning(
s"""Found at least one malformed records (sample: $record). The JSON reader will replace
|all malformed records with placeholder null in current $PERMISSIVE_MODE parser mode.
|To find out which corrupted records have been replaced with null, please use the
|default inferred schema instead of providing a custom schema.
|
|Code example to print all malformed records (scala):
|===================================================
|// The corrupted record exists in column ${columnNameOfCorruptRecord}.
|val parsedJson = spark.read.json("/path/to/json/file/test.json")
|
""".stripMargin)
isWarningPrintedForMalformedRecord = true
}
emptyRow
} else {
val row = new GenericMutableRow(schema.length)
for (corruptIndex <- schema.getFieldIndex(columnNameOfCorruptRecord)) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1081,7 +1081,34 @@ class JsonSuite extends QueryTest with SharedSQLContext with TestJsonData {
assert(jsonDFTwo.schema === schemaTwo)
}

test("Corrupt records: PERMISSIVE mode") {
test("Corrupt records: PERMISSIVE mode, without designated column for malformed records") {
  withTempView("jsonTable") {
    // User-supplied schema: three nullable string columns, and no column set aside
    // for the raw text of malformed records.
    val userSchema = StructType(
      Seq("a", "b", "c").map(fieldName => StructField(fieldName, StringType, nullable = true)))

    spark.read
      .schema(userSchema)
      .json(corruptRecords)
      .createOrReplaceTempView("jsonTable")

    val query =
      """
        |SELECT a, b, c
        |FROM jsonTable
        |""".stripMargin

    // Corrupted records are replaced with null, so every column of such a row is null.
    val allNullRow = Row(null, null, null)
    val expectedRows = Seq(
      allNullRow,
      allNullRow,
      allNullRow,
      Row("str_a_4", "str_b_4", "str_c_4"),
      allNullRow)

    checkAnswer(sql(query), expectedRows)
  }
}

test("Corrupt records: PERMISSIVE mode, with designated column for malformed records") {
// Test if we can query corrupt records.
withSQLConf(SQLConf.COLUMN_NAME_OF_CORRUPT_RECORD.key -> "_unparsed") {
withTempView("jsonTable") {
Expand Down

0 comments on commit 82f9927

Please sign in to comment.