From 6782a7e20b87420790d740a21c135bf6175f9e7f Mon Sep 17 00:00:00 2001 From: Anton Parkhomenko Date: Wed, 17 Apr 2019 14:24:08 +0700 Subject: [PATCH] RDB Loader: fix non-exhaustive match in data discovery (close #117) --- .../rdbloader/discovery/DataDiscovery.scala | 41 +++++++++---------- 1 file changed, 20 insertions(+), 21 deletions(-) diff --git a/src/main/scala/com/snowplowanalytics/snowplow/rdbloader/discovery/DataDiscovery.scala b/src/main/scala/com/snowplowanalytics/snowplow/rdbloader/discovery/DataDiscovery.scala index d577af4a0..108f87f56 100644 --- a/src/main/scala/com/snowplowanalytics/snowplow/rdbloader/discovery/DataDiscovery.scala +++ b/src/main/scala/com/snowplowanalytics/snowplow/rdbloader/discovery/DataDiscovery.scala @@ -82,6 +82,8 @@ case class DataDiscovery( */ object DataDiscovery { + type Discovered = List[DataDiscovery] + /** Amount of times consistency check will be performed */ val ConsistencyChecks = 5 @@ -115,8 +117,8 @@ object DataDiscovery { id: String, shredJob: Semver, region: String, - assets: Option[S3.Folder]): LoaderAction[List[DataDiscovery]] = { - def group(validatedDataKeys: LoaderAction[ValidatedDataKeys]): LoaderAction[List[DataDiscovery]] = + assets: Option[S3.Folder]): LoaderAction[Discovered] = { + def group(validatedDataKeys: LoaderAction[ValidatedDataKeys]): LoaderAction[Discovered] = for { keys <- validatedDataKeys discovery <- groupKeysFull(keys) @@ -151,7 +153,7 @@ object DataDiscovery { } /** Properly set `specificFolder` flag */ - def setSpecificFolder(target: DiscoveryTarget, discovery: LoaderAction[List[DataDiscovery]]): LoaderAction[List[DataDiscovery]] = { + def setSpecificFolder(target: DiscoveryTarget, discovery: LoaderAction[Discovered]): LoaderAction[Discovered] = { val F = Functor[LoaderAction].compose[List] F.map(discovery) { d => target match { @@ -177,12 +179,12 @@ object DataDiscovery { * @param validatedDataKeys IO-action producing validated list of `FinalDataKey` * @return IO-action producing list of */ - def groupKeysFull(validatedDataKeys: ValidatedDataKeys): LoaderAction[List[DataDiscovery]] = { - def group(dataKeys: List[DataKeyFinal]): ValidatedNel[DiscoveryFailure, List[DataDiscovery]] = + def groupKeysFull(validatedDataKeys: ValidatedDataKeys): LoaderAction[Discovered] = { + def group(dataKeys: List[DataKeyFinal]): ValidatedNel[DiscoveryFailure, Discovered] = dataKeys.groupBy(_.base).toList.reverse.traverse(validateFolderFull) // Transform into Either with non-empty list of errors - val result: Action[Either[LoaderError, List[DataDiscovery]]] = + val result: Action[Either[LoaderError, Discovered]] = validatedDataKeys.map { keys => keys.andThen(group) match { case Validated.Valid(discovery) => @@ -303,9 +305,9 @@ object DataDiscovery { * @param originalAction data-discovery action * @return result of same request, but with more guarantees to be consistent */ - def checkConsistency(originalAction: LoaderAction[List[DataDiscovery]]): LoaderAction[List[DataDiscovery]] = { - def check(checkAttempt: Int, last: Option[Either[LoaderError, List[DataDiscovery]]]): ActionE[List[DataDiscovery]] = { - val action = last.map(Free.pure[LoaderA, Either[LoaderError, List[DataDiscovery]]]).getOrElse(originalAction.value) + def checkConsistency(originalAction: LoaderAction[Discovered]): LoaderAction[Discovered] = { + def check(checkAttempt: Int, last: Option[Either[LoaderError, Discovered]]): ActionE[Discovered] = { + val action = last.map(Free.pure[LoaderA, Either[LoaderError, Discovered]]).getOrElse(originalAction.value) for { original <- action @@ -315,10 +317,7 @@ object DataDiscovery { } yield result } - def retry( - original: Either[LoaderError, List[DataDiscovery]], - control: Either[LoaderError, List[DataDiscovery]], - attempt: Int): ActionE[List[DataDiscovery]] = { + def retry(original: Either[LoaderError, Discovered], control: Either[LoaderError, Discovered], attempt: Int): ActionE[Discovered] = { (original, control) match { case _ if attempt >= ConsistencyChecks => for { @@ -331,11 +330,6 @@ object DataDiscovery { _ <- LoaderA.print(s"Consistency check passed after ${attempt - 1} attempt. Following run ids found:\n$message") discovered <- Free.pure(original) } yield discovered - case (Left(_), Left(_)) => - for { - _ <- LoaderA.print(s"Consistency check failed. Making another attempt") - next <- check(attempt, None) - } yield next case (Right(o), Right(c)) => val message = if (attempt == ConsistencyChecks - 1) s"Difference:\n ${discoveryDiff(o, c).map(m => s"+ $m").mkString("\n")}" @@ -345,13 +339,18 @@ object DataDiscovery { _ <- LoaderA.print(s"Consistency check failed. $message") next <- check(attempt, Some(control)) } yield next + case _ => + for { + _ <- LoaderA.print(s"Consistency check failed. Making another attempt") + next <- check(attempt, None) + } yield next } } - EitherT[Action, LoaderError, List[DataDiscovery]](check(1, None)) + EitherT[Action, LoaderError, Discovered](check(1, None)) } - def discoveryDiff(original: List[DataDiscovery], control: List[DataDiscovery]): List[String] = { + def discoveryDiff(original: Discovered, control: Discovered): List[String] = { original.flatMap { o => control.find(_.base == o.base) match { case None => List(s"Folder ${o.base} was not found in control check (probably ghost-folder)") @@ -370,7 +369,7 @@ object DataDiscovery { /** * Aggregates wait time for all discovered folders or wait 10 sec in case action failed */ - private def sleepConsistency(result: Either[LoaderError, List[DataDiscovery]]): Action[Unit] = { + private def sleepConsistency(result: Either[LoaderError, Discovered]): Action[Unit] = { val timeoutMs = result match { case Right(list) => list.map(_.consistencyTimeout).foldLeft(10000L)(_ + _)