diff --git a/iis-common/pom.xml b/iis-common/pom.xml index 53c37a3a8..af4504cc2 100644 --- a/iis-common/pom.xml +++ b/iis-common/pom.xml @@ -70,13 +70,6 @@ hadoop-common - - org.scala-lang - scala-library - ${scala.version} - provided - - org.apache.spark spark-core_2.12 @@ -169,29 +162,6 @@ - - net.alchim31.maven - scala-maven-plugin - 3.2.2 - - - scala-compile-first - process-resources - - add-source - compile - - - - scala-test-compile - process-test-resources - - testCompile - - - - - org.apache.avro @@ -257,8 +227,4 @@ - - 2.12.14 - - diff --git a/iis-common/src/main/java/eu/dnetlib/iis/common/spark/avro/AvroDataFrameReader.java b/iis-common/src/main/java/eu/dnetlib/iis/common/spark/avro/AvroDataFrameReader.java new file mode 100644 index 000000000..36ec7ad9b --- /dev/null +++ b/iis-common/src/main/java/eu/dnetlib/iis/common/spark/avro/AvroDataFrameReader.java @@ -0,0 +1,41 @@ +package eu.dnetlib.iis.common.spark.avro; + +import java.io.Serializable; + +import org.apache.avro.Schema; +import org.apache.spark.sql.Dataset; +import org.apache.spark.sql.Row; +import org.apache.spark.sql.SparkSession; +import org.apache.spark.sql.avro.SchemaConverters; +import org.apache.spark.sql.types.StructType; + +/** + * Support for reading avro datastores as dataframes. + * + * @author mhorst + * + */ +public class AvroDataFrameReader implements Serializable { + + private static final long serialVersionUID = 4858427693578954728L; + + private final SparkSession sparkSession; + + /** + * Default constructor accepting spark session as parameter. + * @param sparkSession spark session + */ + public AvroDataFrameReader(SparkSession sparkSession) { + this.sparkSession = sparkSession; + } + + /** + * @param path Path to the data store + * @param avroSchema Avro schema of the records + * @return DataFrame with data read from given path + */ + public Dataset read(String path, Schema avroSchema) { + Dataset in = sparkSession.read().format("avro").option("avroSchema", avroSchema.toString()).load(path); + return sparkSession.createDataFrame(in.rdd(), (StructType) SchemaConverters.toSqlType(avroSchema).dataType()); + } +} diff --git a/iis-common/src/main/java/eu/dnetlib/iis/common/spark/avro/AvroDataFrameSupport.java b/iis-common/src/main/java/eu/dnetlib/iis/common/spark/avro/AvroDataFrameSupport.java new file mode 100644 index 000000000..22156ac6f --- /dev/null +++ b/iis-common/src/main/java/eu/dnetlib/iis/common/spark/avro/AvroDataFrameSupport.java @@ -0,0 +1,38 @@ +package eu.dnetlib.iis.common.spark.avro; + +import java.io.Serializable; + +import org.apache.avro.specific.SpecificRecordBase; +import org.apache.spark.api.java.function.MapFunction; +import org.apache.spark.sql.Dataset; +import org.apache.spark.sql.Encoders; +import org.apache.spark.sql.Row; + +import com.fasterxml.jackson.databind.DeserializationFeature; +import com.fasterxml.jackson.databind.ObjectMapper; + +/** + * Support for dataframes of avro types. + * + * @author mhorst + */ +public class AvroDataFrameSupport implements Serializable { + + private static final long serialVersionUID = -3980871922050483460L; + + private AvroDataFrameSupport() { + } + + /** + * @param type of elements + * @param dataFrame seq with elements for the dataframe + * @param clazz class of objects in the dataset + * @return Dataset of objects corresponding to records in the given dataframe + */ + public static Dataset toDS(final Dataset dataFrame, final Class clazz) { + final ObjectMapper mapper = new ObjectMapper().configure(DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES, + false); + return (Dataset) dataFrame.toJSON().map((MapFunction) json -> (T) mapper.readValue(json, clazz), + Encoders.kryo((Class) clazz)); + } +} diff --git a/iis-common/src/main/java/eu/dnetlib/iis/common/spark/avro/AvroDataFrameWriter.java b/iis-common/src/main/java/eu/dnetlib/iis/common/spark/avro/AvroDataFrameWriter.java new file mode 100644 index 000000000..c90d62e86 --- /dev/null +++ b/iis-common/src/main/java/eu/dnetlib/iis/common/spark/avro/AvroDataFrameWriter.java @@ -0,0 +1,38 @@ +package eu.dnetlib.iis.common.spark.avro; + +import java.io.Serializable; + +import org.apache.avro.Schema; +import org.apache.spark.sql.Dataset; +import org.apache.spark.sql.Row; + +/** + * Support for writing dataframes of avro types. + * + * @author mhorst + */ +public class AvroDataFrameWriter implements Serializable { + + private static final long serialVersionUID = 7842491849433906246L; + + private final Dataset dataFrame; + + /** + * Default constructor accepting DataFrame. + * + * @param dataFrame DataFrame of avro type + */ + public AvroDataFrameWriter(Dataset dataFrame) { + this.dataFrame = dataFrame; + } + + /** + * Writes a dataframe as avro datastore using avro schema. + * @param path path to the data store + * @param avroSchema Avro schema of the records + */ + public void write(String path, Schema avroSchema) { + dataFrame.write().format("avro").option("avroSchema", avroSchema.toString()) + .option("compression", "uncompressed").save(path); + } +} diff --git a/iis-common/src/main/java/eu/dnetlib/iis/common/spark/avro/AvroDatasetReader.java b/iis-common/src/main/java/eu/dnetlib/iis/common/spark/avro/AvroDatasetReader.java new file mode 100644 index 000000000..72b0ba164 --- /dev/null +++ b/iis-common/src/main/java/eu/dnetlib/iis/common/spark/avro/AvroDatasetReader.java @@ -0,0 +1,43 @@ +package eu.dnetlib.iis.common.spark.avro; + +import java.io.Serializable; +import org.apache.avro.Schema; +import org.apache.avro.specific.SpecificRecordBase; +import org.apache.spark.sql.Dataset; +import org.apache.spark.sql.SparkSession; + +/** + * Support for reading avro datastores as datasets. + * + * @author mhorst + */ +public class AvroDatasetReader implements Serializable { + + private static final long serialVersionUID = 4858427693578954728L; + + private final SparkSession sparkSession; + + /** + * Default constructor accepting spark session as parameter. + * @param sparkSession spark session + */ + public AvroDatasetReader(SparkSession sparkSession) { + this.sparkSession = sparkSession; + } + + /** + * Reads avro datastore as Spark dataset using avro schema and kryo encoder. + * + * NOTE: due to inability to use bean-based encoder for avro types this method uses kryo encoder; + * for this reason this method creates objects by mapping rows to jsons and jsons to instances of objects. + * + * @param type of objects in the dataset + * @param path path to the data store + * @param avroSchema Avro schema of the records + * @param clazz class of objects in the dataset + * @return Dataset with data read from given path + */ + public Dataset read(String path, Schema avroSchema, Class clazz) { + return AvroDataFrameSupport.toDS(new AvroDataFrameReader(sparkSession).read(path, avroSchema), clazz); + } +} diff --git a/iis-common/src/main/scala/eu/dnetlib/iis/common/spark/avro/AvroDataFrameReader.scala b/iis-common/src/main/scala/eu/dnetlib/iis/common/spark/avro/AvroDataFrameReader.scala deleted file mode 100644 index bc4540c73..000000000 --- a/iis-common/src/main/scala/eu/dnetlib/iis/common/spark/avro/AvroDataFrameReader.scala +++ /dev/null @@ -1,40 +0,0 @@ -package eu.dnetlib.iis.common.spark.avro - -import org.apache.avro.Schema -import org.apache.spark.sql.avro.SchemaConverters -import org.apache.spark.sql.types.StructType -import org.apache.spark.sql.{DataFrame, SparkSession} - -/** - * Support for reading avro datastores as dataframes. - * - * @param spark SparkSession instance. - */ -class AvroDataFrameReader(val spark: SparkSession) extends Serializable { - - /** - * Reads avro datastore as Spark dataframe using SQL schema. - * - * @param path Path to the datastore. - * @param schema SQL schema of the records. - * @return DataFrame with data read from given path. - */ - def read(path: String, schema: StructType): DataFrame = { - read(path, SchemaConverters.toAvroType(schema)) - } - - /** - * Reads avro datastore as Spark dataframe using avro schema. - * - * @param path Path to the data store. - * @param avroSchema Avro schema of the records. - * @return DataFrame with data read from given path. - */ - def read(path: String, avroSchema: Schema): DataFrame = { - val in = spark.read - .format("avro") - .option("avroSchema", avroSchema.toString) - .load(path) - spark.createDataFrame(in.rdd, SchemaConverters.toSqlType(avroSchema).dataType.asInstanceOf[StructType]) - } -} diff --git a/iis-common/src/main/scala/eu/dnetlib/iis/common/spark/avro/AvroDataFrameSupport.scala b/iis-common/src/main/scala/eu/dnetlib/iis/common/spark/avro/AvroDataFrameSupport.scala deleted file mode 100644 index 3ad0f7a1d..000000000 --- a/iis-common/src/main/scala/eu/dnetlib/iis/common/spark/avro/AvroDataFrameSupport.scala +++ /dev/null @@ -1,67 +0,0 @@ -package eu.dnetlib.iis.common.spark.avro - -import com.fasterxml.jackson.databind.{DeserializationFeature, ObjectMapper} -import org.apache.avro.Schema -import org.apache.avro.specific.SpecificRecordBase -import org.apache.spark.sql._ -import org.apache.spark.sql.avro._ -import org.apache.spark.sql.catalyst.InternalRow -import org.apache.spark.sql.catalyst.encoders.RowEncoder -import org.apache.spark.sql.types.StructType - -import scala.collection.JavaConverters._ - -/** - * Support for dataframes of avro types. - * - * @param spark SparkSession instance. - */ -class AvroDataFrameSupport(val spark: SparkSession) extends Serializable { - - /** - * Creates a dataframe from a given collection. - * - * @param data List with elements for the dataframe. - * @param avroSchema Avro schema of the elements. - * @tparam T Type of elements. - * @return DataFrame containing data from the given list. - */ -// def createDataFrame[T](data: java.util.List[T], avroSchema: Schema): DataFrame = { -// createDataFrame(data.asScala, avroSchema) -// } - - /** - * Creates a dataframe from a given collection. - * - * @param data Seq with elements for the dataframe. - * @param avroSchema Avro schema of the elements. - * @tparam T Type of elements. - * @return DataFrame containing data from the given seq. - */ -// def createDataFrame[T](data: Seq[T], avroSchema: Schema): DataFrame = { -// val rowSchema = SchemaConverters.toSqlType(avroSchema).dataType.asInstanceOf[StructType] -// val encoder = RowEncoder.apply(rowSchema).resolveAndBind() -// val deserializer = new AvroDeserializer(avroSchema, rowSchema) -// val rows = data.map(record => encoder.fromRow(deserializer.deserialize(record).asInstanceOf[InternalRow])) -// spark.createDataFrame(spark.sparkContext.parallelize(rows), rowSchema) -// } - - /** - * Creates a dataset from given dataframe using kryo encoder. - * - * NOTE: due to inability to use bean based encoder for avro types this method uses kryo encoder; - * for this reason this method creates objects by mapping rows to jsons and jsons to instances of objects. - * - * @param df DataFrame to be converted to a dataset. - * @param clazz Class of objects in the dataset. - * @tparam T Type of objects in the dataset. - * @return Dataset of objects corresponding to records in the given dataframe. - */ - def toDS[T <: SpecificRecordBase](df: DataFrame, clazz: Class[T]): Dataset[T] = { - implicit val encoder: Encoder[T] = Encoders.kryo(clazz) - val mapper = new ObjectMapper().configure(DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES, false) - df - .toJSON - .map(json => mapper.readValue(json, clazz)) - } -} diff --git a/iis-common/src/main/scala/eu/dnetlib/iis/common/spark/avro/AvroDataFrameWriter.scala b/iis-common/src/main/scala/eu/dnetlib/iis/common/spark/avro/AvroDataFrameWriter.scala deleted file mode 100644 index ca31204fd..000000000 --- a/iis-common/src/main/scala/eu/dnetlib/iis/common/spark/avro/AvroDataFrameWriter.scala +++ /dev/null @@ -1,38 +0,0 @@ -package eu.dnetlib.iis.common.spark.avro - -import org.apache.avro.Schema -import org.apache.spark.sql.DataFrame -import org.apache.spark.sql.avro.SchemaConverters - -/** - * Support for writing dataframes of avro types. - * - * @param df DataFrame of avro type. - */ -class AvroDataFrameWriter(df: DataFrame) extends Serializable { - - /** - * Writes a dataframe as avro datastore using avro schema generated from sql schema. - * - * @param path Path to the data store. - * @return - */ - def write(path: String): Unit = { - write(path, SchemaConverters.toAvroType(df.schema)) - } - - /** - * Writes a dataframe as avro datastore using avro schema. - * - * @param path Path to the data store. - * @param avroSchema Avro schema of the records. - */ - def write(path: String, avroSchema: Schema): Unit = { - df - .write - .format("avro") - .option("avroSchema", avroSchema.toString) - .option("compression", "uncompressed") - .save(path) - } -} diff --git a/iis-common/src/main/scala/eu/dnetlib/iis/common/spark/avro/AvroDatasetReader.scala b/iis-common/src/main/scala/eu/dnetlib/iis/common/spark/avro/AvroDatasetReader.scala deleted file mode 100644 index cb17918b1..000000000 --- a/iis-common/src/main/scala/eu/dnetlib/iis/common/spark/avro/AvroDatasetReader.scala +++ /dev/null @@ -1,29 +0,0 @@ -package eu.dnetlib.iis.common.spark.avro - -import org.apache.avro.Schema -import org.apache.avro.specific.SpecificRecordBase -import org.apache.spark.sql.{Dataset, SparkSession} - -/** - * Support for reading avro datastores as datasets. - * - * @param spark SparkSession instance. - */ -class AvroDatasetReader(val spark: SparkSession) extends Serializable { - - /** - * Reads avro datastore as Spark dataset using avro schema and kryo encoder. - * - * NOTE: due to inability to use bean-based encoder for avro types this method uses kryo encoder; - * for this reason this method creates objects by mapping rows to jsons and jsons to instances of objects. - * - * @param path Path to the data store. - * @param avroSchema Avro schema of the records. - * @param clazz Class of objects in the dataset. - * @tparam T Type of objects in the dataset. - * @return Dataset with data read from given path. - */ - def read[T <: SpecificRecordBase](path: String, avroSchema: Schema, clazz: Class[T]): Dataset[T] = { - new AvroDataFrameSupport(spark).toDS(new AvroDataFrameReader(spark).read(path, avroSchema), clazz) - } -} diff --git a/iis-common/src/main/scala/eu/dnetlib/iis/common/spark/avro/AvroDatasetSupport.scala b/iis-common/src/main/scala/eu/dnetlib/iis/common/spark/avro/AvroDatasetSupport.scala deleted file mode 100644 index ea39f79c8..000000000 --- a/iis-common/src/main/scala/eu/dnetlib/iis/common/spark/avro/AvroDatasetSupport.scala +++ /dev/null @@ -1,42 +0,0 @@ -package eu.dnetlib.iis.common.spark.avro - -import org.apache.avro.Schema -import org.apache.avro.specific.SpecificRecordBase -import org.apache.spark.sql._ -import org.apache.spark.sql.avro._ -import org.apache.spark.sql.catalyst.InternalRow -import org.apache.spark.sql.catalyst.encoders.RowEncoder -import org.apache.spark.sql.types.StructType - -/** - * Support for datasets of avro types. - * - * @param spark SparkSession instance. - */ -class AvroDatasetSupport(val spark: SparkSession) extends Serializable { - - /** - * Creates a dataframe from given dataset of avro type. - * - * @param ds Dataset to be converted to a dataframe. - * @param avroSchema Avro schema of the records. - * @tparam T Type of objects in the dataset. - * @return DataFrame of objects corresponding to records in the given dataset. - */ -// def toDF[T <: SpecificRecordBase](ds: Dataset[T], avroSchema: Schema): DataFrame = { -// val avroSchemaStr = avroSchema.toString -// val rowSchema = SchemaConverters.toSqlType(avroSchema).dataType.asInstanceOf[StructType] -// val encoder = RowEncoder(rowSchema).resolveAndBind() -// -// object SerializationSupport extends Serializable { -// @transient private lazy val deserializer = new AvroDeserializer(new Schema.Parser().parse(avroSchemaStr), rowSchema) -// private val rows = ds.rdd.map(record => encoder.fromRow(deserializer.deserialize(record).asInstanceOf[InternalRow])) -// -// def doToDF(): DataFrame = { -// spark.createDataFrame(rows, rowSchema) -// } -// } -// -// SerializationSupport.doToDF() -// } -} diff --git a/iis-common/src/main/scala/eu/dnetlib/iis/common/spark/avro/AvroDatasetWriter.scala b/iis-common/src/main/scala/eu/dnetlib/iis/common/spark/avro/AvroDatasetWriter.scala deleted file mode 100644 index 20258d3a9..000000000 --- a/iis-common/src/main/scala/eu/dnetlib/iis/common/spark/avro/AvroDatasetWriter.scala +++ /dev/null @@ -1,25 +0,0 @@ -package eu.dnetlib.iis.common.spark.avro - -import org.apache.avro.Schema -import org.apache.avro.specific.SpecificRecordBase -import org.apache.spark.sql.Dataset - -/** - * Support for writing datasets of avro types. - * - * @param ds Dataset of avro type. - * @tparam T Avro type. - */ -class AvroDatasetWriter[T <: SpecificRecordBase](ds: Dataset[T]) extends Serializable { - - /** - * Writes a dataset as avro datastore using avro schema. - * - * @param path Path to the data store. - * @param avroSchema Avro schema of the records. - */ -// def write(path: String, avroSchema: Schema): Unit = { -// new AvroDataFrameWriter(new AvroDatasetSupport(ds.sparkSession).toDF(ds, avroSchema)) -// .write(path, avroSchema) -// } -} diff --git a/iis-common/src/test/java/eu/dnetlib/iis/common/spark/avro/AvroDataFrameReaderTest.java b/iis-common/src/test/java/eu/dnetlib/iis/common/spark/avro/AvroDataFrameReaderTest.java index eafcb3b02..f4168cafc 100644 --- a/iis-common/src/test/java/eu/dnetlib/iis/common/spark/avro/AvroDataFrameReaderTest.java +++ b/iis-common/src/test/java/eu/dnetlib/iis/common/spark/avro/AvroDataFrameReaderTest.java @@ -1,23 +1,23 @@ package eu.dnetlib.iis.common.spark.avro; -import eu.dnetlib.iis.common.avro.Person; -import eu.dnetlib.iis.common.spark.TestWithSharedSparkSession; -import eu.dnetlib.iis.common.utils.AvroTestUtils; +import static org.junit.jupiter.api.Assertions.assertEquals; + +import java.io.IOException; +import java.nio.file.Path; +import java.util.Collections; +import java.util.List; + import org.apache.spark.sql.Dataset; import org.apache.spark.sql.Row; import org.apache.spark.sql.avro.SchemaConverters; -import org.apache.spark.sql.types.StructType; import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.DisplayName; import org.junit.jupiter.api.Test; import org.junit.jupiter.api.io.TempDir; -import java.io.IOException; -import java.nio.file.Path; -import java.util.Collections; -import java.util.List; - -import static org.junit.jupiter.api.Assertions.assertEquals; +import eu.dnetlib.iis.common.avro.Person; +import eu.dnetlib.iis.common.spark.TestWithSharedSparkSession; +import eu.dnetlib.iis.common.utils.AvroTestUtils; class AvroDataFrameReaderTest extends TestWithSharedSparkSession { @@ -29,25 +29,6 @@ public void beforeEach() { reader = new AvroDataFrameReader(spark()); } - @Test - @DisplayName("Avro dataframe reader reads avro datastore with SQL schema as dataframe") - public void givenAvroDatastore_whenReadUsingAvroReaderWithSQLSchema_thenProperDataFrameIsReturned(@TempDir Path inputDir) throws IOException { - Person person = Person.newBuilder().setId(1).setName("name").setAge(2).build(); - List data = Collections.singletonList(person); - AvroTestUtils.createLocalAvroDataStore(data, inputDir.toString(), Person.class); - - Dataset result = reader.read(inputDir.toString(), - (StructType) SchemaConverters.toSqlType(Person.SCHEMA$).dataType()); - - assertEquals(SchemaConverters.toSqlType(Person.SCHEMA$).dataType(), result.schema()); - List rows = result.collectAsList(); - assertEquals(1, rows.size()); - Row row = rows.get(0); - assertEquals(person.getId(), row.getAs("id")); - assertEquals(person.getName(), row.getAs("name")); - assertEquals(person.getAge(), row.getAs("age")); - } - @Test @DisplayName("Avro dataframe reader reads avro datastore with avro schema as dataframe") public void givenAvroDatastore_whenReadUsingAvroReaderWithAvroSchema_thenProperDataFrameIsReturned(@TempDir Path inputDir) throws IOException { diff --git a/iis-common/src/test/java/eu/dnetlib/iis/common/spark/avro/AvroDataFrameSupportTest.java b/iis-common/src/test/java/eu/dnetlib/iis/common/spark/avro/AvroDataFrameSupportTest.java index 5e28202b3..c073b069b 100644 --- a/iis-common/src/test/java/eu/dnetlib/iis/common/spark/avro/AvroDataFrameSupportTest.java +++ b/iis-common/src/test/java/eu/dnetlib/iis/common/spark/avro/AvroDataFrameSupportTest.java @@ -1,49 +1,23 @@ package eu.dnetlib.iis.common.spark.avro; -import eu.dnetlib.iis.common.avro.Person; -import eu.dnetlib.iis.common.spark.TestWithSharedSparkSession; -import org.apache.avro.Schema; +import static org.junit.jupiter.api.Assertions.assertEquals; + +import java.util.Collections; +import java.util.List; + import org.apache.spark.sql.Dataset; import org.apache.spark.sql.Row; import org.apache.spark.sql.RowFactory; import org.apache.spark.sql.avro.SchemaConverters; import org.apache.spark.sql.types.StructType; -import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.DisplayName; import org.junit.jupiter.api.Test; -import java.util.Collections; -import java.util.List; - -import static org.junit.jupiter.api.Assertions.assertEquals; +import eu.dnetlib.iis.common.avro.Person; +import eu.dnetlib.iis.common.spark.TestWithSharedSparkSession; public class AvroDataFrameSupportTest extends TestWithSharedSparkSession { - private AvroDataFrameSupport support; - - @BeforeEach - public void beforeEach() { - super.beforeEach(); - support = new AvroDataFrameSupport(spark()); - } - -// @Test -// @DisplayName("Avro dataframe support creates dataframe from collection of avro type") -// public void givenACollectionOfAvroType_whenConvertedToDataFrame_thenProperDataFrameIsReturned() { -// Person person = Person.newBuilder().setId(1).setName("name").setAge(2).build(); -// List data = Collections.singletonList(person); -// -// Dataset result = support.createDataFrame(data, Person.SCHEMA$); -// -// assertSchemasEqualIgnoringNullability(Person.SCHEMA$, result.schema()); -// List rows = result.collectAsList(); -// assertEquals(1, rows.size()); -// Row row = rows.get(0); -// assertEquals(person.getId(), row.getAs("id")); -// assertEquals(person.getName(), row.getAs("name")); -// assertEquals(person.getAge(), row.getAs("age")); -// } - @Test @DisplayName("Avro dataframe support converts dataframe of avro type to dataset of avro type") public void givenDataFrameOfAvroType_whenConvertedToDataset_thenProperDatasetIsReturned() { @@ -53,7 +27,7 @@ public void givenDataFrameOfAvroType_whenConvertedToDataset_thenProperDatasetIsR data, (StructType) SchemaConverters.toSqlType(Person.SCHEMA$).dataType() ); - Dataset result = support.toDS(df, Person.class); + Dataset result = AvroDataFrameSupport.toDS(df, Person.class); List personList = result.collectAsList(); assertEquals(1, personList.size()); @@ -63,7 +37,4 @@ public void givenDataFrameOfAvroType_whenConvertedToDataset_thenProperDatasetIsR assertEquals(personRow.getAs(2), person.getAge()); } - private static void assertSchemasEqualIgnoringNullability(Schema avroSchema, StructType sqlSchema) { - assertEquals(SchemaConverters.toSqlType(avroSchema).dataType().asNullable(), sqlSchema.asNullable()); - } } \ No newline at end of file diff --git a/iis-common/src/test/java/eu/dnetlib/iis/common/spark/avro/AvroDataFrameWriterTest.java b/iis-common/src/test/java/eu/dnetlib/iis/common/spark/avro/AvroDataFrameWriterTest.java index e257a9cae..74f2c2824 100644 --- a/iis-common/src/test/java/eu/dnetlib/iis/common/spark/avro/AvroDataFrameWriterTest.java +++ b/iis-common/src/test/java/eu/dnetlib/iis/common/spark/avro/AvroDataFrameWriterTest.java @@ -1,9 +1,12 @@ package eu.dnetlib.iis.common.spark.avro; -import eu.dnetlib.iis.common.avro.Person; -import eu.dnetlib.iis.common.spark.TestWithSharedSparkSession; -import eu.dnetlib.iis.common.utils.AvroTestUtils; -import org.apache.avro.generic.GenericRecord; +import static org.junit.jupiter.api.Assertions.assertEquals; + +import java.io.IOException; +import java.nio.file.Path; +import java.util.Collections; +import java.util.List; + import org.apache.spark.sql.Dataset; import org.apache.spark.sql.Row; import org.apache.spark.sql.RowFactory; @@ -14,12 +17,9 @@ import org.junit.jupiter.api.Test; import org.junit.jupiter.api.io.TempDir; -import java.io.IOException; -import java.nio.file.Path; -import java.util.Collections; -import java.util.List; - -import static org.junit.jupiter.api.Assertions.assertEquals; +import eu.dnetlib.iis.common.avro.Person; +import eu.dnetlib.iis.common.spark.TestWithSharedSparkSession; +import eu.dnetlib.iis.common.utils.AvroTestUtils; class AvroDataFrameWriterTest extends TestWithSharedSparkSession { @@ -28,26 +28,6 @@ public void beforeEach() { super.beforeEach(); } - @Test - @DisplayName("Avro dataframe writer writes dataframe of avro type using SQL schema") - public void givenDataFrameOfAvroType_whenWrittenToOutputUsingSQLSchema_thenWriteSucceeds(@TempDir Path workingDir) throws IOException { - Path outputDir = workingDir.resolve("output"); - Row personRow = RowFactory.create(1, "name", 2); - Dataset df = spark().createDataFrame( - Collections.singletonList(personRow), - (StructType) SchemaConverters.toSqlType(Person.SCHEMA$).dataType() - ); - - new AvroDataFrameWriter(df).write(outputDir.toString()); - - List genericRecordList = AvroTestUtils.readLocalAvroDataStore(outputDir.toString()); - assertEquals(1, genericRecordList.size()); - GenericRecord genericRecord = genericRecordList.get(0); - assertEquals(personRow.getAs(0), genericRecord.get(0)); - assertEquals(personRow.getAs(1).toString(), genericRecord.get(1).toString()); - assertEquals(personRow.getAs(2), genericRecord.get(2)); - } - @Test @DisplayName("Avro dataframe writer writes dataframe of avro type using avro schema") public void givenDataFrameOfAvroType_whenWrittenToOutputUsingAvroSchema_thenWriteSucceeds(@TempDir Path workingDir) throws IOException { diff --git a/iis-common/src/test/java/eu/dnetlib/iis/common/spark/avro/AvroDatasetSupportTest.java b/iis-common/src/test/java/eu/dnetlib/iis/common/spark/avro/AvroDatasetSupportTest.java index e57403116..1537a87ae 100644 --- a/iis-common/src/test/java/eu/dnetlib/iis/common/spark/avro/AvroDatasetSupportTest.java +++ b/iis-common/src/test/java/eu/dnetlib/iis/common/spark/avro/AvroDatasetSupportTest.java @@ -1,21 +1,13 @@ package eu.dnetlib.iis.common.spark.avro; -import eu.dnetlib.iis.common.avro.Person; -import eu.dnetlib.iis.common.spark.TestWithSharedSparkSession; +import static org.junit.jupiter.api.Assertions.assertEquals; + import org.apache.avro.Schema; -import org.apache.spark.sql.Dataset; -import org.apache.spark.sql.Encoders; -import org.apache.spark.sql.Row; import org.apache.spark.sql.avro.SchemaConverters; import org.apache.spark.sql.types.StructType; import org.junit.jupiter.api.BeforeEach; -import org.junit.jupiter.api.DisplayName; -import org.junit.jupiter.api.Test; - -import java.util.Collections; -import java.util.List; -import static org.junit.jupiter.api.Assertions.assertEquals; +import eu.dnetlib.iis.common.spark.TestWithSharedSparkSession; public class AvroDatasetSupportTest extends TestWithSharedSparkSession { @@ -24,26 +16,6 @@ public void beforeEach() { super.beforeEach(); } -// @Test -// @DisplayName("Avro dataset support converts dataset of avro type to dataframe of avro type") -// public void givenDatasetOfAvroType_whenConvertedToDataFrame_thenProperDataFrameIsReturned() { -// Person person = Person.newBuilder().setId(1).setName("name").setAge(2).build(); -// Dataset ds = spark().createDataset( -// Collections.singletonList(person), -// Encoders.kryo(Person.class) -// ); -// -// Dataset result = new AvroDatasetSupport(spark()).toDF(ds, Person.SCHEMA$); -// -// assertSchemasEqualIgnoringNullability(Person.SCHEMA$, result.schema()); -// List rows = result.collectAsList(); -// assertEquals(1, rows.size()); -// Row row = rows.get(0); -// assertEquals(person.getId(), row.getAs("id")); -// assertEquals(person.getName(), row.getAs("name")); -// assertEquals(person.getAge(), row.getAs("age")); -// } - public static void assertSchemasEqualIgnoringNullability(Schema avroSchema, StructType sqlSchema) { assertEquals(SchemaConverters.toSqlType(avroSchema).dataType().asNullable(), sqlSchema.asNullable()); } diff --git a/iis-common/src/test/java/eu/dnetlib/iis/common/spark/avro/AvroDatasetWriterTest.java b/iis-common/src/test/java/eu/dnetlib/iis/common/spark/avro/AvroDatasetWriterTest.java deleted file mode 100644 index beab31aa2..000000000 --- a/iis-common/src/test/java/eu/dnetlib/iis/common/spark/avro/AvroDatasetWriterTest.java +++ /dev/null @@ -1,44 +0,0 @@ -package eu.dnetlib.iis.common.spark.avro; - -import eu.dnetlib.iis.common.avro.Person; -import eu.dnetlib.iis.common.spark.TestWithSharedSparkSession; -import eu.dnetlib.iis.common.utils.AvroTestUtils; -import org.apache.spark.sql.Dataset; -import org.apache.spark.sql.Encoders; -import org.junit.jupiter.api.BeforeEach; -import org.junit.jupiter.api.DisplayName; -import org.junit.jupiter.api.Test; -import org.junit.jupiter.api.io.TempDir; - -import java.io.IOException; -import java.nio.file.Path; -import java.util.Collections; -import java.util.List; - -import static org.junit.jupiter.api.Assertions.assertEquals; - -class AvroDatasetWriterTest extends TestWithSharedSparkSession { - - @BeforeEach - public void beforeEach() { - super.beforeEach(); - } - -// @Test -// @DisplayName("Avro dataset writer writes dataset of avro type") -// public void givenDatasetOfAvroType_whenWrittenToOutput_thenWriteSucceeds(@TempDir Path workingDir) throws IOException { -// Path outputDir = workingDir.resolve("output"); -// Person person = Person.newBuilder().setId(1).setName("name").setAge(2).build(); -// Dataset ds = spark().createDataset( -// Collections.singletonList(person), -// Encoders.kryo(Person.class) -// ); -// -// new AvroDatasetWriter<>(ds).write(outputDir.toString(), Person.SCHEMA$); -// -// List personList = AvroTestUtils.readLocalAvroDataStore(outputDir.toString()); -// assertEquals(1, personList.size()); -// Person personRead = personList.get(0); -// assertEquals(person, personRead); -// } -} \ No newline at end of file diff --git a/iis-wf/iis-wf-export-actionmanager/src/test/java/eu/dnetlib/iis/wf/export/actionmanager/relation/citation/CitationRelationExporterJobTest.java b/iis-wf/iis-wf-export-actionmanager/src/test/java/eu/dnetlib/iis/wf/export/actionmanager/relation/citation/CitationRelationExporterJobTest.java index 816914a9b..b4310ade2 100644 --- a/iis-wf/iis-wf-export-actionmanager/src/test/java/eu/dnetlib/iis/wf/export/actionmanager/relation/citation/CitationRelationExporterJobTest.java +++ b/iis-wf/iis-wf-export-actionmanager/src/test/java/eu/dnetlib/iis/wf/export/actionmanager/relation/citation/CitationRelationExporterJobTest.java @@ -46,7 +46,7 @@ public void givenInputCitationsPath_whenRun_thenSerializedAtomicActionsAndReport ); Path inputCitationsPath = rootInputPath.resolve("citations"); - new AvroDataFrameWriter(CitationRelationExporterTestUtils.createDataFrame(spark(), citationsList)).write(inputCitationsPath.toString()); + new AvroDataFrameWriter(CitationRelationExporterTestUtils.createDataFrame(spark(), citationsList)).write(inputCitationsPath.toString(),Citations.SCHEMA$); float trustLevelThreshold = 0.5f; Path outputRelationPath = rootOutputPath.resolve("output"); diff --git a/iis-wf/iis-wf-import/src/main/java/eu/dnetlib/iis/wf/importer/content/HiveBasedDocumentContentUrlImporterJob.java b/iis-wf/iis-wf-import/src/main/java/eu/dnetlib/iis/wf/importer/content/HiveBasedDocumentContentUrlImporterJob.java index 687158df1..7c8400924 100644 --- a/iis-wf/iis-wf-import/src/main/java/eu/dnetlib/iis/wf/importer/content/HiveBasedDocumentContentUrlImporterJob.java +++ b/iis-wf/iis-wf-import/src/main/java/eu/dnetlib/iis/wf/importer/content/HiveBasedDocumentContentUrlImporterJob.java @@ -59,7 +59,7 @@ public static void main(String[] args) throws Exception { Dataset result = sparkSession.sql("select id, location, mimetype, size, hash from " + params.inputTableName + " where location is not null"); - JavaRDD documentContentUrl = buildOutputRecord(result, sparkSession); + JavaRDD documentContentUrl = buildOutputRecord(result); documentContentUrl.cache(); JavaRDD reports = generateReportEntries(sparkSession, documentContentUrl.count()); @@ -76,7 +76,7 @@ private static JavaRDD generateReportEntries(SparkSession sparkSess Encoders.kryo(ReportEntry.class)).javaRDD(); } - private static JavaRDD buildOutputRecord(Dataset source, SparkSession spark) { + private static JavaRDD buildOutputRecord(Dataset source) { Dataset resultDs = source.select( concat(lit(InfoSpaceConstants.ROW_PREFIX_RESULT), col("id")).as("id"), col("location").as("url"), @@ -84,7 +84,7 @@ private static JavaRDD buildOutputRecord(Dataset source col("size").cast("long").divide(1024).as("contentSizeKB"), col("hash").as("contentChecksum") ); - return new AvroDataFrameSupport(spark).toDS(resultDs, DocumentContentUrl.class).toJavaRDD(); + return AvroDataFrameSupport.toDS(resultDs, DocumentContentUrl.class).toJavaRDD(); } @Parameters(separators = "=") diff --git a/iis-wf/iis-wf-import/src/main/java/eu/dnetlib/iis/wf/importer/infospace/ImportInformationSpaceJobUtils.java b/iis-wf/iis-wf-import/src/main/java/eu/dnetlib/iis/wf/importer/infospace/ImportInformationSpaceJobUtils.java index bae8e4238..0af611eb1 100644 --- a/iis-wf/iis-wf-import/src/main/java/eu/dnetlib/iis/wf/importer/infospace/ImportInformationSpaceJobUtils.java +++ b/iis-wf/iis-wf-import/src/main/java/eu/dnetlib/iis/wf/importer/infospace/ImportInformationSpaceJobUtils.java @@ -63,6 +63,6 @@ public static JavaRDD produceGraphIdToObjectStoreIdMapping(Ja col("oid").as("originalId") ); - return new AvroDataFrameSupport(spark).toDS(identifierMappingDF, IdentifierMapping.class).toJavaRDD(); + return AvroDataFrameSupport.toDS(identifierMappingDF, IdentifierMapping.class).toJavaRDD(); } } diff --git a/iis-wf/iis-wf-referenceextraction/src/main/java/eu/dnetlib/iis/wf/referenceextraction/patent/PatentMetadataRetrieverJob.java b/iis-wf/iis-wf-referenceextraction/src/main/java/eu/dnetlib/iis/wf/referenceextraction/patent/PatentMetadataRetrieverJob.java index c8945089f..d23e49ef8 100644 --- a/iis-wf/iis-wf-referenceextraction/src/main/java/eu/dnetlib/iis/wf/referenceextraction/patent/PatentMetadataRetrieverJob.java +++ b/iis-wf/iis-wf-referenceextraction/src/main/java/eu/dnetlib/iis/wf/referenceextraction/patent/PatentMetadataRetrieverJob.java @@ -94,7 +94,7 @@ public static void main(String[] args) throws Exception { JavaPairRDD> cacheById = cachedSources .mapToPair(x -> new Tuple2<>(x.getId(), Optional.of(x))) - .union(cachedFaults.mapToPair(x -> new Tuple2<>(x.getInputObjectId(), Optional.empty()))); + .union(cachedFaults.mapToPair(x -> new Tuple2>(x.getInputObjectId(), Optional.empty()))); JavaPairRDD inputById = importedPatents .mapToPair(x -> new Tuple2<>(getId(x), x)); JavaPairRDD>>> inputJoinedWithCache = diff --git a/iis-wf/iis-wf-referenceextraction/src/main/java/eu/dnetlib/iis/wf/referenceextraction/softwareurl/CachedWebCrawlerJob.java b/iis-wf/iis-wf-referenceextraction/src/main/java/eu/dnetlib/iis/wf/referenceextraction/softwareurl/CachedWebCrawlerJob.java index f90d1cf76..1cff2db27 100644 --- a/iis-wf/iis-wf-referenceextraction/src/main/java/eu/dnetlib/iis/wf/referenceextraction/softwareurl/CachedWebCrawlerJob.java +++ b/iis-wf/iis-wf-referenceextraction/src/main/java/eu/dnetlib/iis/wf/referenceextraction/softwareurl/CachedWebCrawlerJob.java @@ -99,7 +99,7 @@ public static void main(String[] args) throws Exception { JavaPairRDD> cacheByUrl = cachedSources .mapToPair(x -> new Tuple2<>(x.getId(), Optional.of(x))) - .union(cachedFaults.mapToPair(x -> new Tuple2<>(x.getInputObjectId(), Optional.empty()))); + .union(cachedFaults.mapToPair(x -> new Tuple2>(x.getInputObjectId(), Optional.empty()))); JavaPairRDD inputByUrl = documentToSoftwareUrl .mapToPair(x -> new Tuple2<>(x.getSoftwareUrl(), x)); JavaPairRDD>>> inputJoinedWithCache =