From 09a08da45d03e26ec6b181090895970d6eeaf7d7 Mon Sep 17 00:00:00 2001
From: Piotr Poniedziałek
Date: Fri, 10 Jun 2022 17:02:11 +0200
Subject: [PATCH] Transformer-kinesis: add missing hadoop-aws dependency for
 S3 parquet file uploads (close #920)

---
 .../transformer/kinesis/parquet/ParquetSink.scala | 11 ++++++-----
 project/Dependencies.scala                        |  2 ++
 2 files changed, 8 insertions(+), 5 deletions(-)

diff --git a/modules/transformer-kinesis/src/main/scala/com/snowplowanalytics/snowplow/rdbloader/transformer/kinesis/parquet/ParquetSink.scala b/modules/transformer-kinesis/src/main/scala/com/snowplowanalytics/snowplow/rdbloader/transformer/kinesis/parquet/ParquetSink.scala
index 665b133dc..d20f23b9c 100644
--- a/modules/transformer-kinesis/src/main/scala/com/snowplowanalytics/snowplow/rdbloader/transformer/kinesis/parquet/ParquetSink.scala
+++ b/modules/transformer-kinesis/src/main/scala/com/snowplowanalytics/snowplow/rdbloader/transformer/kinesis/parquet/ParquetSink.scala
@@ -18,8 +18,8 @@ import cats.Functor
 import cats.data.EitherT
 import cats.effect.{Blocker, Concurrent, ContextShift, Timer}
 import cats.implicits._
-import com.github.mjakubowski84.parquet4s.{ParquetWriter, RowParquetRecord}
 import com.github.mjakubowski84.parquet4s.parquet.viaParquet
+import com.github.mjakubowski84.parquet4s.{ParquetWriter, RowParquetRecord}
 import com.snowplowanalytics.snowplow.badrows.FailureDetails
 import com.snowplowanalytics.snowplow.rdbloader.common.LoaderMessage.TypesInfo.WideRow
 import com.snowplowanalytics.snowplow.rdbloader.common.config.TransformerConfig.Compression
@@ -35,7 +35,6 @@ import fs2.{Pipe, Stream}
 import org.apache.parquet.hadoop.metadata.CompressionCodecName
 import org.apache.parquet.schema.MessageType
 
-import java.nio.file.Path
 import java.net.URI
 
 object ParquetSink {
@@ -47,7 +46,9 @@
     (path: SinkPath): Pipe[F, Transformed.Data, Unit] = {
       transformedData =>
 
-        val targetPath = Path.of(uri.toString, window.getDir, path.value)
+        // As the uri can use the 's3a' scheme, building this path with 'java.nio.file.Path' would require an additional dependency providing a matching 'java.nio.file.spi.FileSystemProvider', see e.g. https://github.com/carlspring/s3fs-nio/
+        // Plain string concatenation avoids 'java.nio.file.Path' and works whether the uri is configured with or without a trailing '/'
+        val targetPath = s"${uri.toString}/${window.getDir}/${path.value}"
         val schemaCreation = createSchemaFromTypes(resources, window).value
 
         Stream.eval(schemaCreation).flatMap {
@@ -85,7 +86,7 @@
 
   private def writeAsParquet[F[_] : Concurrent : ContextShift : Timer](blocker: Blocker,
                                                                        compression: Compression,
-                                                                       path: Path,
+                                                                       path: String,
                                                                        schema: MessageType) = {
     implicit val targetSchema = schema
 
@@ -97,7 +98,7 @@
     viaParquet[F, List[FieldWithValue]]
       .preWriteTransformation(buildParquetRecord)
      .options(ParquetWriter.Options(compressionCodecName = compressionCodecName))
-      .write(blocker, path.toString)
+      .write(blocker, path)
   }
 
   private def buildParquetRecord(fieldsWithValues: List[FieldWithValue]) = Stream.emit {
diff --git a/project/Dependencies.scala b/project/Dependencies.scala
index 326bd13de..1f88d1d08 100644
--- a/project/Dependencies.scala
+++ b/project/Dependencies.scala
@@ -120,6 +120,7 @@ object Dependencies {
     val jacksonCbor   = "com.fasterxml.jackson.dataformat" % "jackson-dataformat-cbor" % V.jacksonModule
     val parquet4s     = "com.github.mjakubowski84" %% "parquet4s-fs2" % V.parquet4s
     val hadoop        = "org.apache.hadoop" % "hadoop-client" % V.hadoopClient
+    val hadoopAws     = "org.apache.hadoop" % "hadoop-aws" % V.hadoopClient % Runtime
     val parquetFormat = "org.apache.parquet" % "parquet-format-structures" % V.parquetFormat
 
     // Java (Loader)
@@ -253,6 +254,7 @@
       circeOptics,
       parquet4s,
       hadoop,
+      hadoopAws,
       parquetFormat,
       scalaTracker,
       scalaTrackerEmit,
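
For illustration, below is a minimal, self-contained sketch of the path construction introduced by this patch. The object name, the helper 'buildTargetPath', and the bucket/run values are hypothetical and not part of the patch; only the string interpolation mirrors the new 'targetPath' line. The claim that both URI styles resolve correctly rests on the patch comment plus the fact that Hadoop's org.apache.hadoop.fs.Path normalizes duplicate separators when parsing a path string.

```scala
import java.net.URI

// Standalone sketch of the concatenation used in ParquetSink:
//   s"${uri.toString}/${window.getDir}/${path.value}"
object TargetPathSketch extends App {

  // Hypothetical helper mirroring the patched 'targetPath' expression.
  def buildTargetPath(output: URI, windowDir: String, sinkPath: String): String =
    s"${output.toString}/$windowDir/$sinkPath"

  // uri configured without a trailing '/': single separators throughout.
  println(buildTargetPath(new URI("s3a://my-bucket/transformed"), "run=2022-06-10-17-00-00", "output=good"))
  // prints: s3a://my-bucket/transformed/run=2022-06-10-17-00-00/output=good

  // uri configured with a trailing '/': a '//' appears in the result, which
  // Hadoop's Path parsing normalizes away, so both configurations
  // point at the same object.
  println(buildTargetPath(new URI("s3a://my-bucket/transformed/"), "run=2022-06-10-17-00-00", "output=good"))
  // prints: s3a://my-bucket/transformed//run=2022-06-10-17-00-00/output=good
}
```

The Dependencies.scala change is what makes the 's3a' scheme usable at all: hadoop-client does not ship org.apache.hadoop.fs.s3a.S3AFileSystem, so without hadoop-aws on the runtime classpath the parquet write typically fails with Hadoop's "No FileSystem for scheme" error. Declaring the dependency with '% Runtime' keeps it off the compile classpath, which fits here since the code never references S3A classes directly.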