From c149fd5ae466b014a8a012cfc432194c12c7eaf5 Mon Sep 17 00:00:00 2001
From: Anton Parkhomenko
Date: Sun, 5 May 2019 22:49:33 +0100
Subject: [PATCH] RDB Loader: add tabular output loading (close #152)

---
 build.sbt                                     |  23 ++++
 .../rdbloader/common/Flattening.scala         |  71 ++++++++++
 project/BuildSettings.scala                   |   1 -
 project/Dependencies.scala                    |   1 +
 .../spark/EventUtils.scala                    |  39 +-----
 .../snowplow/rdbloader/LoaderA.scala          |  10 ++
 .../snowplow/rdbloader/LoaderError.scala      | 106 ++------------
 .../snowplow/rdbloader/Main.scala             |   3 +-
 .../snowplow/rdbloader/S3.scala               |   1 +
 .../rdbloader/config/StorageTarget.scala      |  18 +--
 .../snowplow/rdbloader/db/Decoder.scala       |  65 +++++++++
 .../snowplow/rdbloader/db/Entities.scala      |   8 +-
 .../snowplow/rdbloader/db/Migration.scala     | 129 ++++++++++++++++++
 .../rdbloader/discovery/DataDiscovery.scala   |  30 +++-
 .../discovery/DiscoveryFailure.scala          | 116 ++++++++++++++++
 .../discovery/ManifestDiscovery.scala         |   6 +-
 .../rdbloader/discovery/ShreddedType.scala    | 120 ++++++++++------
 .../interpreters/DryRunInterpreter.scala      |  17 ++-
 .../interpreters/RealWorldInterpreter.scala   |  33 +++--
 .../implementations/JdbcInterpreter.scala     |   7 +-
 .../implementations/S3Interpreter.scala       |   3 +-
 .../loaders/RedshiftLoadStatements.scala      |  29 ++--
 .../snowplow/rdbloader/package.scala          |   2 +-
 .../snowplow/rdbloader/utils/Common.scala     |   3 +-
 .../snowplow/rdbloader/TestInterpreter.scala  |  80 +++++++++++
 .../snowplow/rdbloader/db/MigrationSpec.scala |  89 ++++++++++++
 .../discovery/DataDiscoverySpec.scala         |  36 ++---
 .../discovery/ManifestDiscoverySpec.scala     |  14 +-
 .../discovery/ShreddedTypeSpec.scala          | 123 ++++++++++-------
 .../loaders/RedshiftLoaderSpec.scala          |   4 +-
 30 files changed, 868 insertions(+), 319 deletions(-)
 create mode 100644 common/src/main/scala/com/snowplowanalytics/snowplow/rdbloader/common/Flattening.scala
 create mode 100644 src/main/scala/com/snowplowanalytics/snowplow/rdbloader/db/Migration.scala
 create mode 100644 src/main/scala/com/snowplowanalytics/snowplow/rdbloader/discovery/DiscoveryFailure.scala
 create mode 100644 src/test/scala/com/snowplowanalytics/snowplow/rdbloader/TestInterpreter.scala
 create mode 100644 src/test/scala/com/snowplowanalytics/snowplow/rdbloader/db/MigrationSpec.scala

diff --git a/build.sbt b/build.sbt
index 48ca6bb41..ed55911bb 100755
--- a/build.sbt
+++ b/build.sbt
@@ -11,6 +11,25 @@
  * See the Apache License Version 2.0 for the specific language governing permissions and limitations there under.
*/ +lazy val common = project.in(file("common")) + .settings(Seq( + name := "snowplow-rdb-loader-common" + )) + .settings(BuildSettings.buildSettings) + .settings(resolvers ++= Dependencies.resolutionRepos) + .settings( + libraryDependencies ++= Seq( + Dependencies.igluClient, + Dependencies.igluCoreCirce, + Dependencies.scalaTracker, + Dependencies.scalaTrackerEmit, + Dependencies.circeGeneric, + Dependencies.circeGenericExtra, + Dependencies.circeLiteral, + Dependencies.schemaDdl + ) + ) + lazy val loader = project.in(file(".")) .settings( name := "snowplow-rdb-loader", @@ -26,6 +45,7 @@ lazy val loader = project.in(file(".")) libraryDependencies ++= Seq( Dependencies.decline, Dependencies.igluClient, + Dependencies.igluCore, Dependencies.igluCoreCirce, Dependencies.scalaTracker, Dependencies.scalaTrackerEmit, @@ -36,6 +56,7 @@ lazy val loader = project.in(file(".")) Dependencies.circeLiteral, Dependencies.manifest, Dependencies.fs2, + Dependencies.schemaDdl, Dependencies.postgres, Dependencies.redshift, @@ -50,6 +71,7 @@ lazy val loader = project.in(file(".")) Dependencies.scalaCheck ) ) + .dependsOn(common) lazy val shredder = project.in(file("shredder")) .settings( @@ -92,3 +114,4 @@ lazy val shredder = project.in(file("shredder")) "com.fasterxml.jackson.core" % "jackson-databind" % "2.6.7.2" ) ) + .dependsOn(common) diff --git a/common/src/main/scala/com/snowplowanalytics/snowplow/rdbloader/common/Flattening.scala b/common/src/main/scala/com/snowplowanalytics/snowplow/rdbloader/common/Flattening.scala new file mode 100644 index 000000000..939cba8d5 --- /dev/null +++ b/common/src/main/scala/com/snowplowanalytics/snowplow/rdbloader/common/Flattening.scala @@ -0,0 +1,71 @@ +/* + * Copyright (c) 2012-2019 Snowplow Analytics Ltd. All rights reserved. + * + * This program is licensed to you under the Apache License Version 2.0, + * and you may not use this file except in compliance with the Apache License Version 2.0. + * You may obtain a copy of the Apache License Version 2.0 at http://www.apache.org/licenses/LICENSE-2.0. + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the Apache License Version 2.0 is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the Apache License Version 2.0 for the specific language governing permissions and limitations there under. 
+ */
+package com.snowplowanalytics.snowplow.rdbloader.common
+
+import io.circe.Json
+
+import cats.Monad
+import cats.data.EitherT
+import cats.syntax.either._
+import cats.effect.Clock
+
+import com.snowplowanalytics.iglu.core._
+import com.snowplowanalytics.iglu.core.circe.implicits._
+
+import com.snowplowanalytics.iglu.client.Resolver
+import com.snowplowanalytics.iglu.client.resolver.registries.RegistryLookup
+import com.snowplowanalytics.iglu.client.ClientError.ResolutionError
+
+import com.snowplowanalytics.iglu.schemaddl.IgluSchema
+import com.snowplowanalytics.iglu.schemaddl.migrations.Migration.OrderedSchemas
+import com.snowplowanalytics.iglu.schemaddl.jsonschema.Schema
+import com.snowplowanalytics.iglu.schemaddl.jsonschema.circe.implicits._
+
+object Flattening {
+  /**
+   * Error specific to shredding a JSON instance into tabular format:
+   * either `SchemaList` is unavailable (no Iglu Server hosts these schemas),
+   * or a particular schema could not be fetched, so the whole flattening algorithm cannot be built
+   */
+  sealed trait FlatteningError
+  object FlatteningError {
+    case class SchemaListResolution(error: ResolutionError) extends FlatteningError
+    case class SchemaResolution(error: ResolutionError) extends FlatteningError
+    case class Parsing(error: String) extends FlatteningError
+  }
+
+  // Cache = Map[SchemaKey, OrderedSchemas]
+
+  def getOrdered[F[_]: Monad: RegistryLookup: Clock](resolver: Resolver[F], key: SchemaKey): EitherT[F, FlatteningError, OrderedSchemas] =
+    getOrdered(resolver, key.vendor, key.name, key.version.model)
+
+  def getOrdered[F[_]: Monad: RegistryLookup: Clock](resolver: Resolver[F], vendor: String, name: String, model: Int): EitherT[F, FlatteningError, OrderedSchemas] =
+    for {
+      schemaList <- EitherT[F, ResolutionError, SchemaList](resolver.listSchemas(vendor, name, Some(model))).leftMap(FlatteningError.SchemaListResolution)
+      ordered <- OrderedSchemas.fromSchemaList(schemaList, fetch(resolver))
+    } yield ordered
+
+  def fetch[F[_]: Monad: RegistryLookup: Clock](resolver: Resolver[F])(key: SchemaKey): EitherT[F, FlatteningError, IgluSchema] =
+    for {
+      json <- EitherT(resolver.lookupSchema(key, 2)).leftMap(FlatteningError.SchemaResolution)
+      schema <- EitherT.fromEither(parseSchema(json))
+    } yield schema
+
+  /** Parse JSON into a self-describing schema, or return `FlatteningError` */
+  private def parseSchema(json: Json): Either[FlatteningError, IgluSchema] =
+    for {
+      selfDescribing <- SelfDescribingSchema.parse(json).leftMap(code => FlatteningError.Parsing(s"Cannot parse ${json.noSpaces} payload as self-describing schema, ${code.code}"))
+      parsed <- Schema.parse(selfDescribing.schema).toRight(FlatteningError.Parsing(s"Cannot parse ${selfDescribing.self.schemaKey.toSchemaUri} payload as JSON Schema"))
+    } yield SelfDescribingSchema(selfDescribing.self, parsed)
+
+}
diff --git a/project/BuildSettings.scala b/project/BuildSettings.scala
index 9409508dc..86e3636f0 100644
--- a/project/BuildSettings.scala
+++ b/project/BuildSettings.scala
@@ -42,7 +42,6 @@ object BuildSettings {
     "-unchecked",
     "-Ywarn-unused-import",
     "-Ywarn-nullary-unit",
-    "-Xfatal-warnings",
     "-Xlint",
     "-Yinline-warnings",
     "-language:higherKinds",
diff --git a/project/Dependencies.scala b/project/Dependencies.scala
index 41ec90a84..3df735ee2 100644
--- a/project/Dependencies.scala
+++ b/project/Dependencies.scala
@@ -58,6 +58,7 @@ object Dependencies {
   val scalaTracker = "com.snowplowanalytics" %% "snowplow-scala-tracker-core" % V.scalaTracker
   val scalaTrackerEmit = "com.snowplowanalytics"
%% "snowplow-scala-tracker-emitter-id" % V.scalaTracker val manifest = "com.snowplowanalytics" %% "snowplow-processing-manifest" % V.manifest + val igluCore = "com.snowplowanalytics" %% "iglu-core" % V.igluCore val igluCoreCirce = "com.snowplowanalytics" %% "iglu-core-circe" % V.igluCore val cats = "org.typelevel" %% "cats" % V.cats val catsFree = "org.typelevel" %% "cats-free" % V.cats diff --git a/shredder/src/main/scala/com.snowplowanalytics.snowplow.storage/spark/EventUtils.scala b/shredder/src/main/scala/com.snowplowanalytics.snowplow.storage/spark/EventUtils.scala index 74c847ee4..1d5590af6 100644 --- a/shredder/src/main/scala/com.snowplowanalytics.snowplow.storage/spark/EventUtils.scala +++ b/shredder/src/main/scala/com.snowplowanalytics.snowplow.storage/spark/EventUtils.scala @@ -27,20 +27,18 @@ import cats.syntax.show._ import cats.effect.Clock import com.snowplowanalytics.iglu.core._ -import com.snowplowanalytics.iglu.core.circe.implicits._ import com.snowplowanalytics.iglu.client.Resolver import com.snowplowanalytics.iglu.client.resolver.registries.RegistryLookup -import com.snowplowanalytics.iglu.client.ClientError.ResolutionError -import com.snowplowanalytics.iglu.schemaddl.IgluSchema import com.snowplowanalytics.iglu.schemaddl.migrations.FlatData -import com.snowplowanalytics.iglu.schemaddl.migrations.Migration.OrderedSchemas import com.snowplowanalytics.iglu.schemaddl.jsonschema.Schema import com.snowplowanalytics.iglu.schemaddl.jsonschema.circe.implicits._ import com.snowplowanalytics.snowplow.analytics.scalasdk.Event +import com.snowplowanalytics.snowplow.rdbloader.common.Flattening.{ getOrdered, FlatteningError } + object EventUtils { /** * Ready the enriched event for database load by removing a few JSON fields and truncating field @@ -84,18 +82,6 @@ object EventUtils { List(schema.vendor, schema.name, schema.format, schema.version.asString, rootId.toString, rootTstamp.formatted, "events", s"""["events","${schema.name}"]""", "events") - /** - * Error specific to shredding JSON instance into tabular format - * `SchemaList` is unavailable (in case no Iglu Server hosts this schemas) - * Particular schema could not be fetched, thus whole flattening algorithm cannot be built - */ - sealed trait FlatteningError - object FlatteningError { - case class SchemaListResolution(error: ResolutionError) extends FlatteningError - case class SchemaResolution(error: ResolutionError) extends FlatteningError - case class Parsing(error: String) extends FlatteningError - } - /** * Transform a self-desribing entity into tabular format, using its known schemas to get a correct order of columns * @param resolver Iglu resolver to get list of known schemas @@ -109,27 +95,6 @@ object EventUtils { private def escape(s: String): String = s.replace('\n', ' ').replace('\t', ' ') - // Cache = Map[SchemaKey, OrderedSchemas] - - def getOrdered[F[_]: Monad: RegistryLookup: Clock](resolver: Resolver[F], key: SchemaKey) = - for { - schemaList <- EitherT[F, ResolutionError, SchemaList](resolver.listSchemas(key.vendor, key.name, Some(key.version.model))).leftMap(FlatteningError.SchemaListResolution) - ordered <- OrderedSchemas.fromSchemaList(schemaList, fetch(resolver)) - } yield ordered - - def fetch[F[_]: Monad: RegistryLookup: Clock](resolver: Resolver[F])(key: SchemaKey): EitherT[F, FlatteningError, IgluSchema] = - for { - json <- EitherT(resolver.lookupSchema(key, 2)).leftMap(FlatteningError.SchemaResolution) - schema <- EitherT.fromEither(parseSchema(json)) - } yield schema - - /** Parse JSON into 
self-describing schema, or return `FlatteningError` */ - private def parseSchema(json: Json): Either[FlatteningError, IgluSchema] = - for { - selfDescribing <- SelfDescribingSchema.parse(json).leftMap(code => FlatteningError.Parsing(s"Cannot parse ${json.noSpaces} payload as self-describing schema, ${code.code}")) - parsed <- Schema.parse(selfDescribing.schema).toRight(FlatteningError.Parsing(s"Cannot parse ${selfDescribing.self.schemaKey.toSchemaUri} payload as JSON Schema")) - } yield SelfDescribingSchema(selfDescribing.self, parsed) - /** Get maximum length for a string value */ private def getLength(schema: Schema): Option[Int] = schema.maxLength.map(_.value.toInt) diff --git a/src/main/scala/com/snowplowanalytics/snowplow/rdbloader/LoaderA.scala b/src/main/scala/com/snowplowanalytics/snowplow/rdbloader/LoaderA.scala index 214665e3f..13aad1e1d 100644 --- a/src/main/scala/com/snowplowanalytics/snowplow/rdbloader/LoaderA.scala +++ b/src/main/scala/com/snowplowanalytics/snowplow/rdbloader/LoaderA.scala @@ -20,6 +20,8 @@ import cats.implicits._ import com.snowplowanalytics.manifest.core.{ Item, Application } +import com.snowplowanalytics.iglu.schemaddl.migrations.SchemaList + // This library import Security.Tunnel import loaders.Common.SqlString @@ -71,6 +73,9 @@ object LoaderA { // Security ops case class GetEc2Property(name: String) extends LoaderA[Either[LoaderError, String]] + // Iglu ops + case class GetSchemas(vendor: String, name: String, model: Int) extends LoaderA[Either[LoaderError, SchemaList]] + def listS3(bucket: S3.Folder): Action[Either[LoaderError, List[S3.BlobObject]]] = Free.liftF[LoaderA, Either[LoaderError, List[S3.BlobObject]]](ListS3(bucket)) @@ -171,5 +176,10 @@ object LoaderA { /** Retrieve decrypted property from EC2 Parameter Store */ def getEc2Property(name: String): Action[Either[LoaderError, String]] = Free.liftF[LoaderA, Either[LoaderError, String]](GetEc2Property(name)) + + + /** Retrieve list of schemas from Iglu Server */ + def getSchemas(vendor: String, name: String, model: Int): Action[Either[LoaderError, SchemaList]] = + Free.liftF[LoaderA, Either[LoaderError, SchemaList]](GetSchemas(vendor, name, model)) } diff --git a/src/main/scala/com/snowplowanalytics/snowplow/rdbloader/LoaderError.scala b/src/main/scala/com/snowplowanalytics/snowplow/rdbloader/LoaderError.scala index a98e37e02..06d628403 100644 --- a/src/main/scala/com/snowplowanalytics/snowplow/rdbloader/LoaderError.scala +++ b/src/main/scala/com/snowplowanalytics/snowplow/rdbloader/LoaderError.scala @@ -17,7 +17,8 @@ import cats.implicits._ import cats.data.ValidatedNel import com.snowplowanalytics.manifest.core.ManifestError -import com.snowplowanalytics.manifest.core.ManifestError._ + +import com.snowplowanalytics.snowplow.rdbloader.discovery.DiscoveryFailure /** Root error type */ sealed trait LoaderError @@ -31,6 +32,7 @@ object LoaderError { case l: StorageTargetError => "Data loading error " + l.message case l: LoaderLocalError => "Internal Exception " + l.message case m: LoadManifestError => "Load Manifest: " + m.message + case m: MigrationError => s"Table migration error. Please check the table consistency. ${m.message}" } } @@ -58,110 +60,22 @@ object LoaderError { /** `atomic.manifest` prevents this folder to be loaded */ case class LoadManifestError(message: String) extends LoaderError - /** - * Discovery failure. Represents failure of single step. 
- * Multiple failures can be aggregated into `DiscoveryError`, - * which is top-level `LoaderError` - */ - sealed trait DiscoveryFailure { - def getMessage: String - } - - /** - * Cannot find JSONPaths file - */ - case class JsonpathDiscoveryFailure(jsonpathFile: String) extends DiscoveryFailure { - def getMessage: String = - s"JSONPath file [$jsonpathFile] was not found" - } - - /** - * Cannot find `atomic-events` folder on S3 - */ - case class AtomicDiscoveryFailure(path: String) extends DiscoveryFailure { - def getMessage: String = - s"Folder with atomic-events was not found in [$path]" - } - - /** - * Cannot download file from S3 - */ - case class DownloadFailure(key: S3.Key, message: String) extends DiscoveryFailure { - def getMessage: String = - s"Cannot download S3 object [$key].\n$message" - } - - /** - * General S3 Exception - */ - case class S3Failure(error: String) extends DiscoveryFailure { - def getMessage = error - } - - /** - * Invalid path for S3 key - */ - case class ShreddedTypeKeyFailure(path: S3.Key) extends DiscoveryFailure { - def getMessage: String = - s"Cannot extract contexts or self-describing events from file [$path]. " + - s"Corrupted shredded/good state or unexpected Snowplow Shred job version" - } - - /** - * No data, while it **must** be present. Happens only with passed `--folder`, because on - * global discovery folder can be empty e.g. due eventual consistency - * @param path path, where data supposed to be found - */ - case class NoDataFailure(path: S3.Folder) extends DiscoveryFailure { - def getMessage: String = - s"No data discovered in [$path], while RDB Loader was explicitly pointed to it by '--folder' option. " + - s"Possible reasons: S3 eventual consistency or folder does not contain any files" - - // Message for enabled manifest - def getManifestMessage: String = - s"Processing manifest does not have unprocessed item [$path]. It can be there, but " + - "already loaded by RDB Loader or unprocessed by RDB Shredder" - } - - /** - * Cannot discovery shredded type in folder - */ - case class ShreddedTypeDiscoveryFailure(path: S3.Folder, invalidKeyCount: Int, example: S3.Key) extends DiscoveryFailure { - def getMessage: String = - s"Cannot extract contexts or self-describing events from directory [$path].\nInvalid key example: $example. 
Total $invalidKeyCount invalid keys.\nCorrupted shredded/good state or unexpected Snowplow Shred job version" - } - - case class ManifestFailure(manifestError: ManifestError) extends DiscoveryFailure { - def getMessage: String = manifestError.show - override def toString: String = getMessage - } - /** Turn non-empty list of discovery failures into top-level `LoaderError` */ def flattenValidated[A](validated: ValidatedNel[DiscoveryFailure, A]): Either[LoaderError, A] = validated.leftMap(errors => DiscoveryError(errors.toList): LoaderError).toEither def fromManifestError(manifestError: ManifestError): LoaderError = - DiscoveryError(ManifestFailure(manifestError)) - - /** Other errors */ - case class LoaderLocalError(message: String) extends LoaderError + DiscoveryFailure.ManifestFailure(manifestError).toLoaderError /** Exception wrapper to pass to processing manifest */ case class LoaderThrowable(origin: LoaderError) extends Throwable { override def getMessage: String = origin.show } - /** - * Aggregate some failures into more compact error-list to not pollute end-error - */ - def aggregateDiscoveryFailures(failures: List[DiscoveryFailure]): List[DiscoveryFailure] = { - val (shreddedTypeFailures, otherFailures) = failures.span(_.isInstanceOf[ShreddedTypeKeyFailure]) - val casted = shreddedTypeFailures.asInstanceOf[List[ShreddedTypeKeyFailure]] - val aggregatedByDir = casted.groupBy { failure => - S3.Key.getParent(failure.path) }.map { - case (k, v) => ShreddedTypeDiscoveryFailure(k, v.length, v.head.path) - }.toList - - aggregatedByDir ++ otherFailures - } + /** Other errors */ + case class LoaderLocalError(message: String) extends LoaderError + + /** Error happened during DDL-statements execution. Critical */ + case class MigrationError(message: String) extends LoaderError + } diff --git a/src/main/scala/com/snowplowanalytics/snowplow/rdbloader/Main.scala b/src/main/scala/com/snowplowanalytics/snowplow/rdbloader/Main.scala index 4455f7dc1..a99e98f41 100644 --- a/src/main/scala/com/snowplowanalytics/snowplow/rdbloader/Main.scala +++ b/src/main/scala/com/snowplowanalytics/snowplow/rdbloader/Main.scala @@ -12,6 +12,7 @@ */ package com.snowplowanalytics.snowplow.rdbloader +import cats.syntax.flatMap._ import cats.data.Validated._ // This project @@ -52,7 +53,7 @@ object Main { val interpreter = Interpreter.initialize(config) val actions: Action[Int] = for { - data <- discover(config).value + data <- discover(config).flatTap(db.Migration.perform(config.target.schema)).value result <- data match { case Right(discovery) => load(config, discovery).value case Left(LoaderError.StorageTargetError(message)) => diff --git a/src/main/scala/com/snowplowanalytics/snowplow/rdbloader/S3.scala b/src/main/scala/com/snowplowanalytics/snowplow/rdbloader/S3.scala index a7d5e065d..c72fa8711 100644 --- a/src/main/scala/com/snowplowanalytics/snowplow/rdbloader/S3.scala +++ b/src/main/scala/com/snowplowanalytics/snowplow/rdbloader/S3.scala @@ -40,6 +40,7 @@ object S3 { case _ => coerce(s).asRight } + /** Turn proper `s3://bucket/path/` string into `Folder` */ def coerce(s: String): Folder = apply(appendTrailingSlash(fixPrefix(s)).asInstanceOf[Folder]) diff --git a/src/main/scala/com/snowplowanalytics/snowplow/rdbloader/config/StorageTarget.scala b/src/main/scala/com/snowplowanalytics/snowplow/rdbloader/config/StorageTarget.scala index e65dd06ea..97dedf7e9 100644 --- a/src/main/scala/com/snowplowanalytics/snowplow/rdbloader/config/StorageTarget.scala +++ 
b/src/main/scala/com/snowplowanalytics/snowplow/rdbloader/config/StorageTarget.scala @@ -55,8 +55,6 @@ sealed trait StorageTarget extends Product with Serializable { def shreddedTable(tableName: String): String = s"$schema.$tableName" - def purpose: StorageTarget.Purpose - def sshTunnel: Option[StorageTarget.TunnelConfig] } @@ -68,17 +66,9 @@ object StorageTarget { case object VerifyCa extends SslMode { def asString = "VERIFY_CA" } case object VerifyFull extends SslMode { def asString = "VERIFY_FULL" } - sealed trait Purpose extends StringEnum - case object DuplicateTracking extends Purpose { def asString = "DUPLICATE_TRACKING" } - case object FailedEvents extends Purpose { def asString = "FAILED_EVENTS" } - case object EnrichedEvents extends Purpose { def asString = "ENRICHED_EVENTS" } - implicit val sslModeDecoder: Decoder[SslMode] = decodeStringEnum[SslMode] - implicit val purposeDecoder: Decoder[Purpose] = - decodeStringEnum[Purpose] - /** * Configuration to access Snowplow Processing Manifest * @param amazonDynamoDb Amazon DynamoDB table, the single available implementation @@ -104,9 +94,7 @@ object StorageTarget { password: PasswordConfig, sshTunnel: Option[TunnelConfig], processingManifest: Option[ProcessingManifestConfig]) - extends StorageTarget { - val purpose = EnrichedEvents - } + extends StorageTarget /** * Redshift config @@ -126,9 +114,7 @@ object StorageTarget { compRows: Long, sshTunnel: Option[TunnelConfig], processingManifest: Option[ProcessingManifestConfig]) - extends StorageTarget { - val purpose = EnrichedEvents - } + extends StorageTarget /** * All possible JDBC according to Redshift documentation, except deprecated diff --git a/src/main/scala/com/snowplowanalytics/snowplow/rdbloader/db/Decoder.scala b/src/main/scala/com/snowplowanalytics/snowplow/rdbloader/db/Decoder.scala index 836a3c5fa..e12c2ca9a 100644 --- a/src/main/scala/com/snowplowanalytics/snowplow/rdbloader/db/Decoder.scala +++ b/src/main/scala/com/snowplowanalytics/snowplow/rdbloader/db/Decoder.scala @@ -17,11 +17,14 @@ import java.sql.{ResultSet, SQLException} import cats.implicits._ +import com.snowplowanalytics.iglu.core.SchemaKey + import Decoder._ import Entities._ trait Decoder[A] { def decode(resultSet: ResultSet): Either[JdbcDecodeError, A] + def name: String } object Decoder { @@ -30,6 +33,7 @@ object Decoder { implicit val countDecoder: Decoder[Option[Count]] = new Decoder[Option[Count]] { + def name = "Option[Count]" final def decode(resultSet: ResultSet): Either[JdbcDecodeError, Option[Count]] = { var buffer: Count = null try { @@ -49,6 +53,7 @@ object Decoder { implicit val timestampDecoder: Decoder[Option[Timestamp]] = new Decoder[Option[Timestamp]] { + def name = "Option[Timestamp]" final def decode(resultSet: ResultSet): Either[JdbcDecodeError, Option[Timestamp]] = { var buffer: Timestamp = null try { @@ -68,6 +73,7 @@ object Decoder { implicit val manifestItemDecoder: Decoder[Option[LoadManifestItem]] = new Decoder[Option[LoadManifestItem]] { + def name = "Option[LoadManifestItem]" final def decode(resultSet: ResultSet): Either[JdbcDecodeError, Option[LoadManifestItem]] = { var buffer: LoadManifestItem = null try { @@ -94,4 +100,63 @@ object Decoder { } } } + + implicit val booleanDecoder: Decoder[Boolean] = + new Decoder[Boolean] { + def name = "Boolean" + final def decode(resultSet: ResultSet): Either[JdbcDecodeError, Boolean] = { + var buffer: Boolean = false + try { + if (resultSet.next()) { + val bool = resultSet.getBoolean(1) + buffer = bool + buffer.asRight[JdbcDecodeError] + } 
else false.asRight[JdbcDecodeError] + } catch { + case s: SQLException => + JdbcDecodeError(s.getMessage).asLeft[Boolean] + } finally { + resultSet.close() + } + } + } + + implicit val columnsDecoder: Decoder[Columns] = + new Decoder[Columns] { + def name = "Columns" + final def decode(resultSet: ResultSet): Either[JdbcDecodeError, Columns] = { + val buffer = collection.mutable.ListBuffer.empty[String] + try { + while (resultSet.next()) { + val col = resultSet.getString("column") + buffer.append(col) + } + Columns(buffer.toList).asRight + } catch { + case s: SQLException => + JdbcDecodeError(s.getMessage).asLeft[Columns] + } finally { + resultSet.close() + } + } + } + + implicit val tableStateDecoder: Decoder[TableState] = + new Decoder[TableState] { + def name = "TableState" + final def decode(resultSet: ResultSet): Either[JdbcDecodeError, TableState] = { + try { + if (resultSet.next()) { + val col = SchemaKey.fromUri(resultSet.getString(1)) + col.map(x => TableState(x)).leftMap(e => JdbcDecodeError(s"Table comment is not valid SchemaKey, ${e.code}")) + } else JdbcDecodeError("Table description is not available").asLeft + } catch { + case s: SQLException => + JdbcDecodeError(s.getMessage).asLeft[TableState] + } finally { + resultSet.close() + } + } + } + } diff --git a/src/main/scala/com/snowplowanalytics/snowplow/rdbloader/db/Entities.scala b/src/main/scala/com/snowplowanalytics/snowplow/rdbloader/db/Entities.scala index 956244810..e5da22e4a 100644 --- a/src/main/scala/com/snowplowanalytics/snowplow/rdbloader/db/Entities.scala +++ b/src/main/scala/com/snowplowanalytics/snowplow/rdbloader/db/Entities.scala @@ -12,7 +12,9 @@ */ package com.snowplowanalytics.snowplow.rdbloader.db -import java.sql.{ Timestamp => SqlTimestamp } +import java.sql.{Timestamp => SqlTimestamp} + +import com.snowplowanalytics.iglu.core.SchemaKey /** Different entities that are queried from database using `Decoder`s */ object Entities { @@ -29,4 +31,8 @@ object Entities { def show: String = s"ETL timestamp $etlTstamp with $eventCount events and $shreddedCardinality shredded types, commited at $commitTstamp" } + + case class Columns(names: List[String]) extends AnyVal + + case class TableState(version: SchemaKey) extends AnyVal } diff --git a/src/main/scala/com/snowplowanalytics/snowplow/rdbloader/db/Migration.scala b/src/main/scala/com/snowplowanalytics/snowplow/rdbloader/db/Migration.scala new file mode 100644 index 000000000..8b678c21f --- /dev/null +++ b/src/main/scala/com/snowplowanalytics/snowplow/rdbloader/db/Migration.scala @@ -0,0 +1,129 @@ +/* + * Copyright (c) 2014-2019 Snowplow Analytics Ltd. All rights reserved. + * + * This program is licensed to you under the Apache License Version 2.0, + * and you may not use this file except in compliance with the Apache License Version 2.0. + * You may obtain a copy of the Apache License Version 2.0 at http://www.apache.org/licenses/LICENSE-2.0. + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the Apache License Version 2.0 is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the Apache License Version 2.0 for the specific language governing permissions and limitations there under. 
+ */
+package com.snowplowanalytics.snowplow.rdbloader.db
+
+import cats.data.EitherT
+import cats.implicits._
+
+import com.snowplowanalytics.iglu.core.{ SchemaKey, SchemaMap, SchemaVer }
+
+import com.snowplowanalytics.iglu.schemaddl.StringUtils
+import com.snowplowanalytics.iglu.schemaddl.migrations.{ FlatSchema, Migration => DMigration, SchemaList => DSchemaList }
+import com.snowplowanalytics.iglu.schemaddl.redshift.Ddl
+import com.snowplowanalytics.iglu.schemaddl.redshift.generators.{DdlGenerator, MigrationGenerator}
+
+import com.snowplowanalytics.snowplow.rdbloader.{LoaderA, LoaderAction, LoaderError}
+import com.snowplowanalytics.snowplow.rdbloader.db.Entities.{Columns, TableState}
+import com.snowplowanalytics.snowplow.rdbloader.discovery.{DataDiscovery, ShreddedType, DiscoveryFailure}
+import com.snowplowanalytics.snowplow.rdbloader.loaders.Common.SqlString
+
+object Migration {
+  /**
+   * Perform all the machinery to check if any tables for tabular data do not match
+   * the latest state on the Iglu Server. Create or update tables in that case.
+   * Do nothing if there is only legacy JSON data
+   */
+  def perform(dbSchema: String)(discoveries: List[DataDiscovery]): LoaderAction[Unit] =
+    discoveries.flatMap(_.shreddedTypes).traverse_ {
+      case ShreddedType.Tabular(ShreddedType.Info(_, vendor, name, model, _)) =>
+        for {
+          schemas <- EitherT(LoaderA.getSchemas(vendor, name, model))
+          tableName = StringUtils.getTableName(SchemaMap(SchemaKey(vendor, name, "jsonschema", SchemaVer.Full(model, 0, 0))))
+          _ <- for {
+            exists <- tableExists(dbSchema, tableName)
+            _ <- if (exists) for {
+              description <- getVersion(dbSchema, tableName, schemas)
+              matches = schemas.latest.schemaKey == description.version
+              columns <- getColumns(dbSchema, tableName)
+              _ <- if (matches) LoaderAction.unit else updateTable(dbSchema, description.version, columns, schemas)
+            } yield () else createTable(dbSchema, tableName, schemas)
+          } yield ()
+        } yield ()
+      case ShreddedType.Json(_, _) => LoaderAction.unit
+    }
+
+  /** Find the latest schema version in the table and confirm that it is the latest in `schemas` */
+  def getVersion(dbSchema: String, tableName: String, latest: DSchemaList): LoaderAction[TableState] = {
+    val query = SqlString.unsafeCoerce(s"SELECT obj_description(oid) FROM pg_class WHERE relname = '$tableName'")
+    LoaderA.executeUpdate(SqlString.unsafeCoerce(s"SET SEARCH_PATH TO $dbSchema")) *>
+      LoaderA.executeQuery[TableState](query).leftMap(annotateError(dbSchema, tableName))
+  }
+
+  /** Check if table exists in `dbSchema` */
+  def tableExists(dbSchema: String, table: String): LoaderAction[Boolean] = {
+    val query = SqlString.unsafeCoerce(
+      s"""
+         |SELECT EXISTS (
+         |  SELECT 1
+         |  FROM pg_tables
+         |  WHERE schemaname = '$dbSchema'
+         |  AND tablename = '$table') AS exists;
+      """.stripMargin)
+
+    LoaderA.executeQuery[Boolean](query).leftMap(annotateError(dbSchema, table))
+  }
+
+  def createTable(dbSchema: String, name: String, schemas: DSchemaList): LoaderAction[Unit] = {
+    val subschemas = FlatSchema.extractProperties(schemas)
+    val tableName = StringUtils.getTableName(schemas.latest)
+    val ddl = DdlGenerator.generateTableDdl(subschemas, tableName, Some(dbSchema), 4096, false)
+    val comment = DdlGenerator.getTableComment(name, Some(dbSchema), schemas.latest)
+    LoaderA.print(s"Creating $dbSchema.$name table for ${comment.comment}").liftA *>
+      LoaderA.executeUpdate(ddl.toSql).void *>
+      LoaderA.executeUpdate(comment.toSql).void *>
+      LoaderA.print(s"Table created").liftA
+  }
+
+  /** Update the existing table specified by `current` into the final version present in `state` */
+  def updateTable(dbSchema: String, current: SchemaKey, columns: Columns, state: DSchemaList): LoaderAction[Unit] =
+    state match {
+      case s: DSchemaList.Full =>
+        val migrations = s.extractSegments.map(DMigration.fromSegment)
+        migrations.find(_.from == current.version) match {
+          case Some(relevantMigration) =>
+            val ddlFile = MigrationGenerator.generateMigration(relevantMigration, 4096, Some(dbSchema))
+            val ddl = SqlString.unsafeCoerce(ddlFile.render)
+            LoaderAction.liftA(ddlFile.warnings.traverse_(LoaderA.print)) *>
+              LoaderAction.liftA(LoaderA.print(s"Executing migration DDL statement: $ddl")) *>
+              LoaderA.executeUpdate(ddl).void
+          case None =>
+            val message = s"Warning: Table's schema key '${current.toSchemaUri}' cannot be found in fetched schemas $state. Migration cannot be created"
+            LoaderAction.liftE[Unit](DiscoveryFailure.IgluError(message).toLoaderError.asLeft)
+        }
+      case _: DSchemaList.Single =>
+        LoaderA.print(s"Warning: updateTable executed for a table with single schema\ncolumns: $columns\nstate: $state").liftA
+    }
+
+  /** List all columns in the table */
+  def getColumns(dbSchema: String, tableName: String): LoaderAction[Columns] = {
+    val setSchema = SqlString.unsafeCoerce(s"SET search_path TO $dbSchema;")
+    val getColumns = SqlString.unsafeCoerce(s"""SELECT "column" FROM PG_TABLE_DEF WHERE tablename = '$tableName';""")
+    for {
+      _ <- LoaderA.executeUpdate(setSchema)
+      columns <- LoaderA.executeQuery[Columns](getColumns).leftMap(annotateError(dbSchema, tableName))
+    } yield columns
+  }
+
+  private def annotateError(dbSchema: String, tableName: String)(error: LoaderError): LoaderError =
+    error match {
+      case LoaderError.StorageTargetError(message) =>
+        LoaderError.StorageTargetError(s"$dbSchema.$tableName. 
" ++ message) + case other => + other + } + + private implicit class SqlDdl(ddl: Ddl) { + def toSql: SqlString = + SqlString.unsafeCoerce(ddl.toDdl) + } +} diff --git a/src/main/scala/com/snowplowanalytics/snowplow/rdbloader/discovery/DataDiscovery.scala b/src/main/scala/com/snowplowanalytics/snowplow/rdbloader/discovery/DataDiscovery.scala index 108f87f56..9f51df538 100644 --- a/src/main/scala/com/snowplowanalytics/snowplow/rdbloader/discovery/DataDiscovery.scala +++ b/src/main/scala/com/snowplowanalytics/snowplow/rdbloader/discovery/DataDiscovery.scala @@ -133,7 +133,7 @@ object DataDiscovery { val keys: LoaderAction[ValidatedDataKeys] = listGoodBucket(folder).map { keys => if (keys.isEmpty) { - val failure = Validated.Invalid(NonEmptyList(NoDataFailure(folder), Nil)) + val failure = Validated.Invalid(NonEmptyList(DiscoveryFailure.NoDataFailure(folder), Nil)) Free.pure(failure) } else transformKeys(shredJob, region, assets)(keys) } @@ -190,7 +190,7 @@ object DataDiscovery { case Validated.Valid(discovery) => discovery.asRight case Validated.Invalid(failures) => - val aggregated = LoaderError.aggregateDiscoveryFailures(failures.toList).distinct + val aggregated = DiscoveryFailure.aggregateDiscoveryFailures(failures).distinct DiscoveryError(aggregated).asLeft } } @@ -216,7 +216,7 @@ object DataDiscovery { val size = Some(atomicKeys.foldMap(_.size)) DataDiscovery(base, Some(atomicKeys.length), size, shreddedData, false, None).validNel } else { - AtomicDiscoveryFailure(base).invalidNel + DiscoveryFailure.AtomicDiscoveryFailure(base).invalidNel } } @@ -251,9 +251,11 @@ object DataDiscovery { case Right(ShreddedDataKeyIntermediate(fullPath, info)) => val jsonpathAction = EitherT(ShreddedType.discoverJsonPath(region, assets, info)) val discoveryAction = jsonpathAction.map { jsonpath => - ShreddedDataKeyFinal(fullPath, ShreddedType(info, jsonpath)) + ShreddedDataKeyFinal(fullPath, ShreddedType.Json(info, jsonpath)) } discoveryAction.value.map(_.toValidatedNel) + case Right(key @ ShreddedDataKeyTabular(_, _)) => + Free.pure(key.toFinal.validNel[DiscoveryFailure]) case Right(AtomicDataKey(fullPath, size)) => val pure: Action[ValidatedNel[DiscoveryFailure, DataKeyFinal]] = Free.pure(AtomicDataKey(fullPath, size).validNel[DiscoveryFailure]) @@ -278,8 +280,10 @@ object DataDiscovery { case Some(_) => AtomicDataKey(blobObject.key, blobObject.size).asRight case None => ShreddedType.transformPath(blobObject.key, shredJob) match { - case Right(info) => + case Right((false, info)) => ShreddedDataKeyIntermediate(blobObject.key, info).asRight + case Right((true, info)) => + ShreddedDataKeyTabular(blobObject.key, info).asRight case Left(e) => e.asLeft } } @@ -325,9 +329,10 @@ object DataDiscovery { discovered <- Free.pure(control.orElse(original)) } yield discovered case (Right(o), Right(c)) if o.sortBy(_.base.toString) == c.sortBy(_.base.toString) => - val message = o.map(x => s"+ ${x.show}").mkString("\n") + val found = o.map(x => s"+ ${x.show}").mkString("\n") + val message = if (found.isEmpty) "No run ids discovered" else s"Following run ids found:\n$found" for { - _ <- LoaderA.print(s"Consistency check passed after ${attempt - 1} attempt. Following run ids found:\n$message") + _ <- LoaderA.print(s"Consistency check passed after ${attempt - 1} attempt. 
" ++ message) discovered <- Free.pure(original) } yield discovered case (Right(o), Right(c)) => @@ -415,6 +420,17 @@ object DataDiscovery { */ private case class ShreddedDataKeyIntermediate(key: S3.Key, info: ShreddedType.Info) extends DataKeyIntermediate + /** Shredded key that doesn't need a JSONPath file and can be mapped to final */ + private case class ShreddedDataKeyTabular(key: S3.Key, info: ShreddedType.Info) extends DataKeyIntermediate { + def base: S3.Folder = { + val atomicEvents = S3.Key.getParent(key) + S3.Folder.getParent(atomicEvents) + } + + def toFinal: ShreddedDataKeyFinal = + ShreddedDataKeyFinal(key, ShreddedType.Tabular(info)) + } + /** * S3 key, representing intermediate shredded type file * It is final because shredded type proved to have JSONPath diff --git a/src/main/scala/com/snowplowanalytics/snowplow/rdbloader/discovery/DiscoveryFailure.scala b/src/main/scala/com/snowplowanalytics/snowplow/rdbloader/discovery/DiscoveryFailure.scala new file mode 100644 index 000000000..47b0fe129 --- /dev/null +++ b/src/main/scala/com/snowplowanalytics/snowplow/rdbloader/discovery/DiscoveryFailure.scala @@ -0,0 +1,116 @@ +/* + * Copyright (c) 2012-2019 Snowplow Analytics Ltd. All rights reserved. + * + * This program is licensed to you under the Apache License Version 2.0, + * and you may not use this file except in compliance with the Apache License Version 2.0. + * You may obtain a copy of the Apache License Version 2.0 at http://www.apache.org/licenses/LICENSE-2.0. + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the Apache License Version 2.0 is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the Apache License Version 2.0 for the specific language governing permissions and limitations there under. + */ +package com.snowplowanalytics.snowplow.rdbloader.discovery + +import cats.data.NonEmptyList +import cats.syntax.show._ + +import com.snowplowanalytics.manifest.core.ManifestError + +import com.snowplowanalytics.snowplow.rdbloader.{S3, LoaderError} + +/** + * Discovery failure. Represents failure of single step. + * Multiple failures can be aggregated into `DiscoveryError`, + * which is top-level `LoaderError` + */ +sealed trait DiscoveryFailure { + def getMessage: String + + /** Cast into final `LoaderError` */ + def toLoaderError: LoaderError = + LoaderError.DiscoveryError(this) +} + +object DiscoveryFailure { + /** Cannot find JSONPaths file */ + case class JsonpathDiscoveryFailure(jsonpathFile: String) extends DiscoveryFailure { + def getMessage: String = + s"JSONPath file [$jsonpathFile] was not found" + } + + /** + * Cannot find `atomic-events` folder on S3 + */ + case class AtomicDiscoveryFailure(path: String) extends DiscoveryFailure { + def getMessage: String = + s"Folder with atomic-events was not found in [$path]" + } + + /** + * Cannot download file from S3 + */ + case class DownloadFailure(key: S3.Key, message: String) extends DiscoveryFailure { + def getMessage: String = + s"Cannot download S3 object [$key].\n$message" + } + + /** + * General S3 Exception + */ + case class S3Failure(error: String) extends DiscoveryFailure { + def getMessage = error + } + + /** Invalid path for S3 key */ + case class ShreddedTypeKeyFailure(path: S3.Key) extends DiscoveryFailure { + def getMessage: String = + s"Cannot extract contexts or self-describing events from file [$path]. 
" + + s"Corrupted shredded/good state or unexpected Snowplow Shred job version" + } + + case class IgluError(message: String) extends DiscoveryFailure { + def getMessage: String = message + } + + /** + * No data, while it **must** be present. Happens only with passed `--folder`, because on + * global discovery folder can be empty e.g. due eventual consistency + * @param path path, where data supposed to be found + */ + case class NoDataFailure(path: S3.Folder) extends DiscoveryFailure { + def getMessage: String = + s"No data discovered in [$path], while RDB Loader was explicitly pointed to it by '--folder' option. " + + s"Possible reasons: S3 eventual consistency or folder does not contain any files" + + // Message for enabled manifest + def getManifestMessage: String = + s"Processing manifest does not have unprocessed item [$path]. It can be there, but " + + "already loaded by RDB Loader or unprocessed by RDB Shredder" + } + + /** + * Cannot discovery shredded type in folder + */ + case class ShreddedTypeDiscoveryFailure(path: S3.Folder, invalidKeyCount: Int, example: S3.Key) extends DiscoveryFailure { + def getMessage: String = + s"Cannot extract contexts or self-describing events from directory [$path].\nInvalid key example: $example. Total $invalidKeyCount invalid keys.\nCorrupted shredded/good state or unexpected Snowplow Shred job version" + } + + case class ManifestFailure(manifestError: ManifestError) extends DiscoveryFailure { + def getMessage: String = manifestError.show + override def toString: String = getMessage + } + + /** Aggregate some failures into more compact error-list to not pollute end-error */ + def aggregateDiscoveryFailures(failures: NonEmptyList[DiscoveryFailure]): List[DiscoveryFailure] = { + val (shreddedTypeFailures, otherFailures) = failures.toList.span(_.isInstanceOf[DiscoveryFailure.ShreddedTypeKeyFailure]) + val casted = shreddedTypeFailures.asInstanceOf[List[DiscoveryFailure.ShreddedTypeKeyFailure]] + val aggregatedByDir = casted.groupBy { failure => + S3.Key.getParent(failure.path) }.map { + case (k, v) => DiscoveryFailure.ShreddedTypeDiscoveryFailure(k, v.length, v.head.path) + }.toList + + aggregatedByDir ++ otherFailures + } +} diff --git a/src/main/scala/com/snowplowanalytics/snowplow/rdbloader/discovery/ManifestDiscovery.scala b/src/main/scala/com/snowplowanalytics/snowplow/rdbloader/discovery/ManifestDiscovery.scala index 2195c76c1..59bf6314e 100644 --- a/src/main/scala/com/snowplowanalytics/snowplow/rdbloader/discovery/ManifestDiscovery.scala +++ b/src/main/scala/com/snowplowanalytics/snowplow/rdbloader/discovery/ManifestDiscovery.scala @@ -105,7 +105,7 @@ object ManifestDiscovery { jsonpathAssets: Option[S3.Folder]): LoaderAction[DataDiscovery] = { val itemA: ActionE[Item] = LoaderA.manifestDiscover(getLoaderApp(id), ShredderApp, (folderPredicate(id, folder)(_)).some).map { case Right(h :: _) => h.asRight[LoaderError] - case Right(Nil) => DiscoveryError(NoDataFailure(folder) :: Nil).asLeft[Item] + case Right(Nil) => DiscoveryError(DiscoveryFailure.NoDataFailure(folder) :: Nil).asLeft[Item] case Left(error) => error.asLeft } @@ -127,12 +127,12 @@ object ManifestDiscovery { val processedRecord = findProcessed(shredderRecords) processedRecord.map(_.flatMap(parseRecord)) match { - case Some(either) => either.leftMap { error => DiscoveryError(ManifestFailure(error)) } + case Some(either) => either.leftMap { error => DiscoveryError(DiscoveryFailure.ManifestFailure(error)) } case None => // Impossible error-state if function is used on filtered `Item` 
before val message = s"Item [${item.id}] does not have 'PROCESSED' state for $ShredderName" val error: ManifestError = Corrupted(Corruption.InvalidContent(NonEmptyList.one(message))) - DiscoveryError(ManifestFailure(error)).asLeft + DiscoveryError(DiscoveryFailure.ManifestFailure(error)).asLeft } } diff --git a/src/main/scala/com/snowplowanalytics/snowplow/rdbloader/discovery/ShreddedType.scala b/src/main/scala/com/snowplowanalytics/snowplow/rdbloader/discovery/ShreddedType.scala index e58e0ae53..7d59dea80 100644 --- a/src/main/scala/com/snowplowanalytics/snowplow/rdbloader/discovery/ShreddedType.scala +++ b/src/main/scala/com/snowplowanalytics/snowplow/rdbloader/discovery/ShreddedType.scala @@ -19,29 +19,16 @@ import cats.implicits._ import com.snowplowanalytics.iglu.core.{SchemaKey, SchemaVer, SchemaCriterion} -import com.snowplowanalytics.snowplow.rdbloader.LoaderError._ import com.snowplowanalytics.snowplow.rdbloader.config.Semver import com.snowplowanalytics.snowplow.rdbloader.utils.Common.toSnakeCase -/** - * Container for S3 folder with shredded JSONs ready to load - * Usually it represents self-describing event or custom/derived context - * - * @param info raw metadata extracted from S3 Key - * @param jsonPaths existing JSONPaths file - */ -case class ShreddedType(info: ShreddedType.Info, jsonPaths: S3.Key) { +sealed trait ShreddedType { + /** raw metadata extracted from S3 Key */ + def info: ShreddedType.Info /** Get S3 prefix which Redshift should LOAD FROM */ - def getLoadPath: String = { - if (info.shredJob <= ShreddedType.ShredJobBeforeSparkVersion) { - s"${info.base}${info.vendor}/${info.name}/jsonschema/${info.model}-" - } else { - s"${info.base}shredded-types/vendor=${info.vendor}/name=${info.name}/format=jsonschema/version=${info.model}-" - } - } - + def getLoadPath: String /** Human-readable form */ - def show: String = s"${info.toCriterion.asString} ($jsonPaths)" + def show: String } /** @@ -49,6 +36,37 @@ case class ShreddedType(info: ShreddedType.Info, jsonPaths: S3.Key) { */ object ShreddedType { + /** + * Container for S3 folder with shredded JSONs ready to load with JSONPaths + * Usually it represents self-describing event or custom/derived context + * + * @param jsonPaths existing JSONPaths file + */ + case class Json(info: Info, jsonPaths: S3.Key) extends ShreddedType { + def getLoadPath: String = { + if (info.shredJob <= ShredJobBeforeSparkVersion) { + s"${info.base}${info.vendor}/${info.name}/jsonschema/${info.model}-" + } else { + s"${info.base}shredded-types/vendor=${info.vendor}/name=${info.name}/format=jsonschema/version=${info.model}-" + } + } + + def show: String = s"${info.toCriterion.asString} ($jsonPaths)" + } + + /** + * Container for S3 folder with shredded TSVs ready to load, without JSONPaths + * Usually it represents self-describing event or custom/derived context + * + * @param info raw metadata extracted from S3 Key + */ + case class Tabular(info: Info) extends ShreddedType { + def getLoadPath: String = + s"${info.base}shredded-tsv/vendor=${info.vendor}/name=${info.name}/format=jsonschema/version=${info.model}" + + def show: String = s"${info.toCriterion.asString} TSV" + } + /** * Raw metadata that can be parsed from S3 Key. 
* It cannot be counted as "final" shredded type, @@ -74,9 +92,7 @@ object ShreddedType { */ val JsonpathsPath = "4-storage/redshift-storage/jsonpaths/" - /** - * Regex to extract `SchemaKey` from `shredded/good` - */ + /** Regex to extract `SchemaKey` from `shredded/good` */ val ShreddedSubpathPattern = ("""shredded\-types""" + """/vendor=(?[a-zA-Z0-9-_.]+)""" + @@ -84,12 +100,20 @@ object ShreddedType { """/format=(?[a-zA-Z0-9-_]+)""" + """/version=(?[1-9][0-9]*(?:-(?:0|[1-9][0-9]*)){2})$""").r - /** - * Version of legacy Shred job, where old path pattern was used - * `com.acme/event/jsonschema/1-0-0` - */ + /** Regex to extract `SchemaKey` from `shredded/good` */ + val ShreddedSubpathPatternTabular = + ("""shredded\-tsv""" + + """/vendor=(?[a-zA-Z0-9-_.]+)""" + + """/name=(?[a-zA-Z0-9-_]+)""" + + """/format=(?[a-zA-Z0-9-_]+)""" + + """/version=(?[1-9][0-9]*)$""").r + + /** Version of legacy Shred job, where old path pattern was used `com.acme/event/jsonschema/1-0-0` */ val ShredJobBeforeSparkVersion = Semver(0,11,0) + /** Version of legacy Shred job, where TSV output was not possible */ + val ShredJobBeforeTabularVersion = Semver(0,15,0) // TODO: is it + /** * vendor + name + format + version + filename */ @@ -117,7 +141,7 @@ object ShreddedType { case Some(Some(jsonPath)) => Free.pure(jsonPath.asRight) case Some(None) => - Free.pure(JsonpathDiscoveryFailure(key).asLeft) + Free.pure(DiscoveryFailure.JsonpathDiscoveryFailure(key).asLeft) case None => jsonpathAssets match { case Some(assets) => @@ -160,13 +184,9 @@ object ShreddedType { val s3Key = S3.Key.coerce(fullDir + key) LoaderA.keyExists(s3Key).flatMap { case true => - for { - _ <- LoaderA.putCache(key, Some(s3Key)) - } yield s3Key.asRight + LoaderA.putCache(key, Some(s3Key)).as(s3Key.asRight) case false => - for { - _ <- LoaderA.putCache(key, None) - } yield JsonpathDiscoveryFailure(key).asLeft + LoaderA.putCache(key, None).as(DiscoveryFailure.JsonpathDiscoveryFailure(key).asLeft) } } @@ -177,7 +197,7 @@ object ShreddedType { // Discover data for single item def discover(info: ShreddedType.Info): Action[ValidatedNel[DiscoveryFailure, ShreddedType]] = { val jsonpaths = ShreddedType.discoverJsonPath(region, jsonpathAssets, info) - val shreddedType = jsonpaths.map(_.map(s3key => ShreddedType(info, s3key))) + val shreddedType = jsonpaths.map(_.map(s3key => ShreddedType.Json(info, s3key))) shreddedType.map(_.toValidatedNel) } @@ -203,39 +223,55 @@ object ShreddedType { * * @param key valid S3 key * @param shredJob version of shred job to decide what path format should be present - * @return either discovery failure + * @return either discovery failure or info (which in turn can be tabular (true) or JSON (false)) */ - def transformPath(key: S3.Key, shredJob: Semver): Either[DiscoveryFailure, Info] = { + def transformPath(key: S3.Key, shredJob: Semver): Either[DiscoveryFailure, (Boolean, Info)] = { val (bucket, path) = S3.splitS3Key(key) val (subpath, shredpath) = splitFilpath(path, shredJob) extractSchemaKey(shredpath, shredJob) match { - case Some(SchemaKey(vendor, name, _, SchemaVer.Full(model, _, _))) => + case Some(Extracted.Legacy(SchemaKey(vendor, name, _, SchemaVer.Full(model, _, _)))) => + val prefix = S3.Folder.coerce("s3://" + bucket + "/" + subpath) + val result = Info(prefix, vendor, name, model, shredJob) + (false, result).asRight + case Some(Extracted.Tabular(vendor, name, _, model)) => val prefix = S3.Folder.coerce("s3://" + bucket + "/" + subpath) val result = Info(prefix, vendor, name, model, shredJob) - result.asRight - 
case _ => - ShreddedTypeKeyFailure(key).asLeft + (true, result).asRight + case None => + DiscoveryFailure.ShreddedTypeKeyFailure(key).asLeft } } + sealed trait Extracted + object Extracted { + case class Legacy(key: SchemaKey) extends Extracted + case class Tabular(vendor: String, name: String, format: String, model: Int) extends Extracted + } + /** * Extract `SchemaKey` from subpath, which can be * legacy-style (pre-0.12.0) com.acme/schema-name/jsonschema/1-0-0 or * modern-style (post-0.12.0) vendor=com.acme/name=schema-name/format=jsonschema/version=1-0-0 + * tsv-style (port-0.16.0) vendor=com.acme/name=schema-name/format=jsonschema/version=1 * This function transforms any of above valid paths to `SchemaKey` * * @param subpath S3 subpath of four `SchemaKey` elements * @param shredJob shred job version to decide what format should be present * @return valid schema key if found */ - def extractSchemaKey(subpath: String, shredJob: Semver): Option[SchemaKey] = { + def extractSchemaKey(subpath: String, shredJob: Semver): Option[Extracted] = { if (shredJob <= ShredJobBeforeSparkVersion) { val uri = "iglu:" + subpath - SchemaKey.fromUri(uri).toOption + SchemaKey.fromUri(uri).toOption.map(Extracted.Legacy) } else subpath match { case ShreddedSubpathPattern(vendor, name, format, version) => val uri = s"iglu:$vendor/$name/$format/$version" - SchemaKey.fromUri(uri).toOption + SchemaKey.fromUri(uri).toOption.map(Extracted.Legacy) + case ShreddedSubpathPatternTabular(vendor, name, format, model) if shredJob >= ShredJobBeforeTabularVersion => + scala.util.Try(model.toInt).toOption match { + case Some(m) => Extracted.Tabular(vendor, name, format, m).some + case None => None + } case _ => None } } diff --git a/src/main/scala/com/snowplowanalytics/snowplow/rdbloader/interpreters/DryRunInterpreter.scala b/src/main/scala/com/snowplowanalytics/snowplow/rdbloader/interpreters/DryRunInterpreter.scala index 5fba5b313..20b930a5c 100644 --- a/src/main/scala/com/snowplowanalytics/snowplow/rdbloader/interpreters/DryRunInterpreter.scala +++ b/src/main/scala/com/snowplowanalytics/snowplow/rdbloader/interpreters/DryRunInterpreter.scala @@ -39,9 +39,10 @@ import LoaderA._ import LoaderError.LoaderLocalError import Interpreter.runIO import loaders.Common.SqlString -import discovery.ManifestDiscovery +import discovery.{ ManifestDiscovery, DiscoveryFailure } import implementations.{S3Interpreter, TrackerInterpreter, ManifestInterpreter} import implementations.ManifestInterpreter.ManifestE +import com.snowplowanalytics.snowplow.rdbloader.common.Flattening.getOrdered /** @@ -50,11 +51,10 @@ import implementations.ManifestInterpreter.ManifestE * It contains and handles configuration, connections and mutable state, * all real-world interactions, except argument parsing */ -class DryRunInterpreter private[interpreters]( - cliConfig: CliConfig, - amazonS3: AmazonS3, - tracker: Option[Tracker[Id]], - resolver: Client[Id, Json]) extends Interpreter { +class DryRunInterpreter private[interpreters](cliConfig: CliConfig, + amazonS3: AmazonS3, + tracker: Option[Tracker[Id]], + igluClient: Client[Id, Json]) extends Interpreter { private val logQueries = ListBuffer.empty[SqlString] private val logCopyFiles = ListBuffer.empty[Path] @@ -167,6 +167,11 @@ class DryRunInterpreter private[interpreters]( logMessages.append(s"Fetched imaginary EC2 [$name] property") Right(name + " key") + case GetSchemas(vendor, name, model) => + getOrdered(igluClient.resolver, vendor, name, model).leftMap { resolutionError => + val message = s"Cannot get 
schemas for iglu:$vendor/$name/jsonschema/$model-*-*\n$resolutionError" + LoaderError.DiscoveryError(DiscoveryFailure.IgluError(message)) + }.value } } } diff --git a/src/main/scala/com/snowplowanalytics/snowplow/rdbloader/interpreters/RealWorldInterpreter.scala b/src/main/scala/com/snowplowanalytics/snowplow/rdbloader/interpreters/RealWorldInterpreter.scala index 47b4612a6..14c67472d 100644 --- a/src/main/scala/com/snowplowanalytics/snowplow/rdbloader/interpreters/RealWorldInterpreter.scala +++ b/src/main/scala/com/snowplowanalytics/snowplow/rdbloader/interpreters/RealWorldInterpreter.scala @@ -27,10 +27,10 @@ import io.circe.Json import com.amazonaws.services.s3.AmazonS3 -import com.snowplowanalytics.iglu.client.Client - import org.joda.time.DateTime +import com.snowplowanalytics.iglu.client.Client + import com.snowplowanalytics.snowplow.scalatracker.Tracker import com.snowplowanalytics.manifest.core.ManifestError @@ -40,11 +40,12 @@ import LoaderA._ import LoaderError.LoaderLocalError import Interpreter.runIO import config.CliConfig -import discovery.ManifestDiscovery +import discovery.{ ManifestDiscovery, DiscoveryFailure } import utils.Common import implementations._ import com.snowplowanalytics.snowplow.rdbloader.{ Log => ExitLog } import com.snowplowanalytics.snowplow.rdbloader.loaders.Common.SqlString +import com.snowplowanalytics.snowplow.rdbloader.common.Flattening.getOrdered /** * Interpreter performs all actual side-effecting work, @@ -52,11 +53,10 @@ import com.snowplowanalytics.snowplow.rdbloader.loaders.Common.SqlString * It contains and handles configuration, connections and mutable state, * all real-world interactions, except argument parsing */ -class RealWorldInterpreter private[interpreters]( - cliConfig: CliConfig, - amazonS3: AmazonS3, - tracker: Option[Tracker[Id]], - resolver: Client[Id, Json]) extends Interpreter { +class RealWorldInterpreter private[interpreters](cliConfig: CliConfig, + amazonS3: AmazonS3, + tracker: Option[Tracker[Id]], + igluClient: Client[Id, Json]) extends Interpreter { private val interpreter = this @@ -76,7 +76,7 @@ class RealWorldInterpreter private[interpreters]( dbConnection = JdbcInterpreter.getConnection(cliConfig.target) } if (force) { - println("Forcing reconnection to DB") + System.out.println("Forcing reconnection to DB") dbConnection = JdbcInterpreter.getConnection(cliConfig.target) } dbConnection @@ -96,11 +96,14 @@ class RealWorldInterpreter private[interpreters]( private val messagesCopy = collection.mutable.ListBuffer.empty[String] def executeWithRetry[A](action: Connection => SqlString => Either[LoaderError.StorageTargetError, A])(sql: SqlString) = { - val firstAttempt = for { conn <- getConnection(); r <- action(conn)(sql) } yield r + val firstAttempt = for { + conn <- getConnection() + _ <- JdbcInterpreter.setAutocommit(conn, false) + r <- action(conn)(sql) + } yield r firstAttempt match { case Left(LoaderError.StorageTargetError(message)) if message.contains("Connection refused") => - println(message) - println("Sleeping and making another try") + System.out.println("Sleeping and making another try") Thread.sleep(10000) for { conn <- getConnection(true) @@ -223,6 +226,12 @@ class RealWorldInterpreter private[interpreters]( case GetEc2Property(name) => SshInterpreter.getKey(name) + + case GetSchemas(vendor, name, model) => + getOrdered(igluClient.resolver, vendor, name, model).leftMap { resolutionError => + val message = s"Cannot get schemas for iglu:$vendor/$name/jsonschema/$model-*-*\n$resolutionError" + 
LoaderError.DiscoveryError(DiscoveryFailure.IgluError(message)) + }.value } } } diff --git a/src/main/scala/com/snowplowanalytics/snowplow/rdbloader/interpreters/implementations/JdbcInterpreter.scala b/src/main/scala/com/snowplowanalytics/snowplow/rdbloader/interpreters/implementations/JdbcInterpreter.scala index b62d39437..8c6478cd1 100644 --- a/src/main/scala/com/snowplowanalytics/snowplow/rdbloader/interpreters/implementations/JdbcInterpreter.scala +++ b/src/main/scala/com/snowplowanalytics/snowplow/rdbloader/interpreters/implementations/JdbcInterpreter.scala @@ -59,9 +59,7 @@ object JdbcInterpreter { val resultSet = conn.createStatement().executeQuery(sql) ev.decode(resultSet) match { case Left(e) => StorageTargetError(s"Cannot decode SQL row: ${e.message}").asLeft - case Right(a) => - println(a) - a.asRight[StorageTargetError] + case Right(a) => a.asRight[StorageTargetError] } } catch { case NonFatal(e) => @@ -110,14 +108,13 @@ object JdbcInterpreter { val password = target.password match { case StorageTarget.PlainText(text) => text case StorageTarget.EncryptedKey(StorageTarget.EncryptedConfig(key)) => - SshInterpreter.getKey(key.parameterName).getOrElse(throw new RuntimeException("Cannot retrieve JDBC password from EC2 Parameter Store")) + SshInterpreter.getKey(key.parameterName).valueOr(error => throw new RuntimeException(s"Cannot retrieve JDBC password from EC2 Parameter Store. ${error.show}")) } val props = new Properties() props.setProperty("user", target.username) props.setProperty("password", password) - target match { case r: StorageTarget.RedshiftConfig => def connect() = diff --git a/src/main/scala/com/snowplowanalytics/snowplow/rdbloader/interpreters/implementations/S3Interpreter.scala b/src/main/scala/com/snowplowanalytics/snowplow/rdbloader/interpreters/implementations/S3Interpreter.scala index 6a65b2aa4..f2dac4eea 100644 --- a/src/main/scala/com/snowplowanalytics/snowplow/rdbloader/interpreters/implementations/S3Interpreter.scala +++ b/src/main/scala/com/snowplowanalytics/snowplow/rdbloader/interpreters/implementations/S3Interpreter.scala @@ -29,7 +29,8 @@ import scala.collection.convert.wrapAsScala._ import scala.util.control.NonFatal // This project -import com.snowplowanalytics.snowplow.rdbloader.LoaderError.{DiscoveryError, DownloadFailure, S3Failure} +import com.snowplowanalytics.snowplow.rdbloader.LoaderError.DiscoveryError +import com.snowplowanalytics.snowplow.rdbloader.discovery.DiscoveryFailure.{S3Failure, DownloadFailure} /** diff --git a/src/main/scala/com/snowplowanalytics/snowplow/rdbloader/loaders/RedshiftLoadStatements.scala b/src/main/scala/com/snowplowanalytics/snowplow/rdbloader/loaders/RedshiftLoadStatements.scala index 799525023..1dc4e84d7 100644 --- a/src/main/scala/com/snowplowanalytics/snowplow/rdbloader/loaders/RedshiftLoadStatements.scala +++ b/src/main/scala/com/snowplowanalytics/snowplow/rdbloader/loaders/RedshiftLoadStatements.scala @@ -197,20 +197,29 @@ object RedshiftLoadStatements { * Build COPY FROM JSON SQL-statement for shredded types * * @param config main Snowplow configuration - * @param s3path S3 path to folder with shredded JSON files - * @param jsonPathsFile S3 path to JSONPath file * @param tableName valid Redshift table name for shredded type * @return valid SQL statement to LOAD */ - def buildCopyShreddedStatement(config: SnowplowConfig, s3path: String, jsonPathsFile: String, tableName: String, maxError: Int, roleArn: String): SqlString = { + def buildCopyShreddedStatement(config: SnowplowConfig, shreddedType: ShreddedType, 
tableName: String, maxError: Int, roleArn: String): SqlString = { val compressionFormat = getCompressionFormat(config.enrich.outputCompression) - SqlString.unsafeCoerce( - s"""COPY $tableName FROM '$s3path' - | CREDENTIALS 'aws_iam_role=$roleArn' JSON AS '$jsonPathsFile' - | REGION AS '${config.aws.s3.region}' - | MAXERROR $maxError TRUNCATECOLUMNS TIMEFORMAT 'auto' - | ACCEPTINVCHARS $compressionFormat""".stripMargin) + shreddedType match { + case ShreddedType.Json(_, jsonPathsFile) => + SqlString.unsafeCoerce( + s"""COPY $tableName FROM '${shreddedType.getLoadPath}' + | CREDENTIALS 'aws_iam_role=$roleArn' JSON AS '$jsonPathsFile' + | REGION AS '${config.aws.s3.region}' + | MAXERROR $maxError TRUNCATECOLUMNS TIMEFORMAT 'auto' + | ACCEPTINVCHARS $compressionFormat""".stripMargin) + case ShreddedType.Tabular(_) => + SqlString.unsafeCoerce( + s"""COPY $tableName FROM '${shreddedType.getLoadPath}' + | CREDENTIALS 'aws_iam_role=$roleArn' + | REGION AS '${config.aws.s3.region}' + | DELIMITER '$EventFieldSeparator' + | MAXERROR $maxError TRUNCATECOLUMNS TIMEFORMAT 'auto' + | ACCEPTINVCHARS $compressionFormat""".stripMargin) + } } /** @@ -269,7 +278,7 @@ object RedshiftLoadStatements { */ private def transformShreddedType(config: SnowplowConfig, target: RedshiftConfig, shreddedType: ShreddedType): ShreddedStatements = { val tableName = target.shreddedTable(ShreddedType.getTableName(shreddedType)) - val copyFromJson = buildCopyShreddedStatement(config, shreddedType.getLoadPath, shreddedType.jsonPaths, tableName, target.maxError, target.roleArn) + val copyFromJson = buildCopyShreddedStatement(config, shreddedType, tableName, target.maxError, target.roleArn) val analyze = buildAnalyzeStatement(tableName) val vacuum = buildVacuumStatement(tableName) ShreddedStatements(copyFromJson, analyze, vacuum) diff --git a/src/main/scala/com/snowplowanalytics/snowplow/rdbloader/package.scala b/src/main/scala/com/snowplowanalytics/snowplow/rdbloader/package.scala index b86a6e648..d3086ac9e 100644 --- a/src/main/scala/com/snowplowanalytics/snowplow/rdbloader/package.scala +++ b/src/main/scala/com/snowplowanalytics/snowplow/rdbloader/package.scala @@ -19,7 +19,7 @@ import cats.implicits._ import cats.effect.Clock -import rdbloader.LoaderError.DiscoveryFailure +import rdbloader.discovery.DiscoveryFailure import rdbloader.interpreters.implementations.ManifestInterpreter.ManifestE import scala.concurrent.duration.{ TimeUnit, MILLISECONDS, NANOSECONDS } diff --git a/src/main/scala/com/snowplowanalytics/snowplow/rdbloader/utils/Common.scala b/src/main/scala/com/snowplowanalytics/snowplow/rdbloader/utils/Common.scala index 2d95798b4..50ad571a2 100644 --- a/src/main/scala/com/snowplowanalytics/snowplow/rdbloader/utils/Common.scala +++ b/src/main/scala/com/snowplowanalytics/snowplow/rdbloader/utils/Common.scala @@ -29,6 +29,7 @@ import com.snowplowanalytics.iglu.client.validator.CirceValidator // This project import LoaderError._ +import discovery.DiscoveryFailure import config.CliConfig import interpreters.implementations.ManifestInterpreter.ManifestE @@ -61,7 +62,7 @@ object Common { case Right(_) => Log.LoadingSucceeded case Left(e @ DiscoveryError(failures)) => val manifestError = failures.collect { - case e: NoDataFailure if config.target.processingManifest.nonEmpty => e.getManifestMessage + case e: DiscoveryFailure.NoDataFailure if config.target.processingManifest.nonEmpty => e.getManifestMessage } Log.LoadingFailed((e: LoaderError).show ++ s"\n${manifestError.mkString("\n")}") case Left(error) => diff --git 
a/src/test/scala/com/snowplowanalytics/snowplow/rdbloader/TestInterpreter.scala b/src/test/scala/com/snowplowanalytics/snowplow/rdbloader/TestInterpreter.scala new file mode 100644 index 000000000..a4bed8840 --- /dev/null +++ b/src/test/scala/com/snowplowanalytics/snowplow/rdbloader/TestInterpreter.scala @@ -0,0 +1,80 @@ +/* + * Copyright (c) 2014-2019 Snowplow Analytics Ltd. All rights reserved. + * + * This program is licensed to you under the Apache License Version 2.0, + * and you may not use this file except in compliance with the Apache License Version 2.0. + * You may obtain a copy of the Apache License Version 2.0 at http://www.apache.org/licenses/LICENSE-2.0. + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the Apache License Version 2.0 is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the Apache License Version 2.0 for the specific language governing permissions and limitations there under. + */ +package com.snowplowanalytics.snowplow.rdbloader + +import cats.data.{EitherT, State} +import cats.implicits._ + +import io.circe.literal._ + +import com.snowplowanalytics.iglu.core._ +import com.snowplowanalytics.iglu.schemaddl.IgluSchema +import com.snowplowanalytics.iglu.schemaddl.jsonschema.Schema +import com.snowplowanalytics.iglu.schemaddl.jsonschema.circe.implicits._ +import com.snowplowanalytics.iglu.schemaddl.migrations.{ SchemaList => DSchemaList } + +import com.snowplowanalytics.snowplow.rdbloader.db.Decoder +import com.snowplowanalytics.snowplow.rdbloader.Security.Tunnel +import com.snowplowanalytics.snowplow.rdbloader.db.Entities.{Columns, TableState} + + +object TestInterpreter { + + type Test[A] = State[List[String], A] + + def fetch(key: SchemaKey): EitherT[Test, String, IgluSchema] = { + val state = State[List[String], IgluSchema] { log => + val result = Schema.parse(json"""{}""").getOrElse(throw new RuntimeException("Not a valid JSON schema")) + val schema = SelfDescribingSchema(SchemaMap(key), result) + (s"Fetch ${key.toSchemaUri}" :: log, schema) + } + EitherT.liftF(state) + } + + def executeUpdate(query: String): Test[Either[LoaderError, Long]] = + State { log => (trim(query) :: log, 1L.asRight) } + + def print(message: String): Test[Unit] = + State { log => (message :: log, ()) } + + def executeQuery[A](query: String, decoder: Decoder[A]): Test[Either[LoaderError, A]] = { + val result = decoder.name match { + case "TableState" => TableState(SchemaKey("com.acme", "some_context", "jsonschema", SchemaVer.Full(2,0,0))) + case "Boolean" => false + case "Columns" => Columns(List("some_column")) + } + State { log => (trim(query) :: log, Right(result.asInstanceOf[A])) } + } + + def getEc2Property(name: String): Test[Either[LoaderError, String]] = + State { log => + val value = "EC2 PROPERTY " ++ name ++ " key" + (value :: log, Right(value)) + } + + def establishTunnel(tunnelConfig: Tunnel): Test[Either[LoaderError, Unit]] = + State { log => ("SSH TUNNEL ESTABLISH" :: log, Right(())) } + + def getSchemas(vendor: String, name: String, model: Int): Test[Either[LoaderError, DSchemaList]] = + SchemaList + .parseStrings(List(s"iglu:$vendor/$name/jsonschema/$model-0-0")) + .map { x => DSchemaList.fromSchemaList(x, TestInterpreter.fetch).value } + .sequence[Test, Either[String, DSchemaList]] + .map { e => e.flatten.leftMap { x => LoaderError.LoaderLocalError(x)} } + + + private def trim(s: String): String = + s.trim.replaceAll("\\s+", " ").replace("\n", " ") + +} 
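A note on how the new TestInterpreter above is meant to be used: every stubbed effect is backed by a cats State with a List[String] log, recording a description of what it would have done, so a spec can run a whole LoaderA program purely and compare the accumulated log against the expected sequence of statements. Below is a minimal sketch of that pattern, outside the changeset; the program value and the table name are invented for illustration only.

    import cats.implicits._
    import com.snowplowanalytics.snowplow.rdbloader.TestInterpreter
    import com.snowplowanalytics.snowplow.rdbloader.TestInterpreter.Test

    // A tiny hypothetical "program" written directly against the Test stubs
    val program: Test[Unit] = for {
      _ <- TestInterpreter.print("Creating public.com_acme_some_context_2 table")
      _ <- TestInterpreter.executeUpdate("CREATE TABLE IF NOT EXISTS public.com_acme_some_context_2 (...)")
    } yield ()

    // Run with an empty log; State returns (finalLog, result)
    val (log, _) = program.run(Nil).value

    // Effects are prepended to the log as they run, so reverse it to get execution order
    assert(log.reverse == List(
      "Creating public.com_acme_some_context_2 table",
      "CREATE TABLE IF NOT EXISTS public.com_acme_some_context_2 (...)"))

Entries are prepended as the program runs, which is why MigrationSpec below asserts against state.reverse rather than the raw state.
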
+ diff --git a/src/test/scala/com/snowplowanalytics/snowplow/rdbloader/db/MigrationSpec.scala b/src/test/scala/com/snowplowanalytics/snowplow/rdbloader/db/MigrationSpec.scala new file mode 100644 index 000000000..9e3d5ff16 --- /dev/null +++ b/src/test/scala/com/snowplowanalytics/snowplow/rdbloader/db/MigrationSpec.scala @@ -0,0 +1,89 @@ +/* + * Copyright (c) 2014-2019 Snowplow Analytics Ltd. All rights reserved. + * + * This program is licensed to you under the Apache License Version 2.0, + * and you may not use this file except in compliance with the Apache License Version 2.0. + * You may obtain a copy of the Apache License Version 2.0 at http://www.apache.org/licenses/LICENSE-2.0. + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the Apache License Version 2.0 is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the Apache License Version 2.0 for the specific language governing permissions and limitations there under. + */ +package com.snowplowanalytics.snowplow.rdbloader.db + +import cats.~> + +import com.snowplowanalytics.snowplow.rdbloader.{LoaderA, S3} +import com.snowplowanalytics.snowplow.rdbloader.config.Semver +import com.snowplowanalytics.snowplow.rdbloader.discovery.{DataDiscovery, ShreddedType} + +import org.specs2.Specification + +import com.snowplowanalytics.snowplow.rdbloader.TestInterpreter + +class MigrationSpec extends Specification { def is = s2""" + Perform migration only for ShreddedType.Tabular $e1 + """ + + def e1 = { + val types = + List( + ShreddedType.Tabular(ShreddedType.Info( + S3.Folder.coerce("s3://shredded/archive"), + "com.acme", + "some_context", + 2, + Semver(0, 17, 0) + )), + ShreddedType.Json(ShreddedType.Info( + S3.Folder.coerce("s3://shredded/archive"), + "com.acme", + "some_event", + 1, + Semver(0, 17, 0) + ), S3.Key.coerce("s3://shredded/jsonpaths")) + ) + val input = List(DataDiscovery(S3.Folder.coerce("s3://shredded/archive"), None, None, types, true, None)) + + val expected = List( + "Fetch iglu:com.acme/some_context/jsonschema/2-0-0", + "SELECT EXISTS ( SELECT 1 FROM pg_tables WHERE schemaname = 'public' AND tablename = 'com_acme_some_context_2') AS exists;", + "Creating public.com_acme_some_context_2 table for iglu:com.acme/some_context/jsonschema/2-0-0", + "CREATE TABLE IF NOT EXISTS public.com_acme_some_context_2 ( \"schema_vendor\" VARCHAR(128) ENCODE ZSTD NOT NULL, \"schema_name\" VARCHAR(128) ENCODE ZSTD NOT NULL, \"schema_format\" VARCHAR(128) ENCODE ZSTD NOT NULL, \"schema_version\" VARCHAR(128) ENCODE ZSTD NOT NULL, \"root_id\" CHAR(36) ENCODE RAW NOT NULL, \"root_tstamp\" TIMESTAMP ENCODE ZSTD NOT NULL, \"ref_root\" VARCHAR(255) ENCODE ZSTD NOT NULL, \"ref_tree\" VARCHAR(1500) ENCODE ZSTD NOT NULL, \"ref_parent\" VARCHAR(255) ENCODE ZSTD NOT NULL, FOREIGN KEY (root_id) REFERENCES public.events(event_id) ) DISTSTYLE KEY DISTKEY (root_id) SORTKEY (root_tstamp)", + "COMMENT ON TABLE public.com_acme_some_context_2 IS 'iglu:com.acme/some_context/jsonschema/2-0-0'", + "Table created" + ) + + val action = Migration.perform("public")(input) + val (state, result) = action.value.foldMap(MigrationSpec.interpreter).run(Nil).value + (state.reverse must beEqualTo(expected)).and(result must beRight) + } +} + +object MigrationSpec { + + import TestInterpreter.Test + + def interpreter: LoaderA ~> Test = new (LoaderA ~> Test) { + def apply[A](effect: LoaderA[A]): Test[A] = { + effect match { + case LoaderA.Print(message) => + 
TestInterpreter.print(message) + + case LoaderA.ExecuteUpdate(query) => + TestInterpreter.executeUpdate(query) + + case LoaderA.ExecuteQuery(query, decoder) => + TestInterpreter.executeQuery(query, decoder) + + case LoaderA.GetSchemas(vendor, name, model) => + TestInterpreter.getSchemas(vendor, name, model) + + case action => + throw new RuntimeException(s"Unexpected Action [$action]") + } + } + } + +} \ No newline at end of file diff --git a/src/test/scala/com/snowplowanalytics/snowplow/rdbloader/discovery/DataDiscoverySpec.scala b/src/test/scala/com/snowplowanalytics/snowplow/rdbloader/discovery/DataDiscoverySpec.scala index 93404c36a..f810be40d 100644 --- a/src/test/scala/com/snowplowanalytics/snowplow/rdbloader/discovery/DataDiscoverySpec.scala +++ b/src/test/scala/com/snowplowanalytics/snowplow/rdbloader/discovery/DataDiscoverySpec.scala @@ -19,7 +19,7 @@ import cats.syntax.either._ import org.specs2.Specification -import LoaderError.{DiscoveryError, NoDataFailure} +import LoaderError.DiscoveryError import config.Semver import discovery.ShreddedType._ import S3.Folder.{coerce => dir} @@ -87,10 +87,10 @@ class DataDiscoverySpec extends Specification { def is = s2""" Some(2L), Some(2L), List( - ShreddedType( + ShreddedType.Json( Info(dir("s3://runfolder-test/shredded/good/run=2017-05-22-12-20-57/"),"com.mailchimp","email_address_change",2,Semver(0,11,0,None)), s3key("s3://snowplow-hosted-assets-us-east-1/4-storage/redshift-storage/jsonpaths/com.mailchimp/email_address_change_2.json")), - ShreddedType( + ShreddedType.Json( Info(dir("s3://runfolder-test/shredded/good/run=2017-05-22-12-20-57/"),"com.mailchimp","email_address_change",1,Semver(0,11,0,None)), s3key("s3://snowplow-hosted-assets-us-east-1/4-storage/redshift-storage/jsonpaths/com.mailchimp/email_address_change_1.json")) ), @@ -103,7 +103,7 @@ class DataDiscoverySpec extends Specification { def is = s2""" Some(2L), Some(2L), List( - ShreddedType( + ShreddedType.Json( Info(dir("s3://runfolder-test/shredded/good/run=2017-05-22-16-00-57/"), "com.snowplowanalytics.snowplow","add_to_cart",1,Semver(0,11,0,None)), s3key("s3://snowplow-hosted-assets-us-east-1/4-storage/redshift-storage/jsonpaths/com.snowplowanalytics.snowplow/add_to_cart_1.json")) ), @@ -199,10 +199,10 @@ class DataDiscoverySpec extends Specification { def is = s2""" Some(2L), Some(2L), List( - ShreddedType( + ShreddedType.Json( Info(dir("s3://runfolder-test/shredded/good/run=2017-05-22-12-20-57/"),"com.mailchimp","email_address_change",2,Semver(0,11,0,None)), s3key("s3://snowplow-hosted-assets-us-east-1/4-storage/redshift-storage/jsonpaths/com.mailchimp/email_address_change_2.json")), - ShreddedType( + ShreddedType.Json( Info(dir("s3://runfolder-test/shredded/good/run=2017-05-22-12-20-57/"),"com.mailchimp","email_address_change",1,Semver(0,11,0,None)), s3key("s3://snowplow-hosted-assets-us-east-1/4-storage/redshift-storage/jsonpaths/com.mailchimp/email_address_change_1.json")) ), @@ -215,13 +215,13 @@ class DataDiscoverySpec extends Specification { def is = s2""" Some(2L), Some(2L), List( - ShreddedType( + ShreddedType.Json( Info(dir("s3://runfolder-test/shredded/good/run=2017-05-22-16-00-57/"), "com.snowplowanalytics.snowplow","add_to_cart",1,Semver(0,11,0,None)), s3key("s3://snowplow-hosted-assets-us-east-1/4-storage/redshift-storage/jsonpaths/com.snowplowanalytics.snowplow/add_to_cart_1.json")), - ShreddedType( + ShreddedType.Json( Info(dir("s3://runfolder-test/shredded/good/run=2017-05-22-16-00-57/"), 
"com.snowplowanalytics.snowplow","geolocation",1,Semver(0,11,0,None)), s3key("s3://snowplow-hosted-assets-us-east-1/4-storage/redshift-storage/jsonpaths/com.snowplowanalytics.snowplow/geolocation_1.json")), - ShreddedType( + ShreddedType.Json( Info(dir("s3://runfolder-test/shredded/good/run=2017-05-22-16-00-57/"), "com.snowplowanalytics.snowplow","custom_context",1,Semver(0,11,0,None)), s3key("s3://snowplow-hosted-assets-us-east-1/4-storage/redshift-storage/jsonpaths/com.snowplowanalytics.snowplow/custom_context_1.json")) ), @@ -254,7 +254,7 @@ class DataDiscoverySpec extends Specification { def is = s2""" val shreddedGood = S3.Folder.coerce("s3://runfolder-test/shredded/good/run=2017-08-21-19-18-20") - val expected = DiscoveryError(List(NoDataFailure(shreddedGood))) + val expected = DiscoveryError(List(DiscoveryFailure.NoDataFailure(shreddedGood))) val discoveryTarget = DataDiscovery.InSpecificFolder(shreddedGood) val result = DataDiscovery.discoverFull(discoveryTarget, "test", Semver(0,11,0), "us-east-1", None) @@ -328,10 +328,10 @@ class DataDiscoverySpec extends Specification { def is = s2""" Some(2L), Some(2L), List( - ShreddedType( + ShreddedType.Json( Info(dir("s3://runfolder-test/shredded/good/run=2017-05-22-12-20-57/"),"com.mailchimp","email_address_change",2,Semver(0,11,0,None)), s3key("s3://snowplow-hosted-assets-us-east-1/4-storage/redshift-storage/jsonpaths/com.mailchimp/email_address_change_2.json")), - ShreddedType( + ShreddedType.Json( Info(dir("s3://runfolder-test/shredded/good/run=2017-05-22-12-20-57/"),"com.mailchimp","email_address_change",1,Semver(0,11,0,None)), s3key("s3://snowplow-hosted-assets-us-east-1/4-storage/redshift-storage/jsonpaths/com.mailchimp/email_address_change_1.json")) ), @@ -388,10 +388,10 @@ class DataDiscoverySpec extends Specification { def is = s2""" Some(2L), Some(2L), List( - ShreddedType( + ShreddedType.Json( Info(dir("s3://runfolder-test/shredded/good/run=2017-05-22-12-20-57/"),"com.mailchimp","email_address_change",2,Semver(0,11,0,None)), s3key("s3://snowplow-hosted-assets-us-east-1/4-storage/redshift-storage/jsonpaths/com.mailchimp/email_address_change_2.json")), - ShreddedType( + ShreddedType.Json( Info(dir("s3://runfolder-test/shredded/good/run=2017-05-22-12-20-57/"),"com.mailchimp","email_address_change",1,Semver(0,11,0,None)), s3key("s3://snowplow-hosted-assets-us-east-1/4-storage/redshift-storage/jsonpaths/com.mailchimp/email_address_change_1.json")) ), @@ -455,10 +455,10 @@ class DataDiscoverySpec extends Specification { def is = s2""" Some(2L), Some(2L), List( - ShreddedType( + ShreddedType.Json( Info(dir("s3://runfolder-test/shredded/good/run=2017-05-22-12-20-57/"),"com.mailchimp","email_address_change",2,Semver(0,11,0,None)), s3key("s3://snowplow-hosted-assets-us-east-1/4-storage/redshift-storage/jsonpaths/com.mailchimp/email_address_change_2.json")), - ShreddedType( + ShreddedType.Json( Info(dir("s3://runfolder-test/shredded/good/run=2017-05-22-12-20-57/"),"com.mailchimp","email_address_change",1,Semver(0,11,0,None)), s3key("s3://snowplow-hosted-assets-us-east-1/4-storage/redshift-storage/jsonpaths/com.mailchimp/email_address_change_1.json")) ), @@ -508,8 +508,8 @@ class DataDiscoverySpec extends Specification { def is = s2""" def e9 = { val shreddedTypes = List( - ShreddedType(ShreddedType.Info(S3.Folder.coerce("s3://my-bucket/my-path"), "com.acme", "event", 2, Semver(1,5,0)), S3.Key.coerce("s3://assets/event_1.json")), - ShreddedType(ShreddedType.Info(S3.Folder.coerce("s3://my-bucket/my-path"), "com.acme", "context", 2, 
Semver(1,5,0)), S3.Key.coerce("s3://assets/context_1.json")) + ShreddedType.Json(ShreddedType.Info(S3.Folder.coerce("s3://my-bucket/my-path"), "com.acme", "event", 2, Semver(1,5,0)), S3.Key.coerce("s3://assets/event_1.json")), + ShreddedType.Json(ShreddedType.Info(S3.Folder.coerce("s3://my-bucket/my-path"), "com.acme", "context", 2, Semver(1,5,0)), S3.Key.coerce("s3://assets/context_1.json")) ) val discovery = DataDiscovery(S3.Folder.coerce("s3://my-bucket/my-path"), Some(8), Some(1024), shreddedTypes, false, None) diff --git a/src/test/scala/com/snowplowanalytics/snowplow/rdbloader/discovery/ManifestDiscoverySpec.scala b/src/test/scala/com/snowplowanalytics/snowplow/rdbloader/discovery/ManifestDiscoverySpec.scala index 292c65e6e..a8595552f 100644 --- a/src/test/scala/com/snowplowanalytics/snowplow/rdbloader/discovery/ManifestDiscoverySpec.scala +++ b/src/test/scala/com/snowplowanalytics/snowplow/rdbloader/discovery/ManifestDiscoverySpec.scala @@ -28,7 +28,7 @@ import com.snowplowanalytics.manifest.core.ManifestError._ import com.snowplowanalytics.iglu.client.Resolver -import com.snowplowanalytics.snowplow.rdbloader.LoaderError.{DiscoveryError, ManifestFailure} +import com.snowplowanalytics.snowplow.rdbloader.LoaderError.DiscoveryError import com.snowplowanalytics.snowplow.rdbloader.config.Semver import org.specs2.Specification @@ -90,7 +90,7 @@ class ManifestDiscoverySpec extends Specification { def is = s2""" val action = ManifestDiscovery.discover("test-storage", "us-east-1", None) val result = action.value.foldMap(ManifestDiscoverySpec.interpreter(records)) result must beLeft.like { - case DiscoveryError(List(ManifestFailure(Corrupted(Corruption.ParseError(error))))) => + case DiscoveryError(List(DiscoveryFailure.ManifestFailure(Corrupted(Corruption.ParseError(error))))) => error must endingWith("Key [iglu:com.acme/event/jsonschema/0-0-1] is invalid Iglu URI, INVALID_SCHEMAVER, Path [invalidFolder] is not valid base for shredded type. 
Bucket name must start with s3:// prefix") } } @@ -124,7 +124,7 @@ class ManifestDiscoverySpec extends Specification { def is = s2""" val result = action.value.foldMap(ManifestDiscoverySpec.interpreter(records)) result must beRight(List( DataDiscovery(base2, None, None, List( - ShreddedType( + ShreddedType.Json( ShreddedType.Info(base2, "com.acme", "context", 1, Semver(0,13,0)), S3.Key.coerce("s3://jsonpaths-assets/com.acme/context_1.json") ) @@ -162,19 +162,19 @@ class ManifestDiscoverySpec extends Specification { def is = s2""" val expected = List( DataDiscovery(base1, None, None, List( - ShreddedType( + ShreddedType.Json( ShreddedType.Info(base1, "com.acme", "event", 1, Semver(0,13,0)), S3.Key.coerce("s3://jsonpaths-assets-other/com.acme/event_1.json") ) ), specificFolder = false, Some(item1)), DataDiscovery(base2, None, None, List( - ShreddedType( + ShreddedType.Json( ShreddedType.Info(base2, "com.acme", "event", 1, Semver(0,13,0)), S3.Key.coerce("s3://jsonpaths-assets-other/com.acme/event_1.json") ) ), specificFolder = false, Some(item2)), DataDiscovery(base3, None, None, List( - ShreddedType( + ShreddedType.Json( ShreddedType.Info(base3, "com.acme", "context", 1, Semver(0,13,0)), S3.Key.coerce("s3://jsonpaths-assets/com.acme/context_1.json") ) @@ -203,7 +203,7 @@ class ManifestDiscoverySpec extends Specification { def is = s2""" val action = ManifestDiscovery.discover("id", "us-east-1", None) val result = action.value.foldMap(ManifestDiscoverySpec.interpreter(records)) result must beLeft.like { - case DiscoveryError(List(ManifestFailure(Locked(_, None)))) => ok + case DiscoveryError(List(DiscoveryFailure.ManifestFailure(Locked(_, None)))) => ok case error => ko(s"Discovery failed not due Failed record: ${error.show}") } } diff --git a/src/test/scala/com/snowplowanalytics/snowplow/rdbloader/discovery/ShreddedTypeSpec.scala b/src/test/scala/com/snowplowanalytics/snowplow/rdbloader/discovery/ShreddedTypeSpec.scala index a12069124..99b8a3245 100644 --- a/src/test/scala/com/snowplowanalytics/snowplow/rdbloader/discovery/ShreddedTypeSpec.scala +++ b/src/test/scala/com/snowplowanalytics/snowplow/rdbloader/discovery/ShreddedTypeSpec.scala @@ -22,51 +22,6 @@ import org.specs2.{ScalaCheck, Specification} import com.snowplowanalytics.snowplow.rdbloader.config.Semver import com.snowplowanalytics.snowplow.rdbloader.discovery.ShreddedType._ -object ShreddedTypeSpec { - - /** - * `Gen` instance for a vendor/name-like string - */ - implicit val alphaNum: Gen[String] = for { - n <- Gen.chooseNum(1, 5) - d <- Gen.oneOf('_', '.', '-') - s <- Gen.listOf(Gen.alphaNumChar) - .map(_.mkString) - .suchThat(_.nonEmpty) - (a, b) = s.splitAt(n) - r <- Gen.const(s"$a$d$b") - .suchThat(x => !x.startsWith(d.toString)) - .suchThat(x => !x.endsWith(d.toString)) - } yield r - - implicit val subpath: Gen[String] = for { - s <- Gen.listOf(Gen.listOf(Gen.alphaNumChar).map(_.mkString).suchThat(!_.isEmpty)) - path = s.mkString("/") - } yield if (path.isEmpty) "" else path + "/" - - /** - * Elements for shredded path - */ - type ShreddedTypeElements = (String, String, String, String, Int, Int, Int) - - /** - * Generator of `ShreddedTypeElements` - * This generator doesn't guarantee that all elements are valid - * (such as `name` without dots), it allows to test parse failures - */ - val shreddedTypeElementsGen = for { - subpath <- subpath - vendor <- alphaNum - name <- alphaNum - format <- alphaNum - model <- Gen.chooseNum(0, 10) - revision <- Gen.chooseNum(0, 10) - addition <- Gen.chooseNum(0, 10) - } yield (subpath, 
vendor, name, format, model, revision, addition) - - -} - class ShreddedTypeSpec extends Specification with ScalaCheck { def is = s2""" Transform correct S3 path $e1 Fail to transform path without valid vendor $e2 @@ -76,6 +31,8 @@ class ShreddedTypeSpec extends Specification with ScalaCheck { def is = s2""" Transform correct S3 path without root folder $e6 Modern and legacy transformation always give same result $e7 Transform full modern shredded key $e8 + extractedSchemaKey parsed a path for tabular output $e9 + Transform correct tabular S3 path $e10 """ import ShreddedTypeSpec._ @@ -83,7 +40,7 @@ class ShreddedTypeSpec extends Specification with ScalaCheck { def is = s2""" def e1 = { val path = "cross-batch-test/shredded-archive/run=2017-04-27-14-39-42/com.snowplowanalytics.snowplow/submit_form/jsonschema/1-0-0/part-00000-00001" val expectedPrefix = S3.Folder.coerce("s3://rdb-test/cross-batch-test/shredded-archive/run=2017-04-27-14-39-42") - val expected = Info(expectedPrefix, "com.snowplowanalytics.snowplow", "submit_form", 1, Semver(0,10,0)) + val expected = (false, Info(expectedPrefix, "com.snowplowanalytics.snowplow", "submit_form", 1, Semver(0,10,0))) val key = S3.Key.coerce(s"s3://rdb-test/$path") val result = ShreddedType.transformPath(key, Semver(0,10,0)) @@ -108,7 +65,7 @@ class ShreddedTypeSpec extends Specification with ScalaCheck { def is = s2""" val path = "com.snowplowanalytics.snowplow/submit_form/jsonschema/1-0-0/part-00000-00001" val key = S3.Key.coerce(s"s3://rdb-test/$path") val result = ShreddedType.transformPath(key, Semver(0,10,0)) - val expected = Info(S3.Folder.coerce("s3://rdb-test"), "com.snowplowanalytics.snowplow", "submit_form", 1, Semver(0,10,0)) + val expected = (false, Info(S3.Folder.coerce("s3://rdb-test"), "com.snowplowanalytics.snowplow", "submit_form", 1, Semver(0,10,0))) result must beRight(expected) } @@ -116,7 +73,7 @@ class ShreddedTypeSpec extends Specification with ScalaCheck { def is = s2""" val path = "vendor=com.snowplowanalytics.snowplow/name=submit_form/format=jsonschema/version=1-0-0/part-00000-00001" val key = S3.Key.coerce(s"s3://rdb-test/shredded-types/$path") val result = ShreddedType.transformPath(key, Semver(0,13,0)) - val expected = Info(S3.Folder.coerce("s3://rdb-test"), "com.snowplowanalytics.snowplow", "submit_form", 1, Semver(0,13,0)) + val expected = (false, Info(S3.Folder.coerce("s3://rdb-test"), "com.snowplowanalytics.snowplow", "submit_form", 1, Semver(0,13,0))) result must beRight(expected) } @@ -125,7 +82,7 @@ class ShreddedTypeSpec extends Specification with ScalaCheck { def is = s2""" val key = S3.Key.coerce(s"s3://rdb-test/$path") val expectedPrefix = S3.Folder.coerce("s3://rdb-test/run%3D2017-04-27-14-39-42") - val expected = Info(expectedPrefix, "com.snowplowanalytics.snowplow", "submit_form", 1, Semver(0,11,0)) + val expected = (false, Info(expectedPrefix, "com.snowplowanalytics.snowplow", "submit_form", 1, Semver(0,11,0))) val result = ShreddedType.transformPath(key, Semver(0,11,0)) result must beRight(expected) @@ -141,8 +98,8 @@ class ShreddedTypeSpec extends Specification with ScalaCheck { def is = s2""" val eitherMatch = legacyResult.void.leftMap(_ => ()) must beEqualTo(modernResult.void.leftMap(_ => ())) val valueMatch = (legacyResult, modernResult) match { case (l: Right[_, _], m: Right[_, _]) => - val legacy = l.b.copy(shredJob = Semver(0,0,0)) // Erase Shred job versions - val modern = m.b.copy(shredJob = Semver(0,0,0)) + val legacy = l.b._2.copy(shredJob = Semver(0,0,0)) // Erase Shred job versions + val 
modern = m.b._2.copy(shredJob = Semver(0,0,0)) legacy must beEqualTo(modern) case (Left(_), Left(_)) => ok case _ => ko @@ -156,9 +113,71 @@ class ShreddedTypeSpec extends Specification with ScalaCheck { def is = s2""" val key = S3.Key.coerce("s3://snowplow-shredded/good/run=2017-06-14-12-07-11/shredded-types/vendor=com.snowplowanalytics.snowplow/name=submit_form/format=jsonschema/version=1-0-0/part-00000-00001") val expectedPrefix = S3.Folder.coerce("s3://snowplow-shredded/good/run=2017-06-14-12-07-11/") - val expected = Info(expectedPrefix, "com.snowplowanalytics.snowplow", "submit_form", 1, Semver(0,13,0)) + val expected = (false, Info(expectedPrefix, "com.snowplowanalytics.snowplow", "submit_form", 1, Semver(0,13,0))) val result = ShreddedType.transformPath(key, Semver(0,13,0)) result must beRight(expected) } + + def e9 = { + val input = "shredded-tsv/vendor=com.snowplow/name=event/format=jsonschema/version=1" + ShreddedType.extractSchemaKey(input, Semver(0,18,0)) must beSome(Extracted.Tabular("com.snowplow", "event", "jsonschema", 1)) + } + + def e10 = { + val key = S3.Key.coerce("s3://snowplow-shredded/good/run=2017-06-14-12-07-11/shredded-tsv/vendor=com.snowplowanalytics.snowplow/name=submit_form/format=jsonschema/version=1/part-00000-00001") + + val expectedPrefix = S3.Folder.coerce("s3://snowplow-shredded/good/run=2017-06-14-12-07-11/") + val expected = (true, Info(expectedPrefix, "com.snowplowanalytics.snowplow", "submit_form", 1, Semver(0,16,0))) + + val result = ShreddedType.transformPath(key, Semver(0,16,0)) + result must beRight(expected) + } + +} + +object ShreddedTypeSpec { + + /** + * `Gen` instance for a vendor/name-like string + */ + implicit val alphaNum: Gen[String] = for { + n <- Gen.chooseNum(1, 5) + d <- Gen.oneOf('_', '.', '-') + s <- Gen.listOf(Gen.alphaNumChar) + .map(_.mkString) + .suchThat(_.nonEmpty) + (a, b) = s.splitAt(n) + r <- Gen.const(s"$a$d$b") + .suchThat(x => !x.startsWith(d.toString)) + .suchThat(x => !x.endsWith(d.toString)) + } yield r + + implicit val subpath: Gen[String] = for { + s <- Gen.listOf(Gen.listOf(Gen.alphaNumChar).map(_.mkString).suchThat(!_.isEmpty)) + path = s.mkString("/") + } yield if (path.isEmpty) "" else path + "/" + + /** + * Elements for shredded path + */ + type ShreddedTypeElements = (String, String, String, String, Int, Int, Int) + + /** + * Generator of `ShreddedTypeElements` + * This generator doesn't guarantee that all elements are valid + * (such as `name` without dots), it allows to test parse failures + */ + val shreddedTypeElementsGen = for { + subpath <- subpath + vendor <- alphaNum + name <- alphaNum + format <- alphaNum + model <- Gen.chooseNum(0, 10) + revision <- Gen.chooseNum(0, 10) + addition <- Gen.chooseNum(0, 10) + } yield (subpath, vendor, name, format, model, revision, addition) + + } + diff --git a/src/test/scala/com/snowplowanalytics/snowplow/rdbloader/loaders/RedshiftLoaderSpec.scala b/src/test/scala/com/snowplowanalytics/snowplow/rdbloader/loaders/RedshiftLoaderSpec.scala index 33dc0b34a..30b71255c 100644 --- a/src/test/scala/com/snowplowanalytics/snowplow/rdbloader/loaders/RedshiftLoaderSpec.scala +++ b/src/test/scala/com/snowplowanalytics/snowplow/rdbloader/loaders/RedshiftLoaderSpec.scala @@ -91,7 +91,7 @@ class RedshiftLoaderSpec extends Specification { def is = s2""" Some(3), None, List( - ShreddedType( + ShreddedType.Json( ShreddedType.Info(Folder.coerce("s3://snowplow-acme-storage/shredded/good/run=2017-05-22-12-20-57/"), "com.snowplowanalytics.snowplow", "submit_form", 1, Semver(0, 12, 0)), 
Key.coerce("s3://snowplow-hosted-assets-us-east-1/4-storage/redshift-storage/jsonpaths/com.snowplowanalytics.snowplow/submit_form_1.json") ) @@ -207,7 +207,7 @@ class RedshiftLoaderSpec extends Specification { def is = s2""" Some(3), Some(6), List( - ShreddedType( + ShreddedType.Json( ShreddedType.Info(Folder.coerce("s3://snowplow-acme-storage/shredded/good/run=2017-05-22-12-20-57/"), "com.snowplowanalytics.snowplow", "submit_form", 1, Semver(0, 12, 0, Some(Semver.ReleaseCandidate(4)))), Key.coerce("s3://snowplow-hosted-assets-us-east-1/4-storage/redshift-storage/jsonpaths/com.snowplowanalytics.snowplow/submit_form_1.json") )
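Taken together, the spec changes above pin down the new split inside ShreddedType: JSON-shredded types keep their JSONPaths file and the `JSON AS` clause in COPY, while tabular types carry no JSONPaths, are loaded positionally with a plain field delimiter, and have their table created by Migration.perform before the COPY runs. A small sketch of the two shapes as the tests construct them; the bucket and run folder are hypothetical.

    import com.snowplowanalytics.snowplow.rdbloader.S3
    import com.snowplowanalytics.snowplow.rdbloader.config.Semver
    import com.snowplowanalytics.snowplow.rdbloader.discovery.ShreddedType

    val base = S3.Folder.coerce("s3://shredded/archive/run=2019-05-05-00-00-00/")

    // JSON-shredded type: loaded through a JSONPaths file (COPY ... JSON AS '<jsonpaths>')
    val json = ShreddedType.Json(
      ShreddedType.Info(base, "com.acme", "some_event", 1, Semver(0, 17, 0)),
      S3.Key.coerce("s3://assets/jsonpaths/com.acme/some_event_1.json"))

    // Tabular type: no JSONPaths; loaded with a field delimiter (COPY ... DELIMITER '<EventFieldSeparator>')
    // and preceded by the Migration step that creates the target table if it is missing
    val tabular = ShreddedType.Tabular(
      ShreddedType.Info(base, "com.acme", "some_context", 2, Semver(0, 17, 0)))

Only the Tabular case participates in the migration step; Json types behave exactly as before, which is why RedshiftLoaderSpec continues to expect ShreddedType.Json for the JSONPaths-based discoveries.
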