RDB Loader: improve log output (close #23)
chuwy committed Jan 15, 2018
1 parent 2343b1d commit d3255eb
Showing 21 changed files with 393 additions and 402 deletions.
2 changes: 1 addition & 1 deletion build.sbt
@@ -14,7 +14,7 @@
lazy val loader = project.in(file("."))
.settings(
name := "snowplow-rdb-loader",
version := "0.14.0",
version := "0.15.0-rc3",
initialCommands := "import com.snowplowanalytics.snowplow.rdbloader._",
mainClass in Compile := Some("com.snowplowanalytics.snowplow.rdbloader.Main")
)
DataDiscovery.scala
@@ -12,32 +12,30 @@
*/
package com.snowplowanalytics.snowplow.rdbloader

import cats.data.{EitherT, Validated, ValidatedNel, NonEmptyList}
import cats.data._
import cats.implicits._
import cats.free.Free

import ShreddedType._
import LoaderError.{AtomicDiscoveryFailure, DiscoveryError, DiscoveryFailure, NoDataFailure}
import LoaderError._
import config.Semver

/**
* Result of data discovery in shredded.good folder
*/
sealed trait DataDiscovery extends Product with Serializable {
/**
* Shred run folder full path
*/
/** Shred run folder full path */
def base: S3.Folder

/**
* Amount of keys in atomic-events directory
*/
/** ETL id */
def runId: String = base.split("/").last

/** Amount of keys in atomic-events directory */
def atomicCardinality: Long

/**
* `atomic-events` directory full path
*/
def atomicEvents = S3.Folder.append(base, "atomic-events")
/** `atomic-events` directory full path */
def atomicEvents: S3.Folder =
S3.Folder.append(base, "atomic-events")

/**
* Time in ms for run folder to setup eventual consistency,
@@ -49,6 +47,13 @@ sealed trait DataDiscovery extends Product with Serializable {
case DataDiscovery.FullDiscovery(_, _, shreddedData) =>
((atomicCardinality * 0.1 * shreddedData.length).toLong + 5L) * 1000
}

def show: String = this match {
case DataDiscovery.AtomicDiscovery(_, cardinality) =>
s"$runId with $cardinality atomic files"
case DataDiscovery.FullDiscovery(_, cardinality, types) =>
s"$runId with $cardinality atomic files and following shredded types:\n${types.map(t => " + " + t.show).mkString("\n")}"
}
}
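
The derived members are easiest to see with concrete values. A minimal sketch (hypothetical bucket and run id; plain `String` stands in for `S3.Folder`):

```scala
object DataDiscoveryExample extends App {
  val base = "s3://acme-snowplow/shredded/good/run=2018-01-15-05-04-32/"

  // runId: last segment of the base path (split drops trailing empty strings)
  val runId = base.split("/").last          // "run=2018-01-15-05-04-32"

  // atomic-events directory appended to the run folder
  val atomicEvents = base + "atomic-events/"

  // consistencyTimeout for a FullDiscovery with 100 atomic keys and 3 shredded types:
  // ((100 * 0.1 * 3).toLong + 5) * 1000 = 35000
  val timeoutMs = ((100L * 0.1 * 3).toLong + 5L) * 1000
  println(s"$runId -> wait $timeoutMs ms")
}
```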

/**
@@ -62,11 +67,14 @@ sealed trait DataDiscovery extends Product with Serializable {
*/
object DataDiscovery {

val ConsistencyChecks = 5

/**
* ADT indicating whether shredded.good or arbitrary folder
 * Empty dir will be a discovery failure in the latter case
 * `InSpecificFolder` results in a discovery error on an empty folder
 * `InShreddedGood` results in a noop on an empty folder
*/
sealed trait DiscoveryTarget
sealed trait DiscoveryTarget extends Product with Serializable
case class InShreddedGood(folder: S3.Folder) extends DiscoveryTarget
case class InSpecificFolder(folder: S3.Folder) extends DiscoveryTarget
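
A sketch of the empty-folder semantics described above, with `String` standing in for both `S3.Folder` and `LoaderError`:

```scala
sealed trait DiscoveryTarget extends Product with Serializable
final case class InShreddedGood(folder: String) extends DiscoveryTarget
final case class InSpecificFolder(folder: String) extends DiscoveryTarget

def handleListing(target: DiscoveryTarget, keys: List[String]): Either[String, List[String]] =
  (target, keys) match {
    case (InSpecificFolder(f), Nil) => Left(s"No data discovered in $f") // discovery error
    case (InShreddedGood(_), Nil)   => Right(Nil)                        // noop
    case (_, found)                 => Right(found)
  }
```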

@@ -96,12 +104,12 @@ object DataDiscovery {
 * @return list (probably empty, but usually with a single element) of discovery results
* (atomic events and shredded types)
*/
def discoverFull(target: DiscoveryTarget, shredJob: Semver, region: String, assets: Option[S3.Folder]): Discovery[List[DataDiscovery]] = {
val validatedDataKeys: Discovery[ValidatedDataKeys] = target match {
def discoverFull(target: DiscoveryTarget, shredJob: Semver, region: String, assets: Option[S3.Folder]): LoaderAction[List[DataDiscovery]] = {
val validatedDataKeys: LoaderAction[ValidatedDataKeys] = target match {
case InShreddedGood(folder) =>
Discovery.map(listGoodBucket(folder))(transformKeys(shredJob, region, assets))
listGoodBucket(folder).map(transformKeys(shredJob, region, assets))
case InSpecificFolder(folder) =>
Discovery.map(listGoodBucket(folder)) { keys =>
listGoodBucket(folder).map { keys =>
if (keys.isEmpty) {
val failure = Validated.Invalid(NonEmptyList(NoDataFailure(folder), Nil))
Free.pure(failure)
@@ -110,10 +118,10 @@
}

val result = for {
keys <- EitherT(validatedDataKeys)
discovery <- EitherT(groupKeysFull(keys))
keys <- validatedDataKeys
discovery <- groupKeysFull(keys)
} yield discovery
result.value
result
}
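
The move from `Discovery` to the `EitherT`-based `LoaderAction` lets steps compose in a plain for-comprehension, with the first `Left` short-circuiting. A minimal sketch, with `Id` standing in for the Free-based `Action` and `String` for `LoaderError`:

```scala
import cats.Id
import cats.data.EitherT

object ShortCircuitSketch extends App {
  type Step[A] = EitherT[Id, String, A]

  val keys: Step[List[String]] = EitherT.rightT[Id, String](List("k1", "k2"))
  def group(ks: List[String]): Step[List[String]] = EitherT.rightT[Id, String](ks.distinct)

  // If keys evaluated to a Left, group would never run
  val discovery: Step[List[String]] = for {
    ks <- keys
    g  <- group(ks)
  } yield g

  println(discovery.value) // Right(List(k1, k2))
}
```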

/**
@@ -122,26 +130,27 @@
* @param target either shredded good or specific run folder
* @return list of run folders with `atomic-events`
*/
def discoverAtomic(target: DiscoveryTarget): Discovery[List[DataDiscovery]] =
def discoverAtomic(target: DiscoveryTarget): LoaderAction[List[DataDiscovery]] =
target match {
case InShreddedGood(folder) =>
Discovery.map(listGoodBucket(folder))(groupKeysAtomic).map(_.flatten)
case InSpecificFolder(folder) =>
val result = Discovery.map(listGoodBucket(folder)) { keys =>
if (keys.isEmpty) {
val error = DiscoveryError(List(NoDataFailure(folder)))
error.asLeft[List[AtomicDiscovery]]
} else groupKeysAtomic(keys)
}
for {
keys <- listGoodBucket(folder)
grouped <- LoaderAction.liftE(groupKeysAtomic(keys))
} yield grouped

result.map(_.flatten)
case InSpecificFolder(folder) =>
for {
keys <- listGoodBucket(folder)
discovery = if (keys.isEmpty) DiscoveryError(NoDataFailure(folder)).asLeft else groupKeysAtomic(keys)
result <- LoaderAction.liftE[List[DataDiscovery]](discovery)
} yield result
}
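
`LoaderAction.liftE` (used above) lifts a pure `Either` into the `EitherT`-based `LoaderAction`. A sketch of such a helper under simplified types (`Op` and the `String` error are stand-ins; the real aliases live elsewhere in this codebase):

```scala
import cats.data.EitherT
import cats.free.Free

object LiftSketch {
  sealed trait Op[A]                                // stand-in for LoaderA
  type Action[A]       = Free[Op, A]
  type LoaderAction[A] = EitherT[Action, String, A] // real code uses LoaderError

  // Lift an already-computed Either into the action type without running effects
  def liftE[A](either: Either[String, A]): LoaderAction[A] =
    EitherT(Free.pure[Op, Either[String, A]](either))
}
```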

/**
* List whole directory excluding special files
*/
def listGoodBucket(folder: S3.Folder): Discovery[List[S3.Key]] =
Discovery.map(LoaderA.listS3(folder))(_.filterNot(isSpecial))
def listGoodBucket(folder: S3.Folder): LoaderAction[List[S3.Key]] =
EitherT(LoaderA.listS3(folder)).map(_.filterNot(isSpecial))
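
`isSpecial` is defined further down in this file and not shown in the diff; the predicate below is only an assumed illustration of the kind of bookkeeping keys `listGoodBucket` filters out:

```scala
// Hypothetical stand-in: the real isSpecial lives elsewhere in DataDiscovery.scala
def isSpecial(key: String): Boolean =
  key.contains("$folder$") || key.contains("_SUCCESS")

List("run=2018-01-15/atomic-events/part-00000", "run=2018-01-15/_SUCCESS")
  .filterNot(isSpecial) // List(run=2018-01-15/atomic-events/part-00000)
```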

// Full discovery

@@ -151,19 +160,22 @@ object DataDiscovery {
* @param validatedDataKeys IO-action producing validated list of `FinalDataKey`
* @return IO-action producing list of
*/
def groupKeysFull(validatedDataKeys: ValidatedDataKeys): Discovery[List[DataDiscovery]] = {
def groupKeysFull(validatedDataKeys: ValidatedDataKeys): LoaderAction[List[DataDiscovery]] = {
def group(dataKeys: List[DataKeyFinal]): ValidatedNel[DiscoveryFailure, List[DataDiscovery]] =
dataKeys.groupBy(_.base).toList.reverse.traverse(validateFolderFull)

// Transform into Either with non-empty list of errors
validatedDataKeys.map { keys =>
keys.andThen(group) match {
case Validated.Valid(discovery) => Right(discovery)
case Validated.Invalid(failures) =>
val aggregated = LoaderError.aggregateDiscoveryFailures(failures.toList).distinct
Left(DiscoveryError(aggregated))
val result: Action[Either[LoaderError, List[DataDiscovery]]] =
validatedDataKeys.map { keys =>
keys.andThen(group) match {
case Validated.Valid(discovery) =>
discovery.asRight
case Validated.Invalid(failures) =>
val aggregated = LoaderError.aggregateDiscoveryFailures(failures.toList).distinct
DiscoveryError(aggregated).asLeft
}
}
}
EitherT(result)
}
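
`groupKeysFull` first lets `Validated` accumulate every failure, then collapses them into a single `DiscoveryError`. The accumulation behaviour in isolation (illustrative validator and values):

```scala
import cats.data.ValidatedNel
import cats.implicits._

def positive(n: Int): ValidatedNel[String, Int] =
  if (n > 0) n.validNel else s"not positive: $n".invalidNel

// Both failures are collected, unlike Either, which would stop at the first
val checked: Either[List[String], List[Int]] =
  List(1, -2, -3).traverse(positive).leftMap(_.toList).toEither
// Left(List("not positive: -2", "not positive: -3"))
```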

/**
@@ -207,15 +219,18 @@
* @param assets optional JSONPath assets S3 bucket
 * @return `Action` containing `Validation` - as on the next step we can aggregate errors
*/
private def transformDataKey(dataKey: DiscoveryStep[DataKeyIntermediate], region: String, assets: Option[S3.Folder]) = {
private def transformDataKey(
dataKey: DiscoveryStep[DataKeyIntermediate],
region: String,
assets: Option[S3.Folder]
): Action[ValidatedNel[DiscoveryFailure, DataKeyFinal]] = {
dataKey match {
case Right(ShreddedDataKeyIntermediate(fullPath, info)) =>
val jsonpathAction = ShreddedType.discoverJsonPath(region, assets, info)
val discoveryAction: DiscoveryAction[DataKeyFinal] =
DiscoveryAction.map(jsonpathAction) { jsonpath =>
ShreddedDataKeyFinal(fullPath, ShreddedType(info, jsonpath))
}
discoveryAction.map(_.toValidatedNel)
val jsonpathAction = EitherT(ShreddedType.discoverJsonPath(region, assets, info))
val discoveryAction = jsonpathAction.map { jsonpath =>
ShreddedDataKeyFinal(fullPath, ShreddedType(info, jsonpath))
}
discoveryAction.value.map(_.toValidatedNel)
case Right(AtomicDataKey(fullPath)) =>
val pure: Action[ValidatedNel[DiscoveryFailure, DataKeyFinal]] =
Free.pure(AtomicDataKey(fullPath).validNel[DiscoveryFailure])
@@ -253,7 +268,7 @@
/**
* Group list of S3 keys by run folder
*/
def groupKeysAtomic(keys: List[S3.Key]): Either[DiscoveryError, List[AtomicDiscovery]] = {
def groupKeysAtomic(keys: List[S3.Key]): Either[LoaderError, List[AtomicDiscovery]] = {
val atomicKeys: ValidatedNel[DiscoveryFailure, List[AtomicDataKey]] =
keys.filter(isAtomic).map(parseAtomicKey).map(_.toValidatedNel).sequence
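
The `.map(_.toValidatedNel).sequence` step turns each per-key `Either` into a failure-accumulating `Validated` before combining, e.g.:

```scala
import cats.implicits._

val parsed: List[Either[String, Int]] = List(Right(1), Left("bad key"), Left("worse key"))

parsed.map(_.toValidatedNel).sequence
// Invalid(NonEmptyList("bad key", "worse key")) - both failures are kept
```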

@@ -307,39 +322,79 @@
* @param originalAction data-discovery action
* @return result of same request, but with more guarantees to be consistent
*/
def checkConsistency(originalAction: Discovery[List[DataDiscovery]]): Discovery[List[DataDiscovery]] = {
def check(checkAttempt: Int, last: Option[Either[DiscoveryError, List[DataDiscovery]]]): Discovery[List[DataDiscovery]] = {
val action = last.map(Free.pure[LoaderA, Either[DiscoveryError, List[DataDiscovery]]]).getOrElse(originalAction)
def checkConsistency(originalAction: LoaderAction[List[DataDiscovery]]): LoaderAction[List[DataDiscovery]] = {
def check(checkAttempt: Int, last: Option[Either[LoaderError, List[DataDiscovery]]]): ActionE[List[DataDiscovery]] = {
val action = last.map(Free.pure[LoaderA, Either[LoaderError, List[DataDiscovery]]]).getOrElse(originalAction.value)

for {
original <- action
_ <- sleepConsistency(original)
control <- originalAction
control <- originalAction.value
result <- retry(original, control, checkAttempt + 1)
} yield result
}

def retry(
original: Either[DiscoveryError, List[DataDiscovery]],
control: Either[DiscoveryError, List[DataDiscovery]],
attempt: Int): Discovery[List[DataDiscovery]] = {
if (attempt >= 5)
Free.pure(control.orElse(original))
else if (original.isRight && original == control)
Free.pure(original)
else if (control.isLeft || original.isLeft)
check(attempt, None)
else // Both Right, but not equal
check(attempt, Some(control))
original: Either[LoaderError, List[DataDiscovery]],
control: Either[LoaderError, List[DataDiscovery]],
attempt: Int): ActionE[List[DataDiscovery]] = {
(original, control) match {
case _ if attempt >= ConsistencyChecks =>
for {
_ <- LoaderA.print(s"Consistency check did not pass after $ConsistencyChecks attempts")
discovered <- Free.pure(control.orElse(original))
} yield discovered
case (Right(o), Right(c)) if o.sortBy(_.base.toString) == c.sortBy(_.base.toString) =>
val message = o.map(x => s"+ ${x.show}").mkString("\n")
for {
_ <- LoaderA.print(s"Consistency check passed after ${attempt - 1} attempt. Following run ids found:\n$message")
discovered <- Free.pure(original)
} yield discovered
case (Left(_), Left(c)) =>
for {
_ <- LoaderA.print(s"Consistency check failed. ${c.show}. Making another attempt")
next <- check(attempt, None)
} yield next
case (Right(o), Right(c)) =>
val message = discoveryDiff(o, c).map(m => s"+ $m").mkString("\n")
for {
_ <- LoaderA.print(s"Consistency check failed. Difference\n$message")
next <- check(attempt, Some(control))
} yield next
}
}

EitherT[Action, LoaderError, List[DataDiscovery]](check(1, None))
}
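
Stripped of the Free machinery and the new logging, the consistency check is a bounded retry until two consecutive listings agree. A pure model of that control flow:

```scala
object ConsistencyModel {
  val ConsistencyChecks = 5

  // Re-run `list` until two consecutive results agree, at most ConsistencyChecks times
  def consistent[A](list: () => A): A = {
    @annotation.tailrec
    def go(attempt: Int, last: A): A = {
      val control = list()
      if (control == last || attempt >= ConsistencyChecks) control
      else go(attempt + 1, control)
    }
    go(1, list())
  }
}
```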


def discoveryDiff(original: List[DataDiscovery], control: List[DataDiscovery]): List[String] = {
original.flatMap { o =>
control.find(_.base == o.base) match {
case None => List(s"Folder ${o.base} was not found in control check (probably ghost-folder)")
case Some(c) => discoveryDiff(o, c)
}
}
}

check(1, None)
/** Get difference-message between two checks. Assuming they have same base */
private def discoveryDiff(original: DataDiscovery, control: DataDiscovery) = {
(if (original.atomicCardinality != control.atomicCardinality) List("Different cardinality of atomic files") else Nil) ++
((original, control) match {
case (o: FullDiscovery, c: FullDiscovery) =>
o.shreddedTypes.diff(c.shreddedTypes).map { d => s"${d.show} exists in first check and misses in control" } ++
c.shreddedTypes.diff(o.shreddedTypes).map { d => s"${d.show} exists in control check and misses in first" }
case (_: AtomicDiscovery, _: AtomicDiscovery) =>
Nil
case _ =>
List("Inconsistent state") // not possible
})
}
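
`List#diff` keeps elements of the receiver missing from the argument, which is how the two directions of the shredded-type report are produced (illustrative type names):

```scala
val first   = List("com.acme/event_a", "com.acme/event_b")
val control = List("com.acme/event_b", "com.acme/event_c")

first.diff(control)  // List(com.acme/event_a) - exists in first check, misses in control
control.diff(first)  // List(com.acme/event_c) - exists in control check, misses in first
```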

/**
 * Aggregates wait time for all discovered folders or waits 10 sec in case the action failed
*/
private def sleepConsistency(result: Either[DiscoveryError, List[DataDiscovery]]): Action[Unit] = {
private def sleepConsistency(result: Either[LoaderError, List[DataDiscovery]]): Action[Unit] = {
val timeoutMs = result match {
case Right(list) =>
list.map(_.consistencyTimeout).foldLeft(10000L)(_ + _)
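
A worked example of the fold above: with the 10-second base, two folders with timeouts of 35 s and 12 s yield a single sleep of 57 s:

```scala
List(35000L, 12000L).foldLeft(10000L)(_ + _) // 57000 ms
```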
LoaderA.scala
@@ -15,10 +15,10 @@ package com.snowplowanalytics.snowplow.rdbloader
import java.nio.file.Path

import cats.free.Free
import cats.data.EitherT
import cats.implicits._

// This library
import LoaderError.DiscoveryError
import Security.Tunnel
import loaders.Common.SqlString

@@ -31,7 +31,7 @@ sealed trait LoaderA[A]
object LoaderA {

// Discovery ops
case class ListS3(bucket: S3.Folder) extends LoaderA[Either[DiscoveryError, List[S3.Key]]]
case class ListS3(bucket: S3.Folder) extends LoaderA[Either[LoaderError, List[S3.Key]]]
case class KeyExists(key: S3.Key) extends LoaderA[Boolean]
case class DownloadData(path: S3.Folder, dest: Path) extends LoaderA[Either[LoaderError, List[Path]]]

@@ -48,6 +48,7 @@ object LoaderA {
case class Track(exitLog: Log) extends LoaderA[Unit]
case class Dump(key: S3.Key, exitLog: Log) extends LoaderA[Either[String, S3.Key]]
case class Exit(exitLog: Log, dumpResult: Option[Either[String, S3.Key]]) extends LoaderA[Int]
case class Print(message: String) extends LoaderA[Unit]

// Cache ops
case class Put(key: String, value: Option[S3.Key]) extends LoaderA[Unit]
@@ -61,9 +62,8 @@ object LoaderA {
case class GetEc2Property(name: String) extends LoaderA[Either[LoaderError, String]]


/** Get *all* S3 keys prefixed with some folder */
def listS3(bucket: S3.Folder): Action[Either[DiscoveryError, List[S3.Key]]] =
Free.liftF[LoaderA, Either[DiscoveryError, List[S3.Key]]](ListS3(bucket))
def listS3(bucket: S3.Folder): Action[Either[LoaderError, List[S3.Key]]] =
Free.liftF[LoaderA, Either[LoaderError, List[S3.Key]]](ListS3(bucket))

/** Check if S3 key exists */
def keyExists(key: S3.Key): Action[Boolean] =
@@ -79,8 +79,10 @@ object LoaderA {
Free.liftF[LoaderA, Either[LoaderError, Long]](ExecuteQuery(query))

/** Execute multiple (against target in interpreter) */
def executeQueries(queries: List[SqlString]): Action[Either[LoaderError, Unit]] =
queries.traverse(executeQuery).map(eithers => eithers.sequence.map(_.combineAll))
def executeQueries(queries: List[SqlString]): Action[Either[LoaderError, Unit]] = {
val shortCircuiting = queries.traverse(query => EitherT(executeQuery(query)))
shortCircuiting.void.value
}
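
This rewrite changes semantics: the old `combineAll` version executed every query and merged the results, while traversing with `EitherT` stops at the first failure. The short-circuiting modeled with plain `Either` (`run` is a stand-in for `executeQuery`):

```scala
import cats.implicits._

def run(q: String): Either[String, Long] =
  if (q.contains("bad")) Left(s"failed: $q") else Right(1L)

List("q1", "bad q2", "q3").traverse(run).void
// Left("failed: bad q2") - "q3" is never attempted
```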

/** Execute SQL transaction (against target in interpreter) */
def executeTransaction(queries: List[SqlString]): Action[Either[LoaderError, Unit]] = {
@@ -121,6 +123,10 @@ object LoaderA {
def exit(result: Log, dumpResult: Option[Either[String, S3.Key]]): Action[Int] =
Free.liftF[LoaderA, Int](Exit(result, dumpResult))

/** Print message to stdout */
def print(message: String): Action[Unit] =
Free.liftF[LoaderA, Unit](Print(message))
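
`Print` follows the usual recipe for extending a Free algebra: a case class, a `liftF` smart constructor, and a clause in the interpreter. A self-contained miniature (the interpreter below is an assumption; the project's real interpreter lives in a separate module):

```scala
import cats.free.Free
import cats.{Id, ~>}

object ConsoleSketch {
  sealed trait ConsoleA[A]
  final case class Print(message: String) extends ConsoleA[Unit]

  def print(message: String): Free[ConsoleA, Unit] =
    Free.liftF[ConsoleA, Unit](Print(message))

  // Toy interpreter mapping each operation onto Id
  val interpreter: ConsoleA ~> Id = new (ConsoleA ~> Id) {
    def apply[A](fa: ConsoleA[A]): Id[A] = fa match {
      case Print(m) => println(m)
    }
  }

  def main(args: Array[String]): Unit =
    print("Consistency check passed").foldMap(interpreter)
}
```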


/** Put value into cache (stored in interpreter) */
def putCache(key: String, value: Option[S3.Key]): Action[Unit] =
LoaderError.scala
@@ -44,6 +44,9 @@ object LoaderError {
* Contains multiple step failures
*/
case class DiscoveryError(failures: List[DiscoveryFailure]) extends LoaderError
object DiscoveryError {
def apply(single: DiscoveryFailure): LoaderError = DiscoveryError(List(single))
}
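
The new overload shortens the common single-failure case; a sketch of the pattern with `String` standing in for `DiscoveryFailure`:

```scala
case class DiscoveryError(failures: List[String])
object DiscoveryError {
  def apply(single: String): DiscoveryError = DiscoveryError(List(single))
}

DiscoveryError("no data")       // same as...
DiscoveryError(List("no data"))
```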

/**
* Error representing failure on database loading (or executing any statements)
(17 more changed files not shown)