From 3b012889415c6afc5624ad5d1d9bab964d69a381 Mon Sep 17 00:00:00 2001
From: Anton Parkhomenko
Date: Mon, 15 Jan 2018 19:19:06 +0700
Subject: [PATCH] Common: use processing manifest (close #81)

---
 build.sbt                                          |   8 +-
 project/BuildSettings.scala                        |  16 +-
 project/Dependencies.scala                         |   2 +
 .../spark/DynamodbManifest.scala                   |  37 ++++
 .../spark/ShredJob.scala                           |  80 +++++--
 .../spark/ShredJobConfig.scala                     |   6 +-
 .../spark/StringSetAccumulator.scala               |  59 +++++
 .../spark/singleton.scala                          |  37 ++--
 .../ShredJobSpec.scala                             |   6 +-
 .../snowplow/rdbloader/LoaderA.scala               |   8 +-
 .../snowplow/rdbloader/LoaderError.scala           |  15 ++
 .../snowplow/rdbloader/S3.scala                    |   4 +-
 .../rdbloader/config/StorageTarget.scala           |  92 +++++---
 .../{ => discovery}/DataDiscovery.scala            |  73 ++++---
 .../discovery/ManifestDiscovery.scala              | 171 +++++++++++++++
 .../{ => discovery}/ShreddedType.scala             |  28 ++-
 .../interpreters/DryRunInterpreter.scala           |  18 +-
 .../interpreters/RealWorldInterpreter.scala        |  18 +-
 .../implementations/ManifestInterpreter.scala      |  50 +++++
 .../implementations/PgInterpreter.scala            |   2 +-
 .../snowplow/rdbloader/loaders/Common.scala        |  15 +-
 .../rdbloader/loaders/PostgresqlLoader.scala       |   1 +
 .../loaders/RedshiftLoadStatements.scala           |  13 +-
 .../rdbloader/loaders/RedshiftLoader.scala         |   1 +
 .../snowplow/rdbloader/package.scala               |  29 ++-
 .../snowplow/rdbloader/SpecHelpers.scala           |  61 ++++++
 .../rdbloader/config/CliConfigSpec.scala           |   2 +-
 .../rdbloader/config/StorageTargetSpec.scala       |   6 +-
 .../{ => discovery}/DataDiscoverySpec.scala        |  19 +-
 .../discovery/ManifestDiscoverySpec.scala          | 201 ++++++++++++++++++
 .../{ => discovery}/ShreddedTypeSpec.scala         |  11 +-
 .../rdbloader/loaders/CommonSpec.scala             |   4 +-
 .../loaders/RedshiftLoaderSpec.scala               |   2 +-
 33 files changed, 953 insertions(+), 142 deletions(-)
 create mode 100644 shredder/src/main/scala/com.snowplowanalytics.snowplow.storage/spark/DynamodbManifest.scala
 create mode 100644 shredder/src/main/scala/com.snowplowanalytics.snowplow.storage/spark/StringSetAccumulator.scala
 rename src/main/scala/com/snowplowanalytics/snowplow/rdbloader/{ => discovery}/DataDiscovery.scala (89%)
 create mode 100644 src/main/scala/com/snowplowanalytics/snowplow/rdbloader/discovery/ManifestDiscovery.scala
 rename src/main/scala/com/snowplowanalytics/snowplow/rdbloader/{ => discovery}/ShreddedType.scala (89%)
 create mode 100644 src/main/scala/com/snowplowanalytics/snowplow/rdbloader/interpreters/implementations/ManifestInterpreter.scala
 rename src/test/scala/com/snowplowanalytics/snowplow/rdbloader/{ => discovery}/DataDiscoverySpec.scala (97%)
 create mode 100644 src/test/scala/com/snowplowanalytics/snowplow/rdbloader/discovery/ManifestDiscoverySpec.scala
 rename src/test/scala/com/snowplowanalytics/snowplow/rdbloader/{ => discovery}/ShreddedTypeSpec.scala (97%)

diff --git a/build.sbt b/build.sbt
index aa09bdbf8..5172a59b3 100755
--- a/build.sbt
+++ b/build.sbt
@@ -1,5 +1,5 @@
 /*
- * Copyright (c) 2012-2017 Snowplow Analytics Ltd. All rights reserved.
+ * Copyright (c) 2012-2018 Snowplow Analytics Ltd. All rights reserved.
  *
  * This program is licensed to you under the Apache License Version 2.0,
  * and you may not use this file except in compliance with the Apache License Version 2.0.
@@ -19,7 +19,7 @@ lazy val loader = project.in(file(".")) mainClass in Compile := Some("com.snowplowanalytics.snowplow.rdbloader.Main") ) .settings(BuildSettings.buildSettings) - .settings(BuildSettings.scalifySettings) + .settings(BuildSettings.scalifySettings(name in shredder, version in shredder)) .settings(BuildSettings.assemblySettings) .settings(resolvers ++= Dependencies.resolutionRepos) .settings( @@ -34,12 +34,14 @@ lazy val loader = project.in(file(".")) Dependencies.circeYaml, Dependencies.circeGeneric, Dependencies.circeGenericExtra, + Dependencies.manifest, Dependencies.postgres, Dependencies.redshift, Dependencies.redshiftSdk, Dependencies.s3, Dependencies.ssm, + Dependencies.dynamodb, Dependencies.jSch, Dependencies.specs2, @@ -58,6 +60,7 @@ lazy val shredder = project.in(file("shredder")) .settings(BuildSettings.buildSettings) .settings(resolvers ++= Dependencies.resolutionRepos) .settings(BuildSettings.shredderAssemblySettings) + .settings(BuildSettings.scalifySettings(name, version)) .settings( libraryDependencies ++= Seq( // Java @@ -70,6 +73,7 @@ lazy val shredder = project.in(file("shredder")) Dependencies.scopt, Dependencies.commonEnrich, Dependencies.igluClient, + Dependencies.manifest, // Scala (test only) Dependencies.specs2 ) diff --git a/project/BuildSettings.scala b/project/BuildSettings.scala index 34d0c4a7b..d15ce0a1c 100644 --- a/project/BuildSettings.scala +++ b/project/BuildSettings.scala @@ -56,7 +56,7 @@ object BuildSettings { "-target", "1.8" ), - addCompilerPlugin("org.spire-math" % "kind-projector" % "0.9.4" cross CrossVersion.binary) + addCompilerPlugin("org.spire-math" % "kind-projector" % "0.9.6" cross CrossVersion.binary) ) // sbt-assembly settings @@ -102,8 +102,8 @@ object BuildSettings { case x if x.startsWith("META-INF") => MergeStrategy.discard case x if x.endsWith(".html") => MergeStrategy.discard case x if x.endsWith("package-info.class") => MergeStrategy.first - case PathList("com", "google", "common", tail@_*) => MergeStrategy.first - case PathList("org", "apache", "spark", "unused", tail@_*) => MergeStrategy.first + case PathList("com", "google", "common", _) => MergeStrategy.first + case PathList("org", "apache", "spark", "unused", _) => MergeStrategy.first case x => val oldStrategy = (assemblyMergeStrategy in assembly).value oldStrategy(x) @@ -113,17 +113,21 @@ object BuildSettings { /** * Makes package (build) metadata available withing source code */ - lazy val scalifySettings = Seq( + def scalifySettings(shredderName: SettingKey[String], shredderVersion: SettingKey[String]) = Seq( sourceGenerators in Compile += Def.task { val file = (sourceManaged in Compile).value / "settings.scala" IO.write(file, """package com.snowplowanalytics.snowplow.rdbloader.generated |object ProjectMetadata { | val version = "%s" - | val name = "%s" + | val name = "%s" // DO NOT EDIT! Processing Manifest depends on it | val organization = "%s" | val scalaVersion = "%s" + | + | val shredderName = "%s" // DO NOT EDIT! 
Processing Manifest depends on it + | val shredderVersion = "%s" |} - |""".stripMargin.format(version.value, name.value, organization.value, scalaVersion.value)) + |""".stripMargin.format( + version.value,name.value, organization.value, scalaVersion.value, shredderName.value, shredderVersion.value)) Seq(file) }.taskValue ) diff --git a/project/Dependencies.scala b/project/Dependencies.scala index 2d512966f..53cf48fc0 100644 --- a/project/Dependencies.scala +++ b/project/Dependencies.scala @@ -24,6 +24,7 @@ object Dependencies { val circeYaml = "0.7.0" val circe = "0.9.0" val cats = "1.0.1" + val manifest = "0.1.0-SNAPSHOT" // Scala (Shredder) val spark = "2.2.0" @@ -62,6 +63,7 @@ object Dependencies { val igluClient = "com.snowplowanalytics" %% "iglu-scala-client" % V.igluClient val igluCore = "com.snowplowanalytics" %% "iglu-core" % V.igluCore intransitive() val scalaTracker = "com.snowplowanalytics" %% "snowplow-scala-tracker" % V.scalaTracker + val manifest = "com.snowplowanalytics" %% "processing-manifest" % V.manifest val cats = "org.typelevel" %% "cats" % V.cats val catsFree = "org.typelevel" %% "cats-free" % V.cats val circeCore = "io.circe" %% "circe-core" % V.circe diff --git a/shredder/src/main/scala/com.snowplowanalytics.snowplow.storage/spark/DynamodbManifest.scala b/shredder/src/main/scala/com.snowplowanalytics.snowplow.storage/spark/DynamodbManifest.scala new file mode 100644 index 000000000..8fceb43ab --- /dev/null +++ b/shredder/src/main/scala/com.snowplowanalytics.snowplow.storage/spark/DynamodbManifest.scala @@ -0,0 +1,37 @@ +/* + * Copyright (c) 2012-2018 Snowplow Analytics Ltd. All rights reserved. + * + * This program is licensed to you under the Apache License Version 2.0, + * and you may not use this file except in compliance with the Apache License Version 2.0. + * You may obtain a copy of the Apache License Version 2.0 at + * http://www.apache.org/licenses/LICENSE-2.0. + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the Apache License Version 2.0 is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the Apache License Version 2.0 for the specific language governing permissions and + * limitations there under. 
+ */
+package com.snowplowanalytics.snowplow.storage.spark
+
+import cats.implicits._
+
+import com.amazonaws.services.dynamodbv2.AmazonDynamoDBClientBuilder
+
+import com.snowplowanalytics.manifest.core.ProcessingManifest._
+import com.snowplowanalytics.manifest.dynamodb.DynamoDbManifest
+import com.snowplowanalytics.snowplow.rdbloader.generated.ProjectMetadata
+
+object DynamodbManifest {
+
+  type ManifestFailure[A] = Either[ManifestError, A]
+
+  val ShredJobApplication = Application(ProjectMetadata.name, ProjectMetadata.version, None)
+
+  val ShreddedTypesKeys = "processed:shredder:types"
+
+  def initialize(tableName: String) = {
+    val client = AmazonDynamoDBClientBuilder.standard().build()
+    DynamoDbManifest[ManifestFailure](client, tableName)
+  }
+}
diff --git a/shredder/src/main/scala/com.snowplowanalytics.snowplow.storage/spark/ShredJob.scala b/shredder/src/main/scala/com.snowplowanalytics.snowplow.storage/spark/ShredJob.scala
index 28e304eed..98e19bc65 100644
--- a/shredder/src/main/scala/com.snowplowanalytics.snowplow.storage/spark/ShredJob.scala
+++ b/shredder/src/main/scala/com.snowplowanalytics.snowplow.storage/spark/ShredJob.scala
@@ -19,6 +19,7 @@ package storage.spark
 import java.io.{PrintWriter, StringWriter}
 import java.util.UUID
+import scala.util.Try
 import scala.util.control.NonFatal
 // Jackson
@@ -37,6 +38,10 @@ import Scalaz._
 // AWS SDK
 import com.amazonaws.services.dynamodbv2.model.ProvisionedThroughputExceededException
+// Manifest
+import com.snowplowanalytics.manifest.core.ProcessingManifest
+import com.snowplowanalytics.manifest.core.ProcessingManifest._
+
 // Snowplow
 import iglu.client.{JsonSchemaPair, ProcessingMessageNel, Resolver, SchemaKey}
 import iglu.client.validation.ProcessingMessageMethods._
@@ -69,17 +74,47 @@ object ShredJob extends SparkJob {
     classOf[org.apache.spark.sql.execution.datasources.FileFormatWriter$WriteTaskResult]
   )
   override def sparkConfig(): SparkConf = new SparkConf()
-    .setAppName(getClass().getSimpleName())
+    .setAppName(getClass.getSimpleName)
     .setIfMissing("spark.master", "local[*]")
-    .set("spark.serializer", classOf[KryoSerializer].getName())
+    .set("spark.serializer", classOf[KryoSerializer].getName)
     .registerKryoClasses(classesToRegister)
   override def run(spark: SparkSession, args: Array[String]): Unit = {
-    val job = ShredJob(spark, args)
-    job.run()
+    // Job configuration
+    val shredConfig = ShredJobConfig
+      .loadConfigFrom(args)
+      .valueOr(e => throw new FatalEtlError(e.toString))
+
+    val job = new ShredJob(spark, shredConfig)
+
+    // Processing manifest, existing only on a driver
+    val manifest = shredConfig.dynamodbManifestTable.map(DynamodbManifest.initialize)
+    runJob(manifest, shredConfig.inFolder, job)
   }
-  def apply(spark: SparkSession, args: Array[String]) = new ShredJob(spark, args)
+  /** Start a job, if necessary recording process to manifest */
+  def runJob(manifest: Option[ProcessingManifest[Either[ManifestError, ?]]],
+             path: String,
+             job: ShredJob): Try[Unit] = {
+    manifest match {
+      case None =>       // Manifest is not enabled, simply run a job
+        Try(job.run()).map(_ => None)
+      case Some(m) =>    // Manifest is enabled.
+ // Envelope job into lazy function to pass to `Manifest.processItem` + val process = () => Try { + job.run() + val shreddedTypes = job.shreddedTypes.value.toSet + val payload: Payload = Payload.empty.copy(set = Map(DynamodbManifest.ShreddedTypesKeys -> shreddedTypes)) + Some(payload) + } + + m.processNewItem(path, DynamodbManifest.ShredJobApplication, process) match { + case Right(_) => util.Success(()) + case Left(ManifestError.ApplicationError(t, _, _)) => util.Failure(t) + case Left(error) => throw new FatalEtlError(error.toString) + } + } + } /** * Pipeline the loading of raw lines into shredded JSONs. @@ -238,7 +273,7 @@ object ShredJob extends SparkJob { */ def getAlteredEnrichedOutputPath(outFolder: String): String = { val alteredEnrichedEventSubdirectory = "atomic-events" - s"${outFolder}${if (outFolder.endsWith("/")) "" else "/"}${alteredEnrichedEventSubdirectory}" + s"$outFolder${if (outFolder.endsWith("/")) "" else "/"}$alteredEnrichedEventSubdirectory" } /** @@ -248,7 +283,7 @@ object ShredJob extends SparkJob { */ def getShreddedTypesOutputPath(outFolder: String): String = { val shreddedTypesSubdirectory = "shredded-types" - s"${outFolder}${if (outFolder.endsWith("/")) "" else "/"}${shreddedTypesSubdirectory}" + s"$outFolder${if (outFolder.endsWith("/")) "" else "/"}$shreddedTypesSubdirectory" } /** @@ -308,16 +343,16 @@ case class Event(eventId: String, newEventId: Option[String], shredded: Shredded /** * The Snowplow Shred job, written in Spark. * @param spark Spark session used throughout the job - * @param args Command line arguments for the shred job + * @param shredConfig parsed command-line arguments */ -class ShredJob(@transient val spark: SparkSession, args: Array[String]) extends Serializable { +class ShredJob(@transient val spark: SparkSession, shredConfig: ShredJobConfig) extends Serializable { @transient private val sc: SparkContext = spark.sparkContext import spark.implicits._ import singleton._ - // Job configuration - private val shredConfig = ShredJobConfig.loadConfigFrom(args) - .valueOr(e => throw new FatalEtlError(e.toString)) + // Accumulator to track shredded types + val shreddedTypes = new StringSetAccumulator + sc.register(shreddedTypes) private val dupStorageConfig = DuplicateStorage.DynamoDbConfig.extract( shredConfig.duplicateStorageConfig.success, @@ -328,6 +363,18 @@ class ShredJob(@transient val spark: SparkSession, args: Array[String]) extends // the table if it doesn't exist @transient private val _ = DuplicateStorageSingleton.get(dupStorageConfig) + /** Save set of found shredded types into accumulator if processing manifest is enabled */ + def recordShreddedType(jsonSchemaPairs: List[JsonSchemaPair]): Unit = { + if (shredConfig.duplicateStorageConfig.isEmpty) { + () + } else { + val typesSet = jsonSchemaPairs.toSet[JsonSchemaPair].map { + case (schemaKey, _) => schemaKey.toSchemaUri + } + shreddedTypes.add(typesSet) + } + } + /** * Runs the shred job by: * - shredding the Snowplow enriched events @@ -354,13 +401,17 @@ class ShredJob(@transient val spark: SparkSession, args: Array[String]) extends val good = common .flatMap { case (line, shredded) => projectGoods(shredded).map((_, line)) } .map { case (shred, line) => Shredded(shred._1, shred._2, shred._3, shred._4, line) } - .groupBy(s => (s.eventId, s.eventFingerprint)) + .groupBy { s => (s.eventId, s.eventFingerprint) } .flatMap { case (_, vs) => vs.take(1) } .map { s => val absent = dedupeCrossBatch((s.eventId, s.eventFingerprint, s.etlTstamp), 
DuplicateStorageSingleton.get(dupStorageConfig)) (s, absent) } + .map { case original @ (Shredded(_, _, shreds, _, _), _) => + recordShreddedType(shreds) + original + } .cache() // Deduplication operation succeeded @@ -373,7 +424,7 @@ class ShredJob(@transient val spark: SparkSession, args: Array[String]) extends // Count synthetic duplicates, defined as events with the same id but different fingerprints val syntheticDupes = dupeSucceeded - .map(_._1) + .map { case (shredded, _) => shredded } .groupBy(_.eventId) .filter { case (_, vs) => vs.size > 1 } .map { case (k, vs) => (k, vs.size.toLong) } @@ -398,6 +449,7 @@ class ShredJob(@transient val spark: SparkSession, args: Array[String]) extends case Failure(m) => Some(Row(BadRow(s.originalLine, m).toCompactJson)) case _ => None } } + spark.createDataFrame(badDupes, StructType(StructField("_", StringType, true) :: Nil)) .write .mode(SaveMode.Overwrite) diff --git a/shredder/src/main/scala/com.snowplowanalytics.snowplow.storage/spark/ShredJobConfig.scala b/shredder/src/main/scala/com.snowplowanalytics.snowplow.storage/spark/ShredJobConfig.scala index 32ded0dbc..a8273a59b 100644 --- a/shredder/src/main/scala/com.snowplowanalytics.snowplow.storage/spark/ShredJobConfig.scala +++ b/shredder/src/main/scala/com.snowplowanalytics.snowplow.storage/spark/ShredJobConfig.scala @@ -36,7 +36,8 @@ case class ShredJobConfig( outFolder: String = "", badFolder: String = "", igluConfig: String = "", - duplicateStorageConfig: Option[String] = None + duplicateStorageConfig: Option[String] = None, + dynamodbManifestTable: Option[String] = None ) object ShredJobConfig { @@ -57,6 +58,9 @@ object ShredJobConfig { opt[String]("duplicate-storage-config").optional().valueName(" c.copy(duplicateStorageConfig = Some(d))) .text("Duplicate storage configuration") + opt[String]("processing-manifest-table").optional().valueName("") + .action((d, c) => c.copy(dynamodbManifestTable = Some(d))) + .text("Processing manifest table") help("help").text("Prints this usage text") } diff --git a/shredder/src/main/scala/com.snowplowanalytics.snowplow.storage/spark/StringSetAccumulator.scala b/shredder/src/main/scala/com.snowplowanalytics.snowplow.storage/spark/StringSetAccumulator.scala new file mode 100644 index 000000000..2c02d5d1f --- /dev/null +++ b/shredder/src/main/scala/com.snowplowanalytics.snowplow.storage/spark/StringSetAccumulator.scala @@ -0,0 +1,59 @@ +/* + * Copyright (c) 2012-2018 Snowplow Analytics Ltd. All rights reserved. + * + * This program is licensed to you under the Apache License Version 2.0, + * and you may not use this file except in compliance with the Apache License Version 2.0. + * You may obtain a copy of the Apache License Version 2.0 at http://www.apache.org/licenses/LICENSE-2.0. + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the Apache License Version 2.0 is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the Apache License Version 2.0 for the specific language governing permissions and limitations there under. 
+ */ +package com.snowplowanalytics.snowplow.storage.spark + +import org.apache.spark.util.AccumulatorV2 + +import scala.collection.mutable + +import StringSetAccumulator._ + +class StringSetAccumulator extends AccumulatorV2[KeyAccum, KeyAccum] { + + private val accum = mutable.Set.empty[String] + + def merge(other: AccumulatorV2[KeyAccum, KeyAccum]): Unit = other match { + case o: StringSetAccumulator => accum ++= o.accum + case _ => throw new UnsupportedOperationException( + s"Cannot merge ${this.getClass.getName} with ${other.getClass.getName}") + } + + def isZero: Boolean = accum.isEmpty + + def copy(): AccumulatorV2[KeyAccum, KeyAccum] = { + val newAcc = new StringSetAccumulator + accum.synchronized { + newAcc.accum ++= accum + } + newAcc + } + + def value = accum + + def add(keys: KeyAccum): Unit = { + accum ++= keys + } + + def add(keys: Set[String]): Unit = { + val mutableSet = mutable.Set(keys.toList: _*) + add(mutableSet) + } + + def reset(): Unit = { + accum.clear() + } +} + +object StringSetAccumulator { + type KeyAccum = mutable.Set[String] +} diff --git a/shredder/src/main/scala/com.snowplowanalytics.snowplow.storage/spark/singleton.scala b/shredder/src/main/scala/com.snowplowanalytics.snowplow.storage/spark/singleton.scala index 1b7a3cf25..7a621cbe3 100644 --- a/shredder/src/main/scala/com.snowplowanalytics.snowplow.storage/spark/singleton.scala +++ b/shredder/src/main/scala/com.snowplowanalytics.snowplow.storage/spark/singleton.scala @@ -12,22 +12,28 @@ * See the Apache License Version 2.0 for the specific language governing permissions and * limitations there under. */ -package com.snowplowanalytics -package snowplow +package com.snowplowanalytics.snowplow package storage.spark -// Jackson +// Iglu +import com.snowplowanalytics.iglu.client.Resolver import com.fasterxml.jackson.databind.JsonNode +import com.github.fge.jsonschema.core.report.ProcessingMessage -// Snowplow -import iglu.client.Resolver +// SCE import enrich.common.{FatalEtlError, ValidatedNelMessage} +// This project +import utils.base64.base64ToJsonNode + /** Singletons needed for unserializable or stateful classes. */ object singleton { + /** Singleton for Iglu's Resolver to maintain one Resolver per node. */ object ResolverSingleton { + @volatile private var instance: Resolver = _ + /** * Retrieve or build an instance of Iglu's Resolver. * @param igluConfig JSON representing the Iglu configuration @@ -49,28 +55,31 @@ object singleton { * @param igluConfig JSON representing the Iglu resolver * @return A Resolver or one or more error messages boxed in a Scalaz ValidationNel */ - private[spark] def getIgluResolver(igluConfig: String): ValidatedNelMessage[Resolver] = + private[spark] def getIgluResolver(igluConfig: String): ValidatedNelMessage[Resolver] = { + val json = base64ToJsonNode(igluConfig, "iglu") + for { - node <- (utils.base64.base64ToJsonNode(igluConfig, "iglu") - .toValidationNel: ValidatedNelMessage[JsonNode]) - reso <- Resolver.parse(node) - } yield reso + node <- json.toValidationNel[ProcessingMessage, JsonNode] + resolver <- Resolver.parse(node) + } yield resolver + } } /** Singleton for DuplicateStorage to maintain one per node. */ object DuplicateStorageSingleton { + import DuplicateStorage._ + @volatile private var instance: Option[DuplicateStorage] = _ + /** * Retrieve or build an instance of DuplicateStorage. 
* @param dupStorageConfig configuration for DuplicateStorage */ - def get( - dupStorageConfig: Option[DuplicateStorage.DuplicateStorageConfig] - ): Option[DuplicateStorage] = { + def get(dupStorageConfig: Option[DuplicateStorageConfig]): Option[DuplicateStorage] = { if (instance == null) { synchronized { if (instance == null) { - instance = dupStorageConfig.map(DuplicateStorage.initStorage) match { + instance = dupStorageConfig.map(initStorage) match { case Some(v) => v.fold(e => throw new FatalEtlError(e.toString), c => Some(c)) case None => None } diff --git a/shredder/src/test/scala/com.snowplowanalytics.snowplow.storage.spark/ShredJobSpec.scala b/shredder/src/test/scala/com.snowplowanalytics.snowplow.storage.spark/ShredJobSpec.scala index f13038b84..e0734d3a1 100644 --- a/shredder/src/test/scala/com.snowplowanalytics.snowplow.storage.spark/ShredJobSpec.scala +++ b/shredder/src/test/scala/com.snowplowanalytics.snowplow.storage.spark/ShredJobSpec.scala @@ -267,7 +267,11 @@ trait ShredJobSpec extends SparkSpec { Array.empty[String] } - val job = ShredJob(spark, config ++ dedupeConfig) + val shredJobConfig = ShredJobConfig + .loadConfigFrom(config ++ dedupeConfig) + .fold(e => throw new RuntimeException(s"Cannot parse test configuration: $e"), c => c) + + val job = new ShredJob(spark, shredJobConfig) job.run() deleteRecursively(input) } diff --git a/src/main/scala/com/snowplowanalytics/snowplow/rdbloader/LoaderA.scala b/src/main/scala/com/snowplowanalytics/snowplow/rdbloader/LoaderA.scala index f9ff3f55d..331b55070 100644 --- a/src/main/scala/com/snowplowanalytics/snowplow/rdbloader/LoaderA.scala +++ b/src/main/scala/com/snowplowanalytics/snowplow/rdbloader/LoaderA.scala @@ -1,5 +1,5 @@ /* - * Copyright (c) 2012-2017 Snowplow Analytics Ltd. All rights reserved. + * Copyright (c) 2012-2018 Snowplow Analytics Ltd. All rights reserved. * * This program is licensed to you under the Apache License Version 2.0, * and you may not use this file except in compliance with the Apache License Version 2.0. 
@@ -18,6 +18,8 @@ import cats.free.Free import cats.data.EitherT import cats.implicits._ +import com.snowplowanalytics.manifest.core.ProcessingManifest.Item + // This library import Security.Tunnel import loaders.Common.SqlString @@ -34,6 +36,7 @@ object LoaderA { case class ListS3(bucket: S3.Folder) extends LoaderA[Either[LoaderError, List[S3.Key]]] case class KeyExists(key: S3.Key) extends LoaderA[Boolean] case class DownloadData(path: S3.Folder, dest: Path) extends LoaderA[Either[LoaderError, List[Path]]] + case class ManifestDiscover(predicate: Item => Boolean) extends LoaderA[Either[LoaderError, List[Item]]] // Loading ops case class ExecuteQuery(query: SqlString) extends LoaderA[Either[LoaderError, Long]] @@ -73,6 +76,9 @@ object LoaderA { def downloadData(source: S3.Folder, dest: Path): Action[Either[LoaderError, List[Path]]] = Free.liftF[LoaderA, Either[LoaderError, List[Path]]](DownloadData(source, dest)) + /** Discover data from manifest */ + def manifestDiscover(predicate: Item => Boolean): Action[Either[LoaderError, List[Item]]] = + Free.liftF[LoaderA, Either[LoaderError, List[Item]]](ManifestDiscover(predicate)) /** Execute single query (against target in interpreter) */ def executeQuery(query: SqlString): Action[Either[LoaderError, Long]] = diff --git a/src/main/scala/com/snowplowanalytics/snowplow/rdbloader/LoaderError.scala b/src/main/scala/com/snowplowanalytics/snowplow/rdbloader/LoaderError.scala index 58a8340a8..82298c21a 100644 --- a/src/main/scala/com/snowplowanalytics/snowplow/rdbloader/LoaderError.scala +++ b/src/main/scala/com/snowplowanalytics/snowplow/rdbloader/LoaderError.scala @@ -13,6 +13,9 @@ package com.snowplowanalytics.snowplow.rdbloader import cats.Show +import cats.data.ValidatedNel + +import com.snowplowanalytics.manifest.core.ProcessingManifest.ManifestError /** * Root error type @@ -118,6 +121,18 @@ object LoaderError { s"Cannot extract contexts or self-describing events from directory [$path].\nInvalid key example: $example. 
Total $invalidKeyCount invalid keys.\nCorrupted shredded/good state or unexpected Snowplow Shred job version" } + case class ManifestFailure(manifestError: ManifestError) extends DiscoveryFailure { + def getMessage: String = manifestError.toString // TODO: come up with meaningful error message + } + + /** Turn non-empty list of discovery failures into top-level `LoaderError` */ + def flattenValidated[A](validated: ValidatedNel[DiscoveryFailure, A]): Either[LoaderError, A] = + validated.leftMap(errors => DiscoveryError(errors.toList): LoaderError).toEither + + + def fromManifestError(manifestError: ManifestError): LoaderError = + DiscoveryError(ManifestFailure(manifestError)) + /** Other errors */ case class LoaderLocalError(message: String) extends LoaderError diff --git a/src/main/scala/com/snowplowanalytics/snowplow/rdbloader/S3.scala b/src/main/scala/com/snowplowanalytics/snowplow/rdbloader/S3.scala index d54dced79..8c91f72e8 100644 --- a/src/main/scala/com/snowplowanalytics/snowplow/rdbloader/S3.scala +++ b/src/main/scala/com/snowplowanalytics/snowplow/rdbloader/S3.scala @@ -36,8 +36,8 @@ object S3 { def parse(s: String): Either[String, Folder] = s match { case _ if !correctlyPrefixed(s) => "Bucket name must start with s3:// prefix".asLeft - case _ if s.length > 1024 => "Key length cannot be more than 1024 symbols".asLeft - case _ => coerce(s).asRight + case _ if s.length > 1024 => "Key length cannot be more than 1024 symbols".asLeft + case _ => coerce(s).asRight } def coerce(s: String): Folder = diff --git a/src/main/scala/com/snowplowanalytics/snowplow/rdbloader/config/StorageTarget.scala b/src/main/scala/com/snowplowanalytics/snowplow/rdbloader/config/StorageTarget.scala index 17551b6a3..e6c78e23e 100644 --- a/src/main/scala/com/snowplowanalytics/snowplow/rdbloader/config/StorageTarget.scala +++ b/src/main/scala/com/snowplowanalytics/snowplow/rdbloader/config/StorageTarget.scala @@ -28,6 +28,8 @@ import com.snowplowanalytics.iglu.core.SchemaKey import com.snowplowanalytics.iglu.client.Resolver import com.snowplowanalytics.iglu.client.validation.ValidatableJValue._ +import com.snowplowanalytics.snowplow.rdbloader.config.StorageTarget.ProcessingManifestConfig + // This project import LoaderError._ import utils.Compat._ @@ -48,10 +50,12 @@ sealed trait StorageTarget extends Product with Serializable { def username: String def password: StorageTarget.PasswordConfig - def eventsTable = + def processingManifest: Option[ProcessingManifestConfig] + + def eventsTable: String = loaders.Common.getEventsTable(schema) - def shreddedTable(tableName: String) = + def shreddedTable(tableName: String): String = s"$schema.$tableName" def purpose: StorageTarget.Purpose @@ -72,53 +76,79 @@ object StorageTarget { case object FailedEvents extends Purpose { def asString = "FAILED_EVENTS" } case object EnrichedEvents extends Purpose { def asString = "ENRICHED_EVENTS" } - implicit val sslModeDecoder = + implicit val sslModeDecoder: Decoder[SslMode] = decodeStringEnum[SslMode] - implicit val purposeDecoder = + implicit val purposeDecoder: Decoder[Purpose] = decodeStringEnum[Purpose] /** - * Redshift config + * Configuration to access Snowplow Processing Manifest + * @param amazonDynamoDb Amazon DynamoDB table, the single available implementation + */ + case class ProcessingManifestConfig(amazonDynamoDb: ProcessingManifestConfig.AmazonDynamoDbConfig) + + object ProcessingManifestConfig { + case class AmazonDynamoDbConfig(tableName: String) + } + + /** + * PostgreSQL config * 
`com.snowplowanalytics.snowplow.storage/postgresql_config/jsonschema/1-1-0` */ - case class PostgresqlConfig( - id: Option[String], - name: String, - host: String, - database: String, - port: Int, - sslMode: SslMode, - schema: String, - username: String, - password: PasswordConfig, - sshTunnel: Option[TunnelConfig]) + case class PostgresqlConfig(id: Option[String], + name: String, + host: String, + database: String, + port: Int, + sslMode: SslMode, + schema: String, + username: String, + password: PasswordConfig, + sshTunnel: Option[TunnelConfig], + processingManifest: Option[ProcessingManifestConfig]) extends StorageTarget { val purpose = EnrichedEvents } /** * Redshift config - * `com.snowplowanalytics.snowplow.storage/redshift_config/jsonschema/2-1-0` + * `com.snowplowanalytics.snowplow.storage/redshift_config/jsonschema/3-0-0` */ - case class RedshiftConfig( - id: Option[String], - name: String, - host: String, - database: String, - port: Int, - sslMode: SslMode, - roleArn: String, - schema: String, - username: String, - password: PasswordConfig, - maxError: Int, - compRows: Long, - sshTunnel: Option[TunnelConfig]) + case class RedshiftConfig(id: Option[String], + name: String, + host: String, + database: String, + port: Int, + sslMode: SslMode, + roleArn: String, + schema: String, + username: String, + password: PasswordConfig, + maxError: Int, + compRows: Long, + sshTunnel: Option[TunnelConfig], + processingManifest: Option[ProcessingManifestConfig]) extends StorageTarget { val purpose = EnrichedEvents } + /** + * All possible JDBC according to Redshift documentation, except deprecated + * and authentication-related + */ + private case class RedshiftJdbc(blockingRows: Option[Int], + disableIsValidQuery: Option[Boolean], + dsiLogLevel: Option[Int], + filterLevel: Option[String], + loginTimeout: Option[Int], + loglevel: Option[Int], + socketTimeout: Option[Int], + ssl: Option[Boolean], + sslRootCert: Option[String], + tcpKeepAlive: Option[Boolean], + tcpKeepAliveMinutes: Option[Int]) + /** Reference to encrypted entity inside EC2 Parameter Store */ case class ParameterStoreConfig(parameterName: String) diff --git a/src/main/scala/com/snowplowanalytics/snowplow/rdbloader/DataDiscovery.scala b/src/main/scala/com/snowplowanalytics/snowplow/rdbloader/discovery/DataDiscovery.scala similarity index 89% rename from src/main/scala/com/snowplowanalytics/snowplow/rdbloader/DataDiscovery.scala rename to src/main/scala/com/snowplowanalytics/snowplow/rdbloader/discovery/DataDiscovery.scala index 6189f6c9b..81a40951d 100644 --- a/src/main/scala/com/snowplowanalytics/snowplow/rdbloader/DataDiscovery.scala +++ b/src/main/scala/com/snowplowanalytics/snowplow/rdbloader/discovery/DataDiscovery.scala @@ -11,14 +11,14 @@ * See the Apache License Version 2.0 for the specific language governing permissions and limitations there under. 
*/ package com.snowplowanalytics.snowplow.rdbloader +package discovery import cats.data._ -import cats.implicits._ import cats.free.Free +import cats.implicits._ -import ShreddedType._ -import LoaderError._ -import config.Semver +import com.snowplowanalytics.snowplow.rdbloader.config.Semver +import com.snowplowanalytics.snowplow.rdbloader.LoaderError._ /** * Result of data discovery in shredded.good folder @@ -67,6 +67,7 @@ sealed trait DataDiscovery extends Product with Serializable { */ object DataDiscovery { + /** Amount of times consistency check will be performed */ val ConsistencyChecks = 5 /** @@ -75,8 +76,9 @@ object DataDiscovery { * `InShreddedGood` results in noop on empty folder */ sealed trait DiscoveryTarget extends Product with Serializable - case class InShreddedGood(folder: S3.Folder) extends DiscoveryTarget + case class Global(folder: S3.Folder) extends DiscoveryTarget case class InSpecificFolder(folder: S3.Folder) extends DiscoveryTarget + case class ViaManifest(folder: Option[S3.Folder]) extends DiscoveryTarget /** * Discovery result that contains only atomic data (list of S3 keys in `atomic-events`) @@ -105,23 +107,29 @@ object DataDiscovery { * (atomic events and shredded types) */ def discoverFull(target: DiscoveryTarget, shredJob: Semver, region: String, assets: Option[S3.Folder]): LoaderAction[List[DataDiscovery]] = { - val validatedDataKeys: LoaderAction[ValidatedDataKeys] = target match { - case InShreddedGood(folder) => - listGoodBucket(folder).map(transformKeys(shredJob, region, assets)) + def group(validatedDataKeys: LoaderAction[ValidatedDataKeys]): LoaderAction[List[DataDiscovery]] = + for { + keys <- validatedDataKeys + discovery <- groupKeysFull(keys) + } yield discovery + + target match { + case Global(folder) => + val keys: LoaderAction[ValidatedDataKeys] = + listGoodBucket(folder).map(transformKeys(shredJob, region, assets)) + group(keys) case InSpecificFolder(folder) => - listGoodBucket(folder).map { keys => - if (keys.isEmpty) { - val failure = Validated.Invalid(NonEmptyList(NoDataFailure(folder), Nil)) - Free.pure(failure) - } else transformKeys(shredJob, region, assets)(keys) - } + val keys: LoaderAction[ValidatedDataKeys] = + listGoodBucket(folder).map { keys => + if (keys.isEmpty) { + val failure = Validated.Invalid(NonEmptyList(NoDataFailure(folder), Nil)) + Free.pure(failure) + } else transformKeys(shredJob, region, assets)(keys) + } + group(keys) + case ViaManifest(_) => + ManifestDiscovery.discover(region, assets) } - - val result = for { - keys <- validatedDataKeys - discovery <- groupKeysFull(keys) - } yield discovery - result } /** @@ -132,7 +140,7 @@ object DataDiscovery { */ def discoverAtomic(target: DiscoveryTarget): LoaderAction[List[DataDiscovery]] = target match { - case InShreddedGood(folder) => + case Global(folder) => for { keys <- listGoodBucket(folder) grouped <- LoaderAction.liftE(groupKeysAtomic(keys)) @@ -144,6 +152,9 @@ object DataDiscovery { discovery = if (keys.isEmpty) DiscoveryError(NoDataFailure(folder)).asLeft else groupKeysAtomic(keys) result <- LoaderAction.liftE[List[DataDiscovery]](discovery) } yield result + + case ViaManifest(_) => + ManifestDiscovery.discoverAtomic } /** @@ -200,14 +211,24 @@ object DataDiscovery { } } + /** Turn `FullDiscovery` into `AtomicDiscovery` */ + def downCastFullDiscovery(original: DataDiscovery): AtomicDiscovery = + original match { + case e: AtomicDiscovery => e + case FullDiscovery(base, cardinality, _) => AtomicDiscovery(base, cardinality) + } + /** - * Transform list of S3 keys 
into list of `DataKeyFinal` for `FullDisocvery` + * Transform list of S3 keys into list of `DataKeyFinal` for `FullDiscovery` */ private def transformKeys(shredJob: Semver, region: String, assets: Option[S3.Folder])(keys: List[S3.Key]): ValidatedDataKeys = { + def id(x: ValidatedNel[DiscoveryFailure, List[DataKeyFinal]]) = x + val intermediateDataKeys = keys.map(parseDataKey(shredJob, _)) - intermediateDataKeys.map(transformDataKey(_, region, assets)).sequence.map(_.sequence) + val finalDataKeys = intermediateDataKeys.traverse(transformDataKey(_, region, assets)) + sequenceInF(finalDataKeys, id) } - + /** * Transform intermediate `DataKey` into `ReadyDataKey` by finding JSONPath file * for each shredded type. Used to aggregate "invalid path" errors (produced by @@ -270,7 +291,7 @@ object DataDiscovery { */ def groupKeysAtomic(keys: List[S3.Key]): Either[LoaderError, List[AtomicDiscovery]] = { val atomicKeys: ValidatedNel[DiscoveryFailure, List[AtomicDataKey]] = - keys.filter(isAtomic).map(parseAtomicKey).map(_.toValidatedNel).sequence + keys.filter(isAtomic).traverse(parseAtomicKey(_).toValidatedNel) val validated = atomicKeys.andThen { keys => keys.groupBy(_.base).toList.traverse(validateFolderAtomic) } @@ -438,7 +459,7 @@ object DataDiscovery { * S3 key, representing intermediate shredded type file * It is intermediate because shredded type doesn't contain its JSONPath yet */ - private case class ShreddedDataKeyIntermediate(key: S3.Key, info: Info) extends DataKeyIntermediate + private case class ShreddedDataKeyIntermediate(key: S3.Key, info: ShreddedType.Info) extends DataKeyIntermediate /** * S3 key, representing intermediate shredded type file diff --git a/src/main/scala/com/snowplowanalytics/snowplow/rdbloader/discovery/ManifestDiscovery.scala b/src/main/scala/com/snowplowanalytics/snowplow/rdbloader/discovery/ManifestDiscovery.scala new file mode 100644 index 000000000..8a29820b7 --- /dev/null +++ b/src/main/scala/com/snowplowanalytics/snowplow/rdbloader/discovery/ManifestDiscovery.scala @@ -0,0 +1,171 @@ +/* + * Copyright (c) 2012-2018 Snowplow Analytics Ltd. All rights reserved. + * + * This program is licensed to you under the Apache License Version 2.0, + * and you may not use this file except in compliance with the Apache License Version 2.0. + * You may obtain a copy of the Apache License Version 2.0 at http://www.apache.org/licenses/LICENSE-2.0. + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the Apache License Version 2.0 is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the Apache License Version 2.0 for the specific language governing permissions and limitations there under. 
+ */ +package com.snowplowanalytics.snowplow.rdbloader +package discovery + +import cats.data._ +import cats.implicits._ + +import com.snowplowanalytics.iglu.core.SchemaKey + +import com.snowplowanalytics.manifest.core.ProcessingManifest._ +import com.snowplowanalytics.manifest.core.ProcessingManifest.ManifestError._ + + +import com.snowplowanalytics.snowplow.rdbloader.LoaderError._ +import com.snowplowanalytics.snowplow.rdbloader.config.Semver +import com.snowplowanalytics.snowplow.rdbloader.generated.ProjectMetadata + + +/** + * Module containing logic for discovering `ShreddedType` through `ProcessingManifest`, + * as opposed to `DataDiscovery`, where `ShreddedType` discovered directly from S3 + */ +object ManifestDiscovery { + + /** + * Protocol for exchanging payloads between Shredder and Loader + * Should be common compile-time dependency + */ + object RdbPayload { + val ShreddedTypesKeys = "processed:shredder:types" + val ShreddedTypesGet = (payload: Payload) => payload.set.get(ShreddedTypesKeys) + val ShreddedTypesSet = (payload: Payload, types: Set[String]) => { + val current = ShreddedTypesGet(payload).getOrElse(Set.empty[String]) + val updated = payload.set.updated(ShreddedTypesKeys, current ++ types) + payload.copy(set = updated) + } + } + + val ShredderName: String = ProjectMetadata.shredderName + val LoaderApp = Application(ProjectMetadata.name, ProjectMetadata.version, None) + val ShredderApp = Application(ShredderName, ProjectMetadata.shredderVersion, None) + + /** + * Get list of unprocessed items (as `DataDiscovery`) from processing manifest + * Unprocessed items are those that were processed by RDB Shredder and were NOT + * processed by RDB Loader. Will return `ManifestFailure` if some item is blocked + * + * @param region AWS S3 Region for JSONPaths + * @param jsonpathAssets user-owned bucket with JSONPaths + * @return list of data discoveries (always with shredded types) or error + * if any error occurred + */ + def discover(region: String, jsonpathAssets: Option[S3.Folder]): LoaderAction[List[DataDiscovery]] = { + val itemsA = LoaderA.manifestDiscover(predicate) + + for { + items <- EitherT[Action, LoaderError, List[Item]](itemsA) + discoveries <- items.traverse { item => for { + infos <- LoaderAction.liftE(parseItemPayload(item)) + discovery <- itemToDiscovery(region, jsonpathAssets, item.id, infos) + } yield discovery } + + } yield discoveries.distinct // .distinct is double-protection from double-loading + } + + def discoverAtomic: LoaderAction[List[DataDiscovery]] = { + val itemsA = LoaderA.manifestDiscover(predicate) + for { + items <- EitherT[Action, LoaderError, List[Item]](itemsA) + discoveries <- items.traverse { item => for { + infos <- LoaderAction.liftE(parseItemPayload(item)) + baseE = S3.Folder.parse(item.id).leftMap(ManifestError.parseError).leftMap(LoaderError.fromManifestError) + base <- LoaderAction.liftE(baseE) + discovery = DataDiscovery.AtomicDiscovery(base, 0) + } yield discovery } + + } yield discoveries.distinct // .distinct is double-protection from double-loading + + } + + /** Primary predicate, deciding what `Item` should be loaded */ + def predicate(item: Item): Boolean = + processedBy(ShredderApp, item) && !processedBy(LoaderApp, item) + + /** Get only shredder-"consumed" items */ + private[discovery] def parseItemPayload[F[_]: ManifestAction](item: Item): Either[LoaderError, List[ShreddedType.Info]] = { + // At least New, Processing, Processed + val shredderRecords = item.records.filter(_.app.name == ShredderName) + val processedRecord = 
findProcessed(shredderRecords) + + processedRecord.map(_.flatMap(parseRecord)) match { + case Some(either) => either.leftMap { error => DiscoveryError(ManifestFailure(error)) } + case None => + // Impossible error-state if function is used on filtered `Item` before + val message = s"Item [${item.id}] does not have 'Processed' step for $ShredderName" + val error: ManifestError = Corrupted(InvalidContent(NonEmptyList.one(message))) + DiscoveryError(ManifestFailure(error)).asLeft + } + } + + /** Find 'Processed' by shredder, but only if it has valid state */ + private def findProcessed(processedByShredder: List[Record]): Option[Either[ManifestError, Record]] = { + processedByShredder.foldLeft(none[Record].asRight[ManifestError]) { (result, record) => + if (record.state.step == StepState.Processed) { + val consumed = processedByShredder.exists(_.state.step == StepState.Processing) + result match { + case Right(None) if consumed => record.some.asRight + case Right(None) => + val error = s"Processed record ${record.state.show} does not have corresponding 'Processing' state" + Corrupted(InvalidContent(NonEmptyList.one(error))).asLeft + case other => other + } + } else none.asRight + }.sequence + } + + /** + * Extract information about shredded types (added to manifest by RDB Shredder) + * from `Processed` `Record` + * + * @param record `Processed` by RDB Shredder record + * @return list of shredded types discovered in this item (added by shredder) + */ + def parseRecord(record: Record): Either[ManifestError, List[ShreddedType.Info]] = { + val version = Semver.decodeSemver(record.app.version).toValidatedNel + val types = record.payload.flatMap(RdbPayload.ShreddedTypesGet).getOrElse(Set.empty) + val schemaKeys = types.toList.traverse { t => SchemaKey.fromUri(t) match { + case Some(ss) => ss.validNel[String] + case None => s"Key [$t] is invalid Iglu URI".invalidNel[SchemaKey] + }} + + val base = S3.Folder + .parse(record.id) + .leftMap(message => s"Path [${record.id}] is not valid base for shredded type. $message") + .toValidatedNel + + (version, schemaKeys, base).mapN { (v: Semver, k: List[SchemaKey], b: S3.Folder) => + k.map(kk => ShreddedType.Info(b, kk.vendor, kk.name, kk.version.model, v)) + } match { + case Validated.Valid(infos) => infos.distinct.asRight + case Validated.Invalid(errors) => + val state = record.state.show + val details = errors.toList.mkString(", ") + ManifestError.parseError(s"Cannot parse manifest record [$state] into ShreddedType.Info. 
$details").asLeft + } + } + + private def itemToDiscovery(region: String, + jsonpathAssets: Option[S3.Folder], + itemId: ItemId, + infos: List[ShreddedType.Info]): LoaderAction[DataDiscovery] = { + val shreddedTypes = ShreddedType.discoverBatch(region, jsonpathAssets, infos) + + val base: Either[LoaderError, S3.Folder] = + S3.Folder.parse(itemId) + .leftMap(error => LoaderError.fromManifestError(ManifestError.parseError(error))) + + (LoaderAction.liftE(base), shreddedTypes).mapN(DataDiscovery.FullDiscovery(_, 0, _)) + } +} diff --git a/src/main/scala/com/snowplowanalytics/snowplow/rdbloader/ShreddedType.scala b/src/main/scala/com/snowplowanalytics/snowplow/rdbloader/discovery/ShreddedType.scala similarity index 89% rename from src/main/scala/com/snowplowanalytics/snowplow/rdbloader/ShreddedType.scala rename to src/main/scala/com/snowplowanalytics/snowplow/rdbloader/discovery/ShreddedType.scala index 55a7b1d59..7f907903f 100644 --- a/src/main/scala/com/snowplowanalytics/snowplow/rdbloader/ShreddedType.scala +++ b/src/main/scala/com/snowplowanalytics/snowplow/rdbloader/discovery/ShreddedType.scala @@ -11,18 +11,21 @@ * See the Apache License Version 2.0 for the specific language governing permissions and limitations there under. */ package com.snowplowanalytics.snowplow.rdbloader +package discovery -import cats.implicits._ +import cats.data._ import cats.free.Free +import cats.implicits._ import com.snowplowanalytics.iglu.client.SchemaCriterion import com.snowplowanalytics.iglu.core.SchemaKey +import com.snowplowanalytics.snowplow.rdbloader.LoaderError._ +import com.snowplowanalytics.snowplow.rdbloader.config.Semver +import com.snowplowanalytics.snowplow.rdbloader.utils.Common.toSnakeCase + // This project -import LoaderError._ -import config.Semver -import utils.Common.toSnakeCase /** @@ -172,6 +175,23 @@ object ShreddedType { } } + /** Discover multiple JSONPaths for shredded types at once and turn into `LoaderAction` */ + def discoverBatch(region: String, + jsonpathAssets: Option[S3.Folder], + raw: List[ShreddedType.Info]): LoaderAction[List[ShreddedType]] = { + // Discover data for single item + def discover(info: ShreddedType.Info): Action[ValidatedNel[DiscoveryFailure, ShreddedType]] = { + val jsonpaths = ShreddedType.discoverJsonPath(region, jsonpathAssets, info) + val shreddedType = jsonpaths.map(_.map(s3key => ShreddedType(info, s3key))) + shreddedType.map(_.toValidatedNel) + } + + val action: Action[Either[LoaderError, List[ShreddedType]]] = + sequenceInF(raw.traverse(discover), LoaderError.flattenValidated[List[ShreddedType]]) + + LoaderAction(action) + } + /** * Get Snowplow hosted assets S3 bucket for specific region * diff --git a/src/main/scala/com/snowplowanalytics/snowplow/rdbloader/interpreters/DryRunInterpreter.scala b/src/main/scala/com/snowplowanalytics/snowplow/rdbloader/interpreters/DryRunInterpreter.scala index bbebd08e9..8881a97c8 100644 --- a/src/main/scala/com/snowplowanalytics/snowplow/rdbloader/interpreters/DryRunInterpreter.scala +++ b/src/main/scala/com/snowplowanalytics/snowplow/rdbloader/interpreters/DryRunInterpreter.scala @@ -16,10 +16,8 @@ package interpreters import java.nio.file._ import scala.collection.mutable.ListBuffer - import cats._ import cats.implicits._ - import com.amazonaws.services.s3.AmazonS3 import com.snowplowanalytics.snowplow.scalatracker.Tracker @@ -27,8 +25,10 @@ import com.snowplowanalytics.snowplow.scalatracker.Tracker // This project import config.CliConfig import LoaderA._ +import LoaderError.LoaderLocalError import 
loaders.Common.SqlString -import implementations.{S3Interpreter, TrackerInterpreter} +import implementations.{S3Interpreter, TrackerInterpreter, ManifestInterpreter} + /** * Interpreter performs all actual side-effecting work, @@ -53,6 +53,9 @@ class DryRunInterpreter private[interpreters]( */ private val cache = collection.mutable.HashMap.empty[String, Option[S3.Key]] + lazy val manifest = + ManifestInterpreter.initialize(cliConfig.target.processingManifest, cliConfig.configYaml.aws.s3.region) + def getDryRunLogs: String = { val sleep = s"Consistency check sleep time: $sleepTime\n" val queries = @@ -79,6 +82,15 @@ class DryRunInterpreter private[interpreters]( case DownloadData(source, dest) => logMessages.append(s"Downloading data from [$source] to [$dest]") List.empty[Path].asRight[LoaderError] + case ManifestDiscover(predicate) => + for { + optionalManifest <- manifest + manifestClient <- optionalManifest match { + case Some(manifestClient) => manifestClient.asRight + case None => LoaderLocalError("Processing Manifest is not configured").asLeft + } + result <- ManifestInterpreter.getUnprocessed(manifestClient, predicate) + } yield result case ExecuteQuery(query) => logQueries.append(query) diff --git a/src/main/scala/com/snowplowanalytics/snowplow/rdbloader/interpreters/RealWorldInterpreter.scala b/src/main/scala/com/snowplowanalytics/snowplow/rdbloader/interpreters/RealWorldInterpreter.scala index 5bab80680..13bd80808 100644 --- a/src/main/scala/com/snowplowanalytics/snowplow/rdbloader/interpreters/RealWorldInterpreter.scala +++ b/src/main/scala/com/snowplowanalytics/snowplow/rdbloader/interpreters/RealWorldInterpreter.scala @@ -18,14 +18,11 @@ import java.nio.file._ import java.nio.file.attribute.BasicFileAttributes import scala.util.control.NonFatal - import cats._ import cats.implicits._ - import com.amazonaws.services.s3.AmazonS3 - +import com.snowplowanalytics.snowplow.rdbloader.interpreters.implementations.ManifestInterpreter import org.joda.time.DateTime - import com.snowplowanalytics.snowplow.scalatracker.Tracker // This project @@ -58,6 +55,9 @@ class RealWorldInterpreter private[interpreters]( // lazy to wait before tunnel established private lazy val dbConnection = PgInterpreter.getConnection(cliConfig.target) + lazy val manifest = + ManifestInterpreter.initialize(cliConfig.target.processingManifest, cliConfig.configYaml.aws.s3.region) + private val messages = collection.mutable.ListBuffer.empty[String] def log(message: String) = { @@ -76,6 +76,16 @@ class RealWorldInterpreter private[interpreters]( S3Interpreter.keyExists(amazonS3, key) case DownloadData(source, dest) => S3Interpreter.downloadData(amazonS3, source, dest) + case ManifestDiscover(predicate) => + for { + optionalManifest <- manifest + manifestClient <- optionalManifest match { + case Some(manifestClient) => manifestClient.asRight + case None => LoaderLocalError("Processing Manifest is not configured").asLeft + } + result <- ManifestInterpreter.getUnprocessed(manifestClient, predicate) + } yield result + case ExecuteQuery(query) => val result = for { diff --git a/src/main/scala/com/snowplowanalytics/snowplow/rdbloader/interpreters/implementations/ManifestInterpreter.scala b/src/main/scala/com/snowplowanalytics/snowplow/rdbloader/interpreters/implementations/ManifestInterpreter.scala new file mode 100644 index 000000000..442b129d7 --- /dev/null +++ b/src/main/scala/com/snowplowanalytics/snowplow/rdbloader/interpreters/implementations/ManifestInterpreter.scala @@ -0,0 +1,50 @@ +/* + * Copyright (c) 2012-2018 
Snowplow Analytics Ltd. All rights reserved.
+ *
+ * This program is licensed to you under the Apache License Version 2.0,
+ * and you may not use this file except in compliance with the Apache License Version 2.0.
+ * You may obtain a copy of the Apache License Version 2.0 at http://www.apache.org/licenses/LICENSE-2.0.
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the Apache License Version 2.0 is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the Apache License Version 2.0 for the specific language governing permissions and limitations there under.
+ */
+package com.snowplowanalytics.snowplow.rdbloader.interpreters.implementations
+
+import cats.implicits._
+
+import com.amazonaws.services.dynamodbv2.AmazonDynamoDBClientBuilder
+
+import com.snowplowanalytics.manifest.core.ProcessingManifest
+import com.snowplowanalytics.manifest.core.ProcessingManifest.{Item, ManifestError}
+import com.snowplowanalytics.manifest.dynamodb.DynamoDbManifest
+
+import com.snowplowanalytics.snowplow.rdbloader.LoaderError
+import com.snowplowanalytics.snowplow.rdbloader.config.StorageTarget.ProcessingManifestConfig
+
+import scala.util.control.NonFatal
+
+object ManifestInterpreter {
+
+  type ManifestE[A] = Either[ManifestError, A]
+
+  def initialize(manifestConfig: Option[ProcessingManifestConfig], emrRegion: String): Either[LoaderError, Option[DynamoDbManifest[ManifestE]]] = {
+    try {
+      manifestConfig.map { config =>
+        val dynamodbClient = AmazonDynamoDBClientBuilder.standard().withRegion(emrRegion).build()
+        DynamoDbManifest[ManifestE](dynamodbClient, config.amazonDynamoDb.tableName)
+      }.asRight[LoaderError]
+    } catch {
+      case NonFatal(e) =>
+        val error: LoaderError =
+          LoaderError.LoaderLocalError(s"Cannot initialize DynamoDB client for processing manifest, ${e.toString}")
+        error.asLeft[Option[DynamoDbManifest[ManifestE]]]
+    }
+  }
+
+  def getUnprocessed(manifest: ProcessingManifest[ManifestE],
+                     predicate: Item => Boolean): Either[LoaderError, List[Item]] = {
+    manifest.unprocessed(predicate).leftMap(LoaderError.fromManifestError)
+  }
+}
diff --git a/src/main/scala/com/snowplowanalytics/snowplow/rdbloader/interpreters/implementations/PgInterpreter.scala b/src/main/scala/com/snowplowanalytics/snowplow/rdbloader/interpreters/implementations/PgInterpreter.scala
index b047653e2..764acd93a 100644
--- a/src/main/scala/com/snowplowanalytics/snowplow/rdbloader/interpreters/implementations/PgInterpreter.scala
+++ b/src/main/scala/com/snowplowanalytics/snowplow/rdbloader/interpreters/implementations/PgInterpreter.scala
@@ -1,5 +1,5 @@
 /*
- * Copyright (c) 2012-2017 Snowplow Analytics Ltd. All rights reserved.
+ * Copyright (c) 2012-2018 Snowplow Analytics Ltd. All rights reserved.
  *
  * This program is licensed to you under the Apache License Version 2.0,
  * and you may not use this file except in compliance with the Apache License Version 2.0.
diff --git a/src/main/scala/com/snowplowanalytics/snowplow/rdbloader/loaders/Common.scala b/src/main/scala/com/snowplowanalytics/snowplow/rdbloader/loaders/Common.scala index 10bc78c79..2b28809ae 100644 --- a/src/main/scala/com/snowplowanalytics/snowplow/rdbloader/loaders/Common.scala +++ b/src/main/scala/com/snowplowanalytics/snowplow/rdbloader/loaders/Common.scala @@ -17,6 +17,7 @@ import shapeless.tag import shapeless.tag._ // This project +import discovery.DataDiscovery import config.{ CliConfig, Step } import config.StorageTarget.{ PostgresqlConfig, RedshiftConfig } @@ -81,22 +82,24 @@ object Common { val region = cliConfig.configYaml.aws.s3.region val assets = cliConfig.configYaml.aws.s3.buckets.jsonpathAssets - val folder = cliConfig.folder match { - case Some(f) => + val target = (cliConfig.target.processingManifest, cliConfig.folder) match { + case (None, Some(f)) => DataDiscovery.InSpecificFolder(f) - case None => - DataDiscovery.InShreddedGood(cliConfig.configYaml.aws.s3.buckets.shredded.good) + case (Some(_), f) => + DataDiscovery.ViaManifest(f) + case (None, None) => + DataDiscovery.Global(cliConfig.configYaml.aws.s3.buckets.shredded.good) } cliConfig.target match { case _: RedshiftConfig => - val original = DataDiscovery.discoverFull(folder, shredJob, region, assets) + val original = DataDiscovery.discoverFull(target, shredJob, region, assets) if (cliConfig.steps.contains(Step.ConsistencyCheck)) DataDiscovery.checkConsistency(original) else original case _: PostgresqlConfig => // Safe to skip consistency check as whole folder will be downloaded - DataDiscovery.discoverAtomic(folder) + DataDiscovery.discoverAtomic(target) } } diff --git a/src/main/scala/com/snowplowanalytics/snowplow/rdbloader/loaders/PostgresqlLoader.scala b/src/main/scala/com/snowplowanalytics/snowplow/rdbloader/loaders/PostgresqlLoader.scala index 8f32d2e04..2280ddebf 100644 --- a/src/main/scala/com/snowplowanalytics/snowplow/rdbloader/loaders/PostgresqlLoader.scala +++ b/src/main/scala/com/snowplowanalytics/snowplow/rdbloader/loaders/PostgresqlLoader.scala @@ -21,6 +21,7 @@ import cats.free.Free import LoaderA._ import config.Step import config.StorageTarget.PostgresqlConfig +import discovery.DataDiscovery object PostgresqlLoader { diff --git a/src/main/scala/com/snowplowanalytics/snowplow/rdbloader/loaders/RedshiftLoadStatements.scala b/src/main/scala/com/snowplowanalytics/snowplow/rdbloader/loaders/RedshiftLoadStatements.scala index fb11b431c..1e4b26166 100644 --- a/src/main/scala/com/snowplowanalytics/snowplow/rdbloader/loaders/RedshiftLoadStatements.scala +++ b/src/main/scala/com/snowplowanalytics/snowplow/rdbloader/loaders/RedshiftLoadStatements.scala @@ -17,7 +17,7 @@ import cats.implicits._ // This project import Common._ -import DataDiscovery.{AtomicDiscovery, FullDiscovery} +import com.snowplowanalytics.snowplow.rdbloader.discovery.{DataDiscovery, ShreddedType} import config.{SnowplowConfig, Step} import config.StorageTarget.RedshiftConfig @@ -60,26 +60,29 @@ object RedshiftLoadStatements { else { val init = discoveries.map(getStatements(config, target, steps)).reverse val vacuum: Option[List[SqlString]] = - init.map(_.vacuum).sequence.map { statements => statements.flatten.distinct } + init.map(_.vacuum).sequence.map(uniqStatements) val analyze: Option[List[SqlString]] = - init.map(_.analyze).sequence.map { statements => statements.flatten.distinct } + init.map(_.analyze).sequence.map(uniqStatements) val cleaned = init.map { statements => statements.copy(vacuum = None, analyze = None) } val 
result = cleaned.head.copy(vacuum = vacuum, analyze = analyze) :: cleaned.tail result.reverse } } + private def uniqStatements[A](lists: List[List[A]]): List[A] = + lists.flatten.distinct + /** * Transform discovery results into group of load statements (atomic, shredded, etc) * More than one `RedshiftLoadStatements` must be grouped with others using `buildQueue` */ private def getStatements(config: SnowplowConfig, target: RedshiftConfig, steps: Set[Step])(discovery: DataDiscovery): RedshiftLoadStatements = { discovery match { - case discovery: FullDiscovery => + case discovery: DataDiscovery.FullDiscovery => val shreddedStatements = discovery.shreddedTypes.map(transformShreddedType(config, target, _)) val atomic = RedshiftLoadStatements.buildCopyFromTsvStatement(config, target, discovery.atomicEvents) buildLoadStatements(target, steps, atomic, shreddedStatements, discovery.base) - case _: AtomicDiscovery => + case _: DataDiscovery.AtomicDiscovery => val atomic = RedshiftLoadStatements.buildCopyFromTsvStatement(config, target, discovery.atomicEvents) buildLoadStatements(target, steps, atomic, Nil, discovery.base) } diff --git a/src/main/scala/com/snowplowanalytics/snowplow/rdbloader/loaders/RedshiftLoader.scala b/src/main/scala/com/snowplowanalytics/snowplow/rdbloader/loaders/RedshiftLoader.scala index 918612b1b..107fc32bf 100644 --- a/src/main/scala/com/snowplowanalytics/snowplow/rdbloader/loaders/RedshiftLoader.scala +++ b/src/main/scala/com/snowplowanalytics/snowplow/rdbloader/loaders/RedshiftLoader.scala @@ -20,6 +20,7 @@ import cats.implicits._ import LoaderA._ import RedshiftLoadStatements._ import Common.SqlString +import discovery.DataDiscovery import config.{ SnowplowConfig, Step } import config.StorageTarget.RedshiftConfig diff --git a/src/main/scala/com/snowplowanalytics/snowplow/rdbloader/package.scala b/src/main/scala/com/snowplowanalytics/snowplow/rdbloader/package.scala index 5bb5fc132..32aa7d40e 100644 --- a/src/main/scala/com/snowplowanalytics/snowplow/rdbloader/package.scala +++ b/src/main/scala/com/snowplowanalytics/snowplow/rdbloader/package.scala @@ -12,7 +12,7 @@ */ package com.snowplowanalytics.snowplow -import cats.Functor +import cats._ import cats.data._ import cats.free.Free import cats.implicits._ @@ -20,6 +20,12 @@ import cats.implicits._ import rdbloader.LoaderError.DiscoveryFailure package object rdbloader { + + // RDB Loader's algebra defines hierarchy with three types common for all modules + // * Action[A] - IO substitution, end-of-the-world type + // * LoaderAction[A] - Validated and short-circuiting version of Action, equal to exception + // * ActionE[A] - Non-short-circuiting version of LoaderAction for results that can be recovered + /** * Main RDB Loader type. Represents all IO happening * during discovering, loading and monitoring. @@ -48,6 +54,9 @@ package object rdbloader { def liftA[A](action: Action[A]): LoaderAction[A] = EitherT(action.map(_.asRight[LoaderError])) + + def apply[A](actionE: ActionE[A]): LoaderAction[A] = + EitherT[Action, LoaderError, A](actionE) } /** Non-short-circuiting version of `TargetLoading` */ @@ -58,6 +67,23 @@ package object rdbloader { Free.pure(error.asLeft) } + /** + * Helper function to traverse multiple validated results inside a single `Action` + * + * @param f collection of results, e.g. `IO[List[Validated[Result]]]` + * @param ff helper function to transform end result, e.g. 
`ValidatedNel[String, A] => Either[String, A]` + * @tparam F outer action, such as `IO` + * @tparam G collection, such as `List` + * @tparam H inner-effect type, such as `Validation` + * @tparam J result effect, without constraints + * @tparam A result + * @return traversed and transformed action, where `H` replaced with `J` by `ff` + */ + def sequenceInF[F[_]: Functor, + G[_]: Traverse, + H[_]: Applicative, + J[_], A](f: F[G[H[A]]], ff: H[G[A]] => J[G[A]]): F[J[G[A]]] = + f.map(x => ff(x.sequence)) /** * IO-free result validation @@ -74,7 +100,6 @@ package object rdbloader { private[rdbloader] val DiscoveryAction = Functor[Action].compose[DiscoveryStep] - implicit class AggregateErrors[A, B](eithers: List[Either[A, B]]) { def aggregatedErrors: ValidatedNel[A, List[B]] = eithers.map(_.toValidatedNel).sequence diff --git a/src/test/scala/com/snowplowanalytics/snowplow/rdbloader/SpecHelpers.scala b/src/test/scala/com/snowplowanalytics/snowplow/rdbloader/SpecHelpers.scala index 6fe83e557..49312a78b 100644 --- a/src/test/scala/com/snowplowanalytics/snowplow/rdbloader/SpecHelpers.scala +++ b/src/test/scala/com/snowplowanalytics/snowplow/rdbloader/SpecHelpers.scala @@ -73,7 +73,68 @@ object SpecHelpers { StorageTarget.PlainText("Supersecret1"), 1, 20000, + None, None) + /** + * Pretty prints a Scala value similar to its source represention. + * Particularly useful for case classes. + * @param a - The value to pretty print. + * @param indentSize - Number of spaces for each indent. + * @param maxElementWidth - Largest element size before wrapping. + * @param depth - Initial depth to pretty print indents. + * @author https://gist.github.com/carymrobbins/7b8ed52cd6ea186dbdf8 + */ + def prettyPrint(a: Any, indentSize: Int = 2, maxElementWidth: Int = 30, depth: Int = 0): String = { + val indent = " " * depth * indentSize + val fieldIndent = indent + (" " * indentSize) + val thisDepth = prettyPrint(_: Any, indentSize, maxElementWidth, depth) + val nextDepth = prettyPrint(_: Any, indentSize, maxElementWidth, depth + 1) + a match { + // Make Strings look similar to their literal form. + case s: String => + val replaceMap = Seq( + "\n" -> "\\n", + "\r" -> "\\r", + "\t" -> "\\t", + "\"" -> "\\\"" + ) + '"' + replaceMap.foldLeft(s) { case (acc, (c, r)) => acc.replace(c, r) } + '"' + // For an empty Seq just use its normal String representation. + case xs: Seq[_] if xs.isEmpty => xs.toString() + case xs: Seq[_] => + // If the Seq is not too long, pretty print on one line. + val resultOneLine = xs.map(nextDepth).toString() + if (resultOneLine.length <= maxElementWidth) return resultOneLine + // Otherwise, build it with newlines and proper field indents. + val result = xs.map(x => s"\n$fieldIndent${nextDepth(x)}").toString() + result.substring(0, result.length - 1) + "\n" + indent + ")" + // Product should cover case classes. + case p: Product => + val prefix = p.productPrefix + // We'll use reflection to get the constructor arg names and values. + val cls = p.getClass + val fields = cls.getDeclaredFields.filterNot(_.isSynthetic).map(_.getName) + val values = p.productIterator.toSeq + // If we weren't able to match up fields/values, fall back to toString. + if (fields.length != values.length) return p.toString + fields.zip(values).toList match { + // If there are no fields, just use the normal String representation. + case Nil => p.toString + // If there is just one field, let's just print it as a wrapper. 
+ case (_, value) :: Nil => s"$prefix(${thisDepth(value)})" + // If there is more than one field, build up the field names and values. + case kvps => + val prettyFields = kvps.map { case (k, v) => s"$fieldIndent$k = ${nextDepth(v)}" } + // If the result is not too long, pretty print on one line. + val resultOneLine = s"$prefix(${prettyFields.mkString(", ")})" + if (resultOneLine.length <= maxElementWidth) return resultOneLine + // Otherwise, build it with newlines and proper field indents. + s"$prefix(\n${prettyFields.mkString(",\n")}\n$indent)" + } + // If we haven't specialized this type, just use its toString. + case _ => a.toString + } + } } diff --git a/src/test/scala/com/snowplowanalytics/snowplow/rdbloader/config/CliConfigSpec.scala b/src/test/scala/com/snowplowanalytics/snowplow/rdbloader/config/CliConfigSpec.scala index 675b44bb9..936cf328c 100644 --- a/src/test/scala/com/snowplowanalytics/snowplow/rdbloader/config/CliConfigSpec.scala +++ b/src/test/scala/com/snowplowanalytics/snowplow/rdbloader/config/CliConfigSpec.scala @@ -1,5 +1,5 @@ /* - * Copyright (c) 2012-2017 Snowplow Analytics Ltd. All rights reserved. + * Copyright (c) 2012-2018 Snowplow Analytics Ltd. All rights reserved. * * This program is licensed to you under the Apache License Version 2.0, * and you may not use this file except in compliance with the Apache License Version 2.0. diff --git a/src/test/scala/com/snowplowanalytics/snowplow/rdbloader/config/StorageTargetSpec.scala b/src/test/scala/com/snowplowanalytics/snowplow/rdbloader/config/StorageTargetSpec.scala index e26edb12c..68a5fd11b 100644 --- a/src/test/scala/com/snowplowanalytics/snowplow/rdbloader/config/StorageTargetSpec.scala +++ b/src/test/scala/com/snowplowanalytics/snowplow/rdbloader/config/StorageTargetSpec.scala @@ -82,6 +82,7 @@ class StorageTargetSpec extends Specification { def is = s2""" "atomic", "ADD HERE", StorageTarget.PlainText("ADD HERE"), + None, None) parseWithDefaultResolver(config).toEither must beRight(expected) @@ -122,6 +123,7 @@ class StorageTargetSpec extends Specification { def is = s2""" StorageTarget.PlainText("ADD HERE"), 1, 20000, + None, None) parseWithDefaultResolver(config).toEither must beRight(expected) @@ -182,7 +184,8 @@ class StorageTargetSpec extends Specification { def is = s2""" StorageTarget.PlainText("ADD HERE"), 1, 20000, - Some(tunnel)) + Some(tunnel), + None) parseWithDefaultResolver(config).toEither must beRight(expected) } @@ -225,6 +228,7 @@ class StorageTargetSpec extends Specification { def is = s2""" StorageTarget.EncryptedKey(StorageTarget.EncryptedConfig(StorageTarget.ParameterStoreConfig("snowplow.rdbloader.redshift.password"))), 1, 20000, + None, None) parseWithDefaultResolver(config).toEither must beRight(expected) diff --git a/src/test/scala/com/snowplowanalytics/snowplow/rdbloader/DataDiscoverySpec.scala b/src/test/scala/com/snowplowanalytics/snowplow/rdbloader/discovery/DataDiscoverySpec.scala similarity index 97% rename from src/test/scala/com/snowplowanalytics/snowplow/rdbloader/DataDiscoverySpec.scala rename to src/test/scala/com/snowplowanalytics/snowplow/rdbloader/discovery/DataDiscoverySpec.scala index 63c5b9916..1b59d32b6 100644 --- a/src/test/scala/com/snowplowanalytics/snowplow/rdbloader/DataDiscoverySpec.scala +++ b/src/test/scala/com/snowplowanalytics/snowplow/rdbloader/discovery/DataDiscoverySpec.scala @@ -11,18 +11,19 @@ * See the Apache License Version 2.0 for the specific language governing permissions and limitations there under. 
*/ package com.snowplowanalytics.snowplow.rdbloader +package discovery -import cats.{Id, ~>} import cats.data.State +import cats.{Id, ~>} import org.specs2.Specification +import LoaderError.{DiscoveryError, NoDataFailure} +import config.Semver +import discovery.ShreddedType._ import DataDiscovery._ -import ShreddedType._ -import LoaderError._ -import S3.Key.{coerce => s3key} import S3.Folder.{coerce => dir} -import config.Semver +import S3.Key.{coerce => s3key} class DataDiscoverySpec extends Specification { def is = s2""" @@ -101,7 +102,7 @@ class DataDiscoverySpec extends Specification { def is = s2""" ) ) - val discoveryTarget = DataDiscovery.InShreddedGood(shreddedGood) + val discoveryTarget = DataDiscovery.Global(shreddedGood) val result = DataDiscovery.discoverFull(discoveryTarget, Semver(0,11,0), "us-east-1", None) val endResult = result.value.foldMap(interpreter) @@ -211,7 +212,7 @@ class DataDiscoverySpec extends Specification { def is = s2""" ) ) - val discoveryTarget = DataDiscovery.InShreddedGood(shreddedGood) + val discoveryTarget = DataDiscovery.Global(shreddedGood) val request = DataDiscovery.discoverFull(discoveryTarget, Semver(0,11,0), "us-east-1", None) val result = DataDiscovery.checkConsistency(request) val (endState, endResult) = result.value.foldMap(interpreter).run(RealWorld(0, Nil)).value @@ -261,7 +262,7 @@ class DataDiscoverySpec extends Specification { def is = s2""" val expected = List.empty[DataDiscovery] // The only difference with e3 - val discoveryTarget = DataDiscovery.InShreddedGood(shreddedGood) + val discoveryTarget = DataDiscovery.Global(shreddedGood) val result = DataDiscovery.discoverFull(discoveryTarget, Semver(0,11,0), "us-east-1", None) val endResult = result.value.foldMap(interpreter) @@ -318,7 +319,7 @@ class DataDiscoverySpec extends Specification { def is = s2""" ) ) - val discoveryTarget = DataDiscovery.InShreddedGood(shreddedGood) + val discoveryTarget = DataDiscovery.Global(shreddedGood) val result = DataDiscovery.discoverFull(discoveryTarget, Semver(0,11,0), "us-east-1", None).value val endResult = result.foldMap(interpreter) diff --git a/src/test/scala/com/snowplowanalytics/snowplow/rdbloader/discovery/ManifestDiscoverySpec.scala b/src/test/scala/com/snowplowanalytics/snowplow/rdbloader/discovery/ManifestDiscoverySpec.scala new file mode 100644 index 000000000..e5da3cf8b --- /dev/null +++ b/src/test/scala/com/snowplowanalytics/snowplow/rdbloader/discovery/ManifestDiscoverySpec.scala @@ -0,0 +1,201 @@ +/* + * Copyright (c) 2012-2018 Snowplow Analytics Ltd. All rights reserved. + * + * This program is licensed to you under the Apache License Version 2.0, + * and you may not use this file except in compliance with the Apache License Version 2.0. + * You may obtain a copy of the Apache License Version 2.0 at http://www.apache.org/licenses/LICENSE-2.0. + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the Apache License Version 2.0 is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the Apache License Version 2.0 for the specific language governing permissions and limitations there under. 
+ */ +package com.snowplowanalytics.snowplow.rdbloader +package discovery + +import java.time.Instant +import java.util.UUID + +import scala.util.Random.shuffle +import cats._ +import cats.data.{State => _, _} +import cats.implicits._ +import com.snowplowanalytics.manifest.core.ProcessingManifest +import com.snowplowanalytics.manifest.core.ProcessingManifest._ +import com.snowplowanalytics.manifest.core.ProcessingManifest.ManifestError.{Corrupted, ParseError} +import com.snowplowanalytics.snowplow.rdbloader.LoaderError.{DiscoveryError, ManifestFailure} +import com.snowplowanalytics.snowplow.rdbloader.config.Semver +import org.specs2.Specification + +class ManifestDiscoverySpec extends Specification { def is = s2""" + Return successful empty list for empty manifest $e1 + Return successful full discovery without shredded types $e2 + Return combined failure for invalid base path and invalid shredded type $e3 + Return multiple successfully discovered shredded types $e4 + Return multiple successfully discovered discoveries $e5 + """ + + def e1 = { + val action = ManifestDiscovery.discover("us-east-1", None) + val result = action.value.foldMap(ManifestDiscoverySpec.interpreter(Nil)) + result must beRight(List.empty[Item]) + } + + def e2 = { + val time = Instant.now() + val base = S3.Folder.coerce("s3://folder") + val records = List( + Record(base, Application("snowplow-rdb-shredder", "0.13.0", None), State(UUID.fromString("7c96c841-fc38-437d-bfec-4c1cd9b00000"), StepState.New), time, "", None), + Record(base, Application("snowplow-rdb-shredder", "0.13.0", None), State(UUID.fromString("7c96c841-fc38-437d-bfec-4c1cd9b00001"), StepState.Processing), time.plusSeconds(10), "", None), + Record(base, Application("snowplow-rdb-shredder", "0.13.0", None), State(UUID.fromString("7c96c841-fc38-437d-bfec-4c1cd9b00001"), StepState.Processed), time.plusSeconds(20), "", None) + ) + + val action = ManifestDiscovery.discover("us-east-1", None) + val result = action.value.foldMap(ManifestDiscoverySpec.interpreter(records)) + result must beRight(List( + DataDiscovery.FullDiscovery(base, 0, Nil) + )) + } + + def e3 = { + val time = Instant.now() + val payload = Some(Payload.empty.copy(set = Map("processed:shredder:types" -> Set("iglu:com.acme/event/jsonschema/0-0-1")))) + val records = List( + Record("invalidFolder", Application("snowplow-rdb-shredder", "0.13.0", None), State(UUID.fromString("7c96c841-fc38-437d-bfec-4c1cd9b00000"), StepState.New), time, "", None), + Record("invalidFolder", Application("snowplow-rdb-shredder", "0.13.0", None), State(UUID.fromString("7c96c841-fc38-437d-bfec-4c1cd9b00001"), StepState.Processing), time.plusSeconds(10), "", None), + Record("invalidFolder", Application("snowplow-rdb-shredder", "0.13.0", None), State(UUID.fromString("7c96c841-fc38-437d-bfec-4c1cd9b00001"), StepState.Processed), time.plusSeconds(20), "", payload) + ) + + val action = ManifestDiscovery.discover("us-east-1", None) + val result = action.value.foldMap(ManifestDiscoverySpec.interpreter(records)) + result must beLeft.like { + case DiscoveryError(List(ManifestFailure(Corrupted(ParseError(error))))) => + error must endingWith("Key [iglu:com.acme/event/jsonschema/0-0-1] is invalid Iglu URI, Path [invalidFolder] is not valid base for shredded type. 
Bucket name must start with s3:// prefix") + } + } + + def e4 = { + val time = Instant.now() + val base1 = S3.Folder.coerce("s3://snowplow-enriched-archive/shredded/good/run=2018-01-12-03-10-30") + val base2 = S3.Folder.coerce("s3://snowplow-enriched-archive/shredded/good/run=2018-01-12-03-20-30") + val payload1 = Some(Payload.empty.copy(set = Map("processed:shredder:types" -> Set("iglu:com.acme/event/jsonschema/1-0-1")))) + val payload2 = Some(Payload.empty.copy(set = Map("processed:shredder:types" -> Set("iglu:com.acme/context/jsonschema/1-0-0", "iglu:com.acme/context/jsonschema/1-0-1")))) + val records = List( + Record(base1, Application("snowplow-rdb-shredder", "0.13.0", None), State(UUID.fromString("7c96c841-fc38-437d-bfec-4c1cd9b00000"), StepState.New), time, "", None), + Record(base1, Application("snowplow-rdb-shredder", "0.13.0", None), State(UUID.fromString("7c96c841-fc38-437d-bfec-4c1cd9b00001"), StepState.Processing), time.plusSeconds(10), "", None), + Record(base1, Application("snowplow-rdb-shredder", "0.13.0", None), State(UUID.fromString("7c96c841-fc38-437d-bfec-4c1cd9b00001"), StepState.Processed), time.plusSeconds(20), "", payload1), + Record(base1, Application("snowplow-rdb-loader", "0.13.0", None), State(UUID.fromString("7c96c841-fc38-437d-bfec-4c1cd9b00002"), StepState.Processing), time.plusSeconds(30), "", None), + Record(base1, Application("snowplow-rdb-loader", "0.13.0", None), State(UUID.fromString("7c96c841-fc38-437d-bfec-4c1cd9b00002"), StepState.Processed), time.plusSeconds(30), "", None), + + Record(base2, Application("snowplow-rdb-shredder", "0.13.0", None), State(UUID.fromString("7c96c841-fc38-437d-bfec-4c1cd9b00003"), StepState.New), time, "", None), + Record(base2, Application("snowplow-rdb-shredder", "0.13.0", None), State(UUID.fromString("7c96c841-fc38-437d-bfec-4c1cd9b00004"), StepState.Processing), time.plusSeconds(50), "", None), + Record(base2, Application("snowplow-rdb-shredder", "0.13.0", None), State(UUID.fromString("7c96c841-fc38-437d-bfec-4c1cd9b00004"), StepState.Processed), time.plusSeconds(60), "", payload2) + ) + + val action = ManifestDiscovery.discover("us-east-1", None) + val result = action.value.foldMap(ManifestDiscoverySpec.interpreter(records)) + result must beRight(List( + DataDiscovery.FullDiscovery(base2, 0, List( + ShreddedType( + ShreddedType.Info(base2, "com.acme", "context", 1, Semver(0,13,0)), + S3.Key.coerce("s3://jsonpaths-assets/com.acme/context_1.json") + ) + )) + )) + } + + def e5 = { + val time = Instant.now() + val base1 = S3.Folder.coerce("s3://snowplow-enriched-archive/shredded/good/run=2018-01-12-03-10-30") + val base2 = S3.Folder.coerce("s3://snowplow-enriched-archive/shredded/good/run=2018-01-12-03-20-30") + val base3 = S3.Folder.coerce("s3://snowplow-enriched-archive/shredded/good/run=2018-01-12-03-30-30") + val payload1 = Some(Payload.empty.copy(set = Map("processed:shredder:types" -> Set("iglu:com.acme/event/jsonschema/1-0-1")))) + val payload2 = Some(Payload.empty.copy(set = Map("processed:shredder:types" -> Set("iglu:com.acme/context/jsonschema/1-0-0", "iglu:com.acme/context/jsonschema/1-0-1")))) + val records = List( + Record(base1, Application("snowplow-rdb-shredder", "0.13.0", None), State(UUID.fromString("7c96c841-fc38-437d-bfec-4c1cd9b00000"), StepState.New), time, "", None), + Record(base1, Application("snowplow-rdb-shredder", "0.13.0", None), State(UUID.fromString("7c96c841-fc38-437d-bfec-4c1cd9b00001"), StepState.Processing), time.plusSeconds(10), "", None), + Record(base1, 
Application("snowplow-rdb-shredder", "0.13.0", None), State(UUID.fromString("7c96c841-fc38-437d-bfec-4c1cd9b00001"), StepState.Processed), time.plusSeconds(20), "", payload1), + + Record(base2, Application("snowplow-rdb-shredder", "0.13.0", None), State(UUID.fromString("7c96c841-fc38-437d-bfec-4c1cd9b00002"), StepState.New), time, "", None), + Record(base2, Application("snowplow-rdb-shredder", "0.13.0", None), State(UUID.fromString("7c96c841-fc38-437d-bfec-4c1cd9b00003"), StepState.Processing), time.plusSeconds(10), "", None), + Record(base2, Application("snowplow-rdb-shredder", "0.13.0", None), State(UUID.fromString("7c96c841-fc38-437d-bfec-4c1cd9b00003"), StepState.Processed), time.plusSeconds(20), "", payload1), + + Record(base3, Application("snowplow-rdb-shredder", "0.13.0", None), State(UUID.fromString("7c96c841-fc38-437d-bfec-4c1cd9b00004"), StepState.New), time, "", None), + Record(base3, Application("snowplow-rdb-shredder", "0.13.0", None), State(UUID.fromString("7c96c841-fc38-437d-bfec-4c1cd9b00005"), StepState.Processing), time.plusSeconds(50), "", None), + Record(base3, Application("snowplow-rdb-shredder", "0.13.0", None), State(UUID.fromString("7c96c841-fc38-437d-bfec-4c1cd9b00005"), StepState.Processed), time.plusSeconds(60), "", payload2) + ) + + val expected = List( + DataDiscovery.FullDiscovery(base1, 0, List( + ShreddedType( + ShreddedType.Info(base1, "com.acme", "event", 1, Semver(0,13,0)), + S3.Key.coerce("s3://jsonpaths-assets-other/com.acme/event_1.json") + ) + )), + DataDiscovery.FullDiscovery(base2, 0, List( + ShreddedType( + ShreddedType.Info(base2, "com.acme", "event", 1, Semver(0,13,0)), + S3.Key.coerce("s3://jsonpaths-assets-other/com.acme/event_1.json") + ) + )), + DataDiscovery.FullDiscovery(base3, 0, List( + ShreddedType( + ShreddedType.Info(base3, "com.acme", "context", 1, Semver(0,13,0)), + S3.Key.coerce("s3://jsonpaths-assets/com.acme/context_1.json") + ) + )) + ) + + val action = ManifestDiscovery.discover("us-east-1", None) + val result = action.value.foldMap(ManifestDiscoverySpec.interpreter(records)) + result must beRight.like { + case list => list must containTheSameElementsAs(expected) + } + } + +} + +object ManifestDiscoverySpec { + + type F[A] = Either[ManifestError, A] + + def interpreter(records: List[Record]): LoaderA ~> Id = new (LoaderA ~> Id) { + val manifest = ManifestDiscoverySpec.InMemoryManifest(records) + def apply[A](effect: LoaderA[A]): Id[A] = { + effect match { + case LoaderA.ManifestDiscover(predicate) => + manifest.unprocessed(predicate).leftMap(LoaderError.fromManifestError) + + case LoaderA.Get("com.acme/context_1.json") => + S3.Key.coerce("s3://jsonpaths-assets/com.acme/context_1.json").some.some + + case LoaderA.Get("com.acme/event_1.json") => + S3.Key.coerce("s3://jsonpaths-assets-other/com.acme/event_1.json").some.some + + case action => + throw new RuntimeException(s"Unexpected Action [$action]") + } + } + } + + case class InMemoryManifest(records: List[Record]) extends ProcessingManifest[F] { + + val stateBuffer = collection.mutable.ListBuffer(records: _*) + + def mixed: List[ProcessingManifest.Record] = shuffle(stateBuffer.toList) + + def getItem(id: ItemId): Either[ManifestError, Option[Item]] = { + val map = mixed.groupBy(_.id).map { case (i, r) => (i, Item(NonEmptyList.fromListUnsafe(r))) } + Right(map.get(id)) + } + + def put(id: ItemId, app: Application, state: State, payload: Option[Payload]): Either[ManifestError, Instant] = + Right { + val time = Instant.now() + stateBuffer += Record(id, app, state, time, "0.1.0", 
payload) + time + } + + def list: Either[ManifestError, List[Record]] = Right(mixed) + } +} diff --git a/src/test/scala/com/snowplowanalytics/snowplow/rdbloader/ShreddedTypeSpec.scala b/src/test/scala/com/snowplowanalytics/snowplow/rdbloader/discovery/ShreddedTypeSpec.scala similarity index 97% rename from src/test/scala/com/snowplowanalytics/snowplow/rdbloader/ShreddedTypeSpec.scala rename to src/test/scala/com/snowplowanalytics/snowplow/rdbloader/discovery/ShreddedTypeSpec.scala index a8829c95d..510da28a7 100644 --- a/src/test/scala/com/snowplowanalytics/snowplow/rdbloader/ShreddedTypeSpec.scala +++ b/src/test/scala/com/snowplowanalytics/snowplow/rdbloader/discovery/ShreddedTypeSpec.scala @@ -11,17 +11,16 @@ * See the Apache License Version 2.0 for the specific language governing permissions and limitations there under. */ package com.snowplowanalytics.snowplow.rdbloader - -import org.scalacheck.Gen +package discovery import cats.implicits._ -import org.specs2.Specification -import org.specs2.ScalaCheck +import org.scalacheck.Gen +import org.specs2.{ScalaCheck, Specification} // This project -import ShreddedType._ -import config.Semver +import com.snowplowanalytics.snowplow.rdbloader.config.Semver +import com.snowplowanalytics.snowplow.rdbloader.discovery.ShreddedType._ object ShreddedTypeSpec { diff --git a/src/test/scala/com/snowplowanalytics/snowplow/rdbloader/loaders/CommonSpec.scala b/src/test/scala/com/snowplowanalytics/snowplow/rdbloader/loaders/CommonSpec.scala index 6d59bd6ad..462bbf486 100644 --- a/src/test/scala/com/snowplowanalytics/snowplow/rdbloader/loaders/CommonSpec.scala +++ b/src/test/scala/com/snowplowanalytics/snowplow/rdbloader/loaders/CommonSpec.scala @@ -18,6 +18,7 @@ import org.specs2.Specification // This project import S3.Folder +import discovery.DataDiscovery import config.{ Step, StorageTarget } class CommonSpec extends Specification { def is = s2""" @@ -52,7 +53,8 @@ class CommonSpec extends Specification { def is = s2""" StorageTarget.PlainText("Supersecret1"), 100, 1000L, - Some(TunnelInput)) + Some(TunnelInput), + None) def interpreter: LoaderA ~> Id = new (LoaderA ~> Id) { def apply[A](effect: LoaderA[A]): Id[A] = { diff --git a/src/test/scala/com/snowplowanalytics/snowplow/rdbloader/loaders/RedshiftLoaderSpec.scala b/src/test/scala/com/snowplowanalytics/snowplow/rdbloader/loaders/RedshiftLoaderSpec.scala index d82c16644..1a0753ce1 100644 --- a/src/test/scala/com/snowplowanalytics/snowplow/rdbloader/loaders/RedshiftLoaderSpec.scala +++ b/src/test/scala/com/snowplowanalytics/snowplow/rdbloader/loaders/RedshiftLoaderSpec.scala @@ -14,7 +14,7 @@ package com.snowplowanalytics.snowplow.rdbloader package loaders import cats.{Id, ~>} - +import com.snowplowanalytics.snowplow.rdbloader.discovery.{DataDiscovery, ShreddedType} import org.specs2.Specification // This project
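Note: as a companion to the `sequenceInF` helper added to package.scala above, a minimal, hypothetical instantiation (not part of the diff) with F = List, G = List, H = ValidatedNel[String, ?] and J = Either[String, ?]. The values are made up, and inferring the partially-applied type constructors may require scalac's -Ypartial-unification flag.

    import cats.data.ValidatedNel
    import cats.implicits._
    import com.snowplowanalytics.snowplow.rdbloader._ // brings sequenceInF into scope

    val raw: List[List[ValidatedNel[String, Int]]] =
      List(List(1.validNel, 2.validNel), List("boom".invalidNel))

    // Sequence each inner collection, then collapse ValidatedNel into Either
    val collapsed: List[Either[String, List[Int]]] =
      sequenceInF(raw, (v: ValidatedNel[String, List[Int]]) => v.toEither.leftMap(_.head))
    // collapsed == List(Right(List(1, 2)), Left("boom"))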