diff --git a/src/main/scala/com/snowplowanalytics/snowplow/rdbloader/DataDiscovery.scala b/src/main/scala/com/snowplowanalytics/snowplow/rdbloader/DataDiscovery.scala index a735b0a9a..d3c70c3af 100644 --- a/src/main/scala/com/snowplowanalytics/snowplow/rdbloader/DataDiscovery.scala +++ b/src/main/scala/com/snowplowanalytics/snowplow/rdbloader/DataDiscovery.scala @@ -12,32 +12,30 @@ */ package com.snowplowanalytics.snowplow.rdbloader -import cats.data.{EitherT, Validated, ValidatedNel, NonEmptyList} +import cats.data._ import cats.implicits._ import cats.free.Free import ShreddedType._ -import LoaderError.{AtomicDiscoveryFailure, DiscoveryError, DiscoveryFailure, NoDataFailure} +import LoaderError._ import config.Semver /** * Result of data discovery in shredded.good folder */ sealed trait DataDiscovery extends Product with Serializable { - /** - * Shred run folder full path - */ + /** Shred run folder full path */ def base: S3.Folder - /** - * Amount of keys in atomic-events directory - */ + /** ETL id */ + def runId: String = base.split("/").last + + /** Amount of keys in atomic-events directory */ def atomicCardinality: Long - /** - * `atomic-events` directory full path - */ - def atomicEvents = S3.Folder.append(base, "atomic-events") + /** `atomic-events` directory full path */ + def atomicEvents: S3.Folder = + S3.Folder.append(base, "atomic-events") /** * Time in ms for run folder to setup eventual consistency, @@ -49,6 +47,13 @@ sealed trait DataDiscovery extends Product with Serializable { case DataDiscovery.FullDiscovery(_, _, shreddedData) => ((atomicCardinality * 0.1 * shreddedData.length).toLong + 5L) * 1000 } + + def show: String = this match { + case DataDiscovery.AtomicDiscovery(_, cardinality) => + s"$runId with $cardinality atomic files" + case DataDiscovery.FullDiscovery(_, cardinality, types) => + s"$runId with $cardinality atomic files and following shredded types:\n${types.map(t => " + " + t.show).mkString("\n")}" + } } /** @@ -62,11 +67,14 @@ sealed trait DataDiscovery extends Product with Serializable { */ object DataDiscovery { + val ConsistencyChecks = 5 + /** * ADT indicating whether shredded.good or arbitrary folder - * Empty dir will be a discovery failure in later case + * `InSpecificFolder` results on discovery error on empty folder + * `InShreddedGood` results in noop on empty folder */ - sealed trait DiscoveryTarget + sealed trait DiscoveryTarget extends Product with Serializable case class InShreddedGood(folder: S3.Folder) extends DiscoveryTarget case class InSpecificFolder(folder: S3.Folder) extends DiscoveryTarget @@ -96,12 +104,12 @@ object DataDiscovery { * @return list (probably empty, but usually with single element) of discover results * (atomic events and shredded types) */ - def discoverFull(target: DiscoveryTarget, shredJob: Semver, region: String, assets: Option[S3.Folder]): Discovery[List[DataDiscovery]] = { - val validatedDataKeys: Discovery[ValidatedDataKeys] = target match { + def discoverFull(target: DiscoveryTarget, shredJob: Semver, region: String, assets: Option[S3.Folder]): LoaderAction[List[DataDiscovery]] = { + val validatedDataKeys: LoaderAction[ValidatedDataKeys] = target match { case InShreddedGood(folder) => - Discovery.map(listGoodBucket(folder))(transformKeys(shredJob, region, assets)) + listGoodBucket(folder).map(transformKeys(shredJob, region, assets)) case InSpecificFolder(folder) => - Discovery.map(listGoodBucket(folder)) { keys => + listGoodBucket(folder).map { keys => if (keys.isEmpty) { val failure = 
Validated.Invalid(NonEmptyList(NoDataFailure(folder), Nil)) Free.pure(failure) @@ -110,10 +118,10 @@ object DataDiscovery { } val result = for { - keys <- EitherT(validatedDataKeys) - discovery <- EitherT(groupKeysFull(keys)) + keys <- validatedDataKeys + discovery <- groupKeysFull(keys) } yield discovery - result.value + result } /** @@ -122,26 +130,27 @@ object DataDiscovery { * @param target either shredded good or specific run folder * @return list of run folders with `atomic-events` */ - def discoverAtomic(target: DiscoveryTarget): Discovery[List[DataDiscovery]] = + def discoverAtomic(target: DiscoveryTarget): LoaderAction[List[DataDiscovery]] = target match { case InShreddedGood(folder) => - Discovery.map(listGoodBucket(folder))(groupKeysAtomic).map(_.flatten) - case InSpecificFolder(folder) => - val result = Discovery.map(listGoodBucket(folder)) { keys => - if (keys.isEmpty) { - val error = DiscoveryError(List(NoDataFailure(folder))) - error.asLeft[List[AtomicDiscovery]] - } else groupKeysAtomic(keys) - } + for { + keys <- listGoodBucket(folder) + grouped <- LoaderAction.liftE(groupKeysAtomic(keys)) + } yield grouped - result.map(_.flatten) + case InSpecificFolder(folder) => + for { + keys <- listGoodBucket(folder) + discovery = if (keys.isEmpty) DiscoveryError(NoDataFailure(folder)).asLeft else groupKeysAtomic(keys) + result <- LoaderAction.liftE[List[DataDiscovery]](discovery) + } yield result } /** * List whole directory excluding special files */ - def listGoodBucket(folder: S3.Folder): Discovery[List[S3.Key]] = - Discovery.map(LoaderA.listS3(folder))(_.filterNot(isSpecial)) + def listGoodBucket(folder: S3.Folder): LoaderAction[List[S3.Key]] = + EitherT(LoaderA.listS3(folder)).map(_.filterNot(isSpecial)) // Full discovery @@ -151,19 +160,22 @@ object DataDiscovery { * @param validatedDataKeys IO-action producing validated list of `FinalDataKey` * @return IO-action producing list of */ - def groupKeysFull(validatedDataKeys: ValidatedDataKeys): Discovery[List[DataDiscovery]] = { + def groupKeysFull(validatedDataKeys: ValidatedDataKeys): LoaderAction[List[DataDiscovery]] = { def group(dataKeys: List[DataKeyFinal]): ValidatedNel[DiscoveryFailure, List[DataDiscovery]] = dataKeys.groupBy(_.base).toList.reverse.traverse(validateFolderFull) // Transform into Either with non-empty list of errors - validatedDataKeys.map { keys => - keys.andThen(group) match { - case Validated.Valid(discovery) => Right(discovery) - case Validated.Invalid(failures) => - val aggregated = LoaderError.aggregateDiscoveryFailures(failures.toList).distinct - Left(DiscoveryError(aggregated)) + val result: Action[Either[LoaderError, List[DataDiscovery]]] = + validatedDataKeys.map { keys => + keys.andThen(group) match { + case Validated.Valid(discovery) => + discovery.asRight + case Validated.Invalid(failures) => + val aggregated = LoaderError.aggregateDiscoveryFailures(failures.toList).distinct + DiscoveryError(aggregated).asLeft + } } - } + EitherT(result) } /** @@ -207,15 +219,18 @@ object DataDiscovery { * @param assets optional JSONPath assets S3 bucket * @return `Action` conaining `Validation` - as on next step we can aggregate errors */ - private def transformDataKey(dataKey: DiscoveryStep[DataKeyIntermediate], region: String, assets: Option[S3.Folder]) = { + private def transformDataKey( + dataKey: DiscoveryStep[DataKeyIntermediate], + region: String, + assets: Option[S3.Folder] + ): Action[ValidatedNel[DiscoveryFailure, DataKeyFinal]] = { dataKey match { case Right(ShreddedDataKeyIntermediate(fullPath, 
info)) => - val jsonpathAction = ShreddedType.discoverJsonPath(region, assets, info) - val discoveryAction: DiscoveryAction[DataKeyFinal] = - DiscoveryAction.map(jsonpathAction) { jsonpath => - ShreddedDataKeyFinal(fullPath, ShreddedType(info, jsonpath)) - } - discoveryAction.map(_.toValidatedNel) + val jsonpathAction = EitherT(ShreddedType.discoverJsonPath(region, assets, info)) + val discoveryAction = jsonpathAction.map { jsonpath => + ShreddedDataKeyFinal(fullPath, ShreddedType(info, jsonpath)) + } + discoveryAction.value.map(_.toValidatedNel) case Right(AtomicDataKey(fullPath)) => val pure: Action[ValidatedNel[DiscoveryFailure, DataKeyFinal]] = Free.pure(AtomicDataKey(fullPath).validNel[DiscoveryFailure]) @@ -253,7 +268,7 @@ object DataDiscovery { /** * Group list of S3 keys by run folder */ - def groupKeysAtomic(keys: List[S3.Key]): Either[DiscoveryError, List[AtomicDiscovery]] = { + def groupKeysAtomic(keys: List[S3.Key]): Either[LoaderError, List[AtomicDiscovery]] = { val atomicKeys: ValidatedNel[DiscoveryFailure, List[AtomicDataKey]] = keys.filter(isAtomic).map(parseAtomicKey).map(_.toValidatedNel).sequence @@ -307,39 +322,82 @@ object DataDiscovery { * @param originalAction data-discovery action * @return result of same request, but with more guarantees to be consistent */ - def checkConsistency(originalAction: Discovery[List[DataDiscovery]]): Discovery[List[DataDiscovery]] = { - def check(checkAttempt: Int, last: Option[Either[DiscoveryError, List[DataDiscovery]]]): Discovery[List[DataDiscovery]] = { - val action = last.map(Free.pure[LoaderA, Either[DiscoveryError, List[DataDiscovery]]]).getOrElse(originalAction) + def checkConsistency(originalAction: LoaderAction[List[DataDiscovery]]): LoaderAction[List[DataDiscovery]] = { + def check(checkAttempt: Int, last: Option[Either[LoaderError, List[DataDiscovery]]]): ActionE[List[DataDiscovery]] = { + val action = last.map(Free.pure[LoaderA, Either[LoaderError, List[DataDiscovery]]]).getOrElse(originalAction.value) for { original <- action _ <- sleepConsistency(original) - control <- originalAction + control <- originalAction.value result <- retry(original, control, checkAttempt + 1) } yield result } def retry( - original: Either[DiscoveryError, List[DataDiscovery]], - control: Either[DiscoveryError, List[DataDiscovery]], - attempt: Int): Discovery[List[DataDiscovery]] = { - if (attempt >= 5) - Free.pure(control.orElse(original)) - else if (original.isRight && original == control) - Free.pure(original) - else if (control.isLeft || original.isLeft) - check(attempt, None) - else // Both Right, but not equal - check(attempt, Some(control)) + original: Either[LoaderError, List[DataDiscovery]], + control: Either[LoaderError, List[DataDiscovery]], + attempt: Int): ActionE[List[DataDiscovery]] = { + (original, control) match { + case _ if attempt >= ConsistencyChecks => + for { + _ <- LoaderA.print(s"Consistency check did not pass after $ConsistencyChecks attempts") + discovered <- Free.pure(control.orElse(original)) + } yield discovered + case (Right(o), Right(c)) if o.sortBy(_.base.toString) == c.sortBy(_.base.toString) => + val message = o.map(x => s"+ ${x.show}").mkString("\n") + for { + _ <- LoaderA.print(s"Consistency check passed after ${attempt - 1} attempt. Following run ids found:\n$message") + discovered <- Free.pure(original) + } yield discovered + case (Left(_), Left(_)) => + for { + _ <- LoaderA.print(s"Consistency check failed. 
Making another attempt") + next <- check(attempt, None) + } yield next + case (Right(o), Right(c)) => + val message = if (attempt == ConsistencyChecks - 1) + s"Difference:\n ${discoveryDiff(o, c).map(m => s"+ $m").mkString("\n")}" + else "" + + for { + _ <- LoaderA.print(s"Consistency check failed. $message") + next <- check(attempt, Some(control)) + } yield next + } + } + + EitherT[Action, LoaderError, List[DataDiscovery]](check(1, None)) + } + + + def discoveryDiff(original: List[DataDiscovery], control: List[DataDiscovery]): List[String] = { + original.flatMap { o => + control.find(_.base == o.base) match { + case None => List(s"Folder ${o.base} was not found in control check (probably ghost-folder)") + case Some(c) => discoveryDiff(o, c) + } } + } - check(1, None) + /** Get difference-message between two checks. Assuming they have same base */ + private def discoveryDiff(original: DataDiscovery, control: DataDiscovery) = { + (if (original.atomicCardinality != control.atomicCardinality) List("Different cardinality of atomic files") else Nil) ++ + ((original, control) match { + case (o: FullDiscovery, c: FullDiscovery) => + o.shreddedTypes.diff(c.shreddedTypes).map { d => s"${d.show} exists in first check and misses in control" } ++ + c.shreddedTypes.diff(o.shreddedTypes).map { d => s"${d.show} exists in control check and misses in first" } + case (_: AtomicDiscovery, _: AtomicDiscovery) => + Nil + case _ => + List("Inconsistent state") // not possible + }) } /** * Aggregates wait time for all discovered folders or wait 10 sec in case action failed */ - private def sleepConsistency(result: Either[DiscoveryError, List[DataDiscovery]]): Action[Unit] = { + private def sleepConsistency(result: Either[LoaderError, List[DataDiscovery]]): Action[Unit] = { val timeoutMs = result match { case Right(list) => list.map(_.consistencyTimeout).foldLeft(10000L)(_ + _) diff --git a/src/main/scala/com/snowplowanalytics/snowplow/rdbloader/LoaderA.scala b/src/main/scala/com/snowplowanalytics/snowplow/rdbloader/LoaderA.scala index fef7def53..f9ff3f55d 100644 --- a/src/main/scala/com/snowplowanalytics/snowplow/rdbloader/LoaderA.scala +++ b/src/main/scala/com/snowplowanalytics/snowplow/rdbloader/LoaderA.scala @@ -15,10 +15,10 @@ package com.snowplowanalytics.snowplow.rdbloader import java.nio.file.Path import cats.free.Free +import cats.data.EitherT import cats.implicits._ // This library -import LoaderError.DiscoveryError import Security.Tunnel import loaders.Common.SqlString @@ -31,7 +31,7 @@ sealed trait LoaderA[A] object LoaderA { // Discovery ops - case class ListS3(bucket: S3.Folder) extends LoaderA[Either[DiscoveryError, List[S3.Key]]] + case class ListS3(bucket: S3.Folder) extends LoaderA[Either[LoaderError, List[S3.Key]]] case class KeyExists(key: S3.Key) extends LoaderA[Boolean] case class DownloadData(path: S3.Folder, dest: Path) extends LoaderA[Either[LoaderError, List[Path]]] @@ -48,6 +48,7 @@ object LoaderA { case class Track(exitLog: Log) extends LoaderA[Unit] case class Dump(key: S3.Key, exitLog: Log) extends LoaderA[Either[String, S3.Key]] case class Exit(exitLog: Log, dumpResult: Option[Either[String, S3.Key]]) extends LoaderA[Int] + case class Print(message: String) extends LoaderA[Unit] // Cache ops case class Put(key: String, value: Option[S3.Key]) extends LoaderA[Unit] @@ -61,9 +62,8 @@ object LoaderA { case class GetEc2Property(name: String) extends LoaderA[Either[LoaderError, String]] - /** Get *all* S3 keys prefixed with some folder */ - def listS3(bucket: S3.Folder): 
Action[Either[DiscoveryError, List[S3.Key]]] = - Free.liftF[LoaderA, Either[DiscoveryError, List[S3.Key]]](ListS3(bucket)) + def listS3(bucket: S3.Folder): Action[Either[LoaderError, List[S3.Key]]] = + Free.liftF[LoaderA, Either[LoaderError, List[S3.Key]]](ListS3(bucket)) /** Check if S3 key exist */ def keyExists(key: S3.Key): Action[Boolean] = @@ -79,8 +79,10 @@ object LoaderA { Free.liftF[LoaderA, Either[LoaderError, Long]](ExecuteQuery(query)) /** Execute multiple (against target in interpreter) */ - def executeQueries(queries: List[SqlString]): Action[Either[LoaderError, Unit]] = - queries.traverse(executeQuery).map(eithers => eithers.sequence.map(_.combineAll)) + def executeQueries(queries: List[SqlString]): Action[Either[LoaderError, Unit]] = { + val shortCircuiting = queries.traverse(query => EitherT(executeQuery(query))) + shortCircuiting.void.value + } /** Execute SQL transaction (against target in interpreter) */ def executeTransaction(queries: List[SqlString]): Action[Either[LoaderError, Unit]] = { @@ -121,6 +123,10 @@ object LoaderA { def exit(result: Log, dumpResult: Option[Either[String, S3.Key]]): Action[Int] = Free.liftF[LoaderA, Int](Exit(result, dumpResult)) + /** Print message to stdout */ + def print(message: String): Action[Unit] = + Free.liftF[LoaderA, Unit](Print(message)) + /** Put value into cache (stored in interpreter) */ def putCache(key: String, value: Option[S3.Key]): Action[Unit] = diff --git a/src/main/scala/com/snowplowanalytics/snowplow/rdbloader/LoaderError.scala b/src/main/scala/com/snowplowanalytics/snowplow/rdbloader/LoaderError.scala index c31102c7d..58a8340a8 100644 --- a/src/main/scala/com/snowplowanalytics/snowplow/rdbloader/LoaderError.scala +++ b/src/main/scala/com/snowplowanalytics/snowplow/rdbloader/LoaderError.scala @@ -44,6 +44,9 @@ object LoaderError { * Contains multiple step failures */ case class DiscoveryError(failures: List[DiscoveryFailure]) extends LoaderError + object DiscoveryError { + def apply(single: DiscoveryFailure): LoaderError = DiscoveryError(List(single)) + } /** * Error representing failure on database loading (or executing any statements) diff --git a/src/main/scala/com/snowplowanalytics/snowplow/rdbloader/Log.scala b/src/main/scala/com/snowplowanalytics/snowplow/rdbloader/Log.scala index 7ebf9fcfb..0041a9715 100644 --- a/src/main/scala/com/snowplowanalytics/snowplow/rdbloader/Log.scala +++ b/src/main/scala/com/snowplowanalytics/snowplow/rdbloader/Log.scala @@ -12,9 +12,6 @@ */ package com.snowplowanalytics.snowplow.rdbloader -// This project -import config.Step - /** * End-of-the-world result type. * Controls how RDB Loader exits @@ -26,22 +23,14 @@ object Log { /** * Loading succeeded. No messages, 0 exit code */ - case class LoadingSucceeded(steps: List[Step]) extends Log { - override def toString: String = { - s"RDB Loader successfully completed following steps: [${steps.mkString(", ")}]" - } + case object LoadingSucceeded extends Log { + override def toString: String = s"RDB Loader successfully completed" } /** * Loading failed. Write error message. 1 exit code. 
*/ - case class LoadingFailed(error: String, steps: List[Step]) extends Log { - override def toString: String = { - s"ERROR: $error\n" + (if (steps.nonEmpty) { - s"Following steps completed: [${steps.mkString(",")}]" } - else { - "No steps completed" - }) - } + case class LoadingFailed(error: String) extends Log { + override def toString: String = s"ERROR: $error\n" } } diff --git a/src/main/scala/com/snowplowanalytics/snowplow/rdbloader/Main.scala b/src/main/scala/com/snowplowanalytics/snowplow/rdbloader/Main.scala index f864630c3..d00a575ef 100644 --- a/src/main/scala/com/snowplowanalytics/snowplow/rdbloader/Main.scala +++ b/src/main/scala/com/snowplowanalytics/snowplow/rdbloader/Main.scala @@ -17,7 +17,7 @@ import cats.data.Validated._ // This project import interpreters.Interpreter import config.CliConfig -import loaders.Common.load +import loaders.Common.{ load, discover } /** * Application entry point @@ -53,8 +53,12 @@ object Main { def run(config: CliConfig): Int = { val interpreter = Interpreter.initialize(config) - val actions = for { - result <- load(config).value.run(Nil) + val actions: Action[Int] = for { + data <- discover(config).value + result <- data match { + case Right(discovery) => load(config, discovery).value + case Left(error) => ActionE.liftError(error) + } message = utils.Common.interpret(result) _ <- LoaderA.track(message) status <- close(config.logKey, message) diff --git a/src/main/scala/com/snowplowanalytics/snowplow/rdbloader/Security.scala b/src/main/scala/com/snowplowanalytics/snowplow/rdbloader/Security.scala index ea848b55e..68d106bc9 100644 --- a/src/main/scala/com/snowplowanalytics/snowplow/rdbloader/Security.scala +++ b/src/main/scala/com/snowplowanalytics/snowplow/rdbloader/Security.scala @@ -13,6 +13,7 @@ package com.snowplowanalytics.snowplow.rdbloader import cats.Functor +import cats.data.EitherT import cats.implicits._ import config.StorageTarget.TunnelConfig @@ -37,13 +38,13 @@ object Security { } /** Perform loading and make sure tunnel is closed */ - def bracket(tunnelConfig: Option[TunnelConfig], action: TargetLoading[LoaderError, Unit]): TargetLoading[LoaderError, Unit] = { + def bracket(tunnelConfig: Option[TunnelConfig], action: LoaderAction[Unit]): LoaderAction[Unit] = { tunnelConfig match { case Some(tunnel) => for { - identity <- getIdentity(tunnel).withoutStep - _ <- LoaderA.establishTunnel(Security.Tunnel(tunnel, identity)).withoutStep + identity <- EitherT(getIdentity(tunnel)) + _ <- EitherT(LoaderA.establishTunnel(Security.Tunnel(tunnel, identity))) _ <- action - _ <- LoaderA.closeTunnel().withoutStep + _ <- EitherT(LoaderA.closeTunnel()) } yield () case None => action } diff --git a/src/main/scala/com/snowplowanalytics/snowplow/rdbloader/ShreddedType.scala b/src/main/scala/com/snowplowanalytics/snowplow/rdbloader/ShreddedType.scala index 13dfbbacf..55a7b1d59 100644 --- a/src/main/scala/com/snowplowanalytics/snowplow/rdbloader/ShreddedType.scala +++ b/src/main/scala/com/snowplowanalytics/snowplow/rdbloader/ShreddedType.scala @@ -15,6 +15,8 @@ package com.snowplowanalytics.snowplow.rdbloader import cats.implicits._ import cats.free.Free +import com.snowplowanalytics.iglu.client.SchemaCriterion + import com.snowplowanalytics.iglu.core.SchemaKey // This project @@ -31,9 +33,7 @@ import utils.Common.toSnakeCase * @param jsonPaths existing JSONPaths file */ case class ShreddedType(info: ShreddedType.Info, jsonPaths: S3.Key) { - /** - * Get S3 prefix which Redshift should LOAD FROM - */ + /** Get S3 prefix which Redshift should LOAD FROM */ 
def getLoadPath: String = { if (info.shredJob <= ShreddedType.ShredJobBeforeSparkVersion) { s"${info.base}${info.vendor}/${info.name}/jsonschema/${info.model}-" @@ -41,6 +41,9 @@ case class ShreddedType(info: ShreddedType.Info, jsonPaths: S3.Key) { s"${info.base}shredded-types/vendor=${info.vendor}/name=${info.name}/format=jsonschema/version=${info.model}-" } } + + /** Human-readable form */ + def show: String = s"${info.toCriterion.toString} ($jsonPaths)" } /** @@ -58,7 +61,9 @@ object ShreddedType { * @param name self-describing type's name * @param model self-describing type's SchemaVer model */ - case class Info(base: S3.Folder, vendor: String, name: String, model: Int, shredJob: Semver) + case class Info(base: S3.Folder, vendor: String, name: String, model: Int, shredJob: Semver) { + def toCriterion: SchemaCriterion = SchemaCriterion(vendor, name, "jsonschema", model) + } /** * Basis for Snowplow hosted assets bucket. diff --git a/src/main/scala/com/snowplowanalytics/snowplow/rdbloader/interpreters/DryRunInterpreter.scala b/src/main/scala/com/snowplowanalytics/snowplow/rdbloader/interpreters/DryRunInterpreter.scala index aa3d4570b..bbebd08e9 100644 --- a/src/main/scala/com/snowplowanalytics/snowplow/rdbloader/interpreters/DryRunInterpreter.scala +++ b/src/main/scala/com/snowplowanalytics/snowplow/rdbloader/interpreters/DryRunInterpreter.scala @@ -96,6 +96,8 @@ class DryRunInterpreter private[interpreters]( logMessages.append(s"Deleted temporary directory [${path.toString}]").asRight + case Print(message) => + println(message) case Sleep(timeout) => sleepTime = sleepTime + timeout Thread.sleep(timeout) diff --git a/src/main/scala/com/snowplowanalytics/snowplow/rdbloader/interpreters/RealWorldInterpreter.scala b/src/main/scala/com/snowplowanalytics/snowplow/rdbloader/interpreters/RealWorldInterpreter.scala index 3db265422..5bab80680 100644 --- a/src/main/scala/com/snowplowanalytics/snowplow/rdbloader/interpreters/RealWorldInterpreter.scala +++ b/src/main/scala/com/snowplowanalytics/snowplow/rdbloader/interpreters/RealWorldInterpreter.scala @@ -24,6 +24,8 @@ import cats.implicits._ import com.amazonaws.services.s3.AmazonS3 +import org.joda.time.DateTime + import com.snowplowanalytics.snowplow.scalatracker.Tracker // This project @@ -56,11 +58,19 @@ class RealWorldInterpreter private[interpreters]( // lazy to wait before tunnel established private lazy val dbConnection = PgInterpreter.getConnection(cliConfig.target) + private val messages = collection.mutable.ListBuffer.empty[String] + + def log(message: String) = { + System.out.println(s"RDB Loader [${DateTime.now()}]: $message") + messages.append(message) + } + def run: LoaderA ~> Id = new (LoaderA ~> Id) { def apply[A](effect: LoaderA[A]): Id[A] = { effect match { case ListS3(folder) => + log(s"Listing $folder") S3Interpreter.list(amazonS3, folder).map(summaries => summaries.map(S3.getKey)) case KeyExists(key) => S3Interpreter.keyExists(amazonS3, key) @@ -68,13 +78,15 @@ class RealWorldInterpreter private[interpreters]( S3Interpreter.downloadData(amazonS3, source, dest) case ExecuteQuery(query) => - for { + val result = for { conn <- dbConnection res <- PgInterpreter.executeQuery(conn)(query) } yield res + result.asInstanceOf[Id[A]] case CopyViaStdin(files, query) => for { conn <- dbConnection + _ = log(s"Copying ${files.length} files via stdin") res <- PgInterpreter.copyViaStdin(conn, files, query) } yield res @@ -82,7 +94,8 @@ class RealWorldInterpreter private[interpreters]( try { Files.createTempDirectory("rdb-loader").asRight } 
catch { - case NonFatal(e) => LoaderLocalError("Cannot create temporary directory.\n" + e.toString).asLeft + case NonFatal(e) => + LoaderLocalError("Cannot create temporary directory.\n" + e.toString).asLeft } case DeleteDir(path) => try { @@ -91,20 +104,27 @@ class RealWorldInterpreter private[interpreters]( case NonFatal(e) => LoaderLocalError(s"Cannot delete directory [${path.toString}].\n" + e.toString).asLeft } + case Print(message) => + log(message) case Sleep(timeout) => + log(s"Sleeping $timeout milliseconds") Thread.sleep(timeout) case Track(result) => result match { - case ExitLog.LoadingSucceeded(_) => + case ExitLog.LoadingSucceeded => + log("Tracking success") TrackerInterpreter.trackSuccess(tracker) - case ExitLog.LoadingFailed(message, _) => + case ExitLog.LoadingFailed(message) => + log("Tracking failure") val secrets = List(cliConfig.target.password.getUnencrypted, cliConfig.target.username) val sanitizedMessage = Common.sanitize(message, secrets) TrackerInterpreter.trackError(tracker, sanitizedMessage) } case Dump(key, result) => + log(s"Dumping $key") TrackerInterpreter.dumpStdout(amazonS3, key, result.toString) case Exit(loadResult, dumpResult) => + log("Quit") dbConnection.foreach(c => c.close()) TrackerInterpreter.exit(loadResult, dumpResult) @@ -116,8 +136,10 @@ class RealWorldInterpreter private[interpreters]( () case EstablishTunnel(config) => + log("Establishing SSH tunnel") SshInterpreter.establishTunnel(config) case CloseTunnel() => + log("Closing SSH tunnel") SshInterpreter.closeTunnel() case GetEc2Property(name) => diff --git a/src/main/scala/com/snowplowanalytics/snowplow/rdbloader/interpreters/implementations/PgInterpreter.scala b/src/main/scala/com/snowplowanalytics/snowplow/rdbloader/interpreters/implementations/PgInterpreter.scala index 62058602e..b047653e2 100644 --- a/src/main/scala/com/snowplowanalytics/snowplow/rdbloader/interpreters/implementations/PgInterpreter.scala +++ b/src/main/scala/com/snowplowanalytics/snowplow/rdbloader/interpreters/implementations/PgInterpreter.scala @@ -20,10 +20,10 @@ import java.util.Properties import scala.util.control.NonFatal -import cats.implicits._ - import com.amazon.redshift.jdbc42.{Driver => RedshiftDriver} +import cats.implicits._ + import org.postgresql.copy.CopyManager import org.postgresql.jdbc.PgConnection import org.postgresql.{Driver => PgDriver} @@ -45,14 +45,23 @@ object PgInterpreter { Either.catchNonFatal { conn.createStatement().executeUpdate(sql) } leftMap { - case NonFatal(e) => StorageTargetError(Option(e.getMessage).getOrElse(e.toString)) + case NonFatal(e: java.sql.SQLException) if Option(e.getMessage).getOrElse("").contains("is not authorized to assume IAM Role") => + println(e.getMessage) + StorageTargetError("IAM Role with S3 Read permissions is not attached to Redshift instance") + case NonFatal(e) => + System.err.println("RDB Loader unknown error in executeQuery") + e.printStackTrace() + StorageTargetError(Option(e.getMessage).getOrElse(e.toString)) } def setAutocommit(conn: Connection, autoCommit: Boolean): Either[LoaderError, Unit] = try { Right(conn.setAutoCommit(autoCommit)) } catch { - case e: SQLException => Left(StorageTargetError(e.toString)) + case e: SQLException => + System.err.println("setAutocommit error") + e.printStackTrace() + Left(StorageTargetError(e.toString)) } @@ -65,10 +74,7 @@ object PgInterpreter { manager <- copyManager _ <- setAutocommit(conn, false) result = files.traverse(copyIn(manager, copyStatement)(_)).map(_.combineAll) - _ = result match { - case Left(_) => 
conn.rollback() - case Right(_) => conn.commit() - } + _ = result.fold(_ => conn.rollback(), _ => conn.commit()) _ <- setAutocommit(conn, true) endResult <- result } yield endResult @@ -113,7 +119,10 @@ object PgInterpreter { Right(new PgDriver().connect(url, props)) } } catch { - case NonFatal(e) => Left(StorageTargetError(s"Problems with establishing DB connection\n${e.getMessage}")) + case NonFatal(e) => + System.err.println("RDB Loader getConnection error") + e.printStackTrace() + Left(StorageTargetError(s"Problems with establishing DB connection\n${e.getMessage}")) } } } diff --git a/src/main/scala/com/snowplowanalytics/snowplow/rdbloader/interpreters/implementations/TrackerInterpreter.scala b/src/main/scala/com/snowplowanalytics/snowplow/rdbloader/interpreters/implementations/TrackerInterpreter.scala index f33683430..4c0e40bbd 100644 --- a/src/main/scala/com/snowplowanalytics/snowplow/rdbloader/interpreters/implementations/TrackerInterpreter.scala +++ b/src/main/scala/com/snowplowanalytics/snowplow/rdbloader/interpreters/implementations/TrackerInterpreter.scala @@ -120,16 +120,16 @@ object TrackerInterpreter { def exit(result: Log, dumpResult: Option[Either[String, S3.Key]]): Int = { println(result) (result, dumpResult) match { - case (Log.LoadingSucceeded(_), None) => + case (Log.LoadingSucceeded, None) => println(s"INFO: Logs were not dumped to S3") 0 - case (Log.LoadingFailed(_, _), None) => + case (Log.LoadingFailed(_), None) => println(s"INFO: Logs were not dumped to S3") 1 - case (Log.LoadingSucceeded(_), Some(Right(key))) => + case (Log.LoadingSucceeded, Some(Right(key))) => println(s"INFO: Logs successfully dumped to S3 [$key]") 0 - case (Log.LoadingFailed(_, _), Some(Right(key))) => + case (Log.LoadingFailed(_), Some(Right(key))) => println(s"INFO: Logs successfully dumped to S3 [$key]") 1 case (_, Some(Left(error))) => diff --git a/src/main/scala/com/snowplowanalytics/snowplow/rdbloader/loaders/Common.scala b/src/main/scala/com/snowplowanalytics/snowplow/rdbloader/loaders/Common.scala index d5be8c72d..10bc78c79 100644 --- a/src/main/scala/com/snowplowanalytics/snowplow/rdbloader/loaders/Common.scala +++ b/src/main/scala/com/snowplowanalytics/snowplow/rdbloader/loaders/Common.scala @@ -17,7 +17,7 @@ import shapeless.tag import shapeless.tag._ // This project -import config.CliConfig +import config.{ CliConfig, Step } import config.StorageTarget.{ PostgresqlConfig, RedshiftConfig } @@ -59,17 +59,47 @@ object Common { * * @param cliConfig RDB Loader app configuration */ - def load(cliConfig: CliConfig): TargetLoading[LoaderError, Unit] = { + def load(cliConfig: CliConfig, discovery: List[DataDiscovery]): LoaderAction[Unit] = { val loadDb = cliConfig.target match { case postgresqlTarget: PostgresqlConfig => - PostgresqlLoader.run(cliConfig.configYaml, postgresqlTarget, cliConfig.steps, cliConfig.folder) + PostgresqlLoader.run(postgresqlTarget, cliConfig.steps, discovery) case redshiftTarget: RedshiftConfig => - RedshiftLoader.run(cliConfig.configYaml, redshiftTarget, cliConfig.steps, cliConfig.folder) + RedshiftLoader.run(cliConfig.configYaml, redshiftTarget, cliConfig.steps, discovery) } Security.bracket(cliConfig.target.sshTunnel, loadDb) } + /** + * Choose a discovery strategy and perform it + * + * @param cliConfig RDB Loader app configuration + */ + def discover(cliConfig: CliConfig): LoaderAction[List[DataDiscovery]] = { + // Shortcuts + val shredJob = cliConfig.configYaml.storage.versions.rdbShredder + val region = cliConfig.configYaml.aws.s3.region + val assets = 
cliConfig.configYaml.aws.s3.buckets.jsonpathAssets + + val folder = cliConfig.folder match { + case Some(f) => + DataDiscovery.InSpecificFolder(f) + case None => + DataDiscovery.InShreddedGood(cliConfig.configYaml.aws.s3.buckets.shredded.good) + } + + cliConfig.target match { + case _: RedshiftConfig => + val original = DataDiscovery.discoverFull(folder, shredJob, region, assets) + if (cliConfig.steps.contains(Step.ConsistencyCheck)) + DataDiscovery.checkConsistency(original) + else original + case _: PostgresqlConfig => + // Safe to skip consistency check as whole folder will be downloaded + DataDiscovery.discoverAtomic(folder) + } + } + /** * String representing valid SQL query/statement, * ready to be executed diff --git a/src/main/scala/com/snowplowanalytics/snowplow/rdbloader/loaders/PostgresqlLoader.scala b/src/main/scala/com/snowplowanalytics/snowplow/rdbloader/loaders/PostgresqlLoader.scala index 2507efae0..8f32d2e04 100644 --- a/src/main/scala/com/snowplowanalytics/snowplow/rdbloader/loaders/PostgresqlLoader.scala +++ b/src/main/scala/com/snowplowanalytics/snowplow/rdbloader/loaders/PostgresqlLoader.scala @@ -14,38 +14,29 @@ package com.snowplowanalytics.snowplow.rdbloader package loaders import cats.implicits._ +import cats.data._ import cats.free.Free // This project import LoaderA._ -import config.{Step, SnowplowConfig} +import config.Step import config.StorageTarget.PostgresqlConfig object PostgresqlLoader { /** - * Discovery data in `shredded.good`, build SQL statements to - * load this data and build `LoaderA` structure to interpret. + * Build SQL statements out of discovery and load data * Primary working method. Does not produce side-effects * - * @param config main Snowplow configuration * @param target Redshift storage target configuration * @param steps SQL steps + * @param discovery discovered data to load */ - def run(config: SnowplowConfig, target: PostgresqlConfig, steps: Set[Step], folder: Option[S3.Folder]) = { - val shreddedGood = config.aws.s3.buckets.shredded.good - val discoveryTarget = folder match { - case Some(f) => DataDiscovery.InShreddedGood(f) - case None => DataDiscovery.InShreddedGood(shreddedGood) - } - - // Should be safe to skip consistency check as whole folder gets downloaded - val discovery = DataDiscovery.discoverAtomic(discoveryTarget) + def run(target: PostgresqlConfig, steps: Set[Step], discovery: List[DataDiscovery]) = { val statements = PostgresqlLoadStatements.build(target.eventsTable, steps) for { - folders <- discovery.addStep(Step.Discover) - _ <- folders.traverse(loadFolder(statements)) + _ <- discovery.traverse(loadFolder(statements)) _ <- analyze(statements) _ <- vacuum(statements) } yield () @@ -58,12 +49,12 @@ object PostgresqlLoader { * @param discovery discovered run folder * @return changed app state */ - def loadFolder(statement: PostgresqlLoadStatements)(discovery: DataDiscovery): TargetLoading[LoaderError, Long] = { + def loadFolder(statement: PostgresqlLoadStatements)(discovery: DataDiscovery): LoaderAction[Long] = { for { - tmpdir <- createTmpDir.withoutStep - files <- downloadData(discovery.atomicEvents, tmpdir).addStep(Step.Download) - count <- copyViaStdin(files, statement.events).addStep(Step.Load) - _ <- deleteDir(tmpdir).addStep(Step.Delete) + tmpdir <- EitherT(createTmpDir) + files <- EitherT(downloadData(discovery.atomicEvents, tmpdir)) + count <- EitherT(copyViaStdin(files, statement.events)) + _ <- EitherT(deleteDir(tmpdir)) } yield count } @@ -71,14 +62,14 @@ object PostgresqlLoader { * Return action executing 
VACUUM statements if there's any vacuum statements, * or noop if no vacuum statements were generated */ - def analyze(statements: PostgresqlLoadStatements): TargetLoading[LoaderError, Unit] = { + def analyze(statements: PostgresqlLoadStatements): LoaderAction[Unit] = { statements.analyze match { case Some(analyze) => val result = executeQueries(List(analyze)).map(_.void) - result.addStep(Step.Analyze) + EitherT(result) case None => val noop: Action[Either[LoaderError, Unit]] = Free.pure(Right(())) - noop.withoutStep + EitherT(noop) } } @@ -86,14 +77,14 @@ object PostgresqlLoader { * Return action executing ANALYZE statements if there's any vacuum statements, * or noop if no vacuum statements were generated */ - def vacuum(statements: PostgresqlLoadStatements): TargetLoading[LoaderError, Unit] = { + def vacuum(statements: PostgresqlLoadStatements): LoaderAction[Unit] = { statements.vacuum match { case Some(vacuum) => val result = executeQueries(List(vacuum)).map(_.void) - result.addStep(Step.Vacuum) + EitherT(result) case None => val noop: Action[Either[LoaderError, Unit]] = Free.pure(Right(())) - noop.withoutStep + EitherT(noop) } } diff --git a/src/main/scala/com/snowplowanalytics/snowplow/rdbloader/loaders/RedshiftLoadStatements.scala b/src/main/scala/com/snowplowanalytics/snowplow/rdbloader/loaders/RedshiftLoadStatements.scala index 398b93086..fb11b431c 100644 --- a/src/main/scala/com/snowplowanalytics/snowplow/rdbloader/loaders/RedshiftLoadStatements.scala +++ b/src/main/scala/com/snowplowanalytics/snowplow/rdbloader/loaders/RedshiftLoadStatements.scala @@ -36,7 +36,8 @@ case class RedshiftLoadStatements( shredded: List[SqlString], vacuum: Option[List[SqlString]], analyze: Option[List[SqlString]], - manifest: SqlString) + manifest: SqlString, + base: S3.Folder) object RedshiftLoadStatements { @@ -77,10 +78,10 @@ object RedshiftLoadStatements { case discovery: FullDiscovery => val shreddedStatements = discovery.shreddedTypes.map(transformShreddedType(config, target, _)) val atomic = RedshiftLoadStatements.buildCopyFromTsvStatement(config, target, discovery.atomicEvents) - buildLoadStatements(target, steps, atomic, shreddedStatements) + buildLoadStatements(target, steps, atomic, shreddedStatements, discovery.base) case _: AtomicDiscovery => val atomic = RedshiftLoadStatements.buildCopyFromTsvStatement(config, target, discovery.atomicEvents) - buildLoadStatements(target, steps, atomic, Nil) + buildLoadStatements(target, steps, atomic, Nil, discovery.base) } } @@ -94,13 +95,15 @@ object RedshiftLoadStatements { * @param atomicCopyStatements COPY statements for `events` table * @param shreddedStatements statements for shredded tables (include COPY, * ANALYZE and VACUUM) + * @param base path to base folder * @return statements ready to be executed on Redshift */ def buildLoadStatements( target: RedshiftConfig, steps: Set[Step], atomicCopyStatements: SqlString, - shreddedStatements: List[ShreddedStatements] + shreddedStatements: List[ShreddedStatements], + base: S3.Folder ): RedshiftLoadStatements = { val shreddedCopyStatements = shreddedStatements.map(_.copy) @@ -118,7 +121,7 @@ object RedshiftLoadStatements { Some(statements) } else None - RedshiftLoadStatements(atomicCopyStatements, shreddedCopyStatements, vacuum, analyze, manifestStatement) + RedshiftLoadStatements(atomicCopyStatements, shreddedCopyStatements, vacuum, analyze, manifestStatement, base) } diff --git a/src/main/scala/com/snowplowanalytics/snowplow/rdbloader/loaders/RedshiftLoader.scala 
b/src/main/scala/com/snowplowanalytics/snowplow/rdbloader/loaders/RedshiftLoader.scala index b341ffdd7..918612b1b 100644 --- a/src/main/scala/com/snowplowanalytics/snowplow/rdbloader/loaders/RedshiftLoader.scala +++ b/src/main/scala/com/snowplowanalytics/snowplow/rdbloader/loaders/RedshiftLoader.scala @@ -1,5 +1,5 @@ /* - * Copyright (c) 2012-2017 Snowplow Analytics Ltd. All rights reserved. + * Copyright (c) 2012-2018 Snowplow Analytics Ltd. All rights reserved. * * This program is licensed to you under the Apache License Version 2.0, * and you may not use this file except in compliance with the Apache License Version 2.0. @@ -13,7 +13,7 @@ package com.snowplowanalytics.snowplow.rdbloader package loaders -import cats.free.Free +import cats.data._ import cats.implicits._ // This project @@ -43,49 +43,11 @@ object RedshiftLoader { * @param config main Snowplow configuration * @param target Redshift storage target configuration * @param steps SQL steps - * @param folder specific run-folder to load from instead shredded.good */ - def run(config: SnowplowConfig, target: RedshiftConfig, steps: Set[Step], folder: Option[S3.Folder]) = { - for { - statements <- discover(config, target, steps, folder).addStep(Step.Discover) - result <- load(statements) - } yield result - } - - /** - * Discovers data in `shredded.good` folder with its associated metadata - * (types, JSONPath files etc) and build SQL-statements to load it - * - * @param config main Snowplow configuration - * @param target Redshift storage target configuration - * @param steps SQL steps - * @return action to perform all necessary S3 interactions - */ - def discover(config: SnowplowConfig, target: RedshiftConfig, steps: Set[Step], folder: Option[S3.Folder]): Discovery[LoadQueue] = { - val discoveryTarget = folder match { - case Some(f) => DataDiscovery.InSpecificFolder(f) - case None => DataDiscovery.InShreddedGood(config.aws.s3.buckets.shredded.good) - } - - val shredJob = config.storage.versions.rdbShredder - val region = config.aws.s3.region - val assets = config.aws.s3.buckets.jsonpathAssets - - val discovery = DataDiscovery.discoverFull(discoveryTarget, shredJob, region, assets) - - val consistent = if (steps.contains(Step.ConsistencyCheck)) DataDiscovery.checkConsistency(discovery) else discovery - - Discovery.map(consistent)(buildQueue(config, target, steps)) - } - - /** - * Load all discovered data one by one - * - * @param queue properly sorted list of load statements - * @return application state with performed steps and success/failure result - */ - def load(queue: LoadQueue) = + def run(config: SnowplowConfig, target: RedshiftConfig, steps: Set[Step], discovery: List[DataDiscovery]) = { + val queue = buildQueue(config, target, steps)(discovery) queue.traverse(loadFolder).void + } /** * Perform data-loading for a single run folder. 
@@ -93,13 +55,15 @@ object RedshiftLoader { * @param statements prepared load statements * @return application state */ - def loadFolder(statements: RedshiftLoadStatements): TargetLoading[LoaderError, Unit] = { + def loadFolder(statements: RedshiftLoadStatements): LoaderAction[Unit] = { import LoaderA._ val loadStatements = statements.events :: statements.shredded ++ List(statements.manifest) for { - _ <- executeTransaction(loadStatements).addStep(Step.Load) + _ <- LoaderAction.liftA(LoaderA.print(s"Processing ${statements.base}")) + _ <- EitherT(executeTransaction(loadStatements)) + _ <- LoaderAction.liftA(LoaderA.print("Loaded")) _ <- vacuum(statements) _ <- analyze(statements) } yield () @@ -109,27 +73,33 @@ object RedshiftLoader { * Return action executing VACUUM statements if there's any vacuum statements, * or noop if no vacuum statements were generated */ - def analyze(statements: RedshiftLoadStatements): TargetLoading[LoaderError, Unit] = { + def analyze(statements: RedshiftLoadStatements): LoaderAction[Unit] = statements.analyze match { - case Some(analyze) => executeTransaction(analyze).addStep(Step.Analyze) - case None => - val noop: Action[Either[LoaderError, Unit]] = Free.pure(().asRight) - noop.withoutStep + case Some(analyze) => + for { + _ <- LoaderAction.liftA(LoaderA.print("Executing ANALYZE transaction")) + _ <- EitherT(executeTransaction(analyze)) + } yield () + case None => LoaderAction.unit } - } + /** * Return action executing ANALYZE statements if there's any vacuum statements, * or noop if no vacuum statements were generated */ - def vacuum(statements: RedshiftLoadStatements): TargetLoading[LoaderError, Unit] = { + def vacuum(statements: RedshiftLoadStatements): LoaderAction[Unit] = { statements.vacuum match { case Some(vacuum) => val block = SqlString.unsafeCoerce("END") :: vacuum - executeQueries(block).addStep(Step.Vacuum) - case None => - val noop: Action[Either[LoaderError, Unit]] = Free.pure(().asRight) - noop.withoutStep + val actions = for { + statement <- block + } yield for { + _ <- LoaderA.print(statement) + _ <- executeQuery(statement) + } yield () + LoaderAction.liftA(actions.sequence).void + case None => LoaderAction.liftA(LoaderA.print("Skip VACUUM")) } } } diff --git a/src/main/scala/com/snowplowanalytics/snowplow/rdbloader/package.scala b/src/main/scala/com/snowplowanalytics/snowplow/rdbloader/package.scala index 3f7558e08..56de850eb 100644 --- a/src/main/scala/com/snowplowanalytics/snowplow/rdbloader/package.scala +++ b/src/main/scala/com/snowplowanalytics/snowplow/rdbloader/package.scala @@ -17,8 +17,7 @@ import cats.data._ import cats.free.Free import cats.implicits._ -import rdbloader.config.Step -import rdbloader.LoaderError.{DiscoveryError, DiscoveryFailure} +import rdbloader.LoaderError.DiscoveryFailure package object rdbloader { /** @@ -30,45 +29,43 @@ package object rdbloader { type Action[A] = Free[LoaderA, A] /** - * Loading effect, producing value of type `A`, - * that also mutates state, until short-circuited - * on failure `F`. 
In the end gives both - error and
-   * last state
-   *
-   * @tparam F failure, short-circuiting whole computation
-   * @tparam A value of computation
-   */
-  type TargetLoading[F, A] = EitherT[StateT[Action, List[Step], ?], F, A]
+   * Loading effect, producing value of type `A` with possible `LoaderError`
+   *
+   * @tparam A value of computation
+   */
+  type LoaderAction[A] = EitherT[Action, LoaderError, A]
 
   /** Lift value into */
-  object TargetLoading {
-    def lift[A](value: A): TargetLoading[LoaderError, A] = {
-      val action: Action[A] = Free.pure(value)
-      val state = StateT.lift[Action, List[Step], A](action)
-      EitherT.liftT(state)
-    }
+  object LoaderAction {
+    def unit: LoaderAction[Unit] =
+      EitherT.liftT(Free.pure(()))
+
+    def lift[A](value: A): LoaderAction[A] =
+      EitherT.liftT(Free.pure(value))
+
+    def liftE[A](either: Either[LoaderError, A]): LoaderAction[A] =
+      EitherT(Free.pure(either))
+
+    def liftA[A](action: Action[A]): LoaderAction[A] =
+      EitherT(action.map(_.asRight[LoaderError]))
   }
 
-  /**
-   * IO-free result validation
-   */
-  type DiscoveryStep[A] = Either[DiscoveryFailure, A]
+  /** Non-short-circuiting version of `LoaderAction` */
+  type ActionE[A] = Free[LoaderA, Either[LoaderError, A]]
+
+  object ActionE {
+    def liftError(error: LoaderError): ActionE[Nothing] =
+      Free.pure(error.asLeft)
+  }
 
-  /**
-   * IO Action that can aggregate failures
-   */
-  type ActionValidated[A] = Action[ValidatedNel[DiscoveryFailure, A]]
 
   /**
-   * FullDiscovery discovery process,
+   * IO-free result validation
    */
-  type Discovery[A] = Action[Either[DiscoveryError, A]]
+  type DiscoveryStep[A] = Either[DiscoveryFailure, A]
 
-  val Discovery = Functor[Action].compose[Either[DiscoveryError, ?]]
-
-  /**
-   * Single discovery step
-   */
+  /** Single discovery step */
   type DiscoveryAction[A] = Action[DiscoveryStep[A]]
 
   /**
@@ -77,28 +74,6 @@ package object rdbloader {
   private[rdbloader] val DiscoveryAction =
     Functor[Action].compose[DiscoveryStep]
 
-  /**
-   * Lift stateless `Action[Either[A, B]]` computation into
-   * computation that has state with last successful
-   * state in `current`, but also can short-circuit
-   * with error message
-   */
-  implicit class StatefulLoading[A, B](val loading: Action[Either[B, A]]) extends AnyVal {
-    def addStep(current: Step): TargetLoading[B, A] = {
-      EitherT(StateT((last: List[Step]) => loading.map {
-        case Right(e) => (current :: last, Right(e))
-        case Left(e) => (last, Left(e))
-      }))
-    }
-
-    def withoutStep: TargetLoading[B, A] =
-      EitherT(StateT((last: List[Step]) => loading.map(e => (last, e))))
-  }
-
-  implicit class StateToEither[A, B](val loading: Either[B, A]) extends AnyVal {
-    def withoutStep: TargetLoading[B, A] =
-      EitherT(StateT((last: List[Step]) => Free.pure(loading).map(e => (last, e))))
-  }
 
   implicit class AggregateErrors[A, B](eithers: List[Either[A, B]]) {
     def aggregatedErrors: ValidatedNel[A, List[B]] =
diff --git a/src/main/scala/com/snowplowanalytics/snowplow/rdbloader/utils/Common.scala b/src/main/scala/com/snowplowanalytics/snowplow/rdbloader/utils/Common.scala
index 7eabe219b..a1ca6d953 100644
--- a/src/main/scala/com/snowplowanalytics/snowplow/rdbloader/utils/Common.scala
+++ b/src/main/scala/com/snowplowanalytics/snowplow/rdbloader/utils/Common.scala
@@ -27,7 +27,6 @@ import io.circe._
 
 // This project
 import LoaderError._
-import config.Step
 
 /**
  * Various common utility functions
@@ -53,10 +52,10 @@ object Common {
    * @param result loading process state
    * @return log entry, which can be interpreted accordingly
    */
-  def interpret(result: (List[Step], 
Either[LoaderError, Unit])): Log = { + def interpret(result: Either[LoaderError, Unit]): Log = { result match { - case (steps, Right(_)) => Log.LoadingSucceeded(steps.reverse) - case (steps, Left(error)) => Log.LoadingFailed(error.show, steps.reverse) + case Right(_) => Log.LoadingSucceeded + case Left(error) => Log.LoadingFailed(error.show) } } diff --git a/src/test/scala/com/snowplowanalytics/snowplow/rdbloader/DataDiscoverySpec.scala b/src/test/scala/com/snowplowanalytics/snowplow/rdbloader/DataDiscoverySpec.scala index cbbb66d04..63c5b9916 100644 --- a/src/test/scala/com/snowplowanalytics/snowplow/rdbloader/DataDiscoverySpec.scala +++ b/src/test/scala/com/snowplowanalytics/snowplow/rdbloader/DataDiscoverySpec.scala @@ -103,7 +103,7 @@ class DataDiscoverySpec extends Specification { def is = s2""" val discoveryTarget = DataDiscovery.InShreddedGood(shreddedGood) val result = DataDiscovery.discoverFull(discoveryTarget, Semver(0,11,0), "us-east-1", None) - val endResult = result.foldMap(interpreter) + val endResult = result.value.foldMap(interpreter) endResult must beRight(expected) } @@ -169,6 +169,9 @@ class DataDiscoverySpec extends Specification { def is = s2""" case LoaderA.Sleep(timeout) => State.modify((realWorld: RealWorld) => realWorld.copy(waited = timeout :: realWorld.waited)) + case LoaderA.Print(_) => + State.pure(()) + case action => throw new RuntimeException(s"Unexpected Action [$action]") } @@ -211,7 +214,7 @@ class DataDiscoverySpec extends Specification { def is = s2""" val discoveryTarget = DataDiscovery.InShreddedGood(shreddedGood) val request = DataDiscovery.discoverFull(discoveryTarget, Semver(0,11,0), "us-east-1", None) val result = DataDiscovery.checkConsistency(request) - val (endState, endResult) = result.foldMap(interpreter).run(RealWorld(0, Nil)).value + val (endState, endResult) = result.value.foldMap(interpreter).run(RealWorld(0, Nil)).value val state = endState must beEqualTo(RealWorld(4, List(20000L, 20000L, 20000L))) val response = endResult must beRight(expected) @@ -236,7 +239,7 @@ class DataDiscoverySpec extends Specification { def is = s2""" val discoveryTarget = DataDiscovery.InSpecificFolder(shreddedGood) val result = DataDiscovery.discoverFull(discoveryTarget, Semver(0,11,0), "us-east-1", None) - val endResult = result.foldMap(interpreter) + val endResult = result.value.foldMap(interpreter) endResult must beLeft(expected) } @@ -260,7 +263,7 @@ class DataDiscoverySpec extends Specification { def is = s2""" // The only difference with e3 val discoveryTarget = DataDiscovery.InShreddedGood(shreddedGood) val result = DataDiscovery.discoverFull(discoveryTarget, Semver(0,11,0), "us-east-1", None) - val endResult = result.foldMap(interpreter) + val endResult = result.value.foldMap(interpreter) endResult must beRight(expected) } @@ -316,7 +319,7 @@ class DataDiscoverySpec extends Specification { def is = s2""" ) val discoveryTarget = DataDiscovery.InShreddedGood(shreddedGood) - val result = DataDiscovery.discoverFull(discoveryTarget, Semver(0,11,0), "us-east-1", None) + val result = DataDiscovery.discoverFull(discoveryTarget, Semver(0,11,0), "us-east-1", None).value val endResult = result.foldMap(interpreter) endResult must beRight(expected) diff --git a/src/test/scala/com/snowplowanalytics/snowplow/rdbloader/loaders/CommonSpec.scala b/src/test/scala/com/snowplowanalytics/snowplow/rdbloader/loaders/CommonSpec.scala index e4e73cc2a..6d59bd6ad 100644 --- a/src/test/scala/com/snowplowanalytics/snowplow/rdbloader/loaders/CommonSpec.scala +++ 
b/src/test/scala/com/snowplowanalytics/snowplow/rdbloader/loaders/CommonSpec.scala @@ -17,7 +17,7 @@ import cats._ import org.specs2.Specification // This project -import S3.Key +import S3.Folder import config.{ Step, StorageTarget } class CommonSpec extends Specification { def is = s2""" @@ -28,7 +28,6 @@ class CommonSpec extends Specification { def is = s2""" val expected = List( "EC2 PROPERTY snowplow.redshift.key key", // Retrieve key "SSH TUNNEL ESTABLISH", // Open - "LIST s3://snowplow-acme-storage/shredded/good/", "SLEEP 15000", "LIST s3://snowplow-acme-storage/shredded/good/", "BEGIN", "COPY", "INSERT", "COMMIT", "BEGIN", "ANALYZE", "COMMIT", "SSH TUNNEL CLOSE") // Close @@ -71,17 +70,12 @@ class CommonSpec extends Specification { def is = s2""" actions.append("SSH TUNNEL ESTABLISH") Right(()) - case LoaderA.ListS3(bucket) => - actions.append(s"LIST $bucket") - Right(List(Key.coerce(bucket ++ "run=2017-10-10-10-30-30/atomic-events/part-0001"))) - - case LoaderA.Sleep(time) => - actions.append(s"SLEEP $time") - case LoaderA.CloseTunnel() => actions.append(s"SSH TUNNEL CLOSE") Right(()) + case LoaderA.Print(_) => () + case action => throw new RuntimeException(s"Unexpected Action [$action]") } @@ -89,14 +83,15 @@ class CommonSpec extends Specification { def is = s2""" } val cliConfig = config.CliConfig(SpecHelpers.validConfig, target, Step.defaultSteps, None, None, false) - val state = Common.load(cliConfig) - val action = state.value.run(List.empty[Step]) - val (steps, result) = action.foldMap(interpreter) + val discovery = DataDiscovery.FullDiscovery( + Folder.coerce(cliConfig.configYaml.aws.s3.buckets.shredded.good ++ "run=2017-10-10-10-30-30/"), 1L, Nil) + val state = Common.load(cliConfig, List(discovery)) + val action = state.value + val result = action.foldMap(interpreter) val transactionsExpectation = actions.toList must beEqualTo(expected) val resultExpectation = result must beRight - val stepsExpectation = steps must beEqualTo(List(Step.Discover, Step.Load, Step.Analyze).reverse) - transactionsExpectation.and(resultExpectation).and(stepsExpectation) + transactionsExpectation.and(resultExpectation) } } diff --git a/src/test/scala/com/snowplowanalytics/snowplow/rdbloader/loaders/RedshiftLoaderSpec.scala b/src/test/scala/com/snowplowanalytics/snowplow/rdbloader/loaders/RedshiftLoaderSpec.scala index 103d81914..d82c16644 100644 --- a/src/test/scala/com/snowplowanalytics/snowplow/rdbloader/loaders/RedshiftLoaderSpec.scala +++ b/src/test/scala/com/snowplowanalytics/snowplow/rdbloader/loaders/RedshiftLoaderSpec.scala @@ -14,17 +14,17 @@ package com.snowplowanalytics.snowplow.rdbloader package loaders import cats.{Id, ~>} -import cats.implicits._ import org.specs2.Specification // This project import Common.SqlString.{unsafeCoerce => sql} -import config.Step +import S3.{ Folder, Key } +import config.{ CliConfig, Step, Semver } class RedshiftLoaderSpec extends Specification { def is = s2""" - Discover atomic events data and create load statements $e1 - Discover full data and create load statements $e2 + Discover atomic events data $e1 + Create load statements for full data discovery $e2 Do not fail on empty discovery $e3 Do not sleep with disabled consistency check $e4 Perform manifest-insertion and load within same transaction $e5 @@ -53,76 +53,29 @@ class RedshiftLoaderSpec extends Specification { def is = s2""" } } - val separator = "\t" - val action = RedshiftLoader.discover(validConfig, validTarget, Set.empty, None) - val result = action.foldMap(interpreter) - - val atomic = - 
s""" - |COPY atomic.events FROM 's3://snowplow-acme-storage/shredded/good/run=2017-05-22-12-20-57/atomic-events/' - | CREDENTIALS 'aws_iam_role=arn:aws:iam::123456789876:role/RedshiftLoadRole' REGION AS 'us-east-1' - | DELIMITER '$separator' MAXERROR 1 - | EMPTYASNULL FILLRECORD TRUNCATECOLUMNS - | TIMEFORMAT 'auto' ACCEPTINVCHARS ;""".stripMargin - - val manifest = - """ - |INSERT INTO atomic.manifest - | SELECT etl_tstamp, sysdate AS commit_tstamp, count(*) AS event_count, 0 AS shredded_cardinality - | FROM atomic.events - | WHERE etl_tstamp IS NOT null - | GROUP BY 1 - | ORDER BY etl_tstamp DESC - | LIMIT 1;""".stripMargin + val expected = + List(DataDiscovery.FullDiscovery( S3.Folder.coerce("s3://snowplow-acme-storage/shredded/good/run=2017-05-22-12-20-57/"), 2, Nil)) - val expected = List(RedshiftLoadStatements(sql(atomic),Nil,None,None,sql(manifest))) + val action = Common.discover(CliConfig(validConfig, validTarget, Set.empty, None, None, false)) + val result = action.value.foldMap(interpreter) result must beRight(expected) } def e2 = { - def interpreter: LoaderA ~> Id = new (LoaderA ~> Id) { - - private val cache = collection.mutable.HashMap.empty[String, Option[S3.Key]] - - def apply[A](effect: LoaderA[A]): Id[A] = { - effect match { - case LoaderA.ListS3(bucket) => - Right(List( - S3.Key.coerce(bucket + "run=2017-05-22-12-20-57/atomic-events/part-00001"), - S3.Key.coerce(bucket + "run=2017-05-22-12-20-57/atomic-events/part-00001"), - S3.Key.coerce(bucket + "run=2017-05-22-12-20-57/atomic-events/part-00001"), - S3.Key.coerce(bucket + "run=2017-05-22-12-20-57/shredded-types/vendor=com.snowplowanalytics.snowplow/name=submit_form/format=jsonschema/version=1-0-0/part-00001-dbb35260-7b12-494b-be87-e7a4b1f59906.txt"), - S3.Key.coerce(bucket + "run=2017-05-22-12-20-57/shredded-types/vendor=com.snowplowanalytics.snowplow/name=submit_form/format=jsonschema/version=1-0-0/part-00002-cba3a610-0b90-494b-be87-e7a4b1f59906.txt"), - S3.Key.coerce(bucket + "run=2017-05-22-12-20-57/shredded-types/vendor=com.snowplowanalytics.snowplow/name=submit_form/format=jsonschema/version=1-0-0/part-00003-fba35670-9b83-494b-be87-e7a4b1f59906.txt"), - S3.Key.coerce(bucket + "run=2017-05-22-12-20-57/shredded-types/vendor=com.snowplowanalytics.snowplow/name=submit_form/format=jsonschema/version=1-0-0/part-00004-fba3866a-8b90-494b-be87-e7a4b1fa9906.txt"), - S3.Key.coerce(bucket + "run=2017-05-22-12-20-57/shredded-types/vendor=com.snowplowanalytics.snowplow/name=submit_form/format=jsonschema/version=1-0-0/part-00005-aba3568f-7b96-494b-be87-e7a4b1fa9906.txt") - )) - - case LoaderA.Get(key: String) => - cache.get(key) - case LoaderA.Put(key: String, value: Option[S3.Key]) => - val _ = cache.put(key, value) - () - - case LoaderA.KeyExists(k) => - if (k == "s3://snowplow-hosted-assets-us-east-1/4-storage/redshift-storage/jsonpaths/com.snowplowanalytics.snowplow/submit_form_1.json") { - true - } else false - - case LoaderA.Sleep(_) => () - - case action => - throw new RuntimeException(s"Unexpected Action [$action]") - } - } - } - val separator = "\t" val steps: Set[Step] = Step.defaultSteps ++ Set(Step.Vacuum) - val action = RedshiftLoader.discover(validConfig, validTarget, steps, None) - val result: Either[LoaderError, List[RedshiftLoadStatements]] = action.foldMap(interpreter) + val discovery = DataDiscovery.FullDiscovery( + S3.Folder.coerce("s3://snowplow-acme-storage/shredded/good/run=2017-05-22-12-20-57/"), 3, + List( + ShreddedType( + 
ShreddedType.Info(Folder.coerce("s3://snowplow-acme-storage/shredded/good/run=2017-05-22-12-20-57/"), "com.snowplowanalytics.snowplow", "submit_form", 1, Semver(0, 12, 0)), + Key.coerce("s3://snowplow-hosted-assets-us-east-1/4-storage/redshift-storage/jsonpaths/com.snowplowanalytics.snowplow/submit_form_1.json") + ) + ) + ) + val result = RedshiftLoadStatements.buildQueue(validConfig, validTarget, steps)(List(discovery)) val atomic = s""" |COPY atomic.events FROM 's3://snowplow-acme-storage/shredded/good/run=2017-05-22-12-20-57/atomic-events/' @@ -155,9 +108,12 @@ class RedshiftLoaderSpec extends Specification { def is = s2""" | ORDER BY etl_tstamp DESC | LIMIT 1;""".stripMargin - val expected = List(RedshiftLoadStatements(sql(atomic), shredded, Some(vacuum), Some(analyze), sql(manifest))) + val expected = List(RedshiftLoadStatements( + sql(atomic), shredded, Some(vacuum), Some(analyze), sql(manifest), Folder.coerce("s3://snowplow-acme-storage/shredded/good/run=2017-05-22-12-20-57/") + + )) - result.map(_.head) must beRight(expected.head) + result must beEqualTo(expected) } def e3 = { @@ -177,14 +133,10 @@ class RedshiftLoaderSpec extends Specification { def is = s2""" } val steps: Set[Step] = Step.defaultSteps ++ Set(Step.Vacuum) - val action = RedshiftLoader.run(validConfig, validTarget, steps, None) - val (resultSteps, result) = action.value.run(Nil).foldMap(interpreter) + val action = RedshiftLoader.run(validConfig, validTarget, steps, Nil) + val result = action.value.foldMap(interpreter) - val expected = List(Step.Discover) - - val stepsExpectation = resultSteps must beEqualTo(expected) - val resultExpectation = result must beRight - stepsExpectation.and(resultExpectation) + result must beRight } def e4 = { @@ -226,46 +178,21 @@ class RedshiftLoaderSpec extends Specification { def is = s2""" } } - val separator = "\t" - val steps: Set[Step] = (Step.defaultSteps - Step.ConsistencyCheck) ++ Set(Step.Vacuum) - val action = RedshiftLoader.discover(validConfig, validTarget, steps, None) - val result: Either[LoaderError, List[RedshiftLoadStatements]] = action.foldMap(interpreter) - - val atomic = s""" - |COPY atomic.events FROM 's3://snowplow-acme-storage/shredded/good/run=2017-05-22-12-20-57/atomic-events/' - | CREDENTIALS 'aws_iam_role=arn:aws:iam::123456789876:role/RedshiftLoadRole' REGION AS 'us-east-1' - | DELIMITER '$separator' MAXERROR 1 - | EMPTYASNULL FILLRECORD TRUNCATECOLUMNS - | TIMEFORMAT 'auto' ACCEPTINVCHARS ;""".stripMargin + val action = Common.discover(CliConfig(validConfig, validTarget, steps, None, None, false)).value + val result: Either[LoaderError, List[DataDiscovery]] = action.foldMap(interpreter) + + val expected = List(DataDiscovery.FullDiscovery( + S3.Folder.coerce("s3://snowplow-acme-storage/shredded/good/run=2017-05-22-12-20-57/"), 3, + List( + ShreddedType( + ShreddedType.Info(Folder.coerce("s3://snowplow-acme-storage/shredded/good/run=2017-05-22-12-20-57/"), "com.snowplowanalytics.snowplow", "submit_form", 1, Semver(0, 12, 0, Some(Semver.ReleaseCandidate(4)))), + Key.coerce("s3://snowplow-hosted-assets-us-east-1/4-storage/redshift-storage/jsonpaths/com.snowplowanalytics.snowplow/submit_form_1.json") + ) + ) + )) - val vacuum = List( - sql("VACUUM SORT ONLY atomic.events;"), - sql("VACUUM SORT ONLY atomic.com_snowplowanalytics_snowplow_submit_form_1;")) - - val analyze = List( - sql("ANALYZE atomic.events;"), - sql("ANALYZE atomic.com_snowplowanalytics_snowplow_submit_form_1;")) - - val shredded = List(sql(""" - |COPY 
atomic.com_snowplowanalytics_snowplow_submit_form_1 FROM 's3://snowplow-acme-storage/shredded/good/run=2017-05-22-12-20-57/shredded-types/vendor=com.snowplowanalytics.snowplow/name=submit_form/format=jsonschema/version=1-' - | CREDENTIALS 'aws_iam_role=arn:aws:iam::123456789876:role/RedshiftLoadRole' JSON AS 's3://snowplow-hosted-assets-us-east-1/4-storage/redshift-storage/jsonpaths/com.snowplowanalytics.snowplow/submit_form_1.json' - | REGION AS 'us-east-1' - | MAXERROR 1 TRUNCATECOLUMNS TIMEFORMAT 'auto' - | ACCEPTINVCHARS ;""".stripMargin)) - - val manifest = """ - |INSERT INTO atomic.manifest - | SELECT etl_tstamp, sysdate AS commit_tstamp, count(*) AS event_count, 1 AS shredded_cardinality - | FROM atomic.events - | WHERE etl_tstamp IS NOT null - | GROUP BY 1 - | ORDER BY etl_tstamp DESC - | LIMIT 1;""".stripMargin - - val expected = List(RedshiftLoadStatements(sql(atomic), shredded, Some(vacuum), Some(analyze), sql(manifest))) - - result.map(_.head) must beRight(expected.head) + result must beRight(expected) } def e5 = { @@ -294,6 +221,9 @@ class RedshiftLoaderSpec extends Specification { def is = s2""" queries.append(query) Right(1L) + case LoaderA.Print(_) => + () + case action => throw new RuntimeException(s"Unexpected Action [$action]") } @@ -305,18 +235,17 @@ class RedshiftLoaderSpec extends Specification { def is = s2""" List(sql("LOAD INTO SHRED 1 MOCK"), sql("LOAD INTO SHRED 2 MOCK"), sql("LOAD INTO SHRED 3 MOCK")), Some(List(sql("VACUUM MOCK"))), // Must be shred cardinality + 1 Some(List(sql("ANALYZE MOCK"))), - sql("MANIFEST INSERT MOCK") + sql("MANIFEST INSERT MOCK"), + Folder.coerce("s3://noop") ) val state = RedshiftLoader.loadFolder(input) - val action = state.value.run(List.empty[Step]) - val (steps, result) = action.foldMap(interpreter) + val action = state.value + val result = action.foldMap(interpreter) val transactionsExpectation = queries.toList must beEqualTo(expected) val resultExpectation = result must beRight - val stepsExpectation = steps must beEqualTo(List(Step.Load, Step.Vacuum, Step.Analyze).reverse) - transactionsExpectation.and(resultExpectation).and(stepsExpectation) + transactionsExpectation.and(resultExpectation) } - }
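
All of the updated specs lean on the same testing pattern: build a Free program over the LoaderA algebra, then fold it with a stub interpreter (a `LoaderA ~> Id` natural transformation) that records every effect it sees and returns canned data. The sketch below illustrates that pattern with a hypothetical two-constructor algebra — `TestOp`, `ListDir` and `Print` are stand-ins, not part of the project; only the cats-free machinery (`Free.liftF`, `foldMap`, `~>`) is the real API.

import cats.{Id, ~>}
import cats.free.Free

import scala.collection.mutable.ListBuffer

object StubInterpreterSketch {
  // Hypothetical two-constructor algebra standing in for LoaderA
  sealed trait TestOp[A]
  final case class ListDir(path: String)  extends TestOp[List[String]]
  final case class Print(message: String) extends TestOp[Unit]

  // Smart constructors lifting operations into Free, as LoaderA does for its effects
  def listDir(path: String): Free[TestOp, List[String]] = Free.liftF(ListDir(path))
  def print(message: String): Free[TestOp, Unit]        = Free.liftF(Print(message))

  // A small program under test: list a folder and report how many keys were found
  val program: Free[TestOp, Int] =
    for {
      keys <- listDir("s3://snowplow-acme-storage/shredded/good/")
      _    <- print(s"Found ${keys.length} keys")
    } yield keys.length

  def main(args: Array[String]): Unit = {
    val actions = ListBuffer.empty[String]

    // The stub interpreter: record every effect and supply canned results
    val interpreter: TestOp ~> Id = new (TestOp ~> Id) {
      def apply[A](op: TestOp[A]): Id[A] = op match {
        case ListDir(path) =>
          actions.append(s"LIST $path")
          List("part-00001", "part-00002")
        case Print(message) =>
          actions.append(s"PRINT $message")
          ()
      }
    }

    val result = program.foldMap(interpreter)

    assert(result == 2)
    assert(actions.toList ==
      List("LIST s3://snowplow-acme-storage/shredded/good/", "PRINT Found 2 keys"))
  }
}

After this change the per-test assertions reduce to exactly this shape: compare the recorded action (or query) list against the expected transcript and check that the folded result is Right, with the old step-accumulation checks via `state.value.run(List.empty[Step])` gone.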
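
For orientation, here is a compressed model of the pipeline shape these tests now exercise: discovery is performed once up front (Common.discover), load statements are built from the discovery results by a pure function (the `RedshiftLoadStatements.buildQueue(config, target, steps)(discoveries)` call seen in e2), and each folder's statements are executed separately (RedshiftLoader.loadFolder). The types and field names below are illustrative stand-ins, not the project's real definitions.

object PipelineShapeSketch {
  // Illustrative stand-ins for DataDiscovery.FullDiscovery and RedshiftLoadStatements
  final case class Discovery(base: String, atomicCardinality: Long, shreddedTables: List[String])
  final case class LoadStatements(atomicCopy: String, shreddedCopies: List[String], manifest: String, base: String)

  // Pure statement building: one LoadStatements per discovered run folder,
  // mirroring buildQueue's List[DataDiscovery] => List[RedshiftLoadStatements] shape
  def buildQueue(discoveries: List[Discovery]): List[LoadStatements] =
    discoveries.map { d =>
      LoadStatements(
        atomicCopy     = s"COPY atomic.events FROM '${d.base}atomic-events/'",
        shreddedCopies = d.shreddedTables.map(t => s"COPY $t FROM '${d.base}shredded-types/...'"),
        manifest       = "INSERT INTO atomic.manifest ...",
        base           = d.base
      )
    }

  def main(args: Array[String]): Unit = {
    val run = Discovery("s3://snowplow-acme-storage/shredded/good/run=2017-05-22-12-20-57/", 3L,
      List("atomic.com_snowplowanalytics_snowplow_submit_form_1"))
    val queue = buildQueue(List(run))
    // Statements now carry their originating folder, which is why the specs pass a Folder as the last argument
    assert(queue.head.base == run.base)
  }
}

Because statement building is pure in the new design, e2 can assert structural equality on the resulting queue directly, without any interpreter at all.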