diff --git a/src/main/scala/com/snowplowanalytics/snowplow/rdbloader/config/CliConfig.scala b/src/main/scala/com/snowplowanalytics/snowplow/rdbloader/config/CliConfig.scala index e8ee87511..e7238f2f4 100644 --- a/src/main/scala/com/snowplowanalytics/snowplow/rdbloader/config/CliConfig.scala +++ b/src/main/scala/com/snowplowanalytics/snowplow/rdbloader/config/CliConfig.scala @@ -72,10 +72,12 @@ object CliConfig { text("skip default steps") opt[String]("folder").valueName(""). - action((x, c) => c.copy(folder = Some(x))) + action((x, c) => c.copy(folder = Some(x))). + text("exact run folder to load") opt[Unit]("dry-run"). - action((_, c) => c.copy(dryRun = true)) + action((_, c) => c.copy(dryRun = true)). + text("do not perform loading, but print SQL statements") help("help").text("prints this usage text") @@ -109,6 +111,7 @@ object CliConfig { * @param skip sequence of of decoded steps to skip * @param logkey filename, where RDB log dump will be saved * @param dryRun if RDB Loader should just discover data and print SQL + * @param skipConsistencyCheck do not perform consistency check during discovery */ private[config] case class RawConfig( config: String, diff --git a/src/main/scala/com/snowplowanalytics/snowplow/rdbloader/config/Step.scala b/src/main/scala/com/snowplowanalytics/snowplow/rdbloader/config/Step.scala index 1b8bbf913..a3a804660 100644 --- a/src/main/scala/com/snowplowanalytics/snowplow/rdbloader/config/Step.scala +++ b/src/main/scala/com/snowplowanalytics/snowplow/rdbloader/config/Step.scala @@ -35,6 +35,7 @@ object Step { sealed trait SkipStep extends Step with StringEnum case object Analyze extends SkipStep { def asString = "analyze" } case object Shred extends SkipStep { def asString = "shred" } + case object ConsistencyCheck extends SkipStep { def asString = "consistency_check" } /** * Step that cannot be skipped nor included diff --git a/src/main/scala/com/snowplowanalytics/snowplow/rdbloader/loaders/Common.scala 
b/src/main/scala/com/snowplowanalytics/snowplow/rdbloader/loaders/Common.scala index 8dc132824..61a70f187 100644 --- a/src/main/scala/com/snowplowanalytics/snowplow/rdbloader/loaders/Common.scala +++ b/src/main/scala/com/snowplowanalytics/snowplow/rdbloader/loaders/Common.scala @@ -51,7 +51,7 @@ object Common { * Subpath to check `atomic-events` directory presence */ val atomicSubpathPattern = "(.*)/(run=[0-9]{4}-[0-1][0-9]-[0-3][0-9]-[0-2][0-9]-[0-6][0-9]-[0-6][0-9]/atomic-events)/(.*)".r - // year month day hour minute second + // year month day hour minute second /** * Process any valid storage target diff --git a/src/main/scala/com/snowplowanalytics/snowplow/rdbloader/loaders/PostgresqlLoader.scala b/src/main/scala/com/snowplowanalytics/snowplow/rdbloader/loaders/PostgresqlLoader.scala index de7ccdb53..2507efae0 100644 --- a/src/main/scala/com/snowplowanalytics/snowplow/rdbloader/loaders/PostgresqlLoader.scala +++ b/src/main/scala/com/snowplowanalytics/snowplow/rdbloader/loaders/PostgresqlLoader.scala @@ -39,6 +39,7 @@ object PostgresqlLoader { case None => DataDiscovery.InShreddedGood(shreddedGood) } + // Should be safe to skip consistency check as whole folder gets downloaded val discovery = DataDiscovery.discoverAtomic(discoveryTarget) val statements = PostgresqlLoadStatements.build(target.eventsTable, steps) diff --git a/src/main/scala/com/snowplowanalytics/snowplow/rdbloader/loaders/RedshiftLoader.scala b/src/main/scala/com/snowplowanalytics/snowplow/rdbloader/loaders/RedshiftLoader.scala index d4376d055..f61a591db 100644 --- a/src/main/scala/com/snowplowanalytics/snowplow/rdbloader/loaders/RedshiftLoader.scala +++ b/src/main/scala/com/snowplowanalytics/snowplow/rdbloader/loaders/RedshiftLoader.scala @@ -77,7 +77,7 @@ object RedshiftLoader { DataDiscovery.discoverAtomic(discoveryTarget) } - val consistent = DataDiscovery.checkConsistency(discovery) + val consistent = if (steps.contains(Step.ConsistencyCheck)) DataDiscovery.checkConsistency(discovery) else 
discovery Discovery.map(consistent)(buildQueue(config, target, steps)) } diff --git a/src/test/scala/com/snowplowanalytics/snowplow/rdbloader/config/CliConfigSpec.scala b/src/test/scala/com/snowplowanalytics/snowplow/rdbloader/config/CliConfigSpec.scala index 71c9f63e9..a262fb18f 100644 --- a/src/test/scala/com/snowplowanalytics/snowplow/rdbloader/config/CliConfigSpec.scala +++ b/src/test/scala/com/snowplowanalytics/snowplow/rdbloader/config/CliConfigSpec.scala @@ -28,7 +28,8 @@ class CliConfigSpec extends Specification { def is = s2""" Aggregate errors $e3 Return None on invalid CLI options $e4 Parse minimal valid configuration with specific folder $e5 - Parse dry-run configuration $e6 + Parse CLI options with dry-run $e6 + Parse CLI options with skipped consistency check $e7 """ import SpecHelpers._ @@ -40,7 +41,7 @@ class CliConfigSpec extends Specification { def is = s2""" "--target", target, "--logkey", "s3://log-bucket/run=2017-04-12-10-01-02/abcdef-1234-8912-abcdef") - val expectedSteps: Set[Step] = Set(Step.Analyze, Step.Shred) + val expectedSteps: Set[Step] = Set(Step.Analyze, Step.Shred, Step.ConsistencyCheck) val expected = CliConfig(validConfig, validTarget, expectedSteps, s3("s3://log-bucket/run=2017-04-12-10-01-02/abcdef-1234-8912-abcdef"), None, false) @@ -58,7 +59,7 @@ class CliConfigSpec extends Specification { def is = s2""" "-i", "vacuum", "--logkey", "s3://log-bucket/run=2017-04-12-10-01-02/abcdef-1234-8912-abcdef") - val expectedSteps: Set[Step] = Set(Step.Analyze, Step.Vacuum) + val expectedSteps: Set[Step] = Set(Step.Analyze, Step.Vacuum, Step.ConsistencyCheck) val expected = CliConfig(validConfig, validTarget, expectedSteps, s3("s3://log-bucket/run=2017-04-12-10-01-02/abcdef-1234-8912-abcdef"), None, false) @@ -107,7 +108,7 @@ class CliConfigSpec extends Specification { def is = s2""" "--logkey", "s3://log-bucket/run=2017-04-12-10-01-02/abcdef-1234-8912-abcdef", "--folder", "s3://snowplow-acme/archive/enriched/run=2017-04-12-10-00-10") - val
expectedSteps: Set[Step] = Set(Step.Analyze, Step.Shred) + val expectedSteps: Set[Step] = Set(Step.Analyze, Step.Shred, Step.ConsistencyCheck) val expected = CliConfig(validConfig, validTarget, expectedSteps, s3("s3://log-bucket/run=2017-04-12-10-01-02/abcdef-1234-8912-abcdef"), Some(dir("s3://snowplow-acme/archive/enriched/run=2017-04-12-10-00-10/")), false) @@ -125,6 +126,25 @@ class CliConfigSpec extends Specification { def is = s2""" "--logkey", "s3://log-bucket/run=2017-04-12-10-01-02/abcdef-1234-8912-abcdef", "--folder", "s3://snowplow-acme/archive/enriched/run=2017-04-12-10-00-10") + val expectedSteps: Set[Step] = Set(Step.Analyze, Step.Shred, Step.ConsistencyCheck) + + val expected = CliConfig(validConfig, validTarget, expectedSteps, s3("s3://log-bucket/run=2017-04-12-10-01-02/abcdef-1234-8912-abcdef"), Some(dir("s3://snowplow-acme/archive/enriched/run=2017-04-12-10-00-10/")), true) + + val result = CliConfig.parse(cli) + + result must beSome(Validated.Valid(expected)) + } + + def e7 = { + val cli = Array( + "--config", configYml, + "--resolver", resolver, + "--target", target, + "--skip", "consistency_check", + "--dry-run", + "--logkey", "s3://log-bucket/run=2017-04-12-10-01-02/abcdef-1234-8912-abcdef", + "--folder", "s3://snowplow-acme/archive/enriched/run=2017-04-12-10-00-10") + val expectedSteps: Set[Step] = Set(Step.Analyze, Step.Shred) val expected = CliConfig(validConfig, validTarget, expectedSteps, s3("s3://log-bucket/run=2017-04-12-10-01-02/abcdef-1234-8912-abcdef"), Some(dir("s3://snowplow-acme/archive/enriched/run=2017-04-12-10-00-10/")), true) diff --git a/src/test/scala/com/snowplowanalytics/snowplow/rdbloader/loaders/RedshiftLoaderSpec.scala b/src/test/scala/com/snowplowanalytics/snowplow/rdbloader/loaders/RedshiftLoaderSpec.scala index ba2e84d8b..336d8c3f5 100644 --- a/src/test/scala/com/snowplowanalytics/snowplow/rdbloader/loaders/RedshiftLoaderSpec.scala +++ b/src/test/scala/com/snowplowanalytics/snowplow/rdbloader/loaders/RedshiftLoaderSpec.scala @@
-23,9 +23,10 @@ import Common.SqlString.{unsafeCoerce => sql} import config.Step class RedshiftLoaderSpec extends Specification { def is = s2""" - Disover atomic events data and create load statements $e1 - Disover full data and create load statements $e2 + Discover atomic events data and create load statements $e1 + Discover full data and create load statements $e2 Do not fail on empty discovery $e3 + Do not sleep with disabled consistency check $e4 """ import SpecHelpers._ @@ -157,9 +158,9 @@ class RedshiftLoaderSpec extends Specification { def is = s2""" def interpreter: LoaderA ~> Id = new (LoaderA ~> Id) { def apply[A](effect: LoaderA[A]): Id[A] = { effect match { - case LoaderA.ListS3(bucket) => Right(Nil) + case LoaderA.ListS3(_) => Right(Nil) - case LoaderA.KeyExists(k) => false + case LoaderA.KeyExists(_) => false case LoaderA.Sleep(_) => () @@ -169,8 +170,6 @@ class RedshiftLoaderSpec extends Specification { def is = s2""" } } - val separator = "\t" - val steps: Set[Step] = Step.defaultSteps ++ Set(Step.Vacuum) val action = RedshiftLoader.run(validConfig, validTarget, steps, None) val (resultSteps, result) = action.value.run(Nil).foldMap(interpreter) @@ -181,5 +180,78 @@ class RedshiftLoaderSpec extends Specification { def is = s2""" val resultExpectation = result must beRight stepsExpectation.and(resultExpectation) } + + def e4 = { + def interpreter: LoaderA ~> Id = new (LoaderA ~> Id) { + def apply[A](effect: LoaderA[A]): Id[A] = { + effect match { + case LoaderA.ListS3(bucket) => + Right(List( + S3.Key.coerce(bucket + "run=2017-05-22-12-20-57/atomic-events/part-00001"), + S3.Key.coerce(bucket + "run=2017-05-22-12-20-57/atomic-events/part-00001"), + S3.Key.coerce(bucket + "run=2017-05-22-12-20-57/atomic-events/part-00001"), + S3.Key.coerce(bucket + "run=2017-05-22-12-20-57/shredded-types/vendor=com.snowplowanalytics.snowplow/name=submit_form/format=jsonschema/version=1-0-0/part-00001-dbb35260-7b12-494b-be87-e7a4b1f59906.txt"), + S3.Key.coerce(bucket + 
"run=2017-05-22-12-20-57/shredded-types/vendor=com.snowplowanalytics.snowplow/name=submit_form/format=jsonschema/version=1-0-0/part-00002-cba3a610-0b90-494b-be87-e7a4b1f59906.txt"), + S3.Key.coerce(bucket + "run=2017-05-22-12-20-57/shredded-types/vendor=com.snowplowanalytics.snowplow/name=submit_form/format=jsonschema/version=1-0-0/part-00003-fba35670-9b83-494b-be87-e7a4b1f59906.txt"), + S3.Key.coerce(bucket + "run=2017-05-22-12-20-57/shredded-types/vendor=com.snowplowanalytics.snowplow/name=submit_form/format=jsonschema/version=1-0-0/part-00004-fba3866a-8b90-494b-be87-e7a4b1fa9906.txt"), + S3.Key.coerce(bucket + "run=2017-05-22-12-20-57/shredded-types/vendor=com.snowplowanalytics.snowplow/name=submit_form/format=jsonschema/version=1-0-0/part-00005-aba3568f-7b96-494b-be87-e7a4b1fa9906.txt") + )) + + case LoaderA.KeyExists(k) => + if (k == "s3://snowplow-hosted-assets-us-east-1/4-storage/redshift-storage/jsonpaths/com.snowplowanalytics.snowplow/submit_form_1.json") { + true + } else false + + case LoaderA.Sleep(time) => + throw new RuntimeException(s"Data-discovery should not sleep with skipped consistency check. 
Sleep called for [$time]") + + case action => + throw new RuntimeException(s"Unexpected Action [$action]") + } + } + } + + val separator = "\t" + + val steps: Set[Step] = (Step.defaultSteps - Step.ConsistencyCheck) ++ Set(Step.Vacuum) + val action = RedshiftLoader.discover(validConfig, validTarget, steps, None) + val result: Either[LoaderError, List[RedshiftLoadStatements]] = action.foldMap(interpreter) + + val atomic = s""" + |COPY atomic.events FROM 's3://snowplow-acme-storage/shredded/good/run=2017-05-22-12-20-57/atomic-events/' + | CREDENTIALS 'aws_iam_role=arn:aws:iam::123456789876:role/RedshiftLoadRole' REGION AS 'us-east-1' + | DELIMITER '$separator' MAXERROR 1 + | EMPTYASNULL FILLRECORD TRUNCATECOLUMNS + | TIMEFORMAT 'auto' ACCEPTINVCHARS ;""".stripMargin + + val vacuum = List( + sql("VACUUM SORT ONLY atomic.events;"), + sql("VACUUM SORT ONLY atomic.com_snowplowanalytics_snowplow_submit_form_1;")) + + val analyze = List( + sql("ANALYZE atomic.events;"), + sql("ANALYZE atomic.com_snowplowanalytics_snowplow_submit_form_1;")) + + val shredded = List(sql(""" + |COPY atomic.com_snowplowanalytics_snowplow_submit_form_1 FROM 's3://snowplow-acme-storage/shredded/good/run=2017-05-22-12-20-57/shredded-types/vendor=com.snowplowanalytics.snowplow/name=submit_form/format=jsonschema/version=1-' + | CREDENTIALS 'aws_iam_role=arn:aws:iam::123456789876:role/RedshiftLoadRole' JSON AS 's3://snowplow-hosted-assets-us-east-1/4-storage/redshift-storage/jsonpaths/com.snowplowanalytics.snowplow/submit_form_1.json' + | REGION AS 'us-east-1' + | MAXERROR 1 TRUNCATECOLUMNS TIMEFORMAT 'auto' + | ACCEPTINVCHARS ;""".stripMargin)) + + val manifest = """ + |INSERT INTO atomic.manifest + | SELECT etl_tstamp, sysdate AS commit_tstamp, count(*) AS event_count, 1 AS shredded_cardinality + | FROM atomic.events + | WHERE etl_tstamp IS NOT null + | GROUP BY 1 + | ORDER BY etl_tstamp DESC + | LIMIT 1;""".stripMargin + + val expected = List(RedshiftLoadStatements(sql(atomic), shredded, 
Some(vacuum), Some(analyze), sql(manifest))) + + result.map(_.head) must beRight(expected.head) + } + }