diff --git a/spark/common/src/main/scala/org/apache/spark/sql/sedona_sql/io/geojson/GeoJSONFileFormat.scala b/spark/common/src/main/scala/org/apache/spark/sql/sedona_sql/io/geojson/GeoJSONFileFormat.scala
index b1db6fd0f3..6a843d4754 100644
--- a/spark/common/src/main/scala/org/apache/spark/sql/sedona_sql/io/geojson/GeoJSONFileFormat.scala
+++ b/spark/common/src/main/scala/org/apache/spark/sql/sedona_sql/io/geojson/GeoJSONFileFormat.scala
@@ -151,8 +151,8 @@ class GeoJSONFileFormat extends TextBasedFileFormat with DataSourceRegister {
         allowArrayAsStructs = true)
       val dataSource = JsonDataSource(parsedOptions)
 
-      dataSource
-        .readFile(broadcastedHadoopConf.value.value, file, parser, actualSchema)
+      SparkCompatUtil
+        .readFile(dataSource, broadcastedHadoopConf.value.value, file, parser, actualSchema)
         .map(row => {
           val newRow = GeoJSONUtils.convertGeoJsonToGeometry(row, alteredSchema)
           newRow
diff --git a/spark/common/src/main/scala/org/apache/spark/sql/sedona_sql/io/geojson/SparkCompatUtil.scala b/spark/common/src/main/scala/org/apache/spark/sql/sedona_sql/io/geojson/SparkCompatUtil.scala
index 4043dbd3e4..8ce7b61ad3 100644
--- a/spark/common/src/main/scala/org/apache/spark/sql/sedona_sql/io/geojson/SparkCompatUtil.scala
+++ b/spark/common/src/main/scala/org/apache/spark/sql/sedona_sql/io/geojson/SparkCompatUtil.scala
@@ -18,10 +18,14 @@
  */
 package org.apache.spark.sql.sedona_sql.io.geojson
 
+import org.apache.hadoop.conf.Configuration
+import org.apache.spark.sql.catalyst.InternalRow
 import org.apache.spark.sql.catalyst.analysis.Resolver
 import org.apache.spark.sql.catalyst.json.{JSONOptions, JacksonParser}
 import org.apache.spark.sql.catalyst.util.{DateFormatter, TimestampFormatter}
 import org.apache.spark.sql.catalyst.util.LegacyDateFormats.LegacyDateFormat
+import org.apache.spark.sql.execution.datasources.PartitionedFile
+import org.apache.spark.sql.execution.datasources.json.JsonDataSource
 import org.apache.spark.sql.types.{DataType, StructField, StructType}
 
 import scala.reflect.runtime.{universe => ru}
@@ -158,4 +162,43 @@ object SparkCompatUtil {
       }
     }
   }
+
+  def readFile(
+      jsonDataSource: JsonDataSource,
+      conf: Configuration,
+      file: PartitionedFile,
+      parser: JacksonParser,
+      schema: StructType): Iterator[InternalRow] = {
+    val readFileMethods =
+      jsonDataSource.getClass.getDeclaredMethods.filter(_.getName == "readFile")
+    // Dispatch on the parameter count of the readFile method
+    readFileMethods.find(_.getParameterCount == 4) match {
+      case Some(readFileMethod) =>
+        // The readFile method defined by open-source Apache Spark:
+        // def readFile(
+        //     conf: Configuration,
+        //     file: PartitionedFile,
+        //     parser: JacksonParser,
+        //     schema: StructType): Iterator[InternalRow]
+        readFileMethod
+          .invoke(jsonDataSource, conf, file, parser, schema)
+          .asInstanceOf[Iterator[InternalRow]]
+      case None =>
+        readFileMethods.find(_.getParameterCount == 5) match {
+          case Some(readFileMethod) =>
+            // The readFile method defined by the Databricks Runtime (DBR):
+            // def readFile(
+            //     conf: Configuration,
+            //     file: PartitionedFile,
+            //     parser: JacksonParser,
+            //     schema: StructType,
+            //     badRecordsWriter: Option[BadRecordsWriter]): Iterator[InternalRow]
+            readFileMethod
+              .invoke(jsonDataSource, conf, file, parser, schema, None)
+              .asInstanceOf[Iterator[InternalRow]]
+          case None =>
+            throw new Exception("No suitable readFile method found in JsonDataSource")
+        }
+    }
+  }
 }
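
Why reflection rather than a direct call: open-source Apache Spark and the Databricks Runtime ship `JsonDataSource.readFile` with different signatures (DBR adds a trailing `badRecordsWriter` parameter), so a compile-time call against one binary would fail with `NoSuchMethodError` on the other. Dispatching on the method's parameter count at runtime lets a single jar work against both. Below is a minimal, self-contained sketch of the same arity-based dispatch pattern; `OssReader`, `VendorReader`, `readAll`, and the `audit` parameter are hypothetical stand-ins, not Spark APIs.

```scala
// Sketch of the arity-based reflection dispatch used by SparkCompatUtil.readFile.
// All class and method names here are hypothetical stand-ins for JsonDataSource.
object ArityDispatchSketch {

  // Stand-in for the open-source signature: two parameters.
  class OssReader {
    def readAll(path: String, limit: Int): Seq[String] =
      Seq(s"oss: $limit rows from $path")
  }

  // Stand-in for a vendor signature with an extra trailing Option parameter.
  class VendorReader {
    def readAll(path: String, limit: Int, audit: Option[String]): Seq[String] =
      Seq(s"vendor: $limit rows from $path, audit=$audit")
  }

  // Works against either class by inspecting the parameter count of the
  // declared readAll method, as the patch does for readFile.
  def readAllCompat(reader: AnyRef, path: String, limit: Int): Seq[String] = {
    val candidates = reader.getClass.getDeclaredMethods.filter(_.getName == "readAll")
    candidates.find(_.getParameterCount == 2) match {
      case Some(m) =>
        m.invoke(reader, path, Int.box(limit)).asInstanceOf[Seq[String]]
      case None =>
        candidates.find(_.getParameterCount == 3) match {
          case Some(m) =>
            // Pass None for the extra parameter this caller does not use.
            m.invoke(reader, path, Int.box(limit), None).asInstanceOf[Seq[String]]
          case None =>
            throw new Exception("No suitable readAll method found")
        }
    }
  }

  def main(args: Array[String]): Unit = {
    println(readAllCompat(new OssReader, "/tmp/data", 10)) // 2-argument path
    println(readAllCompat(new VendorReader, "/tmp/data", 10)) // 3-argument fallback
  }
}
```

One caveat the pattern inherits: `getDeclaredMethods` returns only methods declared on the runtime class itself, not inherited ones, which works here because the concrete `JsonDataSource` implementations override `readFile` directly.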