RDB Loader: fix eventual consistency problem (snowplow/snowplow#3113)
chuwy committed Sep 5, 2017
1 parent 2600820 commit 2dccaad
Showing 19 changed files with 326 additions and 240 deletions.
2 changes: 1 addition & 1 deletion project/BuildSettings.scala
@@ -69,7 +69,7 @@ object BuildSettings {
sourceGenerators in Compile += Def.task {
val file = (sourceManaged in Compile).value / "settings.scala"
IO.write(file, """package com.snowplowanalytics.snowplow.rdbloader.generated
|object ProjectSettings {
|object ProjectMetadata {
| val version = "%s"
| val name = "%s"
| val organization = "%s"
@@ -16,7 +16,7 @@ import cats.data.{EitherT, Validated, ValidatedNel}
import cats.implicits._
import cats.free.Free
import ShreddedType._
import LoaderError.{AtomicDiscoveryFailure, DiscoveryError, DiscoveryFailure, NoDataDiscovered}
import LoaderError.{AtomicDiscoveryFailure, DiscoveryError, DiscoveryFailure}
import config.Semver

/**
@@ -26,18 +26,28 @@ sealed trait DataDiscovery extends Product with Serializable {
/**
* Shred run folder full path
*/
def base: S3.Folder = S3.Folder.getParent(atomicEvents)
def base: S3.Folder

/**
* Amount of keys in atomic-events directory
*/
def atomicCardinality: Long

/**
* `atomic-events` directory full path
*/
def atomicEvents = S3.Key.getParent(atomicData.head)
def atomicEvents = S3.Folder.append(base, "atomic-events")

/**
* List of files in `atomic-events` dir
* Exactly one `atomic-events` directory must be present
* Time in ms for run folder to setup eventual consistency,
* based on amount of atomic and shredded files
*/
def atomicData: DataDiscovery.AtomicData
def consistencyTimeout: Long = this match {
case DataDiscovery.AtomicDiscovery(_, _) =>
((atomicCardinality * 0.1).toLong + 5L) * 1000
case DataDiscovery.FullDiscovery(_, _, shreddedData) =>
((atomicCardinality * 0.1 * shreddedData.length).toLong + 5L) * 1000
}
}
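
To get a feel for the numbers the new `consistencyTimeout` produces, here is a small standalone sketch that reproduces the formula from the hunk above. It is illustrative only: `Folder` is simplified to a plain `String` and the two case classes are re-declared locally, so none of these names are the loader's real API.

```scala
object ConsistencyTimeoutSketch extends App {
  // Simplified stand-ins for AtomicDiscovery / FullDiscovery above.
  sealed trait Discovery {
    def atomicCardinality: Long
    def consistencyTimeoutMs: Long = this match {
      case Atomic(_, n)         => ((n * 0.1).toLong + 5L) * 1000
      case Full(_, n, shredded) => ((n * 0.1 * shredded.length).toLong + 5L) * 1000
    }
  }
  case class Atomic(base: String, atomicCardinality: Long) extends Discovery
  case class Full(base: String, atomicCardinality: Long, shreddedTypes: List[String]) extends Discovery

  // 100 atomic files, no shredded types: (10 + 5) * 1000 = 15000 ms
  println(Atomic("s3://acme/shredded/good/run=2017-09-05/", 100L).consistencyTimeoutMs)
  // 100 atomic files, 3 shredded types: (30 + 5) * 1000 = 35000 ms
  println(Full("s3://acme/shredded/good/run=2017-09-05/", 100L, List("a", "b", "c")).consistencyTimeoutMs)
}
```

So the wait grows with the number of keys that have to become consistent, with a 5-second floor even for tiny run folders.
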

/**
@@ -51,26 +61,16 @@
*/
object DataDiscovery {

/**
* All objects from single `atomic-events` directory
*/
type AtomicData = List[S3.Key]

/**
* Shredded types, each containing whole list of objects in it
*/
type ShreddedData = Map[ShreddedType, List[S3.Key]]

/**
* Discovery result that contains only atomic data (list of S3 keys in `atomic-events`)
*/
case class AtomicDiscovery(atomicData: AtomicData) extends DataDiscovery
case class AtomicDiscovery(base: S3.Folder, atomicCardinality: Long) extends DataDiscovery

/**
* Full discovery result that contains both atomic data (list of S3 keys in `atomic-events`)
* and shredded data
*/
case class FullDiscovery(atomicData: AtomicData, shreddedTypes: ShreddedData) extends DataDiscovery
case class FullDiscovery(base: S3.Folder, atomicCardinality: Long, shreddedTypes: List[ShreddedType]) extends DataDiscovery

/**
* Discover list of shred run folders, each containing
@@ -84,7 +84,7 @@
* @param shredJob shred job version to check path pattern
* @param region AWS region for S3 buckets
* @param assets optional JSONPath assets S3 bucket
* @return non-empty list (usually with single element) of discover results
* @return list (probably empty, but usually with single element) of discover results
* (atomic events and shredded types)
*/
def discoverFull(shreddedGood: S3.Folder, shredJob: Semver, region: String, assets: Option[S3.Folder]): Discovery[List[DataDiscovery]] = {
@@ -94,8 +94,7 @@
val result = for {
keys <- EitherT(validatedDataKeys)
discovery <- EitherT(groupKeysFull(keys))
list <- EitherT.fromEither[Action](checkNonEmpty(discovery))
} yield list
} yield discovery
result.value
}

@@ -152,8 +151,8 @@
}

if (atomicKeys.nonEmpty) {
val shreddedData = shreddedKeys.groupBy(_.info).mapValues(dataKeys => dataKeys.map(_.key).reverse)
FullDiscovery(atomicKeys.map(_.key).reverse, shreddedData).validNel
val shreddedData = shreddedKeys.map(_.info).distinct
FullDiscovery(base, atomicKeys.length, shreddedData).validNel
} else {
AtomicDiscoveryFailure(base).invalidNel
}
@@ -242,7 +241,7 @@ object DataDiscovery {
def validateFolderAtomic(groupOfKeys: (S3.Folder, List[AtomicDataKey])): ValidatedNel[DiscoveryFailure, AtomicDiscovery] = {
val (base, keys) = groupOfKeys
if (keys.nonEmpty) {
AtomicDiscovery(keys.map(_.key)).validNel
AtomicDiscovery(base, keys.length).validNel
} else {
AtomicDiscoveryFailure(base).invalidNel
}
@@ -307,15 +306,6 @@ object DataDiscovery {
check(1, None)
}

/**
* Check that list of discovered folders is non-empty
*/
private def checkNonEmpty(discovery: List[DataDiscovery]): Either[DiscoveryError, List[DataDiscovery]] =
if (discovery.isEmpty)
Left(DiscoveryError(List(NoDataDiscovered)))
else
Right(discovery)

/**
* Aggregates wait time for all discovered folders or wait 10 sec in case action failed
*/
@@ -96,22 +96,17 @@ object LoaderError {
*/
case class ShreddedTypeKeyFailure(path: S3.Key) extends DiscoveryFailure {
def getMessage: String =
s"Cannot extract contexts/self-describing from file [$path]. Corrupted shredded/good state or unexpected Snowplow Shred job version"
s"Cannot extract contexts or self-describing events from file [$path]. Corrupted shredded/good state or unexpected Snowplow Shred job version"
}

/**
* Cannot discovery shredded type in folder
*/
case class ShreddedTypeDiscoveryFailure(path: S3.Folder) extends DiscoveryFailure {
case class ShreddedTypeDiscoveryFailure(path: S3.Folder, invalidKeyCount: Int, example: S3.Key) extends DiscoveryFailure {
def getMessage: String =
s"Cannot extract contexts/self-describing from directory [$path]. Corrupted shredded/good state or unexpected Snowplow Shred job version"
s"Cannot extract contexts or self-describing events from directory [$path].\nInvalid key example: $example. Total $invalidKeyCount invalid keys.\nCorrupted shredded/good state or unexpected Snowplow Shred job version"
}

case object NoDataDiscovered extends DiscoveryFailure {
def getMessage: String = "No data found in shredded.good folder"
}


case class LoaderLocalError(message: String) extends LoaderError

/**
@@ -120,7 +115,11 @@
def aggregateDiscoveryFailures(failures: List[DiscoveryFailure]): List[DiscoveryFailure] = {
val (shreddedTypeFailures, otherFailures) = failures.span(_.isInstanceOf[ShreddedTypeKeyFailure])
val casted = shreddedTypeFailures.asInstanceOf[List[ShreddedTypeKeyFailure]]
val aggregatedByDir = casted.map(failure => ShreddedTypeDiscoveryFailure(S3.Key.getParent(failure.path))).distinct
val aggregatedByDir = casted.groupBy { failure =>
S3.Key.getParent(failure.path) }.map {
case (k, v) => ShreddedTypeDiscoveryFailure(k, v.length, v.head.path)
}.toList

aggregatedByDir ++ otherFailures
}
}
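
The intent of `aggregateDiscoveryFailures` is to collapse many per-key failures from the same directory into one per-directory failure carrying a count and an example key. A minimal sketch of that grouping step, with plain strings standing in for `S3.Key`/`S3.Folder` and hypothetical local case classes rather than the loader's own types:

```scala
object AggregateFailuresSketch extends App {
  // Simplified stand-ins for ShreddedTypeKeyFailure / ShreddedTypeDiscoveryFailure.
  case class KeyFailure(path: String)
  case class DirFailure(dir: String, invalidKeyCount: Int, example: String)

  // Parent directory of a key, including the trailing slash.
  def getParent(key: String): String = key.take(key.lastIndexOf('/') + 1)

  def aggregate(failures: List[KeyFailure]): List[DirFailure] =
    failures.groupBy(f => getParent(f.path)).map {
      case (dir, fs) => DirFailure(dir, fs.length, fs.head.path)
    }.toList

  val failures = List(
    KeyFailure("s3://acme/shredded/good/run=1/bad-file-1"),
    KeyFailure("s3://acme/shredded/good/run=1/bad-file-2"),
    KeyFailure("s3://acme/shredded/good/run=2/bad-file-3"))

  // One DirFailure per directory: run=1 has 2 invalid keys, run=2 has 1
  aggregate(failures).foreach(println)
}
```
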
@@ -14,8 +14,6 @@ package com.snowplowanalytics.snowplow.rdbloader

import cats.data.Validated._

import scala.util.control.NonFatal

// This project
import interpreters.Interpreter
import config.CliConfig
16 changes: 9 additions & 7 deletions src/main/scala/com/snowplowanalytics/snowplow/rdbloader/S3.scala
@@ -13,6 +13,7 @@
package com.snowplowanalytics.snowplow.rdbloader

import com.amazonaws.services.s3.model.S3ObjectSummary

import cats.syntax.either._

import shapeless.tag
@@ -61,10 +62,6 @@ object S3 {
if (s.endsWith("/")) s
else s + "/"

private def fixPrefix(s: String): String =
if (s.startsWith("s3n")) "s3" + s.stripPrefix("s3n")
else if (s.startsWith("s3a")) "s3" + s.stripPrefix("s3a")
else s
}

/**
@@ -107,7 +104,8 @@
Folder.coerce(string)
}

def coerce(s: String) = apply(s)
def coerce(s: String): Key =
fixPrefix(s).asInstanceOf[Key]

def parse(s: String): Either[String, Key] = s match {
case _ if !correctlyPrefixed(s) => "S3 key must start with s3:// prefix".asLeft
@@ -120,9 +118,8 @@
/**
* Transform S3 object summary into valid S3 key string
*/
def getKey(s3ObjectSummary: S3ObjectSummary): S3.Key = {
def getKey(s3ObjectSummary: S3ObjectSummary): S3.Key =
S3.Key.coerce(s"s3://${s3ObjectSummary.getBucketName}/${s3ObjectSummary.getKey}")
}

// Tags for refined types
sealed trait S3FolderTag
@@ -156,4 +153,9 @@
case head :: tail => (head, tail.mkString("/").stripSuffix("/"))
case _ => throw new IllegalArgumentException(s"Invalid S3 key [$key] was passed") // Impossible
}

private def fixPrefix(s: String): String =
if (s.startsWith("s3n")) "s3" + s.stripPrefix("s3n")
else if (s.startsWith("s3a")) "s3" + s.stripPrefix("s3a")
else s
}
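
With `fixPrefix` moved to the companion level, `Key.coerce` now normalises `s3n://` and `s3a://` prefixes as well. A rough illustration of the normalisation rule, using a plain string function instead of the tagged `Key` type:

```scala
object FixPrefixSketch extends App {
  // Same rewrite as fixPrefix above, but returning a plain String rather than S3.Key.
  def fixPrefix(s: String): String =
    if (s.startsWith("s3n")) "s3" + s.stripPrefix("s3n")
    else if (s.startsWith("s3a")) "s3" + s.stripPrefix("s3a")
    else s

  println(fixPrefix("s3n://acme-bucket/shredded/good/")) // s3://acme-bucket/shredded/good/
  println(fixPrefix("s3a://acme-bucket/shredded/good/")) // s3://acme-bucket/shredded/good/
  println(fixPrefix("s3://acme-bucket/shredded/good/"))  // unchanged
}
```
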
@@ -197,8 +197,8 @@ object ShreddedType {

/**
* Extract `SchemaKey` from subpath, which can be
* legacy-style (pre 1.5.0) com.acme/schema-name/jsonschema/1-0-0 or
* modern-style (post-1.5.0) vendor=com.acme/name=schema-name/format=jsonschema/version=1-0-0
* legacy-style (pre-0.12.0) com.acme/schema-name/jsonschema/1-0-0 or
* modern-style (post-0.12.0) vendor=com.acme/name=schema-name/format=jsonschema/version=1-0-0
* This function transforms any of above valid paths to `SchemaKey`
*
* @param subpath S3 subpath of four `SchemaKey` elements
@@ -238,4 +238,4 @@
}
}
}
}
}
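
The doc comment above names two on-S3 layouts for shredded types. Here is a hedged sketch of how both subpath styles can be parsed into (vendor, name, format, version), using hypothetical regexes rather than the loader's actual extraction code, which builds a full Iglu `SchemaKey`:

```scala
object SubpathSketch extends App {
  // legacy-style: com.acme/schema-name/jsonschema/1-0-0
  val Legacy = "([^/]+)/([^/]+)/([^/]+)/([0-9]+-[0-9]+-[0-9]+)".r
  // modern-style: vendor=com.acme/name=schema-name/format=jsonschema/version=1-0-0
  val Modern = "vendor=([^/]+)/name=([^/]+)/format=([^/]+)/version=([0-9]+-[0-9]+-[0-9]+)".r

  def parse(subpath: String): Option[(String, String, String, String)] =
    subpath match {
      case Modern(vendor, name, format, version) => Some((vendor, name, format, version))
      case Legacy(vendor, name, format, version) => Some((vendor, name, format, version))
      case _                                     => None
    }

  println(parse("com.acme/schema-name/jsonschema/1-0-0"))
  println(parse("vendor=com.acme/name=schema-name/format=jsonschema/version=1-0-0"))
}
```

Both calls yield the same `(com.acme, schema-name, jsonschema, 1-0-0)` parts, which is why the two layouts can be handled by one code path.
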
@@ -14,28 +14,23 @@ package com.snowplowanalytics.snowplow.rdbloader
package interpreters

import java.io.IOException
import java.nio.file._
import java.nio.file.attribute.BasicFileAttributes
import java.sql.Connection

import cats._
import cats.implicits._

import com.amazonaws.services.s3.AmazonS3

import java.sql.Connection
import java.nio.file._
import java.util.Comparator

import org.postgresql.jdbc.PgConnection

import com.snowplowanalytics.snowplow.rdbloader.LoaderError.{LoaderLocalError, StorageTargetError}

import scala.util.control.NonFatal

import com.snowplowanalytics.snowplow.scalatracker.Tracker

// This project
import config.CliConfig
import LoaderA._
import LoaderError.LoaderLocalError
import utils.Common
import com.snowplowanalytics.snowplow.rdbloader.{ Log => ExitLog }

@@ -47,7 +42,7 @@ import com.snowplowanalytics.snowplow.rdbloader.{ Log => ExitLog }
*/
class Interpreter private(
cliConfig: CliConfig,
dbConnection: Connection,
dbConnection: Either[LoaderError, Connection],
amazonS3: AmazonS3,
tracker: Option[Tracker]) {

@@ -63,23 +58,38 @@
S3Interpreter.downloadData(amazonS3, source, dest)

case ExecuteQuery(query) =>
PgInterpreter.executeQuery(dbConnection)(query)
for {
conn <- dbConnection
res <- PgInterpreter.executeQuery(conn)(query)
} yield res
case ExecuteTransaction(queries) =>
PgInterpreter.executeTransaction(dbConnection, queries)
for {
conn <- dbConnection
res <- PgInterpreter.executeTransaction(conn, queries)
} yield res
case ExecuteQueries(queries) =>
PgInterpreter.executeQueries(dbConnection, queries)
for {
conn <- dbConnection
res <- PgInterpreter.executeQueries(conn, queries)
} yield res
case CopyViaStdin(files, query) =>
PgInterpreter.copyViaStdin(dbConnection, files, query)
for {
conn <- dbConnection
res <- PgInterpreter.copyViaStdin(conn, files, query)
} yield res

case CreateTmpDir =>
try {
Right(Files.createTempDirectory("rdb-loader"))
Files.createTempDirectory("rdb-loader").asRight
} catch {
case NonFatal(e) => Left(LoaderLocalError("Cannot create temporary directory.\n" + e.toString))
case NonFatal(e) => LoaderLocalError("Cannot create temporary directory.\n" + e.toString).asLeft
}
case DeleteDir(path) =>
Files.walkFileTree(path, Interpreter.DeleteVisitor)
().asInstanceOf[Id[A]]
try {
Files.walkFileTree(path, Interpreter.DeleteVisitor).asRight[LoaderError].void
} catch {
case NonFatal(e) => LoaderLocalError(s"Cannot delete directory [${path.toString}].\n" + e.toString).asLeft
}


case Sleep(timeout) =>
@@ -95,7 +105,7 @@
case Dump(result) =>
TrackerInterpreter.dumpStdout(amazonS3, cliConfig.logKey, result.toString)
case Exit(loadResult, dumpResult) =>
dbConnection.close()
dbConnection.foreach(c => c.close())
TrackerInterpreter.exit(loadResult, dumpResult)
}
}
@@ -124,6 +134,7 @@ object Interpreter {
*/
def initialize(cliConfig: CliConfig): Interpreter = {

// dbConnection is Either because not required for log dump
val dbConnection = PgInterpreter.getConnection(cliConfig.target)
val amazonS3 = S3Interpreter.getClient(cliConfig.configYaml.aws)
val tracker = TrackerInterpreter.initializeTracking(cliConfig.configYaml.monitoring)
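
The switch to `dbConnection: Either[LoaderError, Connection]` defers a connection failure until a statement actually has to run: actions that never touch the database (such as dumping the log) keep working even when `getConnection` failed. A small self-contained sketch of that pattern with stand-in types (the real interpreter uses cats syntax; this sketch assumes Scala 2.12+, where `Either` is right-biased):

```scala
object LazyConnectionSketch extends App {
  // Stand-in types only; not the loader's real LoaderError / JDBC Connection.
  case class LoaderError(message: String)
  trait Connection { def run(sql: String): String }

  // Acquired once, possibly failed; nothing blows up yet.
  val dbConnection: Either[LoaderError, Connection] =
    Left(LoaderError("could not connect to Redshift"))

  def executeQuery(sql: String): Either[LoaderError, String] =
    for {
      conn <- dbConnection           // the failure surfaces only here
      res  <- Right(conn.run(sql))
    } yield res

  def dumpLog(log: String): Either[LoaderError, Unit] =
    Right(println(s"dumped: $log"))  // never needs the connection

  println(dumpLog("discovery finished"))          // Right(()) even with a broken connection
  println(executeQuery("COPY atomic.events ...")) // Left(LoaderError(...))
}
```
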