Transformer-kinesis: add missing hadoop-aws dependency for S3 parquet files upload (close #920)
pondzix committed Jun 13, 2022
1 parent ff75ca0 commit 3b301ad
Showing 2 changed files with 10 additions and 5 deletions.
13 changes: 8 additions & 5 deletions ParquetSink.scala
@@ -18,8 +18,8 @@ import cats.Functor
import cats.data.EitherT
import cats.effect.{Blocker, Concurrent, ContextShift, Timer}
import cats.implicits._
-import com.github.mjakubowski84.parquet4s.{ParquetWriter, RowParquetRecord}
import com.github.mjakubowski84.parquet4s.parquet.viaParquet
+import com.github.mjakubowski84.parquet4s.{ParquetWriter, RowParquetRecord}
import com.snowplowanalytics.snowplow.badrows.FailureDetails
import com.snowplowanalytics.snowplow.rdbloader.common.LoaderMessage.TypesInfo.WideRow
import com.snowplowanalytics.snowplow.rdbloader.common.config.TransformerConfig.Compression
@@ -35,7 +35,6 @@ import fs2.{Pipe, Stream}
import org.apache.parquet.hadoop.metadata.CompressionCodecName
import org.apache.parquet.schema.MessageType

-import java.nio.file.Path
import java.net.URI

object ParquetSink {
@@ -47,7 +46,11 @@ object ParquetSink {
(path: SinkPath): Pipe[F, Transformed.Data, Unit] = {
transformedData =>

-val targetPath = Path.of(uri.toString, window.getDir, path.value)
+/**
+ * As uri can use the 's3a' scheme, using methods from 'java.nio.file.Path' would require an additional dependency responsible for adding the appropriate 'java.nio.file.spi.FileSystemProvider', see e.g. https://github.com/carlspring/s3fs-nio/.
+ * Simple string concatenation works in both cases, a uri configured with and without a trailing '/', bypassing 'java.nio.file.Path'.
+ */
+val targetPath = s"${uri.toString}/${window.getDir}/${path.value}"
val schemaCreation = createSchemaFromTypes(resources, window).value

Stream.eval(schemaCreation).flatMap {
@@ -85,7 +88,7 @@

private def writeAsParquet[F[_] : Concurrent : ContextShift : Timer](blocker: Blocker,
compression: Compression,
-path: Path,
+path: String,
schema: MessageType) = {
implicit val targetSchema = schema

@@ -97,7 +100,7 @@
viaParquet[F, List[FieldWithValue]]
.preWriteTransformation(buildParquetRecord)
.options(ParquetWriter.Options(compressionCodecName = compressionCodecName))
-.write(blocker, path.toString)
+.write(blocker, path)
}

private def buildParquetRecord(fieldsWithValues: List[FieldWithValue]) = Stream.emit {
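A minimal sketch (not part of the commit) of the trade-off the new comment describes, run on a Unix filesystem with placeholder bucket and run names: 'java.nio.file.Path' collapses the '//' of the 's3a' scheme, while plain concatenation keeps the URI intact for both trailing-slash variants.

import java.nio.file.Path

object TargetPathSketch extends App {
  val uri  = "s3a://bucket/out"        // placeholder output location
  val dir  = "run=2022-06-13-12-00-00" // placeholder window directory
  val file = "part-0.parquet"          // placeholder sink path

  // java.nio normalises consecutive slashes, silently corrupting the scheme:
  println(Path.of(uri, dir, file))     // s3a:/bucket/out/run=.../part-0.parquet

  // Plain concatenation preserves the scheme; a uri configured with a
  // trailing '/' merely yields a doubled slash, which Hadoop's own path
  // handling tolerates:
  println(s"$uri/$dir/$file")          // s3a://bucket/out/run=.../part-0.parquet
  println(s"$uri//$dir/$file")         // doubled slash, still a valid s3a path
}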
2 changes: 2 additions & 0 deletions project/Dependencies.scala
@@ -120,6 +120,7 @@ object Dependencies {
val jacksonCbor = "com.fasterxml.jackson.dataformat" % "jackson-dataformat-cbor" % V.jacksonModule
val parquet4s = "com.github.mjakubowski84" %% "parquet4s-fs2" % V.parquet4s
val hadoop = "org.apache.hadoop" % "hadoop-client" % V.hadoopClient
+val hadoopAws = "org.apache.hadoop" % "hadoop-aws" % V.hadoopClient % Runtime
val parquetFormat = "org.apache.parquet" % "parquet-format-structures" % V.parquetFormat

// Java (Loader)
@@ -253,6 +254,7 @@
circeOptics,
parquet4s,
hadoop,
+hadoopAws,
parquetFormat,
scalaTracker,
scalaTrackerEmit,
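A minimal sketch (not part of the commit) of what the new runtime dependency enables, assuming default Hadoop configuration; 'S3aSchemeCheck' is a hypothetical name. The '% Runtime' scope keeps hadoop-aws off the compile classpath, since Hadoop discovers the filesystem implementation reflectively at runtime.

import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.FileSystem

object S3aSchemeCheck extends App {
  // Resolves the implementation class registered for the 's3a' scheme without
  // touching S3. With hadoop-aws on the runtime classpath this prints
  // org.apache.hadoop.fs.s3a.S3AFileSystem; without it, Hadoop throws
  // UnsupportedFileSystemException: No FileSystem for scheme "s3a".
  val impl = FileSystem.getFileSystemClass("s3a", new Configuration())
  println(impl.getName)
}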
