Skip to content

Commit

Permalink
transformer-kafka: add semi-automatic test scenarios using cloud reso…
Browse files Browse the repository at this point in the history
…urces (close #1302)
  • Loading branch information
pondzix committed Jul 31, 2023
1 parent 741f7f1 commit 66ea6cd
Show file tree
Hide file tree
Showing 18 changed files with 1,238 additions and 58 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@ import software.amazon.awssdk.services.s3.S3AsyncClient
import com.snowplowanalytics.snowplow.rdbloader.common.cloud.BlobStorage
import com.snowplowanalytics.snowplow.rdbloader.common.cloud.BlobStorage.Key

import java.nio.charset.StandardCharsets.UTF_8
import java.nio.charset.StandardCharsets

object S3 {

Expand All @@ -43,22 +43,24 @@ object S3 {
BlobStorage.BlobObject(key, url.path.representation.size.getOrElse(0L))
}

def get(path: Key): F[Either[Throwable, String]] = {
override def getBytes(path: Key): Stream[F, Byte] = {
val (bucketName, keyPath) = BlobStorage.splitKey(path)
Authority
.parse(bucketName)
.fold(
errors => Async[F].delay(new MultipleUrlValidationException(errors).asLeft[String]),
errors => Stream.raiseError[F](new MultipleUrlValidationException(errors)),
authority =>
client
.get(Url("s3", authority, Path(keyPath)), 1024)
.compile
.to(Array)
.map(array => new String(array, UTF_8))
.attempt
)
}

def get(path: Key): F[Either[Throwable, String]] =
getBytes(path).compile
.to(Array)
.map(array => new String(array, StandardCharsets.UTF_8))
.attempt

def list(folder: BlobStorage.Folder, recursive: Boolean): Stream[F, BlobStorage.BlobObject] = {
val (bucketName, folderPath) = BlobStorage.splitPath(folder)
Authority
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ import com.snowplowanalytics.snowplow.rdbloader.common.cloud.BlobStorage
import com.snowplowanalytics.snowplow.rdbloader.common.cloud.BlobStorage.{Folder, Key}
import fs2.{Pipe, Stream}
import java.net.URI
import java.nio.charset.StandardCharsets

class AzureBlobStorage[F[_]: Async] private (store: AzureStore[F], configuredPath: AzureBlobStorage.PathParts) extends BlobStorage[F] {

Expand All @@ -42,16 +43,18 @@ class AzureBlobStorage[F[_]: Async] private (store: AzureStore[F], configuredPat
}

override def get(key: Key): F[Either[Throwable, String]] =
getBytes(key).compile
.to(Array)
.map(array => new String(array, StandardCharsets.UTF_8))
.attempt

override def getBytes(key: Key): Stream[F, Byte] =
createStorageUrlFrom(key) match {
case Valid(url) =>
store
.get(url, 1024)
.compile
.to(Array)
.map(array => new String(array))
.attempt
case Invalid(errors) =>
Async[F].delay(new MultipleUrlValidationException(errors).asLeft[String])
Stream.raiseError[F](new MultipleUrlValidationException(errors))
}

override def keyExists(key: Key): F[Boolean] =
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,8 +7,11 @@
*/
package com.snowplowanalytics.snowplow.rdbloader.transformer.stream.common

import cats.effect.IO
import com.github.mjakubowski84.parquet4s._
import com.github.mjakubowski84.parquet4s.parquet.fromParquet
import io.circe.Json
import com.github.mjakubowski84.parquet4s.{Path => ParquetPath, RowParquetRecord}
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.{Path => HadoopPath}
import org.apache.parquet.column.ColumnDescriptor
Expand All @@ -28,37 +31,56 @@ import java.time.temporal.ChronoUnit
import java.time.{Instant, ZoneOffset}
import java.util.TimeZone
import scala.jdk.CollectionConverters._

import fs2.io.file.Path

import scala.annotation.nowarn

object ParquetUtils {

val config = ValueCodecConfiguration(TimeZone.getTimeZone(ZoneOffset.UTC))

def readParquetColumns(path: Path): Map[File, List[ColumnDescriptor]] = {
val conf = new Configuration();
val parquetFileFilter = new FileFilter {
override def accept(pathname: File): Boolean = pathname.toString.endsWith(".parquet")
}

new File(path.toString)
.listFiles(parquetFileFilter)
.map { parquetFile =>
@annotation.nowarn("cat=deprecation")
val parquetMetadata = ParquetFileReader.readFooter(conf, new HadoopPath(parquetFile.toString), ParquetMetadataConverter.NO_FILTER)
val columns = parquetMetadata.getFileMetaData.getSchema.getColumns.asScala.toList
(parquetFile, columns)
(parquetFile, readFileColumns(parquetFile))
}
.toMap
}

@nowarn("cat=deprecation")
def readFileColumns(parquetFile: File): List[ColumnDescriptor] =
ParquetFileReader
.readFooter(new Configuration(), new HadoopPath(parquetFile.toString), ParquetMetadataConverter.NO_FILTER)
.getFileMetaData
.getSchema
.getColumns
.asScala
.toList

def extractColumnsFromSchemaString(schema: String) =
MessageTypeParser
.parseMessageType(schema)
.getColumns
.asScala
.toList

def readParquetRowsAsJsonFrom(path: Path, columns: List[ColumnDescriptor]): IO[List[Json]] =
fromParquet[IO]
.as[RowParquetRecord]
.read(ParquetPath(path.toNioPath.toUri.toString))
.map { record =>
convertParquetRecordToJson(record, List.empty, columns)
}
.compile
.toList
.map(_.sortBy(_.asObject.flatMap(_("event_id")).flatMap(_.asString)))
.map(_.map(_.deepDropNullValues))

def convertParquetRecordToJson(
record: RowParquetRecord,
parentPath: List[String],
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -135,6 +135,9 @@ object TestApplication {
override def get(path: Key): F[Either[Throwable, String]] =
Concurrent[F].raiseError(new Exception("readKey isn't implemented for blob storage file type"))

override def getBytes(path: Key): Stream[F, Byte] =
Stream.empty

override def keyExists(key: Key): F[Boolean] =
Concurrent[F].raiseError(new Exception(s"keyExists isn't implemented for blob storage file type"))
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -9,18 +9,13 @@ package com.snowplowanalytics.snowplow.rdbloader.transformer.stream.common.proce

import cats.effect.IO
import cats.effect.unsafe.implicits.global

import fs2.io.file.Path

import com.github.mjakubowski84.parquet4s.{Path => ParquetPath, RowParquetRecord}
import com.github.mjakubowski84.parquet4s.parquet.fromParquet
import com.snowplowanalytics.snowplow.analytics.scalasdk.Event
import com.snowplowanalytics.snowplow.analytics.scalasdk.SnowplowEvent.Contexts
import com.snowplowanalytics.snowplow.rdbloader.transformer.stream.common.AppId
import com.snowplowanalytics.snowplow.rdbloader.transformer.stream.common.ParquetUtils
import com.snowplowanalytics.snowplow.rdbloader.transformer.stream.common.ParquetUtils.readParquetColumns
import com.snowplowanalytics.snowplow.rdbloader.transformer.stream.common.{AppId, ParquetUtils}
import com.snowplowanalytics.snowplow.rdbloader.transformer.stream.common.ParquetUtils.{readParquetColumns, readParquetRowsAsJsonFrom}
import com.snowplowanalytics.snowplow.rdbloader.transformer.stream.common.processing.BaseProcessingSpec.TransformerConfig
import com.snowplowanalytics.snowplow.rdbloader.transformer.stream.common.processing.WiderowParquetProcessingSpec.{appConfig, igluConfig}
import fs2.io.file.Path
import io.circe.syntax.EncoderOps
import io.circe.{Json, JsonObject}
import org.apache.parquet.column.ColumnDescriptor
Expand Down Expand Up @@ -54,7 +49,7 @@ class WiderowParquetProcessingSpec extends BaseProcessingSpec {
expectedParquetColumns <- readParquetColumnsFromResource(
"/processing-spec/4/output/good/parquet/schema"
) // the same schema as in resource file used in WideRowParquetSpec for batch transformer
actualParquetRows <- readParquetRowsFrom(goodPath, expectedParquetColumns)
actualParquetRows <- readParquetRowsAsJsonFrom(goodPath, expectedParquetColumns)
actualParquetColumns = readParquetColumns(goodPath)
actualBadRows <- readStringRowsFrom(badPath)

Expand Down Expand Up @@ -90,7 +85,7 @@ class WiderowParquetProcessingSpec extends BaseProcessingSpec {
for {
output <- process(inputStream, config)
expectedParquetColumns <- readParquetColumnsFromResource("/processing-spec/5/output/good/parquet/schema")
actualParquetRows <- readParquetRowsFrom(goodPath, expectedParquetColumns)
actualParquetRows <- readParquetRowsAsJsonFrom(goodPath, expectedParquetColumns)
actualParquetColumns = readParquetColumns(goodPath)
expectedCompletionMessage <- readMessageFromResource("/processing-spec/5/output/good/parquet/completion.json", outputDirectory)
expectedParquetRows <- readGoodParquetEventsFromResource(
Expand Down Expand Up @@ -123,7 +118,7 @@ class WiderowParquetProcessingSpec extends BaseProcessingSpec {
for {
output <- process(inputStream, config)
expectedParquetColumns <- readParquetColumnsFromResource("/processing-spec/6/output/good/parquet/schema")
actualParquetRows <- readParquetRowsFrom(goodPath, expectedParquetColumns)
actualParquetRows <- readParquetRowsAsJsonFrom(goodPath, expectedParquetColumns)
actualParquetColumns = readParquetColumns(goodPath)
expectedCompletionMessage <- readMessageFromResource("/processing-spec/6/output/good/parquet/completion.json", outputDirectory)
expectedParquetRows <-
Expand Down Expand Up @@ -159,7 +154,7 @@ class WiderowParquetProcessingSpec extends BaseProcessingSpec {
expectedParquetColumns <- readParquetColumnsFromResource(
"/processing-spec/7/output/good/parquet/schema"
) // the same schema as in resource file used in WideRowParquetSpec for batch transformer
actualParquetRows <- readParquetRowsFrom(goodPath, expectedParquetColumns)
actualParquetRows <- readParquetRowsAsJsonFrom(goodPath, expectedParquetColumns)
actualParquetColumns = readParquetColumns(goodPath)
expectedCompletionMessage <- readMessageFromResource("/processing-spec/7/output/good/parquet/completion.json", outputDirectory)
} yield {
Expand Down Expand Up @@ -198,18 +193,6 @@ class WiderowParquetProcessingSpec extends BaseProcessingSpec {
.map(transformEventForParquetTest(columnToAdjust.getOrElse("none")))
}

private def readParquetRowsFrom(path: Path, columns: List[ColumnDescriptor]) =
fromParquet[IO]
.as[RowParquetRecord]
.read(ParquetPath(path.toNioPath.toUri.toString))
.map { record =>
ParquetUtils.convertParquetRecordToJson(record, List.empty, columns)
}
.compile
.toList
.map(_.sortBy(_.asObject.flatMap(_("event_id")).flatMap(_.asString)))
.map(_.map(_.deepDropNullValues))

private def readParquetColumnsFromResource(path: String): IO[List[ColumnDescriptor]] =
readLinesFromResource(path)
.map(_.mkString)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,8 @@ trait BlobStorage[F[_]] {

def get(path: BlobStorage.Key): F[Either[Throwable, String]]

def getBytes(path: BlobStorage.Key): Stream[F, Byte]

/** Check if blob storage key exist */
def keyExists(key: BlobStorage.Key): F[Boolean]
}
Expand Down Expand Up @@ -210,6 +212,9 @@ object BlobStorage {
override def get(path: Key): F[Either[Throwable, String]] =
MonadThrow[F].raiseError(new IllegalArgumentException("noop blobstorage interpreter"))

override def getBytes(path: Key): Stream[F, Byte] =
Stream.empty

override def keyExists(key: Key): F[Boolean] =
MonadThrow[F].raiseError(new IllegalArgumentException("noop blobstorage interpreter"))
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,8 @@ import fs2.{Pipe, Stream}
import com.snowplowanalytics.snowplow.rdbloader.common.cloud.BlobStorage
import com.snowplowanalytics.snowplow.rdbloader.common.cloud.BlobStorage.{Folder, Key}

import java.nio.charset.StandardCharsets

object GCS {

def blobStorage[F[_]: Async]: Resource[F, BlobStorage[F]] =
Expand Down Expand Up @@ -58,22 +60,24 @@ object GCS {
)
}

override def get(key: Key): F[Either[Throwable, String]] = {
override def getBytes(key: Key): Stream[F, Byte] = {
val (bucket, path) = BlobStorage.splitKey(key)
Authority
.parse(bucket)
.fold(
errors => Async[F].delay(new MultipleUrlValidationException(errors).asLeft[String]),
errors => Stream.raiseError[F](new MultipleUrlValidationException(errors)),
authority =>
client
.get(Url("gs", authority, Path(path)), 1024)
.compile
.to(Array)
.map(array => new String(array))
.attempt
)
}

override def get(key: Key): F[Either[Throwable, String]] =
getBytes(key).compile
.to(Array)
.map(array => new String(array, StandardCharsets.UTF_8))
.attempt

override def keyExists(key: Key): F[Boolean] = {
val (bucket, path) = BlobStorage.splitKey(key)
Authority
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,9 @@ object PureAWS {
def get(path: Key): Pure[Either[Throwable, String]] =
Pure.pure(Left(new NotImplementedError("Not used in tests")))

def getBytes(path: Key): Stream[Pure, Byte] =
Stream.empty

def keyExists(key: Key): Pure[Boolean] =
Pure.pure(results.keyExists(key))
}
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,33 @@
/*
* Copyright (c) 2012-present Snowplow Analytics Ltd. All rights reserved.
*
* This program is licensed to you under the Snowplow Community License Version 1.0,
* and you may not use this file except in compliance with the Snowplow Community License Version 1.0.
* You may obtain a copy of the Snowplow Community License Version 1.0 at https://docs.snowplow.io/community-license-1.0
*/
package com.snowplowanalytics.snowplow.rdbloader.transformer.stream.kafka.experimental

import com.snowplowanalytics.snowplow.rdbloader.common.LoaderMessage.TypesInfo.WideRow
import com.snowplowanalytics.snowplow.rdbloader.common.config.TransformerConfig.Compression

final case class AppConfiguration(
compression: Compression,
fileFormat: WideRow.WideRowFormat,
windowFrequencyMinutes: Long
)

object AppConfiguration {

/**
* Regarding `windowFrequencyMinutes = 1` - officially the default 'windowing' setting for
* streaming transformer is '10 minutes'. As we don't want to make the tests take too much time,
* we use 1 minute here. It means that for all test scenarios using this default confguration,
* transformer instance under the test needs to be configured with `1 minute` windowing setting.
*
* Compression and file format defaults match the ones from the official reference file.
*
* See reference here ->
* https://github.com/snowplow/snowplow-rdb-loader/blob/master/modules/common-transformer-stream/src/main/resources/application.conf
*/
val default = AppConfiguration(Compression.Gzip, WideRow.WideRowFormat.JSON, windowFrequencyMinutes = 1)
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,25 @@
/*
* Copyright (c) 2012-present Snowplow Analytics Ltd. All rights reserved.
*
* This program is licensed to you under the Snowplow Community License Version 1.0,
* and you may not use this file except in compliance with the Snowplow Community License Version 1.0.
* You may obtain a copy of the Snowplow Community License Version 1.0 at https://docs.snowplow.io/community-license-1.0
*/
package com.snowplowanalytics.snowplow.rdbloader.transformer.stream.kafka.experimental

import cats.effect.IO
import cats.effect.kernel.Resource
import com.snowplowanalytics.snowplow.rdbloader.common.cloud.{BlobStorage, Queue}

final case class AppDependencies(
blobClient: BlobStorage[IO],
queueConsumer: Queue.Consumer[IO],
producer: Queue.Producer[IO]
)

object AppDependencies {

trait Provider {
def createDependencies(): Resource[IO, AppDependencies]
}
}
Loading

0 comments on commit 66ea6cd

Please sign in to comment.