Skip to content

Commit

Permalink
RDB Loader: fix non-exhaustive match in data discovery (close #117)
Browse files Browse the repository at this point in the history
  • Loading branch information
chuwy committed Aug 13, 2019
1 parent b0b9370 commit 6782a7e
Showing 1 changed file with 20 additions and 21 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -82,6 +82,8 @@ case class DataDiscovery(
*/
object DataDiscovery {

type Discovered = List[DataDiscovery]

/** Amount of times consistency check will be performed */
val ConsistencyChecks = 5

Expand Down Expand Up @@ -115,8 +117,8 @@ object DataDiscovery {
id: String,
shredJob: Semver,
region: String,
assets: Option[S3.Folder]): LoaderAction[List[DataDiscovery]] = {
def group(validatedDataKeys: LoaderAction[ValidatedDataKeys]): LoaderAction[List[DataDiscovery]] =
assets: Option[S3.Folder]): LoaderAction[Discovered] = {
def group(validatedDataKeys: LoaderAction[ValidatedDataKeys]): LoaderAction[Discovered] =
for {
keys <- validatedDataKeys
discovery <- groupKeysFull(keys)
Expand Down Expand Up @@ -151,7 +153,7 @@ object DataDiscovery {
}

/** Properly set `specificFolder` flag */
def setSpecificFolder(target: DiscoveryTarget, discovery: LoaderAction[List[DataDiscovery]]): LoaderAction[List[DataDiscovery]] = {
def setSpecificFolder(target: DiscoveryTarget, discovery: LoaderAction[Discovered]): LoaderAction[Discovered] = {
val F = Functor[LoaderAction].compose[List]
F.map(discovery) { d =>
target match {
Expand All @@ -177,12 +179,12 @@ object DataDiscovery {
* @param validatedDataKeys IO-action producing validated list of `FinalDataKey`
* @return IO-action producing list of
*/
def groupKeysFull(validatedDataKeys: ValidatedDataKeys): LoaderAction[List[DataDiscovery]] = {
def group(dataKeys: List[DataKeyFinal]): ValidatedNel[DiscoveryFailure, List[DataDiscovery]] =
def groupKeysFull(validatedDataKeys: ValidatedDataKeys): LoaderAction[Discovered] = {
def group(dataKeys: List[DataKeyFinal]): ValidatedNel[DiscoveryFailure, Discovered] =
dataKeys.groupBy(_.base).toList.reverse.traverse(validateFolderFull)

// Transform into Either with non-empty list of errors
val result: Action[Either[LoaderError, List[DataDiscovery]]] =
val result: Action[Either[LoaderError, Discovered]] =
validatedDataKeys.map { keys =>
keys.andThen(group) match {
case Validated.Valid(discovery) =>
Expand Down Expand Up @@ -303,9 +305,9 @@ object DataDiscovery {
* @param originalAction data-discovery action
* @return result of same request, but with more guarantees to be consistent
*/
def checkConsistency(originalAction: LoaderAction[List[DataDiscovery]]): LoaderAction[List[DataDiscovery]] = {
def check(checkAttempt: Int, last: Option[Either[LoaderError, List[DataDiscovery]]]): ActionE[List[DataDiscovery]] = {
val action = last.map(Free.pure[LoaderA, Either[LoaderError, List[DataDiscovery]]]).getOrElse(originalAction.value)
def checkConsistency(originalAction: LoaderAction[Discovered]): LoaderAction[Discovered] = {
def check(checkAttempt: Int, last: Option[Either[LoaderError, Discovered]]): ActionE[Discovered] = {
val action = last.map(Free.pure[LoaderA, Either[LoaderError, Discovered]]).getOrElse(originalAction.value)

for {
original <- action
Expand All @@ -315,10 +317,7 @@ object DataDiscovery {
} yield result
}

def retry(
original: Either[LoaderError, List[DataDiscovery]],
control: Either[LoaderError, List[DataDiscovery]],
attempt: Int): ActionE[List[DataDiscovery]] = {
def retry(original: Either[LoaderError, Discovered], control: Either[LoaderError, Discovered], attempt: Int): ActionE[Discovered] = {
(original, control) match {
case _ if attempt >= ConsistencyChecks =>
for {
Expand All @@ -331,11 +330,6 @@ object DataDiscovery {
_ <- LoaderA.print(s"Consistency check passed after ${attempt - 1} attempt. Following run ids found:\n$message")
discovered <- Free.pure(original)
} yield discovered
case (Left(_), Left(_)) =>
for {
_ <- LoaderA.print(s"Consistency check failed. Making another attempt")
next <- check(attempt, None)
} yield next
case (Right(o), Right(c)) =>
val message = if (attempt == ConsistencyChecks - 1)
s"Difference:\n ${discoveryDiff(o, c).map(m => s"+ $m").mkString("\n")}"
Expand All @@ -345,13 +339,18 @@ object DataDiscovery {
_ <- LoaderA.print(s"Consistency check failed. $message")
next <- check(attempt, Some(control))
} yield next
case _ =>
for {
_ <- LoaderA.print(s"Consistency check failed. Making another attempt")
next <- check(attempt, None)
} yield next
}
}

EitherT[Action, LoaderError, List[DataDiscovery]](check(1, None))
EitherT[Action, LoaderError, Discovered](check(1, None))
}

def discoveryDiff(original: List[DataDiscovery], control: List[DataDiscovery]): List[String] = {
def discoveryDiff(original: Discovered, control: Discovered): List[String] = {
original.flatMap { o =>
control.find(_.base == o.base) match {
case None => List(s"Folder ${o.base} was not found in control check (probably ghost-folder)")
Expand All @@ -370,7 +369,7 @@ object DataDiscovery {
/**
* Aggregates wait time for all discovered folders or wait 10 sec in case action failed
*/
private def sleepConsistency(result: Either[LoaderError, List[DataDiscovery]]): Action[Unit] = {
private def sleepConsistency(result: Either[LoaderError, Discovered]): Action[Unit] = {
val timeoutMs = result match {
case Right(list) =>
list.map(_.consistencyTimeout).foldLeft(10000L)(_ + _)
Expand Down

0 comments on commit 6782a7e

Please sign in to comment.