diff --git a/build.sbt b/build.sbt index ed55911bb..2809c5ad4 100755 --- a/build.sbt +++ b/build.sbt @@ -26,7 +26,8 @@ lazy val common = project.in(file("common")) Dependencies.circeGeneric, Dependencies.circeGenericExtra, Dependencies.circeLiteral, - Dependencies.schemaDdl + Dependencies.schemaDdl, + Dependencies.specs2 ) ) diff --git a/common/src/main/scala/com/snowplowanalytics/snowplow/rdbloader/common/Flattening.scala b/common/src/main/scala/com/snowplowanalytics/snowplow/rdbloader/common/Flattening.scala index 939cba8d5..888d450f8 100644 --- a/common/src/main/scala/com/snowplowanalytics/snowplow/rdbloader/common/Flattening.scala +++ b/common/src/main/scala/com/snowplowanalytics/snowplow/rdbloader/common/Flattening.scala @@ -12,7 +12,9 @@ */ package com.snowplowanalytics.snowplow.rdbloader.common -import io.circe.Json +import io.circe.{ Json, Encoder } +import io.circe.syntax._ +import io.circe.literal._ import cats.Monad import cats.data.EitherT @@ -24,10 +26,10 @@ import com.snowplowanalytics.iglu.core.circe.implicits._ import com.snowplowanalytics.iglu.client.Resolver import com.snowplowanalytics.iglu.client.resolver.registries.RegistryLookup -import com.snowplowanalytics.iglu.client.ClientError.ResolutionError +import com.snowplowanalytics.iglu.client.ClientError import com.snowplowanalytics.iglu.schemaddl.IgluSchema -import com.snowplowanalytics.iglu.schemaddl.migrations.Migration.OrderedSchemas +import com.snowplowanalytics.iglu.schemaddl.migrations.{ SchemaList => DdlSchemaList } import com.snowplowanalytics.iglu.schemaddl.jsonschema.Schema import com.snowplowanalytics.iglu.schemaddl.jsonschema.circe.implicits._ @@ -37,27 +39,39 @@ object Flattening { * `SchemaList` is unavailable (in case no Iglu Server hosts this schemas) * Particular schema could not be fetched, thus whole flattening algorithm cannot be built */ - sealed trait FlatteningError + sealed trait FlatteningError extends Product with Serializable + object FlatteningError { - case class SchemaListResolution(error: ResolutionError) extends FlatteningError - case class SchemaResolution(error: ResolutionError) extends FlatteningError - case class Parsing(error: String) extends FlatteningError - } + final case class SchemaListResolution(resolutionError: ClientError.ResolutionError, criterion: SchemaCriterion) extends FlatteningError + final case class SchemaResolution(resolutionError: ClientError.ResolutionError, schema: SchemaKey) extends FlatteningError + final case class Parsing(parsingError: String) extends FlatteningError - // Cache = Map[SchemaKey, OrderedSchemas] + implicit val flatteningErrorCirceEncoder: Encoder[FlatteningError] = + Encoder.instance { + case SchemaListResolution(error, criterion) => + json"""{"resolutionError": ${error: ClientError}, "criterion": ${criterion.asString}, "type": "SchemaListResolution"}""" + case SchemaResolution(error, schema) => + json"""{"resolutionError": ${error: ClientError}, "schema": ${schema.toSchemaUri}, "type": "SchemaResolution"}""" + case Parsing(error) => + json"""{"parsingError": $error, "type": "Parsing"}""" + } + } - def getOrdered[F[_]: Monad: RegistryLookup: Clock](resolver: Resolver[F], key: SchemaKey): EitherT[F, FlatteningError, OrderedSchemas] = + def getOrdered[F[_]: Monad: RegistryLookup: Clock](resolver: Resolver[F], key: SchemaKey): EitherT[F, FlatteningError, DdlSchemaList] = getOrdered(resolver, key.vendor, key.name, key.version.model) - def getOrdered[F[_]: Monad: RegistryLookup: Clock](resolver: Resolver[F], vendor: String, name: String, model: Int): 
EitherT[F, FlatteningError, OrderedSchemas] = + def getOrdered[F[_]: Monad: RegistryLookup: Clock](resolver: Resolver[F], vendor: String, name: String, model: Int): EitherT[F, FlatteningError, DdlSchemaList] = { + val criterion = SchemaCriterion(vendor, name, "jsonschema", Some(model), None, None) + val schemaList = resolver.listSchemas(vendor, name, Some(model)) for { - schemaList <- EitherT[F, ResolutionError, SchemaList](resolver.listSchemas(vendor, name, Some(model))).leftMap(FlatteningError.SchemaListResolution) - ordered <- OrderedSchemas.fromSchemaList(schemaList, fetch(resolver)) + schemaList <- EitherT[F, ClientError.ResolutionError, SchemaList](schemaList).leftMap(error => FlatteningError.SchemaListResolution(error, criterion)) + ordered <- DdlSchemaList.fromSchemaList(schemaList, fetch(resolver)) } yield ordered + } def fetch[F[_]: Monad: RegistryLookup: Clock](resolver: Resolver[F])(key: SchemaKey): EitherT[F, FlatteningError, IgluSchema] = for { - json <- EitherT(resolver.lookupSchema(key, 2)).leftMap(FlatteningError.SchemaResolution) + json <- EitherT(resolver.lookupSchema(key)).leftMap(error => FlatteningError.SchemaResolution(error, key)) schema <- EitherT.fromEither(parseSchema(json)) } yield schema diff --git a/src/main/scala/com/snowplowanalytics/snowplow/rdbloader/config/StorageTarget.scala b/common/src/main/scala/com/snowplowanalytics/snowplow/rdbloader/common/StorageTarget.scala similarity index 76% rename from src/main/scala/com/snowplowanalytics/snowplow/rdbloader/config/StorageTarget.scala rename to common/src/main/scala/com/snowplowanalytics/snowplow/rdbloader/common/StorageTarget.scala index 97dedf7e9..ad19bfc0e 100644 --- a/src/main/scala/com/snowplowanalytics/snowplow/rdbloader/config/StorageTarget.scala +++ b/common/src/main/scala/com/snowplowanalytics/snowplow/rdbloader/common/StorageTarget.scala @@ -10,8 +10,7 @@ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. * See the Apache License Version 2.0 for the specific language governing permissions and limitations there under. 
*/ -package com.snowplowanalytics.snowplow.rdbloader -package config +package com.snowplowanalytics.snowplow.rdbloader.common import java.util.Properties @@ -19,19 +18,14 @@ import cats.Id import cats.data._ import cats.implicits._ -import io.circe._ -import io.circe.parser.parse -import io.circe.Decoder._ -import io.circe.generic.auto._ - -import com.snowplowanalytics.iglu.core.SelfDescribingData -import com.snowplowanalytics.iglu.core.circe.instances._ import com.snowplowanalytics.iglu.client.Client +import com.snowplowanalytics.iglu.core.circe.instances._ +import com.snowplowanalytics.iglu.core.{SelfDescribingData, SchemaCriterion} -// This project -import LoaderError._ -import utils.Common._ - +import io.circe._ +import io.circe.Decoder._ +import io.circe.generic.semiauto._ +import io.circe.parser.parse /** * Common configuration for JDBC target, such as Redshift and Postgres @@ -49,25 +43,29 @@ sealed trait StorageTarget extends Product with Serializable { def processingManifest: Option[StorageTarget.ProcessingManifestConfig] - def eventsTable: String = - loaders.Common.getEventsTable(schema) - def shreddedTable(tableName: String): String = s"$schema.$tableName" def sshTunnel: Option[StorageTarget.TunnelConfig] + + def blacklistTabular: Option[List[SchemaCriterion]] // None means tabular is disabled } object StorageTarget { + case class ParseError(message: String) extends AnyVal + sealed trait SslMode extends StringEnum { def asProperty = asString.toLowerCase.replace('_', '-') } - case object Disable extends SslMode { def asString = "DISABLE" } - case object Require extends SslMode { def asString = "REQUIRE" } - case object VerifyCa extends SslMode { def asString = "VERIFY_CA" } - case object VerifyFull extends SslMode { def asString = "VERIFY_FULL" } implicit val sslModeDecoder: Decoder[SslMode] = - decodeStringEnum[SslMode] + StringEnum.decodeStringEnum[SslMode] // TODO: tests fail if it is in object + + object SslMode { + case object Disable extends SslMode { def asString = "DISABLE" } + case object Require extends SslMode { def asString = "REQUIRE" } + case object VerifyCa extends SslMode { def asString = "VERIFY_CA" } + case object VerifyFull extends SslMode { def asString = "VERIFY_FULL" } + } /** * Configuration to access Snowplow Processing Manifest @@ -77,6 +75,9 @@ object StorageTarget { object ProcessingManifestConfig { case class AmazonDynamoDbConfig(tableName: String) + + implicit val amazonDynamoDbConfigDecoder: Decoder[AmazonDynamoDbConfig] = + deriveDecoder[AmazonDynamoDbConfig] } /** @@ -93,7 +94,8 @@ object StorageTarget { username: String, password: PasswordConfig, sshTunnel: Option[TunnelConfig], - processingManifest: Option[ProcessingManifestConfig]) + processingManifest: Option[ProcessingManifestConfig], + blacklistTabular: Option[List[SchemaCriterion]]) extends StorageTarget /** @@ -113,7 +115,8 @@ object StorageTarget { maxError: Int, compRows: Long, sshTunnel: Option[TunnelConfig], - processingManifest: Option[ProcessingManifestConfig]) + processingManifest: Option[ProcessingManifestConfig], + blacklistTabular: Option[List[SchemaCriterion]]) extends StorageTarget /** @@ -133,7 +136,7 @@ object StorageTarget { tcpKeepAlive: Option[Boolean], tcpKeepAliveMinutes: Option[Int]) { /** Either errors or list of mutators to update the `Properties` object */ - val validation: Either[LoaderError, List[Properties => Unit]] = jdbcEncoder.encodeObject(this).toList.map { + val validation: Either[ParseError, List[Properties => Unit]] = 
jdbcEncoder.encodeObject(this).toList.map { case (property, value) => value.fold( ((_: Properties) => ()).asRight, b => ((props: Properties) => { props.setProperty(property, b.toString); () }).asRight, @@ -150,10 +153,10 @@ object StorageTarget { _ => s"Impossible to apply JDBC property [$property] with JSON object".asLeft ) } traverse(_.toValidatedNel) match { - case Validated.Valid(updaters) => updaters.asRight[LoaderError] + case Validated.Valid(updaters) => updaters.asRight[ParseError] case Validated.Invalid(errors) => val messages = "Invalid JDBC options: " ++ errors.toList.mkString(", ") - val error: LoaderError = LoaderError.ConfigError(messages) + val error: ParseError = ParseError(messages) error.asLeft[List[Properties => Unit]] } } @@ -175,7 +178,6 @@ object StorageTarget { j.filterLevel, j.loginTimeout, j.loglevel, j.socketTimeout, j.ssl, j.sslMode, j.sslRootCert, j.tcpKeepAlive, j.tcpKeepAliveMinutes)) - /** Reference to encrypted entity inside EC2 Parameter Store */ case class ParameterStoreConfig(parameterName: String) @@ -226,11 +228,11 @@ object StorageTarget { * @param validJson JSON that is presumably self-describing storage target configuration * @return validated entity of `StorageTarget` ADT if success */ - def decodeStorageTarget(validJson: SelfDescribingData[Json]): Either[ConfigError, StorageTarget] = + def decodeStorageTarget(validJson: SelfDescribingData[Json]): Either[ParseError, StorageTarget] = (validJson.schema.name, validJson.data) match { - case ("redshift_config", data) => data.as[RedshiftConfig].leftMap(e => ConfigError(e.show)) - case ("postgresql_config", data) => data.as[PostgresqlConfig].leftMap(e => ConfigError(e.show)) - case (name, _) => ConfigError(s"Unsupported storage target [$name]").asLeft + case ("redshift_config", data) => data.as[RedshiftConfig].leftMap(e => ParseError(e.show)) + case ("postgresql_config", data) => data.as[PostgresqlConfig].leftMap(e => ParseError(e.show)) + case (name, _) => ParseError(s"Unsupported storage target [$name]").asLeft } /** @@ -241,13 +243,45 @@ object StorageTarget { * @return valid `StorageTarget` OR * non-empty list of errors (such as validation or parse errors) */ - def parseTarget(client: Client[Id, Json], target: String): Either[ConfigError, StorageTarget] = + def parseTarget(client: Client[Id, Json], target: String): Either[ParseError, StorageTarget] = parse(target) - .leftMap(e => ConfigError(e.show)) - .flatMap(json => SelfDescribingData.parse(json).leftMap(e => ConfigError(s"Not a self-describing JSON, ${e.code}"))) + .leftMap(e => ParseError(e.show)) + .flatMap(json => SelfDescribingData.parse(json).leftMap(e => ParseError(s"Not a self-describing JSON, ${e.code}"))) .flatMap(payload => validate(client)(payload)) .flatMap(decodeStorageTarget) + implicit val postgresqlConfigDecoder: Decoder[PostgresqlConfig] = + deriveDecoder[PostgresqlConfig] + + implicit val redsfhitConfigDecoder: Decoder[RedshiftConfig] = + deriveDecoder[RedshiftConfig] + + implicit val encryptedConfigDecoder: Decoder[EncryptedConfig] = + deriveDecoder[EncryptedConfig] + + implicit val tunnerConfigDecoder: Decoder[TunnelConfig] = + deriveDecoder[TunnelConfig] + + implicit val bastionConfigDecoder: Decoder[BastionConfig] = + deriveDecoder[BastionConfig] + + implicit val destinationConfigDecoder: Decoder[DestinationConfig] = + deriveDecoder[DestinationConfig] + + implicit val parameterStoreConfigDecoder: Decoder[ParameterStoreConfig] = + deriveDecoder[ParameterStoreConfig] + + implicit val passwordConfigDecoder: 
Decoder[PasswordConfig] = + deriveDecoder[PasswordConfig] + + implicit val processingManifestConfigDecoder: Decoder[ProcessingManifestConfig] = + deriveDecoder[ProcessingManifestConfig] + + implicit val schemaCriterionConfigDecoder: Decoder[SchemaCriterion] = + Decoder.decodeString.emap { + s => SchemaCriterion.parse(s).toRight(s"Cannot parse [$s] as Iglu SchemaCriterion, it must have iglu:vendor/name/format/1-*-* format") + } + /** * Validate json4s JValue AST with Iglu Resolver and immediately convert it into circe AST * @@ -255,7 +289,7 @@ object StorageTarget { * @param json json4s AST * @return circe AST */ - private def validate(client: Client[Id, Json])(json: SelfDescribingData[Json]): Either[ConfigError, SelfDescribingData[Json]] = { - client.check(json).value.leftMap(e => ConfigError(e.show)).as(json) + private def validate(client: Client[Id, Json])(json: SelfDescribingData[Json]): Either[ParseError, SelfDescribingData[Json]] = { + client.check(json).value.leftMap(e => ParseError(e.show)).leftMap(error => ParseError(s"${json.schema.toSchemaUri} ${error.message}")).as(json) } } diff --git a/common/src/main/scala/com/snowplowanalytics/snowplow/rdbloader/common/StringEnum.scala b/common/src/main/scala/com/snowplowanalytics/snowplow/rdbloader/common/StringEnum.scala new file mode 100644 index 000000000..5cdd1ceac --- /dev/null +++ b/common/src/main/scala/com/snowplowanalytics/snowplow/rdbloader/common/StringEnum.scala @@ -0,0 +1,89 @@ +package com.snowplowanalytics.snowplow.rdbloader.common + +import scala.reflect.runtime.universe._ +import scala.reflect.runtime.{universe => ru} + +import cats.implicits._ + +import io.circe._ + +/** + * Common trait for all ADTs that can be read from string + * Must be extended by sealed hierarchy including only singletons + * Used by `decodeStringEnum` to get runtime representation of whole ADT + */ +trait StringEnum { + /** + * **IN** string representation. 
+ * It should be used only to help read `StringEnum` from string + * and never other way round, such as render value into SQL statement + */ + def asString: String +} + +object StringEnum { + /** + * Derive decoder for ADT with `StringEnum` + * + * @tparam A sealed hierarchy + * @return circe decoder for ADT `A` + */ + def decodeStringEnum[A <: StringEnum: TypeTag]: Decoder[A] = + Decoder.instance(parseEnum[A]) + + /** + * Parse element of `StringEnum` sealed hierarchy from circe AST + * + * @param hCursor parser's cursor + * @tparam A sealed hierarchy + * @return either successful circe AST or decoding failure + */ + private def parseEnum[A <: StringEnum: TypeTag](hCursor: HCursor): Decoder.Result[A] = { + for { + string <- hCursor.as[String] + method = fromString[A](string) + result <- method.asDecodeResult(hCursor) + } yield result + } + + /** + * Parse element of `StringEnum` sealed hierarchy from String + * + * @param string line containing `asString` representation of `StringEnum` + * @tparam A sealed hierarchy + * @return either successful circe AST or decoding failure + */ + def fromString[A <: StringEnum: TypeTag](string: String): Either[String, A] = { + val map = sealedDescendants[A].map { o => (o.asString, o) }.toMap + map.get(string) match { + case Some(a) => Right(a) + case None => Left(s"Unknown ${typeOf[A].typeSymbol.name.toString} [$string]") + } + } + + /** + * Get all objects extending some sealed hierarchy + * @tparam Root some sealed trait with object descendants + * @return whole set of objects + */ + def sealedDescendants[Root: TypeTag]: Set[Root] = { + val symbol = typeOf[Root].typeSymbol + val internal = symbol.asInstanceOf[scala.reflect.internal.Symbols#Symbol] + val descendants = if (internal.isSealed) + Some(internal.sealedDescendants.map(_.asInstanceOf[Symbol]) - symbol) + else None + descendants.getOrElse(Set.empty).map(x => getCaseObject(x).asInstanceOf[Root]) + } + + private val m = ru.runtimeMirror(getClass.getClassLoader) + + /** + * Reflection method to get runtime object by compiler's `Symbol` + * @param desc compiler runtime `Symbol` + * @return "real" scala case object + */ + private def getCaseObject(desc: Symbol): Any = { + val mod = m.staticModule(desc.asClass.fullName) + m.reflectModule(mod).instance + } +} diff --git a/common/src/main/scala/com/snowplowanalytics/snowplow/rdbloader/common/package.scala b/common/src/main/scala/com/snowplowanalytics/snowplow/rdbloader/common/package.scala new file mode 100644 index 000000000..a534a9853 --- /dev/null +++ b/common/src/main/scala/com/snowplowanalytics/snowplow/rdbloader/common/package.scala @@ -0,0 +1,48 @@ +/* + * Copyright (c) 2012-2019 Snowplow Analytics Ltd. All rights reserved. + * + * This program is licensed to you under the Apache License Version 2.0, + * and you may not use this file except in compliance with the Apache License Version 2.0. + * You may obtain a copy of the Apache License Version 2.0 at http://www.apache.org/licenses/LICENSE-2.0. + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the Apache License Version 2.0 is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the Apache License Version 2.0 for the specific language governing permissions and limitations there under. 
+ */ +package com.snowplowanalytics.snowplow.rdbloader + +import java.util.UUID + +import io.circe._ +import cats.Id +import cats.effect.Clock + +import scala.concurrent.duration.{MILLISECONDS, NANOSECONDS, TimeUnit} +import com.snowplowanalytics.snowplow.scalatracker.UUIDProvider + +package object common { + + implicit val catsClockIdInstance: Clock[Id] = new Clock[Id] { + override def realTime(unit: TimeUnit): Id[Long] = + unit.convert(System.currentTimeMillis(), MILLISECONDS) + + override def monotonic(unit: TimeUnit): Id[Long] = + unit.convert(System.nanoTime(), NANOSECONDS) + } + + implicit val snowplowUuidIdInstance: UUIDProvider[Id] = new UUIDProvider[Id] { + def generateUUID: Id[UUID] = UUID.randomUUID() + } + + /** + * Syntax extension to transform `Either` with string as failure + * into circe-appropriate decoder result + */ + implicit class ParseErrorOps[A](val error: Either[String, A]) extends AnyVal { + def asDecodeResult(hCursor: HCursor): Decoder.Result[A] = error match { + case Right(success) => Right(success) + case Left(message) => Left(DecodingFailure(message, hCursor.history)) + } + } +} diff --git a/src/test/scala/com/snowplowanalytics/snowplow/rdbloader/config/StorageTargetSpec.scala b/common/src/test/scala/com.snowplowanalytics.snowplow.rdbloader.common/StorageTargetSpec.scala similarity index 82% rename from src/test/scala/com/snowplowanalytics/snowplow/rdbloader/config/StorageTargetSpec.scala rename to common/src/test/scala/com.snowplowanalytics.snowplow.rdbloader.common/StorageTargetSpec.scala index 165b1633b..b07233478 100644 --- a/src/test/scala/com/snowplowanalytics/snowplow/rdbloader/config/StorageTargetSpec.scala +++ b/common/src/test/scala/com.snowplowanalytics.snowplow.rdbloader.common/StorageTargetSpec.scala @@ -9,8 +9,7 @@ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. * See the Apache License Version 2.0 for the specific language governing permissions and limitations there under. 
*/ -package com.snowplowanalytics.snowplow.rdbloader -package config +package com.snowplowanalytics.snowplow.rdbloader.common import cats.Id import io.circe.literal._ @@ -18,7 +17,8 @@ import io.circe.literal._ // specs2 import org.specs2.Specification -// Iglu client +// Iglu +import com.snowplowanalytics.iglu.core.SchemaCriterion import com.snowplowanalytics.iglu.client.Client class StorageTargetSpec extends Specification { def is = s2""" @@ -29,8 +29,11 @@ class StorageTargetSpec extends Specification { def is = s2""" Fail to parse old Redshift storage target (3-0-0) with encrypted password $e5 Parse Redshift storage target (3-0-0) with many JDBC options $e6 Fail to parse Redshift storage target (3-0-0) with wrong JDBC value $e7 + Parse Redshift storage target (4-0-0) with tabular blacklist $e8 """ + private val IgluCentral = "http://iglucentral-dev.com.s3-website-us-east-1.amazonaws.com/feature/rdb-blacklist/" + private val resolverConfig = json""" { @@ -44,7 +47,7 @@ class StorageTargetSpec extends Specification { def is = s2""" "vendorPrefixes": [ "com.snowplowanalytics" ], "connection": { "http": { - "uri": "http://iglucentral.com" + "uri": $IgluCentral } } }, @@ -91,11 +94,12 @@ class StorageTargetSpec extends Specification { def is = s2""" "mydatabase.host.acme.com", "ADD HERE", 5432, - StorageTarget.Disable, + StorageTarget.SslMode.Disable, "atomic", "ADD HERE", StorageTarget.PlainText("ADD HERE"), None, + None, None) parseWithDefaultResolver(config) must beRight(expected) @@ -131,7 +135,7 @@ class StorageTargetSpec extends Specification { def is = s2""" "example.host", "ADD HERE", 5439, - SpecHelpers.enableSsl, + StorageTargetSpec.enableSsl, "arn:aws:iam::123456789876:role/RedshiftLoadRole", "atomic", "ADD HERE", @@ -139,6 +143,7 @@ class StorageTargetSpec extends Specification { def is = s2""" 1, 20000, None, + None, None) parseWithDefaultResolver(config) must beRight(expected) @@ -195,7 +200,7 @@ class StorageTargetSpec extends Specification { def is = s2""" "example.com", "ADD HERE", 5439, - SpecHelpers.enableSsl, + StorageTargetSpec.enableSsl, "arn:aws:iam::123456789876:role/RedshiftLoadRole", "atomic", "ADD HERE", @@ -203,6 +208,7 @@ class StorageTargetSpec extends Specification { def is = s2""" 1, 20000, Some(tunnel), + None, None) parseWithDefaultResolver(config) must beRight(expected) @@ -250,6 +256,7 @@ class StorageTargetSpec extends Specification { def is = s2""" 1, 20000, None, + None, None) parseWithDefaultResolver(config) must beRight(expected) @@ -345,6 +352,7 @@ class StorageTargetSpec extends Specification { def is = s2""" 1, 20000, None, + None, None) parseWithDefaultResolver(config) must beRight(expected) @@ -381,8 +389,67 @@ class StorageTargetSpec extends Specification { def is = s2""" """.stripMargin parseWithDefaultResolver(config) must beLeft.like { - case LoaderError.ConfigError(message) => message must contain("$.jdbc.sslMode: does not have a value in the enumeration [verify-ca, verify-full]") + case StorageTarget.ParseError(message) => message must contain("$.jdbc.sslMode: does not have a value in the enumeration [verify-ca, verify-full]") case _ => ko("Not a DecodingError") } } + + def e8 = { + val config = """ + |{ + | "schema": "iglu:com.snowplowanalytics.snowplow.storage/redshift_config/jsonschema/4-0-0", + | "data": { + | "name": "AWS Redshift enriched events storage", + | "id": "33334444-eee7-4845-a7e6-8fdc88d599d0", + | "host": "192.168.1.12", + | "database": "ADD HERE", + | "port": 5439, + | "jdbc": {}, + | "processingManifest": null, + | 
"sshTunnel": null, + | "username": "ADD HERE", + | "password": { + | "ec2ParameterStore": { + | "parameterName": "snowplow.rdbloader.redshift.password" + | } + | }, + | "roleArn": "arn:aws:iam::123456789876:role/RedshiftLoadRole", + | "schema": "atomic", + | "maxError": 1, + | "compRows": 20000, + | "blacklistTabular": [ + | "iglu:com.acme/event/jsonschema/1-*-*", + | "iglu:com.acme/context/jsonschema/2-*-*" + | ], + | "purpose": "ENRICHED_EVENTS" + | } + |} + """.stripMargin + + val expected = StorageTarget.RedshiftConfig( + "33334444-eee7-4845-a7e6-8fdc88d599d0", + "AWS Redshift enriched events storage", + "192.168.1.12", + "ADD HERE", + 5439, + StorageTarget.RedshiftJdbc.empty, + "arn:aws:iam::123456789876:role/RedshiftLoadRole", + "atomic", + "ADD HERE", + StorageTarget.EncryptedKey(StorageTarget.EncryptedConfig(StorageTarget.ParameterStoreConfig("snowplow.rdbloader.redshift.password"))), + 1, + 20000, + None, + None, + Some(List( + SchemaCriterion("com.acme", "event", "jsonschema", Some(1), None, None), + SchemaCriterion("com.acme", "context", "jsonschema", Some(2), None, None) + ))) + + parseWithDefaultResolver(config) must beRight(expected) + } +} + +object StorageTargetSpec { + val enableSsl = StorageTarget.RedshiftJdbc.empty.copy(ssl = Some(true)) } diff --git a/project/Dependencies.scala b/project/Dependencies.scala index 56c5c05fe..b0642b07f 100644 --- a/project/Dependencies.scala +++ b/project/Dependencies.scala @@ -44,7 +44,6 @@ object Dependencies { val scalaCheck = "1.12.6" } - val resolutionRepos = Seq( // Redshift native driver "redshift" at "http://redshift-maven-repository.s3-website-us-east-1.amazonaws.com/release", diff --git a/shredder/src/main/scala/com.snowplowanalytics.snowplow.storage/spark/BadRow.scala b/shredder/src/main/scala/com.snowplowanalytics.snowplow.storage/spark/BadRow.scala index bebc6837d..6677d7939 100644 --- a/shredder/src/main/scala/com.snowplowanalytics.snowplow.storage/spark/BadRow.scala +++ b/shredder/src/main/scala/com.snowplowanalytics.snowplow.storage/spark/BadRow.scala @@ -21,10 +21,12 @@ import io.circe.literal._ import com.snowplowanalytics.iglu.client.ClientError import com.snowplowanalytics.iglu.core.{ SchemaKey, SchemaVer, SelfDescribingData } import com.snowplowanalytics.iglu.core.circe.implicits._ + import com.snowplowanalytics.snowplow.analytics.scalasdk.Event +import com.snowplowanalytics.snowplow.rdbloader.common.Flattening.FlatteningError -sealed trait BadRow { +sealed trait BadRow extends Product with Serializable { def toData: SelfDescribingData[Json] def toCompactJson: String = @@ -37,22 +39,29 @@ object BadRow { val IgluErrorSchema = SchemaKey("com.snowplowanalytics.snowplow.badrows", "loader_iglu_error", "jsonschema", SchemaVer.Full(1, 0, 0)) val RuntimeErrorSchema = SchemaKey("com.snowplowanalytics.snowplow.badrows", "loader_runtime_error", "jsonschema", SchemaVer.Full(1, 0, 0)) - case class ShreddingError(payload: String, errors: NonEmptyList[String]) extends BadRow { + val ShreddingErrorSchema = SchemaKey("com.snowplowanalytics.snowplow.badrows", "shredding_error", "jsonschema", SchemaVer.Full(1, 0, 0)) + + final case class ShreddingError(payload: String, errors: NonEmptyList[String]) extends BadRow { def toData: SelfDescribingData[Json] = SelfDescribingData[Json](ParsingErrorSchema, json"""{"payload": $payload, "errors": $errors}""") } - case class ValidationError(original: Event, errors: NonEmptyList[SchemaError]) extends BadRow { + final case class ValidationError(original: Event, errors: NonEmptyList[SchemaError]) 
extends BadRow { def toData: SelfDescribingData[Json] = SelfDescribingData[Json](IgluErrorSchema, json"""{"event": $original, "errors": $errors}""") } - case class RuntimeError(original: Event, error: String) extends BadRow { + final case class RuntimeError(original: Event, error: String) extends BadRow { def toData: SelfDescribingData[Json] = SelfDescribingData[Json](RuntimeErrorSchema, json"""{"event": $original, "error": $error}""") } - case class SchemaError(schema: SchemaKey, error: ClientError) + final case class EntityShreddingError(original: Event, errors: NonEmptyList[FlatteningError]) extends BadRow { // TODO: won't compile - add schema + def toData: SelfDescribingData[Json] = + SelfDescribingData[Json](ShreddingErrorSchema, json"""{"event": $original, "errors": $errors}""") + } + + final case class SchemaError(schema: SchemaKey, error: ClientError) implicit val schemaErrorCirceJsonEncoder: Encoder[SchemaError] = Encoder.instance { case SchemaError(schema, error) => diff --git a/shredder/src/main/scala/com.snowplowanalytics.snowplow.storage/spark/DynamodbManifest.scala b/shredder/src/main/scala/com.snowplowanalytics.snowplow.storage/spark/DynamodbManifest.scala index 4e2c9600f..a5db1d9ae 100644 --- a/shredder/src/main/scala/com.snowplowanalytics.snowplow.storage/spark/DynamodbManifest.scala +++ b/shredder/src/main/scala/com.snowplowanalytics.snowplow.storage/spark/DynamodbManifest.scala @@ -31,6 +31,7 @@ import com.snowplowanalytics.manifest.core.{Application, ManifestError} import com.snowplowanalytics.manifest.dynamodb.DynamoDbManifest import com.snowplowanalytics.snowplow.rdbloader.generated.ProjectMetadata +import com.snowplowanalytics.snowplow.rdbloader.common._ import scala.concurrent.duration.TimeUnit diff --git a/shredder/src/main/scala/com.snowplowanalytics.snowplow.storage/spark/ShredJob.scala b/shredder/src/main/scala/com.snowplowanalytics.snowplow.storage/spark/ShredJob.scala index c428cea8c..9fa1f5dcb 100644 --- a/shredder/src/main/scala/com.snowplowanalytics.snowplow.storage/spark/ShredJob.scala +++ b/shredder/src/main/scala/com.snowplowanalytics.snowplow.storage/spark/ShredJob.scala @@ -18,11 +18,13 @@ package storage.spark import cats.Id import cats.instances.list._ +import cats.syntax.traverse._ import cats.syntax.show._ import cats.syntax.either._ import cats.syntax.foldable._ -import io.circe.Json +import io.circe.{ Json, Encoder } +import io.circe.syntax._ import io.circe.literal._ import java.util.UUID @@ -56,9 +58,7 @@ import com.snowplowanalytics.iglu.core.SchemaVer import com.snowplowanalytics.iglu.core.{ SchemaKey, SelfDescribingData } import com.snowplowanalytics.iglu.client.{ Client, ClientError } import DynamodbManifest.ShredderManifest - -case class FatalEtlError(msg: String) extends Error(msg) -case class UnexpectedEtlException(msg: String) extends Error(msg) +import rdbloader.common._ /** Helpers method for the shred job */ object ShredJob extends SparkJob { @@ -69,39 +69,53 @@ object ShredJob extends SparkJob { val AtomicSchema = SchemaKey("com.snowplowanalytics.snowplow", "atomic", "jsonschema", SchemaVer.Full(1,0,0)) - case class Hierarchy(eventId: UUID, collectorTstamp: Instant, entity: SelfDescribingData[Json]) { - def dumpJson: String = json""" - { - "schema": { - "vendor": ${entity.schema.vendor}, - "name": ${entity.schema.name}, - "format": ${entity.schema.format}, - "version": ${entity.schema.version.asString} - }, - "data": ${entity.data}, - "hierarchy": { - "rootId": $eventId, - "rootTstamp": ${collectorTstamp.formatted}, - "refRoot": 
"events", - "refTree": ["events", ${entity.schema.name}], - "refParent":"events" - } - }""".noSpaces + /** Final stage of event. After this, it can be shredded into different folders */ + case class FinalRow(atomic: Row, shredded: List[Shredded]) + + case class Hierarchy(eventId: UUID, collectorTstamp: Instant, entity: SelfDescribingData[Json]) { self => + def dumpJson: String = self.asJson.noSpaces } + object Hierarchy { + implicit val hierarchyCirceEncoder: Encoder[Hierarchy] = + Encoder.instance { h => + json"""{ + "schema": { + "vendor": ${h.entity.schema.vendor}, + "name": ${h.entity.schema.name}, + "format": ${h.entity.schema.format}, + "version": ${h.entity.schema.version.asString} + }, + "data": ${h.entity.data}, + "hierarchy": { + "rootId": ${h.eventId}, + "rootTstamp": ${h.collectorTstamp.formatted}, + "refRoot": "events", + "refTree": ["events", ${h.entity.schema.name}], + "refParent":"events" + } + }""" + } + + def fromEvent(event: Event): List[Hierarchy] = + getEntities(event).map(json => Hierarchy(event.event_id, event.collector_tstamp, json)) + } + + case class FatalEtlError(msg: String) extends Error(msg) + case class UnexpectedEtlException(msg: String) extends Error(msg) + def getEntities(event: Event): List[SelfDescribingData[Json]] = event.unstruct_event.data.toList ++ event.derived_contexts.data ++ event.contexts.data - def getShreddedEntities(event: Event): List[Hierarchy] = - getEntities(event).map(json => Hierarchy(event.event_id, event.collector_tstamp, json)) - private[spark] val classesToRegister: Array[Class[_]] = Array( classOf[Array[String]], classOf[SchemaKey], classOf[SelfDescribingData[_]], classOf[Event], + classOf[Hierarchy], + classOf[FinalRow], classOf[Instant], classOf[com.snowplowanalytics.iglu.core.SchemaVer$Full], classOf[io.circe.JsonObject$LinkedHashMapJsonObject], @@ -149,7 +163,7 @@ object ShredJob extends SparkJob { ShredderManifest(DynamodbManifest.initialize(m, resolver.cacheless), i) } - val atomicLengths = singleton.ResolverSingleton.get(shredConfig.igluConfig).resolver.lookupSchema(AtomicSchema) match { // TODO: retry + val atomicLengths = singleton.IgluSingleton.get(shredConfig.igluConfig).resolver.lookupSchema(AtomicSchema) match { // TODO: retry case Right(schema) => EventUtils.getAtomicLengths(schema).fold(e => throw new RuntimeException(e), identity) case Left(error) => @@ -282,6 +296,13 @@ class ShredJob(@transient val spark: SparkSession, shredConfig: ShredJobConfig) if (shredConfig.dynamodbManifestTable.isEmpty) () else shreddedTypes.add(inventory.map(_.toSchemaUri)) + /** Check if `shredType` should be transformed into TSV */ + def isTabular(shredType: SchemaKey): Boolean = + shredConfig.storage.flatMap(_.blacklistTabular) match { + case Some(blacklist) => !blacklist.exists(criterion => criterion.matches(shredType)) + case None => false + } + /** * Runs the shred job by: * - shredding the Snowplow enriched events @@ -294,6 +315,23 @@ class ShredJob(@transient val spark: SparkSession, shredConfig: ShredJobConfig) jsonOnly: Boolean): Unit = { import ShredJob._ + def shred(event: Event): Either[BadRow, FinalRow] = + Hierarchy.fromEvent(event).traverse { hierarchy => + val tabular = isTabular(hierarchy.entity.schema) + Shredded.fromHierarchy(tabular, singleton.IgluSingleton.get(shredConfig.igluConfig).resolver)(hierarchy).toValidatedNel + }.leftMap(errors => BadRow.EntityShreddingError(event, errors)).toEither.map { shredded => + val row = Row(EventUtils.alterEnrichedEvent(event, atomicLengths)) + FinalRow(row, shredded) + } + + def 
writeShredded(data: RDD[(String, String, String, String, String)], json: Boolean): Unit = + data + .toDF("vendor", "name", "format", "version", "data") + .write + .partitionBy("vendor", "name", "format", "version") + .mode(SaveMode.Append) + .text(getShreddedTypesOutputPath(shredConfig.outFolder, json)) + val input = sc.textFile(shredConfig.inFolder) // Enriched TSV lines along with their shredded components @@ -303,9 +341,7 @@ class ShredJob(@transient val spark: SparkSession, shredConfig: ShredJobConfig) .cache() // Handling of malformed rows; drop good, turn malformed into `BadRow` - val bad = common - .flatMap { shredded => shredded.swap.toOption } - .map { badRow => Row(badRow.toCompactJson) } + val bad = common.flatMap { shredded => shredded.swap.toOption.map(bad => Row(bad.toCompactJson)) } // Handling of properly-formed rows; drop bad, turn proper events to `Event` // Pefrorm in-batch and cross-batch natural deduplications and writes found types to accumulator @@ -348,25 +384,31 @@ class ShredJob(@transient val spark: SparkSession, shredConfig: ShredJobConfig) .cache() val uniqueGood = identifiedSyntheticDupes.flatMap { - case (_, (shredded, None)) => Some(shredded) + case (_, (event, None)) => Some(event) case _ => None }.setName("uniqueGood") // Avoid recomputing UUID at all costs in order to not create orphan shredded entities val syntheticDupedGood = identifiedSyntheticDupes.flatMap { - case (_, (shredded, Some(_))) => + case (_, (event, Some(_))) => val newEventId = UUID.randomUUID() - val newContext = SelfDescribingData(DuplicateSchema, json"""{"originalEventId":${shredded.event_id}}""") - val updatedContexts = newContext :: shredded.derived_contexts.data - Some(shredded.copy(event_id = newEventId, derived_contexts = Contexts(updatedContexts))) + val newContext = SelfDescribingData(DuplicateSchema, json"""{"originalEventId":${event.event_id}}""") + val updatedContexts = newContext :: event.derived_contexts.data + Some(event.copy(event_id = newEventId, derived_contexts = Contexts(updatedContexts))) case _ => None }.persist(StorageLevel.MEMORY_AND_DISK_SER).setName("syntheticDupedGood") - val goodWithSyntheticDupes = (uniqueGood ++ syntheticDupedGood).cache().setName("goodWithSyntheticDupes") + val withSyntheticDupes = (uniqueGood ++ syntheticDupedGood) + .map(shred).cache().setName("withSyntheticDupes") + + val goodWithSyntheticDupes = withSyntheticDupes.flatMap(_.toOption) // Ready the events for database load - val events = goodWithSyntheticDupes.map(e => Row(EventUtils.alterEnrichedEvent(e, atomicLengths))) + val events = goodWithSyntheticDupes.map(_.atomic) + + // Update the shredded JSONs with the new deduplicated event IDs and stringify + val shredded = goodWithSyntheticDupes.flatMap(_.shredded) // Write as strings to `atomic-events` directory spark.createDataFrame(events, StructType(StructField("_", StringType, true) :: Nil)) @@ -374,34 +416,22 @@ class ShredJob(@transient val spark: SparkSession, shredConfig: ShredJobConfig) .mode(SaveMode.Overwrite) .text(getAlteredEnrichedOutputPath(shredConfig.outFolder)) - // Update the shredded JSONs with the new deduplicated event IDs and stringify - val shredded = goodWithSyntheticDupes - .flatMap(getShreddedEntities) - .map(Shredded.fromHierarchy(jsonOnly, singleton.IgluSingleton.get(shredConfig.igluConfig).resolver)) - - if (jsonOnly) { - val jsons = shredded.map(s => s.json.getOrElse(throw FatalEtlError(s"Inconsistent configuration. Can be shredded only into JSON. 
Got $s"))) - writeShredded(jsons, true) - } else { // Partition TSV and JSON - writeShredded(shredded.flatMap(_.json), true) - writeShredded(shredded.flatMap(_.tabular), false) + // Final output + shredConfig.storage.flatMap(_.blacklistTabular).map(_.nonEmpty) match { + case Some(true) | None => writeShredded(shredded.flatMap(_.json), true) + case Some(false) => () } - - def writeShredded(data: RDD[(String, String, String, String, String)], json: Boolean): Unit = - data - .toDF("vendor", "name", "format", "version", "data") - .write - .partitionBy("vendor", "name", "format", "version") - .mode(SaveMode.Append) - .text(getShreddedTypesOutputPath(shredConfig.outFolder, json)) + writeShredded(shredded.flatMap(_.tabular), false) // Deduplication operation failed due to DynamoDB val dupeFailed = good.flatMap { case (_, Left(m)) => Some(Row(m.toCompactJson)) case _ => None } + // Data that failed TSV transformation + val shreddedBad = withSyntheticDupes.flatMap(_.swap.toOption.map(bad => Row(bad.toCompactJson))) - spark.createDataFrame(bad ++ dupeFailed, StructType(StructField("_", StringType, true) :: Nil)) + spark.createDataFrame(bad ++ dupeFailed ++ shreddedBad, StructType(StructField("_", StringType, true) :: Nil)) .write .mode(SaveMode.Overwrite) .text(shredConfig.badFolder) diff --git a/shredder/src/main/scala/com.snowplowanalytics.snowplow.storage/spark/ShredJobConfig.scala b/shredder/src/main/scala/com.snowplowanalytics.snowplow.storage/spark/ShredJobConfig.scala index 9635b1364..0656d24d1 100644 --- a/shredder/src/main/scala/com.snowplowanalytics.snowplow.storage/spark/ShredJobConfig.scala +++ b/shredder/src/main/scala/com.snowplowanalytics.snowplow.storage/spark/ShredJobConfig.scala @@ -30,6 +30,7 @@ import org.apache.commons.codec.DecoderException import org.apache.commons.codec.binary.Base64 import com.snowplowanalytics.snowplow.rdbloader.generated.ProjectMetadata +import com.snowplowanalytics.snowplow.rdbloader.common._ /** * Case class representing the configuration for the shred job. @@ -37,7 +38,6 @@ import com.snowplowanalytics.snowplow.rdbloader.generated.ProjectMetadata * @param outFolder Output folder where the shredded events will be stored * @param badFolder Output folder where the malformed events will be stored * @param igluConfig JSON representing the Iglu configuration - * @param jsonOnly don't try to produce TSV output */ case class ShredJobConfig(inFolder: String, outFolder: String, @@ -46,7 +46,7 @@ case class ShredJobConfig(inFolder: String, duplicateStorageConfig: Option[Json], dynamodbManifestTable: Option[String], itemId: Option[String], - jsonOnly: Boolean) { + storage: Option[StorageTarget]) { /** Get both manifest table and item id to process */ def getManifestData: Option[(String, String)] = @@ -94,16 +94,25 @@ object ShredJobConfig { val itemId = Opts.option[String]("item-id", "Unique folder identificator for processing manifest (e.g. 
S3 URL)", metavar = "").orNone - val jsonOnly = Opts.flag("json-only", "Do not produce tabular output").orFalse + val storageTarget = Opts.option[String]("target", + "base64-encoded string with single storage target configuration JSON", "t", "target.json") + .mapValidated(Base64Json.decode).orNone - val shredJobConfig = (inputFolder, outputFolder, badFolder, igluConfig, duplicateStorageConfig, processingManifestTable, itemId, jsonOnly).mapN { - (input, output, bad, iglu, dupeStorage, manifest, itemId, jsonOnly) => ShredJobConfig(input, output, bad, iglu, dupeStorage, manifest, itemId, jsonOnly) + val shredJobConfig = (inputFolder, outputFolder, badFolder, igluConfig, duplicateStorageConfig, processingManifestTable, itemId, storageTarget).mapN { + (input, output, bad, iglu, dupeStorage, manifest, itemId, target) => (ShredJobConfig(input, output, bad, iglu, dupeStorage, manifest, itemId, None), target) }.validate("--item-id and --processing-manifest-table must be either both provided or both absent") { - case ShredJobConfig(_, _, _, _, _, manifest, i, _) => (manifest.isDefined && i.isDefined) || (manifest.isEmpty && i.isEmpty) + case (ShredJobConfig(_, _, _, _, _, manifest, i, _), _) => (manifest.isDefined && i.isDefined) || (manifest.isEmpty && i.isEmpty) case _ => false + }.mapValidated { + case (config, Some(target)) => + val client = singleton.IgluSingleton.get(config.igluConfig) + StorageTarget.parseTarget(client, target.noSpaces) + .leftMap(_.message) + .map(storage => config.copy(storage = Some(storage))) + .toValidatedNel + case (config, None) => config.validNel } - val command = Command(s"${ProjectMetadata.shredderName}-${ProjectMetadata.shredderVersion}", "Apache Spark job to prepare Snowplow enriched data to being loaded into Amazon Redshift warehouse")(shredJobConfig) diff --git a/shredder/src/main/scala/com.snowplowanalytics.snowplow.storage/spark/Shredded.scala b/shredder/src/main/scala/com.snowplowanalytics.snowplow.storage/spark/Shredded.scala index aeeda1110..658ece2c9 100644 --- a/shredder/src/main/scala/com.snowplowanalytics.snowplow.storage/spark/Shredded.scala +++ b/shredder/src/main/scala/com.snowplowanalytics.snowplow.storage/spark/Shredded.scala @@ -15,8 +15,12 @@ package com.snowplowanalytics.snowplow.storage.spark import cats.Id +import cats.data.EitherT +import cats.syntax.either._ import com.snowplowanalytics.iglu.client.Resolver + +import com.snowplowanalytics.snowplow.rdbloader.common._ import com.snowplowanalytics.snowplow.storage.spark.ShredJob.Hierarchy /** ADT, representing possible forms of data in blob storage */ @@ -34,10 +38,10 @@ sealed trait Shredded { object Shredded { - /** Data will be present as JSON, with RDB Loader loading it using JSON Paths. Legacy format */ + /** Data will be represented as JSON, with RDB Loader loading it using JSON Paths. Legacy format */ case class Json(vendor: String, name: String, format: String, version: String, data: String) extends Shredded - /** Data will be present as TSV, with RDB Loader loading it directly */ + /** Data will be represented as TSV, with RDB Loader loading it directly */ case class Tabular(vendor: String, name: String, format: String, version: String, data: String) extends Shredded /** @@ -45,23 +49,20 @@ object Shredded { * specifying how it should look like in destination: JSON or TSV * If flattening algorithm failed at any point - it will fallback to the JSON format * - * @param jsonOnly output can only be JSON. 
All downstream components should agree on that + * @param tabular whether data should be transformed into TSV format * @param resolver Iglu resolver to request all necessary entities * @param hierarchy actual JSON hierarchy from an enriched event */ - def fromHierarchy(jsonOnly: Boolean, resolver: => Resolver[Id])(hierarchy: Hierarchy): Shredded = { + def fromHierarchy(tabular: Boolean, resolver: => Resolver[Id])(hierarchy: Hierarchy): Either[Flattening.FlatteningError, Shredded] = { val vendor = hierarchy.entity.schema.vendor val name = hierarchy.entity.schema.name val format = hierarchy.entity.schema.format - if (jsonOnly) - Json(vendor, name, format, hierarchy.entity.schema.version.asString, hierarchy.dumpJson) - else - EventUtils.flatten(resolver, hierarchy.entity).value match { - case Right(columns) => - val meta = EventUtils.buildMetadata(hierarchy.eventId, hierarchy.collectorTstamp, hierarchy.entity.schema) - Tabular(vendor, name, format, hierarchy.entity.schema.version.model.toString, (meta ++ columns).mkString("\t")) - case Left(_) => - Json(vendor, name, format, hierarchy.entity.schema.version.asString, hierarchy.dumpJson) - } + val result: EitherT[Id, Flattening.FlatteningError, Shredded] = + if (tabular) EventUtils.flatten(resolver, hierarchy.entity).map { columns => + val meta = EventUtils.buildMetadata(hierarchy.eventId, hierarchy.collectorTstamp, hierarchy.entity.schema) + Tabular(vendor, name, format, hierarchy.entity.schema.version.model.toString, (meta ++ columns).mkString("\t")) + } else EitherT.pure[Id, Flattening.FlatteningError](Json(vendor, name, format, hierarchy.entity.schema.version.asString, hierarchy.dumpJson)) + + result.value } } \ No newline at end of file diff --git a/shredder/src/main/scala/com.snowplowanalytics.snowplow.storage/spark/package.scala b/shredder/src/main/scala/com.snowplowanalytics.snowplow.storage/spark/package.scala index 604830ea2..6edbf9fec 100644 --- a/shredder/src/main/scala/com.snowplowanalytics.snowplow.storage/spark/package.scala +++ b/shredder/src/main/scala/com.snowplowanalytics.snowplow.storage/spark/package.scala @@ -14,14 +14,10 @@ */ package com.snowplowanalytics.snowplow.storage -import scala.concurrent.duration.TimeUnit - import java.time.{Instant, ZoneOffset} import java.time.format.DateTimeFormatter -import java.util.concurrent.TimeUnit.{MILLISECONDS, NANOSECONDS} import cats.Id -import cats.effect.Clock import io.circe.Json @@ -48,10 +44,4 @@ package object spark { time.atOffset(ZoneOffset.UTC).format(Formatter) } } - - - implicit val catsClockIdInstance: Clock[Id] = new Clock[Id] { - def realTime(unit: TimeUnit): Id[Long] = unit.convert(System.currentTimeMillis(), MILLISECONDS) - def monotonic(unit: TimeUnit): Id[Long] = unit.convert(System.nanoTime(), NANOSECONDS) - } } diff --git a/shredder/src/main/scala/com.snowplowanalytics.snowplow.storage/spark/singleton.scala b/shredder/src/main/scala/com.snowplowanalytics.snowplow.storage/spark/singleton.scala index 8bfee01e1..6135352a2 100644 --- a/shredder/src/main/scala/com.snowplowanalytics.snowplow.storage/spark/singleton.scala +++ b/shredder/src/main/scala/com.snowplowanalytics.snowplow.storage/spark/singleton.scala @@ -45,7 +45,7 @@ object singleton { synchronized { if (instance == null) { instance = getIgluClient(igluConfig) - .valueOr(e => throw FatalEtlError(e.toString)) + .valueOr(e => throw ShredJob.FatalEtlError(e.toString)) } } } @@ -83,7 +83,7 @@ object singleton { .withEndpointConfiguration(new EndpointConfiguration("http://localhost:8000", region)) .build() 
Some(new DynamoDbManifest(client, table)) - case Some(config) => EventsManifest.initStorage(config).fold(e => throw FatalEtlError(e.toString), _.some) + case Some(config) => EventsManifest.initStorage(config).fold(e => throw ShredJob.FatalEtlError(e.toString), _.some) case None => None } } diff --git a/shredder/src/test/scala/com.snowplowanalytics.snowplow.storage.spark/ShredJobSpec.scala b/shredder/src/test/scala/com.snowplowanalytics.snowplow.storage.spark/ShredJobSpec.scala index cdc7abf67..61008f2ea 100644 --- a/shredder/src/test/scala/com.snowplowanalytics.snowplow.storage.spark/ShredJobSpec.scala +++ b/shredder/src/test/scala/com.snowplowanalytics.snowplow.storage.spark/ShredJobSpec.scala @@ -83,7 +83,12 @@ object ShredJobSpec { def readPartFile(root: File, relativePath: String): Option[(List[String], String)] = { val files = listFilesWithExclusions(new File(root, relativePath), List.empty) .filter(s => s.contains("part-")) - def read(f: String): List[String] = Source.fromFile(new File(f)).getLines.toList + def read(f: String): List[String] = { + val source = Source.fromFile(new File(f)) + val lines = source.getLines.toList + source.close() + lines + } files.foldLeft[Option[(List[String], String)]](None) { (acc, f) => val accValue = acc.getOrElse((List.empty, "")) val contents = accValue._1 ++ read(f) @@ -306,7 +311,7 @@ trait ShredJobSpec extends SparkSpec { val (dedupeConfigCli, dedupeConfig) = if (crossBatchDedupe) { val encoder = new Base64(true) val encoded = new String(encoder.encode(duplicateStorageConfig.noSpaces.getBytes())) - val config = SelfDescribingData.parse(duplicateStorageConfig).leftMap(_.code).flatMap(EventsManifestConfig.DynamoDb.extract).valueOr(e => throw FatalEtlError(e)) + val config = SelfDescribingData.parse(duplicateStorageConfig).leftMap(_.code).flatMap(EventsManifestConfig.DynamoDb.extract).valueOr(e => throw ShredJob.FatalEtlError(e)) (Array("--duplicate-storage-config", encoded), Some(config)) } else { (Array.empty[String], None) @@ -327,13 +332,14 @@ trait ShredJobSpec extends SparkSpec { "--input-folder", input.toString(), "--output-folder", dirs.output.toString(), "--bad-folder", dirs.badRows.toString(), - "--iglu-config", igluConfigWithLocal + "--iglu-config", igluConfigWithLocal, + "--target", storageConfig ) val (dedupeConfigCli, dedupeConfig) = if (crossBatchDedupe) { val encoder = new Base64(true) val encoded = new String(encoder.encode(duplicateStorageConfig.noSpaces.getBytes())) - val config = SelfDescribingData.parse(duplicateStorageConfig).leftMap(_.code).flatMap(EventsManifestConfig.DynamoDb.extract).valueOr(e => throw FatalEtlError(e)) + val config = SelfDescribingData.parse(duplicateStorageConfig).leftMap(_.code).flatMap(EventsManifestConfig.DynamoDb.extract).valueOr(e => throw ShredJob.FatalEtlError(e)) (Array("--duplicate-storage-config", encoded), Some(config)) } else { (Array.empty[String], None) @@ -348,6 +354,34 @@ trait ShredJobSpec extends SparkSpec { deleteRecursively(input) } + val storageConfig = { + val encoder = new Base64(true) + new String(encoder.encode("""{ + "schema": "iglu:com.snowplowanalytics.snowplow.storage/redshift_config/jsonschema/4-0-0", + "data": { + "name": "AWS Redshift enriched events storage", + "id": "33334444-eee7-4845-a7e6-8fdc88d599d0", + "host": "192.168.1.12", + "database": "ADD HERE", + "port": 5439, + "jdbc": {}, + "processingManifest": null, + "sshTunnel": null, + "username": "ADD HERE", + "password": { + "ec2ParameterStore": { + "parameterName": "snowplow.rdbloader.redshift.password" + } + }, + 
"roleArn": "arn:aws:iam::123456789876:role/RedshiftLoadRole", + "schema": "atomic", + "maxError": 1, + "compRows": 20000, + "blacklistTabular": [], + "purpose": "ENRICHED_EVENTS" + } + } """.stripMargin.replaceAll("[\n\r]","").getBytes())) + } override def afterAll(): Unit = { super.afterAll() diff --git a/src/main/scala/com/snowplowanalytics/snowplow/rdbloader/Security.scala b/src/main/scala/com/snowplowanalytics/snowplow/rdbloader/Security.scala index b6d648333..673740a08 100644 --- a/src/main/scala/com/snowplowanalytics/snowplow/rdbloader/Security.scala +++ b/src/main/scala/com/snowplowanalytics/snowplow/rdbloader/Security.scala @@ -16,7 +16,7 @@ import cats.Functor import cats.data.EitherT import cats.implicits._ -import config.StorageTarget.TunnelConfig +import common.StorageTarget.TunnelConfig /** Functions working with identities and security layers */ object Security { diff --git a/src/main/scala/com/snowplowanalytics/snowplow/rdbloader/config/CliConfig.scala b/src/main/scala/com/snowplowanalytics/snowplow/rdbloader/config/CliConfig.scala index 5834b2efb..b3e263640 100644 --- a/src/main/scala/com/snowplowanalytics/snowplow/rdbloader/config/CliConfig.scala +++ b/src/main/scala/com/snowplowanalytics/snowplow/rdbloader/config/CliConfig.scala @@ -20,12 +20,12 @@ import cats.Id import cats.data._ import cats.implicits._ -import com.monovore.decline.{ Opts, Argument, Command } - +import com.monovore.decline.{Argument, Command, Opts} import com.snowplowanalytics.iglu.client.Client +import com.snowplowanalytics.snowplow.rdbloader.common.{ StorageTarget, StringEnum } import io.circe.Json -import io.circe.parser.{ parse => parseJson } +import io.circe.parser.{parse => parseJson} // This project import LoaderError._ @@ -115,9 +115,6 @@ object CliConfig { folder: Option[String], dryRun: Boolean) - // Always invalid initial parsing configuration - private[this] val rawCliConfig = RawConfig("", "", "", Nil, Nil, None, None, false) - type Parsed[A] = ValidatedNel[ConfigError, A] /** Wrapper for any Base64-encoded entity */ @@ -138,7 +135,7 @@ object CliConfig { implicit def includeStepsArgumentInstance: Argument[Set[Step.IncludeStep]] = new Argument[Set[Step.IncludeStep]] { def read(string: String): ValidatedNel[String, Set[Step.IncludeStep]] = - string.split(",").toList.traverse(utils.Common.fromString[Step.IncludeStep](_).toValidatedNel).map(_.toSet) + string.split(",").toList.traverse(StringEnum.fromString[Step.IncludeStep](_).toValidatedNel).map(_.toSet) def defaultMetavar: String = "steps" } @@ -146,7 +143,7 @@ object CliConfig { implicit def skipStepsArgumentInstance: Argument[Set[Step.SkipStep]] = new Argument[Set[Step.SkipStep]] { def read(string: String): ValidatedNel[String, Set[Step.SkipStep]] = - string.split(",").toList.traverse(utils.Common.fromString[Step.SkipStep](_).toValidatedNel).map(_.toSet) + string.split(",").toList.traverse(StringEnum.fromString[Step.SkipStep](_).toValidatedNel).map(_.toSet) def defaultMetavar: String = "steps" } @@ -200,5 +197,5 @@ object CliConfig { * or successfully decoded storage target */ private def loadTarget(resolver: Client[Id, Json], targetConfigB64: String) = - base64decode(targetConfigB64).flatMap(StorageTarget.parseTarget(resolver, _)) + base64decode(targetConfigB64).flatMap(StorageTarget.parseTarget(resolver, _).leftMap(e => LoaderError.ConfigError(e.message))) } diff --git a/src/main/scala/com/snowplowanalytics/snowplow/rdbloader/config/SnowplowConfig.scala b/src/main/scala/com/snowplowanalytics/snowplow/rdbloader/config/SnowplowConfig.scala 
index cb6c6f18e..f8b35be6f 100644
--- a/src/main/scala/com/snowplowanalytics/snowplow/rdbloader/config/SnowplowConfig.scala
+++ b/src/main/scala/com/snowplowanalytics/snowplow/rdbloader/config/SnowplowConfig.scala
@@ -22,10 +22,11 @@ import io.circe.generic.extras.decoding.ConfiguredDecoder
 import io.circe.yaml.parser
 
 // This project
+import common.StringEnum
+
 import S3._
 import Semver._
 import LoaderError._
-import utils.Common._
 
 /**
  * FullDiscovery Snowplow `config.yml` runtime representation
@@ -123,10 +124,10 @@ object SnowplowConfig {
     Configuration.default.withSnakeCaseMemberNames
 
   implicit val decodeTrackerMethod: Decoder[TrackerMethod] =
-    decodeStringEnum[TrackerMethod]
+    StringEnum.decodeStringEnum[TrackerMethod]
 
   implicit val decodeLoggingLevel: Decoder[LoggingLevel] =
-    decodeStringEnum[LoggingLevel]
+    StringEnum.decodeStringEnum[LoggingLevel]
 
   implicit val snowplowMonitoringDecoder: Decoder[SnowplowMonitoring] =
     ConfiguredDecoder.decodeCaseClass
@@ -135,7 +136,7 @@ object SnowplowConfig {
     ConfiguredDecoder.decodeCaseClass
 
   implicit val decodeOutputCompression: Decoder[OutputCompression] =
-    decodeStringEnum[OutputCompression]
+    StringEnum.decodeStringEnum[OutputCompression]
 
   implicit val enrichDecoder: Decoder[Enrich] =
     ConfiguredDecoder.decodeCaseClass
diff --git a/src/main/scala/com/snowplowanalytics/snowplow/rdbloader/config/Step.scala b/src/main/scala/com/snowplowanalytics/snowplow/rdbloader/config/Step.scala
index 530833de1..93166bbfa 100644
--- a/src/main/scala/com/snowplowanalytics/snowplow/rdbloader/config/Step.scala
+++ b/src/main/scala/com/snowplowanalytics/snowplow/rdbloader/config/Step.scala
@@ -13,8 +13,7 @@ package com.snowplowanalytics.snowplow.rdbloader
 package config
 
-// This project
-import utils.Common._
+import common.StringEnum
 
 /**
  * Step is part of loading process or result SQL-statement
@@ -52,7 +51,7 @@ object Step {
   case object Load extends DefaultStep
 
   /** Steps included into app by default */
-  val defaultSteps: Set[Step] = sealedDescendants[SkipStep] ++ Set.empty[Step]
+  val defaultSteps: Set[Step] = StringEnum.sealedDescendants[SkipStep] ++ Set.empty[Step]
 
   /**
   * Remove explicitly disabled steps and add optional steps
diff --git a/src/main/scala/com/snowplowanalytics/snowplow/rdbloader/interpreters/DryRunInterpreter.scala b/src/main/scala/com/snowplowanalytics/snowplow/rdbloader/interpreters/DryRunInterpreter.scala
index 20b930a5c..9c1694b13 100644
--- a/src/main/scala/com/snowplowanalytics/snowplow/rdbloader/interpreters/DryRunInterpreter.scala
+++ b/src/main/scala/com/snowplowanalytics/snowplow/rdbloader/interpreters/DryRunInterpreter.scala
@@ -34,15 +34,16 @@ import com.snowplowanalytics.snowplow.scalatracker.Tracker
 import com.snowplowanalytics.manifest.core.{ManifestError, ProcessingManifest, LockHandler}
 
 // This project
-import config.CliConfig
+import common._
+
 import LoaderA._
+import config.CliConfig
 import LoaderError.LoaderLocalError
 import Interpreter.runIO
 import loaders.Common.SqlString
 import discovery.{ ManifestDiscovery, DiscoveryFailure }
 import implementations.{S3Interpreter, TrackerInterpreter, ManifestInterpreter}
 import implementations.ManifestInterpreter.ManifestE
-import com.snowplowanalytics.snowplow.rdbloader.common.Flattening.getOrdered
 
 
 /**
@@ -168,7 +169,7 @@ class DryRunInterpreter private[interpreters](cliConfig: CliConfig,
         Right(name + " key")
 
       case GetSchemas(vendor, name, model) =>
-        getOrdered(igluClient.resolver, vendor, name, model).leftMap { resolutionError =>
+        Flattening.getOrdered(igluClient.resolver, vendor, name, model).leftMap { resolutionError =>
           val message = s"Cannot get schemas for iglu:$vendor/$name/jsonschema/$model-*-*\n$resolutionError"
           LoaderError.DiscoveryError(DiscoveryFailure.IgluError(message))
         }.value
diff --git a/src/main/scala/com/snowplowanalytics/snowplow/rdbloader/interpreters/RealWorldInterpreter.scala b/src/main/scala/com/snowplowanalytics/snowplow/rdbloader/interpreters/RealWorldInterpreter.scala
index 14c67472d..4fd9ea36a 100644
--- a/src/main/scala/com/snowplowanalytics/snowplow/rdbloader/interpreters/RealWorldInterpreter.scala
+++ b/src/main/scala/com/snowplowanalytics/snowplow/rdbloader/interpreters/RealWorldInterpreter.scala
@@ -36,6 +36,7 @@ import com.snowplowanalytics.snowplow.scalatracker.Tracker
 import com.snowplowanalytics.manifest.core.ManifestError
 
 // This project
+import common._
 import LoaderA._
 import LoaderError.LoaderLocalError
 import Interpreter.runIO
@@ -45,7 +46,6 @@ import utils.Common
 import implementations._
 import com.snowplowanalytics.snowplow.rdbloader.{ Log => ExitLog }
 import com.snowplowanalytics.snowplow.rdbloader.loaders.Common.SqlString
-import com.snowplowanalytics.snowplow.rdbloader.common.Flattening.getOrdered
 
 /**
  * Interpreter performs all actual side-effecting work,
@@ -228,7 +228,7 @@ class RealWorldInterpreter private[interpreters](cliConfig: CliConfig,
         SshInterpreter.getKey(name)
 
       case GetSchemas(vendor, name, model) =>
-        getOrdered(igluClient.resolver, vendor, name, model).leftMap { resolutionError =>
+        Flattening.getOrdered(igluClient.resolver, vendor, name, model).leftMap { resolutionError =>
          val message = s"Cannot get schemas for iglu:$vendor/$name/jsonschema/$model-*-*\n$resolutionError"
          LoaderError.DiscoveryError(DiscoveryFailure.IgluError(message))
        }.value
diff --git a/src/main/scala/com/snowplowanalytics/snowplow/rdbloader/interpreters/implementations/JdbcInterpreter.scala b/src/main/scala/com/snowplowanalytics/snowplow/rdbloader/interpreters/implementations/JdbcInterpreter.scala
index 8c6478cd1..41817fba3 100644
--- a/src/main/scala/com/snowplowanalytics/snowplow/rdbloader/interpreters/implementations/JdbcInterpreter.scala
+++ b/src/main/scala/com/snowplowanalytics/snowplow/rdbloader/interpreters/implementations/JdbcInterpreter.scala
@@ -19,18 +19,14 @@ import java.sql.{Connection, SQLException}
 import java.util.Properties
 
 import scala.util.control.NonFatal
-
 import com.amazon.redshift.jdbc42.{Driver => RedshiftDriver}
-
 import cats.implicits._
-
 import org.postgresql.copy.CopyManager
 import org.postgresql.jdbc.PgConnection
 import org.postgresql.{Driver => PgDriver}
-
 import LoaderError.StorageTargetError
+import com.snowplowanalytics.snowplow.rdbloader.common.StorageTarget
 import db.Decoder
-import config.StorageTarget
 import loaders.Common.SqlString
 
 object JdbcInterpreter {
@@ -122,7 +118,7 @@ object JdbcInterpreter {
 
     for {
       _ <- r.jdbc.validation match {
-        case Left(error) => error.asLeft
+        case Left(error) => LoaderError.ConfigError(error.message).asLeft
        case Right(propertyUpdaters) =>
          propertyUpdaters.foreach(f => f(props)).asRight
      }
diff --git a/src/main/scala/com/snowplowanalytics/snowplow/rdbloader/interpreters/implementations/ManifestInterpreter.scala b/src/main/scala/com/snowplowanalytics/snowplow/rdbloader/interpreters/implementations/ManifestInterpreter.scala
index 25d47346c..eb71a6458 100644
--- a/src/main/scala/com/snowplowanalytics/snowplow/rdbloader/interpreters/implementations/ManifestInterpreter.scala
+++ b/src/main/scala/com/snowplowanalytics/snowplow/rdbloader/interpreters/implementations/ManifestInterpreter.scala
@@ -27,7 +27,7 @@ import com.snowplowanalytics.iglu.client.Client
 import com.snowplowanalytics.manifest.core.{Item, ManifestError, ProcessingManifest, Application}
 import com.snowplowanalytics.manifest.dynamodb.DynamoDbManifest
 
-import com.snowplowanalytics.snowplow.rdbloader.config.StorageTarget.ProcessingManifestConfig
+import com.snowplowanalytics.snowplow.rdbloader.common.StorageTarget.ProcessingManifestConfig
 import com.snowplowanalytics.snowplow.rdbloader.discovery.ManifestDiscovery
 import com.snowplowanalytics.snowplow.rdbloader.interpreters.Interpreter
diff --git a/src/main/scala/com/snowplowanalytics/snowplow/rdbloader/interpreters/implementations/TrackerInterpreter.scala b/src/main/scala/com/snowplowanalytics/snowplow/rdbloader/interpreters/implementations/TrackerInterpreter.scala
index eca0cdad2..126a93a94 100644
--- a/src/main/scala/com/snowplowanalytics/snowplow/rdbloader/interpreters/implementations/TrackerInterpreter.scala
+++ b/src/main/scala/com/snowplowanalytics/snowplow/rdbloader/interpreters/implementations/TrackerInterpreter.scala
@@ -14,7 +14,6 @@ package com.snowplowanalytics.snowplow.rdbloader
 package interpreters.implementations
 
 import java.io.ByteArrayInputStream
-import java.util.UUID
 import java.nio.charset.StandardCharsets
 
 import scala.util.control.NonFatal
@@ -38,6 +37,7 @@ import com.snowplowanalytics.snowplow.scalatracker.emitters.id.{SyncBatchEmitter
 
 // This project
 import config.SnowplowConfig.{GetMethod, Monitoring, PostMethod}
+import common._
 
 object TrackerInterpreter {
 
@@ -45,12 +45,6 @@ object TrackerInterpreter {
   val LoadSucceededSchema = SchemaKey("com.snowplowanalytics.monitoring.batch", "load_succeeded", "jsonschema", SchemaVer.Full(1,0,0))
   val LoadFailedSchema = SchemaKey("com.snowplowanalytics.monitoring.batch", "load_failed", "jsonschema", SchemaVer.Full(1,0,0))
 
-  implicit val uuidProviderId: UUIDProvider[Id] =
-    new UUIDProvider[Id] {
-      def generateUUID: Id[UUID] =
-        UUID.randomUUID()
-    }
-
   /** Callback for failed */
   private def callback(params: CollectorParams, request: CollectorRequest, response: CollectorResponse): Unit = {
     def toMsg(rsp: CollectorResponse, includeHeader: Boolean): String = rsp match {
diff --git a/src/main/scala/com/snowplowanalytics/snowplow/rdbloader/loaders/Common.scala b/src/main/scala/com/snowplowanalytics/snowplow/rdbloader/loaders/Common.scala
index 6ed7d09a5..37d31bbb2 100644
--- a/src/main/scala/com/snowplowanalytics/snowplow/rdbloader/loaders/Common.scala
+++ b/src/main/scala/com/snowplowanalytics/snowplow/rdbloader/loaders/Common.scala
@@ -21,8 +21,9 @@ import shapeless.tag
 import shapeless.tag._
 
 // This project
+import common.StorageTarget
+
 import config.{ CliConfig, Step }
-import config.StorageTarget.{ PostgresqlConfig, RedshiftConfig }
 import db.Entities._
 import discovery.DataDiscovery
 
@@ -51,27 +52,29 @@ object Common {
     def getDescriptor: String = getTable(schema, TransitEventsTable)
   }
 
+  /**
+   * Subpath to check `atomic-events` directory presence
+   */
+  val atomicSubpathPattern = "(.*)/(run=[0-9]{4}-[0-1][0-9]-[0-3][0-9]-[0-2][0-9]-[0-6][0-9]-[0-6][0-9]/atomic-events)/(.*)".r
+                                 // year month day hour minute second
+
   def getTable(databaseSchema: String, tableName: String): String =
     if (databaseSchema.isEmpty) tableName else databaseSchema + "." + tableName
 
-  /**
-   * Correctly merge database schema and table name
-   */
+  /** Correctly merge database schema and table name */
   def getEventsTable(databaseSchema: String): String =
     getTable(databaseSchema, EventsTable)
 
-  /**
-   * Correctly merge database schema and table name
-   */
+  def getEventsTable(storage: StorageTarget): String =
+    getEventsTable(storage.schema)
+
+  /** Correctly merge database schema and table name */
   def getManifestTable(databaseSchema: String): String =
     getTable(databaseSchema, ManifestTable)
 
-  /**
-   * Subpath to check `atomic-events` directory presence
-   */
-  val atomicSubpathPattern = "(.*)/(run=[0-9]{4}-[0-1][0-9]-[0-3][0-9]-[0-2][0-9]-[0-6][0-9]-[0-6][0-9]/atomic-events)/(.*)".r
-                                 // year month day hour minute second
+  def getManifestTable(storage: StorageTarget): String =
+    getManifestTable(storage.schema)
 
   /**
    * Process any valid storage target,
@@ -81,9 +84,9 @@ object Common {
    */
   def load(cliConfig: CliConfig, discovery: List[DataDiscovery]): LoaderAction[Unit] = {
     val loadDb = cliConfig.target match {
-      case postgresqlTarget: PostgresqlConfig =>
+      case postgresqlTarget: StorageTarget.PostgresqlConfig =>
         PostgresqlLoader.run(postgresqlTarget, cliConfig.steps, discovery)
-      case redshiftTarget: RedshiftConfig =>
+      case redshiftTarget: StorageTarget.RedshiftConfig =>
         RedshiftLoader.run(cliConfig.configYaml, redshiftTarget, cliConfig.steps, discovery)
     }
 
@@ -111,12 +114,12 @@ object Common {
     }
 
     cliConfig.target match {
-      case _: RedshiftConfig =>
+      case _: StorageTarget.RedshiftConfig =>
         val original = DataDiscovery.discoverFull(target, cliConfig.target.id, shredJob, region, assets)
         if (cliConfig.steps.contains(Step.ConsistencyCheck) && cliConfig.target.processingManifest.isEmpty)
           DataDiscovery.checkConsistency(original)
         else original
-      case _: PostgresqlConfig =>
+      case _: StorageTarget.PostgresqlConfig =>
         // Safe to skip consistency check as whole folder will be downloaded
         DataDiscovery.discoverFull(target, cliConfig.target.id, shredJob, region, assets)
     }
@@ -208,7 +211,7 @@ object Common {
   private[loaders] def getManifestItem(schema: String, etlTstamp: SqlTimestamp): LoaderAction[Option[LoadManifestItem]] = {
     val query = s"""SELECT *
-                   | FROM ${Common.getManifestTable(schema)}
+                   | FROM ${getManifestTable(schema)}
                    | WHERE etl_tstamp = '$etlTstamp'
                    | ORDER BY etl_tstamp DESC
                    | LIMIT 1""".stripMargin
diff --git a/src/main/scala/com/snowplowanalytics/snowplow/rdbloader/loaders/PostgresqlLoader.scala b/src/main/scala/com/snowplowanalytics/snowplow/rdbloader/loaders/PostgresqlLoader.scala
index 50a9ac96f..bee1102df 100644
--- a/src/main/scala/com/snowplowanalytics/snowplow/rdbloader/loaders/PostgresqlLoader.scala
+++ b/src/main/scala/com/snowplowanalytics/snowplow/rdbloader/loaders/PostgresqlLoader.scala
@@ -15,10 +15,9 @@ package loaders
 
 import cats.implicits._
 
-// This project
+import common.StorageTarget.PostgresqlConfig
 import LoaderA._
 import config.Step
-import config.StorageTarget.PostgresqlConfig
 import discovery.DataDiscovery
 
 object PostgresqlLoader {
@@ -32,7 +31,8 @@ object PostgresqlLoader {
   * @param discovery discovered data to load
   */
  def run(target: PostgresqlConfig, steps: Set[Step], discovery: List[DataDiscovery]): LoaderAction[Unit] = {
-    val statements = PostgresqlLoadStatements.build(target.eventsTable, steps)
+    val eventsTable = Common.getEventsTable(target)
+    val statements = PostgresqlLoadStatements.build(eventsTable, steps)
 
     for {
       _ <- discovery.traverse(loadFolder(statements))
diff --git a/src/main/scala/com/snowplowanalytics/snowplow/rdbloader/loaders/RedshiftLoadStatements.scala b/src/main/scala/com/snowplowanalytics/snowplow/rdbloader/loaders/RedshiftLoadStatements.scala
index 094124531..48ea80bcb 100644
--- a/src/main/scala/com/snowplowanalytics/snowplow/rdbloader/loaders/RedshiftLoadStatements.scala
+++ b/src/main/scala/com/snowplowanalytics/snowplow/rdbloader/loaders/RedshiftLoadStatements.scala
@@ -16,9 +16,10 @@ package loaders
 
 import cats.implicits._
 
 // This project
+import common.StorageTarget.RedshiftConfig
+
 import Common._
 import config.{SnowplowConfig, Step}
-import config.StorageTarget.RedshiftConfig
 import config.SnowplowConfig.OutputCompression
 import discovery.{DataDiscovery, ShreddedType}
 
@@ -127,16 +128,17 @@ object RedshiftLoadStatements {
     val shreddedCopyStatements = shreddedStatements.map(_.copy)
 
     val manifestStatement = getManifestStatements(target.schema, shreddedStatements.size)
+    val eventsTable = Common.getEventsTable(target)
 
     // Vacuum all tables including events-table
     val vacuum = if (steps.contains(Step.Vacuum)) {
-      val statements = buildVacuumStatement(target.eventsTable) :: shreddedStatements.map(_.vacuum)
+      val statements = buildVacuumStatement(eventsTable) :: shreddedStatements.map(_.vacuum)
       Some(statements)
     } else None
 
     // Analyze all tables including events-table
     val analyze = if (steps.contains(Step.Analyze)) {
-      val statements = buildAnalyzeStatement(target.eventsTable) :: shreddedStatements.map(_.analyze)
+      val statements = buildAnalyzeStatement(eventsTable) :: shreddedStatements.map(_.analyze)
       Some(statements)
     } else None
 
@@ -155,6 +157,7 @@
    */
   def buildEventsCopy(config: SnowplowConfig, target: RedshiftConfig, s3path: S3.Folder, transitCopy: Boolean): AtomicCopy = {
     val compressionFormat = getCompressionFormat(config.enrich.outputCompression)
+    val eventsTable = Common.getEventsTable(target)
 
     if (transitCopy) {
       TransitCopy(SqlString.unsafeCoerce(
@@ -169,7 +172,7 @@
          | ACCEPTINVCHARS $compressionFormat""".stripMargin))
     } else {
       StraightCopy(SqlString.unsafeCoerce(
-        s"""COPY ${target.eventsTable} FROM '$s3path'
+        s"""COPY $eventsTable FROM '$s3path'
          | CREDENTIALS 'aws_iam_role=${target.roleArn}' REGION AS '${config.aws.s3.region}'
          | DELIMITER '$EventFieldSeparator'
          | MAXERROR ${target.maxError}
diff --git a/src/main/scala/com/snowplowanalytics/snowplow/rdbloader/loaders/RedshiftLoader.scala b/src/main/scala/com/snowplowanalytics/snowplow/rdbloader/loaders/RedshiftLoader.scala
index 4b6bedee6..d9b73d32e 100644
--- a/src/main/scala/com/snowplowanalytics/snowplow/rdbloader/loaders/RedshiftLoader.scala
+++ b/src/main/scala/com/snowplowanalytics/snowplow/rdbloader/loaders/RedshiftLoader.scala
@@ -16,11 +16,13 @@ package loaders
 
 import cats.implicits._
 
 // This project
+import common.StorageTarget
+
 import LoaderA._
 import RedshiftLoadStatements._
 import Common.{ SqlString, EventsTable, checkLoadManifest, AtomicEvents, TransitTable }
 import discovery.DataDiscovery
-import config.{ SnowplowConfig, Step, StorageTarget }
+import config.{ SnowplowConfig, Step }
 
 /**
diff --git a/src/main/scala/com/snowplowanalytics/snowplow/rdbloader/package.scala b/src/main/scala/com/snowplowanalytics/snowplow/rdbloader/package.scala
index d3086ac9e..574d9a52f 100644
--- a/src/main/scala/com/snowplowanalytics/snowplow/rdbloader/package.scala
+++ b/src/main/scala/com/snowplowanalytics/snowplow/rdbloader/package.scala
@@ -22,8 +22,6 @@ import cats.effect.Clock
 import rdbloader.discovery.DiscoveryFailure
 import rdbloader.interpreters.implementations.ManifestInterpreter.ManifestE
 
-import scala.concurrent.duration.{ TimeUnit, MILLISECONDS, NANOSECONDS }
-
 package object rdbloader {
 
   // RDB Loader's algebra defines hierarchy with three types common for all modules
@@ -120,12 +118,4 @@ package object rdbloader {
   }
 
   implicit val catsClockManifestInstance: Clock[ManifestE] = Clock.create[ManifestE]
-
-  implicit val catsClockIdInstance: Clock[Id] = new Clock[Id] {
-    override def realTime(unit: TimeUnit): Id[Long] =
-      unit.convert(System.nanoTime(), NANOSECONDS)
-
-    override def monotonic(unit: TimeUnit): Id[Long] =
-      unit.convert(System.currentTimeMillis(), MILLISECONDS)
-  }
 }
diff --git a/src/main/scala/com/snowplowanalytics/snowplow/rdbloader/utils/Common.scala b/src/main/scala/com/snowplowanalytics/snowplow/rdbloader/utils/Common.scala
index 50ad571a2..3f2cc136d 100644
--- a/src/main/scala/com/snowplowanalytics/snowplow/rdbloader/utils/Common.scala
+++ b/src/main/scala/com/snowplowanalytics/snowplow/rdbloader/utils/Common.scala
@@ -13,9 +13,6 @@ package com.snowplowanalytics.snowplow.rdbloader
 package utils
 
-import scala.reflect.runtime.universe._
-import scala.reflect.runtime.{universe => ru}
-
 import cats.data._
 import cats.implicits._
 
@@ -86,74 +83,6 @@ object Common {
       .toLowerCase
 
-  /**
-   * Common trait for all ADTs that can be read from string
-   * Must be extended by sealed hierarchy including only singletons
-   * Used by `decodeStringEnum` to get runtime representation of whole ADT
-   */
-  trait StringEnum {
-    /**
-     * **IN** string representation.
-     * It should be used only to help read `StringEnum` from string
-     * and never other way round, such as render value into SQL statement
-     */
-    def asString: String
-  }
-
-  /**
-   * Derive decoder for ADT with `StringEnum`
-   *
-   * @tparam A sealed hierarchy
-   * @return circe decoder for ADT `A`
-   */
-  def decodeStringEnum[A <: StringEnum: TypeTag]: Decoder[A] =
-    Decoder.instance(parseEnum[A])
-
-  /**
-   * Parse element of `StringEnum` sealed hierarchy from circe AST
-   *
-   * @param hCursor parser's cursor
-   * @tparam A sealed hierarchy
-   * @return either successful circe AST or decoding failure
-   */
-  private def parseEnum[A <: StringEnum: TypeTag](hCursor: HCursor): Decoder.Result[A] = {
-    for {
-      string <- hCursor.as[String]
-      method  = fromString[A](string)
-      result <- method.asDecodeResult(hCursor)
-    } yield result
-  }
-
-  /**
-   * Parse element of `StringEnum` sealed hierarchy from String
-   *
-   * @param string line containing `asString` representation of `StringEnum`
-   * @tparam A sealed hierarchy
-   * @return either successful circe AST or decoding failure
-   */
-  def fromString[A <: StringEnum: TypeTag](string: String): Either[String, A] = {
-    val map = sealedDescendants[A].map { o => (o.asString, o) }.toMap
-    map.get(string) match {
-      case Some(a) => Right(a)
-      case None => Left(s"Unknown ${typeOf[A].typeSymbol.name.toString} [$string]")
-    }
-  }
-
-  /**
-   * Get all objects extending some sealed hierarchy
-   * @tparam Root some sealed trait with object descendants
-   * @return whole set of objects
-   */
-  def sealedDescendants[Root: TypeTag]: Set[Root] = {
-    val symbol = typeOf[Root].typeSymbol
-    val internal = symbol.asInstanceOf[scala.reflect.internal.Symbols#Symbol]
-    val descendants = if (internal.isSealed)
-      Some(internal.sealedDescendants.map(_.asInstanceOf[Symbol]) - symbol)
-    else None
-    descendants.getOrElse(Set.empty).map(x => getCaseObject(x).asInstanceOf[Root])
-  }
-
-  private val m = ru.runtimeMirror(getClass.getClassLoader)
 
   /** Registry embedded into RDB Loader jar */
   private val loaderRefConf = Registry.Config("RDB Loader Embedded", 0, List("com.snowplowanalytics.snowplow.rdbloader"))
@@ -163,16 +92,6 @@ object Common {
   val DefaultClient: Client[ManifestE, Json] =
     Client(Resolver(List(ManifestRegistry, LoaderRegistry), None), CirceValidator)
 
-  /**
-   * Reflection method to get runtime object by compiler's `Symbol`
-   * @param desc compiler runtime `Symbol`
-   * @return "real" scala case object
-   */
-  private def getCaseObject(desc: Symbol): Any = {
-    val mod = m.staticModule(desc.asClass.fullName)
-    m.reflectModule(mod).instance
-  }
-
   /**
    * Syntax extension to transform `Either` with string as failure
   * into circe-appropriate decoder result
diff --git a/src/test/scala/com/snowplowanalytics/snowplow/rdbloader/SpecHelpers.scala b/src/test/scala/com/snowplowanalytics/snowplow/rdbloader/SpecHelpers.scala
index ef72898a9..1cff8b228 100644
--- a/src/test/scala/com/snowplowanalytics/snowplow/rdbloader/SpecHelpers.scala
+++ b/src/test/scala/com/snowplowanalytics/snowplow/rdbloader/SpecHelpers.scala
@@ -24,8 +24,10 @@ import com.snowplowanalytics.iglu.client.Resolver
 import com.snowplowanalytics.iglu.core.SelfDescribingData
 import com.snowplowanalytics.iglu.core.circe.implicits._
 
+import common.StorageTarget
+
 import S3.Folder.{coerce => s3}
-import config.{ SnowplowConfig, Semver, StorageTarget }
+import config.{ SnowplowConfig, Semver }
 import config.Semver._
 import config.SnowplowConfig._
 import loaders.Common.SqlString
@@ -40,6 +42,10 @@ object SpecHelpers {
   val resolverJson = parse(new String(java.util.Base64.getDecoder.decode(resolverConfig))).getOrElse(throw new RuntimeException("Invalid resolver.json"))
   val resolver = Resolver.parse[Id](resolverJson).toOption.getOrElse(throw new RuntimeException("Invalid resolver config"))
 
+//  val staticRegistryUri = "http://iglucentral-dev.com.s3-website-us-east-1.amazonaws.com/feature/rdb-blacklist"
+//  val staticRegistry = Registry.Http(Registry.Config("Test registry", 0, Nil), Registry.HttpConnection(java.net.URI.create(staticRegistryUri), None))
+//  val resolver = Resolver(List(staticRegistry), None)
+
   val targetStream = getClass.getResourceAsStream("/valid-redshift.json.base64")
   val target = fromInputStream(targetStream).getLines.mkString("\n")
 
@@ -71,7 +77,6 @@ object SpecHelpers {
       Monitoring(Map(),Logging(DebugLevel),Some(SnowplowMonitoring(Some(GetMethod),Some("batch-pipeline"),Some("snplow.acme.com")))))
 
   val disableSsl = StorageTarget.RedshiftJdbc.empty.copy(ssl = Some(false))
-  val enableSsl = StorageTarget.RedshiftJdbc.empty.copy(ssl = Some(true))
 
   val validTarget = StorageTarget.RedshiftConfig(
     "e17c0ded-eee7-4845-a7e6-8fdc88d599d0",
@@ -87,6 +92,7 @@ object SpecHelpers {
     1,
     20000,
     None,
+    None,
     None)
 
   val validTargetWithManifest = StorageTarget.RedshiftConfig(
@@ -104,7 +110,8 @@
     20000,
     None,
     Some(StorageTarget.ProcessingManifestConfig(
-      StorageTarget.ProcessingManifestConfig.AmazonDynamoDbConfig("some-manifest")))
+      StorageTarget.ProcessingManifestConfig.AmazonDynamoDbConfig("some-manifest"))),
+    None
   )
 
   /**
diff --git a/src/test/scala/com/snowplowanalytics/snowplow/rdbloader/loaders/CommonSpec.scala b/src/test/scala/com/snowplowanalytics/snowplow/rdbloader/loaders/CommonSpec.scala
index f68eb9e5f..4c6977d04 100644
--- a/src/test/scala/com/snowplowanalytics/snowplow/rdbloader/loaders/CommonSpec.scala
+++ b/src/test/scala/com/snowplowanalytics/snowplow/rdbloader/loaders/CommonSpec.scala
@@ -17,9 +17,11 @@ import cats._
 import org.specs2.Specification
 
 // This project
+import common.StorageTarget
+
 import S3.Folder
 import discovery.DataDiscovery
-import config.{ Step, StorageTarget }
+import config.Step
 
 class CommonSpec extends Specification { def is = s2"""
   Check that SSH tunnel gets open and closed if necessary $e1
@@ -54,6 +56,7 @@ class CommonSpec extends Specification { def is = s2"""
       100,
       1000L,
       Some(TunnelInput),
+      None,
      None)
 
    def interpreter: LoaderA ~> Id = new (LoaderA ~> Id) {
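Note on the `StringEnum` relocation touched throughout this patch: the trait and its reflection-based helpers (`decodeStringEnum`, `sealedDescendants`) removed from `utils.Common` above are now referenced as `common.StringEnum`, so existing decoders such as `decodeOutputCompression` keep the same call shape. A minimal usage sketch, assuming the relocated `common.StringEnum` exposes the same members shown being removed here; the `Compression` ADT and its values are purely illustrative, not part of the codebase:

import io.circe.{Decoder, Json}
import com.snowplowanalytics.snowplow.rdbloader.common.StringEnum

// Hypothetical sealed hierarchy of case objects, each providing its "in" string form
sealed trait Compression extends StringEnum
case object GzipCompression extends Compression { def asString = "GZIP" }
case object NoneCompression extends Compression { def asString = "NONE" }

object Compression {
  // sealedDescendants enumerates the case objects via runtime reflection and the
  // derived decoder matches the incoming JSON string against each asString value
  implicit val compressionDecoder: Decoder[Compression] =
    StringEnum.decodeStringEnum[Compression]
}

// Json.fromString("GZIP").as[Compression]  // Right(GzipCompression)
// Json.fromString("LZO").as[Compression]   // decoding failure: "Unknown Compression [LZO]"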