Skip to content

Commit

Permalink
RDB Loader: add step to skip consistency check (close #34)
Browse files Browse the repository at this point in the history
  • Loading branch information
chuwy committed Sep 6, 2017
1 parent 21fbf33 commit a76138c
Show file tree
Hide file tree
Showing 7 changed files with 110 additions and 14 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -72,10 +72,12 @@ object CliConfig {
text("skip default steps")

opt[String]("folder").valueName("<s3-folder>").
action((x, c) => c.copy(folder = Some(x)))
action((x, c) => c.copy(folder = Some(x))).
text("exact run folder to load")

opt[Unit]("dry-run").
action((_, c) => c.copy(dryRun = true))
action((_, c) => c.copy(dryRun = true)).
text("do not perform loading, but print SQL statements")

help("help").text("prints this usage text")

Expand Down Expand Up @@ -109,6 +111,7 @@ object CliConfig {
* @param skip sequence of decoded steps to skip
* @param logkey filename, where RDB log dump will be saved
* @param dryRun if RDB Loader should just discover data and print SQL
* @param skipConsistencyCheck do not perform consistency check during discovery
*/
private[config] case class RawConfig(
config: String,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@ object Step {
// Steps that a user may exclude from the load pipeline (via the --skip CLI option).
// `asString` is the exact token accepted/printed on the command line.
sealed trait SkipStep extends Step with StringEnum
case object Analyze extends SkipStep { def asString = "analyze" }
case object Shred extends SkipStep { def asString = "shred" }
// Added by this commit: lets users bypass the S3 eventual-consistency wait during discovery
case object ConsistencyCheck extends SkipStep { def asString = "consistency_check" }

/**
* Step that cannot be skipped nor included
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -51,7 +51,7 @@ object Common {
* Subpath to check `atomic-events` directory presence
*/
val atomicSubpathPattern = "(.*)/(run=[0-9]{4}-[0-1][0-9]-[0-3][0-9]-[0-2][0-9]-[0-6][0-9]-[0-6][0-9]/atomic-events)/(.*)".r
// year month day hour minute second
// year month day hour minute second

/**
* Process any valid storage target
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,7 @@ object PostgresqlLoader {
case None => DataDiscovery.InShreddedGood(shreddedGood)
}

// Should be safe to skip consistency check as whole folder gets downloaded
val discovery = DataDiscovery.discoverAtomic(discoveryTarget)
val statements = PostgresqlLoadStatements.build(target.eventsTable, steps)

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -77,7 +77,7 @@ object RedshiftLoader {
DataDiscovery.discoverAtomic(discoveryTarget)
}

val consistent = DataDiscovery.checkConsistency(discovery)
val consistent = if (steps.contains(Step.ConsistencyCheck)) DataDiscovery.checkConsistency(discovery) else discovery

Discovery.map(consistent)(buildQueue(config, target, steps))
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,8 @@ class CliConfigSpec extends Specification { def is = s2"""
Aggregate errors $e3
Return None on invalid CLI options $e4
Parse minimal valid configuration with specific folder $e5
Parse dry-run configuration $e6
Parse CLI options with dry-run $e6
Parse CLI options with skipped consistency check $e7
"""

import SpecHelpers._
Expand All @@ -40,7 +41,7 @@ class CliConfigSpec extends Specification { def is = s2"""
"--target", target,
"--logkey", "s3://log-bucket/run=2017-04-12-10-01-02/abcdef-1234-8912-abcdef")

val expectedSteps: Set[Step] = Set(Step.Analyze, Step.Shred)
val expectedSteps: Set[Step] = Set(Step.Analyze, Step.Shred, Step.ConsistencyCheck)

val expected = CliConfig(validConfig, validTarget, expectedSteps, s3("s3://log-bucket/run=2017-04-12-10-01-02/abcdef-1234-8912-abcdef"), None, false)

Expand All @@ -58,7 +59,7 @@ class CliConfigSpec extends Specification { def is = s2"""
"-i", "vacuum",
"--logkey", "s3://log-bucket/run=2017-04-12-10-01-02/abcdef-1234-8912-abcdef")

val expectedSteps: Set[Step] = Set(Step.Analyze, Step.Vacuum)
val expectedSteps: Set[Step] = Set(Step.Analyze, Step.Vacuum, Step.ConsistencyCheck)

val expected = CliConfig(validConfig, validTarget, expectedSteps, s3("s3://log-bucket/run=2017-04-12-10-01-02/abcdef-1234-8912-abcdef"), None, false)

Expand Down Expand Up @@ -107,7 +108,7 @@ class CliConfigSpec extends Specification { def is = s2"""
"--logkey", "s3://log-bucket/run=2017-04-12-10-01-02/abcdef-1234-8912-abcdef",
"--folder", "s3://snowplow-acme/archive/enriched/run=2017-04-12-10-00-10")

val expectedSteps: Set[Step] = Set(Step.Analyze, Step.Shred)
val expectedSteps: Set[Step] = Set(Step.Analyze, Step.Shred, Step.ConsistencyCheck)

val expected = CliConfig(validConfig, validTarget, expectedSteps, s3("s3://log-bucket/run=2017-04-12-10-01-02/abcdef-1234-8912-abcdef"), Some(dir("s3://snowplow-acme/archive/enriched/run=2017-04-12-10-00-10/")), false)

Expand All @@ -125,6 +126,24 @@ class CliConfigSpec extends Specification { def is = s2"""
"--logkey", "s3://log-bucket/run=2017-04-12-10-01-02/abcdef-1234-8912-abcdef",
"--folder", "s3://snowplow-acme/archive/enriched/run=2017-04-12-10-00-10")

val expectedSteps: Set[Step] = Set(Step.Analyze, Step.Shred, Step.ConsistencyCheck)

val expected = CliConfig(validConfig, validTarget, expectedSteps, s3("s3://log-bucket/run=2017-04-12-10-01-02/abcdef-1234-8912-abcdef"), Some(dir("s3://snowplow-acme/archive/enriched/run=2017-04-12-10-00-10/")), true)

val result = CliConfig.parse(cli)

result must beSome(Validated.Valid(expected))
}

def e7 = {
val cli = Array(
"--config", configYml,
"--resolver", resolver,
"--target", target,
"--skip", "consistency_check",
"--logkey", "s3://log-bucket/run=2017-04-12-10-01-02/abcdef-1234-8912-abcdef",
"--folder", "s3://snowplow-acme/archive/enriched/run=2017-04-12-10-00-10")

val expectedSteps: Set[Step] = Set(Step.Analyze, Step.Shred)

val expected = CliConfig(validConfig, validTarget, expectedSteps, s3("s3://log-bucket/run=2017-04-12-10-01-02/abcdef-1234-8912-abcdef"), Some(dir("s3://snowplow-acme/archive/enriched/run=2017-04-12-10-00-10/")), true)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,9 +23,10 @@ import Common.SqlString.{unsafeCoerce => sql}
import config.Step

class RedshiftLoaderSpec extends Specification { def is = s2"""
Disover atomic events data and create load statements $e1
Disover full data and create load statements $e2
Discover atomic events data and create load statements $e1
Discover full data and create load statements $e2
Do not fail on empty discovery $e3
Do not sleep with disabled consistency check $e4
"""

import SpecHelpers._
Expand Down Expand Up @@ -157,9 +158,9 @@ class RedshiftLoaderSpec extends Specification { def is = s2"""
def interpreter: LoaderA ~> Id = new (LoaderA ~> Id) {
def apply[A](effect: LoaderA[A]): Id[A] = {
effect match {
case LoaderA.ListS3(bucket) => Right(Nil)
case LoaderA.ListS3(_) => Right(Nil)

case LoaderA.KeyExists(k) => false
case LoaderA.KeyExists(_) => false

case LoaderA.Sleep(_) => ()

Expand All @@ -169,8 +170,6 @@ class RedshiftLoaderSpec extends Specification { def is = s2"""
}
}

val separator = "\t"

val steps: Set[Step] = Step.defaultSteps ++ Set(Step.Vacuum)
val action = RedshiftLoader.run(validConfig, validTarget, steps, None)
val (resultSteps, result) = action.value.run(Nil).foldMap(interpreter)
Expand All @@ -181,5 +180,78 @@ class RedshiftLoaderSpec extends Specification { def is = s2"""
val resultExpectation = result must beRight
stepsExpectation.and(resultExpectation)
}

  /**
   * Spec: with `ConsistencyCheck` removed from the step set, discovery must never
   * invoke `LoaderA.Sleep` — the stub interpreter throws if it does — and the
   * resulting load statements must match the fully-discovered run folder.
   */
  def e4 = {
    // Pure in-memory interpreter for the LoaderA free algebra.
    // Any action other than ListS3/KeyExists fails the test via RuntimeException.
    def interpreter: LoaderA ~> Id = new (LoaderA ~> Id) {
      def apply[A](effect: LoaderA[A]): Id[A] = {
        effect match {
          // Simulate a complete run folder: atomic events plus one shredded type
          // (com.snowplowanalytics.snowplow/submit_form 1-0-0) split over five parts.
          // NOTE(review): the three atomic keys are identical — presumably intentional
          // (duplicates should not affect discovery), but worth confirming.
          case LoaderA.ListS3(bucket) =>
            Right(List(
              S3.Key.coerce(bucket + "run=2017-05-22-12-20-57/atomic-events/part-00001"),
              S3.Key.coerce(bucket + "run=2017-05-22-12-20-57/atomic-events/part-00001"),
              S3.Key.coerce(bucket + "run=2017-05-22-12-20-57/atomic-events/part-00001"),
              S3.Key.coerce(bucket + "run=2017-05-22-12-20-57/shredded-types/vendor=com.snowplowanalytics.snowplow/name=submit_form/format=jsonschema/version=1-0-0/part-00001-dbb35260-7b12-494b-be87-e7a4b1f59906.txt"),
              S3.Key.coerce(bucket + "run=2017-05-22-12-20-57/shredded-types/vendor=com.snowplowanalytics.snowplow/name=submit_form/format=jsonschema/version=1-0-0/part-00002-cba3a610-0b90-494b-be87-e7a4b1f59906.txt"),
              S3.Key.coerce(bucket + "run=2017-05-22-12-20-57/shredded-types/vendor=com.snowplowanalytics.snowplow/name=submit_form/format=jsonschema/version=1-0-0/part-00003-fba35670-9b83-494b-be87-e7a4b1f59906.txt"),
              S3.Key.coerce(bucket + "run=2017-05-22-12-20-57/shredded-types/vendor=com.snowplowanalytics.snowplow/name=submit_form/format=jsonschema/version=1-0-0/part-00004-fba3866a-8b90-494b-be87-e7a4b1fa9906.txt"),
              S3.Key.coerce(bucket + "run=2017-05-22-12-20-57/shredded-types/vendor=com.snowplowanalytics.snowplow/name=submit_form/format=jsonschema/version=1-0-0/part-00005-aba3568f-7b96-494b-be87-e7a4b1fa9906.txt")
            ))

          // Only the hosted JSONPaths file for submit_form_1 exists; every other key lookup misses.
          case LoaderA.KeyExists(k) =>
            if (k == "s3://snowplow-hosted-assets-us-east-1/4-storage/redshift-storage/jsonpaths/com.snowplowanalytics.snowplow/submit_form_1.json") {
              true
            } else false

          // The core assertion of this spec: skipping the consistency check must mean no Sleep.
          case LoaderA.Sleep(time) =>
            throw new RuntimeException(s"Data-discovery should not sleep with skipped consistency check. Sleep called for [$time]")

          case action =>
            throw new RuntimeException(s"Unexpected Action [$action]")
        }
      }
    }

    val separator = "\t"

    // defaultSteps includes ConsistencyCheck; removing it here exercises the new --skip path.
    val steps: Set[Step] = (Step.defaultSteps - Step.ConsistencyCheck) ++ Set(Step.Vacuum)
    val action = RedshiftLoader.discover(validConfig, validTarget, steps, None)
    val result: Either[LoaderError, List[RedshiftLoadStatements]] = action.foldMap(interpreter)

    // Expected COPY statement for atomic events; must match RedshiftLoadStatements.build output exactly.
    val atomic = s"""
      |COPY atomic.events FROM 's3://snowplow-acme-storage/shredded/good/run=2017-05-22-12-20-57/atomic-events/'
      | CREDENTIALS 'aws_iam_role=arn:aws:iam::123456789876:role/RedshiftLoadRole' REGION AS 'us-east-1'
      | DELIMITER '$separator' MAXERROR 1
      | EMPTYASNULL FILLRECORD TRUNCATECOLUMNS
      | TIMEFORMAT 'auto' ACCEPTINVCHARS ;""".stripMargin

    // VACUUM/ANALYZE are present because Vacuum was added and Analyze is a default step.
    val vacuum = List(
      sql("VACUUM SORT ONLY atomic.events;"),
      sql("VACUUM SORT ONLY atomic.com_snowplowanalytics_snowplow_submit_form_1;"))

    val analyze = List(
      sql("ANALYZE atomic.events;"),
      sql("ANALYZE atomic.com_snowplowanalytics_snowplow_submit_form_1;"))

    // COPY for the discovered shredded type, loaded via the hosted JSONPaths file above.
    val shredded = List(sql("""
      |COPY atomic.com_snowplowanalytics_snowplow_submit_form_1 FROM 's3://snowplow-acme-storage/shredded/good/run=2017-05-22-12-20-57/shredded-types/vendor=com.snowplowanalytics.snowplow/name=submit_form/format=jsonschema/version=1-'
      | CREDENTIALS 'aws_iam_role=arn:aws:iam::123456789876:role/RedshiftLoadRole' JSON AS 's3://snowplow-hosted-assets-us-east-1/4-storage/redshift-storage/jsonpaths/com.snowplowanalytics.snowplow/submit_form_1.json'
      | REGION AS 'us-east-1'
      | MAXERROR 1 TRUNCATECOLUMNS TIMEFORMAT 'auto'
      | ACCEPTINVCHARS ;""".stripMargin))

    // Manifest bookkeeping row recorded after a successful load.
    val manifest = """
      |INSERT INTO atomic.manifest
      | SELECT etl_tstamp, sysdate AS commit_tstamp, count(*) AS event_count, 1 AS shredded_cardinality
      | FROM atomic.events
      | WHERE etl_tstamp IS NOT null
      | GROUP BY 1
      | ORDER BY etl_tstamp DESC
      | LIMIT 1;""".stripMargin

    val expected = List(RedshiftLoadStatements(sql(atomic), shredded, Some(vacuum), Some(analyze), sql(manifest)))

    // If discovery had slept, the interpreter would already have thrown; here we also
    // pin the generated statements for the single discovered folder.
    result.map(_.head) must beRight(expected.head)
  }

}

0 comments on commit a76138c

Please sign in to comment.