Skip to content

Commit

Permalink
RDB Loader: improve log output (close #23)
Browse files Browse the repository at this point in the history
  • Loading branch information
chuwy committed Jun 14, 2018
1 parent 9106f61 commit ddbde29
Show file tree
Hide file tree
Showing 20 changed files with 395 additions and 401 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -12,32 +12,30 @@
*/
package com.snowplowanalytics.snowplow.rdbloader

import cats.data.{EitherT, Validated, ValidatedNel, NonEmptyList}
import cats.data._
import cats.implicits._
import cats.free.Free

import ShreddedType._
import LoaderError.{AtomicDiscoveryFailure, DiscoveryError, DiscoveryFailure, NoDataFailure}
import LoaderError._
import config.Semver

/**
* Result of data discovery in shredded.good folder
*/
sealed trait DataDiscovery extends Product with Serializable {
/**
* Shred run folder full path
*/
/** Shred run folder full path */
def base: S3.Folder

/**
* Amount of keys in atomic-events directory
*/
/** ETL id */
def runId: String = base.split("/").last

/** Amount of keys in atomic-events directory */
def atomicCardinality: Long

/**
* `atomic-events` directory full path
*/
def atomicEvents = S3.Folder.append(base, "atomic-events")
/** `atomic-events` directory full path */
def atomicEvents: S3.Folder =
S3.Folder.append(base, "atomic-events")

/**
* Time in ms for run folder to setup eventual consistency,
Expand All @@ -49,6 +47,13 @@ sealed trait DataDiscovery extends Product with Serializable {
case DataDiscovery.FullDiscovery(_, _, shreddedData) =>
((atomicCardinality * 0.1 * shreddedData.length).toLong + 5L) * 1000
}

/**
 * Human-readable one-line (atomic) or multi-line (full) summary of the discovery:
 * run id, amount of atomic files and, for full discovery, every shredded type
 */
def show: String = this match {
  case DataDiscovery.FullDiscovery(_, atomicCount, shreddedTypes) =>
    // One " + <type>" line per discovered shredded type
    val typeLines = shreddedTypes.map(shreddedType => " + " + shreddedType.show).mkString("\n")
    s"$runId with $atomicCount atomic files and following shredded types:\n$typeLines"
  case DataDiscovery.AtomicDiscovery(_, atomicCount) =>
    s"$runId with $atomicCount atomic files"
}
}

/**
Expand All @@ -62,11 +67,14 @@ sealed trait DataDiscovery extends Product with Serializable {
*/
object DataDiscovery {

/**
 * Upper bound for the attempt counter used by `checkConsistency`:
 * once the counter reaches this value the latest available result is returned as is
 */
val ConsistencyChecks = 5

/**
* ADT indicating whether shredded.good or arbitrary folder
* Empty dir will be a discovery failure in later case
* `InSpecificFolder` results in a discovery error on an empty folder
* `InShreddedGood` results in noop on empty folder
*/
sealed trait DiscoveryTarget
sealed trait DiscoveryTarget extends Product with Serializable
case class InShreddedGood(folder: S3.Folder) extends DiscoveryTarget
case class InSpecificFolder(folder: S3.Folder) extends DiscoveryTarget

Expand Down Expand Up @@ -96,12 +104,12 @@ object DataDiscovery {
* @return list (possibly empty, but usually with a single element) of discovery results
* (atomic events and shredded types)
*/
def discoverFull(target: DiscoveryTarget, shredJob: Semver, region: String, assets: Option[S3.Folder]): Discovery[List[DataDiscovery]] = {
val validatedDataKeys: Discovery[ValidatedDataKeys] = target match {
def discoverFull(target: DiscoveryTarget, shredJob: Semver, region: String, assets: Option[S3.Folder]): LoaderAction[List[DataDiscovery]] = {
val validatedDataKeys: LoaderAction[ValidatedDataKeys] = target match {
case InShreddedGood(folder) =>
Discovery.map(listGoodBucket(folder))(transformKeys(shredJob, region, assets))
listGoodBucket(folder).map(transformKeys(shredJob, region, assets))
case InSpecificFolder(folder) =>
Discovery.map(listGoodBucket(folder)) { keys =>
listGoodBucket(folder).map { keys =>
if (keys.isEmpty) {
val failure = Validated.Invalid(NonEmptyList(NoDataFailure(folder), Nil))
Free.pure(failure)
Expand All @@ -110,10 +118,10 @@ object DataDiscovery {
}

val result = for {
keys <- EitherT(validatedDataKeys)
discovery <- EitherT(groupKeysFull(keys))
keys <- validatedDataKeys
discovery <- groupKeysFull(keys)
} yield discovery
result.value
result
}

/**
Expand All @@ -122,26 +130,27 @@ object DataDiscovery {
* @param target either shredded good or specific run folder
* @return list of run folders with `atomic-events`
*/
def discoverAtomic(target: DiscoveryTarget): Discovery[List[DataDiscovery]] =
def discoverAtomic(target: DiscoveryTarget): LoaderAction[List[DataDiscovery]] =
target match {
case InShreddedGood(folder) =>
Discovery.map(listGoodBucket(folder))(groupKeysAtomic).map(_.flatten)
case InSpecificFolder(folder) =>
val result = Discovery.map(listGoodBucket(folder)) { keys =>
if (keys.isEmpty) {
val error = DiscoveryError(List(NoDataFailure(folder)))
error.asLeft[List[AtomicDiscovery]]
} else groupKeysAtomic(keys)
}
for {
keys <- listGoodBucket(folder)
grouped <- LoaderAction.liftE(groupKeysAtomic(keys))
} yield grouped

result.map(_.flatten)
case InSpecificFolder(folder) =>
for {
keys <- listGoodBucket(folder)
discovery = if (keys.isEmpty) DiscoveryError(NoDataFailure(folder)).asLeft else groupKeysAtomic(keys)
result <- LoaderAction.liftE[List[DataDiscovery]](discovery)
} yield result
}

/**
* List whole directory excluding special files
*/
def listGoodBucket(folder: S3.Folder): Discovery[List[S3.Key]] =
Discovery.map(LoaderA.listS3(folder))(_.filterNot(isSpecial))
def listGoodBucket(folder: S3.Folder): LoaderAction[List[S3.Key]] =
EitherT(LoaderA.listS3(folder)).map(_.filterNot(isSpecial))

// Full discovery

Expand All @@ -151,19 +160,22 @@ object DataDiscovery {
* @param validatedDataKeys IO-action producing validated list of `FinalDataKey`
* @return IO-action producing list of
*/
def groupKeysFull(validatedDataKeys: ValidatedDataKeys): Discovery[List[DataDiscovery]] = {
def groupKeysFull(validatedDataKeys: ValidatedDataKeys): LoaderAction[List[DataDiscovery]] = {
def group(dataKeys: List[DataKeyFinal]): ValidatedNel[DiscoveryFailure, List[DataDiscovery]] =
dataKeys.groupBy(_.base).toList.reverse.traverse(validateFolderFull)

// Transform into Either with non-empty list of errors
validatedDataKeys.map { keys =>
keys.andThen(group) match {
case Validated.Valid(discovery) => Right(discovery)
case Validated.Invalid(failures) =>
val aggregated = LoaderError.aggregateDiscoveryFailures(failures.toList).distinct
Left(DiscoveryError(aggregated))
val result: Action[Either[LoaderError, List[DataDiscovery]]] =
validatedDataKeys.map { keys =>
keys.andThen(group) match {
case Validated.Valid(discovery) =>
discovery.asRight
case Validated.Invalid(failures) =>
val aggregated = LoaderError.aggregateDiscoveryFailures(failures.toList).distinct
DiscoveryError(aggregated).asLeft
}
}
}
EitherT(result)
}

/**
Expand Down Expand Up @@ -207,15 +219,18 @@ object DataDiscovery {
* @param assets optional JSONPath assets S3 bucket
* @return `Action` containing `Validation` - as on next step we can aggregate errors
*/
private def transformDataKey(dataKey: DiscoveryStep[DataKeyIntermediate], region: String, assets: Option[S3.Folder]) = {
private def transformDataKey(
dataKey: DiscoveryStep[DataKeyIntermediate],
region: String,
assets: Option[S3.Folder]
): Action[ValidatedNel[DiscoveryFailure, DataKeyFinal]] = {
dataKey match {
case Right(ShreddedDataKeyIntermediate(fullPath, info)) =>
val jsonpathAction = ShreddedType.discoverJsonPath(region, assets, info)
val discoveryAction: DiscoveryAction[DataKeyFinal] =
DiscoveryAction.map(jsonpathAction) { jsonpath =>
ShreddedDataKeyFinal(fullPath, ShreddedType(info, jsonpath))
}
discoveryAction.map(_.toValidatedNel)
val jsonpathAction = EitherT(ShreddedType.discoverJsonPath(region, assets, info))
val discoveryAction = jsonpathAction.map { jsonpath =>
ShreddedDataKeyFinal(fullPath, ShreddedType(info, jsonpath))
}
discoveryAction.value.map(_.toValidatedNel)
case Right(AtomicDataKey(fullPath)) =>
val pure: Action[ValidatedNel[DiscoveryFailure, DataKeyFinal]] =
Free.pure(AtomicDataKey(fullPath).validNel[DiscoveryFailure])
Expand Down Expand Up @@ -253,7 +268,7 @@ object DataDiscovery {
/**
* Group list of S3 keys by run folder
*/
def groupKeysAtomic(keys: List[S3.Key]): Either[DiscoveryError, List[AtomicDiscovery]] = {
def groupKeysAtomic(keys: List[S3.Key]): Either[LoaderError, List[AtomicDiscovery]] = {
val atomicKeys: ValidatedNel[DiscoveryFailure, List[AtomicDataKey]] =
keys.filter(isAtomic).map(parseAtomicKey).map(_.toValidatedNel).sequence

Expand Down Expand Up @@ -307,39 +322,82 @@ object DataDiscovery {
* @param originalAction data-discovery action
* @return result of same request, but with more guarantees to be consistent
*/
def checkConsistency(originalAction: Discovery[List[DataDiscovery]]): Discovery[List[DataDiscovery]] = {
def check(checkAttempt: Int, last: Option[Either[DiscoveryError, List[DataDiscovery]]]): Discovery[List[DataDiscovery]] = {
val action = last.map(Free.pure[LoaderA, Either[DiscoveryError, List[DataDiscovery]]]).getOrElse(originalAction)
def checkConsistency(originalAction: LoaderAction[List[DataDiscovery]]): LoaderAction[List[DataDiscovery]] = {
def check(checkAttempt: Int, last: Option[Either[LoaderError, List[DataDiscovery]]]): ActionE[List[DataDiscovery]] = {
val action = last.map(Free.pure[LoaderA, Either[LoaderError, List[DataDiscovery]]]).getOrElse(originalAction.value)

for {
original <- action
_ <- sleepConsistency(original)
control <- originalAction
control <- originalAction.value
result <- retry(original, control, checkAttempt + 1)
} yield result
}

/**
 * Compare two consecutive discovery results and either accept one of them
 * or schedule another check
 *
 * @param original result of the first request in this round
 * @param control result of the repeated request made after the consistency timeout
 * @param attempt current value of the attempt counter
 * @return action producing either the accepted discovery or the result of further checks
 */
def retry(
  original: Either[LoaderError, List[DataDiscovery]],
  control: Either[LoaderError, List[DataDiscovery]],
  attempt: Int): ActionE[List[DataDiscovery]] = {
  (original, control) match {
    // Out of attempts: prefer the control result, fall back to the original one
    case _ if attempt >= ConsistencyChecks =>
      for {
        _ <- LoaderA.print(s"Consistency check did not pass after $ConsistencyChecks attempts")
        discovered <- Free.pure(control.orElse(original))
      } yield discovered
    // Both requests succeeded and returned the same folders (order-insensitive)
    case (Right(o), Right(c)) if o.sortBy(_.base.toString) == c.sortBy(_.base.toString) =>
      val message = o.map(x => s"+ ${x.show}").mkString("\n")
      for {
        _ <- LoaderA.print(s"Consistency check passed after ${attempt - 1} attempt. Following run ids found:\n$message")
        discovered <- Free.pure(original)
      } yield discovered
    // Both requests succeeded, but results differ: reuse control as the next original
    case (Right(o), Right(c)) =>
      // Detailed difference is printed only before the last attempt
      val message = if (attempt == ConsistencyChecks - 1)
        s"Difference:\n ${discoveryDiff(o, c).map(m => s"+ $m").mkString("\n")}"
      else ""

      for {
        _ <- LoaderA.print(s"Consistency check failed. $message")
        next <- check(attempt, Some(control))
      } yield next
    // At least one request failed (Left/Left as well as mixed Left/Right),
    // nothing can be reused - start the next round from scratch.
    // Keeping this as the catch-all makes the match exhaustive
    case _ =>
      for {
        _ <- LoaderA.print(s"Consistency check failed. Making another attempt")
        next <- check(attempt, None)
      } yield next
  }
}

EitherT[Action, LoaderError, List[DataDiscovery]](check(1, None))
}


/**
 * Get difference-messages between two discovery checks, matching run folders by `base`
 *
 * @param original folders discovered by the first check
 * @param control folders discovered by the control check
 * @return messages for folders missing in either check and for folders
 *         present in both checks, but with different content
 */
def discoveryDiff(original: List[DataDiscovery], control: List[DataDiscovery]): List[String] = {
  // Folders from the first check: either missing in control or compared pairwise
  val fromOriginal = original.flatMap { o =>
    control.find(_.base == o.base) match {
      case None => List(s"Folder ${o.base} was not found in control check (probably ghost-folder)")
      case Some(c) => discoveryDiff(o, c)
    }
  }

  // Folders that showed up only in the control check would otherwise go unreported
  val onlyInControl = control
    .filterNot(c => original.exists(_.base == c.base))
    .map(c => s"Folder ${c.base} was found in control check, but not in first check")

  fromOriginal ++ onlyInControl
}

check(1, None)
/** Describe how two checks of the same run folder differ. Both are assumed to share `base` */
private def discoveryDiff(original: DataDiscovery, control: DataDiscovery) = {
  val cardinalityDiff =
    if (original.atomicCardinality == control.atomicCardinality) Nil
    else List("Different cardinality of atomic files")

  val typesDiff = (original, control) match {
    case (first: FullDiscovery, second: FullDiscovery) =>
      val missingInControl = first.shreddedTypes.diff(second.shreddedTypes)
        .map(shreddedType => s"${shreddedType.show} exists in first check and misses in control")
      val missingInFirst = second.shreddedTypes.diff(first.shreddedTypes)
        .map(shreddedType => s"${shreddedType.show} exists in control check and misses in first")
      missingInControl ++ missingInFirst
    case (_: AtomicDiscovery, _: AtomicDiscovery) =>
      Nil
    case _ =>
      List("Inconsistent state") // not possible
  }

  cardinalityDiff ++ typesDiff
}

/**
* Aggregates wait time for all discovered folders or wait 10 sec in case action failed
*/
private def sleepConsistency(result: Either[DiscoveryError, List[DataDiscovery]]): Action[Unit] = {
private def sleepConsistency(result: Either[LoaderError, List[DataDiscovery]]): Action[Unit] = {
val timeoutMs = result match {
case Right(list) =>
list.map(_.consistencyTimeout).foldLeft(10000L)(_ + _)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,10 +15,10 @@ package com.snowplowanalytics.snowplow.rdbloader
import java.nio.file.Path

import cats.free.Free
import cats.data.EitherT
import cats.implicits._

// This library
import LoaderError.DiscoveryError
import Security.Tunnel
import loaders.Common.SqlString

Expand All @@ -31,7 +31,7 @@ sealed trait LoaderA[A]
object LoaderA {

// Discovery ops
case class ListS3(bucket: S3.Folder) extends LoaderA[Either[DiscoveryError, List[S3.Key]]]
case class ListS3(bucket: S3.Folder) extends LoaderA[Either[LoaderError, List[S3.Key]]]
case class KeyExists(key: S3.Key) extends LoaderA[Boolean]
case class DownloadData(path: S3.Folder, dest: Path) extends LoaderA[Either[LoaderError, List[Path]]]

Expand All @@ -48,6 +48,7 @@ object LoaderA {
case class Track(exitLog: Log) extends LoaderA[Unit]
case class Dump(key: S3.Key, exitLog: Log) extends LoaderA[Either[String, S3.Key]]
case class Exit(exitLog: Log, dumpResult: Option[Either[String, S3.Key]]) extends LoaderA[Int]
case class Print(message: String) extends LoaderA[Unit]

// Cache ops
case class Put(key: String, value: Option[S3.Key]) extends LoaderA[Unit]
Expand All @@ -61,9 +62,8 @@ object LoaderA {
case class GetEc2Property(name: String) extends LoaderA[Either[LoaderError, String]]


/** Get *all* S3 keys prefixed with some folder */
def listS3(bucket: S3.Folder): Action[Either[DiscoveryError, List[S3.Key]]] =
Free.liftF[LoaderA, Either[DiscoveryError, List[S3.Key]]](ListS3(bucket))
def listS3(bucket: S3.Folder): Action[Either[LoaderError, List[S3.Key]]] =
Free.liftF[LoaderA, Either[LoaderError, List[S3.Key]]](ListS3(bucket))

/** Check if S3 key exist */
def keyExists(key: S3.Key): Action[Boolean] =
Expand All @@ -79,8 +79,10 @@ object LoaderA {
Free.liftF[LoaderA, Either[LoaderError, Long]](ExecuteQuery(query))

/** Execute multiple queries (against target in interpreter) */
def executeQueries(queries: List[SqlString]): Action[Either[LoaderError, Unit]] =
queries.traverse(executeQuery).map(eithers => eithers.sequence.map(_.combineAll))
def executeQueries(queries: List[SqlString]): Action[Either[LoaderError, Unit]] = {
val shortCircuiting = queries.traverse(query => EitherT(executeQuery(query)))
shortCircuiting.void.value
}

/** Execute SQL transaction (against target in interpreter) */
def executeTransaction(queries: List[SqlString]): Action[Either[LoaderError, Unit]] = {
Expand Down Expand Up @@ -121,6 +123,10 @@ object LoaderA {
def exit(result: Log, dumpResult: Option[Either[String, S3.Key]]): Action[Int] =
Free.liftF[LoaderA, Int](Exit(result, dumpResult))

/** Lift a `Print` operation carrying `message` into `Action` (printed to stdout by the interpreter) */
def print(message: String): Action[Unit] = {
  val operation: LoaderA[Unit] = Print(message)
  Free.liftF(operation)
}


/** Put value into cache (stored in interpreter) */
def putCache(key: String, value: Option[S3.Key]): Action[Unit] =
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,9 @@ object LoaderError {
* Contains multiple step failures
*/
case class DiscoveryError(failures: List[DiscoveryFailure]) extends LoaderError
object DiscoveryError {
  /** Build a `DiscoveryError` holding exactly one failure, widened to `LoaderError` */
  def apply(single: DiscoveryFailure): LoaderError =
    DiscoveryError(single :: Nil)
}

/**
* Error representing failure on database loading (or executing any statements)
Expand Down
Loading

0 comments on commit ddbde29

Please sign in to comment.