diff --git a/spark/src/main/scala/org/apache/spark/sql/delta/actions/actions.scala b/spark/src/main/scala/org/apache/spark/sql/delta/actions/actions.scala index ac341031579..e8eec00405e 100644 --- a/spark/src/main/scala/org/apache/spark/sql/delta/actions/actions.scala +++ b/spark/src/main/scala/org/apache/spark/sql/delta/actions/actions.scala @@ -1077,6 +1077,7 @@ case class CommitInfo( // infer the commit version from the file name and fill in this field then. @JsonDeserialize(contentAs = classOf[java.lang.Long]) version: Option[Long], + @JsonDeserialize(contentAs = classOf[java.lang.Long]) inCommitTimestamp: Option[Long], timestamp: Timestamp, userId: Option[String], diff --git a/spark/src/test/scala/org/apache/spark/sql/delta/ActionSerializerSuite.scala b/spark/src/test/scala/org/apache/spark/sql/delta/ActionSerializerSuite.scala index 605eb8ee77f..3fe73dbaea9 100644 --- a/spark/src/test/scala/org/apache/spark/sql/delta/ActionSerializerSuite.scala +++ b/spark/src/test/scala/org/apache/spark/sql/delta/ActionSerializerSuite.scala @@ -185,6 +185,25 @@ class ActionSerializerSuite extends QueryTest with SharedSparkSession with Delta assert(Action.fromJson(json1) === expectedCommitInfo) } + test("deserialization of CommitInfo with a very small ICT") { + val json1 = + """{"commitInfo":{"inCommitTimestamp":123,"timestamp":123,"operation":"CONVERT",""" + + """"operationParameters":{},"readVersion":23,""" + + """"isolationLevel":"SnapshotIsolation","isBlindAppend":true,""" + + """"operationMetrics":{"m1":"v1","m2":"v2"},"userMetadata":"123"}}""".stripMargin + assert(Action.fromJson(json1).asInstanceOf[CommitInfo].inCommitTimestamp.get == 123L) + } + + test("deserialization of CommitInfo with missing ICT") { + val json1 = + """{"commitInfo":{"timestamp":123,"operation":"CONVERT",""" + + """"operationParameters":{},"readVersion":23,""" + + """"isolationLevel":"SnapshotIsolation","isBlindAppend":true,""" + + """"operationMetrics":{"m1":"v1","m2":"v2"},"userMetadata":"123"}}""".stripMargin + val ictOpt: Option[Long] = Action.fromJson(json1).asInstanceOf[CommitInfo].inCommitTimestamp + assert(ictOpt.isEmpty) + } + testActionSerDe( "Protocol - json serialization/deserialization", Protocol(minReaderVersion = 1, minWriterVersion = 2),